Python dbus 模块,SystemBus() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用dbus.SystemBus()

项目:PyDbusNetworkManager    作者:stoic1979    | 项目源码 | 文件源码
def enable_networking(self, val):
        """
        function enables/disables networking depending upon the 'val' argument
        if val is True, networking is enabled
        if val is False, networking is disabled
        """
        try:
            bus = dbus.SystemBus()
            wifi = bus.get_object('org.freedesktop.NetworkManager', '/org/freedesktop/NetworkManager')

            iface = dbus.Interface(wifi, dbus_interface='org.freedesktop.NetworkManager')

            # enabling/disabling networking
            m = iface.get_dbus_method("Enable", dbus_interface=None)
            m(val)
        except:
            pass
项目:PyDbusNetworkManager    作者:stoic1979    | 项目源码 | 文件源码
def get_active_connection_info(self, ac_path):
        bus = dbus.SystemBus()
        wifi = bus.get_object('org.freedesktop.NetworkManager', ac_path)

        iface = dbus.Interface(wifi, dbus_interface='org.freedesktop.DBus.Properties')

        # creating proxy 'Get' method
        m = iface.get_dbus_method("Get", dbus_interface=None)

        # getting Id of active connection
        Id = m("org.freedesktop.NetworkManager.Connection.Active", "Id")

        # getting Type of active connection
        Type = m("org.freedesktop.NetworkManager.Connection.Active", "Type")

        # getting Uuid of active connection
        Uuid = m("org.freedesktop.NetworkManager.Connection.Active", "Uuid")

        # getting State of active connection
        State = m("org.freedesktop.NetworkManager.Connection.Active", "State")

        # NOTE:
        # this function only returns properties like Id, Type, Uuid, State of an active connection
        # However, other properties like Dhcp4Config, Dhcp6Config, Ip4Config, Ip6Config etc. can also be obtained
        return (str(Id), str(Type), str(Uuid), int(State))
项目:PyDbusNetworkManager    作者:stoic1979    | 项目源码 | 文件源码
def get_access_point_brief_info(self, ap_path):

        bus = dbus.SystemBus()
        obj = bus.get_object('org.freedesktop.NetworkManager', ap_path)

        iface = dbus.Interface(obj, dbus_interface='org.freedesktop.DBus.Properties')

        m = iface.get_dbus_method("Get", dbus_interface=None)

        # getting Ssid
        dbusArray = m("org.freedesktop.NetworkManager.AccessPoint", "Ssid")
        Ssid = ''.join([chr(character) for character in dbusArray])

        # getting Strength
        Strength = m("org.freedesktop.NetworkManager.AccessPoint", "Strength")

        # getting HwAddress
        HwAddress = m("org.freedesktop.NetworkManager.AccessPoint", "HwAddress")

        # getting Mode
        Mode = m("org.freedesktop.NetworkManager.AccessPoint", "Mode")

        return (Ssid, int(Strength), str(HwAddress), int(Mode))
项目:OpenXCAccessory    作者:openxc    | 项目源码 | 文件源码
def find_device_in_objects(objects, device_address, adapter_pattern=None):
    bus = dbus.SystemBus()
    path_prefix = ""
    if adapter_pattern:
        adapter = find_adapter_in_objects(objects, adapter_pattern)
        path_prefix = adapter.object_path
    for path, ifaces in objects.iteritems():
        device = ifaces.get(DEVICE_INTERFACE)
        if device is None:
            continue
        if (device["Address"] == device_address and
                        path.startswith(path_prefix)):
            obj = bus.get_object(SERVICE_NAME, path)
            return dbus.Interface(obj, DEVICE_INTERFACE)

    raise Exception("Bluetooth device not found")
项目:gatt-python    作者:getsenic    | 项目源码 | 文件源码
def __init__(self, adapter_name):
        self.listener = None
        self.adapter_name = adapter_name

        self._bus = dbus.SystemBus()
        try:
            adapter_object = self._bus.get_object('org.bluez', '/org/bluez/' + adapter_name)
        except dbus.exceptions.DBusException as e:
            raise _error_from_dbus_error(e)
        object_manager_object = self._bus.get_object("org.bluez", "/")
        self._adapter = dbus.Interface(adapter_object, 'org.bluez.Adapter1')
        self._adapter_properties = dbus.Interface(self._adapter, 'org.freedesktop.DBus.Properties')
        self._object_manager = dbus.Interface(object_manager_object, "org.freedesktop.DBus.ObjectManager")
        self._device_path_regex = re.compile('^/org/bluez/' + adapter_name + '/dev((_[A-Z0-9]{2}){6})$')
        self._devices = {}
        self._discovered_devices = {}
        self._interface_added_signal = None
        self._properties_changed_signal = None
        self._main_loop = None

        self.update_devices()
