123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- #!/usr/bin/env python3
import glob
import os
from multiprocessing import Pool
from typing import List, Optional

from langchain.docstore.document import Document
from langchain.document_loaders import (
    CSVLoader,
    EverNoteLoader,
    PyMuPDFLoader,
    TextLoader,
    UnstructuredEmailLoader,
    UnstructuredEPubLoader,
    UnstructuredHTMLLoader,
    UnstructuredMarkdownLoader,
    UnstructuredODTLoader,
    UnstructuredPowerPointLoader,
    UnstructuredWordDocumentLoader,
)
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from tqdm import tqdm

from constants import CHROMA_SETTINGS
# Runtime configuration: each value can be overridden through the environment;
# the second argument is the default used when the variable is not set.
persist_directory = os.getenv('PERSIST_DIRECTORY', 'db')
source_directory = os.getenv('SOURCE_DIRECTORY', 'source_documents')
embeddings_model_name = os.getenv('EMBEDDINGS_MODEL_NAME', 'all-MiniLM-L6-v2')

# Text-splitter settings (fixed, not read from the environment).
chunk_size, chunk_overlap = 500, 50
# Custom document loaders
class MyElmLoader(UnstructuredEmailLoader):
    """Email loader that falls back to the text/plain part when needed.

    When the parent loader raises a ``ValueError`` reporting that no
    ``text/html`` content was found, loading is retried with the
    ``text/plain`` content instead.
    """

    def load(self) -> List[Document]:
        """Load the email, retrying as plain text if no HTML body is found.

        Returns:
            The documents produced by ``UnstructuredEmailLoader.load``.

        Raises:
            Exception: Any loading error, re-raised with the file path
                prepended to its message. The original exception type is kept
                when it can be built from a single message; otherwise a
                ``RuntimeError`` is raised. The original error is chained as
                ``__cause__`` in both cases.
        """
        try:
            try:
                doc = UnstructuredEmailLoader.load(self)
            except ValueError as e:
                if 'text/html content not found in email' not in str(e):
                    raise
                # Try plain text
                self.unstructured_kwargs["content_source"] = "text/plain"
                doc = UnstructuredEmailLoader.load(self)
        except Exception as e:
            # Add file_path to exception message
            message = f"{self.file_path}: {e}"
            try:
                wrapped = type(e)(message)
            except Exception:
                # Some exception types (e.g. UnicodeDecodeError) cannot be
                # constructed from a single message; without this fallback the
                # resulting TypeError would mask the real loading error.
                wrapped = RuntimeError(message)
            raise wrapped from e
        return doc
# Loader registry: every supported file extension (lowercase, with leading dot)
# maps to a ``(loader class, constructor keyword arguments)`` pair.
LOADER_MAPPING = {
    ".csv": (CSVLoader, {}),
    # NOTE: an alternative ".docx" entry based on Docx2txtLoader was disabled here.
    ".doc": (UnstructuredWordDocumentLoader, {}),
    ".docx": (UnstructuredWordDocumentLoader, {}),
    ".enex": (EverNoteLoader, {}),
    ".eml": (MyElmLoader, {}),
    ".epub": (UnstructuredEPubLoader, {}),
    ".html": (UnstructuredHTMLLoader, {}),
    ".md": (UnstructuredMarkdownLoader, {}),
    ".odt": (UnstructuredODTLoader, {}),
    ".pdf": (PyMuPDFLoader, {}),
    ".ppt": (UnstructuredPowerPointLoader, {}),
    ".pptx": (UnstructuredPowerPointLoader, {}),
    ".txt": (TextLoader, {"encoding": "utf8"}),
    # Register further extensions and their loaders here as needed.
}
def load_single_document(file_path: str) -> List[Document]:
    """Load one file with the loader registered for its extension.

    Empty files, files with an unsupported extension and files whose loader
    fails are reported on stdout and skipped.

    Args:
        file_path: Path of the file to load.

    Returns:
        The documents produced by the loader, or an empty list when the file
        was skipped.
    """
    if os.path.getsize(file_path) == 0:
        print(f"Empty file {file_path}. Ignoring it.")
        return []

    # Match case-insensitively so that e.g. "REPORT.PDF" uses the ".pdf" loader.
    ext = os.path.splitext(file_path)[1].lower()
    if ext not in LOADER_MAPPING:
        print(f"Unsupported file {file_path}. Ignoring it.")
        return []

    loader_class, loader_args = LOADER_MAPPING[ext]
    try:
        return loader_class(file_path, **loader_args).load()
    except Exception as exc:
        # Best effort: one unreadable file must not abort the whole run.
        # Catching Exception (rather than a bare ``except``) still lets
        # KeyboardInterrupt and SystemExit propagate.
        print(f"Corrupted file {file_path}. Ignoring it. Error: {exc}")
        return []
