我们从 Python 开源项目中，提取了以下 7 个代码示例，用于说明如何使用 zmq.UNSUBSCRIBE。
def unsubscribe(self, alias: str, topic: Union[bytes, str]) -> None:
    """
    Cancel a topic subscription on the socket registered under `alias`
    and drop the matching entry from the handlers mapping.

    `topic` may also be a tuple or a list of topics; in that case every
    element is unsubscribed in turn through a recursive call.

    For an `AgentAddress` the socket is looked up by `alias` and the
    topic is used as-is. For an `AgentChannel` the socket is looked up
    by the channel's `receiver` and the topic is prefixed with the
    channel's `twin_uuid`. Any other address type raises
    `NotImplementedError`.
    """
    # Collections of topics are processed one element at a time.
    if isinstance(topic, (tuple, list)):
        for single_topic in topic:
            self.unsubscribe(alias, single_topic)
        return

    topic = topic_to_bytes(topic)

    # Work out which socket to act on and the exact filter it holds.
    if isinstance(self.address[alias], AgentAddress):
        sub_socket = self.socket[alias]
        filter_key = topic
    elif isinstance(self.address[alias], AgentChannel):
        channel = self.address[alias]
        sub_address = channel.receiver
        filter_key = channel.twin_uuid + topic
        sub_socket = self.socket[sub_address]
    else:
        raise NotImplementedError(
            'Unsupported address type %s!' % self.address[alias]
        )

    # Remove the filter from the socket first, then forget its handler.
    sub_socket.setsockopt(zmq.UNSUBSCRIBE, filter_key)
    del self.handler[sub_socket][filter_key]
def unsubscribe_all(self):
    """
    Remove the empty-prefix (match everything) subscription.

    Sends ``zmq.UNSUBSCRIBE`` with an empty filter on ``self.socket``.
    NOTE(review): per the ZeroMQ documentation an UNSUBSCRIBE only
    removes a filter that was previously installed with the same value,
    so topic-specific subscriptions are presumably left in place --
    confirm this matches what callers expect from "all".
    """
    # The filter must be bytes: pyzmq rejects a unicode str here under
    # Python 3, and b'' is equally valid under Python 2.
    self.socket.setsockopt(zmq.UNSUBSCRIBE, b'')
def unsubscribe(self, code):
    """
    Stop receiving the event named `code`.

    The name is UTF-8 encoded and handed to ``self.socket`` as the
    ``zmq.UNSUBSCRIBE`` filter.
    """
    topic_filter = code.encode('utf-8')
    self.socket.setsockopt(zmq.UNSUBSCRIBE, topic_filter)

# reception part
def unsubscribe(cls, username):
    """
    Remove the subscription filter for `username`.

    The UTF-8 encoded user name is passed as the ``zmq.UNSUBSCRIBE``
    filter to the socket held by ``cls._zmq_stream``.
    """
    # NOTE(review): takes `cls`, so presumably decorated as a
    # classmethod outside this view -- confirm.
    stream_socket = cls._zmq_stream.socket
    stream_socket.setsockopt(zmq.UNSUBSCRIBE, username.encode('utf-8'))
def set_socket_option(self, name, option, value):
    """Apply a socket option to the socket registered as ``name``.

    The socket is resolved with ``self.get_by_name(name)`` and the
    option is applied through its ``setsockopt(option, value)`` method.

    :param name: the name of the socket the option is applied to
    :param option: the option constant from the ``zmq`` module
    :param value: the value to set for that option

    Some commonly used options:

    * ``zmq.HWM``: Set high water mark
    * ``zmq.AFFINITY``: Set I/O thread affinity
    * ``zmq.IDENTITY``: Set socket identity
    * ``zmq.SUBSCRIBE``: Establish message filter
    * ``zmq.UNSUBSCRIBE``: Remove message filter
    * ``zmq.SNDBUF``: Set kernel transmit buffer size
    * ``zmq.RCVBUF``: Set kernel receive buffer size
    * ``zmq.LINGER``: Set linger period for socket shutdown
    * ``zmq.BACKLOG``: Set maximum length of the queue of outstanding
      connections
    * the full list is at ``http://api.zeromq.org/4-0:zmq-setsockopt``

    **Example:**

    ::

      >>> import zmq
      >>> from agentzero.core import SocketManager
      >>>
      >>> sockets = SocketManager()
      >>> sockets.create('pipe-in', zmq.PULL)
      >>>
      >>> # block after 10 messages are queued
      >>> sockets.set_socket_option('pipe-in', zmq.HWM, 10)
    """
    self.get_by_name(name).setsockopt(option, value)
