我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用zmq.REP。
def receive_message(self, event, event_data, listener_data):
    """
    Waits until an incoming end point is readable and dispatches every readable end point.

    All registered end points of type zmq.PULL or zmq.REP are polled for input. For each end point that the
    poller reports, ``self._receive_message(name, socket)`` is called.

    :param * event: Not used.
    :param * event_data: Not used.
    :param * listener_data: Not used.
    """
    del event, event_data, listener_data

    incoming_types = [zmq.PULL, zmq.REP]

    # Only sockets that can receive messages are registered with the poller.
    poller = zmq.Poller()
    for end_point in self.__end_points.values():
        if end_point.type in incoming_types:
            poller.register(end_point, zmq.POLLIN)

    # poll() is called without a timeout.
    ready = dict(poller.poll())

    for name, end_point in self.__end_points.items():
        if end_point in ready:
            self._receive_message(name, end_point)

# ------------------------------------------------------------------------------------------------------------------
def __register_sockets(self):
    """
    Registers the four ZMQ end points of this process with the message controller.
    """
    config = Config.get()
    register = self.message_controller.register_end_point

    # Incoming: asynchronous messages.
    register('pull', zmq.PULL, config.get_controller_pull_end_point())

    # Incoming: lockstep (request/reply) messages.
    register('lockstep', zmq.REP, config.get_controller_lockstep_end_point())

    # Outgoing: asynchronous messages to the spawner.
    register('spawner', zmq.PUSH, config.get_spawner_pull_end_point())

    # Outgoing: asynchronous messages to the logger.
    register('logger', zmq.PUSH, config.get_logger_pull_end_point())

# ------------------------------------------------------------------------------------------------------------------
def register_end_point(self, name, socket_type, end_point):
    """
    Registers an end point.

    The socket is only stored under ``name`` after it has been bound or connected successfully. When the socket
    type is not supported, or binding/connecting fails, the freshly created socket is closed again so that no
    half-initialised socket is left behind in the end point registry.

    :param str name: The name of the end point.
    :param int socket_type: The socket type, one of
                            - zmq.PULL for asynchronous incoming messages
                            - zmq.REP  for lockstep incoming messages
                            - zmq.PUSH for asynchronous outgoing messages
    :param str end_point: The end point.

    :raises ValueError: When socket_type is not one of the supported types.
    """
    socket = self.__zmq_context.socket(socket_type)
    try:
        if socket_type in (zmq.PULL, zmq.REP):
            # Incoming sockets own the address.
            socket.bind(end_point)
        elif socket_type == zmq.PUSH:
            # Outgoing sockets connect to an address owned by another process.
            socket.connect(end_point)
        else:
            raise ValueError("Unknown socket type {0}".format(socket_type))
    except Exception:
        # Do not leak the socket when it cannot be used.
        socket.close()
        raise

    self.__end_points[name] = socket

# ------------------------------------------------------------------------------------------------------------------
def no_barking(self, seconds):
    """
    Sleeps and then polls the incoming sockets a few times with a very short timeout.

    Rationale as stated by the original author: during start up of ZMQ the incoming file descriptors become
    'ready for reading' while there is no message on the socket. This method prevents incoming sockets from
    reporting that they are ready for reading.

    :param int seconds: The number of seconds to give the other ZMQ thread to start up.
    """
    sleep(seconds)

    incoming_types = [zmq.PULL, zmq.REP]
    rounds = len(self.end_points) - 1

    for _ in range(rounds):
        # A fresh poller is built for every round.
        poller = zmq.Poller()
        for end_point in self.end_points.values():
            if end_point.type in incoming_types:
                poller.register(end_point, zmq.POLLIN)

        poller.poll(1)

