123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- from pydantic import BaseModel
- from peewee import *
- from playhouse.shortcuts import model_to_dict
- from typing import List, Union, Optional
- import time
- import logging
- from utils.utils import decode_token
- from utils.misc import get_gravatar_url
- from apps.web.internal.db import DB
- import json
- from config import SRC_LOG_LEVELS
- log = logging.getLogger(__name__)
- log.setLevel(SRC_LOG_LEVELS["MODELS"])
- ####################
- # Documents DB Schema
- ####################
- class Document(Model):
- collection_name = CharField(unique=True)
- name = CharField(unique=True)
- title = TextField()
- filename = TextField()
- content = TextField(null=True)
- user_id = CharField()
- timestamp = BigIntegerField()
- class Meta:
- database = DB
- class DocumentModel(BaseModel):
- collection_name: str
- name: str
- title: str
- filename: str
- content: Optional[str] = None
- user_id: str
- timestamp: int # timestamp in epoch
- ####################
- # Forms
- ####################
- class DocumentResponse(BaseModel):
- collection_name: str
- name: str
- title: str
- filename: str
- content: Optional[dict] = None
- user_id: str
- timestamp: int # timestamp in epoch
- class DocumentUpdateForm(BaseModel):
- name: str
- title: str
- class DocumentForm(DocumentUpdateForm):
- collection_name: str
- filename: str
- content: Optional[str] = None
- class DocumentsTable:
- def __init__(self, db):
- self.db = db
- self.db.create_tables([Document])
- def insert_new_doc(
- self, user_id: str, form_data: DocumentForm
- ) -> Optional[DocumentModel]:
- document = DocumentModel(
- **{
- **form_data.model_dump(),
- "user_id": user_id,
- "timestamp": int(time.time()),
- }
- )
- try:
- result = Document.create(**document.model_dump())
- if result:
- return document
- else:
- return None
- except:
- return None
- def get_doc_by_name(self, name: str) -> Optional[DocumentModel]:
- try:
- document = Document.get(Document.name == name)
- return DocumentModel(**model_to_dict(document))
- except:
- return None
- def get_docs(self) -> List[DocumentModel]:
- return [
- DocumentModel(**model_to_dict(doc))
- for doc in Document.select()
- # .limit(limit).offset(skip)
- ]
- def update_doc_by_name(
- self, name: str, form_data: DocumentUpdateForm
- ) -> Optional[DocumentModel]:
- try:
- query = Document.update(
- title=form_data.title,
- name=form_data.name,
- timestamp=int(time.time()),
- ).where(Document.name == name)
- query.execute()
- doc = Document.get(Document.name == form_data.name)
- return DocumentModel(**model_to_dict(doc))
- except Exception as e:
- log.exception(e)
- return None
- def update_doc_content_by_name(
- self, name: str, updated: dict
- ) -> Optional[DocumentModel]:
- try:
- doc = self.get_doc_by_name(name)
- doc_content = json.loads(doc.content if doc.content else "{}")
- doc_content = {**doc_content, **updated}
- query = Document.update(
- content=json.dumps(doc_content),
- timestamp=int(time.time()),
- ).where(Document.name == name)
- query.execute()
- doc = Document.get(Document.name == name)
- return DocumentModel(**model_to_dict(doc))
- except Exception as e:
- log.exception(e)
- return None
- def delete_doc_by_name(self, name: str) -> bool:
- try:
- query = Document.delete().where((Document.name == name))
- query.execute() # Remove the rows, return number of rows removed.
- return True
- except:
- return False
- Documents = DocumentsTable(DB)
|