123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- import os
- import re
- import subprocess
- import sys
- from importlib import util
- import types
- import tempfile
- import logging
- from open_webui.env import SRC_LOG_LEVELS
- from open_webui.models.functions import Functions
- from open_webui.models.tools import Tools
- log = logging.getLogger(__name__)
- log.setLevel(SRC_LOG_LEVELS["MAIN"])
- def extract_frontmatter(content):
- """
- Extract frontmatter as a dictionary from the provided content string.
- """
- frontmatter = {}
- frontmatter_started = False
- frontmatter_ended = False
- frontmatter_pattern = re.compile(r"^\s*([a-z_]+):\s*(.*)\s*$", re.IGNORECASE)
- try:
- lines = content.splitlines()
- if len(lines) < 1 or lines[0].strip() != '"""':
- # The content doesn't start with triple quotes
- return {}
- frontmatter_started = True
- for line in lines[1:]:
- if '"""' in line:
- if frontmatter_started:
- frontmatter_ended = True
- break
- if frontmatter_started and not frontmatter_ended:
- match = frontmatter_pattern.match(line)
- if match:
- key, value = match.groups()
- frontmatter[key.strip()] = value.strip()
- except Exception as e:
- print(f"An error occurred: {e}")
- return {}
- return frontmatter
- def replace_imports(content):
- """
- Replace the import paths in the content.
- """
- replacements = {
- "from utils": "from open_webui.utils",
- "from apps": "from open_webui.apps",
- "from main": "from open_webui.main",
- "from config": "from open_webui.config",
- }
- for old, new in replacements.items():
- content = content.replace(old, new)
- return content
- def load_tools_module_by_id(toolkit_id, content=None):
- if content is None:
- tool = Tools.get_tool_by_id(toolkit_id)
- if not tool:
- raise Exception(f"Toolkit not found: {toolkit_id}")
- content = tool.content
- content = replace_imports(content)
- Tools.update_tool_by_id(toolkit_id, {"content": content})
- else:
- frontmatter = extract_frontmatter(content)
- # Install required packages found within the frontmatter
- install_frontmatter_requirements(frontmatter.get("requirements", ""))
- module_name = f"tool_{toolkit_id}"
- module = types.ModuleType(module_name)
- sys.modules[module_name] = module
- # Create a temporary file and use it to define `__file__` so
- # that it works as expected from the module's perspective.
- temp_file = tempfile.NamedTemporaryFile(delete=False)
- temp_file.close()
- try:
- with open(temp_file.name, "w", encoding="utf-8") as f:
- f.write(content)
- module.__dict__["__file__"] = temp_file.name
- # Executing the modified content in the created module's namespace
- exec(content, module.__dict__)
- frontmatter = extract_frontmatter(content)
- log.info(f"Loaded module: {module.__name__}")
- # Create and return the object if the class 'Tools' is found in the module
- if hasattr(module, "Tools"):
- return module.Tools(), frontmatter
- else:
- raise Exception("No Tools class found in the module")
- except Exception as e:
- log.error(f"Error loading module: {toolkit_id}: {e}")
- del sys.modules[module_name] # Clean up
- raise e
- finally:
- os.unlink(temp_file.name)
- def load_function_module_by_id(function_id, content=None):
- if content is None:
- function = Functions.get_function_by_id(function_id)
- if not function:
- raise Exception(f"Function not found: {function_id}")
- content = function.content
- content = replace_imports(content)
- Functions.update_function_by_id(function_id, {"content": content})
- else:
- frontmatter = extract_frontmatter(content)
- install_frontmatter_requirements(frontmatter.get("requirements", ""))
- module_name = f"function_{function_id}"
- module = types.ModuleType(module_name)
- sys.modules[module_name] = module
- # Create a temporary file and use it to define `__file__` so
- # that it works as expected from the module's perspective.
- temp_file = tempfile.NamedTemporaryFile(delete=False)
- temp_file.close()
- try:
- with open(temp_file.name, "w", encoding="utf-8") as f:
- f.write(content)
- module.__dict__["__file__"] = temp_file.name
- # Execute the modified content in the created module's namespace
- exec(content, module.__dict__)
- frontmatter = extract_frontmatter(content)
- log.info(f"Loaded module: {module.__name__}")
- # Create appropriate object based on available class type in the module
- if hasattr(module, "Pipe"):
- return module.Pipe(), "pipe", frontmatter
- elif hasattr(module, "Filter"):
- return module.Filter(), "filter", frontmatter
- elif hasattr(module, "Action"):
- return module.Action(), "action", frontmatter
- else:
- raise Exception("No Function class found in the module")
- except Exception as e:
- log.error(f"Error loading module: {function_id}: {e}")
- del sys.modules[module_name] # Cleanup by removing the module in case of error
- Functions.update_function_by_id(function_id, {"is_active": False})
- raise e
- finally:
- os.unlink(temp_file.name)
- def install_frontmatter_requirements(requirements):
- if requirements:
- try:
- req_list = [req.strip() for req in requirements.split(",")]
- for req in req_list:
- log.info(f"Installing requirement: {req}")
- subprocess.check_call([sys.executable, "-m", "pip", "install", req])
- except Exception as e:
- log.error(f"Error installing package: {req}")
- raise e
- else:
- log.info("No requirements found in frontmatter.")
|