prompts.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. import time
  2. from typing import Optional
  3. from open_webui.internal.db import Base, get_db
  4. from open_webui.models.users import Users, UserResponse
  5. from pydantic import BaseModel, ConfigDict
  6. from sqlalchemy import BigInteger, Column, String, Text, JSON
  7. from open_webui.utils.access_control import has_access
  8. ####################
  9. # Prompts DB Schema
  10. ####################
  11. class Prompt(Base):
  12. __tablename__ = "prompt"
  13. command = Column(String, primary_key=True)
  14. user_id = Column(String)
  15. title = Column(Text)
  16. content = Column(Text)
  17. timestamp = Column(BigInteger)
  18. access_control = Column(JSON, nullable=True) # Controls data access levels.
  19. # Defines access control rules for this entry.
  20. # - `None`: Public access, available to all users with the "user" role.
  21. # - `{}`: Private access, restricted exclusively to the owner.
  22. # - Custom permissions: Specific access control for reading and writing;
  23. # Can specify group or user-level restrictions:
  24. # {
  25. # "read": {
  26. # "group_ids": ["group_id1", "group_id2"],
  27. # "user_ids": ["user_id1", "user_id2"]
  28. # },
  29. # "write": {
  30. # "group_ids": ["group_id1", "group_id2"],
  31. # "user_ids": ["user_id1", "user_id2"]
  32. # }
  33. # }
  34. class PromptModel(BaseModel):
  35. command: str
  36. user_id: str
  37. title: str
  38. content: str
  39. timestamp: int # timestamp in epoch
  40. access_control: Optional[dict] = None
  41. model_config = ConfigDict(from_attributes=True)
  42. ####################
  43. # Forms
  44. ####################
  45. class PromptUserResponse(PromptModel):
  46. user: Optional[UserResponse] = None
  47. class PromptForm(BaseModel):
  48. command: str
  49. title: str
  50. content: str
  51. access_control: Optional[dict] = None
  52. class PromptsTable:
  53. def insert_new_prompt(
  54. self, user_id: str, form_data: PromptForm
  55. ) -> Optional[PromptModel]:
  56. prompt = PromptModel(
  57. **{
  58. "user_id": user_id,
  59. **form_data.model_dump(),
  60. "timestamp": int(time.time()),
  61. }
  62. )
  63. try:
  64. with get_db() as db:
  65. result = Prompt(**prompt.model_dump())
  66. db.add(result)
  67. db.commit()
  68. db.refresh(result)
  69. if result:
  70. return PromptModel.model_validate(result)
  71. else:
  72. return None
  73. except Exception:
  74. return None
  75. def get_prompt_by_command(self, command: str) -> Optional[PromptModel]:
  76. try:
  77. with get_db() as db:
  78. prompt = db.query(Prompt).filter_by(command=command).first()
  79. return PromptModel.model_validate(prompt)
  80. except Exception:
  81. return None
  82. def get_prompts(self) -> list[PromptUserResponse]:
  83. with get_db() as db:
  84. prompts = []
  85. for prompt in db.query(Prompt).order_by(Prompt.timestamp.desc()).all():
  86. user = Users.get_user_by_id(prompt.user_id)
  87. prompts.append(
  88. PromptUserResponse.model_validate(
  89. {
  90. **PromptModel.model_validate(prompt).model_dump(),
  91. "user": user.model_dump() if user else None,
  92. }
  93. )
  94. )
  95. return prompts
  96. def get_prompts_by_user_id(
  97. self, user_id: str, permission: str = "write"
  98. ) -> list[PromptUserResponse]:
  99. prompts = self.get_prompts()
  100. return [
  101. prompt
  102. for prompt in prompts
  103. if prompt.user_id == user_id
  104. or has_access(user_id, permission, prompt.access_control)
  105. ]
  106. def update_prompt_by_command(
  107. self, command: str, form_data: PromptForm
  108. ) -> Optional[PromptModel]:
  109. try:
  110. with get_db() as db:
  111. prompt = db.query(Prompt).filter_by(command=command).first()
  112. prompt.title = form_data.title
  113. prompt.content = form_data.content
  114. prompt.access_control = form_data.access_control
  115. prompt.timestamp = int(time.time())
  116. db.commit()
  117. return PromptModel.model_validate(prompt)
  118. except Exception:
  119. return None
  120. def delete_prompt_by_command(self, command: str) -> bool:
  121. try:
  122. with get_db() as db:
  123. db.query(Prompt).filter_by(command=command).delete()
  124. db.commit()
  125. return True
  126. except Exception:
  127. return False
  128. Prompts = PromptsTable()