我们从Python开源项目中,提取了以下12个代码示例,用于说明如何使用pysam.AlignedSegment()。
def test_process_reads_read_obs_paired_end(self): aln1b = pysam.AlignedSegment() aln1b.reference_start = 30 aln1b.query_name = 'read1' aln1b.mapping_quality = 30 aln1b.query_sequence = "AAAAACAAAACAAAAT" aln1b.query_qualities = [30] * 16 aln1b.cigarstring = '16M' self.alns.append(aln1b) var_pos = [15, 20, 25, 35, 40] res = preprocess.process_reads(self.alns, var_pos, 20, 10) exp = {'read1':{15:'T', 20:'T', 25:'T', 35:'C', 40:'C'}, 'read2':{15:'G', 20:'G', 25:'G'}} self.assertEqual(res, exp)
def test_process_reads_read_obs_paired_end_overlap(self): aln1b = pysam.AlignedSegment() aln1b.reference_start = 20 aln1b.query_name = 'read1' aln1b.mapping_quality = 20 aln1b.query_sequence = "AAAAATAAAACAAAAT" aln1b.query_qualities = [30] * 16 aln1b.cigarstring = '16M' self.alns.append(aln1b) var_pos = [15, 20, 25, 35] res = preprocess.process_reads(self.alns, var_pos, 20, 10) exp = {'read1':{15:'T', 25:'T', 35:'T'}, 'read2':{15:'G', 20:'G', 25:'G'}} self.assertEqual(res, exp)
def test_process_reads_read_obs_paired_end_overlap_1bad_base_qual(self): aln1b = pysam.AlignedSegment() aln1b.reference_start = 20 aln1b.query_name = 'read1' aln1b.mapping_quality = 20 aln1b.query_sequence = "AAAAATAAAACAAAAC" qqual = [30] * 16 qqual[0] = 5 aln1b.query_qualities = qqual aln1b.cigarstring = '16M' self.alns.append(aln1b) var_pos = [15, 20, 25, 35] res = preprocess.process_reads(self.alns, var_pos, 20, 10) exp = {'read1':{15:'T', 20:'T', 25:'T', 35:'C'}, 'read2':{15:'G', 20:'G', 25:'G'}} self.assertEqual(res, exp)
def cigar_parse(self, tuples): """ arguments: <tuples> a CIGAR string tuple list in pysam format purpose: This function uses the pysam cigarstring tuples format and returns a list of tuples in the internal format, [(20, 'M'), (5, "I")], et cetera. The zeroth element of each tuple is the number of bases for the CIGAR string feature. The first element of each tuple is the CIGAR string feature type. There are several feature types in SAM/BAM files. See below: 'M' - match 'I' - insertion relative to reference 'D' - deletion relative to reference 'N' - skipped region from the reference 'S' - soft clip, not aligned but still in sam file 'H' - hard clip, not aligned and not in sam file 'P' - padding (silent deletion from padded reference) '=' - sequence match 'X' - sequence mismatch 'B' - BAM_CBACK (I don't actually know what this is) """ # I used the map values from http://pysam.readthedocs.io/en/latest/api.html#pysam.AlignedSegment psam_to_char = {0: 'M', 1: 'I', 2: 'D', 3: 'N', 4: 'S', 5: 'H', 6: 'P', 7: '=', 8: 'X', 9: 'B'} return [(value, psam_to_char[feature]) for feature, value in tuples]
def assignReadRefAlt(x, bam): ''' Given a VCF record and a bam, return the read names that support either the REF or ALT alleles :param x: VCF record python object :param bam: BAM file handle :return: dictionary containing two lists of strings, ref/alt. ''' iter = input_bam.pileup(x.contig, x.start, x.stop) # Iterable mpileup, covering a much wider genomic range, from start of left-most read output_dict = {'ref': None, 'alt': None, 'nomatch': 0, 'coverage': None} # Initialise output object for pileupcolumn in iter: ref_reads = [] alt_reads = [] if pileupcolumn.pos == x.start: for pileupread in pileupcolumn.pileups: if not pileupread.is_del and not pileupread.is_refskip: a_read = pileupread.alignment # pysam.AlignedSegment base_at_pos = a_read.query_sequence[pileupread.query_position] #print base_at_pos, x.alleles, x.info["AF"] if base_at_pos == x.ref[0]: ref_reads.append(a_read.query_name) # Read matches ref at pos elif base_at_pos == x.alts[0]: alt_reads.append(a_read.query_name) # Read matches alt at pos else: output_dict['nomatch'] += 1 ## Allele matches neither ref/alt output_dict['ref'] = ref_reads output_dict['alt'] = alt_reads output_dict['coverage'] = float(pileupcolumn.n) return output_dict
def make_read(seq, cigar, mdtag=None, name="dummy", mapq=10, baseq=30): read = pysam.AlignedSegment() read.seq = seq read.cigarstring = cigar if mdtag: read.set_tag("MD", mdtag) read.qname = name read.mapq = mapq qualities_string = pysam.qualities_to_qualitystring([baseq] * len(seq)) qualities_bytes = qualities_string.encode("ascii") read.qual = qualities_bytes return read
def CreateReadObject(read, newseq, newqual, newcigar, startread, basetag=[]) : a = pysam.AlignedSegment() a.query_name = read.query_name a.query_sequence = newseq a.query_qualities = pysam.qualitystring_to_array(newqual) a.cigar = newcigar a.reference_start = startread # If (Star) mapper has assigned a value of 255 to mapping quality, # change it to 50 mapqual = read.mapping_quality if mapqual == 255 : a.mapping_quality = 50 else : a.mapping_quality = mapqual a.reference_id = read.reference_id # If read has RG read group tag, keep it try : r_RG = read.get_tag('RG') a.tags = () a.set_tag('RG', r_RG) except : a.tags = () a.next_reference_id = -1 a.next_reference_start = -1 a.template_length = 0 a.flag = UpdateFlag(read.flag) return a # Goes through the given cigar list and reports how many times cigarType is equal to 3 # Optional field is finalpos, which is cutoff position for counting the splits (relative to start pos) # Default value for finalpos is something very large
def __init__(self, line, start=-1, end=-1, strand=1, file_format='', bamfile=None, info=''): self.info = "" self.file_format = file_format if type(line) == pysam.AlignedRead or type(line) == pysam.AlignedSegment: self.load_pysamread(line, bamfile) elif start == -1: self.load_line(line, file_format) elif end == -1: self.load_pos(line, start, start, strand) else: self.load_pos(line, start, end, strand) if len(info) > 0: self.info = info
def setUp(self): self.mq = 30 self.bq = 30 aln1 = pysam.AlignedSegment() aln1.reference_start = 10 aln1.query_name = 'read1' aln1.mapping_quality = 30 aln1.query_sequence = "AAAAATAAAATAAAAT" aln1.query_qualities = [30] * 16 aln1.cigarstring = '16M' aln2 = pysam.AlignedSegment() aln2.reference_start = 12 aln2.query_name = 'read2' aln2.mapping_quality = 20 aln2.query_sequence = "AAAGAAGAAAAG" qqual = [33] * 12 qqual[3] = 20 aln2.query_qualities = qqual aln2.cigarstring = '5M2D7M' aln3 = pysam.AlignedSegment() aln3.mapping_quality = 0 aln3.query_name = 'read3' self.alns = [aln1, aln2, aln3]
def construct_subgraph(mbam, read_qname, mread_dict, processed_mreads, chr_list, winsize=50, unstranded=False): """DOCSTRING Args: Returns: """ # record of processed alignments only need kept on within-subgraph level processed_mread_alignments = set() counter = 0 # a list of `pysam.AlignedSegment` objects # note that all taggers are already stored in `pysam.AlignedSegment.opt('RT')` read_aln_list = [x for x in mread_dict[read_qname]] processed_mreads.add(read_qname) read_to_locations = defaultdict(dict) # read_qname -> {node_name1:segment1, node_name2:segment2} # enumerate all connected components while True: counter+=1; print "%i: %i"%(counter, len(read_aln_list)) next_read_aln_list = [] gen = read_aln_list if len(read_aln_list)<200 else tqdm(read_aln_list) for alignment in gen: ## build a node for this mread alignment ## (if not already processed, i.e. built before) if alignment in processed_mread_alignments: continue genomic_cluster, this_mread_dict, discarded_mread_list = \ build_read_cluster(alignment, chr_list, mbam, unstranded=unstranded, winsize=winsize) _ = map(processed_mread_alignments.add, discarded_mread_list) if genomic_cluster is None: # this cluster is invald (only double-mappers) continue ## update loc2read, read2loc node_name = ':'.join([str(x) for x in genomic_cluster]) #if node_name in subgraph: # logger.debug("I revisited '%s' at read '%s'."%(node_name, read_qname)) # break #subgraph.add(node_name) for x_qname in this_mread_dict: read_to_locations[x_qname].update({node_name : this_mread_dict[x_qname]}) ## then add new alignments(edges) to generate connected nodes ## in the next iteration _ = map(processed_mread_alignments.add, this_mread_dict.values()) for read_x_qname in this_mread_dict: if read_x_qname in processed_mreads: continue x_aln_list = [aln for aln in mread_dict[read_x_qname] if not aln in processed_mread_alignments] next_read_aln_list.extend(x_aln_list) ## .. and record to processed reads since we have generated ## the nodes for them _ = map(processed_mreads.add, this_mread_dict.keys()) # if no more connected nodes can be found, break loop if len(next_read_aln_list)==0: break read_aln_list = next_read_aln_list return read_to_locations, processed_mreads
def stats_from_aligned_read(read): """Create summary information for an aligned read :param read: :class:`pysam.AlignedSegment` object """ tags = dict(read.tags) try: tags.get('NM') except: raise IOError("Read is missing required 'NM' tag. Try running 'samtools fillmd -S - ref.fa'.") name = read.qname if read.flag == 4: return None counts = defaultdict(int) for (i, j) in read.cigar: counts[i] += j match = counts[0] ins = counts[1] delt = counts[2] # NM is edit distance: NM = INS + DEL + SUB sub = tags['NM'] - ins - delt length = match + ins + delt iden = 100*float(match - sub)/match acc = 100 - 100*float(tags['NM'])/length read_length = read.infer_read_length() coverage = 100*float(read.query_alignment_length) / read_length direction = '-' if read.is_reverse else '+' results = { "name":name, "coverage":coverage, "direction":direction, "length":length, "read_length":read_length, "ins":ins, "del":delt, "sub":sub, "iden":iden, "acc":acc } return results