我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 dbus.SystemBus()。
def enable_networking(self, val):
    """Enable or disable networking through NetworkManager.

    Calls the ``Enable`` method of ``org.freedesktop.NetworkManager`` on the
    system bus. The call is best-effort: a failure is logged and swallowed,
    never raised to the caller.

    :param val: True to enable networking, False to disable it.
    """
    import logging  # function-scope import; module-level imports are not visible here

    try:
        bus = dbus.SystemBus()
        nm_object = bus.get_object('org.freedesktop.NetworkManager',
                                   '/org/freedesktop/NetworkManager')
        nm_iface = dbus.Interface(
            nm_object, dbus_interface='org.freedesktop.NetworkManager')
        # enabling/disabling networking
        enable = nm_iface.get_dbus_method("Enable", dbus_interface=None)
        enable(val)
    except Exception:
        # Still best-effort, but no longer a bare ``except:`` (which also
        # swallowed KeyboardInterrupt/SystemExit) and no longer silent.
        logging.getLogger(__name__).warning(
            "Could not set NetworkManager networking state to %r",
            val, exc_info=True)
def get_active_connection_info(self, ac_path):
    """Return basic properties of an active NetworkManager connection.

    Reads ``Id``, ``Type``, ``Uuid`` and ``State`` (in that order) from the
    ``org.freedesktop.NetworkManager.Connection.Active`` interface of the
    object at *ac_path* on the system bus.

    NOTE: only these four properties are returned; other properties such as
    Dhcp4Config, Dhcp6Config, Ip4Config, Ip6Config can be obtained the same
    way.

    :param ac_path: D-Bus object path of the active connection.
    :returns: tuple ``(str(Id), str(Type), str(Uuid), int(State))``.
    """
    active_iface = "org.freedesktop.NetworkManager.Connection.Active"

    system_bus = dbus.SystemBus()
    connection = system_bus.get_object('org.freedesktop.NetworkManager', ac_path)
    properties = dbus.Interface(
        connection, dbus_interface='org.freedesktop.DBus.Properties')
    # proxy for the Properties 'Get' method
    get_property = properties.get_dbus_method("Get", dbus_interface=None)

    conn_id, conn_type, conn_uuid, conn_state = [
        get_property(active_iface, name)
        for name in ("Id", "Type", "Uuid", "State")
    ]
    return (str(conn_id), str(conn_type), str(conn_uuid), int(conn_state))
def get_access_point_brief_info(self, ap_path):
    """Return the SSID, strength, hardware address and mode of an access point.

    The four properties are read, in that order, from the
    ``org.freedesktop.NetworkManager.AccessPoint`` interface of the object at
    *ap_path* on the system bus.

    :param ap_path: D-Bus object path of the access point.
    :returns: tuple ``(Ssid, int(Strength), str(HwAddress), int(Mode))``.
    """
    ap_iface = "org.freedesktop.NetworkManager.AccessPoint"

    system_bus = dbus.SystemBus()
    ap_object = system_bus.get_object('org.freedesktop.NetworkManager', ap_path)
    properties = dbus.Interface(
        ap_object, dbus_interface='org.freedesktop.DBus.Properties')
    get_property = properties.get_dbus_method("Get", dbus_interface=None)

    # Ssid is returned as an array; each element is turned into one character.
    ssid = ''.join(map(chr, get_property(ap_iface, "Ssid")))
    strength = get_property(ap_iface, "Strength")
    hw_address = get_property(ap_iface, "HwAddress")
    mode = get_property(ap_iface, "Mode")
    return (ssid, int(strength), str(hw_address), int(mode))
def find_device_in_objects(objects, device_address, adapter_pattern=None):
    """Return a device interface proxy for the device with *device_address*.

    :param objects: mapping of object path -> {interface name -> properties}.
    :param device_address: value compared against each device's "Address".
    :param adapter_pattern: optional pattern passed to
        ``find_adapter_in_objects``; when given, only devices whose path
        starts with that adapter's object path are considered.
    :returns: ``dbus.Interface`` for ``DEVICE_INTERFACE`` on the first match.
    :raises Exception: "Bluetooth device not found" when nothing matches.
    """
    bus = dbus.SystemBus()
    path_prefix = ""
    if adapter_pattern:
        adapter = find_adapter_in_objects(objects, adapter_pattern)
        path_prefix = adapter.object_path
    # items() rather than the Python-2-only iteritems(): works on 2 and 3.
    for path, ifaces in objects.items():
        device = ifaces.get(DEVICE_INTERFACE)
        if device is None:
            continue
        if (device["Address"] == device_address and
                path.startswith(path_prefix)):
            obj = bus.get_object(SERVICE_NAME, path)
            return dbus.Interface(obj, DEVICE_INTERFACE)
    raise Exception("Bluetooth device not found")
