我们从Python开源项目中，提取了以下22个代码示例，用于说明如何使用zmq.SNDHWM。
def get_hwm(self):
    """Return the socket's high-water mark.

    On libzmq >= 3 the send mark (SNDHWM) is tried first; if reading it
    raises ``zmq.ZMQError`` the receive mark (RCVHWM) is returned instead.
    On older libzmq the single combined HWM option is returned.
    """
    if zmq.zmq_version_info()[0] < 3:
        # libzmq 2.x only knows the combined option.
        return self.getsockopt(zmq.HWM)

    try:
        return self.getsockopt(zmq.SNDHWM)
    except zmq.ZMQError:
        # SNDHWM could not be read; fall through to RCVHWM below.
        pass
    return self.getsockopt(zmq.RCVHWM)
def set_hwm(self, value):
    """Set the socket's high-water mark.

    On libzmq >= 3 both SNDHWM and RCVHWM are set to ``value``.  Both
    assignments are always attempted; if either fails, the most recent
    exception is re-raised once both have been tried.  On older libzmq
    the single combined HWM option is set instead.

    Raises:
        Exception: whatever the failing ``sndhwm``/``rcvhwm`` assignment
            raised (the later one wins when both fail).
    """
    major = zmq.zmq_version_info()[0]
    if major < 3:
        return self.setsockopt(zmq.HWM, value)

    raised = None
    try:
        self.sndhwm = value
    except Exception as e:
        raised = e
    try:
        self.rcvhwm = value
    except Exception as e:
        # The exception must be bound in this handler too: a name bound by
        # ``except ... as`` is unbound when its handler exits, so reusing
        # the first handler's ``e`` here fails with UnboundLocalError.
        raised = e

    if raised is not None:
        raise raised
def execute_command_forwarder():
    """Run a SUB -> PUB forwarder device configured from the command line.

    Parses the sub-command arguments, binds the device's inbound side to
    ``args.subscriber`` and its outbound side to ``args.publisher``,
    subscribes to every topic, applies the optional high-water marks,
    prints a short start-up banner and starts the device.
    """
    # NOTE(review): the forwarder takes its parser from the ``streamer``
    # module -- confirm this is intended and not a copy/paste leftover.
    from oldspeak.console.parsers.streamer import parser

    args = parser.parse_args(get_sub_parser_argv())
    bootstrap_conf_with_gevent(args)

    device = Device(zmq.FORWARDER, zmq.SUB, zmq.PUB)
    device.bind_in(args.subscriber)
    device.bind_out(args.publisher)
    # An empty prefix subscribes to all messages.
    device.setsockopt_in(zmq.SUBSCRIBE, b'')

    # A falsy HWM (None / 0) leaves the library default untouched.
    if args.subscriber_hwm:
        device.setsockopt_in(zmq.RCVHWM, args.subscriber_hwm)
    if args.publisher_hwm:
        device.setsockopt_out(zmq.SNDHWM, args.publisher_hwm)

    # Single-argument print() emits the same text on Python 2 and 3,
    # unlike the former print statements, which only parse on Python 2.
    print("oldspeak forwarder started")
    print("date %s" % (datetime.utcnow().isoformat(),))
    print("subscriber %s" % (args.subscriber,))
    print("publisher %s" % (args.publisher,))

    device.start()
def execute_command_streamer():
    """Run a PULL -> PUSH streamer device configured from the command line.

    Parses the sub-command arguments, binds the device's inbound side to
    ``args.pull`` and its outbound side to ``args.push``, applies the
    optional high-water marks, prints a short start-up banner and starts
    the device.
    """
    from oldspeak.console.parsers.streamer import parser

    args = parser.parse_args(get_sub_parser_argv())
    bootstrap_conf_with_gevent(args)

    device = Device(zmq.STREAMER, zmq.PULL, zmq.PUSH)
    device.bind_in(args.pull)
    device.bind_out(args.push)

    # A falsy HWM (None / 0) leaves the library default untouched.
    if args.pull_hwm:
        device.setsockopt_in(zmq.RCVHWM, args.pull_hwm)
    if args.push_hwm:
        device.setsockopt_out(zmq.SNDHWM, args.push_hwm)

    # Single-argument print() emits the same text on Python 2 and 3,
    # unlike the former print statements, which only parse on Python 2.
    print("oldspeak streamer started")
    print("date %s" % (datetime.utcnow().isoformat(),))
    print("pull %s" % (args.pull,))
    print("push %s" % (args.push,))

    device.start()