# ----------------------------------------------------------------------------------------------------------------------
def test_poller_events(self):
    """Tornado poller implementation maps events correctly"""
    READ = ioloop.IOLoop.READ
    WRITE = ioloop.IOLoop.WRITE

    requester, replier = self.create_bound_pair(zmq.REQ, zmq.REP)
    poller = ioloop.ZMQPoller()

    # Nothing has been sent yet: neither socket reports an event.
    poller.register(requester, READ)
    poller.register(replier, READ)
    seen = dict(poller.poll(0))
    self.assertEqual(seen.get(replier), None)
    self.assertEqual(seen.get(requester), None)

    # Registered for WRITE: only the REQ side reports it.
    poller.register(requester, WRITE)
    poller.register(replier, WRITE)
    seen = dict(poller.poll(1))
    self.assertEqual(seen.get(requester), WRITE)
    self.assertEqual(seen.get(replier), None)

    # After a request is sent the REP side reports READ.
    poller.register(replier, READ)
    requester.send(b'hi')
    seen = dict(poller.poll(1))
    self.assertEqual(seen.get(replier), READ)
    self.assertEqual(seen.get(requester), None)
def test_monitor_connected(self):
    """Test connected monitoring socket."""
    address = "tcp://127.0.0.1:6667"
    # Monitor messages are compared against the bytes form of the address.
    address_bytes = b"tcp://127.0.0.1:6667"

    s_rep = self.context.socket(zmq.REP)
    s_req = self.context.socket(zmq.REQ)
    self.sockets.extend([s_rep, s_req])
    s_req.bind(address)

    # Monitor the REP socket through the socket returned by get_monitor_socket().
    monitor = s_rep.get_monitor_socket()
    monitor.linger = 0
    self.sockets.append(monitor)

    # Connecting the REP socket should produce monitor events.
    s_rep.connect(address)
    message = recv_monitor_message(monitor)
    if message['event'] == zmq.EVENT_CONNECT_DELAYED:
        self.assertEqual(message['endpoint'], address_bytes)
        # A delayed connect is followed by another event.
        message = recv_monitor_message(monitor)
    self.assertEqual(message['event'], zmq.EVENT_CONNECTED)
    self.assertEqual(message['endpoint'], address_bytes)
def test_single_socket_forwarder_connect(self):
    """A single-socket QUEUE device echoes a request when attached via connect_in and via connect_out."""
    version = zmq.zmq_version()
    if version in ('4.1.1', '4.0.6'):
        raise SkipTest("libzmq-%s broke single-socket devices" % version)

    # The same scenario is run once per connect method.
    scenarios = (
        ('connect_in', b'hello'),
        ('connect_out', b'hello again'),
    )
    for attach, payload in scenarios:
        device = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
        client = self.context.socket(zmq.REQ)
        port = client.bind_to_random_port('tcp://127.0.0.1')
        getattr(device, attach)('tcp://127.0.0.1:%i' % port)
        device.start()
        # Give the device thread time to start.
        time.sleep(.25)
        client.send(payload)
        self.assertEqual(payload, self.recv(client))
        del device
        client.close()
def connect(self):
    """Create a context and a bound REP socket, register the socket with a poller, and mark as connected.

    Raises:
        RuntimeError: if the created context or socket evaluates as false.
    """
    context = self.context = zmq.Context()
    if not context:
        raise RuntimeError('Failed to create ZMQ context!')

    rep_socket = self.socket = context.socket(zmq.REP)
    if not rep_socket:
        raise RuntimeError('Failed to create ZMQ socket!')
    rep_socket.bind(self.endpoint)

    # The poller watches the REP socket for incoming data.
    poller = self.poller = zmq.Poller()
    poller.register(rep_socket, zmq.POLLIN)

    self.is_connected = True
