documents.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. from pydantic import BaseModel
  2. from peewee import *
  3. from playhouse.shortcuts import model_to_dict
  4. from typing import List, Union, Optional
  5. import time
  6. from utils.utils import decode_token
  7. from utils.misc import get_gravatar_url
  8. from apps.web.internal.db import DB
  9. import json
  10. ####################
  11. # Documents DB Schema
  12. ####################
  class Document(Model):
      """Peewee model for the documents table: one row per document.

      ``collection_name`` and ``name`` are each declared unique, so neither
      may repeat across rows. No explicit primary key is declared here.
      """

      collection_name = CharField(unique=True)
      name = CharField(unique=True)
      title = CharField()
      filename = CharField()
      content = TextField(null=True)  # optional; NULL when not supplied
      user_id = CharField()
      # Written as int(time.time()) and read back into DocumentModel.timestamp
      # (an int), so the column is an integer type rather than a calendar
      # date. BigIntegerField comes from the module's `from peewee import *`.
      timestamp = BigIntegerField()

      class Meta:
          # Bind the model to the application's shared database handle.
          database = DB
  class DocumentModel(BaseModel):
      """Pydantic view of a ``Document`` row, field for field."""

      collection_name: str
      name: str
      title: str
      filename: str
      content: Union[str, None] = None  # may be absent
      user_id: str
      timestamp: int  # epoch timestamp
  31. ####################
  32. # Forms
  33. ####################
  class DocumentUpdateForm(BaseModel):
      """Payload carrying the two fields an update may change."""

      name: str
      title: str
  class DocumentForm(DocumentUpdateForm):
      """Payload for creating a document.

      Adds the collection, filename and optional content to the ``name`` and
      ``title`` fields inherited from ``DocumentUpdateForm``.
      """

      collection_name: str
      filename: str
      content: Union[str, None] = None  # may be omitted
  class DocumentsTable:
      """Data-access helper around the ``Document`` peewee model.

      Methods hand back pydantic ``DocumentModel`` objects (or ``bool`` /
      ``None``) rather than peewee rows. Except for ``get_docs``, failures
      raised while talking to the database are reported as ``None`` /
      ``False`` instead of propagating to the caller.
      """

      def __init__(self, db):
          """Store *db* and ask it to create the ``Document`` table."""
          self.db = db
          self.db.create_tables([Document])

      def insert_new_doc(
          self, user_id: str, form_data: DocumentForm
      ) -> Optional[DocumentModel]:
          """Insert a document owned by *user_id*.

          The row is built from *form_data* plus the owner and the current
          epoch time. Returns the stored data as a ``DocumentModel``, or
          ``None`` when the insert fails (for example on a uniqueness
          violation).
          """
          document = DocumentModel(
              **{
                  **form_data.model_dump(),
                  "user_id": user_id,
                  "timestamp": int(time.time()),
              }
          )
          try:
              result = Document.create(**document.model_dump())
          except Exception:
              # Best-effort contract: callers check for None, not exceptions.
              return None
          return document if result else None

      def get_doc_by_name(self, name: str) -> Optional[DocumentModel]:
          """Return the document called *name*, or ``None`` if lookup fails."""
          try:
              document = Document.get(Document.name == name)
              return DocumentModel(**model_to_dict(document))
          except Exception:
              return None

      def get_docs(self) -> List[DocumentModel]:
          """Return every stored document; no pagination is applied.

          Unlike the other methods, errors raised here are not caught.
          """
          return [DocumentModel(**model_to_dict(doc)) for doc in Document.select()]

      def update_doc_by_name(
          self, name: str, form_data: DocumentUpdateForm
      ) -> Optional[DocumentModel]:
          """Update title, name and timestamp of the document called *name*.

          Returns the refreshed document, or ``None`` when no such document
          exists or the update fails (for example the new name collides with
          another row).
          """
          try:
              # Resolve the row up front: the update below may change its
              # name, so re-reading by the old name would miss a renamed
              # document. The implicit peewee ``id`` key is stable.
              existing = Document.get(Document.name == name)
              Document.update(
                  title=form_data.title,
                  name=form_data.name,
                  timestamp=int(time.time()),
              ).where(Document.id == existing.id).execute()
              doc = Document.get(Document.id == existing.id)
              return DocumentModel(**model_to_dict(doc))
          except Exception:
              return None

      def delete_doc_by_name(self, name: str) -> bool:
          """Delete the document called *name*.

          Returns ``True`` when the delete statement ran without error, even
          if it matched no rows, and ``False`` when it raised.
          """
          try:
              Document.delete().where(Document.name == name).execute()
              return True
          except Exception:
              return False
  # Module-level instance bound to the shared DB handle; constructing it runs
  # DocumentsTable.__init__, which issues the create_tables call.
  Documents = DocumentsTable(db=DB)