我们从Python开源项目中,提取了以下37个代码示例,用于说明如何使用gi.repository.GLib.MainLoop()。
def setup_signals(self, signals, handler):
    """
    Attach ``handler`` to each of the given unix signals through GLib.

    This is a workaround to signal.signal(signal, handler), which does not
    work with a GLib.MainLoop() for some reason.
    Thanks to: http://stackoverflow.com/a/26457317/5433146

    args:
        signals (list of signal.SIG... signals): the signals to connect to
        handler (function): function to be executed on these signals; it
            is handed the signal number as its argument
    """
    def register_unix_signal(signum):
        # the signal number doubles as the user data passed to handler
        GLib.unix_signal_add(GLib.PRIORITY_HIGH, signum, handler, signum)

    # every registration is scheduled as a high-priority idle callback
    for signum in signals:
        GLib.idle_add(
            register_unix_signal,
            signum,
            priority=GLib.PRIORITY_HIGH,
        )
def setup_signals(self, signals, handler):
    """
    Attach ``handler`` to each of the given unix signals through GLib.

    This is a workaround to signal.signal(signal, handler), which does not
    work with a GLib.MainLoop() for some reason.
    Thanks to: http://stackoverflow.com/a/26457317/5433146

    args:
        signals (list of signal.SIG... signals): the signals to connect to
        handler (function): function to be executed on these signals; it
            is handed the signal number as its argument
    """
    def register_unix_signal(signum):
        # the signal number doubles as the user data passed to handler
        GLib.unix_signal_add(GLib.PRIORITY_HIGH, signum, handler, signum)

    # every registration is scheduled as a high-priority idle callback
    for signum in signals:
        GLib.idle_add(
            register_unix_signal,
            signum,
            priority=GLib.PRIORITY_HIGH,
        )
def __init__(self):
    """Class constructor: empty config, quit signals and the main loop."""
    # start out with an empty configuration
    empty_config = configparser.ConfigParser()
    self.set_config(empty_config)

    # SIGINT, SIGTERM and SIGHUP are all routed to self.quit
    quit_signals = [signal.SIGINT, signal.SIGTERM, signal.SIGHUP]
    self.setup_signals(signals=quit_signals, handler=self.quit)

    # can't use Gtk.main() because of a bug that prevents proper SIGINT
    # handling. use GLib.MainLoop() directly instead.
    self.mainloop = GLib.MainLoop()
def setup_signals(self, signals, handler):
    """
    Attach ``handler`` to each of the given unix signals through GLib.

    This is a workaround to signal.signal(signal, handler), which does not
    work with a GLib.MainLoop() for some reason.
    Thanks to: http://stackoverflow.com/a/26457317/5433146

    args:
        signals (list of signal.SIG... signals): the signals to connect to
        handler (function): function to be executed on these signals; it
            is handed the signal number as its argument
    """
    def register_unix_signal(signum):
        # the signal number doubles as the user data passed to handler
        GLib.unix_signal_add(GLib.PRIORITY_HIGH, signum, handler, signum)

    # every registration is scheduled as a high-priority idle callback
    for signum in signals:
        GLib.idle_add(
            register_unix_signal,
            signum,
            priority=GLib.PRIORITY_HIGH,
        )
def initialize(self, environment):
    """
    Prepare the GStreamer based sound driver.

    Stores the environment and, when GStreamer is available, creates a
    ``playbin`` player and a pipeline of ``audiotestsrc`` linked to
    ``autoaudiosink``, connects message callbacks to both buses and runs
    a GLib main loop in a separate thread.

    args:
        environment: the environment mapping; its ``['runtime']['debug']``
            entry is used to report that GStreamer is not available
    """
    global _gstreamerAvailable
    global _availableError

    self.env = environment
    self._initialized = _gstreamerAvailable
    if not self._initialized:
        # report through the environment stored above; this method never
        # assigns self.environment, so reading it here would be fragile
        self.env['runtime']['debug'].writeDebugOut(
            'Gstreamer not available ' + _availableError,
            debug.debugLevel.ERROR)
        return

    # player used for regular playback
    self._player = Gst.ElementFactory.make('playbin', 'player')
    bus = self._player.get_bus()
    bus.add_signal_watch()
    bus.connect("message", self._onPlayerMessage)

    # pipeline: test source -> automatic audio sink
    self._pipeline = Gst.Pipeline(name='fenrir-pipeline')
    bus = self._pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", self._onPipelineMessage)

    self._source = Gst.ElementFactory.make('audiotestsrc', 'src')
    self._sink = Gst.ElementFactory.make('autoaudiosink', 'output')
    self._pipeline.add(self._source)
    self._pipeline.add(self._sink)
    self._source.link(self._sink)

    # the main loop blocks, so it runs in its own thread
    self.mainloop = GLib.MainLoop()
    self.thread = threading.Thread(target=self.mainloop.run)
    self.thread.start()
