utils.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. from importlib import util
  2. import os
  3. from config import TOOLS_DIR, FUNCTIONS_DIR
  4. def load_toolkit_module_by_id(toolkit_id):
  5. toolkit_path = os.path.join(TOOLS_DIR, f"{toolkit_id}.py")
  6. spec = util.spec_from_file_location(toolkit_id, toolkit_path)
  7. module = util.module_from_spec(spec)
  8. try:
  9. spec.loader.exec_module(module)
  10. print(f"Loaded module: {module.__name__}")
  11. if hasattr(module, "Tools"):
  12. return module.Tools()
  13. else:
  14. raise Exception("No Tools class found")
  15. except Exception as e:
  16. print(f"Error loading module: {toolkit_id}")
  17. # Move the file to the error folder
  18. os.rename(toolkit_path, f"{toolkit_path}.error")
  19. raise e
  20. def load_function_module_by_id(function_id):
  21. function_path = os.path.join(FUNCTIONS_DIR, f"{function_id}.py")
  22. spec = util.spec_from_file_location(function_id, function_path)
  23. module = util.module_from_spec(spec)
  24. try:
  25. spec.loader.exec_module(module)
  26. print(f"Loaded module: {module.__name__}")
  27. if hasattr(module, "Pipe"):
  28. return module.Pipe(), "pipe"
  29. elif hasattr(module, "Filter"):
  30. return module.Filter(), "filter"
  31. else:
  32. raise Exception("No Function class found")
  33. except Exception as e:
  34. print(f"Error loading module: {function_id}")
  35. # Move the file to the error folder
  36. os.rename(function_path, f"{function_path}.error")
  37. raise e