我们从Python开源项目中,提取了以下24个代码示例,用于说明如何使用dbus.UInt32()。
def unwrap(self, val): if isinstance(val, dbus.ByteArray): return "".join([str(x) for x in val]) if isinstance(val, (dbus.Array, list, tuple)): return [self.unwrap(x) for x in val] if isinstance(val, (dbus.Dictionary, dict)): return dict([(self.unwrap(x), self.unwrap(y)) for x, y in val.items()]) if isinstance(val, dbus.ObjectPath): if val.startswith('/org/freedesktop/NetworkManager/'): classname = val.split('/')[4] classname = { 'Settings': 'Connection', 'Devices': 'Device', }.get(classname, classname) return globals()[classname](val) if isinstance(val, (dbus.Signature, dbus.String)): return unicode(val) if isinstance(val, dbus.Boolean): return bool(val) if isinstance(val, (dbus.Int16, dbus.UInt16, dbus.Int32, dbus.UInt32, dbus.Int64, dbus.UInt64)): return int(val) if isinstance(val, dbus.Byte): return bytes([int(val)]) return val
def _check_permission(self, sender, action): ''' Verifies if the specified action is permitted, and raises an AccessDeniedException if not. The caller should use ObtainAuthorization() to get permission. ''' try: if sender: kit = dbus.SystemBus().get_object('org.freedesktop.PolicyKit1', '/org/freedesktop/PolicyKit1/Authority') kit = dbus.Interface(kit, 'org.freedesktop.PolicyKit1.Authority') (granted, _, details) = kit.CheckAuthorization( ('system-bus-name', {'name': sender}), action, {}, dbus.UInt32(1), '', timeout=600) if not granted: raise AccessDeniedException('Session not authorized by PolicyKit') except AccessDeniedException: raise except dbus.DBusException, ex: raise AccessDeniedException(ex.message)
def add_service_type(self, type): interface, protocol, domain = ( self.interface, self.protocol, self.domain) # Are we already browsing this domain for this type? if self.already_browsing(type): return logger.info("Browsing for services of type '%s' in domain '%s' on %s.%i ..." % (type, domain, self.siocgifname(interface), protocol)) browser = self.server.ServiceBrowserNew( interface, protocol, type, domain, dbus.UInt32(0)) bus = dbus.Interface(self.system_bus.get_object( avahi.DBUS_NAME, browser), avahi.DBUS_INTERFACE_SERVICE_BROWSER) bus.connect_to_signal('ItemNew', self.service_add) bus.connect_to_signal('ItemRemove', self.service_remove) self.service_browsers[(interface, protocol, type, domain)] = bus
def RequestPasskey(self, device): print_info("RequestPasskey: {}\n".format(device)) if not self._trust(device): print_error("RequestPasskey: failed to trust\n") raise _Rejected dev_info = self._get_device_info(device) try: passkey = int(self._client.request_passkey(dev_info)) except BaseException as error: print_error("RequestPasskey: {}\n".format(error)) raise _Rejected return dbus.UInt32(passkey)
def publish(self): bus = dbus.SystemBus() server = dbus.Interface( bus.get_object( avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER ), avahi.DBUS_INTERFACE_SERVER ) g = dbus.Interface( bus.get_object( avahi.DBUS_NAME, server.EntryGroupNew() ), avahi.DBUS_INTERFACE_ENTRY_GROUP ) g.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0), self.name, self.stype, self.domain, self.host, dbus.UInt16(self.port), self.text) g.Commit() self.group = g
def convert(dbus_obj): """Converts dbus_obj from dbus type to python type. :param dbus_obj: dbus object. :returns: dbus_obj in python type. """ _isinstance = partial(isinstance, dbus_obj) ConvertType = namedtuple('ConvertType', 'pytype dbustypes') pyint = ConvertType(int, (dbus.Byte, dbus.Int16, dbus.Int32, dbus.Int64, dbus.UInt16, dbus.UInt32, dbus.UInt64)) pybool = ConvertType(bool, (dbus.Boolean, )) pyfloat = ConvertType(float, (dbus.Double, )) pylist = ConvertType(lambda _obj: list(map(convert, dbus_obj)), (dbus.Array, )) pytuple = ConvertType(lambda _obj: tuple(map(convert, dbus_obj)), (dbus.Struct, )) types_str = (dbus.ObjectPath, dbus.Signature, dbus.String) pystr = ConvertType(str, types_str) pydict = ConvertType( lambda _obj: dict(list(zip(list(map(convert, dbus_obj.keys())), list(map(convert, dbus_obj.values())) )) ), (dbus.Dictionary, ) ) for conv in (pyint, pybool, pyfloat, pylist, pytuple, pystr, pydict): if any(map(_isinstance, conv.dbustypes)): return conv.pytype(dbus_obj) else: return dbus_obj
def addr_to_dbus(addr, family): if (family == socket.AF_INET): return dbus.UInt32(struct.unpack('I', socket.inet_pton(family, addr))[0]) else: return dbus.ByteArray(socket.inet_pton(family, addr))
def mask_to_dbus(mask): return dbus.UInt32(mask)
def convert(dbus_obj): """Converts dbus_obj from dbus type to python type. :param dbus_obj: dbus object. :returns: dbus_obj in python type. """ _isinstance = partial(isinstance, dbus_obj) ConvertType = namedtuple('ConvertType', 'pytype dbustypes') pyint = ConvertType(int, (dbus.Byte, dbus.Int16, dbus.Int32, dbus.Int64, dbus.UInt16, dbus.UInt32, dbus.UInt64)) pybool = ConvertType(bool, (dbus.Boolean, )) pyfloat = ConvertType(float, (dbus.Double, )) pylist = ConvertType(lambda _obj: list(map(convert, dbus_obj)), (dbus.Array, )) pytuple = ConvertType(lambda _obj: tuple(map(convert, dbus_obj)), (dbus.Struct, )) types_str = (dbus.ObjectPath, dbus.Signature, dbus.String) pystr = ConvertType(str, types_str) pydict = ConvertType( lambda _obj: dict(zip(map(convert, dbus_obj.keys()), map(convert, dbus_obj.values()) ) ), (dbus.Dictionary, ) ) for conv in (pyint, pybool, pyfloat, pylist, pytuple, pystr, pydict): if any(map(_isinstance, conv.dbustypes)): return conv.pytype(dbus_obj) else: return dbus_obj
def base_to_python(val): if isinstance(val, dbus.ByteArray): return "".join([str(x) for x in val]) if isinstance(val, (dbus.Array, list, tuple)): return [fixups.base_to_python(x) for x in val] if isinstance(val, (dbus.Dictionary, dict)): return dict([(fixups.base_to_python(x), fixups.base_to_python(y)) for x, y in val.items()]) if isinstance(val, dbus.ObjectPath): for obj in (NetworkManager, Settings, AgentManager): if val == obj.object_path: return obj if val.startswith('/org/freedesktop/NetworkManager/'): classname = val.split('/')[4] classname = { 'Settings': 'Connection', 'Devices': 'Device', }.get(classname, classname) try: return globals()[classname](val) except ObjectVanished: return None if val == '/': return None if isinstance(val, (dbus.Signature, dbus.String)): return six.text_type(val) if isinstance(val, dbus.Boolean): return bool(val) if isinstance(val, (dbus.Int16, dbus.UInt16, dbus.Int32, dbus.UInt32, dbus.Int64, dbus.UInt64)): return int(val) if isinstance(val, dbus.Byte): return six.int2byte(int(val)) return val
def addr_to_dbus(addr, family): if family == socket.AF_INET: return dbus.UInt32(struct.unpack('I', socket.inet_pton(family, addr))[0]) else: return dbus.ByteArray(socket.inet_pton(family, addr))
def set_ap_scan(self, value): return self.__set_property("ApScan", dbus.UInt32(value))
def service_add(self, interface, protocol, name, type, domain, flags): logger.debug("Found service '%s' of type '%s:%s' in domain '%s' on %s.%i." % (name, type, flags, domain, self.siocgifname(interface), avahi.LOOKUP_RESULT_LOCAL)) # this check is for local services if flags & avahi.LOOKUP_RESULT_LOCAL: logger.debug('Dropping local service') return self.server.ResolveService(interface, protocol, name, type, domain, avahi.PROTO_INET, dbus.UInt32( 0), reply_handler=self.service_resolved, error_handler=logger.error)
def run(self): _bluetooth.make_discoverable(True, self.timeout) _bluetooth.set_adapter_property("Pairable", dbus.Boolean(1)) _bluetooth.set_adapter_property("PairableTimeout", dbus.UInt32(0)) self.agent = Agent(self.client_class, self.timeout, self.path) if not self._register(): self.shutdown() return self._mainloop.run()
def make_discoverable(self, value=True, timeout=180): try: adapter = bluezutils.find_adapter() except (bluezutils.BluezUtilError, dbus.exceptions.DBusException) as error: print_error(str(error) + "\n") return False try: props = dbus.Interface( self._bus.get_object("org.bluez", adapter.object_path), "org.freedesktop.DBus.Properties") timeout = int(timeout) value = int(value) if int(props.Get( "org.bluez.Adapter1", "DiscoverableTimeout")) != timeout: props.Set( "org.bluez.Adapter1", "DiscoverableTimeout", dbus.UInt32(timeout)) if int(props.Get("org.bluez.Adapter1", "Discoverable")) != value: props.Set( "org.bluez.Adapter1", "Discoverable", dbus.Boolean(value)) except dbus.exceptions.DBusException as error: print_error(str(error) + "\n") return False return True
def discoverable_timeout(self, value): self._adapter_props.Set('org.bluez.Adapter1', 'DiscoverableTimeout', dbus.UInt32(value))
def RequestPasskey(self, device): self.log().info("RequestPasskey (%s)" % (device)) return dbus.UInt32(self.__passcode)
def enable_pairability(self, timeout = 0): self._set_property('Discoverable', True) self._set_property('Pairable', True) self._set_property('PairableTimeout', dbus.UInt32(timeout)) self._set_property('DiscoverableTimeout', dbus.UInt32(timeout))
def disable_pairability(self): try: self._set_property('Discoverable', False) self._set_property('Pairable', False) self._set_property('PairableTimeout', dbus.UInt32(0)) self._set_property('DiscoverableTimeout', dbus.UInt32(180)) except: pass
def RequestPasskey(self, device): self.log().debug("RequestPasskey (%s)" % (device)) return dbus.UInt32(self._passcode)
def _check_polkit_privilege(self, sender, conn, privilege): '''Verify that sender has a given PolicyKit privilege. sender is the sender's (private) D-BUS name, such as ":1:42" (sender_keyword in @dbus.service.methods). conn is the dbus.Connection object (connection_keyword in @dbus.service.methods). privilege is the PolicyKit privilege string. This method returns if the caller is privileged, and otherwise throws a PermissionDeniedByPolicy exception. ''' if sender is None and conn is None: # called locally, not through D-BUS return if not self.enforce_polkit: # that happens for testing purposes when running on the session # bus, and it does not make sense to restrict operations here return # get peer PID if self.dbus_info is None: self.dbus_info = dbus.Interface(conn.get_object('org.freedesktop.DBus', '/org/freedesktop/DBus/Bus', False), 'org.freedesktop.DBus') pid = self.dbus_info.GetConnectionUnixProcessID(sender) # query PolicyKit if self.polkit is None: self.polkit = dbus.Interface(dbus.SystemBus().get_object( 'org.freedesktop.PolicyKit1', '/org/freedesktop/PolicyKit1/Authority', False), 'org.freedesktop.PolicyKit1.Authority') try: # we don't need is_challenge return here, since we call with AllowUserInteraction (is_auth, unused, details) = self.polkit.CheckAuthorization( ('unix-process', {'pid': dbus.UInt32(pid, variant_level=1), 'start-time': dbus.UInt64(0, variant_level=1)}), privilege, {'': ''}, dbus.UInt32(1), '', timeout=600) except dbus.DBusException as msg: if msg.get_dbus_name() == \ 'org.freedesktop.DBus.Error.ServiceUnknown': # polkitd timed out, connect again self.polkit = None return self._check_polkit_privilege(sender, conn, privilege) else: raise if not is_auth: logging.debug('_check_polkit_privilege: sender %s on connection %s pid %i is not authorized for %s: %s', sender, conn, pid, privilege, str(details)) raise PermissionDeniedByPolicy(privilege) # # Internal API for calling from Handlers (not exported through D-BUS) #
def __init__(self, onBluetoothConnected_callback, onPlayerChanged_callback): self.onBluetoothConnected_callback = onBluetoothConnected_callback self.onPlayerChanged_callback = onPlayerChanged_callback # Get the system bus try: dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) self.bus = dbus.SystemBus() except Exception as ex: print("Unable to get the system dbus: '{0}'. Exiting. Is dbus running?".format(ex.message)) return False adapter_path = bluezutils.find_adapter(None).object_path adapter = dbus.Interface(self.bus.get_object(bluezutils.SERVICE_NAME, adapter_path), "org.freedesktop.DBus.Properties") # Power on adapter adapter.Set(bluezutils.ADAPTER_INTERFACE, "Powered", dbus.Boolean(1)) # Set name to display in available connections adapter.Set(bluezutils.ADAPTER_INTERFACE, "Alias", bluezutils.BT_DEVICE_NAME) # Set pairable on adapter.Set(bluezutils.ADAPTER_INTERFACE, "Pairable", dbus.Boolean(1)) # Set discoverable on adapter.Set(bluezutils.ADAPTER_INTERFACE, "Discoverable", dbus.Boolean(1)) # Set discoverable timeout to 0 adapter.Set(bluezutils.ADAPTER_INTERFACE, "DiscoverableTimeout", dbus.UInt32(0)) # Set paraible time out to 0 adapter.Set(bluezutils.ADAPTER_INTERFACE, "PairableTimeout", dbus.UInt32(0)) bluezutils.show_adapter_info() self.path = "/test/agent" agent = bluezutils.Agent(self.bus, self.path) obj = self.bus.get_object(bluezutils.SERVICE_NAME, "/org/bluez"); self.manager = dbus.Interface(obj, "org.bluez.AgentManager1") self.manager.RegisterAgent(self.path, "NoInputNoOutput") print("Bluetooth AgentManager registered") # listen for signal of remove adapter # self.bus.add_signal_receiver(self.interfaces_removed, bus_name=bluezutils.SERVICE_NAME, dbus_interface="org.freedesktop.DBus.ObjectManager", signal_name="InterfacesRemoved") # listen for signals on the Bluez bus self.bus.add_signal_receiver(self.device_property_changed, bus_name=bluezutils.SERVICE_NAME, signal_name="PropertiesChanged", path_keyword="device_path", interface_keyword="interface") # listen for signal of changing properties self.bus.add_signal_receiver(self.player_changed, bus_name=bluezutils.SERVICE_NAME, dbus_interface="org.freedesktop.DBus.Properties", signal_name="PropertiesChanged", path_keyword="path") print("Signals receiver registered") self.manager.RequestDefaultAgent(self.path)