functions.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. from pydantic import BaseModel
  2. from peewee import *
  3. from playhouse.shortcuts import model_to_dict
  4. from typing import List, Union, Optional
  5. import time
  6. import logging
  7. from apps.webui.internal.db import DB, JSONField
  8. from apps.webui.models.users import Users
  9. import json
  10. import copy
  11. from config import SRC_LOG_LEVELS
  12. log = logging.getLogger(__name__)
  13. log.setLevel(SRC_LOG_LEVELS["MODELS"])
  14. ####################
  15. # Functions DB Schema
  16. ####################
  17. class Function(Model):
  18. id = CharField(unique=True)
  19. user_id = CharField()
  20. name = TextField()
  21. type = TextField()
  22. content = TextField()
  23. meta = JSONField()
  24. valves = JSONField()
  25. is_active = BooleanField(default=False)
  26. updated_at = BigIntegerField()
  27. created_at = BigIntegerField()
  28. class Meta:
  29. database = DB
  30. class FunctionMeta(BaseModel):
  31. description: Optional[str] = None
  32. class FunctionModel(BaseModel):
  33. id: str
  34. user_id: str
  35. name: str
  36. type: str
  37. content: str
  38. meta: FunctionMeta
  39. is_active: bool
  40. updated_at: int # timestamp in epoch
  41. created_at: int # timestamp in epoch
  42. ####################
  43. # Forms
  44. ####################
  45. class FunctionResponse(BaseModel):
  46. id: str
  47. user_id: str
  48. type: str
  49. name: str
  50. meta: FunctionMeta
  51. is_active: bool
  52. updated_at: int # timestamp in epoch
  53. created_at: int # timestamp in epoch
  54. class FunctionForm(BaseModel):
  55. id: str
  56. name: str
  57. content: str
  58. meta: FunctionMeta
  59. class FunctionValves(BaseModel):
  60. valves: Optional[dict] = None
  61. class FunctionsTable:
  62. def __init__(self, db):
  63. self.db = db
  64. self.db.create_tables([Function])
  65. def insert_new_function(
  66. self, user_id: str, type: str, form_data: FunctionForm
  67. ) -> Optional[FunctionModel]:
  68. function = FunctionModel(
  69. **{
  70. **form_data.model_dump(),
  71. "user_id": user_id,
  72. "type": type,
  73. "updated_at": int(time.time()),
  74. "created_at": int(time.time()),
  75. }
  76. )
  77. try:
  78. result = Function.create(**function.model_dump())
  79. if result:
  80. return function
  81. else:
  82. return None
  83. except Exception as e:
  84. print(f"Error creating tool: {e}")
  85. return None
  86. def get_function_by_id(self, id: str) -> Optional[FunctionModel]:
  87. try:
  88. function = Function.get(Function.id == id)
  89. return FunctionModel(**model_to_dict(function))
  90. except:
  91. return None
  92. def get_functions(self) -> List[FunctionModel]:
  93. return [
  94. FunctionModel(**model_to_dict(function)) for function in Function.select()
  95. ]
  96. def get_functions_by_type(self, type: str) -> List[FunctionModel]:
  97. return [
  98. FunctionModel(**model_to_dict(function))
  99. for function in Function.select().where(Function.type == type)
  100. ]
  101. def get_user_valves_by_id_and_user_id(
  102. self, id: str, user_id: str
  103. ) -> Optional[dict]:
  104. try:
  105. user = Users.get_user_by_id(user_id)
  106. user_settings = user.settings.model_dump()
  107. # Check if user has "functions" and "valves" settings
  108. if "functions" not in user_settings:
  109. user_settings["functions"] = {}
  110. if "valves" not in user_settings["functions"]:
  111. user_settings["functions"]["valves"] = {}
  112. return user_settings["functions"]["valves"].get(id, {})
  113. except Exception as e:
  114. print(f"An error occurred: {e}")
  115. return None
  116. def update_user_valves_by_id_and_user_id(
  117. self, id: str, user_id: str, valves: dict
  118. ) -> Optional[dict]:
  119. try:
  120. user = Users.get_user_by_id(user_id)
  121. user_settings = user.settings.model_dump()
  122. # Check if user has "functions" and "valves" settings
  123. if "functions" not in user_settings:
  124. user_settings["functions"] = {}
  125. if "valves" not in user_settings["functions"]:
  126. user_settings["functions"]["valves"] = {}
  127. user_settings["functions"]["valves"][id] = valves
  128. # Update the user settings in the database
  129. query = Users.update_user_by_id(user_id, {"settings": user_settings})
  130. query.execute()
  131. return user_settings["functions"]["valves"][id]
  132. except Exception as e:
  133. print(f"An error occurred: {e}")
  134. return None
  135. def update_function_by_id(self, id: str, updated: dict) -> Optional[FunctionModel]:
  136. try:
  137. query = Function.update(
  138. **updated,
  139. updated_at=int(time.time()),
  140. ).where(Function.id == id)
  141. query.execute()
  142. function = Function.get(Function.id == id)
  143. return FunctionModel(**model_to_dict(function))
  144. except:
  145. return None
  146. def delete_function_by_id(self, id: str) -> bool:
  147. try:
  148. query = Function.delete().where((Function.id == id))
  149. query.execute() # Remove the rows, return number of rows removed.
  150. return True
  151. except:
  152. return False
  153. Functions = FunctionsTable(DB)