chats.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. from pydantic import BaseModel
  2. from typing import List, Union, Optional
  3. from peewee import *
  4. from playhouse.shortcuts import model_to_dict
  5. import json
  6. import uuid
  7. import time
  8. from apps.web.internal.db import DB
  9. ####################
  10. # Chat DB Schema
  11. ####################
  12. class Chat(Model):
  13. id = CharField(unique=True)
  14. user_id = CharField()
  15. title = CharField()
  16. chat = TextField() # Save Chat JSON as Text
  17. timestamp = DateField()
  18. class Meta:
  19. database = DB
  20. class ChatModel(BaseModel):
  21. id: str
  22. user_id: str
  23. title: str
  24. chat: str
  25. timestamp: int # timestamp in epoch
  26. ####################
  27. # Forms
  28. ####################
  29. class ChatForm(BaseModel):
  30. chat: dict
  31. class ChatTitleForm(BaseModel):
  32. title: str
  33. class ChatResponse(BaseModel):
  34. id: str
  35. user_id: str
  36. title: str
  37. chat: dict
  38. timestamp: int # timestamp in epoch
  39. class ChatTitleIdResponse(BaseModel):
  40. id: str
  41. title: str
  42. class ChatTable:
  43. def __init__(self, db):
  44. self.db = db
  45. db.create_tables([Chat])
  46. def insert_new_chat(self, user_id: str, form_data: ChatForm) -> Optional[ChatModel]:
  47. id = str(uuid.uuid4())
  48. chat = ChatModel(
  49. **{
  50. "id": id,
  51. "user_id": user_id,
  52. "title": (
  53. form_data.chat["title"] if "title" in form_data.chat else "New Chat"
  54. ),
  55. "chat": json.dumps(form_data.chat),
  56. "timestamp": int(time.time()),
  57. }
  58. )
  59. result = Chat.create(**chat.model_dump())
  60. return chat if result else None
  61. def update_chat_by_id(self, id: str, chat: dict) -> Optional[ChatModel]:
  62. try:
  63. query = Chat.update(
  64. chat=json.dumps(chat),
  65. title=chat["title"] if "title" in chat else "New Chat",
  66. timestamp=int(time.time()),
  67. ).where(Chat.id == id)
  68. query.execute()
  69. chat = Chat.get(Chat.id == id)
  70. return ChatModel(**model_to_dict(chat))
  71. except:
  72. return None
  73. def get_chat_lists_by_user_id(
  74. self, user_id: str, skip: int = 0, limit: int = 50
  75. ) -> List[ChatModel]:
  76. return [
  77. ChatModel(**model_to_dict(chat))
  78. for chat in Chat.select()
  79. .where(Chat.user_id == user_id)
  80. .order_by(Chat.timestamp.desc())
  81. # .limit(limit)
  82. # .offset(skip)
  83. ]
  84. def get_chat_lists_by_chat_ids(
  85. self, chat_ids: List[str], skip: int = 0, limit: int = 50
  86. ) -> List[ChatModel]:
  87. return [
  88. ChatModel(**model_to_dict(chat))
  89. for chat in Chat.select()
  90. .where(Chat.id.in_(chat_ids))
  91. .order_by(Chat.timestamp.desc())
  92. ]
  93. def get_all_chats(self) -> List[ChatModel]:
  94. return [
  95. ChatModel(**model_to_dict(chat))
  96. for chat in Chat.select().order_by(Chat.timestamp.desc())
  97. ]
  98. def get_all_chats_by_user_id(self, user_id: str) -> List[ChatModel]:
  99. return [
  100. ChatModel(**model_to_dict(chat))
  101. for chat in Chat.select()
  102. .where(Chat.user_id == user_id)
  103. .order_by(Chat.timestamp.desc())
  104. ]
  105. def get_chat_by_id_and_user_id(self, id: str, user_id: str) -> Optional[ChatModel]:
  106. try:
  107. chat = Chat.get(Chat.id == id, Chat.user_id == user_id)
  108. return ChatModel(**model_to_dict(chat))
  109. except:
  110. return None
  111. def get_chats(self, skip: int = 0, limit: int = 50) -> List[ChatModel]:
  112. return [
  113. ChatModel(**model_to_dict(chat))
  114. for chat in Chat.select().limit(limit).offset(skip)
  115. ]
  116. def delete_chat_by_id_and_user_id(self, id: str, user_id: str) -> bool:
  117. try:
  118. query = Chat.delete().where((Chat.id == id) & (Chat.user_id == user_id))
  119. query.execute() # Remove the rows, return number of rows removed.
  120. return True
  121. except:
  122. return False
  123. def delete_chats_by_user_id(self, user_id: str) -> bool:
  124. try:
  125. query = Chat.delete().where(Chat.user_id == user_id)
  126. query.execute() # Remove the rows, return number of rows removed.
  127. return True
  128. except:
  129. return False
# Module-level ChatTable bound to the shared DB; constructing it at import
# time also creates the Chat table (see ChatTable.__init__).
Chats = ChatTable(DB)