-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathfamily_fasta.py
156 lines (131 loc) · 5.96 KB
/
family_fasta.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import errno
import sqlite3
import os
import pandas as pd
import argparse
import util
"""
This script, `family_fasta.py`, is responsible for generating FASTA files for each family of a specified higher taxon
from a SQLite database.
The script performs the following steps:
1. Connects to the SQLite database.
2. Retrieves distinct families and BINs (Barcode Index Numbers) for the higher taxon defined in the query restrictions.
3. Iterates over each unique family and creates a directory for each family.
4. For each BIN in a family, it fetches the longest sequence and writes it to the corresponding family's FASTA file.
5. The sequence written to the FASTA file is stripped of non-ACGT characters as HMMER (used in later steps of the
workflow) chokes on them.
6. The script continues this process until it has iterated over all families and their respective BINs.
7. Closes the connection to the database.
The script uses command line arguments for the database file to query, directory to write FASTA files to, taxonomic
level to filter (e.g. order), taxon name to filter (e.g. Primates), number of chunks (families) to write to file,
marker code (e.g. COI-5P), and log level. This script is invoked by the Snakefile as a shell command with the required
arguments in the rule `family_fasta`.
"""
def get_family_bins(q, conn):
"""
Gets distinct families and bins for the higher taxon defined in the query restrictions
:param q: Dictionary with query restrictions
:param conn: Connection to SQLite database
:return Pandas data frame
"""
# Check if filter_level in config.yaml is usable
if q['level'].lower() in ['kingdom', 'phylum', 'class', 'order', 'family', 'subfamily', 'genus', 'all']:
# Select all distinct family names that match config.yaml filters
level = q['level']
name = q['name']
marker_code = q['marker_code']
sql = f'''
SELECT DISTINCT family, bin_uri
FROM barcode
WHERE marker_code = '{marker_code}'
AND "{level}" = '{name}'
AND family IS NOT NULL
AND family <> ''
ORDER BY family ASC, bin_uri ASC;
'''
fam = pd.read_sql_query(sql, conn)
# Check if this configuration contains any records at all
if len(fam) == 0:
raise Exception(f"No records found with {q}.")
else:
raise Exception(f"Filter level {q['level']} from config file does not exists as a column in the database")
return fam
def write_bin(q, conn, fh):
"""
Writes the longest sequence for a BIN to file
:param q: query object
:param conn: DB connection
:param fh: file handle
:return:
"""
# Fetch the longest sequence in the BIN
logger.info(f"Writing longest sequence for BIN {q['bin_uri']} to FASTA")
query = f'''
SELECT b.processid, t.bin_uri, t.opentol_id, t.species, b.nuc, b.barcode_id
FROM barcode b
JOIN taxon t ON b.taxon_id = t.taxon_id
WHERE
t."{q["level"]}" = "{q["name"]}" AND
t.family = "{q["family"]}" AND
t.bin_uri = "{q["bin_uri"]}" AND
b.marker_code = "{q["marker_code"]}" AND
t.species IS NOT NULL AND
t.species <> ''
ORDER BY
b.nuc_basecount DESC LIMIT 1
'''
logger.debug(query)
famseq = pd.read_sql_query(query, conn)
# Append to file handle fh
for _, row in famseq.iterrows():
defline = f'>{row["barcode_id"]}|ott{row["opentol_id"]}|{row["processid"]}|{row["bin_uri"]}|{row["species"]}\n'
fh.write(defline)
# Strip non-ACGT characters (dashes, esp.) because hmmer chokes on them
seq = row['nuc'].replace('-', '') + '\n'
fh.write(seq)
if __name__ == '__main__':
# Define and process command line arguments
parser = argparse.ArgumentParser(description='Required command line arguments.')
parser.add_argument('-d', '--database', required=True, help='Database file to query')
parser.add_argument('-f', '--fasta_dir', required=True, help='Directory to write FASTA files to')
parser.add_argument('-l', '--level', required=True, help='Taxonomic level to filter (e.g. order)')
parser.add_argument('-n', '--name', required=True, help='Taxon name to filter (e.g. Primates)')
parser.add_argument('-c', '--chunks', required=True, help="Number of chunks (families) to write to file")
parser.add_argument('-m', '--marker', required=True, help='Marker code, e.g. COI-5P')
parser.add_argument('-v', '--verbosity', required=True, help='Log level (e.g. DEBUG)')
args = parser.parse_args()
database_file = args.database
# Configure logger
logger = util.get_formatted_logger('family_fasta', args.verbosity)
# Connect to the database (creates a new file if it doesn't exist)
logger.info(f"Going to connect to database {args.database}")
connection = sqlite3.connect(args.database)
cursor = connection.cursor()
# Get families and bins for configured level and name
query = {
'level': args.level,
'name': args.name,
'marker_code': args.marker
}
df = get_family_bins(query, connection)
# Iterate over distinct families
index = 1
for family in df['family'].unique():
logger.info(f"Writing {family}")
# Make directory and open file handle
subdir = os.path.join(args.fasta_dir, f"{index}-of-{args.chunks}")
try:
os.mkdir(subdir)
except OSError as error:
logger.warning(error)
with open(os.path.join(subdir, 'unaligned.fa'), 'w') as handle:
# Iterate over bins in family
family_bin_uris = df[df['family'] == family]['bin_uri'].unique()
for bin_uri in family_bin_uris:
logger.debug(f"Writing {bin_uri}")
query['bin_uri'] = bin_uri
query['family'] = family
write_bin(query, connection, handle)
index += 1
# Close the connection
connection.close()