123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318 |
- from pydantic import BaseModel
- from typing import List, Union, Optional
- from peewee import *
- from playhouse.shortcuts import model_to_dict
- import json
- import uuid
- import time
- from apps.web.internal.db import DB
- ####################
- # Chat DB Schema
- ####################
class Chat(Model):
    """Peewee model for one stored chat row."""

    # Application-level identifier (the table helper fills it with a UUID string).
    id = CharField(unique=True)
    # Owner of the row; shared copies are stored under "shared-<chat id>".
    user_id = CharField()
    title = TextField()
    # The chat payload, JSON-encoded and stored as text.
    chat = TextField()

    # Both timestamps are integer epoch seconds.
    created_at = BigIntegerField()
    updated_at = BigIntegerField()

    # Id of the shared copy of this chat, or NULL when it is not shared.
    share_id = CharField(unique=True, null=True)
    archived = BooleanField(default=False)

    class Meta:
        database = DB
class ChatModel(BaseModel):
    """Pydantic mirror of a ``Chat`` row, with ``chat`` kept as a JSON string."""

    id: str
    user_id: str

    title: str
    chat: str  # JSON-encoded chat payload

    created_at: int  # epoch timestamp
    updated_at: int  # epoch timestamp

    share_id: Optional[str] = None
    archived: bool = False
- ####################
- # Forms
- ####################
class ChatForm(BaseModel):
    """Request body carrying a chat payload as a dictionary."""

    chat: dict
class ChatTitleForm(BaseModel):
    """Request body carrying only a chat title."""

    title: str
class ChatResponse(BaseModel):
    """Response shape for a full chat, with ``chat`` as a dictionary."""

    id: str
    user_id: str
    title: str
    chat: dict

    updated_at: int  # epoch timestamp
    created_at: int  # epoch timestamp

    share_id: Optional[str] = None  # id of the chat to be shared
    archived: bool
class ChatTitleIdResponse(BaseModel):
    """Lightweight response shape: id, title and the two timestamps."""

    id: str
    title: str

    updated_at: int
    created_at: int
