我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zmq.NOBLOCK。
def test_root_topic(self):
    """Check that the handler's root topic prefixes the published topic.

    A second SUB socket with an empty subscription must receive the record
    under ``b'twoonly.INFO'``, while a non-blocking receive on the socket
    returned by ``connect_handler`` must fail with EAGAIN.
    """
    logger, handler, sub = self.connect_handler()
    handler.socket.bind(self.iface)

    # Extra subscriber with an empty subscription prefix.
    catch_all = sub.context.socket(zmq.SUB)
    self.sockets.append(catch_all)
    catch_all.connect(self.iface)
    catch_all.setsockopt(zmq.SUBSCRIBE, b'')

    handler.root_topic = b'twoonly'
    text = 'ignored'
    logger.info(text)

    # The original subscriber is expected to have nothing pending.
    self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)

    received_topic, received_body = catch_all.recv_multipart()
    self.assertEqual(received_topic, b'twoonly.INFO')
    self.assertEqual(received_body, b(text) + b'\n')
    logger.removeHandler(handler)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
    """Start a ThreadMonitoredQueue connected to three freshly bound sockets.

    Two PAIR sockets and one SUB socket are bound to random ports on
    127.0.0.1; the device then connects its in, out and monitor sides to
    those ports and is started.

    :param mon_sub: subscription prefix set on the monitor SUB socket
    :param in_prefix: forwarded to ``ThreadMonitoredQueue``
    :param out_prefix: forwarded to ``ThreadMonitoredQueue``
    :return: the tuple ``(alice, bob, mon)``
    """
    bind_addr = 'tcp://127.0.0.1'
    self.device = devices.ThreadMonitoredQueue(
        zmq.PAIR, zmq.PAIR, zmq.PUB, in_prefix, out_prefix
    )

    alice = self.context.socket(zmq.PAIR)
    bob = self.context.socket(zmq.PAIR)
    mon = self.context.socket(zmq.SUB)

    alice_port = alice.bind_to_random_port(bind_addr)
    bob_port = bob.bind_to_random_port(bind_addr)
    mon_port = mon.bind_to_random_port(bind_addr)
    mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

    self.device.connect_in("tcp://127.0.0.1:%i" % alice_port)
    self.device.connect_out("tcp://127.0.0.1:%i" % bob_port)
    self.device.connect_mon("tcp://127.0.0.1:%i" % mon_port)
    self.device.start()
    time.sleep(.2)

    try:
        # this is currently necessary to ensure no dropped monitor messages
        # see LIBZMQ-248 for more info
        mon.recv_multipart(zmq.NOBLOCK)
    except zmq.ZMQError:
        pass

    self.sockets.extend([alice, bob, mon])
    return alice, bob, mon
def _handle_recv(self):
    """Handle a recv event.

    Does nothing when the stream is flagged as flushed. Otherwise performs
    one non-blocking multipart receive and passes the result to the
    registered receive callback, if one is set.
    """
    if self._flushed:
        return

    try:
        parts = self.socket.recv_multipart(zmq.NOBLOCK, copy=self._recv_copy)
    except zmq.ZMQError as err:
        # EAGAIN means the state changed since the poll event; stay quiet.
        if err.errno != zmq.EAGAIN:
            gen_log.error("RECV Error: %s"%zmq.strerror(err.errno))
        return

    on_recv = self._recv_callback
    if on_recv:
        self._run_callback(on_recv, parts)
def shutdown(self):
    """Shut down the paired listener by sending it the ``<END>`` signal.

    If a ``socket`` attribute exists, ``<END>`` is sent without blocking;
    on a ZMQ error one non-blocking receive is attempted before retrying
    the send. If a ``process`` attribute exists, the subprocess gets one
    second to finish before it is killed.
    """
    if hasattr(self, 'socket'):
        sock = self.socket
        try:
            sock.send_unicode('<END>', zmq.NOBLOCK)
        except zmq.error.ZMQError:
            # The socket may need to receive before it is allowed to send.
            try:
                sock.recv_unicode(zmq.NOBLOCK)
                sock.send_unicode('<END>', zmq.NOBLOCK)
            except zmq.error.ZMQError:
                # The paired process is probably dead already.
                pass

    if hasattr(self, 'process'):
        # Let the subprocess clean up, but do not wait too long.
        try:
            self.process.communicate(timeout=1)
        except subprocess.TimeoutExpired:
            self.process.kill()
