Skip to content

Commit

Permalink
transform: mark missing experiments vocab for curation
Browse files Browse the repository at this point in the history
  • Loading branch information
kpsherva committed Oct 25, 2024
1 parent cfbbcd4 commit a05bc57
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 86 deletions.
2 changes: 1 addition & 1 deletion cds_migrator_kit/rdm/migration/streams.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ records:
transform:
files_dump_dir: cds_migrator_kit/rdm/migration/data/summer_student_reports/files/
missing_users: cds_migrator_kit/rdm/migration/data/users
community_id: 463a8eb6-c2f7-4715-83e3-49fad9595b40
community_id: 9d5c9f2a-9221-4bfa-85cd-bb3736a779b8
154 changes: 71 additions & 83 deletions cds_migrator_kit/rdm/migration/transform/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
ManualImportRequired,
CDSMigrationException,
MissingRequiredField,
RecordAffiliationFlagged,
RecordFlaggedCuration,
)
from cds_migrator_kit.records.log import RDMJsonLogger
from cds_rdm.models import CDSMigrationAffiliationMapping
Expand All @@ -49,6 +49,23 @@
cli_logger = logging.getLogger("migrator")


def search_vocabulary(term, vocab_type):
service = current_service_registry.get("vocabularies")
try:
vocabulary_result = service.search(
system_identity, type=vocab_type, q=f"{term}"
).to_dict()
return vocabulary_result
except RequestError:
raise UnexpectedValue(
subfield="a",
value=term,
field=vocab_type,
message=f"Vocabulary {vocab_type} term {term} not valid search phrase.",
stage="vocabulary match",
)


class CDSToRDMRecordEntry(RDMRecordEntry):
"""Transform CDS record to RDM record."""

Expand Down Expand Up @@ -95,9 +112,6 @@ def _recid(self, record_dump):
def _bucket_id(self, json_entry):
return

def _custom_fields(self, json_entry):
return {}

def _id(self, entry):
return

