documents.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. from pydantic import BaseModel, ConfigDict
  2. from typing import List, Optional
  3. import time
  4. import logging
  5. from sqlalchemy import String, Column, BigInteger
  6. from sqlalchemy.orm import Session
  7. from apps.webui.internal.db import Base, get_session
  8. import json
  9. from config import SRC_LOG_LEVELS
  10. log = logging.getLogger(__name__)
  11. log.setLevel(SRC_LOG_LEVELS["MODELS"])
  12. ####################
  13. # Documents DB Schema
  14. ####################
  15. class Document(Base):
  16. __tablename__ = "document"
  17. collection_name = Column(String, primary_key=True)
  18. name = Column(String, unique=True)
  19. title = Column(String)
  20. filename = Column(String)
  21. content = Column(String, nullable=True)
  22. user_id = Column(String)
  23. timestamp = Column(BigInteger)
  24. class DocumentModel(BaseModel):
  25. model_config = ConfigDict(from_attributes=True)
  26. collection_name: str
  27. name: str
  28. title: str
  29. filename: str
  30. content: Optional[str] = None
  31. user_id: str
  32. timestamp: int # timestamp in epoch
  33. ####################
  34. # Forms
  35. ####################
  36. class DocumentResponse(BaseModel):
  37. collection_name: str
  38. name: str
  39. title: str
  40. filename: str
  41. content: Optional[dict] = None
  42. user_id: str
  43. timestamp: int # timestamp in epoch
  44. class DocumentUpdateForm(BaseModel):
  45. name: str
  46. title: str
  47. class DocumentForm(DocumentUpdateForm):
  48. collection_name: str
  49. filename: str
  50. content: Optional[str] = None
  51. class DocumentsTable:
  52. def insert_new_doc(
  53. self, user_id: str, form_data: DocumentForm
  54. ) -> Optional[DocumentModel]:
  55. document = DocumentModel(
  56. **{
  57. **form_data.model_dump(),
  58. "user_id": user_id,
  59. "timestamp": int(time.time()),
  60. }
  61. )
  62. try:
  63. with get_session() as db:
  64. result = Document(**document.model_dump())
  65. db.add(result)
  66. db.commit()
  67. db.refresh(result)
  68. if result:
  69. return DocumentModel.model_validate(result)
  70. else:
  71. return None
  72. except:
  73. return None
  74. def get_doc_by_name(self, name: str) -> Optional[DocumentModel]:
  75. try:
  76. with get_session() as db:
  77. document = db.query(Document).filter_by(name=name).first()
  78. return DocumentModel.model_validate(document) if document else None
  79. except:
  80. return None
  81. def get_docs(self) -> List[DocumentModel]:
  82. with get_session() as db:
  83. return [DocumentModel.model_validate(doc) for doc in db.query(Document).all()]
  84. def update_doc_by_name(
  85. self, name: str, form_data: DocumentUpdateForm
  86. ) -> Optional[DocumentModel]:
  87. try:
  88. with get_session() as db:
  89. db.query(Document).filter_by(name=name).update(
  90. {
  91. "title": form_data.title,
  92. "name": form_data.name,
  93. "timestamp": int(time.time()),
  94. }
  95. )
  96. db.commit()
  97. return self.get_doc_by_name(form_data.name)
  98. except Exception as e:
  99. log.exception(e)
  100. return None
  101. def update_doc_content_by_name(
  102. self, name: str, updated: dict
  103. ) -> Optional[DocumentModel]:
  104. try:
  105. with get_session() as db:
  106. doc = self.get_doc_by_name(name)
  107. doc_content = json.loads(doc.content if doc.content else "{}")
  108. doc_content = {**doc_content, **updated}
  109. db.query(Document).filter_by(name=name).update(
  110. {
  111. "content": json.dumps(doc_content),
  112. "timestamp": int(time.time()),
  113. }
  114. )
  115. db.commit()
  116. return self.get_doc_by_name(name)
  117. except Exception as e:
  118. log.exception(e)
  119. return None
  120. def delete_doc_by_name(self, name: str) -> bool:
  121. try:
  122. with get_session() as db:
  123. db.query(Document).filter_by(name=name).delete()
  124. return True
  125. except:
  126. return False
  127. Documents = DocumentsTable()