memories.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. from pydantic import BaseModel
  2. from peewee import *
  3. from playhouse.shortcuts import model_to_dict
  4. from typing import List, Union, Optional
  5. from apps.web.internal.db import DB
  6. from apps.web.models.chats import Chats
  7. import time
  8. import uuid
  9. ####################
  10. # Memory DB Schema
  11. ####################
  # Peewee model describing one stored memory row.
  class Memory(Model):
      # String identifier, declared unique; the table helper fills it with a
      # UUID4 string when inserting.
      id = CharField(unique=True)

      # Identifier of the user the memory is stored for.
      user_id = CharField()

      # Text body of the memory.
      content = TextField()

      # Integer timestamps; the insert helper writes int(time.time()) values.
      updated_at = BigIntegerField()
      created_at = BigIntegerField()

      class Meta:
          # Bind the model to the DB handle imported from apps.web.internal.db.
          database = DB
  # Pydantic counterpart of a Memory row; this is the type the table helper
  # methods hand back to callers.
  class MemoryModel(BaseModel):
      id: str
      user_id: str
      content: str

      # Both timestamps are epoch values stored as integers.
      updated_at: int
      created_at: int
  26. ####################
  27. # Forms
  28. ####################
  class MemoriesTable:
      """Data-access helper wrapping queries against the ``Memory`` table.

      Read and delete helpers are best-effort: an ``Exception`` raised while
      running the query is swallowed and reported through the return value
      (``None`` for reads, ``False`` for deletes). ``insert_new_memory`` does
      not catch errors.
      """

      def __init__(self, db):
          """Store the database handle and create the ``Memory`` table.

          Args:
              db: Database object exposing ``create_tables``.
          """
          self.db = db
          self.db.create_tables([Memory])

      @staticmethod
      def _to_model(row) -> MemoryModel:
          """Convert a ``Memory`` row into its ``MemoryModel`` counterpart."""
          return MemoryModel(**model_to_dict(row))

      def insert_new_memory(
          self,
          user_id: str,
          content: str,
      ) -> Optional[MemoryModel]:
          """Create and persist a new memory for ``user_id``.

          The row id is a freshly generated UUID4 string.

          Args:
              user_id: Identifier of the user the memory is stored for.
              content: Text of the memory.

          Returns:
              The stored memory, or ``None`` if ``Memory.create`` yields a
              falsy result. Errors raised by ``Memory.create`` propagate.
          """
          # Take the timestamp once so created_at and updated_at can never
          # straddle a second boundary on a brand-new row.
          now = int(time.time())
          memory = MemoryModel(
              id=str(uuid.uuid4()),
              user_id=user_id,
              content=content,
              created_at=now,
              updated_at=now,
          )
          result = Memory.create(**memory.model_dump())
          return memory if result else None

      def get_memories(self) -> Optional[List[MemoryModel]]:
          """Return every stored memory, or ``None`` if the query fails."""
          try:
              return [self._to_model(row) for row in Memory.select()]
          except Exception:
              return None

      def get_memories_by_user_id(self, user_id: str) -> Optional[List[MemoryModel]]:
          """Return the memories whose ``user_id`` matches.

          Returns:
              A (possibly empty) list, or ``None`` if the query fails.
          """
          try:
              rows = Memory.select().where(Memory.user_id == user_id)
              return [self._to_model(row) for row in rows]
          except Exception:
              return None

      def get_memory_by_id(self, id) -> Optional[MemoryModel]:
          """Return the memory with the given id.

          Returns:
              The memory, or ``None`` when the lookup raises (for example
              because no row matches).
          """
          try:
              return self._to_model(Memory.get(Memory.id == id))
          except Exception:
              return None

      def delete_memory_by_id(self, id: str) -> bool:
          """Delete the memory with the given id.

          Returns:
              ``True`` when the delete statement ran without error (even if no
              row matched), ``False`` if it raised.
          """
          try:
              # execute() reports the number of rows removed; it is
              # intentionally ignored so the return contract stays unchanged.
              Memory.delete().where(Memory.id == id).execute()
              return True
          except Exception:
              return False

      def delete_memories_by_user_id(self, user_id: str) -> bool:
          """Delete every memory stored for ``user_id``.

          Returns:
              ``True`` when the delete statement ran without error (even if no
              row matched), ``False`` if it raised.
          """
          try:
              Memory.delete().where(Memory.user_id == user_id).execute()
              return True
          except Exception:
              return False

      def delete_memory_by_id_and_user_id(self, id: str, user_id: str) -> bool:
          """Delete the memory matching both ``id`` and ``user_id``.

          Returns:
              ``True`` when the delete statement ran without error (even if no
              row matched), ``False`` if it raised.
          """
          try:
              Memory.delete().where(
                  Memory.id == id, Memory.user_id == user_id
              ).execute()
              return True
          except Exception:
              return False
  # Shared module-level instance. Constructing it runs MemoriesTable.__init__,
  # which calls create_tables([Memory]) on the DB handle.
  Memories = MemoriesTable(DB)