我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用zmq.SUB。
def __init__(self, opts=None):
    """Set up options, the ZeroMQ SUB stream and the action table.

    Args:
        opts: Ready-made options. When ``None``, options are obtained from
            ``self.process_config(CONFIG_LOCATION)`` instead.
    """
    self.opts = self.process_config(CONFIG_LOCATION) if opts is None else opts

    # Start setting up ZeroMQ: a SUB socket wrapped in an event-loop stream.
    self.ctx = zmq.Context()
    subscriber = self.ctx.socket(zmq.SUB)
    self.socket = subscriber
    subscriber.connect('tcp://localhost:2000')
    # NOTE(review): no zmq.SUBSCRIBE option is set in this block -- confirm
    # that a subscription is installed elsewhere.
    self.loop = zmq.eventloop.IOLoop.instance()
    self.stream = zmq.eventloop.zmqstream.ZMQStream(subscriber, self.loop)
    self.stream.on_recv(act)

    # Load up actions
    self.actions = loader.load_actions(
        self.opts,
        '/home/mp/devel/eventdrivetalk/actions',
    )
def run(self):
    """Entry point for the live plotting when run as a separate process.

    Connects a SUB socket to ``tcp://localhost:<self.port>``, subscribes to
    the pickled ``self.var_name``, calls ``self.init(**self.init_kwargs)``
    and then starts the animation that drives ``self.loop``.
    """
    self.entity_name = current_process().name
    plogger.info("Starting new thread %s", self.entity_name)

    self.context = zmq.Context()
    subscriber = self.context.socket(zmq.SUB)
    self.socket = subscriber
    subscriber.connect("tcp://localhost:%d" % self.port)

    # The subscription prefix is the pickled variable name.
    subscription = pickle.dumps(self.var_name, protocol=pickle.HIGHEST_PROTOCOL)
    subscriber.setsockopt(zmq.SUBSCRIBE, subscription)
    plogger.info("Subscribed to topic %s on port %d", self.var_name, self.port)

    self.init(**self.init_kwargs)

    # The animation object must stay referenced, otherwise the garbage
    # collector cleans it up and plotting stops. Do NOT remove this binding.
    # See: http://matplotlib.org/api/animation_api.html
    animation_handle = animation.FuncAnimation(self.fig, self.loop, interval=100)
    self.plt.show()
def rec(port):
    """Receive frames on a bound SUB socket and hand them to ``rec_frame``.

    Binds a SUB socket on every interface at ``port``, subscribes to all
    topics, registers ``rec_frame`` as the stream callback and runs the
    IOLoop. This function never returns.

    Args:
        port: TCP port to bind the SUB socket to.
    """
    import time

    zmq_ctx = zmq.Context()
    s = zmq_ctx.socket(zmq.SUB)
    s.bind('tcp://*:{port}'.format(port=port))
    s.setsockopt(zmq.SUBSCRIBE, b"")  # empty prefix: receive every topic
    stream = ZMQStream(s)
    stream.on_recv_stream(rec_frame)
    ioloop.IOLoop.instance().start()

    # Only reached if the loop is stopped. Keep the original "never return"
    # contract, but sleep instead of spinning a CPU core at 100%.
    while True:
        time.sleep(1)
def test_unicode_sockopts(self):
    """test setting/getting sockopts with unicode strings"""
    topic = "tést"
    if str is not unicode:
        # Interpreters where ``str`` is a byte string: decode to unicode.
        topic = topic.decode('utf8')
    doubled_topic = topic*2

    pub, sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
    self.assertEqual(sub.send_unicode, sub.send_unicode)
    self.assertEqual(pub.recv_unicode, pub.recv_unicode)

    # The plain setsockopt must reject unicode; the *_unicode variant is
    # used to actually set IDENTITY and SUBSCRIBE.
    self.assertRaises(TypeError, sub.setsockopt, zmq.SUBSCRIBE, topic)
    self.assertRaises(TypeError, sub.setsockopt, zmq.IDENTITY, topic)
    sub.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
    self.assertRaises(TypeError, sub.setsockopt, zmq.AFFINITY, topic)
    sub.setsockopt_unicode(zmq.SUBSCRIBE, topic)
    self.assertRaises(TypeError, sub.getsockopt_unicode, zmq.AFFINITY)
    self.assertRaisesErrno(zmq.EINVAL, sub.getsockopt_unicode, zmq.SUBSCRIBE)

    # IDENTITY read as bytes and decoded by hand must match the unicode getter.
    raw_identity = sub.getsockopt(zmq.IDENTITY)
    decoded_identity = raw_identity.decode('utf16')
    unicode_identity = sub.getsockopt_unicode(zmq.IDENTITY, 'utf16')
    self.assertEqual(decoded_identity, unicode_identity)

    time.sleep(0.1)  # wait for connection/subscription
    pub.send_unicode(topic, zmq.SNDMORE)
    pub.send_unicode(doubled_topic, encoding='latin-1')
    self.assertEqual(topic, sub.recv_unicode())
    self.assertEqual(doubled_topic, sub.recv_unicode(encoding='latin-1'))