def set_hwm(self, value):
    """Set the socket's high-water mark.

    On libzmq >= 3 this assigns ``value`` to both SNDHWM and RCVHWM; on
    older libzmq it sets the single combined HWM option.

    .. warning:: New values only take effect for subsequent socket
        bind/connects.
    """
    if zmq.zmq_version_info()[0] < 3:
        return self.setsockopt(zmq.HWM, value)

    raised = None
    # Try both directions even if the first fails; remember the latest
    # failure and surface it once both attempts are done.
    for option_name in ('sndhwm', 'rcvhwm'):
        try:
            setattr(self, option_name, value)
        except Exception as exc:
            raised = exc

    if raised:
        raise raised
def _setup_ipc(self):
    '''
    Set up the listener IPC pusher.

    Creates a ZeroMQ context and a PUSH socket, applies the configured
    high-water mark (``self.opts['hwm']``) and connects the socket to
    ``LST_IPC_URL``.
    '''
    log.debug('Setting up the listener IPC pusher')
    self.ctx = zmq.Context()
    self.pub = self.ctx.socket(zmq.PUSH)
    log.debug('Setting HWM for the listener: %d', self.opts['hwm'])
    # The HWM is applied *before* connecting: on older libzmq releases the
    # option only takes effect for subsequent bind/connect calls.
    try:
        self.pub.setsockopt(zmq.HWM, self.opts['hwm'])
        # zmq 2
    except AttributeError:
        # zmq 3
        self.pub.setsockopt(zmq.SNDHWM, self.opts['hwm'])
    self.pub.connect(LST_IPC_URL)
def _setup_ipc(self):
    '''
    Subscribe to the right topic in the device IPC
    and publish to the publisher proxy.

    Creates a DEALER socket identified by ``self._name`` and connected to
    ``DEV_IPC_URL``, and a PUSH socket connected to ``PUB_IPC_URL``. Both
    get the configured high-water mark (``self.opts['hwm']``) before they
    connect.
    '''
    self.ctx = zmq.Context()
    # subscribe to device IPC
    log.debug('Creating the dealer IPC for %s', self._name)
    self.sub = self.ctx.socket(zmq.DEALER)
    # The identity option needs bytes; on Python 3 the name is encoded.
    if six.PY2:
        self.sub.setsockopt(zmq.IDENTITY, self._name)
    elif six.PY3:
        self.sub.setsockopt(zmq.IDENTITY, bytes(self._name, 'utf-8'))
    try:
        self.sub.setsockopt(zmq.HWM, self.opts['hwm'])
        # zmq 2
    except AttributeError:
        # zmq 3
        self.sub.setsockopt(zmq.RCVHWM, self.opts['hwm'])
    # subscribe to the corresponding IPC pipe
    self.sub.connect(DEV_IPC_URL)
    # self.sub.setsockopt(zmq.SUBSCRIBE, '')
    # publish to the publisher IPC
    self.pub = self.ctx.socket(zmq.PUSH)
    # As for the dealer above, the HWM is applied before connecting: on
    # older libzmq releases it only affects subsequent bind/connect calls.
    try:
        self.pub.setsockopt(zmq.HWM, self.opts['hwm'])
        # zmq 2
    except AttributeError:
        # zmq 3
        self.pub.setsockopt(zmq.SNDHWM, self.opts['hwm'])
    self.pub.connect(PUB_IPC_URL)