def __init__(self, adapter_name):
    """Bind to the BlueZ adapter *adapter_name* on the system bus.

    Creates proxies for the adapter (``org.bluez.Adapter1``), its properties
    and the BlueZ object manager, compiles the regex used to recognise device
    paths under this adapter, resets the bookkeeping attributes, and finally
    calls ``update_devices()``.

    :param adapter_name: adapter name appended to ``/org/bluez/``.
    :raises: whatever ``_error_from_dbus_error`` returns when the adapter
        object cannot be obtained.
    """
    self.listener = None
    self.adapter_name = adapter_name

    self._bus = dbus.SystemBus()
    adapter_path = '/org/bluez/' + adapter_name
    try:
        adapter_proxy = self._bus.get_object('org.bluez', adapter_path)
    except dbus.exceptions.DBusException as e:
        raise _error_from_dbus_error(e)
    root_proxy = self._bus.get_object("org.bluez", "/")

    self._adapter = dbus.Interface(adapter_proxy, 'org.bluez.Adapter1')
    self._adapter_properties = dbus.Interface(
        self._adapter, 'org.freedesktop.DBus.Properties')
    self._object_manager = dbus.Interface(
        root_proxy, "org.freedesktop.DBus.ObjectManager")

    # Matches <adapter path>/dev_XX_XX_XX_XX_XX_XX; group 1 is the _XX... part.
    self._device_path_regex = re.compile(
        '^' + adapter_path + '/dev((_[A-Z0-9]{2}){6})$')

    self._devices = {}
    self._discovered_devices = {}
    self._interface_added_signal = None
    self._properties_changed_signal = None
    self._main_loop = None

    self.update_devices()
def _prepair(self):
    '''Try to connect to one of the configured D-Bus shutdown services.

    Iterates over ``self.DBUS_SHUTDOWN`` and returns the first entry whose
    object and method proxy can be obtained.

    :returns: ``(proxy, arguments)`` for the first reachable service, or
        ``(None, None)`` if the buses cannot be opened or no service works.
    '''
    try:
        sessionbus = dbus.SessionBus()
        systembus = dbus.SystemBus()
    except Exception:
        # Was a bare ``except:``; that also trapped KeyboardInterrupt and
        # SystemExit. Any ordinary failure still means "no D-Bus available".
        return (None, None)

    for dbus_props in self.DBUS_SHUTDOWN.values():
        bus = sessionbus if dbus_props['bus'] == SESSION_BUS else systembus
        try:
            interface = bus.get_object(dbus_props['service'],
                                       dbus_props['objectPath'])
            proxy = interface.get_dbus_method(dbus_props['method'],
                                              dbus_props['interface'])
        except dbus.exceptions.DBusException:
            # this service is not reachable; try the next candidate
            continue
        return (proxy, dbus_props['arguments'])
    return (None, None)
def _check_permission(self, sender, action):
    '''Verify that *action* is permitted for *sender*.

    When *sender* is truthy, asks the PolicyKit authority on the system bus
    via ``CheckAuthorization``. When *sender* is falsy no check is made.
    The caller should use ObtainAuthorization() to get permission.

    :param sender: D-Bus bus name of the caller (used as 'system-bus-name').
    :param action: PolicyKit action identifier to check.
    :raises AccessDeniedException: if authorization is not granted, or if
        the D-Bus call to PolicyKit itself fails.
    '''
    try:
        if sender:
            kit = dbus.SystemBus().get_object(
                'org.freedesktop.PolicyKit1',
                '/org/freedesktop/PolicyKit1/Authority')
            kit = dbus.Interface(kit, 'org.freedesktop.PolicyKit1.Authority')
            (granted, _, _details) = kit.CheckAuthorization(
                ('system-bus-name', {'name': sender}),
                action, {}, dbus.UInt32(1), '', timeout=600)
            if not granted:
                raise AccessDeniedException(
                    'Session not authorized by PolicyKit')
    except AccessDeniedException:
        raise
    except dbus.DBusException as ex:
        # ``except X, ex`` is a SyntaxError on Python 3 and ``ex.message`` no
        # longer exists there; get_dbus_message() yields the message text on
        # both Python 2 and 3.
        raise AccessDeniedException(ex.get_dbus_message())
def create_dbus_server(cls, session_bus=False):
    '''Build a D-BUS server ``Backend`` instance.

    The GLib main loop is installed as the default D-Bus main loop, the
    backend is attached to the system bus (or to the session bus when
    *session_bus* is true, in which case polkit enforcement is switched off;
    meant for testing), and the ``DBUS_BUS_NAME`` bus name is claimed.

    :returns: the backend, or None if claiming the bus name fails.
    '''
    backend = Backend()
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    if not session_bus:
        backend.bus = dbus.SystemBus()
    else:
        backend.bus = dbus.SessionBus()
        backend.enforce_polkit = False

    try:
        backend.dbus_name = dbus.service.BusName(DBUS_BUS_NAME, backend.bus)
    except dbus.exceptions.DBusException as msg:
        logging.error("Exception when spawning dbus service")
        logging.error(msg)
        return None
    return backend

