我们从 Python 开源项目中提取了以下 27 个代码示例,用于说明如何使用 gobject.MainLoop()。
def __init__(self, device_id=None):
    """Set up the main loop, the D-Bus registration and the service list.

    The work is done in three steps:

    1. Enable threading support, install the GLib main loop as the default
       D-Bus main loop and create a ``GObject.MainLoop``.
    2. Connect to the system bus and export this object under the
       ``ukBaz.bluezero`` bus name at a per-instance object path.
    3. Start with an empty list of services and create the adapter.

    :param device_id: Passed unchanged to ``adapter.Adapter``.
    """
    # Step 1: loop and threading set-up.
    GObject.threads_init()
    dbus.mainloop.glib.threads_init()
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    self.mainloop = GObject.MainLoop()

    # Step 2: the object path embeds id(self), so every instance gets
    # its own path on the bus.
    system_bus = dbus.SystemBus()
    self.bus = system_bus
    object_path = '/ukBaz/bluezero/application{}'.format(id(self))
    self.path = object_path
    owned_name = dbus.service.BusName('ukBaz.bluezero', system_bus)
    self.bus_name = owned_name
    dbus.service.Object.__init__(self, owned_name, object_path)

    # Step 3: nothing is offered yet.
    self.services = []
    self.dongle = adapter.Adapter(device_id)
def __init__(self, adapter_addr, device_addr):
    """Create an object for a remote Bluetooth device on a given adapter.

    :param adapter_addr: Address of the local Bluetooth adapter.
    :param device_addr: Address of the remote Bluetooth device.
    """
    # Install the GLib main loop as the D-Bus default *before* opening the
    # bus connection: a connection created first is not attached to the
    # loop, so signals and asynchronous replies would never be dispatched.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    self.bus = dbus.SystemBus()
    self.mainloop = GObject.MainLoop()

    # Resolve the D-Bus object path of the remote device.
    device_path = dbus_tools.get_dbus_path(adapter_addr, device_addr)
    self.remote_device_path = device_path

    # One proxy object, viewed through the device interface and through
    # the generic properties interface.
    self.remote_device_obj = self.bus.get_object(
        constants.BLUEZ_SERVICE_NAME,
        self.remote_device_path)
    self.remote_device_methods = dbus.Interface(
        self.remote_device_obj,
        constants.DEVICE_INTERFACE)
    self.remote_device_props = dbus.Interface(self.remote_device_obj,
                                              dbus.PROPERTIES_IFACE)
def main():
    """Start the Bluetooth and IBUS services and block in the GLib main loop.

    The services are stored in the module globals ``bluetooth`` and ``ibus``.
    The IBUS service runs ``ibus.start`` on a daemon thread.  When the main
    loop ends, ``shutdown()`` is called and the process exits with status 0.
    """
    global bluetooth
    global ibus

    bluetooth = bt_.BluetoothService(onBluetoothConnected, onPlayerChanged)

    ibus = ibus_.IBUSService(onIBUSready, onIBUSpacket)
    ibus.cmd = ibus_.IBUSCommands(ibus)

    # Daemon thread: it must not keep the interpreter alive on exit.
    ibus.main_thread = threading.Thread(target=ibus.start)
    ibus.main_thread.daemon = True
    ibus.main_thread.start()

    try:
        mainloop = GObject.MainLoop()
        mainloop.run()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop.
        pass
    except Exception as exc:
        # Only real errors are reported; SystemExit is no longer swallowed
        # and mis-reported as a main-loop failure.
        print("Unable to run the gobject main loop")
        print(exc)
        print("")
    finally:
        # Clean up on every exit path, including SystemExit.
        shutdown()

    sys.exit(0)
def main():
    """Start the IBUS service and block in the GLib main loop.

    The service is stored in the module global ``ibus`` and runs
    ``ibus.start`` on a daemon thread.  When the main loop ends,
    ``shutdown()`` is called and the process exits with status 0.
    """
    global ibus

    ibus = ibus_.IBUSService(onIBUSready, onIBUSpacket)
    ibus.cmd = ibus_.IBUSCommands(ibus)

    # Daemon thread: it must not keep the interpreter alive on exit.
    ibus.main_thread = threading.Thread(target=ibus.start)
    ibus.main_thread.daemon = True
    ibus.main_thread.start()

    try:
        mainloop = GObject.MainLoop()
        mainloop.run()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop.
        pass
    except Exception as exc:
        # Only real errors are reported; SystemExit is no longer swallowed
        # and mis-reported as a main-loop failure.
        print("Unable to run the gobject main loop")
        print(exc)
        print("")
    finally:
        # Clean up on every exit path, including SystemExit.
        shutdown()

    sys.exit(0)