项目:bcloud    作者:wangYanJava    | 项目源码 | 文件源码
def _prepair(self):
        '''Try to connect to the given dbus services. If successful it will
        return a callable dbus proxy and those arguments.
        '''
        try:
            sessionbus = dbus.SessionBus()
            systembus  = dbus.SystemBus()
        except:
            return (None, None)
        for dbus_props in self.DBUS_SHUTDOWN.values():
            try:
                if dbus_props['bus'] == SESSION_BUS:
                    bus = sessionbus
                else:
                    bus = systembus
                interface = bus.get_object(dbus_props['service'],
                                           dbus_props['objectPath'])
                proxy = interface.get_dbus_method(dbus_props['method'],
                                                  dbus_props['interface'])
                return (proxy, dbus_props['arguments'])
            except dbus.exceptions.DBusException:
                continue
        return (None, None)
项目:ubuntu-cleaner    作者:gerardpuig    | 项目源码 | 文件源码
def _check_permission(self, sender, action):
        '''
        Verifies if the specified action is permitted, and raises
        an AccessDeniedException if not.

        The caller should use ObtainAuthorization() to get permission.
        '''

        try:
            if sender:
                kit = dbus.SystemBus().get_object('org.freedesktop.PolicyKit1', '/org/freedesktop/PolicyKit1/Authority')
                kit = dbus.Interface(kit, 'org.freedesktop.PolicyKit1.Authority')

                (granted, _, details) = kit.CheckAuthorization(
                                ('system-bus-name', {'name': sender}),
                                action, {}, dbus.UInt32(1), '', timeout=600)

                if not granted:
                    raise AccessDeniedException('Session not authorized by PolicyKit')

        except AccessDeniedException:
            raise

        except dbus.DBusException, ex:
            raise AccessDeniedException(ex.message)
项目:dell-recovery    作者:dell    | 项目源码 | 文件源码
def create_dbus_server(cls, session_bus=False):
        '''Return a D-BUS server backend instance.

        Normally this connects to the system bus. Set session_bus to True to
        connect to the session bus (for testing).

        '''
        backend = Backend()
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        if session_bus:
            backend.bus = dbus.SessionBus()
            backend.enforce_polkit = False
        else:
            backend.bus = dbus.SystemBus()
        try:
            backend.dbus_name = dbus.service.BusName(DBUS_BUS_NAME, backend.bus)
        except dbus.exceptions.DBusException as msg:
            logging.error("Exception when spawning dbus service")
            logging.error(msg)
            return None
        return backend

    #
    # Internal methods
    #
