我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用zmq.PAIR。
def main():
    """Run a toy learner that talks to its peer over a zmq PAIR socket.

    Connects to ``tcp://localhost:5556``, sends the ``hello`` handshake and
    then loops forever: each round it receives a reward frame and an input
    frame, prints both, and answers with a single ``'0'`` or ``'1'``.
    """
    port = "5556"
    context = zmq.Context()
    socket = context.socket(zmq.PAIR)
    socket.connect("tcp://localhost:%s" % port)
    socket.send_string(str('hello'))

    message = '00101110'
    cnt = 0
    while True:
        reward = socket.recv()  # 1 or 0, or '-1' for None
        print(reward)
        msg_in = socket.recv()
        print(msg_in)

        # think...
        msg_out = str(random.getrandbits(1) if cnt % 7 == 0 else 1)
        if cnt % 2 == 0:
            # every other round, replay the fixed bit pattern instead
            msg_out = str(message[cnt % 8])

        # msg_out is text, and Socket.send() raises TypeError for unicode
        # strings, so encode it the same way the handshake is sent.
        socket.send_string(msg_out)
        cnt = cnt + 1
def __init__(self, cmd, port, address=None):
    """Bind a PAIR socket, optionally spawn the learner, and await its handshake.

    :param cmd: command line used to launch the learner process; the port
        is appended as its last argument. ``None`` skips the launch.
    :param port: TCP port to bind; ``None`` selects 5556.
    :param address: interface to bind on; ``None`` selects ``'*'``.
    :raises ImportError: when zmq cannot be imported.
    :raises ValueError: when ``port`` lies outside 1..65535.
    """
    try:
        import zmq
    except ImportError:
        raise ImportError("Must have zeromq for remote learner.")

    address = '*' if address is None else address
    if port is None:
        port = 5556
    elif not 1 <= int(port) <= 65535:
        raise ValueError("Invalid port number: %s" % port)

    self.context = zmq.Context()
    self.socket = self.context.socket(zmq.PAIR)
    endpoint = "tcp://%s:%s" % (address, port)
    self.socket.bind(endpoint)

    # launch learner
    if cmd is not None:
        learner_argv = ' '.join((cmd, str(port))).split()
        subprocess.Popen(learner_argv)

    # handshake: the first frame received must be the text 'hello'
    handshake_in = self.socket.recv().decode('utf-8')
    assert handshake_in == 'hello'

# send to learner, and get response;
def test_send_unicode(self):
    """Unicode text must be refused by send() and accepted by send_unicode()."""
    a, b = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    self.sockets.extend([a, b])

    text = "ç?§"
    if str is not unicode:
        # the literal is a byte string on this interpreter; decode it first
        text = text.decode('utf8')

    # plain send() rejects unicode whatever the copy flag is
    for copy_flag in (False, True):
        self.assertRaises(TypeError, a.send, text, copy=copy_flag)

    # default encoding: the raw frame is the utf8 encoding of the text
    a.send_unicode(text)
    raw = b.recv()
    self.assertEqual(raw, text.encode('utf8'))
    self.assertEqual(raw.decode('utf8'), text)

    # explicit encoding round-trips through recv_unicode
    a.send_unicode(text, encoding='utf16')
    decoded = b.recv_unicode(encoding='utf16')
    self.assertEqual(decoded, text)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
    """Start a ThreadMonitoredQueue wired to three locally bound sockets.

    Two PAIR sockets and one SUB socket are bound to random ports on
    127.0.0.1; the device connects its in/out/monitor sides to them.

    :param mon_sub: subscription prefix set on the monitor SUB socket
    :param in_prefix: prefix handed to the device for the inbound side
    :param out_prefix: prefix handed to the device for the outbound side
    :returns: the tuple ``(alice, bob, mon)``
    """
    self.device = devices.ThreadMonitoredQueue(
        zmq.PAIR, zmq.PAIR, zmq.PUB, in_prefix, out_prefix)

    alice = self.context.socket(zmq.PAIR)
    bob = self.context.socket(zmq.PAIR)
    mon = self.context.socket(zmq.SUB)
    endpoints = (alice, bob, mon)

    ports = [sock.bind_to_random_port('tcp://127.0.0.1') for sock in endpoints]
    mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

    attachers = (self.device.connect_in,
                 self.device.connect_out,
                 self.device.connect_mon)
    for attach, port in zip(attachers, ports):
        attach("tcp://127.0.0.1:%i" % port)

    self.device.start()
    time.sleep(.2)
    try:
        # this is currently necessary to ensure no dropped monitor messages
        # see LIBZMQ-248 for more info
        mon.recv_multipart(zmq.NOBLOCK)
    except zmq.ZMQError:
        pass

    self.sockets.extend(list(endpoints))
    return alice, bob, mon
