Package bamm :: Module bamRead
[hide private]
[frames] | no frames]

Source Code for Module bamm.bamRead

  1  ############################################################################### 
  2  #                                                                             # 
  3  #    bamRead.py                                                               # 
  4  #                                                                             # 
  5  #    Class for storing information about mapped reads                         # 
  6  #                                                                             # 
  7  #    Copyright (C) Michael Imelfort                                           # 
  8  #                                                                             # 
  9  ############################################################################### 
 10  #                                                                             # 
 11  #    This library is free software; you can redistribute it and/or            # 
 12  #    modify it under the terms of the GNU Lesser General Public               # 
 13  #    License as published by the Free Software Foundation; either             # 
 14  #    version 3.0 of the License, or (at your option) any later version.       # 
 15  #                                                                             # 
 16  #    This library is distributed in the hope that it will be useful,          # 
 17  #    but WITHOUT ANY WARRANTY; without even the implied warranty of           # 
 18  #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU        # 
 19  #    Lesser General Public License for more details.                          # 
 20  #                                                                             # 
 21  #    You should have received a copy of the GNU Lesser General Public         # 
 22  #    License along with this library.                                         # 
 23  #                                                                             # 
 24  ############################################################################### 
 25   
 26  __author__ = "Michael Imelfort" 
 27  __copyright__ = "Copyright 2014" 
 28  __credits__ = ["Michael Imelfort"] 
 29  __license__ = "LGPLv3" 
 30  __maintainer__ = "Michael Imelfort" 
 31  __email__ = "mike@mikeimelfort.com" 
 32   
 33  ############################################################################### 
 34   
 35  # system imports 
 36  import ctypes as c 
 37  import sys 
 38  import multiprocessing as mp 
 39  import gzip 
 40  import Queue 
 41  import time 
 42  from copy import deepcopy 
 43   
 44  # local import 
 45  from bammExceptions import * 
 46  from cWrapper import * 
 47   
 48  ############################################################################### 
 49  ############################################################################### 
 50  ############################################################################### 
 51  ############################################################################### 
 52   
