Python zmq 模块,Again() 实例源码

我们从Python开源项目中,提取了以下38个代码示例,用于说明如何使用zmq.Again()

项目:mflow    作者:datastreaming    | 项目源码 | 文件源码
def start(self, socket):
        """
        Attach a monitor socket to ``socket`` and watch it from a background thread.

        Nothing happens when the ``monitor_listening`` flag is already set. Every
        monitor event is passed to ``self._notify_listeners``; a receive timeout
        results in ``self._notify_listeners(None)`` instead.
        :param socket: Socket to monitor.
        """
        # A set flag means a listener thread is already active.
        if self.monitor_listening.is_set():
            return

        # The receive timeout makes the listener wake up periodically.
        monitor_socket = socket.get_monitor_socket(events=self.events)
        monitor_socket.setsockopt(zmq.RCVTIMEO, self.receive_timeout)
        self.monitor_listening.set()

        def event_listener(monitor_listening):
            keep_listening = monitor_listening.is_set
            while keep_listening():
                try:
                    event = recv_monitor_message(monitor_socket)
                    socket_closed = event["event"] == zmq.EVENT_CLOSED
                    if socket_closed:
                        # Last iteration: the close event is still forwarded below.
                        monitor_listening.clear()

                    self._notify_listeners(event)
                except zmq.Again:
                    # Timeout: acts as a heartbeat, so time based listeners
                    # do not need a thread of their own.
                    self._notify_listeners(None)

            # The loop is over - release the monitor resources.
            socket.disable_monitor()
            monitor_socket.close()

        listener_thread = threading.Thread(target=event_listener, args=(self.monitor_listening,))
        # Daemon thread: it ends with the process even if nobody disconnects.
        listener_thread.daemon = True
        self.monitor_thread = listener_thread
        listener_thread.start()
项目:mflow    作者:datastreaming    | 项目源码 | 文件源码
def send(self, message, send_more=False, block=True, as_json=False):
        """
        Send ``message`` on ``self.socket``.

        :param message:   Payload handed to ``send_json`` or ``send``.
        :param send_more: Add ``zmq.SNDMORE`` to the send flags.
        :param block:     When False, ``zmq.NOBLOCK`` is added and ``zmq.Again`` is swallowed.
        :param as_json:   Use ``send_json`` instead of ``send``.
        """
        flags = zmq.SNDMORE if send_more else 0
        if not block:
            flags |= zmq.NOBLOCK

        try:
            if as_json:
                self.socket.send_json(message, flags)
            else:
                self.socket.send(message, flags, copy=self.zmq_copy, track=self.zmq_track)
        except zmq.Again:
            # A non-blocking send that cannot complete is silently dropped.
            if block:
                raise
        except zmq.ZMQError:
            logger.error(sys.exc_info()[1])
            raise
项目:CellsCycle    作者:AQuadroTeam    | 项目源码 | 文件源码
def forward(self, data):
        """
        Send ``data`` on ``self.list_communication_channel``.

        Error handling:
        - ``zmq.NotDone``: logged, then the channel is closed.
        - ``zmq.Again``: logged and re-raised so the caller can react.
        - any other ``zmq.ZMQError``: logged, then ``self.context`` is destroyed
          and replaced by a fresh ``zmq.Context``.

        :param data: payload passed unchanged to the channel's ``send``.
        :raises zmq.Again: when the send cannot complete.
        """
        try:
            self.list_communication_channel.send(data)
        except zmq.NotDone:
            self.logger.debug('my recipient is dead, not done')
            self.list_communication_channel.close()
        except zmq.Again:
            self.logger.debug('my recipient is dead')
            # Bare raise keeps the original exception (errno, message, traceback)
            # instead of replacing it with a fresh, context-free zmq.Again.
            raise
        except zmq.ZMQError as a:
            # %s formatting cannot fail, unlike concatenation with a non-str value.
            self.logger.debug("Error in message forward %s", a.strerror)
            self.context.destroy()
            self.context = zmq.Context()
            self.context = zmq.Context()
项目:supvisors    作者:julien6387    | 项目源码 | 文件源码
def test_disconnection(self):
        """ Check what the subscriber still receives after disconnections. """
        from supvisors.utils import InternalEventHeaders
        mapper = self.supvisors.address_mapper
        local_address = mapper.local_address
        # remote disconnection: drop the first address that is not the local one
        remote_address = next(candidate
                              for candidate in mapper.addresses
                              if candidate != local_address)
        self.subscriber.disconnect([remote_address])
        # a tick sent by the local publisher must still be received
        payload = {'date': 1000}
        self.publisher.send_tick_event(payload)
        received = self.receive('Tick')
        expected = (InternalEventHeaders.TICK, local_address, payload)
        self.assertTupleEqual(expected, received)
        # local disconnection: the same tick must not be received anymore
        self.subscriber.disconnect([local_address])
        self.publisher.send_tick_event(payload)
        with self.assertRaises(zmq.Again):
            self.subscriber.receive()
