Python zmq 模块,SUBSCRIBE 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zmq.SUBSCRIBE

项目:live-plotter    作者:anandtrex    | 项目源码 | 文件源码
def run(self):
        """
        Entry point of the live plotter when it runs as a separate process.

        Connects a SUB socket to ``tcp://localhost:<self.port>``, subscribes to
        the pickled form of ``self.var_name``, calls ``self.init`` with
        ``self.init_kwargs`` and finally shows the figure.
        """
        self.entity_name = current_process().name
        plogger.info("Starting new thread %s", self.entity_name)

        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.SUB)

        endpoint = "tcp://localhost:%d" % self.port
        self.socket.connect(endpoint)

        # The subscription prefix is the pickled variable name.
        subscription = pickle.dumps(self.var_name, protocol=pickle.HIGHEST_PROTOCOL)
        self.socket.setsockopt(zmq.SUBSCRIBE, subscription)
        plogger.info("Subscribed to topic %s on port %d", self.var_name, self.port)

        self.init(**self.init_kwargs)
        # Keep a live reference to the animation object: without one the
        # garbage collector may discard it and the plot stops updating.
        # See: http://matplotlib.org/api/animation_api.html
        animation_ref = animation.FuncAnimation(self.fig, self.loop, interval=100)
        self.plt.show()
项目:odr-stream-router    作者:digris    | 项目源码 | 文件源码
def rec(port):
    """
    Receive messages on a bound SUB socket and pass them to ``rec_frame``.

    Binds a SUB socket on all interfaces at ``port``, subscribes to every
    topic, registers ``rec_frame`` as the stream's receive callback and runs
    the IOLoop. This function never returns.
    """
    import time

    zmq_ctx = zmq.Context()

    s = zmq_ctx.socket(zmq.SUB)
    s.bind('tcp://*:{port}'.format(port=port))
    s.setsockopt(zmq.SUBSCRIBE, b"")

    stream = ZMQStream(s)
    stream.on_recv_stream(rec_frame)

    ioloop.IOLoop.instance().start()

    # Only reached if the IOLoop is stopped. Sleep instead of spinning so
    # that parking here does not pin a CPU core.
    while True:
        time.sleep(1)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_unicode_sockopts(self):
        """Exercise the unicode variants of setsockopt/getsockopt/send/recv."""
        subject = "tést"
        if str is not unicode:
            subject = subject.decode('utf8')
        pub, sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(sub.send_unicode, sub.send_unicode)
        self.assertEqual(pub.recv_unicode, pub.recv_unicode)

        # setsockopt with a unicode value is expected to raise TypeError;
        # the *_unicode variants are the supported way to pass text.
        self.assertRaises(TypeError, sub.setsockopt, zmq.SUBSCRIBE, subject)
        self.assertRaises(TypeError, sub.setsockopt, zmq.IDENTITY, subject)
        sub.setsockopt_unicode(zmq.IDENTITY, subject, 'utf16')
        self.assertRaises(TypeError, sub.setsockopt, zmq.AFFINITY, subject)
        sub.setsockopt_unicode(zmq.SUBSCRIBE, subject)
        self.assertRaises(TypeError, sub.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, sub.getsockopt_unicode, zmq.SUBSCRIBE)

        # IDENTITY read back as bytes and as unicode must agree.
        raw_identity = sub.getsockopt(zmq.IDENTITY)
        decoded_identity = raw_identity.decode('utf16')
        identity_text = sub.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(decoded_identity, identity_text)

        time.sleep(0.1)  # wait for connection/subscription
        doubled = subject * 2
        pub.send_unicode(subject, zmq.SNDMORE)
        pub.send_unicode(doubled, encoding='latin-1')
        self.assertEqual(subject, sub.recv_unicode())
        self.assertEqual(doubled, sub.recv_unicode(encoding='latin-1'))
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_init_iface(self):
        """A PUBHandler built from an interface address publishes log records."""
        import time

        log = self.logger
        shared_ctx = self.context

        # Built without a context, the handler must not use the test's context.
        pub_handler = handlers.PUBHandler(self.iface)
        self.assertFalse(pub_handler.ctx is shared_ctx)
        self.sockets.append(pub_handler.socket)

        # Built with the test's context, the handler must use exactly that one.
        pub_handler = handlers.PUBHandler(self.iface, self.context)
        self.sockets.append(pub_handler.socket)
        self.assertTrue(pub_handler.ctx is shared_ctx)
        pub_handler.setLevel(logging.DEBUG)
        pub_handler.root_topic = self.topic
        log.addHandler(pub_handler)

        subscriber = shared_ctx.socket(zmq.SUB)
        self.sockets.append(subscriber)
        subscriber.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        subscriber.connect(self.iface)
        time.sleep(0.25)

        text = 'message'
        log.info(text)

        (topic, payload) = subscriber.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(payload, b(text)+b'\n')
        log.removeHandler(pub_handler)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_init_socket(self):
        """A PUBHandler wrapping an existing PUB socket publishes log records."""
        import time

        publisher, subscriber = self.create_bound_pair(zmq.PUB, zmq.SUB)
        log = self.logger
        pub_handler = handlers.PUBHandler(publisher)
        pub_handler.setLevel(logging.DEBUG)
        pub_handler.root_topic = self.topic
        log.addHandler(pub_handler)

        # The handler must reuse the given socket and that socket's context.
        self.assertTrue(pub_handler.socket is publisher)
        self.assertTrue(pub_handler.ctx is publisher.context)
        self.assertTrue(pub_handler.ctx is self.context)

        subscriber.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        time.sleep(0.1)
        text = 'message'
        log.info(text)

        (topic, payload) = subscriber.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(payload, b(text)+b'\n')
        log.removeHandler(pub_handler)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_root_topic(self):
        """Changing ``root_topic`` changes the topic prefix of published records."""
        log, pub_handler, first_sub = self.connect_handler()
        pub_handler.socket.bind(self.iface)

        # Second subscriber takes every topic.
        catch_all = first_sub.context.socket(zmq.SUB)
        self.sockets.append(catch_all)
        catch_all.connect(self.iface)
        catch_all.setsockopt(zmq.SUBSCRIBE, b'')

        pub_handler.root_topic = b'twoonly'
        text = 'ignored'
        log.info(text)

        # The first subscriber is expected to have nothing to read, while the
        # catch-all subscriber gets the record under the new root topic.
        self.assertRaisesErrno(zmq.EAGAIN, first_sub.recv, zmq.NOBLOCK)
        topic, payload = catch_all.recv_multipart()
        self.assertEqual(topic, b'twoonly.INFO')
        self.assertEqual(payload, b(text)+b'\n')

        log.removeHandler(pub_handler)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
        """
        Start a ThreadMonitoredQueue wired to three locally bound sockets.

        Two PAIR sockets and one SUB monitor socket are bound to random
        loopback TCP ports, and the device connects its in, out and monitor
        sides to them. The monitor socket subscribes to ``mon_sub``.

        Returns the ``(alice, bob, mon)`` sockets, which are also appended to
        ``self.sockets``.
        """
        self.device = devices.ThreadMonitoredQueue(
            zmq.PAIR, zmq.PAIR, zmq.PUB, in_prefix, out_prefix)
        alice = self.context.socket(zmq.PAIR)
        bob = self.context.socket(zmq.PAIR)
        mon = self.context.socket(zmq.SUB)

        bind_address = 'tcp://127.0.0.1'
        alice_port = alice.bind_to_random_port(bind_address)
        bob_port = bob.bind_to_random_port(bind_address)
        mon_port = mon.bind_to_random_port(bind_address)
        mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

        connect_fmt = "tcp://127.0.0.1:%i"
        self.device.connect_in(connect_fmt % alice_port)
        self.device.connect_out(connect_fmt % bob_port)
        self.device.connect_mon(connect_fmt % mon_port)
        self.device.start()
        time.sleep(.2)
        try:
            # this is currently necessary to ensure no dropped monitor messages
            # see LIBZMQ-248 for more info
            mon.recv_multipart(zmq.NOBLOCK)
        except zmq.ZMQError:
            pass
        created = [alice, bob, mon]
        self.sockets.extend(created)
        return alice, bob, mon