#
# Internal methods
#
def backend(self):
    '''Return the D-BUS backend client interface, creating it on first use.

    A ``dbus.DBusException`` while connecting is passed to
    ``dbus_exception_handler`` and the process exits with status 1. Any
    other exception is shown in an error alert; in that case the cached
    interface stays unset and None is returned.
    '''
    if self._dbus_iface is not None:
        return self._dbus_iface

    try:
        system_bus = dbus.SystemBus()
        proxy = system_bus.get_object(DBUS_BUS_NAME, '/RecoveryMedia')
        self._dbus_iface = dbus.Interface(proxy, DBUS_INTERFACE_NAME)
    except dbus.DBusException as msg:
        self.dbus_exception_handler(msg)
        sys.exit(1)
    except Exception as msg:
        self.show_alert(
            Gtk.MessageType.ERROR, "Exception", str(msg),
            transient_for=self.tool_widgets.get_object('tool_selector'))
    return self._dbus_iface
def __init__(self, device_id=None):
    """Register the application object on the system D-Bus.

    Claims the ``ukBaz.bluezero`` bus name, exports this object at
    ``/ukBaz/bluezero``, starts with an empty list of managed objects and
    creates the event loop wrapper.

    :param device_id: accepted but not used by this initialiser.
    """
    # D-Bus identity of this application
    self.path = '/ukBaz/bluezero'
    self.bus = dbus.SystemBus()
    self.bus_name = dbus.service.BusName('ukBaz.bluezero', self.bus)
    dbus.service.Object.__init__(self, self.bus_name, self.path)

    # Objects to be associated with this service
    self.managed_objs = []

    self.eventloop = async_tools.EventLoop()
def __init__(self, service_id, uuid, primary):
    """Export the service on the system D-Bus and record its properties.

    The object path is ``PATH_BASE`` followed by *service_id* zero-padded to
    four digits.

    :param service_id: integer used to build the object path.
    :param uuid: service BLE UUID
    :param primary: whether or not the service is a primary service
    """
    path_suffix = str('{0:04d}'.format(service_id))

    # D-Bus object path and registration
    self.path = self.PATH_BASE + path_suffix
    self.bus = dbus.SystemBus()
    self.interface = constants.GATT_SERVICE_IFACE
    dbus.service.Object.__init__(self, self.bus, self.path)

    # Property table, keyed by interface name
    service_props = {'UUID': uuid, 'Primary': primary}
    self.props = {constants.GATT_SERVICE_IFACE: service_props}
def __init__(self, uuid, primary, type='peripheral'):
    """Export the service on the system D-Bus and initialise its state.

    The object path is ``PATH_BASE`` followed by ``id(self)``. The service
    starts with no service data and an empty characteristics list.

    :param uuid: service BLE UUID
    :param primary: whether or not the service is a primary service
    :param type: stored as ``self.type``; defaults to 'peripheral'.
    """
    # D-Bus object path and registration
    self.index = id(self)
    self.path = self.PATH_BASE + str(self.index)
    self.bus = dbus.SystemBus()
    dbus.service.Object.__init__(self, self.bus, self.path)

    # UUID, primary flag and type as given by the caller
    self.uuid = uuid
    self.primary = primary
    self.type = type
    self.service_data = None

    # Characteristics within the service
    self.characteristics = []
def __init__(self, adapter_addr=None):
    """Create proxies for the LE advertising manager of an adapter.

    :param adapter_addr: adapter address; when None, the first entry
        returned by ``adapter.list_adapters()`` is used (if there is one).
    """
    self.bus = dbus.SystemBus()

    if adapter_addr is None:
        available = adapter.list_adapters()
        if available:
            adapter_addr = available[0]

    self.advert_mngr_path = dbus_tools.get_dbus_path(adapter=adapter_addr)
    self.advert_mngr_obj = self.bus.get_object(
        constants.BLUEZ_SERVICE_NAME, self.advert_mngr_path)
    self.advert_mngr_methods = dbus.Interface(
        self.advert_mngr_obj, constants.LE_ADVERTISING_MANAGER_IFACE)
    self.advert_mngr_props = dbus.Interface(
        self.advert_mngr_obj, dbus.PROPERTIES_IFACE)
def __init__(self, adapter_addr, device_addr, profile_uuid):
    """Remote GATT Profile initialisation.

    Resolves the profile's D-Bus path with ``dbus_tools.get_profile_path``
    and creates method and property interface proxies for it.

    :param adapter_addr: passed to ``dbus_tools.get_profile_path``.
    :param device_addr: passed to ``dbus_tools.get_profile_path``.
    :param profile_uuid: passed to ``dbus_tools.get_profile_path``.
    """
    self.profile_path = dbus_tools.get_profile_path(
        adapter_addr, device_addr, profile_uuid)

    self.bus = dbus.SystemBus()
    self.profile_object = self.bus.get_object(
        constants.BLUEZ_SERVICE_NAME, self.profile_path)

    self.profile_methods = dbus.Interface(
        self.profile_object, constants.GATT_PROFILE_IFACE)
    self.profile_props = dbus.Interface(
        self.profile_object, dbus.PROPERTIES_IFACE)
