The following 36 code examples, extracted from open-source Python projects, illustrate how to use heapq.heappushpop().
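heapq.heappushpop(heap, item) pushes item onto the heap and then pops and returns the smallest element, in a single call that is more efficient than a separate heappush() followed by heappop(). The recurring pattern in the examples below is a bounded "top N" heap: push freely while the heap holds fewer than N items, then switch to heappushpop() so the heap size stays fixed. A minimal sketch of that pattern (the function name top_n and the sample data are illustrative, not taken from any of the projects below):

import heapq

def top_n(iterable, n):
    """Return the n largest items using a fixed-size min-heap."""
    heap = []
    for item in iterable:
        if len(heap) < n:
            heapq.heappush(heap, item)
        else:
            # Push item, then pop the smallest; the heap keeps the n largest seen so far.
            heapq.heappushpop(heap, item)
    return sorted(heap, reverse=True)

print(top_n([5, 1, 9, 3, 7, 8], 3))  # [9, 8, 7]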
def high_degree_nodes(k, G):

    if nx.is_directed(G):
        my_degree_function = G.out_degree
    else:
        my_degree_function = G.degree

    # the top k nodes to be returned; initialization with first k elements
    H = [(my_degree_function(i), i) for i in G.nodes()[0:k]]
    hq.heapify(H)

    # iterate through the remaining nodes; add to heap if larger than heap root
    for i in G.nodes()[k:]:
        if my_degree_function(i) > H[0][0]:
            hq.heappushpop(H, (my_degree_function(i), i))

    return H

# Generator variant of high_degree_nodes(k, G)
# More time-efficient for range calls if k log k > log V
# Generates _all_ sets of i nodes of highest degree, with 1 <= i <= k
# -> Time complexity: O(V log (V))
# -> Memory complexity: Theta(V)
def search_docs(inputs, max_ex=5, opts=None):
    """Given a set of document ids (returned by ranking for a question), search
    for top N best matching (by heuristic) paragraphs that contain the answer.
    """
    if not opts:
        raise RuntimeError('Options dict must be supplied.')

    doc_ids, q_tokens, answer = inputs
    examples = []
    for i, doc_id in enumerate(doc_ids):
        for j, paragraph in enumerate(re.split(r'\n+', fetch_text(doc_id))):
            found = find_answer(paragraph, q_tokens, answer, opts)
            if found:
                # Reverse ranking, giving priority to early docs + paragraphs
                score = (found[0], -i, -j, random.random())
                if len(examples) < max_ex:
                    heapq.heappush(examples, (score, found[1]))
                else:
                    heapq.heappushpop(examples, (score, found[1]))
    return [e[1] for e in examples]
def _addResult(self, test, *args):
    try:
        name = test.id()
    except AttributeError:
        name = 'Unknown.unknown'
    test_class, test_name = name.rsplit('.', 1)

    elapsed = (self._now() - self.start_time).total_seconds()
    item = (elapsed, test_class, test_name)
    if len(self.slow_tests) >= self.num_slow_tests:
        heapq.heappushpop(self.slow_tests, item)
    else:
        heapq.heappush(self.slow_tests, item)

    self.results.setdefault(test_class, [])
    self.results[test_class].append((test_name, elapsed) + args)
    self.last_time[test_class] = self._now()
    self.writeTests()
def call_plugins(self, queue_name, time, *args):
    args = (time,) + args
    queue = self.plugin_queues[queue_name]
    if len(queue) == 0:
        return
    while queue[0][0] <= time:
        plugin = queue[0][2]
        getattr(plugin, queue_name)(*args)
        # look up the re-scheduling interval registered for this queue
        for trigger in plugin.trigger_interval:
            if trigger[1] == queue_name:
                interval = trigger[0]
        new_item = (time + interval, queue[0][1], plugin)
        heapq.heappushpop(queue, new_item)
def add(self, item):
    if item is None:
        return
    if len(self._heap) < self._n:
        heapq.heappush(self._heap, item)
    else:
        heapq.heappushpop(self._heap, item)
def push(self, x):
    """Pushes a new element."""
    assert self._data is not None
    if len(self._data) < self._n:
        heapq.heappush(self._data, x)
    else:
        heapq.heappushpop(self._data, x)
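The two methods above are fragments of larger top-N accumulator classes; the surrounding class definitions are not shown. A minimal self-contained version of the same idea might look like this (the class name TopN and the extract() helper are assumptions, not code from either project):

import heapq

class TopN:
    """Keeps the n largest items pushed so far in a fixed-size min-heap."""

    def __init__(self, n):
        self._n = n
        self._data = []

    def push(self, x):
        if len(self._data) < self._n:
            heapq.heappush(self._data, x)
        else:
            # Heap is full: push x, drop the current smallest.
            heapq.heappushpop(self._data, x)

    def extract(self):
        # Return the retained items, largest first.
        return sorted(self._data, reverse=True)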
def nsmallest(n, a):
    h = []
    for e in a[:n]:
        heapq.heappush(h, -e)  # we use a max heap but push -1 * e
    for e in a[n:]:
        heapq.heappushpop(h, -e)
    return -heapq.heappop(h)
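Despite its name, this helper returns the single n-th smallest element rather than a list: negating the values turns Python's min-heap into a max-heap over the n smallest items seen so far, and the final pop yields the largest of those, i.e. the n-th smallest. A quick illustrative call (the sample data is not from the project):

print(nsmallest(3, [7, 2, 9, 4, 1]))  # 4, the 3rd smallest element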
def shuf_file(f, shuf_win):
    heap = []
    for line in f:
        key = hash(line)
        if len(heap) < shuf_win:
            heapq.heappush(heap, (key, line))
        else:
            _, out = heapq.heappushpop(heap, (key, line))
            yield out
    while len(heap) > 0:
        _, out = heapq.heappop(heap)
        yield out
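This generator approximately shuffles a line stream while holding at most shuf_win lines in memory, using hash(line) as a pseudo-random sort key. A hypothetical call (the filename corpus.txt and the window size are placeholders):

# Stream-shuffle a large file line by line within a 10000-line window.
with open('corpus.txt') as f:
    for line in shuf_file(f, shuf_win=10000):
        print(line, end='')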
def findKthLargest(self, nums, k):
    """
    :type nums: List[int]
    :type k: int
    :rtype: int
    """
    # heap = []
    # for num in nums:
    #     if len(heap) < k:
    #         heapq.heappush(heap, num)
    #     else:
    #         heapq.heappushpop(heap, num)
    # return heap[0]
    def select_sort(s, e):
        # partition nums[s..e] around the pivot nums[e]; return the pivot's final index
        l, r = s, e - 1
        while l < r:
            while l < r and nums[l] < nums[e]:
                l += 1
            while l < r and nums[r] >= nums[e]:
                r -= 1
            nums[l], nums[r] = nums[r], nums[l]
        nums[l], nums[e] = nums[e], nums[l]
        return l

    s, e = 0, len(nums) - 1
    i, k = select_sort(s, e), len(nums) - k
    print(i)
    print(nums)
    while i != k:
        # print('{},{}'.format(i, k))
        if i < k:
            s = i + 1
        else:
            e = i - 1
        i = select_sort(s, e)
    return nums[i]
def add_to_ranked_target_data(RankedDataHeap, maxSize, Data, weights, keep='largest'):
    ''' '''
    docIDs = np.arange(Data.nDoc)

    # First, decide which docs are promising,
    # since we don't want to blow-up memory costs by using *all* docs
    if len(RankedDataHeap) > 0:
        cutoffThr = RankedDataHeap[0][0]
        if keep == 'largest':
            docIDs = np.argsort(-1 * weights)[:maxSize]
            docIDs = docIDs[weights[docIDs] > cutoffThr]
        else:
            docIDs = np.argsort(weights)[:maxSize]
            docIDs = docIDs[weights[docIDs] < cutoffThr]
        if len(docIDs) < 1:
            return

    # For promising docs, convert to list-of-tuples format,
    # and add to the heap
    if keep == 'largest':
        tList = Data.to_list_of_tuples(docIDs, w=weights)
    else:
        tList = Data.to_list_of_tuples(docIDs, w=-1 * weights)

    for docID, unitTuple in enumerate(tList):
        try:
            if len(RankedDataHeap) >= maxSize:
                heapq.heappushpop(RankedDataHeap, unitTuple)
            else:
                heapq.heappush(RankedDataHeap, unitTuple)
        except ValueError as error:
            # skip stupid errors related to duplicate weights
            pass

# sample_target_data
###########################################################
def explain(self, userid, user_items, itemid, user_weights=None, N=10):
    """ Provides explanations for why the item is liked by the user.

    Parameters
    ----------
    userid : int
        The userid to explain recommendations for
    user_items : csr_matrix
        Sparse matrix containing the liked items for the user
    itemid : int
        The itemid to explain recommendations for
    user_weights : ndarray, optional
        Precomputed Cholesky decomposition of the weighted user liked items.
        Useful for speeding up repeated calls to this function, this value
        is returned
    N : int, optional
        The number of liked items to show the contribution for

    Returns
    -------
    total_score : float
        The total predicted score for this user/item pair
    top_contributions : list
        A list of the top N (itemid, score) contributions for this user/item pair
    user_weights : ndarray
        A factorized representation of the user. Passing this in to
        future 'explain' calls will lead to noticeable speedups
    """
    # user_weights = Cholesky decomposition of Wu^-1
    # from section 5 of the paper CF for Implicit Feedback Datasets
    user_items = user_items.tocsr()
    if user_weights is None:
        A, _ = user_linear_equation(self.item_factors, self.YtY,
                                    user_items, userid,
                                    self.regularization, self.factors)
        user_weights = scipy.linalg.cho_factor(A)
    seed_item = self.item_factors[itemid]

    # weighted_item = y_i^t W_u
    weighted_item = scipy.linalg.cho_solve(user_weights, seed_item)

    total_score = 0.0
    h = []
    for i, (itemid, confidence) in enumerate(nonzeros(user_items, userid)):
        factor = self.item_factors[itemid]
        # s_u^ij = (y_i^t W^u) y_j
        score = weighted_item.dot(factor) * confidence
        total_score += score
        contribution = (score, itemid)
        if i < N:
            heapq.heappush(h, contribution)
        else:
            heapq.heappushpop(h, contribution)

    items = (heapq.heappop(h) for i in range(len(h)))
    top_contributions = list((i, s) for s, i in items)[::-1]
    return total_score, top_contributions, user_weights
def stats_reducer(self, key, values):
    outputType = CST(key[0])
    item = key[1]
    crawl = MonthlyCrawl.to_name(key[2])
    if outputType in (CST.size, CST.new_items,
                      CST.size_estimate, CST.size_robotstxt):
        verbose_key = (outputType.name, CST(item).name, crawl)
        if outputType in (CST.size, CST.size_robotstxt):
            val = sum(values)
        elif outputType == CST.new_items:
            val = MultiCount.sum_values(values)
        elif outputType == CST.size_estimate:
            # already "reduced" in count job
            for val in values:
                break
        yield verbose_key, val
    elif outputType == CST.histogram:
        yield((outputType.name, CST(item).name, crawl,
               CST(key[3]).name, key[4]), sum(values))
    elif outputType in (CST.mimetype, CST.mimetype_detected, CST.scheme,
                        CST.surt_domain, CST.tld, CST.domain, CST.host,
                        CST.http_status, CST.robotstxt_status):
        item = key[1]
        for counts in values:
            page_count = MultiCount.get_count(0, counts)
            url_count = MultiCount.get_count(1, counts)
            if outputType in (CST.domain, CST.surt_domain, CST.tld):
                host_count = MultiCount.get_count(2, counts)
            if (self.options.min_domain_frequency <= 1 or
                    outputType not in (CST.host, CST.domain, CST.surt_domain)):
                self.counters[(CST.size.name, outputType.name, crawl)] += 1
                self.counters[(CST.histogram.name, outputType.name,
                               crawl, CST.page.name, page_count)] += 1
                self.counters[(CST.histogram.name, outputType.name,
                               crawl, CST.url.name, url_count)] += 1
                if outputType in (CST.domain, CST.surt_domain, CST.tld):
                    self.counters[(CST.histogram.name, outputType.name,
                                   crawl, CST.host.name, host_count)] += 1
                if outputType == CST.tld:
                    domain_count = MultiCount.get_count(3, counts)
                    self.counters[(CST.histogram.name, outputType.name,
                                   crawl, CST.domain.name, domain_count)] += 1
            if outputType in (CST.domain, CST.host, CST.surt_domain):
                outKey = (outputType.name, crawl)
                outVal = (page_count, url_count, item)
                if outputType in (CST.domain, CST.surt_domain):
                    outVal = (page_count, url_count, host_count, item)
                # take most common
                if len(self.mostfrequent[outKey]) < self.options.max_hosts:
                    heapq.heappush(self.mostfrequent[outKey], outVal)
                else:
                    heapq.heappushpop(self.mostfrequent[outKey], outVal)
            else:
                yield((outputType.name, item, crawl), counts)
    else:
        logging.error('Unhandled type {}\n'.format(outputType))
        raise