项目:osbrain    作者:opensistemas-hub    | 项目源码 | 文件源码
def _subscribe_to_topic(self, alias: str, topic: Union[bytes, str]):
        '''
        Perform the ZeroMQ subscription for the socket known as `alias`.

        For an `AgentAddress` the topic is subscribed on the socket stored
        under `alias`. For an `AgentChannel` the topic is prefixed with the
        channel's uuid and subscribed on the socket stored under the
        channel's receiver address. Only meaningful for SUB/SYNC_SUB sockets.

        The handler is not set by this method.

        Raises `NotImplementedError` for any other address type.
        '''
        topic = topic_to_bytes(topic)
        address = self.address[alias]

        if isinstance(address, AgentAddress):
            self.socket[alias].setsockopt(zmq.SUBSCRIBE, topic)
            return

        if isinstance(address, AgentChannel):
            receiver = address.receiver
            prefixed_topic = address.uuid + topic
            self.socket[receiver].setsockopt(zmq.SUBSCRIBE, prefixed_topic)
            return

        raise NotImplementedError('Unsupported address type %s!' % address)
项目:Chasar    作者:camilochs    | 项目源码 | 文件源码
def create_socket(port):
    """
    Build a SUB socket bound to every interface on ``port``.

    The socket subscribes to all topics. If binding raises ``ZMQError`` a
    message is printed and the process exits with status 1.

    Returns the ``(socket, context)`` pair.
    """
    ctx = zmq.Context()
    subscriber = ctx.socket(zmq.SUB)
    endpoint = "tcp://*:%s" % port

    try:
        subscriber.bind(endpoint)
    except zmq.error.ZMQError:
        print("Address already in use")
        sys.exit(1)

    # An empty prefix subscribes to every message.
    subscriber.setsockopt(zmq.SUBSCRIBE, b"")
    print("Start node-masternode Subscribe")
    return subscriber, ctx
项目:Chasar    作者:camilochs    | 项目源码 | 文件源码
def create_socket(port):
    """
    Build a SUB socket bound to every interface on ``port``.

    The socket subscribes to all topics. If binding raises ``ZMQError`` a
    message is printed and the process exits with status 1.

    Returns the ``(socket, context)`` pair.
    """
    ctx = zmq.Context()
    subscriber = ctx.socket(zmq.SUB)
    endpoint = "tcp://*:%s" % port

    try:
        subscriber.bind(endpoint)
    except zmq.error.ZMQError:
        print("Address already in use")
        sys.exit(1)

    # An empty prefix subscribes to every message.
    subscriber.setsockopt(zmq.SUBSCRIBE, b"")
    print("Start node-masternode Subscribe")
    return subscriber, ctx
项目:simple-processing    作者:bitaps-com    | 项目源码 | 文件源码
def __init__(self, loop, logger, config):
        """
        Store the collaborators, subscribe to bitcoind and schedule the tasks.

        Reads the ZeroMQ endpoint from ``config["BITCOIND"]["zeromq"]`` and the
        database settings from ``config["MYSQL"]``, connects a SUB socket
        subscribed to ``hashblock`` and schedules ``init_db``, ``handle`` and
        ``rpctest`` on ``loop``.
        """
        print("test")
        self.loop = loop
        self.log = logger
        self.config = config

        self.zmq_url = config["BITCOIND"]["zeromq"]
        zmq_context = zmq.asyncio.Context()
        self.zmqContext = zmq_context
        subscriber = zmq_context.socket(zmq.SUB)
        self.zmqSubSocket = subscriber
        self.MYSQL_CONFIG = config["MYSQL"]

        # Only "hashblock" is subscribed; the "hashtx", "rawblock" and "rawtx"
        # topics are currently left disabled.
        subscriber.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
        subscriber.connect(self.zmq_url)
        print(self.zmq_url)

        # The mysqltest task is currently left disabled.
        for start_task in (self.init_db, self.handle, self.rpctest):
            self.loop.create_task(start_task())
项目:networkzero    作者:tjguk    | 项目源码 | 文件源码
def wait_for_news_from(self, address, topic, wait_for_s):
        """
        Wait up to ``wait_for_s`` for a published message on ``topic``.

        ``address`` may be a single address or a list of addresses, and
        ``topic`` may be a single string or an iterable of strings.

        Returns the unserialised result, or ``(None, None)`` if the wait times
        out or is interrupted.
        """
        addresses = address if isinstance(address, list) else [address]
        socket = self.get_socket(addresses, "subscriber")

        topics = [topic] if isinstance(topic, str) else topic
        for topic_name in topics:
            socket.set(zmq.SUBSCRIBE, topic_name.encode(config.ENCODING))

        try:
            raw = self._receive_with_timeout(socket, wait_for_s, use_multipart=True)
            return _unserialise_for_pubsub(raw)
        except (core.SocketTimedOutError, core.SocketInterruptedError):
            return None, None
项目:enteletaor    作者:cr0hn    | 项目源码 | 文件源码
def brute_zmq(host, port=5555, user=None, password=None, db=0):
    """
    Probe a ZeroMQ publisher by trying to receive one message from it.

    ``user``, ``password`` and ``db`` are accepted but not used here.

    Returns True if a message arrives within the one-second receive timeout,
    False if receiving raises any exception (the timeout included).
    """
    context = zmq.Context()

    # Configure
    socket = context.socket(zmq.SUB)
    try:
        socket.setsockopt(zmq.SUBSCRIBE, b"")  # All topics
        socket.setsockopt(zmq.LINGER, 0)  # Do not wait for pending data on close
        socket.RCVTIMEO = 1000  # timeout: 1 sec

        # Connect
        socket.connect("tcp://%s:%s" % (host, port))

        # Try to receive
        try:
            socket.recv()
        except Exception:
            return False
        return True
    finally:
        # Release the socket AND the context on every path (a connect error
        # included); otherwise each probe leaks a context and its I/O thread.
        socket.close(linger=0)
        context.term()
项目:enteletaor    作者:cr0hn    | 项目源码 | 文件源码
def handle_zmq(host, port=5555, extra_config=None):
    """
    Probe a ZeroMQ publisher by trying to receive one message from it.

    ``extra_config`` is accepted but not used here.

    Returns True if a message arrives within the one-second receive timeout,
    False if receiving raises any exception (the timeout included).
    """
    # log.debug("      * Connection to ZeroMQ: %s : %s" % (host, port))

    context = zmq.Context()

    # Configure
    socket = context.socket(zmq.SUB)
    try:
        socket.setsockopt(zmq.SUBSCRIBE, b"")  # All topics
        socket.setsockopt(zmq.LINGER, 0)  # Do not wait for pending data on close
        socket.RCVTIMEO = 1000  # timeout: 1 sec

        # Connect
        socket.connect("tcp://%s:%s" % (host, port))

        # Try to receive
        try:
            socket.recv()
        except Exception:
            return False
        return True
    finally:
        # Release the socket AND the context on every path (a connect error
        # included); otherwise each probe leaks a context and its I/O thread.
        socket.close(linger=0)
        context.term()
