我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 zmq.Context()。
def __init__(self, opts=None):
    """Build the publishing side: a TCP PUB socket fed by an IPC PULL socket.

    Args:
        opts: Ready-made options. When ``None`` they are obtained from
            ``self.process_config(CONFIG_LOCATION)``.

    Both sockets live in one ZeroMQ context and are wrapped in ``ZMQStream``
    objects attached to the shared IOLoop instance. Every message received
    on the PULL stream is passed to ``self.republish``.
    """
    self.opts = self.process_config(CONFIG_LOCATION) if opts is None else opts

    make_stream = zmq.eventloop.zmqstream.ZMQStream

    # Outbound side: PUB socket on the loopback interface.
    self.ctx = zmq.Context()
    self.pub_socket = self.ctx.socket(zmq.PUB)
    self.pub_socket.bind('tcp://127.0.0.1:2000')
    self.loop = zmq.eventloop.IOLoop.instance()
    self.pub_stream = make_stream(self.pub_socket, self.loop)

    # Inbound side: PULL socket over IPC, used to listen to the reactor.
    self.pull_socket = self.ctx.socket(zmq.PULL)
    self.pull_socket.bind('ipc:///tmp/reactor.ipc')
    self.pull_stream = make_stream(self.pull_socket, self.loop)
    self.pull_stream.on_recv(self.republish)
def serviceA(context=None):
    """Run worker "A": answer ``b"Service A"`` requests until told to stop.

    Args:
        context: ZeroMQ context to reuse. When falsy, the process-wide
            ``zmq.Context.instance()`` is used.

    The worker is a DEALER socket with identity ``b'A'`` connected to
    ``tcp://localhost:5560``. It stops on ``b"END"`` or on any message that
    was not addressed to it. The socket is closed on every exit path.
    """
    # Reuse the context if it exists, otherwise fall back to the shared one.
    context = context or zmq.Context.instance()
    service = context.socket(zmq.DEALER)
    # Identify this worker.
    service.setsockopt(zmq.IDENTITY, b'A')
    service.connect("tcp://localhost:5560")
    try:
        while True:
            message = service.recv()
            with myLock:
                print("Service A got:")
                print(message)
            # recv() returns bytes; compare against bytes so the match also
            # works on Python 3 (on Python 2, b"..." and "..." are the same).
            if message == b"Service A":
                # Do some work.
                time.sleep(random.uniform(0, 0.5))
                service.send(b"Service A did your laundry")
            elif message == b"END":
                break
            else:
                with myLock:
                    print("the server has the wrong identities!")
                break
    finally:
        service.close()
def frontendClient(context=None):
    """Send random service requests once per second until a reply is missed.

    Args:
        context: ZeroMQ context to reuse. When falsy, the process-wide
            ``zmq.Context.instance()`` is used.

    The client is a REQ socket connected to ``tcp://localhost:5559`` with a
    2 second receive timeout. The loop ends when a receive times out or an
    empty reply arrives. Errors other than the timeout propagate to the
    caller instead of being reported as a timeout.
    """
    # Reuse the context if it exists, otherwise fall back to the shared one.
    context = context or zmq.Context.instance()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5559")
    socket.RCVTIMEO = 2000  # only wait 2 s for a reply
    try:
        while True:
            # Randomly request either service A or service B.
            serviceRequest = random.choice([b'Service A', b'Service B'])
            with myLock:
                print("client wants %s" % serviceRequest)
            socket.send(serviceRequest)
            try:
                reply = socket.recv()
            except zmq.Again:
                # RCVTIMEO expired without a reply.
                with myLock:
                    print("client timed out")
                break
            if not reply:
                break
            with myLock:
                print("Client got reply: ")
                print(reply)
                print("")
            # Take a nap.
            time.sleep(1)
    finally:
        # linger=0: an unanswered request must not block context termination.
        socket.close(linger=0)
def __init__(self, opts=None):
    """Build the subscribing side and load the available actions.

    Args:
        opts: Ready-made options. When ``None`` they are obtained from
            ``self.process_config(CONFIG_LOCATION)``.

    A SUB socket is connected to ``tcp://localhost:2000`` and wrapped in a
    ``ZMQStream`` whose receive callback is ``act``.
    """
    self.opts = self.process_config(CONFIG_LOCATION) if opts is None else opts

    # ZeroMQ setup.
    # NOTE(review): no zmq.SUBSCRIBE option is set in this method; a SUB
    # socket without a subscription receives nothing - confirm it is set
    # elsewhere.
    self.ctx = zmq.Context()
    self.socket = self.ctx.socket(zmq.SUB)
    self.socket.connect('tcp://localhost:2000')
    self.loop = zmq.eventloop.IOLoop.instance()
    stream_cls = zmq.eventloop.zmqstream.ZMQStream
    self.stream = stream_cls(self.socket, self.loop)
    self.stream.on_recv(act)

    # Action loading.
    actions_dir = '/home/mp/devel/eventdrivetalk/actions'
    self.actions = loader.load_actions(self.opts, actions_dir)
def run(self):
    """
    Process entry point for live plotting.

    Connects a SUB socket to ``tcp://localhost:<self.port>``, subscribes to
    the pickled ``self.var_name`` as topic, calls ``self.init`` with
    ``self.init_kwargs`` and then starts the matplotlib animation driven by
    ``self.loop``.
    """
    self.entity_name = current_process().name
    plogger.info("Starting new thread %s", self.entity_name)

    self.context = zmq.Context()
    self.socket = self.context.socket(zmq.SUB)
    endpoint = "tcp://localhost:%d" % self.port
    self.socket.connect(endpoint)

    pickled_topic = pickle.dumps(self.var_name, protocol=pickle.HIGHEST_PROTOCOL)
    self.socket.setsockopt(zmq.SUBSCRIBE, pickled_topic)
    plogger.info("Subscribed to topic %s on port %d", self.var_name, self.port)

    self.init(**self.init_kwargs)

    # The animation object must stay referenced or the garbage collector
    # removes it and plotting stops. Do not drop this variable.
    # See: http://matplotlib.org/api/animation_api.html
    animation_ref = animation.FuncAnimation(self.fig, self.loop, interval=100)
    self.plt.show()
