def unwrap(self, val):
    """Recursively turn a D-Bus value into a plain Python value.

    Sequences and dictionaries are unwrapped element by element, object
    paths below ``/org/freedesktop/NetworkManager/`` are turned into an
    instance of the class of the same name looked up in this module's
    globals, and scalar D-Bus types are reduced to builtin values.
    Anything that matches none of the checks is returned unchanged.
    """
    # The checks are order-sensitive if any of these types subclass one
    # another; keep the sequence unless that has been verified.
    if isinstance(val, dbus.ByteArray):
        return "".join(str(item) for item in val)

    if isinstance(val, (dbus.Array, list, tuple)):
        return [self.unwrap(item) for item in val]

    if isinstance(val, (dbus.Dictionary, dict)):
        return {self.unwrap(k): self.unwrap(v) for k, v in val.items()}

    if (isinstance(val, dbus.ObjectPath)
            and val.startswith('/org/freedesktop/NetworkManager/')):
        # The fifth path component names the object kind; two of them map
        # to differently named classes.
        renamed = {
            'Settings': 'Connection',
            'Devices': 'Device',
        }
        kind = val.split('/')[4]
        wrapper_cls = globals()[renamed.get(kind, kind)]
        return wrapper_cls(val)

    if isinstance(val, (dbus.Signature, dbus.String)):
        # NOTE(review): `unicode` is not a builtin on Python 3 - confirm the
        # module provides it when running there.
        return unicode(val)

    if isinstance(val, dbus.Boolean):
        return bool(val)

    integer_types = (dbus.Int16, dbus.UInt16, dbus.Int32,
                     dbus.UInt32, dbus.Int64, dbus.UInt64)
    if isinstance(val, integer_types):
        return int(val)

    if isinstance(val, dbus.Byte):
        return bytes([int(val)])

    return val
def get_properties(self):
    """Return the advertisement properties keyed by the advertisement interface.

    'Type' is always present. Every other entry is added only when the
    matching attribute is not None, wrapped in the D-Bus type used below.
    """
    props = {'Type': self.ad_type}

    service_uuids = self.service_uuids
    if service_uuids is not None:
        props['ServiceUUIDs'] = dbus.Array(service_uuids, signature='s')

    solicit_uuids = self.solicit_uuids
    if solicit_uuids is not None:
        props['SolicitUUIDs'] = dbus.Array(solicit_uuids, signature='s')

    manufacturer_data = self.manufacturer_data
    if manufacturer_data is not None:
        props['ManufacturerData'] = dbus.Dictionary(manufacturer_data,
                                                    signature='qv')

    service_data = self.service_data
    if service_data is not None:
        props['ServiceData'] = dbus.Dictionary(service_data, signature='sv')

    include_tx_power = self.include_tx_power
    if include_tx_power is not None:
        props['IncludeTxPower'] = dbus.Boolean(include_tx_power)

    return {LE_ADVERTISEMENT_IFACE: props}
def dict_to_string(self, d):
    """Render a dictionary as text, one "key: value" line per entry.

    - A value whose type is exactly ``dbus.Array`` is rendered from its
      ``dbus.Byte`` elements (as integers, each followed by a space) and its
      ``dbus.String`` elements (verbatim); other elements are ignored.
    - A value whose type is exactly ``dbus.Dictionary`` is rendered
      recursively; its own key is not printed.
    - Any other value is formatted with ``%s``.
    """
    chunks = []
    for key, val in d.items():
        val_type = type(val)

        if val_type is dbus.Dictionary:
            # Nested dictionaries contribute their own lines only.
            chunks.append(self.dict_to_string(val))
            continue

        if val_type is dbus.Array:
            rendered = ""
            for elt in val:
                elt_type = type(elt)
                if elt_type is dbus.Byte:
                    rendered += "%s " % int(elt)
                elif elt_type is dbus.String:
                    rendered += "%s" % elt
        else:
            rendered = val

        chunks.append("%s: %s\n" % (key, rendered))

    return "".join(chunks)
