tools.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. import inspect
  2. import logging
  3. from typing import Awaitable, Callable, get_type_hints
  4. from apps.webui.models.tools import Tools
  5. from apps.webui.models.users import UserModel
  6. from apps.webui.utils import load_toolkit_module_by_id
  7. log = logging.getLogger(__name__)
  8. def apply_extra_params_to_tool_function(
  9. function: Callable, extra_params: dict
  10. ) -> Callable[..., Awaitable]:
  11. sig = inspect.signature(function)
  12. extra_params = {
  13. key: value for key, value in extra_params.items() if key in sig.parameters
  14. }
  15. is_coroutine = inspect.iscoroutinefunction(function)
  16. async def new_function(**kwargs):
  17. extra_kwargs = kwargs | extra_params
  18. if is_coroutine:
  19. return await function(**extra_kwargs)
  20. return function(**extra_kwargs)
  21. return new_function
  22. # Mutation on extra_params
  23. def get_tools(
  24. webui_app, tool_ids: list[str], user: UserModel, extra_params: dict
  25. ) -> dict[str, dict]:
  26. tools = {}
  27. for tool_id in tool_ids:
  28. toolkit = Tools.get_tool_by_id(tool_id)
  29. if toolkit is None:
  30. continue
  31. module = webui_app.state.TOOLS.get(tool_id, None)
  32. if module is None:
  33. module, _ = load_toolkit_module_by_id(tool_id)
  34. webui_app.state.TOOLS[tool_id] = module
  35. extra_params["__id__"] = tool_id
  36. if hasattr(module, "valves") and hasattr(module, "Valves"):
  37. valves = Tools.get_tool_valves_by_id(tool_id) or {}
  38. module.valves = module.Valves(**valves)
  39. if hasattr(module, "UserValves"):
  40. extra_params["__user__"]["valves"] = module.UserValves( # type: ignore
  41. **Tools.get_user_valves_by_id_and_user_id(tool_id, user.id)
  42. )
  43. for spec in toolkit.specs:
  44. # TODO: Fix hack for OpenAI API
  45. for val in spec.get("parameters", {}).get("properties", {}).values():
  46. if val["type"] == "str":
  47. val["type"] = "string"
  48. function_name = spec["name"]
  49. # convert to function that takes only model params and inserts custom params
  50. original_func = getattr(module, function_name)
  51. callable = apply_extra_params_to_tool_function(original_func, extra_params)
  52. if hasattr(original_func, "__doc__"):
  53. callable.__doc__ = original_func.__doc__
  54. # TODO: This needs to be a pydantic model
  55. tool_dict = {
  56. "toolkit_id": tool_id,
  57. "callable": callable,
  58. "spec": spec,
  59. "file_handler": hasattr(module, "file_handler") and module.file_handler,
  60. "citation": hasattr(module, "citation") and module.citation,
  61. }
  62. # TODO: if collision, prepend toolkit name
  63. if function_name in tools:
  64. log.warning(f"Tool {function_name} already exists in another toolkit!")
  65. log.warning(f"Collision between {toolkit} and {tool_id}.")
  66. log.warning(f"Discarding {toolkit}.{function_name}")
  67. else:
  68. tools[function_name] = tool_dict
  69. return tools
  70. def doc_to_dict(docstring):
  71. lines = docstring.split("\n")
  72. description = lines[1].strip()
  73. param_dict = {}
  74. for line in lines:
  75. if ":param" in line:
  76. line = line.replace(":param", "").strip()
  77. param, desc = line.split(":", 1)
  78. param_dict[param.strip()] = desc.strip()
  79. ret_dict = {"description": description, "params": param_dict}
  80. return ret_dict
  81. def get_tools_specs(tools) -> list[dict]:
  82. function_list = [
  83. {"name": func, "function": getattr(tools, func)}
  84. for func in dir(tools)
  85. if callable(getattr(tools, func))
  86. and not func.startswith("__")
  87. and not inspect.isclass(getattr(tools, func))
  88. ]
  89. specs = []
  90. for function_item in function_list:
  91. function_name = function_item["name"]
  92. function = function_item["function"]
  93. function_doc = doc_to_dict(function.__doc__ or function_name)
  94. specs.append(
  95. {
  96. "name": function_name,
  97. # TODO: multi-line desc?
  98. "description": function_doc.get("description", function_name),
  99. "parameters": {
  100. "type": "object",
  101. "properties": {
  102. param_name: {
  103. "type": param_annotation.__name__.lower(),
  104. **(
  105. {
  106. "enum": (
  107. str(param_annotation.__args__)
  108. if hasattr(param_annotation, "__args__")
  109. else None
  110. )
  111. }
  112. if hasattr(param_annotation, "__args__")
  113. else {}
  114. ),
  115. "description": function_doc.get("params", {}).get(
  116. param_name, param_name
  117. ),
  118. }
  119. for param_name, param_annotation in get_type_hints(
  120. function
  121. ).items()
  122. if param_name != "return"
  123. and not (
  124. param_name.startswith("__") and param_name.endswith("__")
  125. )
  126. },
  127. "required": [
  128. name
  129. for name, param in inspect.signature(
  130. function
  131. ).parameters.items()
  132. if param.default is param.empty
  133. and not (name.startswith("__") and name.endswith("__"))
  134. ],
  135. },
  136. }
  137. )
  138. return specs