项目:azmq    作者:ereOn    | 项目源码 | 文件源码
async def test_tcp_pub_socket(event_loop, socket_factory, connect_or_bind):
    """
    Check that an azmq PUB socket delivers a subscribed topic over TCP.

    A SUB socket subscribed to ``b'a'`` receives in the background (via
    ``run_in_background``) while the azmq PUB socket keeps publishing one
    matching and one non-matching message until the receiver is done.

    Declared ``async`` because the body uses ``async with`` and ``await``.
    NOTE(review): whatever decorator/plugin makes the test runner await this
    coroutine is not visible in this excerpt - confirm it is applied.
    """
    sub_socket = socket_factory.create(zmq.SUB)
    sub_socket.setsockopt(zmq.SUBSCRIBE, b'a')
    connect_or_bind(sub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Only the message published under the subscribed prefix may arrive.
        frames = sub_socket.recv_multipart()
        assert frames == [b'a', b'message']

    with run_in_background(run) as thread_done_event:
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.PUB)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')

            # Keep publishing until the receiver has seen its message.
            while not thread_done_event.is_set():
                await socket.send_multipart([b'a', b'message'])
                await socket.send_multipart([b'b', b'wrong'])
项目:azmq    作者:ereOn    | 项目源码 | 文件源码
async def test_tcp_xpub_socket(event_loop, socket_factory, connect_or_bind):
    """
    Check that an azmq XPUB socket sees (un)subscriptions and publishes.

    The XPUB socket must first receive the subscribe frame ``b'\\1a'``, then
    publish until the background receiver is done, and finally receive the
    unsubscribe frame ``b'\\0a'`` once the SUB socket is closed.

    Declared ``async`` because the body uses ``async with`` and ``await``.
    NOTE(review): whatever decorator/plugin makes the test runner await this
    coroutine is not visible in this excerpt - confirm it is applied.
    """
    sub_socket = socket_factory.create(zmq.SUB)
    sub_socket.setsockopt(zmq.SUBSCRIBE, b'a')
    connect_or_bind(sub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Only the message published under the subscribed prefix may arrive.
        frames = sub_socket.recv_multipart()
        assert frames == [b'a', b'message']

    with run_in_background(run) as thread_done_event:
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.XPUB)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')

            # Subscription notification: 0x01 followed by the topic prefix.
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'\1a']

            # Keep publishing until the receiver has seen its message.
            while not thread_done_event.is_set():
                await socket.send_multipart([b'a', b'message'])
                await socket.send_multipart([b'b', b'wrong'])

            # Closing the subscriber must produce the matching unsubscribe
            # notification: 0x00 followed by the topic prefix.
            sub_socket.close()
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'\0a']
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_unicode_sockopts(self):
        """Exercise the unicode variants of setsockopt/getsockopt/send/recv."""
        subject = "tést"
        if str is not unicode:
            subject = subject.decode('utf8')
        pub, sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(sub.send_unicode, sub.send_unicode)
        self.assertEqual(pub.recv_unicode, pub.recv_unicode)

        # setsockopt with a unicode value is expected to raise TypeError;
        # the *_unicode variants are the supported way to pass text.
        self.assertRaises(TypeError, sub.setsockopt, zmq.SUBSCRIBE, subject)
        self.assertRaises(TypeError, sub.setsockopt, zmq.IDENTITY, subject)
        sub.setsockopt_unicode(zmq.IDENTITY, subject, 'utf16')
        self.assertRaises(TypeError, sub.setsockopt, zmq.AFFINITY, subject)
        sub.setsockopt_unicode(zmq.SUBSCRIBE, subject)
        self.assertRaises(TypeError, sub.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, sub.getsockopt_unicode, zmq.SUBSCRIBE)

        # IDENTITY read back as bytes and as unicode must agree.
        raw_identity = sub.getsockopt(zmq.IDENTITY)
        decoded_identity = raw_identity.decode('utf16')
        identity_text = sub.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(decoded_identity, identity_text)

        time.sleep(0.1)  # wait for connection/subscription
        doubled = subject * 2
        pub.send_unicode(subject, zmq.SNDMORE)
        pub.send_unicode(doubled, encoding='latin-1')
        self.assertEqual(subject, sub.recv_unicode())
        self.assertEqual(doubled, sub.recv_unicode(encoding='latin-1'))
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_init_iface(self):
        """A PUBHandler built from an interface address publishes log records."""
        import time

        log = self.logger
        shared_ctx = self.context

        # Built without a context, the handler must not use the test's context.
        pub_handler = handlers.PUBHandler(self.iface)
        self.assertFalse(pub_handler.ctx is shared_ctx)
        self.sockets.append(pub_handler.socket)

        # Built with the test's context, the handler must use exactly that one.
        pub_handler = handlers.PUBHandler(self.iface, self.context)
        self.sockets.append(pub_handler.socket)
        self.assertTrue(pub_handler.ctx is shared_ctx)
        pub_handler.setLevel(logging.DEBUG)
        pub_handler.root_topic = self.topic
        log.addHandler(pub_handler)

        subscriber = shared_ctx.socket(zmq.SUB)
        self.sockets.append(subscriber)
        subscriber.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        subscriber.connect(self.iface)
        time.sleep(0.25)

        text = 'message'
        log.info(text)

        (topic, payload) = subscriber.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(payload, b(text)+b'\n')
        log.removeHandler(pub_handler)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_init_socket(self):
        """A PUBHandler wrapping an existing PUB socket publishes log records."""
        import time

        publisher, subscriber = self.create_bound_pair(zmq.PUB, zmq.SUB)
        log = self.logger
        pub_handler = handlers.PUBHandler(publisher)
        pub_handler.setLevel(logging.DEBUG)
        pub_handler.root_topic = self.topic
        log.addHandler(pub_handler)

        # The handler must reuse the given socket and that socket's context.
        self.assertTrue(pub_handler.socket is publisher)
        self.assertTrue(pub_handler.ctx is publisher.context)
        self.assertTrue(pub_handler.ctx is self.context)

        subscriber.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        time.sleep(0.1)
        text = 'message'
        log.info(text)

        (topic, payload) = subscriber.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(payload, b(text)+b'\n')
        log.removeHandler(pub_handler)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_root_topic(self):
        """Changing ``root_topic`` changes the topic prefix of published records."""
        log, pub_handler, first_sub = self.connect_handler()
        pub_handler.socket.bind(self.iface)

        # Second subscriber takes every topic.
        catch_all = first_sub.context.socket(zmq.SUB)
        self.sockets.append(catch_all)
        catch_all.connect(self.iface)
        catch_all.setsockopt(zmq.SUBSCRIBE, b'')

        pub_handler.root_topic = b'twoonly'
        text = 'ignored'
        log.info(text)

        # The first subscriber is expected to have nothing to read, while the
        # catch-all subscriber gets the record under the new root topic.
        self.assertRaisesErrno(zmq.EAGAIN, first_sub.recv, zmq.NOBLOCK)
        topic, payload = catch_all.recv_multipart()
        self.assertEqual(topic, b'twoonly.INFO')
        self.assertEqual(payload, b(text)+b'\n')

        log.removeHandler(pub_handler)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
        """
        Start a ThreadMonitoredQueue wired to three locally bound sockets.

        Two PAIR sockets and one SUB monitor socket are bound to random
        loopback TCP ports, and the device connects its in, out and monitor
        sides to them. The monitor socket subscribes to ``mon_sub``.

        Returns the ``(alice, bob, mon)`` sockets, which are also appended to
        ``self.sockets``.
        """
        self.device = devices.ThreadMonitoredQueue(
            zmq.PAIR, zmq.PAIR, zmq.PUB, in_prefix, out_prefix)
        alice = self.context.socket(zmq.PAIR)
        bob = self.context.socket(zmq.PAIR)
        mon = self.context.socket(zmq.SUB)

        bind_address = 'tcp://127.0.0.1'
        alice_port = alice.bind_to_random_port(bind_address)
        bob_port = bob.bind_to_random_port(bind_address)
        mon_port = mon.bind_to_random_port(bind_address)
        mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

        connect_fmt = "tcp://127.0.0.1:%i"
        self.device.connect_in(connect_fmt % alice_port)
        self.device.connect_out(connect_fmt % bob_port)
        self.device.connect_mon(connect_fmt % mon_port)
        self.device.start()
        time.sleep(.2)
        try:
            # this is currently necessary to ensure no dropped monitor messages
            # see LIBZMQ-248 for more info
            mon.recv_multipart(zmq.NOBLOCK)
        except zmq.ZMQError:
            pass
        created = [alice, bob, mon]
        self.sockets.extend(created)
        return alice, bob, mon
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_init_socket(self):
        """A PUBHandler wrapping an existing PUB socket publishes log records."""
        import time

        publisher, subscriber = self.create_bound_pair(zmq.PUB, zmq.SUB)
        log = self.logger
        pub_handler = handlers.PUBHandler(publisher)
        pub_handler.setLevel(logging.DEBUG)
        pub_handler.root_topic = self.topic
        log.addHandler(pub_handler)

        # The handler must reuse the given socket and that socket's context.
        self.assertTrue(pub_handler.socket is publisher)
        self.assertTrue(pub_handler.ctx is publisher.context)
        self.assertTrue(pub_handler.ctx is self.context)

        subscriber.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        time.sleep(0.1)
        text = 'message'
        log.info(text)

        (topic, payload) = subscriber.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(payload, b(text)+b'\n')
        log.removeHandler(pub_handler)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_root_topic(self):
        """Changing ``root_topic`` changes the topic prefix of published records."""
        log, pub_handler, first_sub = self.connect_handler()
        pub_handler.socket.bind(self.iface)

        # Second subscriber takes every topic.
        catch_all = first_sub.context.socket(zmq.SUB)
        self.sockets.append(catch_all)
        catch_all.connect(self.iface)
        catch_all.setsockopt(zmq.SUBSCRIBE, b'')

        pub_handler.root_topic = b'twoonly'
        text = 'ignored'
        log.info(text)

        # The first subscriber is expected to have nothing to read, while the
        # catch-all subscriber gets the record under the new root topic.
        self.assertRaisesErrno(zmq.EAGAIN, first_sub.recv, zmq.NOBLOCK)
        topic, payload = catch_all.recv_multipart()
        self.assertEqual(topic, b'twoonly.INFO')
        self.assertEqual(payload, b(text)+b'\n')

        log.removeHandler(pub_handler)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
        """
        Start a ThreadMonitoredQueue wired to three locally bound sockets.

        Two PAIR sockets and one SUB monitor socket are bound to random
        loopback TCP ports, and the device connects its in, out and monitor
        sides to them. The monitor socket subscribes to ``mon_sub``.

        Returns the ``(alice, bob, mon)`` sockets, which are also appended to
        ``self.sockets``.
        """
        self.device = devices.ThreadMonitoredQueue(
            zmq.PAIR, zmq.PAIR, zmq.PUB, in_prefix, out_prefix)
        alice = self.context.socket(zmq.PAIR)
        bob = self.context.socket(zmq.PAIR)
        mon = self.context.socket(zmq.SUB)

        bind_address = 'tcp://127.0.0.1'
        alice_port = alice.bind_to_random_port(bind_address)
        bob_port = bob.bind_to_random_port(bind_address)
        mon_port = mon.bind_to_random_port(bind_address)
        mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

        connect_fmt = "tcp://127.0.0.1:%i"
        self.device.connect_in(connect_fmt % alice_port)
        self.device.connect_out(connect_fmt % bob_port)
        self.device.connect_mon(connect_fmt % mon_port)
        self.device.start()
        time.sleep(.2)
        try:
            # this is currently necessary to ensure no dropped monitor messages
            # see LIBZMQ-248 for more info
            mon.recv_multipart(zmq.NOBLOCK)
        except zmq.ZMQError:
            pass
        created = [alice, bob, mon]
        self.sockets.extend(created)
        return alice, bob, mon
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_unicode_sockopts(self):
        """Exercise the unicode variants of setsockopt/getsockopt/send/recv."""
        subject = "tést"
        if str is not unicode:
            subject = subject.decode('utf8')
        pub, sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(sub.send_unicode, sub.send_unicode)
        self.assertEqual(pub.recv_unicode, pub.recv_unicode)

        # setsockopt with a unicode value is expected to raise TypeError;
        # the *_unicode variants are the supported way to pass text.
        self.assertRaises(TypeError, sub.setsockopt, zmq.SUBSCRIBE, subject)
        self.assertRaises(TypeError, sub.setsockopt, zmq.IDENTITY, subject)
        sub.setsockopt_unicode(zmq.IDENTITY, subject, 'utf16')
        self.assertRaises(TypeError, sub.setsockopt, zmq.AFFINITY, subject)
        sub.setsockopt_unicode(zmq.SUBSCRIBE, subject)
        self.assertRaises(TypeError, sub.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, sub.getsockopt_unicode, zmq.SUBSCRIBE)

        # IDENTITY read back as bytes and as unicode must agree.
        raw_identity = sub.getsockopt(zmq.IDENTITY)
        decoded_identity = raw_identity.decode('utf16')
        identity_text = sub.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(decoded_identity, identity_text)

        time.sleep(0.1)  # wait for connection/subscription
        doubled = subject * 2
        pub.send_unicode(subject, zmq.SNDMORE)
        pub.send_unicode(doubled, encoding='latin-1')
        self.assertEqual(subject, sub.recv_unicode())
        self.assertEqual(doubled, sub.recv_unicode(encoding='latin-1'))
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_init_iface(self):
        """A PUBHandler built from an interface address publishes log records."""
        import time

        log = self.logger
        shared_ctx = self.context

        # Built without a context, the handler must not use the test's context.
        pub_handler = handlers.PUBHandler(self.iface)
        self.assertFalse(pub_handler.ctx is shared_ctx)
        self.sockets.append(pub_handler.socket)

        # Built with the test's context, the handler must use exactly that one.
        pub_handler = handlers.PUBHandler(self.iface, self.context)
        self.sockets.append(pub_handler.socket)
        self.assertTrue(pub_handler.ctx is shared_ctx)
        pub_handler.setLevel(logging.DEBUG)
        pub_handler.root_topic = self.topic
        log.addHandler(pub_handler)

        subscriber = shared_ctx.socket(zmq.SUB)
        self.sockets.append(subscriber)
        subscriber.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        subscriber.connect(self.iface)
        time.sleep(0.25)

        text = 'message'
        log.info(text)

        (topic, payload) = subscriber.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(payload, b(text)+b'\n')
        log.removeHandler(pub_handler)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_root_topic(self):
        """Changing ``root_topic`` changes the topic prefix of published records."""
        log, pub_handler, first_sub = self.connect_handler()
        pub_handler.socket.bind(self.iface)

        # Second subscriber takes every topic.
        catch_all = first_sub.context.socket(zmq.SUB)
        self.sockets.append(catch_all)
        catch_all.connect(self.iface)
        catch_all.setsockopt(zmq.SUBSCRIBE, b'')

        pub_handler.root_topic = b'twoonly'
        text = 'ignored'
        log.info(text)

        # The first subscriber is expected to have nothing to read, while the
        # catch-all subscriber gets the record under the new root topic.
        self.assertRaisesErrno(zmq.EAGAIN, first_sub.recv, zmq.NOBLOCK)
        topic, payload = catch_all.recv_multipart()
        self.assertEqual(topic, b'twoonly.INFO')
        self.assertEqual(payload, b(text)+b'\n')

        log.removeHandler(pub_handler)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
        """
        Start a ThreadMonitoredQueue wired to three locally bound sockets.

        Two PAIR sockets and one SUB monitor socket are bound to random
        loopback TCP ports, and the device connects its in, out and monitor
        sides to them. The monitor socket subscribes to ``mon_sub``.

        Returns the ``(alice, bob, mon)`` sockets, which are also appended to
        ``self.sockets``.
        """
        self.device = devices.ThreadMonitoredQueue(
            zmq.PAIR, zmq.PAIR, zmq.PUB, in_prefix, out_prefix)
        alice = self.context.socket(zmq.PAIR)
        bob = self.context.socket(zmq.PAIR)
        mon = self.context.socket(zmq.SUB)

        bind_address = 'tcp://127.0.0.1'
        alice_port = alice.bind_to_random_port(bind_address)
        bob_port = bob.bind_to_random_port(bind_address)
        mon_port = mon.bind_to_random_port(bind_address)
        mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

        connect_fmt = "tcp://127.0.0.1:%i"
        self.device.connect_in(connect_fmt % alice_port)
        self.device.connect_out(connect_fmt % bob_port)
        self.device.connect_mon(connect_fmt % mon_port)
        self.device.start()
        time.sleep(.2)
        try:
            # this is currently necessary to ensure no dropped monitor messages
            # see LIBZMQ-248 for more info
            mon.recv_multipart(zmq.NOBLOCK)
        except zmq.ZMQError:
            pass
        created = [alice, bob, mon]
        self.sockets.extend(created)
        return alice, bob, mon
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_unicode_sockopts(self):
        """Exercise the unicode variants of setsockopt/getsockopt/send/recv."""
        subject = "tést"
        if str is not unicode:
            subject = subject.decode('utf8')
        pub, sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(sub.send_unicode, sub.send_unicode)
        self.assertEqual(pub.recv_unicode, pub.recv_unicode)

        # setsockopt with a unicode value is expected to raise TypeError;
        # the *_unicode variants are the supported way to pass text.
        self.assertRaises(TypeError, sub.setsockopt, zmq.SUBSCRIBE, subject)
        self.assertRaises(TypeError, sub.setsockopt, zmq.IDENTITY, subject)
        sub.setsockopt_unicode(zmq.IDENTITY, subject, 'utf16')
        self.assertRaises(TypeError, sub.setsockopt, zmq.AFFINITY, subject)
        sub.setsockopt_unicode(zmq.SUBSCRIBE, subject)
        self.assertRaises(TypeError, sub.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, sub.getsockopt_unicode, zmq.SUBSCRIBE)

        # IDENTITY read back as bytes and as unicode must agree.
        raw_identity = sub.getsockopt(zmq.IDENTITY)
        decoded_identity = raw_identity.decode('utf16')
        identity_text = sub.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(decoded_identity, identity_text)

        time.sleep(0.1)  # wait for connection/subscription
        doubled = subject * 2
        pub.send_unicode(subject, zmq.SNDMORE)
        pub.send_unicode(doubled, encoding='latin-1')
        self.assertEqual(subject, sub.recv_unicode())
        self.assertEqual(doubled, sub.recv_unicode(encoding='latin-1'))
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_init_iface(self):
        """A PUBHandler built from an interface address publishes log records."""
        import time

        log = self.logger
        shared_ctx = self.context

        # Built without a context, the handler must not use the test's context.
        pub_handler = handlers.PUBHandler(self.iface)
        self.assertFalse(pub_handler.ctx is shared_ctx)
        self.sockets.append(pub_handler.socket)

        # Built with the test's context, the handler must use exactly that one.
        pub_handler = handlers.PUBHandler(self.iface, self.context)
        self.sockets.append(pub_handler.socket)
        self.assertTrue(pub_handler.ctx is shared_ctx)
        pub_handler.setLevel(logging.DEBUG)
        pub_handler.root_topic = self.topic
        log.addHandler(pub_handler)

        subscriber = shared_ctx.socket(zmq.SUB)
        self.sockets.append(subscriber)
        subscriber.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        subscriber.connect(self.iface)
        time.sleep(0.25)

        text = 'message'
        log.info(text)

        (topic, payload) = subscriber.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(payload, b(text)+b'\n')
        log.removeHandler(pub_handler)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_init_socket(self):
        """A PUBHandler wrapping an existing PUB socket publishes log records."""
        import time

        publisher, subscriber = self.create_bound_pair(zmq.PUB, zmq.SUB)
        log = self.logger
        pub_handler = handlers.PUBHandler(publisher)
        pub_handler.setLevel(logging.DEBUG)
        pub_handler.root_topic = self.topic
        log.addHandler(pub_handler)

        # The handler must reuse the given socket and that socket's context.
        self.assertTrue(pub_handler.socket is publisher)
        self.assertTrue(pub_handler.ctx is publisher.context)
        self.assertTrue(pub_handler.ctx is self.context)

        subscriber.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        time.sleep(0.1)
        text = 'message'
        log.info(text)

        (topic, payload) = subscriber.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(payload, b(text)+b'\n')
        log.removeHandler(pub_handler)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
        """
        Start a ThreadMonitoredQueue wired to three locally bound sockets.

        Two PAIR sockets and one SUB monitor socket are bound to random
        loopback TCP ports, and the device connects its in, out and monitor
        sides to them. The monitor socket subscribes to ``mon_sub``.

        Returns the ``(alice, bob, mon)`` sockets, which are also appended to
        ``self.sockets``.
        """
        self.device = devices.ThreadMonitoredQueue(
            zmq.PAIR, zmq.PAIR, zmq.PUB, in_prefix, out_prefix)
        alice = self.context.socket(zmq.PAIR)
        bob = self.context.socket(zmq.PAIR)
        mon = self.context.socket(zmq.SUB)

        bind_address = 'tcp://127.0.0.1'
        alice_port = alice.bind_to_random_port(bind_address)
        bob_port = bob.bind_to_random_port(bind_address)
        mon_port = mon.bind_to_random_port(bind_address)
        mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

        connect_fmt = "tcp://127.0.0.1:%i"
        self.device.connect_in(connect_fmt % alice_port)
        self.device.connect_out(connect_fmt % bob_port)
        self.device.connect_mon(connect_fmt % mon_port)
        self.device.start()
        time.sleep(.2)
        try:
            # this is currently necessary to ensure no dropped monitor messages
            # see LIBZMQ-248 for more info
            mon.recv_multipart(zmq.NOBLOCK)
        except zmq.ZMQError:
            pass
        created = [alice, bob, mon]
        self.sockets.extend(created)
        return alice, bob, mon
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_unicode_sockopts(self):
        """Exercise the unicode variants of setsockopt/getsockopt/send/recv."""
        subject = "tést"
        if str is not unicode:
            subject = subject.decode('utf8')
        pub, sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(sub.send_unicode, sub.send_unicode)
        self.assertEqual(pub.recv_unicode, pub.recv_unicode)

        # setsockopt with a unicode value is expected to raise TypeError;
        # the *_unicode variants are the supported way to pass text.
        self.assertRaises(TypeError, sub.setsockopt, zmq.SUBSCRIBE, subject)
        self.assertRaises(TypeError, sub.setsockopt, zmq.IDENTITY, subject)
        sub.setsockopt_unicode(zmq.IDENTITY, subject, 'utf16')
        self.assertRaises(TypeError, sub.setsockopt, zmq.AFFINITY, subject)
        sub.setsockopt_unicode(zmq.SUBSCRIBE, subject)
        self.assertRaises(TypeError, sub.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, sub.getsockopt_unicode, zmq.SUBSCRIBE)

        # IDENTITY read back as bytes and as unicode must agree.
        raw_identity = sub.getsockopt(zmq.IDENTITY)
        decoded_identity = raw_identity.decode('utf16')
        identity_text = sub.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(decoded_identity, identity_text)

        time.sleep(0.1)  # wait for connection/subscription
        doubled = subject * 2
        pub.send_unicode(subject, zmq.SNDMORE)
        pub.send_unicode(doubled, encoding='latin-1')
        self.assertEqual(subject, sub.recv_unicode())
        self.assertEqual(doubled, sub.recv_unicode(encoding='latin-1'))
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_init_iface(self):
        """A PUBHandler built from an interface address publishes log records."""
        import time

        log = self.logger
        shared_ctx = self.context

        # Built without a context, the handler must not use the test's context.
        pub_handler = handlers.PUBHandler(self.iface)
        self.assertFalse(pub_handler.ctx is shared_ctx)
        self.sockets.append(pub_handler.socket)

        # Built with the test's context, the handler must use exactly that one.
        pub_handler = handlers.PUBHandler(self.iface, self.context)
        self.sockets.append(pub_handler.socket)
        self.assertTrue(pub_handler.ctx is shared_ctx)
        pub_handler.setLevel(logging.DEBUG)
        pub_handler.root_topic = self.topic
        log.addHandler(pub_handler)

        subscriber = shared_ctx.socket(zmq.SUB)
        self.sockets.append(subscriber)
        subscriber.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        subscriber.connect(self.iface)
        time.sleep(0.25)

        text = 'message'
        log.info(text)

        (topic, payload) = subscriber.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(payload, b(text)+b'\n')
        log.removeHandler(pub_handler)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_init_socket(self):
        """A PUBHandler wrapping an existing PUB socket publishes log records."""
        import time

        publisher, subscriber = self.create_bound_pair(zmq.PUB, zmq.SUB)
        log = self.logger
        pub_handler = handlers.PUBHandler(publisher)
        pub_handler.setLevel(logging.DEBUG)
        pub_handler.root_topic = self.topic
        log.addHandler(pub_handler)

        # The handler must reuse the given socket and that socket's context.
        self.assertTrue(pub_handler.socket is publisher)
        self.assertTrue(pub_handler.ctx is publisher.context)
        self.assertTrue(pub_handler.ctx is self.context)

        subscriber.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        time.sleep(0.1)
        text = 'message'
        log.info(text)

        (topic, payload) = subscriber.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(payload, b(text)+b'\n')
        log.removeHandler(pub_handler)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_root_topic(self):
        """Verify that changing ``root_topic`` changes the published topic.

        After the root topic is switched to ``b'twoonly'``, the subscriber
        returned by ``connect_handler`` must have nothing to read, while a
        second subscriber with an empty subscription receives the record
        under ``b'twoonly.INFO'``.
        """
        log, pub_handler, first_sub = self.connect_handler()
        pub_handler.socket.bind(self.iface)

        # Second subscriber: empty prefix, connected to the newly bound address.
        catch_all = first_sub.context.socket(zmq.SUB)
        self.sockets.append(catch_all)
        catch_all.connect(self.iface)
        catch_all.setsockopt(zmq.SUBSCRIBE, b'')

        pub_handler.root_topic = b'twoonly'
        text = 'ignored'
        log.info(text)

        # The original subscriber must see no message at all.
        self.assertRaisesErrno(zmq.EAGAIN, first_sub.recv, zmq.NOBLOCK)
        topic, payload = catch_all.recv_multipart()
        self.assertEqual(topic, b'twoonly.INFO')
        self.assertEqual(payload, b(text) + b'\n')

        log.removeHandler(pub_handler)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_unicode_sockopts(self):
        """Exercise the unicode-aware socket option and send/recv helpers.

        Plain ``setsockopt`` must reject a unicode value with ``TypeError``,
        while the ``*_unicode`` variants take an explicit encoding. The test
        finishes with a two-part unicode round trip from PUB to SUB.
        """
        topic = "tést"
        # Where ``str`` and ``unicode`` are distinct types the literal above is
        # a byte string and has to be decoded first.
        if str is not unicode:
            topic = topic.decode('utf8')

        pub, sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(sub.send_unicode, sub.send_unicode)
        self.assertEqual(pub.recv_unicode, pub.recv_unicode)

        # Byte-valued options refuse unicode through the plain setter.
        for option in (zmq.SUBSCRIBE, zmq.IDENTITY):
            self.assertRaises(TypeError, sub.setsockopt, option, topic)
        sub.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
        self.assertRaises(TypeError, sub.setsockopt, zmq.AFFINITY, topic)
        sub.setsockopt_unicode(zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, sub.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, sub.getsockopt_unicode, zmq.SUBSCRIBE)

        # The identity read back as bytes and as unicode must agree.
        decoded_identity = sub.getsockopt(zmq.IDENTITY).decode('utf16')
        unicode_identity = sub.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(decoded_identity, unicode_identity)

        time.sleep(0.1)  # wait for connection/subscription
        pub.send_unicode(topic, zmq.SNDMORE)
        pub.send_unicode(topic * 2, encoding='latin-1')
        self.assertEqual(topic, sub.recv_unicode())
        self.assertEqual(topic * 2, sub.recv_unicode(encoding='latin-1'))
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_init_iface(self):
        """Build PUBHandlers from an interface address and verify log delivery.

        A handler created from the address alone must use a context other than
        ``self.context``; one created with ``self.context`` must reuse it. An
        INFO record logged through the second handler is then expected on a
        subscriber as a two-part message.
        """
        import time

        log = self.logger
        shared_ctx = self.context

        # No context passed in: the handler must not be using ours.
        pub_handler = handlers.PUBHandler(self.iface)
        self.assertFalse(pub_handler.ctx is shared_ctx)
        self.sockets.append(pub_handler.socket)

        # Context passed in explicitly: the handler must share it.
        pub_handler = handlers.PUBHandler(self.iface, self.context)
        self.sockets.append(pub_handler.socket)
        self.assertTrue(pub_handler.ctx is shared_ctx)
        pub_handler.setLevel(logging.DEBUG)
        pub_handler.root_topic = self.topic
        log.addHandler(pub_handler)

        subscriber = shared_ctx.socket(zmq.SUB)
        self.sockets.append(subscriber)
        subscriber.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        subscriber.connect(self.iface)
        # Give the subscription time to propagate before publishing.
        time.sleep(0.25)

        text = 'message'
        log.info(text)

        topic, payload = subscriber.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(payload, b(text) + b'\n')
        log.removeHandler(pub_handler)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_init_socket(self):
        """Build a PUBHandler around an existing PUB socket and verify delivery.

        The handler must adopt the given socket and that socket's context, and
        an INFO record logged through it must reach the paired subscriber as a
        two-part message.
        """
        import time

        publisher, subscriber = self.create_bound_pair(zmq.PUB, zmq.SUB)
        log = self.logger
        pub_handler = handlers.PUBHandler(publisher)
        pub_handler.setLevel(logging.DEBUG)
        pub_handler.root_topic = self.topic
        log.addHandler(pub_handler)

        # The handler wraps the socket it was given and inherits its context.
        self.assertTrue(pub_handler.socket is publisher)
        self.assertTrue(pub_handler.ctx is publisher.context)
        self.assertTrue(pub_handler.ctx is self.context)

        subscriber.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        # Give the subscription time to propagate before publishing.
        time.sleep(0.1)

        text = 'message'
        log.info(text)

        topic, payload = subscriber.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(payload, b(text) + b'\n')
        log.removeHandler(pub_handler)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_root_topic(self):
        """Verify that changing ``root_topic`` changes the published topic.

        After the root topic is switched to ``b'twoonly'``, the subscriber
        returned by ``connect_handler`` must have nothing to read, while a
        second subscriber with an empty subscription receives the record
        under ``b'twoonly.INFO'``.
        """
        log, pub_handler, first_sub = self.connect_handler()
        pub_handler.socket.bind(self.iface)

        # Second subscriber: empty prefix, connected to the newly bound address.
        catch_all = first_sub.context.socket(zmq.SUB)
        self.sockets.append(catch_all)
        catch_all.connect(self.iface)
        catch_all.setsockopt(zmq.SUBSCRIBE, b'')

        pub_handler.root_topic = b'twoonly'
        text = 'ignored'
        log.info(text)

        # The original subscriber must see no message at all.
        self.assertRaisesErrno(zmq.EAGAIN, first_sub.recv, zmq.NOBLOCK)
        topic, payload = catch_all.recv_multipart()
        self.assertEqual(topic, b'twoonly.INFO')
        self.assertEqual(payload, b(text) + b'\n')

        log.removeHandler(pub_handler)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
        """Start a ThreadMonitoredQueue wired to three freshly bound sockets.

        Two PAIR sockets and one SUB monitor socket are bound to random ports
        on 127.0.0.1; the device connects its in, out and monitor sides to
        them and is started. All three sockets are recorded in
        ``self.sockets``.

        :param mon_sub: subscription prefix set on the monitor socket
        :param in_prefix: passed to the device as its input prefix
        :param out_prefix: passed to the device as its output prefix
        :returns: the tuple ``(alice, bob, mon)``
        """
        self.device = devices.ThreadMonitoredQueue(
            zmq.PAIR, zmq.PAIR, zmq.PUB, in_prefix, out_prefix)

        loopback = 'tcp://127.0.0.1'
        alice = self.context.socket(zmq.PAIR)
        bob = self.context.socket(zmq.PAIR)
        mon = self.context.socket(zmq.SUB)

        alice_port = alice.bind_to_random_port(loopback)
        bob_port = bob.bind_to_random_port(loopback)
        mon_port = mon.bind_to_random_port(loopback)
        mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

        self.device.connect_in("tcp://127.0.0.1:%i" % alice_port)
        self.device.connect_out("tcp://127.0.0.1:%i" % bob_port)
        self.device.connect_mon("tcp://127.0.0.1:%i" % mon_port)
        self.device.start()
        time.sleep(.2)

        # This is currently necessary to ensure no dropped monitor messages;
        # see LIBZMQ-248 for more info.
        try:
            mon.recv_multipart(zmq.NOBLOCK)
        except zmq.ZMQError:
            pass

        self.sockets.extend([alice, bob, mon])
        return alice, bob, mon
