我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 zmq.asyncio 模块。
def test_poll(self):
    """Exercise ``poll`` on an asyncio socket of a PUSH/PULL pair.

    Three polls are issued on the PULL side: one with ``timeout=0``, one
    with a short timeout and nothing to read, and one with a long timeout
    that is satisfied by a message sent from the PUSH side.
    """
    # Native coroutine: ``asyncio.coroutine`` no longer exists on
    # Python 3.11+, so the generator-based form cannot be used there.
    async def test():
        a, b = self.create_bound_pair(zmq.PUSH, zmq.PULL)

        # Zero timeout: the test expects the future to be finished after
        # yielding to the loop once, with no events reported.
        f = b.poll(timeout=0)
        await asyncio.sleep(0)
        self.assertEqual(f.result(), 0)

        # Short timeout, nothing sent: still pending right after the call,
        # then resolves to 0.
        f = b.poll(timeout=1)
        assert not f.done()
        evt = await f
        self.assertEqual(evt, 0)

        # Long timeout: a message sent meanwhile should wake the poll.
        f = b.poll(timeout=1000)
        assert not f.done()
        await a.send_multipart([b'hi', b'there'])
        evt = await f
        self.assertEqual(evt, zmq.POLLIN)
        recvd = await b.recv_multipart()
        self.assertEqual(recvd, [b'hi', b'there'])

    self.loop.run_until_complete(test())
def can_connect(self, server, client):
    """Check if client can connect to server using tcp transport.

    The server is bound to a random port on 127.0.0.1, the client connects
    to it, and one message is sent from server to client.

    :param server: socket to bind and send from
    :param client: socket to connect and receive on
    :returns: True if the client received the message within the
        ``poll(1000)`` call, False otherwise
    """
    # Native coroutine: ``asyncio.coroutine`` no longer exists on
    # Python 3.11+.
    async def go():
        result = False
        iface = 'tcp://127.0.0.1'
        port = server.bind_to_random_port(iface)
        client.connect("%s:%i" % (iface, port))
        msg = [b"Hello World"]
        await server.send_multipart(msg)
        if await client.poll(1000):
            rcvd_msg = await client.recv_multipart()
            self.assertEqual(rcvd_msg, msg)
            result = True
        return result

    return self.loop.run_until_complete(go())
def run(self):
    """Set up the ZMQ event loop and sockets, then run until the loop stops.

    A ROUTER socket is bound on ``self.status_port`` and a PUB socket on
    ``self.data_port``. The status socket is registered with a poller and
    ``self.poll_sockets()`` is scheduled on the loop before it is run.
    Both sockets and the context are released when the loop exits or when
    any setup step after socket creation fails.
    """
    self._loop = zmq.asyncio.ZMQEventLoop()
    asyncio.set_event_loop(self._loop)
    self.context = zmq.asyncio.Context()
    self.status_sock = self.context.socket(zmq.ROUTER)
    self.data_sock = self.context.socket(zmq.PUB)
    try:
        # Binding happens inside the try block so that a failed bind
        # (e.g. port already in use) does not leak the sockets/context.
        self.status_sock.bind("tcp://*:%s" % self.status_port)
        self.data_sock.bind("tcp://*:%s" % self.data_port)
        self.poller = zmq.asyncio.Poller()
        self.poller.register(self.status_sock, zmq.POLLIN)
        self._loop.create_task(self.poll_sockets())
        self._loop.run_forever()
    finally:
        self.status_sock.close()
        self.data_sock.close()
        self.context.destroy()
def __init__(self, loop, logger, config):
    """Connect a SUB socket to the configured endpoint and schedule tasks.

    :param loop: event loop on which the start-up tasks are created
    :param logger: logger stored as ``self.log``
    :param config: mapping read for ``["BITCOIND"]["zeromq"]`` (socket
        URL) and ``["MYSQL"]``
    """
    print("test")
    self.loop = loop
    self.log = logger
    self.config = config
    self.zmq_url = config["BITCOIND"]["zeromq"]

    zmq_context = zmq.asyncio.Context()
    self.zmqContext = zmq_context
    subscriber = zmq_context.socket(zmq.SUB)
    self.zmqSubSocket = subscriber
    self.MYSQL_CONFIG = config["MYSQL"]

    # Only "hashblock" is subscribed; "hashtx", "rawblock" and "rawtx"
    # were present but disabled in the original snippet.
    subscriber.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
    subscriber.connect(self.zmq_url)
    print(self.zmq_url)

    # Start-up tasks (a ``mysqltest`` task was present but disabled).
    loop_tasks = self.loop
    loop_tasks.create_task(self.init_db())
    loop_tasks.create_task(self.handle())
    loop_tasks.create_task(self.rpctest())