def test_hwm(self):
    """Check that setting ``hwm`` is readable back on several socket types.

    With libzmq major version 3 or newer, ``sndhwm`` and ``rcvhwm`` are also
    checked; an ``AttributeError`` on either is ignored.
    """
    major_version = zmq.zmq_version_info()[0]
    directional_attrs = ('sndhwm', 'rcvhwm') if major_version >= 3 else ()

    for socket_type in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
        sock = self.context.socket(socket_type)
        sock.hwm = 100
        self.assertEqual(sock.hwm, 100)
        for attr in directional_attrs:
            try:
                self.assertEqual(getattr(sock, attr), 100)
            except AttributeError:
                # Attribute not available on this socket; skip it.
                pass
        sock.close()
def test_init_iface(self):
    """Check PUBHandler construction from an interface string.

    Without an explicit context the handler must not share the test
    context; with one it must. A record logged through the handler is then
    expected on a SUB socket connected to the same interface.
    """
    import time

    log = self.logger
    test_ctx = self.context

    # First handler: no context passed in.
    pub_handler = handlers.PUBHandler(self.iface)
    self.assertFalse(pub_handler.ctx is test_ctx)
    self.sockets.append(pub_handler.socket)
    # handler.ctx.term()

    # Second handler: uses the test context explicitly.
    pub_handler = handlers.PUBHandler(self.iface, self.context)
    self.sockets.append(pub_handler.socket)
    self.assertTrue(pub_handler.ctx is test_ctx)
    pub_handler.setLevel(logging.DEBUG)
    pub_handler.root_topic = self.topic
    log.addHandler(pub_handler)

    subscriber = test_ctx.socket(zmq.SUB)
    self.sockets.append(subscriber)
    subscriber.setsockopt(zmq.SUBSCRIBE, b(self.topic))
    subscriber.connect(self.iface)
    time.sleep(0.25)  # pause before logging so the subscriber is connected

    text = 'message'
    log.info(text)
    received_topic, received_body = subscriber.recv_multipart()
    self.assertEqual(received_topic, b'zmq.INFO')
    self.assertEqual(received_body, b(text)+b'\n')
    log.removeHandler(pub_handler)
def test_init_socket(self):
    """Check PUBHandler construction from an existing PUB socket.

    The handler must reuse the given socket and its context, and a logged
    record must arrive on the paired SUB socket.
    """
    import time

    pub, sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
    log = self.logger
    pub_handler = handlers.PUBHandler(pub)
    pub_handler.setLevel(logging.DEBUG)
    pub_handler.root_topic = self.topic
    log.addHandler(pub_handler)

    self.assertTrue(pub_handler.socket is pub)
    self.assertTrue(pub_handler.ctx is pub.context)
    self.assertTrue(pub_handler.ctx is self.context)

    sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
    time.sleep(0.1)  # pause before logging so the subscription is in place

    text = 'message'
    log.info(text)
    received_topic, received_body = sub.recv_multipart()
    self.assertEqual(received_topic, b'zmq.INFO')
    self.assertEqual(received_body, b(text)+b'\n')
    log.removeHandler(pub_handler)
