utils.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. from importlib import util
  2. import os
  3. import re
  4. import sys
  5. import subprocess
  6. from apps.webui.models.tools import Tools
  7. from apps.webui.models.functions import Functions
  8. from config import TOOLS_DIR, FUNCTIONS_DIR
  9. def extract_frontmatter(file_path):
  10. """
  11. Extract frontmatter as a dictionary from the specified file path.
  12. """
  13. frontmatter = {}
  14. frontmatter_started = False
  15. frontmatter_ended = False
  16. frontmatter_pattern = re.compile(r"^\s*([a-z_]+):\s*(.*)\s*$", re.IGNORECASE)
  17. try:
  18. with open(file_path, "r", encoding="utf-8") as file:
  19. first_line = file.readline()
  20. if first_line.strip() != '"""':
  21. # The file doesn't start with triple quotes
  22. return {}
  23. frontmatter_started = True
  24. for line in file:
  25. if '"""' in line:
  26. if frontmatter_started:
  27. frontmatter_ended = True
  28. break
  29. if frontmatter_started and not frontmatter_ended:
  30. match = frontmatter_pattern.match(line)
  31. if match:
  32. key, value = match.groups()
  33. frontmatter[key.strip()] = value.strip()
  34. except FileNotFoundError:
  35. print(f"Error: The file {file_path} does not exist.")
  36. return {}
  37. except Exception as e:
  38. print(f"An error occurred: {e}")
  39. return {}
  40. return frontmatter
  41. def load_toolkit_module_by_id(toolkit_id):
  42. toolkit_path = os.path.join(TOOLS_DIR, f"{toolkit_id}.py")
  43. if not os.path.exists(toolkit_path):
  44. tool = Tools.get_tool_by_id(toolkit_id)
  45. if tool:
  46. with open(toolkit_path, "w") as file:
  47. file.write(tool.content)
  48. else:
  49. raise Exception(f"Toolkit not found: {toolkit_id}")
  50. spec = util.spec_from_file_location(toolkit_id, toolkit_path)
  51. module = util.module_from_spec(spec)
  52. frontmatter = extract_frontmatter(toolkit_path)
  53. try:
  54. install_frontmatter_requirements(frontmatter.get("requirements", ""))
  55. spec.loader.exec_module(module)
  56. print(f"Loaded module: {module.__name__}")
  57. if hasattr(module, "Tools"):
  58. return module.Tools(), frontmatter
  59. else:
  60. raise Exception("No Tools class found")
  61. except Exception as e:
  62. print(f"Error loading module: {toolkit_id}")
  63. # Move the file to the error folder
  64. os.rename(toolkit_path, f"{toolkit_path}.error")
  65. raise e
  66. def load_function_module_by_id(function_id):
  67. function_path = os.path.join(FUNCTIONS_DIR, f"{function_id}.py")
  68. if not os.path.exists(function_path):
  69. function = Functions.get_function_by_id(function_id)
  70. if function:
  71. with open(function_path, "w") as file:
  72. file.write(function.content)
  73. else:
  74. raise Exception(f"Function not found: {function_id}")
  75. spec = util.spec_from_file_location(function_id, function_path)
  76. module = util.module_from_spec(spec)
  77. frontmatter = extract_frontmatter(function_path)
  78. try:
  79. install_frontmatter_requirements(frontmatter.get("requirements", ""))
  80. spec.loader.exec_module(module)
  81. print(f"Loaded module: {module.__name__}")
  82. if hasattr(module, "Pipe"):
  83. return module.Pipe(), "pipe", frontmatter
  84. elif hasattr(module, "Filter"):
  85. return module.Filter(), "filter", frontmatter
  86. elif hasattr(module, "Action"):
  87. return module.Action(), "action", frontmatter
  88. else:
  89. raise Exception("No Function class found")
  90. except Exception as e:
  91. print(f"Error loading module: {function_id}")
  92. # Move the file to the error folder
  93. os.rename(function_path, f"{function_path}.error")
  94. raise e
  95. def install_frontmatter_requirements(requirements):
  96. if requirements:
  97. req_list = [req.strip() for req in requirements.split(",")]
  98. for req in req_list:
  99. print(f"Installing requirement: {req}")
  100. subprocess.check_call([sys.executable, "-m", "pip", "install", req])
  101. else:
  102. print("No requirements found in frontmatter.")