async def handle(self):
    """Receive one multipart message, report it, and re-schedule itself.

    The first frame is treated as the topic and the second as the body.
    If the last frame is exactly 4 bytes long it is unpacked as a
    little-endian unsigned int and shown as the sequence number;
    otherwise the sequence is reported as "Unknown".

    ``hashblock``, ``hashtx`` and ``rawblock`` messages are printed as hex;
    ``rawtx`` bodies are handed to ``self.handle_tx`` as a new task.
    """
    msg = await self.zmqSubSocket.recv_multipart()
    topic = msg[0]
    body = msg[1]

    sequence = "Unknown"
    if len(msg[-1]) == 4:
        sequence = str(struct.unpack('<I', msg[-1])[-1])

    # Topics that are only printed, mapped to their banner text.
    banners = {
        b"hashblock": 'HASH BLOCK',
        b"hashtx": 'HASH TX',
        b"rawblock": 'RAW BLOCK HEADER',
    }
    if topic in banners:
        print('- ' + banners[topic] + ' (' + sequence + ') -')
        print(binascii.hexlify(body))
    elif topic == b"rawtx":
        self.log.debug("new tx")
        self.loop.create_task(self.handle_tx(body))

    # schedule ourselves to receive the next message
    asyncio.ensure_future(self.handle())
def __init__(self, bind_address, linger=-1, poll_timeout=2, loop=None):
    """Create a ROUTER socket, register it for polling, and bind it.

    :param bind_address: address the ROUTER socket is bound to
    :param linger: value applied to the socket's ``zmq.LINGER`` option
    :param poll_timeout: stored as ``self.poll_timeout``
    :param loop: stored as ``self.loop``
    """
    self.bind_address = bind_address
    self.loop = loop
    self.context = zmq.asyncio.Context()
    self.poll_timeout = poll_timeout

    self.socket = self.context.socket(zmq.ROUTER)
    self.socket.setsockopt(zmq.LINGER, linger)
    self.in_poller = zmq.asyncio.Poller()
    self.in_poller.register(self.socket, zmq.POLLIN)

    # Bind first, log second: the message claims the bind already
    # happened, so it must not be emitted if bind() raises.
    self.socket.bind(self.bind_address)
    log.info('Bound to: ' + self.bind_address)
    self._kill = False
async def run():
    """Serve requests on a REP socket bound to ``Url`` until interrupted.

    Each request is printed, followed by a one second pause, and then
    answered with the request text plus ``", world"``. The socket is
    closed when the coroutine exits (cancellation or error).
    """
    print("Getting ready for hello world client. Ctrl-C to exit.\n")
    socket = Ctx.socket(zmq.REP)
    try:
        socket.bind(Url)
        while True:
            # Wait for next request from client
            message = await socket.recv()
            print("Received request: {}".format(message))
            # Do some "work"
            await asyncio.sleep(1)
            # Send reply back to client
            message = message.decode('utf-8')
            message = '{}, world'.format(message)
            message = message.encode('utf-8')
            print("Sending reply: {}".format(message))
            await socket.send(message)
    finally:
        socket.close()
def start(self, slave_addr, task):
    """Remember ``task`` and launch a worker process for it.

    A new ``multiprocessing.Process`` running ``_worker_main`` is started
    with this worker's id, ``slave_addr`` and ``task`` as arguments.

    :param slave_addr: passed through to ``_worker_main``
    :param task: stored as ``self._task`` and passed to ``_worker_main``
    """
    self._task = task

    def _start(worker_id, slave_addr, task):
        # Parameter renamed from ``id`` to avoid shadowing the builtin;
        # the helper is only called positionally.
        from multiprocessing import Process
        Process(target=_worker_main,
                args=(worker_id, slave_addr, task)).start()

    print("[Worker {0}] Create".format(self.id))
    _start(self.id, slave_addr, task)
    print("***")