def __init__(self, repAddress, pubAddress):
    """Constructor: bind a REP socket and a PUB socket and prepare the worker thread."""
    super(RpcServer, self).__init__()

    # Function registry; starts empty.
    self.__functions = {}

    # One zmq context is shared by both sockets.
    context = zmq.Context()
    self.__context = context

    # REP socket, bound to repAddress.
    self.__socketREP = context.socket(zmq.REP)
    self.__socketREP.bind(repAddress)

    # PUB socket, bound to pubAddress.
    self.__socketPUB = context.socket(zmq.PUB)
    self.__socketPUB.bind(pubAddress)

    # Activity flag; starts False.
    self.__active = False

    # Thread that runs self.run; created here but not started.
    self.__thread = threading.Thread(target=self.run)

#----------------------------------------------------------------------
def create_server():
    """Bind two REP sockets (ports 5555 and 6666) and answer requests on them alternately, forever.

    If either socket cannot be created or bound (ZMQError), a notice is printed and the process exits.
    """
    context = zmq.Context()

    try:
        main_socket = context.socket(zmq.REP)
        main_socket.bind("tcp://*:5555")
        freeze_socket = context.socket(zmq.REP)
        freeze_socket.bind("tcp://*:6666")
    except zmq.error.ZMQError:
        print("JustDB already running, this is no error.")
        sys.exit()

    print("Successfully started \033[92mjustdb\033[0m")

    # Strict alternation: one request on the main socket, then one on the freeze socket.
    serving_order = (main_socket, freeze_socket)
    while True:  # pragma: no cover
        for sock in serving_order:
            sock.recv()
            sock.send(b"")
async def test_tcp_req_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq REQ socket exchanges one multipart request/reply with a zmq REP peer over TCP.

    Declared ``async def`` because the body uses ``async with`` and ``await``, which are a syntax error
    inside a plain ``def``.
    NOTE(review): presumably run by an asyncio-aware pytest plugin; confirm the decorator in the real file.
    """
    rep_socket = socket_factory.create(zmq.REP)
    connect_or_bind(rep_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side, handed to run_in_background: answer exactly one request.
        frames = rep_socket.recv_multipart()
        assert frames == [b'my', b'question']
        rep_socket.send_multipart([b'your', b'answer'])

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.REQ)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            # Each step is given a 1 second timeout.
            await asyncio.wait_for(
                socket.send_multipart([b'my', b'question']),
                1,
            )
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'your', b'answer']
async def test_tcp_rep_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq REP socket answers one multipart request from a zmq REQ peer over TCP.

    Declared ``async def`` because the body uses ``async with`` and ``await``, which are a syntax error
    inside a plain ``def``.
    NOTE(review): presumably run by an asyncio-aware pytest plugin; confirm the decorator in the real file.
    """
    req_socket = socket_factory.create(zmq.REQ)
    connect_or_bind(req_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side, handed to run_in_background: send one request and check the reply.
        req_socket.send_multipart([b'my', b'question'])
        frames = req_socket.recv_multipart()
        assert frames == [b'your', b'answer']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.REP)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            # Each step is given a 1 second timeout.
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'my', b'question']
            await asyncio.wait_for(
                socket.send_multipart([b'your', b'answer']),
                1,
            )