def Set(self, interface_name, property_name, value, *args, **kwargs):
    """Store a new property value and emit PropertiesChanged for it.

    :param interface_name: interface whose property table is updated.
    :param property_name: name of an existing property on that interface.
    :param value: value to store.
    :raises dbus.exceptions.DBusException: when the interface is not in
        ``self.props`` or the property is not already present on it.
    """
    try:
        known_props = self.props[interface_name]
    except KeyError:
        raise dbus.exceptions.DBusException(
            'no such interface ' + interface_name,
            name=self.interface + '.UnknownInterface')

    # Only properties that already exist may be set.
    if property_name not in known_props:
        raise dbus.exceptions.DBusException(
            'no such property ' + property_name,
            name=self.interface + '.UnknownProperty')

    known_props[property_name] = value

    changed = dbus.Dictionary({property_name: value}, signature='sv')
    invalidated = dbus.Array([], signature='s')
    self.PropertiesChanged(interface_name, changed, invalidated)
def get_properties(self):
    """Return the service properties keyed by the GATT service interface.

    The inner dictionary holds:

    - UUID: the service UUID
    - Primary: the value of ``self.primary``
    - Characteristics: D-Bus array (signature 'o') of the characteristic
      object paths reported by ``get_characteristic_paths()``.
    """
    service_props = {}
    service_props['UUID'] = self.uuid
    service_props['Primary'] = self.primary
    service_props['Characteristics'] = dbus.Array(
        self.get_characteristic_paths(), signature='o')
    return {constants.GATT_SERVICE_IFACE: service_props}
def get_properties(self):
    """Return the characteristic properties keyed by the GATT characteristic interface.

    The inner dictionary holds:

    - Service: path of the owning service (``self.service.get_path()``)
    - UUID: the characteristic UUID
    - Flags: the characteristic flags
    - Descriptors: D-Bus array (signature 'o') of the descriptor object
      paths reported by ``get_descriptor_paths()``.
    """
    chrc_props = {}
    chrc_props['Service'] = self.service.get_path()
    chrc_props['UUID'] = self.uuid
    chrc_props['Flags'] = self.flags
    chrc_props['Descriptors'] = dbus.Array(
        self.get_descriptor_paths(), signature='o')
    return {constants.GATT_CHRC_IFACE: chrc_props}
def read_raw_value(self, flags=''):
    """
    Read this characteristic's value through its ``ReadValue`` method.

    :param flags: "offset": Start offset
                  "device": Device path (Server only)
    :return: whatever ``ReadValue`` returns.

    Possible Errors:    org.bluez.Error.Failed
                        org.bluez.Error.InProgress
                        org.bluez.Error.NotPermitted
                        org.bluez.Error.InvalidValueLength
                        org.bluez.Error.NotAuthorized
                        org.bluez.Error.NotSupported
    """
    # NOTE(review): `flags` is accepted but not forwarded; an empty array is
    # always sent as the options argument - confirm this is intended.
    read_value = self.characteristic_methods.ReadValue
    return read_value(dbus.Array())
def convert(dbus_obj):
    """Converts dbus_obj from dbus type to python type.

    Integer, boolean, double and string-like dbus types become ``int``,
    ``bool``, ``float`` and ``str``; arrays, structs and dictionaries are
    converted recursively into ``list``, ``tuple`` and ``dict``.

    :param dbus_obj: dbus object.
    :returns: dbus_obj in python type, or dbus_obj itself when its type is
        not one of the handled dbus types.
    """
    integer_types = (dbus.Byte, dbus.Int16, dbus.Int32, dbus.Int64,
                     dbus.UInt16, dbus.UInt32, dbus.UInt64)
    string_types = (dbus.ObjectPath, dbus.Signature, dbus.String)

    # Checked in this fixed order: int, bool, float, list, tuple, str, dict.
    if isinstance(dbus_obj, integer_types):
        return int(dbus_obj)
    if isinstance(dbus_obj, dbus.Boolean):
        return bool(dbus_obj)
    if isinstance(dbus_obj, dbus.Double):
        return float(dbus_obj)
    if isinstance(dbus_obj, dbus.Array):
        return [convert(item) for item in dbus_obj]
    if isinstance(dbus_obj, dbus.Struct):
        return tuple(convert(item) for item in dbus_obj)
    if isinstance(dbus_obj, string_types):
        return str(dbus_obj)
    if isinstance(dbus_obj, dbus.Dictionary):
        return {
            convert(key): convert(value)
            for key, value in zip(dbus_obj.keys(), dbus_obj.values())
        }
    return dbus_obj
