我们从 Python 开源项目中提取了以下 2 个代码示例,用于说明如何使用 twisted.internet.task.coiterate()。
def gotConnection(conn, username, password, body, count=1):
    """Authenticate, publish ``count`` numbered messages plus a stop token, then shut down.

    This is a generator: every ``yield`` hands back whatever the connection
    or channel call returned, and the value sent back in is used as that
    call's result.
    NOTE(review): presumably driven by ``inlineCallbacks`` -- the decorator
    is not visible in this excerpt; confirm at the definition site.

    :param conn: Connection object providing ``authenticate`` and ``channel``.
    :param username: Passed to ``conn.authenticate``.
    :param password: Passed to ``conn.authenticate``.
    :param body: Message prefix; each message is ``body`` followed by
        ``"-<index>"``.
    :param count: How many numbered messages to publish before the stop
        token.
    """
    print("Connected to broker.")
    yield conn.authenticate(username, password)

    print("Authenticated. Ready to send messages")
    channel = yield conn.channel(1)
    yield channel.channel_open()

    def publish(text):
        # Shared by the numbered messages and the final stop token: wrap the
        # text, set its delivery mode, publish it, then log it.
        message = Content(text)
        message["delivery mode"] = 2
        channel.basic_publish(
            exchange="chatservice",
            content=message,
            routing_key="txamqp_chatroom",
        )
        print("Sending message: %s" % text)

    def numbered_messages():
        # Yielding after every publish makes this an iterator of single
        # steps, which is what coiterate() consumes.
        for index in range(count):
            publish(body + "-%d" % index)
            yield None

    # Wait until the co-iterator has worked through every message.
    yield task.coiterate(numbered_messages())

    # The stop token is sent only after all numbered messages.
    publish("STOP")

    yield channel.channel_close()

    # Channel 0 is used to close the connection itself.
    control_channel = yield conn.channel(0)
    yield control_channel.connection_close()

    reactor.stop()
def _perform_action_on_nodes(
        self, system_ids, action_class, concurrency=2):
    """Run one node action against each of the given nodes.

    This is *asynchronous*: a Deferred is returned, not the results.

    :param system_ids: An iterable of `Node.system_id` values.
    :param action_class: A value from `ACTIONS_DICT`.
    :param concurrency: How many co-iterators draw from the shared work
        queue, i.e. the number of actions in flight at once.
    :return: A Deferred that fires with a `dict` mapping each `system_id`
        to its outcome: either the value produced by
        `_perform_action_on_node`, or a `Failure` if the call failed.
    """
    # `system_id` -> outcome, filled in as each action completes.
    outcomes = {}

    # Bound once up front; every task calls the same method.
    perform_on_node = self._perform_action_on_node

    def remember(outcome, system_id):
        # Used with addBoth, so `outcome` may be a result or a Failure.
        outcomes[system_id] = outcome

    def start_action(system_id):
        # Kick off the action for one node and make it record its outcome,
        # whether it succeeds or fails.
        running = deferToDatabase(
            perform_on_node, system_id, action_class=action_class)
        return running.addBoth(remember, system_id)

    # Deliberately a generator, not a list: an action must only be started
    # at the moment a co-iterator asks for the next item.
    pending = (start_action(system_id) for system_id in system_ids)

    # All `concurrency` co-iterators consume the *same* generator, so each
    # node is handled exactly once. The DeferredList fires once every
    # co-iterator has run dry.
    finished = DeferredList(
        (coiterate(pending) for _ in range(concurrency)),
        consumeErrors=True)

    def hand_back_outcomes(_ignored):
        # The DeferredList's own result is irrelevant; callers only want
        # the mapping.
        return outcomes

    finished.addCallback(hand_back_outcomes)
    return finished