def notify_msg(self, type, price):
    """Push one ``{'type': ..., 'price': ...}`` JSON message, best effort.

    Args:
        type: Value stored under the ``'type'`` key of the message.
        price: Value stored under the ``'price'`` key of the message.

    A PUSH socket is connected to ``config.ZMQ_HOST:config.ZMQ_PORT`` for
    each call. Any failure is logged with its traceback and swallowed. The
    socket and context are always released so repeated calls do not leak.
    """
    import zmq

    context = None
    socket = None
    try:
        context = zmq.Context()
        socket = context.socket(zmq.PUSH)
        socket.connect("tcp://%s:%s" % (config.ZMQ_HOST, config.ZMQ_PORT))
        # Give the connection a moment to come up before sending.
        time.sleep(1)
        payload = json.dumps({'type': type, 'price': price})
        logging.info("notify message %s", payload)
        socket.send_string(payload)
    except Exception:
        # Deliberately best-effort: report the failure, never raise.
        logging.warning("notify_msg Exception", exc_info=True)
    finally:
        if socket is not None:
            # Bounded linger so an unreachable peer cannot block term().
            socket.close(linger=1000)
        if context is not None:
            context.term()
def rec(port):
    """Receive every message published to ``tcp://*:<port>``; never return.

    Args:
        port: TCP port the SUB socket binds to.

    Incoming messages are delivered to ``rec_frame`` through a ``ZMQStream``
    while the IOLoop runs. If the loop is ever stopped, the function keeps
    the caller blocked without spinning the CPU.
    """
    import time

    zmq_ctx = zmq.Context()
    s = zmq_ctx.socket(zmq.SUB)
    s.bind('tcp://*:{port}'.format(port=port))
    s.setsockopt(zmq.SUBSCRIBE, b"")  # empty prefix: all topics

    stream = ZMQStream(s)
    stream.on_recv_stream(rec_frame)

    ioloop.IOLoop.instance().start()

    # Reached only after the IOLoop stops. Sleep instead of `pass` so the
    # never-returning contract is kept without burning a core.
    while True:
        time.sleep(1)
def main():
    """Run a toy learner that talks to an environment over a PAIR socket.

    Connects to ``tcp://localhost:5556``, sends the ``hello`` handshake and
    then loops forever: receive a reward, receive an input message, send one
    output character.
    """
    port = "5556"
    context = zmq.Context()
    socket = context.socket(zmq.PAIR)
    socket.connect("tcp://localhost:%s" % port)
    socket.send_string(str('hello'))

    message = '00101110'
    cnt = 0
    while True:
        reward = socket.recv()  # 1 or 0, or '-1' for None
        print(reward)
        msg_in = socket.recv()
        print(msg_in)

        # think...
        msg_out = str(random.getrandbits(1) if cnt % 7 == 0 else 1)
        if cnt % 2 == 0:
            msg_out = str(message[cnt % 8])

        # msg_out is text; send() rejects str on Python 3, so use the same
        # send_string() call as the handshake.
        socket.send_string(msg_out)
        cnt = cnt + 1
def __init__(self, cmd, port, address=None):
    """Bind a PAIR socket, optionally launch the learner, wait for 'hello'.

    Args:
        cmd: Command line used to start the learner; the port is appended as
            the last argument. ``None`` skips launching.
        port: TCP port to bind; ``None`` selects 5556.
        address: Interface to bind; ``None`` selects ``'*'``.

    Raises:
        ImportError: If ``zmq`` cannot be imported.
        ValueError: If ``port`` is outside 1..65535.
    """
    try:
        import zmq
    except ImportError:
        raise ImportError("Must have zeromq for remote learner.")

    address = '*' if address is None else address

    if port is None:
        port = 5556
    elif not 1 <= int(port) <= 65535:
        raise ValueError("Invalid port number: %s" % port)

    self.context = zmq.Context()
    self.socket = self.context.socket(zmq.PAIR)
    self.socket.bind("tcp://%s:%s" % (address, port))

    # Launch the learner process.
    if cmd is not None:
        learner_argv = ' '.join((cmd, str(port))).split()
        subprocess.Popen(learner_argv)

    # Handshake: the learner must greet with 'hello'.
    greeting = self.socket.recv().decode('utf-8')
    assert greeting == 'hello'

# send to learner, and get response;
def __init__(self):
    """
    Object constructor.
    """
    Command.__init__(self)

    # The ZMQ context (type: Context); not created here, starts as None.
    self._zmq_context = None

    # The end points of the Enarksh daemons (type: dict[string,string]).
    self.__end_points = {}

# ------------------------------------------------------------------------------------------------------------------
def serviceB(context=None):
    """Run worker "B": answer ``b"Service B"`` requests until told to stop.

    Args:
        context: ZeroMQ context to reuse. When falsy, the process-wide
            ``zmq.Context.instance()`` is used.

    The worker is a DEALER socket with identity ``b'B'`` connected to
    ``tcp://localhost:5560``. It stops on ``b"END"`` or on any message that
    was not addressed to it. The socket is closed on every exit path.
    """
    # Reuse the context if it exists, otherwise fall back to the shared one.
    context = context or zmq.Context.instance()
    service = context.socket(zmq.DEALER)
    # Identify this worker.
    service.setsockopt(zmq.IDENTITY, b'B')
    service.connect("tcp://localhost:5560")
    try:
        while True:
            message = service.recv()
            with myLock:
                print("Service B got:")
                print(message)
            # recv() returns bytes; compare against bytes so the match also
            # works on Python 3 (on Python 2, b"..." and "..." are the same).
            if message == b"Service B":
                # Do some work.
                time.sleep(random.uniform(0, 0.5))
                service.send(b"Service B cleaned your room")
            elif message == b"END":
                break
            else:
                with myLock:
                    print("the server has the wrong identities!")
                break
    finally:
        service.close()
