123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538 |
- import asyncio
- import logging
- import socket
- import ssl
- import urllib.parse
- import urllib.request
- from collections import defaultdict
- from datetime import datetime, time, timedelta
- from typing import (
- Any,
- AsyncIterator,
- Dict,
- Iterator,
- List,
- Optional,
- Sequence,
- Union,
- Literal,
- )
- import aiohttp
- import certifi
- import validators
- from langchain_community.document_loaders import PlaywrightURLLoader, WebBaseLoader
- from langchain_community.document_loaders.firecrawl import FireCrawlLoader
- from langchain_community.document_loaders.base import BaseLoader
- from langchain_core.documents import Document
- from open_webui.constants import ERROR_MESSAGES
- from open_webui.config import (
- ENABLE_RAG_LOCAL_WEB_FETCH,
- PLAYWRIGHT_WS_URI,
- RAG_WEB_LOADER_ENGINE,
- FIRECRAWL_API_BASE_URL,
- FIRECRAWL_API_KEY,
- )
- from open_webui.env import SRC_LOG_LEVELS
- log = logging.getLogger(__name__)
- log.setLevel(SRC_LOG_LEVELS["RAG"])
- def validate_url(url: Union[str, Sequence[str]]):
- if isinstance(url, str):
- if isinstance(validators.url(url), validators.ValidationError):
- raise ValueError(ERROR_MESSAGES.INVALID_URL)
- if not ENABLE_RAG_LOCAL_WEB_FETCH:
- # Local web fetch is disabled, filter out any URLs that resolve to private IP addresses
- parsed_url = urllib.parse.urlparse(url)
- # Get IPv4 and IPv6 addresses
- ipv4_addresses, ipv6_addresses = resolve_hostname(parsed_url.hostname)
- # Check if any of the resolved addresses are private
- # This is technically still vulnerable to DNS rebinding attacks, as we don't control WebBaseLoader
- for ip in ipv4_addresses:
- if validators.ipv4(ip, private=True):
- raise ValueError(ERROR_MESSAGES.INVALID_URL)
- for ip in ipv6_addresses:
- if validators.ipv6(ip, private=True):
- raise ValueError(ERROR_MESSAGES.INVALID_URL)
- return True
- elif isinstance(url, Sequence):
- return all(validate_url(u) for u in url)
- else:
- return False
- def safe_validate_urls(url: Sequence[str]) -> Sequence[str]:
- valid_urls = []
- for u in url:
- try:
- if validate_url(u):
- valid_urls.append(u)
- except ValueError:
- continue
- return valid_urls
- def resolve_hostname(hostname):
- # Get address information
- addr_info = socket.getaddrinfo(hostname, None)
- # Extract IP addresses from address information
- ipv4_addresses = [info[4][0] for info in addr_info if info[0] == socket.AF_INET]
- ipv6_addresses = [info[4][0] for info in addr_info if info[0] == socket.AF_INET6]
- return ipv4_addresses, ipv6_addresses
- def extract_metadata(soup, url):
- metadata = {"source": url}
- if title := soup.find("title"):
- metadata["title"] = title.get_text()
- if description := soup.find("meta", attrs={"name": "description"}):
- metadata["description"] = description.get("content", "No description found.")
- if html := soup.find("html"):
- metadata["language"] = html.get("lang", "No language found.")
- return metadata
- def verify_ssl_cert(url: str) -> bool:
- """Verify SSL certificate for the given URL."""
- if not url.startswith("https://"):
- return True
- try:
- hostname = url.split("://")[-1].split("/")[0]
- context = ssl.create_default_context(cafile=certifi.where())
- with context.wrap_socket(ssl.socket(), server_hostname=hostname) as s:
- s.connect((hostname, 443))
- return True
- except ssl.SSLError:
- return False
- except Exception as e:
- log.warning(f"SSL verification failed for {url}: {str(e)}")
- return False
- class SafeFireCrawlLoader(BaseLoader):
- def __init__(
- self,
- web_paths,
- verify_ssl: bool = True,
- trust_env: bool = False,
- requests_per_second: Optional[float] = None,
- continue_on_failure: bool = True,
- api_key: Optional[str] = None,
- api_url: Optional[str] = None,
- mode: Literal["crawl", "scrape", "map"] = "crawl",
- proxy: Optional[Dict[str, str]] = None,
- params: Optional[Dict] = None,
- ):
- """Concurrent document loader for FireCrawl operations.
- Executes multiple FireCrawlLoader instances concurrently using thread pooling
- to improve bulk processing efficiency.
- Args:
- web_paths: List of URLs/paths to process.
- verify_ssl: If True, verify SSL certificates.
- trust_env: If True, use proxy settings from environment variables.
- requests_per_second: Number of requests per second to limit to.
- continue_on_failure (bool): If True, continue loading other URLs on failure.
- api_key: API key for FireCrawl service. Defaults to None
- (uses FIRE_CRAWL_API_KEY environment variable if not provided).
- api_url: Base URL for FireCrawl API. Defaults to official API endpoint.
- mode: Operation mode selection:
- - 'crawl': Website crawling mode (default)
- - 'scrape': Direct page scraping
- - 'map': Site map generation
- proxy: Proxy override settings for the FireCrawl API.
- params: The parameters to pass to the Firecrawl API.
- Examples include crawlerOptions.
- For more details, visit: https://github.com/mendableai/firecrawl-py
- """
- proxy_server = proxy.get("server") if proxy else None
- if trust_env and not proxy_server:
- env_proxies = urllib.request.getproxies()
- env_proxy_server = env_proxies.get("https") or env_proxies.get("http")
- if env_proxy_server:
- if proxy:
- proxy["server"] = env_proxy_server
- else:
- proxy = {"server": env_proxy_server}
- self.web_paths = web_paths
- self.verify_ssl = verify_ssl
- self.requests_per_second = requests_per_second
- self.last_request_time = None
- self.trust_env = trust_env
- self.continue_on_failure = continue_on_failure
- self.api_key = api_key
- self.api_url = api_url
- self.mode = mode
- self.params = params
- def lazy_load(self) -> Iterator[Document]:
- """Load documents concurrently using FireCrawl."""
- for url in self.web_paths:
- try:
- self._safe_process_url_sync(url)
- loader = FireCrawlLoader(
- url=url,
- api_key=self.api_key,
- api_url=self.api_url,
- mode=self.mode,
- params=self.params,
- )
- yield from loader.lazy_load()
- except Exception as e:
- if self.continue_on_failure:
- log.exception(e, "Error loading %s", url)
- continue
- raise e
- async def alazy_load(self):
- """Async version of lazy_load."""
- for url in self.web_paths:
- try:
- await self._safe_process_url(url)
- loader = FireCrawlLoader(
- url=url,
- api_key=self.api_key,
- api_url=self.api_url,
- mode=self.mode,
- params=self.params,
- )
- async for document in loader.alazy_load():
- yield document
- except Exception as e:
- if self.continue_on_failure:
- log.exception(e, "Error loading %s", url)
- continue
- raise e
- def _verify_ssl_cert(self, url: str) -> bool:
- return verify_ssl_cert(url)
- async def _wait_for_rate_limit(self):
- """Wait to respect the rate limit if specified."""
- if self.requests_per_second and self.last_request_time:
- min_interval = timedelta(seconds=1.0 / self.requests_per_second)
- time_since_last = datetime.now() - self.last_request_time
- if time_since_last < min_interval:
- await asyncio.sleep((min_interval - time_since_last).total_seconds())
- self.last_request_time = datetime.now()
- def _sync_wait_for_rate_limit(self):
- """Synchronous version of rate limit wait."""
- if self.requests_per_second and self.last_request_time:
- min_interval = timedelta(seconds=1.0 / self.requests_per_second)
- time_since_last = datetime.now() - self.last_request_time
- if time_since_last < min_interval:
- time.sleep((min_interval - time_since_last).total_seconds())
- self.last_request_time = datetime.now()
- async def _safe_process_url(self, url: str) -> bool:
- """Perform safety checks before processing a URL."""
- if self.verify_ssl and not self._verify_ssl_cert(url):
- raise ValueError(f"SSL certificate verification failed for {url}")
- await self._wait_for_rate_limit()
- return True
- def _safe_process_url_sync(self, url: str) -> bool:
- """Synchronous version of safety checks."""
- if self.verify_ssl and not self._verify_ssl_cert(url):
- raise ValueError(f"SSL certificate verification failed for {url}")
- self._sync_wait_for_rate_limit()
- return True
- class SafePlaywrightURLLoader(PlaywrightURLLoader):
- """Load HTML pages safely with Playwright, supporting SSL verification, rate limiting, and remote browser connection.
- Attributes:
- web_paths (List[str]): List of URLs to load.
- verify_ssl (bool): If True, verify SSL certificates.
- trust_env (bool): If True, use proxy settings from environment variables.
- requests_per_second (Optional[float]): Number of requests per second to limit to.
- continue_on_failure (bool): If True, continue loading other URLs on failure.
- headless (bool): If True, the browser will run in headless mode.
- proxy (dict): Proxy override settings for the Playwright session.
- playwright_ws_url (Optional[str]): WebSocket endpoint URI for remote browser connection.
- """
- def __init__(
- self,
- web_paths: List[str],
- verify_ssl: bool = True,
- trust_env: bool = False,
- requests_per_second: Optional[float] = None,
- continue_on_failure: bool = True,
- headless: bool = True,
- remove_selectors: Optional[List[str]] = None,
- proxy: Optional[Dict[str, str]] = None,
- playwright_ws_url: Optional[str] = None,
- ):
- """Initialize with additional safety parameters and remote browser support."""
- proxy_server = proxy.get("server") if proxy else None
- if trust_env and not proxy_server:
- env_proxies = urllib.request.getproxies()
- env_proxy_server = env_proxies.get("https") or env_proxies.get("http")
- if env_proxy_server:
- if proxy:
- proxy["server"] = env_proxy_server
- else:
- proxy = {"server": env_proxy_server}
- # We'll set headless to False if using playwright_ws_url since it's handled by the remote browser
- super().__init__(
- urls=web_paths,
- continue_on_failure=continue_on_failure,
- headless=headless if playwright_ws_url is None else False,
- remove_selectors=remove_selectors,
- proxy=proxy,
- )
- self.verify_ssl = verify_ssl
- self.requests_per_second = requests_per_second
- self.last_request_time = None
- self.playwright_ws_url = playwright_ws_url
- self.trust_env = trust_env
- def lazy_load(self) -> Iterator[Document]:
- """Safely load URLs synchronously with support for remote browser."""
- from playwright.sync_api import sync_playwright
- with sync_playwright() as p:
- # Use remote browser if ws_endpoint is provided, otherwise use local browser
- if self.playwright_ws_url:
- browser = p.chromium.connect(self.playwright_ws_url)
- else:
- browser = p.chromium.launch(headless=self.headless, proxy=self.proxy)
- for url in self.urls:
- try:
- self._safe_process_url_sync(url)
- page = browser.new_page()
- response = page.goto(url)
- if response is None:
- raise ValueError(f"page.goto() returned None for url {url}")
- text = self.evaluator.evaluate(page, browser, response)
- metadata = {"source": url}
- yield Document(page_content=text, metadata=metadata)
- except Exception as e:
- if self.continue_on_failure:
- log.exception(e, "Error loading %s", url)
- continue
- raise e
- browser.close()
- async def alazy_load(self) -> AsyncIterator[Document]:
- """Safely load URLs asynchronously with support for remote browser."""
- from playwright.async_api import async_playwright
- async with async_playwright() as p:
- # Use remote browser if ws_endpoint is provided, otherwise use local browser
- if self.playwright_ws_url:
- browser = await p.chromium.connect(self.playwright_ws_url)
- else:
- browser = await p.chromium.launch(
- headless=self.headless, proxy=self.proxy
- )
- for url in self.urls:
- try:
- await self._safe_process_url(url)
- page = await browser.new_page()
- response = await page.goto(url)
- if response is None:
- raise ValueError(f"page.goto() returned None for url {url}")
- text = await self.evaluator.evaluate_async(page, browser, response)
- metadata = {"source": url}
- yield Document(page_content=text, metadata=metadata)
- except Exception as e:
- if self.continue_on_failure:
- log.exception(e, "Error loading %s", url)
- continue
- raise e
- await browser.close()
- def _verify_ssl_cert(self, url: str) -> bool:
- return verify_ssl_cert(url)
- async def _wait_for_rate_limit(self):
- """Wait to respect the rate limit if specified."""
- if self.requests_per_second and self.last_request_time:
- min_interval = timedelta(seconds=1.0 / self.requests_per_second)
- time_since_last = datetime.now() - self.last_request_time
- if time_since_last < min_interval:
- await asyncio.sleep((min_interval - time_since_last).total_seconds())
- self.last_request_time = datetime.now()
- def _sync_wait_for_rate_limit(self):
- """Synchronous version of rate limit wait."""
- if self.requests_per_second and self.last_request_time:
- min_interval = timedelta(seconds=1.0 / self.requests_per_second)
- time_since_last = datetime.now() - self.last_request_time
- if time_since_last < min_interval:
- time.sleep((min_interval - time_since_last).total_seconds())
- self.last_request_time = datetime.now()
- async def _safe_process_url(self, url: str) -> bool:
- """Perform safety checks before processing a URL."""
- if self.verify_ssl and not self._verify_ssl_cert(url):
- raise ValueError(f"SSL certificate verification failed for {url}")
- await self._wait_for_rate_limit()
- return True
- def _safe_process_url_sync(self, url: str) -> bool:
- """Synchronous version of safety checks."""
- if self.verify_ssl and not self._verify_ssl_cert(url):
- raise ValueError(f"SSL certificate verification failed for {url}")
- self._sync_wait_for_rate_limit()
- return True
- class SafeWebBaseLoader(WebBaseLoader):
- """WebBaseLoader with enhanced error handling for URLs."""
- def __init__(self, trust_env: bool = False, *args, **kwargs):
- """Initialize SafeWebBaseLoader
- Args:
- trust_env (bool, optional): set to True if using proxy to make web requests, for example
- using http(s)_proxy environment variables. Defaults to False.
- """
- super().__init__(*args, **kwargs)
- self.trust_env = trust_env
- async def _fetch(
- self, url: str, retries: int = 3, cooldown: int = 2, backoff: float = 1.5
- ) -> str:
- async with aiohttp.ClientSession(trust_env=self.trust_env) as session:
- for i in range(retries):
- try:
- kwargs: Dict = dict(
- headers=self.session.headers,
- cookies=self.session.cookies.get_dict(),
- )
- if not self.session.verify:
- kwargs["ssl"] = False
- async with session.get(
- url, **(self.requests_kwargs | kwargs)
- ) as response:
- if self.raise_for_status:
- response.raise_for_status()
- return await response.text()
- except aiohttp.ClientConnectionError as e:
- if i == retries - 1:
- raise
- else:
- log.warning(
- f"Error fetching {url} with attempt "
- f"{i + 1}/{retries}: {e}. Retrying..."
- )
- await asyncio.sleep(cooldown * backoff**i)
- raise ValueError("retry count exceeded")
- def _unpack_fetch_results(
- self, results: Any, urls: List[str], parser: Union[str, None] = None
- ) -> List[Any]:
- """Unpack fetch results into BeautifulSoup objects."""
- from bs4 import BeautifulSoup
- final_results = []
- for i, result in enumerate(results):
- url = urls[i]
- if parser is None:
- if url.endswith(".xml"):
- parser = "xml"
- else:
- parser = self.default_parser
- self._check_parser(parser)
- final_results.append(BeautifulSoup(result, parser, **self.bs_kwargs))
- return final_results
- async def ascrape_all(
- self, urls: List[str], parser: Union[str, None] = None
- ) -> List[Any]:
- """Async fetch all urls, then return soups for all results."""
- results = await self.fetch_all(urls)
- return self._unpack_fetch_results(results, urls, parser=parser)
- def lazy_load(self) -> Iterator[Document]:
- """Lazy load text from the url(s) in web_path with error handling."""
- for path in self.web_paths:
- try:
- soup = self._scrape(path, bs_kwargs=self.bs_kwargs)
- text = soup.get_text(**self.bs_get_text_kwargs)
- # Build metadata
- metadata = extract_metadata(soup, path)
- yield Document(page_content=text, metadata=metadata)
- except Exception as e:
- # Log the error and continue with the next URL
- log.exception(e, "Error loading %s", path)
- async def alazy_load(self) -> AsyncIterator[Document]:
- """Async lazy load text from the url(s) in web_path."""
- results = await self.ascrape_all(self.web_paths)
- for path, soup in zip(self.web_paths, results):
- text = soup.get_text(**self.bs_get_text_kwargs)
- metadata = {"source": path}
- if title := soup.find("title"):
- metadata["title"] = title.get_text()
- if description := soup.find("meta", attrs={"name": "description"}):
- metadata["description"] = description.get(
- "content", "No description found."
- )
- if html := soup.find("html"):
- metadata["language"] = html.get("lang", "No language found.")
- yield Document(page_content=text, metadata=metadata)
- async def aload(self) -> list[Document]:
- """Load data into Document objects."""
- return [document async for document in self.alazy_load()]
- RAG_WEB_LOADER_ENGINES = defaultdict(lambda: SafeWebBaseLoader)
- RAG_WEB_LOADER_ENGINES["playwright"] = SafePlaywrightURLLoader
- RAG_WEB_LOADER_ENGINES["safe_web"] = SafeWebBaseLoader
- RAG_WEB_LOADER_ENGINES["firecrawl"] = SafeFireCrawlLoader
- def get_web_loader(
- urls: Union[str, Sequence[str]],
- verify_ssl: bool = True,
- requests_per_second: int = 2,
- trust_env: bool = False,
- ):
- # Check if the URLs are valid
- safe_urls = safe_validate_urls([urls] if isinstance(urls, str) else urls)
- web_loader_args = {
- "web_paths": safe_urls,
- "verify_ssl": verify_ssl,
- "requests_per_second": requests_per_second,
- "continue_on_failure": True,
- "trust_env": trust_env,
- }
- if PLAYWRIGHT_WS_URI.value:
- web_loader_args["playwright_ws_url"] = PLAYWRIGHT_WS_URI.value
- if RAG_WEB_LOADER_ENGINE.value == "firecrawl":
- web_loader_args["api_key"] = FIRECRAWL_API_KEY.value
- web_loader_args["api_url"] = FIRECRAWL_API_BASE_URL.value
- # Create the appropriate WebLoader based on the configuration
- WebLoaderClass = RAG_WEB_LOADER_ENGINES[RAG_WEB_LOADER_ENGINE.value]
- web_loader = WebLoaderClass(**web_loader_args)
- log.debug(
- "Using RAG_WEB_LOADER_ENGINE %s for %s URLs",
- web_loader.__class__.__name__,
- len(safe_urls),
- )
- return web_loader
|