documents.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. from pydantic import BaseModel
  2. from peewee import *
  3. from playhouse.shortcuts import model_to_dict
  4. from typing import List, Union, Optional
  5. import time
  6. import logging
  7. from utils.utils import decode_token
  8. from utils.misc import get_gravatar_url
  9. from apps.web.internal.db import DB
  10. import json
  11. from config import SRC_LOG_LEVELS
  12. log = logging.getLogger(__name__)
  13. log.setLevel(SRC_LOG_LEVELS["MODELS"])
  14. ####################
  15. # Documents DB Schema
  16. ####################
  17. class Document(Model):
  18. collection_name = CharField(unique=True)
  19. name = CharField(unique=True)
  20. title = TextField()
  21. filename = TextField()
  22. content = TextField(null=True)
  23. user_id = CharField()
  24. timestamp = BigIntegerField()
  25. class Meta:
  26. database = DB
  27. class DocumentModel(BaseModel):
  28. collection_name: str
  29. name: str
  30. title: str
  31. filename: str
  32. content: Optional[str] = None
  33. user_id: str
  34. timestamp: int # timestamp in epoch
  35. ####################
  36. # Forms
  37. ####################
  38. class DocumentResponse(BaseModel):
  39. collection_name: str
  40. name: str
  41. title: str
  42. filename: str
  43. content: Optional[dict] = None
  44. user_id: str
  45. timestamp: int # timestamp in epoch
  46. class DocumentUpdateForm(BaseModel):
  47. name: str
  48. title: str
  49. class DocumentForm(DocumentUpdateForm):
  50. collection_name: str
  51. filename: str
  52. content: Optional[str] = None
  53. class DocumentsTable:
  54. def __init__(self, db):
  55. self.db = db
  56. self.db.create_tables([Document])
  57. def insert_new_doc(
  58. self, user_id: str, form_data: DocumentForm
  59. ) -> Optional[DocumentModel]:
  60. document = DocumentModel(
  61. **{
  62. **form_data.model_dump(),
  63. "user_id": user_id,
  64. "timestamp": int(time.time()),
  65. }
  66. )
  67. try:
  68. result = Document.create(**document.model_dump())
  69. if result:
  70. return document
  71. else:
  72. return None
  73. except:
  74. return None
  75. def get_doc_by_name(self, name: str) -> Optional[DocumentModel]:
  76. try:
  77. document = Document.get(Document.name == name)
  78. return DocumentModel(**model_to_dict(document))
  79. except:
  80. return None
  81. def get_docs(self) -> List[DocumentModel]:
  82. return [
  83. DocumentModel(**model_to_dict(doc))
  84. for doc in Document.select()
  85. # .limit(limit).offset(skip)
  86. ]
  87. def update_doc_by_name(
  88. self, name: str, form_data: DocumentUpdateForm
  89. ) -> Optional[DocumentModel]:
  90. try:
  91. query = Document.update(
  92. title=form_data.title,
  93. name=form_data.name,
  94. timestamp=int(time.time()),
  95. ).where(Document.name == name)
  96. query.execute()
  97. doc = Document.get(Document.name == form_data.name)
  98. return DocumentModel(**model_to_dict(doc))
  99. except Exception as e:
  100. log.exception(e)
  101. return None
  102. def update_doc_content_by_name(
  103. self, name: str, updated: dict
  104. ) -> Optional[DocumentModel]:
  105. try:
  106. doc = self.get_doc_by_name(name)
  107. doc_content = json.loads(doc.content if doc.content else "{}")
  108. doc_content = {**doc_content, **updated}
  109. query = Document.update(
  110. content=json.dumps(doc_content),
  111. timestamp=int(time.time()),
  112. ).where(Document.name == name)
  113. query.execute()
  114. doc = Document.get(Document.name == name)
  115. return DocumentModel(**model_to_dict(doc))
  116. except Exception as e:
  117. log.exception(e)
  118. return None
  119. def delete_doc_by_name(self, name: str) -> bool:
  120. try:
  121. query = Document.delete().where((Document.name == name))
  122. query.execute() # Remove the rows, return number of rows removed.
  123. return True
  124. except:
  125. return False
  126. Documents = DocumentsTable(DB)