123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299 |
- from fastapi import (
- FastAPI,
- Depends,
- HTTPException,
- status,
- UploadFile,
- File,
- Form,
- )
- from fastapi.middleware.cors import CORSMiddleware
- import requests
- import os, shutil, logging, re
- from datetime import datetime
- from pathlib import Path
- from typing import List, Union, Sequence, Iterator, Any
- from chromadb.utils.batch_utils import create_batches
- from langchain_core.documents import Document
- from langchain_community.document_loaders import (
- WebBaseLoader,
- TextLoader,
- PyPDFLoader,
- CSVLoader,
- BSHTMLLoader,
- Docx2txtLoader,
- UnstructuredEPubLoader,
- UnstructuredWordDocumentLoader,
- UnstructuredMarkdownLoader,
- UnstructuredXMLLoader,
- UnstructuredRSTLoader,
- UnstructuredExcelLoader,
- UnstructuredPowerPointLoader,
- YoutubeLoader,
- OutlookMessageLoader,
- )
- from langchain.text_splitter import RecursiveCharacterTextSplitter
- import validators
- import urllib.parse
- import socket
- from pydantic import BaseModel
- from typing import Optional
- import mimetypes
- import uuid
- import json
- import sentence_transformers
- from apps.webui.models.documents import (
- Documents,
- DocumentForm,
- DocumentResponse,
- )
- from apps.rag.utils import (
- get_model_path,
- get_embedding_function,
- query_doc,
- query_doc_with_hybrid_search,
- query_collection,
- query_collection_with_hybrid_search,
- )
- from apps.rag.search.brave import search_brave
- from apps.rag.search.google_pse import search_google_pse
- from apps.rag.search.main import SearchResult
- from apps.rag.search.searxng import search_searxng
- from apps.rag.search.serper import search_serper
- from apps.rag.search.serpstack import search_serpstack
- from apps.rag.search.serply import search_serply
- from apps.rag.search.duckduckgo import search_duckduckgo
- from apps.rag.search.tavily import search_tavily
- from utils.misc import (
- calculate_sha256,
- calculate_sha256_string,
- sanitize_filename,
- extract_folders_after_data_docs,
- )
- from utils.utils import get_current_user, get_admin_user
- from config import (
- AppConfig,
- ENV,
- SRC_LOG_LEVELS,
- UPLOAD_DIR,
- DOCS_DIR,
- RAG_TOP_K,
- RAG_RELEVANCE_THRESHOLD,
- RAG_EMBEDDING_ENGINE,
- RAG_EMBEDDING_MODEL,
- RAG_EMBEDDING_MODEL_AUTO_UPDATE,
- RAG_EMBEDDING_MODEL_TRUST_REMOTE_CODE,
- ENABLE_RAG_HYBRID_SEARCH,
- ENABLE_RAG_WEB_LOADER_SSL_VERIFICATION,
- RAG_RERANKING_MODEL,
- PDF_EXTRACT_IMAGES,
- RAG_RERANKING_MODEL_AUTO_UPDATE,
- RAG_RERANKING_MODEL_TRUST_REMOTE_CODE,
- RAG_OPENAI_API_BASE_URL,
- RAG_OPENAI_API_KEY,
- DEVICE_TYPE,
- CHROMA_CLIENT,
- CHUNK_SIZE,
- CHUNK_OVERLAP,
- RAG_TEMPLATE,
- ENABLE_RAG_LOCAL_WEB_FETCH,
- YOUTUBE_LOADER_LANGUAGE,
- ENABLE_RAG_WEB_SEARCH,
- RAG_WEB_SEARCH_ENGINE,
- SEARXNG_QUERY_URL,
- GOOGLE_PSE_API_KEY,
- GOOGLE_PSE_ENGINE_ID,
- BRAVE_SEARCH_API_KEY,
- SERPSTACK_API_KEY,
- SERPSTACK_HTTPS,
- SERPER_API_KEY,
- SERPLY_API_KEY,
- TAVILY_API_KEY,
- RAG_WEB_SEARCH_RESULT_COUNT,
- RAG_WEB_SEARCH_CONCURRENT_REQUESTS,
- RAG_EMBEDDING_OPENAI_BATCH_SIZE,
- )
- from constants import ERROR_MESSAGES
- log = logging.getLogger(__name__)
- log.setLevel(SRC_LOG_LEVELS["RAG"])
- app = FastAPI()
- app.state.config = AppConfig()
- app.state.config.TOP_K = RAG_TOP_K
- app.state.config.RELEVANCE_THRESHOLD = RAG_RELEVANCE_THRESHOLD
- app.state.config.ENABLE_RAG_HYBRID_SEARCH = ENABLE_RAG_HYBRID_SEARCH
- app.state.config.ENABLE_RAG_WEB_LOADER_SSL_VERIFICATION = (
- ENABLE_RAG_WEB_LOADER_SSL_VERIFICATION
- )
- app.state.config.CHUNK_SIZE = CHUNK_SIZE
- app.state.config.CHUNK_OVERLAP = CHUNK_OVERLAP
- app.state.config.RAG_EMBEDDING_ENGINE = RAG_EMBEDDING_ENGINE
- app.state.config.RAG_EMBEDDING_MODEL = RAG_EMBEDDING_MODEL
- app.state.config.RAG_EMBEDDING_OPENAI_BATCH_SIZE = RAG_EMBEDDING_OPENAI_BATCH_SIZE
- app.state.config.RAG_RERANKING_MODEL = RAG_RERANKING_MODEL
- app.state.config.RAG_TEMPLATE = RAG_TEMPLATE
- app.state.config.OPENAI_API_BASE_URL = RAG_OPENAI_API_BASE_URL
- app.state.config.OPENAI_API_KEY = RAG_OPENAI_API_KEY
- app.state.config.PDF_EXTRACT_IMAGES = PDF_EXTRACT_IMAGES
- app.state.config.YOUTUBE_LOADER_LANGUAGE = YOUTUBE_LOADER_LANGUAGE
- app.state.YOUTUBE_LOADER_TRANSLATION = None
- app.state.config.ENABLE_RAG_WEB_SEARCH = ENABLE_RAG_WEB_SEARCH
- app.state.config.RAG_WEB_SEARCH_ENGINE = RAG_WEB_SEARCH_ENGINE
- app.state.config.SEARXNG_QUERY_URL = SEARXNG_QUERY_URL
- app.state.config.GOOGLE_PSE_API_KEY = GOOGLE_PSE_API_KEY
- app.state.config.GOOGLE_PSE_ENGINE_ID = GOOGLE_PSE_ENGINE_ID
- app.state.config.BRAVE_SEARCH_API_KEY = BRAVE_SEARCH_API_KEY
- app.state.config.SERPSTACK_API_KEY = SERPSTACK_API_KEY
- app.state.config.SERPSTACK_HTTPS = SERPSTACK_HTTPS
- app.state.config.SERPER_API_KEY = SERPER_API_KEY
- app.state.config.SERPLY_API_KEY = SERPLY_API_KEY
- app.state.config.TAVILY_API_KEY = TAVILY_API_KEY
- app.state.config.RAG_WEB_SEARCH_RESULT_COUNT = RAG_WEB_SEARCH_RESULT_COUNT
- app.state.config.RAG_WEB_SEARCH_CONCURRENT_REQUESTS = RAG_WEB_SEARCH_CONCURRENT_REQUESTS
- def update_embedding_model(
- embedding_model: str,
- update_model: bool = False,
- ):
- if embedding_model and app.state.config.RAG_EMBEDDING_ENGINE == "":
- app.state.sentence_transformer_ef = sentence_transformers.SentenceTransformer(
- get_model_path(embedding_model, update_model),
- device=DEVICE_TYPE,
- trust_remote_code=RAG_EMBEDDING_MODEL_TRUST_REMOTE_CODE,
- )
- else:
- app.state.sentence_transformer_ef = None
- def update_reranking_model(
- reranking_model: str,
- update_model: bool = False,
- ):
- if reranking_model:
- app.state.sentence_transformer_rf = sentence_transformers.CrossEncoder(
- get_model_path(reranking_model, update_model),
- device=DEVICE_TYPE,
- trust_remote_code=RAG_RERANKING_MODEL_TRUST_REMOTE_CODE,
- )
- else:
- app.state.sentence_transformer_rf = None
- update_embedding_model(
- app.state.config.RAG_EMBEDDING_MODEL,
- RAG_EMBEDDING_MODEL_AUTO_UPDATE,
- )
- update_reranking_model(
- app.state.config.RAG_RERANKING_MODEL,
- RAG_RERANKING_MODEL_AUTO_UPDATE,
- )
- app.state.EMBEDDING_FUNCTION = get_embedding_function(
- app.state.config.RAG_EMBEDDING_ENGINE,
- app.state.config.RAG_EMBEDDING_MODEL,
- app.state.sentence_transformer_ef,
- app.state.config.OPENAI_API_KEY,
- app.state.config.OPENAI_API_BASE_URL,
- app.state.config.RAG_EMBEDDING_OPENAI_BATCH_SIZE,
- )
- origins = ["*"]
- app.add_middleware(
- CORSMiddleware,
- allow_origins=origins,
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
- class CollectionNameForm(BaseModel):
- collection_name: Optional[str] = "test"
- class UrlForm(CollectionNameForm):
- url: str
- class SearchForm(CollectionNameForm):
- query: str
- @app.get("/")
- async def get_status():
- return {
- "status": True,
- "chunk_size": app.state.config.CHUNK_SIZE,
- "chunk_overlap": app.state.config.CHUNK_OVERLAP,
- "template": app.state.config.RAG_TEMPLATE,
- "embedding_engine": app.state.config.RAG_EMBEDDING_ENGINE,
- "embedding_model": app.state.config.RAG_EMBEDDING_MODEL,
- "reranking_model": app.state.config.RAG_RERANKING_MODEL,
- "openai_batch_size": app.state.config.RAG_EMBEDDING_OPENAI_BATCH_SIZE,
- }
- @app.get("/embedding")
- async def get_embedding_config(user=Depends(get_admin_user)):
- return {
- "status": True,
- "embedding_engine": app.state.config.RAG_EMBEDDING_ENGINE,
- "embedding_model": app.state.config.RAG_EMBEDDING_MODEL,
- "openai_config": {
- "url": app.state.config.OPENAI_API_BASE_URL,
- "key": app.state.config.OPENAI_API_KEY,
- "batch_size": app.state.config.RAG_EMBEDDING_OPENAI_BATCH_SIZE,
- },
- }
- @app.get("/reranking")
- async def get_reraanking_config(user=Depends(get_admin_user)):
- return {
- "status": True,
- "reranking_model": app.state.config.RAG_RERANKING_MODEL,
- }
- class OpenAIConfigForm(BaseModel):
- url: str
- key: str
- batch_size: Optional[int] = None
- class EmbeddingModelUpdateForm(BaseModel):
- openai_config: Optional[OpenAIConfigForm] = None
- embedding_engine: str
- embedding_model: str
- @app.post("/embedding/update")
- async def update_embedding_config(
- form_data: EmbeddingModelUpdateForm, user=Depends(get_admin_user)
- ):
- log.info(
- f"Updating embedding model: {app.state.config.RAG_EMBEDDING_MODEL} to {form_data.embedding_model}"
- )
- try:
- app.state.config.RAG_EMBEDDING_ENGINE = form_data.embedding_engine
- app.state.config.RAG_EMBEDDING_MODEL = form_data.embedding_model
- if app.state.config.RAG_EMBEDDING_ENGINE in ["ollama", "openai"]:
- if form_data.openai_config is not None:
- app.state.config.OPENAI_API_BASE_URL = form_data.openai_config.url
- app.state.config.OPENAI_API_KEY = form_data.openai_config.key
- app.state.config.RAG_EMBEDDING_OPENAI_BATCH_SIZE = (
- form_data.openai_config.batch_size
- if form_data.openai_config.batch_size
- else 1
- )
- update_embedding_model(app.state.config.RAG_EMBEDDING_MODEL)
- app.state.EMBEDDING_FUNCTION = get_embedding_function(
- app.state.config.RAG_EMBEDDING_ENGINE,
- app.state.config.RAG_EMBEDDING_MODEL,
- app.state.sentence_transformer_ef,
- app.state.config.OPENAI_API_KEY,
- app.state.config.OPENAI_API_BASE_URL,
- app.state.config.RAG_EMBEDDING_OPENAI_BATCH_SIZE,
- )
- return {
- "status": True,
- "embedding_engine": app.state.config.RAG_EMBEDDING_ENGINE,
- "embedding_model": app.state.config.RAG_EMBEDDING_MODEL,
- "openai_config": {
- "url": app.state.config.OPENAI_API_BASE_URL,
- "key": app.state.config.OPENAI_API_KEY,
- "batch_size": app.state.config.RAG_EMBEDDING_OPENAI_BATCH_SIZE,
- },
- }
- except Exception as e:
- log.exception(f"Problem updating embedding model: {e}")
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=ERROR_MESSAGES.DEFAULT(e),
- )
- class RerankingModelUpdateForm(BaseModel):
- reranking_model: str
- @app.post("/reranking/update")
- async def update_reranking_config(
- form_data: RerankingModelUpdateForm, user=Depends(get_admin_user)
- ):
- log.info(
- f"Updating reranking model: {app.state.config.RAG_RERANKING_MODEL} to {form_data.reranking_model}"
- )
- try:
- app.state.config.RAG_RERANKING_MODEL = form_data.reranking_model
- update_reranking_model(app.state.config.RAG_RERANKING_MODEL), True
- return {
- "status": True,
- "reranking_model": app.state.config.RAG_RERANKING_MODEL,
- }
- except Exception as e:
- log.exception(f"Problem updating reranking model: {e}")
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=ERROR_MESSAGES.DEFAULT(e),
- )
- @app.get("/config")
- async def get_rag_config(user=Depends(get_admin_user)):
- return {
- "status": True,
- "pdf_extract_images": app.state.config.PDF_EXTRACT_IMAGES,
- "chunk": {
- "chunk_size": app.state.config.CHUNK_SIZE,
- "chunk_overlap": app.state.config.CHUNK_OVERLAP,
- },
- "youtube": {
- "language": app.state.config.YOUTUBE_LOADER_LANGUAGE,
- "translation": app.state.YOUTUBE_LOADER_TRANSLATION,
- },
- "web": {
- "ssl_verification": app.state.config.ENABLE_RAG_WEB_LOADER_SSL_VERIFICATION,
- "search": {
- "enabled": app.state.config.ENABLE_RAG_WEB_SEARCH,
- "engine": app.state.config.RAG_WEB_SEARCH_ENGINE,
- "searxng_query_url": app.state.config.SEARXNG_QUERY_URL,
- "google_pse_api_key": app.state.config.GOOGLE_PSE_API_KEY,
- "google_pse_engine_id": app.state.config.GOOGLE_PSE_ENGINE_ID,
- "brave_search_api_key": app.state.config.BRAVE_SEARCH_API_KEY,
- "serpstack_api_key": app.state.config.SERPSTACK_API_KEY,
- "serpstack_https": app.state.config.SERPSTACK_HTTPS,
- "serper_api_key": app.state.config.SERPER_API_KEY,
- "serply_api_key": app.state.config.SERPLY_API_KEY,
- "tavily_api_key": app.state.config.TAVILY_API_KEY,
- "result_count": app.state.config.RAG_WEB_SEARCH_RESULT_COUNT,
- "concurrent_requests": app.state.config.RAG_WEB_SEARCH_CONCURRENT_REQUESTS,
- },
- },
- }
- class ChunkParamUpdateForm(BaseModel):
- chunk_size: int
- chunk_overlap: int
- class YoutubeLoaderConfig(BaseModel):
- language: List[str]
- translation: Optional[str] = None
- class WebSearchConfig(BaseModel):
- enabled: bool
- engine: Optional[str] = None
- searxng_query_url: Optional[str] = None
- google_pse_api_key: Optional[str] = None
- google_pse_engine_id: Optional[str] = None
- brave_search_api_key: Optional[str] = None
- serpstack_api_key: Optional[str] = None
- serpstack_https: Optional[bool] = None
- serper_api_key: Optional[str] = None
- serply_api_key: Optional[str] = None
- tavily_api_key: Optional[str] = None
- result_count: Optional[int] = None
- concurrent_requests: Optional[int] = None
- class WebConfig(BaseModel):
- search: WebSearchConfig
- web_loader_ssl_verification: Optional[bool] = None
- class ConfigUpdateForm(BaseModel):
- pdf_extract_images: Optional[bool] = None
- chunk: Optional[ChunkParamUpdateForm] = None
- youtube: Optional[YoutubeLoaderConfig] = None
- web: Optional[WebConfig] = None
- @app.post("/config/update")
- async def update_rag_config(form_data: ConfigUpdateForm, user=Depends(get_admin_user)):
- app.state.config.PDF_EXTRACT_IMAGES = (
- form_data.pdf_extract_images
- if form_data.pdf_extract_images is not None
- else app.state.config.PDF_EXTRACT_IMAGES
- )
- if form_data.chunk is not None:
- app.state.config.CHUNK_SIZE = form_data.chunk.chunk_size
- app.state.config.CHUNK_OVERLAP = form_data.chunk.chunk_overlap
- if form_data.youtube is not None:
- app.state.config.YOUTUBE_LOADER_LANGUAGE = form_data.youtube.language
- app.state.YOUTUBE_LOADER_TRANSLATION = form_data.youtube.translation
- if form_data.web is not None:
- app.state.config.ENABLE_RAG_WEB_LOADER_SSL_VERIFICATION = (
- form_data.web.web_loader_ssl_verification
- )
- app.state.config.ENABLE_RAG_WEB_SEARCH = form_data.web.search.enabled
- app.state.config.RAG_WEB_SEARCH_ENGINE = form_data.web.search.engine
- app.state.config.SEARXNG_QUERY_URL = form_data.web.search.searxng_query_url
- app.state.config.GOOGLE_PSE_API_KEY = form_data.web.search.google_pse_api_key
- app.state.config.GOOGLE_PSE_ENGINE_ID = (
- form_data.web.search.google_pse_engine_id
- )
- app.state.config.BRAVE_SEARCH_API_KEY = (
- form_data.web.search.brave_search_api_key
- )
- app.state.config.SERPSTACK_API_KEY = form_data.web.search.serpstack_api_key
- app.state.config.SERPSTACK_HTTPS = form_data.web.search.serpstack_https
- app.state.config.SERPER_API_KEY = form_data.web.search.serper_api_key
- app.state.config.SERPLY_API_KEY = form_data.web.search.serply_api_key
- app.state.config.TAVILY_API_KEY = form_data.web.search.tavily_api_key
- app.state.config.RAG_WEB_SEARCH_RESULT_COUNT = form_data.web.search.result_count
- app.state.config.RAG_WEB_SEARCH_CONCURRENT_REQUESTS = (
- form_data.web.search.concurrent_requests
- )
- return {
- "status": True,
- "pdf_extract_images": app.state.config.PDF_EXTRACT_IMAGES,
- "chunk": {
- "chunk_size": app.state.config.CHUNK_SIZE,
- "chunk_overlap": app.state.config.CHUNK_OVERLAP,
- },
- "youtube": {
- "language": app.state.config.YOUTUBE_LOADER_LANGUAGE,
- "translation": app.state.YOUTUBE_LOADER_TRANSLATION,
- },
- "web": {
- "ssl_verification": app.state.config.ENABLE_RAG_WEB_LOADER_SSL_VERIFICATION,
- "search": {
- "enabled": app.state.config.ENABLE_RAG_WEB_SEARCH,
- "engine": app.state.config.RAG_WEB_SEARCH_ENGINE,
- "searxng_query_url": app.state.config.SEARXNG_QUERY_URL,
- "google_pse_api_key": app.state.config.GOOGLE_PSE_API_KEY,
- "google_pse_engine_id": app.state.config.GOOGLE_PSE_ENGINE_ID,
- "brave_search_api_key": app.state.config.BRAVE_SEARCH_API_KEY,
- "serpstack_api_key": app.state.config.SERPSTACK_API_KEY,
- "serpstack_https": app.state.config.SERPSTACK_HTTPS,
- "serper_api_key": app.state.config.SERPER_API_KEY,
- "serply_api_key": app.state.config.SERPLY_API_KEY,
- "tavily_api_key": app.state.config.TAVILY_API_KEY,
- "result_count": app.state.config.RAG_WEB_SEARCH_RESULT_COUNT,
- "concurrent_requests": app.state.config.RAG_WEB_SEARCH_CONCURRENT_REQUESTS,
- },
- },
- }
- @app.get("/template")
- async def get_rag_template(user=Depends(get_current_user)):
- return {
- "status": True,
- "template": app.state.config.RAG_TEMPLATE,
- }
- @app.get("/query/settings")
- async def get_query_settings(user=Depends(get_admin_user)):
- return {
- "status": True,
- "template": app.state.config.RAG_TEMPLATE,
- "k": app.state.config.TOP_K,
- "r": app.state.config.RELEVANCE_THRESHOLD,
- "hybrid": app.state.config.ENABLE_RAG_HYBRID_SEARCH,
- }
- class QuerySettingsForm(BaseModel):
- k: Optional[int] = None
- r: Optional[float] = None
- template: Optional[str] = None
- hybrid: Optional[bool] = None
- @app.post("/query/settings/update")
- async def update_query_settings(
- form_data: QuerySettingsForm, user=Depends(get_admin_user)
- ):
- app.state.config.RAG_TEMPLATE = (
- form_data.template if form_data.template else RAG_TEMPLATE
- )
- app.state.config.TOP_K = form_data.k if form_data.k else 4
- app.state.config.RELEVANCE_THRESHOLD = form_data.r if form_data.r else 0.0
- app.state.config.ENABLE_RAG_HYBRID_SEARCH = (
- form_data.hybrid if form_data.hybrid else False
- )
- return {
- "status": True,
- "template": app.state.config.RAG_TEMPLATE,
- "k": app.state.config.TOP_K,
- "r": app.state.config.RELEVANCE_THRESHOLD,
- "hybrid": app.state.config.ENABLE_RAG_HYBRID_SEARCH,
- }
- class QueryDocForm(BaseModel):
- collection_name: str
- query: str
- k: Optional[int] = None
- r: Optional[float] = None
- hybrid: Optional[bool] = None
- @app.post("/query/doc")
- def query_doc_handler(
- form_data: QueryDocForm,
- user=Depends(get_current_user),
- ):
- try:
- if app.state.config.ENABLE_RAG_HYBRID_SEARCH:
- return query_doc_with_hybrid_search(
- collection_name=form_data.collection_name,
- query=form_data.query,
- embedding_function=app.state.EMBEDDING_FUNCTION,
- k=form_data.k if form_data.k else app.state.config.TOP_K,
- reranking_function=app.state.sentence_transformer_rf,
- r=(
- form_data.r if form_data.r else app.state.config.RELEVANCE_THRESHOLD
- ),
- )
- else:
- return query_doc(
- collection_name=form_data.collection_name,
- query=form_data.query,
- embedding_function=app.state.EMBEDDING_FUNCTION,
- k=form_data.k if form_data.k else app.state.config.TOP_K,
- )
- except Exception as e:
- log.exception(e)
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail=ERROR_MESSAGES.DEFAULT(e),
- )
- class QueryCollectionsForm(BaseModel):
- collection_names: List[str]
- query: str
- k: Optional[int] = None
- r: Optional[float] = None
- hybrid: Optional[bool] = None
- @app.post("/query/collection")
- def query_collection_handler(
- form_data: QueryCollectionsForm,
- user=Depends(get_current_user),
- ):
- try:
- if app.state.config.ENABLE_RAG_HYBRID_SEARCH:
- return query_collection_with_hybrid_search(
- collection_names=form_data.collection_names,
- query=form_data.query,
- embedding_function=app.state.EMBEDDING_FUNCTION,
- k=form_data.k if form_data.k else app.state.config.TOP_K,
- reranking_function=app.state.sentence_transformer_rf,
- r=(
- form_data.r if form_data.r else app.state.config.RELEVANCE_THRESHOLD
- ),
- )
- else:
- return query_collection(
- collection_names=form_data.collection_names,
- query=form_data.query,
- embedding_function=app.state.EMBEDDING_FUNCTION,
- k=form_data.k if form_data.k else app.state.config.TOP_K,
- )
- except Exception as e:
- log.exception(e)
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail=ERROR_MESSAGES.DEFAULT(e),
- )
- @app.post("/youtube")
- def store_youtube_video(form_data: UrlForm, user=Depends(get_current_user)):
- try:
- loader = YoutubeLoader.from_youtube_url(
- form_data.url,
- add_video_info=True,
- language=app.state.config.YOUTUBE_LOADER_LANGUAGE,
- translation=app.state.YOUTUBE_LOADER_TRANSLATION,
- )
- data = loader.load()
- collection_name = form_data.collection_name
- if collection_name == "":
- collection_name = calculate_sha256_string(form_data.url)[:63]
- store_data_in_vector_db(data, collection_name, overwrite=True)
- return {
- "status": True,
- "collection_name": collection_name,
- "filename": form_data.url,
- }
- except Exception as e:
- log.exception(e)
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail=ERROR_MESSAGES.DEFAULT(e),
- )
- @app.post("/web")
- def store_web(form_data: UrlForm, user=Depends(get_current_user)):
- # "https://www.gutenberg.org/files/1727/1727-h/1727-h.htm"
- try:
- loader = get_web_loader(
- form_data.url,
- verify_ssl=app.state.config.ENABLE_RAG_WEB_LOADER_SSL_VERIFICATION,
- )
- data = loader.load()
- collection_name = form_data.collection_name
- if collection_name == "":
- collection_name = calculate_sha256_string(form_data.url)[:63]
- store_data_in_vector_db(data, collection_name, overwrite=True)
- return {
- "status": True,
- "collection_name": collection_name,
- "filename": form_data.url,
- }
- except Exception as e:
- log.exception(e)
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail=ERROR_MESSAGES.DEFAULT(e),
- )
- def get_web_loader(url: Union[str, Sequence[str]], verify_ssl: bool = True):
- # Check if the URL is valid
- if not validate_url(url):
- raise ValueError(ERROR_MESSAGES.INVALID_URL)
- return SafeWebBaseLoader(
- url,
- verify_ssl=verify_ssl,
- requests_per_second=RAG_WEB_SEARCH_CONCURRENT_REQUESTS,
- continue_on_failure=True,
- )
- def validate_url(url: Union[str, Sequence[str]]):
- if isinstance(url, str):
- if isinstance(validators.url(url), validators.ValidationError):
- raise ValueError(ERROR_MESSAGES.INVALID_URL)
- if not ENABLE_RAG_LOCAL_WEB_FETCH:
- # Local web fetch is disabled, filter out any URLs that resolve to private IP addresses
- parsed_url = urllib.parse.urlparse(url)
- # Get IPv4 and IPv6 addresses
- ipv4_addresses, ipv6_addresses = resolve_hostname(parsed_url.hostname)
- # Check if any of the resolved addresses are private
- # This is technically still vulnerable to DNS rebinding attacks, as we don't control WebBaseLoader
- for ip in ipv4_addresses:
- if validators.ipv4(ip, private=True):
- raise ValueError(ERROR_MESSAGES.INVALID_URL)
- for ip in ipv6_addresses:
- if validators.ipv6(ip, private=True):
- raise ValueError(ERROR_MESSAGES.INVALID_URL)
- return True
- elif isinstance(url, Sequence):
- return all(validate_url(u) for u in url)
- else:
- return False
- def resolve_hostname(hostname):
- # Get address information
- addr_info = socket.getaddrinfo(hostname, None)
- # Extract IP addresses from address information
- ipv4_addresses = [info[4][0] for info in addr_info if info[0] == socket.AF_INET]
- ipv6_addresses = [info[4][0] for info in addr_info if info[0] == socket.AF_INET6]
- return ipv4_addresses, ipv6_addresses
- def search_web(engine: str, query: str) -> list[SearchResult]:
- """Search the web using a search engine and return the results as a list of SearchResult objects.
- Will look for a search engine API key in environment variables in the following order:
- - SEARXNG_QUERY_URL
- - GOOGLE_PSE_API_KEY + GOOGLE_PSE_ENGINE_ID
- - BRAVE_SEARCH_API_KEY
- - SERPSTACK_API_KEY
- - SERPER_API_KEY
- - SERPLY_API_KEY
- - TAVILY_API_KEY
- Args:
- query (str): The query to search for
- """
- # TODO: add playwright to search the web
- if engine == "searxng":
- if app.state.config.SEARXNG_QUERY_URL:
- return search_searxng(
- app.state.config.SEARXNG_QUERY_URL,
- query,
- app.state.config.RAG_WEB_SEARCH_RESULT_COUNT,
- )
- else:
- raise Exception("No SEARXNG_QUERY_URL found in environment variables")
- elif engine == "google_pse":
- if (
- app.state.config.GOOGLE_PSE_API_KEY
- and app.state.config.GOOGLE_PSE_ENGINE_ID
- ):
- return search_google_pse(
- app.state.config.GOOGLE_PSE_API_KEY,
- app.state.config.GOOGLE_PSE_ENGINE_ID,
- query,
- app.state.config.RAG_WEB_SEARCH_RESULT_COUNT,
- )
- else:
- raise Exception(
- "No GOOGLE_PSE_API_KEY or GOOGLE_PSE_ENGINE_ID found in environment variables"
- )
- elif engine == "brave":
- if app.state.config.BRAVE_SEARCH_API_KEY:
- return search_brave(
- app.state.config.BRAVE_SEARCH_API_KEY,
- query,
- app.state.config.RAG_WEB_SEARCH_RESULT_COUNT,
- )
- else:
- raise Exception("No BRAVE_SEARCH_API_KEY found in environment variables")
- elif engine == "serpstack":
- if app.state.config.SERPSTACK_API_KEY:
- return search_serpstack(
- app.state.config.SERPSTACK_API_KEY,
- query,
- app.state.config.RAG_WEB_SEARCH_RESULT_COUNT,
- https_enabled=app.state.config.SERPSTACK_HTTPS,
- )
- else:
- raise Exception("No SERPSTACK_API_KEY found in environment variables")
- elif engine == "serper":
- if app.state.config.SERPER_API_KEY:
- return search_serper(
- app.state.config.SERPER_API_KEY,
- query,
- app.state.config.RAG_WEB_SEARCH_RESULT_COUNT,
- )
- else:
- raise Exception("No SERPER_API_KEY found in environment variables")
- elif engine == "serply":
- if app.state.config.SERPLY_API_KEY:
- return search_serply(
- app.state.config.SERPLY_API_KEY,
- query,
- app.state.config.RAG_WEB_SEARCH_RESULT_COUNT,
- )
- else:
- raise Exception("No SERPLY_API_KEY found in environment variables")
- elif engine == "duckduckgo":
- return search_duckduckgo(query, app.state.config.RAG_WEB_SEARCH_RESULT_COUNT)
- elif engine == "tavily":
- if app.state.config.TAVILY_API_KEY:
- return search_tavily(
- app.state.config.TAVILY_API_KEY,
- query,
- app.state.config.RAG_WEB_SEARCH_RESULT_COUNT,
- )
- else:
- raise Exception("No TAVILY_API_KEY found in environment variables")
- else:
- raise Exception("No search engine API key found in environment variables")
- @app.post("/web/search")
- def store_web_search(form_data: SearchForm, user=Depends(get_current_user)):
- try:
- logging.info(
- f"trying to web search with {app.state.config.RAG_WEB_SEARCH_ENGINE, form_data.query}"
- )
- web_results = search_web(
- app.state.config.RAG_WEB_SEARCH_ENGINE, form_data.query
- )
- except Exception as e:
- log.exception(e)
- print(e)
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail=ERROR_MESSAGES.WEB_SEARCH_ERROR(e),
- )
- try:
- urls = [result.link for result in web_results]
- loader = get_web_loader(urls)
- data = loader.load()
- collection_name = form_data.collection_name
- if collection_name == "":
- collection_name = calculate_sha256_string(form_data.query)[:63]
- store_data_in_vector_db(data, collection_name, overwrite=True)
- return {
- "status": True,
- "collection_name": collection_name,
- "filenames": urls,
- }
- except Exception as e:
- log.exception(e)
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail=ERROR_MESSAGES.DEFAULT(e),
- )
- def store_data_in_vector_db(data, collection_name, overwrite: bool = False) -> bool:
- text_splitter = RecursiveCharacterTextSplitter(
- chunk_size=app.state.config.CHUNK_SIZE,
- chunk_overlap=app.state.config.CHUNK_OVERLAP,
- add_start_index=True,
- )
- docs = text_splitter.split_documents(data)
- if len(docs) > 0:
- log.info(f"store_data_in_vector_db {docs}")
- return store_docs_in_vector_db(docs, collection_name, overwrite), None
- else:
- raise ValueError(ERROR_MESSAGES.EMPTY_CONTENT)
- def store_text_in_vector_db(
- text, metadata, collection_name, overwrite: bool = False
- ) -> bool:
- text_splitter = RecursiveCharacterTextSplitter(
- chunk_size=app.state.config.CHUNK_SIZE,
- chunk_overlap=app.state.config.CHUNK_OVERLAP,
- add_start_index=True,
- )
- docs = text_splitter.create_documents([text], metadatas=[metadata])
- return store_docs_in_vector_db(docs, collection_name, overwrite)
- def store_docs_in_vector_db(docs, collection_name, overwrite: bool = False) -> bool:
- log.info(f"store_docs_in_vector_db {docs} {collection_name}")
- texts = [doc.page_content for doc in docs]
- metadatas = [doc.metadata for doc in docs]
- # ChromaDB does not like datetime formats
- # for meta-data so convert them to string.
- for metadata in metadatas:
- for key, value in metadata.items():
- if isinstance(value, datetime):
- metadata[key] = str(value)
- try:
- if overwrite:
- for collection in CHROMA_CLIENT.list_collections():
- if collection_name == collection.name:
- log.info(f"deleting existing collection {collection_name}")
- CHROMA_CLIENT.delete_collection(name=collection_name)
- collection = CHROMA_CLIENT.create_collection(name=collection_name)
- embedding_func = get_embedding_function(
- app.state.config.RAG_EMBEDDING_ENGINE,
- app.state.config.RAG_EMBEDDING_MODEL,
- app.state.sentence_transformer_ef,
- app.state.config.OPENAI_API_KEY,
- app.state.config.OPENAI_API_BASE_URL,
- app.state.config.RAG_EMBEDDING_OPENAI_BATCH_SIZE,
- )
- embedding_texts = list(map(lambda x: x.replace("\n", " "), texts))
- embeddings = embedding_func(embedding_texts)
- for batch in create_batches(
- api=CHROMA_CLIENT,
- ids=[str(uuid.uuid4()) for _ in texts],
- metadatas=metadatas,
- embeddings=embeddings,
- documents=texts,
- ):
- collection.add(*batch)
- return True
- except Exception as e:
- log.exception(e)
- if e.__class__.__name__ == "UniqueConstraintError":
- return True
- return False
- def get_loader(filename: str, file_content_type: str, file_path: str):
- file_ext = filename.split(".")[-1].lower()
- known_type = True
- known_source_ext = [
- "go",
- "py",
- "java",
- "sh",
- "bat",
- "ps1",
- "cmd",
- "js",
- "ts",
- "css",
- "cpp",
- "hpp",
- "h",
- "c",
- "cs",
- "sql",
- "log",
- "ini",
- "pl",
- "pm",
- "r",
- "dart",
- "dockerfile",
- "env",
- "php",
- "hs",
- "hsc",
- "lua",
- "nginxconf",
- "conf",
- "m",
- "mm",
- "plsql",
- "perl",
- "rb",
- "rs",
- "db2",
- "scala",
- "bash",
- "swift",
- "vue",
- "svelte",
- "msg",
- ]
- if file_ext == "pdf":
- loader = PyPDFLoader(
- file_path, extract_images=app.state.config.PDF_EXTRACT_IMAGES
- )
- elif file_ext == "csv":
- loader = CSVLoader(file_path)
- elif file_ext == "rst":
- loader = UnstructuredRSTLoader(file_path, mode="elements")
- elif file_ext == "xml":
- loader = UnstructuredXMLLoader(file_path)
- elif file_ext in ["htm", "html"]:
- loader = BSHTMLLoader(file_path, open_encoding="unicode_escape")
- elif file_ext == "md":
- loader = UnstructuredMarkdownLoader(file_path)
- elif file_content_type == "application/epub+zip":
- loader = UnstructuredEPubLoader(file_path)
- elif (
- file_content_type
- == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
- or file_ext in ["doc", "docx"]
- ):
- loader = Docx2txtLoader(file_path)
- elif file_content_type in [
- "application/vnd.ms-excel",
- "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
- ] or file_ext in ["xls", "xlsx"]:
- loader = UnstructuredExcelLoader(file_path)
- elif file_content_type in [
- "application/vnd.ms-powerpoint",
- "application/vnd.openxmlformats-officedocument.presentationml.presentation",
- ] or file_ext in ["ppt", "pptx"]:
- loader = UnstructuredPowerPointLoader(file_path)
- elif file_ext == "msg":
- loader = OutlookMessageLoader(file_path)
- elif file_ext in known_source_ext or (
- file_content_type and file_content_type.find("text/") >= 0
- ):
- loader = TextLoader(file_path, autodetect_encoding=True)
- else:
- loader = TextLoader(file_path, autodetect_encoding=True)
- known_type = False
- return loader, known_type
- @app.post("/doc")
- def store_doc(
- collection_name: Optional[str] = Form(None),
- file: UploadFile = File(...),
- user=Depends(get_current_user),
- ):
- # "https://www.gutenberg.org/files/1727/1727-h/1727-h.htm"
- log.info(f"file.content_type: {file.content_type}")
- try:
- unsanitized_filename = file.filename
- filename = os.path.basename(unsanitized_filename)
- file_path = f"{UPLOAD_DIR}/{filename}"
- contents = file.file.read()
- with open(file_path, "wb") as f:
- f.write(contents)
- f.close()
- f = open(file_path, "rb")
- if collection_name == None:
- collection_name = calculate_sha256(f)[:63]
- f.close()
- loader, known_type = get_loader(filename, file.content_type, file_path)
- data = loader.load()
- try:
- result = store_data_in_vector_db(data, collection_name)
- if result:
- return {
- "status": True,
- "collection_name": collection_name,
- "filename": filename,
- "known_type": known_type,
- }
- except Exception as e:
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=e,
- )
- except Exception as e:
- log.exception(e)
- if "No pandoc was found" in str(e):
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail=ERROR_MESSAGES.PANDOC_NOT_INSTALLED,
- )
- else:
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail=ERROR_MESSAGES.DEFAULT(e),
- )
- class TextRAGForm(BaseModel):
- name: str
- content: str
- collection_name: Optional[str] = None
- @app.post("/text")
- def store_text(
- form_data: TextRAGForm,
- user=Depends(get_current_user),
- ):
- collection_name = form_data.collection_name
- if collection_name == None:
- collection_name = calculate_sha256_string(form_data.content)
- result = store_text_in_vector_db(
- form_data.content,
- metadata={"name": form_data.name, "created_by": user.id},
- collection_name=collection_name,
- )
- if result:
- return {"status": True, "collection_name": collection_name}
- else:
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=ERROR_MESSAGES.DEFAULT(),
- )
- @app.get("/scan")
- def scan_docs_dir(user=Depends(get_admin_user)):
- for path in Path(DOCS_DIR).rglob("./**/*"):
- try:
- if path.is_file() and not path.name.startswith("."):
- tags = extract_folders_after_data_docs(path)
- filename = path.name
- file_content_type = mimetypes.guess_type(path)
- f = open(path, "rb")
- collection_name = calculate_sha256(f)[:63]
- f.close()
- loader, known_type = get_loader(
- filename, file_content_type[0], str(path)
- )
- data = loader.load()
- try:
- result = store_data_in_vector_db(data, collection_name)
- if result:
- sanitized_filename = sanitize_filename(filename)
- doc = Documents.get_doc_by_name(sanitized_filename)
- if doc == None:
- doc = Documents.insert_new_doc(
- user.id,
- DocumentForm(
- **{
- "name": sanitized_filename,
- "title": filename,
- "collection_name": collection_name,
- "filename": filename,
- "content": (
- json.dumps(
- {
- "tags": list(
- map(
- lambda name: {"name": name},
- tags,
- )
- )
- }
- )
- if len(tags)
- else "{}"
- ),
- }
- ),
- )
- except Exception as e:
- log.exception(e)
- pass
- except Exception as e:
- log.exception(e)
- return True
- @app.get("/reset/db")
- def reset_vector_db(user=Depends(get_admin_user)):
- CHROMA_CLIENT.reset()
- @app.get("/reset/uploads")
- def reset_upload_dir(user=Depends(get_admin_user)) -> bool:
- folder = f"{UPLOAD_DIR}"
- try:
- # Check if the directory exists
- if os.path.exists(folder):
- # Iterate over all the files and directories in the specified directory
- for filename in os.listdir(folder):
- file_path = os.path.join(folder, filename)
- try:
- if os.path.isfile(file_path) or os.path.islink(file_path):
- os.unlink(file_path) # Remove the file or link
- elif os.path.isdir(file_path):
- shutil.rmtree(file_path) # Remove the directory
- except Exception as e:
- print(f"Failed to delete {file_path}. Reason: {e}")
- else:
- print(f"The directory {folder} does not exist")
- except Exception as e:
- print(f"Failed to process the directory {folder}. Reason: {e}")
- return True
- @app.get("/reset")
- def reset(user=Depends(get_admin_user)) -> bool:
- folder = f"{UPLOAD_DIR}"
- for filename in os.listdir(folder):
- file_path = os.path.join(folder, filename)
- try:
- if os.path.isfile(file_path) or os.path.islink(file_path):
- os.unlink(file_path)
- elif os.path.isdir(file_path):
- shutil.rmtree(file_path)
- except Exception as e:
- log.error("Failed to delete %s. Reason: %s" % (file_path, e))
- try:
- CHROMA_CLIENT.reset()
- except Exception as e:
- log.exception(e)
- return True
- class SafeWebBaseLoader(WebBaseLoader):
- """WebBaseLoader with enhanced error handling for URLs."""
- def lazy_load(self) -> Iterator[Document]:
- """Lazy load text from the url(s) in web_path with error handling."""
- for path in self.web_paths:
- try:
- soup = self._scrape(path, bs_kwargs=self.bs_kwargs)
- text = soup.get_text(**self.bs_get_text_kwargs)
- # Build metadata
- metadata = {"source": path}
- if title := soup.find("title"):
- metadata["title"] = title.get_text()
- if description := soup.find("meta", attrs={"name": "description"}):
- metadata["description"] = description.get(
- "content", "No description found."
- )
- if html := soup.find("html"):
- metadata["language"] = html.get("lang", "No language found.")
- yield Document(page_content=text, metadata=metadata)
- except Exception as e:
- # Log the error and continue with the next URL
- log.error(f"Error loading {path}: {e}")
- if ENV == "dev":
- @app.get("/ef")
- async def get_embeddings():
- return {"result": app.state.EMBEDDING_FUNCTION("hello world")}
- @app.get("/ef/{text}")
- async def get_embeddings_text(text: str):
- return {"result": app.state.EMBEDDING_FUNCTION(text)}
|