我们从Python开源项目中,提取了以下38个代码示例,用于说明如何使用zmq.Again()。
def start(self, socket):
    """
    Launch the background thread that watches ``socket`` through a monitor socket.

    Nothing happens when the listening flag is already set.

    :param socket: Socket to monitor.
    """
    listening = self.monitor_listening
    if listening.is_set():
        # A listener is already active; never start a second one.
        return

    # Monitor socket with a receive timeout so the loop below wakes up regularly.
    monitor = socket.get_monitor_socket(events=self.events)
    monitor.setsockopt(zmq.RCVTIMEO, self.receive_timeout)
    listening.set()

    def event_listener(monitor_listening):
        while monitor_listening.is_set():
            try:
                event = recv_monitor_message(monitor)
                if event["event"] == zmq.EVENT_CLOSED:
                    # The monitored socket was closed: leave the loop after this event.
                    monitor_listening.clear()
                self._notify_listeners(event)
            except zmq.Again:
                # Receive timed out: use it as a heartbeat for time-based listeners.
                self._notify_listeners(None)
        # Release the monitor once listening has stopped.
        socket.disable_monitor()
        monitor.close()

    worker = threading.Thread(target=event_listener, args=(listening,))
    # Daemon thread: it cannot keep the process alive if nobody disconnects.
    worker.daemon = True
    self.monitor_thread = worker
    worker.start()
def send(self, message, send_more=False, block=True, as_json=False):
    """
    Send ``message`` on ``self.socket``.

    :param message: Payload handed to ``send_json`` (when ``as_json``) or ``send``.
    :param send_more: Add ``zmq.SNDMORE`` to the send flags.
    :param block: When false, add ``zmq.NOBLOCK`` and silently ignore ``zmq.Again``.
    :param as_json: Send through ``socket.send_json`` instead of ``socket.send``.
    :raises zmq.Again: If raised by the socket while ``block`` is true.
    :raises zmq.ZMQError: Any other ZMQ error, after it has been logged.
    """
    flags = zmq.SNDMORE if send_more else 0
    if not block:
        flags |= zmq.NOBLOCK

    try:
        if as_json:
            self.socket.send_json(message, flags)
        else:
            self.socket.send(message, flags, copy=self.zmq_copy, track=self.zmq_track)
    except zmq.Again:
        # A non-blocking send that cannot complete is dropped on purpose;
        # a blocking one propagates with its original traceback.
        if block:
            raise
    except zmq.ZMQError as error:
        logger.error(error)
        raise
def forward(self, data):
    """
    Send ``data`` on ``self.list_communication_channel``.

    :param data: Payload passed unchanged to the channel's ``send``.
    :raises zmq.Again: Re-raised unchanged after logging, so callers see the original error.
    """
    try:
        self.list_communication_channel.send(data)
    except zmq.NotDone:
        self.logger.debug('my recipient is dead, not done')
        self.list_communication_channel.close()
    except zmq.Again:
        self.logger.debug('my recipient is dead')
        # Bare raise keeps the original exception instead of building a new one.
        raise
    except zmq.ZMQError as error:
        self.logger.debug("Error in message forward " + error.strerror)
        # Replace the context after any other ZMQ failure.
        self.context.destroy()
        self.context = zmq.Context()
def test_disconnection(self):
    """ Check that events stop arriving once the publisher address is disconnected. """
    from supvisors.utils import InternalEventHeaders
    mapper = self.supvisors.address_mapper
    local_address = mapper.local_address

    # Disconnect one remote address: local events must still be received.
    remote_address = next(candidate for candidate in mapper.addresses
                          if candidate != local_address)
    self.subscriber.disconnect([remote_address])
    payload = {'date': 1000}
    self.publisher.send_tick_event(payload)
    received = self.receive('Tick')
    self.assertTupleEqual((InternalEventHeaders.TICK, local_address, payload), received)

    # Disconnect the local address: the same event must no longer arrive.
    self.subscriber.disconnect([local_address])
    self.publisher.send_tick_event(payload)
    with self.assertRaises(zmq.Again):
        self.subscriber.receive()