async def test_tcp_dealer_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq DEALER socket talks to a zmq REP peer over TCP using an explicit empty first frame.

    Declared ``async def`` because the body uses ``async with`` and ``await``, which are a syntax error
    inside a plain ``def``.
    NOTE(review): presumably run by an asyncio-aware pytest plugin; confirm the decorator in the real file.
    """
    rep_socket = socket_factory.create(zmq.REP)
    connect_or_bind(rep_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side, handed to run_in_background: the REP socket sees the frames without the empty one.
        frames = rep_socket.recv_multipart()
        assert frames == [b'my', b'question']
        rep_socket.send_multipart([b'your', b'answer'])

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.DEALER)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            # The DEALER side sends and receives the leading empty frame itself.
            await asyncio.wait_for(
                socket.send_multipart([b'', b'my', b'question']),
                1,
            )
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'', b'your', b'answer']
async def test_tcp_big_messages(event_loop, socket_factory, connect_or_bind):
    """An azmq REQ socket exchanges multipart messages with a 100000-byte frame with a zmq REP peer.

    Declared ``async def`` because the body uses ``async with`` and ``await``, which are a syntax error
    inside a plain ``def``.
    NOTE(review): presumably run by an asyncio-aware pytest plugin; confirm the decorator in the real file.
    """
    rep_socket = socket_factory.create(zmq.REP)
    connect_or_bind(rep_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side, handed to run_in_background: answer exactly one request.
        frames = rep_socket.recv_multipart()
        assert frames == [b'1' * 500, b'2' * 100000]
        rep_socket.send_multipart([b'3' * 500, b'4' * 100000])

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.REQ)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            # Each step is given a 1 second timeout.
            await asyncio.wait_for(
                socket.send_multipart([b'1' * 500, b'2' * 100000]),
                1,
            )
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'3' * 500, b'4' * 100000]
def test_single_socket_forwarder_connect(self):
    """A single-socket QUEUE device echoes a request when attached via connect_in and via connect_out."""
    # The same scenario is run once per connect method.
    scenarios = (
        ('connect_in', b'hello'),
        ('connect_out', b'hello again'),
    )
    for attach, payload in scenarios:
        device = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
        client = self.context.socket(zmq.REQ)
        port = client.bind_to_random_port('tcp://127.0.0.1')
        getattr(device, attach)('tcp://127.0.0.1:%i' % port)
        device.start()
        # Give the device thread time to start.
        time.sleep(.25)
        client.send(payload)
        self.assertEqual(payload, self.recv(client))
        del device
        client.close()
def listen_thread():
    """Serve a ZeroMQ REP socket and forward every JSON request to SocketIO clients.

    Each request is acknowledged with ``OK`` first. A request consisting of ``KILL`` closes the socket
    and ends the loop. Any other request is parsed as JSON and its ``msg`` and ``timestamp`` fields are
    emitted as a ``new_update`` event on the ``/browser`` namespace.
    """
    ctx = zmq.Context()
    # Reply - act as server
    socket = ctx.socket(zmq.REP)
    socket.bind('tcp://*:%s' % config.ZEROMQ_PORT)

    while True:
        request = socket.recv()
        print('ZEROMQ request -', request)

        # recv() yields bytes and send() requires bytes on Python 3, so both the acknowledgement
        # and the KILL comparison use bytes literals.
        socket.send(b'OK')
        if request == b'KILL':
            socket.close()
            print('Socket closed.')
            break

        with webapp.app_context():
            data = json.loads(request)
            print('Sending to SocketIO ...')
            socketio.emit(
                'new_update',
                data['msg'] + ' at ' + data['timestamp'],
                namespace='/browser',
            )
def __init__(self, callback, host=None, res_port=None, use_security=False):
    """Create a REP socket, optionally secure it, and connect it to ``tcp://host:res_port``.

    callback     -- stored on the instance as ``_callback``
    host         -- host to connect to; ``env.get_master_host()`` is used when None
    res_port     -- port to connect to; ``env.get_res_port()`` is used when None
    use_security -- when true, an Authenticator sets the server key on the socket
    """
    target_host = env.get_master_host() if host is None else host

    self._socket = zmq.Context().socket(zmq.REP)

    self._auth = None
    if use_security:
        authenticator = Authenticator.instance(env.get_server_public_key_dir())
        self._auth = authenticator
        authenticator.set_server_key(self._socket, env.get_server_secret_key_path())

    target_port = env.get_res_port() if res_port is None else res_port
    address = 'tcp://{host}:{port}'.format(host=target_host, port=target_port)
    # The REP socket connects out rather than binding.
    self._socket.connect(address)

    self._callback = callback
    self._thread = None
    self._lock = threading.Lock()
def __init__(self, name, priority, actor_context,
             endpoints, operation_function,
             operation_queue):
    """
    Create a server: a REP socket bound to every given endpoint.

    Keyword arguments:
    name - Name stored on the instance
    priority - Priority stored on the instance
    actor_context - ZMQ context used to create the REP socket
    endpoints - A list of endpoint strings to bind to
    operation_function - Operation function stored on the instance
    operation_queue - The operation queue object
    """
    # Plain attribute storage.
    self.name = name
    self.priority = priority
    self.endpoints = endpoints
    self.operation_function = operation_function
    self.operation_queue = operation_queue
    self.context = actor_context

    # One REP socket serves all endpoints.
    server_socket = self.context.socket(zmq.REP)
    self.server_socket = server_socket
    for address in self.endpoints:
        server_socket.bind(address)

    self.ready = True
    self.func_mutex = Lock()
async def run():
    """Serve "hello world" requests forever on a REP socket bound to ``Url``.

    Declared ``async def`` because the body awaits ``socket.recv()``, ``asyncio.sleep()`` and
    ``socket.send()``; ``await`` inside a plain ``def`` is a syntax error.
    """
    print("Getting ready for hello world client.  Ctrl-C to exit.\n")
    socket = Ctx.socket(zmq.REP)
    socket.bind(Url)
    while True:
        #  Wait for next request from client
        message = await socket.recv()
        print("Received request: {}".format(message))
        #  Do some "work"
        await asyncio.sleep(1)
        #  Send reply back to client: the request text followed by ", world"
        message = message.decode('utf-8')
        message = '{}, world'.format(message)
        message = message.encode('utf-8')
        print("Sending reply: {}".format(message))
        await socket.send(message)
def start(self):
    """Create and bind the ZAP socket"""
    zap = self.zap_socket = self.context.socket(zmq.REP)
    zap.linger = 1
    # ZAP handler endpoint (inproc transport).
    zap.bind("inproc://zeromq.zap.01")
def test_subclass(self):
    """subclasses can assign attributes"""
    class S(zmq.Socket):
        a = None

        def __init__(self, *args, **kwargs):
            # The attribute is assigned before the base initialiser runs.
            self.a = -1
            super(S, self).__init__(*args, **kwargs)

    sock = S(self.context, zmq.REP)
    self.sockets.append(sock)

    # Value set in __init__ is visible.
    self.assertEqual(sock.a, -1)

    # Reassignment is visible both directly and through a local copy.
    sock.a = 1
    self.assertEqual(sock.a, 1)
    current = sock.a
    self.assertEqual(current, 1)
def test_close_after_destroy(self):
    """s.close() after ctx.destroy() should be fine"""
    context = self.Context()
    sock = context.socket(zmq.REP)

    context.destroy()
    # reaper is not instantaneous
    time.sleep(1e-2)

    sock.close()
    self.assertTrue(sock.closed)
def test_many_sockets(self):
    """opening and closing many sockets shouldn't cause problems"""
    context = self.Context()

    for _ in range(16):
        batch = [context.socket(zmq.REP) for _ in range(65)]
        for sock in batch:
            sock.close()
        # give the reaper a chance
        time.sleep(1e-2)

    context.term()