def __init__(self, adapter_addr):
    """GATT Manager initialisation.

    Resolves the manager's D-Bus path with ``dbus_tools.get_dbus_path`` and
    creates method and property interface proxies for it.

    :param adapter_addr: passed to ``dbus_tools.get_dbus_path``.
    """
    self.manager_path = dbus_tools.get_dbus_path(adapter_addr)

    self.bus = dbus.SystemBus()
    self.manager_obj = self.bus.get_object(
        constants.BLUEZ_SERVICE_NAME, self.manager_path)

    self.manager_methods = dbus.Interface(
        self.manager_obj, constants.GATT_MANAGER_IFACE)
    self.manager_props = dbus.Interface(
        self.manager_obj, dbus.PROPERTIES_IFACE)
def list_adapters():
    """Return the addresses of all adapters known to BlueZ.

    :returns: list of the 'Address' property of every managed object that
        exposes ``constants.ADAPTER_INTERFACE``.
    :raises AdapterError: if no such object exists.
    """
    system_bus = dbus.SystemBus()
    root = system_bus.get_object(constants.BLUEZ_SERVICE_NAME, '/')
    object_manager = dbus.Interface(root, constants.DBUS_OM_IFACE)
    managed = object_manager.GetManagedObjects()

    addresses = [
        ifaces[constants.ADAPTER_INTERFACE]['Address']
        for ifaces in managed.values()
        if constants.ADAPTER_INTERFACE in ifaces
    ]
    if not addresses:
        raise AdapterError('No Bluetooth adapter found')
    return addresses
def __init__(self, services=None, filters=None,
             interface=avahi.IF_UNSPEC, protocol=avahi.PROTO_INET):
    """Set up the browser state and watch for the Avahi daemon on D-Bus.

    Registers ``avahi_dbus_connect_cb`` for ``NameOwnerChanged`` signals
    whose first argument is ``org.freedesktop.Avahi``. If that fails with a
    ``dbus.DBusException`` the error is logged and the process exits with
    status 1.

    :param services: stored as ``self.services``; defaults to a new empty
        list.
    :param filters: stored as ``self.filters``; defaults to a new list
        ``[avahi.LOOKUP_RESULT_LOCAL]``.
    :param interface: stored as ``self.interface``.
    :param protocol: stored as ``self.protocol``.
    """
    GObject.GObject.__init__(self)

    # The defaults used to be list literals in the signature, so every
    # instance created without arguments shared (and could mutate) the same
    # two list objects. Build fresh lists per instance instead.
    self.filters = [avahi.LOOKUP_RESULT_LOCAL] if filters is None else filters
    self.services = [] if services is None else services
    self.interface = interface
    self.protocol = protocol

    try:
        self.system_bus = dbus.SystemBus()
        self.system_bus.add_signal_receiver(
            self.avahi_dbus_connect_cb, "NameOwnerChanged",
            "org.freedesktop.DBus", arg0="org.freedesktop.Avahi")
    except dbus.DBusException as e:
        logger.error("Error Owning name on D-Bus: %s", e)
        sys.exit(1)

    self.db = ServiceTypeDatabase()
    self.service_browsers = {}
    self.started = False
def get_paired_devices(device_name):
    """Return ``(address, alias)`` for every device under one adapter.

    The adapter is looked up with ``find_adapter(device_name)``; a managed
    object is included when it exposes ``DEVICE_INTERFACE`` and its
    "Adapter" property equals that adapter's object path. The "Paired"
    property itself is not consulted here.

    :param device_name: passed to ``find_adapter``.
    :returns: list of ``(str(Address), str(Alias))`` tuples.
    """
    system_bus = dbus.SystemBus()
    adapter_path = find_adapter(device_name).object_path
    object_manager = dbus.Interface(
        system_bus.get_object(SERVICE_NAME, "/"),
        "org.freedesktop.DBus.ObjectManager")

    found = []
    for interfaces in object_manager.GetManagedObjects().values():
        if DEVICE_INTERFACE in interfaces:
            device = interfaces[DEVICE_INTERFACE]
            if device["Adapter"] == adapter_path:
                found.append((str(device["Address"]), str(device["Alias"])))
    return found
def show_adapter_info():
    """Print the properties of every adapter managed by the service.

    For each managed object exposing ``ADAPTER_INTERFACE`` the object path
    is printed, followed by one line per property ("Class" in hex, "UUIDs"
    skipped) and a blank line.
    """
    bus = dbus.SystemBus()
    om = dbus.Interface(bus.get_object(SERVICE_NAME, "/"),
                        "org.freedesktop.DBus.ObjectManager")
    objects = om.GetManagedObjects()
    # items() rather than the Python-2-only iteritems(): works on 2 and 3,
    # and matches the props.items() call below.
    for path, interfaces in objects.items():
        if ADAPTER_INTERFACE not in interfaces:
            continue
        print(" [ %s ]" % (path))
        props = interfaces[ADAPTER_INTERFACE]
        for (key, value) in props.items():
            if key == "Class":
                print(" %s = 0x%06x" % (key, value))
            elif key == "UUIDs":
                continue
            else:
                print(" %s = %s" % (key, value))
        print()
