ingest.py

#!/usr/bin/env python3
import os
import glob
from typing import List
from dotenv import load_dotenv
from multiprocessing import Pool
from tqdm import tqdm

from langchain.document_loaders import (
    CSVLoader,
    EverNoteLoader,
    PyMuPDFLoader,
    TextLoader,
    UnstructuredEmailLoader,
    UnstructuredEPubLoader,
    UnstructuredHTMLLoader,
    UnstructuredMarkdownLoader,
    UnstructuredODTLoader,
    UnstructuredPowerPointLoader,
    UnstructuredWordDocumentLoader,
)

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.docstore.document import Document

from constants import CHROMA_SETTINGS
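
# CHROMA_SETTINGS comes from a sibling constants.py that is not shown here.
# A minimal sketch of what it likely contains, inferred from the parquet
# files checked in does_vectorstore_exist() below (an assumption, not the
# verbatim module):
#
#   from chromadb.config import Settings
#
#   CHROMA_SETTINGS = Settings(
#       chroma_db_impl='duckdb+parquet',   # legacy on-disk backend
#       persist_directory='db',
#       anonymized_telemetry=False,
#   )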

load_dotenv()

# Load environment variables
persist_directory = os.environ.get('PERSIST_DIRECTORY', 'db')
source_directory = os.environ.get('SOURCE_DIRECTORY', 'source_documents')
embeddings_model_name = os.environ.get('EMBEDDINGS_MODEL_NAME', 'all-MiniLM-L6-v2')
chunk_size = 500
chunk_overlap = 50

# Custom document loaders
class MyElmLoader(UnstructuredEmailLoader):
    """Wrapper that falls back to text/plain when the default loader does not work"""

    def load(self) -> List[Document]:
        """Wrapper adding a fallback for .eml files without an HTML part"""
        try:
            try:
                doc = UnstructuredEmailLoader.load(self)
            except ValueError as e:
                if 'text/html content not found in email' in str(e):
                    # Try plain text
                    self.unstructured_kwargs["content_source"] = "text/plain"
                    doc = UnstructuredEmailLoader.load(self)
                else:
                    raise
        except Exception as e:
            # Add file_path to the exception message
            raise type(e)(f"{self.file_path}: {e}") from e

        return doc

# Map file extensions to document loaders and their arguments
LOADER_MAPPING = {
    ".csv": (CSVLoader, {}),
    # ".docx": (Docx2txtLoader, {}),
    ".doc": (UnstructuredWordDocumentLoader, {}),
    ".docx": (UnstructuredWordDocumentLoader, {}),
    ".enex": (EverNoteLoader, {}),
    ".eml": (MyElmLoader, {}),
    ".epub": (UnstructuredEPubLoader, {}),
    ".html": (UnstructuredHTMLLoader, {}),
    ".md": (UnstructuredMarkdownLoader, {}),
    ".odt": (UnstructuredODTLoader, {}),
    ".pdf": (PyMuPDFLoader, {}),
    ".ppt": (UnstructuredPowerPointLoader, {}),
    ".pptx": (UnstructuredPowerPointLoader, {}),
    ".txt": (TextLoader, {"encoding": "utf8"}),
    # Add more mappings for other file extensions and loaders as needed;
    # see the sketch below for an example
}
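
# For instance, reStructuredText support could be added with one more entry
# (hypothetical; UnstructuredRSTLoader would also need to be imported above):
#
#   ".rst": (UnstructuredRSTLoader, {}),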

def load_single_document(file_path: str) -> List[Document]:
    ext = "." + file_path.rsplit(".", 1)[-1]
    if ext in LOADER_MAPPING:
        loader_class, loader_args = LOADER_MAPPING[ext]
        loader = loader_class(file_path, **loader_args)
        return loader.load()

    raise ValueError(f"Unsupported file extension '{ext}'")

def load_documents(source_dir: str, ignored_files: List[str] = []) -> List[Document]:
    """
    Loads all documents from the source documents directory, ignoring specified files
    """
    all_files = []
    for ext in LOADER_MAPPING:
        all_files.extend(
            glob.glob(os.path.join(source_dir, f"**/*{ext}"), recursive=True)
        )
    filtered_files = [file_path for file_path in all_files if file_path not in ignored_files]

    with Pool(processes=os.cpu_count()) as pool:
        results = []
        with tqdm(total=len(filtered_files), desc='Loading new documents', ncols=80) as pbar:
            # imap_unordered yields results as workers finish, so document
            # order is not preserved; the progress bar ticks once per file
            for i, docs in enumerate(pool.imap_unordered(load_single_document, filtered_files)):
                results.extend(docs)
                pbar.update()

    return results

def process_documents(ignored_files: List[str] = []) -> List[Document]:
    """
    Load documents and split them into chunks
    """
    print(f"Loading documents from {source_directory}")
    documents = load_documents(source_directory, ignored_files)
    if not documents:
        print("No new documents to load")
        exit(0)
    print(f"Loaded {len(documents)} new documents from {source_directory}")
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    texts = text_splitter.split_documents(documents)
    print(f"Split into {len(texts)} chunks of text (max. {chunk_size} characters each)")
    return texts

def does_vectorstore_exist(persist_directory: str) -> bool:
    """
    Checks if a vectorstore exists (the on-disk layout of Chroma's
    duckdb+parquet backend)
    """
    if os.path.exists(os.path.join(persist_directory, 'index')):
        if os.path.exists(os.path.join(persist_directory, 'chroma-collections.parquet')) and os.path.exists(os.path.join(persist_directory, 'chroma-embeddings.parquet')):
            list_index_files = glob.glob(os.path.join(persist_directory, 'index/*.bin'))
            list_index_files += glob.glob(os.path.join(persist_directory, 'index/*.pkl'))
            # A working vectorstore has more than three index files
            if len(list_index_files) > 3:
                return True
    return False

def main():
    # Create embeddings
    embeddings = HuggingFaceEmbeddings(model_name=embeddings_model_name)

    if does_vectorstore_exist(persist_directory):
        # Update and persist the local vectorstore
        print(f"Appending to existing vectorstore at {persist_directory}")
        db = Chroma(persist_directory=persist_directory, embedding_function=embeddings, client_settings=CHROMA_SETTINGS)
        collection = db.get()
        texts = process_documents([metadata['source'] for metadata in collection['metadatas']])
        print("Creating embeddings. May take some minutes...")
        db.add_documents(texts)
    else:
        # Create and persist a new local vectorstore
        print("Creating new vectorstore")
        texts = process_documents()
        print("Creating embeddings. May take some minutes...")
        db = Chroma.from_documents(texts, embeddings, persist_directory=persist_directory, client_settings=CHROMA_SETTINGS)
    db.persist()
    db = None

    print("Ingestion complete! You can now run privateGPT.py to query your documents")


if __name__ == "__main__":
    main()
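
# Example invocation (a sketch; the values below are the script's own
# defaults, set via a .env file or exported environment variables):
#
#   $ cat .env
#   PERSIST_DIRECTORY=db
#   SOURCE_DIRECTORY=source_documents
#   EMBEDDINGS_MODEL_NAME=all-MiniLM-L6-v2
#   $ python ingest.py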