我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用zmq.utils()。
def recv_json(self, flags=0, **kwargs): """receive a Python object as a message using json to serialize Keyword arguments are passed on to json.loads Parameters ---------- flags : int Any valid recv flag. Returns ------- obj : Python object The Python object that arrives as a message. """ from zmq.utils import jsonapi msg = self.recv(flags) return jsonapi.loads(msg, **kwargs)
def test_utils(self): """test util imports""" import zmq.utils from zmq.utils import strtypes from zmq.utils import jsonapi
def shadow(cls, address): """Shadow an existing libzmq socket address is the integer address of the libzmq socket or an FFI pointer to it. .. versionadded:: 14.1 """ from zmq.utils.interop import cast_int_addr address = cast_int_addr(address) return cls(shadow=address) #------------------------------------------------------------------------- # Deprecated aliases #-------------------------------------------------------------------------
def send_json(self, obj, flags=0, **kwargs): """send a Python object as a message using json to serialize Keyword arguments are passed on to json.dumps Parameters ---------- obj : Python object The Python object to send flags : int Any valid send flag """ from zmq.utils import jsonapi msg = jsonapi.dumps(obj, **kwargs) return self.send(msg, flags)
def main(argv=sys.argv): '''Main method called by the eggsecutable.''' utils.default_main(AFDDAgent, description='VOLTTRON platform™ AFDD agent', argv=argv)
def test(): from volttron.platform.agent import periodic def TestAgent(config_path, **kwargs): config = utils.load_config(config_path) agent_id = config['agentid'] rtu_path = dict((key, config[key]) for key in ['campus', 'building', 'unit']) class Agent(PublishMixin, BaseAgent): def setup(self): super(Agent, self).setup() self.damper = 0 @matching.match_regex(topics.ACTUATOR_LOCK_ACQUIRE() + '(/.*)') def on_lock_result(self, topic, headers, message, match): _log.debug("Topic: {topic}, {headers}, Message: {message}".format( topic=topic, headers=headers, message=message)) self.publish(topics.ACTUATOR_LOCK_RESULT() + match.group(0), headers, jsonapi.dumps('SUCCESS')) @matching.match_regex(topics.ACTUATOR_SET() + '(/.*/([^/]+))') def on_new_data(self, topic, headers, message, match): _log.debug("Topic: {topic}, {headers}, Message: {message}".format( topic=topic, headers=headers, message=message)) if match.group(2) == 'Damper': self.damper = int(message[0]) self.publish(topics.ACTUATOR_VALUE() + match.group(0), headers, message[0]) @periodic(5) def send_data(self): data = { 'ReturnAirTemperature': 55, 'OutsideAirTemperature': 50, 'MixedAirTemperature': 45, 'Damper': self.damper } self.publish_ex(topics.DEVICES_VALUE(point='all', **rtu_path), {}, ('application/json', jsonapi.dumps(data))) Agent.__name__ = 'TestAgent' return Agent(**kwargs) settings.afdd2_seconds_to_steady_state = 3 settings.sync_trial_time = 10 t = threading.Thread(target=utils.default_main, args=(TestAgent, 'test')) t.daemon = True t.start() time.sleep(2) main()