def __init__(self, adapter_addr, device_addr):
    """Default initialiser.

    Creates object for the specified remote Bluetooth device
    on the specified adapter.

    :param adapter_addr: Address of the local Bluetooth adapter.
    :param device_addr: Address of the remote Bluetooth device.
    """
    # The GLib main loop has to be made the dbus default *before* the bus
    # connection is created; dbus-python only attaches connections that
    # are opened afterwards to the default main loop.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    self.bus = dbus.SystemBus()
    self.mainloop = GObject.MainLoop()

    # D-Bus object path of the remote device on the given adapter
    device_path = dbus_tools.get_dbus_path(adapter_addr, device_addr)
    self.remote_device_path = device_path

    self.remote_device_obj = self.bus.get_object(
        constants.BLUEZ_SERVICE_NAME,
        self.remote_device_path)
    # method and property views on the same remote object
    self.remote_device_methods = dbus.Interface(
        self.remote_device_obj,
        constants.DEVICE_INTERFACE)
    self.remote_device_props = dbus.Interface(
        self.remote_device_obj,
        dbus.PROPERTIES_IFACE)
def run_producers_test(producers, names, args):
    """
    Start all producers and run a GLib main loop.

    When ``args.output`` is set, one FileConsumer per entry of ``names`` is
    started as well, each writing to ``"<args.output>-<index>"``; the loop
    is quit once every consumer reports ``_emitted_complete``. Without
    ``args.output`` nothing in this function quits the loop.

    args:
        producers: iterable of producer objects providing ``start()``
        names: names to retrieve, one consumer is created per name
        args: parsed command line arguments; only ``output`` is read
    """
    loop = GLib.MainLoop()

    # start each producer exactly once; a plain loop avoids relying on a
    # comprehension variable, which is not visible afterwards on Python 3
    for producer in producers:
        producer.start()

    if args.output:
        from ..file import FileConsumer

        consumers = [
            FileConsumer(name, filename="%s-%s" % (args.output, index))
            for index, name in enumerate(names)
        ]

        def check_complete(*a):
            # quit only when the last outstanding consumer has finished
            if all(c._emitted_complete for c in consumers):
                print("ALL RETRIEVED")
                loop.quit()

        # connect every handler before any consumer is started
        for consumer in consumers:
            consumer.connect('complete', check_complete)
        for consumer in consumers:
            consumer.start()

    loop.run()
