# folders.py
import logging
import time
import uuid
from typing import Optional

from pydantic import BaseModel, ConfigDict
from sqlalchemy import BigInteger, Column, Text, JSON, Boolean

from open_webui.apps.webui.internal.db import Base, get_db
from open_webui.env import SRC_LOG_LEVELS
  9. log = logging.getLogger(__name__)
  10. log.setLevel(SRC_LOG_LEVELS["MODELS"])
####################
# Folder DB Schema
####################
  14. class Folder(Base):
  15. __tablename__ = "folder"
  16. id = Column(Text, primary_key=True)
  17. parent_id = Column(Text, nullable=True)
  18. user_id = Column(Text)
  19. name = Column(Text)
  20. items = Column(JSON, nullable=True)
  21. meta = Column(JSON, nullable=True)
  22. is_expanded = Column(Boolean, default=False)
  23. created_at = Column(BigInteger)
  24. updated_at = Column(BigInteger)
  25. class FolderModel(BaseModel):
  26. id: str
  27. parent_id: Optional[str] = None
  28. user_id: str
  29. name: str
  30. items: Optional[dict] = None
  31. meta: Optional[dict] = None
  32. is_expanded: bool = False
  33. created_at: int
  34. updated_at: int
  35. model_config = ConfigDict(from_attributes=True)
####################
# Forms
####################
  39. class FolderForm(BaseModel):
  40. name: str
  41. model_config = ConfigDict(extra="allow")
  42. class FolderTable:
  43. def insert_new_folder(
  44. self, user_id: str, name: str, parent_id: Optional[str] = None
  45. ) -> Optional[FolderModel]:
  46. with get_db() as db:
  47. id = str(uuid.uuid4())
  48. folder = FolderModel(
  49. **{
  50. "id": id,
  51. "user_id": user_id,
  52. "name": name,
  53. "parent_id": parent_id,
  54. "created_at": int(time.time()),
  55. "updated_at": int(time.time()),
  56. }
  57. )
  58. try:
  59. result = Folder(**folder.model_dump())
  60. db.add(result)
  61. db.commit()
  62. db.refresh(result)
  63. if result:
  64. return FolderModel.model_validate(result)
  65. else:
  66. return None
  67. except Exception as e:
  68. print(e)
  69. return None
  70. def get_folder_by_id_and_user_id(
  71. self, id: str, user_id: str
  72. ) -> Optional[FolderModel]:
  73. try:
  74. with get_db() as db:
  75. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  76. if not folder:
  77. return None
  78. return FolderModel.model_validate(folder)
  79. except Exception:
  80. return None
  81. def get_folders_by_user_id(self, user_id: str) -> list[FolderModel]:
  82. with get_db() as db:
  83. return [
  84. FolderModel.model_validate(folder)
  85. for folder in db.query(Folder).filter_by(user_id=user_id).all()
  86. ]
  87. def get_folder_by_parent_id_and_user_id_and_name(
  88. self, parent_id: Optional[str], user_id: str, name: str
  89. ) -> Optional[FolderModel]:
  90. try:
  91. with get_db() as db:
  92. # Check if folder exists
  93. folder = (
  94. db.query(Folder)
  95. .filter_by(parent_id=parent_id, user_id=user_id)
  96. .filter(Folder.name.ilike(name))
  97. .first()
  98. )
  99. if not folder:
  100. return None
  101. return FolderModel.model_validate(folder)
  102. except Exception as e:
  103. log.error(f"get_folder_by_parent_id_and_user_id_and_name: {e}")
  104. return None
  105. def get_folders_by_parent_id_and_user_id(
  106. self, parent_id: Optional[str], user_id: str
  107. ) -> list[FolderModel]:
  108. with get_db() as db:
  109. return [
  110. FolderModel.model_validate(folder)
  111. for folder in db.query(Folder)
  112. .filter_by(parent_id=parent_id, user_id=user_id)
  113. .all()
  114. ]
  115. def update_folder_parent_id_by_id_and_user_id(
  116. self,
  117. id: str,
  118. user_id: str,
  119. parent_id: str,
  120. ) -> Optional[FolderModel]:
  121. try:
  122. with get_db() as db:
  123. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  124. if not folder:
  125. return None
  126. folder.parent_id = parent_id
  127. folder.updated_at = int(time.time())
  128. db.commit()
  129. return FolderModel.model_validate(folder)
  130. except Exception as e:
  131. log.error(f"update_folder: {e}")
  132. return
  133. def update_folder_name_by_id_and_user_id(
  134. self, id: str, user_id: str, name: str
  135. ) -> Optional[FolderModel]:
  136. try:
  137. with get_db() as db:
  138. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  139. if not folder:
  140. return None
  141. existing_folder = (
  142. db.query(Folder)
  143. .filter_by(name=name, parent_id=folder.parent_id, user_id=user_id)
  144. .first()
  145. )
  146. if existing_folder:
  147. return None
  148. folder.name = name
  149. folder.updated_at = int(time.time())
  150. db.commit()
  151. return FolderModel.model_validate(folder)
  152. except Exception as e:
  153. log.error(f"update_folder: {e}")
  154. return
  155. def update_folder_is_expanded_by_id_and_user_id(
  156. self, id: str, user_id: str, is_expanded: bool
  157. ) -> Optional[FolderModel]:
  158. try:
  159. with get_db() as db:
  160. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  161. if not folder:
  162. return None
  163. folder.is_expanded = is_expanded
  164. folder.updated_at = int(time.time())
  165. db.commit()
  166. return FolderModel.model_validate(folder)
  167. except Exception as e:
  168. log.error(f"update_folder: {e}")
  169. return
  170. def delete_folder_by_id_and_user_id(self, id: str, user_id: str) -> bool:
  171. try:
  172. with get_db() as db:
  173. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  174. db.delete(folder)
  175. db.commit()
  176. return True
  177. except Exception as e:
  178. log.error(f"delete_folder: {e}")
  179. return False
  180. Folders = FolderTable()