def send(self, message, send_more=False, block=True, as_json=False):
    """Send ``message`` on ``self.socket``.

    :param message: payload handed to ``send_json`` or ``send``
    :param send_more: when true, the ``zmq.SNDMORE`` flag is set
    :param block: when false, ``zmq.NOBLOCK`` is added and a ``zmq.Again``
        raised by the send is swallowed
    :param as_json: when true, the message is sent via ``send_json``
    :raises zmq.Again: if the send raises it while ``block`` is true
    :raises zmq.ZMQError: any other ZMQ error, after it has been logged
    """
    flags = zmq.SNDMORE if send_more else 0
    if not block:
        flags |= zmq.NOBLOCK

    try:
        if as_json:
            self.socket.send_json(message, flags)
        else:
            self.socket.send(message, flags, copy=self.zmq_copy, track=self.zmq_track)
    except zmq.Again:
        # Only a non-blocking caller tolerates a send that could not go out.
        if block:
            raise
    except zmq.ZMQError as exc:
        logger.error(exc)
        raise
def get_command(self):
    """Try to read one command from the socket without blocking.

    :return: the received bytes decoded with ``config.CODEC``, or ``None``
        when the receive fails with EAGAIN (no message available, as
        opposed to a blank message)
    :raises zmq.ZMQError: for any ZMQ error other than EAGAIN
    """
    try:
        raw = self.socket.recv(zmq.NOBLOCK)
        log.debug("Received message: %r", raw)
    except zmq.ZMQError as exc:
        if exc.errno != zmq.EAGAIN:
            raise
        return None
    return raw.decode(config.CODEC)
def poll_command_request(self):
    """Pick up a pending request from the command RPC socket, if any.

    Performs one non-blocking receive. When it fails with EAGAIN the method
    returns without touching ``self._command``. Otherwise the message is
    unpacked; the first segment is the action, the remaining segments are
    its params, and both are stored on ``self._command`` as a ``_Command``.

    :raises zmq.ZMQError: for any ZMQ error other than EAGAIN
    """
    try:
        request = self.rpc.recv(zmq.NOBLOCK)
    except zmq.ZMQError as exc:
        if exc.errno != zmq.EAGAIN:
            raise
        return

    _logger.debug("Received command %s", request)
    segments = _unpack(request)
    action = segments[0]
    params = segments[1:]
    _logger.debug("Adding %s, %s to the request queue", action, params)
    self._command = _Command(action, params)
def generator_from_zmq_pull(context, host):
    """Yield JSON-decoded tasks from a PULL socket bound to ``host``.

    The generator never blocks on the socket: each step performs one
    non-blocking receive and yields ``None`` when ``zmq.Again`` is raised,
    otherwise the result of ``json.loads`` on the received message.

    :param context: object providing ``socket(zmq.PULL)``
    :param host: bind address; one trailing ``/`` is removed if present
    """
    pull = context.socket(zmq.PULL)
    # TODO: Configure socket with clean properties to avoid message overload.
    if host.endswith('/'):
        host = host[:-1]
    print_item("+", "Binding ZMQ pull socket : " + colorama.Fore.CYAN + "{0}".format(host) + colorama.Style.RESET_ALL)
    pull.bind(host)

    while True:
        try:
            raw = pull.recv(flags=zmq.NOBLOCK)
        except zmq.Again:
            raw = None
        # NOTE: yielding None keeps the generator non-blocking.
        yield None if raw is None else json.loads(raw)
