knowledge.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. import json
  2. import logging
  3. import time
  4. from typing import Optional
  5. import uuid
  6. from open_webui.apps.webui.internal.db import Base, get_db
  7. from open_webui.env import SRC_LOG_LEVELS
  8. from open_webui.apps.webui.models.files import FileMetadataResponse
  9. from open_webui.apps.webui.models.users import Users, UserResponse
  10. from pydantic import BaseModel, ConfigDict
  11. from sqlalchemy import BigInteger, Column, String, Text, JSON
  12. from open_webui.utils.access_control import has_access
  13. log = logging.getLogger(__name__)
  14. log.setLevel(SRC_LOG_LEVELS["MODELS"])
  15. ####################
  16. # Knowledge DB Schema
  17. ####################
  18. class Knowledge(Base):
  19. __tablename__ = "knowledge"
  20. id = Column(Text, unique=True, primary_key=True)
  21. user_id = Column(Text)
  22. name = Column(Text)
  23. description = Column(Text)
  24. data = Column(JSON, nullable=True)
  25. meta = Column(JSON, nullable=True)
  26. access_control = Column(JSON, nullable=True) # Controls data access levels.
  27. # Defines access control rules for this entry.
  28. # - `None`: Public access, available to all users with the "user" role.
  29. # - `{}`: Private access, restricted exclusively to the owner.
  30. # - Custom permissions: Specific access control for reading and writing;
  31. # Can specify group or user-level restrictions:
  32. # {
  33. # "read": {
  34. # "group_ids": ["group_id1", "group_id2"],
  35. # "user_ids": ["user_id1", "user_id2"]
  36. # },
  37. # "write": {
  38. # "group_ids": ["group_id1", "group_id2"],
  39. # "user_ids": ["user_id1", "user_id2"]
  40. # }
  41. # }
  42. created_at = Column(BigInteger)
  43. updated_at = Column(BigInteger)
  44. class KnowledgeModel(BaseModel):
  45. model_config = ConfigDict(from_attributes=True)
  46. id: str
  47. user_id: str
  48. name: str
  49. description: str
  50. data: Optional[dict] = None
  51. meta: Optional[dict] = None
  52. access_control: Optional[dict] = None
  53. created_at: int # timestamp in epoch
  54. updated_at: int # timestamp in epoch
  55. ####################
  56. # Forms
  57. ####################
  58. class KnowledgeUserModel(KnowledgeModel):
  59. user: Optional[UserResponse] = None
  60. class KnowledgeResponse(KnowledgeModel):
  61. files: Optional[list[FileMetadataResponse | dict]] = None
  62. class KnowledgeUserResponse(KnowledgeUserModel):
  63. files: Optional[list[FileMetadataResponse | dict]] = None
  64. class KnowledgeForm(BaseModel):
  65. name: str
  66. description: str
  67. data: Optional[dict] = None
  68. access_control: Optional[dict] = None
  69. class KnowledgeTable:
  70. def insert_new_knowledge(
  71. self, user_id: str, form_data: KnowledgeForm
  72. ) -> Optional[KnowledgeModel]:
  73. with get_db() as db:
  74. knowledge = KnowledgeModel(
  75. **{
  76. **form_data.model_dump(),
  77. "id": str(uuid.uuid4()),
  78. "user_id": user_id,
  79. "created_at": int(time.time()),
  80. "updated_at": int(time.time()),
  81. }
  82. )
  83. try:
  84. result = Knowledge(**knowledge.model_dump())
  85. db.add(result)
  86. db.commit()
  87. db.refresh(result)
  88. if result:
  89. return KnowledgeModel.model_validate(result)
  90. else:
  91. return None
  92. except Exception:
  93. return None
  94. def get_knowledge_bases(self) -> list[KnowledgeUserModel]:
  95. with get_db() as db:
  96. knowledge_bases = []
  97. for knowledge in (
  98. db.query(Knowledge).order_by(Knowledge.updated_at.desc()).all()
  99. ):
  100. user = Users.get_user_by_id(knowledge.user_id)
  101. knowledge_bases.append(
  102. KnowledgeUserModel.model_validate(
  103. {
  104. **KnowledgeModel.model_validate(knowledge).model_dump(),
  105. "user": user.model_dump() if user else None,
  106. }
  107. )
  108. )
  109. return knowledge_bases
  110. def get_knowledge_bases_by_user_id(
  111. self, user_id: str, permission: str = "write"
  112. ) -> list[KnowledgeUserModel]:
  113. knowledge_bases = self.get_knowledge_bases()
  114. return [
  115. knowledge_base
  116. for knowledge_base in knowledge_bases
  117. if knowledge_base.user_id == user_id
  118. or has_access(user_id, permission, knowledge_base.access_control)
  119. ]
  120. def get_knowledge_by_id(self, id: str) -> Optional[KnowledgeModel]:
  121. try:
  122. with get_db() as db:
  123. knowledge = db.query(Knowledge).filter_by(id=id).first()
  124. return KnowledgeModel.model_validate(knowledge) if knowledge else None
  125. except Exception:
  126. return None
  127. def update_knowledge_by_id(
  128. self, id: str, form_data: KnowledgeForm, overwrite: bool = False
  129. ) -> Optional[KnowledgeModel]:
  130. try:
  131. with get_db() as db:
  132. knowledge = self.get_knowledge_by_id(id=id)
  133. db.query(Knowledge).filter_by(id=id).update(
  134. {
  135. **form_data.model_dump(),
  136. "updated_at": int(time.time()),
  137. }
  138. )
  139. db.commit()
  140. return self.get_knowledge_by_id(id=id)
  141. except Exception as e:
  142. log.exception(e)
  143. return None
  144. def update_knowledge_data_by_id(
  145. self, id: str, data: dict
  146. ) -> Optional[KnowledgeModel]:
  147. try:
  148. with get_db() as db:
  149. knowledge = self.get_knowledge_by_id(id=id)
  150. db.query(Knowledge).filter_by(id=id).update(
  151. {
  152. "data": data,
  153. "updated_at": int(time.time()),
  154. }
  155. )
  156. db.commit()
  157. return self.get_knowledge_by_id(id=id)
  158. except Exception as e:
  159. log.exception(e)
  160. return None
  161. def delete_knowledge_by_id(self, id: str) -> bool:
  162. try:
  163. with get_db() as db:
  164. db.query(Knowledge).filter_by(id=id).delete()
  165. db.commit()
  166. return True
  167. except Exception:
  168. return False
  169. def delete_all_knowledge(self) -> bool:
  170. with get_db() as db:
  171. try:
  172. db.query(Knowledge).delete()
  173. db.commit()
  174. return True
  175. except Exception:
  176. return False
  177. Knowledges = KnowledgeTable()