项目:dell-recovery    作者:dell    | 项目源码 | 文件源码
def backend(self):
        '''Return D-BUS backend client interface.

        This gets initialized lazily.
        '''
        if self._dbus_iface is None:
            try:
                bus = dbus.SystemBus()
                self._dbus_iface = dbus.Interface(bus.get_object(DBUS_BUS_NAME,
                                                  '/RecoveryMedia'),
                                                  DBUS_INTERFACE_NAME)
            except dbus.DBusException as msg:
                self.dbus_exception_handler(msg)
                sys.exit(1)
            except Exception as msg:
                self.show_alert(Gtk.MessageType.ERROR, "Exception", str(msg),
                           transient_for=self.tool_widgets.get_object('tool_selector'))

        return self._dbus_iface
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def __init__(self, device_id=None):
        """Default initialiser.

        1. Initialises the program loop using ``GObject``.
        2. Registers the Application on the D-Bus.
        3. Initialises the list of services offered by the application.

        """
        # Initialise the D-Bus path and register it
        self.bus = dbus.SystemBus()
        self.path = '/ukBaz/bluezero'
        self.bus_name = dbus.service.BusName('ukBaz.bluezero', self.bus)
        dbus.service.Object.__init__(self, self.bus_name, self.path)

        # Objects to be associated with this service
        self.managed_objs = []
        self.eventloop = async_tools.EventLoop()
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def __init__(self, service_id, uuid, primary):
        """Default initialiser.

        1. Registers the service on the D-Bus.
        2. Sets up the service UUID and primary flags.

        :param service_id:
        :param uuid: service BLE UUID
        :param primary: whether or not the service is a primary service
        """
        # Setup D-Bus object paths and register service
        self.path = self.PATH_BASE + str('{0:04d}'.format(service_id))
        self.bus = dbus.SystemBus()
        self.interface = constants.GATT_SERVICE_IFACE
        dbus.service.Object.__init__(self, self.bus, self.path)
        self.props = {
            constants.GATT_SERVICE_IFACE: {
                'UUID': uuid,
                'Primary': primary}
        }
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def __init__(self, uuid, primary, type='peripheral'):
        """Default initialiser.

        1. Registers the service on the D-Bus.
        2. Sets up the service UUID and primary flags.
        3. Initialises the list of characteristics associated with the service.

        :param uuid: service BLE UUID
        :param primary: whether or not the service is a primary service
        """
        # Setup D-Bus object paths and register service
        self.index = id(self)
        self.path = self.PATH_BASE + str(self.index)
        self.bus = dbus.SystemBus()
        dbus.service.Object.__init__(self, self.bus, self.path)

        # Setup UUID, primary flag
        self.uuid = uuid
        self.primary = primary
        self.type = type
        self.service_data = None

        # Initialise characteristics within the service
        self.characteristics = []
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def __init__(self, adapter_addr=None):

        self.bus = dbus.SystemBus()

        if adapter_addr is None:
            adapters = adapter.list_adapters()
            if len(adapters) > 0:
                adapter_addr = adapters[0]

        self.advert_mngr_path = dbus_tools.get_dbus_path(adapter=adapter_addr)
        self.advert_mngr_obj = self.bus.get_object(
            constants.BLUEZ_SERVICE_NAME,
            self.advert_mngr_path)
        self.advert_mngr_methods = dbus.Interface(
            self.advert_mngr_obj,
            constants.LE_ADVERTISING_MANAGER_IFACE)
        self.advert_mngr_props = dbus.Interface(self.advert_mngr_obj,
                                                dbus.PROPERTIES_IFACE)
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def __init__(self, adapter_addr, device_addr, profile_uuid):
        """
        Remote GATT Profile Initialisation.

        :param profile_path: dbus path to the profile.
        """
        self.profile_path = dbus_tools.get_profile_path(adapter_addr,
                                                        device_addr,
                                                        profile_uuid)
        self.bus = dbus.SystemBus()
        self.profile_object = self.bus.get_object(
            constants.BLUEZ_SERVICE_NAME,
            self.profile_path)
        self.profile_methods = dbus.Interface(
            self.profile_object,
            constants.GATT_PROFILE_IFACE)
        self.profile_props = dbus.Interface(self.profile_object,
                                            dbus.PROPERTIES_IFACE)
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def __init__(self, adapter_addr):
        """
        GATT Manager Initialisation.

        :param manager_path: dbus path to the GATT Manager.
        """
        self.manager_path = dbus_tools.get_dbus_path(adapter_addr)
        self.bus = dbus.SystemBus()
        self.manager_obj = self.bus.get_object(
            constants.BLUEZ_SERVICE_NAME,
            self.manager_path)
        self.manager_methods = dbus.Interface(
            self.manager_obj,
            constants.GATT_MANAGER_IFACE)
        self.manager_props = dbus.Interface(self.manager_obj,
                                            dbus.PROPERTIES_IFACE)
项目:python-bluezero    作者:ukBaz    | 项目源码 | 文件源码
def list_adapters():
    """Return list of adapters address available on system."""
    paths = []
    addresses = []
    bus = dbus.SystemBus()
    manager = dbus.Interface(
        bus.get_object(constants.BLUEZ_SERVICE_NAME, '/'),
        constants.DBUS_OM_IFACE)
    manager_obj = manager.GetManagedObjects()
    for path, ifaces in manager_obj.items():
        if constants.ADAPTER_INTERFACE in ifaces:
            paths.append(path)
            addresses.append(
                manager_obj[path][constants.ADAPTER_INTERFACE]['Address'])
    if len(paths) < 1:
        raise AdapterError('No Bluetooth adapter found')
    else:
        return addresses
项目:eos-data-distribution    作者:endlessm    | 项目源码 | 文件源码
def __init__(self, services=[], filters=[avahi.LOOKUP_RESULT_LOCAL],
                 interface=avahi.IF_UNSPEC, protocol=avahi.PROTO_INET):
        GObject.GObject.__init__(self)

        self.filters = filters
        self.services = services
        self.interface = interface
        self.protocol = protocol

        try:
            self.system_bus = dbus.SystemBus()
            self.system_bus.add_signal_receiver(
                self.avahi_dbus_connect_cb, "NameOwnerChanged", "org.freedesktop.DBus", arg0="org.freedesktop.Avahi")
        except dbus.DBusException as e:
            logger.error("Error Owning name on D-Bus: %s", e)
            sys.exit(1)

        self.db = ServiceTypeDatabase()
        self.service_browsers = {}
        self.started = False