def run_arduino_bridge(self):
    """Run the bridge loop forever.

    Each pass reports a pending problem (if ``self.last_problem`` is set),
    then tries a non-blocking multipart receive. The second frame is
    unpacked with ``umsgpack`` into ``self.payload`` and its ``'command'``
    entry is dispatched through ``self.command_dict``. The loop sleeps for
    one millisecond via ``self.board.sleep`` after every pass.
    """
    while True:
        if self.last_problem:
            self.report_problem()
        # noinspection PyBroadException
        try:
            frames = self.subscriber.recv_multipart(zmq.NOBLOCK)
            self.payload = umsgpack.unpackb(frames[1])
            command = self.payload['command']
            if command not in self.command_dict:
                print("can't execute unknown command'")
            else:
                self.command_dict[command]()
            self.board.sleep(.001)
        except zmq.error.Again:
            # Nothing to receive right now.
            self.board.sleep(.001)
def receive_loop(self):
    """Receive loop for zmq messages.

    Each received multipart message is handed to
    ``incoming_message_processing`` as (decoded first frame, umsgpack-unpacked
    second frame). Applications are expected to override this method to
    suit their own message handling.

    :return: never returns normally; the loop is infinite
    """
    while True:
        try:
            frames = self.subscriber.recv_multipart(zmq.NOBLOCK)
            self.incoming_message_processing(frames[0].decode(),
                                             umsgpack.unpackb(frames[1]))
            self.board.sleep(.01)
        except zmq.error.Again:
            # No message available: idle briefly. Anything raised while
            # sleeping (including Ctrl-C) triggers clean-up.
            try:
                self.board.sleep(.01)
            except BaseException:
                self.clean_up()
        except KeyboardInterrupt:
            self.clean_up()
# noinspection PyMethodMayBeStatic
def receive_loop(self):
    """Receive loop for zmq messages.

    Each received multipart message is handed to
    ``incoming_message_processing`` as (decoded first frame, umsgpack-unpacked
    second frame). Applications are expected to override this method to
    suit their own message handling.

    :return: never returns normally; the loop is infinite
    """
    while True:
        try:
            frames = self.subscriber.recv_multipart(zmq.NOBLOCK)
            self.incoming_message_processing(frames[0].decode(),
                                             umsgpack.unpackb(frames[1]))
            time.sleep(.001)
        except zmq.error.Again:
            # No message available: idle for a millisecond.
            time.sleep(.001)
        except KeyboardInterrupt:
            self.clean_up()
# noinspection PyMethodMayBeStatic
def run_raspberry_bridge(self):
    """Run the bridge loop forever.

    Each pass reports a pending problem (if ``self.last_problem`` is set),
    then tries a non-blocking multipart receive. The second frame is
    unpacked with ``umsgpack`` into ``self.payload`` and its ``'command'``
    entry is dispatched through ``self.command_dict``. A KeyboardInterrupt
    triggers ``self.cleanup()`` followed by ``sys.exit(0)``.
    """
    while True:
        if self.last_problem:
            self.report_problem()
        # noinspection PyBroadException
        try:
            frames = self.subscriber.recv_multipart(zmq.NOBLOCK)
            self.payload = umsgpack.unpackb(frames[1])
            command = self.payload['command']
            if command not in self.command_dict:
                print("can't execute unknown command'")
            else:
                self.command_dict[command]()
        except zmq.error.Again:
            # Nothing to receive right now: idle for a millisecond.
            time.sleep(.001)
        except KeyboardInterrupt:
            self.cleanup()
            sys.exit(0)
