provider.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. import os
  2. import boto3
  3. from botocore.exceptions import ClientError
  4. import shutil
  5. from typing import BinaryIO, Tuple, Optional, Union
  6. from open_webui.constants import ERROR_MESSAGES
  7. from open_webui.config import (
  8. STORAGE_PROVIDER,
  9. S3_ACCESS_KEY_ID,
  10. S3_SECRET_ACCESS_KEY,
  11. S3_BUCKET_NAME,
  12. S3_REGION_NAME,
  13. S3_ENDPOINT_URL,
  14. UPLOAD_DIR,
  15. )
  16. import boto3
  17. from botocore.exceptions import ClientError
  18. from typing import BinaryIO, Tuple, Optional
  19. class StorageProvider:
  20. def __init__(self, provider: Optional[str] = None):
  21. self.storage_provider: str = provider or STORAGE_PROVIDER
  22. self.s3_client = None
  23. self.s3_bucket_name: Optional[str] = None
  24. if self.storage_provider == "s3":
  25. self._initialize_s3()
  26. def _initialize_s3(self) -> None:
  27. """Initializes the S3 client and bucket name if using S3 storage."""
  28. self.s3_client = boto3.client(
  29. "s3",
  30. region_name=S3_REGION_NAME,
  31. endpoint_url=S3_ENDPOINT_URL,
  32. aws_access_key_id=S3_ACCESS_KEY_ID,
  33. aws_secret_access_key=S3_SECRET_ACCESS_KEY,
  34. )
  35. self.bucket_name = S3_BUCKET_NAME
  36. def _upload_to_s3(self, file_path: str, filename: str) -> Tuple[bytes, str]:
  37. """Handles uploading of the file to S3 storage."""
  38. if not self.s3_client:
  39. raise RuntimeError("S3 Client is not initialized.")
  40. try:
  41. self.s3_client.upload_file(file_path, self.bucket_name, filename)
  42. return (
  43. open(file_path, "rb").read(),
  44. "s3://" + self.bucket_name + "/" + filename,
  45. )
  46. except ClientError as e:
  47. raise RuntimeError(f"Error uploading file to S3: {e}")
  48. def _upload_to_local(self, contents: bytes, filename: str) -> Tuple[bytes, str]:
  49. """Handles uploading of the file to local storage."""
  50. file_path = f"{UPLOAD_DIR}/{filename}"
  51. with open(file_path, "wb") as f:
  52. f.write(contents)
  53. return contents, file_path
  54. def _get_file_from_s3(self, file_path: str) -> str:
  55. """Handles downloading of the file from S3 storage."""
  56. if not self.s3_client:
  57. raise RuntimeError("S3 Client is not initialized.")
  58. try:
  59. bucket_name, key = file_path.split("//")[1].split("/")
  60. local_file_path = f"{UPLOAD_DIR}/{key}"
  61. self.s3_client.download_file(bucket_name, key, local_file_path)
  62. return local_file_path
  63. except ClientError as e:
  64. raise RuntimeError(f"Error downloading file from S3: {e}")
  65. def _get_file_from_local(self, file_path: str) -> str:
  66. """Handles downloading of the file from local storage."""
  67. return file_path
  68. def _delete_from_s3(self, filename: str) -> None:
  69. """Handles deletion of the file from S3 storage."""
  70. if not self.s3_client:
  71. raise RuntimeError("S3 Client is not initialized.")
  72. try:
  73. self.s3_client.delete_object(Bucket=self.bucket_name, Key=filename)
  74. except ClientError as e:
  75. raise RuntimeError(f"Error deleting file from S3: {e}")
  76. def _delete_from_local(self, filename: str) -> None:
  77. """Handles deletion of the file from local storage."""
  78. file_path = f"{UPLOAD_DIR}/{filename}"
  79. if os.path.isfile(file_path):
  80. os.remove(file_path)
  81. else:
  82. print(f"File {file_path} not found in local storage.")
  83. def _delete_all_from_s3(self) -> None:
  84. """Handles deletion of all files from S3 storage."""
  85. if not self.s3_client:
  86. raise RuntimeError("S3 Client is not initialized.")
  87. try:
  88. response = self.s3_client.list_objects_v2(Bucket=self.bucket_name)
  89. if "Contents" in response:
  90. for content in response["Contents"]:
  91. self.s3_client.delete_object(
  92. Bucket=self.bucket_name, Key=content["Key"]
  93. )
  94. except ClientError as e:
  95. raise RuntimeError(f"Error deleting all files from S3: {e}")
  96. def _delete_all_from_local(self) -> None:
  97. """Handles deletion of all files from local storage."""
  98. if os.path.exists(UPLOAD_DIR):
  99. for filename in os.listdir(UPLOAD_DIR):
  100. file_path = os.path.join(UPLOAD_DIR, filename)
  101. try:
  102. if os.path.isfile(file_path) or os.path.islink(file_path):
  103. os.unlink(file_path) # Remove the file or link
  104. elif os.path.isdir(file_path):
  105. shutil.rmtree(file_path) # Remove the directory
  106. except Exception as e:
  107. print(f"Failed to delete {file_path}. Reason: {e}")
  108. else:
  109. print(f"Directory {UPLOAD_DIR} not found in local storage.")
  110. def upload_file(self, file: BinaryIO, filename: str) -> Tuple[bytes, str]:
  111. """Uploads a file either to S3 or the local file system."""
  112. contents = file.read()
  113. if not contents:
  114. raise ValueError(ERROR_MESSAGES.EMPTY_CONTENT)
  115. contents, file_path = self._upload_to_local(contents, filename)
  116. if self.storage_provider == "s3":
  117. return self._upload_to_s3(file_path, filename)
  118. return contents, file_path
  119. def get_file(self, file_path: str) -> str:
  120. """Downloads a file either from S3 or the local file system and returns the file path."""
  121. if self.storage_provider == "s3":
  122. return self._get_file_from_s3(file_path)
  123. return self._get_file_from_local(file_path)
  124. def delete_file(self, filename: str) -> None:
  125. """Deletes a file either from S3 or the local file system."""
  126. if self.storage_provider == "s3":
  127. self._delete_from_s3(filename)
  128. # Always delete from local storage
  129. self._delete_from_local(filename)
  130. def delete_all_files(self) -> None:
  131. """Deletes all files from the storage."""
  132. if self.storage_provider == "s3":
  133. self._delete_all_from_s3()
  134. # Always delete from local storage
  135. self._delete_all_from_local()
  136. Storage = StorageProvider(provider=STORAGE_PROVIDER)