项目:og-miner    作者:opendns    | 项目源码 | 文件源码
def generator_from_zmq_pull(context, host):
    """Bind a PULL socket on ``host`` and yield the tasks it receives, forever.

    Each iteration performs one non-blocking receive: a received message is
    decoded with ``json.loads`` and yielded, otherwise ``None`` is yielded.

    :param context: object whose ``socket()`` creates the PULL socket.
    :param host: endpoint to bind; a single trailing '/' is stripped.
    """
    socket = context.socket(zmq.PULL)
    # TODO: Configure socket with clean properties to avoid message overload.
    if host.endswith('/'):
        host = host[:-1]
    banner = "Binding ZMQ pull socket : " + colorama.Fore.CYAN + "{0}".format(host) + colorama.Style.RESET_ALL
    print_item("+", banner)
    socket.bind(host)

    while True:
        try:
            raw = socket.recv(flags=zmq.NOBLOCK)
        except zmq.Again:
            raw = None
        # NOTE: Yielding None keeps the generator non blocking for its consumer.
        yield None if raw is None else json.loads(raw)
项目:frontera-docs-zh_CN    作者:xsren    | 项目源码 | 文件源码
def get_messages(self, timeout=0.1, count=1):
        """
        Yield up to ``count`` message bodies read from ``self.subscriber``.

        Receives are non-blocking. While nothing is available the generator sleeps
        ``timeout / 10`` between attempts and stops once more than ``timeout``
        seconds have passed since the call started. Each message's sequence number
        is compared with ``self.counter``; a mismatch optionally logs a warning and
        resets the counter.

        :param timeout: overall waiting budget in seconds.
        :param count: maximum number of messages to yield.
        """
        started = time()
        pause = timeout / 10.0
        remaining = count
        while remaining:
            try:
                frames = self.subscriber.recv_multipart(copy=True, flags=zmq.NOBLOCK)
            except zmq.Again:
                if time() - started > timeout:
                    return
                sleep(pause)
                continue

            # Frame 2 packs two big-endian unsigned ints: partition and global seqno.
            partition_seqno, global_seqno = unpack(">II", frames[2])
            if self.count_global:
                seqno = global_seqno
            else:
                seqno = partition_seqno

            if not self.counter:
                self.counter = seqno
            elif self.counter != seqno:
                if self.seq_warnings:
                    self.logger.warning("Sequence counter mismatch: expected %d, got %d. Check if system "
                                        "isn't missing messages." % (self.counter, seqno))
                self.counter = None

            yield frames[1]
            remaining -= 1
            if self.counter:
                self.counter += 1
            self.stats[self.stat_key] += 1
项目:spyking-circus-ort    作者:spyking-circus    | 项目源码 | 文件源码
def _get_data(self, blocking=True):
        """Receive one batch from ``self.socket`` and decode it.

        Decoding depends on ``self.structure``:
        - 'array': the payload is read as ``self.dtype`` values, shaped to ``self.shape``;
        - 'dict': the payload is parsed with ``json.loads``;
        - 'boolean': the payload is converted with ``bool``;
        - anything else: the payload is returned unchanged.

        :param blocking: when False, a non-blocking receive is used.
        :return: the decoded batch, or None when a non-blocking receive finds nothing.
        :raises EOCError: when the payload equals ``TERM_MSG``.
        """
        if blocking:
            batch = self.socket.recv()
        else:
            try:
                batch = self.socket.recv(flags=zmq.NOBLOCK)
            except zmq.Again:
                return None

        if batch == TERM_MSG:
            raise EOCError()

        if self.structure == 'array':
            # numpy.fromstring in binary mode is deprecated; frombuffer is its
            # replacement. The copy keeps the old result: a writable array that
            # owns its data instead of a read-only view on the message.
            batch = numpy.frombuffer(batch, dtype=self.dtype).copy()
            batch = numpy.reshape(batch, self.shape)
        elif self.structure == 'dict':
            batch = json.loads(batch)
        elif self.structure == 'boolean':
            # NOTE(review): bool() of a non-empty payload is always True - confirm
            # that the sender encodes False as an empty message.
            batch = bool(batch)

        return batch
