From def479c2a3b4d326fb823cdad2aa9f5332d14b54 Mon Sep 17 00:00:00 2001 From: Mike Dacre Date: Wed, 10 May 2017 14:43:41 -0700 Subject: [PATCH 1/5] Switching to a many-to-many plink model for speed --- LD_Direction/compare_snp_lists.py | 268 +++++++++++++++++++++--------- 1 file changed, 185 insertions(+), 83 deletions(-) diff --git a/LD_Direction/compare_snp_lists.py b/LD_Direction/compare_snp_lists.py index 6739fec..2246462 100644 --- a/LD_Direction/compare_snp_lists.py +++ b/LD_Direction/compare_snp_lists.py @@ -51,12 +51,14 @@ import os as _os import re as _re import sys as _sys +import sqlite3 as _sqlite3 import pickle as _pickle import subprocess as _sub import argparse as _argparse import multiprocessing as _mp import bz2 as _bz2 import gzip as _gzip +from time import sleep as _sleep from tempfile import mkstemp as _temp try: @@ -87,7 +89,9 @@ from .distance_filtering import distance_filter -DB_PATH = '/godot/dbsnp' +MIN_BUNDLE = 150 # Minimum number of jobs per node, 150 takes less than a minute. + +DB_PATH = '/godot/dbsnp/db/hg19' DB_VERS = 150 POPULATIONS = ["ALL", "AFR", "AMR", "EAS", "EUR", "SAS", "ACB", "ASW", "BEB", @@ -229,12 +233,15 @@ def __init__(self, pop_file=None, plink='plink'): individuals[pop].append(ind) self.individuals = individuals - def pop_file(self, populations=None): + def pop_file(self, populations=None, outfile=None): """Write temp file with a list of individuals in population.""" populations = populations if populations else self.individuals.keys() if isinstance(populations, str): populations = [populations] populations = list(populations) + if outfile and _os.path.isfile(outfile): + self.written_files[','.join(populations)] = outfile + return outfile if ','.join(populations) in self.written_files: fl = self.written_files[','.join(populations)] if _os.path.isfile(fl): @@ -255,14 +262,15 @@ def pop_file(self, populations=None): pop_ids = sorted(set(pop_ids)) - _, file_location = _temp(prefix='-'.join(populations), dir='/tmp') - with open(file_location, 'w') as outfile: - outfile.write('\n'.join( + if not outfile: + _, outfile = _temp(prefix='-'.join(populations), dir='/tmp') + with open(outfile, 'w') as fout: + fout.write('\n'.join( [' '.join(x) for x in zip(pop_ids, pop_ids)] )) - self.written_files[','.join(populations)] = file_location - return file_location + self.written_files[','.join(populations)] = outfile + return outfile def bim_snps(self, chrom): """Return and cache all SNPs in a bim file.""" @@ -411,6 +419,10 @@ def list_to_rsid_and_locs(list1, raise_on_missing=False): Uses a single large dbSNP lookup to create a dictionary and then parses the two lists. + If more than 5000 items in rsid list, creates a temp table and joins it. + + Requires about 25 minutes for 250,000 rsIDs. 
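The temp-table join mentioned above sidesteps SQLite's bound-variable limit for large rsID lists: bulk-insert the names into a temporary table, then resolve everything with a single join. A minimal standalone sketch, assuming the dbSNP SQLite file exposes a table dbSNP(name, chrom, start, end) exactly as the query in this patch does; the function name and db_file argument are illustrative:

import sqlite3

def bulk_rsid_lookup(db_file, rsids):
    """Map rsID -> (chrom, start, end) with a temp-table join."""
    conn = sqlite3.connect(db_file)
    # One bulk insert plus one join replaces an over-long IN (...) clause
    conn.execute("CREATE TEMP TABLE 'temp' (name)")
    conn.executemany("INSERT INTO temp (name) VALUES (?)",
                     [(i,) for i in rsids])
    rows = conn.execute(
        "SELECT dbSNP.name, dbSNP.chrom, dbSNP.start, dbSNP.end "
        "FROM dbSNP INNER JOIN temp ON dbSNP.name = temp.name"
    ).fetchall()
    conn.close()
    return {name: (chrom, start, end) for name, chrom, start, end in rows}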
+ Parameters ---------- list1 : list_of_str @@ -466,15 +478,26 @@ def list_to_rsid_and_locs(list1, raise_on_missing=False): if list1_rs: _sys.stderr.write('Doing rsID DB lookup.\n') - db = _dbSNP.DB(DB_PATH, DB_VERS) + #print(_arrow.now()) + if len(list1_rs) > 5000: + conn = _sqlite3.connect(DB_PATH + '/dbsnp{}.db'.format(DB_VERS)) + conn.execute("CREATE TEMP TABLE 'temp' (name)") + conn.executemany("INSERT INTO temp (name) VALUES (?)", [(i,) for i in list1_rs]) + q = conn.execute("SELECT dbSNP.name, dbSNP.chrom, dbSNP.start, dbSNP.end FROM dbSNP INNER JOIN temp ON dbSNP.name=temp.name").fetchall() + else: + db = _dbSNP.DB(DB_PATH, DB_VERS) + q = [] + for chunk in [list1_rs[i:i + 990] for i in range(0, len(list1_rs), 990)]: + q += db.query( + db.Row.name, db.Row.chrom, db.Row.start, db.Row.end + ).filter( + db.Row.name.in_(chunk) + ).all() rs_lookup = { - i[0]: (i[0], i[1], i[2]+1) for i in db.query( - db.Row.name, db.Row.chrom, db.Row.start, db.Row.end - ).filter( - db.Row.name.in_(list1_rs) - ) + i[0]: (i[0], i[1], i[2]+1) for i in q if not i[3]-i[2] > 1 } + #print(_arrow.now()) if list1_locs: _sys.stderr.write('Doing position DB lookup.\n') @@ -483,7 +506,7 @@ def list_to_rsid_and_locs(list1, raise_on_missing=False): for chrom, loc in [i.split(':') for i in list1_locs]: if chrom not in query: query[chrom] = [] - query[chrom].append(int(loc-1)) + query[chrom].append(int(loc)-1) loc_lookup = { '{}:{}'.format(i.chrom, i.start+1): (i.name, i.chrom, i.start+1) for i in db.lookup_locations(query) @@ -491,7 +514,7 @@ def list_to_rsid_and_locs(list1, raise_on_missing=False): } failed = [] - results = {} + results = [] done = [] for snp in list1: if isinstance(snp, str) and '\t' in snp: @@ -511,12 +534,10 @@ def list_to_rsid_and_locs(list1, raise_on_missing=False): failed.append(snp) continue s_rs, s_chrom, s_loc = loc_lookup[snp] - if s_chrom not in results: - results[s_chrom] = [] if s_rs in done: continue done.append(s_rs) - results[s_chrom].append((s_rs, s_loc)) + results.append((s_rs, s_chrom, s_loc)) if failed: err = '{} not in dbSNP'.format(snp) if raise_on_missing: @@ -526,6 +547,16 @@ def list_to_rsid_and_locs(list1, raise_on_missing=False): return results +def snp_list_to_dict(list1): + """Convert a list from list_to_rsid_and_locs() to a chrom dict.""" + results = {} + for s_rs, s_chrom, s_loc in list1: + if s_chrom not in results: + results[s_chrom] = [] + results[s_chrom].append((s_rs, int(s_loc))) + return results + + def save_list(snp_list, outfile=None, pickle=False): """Convert a list of rsIDs or positions to a pre-processed format. 
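For reference, the pre-processed format that save_list() below writes is one tab-separated record per SNP: rsID, chromosome, base-1 position. A hypothetical loader for that format (load_list is not part of this patch):

def load_list(path):
    """Read a save_list()-style file back into (rsid, chrom, loc) tuples."""
    records = []
    with open(path) as fin:
        for line in fin:
            rsid, chrom, loc = line.rstrip('\n').split('\t')
            records.append((rsid, chrom, int(loc)))
    return records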
@@ -555,21 +586,17 @@ def save_list(snp_list, outfile=None, pickle=False): _sys.stderr.write('Parsing input list.\n') parsed_list = list_to_rsid_and_locs(snp_list) _sys.stderr.write('Done.\n') - output = [] - for chrom, info in parsed_list.items(): - for rsid, loc in info: - output.append((rsid, chrom, loc)) if outfile: _sys.stderr.write('Writing output.\n') if pickle: with _open_zipped(outfile, 'wb') as fout: - _pickle.dump(output, fout) + _pickle.dump(parsed_list, fout) else: with _open_zipped(outfile, 'w') as fout: - for row in output: + for row in parsed_list: rsid, chrom, loc = row fout.write('{}\t{}\t{}\n'.format(rsid, chrom, loc)) - return output + return parsed_list @@ -606,25 +633,29 @@ def filter_by_distance(ref_snps, comp_snps, distance='50kb'): """ # Calculate distance if isinstance(distance, str): - dist, mod = _re.match(r'(\d+)(\D+)', '50kb').groups() + dist, mod = _re.match(r'(\d+)(\D+)', distance).groups() dist = int(dist) mod = mod.lower() - if not mod in ['kb', 'mb']: - raise ValueError('Cannot parse {}, must be in kb or mb only' - .format(distance)) - if mod == 'kb': - dist = dist * 1000 - elif mod == 'mb': - dist = dist * 1000000 + if mod: + if not mod in ['kb', 'mb']: + raise ValueError('Cannot parse {}, must be in kb or mb only' + .format(distance)) + if mod == 'kb': + dist = dist * 1000 + elif mod == 'mb': + dist = dist * 1000000 results = {} for chrom, snps in ref_snps.items(): + if chrom not in comp_snps: + continue results[chrom] = distance_filter(snps, comp_snps[chrom], dist) return results -def filter_by_ld(pairs, r2=0.6, populations=None, plink='plink'): +def filter_by_ld(pairs, r2=0.6, populations=None, plink='plink', + cluster_jobs=100, force_local=False, **cluster_opts): """Use plink to lookup LD and filter lists by the r2. Parameters @@ -637,43 +668,101 @@ def filter_by_ld(pairs, r2=0.6, populations=None, plink='plink'): (mega-bases) plink : str, optional Path to plink executable, otherwise searches PATH + cluster_jobs : int, optional + Number of jobs to run on the cluster if cluster running is enabled, if + fyrd is not enabled, this ends up being the number of local cores (in + that case, the max is the result of multiprocessing.cpu_count()). 
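The filter_by_distance() change above drops the hard-coded '50kb' in favor of the distance argument and only checks the unit suffix when one is present. Note that the patched regex r'(\d+)(\D+)' still requires a non-digit suffix, so a bare string like '50000' would fail to match; a standalone parser using \D* accepts both forms (parse_distance is hypothetical, for illustration):

import re

def parse_distance(distance):
    """Return a length in bases from an int or a 'kb'/'mb' string."""
    if isinstance(distance, int):
        return distance
    dist, mod = re.match(r'(\d+)(\D*)', distance).groups()
    mod = mod.lower()
    if mod and mod not in ('kb', 'mb'):
        raise ValueError('Cannot parse {}, must be in kb or mb only'
                         .format(distance))
    return int(dist) * {'': 1, 'kb': 1000, 'mb': 1000000}[mod]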
+ force_local : bool, optional + If True, disallow running jobs on the cluster with fyrd + cluster_opts : dict, optional + Optional keyword arguments to pass to fyrd """ plink = PLINK(plink=plink) + plink.pop_file(populations, outfile=','.join(populations) + '.pops.txt') l = 0 for v in pairs.values(): - for i in [i[2] for i in v]: + for i in [i[2] for i in v if i[2]]: l += len(i) - multi = True if l > 200 else False - if multi: - jobs = {} - if _fyrd and _fyrd.queue.MODE != 'local': + multi = True if l > 200 else False # 200 jobs take 50 seconds + if multi and cluster_jobs > 1: + # Set up cluster running + if _fyrd and _fyrd.queue.MODE != 'local' and not force_local: print('Running jobs on the cluster with fyrd.') + fyrd = True else: - pool = _mp.Pool() - print('Running jobs in parallel') - - results = {} + print('Running jobs in parallel locally') + cores = max(cluster_jobs, _mp.cpu_count()) + pool = _mp.Pool(cores) + fyrd = False + # Bundle jobs + bundle_count = int(l/cluster_jobs) + bundle_count = max(bundle_count, MIN_BUNDLE) + count = bundle_count + # Get walltime based on 6s per job + 40 minutes + # Average on a standard machine is 2 seconds, so this is + # an overestimate + time = _to_time((bundle_count*6)) + _sys.stderr.write('Estimated max time per job {} '.format(time) + + '(walltime req will add 40 min to this)\n') + + jobs = [[]] + n = 0 + skipped = 0 for chrom in sorted(pairs, key=_chrom_sort): snps = pairs[chrom] - print('Working on chromosome {}'.format(chrom)) - args = (snps, plink, chrom, r2, populations) + args = (chrom, r2, populations, False) + # Bundle jobs + if not multi: + # Bundle by chromosome if not multithreading + bundle_count = len(snps) + count = bundle_count + for snp, _, comp_list in snps: + if not comp_list: + skipped += 1 + continue + comp_snps = [rsid for rsid, loc in comp_list] + if not count: + count = bundle_count + jobs.append([]) + n += 1 + jobs[n].append((snp, comp_snps) + args) + count -= 1 + + if skipped: + _sys.stderr.write('{} jobs had no comparison list and were skipped\n' + .format(skipped)) + # Submit jobs + running = [] + results = {} + for jobset in jobs: if multi: - args += (False, ) - if _fyrd and _fyrd.queue.MODE != 'local': - jobs[chrom] = _fyrd.submit( - _ld_job, args, walltime="01:00:00", mem='8G', cores=4) + if fyrd: + time = _to_time((len(jobset)*6)+2400) + running.append( + _fyrd.submit( + _ld_job, (jobset, plink), + walltime=time, mem='8G', cores=4, **cluster_opts) + ) else: - jobs[chrom] = pool.apply_async( - _ld_job, args) + running.append(pool.apply_async(_ld_job, jobset)) else: - results[chrom] = _ld_job(*args) + # Just run jobs per chromosome + results.update(_ld_job(jobset, plink)) if multi: print('Getting results from parallel jobs') - for chrom, job in jobs.items(): - results[chrom] = job.get() - if not _fyrd and _fyrd.queue.MODE != 'local': + if fyrd: + todo = len(running) + while todo: + for job in running: + if job.done: + results.update(job.get()) + todo -= 1 + _sleep(2) + else: + for job in running: + results.update(job.get()) pool.close() # Add all data back again @@ -681,36 +770,40 @@ def filter_by_ld(pairs, r2=0.6, populations=None, plink='plink'): for chrom, data in pairs.items(): for snp, loc, comp_list in data: filtered[snp] = {'loc': loc, 'chrom': chrom} - if snp in results[chrom]: - if results[chrom][snp]: + if snp in results: + if results[snp]: filtered[snp]['matches'] = {} for snp2, pos2 in comp_list: - if snp2 in results[chrom][snp]: - filtered[snp]['matches'][snp2] = results[chrom][snp][snp2] + if snp2 in 
results[snp]:
+                        filtered[snp]['matches'][snp2] = results[snp][snp2]
                         filtered[snp]['matches'][snp2]['loc'] = pos2
     return filtered
 
 
-def _ld_job(snps, plink, chrom, r2, populations, pbar=True):
-    """Private function to run plink, see filter_by_ld()."""
-    if pbar and pb:
-        it = pb(snps, unit='plink_calculations')
-        log = it
-    else:
-        it = snps
-        log = _sys.stderr
+def _ld_job(jobs, plink):
+    """Private function to run plink, see filter_by_ld().
+
+    Parameters
+    ----------
+    jobs : list_of_tuple
+        A list of job arguments:
+        (snp, comp_snps, chrom, r2, populations, raise_on_error, logfile)
+    plink : PLINK
+        An initialized PLINK object
+    """
     snp_results = {}
-    for snp, _, comp_list in it:
-        if not comp_list:
-            continue
-        comp_snps = [rsid for rsid, loc in comp_list]
-        snp_results[snp] = plink.one_to_many(
-            snp, comp_snps, chrom, r2, populations, raise_on_error=False,
-            logfile=log
-        )
+    for job in jobs:
+        snp_results[job[0]] = plink.one_to_many(*job)
     return snp_results
 
 
+def _to_time(s):
+    """Convert seconds to an H:MM:SS string."""
+    m, s = divmod(s, 60)
+    h, m = divmod(m, 60)
+    return '{}:{:02}:{:02}'.format(h, m, s)
+
+
 ###############################################################################
 #                          Output Parsing Functions                          #
 ###############################################################################
@@ -773,7 +866,8 @@ def pairs_to_SNP_Pairs(pairs, populations):
 
 
 def comp_snp_lists(list1, list2, populations=None, r2=0.6, distance='50kb',
-                   plink='plink', return_dataframe=False, return_snppair=False,
+                   plink='plink', lists_preparsed=False,
+                   return_dataframe=False, return_snp_pairs=False,
                    raise_on_error=False):
     """Compare two SNP lists, filter by r2.
 
@@ -803,9 +897,12 @@ def comp_snp_lists(list1, list2, populations=None, r2=0.6, distance='50kb',
     (mega-bases)
     plink : str, optional
         Path to plink executable, otherwise searches PATH
+    lists_preparsed : bool, optional
+        If the input lists are already three-tuples with all data accounted
+        for, skip the dbSNP phase. 
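A worked check of the bundling arithmetic above: bundles are sized at total lookups over cluster_jobs, floored at MIN_BUNDLE, and walltime is requested at six seconds per lookup (the comment notes ~2 s is typical, so this deliberately overestimates). The numbers here are illustrative:

def to_time(s):
    """Same logic as _to_time() above."""
    m, s = divmod(s, 60)
    h, m = divmod(m, 60)
    return '{}:{:02}:{:02}'.format(h, m, s)

bundle = max(int(30000 / 100), MIN_BUNDLE)  # 30,000 lookups, 100 jobs
print(bundle)                # 300 lookups per bundle
print(to_time(bundle * 6))   # '0:30:00', before the 40-minute padding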
return_dataframe : bool, optional Return a dataframe instead of JSON - return_snppair : bool, optional + return_snp_pairs : bool, optional Return a dictionary of SNP_Link objects raise_on_error : bool, optional Raise an Exception if there is a problem with any of the SNPs in the @@ -814,7 +911,7 @@ def comp_snp_lists(list1, list2, populations=None, r2=0.6, distance='50kb', Returns ------- JSON : dict - If return_dataframe and return_snppairs are False, a json-style + If return_dataframe and return_snp_pairss are False, a json-style dictionary is returned with the format: {snp1: {chrom: chrX, loc: #, matches: {snp2: {loc: #, r2: float, dprime: float, phased: AT/GC, @@ -830,10 +927,15 @@ def comp_snp_lists(list1, list2, populations=None, r2=0.6, distance='50kb', If return_snplink is True, a dictionary of SNP_Link objects is returned {snp1: [SNP_Pair, SNP_Pair, ...]} """ - print('Getting SNP info for list1') - snps1 = list_to_rsid_and_locs(list1, raise_on_error) - print('Getting SNP info for list2') - snps2 = list_to_rsid_and_locs(list2, raise_on_error) + if not lists_preparsed: + print('Getting SNP info for list1') + list1 = list_to_rsid_and_locs(list1, raise_on_error) + print('Getting SNP info for list2') + list2 = list_to_rsid_and_locs(list2, raise_on_error) + + snps1 = snp_list_to_dict(list1) + snps2 = snp_list_to_dict(list2) + print('Getting all possible pairs within {}'.format(distance)) pairs = filter_by_distance(snps1, snps2, distance=distance) l = 0 @@ -845,7 +947,7 @@ def comp_snp_lists(list1, list2, populations=None, r2=0.6, distance='50kb', print('Done') if return_dataframe: return pairs_to_dataframe(pairs) - if return_snppair: + if return_snp_pairs: return pairs_to_SNP_Pairs(pairs, populations=populations) return pairs From 754df9184291c760e6df6ef9e4945fcf983da4d8 Mon Sep 17 00:00:00 2001 From: Mike Dacre Date: Fri, 12 May 2017 08:40:01 -0700 Subject: [PATCH 2/5] Working on plink changes --- LD_Direction/compare_snp_lists.py | 610 ++++++++++++++++++++++++------ 1 file changed, 493 insertions(+), 117 deletions(-) diff --git a/LD_Direction/compare_snp_lists.py b/LD_Direction/compare_snp_lists.py index 2246462..39752f4 100644 --- a/LD_Direction/compare_snp_lists.py +++ b/LD_Direction/compare_snp_lists.py @@ -106,96 +106,6 @@ 'filter_by_ld', 'save_list'] -############################################################################### -# Helper Functions # -############################################################################### - - -class MissingSNPError(Exception): - - """Exceception to catch missing SNPs.""" - - pass - - -class BadSNPError(Exception): - - """Exceception to catch missing SNPs.""" - - pass - - -def run(command, raise_on_error=False): - """Run a command with subprocess the way it should be. - - Parameters - ---------- - command : str - A command to execute, piping is fine. - raise_on_error : bool - Raise a subprocess.CalledProcessError on exit_code != 0 - - Returns - ------- - stdout : str - stderr : str - exit_code : int - """ - pp = _sub.Popen(command, shell=True, universal_newlines=True, - stdout=_sub.PIPE, stderr=_sub.PIPE) - out, err = pp.communicate() - code = pp.returncode - if raise_on_error and code != 0: - _sys.stderr.write( - '{}\nfailed with:\nCODE: {}\nSTDOUT:\n{}\nSTDERR:\n{}\n' - .format(command, code, out, err) - ) - raise _sub.CalledProcessError( - returncode=code, cmd=command, output=out, stderr=err - ) - return out, err, code - - -def _open_zipped(infile, mode='r'): - """Return file handle of file regardless of compressed or not. 
- - Also returns already opened files unchanged, text mode automatic for - compatibility with python2. - """ - # Return already open files - if hasattr(infile, 'write'): - return infile - # Make text mode automatic - if len(mode) == 1: - mode = mode + 't' - if not isinstance(infile, str): - raise ValueError("I cannot open a filename that isn't a string.") - if infile.endswith('.gz'): - return _gzip.open(infile, mode) - if infile.endswith('.bz2'): - if hasattr(_bz2, 'open'): - return _bz2.open(infile, mode) - else: - return _bz2.BZ2File(infile, mode) - return open(infile, mode) - - -def _chrom_sort(x): - """Return an integer for sorting from a chromosome.""" - if x.startswith('chr'): - x = x[3:] - if x.upper() == 'X': - return 100 - elif x.upper() == 'Y': - return 101 - elif x.upper().startswith('M'): - return 150 - elif x.isdigit(): - return int(x) - else: - return x - - ############################################################################### # Run plink jobs without repetative code # ############################################################################### @@ -203,25 +113,34 @@ def _chrom_sort(x): class PLINK(object): - """A reusable object to run plink jobs quickly.""" + """A reusable object to run plink jobs on 1000genomes files quickly.""" written_files = {} bims = {} - def __init__(self, pop_file=None, plink='plink'): + def __init__(self, data=None, pop_file=None, plink='plink'): """Load population information. Parameters ---------- + data : str, optional + Path the the 1000genomes data files + (e.g. ALL.chr16.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.{bed,bim}) + Default set in DATA_DIR hardcoded in this file. pop_file : str, optional Path to the 1000 genomes population file plink : str, optional Path to plink executable, otherwise searches PATH """ self.plink = plink + data = data if data else DATA_DIR + if not _os.path.isdir(data): + raise ValueError('{} must be a directory, but no directory found' + .format(data)) + self.path = _os.path.abspath(data) if not pop_file: pop_file = _os.path.join( - DATA_DIR, 'integrated_call_samples_v3.20130502.ALL.panel' + self.path, 'integrated_call_samples_v3.20130502.ALL.panel' ) individuals = {} with open(pop_file) as fin: @@ -232,6 +151,35 @@ def __init__(self, pop_file=None, plink='plink'): individuals[pop] = [] individuals[pop].append(ind) self.individuals = individuals + self.files = {} + for fl in _os.listdir(self.path): + if 'phase3_' not in fl: + continue + if not fl.endswith('bed'): + continue + if not fl.startswith('ALL'): + continue + chrom = fl.split('.')[1] + if not chrom.startswith('chr'): + continue + fl = _os.path.abspath(_os.path.join(self.path, fl)) + root = '.'.join(fl.split('.')[:-1]) + bed = _os.path.abspath('{}.bed'.format(root)) + bim = _os.path.abspath('{}.bim'.format(root)) + fam = _os.path.abspath('{}.fam'.format(root)) + assert _os.path.isfile(bed) + assert _os.path.isfile(bim) + assert _os.path.isfile(fam) + c1 = chrom if chrom.startswith('chr') else 'chr' + str(chrom) + c2 = c1[3:] + for c in [c1, c2]: + self.files[c] = { + 'root': root, + 'bed': bed, + 'bim': bim, + 'fam': fam, + } + print(self.files.keys()) def pop_file(self, populations=None, outfile=None): """Write temp file with a list of individuals in population.""" @@ -272,28 +220,224 @@ def pop_file(self, populations=None, outfile=None): self.written_files[','.join(populations)] = outfile return outfile - def bim_snps(self, chrom): - """Return and cache all SNPs in a bim file.""" - if chrom in self.bims: - with 
open(self.bims[chrom], 'rb') as fin: - return _pickle.load(fin) - bfile = _os.path.join( - DATA_DIR, - 'ALL.{}.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes' - .format(chrom) - ) - bim = bfile + '.bim' + def bim_file(self, chrom, snps, outfile=None): + """Filter a bim file to only include SNPs in snps. + + Parameters + ---------- + chrom : str + A chromosome name for the file to work on. + snps : list_of_tuple + A list of snps from list_to_rsid_and_locs(): + (rsid, chrom, loc) + outfile : str, optional + A file to write to. A tempfile will be created if not is provided. + + Returns + ------- + file_path : str + Absolute path to the newly created file. + """ + if not chrom in self.files: + raise ValueError( + '{} is not in our bim files:\n{}'.format( + chrom, + '\n'.join([i[' bim'] for i in self.files.values()]) + ) + ) + snps = [(str(name), int(loc)) for name, loc in snps] + if not outfile: + _, outfile = _temp( + prefix='filtered_bim.', suffix='.bim', dir='/tmp' + ) + outfile = _os.path.abspath(outfile) + with _open_zipped(outfile, 'w') as fout: + for c, name, cm, pos, a1, a2 in read_bim(self.files[chrom]['bim']): + if (name, pos) in snps: + fout.write('\t'.join([c, name, cm, str(pos), a1, a2]) + '\n') + return outfile + + def bim_snps(self, chrom, bim_file=None): + """Return and cache all SNPs in a bim file. + + Note: will only cache rsids in bim if no bim_file is given. + + Parameters + ---------- + chrom : str + bim_file : str, optional + Bim file to work on, otherwise the chromosome is used to pick the + complete file. + + Returns + ------- + rsids : frozenset + """ + if not bim_file: + if chrom in self.bims: + with open(self.bims[chrom], 'rb') as fin: + return _pickle.load(fin) + try: + bim_file = self.files[chrom]['bim'] + except KeyError: + raise KeyError('{} is not in the bim file list'.format(chrom)) + _, pfl = _temp() + else: + pfl = None rsids = [] - with open(bim) as fin: + with open(bim_file) as fin: for line in fin: - rsids.append(line.split('\t')[1]) + if not line.strip(): + continue + rsids.append(line.strip().split('\t')[1]) rsids = frozenset(rsids) - _, pfl = _temp() - with open(pfl, 'wb') as fout: - _pickle.dump(rsids, fout) - self.bims[chrom] = pfl + if pfl: + with open(pfl, 'wb') as fout: + _pickle.dump(rsids, fout) + self.bims[chrom] = pfl return rsids + def many_to_many(self, snps, comp_list, chrom, r2=0.6, populations=None, + fbim_file=None, outfile=None, raise_on_error=False, + logfile=_sys.stderr): + """Get many-to-many LD information using plink. + + Will do a single lookup for all SNPs in snps using plink on a filtered + bim file generated to only contain the SNPs in comp_list. + + Parameters + ---------- + snps : list_of_tuple + list of rsIDs to query in the format: + (rsid, loc) (from pairs_to_lists()) + comp_list : list_of_tuple + list of rsIDs to compare to in the format: + (rsid, loc) (from pairs_to_lists()) + chrom : str + which chromosome to search + r2 : float, optional + r-squared level to use for filtering + populations : list_of_str, optional + list of populations to include in the analysis + fbim_file : str, optional + File to write the filtered bims to, if not provided a temp file + will be used and deleted. + outfile : str, optional + A file root for plink to write to, output will have '.ld' appended + to it. If not provided, temp file used and deleted after use. 
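bim_file() above reduces a chromosome's bim to just the wanted SNPs so plink has far fewer variants to consider. The filter in isolation; note that membership is tested once per row, so passing wanted as a set of (rsid, position) tuples keeps the scan linear, whereas the list the patch builds costs a full scan per row:

def filter_bim(bim_in, bim_out, wanted):
    """Copy bim_in to bim_out, keeping rows whose (name, pos) is in wanted."""
    with open(bim_in) as fin, open(bim_out, 'w') as fout:
        for line in fin:
            chrom, name, cm, pos, a1, a2 = line.rstrip('\n').split('\t')
            if (name, int(pos)) in wanted:
                fout.write(line)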
+ raise_on_error : bool, optional + if False, will return None if primary SNP missing from bim file + logfile : filehandle, optional + A file like object to write to + + Returns + ------- + matching_snps : dict + For every matching SNP that beats the r-squared: { + snp: {r2: r-squared, dprime: d-prime, phased: phased-alleles} + } + If plink job fails, returns an empty dictionary. + """ + if not snps or not comp_list: + raise ValueError('snps and comp_list cannot be null') + snps = set(snps) + comp_list = set(comp_list) + all_snps = snps | comp_list + bfile = self.bim_file(chrom, all_snps, fbim_file) + bsnps = self.bim_snps(chrom, bfile) + good = [] + bad = [] + rsids = {} + for rsid, loc in snps: + if rsid in bsnps: + good.append(rsid) + rsids[rsid] = int(loc) + else: + bad.append(rsid) + if bad: + _sys.stderr.write( + 'The following SNPs are not in the bim file and cannot be ' + + 'queried:\n{}\n'.format(bad) + ) + del(bsnps) + # Initialize necessary files + bed = self.files[chrom]['bed'] + fam = self.files[chrom]['fam'] + pop_file = self.pop_file(populations) + if outfile: + del_file = False + else: + _, outfile = _temp(prefix='plink', dir='/tmp') + del_file = True + _, snp_file = _temp(prefix='plinksnps', dir='/tmp') + with open(snp_file, 'w') as fout: + fout.write('\n'.join(sorted([snp for snp, _ in comp_list]))) + + # Build the command + plink_cmnd = ( + '{plink} --bed {bed} --bim {bim} --fam {fam} --r2 in-phase dprime ' + '--ld-snp-list {snp_file} --keep {ind_file} --out {out}' + ).format( + plink=self.plink, + bed=bed, bim=bfile, fam=fam, snp_file=snp_file, + ind_file=pop_file, out=outfile + ) + print(plink_cmnd) + + # Run it + stdout, stderr, code = run(plink_cmnd, raise_on_error) + # Parse the output file + if code != 0: + logfile.write( + '{}: plink command failed'.format(chrom) + + 'Command: {}\nExit Code: {}\nSTDOUT:\n{}\bSTDERR:\n{}\n' + .format(plink_cmnd, code, stdout, stderr) + ) + return {} + + results = {} + with open(outfile + '.ld') as fin: + # Check header + line = fin.readline().strip() + assert _re.split(r' +', line) == [ + 'CHR_A', 'BP_A', 'SNP_A', 'CHR_B', 'BP_B', + 'SNP_B', 'PHASE', 'R2', 'DP' + ] + for line in fin: + f = _re.split(r' +', line.strip()) + snp1, snp2, phased = f[2], f[5], f[6] + loc1, loc2 = int(f[1]), int(f[4]) + rsquared, dprime = float(f[7]), float(f[8]) + if snp1 not in good: + continue + if loc1 != rsids[snp1]: + continue + if snp1 not in results: + results[snp1] = { + 'chrom': chrom, 'loc': loc1, 'matches': {} + } + if rsquared < r2: + continue + try: + p1, p2 = phased.split('/') + s1a, s2a = p1 + s1b, s2b = p2 + lookup = {snp1: {s1a: s2a, s1b: s2b}, + snp2: {s2a: s1a, s2b: s1b}} + except ValueError: + lookup = {} + results[snp1]['matches'][snp2] = { + 'r2': rsquared, 'dprime': dprime, 'loc': loc2, + 'phased': phased, 'lookup': lookup + } + + if del_file: + _os.remove(outfile + '.ld') + + return results + + + def one_to_many(self, snp, comp_list, chrom, r2=0.6, populations=None, raise_on_error=False, logfile=_sys.stderr): """Get one-to-many LD information using plink. 
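The .ld parsing in many_to_many() above turns plink's PHASE column into a per-SNP allele lookup: a phased string like 'AT/GC' means haplotype one pairs snp1's A with snp2's T, and haplotype two pairs G with C. That step on its own:

def phase_lookup(snp1, snp2, phased):
    """Build the allele-correspondence dict from a plink PHASE string."""
    p1, p2 = phased.split('/')   # 'AT', 'GC'
    s1a, s2a = p1                # snp1 allele, snp2 allele (haplotype 1)
    s1b, s2b = p2                # same for haplotype 2
    return {snp1: {s1a: s2a, s1b: s2b},
            snp2: {s2a: s1a, s2b: s1b}}

print(phase_lookup('rs1', 'rs2', 'AT/GC'))
# {'rs1': {'A': 'T', 'G': 'C'}, 'rs2': {'T': 'A', 'C': 'G'}}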
@@ -405,6 +549,7 @@ def one_to_many(self, snp, comp_list, chrom, r2=0.6, populations=None, return results + ############################################################################### # dbSNP Connection # ############################################################################### @@ -639,7 +784,7 @@ def filter_by_distance(ref_snps, comp_snps, distance='50kb'): if mod: if not mod in ['kb', 'mb']: raise ValueError('Cannot parse {}, must be in kb or mb only' - .format(distance)) + .format(distance)) if mod == 'kb': dist = dist * 1000 elif mod == 'mb': @@ -655,9 +800,73 @@ def filter_by_distance(ref_snps, comp_snps, distance='50kb'): def filter_by_ld(pairs, r2=0.6, populations=None, plink='plink', - cluster_jobs=100, force_local=False, **cluster_opts): + data_dir=None): """Use plink to lookup LD and filter lists by the r2. + One job per chromosome. + + Parameters + ---------- + pairs : dict + The output of filter_by_distance() + r2 : float + An r-squared cutoff to use for filtering + distance : str_or_int + A distance away from a SNP in list1 to consider SNPs in list2, can be + a integer of bases or a string with a suffix kb (kilo-bases) or mb + (mega-bases) + plink : str, optional + Path to plink executable, otherwise searches PATH + data_dir : str, optional + Path to directory containing 1000genomes data, default set in DATA_DIR + hardcoded into this file. + + Returns + ------- + pairs : dict + snp: {chrom: chrom, loc: loc, matches: { + snp2: {rsquared: r2, dprime: d', loc: loc2} + """ + plink = PLINK(data=data_dir, plink=plink) + # Initialize the population file + plink.pop_file(populations, outfile='.'.join(populations) + '.pops.txt') + l = 0 + for v in pairs.values(): + for i in [i[2] for i in v if i[2]]: + l += len(i) + multi = True if l > 200 else False + + # Run plink jobs + results = {} + for chrom in sorted(pairs, key=_chrom_sort): + data = pairs[chrom] + snps, comp_list = pairs_to_lists(data) + results.update( + plink.many_to_many( + snps, comp_list, chrom, r2=r2, populations=populations + ) + ) + + # Add all data back again + filtered = {} + for chrom, data in pairs.items(): + for snp, loc, comp_list in data: + filtered[snp] = {'loc': loc, 'chrom': chrom} + if snp in results: + if results[snp]: + filtered[snp]['matches'] = {} + for snp2, pos2 in comp_list: + if snp2 in results[snp]: + filtered[snp]['matches'][snp2] = results[snp][snp2] + filtered[snp]['matches'][snp2]['loc'] = pos2 + return filtered + + +def filter_by_ld_one_by_one(pairs, r2=0.6, populations=None, plink='plink', + cluster_jobs=100, force_local=False, + **cluster_opts): + """Use plink to lookup LD and filter lists by the r2 with one job per snp. + Parameters ---------- r2 : float @@ -867,6 +1076,7 @@ def pairs_to_SNP_Pairs(pairs, populations): def comp_snp_lists(list1, list2, populations=None, r2=0.6, distance='50kb', plink='plink', lists_preparsed=False, + dbsnp_loc=None, dbsnp_ver=None, data_dir=None, return_dataframe=False, return_snp_pairs=False, raise_on_error=False): """Compare two SNP lists, filter by r2. @@ -900,6 +1110,12 @@ def comp_snp_lists(list1, list2, populations=None, r2=0.6, distance='50kb', lists_preparsed : bool, optional If the input lists are already in three-tuples will all data acounted for, skip the dbSNP phase. 
+ dbsnp_loc : str, optional + Path to dbSNP sqlite database files + dbsnp_ver : int : optional + Version of dbSNP to use + data_dir : str, optional + Path to 1000genomes files return_dataframe : bool, optional Return a dataframe instead of JSON return_snp_pairs : bool, optional @@ -943,7 +1159,8 @@ def comp_snp_lists(list1, list2, populations=None, r2=0.6, distance='50kb', for i in [i[2] for i in v]: l += len(i) print('Running plink filters on {} calculations'.format(l)) - pairs = filter_by_ld(pairs, r2, populations=populations, plink=plink) + pairs = filter_by_ld(pairs, r2, populations=populations, plink=plink, + data_dir=data_dir) print('Done') if return_dataframe: return pairs_to_dataframe(pairs) @@ -952,6 +1169,148 @@ def comp_snp_lists(list1, list2, populations=None, r2=0.6, distance='50kb', return pairs +############################################################################### +# Helper Functions # +############################################################################### + + +class MissingSNPError(Exception): + + """Exceception to catch missing SNPs.""" + + pass + + +class BadSNPError(Exception): + + """Exceception to catch missing SNPs.""" + + pass + + +def pairs_to_lists(pairs): + """Convert list of pairs into to two tuple lists. + + Removes any SNPs with an empty match list. + + Parameters + ---------- + pairs : list + From filter_by_distance(): + (rsid, loc, set((rsid, loc))) + + Returns + ------- + snp_list : list_of_tuple + comp_list : list_of_tuple + (rsid, loc) + """ + query = [] + comp = [] + for snp, loc, matches in pairs: + if not matches: + continue + query.append((snp, loc)) + for snp2, loc2 in matches: + comp.append((snp2, loc2)) + return set(query), set(comp) + + +def run(command, raise_on_error=False): + """Run a command with subprocess the way it should be. + + Parameters + ---------- + command : str + A command to execute, piping is fine. + raise_on_error : bool + Raise a subprocess.CalledProcessError on exit_code != 0 + + Returns + ------- + stdout : str + stderr : str + exit_code : int + """ + pp = _sub.Popen(command, shell=True, universal_newlines=True, + stdout=_sub.PIPE, stderr=_sub.PIPE) + out, err = pp.communicate() + code = pp.returncode + if raise_on_error and code != 0: + _sys.stderr.write( + '{}\nfailed with:\nCODE: {}\nSTDOUT:\n{}\nSTDERR:\n{}\n' + .format(command, code, out, err) + ) + raise _sub.CalledProcessError( + returncode=code, cmd=command, output=out, stderr=err + ) + return out, err, code + + +def read_bim(bim_file): + """Yields a tuple for each line in bim_file. + + Parameters + ---------- + bim_file : str + Path to a bim file, can be zipped or an open file handle. + + Yields + ------ + chromsome : str + name : str + cm : str + Position in centimorgans, usually 0 + position : int + allele_1 : str + allele_2 : str + """ + with _open_zipped(bim_file) as fin: + for line in fin: + chrom, name, cm, loc, a1, a2 = line.split('\t') + yield chrom, name, cm, int(loc), a1, a2 + + +def _open_zipped(infile, mode='r'): + """Return file handle of file regardless of compressed or not. + + Also returns already opened files unchanged, text mode automatic for + compatibility with python2. 
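Example use of the run() helper above; the command is illustrative and assumes plink is on PATH:

out, err, code = run('plink --version | head -n 1', raise_on_error=False)
if code == 0:
    print(out.strip())
else:
    print('plink not found: {}'.format(err.strip()))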
+ """ + # Return already open files + if hasattr(infile, 'write'): + return infile + # Make text mode automatic + if len(mode) == 1: + mode = mode + 't' + if not isinstance(infile, str): + raise ValueError("I cannot open a filename that isn't a string.") + if infile.endswith('.gz'): + return _gzip.open(infile, mode) + if infile.endswith('.bz2'): + if hasattr(_bz2, 'open'): + return _bz2.open(infile, mode) + else: + return _bz2.BZ2File(infile, mode) + return open(infile, mode) + + +def _chrom_sort(x): + """Return an integer for sorting from a chromosome.""" + if x.startswith('chr'): + x = x[3:] + if x.upper() == 'X': + return 100 + elif x.upper() == 'Y': + return 101 + elif x.upper().startswith('M'): + return 150 + elif x.isdigit(): + return int(x) + else: + return x + + ############################################################################### # Run as a Script # ############################################################################### @@ -1011,7 +1370,8 @@ def comp_snp_lists(list1, list2, populations=None, r2=0.6, distance='50kb', rsid, chromosome, location (base-1) """ -def get_arg_parser(): + +def argument_parser(): """Create an argument parser.""" parser = _argparse.ArgumentParser( description=__doc__, #usage=general_usage, @@ -1046,6 +1406,18 @@ def get_arg_parser(): filtering.add_argument('--distance', default='50kb', help='Max distance to consider LD (50kb)') + # Optional files + paths = analyze.add_argument_group( + 'data_paths', + 'Paths to 1000genomes and dbSNP' + ) + paths.add_argument('--dbsnp-path', default=DB_PATH, + help='Path to dbSNP directory') + paths.add_argument('--dbsnp-ver', default=DB_VERS, + help='dbSNP version to use') + paths.add_argument('--1000genomes', default=DATA_DIR, + help='1000genomes data files') + # Optional flags analyze.add_argument('-p', '--pandas', action="store_true", help="Write file as a pandas DataFrame instead.") @@ -1075,7 +1447,7 @@ def main(argv=None): if not argv: argv = _sys.argv[1:] - parser = get_arg_parser() + parser = argument_parser() args = parser.parse_args(argv) @@ -1105,8 +1477,12 @@ def main(argv=None): else: pops = None - out = comp_snp_lists(snp_list, comp_list, pops, r2=float(args.r2), - distance=args.distance, return_dataframe=True) + out = comp_snp_lists( + snp_list, comp_list, pops, + r2=float(args.r2), + distance=args.distance, + return_dataframe=True, + ) if args.pandas: out.to_pickle(args.outfile) From d53893836b69f1b6d61512ce30fbdef6e56d045a Mon Sep 17 00:00:00 2001 From: Mike Dacre Date: Fri, 12 May 2017 22:02:53 -0700 Subject: [PATCH 3/5] Plink many-to-many is now 100% functional --- LD_Direction/compare_snp_lists.py | 922 +++++++++++++++++++++++------- 1 file changed, 712 insertions(+), 210 deletions(-) diff --git a/LD_Direction/compare_snp_lists.py b/LD_Direction/compare_snp_lists.py index 6739fec..e18e786 100644 --- a/LD_Direction/compare_snp_lists.py +++ b/LD_Direction/compare_snp_lists.py @@ -51,12 +51,14 @@ import os as _os import re as _re import sys as _sys +import sqlite3 as _sqlite3 import pickle as _pickle import subprocess as _sub import argparse as _argparse import multiprocessing as _mp import bz2 as _bz2 import gzip as _gzip +from time import sleep as _sleep from tempfile import mkstemp as _temp try: @@ -87,7 +89,9 @@ from .distance_filtering import distance_filter -DB_PATH = '/godot/dbsnp' +MIN_BUNDLE = 150 # Minimum number of jobs per node, 150 takes less than a minute. 
+ +DB_PATH = '/godot/dbsnp/db/hg19' DB_VERS = 150 POPULATIONS = ["ALL", "AFR", "AMR", "EAS", "EUR", "SAS", "ACB", "ASW", "BEB", @@ -102,96 +106,6 @@ 'filter_by_ld', 'save_list'] -############################################################################### -# Helper Functions # -############################################################################### - - -class MissingSNPError(Exception): - - """Exceception to catch missing SNPs.""" - - pass - - -class BadSNPError(Exception): - - """Exceception to catch missing SNPs.""" - - pass - - -def run(command, raise_on_error=False): - """Run a command with subprocess the way it should be. - - Parameters - ---------- - command : str - A command to execute, piping is fine. - raise_on_error : bool - Raise a subprocess.CalledProcessError on exit_code != 0 - - Returns - ------- - stdout : str - stderr : str - exit_code : int - """ - pp = _sub.Popen(command, shell=True, universal_newlines=True, - stdout=_sub.PIPE, stderr=_sub.PIPE) - out, err = pp.communicate() - code = pp.returncode - if raise_on_error and code != 0: - _sys.stderr.write( - '{}\nfailed with:\nCODE: {}\nSTDOUT:\n{}\nSTDERR:\n{}\n' - .format(command, code, out, err) - ) - raise _sub.CalledProcessError( - returncode=code, cmd=command, output=out, stderr=err - ) - return out, err, code - - -def _open_zipped(infile, mode='r'): - """Return file handle of file regardless of compressed or not. - - Also returns already opened files unchanged, text mode automatic for - compatibility with python2. - """ - # Return already open files - if hasattr(infile, 'write'): - return infile - # Make text mode automatic - if len(mode) == 1: - mode = mode + 't' - if not isinstance(infile, str): - raise ValueError("I cannot open a filename that isn't a string.") - if infile.endswith('.gz'): - return _gzip.open(infile, mode) - if infile.endswith('.bz2'): - if hasattr(_bz2, 'open'): - return _bz2.open(infile, mode) - else: - return _bz2.BZ2File(infile, mode) - return open(infile, mode) - - -def _chrom_sort(x): - """Return an integer for sorting from a chromosome.""" - if x.startswith('chr'): - x = x[3:] - if x.upper() == 'X': - return 100 - elif x.upper() == 'Y': - return 101 - elif x.upper().startswith('M'): - return 150 - elif x.isdigit(): - return int(x) - else: - return x - - ############################################################################### # Run plink jobs without repetative code # ############################################################################### @@ -199,25 +113,34 @@ def _chrom_sort(x): class PLINK(object): - """A reusable object to run plink jobs quickly.""" + """A reusable object to run plink jobs on 1000genomes files quickly.""" written_files = {} bims = {} - def __init__(self, pop_file=None, plink='plink'): + def __init__(self, data=None, pop_file=None, plink='plink'): """Load population information. Parameters ---------- + data : str, optional + Path the the 1000genomes data files + (e.g. ALL.chr16.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.{bed,bim}) + Default set in DATA_DIR hardcoded in this file. 
pop_file : str, optional Path to the 1000 genomes population file plink : str, optional Path to plink executable, otherwise searches PATH """ self.plink = plink + data = data if data else DATA_DIR + if not _os.path.isdir(data): + raise ValueError('{} must be a directory, but no directory found' + .format(data)) + self.path = _os.path.abspath(data) if not pop_file: pop_file = _os.path.join( - DATA_DIR, 'integrated_call_samples_v3.20130502.ALL.panel' + self.path, 'integrated_call_samples_v3.20130502.ALL.panel' ) individuals = {} with open(pop_file) as fin: @@ -228,13 +151,45 @@ def __init__(self, pop_file=None, plink='plink'): individuals[pop] = [] individuals[pop].append(ind) self.individuals = individuals - - def pop_file(self, populations=None): + self.files = {} + for fl in _os.listdir(self.path): + if 'phase3_' not in fl: + continue + if not fl.endswith('bed'): + continue + if not fl.startswith('ALL'): + continue + chrom = fl.split('.')[1] + if not chrom.startswith('chr'): + continue + fl = _os.path.abspath(_os.path.join(self.path, fl)) + root = '.'.join(fl.split('.')[:-1]) + bed = _os.path.abspath('{}.bed'.format(root)) + bim = _os.path.abspath('{}.bim'.format(root)) + fam = _os.path.abspath('{}.fam'.format(root)) + assert _os.path.isfile(bed) + assert _os.path.isfile(bim) + assert _os.path.isfile(fam) + c1 = chrom if chrom.startswith('chr') else 'chr' + str(chrom) + c2 = c1[3:] + for c in [c1, c2]: + self.files[c] = { + 'root': root, + 'bed': bed, + 'bim': bim, + 'fam': fam, + } + + def pop_file(self, populations=None, outfile=None): """Write temp file with a list of individuals in population.""" populations = populations if populations else self.individuals.keys() if isinstance(populations, str): populations = [populations] populations = list(populations) + if outfile and _os.path.isfile(outfile): + outfile = _os.path.abspath(outfile) + self.written_files[','.join(populations)] = outfile + return outfile if ','.join(populations) in self.written_files: fl = self.written_files[','.join(populations)] if _os.path.isfile(fl): @@ -255,37 +210,249 @@ def pop_file(self, populations=None): pop_ids = sorted(set(pop_ids)) - _, file_location = _temp(prefix='-'.join(populations), dir='/tmp') - with open(file_location, 'w') as outfile: - outfile.write('\n'.join( + if not outfile: + _, outfile = _temp(prefix='-'.join(populations), dir='/tmp') + with open(outfile, 'w') as fout: + fout.write('\n'.join( [' '.join(x) for x in zip(pop_ids, pop_ids)] )) - self.written_files[','.join(populations)] = file_location - return file_location + self.written_files[','.join(populations)] = outfile + return outfile - def bim_snps(self, chrom): - """Return and cache all SNPs in a bim file.""" - if chrom in self.bims: - with open(self.bims[chrom], 'rb') as fin: - return _pickle.load(fin) - bfile = _os.path.join( - DATA_DIR, - 'ALL.{}.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes' - .format(chrom) - ) - bim = bfile + '.bim' + def bim_file(self, chrom, snps, outfile=None): + """Filter a bim file to only include SNPs in snps. + + Parameters + ---------- + chrom : str + A chromosome name for the file to work on. + snps : list_of_tuple + A list of snps from list_to_rsid_and_locs(): + (rsid, chrom, loc) + outfile : str, optional + A file to write to. A tempfile will be created if not is provided. + + Returns + ------- + file_path : str + Absolute path to the newly created file. 
+ """ + if not chrom in self.files: + raise ValueError( + '{} is not in our bim files:\n{}'.format( + chrom, + '\n'.join([i[' bim'] for i in self.files.values()]) + ) + ) + snps = [(str(name), int(loc)) for name, loc in snps] + if not outfile: + _, outfile = _temp( + prefix='filtered_bim.', suffix='.bim', dir='/tmp' + ) + outfile = _os.path.abspath(outfile) + with _open_zipped(outfile, 'w') as fout: + for c, name, cm, pos, a1, a2 in read_bim(self.files[chrom]['bim']): + if (name, pos) in snps: + fout.write('\t'.join([c, name, cm, str(pos), a1, a2]) + '\n') + return outfile + + def bim_snps(self, chrom, bim_file=None): + """Return and cache all SNPs in a bim file. + + Note: will only cache rsids in bim if no bim_file is given. + + Parameters + ---------- + chrom : str + bim_file : str, optional + Bim file to work on, otherwise the chromosome is used to pick the + complete file. + + Returns + ------- + rsids : frozenset + """ + if not bim_file: + if chrom in self.bims: + with open(self.bims[chrom], 'rb') as fin: + return _pickle.load(fin) + try: + bim_file = self.files[chrom]['bim'] + except KeyError: + raise KeyError('{} is not in the bim file list'.format(chrom)) + _, pfl = _temp() + else: + pfl = None rsids = [] - with open(bim) as fin: + with open(bim_file) as fin: for line in fin: - rsids.append(line.split('\t')[1]) + if not line.strip(): + continue + rsids.append(line.strip().split('\t')[1]) rsids = frozenset(rsids) - _, pfl = _temp() - with open(pfl, 'wb') as fout: - _pickle.dump(rsids, fout) - self.bims[chrom] = pfl + if pfl: + with open(pfl, 'wb') as fout: + _pickle.dump(rsids, fout) + self.bims[chrom] = pfl return rsids + def many_to_many(self, snps, comp_list, chrom, r2=0.6, populations=None, + window_size=50000, fbim_file=None, outfile=None, + keep_int_files=False, raise_on_error=False, + logfile=_sys.stderr): + """Get many-to-many LD information using plink. + + Will do a single lookup for all SNPs in snps using plink on a filtered + bim file generated to only contain the SNPs in comp_list. + + Parameters + ---------- + snps : list_of_tuple + list of rsIDs to query in the format: + (rsid, loc) (from pairs_to_lists()) + comp_list : list_of_tuple + list of rsIDs to compare to in the format: + (rsid, loc) (from pairs_to_lists()) + chrom : str + which chromosome to search + r2 : float, optional + r-squared level to use for filtering + populations : list_of_str, optional + list of populations to include in the analysis + window_size : int, optional + Integer number for window size, default 50kb. + fbim_file : str, optional + File to write the filtered bims to, if not provided a temp file + will be used and deleted. + outfile : str, optional + A file root for plink to write to, output will have '.ld' appended + to it. If not provided, temp file used and deleted after use. + keep_int_files : bool, optional + Do not delete intermediate SNP files + raise_on_error : bool, optional + if False, will return None if primary SNP missing from bim file + logfile : filehandle, optional + A file like object to write to + + Returns + ------- + matching_snps : dict + For every matching SNP that beats the r-squared: { + snp: {r2: r-squared, dprime: d-prime, phased: phased-alleles} + } + If plink job fails, returns an empty dictionary. 
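The body that follows calls get_length() to normalize window_size, but that helper is never defined anywhere in this patch series; from its call sites (it receives plain ints and strings like '50kb') it presumably mirrors the kb/mb parsing used elsewhere in the module. A guess at its shape, flagged explicitly as an assumption:

import re

def get_length(value):
    # ASSUMPTION: reconstruction of the undefined get_length() helper
    if isinstance(value, int):
        return value
    num, mod = re.match(r'(\d+)(\D*)', value).groups()
    return int(num) * {'': 1, 'kb': 1000, 'mb': 1000000}[mod.lower()]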
+ """ + window_size = get_length(window_size) + if not snps or not comp_list: + raise ValueError('snps and comp_list cannot be null') + snps = set(snps) + comp_list = set(comp_list) + bsnps = self.bim_snps(chrom) + all_snps = [i for i, _ in snps | comp_list if i in bsnps] + good = [] + bad = [] + rsids = {} + for rsid, loc in snps: + if rsid in bsnps: + good.append(rsid) + rsids[rsid] = int(loc) + else: + bad.append(rsid) + if bad: + _sys.stderr.write( + 'The following SNPs are not in the bim file and cannot be ' + + 'queried:\n{}\n'.format(bad) + ) + del(bsnps) + # Initialize necessary files + bfile = self.files[chrom]['root'] + pop_file = self.pop_file(populations) + if outfile: + del_file = False + else: + _, outfile = _temp(prefix='plink', dir='/tmp') + _os.remove(outfile) + del_file = True + _, snp_file = _temp(prefix='plinksnps', dir='/tmp') + with open(snp_file, 'w') as fout: + fout.write('\n'.join(sorted([snp for snp, _ in snps])) + '\n') + _, comp_file = _temp(prefix='plinkcomp', dir='/tmp') + with open(comp_file, 'w') as fout: + fout.write('\n'.join(sorted(all_snps)) + '\n') + + # Build the command + plink_cmnd = ( + '{plink} --bfile {bfile} --r2 in-phase dprime ' + '--ld-snp-list {snp_file} --extract {comp_file} ' + '--ld-window {window} --keep {ind_file} --out {out}' + ).format( + plink=self.plink, bfile=bfile, window=window_size, + snp_file=snp_file, comp_file=comp_file, ind_file=pop_file, + out=outfile + ) + + # Run it + stdout, stderr, code = run(plink_cmnd, raise_on_error) + # Parse the output file + if code != 0: + logfile.write( + '{}: plink command failed'.format(chrom) + + 'Command: {}\nExit Code: {}\nSTDOUT:\n{}\bSTDERR:\n{}\n' + .format(plink_cmnd, code, stdout, stderr) + ) + return {} + + results = {} + with open(outfile + '.ld') as fin: + # Check header + line = fin.readline().strip() + assert _re.split(r' +', line) == [ + 'CHR_A', 'BP_A', 'SNP_A', 'CHR_B', 'BP_B', + 'SNP_B', 'PHASE', 'R2', 'DP' + ] + for line in fin: + f = _re.split(r' +', line.strip()) + snp1, snp2, phased = f[2], f[5], f[6] + loc1, loc2 = int(f[1]), int(f[4]) + rsquared, dprime = float(f[7]), float(f[8]) + if snp1 not in good: + continue + if snp1 == snp2: + continue + if loc1 != rsids[snp1]: + continue + if rsquared < r2: + continue + if snp1 not in results: + results[snp1] = { + 'chrom': chrom, 'loc': loc1, 'matches': {} + } + try: + p1, p2 = phased.split('/') + s1a, s2a = p1 + s1b, s2b = p2 + lookup = {snp1: {s1a: s2a, s1b: s2b}, + snp2: {s2a: s1a, s2b: s1b}} + except ValueError: + lookup = {} + results[snp1]['matches'][snp2] = { + 'r2': rsquared, 'dprime': dprime, 'loc': loc2, + 'phased': phased, 'lookup': lookup + } + + if del_file: + pass + for fl in [_os.path.join('/tmp', f) for f in _os.listdir('/tmp')]: + if fl.startswith(outfile): + _os.remove(fl) + if not keep_int_files: + _os.remove(snp_file) + _os.remove(comp_file) + + return results + + def one_to_many(self, snp, comp_list, chrom, r2=0.6, populations=None, raise_on_error=False, logfile=_sys.stderr): """Get one-to-many LD information using plink. @@ -411,6 +578,10 @@ def list_to_rsid_and_locs(list1, raise_on_missing=False): Uses a single large dbSNP lookup to create a dictionary and then parses the two lists. + If more than 5000 items in rsid list, creates a temp table and joins it. + + Requires about 25 minutes for 250,000 rsIDs. 
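The fallback branch below slices the rsID list into chunks of 990 because SQLite allows at most 999 bound variables per statement by default, so a single IN (...) query cannot carry an arbitrarily long list. The slicing pattern on its own:

ids = ['rs{}'.format(i) for i in range(2500)]   # illustrative rsIDs
chunks = [ids[i:i + 990] for i in range(0, len(ids), 990)]
assert [len(c) for c in chunks] == [990, 990, 520]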
+ Parameters ---------- list1 : list_of_str @@ -466,13 +637,22 @@ def list_to_rsid_and_locs(list1, raise_on_missing=False): if list1_rs: _sys.stderr.write('Doing rsID DB lookup.\n') - db = _dbSNP.DB(DB_PATH, DB_VERS) + if len(list1_rs) > 5000: + conn = _sqlite3.connect(DB_PATH + '/dbsnp{}.db'.format(DB_VERS)) + conn.execute("CREATE TEMP TABLE 'temp' (name)") + conn.executemany("INSERT INTO temp (name) VALUES (?)", [(i,) for i in list1_rs]) + q = conn.execute("SELECT dbSNP.name, dbSNP.chrom, dbSNP.start, dbSNP.end FROM dbSNP INNER JOIN temp ON dbSNP.name=temp.name").fetchall() + else: + db = _dbSNP.DB(DB_PATH, DB_VERS) + q = [] + for chunk in [list1_rs[i:i + 990] for i in range(0, len(list1_rs), 990)]: + q += db.query( + db.Row.name, db.Row.chrom, db.Row.start, db.Row.end + ).filter( + db.Row.name.in_(chunk) + ).all() rs_lookup = { - i[0]: (i[0], i[1], i[2]+1) for i in db.query( - db.Row.name, db.Row.chrom, db.Row.start, db.Row.end - ).filter( - db.Row.name.in_(list1_rs) - ) + i[0]: (i[0], i[1], i[2]+1) for i in q if not i[3]-i[2] > 1 } @@ -483,7 +663,7 @@ def list_to_rsid_and_locs(list1, raise_on_missing=False): for chrom, loc in [i.split(':') for i in list1_locs]: if chrom not in query: query[chrom] = [] - query[chrom].append(int(loc-1)) + query[chrom].append(int(loc)-1) loc_lookup = { '{}:{}'.format(i.chrom, i.start+1): (i.name, i.chrom, i.start+1) for i in db.lookup_locations(query) @@ -491,7 +671,7 @@ def list_to_rsid_and_locs(list1, raise_on_missing=False): } failed = [] - results = {} + results = [] done = [] for snp in list1: if isinstance(snp, str) and '\t' in snp: @@ -511,12 +691,10 @@ def list_to_rsid_and_locs(list1, raise_on_missing=False): failed.append(snp) continue s_rs, s_chrom, s_loc = loc_lookup[snp] - if s_chrom not in results: - results[s_chrom] = [] if s_rs in done: continue done.append(s_rs) - results[s_chrom].append((s_rs, s_loc)) + results.append((s_rs, s_chrom, s_loc)) if failed: err = '{} not in dbSNP'.format(snp) if raise_on_missing: @@ -526,6 +704,16 @@ def list_to_rsid_and_locs(list1, raise_on_missing=False): return results +def snp_list_to_dict(list1): + """Convert a list from list_to_rsid_and_locs() to a chrom dict.""" + results = {} + for s_rs, s_chrom, s_loc in list1: + if s_chrom not in results: + results[s_chrom] = [] + results[s_chrom].append((s_rs, int(s_loc))) + return results + + def save_list(snp_list, outfile=None, pickle=False): """Convert a list of rsIDs or positions to a pre-processed format. 
@@ -555,21 +743,17 @@ def save_list(snp_list, outfile=None, pickle=False): _sys.stderr.write('Parsing input list.\n') parsed_list = list_to_rsid_and_locs(snp_list) _sys.stderr.write('Done.\n') - output = [] - for chrom, info in parsed_list.items(): - for rsid, loc in info: - output.append((rsid, chrom, loc)) if outfile: _sys.stderr.write('Writing output.\n') if pickle: with _open_zipped(outfile, 'wb') as fout: - _pickle.dump(output, fout) + _pickle.dump(parsed_list, fout) else: with _open_zipped(outfile, 'w') as fout: - for row in output: + for row in parsed_list: rsid, chrom, loc = row fout.write('{}\t{}\t{}\n'.format(rsid, chrom, loc)) - return output + return parsed_list @@ -604,29 +788,72 @@ def filter_by_distance(ref_snps, comp_snps, distance='50kb'): } } """ - # Calculate distance - if isinstance(distance, str): - dist, mod = _re.match(r'(\d+)(\D+)', '50kb').groups() - dist = int(dist) - mod = mod.lower() - if not mod in ['kb', 'mb']: - raise ValueError('Cannot parse {}, must be in kb or mb only' - .format(distance)) - if mod == 'kb': - dist = dist * 1000 - elif mod == 'mb': - dist = dist * 1000000 - + distance = get_length(distance) results = {} for chrom, snps in ref_snps.items(): - results[chrom] = distance_filter(snps, comp_snps[chrom], dist) - + if chrom not in comp_snps: + continue + results[chrom] = distance_filter(snps, comp_snps[chrom], distance) return results -def filter_by_ld(pairs, r2=0.6, populations=None, plink='plink'): +def filter_by_ld(pairs, r2=0.6, window_size=50000, populations=None, + plink='plink', data_dir=None): """Use plink to lookup LD and filter lists by the r2. + One job per chromosome. + + Parameters + ---------- + pairs : dict + The output of filter_by_distance() + r2 : float + An r-squared cutoff to use for filtering + window_size : int + A distance away from a SNP in list1 to consider SNPs in list2, must be + a integer of bases + plink : str, optional + Path to plink executable, otherwise searches PATH + data_dir : str, optional + Path to directory containing 1000genomes data, default set in DATA_DIR + hardcoded into this file. + + Returns + ------- + pairs : dict + snp: {chrom: chrom, loc: loc, matches: { + snp2: {rsquared: r2, dprime: d', loc: loc2} + """ + window_size = get_length(window_size) + plink = PLINK(data=data_dir, plink=plink) + # Initialize the population file + plink.pop_file(populations, outfile='.'.join(populations) + '.pops.txt') + l = 0 + for v in pairs.values(): + for i in [i[2] for i in v if i[2]]: + l += len(i) + multi = True if l > 200 else False + + # Run plink jobs + results = {} + for chrom in sorted(pairs, key=_chrom_sort): + data = pairs[chrom] + snps, comp_list = pairs_to_lists(data) + results.update( + plink.many_to_many( + snps, comp_list, chrom, r2=r2, populations=populations, + window_size=window_size + ) + ) + + return results + + +def filter_by_ld_one_by_one(pairs, r2=0.6, populations=None, plink='plink', + cluster_jobs=100, force_local=False, + **cluster_opts): + """Use plink to lookup LD and filter lists by the r2 with one job per snp. + Parameters ---------- r2 : float @@ -637,80 +864,144 @@ def filter_by_ld(pairs, r2=0.6, populations=None, plink='plink'): (mega-bases) plink : str, optional Path to plink executable, otherwise searches PATH + cluster_jobs : int, optional + Number of jobs to run on the cluster if cluster running is enabled, if + fyrd is not enabled, this ends up being the number of local cores (in + that case, the max is the result of multiprocessing.cpu_count()). 
+ force_local : bool, optional + If True, disallow running jobs on the cluster with fyrd + cluster_opts : dict, optional + Optional keyword arguments to pass to fyrd """ plink = PLINK(plink=plink) + plink.pop_file(populations, outfile=','.join(populations) + '.pops.txt') l = 0 for v in pairs.values(): - for i in [i[2] for i in v]: + for i in [i[2] for i in v if i[2]]: l += len(i) - multi = True if l > 200 else False - if multi: - jobs = {} - if _fyrd and _fyrd.queue.MODE != 'local': + multi = True if l > 200 else False # 200 jobs take 50 seconds + multi = False + if multi and cluster_jobs > 1: + # Set up cluster running + if _fyrd and _fyrd.queue.MODE != 'local' and not force_local: print('Running jobs on the cluster with fyrd.') + fyrd = True else: - pool = _mp.Pool() - print('Running jobs in parallel') - - results = {} + print('Running jobs in parallel locally') + cores = max(cluster_jobs, _mp.cpu_count()) + pool = _mp.Pool(cores) + fyrd = False + # Bundle jobs + bundle_count = int(l/cluster_jobs) + bundle_count = max(bundle_count, MIN_BUNDLE) + count = bundle_count + # Get walltime based on 6s per job + 40 minutes + # Average on a standard machine is 2 seconds, so this is + # an overestimate + time = _to_time((bundle_count*6)) + _sys.stderr.write('Estimated max time per job {} '.format(time) + + '(walltime req will add 40 min to this)\n') + + jobs = [[]] + n = 0 + skipped = 0 for chrom in sorted(pairs, key=_chrom_sort): snps = pairs[chrom] - print('Working on chromosome {}'.format(chrom)) - args = (snps, plink, chrom, r2, populations) + args = (chrom, r2, populations, False) + # Bundle jobs + if not multi: + # Bundle by chromosome if not multithreading + bundle_count = len(snps) + count = bundle_count + for snp, _, comp_list in snps: + if not comp_list: + skipped += 1 + continue + comp_snps = [rsid for rsid, loc in comp_list] + if not count: + count = bundle_count + jobs.append([]) + n += 1 + jobs[n].append((snp, comp_snps) + args) + count -= 1 + + if skipped: + _sys.stderr.write('{} jobs had no comparison list and were skipped\n' + .format(skipped)) + # Submit jobs + running = [] + results = {} + for jobset in jobs: if multi: - args += (False, ) - if _fyrd and _fyrd.queue.MODE != 'local': - jobs[chrom] = _fyrd.submit( - _ld_job, args, walltime="01:00:00", mem='8G', cores=4) + if fyrd: + time = _to_time((len(jobset)*6)+2400) + running.append( + _fyrd.submit( + _ld_job, (jobset, plink), + walltime=time, mem='8G', cores=4, **cluster_opts) + ) else: - jobs[chrom] = pool.apply_async( - _ld_job, args) + running.append(pool.apply_async(_ld_job, jobset)) else: - results[chrom] = _ld_job(*args) + # Just run jobs per chromosome + results.update(_ld_job(jobset, plink)) if multi: print('Getting results from parallel jobs') - for chrom, job in jobs.items(): - results[chrom] = job.get() - if not _fyrd and _fyrd.queue.MODE != 'local': + if fyrd: + todo = len(running) + while todo: + for job in running: + if job.done: + results.update(job.get()) + todo -= 1 + _sleep(2) + else: + for job in running: + results.update(job.get()) pool.close() # Add all data back again filtered = {} for chrom, data in pairs.items(): for snp, loc, comp_list in data: - filtered[snp] = {'loc': loc, 'chrom': chrom} - if snp in results[chrom]: - if results[chrom][snp]: - filtered[snp]['matches'] = {} - for snp2, pos2 in comp_list: - if snp2 in results[chrom][snp]: - filtered[snp]['matches'][snp2] = results[chrom][snp][snp2] - filtered[snp]['matches'][snp2]['loc'] = pos2 + if snp not in results: + continue + if results[snp]: + 
+                filtered[snp] = {'loc': loc, 'chrom': chrom}
+                filtered[snp]['matches'] = {}
+                for snp2, pos2 in comp_list:
+                    if snp2 in results[snp]:
+                        filtered[snp]['matches'][snp2] = results[snp][snp2]
+                        filtered[snp]['matches'][snp2]['loc'] = pos2

     return filtered


-def _ld_job(snps, plink, chrom, r2, populations, pbar=True):
-    """Private function to run plink, see filter_by_ld()."""
-    if pbar and pb:
-        it = pb(snps, unit='plink_calculations')
-        log = it
-    else:
-        it = snps
-        log = _sys.stderr
+def _ld_job(jobs, plink):
+    """Private function to run plink, see filter_by_ld().
+
+    Parameters
+    ----------
+    jobs : list_of_tuple
+        A list of job arguments:
+        (snp, comp_snps, chrom, r2, populations, raise_on_error, logfile)
+    plink : PLINK
+        An initialized plink class
+    """
     snp_results = {}
-    for snp, _, comp_list in it:
-        if not comp_list:
-            continue
-        comp_snps = [rsid for rsid, loc in comp_list]
-        snp_results[snp] = plink.one_to_many(
-            snp, comp_snps, chrom, r2, populations, raise_on_error=False,
-            logfile=log
-        )
+    for job in jobs:
+        snp_results[job[0]] = plink.one_to_many(*job)
     return snp_results


+def _to_time(s):
+    """Convert seconds to 00:00:00 format."""
+    m, s = divmod(s, 60)
+    h, m = divmod(m, 60)
+    return '{}:{:02}:{:02}'.format(h, m, s)
+
+
 ###############################################################################
 #                          Output Parsing Functions                          #
 ###############################################################################
@@ -773,7 +1064,9 @@ def pairs_to_SNP_Pairs(pairs, populations):


 def comp_snp_lists(list1, list2, populations=None, r2=0.6, distance='50kb',
-                   plink='plink', return_dataframe=False, return_snppair=False,
+                   plink='plink', lists_preparsed=False, one_to_many=False,
+                   dbsnp_loc=None, dbsnp_ver=None, data_dir=None,
+                   return_dataframe=False, return_snp_pairs=False,
                    raise_on_error=False):
     """Compare two SNP lists, filter by r2.
@@ -803,9 +1096,20 @@
         (mega-bases)
     plink : str, optional
         Path to plink executable, otherwise searches PATH
+    lists_preparsed : bool, optional
+        If the input lists are already three-tuples with all data accounted
+        for, skip the dbSNP phase.
+    one_to_many : bool, optional
+        Force plink to run one job per lookup SNP, much slower.
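+        As a sketch (inputs hypothetical), these two calls should return
+        the same pairs, the second far more slowly::
+
+            comp_snp_lists(snps1, snps2, populations=['EUR'])
+            comp_snp_lists(snps1, snps2, populations=['EUR'],
+                           one_to_many=True)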
+    dbsnp_loc : str, optional
+        Path to dbSNP sqlite database files
+    dbsnp_ver : int, optional
+        Version of dbSNP to use
+    data_dir : str, optional
+        Path to 1000genomes files
     return_dataframe : bool, optional
         Return a dataframe instead of JSON
-    return_snppair : bool, optional
+    return_snp_pairs : bool, optional
         Return a dictionary of SNP_Link objects
     raise_on_error : bool, optional
         Raise an Exception if there is a problem with any of the SNPs in the
@@ -814,7 +1118,7 @@

     Returns
     -------
     JSON : dict
-        If return_dataframe and return_snppairs are False, a json-style
+        If return_dataframe and return_snp_pairs are False, a json-style
         dictionary is returned with the format:
             {snp1: {chrom: chrX, loc: #, matches:
                 {snp2: {loc: #, r2: float, dprime: float, phased: AT/GC,
@@ -830,10 +1134,16 @@
         If return_snplink is True, a dictionary of SNP_Link objects is returned
             {snp1: [SNP_Pair, SNP_Pair, ...]}
     """
-    print('Getting SNP info for list1')
-    snps1 = list_to_rsid_and_locs(list1, raise_on_error)
-    print('Getting SNP info for list2')
-    snps2 = list_to_rsid_and_locs(list2, raise_on_error)
+    distance = get_length(distance)
+    if not lists_preparsed:
+        print('Getting SNP info for list1')
+        list1 = list_to_rsid_and_locs(list1, raise_on_error)
+        print('Getting SNP info for list2')
+        list2 = list_to_rsid_and_locs(list2, raise_on_error)
+
+    snps1 = snp_list_to_dict(list1)
+    snps2 = snp_list_to_dict(list2)
+
     print('Getting all possible pairs within {}'.format(distance))
     pairs = filter_by_distance(snps1, snps2, distance=distance)
     l = 0
@@ -841,15 +1151,190 @@
         for i in [i[2] for i in v]:
             l += len(i)
     print('Running plink filters on {} calculations'.format(l))
-    pairs = filter_by_ld(pairs, r2, populations=populations, plink=plink)
+    if one_to_many:
+        # Not a good method, but kept for comparisons
+        pairs = filter_by_ld_one_by_one(
+            pairs, r2, populations=populations, plink=plink,
+            force_local=True
+        )
+    else:
+        pairs = filter_by_ld(pairs, r2, populations=populations, plink=plink,
+                             window_size=distance, data_dir=data_dir)
     print('Done')
     if return_dataframe:
         return pairs_to_dataframe(pairs)
-    if return_snppair:
+    if return_snp_pairs:
         return pairs_to_SNP_Pairs(pairs, populations=populations)
     return pairs
+
+
+###############################################################################
+#                              Helper Functions                              #
+###############################################################################
+
+
+class MissingSNPError(Exception):
+
+    """Exception to catch missing SNPs."""
+
+    pass
+
+
+class BadSNPError(Exception):
+
+    """Exception to catch bad SNPs, e.g. indels."""
+
+    pass
+
+
+def pairs_to_lists(pairs):
+    """Convert list of pairs into two tuple lists.
+
+    Removes any SNPs with an empty match list.
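+
+    A minimal sketch with hypothetical rsIDs::
+
+        pairs_to_lists([('rs1', 100, {('rs2', 150)}),
+                        ('rs3', 300, set())])
+        # -> ({('rs1', 100)}, {('rs2', 150)})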
+ + Parameters + ---------- + pairs : list + From filter_by_distance(): + (rsid, loc, set((rsid, loc))) + + Returns + ------- + snp_list : list_of_tuple + comp_list : list_of_tuple + (rsid, loc) + """ + query = [] + comp = [] + for snp, loc, matches in pairs: + if not matches: + continue + query.append((snp, loc)) + for snp2, loc2 in matches: + comp.append((snp2, loc2)) + return set(query), set(comp) + + +def get_length(length_string): + """Convert a string length like 50kb to an integer length.""" + # Calculate length_string + if isinstance(length_string, str): + if length_string.isdigit(): + return int(length_string) + dist, mod = _re.match(r'(\d+)(\D+)', length_string).groups() + dist = int(dist) + mod = mod.lower() + if mod: + if not mod in ['kb', 'mb']: + raise ValueError('Cannot parse {}, must be in kb or mb only' + .format(length_string)) + if mod == 'kb': + dist = dist * 1000 + elif mod == 'mb': + dist = dist * 1000000 + elif isinstance(length_string, int): + return length_string + else: + raise ValueError('length_string must be either a string or an integer, is ' + '{}'.format(type(length_string))) + return dist + + +def run(command, raise_on_error=False): + """Run a command with subprocess the way it should be. + + Parameters + ---------- + command : str + A command to execute, piping is fine. + raise_on_error : bool + Raise a subprocess.CalledProcessError on exit_code != 0 + + Returns + ------- + stdout : str + stderr : str + exit_code : int + """ + pp = _sub.Popen(command, shell=True, universal_newlines=True, + stdout=_sub.PIPE, stderr=_sub.PIPE) + out, err = pp.communicate() + code = pp.returncode + if raise_on_error and code != 0: + _sys.stderr.write( + '{}\nfailed with:\nCODE: {}\nSTDOUT:\n{}\nSTDERR:\n{}\n' + .format(command, code, out, err) + ) + raise _sub.CalledProcessError( + returncode=code, cmd=command, output=out, stderr=err + ) + return out, err, code + + +def read_bim(bim_file): + """Yields a tuple for each line in bim_file. + + Parameters + ---------- + bim_file : str + Path to a bim file, can be zipped or an open file handle. + + Yields + ------ + chromsome : str + name : str + cm : str + Position in centimorgans, usually 0 + position : int + allele_1 : str + allele_2 : str + """ + with _open_zipped(bim_file) as fin: + for line in fin: + chrom, name, cm, loc, a1, a2 = line.split('\t') + yield chrom, name, cm, int(loc), a1, a2 + + +def _open_zipped(infile, mode='r'): + """Return file handle of file regardless of compressed or not. + + Also returns already opened files unchanged, text mode automatic for + compatibility with python2. 
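+
+    A usage sketch (file name hypothetical)::
+
+        with _open_zipped('snps.txt.gz') as fin:
+            snps = fin.read().strip().split('\n')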
+ """ + # Return already open files + if hasattr(infile, 'write'): + return infile + # Make text mode automatic + if len(mode) == 1: + mode = mode + 't' + if not isinstance(infile, str): + raise ValueError("I cannot open a filename that isn't a string.") + if infile.endswith('.gz'): + return _gzip.open(infile, mode) + if infile.endswith('.bz2'): + if hasattr(_bz2, 'open'): + return _bz2.open(infile, mode) + else: + return _bz2.BZ2File(infile, mode) + return open(infile, mode) + + +def _chrom_sort(x): + """Return an integer for sorting from a chromosome.""" + if x.startswith('chr'): + x = x[3:] + if x.upper() == 'X': + return 100 + elif x.upper() == 'Y': + return 101 + elif x.upper().startswith('M'): + return 150 + elif x.isdigit(): + return int(x) + else: + return x + + ############################################################################### # Run as a Script # ############################################################################### @@ -909,7 +1394,8 @@ def comp_snp_lists(list1, list2, populations=None, r2=0.6, distance='50kb', rsid, chromosome, location (base-1) """ -def get_arg_parser(): + +def argument_parser(): """Create an argument parser.""" parser = _argparse.ArgumentParser( description=__doc__, #usage=general_usage, @@ -944,6 +1430,18 @@ def get_arg_parser(): filtering.add_argument('--distance', default='50kb', help='Max distance to consider LD (50kb)') + # Optional files + paths = analyze.add_argument_group( + 'data_paths', + 'Paths to 1000genomes and dbSNP' + ) + paths.add_argument('--dbsnp-path', default=DB_PATH, + help='Path to dbSNP directory') + paths.add_argument('--dbsnp-ver', default=DB_VERS, + help='dbSNP version to use') + paths.add_argument('--1000genomes', default=DATA_DIR, + help='1000genomes data files') + # Optional flags analyze.add_argument('-p', '--pandas', action="store_true", help="Write file as a pandas DataFrame instead.") @@ -973,7 +1471,7 @@ def main(argv=None): if not argv: argv = _sys.argv[1:] - parser = get_arg_parser() + parser = argument_parser() args = parser.parse_args(argv) @@ -1003,8 +1501,12 @@ def main(argv=None): else: pops = None - out = comp_snp_lists(snp_list, comp_list, pops, r2=float(args.r2), - distance=args.distance, return_dataframe=True) + out = comp_snp_lists( + snp_list, comp_list, pops, + r2=float(args.r2), + distance=args.distance, + return_dataframe=True, + ) if args.pandas: out.to_pickle(args.outfile) From ad1416ec5ba3795c0ab0cf9a607e251036451b11 Mon Sep 17 00:00:00 2001 From: Mike Dacre Date: Fri, 12 May 2017 23:07:22 -0700 Subject: [PATCH 4/5] Restructured for version 0.2.0b1 --- LD_Direction/LDpair.py | 172 ++++---- LD_Direction/__init__.py | 44 +- LD_Direction/_run.py | 135 ++++++ LD_Direction/compare_snp_lists.py | 688 +++--------------------------- LD_Direction/plink_1000genomes.py | 518 ++++++++++++++++++++++ setup.py | 2 +- 6 files changed, 813 insertions(+), 746 deletions(-) create mode 100644 LD_Direction/_run.py create mode 100644 LD_Direction/plink_1000genomes.py diff --git a/LD_Direction/LDpair.py b/LD_Direction/LDpair.py index 055cd66..b575e68 100755 --- a/LD_Direction/LDpair.py +++ b/LD_Direction/LDpair.py @@ -7,7 +7,6 @@ """ import os import sys -import json import math import argparse import subprocess as _sub @@ -15,8 +14,11 @@ # https://github.com/MikeDacre/dbSNP import dbSNP -DB_SNP_LOC = '/godot/dbsnp' -DB_SNP_VER = 150 +from . 
import _run + +############################################################################### +# User Defined Globals # +############################################################################### POPULATIONS = ["ALL", "AFR", "AMR", "EAS", "EUR", "SAS", "ACB", "ASW", "BEB", "CDX", "CEU", "CHB", "CHS", "CLM", "ESN", "FIN", "GBR", "GIH", @@ -25,7 +27,13 @@ # Set data directories DATA_DIR = "/godot/1000genomes/1000GP_Phase3" -SNP_DB = "/godot/dbsnp/dbsnp144.db" +DB_SNP_LOC = '/godot/dbsnp' +DB_SNP_VER = 150 + + +############################################################################### +# Helper Functions # +############################################################################### class SNP_Lookup_Failure(Exception): @@ -35,42 +43,60 @@ class SNP_Lookup_Failure(Exception): pass -def output_json(output): - """Return JSON formated string.""" - return json.dumps(output, sort_keys=True, indent=2) - - -def run_cmnd(cmnd): - """Run a command and return the output split into a list by newline.""" - output, _, _ = run(cmnd, raise_on_error=True) - return output.strip().split('\n') - - -def run(command, raise_on_error=False): - """Run a command with subprocess the way it should be. +def print_summary(json_dict, outfile=None): + """Write a formatted summary to file from a dictionary. Parameters ---------- - command : str - A command to execute, piping is fine. - raise_on_error : bool - Raise a subprocess.CalledProcessError on exit_code != 0 + json_dict : dict + A dictionary of the kind generated by calculate_pair() + filehandle : IOBase, optional + An open writable file handle Returns ------- - stdout : str - stderr : str - exit_code : int + str + A formatted string, the same one that is written to outfile if outfile + is provided. """ - pp = _sub.Popen(command, shell=True, universal_newlines=True, - stdout=_sub.PIPE, stderr=_sub.PIPE) - out, err = pp.communicate() - code = pp.returncode - if raise_on_error and code != 0: - raise _sub.CalledProcessError( - returncode=code, cmd=command, output=out, stderr=err - ) - return out, err, code + outstr = "Query SNPs:\n" + outstr += json_dict["snp1"]["rsnum"] + " (" + json_dict["snp1"]["coord"] + ")\n" + outstr += json_dict["snp2"]["rsnum"] + " (" + json_dict["snp2"]["coord"] + ")\n" + outstr += "\n" + outstr += ','.join(json_dict['populations']) + " Haplotypes:\n" + outstr += " " * 15 + json_dict["snp2"]["rsnum"] + '\n' + outstr += " " * 15 + json_dict["snp2"]["allele_1"]["allele"] + " " * 7 + json_dict["snp2"]["allele_2"]["allele"] + '\n' + outstr += " " * 13 + "-" * 17 + '\n' + outstr += " " * 11 + json_dict["snp1"]["allele_1"]["allele"] + " | " + json_dict["two_by_two"]["cells"]["c11"] + " " * (5 - len(json_dict["two_by_two"]["cells"]["c11"])) + " | " + json_dict["two_by_two"]["cells"]["c12"] + " " * (5 - len(json_dict["two_by_two"]["cells"]["c12"])) + " | " + json_dict["snp1"]["allele_1"]["count"] + " " * (5 - len(json_dict["snp1"]["allele_1"]["count"])) + " (" + json_dict["snp1"]["allele_1"]["frequency"] + ")\n" + outstr += json_dict["snp1"]["rsnum"] + " " * (10 - len(json_dict["snp1"]["rsnum"])) + " " * 3 + "-" * 17 + '\n' + outstr += " " * 11 + json_dict["snp1"]["allele_2"]["allele"] + " | " + json_dict["two_by_two"]["cells"]["c21"] + " " * (5 - len(json_dict["two_by_two"]["cells"]["c21"])) + " | " + json_dict["two_by_two"]["cells"]["c22"] + " " * (5 - len(json_dict["two_by_two"]["cells"]["c22"])) + " | " + json_dict["snp1"]["allele_2"]["count"] + " " * (5 - len(json_dict["snp1"]["allele_2"]["count"])) + " (" + 
json_dict["snp1"]["allele_2"]["frequency"] + ")\n" + outstr += " " * 13 + "-" * 17 + '\n' + outstr += " " * 15 + json_dict["snp2"]["allele_1"]["count"] + " " * (5 - len(json_dict["snp2"]["allele_1"]["count"])) + " " * 3 + json_dict["snp2"]["allele_2"]["count"] + " " * (5 - len(json_dict["snp2"]["allele_2"]["count"])) + " " * 3 + json_dict["two_by_two"]["total"] + '\n' + outstr += " " * 14 + "(" + json_dict["snp2"]["allele_1"]["frequency"] + ")" + " " * (5 - len(json_dict["snp2"]["allele_1"]["frequency"])) + " (" + json_dict["snp2"]["allele_2"]["frequency"] + ")" + " " * (5 - len(json_dict["snp2"]["allele_2"]["frequency"])) + '\n' + outstr += "\n" + outstr += " " + json_dict["haplotypes"]["hap1"]["alleles"] + ": " + json_dict["haplotypes"]["hap1"]["count"] + " (" + json_dict["haplotypes"]["hap1"]["frequency"] + ")\n" + outstr += " " + json_dict["haplotypes"]["hap2"]["alleles"] + ": " + json_dict["haplotypes"]["hap2"]["count"] + " (" + json_dict["haplotypes"]["hap2"]["frequency"] + ")\n" + outstr += " " + json_dict["haplotypes"]["hap3"]["alleles"] + ": " + json_dict["haplotypes"]["hap3"]["count"] + " (" + json_dict["haplotypes"]["hap3"]["frequency"] + ")\n" + outstr += " " + json_dict["haplotypes"]["hap4"]["alleles"] + ": " + json_dict["haplotypes"]["hap4"]["count"] + " (" + json_dict["haplotypes"]["hap4"]["frequency"] + ")\n" + outstr += "\n" + outstr += " D': " + json_dict["statistics"]["d_prime"] + '\n' + outstr += " R\u00b2: " + json_dict["statistics"]["r2"] + '\n' + outstr += " Chi-sq: " + json_dict["statistics"]["chisq"] + '\n' + outstr += " p-value: " + json_dict["statistics"]["p"] + '\n' + outstr += "\n" + if len(json_dict["corr_alleles"]) == 2: + outstr += json_dict["corr_alleles"][0] + '\n' + outstr += json_dict["corr_alleles"][1] + '\n' + else: + outstr += json_dict["corr_alleles"][0] + '\n' + + if 'warning' in json_dict: + outstr += "WARNING: " + json_dict["warning"] + "!\n" + + if outfile: + outfile.write(outstr) + + return outstr def get_snp_info(snp): @@ -142,7 +168,7 @@ def get_snp_info(snp): ) err = None try: - vcf = run_cmnd(tabix_cmd) + vcf = _run.run_cmnd(tabix_cmd) except _sub.CalledProcessError as e: err = e raise @@ -153,7 +179,7 @@ def get_snp_info(snp): ) vcf = [i for i in vcf if 'END' not in i] # Import SNP VCF files - if len(vcf) == 0: + if not vcf: raise SNP_Lookup_Failure(snp + " is not in 1000G reference panel.") elif len(vcf) == 1: geno = vcf[0].strip().split() @@ -196,11 +222,16 @@ def get_snp_info(snp): # Get headers tabix_header_cmd = "tabix -H {} | grep CHROM".format(vcf_file) - head = run_cmnd(tabix_header_cmd)[0].strip().split() + head = _run.run_cmnd(tabix_header_cmd)[0].strip().split() return snp, snp_chrom, snp_loc, geno, allele, head, snp_a1, snp_a2 +############################################################################### +# Primary Function Adapted from LDpair # +############################################################################### + + def calculate_pair(snp1, snp2, pops, write_summary=None, return_json=False): """Find LD information for any two SNPs. @@ -235,7 +266,7 @@ def calculate_pair(snp1, snp2, pops, write_summary=None, return_json=False): except SNP_Lookup_Failure as e: if return_json: output["error"] = str(e) - return output_json(output) + return _run.output_json(output) else: raise @@ -263,14 +294,14 @@ def calculate_pair(snp1, snp2, pops, write_summary=None, return_json=False): err = "VCF File does not match variant coordinates for SNP1." 
if return_json: output["error"] = err - return output_json(output) + return _run.output_json(output) else: raise SNP_Lookup_Failure(err) if geno2[1] != str(snp2_loc): err = "VCF File does not match variant coordinates for SNP2." if return_json: output["error"] = err - return output_json(output) + return _run.output_json(output) else: raise SNP_Lookup_Failure(err) @@ -309,7 +340,7 @@ def calculate_pair(snp1, snp2, pops, write_summary=None, return_json=False): ).format(bad_pops, POPULATIONS) if return_json: output["error"] = err - return output_json(output) + return _run.output_json(output) else: raise SNP_Lookup_Failure(err) @@ -559,64 +590,13 @@ def calculate_pair(snp1, snp2, pops, write_summary=None, return_json=False): # Return output if return_json: - return output_json(output) - else: - return output + return _run.output_json(output) + return output -def print_summary(json_dict, outfile=None): - """Write a formatted summary to file from a dictionary. - Parameters - ---------- - json_dict : dict - A dictionary of the kind generated by calculate_pair() - filehandle : IOBase, optional - An open writable file handle - - Returns - ------- - str - A formatted string, the same one that is written to outfile if outfile - is provided. - """ - outstr = "Query SNPs:\n" - outstr += json_dict["snp1"]["rsnum"] + " (" + json_dict["snp1"]["coord"] + ")\n" - outstr += json_dict["snp2"]["rsnum"] + " (" + json_dict["snp2"]["coord"] + ")\n" - outstr += "\n" - outstr += ','.join(json_dict['populations']) + " Haplotypes:\n" - outstr += " " * 15 + json_dict["snp2"]["rsnum"] + '\n' - outstr += " " * 15 + json_dict["snp2"]["allele_1"]["allele"] + " " * 7 + json_dict["snp2"]["allele_2"]["allele"] + '\n' - outstr += " " * 13 + "-" * 17 + '\n' - outstr += " " * 11 + json_dict["snp1"]["allele_1"]["allele"] + " | " + json_dict["two_by_two"]["cells"]["c11"] + " " * (5 - len(json_dict["two_by_two"]["cells"]["c11"])) + " | " + json_dict["two_by_two"]["cells"]["c12"] + " " * (5 - len(json_dict["two_by_two"]["cells"]["c12"])) + " | " + json_dict["snp1"]["allele_1"]["count"] + " " * (5 - len(json_dict["snp1"]["allele_1"]["count"])) + " (" + json_dict["snp1"]["allele_1"]["frequency"] + ")\n" - outstr += json_dict["snp1"]["rsnum"] + " " * (10 - len(json_dict["snp1"]["rsnum"])) + " " * 3 + "-" * 17 + '\n' - outstr += " " * 11 + json_dict["snp1"]["allele_2"]["allele"] + " | " + json_dict["two_by_two"]["cells"]["c21"] + " " * (5 - len(json_dict["two_by_two"]["cells"]["c21"])) + " | " + json_dict["two_by_two"]["cells"]["c22"] + " " * (5 - len(json_dict["two_by_two"]["cells"]["c22"])) + " | " + json_dict["snp1"]["allele_2"]["count"] + " " * (5 - len(json_dict["snp1"]["allele_2"]["count"])) + " (" + json_dict["snp1"]["allele_2"]["frequency"] + ")\n" - outstr += " " * 13 + "-" * 17 + '\n' - outstr += " " * 15 + json_dict["snp2"]["allele_1"]["count"] + " " * (5 - len(json_dict["snp2"]["allele_1"]["count"])) + " " * 3 + json_dict["snp2"]["allele_2"]["count"] + " " * (5 - len(json_dict["snp2"]["allele_2"]["count"])) + " " * 3 + json_dict["two_by_two"]["total"] + '\n' - outstr += " " * 14 + "(" + json_dict["snp2"]["allele_1"]["frequency"] + ")" + " " * (5 - len(json_dict["snp2"]["allele_1"]["frequency"])) + " (" + json_dict["snp2"]["allele_2"]["frequency"] + ")" + " " * (5 - len(json_dict["snp2"]["allele_2"]["frequency"])) + '\n' - outstr += "\n" - outstr += " " + json_dict["haplotypes"]["hap1"]["alleles"] + ": " + json_dict["haplotypes"]["hap1"]["count"] + " (" + json_dict["haplotypes"]["hap1"]["frequency"] + ")\n" - outstr += " " + 
json_dict["haplotypes"]["hap2"]["alleles"] + ": " + json_dict["haplotypes"]["hap2"]["count"] + " (" + json_dict["haplotypes"]["hap2"]["frequency"] + ")\n" - outstr += " " + json_dict["haplotypes"]["hap3"]["alleles"] + ": " + json_dict["haplotypes"]["hap3"]["count"] + " (" + json_dict["haplotypes"]["hap3"]["frequency"] + ")\n" - outstr += " " + json_dict["haplotypes"]["hap4"]["alleles"] + ": " + json_dict["haplotypes"]["hap4"]["count"] + " (" + json_dict["haplotypes"]["hap4"]["frequency"] + ")\n" - outstr += "\n" - outstr += " D': " + json_dict["statistics"]["d_prime"] + '\n' - outstr += " R\u00b2: " + json_dict["statistics"]["r2"] + '\n' - outstr += " Chi-sq: " + json_dict["statistics"]["chisq"] + '\n' - outstr += " p-value: " + json_dict["statistics"]["p"] + '\n' - outstr += "\n" - if len(json_dict["corr_alleles"]) == 2: - outstr += json_dict["corr_alleles"][0] + '\n' - outstr += json_dict["corr_alleles"][1] + '\n' - else: - outstr += json_dict["corr_alleles"][0] + '\n' - - if 'warning' in json_dict: - outstr += "WARNING: " + json_dict["warning"] + "!\n" - - if outfile: - outfile.write(outstr) - - return outstr +############################################################################### +# Run as a Script # +############################################################################### def get_arg_parser(): diff --git a/LD_Direction/__init__.py b/LD_Direction/__init__.py index a009d92..77c60de 100644 --- a/LD_Direction/__init__.py +++ b/LD_Direction/__init__.py @@ -15,33 +15,51 @@ Organization: Stanford University License: MIT License, property of Stanford and NCI, use as you wish Created: 2017-21-21 10:04 -Version: 0.1.0a +Version: 0.2.0b1 Modules ------- -snp_link : provides functions and classes to do one-to-one LD comparisons on -either just two SNPs, or a list of SNP pairs -compare_snp_lists : provides functions to do a one-to-many comparison for -arbitrarily large lists of SNPs (limited only by memory) +compare_snp_lists + provides functions to do a one-to-many comparison for arbitrarily large + lists of SNPs (limited only by memory) +snp_link + provides functions and classes to do one-to-one LD comparisons on either + just two SNPs, or a list of SNP pairs +LDpair + Adapted from LDlink, provides LD calculation tools directly from the VCF + file without the need for plink + +PLINK +----- +https://www.cog-genomics.org/plink +https://github.com/chrchang/plink-ng/ + +compare_snp_lists depends entirely upon plink and could not function without +it. Many, many thanks to Christopher Chang for creating it, and for his help +with getting this package working. LDLink ------ https://analysistools.nci.nih.gov/LDlink https://github.com/CBIIT/nci-webtools-dceg-linkage -This tool uses the above API entirely, thanks to Mitchell Machiela and the -Chanock lab for writing that tool! +This package makes extensive use of the LDlink API entirely, thanks to Mitchell +Machiela and the Chanock lab for writing that tool! -Citation -~~~~~~~~ +Citations +--------- +Chang, Christopher C and Chow, Carson C and Tellier, Laurent C A M and + Vattikuti, Shashaank and Purcell, Shaun M and Lee, James J. Second- + generation {PLINK}: {R}ising to the challenge of larger and richer + datasets. GigaScience. 2015. DOI 10.1186/s13742-015-0047-8 Machiela MJ, Chanock SJ. LDlink a web-based application for exploring -population-specific haplotype structure and linking correlated alleles of -possible functional variants. Bioinformatics. 2015 Jul 2. PMID: 26139635. 
+    population-specific haplotype structure and linking correlated alleles of
+    possible functional variants. Bioinformatics. 2015 Jul 2. PMID: 26139635.
 """

-__version__ = '0.1.0a'
+__version__ = '0.2.0b1'

 __all__ = ['snp_link', 'compare_snp_lists']

 # Make core functions easily available
-from . import snp_link
 from . import compare_snp_lists
+from . import snp_link
diff --git a/LD_Direction/_run.py b/LD_Direction/_run.py
new file mode 100644
index 0000000..2734be0
--- /dev/null
+++ b/LD_Direction/_run.py
@@ -0,0 +1,135 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+Simple functions used by the rest of the module.
+
+Functions
+---------
+output_json
+    Return a string rendition of a dictionary
+run
+    Run a system command and return stdout, stderr, exit_code
+run_cmnd
+    Run a system command with run() and return result split by newline
+open_zipped
+    Open a regular, gzipped, bz2zipped, or open filehandle agnostically
+get_length
+    Convert kb and mb lengths into integers
+chrom_sort_key
+    For use with sorted(), allows correct ordering of chromosomes.
+"""
+import re as _re
+import sys as _sys
+import bz2 as _bz2
+import gzip as _gzip
+import subprocess as _sub
+
+import json
+
+__all__ = ["output_json", "run", "run_cmnd", "open_zipped", "get_length",
+           "chrom_sort_key"]
+
+
+def output_json(output):
+    """Return JSON formatted string."""
+    return json.dumps(output, sort_keys=True, indent=2)
+
+
+def run(command, raise_on_error=False):
+    """Run a command with subprocess the way it should be.
+
+    Parameters
+    ----------
+    command : str
+        A command to execute, piping is fine.
+    raise_on_error : bool
+        Raise a subprocess.CalledProcessError on exit_code != 0
+
+    Returns
+    -------
+    stdout : str
+    stderr : str
+    exit_code : int
+    """
+    pp = _sub.Popen(command, shell=True, universal_newlines=True,
+                    stdout=_sub.PIPE, stderr=_sub.PIPE)
+    out, err = pp.communicate()
+    code = pp.returncode
+    if raise_on_error and code != 0:
+        _sys.stderr.write(
+            '{}\nfailed with:\nCODE: {}\nSTDOUT:\n{}\nSTDERR:\n{}\n'
+            .format(command, code, out, err)
+        )
+        raise _sub.CalledProcessError(
+            returncode=code, cmd=command, output=out, stderr=err
+        )
+    return out, err, code
+
+
+def run_cmnd(cmnd):
+    """Run a command and return the output split into a list by newline."""
+    output, _, _ = run(cmnd, raise_on_error=True)
+    return output.strip().split('\n')
+
+
+def open_zipped(infile, mode='r'):
+    """Return file handle of file regardless of compressed or not.
+
+    Also returns already opened files unchanged, text mode automatic for
+    compatibility with python2.
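+
+    E.g. an already-open handle passes straight through (file name
+    hypothetical)::
+
+        fin = open_zipped('snps.txt.bz2')
+        assert open_zipped(fin) is fin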
+ """ + # Return already open files + if hasattr(infile, 'write'): + return infile + # Make text mode automatic + if len(mode) == 1: + mode = mode + 't' + if not isinstance(infile, str): + raise ValueError("I cannot open a filename that isn't a string.") + if infile.endswith('.gz'): + return _gzip.open(infile, mode) + if infile.endswith('.bz2'): + if hasattr(_bz2, 'open'): + return _bz2.open(infile, mode) + return _bz2.BZ2File(infile, mode) + return open(infile, mode) + + +def get_length(length_string): + """Convert a string length like 50kb to an integer length.""" + # Calculate length_string + if isinstance(length_string, str): + if length_string.isdigit(): + return int(length_string) + dist, mod = _re.match(r'(\d+)(\D+)', length_string).groups() + dist = int(dist) + mod = mod.lower() + if mod: + if not mod in ['kb', 'mb']: + raise ValueError('Cannot parse {}, must be in kb or mb only' + .format(length_string)) + if mod == 'kb': + dist = dist * 1000 + elif mod == 'mb': + dist = dist * 1000000 + elif isinstance(length_string, int): + return length_string + else: + raise ValueError('length_string must be either a string or an integer, is ' + '{}'.format(type(length_string))) + return dist + + +def chrom_sort_key(x): + """Return an integer for sorting from a chromosome.""" + if x.startswith('chr'): + x = x[3:] + if x.upper() == 'X': + return 100 + elif x.upper() == 'Y': + return 101 + elif x.upper().startswith('M'): + return 150 + elif x.isdigit(): + return int(x) + return x diff --git a/LD_Direction/compare_snp_lists.py b/LD_Direction/compare_snp_lists.py index e18e786..803b170 100644 --- a/LD_Direction/compare_snp_lists.py +++ b/LD_Direction/compare_snp_lists.py @@ -49,17 +49,12 @@ TODO """ import os as _os -import re as _re import sys as _sys import sqlite3 as _sqlite3 import pickle as _pickle -import subprocess as _sub import argparse as _argparse import multiprocessing as _mp -import bz2 as _bz2 -import gzip as _gzip from time import sleep as _sleep -from tempfile import mkstemp as _temp try: import pandas as _pd @@ -73,7 +68,11 @@ import dbSNP as _dbSNP +from . import plink_1000genomes as _plink +from . import _run from .snp_link import SNP_Pair +from .plink_1000genomes import PLINK +from .distance_filtering import distance_filter try: from tqdm import tqdm, tqdm_notebook @@ -87,489 +86,27 @@ except ImportError: pb = None -from .distance_filtering import distance_filter - -MIN_BUNDLE = 150 # Minimum number of jobs per node, 150 takes less than a minute. - -DB_PATH = '/godot/dbsnp/db/hg19' -DB_VERS = 150 - -POPULATIONS = ["ALL", "AFR", "AMR", "EAS", "EUR", "SAS", "ACB", "ASW", "BEB", - "CDX", "CEU", "CHB", "CHS", "CLM", "ESN", "FIN", "GBR", "GIH", - "GWD", "IBS", "ITU", "JPT", "KHV", "LWK", "MSL", "MXL", "PEL", - "PJL", "PUR", "STU", "TSI", "YRI"] - -# Set data directories -DATA_DIR = "/godot/1000genomes/1000GP_Phase3" - __all__ = ['comp_snp_lists', 'list_to_rsid_and_locs', 'filter_by_distance', 'filter_by_ld', 'save_list'] - ############################################################################### -# Run plink jobs without repetative code # +# User Set Globals # ############################################################################### +# Minimum number of jobs per node in one-to-many mode, 150 takes less than a +# minute. +MIN_BUNDLE = 150 -class PLINK(object): - - """A reusable object to run plink jobs on 1000genomes files quickly.""" - - written_files = {} - bims = {} - - def __init__(self, data=None, pop_file=None, plink='plink'): - """Load population information. 
- - Parameters - ---------- - data : str, optional - Path the the 1000genomes data files - (e.g. ALL.chr16.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.{bed,bim}) - Default set in DATA_DIR hardcoded in this file. - pop_file : str, optional - Path to the 1000 genomes population file - plink : str, optional - Path to plink executable, otherwise searches PATH - """ - self.plink = plink - data = data if data else DATA_DIR - if not _os.path.isdir(data): - raise ValueError('{} must be a directory, but no directory found' - .format(data)) - self.path = _os.path.abspath(data) - if not pop_file: - pop_file = _os.path.join( - self.path, 'integrated_call_samples_v3.20130502.ALL.panel' - ) - individuals = {} - with open(pop_file) as fin: - assert fin.readline() == 'sample\tpop\tsuper_pop\tgender\t\t\n' - for line in fin: - ind, pop, _, _ = line.split('\t') - if pop not in individuals: - individuals[pop] = [] - individuals[pop].append(ind) - self.individuals = individuals - self.files = {} - for fl in _os.listdir(self.path): - if 'phase3_' not in fl: - continue - if not fl.endswith('bed'): - continue - if not fl.startswith('ALL'): - continue - chrom = fl.split('.')[1] - if not chrom.startswith('chr'): - continue - fl = _os.path.abspath(_os.path.join(self.path, fl)) - root = '.'.join(fl.split('.')[:-1]) - bed = _os.path.abspath('{}.bed'.format(root)) - bim = _os.path.abspath('{}.bim'.format(root)) - fam = _os.path.abspath('{}.fam'.format(root)) - assert _os.path.isfile(bed) - assert _os.path.isfile(bim) - assert _os.path.isfile(fam) - c1 = chrom if chrom.startswith('chr') else 'chr' + str(chrom) - c2 = c1[3:] - for c in [c1, c2]: - self.files[c] = { - 'root': root, - 'bed': bed, - 'bim': bim, - 'fam': fam, - } - - def pop_file(self, populations=None, outfile=None): - """Write temp file with a list of individuals in population.""" - populations = populations if populations else self.individuals.keys() - if isinstance(populations, str): - populations = [populations] - populations = list(populations) - if outfile and _os.path.isfile(outfile): - outfile = _os.path.abspath(outfile) - self.written_files[','.join(populations)] = outfile - return outfile - if ','.join(populations) in self.written_files: - fl = self.written_files[','.join(populations)] - if _os.path.isfile(fl): - return fl - pop_ids = [] - bad_pops = [] - for pop_i in populations: - if pop_i in self.individuals: - pop_ids += self.individuals[pop_i] - else: - bad_pops.append(pop_i) - if bad_pops: - err = ( - "{} are not ancestral populations. Choose one of the following " - "ancestral populations: {}" - ).format(bad_pops, POPULATIONS) - raise ValueError(err) - - pop_ids = sorted(set(pop_ids)) - - if not outfile: - _, outfile = _temp(prefix='-'.join(populations), dir='/tmp') - with open(outfile, 'w') as fout: - fout.write('\n'.join( - [' '.join(x) for x in zip(pop_ids, pop_ids)] - )) - - self.written_files[','.join(populations)] = outfile - return outfile - - def bim_file(self, chrom, snps, outfile=None): - """Filter a bim file to only include SNPs in snps. - - Parameters - ---------- - chrom : str - A chromosome name for the file to work on. - snps : list_of_tuple - A list of snps from list_to_rsid_and_locs(): - (rsid, chrom, loc) - outfile : str, optional - A file to write to. A tempfile will be created if not is provided. - - Returns - ------- - file_path : str - Absolute path to the newly created file. 
- """ - if not chrom in self.files: - raise ValueError( - '{} is not in our bim files:\n{}'.format( - chrom, - '\n'.join([i[' bim'] for i in self.files.values()]) - ) - ) - snps = [(str(name), int(loc)) for name, loc in snps] - if not outfile: - _, outfile = _temp( - prefix='filtered_bim.', suffix='.bim', dir='/tmp' - ) - outfile = _os.path.abspath(outfile) - with _open_zipped(outfile, 'w') as fout: - for c, name, cm, pos, a1, a2 in read_bim(self.files[chrom]['bim']): - if (name, pos) in snps: - fout.write('\t'.join([c, name, cm, str(pos), a1, a2]) + '\n') - return outfile - - def bim_snps(self, chrom, bim_file=None): - """Return and cache all SNPs in a bim file. - - Note: will only cache rsids in bim if no bim_file is given. - - Parameters - ---------- - chrom : str - bim_file : str, optional - Bim file to work on, otherwise the chromosome is used to pick the - complete file. - - Returns - ------- - rsids : frozenset - """ - if not bim_file: - if chrom in self.bims: - with open(self.bims[chrom], 'rb') as fin: - return _pickle.load(fin) - try: - bim_file = self.files[chrom]['bim'] - except KeyError: - raise KeyError('{} is not in the bim file list'.format(chrom)) - _, pfl = _temp() - else: - pfl = None - rsids = [] - with open(bim_file) as fin: - for line in fin: - if not line.strip(): - continue - rsids.append(line.strip().split('\t')[1]) - rsids = frozenset(rsids) - if pfl: - with open(pfl, 'wb') as fout: - _pickle.dump(rsids, fout) - self.bims[chrom] = pfl - return rsids - - def many_to_many(self, snps, comp_list, chrom, r2=0.6, populations=None, - window_size=50000, fbim_file=None, outfile=None, - keep_int_files=False, raise_on_error=False, - logfile=_sys.stderr): - """Get many-to-many LD information using plink. - - Will do a single lookup for all SNPs in snps using plink on a filtered - bim file generated to only contain the SNPs in comp_list. - - Parameters - ---------- - snps : list_of_tuple - list of rsIDs to query in the format: - (rsid, loc) (from pairs_to_lists()) - comp_list : list_of_tuple - list of rsIDs to compare to in the format: - (rsid, loc) (from pairs_to_lists()) - chrom : str - which chromosome to search - r2 : float, optional - r-squared level to use for filtering - populations : list_of_str, optional - list of populations to include in the analysis - window_size : int, optional - Integer number for window size, default 50kb. - fbim_file : str, optional - File to write the filtered bims to, if not provided a temp file - will be used and deleted. - outfile : str, optional - A file root for plink to write to, output will have '.ld' appended - to it. If not provided, temp file used and deleted after use. - keep_int_files : bool, optional - Do not delete intermediate SNP files - raise_on_error : bool, optional - if False, will return None if primary SNP missing from bim file - logfile : filehandle, optional - A file like object to write to - - Returns - ------- - matching_snps : dict - For every matching SNP that beats the r-squared: { - snp: {r2: r-squared, dprime: d-prime, phased: phased-alleles} - } - If plink job fails, returns an empty dictionary. 
- """ - window_size = get_length(window_size) - if not snps or not comp_list: - raise ValueError('snps and comp_list cannot be null') - snps = set(snps) - comp_list = set(comp_list) - bsnps = self.bim_snps(chrom) - all_snps = [i for i, _ in snps | comp_list if i in bsnps] - good = [] - bad = [] - rsids = {} - for rsid, loc in snps: - if rsid in bsnps: - good.append(rsid) - rsids[rsid] = int(loc) - else: - bad.append(rsid) - if bad: - _sys.stderr.write( - 'The following SNPs are not in the bim file and cannot be ' + - 'queried:\n{}\n'.format(bad) - ) - del(bsnps) - # Initialize necessary files - bfile = self.files[chrom]['root'] - pop_file = self.pop_file(populations) - if outfile: - del_file = False - else: - _, outfile = _temp(prefix='plink', dir='/tmp') - _os.remove(outfile) - del_file = True - _, snp_file = _temp(prefix='plinksnps', dir='/tmp') - with open(snp_file, 'w') as fout: - fout.write('\n'.join(sorted([snp for snp, _ in snps])) + '\n') - _, comp_file = _temp(prefix='plinkcomp', dir='/tmp') - with open(comp_file, 'w') as fout: - fout.write('\n'.join(sorted(all_snps)) + '\n') - - # Build the command - plink_cmnd = ( - '{plink} --bfile {bfile} --r2 in-phase dprime ' - '--ld-snp-list {snp_file} --extract {comp_file} ' - '--ld-window {window} --keep {ind_file} --out {out}' - ).format( - plink=self.plink, bfile=bfile, window=window_size, - snp_file=snp_file, comp_file=comp_file, ind_file=pop_file, - out=outfile - ) - - # Run it - stdout, stderr, code = run(plink_cmnd, raise_on_error) - # Parse the output file - if code != 0: - logfile.write( - '{}: plink command failed'.format(chrom) + - 'Command: {}\nExit Code: {}\nSTDOUT:\n{}\bSTDERR:\n{}\n' - .format(plink_cmnd, code, stdout, stderr) - ) - return {} - - results = {} - with open(outfile + '.ld') as fin: - # Check header - line = fin.readline().strip() - assert _re.split(r' +', line) == [ - 'CHR_A', 'BP_A', 'SNP_A', 'CHR_B', 'BP_B', - 'SNP_B', 'PHASE', 'R2', 'DP' - ] - for line in fin: - f = _re.split(r' +', line.strip()) - snp1, snp2, phased = f[2], f[5], f[6] - loc1, loc2 = int(f[1]), int(f[4]) - rsquared, dprime = float(f[7]), float(f[8]) - if snp1 not in good: - continue - if snp1 == snp2: - continue - if loc1 != rsids[snp1]: - continue - if rsquared < r2: - continue - if snp1 not in results: - results[snp1] = { - 'chrom': chrom, 'loc': loc1, 'matches': {} - } - try: - p1, p2 = phased.split('/') - s1a, s2a = p1 - s1b, s2b = p2 - lookup = {snp1: {s1a: s2a, s1b: s2b}, - snp2: {s2a: s1a, s2b: s1b}} - except ValueError: - lookup = {} - results[snp1]['matches'][snp2] = { - 'r2': rsquared, 'dprime': dprime, 'loc': loc2, - 'phased': phased, 'lookup': lookup - } - - if del_file: - pass - for fl in [_os.path.join('/tmp', f) for f in _os.listdir('/tmp')]: - if fl.startswith(outfile): - _os.remove(fl) - if not keep_int_files: - _os.remove(snp_file) - _os.remove(comp_file) - - return results - - - def one_to_many(self, snp, comp_list, chrom, r2=0.6, populations=None, - raise_on_error=False, logfile=_sys.stderr): - """Get one-to-many LD information using plink. 
- - Parameters - ---------- - snp : str - rsID of a SNP to query - comp_list : list_of_str - list of rsIDs to compare to - chrom : str - which chromosome to search - r2 : float, optional - r-squared level to use for filtering - populations : list_of_str, optional - list of populations to include in the analysis - raise_on_error : bool, optional - if False, will return None if primary SNP missing from bim file - logfile : filehandle, optional - A file like object to write to - - Returns - ------- - matching_snps : dict - For every matching SNP that beats the r-squared: { - snp: {r2: r-squared, dprime: d-prime, phased: phased-alleles} - } - If plink job fails, returns an empty dictionary. - """ - _, temp_file = _temp(prefix='plink', dir='/tmp') - pop_file = self.pop_file(populations) - bfile = _os.path.join( - DATA_DIR, - 'ALL.{}.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes' - .format(chrom) - ) - bim = bfile + '.bim' - # We need to include the query SNP in the lookup list - comp_list = list(comp_list) - comp_list.append(snp) - comp_list = sorted(set(comp_list)) - # Filter SNPs not in the bim - bim_snps = self.bim_snps(chrom) - bad = [] - if snp not in bim_snps: - err = ('Primary SNP {} is not in BIM {}, cannot continue.' - .format(snp, bim)) - if raise_on_error: - raise BadSNPError(err) - else: - logfile.write(err + '\n') - return None - bad = [] - for s in comp_list: - if s not in bim_snps: - bad.append(s) - comp_list.remove(s) - if bad: - _sys.stderr.write(('{} removed from comparison list as not in ' + - 'bim file\n').format(bad)) - del(bim_snps) - # Build the command - plink_cmnd = ( - '{plink} --bfile {bfile} --r2 in-phase dprime --ld-snp {snp} ' - '--snps {comp_list} --keep {ind_file} --out {tmp}' - ).format( - plink=self.plink, - bfile=bfile, - snp=snp, comp_list=' '.join(comp_list), - ind_file=pop_file, tmp=temp_file - ) - # Run it - stdout, stderr, code = run(plink_cmnd, raise_on_error) - # Parse the output file - if code != 0: - logfile.write( - '{}: plink command failed'.format(snp) + - 'Command: {}\nExit Code: {}\nSTDOUT:\n{}\bSTDERR:\n{}\n' - .format(plink_cmnd, code, stdout, stderr) - ) - return {} - results = {} - with open(temp_file + '.ld') as fin: - # Check header - line = fin.readline().strip() - assert _re.split(r' +', line) == [ - 'CHR_A', 'BP_A', 'SNP_A', 'CHR_B', 'BP_B', - 'SNP_B', 'PHASE', 'R2', 'DP' - ] - for line in fin: - f = _re.split(r' +', line.strip()) - snp2, phased = f[5], f[6] - rsquared, dprime = float(f[7]), float(f[8]) - if snp2 == snp: - continue - if rsquared < r2: - continue - try: - p1, p2 = phased.split('/') - s1a, s2a = p1 - s1b, s2b = p2 - lookup = {snp: {s1a: s2a, s1b: s2b}, - snp2: {s2a: s1a, s2b: s1b}} - except ValueError: - lookup = {} - results[snp2] = {'r2': rsquared, 'dprime': dprime, - 'phased': phased, 'lookup': lookup} - _os.remove(temp_file + '.ld') - return results - +DB_PATH = '/godot/dbsnp/db/hg19' +DB_VERS = 150 ############################################################################### # dbSNP Connection # ############################################################################### -def list_to_rsid_and_locs(list1, raise_on_missing=False): +def list_to_rsid_and_locs(list1, dbsnp_loc=None, dbsnp_ver=None, + raise_on_missing=False): """Convert every entry in each list to an rsid and chr:loc position. 
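+
+    A shape sketch (rsIDs and positions hypothetical; output is base-1)::
+
+        list_to_rsid_and_locs(['rs1', '1:10584', ('rs2', '2', 500)])
+        # -> [('rs1', '4', 100235), ('rs9', '1', 10584), ('rs2', '2', 500)]
+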
dbSNP is base-0 but the chr:loc positions are all treated as base-1, so any @@ -589,13 +126,17 @@ def list_to_rsid_and_locs(list1, raise_on_missing=False): Note, itmes can also be a three-tuple: (rsid, chrom, loc) In this format, no lookup is performed, so function runs very fast. + dbsnp_loc : str, optional + Path to dbSNP sqlite database files + dbsnp_ver : int : optional + Version of dbSNP to use raise_on_missing : str Raise an Exception if there is a problem with any of the SNPs (i.e. they aren't in dbSNP or are indels Raises ------ - MissingSNPError + _plink.MissingSNPError If a SNP is missing from dbSNP BadSNPError If a SNP is an indel @@ -605,6 +146,8 @@ def list_to_rsid_and_locs(list1, raise_on_missing=False): dict_of_lists {'chrom': [(rsid, int(position))]} # position is base-1 """ + dbsnp_loc = dbsnp_loc if dbsnp_loc else DB_PATH + dbsnp_ver = dbsnp_ver if dbsnp_ver else DB_VERS list1_rs = [] list1_locs = [] list1_done = {} @@ -632,16 +175,21 @@ def list_to_rsid_and_locs(list1, raise_on_missing=False): .format(bad) ) if raise_on_missing: - raise MissingSNPError(err) + raise _plink.MissingSNPError(err) _sys.stderr.write(err + '\n') if list1_rs: _sys.stderr.write('Doing rsID DB lookup.\n') if len(list1_rs) > 5000: - conn = _sqlite3.connect(DB_PATH + '/dbsnp{}.db'.format(DB_VERS)) + conn = _sqlite3.connect( + _os.path.join(dbsnp_loc, 'dbsnp{}.db'.format(dbsnp_ver)) + ) conn.execute("CREATE TEMP TABLE 'temp' (name)") conn.executemany("INSERT INTO temp (name) VALUES (?)", [(i,) for i in list1_rs]) - q = conn.execute("SELECT dbSNP.name, dbSNP.chrom, dbSNP.start, dbSNP.end FROM dbSNP INNER JOIN temp ON dbSNP.name=temp.name").fetchall() + q = conn.execute( + "SELECT dbSNP.name, dbSNP.chrom, dbSNP.start, dbSNP.end " + + "FROM dbSNP INNER JOIN temp ON dbSNP.name=temp.name" + ).fetchall() else: db = _dbSNP.DB(DB_PATH, DB_VERS) q = [] @@ -698,22 +246,12 @@ def list_to_rsid_and_locs(list1, raise_on_missing=False): if failed: err = '{} not in dbSNP'.format(snp) if raise_on_missing: - raise MissingSNPError(err) + raise _plink.MissingSNPError(err) else: _sys.stderr.write(err + '\n') return results -def snp_list_to_dict(list1): - """Convert a list from list_to_rsid_and_locs() to a chrom dict.""" - results = {} - for s_rs, s_chrom, s_loc in list1: - if s_chrom not in results: - results[s_chrom] = [] - results[s_chrom].append((s_rs, int(s_loc))) - return results - - def save_list(snp_list, outfile=None, pickle=False): """Convert a list of rsIDs or positions to a pre-processed format. @@ -746,10 +284,10 @@ def save_list(snp_list, outfile=None, pickle=False): if outfile: _sys.stderr.write('Writing output.\n') if pickle: - with _open_zipped(outfile, 'wb') as fout: + with _run.open_zipped(outfile, 'wb') as fout: _pickle.dump(parsed_list, fout) else: - with _open_zipped(outfile, 'w') as fout: + with _run.open_zipped(outfile, 'w') as fout: for row in parsed_list: rsid, chrom, loc = row fout.write('{}\t{}\t{}\n'.format(rsid, chrom, loc)) @@ -788,7 +326,7 @@ def filter_by_distance(ref_snps, comp_snps, distance='50kb'): } } """ - distance = get_length(distance) + distance = _run.get_length(distance) results = {} for chrom, snps in ref_snps.items(): if chrom not in comp_snps: @@ -816,7 +354,7 @@ def filter_by_ld(pairs, r2=0.6, window_size=50000, populations=None, Path to plink executable, otherwise searches PATH data_dir : str, optional Path to directory containing 1000genomes data, default set in DATA_DIR - hardcoded into this file. + hardcoded into this module. 
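+        A pipeline sketch (paths and inputs hypothetical)::
+
+            pairs = filter_by_distance(snps1, snps2, distance='50kb')
+            ld = filter_by_ld(pairs, r2=0.8, populations=['EUR'],
+                              data_dir='/path/to/1000GP_Phase3')
+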
Returns ------- @@ -824,7 +362,7 @@ def filter_by_ld(pairs, r2=0.6, window_size=50000, populations=None, snp: {chrom: chrom, loc: loc, matches: { snp2: {rsquared: r2, dprime: d', loc: loc2} """ - window_size = get_length(window_size) + window_size = _run.get_length(window_size) plink = PLINK(data=data_dir, plink=plink) # Initialize the population file plink.pop_file(populations, outfile='.'.join(populations) + '.pops.txt') @@ -832,11 +370,11 @@ def filter_by_ld(pairs, r2=0.6, window_size=50000, populations=None, for v in pairs.values(): for i in [i[2] for i in v if i[2]]: l += len(i) - multi = True if l > 200 else False + # multi = True if l > 2000 else False # Run plink jobs results = {} - for chrom in sorted(pairs, key=_chrom_sort): + for chrom in sorted(pairs, key=_run.chrom_sort_key): data = pairs[chrom] snps, comp_list = pairs_to_lists(data) results.update( @@ -906,7 +444,7 @@ def filter_by_ld_one_by_one(pairs, r2=0.6, populations=None, plink='plink', jobs = [[]] n = 0 skipped = 0 - for chrom in sorted(pairs, key=_chrom_sort): + for chrom in sorted(pairs, key=_run.chrom_sort_key): snps = pairs[chrom] args = (chrom, r2, populations, False) # Bundle jobs @@ -1134,12 +672,14 @@ def comp_snp_lists(list1, list2, populations=None, r2=0.6, distance='50kb', If return_snplink is True, a dictionary of SNP_Link objects is returned {snp1: [SNP_Pair, SNP_Pair, ...]} """ - distance = get_length(distance) + distance = _run.get_length(distance) if not lists_preparsed: print('Getting SNP info for list1') - list1 = list_to_rsid_and_locs(list1, raise_on_error) + list1 = list_to_rsid_and_locs(list1, dbsnp_loc, dbsnp_ver, + raise_on_error) print('Getting SNP info for list2') - list2 = list_to_rsid_and_locs(list2, raise_on_error) + list2 = list_to_rsid_and_locs(list2, dbsnp_loc, dbsnp_ver, + raise_on_error) snps1 = snp_list_to_dict(list1) snps2 = snp_list_to_dict(list2) @@ -1173,18 +713,14 @@ def comp_snp_lists(list1, list2, populations=None, r2=0.6, distance='50kb', ############################################################################### -class MissingSNPError(Exception): - - """Exceception to catch missing SNPs.""" - - pass - - -class BadSNPError(Exception): - - """Exceception to catch missing SNPs.""" - - pass +def snp_list_to_dict(list1): + """Convert a list from list_to_rsid_and_locs() to a chrom dict.""" + results = {} + for s_rs, s_chrom, s_loc in list1: + if s_chrom not in results: + results[s_chrom] = [] + results[s_chrom].append((s_rs, int(s_loc))) + return results def pairs_to_lists(pairs): @@ -1215,126 +751,6 @@ def pairs_to_lists(pairs): return set(query), set(comp) -def get_length(length_string): - """Convert a string length like 50kb to an integer length.""" - # Calculate length_string - if isinstance(length_string, str): - if length_string.isdigit(): - return int(length_string) - dist, mod = _re.match(r'(\d+)(\D+)', length_string).groups() - dist = int(dist) - mod = mod.lower() - if mod: - if not mod in ['kb', 'mb']: - raise ValueError('Cannot parse {}, must be in kb or mb only' - .format(length_string)) - if mod == 'kb': - dist = dist * 1000 - elif mod == 'mb': - dist = dist * 1000000 - elif isinstance(length_string, int): - return length_string - else: - raise ValueError('length_string must be either a string or an integer, is ' - '{}'.format(type(length_string))) - return dist - - -def run(command, raise_on_error=False): - """Run a command with subprocess the way it should be. - - Parameters - ---------- - command : str - A command to execute, piping is fine. 
- raise_on_error : bool - Raise a subprocess.CalledProcessError on exit_code != 0 - - Returns - ------- - stdout : str - stderr : str - exit_code : int - """ - pp = _sub.Popen(command, shell=True, universal_newlines=True, - stdout=_sub.PIPE, stderr=_sub.PIPE) - out, err = pp.communicate() - code = pp.returncode - if raise_on_error and code != 0: - _sys.stderr.write( - '{}\nfailed with:\nCODE: {}\nSTDOUT:\n{}\nSTDERR:\n{}\n' - .format(command, code, out, err) - ) - raise _sub.CalledProcessError( - returncode=code, cmd=command, output=out, stderr=err - ) - return out, err, code - - -def read_bim(bim_file): - """Yields a tuple for each line in bim_file. - - Parameters - ---------- - bim_file : str - Path to a bim file, can be zipped or an open file handle. - - Yields - ------ - chromsome : str - name : str - cm : str - Position in centimorgans, usually 0 - position : int - allele_1 : str - allele_2 : str - """ - with _open_zipped(bim_file) as fin: - for line in fin: - chrom, name, cm, loc, a1, a2 = line.split('\t') - yield chrom, name, cm, int(loc), a1, a2 - - -def _open_zipped(infile, mode='r'): - """Return file handle of file regardless of compressed or not. - - Also returns already opened files unchanged, text mode automatic for - compatibility with python2. - """ - # Return already open files - if hasattr(infile, 'write'): - return infile - # Make text mode automatic - if len(mode) == 1: - mode = mode + 't' - if not isinstance(infile, str): - raise ValueError("I cannot open a filename that isn't a string.") - if infile.endswith('.gz'): - return _gzip.open(infile, mode) - if infile.endswith('.bz2'): - if hasattr(_bz2, 'open'): - return _bz2.open(infile, mode) - else: - return _bz2.BZ2File(infile, mode) - return open(infile, mode) - - -def _chrom_sort(x): - """Return an integer for sorting from a chromosome.""" - if x.startswith('chr'): - x = x[3:] - if x.upper() == 'X': - return 100 - elif x.upper() == 'Y': - return 101 - elif x.upper().startswith('M'): - return 150 - elif x.isdigit(): - return int(x) - else: - return x - - ############################################################################### # Run as a Script # ############################################################################### @@ -1439,7 +855,7 @@ def argument_parser(): help='Path to dbSNP directory') paths.add_argument('--dbsnp-ver', default=DB_VERS, help='dbSNP version to use') - paths.add_argument('--1000genomes', default=DATA_DIR, + paths.add_argument('--1000genomes', default=_plink.DATA_DIR, help='1000genomes data files') # Optional flags @@ -1481,7 +897,7 @@ def main(argv=None): _sys.stderr.write('Mode required.\n') return 2 - with _open_zipped(args.snp_list) as fin: + with _run.open_zipped(args.snp_list) as fin: snp_list = fin.read().strip().split('\n') if args.mode == 'preprocess': @@ -1493,7 +909,7 @@ def main(argv=None): _sys.stderr.write('Cannot write pandas DataFrame to STDOUT\n') return 1 - with _open_zipped(args.comp_list) as fin: + with _run.open_zipped(args.comp_list) as fin: comp_list = fin.read().strip().split('\n') if args.populations: diff --git a/LD_Direction/plink_1000genomes.py b/LD_Direction/plink_1000genomes.py new file mode 100644 index 0000000..dfa5fdc --- /dev/null +++ b/LD_Direction/plink_1000genomes.py @@ -0,0 +1,518 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Methods to run plink commands on the 1000genomes phase 3 dataset. +""" +import os as _os +import re as _re +import sys as _sys +import pickle as _pickle +from tempfile import mkstemp as _temp + +from . 
import _run + +# Set data directories +DATA_DIR = "/godot/1000genomes/1000GP_Phase3" + +POPULATIONS = ["ALL", "AFR", "AMR", "EAS", "EUR", "SAS", "ACB", "ASW", "BEB", + "CDX", "CEU", "CHB", "CHS", "CLM", "ESN", "FIN", "GBR", "GIH", + "GWD", "IBS", "ITU", "JPT", "KHV", "LWK", "MSL", "MXL", "PEL", + "PJL", "PUR", "STU", "TSI", "YRI"] + +__all__ = ["PLINK", "read_bim"] + +############################################################################### +# Run plink jobs without repetative code # +############################################################################### + + +class PLINK(object): + + """A reusable object to run plink jobs on 1000genomes files quickly.""" + + written_files = {} + bims = {} + + def __init__(self, data=None, pop_file=None, plink='plink'): + """Load population information. + + Parameters + ---------- + data : str, optional + Path the the 1000genomes data files + (e.g. ALL.chr16.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.{bed,bim}) + Default set in DATA_DIR hardcoded in this file. + pop_file : str, optional + Path to the 1000 genomes population file + plink : str, optional + Path to plink executable, otherwise searches PATH + """ + self.plink = plink + data = data if data else DATA_DIR + if not _os.path.isdir(data): + raise ValueError('{} must be a directory, but no directory found' + .format(data)) + self.path = _os.path.abspath(data) + if not pop_file: + pop_file = _os.path.join( + self.path, 'integrated_call_samples_v3.20130502.ALL.panel' + ) + individuals = {} + with open(pop_file) as fin: + assert fin.readline() == 'sample\tpop\tsuper_pop\tgender\t\t\n' + for line in fin: + ind, pop, _, _ = line.split('\t') + if pop not in individuals: + individuals[pop] = [] + individuals[pop].append(ind) + self.individuals = individuals + self.files = {} + for fl in _os.listdir(self.path): + if 'phase3_' not in fl: + continue + if not fl.endswith('bed'): + continue + if not fl.startswith('ALL'): + continue + chrom = fl.split('.')[1] + if not chrom.startswith('chr'): + continue + fl = _os.path.abspath(_os.path.join(self.path, fl)) + root = '.'.join(fl.split('.')[:-1]) + bed = _os.path.abspath('{}.bed'.format(root)) + bim = _os.path.abspath('{}.bim'.format(root)) + fam = _os.path.abspath('{}.fam'.format(root)) + assert _os.path.isfile(bed) + assert _os.path.isfile(bim) + assert _os.path.isfile(fam) + c1 = chrom if chrom.startswith('chr') else 'chr' + str(chrom) + c2 = c1[3:] + for c in [c1, c2]: + self.files[c] = { + 'root': root, + 'bed': bed, + 'bim': bim, + 'fam': fam, + } + + def pop_file(self, populations=None, outfile=None): + """Write temp file with a list of individuals in population.""" + populations = populations if populations else self.individuals.keys() + if isinstance(populations, str): + populations = [populations] + populations = list(populations) + if outfile and _os.path.isfile(outfile): + outfile = _os.path.abspath(outfile) + self.written_files[','.join(populations)] = outfile + return outfile + if ','.join(populations) in self.written_files: + fl = self.written_files[','.join(populations)] + if _os.path.isfile(fl): + return fl + pop_ids = [] + bad_pops = [] + for pop_i in populations: + if pop_i in self.individuals: + pop_ids += self.individuals[pop_i] + else: + bad_pops.append(pop_i) + if bad_pops: + err = ( + "{} are not ancestral populations. 
+
+    def bim_file(self, chrom, snps, outfile=None):
+        """Filter a bim file to only include SNPs in snps.
+
+        Parameters
+        ----------
+        chrom : str
+            A chromosome name for the file to work on.
+        snps : list_of_tuple
+            A list of SNPs as (rsid, loc) tuples
+            (e.g. from pairs_to_lists())
+        outfile : str, optional
+            A file to write to. A tempfile will be created if none is provided.
+
+        Returns
+        -------
+        file_path : str
+            Absolute path to the newly created file.
+        """
+        if chrom not in self.files:
+            raise ValueError(
+                '{} is not in our bim files:\n{}'.format(
+                    chrom,
+                    '\n'.join([i['bim'] for i in self.files.values()])
+                )
+            )
+        snps = [(str(name), int(loc)) for name, loc in snps]
+        if not outfile:
+            _, outfile = _temp(
+                prefix='filtered_bim.', suffix='.bim', dir='/tmp'
+            )
+        outfile = _os.path.abspath(outfile)
+        with _run.open_zipped(outfile, 'w') as fout:
+            for c, name, cm, pos, a1, a2 in read_bim(self.files[chrom]['bim']):
+                if (name, pos) in snps:
+                    fout.write('\t'.join([c, name, cm, str(pos), a1, a2]) + '\n')
+        return outfile
+
+    def bim_snps(self, chrom, bim_file=None):
+        """Return and cache all SNPs in a bim file.
+
+        Note: will only cache rsids in bim if no bim_file is given.
+
+        Parameters
+        ----------
+        chrom : str
+        bim_file : str, optional
+            Bim file to work on, otherwise the chromosome is used to pick the
+            complete file.
+
+        Returns
+        -------
+        rsids : frozenset
+        """
+        if not bim_file:
+            if chrom in self.bims:
+                with open(self.bims[chrom], 'rb') as fin:
+                    return _pickle.load(fin)
+            try:
+                bim_file = self.files[chrom]['bim']
+            except KeyError:
+                raise KeyError('{} is not in the bim file list'.format(chrom))
+            _, pfl = _temp()
+        else:
+            pfl = None
+        rsids = []
+        with open(bim_file) as fin:
+            for line in fin:
+                if not line.strip():
+                    continue
+                rsids.append(line.strip().split('\t')[1])
+        rsids = frozenset(rsids)
+        if pfl:
+            with open(pfl, 'wb') as fout:
+                _pickle.dump(rsids, fout)
+            self.bims[chrom] = pfl
+        return rsids
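+
+    # bim_snps() trades one full bim scan per chromosome for O(1) lookups
+    # afterwards, e.g. (names are illustrative):
+    #
+    #     ld = PLINK()
+    #     rsids = ld.bim_snps('16')     # first call: parses the whole bim
+    #     rsids = ld.bim_snps('16')     # later calls: unpickled from /tmp
+    #     'rs0000001' in rsids          # frozenset membership test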
+
+    def many_to_many(self, snps, comp_list, chrom, r2=0.6, populations=None,
+                     window_size=50000, outfile=None,
+                     keep_int_files=False, raise_on_error=False,
+                     logfile=_sys.stderr):
+        """Get many-to-many LD information using plink.
+
+        Will do a single lookup for all SNPs in snps using plink on a filtered
+        bim file generated to only contain the SNPs in comp_list.
+
+        Parameters
+        ----------
+        snps : list_of_tuple
+            list of rsIDs to query in the format:
+                (rsid, loc) (from pairs_to_lists())
+        comp_list : list_of_tuple
+            list of rsIDs to compare to in the format:
+                (rsid, loc) (from pairs_to_lists())
+        chrom : str
+            which chromosome to search
+        r2 : float, optional
+            r-squared level to use for filtering
+        populations : list_of_str, optional
+            list of populations to include in the analysis
+        window_size : int, optional
+            Integer number for window size, default 50kb.
+        outfile : str, optional
+            A file root for plink to write to, output will have '.ld' appended
+            to it. If not provided, temp file used and deleted after use.
+        keep_int_files : bool, optional
+            Do not delete intermediate SNP files
+        raise_on_error : bool, optional
+            if False, will return None if primary SNP missing from bim file
+        logfile : filehandle, optional
+            A file like object to write to
+
+        Returns
+        -------
+        matching_snps : dict
+            For every matching SNP that beats the r-squared: {
+                snp: {r2: r-squared, dprime: d-prime, phased: phased-alleles}
+            }
+            If plink job fails, returns an empty dictionary.
+        """
+        window_size = _run.get_length(window_size)
+        if not snps or not comp_list:
+            raise ValueError('snps and comp_list cannot be null')
+        snps = set(snps)
+        comp_list = set(comp_list)
+        bsnps = self.bim_snps(chrom)
+        all_snps = [i for i, _ in snps | comp_list if i in bsnps]
+        good = []
+        bad = []
+        rsids = {}
+        for rsid, loc in snps:
+            if rsid in bsnps:
+                good.append(rsid)
+                rsids[rsid] = int(loc)
+            else:
+                bad.append(rsid)
+        if bad:
+            _sys.stderr.write(
+                'The following SNPs are not in the bim file and cannot be ' +
+                'queried:\n{}\n'.format(bad)
+            )
+        del bsnps
+        # Initialize necessary files
+        bfile = self.files[chrom]['root']
+        pop_file = self.pop_file(populations)
+        if outfile:
+            del_file = False
+        else:
+            _, outfile = _temp(prefix='plink', dir='/tmp')
+            _os.remove(outfile)
+            del_file = True
+        _, snp_file = _temp(prefix='plinksnps', dir='/tmp')
+        with open(snp_file, 'w') as fout:
+            fout.write('\n'.join(sorted([snp for snp, _ in snps])) + '\n')
+        _, comp_file = _temp(prefix='plinkcomp', dir='/tmp')
+        with open(comp_file, 'w') as fout:
+            fout.write('\n'.join(sorted(all_snps)) + '\n')
+
+        # Build the command
+        plink_cmnd = (
+            '{plink} --bfile {bfile} --r2 in-phase dprime '
+            '--ld-snp-list {snp_file} --extract {comp_file} '
+            '--ld-window {window} --keep {ind_file} --out {out}'
+        ).format(
+            plink=self.plink, bfile=bfile, window=window_size,
+            snp_file=snp_file, comp_file=comp_file, ind_file=pop_file,
+            out=outfile
+        )
+
+        # Run it
+        stdout, stderr, code = _run.run(plink_cmnd, raise_on_error)
+        # Parse the output file
+        if code != 0:
+            logfile.write(
+                '{}: plink command failed\n'.format(chrom) +
+                'Command: {}\nExit Code: {}\nSTDOUT:\n{}\nSTDERR:\n{}\n'
+                .format(plink_cmnd, code, stdout, stderr)
+            )
+            return {}
+
+        results = {}
+        with open(outfile + '.ld') as fin:
+            # Check header
+            line = fin.readline().strip()
+            assert _re.split(r' +', line) == [
+                'CHR_A', 'BP_A', 'SNP_A', 'CHR_B', 'BP_B',
+                'SNP_B', 'PHASE', 'R2', 'DP'
+            ]
+            for line in fin:
+                f = _re.split(r' +', line.strip())
+                snp1, snp2, phased = f[2], f[5], f[6]
+                loc1, loc2 = int(f[1]), int(f[4])
+                rsquared, dprime = float(f[7]), float(f[8])
+                if snp1 not in good:
+                    continue
+                if snp1 == snp2:
+                    continue
+                if loc1 != rsids[snp1]:
+                    continue
+                if rsquared < r2:
+                    continue
+                if snp1 not in results:
+                    results[snp1] = {
+                        'chrom': chrom, 'loc': loc1, 'matches': {}
+                    }
+                try:
+                    p1, p2 = phased.split('/')
+                    s1a, s2a = p1
+                    s1b, s2b = p2
+                    lookup = {snp1: {s1a: s2a, s1b: s2b},
+                              snp2: {s2a: s1a, s2b: s1b}}
+                except ValueError:
+                    lookup = {}
+                results[snp1]['matches'][snp2] = {
+                    'r2': rsquared, 'dprime': dprime, 'loc': loc2,
+                    'phased': phased, 'lookup': lookup
+                }
+
+        if del_file:
+            for fl in [_os.path.join('/tmp', f) for f in _os.listdir('/tmp')]:
+                if fl.startswith(outfile):
+                    _os.remove(fl)
+        if not keep_int_files:
+            _os.remove(snp_file)
+            _os.remove(comp_file)
+
+        return results
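+
+    # A sketch of the many_to_many() return value (all numbers and alleles
+    # are illustrative, not real plink output):
+    #
+    #     {'rs0000001': {'chrom': 'chr16', 'loc': 1000000, 'matches': {
+    #         'rs0000002': {'r2': 0.85, 'dprime': 0.97, 'loc': 1005000,
+    #                       'phased': 'AT/GC',
+    #                       'lookup': {'rs0000001': {'A': 'T', 'G': 'C'},
+    #                                  'rs0000002': {'T': 'A', 'C': 'G'}}}}}}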
+
+
+    def one_to_many(self, snp, comp_list, chrom, r2=0.6, populations=None,
+                    raise_on_error=False, logfile=_sys.stderr):
+        """Get one-to-many LD information using plink.
+
+        Parameters
+        ----------
+        snp : str
+            rsID of a SNP to query
+        comp_list : list_of_str
+            list of rsIDs to compare to
+        chrom : str
+            which chromosome to search
+        r2 : float, optional
+            r-squared level to use for filtering
+        populations : list_of_str, optional
+            list of populations to include in the analysis
+        raise_on_error : bool, optional
+            if False, will return None if primary SNP missing from bim file
+        logfile : filehandle, optional
+            A file like object to write to
+
+        Returns
+        -------
+        matching_snps : dict
+            For every matching SNP that beats the r-squared: {
+                snp: {r2: r-squared, dprime: d-prime, phased: phased-alleles}
+            }
+            If plink job fails, returns an empty dictionary.
+        """
+        _, temp_file = _temp(prefix='plink', dir='/tmp')
+        pop_file = self.pop_file(populations)
+        bfile = _os.path.join(
+            DATA_DIR,
+            'ALL.{}.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes'
+            .format(chrom)
+        )
+        bim = bfile + '.bim'
+        # We need to include the query SNP in the lookup list
+        comp_list = list(comp_list)
+        comp_list.append(snp)
+        comp_list = sorted(set(comp_list))
+        # Filter SNPs not in the bim
+        bim_snps = self.bim_snps(chrom)
+        if snp not in bim_snps:
+            err = ('Primary SNP {} is not in BIM {}, cannot continue.'
+                   .format(snp, bim))
+            if raise_on_error:
+                raise BadSNPError(err)
+            else:
+                logfile.write(err + '\n')
+            return None
+        bad = []
+        # Iterate over a copy so removal does not skip items
+        for s in list(comp_list):
+            if s not in bim_snps:
+                bad.append(s)
+                comp_list.remove(s)
+        if bad:
+            _sys.stderr.write(('{} removed from comparison list as not in ' +
+                               'bim file\n').format(bad))
+        del bim_snps
+        # Build the command
+        plink_cmnd = (
+            '{plink} --bfile {bfile} --r2 in-phase dprime --ld-snp {snp} '
+            '--snps {comp_list} --keep {ind_file} --out {tmp}'
+        ).format(
+            plink=self.plink,
+            bfile=bfile,
+            snp=snp, comp_list=' '.join(comp_list),
+            ind_file=pop_file, tmp=temp_file
+        )
+        # Run it
+        stdout, stderr, code = _run.run(plink_cmnd, raise_on_error)
+        # Parse the output file
+        if code != 0:
+            logfile.write(
+                '{}: plink command failed\n'.format(snp) +
+                'Command: {}\nExit Code: {}\nSTDOUT:\n{}\nSTDERR:\n{}\n'
+                .format(plink_cmnd, code, stdout, stderr)
+            )
+            return {}
+        results = {}
+        with open(temp_file + '.ld') as fin:
+            # Check header
+            line = fin.readline().strip()
+            assert _re.split(r' +', line) == [
+                'CHR_A', 'BP_A', 'SNP_A', 'CHR_B', 'BP_B',
+                'SNP_B', 'PHASE', 'R2', 'DP'
+            ]
+            for line in fin:
+                f = _re.split(r' +', line.strip())
+                snp2, phased = f[5], f[6]
+                rsquared, dprime = float(f[7]), float(f[8])
+                if snp2 == snp:
+                    continue
+                if rsquared < r2:
+                    continue
+                try:
+                    p1, p2 = phased.split('/')
+                    s1a, s2a = p1
+                    s1b, s2b = p2
+                    lookup = {snp: {s1a: s2a, s1b: s2b},
+                              snp2: {s2a: s1a, s2b: s1b}}
+                except ValueError:
+                    lookup = {}
+                results[snp2] = {'r2': rsquared, 'dprime': dprime,
+                                 'phased': phased, 'lookup': lookup}
+        _os.remove(temp_file + '.ld')
+        return results
+
+
+###############################################################################
+#                              Helper Functions                               #
+###############################################################################
+
+
+class MissingSNPError(Exception):
+
+    """Exception to catch missing SNPs."""
+
+    pass
+
+
+class BadSNPError(Exception):
+
+    """Exception to catch bad SNPs."""
+
+    pass
+
+
+def read_bim(bim_file):
+    """Yields a tuple for each line in bim_file.
+
+    Parameters
+    ----------
+    bim_file : str
+        Path to a bim file, can be zipped or an open file handle.
+
+    Yields
+    ------
+    chromosome : str
+    name : str
+    cm : str
+        Position in centimorgans, usually 0
+    position : int
+    allele_1 : str
+    allele_2 : str
+    """
+    with _run.open_zipped(bim_file) as fin:
+        for line in fin:
+            # rstrip so the last allele does not keep a trailing newline
+            chrom, name, cm, loc, a1, a2 = line.rstrip('\n').split('\t')
+            yield chrom, name, cm, int(loc), a1, a2
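+
+# read_bim() streams tuples straight from a (possibly gzipped) bim file,
+# e.g. (path is illustrative):
+#
+#     for chrom, rsid, cm, pos, a1, a2 in read_bim('chr16.bim.gz'):
+#         print(rsid, pos)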
diff --git a/setup.py b/setup.py
index 17d139f..7ba429d 100755
--- a/setup.py
+++ b/setup.py
@@ -24,7 +24,7 @@
 
 setup(
     name='LD_Direction',
-    version='0.1.0a',
+    version='0.2.0b1',
     description=('Lookup LD between any two lists of SNPs'),
     long_description=long_description,
     url='https://github.com/MikeDacre/dbSNP',

From e033f073c047fc8a616a22386cb7e756e685407a Mon Sep 17 00:00:00 2001
From: Mike Dacre
Date: Fri, 12 May 2017 23:53:53 -0700
Subject: [PATCH 5/5] Little bug fixes

---
 LD_Direction/LDpair.py | 2 ++
 LD_Direction/_run.py   | 2 +-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/LD_Direction/LDpair.py b/LD_Direction/LDpair.py
index b575e68..62f105d 100755
--- a/LD_Direction/LDpair.py
+++ b/LD_Direction/LDpair.py
@@ -182,6 +182,8 @@ def get_snp_info(snp):
     if not vcf:
         raise SNP_Lookup_Failure(snp + " is not in 1000G reference panel.")
     elif len(vcf) == 1:
+        if not vcf[0].strip():
+            raise SNP_Lookup_Failure(snp + " is not in 1000G reference panel.")
         geno = vcf[0].strip().split()
     else:
         geno = []
diff --git a/LD_Direction/_run.py b/LD_Direction/_run.py
index 2734be0..e224c95 100644
--- a/LD_Direction/_run.py
+++ b/LD_Direction/_run.py
@@ -68,7 +68,7 @@ def run(command, raise_on_error=False):
 
 def run_cmnd(cmnd):
     """Run a command and return the output split into a list by newline."""
-    output, _, _ = _run.run(cmnd, raise_on_error=True)
+    output, _, _ = run(cmnd, raise_on_error=True)
     return output.strip().split('\n')