我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用pysam.FastaFile()。
def __init__(self, fasta_filename, prediction_type, cellgroup_mapping=None): self._genome_handle = pysam.FastaFile(fasta_filename) self.prediction_type = prediction_type if cellgroup_mapping is not None: self.cellgroup_mapping = cellgroup_mapping self.C = len(self.cellgroup_mapping)
def check_fasta(fa_f, pysam_flag=True): if not os.path.isfile(fa_f + '.fai'): pysam.faidx(fa_f) if pysam_flag: # return pysam FastaFile object fa = pysam.FastaFile(fa_f) return fa else: # return fasta file path return fa_f
def iter_seqs(self, fasta_fname): genome = pysam.FastaFile(fasta_fname) return ( genome.fetch(contig, start, stop+1).upper() for contig, start, stop in self.iter_regions(flank_size=400) )
def __init__(self, bam:str, reference:str, columns:int=20000, cache:int=None, minor_gaps:bool=False): """Interface to samtools tview functionality. :param bam: path to bam file of reads aligned to a common reference. :param reference: path to fasta file of reference to which reads are aligned. :param columns: default number of tview columns to read in one go. :param cache: Currently not implemented. :param minor_gaps: encoded gaps in non-reference positions distinctly. """ self.logger = logging.getLogger('TView') self._bam = bam self._reference = reference self._columns = columns self._cache = cache self._minor_gaps = minor_gaps if self._minor_gaps: raise NotImplementedError self._columns_per_base = 3 # reasonable estimate self._column_pad = 1.1 # multiplier when _columns_per_base is updated # crosscheck the inputs with pysam.AlignmentFile(bam, 'rb') as b: brefs = b.references blens = b.lengths rrefs = set(pysam.FastaFile(reference).references) sbrefs = set(brefs) if not sbrefs.issuperset(rrefs): raise ValueError("input bam and reference do not appear consistent: {} vs {}.".format(sbrefs, rrefs)) self._refs = set(brefs) self._reflens = dict(zip(brefs, blens)) self._pileup = None # buffered `Pileup` object
def load_fasta(fa_path): """ A convenient wrapper function for constructing a :py:class:`pysam.FastaFile` Parameters ---------- fa_path : str Path to FASTA Returns ------- FASTA File Interface : :py:class:`pysam.FastaFile` """ return pysam.FastaFile(fa_path)
def get_chrom_length(self, chrom): """ Extract chromosome length from fasta. Parameters ---------- chrom : str The name of the chromosome or scaffold. Returns ------- length : int The length (integer) of the chromsome/scaffold Raises ------ RuntimeError If chromosome name not present in bam header """ fastafile = pysam.FastaFile(self.filepath) lengths = dict(zip(fastafile.references, fastafile.lengths)) try: lens = lengths[chrom] fastafile.close() return lens except: self.logger.error( "{} not present in {}. Exiting.".format( chrom, self.filepath)) logging.shutdown() raise RuntimeError( "Chromosome name not present in fasta. Exiting")
def chromosome_lengths(self): """ Returns ------- tuple Chromosome lengths ordered by sequence order in fasta """ fastafile = pysam.FastaFile(self.filepath) lengths = fastafile.lengths fastafile.close() # pysam's .lengths does not return a tuple (despite what is in the docs), # so, convert to tuple before returning. return tuple(lengths)
def chromosome_names(self): """ Returns ------- tuple Chromosome names ordered by sequence order in fasta """ fastafile = pysam.FastaFile(self.filepath) names = fastafile.references fastafile.close() # pysam's .lengths does not return a tuple (despite what is in the docs), # so, convert to tuple before returning. return tuple(names)
def __init__(self, filename): self._fh = FastaFile(filename)
def close(self): if self._fh: self._fh.close() self._fh = None subprocess.check_call([bgzip_exe, "--force", self._basepath]) os.rename(self._basepath + ".gz", self.filename) # open file with FastaFile to create indexes, then make all read-only _fh = FastaFile(self.filename) _fh.close() os.chmod(self.filename, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) os.chmod(self.filename + ".fai", stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) os.chmod(self.filename + ".gzi", stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) logger.info("{} written; added {} sequences".format(self.filename, len(self._added)))
def covplot(seqfiles, chrom, start, end, fastafile, out=None): ''' ''' if type(seqfiles) is not list: seqfiles = [seqfiles] chrom = str(chrom) with pysam.FastaFile(fastafile) as handle: reference = handle.fetch(start=start, end=end, region=chrom) ref = reference axis_offset = 75 cov_height = 200 height = len(seqfiles) * cov_height + (len(seqfiles) - 1 ) * 50 + axis_offset out_type, surface = fileformat(out, width=(end - start) * 10, height=height) context = cairo.Context(surface) plot_axis(context, start, end, axis_offset - 10) plot_grid(context, start, end, axis_offset, height) for seqfile in seqfiles: seq = pysam.AlignmentFile(seqfile, 'rb') coords = OrderedDict({0: -1e9}) reps = ( parse_read(x, coords, ref, start) for x in seq.fetch(chrom, start, end) ) plot_coverage(context, get_coverage(reps), axis_offset, start, (end - start) * 10, cov_height) axis_offset += cov_height if seqfiles.index(seqfile) < len(seqfiles) - 1: insert_spacer(context, coords, start, end) axis_offset += 50 context.save() if out_type == 'png': surface.write_to_png(out) elif out_type is None: return surface.write_to_png()
def __init__(self, fasta_file): self.f = pysam.FastaFile(fasta_file)
def bam_pileup_to_hdf(hdf, bams, ref_fasta, ref_name:str=None, start:int=0, end:int=None, truth_bam=None, chunk_len=50000, overlap=1000): """Create an .hdf file containing chunks of a pileup. :param hdf: output .hdf file (will be overwritten). :param bams: iterable of input .bam files. :param ref_fasta: input .fasta file. :param ref_name: name of reference within .bam to parse. :param start: start reference coordinate. :param end: end reference coordinate. If `None` the full extent of the reference will be parsed. :param truth_bam: .bam file of truth aligned to ref to generate labels. :param chunk_len: int, chunk length in reference coordinates. :param overlap: int, overlap of adjacent chunks in reference coordinates. ..note:: Datasets will be name as `{reference}_{start}_{end}`, indicating the reference coordinates spanned. """ if ref_name is None: # use all references refs = pysam.FastaFile(ref_fasta).references assert end == None assert start == 0 else: refs = [ref_name,] with h5py.File(hdf, 'w') as hdf: for ref in refs: if truth_bam is not None: gen = generate_training_chunks(truth_bam, bams, ref_fasta, ref_name=ref, start=start, end=end, chunk_len=chunk_len, overlap=overlap) else: gen = generate_pileup_chunks(bams, ref_fasta, ref_name=ref, start=start, end=end, chunk_len=chunk_len, overlap=overlap) logger.info("Processing reference {}.".format(ref)) for c in gen: pos = c.pileups[0].positions chunk_str = '{}_{}_{}'.format(ref, pos['major'][0], pos['major'][-1] + 1) if c.labels is not None: # save labels hdf['{}/labels'.format(chunk_str)] = np.char.encode(c.labels) if c.ref_seq is not None: # save ref hdf['{}/ref_seq'.format(chunk_str)] = np.char.encode(c.ref_seq) for bam_count, p in enumerate(c.pileups): assert p.ref_name == ref grp = '{}/datatype{}'.format(chunk_str, bam_count) hdf['{}/positions'.format(grp)] = p.positions hdf[grp].attrs['rname'] = p.ref_name hdf[grp].attrs['start'] = p.positions['major'][0] hdf[grp].attrs['end'] = p.positions['major'][-1] hdf[grp].attrs['bam'] = p.bam hdf['{}/data'.format(grp)] = p.reads
def seqplot(seqfiles, chrom, start, end, fastafile, out=None, by_strand=False): ''' the plotting function Args: seqfiles: list of paths to sequence files chrom: chromosome to fetch reads from start: start nucleotide of plotting window. end: end nucleotide of plotting window. fastafile: path to reference FASTA file. out: path to write image file to, or None to return bytes-encoded png by_strand: whether to shade reads by strand Returns: None, or if out is None, returns image plot as bytes-encoded png ''' if type(seqfiles) is not list: seqfiles = [seqfiles] chrom = str(chrom) with pysam.FastaFile(fastafile) as handle: reference = handle.fetch(start=start, end=end, region=chrom) ref = reference axis_offset = 75 height = get_height(seqfiles, chrom, start, end, axis_offset) out_type, surface = fileformat(out, width=(end - start) * 10, height=height) context = cairo.Context(surface) depths = [axis_offset] for seqfile in seqfiles: seq = pysam.AlignmentFile(seqfile, 'rb') coords = OrderedDict({max(depths): -1e9}) reps = ( parse_read(x, coords, ref, start) for x in seq.fetch(chrom, start, end) ) _plot(context, reps, start, end, axis_offset, height, reference, by_strand) reference = None # don't plot the reference in subsequent BAMs if seqfiles.index(seqfile) < len(seqfiles) - 1: insert_spacer(context, coords, start, end) depths.append(max(coords)) context.save() if out_type == 'png': surface.write_to_png(out) elif out_type is None: return surface.write_to_png()