项目:indy-plenum    作者:hyperledger    | 项目源码 | 文件源码
def _receiveFromListener(self, quota) -> int:
        """
        Receives messages from listener
        :param quota: number of messages to receive
        :return: number of received messages
        """
        assert quota
        i = 0
        while i < quota:
            try:
                ident, msg = self.listener.recv_multipart(flags=zmq.NOBLOCK)
                if not msg:
                    # Router probing sends empty message on connection
                    continue
                i += 1
                if self.onlyListener and ident not in self.remotesByKeys:
                    self.peersWithoutRemotes.add(ident)
                self._verifyAndAppend(msg, ident)
            except zmq.Again:
                break
        if i > 0:
            logger.trace('{} got {} messages through listener'.
                         format(self, i))
        return i
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_retry_recv(self):
        """A receive with a timeout must raise ``zmq.Again`` and the timer must have fired."""
        pull = self.socket(zmq.PULL)
        pull.rcvtimeo = self.timeout_ms
        self.alarm()
        with self.assertRaises(zmq.Again):
            pull.recv()
        assert self.timer_fired
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_retry_send(self):
        """A send with a timeout must raise ``zmq.Again`` and the timer must have fired."""
        push = self.socket(zmq.PUSH)
        push.sndtimeo = self.timeout_ms
        self.alarm()
        payload = b('buf')
        with self.assertRaises(zmq.Again):
            push.send(payload)
        assert self.timer_fired
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_again(self):
        """A non-blocking receive on a fresh REP socket raises ``Again`` with errno EAGAIN."""
        rep = self.context.socket(zmq.REP)
        with self.assertRaises(Again):
            rep.recv(zmq.NOBLOCK)
        self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
        rep.close()
项目:YubiGuard    作者:pykong    | 项目源码 | 文件源码
def start_listener(self):
        """Poll ``self.s`` forever and put ``ON_SIGNAL`` on ``self.on_q`` for every message."""
        print('ZMQ listener started')
        while True:
            try:
                # Non-blocking receive: the message content itself is not used.
                self.s.recv(zmq.NOBLOCK)
            except zmq.Again:
                # Nothing to receive yet - wait a little before polling again.
                time.sleep(0.05)
                continue
            self.on_q.put(ON_SIGNAL)
项目:CellsCycle    作者:AQuadroTeam    | 项目源码 | 文件源码
def client_behavior(settings, logger):
    """Run the client side of the channel connection test.

    Opens the internal channel as a client, sends a first message and logs the
    answer, then subscribes to the external channel and logs every message it
    receives until ``Again`` is raised.

    :param settings: object providing ``getIntPort()`` and ``getExtPort()``.
    :param logger: logger used for the debug output and handed to the channels.
    """
    internal_channel = InternalChannel(addr="127.0.0.1", port=settings.getIntPort(), logger=logger)

    try:
        internal_channel.generate_internal_channel_client_side()
    except ZMQError as error:
        logger.debug(error)

    message = Message()
    message.priority = ALIVE
    message.source_flag = INT
    message.source_id = '1'
    message.target_id = '1'
    message.target_addr = '192.168.1.1'
    message.target_key = '{}:{}'.format(0, 19)

    internal_channel.send_first_internal_channel_message(dumps(message))

    reply = internal_channel.wait_int_message(dont_wait=False)
    logger.debug("msg : " + reply)

    external_channel = ExternalChannel(addr="127.0.0.1", port=settings.getExtPort(), logger=logger)
    external_channel.generate_external_channel_client_side()
    external_channel.external_channel_subscribe()

    def log_next_external_message():
        # Wait for one external message and log its printable form.
        logger.debug(loads(external_channel.wait_ext_message()).printable_message())

    log_next_external_message()

    logger.debug("try_to_connect TEST COMPLETED")

    # Keep logging external messages until the wait raises Again.
    while True:
        try:
            log_next_external_message()
            sleep(1)
        except Again:
            logger.debug("my master is DEAD")
            break
项目:CellsCycle    作者:AQuadroTeam    | 项目源码 | 文件源码
def server_behavior(settings, logger):
    """Run the server side of the channel connection test.

    Opens the internal channel as a server, logs the first request and answers
    it with ``OK``, then publishes the same message on the external channel once
    per second until forwarding raises ``zmq.Again``.

    :param settings: object providing ``getIntPort()`` and ``getExtPort()``.
    :param logger: logger used for the debug output and handed to the channels.
    """
    internal_channel = InternalChannel(addr="127.0.0.1", port=settings.getIntPort(), logger=logger)

    try:
        internal_channel.generate_internal_channel_server_side()
        request = loads(internal_channel.wait_int_message(dont_wait=False))
        logger.debug("msg : ")
        logger.debug(request.printable_message())
        internal_channel.reply_to_int_message(OK)
    except ZMQError as error:
        logger.debug(error)

    external_channel = ExternalChannel(addr="127.0.0.1", port=settings.getExtPort(), logger=logger)

    external_channel.generate_external_channel_server_side()
    external_channel.external_channel_publish()

    message = Message()
    message.priority = ALIVE
    message.source_flag = EXT
    message.source_id = '1'
    message.target_id = '1'
    message.target_addr = '192.168.1.1'
    message.target_key = '{}:{}'.format(0, 19)

    sleep(1)

    external_channel.forward(dumps(message))

    logger.debug("try_to_connect TEST COMPLETED")

    # Keep publishing until forwarding raises zmq.Again.
    while True:
        try:
            external_channel.forward(dumps(message))
            sleep(1)
        except zmq.Again:
            break
