1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 __author__ = "Michael Imelfort"
27 __copyright__ = "Copyright 2014"
28 __credits__ = ["Michael Imelfort"]
29 __license__ = "LGPLv3"
30 __maintainer__ = "Michael Imelfort"
31 __email__ = "mike@mikeimelfort.com"
32
33
34
35
36 import os
37 import ctypes as c
38 from multiprocessing import Manager, Process, Value
39 import numpy as np
40 import sys
41 import gzip
42 import Queue
43 import time
44 from threading import Thread
45
46
47 from cWrapper import *
48 from bamFile import *
49 from bammExceptions import *
50 from bamRead import *
51
52
53
54
55
56
57
58
59
60
61
62
83 '''Single-process BAMfile read extraction.
84
85 cTypes pointers are unpickleable unless they are top level, so this function
86 lives outside the class and has 1,000,000 member variables passed to it.
87 Life would be easier if we could pass the class but any implicit copy
88 operations that follow are somewhat difficult to detect and can cause WOE.
89 Lot's of WOE, believe me...
90
91 Inputs:
92 threadId - string, a unique Id for this process / thread
93 outFilePrefixes - 3D dict for finding outFilePrefixes based on bamFile,
94 group and pairing information
95 bamPaths - { bid : string }, full paths to the BAM files
96 prettyBamFileNames - { bid : string }, short, print-friendly BAM names
97 numGroups - int, the number of groups reads are split into
98 perContigGroups - [int], contains groups Ids, insync with contigs array
99 contigs - [string], contig ids as written in the BAM
100 printQueue - Manager.Queue, thread-safe communication with users
101 extractQueue - Manager.Queue, bids (BAMs) yet to be extracted from
102 requestQueue - Manager.Queue, make requests for ReadSets for printing
103 freeQueue - Manager.Queue, tell the RSM when finished with a ReadSet
104 responseQueue - Manager.Queue, recieve copies of ReadSets from the RSM
105 headersOnly - == True -> write read headers only
106 mixGroups - == True -> use one file for all groups
107 minMapQual - int, skip all reads with a lower mapping quality score
108 maxMisMatches - int, skip all reads with more mismatches (NM aux files)
109 useSuppAlignments - == True -> skip supplementary alignments
110 useSecondaryAlignments - == True -> skip secondary alignments
111 verbose - == True -> be verbose
112
113 Outputs:
114 None
115 '''
116 while True:
117 p_bid = extractQueue.get(block=True, timeout=None)
118 if p_bid is None:
119 break
120 else:
121 if verbose:
122 printQueue.put("%s Preparing to extract reads from file: %s" % \
123 (threadId, prettyBamFileNames[p_bid] ) )
124
125
126 bamfile_c = c.c_char_p()
127 bamfile_c = bamPaths[p_bid]
128
129 pretty_name_c = c.c_char_p()
130 pretty_name_c = prettyBamFileNames[p_bid]
131
132 num_contigs = len(contigs)
133 contigs_c_array = (c.c_char_p * num_contigs)()
134 contigs_c_array[:] = contigs
135
136 groups_c_array = (c.c_uint16 * num_contigs)()
137 groups_c_array[:] = perContigGroups
138
139 headers_only_c = c.c_uint32()
140 headers_only_c = headersOnly
141
142 min_mapping_quality_c = c.c_uint32()
143 min_mapping_quality_c = minMapQual
144
145 max_mismatches_c = c.c_uint32()
146 max_mismatches_c = maxMisMatches
147
148 pBMM = c.POINTER(BM_mappedRead_C)
149
150
151 CW = CWrapper()
152 pBMM = CW._extractReads(bamfile_c,
153 contigs_c_array,
154 num_contigs,
155 groups_c_array,
156 pretty_name_c,
157 headers_only_c,
158 min_mapping_quality_c,
159 max_mismatches_c,
160 ignoreSuppAlignments,
161 ignoreSecondaryAlignments)
162
163 if verbose:
164 printQueue.put("%s Finished C-based extraction for: %s" \
165 % (threadId, prettyBamFileNames[p_bid]))
166 printQueue.put("%s Re-ordering reads before printing" % \
167 (threadId))
168
169
170
171
172
173
174
175
176
177 overlapper = {}
178 chain_info = {}
179
180
181 for gid in range(numGroups):
182 chain_info[gid] = {}
183
184 for rpi in [RPI.FIR, RPI.SNGL]:
185 file_name = outFilePrefixes[p_bid][gid][rpi]
186 try:
187 storage = overlapper[file_name]
188 except KeyError:
189
190 storage = [None, None, 0]
191 overlapper[file_name] = storage
192 chain_info[gid][rpi] = {'storage' : storage}
193
194 while pBMM:
195 '''
196 USE THIS CODE TO GET THE READ ID WHEN DEBUGGING
197 buffer_c = c.create_string_buffer(20000)
198 pbuffer_c = c.POINTER(c.c_char_p)
199 pbuffer_c = c.pointer(buffer_c)
200
201 # this variable records how much of the buffer is used for each read
202 str_len_c = c.c_int(0)
203 pstr_len_c = c.cast(c.addressof(str_len_c), c.POINTER(c.c_int))
204
205 paired_c = c.c_int(1)
206 headers = c.c_int(1)
207
208 group_name_c = c.c_char_p()
209 group_name_c = "THIS__"
210
211 CW._sprintMappedRead(pBMM,
212 pbuffer_c,
213 pstr_len_c,
214 group_name_c,
215 headers,
216 paired_c)
217 # unwrap the buffer and transport into python land
218 read_ID_debug = \
219 (c.cast(pbuffer_c,
220 c.POINTER(c.c_char*str_len_c.value)).contents).value
221
222 read_ID_debug = read_ID_debug.split(";")[-1].rstrip()
223 '''
224
225
226 rpi = pBMM.contents.rpi
227 c_rpi = RPIConv[rpi]
228
229
230
231 addable = []
232
233 if c_rpi != RPI.SEC:
234
235 addable.append([c.addressof(pBMM.contents), c_rpi])
236
237
238 if rpi == RPI.FIR:
239
240
241
242 r2_rpi = RPI.ERROR
243
244 if (1 == CW._partnerInSameGroup(pBMM)):
245
246
247 r2_rpi = RPI.FIR
248 else:
249
250
251
252 if mixGroups:
253
254
255 r2_rpi = RPI.FIR
256 else:
257
258 r2_rpi = RPI.SNGL
259 addable[0][1] = RPI.SNGL
260
261
262 addable.append([c.addressof((CW._getPartner(pBMM)).contents), r2_rpi])
263
264
265 for mappedRead in addable:
266 tmp_pBMM = c.cast(mappedRead[0], c.POINTER(BM_mappedRead_C))
267 has_qual = (tmp_pBMM.contents.qualLen != 0)
268 group = tmp_pBMM.contents.group
269
270
271 working_rpi = mappedRead[1]
272 stored_rpi = tmp_pBMM.contents.rpi
273 mi = MI.ER_EM_EG
274 if working_rpi == RPI.FIR:
275 mi = MI.PR_PM_PG
276 elif working_rpi == RPI.SNGL:
277 if stored_rpi == RPI.FIR or stored_rpi == RPI.SEC:
278 mi = MI.PR_PM_UG
279 if stored_rpi == RPI.SNGL_FIR or stored_rpi == RPI.SNGL_SEC:
280 mi = MI.PR_UM_NG
281 elif stored_rpi == RPI.SNGL:
282 mi = MI.UR_NM_NG
283
284 CW._setMICode(tmp_pBMM, mi)
285
286
287
288
289 try:
290
291
292 if chain_info[group][working_rpi]['isFastq'] ^ has_qual:
293
294
295 raise MixedFileTypesException( \
296 "You cannot mix Fasta and Fastq reads " \
297 "together in an output file")
298 except KeyError:
299
300
301
302
303
304
305
306 chain_info[group][working_rpi]['isFastq'] = has_qual
307
308
309 if chain_info[group][working_rpi]['storage'][1] is None:
310
311 chain_info[group][working_rpi]['storage'][0] = \
312 mappedRead[0]
313 chain_info[group][working_rpi]['storage'][1] = \
314 mappedRead[0]
315 chain_info[group][working_rpi]['storage'][2] = 1
316 else:
317
318 CW._setNextPrintRead( \
319 c.cast(chain_info[group][working_rpi]['storage'][1],
320 c.POINTER(BM_mappedRead_C)),
321 c.cast(mappedRead[0],
322 c.POINTER(BM_mappedRead_C)
323 )
324 )
325 chain_info[group][working_rpi]['storage'][1] = \
326 mappedRead[0]
327 chain_info[group][working_rpi]['storage'][2] += 1
328
329
330 pBMM = CW._getNextMappedRead(pBMM)
331
332
333 if verbose:
334 printQueue.put("%s Re-ordering complete. Preparing to write" % \
335 (threadId))
336
337 for gid in range(numGroups):
338 for rpi in [RPI.FIR, RPI.SNGL]:
339 if chain_info[gid][rpi]['storage'][1] is not None:
340
341 pBMM_chain = \
342 c.cast(chain_info[gid][rpi]['storage'][0],
343 c.POINTER(BM_mappedRead_C)
344 )
345
346
347
348
349
350 try:
351 requestQueue.put((threadId,
352 p_bid,
353 gid,
354 rpi,
355 chain_info[gid][rpi]['isFastq']))
356
357
358 RS = responseQueue.get(block=True, timeout=None)
359 if RS is None:
360
361 CW._destroyPrintChain(pBMM_chain)
362 else:
363
364 pBMM_destroy = c.POINTER(BM_mappedRead_C)
365 pBMM_destroy = pBMM_chain
366 RS.writeChain(pBMM_chain,
367 chain_info[gid][rpi]['isFastq'])
368 CW._destroyPrintChain(pBMM_destroy)
369
370
371 freeQueue.put((threadId,
372 p_bid,
373 gid,
374 rpi))
375
376
377 chain_info[gid][rpi]['storage'][1] = None
378
379 except KeyError:
380
381
382
383
384
385
386
387
388 pass
389
390 if verbose:
391 printQueue.put("%s Read extraction complete for file: %s" % \
392 (threadId, prettyBamFileNames[p_bid])
393 )
394
395
396
397
398
399
401 '''Class used to manage extracting reads from multiple BAM files'''
419 '''
420 Default constructor.
421
422 Set all the instance variables, make ReadSets, organise output files
423
424 Inputs:
425 contigs - [[string]], list of list contig IDs (used as a filter)
426 bamFiles - [string], list of bamfiles to extract reads from
427 prefix - string, append this string to the start of all output files
428 groupNames - [string], list of names of the groups in the contigs list
429 outFolder - path, write output to this folder
430 mixBams - == True -> use one file for all bams
431 mixGroups - == True -> use one file for all groups
432 mixReads - == True -> use one file for paired / unpaired reads
433 interleaved - == True -> use interleaved format for paired reads
434 bigFile - == True -> do NOT gzip outputs
435 headersOnly - == True -> write read headers only
436 minMapQual - int, skip all reads with a lower mapping quality score
437 maxMisMatches - int, skip all reads with more mismatches (NM aux files)
438 useSuppAlignments - == True -> DON'T skip supplementary alignments
439 useSecondaryAlignments - == True -> DON'T skip secondary alignments
440
441 Outputs:
442 None
443 '''
444
445 self.outFolder = outFolder
446
447
448 self.makeSurePathExists(self.outFolder)
449
450 self.bamFiles = bamFiles
451 self.prettyBamFileNames = [os.path.basename(bam).replace(".bam", "")
452 for bam in self.bamFiles]
453
454 self.prefix = prefix
455
456 self.mixBams = mixBams
457 self.mixGroups = mixGroups
458 self.mixReads = mixReads
459
460 self.interleaved = interleaved
461 if headersOnly:
462 self.headersOnly = 1
463 else:
464 self.headersOnly = 0
465
466 self.minMapQual = minMapQual
467 self.maxMisMatches = maxMisMatches
468
469 if useSuppAlignments:
470 self.ignoreSuppAlignments = 0
471 else:
472 self.ignoreSuppAlignments = 1
473
474 if useSuppAlignments:
475 self.ignoreSecondaryAlignments = 0
476 else:
477 self.ignoreSecondaryAlignments = 1
478
479
480 if bigFile:
481 self.zipped = False
482 else:
483 self.zipped = True
484
485
486 if groupNames == []:
487
488 groupNames = ["group_%d" % i for i in range(1, len(contigs)+1)]
489 self.groupNames = groupNames
490
491
492 self.contigs = contigs[0]
493 self.perContigGroups = [0]*len(self.contigs)
494
495 for i in range(1, len(contigs)):
496 self.contigs += contigs[i]
497 self.perContigGroups += [i] * len(contigs[i])
498
499 self.manager = Manager()
500
501
502 self.outputStream = sys.stderr
503 self.printQueue = self.manager.Queue()
504 self.printDelay = 0.5
505
506 self.RSM = ReadSetManager(self.manager)
507
508
509 self.RSM.setPrintQueue(self.printQueue)
510
511 self.outFilePrefixes= self.RSM.organiseOutFiles(self.prettyBamFileNames,
512 self.groupNames,
513 self.zipped,
514 self.interleaved,
515 self.mixBams,
516 self.mixGroups,
517 self.mixReads,
518 self.headersOnly,
519 self.outFolder,
520 self.prefix)
521
522
523 '''
524 for bid in range(len(self.bamFiles)):
525 for gid in range(len(self.groupNames)):
526 for rpi in [RPI.FIR, RPI.SEC, RPI.SNGL, RPI.SNGL_FIR, RPI.SNGL_SEC]:
527 sys.stderr.write("%s %s %s %s\n" % (self.prettyBamFileNames[bid], self.groupNames[gid], RPI2Str(rpi), str(self.outFilePrefixes[bid][gid][rpi])))
528 '''
529
531 '''Start extracting reads from the BAM files
532
533 This function is responsible for starting and stopping all threads and
534 processes used in bamm extract. Due to python multiprocessing's need to
535 pickle everything the actual work of extraction is carried out in the
536 first level function called externalExtractWrapper. See there for actual
537 extraction details. This function is primarily concerned with thread
538 and process management.
539
540 Inputs:
541 threads - int, the number of threads / processes to use
542 verbose - bool, True if lot's of stuff should be printed to screen
543
544 Outputs:
545 None
546 '''
547
548 extract_queue = self.manager.Queue()
549 for bid in range(len(self.bamFiles)):
550 extract_queue.put(bid)
551
552
553
554 for _ in range(threads):
555 extract_queue.put(None)
556
557
558 thread_ids = ["Thread_%s" % str(tt) for tt in range(threads)]
559
560
561
562
563 print_process = Process(target=self.managePrintQueue)
564 print_process.start()
565
566
567 request_management_threads = [Thread(target=self.RSM.manageRequests)
568 for _ in range(threads)]
569 for w in request_management_threads:
570 w.start()
571
572
573 response_queues = dict(zip(thread_ids,
574 [self.manager.Queue() for _ in range(threads)]
575 )
576 )
577
578 self.RSM.setResponseQueues(response_queues)
579
580
581 try:
582
583 extract_proc = [Process(target=externalExtractWrapper,
584 args=(thread_ids[tt],
585 self.outFilePrefixes,
586 self.bamFiles,
587 self.prettyBamFileNames,
588 len(self.groupNames),
589 self.perContigGroups,
590 self.contigs,
591 self.printQueue,
592 extract_queue,
593 self.RSM.requestQueue,
594 self.RSM.freeQueue,
595 response_queues[thread_ids[tt]],
596 self.headersOnly,
597 self.mixGroups,
598 self.minMapQual,
599 self.maxMisMatches,
600 self.ignoreSuppAlignments,
601 self.ignoreSecondaryAlignments,
602 verbose
603 )
604 )
605 for tt in range(threads)]
606
607
608 for p in extract_proc:
609 p.start()
610
611 for p in extract_proc:
612 p.join()
613
614
615 self.RSM.invalidateThreads()
616 for w in request_management_threads:
617 self.RSM.requestQueue.put(None)
618 for w in request_management_threads:
619 w.join()
620
621
622 self.printQueue.put(None)
623 print_process.join()
624
625
626 return 0
627
628 except:
629
630 sys.stderr.write("\nEXITING...\n")
631
632 for p in extract_proc:
633 p.terminate()
634
635
636 self.RSM.invalidateThreads()
637 for w in request_management_threads:
638 self.RSM.requestQueue.put(None)
639 for w in request_management_threads:
640 w.join()
641
642
643 print_process.terminate()
644
645
646 return 1
647
649 '''Write all the print requests to stdout / stderr
650
651 This function is run as a process and so can be terminated.
652 Place a None on the printQueue to terminate the process.
653
654 Change self.outputStream to determine where text will be written to.
655
656 Inputs:
657 None
658
659 Outputs:
660 None
661 '''
662 while True:
663 stuff = self.printQueue.get(timeout=None, block=True)
664 if stuff is None:
665 break
666 else:
667 self.outputStream.write("%s\n" % stuff)
668
670 '''Make sure that a path exists, make it if necessary
671
672 Inputs:
673 path - string, full or relative path to create
674
675 Outputs:
676 None
677 '''
678 try:
679 os.makedirs(path)
680 except OSError as exception:
681 import errno
682 if exception.errno != errno.EEXIST:
683 raise
684
685
686
687
688
689