def _connect(self):
    """
    Wait for the publisher's ``hello`` message and acknowledge it.

    Subscribes ``self.socket`` to the ``hello`` topic and polls
    ``self.poller`` until a two-frame message with that topic arrives.
    An empty request is then sent on a temporary REQ socket connected to
    ``self._env['DAUBER_CONTROL_SOCKET_URI']`` and its reply is awaited,
    after which the ``hello`` subscription is removed again.

    :raises AssertionError: if no ``hello`` message is received within
        the timeout (500 seconds).
    """
    # Subscribe to the hello topic - once we receive a hello we'll send a
    # request for real data. The callback plugin will effectively block
    # execution until we send this request.
    # Topic filters must be bytes: pyzmq rejects unicode str under
    # Python 3, and received frames are bytes as well.
    hello_topic = b'hello'
    self.socket.setsockopt(zmq.SUBSCRIBE, hello_topic)

    # Define the control socket for responding to the 'hello' topic
    control_socket = self.context.socket(zmq.REQ)
    try:
        control_socket.connect(self._env['DAUBER_CONTROL_SOCKET_URI'])

        timeout = 500  # seconds, measured with time.time()
        deadline = time.time() + timeout
        connected = False
        while not connected:
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            # Bound the poll by the time left (poll takes milliseconds);
            # an unbounded poll would never let the deadline fire.
            ready = dict(self.poller.poll(max(1, int(remaining * 1000))))
            if ready.get(self.socket):
                # Messages are expected to have exactly two frames.
                topic, _ = self.socket.recv_multipart()
                if topic == hello_topic:
                    # Signal that we've connected and we're ready to
                    # receive data
                    control_socket.send(b'')
                    control_socket.recv()
                    connected = True

        if not connected:
            # Raised explicitly (not via `assert`) so the check survives
            # `python -O`; the exception type is kept for callers.
            raise AssertionError(
                "Timed out before recieving a hello topic message from the publisher."
            )
    finally:
        # Release the control socket on both success and failure.
        control_socket.close()

    self.socket.setsockopt(zmq.UNSUBSCRIBE, hello_topic)
def test_change_subscription(self):
    # Checks that a SUB socket can switch its filter from b'test' to b'done'
    # while a publisher keeps sending: the receiver is expected to count
    # exactly 50 messages (b'test 1' .. b'test 49' plus b'test LAST').
    #
    # FIXME: Extensive testing showed this particular test is the root cause
    # of sporadic failures on Travis.
    pub, sub, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b'test')
    eventlet.sleep(0)
    # Events used to hand-shake between the receiver and the sender:
    #   sub_ready - receiver has seen the first BEGIN
    #   sub_last  - receiver has seen LAST and changed its subscription
    #   sub_done  - receiver has finished; carries the message count
    sub_ready = eventlet.Event()
    sub_last = eventlet.Event()
    sub_done = eventlet.Event()

    def rx():
        # Receiver: wait for the first BEGIN, then count messages until
        # b'done DONE' arrives.
        while sub.recv() != b'test BEGIN':
            eventlet.sleep(0)
        sub_ready.send()
        count = 0
        while True:
            msg = sub.recv()
            if msg == b'test BEGIN':
                # BEGIN may come many times
                continue
            if msg == b'test LAST':
                # Subscribe to the new prefix before dropping the old one.
                sub.setsockopt(zmq.SUBSCRIBE, b'done')
                sub.setsockopt(zmq.UNSUBSCRIBE, b'test')
                eventlet.sleep(0)
                # In real application you should either sync
                # or tolerate loss of messages.
                sub_last.send()
            if msg == b'done DONE':
                break
            # LAST falls through to here and is counted; DONE is not.
            count += 1
        sub_done.send(count)

    def tx():
        # Sender: publish BEGIN until the receiver is ready, then
        # b'test 1' .. b'test 100' with message 50 replaced by LAST.
        # Sync receiver ready to avoid loss of first packets
        while not sub_ready.ready():
            pub.send(b'test BEGIN')
            eventlet.sleep(0.005)
        for i in range(1, 101):
            msg = 'test {0}'.format(i).encode()
            if i != 50:
                pub.send(msg)
            else:
                pub.send(b'test LAST')
                # Block until the receiver has switched subscriptions, so
                # the remaining b'test N' messages are expected to be
                # filtered out.
                sub_last.wait()
                # XXX: putting a real delay of 1ms here fixes sporadic failures on Travis
                # just yield eventlet.sleep(0) doesn't cut it
                eventlet.sleep(0.001)
        pub.send(b'done DONE')

    eventlet.spawn(rx)
    eventlet.spawn(tx)
    # Blocks until rx() reports its count.
    rx_count = sub_done.wait()
    self.assertEqual(rx_count, 50)