我们从 Python 开源项目中，提取了以下 12 个代码示例，用于说明如何使用 dbus.Int32()。
def unwrap(self, val):
    """Recursively convert a dbus-python value into a plain Python value.

    Conversions are tried in this order:

    * ``dbus.ByteArray`` -> ``str`` built by joining ``str()`` of each element
    * ``dbus.Array`` / ``list`` / ``tuple`` -> ``list`` of unwrapped items
    * ``dbus.Dictionary`` / ``dict`` -> ``dict`` with unwrapped keys and values
    * ``dbus.ObjectPath`` starting with ``/org/freedesktop/NetworkManager/``
      -> an instance of the module-level class named by the path component
      that follows ``NetworkManager`` (``Settings`` is mapped to
      ``Connection`` and ``Devices`` to ``Device``)
    * ``dbus.Signature`` / ``dbus.String`` -> text string
    * ``dbus.Boolean`` -> ``bool``
    * 16/32/64-bit signed and unsigned integers -> ``int``
    * ``dbus.Byte`` -> a ``bytes`` object of length one

    Any other value, including an object path outside the NetworkManager
    prefix, is returned unchanged.

    :param val: value to convert.
    :returns: the converted value.
    :raises KeyError: if a NetworkManager object path names a class that is
        not present in this module's globals.
    """
    if isinstance(val, dbus.ByteArray):
        return "".join([str(x) for x in val])
    if isinstance(val, (dbus.Array, list, tuple)):
        return [self.unwrap(x) for x in val]
    if isinstance(val, (dbus.Dictionary, dict)):
        return {self.unwrap(k): self.unwrap(v) for k, v in val.items()}
    if isinstance(val, dbus.ObjectPath):
        if val.startswith('/org/freedesktop/NetworkManager/'):
            classname = val.split('/')[4]
            classname = {
                'Settings': 'Connection',
                'Devices': 'Device',
            }.get(classname, classname)
            return globals()[classname](val)
        # Other object paths deliberately fall through and are returned as-is.
    if isinstance(val, (dbus.Signature, dbus.String)):
        # type(u"") is ``unicode`` on Python 2 and ``str`` on Python 3, so
        # this works on both; a bare ``unicode(...)`` call is a NameError on
        # Python 3.
        return type(u"")(val)
    if isinstance(val, dbus.Boolean):
        return bool(val)
    if isinstance(val, (dbus.Int16, dbus.UInt16, dbus.Int32,
                        dbus.UInt32, dbus.Int64, dbus.UInt64)):
        return int(val)
    if isinstance(val, dbus.Byte):
        # bytes([n]) is a single byte only on Python 3 (on Python 2 it is the
        # text "[n]"); going through bytearray gives one byte on both.
        return bytes(bytearray([int(val)]))
    return val
def control(self, char):
    """Send the player action mapped to ``char`` over D-Bus.

    ``char`` is looked up in ``OMXDriver.KEY_MAP`` first, so an unknown key
    raises ``KeyError`` before anything else happens. If the process is not
    running, a message is logged and nothing is sent.

    :param char: key into ``OMXDriver.KEY_MAP``.
    :returns: always ``None``.
    """
    action_code = OMXDriver.KEY_MAP[char]

    if not self.is_running():
        logger.info('Failed to send control - process not running')
        return

    try:
        self.__iface_player.Action(dbus.Int32(action_code))
    except dbus.exceptions.DBusException:
        # A D-Bus failure is swallowed without logging; the caller gets
        # None either way.
        return

# USE ONLY at end and after load
# return success of the operation, several tries if pause did not work and no error reported.
def result(operation_id, code, text):
    """Build the result dictionary for an operation

    Packs the operation id, result code and result text into a dictionary
    whose values are dbus types created with variant_level=1. A code that
    SWMResult.isValid() rejects is replaced by SWMResult.SWM_GENERAL_ERROR.

    @param operation_id Id of the operation the result belongs to
    @param code Result code of the operation (one of SWMResult)
    @param text Text message

    @return Dictionary with the keys 'id', 'result_code' and 'result_text'.
    """
    reported_code = code if SWMResult.isValid(code) else SWMResult.SWM_GENERAL_ERROR

    encoded = {}
    encoded['id'] = dbus.String(operation_id, variant_level=1)
    encoded['result_code'] = dbus.Int32(reported_code, variant_level=1)
    encoded['result_text'] = dbus.String(text, variant_level=1)
    return encoded
