Python twisted.internet.task 模块,coiterate() 实例源码

我们从 Python 开源项目中提取了以下 2 个代码示例,用于说明如何使用 twisted.internet.task.coiterate()。

项目:txamqp    作者:txamqp    | 项目源码 | 文件源码
def gotConnection(conn, username, password, body, count=1):
    """Authenticate, publish `count` numbered messages plus "STOP", then shut down.

    This is a generator that yields the results of the connection/channel
    calls it makes; it is presumably driven by a decorator such as
    `inlineCallbacks` that is not visible here -- confirm at the definition
    site.

    :param conn: Connection object providing `authenticate()` and `channel()`.
    :param username: Passed to `conn.authenticate`.
    :param password: Passed to `conn.authenticate`.
    :param body: Message prefix; each message is ``body + "-<index>"``.
    :param count: How many numbered messages to publish before "STOP".
    """
    print("Connected to broker.")
    yield conn.authenticate(username, password)

    print("Authenticated. Ready to send messages")
    chan = yield conn.channel(1)
    yield chan.channel_open()

    def publish(text):
        # Build the message, publish it to the chat exchange, then log it.
        # The value 2 for "delivery mode" presumably means persistent
        # delivery (AMQP convention) -- confirm against the broker spec.
        msg = Content(text)
        msg["delivery mode"] = 2
        chan.basic_publish(exchange="chatservice", content=msg, routing_key="txamqp_chatroom")
        print("Sending message: %s" % text)

    def numbered_messages():
        # One publish per step; the bare yield hands control back to
        # whatever is iterating (here, task.coiterate) between messages.
        for index in range(count):
            publish(body + "-%d" % index)
            yield None

    # Wait for every numbered message to be published before continuing.
    yield task.coiterate(numbered_messages())

    # Publish the literal "STOP" message after all the numbered ones.
    publish("STOP")

    yield chan.channel_close()

    # Channel 0 is used here to close the connection itself.
    chan0 = yield conn.channel(0)
    yield chan0.connection_close()

    reactor.stop()
项目:maas    作者:maas    | 项目源码 | 文件源码
def _perform_action_on_nodes(
        self, system_ids, action_class, concurrency=2):
    """Apply one node action to every identified node, asynchronously.

    `concurrency` co-iterators all pull from a single lazy stream of tasks,
    so a task is only started when one of them asks for the next item.

    :param system_ids: An iterable of `Node.system_id` values.
    :param action_class: A value from `ACTIONS_DICT`.
    :param concurrency: The number of co-iterators drawing from the task
        stream, i.e. how many actions run concurrently.

    :return: The `DeferredList` tracking the co-iterators, with a callback
        attached that replaces its result with a `dict` mapping
        `system_id` to that node's outcome. The outcome is whatever
        `_perform_action_on_node` produced (a string such as "done" or
        "not_actionable"), or a `Failure` if something went wrong.
    """
    # Everything except the system_id is the same for each call, so bind
    # the shared arguments once.
    run_action = partial(
        deferToDatabase, self._perform_action_on_node,
        action_class=action_class)

    # system_id -> outcome; filled in as each task completes.
    outcomes = {}

    def remember(outcome, system_id):
        # Attached with addBoth, so this sees successes and failures alike.
        outcomes[system_id] = outcome

    # Obtain the iterator up front so a non-iterable `system_ids` fails
    # here rather than inside a co-iterator.
    id_source = iter(system_ids)

    def lazy_tasks():
        # Deliberately lazy: each task is created only at the moment a
        # co-iterator requests it, never ahead of time.
        for system_id in id_source:
            yield run_action(system_id).addBoth(remember, system_id)

    # One shared generator; every co-iterator draws its work from it.
    shared_tasks = lazy_tasks()
    workers = (coiterate(shared_tasks) for _ in range(concurrency))

    # Fires once all the co-iterators have finished.
    finished = DeferredList(workers, consumeErrors=True)

    def hand_back_outcomes(_):
        # The DeferredList's own result is not interesting; return ours.
        return outcomes

    return finished.addCallback(hand_back_outcomes)