def _fetch_messages(self):
    """Fetch one input message from the socket without blocking.

    The second frame of the multipart message is unpickled, passed to
    ``self._deliver_message`` and returned; ``self.fetched`` is incremented
    for each received message.

    :return: the unpickled object, or ``None`` when ``zmq.error.Again``
        is raised (no message available)
    :raises Exception: any other error, after it has been logged
    """
    try:
        _, body = self.socket.recv_multipart(flags=zmq.NOBLOCK)
        if Global.CONFIG_MANAGER.tracing_mode:
            Global.LOGGER.debug("fetched a new message")
        self.fetched += 1
        # NOTE(review): pickle.loads on data read from a socket executes
        # arbitrary code if the peer is untrusted - confirm the sender is.
        payload = pickle.loads(body)
        self._deliver_message(payload)
        return payload
    except zmq.error.Again:
        return None
    except Exception as new_exception:
        Global.LOGGER.error(new_exception)
        raise
def _main_do_control_send(self):
    """Try to send every control message queued at the time of the call.

    Only the jobs present when the method starts are processed. A job
    whose send helper returns a falsy result is re-queued with its
    countdown decremented while the countdown is still positive;
    otherwise it is not put back.

    :return: number of jobs whose send helper returned a truthy result
    """
    pending = self._control_mqueue.qsize()
    sent = 0
    for _ in range(pending):
        job = self._control_mqueue.get()
        if job.identifier is None:
            ok = utils.req_send_json(job.sock, job.payload, flag=zmq.NOBLOCK)
        else:
            ok = utils.router_send_json(job.sock, job.identifier, job.payload, flag=zmq.NOBLOCK)

        if ok:
            sent += 1
            continue

        if job.countdown > 0:
            retry = ControlMessage(job[0], job[1], job[2], job.countdown - 1)
            self._control_mqueue.put(retry)
    return sent
def _send_raw(self, serialized):
    """Send ``serialized`` to the peer and wait for a single reply.

    A socket is created, the string is sent without blocking, and the
    socket is polled for input with ``self._timeout * 1000`` as the poll
    timeout. A reply is forwarded to ``self.on_message``; when the poll
    returns nothing, the timeout is logged and the peer is removed from
    the transport. The socket is cleaned up in both cases.

    :param serialized: string handed to ``send_string``
    """
    self.create_socket()
    self._socket.send_string(serialized, zmq.NOBLOCK)

    reply_poller = zmq.Poller()
    reply_poller.register(self._socket, zmq.POLLIN)

    if not reply_poller.poll(self._timeout * 1000):
        self._transport.log("Peer " + self._address + " timed out.")
        self.cleanup_socket()
        self._transport.remove_peer(self._address)
        return

    reply = self._socket.recv()
    self.on_message(reply)
    self.cleanup_socket()
def get_messages(self, timeout=0.1, count=1):
    """Yield up to ``count`` message bodies from the subscriber socket.

    Receives are non-blocking. While nothing is available the generator
    sleeps ``timeout / 10`` between attempts and stops once more than
    ``timeout`` has elapsed since the call started.

    Frame 2 of each message is unpacked as two big-endian unsigned ints
    (partition and global sequence numbers); frame 1 is what gets yielded.

    :param timeout: give-up threshold compared against ``time() - started``
    :param count: maximum number of messages to yield
    """
    started = time()
    sleep_time = timeout / 10.0
    while count:
        try:
            msg = self.subscriber.recv_multipart(copy=True, flags=zmq.NOBLOCK)
        except zmq.Again:
            # Nothing pending: stop after the timeout, otherwise back off.
            if time() - started > timeout:
                break
            sleep(sleep_time)
        else:
            partition_seqno, global_seqno = unpack(">II", msg[2])
            seqno = global_seqno if self.count_global else partition_seqno
            if not self.counter:
                # No expectation yet (unset or zero): adopt the received number.
                self.counter = seqno
            elif self.counter != seqno:
                if self.seq_warnings:
                    self.logger.warning("Sequence counter mismatch: expected %d, got %d. Check if system "
                                        "isn't missing messages." % (self.counter, seqno))
                # Drop the expectation; the next message re-seeds the counter.
                self.counter = None
            yield msg[1]
            count -= 1
            # Advance the expected sequence number only if one is being tracked.
            if self.counter:
                self.counter += 1
            self.stats[self.stat_key] += 1
