Python zmq 模块,utils 实例源码

我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用 zmq.utils 子模块

项目:zanph    作者:zanph    | 项目源码 | 文件源码
def recv_json(self, flags=0, **kwargs):
        """Receive one message and decode it from JSON.

        Any extra keyword arguments are forwarded unchanged to the
        ``jsonapi.loads`` call that performs the decoding.

        Parameters
        ----------
        flags : int
            Flags handed through to ``self.recv``.

        Returns
        -------
        obj : Python object
            The result of decoding the received message.
        """
        from zmq.utils import jsonapi
        raw_message = self.recv(flags)
        decoded = jsonapi.loads(raw_message, **kwargs)
        return decoded
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_utils(self):
        """Check that ``zmq.utils`` and two of its submodules import."""
        import zmq.utils
        from zmq.utils import strtypes, jsonapi
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def shadow(cls, address):
        """Create an instance that shadows an existing libzmq socket.

        ``address`` is either the integer address of the libzmq socket or
        an FFI pointer to it; it is normalized with ``cast_int_addr``
        before being passed to the constructor as ``shadow``.

        .. versionadded:: 14.1
        """
        from zmq.utils.interop import cast_int_addr
        return cls(shadow=cast_int_addr(address))

    #-------------------------------------------------------------------------
    # Deprecated aliases
    #-------------------------------------------------------------------------
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def send_json(self, obj, flags=0, **kwargs):
        """Encode ``obj`` as JSON and send it as a message.

        Any extra keyword arguments are forwarded unchanged to the
        ``jsonapi.dumps`` call that performs the encoding.

        Parameters
        ----------
        obj : Python object
            The object to encode and send.
        flags : int
            Flags handed through to ``self.send``.

        Returns
        -------
        The value returned by ``self.send``.
        """
        from zmq.utils import jsonapi
        payload = jsonapi.dumps(obj, **kwargs)
        result = self.send(payload, flags)
        return result
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_utils(self):
        """Check that ``zmq.utils`` and two of its submodules import."""
        import zmq.utils
        from zmq.utils import strtypes, jsonapi
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_utils(self):
        """Check that ``zmq.utils`` and two of its submodules import."""
        import zmq.utils
        from zmq.utils import strtypes, jsonapi
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_utils(self):
        """Check that ``zmq.utils`` and two of its submodules import."""
        import zmq.utils
        from zmq.utils import strtypes, jsonapi
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_utils(self):
        """Check that ``zmq.utils`` and two of its submodules import."""
        import zmq.utils
        from zmq.utils import strtypes, jsonapi
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_utils(self):
        """Check that ``zmq.utils`` and two of its submodules import."""
        import zmq.utils
        from zmq.utils import strtypes, jsonapi
项目:volttron-applications    作者:VOLTTRON    | 项目源码 | 文件源码
def main(argv=sys.argv):
    """Entry point invoked by the eggsecutable.

    Hands ``AFDDAgent``, a description string and ``argv`` to
    ``utils.default_main``.
    """
    utils.default_main(
        AFDDAgent,
        description='VOLTTRON platform™ AFDD agent',
        argv=argv,
    )
项目:volttron-applications    作者:VOLTTRON    | 项目源码 | 文件源码
def test():
    """Run ``main()`` alongside a stub agent that simulates a device.

    A stub agent is started on a daemon thread via ``utils.default_main``.
    It replies to actuator lock and set topics and periodically publishes a
    fixed set of readings. Two timing settings are shortened before the
    thread starts, and after a short pause ``main()`` is called.
    """
    from volttron.platform.agent import periodic

    def TestAgent(config_path, **kwargs):
        """Build the stub agent from the configuration at ``config_path``."""
        config = utils.load_config(config_path)
        agent_id = config['agentid']
        # Keyword arguments used to build the device topic in send_data.
        rtu_path = {key: config[key]
                    for key in ('campus', 'building', 'unit')}

        class Agent(PublishMixin, BaseAgent):
            def setup(self):
                """Run the base setup and start with a damper value of 0."""
                super(Agent, self).setup()
                self.damper = 0

            @matching.match_regex(topics.ACTUATOR_LOCK_ACQUIRE() + '(/.*)')
            def on_lock_result(self, topic, headers, message, match):
                """Log the lock request and publish 'SUCCESS' as its result."""
                _log.debug(
                    "Topic: {topic}, {headers}, Message: {message}".format(
                        topic=topic, headers=headers, message=message))
                self.publish(
                    topics.ACTUATOR_LOCK_RESULT() + match.group(0),
                    headers,
                    jsonapi.dumps('SUCCESS'))

            @matching.match_regex(topics.ACTUATOR_SET() + '(/.*/([^/]+))')
            def on_new_data(self, topic, headers, message, match):
                """Log the set request, track 'Damper', and echo the value."""
                _log.debug(
                    "Topic: {topic}, {headers}, Message: {message}".format(
                        topic=topic, headers=headers, message=message))
                # Group 2 is the last path segment of the matched topic.
                if match.group(2) == 'Damper':
                    self.damper = int(message[0])
                self.publish(
                    topics.ACTUATOR_VALUE() + match.group(0),
                    headers,
                    message[0])

            @periodic(5)
            def send_data(self):
                """Publish fixed temperatures plus the current damper value."""
                readings = {
                    'ReturnAirTemperature': 55,
                    'OutsideAirTemperature': 50,
                    'MixedAirTemperature': 45,
                    'Damper': self.damper
                }
                self.publish_ex(
                    topics.DEVICES_VALUE(point='all', **rtu_path),
                    {},
                    ('application/json', jsonapi.dumps(readings)))

        Agent.__name__ = 'TestAgent'
        return Agent(**kwargs)

    # Shorten the timing settings before the agents start.
    settings.afdd2_seconds_to_steady_state = 3
    settings.sync_trial_time = 10
    worker = threading.Thread(
        target=utils.default_main, args=(TestAgent, 'test'))
    worker.daemon = True
    worker.start()
    # Pause briefly after starting the stub thread, then run the real agent.
    time.sleep(2)
    main()