我们从Python开源项目中,提取了以下5个代码示例,用于说明如何使用gi.repository.Gio.bus_get_sync()。
def __init__(self): self.logger = logging.getLogger('rauc_hawkbit') self.dbus_events = asyncio.Queue() loop = asyncio.get_event_loop() # handle dbus events in async way self.dbus_event_task = loop.create_task(self.handle_dbus_event()) # holds active subscriptions self.signal_subscriptions = [] # ({interface}, {signal}): {callback} self.signal_callbacks = {} # ({interface}, {property}): {callback} self.property_callbacks = {} self.system_bus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None) # always subscribe to property changes by default self.new_signal_subscription('org.freedesktop.DBus.Properties', 'PropertiesChanged', self.property_changed_callback)
def __init__(self, name, dbus_name, skeleton): self._cb_registery = dict() self._obj_registery = dict() self._dbus_name = build_dbus_name(dbus_name, name) self._interface_skeleton_object = skeleton self.con = Gio.bus_get_sync(BUS_TYPE, None) dbus_path = BASE_DBUS_PATH # build_dbus_path(name) logger.debug('Producer ObjectManagerServer dbus: %s, %s', dbus_path, self._dbus_name) self._object_manager = Gio.DBusObjectManagerServer(object_path=dbus_path) self._object_manager.set_connection(self.con) Gio.bus_own_name_on_connection( self.con, self._dbus_name, Gio.BusNameOwnerFlags.NONE, None, None)
def __init__(self, app, ui): self.__app = app self.__ui = ui self.__rating = None self.__cozy_id = 0 self.__metadata = {"mpris:trackid": GLib.Variant( "o", "/org/mpris/MediaPlayer2/TrackList/NoTrack")} self.__track_id = self.__get_media_id(0) self.__bus = Gio.bus_get_sync(Gio.BusType.SESSION, None) Gio.bus_own_name_on_connection(self.__bus, self.__MPRIS_COZY, Gio.BusNameOwnerFlags.NONE, None, None) Server.__init__(self, self.__bus, self.__MPRIS_PATH) bus = get_gst_bus() bus.connect("message", self.__on_gst_message) #Lp().player.connect("current-changed", self.__on_current_changed) #Lp().player.connect("seeked", self.__on_seeked) #Lp().player.connect("status-changed", self.__on_status_changed) #Lp().player.connect("volume-changed", self.__on_volume_changed)
def __init__(self): self.con = Gio.bus_get_sync(Gio.BusType.SESSION, None) Gio.bus_own_name_on_connection( self.con, 'com.endlessm.EknSubscriptionsDownloader', Gio.BusNameOwnerFlags.NONE, None, None) self.con.register_object( object_path='/com/endlessm/EknSubscriptionsDownloader', interface_info=IFACE_INFO, method_call_closure=self._on_method_call) self._consumer = Consumer(name=SUBSCRIPTIONS_INSTALLED) self._consumer.connect('data', self._on_data) # FIXME: Port to GNotification instead self._notification = Notify.Notification.new("", "")
def __init__(self): self.bus = Gio.bus_get_sync(Gio.BusType.SESSION, None) self.proxy = Gio.DBusProxy.new_sync( self.bus, Gio.DBusProxyFlags.NONE, None, 'org.gnome.Mutter.DisplayConfig', '/org/gnome/Mutter/DisplayConfig', 'org.gnome.Mutter.DisplayConfig', None) self.get_resources()