def main():
    """
    Retrieve content into a file using a FileConsumer.

    Command line: a positional ``filename`` and an optional ``-l/--limit``
    (integer, default 0). The main loop is quit when the consumer emits
    'complete', or from the 'progress' callback once the consumer's
    callback count exceeds a non-zero limit.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("filename")
    parser.add_argument("-l", "--limit", type=int, default=0)
    args = utils.parse_args(parser=parser)

    consumer = FileConsumer(args.name, args.filename)

    def complete(*a):
        loop.quit()

    def check(consumer, pct):
        # a limit of 0 disables the check
        limit_exceeded = args.limit and consumer._callbackCount > args.limit
        if limit_exceeded:
            complete()

    consumer.connect('progress', check)
    consumer.connect('complete', complete)
    consumer.start()

    loop = GLib.MainLoop()
    loop.run()
def main():
    """
    Serve a simple store producer and track volume mounts.

    Installs ``signal_cb`` for SIGINT and SIGTERM, starts the store
    producer, reports every already present mount through
    ``mount_added_cb``, subscribes to mount-added / mount-removed and then
    runs the GLib main loop.
    """
    utils.parse_args(include_name=False)

    loop = GLib.MainLoop()
    for signum in (signal.SIGINT, signal.SIGTERM):
        GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signum, signal_cb)

    monitor = Gio.VolumeMonitor.get()
    store = simple_store.Producer(
        prefix=SUBSCRIPTIONS_SOMA,
        base=defaults.ENDLESS_NDN_CACHE_PATH,
        cost=defaults.RouteCost.USB,
    )
    store.start()

    # mounts that exist already are handled like freshly added ones
    for mount in monitor.get_mounts():
        mount_added_cb(monitor, mount, store)

    monitor.connect("mount-added", mount_added_cb, store)
    monitor.connect("mount-removed", mount_removed_cb, store)

    maybe_time_out()
    loop.run()
def _update(self, keys, raw):
    """
    Filter the incoming input, process it and note window resizes.

    The keys are passed through ``self.input_filter``; a non-empty result
    is handed to ``self.process_input``. If the filtered keys contain
    'window resize', ``self.screen_size`` is reset to None.

    >>> w = _refl("widget")
    >>> w.selectable_rval = True
    >>> w.mouse_event_rval = True
    >>> scr = _refl("screen")
    >>> scr.get_cols_rows_rval = (15, 5)
    >>> evl = _refl("event_loop")
    >>> ml = MainLoop(w, [], scr, event_loop=evl)
    >>> ml._input_timeout = "old timeout"
    >>> ml._update(['y'], [121]) # doctest:+ELLIPSIS
    screen.get_cols_rows()
    widget.selectable()
    widget.keypress((15, 5), 'y')
    >>> ml._update([("mouse press", 1, 5, 4)], [])
    widget.mouse_event((15, 5), 'mouse press', 1, 5, 4, focus=True)
    >>> ml._update([], [])
    """
    filtered = self.input_filter(keys, raw)

    if filtered:
        self.process_input(filtered)

    if 'window resize' in filtered:
        self.screen_size = None
def _test_run_screen_event_loop(self):
    """
    Doctest holder for ``MainLoop._run_screen_event_loop``.

    An alarm that raises ExitMainLoop is scheduled, then the expected
    screen and widget calls are listed.

    >>> w = _refl("widget")
    >>> scr = _refl("screen")
    >>> scr.get_cols_rows_rval = (10, 5)
    >>> scr.get_input_rval = [], []
    >>> ml = MainLoop(w, screen=scr)
    >>> def stop_now(loop, data):
    ...     raise ExitMainLoop()
    >>> handle = ml.set_alarm_in(0, stop_now)
    >>> try:
    ...     ml._run_screen_event_loop()
    ... except ExitMainLoop:
    ...     pass
    screen.get_cols_rows()
    widget.render((10, 5), focus=True)
    screen.draw_screen((10, 5), None)
    screen.set_input_timeouts(0)
    screen.get_input(True)
    """
def run(self):
    """Create the GLib main loop, call ``self.show()`` and run the loop."""
    self.mainloop = GLib.MainLoop()

    self.show()

    # blocks until the loop is quit
    self.mainloop.run()
def __init__(self):
    """Create the main loop and reset the instance state to its defaults."""
    self.mainloop = GLib.MainLoop()
    self.exit_code = -1
    self.config = None

    self._window = None
    self._registered = False
    self._last_profile_change = 0
    self._recent_profiles_undo = None

    # hash_of_colors is used to determine if css needs to be reapplied
    # after configuration change
    self._hash_of_colors = -1
def __init__(self):
    """
    Set up logger, configuration, quit signals and the D-Bus service.

    The object requests CO2MONITOR_BUSNAME on the system bus and registers
    itself under CO2MONITOR_OBJECTPATH.
    """
    # start with the standard logger and an empty configuration
    self.set_logger(logging.getLogger(__name__))
    self.set_config(configparser.ConfigParser())

    # SIGINT, SIGTERM and SIGHUP are routed to self.please_stop_now
    stop_signals = [signal.SIGINT, signal.SIGTERM, signal.SIGHUP]
    self.setup_signals(signals=stop_signals, handler=self.please_stop_now)

    # use glib as default mainloop for dbus
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    dbus.mainloop.glib.threads_init()
    GLib.threads_init()

    # main loop and system bus connection
    self.loop = GLib.MainLoop()
    self.systembus = dbus.SystemBus()
    self.systembus.request_name(CO2MONITOR_BUSNAME)

    # register this object on the bus name
    service_name = dbus.service.BusName(CO2MONITOR_BUSNAME, self.systembus)
    dbus.service.Object.__init__(self, service_name, CO2MONITOR_OBJECTPATH)
def run(self):
    """Run a fresh GLib main loop, logging when it starts and when it ends."""
    # can't use Gtk.main() because of a bug that prevents proper SIGINT
    # handling. use GLib.MainLoop() directly instead.
    self.mainloop = GLib.MainLoop()

    self.logger.debug(_("Starting GLib main loop..."))
    self.mainloop.run()  # returns once the loop has been quit
    self.logger.debug(_("GLib main loop ended."))
def run(self):
    """
    Entry point of the D-Bus process.

    Schedules ``self._idleQueueSync`` as a GLib idle callback, makes GLib
    the default dbus main loop, creates the SessionDBus service on the
    task and result queues and then blocks in a GLib main loop. On
    KeyboardInterrupt that same loop is quit and the method returns.
    """
    logging.info('D-Bus process started')
    GLib.threads_init()  # allow threads in GLib
    GLib.idle_add(self._idleQueueSync)

    DBusGMainLoop(set_as_default=True)
    # bound to a local name, so the service is referenced while the loop runs
    dbusService = SessionDBus(self.taskQueue, self.resultQueue)

    # keep the loop instance: quit() has to target the loop that is
    # actually running, a newly created MainLoop would be a no-op
    mainloop = GLib.MainLoop()
    try:
        mainloop.run()
    except KeyboardInterrupt:
        logging.debug("\nThe MainLoop will close...")
        mainloop.quit()
        return
def start(self):
    """Create a GLib main loop, publish it as ``shared.main_loop``, run it."""
    shared.main_loop = GLib.MainLoop()

    # blocks until the shared loop is quit
    shared.main_loop.run()
def run(self):
    """
    Run the Sniffer main loop.

    Removes all known devices from the adapter, subscribes to the
    InterfacesAdded, InterfacesRemoved and PropertiesChanged signals,
    schedules the optional backup and connection timers and then blocks
    in a GLib main loop.

    Raises:
        ValueError: if ``self.adapter`` is None (the message points at
            using the Sniffer as a context manager).
    """
    if self.adapter is None:
        raise ValueError("Sniffer.run can only be called in a context "
                         "(e.g. `with Sniffer(...) as s: s.run()`)")

    self._log.debug("Clearing the BlueZ device registry.")
    for device_path, _ in get_known_devices():
        self.adapter.RemoveDevice(device_path)

    self._log.debug("Registering the signals InterfacesAdded and PropertiesChanged.")
    bus = pydbus.SystemBus()
    subscriptions = (
        dict(iface=OBJECT_MANAGER_INTERFACE,
             signal="InterfacesAdded",
             signal_fired=self._cb_interfaces_added),
        dict(iface=OBJECT_MANAGER_INTERFACE,
             signal="InterfacesRemoved",
             signal_fired=self._cb_interfaces_removed),
        dict(iface=PROPERTIES_INTERFACE,
             signal="PropertiesChanged",
             arg0=DEVICE_INTERFACE,
             signal_fired=self._cb_properties_changed),
    )
    for subscription in subscriptions:
        bus.subscribe(sender=SERVICE_NAME, **subscription)

    self._log.debug("Running the main loop.")
    # periodic registry backup, only with an output path and a positive interval
    backup_enabled = self.output_path is not None and self.backup_interval > 0
    if backup_enabled:
        GLib.timeout_add_seconds(self.backup_interval, self._cb_backup_registry)
    if self.attempt_connection:
        GLib.timeout_add_seconds(self.queueing_interval, self._cb_connect_check)

    GLib.MainLoop().run()
def run_dbus_service(self, timeout=None, send_usr1=False):
    '''Run D-BUS server.

    If no timeout is given, the server will run forever. With a timeout, a
    repeating GLib timer quits the main loop every ``timeout`` seconds; the
    loop is entered again only if ``self._timeout`` is false at that point
    (it is set to True here before each run; presumably request handlers
    reset it - verify against the rest of the class).

    If send_usr1 is True, this will send a SIGUSR1 to the parent process
    once the server is ready to take requests.
    '''
    dbus.service.Object.__init__(self, self.bus, '/RecoveryMedia')
    self.main_loop = GLib.MainLoop()
    self._timeout = False

    def _quit():
        """Timer callback: leave the main loop, keep the timer installed."""
        self.main_loop.quit()
        return True

    if timeout:
        # GLib.timeout_add expects milliseconds
        GLib.timeout_add(timeout * 1000, _quit)

    # send parent process a signal that we are ready now
    if send_usr1:
        parent_pid = os.getppid()
        os.kill(parent_pid, signal.SIGUSR1)

    # run until we time out
    while not self._timeout:
        if timeout:
            self._timeout = True
        self.main_loop.run()
def dbus_sync_call_signal_wrapper(dbus_iface, func, handler_map, *args, **kwargs):
    '''Run a D-BUS method call while receiving signals.

    This function is an Ugly Hack™, since a normal synchronous
    dbus_iface.fn() call does not cause signals to be received until the
    method returns. Thus it calls func asynchronously and sets up a
    temporary main loop to receive signals and call their handlers; these
    are assigned in handler_map (signal name -> signal handler).

    Objects without a connect_to_signal attribute are not treated as D-BUS
    objects: func is simply looked up on them and called directly.

    Returns the tuple of reply arguments of the asynchronous call and
    re-raises the exception passed to the error handler, if any.
    '''
    global _h_reply_result, _h_exception_exc

    if not hasattr(dbus_iface, 'connect_to_signal'):
        # not a D-BUS object
        plain_method = getattr(dbus_iface, func)
        return plain_method(*args, **kwargs)

    loop = GLib.MainLoop()

    def on_reply(*args, **kwargs):
        """Store the reply arguments and stop the temporary loop."""
        global _h_reply_result
        _h_reply_result = args
        loop.quit()

    def on_error(exception=None):
        """Store the reported exception and stop the temporary loop."""
        global _h_exception_exc
        _h_exception_exc = exception
        loop.quit()

    # results are exchanged through module globals; reset them first
    _h_reply_result = None
    _h_exception_exc = None

    kwargs.update(
        reply_handler=on_reply,
        error_handler=on_error,
        timeout=86400,
    )

    for signame, sighandler in handler_map.items():
        dbus_iface.connect_to_signal(signame, sighandler)

    # passing reply/error handlers makes the call asynchronous
    async_method = dbus_iface.get_dbus_method(func)
    async_method(*args, **kwargs)
    loop.run()

    if _h_exception_exc:
        raise _h_exception_exc
    return _h_reply_result
def __init__(self):
    """Create the GLib main loop held by this instance."""
    self.mainloop = GLib.MainLoop()
def run_producer_test(producer, name, args):
    """
    Start ``producer`` and run a GLib main loop.

    When ``args.output`` is set, a FileConsumer for ``name`` writing to
    that file is started too, and its 'complete' signal quits the loop.
    """
    loop = GLib.MainLoop()
    producer.start()

    if args.output:
        from ..file import FileConsumer

        def quit_loop(*a):
            loop.quit()

        consumer = FileConsumer(name, filename=args.output)
        consumer.connect('complete', quit_loop)
        consumer.start()

    loop.run()
def main():
    """Initialise notifications, create the D-Bus service, run the loop."""
    utils.parse_args(include_name=False)
    Notify.init("Content Updates")

    # bound to a local name for the duration of the main loop
    service = DBusService()

    loop = GLib.MainLoop()
    loop.run()
def main():
    """
    Serve the content of a file through a FileProducer.

    Command line: a positional ``filename``. The file is opened in binary
    mode, handed to a FileProducer registered under ``args.name`` and the
    GLib main loop is run. The file is closed when the loop returns or an
    exception propagates.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("filename")
    args = utils.parse_args(parser=parser)

    # the open file is handed to the producer, so it stays open for as
    # long as the loop runs and is closed reliably afterwards
    with open(args.filename, 'rb') as f:
        producer = FileProducer(args.name, f)
        producer.start()

        loop = GLib.MainLoop()
        loop.run()
