我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用functools.cmp_to_key()。
def sort(cmpfn): if not this.Class in ('Array', 'Arguments'): return this.to_object() # do nothing arr = [] for i in xrange(len(this)): arr.append(this.get(six.text_type(i))) if not arr: return this if not cmpfn.is_callable(): cmpfn = None cmp = lambda a,b: sort_compare(a, b, cmpfn) if six.PY3: key = functools.cmp_to_key(cmp) arr.sort(key=key) else: arr.sort(cmp=cmp) for i in xrange(len(arr)): this.put(six.text_type(i), arr[i]) return this
def sanity_check(repo, category, name, vernum, suff, rev, slot, ver=None): """ Checks whether a package descriptor is valid and corresponds to a package in a configured portage repository """ if not name: return False if repo and repo not in list(portage_repos()): return False if not ver: if (rev or suff) and not vernum: return False if vernum: ver = ver_str(vernum, suff, rev) else: ver = None pkgs = repo_pkgs(repo, category, name, ver, slot) if not pkgs: return False pkg = sorted(pkgs, key=cmp_to_key(lambda x,y:vercmp(x[3],y[3])), reverse=True)[0] return PortagePackage(*pkg)
def list_remote_version_info(env: Environment, package_name: str) -> list: """ """ def from_key(key: str) -> dict: filename = key.strip('/').split('/')[-1] safe_version = filename.rsplit('.', 1)[0] return dict( name=package_name, safe_version=safe_version, version=versioning.deserialize(safe_version) ) versions = [ from_key(key) for key in list_remote_package_keys(env, package_name) ] def compare_versions(a: dict, b: dict) -> int: return semver.compare(a['version'], b['version']) return sorted(versions, key=functools.cmp_to_key(compare_versions))
def cmp_to_key(mycmp): """Convert a cmp= function into a key= function""" class K(object): __slots__ = ['obj'] def __init__(self, obj, *args): self.obj = obj def __lt__(self, other): return mycmp(self.obj, other.obj) < 0 def __gt__(self, other): return mycmp(self.obj, other.obj) > 0 def __eq__(self, other): return mycmp(self.obj, other.obj) == 0 def __le__(self, other): return mycmp(self.obj, other.obj) <= 0 def __ge__(self, other): return mycmp(self.obj, other.obj) >= 0 def __ne__(self, other): return mycmp(self.obj, other.obj) != 0 def __hash__(self): raise TypeError('hash not implemented') return K # Back up our definitions above in case they're useful
def get_raster_matrixs(self, size): points = sorted(self.submachine.get_raster_points(), key=cmp_to_key(compare_raster_point)) step = self.submachine.stepsize margin = 5 mat = [ [[x, y, 0] for x in np.linspace(size[0] - margin * step[0], size[1] + margin * step[0], (size[1] - size[0] ) / step[0] + 1 + 2 * margin) ] for y in np.linspace(size[2] - margin * step[1], size[3] + margin * step[1], (size[3] - size[2] ) / step[1] + 1 + 2 * margin) ] for p in points: i = int(p[1] / step[1] - size[2] / step[1]) + margin j = int(p[0] / step[0] - size[0] / step[0]) + margin mat[i][j] = p return mat
def get_2d_projection_on_basis(self, basis, offset=None): """ (TODO: Documentation) """ a = np.array(basis[0]) b = np.array(basis[1]) self.concat_contour() bas = np.array([a, b]) data = self.get_3d_polygon() product = np.dot(data, np.transpose(bas)) compare = self.cube.pixel_size filtered = pytriplib.filter_points(product, compare / 2.0) filtered = np.array(sorted(filtered, key=cmp_to_key(_voi_point_cmp))) filtered = pytriplib.points_to_contour(filtered) product = filtered if offset is not None: offset_proj = np.array([np.dot(offset, a), np.dot(offset, b)]) product = product[:] - offset_proj return product
def test_undetected_mutation(self): # Python 2.4a1 did not always detect mutation # So does pypy... memorywaster = [] for i in range(20): def mutating_cmp(x, y): L.append(3) L.pop() return (x > y) - (x < y) L = [1,2] self.assertRaises(ValueError, L.sort, key=cmp_to_key(mutating_cmp)) def mutating_cmp(x, y): L.append(3) del L[:] return (x > y) - (x < y) self.assertRaises(ValueError, L.sort, key=cmp_to_key(mutating_cmp)) memorywaster = [memorywaster] #==============================================================================
def _get_relevant_migrations(migration_scripts, current_version, new_version): relevant_migrations = [] for migration in migration_scripts: migration_original_version, migration_new_version = _get_migration_versions( migration) if not migration_original_version or not migration_new_version: continue if Upgrader.compareVersions(current_version, new_version) >= 0: if Upgrader.compareVersions( migration_new_version, migration_original_version) > 0: continue if Upgrader.compareVersions(migration_original_version, current_version) <= 0 \ and Upgrader.compareVersions(migration_new_version, new_version) >= 0: relevant_migrations.append(migration) else: if Upgrader.compareVersions( migration_original_version, migration_new_version) > 0: continue if Upgrader.compareVersions(migration_original_version, current_version) >= 0 \ and Upgrader.compareVersions(migration_new_version, new_version) <= 0: relevant_migrations.append(migration) relevant_migrations = sorted( relevant_migrations, key=cmp_to_key(_compare_migration_scripts)) return relevant_migrations
def post_process(self, rows): """ Build a table including: Project name | Bug pts | Story pts | Debt % Name1 | NameN | Grand Total | """ project_points_by_type = reduce_(self._to_story_points, rows, {}) results = ({ 'project_name': k, 'bug_points': self._format_points(v[0]), 'story_points': self._format_points(v[1]), 'tech_debt': self._tech_debt_perc(v) } for k, v in project_points_by_type.items()) sort_by_name = partial(self._sort_by_name, 'project_name') return sorted(results, key=cmp_to_key(sort_by_name))
def test_undetected_mutation(self): # Python 2.4a1 did not always detect mutation memorywaster = [] for i in range(20): def mutating_cmp(x, y): L.append(3) L.pop() return (x > y) - (x < y) L = [1,2] self.assertRaises(ValueError, L.sort, key=cmp_to_key(mutating_cmp)) def mutating_cmp(x, y): L.append(3) del L[:] return (x > y) - (x < y) self.assertRaises(ValueError, L.sort, key=cmp_to_key(mutating_cmp)) memorywaster = [memorywaster] #==============================================================================
def test_reverse_stability(self): data = [(random.randrange(100), i) for i in range(200)] copy1 = data[:] copy2 = data[:] def my_cmp(x, y): x0, y0 = x[0], y[0] return (x0 > y0) - (x0 < y0) def my_cmp_reversed(x, y): x0, y0 = x[0], y[0] return (y0 > x0) - (y0 < x0) data.sort(key=cmp_to_key(my_cmp), reverse=True) copy1.sort(key=cmp_to_key(my_cmp_reversed)) self.assertEqual(data, copy1) copy2.sort(key=lambda x: x[0], reverse=True) self.assertEqual(data, copy2) #==============================================================================
def test_cmp_to_key_arguments(self): def cmp1(x, y): return (x > y) - (x < y) key = functools.cmp_to_key(mycmp=cmp1) self.assertEqual(key(obj=3), key(obj=3)) self.assertGreater(key(obj=3), key(obj=1)) with self.assertRaises((TypeError, AttributeError)): key(3) > 1 # rhs is not a K object with self.assertRaises((TypeError, AttributeError)): 1 < key(3) # lhs is not a K object with self.assertRaises(TypeError): key = functools.cmp_to_key() # too few args with self.assertRaises(TypeError): key = functools.cmp_to_key(cmp1, None) # too many args key = functools.cmp_to_key(cmp1) with self.assertRaises(TypeError): key() # too few args with self.assertRaises(TypeError): key(None, None) # too many args
def get_active_messages(self, when=None, event_pattern=None, here=True): """ :param when: the time for which to check effectiveness of the messages, default = the present time :param event_pattern: a regular expression to match the desired event codes. default = all. :param here: True to retrieve local messages, False to retrieve those for other locales """ if when is None: when = time.time() if event_pattern is None: event_pattern = re.compile(".*") elif not hasattr(event_pattern, 'match'): event_pattern = re.compile(event_pattern) if here: msgs = self.__messages else: msgs = self.__elsewhere_messages l = list(filter(lambda m: m.is_effective(when) and event_pattern.match(m.get_event_type()), msgs)) l.sort(key=functools.cmp_to_key(self.same_sort)) return l
def get_active_messages(self, when=None, event_pattern=None, here=True): """ :param when: the time for which to check effectiveness of the messages, default = the present time :param event_pattern: a regular expression to match the desired event codes. default = all. :param here: True to retrieve local messages, False to retrieve those for other locales """ if when is None: when = time.time() if event_pattern is None: event_pattern = re.compile(".*") elif not hasattr(event_pattern, 'match'): event_pattern = re.compile(event_pattern) l = list(filter(lambda m: m.is_effective(self.latlon, self.county_fips, here, when) and event_pattern.match( m.get_event_type()), self.__messages.values())) l.sort(key=functools.cmp_to_key(self.sorter)) return l
def sort(cmpfn): if not this.Class in {'Array', 'Arguments'}: return this.to_object() # do nothing arr = [] for i in xrange(len(this)): arr.append(this.get(six.text_type(i))) if not arr: return this if not cmpfn.is_callable(): cmpfn = None cmp = lambda a,b: sort_compare(a, b, cmpfn) if six.PY3: key = functools.cmp_to_key(cmp) arr.sort(key=key) else: arr.sort(cmp=cmp) for i in xrange(len(arr)): this.put(six.text_type(i), arr[i]) return this
def freeze(obj, unordered_list=False): @functools.cmp_to_key def cmp_with_types(lhs, rhs): try: return (lhs > rhs) - (lhs < rhs) except TypeError: lhs = type(lhs).__name__ rhs = type(rhs).__name__ return (lhs > rhs) - (lhs < rhs) if isinstance(obj, dict): return tuple(sorted(((freeze(k, unordered_list), freeze(v, unordered_list)) for k, v in obj.items()), key=cmp_with_types)) elif isinstance(obj, list): if unordered_list: return tuple(sorted((freeze(i, unordered_list) for i in obj), key=cmp_with_types)) else: return tuple(freeze(i, unordered_list) for i in obj) else: return obj
def filter_candidates_from_snippet(self, candidate_keyterms): from functools import cmp_to_key ordered_keyterms = sorted(candidate_keyterms.itervalues(), key = lambda item: item['cvalue'], reverse = True) selected_keyterms = [item for item in ordered_keyterms if item['cvalue'] > 0] def pos_cmp(keyterm1, keyterm2): if not "NAM" in keyterm1['pos'] and "NAM" in keyterm2['pos']: return -1 elif "NAM" in keyterm1['pos'] and "NAM" not in keyterm2['pos']: return 1 else: return 0 filtered_keyterms = sorted(selected_keyterms, key=cmp_to_key(pos_cmp), reverse=True) keyterms = [{'term' : " ".join(t['words']), 'cvalue': t['cvalue'], 'lemma': t['lemma_string'], 'pos_tag': t['pos']} for t in filtered_keyterms] return keyterms
def best(inputted_word, suggestions, word_model=None): """ ? ????????? ????? ??????????? ???????? ??????, ??????????? ???? ?? ?????????? ????????, ???? ?? ??????????? ???? ? ????????? ?????? """ suggestions = list(suggestions) def comparehamm(one, two): score1 = hamming_distance(inputted_word, one) score2 = hamming_distance(inputted_word, two) return cmp2(score1, score2) # lower is better def comparefreq(one, two): score1 = frequency(one, word_model) score2 = frequency(two, word_model) return cmp2(score2, score1) # higher is better freq_sorted = sorted(suggestions, key=functools.cmp_to_key(comparefreq))[:10] # take the top 10 hamming_sorted = sorted(suggestions, key=functools.cmp_to_key(comparehamm))[:10] # take the top 10 return freq_sorted[0]
def __keylist_sort(self): """Update __keysort, which is used to determine facet matching order. Inherited facets always take priority over local facets so make sure all inherited facets come before local facets in __keylist. All facets from a given source are sorted by length, and facets of equal length are sorted lexically.""" def facet_sort(x, y): i = len(y) - len(x) if i != 0: return i return misc.cmp(x, y) self.__keylist = [] self.__keylist += sorted([ i for i in self if i in self.__inherited ], key=cmp_to_key(facet_sort)) self.__keylist += sorted([ i for i in self if i not in self.__inherited ], key=cmp_to_key(facet_sort))
def transformation_sort(cls,transition_list): # Sort on basis of two keys split length and then token lengths in the respective priority. return sorted(transition_list, key=functools.cmp_to_key(cls._myCmp),reverse=True)
def getTestCaseNames(self, testCaseClass): """Return a sorted sequence of method names found within testCaseClass """ def isTestMethod(attrname, testCaseClass=testCaseClass, prefix=self.testMethodPrefix): return attrname.startswith(prefix) and \ hasattr(getattr(testCaseClass, attrname), '__call__') testFnNames = filter(isTestMethod, dir(testCaseClass)) if self.sortTestMethodsUsing: testFnNames.sort(key=_CmpToKey(self.sortTestMethodsUsing)) return testFnNames
def sort_stats(self, *field): if not field: self.fcn_list = 0 return self if len(field) == 1 and isinstance(field[0], (int, long)): # Be compatible with old profiler field = [ {-1: "stdname", 0: "calls", 1: "time", 2: "cumulative"}[field[0]] ] sort_arg_defs = self.get_sort_arg_defs() sort_tuple = () self.sort_type = "" connector = "" for word in field: sort_tuple = sort_tuple + sort_arg_defs[word][0] self.sort_type += connector + sort_arg_defs[word][1] connector = ", " stats_list = [] for func, (cc, nc, tt, ct, callers) in self.stats.iteritems(): stats_list.append((cc, nc, tt, ct) + func + (func_std_string(func), func)) stats_list.sort(key=cmp_to_key(TupleComp(sort_tuple).compare)) self.fcn_list = fcn_list = [] for tuple in stats_list: fcn_list.append(tuple[-1]) return self
def cmp_to_key(mycmp): """Convert a cmp= function into a key= function""" class K(object): __slots__ = ['obj'] def __init__(self, obj): self.obj = obj def __lt__(self, other): return mycmp(self.obj, other.obj) < 0 def __gt__(self, other): return mycmp(self.obj, other.obj) > 0 def __eq__(self, other): return mycmp(self.obj, other.obj) == 0 def __le__(self, other): return mycmp(self.obj, other.obj) <= 0 def __ge__(self, other): return mycmp(self.obj, other.obj) >= 0 def __ne__(self, other): return mycmp(self.obj, other.obj) != 0 def __hash__(self): raise TypeError('hash not implemented') return K # This function is intended to decorate other functions that will modify # either a string directly, or a function's docstring.
def advisory_extend_html(advisory, issues, package): # sort issues by length to avoid clashes issues = sorted(issues, key=cmp_to_key(lambda a, b: len(a.id) - len(b.id)), reverse=True) for issue in issues: advisory = advisory.replace(' {}'.format(issue.id), ' <a href="/{0}">{0}</a>'.format(issue.id)) advisory = sub('({}) '.format(package.pkgname), '<a href="/package/{0}">\g<1></a> '.format(package.pkgname), advisory, flags=IGNORECASE) advisory = sub(' ({})'.format(package.pkgname), ' <a href="/package/{0}">\g<1></a>'.format(package.pkgname), advisory, flags=IGNORECASE) advisory = sub(';({})'.format(package.pkgname), ';<a href="/package/{0}">\g<1></a>'.format(package.pkgname), advisory, flags=IGNORECASE) return advisory
def get_queue(self): """ Returns the queued nodes sorted by scheduling wait. :rtype: list[enarksh.controller.node.Node.Node] """ return sorted(self.__queue, key=functools.cmp_to_key(Schedule.queue_compare)) # ----------------------------------------------------------------------------------------------------------------------
def __queue_handler(controller): """ Starts a node that is queued and for which there are enough resources available. :param enarksh.controller.Controller.Controller controller: The controller. """ # Return immediately if there are no loaded schedules. if not controller.schedules: return start = False schedules = sorted(controller.schedules.values(), key=functools.cmp_to_key(EventQueueEmptyEventHandler.queue_compare)) for schedule in schedules: queue = schedule.get_queue() for node in queue: # Inquire if there are enough resources available for the node. start = node.inquire_resources() if start: span_job = node.start() # If required send a message to the spanner. if span_job: message = node.get_start_message(schedule.sch_id) controller.message_controller.send_message('spawner', message) else: node.stop(0) # If a node has been started leave inner loop. break # If a node has been started leave the outer loop. if start: break # ------------------------------------------------------------------------------------------------------------------
def sortComplex(filterComplex, filterValues): pairedList = zip(filterComplex, filterValues) sortedComplex = sorted(pairedList, key=functools.cmp_to_key(compare)) sortedComplex = [list(t) for t in zip(*sortedComplex)] return sortedComplex
def sorted_with_cmp(sequence, cmp_func, **kwargs): # sort with a cmp func which works under both Python 2 and 3 if sys.version_info[0] == 3: from functools import cmp_to_key key = cmp_to_key(cmp_func) def do_sort(x): return sorted(x, key=key, **kwargs) else: def do_sort(x): return sorted(x, cmp=cmp_func, **kwargs) return do_sort(sequence)
def compare_obj(a, b): """Old-style comparison function. """ print('comparing {} and {}'.format(a, b)) if a.val < b.val: return -1 elif a.val > b.val: return 1 return 0 # Make a key function using cmp_to_key()
def solve_closest_pair_n_logn2(points): def closest_pair(L, R, points): if L == R: return 0x7fffffff, points[L], points[R] # return int max if R - L == 1: return euclidean_dis_pow(points[L], points[R]), points[L], points[R] mid = (L + R) >> 1 d, p1, p2 = closest_pair(L, mid, points) d2, p3, p4 = closest_pair(mid + 1, R, points) if d > d2: d, p1, p2 = d2, p3, p4 min_x = points[mid][0] - d max_x = points[mid][0] + d suspect = [points[i] for i in range(L, R + 1) if min_x <= points[i][0] <= max_x] suspect.sort(key=lambda x: x[1]) n = len(suspect) for i in range(n): for j in range(i + 1, n): if suspect[j][1] - suspect[i][1] > d: break t = euclidean_dis_pow(suspect[i], suspect[j]) if t < d: d = t p1, p2 = suspect[i], suspect[j] return d, p1, p2 points.sort(key=cmp_to_key(lambda x, y: x[0] - y[0] if x[0] != y[0] else x[1] - y[1])) return closest_pair(0, len(points) - 1, points)
def min_complete_time(p, f): n = len(p) t = list(zip(range(n), list(zip(p, f)))) # it will be like this: [_id,(pi,fi)] t.sort(key=cmp_to_key(lambda x, y: y[1][1] - x[1][1])) order = [] min_time = timecost = 0 for i in range(n): order.append(t[i][0]) timecost += t[i][1][0] min_time = max(min_time, timecost + t[i][1][1]) return min_time, order
def test1(self): """ """ from functools import cmp_to_key public_cards = [[roomai.fivecardstud.FiveCardStudPokerCard("2_Spade"),\ roomai.fivecardstud.FiveCardStudPokerCard("3_Spade"), \ roomai.fivecardstud.FiveCardStudPokerCard("A_Spade")]] public_cards.sort(key = cmp_to_key(roomai.fivecardstud.FiveCardStudPokerCard.compare)) for i in range(len(public_cards[0])): print (public_cards[0][i].key)
def testAGame(self): env = roomai.bridge.BridgeEnv() allcards = list(roomai.bridge.AllBridgePokerCards.values()) allcards.sort(key = cmp_to_key(roomai.common.PokerCard.compare)) infos, public_state, person_states, private_state = env.init({"allcards":allcards, "start_turn":0}) for i in range(4): print (i,person_states[i].hand_cards_dict, len(person_states[i].hand_cards_dict)) self.assertEqual(len(person_states[i].hand_cards_dict),13) self.assertEqual(public_state.turn, 0) #### bidding stage action = roomai.bridge.BridgeAction.lookup("bidding_bid_A_Heart") infos, public_state, person_states, private_state = env.forward(action) action = roomai.bridge.BridgeAction.lookup("bidding_pass") infos, public_state, person_states, private_state = env.forward(action) infos, public_state, person_states, private_state = env.forward(action) infos, public_state, person_states, private_state = env.forward(action) self.assertEqual(public_state.stage, "playing") self.assertEqual(public_state.turn,0) #### playing_stage count = 0 while env.public_state.is_terminal == False: action = list(env.person_states[env.public_state.turn].available_actions.values())[0] count += 1 env.forward(action) self.assertEqual(count,13 * 4) self.assertTrue(env.public_state.scores[0] == 0) self.assertTrue(env.public_state.scores[1] > 0) print (env.public_state.scores) self.assertEqual(env.public_state.scores[1],350)