项目:BlueDot    作者:martinohanlon    | 项目源码 | 文件源码
def get_paired_devices(device_name):
    paired_devices = []

    bus = dbus.SystemBus()
    adapter_path = find_adapter(device_name).object_path
    om = dbus.Interface(bus.get_object(SERVICE_NAME, "/"), "org.freedesktop.DBus.ObjectManager")
    objects = om.GetManagedObjects()

    for path, interfaces in objects.items():
        if DEVICE_INTERFACE not in interfaces:
            continue
        properties = interfaces[DEVICE_INTERFACE]
        if properties["Adapter"] != adapter_path:
            continue

        paired_devices.append((str(properties["Address"]), str(properties["Alias"])))

    return paired_devices
项目:BMW-RPi-iBUS    作者:KLUSEK    | 项目源码 | 文件源码
def show_adapter_info():
    bus = dbus.SystemBus()
    om = dbus.Interface(bus.get_object(SERVICE_NAME, "/"), "org.freedesktop.DBus.ObjectManager")
    objects = om.GetManagedObjects()
    for path, interfaces in objects.iteritems():
        if ADAPTER_INTERFACE not in interfaces:
            continue

        print(" [ %s ]" % (path))
        props = interfaces[ADAPTER_INTERFACE]

        for (key, value) in props.items():
            if (key == "Class"):
                print("    %s = 0x%06x" % (key, value))
            elif (key == "UUIDs"):
                continue                
            else:
                print("    %s = %s" % (key, value))
        print()
项目:SkiffOS    作者:paralin    | 项目源码 | 文件源码
def find_device_in_objects(objects, device_address, adapter_pattern=None):
    bus = dbus.SystemBus()
    path_prefix = ""
    if adapter_pattern:
        adapter = find_adapter_in_objects(objects, adapter_pattern)
        path_prefix = adapter.object_path
    for path, ifaces in objects.iteritems():
        device = ifaces.get(DEVICE_INTERFACE)
        if device is None:
            continue
        if (device["Address"] == device_address and
                        path.startswith(path_prefix)):
            obj = bus.get_object(SERVICE_NAME, path)
            return dbus.Interface(obj, DEVICE_INTERFACE)

    raise Exception("Bluetooth device not found")
项目:mpradio    作者:morrolinux    | 项目源码 | 文件源码
def find_device_in_objects(objects, device_address, adapter_pattern=None):
    bus = dbus.SystemBus()
    path_prefix = ""
    if adapter_pattern:
        adapter = find_adapter_in_objects(objects, adapter_pattern)
        path_prefix = adapter.object_path
    for path, ifaces in objects.iteritems():
        device = ifaces.get(DEVICE_INTERFACE)
        if device is None:
            continue
        if (device["Address"] == device_address and
                        path.startswith(path_prefix)):
            obj = bus.get_object(SERVICE_NAME, path)
            return dbus.Interface(obj, DEVICE_INTERFACE)

    raise Exception("Bluetooth device not found")
项目:Jackal_Velodyne_Duke    作者:MengGuo    | 项目源码 | 文件源码
def publish(self):
        bus = dbus.SystemBus()
        server = dbus.Interface(
            bus.get_object(
                avahi.DBUS_NAME,
                avahi.DBUS_PATH_SERVER
            ),
            avahi.DBUS_INTERFACE_SERVER
        )

        g = dbus.Interface(
            bus.get_object(
                avahi.DBUS_NAME,
                server.EntryGroupNew()
            ),
            avahi.DBUS_INTERFACE_ENTRY_GROUP
        )

        g.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0),
                     self.name, self.stype, self.domain, self.host,
                     dbus.UInt16(self.port), self.text)

        g.Commit()
        self.group = g
项目:btc-fpga-miner    作者:marsohod4you    | 项目源码 | 文件源码
def publish(self):
        bus = dbus.SystemBus()
        server = dbus.Interface(
            bus.get_object(
                avahi.DBUS_NAME,
                avahi.DBUS_PATH_SERVER
            ),
            avahi.DBUS_INTERFACE_SERVER
        )

        g = dbus.Interface(
            bus.get_object(
                avahi.DBUS_NAME,
                server.EntryGroupNew()
            ),
            avahi.DBUS_INTERFACE_ENTRY_GROUP
        )

        g.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0),
                     self.name, self.stype, self.domain, self.host,
                     dbus.UInt16(self.port), self.text)

        g.Commit()
        self.group = g