def add_manufacturer_data(self, manuf_code, data):
    """Store ``data`` under ``manuf_code`` in the manufacturer data table.

    A fresh D-Bus dictionary (signature 'qv') is created first when the
    table is unset or empty; ``data`` is wrapped in a D-Bus array with
    signature 'y'.
    """
    table_missing = not self.manufacturer_data
    if table_missing:
        self.manufacturer_data = dbus.Dictionary({}, signature='qv')

    encoded = dbus.Array(data, signature='y')
    self.manufacturer_data[manuf_code] = encoded
def add_service_data(self, uuid, data):
    """Store ``data`` under ``uuid`` in the service data table.

    When the table is unset or empty a new D-Bus dictionary with signature
    'sv' replaces it before the entry is added. The entry itself is a D-Bus
    array with signature 'y' built from ``data``.
    """
    if not self.service_data:
        empty_table = dbus.Dictionary({}, signature='sv')
        self.service_data = empty_table

    payload = dbus.Array(data, signature='y')
    self.service_data[uuid] = payload
def get_properties(self):
    """Return this service's properties keyed by the GATT service interface.

    The inner dictionary carries 'UUID', 'Primary' and 'Characteristics',
    the last being a D-Bus array (signature 'o') of the paths returned by
    ``get_characteristic_paths()``.
    """
    details = dict()
    details['UUID'] = self.uuid
    details['Primary'] = self.primary

    paths = self.get_characteristic_paths()
    details['Characteristics'] = dbus.Array(paths, signature='o')

    return {GATT_SERVICE_IFACE: details}
def get_properties(self):
    """Return this characteristic's properties keyed by the GATT characteristic interface.

    The inner dictionary carries 'Service' (path of the owning service),
    'UUID', 'Flags' and 'Descriptors', the last being a D-Bus array
    (signature 'o') of the paths returned by ``get_descriptor_paths()``.
    """
    details = dict()
    details['Service'] = self.service.get_path()
    details['UUID'] = self.uuid
    details['Flags'] = self.flags

    paths = self.get_descriptor_paths()
    details['Descriptors'] = dbus.Array(paths, signature='o')

    return {GATT_CHRC_IFACE: details}
def convert(dbus_obj):
    """Converts dbus_obj from dbus type to python type.

    The first matching entry of the dispatch table below decides the
    conversion; containers are converted recursively.

    :param dbus_obj: dbus object.
    :returns: dbus_obj in python type; unhandled types come back unchanged.
    """
    def _as_list(obj):
        return [convert(member) for member in obj]

    def _as_tuple(obj):
        return tuple(convert(member) for member in obj)

    def _as_dict(obj):
        return {
            convert(key): convert(value)
            for key, value in zip(obj.keys(), obj.values())
        }

    # (dbus types, converter) pairs, tried from top to bottom.
    dispatch = (
        ((dbus.Byte, dbus.Int16, dbus.Int32, dbus.Int64,
          dbus.UInt16, dbus.UInt32, dbus.UInt64), int),
        ((dbus.Boolean, ), bool),
        ((dbus.Double, ), float),
        ((dbus.Array, ), _as_list),
        ((dbus.Struct, ), _as_tuple),
        ((dbus.ObjectPath, dbus.Signature, dbus.String), str),
        ((dbus.Dictionary, ), _as_dict),
    )

    for dbus_types, to_python in dispatch:
        if isinstance(dbus_obj, dbus_types):
            return to_python(dbus_obj)
    return dbus_obj