def tearDown(self):
    """Close every tracked socket and terminate every context they used.

    Raises:
        RuntimeError: If a context fails to terminate within 2 seconds.
    """
    pending_contexts = {self.context}
    while self.sockets:
        sock = self.sockets.pop()
        # Sockets may belong to contexts created during the test.
        pending_contexts.add(sock.context)
        sock.close(0)

    for ctx in pending_contexts:
        # term() can block forever, so run it on a daemon thread with a
        # bounded join.
        terminator = Thread(target=ctx.term)
        terminator.daemon = True
        terminator.start()
        terminator.join(timeout=2)
        if not terminator.is_alive():
            continue
        # Reset Context.instance so the stuck context does not corrupt
        # subsequent tests.
        zmq.sugar.context.Context._instance = None
        raise RuntimeError("context could not terminate, open sockets likely remain in test")

    super(BaseZMQTestCase, self).tearDown()
def main():
    """Wait for the SUB socket to connect, then print 'pupil' messages forever.

    The socket's monitor is polled until it reports ``EVENT_CONNECTED``.
    Every other event, including ``EVENT_CONNECT_DELAYED``, is ignored.
    Each message is read as a topic string followed by a payload frame that
    is decoded with ``serializer.loads``.
    """
    ctx = zmq.Context()
    sub = zmq.Socket(ctx, zmq.SUB)
    monitor = sub.get_monitor_socket()
    sub.connect(ipc_sub_url)

    # Drain monitor events until the connection is established.
    while recv_monitor_message(monitor)['event'] != zmq.EVENT_CONNECTED:
        pass
    print('connected')

    sub.subscribe('pupil')
    while True:
        topic = sub.recv_string()
        raw_payload = sub.recv()
        payload = serializer.loads(raw_payload, encoding='utf-8')
        print(topic, payload)
def main():
    """Run a QUEUE device between ``tcp://*:5559`` and ``tcp://*:5560``.

    The frontend (XREP) faces clients and the backend (XREQ) faces servers.
    Any exception, including Ctrl-C, is printed and followed by the shutdown
    message. Whatever was created is then closed.
    """
    context = None
    frontend = None
    backend = None
    try:
        context = zmq.Context(1)
        # Client-facing socket.
        frontend = context.socket(zmq.XREP)
        frontend.bind("tcp://*:5559")
        # Server-facing socket.
        backend = context.socket(zmq.XREQ)
        backend.bind("tcp://*:5560")

        zmq.device(zmq.QUEUE, frontend, backend)
    except BaseException:
        # Catch-all kept on purpose so Ctrl-C also reaches the shutdown path.
        for val in sys.exc_info():
            print(val)
        print("Desativa a fila")
    finally:
        # Setup may have failed part-way; only release what exists so the
        # cleanup cannot raise NameError and mask the real error.
        if frontend is not None:
            frontend.close()
        if backend is not None:
            backend.close()
        if context is not None:
            context.term()
def test_reqrep_raw_zmq_outside(nsproxy):
    """
    Request-reply between an osBrain agent and a plain ZMQ REQ socket: the
    agent's REP handler echoes the message back unchanged.
    """
    def rep_handler(agent, message):
        return message

    # Agent side: REP socket over TCP with raw serialization.
    a1 = run_agent('a1')
    a1.set_attr(received=None)
    addr = a1.bind('REP', transport='tcp', handler=rep_handler,
                   serializer='raw')

    # Client side: raw ZeroMQ REQ socket.
    ctx = zmq.Context()
    req = ctx.socket(zmq.REQ)
    endpoint = 'tcp://%s:%s' % (addr.address.host, addr.address.port)
    req.connect(endpoint)

    # Round trip.
    message = b'Hello world'
    req.send(message)
    assert req.recv() == message

    req.close()
    ctx.destroy()
def test_pushpull_raw_zmq_outside(nsproxy):
    """
    Push-pull on a channel without serialization: a plain ZMQ PUSH socket
    outside osBrain sends a message that the agent's PULL handler stores.
    """
    # Agent side: PULL socket over TCP with raw serialization.
    a1 = run_agent('a1')
    a1.set_attr(received=None)
    addr = a1.bind('PULL', transport='tcp', handler=set_received,
                   serializer='raw')

    # Sender side: raw ZeroMQ PUSH socket.
    ctx = zmq.Context()
    push = ctx.socket(zmq.PUSH)
    endpoint = 'tcp://%s:%s' % (addr.address.host, addr.address.port)
    push.connect(endpoint)

    # Send, then wait for the agent attribute to reflect the message.
    message = b'Hello world'
    push.send(message)
    assert wait_agent_attr(a1, name='received', value=message)

    push.close()
    ctx.destroy()
def connect(self):
    """Create a REQ socket connected to ``self.endpoint`` and a poller for it.

    Sets ``self.context``, ``self.socket``, ``self.poller`` and finally
    ``self.is_connected = True``.

    Raises:
        RuntimeError: If the created context or socket is falsy.
    """
    ctx = zmq.Context()
    self.context = ctx
    if not ctx:
        raise RuntimeError('Failed to create ZMQ context!')

    sock = ctx.socket(zmq.REQ)
    self.socket = sock
    if not sock:
        raise RuntimeError('Failed to create ZMQ socket!')
    sock.connect(self.endpoint)

    # Register for readability so callers can poll for replies.
    self.poller = zmq.Poller()
    self.poller.register(sock, zmq.POLLIN)
    self.is_connected = True