def test_root_topic(self):
    """Check that changing ``root_topic`` changes the published topic.

    After the root topic is switched to ``b'twoonly'``, the original
    subscriber is expected to get nothing (EAGAIN on a non-blocking recv),
    while a second subscriber with an empty prefix receives the record.
    """
    log, pub_handler, first_sub = self.connect_handler()
    pub_handler.socket.bind(self.iface)

    # Second subscriber: same context, subscribed to everything.
    second_sub = first_sub.context.socket(zmq.SUB)
    self.sockets.append(second_sub)
    second_sub.connect(self.iface)
    second_sub.setsockopt(zmq.SUBSCRIBE, b'')

    pub_handler.root_topic = b'twoonly'
    text = 'ignored'
    log.info(text)

    self.assertRaisesErrno(zmq.EAGAIN, first_sub.recv, zmq.NOBLOCK)
    received_topic, received_body = second_sub.recv_multipart()
    self.assertEqual(received_topic, b'twoonly.INFO')
    self.assertEqual(received_body, b(text)+b'\n')
    log.removeHandler(pub_handler)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
    """Start a ThreadMonitoredQueue wired to three local test sockets.

    Args:
        mon_sub: Subscription prefix set on the monitor SUB socket.
        in_prefix: Passed to the device as its "in" prefix.
        out_prefix: Passed to the device as its "out" prefix.

    Returns:
        The ``(alice, bob, mon)`` sockets: two PAIR endpoints and the SUB
        monitor, each bound to a random port on 127.0.0.1.
    """
    loopback = 'tcp://127.0.0.1'
    endpoint = "tcp://127.0.0.1:%i"

    self.device = devices.ThreadMonitoredQueue(
        zmq.PAIR, zmq.PAIR, zmq.PUB, in_prefix, out_prefix
    )

    alice = self.context.socket(zmq.PAIR)
    bob = self.context.socket(zmq.PAIR)
    mon = self.context.socket(zmq.SUB)

    alice_port = alice.bind_to_random_port(loopback)
    bob_port = bob.bind_to_random_port(loopback)
    mon_port = mon.bind_to_random_port(loopback)
    mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

    # The device connects out to the sockets bound above.
    self.device.connect_in(endpoint % alice_port)
    self.device.connect_out(endpoint % bob_port)
    self.device.connect_mon(endpoint % mon_port)
    self.device.start()
    time.sleep(.2)

    try:
        # this is currently necessary to ensure no dropped monitor messages
        # see LIBZMQ-248 for more info
        mon.recv_multipart(zmq.NOBLOCK)
    except zmq.ZMQError:
        pass

    self.sockets.extend([alice, bob, mon])
    return alice, bob, mon
def main():
    """Connect a SUB socket to ``ipc_sub_url`` and print 'pupil' messages.

    Blocks on the socket monitor until an EVENT_CONNECTED event is seen,
    subscribes to the ``'pupil'`` prefix, then loops forever printing each
    topic string together with its deserialized payload frame.
    """
    context = zmq.Context()
    socket = zmq.Socket(context, zmq.SUB)
    monitor = socket.get_monitor_socket()
    socket.connect(ipc_sub_url)

    # Every event other than EVENT_CONNECTED (e.g. EVENT_CONNECT_DELAYED)
    # is ignored and we keep waiting.
    while True:
        status = recv_monitor_message(monitor)
        if status['event'] == zmq.EVENT_CONNECTED:
            break

    # The monitor is only needed for the handshake; shut it down so its
    # events are no longer queued on a socket nobody reads.
    socket.disable_monitor()
    print('connected')

    socket.subscribe('pupil')
    while True:
        # Each message is read as a topic frame followed by a payload frame.
        topic = socket.recv_string()
        payload = serializer.loads(socket.recv(), encoding='utf-8')
        print(topic, payload)
def __init__(self, ctx, url, topics=(), block_until_connected=True):
    """Create a SUB socket, connect it to ``url`` and subscribe to ``topics``.

    Args:
        ctx: ZeroMQ context used to create the socket.
        url: Endpoint passed to ``socket.connect``.
        topics: Iterable of topics; each is passed to ``self.subscribe``.
            Must not be a single ``str`` (it would be iterated per
            character).
        block_until_connected: When true, wait on the socket monitor until
            the connection is established before returning.

    Raises:
        AssertionError: If ``topics`` is a ``str``.
        Exception: "ZMQ connection failed" when the monitor reports an
            event other than EVENT_CONNECTED / EVENT_CONNECT_DELAYED.
    """
    # Validate before allocating the socket so a rejected call leaks nothing.
    assert not isinstance(topics, str)

    self.socket = zmq.Socket(ctx, zmq.SUB)
    if block_until_connected:
        # connect node and block until a connection has been made
        monitor = self.socket.get_monitor_socket()
        connected = False
        try:
            self.socket.connect(url)
            while not connected:
                status = recv_monitor_message(monitor)
                event = status['event']
                if event == zmq.EVENT_CONNECTED:
                    connected = True
                elif event != zmq.EVENT_CONNECT_DELAYED:
                    raise Exception("ZMQ connection failed")
        finally:
            # Always tear the monitor down; on failure also release the
            # socket, since the half-built object is never handed out.
            self.socket.disable_monitor()
            if not connected:
                self.socket.close()
    else:
        self.socket.connect(url)

    for t in topics:
        self.subscribe(t)