项目:DirtyTooth-RaspberryPi    作者:ElevenPaths    | 项目源码 | 文件源码
def find_device_in_objects(objects, device_address, adapter_pattern=None):
    bus = dbus.SystemBus()
    path_prefix = ""
    if adapter_pattern:
        adapter = find_adapter_in_objects(objects, adapter_pattern)
        path_prefix = adapter.object_path
    for path, ifaces in objects.iteritems():
        device = ifaces.get(DEVICE_INTERFACE)
        if device is None:
            continue
        if (device["Address"] == device_address and
                        path.startswith(path_prefix)):
            obj = bus.get_object(SERVICE_NAME, path)
            return dbus.Interface(obj, DEVICE_INTERFACE)

    raise Exception("Bluetooth device not found")
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def _setup_signals(self):
        """ Connect signals to the PkProgress from libpackagekitlib,
        because PK DBus exposes only a generic Changed, without
        specifying the property changed
        """
        self._trans.connect('notify::role', self._emit,
            'role-changed', 'role')
        self._trans.connect('notify::status', self._emit,
            'status-changed', 'status')
        self._trans.connect('notify::percentage', self._emit,
            'progress-changed', 'percentage')
        # SC UI does not support subprogress:
        #self._trans.connect('notify::subpercentage', self._emit,
        #    'progress-changed', 'subpercentage')
        self._trans.connect('notify::percentage', self._emit,
            'progress-changed', 'percentage')
        self._trans.connect('notify::allow-cancel', self._emit,
            'cancellable-changed', 'allow-cancel')

        # connect the delete:
        proxy = dbus.SystemBus().get_object('org.freedesktop.PackageKit',
            self.tid)
        trans = dbus.Interface(proxy, 'org.freedesktop.PackageKit.Transaction')
        trans.connect_to_signal("Destroy", self._remove)
项目:x-mario-center    作者:fossasia    | 项目源码 | 文件源码
def _setup_signals(self):
        """ Connect signals to the PkProgress from libpackagekitlib,
        because PK DBus exposes only a generic Changed, without
        specifying the property changed
        """
        self._trans.connect('notify::role', self._emit,
            'role-changed', 'role')
        self._trans.connect('notify::status', self._emit,
            'status-changed', 'status')
        self._trans.connect('notify::percentage', self._emit,
            'progress-changed', 'percentage')
        # SC UI does not support subprogress:
        #self._trans.connect('notify::subpercentage', self._emit,
        #    'progress-changed', 'subpercentage')
        self._trans.connect('notify::percentage', self._emit,
            'progress-changed', 'percentage')
        self._trans.connect('notify::allow-cancel', self._emit,
            'cancellable-changed', 'allow-cancel')

        # connect the delete:
        proxy = dbus.SystemBus().get_object('org.freedesktop.PackageKit',
            self.tid)
        trans = dbus.Interface(proxy, 'org.freedesktop.PackageKit.Transaction')
        trans.connect_to_signal("Destroy", self._remove)
项目:origin-ci-tool    作者:openshift    | 项目源码 | 文件源码
def is_service_running(service):
    """ Queries systemd through dbus to see if the service is running """
    service_running = False
    bus = SystemBus()
    systemd = bus.get_object('org.freedesktop.systemd1', '/org/freedesktop/systemd1')
    manager = Interface(systemd, dbus_interface='org.freedesktop.systemd1.Manager')
    try:
        service_unit = service if service.endswith('.service') else manager.GetUnit('{0}.service'.format(service))
        service_proxy = bus.get_object('org.freedesktop.systemd1', str(service_unit))
        service_properties = Interface(service_proxy, dbus_interface='org.freedesktop.DBus.Properties')
        service_load_state = service_properties.Get('org.freedesktop.systemd1.Unit', 'LoadState')
        service_active_state = service_properties.Get('org.freedesktop.systemd1.Unit', 'ActiveState')
        if service_load_state == 'loaded' and service_active_state == 'active':
            service_running = True
    except DBusException:
        pass

    return service_running
项目:origin-ci-tool    作者:openshift    | 项目源码 | 文件源码
def is_service_running(service):
    """ Queries systemd through dbus to see if the service is running """
    service_running = False
    bus = SystemBus()
    systemd = bus.get_object('org.freedesktop.systemd1', '/org/freedesktop/systemd1')
    manager = Interface(systemd, dbus_interface='org.freedesktop.systemd1.Manager')
    try:
        service_unit = service if service.endswith('.service') else manager.GetUnit('{0}.service'.format(service))
        service_proxy = bus.get_object('org.freedesktop.systemd1', str(service_unit))
        service_properties = Interface(service_proxy, dbus_interface='org.freedesktop.DBus.Properties')
        service_load_state = service_properties.Get('org.freedesktop.systemd1.Unit', 'LoadState')
        service_active_state = service_properties.Get('org.freedesktop.systemd1.Unit', 'ActiveState')
        if service_load_state == 'loaded' and service_active_state == 'active':
            service_running = True
    except DBusException:
        pass

    return service_running
