The following code examples, extracted from open-source Python projects, illustrate how to use Bio.SeqIO.parse().
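Before the examples, here is a minimal sketch of the pattern they all share. SeqIO.parse() takes a file name (or an open handle) plus a format string, and lazily yields SeqRecord objects; the file name reads.fasta below is a hypothetical placeholder.

from Bio import SeqIO

# SeqIO.parse() returns an iterator of SeqRecord objects; supported format
# strings include "fasta", "fastq", and "genbank".
for record in SeqIO.parse("reads.fasta", "fasta"):  # "reads.fasta" is a hypothetical file
    print(record.id, len(record.seq))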
def parse_specificity(feature, params):
    '''parse the feature's specificity entries'''
    if 'specificity' in feature.qualifiers:
        for spec in feature.qualifiers['specificity']:
            if spec.startswith('KR activity: '):
                params['kr_activity'] = not spec.endswith('inactive')
                continue
            if spec.startswith('KR stereochemistry: '):
                params['kr_stereochemistry'] = spec.split(':')[-1].strip()
                continue
            if spec.startswith('NRPSpredictor2 SVM: '):
                params['nrps_predictor'] = spec.split(':')[-1].strip()
                continue
            if spec.startswith('Stachelhaus code: '):
                params['stachelhaus'] = spec.split(':')[-1].strip()
                continue
            if spec.startswith('Minowa: '):
                params['minowa'] = spec.split(':')[-1].strip()
                continue
            if spec.startswith('PKS signature: '):
                params['pks_signature'] = spec.split(':')[-1].strip()
                continue
            if spec.startswith('consensus: '):
                params['consensus'] = spec.split(':')[-1].strip()
                continue

def parse_fasta(self):
    self.ref_id = dict()
    self.ref_inf = dict()
    i = 1
    N = 0
    ref_inf = np.empty(shape=[0, 3])
    for seqs in SeqIO.parse(self.ref, 'fasta'):
        seq_id = seqs.id
        self.ref_id[i] = seq_id
        seq = str(seqs.seq.upper())
        seq_len = len(seq)
        self.ref_inf[seq_id] = seq_len
        N += seq.count('N')
        ref_inf = np.append(ref_inf, [[i, seq_id, seq_len]], axis=0)
        i += 1
    self.ref_detail = pd.DataFrame(ref_inf, columns=['Index', 'Contig', 'Length(bp)'])
    self.N = N

def printShortenedFASTA(fixed, locus):
    """Print only genomic FASTA sequences with ID, starting 200 bases before exon 2."""
    for seq_record in SeqIO.parse(fixed, "imgt"):
        if seq_record.description.startswith(locus) and len(seq_record.seq) > 1:
            # the new exon record we are extracting
            newSeq = ""
            # we will not be able to extract all parts, as many sequences have missing introns
            extracted = False
            for f in seq_record.features:  # we hope the features are in order
                if f.type == "intron" and f.qualifiers['number'] == ['1']:
                    newSeq = seq_record.seq[f.location.end - 200:]
                    extracted = True
                    break
            # we have the whole sequence now
            if extracted:
                sys.stdout.write(SeqRecord(newSeq, id=seq_record.id,
                                           description=seq_record.description).format("fasta"))

def getUniqueRedundSets(fileName, speciesName):
    '''Run through genbank file fileName, get set of unique genes (with no redundancies), and set with redundancies.'''
    f = open(fileName, 'rU')
    geneL = []
    for record in SeqIO.parse(f, "genbank"):
        # iterate through the genes on the chromosome
        for feature in record.features:
            # choose only the features that are protein coding genes
            if feature.type == "CDS" and 'protein_id' in feature.qualifiers:
                geneName = speciesName + '-' + feature.qualifiers['protein_id'][0]
                geneL.append(geneName)
    f.close()
    # now figure out which ones are unique
    uniqueS = set()
    redundS = set()
    for gene in geneL:
        if geneL.count(gene) > 1:
            redundS.add(gene)
        else:
            uniqueS.add(gene)
    return uniqueS, redundS

def dump_cluster(c):
    if os.path.exists("%s_dist_%s_aligned_short.fasta-cluster%d.png" % (prefix, dist, c)):
        print """
## Subcluster %d
""" % (c)
        if c == 0:
            print "Cluster 0 represents isolates that do not cluster with any other isolates within the distance cut-off, i.e. singleton sequences. The sequences presented are unrelated."
        print """
![Subcluster %d](%s_dist_%s_aligned_short.fasta-cluster%d.pdf.png)
""" % (c, prefix, dist, c)
    else:
        print """
## Subcluster %s (Tree not shown for clusters with <5 isolates)

Isolates:
""" % (c)
        for rec in SeqIO.parse(open("%s_dist_%s_aligned_short.fasta-cluster%d" % (prefix, dist, c)), "fasta"):
            print " - %s" % (rec.id)

def parse_barcodes(barcode_file):
    # print "parsing barcodes"
    barcode_list = list()
    barcode_list.append("unclassified")
    barcode_dict = dict()
    barcode_sequences = SeqIO.parse(open(barcode_file), 'fasta')
    for barcode in barcode_sequences:
        name, sequence = barcode.id, str(barcode.seq)
        barcode_dict[name] = sequence
        barcode_list.append(name)
        barcode_dict[name + "_rev"] = str(barcode.reverse_complement().seq)
    return barcode_dict, barcode_list

def filter_using_summary(fq, args):
    """Use quality scores from the albacore summary file for filtering.

    The summary file from albacore gives a more accurate quality estimate.
    Get the dataframe from nanoget and convert it to a dictionary.
    """
    data = {entry[0]: entry[1]
            for entry in process_summary(summaryfile=args.summary,
                                         threads="NA",
                                         readtype=args.readtype,
                                         barcoded=False)[["readIDs", "quals"]].itertuples(index=False)}
    try:
        for record in SeqIO.parse(fq, "fastq"):
            if data[record.id] > args.quality and len(record) > args.length:
                print(record[args.headcrop:args.tailcrop].format("fastq"), end="")
    except KeyError:
        logging.error("mismatch between summary and fastq: "
                      "{} was not found in the summary file.".format(record.id))
        sys.exit("\nERROR: mismatch between sequencing_summary and fastq file: "
                 "{} was not found in the summary file.\nQuitting.".format(record.id))

def filter_fastq(input_file, output_file, downweight_number, ctot, gtoa):
    """Take a FASTQ file as input and weight down the base qualities at the start and the end of the reads."""
    in_iterator = SeqIO.parse(input_file, 'fastq')
    input_records = list(in_iterator)
    for i, record in enumerate(input_records):
        change_bases_c = None
        change_bases_t = None
        temp_qual = record.letter_annotations['phred_quality']
        # use a distinct name (pos) inside the comprehensions so the outer
        # loop index i is never shadowed
        if ctot:
            change_bases_c = [check_c_2_t(nuc) and pos < downweight_number
                              for pos, nuc in enumerate(record.seq)]
        if gtoa:
            change_bases_t = [check_g_2_a(nuc) and (len(record.seq) - pos) <= downweight_number
                              for pos, nuc in enumerate(record.seq)]
        new_qual = downweight_quality(temp_qual, change_bases_c, change_bases_t)
        input_records[i].letter_annotations['phred_quality'] = new_qual
    handle = open(output_file, 'wt')
    SeqIO.write(input_records, handle, "fastq")

def match_both_pairs_filter(self):
    self.logger.warning("Extracting read names from FASTQ file where both reads must match")
    regex = re.compile(r'/[12]$')
    base_read_names = {}
    with open(self.fastq_file, "r") as fastq_file_input:
        for record in SeqIO.parse(fastq_file_input, "fastq"):
            base_read_name = regex.sub('', record.id)
            if base_read_name in base_read_names:
                # second time this base name is seen: remember the read ID
                base_read_names[base_read_name] = record.id
            else:
                # first occurrence: mark with a placeholder
                base_read_names[base_read_name] = 1
    with open(self.output_readnames_file, "w") as readnames_output:
        for base_name, read_name in base_read_names.items():
            if read_name == 1:
                continue  # only one read of the pair was seen
            readnames_output.write(read_name + '\n')

def remove_small_large_contigs(self, input_file, output_file):
    with open(input_file, "r") as spades_input_file, open(output_file, "w") as spades_output_file:
        sequences = []
        for record in SeqIO.parse(spades_input_file, "fasta"):
            if self.min_spades_contig_coverage != 0 and self.sequence_coverage(record.id) < self.min_spades_contig_coverage:
                self.logger.warning("Excluding contig with low coverage of " + str(self.sequence_coverage(record.id)) + " from " + self.spades_assembly_file())
                continue
            if self.max_spades_contig_coverage != 0 and self.sequence_coverage(record.id) > self.max_spades_contig_coverage:
                self.logger.warning("Excluding contig with high coverage of " + str(self.sequence_coverage(record.id)) + " from " + self.spades_assembly_file())
                continue
            if len(record.seq) > self.minimum_length:
                sequences.append(record)
            else:
                self.logger.warning("Excluding contig of length " + str(len(record.seq)) + " from " + self.spades_assembly_file())
        SeqIO.write(sequences, spades_output_file, "fasta")

def read_id2idx(reads_fn):
    # infer the format from the file name: *.fastq/*.fq end in 'q'
    if reads_fn[-1] == 'q':
        fmt = 'fastq'
    else:
        fmt = 'fasta'
    reads_idx = {}
    reads_fh = open(reads_fn, "rU")
    num_read = 0
    for record in SeqIO.parse(reads_fh, fmt):
        reads_idx[record.id] = num_read
        num_read += 1
    reads_fh.close()
    return reads_idx

def writeReadsFileSE(mappedReadsDict, outReads, fastq):
    if fastq.endswith('.gz'):
        subprocess.call(['gunzip', fastq])
        newFq = fastq.replace('.gz', '')
    else:
        newFq = fastq
    out = open(outReads, 'w')
    fqF = open(newFq, 'rU')
    for record in SeqIO.parse(fqF, 'fastq'):
        if record.id in mappedReadsDict:
            newRec = SeqRecord(record.seq, id=record.id, description='')
            SeqIO.write(newRec, out, 'fasta')
    out.close()
    fqF.close()
    if fastq.endswith('.gz'):
        subprocess.call(['gzip', newFq])

def writeFailedReconstructions(outF, chain, writtenArr, output, idNameDict, fastaDict):
    recF = output + '.reconstructed.junctions.' + chain + '.fa'
    if os.path.isfile(recF):
        f = open(recF, 'rU')
        segDict = dict()
        for tcrRecord in SeqIO.parse(f, 'fasta'):
            tcrSeq = str(tcrRecord.seq)
            if tcrSeq.find('NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN') != -1:
                status = 'Failed reconstruction - reached maximum number of iterations'
                segDict = addSegmentsToDict(segDict, status, writtenArr, tcrRecord, idNameDict, fastaDict)
            elif tcrSeq.find('NNNN') != -1:
                status = 'Failed reconstruction - V and J segment do not overlap'
                segDict = addSegmentsToDict(segDict, status, writtenArr, tcrRecord, idNameDict, fastaDict)
        f.close()
        if len(segDict) > 0:
            writeSegDict(segDict, outF, chain)

def _read_reference(self, reference):
    """Read the reference from the file into a dictionary.

    Parameters
    ----------
    reference: str
        Path to the reference.

    Returns
    -------
    reference: dict
        Dictionary with chromosome - sequence mapping.
    """
    if reference is None:
        self._reference = None
    else:
        self._reference = {}
        with open(reference) as reference_fp:
            for record in SeqIO.parse(reference_fp, "fasta"):
                self._reference[record.id] = list(record.seq)

def from_orfs_files(cls, seq_path, paths_path, graph_path):
    """Create object from graph and ORFs files.

    :param seq_path: path to .fasta file with ORF sequences
    :param paths_path: path to .path file with ORF paths
    :param graph_path: path to graph
    :return: LinkageCluster
    """
    from Bio import SeqIO
    orfs = collections.OrderedDict()
    with open(seq_path) as seq_file, open(paths_path) as path_file:
        paths_dict = {}
        orfid = ''
        for ind, line in enumerate(path_file.read().split('\n')):
            if ind % 2 == 0:
                try:
                    orfid = re.findall(r'(ORF_\d+)', line)[0]
                except IndexError:
                    continue
            else:
                paths_dict[orfid] = ([int(re.findall(r'^(\d+),', line)[0])] +
                                     re.findall(r'(\d+)([+-]),', line) +
                                     [int(re.findall(r',(\d+)$', line)[0])])
        for rec in SeqIO.parse(seq_file, 'fasta'):
            orfs[str(rec.seq)] = (paths_dict[rec.id], rec.id)
    return cls(GFAgraph().load_graph(graph_path), orfs)

def single_fasta(ref, wd):
    """From a FASTA file, make a single-record file for each sequence.

    :param ref: input FASTA file
    :param wd: working directory
    :return: list of paths to the single-record FASTA files
    """
    wd_split = wd + '/split/'
    logistic.check_create_dir(wd_split)
    fastaFile = open(ref, 'r')
    single_fasta_list = []
    for record in SeqIO.parse(fastaFile, "fasta"):
        fasta_name = wd_split + '/' + record.id + '.fasta'
        single_fasta_list.append(fasta_name)
        output_handle = open(fasta_name, "w")
        SeqIO.write(record, output_handle, "fasta")
        output_handle.close()
    return single_fasta_list

def mga_summary(orf_fp, contigs_fp, sample_id):
    """Summarizes results from MetaGene Annotator."""
    genes = []
    gene_no = 0
    contigs = SeqIO.parse(contigs_fp, 'fasta')
    contig_seqs = {r.description: r.seq for r in contigs}
    annotations = MetaGeneAnnotation.parse(open(orf_fp))
    for anno in annotations:
        contig_seq = contig_seqs[anno.id]
        # Gather putative genes and create SeqRecords for them
        for pgene in anno.genes:
            gene_seq = contig_seq[pgene.start:pgene.end]
            if pgene.strand == '-':
                gene_seq = gene_seq.reverse_complement()
            gene = SeqRecord(gene_seq, id=anno.id,
                             description="{}:{}".format(sample_id, gene_no))
            genes.append(gene)
            gene_no += 1
    return genes

def FastaToFDB(self, fastafile, username):
    fdb_registers = []
    content = open(fastafile, "r")
    sequences = parse(content, 'fasta')
    for sequence in sequences:
        fdb_register = FDBRegister()
        fdb_register.description = sequence.description
        fdb_register.gene = str(sequence.seq)
        fdb_register.geneinfo = sequence.annotations
        fdb_register.filename = fastafile
        fdb_register.date = date.today()
        fdb_register.user = username
        fdb_registers.append(fdb_register)
    content.close()
    return self.mount_fdb_file(fdb_registers)

def is_sequence_file(self, file_name, file_format, show_error=True):
    """Try to open a sequence file using Biopython.

    Returns True if the file is a valid file of the given format.
    """
    valid_file = False
    file_handler = None
    try:
        file_handler = open(file_name, "r")
        r = list(SeqIO.parse(file_handler, file_format))
        valid_file = len(r) > 0
    except Exception:
        valid_file = False
    if file_handler is not None:
        file_handler.close()
    if not valid_file:
        title = "File Type Error"
        message = "The selected File is not a valid %s." % (pmdt.supported_sequence_file_types[file_format])
        if show_error:
            self.show_error_message(title, message)
    return valid_file

def get_psi_blast_record(self, result_handle):
    """Convert the results to a list, because Biopython's .parse() returns a generator."""
    records = list(NCBIXML.parse(result_handle))
    return records[0]

def get_fasta_in_kraken_format(outfile_fasta='sequences.fa'):
    output = open(outfile_fasta, 'w')
    for file_name in os.listdir(cwd):
        if file_name.endswith('.gbff'):
            records = SeqIO.parse(file_name, "genbank")
            for seq_record in records:
                seq_id = seq_record.id
                seq = seq_record.seq
                for feature in seq_record.features:
                    if 'source' in feature.type:
                        print(feature.qualifiers)
                        taxid = ''.join(feature.qualifiers['db_xref'])
                        taxid = re.sub(r'.*taxon:', 'kraken:taxid|', taxid)
                        print(''.join(taxid))
                        outseq = ">" + seq_id + "|" + taxid + "\n" + str(seq) + "\n"
                        output.write(outseq)
            os.remove(file_name)
    output.close()
    return

def read(self, input_file, trim=None):
    # load the file into a parser
    fasta_sequences = SeqIO.parse(open(input_file), 'fasta')
    if trim is None:
        # nothing to trim: parse sequences as they are,
        # storing name and sequence as strings
        for fasta in fasta_sequences:
            name, sequence = fasta.id, str(fasta.seq)
            self._sequence_dictionary[name] = sequence
    else:
        # something to trim: strip the suggested characters from the
        # record name before using it as the dictionary key
        for fasta in fasta_sequences:
            name, sequence = fasta.id, str(fasta.seq)
            self._sequence_dictionary[name.strip(str(trim))] = sequence

# method that returns the sequence dictionary

def run_cnvnator(input_file, output_file):
    root = output_file[:-3] + 'root'
    # 1
    cnv_extract_bam(input_file, root, others)
    # 2
    chr_path = file_path + '/cnv/scaffold'
    path = chr_path
    if not os.path.exists(path):
        os.mkdir(path)
    for record in SeqIO.parse(ref_fa, 'fasta'):
        SeqIO.write(record, path + '/' + record.id + '.fa', 'fasta')
    cnv_generate_hist(root, chr_path, bin_win, others)
    # 3
    cnv_statistics(root, bin_win, others)
    # 4
    cnv_partitioning(root, bin_win, others)
    # 5
    cnv_call(root, output_file, bin_win, others)

def count_records(input_object, format='fasta'):
    """Count SeqRecord objects from a file in the specified format.

    :param input_object: A file object or a file name.
    :param format: Input format (fasta by default).
    :returns: Number of records in input file.
    :rtype: int
    """
    handle = input_object
    if type(handle) == str:
        handle = open(handle, "rU")
    counter = 0
    for _ in SeqIO.parse(handle, format):
        counter += 1
    return counter

def get_fasta_seq_dictonary(fa_file):
    """Return a dictionary keyed by record ID holding sequence, length, and GC-content bin."""
    dict_fa = {}
    for seq_record in SeqIO.parse(fa_file, "fasta"):
        fa_id = seq_record.id
        faseq = seq_record.seq
        gc_count = GC(faseq)
        seq_len = len(faseq)
        # bin the GC content to the nearest 10 (10-unit window)
        gc_content_decimal_distribution = math.floor(gc_count / 10) * 10
        # gc_content_decimal_distribution = gc_count / seq_len
        dict_fa[fa_id] = [faseq, seq_len, gc_content_decimal_distribution]
    return dict_fa

def get_Short(genesList):
    for gene in genesList:
        # gene = gene.rstrip('\n')
        pathtoDir = os.path.join(os.path.dirname(gene), "short")
        if not os.path.exists(pathtoDir):
            os.makedirs(pathtoDir)
        shortgene = os.path.join(os.path.dirname(gene), "short", os.path.basename(gene))
        shortgene = shortgene.replace(".fasta", "_short.fasta")
        # gene_fp2 = HTSeq.FastaReader(gene)
        # write only the first allele of each gene to the short file
        for allele in SeqIO.parse(gene, "fasta", generic_dna):
            fG = open(shortgene, 'w')
            fG.write('>' + str(allele.id) + '\n' + str(allele.seq) + '\n')
            fG.close()
            break
    return True

def check_if_list_or_folder(folder_or_list):
    list_files = []
    # check if we were given a list of genome paths or a folder from which to create the schema
    try:
        f = open(folder_or_list, 'r')
        f.close()
        list_files = folder_or_list
    except IOError:
        for gene in os.listdir(folder_or_list):
            try:
                genepath = os.path.join(folder_or_list, gene)
                for allele in SeqIO.parse(genepath, "fasta", generic_dna):
                    break
                list_files.append(os.path.abspath(genepath))
            except Exception as e:
                print(e)
    return list_files


# ================================================ MAIN ================================================ #

def calculateN_50(input_file, ratio):
    lengths = []
    sums = 0
    for seq_record in SeqIO.parse(input_file, "fasta"):
        lengths.append(len(seq_record.seq))
    lengths = sorted(lengths, reverse=True)
    contigsMore1000 = 0
    totalLengthMore1000 = 0
    for elem in lengths:
        if elem >= 10000:  # note: despite the variable names, the threshold used here is 10,000 bp
            contigsMore1000 += 1
            totalLengthMore1000 += int(elem)
    N_half = sum(lengths) * ratio
    for i in range(0, len(lengths)):
        sums = sums + lengths[i]
        if sums >= N_half:
            return lengths[i], len(lengths), N_half / ratio, contigsMore1000, totalLengthMore1000

def get_Short(gene):
    newFasta = ''
    for allele in SeqIO.parse(gene, "fasta", generic_dna):
        try:
            translatedSequence, sequence = translateSeq(allele.seq)
            newFasta += ">" + allele.id + "\n" + str(sequence) + "\n"
        except Exception as e:
            print(e)
    with open(gene, "w") as f:  # text mode, since newFasta is a str
        f.write(newFasta)
    return True

def resolve_config(config, workdir=None):
    """
    Parameters
    ----------
    config : str, dict
        If str, assume it's a YAML file and parse it; otherwise pass through

    workdir : str
        Optional location to specify relative location of all paths in `config`
    """
    if isinstance(config, str):
        config = yaml.load(open(config))

    def rel(pth):
        if workdir is None or os.path.isabs(pth):
            return pth
        return os.path.join(workdir, pth)

    for key in PATH_KEYS:
        if key in config:
            config[key] = rel(config[key])
    return config

def fastq_info(path):
    """Collect basic statistics from a gzipped FASTQ file.

    How to silence warnings within a code block:
    http://stackoverflow.com/questions/14463277/how-to-disable-python-warnings
    """
    numBases = 0
    numReads = 0
    readLengths = Counter()
    GCTot = 0
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        handle = gzip.open(path, "rt")
        for record in SeqIO.parse(handle, "fastq"):
            numBases += len(record)
            numReads += 1
            readLengths[len(record)] += 1
            GCTot += sum(record.seq.count(x) for x in ['G', 'C', 'g', 'c', 'S', 's'])
        handle.close()
    GCPer = GCTot / numBases
    avgReadLen = sum(length * count for length, count in readLengths.items()) / numReads
    return {"numBases": numBases,
            "numReads": numReads,
            "numGCBases": GCTot,
            "portionGC": GCPer,
            "avgReadLen": avgReadLen}

def parse_gene_id_and_name():
    """Return a mapping of gene IDs to gene names/descriptions as a dictionary.

    Deliberately only looks at protein-coding genes.
    """
    gene_id_to_name = dict()
    # modelled after http://genome.crg.es/~lpryszcz/scripts/gb2gtf.py
    # allowed_types = set(['gene', 'CDS', 'tRNA', 'tmRNA', 'rRNA', 'ncRNA'])
    allowed_types = set(['CDS'])
    # wanted_qualifiers = set(['product', 'locus_tag'])
    with open(GB_FILE) as fh:
        for gb in SeqIO.parse(fh, 'gb'):
            for f in gb.features:
                if f.type not in allowed_types:
                    continue
                qualifiers = dict(f.qualifiers)
                assert len(qualifiers['locus_tag']) == 1 and len(qualifiers['product']) == 1
                gene_id = qualifiers['locus_tag'][0]
                gene_name = qualifiers['product'][0]
                gene_id_to_name[gene_id] = gene_name
    return gene_id_to_name

def generate_corpusfile(corpus_fname, n, out):
    '''
    Args:
        corpus_fname: corpus file name
        n: the number of chunks to split; in other words, the "n" in "n-gram"
        out: output corpus file path

    Description:
        Protvec uses word2vec internally, which requires a corpus file
        to be loaded in order to generate the corpus.
    '''
    f = open(out, "w")
    for r in SeqIO.parse(corpus_fname, "fasta"):
        ngram_patterns = split_ngrams(r.seq, n)
        for ngram_pattern in ngram_patterns:
            f.write(" ".join(ngram_pattern) + "\n")
        sys.stdout.write(".")
    f.close()

def get_Short(genesList):
    for gene in genesList:
        # gene = gene.rstrip('\n')
        pathtoDir = os.path.join(os.path.dirname(gene), "short")
        if not os.path.exists(pathtoDir):
            os.makedirs(pathtoDir)
        shortgene = os.path.join(os.path.dirname(gene), "short", os.path.basename(gene))
        shortgene = shortgene.replace(".fasta", "_short.fasta")
        # gene_fp2 = HTSeq.FastaReader(gene)
        for allele in SeqIO.parse(gene, "fasta", generic_dna):
            fG = open(shortgene, 'w')
            fG.write('>' + str(allele.id) + '\n' + str(allele.seq.upper()) + '\n')
            fG.close()
            break
    return True

def check_if_list_or_folder(folder_or_list):
    list_files = []
    # check if given a list of genome paths or a folder to create the schema from
    try:
        f = open(folder_or_list, 'r')
        f.close()
        list_files = folder_or_list
    except IOError:
        for gene in os.listdir(folder_or_list):
            try:
                genepath = os.path.join(folder_or_list, gene)
                for allele in SeqIO.parse(genepath, "fasta", generic_dna):
                    break
                list_files.append(os.path.abspath(genepath))
            except Exception as e:
                print(e)
    return list_files

def mmff_from_file(fasta_file, morphs):
    '''Compute a metamorphic tests files from an input FASTA file.

    Arguments:
        fasta_file: an open file object for the FASTA file
        morphs: list of morph functions to apply
    Result:
        tbc
    '''
    morphs_dict = {'reverse': mmff_reverse,
                   'passthrough': mmff_passthrough}
    morphed_sequences = []
    for seq in SeqIO.parse(fasta_file, "fasta"):
        for morph in morphs:
            # print("morph " + str(morphs_dict[morph]) + " on " + seq.id)
            morphed_sequences.append(morphs_dict[morph](seq))
    return morphed_sequences

def mmff_from_file(fasta_file, morphs):
    '''Compute a metamorphic tests files from an input FASTA file.

    Arguments:
        fasta_file: an open file object for the FASTA file
        morphs: list of morph functions to apply
    Result:
        tbc
    '''
    morphs_dict = {'reverse': mmff_reverse,
                   'passthrough': mmff_passthrough,
                   'numeric_header': mmff_numeric_header}
    morphed_sequences = []
    for seq in SeqIO.parse(fasta_file, "fasta"):
        for morph in morphs:
            # print("morph " + str(morphs_dict[morph]) + " on " + seq.id)
            morphed_sequences.append(morphs_dict[morph](seq))
    return morphed_sequences

def mmff_from_file(fasta_file, morphs):
    '''Compute metamorphic test files from an input FASTA file.

    Arguments:
        fasta_file: an open file object for the FASTA file
        morphs: list of morph functions to apply
    Result:
        tbc
    '''
    morphs_dict = {'reverse': mmff_reverse,
                   'passthrough': mmff_passthrough}
    morphed_sequences = []
    for seq in SeqIO.parse(fasta_file, "fasta"):
        for morph in morphs:
            print("morph " + str(morphs_dict[morph]) + " on " + seq.id)
            morphed_sequences.append(morphs_dict[morph](seq))
    return morphed_sequences

def main(args):
    for record in SeqIO.parse(args.infile, 'fasta'):
        if args.discard:
            if sum([1 for rx in args.discard if re.match(rx, record.id)]) > 0:
                continue
        subseqcounter = 0
        printlog(args.debug, "DEBUG: convert to upper case", record.id)
        sequence = str(record.seq).upper()
        printlog(args.debug, "DEBUG: split seq by Ns", record.id)
        subseqs = [ss for ss in re.split('[^ACGT]+', sequence) if len(ss) > args.minlength]
        printlog(args.debug, "DEBUG: print subseqs", record.id)
        for subseq in subseqs:
            subseqcounter += 1
            subid = '{:s}_chunk_{:d}'.format(record.id, subseqcounter)
            subrecord = SeqRecord(Seq(subseq), subid, '', '')
            SeqIO.write(subrecord, args.outfile, 'fasta')

def READ_FASTA_ENTRY(file_name):
    fasta_sequences = []
    sequence_name = []
    full_names_dict = {}
    sequences_dict = {}
    if os.stat(file_name)[6] != 0:  # not empty
        fh = open(file_name, "r")
        for record in SeqIO.parse(fh, "fasta"):
            short_name = str(record.id).split(' ')[0]
            sequence_name.append(short_name)
            full_names_dict[short_name] = str(record.id)
            fasta_sequences.append(str(record.seq))
            sequences_dict[short_name] = str(record.seq)
    return sequences_dict, fasta_sequences, sequence_name, full_names_dict

def READ_FASTA_ENTRY(file_name):
    sequences_dict = {}
    if os.stat(file_name)[6] != 0:  # not empty
        fh = open(file_name, "r")
        for record in SeqIO.parse(fh, "fasta"):
            short_name = str(record.id).split(' ')[0]
            sequences_dict[short_name] = str(record.seq)
    return sequences_dict

# ------------------------------------------------------------------------------------------

def test_malign(self):
    self.setUpPhylotyper(aa=False)
    tmpfiles = [os.path.join(self.test_dir, 'tmp1.fasta'),
                os.path.join(self.test_dir, 'tmp2.fasta')]
    tmpfile = os.path.join(self.test_dir, 'tmp_saln.fasta')
    aln = SeqAligner(self.configObj)
    aln.malign(self.subtype_options['input'], tmpfiles, tmpfile)
    lengths = []
    tmpfiles.append(tmpfile)
    for f in tmpfiles:
        seqs = SeqIO.parse(open(f, 'r'), 'fasta')
        lengths.append(len(str(next(seqs).seq)))
    self.assertEqual(lengths[0] + lengths[1], lengths[2])

## TODO Add test for new multi amino acid/dna

def remove_duplicates(self):
    """Remove duplicate GenBank records.

    There are multiple methods to download genes, and each download method
    appends to the GenBank file, so this method is needed to remove duplicates.

    Args:
        None
    """
    input_seq_iterator = SeqIO.parse(open(self._tmpfile, "rU"), "genbank")
    unique_seq_iterator = self.unique(input_seq_iterator)
    output_handle = open(self._gbfile, "w")
    SeqIO.write(unique_seq_iterator, output_handle, "genbank")
    output_handle.close()
    os.remove(self._tmpfile)

def get_lengths(fastq):
    '''Loop over the fastq file and extract the length of each sequence.'''
    return np.array([len(record) for record in SeqIO.parse(fastq, "fastq")])

def get_bin(fq, sizeRange):
    '''
    Loop over the fastq file.
    Extract a list of nucleotides and a list of quality scores as tuples in a list.
    Only select reads whose length is within the size range.
    '''
    logging.info("Extracting nucleotides and quality scores of selected bin.")
    return [(list(rec.seq), list(rec.letter_annotations["phred_quality"]))
            for rec in SeqIO.parse(fq, "fastq")
            if sizeRange[0] < len(rec) < sizeRange[1]]

def main():
    '''Run the import'''
    if len(sys.argv) < 2:
        print('Usage: {} <gbk file>'.format(sys.argv[0]), file=sys.stderr)
        sys.exit(2)
    connection = psycopg2.connect(DB_CONNECTION)
    recs = SeqIO.parse(sys.argv[1], 'genbank')
    with connection:
        with connection.cursor() as cursor:
            for rec in recs:
                if rec.name in BLACKLIST:
                    print('Skipping blacklisted record {!r}'.format(rec.name), file=sys.stderr)
                    continue
                load_record(rec, cursor)
    connection.close()

def parse_smcog(feature):
    '''Parse the smCOG feature qualifier'''
    if 'note' not in feature.qualifiers:
        raise ValueError('No note qualifier in {}'.format(feature))
    for entry in feature.qualifiers['note']:
        if not entry.startswith('smCOG:'):
            continue
        match = SMCOG_PATTERN.search(entry)
        if match is None:
            print(entry)
            raise ValueError('Failed to parse smCOG line {!r}'.format(entry))
        return match.groups()
    raise ValueError('No smcog qualifier in {}'.format(feature))