files.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. from pydantic import BaseModel
  2. from peewee import *
  3. from playhouse.shortcuts import model_to_dict
  4. from typing import List, Union, Optional
  5. import time
  6. import logging
  7. from apps.webui.internal.db import DB, JSONField
  8. import json
  9. from config import SRC_LOG_LEVELS
  10. log = logging.getLogger(__name__)
  11. log.setLevel(SRC_LOG_LEVELS["MODELS"])
  12. ####################
  13. # Files DB Schema
  14. ####################
  15. class File(Model):
  16. id = CharField(unique=True)
  17. user_id = CharField()
  18. filename = TextField()
  19. meta = JSONField()
  20. created_at = BigIntegerField()
  21. class Meta:
  22. database = DB
  23. class FileModel(BaseModel):
  24. id: str
  25. user_id: str
  26. filename: str
  27. meta: dict
  28. created_at: int # timestamp in epoch
  29. ####################
  30. # Forms
  31. ####################
  32. class FileResponse(BaseModel):
  33. id: str
  34. user_id: str
  35. filename: str
  36. meta: dict
  37. created_at: int # timestamp in epoch
  38. class FileForm(BaseModel):
  39. id: str
  40. filename: str
  41. meta: dict = {}
  42. class FilesTable:
  43. def __init__(self, db):
  44. self.db = db
  45. self.db.create_tables([File])
  46. def insert_new_file(self, user_id: str, form_data: FileForm) -> Optional[FileModel]:
  47. file = FileModel(
  48. **{
  49. **form_data.model_dump(),
  50. "user_id": user_id,
  51. "created_at": int(time.time()),
  52. }
  53. )
  54. try:
  55. result = File.create(**file.model_dump())
  56. if result:
  57. return file
  58. else:
  59. return None
  60. except Exception as e:
  61. print(f"Error creating tool: {e}")
  62. return None
  63. def get_file_by_id(self, id: str) -> Optional[FileModel]:
  64. try:
  65. file = File.get(File.id == id)
  66. return FileModel(**model_to_dict(file))
  67. except:
  68. return None
  69. def get_files(self) -> List[FileModel]:
  70. return [FileModel(**model_to_dict(file)) for file in File.select()]
  71. def delete_file_by_id(self, id: str) -> bool:
  72. try:
  73. query = File.delete().where((File.id == id))
  74. query.execute() # Remove the rows, return number of rows removed.
  75. return True
  76. except:
  77. return False
  78. def delete_all_files(self) -> bool:
  79. try:
  80. query = File.delete()
  81. query.execute() # Remove the rows, return number of rows removed.
  82. return True
  83. except:
  84. return False
  85. Files = FilesTable(DB)