chats.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. from pydantic import BaseModel
  2. from typing import List, Union, Optional
  3. from peewee import *
  4. from playhouse.shortcuts import model_to_dict
  5. import json
  6. import uuid
  7. import time
  8. from apps.web.internal.db import DB
  9. ####################
  10. # Chat DB Schema
  11. ####################
  12. class Chat(Model):
  13. id = CharField(unique=True)
  14. user_id = CharField()
  15. title = CharField()
  16. chat = TextField() # Save Chat JSON as Text
  17. timestamp = DateField()
  18. class Meta:
  19. database = DB
  20. class ChatModel(BaseModel):
  21. id: str
  22. user_id: str
  23. title: str
  24. chat: str
  25. timestamp: int # timestamp in epoch
  26. ####################
  27. # Forms
  28. ####################
  29. class ChatForm(BaseModel):
  30. chat: dict
  31. class ChatUpdateForm(ChatForm):
  32. id: str
  33. class ChatTitleIdResponse(BaseModel):
  34. id: str
  35. title: str
  36. class ChatTable:
  37. def __init__(self, db):
  38. self.db = db
  39. db.create_tables([Chat])
  40. def insert_new_chat(self, user_id: str, form_data: ChatForm) -> Optional[ChatModel]:
  41. id = str(uuid.uuid4())
  42. chat = ChatModel(
  43. **{
  44. "id": id,
  45. "user_id": user_id,
  46. "title": form_data.chat["title"]
  47. if "title" in form_data.chat
  48. else "New Chat",
  49. "chat": json.dumps(form_data.chat),
  50. "timestamp": int(time.time()),
  51. }
  52. )
  53. result = Chat.create(**chat.model_dump())
  54. return chat if result else None
  55. def update_chat_by_id(self, id: str, chat: dict) -> Optional[ChatModel]:
  56. try:
  57. query = Chat.update(chat=json.dumps(chat)).where(Chat.id == id)
  58. query.execute()
  59. chat = Chat.get(Chat.id == id)
  60. return ChatModel(**model_to_dict(chat))
  61. except:
  62. return None
  63. def get_chat_lists_by_user_id(
  64. self, user_id: str, skip: int = 0, limit: int = 50
  65. ) -> List[ChatModel]:
  66. return [
  67. ChatModel(**model_to_dict(chat))
  68. for chat in Chat.select()
  69. .where(Chat.user_id == user_id)
  70. .limit(limit)
  71. .offset(skip)
  72. ]
  73. def get_chat_by_id_and_user_id(self, id: str, user_id: str) -> Optional[ChatModel]:
  74. try:
  75. chat = Chat.get(Chat.id == id, Chat.user_id == user_id)
  76. return ChatModel(**model_to_dict(chat))
  77. except:
  78. return None
  79. def get_chats(self, skip: int = 0, limit: int = 50) -> List[ChatModel]:
  80. return [
  81. ChatModel(**model_to_dict(chat))
  82. for chat in Chat.select().limit(limit).offset(skip)
  83. ]
  84. Chats = ChatTable(DB)