Python zmq 模块,XPUB 实例源码

我们从 Python 开源项目中提取了以下 7 个代码示例，用于说明如何使用 zmq.XPUB。

项目:azmq    作者:ereOn    | 项目源码 | 文件源码
async def test_tcp_xpub_socket(event_loop, socket_factory, connect_or_bind):
    """Exercise an ``azmq.XPUB`` socket against a SUB peer over TCP.

    A SUB socket created through ``socket_factory`` subscribes to ``b'a'``
    and waits (inside ``run``) for ``[b'a', b'message']``.  The XPUB socket
    under test must first see the subscription frame, then publish until the
    background receiver is done, and finally see the unsubscription frame
    once the SUB socket is closed.

    The function is a coroutine because its body uses ``async with`` and
    ``await``; those are a syntax error inside a plain ``def``.  It therefore
    has to be driven by an asyncio-aware test runner.
    """
    sub_socket = socket_factory.create(zmq.SUB)
    sub_socket.setsockopt(zmq.SUBSCRIBE, b'a')
    connect_or_bind(sub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Handed to ``run_in_background``: receive one multipart message and
        # check that it is the one published on the subscribed topic.
        frames = sub_socket.recv_multipart()
        assert frames == [b'a', b'message']

    with run_in_background(run) as thread_done_event:
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.XPUB)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')

            # The first thing the XPUB side reads is the subscription
            # notification: a 0x01 byte followed by the topic.
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'\1a']

            # Keep publishing until ``run`` has finished.  The ``b'b'``
            # message is on a topic the SUB socket never subscribed to, so
            # ``run`` would fail its assertion if it were delivered.
            while not thread_done_event.is_set():
                await socket.send_multipart([b'a', b'message'])
                await socket.send_multipart([b'b', b'wrong'])

            # Closing the subscriber must surface as an unsubscription
            # notification: a 0x00 byte followed by the topic.
            sub_socket.close()
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'\0a']
项目:azmq    作者:ereOn    | 项目源码 | 文件源码
async def test_tcp_sub_socket(event_loop, socket_factory, connect_or_bind):
    """Exercise an ``azmq.SUB`` socket against an XPUB peer over TCP.

    An XPUB socket created through ``socket_factory`` waits (inside ``run``)
    for the subscription to ``b'a'`` and answers with ``[b'a', b'message']``.
    The SUB socket under test subscribes, attaches to the same endpoint and
    must receive that message within one second.

    The function is a coroutine because its body uses ``async with`` and
    ``await``; those are a syntax error inside a plain ``def``.  It therefore
    has to be driven by an asyncio-aware test runner.
    """
    xpub_socket = socket_factory.create(zmq.XPUB)
    connect_or_bind(xpub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Wait one second for the subscription to arrive.
        assert xpub_socket.poll(1000) == zmq.POLLIN
        topic = xpub_socket.recv_multipart()
        assert topic == [b'\x01a']
        xpub_socket.send_multipart([b'a', b'message'])

        # NOTE(review): ``connect_or_bind`` is called as a function elsewhere
        # in this test, so comparing it with the string 'connect' looks like
        # it can only be true if the fixture defines custom equality --
        # confirm against the fixture definition.
        if connect_or_bind == 'connect':
            # Expect the unsubscription notification (0x00 + topic).
            assert xpub_socket.poll(1000) == zmq.POLLIN
            topic = xpub_socket.recv_multipart()
            assert topic == [b'\x00a']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.SUB)
            await socket.subscribe(b'a')
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')

            # The message published by ``run`` must arrive within a second.
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'a', b'message']
项目:azmq    作者:ereOn    | 项目源码 | 文件源码
async def test_tcp_xsub_socket(event_loop, socket_factory, connect_or_bind):
    """Exercise an ``azmq.XSUB`` socket against an XPUB peer over TCP.

    An XPUB socket created through ``socket_factory`` waits (inside ``run``)
    for the subscription to ``b'a'`` and answers with ``[b'a', b'message']``.
    The XSUB socket under test subscribes by sending the raw subscription
    frame ``b'\\x01a'`` itself, attaches to the same endpoint and must
    receive that message within one second.

    The function is a coroutine because its body uses ``async with`` and
    ``await``; those are a syntax error inside a plain ``def``.  It therefore
    has to be driven by an asyncio-aware test runner.
    """
    xpub_socket = socket_factory.create(zmq.XPUB)
    connect_or_bind(xpub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Wait one second for the subscription to arrive.
        assert xpub_socket.poll(1000) == zmq.POLLIN
        topic = xpub_socket.recv_multipart()
        assert topic == [b'\x01a']
        xpub_socket.send_multipart([b'a', b'message'])

        # NOTE(review): ``connect_or_bind`` is called as a function elsewhere
        # in this test, so comparing it with the string 'connect' looks like
        # it can only be true if the fixture defines custom equality --
        # confirm against the fixture definition.
        if connect_or_bind == 'connect':
            # Expect the unsubscription notification (0x00 + topic).
            assert xpub_socket.poll(1000) == zmq.POLLIN
            topic = xpub_socket.recv_multipart()
            assert topic == [b'\x00a']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.XSUB)
            # XSUB has no subscribe call here: the subscription is sent as
            # an ordinary message made of 0x01 followed by the topic.
            await socket.send_multipart([b'\x01a'])
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')

            # The message published by ``run`` must arrive within a second.
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'a', b'message']
项目:uniflex    作者:uniflex    | 项目源码 | 文件源码
def run(self):
        """Relay messages between the XPUB and XSUB sockets.

        Both sockets are registered with a poller for incoming data.  While
        ``self.running`` stays true, whatever arrives on ``self.xpub`` is
        sent out on ``self.xsub`` and whatever arrives on ``self.xsub`` is
        sent out on ``self.xpub``.  The poll call is given a timeout of 1000
        so the ``self.running`` flag is re-checked regularly.
        """
        startup_note = "Broker starts XPUB:{}, XSUB:{}".format(
            self.xpub_url, self.xsub_url)
        self.log.debug(startup_note)

        # The zmq proxy start (self.proxy.start()) stays disabled; the
        # forwarding is done by hand in the loop below.
        poller = zmq.Poller()
        for sock in (self.xpub, self.xsub):
            poller.register(sock, zmq.POLLIN)

        self.running = True
        while self.running:
            ready = dict(poller.poll(1000))

            if self.xpub in ready:
                # Frames read from the XPUB side go out on the XSUB side.
                frames = self.xpub.recv_multipart()
                self.log.debug(
                    "subscription message: {}".format(frames[0]))
                self.xsub.send_multipart(frames)

            if self.xsub in ready:
                # Frames read from the XSUB side go out on the XPUB side.
                frames = self.xsub.recv_multipart()
                self.log.debug("publishing message: {}".format(frames))
                self.xpub.send_multipart(frames)
