Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories.
base repository: kids-first/kf-api-study-creator
base: 16dc71d4b61fda8f47a4df02489788e19801137a
head repository: kids-first/kf-api-study-creator
compare: 4aaaa2186a2b58c1e734a29da68c5fd9c4f233ea
71 changes: 37 additions & 34 deletions creator/dewrangle/client.py
@@ -44,33 +44,50 @@ def _send_post_request(self, body):
logger.exception("Problem parsing JSON from response body")
raise

logger.debug(pformat(resp))

return resp, response.status_code

def bulk_create_file_upload_invoices(self, study_id, invoices):
def _send_mutation(self, body, mutation_name):
"""
Helper to send GraphQL mutation request and handle errors
"""
resp, status_code = self._send_post_request(body)

errors = resp.get("errors")

if status_code != 200 or errors:
msg = f"Failed {mutation_name} mutation @ {self.url}"
logger.exception(msg)
raise GraphQLError(
f"{msg} Caused by:\n{errors}"
)

if "data" not in resp:
raise GraphQLError(
f"Unexpected response format. Caused by:\n{pformat(resp)}"
)

return resp, status_code

def bulk_upsert_expected_files(self, study_id, expected_files):
"""
Send graphql mutation to create a batch of file upload invoices
Send graphql mutation to create a batch of expected_files
in Dewrangle
"""
body = {
"query": FILE_UPLOAD_INVOICE_CREATE.strip(),
"query": EXPECTED_FILE_UPSERT_MANY.strip(),
"variables": {
"input": {
"studyId": study_id,
"fileUploadInvoices": invoices
"expectedFiles": expected_files
}
}
}

resp, status_code = self._send_post_request(body)

if status_code != 200 or "errors" in resp:
msg = f"Failed fileUploadInvoiceCreate mutation @ {self.url}"
logger.exception(msg)
raise GraphQLError(
f"{msg} Caused by:\n{resp['errors']}"
)

results = resp["data"]["fileUploadInvoiceCreate"]["fileUploadInvoices"]
mutation_name = "expectedFileUpsertMany"
resp, status_code = self._send_mutation(body, mutation_name)
results = resp["data"][mutation_name]

return results

@@ -90,16 +107,9 @@ def create_study(self, study):
}
}

resp, status_code = self._send_post_request(body)

if status_code != 200 or "errors" in resp:
msg = f"Failed studyCreate mutation @ {self.url}"
logger.exception(msg)
raise GraphQLError(
f"{msg} Caused by:\n{resp['errors']}"
)

results = resp["data"]["studyCreate"]["study"]
mutation_name = "studyCreate"
resp, status_code = self._send_mutation(body, mutation_name)
results = resp["data"][mutation_name]["study"]

study.dewrangle_id = results["id"]
study.save()
@@ -121,16 +131,9 @@ def create_organization(self, organization):
}
}

resp, status_code = self._send_post_request(body)

if status_code != 200 or "errors" in resp:
msg = f"Failed organizationCreate mutation @ {self.url}"
logger.exception(msg)
raise GraphQLError(
f"{msg} Caused by:\n{resp['errors']}"
)

results = resp["data"]["organizationCreate"]["organization"]
mutation_name = "organizationCreate"
resp, status_code = self._send_mutation(body, mutation_name)
results = resp["data"][mutation_name]["organization"]

organization.dewrangle_id = results["id"]
organization.save()
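
All three mutations now flow through the shared `_send_mutation` helper, so a non-200 status, a populated `errors` key, or a response missing `data` raises `GraphQLError` in one place. A minimal caller sketch (the study ID and file rows are hypothetical; the payload shape mirrors what `submit_study_for_audit` builds below):

```python
# Hedged sketch of calling the refactored client; IDs and rows are made up.
from creator.dewrangle.client import DewrangleClient

client = DewrangleClient()
expected_files = [
    {
        "location": "harmonized/file-001.cram",  # hypothetical manifest row
        "hash": "d41d8cd98f00b204e9800998ecf8427e",
        "hashAlgorithm": "MD5",
        "size": 1024,
    },
]
# Returns resp["data"]["expectedFileUpsertMany"], a dict carrying the
# "upserted" and "total" counts selected in graphql.py. Any HTTP or
# GraphQL error surfaces as a GraphQLError from _send_mutation above.
result = client.bulk_upsert_expected_files("sd-hypothetical-id", expected_files)
print(f"{result['upserted']} of {result['total']} expected files upserted")
```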
41 changes: 5 additions & 36 deletions creator/dewrangle/graphql.py
@@ -1,42 +1,11 @@

FILE_UPLOAD_INVOICE_CREATE = """
mutation newUploadInvoices($input: FileUploadInvoicesCreateInput!) {
fileUploadInvoiceCreate(input: $input) {
EXPECTED_FILE_UPSERT_MANY = """
mutation newExpectedFiles($input: ExpectedFileManyInput!) {
expectedFileUpsertMany(input: $input) {
errors {
message
}
fileUploadInvoices {
created
total
}
upserted
total
}
}
"""

STUDY_CREATE = """
mutation ($input: StudyCreateInput!) {
studyCreate(input: $input) {
errors {
message
}
study {
id
name
}
}
}
"""

ORG_CREATE = """
mutation ($input: OrganizationCreateInput!) {
organizationCreate(input: $input) {
errors {
message
}
organization {
id
name
}
}
}
"""
30 changes: 24 additions & 6 deletions creator/files/management/commands/upload.py
@@ -14,11 +14,12 @@
)
from creator.studies.factories import StudyFactory, Study
from creator.files.models import Version, File as StudyFile
from creator.storage_analyses.tasks.file_upload_invoice import (
from creator.storage_analyses.tasks.expected_file import (
prepare_audit_submission
)

MANIFEST_FILE = "demo/data/cbtn-file-upload-manifest.tsv"
# MANIFEST_FILE = "demo/data/cbtn-file-upload-manifest.tsv"
MANIFEST_FILE = "demo/data/cbtn_file_upload_manifest_12_03_2021.csv"
logger = logging.getLogger(__name__)


@@ -33,7 +34,7 @@ def update_version_content(df, file_version):
file_version.save()


def prep_file(manifest_fp=MANIFEST_FILE):
def prep_file(manifest_fp, **read_kwargs):
"""
Create a study, file, version with a file upload manifest
"""
@@ -51,7 +52,8 @@ def prep_file(manifest_fp=MANIFEST_FILE):
version.save()

# Create version w upload manifest
df = pandas.read_csv(manifest_fp, sep="\t")
# df = pandas.read_csv(manifest_fp, sep="\t")
df = pandas.read_csv(manifest_fp, **read_kwargs)
update_version_content(df, version)

logger.info(f"Prepped study, file and version: {version}")
@@ -62,15 +64,31 @@ class Command(BaseCommand):
help = 'Create file upload manifest for testing dewrangle integration'

def add_arguments(self, parser):
parser.add_argument('--manifest_file', help='Path to manifest file')
parser.add_argument(
'--manifest_file',
default=MANIFEST_FILE,
help='Path to manifest file'
)
parser.add_argument(
'--nrows',
type=int,
default=None,
help='Number of rows to read'
)

def handle(self, *args, **options):
"""
Fake a version upload so we can test the push to dewrangle
functionality
"""
kwargs = {}
if options["nrows"]:
kwargs.update({"nrows": options["nrows"]})

fp = options["manifest_file"]

# Create version with upload manifest content
version = prep_file()
version = prep_file(fp, **kwargs)

# Pretend to call upload version
if (
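
With defaults on both flags, the command can be smoke-tested straight from a Django shell or test; `--nrows` caps how much of the manifest gets read. A hedged invocation sketch:

```python
# Hedged sketch: exercising the upload command programmatically.
from django.core.management import call_command

# Reads only the first 100 rows of the default CSV manifest, then fakes
# the version upload that triggers prepare_audit_submission
call_command(
    "upload",
    manifest_file="demo/data/cbtn_file_upload_manifest_12_03_2021.csv",
    nrows=100,
)
```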
2 changes: 1 addition & 1 deletion creator/files/mutations/file.py
@@ -13,7 +13,7 @@
from creator.events.models import Event
from creator.files.nodes.file import FileNode, FileType
from creator.data_templates.models import TemplateVersion
from creator.storage_analyses.tasks.file_upload_invoice import (
from creator.storage_analyses.tasks.expected_file import (
prepare_audit_submission
)

2 changes: 1 addition & 1 deletion creator/files/mutations/version.py
@@ -18,7 +18,7 @@
from creator.data_templates.nodes.template_version import TemplateVersionNode
from creator.data_templates.models import TemplateVersion
from creator.files.utils import evaluate_template_match
from creator.storage_analyses.tasks.file_upload_invoice import (
from creator.storage_analyses.tasks.expected_file import (
prepare_audit_submission
)

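Both mutation modules only swap the import path here; the task itself is unchanged. A hedged sketch of how the renamed task would be dispatched, mirroring the `django_rq.enqueue` call used inside the task module (the version lookup is illustrative):

```python
# Hedged sketch: queueing audit preparation for an uploaded manifest version.
import django_rq

from creator.files.models import Version
from creator.storage_analyses.tasks.expected_file import (
    prepare_audit_submission
)

version = Version.objects.first()  # illustrative: any saved Version
if version is not None:
    django_rq.enqueue(prepare_audit_submission, version.pk)
```
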
2 changes: 1 addition & 1 deletion creator/settings/development.py
@@ -121,7 +121,7 @@

DATABASES = {
"default": {
"ENGINE": "django.db.backends.postgresql",
"ENGINE": "psqlextra.backend",
"NAME": os.environ.get("PG_NAME", "postgres"),
"USER": os.environ.get("PG_USER", "postgres"),
"PASSWORD": os.environ.get("PG_PASS", "postgres"),
2 changes: 1 addition & 1 deletion creator/settings/production.py
@@ -157,7 +157,7 @@ def traces_sampler(sampling_context):

DATABASES = {
"default": {
"ENGINE": "django.db.backends.postgresql",
"ENGINE": "psqlextra.backend",
"NAME": os.environ.get("PG_NAME", "postgres"),
"USER": os.environ.get("PG_USER", "postgres"),
"PASSWORD": os.environ.get("PG_PASS", "postgres"),
2 changes: 1 addition & 1 deletion creator/settings/testing.py
@@ -117,7 +117,7 @@

DATABASES = {
"default": {
"ENGINE": "django.db.backends.postgresql",
"ENGINE": "psqlextra.backend",
"NAME": os.environ.get("PG_NAME", "postgres"),
"USER": os.environ.get("PG_USER", "postgres"),
"PASSWORD": os.environ.get("PG_PASS", "postgres"),
38 changes: 16 additions & 22 deletions creator/storage_analyses/models.py
@@ -3,6 +3,7 @@
from django.db import models
from django.contrib.auth import get_user_model
from django_fsm import FSMField, transition
from django.contrib.postgres.fields import JSONField
from django.utils import timezone
from django.conf import settings
from django.core.validators import MinValueValidator
@@ -35,38 +36,36 @@ class AuditState:
"size",
],
"optional": [
"patient_ids",
"specimen_ids"
]
}
UNIQUE_CONSTRAINT = ["source_file_name", "hash", "hash_algorithm"]
UNIQUE_CONSTRAINT = ["source_file_name", "study_id"]


class FileUploadInvoice(models.Model):
class ExpectedFile(models.Model):
"""
A row in a File Upload Manifest which represents a file that was
uploaded directly to a study's cloud storage.
Each file upload invoice is submitted to Dewrangle, which is the system
Each file record is submitted to Dewrangle, which is the system
that will audit/determine whether the file exists in the study's cloud
storage
"""

class Meta:
permissions = [
("list_all_fileuploadinvoices", "Show all file_upload_invoices"),
("list_all_expectedfiles", "Show all expected_files"),
]
constraints = [
models.UniqueConstraint(
fields=UNIQUE_CONSTRAINT, name="unique_file_upload_invoice"
fields=UNIQUE_CONSTRAINT, name="unique_expected_file"
)
]

id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
created_at = models.DateTimeField(
default=timezone.now,
null=True,
help_text="When the file upload invoice created",
help_text="When the expected file was created",
)
source_file_name = models.CharField(
max_length=200,
@@ -87,25 +86,20 @@ class Meta:
],
help_text="Size of the uploaded file in bytes",
)
patient_ids = models.TextField(
null=True,
help_text="Delimited string of the patient identifiers that this file"
"is related to ",
)
specimen_ids = models.TextField(
null=True,
help_text="Delimited string of the specimen identifiers that this file"
"is related to ",
custom_fields = JSONField(
default=dict,
help_text="Additional fields in the file upload manifest that need"
" to be captured",
)
audit_state = FSMField(
default=AuditState.NOT_SUBMITTED,
help_text="The state of the invoice in the submission process to"
help_text="The state of the expected file in the submission process to"
"Dewrangle",
)
study = models.ForeignKey(
Study,
on_delete=models.CASCADE,
related_name="file_upload_invoices",
related_name="expected_files",
help_text="id of the study whose cloud storage this file was uploaded"
" to",
)
@@ -117,7 +111,7 @@ class Meta:
)
def start_submission(self):
"""
Begin the process to submit this file upload invoice to the auditing
Begin the process to submit this expected file to the auditing
system.
"""
pass
@@ -126,7 +120,7 @@ def start_submission(self):
target=AuditState.SUBMITTED)
def complete_submission(self):
"""
Mark completion of the file upload invoice being submitted to the
Mark completion of the expected file being submitted to the
auditing system
"""
pass
@@ -136,7 +130,7 @@ def complete_submission(self):
)
def fail_submission(self):
"""
Fail submission of the file upload invoice to the audit system due
Fail submission of the expected file to the audit system due
to some unexpected error
"""
pass
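
The `django_fsm` transitions above constrain how an `ExpectedFile` moves between audit states. A hedged walk-through (field values and the study lookup are placeholders; `FSMField` changes persist only on `save()`):

```python
# Hedged sketch of the ExpectedFile audit-state lifecycle.
from creator.studies.models import Study
from creator.storage_analyses.models import AuditState, ExpectedFile

study = Study.objects.first()  # illustrative: assumes a Study exists
expected_file = ExpectedFile.objects.create(
    source_file_name="harmonized/file-001.cram",  # placeholder values
    hash="d41d8cd98f00b204e9800998ecf8427e",
    hash_algorithm="md5",
    size=1024,
    study=study,
)
assert expected_file.audit_state == AuditState.NOT_SUBMITTED

expected_file.start_submission()     # begin submission to the audit system
expected_file.complete_submission()  # mark submission as finished
expected_file.save()                 # persist the final audit_state
```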
251 changes: 251 additions & 0 deletions creator/storage_analyses/tasks/expected_file.py
@@ -0,0 +1,251 @@
import os
import logging

import pandas
import django_rq
from django.conf import settings
from django.db import IntegrityError

from creator.decorators import task
from creator.studies.models import Study
from creator.files.models import Version
from creator.storage_analyses.models import (
UNIQUE_CONSTRAINT,
FILE_UPLOAD_MANIFEST_SCHEMA,
ExpectedFile,
AuditState
)
from creator.storage_analyses import utils
from creator.dewrangle.client import DewrangleClient

EXPECTED_FILE_BATCH_SIZE = 1000
REQUIRED_COLS = FILE_UPLOAD_MANIFEST_SCHEMA["required"]
OPTIONAL_COLS = FILE_UPLOAD_MANIFEST_SCHEMA["optional"]

logger = logging.getLogger(__name__)


def _bulk_update_audit_state(expected_files, method_name):
"""
Bulk update the audit state for a batch of ExpectedFiles via
its state transition method
"""
for f in expected_files:
state_transition = getattr(f, method_name)
state_transition()

return ExpectedFile.objects.bulk_update(
expected_files, fields=["audit_state"]
)


def _dataframe_to_expected_files(df, study):
"""
Helper to convert a file upload manifest DataFrame into a list of
ExpectedFile dicts in preparation to send to Dewrangle API
"""
extract_cols = REQUIRED_COLS + OPTIONAL_COLS

# Clean up col names
df = df.rename(
columns={
c: "_".join(c.strip().split(" ")).lower()
for c in df.columns
}
)

# Collapse additional columns into a custom_fields column
custom_cols = [c for c in df.columns if c not in extract_cols]
df["custom_fields"] = df.apply(
lambda row: {k: v for k, v in row.items() if k in custom_cols}, axis=1
)

# Add study_id column
df["study_id"] = study.pk

# Extract only what we need to create expected files
extract_cols.extend(["custom_fields", "study_id"])
df = df[[c for c in extract_cols if c in df.columns]]

# Try weeding out duplicate records early
duplicates = df.duplicated(subset=UNIQUE_CONSTRAINT)
# Count before filtering, otherwise the count is always 0
dups = int(duplicates.sum())
df = df[~duplicates]
if dups:
logger.warning(
f"Dropping {dups} duplicate rows based on "
f"unique constraint: {UNIQUE_CONSTRAINT}"
)

# Drop any rows where required cols are null
missing_required = df[REQUIRED_COLS].isnull().any(axis=1)
# Count before filtering, otherwise the count is always 0
req = int(missing_required.sum())
df = df[~missing_required]
if req:
logger.warning(
f"Dropping {req} rows because one or more required "
f"fields ({REQUIRED_COLS}) are null"
)

return df.to_dict(orient="records")


@task("prepare_audit_submission")
def prepare_audit_submission(version_id):
"""
Read the content of a file upload manifest and produce an ExpectedFile
for each row
Save ExpectedFiles to the study so they can later be submitted to the
auditing system (Dewrangle) to verify that the file exists in the
study's cloud storage
"""
logger.info(
f"Preparing expected files in {version_id} for submission to "
"audit system"
)
version = Version.objects.get(pk=version_id)
study = version.root_file.study
try:
total_upserted = 0
total_rows = 0

# Iterate over the large DataFrame in smaller chunks and
# submit the rows of each chunk to bulk upsert ExpectedFiles
for df in utils.chunked_dataframe_reader(
version,
batch_size=EXPECTED_FILE_BATCH_SIZE
):
logger.info(
f"Reading {EXPECTED_FILE_BATCH_SIZE} rows from "
f"{version.file_name} into DataFrame"
)

total_rows += df.shape[0]

upserted_files = utils.bulk_upsert_expected_files(
_dataframe_to_expected_files(df, study),
)

total_upserted += len(upserted_files)

logger.info(
f"Upserted {total_upserted} expected_files. Total rows "
f"processed: {total_rows} in file upload manifest: "
f"{version_id} ..."
)
except Exception:
logger.exception(
f"Something went wrong while processing version {version_id} "
"in preparation for audit submission"
)
version.fail_audit_prep()
version.save()
raise

version.complete_audit_prep()
version.save()

django_rq.enqueue(
submit_study_for_audit, study.pk
)


@task("submit_study_for_audit")
def submit_study_for_audit(study_id):
"""
Submit a Study's ExpectedFiles to the auditing system (Dewrangle)
for audit processing
Only submit the files that have not started or have previously failed
submission
"""
dewrangle = DewrangleClient()
study = Study.objects.get(pk=study_id)

logger.info(
f"Begin submitting expected files for study {study_id} "
f"to audit system @ {dewrangle.url}"
)
states = [AuditState.NOT_SUBMITTED, AuditState.FAILED]
count = ExpectedFile.objects.filter(study=study, audit_state__in=states).count()

if not count:
logger.info(
"There are no more expected files to submit for study "
f"{study_id}"
)
return

# Create study/org in Dewrangle if they don't exist
if not study.organization.dewrangle_id:
dewrangle.create_organization(study.organization)
logger.info(
f"Created new organization {study.organization.dewrangle_id}"
f" in dewrangle"
)

if not study.dewrangle_id:
dewrangle.create_study(study)
logger.info(
f"Created new study {study.dewrangle_id} in dewrangle"
)

batched_expected_files = utils.batched_queryset_iterator(
ExpectedFile.objects.filter(study=study, audit_state__in=states),
batch_size=EXPECTED_FILE_BATCH_SIZE
)
for batch in batched_expected_files:
# Set audit state to submitting
_bulk_update_audit_state(batch, "start_submission")
try:
payloads = [
{
"location": expected_file.source_file_name,
"hash": expected_file.hash,
"hashAlgorithm": expected_file.hash_algorithm.upper(),
"size": expected_file.size,
}
for expected_file in batch
]
result = dewrangle.bulk_upsert_expected_files(
study.dewrangle_id, payloads
)
logger.info(
f"Successfully submitted expected_files: {result['upserted']} "
f"Total expected_files: {result['total']} "
)
except Exception:
logger.exception(
"Failed submitting batch of expected_files to audit system @"
f" {dewrangle.url}"
)

# Set audit state to failed submission
_bulk_update_audit_state(batch, "fail_submission")

raise

# Set audit state to completed submission
_bulk_update_audit_state(batch, "complete_submission")


@task("retry_failed_audit_prep")
def retry_failed_audit_prep(study_id):
"""
Scan for any versions that failed preparation for audit submission and
retry processing them
"""
pass


@task("prepare_expected_files")
def delete_submitted_expected_files(study_id):
"""
Delete successfully submitted expected_files that are older than 1 week.
Once an ExpectedFile has been successfully submitted to the auditing
system for processing, it is no longer needed.
"""
pass
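
To make the column handling in `_dataframe_to_expected_files` concrete, here is a hedged sketch of a toy manifest chunk run through it (the study lookup and row values are made up):

```python
# Hedged sketch: two manifest rows pushed through the helper above.
import pandas

from creator.studies.models import Study
from creator.storage_analyses.tasks.expected_file import (
    _dataframe_to_expected_files
)

df = pandas.DataFrame([
    # Header names get normalized to snake_case ("Source File Name" ->
    # "source_file_name"); unrecognized columns fold into custom_fields
    {"Source File Name": "f1.cram", "Hash": "abc", "Hash Algorithm": "md5",
     "Size": 10, "Tissue Type": "tumor"},
    {"Source File Name": "f2.cram", "Hash": "def", "Hash Algorithm": "md5",
     "Size": 20, "Tissue Type": "normal"},
])
study = Study.objects.first()  # illustrative: assumes a Study exists
records = _dataframe_to_expected_files(df, study)
print(records[0]["custom_fields"])  # -> {"tissue_type": "tumor"}
```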
232 changes: 0 additions & 232 deletions creator/storage_analyses/tasks/file_upload_invoice.py

This file was deleted.

11 changes: 11 additions & 0 deletions creator/storage_analyses/utils.py
@@ -3,6 +3,9 @@

import pandas
from django.conf import settings
from psqlextra.util import postgres_manager
from psqlextra.query import ConflictAction
from creator.storage_analyses.models import ExpectedFile, UNIQUE_CONSTRAINT

KNOWN_FORMATS = {
".csv": {"reader": pandas.read_csv, "sep": ","},
@@ -54,3 +57,11 @@ def batched_queryset_iterator(queryset, batch_size=None):
iterator = queryset.iterator(chunk_size=batch_size)
for thing in iterator:
yield [ti for ti in chain([thing], islice(iterator, batch_size - 1))]


def bulk_upsert_expected_files(files):
"""
Bulk upsert ExpectedFile
"""
with postgres_manager(ExpectedFile) as manager:
return manager.bulk_upsert(UNIQUE_CONSTRAINT, rows=files)
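
This helper is the reason the settings modules switched their `ENGINE` to `psqlextra.backend`: django-postgres-extra needs its own backend to issue the native `ON CONFLICT` statements behind `bulk_upsert`. A hedged usage sketch (row values are placeholders shaped like the model's fields):

```python
# Hedged sketch: upserting rows keyed on ("source_file_name", "study_id").
from creator.storage_analyses.utils import bulk_upsert_expected_files

rows = [
    {
        "source_file_name": "f1.cram",  # placeholder values
        "hash": "abc",
        "hash_algorithm": "md5",
        "size": 10,
        "custom_fields": {},
        "study_id": "SD_HYPOTHETICAL",
    },
]
# Rows that collide on the unique constraint are updated in place
# instead of raising IntegrityError
upserted = bulk_upsert_expected_files(rows)
```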
1 change: 1 addition & 0 deletions requirements.txt
@@ -26,3 +26,4 @@ kf_lib_data_ingest @ git+https://git@github.com/kids-first/kf-lib-data-ingest.gi
graphene_django @ git+https://git@github.com/graphql-python/graphene-django.git@d52b18a700d2c1cec4b2de1f673bff14e2d18071
marshmallow==3.12.1
XlsxWriter==1.4.4
django-postgres-extra==2.0.3