def __init__(self, reqAddress, subAddress):
    """Constructor.

    Stores the two addresses, creates a ZeroMQ context with one REQ and one
    SUB socket, and prepares (but does not start) the worker thread that
    will execute ``self.run``.
    """
    super(RpcClient, self).__init__()

    # ZeroMQ-related state
    self.__reqAddress = reqAddress
    self.__subAddress = subAddress

    context = zmq.Context()
    self.__context = context
    self.__socketREQ = context.socket(zmq.REQ)  # REQ socket
    self.__socketSUB = context.socket(zmq.SUB)  # SUB socket

    # Worker-thread state
    self.__active = False  # initial flag value; not read in this block
    self.__thread = threading.Thread(target=self.run)  # thread targeting self.run

#----------------------------------------------------------------------
def create_socket(port):
    """Create a zmq SUB socket bound to ``port`` and subscribed to everything.

    Prints "Address already in use" and exits with status 1 if the bind
    raises ``zmq.error.ZMQError``.

    Returns:
        A ``(socket, context)`` tuple.
    """
    ctx = zmq.Context()
    subscriber = ctx.socket(zmq.SUB)
    try:
        subscriber.bind("tcp://*:%s" % port)
    except zmq.error.ZMQError:
        # NOTE(review): every ZMQError is reported with this message, not
        # only an address-in-use failure.
        print("Address already in use")
        sys.exit(1)
    else:
        subscriber.setsockopt(zmq.SUBSCRIBE, b"")
        print("Start node-masternode Subscribe")
        return subscriber, ctx
def __init__(self, loop, logger, config):
    """Store collaborators, open the ZeroMQ SUB socket and schedule tasks.

    Args:
        loop: Event loop; used here via ``create_task``.
        logger: Stored as ``self.log``.
        config: Mapping read for ``["BITCOIND"]["zeromq"]`` (the endpoint to
            connect to) and ``["MYSQL"]``.
    """
    print("test")
    self.loop = loop
    self.log = logger
    self.config = config

    endpoint = config["BITCOIND"]["zeromq"]
    self.zmq_url = endpoint
    self.zmqContext = zmq.asyncio.Context()
    subscriber = self.zmqContext.socket(zmq.SUB)
    self.zmqSubSocket = subscriber
    self.MYSQL_CONFIG = config["MYSQL"]

    # Only "hashblock" is subscribed. The original also listed "hashtx",
    # "rawblock" and "rawtx" subscriptions, all disabled.
    subscriber.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
    subscriber.connect(endpoint)
    print(endpoint)

    # Schedule the coroutines on the loop (a ``mysqltest`` task was
    # disabled in the original).
    for start in (self.init_db, self.handle, self.rpctest):
        loop.create_task(start())
def brute_zmq(host, port=5555, user=None, password=None, db=0):
    """Probe whether a ZeroMQ publisher at ``host:port`` delivers a message.

    Connects a SUB socket subscribed to all topics and waits up to one
    second for a message.

    Args:
        host: Host name or address to connect to.
        port: TCP port to connect to.
        user: Unused by this function.
        password: Unused by this function.
        db: Unused by this function.

    Returns:
        True if a message was received, False if ``recv`` raised (including
        the receive timeout).
    """
    context = zmq.Context()
    try:
        socket = context.socket(zmq.SUB)
        try:
            # Configure
            socket.setsockopt(zmq.SUBSCRIBE, b"")  # All topics
            socket.setsockopt(zmq.LINGER, 0)  # do not linger on close
            socket.RCVTIMEO = 1000  # timeout: 1 sec

            # Connect
            socket.connect("tcp://%s:%s" % (host, port))

            # Try to receive
            try:
                socket.recv()
                return True
            except Exception:
                return False
        finally:
            socket.close()
    finally:
        # Release the context created for this call; the socket is already
        # closed with LINGER 0, so termination has nothing to wait for.
        context.term()