项目:CellsCycle    作者:AQuadroTeam    | 项目源码 | 文件源码
def wait_int_message(self, dont_wait=True):
        """
        Receive one message from ``self.list_communication_channel``.

        :param dont_wait: when True, receive with ``zmq.DONTWAIT``; otherwise log
                          and block until a message arrives.
        :return: the received message.
        :raises zmq.Again: in ``dont_wait`` mode when no message is available.
        """
        if dont_wait:
            # zmq.Again propagates untouched: catching it only to raise a new,
            # context-free zmq.Again would discard the original errno and traceback.
            return self.list_communication_channel.recv(zmq.DONTWAIT)

        self.logger.debug('waiting for a request')
        return self.list_communication_channel.recv()
项目:supvisors    作者:julien6387    | 项目源码 | 文件源码
def receive(self, event_type):
        """ Poll the subscriber socket for up to 1000 ms, then return what the
        subscriber receives; the test fails if ``zmq.Again`` is raised. """
        try:
            self.subscriber.socket.poll(1000)
            event = self.subscriber.receive()
        except zmq.Again:
            self.fail('Failed to get {} event'.format(event_type))
        else:
            return event
项目:supvisors    作者:julien6387    | 项目源码 | 文件源码
def receive(self, event_type):
        """ Return what the puller receives; the test fails if ``zmq.Again`` is raised. """
        try:
            request = self.puller.receive()
        except zmq.Again:
            self.fail('Failed to get {} request'. format(event_type))
        else:
            return request
项目:supvisors    作者:julien6387    | 项目源码 | 文件源码
def check_reception(self, header=None, data=None):
        """ With both ``header`` and ``data`` given, the subscriber must receive
        exactly ``(header, data)``. Otherwise the subscriber must receive nothing,
        i.e. raise ``zmq.Again``. """
        if not (header and data):
            # nothing was announced, so nothing may arrive
            with self.assertRaises(zmq.Again):
                self.subscriber.receive()
            return
        # a message is expected: receive it and compare it
        try:
            msg = self.subscriber.receive()
        except zmq.Again:
            self.fail('Failed to get {} status'.format(header))
        self.assertTupleEqual((header, data), msg)
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def _send_and_receive(self, message):
    """Sending payloads to NM and returning Response instances.

    Or, if the action failed, an error will be raised during the instantiation
    of the Response.  Can also timeout if the socket receives no data for some
    period.

    Args:
      message: dict of a message to send to NM

    Returns:
      Response instance if the request succeeded

    Raises:
      TimeoutError: if nothing is received for the timeout
    """
    # zmq is thread unsafe: if we send a second request before
    # we get back the first response, we throw an exception
    # fix that -kheimerl
    with self.lock:
      # Send the message and poll for responses.
      self.socket.send(json.dumps(message))
      responses = self.socket.poll(timeout=self.socket_timeout * 1000)
      if responses:
        try:
          raw_response_data = self.socket.recv()
          return Response(raw_response_data)
        except zmq.Again:
          pass
      # If polling fails or recv failes, we reset the socket or
      # it will be left in a bad state, waiting for a response.
      self.socket.close()
      self.setup_socket()
      self.socket.connect(self.address)
      raise TimeoutError('did not receive a response')
项目:bqueryd    作者:visualfabriq    | 项目源码 | 文件源码
def handle_in(self):
        """
        Receive one two-frame message (sender, payload) and handle it.

        Returns without doing anything when the receive raises ``zmq.Again``, when
        the message does not have exactly two frames, or when the sender is not
        in ``self.controllers``. Otherwise a ``BusyMessage`` is sent to all
        controllers, the message is passed to ``self.handle``, a truthy result is
        sent back to the sender, and a ``DoneMessage`` is sent to all controllers.
        An exception raised by ``self.handle`` is turned into an ``ErrorMessage``
        whose payload is the formatted traceback.
        """
        try:
            frames = self.socket.recv_multipart()
        except zmq.Again:
            return
        if len(frames) != 2:
            self.logger.critical('Received a msg with len != 2, something seriously wrong. ')
            return

        sender, msg_buf = frames
        msg = msg_factory(msg_buf)

        data = self.controllers.get(sender)
        if not data:
            self.logger.critical('Received a msg from %s - this is an unknown sender' % sender)
            return
        data['last_seen'] = time.time()

        # TODO Notify Controllers that we are busy, no more messages to be sent
        # The busy notification is not perfect as other messages might be on their way already
        # but for long-running queries it will at least ensure other controllers
        # don't try and overuse this node by filling up a queue
        self.send_to_all(BusyMessage())

        try:
            reply = self.handle(msg)
        except Exception:
            # "except Exception, e" is Python 2 only syntax and the bound name was
            # never used; this form is valid on both Python 2 and Python 3.
            reply = ErrorMessage(msg)
            reply['payload'] = traceback.format_exc()
            self.logger.exception(reply['payload'])
        if reply:
            self.send(sender, reply)

        # A DoneMessage to all controllers flags this node as 'Done'.
        self.send_to_all(DoneMessage())