def connect(self):
    """Create a PULL socket bound to ``self.endpoint`` and a poller for it.

    Sets ``self.context``, ``self.socket``, ``self.poller`` and finally
    ``self.is_connected = True``.

    Raises:
        RuntimeError: If the created context or socket is falsy.
    """
    ctx = zmq.Context()
    self.context = ctx
    if not ctx:
        raise RuntimeError('Failed to create ZMQ context!')

    sock = ctx.socket(zmq.PULL)
    self.socket = sock
    if not sock:
        raise RuntimeError('Failed to create ZMQ socket!')
    sock.bind(self.endpoint)

    # Register for readability so callers can poll for incoming messages.
    self.poller = zmq.Poller()
    self.poller.register(sock, zmq.POLLIN)
    self.is_connected = True
def connect(self):
    """Create a REP socket bound to ``self.endpoint`` and a poller for it.

    Sets ``self.context``, ``self.socket``, ``self.poller`` and finally
    ``self.is_connected = True``.

    Raises:
        RuntimeError: If the created context or socket is falsy.
    """
    ctx = zmq.Context()
    self.context = ctx
    if not ctx:
        raise RuntimeError('Failed to create ZMQ context!')

    sock = ctx.socket(zmq.REP)
    self.socket = sock
    if not sock:
        raise RuntimeError('Failed to create ZMQ socket!')
    sock.bind(self.endpoint)

    # Register for readability so callers can poll for incoming requests.
    self.poller = zmq.Poller()
    self.poller.register(sock, zmq.POLLIN)
    self.is_connected = True
def __init__(self, repAddress, pubAddress):
    """Constructor.

    Args:
        repAddress: Address the REP socket binds to.
        pubAddress: Address the PUB socket binds to.
    """
    super(RpcServer, self).__init__()

    # Function registry; starts empty.
    self.__functions = {}

    # ZeroMQ plumbing: one context, a bound REP socket and a bound PUB socket.
    self.__context = zmq.Context()

    self.__socketREP = self.__context.socket(zmq.REP)
    self.__socketREP.bind(repAddress)

    self.__socketPUB = self.__context.socket(zmq.PUB)
    self.__socketPUB.bind(pubAddress)

    # Worker thread state: the thread targets self.run and is created here
    # but not started.
    self.__active = False
    self.__thread = threading.Thread(target=self.run)

#----------------------------------------------------------------------
def __init__(self, reqAddress, subAddress):
    """Constructor.

    Args:
        reqAddress: Address stored for the REQ socket.
        subAddress: Address stored for the SUB socket.

    The sockets are created here but not connected.
    """
    super(RpcClient, self).__init__()

    # Remember the addresses.
    self.__reqAddress = reqAddress
    self.__subAddress = subAddress

    # ZeroMQ plumbing: one context, a REQ socket and a SUB socket.
    self.__context = zmq.Context()
    self.__socketREQ = self.__context.socket(zmq.REQ)
    self.__socketSUB = self.__context.socket(zmq.SUB)

    # Worker thread state: the thread targets self.run and is created here
    # but not started.
    self.__active = False
    self.__thread = threading.Thread(target=self.run)

#----------------------------------------------------------------------
def router_main(_, pidx, args):
    """Proxy messages from ``tcp://*:5000`` (PULL) to ``ipc://example-events`` (PUSH).

    Args:
        _: Unused.
        pidx: Passed to ``get_logger`` together with the logger name.
        args: Unused.

    ``KeyboardInterrupt`` ends the proxy quietly and other errors are
    logged. Sockets and context are always released.
    """
    log = get_logger('examples.zmqserver.extra', pidx)

    ctx = zmq.Context()
    ctx.linger = 0
    in_sock = ctx.socket(zmq.PULL)
    in_sock.bind('tcp://*:5000')
    out_sock = ctx.socket(zmq.PUSH)
    out_sock.bind('ipc://example-events')

    try:
        log.info('router proxy started')
        zmq.proxy(in_sock, out_sock)
    except KeyboardInterrupt:
        pass
    except Exception:
        # Not a bare `except:` - SystemExit and GeneratorExit must propagate.
        log.exception('unexpected error')
    finally:
        log.info('router proxy terminated')
        in_sock.close()
        out_sock.close()
        ctx.term()
def reset(self):
    """Return to the READY state with fresh sockets and zeroed counters.

    A new context is created. ``_socket1`` is a PUSH socket bound to
    ``self._address1`` and ``_socket2`` is a PULL socket bound to
    ``self._address2``. Both have a high-water mark of 32, and the PULL
    socket has ``RCVTIMEO`` set to 1.
    """
    self.status = READY

    ctx = zmq.Context()

    # Outgoing channel.
    self._socket1 = ctx.socket(zmq.PUSH)
    self._socket1.bind(self._address1)
    self._socket1.set_hwm(32)

    # Incoming channel; configured before binding.
    self._socket2 = ctx.socket(zmq.PULL)
    self._socket2.set_hwm(32)
    self._socket2.RCVTIMEO = 1
    self._socket2.bind(self._address2)

    # Drain flags.
    self._prev_drained = False
    self._sub_drained = False

    # Send and receive counters per connection.
    self._conn1_send_count = 0
    self._conn1_recv_count = {}
    self._conn2_send_count = {}
    self._conn2_recv_count = 0

    self._retry_count = 0
def reset(self):
    """Return to the READY state and handshake with the main process.

    Waits in 10 ms steps until both ``self._ports['conn1']`` and
    ``self._ports['sync_conn1']`` are set. It then connects the PULL socket
    and sends a msgpack-encoded ``b'SYNC'`` over a temporary PUSH socket.
    """
    self.status = READY

    ctx = zmq.Context()
    self._socket = ctx.socket(zmq.PULL)
    self._socket.RCVTIMEO = 1
    sync_socket = ctx.socket(zmq.PUSH)

    # Block until both ports have been filled in.
    while not (self._ports['conn1'] is not None
               and self._ports['sync_conn1'] is not None):
        sleep(0.01)

    # Handshake with the main process.
    data_endpoint = ':'.join((self._address, str(self._ports['conn1'])))
    self._socket.connect(data_endpoint)
    sync_endpoint = ':'.join((self._address, str(self._ports['sync_conn1'])))
    sync_socket.connect(sync_endpoint)
    sync_socket.send(msgpack.dumps(b'SYNC'))
    sync_socket.close()

    self._num_recv = 0
    self._drained = False
