The following code examples, extracted from open-source Python projects, illustrate how to use Bio.SeqIO.read().
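Before the project examples, a quick orientation: SeqIO.read() expects its source to contain exactly one record and raises a ValueError otherwise. A minimal sketch of the call (the file name here is a hypothetical placeholder):

from Bio import SeqIO

# Read a single-record FASTA file; "single.fasta" is a placeholder path.
# SeqIO.read() raises ValueError if the file holds zero or several records.
record = SeqIO.read("single.fasta", "fasta")
print(record.id, len(record.seq))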
def seq(self): """Seq: Get the Seq object from the sequence file, metadata file, or in memory""" if self.sequence_file: log.debug('{}: reading sequence from sequence file {}'.format(self.id, self.sequence_path)) tmp_sr = SeqIO.read(self.sequence_path, 'fasta') return tmp_sr.seq elif self.metadata_file: log.debug('{}: reading sequence from metadata file {}'.format(self.id, self.metadata_path)) tmp_sr = SeqIO.read(self.metadata_path, 'uniprot-xml') return tmp_sr.seq else: if not self._seq: log.debug('{}: no sequence stored in memory'.format(self.id)) else: log.debug('{}: reading sequence from memory'.format(self.id)) return self._seq
def features(self): """list: Get the features from the feature file, metadata file, or in memory""" if self.feature_file: log.debug('{}: reading features from feature file {}'.format(self.id, self.feature_path)) with open(self.feature_path) as handle: feats = list(GFF.parse(handle)) if len(feats) > 1: log.warning('Too many sequences in GFF') else: return feats[0].features elif self.metadata_file: log.debug('{}: reading features from metadata file {}'.format(self.id, self.metadata_path)) tmp_sr = SeqIO.read(self.metadata_path, 'uniprot-xml') return tmp_sr.features else: return self._features
def fasta_files_equal(seq_file1, seq_file2):
    """Check equality of a FASTA file to another FASTA file

    Args:
        seq_file1: Path to a FASTA file
        seq_file2: Path to another FASTA file

    Returns:
        bool: If the sequences are the same

    """
    # Load already set representative sequence
    seq1 = SeqIO.read(open(seq_file1), 'fasta')

    # Load kegg sequence
    seq2 = SeqIO.read(open(seq_file2), 'fasta')

    # Test equality
    if str(seq1.seq) == str(seq2.seq):
        return True
    else:
        return False
def seq(self): """Seq: Dynamically loaded Seq object from the sequence file""" if self.sequence_file: file_to_load = copy(self.sequence_path) log.debug('{}: reading sequence from sequence file {}'.format(self.id, file_to_load)) tmp_sr = SeqIO.read(file_to_load, 'fasta') return tmp_sr.seq else: if not self._seq: log.debug('{}: no sequence stored in memory'.format(self.id)) else: log.debug('{}: reading sequence from memory'.format(self.id)) return self._seq
def download_mibig(outputdir, version='1.3'):
    """ Download and extract MIBiG files into outputdir """
    assert version in ['1.0', '1.1', '1.2', '1.3'], \
        "Invalid version of MIBiG"
    server = 'http://mibig.secondarymetabolites.org'
    filename = "mibig_gbk_%s.tar.gz" % version
    url = urlparse.urljoin(server, filename)
    with tempfile.NamedTemporaryFile(delete=True) as f:
        u = urllib2.urlopen(url)
        f.write(u.read())
        f.file.flush()
        tar = tarfile.open(f.name)
        tar.extractall(path=outputdir)
        tar.close()
    # MIBiG was packed with strange ._*gbk files. Let's remove them
    for f in [f for f in os.listdir(outputdir) if f[:2] == '._']:
        os.remove(os.path.join(outputdir, f))

#def gbk2tablegen(gb_file, strain_id=None):
#def cds_from_gbk(gb_file, strain_id=None):
def get_hits(filename, criteria='cum_BLAST_score'):
    """ Reproduces Tiago's original code: table_1_extender.py

    In the future allow different criteria. Right now it takes from the
    very first block, which has the highest Cumulative BLAST.
    """
    with open(filename) as f:
        df = antiSMASH_to_dataFrame(f.read())
    df.dropna(subset=['query_gene'], inplace=True)
    df.sort_values(by=criteria, ascending=False, na_position='last',
                   inplace=True)
    return df.groupby('query_gene', as_index=False).first()
def show_weights(sample_ids):
    # sample_weights = [SeqIO.read('tests/bam/test_cns/' + id + '.bam.cns.fa', 'fasta') for id in sample_ids]
    # print(sample_weights)
    alignments = []
    for sample_id in sample_ids:
        alignments.append(kindel.parse_alignment('tests/bam/' + sample_id + '.bam'))
    for id, alignment in zip(sample_ids, alignments):
        print(id)
        for i, w in enumerate(alignment.weights):
            coverage = sum({nt: w[nt] for nt in list('ACGT')}.values())
            consensus = kindel.consensus(w)
            print(i + 1,
                  coverage,
                  consensus[0],
                  consensus[1],
                  consensus[2],
                  str(alignment.clip_starts[i]) + 'clip' + str(alignment.clip_ends[i]),
                  'TIE' if consensus[3] else '',
                  'DIVERGENT' if consensus[2] and consensus[2] <= 0.75 else '',
                  w)
def translate_record(self, record, grecord_class=None):
    """Create a new GraphicRecord from a BioPython Record object.

    Parameters
    ----------
    record
        A BioPython Record object or the path to a Genbank file.

    grecord_class
        The graphic record class to use, e.g. GraphicRecord (default) or
        CircularGraphicRecord.
    """
    if isinstance(record, str):
        record = SeqIO.read(record, "genbank")
    if grecord_class is None:
        grecord_class = GraphicRecord
    return grecord_class(
        sequence_length=len(record.seq),
        features=[
            self.translate_feature(feature)
            for feature in self.compute_filtered_features(record.features)
            if feature.location is not None
        ],
        **self.graphic_record_parameters
    )
def sbol_to_list(self):
    """ Take an sbol file and return a linear list of components """
    # Read in SBOL file
    # Look for component def with components
    # Build a list of SBOL components from this
    # If multiple do something fancy?
    elements = []
    self.document.read(self.file_data)
    for c in self.document.list_components():
        if len(c.components) > 0:
            comp_elems = []
            for cl in self.document.get_components(c.identity):
                role_uri = cl.definition.roles[0]
                comp_elems.append({'name': cl.display_id,
                                   'role': self.INVERT_ROLES[role_uri].replace(' ', '-')})
            elements.append(comp_elems)
    return elements
def metadata_path(self, m_path):
    """Provide pointers to the paths of the metadata file

    Args:
        m_path: Path to metadata file

    """
    if not m_path:
        self.metadata_dir = None
        self.metadata_file = None
    else:
        if not op.exists(m_path):
            raise OSError('{}: file does not exist!'.format(m_path))
        if not op.dirname(m_path):
            self.metadata_dir = '.'
        else:
            self.metadata_dir = op.dirname(m_path)
        self.metadata_file = op.basename(m_path)

        # Parse the metadata file using Biopython's built-in SeqRecord parser
        # Just updating IDs and stuff
        tmp_sr = SeqIO.read(self.metadata_path, 'uniprot-xml')
        parsed = parse_uniprot_xml_metadata(tmp_sr)
        self.update(parsed, overwrite=True)
def seq_record_loaded_from_file_example(fasta_path):
    """Original SeqRecord loaded from sequence file"""
    return SeqIO.read(fasta_path, "fasta")
def seq_record_loaded_from_file_example(sequence_path):
    """Original SeqRecord loaded from sequence file"""
    return SeqIO.read(sequence_path, "fasta")
def sequence_path(self, fasta_path):
    """Provide pointers to the paths of the FASTA file

    Args:
        fasta_path: Path to FASTA file

    """
    if not fasta_path:
        self.sequence_dir = None
        self.sequence_file = None
    else:
        if not op.exists(fasta_path):
            raise OSError('{}: file does not exist'.format(fasta_path))
        if not op.dirname(fasta_path):
            self.sequence_dir = '.'
        else:
            self.sequence_dir = op.dirname(fasta_path)
        self.sequence_file = op.basename(fasta_path)

        tmp_sr = SeqIO.read(fasta_path, 'fasta')
        if self.name == '<unknown name>':
            self.name = tmp_sr.name
        if self.description == '<unknown description>':
            self.description = tmp_sr.description
        if not self.dbxrefs:
            self.dbxrefs = tmp_sr.dbxrefs
        if not self.features:
            self.features = tmp_sr.features
        if not self.annotations:
            self.annotations = tmp_sr.annotations
        if not self.letter_annotations:
            self.letter_annotations = tmp_sr.letter_annotations
def load(self, filename):
    self.data = {}
    with open(filename, 'r') as f:
        parsed = parse_antiSMASH(f.read())
    for v in parsed:
        self.data[v] = parsed[v]
def efetch_hit(term, seq_start, seq_stop):
    """ Fetch the relevant part of a hit """
    db = "nucleotide"
    maxtry = 3
    ntry = -1
    downloaded = False
    # 'not downloaded' rather than the bitwise '~downloaded', which is always
    # truthy and would never let the loop stop on success
    while not downloaded and (ntry <= maxtry):
        ntry += 1
        try:
            handle = Entrez.esearch(db=db, term=term)
            record = Entrez.read(handle)
            assert len(record['IdList']) == 1, \
                "Sorry, I'm not ready to handle more than one record"
            handle = Entrez.efetch(db=db, rettype="gb", retmode="text",
                                   id=record['IdList'][0],
                                   seq_start=seq_start, seq_stop=seq_stop)
            content = handle.read()
            downloaded = True
        except Exception:
            nap = ntry * 3
            print("Failed to download %s. I'll take a nap of %s seconds"
                  " and try again." % (term, nap))
            time.sleep(nap)
    return content
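A side note on the example above: Biopython asks users to set Entrez.email before querying NCBI, which the snippet omits. A minimal sketch of a compliant query, with a placeholder e-mail address and search term:

from Bio import Entrez

Entrez.email = "you@example.com"  # placeholder; NCBI requires a contact address
handle = Entrez.esearch(db="nucleotide", term="some search term")  # placeholder term
record = Entrez.read(handle)
print(record["IdList"])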
def cds_from_gbk(gb_file):
    gb_record = SeqIO.read(open(gb_file, "rU"), "genbank")
    # if strain_id is not None:
    #     gb_record.id = strain_id
    output = pd.DataFrame()
    sign = lambda x: '+' if x > 0 else '-'
    for feature in gb_record.features:
        if feature.type == "CDS":
            tmp = {'BGC': gb_record.id,
                   'locus_tag': feature.qualifiers['locus_tag'][0],
                   'start': feature.location.start.position,
                   'stop': feature.location.end.position,
                   'strand': sign(feature.location.strand)}
            if 'note' in feature.qualifiers:
                for note in feature.qualifiers['note']:
                    product = re.search(
                        r"""smCOG: \s (?P<product>.*?) \s+
                            \(Score: \s* (?P<score>.*); \s*
                            E-value: \s (?P<e_value>.*?)\);""",
                        note, re.VERBOSE)
                    if product is not None:
                        product = product.groupdict()
                        product['score'] = float(product['score'])
                        product['e_value'] = float(product['e_value'])
                        for p in product:
                            tmp[p] = product[p]
            output = output.append(pd.Series(tmp), ignore_index=True)
    return output
def test_plot_with_gc_content(tmpdir):
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(8, 4), sharex=True)

    # Parse the genbank file, plot annotations
    record = SeqIO.read(example_genbank, "genbank")
    graphic_record = BiopythonTranslator().translate_record(record)
    ax, levels = graphic_record.plot()
    graphic_record.plot(ax=ax1, with_ruler=False)

    # Plot the local GC content
    def plot_local_gc_content(record, window_size, ax):
        def gc_content(seq):
            return 100.0 * len([c for c in seq if c in "GC"]) / len(seq)
        yy = [gc_content(record.seq[i:i + window_size])
              for i in range(len(record.seq) - window_size)]
        xx = np.arange(len(record.seq) - window_size) + 25
        ax.fill_between(xx, yy, alpha=0.3)
        ax.set_ylabel("GC(%)")

    plot_local_gc_content(record, window_size=50, ax=ax2)

    # Resize the figure to the right height
    target_file = os.path.join(str(tmpdir), "with_plot.png")
    fig.tight_layout()
    fig.savefig(target_file)
def make_sbol_xml(self):
    """ Take an SBOL definition and turn into RDF/XML """
    xml_file = BytesIO()
    self.document.write(xml_file)
    xml_file.seek(0)
    return xml_file.read()
def get_sbol_from_xml(self, source_data):
    """ Read in SBOL XML from source data and set document """
    filename = '/tmp/' + str(uuid.uuid4()) + '.xml'
    with open(filename, 'w+') as sf:
        sf.write(self.file_data.read())
    self.document.read(filename)
    os.remove(filename)
def parse_gb(self):
    """ Take a genbank file and parse to items/SBOL """
    items = []
    elements = OrderedDict()
    sbol = None
    try:
        record = SeqIO.read(self.file_data, 'genbank')
        features = record.features
        # sorted(record.features, key=attrgetter('location.start'))
        for feat in features:
            # The file sometimes has lowercase and sometimes uppercase
            # types so normalise to lowercase.
            if feat.type.lower() in self.GENBANK_FEATURE_TYPES:
                name = ''
                # Look for the label key. Other keys can be set but
                # most software simply sets the label key and nothing
                # else.
                for key, value in feat.qualifiers.items():
                    if key == 'label':
                        name = value[0]
                if name:
                    feature_type = feat.type.lower()
                    if feature_type == 'rbs':
                        feature_type = 'ribosome entry site'
                    elif feature_type == 'primer_bind':
                        feature_type = 'primer binding site'
                    seq = str(feat.extract(record.seq))
                    elements[name] = self.genbank_to_sbol_component(name, seq, feature_type)
                    item = self.get_inventory_item(name)
                    if item:
                        items.append(item)
    except Exception as e:
        print(e)
    else:
        if len(elements.values()) > 0:
            self.make_sbol_construct(list(elements.values()))
            sbol = self.make_sbol_xml()
    return items, sbol
def need_to_update_release():
    """
    Check whether the user currently has the latest GB release (in which
    case we only need to download the daily updates) or whether they have
    an old release or no release (in which case we need to download
    everything).
    """
    ftp = FTP('ftp.ncbi.nlm.nih.gov')
    ftp.login()
    ftp.cwd('genbank')
    try:
        current_release_number = int(open('GB_Release_Number').read())
        logging.info(
            'You currently have GB release number {}'
            .format(current_release_number)
        )
    except IOError:
        logging.info('No current release, downloading the files')
        return True

    latest_release_file = StringIO()
    ftp.retrlines('RETR GB_Release_Number', latest_release_file.write)
    latest_release_number = int(latest_release_file.getvalue())
    logging.info('Latest release number is {}'.format(latest_release_number))

    if current_release_number == latest_release_number:
        logging.info('You have the latest release, getting daily updates')
        return False
    else:
        logging.info('New release available, downloading the files')
        return True
def delimited(file, delimiter='\n', bufsize=4096):
    buf = ''
    while True:
        newbuf = file.read(bufsize)
        if not newbuf:
            yield buf
            return
        buf += newbuf
        lines = buf.split(delimiter)
        for line in lines[:-1]:
            yield line
        buf = lines[-1]
def process_record(record):
    if 'gaattc' in record.lower():  # quick check, might be false positive
        real_record = SeqIO.read(StringIO(record), format='gb')
        if 'gaattc' in str(real_record.seq).lower():
            output_file.write(record)
def read_fasta_file(fasta):
    # read it in
    # assume there is only one
    try:
        fasta_record = SeqIO.read(open(fasta), "fasta")
    except ValueError:
        log.error("Fasta file %s has more than one sequence -- Exiting", fasta)
        raise
    return fasta_record
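The example above leans on the fact that SeqIO.read() raises ValueError when a file holds more than one sequence. If several records are expected, Bio.SeqIO.parse() returns an iterator of SeqRecord objects instead; a minimal sketch (the file name is a hypothetical placeholder):

from Bio import SeqIO

# SeqIO.parse() yields one SeqRecord per entry, so multi-FASTA files are fine.
for record in SeqIO.parse("many.fasta", "fasta"):  # "many.fasta" is a placeholder path
    print(record.id, len(record.seq))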
def optparser():
    print("Parse command line options")
    # Usage and version strings
    program_name = "pyssw"
    program_version = 0.1
    version_string = "{}\t{}".format(program_name, program_version)
    usage_string = "{}.py -s subject.fasta -q fastq (or fasta) [Facultative options]".format(program_name)
    optparser = optparse.OptionParser(usage=usage_string, version=version_string)

    # Define optparser options
    hstr = "Path of the fasta file containing the subject genome sequence. Can be gzipped. [REQUIRED]"
    optparser.add_option('-s', '--subject', dest="subject", help=hstr)
    hstr = "Path of the fastq or fasta file containing the short read to be aligned. Can be gzipped. [REQUIRED]"
    optparser.add_option('-q', '--query', dest="query", help=hstr)
    hstr = "Type of the query file = fastq or fasta. [default: fastq]"
    optparser.add_option('-t', '--qtype', dest="qtype", default="fastq", help=hstr)
    hstr = "Positive integer for weight match in genome sequence alignment. [default: 2]"
    optparser.add_option('-m', '--match', dest="match", default=2, help=hstr)
    hstr = "Positive integer. The negative value will be used as weight mismatch in genome sequence alignment. [default: 2]"
    optparser.add_option('-x', '--mismatch', dest="mismatch", default=2, help=hstr)
    hstr = "Positive integer. The negative value will be used as weight for the gap opening. [default: 3]"
    optparser.add_option('-o', '--gap_open', dest="gap_open", default=3, help=hstr)
    hstr = "Positive integer. The negative value will be used as weight for the gap extension. [default: 1]"
    optparser.add_option('-e', '--gap_extend', dest="gap_extend", default=1, help=hstr)
    hstr = "Integer. Consider alignments having a score <= as not aligned. [default: 0]"
    optparser.add_option('-f', '--min_score', dest="min_score", default=0, help=hstr)
    hstr = "Integer. Consider alignments having a length <= as not aligned. [default: 0]"
    optparser.add_option('-l', '--min_len', dest="min_len", default=0, help=hstr)
    hstr = "Flag. Align query in forward and reverse orientation and choose the best alignment. [Set by default]"
    optparser.add_option('-r', '--reverse', dest="reverse", action="store_true", default=True, help=hstr)
    hstr = "Flag. Write unaligned reads in sam output [Unset by default]"
    optparser.add_option('-u', '--unaligned', dest="unaligned", action="store_true", default=False, help=hstr)

    # Parse args and return a dictionary-like object of options
    opt, args = optparser.parse_args()

    if not opt.subject:
        print("\nERROR: a subject fasta file has to be provided (-s option)\n")
        optparser.print_help()
        sys.exit()

    if not opt.query:
        print("\nERROR: a query fasta or fastq file has to be provided (-q option)\n")
        optparser.print_help()
        sys.exit()

    return opt

#~~~~~~~TOP LEVEL INSTRUCTIONS~~~~~~~#
def load_reference(self, path, fmts, metadata, include=2, genes=False):
    """Assume it's genbank."""
    try:
        self.reference = SeqIO.read(path, 'genbank')
    except Exception as e:
        self.log.fatal("Problem reading reference {}. Error: {}".format(path, e))

    ## some checks
    try:
        assert("strain" in metadata)
        if include > 0:
            assert("date" in metadata)
    except AssertionError as e:
        self.log.fatal("Poorly defined reference. Error: {}".format(e))

    if genes:
        # we used to make these FeatureLocation objects here, but that won't go to JSON
        # so just do it in the Process part instead. For reference:
        # FeatureLocation(start=f.location.start, end=f.location.end, strand=1)
        self.reference.genes = {
            sequence_set.get_gene_name(f.qualifiers['gene'][0], genes):
                {"start": int(f.location.start), "end": int(f.location.end), "strand": 1}
            for f in self.reference.features
            if 'gene' in f.qualifiers and f.qualifiers['gene'][0] in genes
        }
    else:
        self.reference.genes = {}

    # use the supplied metadata dict to define attributes
    seq_attr_keys = self.seqs.values()[0].attributes.keys()
    self.reference.attributes = {k: fix_names(v) for k, v in metadata.items() if k in seq_attr_keys}
    self.reference.name = self.reference.attributes["strain"]
    self.reference.id = self.reference.attributes["strain"]

    # is there any possibility that the reference will be added to the sequences?
    self.reference.include = include  # flag {0,1,2}
    if self.reference.name in self.seqs:
        self.log.notify("Segment {} reference already in dataset".format(self.segmentName))
        if include == 0:
            self.log.notify("Removing reference from pool of sequences to analyse")
            del self.seqs[self.reference.name]
    elif include > 0:
        ## add to sequences (tidy up attributes first)
        self._parse_date_per_seq(self.reference, fmts)
        self.seqs[self.reference.name] = self.reference
        missing_attrs = set(seq_attr_keys) - set(self.reference.attributes.keys()) - set(["date", "num_date"])
        if len(missing_attrs) > 0:
            self.log.notify("Including reference in segment {} but the following attributes are missing: {}".format(self.segmentName, " & ".join(missing_attrs)))