def _monitor_disconnects(self):
    """Monitors the client socket for disconnects.

    Waits for one message on the monitor socket, then tears the connection
    down: the monitor is disabled and closed, the main socket is
    disconnected, every outstanding future is resolved with a
    ``FutureError``, all tasks on the event loop are cancelled, the loop is
    stopped and the send/receive queues are dropped.

    This is a generator-based coroutine (it uses ``yield from``).
    """
    yield from self._monitor_sock.recv_multipart()
    self._sock.disable_monitor()
    self._monitor_sock.disconnect(self._monitor_fd)
    self._monitor_sock.close(linger=0)
    self._monitor_sock = None
    self._sock.disconnect(self._url)
    self._ready_event.clear()
    LOGGER.debug("monitor socket received disconnect event")

    for future in self._futures.future_values():
        future.set_result(FutureError())

    # ``asyncio.Task.all_tasks`` was removed in Python 3.9; prefer the
    # module-level function and fall back for older interpreters.
    all_tasks = getattr(asyncio, "all_tasks", None) or asyncio.Task.all_tasks
    for task in list(all_tasks(self._event_loop)):
        task.cancel()

    self._event_loop.stop()
    self._send_queue = None
    self._recv_queue = None
def put_message(self, message):
    """Hand ``message`` to the event loop for sending.

    Nothing happens unless the ready event is set. Otherwise the call
    blocks on the condition until both the event loop and the send queue
    exist, then schedules ``self._put_message(message)`` on that loop.

    :param message: protobuf generated validator_pb2.Message
    """
    if not self._ready_event.is_set():
        return

    def _loop_and_queue_available():
        return (self._event_loop is not None
                and self._send_queue is not None)

    with self._condition:
        self._condition.wait_for(_loop_and_queue_available)

    pending_put = self._put_message(message)
    asyncio.run_coroutine_threadsafe(pending_put, self._event_loop)
async def _send(self, ident, message):
    """(asyncio coroutine) Send ``message`` to the peer identified by ``ident``.

    The message is serialized and sent as a two-frame multipart message:
    ``[ident, serialized_message]``.

    :param ident (str) the identity of the zmq.DEALER to send to
    :param message (sawtooth_sdk.protobuf.Message)
    :returns: whatever the socket's ``send_multipart`` call resolves to
    """
    LOGGER.debug(
        "Sending %s(%s) to %s",
        str(to_protobuf_class(message.message_type).__name__),
        str(message.message_type),
        str(ident)
    )
    return await self._socket.send_multipart([
        ident,
        message.SerializeToString()
    ])
async def start(self):
    """Starts receiving messages on the underlying socket and passes them
    to the message router.

    The loop runs while ``self._is_running`` is true. Messages that fail
    to decode are logged and skipped; a ``zmq.ZMQError`` is logged and
    ends the coroutine; cancellation clears ``self._is_running`` so the
    loop exits.
    """
    self._is_running = True
    while self._is_running:
        try:
            zmq_msg = await self._socket.recv_multipart()
            message = Message()
            # Only the last frame carries the serialized message.
            message.ParseFromString(zmq_msg[-1])
            await self._msg_router.route_msg(message)
        except DecodeError as e:
            LOGGER.warning('Unable to decode: %s', e)
        except zmq.ZMQError as e:
            LOGGER.warning('Unable to receive: %s', e)
            return
        except asyncio.CancelledError:
            self._is_running = False
async def send(self, message_type, message_content, timeout=None):
    """Send a message and wait for the reply with the same correlation id.

    A fresh correlation id is generated and registered with the message
    router before the message is sent, so the reply can be matched.

    :param message_type: value for the message's ``message_type`` field
    :param message_content: value for the message's ``content`` field
    :param timeout: forwarded to the router's ``await_reply``
    :returns: the result of ``await_reply`` for this correlation id
    """
    correlation_id = uuid.uuid4().hex
    self._msg_router.expect_reply(correlation_id)

    message = Message(
        correlation_id=correlation_id,
        content=message_content,
        message_type=message_type)

    # Cancellation propagates naturally; the former
    # ``except CancelledError: raise`` wrapper was a no-op.
    await self._socket.send_multipart([message.SerializeToString()])

    return await self._msg_router.await_reply(correlation_id, timeout=timeout)