def main():
    """Create an AvahiMonitor and run a GLib main loop."""
    from gi.repository import GLib

    utils.parse_args(include_name=False)

    # bound to a local name for the duration of the main loop
    monitor = AvahiMonitor()

    GLib.MainLoop().run()
def main():
    """
    Start a subscription producer and a simple store on one directory.

    Command line: the required ``-t/--store-dir``. Both producers are
    started, then the GLib main loop is run.
    """
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-t", "--store-dir", required=True)
    args = utils.parse_args(parser=parser, include_name=False)

    subscription_producer = subscription.Producer(args.store_dir)
    subscription_producer.start()

    store = simple_store.Producer(
        base=args.store_dir,
        prefix=names.SUBSCRIPTIONS_SOMA,
    )
    store.start()

    loop = GLib.MainLoop()
    loop.run()
def main():
    """Create a Fetcher and run a GLib main loop."""
    utils.parse_args(include_name=False)

    # bound to a local name for the duration of the main loop
    fetcher = Fetcher()

    GLib.MainLoop().run()
def __init__(self):
    """Create the event object and the GLib main loop; nothing expired yet."""
    self.ev = Event()
    self.loop = GLib.MainLoop()
    self.expired = False
def _test_run(self):
    """
    Doctest holder for ``MainLoop.run`` and ``MainLoop.draw_screen``.

    Reflection objects stand in for widget, screen and event loop; the
    calls they are expected to print are listed after each statement.

    >>> w = _refl("widget")   # _refl prints out function calls
    >>> w.render_rval = "fake canvas"  # *_rval is used for return values
    >>> scr = _refl("screen")
    >>> scr.get_input_descriptors_rval = [42]
    >>> scr.get_cols_rows_rval = (20, 10)
    >>> scr.started = True
    >>> scr._urwid_signals = {}
    >>> evl = _refl("event_loop")
    >>> evl.enter_idle_rval = 1
    >>> evl.watch_file_rval = 2
    >>> ml = MainLoop(w, [], scr, event_loop=evl)
    >>> ml.run()    # doctest:+ELLIPSIS
    screen.start()
    screen.set_mouse_tracking()
    screen.unhook_event_loop(...)
    screen.hook_event_loop(...)
    event_loop.enter_idle(<bound method MainLoop.entering_idle...>)
    event_loop.run()
    event_loop.remove_enter_idle(1)
    screen.unhook_event_loop(...)
    screen.stop()
    >>> ml.draw_screen()    # doctest:+ELLIPSIS
    screen.get_cols_rows()
    widget.render((20, 10), focus=True)
    screen.draw_screen((20, 10), 'fake canvas')
    """
