provider.py

import os
import shutil
from abc import ABC, abstractmethod
from io import BytesIO
from typing import BinaryIO, Tuple

import boto3
from botocore.exceptions import ClientError
from google.cloud import storage
from google.cloud.exceptions import GoogleCloudError, NotFound

from open_webui.config import (
    S3_ACCESS_KEY_ID,
    S3_BUCKET_NAME,
    S3_ENDPOINT_URL,
    S3_REGION_NAME,
    S3_SECRET_ACCESS_KEY,
    GCS_PROJECT_ID,
    GCS_BUCKET_NAME,
    STORAGE_PROVIDER,
    UPLOAD_DIR,
)
from open_webui.constants import ERROR_MESSAGES


class StorageProvider(ABC):
    """Abstract interface that every storage backend must implement."""

    @abstractmethod
    def get_file(self, file_path: str) -> str:
        pass

    @abstractmethod
    def upload_file(self, file: BinaryIO, filename: str) -> Tuple[bytes, str]:
        pass

    @abstractmethod
    def delete_all_files(self) -> None:
        pass

    @abstractmethod
    def delete_file(self, file_path: str) -> None:
        pass
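
# A new backend can be added by subclassing StorageProvider and implementing
# all four methods. A minimal sketch, assuming a no-op backend is acceptable
# (hypothetical, not part of this module):
#
#     class NullStorageProvider(StorageProvider):
#         def get_file(self, file_path: str) -> str:
#             return file_path
#
#         def upload_file(self, file: BinaryIO, filename: str) -> Tuple[bytes, str]:
#             return file.read(), filename
#
#         def delete_all_files(self) -> None:
#             pass
#
#         def delete_file(self, file_path: str) -> None:
#             pass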


class LocalStorageProvider(StorageProvider):
    @staticmethod
    def upload_file(file: BinaryIO, filename: str) -> Tuple[bytes, str]:
        """Handles uploading of the file to local storage."""
        contents = file.read()
        if not contents:
            raise ValueError(ERROR_MESSAGES.EMPTY_CONTENT)

        file_path = f"{UPLOAD_DIR}/{filename}"
        with open(file_path, "wb") as f:
            f.write(contents)
        return contents, file_path

    @staticmethod
    def get_file(file_path: str) -> str:
        """Handles downloading of the file from local storage."""
        return file_path

    @staticmethod
    def delete_file(file_path: str) -> None:
        """Handles deletion of the file from local storage."""
        filename = file_path.split("/")[-1]
        file_path = f"{UPLOAD_DIR}/{filename}"
        if os.path.isfile(file_path):
            os.remove(file_path)
        else:
            print(f"File {file_path} not found in local storage.")

    @staticmethod
    def delete_all_files() -> None:
        """Handles deletion of all files from local storage."""
        if os.path.exists(UPLOAD_DIR):
            for filename in os.listdir(UPLOAD_DIR):
                file_path = os.path.join(UPLOAD_DIR, filename)
                try:
                    if os.path.isfile(file_path) or os.path.islink(file_path):
                        os.unlink(file_path)  # Remove the file or symlink
                    elif os.path.isdir(file_path):
                        shutil.rmtree(file_path)  # Remove the directory
                except Exception as e:
                    print(f"Failed to delete {file_path}. Reason: {e}")
        else:
            print(f"Directory {UPLOAD_DIR} not found in local storage.")


class S3StorageProvider(StorageProvider):
    def __init__(self):
        self.s3_client = boto3.client(
            "s3",
            region_name=S3_REGION_NAME,
            endpoint_url=S3_ENDPOINT_URL,
            aws_access_key_id=S3_ACCESS_KEY_ID,
            aws_secret_access_key=S3_SECRET_ACCESS_KEY,
        )
        self.bucket_name = S3_BUCKET_NAME

    def upload_file(self, file: BinaryIO, filename: str) -> Tuple[bytes, str]:
        """Handles uploading of the file to S3 storage."""
        contents, file_path = LocalStorageProvider.upload_file(file, filename)
        try:
            self.s3_client.upload_file(file_path, self.bucket_name, filename)
            return contents, f"s3://{self.bucket_name}/{filename}"
        except ClientError as e:
            raise RuntimeError(f"Error uploading file to S3: {e}")

    def get_file(self, file_path: str) -> str:
        """Handles downloading of the file from S3 storage."""
        try:
            # file_path has the form "s3://<bucket>/<key>"; the key may itself
            # contain slashes, so split on the first "/" only.
            bucket_name, key = file_path.split("//")[1].split("/", 1)
            local_file_path = f"{UPLOAD_DIR}/{key}"
            self.s3_client.download_file(bucket_name, key, local_file_path)
            return local_file_path
        except ClientError as e:
            raise RuntimeError(f"Error downloading file from S3: {e}")

    def delete_file(self, file_path: str) -> None:
        """Handles deletion of the file from S3 storage."""
        filename = file_path.split("/")[-1]
        try:
            self.s3_client.delete_object(Bucket=self.bucket_name, Key=filename)
        except ClientError as e:
            raise RuntimeError(f"Error deleting file from S3: {e}")

        # Always delete from local storage
        LocalStorageProvider.delete_file(file_path)

    def delete_all_files(self) -> None:
        """Handles deletion of all files from S3 storage."""
        try:
            # Paginate so buckets with more than 1,000 objects (the
            # list_objects_v2 page limit) are fully cleared.
            paginator = self.s3_client.get_paginator("list_objects_v2")
            for page in paginator.paginate(Bucket=self.bucket_name):
                for content in page.get("Contents", []):
                    self.s3_client.delete_object(
                        Bucket=self.bucket_name, Key=content["Key"]
                    )
        except ClientError as e:
            raise RuntimeError(f"Error deleting all files from S3: {e}")

        # Always delete from local storage
        LocalStorageProvider.delete_all_files()


class GCSStorageProvider(StorageProvider):
    def __init__(self):
        self.gcs_client = storage.Client(project=GCS_PROJECT_ID)
        # Despite the name, this holds a Bucket object, not a string.
        self.bucket_name = self.gcs_client.bucket(GCS_BUCKET_NAME)

    def upload_file(self, file: BinaryIO, filename: str) -> Tuple[bytes, str]:
        """Handles uploading of the file to GCS storage."""
        contents, _ = LocalStorageProvider.upload_file(file, filename)
        try:
            # Get the blob (object in the bucket) and upload the contents
            blob = self.bucket_name.blob(filename)
            blob.upload_from_file(BytesIO(contents))
            return contents, f"gs://{self.bucket_name.name}/{filename}"
        except GoogleCloudError as e:
            raise RuntimeError(f"Error uploading file to GCS: {e}")

    def get_file(self, file_path: str) -> str:
        """Handles downloading of the file from GCS storage."""
        try:
            # file_path is the "gs://<bucket>/<filename>" value returned by upload_file
            filename = file_path.removeprefix("gs://").split("/", 1)[1]
            local_file_path = f"{UPLOAD_DIR}/{filename}"
            # Get the blob (object in the bucket)
            blob = self.bucket_name.blob(filename)
            # Download the file to a local destination
            blob.download_to_filename(local_file_path)
            return local_file_path
        except NotFound as e:
            raise RuntimeError(f"Error downloading file from GCS: {e}")

    def delete_file(self, file_path: str) -> None:
        """Handles deletion of the file from GCS storage."""
        try:
            filename = file_path.removeprefix("gs://").split("/", 1)[1]
            # Get the blob (object in the bucket) and delete it
            blob = self.bucket_name.blob(filename)
            blob.delete()
        except NotFound as e:
            raise RuntimeError(f"Error deleting file from GCS: {e}")

        # Always delete from local storage
        LocalStorageProvider.delete_file(file_path)

    def delete_all_files(self) -> None:
        """Handles deletion of all files from GCS storage."""
        try:
            # List and delete all objects in the bucket
            blobs = self.bucket_name.list_blobs()
            for blob in blobs:
                blob.delete()
        except NotFound as e:
            raise RuntimeError(f"Error deleting all files from GCS: {e}")

        # Always delete from local storage
        LocalStorageProvider.delete_all_files()


def get_storage_provider(storage_provider: str) -> StorageProvider:
    if storage_provider == "local":
        Storage = LocalStorageProvider()
    elif storage_provider == "s3":
        Storage = S3StorageProvider()
    elif storage_provider == "gcs":
        Storage = GCSStorageProvider()
    else:
        raise RuntimeError(f"Unsupported storage provider: {storage_provider}")
    return Storage


Storage = get_storage_provider(STORAGE_PROVIDER)
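
# Usage sketch (illustrative only): how the module-level `Storage` singleton
# might be exercised. Assumes STORAGE_PROVIDER is "local", UPLOAD_DIR exists,
# and this module is importable as open_webui.storage.provider; the filename
# is hypothetical.
#
#     from io import BytesIO
#     from open_webui.storage.provider import Storage
#
#     contents, path = Storage.upload_file(BytesIO(b"hello"), "example.txt")
#     local_path = Storage.get_file(path)  # local provider returns the path unchanged
#     Storage.delete_file(path)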