class ChatTable:
    """Data-access helper around the peewee ``Chat`` model.

    Rows are handed back to callers as ``ChatModel`` instances. Most lookup,
    update and delete methods report failure by returning ``None`` / ``False``
    rather than raising.
    """

    def __init__(self, db):
        """Keep a reference to *db* and create the ``Chat`` table through it."""
        self.db = db
        db.create_tables([Chat])

    @staticmethod
    def _to_model(row) -> ChatModel:
        """Convert a ``Chat`` row into its ``ChatModel`` representation."""
        return ChatModel(**model_to_dict(row))

    def insert_new_chat(self, user_id: str, form_data: ChatForm) -> Optional[ChatModel]:
        """Create a chat owned by *user_id* from *form_data*.

        The title is taken from ``form_data.chat["title"]`` when present and
        falls back to ``"New Chat"``. Returns the inserted chat, or ``None``
        if the create call yields a falsy result.
        """
        # Sample the clock once so created_at and updated_at always agree.
        now = int(time.time())
        chat = ChatModel(
            id=str(uuid.uuid4()),
            user_id=user_id,
            title=form_data.chat.get("title", "New Chat"),
            chat=json.dumps(form_data.chat),
            created_at=now,
            updated_at=now,
        )
        result = Chat.create(**chat.model_dump())
        return chat if result else None

    def update_chat_by_id(self, id: str, chat: dict) -> Optional[ChatModel]:
        """Replace the payload and title of chat *id* and bump ``updated_at``.

        Returns the refreshed chat, or ``None`` on any failure (including a
        missing row).
        """
        try:
            Chat.update(
                chat=json.dumps(chat),
                title=chat.get("title", "New Chat"),
                updated_at=int(time.time()),
            ).where(Chat.id == id).execute()
            return self._to_model(Chat.get(Chat.id == id))
        except Exception:
            return None

    def insert_shared_chat_by_chat_id(self, chat_id: str) -> Optional[ChatModel]:
        """Create a shared copy of chat *chat_id*, or return the existing one.

        The copy gets a fresh id, is stored under the owner
        ``"shared-<chat_id>"``, and its id is recorded in the original chat's
        ``share_id``. Errors from the initial lookup are not caught here
        (peewee raises ``DoesNotExist`` when no chat has *chat_id*).
        """
        # Get the existing chat to share.
        chat = Chat.get(Chat.id == chat_id)
        shared_user_id = f"shared-{chat_id}"

        # Already shared: look the copy up under the owner it was stored with.
        # Looking it up under the literal "shared" can never match a row.
        if chat.share_id:
            return self.get_chat_by_id_and_user_id(chat.share_id, shared_user_id)

        # Create a new chat with the same data, but with a new id.
        shared_chat = ChatModel(
            id=str(uuid.uuid4()),
            user_id=shared_user_id,
            title=chat.title,
            chat=chat.chat,
            created_at=chat.created_at,
            updated_at=int(time.time()),
        )
        shared_result = Chat.create(**shared_chat.model_dump())

        # Point the original chat at its shared copy.
        result = (
            Chat.update(share_id=shared_chat.id).where(Chat.id == chat_id).execute()
        )
        return shared_chat if (shared_result and result) else None

    def update_shared_chat_by_chat_id(self, chat_id: str) -> Optional[ChatModel]:
        """Copy the title and payload of chat *chat_id* onto its shared copy.

        Returns the updated shared copy, or ``None`` when the chat does not
        exist, is not shared, or the update fails.
        """
        try:
            source = Chat.get(Chat.id == chat_id)
            if not source.share_id:
                # Nothing to refresh: this chat has no shared copy.
                return None
            Chat.update(
                title=source.title,
                chat=source.chat,
            ).where(Chat.id == source.share_id).execute()
            return self._to_model(Chat.get(Chat.id == source.share_id))
        except Exception:
            return None

    def delete_shared_chat_by_chat_id(self, chat_id: str) -> bool:
        """Delete the shared copy (owner ``"shared-<chat_id>"``) of a chat.

        Returns ``True`` unless the delete raises, even if no row matched.
        """
        try:
            Chat.delete().where(Chat.user_id == f"shared-{chat_id}").execute()
            return True
        except Exception:
            return False

    def update_chat_share_id_by_id(
        self, id: str, share_id: Optional[str]
    ) -> Optional[ChatModel]:
        """Set ``share_id`` on chat *id*; return the refreshed chat or ``None``."""
        try:
            Chat.update(share_id=share_id).where(Chat.id == id).execute()
            return self._to_model(Chat.get(Chat.id == id))
        except Exception:
            return None

    def toggle_chat_archive_by_id(self, id: str) -> Optional[ChatModel]:
        """Flip the ``archived`` flag of chat *id*.

        Returns the refreshed chat, or ``None`` when the chat is missing or the
        update fails.
        """
        try:
            current = self.get_chat_by_id(id)
            # ``current`` is None for an unknown id; the resulting
            # AttributeError is turned into a None return below.
            Chat.update(archived=(not current.archived)).where(
                Chat.id == id
            ).execute()
            return self._to_model(Chat.get(Chat.id == id))
        except Exception:
            return None

    def get_archived_chat_list_by_user_id(
        self, user_id: str, skip: int = 0, limit: int = 50
    ) -> List[ChatModel]:
        """Return the archived chats of *user_id*, most recently updated first.

        NOTE: *skip* and *limit* are accepted but not applied to the query;
        every matching row is returned.
        """
        query = (
            Chat.select()
            .where(Chat.archived == True)  # noqa: E712 - peewee expression
            .where(Chat.user_id == user_id)
            .order_by(Chat.updated_at.desc())
        )
        return [self._to_model(row) for row in query]

    def get_chat_list_by_user_id(
        self, user_id: str, skip: int = 0, limit: int = 50
    ) -> List[ChatModel]:
        """Return the non-archived chats of *user_id*, most recently updated first.

        NOTE: *skip* and *limit* are accepted but not applied to the query;
        every matching row is returned.
        """
        query = (
            Chat.select()
            .where(Chat.archived == False)  # noqa: E712 - peewee expression
            .where(Chat.user_id == user_id)
            .order_by(Chat.updated_at.desc())
        )
        return [self._to_model(row) for row in query]

    def get_chat_list_by_chat_ids(
        self, chat_ids: List[str], skip: int = 0, limit: int = 50
    ) -> List[ChatModel]:
        """Return the non-archived chats whose id is in *chat_ids*, newest first.

        NOTE: *skip* and *limit* are accepted but not applied to the query.
        """
        query = (
            Chat.select()
            .where(Chat.archived == False)  # noqa: E712 - peewee expression
            .where(Chat.id.in_(chat_ids))
            .order_by(Chat.updated_at.desc())
        )
        return [self._to_model(row) for row in query]

    def get_chat_by_id(self, id: str) -> Optional[ChatModel]:
        """Return the chat with the given *id*, or ``None`` if the lookup fails."""
        try:
            return self._to_model(Chat.get(Chat.id == id))
        except Exception:
            return None

    def get_chat_by_share_id(self, id: str) -> Optional[ChatModel]:
        """Return the shared copy whose id is *id*.

        The copy is only returned while some chat still references it through
        ``share_id``; otherwise, or on any lookup failure, ``None`` is returned.
        """
        try:
            # Existence check: raises unless a chat still points at this share id.
            Chat.get(Chat.share_id == id)
            return self._to_model(Chat.get(Chat.id == id))
        except Exception:
            return None

    def get_chat_by_id_and_user_id(self, id: str, user_id: str) -> Optional[ChatModel]:
        """Return chat *id* only if it is owned by *user_id*, else ``None``."""
        try:
            return self._to_model(Chat.get(Chat.id == id, Chat.user_id == user_id))
        except Exception:
            return None

    def get_chats(self, skip: int = 0, limit: int = 50) -> List[ChatModel]:
        """Return every chat, most recently updated first.

        NOTE: *skip* and *limit* are accepted but not applied to the query.
        """
        query = Chat.select().order_by(Chat.updated_at.desc())
        return [self._to_model(row) for row in query]

    def get_chats_by_user_id(self, user_id: str) -> List[ChatModel]:
        """Return all chats of *user_id* (archived or not), newest first."""
        query = (
            Chat.select()
            .where(Chat.user_id == user_id)
            .order_by(Chat.updated_at.desc())
        )
        return [self._to_model(row) for row in query]

    def delete_chat_by_id(self, id: str) -> bool:
        """Delete chat *id* and then its shared copy.

        Returns the outcome of the shared-copy deletion, or ``False`` if
        deleting the chat itself raises.
        """
        try:
            Chat.delete().where(Chat.id == id).execute()
            return self.delete_shared_chat_by_chat_id(id)
        except Exception:
            return False

    def delete_chat_by_id_and_user_id(self, id: str, user_id: str) -> bool:
        """Delete chat *id* if owned by *user_id*, then its shared copy.

        The shared copy is removed whether or not the owner filter matched a
        row. Returns the outcome of the shared-copy deletion, or ``False`` if
        the first delete raises.
        """
        try:
            Chat.delete().where((Chat.id == id) & (Chat.user_id == user_id)).execute()
            return self.delete_shared_chat_by_chat_id(id)
        except Exception:
            return False

    def delete_chats_by_user_id(self, user_id: str) -> bool:
        """Delete every chat of *user_id*, removing their shared copies first."""
        try:
            # Shared copies must go first: they are found via the user's chat ids.
            self.delete_shared_chats_by_user_id(user_id)
            Chat.delete().where(Chat.user_id == user_id).execute()
            return True
        except Exception:
            return False

    def delete_shared_chats_by_user_id(self, user_id: str) -> bool:
        """Delete the shared copies of all chats owned by *user_id*."""
        try:
            shared_owner_ids = [
                f"shared-{row.id}"
                for row in Chat.select().where(Chat.user_id == user_id)
            ]
            Chat.delete().where(Chat.user_id.in_(shared_owner_ids)).execute()
            return True
        except Exception:
            return False
# Module-level table helper bound to the application database; constructing it
# calls ``create_tables`` for the ``Chat`` model.
Chats = ChatTable(db=DB)
|