provider.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. import os
  2. import shutil
  3. from abc import ABC, abstractmethod
  4. from typing import BinaryIO, Tuple
  5. import boto3
  6. from botocore.exceptions import ClientError
  7. from open_webui.config import (
  8. S3_ACCESS_KEY_ID,
  9. S3_BUCKET_NAME,
  10. S3_ENDPOINT_URL,
  11. S3_REGION_NAME,
  12. S3_SECRET_ACCESS_KEY,
  13. STORAGE_PROVIDER,
  14. UPLOAD_DIR,
  15. )
  16. from google.cloud import storage
  17. from open_webui.constants import ERROR_MESSAGES
  18. class StorageProvider(ABC):
  19. @abstractmethod
  20. def get_file(self, file_path: str) -> str:
  21. pass
  22. @abstractmethod
  23. def upload_file(self, file: BinaryIO, filename: str) -> Tuple[bytes, str]:
  24. pass
  25. @abstractmethod
  26. def delete_all_files(self) -> None:
  27. pass
  28. @abstractmethod
  29. def delete_file(self, file_path: str) -> None:
  30. pass
  31. class LocalStorageProvider(StorageProvider):
  32. @staticmethod
  33. def upload_file(file: BinaryIO, filename: str) -> Tuple[bytes, str]:
  34. contents = file.read()
  35. if not contents:
  36. raise ValueError(ERROR_MESSAGES.EMPTY_CONTENT)
  37. file_path = f"{UPLOAD_DIR}/{filename}"
  38. with open(file_path, "wb") as f:
  39. f.write(contents)
  40. return contents, file_path
  41. @staticmethod
  42. def get_file(file_path: str) -> str:
  43. """Handles downloading of the file from local storage."""
  44. return file_path
  45. @staticmethod
  46. def delete_file(file_path: str) -> None:
  47. """Handles deletion of the file from local storage."""
  48. filename = file_path.split("/")[-1]
  49. file_path = f"{UPLOAD_DIR}/{filename}"
  50. if os.path.isfile(file_path):
  51. os.remove(file_path)
  52. else:
  53. print(f"File {file_path} not found in local storage.")
  54. @staticmethod
  55. def delete_all_files() -> None:
  56. """Handles deletion of all files from local storage."""
  57. if os.path.exists(UPLOAD_DIR):
  58. for filename in os.listdir(UPLOAD_DIR):
  59. file_path = os.path.join(UPLOAD_DIR, filename)
  60. try:
  61. if os.path.isfile(file_path) or os.path.islink(file_path):
  62. os.unlink(file_path) # Remove the file or link
  63. elif os.path.isdir(file_path):
  64. shutil.rmtree(file_path) # Remove the directory
  65. except Exception as e:
  66. print(f"Failed to delete {file_path}. Reason: {e}")
  67. else:
  68. print(f"Directory {UPLOAD_DIR} not found in local storage.")
  69. class S3StorageProvider(StorageProvider):
  70. def __init__(self):
  71. self.s3_client = boto3.client(
  72. "s3",
  73. region_name=S3_REGION_NAME,
  74. endpoint_url=S3_ENDPOINT_URL,
  75. aws_access_key_id=S3_ACCESS_KEY_ID,
  76. aws_secret_access_key=S3_SECRET_ACCESS_KEY,
  77. )
  78. self.bucket_name = S3_BUCKET_NAME
  79. def upload_file(self, file: BinaryIO, filename: str) -> Tuple[bytes, str]:
  80. """Handles uploading of the file to S3 storage."""
  81. _, file_path = LocalStorageProvider.upload_file(file, filename)
  82. try:
  83. self.s3_client.upload_file(file_path, self.bucket_name, filename)
  84. return (
  85. open(file_path, "rb").read(),
  86. "s3://" + self.bucket_name + "/" + filename,
  87. )
  88. except ClientError as e:
  89. raise RuntimeError(f"Error uploading file to S3: {e}")
  90. def get_file(self, file_path: str) -> str:
  91. """Handles downloading of the file from S3 storage."""
  92. try:
  93. bucket_name, key = file_path.split("//")[1].split("/")
  94. local_file_path = f"{UPLOAD_DIR}/{key}"
  95. self.s3_client.download_file(bucket_name, key, local_file_path)
  96. return local_file_path
  97. except ClientError as e:
  98. raise RuntimeError(f"Error downloading file from S3: {e}")
  99. def delete_file(self, file_path: str) -> None:
  100. """Handles deletion of the file from S3 storage."""
  101. filename = file_path.split("/")[-1]
  102. try:
  103. self.s3_client.delete_object(Bucket=self.bucket_name, Key=filename)
  104. except ClientError as e:
  105. raise RuntimeError(f"Error deleting file from S3: {e}")
  106. # Always delete from local storage
  107. LocalStorageProvider.delete_file(file_path)
  108. def delete_all_files(self) -> None:
  109. """Handles deletion of all files from S3 storage."""
  110. try:
  111. response = self.s3_client.list_objects_v2(Bucket=self.bucket_name)
  112. if "Contents" in response:
  113. for content in response["Contents"]:
  114. self.s3_client.delete_object(
  115. Bucket=self.bucket_name, Key=content["Key"]
  116. )
  117. except ClientError as e:
  118. raise RuntimeError(f"Error deleting all files from S3: {e}")
  119. # Always delete from local storage
  120. LocalStorageProvider.delete_all_files()
  121. class GCSStorageProvider(StorageProvider):
  122. def __init__(self):
  123. self.gcs_client = storage.Client()
  124. self.bucket_name = GCS_BUCKET_NAME
  125. def get_storage_provider(storage_provider: str):
  126. if storage_provider == "local":
  127. Storage = LocalStorageProvider()
  128. elif storage_provider == "s3":
  129. Storage = S3StorageProvider()
  130. else:
  131. raise RuntimeError(f"Unsupported storage provider: {storage_provider}")
  132. return Storage
  133. Storage = get_storage_provider(STORAGE_PROVIDER)