def test_destroy(self):
    """Context.destroy should close sockets"""
    context = self.Context()
    opened = [context.socket(zmq.REP) for _ in range(65)]

    # close half of the sockets by hand; destroy() must handle the rest
    for sock in opened[::2]:
        sock.close()

    context.destroy()
    # reaper is not instantaneous
    time.sleep(1e-2)

    for sock in opened:
        self.assertTrue(sock.closed)
def test_term_thread(self):
    """ctx.term should not crash active threads (#139)"""
    context = self.Context()
    started = Event()
    started.clear()

    def block():
        # Worker: block in recv() until the context is terminated.
        sock = context.socket(zmq.REP)
        sock.bind_to_random_port('tcp://127.0.0.1')
        started.set()
        try:
            sock.recv()
        except zmq.ZMQError as err:
            self.assertEqual(err.errno, zmq.ETERM)
            return
        finally:
            sock.close()
        self.fail("recv should have been interrupted with ETERM")

    worker = Thread(target=block)
    worker.start()

    # Wait until the worker has bound its socket.
    started.wait(1)
    self.assertTrue(started.is_set(), "sync event never fired")
    time.sleep(0.01)

    context.term()
    worker.join(timeout=1)
    self.assertFalse(worker.is_alive(), "term should have interrupted s.recv()")