def base_to_python(val):
    """Recursively convert a D-Bus value into a plain Python value.

    Object paths get special treatment: a path equal to the ``object_path``
    of NetworkManager, Settings or AgentManager yields that object, a path
    below ``/org/freedesktop/NetworkManager/`` yields an instance of the
    matching class from this module's globals (or None if constructing it
    raises ObjectVanished), and ``/`` yields None. Values that match no
    check are returned unchanged.
    """
    if isinstance(val, dbus.ByteArray):
        return "".join(str(item) for item in val)

    if isinstance(val, (dbus.Array, list, tuple)):
        return [fixups.base_to_python(item) for item in val]

    if isinstance(val, (dbus.Dictionary, dict)):
        return {
            fixups.base_to_python(key): fixups.base_to_python(value)
            for key, value in val.items()
        }

    if isinstance(val, dbus.ObjectPath):
        for well_known in (NetworkManager, Settings, AgentManager):
            if val == well_known.object_path:
                return well_known

        if val.startswith('/org/freedesktop/NetworkManager/'):
            # The fifth path component names the object kind; two of them
            # map to differently named classes.
            renamed = {
                'Settings': 'Connection',
                'Devices': 'Device',
            }
            kind = val.split('/')[4]
            target = renamed.get(kind, kind)
            try:
                return globals()[target](val)
            except ObjectVanished:
                return None

        if val == '/':
            return None
        # Any other object path falls through to the checks below.

    if isinstance(val, (dbus.Signature, dbus.String)):
        return six.text_type(val)

    if isinstance(val, dbus.Boolean):
        return bool(val)

    integer_types = (dbus.Int16, dbus.UInt16, dbus.Int32,
                     dbus.UInt32, dbus.Int64, dbus.UInt64)
    if isinstance(val, integer_types):
        return int(val)

    if isinstance(val, dbus.Byte):
        return six.int2byte(int(val))

    return val
def test_dbus_wrap_array(self):
    # Wrapping a one-element list must give a dbus.Array whose single
    # element is an Int32 with variant_level=1.
    wrapped = dbus_mqtt.wrap_dbus_value([1])
    self.assertIsInstance(wrapped, dbus.Array)

    expected = dbus.Array([dbus.Int32(1, variant_level=1)])
    self.assertEqual(expected, wrapped)
def test_dbus_unwrap_array(self):
    # A variant array of variant Int32 values must unwrap to a plain list
    # holding the same integers.
    elements = [dbus.Int32(number, variant_level=1) for number in (3, 7)]
    wrapped = dbus.Array(elements, variant_level=1)

    unwrapped = dbus_mqtt.unwrap_dbus_value(wrapped)

    self.assertIsInstance(unwrapped, list)
    self.assertEqual([3, 7], unwrapped)
def test_dbus_unwrap_empty_array(self):
    # An empty variant array is expected to unwrap to None.
    empty_array = dbus.Array([], variant_level=1)
    unwrapped = dbus_mqtt.unwrap_dbus_value(empty_array)
    self.assertIsNone(unwrapped)
def set_WFDIEs(self, parameter):
    """Set the "WFDIEs" property to ``parameter`` wrapped as a D-Bus array with signature "y"."""
    property_name = dbus.String("WFDIEs")
    encoded = dbus.Array(parameter, signature="y")
    self.__set_property(property_name, encoded)