def handle_zmq(host, port=5555, extra_config=None):
    """Probe whether a ZeroMQ publisher at ``host:port`` delivers a message.

    Connects a SUB socket subscribed to all topics and waits up to one
    second for a message.

    Args:
        host: Host name or address to connect to.
        port: TCP port to connect to.
        extra_config: Unused by this function.

    Returns:
        True if a message was received, False if ``recv`` raised (including
        the receive timeout).
    """
    # log.debug(" * Connection to ZeroMQ: %s : %s" % (host, port))
    context = zmq.Context()
    try:
        socket = context.socket(zmq.SUB)
        try:
            # Configure
            socket.setsockopt(zmq.SUBSCRIBE, b"")  # All topics
            socket.setsockopt(zmq.LINGER, 0)  # do not linger on close
            socket.RCVTIMEO = 1000  # timeout: 1 sec

            # Connect
            socket.connect("tcp://%s:%s" % (host, port))

            # Try to receive
            try:
                socket.recv()
                return True
            except Exception:
                return False
        finally:
            socket.close()
    finally:
        # Release the context created for this call; the socket is already
        # closed with LINGER 0, so termination has nothing to wait for.
        context.term()
async def test_tcp_pub_socket(event_loop, socket_factory, connect_or_bind):
    """Check that an azmq PUB socket delivers only subscribed topics.

    A blocking SUB socket subscribed to ``b'a'`` receives in a background
    thread while the PUB socket keeps sending a matching and a
    non-matching message until the thread reports completion.

    Declared ``async def`` because the body uses ``await`` and
    ``async with``, which are a SyntaxError inside a plain ``def``.
    NOTE(review): needs a test runner that awaits coroutine tests -- confirm
    the required marker/decorator is applied where this test is collected.
    """
    sub_socket = socket_factory.create(zmq.SUB)
    sub_socket.setsockopt(zmq.SUBSCRIBE, b'a')
    connect_or_bind(sub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Only the subscribed topic may arrive.
        frames = sub_socket.recv_multipart()
        assert frames == [b'a', b'message']

    with run_in_background(run) as thread_done_event:
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.PUB)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')

            # Keep publishing until the subscriber thread has its message.
            while not thread_done_event.is_set():
                await socket.send_multipart([b'a', b'message'])
                await socket.send_multipart([b'b', b'wrong'])
async def test_tcp_xpub_socket(event_loop, socket_factory, connect_or_bind):
    """Check that an azmq XPUB socket sees subscribe/unsubscribe frames.

    The XPUB socket is expected to receive ``b'\\1a'`` when the blocking SUB
    socket subscribes and ``b'\\0a'`` after that socket is closed; in
    between it publishes until the subscriber thread has its message.

    Declared ``async def`` because the body uses ``await`` and
    ``async with``, which are a SyntaxError inside a plain ``def``.
    NOTE(review): needs a test runner that awaits coroutine tests -- confirm
    the required marker/decorator is applied where this test is collected.
    """
    sub_socket = socket_factory.create(zmq.SUB)
    sub_socket.setsockopt(zmq.SUBSCRIBE, b'a')
    connect_or_bind(sub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Only the subscribed topic may arrive.
        frames = sub_socket.recv_multipart()
        assert frames == [b'a', b'message']

    with run_in_background(run) as thread_done_event:
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.XPUB)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')

            # Subscription notification, awaited for at most one second.
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'\1a']

            # Keep publishing until the subscriber thread has its message.
            while not thread_done_event.is_set():
                await socket.send_multipart([b'a', b'message'])
                await socket.send_multipart([b'b', b'wrong'])

            # Closing the subscriber must produce an unsubscribe frame.
            sub_socket.close()
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'\0a']
async def test_tcp_sub_socket(event_loop, socket_factory, connect_or_bind):
    """Check that an azmq SUB socket subscribes and receives from XPUB.

    A blocking XPUB socket waits in a background thread for the
    subscription frame, then publishes one message that the azmq SUB socket
    must receive within one second.

    Declared ``async def`` because the body uses ``await`` and
    ``async with``, which are a SyntaxError inside a plain ``def``.
    NOTE(review): needs a test runner that awaits coroutine tests -- confirm
    the required marker/decorator is applied where this test is collected.
    """
    xpub_socket = socket_factory.create(zmq.XPUB)
    connect_or_bind(xpub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Wait one second for the subscription to arrive.
        assert xpub_socket.poll(1000) == zmq.POLLIN
        topic = xpub_socket.recv_multipart()
        assert topic == [b'\x01a']
        xpub_socket.send_multipart([b'a', b'message'])

        # NOTE(review): ``connect_or_bind`` is also called as a function in
        # this test, so this comparison with a string may never be true --
        # confirm the intended condition.
        if connect_or_bind == 'connect':
            assert xpub_socket.poll(1000) == zmq.POLLIN
            topic = xpub_socket.recv_multipart()
            assert topic == [b'\x00a']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.SUB)
            await socket.subscribe(b'a')
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')

            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'a', b'message']