def convert(dbus_obj):
    """Converts dbus_obj from dbus type to python type.

    Arrays, structs and dictionaries are converted recursively: arrays
    become lists, structs become tuples and dictionaries become dicts with
    converted keys and values.

    :param dbus_obj: dbus object.
    :returns: dbus_obj in python type; an object of any type not handled
        here is returned unchanged.
    """
    integer_types = (dbus.Byte, dbus.Int16, dbus.Int32, dbus.Int64,
                     dbus.UInt16, dbus.UInt32, dbus.UInt64)
    string_types = (dbus.ObjectPath, dbus.Signature, dbus.String)

    # Checks run in the same order as the original converter table.
    if isinstance(dbus_obj, integer_types):
        return int(dbus_obj)
    if isinstance(dbus_obj, dbus.Boolean):
        return bool(dbus_obj)
    if isinstance(dbus_obj, dbus.Double):
        return float(dbus_obj)
    if isinstance(dbus_obj, dbus.Array):
        return [convert(item) for item in dbus_obj]
    if isinstance(dbus_obj, dbus.Struct):
        return tuple(convert(item) for item in dbus_obj)
    if isinstance(dbus_obj, string_types):
        return str(dbus_obj)
    if isinstance(dbus_obj, dbus.Dictionary):
        return {convert(key): convert(value)
                for key, value in dbus_obj.items()}
    return dbus_obj
def convert(dbus_obj):
    """Converts dbus_obj from dbus type to python type.

    Container types are handled recursively: ``dbus.Array`` becomes a list,
    ``dbus.Struct`` a tuple and ``dbus.Dictionary`` a dict, each with
    converted contents.

    :param dbus_obj: dbus object.
    :returns: dbus_obj in python type; an object matching none of the known
        dbus types is returned unchanged.
    """
    def to_list(obj):
        return [convert(item) for item in obj]

    def to_tuple(obj):
        return tuple(convert(item) for item in obj)

    def to_dict(obj):
        return {convert(key): convert(value) for key, value in obj.items()}

    # (dbus types, converter) pairs, tried in order; isinstance accepts the
    # tuple of types directly.
    converters = (
        ((dbus.Byte, dbus.Int16, dbus.Int32, dbus.Int64,
          dbus.UInt16, dbus.UInt32, dbus.UInt64), int),
        ((dbus.Boolean, ), bool),
        ((dbus.Double, ), float),
        ((dbus.Array, ), to_list),
        ((dbus.Struct, ), to_tuple),
        ((dbus.ObjectPath, dbus.Signature, dbus.String), str),
        ((dbus.Dictionary, ), to_dict),
    )

    for dbus_types, to_python in converters:
        if isinstance(dbus_obj, dbus_types):
            return to_python(dbus_obj)

    # No converter matched: hand the object back untouched.
    return dbus_obj
def base_to_python(val):
    """Recursively turn a dbus value into its plain Python counterpart.

    * ``dbus.ByteArray`` -> ``str`` joined from ``str()`` of each element
    * ``dbus.Array`` / ``list`` / ``tuple`` -> list of converted items
    * ``dbus.Dictionary`` / ``dict`` -> dict with converted keys and values
    * ``dbus.ObjectPath``:
        - equal to the ``object_path`` of ``NetworkManager``, ``Settings``
          or ``AgentManager`` -> that object
        - under ``/org/freedesktop/NetworkManager/`` -> an instance of the
          module-level class named by the path component after
          ``NetworkManager`` (``Settings`` -> ``Connection``,
          ``Devices`` -> ``Device``), or ``None`` when constructing it
          raises ``ObjectVanished``
        - equal to ``'/'`` -> ``None``
        - anything else falls through and is returned unchanged
    * ``dbus.Signature`` / ``dbus.String`` -> ``six.text_type``
    * ``dbus.Boolean`` -> ``bool``
    * 16/32/64-bit signed and unsigned integers -> ``int``
    * ``dbus.Byte`` -> ``six.int2byte`` of its integer value

    Values of any other type are returned as they are.
    """
    if isinstance(val, dbus.ByteArray):
        return "".join(str(element) for element in val)

    if isinstance(val, (dbus.Array, list, tuple)):
        return [fixups.base_to_python(element) for element in val]

    if isinstance(val, (dbus.Dictionary, dict)):
        converted = {}
        for raw_key, raw_value in val.items():
            py_key = fixups.base_to_python(raw_key)
            converted[py_key] = fixups.base_to_python(raw_value)
        return converted

    if isinstance(val, dbus.ObjectPath):
        for known in (NetworkManager, Settings, AgentManager):
            if val == known.object_path:
                return known

        if val.startswith('/org/freedesktop/NetworkManager/'):
            kind = val.split('/')[4]
            if kind == 'Settings':
                kind = 'Connection'
            elif kind == 'Devices':
                kind = 'Device'
            try:
                return globals()[kind](val)
            except ObjectVanished:
                return None

        if val == '/':
            return None
        # Any other object path falls through to the checks below.

    if isinstance(val, (dbus.Signature, dbus.String)):
        return six.text_type(val)

    if isinstance(val, dbus.Boolean):
        return bool(val)

    integer_types = (dbus.Int16, dbus.UInt16, dbus.Int32,
                     dbus.UInt32, dbus.Int64, dbus.UInt64)
    if isinstance(val, integer_types):
        return int(val)

    if isinstance(val, dbus.Byte):
        return six.int2byte(int(val))

    return val