def publish(self):
    """Publish this service through Avahi.

    Creates a new entry group on the Avahi server, adds the service
    described by ``self.name``, ``self.stype``, ``self.domain``,
    ``self.host``, ``self.port`` and ``self.text`` to it, commits the group
    and keeps it in ``self.group``.
    """
    system_bus = dbus.SystemBus()

    server_proxy = system_bus.get_object(avahi.DBUS_NAME,
                                         avahi.DBUS_PATH_SERVER)
    server = dbus.Interface(server_proxy, avahi.DBUS_INTERFACE_SERVER)

    # EntryGroupNew() returns the object path of the new group
    group_proxy = system_bus.get_object(avahi.DBUS_NAME,
                                        server.EntryGroupNew())
    entry_group = dbus.Interface(group_proxy,
                                 avahi.DBUS_INTERFACE_ENTRY_GROUP)

    entry_group.AddService(
        avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0),
        self.name, self.stype, self.domain, self.host,
        dbus.UInt16(self.port), self.text)
    entry_group.Commit()

    self.group = entry_group
def _setup_signals(self):
    """Forward PkProgress property notifications and watch for destruction.

    Connect signals to the PkProgress from libpackagekitlib, because PK DBus
    exposes only a generic Changed, without specifying the property changed.
    Each ``notify::<prop>`` is routed to ``self._emit`` together with the
    signal name to emit and the property name. The transaction's D-Bus
    "Destroy" signal is connected to ``self._remove``.
    """
    # (notify signal, signal passed to _emit, property name).
    # 'notify::percentage' used to be connected twice, which invoked _emit
    # twice for every percentage change; it is now connected once.
    # SC UI does not support subprogress, so 'notify::subpercentage' is
    # intentionally not forwarded.
    forwarded = (
        ('notify::role', 'role-changed', 'role'),
        ('notify::status', 'status-changed', 'status'),
        ('notify::percentage', 'progress-changed', 'percentage'),
        ('notify::allow-cancel', 'cancellable-changed', 'allow-cancel'),
    )
    for notify_signal, emitted_signal, prop in forwarded:
        self._trans.connect(notify_signal, self._emit, emitted_signal, prop)

    # connect the delete:
    proxy = dbus.SystemBus().get_object('org.freedesktop.PackageKit',
                                        self.tid)
    trans = dbus.Interface(proxy, 'org.freedesktop.PackageKit.Transaction')
    trans.connect_to_signal("Destroy", self._remove)
def is_service_running(service):
    """Query systemd through D-Bus to see if *service* is running.

    :param service: unit name, with or without the ``.service`` suffix.
    :returns: True when the unit's LoadState is 'loaded' and its ActiveState
        is 'active'; False otherwise, including when any D-Bus call raises
        ``DBusException``.
    """
    bus = SystemBus()
    systemd = bus.get_object('org.freedesktop.systemd1',
                             '/org/freedesktop/systemd1')
    manager = Interface(systemd,
                        dbus_interface='org.freedesktop.systemd1.Manager')

    # Only the *name* depends on the suffix. Previously a name that already
    # ended in '.service' skipped GetUnit() and was handed to get_object()
    # as if it were an object path.
    if service.endswith('.service'):
        unit_name = service
    else:
        unit_name = '{0}.service'.format(service)

    try:
        unit_path = manager.GetUnit(unit_name)
        unit_proxy = bus.get_object('org.freedesktop.systemd1',
                                    str(unit_path))
        unit_props = Interface(
            unit_proxy, dbus_interface='org.freedesktop.DBus.Properties')
        load_state = unit_props.Get('org.freedesktop.systemd1.Unit',
                                    'LoadState')
        active_state = unit_props.Get('org.freedesktop.systemd1.Unit',
                                      'ActiveState')
    except DBusException:
        return False
    return load_state == 'loaded' and active_state == 'active'
def on_battery() -> bool:
    """Check if we are running on Battery power.

    Returns False straight away when ``has_battery()`` is false. Otherwise
    the UPower ``OnBattery`` property is read; if it cannot be read a
    message is printed and False is returned (assume mains power).
    """
    log.debug("Checking if running on Battery power.")
    if not has_battery():
        return False

    upower_path = '/org/freedesktop/UPower'
    bus = dbus.SystemBus()
    upower = bus.get_object('org.freedesktop.UPower', upower_path)
    result = __get_prop(upower, upower_path, 'OnBattery')
    if result is not None:
        return result

    # Cannot read property, something is wrong.
    print(f"Failed to read D-Bus property: {upower_path}.")
    return False  # Assume we are connected to a power supply.