def _setup_ipc(self):
    '''
    Set up the IPC pub and sub.
    Subscribe to the listener IPC
    and publish to the device specific IPC.

    Creates a PULL socket bound to ``LST_IPC_URL`` and a ROUTER socket
    bound to ``DEV_IPC_URL``. Both get the configured high-water mark
    (``self.opts['hwm']``) before they bind.
    '''
    log.debug('Setting up the server IPC puller to receive from the listener')
    self.ctx = zmq.Context()
    # subscribe to listener
    self.sub = self.ctx.socket(zmq.PULL)
    # The HWM is applied *before* binding: on older libzmq releases the
    # option only takes effect for subsequent bind/connect calls.
    try:
        self.sub.setsockopt(zmq.HWM, self.opts['hwm'])
        # zmq 2
    except AttributeError:
        # zmq 3
        self.sub.setsockopt(zmq.RCVHWM, self.opts['hwm'])
    self.sub.bind(LST_IPC_URL)
    # device publishers
    log.debug('Creating the router ICP on the server')
    self.pub = self.ctx.socket(zmq.ROUTER)
    try:
        self.pub.setsockopt(zmq.HWM, self.opts['hwm'])
        # zmq 2
    except AttributeError:
        # zmq 3
        self.pub.setsockopt(zmq.SNDHWM, self.opts['hwm'])
    self.pub.bind(DEV_IPC_URL)
def __init__(self, context, location, partitions, hwm, hostname_partitioning):
    """Initialise the spider-feed producer.

    Delegates to the parent initialiser with the ``b'sf'`` identifier,
    picks the partitioner implementation according to
    ``hostname_partitioning`` and applies ``hwm`` as the send high-water
    mark of ``self.sender``.
    """
    super(SpiderFeedProducer, self).__init__(context, location, b'sf')

    if hostname_partitioning:
        partitioner_cls = Crc32NamePartitioner
    else:
        partitioner_cls = FingerprintPartitioner
    self.partitioner = partitioner_cls(partitions)

    self.sender.set(zmq.SNDHWM, hwm)
def test_recv_during_send(self):
    """Send 1001 messages from a green thread over a small-buffer pair.

    The sender gets a send high-water mark of 10 and both ends get tiny
    kernel buffers; the test passes when the sending green thread runs to
    completion and signals ``0`` through the event.
    """
    sender, receiver, port = self.create_bound_pair(zmq.XREQ, zmq.XREQ)
    eventlet.sleep()
    finished = eventlet.Event()

    # ZeroMQ < 3.0 has no SNDHWM and uses the combined HWM option instead.
    hwm_option = zmq.SNDHWM if hasattr(zmq, 'SNDHWM') else zmq.HWM

    sender.setsockopt(hwm_option, 10)
    sender.setsockopt(zmq.SNDBUF, 10)
    receiver.setsockopt(zmq.RCVBUF, 10)

    def tx():
        # Messages "0" .. "1000", then report completion.
        for seq in range(1001):
            sender.send(str(seq).encode())
        finished.send(0)

    eventlet.spawn(tx)
    outcome = finished.wait()
    self.assertEqual(outcome, 0)
def zthread_fork(ctx, func, *args, **kwargs):
    """Create an attached thread.

    An attached thread gets a ctx and a PAIR pipe back to its parent. It
    must monitor its pipe, and exit if the pipe becomes unreadable.

    Args:
        ctx: ZeroMQ context used to create both ends of the pipe.
        func: thread target, called as ``func(ctx, pipe, *args, **kwargs)``.
        *args: extra positional arguments for ``func``.
        **kwargs: extra keyword arguments for ``func``.

    Returns:
        The parent's end of the PAIR pipe. Errors are not converted to a
        sentinel value; any exception raised while setting up the sockets
        or starting the thread propagates to the caller.
    """
    pipe_options = (
        (zmq.LINGER, 0),
        (zmq.RCVHWM, 100),
        (zmq.SNDHWM, 100),
        (zmq.SNDTIMEO, 5000),
        (zmq.RCVTIMEO, 5000),
    )

    a = ctx.socket(zmq.PAIR)
    b = ctx.socket(zmq.PAIR)
    # Configure both ends identically. Previously the receive timeout was
    # applied to ``a`` twice and never to ``b``.
    for sock in (a, b):
        for option, option_value in pipe_options:
            sock.setsockopt(option, option_value)

    # hexlify() returns bytes on Python 3; decode so the endpoint is a
    # clean "inproc://<hex>" instead of embedding a bytes repr.
    iface = "inproc://%s" % binascii.hexlify(os.urandom(8)).decode('ascii')
    a.bind(iface)
    b.connect(iface)

    thread = threading.Thread(target=func, args=((ctx, b) + args), kwargs=kwargs)
    thread.daemon = False
    thread.start()

    return a
