
Commit de59c44
Commit message: wip
Parent: 760d5dc

6 files changed: +273 -0 lines changed

creator/dewrangle/__init__.py

Whitespace-only changes.

creator/dewrangle/client.py

+76
@@ -0,0 +1,76 @@
+import logging
+import json
+import os
+
+import requests
+from graphql import GraphQLError
+from django.conf import settings
+
+from .graphql import *
+
+logger = logging.getLogger(__name__)
+
+
+class DewrangleClient(object):
+
+    def __init__(self, personal_access_token=None, url=None):
+        pat = personal_access_token or settings.DATA_TRACKER_DEWRANGLE_TOKEN
+        self.url = url or settings.DEWRANGLE_URL
+        self.session = requests.Session()
+        self.session.headers.update(
+            {
+                "X-Api-Key": pat,
+                "Content-Type": "application/json"
+            }
+        )
+
+    def bulk_create_file_upload_invoices(self, study_id, invoices):
+        """
+        Send graphql mutation to create a batch of file upload invoices
+        in Dewrangle
+        """
+        body = {
+            # Mutation string defined in creator/dewrangle/graphql.py
+            "query": FILE_UPLOAD_INVOICE_CREATE,
+            "variables": {
+                "input": {
+                    "studyId": study_id,
+                    "fileUploadInvoices": invoices
+                }
+            }
+        }
+        try:
+            response = self.session.post(self.url, json=body)
+            response.raise_for_status()
+        except requests.exceptions.RequestException:
+            logger.exception(
+                f"Problem sending request to dewrangle {self.url}"
+            )
+            raise
+
+        try:
+            data = response.json()
+        except json.JSONDecodeError:
+            logger.exception("Problem parsing JSON from response body")
+            raise
+
+        if response.status_code != 200 or "errors" in data:
+            msg = "There was a problem creating file upload invoices."
+            logger.exception(msg)
+            raise GraphQLError(
+                f"{msg} Caused by:\n{data['errors']}"
+            )
+
+        results = data["fileUploadInvoices"]
+
+        return results
+
+    def get_storage_analysis(self, obj_id):
+        """
+        Send graphql query to get a StorageAnalysis object from Dewrangle
+        """
+        pass
+
+    def get_storage_analyses(self, study_id=None):
+        """
+        Send graphql query to get all StorageAnalysis objects or, optionally,
+        all StorageAnalysis objects for a particular study from Dewrangle
+        """
+        pass
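For orientation, a minimal usage sketch of the client (not part of the commit). It assumes the DEWRANGLE_URL and DATA_TRACKER_DEWRANGLE_TOKEN settings introduced in creator/settings/features.py below are configured; the study ID and invoice values are hypothetical placeholders.

# Minimal usage sketch (illustrative only; assumes Django settings are loaded)
from creator.dewrangle.client import DewrangleClient

client = DewrangleClient()
invoices = [
    {
        "fileName": "sample_1.cram",                      # made-up values
        "hash": "d41d8cd98f00b204e9800998ecf8427e",
        "hashAlgorithm": "MD5",
        "size": 1024,
    }
]
result = client.bulk_create_file_upload_invoices(
    study_id="hypothetical-study-id", invoices=invoices
)
print(result)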

creator/dewrangle/graphql.py

+11
@@ -0,0 +1,11 @@
+
+FILE_UPLOAD_INVOICE_CREATE = """
+mutation newUploadInvoices($input: FileUploadInvoicesCreateInput!) {
+    fileUploadInvoiceCreate(input: $input) {
+        fileUploadInvoices {
+            created
+            total
+        }
+    }
+}
+"""

creator/files/mutations/version.py

+21
@@ -1,5 +1,7 @@
+import logging
 import graphene
 import uuid
+import django_rq
 from django.conf import settings
 from django.db import transaction
 from django_s3_storage.storage import S3Storage
@@ -16,6 +18,12 @@
 from creator.data_templates.nodes.template_version import TemplateVersionNode
 from creator.data_templates.models import TemplateVersion
 from creator.files.utils import evaluate_template_match
+from creator.files.tasks import (
+    is_file_upload_manifest,
+    push_to_dewrangle
+)
+
+logger = logging.getLogger(__name__)


 class VersionMutation(graphene.Mutation):
@@ -220,6 +228,19 @@ def mutate(
         analysis.creator = user
         analysis.save()

+        if (
+            settings.FEAT_DEWRANGLE_INTEGRATION and
+            is_file_upload_manifest(version)
+        ):
+            logger.info(
+                f"Queued version {version.kf_id} {version.root_file.name} for"
+                " audit processing..."
+            )
+            push_to_dewrangle(version.pk)
+            # django_rq.enqueue(
+            #     push_to_dewrangle, version_id=version.pk
+            # )
+
         return VersionUploadMutation(success=True, version=version)
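The commented-out django_rq.enqueue call hints at the intended production path, where the push runs on a worker rather than in the request cycle. A sketch of that variant, assuming the project's default RQ queue is configured:

# Sketch of the queued variant (assumes django_rq's default queue is set up);
# `version` stands in for the Version instance handled inside mutate()
django_rq.enqueue(push_to_dewrangle, version_id=version.pk)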
creator/files/tasks.py

+151
@@ -0,0 +1,151 @@
+import logging
+import os
+import requests
+from pprint import pprint
+
+import pandas
+from graphql import GraphQLError
+from django.conf import settings
+from django_s3_storage.storage import S3Storage
+
+from creator.decorators import task
+from creator.files.models import Version
+from creator.analyses.analyzer import extract_data
+from creator.dewrangle.client import DewrangleClient
+from kf_lib_data_ingest.common.io import read_df
+
+DATAFRAME_CHUNK_SIZE = 100
+KNOWN_FORMATS = {
+    ".csv": {"reader": pandas.read_csv, "sep": ","},
+    ".tsv": {"reader": pandas.read_csv, "sep": "\t"},
+    ".txt": {"reader": pandas.read_csv, "sep": None},
+}
+FILE_UPLOAD_MANIFEST_SCHEMA = {
+    "required": [
+        "Source File Name",
+        "Hash",
+        "Hash Algorithm",
+        "Size",
+    ],
+    "optional": [
+        "Patient IDs",
+        "Specimen IDs"
+    ]
+}
+
+logger = logging.getLogger(__name__)
+
+
+class ExtractDataError(Exception):
+    pass
+
+
+def is_file_upload_manifest(version):
+    """
+    Check whether this file version conforms to the File Upload Manifest schema
+    """
+    header = {
+        "_".join(c.split(" ")).lower()
+        for c in read_df(version.key, nrows=0).columns
+    }
+    expected = {
+        "_".join(c.split(" ")).lower()
+        for c in FILE_UPLOAD_MANIFEST_SCHEMA["required"]
+    }
+    return expected <= header
+
+
+def chunked_dataframe_reader(version):
+    """
+    Read a tabular file into chunks of DataFrames and yield the chunks
+    """
+    # Need to set storage location for study bucket if using S3 backend
+    if settings.DEFAULT_FILE_STORAGE == "django_s3_storage.storage.S3Storage":
+        if version.study is not None:
+            study = version.study
+        elif version.root_file is not None:
+            study = version.root_file.study
+        else:
+            raise GraphQLError("Version must be part of a study.")
+
+        version.key.storage = S3Storage(aws_s3_bucket_name=study.bucket)
+
+    # Check file format
+    _, ext = os.path.splitext(version.key.name)
+    if ext not in KNOWN_FORMATS:
+        raise IOError(
+            "Unsupported file format. Unable to read file upload manifest: "
+            f"{version.pk} {version.file_name}"
+        )
+
+    # Read file into chunks (DataFrames)
+    reader = KNOWN_FORMATS[ext]["reader"]
+    delim = KNOWN_FORMATS[ext]["sep"]
+    try:
+        for chunk in reader(
+            version.key, sep=delim, chunksize=DATAFRAME_CHUNK_SIZE
+        ):
+            logger.info(
+                f"Reading {DATAFRAME_CHUNK_SIZE} rows from "
+                f"{version.file_name} into DataFrame"
+            )
+            yield chunk
+    except Exception as e:
+        err_msg = (
+            f"Error in parsing {version.pk}: {version.file_name}"
+            " content into a DataFrame."
+        )
+        raise ExtractDataError(err_msg) from e
+
+
+def dataframe_to_invoices(df):
+    """
+    Helper to convert a file upload manifest DataFrame into a list of
+    FileUploadInvoice dicts in preparation to send to the Dewrangle API
+    """
+    # Keep only the manifest columns and normalize their names to snake_case
+    extract_cols = {
+        c: "_".join(c.split(" ")).lower()
+        for c in (
+            FILE_UPLOAD_MANIFEST_SCHEMA["required"] +
+            FILE_UPLOAD_MANIFEST_SCHEMA["optional"]
+        )
+    }
+    df = df[[c for c in extract_cols if c in df.columns]]
+
+    # Map the snake_case manifest columns to Dewrangle invoice fields
+    mapping = {
+        "source_file_name": "fileName",
+        "hash": "hash",
+        "hash_algorithm": "hashAlgorithm",
+        "size": "size",
+        "patient_ids": "patientIds",
+        "specimen_ids": "specimenIds"
+    }
+    df = df.rename(columns=extract_cols).rename(columns=mapping)
+
+    return df.to_dict(orient="records")
+
+
+@task("push_to_dewrangle")
+def push_to_dewrangle(version_id):
+    """
+    Push the records in a file upload manifest to the Dewrangle API
+    where they will be processed to produce an audit report of files in
+    cloud storage
+    """
+    try:
+        version = Version.objects.get(pk=version_id)
+        dewrangle = DewrangleClient()
+        for df in chunked_dataframe_reader(version):
+            logger.info(
+                f"Submitting {df.shape[0]} file upload invoices to"
+                f" {dewrangle.url}"
+            )
+            invoices = dataframe_to_invoices(df)
+            pprint(invoices)
+            result = dewrangle.bulk_create_file_upload_invoices(
+                version.study.pk, invoices
+            )
+            logger.info(
+                f"Success. Created: {result['created']} invoices. Total:"
+                f" {result['total']} invoices"
+            )
+
+    except Exception:
+        # TODO Set Version.submitted_for_audit = fail
+        raise
+
+    # TODO
+    # set Version.submitted_for_audit = completed
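To illustrate the header normalization and invoice conversion above, a small self-contained sketch on an in-memory DataFrame (all values are made up; no Version object or S3 access is involved):

# Illustrative check of the column normalization and invoice conversion
# (assumes creator.files.tasks is importable; values are hypothetical)
import pandas

from creator.files.tasks import (
    FILE_UPLOAD_MANIFEST_SCHEMA,
    dataframe_to_invoices,
)

df = pandas.DataFrame([{
    "Source File Name": "sample_1.cram",
    "Hash": "d41d8cd98f00b204e9800998ecf8427e",
    "Hash Algorithm": "MD5",
    "Size": 1024,
    "Patient IDs": "PT-001",
}])

# Same normalization used by is_file_upload_manifest()
required = {
    "_".join(c.split(" ")).lower()
    for c in FILE_UPLOAD_MANIFEST_SCHEMA["required"]
}
header = {"_".join(c.split(" ")).lower() for c in df.columns}
assert required <= header

print(dataframe_to_invoices(df))
# -> [{'fileName': 'sample_1.cram', 'hash': '...', 'hashAlgorithm': 'MD5',
#      'size': 1024, 'patientIds': 'PT-001'}]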

creator/settings/features.py

+14
@@ -5,6 +5,20 @@
 import os


+# DEWRANGLE ##################################################################
+# Study Creator delegates auditing of files uploaded to S3 buckets to the
+# Dewrangle API. User-submitted file upload manifests must be pushed to
+# Dewrangle to complete auditing
+FEAT_DEWRANGLE_INTEGRATION = os.environ.get(
+    "FEAT_DEWRANGLE_INTEGRATION", False
+)
+DEWRANGLE_URL = os.environ.get(
+    "DEWRANGLE_URL", "http://localhost:3000/api/graphql"
+)
+DATA_TRACKER_DEWRANGLE_TOKEN = os.environ.get(
+    "DATA_TRACKER_DEWRANGLE_TOKEN", None
+)
+

 # STUDIES ######################################################################
 # The Study Creator's primary purpose is to create and track studies.