项目:uniflex    作者:uniflex    | 项目源码 | 文件源码
def __init__(self, xpub="tcp://127.0.0.1:8990",
                 xsub="tcp://127.0.0.1:8989"):
        """Create the broker and bind its XPUB and XSUB sockets.

        :param xpub: endpoint the XPUB socket is bound to.
        :param xsub: endpoint the XSUB socket is bound to.

        A logger named ``<module>.<class name>`` is attached to the instance,
        the parent initializer is called, and ``running`` starts out false.
        """
        cls = self.__class__
        self.log = logging.getLogger(
            "{}.{}".format(cls.__module__, cls.__name__))
        super(Broker, self).__init__()

        self.running = False
        self.xpub_url, self.xsub_url = xpub, xsub

        # One context owns both sockets; each is bound right after creation.
        self.ctx = context = zmq.Context()
        self.xpub = context.socket(zmq.XPUB)
        self.xpub.bind(self.xpub_url)
        self.xsub = context.socket(zmq.XSUB)
        self.xsub.bind(self.xsub_url)
        # A zmq.proxy(xpub, xsub) based setup is intentionally left disabled.
项目:idealoom    作者:conversence    | 项目源码 | 文件源码
def start_dispatch_thread():
    """Start the forwarder device once; later calls do nothing.

    On the first call a ``zmq.devices.ThreadDevice`` of type FORWARDER with
    an XSUB input and an XPUB output is stored in the module-level
    ``DISPATCHER``, its input is bound to ``INTERNAL_SOCKET``, its output is
    connected to ``CHANGES_SOCKET``, both sides get an identity, and the
    device is started.  ``INITED`` is set afterwards so that subsequent
    calls return immediately.
    """
    global INITED, DISPATCHER
    if not INITED:
        device = zmq.devices.ThreadDevice(zmq.FORWARDER, zmq.XSUB, zmq.XPUB)
        DISPATCHER = device
        device.bind_in(INTERNAL_SOCKET)
        device.connect_out(CHANGES_SOCKET)
        device.setsockopt_in(zmq.IDENTITY, b'XSUB')
        device.setsockopt_out(zmq.IDENTITY, b'XPUB')
        device.start()
        # Fix weird nosetests problems.
        # TODO: find and fix underlying problem
        sleep(0.01)
        INITED = True
