evaluate_referencegenome.py
import os
import shutil
import multiprocessing as mp
import queue
import argparse
import mappy
import numpy as np
import pandas as pd
from pandas.errors import EmptyDataError
from src.io import read_fast, find_files
from src.evaluation import eval_pair


def results_queue_writer(output_file, q):
    """Append result rows from the queue to the output csv until the 'kill' sentinel arrives."""
while True:
df = q.get()
if df == 'kill':
break
else:
df = pd.DataFrame(df, index=[0])
            # write the csv header only if the file contains nothing but '#' comment lines so far
            header = True
with open(output_file, 'r') as f:
for line in f:
if not line.startswith('#'):
header = False
break
            df.to_csv(output_file, mode='a', header=header, index=False)


def eval_pair_wrapper(reads_queue, writer_queue, ref_fasta_file, homopolymer_min_length, verbose):
    """Evaluate predictions from the reads queue against a reference genome.

    Args:
        reads_queue (multiprocessing.Queue): queue from where to get the predictions
        writer_queue (multiprocessing.Queue): queue where to send the results
        ref_fasta_file (str): path to the reference fasta file used to build the aligner
        homopolymer_min_length (int): minimum length of identical consecutive bases to be considered a homopolymer
        verbose (bool): whether to print the read_id being processed
    """
    # each worker builds its own mappy aligner from the reference fasta
    ref = mappy.Aligner(ref_fasta_file)
    # all reads are queued before the workers start; get_nowait avoids blocking
    # forever if another worker takes the last read between empty() and get()
    while True:
        try:
            read_id, prediction = reads_queue.get_nowait()
        except queue.Empty:
            break
if verbose:
print(read_id)
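        # basecalls may come as a (sequence, comment, phredq) tuple or as a plain sequence string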
if isinstance(prediction, tuple):
pred, comment, phredq = prediction
if comment == '+' or comment == '-':
comment = None
else:
pred = prediction
phredq = None
comment = None
result = eval_pair(
ref = ref,
que = pred,
read_id = read_id,
homopolymer_min_length = homopolymer_min_length,
phredq = phredq,
comment = comment,
reference_genome = True,
)
writer_queue.put(result)
    return None


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--basecalls-path", type=str, required=True, help='Path to a fasta or fastq file or dir to be searched')
parser.add_argument("--reference-path", type=str, required=True, help='Path to a fasta reference file')
parser.add_argument("--model-name", type=str, required=True, help='Name of the model')
parser.add_argument("--homopolymer-length", type=int, default = 5, help='Minimum length of same consecutive bases to be considered a homopolymer')
parser.add_argument("--output-file", type=str, help='csv output file', default = None)
parser.add_argument("--depth", type=int, help='How deep to look for fastq or fasta files', default = 1)
parser.add_argument("--processes", type=int, help='Number of parallel processes', default = 2)
parser.add_argument("--verbose", action="store_true", help='Print read ids as they are being evaluated')
parser.add_argument("--overwrite", action='store_true', help='Overwrite existing output file')
args = parser.parse_args()
# get all the basecall files
fast_files = list()
if os.path.isfile(args.basecalls_path):
fast_files.append(os.path.abspath(args.basecalls_path))
else:
for fast_file in find_files(args.basecalls_path, endings = ['.fasta', '.fastq'], maxdepth = args.depth):
fast_files.append(os.path.abspath(fast_file))
fast_files = np.unique(fast_files)
# output file name
if args.output_file is None:
        output_file = os.path.join(os.path.dirname(fast_files[0]), 'evaluation.csv')
else:
output_file = args.output_file
assert output_file.endswith('.csv'), "output file must end with .csv"
# check if output file exists to skip evaluated reads
processed_ids = set()
if os.path.isfile(output_file):
if args.overwrite:
os.remove(output_file)
with open(output_file, 'w') as f:
f.write('#'+args.model_name+'\n')
else:
try:
df = pd.read_csv(output_file, header = 0, index_col = False, comment = '#')
processed_ids = set(df['read_id'])
print('Output file already exists, {0} reads already evaluated'.format(len(processed_ids)))
except EmptyDataError:
pass
else:
with open(output_file, 'w') as f:
f.write('#'+args.model_name+'\n')
# start multiprocessing manager and queues for reading and writing
manager = mp.Manager()
writer_queue = manager.Queue()
reads_queue = manager.Queue()
watcher_writer = mp.Process(target = results_queue_writer, args = (output_file, writer_queue, ))
watcher_writer.start()
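    # queue every read that is not already present in the output file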
for basecalls_file in fast_files:
for read_id, basecalls in read_fast(basecalls_file).items():
if read_id in processed_ids:
continue
reads_queue.put((read_id, basecalls))
    # one process is reserved for the results writer, so --processes should be at least 2
    with mp.Pool(processes=args.processes-1) as pool:
multiple_results = [pool.apply_async(eval_pair_wrapper, (reads_queue, writer_queue, args.reference_path, args.homopolymer_length, args.verbose)) for _ in range(args.processes-1)]
results = [res.get() for res in multiple_results]
writer_queue.put('kill')
watcher_writer.join()
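
# Example invocation (a sketch, assuming the script is run from the repository
# root so that the `src` package is importable; the paths are placeholders):
#
#   python evaluate_referencegenome.py \
#       --basecalls-path /path/to/basecalls.fastq \
#       --reference-path /path/to/reference.fasta \
#       --model-name my_model \
#       --processes 4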