utils.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. import os
  2. import re
  3. import subprocess
  4. import sys
  5. from importlib import util
  6. from open_webui.apps.webui.models.functions import Functions
  7. from open_webui.apps.webui.models.tools import Tools
  8. from open_webui.config import FUNCTIONS_DIR, TOOLS_DIR
  9. def extract_frontmatter(file_path):
  10. """
  11. Extract frontmatter as a dictionary from the specified file path.
  12. """
  13. frontmatter = {}
  14. frontmatter_started = False
  15. frontmatter_ended = False
  16. frontmatter_pattern = re.compile(r"^\s*([a-z_]+):\s*(.*)\s*$", re.IGNORECASE)
  17. try:
  18. with open(file_path, "r", encoding="utf-8") as file:
  19. first_line = file.readline()
  20. if first_line.strip() != '"""':
  21. # The file doesn't start with triple quotes
  22. return {}
  23. frontmatter_started = True
  24. for line in file:
  25. if '"""' in line:
  26. if frontmatter_started:
  27. frontmatter_ended = True
  28. break
  29. if frontmatter_started and not frontmatter_ended:
  30. match = frontmatter_pattern.match(line)
  31. if match:
  32. key, value = match.groups()
  33. frontmatter[key.strip()] = value.strip()
  34. except FileNotFoundError:
  35. print(f"Error: The file {file_path} does not exist.")
  36. return {}
  37. except Exception as e:
  38. print(f"An error occurred: {e}")
  39. return {}
  40. return frontmatter
  41. def load_toolkit_module_by_id(toolkit_id):
  42. toolkit_path = os.path.join(TOOLS_DIR, f"{toolkit_id}.py")
  43. if not os.path.exists(toolkit_path):
  44. tool = Tools.get_tool_by_id(toolkit_id)
  45. if tool:
  46. with open(toolkit_path, "w") as file:
  47. file.write(tool.content)
  48. else:
  49. raise Exception(f"Toolkit not found: {toolkit_id}")
  50. spec = util.spec_from_file_location(toolkit_id, toolkit_path)
  51. module = util.module_from_spec(spec)
  52. frontmatter = extract_frontmatter(toolkit_path)
  53. try:
  54. install_frontmatter_requirements(frontmatter.get("requirements", ""))
  55. spec.loader.exec_module(module)
  56. print(f"Loaded module: {module.__name__}")
  57. if hasattr(module, "Tools"):
  58. return module.Tools(), frontmatter
  59. else:
  60. raise Exception("No Tools class found")
  61. except Exception as e:
  62. print(f"Error loading module: {toolkit_id}")
  63. # Move the file to the error folder
  64. os.rename(toolkit_path, f"{toolkit_path}.error")
  65. raise e
  66. def load_function_module_by_id(function_id):
  67. function_path = os.path.join(FUNCTIONS_DIR, f"{function_id}.py")
  68. if not os.path.exists(function_path):
  69. function = Functions.get_function_by_id(function_id)
  70. if function:
  71. with open(function_path, "w") as file:
  72. file.write(function.content)
  73. else:
  74. raise Exception(f"Function not found: {function_id}")
  75. spec = util.spec_from_file_location(function_id, function_path)
  76. module = util.module_from_spec(spec)
  77. frontmatter = extract_frontmatter(function_path)
  78. try:
  79. install_frontmatter_requirements(frontmatter.get("requirements", ""))
  80. spec.loader.exec_module(module)
  81. print(f"Loaded module: {module.__name__}")
  82. if hasattr(module, "Pipe"):
  83. return module.Pipe(), "pipe", frontmatter
  84. elif hasattr(module, "Filter"):
  85. return module.Filter(), "filter", frontmatter
  86. elif hasattr(module, "Action"):
  87. return module.Action(), "action", frontmatter
  88. else:
  89. raise Exception("No Function class found")
  90. except Exception as e:
  91. print(f"Error loading module: {function_id}")
  92. # Move the file to the error folder
  93. os.rename(function_path, f"{function_path}.error")
  94. raise e
  95. def install_frontmatter_requirements(requirements):
  96. if requirements:
  97. req_list = [req.strip() for req in requirements.split(",")]
  98. for req in req_list:
  99. print(f"Installing requirement: {req}")
  100. subprocess.check_call([sys.executable, "-m", "pip", "install", req])
  101. else:
  102. print("No requirements found in frontmatter.")