def _receive_message(self):
    """Internal coroutine for receiving messages.

    Runs forever. On a ROUTER socket each message is read as an
    ``(identity, payload)`` pair, the identity is reported through
    ``_received_from_identity`` and the pair is queued for the dispatcher.
    On any other socket type a single frame is read, the last-message
    timestamp is refreshed and ``(None, payload)`` is queued.
    """
    while True:
        try:
            socket_type = self._socket.getsockopt(zmq.TYPE)
            if socket_type != zmq.ROUTER:
                payload = yield from self._socket.recv()
                self._last_message_time = time.time()
                self._dispatcher_queue.put_nowait((None, payload))
                continue

            sender, payload = yield from self._socket.recv_multipart()
            self._received_from_identity(sender)
            self._dispatcher_queue.put_nowait((sender, payload))
        except CancelledError:
            # The concurrent.futures.CancelledError is caught by asyncio
            # when the Task associated with the coroutine is cancelled.
            # The raise is required to stop this component.
            raise
        except Exception as e:  # pylint: disable=broad-except
            LOGGER.exception("Received a message on address %s that "
                             "caused an error: %s", self._address, e)
def __init__(self, context=None, loop=None):
    """Initialise the base class and remember which event loop to use.

    :param context: passed straight to the parent initialiser
    :param loop: event loop to use; when falsy, the loop returned by
        ``asyncio.get_event_loop()`` is used instead
    """
    super().__init__(context)
    if not loop:
        loop = asyncio.get_event_loop()
    self.loop = loop
    # Both are filled in later, outside this initialiser.
    self.__poller = self.__task = None
def start(self):
    """Start ZAP authentication.

    After the parent ``start()`` runs, the ZAP socket is registered with
    a new poller and the ZAP handler coroutine is scheduled as a task.
    """
    super().start()
    self.__poller = Poller()
    self.__poller.register(self.zap_socket, zmq.POLLIN)
    # ``asyncio.async`` cannot even be parsed on Python 3.7+ (``async`` is
    # a keyword); ``ensure_future`` is its direct replacement.
    self.__task = asyncio.ensure_future(self.__handle_zap())
def setUp(self):
    """Install a fresh ZMQ event loop before each test.

    The test is skipped when ``asyncio`` is unavailable (``None``).
    """
    if asyncio is None:
        raise SkipTest()
    self.loop = fresh_loop = zaio.ZMQEventLoop()
    asyncio.set_event_loop(fresh_loop)
    super(TestAsyncIOSocket, self).setUp()
def test_recv_multipart(self):
    """``recv_multipart`` returns a pending future that resolves to the frames."""
    # Native coroutine: ``asyncio.coroutine`` no longer exists on
    # Python 3.11+.
    async def test():
        a, b = self.create_bound_pair(zmq.PUSH, zmq.PULL)
        f = b.recv_multipart()
        # Nothing has been sent yet, so the receive must still be pending.
        assert not f.done()
        await a.send(b'hi')
        recvd = await f
        self.assertEqual(recvd, [b'hi'])

    self.loop.run_until_complete(test())
def test_recv(self):
    """Two queued ``recv`` calls receive the two sent frames in order."""
    # Native coroutine: ``asyncio.coroutine`` no longer exists on
    # Python 3.11+.
    async def test():
        a, b = self.create_bound_pair(zmq.PUSH, zmq.PULL)
        f1 = b.recv()
        f2 = b.recv()
        assert not f1.done()
        assert not f2.done()
        await a.send_multipart([b'hi', b'there'])
        # Waiting on the second receive implies the first one finished.
        recvd = await f2
        assert f1.done()
        self.assertEqual(f1.result(), b'hi')
        self.assertEqual(recvd, b'there')

    self.loop.run_until_complete(test())