项目:origin-ci-tool    作者:openshift    | 项目源码 | 文件源码
def is_service_running(service):
    """ Queries systemd through dbus to see if the service is running """
    service_running = False
    bus = SystemBus()
    systemd = bus.get_object('org.freedesktop.systemd1', '/org/freedesktop/systemd1')
    manager = Interface(systemd, dbus_interface='org.freedesktop.systemd1.Manager')
    try:
        service_unit = service if service.endswith('.service') else manager.GetUnit('{0}.service'.format(service))
        service_proxy = bus.get_object('org.freedesktop.systemd1', str(service_unit))
        service_properties = Interface(service_proxy, dbus_interface='org.freedesktop.DBus.Properties')
        service_load_state = service_properties.Get('org.freedesktop.systemd1.Unit', 'LoadState')
        service_active_state = service_properties.Get('org.freedesktop.systemd1.Unit', 'ActiveState')
        if service_load_state == 'loaded' and service_active_state == 'active':
            service_running = True
    except DBusException:
        pass

    return service_running
项目:origin-ci-tool    作者:openshift    | 项目源码 | 文件源码
def is_service_running(service):
    """ Queries systemd through dbus to see if the service is running """
    service_running = False
    bus = SystemBus()
    systemd = bus.get_object('org.freedesktop.systemd1', '/org/freedesktop/systemd1')
    manager = Interface(systemd, dbus_interface='org.freedesktop.systemd1.Manager')
    try:
        service_unit = service if service.endswith('.service') else manager.GetUnit('{0}.service'.format(service))
        service_proxy = bus.get_object('org.freedesktop.systemd1', str(service_unit))
        service_properties = Interface(service_proxy, dbus_interface='org.freedesktop.DBus.Properties')
        service_load_state = service_properties.Get('org.freedesktop.systemd1.Unit', 'LoadState')
        service_active_state = service_properties.Get('org.freedesktop.systemd1.Unit', 'ActiveState')
        if service_load_state == 'loaded' and service_active_state == 'active':
            service_running = True
    except DBusException:
        pass

    return service_running
项目:anglerfish    作者:juancarlospaco    | 项目源码 | 文件源码
def on_battery() -> bool:
    """Check if we are running on Battery power."""
    log.debug("Checking if running on Battery power.")
    if has_battery():
        bus, upower_path = dbus.SystemBus(), '/org/freedesktop/UPower'
        upower = bus.get_object('org.freedesktop.UPower', upower_path)
        result = __get_prop(upower, upower_path, 'OnBattery')
        if result is None:  # Cannot read property, something is wrong.
            print(f"Failed to read D-Bus property: {upower_path}.")
            result = False  # Assume we are connected to a power supply.
        return result
    return False

#
# def on_wifi():
#     """Check if we are running on wifi."""
#     try:
#         bus = dbus.SystemBus()
#         manager = bus.get_object('org.freedesktop.NetworkManager',
#                                  '/org/freedesktop/NetworkManager')
#          # FIXME this returns int, I dunno what they mean ?, investigate.
#         return __get_prop(manager, 'org.freedesktop.NetworkManager',
#                          'WirelessEnabled')
#     except Exception:
#         return False
项目:PiRadio    作者:elParaguayo    | 项目源码 | 文件源码
def __init__(self):
        super(ModeBluetooth, self).__init__()

        # Create a basic menu.
        self.menu = [("Show Device", self.show_device)]
        self.build_menu()

        # Disable the bluetooth adaptor
        self.start_bluetooth(False)

        # Initialise dbus for metadata retrieval
        self.bus = dbus.SystemBus()

        # Initialise some basic variables
        self.running = False
        self.player = None
项目:PyDbusNetworkManager    作者:stoic1979    | 项目源码 | 文件源码
def get_introspection(self):
        bus = dbus.SystemBus()
        obj = bus.get_object('org.freedesktop.NetworkManager', '/org/freedesktop/NetworkManager')

        iface = dbus.Interface(obj, dbus_interface='org.freedesktop.DBus.Introspectable')

        # getting introspection xml
        m = iface.get_dbus_method("Introspect", dbus_interface=None)
        return m()
项目:PyDbusNetworkManager    作者:stoic1979    | 项目源码 | 文件源码
def get_devices(self):
        bus = dbus.SystemBus()
        wifi = bus.get_object('org.freedesktop.NetworkManager', '/org/freedesktop/NetworkManager')

        iface = dbus.Interface(wifi, dbus_interface='org.freedesktop.NetworkManager')

        # getting all devices
        m = iface.get_dbus_method("GetDevices", dbus_interface=None)
        devs = []
        for dev in m():
            devs.append("%s" % dev)
        return devs