def generator_from_zmq_pull(context, host):
    """
    Bind a PULL socket on ``host`` and yield decoded tasks forever.

    Each step yields the JSON-decoded message when one is waiting, otherwise
    ``None``, so the consumer is never blocked.

    :param context: Object whose ``socket`` method creates the PULL socket.
    :param host: Endpoint to bind; one trailing ``/`` is stripped.
    """
    pull_socket = context.socket(zmq.PULL)
    # TODO: Configure socket with clean properties to avoid message overload.
    if host.endswith('/'):
        host = host[:-1]
    banner = ("Binding ZMQ pull socket : " + colorama.Fore.CYAN
              + "{0}".format(host) + colorama.Style.RESET_ALL)
    print_item("+", banner)
    pull_socket.bind(host)

    while True:
        try:
            raw = pull_socket.recv(flags=zmq.NOBLOCK)
        except zmq.Again:
            raw = None
        # NOTE: Yielding None keeps the generator non blocking.
        yield None if raw is None else json.loads(raw)
def get_messages(self, timeout=0.1, count=1):
    """
    Yield up to ``count`` message bodies read from ``self.subscriber``.

    Reads are non-blocking. When nothing is available the generator sleeps for a
    tenth of ``timeout`` and retries; it stops once more than ``timeout`` seconds
    have passed since the call started.

    :param timeout: Overall time budget in seconds.
    :param count: Maximum number of messages to yield.
    """
    began = time()
    pause = timeout / 10.0
    while count:
        try:
            frames = self.subscriber.recv_multipart(copy=True, flags=zmq.NOBLOCK)
        except zmq.Again:
            if time() - began > timeout:
                break
            sleep(pause)
            continue

        # Frame 2 carries two big-endian unsigned 32-bit sequence numbers.
        partition_seqno, global_seqno = unpack(">II", frames[2])
        if self.count_global:
            seqno = global_seqno
        else:
            seqno = partition_seqno

        if not self.counter:
            self.counter = seqno
        elif self.counter != seqno:
            if self.seq_warnings:
                self.logger.warning("Sequence counter mismatch: expected %d, got %d. Check if system "
                                    "isn't missing messages." % (self.counter, seqno))
            # Forget the expected value so the next message re-synchronises it.
            self.counter = None

        yield frames[1]
        count -= 1
        if self.counter:
            self.counter += 1
        self.stats[self.stat_key] += 1
def _get_data(self, blocking=True):
    """
    Receive one batch from ``self.socket`` and decode it according to ``self.structure``.

    :param blocking: When false, use a non-blocking receive and return ``None``
        if no message is available.
    :return: ``None`` (nothing available in non-blocking mode); a NumPy array of
        ``self.dtype`` reshaped to ``self.shape`` for ``'array'``; the JSON-decoded
        value for ``'dict'``; a bool for ``'boolean'``; otherwise the raw payload.
    :raises EOCError: If the received payload equals ``TERM_MSG``.
    """
    if blocking:
        batch = self.socket.recv()
    else:
        try:
            batch = self.socket.recv(flags=zmq.NOBLOCK)
        except zmq.Again:
            return None

    if batch == TERM_MSG:
        raise EOCError()

    if self.structure == 'array':
        # frombuffer replaces the deprecated binary mode of numpy.fromstring;
        # copy() keeps the result writable and independent of the message buffer.
        batch = numpy.frombuffer(batch, dtype=self.dtype).copy()
        batch = numpy.reshape(batch, self.shape)
    elif self.structure == 'dict':
        batch = json.loads(batch)
    elif self.structure == 'boolean':
        # NOTE(review): bool() is True for any non-empty payload - confirm the
        # sender encodes False as an empty message.
        batch = bool(batch)
    return batch
