1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 __author__ = "Michael Imelfort"
27 __copyright__ = "Copyright 2014"
28 __credits__ = ["Michael Imelfort"]
29 __license__ = "LGPLv3"
30 __maintainer__ = "Michael Imelfort"
31 __email__ = "mike@mikeimelfort.com"
32
33
34
35
36 import ctypes as c
37 import sys
38 import multiprocessing as mp
39 import gzip
40 import Queue
41 import time
42 from copy import deepcopy
43
44
45 from bammExceptions import *
46 from cWrapper import *
47
48
49
50
51
52
54 '''The principle manager of a collection of ReadSet objects
55
56 Determines the exact names of the output files when extracting reads.
57 Ensures that only one thread can write to one file at a time.
58 '''
59
61 '''Default constructor.
62
63 Initializes a ReadSet instance with the provided set of properties.
64
65 Inputs:
66 manager - multiprocessing.Manager() instance owned by the
67 BamExtractor use this to make all Queues.
68 Outputs:
69 None
70 '''
71
72
73
74
75 self.outFiles = {}
76
77
78
79 self.fnPrefix2ReadSet = {}
80
81
82
83 self.lock = manager.Lock()
84 self.readSetInUse = {}
85
86
87
88
89 self._threadsAreValid = True
90
91
92
93
94
95
96
97 self.requestQueue = manager.Queue()
98 self.freeQueue = manager.Queue()
99
100
101
102
103
104
105 self.responseQueues = None
106
107
108
109 self.printQueue = None
110
112 '''Stop all the threads from running
113
114 Also stops any looping processes in the ReadSets
115
116 Inputs:
117 None
118 Outputs:
119 None
120 '''
121 self._threadsAreValid = False
122 for file_name in self.fnPrefix2ReadSet.keys():
123 self.fnPrefix2ReadSet[file_name].threadsAreValid = False
124
126 '''Set response queues used to pass ReadSets to the extract threads
127
128 Inputs:
129 queues - dict { outPrefix : mp.Manager().Queue() }, one for each
130 thread that's parsing BAM files for the BAmExtractor
131 Outputs:
132 None
133 '''
134 self.responseQueues = queues
135
137 '''Set the print queue for communcating upwards
138
139 Inputs:
140 queue - mp.Manager().Queue(), print queue managed by the BamExtractor
141 Outputs:
142 None
143 '''
144 self.printQueue = queue
145
147 '''Ensure that only one thread at a time has access to a read set
148
149 Inputs:
150 bid - int, the unique identifier for a BAM file
151 gid - int, the unique identifier for a target group
152 rpi - enum, describes the type of read (RPI.FIR etc.)
153 threadId - string, identifies the thread that is requesting
154 the resource.
155 Outputs:
156 The requested read set or None if it is not available
157 '''
158 read_set = self.outFiles[bid][gid][rpi]
159 file_prefix = read_set.getConstFP()
160 with self.lock:
161 if file_prefix in self.readSetInUse:
162 read_set = None
163 else:
164 self.readSetInUse[file_prefix] = threadId
165 return read_set
166
168 '''Indicate that a thread is finished with a read set
169
170 Inputs:
171 bid - int, the unique identifier for a BAM file
172 gid - int, the unique identifier for a target group
173 rpi - enum, describes the type of read (RPI.FIR etc.)
174 threadId - string, identifies the thread that is freeing
175 the resource.
176 Outputs:
177 None
178
179 Raises:
180 InvalidParameterSetException - if the supplied information
181 doesn't make sense
182 '''
183 read_set = self.outFiles[bid][gid][rpi]
184 file_prefix = read_set.getConstFP()
185 with self.lock:
186 try:
187 if self.readSetInUse[file_prefix] == threadId:
188 del self.readSetInUse[file_prefix]
189 else:
190 raise InvalidParameterSetException( \
191 "%s owned by %s, not %s" % \
192 (file_prefix,
193 self.readSetInUse[file_prefix],
194 threadId))
195 except KeyError:
196 raise InvalidParameterSetException("%s not owned by anyone" % \
197 file_prefix)
198 pass
199
201 '''Manage requests by parsing threads to access ReadSets
202
203 This process runs on it's own thread and continues until it discovers
204 a None on the requestQueue or self._threadsAreValid is set to False
205
206 Inputs:
207 None
208
209 Outputs:
210 None
211 '''
212
213 while self._threadsAreValid:
214 item = self.requestQueue.get(timeout=None, block=True)
215 if item is None:
216 break
217 else:
218
219 (thread_id, bid, gid, rpi, is_fastq) = item
220 return_queue = self.responseQueues[thread_id]
221 read_set = None
222 while read_set is None and self._threadsAreValid:
223 read_set = self.getReadSet(bid, gid, rpi, thread_id)
224
225 if read_set is None:
226
227
228 try:
229 (f_thread_id,
230 f_bid,
231 f_gid,
232 f_rpi) = self.freeQueue.get(block=True,
233 timeout=2)
234 try:
235 self.freeReadSet(f_bid,
236 f_gid,
237 f_rpi,
238 f_thread_id)
239 except InvalidParameterSetException: pass
240 except Queue.Empty:
241
242 time.sleep(2)
243
244
245
246 if read_set is None:
247
248 return_queue.put(None)
249 break
250
251
252
253 return_queue.put(deepcopy(read_set))
254
255
256
257 if is_fastq:
258 read_set._fastqWritten = True
259 else:
260 read_set._fastaWritten = True
261
262 - def organiseOutFiles(self,
263 prettyBamFileNames,
264 groupNames,
265 zipped,
266 interleaved,
267 mixBams,
268 mixGroups,
269 mixReads,
270 headersOnly,
271 outFolder,
272 prefix,
273 ):
274 '''Determine all the outFile prefixes needed for extraction.
275
276 The RSM manages a collection of output file objects called ReadSets
277 These are made here and placed into hashes for retrieval later. This
278 function also populates instance variables self.fnPrefix2ReadSet and
279 self.outFiles; the two main ways that ReadSets can be accessed.
280
281 File names vary wildly depending on the values of flags such as:
282 mixGroups, mixBams etc. Take care when modifying this code.
283
284 Inputs:
285 prettyBamFileNames - [ string ] simple versions of the BAM filenames
286 groupNames - [ string ] identify contig groups. EX: [bin1, bin2]
287 zipped - bool, True if the output should be zipped
288 interleaved - bool, True if the output should be interleaved
289 mixBams - bool, True if BAM file origin should be ignored
290 mixGroups - bool, True if group origin should be ignored
291 mixReads - bool, True if (un)paired distinction should be ignored
292 headersOnly - bool, True if only headers should be printed
293 outFolder - string, folder to write output to
294 prefix - string, prefix to apply to all output files
295
296 Outputs:
297 of_prefixes - { bamId : { groupId : { rpi : outFile prefix } } },
298 this hash can be used to get the file name prefix for a
299 read set based on BAM file origin, group origin and
300 pairing information.
301 '''
302 of_prefixes = {}
303 base_path = os.path.join(os.path.abspath(outFolder), prefix)
304
305
306 for bid in range(len(prettyBamFileNames)):
307 if mixBams:
308 bam_str = "allMapped"
309 else:
310 bam_str = prettyBamFileNames[bid]
311
312 if bam_str not in self.outFiles:
313 self.outFiles[bid] = {}
314 of_prefixes[bid] = {}
315
316 for gid in range(len(groupNames)):
317 if mixGroups:
318 grp_str = "allGroups"
319 else:
320 grp_str = groupNames[gid]
321
322 if gid not in self.outFiles[bid]:
323 self.outFiles[bid][gid] = {}
324 of_prefixes[bid][gid] = {}
325
326 if prefix == "":
327 fn = "%s%s.%s" % (base_path, bam_str, grp_str)
328 else:
329 fn = "%s.%s.%s" % (base_path, bam_str, grp_str)
330
331 if mixReads:
332
333 working_fn = fn + ".allReads"
334 try:
335 read_set_P = self.fnPrefix2ReadSet[working_fn]
336 except KeyError:
337 read_set_P = ReadSet(groupNames,
338 working_fn,
339 zipped=zipped,
340 headersOnly=headersOnly)
341 self.fnPrefix2ReadSet[working_fn] = read_set_P
342 read_set_S = read_set_P
343
344 elif interleaved:
345
346 working_fn_P = fn + ".pairedReads"
347 working_fn_S = fn + ".unpairedReads"
348 try:
349 read_set_P = self.fnPrefix2ReadSet[working_fn_P]
350 except KeyError:
351 read_set_P = ReadSet(groupNames,
352 working_fn_P,
353 paired=True,
354 zipped=zipped,
355 headersOnly=headersOnly)
356 self.fnPrefix2ReadSet[working_fn_P] = read_set_P
357
358 try:
359 read_set_S = self.fnPrefix2ReadSet[working_fn_S]
360 except KeyError:
361 read_set_S = ReadSet(groupNames,
362 working_fn_S,
363 zipped=zipped,
364 headersOnly=headersOnly)
365 self.fnPrefix2ReadSet[working_fn_S] = read_set_S
366
367 else:
368
369 working_fn_1 = fn + ".1"
370 working_fn_2 = fn + ".2"
371 working_fn_S = fn + ".unpairedReads"
372 try:
373 read_set_P = self.fnPrefix2ReadSet[working_fn_1]
374 except KeyError:
375 read_set_P = ReadSet(groupNames,
376 working_fn_1,
377 outPrefix2=working_fn_2,
378 paired=True,
379 zipped=zipped,
380 headersOnly=headersOnly)
381 self.fnPrefix2ReadSet[working_fn_1] = read_set_P
382
383 try:
384 read_set_S = self.fnPrefix2ReadSet[working_fn_S]
385 except KeyError:
386 read_set_S = ReadSet(groupNames,
387 working_fn_S,
388 zipped=zipped,
389 headersOnly=headersOnly)
390 self.fnPrefix2ReadSet[working_fn_S] = read_set_S
391
392
393 self.outFiles[bid][gid][RPI.FIR] = read_set_P
394 self.outFiles[bid][gid][RPI.SEC] = read_set_P
395 of_prefixes[bid][gid][RPI.FIR] = read_set_P.getConstFP()
396 of_prefixes[bid][gid][RPI.SEC] = read_set_P.getConstFP()
397
398 self.outFiles[bid][gid][RPI.SNGL] = read_set_S
399 self.outFiles[bid][gid][RPI.SNGL_FIR] = read_set_S
400 self.outFiles[bid][gid][RPI.SNGL_SEC] = read_set_S
401 of_prefixes[bid][gid][RPI.SNGL] = read_set_S.getConstFP()
402 of_prefixes[bid][gid][RPI.SNGL_FIR] = read_set_S.getConstFP()
403 of_prefixes[bid][gid][RPI.SNGL_SEC] = read_set_S.getConstFP()
404
405 return of_prefixes
406
407
408
409
410
411
413 '''Container class to encapsulate the concept of a(n un)paired read set
414
415 This class manages file properties and writing functionality.'''
416
417 - def __init__(self,
418 groupNames,
419 outPrefix1,
420 outPrefix2 = None,
421 paired=False,
422 zipped=True,
423 headersOnly=False
424 ):
425 '''Default constructor.
426
427 Initializes a ReadSet instance with the provided set of properties.
428
429 Inputs:
430 groupNames - [ string ], target names, one for each target group.
431 outPrefix1 - string prefix of the first output file (read1).
432 outPrefix2 - string prefix of the second output file or None for
433 interleaved or unpaired files.
434 isPaired - bool, True if the file is a paired read file.
435 zipped - bool, True if the data should be compressed for writing.
436 headersOnly - bool, True if only headers should be printed.
437 '''
438
439 self.groupNames = groupNames
440 self.isPaired = paired
441 self.zipOutput = zipped
442 self.headersOnly = headersOnly
443
444
445 self._outPrefix1 = outPrefix1
446
447 self._outPrefix2 = outPrefix2
448
449
450 if zipped:
451 self._writeOpen = gzip.open
452 else:
453 self._writeOpen = open
454
455
456
457
458 self._threadsAreValid = True
459
460
461
462
463 self._fastqWritten = False
464 self._fastaWritten = False
465
467 '''return the unchanging filename prefix associated with this ReadSet
468
469 Inputs:
470 fNumber - which file prefix is needed? [1 or 2]
471 Outputs:
472 The corresponding prefix
473 '''
474 if 1 == fNumber:
475 return self._outPrefix1
476 else:
477 return self._outPrefix2
478
480 '''Determine the suffix of the file depending on if we're writing
481 fasta or fastq
482
483 Inputs:
484 isFastq - bool, is the file fastq (or fasta)
485 Outputs:
486 Filenames to be written to.
487 (fileName1, fileName2)
488 fileName2 will be None for unpaired or interleaved-paired files
489 '''
490 if self.headersOnly:
491 ext = ".list"
492 else:
493 if isFastq:
494 ext = ".fq"
495 else:
496 ext = ".fna"
497
498 if self.zipOutput:
499 ext += ".gz"
500
501 file_name1 = self.getConstFP()+ext
502 file_name2 = self.getConstFP(fNumber=2)
503 if file_name2 is not None:
504 file_name2 += ext
505
506 return (file_name1, file_name2)
507
508 - def writeChain(self,
509 pBMM,
510 isFastq,
511 printQueue=None
512 ):
513 '''Write a single print chain to disk
514
515 A print chain is a linked list of mapped reads that have been
516 pre-ordered and are ready to write (or print). The print chain can
517 contain either Fasta or Fastq reads but never both. File names are
518 determined on the fly based on the presence or absence of quality info
519 of the first read in the chain (determined by the BamExtractor) and
520 passed to this function as isFastq.
521
522 NOTE: This function does NOT free any memory associated with pBMM.
523
524 Inputs:
525 pBMM - c.POINTER(BM_mappedRead_C), the start of a linked list of
526 mapped reads, pre-ordered for printing by the BamExtractor
527 isFastq - bool, True if reads have quality information.
528 printQueue - Managed by the BamExtractor. Place all printing strings
529 here. Acts as a verbose flag.
530 Outputs:
531 None
532 '''
533 CW = CWrapper()
534
535
536
537 buffer_c = c.create_string_buffer(20000)
538 pbuffer_c = c.POINTER(c.c_char_p)
539 pbuffer_c = c.pointer(buffer_c)
540
541
542 str_len_c = c.c_int(0)
543 pstr_len_c = c.cast(c.addressof(str_len_c), c.POINTER(c.c_int))
544
545 paired_c = c.c_int(1)
546 unpaired_c = c.c_int(0)
547 headers = c.c_int(self.headersOnly)
548
549
550
551
552
553 group_name_c = c.c_char_p()
554
555
556 (out_file1, out_file2) = self.determineFileSuffix(isFastq)
557
558
559
560
561 opened = False
562 if isFastq and self._fastqWritten:
563 opened = True
564 elif not isFastq and self._fastaWritten:
565 opened = True
566
567 if opened:
568
569 open_mode = "a"
570 mode_desc = "Appending to"
571 else:
572
573 open_mode = "w"
574 mode_desc = "Writing"
575
576 if self.isPaired:
577
578
579 isFh1 = True
580
581
582 fh1 = self._writeOpen(out_file1, open_mode)
583 if out_file2 is None:
584 if printQueue:
585 printQueue.put(" %s interleaved file: %s" % (mode_desc,
586 out_file1))
587 fh2 = fh1
588 else:
589 if printQueue:
590 printQueue.put(" %s coupled files: %s %s" % (mode_desc,
591 out_file1,
592 out_file2))
593 fh2 = self._writeOpen(out_file2, open_mode)
594
595
596 while pBMM and self._threadsAreValid:
597
598 group_name_c = self.groupNames[pBMM.contents.group]
599 CW._sprintMappedRead(pBMM,
600 pbuffer_c,
601 pstr_len_c,
602 group_name_c,
603 headers,
604 paired_c)
605
606 printable_string = \
607 (c.cast(pbuffer_c,
608 c.POINTER(c.c_char*str_len_c.value)).contents).value
609 if isFh1:
610 fh1.write(printable_string)
611 isFh1 = False
612 else:
613 fh2.write(printable_string)
614 isFh1 = True
615
616
617 pBMM = CW._getNextPrintRead(pBMM)
618
619
620 fh1.close()
621 if out_file2 is not None:
622 fh2.close()
623 else:
624 fh = self._writeOpen(out_file1, open_mode)
625 if printQueue:
626 printQueue.put(" %s unpaired file: %s (%s)" % (mode_desc,
627 out_file1,
628 self))
629 while pBMM and self._threadsAreValid:
630 group_name_c = self.groupNames[pBMM.contents.group]
631 CW._sprintMappedRead(pBMM,
632 pbuffer_c,
633 pstr_len_c,
634 group_name_c,
635 headers,
636 unpaired_c)
637 printable_string = \
638 (c.cast(pbuffer_c,
639 c.POINTER(c.c_char*str_len_c.value)).contents).value
640 fh.write(printable_string)
641 pBMM = CW._getNextPrintRead(pBMM)
642
643 fh.close()
644
645
646
647
648
649