documents.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. from pydantic import BaseModel
  2. from peewee import *
  3. from playhouse.shortcuts import model_to_dict
  4. from typing import List, Union, Optional
  5. import time
  6. from utils.utils import decode_token
  7. from utils.misc import get_gravatar_url
  8. from apps.web.internal.db import DB
  9. import json
  10. ####################
  11. # Documents DB Schema
  12. ####################
  13. class Document(Model):
  14. collection_name = CharField(unique=True)
  15. name = CharField(unique=True)
  16. title = CharField()
  17. filename = CharField()
  18. content = TextField(null=True)
  19. user_id = CharField()
  20. timestamp = DateField()
  21. class Meta:
  22. database = DB
  23. class DocumentModel(BaseModel):
  24. collection_name: str
  25. name: str
  26. title: str
  27. filename: str
  28. content: Optional[str] = None
  29. user_id: str
  30. timestamp: int # timestamp in epoch
  31. ####################
  32. # Forms
  33. ####################
  34. class DocumentResponse(BaseModel):
  35. collection_name: str
  36. name: str
  37. title: str
  38. filename: str
  39. content: Optional[dict] = None
  40. user_id: str
  41. timestamp: int # timestamp in epoch
  42. class DocumentUpdateForm(BaseModel):
  43. name: str
  44. title: str
  45. class DocumentForm(DocumentUpdateForm):
  46. collection_name: str
  47. filename: str
  48. content: Optional[str] = None
  49. class DocumentsTable:
  50. def __init__(self, db):
  51. self.db = db
  52. self.db.create_tables([Document])
  53. def insert_new_doc(
  54. self, user_id: str, form_data: DocumentForm
  55. ) -> Optional[DocumentModel]:
  56. document = DocumentModel(
  57. **{
  58. **form_data.model_dump(),
  59. "user_id": user_id,
  60. "timestamp": int(time.time()),
  61. }
  62. )
  63. try:
  64. result = Document.create(**document.model_dump())
  65. if result:
  66. return document
  67. else:
  68. return None
  69. except:
  70. return None
  71. def get_doc_by_name(self, name: str) -> Optional[DocumentModel]:
  72. try:
  73. document = Document.get(Document.name == name)
  74. return DocumentModel(**model_to_dict(document))
  75. except:
  76. return None
  77. def get_docs(self) -> List[DocumentModel]:
  78. return [
  79. DocumentModel(**model_to_dict(doc))
  80. for doc in Document.select()
  81. # .limit(limit).offset(skip)
  82. ]
  83. def update_doc_by_name(
  84. self, name: str, form_data: DocumentUpdateForm
  85. ) -> Optional[DocumentModel]:
  86. try:
  87. query = Document.update(
  88. title=form_data.title,
  89. name=form_data.name,
  90. timestamp=int(time.time()),
  91. ).where(Document.name == name)
  92. query.execute()
  93. doc = Document.get(Document.name == form_data.name)
  94. return DocumentModel(**model_to_dict(doc))
  95. except Exception as e:
  96. print(e)
  97. return None
  98. def update_doc_content_by_name(
  99. self, name: str, updated: dict
  100. ) -> Optional[DocumentModel]:
  101. try:
  102. doc = self.get_doc_by_name(name)
  103. doc_content = json.loads(doc.content if doc.content else "{}")
  104. doc_content = {**doc_content, **updated}
  105. query = Document.update(
  106. content=json.dumps(doc_content),
  107. timestamp=int(time.time()),
  108. ).where(Document.name == name)
  109. query.execute()
  110. doc = Document.get(Document.name == name)
  111. return DocumentModel(**model_to_dict(doc))
  112. except Exception as e:
  113. print(e)
  114. return None
  115. def delete_doc_by_name(self, name: str) -> bool:
  116. try:
  117. query = Document.delete().where((Document.name == name))
  118. query.execute() # Remove the rows, return number of rows removed.
  119. return True
  120. except:
  121. return False
  122. Documents = DocumentsTable(DB)