tavily.py

import logging
from typing import Iterator, List, Literal, Union

import requests
from langchain_core.document_loaders import BaseLoader
from langchain_core.documents import Document

from open_webui.env import SRC_LOG_LEVELS

log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["RAG"])

class TavilyLoader(BaseLoader):
    """Extract web page content from URLs using the Tavily Extract API.

    This is a LangChain document loader that uses Tavily's Extract API to
    retrieve content from web pages and return it as Document objects.

    Args:
        urls: URL or list of URLs to extract content from.
        api_key: The Tavily API key.
        extract_depth: Depth of extraction, either "basic" or "advanced".
        continue_on_failure: Whether to continue if extraction of a URL fails.
    """

    def __init__(
        self,
        urls: Union[str, List[str]],
        api_key: str,
        extract_depth: Literal["basic", "advanced"] = "basic",
        continue_on_failure: bool = True,
    ) -> None:
  26. """Initialize Tavily Extract client.
  27. Args:
  28. urls: URL or list of URLs to extract content from.
  29. api_key: The Tavily API key.
  30. include_images: Whether to include images in the extraction.
  31. extract_depth: Depth of extraction, either "basic" or "advanced".
  32. advanced extraction retrieves more data, including tables and
  33. embedded content, with higher success but may increase latency.
  34. basic costs 1 credit per 5 successful URL extractions,
  35. advanced costs 2 credits per 5 successful URL extractions.
  36. continue_on_failure: Whether to continue if extraction of a URL fails.
  37. """
        if not urls:
            raise ValueError("At least one URL must be provided.")
        self.api_key = api_key
        self.urls = urls if isinstance(urls, list) else [urls]
        self.extract_depth = extract_depth
        self.continue_on_failure = continue_on_failure
        self.api_url = "https://api.tavily.com/extract"

    def lazy_load(self) -> Iterator[Document]:
        """Extract and yield documents from the URLs using the Tavily Extract API."""
        # Extract in batches so a single request never carries too many URLs
        batch_size = 20
        for i in range(0, len(self.urls), batch_size):
            batch_urls = self.urls[i : i + batch_size]
            try:
                headers = {
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {self.api_key}",
                }
                # Use a bare string for a single URL, an array for multiple URLs
                urls_param = batch_urls[0] if len(batch_urls) == 1 else batch_urls
                payload = {"urls": urls_param, "extract_depth": self.extract_depth}

                # Make the API call
                response = requests.post(self.api_url, headers=headers, json=payload)
                response.raise_for_status()
                response_data = response.json()

                # Yield a Document for each successful extraction
                for result in response_data.get("results", []):
                    url = result.get("url", "")
                    content = result.get("raw_content", "")
                    if not content:
                        log.warning(f"No content extracted from {url}")
                        continue
                    # Record the source URL as metadata
                    metadata = {"source": url}
                    yield Document(
                        page_content=content,
                        metadata=metadata,
                    )

                # Log any URLs the API reported as failed
                for failed in response_data.get("failed_results", []):
                    url = failed.get("url", "")
                    error = failed.get("error", "Unknown error")
                    log.error(f"Failed to extract content from {url}: {error}")
            except Exception as e:
                if self.continue_on_failure:
                    log.error(f"Error extracting content from batch {batch_urls}: {e}")
                else:
                    # Re-raise without losing the original traceback
                    raise
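
A minimal usage sketch, separate from tavily.py itself: the API key lookup via a TAVILY_API_KEY environment variable and the example URLs are placeholder assumptions, not part of the original file. It shows how the loader is constructed and how lazy_load() streams Documents as each batch completes.

# Illustrative usage sketch; TAVILY_API_KEY and the URLs below are placeholders.
import os

loader = TavilyLoader(
    urls=["https://example.com", "https://example.org"],
    api_key=os.environ["TAVILY_API_KEY"],
    extract_depth="basic",  # "advanced" also retrieves tables and embedded content
    continue_on_failure=True,  # log failed batches instead of raising
)

# lazy_load() yields Documents one at a time as batches complete;
# BaseLoader.load() would collect them into a list instead.
for doc in loader.lazy_load():
    print(doc.metadata["source"], len(doc.page_content))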