The following 43 code examples, extracted from Python open-source projects, illustrate how to use heapq.merge().
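Before the project examples, a minimal sketch of the basic call, using nothing beyond the standard library: heapq.merge() takes several already-sorted iterables and returns a single lazily merged, sorted iterator.

import heapq

evens = [0, 2, 4, 6]
odds = [1, 3, 5, 7]

# merge() never materializes its inputs; it repeatedly yields the smallest
# remaining element, so each input iterable must already be sorted.
print(list(heapq.merge(evens, odds)))  # [0, 1, 2, 3, 4, 5, 6, 7]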
def sort(items, maxsize=100000, tempdir=None, maxfiles=128):
    """Sorts the given items using an external merge sort.

    :param tempdir: the path of a directory to use for temporary file
        storage. The default is to use the system's temp directory.
    :param maxsize: the maximum number of items to keep in memory at once.
    :param maxfiles: maximum number of files to open at once.
    """

    p = SortingPool(maxsize=maxsize, tempdir=tempdir)
    for item in items:
        p.add(item)
    return p.items(maxfiles=maxfiles)
def hamming_numbers():
    # Generate "5-smooth" numbers, also called "Hamming numbers"
    # or "Regular numbers".  See: http://en.wikipedia.org/wiki/Regular_number
    # Finds solutions to 2**i * 3**j * 5**k for some integers i, j, and k.

    def deferred_output():
        'Works like a forward reference to the "output" global variable'
        for i in output:
            yield i

    result, p2, p3, p5 = tee(deferred_output(), 4)  # split the output streams
    m2 = (2*x for x in p2)                          # multiples of 2
    m3 = (3*x for x in p3)                          # multiples of 3
    m5 = (5*x for x in p5)                          # multiples of 5
    merged = merge(m2, m3, m5)
    combined = chain([1], merged)                   # prepend starting point
    output = (k for k, v in groupby(combined))      # eliminate duplicates

    return result
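A quick way to exercise the recipe above; the imports are assumed to mirror the original module (merge from heapq, plus tee, chain, groupby and islice from itertools):

from heapq import merge
from itertools import tee, chain, groupby, islice

# First ten Hamming numbers.
print(list(islice(hamming_numbers(), 10)))
# [1, 2, 3, 4, 5, 6, 8, 9, 10, 12]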
def items(self, maxfiles=128):
    """Returns a sorted list or iterator of the items in the pool.

    :param maxfiles: maximum number of files to open at once.
    """

    if maxfiles < 2:
        raise ValueError("maxfiles=%s must be >= 2" % maxfiles)

    if not self.runs:
        # We never wrote a run to disk, so just sort the queue in memory
        # and return that
        return sorted(self.current)

    # Write a new run with the leftover items in the queue
    self.save()

    # If we have more runs than allowed open files, merge some of the runs
    if maxfiles < len(self.runs):
        self.reduce_to(maxfiles, maxfiles)

    # Take all the runs off the run list and merge them
    runs = self.runs
    self.runs = []  # Minor detail, makes this object reusable
    return self._merge_runs(runs)
def _merge_sorted_lists(keyfn):
    def merge(l1, l2):
        # TODO might be nice to have a specialized version of this which
        # re-uses uninterrupted subtrees from one side; it would boost
        # lists that are mostly sorted already.
        min1, max1 = keyfn(l1[0]), keyfn(l1[-1])
        min2, max2 = keyfn(l2[0]), keyfn(l2[-1])
        if max1 <= min2:
            return l1 + l2
        if max2 < min1:  # not "<=" here because it would make the sort unstable
            return l2 + l1
        return viewablelist(list(_mergesorted(iter(l1), iter(l2), keyfn)))
    return merge


#
# Other
#
def nthSuperUglyNumber(self, n, primes):
    """
    :type n: int
    :type primes: List[int]
    :rtype: int
    """
    from heapq import merge
    uglies = [1]

    def gen(prime):
        # "gen" is a generator function: it lazily yields uglies scaled by prime
        for ugly in uglies:
            yield ugly * prime

    # "merged" is a lazy iterator over all the prime-scaled streams
    merged = merge(*map(gen, primes))
    while len(uglies) < n:
        ugly = next(merged)
        if ugly != uglies[-1]:
            uglies.append(ugly)
    return uglies[-1]
def nthSuperUglyNumber1(self, n, primes):
    """
    :type n: int
    :type primes: List[int]
    :rtype: int
    """
    uglies = [1]

    def gen(prime):
        for ugly in uglies:
            yield ugly * prime

    merged = heapq.merge(*map(gen, primes))
    while len(uglies) < n:
        ugly = next(merged)
        if ugly != uglies[-1]:
            uglies.append(ugly)
    return uglies[-1]
def __init__(self, width, height, rot=True, *args, **kwargs):
    """
    _skyline is the list used to store all the skyline segments; each one
    is a list with the format [x, y, width] where x is the x coordinate of
    the left-most point of the segment, y the y coordinate of the segment,
    and width the length of the segment. The initial segment is always
    [0, 0, surface_width].

    Arguments:
        width (int, float):
        height (int, float):
        rot (bool): Enable or disable rectangle rotation
    """
    self._waste_management = False
    self._waste = WasteManager(rot=rot)
    super(Skyline, self).__init__(width, height, rot, merge=False, *args, **kwargs)
def _placement_points_generator(self, skyline, width):
    """Returns a generator for the x coordinates of all the placement
    points on the skyline for a given rectangle.

    WARNING: In some cases there could be duplicated points, but it is
    faster to compute them twice than to remove them.

    Arguments:
        skyline (list): Skyline HSegment list
        width (int, float): Rectangle width

    Returns:
        generator
    """
    skyline_r = skyline[-1].right
    skyline_l = skyline[0].left

    # Placements using skyline segment left point
    ppointsl = (s.left for s in skyline if s.left + width <= skyline_r)

    # Placements using skyline segment right point
    ppointsr = (s.right - width for s in skyline if s.right - width >= skyline_l)

    # Merge positions
    return heapq.merge(ppointsl, ppointsr)
def lazysort(l: list) -> typing.Iterator:
    # Stage 1
    stack = []
    current_list = iter(l)
    sentinel = object()
    first = next(current_list, sentinel)
    while first is not sentinel:
        sortedish, surplus = dropsort(itertools.chain((first,), current_list))
        stack.append(sortedish)
        current_list = surplus
        first = next(current_list, sentinel)
    # Stage 2
    if len(stack) < 2:  # the case where the list `l` is already sorted
        return iter(l)
    cur = heapq.merge(stack.pop(), stack.pop())
    while stack:
        cur = heapq.merge(cur, stack.pop())
    return cur
def lazysort(l: list) -> typing.Iterator:
    # Stage 1
    stack = []
    current_list = iter(l)
    sentinel = object()
    first = next(current_list, sentinel)
    while first is not sentinel:
        sortedish, surplus = dropsort(chain((first,), current_list))
        stack.append(sortedish)
        current_list = surplus
        first = next(current_list, sentinel)
    # Stage 2
    if len(stack) < 2:  # the case where the list `l` is already sorted
        return iter(l)
    cur = merge(stack.pop(), stack.pop())
    while stack:
        cur = merge(cur, stack.pop())
    return cur
def _get_merged_catchup_txns(existing_txns, new_txns):
    """
    Merge any newly received txns during catchup with already received txns

    :param existing_txns:
    :param new_txns:
    :return:
    """
    idx_to_remove = []
    for i, (seq_no, _) in enumerate(existing_txns):
        if seq_no < new_txns[0][0]:
            continue
        if seq_no > new_txns[-1][0]:
            break
        idx_to_remove.append(seq_no - new_txns[0][0])
    for idx in reversed(idx_to_remove):
        new_txns.pop(idx)

    return list(heapq.merge(existing_txns, new_txns,
                            key=operator.itemgetter(0)))
def posts_for_user(user: User, limit: Optional[int] = None) -> List[Post]:
    relevant = merge(*[user_posts[u] for u in following[user]], reverse=True)
    return list(islice(relevant, limit))
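The reverse=True form (available since Python 3.5) only works because each per-user feed is already sorted newest-first; a small self-contained sketch with hypothetical feeds:

from heapq import merge
from itertools import islice

# Hypothetical feeds, each already ordered newest-first by timestamp.
feed_a = [(30, "a3"), (20, "a2"), (10, "a1")]
feed_b = [(25, "b2"), (15, "b1")]

print(list(islice(merge(feed_a, feed_b, reverse=True), 3)))
# [(30, 'a3'), (25, 'b2'), (20, 'a2')]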
def date_sorted_sources(*sources):
    """
    Takes an iterable of sources, generating namestrings and
    piping their output into date_sort.
    """
    sorted_stream = heapq.merge(*(_decorate_source(s) for s in sources))

    # Strip out key decoration
    for _, message in sorted_stream:
        yield message
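The decorate/undecorate idiom above relies on a project-specific _decorate_source helper that is not shown; a self-contained sketch of the same pattern, with a hypothetical helper:

import heapq

def _decorate(items, key):
    # Hypothetical helper: pair each item with its sort key so that
    # heapq.merge compares keys rather than the items themselves.
    return ((key(item), item) for item in items)

a = [{"ts": 1}, {"ts": 4}]
b = [{"ts": 2}, {"ts": 3}]
merged = heapq.merge(_decorate(a, lambda d: d["ts"]),
                     _decorate(b, lambda d: d["ts"]))
print([item for _, item in merged])  # items from both lists, ordered by "ts"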
def apply(func, *mappings):
    """Return a new IntervalMapping applying func to all values of mappings.

    For example, apply(lambda x, y: x + y, m1, m2) returns a new mapping
    with the sum of values for m1 and m2. IntervalMapping.nothing is
    passed to func when the value is undefined.

    >>> m1 = IntervalMapping()
    >>> m2 = IntervalMapping()
    >>> m1[:] = m2[:] = 0  # avoid problems with undefined values
    >>> m1[0:2] = 1
    >>> m2[1:3] = 2
    >>> m3 = apply(lambda a, b: a + b, m1, m2)
    >>> m3[-1], m3[0], m3[1], m3[2], m3[3]
    (0, 1, 3, 2, 0)
    """
    values = [m.leftmost() for m in mappings]

    def changes():
        def start_i_value(i_m):
            i, m = i_m
            res = ((k.start, i, v) for k, v in m.iteritems(True))
            next(res)
            return res

        change_points = merge(*map(start_i_value, enumerate(mappings)))
        lastbound = None
        for bound, i, v in change_points:
            values[i] = v
            if bound != lastbound:
                yield bound, func(*values)
                lastbound = bound
        yield bound, func(*values)

    return IntervalMapping.from_changes(func(*values), changes())
def imerge(iterables):
    return merge(*iterables)
def merge(key=None, *iterables):
    # based on code posted by Scott David Daniels in c.l.p.
    # http://groups.google.com/group/comp.lang.python/msg/484f01f1ea3c832d
    if key is None:
        # No key: the plain elements are already comparable, merge directly.
        for element in heapq.merge(*iterables):
            yield element
        return
    keyed_iterables = [(Keyed(key(obj), obj) for obj in iterable)
                       for iterable in iterables]
    for element in heapq.merge(*keyed_iterables):
        yield element.obj
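Since Python 3.5, heapq.merge() accepts key= (and reverse=) directly, so a wrapper like the one above is only needed on older interpreters; a minimal sketch of the built-in form:

import heapq
from operator import itemgetter

rows_a = [("a", 1), ("c", 3)]
rows_b = [("b", 2), ("d", 4)]

# key= makes merge compare the second field instead of the tuples themselves.
print(list(heapq.merge(rows_a, rows_b, key=itemgetter(1))))
# [('a', 1), ('b', 2), ('c', 3), ('d', 4)]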
def msort(input_, output, key=None, buffer_size=32000, tempdirs=None):
    if tempdirs is None:
        tempdirs = []
    if not tempdirs:
        tempdirs.append(gettempdir())

    chunks = []
    try:
        with open(input_, 'rb', 64*1024) as input_file:
            input_iterator = iter(input_file)
            for tempdir in cycle(tempdirs):
                current_chunk = list(islice(input_iterator, buffer_size))
                if not current_chunk:
                    break
                current_chunk.sort(key=key)
                output_chunk = open(
                    os.path.join(tempdir, '%06i' % len(chunks)), 'w+b', 64*1024)
                chunks.append(output_chunk)
                output_chunk.writelines(current_chunk)
                output_chunk.flush()
                output_chunk.seek(0)
        with open(output, 'wb', 64*1024) as output_file:
            # 'merge' here is the key-aware merge(key, *iterables) wrapper
            # shown above, not heapq.merge itself.
            output_file.writelines(merge(key, *chunks))
    finally:
        for chunk in chunks:
            try:
                chunk.close()
                os.remove(chunk.name)
            except Exception:
                pass
def sorted_idx_iter(self, types: List[int]) -> Iterable[int]:
    """
    Returns an iterator of file positions sorted by file position
    (across different message types)

    :param types: The message types to return, None returns all types
    :return: Generator object for sorted file positions
    """
    if types:
        idx_iters = [self.idx[key] for key in types if key in self.idx]
    else:
        idx_iters = [val for key, val in self.idx.items()]

    # Use the heapq.merge function to return sorted iterator of file indices
    return heapq.merge(*idx_iters)
def sorted_items(iterables):
    sorted_iterable = heapq.merge(*(_decorate_items(s) for s in iterables))
    for _, item in sorted_iterable:
        yield item
def batch_sort(input_iterator, output_path, buffer_size=32000, output_class=None):
    """batch sort helper with temporary files, supports sorting large stuff"""
    if not output_class:
        output_class = input_iterator.__class__

    chunks = []
    try:
        while True:
            current_chunk = list(islice(input_iterator, buffer_size))
            if not current_chunk:
                break
            current_chunk.sort()
            fd, filepath = tempfile.mkstemp()
            os.close(fd)
            output_chunk = output_class(filepath)
            chunks.append(output_chunk)

            for elem in current_chunk:
                output_chunk.write(elem.obj)
            output_chunk.close()

        output_file = output_class(output_path)
        for elem in heapq.merge(*chunks):
            output_file.write(elem.obj)
        output_file.close()
    finally:
        for chunk in chunks:
            try:
                chunk.close()
                os.remove(chunk.name)
            except Exception:
                pass
def __init__(self, name, priority, demand, affinity,
             affinity_limits=None,
             data_retention_timeout=0,
             lease=0,
             identity_group=None,
             identity=None,
             schedule_once=False):

    self.global_order = _global_order()
    self.allocation = None
    self.server = None

    self.name = name
    self.affinity = Affinity(affinity, affinity_limits)
    self.priority = priority
    self.demand = np.array(demand, dtype=float)
    self.data_retention_timeout = data_retention_timeout
    self.lease = lease
    self.identity_group = identity_group
    self.identity = identity
    self.identity_group_ref = None
    self.schedule_once = schedule_once
    self.evicted = False
    self.placement_expiry = None
    self.renew = False

    # FIXME: What dictates order? heapq.merge in utilization_queue needs this
    #        comparison.
def utilization_queue(self, free_capacity, visitor=None):
    """Returns utilization queue including the sub-allocs.

    All app queues from self and sub-allocs are merged in standard order,
    and then utilization is recalculated based on total reserved capacity
    of this alloc and sub-allocs combined.

    The function maintains the invariant that any app (in self or inside a
    sub-alloc) with utilization < 1 will remain with utilization < 1.
    """
    total_reserved = self.total_reserved()
    queues = [
        alloc.utilization_queue(free_capacity, visitor)
        for alloc in self.sub_allocations.values()
    ]
    queues.append(self.priv_utilization_queue())

    acc_demand = zero_capacity()
    available = total_reserved + free_capacity + np.finfo(float).eps

    # FIXME: heapq.merge has an overhead of comparison
    for item in heapq.merge(*queues):
        rank, _util, pending, order, app = item
        acc_demand = acc_demand + app.demand
        util = utilization(acc_demand, total_reserved, available)
        if app.priority == 0:
            util = _MAX_UTILIZATION

        # - lower rank allocations take precedence.
        # - for same rank, utilization takes precedence
        # - False < True, so for apps with same utilization we prefer
        #   those that are already running (False == not pending)
        # - Global order
        entry = (rank, util, pending, order, app)

        if visitor:
            visitor(self, entry)

        yield entry
def hamming_numbers():
    last = 1
    yield last

    a, b, c = tee(hamming_numbers(), 3)

    for n in merge((2*i for i in a), (3*i for i in b), (5*i for i in c)):
        if n != last:
            yield n
            last = n
def batch_sort(input_iterator, output_path, buffer_size=32000, output_class=None):
    """batch sort helper with temporary files, supports sorting large stuff"""
    if not output_class:
        output_class = input_iterator.__class__

    chunks = []
    try:
        while True:
            current_chunk = list(islice(input_iterator, buffer_size))
            if not current_chunk:
                break
            current_chunk.sort()
            fd, filepath = tempfile.mkstemp()
            os.close(fd)
            output_chunk = output_class(filepath)
            chunks.append(output_chunk)

            for elem in current_chunk:
                output_chunk.write(elem.obj)
            output_chunk.close()

        output_file = output_class(output_path)
        for elem in heapq.merge(*chunks):
            output_file.write(elem.obj)
        else:
            # for/else: runs once the merge loop finishes; relies on
            # output_class.write() accepting a call with no arguments.
            output_file.write()
        output_file.close()
    finally:
        for chunk in chunks:
            try:
                chunk.close()
                os.remove(chunk.name)
            except Exception:
                pass

# magic
def nthSuperUglyNumber(self, n, primes):
    """
    :type n: int
    :type primes: List[int]
    :rtype: int
    """
    uglies = [1]
    merged = heapq.merge(*map(lambda p: (u*p for u in uglies), primes))
    uniqed = (u for u, _ in itertools.groupby(merged))
    # NOTE: relies on Python 2's eager map(); on Python 3 the lazy map
    # would never run uglies.append, so use an explicit loop there.
    map(uglies.append, itertools.islice(uniqed, n-1))
    return uglies[-1]

# beats 85.23%
def getNewsFeed(self, userId):
    """
    Retrieve the 10 most recent tweet ids in the user's news feed. Each
    item in the news feed must be posted by users who the user followed
    or by the user herself. Tweets must be ordered from most recent to
    least recent.

    :type userId: int
    :rtype: List[int]
    """
    tweets = heapq.merge(*(self.tweets[user]
                           for user in self.followees[userId] | {userId}))
    return [t for _, t in itertools.islice(tweets, 10)]
def merge_sort(unsorted, start=None, end=None):
    if start is None:
        start = 0
        end = len(unsorted)
    midpoint = (start + end) // 2
    if end - start == 1:
        yield unsorted[start]
    elif end - start > 1:
        yield from heapq.merge(
            merge_sort(unsorted, start, midpoint),
            merge_sort(unsorted, midpoint, end))
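A quick check of the recursion above (it assumes the module-level import heapq the original relies on); the generator yields nothing for an empty input and a single element for a one-item slice:

import heapq

print(list(merge_sort([5, 3, 1, 4, 2])))  # [1, 2, 3, 4, 5]
print(list(merge_sort([])))               # []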
def dump_app(app_args, path, begin, now, container_image_pattern=""):
    # the app_args argument is supplied in the format
    #   <app name>:<additional influxdb table 1>:<additional influxdb table 2>:...
    app_args = app_args.split(":")
    # get the tag (keys and values) and fields from the docker measurements and
    # any additional tables
    app = dump_column_names(app_args)
    # build queries
    queries = []
    # always extract docker metrics (here referred to as 'system metrics')
    for system in SYSTEM_METRICS:
        pattern = CONTAINER_IMAGE_PATTERNS[app.name].format(container_image_pattern)
        q = """select * from "docker_container_{}"
               where container_name =~ /{}/
               and container_image =~ /{}/
               and time > '%s' and time < '%s'
            """.format(system, app.name, pattern)
        queries.append(scroll(q, begin, now))

    if len(app_args) > 1:
        for app_arg in app_args[1:]:
            q = "select * from \"{}\" where time > '%s' and time < '%s'".format(app_arg)
            print(q)
            queries.append(scroll(q, begin, now, prefix=app.name))

    path = os.path.join(path, app.filename)
    with gzip.open(path, "wb") as f:
        columns = app.fields + app.tags + ["time"]
        writer = csv.DictWriter(f, fieldnames=columns,
                                dialect=csv.excel_tab, extrasaction='ignore')
        writer.writeheader()
        for _, row in heapq.merge(*queries):
            writer.writerow(row)
    return app

# in general, all apps have docker metrics for them, so we retrieve the
# metrics stored in 'docker_container_*' tables by default. we save these
# metrics under an app name equal to the first string in a sequence of strings
# separated by a ':'. app-specific metrics for such app names are gathered from
# the tables specified in subsequent strings.
def run_executables(execalls, cache=None, ccmode=CC_PROCESSES):
    # type: (List[ExeCall], Optional[Cache], str) -> Iterator[ExeResult]
    """Run executables in parallel.

    Some of the results for the execalls may be found in the cache,
    so we put these aside in cachedresults.

    Each result is yielded as soon as it is available.
    """
    def c2exeresult(value):
        # type: (bytes) -> ExeResult
        returncode, stdout, stderr = unpack_exeresult(value)
        return make_exeresult(returncode, stdout, stderr)

    def exeresult2c(exeresult):
        # type: (ExeResult) -> bytes
        return pack_exeresult(exeresult.returncode, exeresult.stdout, exeresult.stderr)

    # Package the execalls for eventual multiprocessing
    args_lists = [((ec.exe, ec.cmdargs), {'stdindata': ec.stdindata})
                  for ec in execalls]  # type: List[CallArgs]

    cachedresults = []
    jobs = []        # type: List[CallArgs]
    keys = []        # type: List[str]
    jobindices = []  # type: List[int]
    if cache is not None:
        qkeys = [execall_hash(ec, cache) for ec in execalls]
        qresults = cache.mget(qkeys)
        for idx, (arg, key, cvalue) in enumerate(izip(args_lists, qkeys, qresults)):
            if cvalue is not None:
                cachedresults.append((idx, c2exeresult(cvalue)))
            else:
                keys.append(key)
                jobs.append(arg)
                jobindices.append(idx)
    else:
        jobs = args_lists
        jobindices = list(range(len(jobs)))

    jobiter = iter_parallel(call_popen, jobs, ccmode=ccmode)

    def jobresultiter():
        # type: () -> Iterator[Tuple[int, ExeResult]]
        for idx, (jobidx, job, result) in enumerate(izip(jobindices, jobs, jobiter)):
            if cache is not None:
                cache.set(keys[idx], exeresult2c(result))
            (executable, cmdargs), kwargs = job
            log_popen(executable, cmdargs, kwargs['stdindata'], result)
            yield jobidx, result

    for idx, result in heapq.merge(iter(cachedresults), jobresultiter()):
        yield result
def _sow(self, watch, pattern, since, handler, impl):
    """Publish state of the world."""
    if since is None:
        since = 0

    def _publish(item):
        when, path, content = item
        try:
            payload = impl.on_event(str(path), None, content)
            if payload is not None:
                payload['when'] = when
                handler.write_message(payload)
        except Exception as err:  # pylint: disable=W0703
            handler.send_error_msg(str(err))

    db_connections = []
    fs_records = self._get_fs_sow(watch, pattern, since)

    sow = getattr(impl, 'sow', None)
    sow_table = getattr(impl, 'sow_table', 'sow')
    try:
        records = []
        if sow:
            dbs = sorted(glob.glob(os.path.join(self.root, sow, '*')))
            for db in dbs:
                if os.path.basename(db).startswith('.'):
                    continue

                conn, db_cursor = self._db_records(
                    db, sow_table, watch, pattern, since
                )
                records.append(db_cursor)
                # FIXME: Figure out pylint use before assign
                db_connections.append(conn)  # pylint: disable=E0601

        records.append(fs_records)

        # Merge db and fs records, removing duplicates.
        prev_path = None
        for item in heapq.merge(*records):
            _when, path, _content = item
            if path == prev_path:
                continue

            prev_path = path
            _publish(item)
    finally:
        for conn in db_connections:
            if conn:
                conn.close()