def _receiveFromListener(self, quota) -> int: """ Receives messages from listener :param quota: number of messages to receive :return: number of received messages """ assert quota i = 0 while i < quota: try: ident, msg = self.listener.recv_multipart(flags=zmq.NOBLOCK) if not msg: # Router probing sends empty message on connection continue i += 1 if self.onlyListener and ident not in self.remotesByKeys: self.peersWithoutRemotes.add(ident) self._verifyAndAppend(msg, ident) except zmq.Again: break if i > 0: logger.trace('{} got {} messages through listener'. format(self, i)) return i
def test_retry_recv(self):
    """A PULL socket with a receive timeout raises zmq.Again after the alarm has fired."""
    receiver = self.socket(zmq.PULL)
    receiver.rcvtimeo = self.timeout_ms
    self.alarm()
    with self.assertRaises(zmq.Again):
        receiver.recv()
    assert self.timer_fired
def test_retry_send(self):
    """A PUSH socket with a send timeout raises zmq.Again after the alarm has fired."""
    sender = self.socket(zmq.PUSH)
    sender.sndtimeo = self.timeout_ms
    self.alarm()
    with self.assertRaises(zmq.Again):
        sender.send(b('buf'))
    assert self.timer_fired
def test_again(self):
    """A non-blocking recv on a fresh REP socket raises Again with errno EAGAIN."""
    s = self.context.socket(zmq.REP)
    try:
        self.assertRaises(Again, s.recv, zmq.NOBLOCK)
        self.assertRaisesErrno(zmq.EAGAIN, s.recv, zmq.NOBLOCK)
    finally:
        # Close even when an assertion fails so the socket is not leaked.
        s.close()
def start_listener(self):
    """Loop forever, putting ON_SIGNAL on ``self.on_q`` for every message received on ``self.s``."""
    print('ZMQ listener started')
    while True:
        try:
            # Non-blocking on purpose, so the loop can idle between messages.
            self.s.recv(zmq.NOBLOCK)
        except zmq.Again:
            # Nothing to receive yet: wait briefly before polling again.
            time.sleep(0.05)
            continue
        self.on_q.put(ON_SIGNAL)
def client_behavior(settings, logger):
    """
    Exercise the client side of the internal and external channels.

    Sends one ALIVE message on the internal channel, logs the reply, subscribes
    to the external channel and keeps logging external messages until a receive
    raises ``Again``.

    :param settings: Provides ``getIntPort()`` and ``getExtPort()``.
    :param logger: Logger used for all debug output.
    """
    internal = InternalChannel(addr="127.0.0.1", port=settings.getIntPort(), logger=logger)
    try:
        internal.generate_internal_channel_client_side()
    except ZMQError as error:
        logger.debug(error)

    alive = Message()
    alive.priority = ALIVE
    alive.source_flag = INT
    alive.source_id = '1'
    alive.target_id = '1'
    alive.target_addr = '192.168.1.1'
    alive.target_key = '{}:{}'.format(0, 19)
    internal.send_first_internal_channel_message(dumps(alive))

    reply = internal.wait_int_message(dont_wait=False)
    logger.debug("msg : " + reply)

    external = ExternalChannel(addr="127.0.0.1", port=settings.getExtPort(), logger=logger)
    external.generate_external_channel_client_side()
    external.external_channel_subscribe()
    logger.debug(loads(external.wait_ext_message()).printable_message())
    logger.debug("try_to_connect TEST COMPLETED")

    # Keep printing external messages until the receive raises Again.
    while True:
        try:
            logger.debug(loads(external.wait_ext_message()).printable_message())
            sleep(1)
        except Again:
            logger.debug("my master is DEAD")
            break