def test_multisend(self):
    """A Frame's bytes must survive being sent several times."""
    a, b = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    payload = b"message"
    frame = zmq.Frame(payload)
    self.assertEqual(payload, frame.bytes)

    # two non-copying sends followed by two copying sends
    copy_flags = (False, False, True, True)
    for copy_flag in copy_flags:
        a.send(frame, copy=copy_flag)
        time.sleep(0.1)
        self.assertEqual(payload, frame.bytes)

    # every send must arrive intact on the peer
    for _ in copy_flags:
        received = b.recv()
        self.assertEqual(payload, received)
    self.assertEqual(payload, frame.bytes)
def test_noncopying_recv(self):
    """Buffers taken from a non-copying recv must not be clobbered later."""
    zeros = b'\0' * 64
    receiver, sender = self.create_bound_pair(zmq.PAIR, zmq.PAIR)

    # try a few times
    for _attempt in range(32):
        sender.send(zeros, copy=False)
        frame = receiver.recv(copy=False)
        frame_bytes = frame.bytes
        frame_buf = frame.buffer
        # drop the frame itself; only its bytes and buffer stay referenced
        del frame

        for step in range(5):
            filler = b'\xff' * (40 + step * 10)
            sender.send(filler, copy=False)
            later = receiver.recv(copy=False)
            if view.__name__ == 'buffer':
                snapshot = bytes(frame_buf)
            else:
                snapshot = frame_buf.tobytes()
            # the earlier message must still read back as all zeros
            self.assertEqual(snapshot, zeros)
            self.assertEqual(frame_bytes, zeros)
            self.assertEqual(later.bytes, filler)