def _test_process_input(self):
    """
    Doctest holder for ``MainLoop.process_input``.

    A key press and a mouse event are passed in; the expected widget and
    screen calls and the return value True are listed.

    >>> w = _refl("widget")
    >>> w.selectable_rval = True
    >>> scr = _refl("screen")
    >>> scr.get_cols_rows_rval = (10, 5)
    >>> ml = MainLoop(w, [], scr)
    >>> ml.process_input(['enter', ('mouse drag', 1, 14, 20)])
    screen.get_cols_rows()
    widget.selectable()
    widget.keypress((10, 5), 'enter')
    widget.mouse_event((10, 5), 'mouse drag', 1, 14, 20, focus=True)
    True
    """
def __init__(self):
    """
    Set up the GLib based event loop.

    GLib is imported here and kept on the instance as ``self.GLib``; the
    bookkeeping for alarms, watched files and idle callbacks starts out
    empty, and the GLib idle handler is enabled last.
    """
    from gi.repository import GLib
    self.GLib = GLib

    self._loop = GLib.MainLoop()
    self._exc_info = None

    # bookkeeping containers
    self._alarms = []
    self._watch_files = {}
    self._idle_callbacks = {}

    self._idle_handle = 0
    self._glib_idle_enabled = False  # have we called glib.idle_add?
    self._enable_glib_idle()