def run(self):
    """Register with D-Bus and block in the main loop until it stops.

    Returns immediately when a main loop is already set on the instance.
    A ``KeyboardInterrupt`` is accepted as a regular way to stop; any other
    ``Exception`` is logged.  Whatever happens inside the loop, the
    registration is undone and the loop reference is cleared afterwards.
    """
    if self._main_loop:
        # A loop is already set: nothing to do.
        return

    self._main_loop = GObject.MainLoop()

    self._disconnect_all()
    self._register()

    logger.info("--- Mainloop started ---")
    try:
        self._main_loop.run()
    except KeyboardInterrupt:
        # Valid way to leave the program; fall through to the clean-up.
        pass
    except Exception as error:
        logger.error(error)
    finally:
        logger.info("--- Mainloop finished ---")
        self._unregister()
        self._main_loop.quit()
        self._main_loop = None
def __init__(self, useGtk=True):
    """Bind the reactor's loop primitives to either glib or gtk.

    The default glib context and a ``gobject.MainLoop`` are always created.
    The four private hooks (pending, iteration, crash, run) come from glib
    only when the installed pygtk is at least 2.3.91 and ``useGtk`` is
    false; otherwise they come from ``gtk``.

    :param useGtk: When true, always use the ``gtk`` main-loop functions.
    """
    self.context = gobject.main_context_default()
    self.loop = gobject.MainLoop()
    posixbase.PosixReactorBase.__init__(self)

    # pre 2.3.91 the glib iteration and mainloop functions didn't release
    # global interpreter lock, thus breaking thread and signal support.
    glib_usable = (hasattr(gobject, "pygtk_version")
                   and gobject.pygtk_version >= (2, 3, 91))

    if glib_usable and not useGtk:
        self.__pending = self.context.pending
        self.__iteration = self.context.iteration
        self.__crash = self.loop.quit
        self.__run = self.loop.run
    else:
        # gtk is imported only on the path that needs it.
        import gtk
        self.__pending = gtk.events_pending
        self.__iteration = gtk.main_iteration
        self.__crash = _our_mainquit
        self.__run = gtk.main

# The input_add function in pygtk1 checks for objects with a
# 'fileno' method and, if present, uses the result of that method
# as the input source. The pygtk2 input_add does not do this. The
# function below replicates the pygtk1 functionality.

# In addition, pygtk maps gtk.input_add to _gobject.io_add_watch, and
# g_io_add_watch() takes different condition bitfields than
# gtk_input_add(). We use g_io_add_watch() here in case pygtk fixes this
# bug.
def main():
    """Parse the adapter name, start advertising and GATT, run the loop.

    The only command-line option is ``-a`` / ``--adapter-name`` (default:
    empty string).  The same main loop, system bus and adapter name are
    handed to the advertising and GATT-server entry points before the
    loop is run.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument('-a', '--adapter-name', type=str,
                     help='Adapter name', default='')
    adapter_name = cli.parse_args().adapter_name

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()
    loop = GObject.MainLoop()

    advertising.advertising_main(loop, system_bus, adapter_name)
    gatt_server.gatt_server_main(loop, system_bus, adapter_name)

    loop.run()
def __init__(self):
    """Create the main loop and the interfaces of the default adapter.

    The default adapter path is obtained from ``org.bluez.Manager`` on the
    root object.  The adapter object is then exposed through both the
    ``org.bluez.Adapter`` and the ``org.bluez.OutOfBand`` interfaces.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    self.mainloop = gobject.MainLoop()

    system_bus = dbus.SystemBus()

    # Ask the manager on the root object for the default adapter.
    root_obj = system_bus.get_object("org.bluez", "/")
    bluez_manager = dbus.Interface(root_obj, "org.bluez.Manager")
    default_adapter_path = bluez_manager.DefaultAdapter()

    # Two interface views of the same adapter object.
    adapter_obj = system_bus.get_object("org.bluez", default_adapter_path)
    self.adapter = dbus.Interface(adapter_obj, "org.bluez.Adapter")
    self.oob_adapter = dbus.Interface(adapter_obj, "org.bluez.OutOfBand")
