我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zmq.PUB。
def __init__(self, opts=None): if opts is None: self.opts = self.process_config(CONFIG_LOCATION) else: self.opts = opts self.ctx = zmq.Context() self.pub_socket = self.ctx.socket(zmq.PUB) self.pub_socket.bind('tcp://127.0.0.1:2000') self.loop = zmq.eventloop.IOLoop.instance() self.pub_stream = zmq.eventloop.zmqstream.ZMQStream(self.pub_socket, self.loop) # Now create PULL socket over IPC to listen to reactor self.pull_socket = self.ctx.socket(zmq.PULL) self.pull_socket.bind('ipc:///tmp/reactor.ipc') self.pull_stream = zmq.eventloop.zmqstream.ZMQStream(self.pull_socket, self.loop) self.pull_stream.on_recv(self.republish)
def test_unicode_sockopts(self): """test setting/getting sockopts with unicode strings""" topic = "tést" if str is not unicode: topic = topic.decode('utf8') p,s = self.create_bound_pair(zmq.PUB, zmq.SUB) self.assertEqual(s.send_unicode, s.send_unicode) self.assertEqual(p.recv_unicode, p.recv_unicode) self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic) self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic) s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16') self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic) s.setsockopt_unicode(zmq.SUBSCRIBE, topic) self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY) self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE) identb = s.getsockopt(zmq.IDENTITY) identu = identb.decode('utf16') identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16') self.assertEqual(identu, identu2) time.sleep(0.1) # wait for connection/subscription p.send_unicode(topic,zmq.SNDMORE) p.send_unicode(topic*2, encoding='latin-1') self.assertEqual(topic, s.recv_unicode()) self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
def test_hwm(self): zmq3 = zmq.zmq_version_info()[0] >= 3 for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER): s = self.context.socket(stype) s.hwm = 100 self.assertEqual(s.hwm, 100) if zmq3: try: self.assertEqual(s.sndhwm, 100) except AttributeError: pass try: self.assertEqual(s.rcvhwm, 100) except AttributeError: pass s.close()
def test_shadow(self): ctx = self.Context() ctx2 = self.Context.shadow(ctx.underlying) self.assertEqual(ctx.underlying, ctx2.underlying) s = ctx.socket(zmq.PUB) s.close() del ctx2 self.assertFalse(ctx.closed) s = ctx.socket(zmq.PUB) ctx2 = self.Context.shadow(ctx.underlying) s2 = ctx2.socket(zmq.PUB) s.close() s2.close() ctx.term() self.assertRaisesErrno(zmq.EFAULT, ctx2.socket, zmq.PUB) del ctx2
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'): self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB, in_prefix, out_prefix) alice = self.context.socket(zmq.PAIR) bob = self.context.socket(zmq.PAIR) mon = self.context.socket(zmq.SUB) aport = alice.bind_to_random_port('tcp://127.0.0.1') bport = bob.bind_to_random_port('tcp://127.0.0.1') mport = mon.bind_to_random_port('tcp://127.0.0.1') mon.setsockopt(zmq.SUBSCRIBE, mon_sub) self.device.connect_in("tcp://127.0.0.1:%i"%aport) self.device.connect_out("tcp://127.0.0.1:%i"%bport) self.device.connect_mon("tcp://127.0.0.1:%i"%mport) self.device.start() time.sleep(.2) try: # this is currenlty necessary to ensure no dropped monitor messages # see LIBZMQ-248 for more info mon.recv_multipart(zmq.NOBLOCK) except zmq.ZMQError: pass self.sockets.extend([alice, bob, mon]) return alice, bob, mon
def compose_message(message: bytes, topic: bytes, serializer: AgentAddressSerializer) -> bytes: """ Compose a message and leave it ready to be sent through a socket. This is used in PUB-SUB patterns to combine the topic and the message in a single bytes buffer. Parameters ---------- message Message to be composed. topic Topic to combine the message with. serializer Serialization for the message part. Returns ------- The bytes representation of the final message to be sent. """ if serializer.requires_separator: return topic + TOPIC_SEPARATOR + message return topic + message
def test_agentchannel_sync_pub(): """ Test basic SYNC_PUB AgentChannel operations: initialization, equivalence and basic methods. """ sender = AgentAddress('ipc', 'addr0', 'PUB', 'server', 'pickle') receiver = AgentAddress('ipc', 'addr0', 'PULL', 'server', 'pickle') channel = AgentChannel('SYNC_PUB', sender=sender, receiver=receiver) # Equivalence assert channel == AgentChannel('SYNC_PUB', sender=sender, receiver=receiver) assert not channel == 'foo' assert channel != 'foo' # Basic methods assert channel.twin() == AgentChannel('SYNC_SUB', sender=receiver.twin(), receiver=sender.twin()) # Other attributes assert hasattr(channel, 'uuid') assert channel.transport == 'ipc' assert channel.serializer == 'pickle'
def indicators_create(self, data): if isinstance(data, dict): data = Indicator(**data) if isinstance(data, Indicator): data = [data] if self.socket_type == "PUB": for i in data: self.socket.send_multipart([self.topic, str(i)]) return if self.socket_type == 'PUSH_ZYRE_GATEWAY': for i in data: self.socket.send_multipart(['PUB', self.topic, str(i)]) return for i in data: self.socket.send(str(i))
def __init__(self, repAddress, pubAddress): """Constructor""" super(RpcServer, self).__init__() # ??????????key?????value????? self.__functions = {} # zmq???? self.__context = zmq.Context() self.__socketREP = self.__context.socket(zmq.REP) # ????socket self.__socketREP.bind(repAddress) self.__socketPUB = self.__context.socket(zmq.PUB) # ????socket self.__socketPUB.bind(pubAddress) # ?????? self.__active = False # ???????? self.__thread = threading.Thread(target=self.run) # ???????? #----------------------------------------------------------------------
def run(self): self._loop = zmq.asyncio.ZMQEventLoop() asyncio.set_event_loop(self._loop) self.context = zmq.asyncio.Context() self.status_sock = self.context.socket(zmq.ROUTER) self.data_sock = self.context.socket(zmq.PUB) self.status_sock.bind("tcp://*:%s" % self.status_port) self.data_sock.bind("tcp://*:%s" % self.data_port) self.poller = zmq.asyncio.Poller() self.poller.register(self.status_sock, zmq.POLLIN) self._loop.create_task(self.poll_sockets()) try: self._loop.run_forever() finally: self.status_sock.close() self.data_sock.close() self.context.destroy()
def test_pub(self): """Publish log messages. bind() to PUB socket.""" # pylint: disable=E1101 context = zmq.Context() pub = context.socket(zmq.PUB) try: pub.bind('tcp://*:{}'.format(self.sub_port)) except zmq.ZMQError as error: print(error) time.sleep(0.1) send_count = self.send_count for i in range(send_count): pub.send_string('hi there {}'.format(i)) time.sleep(1e-5) sys.stdout.flush() # Wait for the watcher thread to exit. while self.watcher.isAlive(): self.watcher.join(timeout=1e-5) pub.close() context.term()
def test_pub(self): """Publish log messages. connect() to PUB socket.""" # pylint: disable=E1101 context = zmq.Context() pub = context.socket(zmq.PUB) try: _address = 'tcp://{}:{}'.format(self.sub_host, self.sub_port) pub.connect(_address) except zmq.ZMQError as error: print('ERROR:', error) time.sleep(0.1) send_count = self.send_count for i in range(send_count): pub.send_string('hi there {}'.format(i)) time.sleep(1e-5) # Wait for the watcher thread to exit while self.watcher.isAlive(): self.watcher.join(timeout=1e-5) pub.close() context.term()
def to(cls, channel, host='127.0.0.1', port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, level=logging.NOTSET): """Convenience class method to create a ZmqLoghandler and connect to a ZMQ subscriber. Args: channel (string): Logging channel name. This is used to build a ZMQ topic. host (string): Hostname / ip address of the subscriber to publish to. port (int, string): Port on which to publish messages. level (int): Logging level """ context = zmq.Context() publisher = context.socket(zmq.PUB) address = 'tcp://{}:{}'.format(host, port) publisher.connect(address) time.sleep(0.1) # This sleep hopefully fixes the silent joiner problem. return cls(channel, publisher, level=level)
def test_init_socket(self): pub,sub = self.create_bound_pair(zmq.PUB, zmq.SUB) logger = self.logger handler = handlers.PUBHandler(pub) handler.setLevel(logging.DEBUG) handler.root_topic = self.topic logger.addHandler(handler) self.assertTrue(handler.socket is pub) self.assertTrue(handler.ctx is pub.context) self.assertTrue(handler.ctx is self.context) sub.setsockopt(zmq.SUBSCRIBE, b(self.topic)) import time; time.sleep(0.1) msg1 = 'message' logger.info(msg1) (topic, msg2) = sub.recv_multipart() self.assertEqual(topic, b'zmq.INFO') self.assertEqual(msg2, b(msg1)+b'\n') logger.removeHandler(handler)
def __init__(self, port=PORT): context = zmq.Context() self.port = port self.socket = context.socket(zmq.PUB) self.socket.bind("tcp://*:%d" % self.port) rlogger.info("Listening on port %d", self.port)
def send(output): zmq_ctx = zmq.Context() c = zmq_ctx.socket(zmq.PUB) c.connect(output) while True: frame = (output).encode() c.send(frame, zmq.NOBLOCK) time.sleep(0.1)
def connect(self): logger.debug('Connecting output to {}'.format(self.output)) c = self.zmq_ctx.socket(zmq.PUB) c.connect(self.output) self.connected = True return c
def start_broker(): global global_socket context = zmq.Context() global_socket = context.socket(zmq.PUB) global_socket.bind("tcp://*:%s"%config.getint('general','revocation_notifier_port'))
def __init__(self, interface_or_socket, context=None): logging.Handler.__init__(self) if isinstance(interface_or_socket, zmq.Socket): self.socket = interface_or_socket self.ctx = self.socket.context else: self.ctx = context or zmq.Context() self.socket = self.ctx.socket(zmq.PUB) self.socket.bind(interface_or_socket)
def __init__(self, in_type, out_type, mon_type=PUB, in_prefix=b'in', out_prefix=b'out'): ProxyBase.__init__(self, in_type=in_type, out_type=out_type, mon_type=mon_type) self._in_prefix = in_prefix self._out_prefix = out_prefix
def __init__(self, in_type, out_type, mon_type=zmq.PUB): Device.__init__(self, in_type=in_type, out_type=out_type) self.mon_type = mon_type self._mon_binds = [] self._mon_connects = [] self._mon_sockopts = []
def test_create(self): ctx = self.Context() s = ctx.socket(zmq.PUB) # Superluminal protocol not yet implemented self.assertRaisesErrno(zmq.EPROTONOSUPPORT, s.bind, 'ftl://a') self.assertRaisesErrno(zmq.EPROTONOSUPPORT, s.connect, 'ftl://a') self.assertRaisesErrno(zmq.EINVAL, s.bind, 'tcp://') s.close() del ctx
def test_dir(self): ctx = self.Context() s = ctx.socket(zmq.PUB) self.assertTrue('send' in dir(s)) self.assertTrue('IDENTITY' in dir(s)) self.assertTrue('AFFINITY' in dir(s)) self.assertTrue('FD' in dir(s)) s.close() ctx.term()
def test_connect_unicode(self): s = self.socket(zmq.PUB) s.connect(unicode("tcp://127.0.0.1:5555"))
def test_bind_to_random_port(self): # Check that bind_to_random_port do not hide useful exception ctx = self.Context() c = ctx.socket(zmq.PUB) # Invalid format try: c.bind_to_random_port('tcp:*') except zmq.ZMQError as e: self.assertEqual(e.errno, zmq.EINVAL) # Invalid protocol try: c.bind_to_random_port('rand://*') except zmq.ZMQError as e: self.assertEqual(e.errno, zmq.EPROTONOSUPPORT)
def test_bad_sockopts(self): """Test that appropriate errors are raised on bad socket options""" s = self.context.socket(zmq.PUB) self.sockets.append(s) s.setsockopt(zmq.LINGER, 0) # unrecognized int sockopts pass through to libzmq, and should raise EINVAL self.assertRaisesErrno(zmq.EINVAL, s.setsockopt, 9999, 5) self.assertRaisesErrno(zmq.EINVAL, s.getsockopt, 9999) # but only int sockopts are allowed through this way, otherwise raise a TypeError self.assertRaises(TypeError, s.setsockopt, 9999, b"5") # some sockopts are valid in general, but not on every socket: self.assertRaisesErrno(zmq.EINVAL, s.setsockopt, zmq.SUBSCRIBE, b'hi')
def test_close(self): ctx = self.Context() s = ctx.socket(zmq.PUB) s.close() self.assertRaisesErrno(zmq.ENOTSOCK, s.bind, b'') self.assertRaisesErrno(zmq.ENOTSOCK, s.connect, b'') self.assertRaisesErrno(zmq.ENOTSOCK, s.setsockopt, zmq.SUBSCRIBE, b'') self.assertRaisesErrno(zmq.ENOTSOCK, s.send, b'asdf') self.assertRaisesErrno(zmq.ENOTSOCK, s.recv) del ctx
def test_ipc_path_max_length_msg(self): if zmq.IPC_PATH_MAX_LEN == 0: raise SkipTest("IPC_PATH_MAX_LEN undefined") s = self.context.socket(zmq.PUB) self.sockets.append(s) try: s.bind('ipc://{0}'.format('a' * (zmq.IPC_PATH_MAX_LEN + 1))) except zmq.ZMQError as e: self.assertTrue(str(zmq.IPC_PATH_MAX_LEN) in e.strerror)