项目:hydra    作者:lake-lerna    | 项目源码 | 文件源码
def sub_task(self, name, ip_port_sub):
        """Consume ``"<topic> <integer>"`` messages from a publisher.

        Connects a SUB socket to ``tcp://<ip_port_sub>``, subscribes to every
        message and, until ``self.shutdown`` becomes true, counts the received
        messages in ``self.sub_msg_cnt`` and prints each one.

        :param name: not used by this method
        :param ip_port_sub: ``host:port`` of the publisher; the string ``"0"``
            disables the task and the method returns immediately
        :raises ValueError: if a message does not split into exactly two
            whitespace-separated fields, or the second is not an integer
        """
        if ip_port_sub == "0":
            return
        ctx = zmq.Context()
        # subscribe socket
        socket_sub = ctx.socket(zmq.SUB)
        try:
            socket_sub.connect("tcp://%s" % ip_port_sub)
            # The subscription prefix must be bytes: pyzmq rejects text with
            # TypeError on Python 3. b'' is also valid on Python 2.
            socket_sub.setsockopt(zmq.SUBSCRIBE, b'')
            total_value = 0
            self.sub_msg_cnt = 0
            while not self.shutdown:
                string = socket_sub.recv()
                topic, messageData = string.split()
                total_value += int(messageData)
                self.sub_msg_cnt += 1
                print("SUB:: [%d] %s %s" % (self.sub_msg_cnt, topic, messageData))
        finally:
            # Release the socket and context whether the loop ended through
            # shutdown or through an exception.
            socket_sub.close()
            ctx.term()