def __init__(self):
    """Create the main loop and the interfaces of the default adapter.

    The default adapter path is obtained from ``org.bluez.Manager`` on the
    root object.  The adapter object is then exposed through both the
    ``org.bluez.Adapter`` and the ``org.bluez.OutOfBand`` interfaces.
    """
    # Make sure the GLib main loop is the D-Bus default before the bus
    # connection is opened; otherwise nothing received on the connection
    # is dispatched by ``self.mainloop``.  The call is safe to repeat if
    # the caller has already done it.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    self.mainloop = gobject.MainLoop()

    bus = dbus.SystemBus()

    # Ask the manager on the root object for the default adapter.
    proxy = bus.get_object("org.bluez", "/")
    manager = dbus.Interface(proxy, "org.bluez.Manager")
    adapter_path = manager.DefaultAdapter()

    # Two interface views of the same adapter object.
    proxy = bus.get_object("org.bluez", adapter_path)
    self.adapter = dbus.Interface(proxy, "org.bluez.Adapter")
    self.oob_adapter = dbus.Interface(proxy, "org.bluez.OutOfBand")
def run(self):
    """Export the service on the session bus and block in the main loop.

    The object is registered under the ``com.prozacville.steam_monitor``
    bus name at ``/com/prozacville/steam_monitor``.  A status line is
    printed before the loop starts and after it returns.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus_name = dbus.service.BusName("com.prozacville.steam_monitor",
                                    dbus.SessionBus())
    dbus.service.Object.__init__(self, bus_name,
                                 "/com/prozacville/steam_monitor")

    self._loop = gobject.MainLoop()

    # Parenthesised single-argument form: prints the same text under
    # Python 2 and is valid syntax under Python 3.
    print("Service running...")
    self._loop.run()
    print("Service stopped")
def __init__(self):
    """Connect to the system bus and initialise the Wi-Fi bookkeeping.

    The instance starts in ``OFF_STATE`` with no current SSID, an empty
    callback table, a fresh ``WiFiControl`` and a reconnect worker named
    by ``WORKER_NAME``.
    """
    # D-Bus / GLib plumbing.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    self.bus = dbus.SystemBus()
    self._mainloop = GObject.MainLoop()

    # Wi-Fi control and state tracking.
    self.wifi_manager = WiFiControl()
    self.callbacks = dict()
    self.current_state = self.OFF_STATE
    self.current_ssid = None

    self.reconnect_worker = DaemonTreeObj(WORKER_NAME)
def __init__(self,
             client_class,
             timeout=180,
             capability="KeyboardDisplay",
             path="/org/bluez/my_bluetooth_agent"):
    """Store the agent settings and prepare the bus and the main loop.

    All four arguments are stored unchanged on the instance under the same
    names.  Discoverability is switched off at the end of construction.

    :param client_class: Stored as ``self.client_class``.
    :param timeout: Stored as ``self.timeout``.
    :param capability: Stored as ``self.capability``.
    :param path: Stored as ``self.path``.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    # Plain configuration, kept exactly as given.
    self.client_class = client_class
    self.timeout = timeout
    self.capability = capability
    self.path = path

    # Bus connection and loop.
    self._bus = dbus.SystemBus()
    self._mainloop = GObject.MainLoop()

    _bluetooth.make_discoverable(False)
def __init__(self, tcp_port_in=8043, tcp_port_out=None, channel=1):
    """Create the serial port profile and export this object on D-Bus.

    The object is exported on the system bus at the profile path of the
    ``SerialPort`` created for ``channel``.

    :param tcp_port_in: Stored as ``self.tcp_port_in``.
    :param tcp_port_out: Stored as ``self.tcp_port_out``.
    :param channel: Passed to ``SerialPort``.
    """
    self._spp = SerialPort(channel)

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()
    export_path = self._spp.profile_path
    dbus.service.Object.__init__(self, system_bus, export_path)

    self.tcp_port_in = tcp_port_in
    self.tcp_port_out = tcp_port_out

    self._mainloop = GObject.MainLoop()