53 -class ReadSetManager(object):
54 '''The principle manager of a collection of ReadSet objects 55 56 Determines the exact names of the output files when extracting reads. 57 Ensures that only one thread can write to one file at a time. 58 ''' 59
60 - def __init__(self, manager):
61 '''Default constructor. 62 63 Initializes a ReadSet instance with the provided set of properties. 64 65 Inputs: 66 manager - multiprocessing.Manager() instance owned by the 67 BamExtractor use this to make all Queues. 68 Outputs: 69 None 70 ''' 71 # We use two data structures to manage ReadSets 72 # self.outFiles is a nested hash that links bamFile Id 73 # group Id and read-pair information (rpi) to a ReadSet 74 # self.outFiles = { bamId : { groupId : { rpi : ReadSet } } } 75 self.outFiles = {} 76 # self.fnPrefix2ReadSet is a simple hash that links the 77 # file name prefix to a ReadSet 78 # self.fnPrefix2ReadSet = { outPrefix : ReadSet } 79 self.fnPrefix2ReadSet = {} 80 81 # The two variables ensure that only one thread has access to a 82 # ReadSet at a time 83 self.lock = manager.Lock() 84 self.readSetInUse = {} 85 86 # The program uses threads which cannot be terminated without 87 # using a trick like altering a global variable. 88 # NOTE: Use self.invalidateThreads to modify this variable 89 self._threadsAreValid = True 90 91 # The RSM communicates with the parsing threads using 92 # a collection of queues. 93 # The request queue is used by threads to make requests 94 # for access to ReadSets. The freeQueue is used to indicate 95 # that access to a resource is no loneger needed. 96 # See self.manageRequests for details. 97 self.requestQueue = manager.Queue() 98 self.freeQueue = manager.Queue() 99 100 # These Queues must be passed in by the BamExtractor 101 # The RSM gives threads access to the ReadSets by placing 102 # copies of them on the appropriate responseQueue. 103 # self.responseQueues is a hash of Queues: 104 # { outPrefix : mp.Manager().Queue() } 105 self.responseQueues = None 106 107 # All strings for printing to std out are placed on this queue 108 # to ensure print statements are not garbled 109 self.printQueue = None
110
111 - def invalidateThreads(self):
112 '''Stop all the threads from running 113 114 Also stops any looping processes in the ReadSets 115 116 Inputs: 117 None 118 Outputs: 119 None 120 ''' 121 self._threadsAreValid = False 122 for file_name in self.fnPrefix2ReadSet.keys(): 123 self.fnPrefix2ReadSet[file_name].threadsAreValid = False
124
125 - def setResponseQueues(self, queues):
126 '''Set response queues used to pass ReadSets to the extract threads 127 128 Inputs: 129 queues - dict { outPrefix : mp.Manager().Queue() }, one for each 130 thread that's parsing BAM files for the BAmExtractor 131 Outputs: 132 None 133 ''' 134 self.responseQueues = queues
135
136 - def setPrintQueue(self, queue):
137 '''Set the print queue for communcating upwards 138 139 Inputs: 140 queue - mp.Manager().Queue(), print queue managed by the BamExtractor 141 Outputs: 142 None 143 ''' 144 self.printQueue = queue
145
146 - def getReadSet(self, bid, gid, rpi, threadId):
147 '''Ensure that only one thread at a time has access to a read set 148 149 Inputs: 150 bid - int, the unique identifier for a BAM file 151 gid - int, the unique identifier for a target group 152 rpi - enum, describes the type of read (RPI.FIR etc.) 153 threadId - string, identifies the thread that is requesting 154 the resource. 155 Outputs: 156 The requested read set or None if it is not available 157 ''' 158 read_set = self.outFiles[bid][gid][rpi] 159 file_prefix = read_set.getConstFP() 160 with self.lock: 161 if file_prefix in self.readSetInUse: 162 read_set = None 163 else: 164 self.readSetInUse[file_prefix] = threadId 165 return read_set
166
167 - def freeReadSet(self, bid, gid, rpi, threadId):
168 '''Indicate that a thread is finished with a read set 169 170 Inputs: 171 bid - int, the unique identifier for a BAM file 172 gid - int, the unique identifier for a target group 173 rpi - enum, describes the type of read (RPI.FIR etc.) 174 threadId - string, identifies the thread that is freeing 175 the resource. 176 Outputs: 177 None 178 179 Raises: 180 InvalidParameterSetException - if the supplied information 181 doesn't make sense 182 ''' 183 read_set = self.outFiles[bid][gid][rpi] 184 file_prefix = read_set.getConstFP() 185 with self.lock: 186 try: 187 if self.readSetInUse[file_prefix] == threadId: 188 del self.readSetInUse[file_prefix] 189 else: 190 raise InvalidParameterSetException( \ 191 "%s owned by %s, not %s" % \ 192 (file_prefix, 193 self.readSetInUse[file_prefix], 194 threadId)) 195 except KeyError: 196 raise InvalidParameterSetException("%s not owned by anyone" % \ 197 file_prefix) 198 pass
199
200 - def manageRequests(self):
201 '''Manage requests by parsing threads to access ReadSets 202 203 This process runs on it's own thread and continues until it discovers 204 a None on the requestQueue or self._threadsAreValid is set to False 205 206 Inputs: 207 None 208 209 Outputs: 210 None 211 ''' 212 # loop on a global, so that way we can kill threads as needed 213 while self._threadsAreValid: 214 item = self.requestQueue.get(timeout=None, block=True) 215 if item is None: 216 break 217 else: 218 # is this a request for a readset 219 (thread_id, bid, gid, rpi, is_fastq) = item 220 return_queue = self.responseQueues[thread_id] 221 read_set = None 222 while read_set is None and self._threadsAreValid: 223 read_set = self.getReadSet(bid, gid, rpi, thread_id) 224 225 if read_set is None: 226 # the read_set is in use by another thread. Pop an entry 227 # off the top of the freeQueue and see if that helps 228 try: 229 (f_thread_id, 230 f_bid, 231 f_gid, 232 f_rpi) = self.freeQueue.get(block=True, 233 timeout=2) 234 try: 235 self.freeReadSet(f_bid, 236 f_gid, 237 f_rpi, 238 f_thread_id) 239 except InvalidParameterSetException: pass 240 except Queue.Empty: 241 # avoid wheel spinning 242 time.sleep(2) 243 # else, we have the RS and we're good to go 244 245 # read_set should NOT be None here 246 if read_set is None: 247 # free the thread 248 return_queue.put(None) 249 break 250 251 # let's start trying to get ready for writing 252 # filename should be set and fasta/fastq should be checked 253 return_queue.put(deepcopy(read_set)) 254 255 # this read set is now "opened". This change will 256 # be used on the next usage of the ReadSet 257 if is_fastq: 258 read_set._fastqWritten = True 259 else: 260 read_set._fastaWritten = True
261
262 - def organiseOutFiles(self, 263 prettyBamFileNames, 264 groupNames, 265 zipped, 266 interleaved, 267 mixBams, 268 mixGroups, 269 mixReads, 270 headersOnly, 271 outFolder, 272 prefix, 273 ):
274 '''Determine all the outFile prefixes needed for extraction. 275 276 The RSM manages a collection of output file objects called ReadSets 277 These are made here and placed into hashes for retrieval later. This 278 function also populates instance variables self.fnPrefix2ReadSet and 279 self.outFiles; the two main ways that ReadSets can be accessed. 280 281 File names vary wildly depending on the values of flags such as: 282 mixGroups, mixBams etc. Take care when modifying this code. 283 284 Inputs: 285 prettyBamFileNames - [ string ] simple versions of the BAM filenames 286 groupNames - [ string ] identify contig groups. EX: [bin1, bin2] 287 zipped - bool, True if the output should be zipped 288 interleaved - bool, True if the output should be interleaved 289 mixBams - bool, True if BAM file origin should be ignored 290 mixGroups - bool, True if group origin should be ignored 291 mixReads - bool, True if (un)paired distinction should be ignored 292 headersOnly - bool, True if only headers should be printed 293 outFolder - string, folder to write output to 294 prefix - string, prefix to apply to all output files 295 296 Outputs: 297 of_prefixes - { bamId : { groupId : { rpi : outFile prefix } } }, 298 this hash can be used to get the file name prefix for a 299 read set based on BAM file origin, group origin and 300 pairing information. 301 ''' 302 of_prefixes = {} 303 base_path = os.path.join(os.path.abspath(outFolder), prefix) 304 305 # we need to make a filename for every eventuality 306 for bid in range(len(prettyBamFileNames)): 307 if mixBams: 308 bam_str = "allMapped" 309 else: 310 bam_str = prettyBamFileNames[bid] 311 312 if bam_str not in self.outFiles: 313 self.outFiles[bid] = {} 314 of_prefixes[bid] = {} 315 316 for gid in range(len(groupNames)): 317 if mixGroups: 318 grp_str = "allGroups" 319 else: 320 grp_str = groupNames[gid] 321 322 if gid not in self.outFiles[bid]: 323 self.outFiles[bid][gid] = {} 324 of_prefixes[bid][gid] = {} 325 326 if prefix == "": 327 fn = "%s%s.%s" % (base_path, bam_str, grp_str) 328 else: 329 fn = "%s.%s.%s" % (base_path, bam_str, grp_str) 330 331 if mixReads: 332 # all reads should go into the one file 333 working_fn = fn + ".allReads" 334 try: 335 read_set_P = self.fnPrefix2ReadSet[working_fn] 336 except KeyError: 337 read_set_P = ReadSet(groupNames, 338 working_fn, 339 zipped=zipped, 340 headersOnly=headersOnly) 341 self.fnPrefix2ReadSet[working_fn] = read_set_P 342 read_set_S = read_set_P 343 344 elif interleaved: 345 # one file for pairs and one for singles 346 working_fn_P = fn + ".pairedReads" 347 working_fn_S = fn + ".unpairedReads" 348 try: 349 read_set_P = self.fnPrefix2ReadSet[working_fn_P] 350 except KeyError: 351 read_set_P = ReadSet(groupNames, 352 working_fn_P, 353 paired=True, 354 zipped=zipped, 355 headersOnly=headersOnly) 356 self.fnPrefix2ReadSet[working_fn_P] = read_set_P 357 358 try: 359 read_set_S = self.fnPrefix2ReadSet[working_fn_S] 360 except KeyError: 361 read_set_S = ReadSet(groupNames, 362 working_fn_S, 363 zipped=zipped, 364 headersOnly=headersOnly) 365 self.fnPrefix2ReadSet[working_fn_S] = read_set_S 366 367 else: 368 # each in their own file 369 working_fn_1 = fn + ".1" 370 working_fn_2 = fn + ".2" 371 working_fn_S = fn + ".unpairedReads" 372 try: 373 read_set_P = self.fnPrefix2ReadSet[working_fn_1] 374 except KeyError: 375 read_set_P = ReadSet(groupNames, 376 working_fn_1, 377 outPrefix2=working_fn_2, 378 paired=True, 379 zipped=zipped, 380 headersOnly=headersOnly) 381 self.fnPrefix2ReadSet[working_fn_1] = read_set_P 382 383 try: 384 read_set_S = self.fnPrefix2ReadSet[working_fn_S] 385 except KeyError: 386 read_set_S = ReadSet(groupNames, 387 working_fn_S, 388 zipped=zipped, 389 headersOnly=headersOnly) 390 self.fnPrefix2ReadSet[working_fn_S] = read_set_S 391 392 # we use the filenames to link everything up below 393 self.outFiles[bid][gid][RPI.FIR] = read_set_P 394 self.outFiles[bid][gid][RPI.SEC] = read_set_P 395 of_prefixes[bid][gid][RPI.FIR] = read_set_P.getConstFP() 396 of_prefixes[bid][gid][RPI.SEC] = read_set_P.getConstFP() 397 398 self.outFiles[bid][gid][RPI.SNGL] = read_set_S 399 self.outFiles[bid][gid][RPI.SNGL_FIR] = read_set_S 400 self.outFiles[bid][gid][RPI.SNGL_SEC] = read_set_S 401 of_prefixes[bid][gid][RPI.SNGL] = read_set_S.getConstFP() 402 of_prefixes[bid][gid][RPI.SNGL_FIR] = read_set_S.getConstFP() 403 of_prefixes[bid][gid][RPI.SNGL_SEC] = read_set_S.getConstFP() 404 405 return of_prefixes
406 407 ############################################################################### 408 ############################################################################### 409 ############################################################################### 410 ############################################################################### 411
412 -class ReadSet(object):
413 '''Container class to encapsulate the concept of a(n un)paired read set 414 415 This class manages file properties and writing functionality.''' 416
417 - def __init__(self, 418 groupNames, 419 outPrefix1, 420 outPrefix2 = None, 421 paired=False, 422 zipped=True, 423 headersOnly=False 424 ):
425 '''Default constructor. 426 427 Initializes a ReadSet instance with the provided set of properties. 428 429 Inputs: 430 groupNames - [ string ], target names, one for each target group. 431 outPrefix1 - string prefix of the first output file (read1). 432 outPrefix2 - string prefix of the second output file or None for 433 interleaved or unpaired files. 434 isPaired - bool, True if the file is a paired read file. 435 zipped - bool, True if the data should be compressed for writing. 436 headersOnly - bool, True if only headers should be printed. 437 ''' 438 # output read file properties 439 self.groupNames = groupNames 440 self.isPaired = paired 441 self.zipOutput = zipped 442 self.headersOnly = headersOnly 443 444 # prefixes of the files we'll be writing to 445 self._outPrefix1 = outPrefix1 446 # prefix2 may well be None 447 self._outPrefix2 = outPrefix2 448 449 # work out how we'll write files 450 if zipped: 451 self._writeOpen = gzip.open 452 else: 453 self._writeOpen = open 454 455 # If the RSM catches a ^C during a write operation 456 # we need a global variable to exit and kill the thread 457 # Loop as long as threads are valid 458 self._threadsAreValid = True 459 460 # we could potentially be writing both fasta and fastq 461 # these variables keep track of such things and make sure that 462 # the right data goes to the right place 463 self._fastqWritten = False 464 self._fastaWritten = False
465
466 - def getConstFP(self, fNumber=1):
467 '''return the unchanging filename prefix associated with this ReadSet 468 469 Inputs: 470 fNumber - which file prefix is needed? [1 or 2] 471 Outputs: 472 The corresponding prefix 473 ''' 474 if 1 == fNumber: 475 return self._outPrefix1 476 else: 477 return self._outPrefix2
478
479 - def determineFileSuffix(self, isFastq):
480 '''Determine the suffix of the file depending on if we're writing 481 fasta or fastq 482 483 Inputs: 484 isFastq - bool, is the file fastq (or fasta) 485 Outputs: 486 Filenames to be written to. 487 (fileName1, fileName2) 488 fileName2 will be None for unpaired or interleaved-paired files 489 ''' 490 if self.headersOnly: 491 ext = ".list" 492 else: 493 if isFastq: 494 ext = ".fq" 495 else: 496 ext = ".fna" 497 498 if self.zipOutput: 499 ext += ".gz" 500 501 file_name1 = self.getConstFP()+ext 502 file_name2 = self.getConstFP(fNumber=2) 503 if file_name2 is not None: 504 file_name2 += ext 505 506 return (file_name1, file_name2)
507
508 - def writeChain(self, 509 pBMM, 510 isFastq, 511 printQueue=None 512 ):
513 '''Write a single print chain to disk 514 515 A print chain is a linked list of mapped reads that have been 516 pre-ordered and are ready to write (or print). The print chain can 517 contain either Fasta or Fastq reads but never both. File names are 518 determined on the fly based on the presence or absence of quality info 519 of the first read in the chain (determined by the BamExtractor) and 520 passed to this function as isFastq. 521 522 NOTE: This function does NOT free any memory associated with pBMM. 523 524 Inputs: 525 pBMM - c.POINTER(BM_mappedRead_C), the start of a linked list of 526 mapped reads, pre-ordered for printing by the BamExtractor 527 isFastq - bool, True if reads have quality information. 528 printQueue - Managed by the BamExtractor. Place all printing strings 529 here. Acts as a verbose flag. 530 Outputs: 531 None 532 ''' 533 CW = CWrapper() 534 535 # reads are written (in C land) to this string buffer 536 # is 20000 bases enough for PAC-bio? 537 buffer_c = c.create_string_buffer(20000) 538 pbuffer_c = c.POINTER(c.c_char_p) 539 pbuffer_c = c.pointer(buffer_c) 540 541 # this variable records how much of the buffer is used for each read 542 str_len_c = c.c_int(0) 543 pstr_len_c = c.cast(c.addressof(str_len_c), c.POINTER(c.c_int)) 544 545 paired_c = c.c_int(1) 546 unpaired_c = c.c_int(0) 547 headers = c.c_int(self.headersOnly) 548 549 # buffer to hold the group name in C format 550 # it's a bit of a waste of time to pass a string to C only to have it 551 # passed right back, but this approach reduces complexity and makes the 552 # C code more useful, so it's preferred. 553 group_name_c = c.c_char_p() 554 555 # get the fileNames to write to 556 (out_file1, out_file2) = self.determineFileSuffix(isFastq) 557 558 # determine file write mode. This instance is likely a copy 559 # of the main one managed by the RSM. so there is no need 560 # to update the value of self._fastXWritten here. Just use it. 561 opened = False 562 if isFastq and self._fastqWritten: 563 opened = True 564 elif not isFastq and self._fastaWritten: 565 opened = True 566 567 if opened: 568 # we will append to an existing file 569 open_mode = "a" 570 mode_desc = "Appending to" 571 else: 572 # overwrite any existing file 573 open_mode = "w" 574 mode_desc = "Writing" 575 576 if self.isPaired: 577 # swap writing to file 1 and file 2. 578 # always start writing to fh1 first! 579 isFh1 = True 580 581 # open files 582 fh1 = self._writeOpen(out_file1, open_mode) 583 if out_file2 is None: 584 if printQueue: 585 printQueue.put(" %s interleaved file: %s" % (mode_desc, 586 out_file1)) 587 fh2 = fh1 588 else: 589 if printQueue: 590 printQueue.put(" %s coupled files: %s %s" % (mode_desc, 591 out_file1, 592 out_file2)) 593 fh2 = self._writeOpen(out_file2, open_mode) 594 595 # write 596 while pBMM and self._threadsAreValid: 597 # get C to write the read into the string buffer 598 group_name_c = self.groupNames[pBMM.contents.group] 599 CW._sprintMappedRead(pBMM, 600 pbuffer_c, 601 pstr_len_c, 602 group_name_c, 603 headers, 604 paired_c) 605 # unwrap the buffer and transport into python land 606 printable_string = \ 607 (c.cast(pbuffer_c, 608 c.POINTER(c.c_char*str_len_c.value)).contents).value 609 if isFh1: 610 fh1.write(printable_string) 611 isFh1 = False 612 else: 613 fh2.write(printable_string) 614 isFh1 = True 615 616 # be sure that we're going to the next PRINT read 617 pBMM = CW._getNextPrintRead(pBMM) 618 619 # and close 620 fh1.close() 621 if out_file2 is not None: 622 fh2.close() 623 else: 624 fh = self._writeOpen(out_file1, open_mode) 625 if printQueue: 626 printQueue.put(" %s unpaired file: %s (%s)" % (mode_desc, 627 out_file1, 628 self)) 629 while pBMM and self._threadsAreValid: 630 group_name_c = self.groupNames[pBMM.contents.group] 631 CW._sprintMappedRead(pBMM, 632 pbuffer_c, 633 pstr_len_c, 634 group_name_c, 635 headers, 636 unpaired_c) 637 printable_string = \ 638 (c.cast(pbuffer_c, 639 c.POINTER(c.c_char*str_len_c.value)).contents).value 640 fh.write(printable_string) 641 pBMM = CW._getNextPrintRead(pBMM) 642 643 fh.close()
644 645 ############################################################################### 646 ############################################################################### 647 ############################################################################### 648 ############################################################################### 649