tools.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. import inspect
  2. import logging
  3. import re
  4. import inspect
  5. import uuid
  6. from typing import Any, Awaitable, Callable, get_type_hints
  7. from functools import update_wrapper, partial
  8. from fastapi import Request
  9. from pydantic import BaseModel, Field, create_model
  10. from langchain_core.utils.function_calling import convert_to_openai_function
  11. from open_webui.models.tools import Tools
  12. from open_webui.models.users import UserModel
  13. from open_webui.utils.plugin import load_tools_module_by_id
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
  15. def apply_extra_params_to_tool_function(
  16. function: Callable, extra_params: dict
  17. ) -> Callable[..., Awaitable]:
  18. sig = inspect.signature(function)
  19. extra_params = {k: v for k, v in extra_params.items() if k in sig.parameters}
  20. partial_func = partial(function, **extra_params)
  21. if inspect.iscoroutinefunction(function):
  22. update_wrapper(partial_func, function)
  23. return partial_func
  24. async def new_function(*args, **kwargs):
  25. return partial_func(*args, **kwargs)
  26. update_wrapper(new_function, function)
  27. return new_function
  28. # Mutation on extra_params
  29. def get_tools(
  30. request: Request, tool_ids: list[str], user: UserModel, extra_params: dict
  31. ) -> dict[str, dict]:
  32. tools_dict = {}
  33. for tool_id in tool_ids:
  34. tools = Tools.get_tool_by_id(tool_id)
  35. if tools is None:
  36. continue
  37. module = request.app.state.TOOLS.get(tool_id, None)
  38. if module is None:
  39. module, _ = load_tools_module_by_id(tool_id)
  40. request.app.state.TOOLS[tool_id] = module
  41. extra_params["__id__"] = tool_id
  42. if hasattr(module, "valves") and hasattr(module, "Valves"):
  43. valves = Tools.get_tool_valves_by_id(tool_id) or {}
  44. module.valves = module.Valves(**valves)
  45. if hasattr(module, "UserValves"):
  46. extra_params["__user__"]["valves"] = module.UserValves( # type: ignore
  47. **Tools.get_user_valves_by_id_and_user_id(tool_id, user.id)
  48. )
  49. for spec in tools.specs:
  50. # TODO: Fix hack for OpenAI API
  51. # Some times breaks OpenAI but others don't. Leaving the comment
  52. for val in spec.get("parameters", {}).get("properties", {}).values():
  53. if val["type"] == "str":
  54. val["type"] = "string"
  55. # Remove internal parameters
  56. spec["parameters"]["properties"] = {
  57. key: val
  58. for key, val in spec["parameters"]["properties"].items()
  59. if not key.startswith("__")
  60. }
  61. function_name = spec["name"]
  62. # convert to function that takes only model params and inserts custom params
  63. original_func = getattr(module, function_name)
  64. callable = apply_extra_params_to_tool_function(original_func, extra_params)
  65. if callable.__doc__ and callable.__doc__.strip() != "":
  66. s = re.split(":(param|return)", callable.__doc__, 1)
  67. spec["description"] = s[0]
  68. else:
  69. spec["description"] = function_name
  70. # TODO: This needs to be a pydantic model
  71. tool_dict = {
  72. "toolkit_id": tool_id,
  73. "callable": callable,
  74. "spec": spec,
  75. "pydantic_model": function_to_pydantic_model(callable),
  76. "file_handler": hasattr(module, "file_handler") and module.file_handler,
  77. "citation": hasattr(module, "citation") and module.citation,
  78. }
  79. # TODO: if collision, prepend toolkit name
  80. if function_name in tools_dict:
  81. log.warning(f"Tool {function_name} already exists in another tools!")
  82. log.warning(f"Collision between {tools} and {tool_id}.")
  83. log.warning(f"Discarding {tools}.{function_name}")
  84. else:
  85. tools_dict[function_name] = tool_dict
  86. return tools_dict
  87. def parse_description(docstring: str | None) -> str:
  88. """
  89. Parse a function's docstring to extract the description.
  90. Args:
  91. docstring (str): The docstring to parse.
  92. Returns:
  93. str: The description.
  94. """
  95. if not docstring:
  96. return ""
  97. lines = [line.strip() for line in docstring.strip().split("\n")]
  98. description_lines: list[str] = []
  99. for line in lines:
  100. if re.match(r":param", line) or re.match(r":return", line):
  101. break
  102. description_lines.append(line)
  103. return "\n".join(description_lines)
  104. def parse_docstring(docstring):
  105. """
  106. Parse a function's docstring to extract parameter descriptions in reST format.
  107. Args:
  108. docstring (str): The docstring to parse.
  109. Returns:
  110. dict: A dictionary where keys are parameter names and values are descriptions.
  111. """
  112. if not docstring:
  113. return {}
  114. # Regex to match `:param name: description` format
  115. param_pattern = re.compile(r":param (\w+):\s*(.+)")
  116. param_descriptions = {}
  117. for line in docstring.splitlines():
  118. match = param_pattern.match(line.strip())
  119. if not match:
  120. continue
  121. param_name, param_description = match.groups()
  122. if param_name.startswith("__"):
  123. continue
  124. param_descriptions[param_name] = param_description
  125. return param_descriptions
  126. def function_to_pydantic_model(func: Callable) -> type[BaseModel]:
  127. """
  128. Converts a Python function's type hints and docstring to a Pydantic model,
  129. including support for nested types, default values, and descriptions.
  130. Args:
  131. func: The function whose type hints and docstring should be converted.
  132. model_name: The name of the generated Pydantic model.
  133. Returns:
  134. A Pydantic model class.
  135. """
  136. type_hints = get_type_hints(func)
  137. signature = inspect.signature(func)
  138. parameters = signature.parameters
  139. docstring = func.__doc__
  140. descriptions = parse_docstring(docstring)
  141. tool_description = parse_description(docstring)
  142. field_defs = {}
  143. for name, param in parameters.items():
  144. type_hint = type_hints.get(name, Any)
  145. default_value = param.default if param.default is not param.empty else ...
  146. description = descriptions.get(name, None)
  147. if not description:
  148. field_defs[name] = type_hint, default_value
  149. continue
  150. field_defs[name] = type_hint, Field(default_value, description=description)
  151. model = create_model(func.__name__, **field_defs)
  152. model.__doc__ = tool_description
  153. return model
  154. def get_callable_attributes(tool: object) -> list[Callable]:
  155. return [
  156. getattr(tool, func)
  157. for func in dir(tool)
  158. if callable(getattr(tool, func))
  159. and not func.startswith("__")
  160. and not inspect.isclass(getattr(tool, func))
  161. ]
  162. def get_tools_specs(tool_class: object) -> list[dict]:
  163. function_list = get_callable_attributes(tool_class)
  164. models = map(function_to_pydantic_model, function_list)
  165. return [convert_to_openai_function(tool) for tool in models]