# functions.py
# Peewee schema, Pydantic models, and table helpers for stored functions.
  1. from pydantic import BaseModel
  2. from peewee import *
  3. from playhouse.shortcuts import model_to_dict
  4. from typing import List, Union, Optional
  5. import time
  6. import logging
  7. from apps.webui.internal.db import DB, JSONField
  8. from apps.webui.models.users import Users
  9. import json
  10. import copy
  11. from config import SRC_LOG_LEVELS
  # Module-level logger; its verbosity comes from the "MODELS" entry of the
  # project-wide SRC_LOG_LEVELS mapping.
  log = logging.getLogger(__name__)
  log.setLevel(SRC_LOG_LEVELS["MODELS"])
  14. ####################
  15. # Functions DB Schema
  16. ####################
  class Function(Model):
      """Peewee model describing one stored function row."""

      # Identity: ``id`` is declared unique; ``user_id`` is a plain string column.
      id = CharField(unique=True)
      user_id = CharField()

      # Descriptive text columns, including the function's ``content``.
      name = TextField()
      type = TextField()
      content = TextField()

      # JSON columns: ``meta`` is validated as FunctionMeta when rows are read
      # back; ``valves`` is read and written by the valves helpers.
      meta = JSONField()
      valves = JSONField()

      # Flags; both default to False for new rows.
      is_active = BooleanField(default=False)
      is_global = BooleanField(default=False)

      # Timestamps, stored as integers (epoch).
      updated_at = BigIntegerField()
      created_at = BigIntegerField()

      class Meta:
          # Bind the model to the shared application database handle.
          database = DB
  # Metadata attached to a function: an optional free-text description and an
  # optional manifest dictionary (empty by default).
  class FunctionMeta(BaseModel):
      description: Optional[str] = None
      manifest: Optional[dict] = {}
  # Validated, in-memory view of a Function row (everything except ``valves``).
  class FunctionModel(BaseModel):
      id: str
      user_id: str
      name: str
      type: str
      content: str
      meta: FunctionMeta

      # Both flags are off unless the row says otherwise.
      is_active: bool = False
      is_global: bool = False

      # Epoch timestamps.
      updated_at: int
      created_at: int
  45. ####################
  46. # Forms
  47. ####################
  # Response shape for a function: the same fields as FunctionModel minus
  # ``content``, with both flags required.
  class FunctionResponse(BaseModel):
      id: str
      user_id: str
      type: str
      name: str
      meta: FunctionMeta
      is_active: bool
      is_global: bool

      # Epoch timestamps.
      updated_at: int
      created_at: int
  # Input payload for creating a function; ``user_id``, ``type`` and the
  # timestamps are filled in by FunctionsTable.insert_new_function.
  class FunctionForm(BaseModel):
      id: str
      name: str
      content: str
      meta: FunctionMeta
  # Wrapper around a function's ``valves`` dictionary; ``None`` when unset.
  class FunctionValves(BaseModel):
      valves: Optional[dict] = None
  class FunctionsTable:
      """Data-access helper around the ``Function`` table and per-user valves.

      The by-id lookups and all mutating methods are best-effort: failures are
      logged through the module logger and reported as ``None``/``False``
      instead of being raised. The list-returning query methods do not catch
      exceptions.
      """

      def __init__(self, db):
          """Keep a handle on *db* and ask it to create the ``Function`` table."""
          self.db = db
          self.db.create_tables([Function])

      def insert_new_function(
          self, user_id: str, type: str, form_data: FunctionForm
      ) -> Optional[FunctionModel]:
          """Persist a new function row built from *form_data*.

          Args:
              user_id: Stored as the row's ``user_id``.
              type: Stored as the row's ``type``.
              form_data: Supplies ``id``, ``name``, ``content`` and ``meta``.

          Returns:
              The validated ``FunctionModel`` on success, ``None`` if the
              insert failed. Validation errors raised while building the
              model are not caught here.
          """
          # Take one timestamp so created_at and updated_at can never disagree.
          now = int(time.time())
          function = FunctionModel(
              **{
                  **form_data.model_dump(),
                  "user_id": user_id,
                  "type": type,
                  "updated_at": now,
                  "created_at": now,
              }
          )
          try:
              result = Function.create(**function.model_dump())
          except Exception as e:
              log.exception("Error creating function: %s", e)
              return None
          return function if result else None

      def get_function_by_id(self, id: str) -> Optional[FunctionModel]:
          """Return the function whose id equals *id*, or ``None``.

          ``None`` covers both "no such row" and any error while loading or
          validating the row; only the latter is logged.
          """
          try:
              function = Function.get(Function.id == id)
              return FunctionModel(**model_to_dict(function))
          except Function.DoesNotExist:
              # A missing row is an expected outcome, not an error.
              return None
          except Exception as e:
              log.exception("Error getting function %s: %s", id, e)
              return None

      def get_functions(self, active_only=False) -> List[FunctionModel]:
          """Return all functions, or only active ones when *active_only* is set."""
          query = Function.select()
          if active_only:
              # ``== True`` is required: it builds a SQL expression.
              query = query.where(Function.is_active == True)  # noqa: E712
          return [FunctionModel(**model_to_dict(function)) for function in query]

      def get_functions_by_type(
          self, type: str, active_only=False
      ) -> List[FunctionModel]:
          """Return functions whose ``type`` equals *type*.

          When *active_only* is true the result is further restricted to rows
          with ``is_active`` set.
          """
          query = Function.select().where(Function.type == type)
          if active_only:
              # Chained ``where`` calls are combined with AND.
              query = query.where(Function.is_active == True)  # noqa: E712
          return [FunctionModel(**model_to_dict(function)) for function in query]

      def get_global_filter_functions(self) -> List[FunctionModel]:
          """Return functions of type ``"filter"`` that are active and global."""
          query = Function.select().where(
              Function.type == "filter",
              Function.is_active == True,  # noqa: E712
              Function.is_global == True,  # noqa: E712
          )
          return [FunctionModel(**model_to_dict(function)) for function in query]

      def get_function_valves_by_id(self, id: str) -> Optional[dict]:
          """Return the stored valves of function *id*.

          Returns:
              The valves dict, ``{}`` when the stored value is empty/falsy, or
              ``None`` if the row could not be loaded.
          """
          try:
              function = Function.get(Function.id == id)
              return function.valves if function.valves else {}
          except Exception as e:
              log.exception("Error getting valves for function %s: %s", id, e)
              return None

      def update_function_valves_by_id(
          self, id: str, valves: dict
      ) -> Optional[FunctionValves]:
          """Overwrite the valves of function *id* and bump ``updated_at``.

          Returns:
              A ``FunctionValves`` built from the re-read row, or ``None`` if
              the update or the re-read failed.
          """
          try:
              query = Function.update(
                  **{"valves": valves},
                  updated_at=int(time.time()),
              ).where(Function.id == id)
              query.execute()
              function = Function.get(Function.id == id)
              return FunctionValves(**model_to_dict(function))
          except Exception as e:
              log.exception("Error updating valves for function %s: %s", id, e)
              return None

      @staticmethod
      def _load_user_settings(user_id: str) -> dict:
          """Return the user's settings dict with ``functions.valves`` present."""
          user = Users.get_user_by_id(user_id)
          # NOTE(review): presumably ``settings`` can be unset for some users;
          # treat that as empty settings instead of failing on ``None``.
          user_settings = user.settings.model_dump() if user.settings else {}
          user_settings.setdefault("functions", {}).setdefault("valves", {})
          return user_settings

      def get_user_valves_by_id_and_user_id(
          self, id: str, user_id: str
      ) -> Optional[dict]:
          """Return the valves user *user_id* has stored for function *id*.

          Returns:
              The stored dict, ``{}`` when the user has none for this function,
              or ``None`` if the user's settings could not be read.
          """
          try:
              user_settings = self._load_user_settings(user_id)
              return user_settings["functions"]["valves"].get(id, {})
          except Exception as e:
              log.exception(
                  "Error getting user valves for function %s: %s", id, e
              )
              return None

      def update_user_valves_by_id_and_user_id(
          self, id: str, user_id: str, valves: dict
      ) -> Optional[dict]:
          """Store *valves* for function *id* in the settings of user *user_id*.

          Returns:
              The valves that were written, or ``None`` on any failure.
          """
          try:
              user_settings = self._load_user_settings(user_id)
              user_settings["functions"]["valves"][id] = valves

              # Update the user settings in the database.
              # NOTE(review): this assumes Users.update_user_by_id returns an
              # un-executed query object. If it instead performs the update and
              # returns something without ``.execute()``, the call below raises
              # and this method returns None even though the settings were
              # saved -- confirm against the Users implementation.
              query = Users.update_user_by_id(user_id, {"settings": user_settings})
              query.execute()

              return user_settings["functions"]["valves"][id]
          except Exception as e:
              log.exception(
                  "Error updating user valves for function %s: %s", id, e
              )
              return None

      def update_function_by_id(self, id: str, updated: dict) -> Optional[FunctionModel]:
          """Apply the column values in *updated* to function *id*.

          ``updated_at`` is always set to the current time.

          Returns:
              The re-read row as a ``FunctionModel``, or ``None`` if the update
              or the re-read failed.
          """
          try:
              query = Function.update(
                  **updated,
                  updated_at=int(time.time()),
              ).where(Function.id == id)
              query.execute()
              function = Function.get(Function.id == id)
              return FunctionModel(**model_to_dict(function))
          except Exception as e:
              log.exception("Error updating function %s: %s", id, e)
              return None

      def deactivate_all_functions(self) -> Optional[bool]:
          """Set ``is_active`` to False on every row and bump ``updated_at``.

          Returns:
              ``True`` on success, ``None`` if the update failed.
          """
          try:
              query = Function.update(
                  **{"is_active": False},
                  updated_at=int(time.time()),
              )
              query.execute()
              return True
          except Exception as e:
              log.exception("Error deactivating functions: %s", e)
              return None

      def delete_function_by_id(self, id: str) -> bool:
          """Delete the row(s) whose id equals *id*.

          Returns:
              ``True`` when the delete statement ran without error (even if no
              row matched), ``False`` if it raised.
          """
          try:
              query = Function.delete().where(Function.id == id)
              query.execute()  # Removes the rows; the row count is ignored.
              return True
          except Exception as e:
              log.exception("Error deleting function %s: %s", id, e)
              return False
  # Module-level FunctionsTable instance bound to the application database;
  # constructing it also asks the database to create the Function table.
  Functions = FunctionsTable(DB)