项目:PyDbusNetworkManager    作者:stoic1979    | 项目源码 | 文件源码
def get_active_connections(self):
        bus = dbus.SystemBus()
        wifi = bus.get_object('org.freedesktop.NetworkManager', '/org/freedesktop/NetworkManager')

        iface = dbus.Interface(wifi, dbus_interface='org.freedesktop.DBus.Properties')

        m = iface.get_dbus_method("Get", dbus_interface=None)
        return [ str(ac) for ac in m("org.freedesktop.NetworkManager", "ActiveConnections") ]
项目:PyDbusNetworkManager    作者:stoic1979    | 项目源码 | 文件源码
def get_wifi_access_points_by_dev(self, device_path):
        bus = dbus.SystemBus()
        obj = bus.get_object('org.freedesktop.NetworkManager', device_path)

        iface = dbus.Interface(obj, dbus_interface='org.freedesktop.NetworkManager.Device.Wireless')

        # getting all wireless access points
        m = iface.get_dbus_method("GetAccessPoints", dbus_interface=None)

        return [str(ap) for ap in m()]
项目:PyDbusNetworkManager    作者:stoic1979    | 项目源码 | 文件源码
def get_access_point_all_info(self, ap_path):

        bus = dbus.SystemBus()
        obj = bus.get_object('org.freedesktop.NetworkManager', ap_path)

        iface = dbus.Interface(obj, dbus_interface='org.freedesktop.DBus.Properties')

        m = iface.get_dbus_method("GetAll", dbus_interface=None)

        # getting all ppoperties like Ssid, Strength, HwAddress etc.
        props = m("org.freedesktop.NetworkManager.AccessPoint")
        for k,v in props.iteritems():
            print k,v

        return props
项目:PyDbusNetworkManager    作者:stoic1979    | 项目源码 | 文件源码
def activate_connection(self, con_path, dev_path, obj_path):
        bus = dbus.SystemBus()
        wifi = bus.get_object('org.freedesktop.NetworkManager', '/org/freedesktop/NetworkManager')

        iface = dbus.Interface(wifi, dbus_interface='org.freedesktop.NetworkManager')

        # activating connection
        m = iface.get_dbus_method("ActivateConnection", dbus_interface=None)
        active_connection = m(con_path, dev_path, obj_path)

        # on success, active connection object/path is returned
        return active_connection
项目:co2monitor    作者:nobodyinperson    | 项目源码 | 文件源码
def __init__(self):
        # initially set the standard logger
        self.set_logger(logging.getLogger(__name__))
        # initially set an empty configuration
        self.set_config(configparser.ConfigParser())
        # set up the quit signals
        self.setup_signals(
            signals = [signal.SIGINT, signal.SIGTERM, signal.SIGHUP],
            handler = self.please_stop_now
        )

        # use glib as default mailoop for dbus
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        dbus.mainloop.glib.threads_init()
        GLib.threads_init()

        self.loop = GLib.MainLoop() # create mainloop

        self.systembus = dbus.SystemBus() # the system bus
        self.systembus.request_name(CO2MONITOR_BUSNAME) # request the bus name
        bus_name = dbus.service.BusName(CO2MONITOR_BUSNAME, self.systembus) # create bus name
        # register the object on the bus name
        dbus.service.Object.__init__(self, bus_name, CO2MONITOR_OBJECTPATH)
项目:co2monitor    作者:nobodyinperson    | 项目源码 | 文件源码
def __init__(self, devicefile):
        # by default, log!
        self.do_data_logging = True
        # initially set the standard logger
        self.set_logger(logging.getLogger(__name__))
        # initially set an empty configuration
        self.set_config(configparser.ConfigParser())

        # use glib as default mailoop for dbus
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        dbus.mainloop.glib.threads_init()
        GLib.threads_init()

        self.systembus = dbus.SystemBus() # the system bus
        self.systembus.request_name(CO2MONITOR_BUSNAME) # request the bus name
        bus_name = dbus.service.BusName(CO2MONITOR_BUSNAME, self.systembus) # create bus name

        self.devicefile = devicefile

        # set up the device
        self.device = device.co2device(self.devicefile)

        # register the object on the bus name
        objectpath = "/".join([CO2MONITOR_OBJECTPATH,
            utils.devicefile2objectname(self.devicefile)])
        dbus.service.Object.__init__(self, bus_name, objectpath)
        self.update_status(_("idle"))
        threading.Thread.__init__(self)


    # set the config
