ingest.py

#!/usr/bin/env python3
"""Ingest documents from SOURCE_DIRECTORY into a persistent Chroma vectorstore."""
import os
import glob
from typing import List
from multiprocessing import Pool

from tqdm import tqdm
from langchain.document_loaders import (
    CSVLoader,
    EverNoteLoader,
    PyMuPDFLoader,
    TextLoader,
    UnstructuredEmailLoader,
    UnstructuredEPubLoader,
    UnstructuredHTMLLoader,
    UnstructuredMarkdownLoader,
    UnstructuredODTLoader,
    UnstructuredPowerPointLoader,
    UnstructuredWordDocumentLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.docstore.document import Document

from constants import CHROMA_SETTINGS
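
# CHROMA_SETTINGS comes from the companion constants.py module, which is not
# shown here. A minimal sketch of what it is assumed to contain (chromadb's
# legacy duckdb+parquet backend; adjust to your own setup):
#
#   from chromadb.config import Settings
#   CHROMA_SETTINGS = Settings(
#       chroma_db_impl='duckdb+parquet',
#       persist_directory=os.environ.get('PERSIST_DIRECTORY', 'db'),
#       anonymized_telemetry=False,
#   )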

# Load environment variables
persist_directory = os.environ.get('PERSIST_DIRECTORY', 'db')
source_directory = os.environ.get('SOURCE_DIRECTORY', 'source_documents')
embeddings_model_name = os.environ.get('EMBEDDINGS_MODEL_NAME', 'all-MiniLM-L6-v2')
chunk_size = 500
chunk_overlap = 50


# Custom document loaders
class MyElmLoader(UnstructuredEmailLoader):
    """Wrapper that falls back to text/plain when the default loader fails."""

    def load(self) -> List[Document]:
        """Add a plain-text fallback for .eml files without an HTML part."""
        try:
            try:
                doc = UnstructuredEmailLoader.load(self)
            except ValueError as e:
                if 'text/html content not found in email' in str(e):
                    # Retry with the plain-text part of the message
                    self.unstructured_kwargs["content_source"] = "text/plain"
                    doc = UnstructuredEmailLoader.load(self)
                else:
                    raise
        except Exception as e:
            # Add file_path to exception message
            raise type(e)(f"{self.file_path}: {e}") from e
        return doc


# Map file extensions to document loaders and their arguments
LOADER_MAPPING = {
    ".csv": (CSVLoader, {}),
    # ".docx": (Docx2txtLoader, {}),
    ".doc": (UnstructuredWordDocumentLoader, {}),
    ".docx": (UnstructuredWordDocumentLoader, {}),
    ".enex": (EverNoteLoader, {}),
    ".eml": (MyElmLoader, {}),
    ".epub": (UnstructuredEPubLoader, {}),
    ".html": (UnstructuredHTMLLoader, {}),
    ".md": (UnstructuredMarkdownLoader, {}),
    ".odt": (UnstructuredODTLoader, {}),
    ".pdf": (PyMuPDFLoader, {}),
    ".ppt": (UnstructuredPowerPointLoader, {}),
    ".pptx": (UnstructuredPowerPointLoader, {}),
    ".txt": (TextLoader, {"encoding": "utf8"}),
    # Add more mappings for other file extensions and loaders as needed
}


def load_single_document(file_path: str) -> List[Document]:
    if os.path.getsize(file_path) == 0:
        print(f"Empty file {file_path}. Ignoring it.")
        return []
    _, ext = os.path.splitext(file_path)
    if ext not in LOADER_MAPPING:
        print(f"Unsupported file {file_path}. Ignoring it.")
        return []
    loader_class, loader_args = LOADER_MAPPING[ext]
    try:
        loader = loader_class(file_path, **loader_args)
        return loader.load()
    except Exception:
        # Catch Exception rather than using a bare except, so KeyboardInterrupt still propagates
        print(f"Corrupted file {file_path}. Ignoring it.")
        return []


def load_documents(source_dir: str, ignored_files: List[str] = []) -> List[Document]:
    """
    Loads all documents from the source documents directory, ignoring specified files
    """
    all_files = []
    for ext in LOADER_MAPPING:
        all_files.extend(
            glob.glob(os.path.join(source_dir, f"**/*{ext}"), recursive=True)
        )
    filtered_files = [file_path for file_path in all_files if file_path not in ignored_files]

    with Pool(processes=os.cpu_count()) as pool:
        results = []
        with tqdm(total=len(filtered_files), desc='Loading new documents', ncols=80) as pbar:
            for docs in pool.imap_unordered(load_single_document, filtered_files):
                if docs:
                    results.extend(docs)
                pbar.update()
    return results


def process_documents(ignored_files: List[str] = []) -> List[Document]:
    """
    Load documents and split them into chunks
    """
    print(f"Loading documents from {source_directory}")
    documents = load_documents(source_directory, ignored_files)
    if not documents:
        print("No new documents to load")
        exit(0)
    print(f"Loaded {len(documents)} new documents from {source_directory}")
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    texts = text_splitter.split_documents(documents)
    print(f"Split into {len(texts)} chunks of text (max. {chunk_size} characters each)")
    return texts


def does_vectorstore_exist(persist_directory: str) -> bool:
    """
    Checks if vectorstore exists
    """
    if os.path.exists(os.path.join(persist_directory, 'index')):
        if os.path.exists(os.path.join(persist_directory, 'chroma-collections.parquet')) and os.path.exists(os.path.join(persist_directory, 'chroma-embeddings.parquet')):
            list_index_files = glob.glob(os.path.join(persist_directory, 'index/*.bin'))
            list_index_files += glob.glob(os.path.join(persist_directory, 'index/*.pkl'))
            # At least 3 documents are needed in a working vectorstore
            if len(list_index_files) > 3:
                return True
    return False
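
# Note: the parquet and index files checked above correspond to Chroma's legacy
# duckdb+parquet on-disk layout. Newer chromadb releases (0.4+) persist a
# chroma.sqlite3 file instead, so this check would need updating after an upgrade.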


def main():
    # Create embeddings
    embeddings = HuggingFaceEmbeddings(model_name=embeddings_model_name)

    if does_vectorstore_exist(persist_directory):
        # Update and store the local vectorstore
        print(f"Appending to existing vectorstore at {persist_directory}")
        db = Chroma(persist_directory=persist_directory, embedding_function=embeddings, client_settings=CHROMA_SETTINGS)
        collection = db.get()
        texts = process_documents([metadata['source'] for metadata in collection['metadatas']])
        print("Creating embeddings. May take some minutes...")
        db.add_documents(texts)
    else:
        # Create and store the local vectorstore
        print("Creating new vectorstore")
        texts = process_documents()
        print("Creating embeddings. May take some minutes...")
        db = Chroma.from_documents(texts, embeddings, persist_directory=persist_directory)
    db.persist()
    db = None

    print("Ingestion complete! You can now run privateGPT.py to query your documents")


if __name__ == "__main__":
    main()
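
# Example usage (a sketch; each environment variable falls back to the default
# defined at the top of this file when unset):
#
#   $ SOURCE_DIRECTORY=source_documents PERSIST_DIRECTORY=db python ingest.py
#
# To use a different embedding model, point EMBEDDINGS_MODEL_NAME at any
# sentence-transformers model name, e.g. "all-mpnet-base-v2".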