-
Notifications
You must be signed in to change notification settings - Fork 0
/
mutate.nucleotide.sequence.records.py
executable file
·243 lines (198 loc) · 7.21 KB
/
mutate.nucleotide.sequence.records.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
#!/usr/bin/env python3
import gzip
import os
import shutil
import sys
from argparse import ArgumentParser
from tempfile import mkdtemp
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
import numpy as np
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio.SeqIO.QualityIO import FastqGeneralIterator
def parse_args():
parser = ArgumentParser(
description='Introduce SNPs into sequence records',
epilog='NOTE: More mutations than the length of a sequence is'
' supported. Each mutation position is chosen at random, and it is'
' possible 2 mutations would occur at the same site.',
add_help=False,
)
req = parser.add_argument_group('Required')
req.add_argument(
'-i',
'--infile',
required=True,
metavar='FILE',
help='input sequence file '
'(FastA or FastQ, optionally gunzip compressed)',
)
opt = parser.add_argument_group('Input Options')
opt.add_argument(
'-k',
'--keep-lowercase',
action='store_true',
default=False,
help='keep mutated nucleotides lowercase [off]',
)
opt.add_argument(
'-m',
'--mutations-per-record',
metavar='INT',
type=require_int_nonnegative,
default=1,
help='number of mutation events to introduce for each sequence record'
' [%(default)s]',
)
opt.add_argument(
'-o',
'--outfile',
metavar='FILE',
type=str,
default=None,
help='output file [stdout]',
)
opt.add_argument(
'-h',
'--help',
action='help',
help='show this help message and exit'
)
return parser.parse_args()
def require_int_nonnegative(x):
try:
if int(x) < 0 or '.' in str(x):
sys.stderr.write(
'ERROR: {} must be a non-negative integer\n'.format(x)
)
sys.exit(1)
except ValueError:
sys.stderr.write('ERROR: {} must be an integer\n'.format(x))
sys.exit(1)
return int(x)
def decompress_file(infile, outdir):
# Create a new output file name
uncompressed_file = os.path.basename(infile).rstrip('.gz')
outfile = os.path.join(outdir, uncompressed_file)
# Decompress the gunzipped file
with gzip.open(infile, 'rb') as ifh, open(outfile, 'wb') as ofh:
shutil.copyfileobj(ifh, ofh)
# Send the full path of the decompressed file back
return outfile
def mutate_nucleotide_site(input_nucleotide, bool_keep_lowercase):
nucleotides = ['a', 't', 'c', 'g']
# Remove the input nucleotide from the list to guarantee we create a SNP
# NOTE: even an 'N' or other non-ATCG nucleotide will be mutated
if input_nucleotide.lower() in nucleotides:
nucleotides.remove(input_nucleotide.lower())
# Randomly choose a new (different) nucleotide
random_value = np.random.randint(low=0, high=len(nucleotides))
new_nucleotide = nucleotides[random_value]
# Choose upper or lowercase for the mutation character
if not bool_keep_lowercase:
new_nucleotide = new_nucleotide.upper()
return new_nucleotide
def mutate_nucleotide_string(input_string, position, bool_keep_lowercase):
# Extract the individual nucleotide to be mutated
old_nucleotide = input_string[position]
# Randomly choose a different nucleotide
new_nucleotide = mutate_nucleotide_site(
old_nucleotide, bool_keep_lowercase
)
# Form the new full length nucleotide string
mutated_nucleotide_string = (
input_string[:position] + new_nucleotide + input_string[position+1:]
)
return mutated_nucleotide_string
def mutate_fastq(infile, mutations_per_record, bool_keep_lowercase, outfile):
new_sequence_records = []
for title, sequence, quality in FastqGeneralIterator(infile):
seq = str(sequence)
# Determine random positions within the nucleotide string to mutate
random_positions = list(
np.random.randint(
low=0, high=len(sequence), size=mutations_per_record
)
)
# Mutate the sequence; handles 1 or more mutations per record
for pos in random_positions:
seq = mutate_nucleotide_string(seq, pos, bool_keep_lowercase)
# Verify we truly have mutated the sequence
if seq == sequence:
sys.stderr.write('ERROR: {} sequence not mutated\n'.format(seq))
sys.exit(1)
# Construct a new FastQ sequence record object
fastq_record_string = '@{}\n{}\n+\n{}\n'.format(title, seq, quality)
new_rec = SeqIO.read(StringIO(fastq_record_string), 'fastq')
new_sequence_records.append(new_rec)
# Write the mutated output and report some general info
SeqIO.write(new_sequence_records, outfile, 'fastq')
print(
'INFO: mutated all {} sequence records'.format(
len(new_sequence_records)
)
)
def mutate_fasta(infile, mutations_per_record, bool_keep_lowercase, outfile):
new_sequence_records = []
for record in SeqIO.parse(infile, 'fasta'):
seq = str(record.seq)
# Determine random positions within the nucleotide string to mutate
random_positions = list(
np.random.randint(low=0, high=len(seq), size=mutations_per_record)
)
# Mutate the sequence; handles 1 or more mutations per record
for pos in random_positions:
seq = mutate_nucleotide_string(seq, pos, bool_keep_lowercase)
# Verify we truly have mutated the sequence
if seq == record.seq:
sys.stderr.write('ERROR: {} sequence not mutated\n'.format(seq))
sys.exit(1)
# Construct a new FastA sequence record object
new_rec = SeqRecord(
id=record.description, seq=Seq(seq), description='', name=''
)
new_sequence_records.append(new_rec)
# Write the mutated output and report some general info
SeqIO.write(new_sequence_records, outfile, 'fasta')
print(
'INFO: mutated all {} sequence records'.format(
len(new_sequence_records)
)
)
def main():
opt = parse_args()
# I/O handling
mutations_per_record = opt.mutations_per_record
infile = os.path.realpath(os.path.expanduser(opt.infile))
if opt.outfile is None:
outfile = sys.stdout
else:
outfile = os.path.realpath(os.path.expanduser(opt.outfile))
bool_keep_lowercase = opt.keep_lowercase
# Auto-handle gunzip compressed input
tmp = mkdtemp()
if infile.endswith('.gz'):
infile = decompress_file(infile, tmp)
# Iteratate across each sequence record and mutate
if infile.lower().endswith(('.fastq', '.fq', '.fsq')):
mutate_fastq(
infile, mutations_per_record, bool_keep_lowercase, outfile
)
elif infile.lower().endswith(('.fasta', '.fa', '.fsa', '.fas')):
mutate_fasta(
infile, mutations_per_record, bool_keep_lowercase, outfile
)
else:
sys.stderr.write(
'ERROR: unable to detect file extension as FastA or FastQ'
' format\n'
)
sys.exit(1)
# Cleanup
shutil.rmtree(tmp)
if __name__ == '__main__':
main()