def server_behavior(settings, logger):
    """
    Exercise the server side of the internal and external channels.

    Answers one internal request with OK, then publishes the same ALIVE message
    on the external channel once per second until ``forward`` raises ``zmq.Again``.

    :param settings: Provides ``getIntPort()`` and ``getExtPort()``.
    :param logger: Logger used for all debug output.
    """
    internal = InternalChannel(addr="127.0.0.1", port=settings.getIntPort(), logger=logger)
    try:
        internal.generate_internal_channel_server_side()
        request = loads(internal.wait_int_message(dont_wait=False))
        logger.debug("msg : ")
        logger.debug(request.printable_message())
        internal.reply_to_int_message(OK)
    except ZMQError as error:
        logger.debug(error)

    external = ExternalChannel(addr="127.0.0.1", port=settings.getExtPort(), logger=logger)
    external.generate_external_channel_server_side()
    external.external_channel_publish()

    alive = Message()
    alive.priority = ALIVE
    alive.source_flag = EXT
    alive.source_id = '1'
    alive.target_id = '1'
    alive.target_addr = '192.168.1.1'
    alive.target_key = '{}:{}'.format(0, 19)

    sleep(1)
    external.forward(dumps(alive))
    logger.debug("try_to_connect TEST COMPLETED")

    # Publish once per second until forwarding raises zmq.Again.
    while True:
        try:
            external.forward(dumps(alive))
            sleep(1)
        except zmq.Again:
            break
def wait_int_message(self, dont_wait=True):
    """
    Receive one message from ``self.list_communication_channel``.

    :param dont_wait: When true, receive with ``zmq.DONTWAIT``; otherwise log and
        perform a plain ``recv()``.
    :return: The received message.
    :raises zmq.Again: Propagated unchanged from the ``zmq.DONTWAIT`` receive.
    """
    if dont_wait:
        # No handler here: the original zmq.Again reaches the caller intact
        # instead of being replaced by a freshly built exception.
        return self.list_communication_channel.recv(zmq.DONTWAIT)
    self.logger.debug('waiting for a request')
    return self.list_communication_channel.recv()
def receive(self, event_type):
    """ Poll the subscriber socket for 1000, then return what the subscriber receives;
    the test fails if zmq.Again is raised. """
    subscriber = self.subscriber
    try:
        subscriber.socket.poll(1000)
        message = subscriber.receive()
    except zmq.Again:
        self.fail('Failed to get {} event'.format(event_type))
    else:
        return message
def receive(self, event_type):
    """ Return what the puller receives; the test fails if zmq.Again is raised. """
    try:
        request = self.puller.receive()
    except zmq.Again:
        self.fail('Failed to get {} request'.
                  format(event_type))
    else:
        return request
def check_reception(self, header=None, data=None):
    """ Assert that the subscriber receives exactly (header, data), or - when either
    argument is falsy - that it receives nothing at all. """
    if not (header and data):
        # No message is expected: the receive must raise zmq.Again.
        with self.assertRaises(zmq.Again):
            self.subscriber.receive()
        return

    # A message is expected: fetch it and compare it with the expectation.
    try:
        msg = self.subscriber.receive()
    except zmq.Again:
        self.fail('Failed to get {} status'.format(header))
    self.assertTupleEqual((header, data), msg)
def _send_and_receive(self, message): """Sending payloads to NM and returning Response instances. Or, if the action failed, an error will be raised during the instantiation of the Response. Can also timeout if the socket receives no data for some period. Args: message: dict of a message to send to NM Returns: Response instance if the request succeeded Raises: TimeoutError: if nothing is received for the timeout """ # zmq is thread unsafe: if we send a second request before # we get back the first response, we throw an exception # fix that -kheimerl with self.lock: # Send the message and poll for responses. self.socket.send(json.dumps(message)) responses = self.socket.poll(timeout=self.socket_timeout * 1000) if responses: try: raw_response_data = self.socket.recv() return Response(raw_response_data) except zmq.Again: pass # If polling fails or recv failes, we reset the socket or # it will be left in a bad state, waiting for a response. self.socket.close() self.setup_socket() self.socket.connect(self.address) raise TimeoutError('did not receive a response')
def handle_in(self):
    """
    Receive one two-frame message, handle it and reply to its sender.

    Returns early when no message is available (``zmq.Again``), when the message
    does not have exactly two frames, or when the sender is not present in
    ``self.controllers``. Otherwise every controller is sent a BusyMessage, the
    message is handled, any truthy result is sent back to the sender, and every
    controller is sent a DoneMessage.
    """
    try:
        frames = self.socket.recv_multipart()
    except zmq.Again:
        return
    if len(frames) != 2:
        self.logger.critical('Received a msg with len != 2, something seriously wrong. ')
        return

    sender, msg_buf = frames
    msg = msg_factory(msg_buf)

    data = self.controllers.get(sender)
    if not data:
        self.logger.critical('Received a msg from %s - this is an unknown sender' % sender)
        return
    data['last_seen'] = time.time()

    # TODO Notify Controllers that we are busy, no more messages to be sent.
    # The busy notification is not perfect, as other messages might already be
    # on their way, but for long-running queries it at least ensures other
    # controllers do not overuse this node by filling up a queue.
    self.send_to_all(BusyMessage())

    try:
        reply = self.handle(msg)
    except Exception:
        # Turn any handler failure into an ErrorMessage carrying the traceback.
        reply = ErrorMessage(msg)
        reply['payload'] = traceback.format_exc()
        self.logger.exception(reply['payload'])
    if reply:
        self.send(sender, reply)

    # A DoneMessage to all controllers flags this node as done.
    self.send_to_all(DoneMessage())