def _get_data(self, blocking=True):
    """Get one batch of data from the socket.

    :param blocking: when false, the receive is non-blocking and ``None``
        is returned if ``zmq.Again`` is raised (nothing available)
    :return: the batch converted according to ``self.structure``:
        ``'array'`` gives a numpy array of ``self.dtype`` reshaped to
        ``self.shape``; ``'dict'`` gives the JSON-decoded object;
        ``'boolean'`` gives ``bool(batch)``; any other value returns the
        received message unchanged
    :raises EOCError: when the received message equals ``TERM_MSG``
    """
    # TODO complete docstring.
    if not blocking:
        try:
            batch = self.socket.recv(flags=zmq.NOBLOCK)
        except zmq.Again:
            return None
    else:
        batch = self.socket.recv()

    if batch == TERM_MSG:
        raise EOCError()

    if self.structure == 'array':
        # numpy.fromstring in binary mode is deprecated; frombuffer is the
        # documented replacement. The copy keeps the result writable and
        # independent of the message buffer, as fromstring's result was.
        batch = numpy.frombuffer(batch, dtype=self.dtype).copy()
        batch = numpy.reshape(batch, self.shape)
    elif self.structure == 'dict':
        batch = json.loads(batch)
    elif self.structure == 'boolean':
        # NOTE(review): bool() of any non-empty bytes object is True -
        # confirm the sender encodes False as an empty message.
        batch = bool(batch)
    return batch
def _receiveFromListener(self, quota) -> int:
    """
    Receive messages from the listener socket without blocking.

    Empty messages are skipped and do not count against the quota. Reading
    stops when ``quota`` messages were taken or ``zmq.Again`` is raised.

    :param quota: number of messages to receive (must be truthy)
    :return: number of received messages
    """
    assert quota
    received = 0
    while received < quota:
        try:
            ident, msg = self.listener.recv_multipart(flags=zmq.NOBLOCK)
            if not msg:
                # Router probing sends empty message on connection
                continue
            received += 1
            unknown_peer = self.onlyListener and ident not in self.remotesByKeys
            if unknown_peer:
                self.peersWithoutRemotes.add(ident)
            self._verifyAndAppend(msg, ident)
        except zmq.Again:
            break

    if received > 0:
        logger.trace('{} got {} messages through listener'.
                     format(self, received))
    return received
def run(self):
    """Receive (tag, JSON) messages until ``self._terminate`` becomes true.

    A message tagged ``b"queryResponse"`` is passed to
    ``self._bus.resolve_response``; any other tag starts a
    ``ZMQHandlerThread`` for it. EAGAIN causes a one-millisecond sleep,
    ETERM sets ``self._terminate``; every other error is printed and the
    loop continues.
    """
    while not self._terminate:
        connection_message = None
        try:
            connection_message = self._socket.recv_multipart(zmq.NOBLOCK)
            tag, json_message = connection_message
            message = json.loads(json_message.decode('utf8'))
            if tag != b"queryResponse":
                ZMQHandlerThread(self._bus, tag.decode('utf-8'), message).start()
            else:
                self._bus.resolve_response(message)
        except zmq.ZMQError as err:
            code = err.errno
            if code == zmq.EAGAIN:
                # Nothing to read yet.
                time.sleep(.001)
            elif code == zmq.ETERM:
                self._terminate = True
            else:
                print("message zmq exception:", self._address, err, err.errno)
        except Exception as err:
            print("message exception:", self._address, err, connection_message)
def send(output):
    """Publish the encoded ``output`` string every 0.1 s, forever.

    A PUB socket is connected to ``output`` and the same string, encoded
    to bytes, is sent as the frame with ``zmq.NOBLOCK``.

    :param output: address to connect to; also used as the payload
    """
    context = zmq.Context()
    publisher = context.socket(zmq.PUB)
    publisher.connect(output)
    frame = output.encode()
    while True:
        publisher.send(frame, zmq.NOBLOCK)
        time.sleep(0.1)