def test_pub(self):
    """Publish log messages. bind() to PUB socket.

    Sends ``self.send_count`` messages on a PUB socket bound to
    ``tcp://*:<self.sub_port>``, then waits for ``self.watcher`` to finish
    before releasing the socket and context.
    """
    # pylint: disable=E1101
    context = zmq.Context()
    pub = context.socket(zmq.PUB)
    try:
        pub.bind('tcp://*:{}'.format(self.sub_port))
    except zmq.ZMQError as error:
        print(error)
    time.sleep(0.1)

    send_count = self.send_count
    for i in range(send_count):
        pub.send_string('hi there {}'.format(i))
        time.sleep(1e-5)
    sys.stdout.flush()

    # Wait for the watcher thread to exit.
    # Thread.isAlive() was removed in Python 3.9; is_alive() works everywhere.
    while self.watcher.is_alive():
        self.watcher.join(timeout=1e-5)

    pub.close()
    context.term()
def test_pub(self):
    """Publish log messages. connect() to PUB socket.

    Sends ``self.send_count`` messages on a PUB socket connected to
    ``tcp://<self.sub_host>:<self.sub_port>``, then waits for
    ``self.watcher`` to finish before releasing the socket and context.
    """
    # pylint: disable=E1101
    context = zmq.Context()
    pub = context.socket(zmq.PUB)
    try:
        _address = 'tcp://{}:{}'.format(self.sub_host, self.sub_port)
        pub.connect(_address)
    except zmq.ZMQError as error:
        print('ERROR:', error)
    time.sleep(0.1)

    send_count = self.send_count
    for i in range(send_count):
        pub.send_string('hi there {}'.format(i))
        time.sleep(1e-5)

    # Wait for the watcher thread to exit.
    # Thread.isAlive() was removed in Python 3.9; is_alive() works everywhere.
    while self.watcher.is_alive():
        self.watcher.join(timeout=1e-5)

    pub.close()
    context.term()
def to(cls, channel, host='127.0.0.1',
       port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
       level=logging.NOTSET):
    """Create a handler whose PUB socket is connected to a ZMQ subscriber.

    Args:
        channel (string): Logging channel name, passed on to ``cls``.
        host (string): Hostname or IP address of the subscriber.
        port (int, string): Port of the subscriber.
        level (int): Logging level.

    Returns:
        The result of ``cls(channel, publisher, level=level)``.
    """
    publisher = zmq.Context().socket(zmq.PUB)
    publisher.connect('tcp://{}:{}'.format(host, port))
    # Short pause after connecting; hopefully avoids the silent-joiner
    # problem where the first messages are lost.
    time.sleep(0.1)
    return cls(channel, publisher, level=level)
def __init__(self):
    """Start the server in the background and connect two REQ sockets.

    ``main_socket`` connects to ``tcp://localhost:5555`` and
    ``freeze_socket`` connects to ``tcp://localhost:6666``.
    """
    ctx = zmq.Context()

    # Try to start the server in the background. The outcome is not checked
    # here.
    os.system("justdb serve &")

    write_sock = ctx.socket(zmq.REQ)
    write_sock.connect("tcp://localhost:5555")

    freeze_sock = ctx.socket(zmq.REQ)
    freeze_sock.connect("tcp://localhost:6666")

    self.main_socket, self.freeze_socket = write_sock, freeze_sock
def create_server():
    """Bind REP sockets on ports 5555 and 6666 and acknowledge requests forever.

    If binding fails with ``ZMQError``, a notice is printed and the process
    exits via ``sys.exit()``. Otherwise each loop pass answers one request
    on the main socket and then one on the freeze socket, each with an empty
    reply.
    """
    ctx = zmq.Context()
    try:
        main_socket = ctx.socket(zmq.REP)
        main_socket.bind("tcp://*:5555")
        freeze_socket = ctx.socket(zmq.REP)
        freeze_socket.bind("tcp://*:6666")
    except zmq.error.ZMQError:
        print("JustDB already running, this is no error.")
        sys.exit()

    print("Successfully started \033[92mjustdb\033[0m")

    while True:  # pragma: no cover
        for rep in (main_socket, freeze_socket):
            rep.recv()
            rep.send(b"")
def create_socket(port):
    """
    Create a SUB socket bound to ``tcp://*:<port>`` and subscribed to all
    topics.

    Exits the process with status 1 when the bind raises ``ZMQError``.

    Returns:
        Tuple ``(socket, context)``.
    """
    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    try:
        sub.bind("tcp://*:%s" % port)
    except zmq.error.ZMQError:
        print("Address already in use")
        sys.exit(1)
    else:
        sub.setsockopt(zmq.SUBSCRIBE, b"")
        print("Start node-masternode Subscribe")
    return sub, ctx
def __init__(self, addr="*", port="8080", logger=None):
    """Store the logger, create a ZMQ context and compute the full address.

    Args:
        addr: Address part passed to ``Address``.
        port: Port part passed to ``Address``.
        logger: Logger object kept on the instance.
    """
    self.logger = logger

    self.context = zmq.Context()
    self.complete_address = Address(addr, port).complete_address
    self.sync_address = ''

    # Socket used to talk to the following node. It is not created here.
    self.list_communication_channel = None

    # A test-only mapping from ports 5555/5556/5557 to sync ports
    # 5562/5563/5564 was previously sketched here as commented-out code.
def forward(self, data):
    """Send ``data`` on the communication channel, reacting to ZMQ errors.

    Args:
        data: Message passed to ``self.list_communication_channel.send``.

    Raises:
        zmq.Again: Re-raised unchanged after logging.

    ``zmq.NotDone`` closes the channel. Any other ``zmq.ZMQError`` destroys
    the context and replaces it with a new one.
    """
    try:
        self.list_communication_channel.send(data)
    except zmq.NotDone:
        self.logger.debug('my recipient is dead, not done')
        self.list_communication_channel.close()
    except zmq.Again:
        self.logger.debug('my recipient is dead')
        # Bare raise keeps the original exception and traceback instead of
        # substituting a freshly built zmq.Again.
        raise
    except zmq.ZMQError as a:
        self.logger.debug("Error in message forward " + a.strerror)
        self.context.destroy()
        self.context = zmq.Context()