def test_dbus_wrap_int(self):
    """wrap_dbus_value(1121) yields a dbus.Int32 equal to Int32(1121)."""
    plain = 1121
    wrapped = dbus_mqtt.wrap_dbus_value(plain)

    self.assertIsInstance(wrapped, dbus.Int32)

    expected = dbus.Int32(plain, variant_level=1)
    self.assertEqual(expected, wrapped)
def test_dbus_wrap_array(self):
    """wrap_dbus_value([1]) yields a dbus.Array holding one wrapped Int32."""
    plain = [1]
    wrapped = dbus_mqtt.wrap_dbus_value(plain)

    self.assertIsInstance(wrapped, dbus.Array)

    expected_item = dbus.Int32(1, variant_level=1)
    expected = dbus.Array([expected_item])
    self.assertEqual(expected, wrapped)
def test_dbus_wrap_dict(self):
    """A dict with an int and a float value is wrapped as a dbus.Dictionary.

    The expected dictionary has dbus.String keys, a dbus.Int32 for the int
    and a dbus.Double for the float.
    """
    plain = {'a' : 3, 'b': 7.0}
    wrapped = dbus_mqtt.wrap_dbus_value(plain)
    print(wrapped)

    self.assertIsInstance(wrapped, dbus.Dictionary)

    expected_items = {
        dbus.String('a', variant_level=1): dbus.Int32(3, variant_level=1),
        dbus.String('b', variant_level=1): dbus.Double(7.0, variant_level=1),
    }
    expected = dbus.Dictionary(expected_items, variant_level=1)
    self.assertEqual(expected, wrapped)
def test_dbus_unwrap_int32(self):
    """unwrap_dbus_value turns dbus.Int32(123) into the plain int 123."""
    wrapped = dbus.Int32(123, variant_level=1)

    unwrapped = dbus_mqtt.unwrap_dbus_value(wrapped)

    self.assertIsInstance(unwrapped, int)
    expected = int(wrapped)
    self.assertEqual(expected, unwrapped)
def test_dbus_unwrap_int32(self):
    """unwrap_dbus_value turns a dbus.Int32 into the equal plain int.

    NOTE(review): a test with this exact name, checking the value 123, is
    defined immediately before this one in the visible source. If both live
    in the same TestCase, this definition replaces the earlier one and the
    123 case would never run, so both values are exercised here. Confirm,
    then consider giving the two tests distinct names.
    """
    for number in (123, 3323213):
        dbus_value = dbus.Int32(number, variant_level=1)
        value = dbus_mqtt.unwrap_dbus_value(dbus_value)
        self.assertIsInstance(value, int)
        self.assertEqual(int(dbus_value), value)
def set_scan_interval(self, value):
    """Set the "ScanInterval" property to ``value`` wrapped as a dbus.Int32.

    :param value: new scan interval; passed to ``dbus.Int32``.
    :returns: whatever the private ``__set_property`` helper returns.
    """
    set_property = self.__set_property
    return set_property("ScanInterval", dbus.Int32(value))