项目:OldSpeak    作者:0rbitAeolian    | 项目源码 | 文件源码
def execute_command_forwarder():
    from oldspeak.console.parsers.streamer import parser

    args = parser.parse_args(get_sub_parser_argv())
    bootstrap_conf_with_gevent(args)

    device = Device(zmq.FORWARDER, zmq.SUB, zmq.PUB)

    device.bind_in(args.subscriber)
    device.bind_out(args.publisher)
    device.setsockopt_in(zmq.SUBSCRIBE, b'')
    if args.subscriber_hwm:
        device.setsockopt_in(zmq.RCVHWM, args.subscriber_hwm)

    if args.publisher_hwm:
        device.setsockopt_out(zmq.SNDHWM, args.publisher_hwm)

    print "oldspeak forwarder started"
    print "date", datetime.utcnow().isoformat()
    print "subscriber", (getattr(args, 'subscriber'))
    print "publisher", (getattr(args, 'publisher'))
    device.start()
项目:pymindwave    作者:frans-fuerst    | 项目源码 | 文件源码
def _data_listener(self):
        """Feed server messages to ``_on_server_message`` via queued Qt calls.

        If a command-line argument is present it is treated as the path of a
        file with one JSON document per line; every line is dispatched first.
        Afterwards the method subscribes to ``tcp://localhost:9876`` and
        forwards each JSON message it receives. The receive loop never
        returns.
        """
        if len(sys.argv) > 1:
            # Replay recorded messages; the context manager closes the file,
            # which the previous open(...).readlines() never did.
            with open(sys.argv[1]) as recorded:
                for line in recorded:
                    QtCore.QMetaObject.invokeMethod(
                        self, "_on_server_message",
                        QtCore.Qt.QueuedConnection,
                        QtCore.Q_ARG(dict, json.loads(line)))

        port = 9876
        context = zmq.Context()
        socket = context.socket(zmq.SUB)

        socket.connect("tcp://localhost:%d" % port)
        # The subscription prefix must be bytes: pyzmq rejects text with
        # TypeError on Python 3. b'' is also valid on Python 2.
        socket.setsockopt(zmq.SUBSCRIBE, b'')
        while True:
            msg = socket.recv_json()
            try:
                QtCore.QMetaObject.invokeMethod(
                    self, "_on_server_message",
                    QtCore.Qt.QueuedConnection,
                    QtCore.Q_ARG(dict, msg))
            except AttributeError:
                # NOTE(review): deliberately ignored in the original code,
                # presumably to survive interpreter/Qt teardown - confirm.
                pass