def send_int_message(self, msg=b'ALIVE', timeout=TRACKER_INFINITE_TIMEOUT):
    """Send ``msg`` with delivery tracking and wait for it to complete.

    Args:
        msg: Payload to send (default ``b'ALIVE'``).
        timeout: Passed to the message tracker's ``wait``.

    On ``zmq.NotDone`` the method logs and sleeps for ``TRY_TIMEOUT``. On
    any other ``zmq.ZMQError`` the context is destroyed and recreated and
    the internal client-side channel is regenerated.
    """
    channel = self.list_communication_channel
    try:
        self.logger.debug('sending message to {}'.format(self.sync_address))
        # track=True returns a tracker; wait() blocks until the send is done
        # or the timeout expires.
        channel.send(msg, track=True, copy=False).wait(timeout)
    except zmq.NotDone:
        self.logger.debug('Something went wrong with that message')
        time.sleep(TRY_TIMEOUT)
    except zmq.ZMQError as a:
        self.logger.debug(a.strerror)
        self.context.destroy()
        self.context = zmq.Context()
        self.generate_internal_channel_client_side()

# used when it's the first time to sync
def flash(self):
    """Refresh per-process state when running under a new process id.

    Nothing happens while ``self.pid`` still matches the current pid.
    Otherwise the pid is updated, fresh REQ and DEALER sockets are created
    and ``self.context`` is rebuilt from ``main_context``. When
    ``self.main_param`` is set, the YAML file it names is loaded into the
    new context's ``params``.
    """
    current_pid = str(os.getpid())
    if self.pid == current_pid:
        return

    # Reset the process pid.
    self.pid = current_pid

    # ZMQ sockets cannot be shared between processes, so create new ones.
    self.zmq_socket = zmq.Context().socket(zmq.REQ)
    self.zmq_file_socket = zmq.Context().socket(zmq.DEALER)

    # Rebuild the context.
    ctx = main_context(self.main_file, self.main_folder)
    if self.main_param is not None:
        main_config_path = os.path.join(self.main_folder, self.main_param)
        # `with` closes the file handle, which was previously leaked.
        with open(main_config_path, 'r') as config_file:
            # NOTE(review): yaml.load without an explicit Loader is unsafe on
            # untrusted files and is rejected by newer PyYAML releases;
            # consider yaml.safe_load if the config is plain data.
            params = yaml.load(config_file)
        ctx.params = params
    self.context = ctx
def prepare():
    """Initialise module globals from the configuration and return it.

    Sets ``tee`` to the ``send_string`` method of a PUSH socket connected to
    the configured log URL and registers ``close_sockets`` for that socket
    at exit. It also expands the input and result directory paths and
    announces the service pid through ``tee``.

    Returns:
        The ``Config`` instance.
    """
    global tee
    global input_files_dir
    global result_files_dir

    config = Config()

    log_push = zmq.Context().socket(zmq.PUSH)
    log_push.connect(config.server_log['external_url'])
    tee = log_push.send_string
    atexit.register(close_sockets, [log_push])

    files_cfg = config.server_files
    input_files_dir = os.path.expanduser(files_cfg['input_files_dir'])
    result_files_dir = os.path.expanduser(files_cfg['result_files_dir'])

    tee('Started service files with pid {}'.format(os.getpid()))
    return config
def main():
    """Interactive console: send typed commands and print each response.

    Connects a REQ socket to ``config.LISTEN_ON_IP:config.LISTEN_ON_PORT``.
    The loop ends when the first word of a response is ``finished``. An
    empty response is not treated as an error. The socket and context are
    released on exit.
    """
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    try:
        socket.connect("tcp://%s:%s" % (config.LISTEN_ON_IP, config.LISTEN_ON_PORT))

        while True:
            command = input("Command: ")
            socket.send(command.encode(config.CODEC))
            response = socket.recv().decode(config.CODEC)
            print(" ... %s" % response)

            words = shlex.split(response.lower())
            # An empty or whitespace-only response yields no words.
            status = words[0] if words else ""
            if status == "finished":
                print("Finished status received from robot")
                break
    finally:
        # linger=0 so an undelivered command cannot block term().
        socket.close(linger=0)
        context.term()
def zmq_streamer():
    """Run a STREAMER device between the configured push and pull ports.

    The frontend (PUSH) binds to ``zmq_queue_port_push`` and faces clients.
    The backend (PULL) binds to ``zmq_queue_port_pull`` and faces services.
    Errors are printed. Whatever was created is then closed.
    """
    context = None
    frontend = None
    backend = None
    try:
        context = zmq.Context()
        # Socket facing clients.
        frontend = context.socket(zmq.PUSH)
        frontend.bind("tcp://*:%s" % (zmq_queue_port_push))
        # Socket facing services.
        backend = context.socket(zmq.PULL)
        backend.bind("tcp://*:%s" % (zmq_queue_port_pull))

        zmq.device(zmq.STREAMER, frontend, backend)
    except Exception as e:
        print(e)
        print("bringing down zmq device")
    finally:
        # Setup may have failed part-way; only release what exists so the
        # cleanup cannot raise NameError and mask the real error.
        if frontend is not None:
            frontend.close()
        if backend is not None:
            backend.close()
        if context is not None:
            context.term()