项目:og-miner    作者:opendns    | 项目源码 | 文件源码
def run(self):
        """
        Loop forever: take JSON tasks from ``self.pull`` without blocking and store
        each task's data in redis under its transaction key, expiring after
        ``self.result_expiration``.
        """
        # NOTE(review): there is no sleep or poll, so this loop spins while the
        # socket is empty - confirm that this is intended.
        while True:
            try:
                message = self.pull.recv(flags=zmq.NOBLOCK)
            except zmq.Again:
                continue
            if message is None:
                continue
            task = json.loads(message)
            key = task['transaction']
            self.redis.setex(key, self.result_expiration, json.dumps(task['data']))
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_again(self):
        """A non-blocking receive on a fresh REP socket raises ``Again`` with errno EAGAIN."""
        rep = self.context.socket(zmq.REP)
        with self.assertRaises(Again):
            rep.recv(zmq.NOBLOCK)
        self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
        rep.close()
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_again(self):
        """A non-blocking receive on a fresh REP socket raises ``Again`` with errno EAGAIN."""
        rep = self.context.socket(zmq.REP)
        with self.assertRaises(Again):
            rep.recv(zmq.NOBLOCK)
        self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
        rep.close()
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_again(self):
        """A non-blocking receive on a fresh REP socket raises ``Again`` with errno EAGAIN."""
        rep = self.context.socket(zmq.REP)
        with self.assertRaises(Again):
            rep.recv(zmq.NOBLOCK)
        self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
        rep.close()
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_again(self):
        """A non-blocking receive on a fresh REP socket raises ``Again`` with errno EAGAIN."""
        rep = self.context.socket(zmq.REP)
        with self.assertRaises(Again):
            rep.recv(zmq.NOBLOCK)
        self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
        rep.close()
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_again(self):
        """A non-blocking receive on a fresh REP socket raises ``Again`` with errno EAGAIN."""
        rep = self.context.socket(zmq.REP)
        with self.assertRaises(Again):
            rep.recv(zmq.NOBLOCK)
        self.assertRaisesErrno(zmq.EAGAIN, rep.recv, zmq.NOBLOCK)
        rep.close()
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def send_raw_msg(self, msg):
        """
        Send ``msg`` on ``self.socket`` and return the answer received.

        Both the send and the receive are attempted up to six times while they
        raise ``zmq.Again``. When all attempts of a step fail, the connection is
        closed via ``self.disconnect()`` and an ``RC_ERR`` is returned instead.
        """
        max_attempts = 6

        for _ in range(max_attempts):
            try:
                self.socket.send(msg)
                break
            except zmq.Again:
                continue
        else:
            # every send attempt failed
            self.disconnect()
            return RC_ERR("*** [RPC] - Failed to send message to server")

        for _ in range(max_attempts):
            try:
                response = self.socket.recv()
                break
            except zmq.Again:
                continue
        else:
            # every receive attempt failed
            self.disconnect()
            return RC_ERR("*** [RPC] - Failed to get server response at {0}".format(self.transport))

        return response



    # process a single response from server
项目:napalm-logs    作者:napalm-automation    | 项目源码 | 文件源码
def receive(self):
        '''
        Receive one message from ``self.sub`` and return it with an empty address.

        ..note::
            In ZMQ we are unable to get the address where we got the message from.

        Raises ``ListenerException`` when the receive raises ``zmq.Again``.
        '''
        try:
            msg = self.sub.recv()
        except zmq.Again as error:
            log.error('Unable to receive messages: %s', error, exc_info=True)
            raise ListenerException(error)
        else:
            log.debug('[%s] Received %s', time.time(), msg)
            # The second item stands for the unknown sender address.
            return msg, ''