def run(self):
    """
    Loop forever, storing the ``data`` of every pulled task in redis.

    Each message is JSON-decoded; its ``data`` is re-encoded as JSON and stored
    under the task's ``transaction`` key with ``self.result_expiration`` as expiry.
    """
    poll_timeout_ms = 100
    while True:
        # Wait for readability with a timeout instead of spinning on a
        # non-blocking recv, which would keep one CPU core fully busy.
        if not self.pull.poll(poll_timeout_ms):
            continue
        try:
            message = self.pull.recv(flags=zmq.NOBLOCK)
        except zmq.Again:
            continue
        task = json.loads(message)
        self.redis.setex(
            task['transaction'],
            self.result_expiration,
            json.dumps(task['data'])
        )
def send_raw_msg (self, msg):
    """
    Send ``msg`` on ``self.socket`` and return the server's response.

    Both the send and the receive are attempted up to six times while the socket
    raises ``zmq.Again``; after the sixth failure the connection is dropped and
    an ``RC_ERR`` is returned instead of the response.
    """
    max_attempts = 6

    for _ in range(max_attempts):
        try:
            self.socket.send(msg)
        except zmq.Again:
            continue
        break
    else:
        # Every attempt raised zmq.Again.
        self.disconnect()
        return RC_ERR("*** [RPC] - Failed to send message to server")

    for _ in range(max_attempts):
        try:
            response = self.socket.recv()
        except zmq.Again:
            continue
        break
    else:
        self.disconnect()
        return RC_ERR("*** [RPC] - Failed to get server response at {0}".format(self.transport))

    return response

# process a single response from server
def receive(self):
    '''
    Receive one message from ``self.sub``.

    Returns a ``(message, '')`` pair; the second item is always an empty string.

    ..note::
        In ZMQ we are unable to get the address where we got the message from.

    Raises ListenerException when the receive raises ``zmq.Again``.
    '''
    try:
        payload = self.sub.recv()
    except zmq.Again as error:
        log.error('Unable to receive messages: %s', error, exc_info=True)
        raise ListenerException(error)
    else:
        log.debug('[%s] Received %s', time.time(), payload)
        return payload, ''
def test_process_does_not_block(self):
    """process() must swallow zmq.Again and must request a non-blocking receive."""
    socket_mock = Mock()
    socket_mock.recv_unicode.side_effect = zmq.Again()

    control_server = ControlServer(None, connection_string='127.0.0.1:10000')
    control_server._socket = socket_mock

    assertRaisesNothing(self, control_server.process)
    expected_calls = [call(flags=zmq.NOBLOCK)]
    socket_mock.recv_unicode.assert_has_calls(expected_calls)
