Package bamm :: Module bamExtractor
[hide private]
[frames] | no frames]

Source Code for Module bamm.bamExtractor

  1  ############################################################################### 
  2  #                                                                             # 
  3  #    BamExtractor.py                                                          # 
  4  #                                                                             # 
  5  #    Class for extracting reads from BAM files                                # 
  6  #                                                                             # 
  7  #    Copyright (C) Michael Imelfort                                           # 
  8  #                                                                             # 
  9  ############################################################################### 
 10  #                                                                             # 
 11  #    This library is free software; you can redistribute it and/or            # 
 12  #    modify it under the terms of the GNU Lesser General Public               # 
 13  #    License as published by the Free Software Foundation; either             # 
 14  #    version 3.0 of the License, or (at your option) any later version.       # 
 15  #                                                                             # 
 16  #    This library is distributed in the hope that it will be useful,          # 
 17  #    but WITHOUT ANY WARRANTY; without even the implied warranty of           # 
 18  #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU        # 
 19  #    Lesser General Public License for more details.                          # 
 20  #                                                                             # 
 21  #    You should have received a copy of the GNU Lesser General Public         # 
 22  #    License along with this library.                                         # 
 23  #                                                                             # 
 24  ############################################################################### 
 25   
 26  __author__ = "Michael Imelfort" 
 27  __copyright__ = "Copyright 2014" 
 28  __credits__ = ["Michael Imelfort"] 
 29  __license__ = "LGPLv3" 
 30  __maintainer__ = "Michael Imelfort" 
 31  __email__ = "mike@mikeimelfort.com" 
 32   
 33  ############################################################################### 
 34   
 35  # system imports 
 36  import os 
 37  import ctypes as c 
 38  from multiprocessing import Manager, Process, Value 
 39  import numpy as np 
 40  import sys 
 41  import gzip 
 42  import Queue 
 43  import time 
 44  from threading import Thread 
 45   
 46  # local imports 
 47  from cWrapper import * 
 48  from bamFile import * 
 49  from bammExceptions import * 
 50  from bamRead import * 
 51   
 52  ############################################################################### 
 53  ############################################################################### 
 54  ############################################################################### 
 55  # Multiprocessing requires that all passed items be pickleable. That is they 
 56  # must be vanilla variables or functions defined in the file itself, ie. not 
 57  # within a class. We get around this by writing an external function which calls 
 58  # a class function. Hacky, but it works. 
 59  ############################################################################### 
 60  ############################################################################### 
 61  ############################################################################### 
 62   
