utils.py

import socket
import urllib.parse
import validators
from typing import Union, Sequence, Iterator

from langchain_community.document_loaders import (
    WebBaseLoader,
)
from langchain_core.documents import Document

from open_webui.constants import ERROR_MESSAGES
from open_webui.config import ENABLE_RAG_LOCAL_WEB_FETCH
from open_webui.env import SRC_LOG_LEVELS

import logging

log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["RAG"])


def validate_url(url: Union[str, Sequence[str]]):
    if isinstance(url, str):
        if isinstance(validators.url(url), validators.ValidationError):
            raise ValueError(ERROR_MESSAGES.INVALID_URL)
        if not ENABLE_RAG_LOCAL_WEB_FETCH:
            # Local web fetch is disabled, filter out any URLs that resolve to private IP addresses
            parsed_url = urllib.parse.urlparse(url)
            # Get IPv4 and IPv6 addresses
            ipv4_addresses, ipv6_addresses = resolve_hostname(parsed_url.hostname)
            # Check if any of the resolved addresses are private
            # This is technically still vulnerable to DNS rebinding attacks, as we don't control WebBaseLoader
            for ip in ipv4_addresses:
                if validators.ipv4(ip, private=True):
                    raise ValueError(ERROR_MESSAGES.INVALID_URL)
            for ip in ipv6_addresses:
                if validators.ipv6(ip, private=True):
                    raise ValueError(ERROR_MESSAGES.INVALID_URL)
        return True
    elif isinstance(url, Sequence):
        return all(validate_url(u) for u in url)
    else:
        return False


def safe_validate_urls(url: Sequence[str]) -> Sequence[str]:
    valid_urls = []
    for u in url:
        try:
            if validate_url(u):
                valid_urls.append(u)
        except ValueError:
            continue
    return valid_urls
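

# Illustrative sketch (not part of the original module): safe_validate_urls
# swallows the ValueError raised by validate_url, so a mixed batch simply comes
# back filtered. The inputs below are placeholders; the first assumes it
# resolves to a public address (or that ENABLE_RAG_LOCAL_WEB_FETCH is enabled).
#
#   >>> safe_validate_urls(["https://example.com", "not-a-url"])
#   ['https://example.com']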


def resolve_hostname(hostname):
    # Get address information
    addr_info = socket.getaddrinfo(hostname, None)

    # Extract IP addresses from address information
    ipv4_addresses = [info[4][0] for info in addr_info if info[0] == socket.AF_INET]
    ipv6_addresses = [info[4][0] for info in addr_info if info[0] == socket.AF_INET6]

    return ipv4_addresses, ipv6_addresses
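

# Illustrative sketch: resolve_hostname returns two lists of address strings
# (IPv4 first, then IPv6). The output below is hypothetical; actual addresses
# depend on the resolver.
#
#   >>> ipv4_addresses, ipv6_addresses = resolve_hostname("example.com")
#   >>> ipv4_addresses
#   ['93.184.216.34']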


class SafeWebBaseLoader(WebBaseLoader):
    """WebBaseLoader with enhanced error handling for URLs."""

    def lazy_load(self) -> Iterator[Document]:
        """Lazy load text from the url(s) in web_path with error handling."""
        for path in self.web_paths:
            try:
                soup = self._scrape(path, bs_kwargs=self.bs_kwargs)
                text = soup.get_text(**self.bs_get_text_kwargs)

                # Build metadata
                metadata = {"source": path}
                if title := soup.find("title"):
                    metadata["title"] = title.get_text()
                if description := soup.find("meta", attrs={"name": "description"}):
                    metadata["description"] = description.get(
                        "content", "No description found."
                    )
                if html := soup.find("html"):
                    metadata["language"] = html.get("lang", "No language found.")

                yield Document(page_content=text, metadata=metadata)
            except Exception as e:
                # Log the error and continue with the next URL
                log.error(f"Error loading {path}: {e}")
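

# Illustrative sketch: lazy_load logs and skips a URL that fails to scrape
# instead of raising, so reachable pages in the same batch are still yielded.
# The URLs below are placeholders.
#
#   >>> loader = SafeWebBaseLoader(
#   ...     ["https://example.com", "https://unreachable.invalid"]
#   ... )
#   >>> docs = list(loader.lazy_load())  # one Document; the failure is logged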


def get_web_loader(
    urls: Union[str, Sequence[str]],
    verify_ssl: bool = True,
    requests_per_second: int = 2,
):
    # Check if the URLs are valid
    safe_urls = safe_validate_urls([urls] if isinstance(urls, str) else urls)

    return SafeWebBaseLoader(
        safe_urls,
        verify_ssl=verify_ssl,
        requests_per_second=requests_per_second,
        continue_on_failure=True,
    )
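

# Minimal usage sketch (an illustrative demo, not part of the original module):
# "https://example.com" is a placeholder URL, network access is assumed, and
# load() comes from LangChain's BaseLoader, which collects what lazy_load yields.
if __name__ == "__main__":
    loader = get_web_loader("https://example.com", requests_per_second=1)
    for doc in loader.load():
        print(doc.metadata.get("source"), len(doc.page_content))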