prompts.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. import time
  2. from typing import Optional
  3. from open_webui.apps.webui.internal.db import Base, get_db
  4. from open_webui.apps.webui.models.users import Users, UserResponse
  5. from pydantic import BaseModel, ConfigDict
  6. from sqlalchemy import BigInteger, Column, String, Text, JSON
  7. from open_webui.utils.access_control import has_access
  8. ####################
  9. # Prompts DB Schema
  10. ####################
  11. class Prompt(Base):
  12. __tablename__ = "prompt"
  13. command = Column(String, primary_key=True)
  14. user_id = Column(String)
  15. title = Column(Text)
  16. content = Column(Text)
  17. timestamp = Column(BigInteger)
  18. access_control = Column(JSON, nullable=True) # Controls data access levels.
  19. # Defines access control rules for this entry.
  20. # - `None`: Public access, available to all users with the "user" role.
  21. # - `{}`: Private access, restricted exclusively to the owner.
  22. # - Custom permissions: Specific access control for reading and writing;
  23. # Can specify group or user-level restrictions:
  24. # {
  25. # "read": {
  26. # "group_ids": ["group_id1", "group_id2"],
  27. # "user_ids": ["user_id1", "user_id2"]
  28. # },
  29. # "write": {
  30. # "group_ids": ["group_id1", "group_id2"],
  31. # "user_ids": ["user_id1", "user_id2"]
  32. # }
  33. # }
  34. class PromptModel(BaseModel):
  35. command: str
  36. user_id: str
  37. title: str
  38. content: str
  39. timestamp: int # timestamp in epoch
  40. access_control: Optional[dict] = None
  41. model_config = ConfigDict(from_attributes=True)
  42. ####################
  43. # Forms
  44. ####################
  45. class PromptUserResponse(PromptModel):
  46. user: Optional[UserResponse] = None
  47. class PromptForm(BaseModel):
  48. command: str
  49. title: str
  50. content: str
  51. access_control: Optional[dict] = None
  52. class PromptsTable:
  53. def insert_new_prompt(
  54. self, user_id: str, form_data: PromptForm
  55. ) -> Optional[PromptModel]:
  56. prompt = PromptModel(
  57. **{
  58. "user_id": user_id,
  59. **form_data.model_dump(),
  60. "timestamp": int(time.time()),
  61. }
  62. )
  63. try:
  64. with get_db() as db:
  65. result = Prompt(**prompt.model_dump())
  66. db.add(result)
  67. db.commit()
  68. db.refresh(result)
  69. if result:
  70. return PromptModel.model_validate(result)
  71. else:
  72. return None
  73. except Exception:
  74. return None
  75. def get_prompt_by_command(self, command: str) -> Optional[PromptModel]:
  76. try:
  77. with get_db() as db:
  78. prompt = db.query(Prompt).filter_by(command=command).first()
  79. return PromptModel.model_validate(prompt)
  80. except Exception:
  81. return None
  82. def get_prompts(self) -> list[PromptUserResponse]:
  83. with get_db() as db:
  84. return [
  85. PromptUserResponse.model_validate(
  86. {
  87. **PromptModel.model_validate(prompt).model_dump(),
  88. "user": Users.get_user_by_id(prompt.user_id).model_dump(),
  89. }
  90. )
  91. for prompt in db.query(Prompt).all()
  92. ]
  93. def get_prompts_by_user_id(
  94. self, user_id: str, permission: str = "write"
  95. ) -> list[PromptUserResponse]:
  96. prompts = self.get_prompts()
  97. return [
  98. prompt
  99. for prompt in prompts
  100. if prompt.user_id == user_id
  101. or has_access(user_id, permission, prompt.access_control)
  102. ]
  103. def update_prompt_by_command(
  104. self, command: str, form_data: PromptForm
  105. ) -> Optional[PromptModel]:
  106. try:
  107. with get_db() as db:
  108. prompt = db.query(Prompt).filter_by(command=command).first()
  109. prompt.title = form_data.title
  110. prompt.content = form_data.content
  111. prompt.access_control = form_data.access_control
  112. prompt.timestamp = int(time.time())
  113. db.commit()
  114. return PromptModel.model_validate(prompt)
  115. except Exception:
  116. return None
  117. def delete_prompt_by_command(self, command: str) -> bool:
  118. try:
  119. with get_db() as db:
  120. db.query(Prompt).filter_by(command=command).delete()
  121. db.commit()
  122. return True
  123. except Exception:
  124. return False
  125. Prompts = PromptsTable()