项目:jps    作者:OTL    | 项目源码 | 文件源码
def main(pub_port=None, sub_port=None):
    '''main of forwarder

    Binds a SUB frontend on ``pub_port`` and a PUB backend on ``sub_port``
    and runs a ZeroMQ FORWARDER device between them until interrupted.
    ``KeyboardInterrupt`` is swallowed; any other error propagates after
    whatever was created has been closed.

    :param sub_port: port for subscribers (defaults to ``get_sub_port()``)
    :param pub_port: port for publishers (defaults to ``get_pub_port()``)
    '''
    # Pre-bind the names so the cleanup below is safe even when an error (or
    # Ctrl-C) happens before the sockets exist; previously the ``finally``
    # clause raised UnboundLocalError and masked the real exception.
    context = frontend = backend = None
    try:
        if sub_port is None:
            sub_port = get_sub_port()
        if pub_port is None:
            pub_port = get_pub_port()
        context = zmq.Context(1)
        frontend = context.socket(zmq.SUB)
        backend = context.socket(zmq.PUB)

        frontend.bind('tcp://*:{pub_port}'.format(pub_port=pub_port))
        frontend.setsockopt(zmq.SUBSCRIBE, b'')
        backend.bind('tcp://*:{sub_port}'.format(sub_port=sub_port))
        zmq.device(zmq.FORWARDER, frontend, backend)
    except KeyboardInterrupt:
        pass
    finally:
        for sock in (frontend, backend):
            if sock is not None:
                sock.close()
        if context is not None:
            context.term()