def __init__(self, data_dir=bqueryd.DEFAULT_DATA_DIR,
             redis_url='redis://127.0.0.1:6379/0', loglevel=logging.DEBUG):
    """Set up the worker's identity, ROUTER socket, Redis handle and logger.

    Args:
        data_dir: Existing directory holding the data files.
        redis_url: URL passed to ``redis.from_url``.
        loglevel: Level applied to the worker's child logger.

    Raises:
        Exception: If ``data_dir`` does not exist or is not a directory.
    """
    if not (os.path.exists(data_dir) and os.path.isdir(data_dir)):
        raise Exception("Datadir %s is not a valid directory" % data_dir)

    # Identity and location.
    self.worker_id = binascii.hexlify(os.urandom(8))
    self.node_name = socket.gethostname()
    self.data_dir = data_dir
    self.data_files = set()

    # ROUTER socket identified by the worker id, polled in both directions.
    router = zmq.Context().socket(zmq.ROUTER)
    self.socket = router
    router.setsockopt(zmq.LINGER, 500)
    router.identity = self.worker_id
    self.poller = zmq.Poller()
    self.poller.register(router, zmq.POLLIN | zmq.POLLOUT)

    self.redis_server = redis.from_url(redis_url)
    # Timestamps of the last contact with each controller.
    self.controllers = {}
    self.check_controllers()
    self.last_wrm = 0
    self.start_time = time.time()

    self.logger = bqueryd.logger.getChild('worker ' + self.worker_id)
    self.logger.setLevel(loglevel)
    self.msg_count = 0

    # term_signal() is called here and its return value is installed as the
    # SIGTERM handler.
    signal.signal(signal.SIGTERM, self.term_signal())
def __init__(self, address=None, timeout=120,
             redis_url='redis://127.0.0.1:6379/0', loglevel=logging.INFO,
             retries=3):
    """Prepare the RPC client and connect its socket.

    Args:
        address: Controller address to use. When falsy, the controllers are
            read from the Redis set ``bqueryd.REDIS_SET_KEY`` and shuffled.
        timeout: Stored as ``self.timeout``.
        redis_url: URL passed to ``redis.from_url``.
        loglevel: Level applied to the ``rpc`` child logger.
        retries: Stored as ``self.retries``.

    Raises:
        Exception: If no address is given and the Redis set is empty.
    """
    self.logger = bqueryd.logger.getChild('rpc')
    self.logger.setLevel(loglevel)
    self.context = zmq.Context()
    self.redis_url = redis_url
    redis_server = redis.from_url(redis_url)
    self.retries = retries
    self.timeout = timeout
    self.identity = binascii.hexlify(os.urandom(8))

    if address:
        controllers = [address]
    else:
        # No explicit address: pick from the registered controllers in
        # random order.
        controllers = list(redis_server.smembers(bqueryd.REDIS_SET_KEY))
        if not controllers:
            raise Exception('No Controllers found in Redis set: ' + bqueryd.REDIS_SET_KEY)
        random.shuffle(controllers)

    self.controllers = controllers
    self.connect_socket()
def __init__(self, push, pull, redis_conf):
    """Connect to Redis and set up the PUSH and PULL sockets.

    Args:
        push: Endpoint the PUSH socket connects to.
        pull: Endpoint the PULL socket binds to.
        redis_conf: ``"host:port:db"`` string describing the Redis cache.
    """
    super(MinerClient, self).__init__()

    print("Connecting to Redis cache {} ...".format(redis_conf))
    host, port, db = redis_conf.split(":")
    self.redis = redis.StrictRedis(host=host, port=int(port), db=int(db))
    # Create the 'transaction' key with value 0 only if it is missing.
    self.redis.setnx('transaction', 0)

    # NOTE: Expiration times for pending/processed tasks in seconds.
    self.transaction_expiration = 60 * 60
    self.result_expiration = 60 * 10

    ctx = zmq.Context()

    print("Connecting to push socket '{}' ...".format(push))
    self.push = ctx.socket(zmq.PUSH)
    self.push.connect(push)

    print("Binding to pull socket '{}' ...".format(pull))
    self.pull = ctx.socket(zmq.PULL)
    self.pull.bind(pull)
def brute_zmq(host, port=5555, user=None, password=None, db=0):
    """Report whether a ZeroMQ publisher at ``host:port`` sends a message within 1 s.

    Args:
        host: Target host.
        port: Target port.
        user: Unused.
        password: Unused.
        db: Unused.

    Returns:
        ``True`` if a message was received, ``False`` if the receive failed
        or timed out.

    The socket and context are released on every path, so repeated probes
    do not accumulate contexts.
    """
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    try:
        # Configure
        socket.setsockopt(zmq.SUBSCRIBE, b"")  # All topics
        socket.setsockopt(zmq.LINGER, 0)  # Drop pending data on close
        socket.RCVTIMEO = 1000  # timeout: 1 sec

        # Connect
        socket.connect("tcp://%s:%s" % (host, port))

        # Try to receive
        try:
            socket.recv()
        except Exception:
            return False
        return True
    finally:
        socket.close()
        context.term()
def handle_zmq(host, port=5555, extra_config=None):
    """Report whether a ZeroMQ publisher at ``host:port`` sends a message within 1 s.

    Args:
        host: Target host.
        port: Target port.
        extra_config: Unused.

    Returns:
        ``True`` if a message was received, ``False`` if the receive failed
        or timed out.

    The socket and context are released on every path, so repeated probes
    do not accumulate contexts.
    """
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    try:
        # Configure
        socket.setsockopt(zmq.SUBSCRIBE, b"")  # All topics
        socket.setsockopt(zmq.LINGER, 0)  # Drop pending data on close
        socket.RCVTIMEO = 1000  # timeout: 1 sec

        # Connect
        socket.connect("tcp://%s:%s" % (host, port))

        # Try to receive
        try:
            socket.recv()
        except Exception:
            return False
        return True
    finally:
        socket.close()
        context.term()