def send(self, frame):
    """Pass the zmq frame to the output's connection without blocking.

    :param frame: frame handed to ``self.connection.send``
    """
    connection = self.connection
    connection.send(frame, zmq.NOBLOCK)
def receive_loop(self):
    """Receive loop for Banyan messages.

    Each received multipart message is handed to
    ``incoming_message_processing`` as (decoded first frame, unpacked second
    frame). With ``self.numpy`` set, the payload is unpacked by ``msgpack``
    using ``m.decode`` as object hook, otherwise by ``umsgpack``. This
    method may be overridden to suit the application.

    :raises KeyboardInterrupt: after ``clean_up()`` when interrupted
        during the idle sleep
    """
    while True:
        try:
            frames = self.subscriber.recv_multipart(zmq.NOBLOCK)
            if self.numpy:
                payload = msgpack.unpackb(frames[1], object_hook=m.decode)
                topic = frames[0].decode()
            else:
                topic = frames[0].decode()
                payload = umsgpack.unpackb(frames[1])
            self.incoming_message_processing(topic, payload)
        except zmq.error.Again:
            # zmq raises this when no message is available; idle a while.
            try:
                time.sleep(self.loop_time)
            except KeyboardInterrupt:
                self.clean_up()
                raise KeyboardInterrupt
def send(self, data, flags=0, copy=True, track=False): """send, which will only block current greenlet state_changed always fires exactly once (success or fail) at the end of this method. """ # if we're given the NOBLOCK flag act as normal and let the EAGAIN get raised if flags & zmq.NOBLOCK: try: msg = super(_Socket, self).send(data, flags, copy, track) finally: if not self.__in_send_multipart: self.__state_changed() return msg # ensure the zmq.NOBLOCK flag is part of flags flags |= zmq.NOBLOCK while True: # Attempt to complete this operation indefinitely, blocking the current greenlet try: # attempt the actual call msg = super(_Socket, self).send(data, flags, copy, track) except zmq.ZMQError as e: # if the raised ZMQError is not EAGAIN, reraise if e.errno != zmq.EAGAIN: if not self.__in_send_multipart: self.__state_changed() raise else: if not self.__in_send_multipart: self.__state_changed() return msg # defer to the event loop until we're notified the socket is writable self._wait_write()
def recv(self, flags=0, copy=True, track=False):
    """recv, which will only block current greenlet

    state_changed always fires exactly once (success or fail) at the
    end of this method, unless a multipart receive is in progress
    (``__in_recv_multipart`` set), in which case it is not fired here.

    :param flags: zmq recv flags; ``zmq.NOBLOCK`` is added internally when
        the caller did not pass it
    :param copy: forwarded to the parent class ``recv``
    :param track: forwarded to the parent class ``recv``
    :return: whatever the parent class ``recv`` returns
    :raises zmq.ZMQError: EAGAIN only when the caller passed NOBLOCK;
        any other ZMQ error in either mode
    """
    # NOTE(review): the double-underscore attributes are name-mangled only
    # when this method is defined inside its class body.
    # Caller asked for NOBLOCK: behave like the plain socket and let EAGAIN
    # propagate.
    if flags & zmq.NOBLOCK:
        try:
            msg = super(_Socket, self).recv(flags, copy, track)
        finally:
            if not self.__in_recv_multipart:
                self.__state_changed()
        return msg
    # Always receive non-blocking; waiting is done by _wait_read() below.
    flags |= zmq.NOBLOCK
    while True:
        try:
            msg = super(_Socket, self).recv(flags, copy, track)
        except zmq.ZMQError as e:
            # Anything other than EAGAIN is a real error: notify and re-raise.
            if e.errno != zmq.EAGAIN:
                if not self.__in_recv_multipart:
                    self.__state_changed()
                raise
        else:
            if not self.__in_recv_multipart:
                self.__state_changed()
            return msg
        # EAGAIN: wait until the socket is reported readable, then retry.
        self._wait_read()
