tools.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. import inspect
  2. import logging
  3. import re
  4. from typing import Any, Awaitable, Callable, get_type_hints
  5. from functools import update_wrapper, partial
  6. from langchain_core.utils.function_calling import convert_to_openai_function
  7. from open_webui.apps.webui.models.tools import Tools
  8. from open_webui.apps.webui.models.users import UserModel
  9. from open_webui.apps.webui.utils import load_tools_module_by_id
  10. from pydantic import BaseModel, Field, create_model
  11. log = logging.getLogger(__name__)
  12. def apply_extra_params_to_tool_function(
  13. function: Callable, extra_params: dict
  14. ) -> Callable[..., Awaitable]:
  15. sig = inspect.signature(function)
  16. extra_params = {k: v for k, v in extra_params.items() if k in sig.parameters}
  17. partial_func = partial(function, **extra_params)
  18. if inspect.iscoroutinefunction(function):
  19. update_wrapper(partial_func, function)
  20. return partial_func
  21. async def new_function(*args, **kwargs):
  22. return partial_func(*args, **kwargs)
  23. update_wrapper(new_function, function)
  24. return new_function
  25. # Mutation on extra_params
  26. def get_tools(
  27. webui_app, tool_ids: list[str], user: UserModel, extra_params: dict
  28. ) -> dict[str, dict]:
  29. tools_dict = {}
  30. for tool_id in tool_ids:
  31. tools = Tools.get_tool_by_id(tool_id)
  32. if tools is None:
  33. continue
  34. module = webui_app.state.TOOLS.get(tool_id, None)
  35. if module is None:
  36. module, _ = load_tools_module_by_id(tool_id)
  37. webui_app.state.TOOLS[tool_id] = module
  38. extra_params["__id__"] = tool_id
  39. if hasattr(module, "valves") and hasattr(module, "Valves"):
  40. valves = Tools.get_tool_valves_by_id(tool_id) or {}
  41. module.valves = module.Valves(**valves)
  42. if hasattr(module, "UserValves"):
  43. extra_params["__user__"]["valves"] = module.UserValves( # type: ignore
  44. **Tools.get_user_valves_by_id_and_user_id(tool_id, user.id)
  45. )
  46. for spec in tools.specs:
  47. # Remove internal parameters
  48. spec["parameters"]["properties"] = {
  49. key: val
  50. for key, val in spec["parameters"]["properties"].items()
  51. if not key.startswith("__")
  52. }
  53. function_name = spec["name"]
  54. # convert to function that takes only model params and inserts custom params
  55. original_func = getattr(module, function_name)
  56. callable = apply_extra_params_to_tool_function(original_func, extra_params)
  57. # TODO: This needs to be a pydantic model
  58. tool_dict = {
  59. "toolkit_id": tool_id,
  60. "callable": callable,
  61. "spec": spec,
  62. "pydantic_model": function_to_pydantic_model(callable),
  63. "file_handler": hasattr(module, "file_handler") and module.file_handler,
  64. "citation": hasattr(module, "citation") and module.citation,
  65. }
  66. # TODO: if collision, prepend toolkit name
  67. if function_name in tools_dict:
  68. log.warning(f"Tool {function_name} already exists in another tools!")
  69. log.warning(f"Collision between {tools} and {tool_id}.")
  70. log.warning(f"Discarding {tools}.{function_name}")
  71. else:
  72. tools_dict[function_name] = tool_dict
  73. return tools_dict
  74. def parse_docstring(docstring):
  75. """
  76. Parse a function's docstring to extract parameter descriptions in reST format.
  77. Args:
  78. docstring (str): The docstring to parse.
  79. Returns:
  80. dict: A dictionary where keys are parameter names and values are descriptions.
  81. """
  82. if not docstring:
  83. return {}
  84. # Regex to match `:param name: description` format
  85. param_pattern = re.compile(r":param (\w+):\s*(.+)")
  86. param_descriptions = {}
  87. for line in docstring.splitlines():
  88. match = param_pattern.match(line.strip())
  89. if match:
  90. param_name, param_description = match.groups()
  91. param_descriptions[param_name] = param_description
  92. return param_descriptions
  93. def function_to_pydantic_model(func: Callable) -> type[BaseModel]:
  94. """
  95. Converts a Python function's type hints and docstring to a Pydantic model,
  96. including support for nested types, default values, and descriptions.
  97. Args:
  98. func: The function whose type hints and docstring should be converted.
  99. model_name: The name of the generated Pydantic model.
  100. Returns:
  101. A Pydantic model class.
  102. """
  103. type_hints = get_type_hints(func)
  104. signature = inspect.signature(func)
  105. parameters = signature.parameters
  106. docstring = func.__doc__
  107. descriptions = parse_docstring(docstring)
  108. field_defs = {}
  109. for name, param in parameters.items():
  110. type_hint = type_hints.get(name, Any)
  111. default_value = param.default if param.default is not param.empty else ...
  112. description = descriptions.get(name, None)
  113. if not description:
  114. field_defs[name] = type_hint, default_value
  115. continue
  116. field_defs[name] = type_hint, Field(default_value, description=description)
  117. return create_model(func.__name__, **field_defs)
  118. def get_callable_attributes(tool: object) -> list[Callable]:
  119. return [
  120. getattr(tool, func)
  121. for func in dir(tool)
  122. if callable(getattr(tool, func))
  123. and not func.startswith("__")
  124. and not inspect.isclass(getattr(tool, func))
  125. ]
  126. def get_tools_specs(tool_class: object) -> list[dict]:
  127. function_list = get_callable_attributes(tool_class)
  128. models = map(function_to_pydantic_model, function_list)
  129. return [convert_to_openai_function(tool) for tool in models]