Expand Down Expand Up @@ -214,15 +228,15 @@ def _match_affiliation(self, affiliation_name):
elif match.ror_exact_match:
return {"id": normalize_ror(match.ror_exact_match)}
elif match.ror_not_exact_match:
raise RecordAffiliationFlagged(
raise RecordFlaggedCuration(
subfield="u",
value={"id": normalize_ror(match.ror_not_exact_match)},
field="author",
message=f"Affiliation {normalize_ror(match.ror_not_exact_match)} not found as an exact match, ROR id should be checked.",
stage="vocabulary match",
)
else:
raise RecordAffiliationFlagged(
raise RecordFlaggedCuration(
subfield="u",
value={"name": affiliation_name},
field="author",
Expand All @@ -240,7 +254,7 @@ def creator_affiliations(creator):
try:
affiliation = self._match_affiliation(affiliation_name)
transformed_aff.append(affiliation)
except RecordAffiliationFlagged as exc:
except RecordFlaggedCuration as exc:
# Save not exact match affiliation and reraise to flag the record
RDMJsonLogger().add_success_state(
json_entry["recid"],
Expand Down Expand Up @@ -292,102 +306,57 @@ def _resource_type(entry):
# filter empty keys
return {k: v for k, v in metadata.items() if v}

def _custom_fields(self, json_entry):

experiments = json_entry.get("custom_fields", {}).get("cern:experiments")
custom_fields = {
"cern:experiments": [],
"cern:departments": [],
"cern:accelerators": [],
"cern:projects": [],
"cern:facilities": [],
"cern:studies": [],
}

if experiments:
vocab_type = "experiments"
service = current_service_registry.get("vocabularies")
def _custom_fields(self, json_entry, json_output):

def field_experiments(record_json, custom_fields_dict):
experiments = record_json.get("custom_fields", {}).get("cern:experiments", [])
for experiment in experiments:
try:
vocabulary_result = service.search(
system_identity, type=vocab_type, q=f"{experiment}"
).to_dict()
except RequestError:
raise UnexpectedValue(
subfield="a",
value=experiment,
field="experiments",
message=f"Experiment {experiment} " f"not valid search phrase.",
stage="vocabulary match",
)
if vocabulary_result["hits"]["total"]:
result = search_vocabulary(experiment, "experiments")

custom_fields["cern:experiments"].append(
{"id": vocabulary_result["hits"]["hits"][0]["id"]}
if result["hits"]["total"]:
custom_fields_dict["cern:experiments"].append(
{"id": result["hits"]["hits"][0]["id"]}
)

else:
raise UnexpectedValue(
subfield="a",
subj = json_output["metadata"].get("subjects", [])
subj.append({"subject": experiment})
json_output["metadata"]["subjects"] = subj
raise RecordFlaggedCuration(
subfield="u",
value=experiment,
field="experiment",
message=f"Experiment {experiment} not found.",
field="author",
message=f"Experiment {experiment} not found, added as a subject",
stage="vocabulary match",
)

departments = json_entry.get("custom_fields", {}).get("cern:departments")
if departments:
vocab_type = "departments"
service = current_service_registry.get("vocabularies")

def field_departments(record_json, custom_fields_dict):
departments = record_json.get("custom_fields", {}).get("cern:departments", [])
for department in departments:
try:
vocabulary_result = service.search(
system_identity, type=vocab_type, q=f"{department}"
).to_dict()
except RequestError:
raise UnexpectedValue(
subfield="a",
value=department,
field="experiment",
message=f"Department {department} " f"not valid search phrase.",
stage="vocabulary match",
)
if vocabulary_result["hits"]["total"]:
custom_fields["cern:departments"].append(
{"id": vocabulary_result["hits"]["hits"][0]["id"]}
result = search_vocabulary(department, "departments")
if result["hits"]["total"]:
custom_fields_dict["cern:departments"].append(
{"id": result["hits"]["hits"][0]["id"]}
)

else:
raise UnexpectedValue(
subfield="a",
value=department,
field="experiment",
field="department",
message=f"Department {department} not found.",
stage="vocabulary match",
)

accelerators = json_entry.get("custom_fields", {}).get("cern:accelerators")
if accelerators:
vocab_type = "accelerators"
service = current_service_registry.get("vocabularies")
def field_accelerators(record_json, custom_fields_dict):
accelerators = record_json.get("custom_fields", {}).get("cern:accelerators", [])
for accelerator in accelerators:
try:
vocabulary_result = service.search(
system_identity, type=vocab_type, q=f"{accelerator}"
).to_dict()
except RequestError:
raise UnexpectedValue(
subfield="a",
value=accelerator,
field="experiment",
message=f"Accelerator {accelerator} "
f"not valid search phrase.",
stage="vocabulary match",
)
if vocabulary_result["hits"]["total"]:
result = search_vocabulary(accelerator, "accelerators")
if result["hits"]["total"]:

custom_fields["cern:accelerators"].append(
{"id": vocabulary_result["hits"]["hits"][0]["id"]}
custom_fields_dict["cern:accelerators"].append(
{"id": result["hits"]["hits"][0]["id"]}
)

else:
Expand All @@ -398,6 +367,24 @@ def _custom_fields(self, json_entry):
message=f"Accelerator {accelerator} not found.",
stage="vocabulary match",
)

custom_fields = {
"cern:experiments": [],
"cern:departments": [],
"cern:accelerators": [],
"cern:projects": [],
"cern:facilities": [],
"cern:studies": [],
}
try:
field_experiments(json_entry, custom_fields)
except RecordFlaggedCuration as exc:
RDMJsonLogger().add_success_state(
json_entry["recid"],
{"message": exc.message, "value": exc.value},
)
field_departments(json_entry, custom_fields)
field_accelerators(json_entry, custom_fields)
custom_fields["cern:projects"] = json_entry.get("custom_fields", {}).get(
"cern:projects", []
)
Expand Down Expand Up @@ -426,9 +413,10 @@ def transform(self, entry):
"files": self._files(record_dump),
"metadata": self._metadata(json_data),
}
custom_fields = self._custom_fields(json_data)
custom_fields = self._custom_fields(json_data, json_output)
if custom_fields:
json_output.update({"custom_fields": custom_fields})

return {
"created": self._created(json_data),
"updated": self._updated(record_dump),
Expand Down Expand Up @@ -571,7 +559,7 @@ def compute_files(file_dump, versions_dict):
{
file["full_name"]: {
"eos_tmp_path": tmp_eos_root
/ full_path.relative_to(legacy_path_root),
/ full_path.relative_to(legacy_path_root),
"id_bibdoc": file["bibdocid"],
"key": file["full_name"],
"metadata": {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class RestrictedFileDetected(CDSMigrationException):
description = "[Restricted file detected]"


class RecordAffiliationFlagged(CDSMigrationException):
class RecordFlaggedCuration(CDSMigrationException):
"""Record statistics error."""

description = "[Record affiliation needs to be checked]"
description = "[Record needs to be curated]"

0 comments on commit a05bc57

Please sign in to comment.