def test_topic(self):
    """Check that a SUB socket only receives messages matching its prefix.

    With a subscription to ``b'x'``, ``b'message'`` must not arrive (a
    non-blocking receive fails with EAGAIN) while ``b'xmessage'`` must.
    """
    publisher, subscriber = self.create_bound_pair(zmq.PUB, zmq.SUB)
    subscriber.setsockopt(zmq.SUBSCRIBE, b'x')
    time.sleep(0.1)

    # Message without the subscribed prefix.
    publisher.send(b'message')
    self.assertRaisesErrno(zmq.EAGAIN, subscriber.recv, zmq.NOBLOCK)

    # Message carrying the subscribed prefix.
    sent = b'xmessage'
    publisher.send(sent)
    received = subscriber.recv()
    self.assertEqual(sent, received)
def test_again(self):
    """Check that a non-blocking recv on a fresh REP socket raises Again.

    The failure is verified both by exception class (``Again``) and by
    errno (``zmq.EAGAIN``).
    """
    rep = self.context.socket(zmq.REP)
    self.assertRaises(Again, rep.recv, zmq.NOBLOCK)
    self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
    rep.close()
def start_listener(self):
    """Poll the socket forever and signal each received message.

    Every successful non-blocking receive puts ``ON_SIGNAL`` on
    ``self.on_q``; the received data itself is discarded. When nothing is
    available the loop sleeps for 50 ms.
    """
    print('ZMQ listener started')
    while True:
        try:
            self.s.recv(zmq.NOBLOCK)  # NOBLOCK: raise instead of waiting
        except zmq.Again:
            # No message to receive; idle briefly before polling again.
            time.sleep(0.05)
            continue
        self.on_q.put(ON_SIGNAL)
def header(self):
    """Receive the header frame and return it decoded from JSON.

    The raw bytes are kept on ``self.raw_header``. The receive is
    non-blocking unless ``self.block`` is truthy.

    :return: result of ``json.loads`` on the UTF-8 decoded header
    """
    recv_flags = zmq.NOBLOCK if not self.block else 0
    self.raw_header = self.socket.recv(flags=recv_flags)
    text = self.raw_header.decode("utf-8")
    return json.loads(text)
def next(self, as_json=False):
    """Return the next raw frame, optionally decoded from JSON.

    A header kept on ``self.raw_header`` is consumed first (and cleared);
    otherwise a frame is received from the socket, non-blocking unless
    ``self.block`` is truthy. The frame length is added to
    ``self.statistics.bytes_received``.

    :param as_json: when true, return ``json.loads`` of the UTF-8 text
    :return: the frame (or decoded object), or ``None`` on any
        ``zmq.ZMQError``
    """
    try:
        pending_header = self.raw_header
        if pending_header:
            frame = pending_header
            self.raw_header = None
        else:
            recv_flags = zmq.NOBLOCK if not self.block else 0
            frame = self.socket.recv(flags=recv_flags, copy=self.zmq_copy, track=self.zmq_track)

        self.statistics.bytes_received += len(frame)
        return json.loads(frame.decode("utf-8")) if as_json else frame
    except zmq.ZMQError:
        return None
def flush(self, success=True):
    """Discard remaining sub-messages and optionally update statistics.

    While ``self.has_more()`` is true, one frame is received and dropped
    per pass (non-blocking unless ``self.block`` is truthy); ZMQ errors
    during that receive are ignored.

    :param success: when true, the per-message byte count is folded into
        the total, reset to zero, and the message counter is incremented
    """
    recv_flags = zmq.NOBLOCK if not self.block else 0

    # Clear remaining sub-messages
    while self.has_more():
        try:
            self.socket.recv(flags=recv_flags, copy=self.zmq_copy, track=self.zmq_track)
            logger.info('Skipping sub-message')
        except zmq.ZMQError:
            pass

    if not success:
        return

    # Update statistics
    stats = self.statistics
    stats.total_bytes_received += stats.bytes_received
    stats.bytes_received = 0
    stats.messages_received += 1
