The following 50 code examples, extracted from open-source Python projects, illustrate how to use pysam.AlignmentFile().
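As a baseline for the examples below, here is a minimal sketch of the pattern most of them share: open a file, iterate reads, inspect read attributes. The file name and region are hypothetical, and fetch() assumes the BAM is coordinate-sorted and indexed.

import pysam

# "rb" opens a BAM for reading; "r" would open SAM and "rc" CRAM.
with pysam.AlignmentFile("example.bam", "rb") as bam:
    # fetch() yields reads overlapping the region (requires a .bai index).
    for read in bam.fetch("chr1", 10000, 10100):
        if read.is_unmapped or read.is_secondary:
            continue
        print(read.query_name, read.reference_start, read.mapping_quality)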
def readBAM(BAMFile, contigs):
    """
    Parses an aligned BAM file for coverage.

    Args:
        BAMFile: The BAM file to parse.
        contigs: List of sidr.common.Contigs taken from input FASTA.
    Returns:
        contigs: Input contigs updated with coverage, measured as an
            average over the whole contig.
    """
    alignment = pysam.AlignmentFile(BAMFile, "rb")
    click.echo("Reading BAM file")
    with click.progressbar(contigs) as ci:
        for contig in ci:
            covArray = []  # coverage over contig = sum(coverage per base)/number of bases
            for pile in alignment.pileup(region=str(contig)):
                covArray.append(pile.nsegments)
            try:
                contigs[contig].variables["Coverage"] = (sum(covArray) / len(covArray))
            except ZeroDivisionError:  # should only occur if 0 coverage recorded
                contigs[contig].variables["Coverage"] = 0
    return contigs
def __init__(self, filename, doubled):
    self.filename = filename
    self.doubled = doubled
    # determine if the file is bam or sam
    self.filetype = os.path.splitext(self.filename)[1]
    # throw an error if the file is not bam or sam
    if self.filetype not in ['.bam']:
        raise Exception("""You have provided a file with an extension other than '.bam',
                        please check your command-line arguments""")
    # now make sure there is an index file for the bam file
    if not os.path.exists("{}.bai".format(self.filename)):
        raise Exception("""Your .bam file is there, but it isn't indexed and there isn't
                        a .bai file to go with it. Use 'samtools index <yourfile>.bam' to fix it.""")
    # now open the file and just call it a sambam file
    filetype_dict = {'.sam': '', '.bam': 'b'}
    self.sambam = pysam.AlignmentFile(self.filename, "r{}".format(filetype_dict[self.filetype]))
    self.seqlength = self.sambam.lengths[0]
    self.true_seqlength = self.seqlength if not self.doubled else int(self.seqlength / 2)
    self.raw_depthmap = self.get_depthmap()
    self.features = self.parse()
    self.features.sort_values(by=['POS', 'MAPLEN'], ascending=[True, False], inplace=True)
    self.features.reset_index(inplace=True)
    self.features.drop('index', 1, inplace=True)
def parse_fusion_bam(bam_f):
    fusions = {}
    bam = pysam.AlignmentFile(bam_f, 'rb')
    for read in bam:
        if read.is_secondary:  # not the primary alignment
            continue
        if not read.has_tag('XF'):  # not fusion junctions
            continue
        chr1, chr2 = read.get_tag('XF').split()[1].split('-')
        if chr1 != chr2:  # not on the same chromosome
            continue
        strand = '+' if not read.is_reverse else '-'
        if read.query_name not in fusions:  # first fragment
            fusions[read.query_name] = [chr1, strand, read.reference_start, read.reference_end]
        else:  # second fragment
            if chr1 == fusions[read.query_name][0] \
                    and strand == fusions[read.query_name][1]:
                yield [chr1, strand, read.reference_start, read.reference_end]
                yield fusions[read.query_name]
    bam.close()
def test_init_2(tmpdir):
    "it should open sam file if provided"
    make_bam(tmpdir.strpath, """
    123456789_123456789_
    r1 + ...........
    r1 - ......*....
    r2 + .........*.
    r2 - .....*.......
    """)
    o = Namespace(query="test.vcf", cfdna=tmpdir.join("test.bam").strpath,
                  gdna=None, output=None)
    init(o)
    assert isinstance(o.cfdna, AlignmentFile)
    assert o.gdna == None
def test_get_reads_1(tmpdir):
    "it should get all but only the reads that cover the given position"
    make_bam(tmpdir.strpath, """
    123456789_123456789_12
    r1 + ...........
    r1 - ......*....
    r2 + .........*.
    r2 - .....*.......
    r3 + ...........
    r3 - ....*......
    r4 + ...........
    r4 - ...........
    123456789_123456789_12
    """)
    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)
    assert sum(1 for _ in get_reads(o, sam, 'ref', '4')) == 2
    assert sum(1 for _ in get_reads(o, sam, 'ref', '12')) == 7
    assert sum(1 for _ in get_reads(o, sam, 'ref', '20')) == 2
def test_get_reads_2(tmpdir):
    "it should read properties correctly"
    make_bam(tmpdir.strpath, """
    123456789_123
    r1 + ...*.......
    r1 - .*.........
    """)
    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)
    r = next(get_reads(o, sam, 'ref', '4'))
    assert r[0] == "r1"    # name
    assert r[3] == 0       # 0-based pos
    assert r[4] == 11      # length
    assert r[5] == -1      # mismatch, not calculated
    assert r[6] == 2       # mate pos
    assert r[7] == 13      # template length
    assert r[8] == False   # is_reverse
    assert r[9] == True    # paired and mapped
def test_pad_softclip_3(tmpdir):
    "it should pad softclipped bases"
    make_bam(tmpdir.strpath, """
    123456789_123
    r1 + __.*.......
    r1 - .*.........
    r2 - ...*.......
    r2 + .*.......__
    """)
    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)
    adjusted_pos = pad_softclip(sam)
    assert adjusted_pos["r1"] == (0, 13)  # 0-based position
    assert adjusted_pos["r2"] == (0, 13)
def init(o):
    if o.cfdna == None and o.gdna == None:
        raise Exception("At least one of --cfdna and --gdna should be specified")
    if o.cfdna != None:
        o.cfdna = AlignmentFile(o.cfdna, "rb")
        if not o.cfdna.has_index():
            raise Exception("Index not found, use `samtools index` to generate")
    if o.gdna != None:
        o.gdna = AlignmentFile(o.gdna, "rb")
        if not o.gdna.has_index():
            raise Exception("Index not found, use `samtools index` to generate")
    if o.output == None:
        basename, extname = splitext(o.query)
        o.output = basename + "_MrBam" + extname
def go(args):
    bed = read_bed_file(args.bedfile)
    infile = pysam.AlignmentFile(args.alignment, "rb")
    for s in infile:
        #print s.get_aligned_pairs()
        #print ">%s\n%s" % (s.query_name, s.query_alignment_sequence)
        p1 = find_primer(bed, s.reference_start, '+')
        p2 = find_primer(bed, s.reference_end, '-')
        primer_start = p1[2]['start']  # start is the 5'
        primer_end = p2[2]['start']
        query_align_start = find_query_pos(s, primer_start)
        query_align_end = find_query_pos(s, primer_end)
        print >> sys.stderr, "%s\t%s\t%s\t%s" % (
            primer_start, primer_end, primer_end - primer_start, s.query_length)
        startpos = max(0, query_align_start - 40)
        endpos = min(query_align_end + 40, s.query_length)
        print ">%s\n%s" % (s.query_name, s.query_sequence[startpos:endpos])  # query_align_end + 30])
def stride(self, output_filename, stride=10, shift=0, random=False):
    """Write a subset of reads to BAM output

    :param str output_filename: name of output file
    :param int stride: optional, number of reads to read to output one read
    :param int shift: number of reads to ignore at the beginning of input file
    :param bool random: if True, at each step the read to output is randomly selected
    """
    assert output_filename != self.filename, \
        "output filename should be different from the input filename"
    self.reset()
    with pysam.AlignmentFile(output_filename, "wb", template=self.data) as fh:
        if random:
            shift = np.random.randint(stride)
        for i, read in enumerate(self.data):
            if (i + shift) % stride == 0:
                fh.write(read)
                if random:
                    shift = np.random.randint(stride)
def filter_length(self, output_filename, threshold_min=0, threshold_max=np.inf):
    """Select and Write reads within a given range

    :param str output_filename: name of output file
    :param int threshold_min: minimum length of the reads to keep
    :param int threshold_max: maximum length of the reads to keep
    """
    assert threshold_min < threshold_max
    assert output_filename != self.filename, \
        "output filename should be different from the input filename"
    self.reset()
    with pysam.AlignmentFile(output_filename, "wb", template=self.data) as fh:
        for read in self.data:
            if ((read.query_length > threshold_min) &
                    (read.query_length < threshold_max)):
                fh.write(read)
def filter_bool(self, output_filename, mask):
    """Select and Write reads using a mask

    :param str output_filename: name of output file
    :param list mask: True to write read to output, False to ignore it
    """
    assert output_filename != self.filename, \
        "output filename should be different from the input filename"
    assert len(mask) == self._N, \
        "list of bool must be the same size as BAM file"
    self.reset()
    with pysam.AlignmentFile(output_filename, "wb", template=self.data) as fh:
        for read, keep in zip(self.data, mask):
            if keep:
                fh.write(read)
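The three methods above (stride, filter_length, filter_bool) share one pattern: re-open the input, then write a selected subset through an AlignmentFile opened with template=. As a standalone illustration, here is a minimal two-pass sketch of mask-based filtering using pysam alone; the file names and the MAPQ >= 30 cutoff are assumptions for the example, not taken from the projects above.

import pysam

# First pass: build a boolean mask, one entry per read, in file order.
# check_sq=False tolerates headers without @SQ lines (e.g. unaligned BAMs).
with pysam.AlignmentFile("input.bam", "rb", check_sq=False) as bam:
    mask = [read.mapping_quality >= 30 for read in bam]

# Second pass: write only the reads flagged True, mirroring filter_bool().
with pysam.AlignmentFile("input.bam", "rb", check_sq=False) as bam, \
        pysam.AlignmentFile("high_mapq.bam", "wb", template=bam) as out:
    for read, keep in zip(bam, mask):
        if keep:
            out.write(read)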
def get_bam_pairs(bams):
    """
    Function returns pairs of bam files because sample ID relies on
    samples being encoded with a '.'
    @bams A List of Bam files.
    """
    ### TODO Use the read group to merge the reads, far smarter!
    bam_list = {}
    for bam in bams:
        sam = pysam.AlignmentFile(bam, 'rb')
        sample_id = (sam.header['RG'][0]['SM'])
        try:
            bam_list[sample_id].append(bam)
        except KeyError:
            bam_list[sample_id] = [bam]
    return bam_list
def merge_reads(bams, merged_output_dir):
    """
    Merge bam files, can take odd numbers of reads.
    """
    try:
        os.mkdir(merged_output_dir)
    except OSError:
        pass
    bam_list = get_bam_pairs(bams)
    for sample_id, bams in bam_list.items():
        header = get_header(bams[0])
        bam_out = os.path.join(merged_output_dir, os.path.basename(sample_id)) + '.bam'
        out = pysam.AlignmentFile(bam_out, 'wb', header=header)
        for bam in bams:
            sam = pysam.AlignmentFile(bam, 'rb')
            for read in sam:
                out.write(read)
        out.close()
        pysam.sort(bam_out, 'tmp')
        os.rename('tmp.bam', bam_out)
        pysam.index(bam_out)
def getUnDictRatio(bamF, start, end, tcr, unDict):
    unMappedCount = 0
    usedArr = []
    mappedFile = pysam.AlignmentFile(bamF, "rb")
    readsIter = mappedFile.fetch(tcr, start, end)
    for read in readsIter:
        if read.is_read1:
            newName = read.query_name + '_1'
        else:
            newName = read.query_name + '_2'
        if newName not in usedArr:
            usedArr.append(newName)
            if newName in unDict:
                unMappedCount += 1
    mappedFile.close()
    return (float(float(unMappedCount) / len(unDict)), unMappedCount)
def bam_to_alignments(truth_bam, ref_name, start=None, end=None):
    """Create list of TruthAlignment objects from a bam of Truth aligned to ref.

    :param truth_bam: (sorted indexed) bam with true sequence aligned to reference
    :param ref_name: name of reference to process
    :param start: starting position within reference
    :param end: ending position within reference
        (all alignments with any overlap with the interval start:end
        will be retrieved)
    :returns: tuple(positions, encoded_label_array)

        - positions: numpy structured array with 'ref_major'
          (reference position index) and 'ref_minor'
          (trailing insertion index) fields.
        - feature_array: 1D numpy array of encoded labels
    """
    with pysam.AlignmentFile(truth_bam, 'rb') as bamfile:
        aln_reads = bamfile.fetch(reference=ref_name, start=start, end=end)
        alignments = [TruthAlignment(r) for r in aln_reads
                      if not (r.is_unmapped or r.is_secondary)]
        alignments.sort(key=attrgetter('start'))
    return alignments
def get_ref_len_from_bam(bam_path, target_contig):
    """
    Fetch the length of a given reference sequence from a
    :py:class:`pysam.AlignmentFile`.

    Parameters
    ----------
    bam_path : str
        Path to the BAM alignment
    target_contig : str
        The name of the contig for which to recover haplotypes.

    Returns
    -------
    end_pos : int
        The 1-indexed genomic position at which to stop considering variants.
    """
    bam = pysam.AlignmentFile(bam_path)
    end = bam.lengths[bam.get_tid(target_contig)]
    bam.close()
    return end
def write_anomalous_read_to_bam(bam, split_reads, span_reads, anom_reads, out):
    print('Writing anom reads to file')
    split_reads = np.unique(split_reads['query_name'])
    span_reads = np.unique(span_reads['query_name'])
    anom_reads = np.unique(anom_reads['query_name'])
    # need to filter out any reads that were at any point marked as valid supporting reads
    anom_reads = np.array([x for x in anom_reads if x not in split_reads])
    anom_reads = np.array([x for x in anom_reads if x not in span_reads])
    bamf = pysam.AlignmentFile(bam, "rb")
    index = pysam.IndexedReads(bamf)
    index.build()
    anom_bam = pysam.AlignmentFile("%s_anom_reads.bam" % out, "wb", template=bamf)
    for read_name in anom_reads:
        for read in index.find(read_name):
            anom_bam.write(read)
    anom_bam.close()
def isPaired(bamfile, alignments=1000):
    '''check if a *bamfile* contains paired end data

    The method reads at most the first *alignments* and returns True
    if any of the alignments are paired.
    '''
    samfile = pysam.AlignmentFile(bamfile)
    n = 0
    for read in samfile:
        if read.is_paired:
            break
        n += 1
        if n == alignments:
            break
    samfile.close()
    return n != alignments
def estimateInsertSizeDistribution(bamfile, alignments=50000):
    '''estimate insert size from first alignments in bam file.

    returns mean and stddev of insert sizes.
    '''
    assert isPaired(bamfile), \
        'can only estimate insert size from paired bam files'
    samfile = pysam.AlignmentFile(bamfile)
    # only get positive to avoid double counting
    inserts = numpy.array(
        [read.tlen for read in samfile.head(alignments)
         if read.is_proper_pair and read.tlen > 0])
    insert_mean, insert_std = numpy.mean(inserts), numpy.std(inserts)
    print('Insert mean of %f, with standard deviation of %f inferred' %
          (insert_mean, insert_std))
    if insert_mean > 10000 or insert_std > 1000 or \
            insert_mean < 1 or insert_std < 1:
        print('''WARNING: anomalous insert sizes detected. Please
        double check or consider setting values manually.''')
    return insert_mean, insert_std
def retrieve_loc_reads(sv, bam, max_dep, threshold):
    bp_dtype = [('chrom', 'S20'), ('start', int), ('end', int), ('dir', 'S1')]
    sv_id, chr1, pos1, dir1, chr2, pos2, dir2, \
        sv_class, orig_id, orig_pos1, orig_pos2 = [h[0] for h in dtypes.sv_dtype]
    bp1 = np.array((sv[chr1], sv[pos1] - (threshold * 2),
                    sv[pos1] + (threshold * 2), sv[dir1]), dtype=bp_dtype)
    bp2 = np.array((sv[chr2], sv[pos2] - (threshold * 2),
                    sv[pos2] + (threshold * 2), sv[dir2]), dtype=bp_dtype)
    bamf = pysam.AlignmentFile(bam, "rb")
    loc1_reads, err_code1 = count.get_loc_reads(bp1, bamf, max_dep)
    loc2_reads, err_code2 = count.get_loc_reads(bp2, bamf, max_dep)
    bamf.close()
    sv_class = str(sv['classification'])
    if err_code1 == 1 or err_code2 == 1:
        sv['classification'] = 'HIDEP' if sv_class == '' else sv_class + ';HIDEP'
    elif err_code1 == 2 or err_code2 == 2:
        sv['classification'] = 'READ_FETCH_FAILED' if sv_class == '' \
            else sv_class + ';READ_FETCH_FAILED'
    return sv, loc1_reads, loc2_reads, err_code1, err_code2
def pysam_open(alignment_file, in_format='BAM'):
    """Open SAM/BAM file using pysam.

    :param alignment_file: Input file.
    :param in_format: Format (SAM or BAM).
    :returns: pysam.AlignmentFile
    :rtype: pysam.AlignmentFile
    """
    if in_format == 'BAM':
        mode = "rb"
    elif in_format == 'SAM':
        mode = "r"
    else:
        raise Exception("Invalid format: {}".format(in_format))
    aln_iter = pysam.AlignmentFile(alignment_file, mode)
    return aln_iter
def count_ref_reads(bampath, reference, chrom, start, end):
    ref_counts = numpy.zeros(end - start)
    non_ref_counts = numpy.zeros(end - start)
    bam = pysam.AlignmentFile(bampath)
    # stepper = "all" skips dupe, unmapped, secondary, and qcfail reads
    start = max(0, start)
    for col in bam.pileup(chrom, start, end, truncate=True, stepper="all"):
        refnuc = reference.fasta[chrom][col.reference_pos].upper()
        nuc_counts = collections.Counter()
        for read in col.pileups:
            if read.query_position is None:
                # nuc_counts["indel"] += 1
                pass
            else:
                nuc_counts[read.alignment.query_sequence[read.query_position]] += 1
        ref_counts[col.reference_pos - start] = nuc_counts[refnuc]
        non_ref_counts[col.reference_pos - start] = sum(nuc_counts.values()) - nuc_counts[refnuc]
    return ref_counts, non_ref_counts
def get_bam_coverages(options, sample, dataset):
    window_skip = int(1e6)
    window_size = 1000
    skip_at_ends = int(1e6)
    bam = pysam.AlignmentFile(dataset.bam)
    print "getting coverages"
    coverages = []
    count = 0
    for chrom, start, end in get_search_regions(options.reference,
                                                window_skip, window_size,
                                                skip_at_ends):
        coverages.append(bam.count(chrom, start, end))
        if count % 1000 == 0:
            print count
        count += 1
    return coverages
def dump(bamstream, refrseqs=None, upint=50000, logstream=sys.stderr):
    """
    Parse read alignments in BAM/SAM format.

    - bamstream: open file handle to the BAM/SAM file input
    - refrseqs: dictionary of reference sequences, indexed by sequence ID;
      if provided, perfect matches to the reference sequence will be discarded
    - upint: update interval for progress indicator
    - logstream: file handle to which progress indicator will write output
    """
    bam = pysam.AlignmentFile(bamstream, 'rb')
    for i, record in enumerate(bam, 1):
        if i % upint == 0:  # pragma: no cover
            print('...processed', i, 'records', file=logstream)
        if record.is_secondary or record.is_supplementary:
            continue
        if refrseqs and perfectmatch(record, bam, refrseqs):
            continue
        rn = readname(record)
        yield screed.Record(name=rn, sequence=record.seq, quality=record.qual)
def bam_reader(bam_file, xs_tag):
    global merge_data
    bam = pysam.AlignmentFile(bam_file, "rb")
    filtered_data = []
    chromosomes = bam.references
    for chrom in chromosomes:
        data = bam.fetch(chrom, multiple_iterators=True)
        for entry in data:
            try:
                chrom_bam = bam.get_reference_name(entry.reference_id)
            except:
                continue
            if (entry.is_unmapped == False and chrom_bam == chrom):
                if xs_tag and entry.has_tag("XS"):
                    continue
                else:
                    filtered_data = get_bam_filter(entry, chrom_bam)
                    chromosome_info(filtered_data)
        print_results(merge_data)
        merge_data = []
    bam.close()
def _prepare_inputs(ma_fn, bam_file, out_dir):
    """
    Convert to fastq with counts
    """
    fixed_fa = os.path.join(out_dir, "file_reads.fa")
    count_name = dict()
    with file_transaction(fixed_fa) as out_tx:
        with open(out_tx, 'w') as out_handle:
            with open(ma_fn) as in_handle:
                h = in_handle.next()
                for line in in_handle:
                    cols = line.split("\t")
                    name_with_counts = "%s_x%s" % (cols[0], sum(map(int, cols[2:])))
                    count_name[cols[0]] = name_with_counts
                    print >>out_handle, ">%s\n%s" % (name_with_counts, cols[1])
    fixed_bam = os.path.join(out_dir, "align.bam")
    bam_handle = pysam.AlignmentFile(bam_file, "rb")
    with pysam.AlignmentFile(fixed_bam, "wb", template=bam_handle) as out_handle:
        for read in bam_handle.fetch():
            read.query_name = count_name[read.query_name]
            out_handle.write(read)
    return fixed_fa, fixed_bam
def get_read_refs_from_SAM(self):
    read_refs = {}
    read_lens = {}
    samfile = pysam.AlignmentFile(self.opts.sam, "rb")
    for k, aln in enumerate(samfile.fetch()):
        if aln.mapq == 254:
            read = "/".join(aln.query_name.split("/")[:2])
            read = "_".join(read.split("_")[1:])
            # ref = aln.reference_name.split("|")[0]
            ref = slugify(aln.reference_name)
            read_refs[read] = ref
            read_lens[read] = len(aln.seq)
    # Write the read_refs file to match the readnames file
    readnames = np.loadtxt(os.path.join(self.opts.tmp, self.fns["read_names"]), dtype="str")
    f_names = open(os.path.join(self.opts.tmp, self.fns["read_refs"]), "wb")
    f_lens = open(os.path.join(self.opts.tmp, self.fns["read_lengths"]), "wb")
    for readname in readnames:
        try:
            f_names.write("%s\n" % read_refs[readname])
        except KeyError:
            f_names.write("unknown\n")
        try:
            f_lens.write("%s\n" % read_lens[readname])
        except KeyError:
            f_lens.write("0\n")
    f_names.close()
    f_lens.close()
def test_pad_softclip_1(tmpdir):
    "it should memoize the result"
    make_bam(tmpdir.strpath, """
    r1 + __.*.......
    r1 - .*.......__
    """)
    o = Namespace(verbos=False, mismatch_limit=-1)
    sam = AlignmentFile(tmpdir.join("test.bam").strpath)
    a = pad_softclip(sam)
    b = pad_softclip(sam)
    assert a is b
def BAM2FASTQ(input_file=bam_in):
    '''
    Extract paired FASTQ records from a BAM.

    :return:
    '''
    import pysam
    samfile = pysam.AlignmentFile(input_file, "rb")  # Open file handle
    # Iterate through all reads
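The example above stops before the iteration itself. A minimal sketch of one way to finish it, assuming paired-end data and a name-sorted BAM so that mates arrive consecutively; the function name and output file names are hypothetical, not from the original project.

import pysam

def bam_to_fastq_pairs(bam_in, out1="reads_1.fastq", out2="reads_2.fastq"):
    # Assumes the BAM is sorted by read name (samtools sort -n) and paired.
    with pysam.AlignmentFile(bam_in, "rb") as samfile, \
            open(out1, "w") as fq1, \
            open(out2, "w") as fq2:
        for read in samfile:
            if read.is_secondary or read.is_supplementary:
                continue
            fq = fq1 if read.is_read1 else fq2
            # get_forward_sequence()/get_forward_qualities() undo the
            # reverse-complement stored for reverse-strand alignments.
            quals = "".join(chr(q + 33) for q in read.get_forward_qualities())
            fq.write("@{}\n{}\n+\n{}\n".format(
                read.query_name, read.get_forward_sequence(), quals))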
def samfile_from_args(args):
    return AlignmentFile(args.bam)
def load_bam(bam_path):
    return AlignmentFile(data_path(bam_path))
def __init__(self, filename):
    """
    :param str filename: input BAM file
    """
    self.filename = filename
    self.data = pysam.AlignmentFile(filename, check_sq=False)
    self._N = None
    self._df = None
    self._nb_pass = None
def reset(self):
    self.data.close()
    self.data = pysam.AlignmentFile(self.filename, check_sq=False)
def random_selection(self, output_filename, nreads=None,
                     expected_coverage=None, reference_length=None):
    """Select random reads

    :param nreads: number of reads to select randomly. Must be less than
        number of available reads in the original file.
    :param expected_coverage:
    :param reference_length:

    If expected_coverage and reference_length are provided, nreads is
    replaced automatically.
    """
    assert output_filename != self.filename, \
        "output filename should be different from the input filename"
    self.reset()
    if expected_coverage and reference_length:
        mu = self.stats['mean']
        nreads = int(expected_coverage * reference_length / mu)
    assert nreads < len(self), "nreads parameter larger than actual number of reads"
    selector = random.sample(range(len(self)), nreads)
    logger.info("Creating a pacbio BAM file with {} reads".format(nreads))
    with pysam.AlignmentFile(output_filename, "wb", template=self.data) as fh:
        for i, read in enumerate(self.data):
            if i in selector:
                fh.write(read)
def get_header(bam):
    """
    Return the BAM header.
    """
    return pysam.AlignmentFile(bam, 'rb').header
def findReadsAndSegments(samF, mappedReadsDict, idNameDict, chain):
    samFile = pysam.AlignmentFile(samF, 'r')
    readsIter = samFile.fetch(until_eof=True)
    for read in readsIter:
        if read.is_unmapped == False:
            seg = samFile.getrname(read.reference_id)
            if seg in idNameDict:
                if idNameDict[seg].find(chain) != -1:
                    readName = read.query_name
                    if readName not in mappedReadsDict:
                        mappedReadsDict[readName] = []
                    if seg not in mappedReadsDict[readName]:
                        mappedReadsDict[readName].append(seg)
    samFile.close()
    return mappedReadsDict
def findCountsInRegion(bamF, start, end, tcr):
    readsArr = []
    mappedFile = pysam.AlignmentFile(bamF, "rb")
    readsIter = mappedFile.fetch(tcr, start, end)
    for read in readsIter:
        if read.is_read1:
            newName = read.query_name + '_1'
        else:
            newName = read.query_name + '_2'
        if newName not in readsArr:
            readsArr.append(newName)
    mappedFile.close()
    counts = len(readsArr)
    return counts
def __init__(self, bam: str, reference: str, columns: int = 20000,
             cache: int = None, minor_gaps: bool = False):
    """Interface to samtools tview functionality.

    :param bam: path to bam file of reads aligned to a common reference.
    :param reference: path to fasta file of reference to which reads are aligned.
    :param columns: default number of tview columns to read in one go.
    :param cache: Currently not implemented.
    :param minor_gaps: encode gaps in non-reference positions distinctly.
    """
    self.logger = logging.getLogger('TView')
    self._bam = bam
    self._reference = reference
    self._columns = columns
    self._cache = cache
    self._minor_gaps = minor_gaps
    if self._minor_gaps:
        raise NotImplementedError
    self._columns_per_base = 3  # reasonable estimate
    self._column_pad = 1.1  # multiplier when _columns_per_base is updated
    # crosscheck the inputs
    with pysam.AlignmentFile(bam, 'rb') as b:
        brefs = b.references
        blens = b.lengths
    rrefs = set(pysam.FastaFile(reference).references)
    sbrefs = set(brefs)
    if not sbrefs.issuperset(rrefs):
        raise ValueError(
            "input bam and reference do not appear consistent: {} vs {}.".format(sbrefs, rrefs))
    self._refs = set(brefs)
    self._reflens = dict(zip(brefs, blens))
    self._pileup = None  # buffered `Pileup` object
def generate_pileup_chunks(bams, ref_fasta, ref_name, start=0, end=None,
                           chunk_len=50000, overlap=1000):
    """Yield chunks of pileup(s).

    :param bams: iterable of input .bam files.
    :param ref_fasta: input .fasta file.
    :param ref_name: name of reference within .bam to parse.
    :param start: start reference coordinate.
    :param end: end reference coordinate. If `None` the full extent of
        the reference will be parsed.
    :param chunk_len: int, chunk length in reference coordinates.
    :param overlap: int, overlap of adjacent chunks in reference coordinates.

    :yields: (tuple `Pileup` objects, None)

    .. note:: the None in the yielded values is for compatibility with
        an old API and will be removed.
    """
    tview = MultiTView(bams, ref_fasta, columns=chunk_len)
    if end is None:
        align_fhs = (pysam.AlignmentFile(bam) for bam in bams)
        end = min([dict(zip(p.references, p.lengths))[ref_name] for p in align_fhs])
    logger.info("Chunking {}: {}-{} in chunks of {} overlapping by {}".format(
        ref_name, start, end, chunk_len, overlap))
    # TODO: we could change how this is done since the class could cache
    #       everything in the entire region of interest.
    for chunk_start, chunk_end in segment_limits(start, end,
                                                 segment_len=chunk_len,
                                                 overlap_len=overlap):
        try:
            pileups, ref_seq = tview.pileup(ref_name, chunk_start, chunk_end)
        except TViewException:
            logger.info("Skipping region {}-{} as TView did not find any reads".format(
                chunk_start, chunk_end))
        else:
            yield LabelledPileup(pileups=pileups, labels=None, ref_seq=ref_seq)
def get_chrom_length(self, chrom):
    """
    Extract chromosome length from BAM header.

    Parameters
    ----------
    chrom : str
        The name of the chromosome or scaffold.

    Returns
    -------
    length : int
        The length (integer) of the chromosome/scaffold

    Raises
    ------
    RuntimeError
        If chromosome name not present in bam header
    """
    bamfile = pysam.AlignmentFile(self.filepath, "rb")
    lengths = dict(zip(bamfile.references, bamfile.lengths))
    try:
        lens = lengths[chrom]
        bamfile.close()
        return lens
    except:
        self.logger.error(
            "{} not present in bam header for {}. Exiting.".format(
                chrom, self.filepath))
        logging.shutdown()
        raise RuntimeError(
            "Chromosome name not present in bam header. Exiting")
def chromosome_lengths(self):
    """
    Returns
    -------
    tuple
        chromosome lengths ordered by sequence order in bam header
    """
    bamfile = pysam.AlignmentFile(self.filepath, "rb")
    lengths = bamfile.lengths
    bamfile.close()
    return lengths
def chromosome_names(self):
    """
    Returns
    -------
    tuple
        chromosome names ordered by sequence order in bam header
    """
    bamfile = pysam.AlignmentFile(self.filepath, "rb")
    names = bamfile.references
    bamfile.close()
    return names
def chrom_counts(self):
    """
    Get read counts per chrom from a bamfile
    """
    self.logger.info(
        "Using index of {} to get read counts per chromosome".format(
            self.filepath))
    analyze_start = time.time()
    samfile = pysam.AlignmentFile(self.filepath, "rb")
    idx_out = samfile.get_index_statistics()
    samfile.close()
    self.logger.info(
        "Index analysis complete. Elapsed time: {} seconds".format(
            time.time() - analyze_start))
    return idx_out
def reads_to_sam(reads, bam, bp1, bp2, dirout, name):
    '''
    For testing read assignments. Takes reads from array, matches them
    to bam file reads by query name and outputs them to SAM.
    '''
    bamf = pysam.AlignmentFile(bam, "rb")
    loc1 = '%s:%d:%d' % (bp1['chrom'], bp1['start'], bp1['end'])
    loc2 = '%s:%d:%d' % (bp2['chrom'], bp2['start'], bp2['end'])
    iter_loc1 = bamf.fetch(region=loc1, until_eof=True)
    iter_loc2 = bamf.fetch(region=loc2, until_eof=True)
    loc1 = '%s-%d' % (bp1['chrom'], (bp1['start'] + bp1['end']) / 2)
    loc2 = '%s-%d' % (bp2['chrom'], (bp2['start'] + bp2['end']) / 2)
    sam_name = '%s_%s-%s' % (name, loc1, loc2)
    if not os.path.exists(dirout):
        os.makedirs(dirout)
    bam_out = pysam.AlignmentFile('%s/%s.sam' % (dirout, sam_name), "w", header=bamf.header)
    for x in iter_loc1:
        if len(reads) == 0:
            break
        if x.query_name in reads:
            bam_out.write(x)
            bam_out.write(bamf.mate(x))
            idx = int(np.where(reads == x.query_name)[0])
            reads = np.delete(reads, idx)
    for x in iter_loc2:
        if len(reads) == 0:
            break
        if x.query_name in reads:
            bam_out.write(x)
            bam_out.write(bamf.mate(x))
            idx = int(np.where(reads == x.query_name)[0])
            reads = np.delete(reads, idx)
    bamf.close()
    bam_out.close()
def recount_anomalous_reads(bam, outname, anom_reads, max_dp, max_ins):
    print('Recounting anomalous reads')
    anom_reads = np.unique(anom_reads['query_name'])
    sv_proc = np.genfromtxt(outname, delimiter='\t', names=True,
                            dtype=dtypes.sv_out_dtype, invalid_raise=False)
    for idx, row in enumerate(sv_proc):
        sv_id, chr1_field, pos1_field, dir1_field, \
            chr2_field, pos2_field, dir2_field, sv_class, \
            oid_field, opos1_field, opos2_field = [h[0] for h in dtypes.sv_dtype]
        bp1 = np.array((row[chr1_field], row[pos1_field] - max_ins,
                        row[pos1_field] + max_ins, row[dir1_field]), dtype=dtypes.bp_dtype)
        bp2 = np.array((row[chr2_field], row[pos2_field] - max_ins,
                        row[pos2_field] + max_ins, row[dir2_field]), dtype=dtypes.bp_dtype)
        bamf = pysam.AlignmentFile(bam, "rb")
        loc1_reads, err_code1 = get_loc_reads(bp1, bamf, max_dp)
        loc2_reads, err_code2 = get_loc_reads(bp2, bamf, max_dp)
        bamf.close()
        if err_code1 == 0 and err_code2 == 0:
            anom1 = [x['query_name'] for x in loc1_reads if x['query_name'] in anom_reads]
            anom2 = [x['query_name'] for x in loc2_reads if x['query_name'] in anom_reads]
            anom = np.concatenate([anom1, anom2])
            anom_count = len(np.unique(anom))
            sv_proc[idx]['anomalous'] = anom_count
            print('found %d anomalous reads at %s:%d|%s:%d' %
                  (anom_count, row[chr1_field], row[pos1_field],
                   row[chr2_field], row[pos2_field]))
    with open(outname, 'w') as outf:
        header_out = [h[0] for idx, h in enumerate(dtypes.sv_out_dtype)]
        writer = csv.writer(outf, delimiter='\t', quoting=csv.QUOTE_NONE)
        writer.writerow(header_out)
        for row in sv_proc:
            writer = csv.writer(outf, delimiter='\t', quoting=csv.QUOTE_NONE)
            writer.writerow(row)
def getNumberOfAlignments(bamfile):
    '''
    return number of alignments in bamfile.
    '''
    samfile = pysam.AlignmentFile(bamfile)
    return samfile.mapped
def __init__(self, fh=None, Ped=None, gen=None):
    self.id = None
    self.chr_flag = False  # True == 'chrN'; False == 'N'
    self.sex = None
    self.char = 'b'
    self.refs = OrderedDict()
    self.fh = fh
    self.Snv = None
    self.snv_index = None
    sm = []
    bam = pysam.AlignmentFile(fh)
    if bam.is_cram == True:
        self.char = 'c'
    header = bam.header
    if header.get('RG') == None:
        sys.stderr.write('WARNING: {} lacks Read Group (@RG) entry in the header. Skipping ...\n'.format(fh))
    else:
        for entry in header['RG']:
            if entry.get('SM') != None:
                sm.append(entry['SM'])
        sm = list(set(sm))
        if len(sm) > 1:
            sys.stderr.write('WARNING: {} contains two sample entries ({}) in the header. Skipping {} ...\n'.format(fh, sm, fh))
        else:
            self.id = sm[0]
    if self.id != None:
        if Ped.sex.get(self.id) == None:
            sys.stderr.write('WARNING: {} is not found in the PED file. Skipping {} ...\n'.format(self.id, fh))
        else:
            self.sex = Ped.sex[self.id]
    if str(bam.references[0]).startswith('chr'):
        self.chr_flag = True
    chroms = accepted_chrom(self.chr_flag, gen)
    for i in range(len(bam.references)):
        chrom, leng = str(bam.references[i]), bam.lengths[i]
        if chrom in chroms:
            self.refs[chrom] = int(leng)
    bam.close()
def bam_init(args=None, Ped=None, Snv=None, gen=None):
    bams = []
    for f in args:
        errFH(f)
        try:
            pysam.AlignmentFile(f).check_index()
        except ValueError:
            sys.stderr.write('WARNING: {} is not indexed with samtools index. Skipping ...\n'.format(f))
            continue
        except AttributeError:
            sys.stderr.write('WARNING: {} appears to be in SAM format. Convert to BAM with `samtools view -bh {}` and index with samtools index\n'.format(f, f))
            continue
        bam = Bam(f, Ped, gen)
        if len(Snv) < 1:
            print 'FATAL ERROR: SNV file(s) were not formatted correctly. See https://github.com/dantaki/SV2/wiki/input#snv-vcf for details'
            sys.stderr.write('FATAL ERROR: SNV file(s) were not formatted correctly. See https://github.com/dantaki/SV2/wiki/input#snv-vcf for details\n')
            sys.exit(1)
        for snv in Snv:
            if snv.id.get(bam.id) != None:
                bam.Snv, bam.snv_index = snv, snv.id[bam.id]
        if bam.id != None and bam.Snv == None:
            sys.stderr.write('WARNING: BAM file {} sample name (@RG SM:<sample_id>):{} not found in SNV VCFs. Skipping {} ...\n'.format(f, bam.id, f))
        if bam.id == None:
            sys.stderr.write('WARNING: Skipping BAM file {}. No sample name (@RG SM:<sample_id>). See https://github.com/dantaki/SV2/wiki/input#bam for details\n'.format(f))
        if bam.id != None and bam.Snv != None:
            bams.append(bam)
    if len(bams) < 1:
        print 'FATAL ERROR: BAM file(s) were not formatted correctly. See https://github.com/dantaki/SV2/wiki/input#bam for details'
        sys.stderr.write('FATAL ERROR: BAM file(s) were not formatted correctly. See https://github.com/dantaki/SV2/wiki/input#bam for details\n')
        sys.exit(1)
    return bams
def open_asg(in_fn, in_format):
    # try to detect kraken and histo formats automatically
    if in_format is None:
        if '.' in in_fn:
            f_ext = in_fn.split('.')[-1]
            if f_ext in IN_FMTS:
                in_format = f_ext
        else:
            with open(in_fn, 'rb') as f:
                f_start = f.read(2)
                if f_start == b'C\t' or f_start == b'U\t':
                    in_format = 'kraken'
                elif f_start == b'#O':
                    in_format = 'histo'
    if in_format == 'sam':
        in_f = pysam.AlignmentFile(in_fn, "r")
    elif in_format == 'bam':
        in_f = pysam.AlignmentFile(in_fn, "rb")
    elif in_format == 'cram':
        in_f = pysam.AlignmentFile(in_fn, "rc")
    elif in_format == 'uncompressed_bam':
        in_f = pysam.AlignmentFile(in_fn, "ru")
    elif in_format == 'kraken':
        in_f = open(in_fn, 'r')
    # no need to load assignments if input is a histogram -> go to load_histo
    elif in_format == 'histo':
        in_f = None
    # let pysam assess the format
    else:
        in_f = pysam.AlignmentFile(in_fn)
    return in_f, in_format