def _get_loop(seat):
    """Return a tuple of (main loop, quit loop fn).

    The returned function ungrabs ``seat`` and then quits the loop.
    """
    main_loop = GLib.MainLoop()

    def loopquit():
        seat.ungrab()
        main_loop.quit()

    return main_loop, loopquit
def main():
    '''Dispatches control to the appropriate parts of the app

    Depending on the parsed arguments this deletes the config and desktop
    files (args.delete), runs the indicator GUI inside a GLib main loop
    (mode 'indicator') or runs the inhibitor process (mode
    'inhibit-process').

    Returns 0, or an error message string when the dependencies are
    missing or the mode is invalid.

    NOTE(review): the nesting of the ``return 0`` statements and of the
    final ``else`` is reconstructed from a flattened listing; confirm it
    against the upstream file.
    '''
    args = parse_args()
    config = get_config()
    if not dependencies_are_satisfied():
        return 'This program depends on xprintidle and xdotool being installed.'
    config.acpi_available = battery.acpi_available()
    if args.delete:
        # the configuration is fetched a second time before deleting
        config = get_config()
        for file_ in (config.config_file, config.desktop_filename):
            try:
                os.unlink(file_)
            except FileNotFoundError:
                # a missing file is only reported, not treated as an error
                print('{}: File not found'.format(file_))
        return 0
    elif args.mode == 'indicator':
        # command line values override the stored configuration
        if args.battery is not None:
            config.battery = args.battery
        if args.percent is not None:
            config.batt_percent = args.percent
        inhibitor = sleepinhibit.gui.main.SleepInhibitGUI()
        if config.start_inhibited or args.inhibit:
            inhibitor.on_toggle()
        try:
            GLib.MainLoop().run()
        except KeyboardInterrupt:
            inhibitor.on_quit(signal='SIGINT')
        return 0
    elif args.mode == 'inhibit-process':
        from sleepinhibit.inhibitor import run
        # only options that were actually given are forwarded to run()
        kw = {}
        if args.battery is True:
            kw['battery'] = True
        if args.percent:
            kw['percent'] = args.percent
        try:
            run(**kw)
        except KeyboardInterrupt:
            return 0
    else:
        return 'ERROR: Invalid value for --mode!'
