ingest.py

#!/usr/bin/env python3
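"""Ingest documents into a local Chroma vectorstore.

Reads files from SOURCE_DIRECTORY, splits them into chunks, embeds them with a
HuggingFace sentence-transformers model, and persists the result to
PERSIST_DIRECTORY for later querying.
"""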

import os
import glob
from typing import List
from multiprocessing import Pool

from tqdm import tqdm
from langchain.document_loaders import (
    CSVLoader,
    EverNoteLoader,
    PyMuPDFLoader,
    TextLoader,
    UnstructuredEmailLoader,
    UnstructuredEPubLoader,
    UnstructuredHTMLLoader,
    UnstructuredMarkdownLoader,
    UnstructuredODTLoader,
    UnstructuredPowerPointLoader,
    UnstructuredWordDocumentLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.docstore.document import Document

from constants import CHROMA_SETTINGS

# Load environment variables
persist_directory = os.environ.get('PERSIST_DIRECTORY', 'db')
source_directory = os.environ.get('SOURCE_DIRECTORY', 'source_documents')
embeddings_model_name = os.environ.get('EMBEDDINGS_MODEL_NAME', 'all-MiniLM-L6-v2')
chunk_size = 500
chunk_overlap = 50
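# Note: RecursiveCharacterTextSplitter measures chunk length in characters by
# default (length_function=len), so chunk_size and chunk_overlap above are
# character counts, not token counts.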


# Custom document loaders
class MyElmLoader(UnstructuredEmailLoader):
    """Wrapper that falls back to text/plain when the default loader does not work"""

    def load(self) -> List[Document]:
        """Wrapper adding a fallback for eml files without html content"""
        try:
            try:
                doc = UnstructuredEmailLoader.load(self)
            except ValueError as e:
                if 'text/html content not found in email' in str(e):
                    # Try plain text
                    self.unstructured_kwargs["content_source"] = "text/plain"
                    doc = UnstructuredEmailLoader.load(self)
                else:
                    raise
        except Exception as e:
            # Add file_path to exception message
            raise type(e)(f"{self.file_path}: {e}") from e
        return doc


# Map file extensions to document loaders and their arguments
LOADER_MAPPING = {
    ".csv": (CSVLoader, {}),
    # ".docx": (Docx2txtLoader, {}),
    ".doc": (UnstructuredWordDocumentLoader, {}),
    ".docx": (UnstructuredWordDocumentLoader, {}),
    ".enex": (EverNoteLoader, {}),
    ".eml": (MyElmLoader, {}),
    ".epub": (UnstructuredEPubLoader, {}),
    ".html": (UnstructuredHTMLLoader, {}),
    ".md": (UnstructuredMarkdownLoader, {}),
    ".odt": (UnstructuredODTLoader, {}),
    ".pdf": (PyMuPDFLoader, {}),
    ".ppt": (UnstructuredPowerPointLoader, {}),
    ".pptx": (UnstructuredPowerPointLoader, {}),
    ".txt": (TextLoader, {"encoding": "utf8"}),
    # Add more mappings for other file extensions and loaders as needed
}
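# A minimal sketch of how another format could be wired in (illustrative, not
# part of the original script): assuming this langchain version provides
# UnstructuredRSTLoader, import it alongside the loaders above and add
#     ".rst": (UnstructuredRSTLoader, {}),
# to LOADER_MAPPING; load_single_document will then pick up .rst files with no
# other changes.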


def load_single_document(file_path: str) -> List[Document]:
    ext = "." + file_path.rsplit(".", 1)[-1]
    if ext in LOADER_MAPPING:
        loader_class, loader_args = LOADER_MAPPING[ext]
        loader = loader_class(file_path, **loader_args)
        return loader.load()

    raise ValueError(f"Unsupported file extension '{ext}'")


def load_documents(source_dir: str, ignored_files: List[str] = []) -> List[Document]:
    """
    Loads all documents from the source documents directory, ignoring specified files
    """
    all_files = []
    for ext in LOADER_MAPPING:
        all_files.extend(
            glob.glob(os.path.join(source_dir, f"**/*{ext}"), recursive=True)
        )
    filtered_files = [file_path for file_path in all_files if file_path not in ignored_files]

    with Pool(processes=os.cpu_count()) as pool:
        results = []
        with tqdm(total=len(filtered_files), desc='Loading new documents', ncols=80) as pbar:
            for docs in pool.imap_unordered(load_single_document, filtered_files):
                results.extend(docs)
                pbar.update()

    return results


def process_documents(ignored_files: List[str] = []) -> List[Document]:
    """
    Load documents and split them into chunks
    """
    print(f"Loading documents from {source_directory}")
    documents = load_documents(source_directory, ignored_files)
    if not documents:
        print("No new documents to load")
        exit(0)
    print(f"Loaded {len(documents)} new documents from {source_directory}")
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    texts = text_splitter.split_documents(documents)
    print(f"Split into {len(texts)} chunks of text (max. {chunk_size} characters each)")
    return texts


def does_vectorstore_exist(persist_directory: str) -> bool:
    """
    Checks if a vectorstore already exists at the given path
    """
    if os.path.exists(os.path.join(persist_directory, 'index')):
        if os.path.exists(os.path.join(persist_directory, 'chroma-collections.parquet')) and os.path.exists(os.path.join(persist_directory, 'chroma-embeddings.parquet')):
            list_index_files = glob.glob(os.path.join(persist_directory, 'index/*.bin'))
            list_index_files += glob.glob(os.path.join(persist_directory, 'index/*.pkl'))
            # A working vectorstore contains more than three index files
            if len(list_index_files) > 3:
                return True
    return False


def main():
    # Create embeddings
    embeddings = HuggingFaceEmbeddings(model_name=embeddings_model_name)

    if does_vectorstore_exist(persist_directory):
        # Update and store the local vectorstore
        print(f"Appending to existing vectorstore at {persist_directory}")
        db = Chroma(persist_directory=persist_directory, embedding_function=embeddings, client_settings=CHROMA_SETTINGS)
        collection = db.get()
        texts = process_documents([metadata['source'] for metadata in collection['metadatas']])
        print("Creating embeddings. May take some minutes...")
        db.add_documents(texts)
    else:
        # Create and store the local vectorstore
        print("Creating new vectorstore")
        texts = process_documents()
        print("Creating embeddings. May take some minutes...")
        db = Chroma.from_documents(texts, embeddings, persist_directory=persist_directory)
    db.persist()
    db = None

    print("Ingestion complete! You can now run privateGPT.py to query your documents")


if __name__ == "__main__":
    main()
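
# Usage sketch (assuming the dependencies above are installed and a constants.py
# defining CHROMA_SETTINGS is on the path, as in the privateGPT repository):
#
#   python ingest.py
#
# The defaults can be overridden via environment variables, e.g.:
#
#   PERSIST_DIRECTORY=db SOURCE_DIRECTORY=source_documents python ingest.py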