functions.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. from pydantic import BaseModel
  2. from peewee import *
  3. from playhouse.shortcuts import model_to_dict
  4. from typing import List, Union, Optional
  5. import time
  6. import logging
  7. from apps.webui.internal.db import DB, JSONField
  8. from apps.webui.models.users import Users
  9. import json
  10. import copy
  11. from config import SRC_LOG_LEVELS
  # Module-level logger; its threshold is taken from the "MODELS" entry of
  # the SRC_LOG_LEVELS mapping imported from config.
  log = logging.getLogger(__name__)
  log.setLevel(SRC_LOG_LEVELS["MODELS"])
  14. ####################
  15. # Functions DB Schema
  16. ####################
  class Function(Model):
      """Peewee model describing one stored function record.

      The record is persisted through the shared ``DB`` handle (see ``Meta``).
      """

      # Lookup key used by the table helpers; declared unique.
      id = CharField(unique=True)
      # Presumably the id of the user who owns the record -- confirm with callers.
      user_id = CharField()
      name = TextField()
      # Free-form type tag; FunctionsTable.get_functions_by_type filters on it.
      type = TextField()
      content = TextField()
      # JSON payload; the pydantic models expose it as a FunctionMeta.
      meta = JSONField()
      # Integer timestamps (annotated as epoch values on FunctionModel).
      updated_at = BigIntegerField()
      created_at = BigIntegerField()

      class Meta:
          database = DB
  class FunctionMeta(BaseModel):
      """Metadata attached to a function: currently only an optional description."""

      description: Optional[str] = None
  class FunctionModel(BaseModel):
      """Pydantic representation of a full ``Function`` record, content included."""

      id: str
      user_id: str
      name: str
      type: str
      content: str
      meta: FunctionMeta
      # Both timestamps are epoch values.
      updated_at: int
      created_at: int
  39. ####################
  40. # Forms
  41. ####################
  class FunctionResponse(BaseModel):
      """Function record shape without the ``content`` field."""

      id: str
      user_id: str
      type: str
      name: str
      meta: FunctionMeta
      # Both timestamps are epoch values.
      updated_at: int
      created_at: int
  class FunctionForm(BaseModel):
      """Caller-supplied fields for a function.

      ``user_id``, ``type`` and the timestamps are not part of the form;
      ``FunctionsTable.insert_new_function`` fills them in.
      """

      id: str
      name: str
      content: str
      meta: FunctionMeta
  class FunctionsTable:
      """Data-access helper for ``Function`` rows and per-user function valves.

      Database failures are logged and reported through the return value
      (``None``, or ``False`` for deletes) rather than raised, so callers
      must check what they get back.
      """

      def __init__(self, db):
          """Keep a handle on *db* and ask it to create the ``Function`` table."""
          self.db = db
          self.db.create_tables([Function])

      def insert_new_function(
          self, user_id: str, type: str, form_data: FunctionForm
      ) -> Optional[FunctionModel]:
          """Insert a new function built from *form_data*.

          Args:
              user_id: Stored in the row's ``user_id`` column.
              type: Stored in the row's ``type`` column.
              form_data: Supplies ``id``, ``name``, ``content`` and ``meta``.

          Returns:
              The ``FunctionModel`` that was written, or ``None`` when the
              insert failed.

          Raises:
              Whatever ``FunctionModel`` validation raises; the model is built
              outside the ``try`` so validation errors reach the caller.
          """
          # Single clock read so a fresh row has created_at == updated_at.
          now = int(time.time())
          function = FunctionModel(
              **{
                  **form_data.model_dump(),
                  "user_id": user_id,
                  "type": type,
                  "updated_at": now,
                  "created_at": now,
              }
          )

          try:
              result = Function.create(**function.model_dump())
          except Exception:
              log.exception("Error creating function %s", function.id)
              return None
          return function if result else None

      def get_function_by_id(self, id: str) -> Optional[FunctionModel]:
          """Return the function whose id equals *id*, or ``None``.

          A missing row is an expected outcome and is not logged; any other
          failure is logged before ``None`` is returned.
          """
          try:
              function = Function.get(Function.id == id)
              return FunctionModel(**model_to_dict(function))
          except Function.DoesNotExist:
              return None
          except Exception:
              log.exception("Error loading function %s", id)
              return None

      def get_functions(self) -> List[FunctionModel]:
          """Return every stored function."""
          return [
              FunctionModel(**model_to_dict(function)) for function in Function.select()
          ]

      def get_functions_by_type(self, type: str) -> List[FunctionModel]:
          """Return the stored functions whose ``type`` column equals *type*."""
          return [
              FunctionModel(**model_to_dict(function))
              for function in Function.select().where(Function.type == type)
          ]

      @staticmethod
      def _load_user_settings(user_id: str) -> dict:
          """Return the settings of *user_id* as a plain dict.

          Yields ``{}`` when the user has no settings object. Lets the
          ``AttributeError`` propagate when the user lookup returns ``None``;
          the public callers catch it.
          """
          user = Users.get_user_by_id(user_id)
          settings = user.settings
          # NOTE(review): assumes ``settings`` may be None for users who never
          # saved any; confirm against the Users model.
          return settings.model_dump() if settings is not None else {}

      def get_user_valves_by_id_and_user_id(
          self, id: str, user_id: str
      ) -> Optional[dict]:
          """Return the valves that *user_id* stored for function *id*.

          Valves are read from ``settings["functions"]["valves"][id]``.

          Returns:
              The stored dict, ``{}`` when nothing is stored for *id*, or
              ``None`` when the lookup failed (the error is logged).
          """
          try:
              user_settings = self._load_user_settings(user_id)
              valves_by_function = user_settings.get("functions", {}).get("valves", {})
              return valves_by_function.get(id, {})
          except Exception:
              log.exception(
                  "Error reading valves of function %s for user %s", id, user_id
              )
              return None

      def update_user_valves_by_id_and_user_id(
          self, id: str, user_id: str, valves: dict
      ) -> Optional[dict]:
          """Store *valves* for function *id* in the settings of *user_id*.

          Returns:
              The stored *valves* on success, ``None`` on failure (the error
              is logged).
          """
          try:
              user_settings = self._load_user_settings(user_id)
              functions_settings = user_settings.setdefault("functions", {})
              functions_settings.setdefault("valves", {})[id] = valves

              # Update the user settings in the database.
              result = Users.update_user_by_id(user_id, {"settings": user_settings})
              if result is None:
                  log.error("Could not persist valves for user %s", user_id)
                  return None
              # NOTE(review): Users.update_user_by_id is defined elsewhere. If it
              # returns an already-persisted model rather than a query, an
              # unconditional .execute() raises and this method would report
              # failure after a successful write. Only execute real queries.
              if hasattr(result, "execute"):
                  result.execute()

              return valves
          except Exception:
              log.exception(
                  "Error updating valves of function %s for user %s", id, user_id
              )
              return None

      def update_function_by_id(self, id: str, updated: dict) -> Optional[FunctionModel]:
          """Apply *updated* to function *id* and refresh ``updated_at``.

          Returns:
              The row as it reads after the update, or ``None`` when the row
              does not exist or the update failed.
          """
          try:
              # Merge instead of passing updated_at as a second keyword, so a
              # caller-supplied "updated_at" is overridden rather than causing
              # a duplicate-keyword TypeError.
              changes = {**updated, "updated_at": int(time.time())}
              Function.update(**changes).where(Function.id == id).execute()

              function = Function.get(Function.id == id)
              return FunctionModel(**model_to_dict(function))
          except Function.DoesNotExist:
              return None
          except Exception:
              log.exception("Error updating function %s", id)
              return None

      def delete_function_by_id(self, id: str) -> bool:
          """Delete function *id*.

          Returns:
              ``True`` when the delete statement ran without error, even if no
              row matched, and ``False`` when it raised.
          """
          try:
              # execute() reports the number of rows removed; it is deliberately
              # ignored so deleting a missing id still counts as success.
              Function.delete().where(Function.id == id).execute()
              return True
          except Exception:
              log.exception("Error deleting function %s", id)
              return False
  # Shared module-level instance bound to DB; constructing it also creates
  # the Function table (see FunctionsTable.__init__).
  Functions = FunctionsTable(DB)