def pomodoro_daemon():
    """Connect ``handle_state`` to the proxy's StateChanged and run a loop."""
    proxy = get_pomodoro_proxy()
    proxy.StateChanged.connect(handle_state)

    loop = GLib.MainLoop()
    loop.run()
def run(self):
    """Block in a newly created GLib main loop."""
    GLib.MainLoop().run()
def main():
    """
    Send a single command interest and log whether it succeeded.

    The command line options are copied into a ControlParameters object,
    the positional ``command`` is expressed on ``args.name`` (None when no
    name was given) and the GLib main loop runs until the success or
    failure callback has logged its result and quit the loop.
    """
    from gi.repository import GLib
    from . import base

    parser = argparse.ArgumentParser(description='Command Interest Tests')
    parser.add_argument("--faceid", "-i", type=int)
    parser.add_argument("--uri", "-u")
    parser.add_argument("--local-control-feature", "-l")
    parser.add_argument("--origin", "-o", type=int)
    parser.add_argument("--cost", "-c", type=int)
    parser.add_argument("--forwarding-flags", "-F", type=int)
    parser.add_argument("--strategy", "-s")
    parser.add_argument("--expiration-period", "-e", type=int)
    parser.add_argument("command")
    args = utils.parse_args(parser=parser)

    controlParameters = ControlParameters()

    name = None
    if args.name:
        name = Name(args.name)

    # Integer options are compared against None (argparse's value for an
    # absent option) so that an explicitly given 0 is still applied.
    if args.faceid is not None:
        controlParameters.setFaceId(args.faceid)
    if args.uri:
        controlParameters.setUri(args.uri)
    if args.local_control_feature:
        controlParameters.setLocalControlFeature(args.local_control_feature)
    if args.origin is not None:
        controlParameters.setOrigin(args.origin)
    if args.cost is not None:
        controlParameters.setCost(args.cost)
    if args.forwarding_flags is not None:
        controlParameters.setForwardingFlags(args.forwarding_flags)
    if args.strategy:
        controlParameters.setStrategy(Name(args.strategy))
    if args.expiration_period is not None:
        controlParameters.setExpirationPeriod(args.expiration_period)

    loop = GLib.MainLoop()

    def print_and_quit(*log_args, **log_kwargs):
        # log the outcome, then end the main loop
        logger.info(*log_args, **log_kwargs)
        loop.quit()

    logger.info('running command: %s on %s', args.command, name)
    ndn = base.Base(name)
    ndn.expressCommandInterest(
        args.command,
        name,
        controlParameters=controlParameters,
        interestLifeTime=10000,
        onFailed=lambda *a: print_and_quit('FAILED: %s', a),
        onSuccess=lambda *a: print_and_quit('SUCCESS: %s', a))
    loop.run()