def test_basic(self):
    """A single message survives a REQ/REP ping-pong unchanged."""
    requester, replier = self.create_bound_pair(zmq.REQ, zmq.REP)

    sent = b'message 1'
    echoed = self.ping_pong(requester, replier, sent)
    self.assertEqual(sent, echoed)
def test_multiple(self):
    """Ten consecutive ping-pongs with messages of 0 to 9 spaces each come back unchanged."""
    requester, replier = self.create_bound_pair(zmq.REQ, zmq.REP)

    for length in range(10):
        sent = length * b' '
        echoed = self.ping_pong(requester, replier, sent)
        self.assertEqual(sent, echoed)
def test_bad_send_recv(self):
    """Out-of-order recv on REQ and send on REP raise EFSM; a normal ping-pong still works afterwards."""
    requester, replier = self.create_bound_pair(zmq.REQ, zmq.REP)

    if zmq.zmq_version() != '2.1.8':
        # this doesn't work on 2.1.8
        for copy_flag in (True, False):
            self.assertRaisesErrno(zmq.EFSM, requester.recv, copy=copy_flag)
            self.assertRaisesErrno(zmq.EFSM, replier.send, b'asdf', copy=copy_flag)

    # I have to have this or we die on an Abort trap.
    sent = b'asdf'
    echoed = self.ping_pong(requester, replier, sent)
    self.assertEqual(sent, echoed)
def test_json(self):
    """A JSON-serialisable object survives a REQ/REP ping-pong unchanged."""
    s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
    o = dict(a=10, b=list(range(10)))
    o2 = self.ping_pong_json(s1, s2, o)
    # Without this check the returned object was never inspected and the test asserted nothing.
    self.assertEqual(o, o2)
def test_large_msg(self):
    """A 10000-byte message survives ten consecutive REQ/REP ping-pongs unchanged."""
    requester, replier = self.create_bound_pair(zmq.REQ, zmq.REP)

    payload = 10000 * b'X'
    for _ in range(10):
        echoed = self.ping_pong(requester, replier, payload)
        self.assertEqual(payload, echoed)
def setUp(self):
    """Create a context, a REP socket, the IOLoop instance and a ZMQStream wrapping the socket."""
    context = self.context = zmq.Context()
    rep_socket = self.socket = context.socket(zmq.REP)
    self.loop = ioloop.IOLoop.instance()
    self.stream = zmqstream.ZMQStream(rep_socket)
def test_again(self):
    """A non-blocking recv with nothing to read raises Again with errno EAGAIN."""
    sock = self.context.socket(zmq.REP)

    # Checked once by exception class and once by errno.
    self.assertRaises(Again, sock.recv, zmq.NOBLOCK)
    self.assertRaisesErrno(zmq.EAGAIN, sock.recv, zmq.NOBLOCK)

    sock.close()
def atest_ctxterm(self):
    """recv on a socket whose context is being terminated raises ContextTerminated (errno ETERM).

    The ``atest_`` prefix keeps this out of test discovery by name.
    """
    s = self.context.socket(zmq.REP)
    # term() is run in its own thread while this thread calls recv().
    t = Thread(target=self.context.term)
    t.start()
    self.assertRaises(ContextTerminated, s.recv, zmq.NOBLOCK)
    # The errno constant is zmq.ETERM; this previously used zmq.TERM, which is not the errno.
    self.assertRaisesErrno(zmq.ETERM, s.recv, zmq.NOBLOCK)
    s.close()
    t.join()
