我们从 Python 开源项目中提取了以下 19 个代码示例，用于说明如何使用 zmq.HWM。
def get_hwm(self):
    """Return the socket's High Water Mark.

    On libzmq >= 3 the send-side mark (SNDHWM) is read first; if that
    lookup raises ``zmq.ZMQError`` the receive-side mark (RCVHWM) is
    returned instead. On older libzmq the single HWM option is read.
    """
    if zmq.zmq_version_info()[0] < 3:
        # libzmq 2.x: one combined option.
        return self.getsockopt(zmq.HWM)

    try:
        return self.getsockopt(zmq.SNDHWM)
    except zmq.ZMQError:
        # SNDHWM could not be read; fall through to RCVHWM below.
        pass
    return self.getsockopt(zmq.RCVHWM)
def set_hwm(self, value):
    """Set the High Water Mark.

    On libzmq >= 3 this sets both SNDHWM and RCVHWM. Both assignments are
    always attempted; if either fails, the error is re-raised afterwards
    (when both fail, the RCVHWM error is the one raised). On older libzmq
    the single HWM option is set.

    :param value: the new high water mark.
    :raises Exception: whatever setting ``sndhwm`` / ``rcvhwm`` raised.
    """
    major = zmq.zmq_version_info()[0]
    if major < 3:
        return self.setsockopt(zmq.HWM, value)

    raised = None
    try:
        self.sndhwm = value
    except Exception as e:
        raised = e
    try:
        self.rcvhwm = value
    except Exception as e:
        # The exception must be bound here: on Python 3 the name from the
        # first handler is deleted when that handler exits, so reusing it
        # would raise UnboundLocalError and hide the real failure.
        raised = e

    if raised is not None:
        raise raised
def _setup_ipc(self):
    '''
    Create the zmq context and the PULL socket bound to ``PUB_IPC_URL``,
    applying the high water mark from ``self.opts['hwm']``.
    '''
    self.ctx = zmq.Context()
    log.debug('Setting up the publisher puller')
    self.sub = self.ctx.socket(zmq.PULL)
    # Configure the socket before bind(): libzmq only guarantees that
    # options apply to endpoints attached after they are set.
    try:
        self.sub.setsockopt(zmq.HWM, self.opts['hwm'])
        # zmq 2
    except AttributeError:
        # zmq 3
        self.sub.setsockopt(zmq.RCVHWM, self.opts['hwm'])
    self.sub.bind(PUB_IPC_URL)
def set_hwm(self, value):
    """Set the High Water Mark.

    On libzmq >= 3 both ``sndhwm`` and ``rcvhwm`` are assigned; each
    assignment is attempted even if the other fails, and the most recent
    failure (if any) is re-raised once both have been tried. On older
    libzmq the single HWM option is set instead.

    .. warning::

        New values only take effect for subsequent socket bind/connects.
    """
    if zmq.zmq_version_info()[0] < 3:
        return self.setsockopt(zmq.HWM, value)

    raised = None
    for attr in ('sndhwm', 'rcvhwm'):
        try:
            setattr(self, attr, value)
        except Exception as exc:
            # Remember the failure but still try the remaining direction.
            raised = exc
    if raised:
        raise raised
def _setup_ipc(self):
    '''
    Setup the listener IPC pusher: a PUSH socket connected to
    ``LST_IPC_URL``, with the high water mark from ``self.opts['hwm']``.
    '''
    log.debug('Setting up the listener IPC pusher')
    self.ctx = zmq.Context()
    self.pub = self.ctx.socket(zmq.PUSH)
    log.debug('Setting HWM for the listener: %d', self.opts['hwm'])
    # Configure the socket before connect(): libzmq only guarantees that
    # options apply to endpoints attached after they are set.
    try:
        self.pub.setsockopt(zmq.HWM, self.opts['hwm'])
        # zmq 2
    except AttributeError:
        # zmq 3
        self.pub.setsockopt(zmq.SNDHWM, self.opts['hwm'])
    self.pub.connect(LST_IPC_URL)
def _setup_ipc(self):
    '''
    Create the DEALER socket connected to ``DEV_IPC_URL`` (identified by
    ``self._name``) and the PUSH socket connected to ``PUB_IPC_URL``.

    Both sockets get the high water mark from ``self.opts['hwm']``, and
    both are fully configured before they connect.
    '''
    self.ctx = zmq.Context()
    # subscribe to device IPC
    log.debug('Creating the dealer IPC for %s', self._name)
    self.sub = self.ctx.socket(zmq.DEALER)
    # The identity option takes bytes; on Python 3 the name is encoded.
    if six.PY2:
        self.sub.setsockopt(zmq.IDENTITY, self._name)
    elif six.PY3:
        self.sub.setsockopt(zmq.IDENTITY, bytes(self._name, 'utf-8'))
    try:
        self.sub.setsockopt(zmq.HWM, self.opts['hwm'])
        # zmq 2
    except AttributeError:
        # zmq 3
        self.sub.setsockopt(zmq.RCVHWM, self.opts['hwm'])
    # subscribe to the corresponding IPC pipe
    self.sub.connect(DEV_IPC_URL)
    # publish to the publisher IPC
    self.pub = self.ctx.socket(zmq.PUSH)
    # As with the dealer above, set the HWM before connect(): libzmq only
    # guarantees that options apply to endpoints attached afterwards.
    try:
        self.pub.setsockopt(zmq.HWM, self.opts['hwm'])
        # zmq 2
    except AttributeError:
        # zmq 3
        self.pub.setsockopt(zmq.SNDHWM, self.opts['hwm'])
    self.pub.connect(PUB_IPC_URL)
