[#124] Tests for s3 dataset registration #125

Merged
merged 9 commits · Jun 7, 2024
2 changes: 1 addition & 1 deletion Makefile
@@ -33,4 +33,4 @@ test:

.PHONY: pre-post
pre-post:
docker-compose run catalog-trigger python /app/triggers/management/change_streams_pre_and_post.py
docker-compose run catalog-trigger python /app/api/models/management/change_streams_pre_and_post.py
2 changes: 1 addition & 1 deletion api/adapters/hydroshare.py
@@ -8,7 +8,7 @@
from api.exceptions import RepositoryException
from api.models import schema
from api.models.catalog import DatasetMetadataDOC
from api.models.user import Submission, SubmissionType
from api.models.user import Submission


class Creator(BaseModel):
31 changes: 24 additions & 7 deletions api/adapters/s3.py
@@ -1,12 +1,16 @@
import boto3
import json
from botocore.client import Config
from http import HTTPStatus

import boto3
from botocore import UNSIGNED
from botocore.client import Config
from botocore.exceptions import ClientError as S3ClientError

from api.adapters.base import AbstractRepositoryMetadataAdapter, AbstractRepositoryRequestHandler
from api.adapters.utils import RepositoryType, register_adapter
from api.exceptions import RepositoryException
from api.models.catalog import DatasetMetadataDOC
from api.models.user import Submission, SubmissionType
from api.models.user import Submission


class _S3RequestHandler(AbstractRepositoryRequestHandler):
@@ -16,12 +20,25 @@ def get_metadata(self, record_id: str):
file_key = record_id.split("+")[2]

s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED), endpoint_url=endpoint_url)

response = s3.get_object(Bucket=bucket_name, Key=file_key)
json_content = response['Body'].read().decode('utf-8')
try:
response = s3.get_object(Bucket=bucket_name, Key=file_key)
except S3ClientError as ex:
if ex.response["Error"]["Code"] == "NoSuchKey":
raise RepositoryException(
detail=f"Specified metadata file was not found in S3: {bucket_name}/{file_key}",
status_code=HTTPStatus.NOT_FOUND
)
else:
err_msg = f"Error accessing S3 file({bucket_name}/{file_key}): {str(ex)}"
raise RepositoryException(detail=err_msg, status_code=HTTPStatus.BAD_REQUEST)

json_content = response['Body'].read().decode('utf-8')
# Parse the JSON content
data = json.loads(json_content)
try:
data = json.loads(json_content)
except json.JSONDecodeError as ex:
err_msg = f"Invalid JSON content in S3 file ({file_key}). Error: {str(ex)}"
raise RepositoryException(detail=err_msg, status_code=HTTPStatus.BAD_REQUEST)

return data

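For context, here is a minimal sketch of how `_S3RequestHandler.get_metadata` reads with these changes applied (indentation reconstructed from the hunk; splitting the record id into its three parts is condensed into one line, and only the imports used here are shown):

```python
import json
from http import HTTPStatus

import boto3
from botocore import UNSIGNED
from botocore.client import Config
from botocore.exceptions import ClientError as S3ClientError

from api.adapters.base import AbstractRepositoryRequestHandler
from api.exceptions import RepositoryException


class _S3RequestHandler(AbstractRepositoryRequestHandler):
    def get_metadata(self, record_id: str):
        # record_id is the fetch identifier: "<endpoint_url>+<bucket>+<file key>"
        endpoint_url, bucket_name, file_key = record_id.split("+")

        # unsigned client: the metadata object is read anonymously
        s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED), endpoint_url=endpoint_url)

        try:
            response = s3.get_object(Bucket=bucket_name, Key=file_key)
        except S3ClientError as ex:
            if ex.response["Error"]["Code"] == "NoSuchKey":
                # missing object -> 404 instead of an unhandled boto3 error
                raise RepositoryException(
                    detail=f"Specified metadata file was not found in S3: {bucket_name}/{file_key}",
                    status_code=HTTPStatus.NOT_FOUND,
                )
            err_msg = f"Error accessing S3 file({bucket_name}/{file_key}): {str(ex)}"
            raise RepositoryException(detail=err_msg, status_code=HTTPStatus.BAD_REQUEST)

        json_content = response["Body"].read().decode("utf-8")
        try:
            # Parse the JSON content
            data = json.loads(json_content)
        except json.JSONDecodeError as ex:
            # object exists but is not valid JSON -> 400
            err_msg = f"Invalid JSON content in S3 file ({file_key}). Error: {str(ex)}"
            raise RepositoryException(detail=err_msg, status_code=HTTPStatus.BAD_REQUEST)

        return data
```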
10 changes: 6 additions & 4 deletions api/models/user.py
@@ -24,12 +24,14 @@ class S3Path(BaseModel):
@property
def identifier(self):
endpoint_url = self.endpoint_url.rstrip("/")
if endpoint_url.endswith("amazonaws.com"):
identifier = f"{endpoint_url}/{self.path}"
else:
identifier = f"{endpoint_url}/{self.bucket}/{self.path}"
identifier = f"{endpoint_url}/{self.bucket}/{self.path}"
return identifier

@property
def fetch_identifier(self):
# This is the identifier that is used to fetch the file from S3
return f"{self.endpoint_url}+{self.bucket}+{self.path}"


class Submission(Document):
title: str = None
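The `S3Path` change does two things: `identifier` now always uses the path-style form `endpoint/bucket/path` (the AWS-specific branch is gone), and a new `fetch_identifier` joins the three parts with `+` for the S3 adapter. A small sketch, assuming the field declarations above this hunk match the keys used in the test payloads:

```python
from pydantic import BaseModel


class S3Path(BaseModel):
    path: str
    bucket: str
    endpoint_url: str

    @property
    def identifier(self):
        # path-style URL stored as the repository identifier
        endpoint_url = self.endpoint_url.rstrip("/")
        return f"{endpoint_url}/{self.bucket}/{self.path}"

    @property
    def fetch_identifier(self):
        # This is the identifier that is used to fetch the file from S3
        return f"{self.endpoint_url}+{self.bucket}+{self.path}"


s3_path = S3Path(
    path="data/.hs/dataset_metadata.json",
    bucket="iguide-catalog",
    endpoint_url="https://s3.us-west-2.amazonaws.com/",
)
print(s3_path.identifier)
# https://s3.us-west-2.amazonaws.com/iguide-catalog/data/.hs/dataset_metadata.json
print(s3_path.fetch_identifier)
# https://s3.us-west-2.amazonaws.com/+iguide-catalog+data/.hs/dataset_metadata.json
```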
26 changes: 19 additions & 7 deletions api/routes/catalog.py
@@ -144,14 +144,18 @@ async def refresh_dataset_from_hydroshare(identifier: str, user: Annotated[User,


@router.put("/repository/s3", response_model=DatasetMetadataDOC)
async def register_s3_dataset(request_model: S3Path, user: Annotated[User, Depends(get_current_user)]):
async def register_s3_dataset(s3_path: S3Path, user: Annotated[User, Depends(get_current_user)]):
"""User provides the path to the S3 object. The metadata is fetched from the s3 object and saved to the catalog."""
path = request_model.path
bucket = request_model.bucket
endpoint_url = request_model.endpoint_url
identifier = f"{endpoint_url}+{bucket}+{path}"

identifier = s3_path.identifier
submission: Submission = user.submission_by_repository(repo_type=RepositoryType.S3, identifier=identifier)
dataset = await _save_to_db(repository_type=RepositoryType.S3, identifier=identifier, user=user,
if submission is not None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="This S3 dataset has already been submitted by this user",
)
fetch_identifier = s3_path.fetch_identifier
dataset = await _save_to_db(repository_type=RepositoryType.S3, identifier=fetch_identifier, user=user,
submission=submission)
return dataset
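`register_s3_dataset` now takes the whole `S3Path` model, rejects a repeat submission of the same path by the same user with a 400, and hands the `+`-joined `fetch_identifier` to `_save_to_db`. A hedged example of calling the endpoint from a client (base URL and token are placeholders, not part of the PR):

```python
import httpx

s3_path = {
    "path": "data/.hs/dataset_metadata.json",
    "bucket": "iguide-catalog",
    "endpoint_url": "https://s3.us-west-2.amazonaws.com/",
}

# base URL and bearer token are placeholders for a running catalog API instance
response = httpx.put(
    "http://localhost:8000/api/catalog/repository/s3",
    json=s3_path,
    headers={"Authorization": "Bearer <access-token>"},
)

if response.status_code == 200:
    dataset = response.json()
    print(dataset["repository_identifier"])  # path-style S3 URL
else:
    # 400 if this user already submitted the same S3 dataset,
    # 404 if the adapter could not find the metadata file in the bucket
    print(response.status_code, response.json().get("detail"))
```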

@@ -171,7 +175,7 @@ async def create_dataset_s3(
if submission is not None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Dataset metadata record was not found",
detail="This S3 dataset has already been submitted by this user",
)
await document.insert()
submission = document.as_submission()
@@ -221,11 +225,17 @@ async def _save_to_db(repository_type: RepositoryType, identifier: str, user: Us
adapter = get_adapter_by_type(repository_type=repository_type)
# fetch metadata from repository as catalog dataset
repo_dataset: DatasetMetadataDOC = await _get_repo_meta_as_catalog_record(adapter=adapter, identifier=identifier)
s3_path = None
if repository_type == RepositoryType.S3:
s3_endpoint_url, bucket, path = identifier.split("+")
s3_path = S3Path(endpoint_url=s3_endpoint_url, bucket=bucket, path=path)
identifier = s3_path.identifier
if submission is None:
# new registration
await repo_dataset.insert()
submission = repo_dataset.as_submission()
submission = adapter.update_submission(submission=submission, repo_record_id=identifier)
submission.s3_path = s3_path
user.submissions.append(submission)
await user.save(link_rule=WriteRules.WRITE)
dataset = repo_dataset
@@ -239,12 +249,14 @@ async def _save_to_db(repository_type: RepositoryType, identifier: str, user: Us
updated_submission = adapter.update_submission(submission=updated_submission, repo_record_id=identifier)
updated_submission.id = submission.id
updated_submission.submitted = submission.submitted
updated_submission.s3_path = s3_path
await updated_submission.replace()
dataset = updated_dataset
submission = updated_submission

dataset = inject_repository_identifier(submission, dataset)
dataset = inject_submission_type(submission, dataset)
dataset = inject_submission_s3_path(submission, dataset)
return dataset


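Inside `_save_to_db`, the S3 branch now receives the `+`-joined fetch identifier, rebuilds an `S3Path` from it, stores that object on the submission, and switches the persisted identifier to the path-style URL. A minimal sketch of that round trip (assuming `S3Path` from api/models/user.py and an object key that contains no `+` characters):

```python
from api.models.user import S3Path

fetch_identifier = (
    "https://s3.us-west-2.amazonaws.com/+iguide-catalog+data/.hs/dataset_metadata.json"
)

# reverse of S3Path.fetch_identifier: split back into endpoint, bucket and key
s3_endpoint_url, bucket, path = fetch_identifier.split("+")
s3_path = S3Path(endpoint_url=s3_endpoint_url, bucket=bucket, path=path)

# this identifier is stored on the submission and injected into the returned dataset metadata
assert s3_path.identifier == (
    "https://s3.us-west-2.amazonaws.com/iguide-catalog/data/.hs/dataset_metadata.json"
)
```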
64 changes: 47 additions & 17 deletions tests/test_dataset_routes.py
@@ -74,7 +74,7 @@ async def test_create_dataset_s3(client_test, dataset_data, test_user_access_tok
s3_path = {
"path": "data/.hs/dataset_metadata.json",
"bucket": "iguide-catalog",
"endpoint_url": "https://iguide-catalog.s3.us-west-2.amazonaws.com/",
"endpoint_url": "https://s3.us-west-2.amazonaws.com/",
}

payload = {
@@ -86,10 +86,7 @@ async def test_create_dataset_s3(client_test, dataset_data, test_user_access_tok
response = await client_test.post("api/catalog/dataset-s3/", json=payload)
assert response.status_code == 201
ds_metadata = response.json()
if object_store_type == "minio":
expected_repository_identifier = f"{s3_path['endpoint_url']}{s3_path['bucket']}/{s3_path['path']}"
else:
expected_repository_identifier = f"{s3_path['endpoint_url']}{s3_path['path']}"
expected_repository_identifier = f"{s3_path['endpoint_url']}{s3_path['bucket']}/{s3_path['path']}"
assert ds_metadata["repository_identifier"] == expected_repository_identifier
assert ds_metadata["submission_type"] == SubmissionType.S3
assert ds_metadata["s3_path"] == s3_path
@@ -124,7 +121,7 @@ async def test_update_dataset_s3(client_test, dataset_data, test_user_access_tok
s3_path = {
"path": "data/.hs/dataset_metadata.json",
"bucket": "iguide-catalog",
"endpoint_url": "https://iguide-catalog.s3.us-west-2.amazonaws.com/",
"endpoint_url": "https://s3.us-west-2.amazonaws.com/",
}

payload = {
@@ -135,12 +132,10 @@ async def test_update_dataset_s3(client_test, dataset_data, test_user_access_tok
response = await client_test.post("api/catalog/dataset-s3/", json=payload)
assert response.status_code == 201
ds_metadata = response.json()
if object_store_type == "minio":
expected_repository_identifier = f"{s3_path['endpoint_url']}{s3_path['bucket']}/{s3_path['path']}"
else:
expected_repository_identifier = f"{s3_path['endpoint_url']}{s3_path['path']}"
expected_repository_identifier = f"{s3_path['endpoint_url']}{s3_path['bucket']}/{s3_path['path']}"
assert ds_metadata["repository_identifier"] == expected_repository_identifier
assert ds_metadata["submission_type"] == SubmissionType.S3

# retrieve the record from the db
record_id = ds_metadata.pop('_id')
response = await client_test.get(f"api/catalog/dataset/{record_id}")
@@ -160,7 +155,7 @@ async def test_update_dataset_s3(client_test, dataset_data, test_user_access_tok
s3_path = {
"path": "data/.hs/dataset_metadata-updated.json",
"bucket": "iguide-catalog-updated",
"endpoint_url": "https://iguide-catalog-updated.s3.us-west-2.amazonaws.com/",
"endpoint_url": "https://s3.us-west-2.amazonaws.com/",
}

payload = {
@@ -171,14 +166,12 @@ async def test_update_dataset_s3(client_test, dataset_data, test_user_access_tok
response = await client_test.put(f"api/catalog/dataset-s3/{record_id}", json=payload)
assert response.status_code == 200
ds_metadata = response.json()
if object_store_type == "minio":
expected_repository_identifier = f"{s3_path['endpoint_url']}{s3_path['bucket']}/{s3_path['path']}"
else:
expected_repository_identifier = f"{s3_path['endpoint_url']}{s3_path['path']}"
expected_repository_identifier = f"{s3_path['endpoint_url']}{s3_path['bucket']}/{s3_path['path']}"
assert ds_metadata["repository_identifier"] == expected_repository_identifier
assert ds_metadata["submission_type"] == SubmissionType.S3
assert ds_metadata["s3_path"] == s3_path
assert ds_metadata["name"] == dataset_data['name']

# retrieve the record from the db
record_id = ds_metadata.pop('_id')
response = await client_test.get(f"api/catalog/dataset/{record_id}")
@@ -350,7 +343,7 @@ async def test_get_datasets_different_submission_types(client_test, dataset_data
s3_path = {
"path": "data/.hs/dataset_metadata.json",
"bucket": "iguide-catalog",
"endpoint_url": "https://iguide-catalog.s3.us-west-2.amazonaws.com/",
"endpoint_url": "https://s3.us-west-2.amazonaws.com/",
}

payload = {
@@ -402,6 +395,43 @@ async def test_get_datasets_exclude_none(client_test, dataset_data):
assert "measurementTechnique" not in a_property


@pytest.mark.parametrize('object_store_type', ['minio', 's3'])
@pytest.mark.asyncio
async def test_register_minio_s3_dataset(client_test, object_store_type):
"""Testing registering metadata for a generic dataset stored on minIO and S3"""

if object_store_type == "minio":
# set the path to the generic metadata file on minIO s3
s3_path = {
"path": "data/.hs/dataset_metadata.json",
"bucket": "catalog-api-test",
"endpoint_url": "https://api.minio.cuahsi.io/",
}
else:
# set the path to the generic metadata file on AWS s3
s3_path = {
"path": "data/.hs/generic/dataset_metadata.json",
"bucket": "iguide-catalog",
"endpoint_url": "https://s3.us-west-2.amazonaws.com/",
}

dataset_response = await client_test.put(
"api/catalog/repository/s3", json=s3_path
)

assert dataset_response.status_code == 200
ds_metadata = dataset_response.json()
expected_repository_identifier = f"{s3_path['endpoint_url']}{s3_path['bucket']}/{s3_path['path']}"
assert ds_metadata["repository_identifier"] == expected_repository_identifier
assert ds_metadata["submission_type"] == SubmissionType.S3
assert ds_metadata["s3_path"] == s3_path

# retrieve the record from the db
record_id = ds_metadata.get('_id')
response = await client_test.get(f"api/catalog/dataset/{record_id}")
assert response.status_code == 200


@pytest.mark.parametrize("multiple", [True, False])
@pytest.mark.asyncio
async def test_get_submissions_1(client_test, dataset_data, multiple):
Expand Down Expand Up @@ -451,7 +481,7 @@ async def test_get_submissions_2(client_test, dataset_data):
s3_path = {
"path": "data/.hs/dataset_metadata.json",
"bucket": "iguide-catalog",
"endpoint_url": "https://iguide-catalog.s3.us-west-2.amazonaws.com/",
"endpoint_url": "https://s3.us-west-2.amazonaws.com/",
}

payload = {
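The same `s3_path` payload and expected-identifier expression recur across several of these tests; a small fixture/helper along these lines (hypothetical, not part of the PR) could centralize them:

```python
import pytest


@pytest.fixture
def s3_path_payload():
    # canonical S3 path payload used by the S3 dataset tests
    return {
        "path": "data/.hs/dataset_metadata.json",
        "bucket": "iguide-catalog",
        "endpoint_url": "https://s3.us-west-2.amazonaws.com/",
    }


def expected_repository_identifier(s3_path: dict) -> str:
    # path-style identifier the API is expected to report for an S3 submission
    return f"{s3_path['endpoint_url']}{s3_path['bucket']}/{s3_path['path']}"
```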
4 changes: 2 additions & 2 deletions triggers/scheduler.py
@@ -68,8 +68,8 @@ async def do_daily():
else:
# couldn't retrieve matching repository record
await db["discovery"].delete_one({"_id": submission.identifier})
except:
logger.exception(f"Failed to collect submission {submission.url}")
except Exception as err:
logger.exception(f"Failed to collect submission {submission.url}\n Error: {str(err)}")


def main():
4 changes: 2 additions & 2 deletions triggers/update_catalog.py
@@ -23,8 +23,8 @@ async def _main():
while True:
try:
await watch_catalog(db)
except:
logger.exception("Submission Watch Task failed, restarting the task")
except Exception as exp:
logger.exception(f"Submission Watch Task failed.\n Error:{str(exp)}\n Restarting the task")
finally:
db.close()

4 changes: 2 additions & 2 deletions triggers/update_typeahead.py
@@ -19,8 +19,8 @@ async def _main():
while True:
try:
await watch_discovery(db)
except:
logger.exception("Discovery Watch Task failed, restarting the task")
except Exception as exp:
logger.exception(f"Discovery Watch Task failed.\n Error:{str(exp)}\n Restarting the task")
finally:
db.close()

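All three trigger scripts switch from a bare `except:` to `except Exception`, which still logs the failure before the task restarts while letting `KeyboardInterrupt`/`SystemExit` stop the process. A minimal sketch of the watch-and-restart pattern these loops follow (names and the backoff are illustrative, not from the PR):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def run_watch_forever(watch, make_db):
    # Restart the long-running watch task whenever it fails, logging the error.
    while True:
        db = make_db()
        try:
            await watch(db)
        except Exception as exp:
            # Exception (not a bare except) so KeyboardInterrupt/SystemExit still propagate
            logger.exception(f"Watch task failed.\n Error:{str(exp)}\n Restarting the task")
        finally:
            db.close()
        # brief pause before restarting to avoid a tight crash loop (illustrative)
        await asyncio.sleep(5)
```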