项目:blink1-python-old    作者:todbot    | 项目源码 | 文件源码
def run(white_point):
    """Subscribe to the discovered downstream endpoint and run the IO loop.

    Incoming messages on the default channel are handed to a ``Receiver``
    built around a ``blink1`` device, whose ``throbber`` callback is also
    scheduled on the loop. Blocks in ``loop.start()``.

    :param white_point: forwarded to ``blink1`` as ``white_point``
    """
    config = discover()
    downstream_url = config['downstream']

    subscriber = context.socket(zmq.SUB)
    subscriber.connect(downstream_url)
    log.info("Connecting to %s" % downstream_url)
    subscriber.setsockopt_string(zmq.SUBSCRIBE, DEFAULT_CHANNEL)

    stream = ZMQStream(subscriber)
    io_loop = ioloop.IOLoop.instance()

    with blink1(white_point=white_point) as device:
        receiver = Receiver(device, io_loop)
        stream.on_recv(receiver.recieve)
        io_loop.add_callback(receiver.throbber)
        io_loop.start()
项目:napalm-logs    作者:napalm-automation    | 项目源码 | 文件源码
def startup_local_client():
    '''
    Create the module-level ``TEST_CLIENT`` SUB socket after a two-second
    pause, connect it to the test publisher address and subscribe it with an
    empty prefix.
    '''
    global TEST_CLIENT
    time.sleep(2)
    publisher_url = 'tcp://{addr}:{port}'.format(
        addr=NAPALM_LOGS_TEST_PUB_ADDR,
        port=NAPALM_LOGS_TEST_PUB_PORT,
    )
    context = zmq.Context()
    TEST_CLIENT = context.socket(zmq.SUB)
    TEST_CLIENT.connect(publisher_url)
    TEST_CLIENT.setsockopt(zmq.SUBSCRIBE, b'')


