我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用pysam.Fastafile()。
def test_ctnnb1_get_aa_mut_info(): import pysam from prob2020.python.gene_sequence import GeneSequence # read fasta ctnnb1_fasta = os.path.join(file_dir, 'data/CTNNB1.fa') gene_fa = pysam.Fastafile(ctnnb1_fasta) gs = GeneSequence(gene_fa, nuc_context=1) # read CTNNB1 bed file ctnnb1_bed = os.path.join(file_dir, 'data/CTNNB1.bed') bed_list = [b for b in utils.bed_generator(ctnnb1_bed)] gs.set_gene(bed_list[0]) # specify mutation coding_pos = [0] somatic_base = ['C'] # check mutation info aa_info = mc.get_aa_mut_info(coding_pos, somatic_base, gs) ref_codon_msg = 'First codon should be start codon ({0})'.format(aa_info['Reference Codon'][0]) assert aa_info['Reference Codon'][0] == 'ATG', ref_codon_msg assert aa_info['Somatic Codon'][0] == 'CTG', 'First "A" should be replaced with a "C"' assert aa_info['Codon Pos'][0] == 0, 'Start codon should be position 0'
def recover_unmapped_mut_info(mut_info, bed, sc, opts): # retreive info based on annotated protein effects and genomic coordinates has_unmapped_opts = ('use_unmapped' in opts) and ('genome' in opts) use_unmapped = opts['use_unmapped'] and opts['genome'] if has_unmapped_opts and use_unmapped: genome_fa = pysam.Fastafile(opts['genome']) # try to still use mutations that are not on the reference transcript tmp_mut_info = mut_info[mut_info['Coding Position'].isnull()] unmapped_mut_info = get_unmapped_aa_mut_info(tmp_mut_info, genome_fa, bed.strand, bed.chrom, opts['context']) genome_fa.close() # fill in tumor sample/tumor type info unmapped_mut_info['Tumor_Sample'] = tmp_mut_info['Tumor_Sample'].tolist() unmapped_mut_info['Tumor_Type'] = tmp_mut_info['Tumor_Type'].tolist() # filter out cases where the nucleotide context does not exist # on the reference transcript bad_contexts = [i for i in range(len(unmapped_mut_info['Context'])) if not sc.is_valid_context(unmapped_mut_info['Context'][i])] for key in unmapped_mut_info: unmapped_mut_info[key] = utils.filter_list(unmapped_mut_info[key], bad_contexts) else: unmapped_mut_info = {'Context': [], 'Reference AA': [], 'Codon Pos': [], 'Somatic AA': [], 'Tumor_Allele': [], 'Tumor_Sample': [], 'Tumor_Type':[]} return unmapped_mut_info
def main(opts): # read bed file, extract gene sequence from genome, write to fasta genome_fa = pysam.Fastafile(opts['input']) with open(opts['output'], 'w') as handle: for bed_row in utils.bed_generator(opts['bed']): fasta_seq = gs.fetch_gene_fasta(bed_row, genome_fa) handle.write(fasta_seq) genome_fa.close()
def __init__(self, options): # Openning tabix file representing the reference genome self.fastafile = pysam.Fastafile(options.args['reference']) # Retrieving the sequence of a genomic region
def __init__(self, task_queue, result_queue, bamName, referenceName, honH5): #*args, **kwargs): multiprocessing.Process.__init__(self) self.task_queue = task_queue self.result_queue = result_queue self.bam = pysam.Samfile(bamName) if referenceName is not None: self.reference = pysam.Fastafile(referenceName) else: self.reference = None self.honH5 = honH5 #self.args = args #self.kwargs = kwargs
def __init__(self,options): # Openning tabix file representing the reference genome self.fastafile=pysam.Fastafile(options.args['reference']) # Retrieving the sequence of a genomic region
def is_nonsilent(mut_df, bed_dict, opts): # convert dictionary to list for bed objects gene_beds = [b for chrom in bed_dict for b in bed_dict[chrom]] # initiate gene sequences gene_fa = pysam.Fastafile(opts['input']) gs = GeneSequence(gene_fa, nuc_context=opts['context']) # non-silent SNV classes non_silent_snv = ['Nonsense_Mutation', 'Nonstop_Mutation', 'Splice_Site', 'Translation_Start_Site', 'Missense_Mutation'] # record indels and get only snvs mut_df['is_nonsilent'] = 0 indel_flag = indel.is_indel_annotation(mut_df) mut_df.loc[indel_flag, 'is_nonsilent'] = 1 snv_df = mut_df[~indel_flag] # iterate over each gene for bed in gene_beds: # initiate for this gene tmp_df = snv_df[snv_df['Gene']==bed.gene_name] gs.set_gene(bed) # compute context counts and somatic bases for each context gene_tuple = compute_mutation_context(bed, gs, tmp_df, opts) context_cts, context_to_mutations, mutations_df, gs, sc = gene_tuple if len(mutations_df): # get snv information tmp_mut_info = get_aa_mut_info(mutations_df['Coding Position'], mutations_df['Tumor_Allele'].tolist(), gs) # get string describing variant var_class = cutils.get_variant_classification(tmp_mut_info['Reference AA'], tmp_mut_info['Somatic AA'], tmp_mut_info['Codon Pos']) # detect if non-silent snv is_nonsilent_snv = [1 if (x in non_silent_snv) else 0 for x in var_class] mut_df.loc[tmp_df.index, 'is_nonsilent'] = is_nonsilent_snv # return a pandas series indicating nonsilent status is_nonsilent_series = mut_df['is_nonsilent'].copy() del mut_df['is_nonsilent'] return is_nonsilent_series
def main(opts): # hack to index the FASTA file gene_fa = pysam.Fastafile(opts['input']) gene_fa.close() # Get Mutations mut_df = pd.read_csv(opts['mutations'], sep='\t') orig_num_mut = len(mut_df) # rename columns to fit my internal column names rename_dict = { 'Hugo_Symbol': 'Gene', 'Tumor_Sample_Barcode': 'Tumor_Sample', 'Tumor_Seq_Allele2' : 'Tumor_Allele' } mut_df.rename(columns=rename_dict, inplace=True) # restrict to only observed genes if flag present restricted_genes = None if opts['restrict_genes']: restricted_genes = set(mut_df['Gene'].unique()) # process indels indel_df = indel.keep_indels(mut_df) # return indels only indel_df.loc[:, 'Start_Position'] = indel_df['Start_Position'] - 1 # convert to 0-based indel_df.loc[:, 'indel len'] = indel_df['indel len'] + 1 logger.info('There were {0} indels identified.'.format(len(indel_df))) mut_df = mut_df.dropna(subset=['Tumor_Allele', 'Start_Position', 'Chromosome']) logger.info('Kept {0} mutations after droping mutations with missing ' 'information (Droped: {1})'.format(len(mut_df), orig_num_mut - len(mut_df))) # select valid single nucleotide variants only mut_df = utils._fix_mutation_df(mut_df, opts['unique']) # read in bed info bed_dict = utils.read_bed(opts['bed'], restricted_genes) # perform permutation opts['handle'] = open(opts['output'], 'w') multiprocess_permutation(bed_dict, mut_df, opts, indel_df) # save indels if opts['maf']: #with open(opts['output'], 'a') as handle: mywriter = csv.writer(opts['handle'], delimiter='\t', lineterminator='\n') for maf_lines in indel.simulate_indel_maf(indel_df, bed_dict, opts['num_iterations'], opts['seed']): mywriter.writerows(maf_lines) opts['handle'].close()
def main(opts): # read in mutations mut_df = pd.read_csv(opts['mutations'], sep='\t') orig_num_mut = len(mut_df) # correct chromosome names mut_df['Chromosome'] = correct_chrom_names(mut_df['Chromosome']) # fix additional issues mut_df = mut_df.dropna(subset=['Tumor_Allele', 'Start_Position', 'Chromosome']) logger.info('Kept {0} mutations after droping mutations with missing ' 'information (Droped: {1})'.format(len(mut_df), orig_num_mut - len(mut_df))) mut_df = utils._fix_mutation_df(mut_df) # read genome fasta file genome_fa = pysam.Fastafile(opts['fasta']) # read BED file for transcripts bed_dict = utils.read_bed(opts['bed'], []) gene2bed = {item.gene_name: item for bed_list in bed_dict.values() for item in bed_list} # group mutations by gene mut_grpby = mut_df.groupby('Gene') unmapped_mut_list = [] for i, mut_info in mut_grpby: gene_name = mut_info['Gene'].iloc[0] # try to find tx annotation for gene bed = None try: bed = gene2bed[gene_name] except KeyError: pass if bed: # get coding positions, mutations unmapped to the reference tx will have # NA for a coding position for ix, row in mut_info.iterrows(): coding_pos = bed.query_position(bed.strand, row['Chromosome'], row['Start_Position']) if not coding_pos: unmapped_mut_list.append(row.tolist()) else: #unmapped_mut_df = pd.concat([unmapped_mut_df, mut_info]) unmapped_mut_list += mut_info.values.tolist() # save the unmapped mutations to a file unmapped_mut_df = pd.DataFrame(unmapped_mut_list, columns=mut_df.columns) logger.info('{0} mutations were unmappable to a ' 'reference transcript'.format(len(unmapped_mut_df))) unmapped_mut_df.to_csv(opts['unmapped'], sep='\t', index=False) coord_base, coord_strand = detect_coordinates(mut_df, genome_fa) logger.info('RESULT: {0}-based coordinates, positions reported on {1} strand'.format(coord_base, coord_strand)) genome_fa.close()
def test(argv): numpy.seterr(all="ignore") args = parseArgs(argv) setupLogging(True)#keep debug on.. you're testing! logging.critical(("Running HSpots.py directly implements testing mode. " "If you're trying to run the full, actual program, use " "Honey.py spots")) bam = pysam.Samfile(args.bam) reference = pysam.Fastafile(args.reference) try: if bam.header["HD"]["SO"] != "coordinate": logging.warning("BAM is not sorted by coordinates! Performance may be slower") except KeyError: logging.warning("Assuming BAM is sorted by coordinate. Be sure this is correct") logging.info("Running in test mode") #do what you will.. from here #spot = SpotResult(chrom='11', start=2215290, end=2215798, svtype="DEL", size=208) #spot = SpotResult(chrom='22', start=45964261, end=45965596, svtype="DEL", size=-1) # This is what I need to start with #spot = SpotResult(chrom="22", start=45963975, end=45964532, svtype="DEL", size=57) fh = open("honeymissing.bed") for line in fh.readlines(): data = line.strip().split('\t') spot = SpotResult(chrom=data[0], start=int(data[1]), end = int(data[2]), \ size=int(data[3].split('=')[-1]), svtype="DEL") j = SpotCaller('group', spot.chrom, spot.start, spot.end, args) if j.supportingReadsFilter(spot, bam, args): consen = ConsensusCaller(spot, args) consen(bam, reference, 'none') for i in consen.newSpots: i.tags["seqmade"] = True print i if len(consen.newSpots) == 0: spot.tags["noseq"] = True print str(spot) else: spot.tags["filtfail"] = True print str(spot) #done with test code logging.info("Finished testing")
def test(argv): numpy.seterr(all="ignore") args = parseArgs(argv) setupLogging(True)#keep debug on.. you're testing! logging.critical(("Running HSpots.py directly implements testing mode. " "If you're trying to run the full, actual program, use " "Honey.py spots")) bam = pysam.Samfile(args.bam) reference = pysam.Fastafile(args.reference) try: if bam.header["HD"]["SO"] != "coordinate": logging.warning("BAM is not sorted by coordinates! Performance may be slower") except KeyError: logging.warning("Assuming BAM is sorted by coordinate. Be sure this is correct") logging.info("Running in test mode") #do what you will.. from here # This is what I need to start with #spot = SpotResult(chrom="7", start=138402727, end=138402830, svtype="INS", size=113) chrom="3" start,end = (195498264, 195498609) start -=200 end +=200 spot = SpotResult(chrom=chrom, start=start, end=end, svtype="DEL", size=100) #fh = open("possible.bed") #for line in fh.readlines(): #data = line.strip().split('\t') #spot = SpotResult(chrom=data[0], start=int(data[8]), end = int(data[9]), \ #size=int(data[5]), svtype=data[4]) j = SpotCaller('group', spot.chrom, spot.start, spot.end, args) if j.supportingReadsFilter(spot, bam, args): consen = ConsensusCaller(spot, args) consen(bam, reference, 'none') for i in consen.newSpots: i.tags["seqmade"] = True print i if len(consen.newSpots) == 0: spot.tags["noseq"] = True print str(spot) else: spot.tags["filtfail"] = True print str(spot) #done with test code logging.info("Finished testing")
def bam2hist(args): samfile = pysam.Samfile(args.bam) regions = samfile (rname, start, end) = parse_region(args.region) refseq = None if args.genome: fasta = pysam.Fastafile(args.genome) refseq = fasta.fetch(reference=rname, start=start, end=end) if args.with_header: print ('rname', 'pos', 'offset', 'ref', 'nread', 'major_base', 'major_count', 'minor_count', 'major_freq', 'minor_freq', 'N', 'A', 'T', 'C' ,'G', 'D', 'ndel', 'nins', sep='\t') if args.use_multimaps: skip_flag = BAM_FLAGS.unmapped else: skip_flag = SKIP_FLAG for p in samfile.pileup(reference=rname, start=start, end=end, mask=skip_flag): if (p.pos < start) or (end is not None and end <= p.pos): # skip outside of region continue h = ReadHist() for read in p.pileups: if read.query_position is None: continue info = ReadInfo(read, skip_flag=skip_flag) if info.skip_flag: continue h.add_read(info) major_base = h.major_base() major_count = getattr(h, major_base) minor_count = h.total_count() - major_count major_freq = h.major_freq() if major_freq > args.max_major_freq: continue print (rname, p.pos + 1, p.pos - start, '.' if refseq is None else refseq[p.pos - start], p.n, major_base, major_count, minor_count, '{0:.3f}'.format(major_freq), '{0:.3f}'.format(1 - major_freq), h.N, h.A, h.T, h.C, h.G, h.D, h.ndel, h.nins, sep='\t')