Python heapq module: merge() example source code

The following 43 code examples, extracted from open-source Python projects, illustrate how to use heapq.merge().
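
Before the project code, a minimal standalone demonstration of what heapq.merge() does: it lazily merges several already-sorted iterables into a single sorted iterator, without loading everything into memory.

import heapq

a = [1, 4, 7]
b = [2, 5, 8]
c = [3, 6, 9]

# Each input must already be sorted; merge() then yields lazily in order.
print(list(heapq.merge(a, b, c)))   # [1, 2, 3, 4, 5, 6, 7, 8, 9]

# Since Python 3.5, merge() also accepts key= and reverse=.
print(list(heapq.merge([9, 5, 1], [8, 2], reverse=True)))   # [9, 8, 5, 2, 1]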

Project: Hawkeye    Author: tozhengxq
def sort(items, maxsize=100000, tempdir=None, maxfiles=128):
    """Sorts the given items using an external merge sort.

    :param tempdir: the path of a directory to use for temporary file
        storage. The default is to use the system's temp directory.
    :param maxsize: the maximum number of items to keep in memory at once.
    :param maxfiles: maximum number of files to open at once.
    """

    p = SortingPool(maxsize=maxsize, tempdir=tempdir)
    for item in items:
        p.add(item)
    return p.items(maxfiles=maxfiles)
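
SortingPool here is Whoosh's external-sort helper: it accumulates items in memory, spills sorted "runs" to temporary files once maxsize is exceeded, and items() merges the runs back together with heapq.merge(). A minimal self-contained sketch of the same idea (the names external_sort and _read_run are illustrative, not Whoosh's API):

import heapq
import itertools
import pickle
import tempfile

def _read_run(f):
    # Lazily re-read pickled items from a spilled run file until EOF.
    f.seek(0)
    while True:
        try:
            yield pickle.load(f)
        except EOFError:
            return

def external_sort(items, maxsize=4):
    # Spill sorted runs of at most `maxsize` items to temporary files,
    # then merge the runs back together lazily.
    it = iter(items)
    runs = []
    while True:
        chunk = sorted(itertools.islice(it, maxsize))
        if not chunk:
            break
        f = tempfile.TemporaryFile()
        for item in chunk:
            pickle.dump(item, f)
        runs.append(_read_run(f))
    return heapq.merge(*runs)

print(list(external_sort([9, 3, 7, 1, 8, 2, 6], maxsize=3)))   # [1, 2, 3, 6, 7, 8, 9]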
Project: code    Author: ActiveState
def hamming_numbers():
    # Generate "5-smooth" numbers, also called "Hamming numbers"
    # or "Regular numbers".  See: http://en.wikipedia.org/wiki/Regular_number
    # Generates numbers of the form 2**i * 3**j * 5**k for nonnegative integers i, j, and k.

    def deferred_output():
        'Works like a forward reference to the "output" global variable'
        for i in output:
            yield i

    result, p2, p3, p5 = tee(deferred_output(), 4)  # split the output streams
    m2 = (2*x for x in p2)                          # multiples of 2
    m3 = (3*x for x in p3)                          # multiples of 3
    m5 = (5*x for x in p5)                          # multiples of 5
    merged = merge(m2, m3, m5)
    combined = chain([1], merged)                   # prepend starting point
    output = (k for k, v in groupby(combined))      # eliminate duplicates

    return result
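
Taking a few values from the recipe (assuming the tee, merge, groupby and chain imports that accompany it in the original source):

from itertools import islice

print(list(islice(hamming_numbers(), 10)))   # [1, 2, 3, 4, 5, 6, 8, 9, 10, 12]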
Project: nstock    Author: ybenitezf
def items(self, maxfiles=128):
        """Returns a sorted list or iterator of the items in the pool.

        :param maxfiles: maximum number of files to open at once.
        """

        if maxfiles < 2:
            raise ValueError("maxfiles=%s must be >= 2" % maxfiles)

        if not self.runs:
            # We never wrote a run to disk, so just sort the queue in memory
            # and return that
            return sorted(self.current)
        # Write a new run with the leftover items in the queue
        self.save()

        # If we have more runs than allowed open files, merge some of the runs
        if maxfiles < len(self.runs):
            self.reduce_to(maxfiles, maxfiles)

        # Take all the runs off the run list and merge them
        runs = self.runs
        self.runs = []  # Minor detail, makes this object reusable
        return self._merge_runs(runs)
Project: zippy    Author: securesystemslab
def items(self, maxfiles=128):
        """Returns a sorted list or iterator of the items in the pool.

        :param maxfiles: maximum number of files to open at once.
        """

        if maxfiles < 2:
            raise ValueError("maxfiles=%s must be >= 2" % maxfiles)

        if not self.runs:
            # We never wrote a run to disk, so just sort the queue in memory
            # and return that
            return sorted(self.current)
        # Write a new run with the leftover items in the queue
        self.save()

        # If we have more runs than allowed open files, merge some of the runs
        if maxfiles < len(self.runs):
            self.reduce_to(maxfiles, maxfiles)

        # Take all the runs off the run list and merge them
        runs = self.runs
        self.runs = []  # Minor detail, makes this object reusable
        return self._merge_runs(runs)
Project: ScenicOverlook    Author: pschanely
def _merge_sorted_lists(keyfn):
    def merge(l1, l2):
        # TODO might be nice to have a specialized version of this which
        # re-uses uninterrupted subtrees from one side; it would boost
        # lists that are mostly sorted already.
        min1, max1 = keyfn(l1[0]), keyfn(l1[-1])
        min2, max2 = keyfn(l2[0]), keyfn(l2[-1])
        if max1 <= min2:
            return l1 + l2
        if max2 < min1: # not "<=" here because it would make the sort unstable
            return l2 + l1
        return viewablelist(list(_mergesorted(iter(l1), iter(l2), keyfn)))
    return merge



#
#  Other
#
Project: WhooshSearch    Author: rokartnaz
def items(self, maxfiles=128):
        """Returns a sorted list or iterator of the items in the pool.

        :param maxfiles: maximum number of files to open at once.
        """

        if maxfiles < 2:
            raise ValueError("maxfiles=%s must be >= 2" % maxfiles)

        if not self.runs:
            # We never wrote a run to disk, so just sort the queue in memory
            # and return that
            return sorted(self.current)
        # Write a new run with the leftover items in the queue
        self.save()

        # If we have more runs than allowed open files, merge some of the runs
        if maxfiles < len(self.runs):
            self.reduce_to(maxfiles, maxfiles)

        # Take all the runs off the run list and merge them
        runs = self.runs
        self.runs = []  # Minor detail, makes this object reusable
        return self._merge_runs(runs)
Project: LeetCode    Author: YJL33
def nthSuperUglyNumber(self, n, primes):
        """
        :type n: int
        :type primes: List[int]
        :rtype: int
        """
        from heapq import merge
        uglies = [1]
        def gen(prime):                             # "gen" is a generator function
            for ugly in uglies:
                yield ugly * prime
        merged = merge(*map(gen, primes))           # merge() returns a lazy iterator
        while len(uglies) < n:
            ugly = next(merged)
            if ugly != uglies[-1]:
                uglies.append(ugly)
        return uglies[-1]
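
For reference, a small driver; the Solution wrapper class is the usual LeetCode convention and is assumed here:

# Super ugly numbers for primes [2, 7, 13, 19] start
# 1, 2, 4, 7, 8, 13, 14, 16, 19, 26, 28, 32, so the 12th is 32.
print(Solution().nthSuperUglyNumber(12, [2, 7, 13, 19]))   # 32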
Project: QualquerMerdaAPI    Author: tiagovizoto
def items(self, maxfiles=128):
        """Returns a sorted list or iterator of the items in the pool.

        :param maxfiles: maximum number of files to open at once.
        """

        if maxfiles < 2:
            raise ValueError("maxfiles=%s must be >= 2" % maxfiles)

        if not self.runs:
            # We never wrote a run to disk, so just sort the queue in memory
            # and return that
            return sorted(self.current)
        # Write a new run with the leftover items in the queue
        self.save()

        # If we have more runs than allowed open files, merge some of the runs
        if maxfiles < len(self.runs):
            self.reduce_to(maxfiles, maxfiles)

        # Take all the runs off the run list and merge them
        runs = self.runs
        self.runs = []  # Minor detail, makes this object reusable
        return self._merge_runs(runs)
Project: Machine_Learning_Playground    Author: yao23
def nthSuperUglyNumber1(self, n, primes):
        """
        :type n: int
        :type primes: List[int]
        :rtype: int
        """
        uglies = [1]

        def gen(prime):
            for ugly in uglies:
                yield ugly * prime

        merged = heapq.merge(*map(gen, primes))
        while len(uglies) < n:
            ugly = next(merged)
            if ugly != uglies[-1]:
                uglies.append(ugly)
        return uglies[-1]
Project: rectpack    Author: secnot
def __init__(self, width, height, rot=True, *args, **kwargs):
        """
        _skyline is the list used to store all the skyline segments, each
        one is a list with the format [x, y, width] where x is the x
        coordinate of the leftmost point of the segment, y the y coordinate
        of the segment, and width the length of the segment. The initial
        segment is always [0, 0, surface_width]

        Arguments:
            width (int, float): 
            height (int, float):
            rot (bool): Enable or disable rectangle rotation
        """
        self._waste_management = False
        self._waste = WasteManager(rot=rot)
        super(Skyline, self).__init__(width, height, rot, merge=False, *args, **kwargs)
Project: rectpack    Author: secnot
def _placement_points_generator(self, skyline, width):
        """Returns a generator for the x coordinates of all the placement
        points on the skyline for a given rectangle.

        WARNING: In some cases there may be duplicate points, but it is
        faster to compute them twice than to remove them.

        Arguments:
            skyline (list): Skyline HSegment list
            width (int, float): Rectangle width

        Returns:
            generator
        """ 
        skyline_r = skyline[-1].right
        skyline_l = skyline[0].left

        # Placements using skyline segment left point
        ppointsl = (s.left for s in skyline if s.left+width <= skyline_r)

        # Placements using skyline segment right point
        ppointsr = (s.right-width for s in skyline if s.right-width >= skyline_l)

        # Merge positions
        return heapq.merge(ppointsl, ppointsr)
Project: lazy_sort    Author: ondergetekende
def lazysort(l: list) -> typing.Iterator:
    # Stage 1
    stack = []
    current_list = iter(l)
    sentinel = object()
    first = next(current_list, sentinel)
    while first is not sentinel:
        sortedish, surplus = dropsort(itertools.chain((first,), current_list))
        stack.append(sortedish)
        current_list = surplus
        first = next(current_list, sentinel)

    # Stage 2
    if len(stack) < 2:  # the case where the list `l` is already sorted
        return iter(l)

    cur = heapq.merge(stack.pop(), stack.pop())
    while stack:
        cur = heapq.merge(cur, stack.pop())

    return cur
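
Whatever dropsort (not shown here) produces as runs, stage 2 simply folds the stack of sorted runs together pairwise with heapq.merge(); in isolation:

import heapq

stack = [[1, 4, 9], [2, 3, 7], [5, 6, 8]]
cur = heapq.merge(stack.pop(), stack.pop())
while stack:
    cur = heapq.merge(cur, stack.pop())
print(list(cur))   # [1, 2, 3, 4, 5, 6, 7, 8, 9]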
Project: lazysort    Author: boramalper
def lazysort(l: list) -> typing.Iterator:
    # Stage 1
    stack = []
    current_list = iter(l)
    sentinel = object()
    first = next(current_list, sentinel)
    while first is not sentinel:
        sortedish, surplus = dropsort(chain((first,), current_list))
        stack.append(sortedish)
        current_list = surplus
        first = next(current_list, sentinel)

    # Stage 2
    if len(stack) < 2:  # the case where the list `l` is already sorted
        return iter(l)

    cur = merge(stack.pop(), stack.pop())
    while stack:
        cur = merge(cur, stack.pop())

    return cur
Project: indy-plenum    Author: hyperledger
def _get_merged_catchup_txns(existing_txns, new_txns):
        """
        Merge any newly received txns during catchup with already received txns
        :param existing_txns:
        :param new_txns:
        :return:
        """
        idx_to_remove = []
        for i, (seq_no, _) in enumerate(existing_txns):
            if seq_no < new_txns[0][0]:
                continue
            if seq_no > new_txns[-1][0]:
                break
            idx_to_remove.append(seq_no - new_txns[0][0])
        for idx in reversed(idx_to_remove):
            new_txns.pop(idx)

        return list(heapq.merge(existing_txns, new_txns,
                                key=operator.itemgetter(0)))
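
A quick illustration of the key-based merge used above, with made-up (seq_no, txn) pairs:

import heapq
import operator

existing = [(1, 'a'), (2, 'b'), (5, 'e')]
new = [(3, 'c'), (4, 'd')]

# Both inputs must already be sorted by seq_no for the merge to be correct.
merged = heapq.merge(existing, new, key=operator.itemgetter(0))
print(list(merged))   # [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')]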
Project: Hawkeye    Author: tozhengxq
def items(self, maxfiles=128):
        """Returns a sorted list or iterator of the items in the pool.

        :param maxfiles: maximum number of files to open at once.
        """

        if maxfiles < 2:
            raise ValueError("maxfiles=%s must be >= 2" % maxfiles)

        if not self.runs:
            # We never wrote a run to disk, so just sort the queue in memory
            # and return that
            return sorted(self.current)
        # Write a new run with the leftover items in the queue
        self.save()

        # If we have more runs than allowed open files, merge some of the runs
        if maxfiles < len(self.runs):
            self.reduce_to(maxfiles, maxfiles)

        # Take all the runs off the run list and merge them
        runs = self.runs
        self.runs = []  # Minor detail, makes this object reusable
        return self._merge_runs(runs)
Project: modernpython    Author: rhettinger
def posts_for_user(user: User, limit: Optional[int] = None) -> List[Post]:
    relevant = merge(*[user_posts[u] for u in following[user]], reverse=True)
    return list(islice(relevant, limit))
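
Here merge(..., reverse=True) assumes each user's post list is already sorted newest-first and merges them into one newest-first stream; a sketch with invented (timestamp, id) posts:

from heapq import merge
from itertools import islice

alice = [(30, 'a3'), (20, 'a2'), (10, 'a1')]   # newest first
bob   = [(25, 'b2'), (15, 'b1')]

print(list(islice(merge(alice, bob, reverse=True), 3)))
# [(30, 'a3'), (25, 'b2'), (20, 'a2')]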
Project: zipline-chinese    Author: zhanghan1990
def date_sorted_sources(*sources):
    """
    Takes an iterable of sources, generating namestrings and
    piping their output into date_sort.
    """
    sorted_stream = heapq.merge(*(_decorate_source(s) for s in sources))

    # Strip out key decoration
    for _, message in sorted_stream:
        yield message
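
_decorate_source is not shown; presumably it wraps each message in a (date, message) pair so heapq.merge() can compare by date, and the loop above strips the key back off (a real decorator would add a tiebreaker so equal dates never force comparison of the payloads). The pattern in miniature:

import heapq

def _decorate(source):
    # Hypothetical stand-in: tag each message with its sort key.
    for msg in source:
        yield msg['dt'], msg

feeds = [
    [{'dt': 1, 'sym': 'A'}, {'dt': 3, 'sym': 'A'}],
    [{'dt': 2, 'sym': 'B'}],
]
for _, message in heapq.merge(*(_decorate(s) for s in feeds)):
    print(message['dt'], message['sym'])   # 1 A / 2 B / 3 A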
Project: code    Author: ActiveState
def apply(func, *mappings):
    """Return a new IntervalMapping applying func to all values of mappings.

    For example, apply(lambda x, y: x + y, m1, m2) returns a new
    mapping with the sum of the values of m1 and m2.
    IntervalMapping.nothing is passed to func when the value is
    undefined.

    >>> m1 = IntervalMapping()
    >>> m2 = IntervalMapping()
    >>> m1[:] = m2[:] = 0 # avoid problems with undefined values
    >>> m1[0:2] = 1
    >>> m2[1:3] = 2
    >>> m3 = apply(lambda a, b: a + b, m1, m2)
    >>> m3[-1], m3[0], m3[1], m3[2], m3[3]
    (0, 1, 3, 2, 0)
    """

    values = [m.leftmost() for m in mappings]

    def changes():

        def start_i_value(i_m):
            i, m = i_m
            res = ((k.start, i, v) for k, v in m.iteritems(True))
            next(res)
            return res
        change_points = merge(*map(start_i_value, enumerate(mappings)))

        lastbound = None
        for bound, i, v in change_points:
            values[i] = v
            if bound != lastbound:
                yield bound, func(*values)
                lastbound = bound
        yield bound, func(*values)

    return IntervalMapping.from_changes(func(*values), changes())
Project: nstock    Author: ybenitezf
def imerge(iterables):
        return merge(*iterables)
Project: nstock    Author: ybenitezf
def sort(items, maxsize=100000, tempdir=None, maxfiles=128):
    """Sorts the given items using an external merge sort.

    :param tempdir: the path of a directory to use for temporary file
        storage. The default is to use the system's temp directory.
    :param maxsize: the maximum number of items to keep in memory at once.
    :param maxfiles: maximum number of files to open at once.
    """

    p = SortingPool(maxsize=maxsize, tempdir=tempdir)
    for item in items:
        p.add(item)
    return p.items(maxfiles=maxfiles)
Project: acris    Author: Acrisel
def merge(key=None, *iterables):
    # based on code posted by Scott David Daniels in c.l.p.
    # http://groups.google.com/group/comp.lang.python/msg/484f01f1ea3c832d

    if key is None:
        # Without a key the raw elements are directly comparable.
        for element in heapq.merge(*iterables):
            yield element
    else:
        keyed_iterables = [(Keyed(key(obj), obj) for obj in iterable)
                           for iterable in iterables]
        for element in heapq.merge(*keyed_iterables):
            yield element.obj
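
Keyed is defined elsewhere in the recipe; assuming the usual namedtuple('Keyed', ['key', 'obj']) that compares on key first, the function can be exercised like this:

from collections import namedtuple

Keyed = namedtuple('Keyed', ['key', 'obj'])   # assumed helper from the recipe

print(list(merge(len, ['bb', 'dddd'], ['a', 'ccc'])))   # ['a', 'bb', 'ccc', 'dddd']
print(list(merge(None, [1, 3], [2, 4])))                # [1, 2, 3, 4]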
Project: acris    Author: Acrisel
def msort(input_, output, key=None, buffer_size=32000, tempdirs=None):
    if tempdirs is None:
        tempdirs = []
    if not tempdirs:
        tempdirs.append(gettempdir())

    chunks = []
    try:
        with open(input_,'rb',64*1024) as input_file:
            input_iterator = iter(input_file)
            for tempdir in cycle(tempdirs):
                current_chunk = list(islice(input_iterator,buffer_size))
                if not current_chunk:
                    break
                current_chunk.sort(key=key)
                output_chunk = open(os.path.join(tempdir,'%06i'%len(chunks)),'w+b',64*1024)
                chunks.append(output_chunk)
                output_chunk.writelines(current_chunk)
                output_chunk.flush()
                output_chunk.seek(0)
        with open(output,'wb',64*1024) as output_file:
            output_file.writelines(merge(key, *chunks))
    finally:
        for chunk in chunks:
            try:
                chunk.close()
                os.remove(chunk.name)
            except Exception:
                pass
Project: catalyst    Author: enigmampc
def date_sorted_sources(*sources):
    """
    Takes an iterable of sources, generating namestrings and
    piping their output into date_sort.
    """
    sorted_stream = heapq.merge(*(_decorate_source(s) for s in sources))

    # Strip out key decoration
    for _, message in sorted_stream:
        yield message
Project: pyimc    Author: oysstu
def sorted_idx_iter(self, types: List[int]) -> Iterable[int]:
        """
        Returns an iterator of file positions sorted by file position (across different message types)
        :param types: The message types to return, None returns all types
        :return: Generator object for sorted file positions
        """
        if types:
            idx_iters = [self.idx[key] for key in types if key in self.idx]
        else:
            idx_iters = [val for key, val in self.idx.items()]

        # Use the heapq.merge function to return sorted iterator of file indices
        return heapq.merge(*idx_iters)
Project: zippy    Author: securesystemslab
def imerge(iterables):
        return merge(*iterables)
Project: zippy    Author: securesystemslab
def sort(items, maxsize=100000, tempdir=None, maxfiles=128):
    """Sorts the given items using an external merge sort.

    :param tempdir: the path of a directory to use for temporary file
        storage. The default is to use the system's temp directory.
    :param maxsize: the maximum number of items to keep in memory at once.
    :param maxfiles: maximum number of files to open at once.
    """

    p = SortingPool(maxsize=maxsize, tempdir=tempdir)
    for item in items:
        p.add(item)
    return p.items(maxfiles=maxfiles)
Project: ftrace    Author: corakwue
def sorted_items(iterables):
    sorted_iterable = heapq.merge(*(_decorate_items(s) for s in iterables))

    for _, item in sorted_iterable:
        yield item
Project: WhooshSearch    Author: rokartnaz
def imerge(iterables):
        return merge(*iterables)
Project: WhooshSearch    Author: rokartnaz
def sort(items, maxsize=100000, tempdir=None, maxfiles=128):
    """Sorts the given items using an external merge sort.

    :param tempdir: the path of a directory to use for temporary file
        storage. The default is to use the system's temp directory.
    :param maxsize: the maximum number of items to keep in memory at once.
    :param maxfiles: maximum number of files to open at once.
    """

    p = SortingPool(maxsize=maxsize, tempdir=tempdir)
    for item in items:
        p.add(item)
    return p.items(maxfiles=maxfiles)
Project: cuckoodroid-2.0    Author: idanr1986
def batch_sort(input_iterator, output_path, buffer_size=32000, output_class=None):
    """batch sort helper with temporary files, supports sorting large stuff"""
    if not output_class:
        output_class = input_iterator.__class__

    chunks = []
    try:
        while True:
            current_chunk = list(islice(input_iterator, buffer_size))
            if not current_chunk:
                break

            current_chunk.sort()
            fd, filepath = tempfile.mkstemp()
            os.close(fd)
            output_chunk = output_class(filepath)
            chunks.append(output_chunk)

            for elem in current_chunk:
                output_chunk.write(elem.obj)
            output_chunk.close()

        output_file = output_class(output_path)
        for elem in heapq.merge(*chunks):
            output_file.write(elem.obj)
        output_file.close()
    finally:
        for chunk in chunks:
            try:
                chunk.close()
                os.remove(chunk.name)
            except Exception:
                pass
Project: treadmill    Author: Morgan-Stanley
def __init__(self, name, priority, demand, affinity,
                 affinity_limits=None,
                 data_retention_timeout=0,
                 lease=0,
                 identity_group=None,
                 identity=None,
                 schedule_once=False):

        self.global_order = _global_order()
        self.allocation = None
        self.server = None

        self.name = name
        self.affinity = Affinity(affinity, affinity_limits)
        self.priority = priority
        self.demand = np.array(demand, dtype=float)
        self.data_retention_timeout = data_retention_timeout
        self.lease = lease
        self.identity_group = identity_group
        self.identity = identity
        self.identity_group_ref = None
        self.schedule_once = schedule_once
        self.evicted = False
        self.placement_expiry = None
        self.renew = False

    # FIXME: What dictates order? heapq.merge in utilization_queue needs this
    # comparison.
Project: treadmill    Author: Morgan-Stanley
def utilization_queue(self, free_capacity, visitor=None):
        """Returns utilization queue including the sub-allocs.

        All app queues from self and sub-allocs are merged in standard order,
        and then utilization is recalculated based on total reserved capacity
        of this alloc and sub-allocs combined.

        The function maintains the invariant that any app (in self or in a
        sub-alloc) with utilization < 1 will remain at utilization < 1.
        """
        total_reserved = self.total_reserved()
        queues = [alloc.utilization_queue(free_capacity, visitor)
                  for alloc in self.sub_allocations.values()]

        queues.append(self.priv_utilization_queue())

        acc_demand = zero_capacity()
        available = total_reserved + free_capacity + np.finfo(float).eps

        # FIXME: heapq.merge has an overhead of comparison
        for item in heapq.merge(*queues):
            rank, _util, pending, order, app = item
            acc_demand = acc_demand + app.demand
            util = utilization(acc_demand, total_reserved, available)
            if app.priority == 0:
                util = _MAX_UTILIZATION
            # - lower rank allocations take precedence.
            # - for same rank, utilization takes precedence
            # - False < True, so for apps with same utilization we prefer
            #   those that already running (False == not pending)
            # - Global order
            entry = (rank, util, pending, order, app)
            if visitor:
                visitor(self, entry)
            yield entry
Project: CodeWars    Author: smiks
def hamming_numbers():
    last = 1
    yield last
    a,b,c = tee(hamming_numbers(), 3)
    for n in merge((2*i for i in a), (3*i for i in b), (5*i for i in c)):
        if n != last:
            yield n
            last = n
Project: CAPE    Author: ctxis
def batch_sort(input_iterator, output_path, buffer_size=32000, output_class=None):
    """batch sort helper with temporary files, supports sorting large stuff"""
    if not output_class:
        output_class = input_iterator.__class__

    chunks = []
    try:
        while True:
            current_chunk = list(islice(input_iterator, buffer_size))
            if not current_chunk:
                break
            current_chunk.sort()
            fd, filepath = tempfile.mkstemp()
            os.close(fd)
            output_chunk = output_class(filepath)
            chunks.append(output_chunk)

            for elem in current_chunk:
                output_chunk.write(elem.obj)
            output_chunk.close()

        output_file = output_class(output_path)
        for elem in heapq.merge(*chunks):
            output_file.write(elem.obj)
        # The for/else above had no break, so the trailing no-argument
        # write() always ran; make that explicit (presumably it lets the
        # output class finalize its stream).
        output_file.write()
        output_file.close()
    finally:
        for chunk in chunks:
            try:
                chunk.close()
                os.remove(chunk.name)
            except Exception:
                pass


# magic
Project: QualquerMerdaAPI    Author: tiagovizoto
def imerge(iterables):
        return merge(*iterables)
Project: QualquerMerdaAPI    Author: tiagovizoto
def sort(items, maxsize=100000, tempdir=None, maxfiles=128):
    """Sorts the given items using an external merge sort.

    :param tempdir: the path of a directory to use for temporary file
        storage. The default is to use the system's temp directory.
    :param maxsize: the maximum number of items to keep in memory at once.
    :param maxfiles: maximum number of files to open at once.
    """

    p = SortingPool(maxsize=maxsize, tempdir=tempdir)
    for item in items:
        p.add(item)
    return p.items(maxfiles=maxfiles)
Project: Machine_Learning_Playground    Author: yao23
def nthSuperUglyNumber(self, n, primes):
        """
        :type n: int
        :type primes: List[int]
        :rtype: int
        """
        uglies = [1]
        merged = heapq.merge(*map(lambda p: (u*p for u in uglies), primes))
        uniqed = (u for u, _ in itertools.groupby(merged))
        # NOTE: relies on Python 2's eager map(); under Python 3 this map()
        # is never consumed, so use an explicit loop instead.
        map(uglies.append, itertools.islice(uniqed, n-1))
        return uglies[-1]

    # beats 85.23%
Project: leetcode    Author: Dimen61
def getNewsFeed(self, userId):
        """
        Retrieve the 10 most recent tweet ids in the user's news feed. Each item in the news feed must be posted by users who the user followed or by the user herself. Tweets must be ordered from most recent to least recent.
        :type userId: int
        :rtype: List[int]
        """
        tweets = heapq.merge(*(self.tweets[user]
                               for user in self.followees[userId] | {userId}))
        return [t for _, t in itertools.islice(tweets, 10)]
Project: lazy_sort    Author: ondergetekende
def merge_sort(unsorted, start=None, end=None):
    if start is None:
        start = 0
        end = len(unsorted)

    midpoint = (start + end) // 2

    if end - start == 1:
        yield unsorted[start]
    elif end - start > 1:
        yield from heapq.merge(
            merge_sort(unsorted, start, midpoint),
            merge_sort(unsorted, midpoint, end))
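
The recursion splits down to single-element slices and heapq.merge() lazily zips the halves back together; for example:

print(list(merge_sort([5, 1, 4, 2, 3])))   # [1, 2, 3, 4, 5]

Because every level is a generator, consuming only the first few items does not pay for a full sort.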
Project: Hawkeye    Author: tozhengxq
def imerge(iterables):
        return merge(*iterables)
Project: rca-evaluation    Author: sieve-microservices
def dump_app(app_args, path, begin, now, container_image_pattern = ""):

    # the app_args argument is supplied in the format 
    #   <app name>:<additional influxdb table 1>:<additional influxdb table 2>:...
    app_args = app_args.split(":")
    # get the tag (keys and values) and fields from the docker measurements and 
    # any additional tables
    app = dump_column_names(app_args)

    # build queries
    queries = []
    # always extract docker metrics (here referred to as 'system metrics')
    for system in SYSTEM_METRICS:
        pattern = CONTAINER_IMAGE_PATTERNS[app.name].format(container_image_pattern)
        q = """select * from "docker_container_{}" where
                container_name =~ /{}/
                and container_image =~ /{}/
                and time > '%s' and time < '%s'
            """.format(system, app.name, pattern)
        queries.append(scroll(q, begin, now))

    if len(app_args) > 1:
        for app_arg in app_args[1:]:
            q = "select * from \"{}\" where time > '%s' and time < '%s'".format(app_arg)
            print(q)
            queries.append(scroll(q, begin, now, prefix = app.name))

    path = os.path.join(path, app.filename)

    with gzip.open(path, "wb") as f:
        columns = app.fields + app.tags + ["time"]
        writer = csv.DictWriter(f, fieldnames=columns, dialect=csv.excel_tab, extrasaction='ignore')
        writer.writeheader()
        for _, row in heapq.merge(*queries):
            writer.writerow(row)
    return app

# in general, all apps have docker metrics for them, so we retrieve the 
# metrics stored in 'docker_container_*' tables by default. we save these 
# metrics under an app name equal to the first string in a sequence of strings 
# separated by a ':'. app-specific metrics for such app names are gathered from 
# the tables specified in subsequent strings.
Project: whatstyle    Author: mikr
def run_executables(execalls, cache=None, ccmode=CC_PROCESSES):
    # type: (List[ExeCall], Optional[Cache], str) -> Iterator[ExeResult]
    """Run executables in parallel.

    Some of the results for the execalls may be found in the cache,
    so we put those aside in cachedresults.

    Each result is yielded as soon as it is available.
    """

    def c2exeresult(value):
        # type: (bytes) -> ExeResult
        returncode, stdout, stderr = unpack_exeresult(value)
        return make_exeresult(returncode, stdout, stderr)

    def exeresult2c(exeresult):
        # type: (ExeResult) -> bytes
        return pack_exeresult(exeresult.returncode, exeresult.stdout, exeresult.stderr)

    # Package the execalls for eventual multiprocessing
    args_lists = [((ec.exe, ec.cmdargs), {'stdindata': ec.stdindata})
                  for ec in execalls]  # type: List[CallArgs]

    cachedresults = []
    jobs = []  # type: List[CallArgs]
    keys = []  # type: List[str]
    jobindices = []  # type: List[int]
    if cache is not None:
        qkeys = [execall_hash(ec, cache) for ec in execalls]
        qresults = cache.mget(qkeys)

        for idx, (arg, key, cvalue) in enumerate(izip(args_lists, qkeys, qresults)):
            if cvalue is not None:
                cachedresults.append((idx, c2exeresult(cvalue)))
            else:
                keys.append(key)
                jobs.append(arg)
                jobindices.append(idx)
    else:
        jobs = args_lists
        jobindices = list(range(len(jobs)))

    jobiter = iter_parallel(call_popen, jobs, ccmode=ccmode)

    def jobresultiter():
        # type: () -> Iterator[Tuple[int, ExeResult]]
        for idx, (jobidx, job, result) in enumerate(izip(jobindices, jobs, jobiter)):
            if cache is not None:
                cache.set(keys[idx], exeresult2c(result))
            (executable, cmdargs), kwargs = job
            log_popen(executable, cmdargs, kwargs['stdindata'], result)
            yield jobidx, result

    for idx, result in heapq.merge(iter(cachedresults), jobresultiter()):
        yield result
Project: treadmill    Author: Morgan-Stanley
def _sow(self, watch, pattern, since, handler, impl):
        """Publish state of the world."""
        if since is None:
            since = 0

        def _publish(item):
            when, path, content = item
            try:
                payload = impl.on_event(str(path), None, content)
                if payload is not None:
                    payload['when'] = when
                    handler.write_message(payload)
            except Exception as err:  # pylint: disable=W0703
                handler.send_error_msg(str(err))

        db_connections = []
        fs_records = self._get_fs_sow(watch, pattern, since)

        sow = getattr(impl, 'sow', None)
        sow_table = getattr(impl, 'sow_table', 'sow')
        try:
            records = []
            if sow:
                dbs = sorted(glob.glob(os.path.join(self.root, sow, '*')))
                for db in dbs:
                    if os.path.basename(db).startswith('.'):
                        continue

                    conn, db_cursor = self._db_records(
                        db, sow_table, watch, pattern, since
                    )
                    records.append(db_cursor)
                    # FIXME: Figure out pylint use before assign
                    db_connections.append(conn)  # pylint: disable=E0601

            records.append(fs_records)
            # Merge db and fs records, removing duplicates.
            prev_path = None

            for item in heapq.merge(*records):
                _when, path, _content = item
                if path == prev_path:
                    continue
                prev_path = path
                _publish(item)
        finally:
            for conn in db_connections:
                if conn:
                    conn.close()