def main():
    """Register the LED GATT application and advertisement, then run.

    ``mainloop`` and ``display`` are module globals set here.  Both
    registrations are asynchronous: their outcome is delivered to the
    reply and error callbacks.  On ``KeyboardInterrupt`` the display is
    cleared and written once more.
    """
    global mainloop, display

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()

    # Get ServiceManager and AdvertisingManager
    gatt_manager = get_service_manager(system_bus)
    adv_manager = get_ad_manager(system_bus)

    # Create gatt services
    display = setup_display()
    led_app = LedApplication(system_bus, display)

    # Create advertisement
    led_advert = LedAdvertisement(system_bus, 0)

    mainloop = GObject.MainLoop()

    # Register gatt services
    gatt_manager.RegisterApplication(
        led_app.get_path(), {},
        reply_handler=register_app_cb,
        error_handler=register_app_error_cb)

    # Register advertisement
    adv_manager.RegisterAdvertisement(
        led_advert.get_path(), {},
        reply_handler=register_ad_cb,
        error_handler=register_ad_error_cb)

    try:
        mainloop.run()
    except KeyboardInterrupt:
        # Leave the display blank on Ctrl-C.
        display.clear()
        display.write_display()
def bluetoothConnection():
    """Register a profile with BlueZ and block in the GLib main loop.

    A ``Profile`` is created at ``/foo/bar/profile`` and registered through
    ``org.bluez.ProfileManager1`` under UUID ``1101`` with ``AutoConnect``
    disabled.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()

    bluez_obj = system_bus.get_object(BUS_NAME, "/org/bluez")
    manager = dbus.Interface(bluez_obj, "org.bluez.ProfileManager1")

    profile_path = "/foo/bar/profile"
    options = {"AutoConnect": False}
    uuid = "1101"

    # NOTE(review): the local is otherwise unused; presumably it only
    # keeps the profile object referenced while the loop runs - confirm.
    profile = Profile(system_bus, profile_path)

    manager.RegisterProfile(profile_path, uuid, options)

    loop = GObject.MainLoop()
    loop.run()
def bluetoothConnection():
    """Register a profile with BlueZ and block in the GLib main loop.

    A ``Profile`` is created at ``/foo/bar/profile`` and registered through
    ``org.bluez.ProfileManager1`` under UUID ``1101`` with ``AutoConnect``
    disabled.  ``closeThreads()`` is called when the loop is interrupted
    by ``KeyboardInterrupt`` or ``SystemExit``.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()

    bluez_obj = system_bus.get_object(BUS_NAME, "/org/bluez")
    manager = dbus.Interface(bluez_obj, "org.bluez.ProfileManager1")

    profile_path = "/foo/bar/profile"
    options = {"AutoConnect": False}
    uuid = "1101"

    # NOTE(review): the local is otherwise unused; presumably it only
    # keeps the profile object referenced while the loop runs - confirm.
    profile = Profile(system_bus, profile_path)

    manager.RegisterProfile(profile_path, uuid, options)

    loop = GObject.MainLoop()
    try:
        loop.run()
    except (KeyboardInterrupt, SystemExit):
        closeThreads()