项目:sysdweb    作者:ogarcia    | 项目源码 | 文件源码
def __init__(self, user=False):
        self.bus = SessionBus() if user else SystemBus()
        systemd = self.bus.get_object(SYSTEMD_BUSNAME, SYSTEMD_PATH)
        self.manager = Interface(systemd, dbus_interface=SYSTEMD_MANAGER_INTERFACE)
项目:OpenXCAccessory    作者:openxc    | 项目源码 | 文件源码
def get_managed_objects():
    bus = dbus.SystemBus()
    manager = dbus.Interface(bus.get_object("org.bluez", "/"),
                "org.freedesktop.DBus.ObjectManager")
    return manager.GetManagedObjects()
项目:OpenXCAccessory    作者:openxc    | 项目源码 | 文件源码
def find_adapter_in_objects(objects, pattern=None):
    bus = dbus.SystemBus()
    for path, ifaces in objects.iteritems():
        adapter = ifaces.get(ADAPTER_INTERFACE)
        if adapter is None:
            continue
        if not pattern or pattern == adapter["Address"] or \
                            path.endswith(pattern):
            obj = bus.get_object(SERVICE_NAME, path)
            return dbus.Interface(obj, ADAPTER_INTERFACE)
    raise Exception("Bluetooth adapter not found")
项目:OpenXCAccessory    作者:openxc    | 项目源码 | 文件源码
def get_managed_objects():
    bus = dbus.SystemBus()
    manager = dbus.Interface(bus.get_object("org.bluez", "/"),
                "org.freedesktop.DBus.ObjectManager")
    return manager.GetManagedObjects()
项目:OpenXCAccessory    作者:openxc    | 项目源码 | 文件源码
def find_adapter_in_objects(objects, pattern=None):
    bus = dbus.SystemBus()
    for path, ifaces in objects.iteritems():
        adapter = ifaces.get(ADAPTER_INTERFACE)
        if adapter is None:
            continue
        if not pattern or pattern == adapter["Address"] or \
                            path.endswith(pattern):
            obj = bus.get_object(SERVICE_NAME, path)
            return dbus.Interface(obj, ADAPTER_INTERFACE)
    raise Exception("Bluetooth adapter not found")
项目:nautilus_scripts    作者:SergKolo    | 项目源码 | 文件源码
def get_dbus( bus_type, obj, path, interface, method, arg):
    """ utility: executes dbus method on specific interface"""
    if bus_type == "session":
        bus = dbus.SessionBus()
    if bus_type == "system":
        bus = dbus.SystemBus()
    proxy = bus.get_object(obj, path)
    method = proxy.get_dbus_method(method, interface)
    if arg:
        return method(arg)
    else:
        return method()
项目:nautilus_scripts    作者:SergKolo    | 项目源码 | 文件源码
def get_dbus_property( bus_type, obj, path, iface, prop):
    """ utility:reads properties defined on specific dbus interface"""
    if bus_type == "session":
        bus = dbus.SessionBus()
    if bus_type == "system":
        bus = dbus.SystemBus()
    proxy = bus.get_object(obj, path)
    aux = 'org.freedesktop.DBus.Properties'
    props_iface = dbus.Interface(proxy, aux)
    try:
        props = props_iface.Get(iface, prop)
        return props
    except:
        return None
项目:python-gatt-server    作者:Jumperr-labs    | 项目源码 | 文件源码
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-a', '--adapter-name', type=str, help='Adapter name', default='')
    args = parser.parse_args()
    adapter_name = args.adapter_name

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()
    mainloop = GObject.MainLoop()

    advertising.advertising_main(mainloop, bus, adapter_name)
    gatt_server.gatt_server_main(mainloop, bus, adapter_name)
    mainloop.run()
项目:cpu-g    作者:atareao    | 项目源码 | 文件源码
def __init__(self):
        bus = dbus.SystemBus()
        bat0_object = bus.get_object(
            'org.freedesktop.UPower',
            '/org/freedesktop/UPower/devices/battery_BAT0')
        self.__statistics = bat0_object.get_dbus_method(
            'GetStatistics',
            'org.freedesktop.UPower.Device')
        self.__history = bat0_object.get_dbus_method(
            'GetHistory',
            'org.freedesktop.UPower.Device')
        self.bat0 = dbus.Interface(bat0_object,
                                   'org.freedesktop.DBus.Properties')
项目:python-eduvpn-client    作者:eduvpn    | 项目源码 | 文件源码
def setup_signals(self):
        if not self.setup:
            bus = dbus.SystemBus()
            for interface in self.interfaces:
                bus.add_signal_receiver(self.handle_signal, dbus_interface=interface, interface_keyword='interface',
                                        member_keyword='signal', path_keyword='path')
            self.setup = True
        self.listen_for_restarts()