我们从Python开源项目中,提取了以下25个代码示例,用于说明如何使用dbus.Byte()。
def unwrap(self, val): if isinstance(val, dbus.ByteArray): return "".join([str(x) for x in val]) if isinstance(val, (dbus.Array, list, tuple)): return [self.unwrap(x) for x in val] if isinstance(val, (dbus.Dictionary, dict)): return dict([(self.unwrap(x), self.unwrap(y)) for x, y in val.items()]) if isinstance(val, dbus.ObjectPath): if val.startswith('/org/freedesktop/NetworkManager/'): classname = val.split('/')[4] classname = { 'Settings': 'Connection', 'Devices': 'Device', }.get(classname, classname) return globals()[classname](val) if isinstance(val, (dbus.Signature, dbus.String)): return unicode(val) if isinstance(val, dbus.Boolean): return bool(val) if isinstance(val, (dbus.Int16, dbus.UInt16, dbus.Int32, dbus.UInt32, dbus.Int64, dbus.UInt64)): return int(val) if isinstance(val, dbus.Byte): return bytes([int(val)]) return val
def hr_msrmt_cb(self): value = [] value.append(dbus.Byte(0x06)) value.append(dbus.Byte(randint(90, 130))) if self.hr_ee_count % 10 == 0: value[0] = dbus.Byte(value[0] | 0x08) value.append(dbus.Byte(self.service.energy_expended & 0xff)) value.append(dbus.Byte((self.service.energy_expended >> 8) & 0xff)) self.service.energy_expended = \ min(0xffff, self.service.energy_expended + 1) self.hr_ee_count += 1 print('Updating value: ' + repr(value)) self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, []) return self.notifying
def dict_to_string(self, d): # Try to trivially translate a dictionary's elements into nice string # formatting. dstr="" for key in d: val=d[key] str_val="" add_string=True if type(val)==type(dbus.Array([])): for elt in val: if type(elt)==type(dbus.Byte(1)): str_val+="%s " % int(elt) elif type(elt)==type(dbus.String("")): str_val+="%s" % elt elif type(val)==type(dbus.Dictionary({})): dstr+=self.dict_to_string(val) add_string=False else: str_val=val if add_string: dstr+="%s: %s\n" % ( key, str_val) return dstr
def write_value(self, value, offset=0): """ Attempts to write a value to the characteristic. Success or failure will be notified by calls to `write_value_succeeded` or `write_value_failed` respectively. :param value: array of bytes to be written :param offset: offset from where to start writing the bytes (defaults to 0) """ bytes = [dbus.Byte(b) for b in value] try: self._object.WriteValue( bytes, {'offset': dbus.UInt16(offset, variant_level=1)}, reply_handler=self._write_value_succeeded, error_handler=self._write_value_failed, dbus_interface='org.bluez.GattCharacteristic1') except dbus.exceptions.DBusException as e: self._write_value_failed(self, error=e)
def send_notify_event(self, value): """Send a notification event. :param value: the value that the characteristic is to be set to. This function sets the characteristic value, and if the characteristic is set to notify emits a PropertiesChanged() signal with the new value. """ # print('send', self, value) self.value = value if not self.notifying: print('Not notifying') return # print('Update prop') self.PropertiesChanged( constants.GATT_CHRC_IFACE, {'Value': [dbus.Byte(self.value)]}, []) #################### # Descriptor Classes ####################
def convert(dbus_obj): """Converts dbus_obj from dbus type to python type. :param dbus_obj: dbus object. :returns: dbus_obj in python type. """ _isinstance = partial(isinstance, dbus_obj) ConvertType = namedtuple('ConvertType', 'pytype dbustypes') pyint = ConvertType(int, (dbus.Byte, dbus.Int16, dbus.Int32, dbus.Int64, dbus.UInt16, dbus.UInt32, dbus.UInt64)) pybool = ConvertType(bool, (dbus.Boolean, )) pyfloat = ConvertType(float, (dbus.Double, )) pylist = ConvertType(lambda _obj: list(map(convert, dbus_obj)), (dbus.Array, )) pytuple = ConvertType(lambda _obj: tuple(map(convert, dbus_obj)), (dbus.Struct, )) types_str = (dbus.ObjectPath, dbus.Signature, dbus.String) pystr = ConvertType(str, types_str) pydict = ConvertType( lambda _obj: dict(list(zip(list(map(convert, dbus_obj.keys())), list(map(convert, dbus_obj.values())) )) ), (dbus.Dictionary, ) ) for conv in (pyint, pybool, pyfloat, pylist, pytuple, pystr, pydict): if any(map(_isinstance, conv.dbustypes)): return conv.pytype(dbus_obj) else: return dbus_obj
def ssid_to_dbus(ssid): if isinstance(ssid, unicode): ssid = ssid.encode('utf-8') return [dbus.Byte(x) for x in ssid]
def mac_to_dbus(mac): return [dbus.Byte(int(x, 16)) for x in mac.split(':')]
def set_hint_byte(self, key, value): """Set a hint with a dbus byte value. The input value can be an integer or a bytes string of length 1. """ self.hints[key] = dbus.Byte(value)
def notify_battery_level(self): if not self.notifying: return self.PropertiesChanged( GATT_CHRC_IFACE, {'Value': [dbus.Byte(self.battery_lvl)] }, [])
def ReadValue(self, options): print('Battery level read: ' + repr(self.battery_lvl)) return [dbus.Byte(self.battery_lvl)]
def ReadValue(self, options): return [ dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t') ]
def convert(dbus_obj): """Converts dbus_obj from dbus type to python type. :param dbus_obj: dbus object. :returns: dbus_obj in python type. """ _isinstance = partial(isinstance, dbus_obj) ConvertType = namedtuple('ConvertType', 'pytype dbustypes') pyint = ConvertType(int, (dbus.Byte, dbus.Int16, dbus.Int32, dbus.Int64, dbus.UInt16, dbus.UInt32, dbus.UInt64)) pybool = ConvertType(bool, (dbus.Boolean, )) pyfloat = ConvertType(float, (dbus.Double, )) pylist = ConvertType(lambda _obj: list(map(convert, dbus_obj)), (dbus.Array, )) pytuple = ConvertType(lambda _obj: tuple(map(convert, dbus_obj)), (dbus.Struct, )) types_str = (dbus.ObjectPath, dbus.Signature, dbus.String) pystr = ConvertType(str, types_str) pydict = ConvertType( lambda _obj: dict(zip(map(convert, dbus_obj.keys()), map(convert, dbus_obj.values()) ) ), (dbus.Dictionary, ) ) for conv in (pyint, pybool, pyfloat, pylist, pytuple, pystr, pydict): if any(map(_isinstance, conv.dbustypes)): return conv.pytype(dbus_obj) else: return dbus_obj
def base_to_python(val): if isinstance(val, dbus.ByteArray): return "".join([str(x) for x in val]) if isinstance(val, (dbus.Array, list, tuple)): return [fixups.base_to_python(x) for x in val] if isinstance(val, (dbus.Dictionary, dict)): return dict([(fixups.base_to_python(x), fixups.base_to_python(y)) for x, y in val.items()]) if isinstance(val, dbus.ObjectPath): for obj in (NetworkManager, Settings, AgentManager): if val == obj.object_path: return obj if val.startswith('/org/freedesktop/NetworkManager/'): classname = val.split('/')[4] classname = { 'Settings': 'Connection', 'Devices': 'Device', }.get(classname, classname) try: return globals()[classname](val) except ObjectVanished: return None if val == '/': return None if isinstance(val, (dbus.Signature, dbus.String)): return six.text_type(val) if isinstance(val, dbus.Boolean): return bool(val) if isinstance(val, (dbus.Int16, dbus.UInt16, dbus.Int32, dbus.UInt32, dbus.Int64, dbus.UInt64)): return int(val) if isinstance(val, dbus.Byte): return six.int2byte(int(val)) return val
def ssid_to_dbus(ssid): if isinstance(ssid, six.text_type): ssid = ssid.encode('utf-8') return [dbus.Byte(x) for x in ssid]
def cert_to_dbus(cert): if not isinstance(cert, bytes): if not cert.startswith('file://'): cert = 'file://' + cert cert = cert.encode('utf-8') + b'\0' return [dbus.Byte(x) for x in cert] # Turn NetworkManager and Settings into singleton objects
def test_dbus_unwrap_byte(self): dbus_value = dbus.Byte(245, variant_level=1) value = dbus_mqtt.unwrap_dbus_value(dbus_value) self.assertIsInstance(value, int) self.assertEqual(int(dbus_value), value)
def cpu_temp_sint16(value): answer = [] value_int16 = sint16(value[0]) for bytes in value_int16: answer.append(dbus.Byte(bytes)) return answer
def __init__(self): self.bus = dbus.SystemBus() self.app = localGATT.Application() self.srv = localGATT.Service(1, CPU_TMP_SRVC, True) self.charc = TemperatureChrc(self.srv) self.charc.service = self.srv.path cpu_format_value = dbus.Array([dbus.Byte(0x0E), dbus.Byte(0xFE), dbus.Byte(0x2F), dbus.Byte(0x27), dbus.Byte(0x01), dbus.Byte(0x00), dbus.Byte(0x00)]) self.cpu_format = localGATT.Descriptor(4, CPU_FMT_DSCP, self.charc, cpu_format_value, ['read']) self.app.add_managed_object(self.srv) self.app.add_managed_object(self.charc) self.app.add_managed_object(self.cpu_format) self.srv_mng = GATT.GattManager(adapter.list_adapters()[0]) self.srv_mng.register_application(self.app, {}) self.dongle = adapter.Adapter(adapter.list_adapters()[0]) advert = advertisement.Advertisement(1, 'peripheral') advert.service_UUIDs = [CPU_TMP_SRVC] # eddystone_data = tools.url_to_advert(WEB_BLINKT, 0x10, TX_POWER) # advert.service_data = {EDDYSTONE: eddystone_data} if not self.dongle.powered: self.dongle.powered = True ad_manager = advertisement.AdvertisingManager(self.dongle.address) ad_manager.register_advertisement(advert, {})
def ReadValue(self, options): """Return the characteristic value. This method is registered with the D-Bus at ``org.bluez.GattCharacteristic1``. """ # print('Reading Characteristic', self.value) if self.value is None: self.value = 0 return [dbus.Byte(self.value)]
def set_connection_state(self, state, current_ssid): self.state = state self.current_ssid = current_ssid if self.is_notifying: logger.info("Sending updated connection state") if self.state != WifiConnectionState.DISCONNECTED and self.current_ssid: self.value_update([dbus.Byte(self.state.value)] + string_to_dbus_array(self.current_ssid)) else: self.value_update([dbus.Byte(self.state.value)])
def _read_value(self, options): logger.info("Read Connection State Value") if self.state != WifiConnectionState.DISCONNECTED and self.current_ssid: return [dbus.Byte(self.state.value)] + string_to_dbus_array(self.current_ssid) else: return [dbus.Byte(self.state.value)]
def string_to_dbus_array(value): return [dbus.Byte(c) for c in value.encode()]