def test_timeout(self):
    """Poller.poll must interpret its timeout in milliseconds."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    poller = self.Poller()
    poller.register(s1, zmq.POLLIN)

    def seconds_spent_polling(timeout):
        # wall-clock duration of a single poll() call with this timeout
        began = time.time()
        poller.poll(timeout)
        return time.time() - began

    spent = seconds_spent_polling(.005)
    self.assertTrue(spent < 0.1)

    spent = seconds_spent_polling(5)
    self.assertTrue(spent < 0.1)
    self.assertTrue(spent > .001)

    spent = seconds_spent_polling(500)
    self.assertTrue(spent < 1)
    self.assertTrue(spent > 0.1)
def test_multiple(self):
    """Queue ten messages in each direction, then read them back in order."""
    s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    # message n is the payload unit ``x`` repeated n times
    payloads = [n * x for n in range(10)]

    for sender in (s1, s2):
        for payload in payloads:
            sender.send(payload)

    for receiver in (s1, s2):
        for expected in payloads:
            msg = receiver.recv()
            self.assertEqual(msg, expected)
async def test_tcp_pair_socket(event_loop, socket_factory, connect_or_bind):
    """Exchange one multipart message each way over a TCP PAIR connection.

    A socket from ``socket_factory`` is serviced by ``run`` in the
    background while an ``azmq`` PAIR socket talks to it from the event
    loop.

    The body uses ``async with`` and ``await``, so this has to be a
    coroutine function; as a plain ``def`` it is a SyntaxError.
    NOTE(review): running it needs an asyncio-aware test runner (the
    ``event_loop`` argument suggests pytest-asyncio) - confirm how tests
    are marked in this project.
    """
    pair_socket = socket_factory.create(zmq.PAIR)
    connect_or_bind(pair_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # peer side: wait up to 1000 ms for the greeting, then answer it
        assert pair_socket.poll(1000) == zmq.POLLIN
        message = pair_socket.recv_multipart()
        assert message == [b'hello', b'world']
        pair_socket.send_multipart([b'my', b'message'])

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.PAIR)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            await socket.send_multipart([b'hello', b'world'])
            # bound the wait so a lost reply fails the test instead of hanging
            message = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert message == [b'my', b'message']
def ensure_and_bind(self, socket_name, socket_type, address, polling_mechanism):
    """Make sure a named socket exists, is *bound* to ``address`` and is
    registered with the given polling mechanism.

    Shorthand for calling ``.get_or_create()``, ``.bind()`` and then
    ``.engage()`` in that order.

    :param socket_name: the socket name
    :param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...)
    :param address: a valid zeromq address (i.e: inproc://whatevs)
    :param polling_mechanism: ``zmq.POLLIN``, ``zmq.POLLOUT`` or ``zmq.POLLIN | zmq.POLLOUT``
    :returns: the socket returned by ``.bind()``
    """
    # create/register first; the socket returned here is not the one handed back
    self.get_or_create(socket_name, socket_type, polling_mechanism)
    bound_socket = self.bind(socket_name, address, polling_mechanism)
    self.engage()
    return bound_socket
def get_or_create(self, name, socket_type, polling_mechanism):
    """Return the socket called ``name``, creating it first when it is
    missing, and register it with the given polling mechanism
    (POLLIN, POLLOUT or both).

    :param name: the socket name
    :param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...)
    :param polling_mechanism: one of (``zmq.POLLIN``, ``zmq.POLLOUT``, ``zmq.POLLIN | zmq.POLLOUT``)
    :returns: the socket itself
    """
    already_known = name in self.sockets
    if not already_known:
        self.create(name, socket_type)

    # registration happens on every call, not only on first creation
    found = self.get_by_name(name)
    self.register_socket(found, polling_mechanism)
    return found
def test_recv_spawned_before_send_is_non_blocking(self):
    """A recv() spawned ahead of the matching send() still gets the message."""
    req, rep, port = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
    eventlet.sleep()

    received = {'res': None}
    finished = eventlet.Event()

    def receive_then_signal():
        # runs in its own green thread: store the frame, then wake the test
        received['res'] = rep.recv()
        finished.send('done')

    eventlet.spawn(receive_then_signal)
    req.send(b'test')
    finished.wait()
    self.assertEqual(received['res'], b'test')
def main():
    """Run a toy learner that talks to its peer over a zmq PAIR socket.

    Connects to ``tcp://localhost:5556``, sends the ``hello`` handshake and
    then loops forever: each round it receives a reward frame and an input
    frame, prints both, and answers with a single ``'0'`` or ``'1'``.
    """
    import zmq
    import random

    port = "5556"
    context = zmq.Context()
    socket = context.socket(zmq.PAIR)
    socket.connect("tcp://localhost:%s" % port)
    socket.send_string(str('hello'))

    message = '00101110'
    cnt = 0
    while True:
        reward = socket.recv()  # 1 or 0, or '-1' for None
        print(reward)
        msg_in = socket.recv()
        print(msg_in)

        # think...
        msg_out = str(random.getrandbits(1) if cnt % 7 == 0 else 1)
        if cnt % 2 == 0:
            # every other round, replay the fixed bit pattern instead
            msg_out = str(message[cnt % 8])

        # msg_out is text, and Socket.send() raises TypeError for unicode
        # strings, so encode it the same way the handshake is sent.
        socket.send_string(msg_out)
        cnt = cnt + 1
def __init__(self, cmd, port):
    """Bind a PAIR socket, launch the learner process and await its handshake.

    :param cmd: command line used to launch the learner process; the port
        is appended as its last argument.
    :param port: TCP port to bind on all interfaces; ``None`` selects 5556.
    :raises ImportError: when zmq cannot be imported.
    """
    try:
        import zmq
    except ImportError:
        raise ImportError("Must have zeromq for remote learner.")

    self.port = port if port is not None else 5556
    self.context = zmq.Context()
    self.socket = self.context.socket(zmq.PAIR)
    # Bind with the resolved port: using the raw argument would try to
    # bind "tcp://*:None" whenever the default is requested.
    self.socket.bind("tcp://*:%s" % self.port)

    # launch learner
    subprocess.Popen((cmd + ' ' + str(self.port)).split())

    # recv() yields bytes; decode before comparing, otherwise the check
    # can never succeed on Python 3.
    handshake_in = self.socket.recv().decode('utf-8')
    assert handshake_in == 'hello'  # handshake

# send to learner, and get response;
def zcreate_pipe(ctx, hwm=1000):
    """Create a connected pair of PAIR sockets over a random inproc endpoint.

    :param ctx: context handed to both ``ZSocket`` constructors
    :param hwm: high-water mark applied to both sockets via ``set_hwm``
    :returns: the tuple ``(frontend, backend)``; frontend is the bound
        side and backend is connected to it
    """
    backend = zsocket.ZSocket(ctx, zmq.PAIR)
    frontend = zsocket.ZSocket(ctx, zmq.PAIR)
    backend.set_hwm(hwm)
    frontend.set_hwm(hwm)
    # close immediately on shutdown
    backend.setsockopt(zmq.LINGER, 0)
    frontend.setsockopt(zmq.LINGER, 0)

    while True:
        # randrange keeps each component within the four hex digits the
        # format asks for (randint's upper bound is inclusive).
        endpoint = "inproc://zactor-%04x-%04x\n" % (
            random.randrange(0x10000), random.randrange(0x10000))
        try:
            frontend.bind(endpoint)
        except Exception:
            # Bind failed: retry under a fresh name. Catching Exception
            # rather than everything lets KeyboardInterrupt and
            # SystemExit break out of the retry loop.
            continue
        break

    backend.connect(endpoint)
    return (frontend, backend)
def start(self):
    """Start the authentication thread and wait up to 10 s for it to come up.

    :raises RuntimeError: on Python 2.7+ when the thread has not signalled
        ``started`` within the timeout.
    """
    # create a socket to communicate with auth thread.
    self.pipe = self.context.socket(zmq.PAIR)
    self.pipe.linger = 1
    self.pipe.bind(self.pipe_endpoint)

    shared_options = dict(encoding=self.encoding, log=self.log)
    authenticator = MultiZapAuthenticator(self.context, **shared_options)
    self.thread = AuthenticationThread(
        self.context,
        self.pipe_endpoint,
        authenticator=authenticator,
        **shared_options
    )
    self.thread.start()

    # Event.wait: changed in version 2.7; previously the method always
    # returned None, so its result only means something on 2.7 and later.
    came_up = self.thread.started.wait(timeout=10)
    if sys.version_info >= (2, 7) and not came_up:
        raise RuntimeError("Authenticator thread failed to start")