memories.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. from pydantic import BaseModel
  2. from peewee import *
  3. from playhouse.shortcuts import model_to_dict
  4. from typing import List, Union, Optional
  5. from apps.webui.internal.db import DB
  6. from apps.webui.models.chats import Chats
  7. import time
  8. import uuid
  9. ####################
  10. # Memory DB Schema
  11. ####################
  12. class Memory(Model):
  13. id = CharField(unique=True)
  14. user_id = CharField()
  15. content = TextField()
  16. updated_at = BigIntegerField()
  17. created_at = BigIntegerField()
  18. class Meta:
  19. database = DB
  20. class MemoryModel(BaseModel):
  21. id: str
  22. user_id: str
  23. content: str
  24. updated_at: int # timestamp in epoch
  25. created_at: int # timestamp in epoch
  26. ####################
  27. # Forms
  28. ####################
  29. class MemoriesTable:
  30. def __init__(self, db):
  31. self.db = db
  32. self.db.create_tables([Memory])
  33. def insert_new_memory(
  34. self,
  35. user_id: str,
  36. content: str,
  37. ) -> Optional[MemoryModel]:
  38. id = str(uuid.uuid4())
  39. memory = MemoryModel(
  40. **{
  41. "id": id,
  42. "user_id": user_id,
  43. "content": content,
  44. "created_at": int(time.time()),
  45. "updated_at": int(time.time()),
  46. }
  47. )
  48. result = Memory.create(**memory.model_dump())
  49. if result:
  50. return memory
  51. else:
  52. return None
  53. def update_memory_by_id(
  54. self,
  55. id: str,
  56. content: str,
  57. ) -> Optional[MemoryModel]:
  58. try:
  59. memory = Memory.get(Memory.id == id)
  60. memory.content = content
  61. memory.updated_at = int(time.time())
  62. memory.save()
  63. return MemoryModel(**model_to_dict(memory))
  64. except:
  65. return None
  66. def get_memories(self) -> List[MemoryModel]:
  67. try:
  68. memories = Memory.select()
  69. return [MemoryModel(**model_to_dict(memory)) for memory in memories]
  70. except:
  71. return None
  72. def get_memories_by_user_id(self, user_id: str) -> List[MemoryModel]:
  73. try:
  74. memories = Memory.select().where(Memory.user_id == user_id)
  75. return [MemoryModel(**model_to_dict(memory)) for memory in memories]
  76. except:
  77. return None
  78. def get_memory_by_id(self, id) -> Optional[MemoryModel]:
  79. try:
  80. memory = Memory.get(Memory.id == id)
  81. return MemoryModel(**model_to_dict(memory))
  82. except:
  83. return None
  84. def delete_memory_by_id(self, id: str) -> bool:
  85. try:
  86. query = Memory.delete().where(Memory.id == id)
  87. query.execute() # Remove the rows, return number of rows removed.
  88. return True
  89. except:
  90. return False
  91. def delete_memories_by_user_id(self, user_id: str) -> bool:
  92. try:
  93. query = Memory.delete().where(Memory.user_id == user_id)
  94. query.execute()
  95. return True
  96. except:
  97. return False
  98. def delete_memory_by_id_and_user_id(self, id: str, user_id: str) -> bool:
  99. try:
  100. query = Memory.delete().where(Memory.id == id, Memory.user_id == user_id)
  101. query.execute()
  102. return True
  103. except:
  104. return False
  105. Memories = MemoriesTable(DB)