def test_aiohttp(self):
    """Run an aiohttp server and zmq sockets on the same event loop.

    A small HTTP echo server is started on 127.0.0.1:8080; the client
    fetches ``/``, pushes the response text through a PUSH/PULL pair and
    checks that it arrives unchanged. Skipped when aiohttp is missing.
    """
    try:
        import aiohttp
    except ImportError:
        raise SkipTest("Requires aiohttp")
    from aiohttp import web

    zmq.asyncio.install()

    # Native coroutines: ``asyncio.coroutine`` no longer exists on
    # Python 3.11+.
    async def echo(request):
        print(request.path)
        return web.Response(body=str(request).encode('utf8'))

    async def server(loop):
        app = web.Application(loop=loop)
        app.router.add_route('GET', '/', echo)

        srv = await loop.create_server(app.make_handler(),
                                       '127.0.0.1', 8080)
        print("Server started at http://127.0.0.1:8080")
        return srv

    async def client():
        push, pull = self.create_bound_pair(zmq.PUSH, zmq.PULL)

        res = await aiohttp.request('GET', 'http://127.0.0.1:8080/')
        text = await res.text()
        await push.send(text.encode('utf8'))
        rcvd = await pull.recv()
        self.assertEqual(rcvd.decode('utf8'), text)

    loop = asyncio.get_event_loop()
    srv = loop.run_until_complete(server(loop))
    print("servered")
    try:
        loop.run_until_complete(client())
    finally:
        # Release the listening socket so port 8080 is free for later
        # tests even when the client part fails.
        srv.close()
        loop.run_until_complete(srv.wait_closed())
def shortDescription(self):
    """Return the first docstring line with 'threaded auth' renamed.

    Doc strings inherited from TestThreadAuthentication that start with
    'threaded auth' are rewritten to say 'asyncio auth'. A missing or
    empty docstring is returned unchanged.
    """
    description = self._testMethodDoc
    if not description:
        return description

    first_line = description.split("\n")[0].strip()
    if not first_line.startswith('threaded auth'):
        return first_line
    return first_line.replace('threaded auth', 'asyncio auth')
def setUp(self):
    """Install a fresh ZMQ event loop before each test.

    The test is skipped when ``asyncio`` is unavailable (``None``).
    """
    if asyncio is None:
        raise SkipTest()
    self.loop = fresh_loop = zaio.ZMQEventLoop()
    asyncio.set_event_loop(fresh_loop)
    super().setUp()
async def poll_sockets(self):
    """Answer status requests until ``self.stopped`` becomes true.

    Each iteration polls with a timeout argument of 50. When the status
    socket is readable, one ``[ident, msg]`` message is read; a
    ``b"WHATSUP"`` request is answered with ``b"HI!"`` followed by the
    JSON-encoded ``self.plot_desc``. The loop then sleeps 10 ms.
    """
    while not self.stopped:
        evts = dict(await self.poller.poll(50))
        if evts.get(self.status_sock) == zmq.POLLIN:
            ident, msg = await self.status_sock.recv_multipart()
            if msg == b"WHATSUP":
                reply = json.dumps(self.plot_desc).encode('utf8')
                await self.status_sock.send_multipart([ident, b"HI!", reply])
        await asyncio.sleep(0.010)
def stop(self):
    """Send a final "done" message, then stop and close the event loop.

    After the loop is stopped (and a one second pause), every task that
    was registered on the loop is cancelled and run to completion so the
    cancellation is actually delivered before the loop is closed.
    """
    self.send("irrelevant", np.array([]), msg="done")
    self.stopped = True

    # ``asyncio.Task.all_tasks`` was removed in Python 3.9; prefer the
    # module-level function and fall back for older interpreters.
    all_tasks = getattr(asyncio, "all_tasks", None) or asyncio.Task.all_tasks
    pending = all_tasks(loop=self._loop)

    self._loop.stop()
    time.sleep(1)

    for task in pending:
        task.cancel()
        try:
            self._loop.run_until_complete(task)
        except asyncio.CancelledError:
            pass

    self._loop.close()
