The following 50 code examples, extracted from open source Python projects, illustrate how to use Levenshtein.distance().
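Before the project examples, here is a minimal usage sketch of the call itself. It assumes the python-Levenshtein package is installed and imported under its top-level name `Levenshtein`; the input strings are arbitrary illustrative values.

import Levenshtein

# Levenshtein.distance(a, b) returns the minimum number of single-character
# insertions, deletions, and substitutions needed to turn a into b.
print(Levenshtein.distance('kitten', 'sitting'))   # 3
print(Levenshtein.distance('flaw', 'lawn'))        # 2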
def wer(self, s1, s2):
    """
    Computes the Word Error Rate, defined as the edit distance between the
    two provided sentences after tokenizing to words.
    Arguments:
        s1 (string): space-separated sentence
        s2 (string): space-separated sentence
    """
    # build mapping of words to integers
    b = set(s1.split() + s2.split())
    word2char = {ss: ii for ii, ss in enumerate(b)}

    # map the words to a char array (Levenshtein packages only accepts
    # strings)
    w1 = [chr(word2char[w]) for w in s1.split()]
    w2 = [chr(word2char[w]) for w in s2.split()]

    return Lev.distance(''.join(w1), ''.join(w2))
def levenshtein_distance(a, b):
    """Return the Levenshtein edit distance between two strings *a* and *b*."""
    if a == b:
        return 0
    if len(a) < len(b):
        a, b = b, a
    if not a:
        return len(b)
    previous_row = range(len(b) + 1)
    for i, column1 in enumerate(a):
        current_row = [i + 1]
        for j, column2 in enumerate(b):
            insertions = previous_row[j + 1] + 1
            deletions = current_row[j] + 1
            substitutions = previous_row[j] + (column1 != column2)
            current_row.append(min(insertions, deletions, substitutions))
        previous_row = current_row
    return previous_row[-1]
def getStopFromString(self, candidate):
    normalizedCandidate = Stop.normalizeStopName(candidate)
    if not Tpg.getTodaysStops():
        return None
    for stop in Tpg.getTodaysStops():
        if candidate.upper() == stop.code:
            return stop
        if normalizedCandidate == stop.normalizedName:
            return stop
    for stop in Tpg.getTodaysStops():
        if normalizedCandidate in stop.normalizedName:
            return stop
    # calculate the Levenshtein distance to all stop names
    codeToLevenshtein = {stop: Levenshtein.distance(normalizedCandidate, stop.normalizedName)
                         for stop in Tpg.getTodaysStops()}
    # smallest Levenshtein distance
    minimum = min(codeToLevenshtein, key=codeToLevenshtein.get)
    return minimum
def test_parse(self):
    for file in os.listdir(SAMPLE_DIR):
        if not file.endswith('.rst'):
            continue
        filename = os.path.join(SAMPLE_DIR, file)
        article = parse_article(filename)
        rendered = article.render().strip()
        with open(filename) as f:
            source = f.read().strip()
        source = source.expandtabs(4).decode('utf8')
        if source != rendered:
            lev_ = distance(source, rendered)
            jaro_ = jaro(source, rendered)
            if lev_ > 10 and jaro_ < 0.8 and file not in MUTATED_FILES:
                print('%d %f %s' % (lev_, jaro_, filename))
                raise AssertionError(filename)
def compute_edit_distance(session, labels_true_st, labels_pred_st):
    """Compute edit distance per mini-batch.
    Args:
        session:
        labels_true_st: A `SparseTensor` of ground truth
        labels_pred_st: A `SparseTensor` of prediction
    Returns:
        edit_distances: list of edit distance of each utterance
    """
    # build the SparseTensors from the corresponding inputs
    indices, values, dense_shape = labels_true_st
    labels_true_pl = tf.SparseTensor(indices, values, dense_shape)
    indices, values, dense_shape = labels_pred_st
    labels_pred_pl = tf.SparseTensor(indices, values, dense_shape)
    edit_op = tf.edit_distance(labels_pred_pl, labels_true_pl, normalize=True)
    edit_distances = session.run(edit_op)
    return edit_distances
def compute_per(ref, hyp, normalize=True):
    """Compute Phone Error Rate.
    Args:
        ref (list): phones in the reference transcript
        hyp (list): phones in the predicted transcript
        normalize (bool, optional): if True, divide by the length of str_true
    Returns:
        per (float): Phone Error Rate between str_true and str_pred
    """
    # Build mapping of phone to index
    phone_set = set(ref + hyp)
    phone2char = dict(zip(phone_set, range(len(phone_set))))

    # Map phones to a single char array
    # NOTE: Levenshtein packages only accepts strings
    phones_ref = [chr(phone2char[p]) for p in ref]
    phones_hyp = [chr(phone2char[p]) for p in hyp]

    per = lev.distance(''.join(phones_ref), ''.join(phones_hyp))
    if normalize:
        per /= len(ref)
    return per
def inference(predictions_op, true_labels_op, display, sess):
    """ Perform inference per batch on pre-trained model.
    This function performs inference and computes the CER per utterance.
    Args:
        predictions_op: Prediction op
        true_labels_op: True Labels op
        display: print sample predictions if True
        sess: default session to evaluate the ops.
    Returns:
        char_err_rate: list of CER per utterance.
    """
    char_err_rate = []
    # Perform inference of batch worth of data at a time.
    [predictions, true_labels] = sess.run([predictions_op, true_labels_op])
    pred_label = sparse_to_labels(predictions[0][0])
    actual_label = sparse_to_labels(true_labels)
    for (label, pred) in zip(actual_label, pred_label):
        char_err_rate.append(distance(label, pred) / len(label))
    if display:
        # Print sample responses
        for i in range(ARGS.batch_size):
            print(actual_label[i] + ' vs ' + pred_label[i])
    return char_err_rate
def get_strings_for_search(value):
    """
    Returns all statements which have a substring of the given value
    :param value: String
    :return: dict() with Statements.uid as key and 'text', 'distance' as well as 'arguments' as values
    """
    tmp_dict = OrderedDict()
    db_statements = get_not_disabled_statement_as_query().join(TextVersion, Statement.textversion_uid == TextVersion.uid).all()
    for stat in db_statements:
        if value.lower() in stat.textversions.content.lower():
            # get distance between input value and saved value
            rd = __get_fuzzy_string_dict(current_text=value, return_text=stat.textversions.content, uid=stat.uid)
            tmp_dict[str(stat.uid)] = rd
    tmp_dict = __sort_dict(tmp_dict)
    return_index = list(islice(tmp_dict, list_length))
    return_dict = OrderedDict()
    for index in return_index:
        return_dict[index] = tmp_dict[index]
    return return_dict
def get_strings_for_public_nickname(value, nickname):
    """
    Returns dictionaries with public nicknames of users, where the nickname contains the value
    :param value: String
    :param nickname: current user's nickname
    :return: dict()
    """
    db_user = DBDiscussionSession.query(User).filter(
        func.lower(User.public_nickname).contains(func.lower(value)),
        ~User.public_nickname.in_([nickname, 'admin', nick_of_anonymous_user])).all()
    return_array = []
    for index, user in enumerate(db_user):
        dist = get_distance(value, user.public_nickname)
        return_array.append({'index': index,
                             'distance': dist,
                             'text': user.public_nickname,
                             'avatar': get_public_profile_picture(user)})
    return_array = __sort_array(return_array)
    return return_array[:list_length]
def __sort_array(list):
    """
    Returns sorted array, based on the distance
    :param list: Array
    :return: Array
    """
    return_list = []
    newlist = sorted(list, key=lambda k: k['distance'])
    if mechanism == 'SequenceMatcher':  # sort descending
        newlist = reversed(newlist)
    # add index
    for index, dict in enumerate(newlist):
        dict['index'] = index
        return_list.append(dict)
    return return_list
def __sort_dict(dictionary):
    """
    Returns sorted dictionary, based on the distance
    :param dictionary: dict()
    :return: dict()
    """
    dictionary = OrderedDict(sorted(dictionary.items()))
    return_dict = OrderedDict()
    for i in list(dictionary.keys())[0:return_count]:
        return_dict[i] = dictionary[i]
    if mechanism == 'SequenceMatcher':  # sort descending
        return_dict = OrderedDict(sorted(dictionary.items(), key=lambda kv: kv[0], reverse=True))
    else:  # sort ascending
        return_dict = OrderedDict()
        for i in list(dictionary.keys())[0:return_count]:
            return_dict[i] = dictionary[i]
    return return_dict
def getSignificantItems(item_list):
    tokenised_list = []
    logging.info('Tokenising input data.')
    for item in item_list:
        tokenised_list.append(tokeniseUrl(item))
    items = np.asarray(item_list)
    tokenised_items = np.asarray(tokenised_list)
    logging.info('Calculating Levenshtein distances between items.')
    lev_similarity = -1 * np.array([[Levenshtein.distance(i1, i2) for i1 in tokenised_items] for i2 in tokenised_items])
    logging.info('Applying affinity propagation to data.')
    aff_prop = sklearn.cluster.AffinityPropagation(affinity='precomputed', damping=0.7)
    aff_prop.fit(lev_similarity)
    logging.info('Completed! Assembling list.')
    output_list = []
    for cluster_id in np.unique(aff_prop.labels_):
        exemplar = items[aff_prop.cluster_centers_indices_[cluster_id]]
        output_list.append(exemplar)
    return output_list
def wer(self, s1, s2):
    """
    Computes the Word Error Rate, defined as the edit distance between the
    two provided sentences after tokenizing to words.
    Arguments:
        s1 (string): space-separated sentence
        s2 (string): space-separated sentence
    """
    # build mapping of words to integers
    b = set(s1.split() + s2.split())
    word2char = dict(zip(b, range(len(b))))

    # map the words to a char array (Levenshtein packages only accepts
    # strings)
    w1 = [chr(word2char[w]) for w in s1.split()]
    w2 = [chr(word2char[w]) for w in s2.split()]

    return Lev.distance(''.join(w1), ''.join(w2))
def maybe_same(str1, str2):
    '''Heuristically judge whether two strings are likely to be the same.'''
    if len(str1) > len(str2):
        temp = str1
        str1 = str2
        str2 = temp
    # if the lengths differ too much, treat the strings as different
    if float(len(str2)) / len(str1) > 2 and len(str1) >= 4:
        return False
    # otherwise decide by edit-distance thresholds that scale with the string length
    distance = Levenshtein.distance(str1, str2)
    if distance <= 3 and len(str1) >= 10:
        return True
    if distance <= 4 and len(str1) >= 13:
        return True
    if distance <= 1 and len(str1) >= 5:
        return True
    if distance > 2 and len(str1) <= 6:
        return False
    if distance > 3:
        return False
    return True
def find_knn(self, train_strings, train_labels, test_strings):
    """Find 3 nearest neighbors of each item in test_strings in train_strings
    and report their labels as the prediction.
    Args:
        train_strings (ndarray): Numpy array with strings in training set
        train_labels (ndarray): Numpy array with labels of train_strings
        test_strings (ndarray): Numpy array with string to be predict for
    """
    prediction = np.zeros((len(test_strings), self.num_classes))
    for i in range(len(test_strings)):
        a_str = test_strings[i]
        dists = np.array([0] * len(train_strings))
        for j in range(len(train_strings)):
            b_str = train_strings[j]
            dists[j] = lev.distance(a_str, b_str)
        # finding the top 3
        top3 = dists.argsort()[:3]
        for ind in top3:
            prediction[i][self.column_index[train_labels[ind]]] += 1.0 / 3
    return prediction
def gitignores(*args):
    to_send = []
    gitignore_list = list()
    for arg in set(args):
        if arg in gitignore_list:
            to_send.append(arg)
        elif __name__ == '__main__':
            possibles = []
            for gitignore in gitignore_list:
                if Levenshtein.distance(gitignore, arg) == 1:
                    possibles.append(gitignore)
            print('WARNING: {} is not in gitignore list.'.format(arg), file=sys.stderr, end='')
            if possibles:
                if len(possibles) == 1:
                    possible_string = possibles[0]
                else:
                    possible_string = ', '.join(possibles[:-1]) + ' or ' + possibles[-1]
                print(' Did you mean {}?'.format(possible_string), file=sys.stderr)
            else:
                print('', file=sys.stderr)
    if not to_send:
        return '\n'
    text = _get_text_from_url('{}/{}'.format(API_URL, ','.join(to_send)))
    return '\n'.join(text.split('\n')[2:])
def prune_useless_elements(path_root):
    to_remove = []
    for c in path_root.children:
        for useless in USELESS_KEYWORDS:
            if c.id and (distance(c.id, useless) <= MAX_DISTANCE or useless in c.id):
                #print('Removing {0} because of id {1}'.format(c, useless))
                to_remove.append(c)
            if c.cls:
                for cl in c.cls:
                    if distance(cl, useless) <= MAX_DISTANCE or useless in cl:
                        #print('Removing {0} because of class name {1}'.format(c, cl))
                        to_remove.append(c)
    path_root.children = [c for c in path_root.children if c not in to_remove]
    for c in path_root.children:
        prune_useless_elements(c)
def distanceDomain(domain, DomainDict, ccTldDict, tldDict):
    similarDomain = ""
    minDistance = sys.maxint
    level = domain.split(".")
    if len(level) <= 1:
        return ("not a domain", sys.maxint)
    (domain2LD, domain3LD, domain2LDs, domain3LDs) = extractLevelDomain(domain, ccTldDict, tldDict)
    for popularDomain in DomainDict:
        distance = Levenshtein.distance(domain2LD.decode('utf-8'), popularDomain.decode('utf-8'))
        if distance < minDistance:
            minDistance = distance
            similarDomain = popularDomain
    # debug
    # sys.stdout.write("subdomain: %s, similarDomain: %s, minDistance: %d\n" % (subdomain, similarDomain, minDistance))
    if len(similarDomain) > 0:
        return (similarDomain, minDistance / float(len(similarDomain)))
    else:
        return (domain2LD, 0)

# check whether a domain contains invalid TLD
def wer(self, s1, s2):
    """
    Computes the Word Error Rate, defined as the edit distance between the
    two provided sentences after tokenizing to words.
    Arguments:
        s1 (string): space-separated sentence
        s2 (string): space-separated sentence
    """
    # build mapping of words to integers
    s1 = s1.replace(' ', '')
    s2 = s2.replace(' ', '')
    b = set(s1.split('<space>') + s2.split('<space>'))
    word2char = dict(zip(b, range(len(b))))

    # map the words to a char array (Levenshtein packages only accepts
    # strings); split on the '<space>' token, since literal spaces were
    # removed above
    w1 = [chr(word2char[w]) for w in s1.split('<space>')]
    w2 = [chr(word2char[w]) for w in s2.split('<space>')]

    return Lev.distance(''.join(w1), ''.join(w2))
def distance(self):
    if not self._distance:
        self._distance = distance(self._str1, self._str2)
    return self._distance
def levenshtein(self, msg, args):
    """Calculate levenshtein distance between two words"""
    if len(args) == 2:
        result = "Levenshtein distance: " + str(pylev.distance(args[0], args[1]))
    else:
        result = "Two words are needed to calculate Levenshtein distance"
    return result
def filter_hits_by_distance(hits, source_text, min_similarity=DEFAULT_MIN_SIMILARITY):
    """Returns ES `hits` filtered according to their Levenshtein distance
    to the `source_text`.

    Any hits with a similarity value (0..1) lower than `min_similarity` will
    be discarded. It's assumed that `hits` is already sorted from higher to
    lower score.
    """
    if min_similarity <= 0 or min_similarity >= 1:
        min_similarity = DEFAULT_MIN_SIMILARITY

    filtered_hits = []
    for hit in hits:
        hit_source_text = hit['_source']['source']
        distance = Levenshtein.distance(source_text, hit_source_text)
        similarity = (
            1 - distance / float(max(len(source_text), len(hit_source_text)))
        )
        logger.debug(
            'Similarity: %.2f (distance: %d)\nOriginal:\t%s\nComparing with:\t%s',
            similarity, distance, source_text, hit_source_text
        )
        if similarity < min_similarity:
            break
        filtered_hits.append(hit)
    return filtered_hits
def fuzzy_match(self, locale, condition_name):
    condition_name = self.normalize_input(condition_name)
    conditions_candidates = self.get_condition_candidates(locale, condition_name)
    sorted_candidates = sorted(conditions_candidates.items(),
                               cmp=lambda x, y: Levenshtein.distance(condition_name, x[1]) -
                                                Levenshtein.distance(condition_name, y[1]))
    return sorted_candidates[0][0]
def get_condition_candidates(self, locale, condition_name):
    return {condition: min(mappings[condition][locale], key=lambda s: Levenshtein.distance(condition_name, s))
            for condition in list(SnipsWeatherConditions)}
def match_something(item, list):
    item = item.replace(" ", "")
    item = item.replace(".", "")
    item = item.replace(",", "")
    lowest = list[0]
    lowestdelta = Levenshtein.distance(item, list[0])
    for entry in list:
        delta = Levenshtein.distance(item, entry)
        if delta < lowestdelta:
            lowestdelta = delta
            lowest = entry
        print(delta, item, entry)
    return lowest
def cer(self, s1, s2):
    """
    Computes the Character Error Rate, defined as the edit distance.
    Arguments:
        s1 (string): space-separated sentence
        s2 (string): space-separated sentence
    """
    return Lev.distance(s1, s2)
def compare_strings_concat_levenshtein(sample, ref):
    """
    Concatenates all strings from `sample` into one, and all strings from
    `ref` into another. They are then compared by their Levenshtein distance.
    This results in a fuzzy comparison: it detects changes within strings and
    within the list of strings.
    """
    if hasattr(ref, 'strs') and ref.strs is not None:
        i = 0
        ratios = 0
        for section in ref.strs:
            if section not in sample.strs:
                continue
            strs_a_concat = ''.join(sample.strs[section])
            strs_b_concat = ''.join(ref.strs[section])
            if len(strs_a_concat) == 0 or len(strs_b_concat) == 0:
                continue
            # Similarity measurement from
            # Gheorghescu, M. (2005). An Automated Virus Classification System.
            # Virus Bulletin Conference, (October), 294-300.
            # (although they use it on a list of basic blocks instead of a
            # character string)
            ratio_sec = 1 - (Levenshtein.distance(strs_a_concat, strs_b_concat) /
                             float(max(len(strs_a_concat), len(strs_b_concat))))
            ratios += ratio_sec
            i += 1
        ratio = ratios / i if i > 0 else 0.0
    else:
        ratio = 0.0
    return (ratio * 100, ref.name, ref.version)
def compare_cc_list_levenshtein(sample, ref):
    """
    Compares the cyclomatic complexity values of all functions in `sample`
    with those of all functions in `ref`, by taking the Levenshtein distance
    between these lists. This detects added/removed functions and functions
    that have changed in complexity between a sample and a reference.
    """
    if hasattr(ref, 'cclist') and ref.cclist is not None:
        ratio = 1 - (editdistance.eval(sample.cclist, ref.cclist) /
                     float(max(len(sample.cclist), len(ref.cclist))))
    else:
        ratio = 0.0
    return (ratio * 100, ref.name, ref.version)
def setup_argparser(parser):
    parser.add_argument('-m', '--patternmodel', type=str, help="Pattern model of a background corpus (training data; Colibri Core unindexed patternmodel)", action='store', required=True)
    parser.add_argument('-l', '--lexicon', type=str, help="Lexicon file (training data; plain text, one word per line)", action='store', required=False)
    parser.add_argument('-L', '--lm', type=str, help="Language model file in ARPA format", action='store', required=False)
    parser.add_argument('-c', '--classfile', type=str, help="Class file of background corpus", action='store', required=True)
    parser.add_argument('-k', '--neighbours', '--neighbors', type=int, help="Maximum number of anagram distances to consider (the actual amount of anagrams is likely higher)", action='store', default=3, required=False)
    parser.add_argument('-K', '--candidates', type=int, help="Maximum number of candidates to consider per input token/pattern", action='store', default=100, required=False)
    parser.add_argument('-n', '--topn', type=int, help="Maximum number of candidates to return", action='store', default=10, required=False)
    parser.add_argument('-N', '--ngrams', type=int, help="N-grams to consider (max value of n). Ensure that your background corpus is trained for at least the same length for this to have any effect!", action='store', default=3, required=False)
    parser.add_argument('-D', '--maxld', type=int, help="Maximum Levenshtein distance", action='store', default=5, required=False)
    parser.add_argument('-M', '--maxvd', type=int, help="Maximum vector distance", action='store', default=5, required=False)
    parser.add_argument('-t', '--minfreq', type=int, help="Minimum frequency threshold (occurrence count) in background corpus", action='store', default=1, required=False)
    parser.add_argument('-a', '--alphafreq', type=int, help="Minimum alphabet frequency threshold (occurrence count); characters occurring less are not considered in the anagram vectors", action='store', default=10, required=False)
    parser.add_argument('-b', '--beamsize', type=int, help="Beamsize for the decoder", action='store', default=100, required=False)
    parser.add_argument('--maxdeleteratio', type=float, help="Do not allow a word to lose more than this fraction of its letters", action='store', default=0.34, required=False)
    parser.add_argument('--lexfreq', type=int, help="Artificial frequency (occurrence count) for items in the lexicon that are not in the background corpus", action='store', default=1, required=False)
    parser.add_argument('--ldweight', type=float, help="Levenshtein distance weight for candidate ranking", action='store', default=1, required=False)
    parser.add_argument('--vdweight', type=float, help="Vector distance weight for candidate ranking", action='store', default=1, required=False)
    parser.add_argument('--freqweight', type=float, help="Frequency weight for candidate ranking", action='store', default=1, required=False)
    parser.add_argument('--lexweight', type=float, help="Lexicon distance weight for candidate ranking", action='store', default=1, required=False)
    parser.add_argument('--lmweight', type=float, help="Language Model weight for Language Model selection (together with --correctionweight)", action='store', default=1, required=False)
    parser.add_argument('--correctionweight', type=float, help="Correction Model weight for Language Model selection (together with --lmweight)", action='store', default=1, required=False)
    parser.add_argument('--correctscore', type=float, help="The score a word must reach to be marked correct prior to decoding", action='store', default=0.60, required=False)
    parser.add_argument('--correctfreq', type=float, help="The frequency a word must have for it to be marked correct prior to decoding", action='store', default=200, required=False)
    parser.add_argument('--punctweight', type=int, help="Punctuation character weight for anagram vector representation", action='store', default=1, required=False)
    parser.add_argument('--unkweight', type=int, help="Unknown character weight for anagram vector representation", action='store', default=1, required=False)
    parser.add_argument('--ngramboost', type=float, help="Boost unigram candidates that are also predicted as part of larger ngrams, by the specified factor", action='store', default=0.25, required=False)
    parser.add_argument('-1', '--simpledecoder', action='store_true', help="Use only unigrams in decoding")
    parser.add_argument('--lmwin', action='store_true', help="Boost the scores of the LM selection (to 1.0) just prior to output")
    parser.add_argument('--locallm', action='store_true', help="Use a local LM to select a preferred candidate in each candidate list instead of the LM integrated in the decoder")
    parser.add_argument('--blocksize', type=int, action='store', help="Block size: determines the amount of test tokens to process in one go (dimensions of the anavec test matrix), setting this helps reduce memory at the cost of speed (0 = unlimited)", default=1000)
    parser.add_argument('--report', action='store_true', help="Output a full report")
    parser.add_argument('--json', action='store_true', help="Output JSON")
    parser.add_argument('--tok', action='store_true', help="Input is already tokenized")
    parser.add_argument('--noout', dest='output', action='store_false', help="Do not output")
    parser.add_argument('-d', '--debug', action='store_true')
def PopulateAmCacheTemporalCollaterals(fileName, sqlTweak, DB, collateralDBTableName, reconWindow=3):
    countHostsProcessed = 0
    # Process each occurrence of the FileName
    if sqlTweak == "":
        data = DB.Query("SELECT RowID, HostID, FileName, FirstRun from Entries WHERE EntryType = %s AND FileName = '%s'" % (settings.__AMCACHE__, fileName))
    else:
        data = DB.Query("SELECT RowID, HostID, FileName, FirstRun from Entries_FilePaths WHERE EntryType = %s AND FileName = '%s' AND %s" % (settings.__AMCACHE__, fileName, sqlTweak))
    rowList = []
    countRowsToProcess = len(data)
    countRowsProcessed = 0
    # Executed before
    for row in data:
        rowID = row[0]
        hostID = row[1]
        fileName = row[2]
        firstRun = row[3]
        # Insert entry into DB
        DB.Execute("INSERT INTO " + collateralDBTableName + " VALUES (NULL,%s, 0, 0, 0, 0)" % (rowID))
        # Check recon window
        countRowsProcessed += 1
        update_progress(float(countRowsProcessed) / float(countRowsToProcess), fileName)
        minFirstRun = firstRun - datetime.timedelta(0, 60 * reconWindow)
        maxFirstRun = firstRun + datetime.timedelta(0, 60 * reconWindow)
        reconEntries = DB.Query("SELECT RowID, HostID, FileName, FirstRun FROM Entries WHERE EntryType = %s AND (FirstRun >= '%s' AND FirstRun <= '%s')" % (settings.__AMCACHE__, minFirstRun, maxFirstRun))
        # Filter out incorrect correlations when RowID jumps from one host to the next
        # Weight correlation value according to temporal execution distance
        for entry in reconEntries:
            if entry[1] == hostID and entry[2] != fileName:
                weight = (1.0 / (math.pow(abs(rowID - entry[0]), 2)) * 10)
                if entry[3] < firstRun:
                    rowList.append(tuple((int(entry[0]), 1, 0, weight)))
                else:
                    rowList.append(tuple((int(entry[0]), 0, 1, weight)))
    DB.ExecuteMany("INSERT INTO " + collateralDBTableName + " VALUES (NULL,?, ?, ?, ?, 0)", rowList)
def distance_to(self, other):
    '''
    Length-adjusted Levenshtein "distance" to other OTU

    other: OTU
      distance to this OTU

    returns: float
    '''
    return Levenshtein.distance(self.sequence, other.sequence) / (0.5 * (len(self.sequence) + len(other.sequence)))
def __init__(self, seq_table, records, max_dist, min_fold, threshold_pval, log=None):
    '''
    seq_table: pandas.DataFrame
      Samples on the columns; sequences on the rows
    records: index of Bio.Seq
      Indexed, unaligned input sequences. This could come from BioPython's
      SeqIO.to_dict or SeqIO.index.
    max_dist: float
      genetic distance cutoff above which a sequence will not be merged into an OTU
    min_fold: float
      Multiply the sequence's abundance by this fold to get the minimum abundance
      of an OTU for merging
    threshold_pval: float
      P-value below which a sequence will not be merged into an OTU
    log: filehandle
      Log file reporting the abundance, genetic, and distribution checks.
    '''
    self.seq_table = seq_table
    self.records = records
    self.max_dist = max_dist
    self.min_fold = min_fold
    self.threshold_pval = threshold_pval
    self.log = log

    # get a list of the names of the sequences in order of their (decreasing) abundance
    self.seq_abunds = self.seq_table.sum(axis=1).sort_values(ascending=False)

    # check that all sequence IDs in the table are in the fasta
    missing_ids = [seq_id for seq_id in self.seq_abunds.index if seq_id not in self.records]
    if len(missing_ids) > 0:
        raise RuntimeError("{} sequence IDs found in the sequence table but not in the fasta: {}".format(len(missing_ids), missing_ids))

    # initialize OTU information
    self.membership = {}
    self.otus = []
def ga_matches(self, candidate):
    '''
    OTUs that meet the genetic and abundance criteria

    candidate: OTU
      sequence to evaluate
    '''
    # find abundance matches
    min_abundance = self.min_fold * candidate.abundance
    abundance_matches = [otu for otu in self.otus if otu.abundance > min_abundance]

    if self.log is not None:
        print(candidate.name, 'abundance_check', *[otu.name for otu in abundance_matches], sep='\t', file=self.log)

    if len(abundance_matches) == 0:
        return []
    else:
        # find genetic matches (in order of increasing genetic distance)
        matches_distances = [(otu.distance_to(candidate), otu) for otu in abundance_matches]
        matches_distances.sort(key=lambda x: (x[0], -x[1].abundance, x[1].name))
        matches = [otu for dist, otu in matches_distances if dist < self.max_dist]

        if self.log is not None:
            print(candidate.name, 'genetic_check', *[otu.name for otu in matches], sep='\t', file=self.log)

        return matches
def call_otus(seq_table_fh, fasta_fh, output_fh, dist_crit, abund_crit, pval_crit, log=None, membership=None):
    '''
    Read in input files, call OTUs, and return output.

    seq_table_fh: filehandle
      sequence count table
    fasta_fh: filehandle or filename
      sequences fasta
    output_fh: filehandle
      place to write main output OTU table
    dist_crit, abund_crit, pval_crit: float
      threshold values for distance, abundance, and pvalue
    log, membership: filehandles
      places to write supplementary output
    '''
    # read in the sequences table
    seq_table = read_sequence_table(seq_table_fh)

    # set up the input fasta records
    records = SeqIO.index(fasta_fh, 'fasta')

    # generate the caller object
    caller = DBCaller(seq_table, records, dist_crit, abund_crit, pval_crit, log)
    caller.generate_otu_table()

    caller.write_otu_table(output_fh)

    if membership is not None:
        caller.write_membership(membership)
def compute_cer(str_pred, str_true, normalize=True):
    """Compute Character Error Rate.
    Args:
        str_pred (string): a sentence without spaces
        str_true (string): a sentence without spaces
        normalize (bool, optional): if True, divide by the length of str_true
    Returns:
        cer (float): Character Error Rate between str_true and str_pred
    """
    cer = lev.distance(str_pred, str_true)
    if normalize:
        cer /= len(list(str_true))
    return cer
def _execute(self, str1, str2):
    LDAlgorithm._execute(self, str1, str2)
    return levenshtein_distance(str1, str2)
def get_ratio(old, new):
    """Return a "similarity ratio" (in percent) representing the similarity
    between the two strings, where 0 means identical and larger values mean
    less similar.
    """
    if not all([old, new]):
        return VERSIONING_RATIO
    if IS_SPEEDUP:
        return Levenshtein.distance(old, new) / (len(old) / 100.0)
    else:
        return levenshtein_distance(old, new) / (len(old) / 100.0)
def testDefaultParseValueFuzz(self, value):
    try:
        result = parser.DefaultParseValue(value)
    except TypeError:
        # It's OK to get a TypeError if the string has the null character.
        if u'\x00' in value:
            return
        raise
    except MemoryError:
        if len(value) > 100:
            # This is not what we're testing.
            return
        raise

    try:
        uvalue = unicode(value)
        uresult = unicode(result)
    except UnicodeDecodeError:
        # This is not what we're testing.
        return

    # Check that the parsed value doesn't differ too much from the input.
    distance = Levenshtein.distance(uresult, uvalue)
    max_distance = (
        2 +  # Quotes or parenthesis can be implicit.
        sum(c.isspace() for c in value) +
        value.count('"') + value.count("'") +
        3 * (value.count(',') + 1) +  # 'a,' can expand to "'a', "
        3 * (value.count(':')) +  # 'a:' can expand to "'a': "
        2 * value.count('\\'))
    if '#' in value:
        max_distance += len(value) - value.index('#')
    if not isinstance(result, six.string_types):
        max_distance += value.count('0')  # Leading 0s are stripped.

    # Note: We don't check distance for dicts since item order can be changed.
    if '{' not in value:
        self.assertLessEqual(distance, max_distance,
                             (distance, max_distance, uvalue, uresult))
def closest_token(stemmed_token_lst, merchant_info):
    score = 0
    merchant_tokens = merchant_info.split()  # only split (no stemming) is applied to merchant_info here
    for t in stemmed_token_lst:
        min_dist = sys.maxint
        for m in merchant_tokens:
            tmp_dist = distance(t, m)
            if min_dist > tmp_dist:
                min_dist = tmp_dist
        score += min_dist
    return score
def closest_token(stemmed_token_lst, merchant_info):
    score = 0
    merchant_tokens = [stemmer.stem(m) for m in merchant_info.split()]  # stem merchant tokens here
    for t in stemmed_token_lst:
        min_dist = sys.maxint
        for m in merchant_tokens:
            tmp_dist = distance(t, m)
            if min_dist > tmp_dist:
                min_dist = tmp_dist
        score += min_dist
    return score
def closest_token(stemmed_token_lst, merchant_info):
    min_dist = sys.maxint  # only use the min_dist for all as the score
    merchant_tokens = [stemmer.stem(m) for m in merchant_info.split()]  # stem merchant tokens here
    for t in stemmed_token_lst:
        for m in merchant_tokens:
            tmp_dist = distance(t, m)
            if min_dist > tmp_dist:
                min_dist = tmp_dist
    return min_dist
def hamming_distance(string1, string2):
    """
    Computes the Hamming distance between two strings.

    The Hamming distance between two strings of equal length is the number of
    positions at which the corresponding symbols are different. Put another
    way, it measures the minimum number of substitutions required to change
    one string into the other, or the minimum number of errors that could have
    transformed one string into the other.

    Args:
        string1,string2 (str): Input strings

    Returns:
        Hamming distance (int)

    Raises:
        TypeError : If the inputs are not strings or if one of the inputs is None.
        ValueError : If the input strings are not of same length

    Examples:
        >>> hamming_distance('', '')
        0
        >>> hamming_distance('alex', 'john')
        4
        >>> hamming_distance(' ', 'a')
        1
        >>> hamming_distance('JOHN', 'john')
        4
    """
    # input validations
    utils.sim_check_for_none(string1, string2)
    utils.tok_check_for_string_input(string1, string2)

    # for Hamming Distance string length should be same
    utils.sim_check_for_same_len(string1, string2)

    # sum all the mismatch characters at the corresponding index of
    # input strings
    return sum(bool(ord(c1) - ord(c2)) for c1, c2 in zip(string1, string2))
def levenshtein(string1, string2):
    """
    Computes the Levenshtein distance between two strings.

    Levenshtein distance computes the minimum cost of transforming one string
    into the other. Transforming a string is carried out using a sequence of
    the following operators: delete a character, insert a character, and
    substitute one character for another.

    Args:
        string1,string2 (str): Input strings

    Returns:
        Levenshtein distance (int)

    Raises:
        TypeError : If the inputs are not strings

    Examples:
        >>> levenshtein('a', '')
        1
        >>> levenshtein('example', 'samples')
        3
        >>> levenshtein('levenshtein', 'frankenstein')
        6

    Note:
        This implementation internally uses python-levenshtein package to
        compute the Levenshtein distance
    """
    # input validations
    utils.sim_check_for_none(string1, string2)
    utils.sim_check_for_string_inputs(string1, string2)

    # using Levenshtein library
    return Levenshtein.distance(string1, string2)
def get_edit_distance(str1, str2):
    return Levenshtein.distance(str1, str2)
def __get_fuzzy_string_dict(index=0, current_text='', return_text='', uid=0):
    """
    Returns dictionary with index, distance, text and statement_uid as keys
    :param index: int
    :param current_text: string
    :param return_text: string
    :param uid: int
    :return: dict()
    """
    return {'index': index,
            'distance': get_distance(current_text.lower(), return_text.lower()),
            'text': return_text,
            'statement_uid': uid}