def temperature_cb(self):
    """Take a new CPU temperature reading and publish it.

    The raw reading is stored as the characteristic's 'Value', a
    PropertiesChanged signal carrying the ``cpu_temp_sint16`` form of the
    reading is emitted, and both steps are logged with ``print``.

    :return: the characteristic's current 'Notifying' value.
    """
    reading = [get_cpu_temperature()]
    chrc_props = self.props[constants.GATT_CHRC_IFACE]

    print('Getting new temperature', reading, chrc_props['Notifying'])

    chrc_props['Value'] = reading
    changed = {'Value': dbus.Array(cpu_temp_sint16(reading))}
    self.PropertiesChanged(constants.GATT_CHRC_IFACE, changed, [])

    print('Array value: ', cpu_temp_sint16(reading))
    return chrc_props['Notifying']
def ReadValue(self, options):
    """Take a fresh CPU temperature reading, store it and return it.

    The raw reading is saved as the characteristic's 'Value'; the return
    value is a D-Bus array built from ``cpu_temp_sint16`` of that stored
    value. ``options`` is not used.
    """
    fresh_reading = [get_cpu_temperature()]
    chrc_props = self.props[constants.GATT_CHRC_IFACE]
    chrc_props['Value'] = fresh_reading

    encoded = cpu_temp_sint16(chrc_props['Value'])
    return dbus.Array(encoded)
def __init__(self):
    """Build the GATT application, register it and register an advertisement.

    Creates one service, a ``TemperatureChrc`` characteristic and one
    descriptor, adds them to a ``localGATT.Application`` and registers that
    application with the GATT manager of the first listed adapter. The
    adapter is then powered on if it is not already, and a 'peripheral'
    advertisement listing the service UUID is registered.
    """
    self.bus = dbus.SystemBus()
    self.app = localGATT.Application()
    self.srv = localGATT.Service(1, CPU_TMP_SRVC, True)

    self.charc = TemperatureChrc(self.srv)

    # The characteristic's `service` attribute is replaced by the service's
    # object path after construction.
    self.charc.service = self.srv.path

    # NOTE(review): these seven bytes look like a GATT Characteristic
    # Presentation Format value (format 0x0E, exponent 0xFE, unit 0x272F,
    # namespace 0x01, description 0x0000) - confirm against the Bluetooth
    # specification.
    cpu_format_value = dbus.Array([dbus.Byte(0x0E), dbus.Byte(0xFE),
                                   dbus.Byte(0x2F), dbus.Byte(0x27),
                                   dbus.Byte(0x01), dbus.Byte(0x00),
                                   dbus.Byte(0x00)])
    self.cpu_format = localGATT.Descriptor(4, CPU_FMT_DSCP,
                                           self.charc,
                                           cpu_format_value,
                                           ['read'])

    # Objects are added in the order service, characteristic, descriptor.
    self.app.add_managed_object(self.srv)
    self.app.add_managed_object(self.charc)
    self.app.add_managed_object(self.cpu_format)

    # The first entry of adapter.list_adapters() is used for both the GATT
    # manager and the adapter object.
    self.srv_mng = GATT.GattManager(adapter.list_adapters()[0])
    self.srv_mng.register_application(self.app, {})

    self.dongle = adapter.Adapter(adapter.list_adapters()[0])
    advert = advertisement.Advertisement(1, 'peripheral')

    advert.service_UUIDs = [CPU_TMP_SRVC]
    # Disabled: Eddystone service data for the advertisement.
    # eddystone_data = tools.url_to_advert(WEB_BLINKT, 0x10, TX_POWER)
    # advert.service_data = {EDDYSTONE: eddystone_data}

    # Power the adapter on before registering the advertisement.
    if not self.dongle.powered:
        self.dongle.powered = True
    ad_manager = advertisement.AdvertisingManager(self.dongle.address)
    ad_manager.register_advertisement(advert, {})