def load_documents(source_dir: str, ignored_files: Optional[List[str]] = None) -> List[Document]:
    """Load every supported document found under ``source_dir``.

    Files are discovered recursively, with one glob per extension registered
    in ``LOADER_MAPPING``, and loaded in parallel by a process pool.

    Args:
        source_dir: Root directory to search.
        ignored_files: Paths to skip, compared as plain strings against the
            paths returned by ``glob``. ``None`` (the default) skips nothing.

    Returns:
        All loaded documents, in the order the workers finish (which is not
        necessarily the order in which the files were found).
    """
    # A set keeps the membership test O(1) however many files are ignored.
    ignored = set(ignored_files or ())

    all_files = []
    for ext in LOADER_MAPPING:
        all_files.extend(
            glob.glob(os.path.join(source_dir, f"**/*{ext}"), recursive=True)
        )
    filtered_files = [file_path for file_path in all_files if file_path not in ignored]

    results = []
    with Pool(processes=os.cpu_count()) as pool:
        with tqdm(total=len(filtered_files), desc='Loading new documents', ncols=80) as pbar:
            for docs in pool.imap_unordered(load_single_document, filtered_files):
                # Skipped files yield nothing to add.
                if docs:
                    results.extend(docs)
                # Advance once per file so the bar reaches its total.
                pbar.update()
    return results
def process_documents(ignored_files: Optional[List[str]] = None) -> List[Document]:
    """Load the new source documents and split them into chunks.

    Args:
        ignored_files: Paths to skip, forwarded to ``load_documents``.
            ``None`` (the default) skips nothing.

    Returns:
        The chunks produced by ``RecursiveCharacterTextSplitter`` using the
        module-level ``chunk_size`` and ``chunk_overlap``.

    Raises:
        SystemExit: With exit code 0 when no documents were loaded.
    """
    print(f"Loading documents from {source_directory}")
    documents = load_documents(
        source_directory,
        ignored_files if ignored_files is not None else [],
    )
    if not documents:
        print("No new documents to load")
        # The ``exit`` builtin is injected by the ``site`` module and is not
        # available under ``python -S`` or in frozen builds; raising
        # SystemExit has the same effect and always works.
        raise SystemExit(0)
    print(f"Loaded {len(documents)} new documents from {source_directory}")
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    texts = text_splitter.split_documents(documents)
    # No length_function is passed, so the splitter measures chunk_size with
    # its default ``len`` — i.e. in characters, not tokens.
    print(f"Split into {len(texts)} chunks of text (max. {chunk_size} characters each)")
    return texts
def does_vectorstore_exist(persist_directory: str) -> bool:
    """
    Tell whether ``persist_directory`` already holds a populated vectorstore.

    The answer is True only when the ``index`` entry and both Chroma parquet
    files exist and the index folder contains more than three ``.bin``/``.pkl``
    files.
    """
    required_paths = (
        os.path.join(persist_directory, 'index'),
        os.path.join(persist_directory, 'chroma-collections.parquet'),
        os.path.join(persist_directory, 'chroma-embeddings.parquet'),
    )
    if not all(os.path.exists(path) for path in required_paths):
        return False

    index_file_count = sum(
        len(glob.glob(os.path.join(persist_directory, pattern)))
        for pattern in ('index/*.bin', 'index/*.pkl')
    )
    # The store is treated as working only with more than three index files.
    return index_file_count > 3
def main():
    """Ingest the source documents into the local Chroma vectorstore.

    When ``does_vectorstore_exist`` reports a store in ``persist_directory``,
    new documents are appended to it and sources it already contains are
    skipped; otherwise a new store is created. The store is persisted in both
    cases.
    """
    # Create embeddings
    embeddings = HuggingFaceEmbeddings(model_name=embeddings_model_name)

    if does_vectorstore_exist(persist_directory):
        # Update and store locally vectorstore
        print(f"Appending to existing vectorstore at {persist_directory}")
        db = Chroma(
            persist_directory=persist_directory,
            embedding_function=embeddings,
            client_settings=CHROMA_SETTINGS,
        )
        collection = db.get()
        # Sources already in the store are passed as ignored files so they are
        # not ingested a second time.
        texts = process_documents([metadata['source'] for metadata in collection['metadatas']])
        print("Creating embeddings. May take some minutes...")
        db.add_documents(texts)
    else:
        # Create and store locally vectorstore
        print("Creating new vectorstore")
        texts = process_documents()
        print("Creating embeddings. May take some minutes...")
        # Pass the same client settings as the append branch so that both
        # paths configure the Chroma client identically.
        db = Chroma.from_documents(
            texts,
            embeddings,
            persist_directory=persist_directory,
            client_settings=CHROMA_SETTINGS,
        )
    db.persist()
    # Drop the reference to the store once it has been persisted.
    db = None
    print("Ingestion complete! You can now run privateGPT.py to query your documents")


if __name__ == "__main__":
    main()
|