63 -def externalExtractWrapper(threadId, 64 outFilePrefixes, 65 bamPaths, 66 prettyBamFileNames, 67 numGroups, 68 perContigGroups, 69 contigs, 70 printQueue, 71 extractQueue, 72 requestQueue, 73 freeQueue, 74 responseQueue, 75 headersOnly, 76 mixGroups, 77 minMapQual, 78 maxMisMatches, 79 ignoreSuppAlignments, 80 ignoreSecondaryAlignments, 81 verbose=False 82 ):
83 '''Single-process BAMfile read extraction. 84 85 cTypes pointers are unpickleable unless they are top level, so this function 86 lives outside the class and has 1,000,000 member variables passed to it. 87 Life would be easier if we could pass the class but any implicit copy 88 operations that follow are somewhat difficult to detect and can cause WOE. 89 Lot's of WOE, believe me... 90 91 Inputs: 92 threadId - string, a unique Id for this process / thread 93 outFilePrefixes - 3D dict for finding outFilePrefixes based on bamFile, 94 group and pairing information 95 bamPaths - { bid : string }, full paths to the BAM files 96 prettyBamFileNames - { bid : string }, short, print-friendly BAM names 97 numGroups - int, the number of groups reads are split into 98 perContigGroups - [int], contains groups Ids, insync with contigs array 99 contigs - [string], contig ids as written in the BAM 100 printQueue - Manager.Queue, thread-safe communication with users 101 extractQueue - Manager.Queue, bids (BAMs) yet to be extracted from 102 requestQueue - Manager.Queue, make requests for ReadSets for printing 103 freeQueue - Manager.Queue, tell the RSM when finished with a ReadSet 104 responseQueue - Manager.Queue, recieve copies of ReadSets from the RSM 105 headersOnly - == True -> write read headers only 106 mixGroups - == True -> use one file for all groups 107 minMapQual - int, skip all reads with a lower mapping quality score 108 maxMisMatches - int, skip all reads with more mismatches (NM aux files) 109 useSuppAlignments - == True -> skip supplementary alignments 110 useSecondaryAlignments - == True -> skip secondary alignments 111 verbose - == True -> be verbose 112 113 Outputs: 114 None 115 ''' 116 while True: 117 p_bid = extractQueue.get(block=True, timeout=None) 118 if p_bid is None: # poison pill 119 break 120 else: 121 if verbose: 122 printQueue.put("%s Preparing to extract reads from file: %s" % \ 123 (threadId, prettyBamFileNames[p_bid] ) ) 124 125 # first we need to C-ify variables 126 bamfile_c = c.c_char_p() 127 bamfile_c = bamPaths[p_bid] 128 129 pretty_name_c = c.c_char_p() 130 pretty_name_c = prettyBamFileNames[p_bid] 131 132 num_contigs = len(contigs) 133 contigs_c_array = (c.c_char_p * num_contigs)() 134 contigs_c_array[:] = contigs 135 136 groups_c_array = (c.c_uint16 * num_contigs)() 137 groups_c_array[:] = perContigGroups 138 139 headers_only_c = c.c_uint32() 140 headers_only_c = headersOnly 141 142 min_mapping_quality_c = c.c_uint32() 143 min_mapping_quality_c = minMapQual 144 145 max_mismatches_c = c.c_uint32() 146 max_mismatches_c = maxMisMatches 147 148 pBMM = c.POINTER(BM_mappedRead_C) 149 150 # call the C function to extract the reads 151 CW = CWrapper() 152 pBMM = CW._extractReads(bamfile_c, 153 contigs_c_array, 154 num_contigs, 155 groups_c_array, 156 pretty_name_c, 157 headers_only_c, 158 min_mapping_quality_c, 159 max_mismatches_c, 160 ignoreSuppAlignments, 161 ignoreSecondaryAlignments) 162 163 if verbose: 164 printQueue.put("%s Finished C-based extraction for: %s" \ 165 % (threadId, prettyBamFileNames[p_bid])) 166 printQueue.put("%s Re-ordering reads before printing" % \ 167 (threadId)) 168 169 # pBMM is one large linked list consisting of all mapped reads that 170 # could be extracted from the BAM file. We have information about 171 # the group and rpi of each read. The destination for each read is 172 # encapsulated in the structure of the chain_info hash and 173 # corresponding "storage" hash. We will re-order the linked list so 174 # that adjacent connections indicate adjacency in the output file. 175 # This is done by setting the "nextPrintRead" pointer in each BMM 176 177 overlapper = {} # keep track of readSets with the same filename 178 chain_info = {} # store start / end and count of a printing chain 179 180 # initialise the helper data structures 181 for gid in range(numGroups): 182 chain_info[gid] = {} 183 # ReadSets exist for only FIR and SNGL 184 for rpi in [RPI.FIR, RPI.SNGL]: 185 file_name = outFilePrefixes[p_bid][gid][rpi] 186 try: 187 storage = overlapper[file_name] 188 except KeyError: 189 # [start of chain, end of chain, chain length] 190 storage = [None, None, 0] 191 overlapper[file_name] = storage 192 chain_info[gid][rpi] = {'storage' : storage} 193 194 while pBMM: 195 ''' 196 USE THIS CODE TO GET THE READ ID WHEN DEBUGGING 197 buffer_c = c.create_string_buffer(20000) 198 pbuffer_c = c.POINTER(c.c_char_p) 199 pbuffer_c = c.pointer(buffer_c) 200 201 # this variable records how much of the buffer is used for each read 202 str_len_c = c.c_int(0) 203 pstr_len_c = c.cast(c.addressof(str_len_c), c.POINTER(c.c_int)) 204 205 paired_c = c.c_int(1) 206 headers = c.c_int(1) 207 208 group_name_c = c.c_char_p() 209 group_name_c = "THIS__" 210 211 CW._sprintMappedRead(pBMM, 212 pbuffer_c, 213 pstr_len_c, 214 group_name_c, 215 headers, 216 paired_c) 217 # unwrap the buffer and transport into python land 218 read_ID_debug = \ 219 (c.cast(pbuffer_c, 220 c.POINTER(c.c_char*str_len_c.value)).contents).value 221 222 read_ID_debug = read_ID_debug.split(";")[-1].rstrip() 223 ''' 224 225 # get hold of the next item in the linked list 226 rpi = pBMM.contents.rpi 227 c_rpi = RPIConv[rpi] 228 229 # we may need to add one or two reads, depending on pairing 230 # always add pairs together to keep output files in sync 231 addable = [] 232 233 if c_rpi != RPI.SEC: # RPI.FIR or RPI.SNGL 234 # append RPI.FIR and RPI.SNGL, SEC is handled below 235 addable.append([c.addressof(pBMM.contents), c_rpi]) 236 237 # use raw rpi here! 238 if rpi == RPI.FIR: 239 # We know this guys has a partner however 240 # we may need to treat this as a single read 241 # or we may have to step up the order of it's partner 242 r2_rpi = RPI.ERROR 243 244 if (1 == CW._partnerInSameGroup(pBMM)): 245 # partner is in same group. 246 # RPI.FIR and RPI.SEC ALWAYS point to the same ReadSet 247 r2_rpi = RPI.FIR 248 else: 249 # partner is in a different group 250 # we should treat as a single, unless we don't care 251 # i.e. (mixGroups == True) 252 if mixGroups: 253 # we don't care, print it now as a pair 254 # RPI.FIR and RPI.SEC ALWAYS point to same ReadSet 255 r2_rpi = RPI.FIR 256 else: 257 # we'll treat both paired reads as singles 258 r2_rpi = RPI.SNGL 259 addable[0][1] = RPI.SNGL # update this guy 260 # the storage for this rpi may remain == problems 261 262 addable.append([c.addressof((CW._getPartner(pBMM)).contents), r2_rpi]) 263 264 # update the printing chain 265 for mappedRead in addable: 266 tmp_pBMM = c.cast(mappedRead[0], c.POINTER(BM_mappedRead_C)) 267 has_qual = (tmp_pBMM.contents.qualLen != 0) 268 group = tmp_pBMM.contents.group 269 270 # set the MI code here 271 working_rpi = mappedRead[1] 272 stored_rpi = tmp_pBMM.contents.rpi 273 mi = MI.ER_EM_EG 274 if working_rpi == RPI.FIR: 275 mi = MI.PR_PM_PG 276 elif working_rpi == RPI.SNGL: 277 if stored_rpi == RPI.FIR or stored_rpi == RPI.SEC: 278 mi = MI.PR_PM_UG 279 if stored_rpi == RPI.SNGL_FIR or stored_rpi == RPI.SNGL_SEC: 280 mi = MI.PR_UM_NG 281 elif stored_rpi == RPI.SNGL: 282 mi = MI.UR_NM_NG 283 284 CW._setMICode(tmp_pBMM, mi) 285 286 #sys.stderr.write("%s -- %s\n" % (RPI2Str(working_rpi), RPI2Str(stored_rpi))) 287 288 # set and check the quality info 289 try: 290 # 'isFastq' is not set above, so on the first go 291 # this will raise a KeyError 292 if chain_info[group][working_rpi]['isFastq'] ^ has_qual: 293 # this will happen when people have merged BAMs with 294 # and without quality information 295 raise MixedFileTypesException( \ 296 "You cannot mix Fasta and Fastq reads " \ 297 "together in an output file") 298 except KeyError: 299 # Now we can set the type of the file. 300 # Only get here on the first read for each group, rpi 301 # Because of the way that the same storage object can be 302 # linked to multiple rpis, there's a chance that 303 # we won't set 'isFastq' for some rpis. Further down we 304 # need to be aware of this and just pass on the KeyError 305 # CODE==RPI_SKIP 306 chain_info[group][working_rpi]['isFastq'] = has_qual 307 308 # build or maintain the chain 309 if chain_info[group][working_rpi]['storage'][1] is None: 310 # this is the first time we've seen this print chain 311 chain_info[group][working_rpi]['storage'][0] = \ 312 mappedRead[0] 313 chain_info[group][working_rpi]['storage'][1] = \ 314 mappedRead[0] 315 chain_info[group][working_rpi]['storage'][2] = 1 316 else: 317 # join this pBMM onto the end of the existing chain 318 CW._setNextPrintRead( \ 319 c.cast(chain_info[group][working_rpi]['storage'][1], 320 c.POINTER(BM_mappedRead_C)), 321 c.cast(mappedRead[0], 322 c.POINTER(BM_mappedRead_C) 323 ) 324 ) 325 chain_info[group][working_rpi]['storage'][1] = \ 326 mappedRead[0] 327 chain_info[group][working_rpi]['storage'][2] += 1 328 329 # next! 330 pBMM = CW._getNextMappedRead(pBMM) 331 332 # Write the newly created chains to disk 333 if verbose: 334 printQueue.put("%s Re-ordering complete. Preparing to write" % \ 335 (threadId)) 336 # search the chain_info hash for printable chains 337 for gid in range(numGroups): 338 for rpi in [RPI.FIR, RPI.SNGL]: 339 if chain_info[gid][rpi]['storage'][1] is not None: 340 # if we got here then there should be a chain to print 341 pBMM_chain = \ 342 c.cast(chain_info[gid][rpi]['storage'][0], 343 c.POINTER(BM_mappedRead_C) 344 ) 345 # we need to print here, so what we will do is make a 346 # request to the RSM for a fileName etc. that we can 347 # write to. We block on this call so we may have to 348 # wait for a bit BUT... it's either this, or go single 349 # threaded. So this is what we'll do. 350 try: 351 requestQueue.put((threadId, 352 p_bid, 353 gid, 354 rpi, 355 chain_info[gid][rpi]['isFastq'])) 356 357 # wait for the RSM to return us a copy of a ReadSet 358 RS = responseQueue.get(block=True, timeout=None) 359 if RS is None: 360 # free the memory, it is useless to me! 361 CW._destroyPrintChain(pBMM_chain) 362 else: 363 # we can print stuff 364 pBMM_destroy = c.POINTER(BM_mappedRead_C) 365 pBMM_destroy = pBMM_chain 366 RS.writeChain(pBMM_chain, 367 chain_info[gid][rpi]['isFastq']) 368 CW._destroyPrintChain(pBMM_destroy) 369 370 # free the RS now 371 freeQueue.put((threadId, 372 p_bid, 373 gid, 374 rpi)) 375 376 # set this to None so it's not added twice 377 chain_info[gid][rpi]['storage'][1] = None 378 379 except KeyError: 380 # this will happen when we have chosen to mix reads. 381 # it's no problem and I can't see that it hides any 382 # other bug. The "best" way to handle this is to set 383 # up a new variable that works out if we've set the 384 # 'isFastq' for a particular group and rpi. But this 385 # is really the same as checking chain_info[gid][rpi] 386 # for a KeyError here. So this is what we'll do... 387 # see: CODE==RPI_SKIP 388 pass 389 390 if verbose: 391 printQueue.put("%s Read extraction complete for file: %s" % \ 392 (threadId, prettyBamFileNames[p_bid]) 393 )
394 395 ############################################################################### 396 ############################################################################### 397 ############################################################################### 398 ############################################################################### 399
400 -class BamExtractor:
401 '''Class used to manage extracting reads from multiple BAM files'''
402 - def __init__(self, 403 contigs, 404 bamFiles, 405 prefix="", 406 groupNames=[], 407 outFolder=".", 408 mixBams=False, 409 mixGroups=False, 410 mixReads=False, 411 interleaved=False, 412 bigFile=False, 413 headersOnly=False, 414 minMapQual=0, 415 maxMisMatches=1000, 416 useSuppAlignments=False, 417 useSecondaryAlignments=False, 418 ):
419 ''' 420 Default constructor. 421 422 Set all the instance variables, make ReadSets, organise output files 423 424 Inputs: 425 contigs - [[string]], list of list contig IDs (used as a filter) 426 bamFiles - [string], list of bamfiles to extract reads from 427 prefix - string, append this string to the start of all output files 428 groupNames - [string], list of names of the groups in the contigs list 429 outFolder - path, write output to this folder 430 mixBams - == True -> use one file for all bams 431 mixGroups - == True -> use one file for all groups 432 mixReads - == True -> use one file for paired / unpaired reads 433 interleaved - == True -> use interleaved format for paired reads 434 bigFile - == True -> do NOT gzip outputs 435 headersOnly - == True -> write read headers only 436 minMapQual - int, skip all reads with a lower mapping quality score 437 maxMisMatches - int, skip all reads with more mismatches (NM aux files) 438 useSuppAlignments - == True -> DON'T skip supplementary alignments 439 useSecondaryAlignments - == True -> DON'T skip secondary alignments 440 441 Outputs: 442 None 443 ''' 444 # make sure the output folder exists 445 self.outFolder = outFolder 446 # it's a waste if we abort but I like to check if write permissions 447 # are intact before I do lots of work. 448 self.makeSurePathExists(self.outFolder) 449 450 self.bamFiles = bamFiles 451 self.prettyBamFileNames = [os.path.basename(bam).replace(".bam", "") 452 for bam in self.bamFiles] 453 454 self.prefix = prefix 455 456 self.mixBams = mixBams 457 self.mixGroups = mixGroups 458 self.mixReads = mixReads 459 460 self.interleaved = interleaved 461 if headersOnly: 462 self.headersOnly = 1 463 else: 464 self.headersOnly = 0 465 466 self.minMapQual = minMapQual 467 self.maxMisMatches = maxMisMatches 468 469 if useSuppAlignments: 470 self.ignoreSuppAlignments = 0 471 else: 472 self.ignoreSuppAlignments = 1 473 474 if useSuppAlignments: 475 self.ignoreSecondaryAlignments = 0 476 else: 477 self.ignoreSecondaryAlignments = 1 478 479 # are we going to zip the output? 480 if bigFile: 481 self.zipped = False 482 else: 483 self.zipped = True 484 485 # munge the groups 486 if groupNames == []: 487 # no names specified, just use "group_1", "group_2" etc... 488 groupNames = ["group_%d" % i for i in range(1, len(contigs)+1)] 489 self.groupNames = groupNames 490 491 # initialise to the first set of groups 492 self.contigs = contigs[0] 493 self.perContigGroups = [0]*len(self.contigs) 494 495 for i in range(1, len(contigs)): 496 self.contigs += contigs[i] 497 self.perContigGroups += [i] * len(contigs[i]) 498 499 self.manager = Manager() 500 501 # make sure printing to stdout is handled in a threadsafe manner 502 self.outputStream = sys.stderr 503 self.printQueue = self.manager.Queue() 504 self.printDelay = 0.5 # delay between checks for new print statements 505 506 self.RSM = ReadSetManager(self.manager) 507 508 # make sure the RSM can talk to us 509 self.RSM.setPrintQueue(self.printQueue) 510 511 self.outFilePrefixes= self.RSM.organiseOutFiles(self.prettyBamFileNames, 512 self.groupNames, 513 self.zipped, 514 self.interleaved, 515 self.mixBams, 516 self.mixGroups, 517 self.mixReads, 518 self.headersOnly, 519 self.outFolder, 520 self.prefix) 521 522 523 ''' 524 for bid in range(len(self.bamFiles)): 525 for gid in range(len(self.groupNames)): 526 for rpi in [RPI.FIR, RPI.SEC, RPI.SNGL, RPI.SNGL_FIR, RPI.SNGL_SEC]: 527 sys.stderr.write("%s %s %s %s\n" % (self.prettyBamFileNames[bid], self.groupNames[gid], RPI2Str(rpi), str(self.outFilePrefixes[bid][gid][rpi]))) 528 '''
529
530 - def extract(self, threads=1, verbose=False):
531 '''Start extracting reads from the BAM files 532 533 This function is responsible for starting and stopping all threads and 534 processes used in bamm extract. Due to python multiprocessing's need to 535 pickle everything the actual work of extraction is carried out in the 536 first level function called externalExtractWrapper. See there for actual 537 extraction details. This function is primarily concerned with thread 538 and process management. 539 540 Inputs: 541 threads - int, the number of threads / processes to use 542 verbose - bool, True if lot's of stuff should be printed to screen 543 544 Outputs: 545 None 546 ''' 547 # make a queue containing all the bids to extract reads from 548 extract_queue = self.manager.Queue() 549 for bid in range(len(self.bamFiles)): 550 extract_queue.put(bid) 551 552 # place one None on the extract queue for each thread we have access to 553 # AKA poison pill 554 for _ in range(threads): 555 extract_queue.put(None) 556 557 # each thread gets a unique identifier 558 thread_ids = ["Thread_%s" % str(tt) for tt in range(threads)] 559 560 # start the Queue management processes and threads 561 562 # printing process 563 print_process = Process(target=self.managePrintQueue) 564 print_process.start() 565 566 # several threads for writing to disk 567 request_management_threads = [Thread(target=self.RSM.manageRequests) 568 for _ in range(threads)] 569 for w in request_management_threads: 570 w.start() 571 572 # each thread gets its own queue for recieving ReadSet instances on 573 response_queues = dict(zip(thread_ids, 574 [self.manager.Queue() for _ in range(threads)] 575 ) 576 ) 577 # The RSM is waiting wor this queue too 578 self.RSM.setResponseQueues(response_queues) 579 580 # start the machine 581 try: 582 # make the extraction processes 583 extract_proc = [Process(target=externalExtractWrapper, 584 args=(thread_ids[tt], 585 self.outFilePrefixes, 586 self.bamFiles, 587 self.prettyBamFileNames, 588 len(self.groupNames), 589 self.perContigGroups, 590 self.contigs, 591 self.printQueue, 592 extract_queue, 593 self.RSM.requestQueue, 594 self.RSM.freeQueue, 595 response_queues[thread_ids[tt]], 596 self.headersOnly, 597 self.mixGroups, 598 self.minMapQual, 599 self.maxMisMatches, 600 self.ignoreSuppAlignments, 601 self.ignoreSecondaryAlignments, 602 verbose 603 ) 604 ) 605 for tt in range(threads)] 606 607 # start the extraction processes 608 for p in extract_proc: 609 p.start() 610 611 for p in extract_proc: 612 p.join() 613 614 # stop any rogue file writing action 615 self.RSM.invalidateThreads() 616 for w in request_management_threads: 617 self.RSM.requestQueue.put(None) 618 for w in request_management_threads: 619 w.join() 620 621 # stop the printer 622 self.printQueue.put(None) 623 print_process.join() 624 625 # success 626 return 0 627 628 except: 629 # ctrl-c! Make sure all processes are terminated 630 sys.stderr.write("\nEXITING...\n") 631 632 for p in extract_proc: 633 p.terminate() 634 635 # stop any rogue file writing action 636 self.RSM.invalidateThreads() 637 for w in request_management_threads: 638 self.RSM.requestQueue.put(None) 639 for w in request_management_threads: 640 w.join() 641 642 # stop the printer 643 print_process.terminate() 644 645 # dismal failure 646 return 1
647
648 - def managePrintQueue(self):
649 '''Write all the print requests to stdout / stderr 650 651 This function is run as a process and so can be terminated. 652 Place a None on the printQueue to terminate the process. 653 654 Change self.outputStream to determine where text will be written to. 655 656 Inputs: 657 None 658 659 Outputs: 660 None 661 ''' 662 while True: 663 stuff = self.printQueue.get(timeout=None, block=True) 664 if stuff is None: 665 break 666 else: 667 self.outputStream.write("%s\n" % stuff)
668
669 - def makeSurePathExists(self, path):
670 '''Make sure that a path exists, make it if necessary 671 672 Inputs: 673 path - string, full or relative path to create 674 675 Outputs: 676 None 677 ''' 678 try: 679 os.makedirs(path) 680 except OSError as exception: 681 import errno 682 if exception.errno != errno.EEXIST: 683 raise
684 685 ############################################################################### 686 ############################################################################### 687 ############################################################################### 688 ############################################################################### 689