#
# def on_wifi():
#     """Check if we are running on wifi."""
#     try:
#         bus = dbus.SystemBus()
#         manager = bus.get_object('org.freedesktop.NetworkManager',
#                                  '/org/freedesktop/NetworkManager')
#         # FIXME this returns int, I dunno what they mean ?, investigate.
#         return __get_prop(manager, 'org.freedesktop.NetworkManager',
#                           'WirelessEnabled')
#     except Exception:
#         return False
def __init__(self):
    """Set up the Bluetooth mode.

    Builds a one-entry menu ("Show Device"), calls
    ``start_bluetooth(False)``, opens the system bus used for metadata
    retrieval and resets the running/player state.
    """
    super(ModeBluetooth, self).__init__()

    # Basic menu: a single entry that shows the device
    self.menu = [("Show Device", self.show_device)]
    self.build_menu()

    # Disable the bluetooth adaptor
    self.start_bluetooth(False)

    # D-Bus connection for metadata retrieval
    self.bus = dbus.SystemBus()

    # Nothing is playing yet
    self.running = False
    self.player = None
def get_introspection(self):
    """Return the introspection data of the NetworkManager root object.

    Calls ``Introspect`` on ``/org/freedesktop/NetworkManager`` through the
    ``org.freedesktop.DBus.Introspectable`` interface on the system bus and
    returns whatever that call yields.
    """
    system_bus = dbus.SystemBus()
    nm_object = system_bus.get_object('org.freedesktop.NetworkManager',
                                      '/org/freedesktop/NetworkManager')
    introspectable = dbus.Interface(
        nm_object, dbus_interface='org.freedesktop.DBus.Introspectable')
    # getting introspection xml
    introspect = introspectable.get_dbus_method("Introspect",
                                                dbus_interface=None)
    return introspect()
def get_devices(self):
    """Return all NetworkManager devices as a list of strings.

    Calls ``GetDevices`` on the ``org.freedesktop.NetworkManager`` interface
    of the system bus and formats each returned entry with ``"%s"``.
    """
    system_bus = dbus.SystemBus()
    nm_object = system_bus.get_object('org.freedesktop.NetworkManager',
                                      '/org/freedesktop/NetworkManager')
    nm_iface = dbus.Interface(
        nm_object, dbus_interface='org.freedesktop.NetworkManager')
    # getting all devices
    list_devices = nm_iface.get_dbus_method("GetDevices", dbus_interface=None)
    return ["%s" % device for device in list_devices()]
def get_active_connections(self):
    """Return NetworkManager's ``ActiveConnections`` property as strings.

    The property is read through ``org.freedesktop.DBus.Properties`` on the
    system bus; every element is converted with ``str``.
    """
    system_bus = dbus.SystemBus()
    nm_object = system_bus.get_object('org.freedesktop.NetworkManager',
                                      '/org/freedesktop/NetworkManager')
    properties = dbus.Interface(
        nm_object, dbus_interface='org.freedesktop.DBus.Properties')
    get_property = properties.get_dbus_method("Get", dbus_interface=None)

    active = get_property("org.freedesktop.NetworkManager",
                          "ActiveConnections")
    return [str(connection) for connection in active]
def get_wifi_access_points_by_dev(self, device_path):
    """Return the access points reported by one wireless device.

    Calls ``GetAccessPoints`` on the
    ``org.freedesktop.NetworkManager.Device.Wireless`` interface of the
    object at *device_path* and converts every result with ``str``.

    :param device_path: D-Bus object path of the device.
    """
    system_bus = dbus.SystemBus()
    device = system_bus.get_object('org.freedesktop.NetworkManager',
                                   device_path)
    wireless = dbus.Interface(
        device,
        dbus_interface='org.freedesktop.NetworkManager.Device.Wireless')
    # getting all wireless access points
    list_access_points = wireless.get_dbus_method("GetAccessPoints",
                                                  dbus_interface=None)
    return [str(access_point) for access_point in list_access_points()]
def get_access_point_all_info(self, ap_path):
    """Fetch, print and return every property of an access point.

    Calls ``GetAll`` for ``org.freedesktop.NetworkManager.AccessPoint`` on
    the object at *ap_path*, prints each ``key value`` pair on its own line
    and returns the property mapping.

    :param ap_path: D-Bus object path of the access point.
    :returns: the mapping returned by ``GetAll``.
    """
    bus = dbus.SystemBus()
    obj = bus.get_object('org.freedesktop.NetworkManager', ap_path)
    iface = dbus.Interface(obj,
                           dbus_interface='org.freedesktop.DBus.Properties')
    m = iface.get_dbus_method("GetAll", dbus_interface=None)
    # getting all properties like Ssid, Strength, HwAddress etc.
    props = m("org.freedesktop.NetworkManager.AccessPoint")
    # ``print k,v`` and ``iteritems()`` only work on Python 2; this form
    # produces the same "key value" line on both Python 2 and 3.
    for k, v in props.items():
        print("%s %s" % (k, v))
    return props