async def ping_loop(ctx, ping_interval, cycle_time, initial_ping_timeout,
                    ping_retries, backoff, loop, inventory_router_url):
    """Periodically schedule pings for entries in ``active_state``.

    Runs until the module-level ``stop_ping_loop`` flag is true. On each
    cycle, every entry whose last ping is older than ``ping_interval`` and
    that is not already being pinged is marked as pinging and a ``ping``
    task is scheduled for it.

    :param ctx: forwarded to ``ping``
    :param ping_interval: minimum age of ``last_ping`` (compared against
        ``time.time()`` differences) before a new ping is scheduled
    :param cycle_time: delay passed to ``asyncio.sleep`` between scans
    :param initial_ping_timeout: forwarded to ``ping``
    :param ping_retries: forwarded to ``ping``
    :param backoff: forwarded to ``ping``
    :param loop: event loop the ping tasks are scheduled on
    :param inventory_router_url: used to construct the ``InventoryClient``
    :return: None
    """
    # load the queue
    inventory_client = InventoryClient(inventory_router_url)

    while True:
        if stop_ping_loop:
            log.info('Stopping ping loop')
            break

        log.debug('Looking for work')
        now = time.time()
        # copy to list because the list length could change
        # out from under us
        for mercury_id, data in list(active_state.items()):
            if now - data['last_ping'] > ping_interval and not data['pinging']:
                log.debug('Scheduling ping for {}'.format(mercury_id))
                active_state[mercury_id]['pinging'] = True
                asyncio.ensure_future(
                    ping(data, ctx, initial_ping_timeout, ping_retries,
                         backoff, inventory_client),
                    loop=loop)

        await asyncio.sleep(cycle_time)
def get_ctx_and_connect_req_socket(zmq_url):
    """Build an asyncio ZMQ context and a REQ socket connected to ``zmq_url``.

    :param zmq_url: URL for the socket to connect to.
    :returns: A ``(context, socket)`` tuple.
    """
    context = zmq.asyncio.Context()
    # noinspection PyUnresolvedReferences
    req_socket = context.socket(zmq.REQ)

    log.debug('Connection to: {}'.format(zmq_url))
    req_socket.connect(zmq_url)

    return context, req_socket
def __init__(self, zmq_url, linger=-1, response_timeout=0):
    """Connect a REQ socket to ``zmq_url`` and register it for polling.

    :param zmq_url: endpoint the socket connects to
    :param linger: value applied to the socket's ``zmq.LINGER`` option
    :param response_timeout: stored as ``self.response_timeout``
    """
    self.zmq_url = zmq_url

    connection = get_ctx_and_connect_req_socket(self.zmq_url)
    self.ctx, self.socket = connection
    self.socket.setsockopt(zmq.LINGER, linger)

    # Register the socket for incoming-data events.
    self.poller = zmq.asyncio.Poller()
    self.poller.register(self.socket, flags=zmq.POLLIN)

    self.response_timeout = response_timeout
def main():
    """Run ``run()`` on a new ZMQ event loop; exit with 0 on Ctrl-C."""
    try:
        event_loop = ZMQEventLoop()
        asyncio.set_event_loop(event_loop)
        event_loop.run_until_complete(run())
    except KeyboardInterrupt:
        # An interrupt is treated as a normal shutdown.
        print('\nFinished (interrupted)')
        sys.exit(0)
def dispatch_msg(self, addr, header, body = b''):
    """Schedule a four-frame message on the router without waiting for it.

    The frames are ``[addr, header, b'', body]``; the send runs as a
    background task on the event loop.

    :param addr: first frame
    :param header: second frame
    :param body: last frame, empty by default
    """
    frames = [addr, header, b'', body]

    async def _send_frames(outgoing):
        await self._router.send_multipart(outgoing)

    asyncio.ensure_future(_send_frames(frames))
async def run_server():
    """Start both routers and block until a control message arrives.

    ``client_router.run()`` and ``slave_router.run()`` are scheduled as
    background tasks. The coroutine then waits on a ROUTER socket bound
    to ``CONTROL_ROUTER_ADDR`` and returns once any message is received.
    """
    asyncio.ensure_future(client_router.run())
    asyncio.ensure_future(slave_router.run())

    # terminate server if receive a control packet from control socket.
    control_router = context.socket(zmq.ROUTER)
    control_router.bind(CONTROL_ROUTER_ADDR)
    # The content of the control packet is not inspected.
    await control_router.recv_multipart()
