我们从Python开源项目中，提取了以下27个代码示例，用于说明如何使用zmq.zmq_version()。
def test_single_socket_forwarder_connect(self):
    # Round-trips a message through a ThreadDevice(zmq.QUEUE, zmq.REP, -1)
    # that connects to a REQ socket bound on a random local TCP port.
    # The scenario runs twice: once attaching the device with connect_in,
    # once with connect_out. Skipped on the libzmq releases listed below.
    libzmq = zmq.zmq_version()
    if libzmq in ('4.1.1', '4.0.6'):
        raise SkipTest("libzmq-%s broke single-socket devices" % libzmq)

    rounds = (
        ('connect_in', b'hello'),
        ('connect_out', b'hello again'),
    )
    for attach_name, payload in rounds:
        device = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
        client = self.context.socket(zmq.REQ)
        port = client.bind_to_random_port('tcp://127.0.0.1')
        endpoint = 'tcp://127.0.0.1:%i' % port

        # Attach the device to the client's endpoint, then launch it.
        getattr(device, attach_name)(endpoint)
        device.start()
        # Presumably gives the device thread time to come up -- confirm.
        time.sleep(.25)

        client.send(payload)
        self.assertEqual(payload, self.recv(client))

        # Drop our reference to the device before closing the client socket,
        # in the same order as the original sequence.
        del device
        client.close()
def test_bad_send_recv(self):
    # Checks that out-of-turn recv/send calls on a REQ/REP pair raise EFSM,
    # then confirms a regular ping-pong exchange still works.
    sock_a, sock_b = self.create_bound_pair(zmq.REQ, zmq.REP)

    on_2_1_8 = zmq.zmq_version() == '2.1.8'
    if not on_2_1_8:
        # this doesn't work on 2.1.8
        for copy_flag in (True, False):
            self.assertRaisesErrno(zmq.EFSM, sock_a.recv, copy=copy_flag)
            self.assertRaisesErrno(zmq.EFSM, sock_b.send, b'asdf', copy=copy_flag)

    # I have to have this or we die on an Abort trap.
    payload = b'asdf'
    self.assertEqual(payload, self.ping_pong(sock_a, sock_b, payload))
def test_single_socket_forwarder_bind(self):
    # Round-trips a message through a ThreadDevice(zmq.QUEUE, zmq.REP, -1)
    # that binds a TCP port a REQ client connects to. The scenario runs
    # twice, with a different payload each time. Skipped on the libzmq
    # releases listed below.
    #
    # NOTE(review): both rounds attach the device with bind_in, whereas
    # test_single_socket_forwarder_connect uses connect_in and then
    # connect_out. If the second round was meant to use bind_out, this is
    # a copy-paste slip -- confirm before changing.
    libzmq = zmq.zmq_version()
    if libzmq in ('4.1.1', '4.0.6'):
        raise SkipTest("libzmq-%s broke single-socket devices" % libzmq)

    for payload in (b'hello', b'hello again'):
        device = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)

        # select random port: bind a throwaway socket, note the port it
        # got, and close it again so the device can bind that port.
        probe = self.context.socket(zmq.REQ)
        port = probe.bind_to_random_port('tcp://127.0.0.1')
        probe.close()
        time.sleep(0.1)

        endpoint = 'tcp://127.0.0.1:%i' % port
        client = self.context.socket(zmq.REQ)
        client.connect(endpoint)
        device.bind_in(endpoint)
        device.start()
        # Presumably gives the device thread time to come up -- confirm.
        time.sleep(.25)

        client.send(payload)
        self.assertEqual(payload, self.recv(client))

        # Drop our reference to the device before closing the client socket,
        # in the same order as the original sequence.
        del device
        client.close()
def test_zmq_version(self):
    # zmq.zmq_version() must hand back a str instance.
    self.assertTrue(isinstance(zmq.zmq_version(), str))
def get_monitor_socket(self, events=None, addr=None):
    """Return a connected PAIR socket ready to receive the event notifications.

    .. versionadded:: libzmq-4.0
    .. versionadded:: 14.0

    Parameters
    ----------
    events : bitfield (int) [default: zmq.EVENT_ALL]
        The bitmask defining which events are wanted.
    addr : string [default: None]
        The optional endpoint for the monitoring sockets. When omitted,
        an ``inproc://monitor.s-<FD>`` endpoint is derived from ``self.FD``.

    Returns
    -------
    socket : (PAIR)
        The socket is already connected and ready to receive messages.

    Raises
    ------
    NotImplementedError
        If the linked libzmq is older than version 4.
    """
    # safe-guard, method only available on libzmq >= 4
    if zmq.zmq_version_info() < (4,):
        raise NotImplementedError(
            "get_monitor_socket requires libzmq >= 4, have %s" % zmq.zmq_version()
        )

    if addr is None:
        # create endpoint name from internal fd
        addr = "inproc://monitor.s-%d" % self.FD
    if events is None:
        # use all events
        events = zmq.EVENT_ALL

    # attach monitoring socket
    self.monitor(addr, events)

    # create new PAIR socket and connect it
    ret = self.context.socket(zmq.PAIR)
    try:
        ret.connect(addr)
    except Exception:
        # The caller never receives this socket if connect() fails, so
        # close it here instead of leaving it open on the context.
        ret.close()
        raise
    return ret
def __init__(self, min_version, msg='Feature'):
    """Record the required version, a label, and the libzmq version in use.

    The libzmq version string is looked up lazily: the module-level
    ``_zmq_version`` is filled from ``zmq.zmq_version()`` the first time
    it is found to be ``None`` and reused afterwards.

    Parameters
    ----------
    min_version
        Stored unchanged as ``self.min_version``.
    msg : default 'Feature'
        Stored unchanged as ``self.msg``.
    """
    global _zmq_version
    if _zmq_version is None:
        # Imported here rather than at module level; presumably to avoid
        # a circular import with the zmq package -- confirm.
        from zmq import zmq_version as _query_libzmq_version
        _zmq_version = _query_libzmq_version()

    self.version = _zmq_version
    self.min_version = min_version
    self.msg = msg
def test_zmq(pyi_builder):
    """Pass a small zmq smoke-test script to ``pyi_builder.test_source``.

    The script imports ``zmq``, prints the binding version and the libzmq
    version, then imports ``zmq.utils.strtypes``.

    The script is written one statement per line. Collapsed onto a single
    line it is not valid Python, and its ``#`` comment would swallow the
    final import.

    Parameters
    ----------
    pyi_builder
        Object exposing ``test_source(source)``; presumably PyInstaller's
        builder fixture -- confirm against the test suite's conftest.
    """
    pyi_builder.test_source(
        """
        import zmq
        print(zmq.__version__)
        print(zmq.zmq_version())
        # This is a problematic module and might cause some issues.
        import zmq.utils.strtypes
        """)