项目:lewis    作者:DMSC-Instrument-Data    | 项目源码 | 文件源码
def test_process_does_not_block(self):
        """``process`` must swallow ``zmq.Again`` and receive with ``zmq.NOBLOCK``."""
        fake_socket = Mock()
        fake_socket.recv_unicode.side_effect = zmq.Again()

        control_server = ControlServer(None, connection_string='127.0.0.1:10000')
        control_server._socket = fake_socket
        assertRaisesNothing(self, control_server.process)

        expected_calls = [call(flags=zmq.NOBLOCK)]
        fake_socket.recv_unicode.assert_has_calls(expected_calls)
项目:lewis    作者:DMSC-Instrument-Data    | 项目源码 | 文件源码
def process(self, blocking=False):
        """
        Try to read one request from the socket, hand it to the
        JSONRPCResponseManager and send the resulting response back.

        If no data are available (``zmq.Again``), the method does nothing, so a
        single-threaded central loop can call it regularly without being stalled.
        A ``TypeError`` raised while handling or answering the request is reported
        to the client through ``_unhandled_exception_response``.

        :param blocking: If True, the receive is done without ``zmq.NOBLOCK``.
                         Default is False.
        :raises RuntimeError: if the server socket has not been created yet.
        """
        if self._socket is None:
            raise RuntimeError('The server has not been started yet, use start_server to do so.')

        recv_flags = 0 if blocking else zmq.NOBLOCK

        try:
            request = self._socket.recv_unicode(flags=recv_flags)

            self.log.debug('Got request %s', request)

            try:
                response = JSONRPCResponseManager.handle(request, self._exposed_object)
                self._socket.send_unicode(response.json)

                self.log.debug('Sent response %s', response.json)
            except TypeError as e:
                request_id = json.loads(request)['id']
                error_reply = self._unhandled_exception_response(request_id, e)
                self._socket.send_json(error_reply)
        except zmq.Again:
            # Nothing to process right now.
            return
项目:idasec    作者:RobinDavid    | 项目源码 | 文件源码
def receive_message(socket, blocking=True):
        """
        Receive a two-frame message from ``socket``.

        :param socket: socket to read from.
        :param blocking: when False, the receive uses ``zmq.NOBLOCK``.
        :return: ``(cmd, data)``, or ``(None, None)`` when nothing is available,
                 the context is terminated, or the wait is interrupted by Ctrl-C.
        """
        flags = 0 if blocking else zmq.NOBLOCK
        nothing = (None, None)
        try:
            cmd, data = socket.recv_multipart(flags=flags)
        except zmq.Again:
            return nothing
        except zmq.ContextTerminated:
            print("Context terminated ..")
            return nothing
        except KeyboardInterrupt:
            return nothing
        return cmd, data
项目:indy-plenum    作者:hyperledger    | 项目源码 | 文件源码
def _receiveFromRemotes(self, quotaPerRemote) -> int:
        """
        Receives messages from remotes
        :param quotaPerRemote: number of messages to receive from one remote
        :return: number of received messages
        """

        assert quotaPerRemote
        totalReceived = 0
        for ident, remote in self.remotesByKeys.items():
            if not remote.socket:
                continue
            i = 0
            sock = remote.socket
            while i < quotaPerRemote:
                try:
                    msg, = sock.recv_multipart(flags=zmq.NOBLOCK)
                    if not msg:
                        # Router probing sends empty message on connection
                        continue
                    i += 1
                    self._verifyAndAppend(msg, ident)
                except zmq.Again:
                    break
            if i > 0:
                logger.trace('{} got {} messages through remote {}'.
                             format(self, i, remote))
            totalReceived += i
        return totalReceived
项目:indy-plenum    作者:hyperledger    | 项目源码 | 文件源码
def transmit(self, msg, uid, timeout=None, serialized=False):
        """
        Send ``msg`` without blocking to the remote registered under ``uid``.

        :param msg: message to send; passed through ``prepare_to_send`` unless
                    ``serialized`` is True.
        :param uid: key of the remote in ``self.remotes``.
        :param timeout: not used by this method.
        :param serialized: True when ``msg`` is already in its wire form.
        :return: ``(True, None)`` on success; ``(False, None)`` when the remote or
                 its socket is missing or the send would block; ``(False, error)``
                 when the message exceeds the allowed size.
        """
        remote = self.remotes.get(uid)
        if not remote:
            logger.debug("Remote {} does not exist!".format(uid))
            return False, None

        socket = remote.socket
        if not socket:
            logger.debug('{} has uninitialised socket '
                         'for remote {}'.format(self, uid))
            return False, None

        try:
            payload = msg if serialized else self.prepare_to_send(msg)
            socket.send(payload, flags=zmq.NOBLOCK)
            logger.debug('{} transmitting message {} to {}'
                         .format(self, payload, uid))
            if not remote.isConnected and payload not in self.healthMessages:
                logger.debug('Remote {} is not connected - '
                             'message will not be sent immediately.'
                             'If this problem does not resolve itself - '
                             'check your firewall settings'.format(uid))
            return True, None
        except zmq.Again:
            logger.debug(
                '{} could not transmit message to {}'.format(self, uid))
            return False, None
        except InvalidMessageExceedingSizeException as ex:
            err_str = '{}Cannot transmit message. Error {}'.format(
                CONNECTION_PREFIX, ex)
            logger.error(err_str)
            return False, err_str