def dispatch_msg(self, header, body = b''):
    """Schedule a three-frame message on the router without waiting for it.

    The frames are ``[header, b'', body]``. Progress is traced on stdout
    before and after the send.

    :param header: first frame
    :param body: last frame, empty by default
    """
    frames = [header, b'', body]

    async def _send_frames(outgoing):
        print("_dispatch_msg("+str(outgoing)+")")
        # NOTE(review): the original author observed that the peer did not
        # receive this message even though the send call completes.
        await self._router.send_multipart(outgoing)
        print("_dispatch_msg finish")

    asyncio.ensure_future(_send_frames(frames))
async def run_server():
    """Start the master connection and worker router, then wait for control.

    ``master_conn.run()`` and ``worker_router.run()`` are scheduled as
    background tasks. The coroutine then waits on a ROUTER socket bound
    to ``CONTROL_ROUTER_ADDR`` and returns once any message is received.
    """
    asyncio.ensure_future(master_conn.run())
    asyncio.ensure_future(worker_router.run())

    # terminate server if receive a control packet from control socket.
    control_router = context.socket(zmq.ROUTER)
    control_router.bind(CONTROL_ROUTER_ADDR)
    # The content of the control packet is not inspected.
    await control_router.recv_multipart()
def main(MASTER_ADDR, WORKER_ROUTER_ADDR, slave_addr, control_router_addr):
    """Initialise the module-level state and run the server.

    The addresses are published as module globals, then the context,
    master connection, worker router and worker manager are created and
    ``run_server()`` is run to completion on a new ZMQ event loop.
    Ctrl-C ends the process with exit status 0.

    :param MASTER_ADDR: address given to ``MasterConnection``
    :param WORKER_ROUTER_ADDR: address given to ``WorkerRouter``
    :param slave_addr: stored in the global ``SLAVE_ADDR``
    :param control_router_addr: stored in the global ``CONTROL_ROUTER_ADDR``
    """
    global context, master_conn, worker_router, worker_manager
    global SLAVE_ADDR, CONTROL_ROUTER_ADDR

    SLAVE_ADDR = slave_addr
    CONTROL_ROUTER_ADDR = control_router_addr

    try:
        event_loop = ZMQEventLoop()
        asyncio.set_event_loop(event_loop)

        context = Context()
        master_conn = MasterConnection(context, MASTER_ADDR)
        worker_router = WorkerRouter(context, WORKER_ROUTER_ADDR)
        worker_manager = WorkerManager()

        # (A process-pool default executor was present but disabled here.)
        event_loop.run_until_complete(run_server())
    except KeyboardInterrupt:
        print('\nFinished (interrupted)')
        sys.exit(0)
async def run(self):
    """Generate ``TaskSimulator.NUM_TASKS`` tasks with random pauses between.

    Each task is created with ``self._make_task()``, registered with
    ``task_manager`` and passed to ``self._process_task``. The pause after
    each task is a random whole number of seconds between
    ``TASK_GAP_MIN_SECONDS`` and ``TASK_GAP_MAX_SECONDS``.
    """
    for idx in range(TaskSimulator.NUM_TASKS):
        print("[*] Simulate Task #{0}".format(idx))
        task = self._make_task()
        task_manager.add_task(task)
        self._process_task(task)

        gap_seconds = randint(TaskSimulator.TASK_GAP_MIN_SECONDS,
                              TaskSimulator.TASK_GAP_MAX_SECONDS)
        await asyncio.sleep(gap_seconds)
def main():
    """Run ``run_server()`` on a new ZMQ event loop; exit with 0 on Ctrl-C."""
    try:
        event_loop = ZMQEventLoop()
        asyncio.set_event_loop(event_loop)
        event_loop.run_until_complete(run_server())
    except KeyboardInterrupt:
        # An interrupt is treated as a normal shutdown.
        print('\nFinished (interrupted)')
        sys.exit(0)
def _dispatch_msg_async(msg):
    """Schedule ``msg`` to be sent on the router without waiting for it.

    NOTE(review): ``self`` is not a parameter here; presumably this
    function is nested inside a method and closes over its ``self`` —
    confirm against the enclosing code.

    :param msg: frames passed to the router's ``send_multipart``
    """
    async def _send_frames(outgoing):
        await self._router.send_multipart(outgoing)

    asyncio.ensure_future(_send_frames(msg))