def activate_connection(self, con_path, dev_path, obj_path):
    """Activate a connection through NetworkManager.

    Calls ``ActivateConnection(con_path, dev_path, obj_path)`` on the
    ``org.freedesktop.NetworkManager`` interface of the system bus.

    :returns: the value returned by the call (on success, the active
        connection object/path).
    """
    system_bus = dbus.SystemBus()
    nm_object = system_bus.get_object('org.freedesktop.NetworkManager',
                                      '/org/freedesktop/NetworkManager')
    nm_iface = dbus.Interface(
        nm_object, dbus_interface='org.freedesktop.NetworkManager')
    # activating connection
    activate = nm_iface.get_dbus_method("ActivateConnection",
                                        dbus_interface=None)
    return activate(con_path, dev_path, obj_path)
def __init__(self):
    """Set up logging, config, quit signals, the GLib loop and the D-Bus object.

    The steps run in a fixed order: logger and empty configuration first,
    then handlers for SIGINT/SIGTERM/SIGHUP, then the GLib main loop is made
    the default D-Bus main loop, and finally the bus name is requested on the
    system bus and this object is exported at ``CO2MONITOR_OBJECTPATH``.
    """
    # initially set the standard logger
    self.set_logger(logging.getLogger(__name__))
    # initially set an empty configuration
    self.set_config(configparser.ConfigParser())
    # set up the quit signals
    self.setup_signals(
        signals = [signal.SIGINT, signal.SIGTERM, signal.SIGHUP],
        handler = self.please_stop_now
    )
    # use glib as default mainloop for dbus
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    dbus.mainloop.glib.threads_init()
    GLib.threads_init()
    self.loop = GLib.MainLoop() # create mainloop
    self.systembus = dbus.SystemBus() # the system bus
    self.systembus.request_name(CO2MONITOR_BUSNAME) # request the bus name
    bus_name = dbus.service.BusName(CO2MONITOR_BUSNAME, self.systembus) # create bus name
    # register the object on the bus name
    dbus.service.Object.__init__(self, bus_name, CO2MONITOR_OBJECTPATH)
def __init__(self, devicefile):
    """Set up logging, config, the D-Bus object and the device for *devicefile*.

    The object is exported under ``CO2MONITOR_OBJECTPATH`` joined with the
    name derived from *devicefile* by ``utils.devicefile2objectname``. The
    status is set to "idle" and the ``threading.Thread`` base initialiser is
    called last.

    :param devicefile: stored as ``self.devicefile`` and passed to
        ``device.co2device``.
    """
    # by default, log!
    self.do_data_logging = True
    # initially set the standard logger
    self.set_logger(logging.getLogger(__name__))
    # initially set an empty configuration
    self.set_config(configparser.ConfigParser())
    # use glib as default mainloop for dbus
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    dbus.mainloop.glib.threads_init()
    GLib.threads_init()
    self.systembus = dbus.SystemBus() # the system bus
    self.systembus.request_name(CO2MONITOR_BUSNAME) # request the bus name
    bus_name = dbus.service.BusName(CO2MONITOR_BUSNAME, self.systembus) # create bus name
    self.devicefile = devicefile
    # set up the device
    self.device = device.co2device(self.devicefile)
    # register the object on the bus name
    objectpath = "/".join([CO2MONITOR_OBJECTPATH, utils.devicefile2objectname(self.devicefile)])
    dbus.service.Object.__init__(self, bus_name, objectpath)
    self.update_status(_("idle"))
    threading.Thread.__init__(self)
    # set the config
    # NOTE(review): the snippet ends at the comment above; the code that
    # sets the config is presumably cut off here - confirm against the source.
def __init__(self, user=False):
    """Connect to the systemd manager over D-Bus.

    :param user: when true the session bus is used, otherwise the system
        bus.
    """
    if user:
        self.bus = SessionBus()
    else:
        self.bus = SystemBus()

    systemd_proxy = self.bus.get_object(SYSTEMD_BUSNAME, SYSTEMD_PATH)
    self.manager = Interface(systemd_proxy,
                             dbus_interface=SYSTEMD_MANAGER_INTERFACE)
def get_managed_objects():
    """Return the result of ``GetManagedObjects`` on the ``org.bluez`` root.

    The call goes through the ``org.freedesktop.DBus.ObjectManager``
    interface of the object at ``/`` on the system bus.
    """
    system_bus = dbus.SystemBus()
    root = system_bus.get_object("org.bluez", "/")
    object_manager = dbus.Interface(root,
                                    "org.freedesktop.DBus.ObjectManager")
    return object_manager.GetManagedObjects()