项目:indy-plenum    作者:hyperledger    | 项目源码 | 文件源码
def transmitThroughListener(self, msg, ident) -> Tuple[bool, Optional[str]]:
        """
        Send ``msg`` without blocking to the peer ``ident`` through the listener socket.

        :param msg: message to send; it is passed through ``prepare_to_send`` first.
        :param ident: identity of the peer; a ``str`` is encoded to bytes.
        :return: ``(True, None)`` on success; ``(False, None)`` when ``ident`` is not
                 in ``self.peersWithoutRemotes`` or the send would block;
                 ``(False, error)`` for any other failure.
        """
        if isinstance(ident, str):
            ident = ident.encode()
        if ident not in self.peersWithoutRemotes:
            logger.debug('{} not sending message {} to {}'.
                         format(self, msg, ident))
            logger.debug("This is a temporary workaround for not being able to "
                         "disconnect a ROUTER's remote")
            return False, None
        try:
            msg = self.prepare_to_send(msg)
            logger.trace('{} transmitting {} to {} through listener socket'.
                         format(self, msg, ident))
            # noinspection PyUnresolvedReferences
            self.listener.send_multipart([ident, msg], flags=zmq.NOBLOCK)
            return True, None
        except zmq.Again:
            return False, None
        except InvalidMessageExceedingSizeException as ex:
            err_str = '{}Cannot transmit message. Error {}'.format(
                CONNECTION_PREFIX, ex)
            logger.error(err_str)
            return False, err_str
        except Exception as e:
            err_str = '{}{} got error {} while sending through listener to {}'\
                .format(CONNECTION_PREFIX, self, e, ident)
            logger.error(err_str)
            return False, err_str
        # No statement follows: every path above returns, so the former trailing
        # "return True, None" could never run.
项目:mflow    作者:datastreaming    | 项目源码 | 文件源码
def receive(self, handler=None, block=True):
        """
        Receive one message and decode it with a message handler.

        Without an explicit ``handler`` the header is read first and the handler
        is looked up in ``receive_handlers`` by the header's "htype". Whatever the
        outcome, ``self.receiver.flush`` is called with the success state before
        returning.

        :param handler:     Reference to a specific message handler function to use for interpreting
                            the message to be received
        :param block:       Blocking receive call
        :return:            ``Message`` built from the receiver statistics and the decoded data,
                            or None if nothing could be received or decoded
        """

        message = None
        # Set blocking flag in receiver
        self.receiver.block = block
        receive_is_successful = False

        if not handler:
            try:
                # Dynamically select handler
                htype = self.receiver.header()["htype"]
            except zmq.Again:
                # not clear if this is needed
                self.receiver.flush(receive_is_successful)
                return message
            except Exception:
                # "except Exception" lets KeyboardInterrupt and SystemExit propagate,
                # which a bare "except:" would have intercepted.
                logger.exception('Unable to read header - skipping')
                # Clear remaining sub-messages if exist
                self.receiver.flush(receive_is_successful)
                return message

            try:
                handler = receive_handlers[htype]
            except (KeyError, TypeError):
                logger.debug(sys.exc_info()[1])
                # %s formatting cannot fail, unlike concatenation with a non-str htype.
                logger.warning('htype - %s -  not supported', htype)
                # Without a handler there is nothing to decode: skip the message
                # instead of calling None and logging a misleading decode error.
                self.receiver.flush(receive_is_successful)
                return message

        try:
            data = handler(self.receiver)
            # as an extra safety margin
            if data:
                receive_is_successful = True
                message = Message(self.receiver.statistics, data)
        except Exception:
            logger.exception('Unable to decode message - skipping')

        # Clear remaining sub-messages if exist
        self.receiver.flush(receive_is_successful)

        return message
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def _run(self):
        """
        Subscriber loop: receive JSON lines from ``self.socket`` while
        ``self.active`` is set, store each message's data in ``self.raw_snapshot``
        and dispatch it.

        The event handler is told once when data starts to arrive and once when a
        receive times out after data had been seen. The socket is closed when the
        loop ends.
        """
        # socket must be created on the same thread
        self.socket.setsockopt(zmq.SUBSCRIBE, b'')
        self.socket.setsockopt(zmq.RCVTIMEO, 5000)
        self.socket.connect(self.tr)

        got_data = False

        self.monitor.reset()

        while self.active:
            try:
                with self.monitor:
                    line = self.socket.recv_string()

                self.monitor.on_recv_msg(line)
                self.last_data_recv_ts = time.time()

                # signal once
                if not got_data:
                    self.event_handler.on_async_alive()
                    got_data = True

            except zmq.Again:
                # got a timeout - mark as not alive (signal once) and retry
                if got_data:
                    self.event_handler.on_async_dead()
                    got_data = False
                continue

            except zmq.ContextTerminated:
                # outside thread signaled us to exit
                assert not self.active
                break

            msg = json.loads(line)
            name, data, msg_type = msg['name'], msg['data'], msg['type']
            baseline = msg.get('baseline', False)

            self.raw_snapshot[name] = data
            self.__dispatch(name, msg_type, data, baseline)

        # closing of socket must be from the same thread
        self.socket.close(linger=0)