项目:frontera-docs-zh_CN    作者:xsren    | 项目源码 | 文件源码
def __init__(self, address, base_port):
        """Set up the broker's six sockets, their streams and callbacks.

        :param address: passed to ``SocketConfig`` together with
            ``base_port`` to derive the endpoints.
        :param base_port: see ``address``.

        For each of the ``spiders``, ``sw`` and ``db`` channels an XPUB
        socket (``*_in``) and an XSUB socket (``*_out``) are created, bound
        to the endpoint returned by the matching ``SocketConfig`` method,
        and wrapped in a ``ZMQStream`` stored on the instance.  Every stream
        gets its ``handle_<name>_recv`` method as receive callback.  Logging
        is configured and the chosen address and base port are logged.
        """
        self.ctx = zmq.Context()
        self.loop = IOLoop.instance()

        # Start timestamp first, then one zeroed counter per stream.
        stats = {'started': time()}
        for counter in ('spiders_out_recvd', 'spiders_in_recvd',
                        'db_in_recvd', 'db_out_recvd',
                        'sw_in_recvd', 'sw_out_recvd'):
            stats[counter] = 0
        self.stats = stats

        socket_config = SocketConfig(address, base_port)
        if socket_config.is_ipv6:
            self.ctx.setsockopt(zmq.IPV6, True)

        # (attribute name, socket type) in creation order.  The name is also
        # the name of the SocketConfig method that yields the endpoint.
        channels = (
            ('spiders_in', zmq.XPUB),
            ('spiders_out', zmq.XSUB),
            ('sw_in', zmq.XPUB),
            ('sw_out', zmq.XSUB),
            ('db_in', zmq.XPUB),
            ('db_out', zmq.XSUB),
        )

        # Phase 1: create every socket.
        raw_sockets = [(name, self.ctx.socket(kind))
                       for name, kind in channels]
        # Phase 2: bind every socket.
        for name, sock in raw_sockets:
            sock.bind(getattr(socket_config, name)())
        # Phase 3: wrap every socket in a stream kept on the instance.
        for name, sock in raw_sockets:
            setattr(self, name, ZMQStream(sock))

        # Phase 4: attach the receive callbacks, ``*_out`` streams first.
        for name in ('spiders_out', 'sw_out', 'db_out',
                     'sw_in', 'db_in', 'spiders_in'):
            stream = getattr(self, name)
            stream.on_recv(getattr(self, 'handle_%s_recv' % name))

        logging.basicConfig(format="%(asctime)s %(message)s",
                            datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
        self.logger = logging.getLogger(
            "distributed_frontera.messagebus.zeromq.broker.Server")
        endpoint_note = "Using socket: {}:{}".format(
            socket_config.ip_addr, socket_config.base_port)
        self.logger.info(endpoint_note)