def find_adapter_in_objects(objects, pattern=None):
    """Return an adapter interface proxy for the first matching adapter.

    An object matches when it exposes ``ADAPTER_INTERFACE`` and either no
    *pattern* is given, the pattern equals the adapter's "Address", or the
    object path ends with the pattern.

    :param objects: mapping of object path -> {interface name -> properties}.
    :param pattern: optional address or path suffix to match.
    :returns: ``dbus.Interface`` for ``ADAPTER_INTERFACE`` on the match.
    :raises Exception: "Bluetooth adapter not found" when nothing matches.
    """
    bus = dbus.SystemBus()
    # items() rather than the Python-2-only iteritems(): works on 2 and 3.
    for path, ifaces in objects.items():
        adapter = ifaces.get(ADAPTER_INTERFACE)
        if adapter is None:
            continue
        if (not pattern or pattern == adapter["Address"] or
                path.endswith(pattern)):
            obj = bus.get_object(SERVICE_NAME, path)
            return dbus.Interface(obj, ADAPTER_INTERFACE)
    raise Exception("Bluetooth adapter not found")
def get_dbus(bus_type, obj, path, interface, method, arg):
    """Utility: execute a D-Bus method on a specific interface.

    :param bus_type: "session" or "system".
    :param obj: bus name of the service owning the object.
    :param path: object path.
    :param interface: interface the method belongs to.
    :param method: name of the method to call.
    :param arg: single argument for the method; when falsy the method is
        called without arguments.
    :returns: whatever the D-Bus method returns.
    :raises ValueError: if *bus_type* is neither "session" nor "system".
    """
    # An unknown bus_type used to leave ``bus`` unbound and surface as an
    # UnboundLocalError further down; reject it explicitly instead.
    if bus_type == "session":
        bus = dbus.SessionBus()
    elif bus_type == "system":
        bus = dbus.SystemBus()
    else:
        raise ValueError(
            "bus_type must be 'session' or 'system', got %r" % (bus_type,))

    proxy = bus.get_object(obj, path)
    call = proxy.get_dbus_method(method, interface)
    if arg:
        return call(arg)
    return call()
def get_dbus_property(bus_type, obj, path, iface, prop):
    """Utility: read a property defined on a specific D-Bus interface.

    :param bus_type: "session" or "system".
    :param obj: bus name of the service owning the object.
    :param path: object path.
    :param iface: interface that defines the property.
    :param prop: property name.
    :returns: the property value, or None if reading it fails.
    :raises ValueError: if *bus_type* is neither "session" nor "system".
    """
    # An unknown bus_type used to leave ``bus`` unbound and surface as an
    # UnboundLocalError further down; reject it explicitly instead.
    if bus_type == "session":
        bus = dbus.SessionBus()
    elif bus_type == "system":
        bus = dbus.SystemBus()
    else:
        raise ValueError(
            "bus_type must be 'session' or 'system', got %r" % (bus_type,))

    proxy = bus.get_object(obj, path)
    props_iface = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
    try:
        return props_iface.Get(iface, prop)
    except Exception:
        # Best-effort read: any ordinary failure means "no value". This was
        # a bare ``except:``, which also trapped KeyboardInterrupt/SystemExit.
        return None
def main():
    """Parse the command line, then run advertising and the GATT server.

    Accepts ``-a/--adapter-name`` (default ``''``), installs the GLib main
    loop as the default D-Bus main loop, starts ``advertising_main`` and
    ``gatt_server_main`` with the shared loop and system bus, and runs the
    loop.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('-a', '--adapter-name', type=str,
                            help='Adapter name', default='')
    adapter_name = arg_parser.parse_args().adapter_name

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()
    loop = GObject.MainLoop()

    advertising.advertising_main(loop, system_bus, adapter_name)
    gatt_server.gatt_server_main(loop, system_bus, adapter_name)

    loop.run()
def __init__(self):
    """Create D-Bus proxies for the UPower device ``battery_BAT0``.

    Keeps method proxies for ``GetStatistics`` and ``GetHistory`` of
    ``org.freedesktop.UPower.Device`` and a properties interface in
    ``self.bat0``.
    """
    device_iface = 'org.freedesktop.UPower.Device'

    system_bus = dbus.SystemBus()
    battery = system_bus.get_object(
        'org.freedesktop.UPower',
        '/org/freedesktop/UPower/devices/battery_BAT0')

    self.__statistics = battery.get_dbus_method('GetStatistics', device_iface)
    self.__history = battery.get_dbus_method('GetHistory', device_iface)
    self.bat0 = dbus.Interface(battery, 'org.freedesktop.DBus.Properties')
def setup_signals(self):
    """Register ``handle_signal`` for every interface, once.

    On the first call a signal receiver is added on the system bus for each
    entry of ``self.interfaces`` (the handler also receives the interface,
    signal name and path as keyword arguments), ``self.setup`` is set and
    ``listen_for_restarts()`` is called. Later calls do nothing.
    """
    # NOTE(review): the source had lost its indentation; it is assumed that
    # listen_for_restarts() belongs to the first-time branch - confirm.
    if self.setup:
        return

    system_bus = dbus.SystemBus()
    for iface_name in self.interfaces:
        system_bus.add_signal_receiver(
            self.handle_signal,
            dbus_interface=iface_name,
            interface_keyword='interface',
            member_keyword='signal',
            path_keyword='path')
    self.setup = True
    self.listen_for_restarts()