项目:LMS    作者:RRZE-HPC    | 项目源码 | 文件源码
def recv_loop(self, configfile=None):
        """
        Main loop: receive strings and call the registered functions.

        The configuration is read and the connection is made first if that has
        not been done yet. Reception is non-blocking; when nothing is received the
        loop sleeps for a second. Every iteration checks whether it is time to
        call ``self.update``. A received string that passes ``self._filter`` is
        wrapped in a ``Measurement``; if a status attribute is configured and
        present, the function registered for its value is called; finally the
        measurement is passed to ``self.get``. The loop ends when
        ``self.terminate`` is set (also on Ctrl-C), then ``self.disconnect`` runs.

        :param configfile: handed to ``read_config`` when no configuration is loaded.
        """
        if not self.config:
            self.read_config(configfile=configfile)
        if not self.context:
            self.connect()

        updatetime = datetime.datetime.now() + datetime.timedelta(seconds=self.interval)
        while not self.terminate:
            s = None
            try:
                s = self.socket.recv(flags=zmq.NOBLOCK)
            except zmq.Again:
                time.sleep(1)
            except KeyboardInterrupt:
                self.terminate = True
            if self.terminate:
                continue

            if datetime.datetime.now() > updatetime:
                logging.debug("Calling update function")
                self.update()
                updatetime = datetime.datetime.now() + datetime.timedelta(seconds=self.interval)

            if not (s and self._filter(s)):
                continue

            logging.debug("Received string: %s" % s)
            m = Measurement(s)
            if self.status_attr:
                logging.debug("Checking status_attr: %s" % self.status_attr)
                stat = m.get_attr(self.status_attr)
                if stat:
                    for key in self.stat_funcs:
                        if key == stat:
                            logging.debug("Calling %s function" % key)
                            self.stat_funcs[key](m)
            self.get(m)
        self.disconnect()
项目:indy-plenum    作者:hyperledger    | 项目源码 | 文件源码
def testSimpleZStacksMsgs(tdir, looper):
    """Exchange messages between two SimpleZStack instances, 'Alpha' and 'Beta'.

    Alpha connects to Beta and sends a greeting. Beta's handler answers through
    its listener socket with a large random payload, and Alpha's handler asserts
    that the payload arrives with the expected size.
    """
    names = ['Alpha', 'Beta']
    genKeys(tdir, names)
    names = ['Alpha', 'Beta']
    aseed = randomSeed()
    bseed = randomSeed()

    size = 100000
    msg = json.dumps({'random': randomSeed(size).decode()}).encode()

    def aHandler(m):
        shown = "{}".format(m)
        print('{} printing... {}'.format(names[0], shown[:100]))
        body, _ = m
        print('Message size is {}'.format(len(body['random'])))
        assert len(body['random']) == size

    def bHandler(m):
        print(beta.msgHandler)
        peer = list(beta.peersWithoutRemotes)[0]
        try:
            beta.listener.send_multipart([peer, msg],
                                         flags=zmq.NOBLOCK)
        except zmq.Again:
            return False
        shown = "{}".format(m)
        print('{} printing... {}'.format(names[1], shown[:100]))

    alphaParams = dict(name=names[0], ha=genHa(), auto=2, basedirpath=tdir)
    alpha = SimpleZStack(alphaParams, aHandler, aseed, False)

    betaParams = dict(name=names[1], ha=genHa(), auto=2, basedirpath=tdir)
    beta = SimpleZStack(betaParams, bHandler, bseed, True)

    amotor = SMotor(alpha)
    looper.add(amotor)

    bmotor = SMotor(beta)
    looper.add(bmotor)

    alpha.connect(name=beta.name, ha=beta.ha,
                  verKeyRaw=beta.verKeyRaw, publicKeyRaw=beta.publicKeyRaw)

    # Let the stacks run for a moment before sending the greeting.
    looper.runFor(0.25)
    alpha.send({'greetings': 'hi'}, beta.name)
    looper.runFor(1)