def __init__(self, bus, is_root=False):
    """Initialise a connection attached to ``bus``.

    ``address`` and ``process_id`` start out as ``None``. A PUB socket is
    created from the bus's ZeroMQ context for signals, together with a
    lock stored on the instance.
    """
    self.address = None
    self.process_id = None
    self.is_root_connection = is_root
    self._bus = bus

    bus_context = self._bus._context
    self._signal_socket = bus_context.socket(zmq.PUB)
    self._lock = threading.Lock()
    # Left disabled, as in the original:
    # self._signal_socket.setsockopt(zmq.SNDHWM, 1)
def _init_zmq(self):
    """Start the worker processes and connect one PAIR socket to each.

    Each worker binds a PAIR socket on ``ipc:///tmp/<self._ID + pID>``,
    pulls lists of job indices from the task queue, runs ``self._func``
    on the corresponding jobs and sends every non-None result back. The
    parent keeps the context, the connected sockets and the zmq constants
    it needs later on ``self``.
    """
    # Imported here rather than at module level (kept from the original).
    import zmq

    def wrapped_map(pID, tasks, remain_jobs):
        # The task queue only yields indices; the actual jobs are looked
        # up in self._jobs.
        # ------ worker side of the PAIR pipe ------ #
        worker_ctx = zmq.Context()
        pipe = worker_ctx.socket(zmq.PAIR)
        pipe.set(zmq.SNDHWM, self._hwm)
        pipe.set(zmq.LINGER, -1)
        pipe.bind("ipc:///tmp/%d" % (self._ID + pID))

        # ------ process tasks until the None sentinel ------ #
        while True:
            indices = tasks.get()
            if indices is None:
                break
            batch = [self._jobs[i] for i in indices]
            # keep the shared counter of remaining jobs up to date
            remain_jobs.add(-len(batch))
            # batch size 1: hand over the single job, not a list
            if self._batch == 1:
                ret = self._func(batch[0])
            else:
                ret = self._func(batch)
            # a generator is drained result by result; anything else is
            # treated as one single result
            if not isinstance(ret, types.GeneratorType):
                ret = (ret,)
            for result in ret:
                # None results are skipped
                if result is not None:
                    pipe.send_pyobj(result)
            # drop the reference to the finished results
            del ret

        # tell the parent this worker is done ...
        pipe.send_pyobj(None)
        # ... and wait for its closing message before shutting down
        pipe.recv()
        pipe.close()
        worker_ctx.term()
        sys.exit(0)

    # ------ start the processes ------ #
    self._processes = [
        Process(target=wrapped_map,
                args=(i, self._tasks, self._remain_jobs))
        for i in range(self._ncpu)
    ]
    for proc in self._processes:
        proc.start()

    # ------ parent side: one PAIR socket per worker ------ #
    ctx = zmq.Context()
    sockets = []
    for i in range(self._ncpu):
        peer = ctx.socket(zmq.PAIR)
        peer.set(zmq.RCVHWM, 0)  # no limit receiving
        peer.connect("ipc:///tmp/%d" % (self._ID + i))
        sockets.append(peer)

    self._ctx = ctx
    self._sockets = sockets
    self._zmq_noblock = zmq.NOBLOCK
    self._zmq_again = zmq.error.Again