functions.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. from pydantic import BaseModel
  2. from peewee import *
  3. from playhouse.shortcuts import model_to_dict
  4. from typing import List, Union, Optional
  5. import time
  6. import logging
  7. from apps.webui.internal.db import DB, JSONField
  8. from apps.webui.models.users import Users
  9. import json
  10. import copy
  11. from config import SRC_LOG_LEVELS
  12. log = logging.getLogger(__name__)
  13. log.setLevel(SRC_LOG_LEVELS["MODELS"])
  14. ####################
  15. # Functions DB Schema
  16. ####################
  17. class Function(Model):
  18. id = CharField(unique=True)
  19. user_id = CharField()
  20. name = TextField()
  21. type = TextField()
  22. content = TextField()
  23. meta = JSONField()
  24. valves = JSONField()
  25. is_active = BooleanField(default=False)
  26. updated_at = BigIntegerField()
  27. created_at = BigIntegerField()
  28. class Meta:
  29. database = DB
  30. class FunctionMeta(BaseModel):
  31. description: Optional[str] = None
  32. manifest: Optional[dict] = {}
  33. class FunctionModel(BaseModel):
  34. id: str
  35. user_id: str
  36. name: str
  37. type: str
  38. content: str
  39. meta: FunctionMeta
  40. is_active: bool = False
  41. updated_at: int # timestamp in epoch
  42. created_at: int # timestamp in epoch
  43. ####################
  44. # Forms
  45. ####################
  46. class FunctionResponse(BaseModel):
  47. id: str
  48. user_id: str
  49. type: str
  50. name: str
  51. meta: FunctionMeta
  52. is_active: bool
  53. updated_at: int # timestamp in epoch
  54. created_at: int # timestamp in epoch
  55. class FunctionForm(BaseModel):
  56. id: str
  57. name: str
  58. content: str
  59. meta: FunctionMeta
  60. class FunctionValves(BaseModel):
  61. valves: Optional[dict] = None
  62. class FunctionsTable:
  63. def __init__(self, db):
  64. self.db = db
  65. self.db.create_tables([Function])
  66. def insert_new_function(
  67. self, user_id: str, type: str, form_data: FunctionForm
  68. ) -> Optional[FunctionModel]:
  69. function = FunctionModel(
  70. **{
  71. **form_data.model_dump(),
  72. "user_id": user_id,
  73. "type": type,
  74. "updated_at": int(time.time()),
  75. "created_at": int(time.time()),
  76. }
  77. )
  78. try:
  79. result = Function.create(**function.model_dump())
  80. if result:
  81. return function
  82. else:
  83. return None
  84. except Exception as e:
  85. print(f"Error creating tool: {e}")
  86. return None
  87. def get_function_by_id(self, id: str) -> Optional[FunctionModel]:
  88. try:
  89. function = Function.get(Function.id == id)
  90. return FunctionModel(**model_to_dict(function))
  91. except:
  92. return None
  93. def get_functions(self, active_only=False) -> List[FunctionModel]:
  94. if active_only:
  95. return [
  96. FunctionModel(**model_to_dict(function))
  97. for function in Function.select().where(Function.is_active == True)
  98. ]
  99. else:
  100. return [
  101. FunctionModel(**model_to_dict(function))
  102. for function in Function.select()
  103. ]
  104. def get_functions_by_type(
  105. self, type: str, active_only=False
  106. ) -> List[FunctionModel]:
  107. if active_only:
  108. return [
  109. FunctionModel(**model_to_dict(function))
  110. for function in Function.select().where(
  111. Function.type == type, Function.is_active == True
  112. )
  113. ]
  114. else:
  115. return [
  116. FunctionModel(**model_to_dict(function))
  117. for function in Function.select().where(Function.type == type)
  118. ]
  119. def get_function_valves_by_id(self, id: str) -> Optional[dict]:
  120. try:
  121. function = Function.get(Function.id == id)
  122. return function.valves if function.valves else {}
  123. except Exception as e:
  124. print(f"An error occurred: {e}")
  125. return None
  126. def update_function_valves_by_id(
  127. self, id: str, valves: dict
  128. ) -> Optional[FunctionValves]:
  129. try:
  130. query = Function.update(
  131. **{"valves": valves},
  132. updated_at=int(time.time()),
  133. ).where(Function.id == id)
  134. query.execute()
  135. function = Function.get(Function.id == id)
  136. return FunctionValves(**model_to_dict(function))
  137. except:
  138. return None
  139. def get_user_valves_by_id_and_user_id(
  140. self, id: str, user_id: str
  141. ) -> Optional[dict]:
  142. try:
  143. user = Users.get_user_by_id(user_id)
  144. user_settings = user.settings.model_dump()
  145. # Check if user has "functions" and "valves" settings
  146. if "functions" not in user_settings:
  147. user_settings["functions"] = {}
  148. if "valves" not in user_settings["functions"]:
  149. user_settings["functions"]["valves"] = {}
  150. return user_settings["functions"]["valves"].get(id, {})
  151. except Exception as e:
  152. print(f"An error occurred: {e}")
  153. return None
  154. def update_user_valves_by_id_and_user_id(
  155. self, id: str, user_id: str, valves: dict
  156. ) -> Optional[dict]:
  157. try:
  158. user = Users.get_user_by_id(user_id)
  159. user_settings = user.settings.model_dump()
  160. # Check if user has "functions" and "valves" settings
  161. if "functions" not in user_settings:
  162. user_settings["functions"] = {}
  163. if "valves" not in user_settings["functions"]:
  164. user_settings["functions"]["valves"] = {}
  165. user_settings["functions"]["valves"][id] = valves
  166. # Update the user settings in the database
  167. query = Users.update_user_by_id(user_id, {"settings": user_settings})
  168. query.execute()
  169. return user_settings["functions"]["valves"][id]
  170. except Exception as e:
  171. print(f"An error occurred: {e}")
  172. return None
  173. def update_function_by_id(self, id: str, updated: dict) -> Optional[FunctionModel]:
  174. try:
  175. query = Function.update(
  176. **updated,
  177. updated_at=int(time.time()),
  178. ).where(Function.id == id)
  179. query.execute()
  180. function = Function.get(Function.id == id)
  181. return FunctionModel(**model_to_dict(function))
  182. except:
  183. return None
  184. def deactivate_all_functions(self) -> Optional[bool]:
  185. try:
  186. query = Function.update(
  187. **{"is_active": False},
  188. updated_at=int(time.time()),
  189. )
  190. query.execute()
  191. return True
  192. except:
  193. return None
  194. def delete_function_by_id(self, id: str) -> bool:
  195. try:
  196. query = Function.delete().where((Function.id == id))
  197. query.execute() # Remove the rows, return number of rows removed.
  198. return True
  199. except:
  200. return False
  201. Functions = FunctionsTable(DB)