def main(client_router_addr, slave_router_addr):
    """Run the master on a new ZMQ event loop; exit with 0 on Ctrl-C.

    :param client_router_addr: forwarded to ``run_master``
    :param slave_router_addr: forwarded to ``run_master``
    """
    try:
        event_loop = ZMQEventLoop()
        asyncio.set_event_loop(event_loop)

        zmq_context = Context()
        master = run_master(zmq_context, client_router_addr, slave_router_addr)
        event_loop.run_until_complete(master)
    except KeyboardInterrupt:
        print('\nFinished (interrupted)')
        sys.exit(0)
async def run_worker(context: Context, slave_addr, serialized_data: bytes):
    """Run the slave connection and the deserialized task side by side.

    Declared as a coroutine and awaiting ``asyncio.wait`` so that callers
    can pass the result to ``loop.run_until_complete``; previously the
    wait object was created but neither awaited nor returned.

    :param context: ZMQ context given to ``SlaveConnection`` and ``do_task``
    :param slave_addr: address given to ``SlaveConnection``
    :param serialized_data: bytes decoded with ``TaskInformation.from_bytes``
    """
    slave_conn = SlaveConnection(context, slave_addr, SlaveMessageHandler())
    task_info = TaskInformation.from_bytes(serialized_data)

    await asyncio.wait([
        asyncio.ensure_future(slave_conn.run()),
        asyncio.ensure_future(do_task(context, task_info)),
    ])
def main(slave_addr, serialized_data : bytes):
    """Run the worker on a new ZMQ event loop; exit with 0 on Ctrl-C.

    :param slave_addr: forwarded to ``run_worker``
    :param serialized_data: forwarded to ``run_worker``
    """
    try:
        event_loop = ZMQEventLoop()
        asyncio.set_event_loop(event_loop)

        zmq_context = Context()
        worker = run_worker(zmq_context, slave_addr, serialized_data)
        event_loop.run_until_complete(worker)
    except KeyboardInterrupt:
        print('\nFinished (interrupted)')
        sys.exit(0)
async def _do_sleep_task(sleep_task: SleepTask):
    """Sleep, without blocking the loop, for ``sleep_task.job.seconds``.

    :param sleep_task: task whose ``job.seconds`` gives the delay
    """
    await asyncio.sleep(sleep_task.job.seconds)
async def run_master(context: Context, master_addr, worker_router_addr,
                     worker_file_name):
    """Run the master connection, worker router and worker polling together.

    Declared as a coroutine and awaiting ``asyncio.wait`` so that callers
    can pass the result to ``loop.run_until_complete``; previously the
    wait object was created but neither awaited nor returned.

    :param context: ZMQ context shared by both connections
    :param master_addr: address given to ``MasterConnection``
    :param worker_router_addr: address given to ``WorkerRouter``
    :param worker_file_name: argument given to ``WorkerCreator``
    """
    master_conn = MasterConnection(context, master_addr, MasterMessageHandler())
    worker_router = WorkerRouter(context, worker_router_addr, WorkerMessageHandler())
    # The instance is not kept; it is constructed only for whatever
    # ``WorkerCreator.__init__`` does with the file name.
    WorkerCreator(worker_file_name)

    await asyncio.wait([
        asyncio.ensure_future(master_conn.run()),
        asyncio.ensure_future(worker_router.run()),
        asyncio.ensure_future(run_polling_workers()),
    ])
def main(master_addr, worker_router_addr):
    """Run the master on a new ZMQ event loop; exit with 0 on Ctrl-C.

    :param master_addr: forwarded to ``run_master``
    :param worker_router_addr: forwarded to ``run_master``
    """
    try:
        event_loop = ZMQEventLoop()
        asyncio.set_event_loop(event_loop)

        zmq_context = Context()
        master = run_master(zmq_context, master_addr, worker_router_addr)
        event_loop.run_until_complete(master)
    except KeyboardInterrupt:
        print('\nFinished (interrupted)')
        sys.exit(0)
def get_message(self):
    """Schedule ``self._get_message()`` on the event loop.

    Blocks on the condition until the event loop has been set.

    :return message: concurrent.futures.Future
    """
    def _loop_available():
        return self._event_loop is not None

    with self._condition:
        self._condition.wait_for(_loop_available)

    pending_get = self._get_message()
    return asyncio.run_coroutine_threadsafe(pending_get, self._event_loop)