def start(self):
    '''
    Startup the zmq consumer.

    Builds the URI from ``self.protocol``, ``self.address`` and (when set)
    ``self.port``, creates a socket of type ``self.type`` (falling back to
    PULL when ``zmq`` has no such attribute), applies the optional HWM,
    receive timeout and TCP keepalive settings, then connects.
    '''
    if self.port:
        zmq_uri = '{protocol}://{address}:{port}'.format(
            protocol=self.protocol,
            address=self.address,
            port=self.port
        )
    else:
        zmq_uri = '{protocol}://{address}'.format(
            protocol=self.protocol,
            address=self.address
        )
    log.debug('ZMQ URI: %s', zmq_uri)
    self.ctx = zmq.Context()
    skt_type = getattr(zmq, self.type, zmq.PULL)
    self.sub = self.ctx.socket(skt_type)
    # All options are applied before connect(): libzmq only guarantees
    # that options (the TCP keepalive ones in particular) apply to
    # connections started after they are set.
    if self.hwm is not None:
        try:
            self.sub.setsockopt(zmq.HWM, self.hwm)
        except AttributeError:
            self.sub.setsockopt(zmq.RCVHWM, self.hwm)
    if self.recvtimeout is not None:
        log.debug('Setting RCVTIMEO to %d', self.recvtimeout)
        self.sub.setsockopt(zmq.RCVTIMEO, self.recvtimeout)
    if self.keepalive is not None:
        log.debug('Setting TCP_KEEPALIVE to %d', self.keepalive)
        self.sub.setsockopt(zmq.TCP_KEEPALIVE, self.keepalive)
    if self.keepalive_idle is not None:
        log.debug('Setting TCP_KEEPALIVE_IDLE to %d', self.keepalive_idle)
        self.sub.setsockopt(zmq.TCP_KEEPALIVE_IDLE, self.keepalive_idle)
    if self.keepalive_interval is not None:
        log.debug('Setting TCP_KEEPALIVE_INTVL to %d', self.keepalive_interval)
        self.sub.setsockopt(zmq.TCP_KEEPALIVE_INTVL, self.keepalive_interval)
    self.sub.connect(zmq_uri)
def _setup_ipc(self):
    '''
    Setup the IPC pub and sub: a PULL socket bound to ``LST_IPC_URL`` and
    a ROUTER socket bound to ``DEV_IPC_URL``.

    Both sockets get the high water mark from ``self.opts['hwm']``, set
    before they are bound.
    '''
    log.debug('Setting up the server IPC puller to receive from the listener')
    self.ctx = zmq.Context()
    # subscribe to listener
    self.sub = self.ctx.socket(zmq.PULL)
    # Configure before bind(): libzmq only guarantees that options apply
    # to endpoints attached after they are set.
    try:
        self.sub.setsockopt(zmq.HWM, self.opts['hwm'])
        # zmq 2
    except AttributeError:
        # zmq 3
        self.sub.setsockopt(zmq.RCVHWM, self.opts['hwm'])
    self.sub.bind(LST_IPC_URL)
    # device publishers
    log.debug('Creating the router ICP on the server')
    self.pub = self.ctx.socket(zmq.ROUTER)
    try:
        self.pub.setsockopt(zmq.HWM, self.opts['hwm'])
        # zmq 2
    except AttributeError:
        # zmq 3
        self.pub.setsockopt(zmq.SNDHWM, self.opts['hwm'])
    self.pub.bind(DEV_IPC_URL)
def set_socket_option(self, name, option, value):
    """Call ``setsockopt`` on the socket registered under ``name``.

    :param name: the name of the socket where data will pass through
    :param option: the option from the ``zmq`` module
    :param value: the value to set for that option

    Here are some examples of options:

    * ``zmq.HWM``: Set high water mark
    * ``zmq.AFFINITY``: Set I/O thread affinity
    * ``zmq.IDENTITY``: Set socket identity
    * ``zmq.SUBSCRIBE``: Establish message filter
    * ``zmq.UNSUBSCRIBE``: Remove message filter
    * ``zmq.SNDBUF``: Set kernel transmit buffer size
    * ``zmq.RCVBUF``: Set kernel receive buffer size
    * ``zmq.LINGER``: Set linger period for socket shutdown
    * ``zmq.BACKLOG``: Set maximum length of the queue of outstanding
      connections
    * for the full list go to
      ``http://api.zeromq.org/4-0:zmq-setsockopt``

    **Example:**

    ::

      >>> import zmq
      >>> from agentzero.core import SocketManager
      >>>
      >>> sockets = SocketManager()
      >>> sockets.create('pipe-in', zmq.PULL)
      >>>
      >>> # block after 10 messages are queued
      >>> sockets.set_socket_option('pipe-in', zmq.HWM, 10)
    """
    self.get_by_name(name).setsockopt(option, value)
def test_recv_during_send(self):
    """Send 1001 messages over an XREQ pair with small limits.

    The sender's high water mark and both kernel buffer sizes are set to
    10. A spawned green thread sends the integers 0..1000 and then signals
    completion with 0, which this test waits for and checks.
    """
    sender, receiver, _port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
    eventlet.sleep()

    finished = eventlet.Event()

    # ZeroMQ < 3.0 has no SNDHWM constant; fall back to HWM there.
    hwm_option = zmq.SNDHWM if hasattr(zmq, 'SNDHWM') else zmq.HWM

    sender.setsockopt(hwm_option, 10)
    sender.setsockopt(zmq.SNDBUF, 10)
    receiver.setsockopt(zmq.RCVBUF, 10)

    def tx():
        for seq in range(1001):
            sender.send(str(seq).encode())
        finished.send(0)

    eventlet.spawn(tx)
    self.assertEqual(finished.wait(), 0)