我们从Python开源项目中,提取了以下7个代码示例,用于说明如何使用zmq.XPUB。
def test_tcp_xpub_socket(event_loop, socket_factory, connect_or_bind): sub_socket = socket_factory.create(zmq.SUB) sub_socket.setsockopt(zmq.SUBSCRIBE, b'a') connect_or_bind(sub_socket, 'tcp://127.0.0.1:3333', reverse=True) def run(): frames = sub_socket.recv_multipart() assert frames == [b'a', b'message'] with run_in_background(run) as thread_done_event: async with azmq.Context(loop=event_loop) as context: socket = context.socket(azmq.XPUB) connect_or_bind(socket, 'tcp://127.0.0.1:3333') frames = await asyncio.wait_for(socket.recv_multipart(), 1) assert frames == [b'\1a'] while not thread_done_event.is_set(): await socket.send_multipart([b'a', b'message']) await socket.send_multipart([b'b', b'wrong']) sub_socket.close() frames = await asyncio.wait_for(socket.recv_multipart(), 1) assert frames == [b'\0a']
def test_tcp_sub_socket(event_loop, socket_factory, connect_or_bind): xpub_socket = socket_factory.create(zmq.XPUB) connect_or_bind(xpub_socket, 'tcp://127.0.0.1:3333', reverse=True) def run(): # Wait one second for the subscription to arrive. assert xpub_socket.poll(1000) == zmq.POLLIN topic = xpub_socket.recv_multipart() assert topic == [b'\x01a'] xpub_socket.send_multipart([b'a', b'message']) if connect_or_bind == 'connect': assert xpub_socket.poll(1000) == zmq.POLLIN topic = xpub_socket.recv_multipart() assert topic == [b'\x00a'] with run_in_background(run): async with azmq.Context(loop=event_loop) as context: socket = context.socket(azmq.SUB) await socket.subscribe(b'a') connect_or_bind(socket, 'tcp://127.0.0.1:3333') frames = await asyncio.wait_for(socket.recv_multipart(), 1) assert frames == [b'a', b'message']
def test_tcp_xsub_socket(event_loop, socket_factory, connect_or_bind): xpub_socket = socket_factory.create(zmq.XPUB) connect_or_bind(xpub_socket, 'tcp://127.0.0.1:3333', reverse=True) def run(): # Wait one second for the subscription to arrive. assert xpub_socket.poll(1000) == zmq.POLLIN topic = xpub_socket.recv_multipart() assert topic == [b'\x01a'] xpub_socket.send_multipart([b'a', b'message']) if connect_or_bind == 'connect': assert xpub_socket.poll(1000) == zmq.POLLIN topic = xpub_socket.recv_multipart() assert topic == [b'\x00a'] with run_in_background(run): async with azmq.Context(loop=event_loop) as context: socket = context.socket(azmq.XSUB) await socket.send_multipart([b'\x01a']) connect_or_bind(socket, 'tcp://127.0.0.1:3333') frames = await asyncio.wait_for(socket.recv_multipart(), 1) assert frames == [b'a', b'message']
def run(self): self.log.debug("Broker starts XPUB:{}, XSUB:{}" .format(self.xpub_url, self.xsub_url)) # self.proxy.start() poller = zmq.Poller() poller.register(self.xpub, zmq.POLLIN) poller.register(self.xsub, zmq.POLLIN) self.running = True while self.running: events = dict(poller.poll(1000)) if self.xpub in events: message = self.xpub.recv_multipart() self.log.debug("subscription message: {}".format(message[0])) self.xsub.send_multipart(message) if self.xsub in events: message = self.xsub.recv_multipart() self.log.debug("publishing message: {}".format(message)) self.xpub.send_multipart(message)
def __init__(self, xpub="tcp://127.0.0.1:8990", xsub="tcp://127.0.0.1:8989"): self.log = logging.getLogger("{module}.{name}".format( module=self.__class__.__module__, name=self.__class__.__name__)) super(Broker, self).__init__() self.running = False self.xpub_url = xpub self.xsub_url = xsub self.ctx = zmq.Context() self.xpub = self.ctx.socket(zmq.XPUB) self.xpub.bind(self.xpub_url) self.xsub = self.ctx.socket(zmq.XSUB) self.xsub.bind(self.xsub_url) # self.proxy = zmq.proxy(xpub, xsub)
def start_dispatch_thread(): global INITED, DISPATCHER if INITED: return DISPATCHER = zmq.devices.ThreadDevice(zmq.FORWARDER, zmq.XSUB, zmq.XPUB) DISPATCHER.bind_in(INTERNAL_SOCKET) DISPATCHER.connect_out(CHANGES_SOCKET) DISPATCHER.setsockopt_in(zmq.IDENTITY, b'XSUB') DISPATCHER.setsockopt_out(zmq.IDENTITY, b'XPUB') DISPATCHER.start() #Fix weird nosetests problems. TODO: find and fix underlying problem sleep(0.01) INITED = True
def __init__(self, address, base_port): self.ctx = zmq.Context() self.loop = IOLoop.instance() self.stats = { 'started': time(), 'spiders_out_recvd': 0, 'spiders_in_recvd': 0, 'db_in_recvd': 0, 'db_out_recvd': 0, 'sw_in_recvd': 0, 'sw_out_recvd': 0 } socket_config = SocketConfig(address, base_port) if socket_config.is_ipv6: self.ctx.setsockopt(zmq.IPV6, True) spiders_in_s = self.ctx.socket(zmq.XPUB) spiders_out_s = self.ctx.socket(zmq.XSUB) sw_in_s = self.ctx.socket(zmq.XPUB) sw_out_s = self.ctx.socket(zmq.XSUB) db_in_s = self.ctx.socket(zmq.XPUB) db_out_s = self.ctx.socket(zmq.XSUB) spiders_in_s.bind(socket_config.spiders_in()) spiders_out_s.bind(socket_config.spiders_out()) sw_in_s.bind(socket_config.sw_in()) sw_out_s.bind(socket_config.sw_out()) db_in_s.bind(socket_config.db_in()) db_out_s.bind(socket_config.db_out()) self.spiders_in = ZMQStream(spiders_in_s) self.spiders_out = ZMQStream(spiders_out_s) self.sw_in = ZMQStream(sw_in_s) self.sw_out = ZMQStream(sw_out_s) self.db_in = ZMQStream(db_in_s) self.db_out = ZMQStream(db_out_s) self.spiders_out.on_recv(self.handle_spiders_out_recv) self.sw_out.on_recv(self.handle_sw_out_recv) self.db_out.on_recv(self.handle_db_out_recv) self.sw_in.on_recv(self.handle_sw_in_recv) self.db_in.on_recv(self.handle_db_in_recv) self.spiders_in.on_recv(self.handle_spiders_in_recv) logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO) self.logger = logging.getLogger("distributed_frontera.messagebus" ".zeromq.broker.Server") self.logger.info("Using socket: {}:{}".format(socket_config.ip_addr, socket_config.base_port))