def _listen_for_wifi_state_changes(self):
    """Track NetworkManager state changes as a ``WifiConnectionState``.

    The current NetworkManager state is evaluated once straight away, and
    the same handler is then subscribed to further state changes.  This
    method does not start a main loop itself.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    def print_status(status, nm_state):
        # The SSID is only included while connecting or connected.
        with_ssid = status in (WifiConnectionState.CONNECTING,
                               WifiConnectionState.CONNECTED)
        if with_ssid:
            message = "Wifi status changed: %s (%d) to %s" % (
                status, nm_state, self._current_ssid)
        else:
            message = "Wifi status changed: %s (%d)" % (status, nm_state)
        logger.info(message)

    def on_state_changed(nm_instance, nm_state, **kwargs):
        # Map the numeric NetworkManager state onto three coarse states.
        if nm_state >= NetworkManager.NM_STATE_CONNECTED_GLOBAL:
            new_status = WifiConnectionState.CONNECTED
        elif nm_state > NetworkManager.NM_STATE_DISCONNECTING:
            new_status = WifiConnectionState.CONNECTING
        else:
            new_status = WifiConnectionState.DISCONNECTED

        # The SSID is refreshed on every notification, changed or not.
        self._update_current_ssid()

        if new_status != self._wifi_status:
            print_status(new_status, nm_state)
            self._wifi_status = new_status
            self._on_wifi_status_changed()

    # check initial status:
    initial_state = NetworkManager.NetworkManager.State
    on_state_changed(None, initial_state)

    # listen for changes:
    NetworkManager.NetworkManager.OnStateChanged(on_state_changed)
    logger.debug("Start listening to network status changes")

    # Attention: a GObject.MainLoop() is required for this to work
    # in this case it is started by the BLE Peripheral object
def run(self):
    """Report the current network state, subscribe to changes and block.

    ``self._on_state_changed`` is called once with the current
    NetworkManager state and then subscribed to further state changes.
    The method then runs a new ``GObject.MainLoop``.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    # check initial status:
    current_state = NetworkManager.NetworkManager.State
    self._on_state_changed(None, current_state)

    # listen for changes:
    NetworkManager.NetworkManager.OnStateChanged(self._on_state_changed)
    logger.debug("Start listening to network status changes")

    GObject.MainLoop().run()
def run(cls, store, master_options):
    """Run the bus service until interrupted, then shut down in two stages.

    First interrupt (``KeyboardInterrupt`` or ``SystemExit``): if clients
    are still registered, the scheduler is stopped, the agents are asked
    to exit, the state is stored and the loop is run again to wait for
    them.  Second interrupt: give up waiting.  The store state is always
    saved at the end.

    :param store: Storage object handed to the service; also used to save
        state.
    :param master_options: Not used in this function.
    """
    gobject.threads_init()
    dbus.glib.init_threads()
    DBusGMainLoop(set_as_default=True)

    session_bus = dbus.SessionBus()
    # The bus name object is held in a local for the duration of the call.
    name = dbus.service.BusName("com.airbus.rebus.bus", session_bus)
    svc = cls(session_bus, "/bus", store)
    svc.mainloop = gobject.MainLoop()

    stop_signals = (KeyboardInterrupt, SystemExit)

    log.info("Entering main loop.")
    try:
        svc.mainloop.run()
    except stop_signals:
        if len(svc.clients) > 0:
            log.info("Trying to stop all agents properly. Press Ctrl-C "
                     "again to stop.")
            # stop scheduler
            svc.sched.shutdown()
            # ask slave agents to shutdown nicely & save internal state
            log.info("Expecting %u more agents to exit (ex. %s)",
                     len(svc.clients), svc.clients.keys()[0])
            svc.bus_exit(store.STORES_INTSTATE)
            store.store_state()
            try:
                # Second run: wait for the agents to leave.
                svc.mainloop.run()
            except stop_signals:
                if len(svc.clients) > 0:
                    log.info(
                        "Not all agents have stopped, exiting nonetheless")

    log.info("Stopping storage...")
    store.store_state()
def run_agents(self):
    """Run the agent, then serve bus events until interrupted.

    If the agent class overrides ``Agent.run`` the agent is unregistered
    straight after its run and nothing else happens.  Otherwise a main
    loop is run; on ``KeyboardInterrupt`` or ``SystemExit`` the locks held
    by the agent are released and the loop is quit.  Afterwards the three
    signal receivers are removed, the agent is unregistered and its
    internal state is saved.
    """
    self.agent.run_and_catch_exc()

    overrides_run = self.agent.__class__.run != Agent.run
    if overrides_run:
        # the run() method has been overridden - agent will run on his own
        # then quit
        self.iface.unregister(self.agent_id)
        return

    log.info("Entering agent loop")
    self.loop = gobject.MainLoop()
    try:
        self.loop.run()
    except (KeyboardInterrupt, SystemExit):
        for lock_args in self.agent.held_locks:
            self.agent.unlock(*lock_args)
        self.loop.quit()

    # Clean up signals - useful for tests, where one process runs several
    # agents successively
    receivers = (
        (self.broadcast_wrapper, "new_descriptor"),
        (self.targeted_wrapper, "targeted_descriptor"),
        (self.bus_exit_handler, "bus_exit"),
    )
    for handler, signal in receivers:
        self.bus.remove_signal_receiver(
            handler,
            dbus_interface="com.airbus.rebus.bus",
            signal_name=signal)

    self.iface.unregister(self.agent_id)
    self.agent.save_internal_state()