def Set(self, interface_name, property_name, value, *args, **kwargs):
    """Store a new characteristic property value and emit PropertiesChanged.

    :param interface_name: interface name passed on in the signal.
    :param property_name: name of an existing characteristic property.
    :param value: value to store.
    :return: the result of ``self.PropertiesChanged(...)``.
    :raises dbus.exceptions.DBusException: when ``property_name`` is not
        already present in the characteristic's property table.
    """
    # NOTE(review): the lookup always uses the GATT characteristic table;
    # `interface_name` only appears in the emitted signal - confirm intended.
    chrc_props = self.props[constants.GATT_CHRC_IFACE]

    if property_name in chrc_props:
        chrc_props[property_name] = value
        changed = dbus.Dictionary({property_name: value}, signature='sv')
        invalidated = dbus.Array([], signature='s')
        return self.PropertiesChanged(interface_name, changed, invalidated)

    raise dbus.exceptions.DBusException(
        'no such property ' + property_name,
        name=constants.GATT_CHRC_IFACE + '.UnknownProperty')
def get_properties(self):
    """Return a dictionary of the advert properties.

    The result maps the LE advertisement interface name to a dictionary
    with the following keys (all but 'Type' only when the matching
    attribute is not None):

    - Type: the advertisement type.
    - ServiceUUIDs: D-Bus array (signature 's') of ``service_uuids``
    - SolicitUUIDs: D-Bus array (signature 's') of ``solicit_uuids``
    - ManufacturerData: D-Bus dictionary (signature 'qay')
    - ServiceData: D-Bus dictionary (signature 'say')
    - IncludeTxPower: D-Bus boolean of ``include_tx_power``
    """
    # (property key, attribute name, D-Bus wrapper, signature) in output order.
    signed_fields = (
        ('ServiceUUIDs', 'service_uuids', dbus.Array, 's'),
        ('SolicitUUIDs', 'solicit_uuids', dbus.Array, 's'),
        ('ManufacturerData', 'manufacturer_data', dbus.Dictionary, 'qay'),
        ('ServiceData', 'service_data', dbus.Dictionary, 'say'),
    )

    advert_props = {'Type': self.ad_type}
    for key, attr_name, wrapper, sig in signed_fields:
        raw = getattr(self, attr_name)
        if raw is not None:
            advert_props[key] = wrapper(raw, signature=sig)

    tx_power_flag = self.include_tx_power
    if tx_power_flag is not None:
        advert_props['IncludeTxPower'] = dbus.Boolean(tx_power_flag)

    return {constants.LE_ADVERTISEMENT_IFACE: advert_props}
def service_data(self, data):
    """Set 'ServiceData' once per key of ``data``.

    For every key, ``Set`` is called on the LE advertisement interface with
    a single-entry dictionary mapping that key to its value wrapped as a
    D-Bus array with signature 'y'.
    """
    for service_uuid in data:
        payload = dbus.Array(data[service_uuid], signature='y')
        entry = {service_uuid: payload}
        self.Set(constants.LE_ADVERTISEMENT_IFACE, 'ServiceData', entry)
def GetAll(self, interface_name):
    """Return the advertisement properties.

    This method is registered with the D-Bus at
    ``org.freedesktop.DBus.Properties``

    :param interface_name: interface to get the properties of.

    The interface must be ``org.bluez.LEAdvertisement1`` otherwise an
    exception is raised.

    'Type' and 'IncludeTxPower' are always returned; the remaining
    properties are returned only when their stored value is not None.
    """
    if interface_name != constants.LE_ADVERTISEMENT_IFACE:
        raise InvalidArgsException()

    stored = self.props[interface_name]

    # (property key, D-Bus wrapper, signature) in output order.
    optional_fields = (
        ('ServiceUUIDs', dbus.Array, 's'),
        ('ServiceData', dbus.Dictionary, 'sv'),
        ('ManufacturerData', dbus.Dictionary, 'qv'),
        ('SolicitUUIDs', dbus.Array, 's'),
    )

    response = {'Type': stored['Type']}
    for key, wrapper, sig in optional_fields:
        raw = stored[key]
        if raw is not None:
            response[key] = wrapper(raw, signature=sig)

    response['IncludeTxPower'] = dbus.Boolean(stored['IncludeTxPower'])
    return response