def process(self, blocking=False):
    """
    Try to read one request from the socket and answer it.

    The request is passed to the JSONRPCResponseManager together with the
    exposed object and the JSON response is sent back. If no data are available
    the method does nothing, so a single-threaded central loop can call it
    regularly without the RPC server needing its own processing loop.

    If the server has not been started yet (via :meth:`start_server`),
    a RuntimeError is raised.

    :param blocking: If True, the receive is done without ``zmq.NOBLOCK``.
                     Default is False to preserve behavior of prior versions.
    """
    if self._socket is None:
        raise RuntimeError('The server has not been started yet, use start_server to do so.')

    recv_flags = 0 if blocking else zmq.NOBLOCK
    try:
        request = self._socket.recv_unicode(flags=recv_flags)
        self.log.debug('Got request %s', request)

        try:
            response = JSONRPCResponseManager.handle(request, self._exposed_object)
            self._socket.send_unicode(response.json)
            self.log.debug('Sent response %s', response.json)
        except TypeError as e:
            # Answer with an error response that carries the request's id.
            request_id = json.loads(request)['id']
            self._socket.send_json(
                self._unhandled_exception_response(request_id, e))
    except zmq.Again:
        # Nothing to do when no request is waiting.
        return
def receive_message(socket, blocking=True):
    """
    Receive a two-frame message and return it as ``(cmd, data)``.

    Returns ``(None, None)`` when the receive raises ``zmq.Again``,
    ``zmq.ContextTerminated`` (after printing a notice) or ``KeyboardInterrupt``.

    :param socket: Object providing ``recv_multipart``.
    :param blocking: When false, receive with ``zmq.NOBLOCK``.
    """
    if blocking:
        flags = 0
    else:
        flags = zmq.NOBLOCK

    try:
        cmd, data = socket.recv_multipart(flags=flags)
    except (zmq.Again, KeyboardInterrupt):
        pass
    except zmq.ContextTerminated:
        print("Context terminated ..")
    else:
        return cmd, data
    return None, None
def _receiveFromRemotes(self, quotaPerRemote) -> int: """ Receives messages from remotes :param quotaPerRemote: number of messages to receive from one remote :return: number of received messages """ assert quotaPerRemote totalReceived = 0 for ident, remote in self.remotesByKeys.items(): if not remote.socket: continue i = 0 sock = remote.socket while i < quotaPerRemote: try: msg, = sock.recv_multipart(flags=zmq.NOBLOCK) if not msg: # Router probing sends empty message on connection continue i += 1 self._verifyAndAppend(msg, ident) except zmq.Again: break if i > 0: logger.trace('{} got {} messages through remote {}'. format(self, i, remote)) totalReceived += i return totalReceived
def transmit(self, msg, uid, timeout=None, serialized=False):
    """
    Send ``msg`` to the remote registered under ``uid`` without blocking.

    :param msg: Message to send; run through ``prepare_to_send`` unless ``serialized``.
    :param uid: Key of the target in ``self.remotes``.
    :param timeout: Not used by this method.
    :param serialized: When true, ``msg`` is sent as given.
    :return: ``(True, None)`` when the send call succeeded, otherwise
        ``(False, err_str)`` where ``err_str`` is set only for an oversized message.
    """
    err_str = None
    remote = self.remotes.get(uid)
    if not remote:
        logger.debug("Remote {} does not exist!".format(uid))
        return False, err_str

    socket = remote.socket
    if not socket:
        logger.debug('{} has uninitialised socket '
                     'for remote {}'.format(self, uid))
        return False, err_str

    try:
        if not serialized:
            msg = self.prepare_to_send(msg)
        socket.send(msg, flags=zmq.NOBLOCK)
        logger.debug('{} transmitting message {} to {}'
                     .format(self, msg, uid))
        delivery_delayed = not remote.isConnected and msg not in self.healthMessages
        if delivery_delayed:
            logger.debug('Remote {} is not connected - '
                         'message will not be sent immediately.'
                         'If this problem does not resolve itself - '
                         'check your firewall settings'.format(uid))
        return True, err_str
    except zmq.Again:
        logger.debug(
            '{} could not transmit message to {}'.format(self, uid))
    except InvalidMessageExceedingSizeException as ex:
        err_str = '{}Cannot transmit message. Error {}'.format(
            CONNECTION_PREFIX, ex)
        logger.error(err_str)
    return False, err_str