def zap_handler(self):
    """Answer a single ZAP authentication request on ``inproc://zeromq.zap.01``.

    CURVE and NULL requests are accepted; PLAIN requests are accepted only when the credentials
    equal USER / PASS. Everything else gets a 400 reply. The socket is always closed at the end.
    """
    zap = self.context.socket(zmq.REP)
    zap.bind("inproc://zeromq.zap.01")
    try:
        frames = self.recv_multipart(zap)

        version, sequence, domain, address, identity, mechanism = frames[:6]
        if mechanism == b'PLAIN':
            username, password = frames[6:]
        elif mechanism == b'CURVE':
            key = frames[6]

        self.assertEqual(version, b"1.0")
        self.assertEqual(identity, b"IDENT")

        # username/password only exist for PLAIN, so they are only read in that branch.
        if mechanism == b'PLAIN':
            accepted = username == USER and password == PASS
        else:
            accepted = mechanism in (b'CURVE', b'NULL')

        if accepted:
            verdict = [
                b"200",
                b"OK",
                b"anonymous",
                b"\5Hello\0\0\0\5World",
            ]
        else:
            verdict = [
                b"400",
                b"Invalid username or password",
                b"",
                b"",
            ]

        zap.send_multipart([version, sequence] + verdict)
    finally:
        zap.close()
def test_monitor(self):
    """Test monitoring interface for sockets."""
    address = "tcp://127.0.0.1:6666"
    # Monitor messages are compared against the bytes form of the address.
    address_bytes = b"tcp://127.0.0.1:6666"
    monitor_address = "inproc://monitor.rep"

    s_rep = self.context.socket(zmq.REP)
    s_req = self.context.socket(zmq.REQ)
    self.sockets.extend([s_rep, s_req])
    s_req.bind(address)

    # try monitoring the REP socket
    s_rep.monitor(monitor_address, zmq.EVENT_ALL)

    # create listening socket for monitor
    listener = self.context.socket(zmq.PAIR)
    self.sockets.append(listener)
    listener.connect(monitor_address)
    listener.linger = 0

    # test receive event for connect event
    s_rep.connect(address)
    message = recv_monitor_message(listener)
    if message['event'] == zmq.EVENT_CONNECT_DELAYED:
        self.assertEqual(message['endpoint'], address_bytes)
        # test receive event for connected event
        message = recv_monitor_message(listener)
    self.assertEqual(message['event'], zmq.EVENT_CONNECTED)
    self.assertEqual(message['endpoint'], address_bytes)

    # test monitor can be disabled.
    s_rep.disable_monitor()
    message = recv_monitor_message(listener)
    self.assertEqual(message['event'], zmq.EVENT_MONITOR_STOPPED)
def test_single_socket_forwarder_bind(self):
    """A single-socket QUEUE device echoes a request when attached via bind_in and via bind_out.

    The second scenario previously called ``bind_in`` again, so ``bind_out`` was never exercised.
    """
    if zmq.zmq_version() in ('4.1.1', '4.0.6'):
        raise SkipTest("libzmq-%s broke single-socket devices" % zmq.zmq_version())

    # The same scenario is run once per bind method.
    scenarios = (
        ('bind_in', b'hello'),
        ('bind_out', b'hello again'),
    )
    for attach, msg in scenarios:
        dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)

        # select random port: bind a throwaway socket, note its port, release it again
        binder = self.context.socket(zmq.REQ)
        port = binder.bind_to_random_port('tcp://127.0.0.1')
        binder.close()
        time.sleep(0.1)

        req = self.context.socket(zmq.REQ)
        req.connect('tcp://127.0.0.1:%i' % port)
        getattr(dev, attach)('tcp://127.0.0.1:%i' % port)
        dev.start()
        # Give the device thread time to start.
        time.sleep(.25)

        req.send(msg)
        self.assertEqual(msg, self.recv(req))
        del dev
        req.close()