#!/usr/bin/env python
#
# Launcher script for GATK tools. Delegates to java -jar, spark-submit, or gcloud as appropriate,
# and sets many important Spark and htsjdk properties before launch.
#
# If running a non-Spark tool, or a Spark tool in local mode, the script searches for GATK
# executables as follows:
#   - If the GATK_LOCAL_JAR environment variable is set, uses that jar
#   - Otherwise, if the GATK_RUN_SCRIPT created by "gradle installDist" exists, uses that
#   - Otherwise, uses the newest local jar in the same directory as the script or the BIN_PATH
#     (in that order of precedence)
#
# If running a Spark tool, the script searches for GATK executables as follows:
#   - If the GATK_SPARK_JAR environment variable is set, uses that jar
#   - Otherwise, uses the newest Spark jar in the same directory as the script or the BIN_PATH
#     (in that order of precedence)
#
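# Example invocations (a sketch only; the tool names, file paths, and Spark master URL below are
# hypothetical and not taken from this repository):
#
#   ./quartetibdanalysis --list
#   ./quartetibdanalysis SomeTool --input sample.bam --output result.txt
#   ./quartetibdanalysis SomeSparkTool toolArgs -- --spark-runner SPARK --spark-master spark://host:7077
#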
import sys
from subprocess import check_call, CalledProcessError, call
import os
import hashlib
import signal
import re
script = os.path.dirname(os.path.realpath(__file__))
projectName = "QuartetIBDAnalysis"
BUILD_LOCATION = script +"/build/install/" + projectName + "/bin/"
GATK_RUN_SCRIPT = BUILD_LOCATION + projectName
GATK_LOCAL_JAR_ENV_VARIABLE = "GATK_LOCAL_JAR"
GATK_SPARK_JAR_ENV_VARIABLE = "GATK_SPARK_JAR"
BIN_PATH = script + "/build/libs"
EXTRA_JAVA_OPTIONS_SPARK = "-DGATK_STACKTRACE_ON_USER_EXCEPTION=true " \
                           "-Dsamjdk.use_async_io_read_samtools=false " \
                           "-Dsamjdk.use_async_io_write_samtools=false " \
                           "-Dsamjdk.use_async_io_write_tribble=false " \
                           "-Dsamjdk.compression_level=2 "

PACKAGED_LOCAL_JAR_OPTIONS = ["-Dsamjdk.use_async_io_read_samtools=false",
                              "-Dsamjdk.use_async_io_write_samtools=true",
                              "-Dsamjdk.use_async_io_write_tribble=false",
                              "-Dsamjdk.compression_level=2"]

DEFAULT_SPARK_ARGS_PREFIX = '--conf'
DEFAULT_SPARK_ARGS = {
    "spark.kryoserializer.buffer.max": "512m",
    "spark.driver.maxResultSize": "0",
    "spark.driver.userClassPathFirst": "false",
    "spark.io.compression.codec": "lzf",
    "spark.yarn.executor.memoryOverhead": "600",
    "spark.driver.extraJavaOptions": EXTRA_JAVA_OPTIONS_SPARK,
    "spark.executor.extraJavaOptions": EXTRA_JAVA_OPTIONS_SPARK
}

def createSparkConfArgs(javaOptions):
    sparkConfArgs = DEFAULT_SPARK_ARGS
    if javaOptions is not None:
        sparkConfArgs["spark.driver.extraJavaOptions"] = sparkConfArgs["spark.driver.extraJavaOptions"] + ' ' + javaOptions
        sparkConfArgs["spark.executor.extraJavaOptions"] = sparkConfArgs["spark.executor.extraJavaOptions"] + ' ' + javaOptions
    return DEFAULT_SPARK_ARGS_PREFIX, sparkConfArgs
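# Sketch of what createSparkConfArgs returns (the "-Xmx4g" value is a hypothetical example of a
# --java-options string; the property set comes from DEFAULT_SPARK_ARGS above):
#
#   prefix, conf = createSparkConfArgs("-Xmx4g")
#   # prefix == "--conf"
#   # conf["spark.driver.extraJavaOptions"] and conf["spark.executor.extraJavaOptions"] now end in " -Xmx4g"
#   # each entry is later rendered as, e.g.,  --conf spark.kryoserializer.buffer.max=512m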

class GATKLaunchException(Exception):
    pass


def signal_handler(signal, frame):
    sys.exit(1)

def main(args):
    # suppress stack trace when killed by keyboard interrupt
    signal.signal(signal.SIGINT, signal_handler)
    try:
        if len(args) == 0 or (len(args) == 1 and (args[0] == "--help" or args[0] == "-h")):
print("")
print(" Usage template for all tools (uses --spark-runner LOCAL when used with a Spark tool)")
print(" gatk AnyTool toolArgs")
print("")
print(" Usage template for Spark tools (will NOT work on non-Spark tools)")
print(" gatk SparkTool toolArgs [ -- --spark-runner <LOCAL | SPARK | GCS> sparkArgs ]")
print("")
print(" Getting help")
print(" gatk --list Print the list of available tools" )
print("")
print(" gatk Tool --help Print help on a particular tool" )
print("")
print(" Configuration File Specification")
print(" --gatk-config-file PATH/TO/GATK/PROPERTIES/FILE")
print("")
print(" gatk forwards commands to GATK and adds some sugar for submitting spark jobs")
print("")
print(" --spark-runner <target> controls how spark tools are run")
print(" valid targets are:")
print(" LOCAL: run using the in-memory spark runner")
print(" SPARK: run using spark-submit on an existing cluster ")
print(" --spark-master must be specified")
print(" --spark-submit-command may be specified to control the Spark submit command")
print(" arguments to spark-submit may optionally be specified after -- ")
print(" GCS: run using Google cloud dataproc")
print(" commands after the -- will be passed to dataproc")
print(" --cluster <your-cluster> must be specified after the --")
print(" spark properties and some common spark-submit parameters will be translated ")
print(" to dataproc equivalents")
print("")
print(" --dry-run may be specified to output the generated command line without running it")
print(" --java-options 'OPTION1[ OPTION2=Y ... ]' optional - pass the given string of options to the ")
print(" java JVM at runtime. ")
print(" Java options MUST be passed inside a single string with space-separated values.")
sys.exit(0)
        if len(args) == 1 and args[0] == "--list":
            args[0] = "--help"  # if we're invoked with --list, invoke the GATK with --help

        dryRun = "--dry-run" in args
        if dryRun:
            dryRun = True
            args.remove("--dry-run")
        javaOptions = getValueForArgument(args, "--java-options")
        if javaOptions is not None:
            i = args.index("--java-options")
            del args[i]  # remove javaOptions
            del args[i]  # and its parameter

        sparkRunner = getValueForArgument(args, "--spark-runner")
        if sparkRunner is not None:
            i = args.index("--spark-runner")
            del args[i]  # remove spark target
            del args[i]  # and its parameter

        sparkSubmitCommand = getValueForArgument(args, "--spark-submit-command")
        if sparkSubmitCommand is not None:
            i = args.index("--spark-submit-command")
            del args[i]  # remove sparkSubmitCommand target
            del args[i]  # and its parameter

        (gatkArgs, sparkArgs) = getSplitArgs(args)

        sparkMaster = getValueForArgument(sparkArgs, "--spark-master")
        if sparkMaster is not None:
            i = sparkArgs.index("--spark-master")
            del sparkArgs[i]  # remove spark target
            del sparkArgs[i]  # and its parameter
            gatkArgs += ["--spark-master", sparkMaster]

        runGATK(sparkRunner, sparkSubmitCommand, dryRun, gatkArgs, sparkArgs, javaOptions)
    except GATKLaunchException as e:
        sys.stderr.write(str(e) + "\n")
        sys.exit(3)
    except CalledProcessError as e:
        sys.exit(e.returncode)

def getSparkSubmitCommand(sparkSubmitCommand):
    if sparkSubmitCommand is None:
        sparkhome = os.environ.get("SPARK_HOME")
        if sparkhome is not None:
            return sparkhome + "/bin/spark-submit"
        else:
            return "spark-submit"
    else:
        return sparkSubmitCommand


def getGCloudSubmitCommand(gcloudCmd):
    if gcloudCmd is None:
        gcloudHome = os.environ.get("GCLOUD_HOME")
        if gcloudHome is not None:
            return gcloudHome + "/gcloud"
        else:
            return "gcloud"
    else:
        return gcloudCmd

def getLocalGatkRunCommand(javaOptions):
    localJarFromEnv = getJarFromEnv(GATK_LOCAL_JAR_ENV_VARIABLE)

    # Add java options to our packaged local jar options
    if javaOptions is not None:
        PACKAGED_LOCAL_JAR_OPTIONS.extend(javaOptions.split())

    if localJarFromEnv is not None:
        return formatLocalJarCommand(localJarFromEnv)

    wrapperScript = getGatkWrapperScript(throwIfNotFound=False)
    if wrapperScript is not None:
        # Add options to JAVA_OPTS environment var for dispatch script
        if javaOptions is not None:
            envJavaOpts = os.environ.get('JAVA_OPTS')
            if envJavaOpts is not None:
                envJavaOpts = envJavaOpts + ' ' + javaOptions
            else:
                envJavaOpts = javaOptions
            os.environ['JAVA_OPTS'] = envJavaOpts
        return [wrapperScript]

    return formatLocalJarCommand(getLocalJar())  # will throw if local jar not found


def formatLocalJarCommand(localJar):
    return ["java"] + PACKAGED_LOCAL_JAR_OPTIONS + ["-jar", localJar]

def getGatkWrapperScript(throwIfNotFound=True):
    if not os.path.exists(GATK_RUN_SCRIPT):
        if throwIfNotFound:
            raise GATKLaunchException("Missing wrapper script: " + GATK_RUN_SCRIPT
                                      + "\nTo generate the wrapper run:\n\n " + script + "/gradlew installDist")
        else:
            return None
    sys.stderr.write("Using wrapper script " + GATK_RUN_SCRIPT + "\n")
    return GATK_RUN_SCRIPT

def getLocalJar(throwIfNotFound=True):
    localJar = findJar("local.jar", envVariableOverride=GATK_LOCAL_JAR_ENV_VARIABLE)
    if localJar is None and throwIfNotFound:
        raise GATKLaunchException("No local jar was found, please build one by running\n\n "
                                  + script + "/gradlew localJar\n"
                                  "or\n"
                                  " export " + GATK_LOCAL_JAR_ENV_VARIABLE + "=<path_to_local_jar>")
    return localJar


def getSparkJar(throwIfNotFound=True):
    sparkJar = findJar("spark.jar", envVariableOverride=GATK_SPARK_JAR_ENV_VARIABLE)
    if sparkJar is None and throwIfNotFound:
        raise GATKLaunchException("No spark jar was found, please build one by running\n\n "
                                  + script + "/gradlew sparkJar\n"
                                  "or\n"
                                  " export " + GATK_SPARK_JAR_ENV_VARIABLE + "=<path_to_spark_jar>")
    return sparkJar

def findJar(jarSuffix, jarPrefix=projectName, envVariableOverride=None, jarSearchDirs=(script, BIN_PATH)):
    if envVariableOverride is not None:
        jarPathFromEnv = getJarFromEnv(envVariableOverride)
        if jarPathFromEnv is not None:
            return jarPathFromEnv
    for jarDir in jarSearchDirs:
        jar = getNewestJarInDir(jarDir, jarSuffix, jarPrefix)
        if jar is not None:
            sys.stderr.write("Using GATK jar " + jar + "\n")
            return jar
    return None

def getJarFromEnv(envVariableName):
    jarPathFromEnv = os.environ.get(envVariableName)
    if jarPathFromEnv is not None:
        if not os.path.exists(jarPathFromEnv):
            raise GATKLaunchException(envVariableName + " was set to: " + jarPathFromEnv
                                      + " but this file doesn't exist. Please fix your environment")
        else:
            sys.stderr.write("Using GATK jar " + jarPathFromEnv + " defined in environment variable " + envVariableName + "\n")
            return jarPathFromEnv
    return None

def getNewestJarInDir(dir, jarSuffix, jarPrefix):
    if not os.path.exists(dir):
        return None
    dirContents = os.listdir(dir)
    jarPattern = re.compile("^" + jarPrefix + ".*" + jarSuffix + "$")
    jars = [f for f in dirContents if jarPattern.match(f)]
    if len(jars) != 0:
        newestJar = max(jars, key=lambda x: os.stat(dir + "/" + x).st_mtime)
        return dir + "/" + newestJar
    return None
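# Jar discovery sketch: with the defaults above, findJar("local.jar") matches file names like
# QuartetIBDAnalysis-1.0-local.jar (a hypothetical version string) in the script directory or in
# build/libs, and returns the most recently modified match.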

def md5(file):
    hash = hashlib.md5()
    with open(file, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash.update(chunk)
    return hash.hexdigest()

def cacheJarOnGCS(jar, dryRun):
    staging = os.environ.get("GATK_GCS_STAGING")
    if staging is not None:
        staging = staging.strip('/')
        staging = staging + '/'
        if not staging.startswith("gs://"):
            staging = "gs://" + staging

    if dryRun is True:
        return jar
    elif staging is None:
        sys.stderr.write("\njar caching is disabled because GATK_GCS_STAGING is not set\n\n"
                         "please set GATK_GCS_STAGING to a bucket you have write access to in order to enable jar caching\n"
                         "add the following line to your .bashrc or equivalent startup script\n\n"
                         " export GATK_GCS_STAGING=gs://<my_bucket>/\n")
        return jar
    else:
        jarname = os.path.basename(jar)
        (name, ext) = os.path.splitext(jarname)
        jarmd5 = md5(jar)
        gcsjar = staging + name + "_" + jarmd5 + ext
        try:
            if call(["gsutil", "-q", "stat", gcsjar]) == 0:
                sys.stderr.write("\nfound cached jar: " + gcsjar + "\n")
                return gcsjar
            else:
                if call(["gsutil", "cp", jar, gcsjar]) == 0:
                    sys.stderr.write("\nuploaded " + jar + " -> " + gcsjar + "\n")
                    return gcsjar
                else:
                    sys.stderr.write("\nfailed to upload " + jar + " -> " + gcsjar
                                     + "\nThere may be something wrong with your bucket permissions or gsutil installation\n")
                    return jar
        except OSError:
            sys.stderr.write("\nTried to execute gsutil to upload the jar but it wasn't available\n "
                             "See https://cloud.google.com/sdk/#Quick_Start for instructions on installing gsutil\n\n")
            return jar
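# Caching sketch (the bucket name and md5 below are hypothetical): with GATK_GCS_STAGING=gs://my-bucket/,
# a jar named QuartetIBDAnalysis-spark.jar whose md5 is abc123 would be staged once as
#   gs://my-bucket/QuartetIBDAnalysis-spark_abc123.jar
# and reused on later runs for as long as the jar's md5 stays the same.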

def runGATK(sparkRunner, suppliedSparkSubmitCommand, dryrun, gatkArgs, sparkArgs, javaOptions):
    if sparkRunner is None or sparkRunner == "LOCAL":
        cmd = getLocalGatkRunCommand(javaOptions) + gatkArgs + sparkArgs
        runCommand(cmd, dryrun)
    elif sparkRunner == "SPARK":
        sparkSubmitCmd = getSparkSubmitCommand(suppliedSparkSubmitCommand)
        sparkConfArgsPrefix, sparkConfArgs = createSparkConfArgs(javaOptions)
        sparkConfArgList = [[sparkConfArgsPrefix, "%s=%s" % (a, sparkConfArgs[a])] for a in sparkConfArgs]
        cmd = [sparkSubmitCmd,
               "--master", getSparkMasterSpecified(gatkArgs)] \
              + [s for args in sparkConfArgList for s in args] \
              + sparkArgs \
              + [getSparkJar()] \
              + gatkArgs
        try:
            runCommand(cmd, dryrun)
        except OSError:
            raise GATKLaunchException("Tried to run %s but failed.\nMake sure %s is available in your path" % (sparkSubmitCmd, sparkSubmitCmd))
    elif sparkRunner == "GCS":
        jarPath = cacheJarOnGCS(getSparkJar(), dryrun)
        gcloudCmd = getGCloudSubmitCommand(suppliedSparkSubmitCommand)
        # Note: For GCS we don't need the prefix for the sparkConfArgs, so we ignore it and only grab the second
        # return value of createSparkConfArgs
        sparkConfArgs = createSparkConfArgs(javaOptions)[1]
        dataprocargs = convertSparkSubmitToDataprocArgs(sparkConfArgs, sparkArgs)
        sys.stderr.write("\nReplacing spark-submit style args with dataproc style args\n\n" + " ".join(sparkArgs) + " -> " + " ".join(dataprocargs) + "\n")
        cmd = [gcloudCmd, "dataproc", "jobs", "submit", "spark"] \
              + dataprocargs \
              + ["--jar", jarPath] \
              + ["--"] + gatkArgs + ["--spark-master", "yarn"]
        try:
            runCommand(cmd, dryrun)
        except OSError:
            raise GATKLaunchException("Tried to run gcloud but failed.\nMake sure gcloud is available in your path and you are properly authenticated")
    else:
        raise GATKLaunchException("Value: " + sparkRunner + " is not a valid value for --spark-runner. Choose one of LOCAL, SPARK, GCS")
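# Rough shape of the command lines runGATK builds (paths, jar names, and the master URL are placeholders):
#   LOCAL: java <jvm options> -jar QuartetIBDAnalysis-local.jar <tool args>
#   SPARK: spark-submit --master spark://host:7077 --conf <k=v> ... <spark args> QuartetIBDAnalysis-spark.jar <tool args>
#   GCS:   gcloud dataproc jobs submit spark <dataproc args> --jar gs://<staging>/<spark jar> -- <tool args> --spark-master yarn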

def runCommand(cmd, dryrun):
    if dryrun:
        print("\nDry run:\n")
        # Display environment variables for dry run
        if len(os.environ) != 0:
            print(" Env:\n")
            print('\n'.join([' %s = %s' % (v, os.environ[v]) for v in os.environ]))
        print(" Cmd:\n")
        print((" " + " ".join(cmd) + "\n"))
    else:
        sys.stderr.write("\nRunning:\n")
        sys.stderr.write(" " + " ".join(cmd) + "\n")
        gatk_env = os.environ.copy()
        gatk_env["SUPPRESS_GCLOUD_CREDS_WARNING"] = "true"
        check_call(cmd, env=gatk_env)

def getSplitArgs(args):
    inFirstGroup = True
    firstArgs = []
    secondArgs = []
    for arg in args:
        if arg == "--":
            if not inFirstGroup:
                raise GATKLaunchException("Argument '--' must only be specified once")
            inFirstGroup = False
        else:
            if inFirstGroup:
                firstArgs.append(arg)
            else:
                secondArgs.append(arg)
    return (firstArgs, secondArgs)
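# Split sketch (illustrative argument values): ["MyTool", "-I", "in.bam", "--", "--executor-memory", "4g"]
# becomes (["MyTool", "-I", "in.bam"], ["--executor-memory", "4g"]); everything after the first "--"
# is handed to the Spark runner rather than to the tool itself.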

def isDryRun(args):
    return "--dry-run" in args


def getValueForArgument(args, argument):
    if argument in args:
        i = args.index(argument)
        if len(args) <= i + 1:
            raise GATKLaunchException("Argument: " + argument + " requires a parameter")
        return args[i + 1]
    return None


def getSparkMasterSpecified(args):
    value = getValueForArgument(args, "--spark-master")
    if value is None:
        raise GATKLaunchException("The argument --spark-master <master url> must be specified")
    else:
        return value

# translate select spark-submit parameters to their gcloud equivalent
def convertSparkSubmitToDataprocArgs(sparkConfArgs, sparkArgs):
    replacements = {"--driver-memory": "spark.driver.memory",
                    "--driver-cores": "spark.driver.cores",
                    "--executor-memory": "spark.executor.memory",
                    "--executor-cores": "spark.executor.cores",
                    "--num-executors": "spark.executor.instances"}
    dataprocargs = []
    filesToAdd = []
    properties = []
    try:
        # Arguments passed as the sparkConfArgs should be passed through as properties.
        # In practice yarn files will be added through the 'sparkArgs' argument that is parsed below,
        # so we don't need to check for them up here.
        properties.extend(['%s=%s' % (p, sparkConfArgs[p]) for p in sparkConfArgs])

        # Iterate through sparkArgs
        i = 0
        while i < len(sparkArgs):
            arg = sparkArgs[i]
            if arg == "--conf":
                i += 1
                property = sparkArgs[i]
                if "spark.yarn.dist.files" in property:  # intercept yarn files and pass them through --files instead
                    files = property.split("=")[1]
                    filesToAdd = filesToAdd + files.split(",")
                else:
                    properties.append(sparkArgs[i])
            elif replacements.get(arg) is not None:
                i += 1
                propertyname = replacements.get(arg)
                properties.append(propertyname + "=" + sparkArgs[i])
            else:
                dataprocargs.append(arg)
            i += 1
    except IndexError:
        raise GATKLaunchException("Found an argument: " + arg + " with no matching value.")

    if len(properties) != 0:
        dataprocargs.append("--properties")
        dataprocargs.append(",".join(properties))
    if len(filesToAdd) != 0:
        dataprocargs.append("--files")
        dataprocargs.append(",".join(filesToAdd))
    return dataprocargs
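# Translation sketch (illustrative values): spark-submit style arguments such as
#   --executor-memory 4g --conf spark.yarn.dist.files=gs://bucket/ref.txt --verbose
# come back in dataproc style as
#   --verbose --properties <DEFAULT_SPARK_ARGS entries>,spark.executor.memory=4g --files gs://bucket/ref.txt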

if __name__ == "__main__":
    main(sys.argv[1:])