def transmitThroughListener(self, msg, ident) -> Tuple[bool, Optional[str]]:
    """
    Send ``msg`` to peer ``ident`` through the listener socket without blocking.

    Only peers recorded in ``self.peersWithoutRemotes`` are served.

    :param msg: Message to send; it is run through ``prepare_to_send`` first.
    :param ident: Peer identity; a ``str`` is encoded to bytes.
    :return: ``(True, None)`` on success, ``(False, None)`` when the peer is not
        served or the send raises ``zmq.Again``, ``(False, error text)`` on any
        other failure.
    """
    if isinstance(ident, str):
        ident = ident.encode()

    if ident not in self.peersWithoutRemotes:
        logger.debug('{} not sending message {} to {}'.
                     format(self, msg, ident))
        logger.debug("This is a temporary workaround for not being able to "
                     "disconnect a ROUTER's remote")
        return False, None

    try:
        msg = self.prepare_to_send(msg)
        logger.trace('{} transmitting {} to {} through listener socket'.
                     format(self, msg, ident))
        self.listener.send_multipart([ident, msg], flags=zmq.NOBLOCK)
    except zmq.Again:
        return False, None
    except InvalidMessageExceedingSizeException as ex:
        err_str = '{}Cannot transmit message. Error {}'.format(
            CONNECTION_PREFIX, ex)
        logger.error(err_str)
        return False, err_str
    except Exception as e:
        err_str = '{}{} got error {} while sending through listener to {}'\
            .format(CONNECTION_PREFIX, self, e, ident)
        logger.error(err_str)
        return False, err_str
    else:
        return True, None
def receive(self, handler=None, block=True):
    """
    Receive and decode one message from ``self.receiver``.

    :param handler: Reference to a specific message handler function to use for
        interpreting the message. When omitted, the handler is looked up in
        ``receive_handlers`` by the ``htype`` field of the message header.
    :param block: Blocking receive call (stored on ``self.receiver.block``).
    :return: A ``Message`` built from the receiver statistics and the decoded
        data, or ``None`` when nothing usable was received.
    :raises KeyboardInterrupt: Always propagated.
    """
    message = None
    # Set blocking flag in receiver
    self.receiver.block = block
    receive_is_successful = False

    if not handler:
        try:
            # Dynamically select handler
            htype = self.receiver.header()["htype"]
        except zmq.Again:
            # not clear if this flush is needed
            self.receiver.flush(receive_is_successful)
            return message
        except Exception:
            # `except Exception` lets KeyboardInterrupt and SystemExit propagate.
            logger.exception('Unable to read header - skipping')
            # Clear remaining sub-messages if they exist
            self.receiver.flush(receive_is_successful)
            return message

        try:
            handler = receive_handlers[htype]
        except (KeyError, TypeError):
            logger.debug(sys.exc_info()[1])
            logger.warning('htype - ' + str(htype) + ' - not supported')
            # Without a handler there is nothing to decode: skip the message
            # instead of calling None and logging a misleading decode error.
            self.receiver.flush(receive_is_successful)
            return message

    try:
        data = handler(self.receiver)
        # as an extra safety margin
        if data:
            receive_is_successful = True
            message = Message(self.receiver.statistics, data)
    except Exception:
        logger.exception('Unable to decode message - skipping')

    # Clear remaining sub-messages if they exist
    self.receiver.flush(receive_is_successful)
    return message
def _run (self):
    """
    Subscriber loop: receive JSON lines, record them as snapshots and dispatch them.

    A receive timeout marks the stream as dead once (after data had been seen);
    the first line received afterwards marks it alive again. The loop ends when
    ``self.active`` becomes false or the context is terminated.
    """
    # socket must be created on the same thread
    sock = self.socket
    sock.setsockopt(zmq.SUBSCRIBE, b'')
    sock.setsockopt(zmq.RCVTIMEO, 5000)
    sock.connect(self.tr)

    alive = False
    self.monitor.reset()

    while self.active:
        try:
            with self.monitor:
                line = self.socket.recv_string()

            self.monitor.on_recv_msg(line)
            self.last_data_recv_ts = time.time()

            # signal the transition to "alive" only once
            if not alive:
                self.event_handler.on_async_alive()
                alive = True

        except zmq.Again:
            # got a timeout - mark as not alive (once) and retry
            if alive:
                self.event_handler.on_async_dead()
                alive = False
            continue

        except zmq.ContextTerminated:
            # outside thread signaled us to exit
            assert(not self.active)
            break

        msg = json.loads(line)
        name = msg['name']
        data = msg['data']
        msg_type = msg['type']
        baseline = msg.get('baseline', False)

        self.raw_snapshot[name] = data
        self.__dispatch(name, msg_type, data, baseline)

    # closing of socket must be from the same thread
    self.socket.close(linger = 0)