async def test_tcp_req_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq REQ socket exchanges one question and answer with a pyzmq REP peer.

    Declared ``async`` because the body uses ``await`` and ``async with``,
    which are a SyntaxError inside a plain ``def``.
    """
    rep_socket = socket_factory.create(zmq.REP)
    connect_or_bind(rep_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side, executed in the background.
        frames = rep_socket.recv_multipart()
        assert frames == [b'my', b'question']
        rep_socket.send_multipart([b'your', b'answer'])

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.REQ)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            await asyncio.wait_for(
                socket.send_multipart([b'my', b'question']),
                1,
            )
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'your', b'answer']
async def test_tcp_rep_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq REP socket answers one question from a pyzmq REQ peer.

    Declared ``async`` because the body uses ``await`` and ``async with``,
    which are a SyntaxError inside a plain ``def``.
    """
    req_socket = socket_factory.create(zmq.REQ)
    connect_or_bind(req_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side, executed in the background.
        req_socket.send_multipart([b'my', b'question'])
        frames = req_socket.recv_multipart()
        assert frames == [b'your', b'answer']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.REP)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'my', b'question']
            await asyncio.wait_for(
                socket.send_multipart([b'your', b'answer']),
                1,
            )
async def test_tcp_dealer_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq DEALER socket talks to a pyzmq REP peer using an empty delimiter frame.

    Declared ``async`` because the body uses ``await`` and ``async with``,
    which are a SyntaxError inside a plain ``def``.
    """
    rep_socket = socket_factory.create(zmq.REP)
    connect_or_bind(rep_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side, executed in the background.
        frames = rep_socket.recv_multipart()
        assert frames == [b'my', b'question']
        rep_socket.send_multipart([b'your', b'answer'])

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.DEALER)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            # The DEALER side sends and receives the leading empty frame
            # explicitly.
            await asyncio.wait_for(
                socket.send_multipart([b'', b'my', b'question']),
                1,
            )
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'', b'your', b'answer']
async def test_tcp_router_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq ROUTER socket receives an identified request and routes the reply back.

    Declared ``async`` because the body uses ``await`` and ``async with``,
    which are a SyntaxError inside a plain ``def``.
    """
    req_socket = socket_factory.create(zmq.REQ)
    req_socket.identity = b'abcd'
    connect_or_bind(req_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side, executed in the background.
        req_socket.send_multipart([b'my', b'question'])
        frames = req_socket.recv_multipart()
        assert frames == [b'your', b'answer']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.ROUTER)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            # The first frame is the sender identity; the rest is the
            # delimited request.
            identity = frames.pop(0)
            assert identity == req_socket.identity
            assert frames == [b'', b'my', b'question']
            await asyncio.wait_for(
                socket.send_multipart([identity, b'', b'your', b'answer']),
                1,
            )
async def test_tcp_xpub_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq XPUB socket sees subscribe/unsubscribe frames and delivers by topic.

    Declared ``async`` because the body uses ``await`` and ``async with``,
    which are a SyntaxError inside a plain ``def``.
    """
    sub_socket = socket_factory.create(zmq.SUB)
    sub_socket.setsockopt(zmq.SUBSCRIBE, b'a')
    connect_or_bind(sub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side, executed in the background.
        frames = sub_socket.recv_multipart()
        assert frames == [b'a', b'message']

    with run_in_background(run) as thread_done_event:
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.XPUB)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')

            # The subscription for topic b'a' arrives first.
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'\1a']

            # Keep publishing until the background thread got its message.
            while not thread_done_event.is_set():
                await socket.send_multipart([b'a', b'message'])
                await socket.send_multipart([b'b', b'wrong'])

            # Closing the subscriber is expected to produce the matching
            # unsubscribe frame.
            sub_socket.close()
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'\0a']
async def test_tcp_sub_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq SUB socket subscribes to b'a' and receives a message from a pyzmq XPUB peer.

    Declared ``async`` because the body uses ``await`` and ``async with``,
    which are a SyntaxError inside a plain ``def``.
    """
    xpub_socket = socket_factory.create(zmq.XPUB)
    connect_or_bind(xpub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Wait one second for the subscription to arrive.
        assert xpub_socket.poll(1000) == zmq.POLLIN
        topic = xpub_socket.recv_multipart()
        assert topic == [b'\x01a']
        xpub_socket.send_multipart([b'a', b'message'])

        if connect_or_bind == 'connect':
            assert xpub_socket.poll(1000) == zmq.POLLIN
            topic = xpub_socket.recv_multipart()
            assert topic == [b'\x00a']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.SUB)
            await socket.subscribe(b'a')
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'a', b'message']
async def test_tcp_xsub_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq XSUB socket subscribes with a raw frame and receives from a pyzmq XPUB peer.

    Declared ``async`` because the body uses ``await`` and ``async with``,
    which are a SyntaxError inside a plain ``def``.
    """
    xpub_socket = socket_factory.create(zmq.XPUB)
    connect_or_bind(xpub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Wait one second for the subscription to arrive.
        assert xpub_socket.poll(1000) == zmq.POLLIN
        topic = xpub_socket.recv_multipart()
        assert topic == [b'\x01a']
        xpub_socket.send_multipart([b'a', b'message'])

        if connect_or_bind == 'connect':
            assert xpub_socket.poll(1000) == zmq.POLLIN
            topic = xpub_socket.recv_multipart()
            assert topic == [b'\x00a']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.XSUB)
            # XSUB subscribes by sending the raw subscription frame itself.
            await socket.send_multipart([b'\x01a'])
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'a', b'message']
async def test_tcp_push_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq PUSH socket delivers one multipart message to a pyzmq PULL peer.

    Declared ``async`` because the body uses ``await`` and ``async with``,
    which are a SyntaxError inside a plain ``def``.
    """
    pull_socket = socket_factory.create(zmq.PULL)
    connect_or_bind(pull_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side, executed in the background.
        assert pull_socket.poll(1000) == zmq.POLLIN
        message = pull_socket.recv_multipart()
        assert message == [b'hello', b'world']

    with run_in_background(run) as event:
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.PUSH)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            await socket.send_multipart([b'hello', b'world'])

            # Keep the context open until the background thread is done.
            while not event.is_set():
                await asyncio.sleep(0.1)