# DBus specific functions
def main_loop(self):
    """Create and return a new ``gobject.MainLoop``."""
    loop = gobject.MainLoop()
    return loop
def run(self):
    """Start the hands-free headset service and block in the main loop.

    A Bluez5 adapter, an Ofono hands-free profile, an ALSA audio device
    and the headset itself are opened as nested context managers.  Four
    event handlers of this object are attached to the headset before it is
    started and made pairable for 30 seconds.  The main loop is run inside
    the ``with`` block.
    """
    ipc_bus = phony.base.ipc.BusProvider()

    # Find the first audio card that provides
    # audio input and output mixers.
    card = -1

    with phony.bluetooth.adapters.Bluez5(ipc_bus) as bt_adapter, \
         phony.bluetooth.profiles.handsfree.Ofono(ipc_bus) as hands_free, \
         phony.audio.alsa.Alsa(card_index=card) as sound, \
         phony.headset.HandsFreeHeadset(
             ipc_bus, bt_adapter, hands_free, sound) as headset:

        # Register to receive some bluetooth events
        headset.on_device_connected(self.device_connected)
        headset.on_incoming_call(self.incoming_call)
        headset.on_call_began(self.call_began)
        headset.on_call_ended(self.call_ended)

        headset.start('MyBluetoothHeadset', pincode='1234')
        headset.enable_pairability(timeout=30)

        # Kept on the instance so other methods can reach the headset.
        self._hs = headset

        # Wait forever
        gobject.MainLoop().run()

#
# Call these from your event handlers
#
def __init__(self, service):
    """Initialise the base main loop and remember ``service``.

    :param service: Stored on the instance in a private attribute.
    """
    super(MainLoop, self).__init__()

    self.__service = service
def run(self):
    """Install a watch on file descriptor 0, then run the base loop.

    The private stdin callback is invoked when descriptor 0 has input
    available or is hung up.
    """
    stdin_fd = 0
    conditions = gobject.IO_IN | gobject.IO_HUP
    gobject.io_add_watch(stdin_fd, conditions, self.__stdin_cb)

    super(MainLoop, self).run()
def __init__(self):
    """Build the demo window, its tray icon and the tray menu.

    The window holds a single horizontal slider (range 0-100, initial
    value 100) connected to ``set_window_alpha``.  After the window is
    realized and shown, the win32 extension is installed, a notify icon
    is added, a three-entry GTK menu is attached to it and
    ``WM_TRAYMESSAGE`` is routed to ``on_notifyicon_activity``.
    """
    self.main_loop = gobject.MainLoop()

    # Create a window with a horizontal scale.
    self.wnd = gtk.Window()
    self.wnd.set_default_size(640, 480)
    self.wnd.set_title('Have fun with the transparency slider')

    slider = gtk.HScale()
    slider.set_digits(0)
    slider.set_increments(1, 10)
    slider.set_range(0, 100)
    slider.set_value(100)
    slider.connect('value_changed', self.set_window_alpha)
    self.wnd.add(slider)

    # Note: gtk window must be realized before installing extensions.
    self.wnd.realize()
    self.wnd.show_all()

    self.win32ext = GTKWin32Ext(self.wnd)
    self.win32ext.add_notify_icon()

    # GTK menus from the notify icon!
    tray_menu = gtk.Menu()

    # The first entry hands the window itself to its callback.
    balloon_entry = gtk.MenuItem('Baloons!')
    balloon_entry.connect_object('activate', self.menu_cb, self.wnd)
    tray_menu.append(balloon_entry)

    fadeout_entry = gtk.MenuItem('Fadeout Window')
    fadeout_entry.connect('activate', self.fadeoutwindow)
    tray_menu.append(fadeout_entry)

    fadein_entry = gtk.MenuItem('Window Disappeared?')
    fadein_entry.connect('activate', self.fadeinwindow)
    tray_menu.append(fadein_entry)

    tray_menu.show_all()
    self.win32ext.notify_icon.menu = tray_menu

    # Set up the callback messages
    self.win32ext.message_map({
        WM_TRAYMESSAGE: self.on_notifyicon_activity
    })