|
@@ -1,10 +1,12 @@
|
|
|
import io
|
|
|
-
|
|
|
+import os
|
|
|
import boto3
|
|
|
import pytest
|
|
|
from botocore.exceptions import ClientError
|
|
|
from moto import mock_aws
|
|
|
from open_webui.storage import provider
|
|
|
+from gcp_storage_emulator.server import create_server
|
|
|
+from google.cloud import storage
|
|
|
|
|
|
|
|
|
def mock_upload_dir(monkeypatch, tmp_path):
|
|
@@ -19,6 +21,7 @@ def test_imports():
|
|
|
provider.StorageProvider
|
|
|
provider.LocalStorageProvider
|
|
|
provider.S3StorageProvider
|
|
|
+ provider.GCSStorageProvider
|
|
|
provider.Storage
|
|
|
|
|
|
|
|
@@ -27,6 +30,8 @@ def test_get_storage_provider():
|
|
|
assert isinstance(Storage, provider.LocalStorageProvider)
|
|
|
Storage = provider.get_storage_provider("s3")
|
|
|
assert isinstance(Storage, provider.S3StorageProvider)
|
|
|
+ Storage = provider.get_storage_provider("gcs")
|
|
|
+ assert isinstance(Storage, provider.GCSStorageProvider)
|
|
|
with pytest.raises(RuntimeError):
|
|
|
provider.get_storage_provider("invalid")
|
|
|
|
|
@@ -42,6 +47,7 @@ def test_class_instantiation():
|
|
|
Test()
|
|
|
provider.LocalStorageProvider()
|
|
|
provider.S3StorageProvider()
|
|
|
+ provider.GCSStorageProvider()
|
|
|
|
|
|
|
|
|
class TestLocalStorageProvider:
|
|
@@ -171,3 +177,105 @@ class TestS3StorageProvider:
|
|
|
self.Storage.delete_all_files()
|
|
|
assert not (upload_dir / self.filename).exists()
|
|
|
assert not (upload_dir / self.filename_extra).exists()
|
|
|
+
|
|
|
+class TestGCSStorageProvider:
|
|
|
+ Storage = provider.GCSStorageProvider()
|
|
|
+ Storage.bucket_name = "my-bucket"
|
|
|
+ file_content = b"test content"
|
|
|
+ filename = "test.txt"
|
|
|
+ filename_extra = "test_exyta.txt"
|
|
|
+ file_bytesio_empty = io.BytesIO()
|
|
|
+
|
|
|
+ @pytest.fixture
|
|
|
+ def setup(self):
|
|
|
+ host, port = "localhost", 9023
|
|
|
+
|
|
|
+ server = create_server(host, port, in_memory=True)
|
|
|
+ server.start()
|
|
|
+ os.environ["STORAGE_EMULATOR_HOST"] = f"http://{host}:{port}"
|
|
|
+
|
|
|
+ gcs_client = storage.Client()
|
|
|
+ bucket = gcs_client.bucket(self.Storage.bucket_name)
|
|
|
+ bucket.create()
|
|
|
+ yield gcs_client, bucket
|
|
|
+ bucket.delete(force=True)
|
|
|
+ server.stop()
|
|
|
+
|
|
|
+ def test_upload_file(self, monkeypatch, tmp_path, setup):
|
|
|
+ upload_dir = mock_upload_dir(monkeypatch, tmp_path)
|
|
|
+ # test error if bucket does not exist
|
|
|
+ with pytest.raises(Exception):
|
|
|
+ self.Storage.upload_file(io.BytesIO(self.file_content), self.filename)
|
|
|
+ # creates bucket and test upload_file method, downloads the file and confirms contents
|
|
|
+ self.Storage.gcs_client, self.Storage.bucket = setup
|
|
|
+ contents, gcs_file_path = self.Storage.upload_file(
|
|
|
+ io.BytesIO(self.file_content), self.filename
|
|
|
+ )
|
|
|
+ object = self.Storage.bucket.blob(self.filename)
|
|
|
+ assert self.file_content == object.download_as_bytes()
|
|
|
+ # local checks
|
|
|
+ assert (upload_dir / self.filename).exists()
|
|
|
+ assert (upload_dir / self.filename).read_bytes() == self.file_content
|
|
|
+ assert contents == self.file_content
|
|
|
+ assert gcs_file_path == "gs://" + self.Storage.bucket_name + "/" + self.filename
|
|
|
+ # test error if file is empty
|
|
|
+ with pytest.raises(ValueError):
|
|
|
+ self.Storage.upload_file(self.file_bytesio_empty, self.filename)
|
|
|
+
|
|
|
+ # def test_get_file(self, monkeypatch, tmp_path):
|
|
|
+ # upload_dir = mock_upload_dir(monkeypatch, tmp_path)
|
|
|
+ # self.s3_client.create_bucket(Bucket=self.Storage.bucket_name)
|
|
|
+ # contents, s3_file_path = self.Storage.upload_file(
|
|
|
+ # io.BytesIO(self.file_content), self.filename
|
|
|
+ # )
|
|
|
+ # file_path = self.Storage.get_file(s3_file_path)
|
|
|
+ # assert file_path == str(upload_dir / self.filename)
|
|
|
+ # assert (upload_dir / self.filename).exists()
|
|
|
+
|
|
|
+ # def test_delete_file(self, monkeypatch, tmp_path):
|
|
|
+ # upload_dir = mock_upload_dir(monkeypatch, tmp_path)
|
|
|
+ # self.s3_client.create_bucket(Bucket=self.Storage.bucket_name)
|
|
|
+ # contents, s3_file_path = self.Storage.upload_file(
|
|
|
+ # io.BytesIO(self.file_content), self.filename
|
|
|
+ # )
|
|
|
+ # assert (upload_dir / self.filename).exists()
|
|
|
+ # self.Storage.delete_file(s3_file_path)
|
|
|
+ # assert not (upload_dir / self.filename).exists()
|
|
|
+ # with pytest.raises(ClientError) as exc:
|
|
|
+ # self.s3_client.Object(self.Storage.bucket_name, self.filename).load()
|
|
|
+ # error = exc.value.response["Error"]
|
|
|
+ # assert error["Code"] == "404"
|
|
|
+ # assert error["Message"] == "Not Found"
|
|
|
+
|
|
|
+ # def test_delete_all_files(self, monkeypatch, tmp_path):
|
|
|
+ # upload_dir = mock_upload_dir(monkeypatch, tmp_path)
|
|
|
+ # # create 2 files
|
|
|
+ # self.s3_client.create_bucket(Bucket=self.Storage.bucket_name)
|
|
|
+ # self.Storage.upload_file(io.BytesIO(self.file_content), self.filename)
|
|
|
+ # object = self.s3_client.Object(self.Storage.bucket_name, self.filename)
|
|
|
+ # assert self.file_content == object.get()["Body"].read()
|
|
|
+ # assert (upload_dir / self.filename).exists()
|
|
|
+ # assert (upload_dir / self.filename).read_bytes() == self.file_content
|
|
|
+ # self.Storage.upload_file(io.BytesIO(self.file_content), self.filename_extra)
|
|
|
+ # object = self.s3_client.Object(self.Storage.bucket_name, self.filename_extra)
|
|
|
+ # assert self.file_content == object.get()["Body"].read()
|
|
|
+ # assert (upload_dir / self.filename).exists()
|
|
|
+ # assert (upload_dir / self.filename).read_bytes() == self.file_content
|
|
|
+
|
|
|
+ # self.Storage.delete_all_files()
|
|
|
+ # assert not (upload_dir / self.filename).exists()
|
|
|
+ # with pytest.raises(ClientError) as exc:
|
|
|
+ # self.s3_client.Object(self.Storage.bucket_name, self.filename).load()
|
|
|
+ # error = exc.value.response["Error"]
|
|
|
+ # assert error["Code"] == "404"
|
|
|
+ # assert error["Message"] == "Not Found"
|
|
|
+ # assert not (upload_dir / self.filename_extra).exists()
|
|
|
+ # with pytest.raises(ClientError) as exc:
|
|
|
+ # self.s3_client.Object(self.Storage.bucket_name, self.filename_extra).load()
|
|
|
+ # error = exc.value.response["Error"]
|
|
|
+ # assert error["Code"] == "404"
|
|
|
+ # assert error["Message"] == "Not Found"
|
|
|
+
|
|
|
+ # self.Storage.delete_all_files()
|
|
|
+ # assert not (upload_dir / self.filename).exists()
|
|
|
+ # assert not (upload_dir / self.filename_extra).exists()
|