我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用heapq.heapify()。
def high_degree_nodes(k, G): if nx.is_directed(G): my_degree_function = G.out_degree else: my_degree_function = G.degree # the top k nodes to be returned; initialization with first k elements H = [(my_degree_function(i), i) for i in G.nodes()[0:k]] hq.heapify(H) # iterate through the remaining nodes; add to heap if larger than heap root for i in G.nodes()[k:]: if my_degree_function(i) > H[0][0]: hq.heappushpop(H, (my_degree_function(i), i)) return H # Generator variant of high_degree_nodes(k, G) # More time-efficient for range calls if k log k > log V # Generates _all_ sets of i nodes of highest degree, with 1 <= i <= k # -> Time complexity: O(V log (V)) # -> Memory complexity: Theta(V)
def __next__(self): try: self.dt = advance_iterator(self.gen) except StopIteration: if self.genlist[0] is self: heapq.heappop(self.genlist) else: self.genlist.remove(self) heapq.heapify(self.genlist)
def nthUglyNumber(self, n): """ :type n: int :rtype: int """ l=[1] primes=[2,3,5] heapq.heapify(l) set1=set(l) n-=1 while n>0: z=heapq.heappop(l) n-=1 for i in primes: if z*i not in set1: heapq.heappush(l, z*i) set1.add(z*i) return heapq.heappop(l)
def assign_jobs(self): # Trivial case if len(self.jobs) <= self.num_workers: self._assigned_workers = [i for i in range(len(self._jobs))] self._start_times = [0] * len(self._jobs) return # Create Heap from collections import namedtuple import heapq MyThread = namedtuple('MyThread', 'start_time, id') heap = [MyThread(0, i) for i in range(self._num_workers)] heapq.heapify(heap) for i, job in enumerate(self._jobs): # Read the root contents sched_thread_id = heap[0].id sched_thread_start = heap[0].start_time # Append to output self._assigned_workers[i] = sched_thread_id self._start_times[i] = sched_thread_start # Update heap with next start time heapq.heapreplace(heap, MyThread(sched_thread_start + job, sched_thread_id))
def convertBST(self, root): """ :type root: TreeNode :rtype: TreeNode """ if not root: return root max_heap = [] self.makelist_traverse(root, max_heap) heapq.heapify(max_heap) prev_val = -heapq.heappop(max_heap)[0] sum_val = prev_val for i in range(len(max_heap)): t = heapq.heappop(max_heap)[1] cur_val = t.val if t.val < prev_val: t.val += sum_val sum_val += cur_val return root
def rearrangeString(self, str, k): """ :type str: str :type k: int :rtype: str """ mapi = Counter(list(str)) h = [(-mapi[key], key) for key in mapi] heapq.heapify(h) q, res = deque(), [] while h: cnt, letter = heapq.heappop(h) if not cnt: break res.append(letter) q.append((cnt + 1, letter)) if len(q) >= k: heapq.heappush(h, q.popleft()) return ''.join(res) if len(res) == len(str) else ''
def rearrangeString(self, s, k): """ :type s: str :type k: int :rtype: str """ if k == 0: return s h = [(-freq, ch) for (ch, freq) in collections.Counter(s).items()] heapq.heapify(h) res = [] while len(res) < len(s): q = [] for _ in xrange(k): if len(res) == len(s): return ''.join(res) if not h: return '' freq, ch = heapq.heappop(h) res.append(ch) if freq < -1: q.append((freq+1, ch)) while q: heapq.heappush(h, q.pop()) return ''.join(res)
def mergeSort(seqs): """ perform merge sort on a list of sorted iterators """ queue = [] for s in seqs: s = assertIsSorted(s) it = iter(s) try: queue.append((it.next(), it.next)) except StopIteration: pass heapq.heapify(queue) while queue: item, it = queue[0] yield item try: heapq.heapreplace(queue, (it(), it)) except StopIteration: heapq.heappop(queue) # ---------------------------------------------------------------------------
def kthSmallest(self, matrix, k): """ Use heap Time complexity: O(klog(n)+nlog(n)) :type matrix: List[List[int]] :type k: int :rtype: int """ import heapq # (value, y, x) # value means item in the matrix # y means the y coordinate of the item # x means the x coordinate of the item h = [(matrix[0][x], 0, x) for x in range(len(matrix[0]))] heapq.heapify(h) for i in range(k): val, y, x = heapq.heappop(h) if y + 1 < len(matrix): heapq.heappush(h, (matrix[y+1][x], y+1, x)) return val
def rebuild_heap(self): ''' ?????cancel?timer?????,???? :return: ''' import heapq tmp_timer_lst = [] for timer in self.heap.queue: # ???????cancel ?timer ???????? if timer.canceled: continue tmp_timer_lst.append(timer) # ????,??????????cancel???? self.heap.queue = tmp_timer_lst heapq.heapify(self.heap.queue) self.cancel_timer_count = 0
def build_huffman_tree(freqs): nodes = [HierarchicalSoftmaxNode() for _ in freqs] heap = list(zip(freqs, nodes)) heapq.heapify(heap) def pop_initialize(parent, branch): freq, node = heapq.heappop(heap) node.parent = parent node.branch = branch return freq idx = len(nodes) - 1 while len(heap) > 1: idx += 1 node = HierarchicalSoftmaxNode() nodes.append(node) freq = pop_initialize(idx, True) + pop_initialize(idx, False) heapq.heappush(heap, (freq, node)) assert len(heap) == 1 return nodes
def filter_out_loc(self, loc): old_best = self.get_best_claim() copy = list(self.heap) for claim in self.heap: # this is a problem for roots? if loc is claim.get_parent().loc: copy.remove(claim) claim.set_heap(None) if loc.gameMap().getLocation(self.site().loc, self.dir) == loc: self.dir = None self.heap = copy heapq.heapify(self.heap) self.check_heap(old_best) # # logger.debug("Removed all children from %s that would come to %s" % (self.site().loc, loc ) )
def filter_out_loc(self, loc): old_best = self.get_best_claim() copy = list(self.heap) for claim in self.heap: # this is a problem for roots? if loc is claim.get_parent().loc: copy.remove(claim) claim.set_heap(None) if loc.gameMap.getLocation(self.site.loc, self.dir) == loc: self.dir = None self.heap = copy heapq.heapify(self.heap) self.check_heap(old_best) # # logger.debug("Removed all children from %s that would come to %s" % (self.site.loc, loc ) )
def filter_out_loc(self, loc): old_best = self.get_best_claim() copy = list(self.heap) for claim in self.heap: # this is a problem for roots? if loc is claim.get_parent().loc: copy.remove(claim) claim.set_heap(None) if loc.gameMap().getLocation(self.site().loc, self.dir) == loc: self.dir = None self.heap = copy heapq.heapify(self.heap) self.check_heap(old_best) # logger.debug("Removed all children from %s that would come to %s" % (self.site().loc, loc ) )
def cancel(self, event): """Remove an event from the queue. This must be presented the ID as returned by enter(). If the event is not in the queue, this raises ValueError. """ self._queue.remove(event) heapq.heapify(self._queue)
def _create_binary_tree(self): """ Create a binary Huffman tree using stored vocabulary word counts. Frequent words will have shorter binary codes. Called internally from `build_vocab()`. """ vocab_size = len(self.vocab) logger.info("constructing a huffman tree from %i words" % vocab_size) # build the huffman tree heap = list(self.vocab.values()) heapq.heapify(heap) for i in range(vocab_size - 1): min1, min2 = heapq.heappop(heap), heapq.heappop(heap) heapq.heappush(heap, Vocab(count=min1.count + min2.count, index=i + vocab_size, left=min1, right=min2)) # recurse over the tree, assigning a binary code to each vocabulary word if heap: max_depth, stack = 0, [(heap[0], [], [])] while stack: node, codes, points = stack.pop() if node.index < vocab_size: # leaf node => store its path from the root node.code, node.point = codes, points max_depth = max(len(codes), max_depth) else: # inner node => continue recursion points = np.array(list(points) + [node.index - vocab_size], dtype=int) stack.append((node.left, np.array(list(codes) + [0], dtype=int), points)) stack.append((node.right, np.array(list(codes) + [1], dtype=int), points)) logger.info("built huffman tree with maximum node depth %i" % max_depth)
def __delitem__(self, item): self._remove_from_dict(item) self.heap = [(v,k) for v,k in self.heap if k != item] heapq.heapify(self.heap)
def isorted(iterable): lst = list(iterable) heapq.heapify(lst) pop = heapq.heappop while lst: yield pop(lst)
def remove(self, task): """ Removes the tasks from Queue Currently it takes O(n) time to find , and O(log n) to remove, making it O(n) further improvements can be done :param task: task to removed from the Queue """ import heapq for task_pair in self.heap: if task_pair[2] == task: self.heap.remove(task_pair) heapq.heapify(self.heap)
def run(self, epochs=1): for q in self.plugin_queues.values(): heapq.heapify(q) for i in range(1, epochs + 1): self.train() self.call_plugins('epoch', i)
def huff_code(txt, ab, freq): l = len(ab) Q = [Node(i, freq[i]) for i in range(l)] heapq.heapify(Q) #for node in Q: # print(node) for i in range(l-1): n1 = heapq.heappop(Q) n2 = heapq.heappop(Q) n = Node(-1, n1.freq+n2.freq, n1, n2) heapq.heappush(Q,n) return heapq.heappop(Q)
def __init__(self, items=None): if items is None: self._items = [] else: heapq.heapify(items) self._items = items
def __init__(self, initial=None, key=lambda x:x): self.__key = key if initial: self._data = [(key(item), item) for item in initial] heapq.heapify(self._data) else: self._data = []
def heapify(self, items): return KeyHeapq(initial=items)
def create_huffman_tree(txt): q = [TreeNode(c, cnt) for c, cnt in collections.Counter(txt).items()] heapq.heapify(q) while len(q) > 1: a, b = heapq.heappop(q), heapq.heappop(q) heapq.heappush(q, TreeNode('', a.cnt + b.cnt, a, b)) return q.pop()
def _iter(self): rlist = [] self._rdate.sort() self._genitem(rlist, iter(self._rdate)) for gen in [iter(x) for x in self._rrule]: self._genitem(rlist, gen) exlist = [] self._exdate.sort() self._genitem(exlist, iter(self._exdate)) for gen in [iter(x) for x in self._exrule]: self._genitem(exlist, gen) lastdt = None total = 0 heapq.heapify(rlist) heapq.heapify(exlist) while rlist: ritem = rlist[0] if not lastdt or lastdt != ritem.dt: while exlist and exlist[0] < ritem: exitem = exlist[0] advance_iterator(exitem) if exlist and exlist[0] is exitem: heapq.heapreplace(exlist, exitem) if not exlist or ritem != exlist[0]: total += 1 yield ritem.dt lastdt = ritem.dt advance_iterator(ritem) if rlist and rlist[0] is ritem: heapq.heapreplace(rlist, ritem) self._len = total
def create_binary_tree(self): """ Create a binary Huffman tree using stored vocabulary word counts. Frequent words will have shorter binary codes. Called internally from `build_vocab()`. """ logger.info("constructing a huffman tree from %i words" % len(self.vocab)) # build the huffman tree heap = list(itervalues(self.vocab)) heapq.heapify(heap) for i in xrange(len(self.vocab) - 1): min1, min2 = heapq.heappop(heap), heapq.heappop(heap) heapq.heappush(heap, Vocab(count=min1.count + min2.count, index=i + len(self.vocab), left=min1, right=min2)) # recurse over the tree, assigning a binary code to each vocabulary word if heap: max_depth, stack = 0, [(heap[0], [], [])] while stack: node, codes, points = stack.pop() if node.index < len(self.vocab): # leaf node => store its path from the root node.code, node.point = codes, points max_depth = max(len(codes), max_depth) else: # inner node => continue recursion points = array(list(points) + [node.index - len(self.vocab)], dtype=uint32) stack.append((node.left, array(list(codes) + [0], dtype=uint8), points)) stack.append((node.right, array(list(codes) + [1], dtype=uint8), points)) logger.info("built huffman tree with maximum node depth %i" % max_depth)
def heap_sort(): l=[random.randint(0,100) for i in range(100)] print l h_l=heapq.heapify(l) print type(h_l) print l print type(l) ''' m=heapq.heappop(l) print m print l ''' while len(l)!=0: print heapq.heappop(l) print 'done' #basic_usage()
def solve(self): self.path = [] if self.node_array and self.start_node and self.end_node: closed_nodes = set() opened_nodes = [] heapq.heapify(opened_nodes) heapq.heappush(opened_nodes, (self.start_node.f, self.start_node)) # ?????????? while opened_nodes: # ?????????f????node f, node = heapq.heappop(opened_nodes) # ???node???????, ?????? closed_nodes.add(node) # ????????????? self.path (???????) if node is self.end_node: self.path = [] while node is not self.start_node: self.path.append((node.x, node.y)) node = node.p self.path.append((self.start_node.x, self.start_node.y)) self.path.reverse() return # ??????? # ?????????? adj_nodes = self.get_adjacent_nodes(node) for adj_node in adj_nodes: # ????????, ?????? if adj_node.r and adj_node not in closed_nodes: # ?????????????? if (adj_node.f, adj_node) in opened_nodes: # ??????????????????????? if node.g < adj_node.g: self.update_node(adj_node, node) else: # ?????????, ?????????? self.update_node(adj_node, node) heapq.heappush(opened_nodes, (adj_node.f, adj_node))
def cancel(self, event): """Remove an event from the queue. This must be presented the ID as returned by enter(). If the event is not in the queue, this raises ValueError. """ with self._lock: self._queue.remove(event) heapq.heapify(self._queue)
def dijkstra(vertices, teleport_edges, source): infinity = 10e50 dist = {v: infinity for v in vertices} dist[source] = 0 prev = {v: None for v in vertices} Q = [] for v in vertices: heapq.heappush(Q, (dist[v], v)) while Q: _, v = heapq.heappop(Q) for v2 in vertices: if v != v2: d = distance(v, v2, teleport_edges) if d is None: continue if dist[v] + d < dist[v2]: dist[v2] = dist[v] + d prev[v2] = v #decrease-key for i, node in enumerate(Q): if node[1] == v2: Q[i] = (dist[v] + d, v2) break heapq.heapify(Q) return dist, prev
def dijkstra(aGraph, start): # print '''Dijkstra's shortest path''' # Set the distance for the start node to zero start.set_distance(0) # Put tuple pair into the priority queue unvisited_queue = [(v.get_distance(), v) for v in aGraph] heapq.heapify(unvisited_queue) while len(unvisited_queue): # Pops a vertex with the smallest distance uv = heapq.heappop(unvisited_queue) current = uv[1] current.set_visited() for next in current.adjacent: # if visited, skip if next.visited: continue new_dist = current.get_distance() + current.get_weight(next) if new_dist < next.get_distance(): next.set_distance(new_dist) next.set_previous(current) # print 'updated : current = %s next = %s new_dist = %s' \ # %(current.get_id(), next.get_id(), str(next.get_distance())) # else: # print 'not updated : current = %s next = %s new_dist = %s' \ # %(current.get_id(), next.get_id(), str(next.get_distance())) # Rebuild heap # 1. Pop every item while len(unvisited_queue): heapq.heappop(unvisited_queue) # 2. Put all vertices not visited into the queue unvisited_queue = [(v.get_distance(), v) for v in aGraph if not v.visited] heapq.heapify(unvisited_queue) return
def shortest_path(digr, s): """ Finds the shortest path from s to every other vertex findable from s using Dijkstra's algorithm in O(mlogn) time. Uses heaps for super fast implementation """ nodes_explored = set([s]) nodes_unexplored = DFS(digr, s) # all accessible nodes from s nodes_unexplored.remove(s) dist = {s:0} node_heap = [] for n in nodes_unexplored: min = compute_min_dist(digr, n, nodes_explored, dist) heapq.heappush(node_heap, (min, n)) while len(node_heap) > 0: min_dist, nearest_node = heapq.heappop(node_heap) dist[nearest_node] = min_dist nodes_explored.add(nearest_node) nodes_unexplored.remove(nearest_node) # recompute keys for just popped node for v in digr.neighbors(nearest_node): if v in nodes_unexplored: for i in range(len(node_heap)): if node_heap[i][1] == v: node_heap[i] = (compute_min_dist(digr, v, nodes_explored, dist), v) heapq.heapify(node_heap) return dist
def minimum_spanning_tree(gr): """ Uses prim's algorithm to return the minimum cost spanning tree in a undirected connected graph. Works only with undirected and connected graphs """ s = gr.nodes()[0] nodes_explored = set([s]) nodes_unexplored = gr.nodes() nodes_unexplored.remove(s) min_cost, node_heap = 0, [] #computes the key for each vertex in unexplored for n in nodes_unexplored: min = compute_key(gr, n, nodes_explored) heapq.heappush(node_heap, (min, n)) while len(nodes_unexplored) > 0: # adds the cheapest to "explored" node_cost, min_node = heapq.heappop(node_heap) min_cost += node_cost nodes_explored.add(min_node) nodes_unexplored.remove(min_node) # recompute keys for neighbors of deleted node for v in gr.neighbors(min_node): if v in nodes_unexplored: for i in range(len(node_heap)): if node_heap[i][1] == v: node_heap[i] = (compute_key(gr, v, nodes_explored), v) heapq.heapify(node_heap) return min_cost
def create_min_heap_from(self, data): self.min_heap = data[:] heapq.heapify(self.min_heap)
def _resort(self): heapq.heapify(self.queue)
def _init(self, maxsize, items=None): if items: self.queue = list(items) heapq.heapify(self.queue) else: self.queue = []
def find_path(self, start_tiles, goal_tiles): """ Given a list of starting locations and a list of ending locations, find the shortest path between them. Uses a priority queue to do Uniform Cost Search (think Breadth First Search, but with movement cost included). """ open_q = [(0, tile) for tile in start_tiles] heapq.heapify(open_q) goals = {tile for tile in goal_tiles} source = defaultdict(lambda: (None, 100000000)) for tile in start_tiles: source[tile] = (tile, 0) while open_q: moves, working = heapq.heappop(open_q) for neighbor in working.get_neighbors(): if neighbor in goals: steps = [neighbor, working] previous = working while source[previous][0] != previous: previous = source[previous][0] steps.append(previous) return list(reversed(steps)) if not pathable(neighbor): continue previous_tile, previous_distance = source[neighbor] current_distance = moves + move_cost(working, neighbor) if current_distance < previous_distance: source[neighbor] = (working, current_distance) heapq.heappush(open_q, (current_distance, neighbor)) return []