def recv_loop(self, configfile=None):
    """
    Main loop: receive strings and call the registered functions.

    The configuration is read and the connection is made first, unless that was
    already done. The receive is non-blocking; when nothing arrives the loop
    sleeps for one second. Every iteration checks whether the update function is
    due. A received string that passes the filter is turned into a Measurement;
    if a status attribute is configured and present, the function registered for
    its value is called, and finally the measurement is passed to ``get``.
    The loop runs until ``self.terminate`` is set, then disconnects.
    """
    if not self.config:
        self.read_config(configfile=configfile)
    if not self.context:
        self.connect()

    def next_update():
        # Re-read the interval each time, in case it changes while running.
        return datetime.datetime.now() + datetime.timedelta(seconds=self.interval)

    updatetime = next_update()
    while not self.terminate:
        s = None
        try:
            s = self.socket.recv(flags=zmq.NOBLOCK)
        except zmq.Again:
            time.sleep(1)
        except KeyboardInterrupt:
            self.terminate = True
        if self.terminate:
            continue

        if datetime.datetime.now() > updatetime:
            logging.debug("Calling update function")
            self.update()
            updatetime = next_update()

        if not (s and self._filter(s)):
            continue
        logging.debug("Received string: %s" % s)
        m = Measurement(s)
        if self.status_attr:
            logging.debug("Checking status_attr: %s" % self.status_attr)
            stat = m.get_attr(self.status_attr)
            if stat:
                for key in self.stat_funcs:
                    if key == stat:
                        logging.debug("Calling %s function" % key)
                        self.stat_funcs[key](m)
        self.get(m)
    self.disconnect()
def testSimpleZStacksMsgs(tdir, looper):
    """Connect two SimpleZStacks and exchange a small and a large message between them."""
    names = ['Alpha', 'Beta']
    genKeys(tdir, names)

    aseed = randomSeed()
    bseed = randomSeed()

    size = 100000
    msg = json.dumps({'random': randomSeed(size).decode()}).encode()

    def aHandler(m):
        # Alpha expects the large payload that Beta sends back.
        text = "{}".format(m)
        print('{} printing... {}'.format(names[0], text[:100]))
        payload, _ = m
        print('Message size is {}'.format(len(payload['random'])))
        assert len(payload['random']) == size

    def bHandler(m):
        # Beta answers the first known peer with the large payload.
        print(beta.msgHandler)
        peer = list(beta.peersWithoutRemotes)[0]
        try:
            beta.listener.send_multipart([peer, msg],
                                         flags=zmq.NOBLOCK)
        except zmq.Again:
            return False
        text = "{}".format(m)
        print('{} printing... {}'.format(names[1], text[:100]))

    def stack_params(name):
        return {
            "name": name,
            "ha": genHa(),
            "auto": 2,
            "basedirpath": tdir
        }

    alpha = SimpleZStack(stack_params(names[0]), aHandler, aseed, False)
    beta = SimpleZStack(stack_params(names[1]), bHandler, bseed, True)

    amotor = SMotor(alpha)
    looper.add(amotor)

    bmotor = SMotor(beta)
    looper.add(bmotor)

    alpha.connect(name=beta.name, ha=beta.ha,
                  verKeyRaw=beta.verKeyRaw, publicKeyRaw=beta.publicKeyRaw)

    looper.runFor(0.25)
    alpha.send({'greetings': 'hi'}, beta.name)
    looper.runFor(1)