# Startup the local ZMQ client.
项目:agentzero    作者:gabrielfalcao    | 项目源码 | 文件源码
def set_topic(self, name, topic):
        """shortcut to :py:meth:`SocketManager.set_socket_option` with ``zmq.SUBSCRIBE``

        :param name: the name of the socket where data will pass through
        :param topic: the topic prefix to subscribe to. Text is encoded as
            UTF-8 and bytes are passed through unchanged; any other object is
            converted with ``bytes()``.

        **Example:**

        ::

          >>> import zmq
          >>> from agentzero.core import SocketManager
          >>>
          >>> sockets = SocketManager()
          >>> sockets.ensure_and_bind('events', zmq.SUB, 'tcp://*:6000', zmq.POLLIN)
          >>>
          >>> # subscribe only to topics beginning with "logs"
          >>> sockets.set_topic('events', 'logs')
          >>> event = sockets.recv_event_safe('events')
          >>> event.topic, event.data
          'logs:2016-06-20', {'stdout': 'hello world'}
        """
        if isinstance(topic, bytes):
            safe_topic = topic
        elif isinstance(topic, type(u'')):
            # bytes(text) raises TypeError on Python 3 because no encoding is
            # given, so text topics are encoded explicitly.
            safe_topic = topic.encode('utf-8')
        else:
            safe_topic = bytes(topic)
        self.set_socket_option(name, self.zmq.SUBSCRIBE, safe_topic)
项目:DarkWallet    作者:DissentDifference    | 项目源码 | 文件源码
def feedback_loop(self, *args):
        # feedback socket
        ctx = zmq.Context()
        socket = ctx.socket(zmq.SUB)
        socket.setsockopt(zmq.SUBSCRIBE, "")
        socket.connect(config.get("broadcaster-feedback-url", "tcp://localhost:9110"))
        print "brc feedback channel connected"
        while True:
            msg = [socket.recv()]
            while socket.getsockopt(zmq.RCVMORE):
                msg.append(socket.recv())
                print "feedback msg"
            if len(msg) == 3:
                self.on_feedback_msg(*msg)
            else:
                print "bad feedback message", len(msg)
项目:DarkWallet    作者:DissentDifference    | 项目源码 | 文件源码
def status_loop(self, *args):
        """Listen on a SUB socket and track the node count it reports.

        Each received message is unpacked as a little-endian unsigned 64-bit
        integer (``"<Q"``). A successful unpack refreshes
        ``self.last_status``; whenever the resulting value differs from
        ``self.last_nodes`` it is printed and stored. In the lines visible
        here the loop has no exit.

        :param args: accepted but not used in the visible body
        """
        # NOTE(review): this reads the "broadcaster-feedback-url" setting, the
        # same key feedback_loop uses, but with a different default port -
        # confirm whether a dedicated status URL key was intended.
        print "connect brc feedback"
        ctx = zmq.Context()
        socket = ctx.socket(zmq.SUB)
        # Empty prefix: subscribe to every message.
        socket.setsockopt(zmq.SUBSCRIBE, "")
        socket.connect(config.get("broadcaster-feedback-url", "tcp://localhost:9112"))
        print "brc status channel connected"
        while True:
            msg = socket.recv()
            # Falls back to 0 when the payload cannot be unpacked.
            nodes = 0
            try:
                nodes = struct.unpack("<Q", msg)[0]
                self.last_status = time.time()
            except:
                # Bare except: any failure in the two lines above is reported
                # here and the loop continues with nodes == 0.
                print "bad nodes data", msg
            # Only report and remember the count when it changed.
            if not nodes == self.last_nodes:
                print "brc hosts", nodes
                self.last_nodes = nodes