def write_value(self, value, flags=''):
    """
    Write a new value to the characteristic via its ``WriteValue`` method.

    :param value: value handed to ``WriteValue`` unchanged.
    :param flags: wrapped in a D-Bus array and passed as the second argument.
    :return:
    """
    options = dbus.Array(flags)
    self.characteristic_methods.WriteValue(value, options)
def read_raw_value(self, flags=''):
    """
    Issue a request to read the value of the descriptor.

    ``flags`` is wrapped in a D-Bus array and handed to the descriptor's
    ``ReadValue`` method; its result is returned as is.

    :param flags: "offset": Start offset
                  "device": Device path (Server only)
    :return: dbus byte array
    """
    options = dbus.Array(flags)
    raw_value = self.descriptor_methods.ReadValue(options)
    return raw_value
def write_value(self, value, flags=''):
    """
    Issue a request to write the value of the descriptor.

    ``flags`` is wrapped in a D-Bus array and handed, together with
    ``value``, to the descriptor's ``WriteValue`` method.

    :param value: DBus byte array
    :param flags: "offset": Start offset
                  "device": Device path (Server only)
    :return:
    """
    write = self.descriptor_methods.WriteValue
    write(value, dbus.Array(flags))
def _get_root_iface_properties(self):
    """Return the root interface property table.

    Every property name maps to a ``(value, None)`` pair.
    NOTE(review): the second slot is None for every entry here; presumably
    it is reserved for a setter - confirm against the code consuming it.
    """
    values = [
        ('CanQuit', True),
        ('Fullscreen', False),
        ('CanSetFullscreen', False),
        ('CanRaise', False),
        # NOTE Change if adding optional track list support
        ('HasTrackList', False),
        ('Identity', IDENTITY),
        ('DesktopEntry', DESKTOP),
        ('SupportedUriSchemes', dbus.Array([], 's', 1)),
        # NOTE Return MIME types supported by local backend if support for
        # reporting supported MIME types is added
        ('SupportedMimeTypes', dbus.Array([], 's', 1)),
    ]
    return {name: (value, None) for name, value in values}
def get_properties(self):
    """Return the characteristic's properties keyed by the GATT characteristic interface.

    Keys of the inner dictionary: 'Service' (owning service path), 'UUID',
    'Flags' and 'Descriptors' (D-Bus array, signature 'o', of the paths
    from ``get_descriptor_paths()``).
    """
    inner = dict()
    inner['Service'] = self.service.get_path()
    inner['UUID'] = self.uuid
    inner['Flags'] = self.flags
    inner['Descriptors'] = dbus.Array(self.get_descriptor_paths(),
                                      signature='o')

    return {GATT_CHARACTERISTIC_IFACE: inner}
def get_properties(self):
    """Return the advertisement properties keyed by the advertisement interface.

    'Type' is always included. The remaining entries are included only for
    attributes that are not None, each wrapped in the D-Bus type and
    signature listed in the table below.
    """
    # (property key, attribute name, D-Bus wrapper, signature) in output order.
    signed_fields = (
        ('ServiceUUIDs', 'service_uuids', dbus.Array, 's'),
        ('SolicitUUIDs', 'solicit_uuids', dbus.Array, 's'),
        ('ManufacturerData', 'manufacturer_data', dbus.Dictionary, 'qv'),
        ('ServiceData', 'service_data', dbus.Dictionary, 'sv'),
    )

    result = dict()
    result['Type'] = self.ad_type

    for key, attr_name, wrapper, sig in signed_fields:
        current = getattr(self, attr_name)
        if current is None:
            continue
        result[key] = wrapper(current, signature=sig)

    if self.include_tx_power is not None:
        result['IncludeTxPower'] = dbus.Boolean(self.include_tx_power)

    return {LE_ADVERTISEMENT_IFACE: result}