def recv_messages(zmq_subscriber, timeout_count, message_count): """Test utility function. Subscriber thread that receives and counts ZMQ messages. Args: zmq_subscriber (zmq.Socket): ZMQ subscriber socket. timeout_count (int): No. of failed receives until exit. message_count (int): No. of messages expected to be received. Returns: (int) Number of messages received. """ # pylint: disable=E1101 fails = 0 # No. of receives that didn't return a message. receive_count = 0 # Total number of messages received. while fails < timeout_count: try: _ = zmq_subscriber.recv_string(flags=zmq.NOBLOCK) fails = 0 receive_count += 1 if receive_count == message_count: break except zmq.ZMQError as error: if error.errno == zmq.EAGAIN: pass else: raise fails += 1 time.sleep(1e-6) return receive_count
def run(self):
    """Run loop.

    Receives log messages from connected publishers and logs them via a
    python logging interface. The loop runs until ``self._stop_requested``
    is set.

    Raises:
        json.decoder.JSONDecodeError: if a record cannot be decoded.
        zmq.ZMQError: for any ZMQ error other than EAGAIN.
    """
    log = logging.getLogger('sip.logging_aggregator')
    fail_count = 0
    fail_count_limit = 100
    # Exponential relaxation of timeout in event loop.
    # (fail_count_limit values spaced logarithmically from 1e-6 to 1e-2.)
    timeout = np.logspace(-6, -2, fail_count_limit)
    while not self._stop_requested.is_set():
        try:
            topic, values = self._subscriber.recv_multipart(zmq.NOBLOCK)
            str_values = values.decode('utf-8')
            try:
                dict_values = json.loads(str_values)
                record = logging.makeLogRecord(dict_values)
                log.handle(record)
                # A handled record resets the back-off.
                fail_count = 0
            except json.decoder.JSONDecodeError:
                print('ERROR: Unable to convert JSON log record.')
                raise
        except zmq.ZMQError as e:
            if e.errno == zmq.EAGAIN:
                fail_count += 1
            else:
                raise  # Re-raise the exception
        # Pick the wait from the table, clamped to its last entry.
        if fail_count < fail_count_limit:
            _timeout = timeout[fail_count]
        else:
            _timeout = timeout[-1]
        # Waiting on the stop event lets a stop request end the pause early.
        self._stop_requested.wait(_timeout)
def receive(self):
    """Receive one message and return the de-serialized Python object.

    The receive is issued with ``zmq.NOBLOCK``.

    :return: the object produced by ``self.socket.recv_pyobj``
    """
    message = self.socket.recv_pyobj(zmq.NOBLOCK)
    return message
def send_check_address(self, address_name):
    """Send a CHECK_ADDRESS request without blocking.

    The request is the pair ``(CHECK_ADDRESS header, (address_name,))``.
    If the send raises ``zmq.error.Again`` the failure is logged and
    otherwise ignored.

    :param address_name: name placed in the request's one-element tuple
    """
    self.logger.trace('send CHECK_ADDRESS {}'.format(address_name))
    request = (DeferredRequestHeaders.CHECK_ADDRESS, (address_name, ))
    try:
        self.socket.send_pyobj(request, zmq.NOBLOCK)
    except zmq.error.Again:
        self.logger.error('CHECK_ADDRESS not sent')
def send_isolate_addresses(self, address_names):
    """Send an ISOLATE_ADDRESSES request without blocking.

    The request is the pair ``(ISOLATE_ADDRESSES header, address_names)``.
    If the send raises ``zmq.error.Again`` the failure is logged and
    otherwise ignored.

    :param address_names: value placed as the request's second element
    """
    self.logger.trace('send ISOLATE_ADDRESSES {}'.format(address_names))
    request = (DeferredRequestHeaders.ISOLATE_ADDRESSES, address_names)
    try:
        self.socket.send_pyobj(request, zmq.NOBLOCK)
    except zmq.error.Again:
        self.logger.error('ISOLATE_ADDRESSES not sent')