def _async_call(f, args, kwargs, on_done): def run(data): f, args, kwargs, on_done = data error = None result = None try: result = f(*args, **kwargs) except Exception as e: e.traceback = traceback.format_exc() error = 'Unhandled exception in asyn call:\n{}'.format(e.traceback) GLib.idle_add(lambda: on_done(result, error)) data = f, args, kwargs, on_done thread = threading.Thread(target=run, args=(data,)) thread.daemon = True thread.start()
def InitSignal(gui): def signal_action(signal): if signal.value is signal.SIGINT.value: print("\r", end="") logging.debug("Caught signal SIGINT(2)") gui.stop() def handler(*args): signal_action(args[0]) def idle_handler(*args): GLib.idle_add(signal_action, priority=GLib.PRIORITY_HIGH) def install_glib_handler(sig): GLib.unix_signal_add(GLib.PRIORITY_HIGH, sig, handler, sig) SIGS = [getattr(signal, s, None) for s in "SIGINT".split()] for sig in filter(None, SIGS): signal.signal(sig, idle_handler) GLib.idle_add(install_glib_handler, sig, priority=GLib.PRIORITY_HIGH)
def _load_profiles(self): """ Search for file containign list of profiles and reads it. This is done in thread, with crazy hope that it will NOT crash GTK in the process. """ p = os.path.join(os.path.expanduser(self.STEAMPATH), "userdata") i = 0 if os.path.exists(p): for user in os.listdir(p): sharedconfig = os.path.join(p, user, self.PROFILE_LIST) if os.path.isfile(sharedconfig): self._lock.acquire() log.debug("Loading sharedconfig from '%s'", sharedconfig) try: i = self._parse_profile_list(i, sharedconfig) except Exception, e: log.exception(e) self._lock.release() GLib.idle_add(self._load_finished)
def vdf_import_confirmed(self, *a): name = self.builder.get_object("txName").get_text().strip() if len(self._profile.action_sets) > 1: # Update ChangeProfileActions with correct profile names for x in self._profile.action_set_switches: id = int(x._profile.split(":")[-1]) target_set = self._profile.action_set_by_id(id) x._profile = self.gen_aset_name(name, target_set) # Save action set profiles for k in self._profile.action_sets: if k != 'default': filename = self.gen_aset_name(name, k) + ".sccprofile" path = os.path.join(get_profiles_path(), filename) self._profile.action_sets[k].save(path) self.app.new_profile(self._profile, name) GLib.idle_add(self.window.destroy)
def setup_signals(self, signals, handler): """ This is a workaround to signal.signal(signal, handler) which does not work with a GLib.MainLoop() for some reason. Thanks to: http://stackoverflow.com/a/26457317/5433146 args: signals (list of signal.SIG... signals): the signals to connect to handler (function): function to be executed on these signals """ def install_glib_handler(sig): # add a unix signal handler GLib.unix_signal_add( GLib.PRIORITY_HIGH, sig, # for the given signal handler, # on this signal, run this function sig # with this argument ) for sig in signals: # loop over all signals GLib.idle_add( # 'execute' install_glib_handler, sig, # add a handler for this signal priority = GLib.PRIORITY_HIGH ) # set the config
def setup_signals(self, signals, handler): """ This is a workaround to signal.signal(signal, handler) which does not work with a GLib.MainLoop() for some reason. Thanks to: http://stackoverflow.com/a/26457317/5433146 args: signals (list of signal.SIG... signals): the signals to connect to handler (function): function to be executed on these signals """ def install_glib_handler(sig): # add a unix signal handler GLib.unix_signal_add( GLib.PRIORITY_HIGH, sig, # for the given signal handler, # on this signal, run this function sig # with this argument ) for sig in signals: # loop over all signals GLib.idle_add( # 'execute' install_glib_handler, sig, # add a handler for this signal priority = GLib.PRIORITY_HIGH ) # build the gui
def load_game_parm(self,fname): try: fp = open(fname) except : gv.gui.info_box("Error loading game - file not found") return fp.close() gv.gui.window.set_title(NAME + " " + VERSION + " " + os.path.basename(fname)) if fname.endswith(".psn"): self.get_header_from_file(fname) if gv.show_header == True: GLib.idle_add(gv.gui.header_lblsente.set_text, gv.sente[:50]) GLib.idle_add(gv.gui.header_lblgote.set_text, gv.gote[:50]) GLib.idle_add(gv.gui.header_lblevent.set_text, gv.event[:50]) GLib.idle_add(gv.gui.header_lbldate.set_text, gv.gamedate[:50]) self.psn.load_game_psn(fname) return
def command(self, cmd): e = self.side + "(" + self.get_running_engine().strip() + "):" if gv.verbose or gv.verbose_uci: print("->" + e + cmd.strip()) GObject.idle_add(self.engine_debug.add_to_log, "->" + e + cmd.strip()) try: # write as string (not bytes) since universal_newlines=True self.p.stdin.write(cmd) except AttributeError: GObject.idle_add( self.engine_debug.add_to_log, "# engine process is not running") except IOError: GObject.idle_add( self.engine_debug.add_to_log, "# engine process is not running")
def nvim_request(self, nvim: object, sub: str, args: object) -> object: """Signal emitted on a request from neovim. This is called on neovim's event loop, so modifications to GTK widgets should be done through `GLib.idle_add()`. After emission, should return an appropriate value expected by the request. :nvim: the `neovim.Nvim` instance. :sub: the request name. :args: the request arguments. """ if sub == 'Gui' and args[0] == 'Clipboard': return self.update_clipboard(*args[1:])
def update_clipboard(self, method: str, data: list): """Handles a clipboard request. This is called on neovim's event loop, so modifications to GTK widgets should be done through `GLib.idle_add()`. :method: `get` to obtain the clipboard's contents or `set` to update them. :data: when the method is `set`, contains the lines to add to the clipboard, else None. :returns: the clipboard's contents when the method is `get`, else None. """ cb = Gtk.Clipboard.get_default(Gdk.Display.get_default()) if method == 'set': text = '\n'.join(data[0]) return cb.set_text(text, len(text)) if method == 'get': text = cb.wait_for_text() return text.split('\n') if text else []
def switched(meta, builder): switch = builder.get_object('connect-switch') state = switch.get_active() logger.info("switch activated, old state {}".format(state)) if not state: logger.info("setting switch ON") GLib.idle_add(lambda: switch.set_active(True)) activate_connection(meta=meta, builder=builder) else: notify("eduVPN disconnecting...", "Disconnecting from {}".format(meta.display_name)) logger.info("setting switch OFF") GLib.idle_add(lambda: switch.set_active(False)) try: disconnect_provider(meta.uuid) except Exception as e: window = builder.get_object('eduvpn-window') error_helper(window, "can't disconnect", "{}: {}".format(type(e).__name__, str(e))) GLib.idle_add(lambda: switch.set_active(True)) raise
def _background(meta, oauth, dialog, builder): try: cert, key = create_keypair(oauth, meta.api_base_uri) meta.cert = cert meta.key = key meta.config = get_profile_config(oauth, meta.api_base_uri, meta.profile_id) except Exception as e: GLib.idle_add(lambda: error_helper(dialog, "can't finalize configuration", "{}: {}".format(type(e).__name__, str(e)))) GLib.idle_add(lambda: dialog.hide()) raise else: try: uuid = store_provider(meta) monitor_vpn(uuid=uuid, callback=lambda *args, **kwargs: vpn_change(builder=builder)) GLib.idle_add(lambda: notify("eduVPN provider added", "added provider '{}'".format(meta.display_name))) except Exception as e: GLib.idle_add(lambda: error_helper(dialog, "can't store configuration", "{} {}".format(type(e).__name__, str(e)))) GLib.idle_add(lambda: dialog.hide()) raise else: GLib.idle_add(lambda: dialog.hide()) GLib.idle_add(lambda: update_providers(builder))
def setup_signals(self, signals, handler): """ This is a workaround to signal.signal(signal, handler) which does not work with a GLib.MainLoop() for some reason. Thanks to: http://stackoverflow.com/a/26457317/5433146 args: signals (list of signal.SIG... signals): the signals to connect to handler (function): function to be executed on these signals """ def install_glib_handler(sig): # add a unix signal handler GLib.unix_signal_add( GLib.PRIORITY_HIGH, sig, # for the given signal handler, # on this signal, run this function sig # with this argument ) for sig in signals: # loop over all signals GLib.idle_add( # 'execute' install_glib_handler, sig, # add a handler for this signal priority = GLib.PRIORITY_HIGH ) # get an object from the builder
def paste_emoji(self, completer, model, tree_iter): shortname = model[tree_iter][1] codepoint = model[tree_iter][3] if '-' in codepoint: sequence = codepoint.split('-') string = chr(int(sequence[0], 16)) + chr(int(sequence[1], 16)) else: string = chr(int(codepoint, 16)) self.entry.set_text('') # When selecting match with mouse window.close() doesn't seem to work self.hide_window(self, None) GLib.idle_add(shared.clipboard.paste, string) if shortname in shared.recent: shared.recent.remove(shortname) shared.recent.appendleft(shortname) shared.emoji.categories['recent'].get_model().refilter() # Need to trigger resort when emoji already in recent gets used again shared.emoji.categories['recent'].set_sort_func( 1, shared.emoji.sort_recent, None) return True
def async_call(func, *args, callback=None): '''Call `func` in background thread, and then call `callback` in Gtk main thread. If error occurs in `func`, error will keep the traceback and passed to `callback` as second parameter. Always check `error` is not None. ''' def do_call(): result = None error = None try: result = func(*args) except Exception: error = traceback.format_exc() logger.error(error) if callback: GLib.idle_add(callback, result, error) thread = threading.Thread(target=do_call) thread.daemon = True thread.start()
def _connect(self, device): def cb_connect(): try: bus = pydbus.SystemBus() proxy = bus.get(SERVICE_NAME, device.path) proxy.Connect() except KeyError: self._log.debug("The device has likely disappeared.", exc_info=True) except GLib.Error: self._log.debug("Connect() failed:", exc_info=True) else: self._log.info("Connection successful.") self.queued_connections -= 1 if self.queued_connections == 0: print_device(device, "Connecting") GLib.idle_add(cb_connect) device.connected = True self.queued_connections += 1
def on_fontsize_adjustment_value_changed(self, adjustment): ''' The fontsize adjustment in the header bar has been changed :param adjustment: The adjustment used to change the fontsize :type adjustment: Gtk.Adjustment object ''' value = adjustment.get_value() if _ARGS.debug: sys.stdout.write( 'on_fontsize_adjustment_value_changed() value = %s\n' %value) self._fontsize = value self._save_options() self._busy_start() GLib.idle_add(self._change_flowbox_font)
def on_fallback_check_button_toggled(self, check_button): ''' The fallback check button in the header bar has been toggled :param toggle_button: The check button used to select whether fallback fonts should be used. :type adjustment: Gtk.CheckButton object ''' self._fallback = check_button.get_active() if _ARGS.debug: sys.stdout.write( 'on_fallback_check_button_toggled() self._fallback = %s\n' %self._fallback) self._save_options() self._busy_start() GLib.idle_add(self._change_flowbox_font)
def __init__(self): Gtk.Window.__init__(self, title='apart') self.dying = False self.status_listener = MessageListener( on_message=lambda m: GLib.idle_add(self.on_status_msg, m), message_predicate=lambda m: m['type'] == 'status') self.core = ApartCore(listeners=[self.status_listener], on_finish=lambda code: GLib.idle_add(self.on_delete)) self.sources = None self.sources_interest = [] # array of callbacks on sources update self.set_default_size(height=300, width=300 * 16/9) self.loading_body = LoadingBody() self.clone_body = None self.add(self.loading_body) self.connect('delete-event', self.on_delete) self.set_icon_name('apart')
def try_bridge_connection(self, cb): import phue while True: try: bridge = phue.Bridge(ip=self.selected_bridge) except phue.PhueRegistrationException: time.sleep(1) except Exception: import traceback traceback.print_exc() break else: self.bridge = bridge break GLib.idle_add(cb)
def do_activate(self): mark_time("in app activate") if self.get_active_window() is not None: self.get_active_window().present() return window = Window(self) mark_time("have window") self.add_window(window) mark_time("added window") window.show() mark_time("showed window") GLib.idle_add(mark_time, "in main loop") mark_time("app activate done")
def multithread(handler): """Multithread decorator""" def action(*args, **kwargs): with global_threading_lock: try: result = handler(*args, **kwargs) GLib.idle_add(on_done, result, args[0]) except Exception as e: print("Error in multithreading:\n%s" % str(e)) def on_done(result, inst): set_cursor(inst) if callable(result): result() def wrapper(*args, **kwargs): set_cursor(args[0], Gdk.Cursor(Gdk.CursorType.WATCH)) thread = threading.Thread(target=action, args=args, kwargs=kwargs) thread.daemon = True thread.start() return wrapper
def toggle_cursor(widget, hide=False): window = widget.get_window() if window: blank = Gdk.CursorType.BLANK_CURSOR cursor = Gdk.Cursor(blank) if hide else None GLib.idle_add(window.set_cursor, cursor)
def __init__(self, app): self.app = app self.stack = app.channels_stack self.filter = None self.channels = Gtk.Builder() self.channels.add_from_file(relative_path('ui/channels.ui')) self.channels.connect_signals(self) self.channels_box = self.channels.get_object('box_channels') self.stack.add_named(self.channels_box, 'channels_container') self.channels_filters = self.channels.get_object('list_box_channels_filters') self.channels_list = self.channels.get_object('flow_box_channels_list') self.channels_list.set_filter_func(self.on_channels_list_row_changed) GLib.idle_add(self.do_initial_setup)
def run(self): self.running = True while self.running: event = self.display.next_event() try: if event.type == X.ButtonPress: # Check if pointer is inside monitored windows for w in self.windows: if (Gtk.MAJOR_VERSION, Gtk.MINOR_VERSION) >= (3, 20): pdevice = Gdk.Display.get_default().get_default_seat().get_pointer() else: pdevice = Gdk.Display.get_default().get_device_manager().get_client_pointer() p = self.get_window().get_device_position(pdevice) g = self.get_size() if p.x >= 0 and p.y >= 0 and p.x <= g.width and p.y <= g.height: break else: # Is outside, so activate GLib.idle_add(self.idle) self.display.allow_events(X.ReplayPointer, event.time) else: self.display.allow_events(X.ReplayPointer, X.CurrentTime) except Exception as e: print "Unexpected error: " + str(e)
def load_feeds(self): with concurrent.futures.ProcessPoolExecutor() as executor: feeds = executor.map(scrape_rss, self.feed_manager.get_feeds()) frontpage_entries = [] for feed in feeds: feed.entries = feed.entries[:10] # Only get 1st 10 GLib.idle_add(self.add_new_feed_tab, feed.feed.title, feed.entries) # Load into frontpage-o frontpage_entries.extend(feed.entries) frontpage_entries = sorted(frontpage_entries, key=operator.attrgetter('updated')) GLib.idle_add(self.add_new_feed_tab, "Frontpage", frontpage_entries)
def on_video_search1_activated(self,searchbox): """ Start searching when the user presses enter """ # Show a spinner to the user while the results are being retrieved spinner = Gtk.Spinner() searching = Gtk.Label("Buscando...") # iterate through the list items and remove child items self.video_list.foreach(lambda child: self.video_list.remove(child)) # Show the spinner and searching label self.video_list.add(spinner) self.video_list.add(searching) spinner.start() # Update the changes self.video_list.show_all() #we spawn a new thread to consume the api asynchronously def start_search(): #for now we use a single backend self.search_response = self.backend.search(searchbox.get_text()) for search_result in self.search_response.get("items",[]): if search_result["id"]["kind"] == "youtube#video": GLib.idle_add(spinner.destroy) GLib.idle_add(searching.destroy) GLib.idle_add(self.update_list,search_result) GLib.idle_add(self.video_list.show) self.download_thread = threading.Thread(target=start_search) self.download_thread.daemon = True self.download_thread.start()
def download_video_thread(self,video_id, filename, video_iter): def update_progress(progress): """ Depending on the download status update the dialog or notify the user """ for item in self.download_queue_store: def get_status(status): return { 'downloading' : 'Descargando', 'finished' : 'Terminado', 'converting' : 'Convirtiendo' }[status] # This mumbo-jumbo exists because messing with the progress bar without using Glib.idle_add caused crashes with huge files. def update_progressbar(progress): self.download_queue_store[video_iter][1] = get_status(progress['status']) if 'total_bytes_estimate' in progress: self.download_queue_store[video_iter][2] = progress['downloaded_bytes'] / progress['total_bytes_estimate'] * 100 else: self.download_queue_store[video_iter][2] = progress['downloaded_bytes'] / progress['total_bytes'] * 100 GLib.idle_add(update_progressbar, progress) if progress['status'] != 'downloading': GLib.idle_add(self.finish_download) """ Spawn a new thread and download """ def start_download(): self.backend.video = video_id self.backend.filename = filename self.backend.download_callback = update_progress self.backend.download() self.download_thread = threading.Thread(target=start_download) self.download_thread.daemon = True self.download_thread.start()
def emit(self, *args): GLib.idle_add(GObject.GObject.emit, self, *args)
def on_profile_changed(self, c, profile): """ Called when controller profile is changed from daemon """ if not self.set_profile(profile, True): if self._first_time: def later(): # Cannot be executed right away, as profile-changed is # emitted before DaemonManager finishes initiaalisation self.emit('unknown-profile', profile) GLib.idle_add(later) self._first_time = False
def setup_statusicon(self): menu = self.builder.get_object("mnuTray") self.statusicon = get_status_icon(self.imagepath, menu) self.statusicon.connect('clicked', self.on_statusicon_clicked) if not self.statusicon.is_clickable(): self.builder.get_object("mnuShowWindowTray").set_visible(True) GLib.idle_add(self.statusicon.set, "scc-%s" % (self.status,), _("SC Controller"))
def set_daemon_status(self, status, daemon_runs): """ Updates image that shows daemon status and menu shown when image is clicked """ log.debug("daemon status: %s", status) icon = os.path.join(self.imagepath, "scc-%s.svg" % (status,)) imgDaemonStatus = self.builder.get_object("imgDaemonStatus") btDaemon = self.builder.get_object("btDaemon") mnuEmulationEnabled = self.builder.get_object("mnuEmulationEnabled") mnuEmulationEnabledTray = self.builder.get_object("mnuEmulationEnabledTray") imgDaemonStatus.set_from_file(icon) mnuEmulationEnabled.set_sensitive(True) mnuEmulationEnabledTray.set_sensitive(True) self.window.set_icon_from_file(icon) self.status = status if self.statusicon: GLib.idle_add(self.statusicon.set, "scc-%s" % (self.status,), _("SC Controller")) self.recursing = True if status == "alive": btDaemon.set_tooltip_text(_("Emulation is active")) elif status == "error": btDaemon.set_tooltip_text(_("Error enabling emulation")) elif status == "dead": btDaemon.set_tooltip_text(_("Emulation is inactive")) else: btDaemon.set_tooltip_text(_("Checking emulation status...")) mnuEmulationEnabled.set_active(daemon_runs) mnuEmulationEnabledTray.set_active(daemon_runs) self.recursing = False
def on_sbArea_focus_out_event(self, button, *a): GLib.idle_add(self.on_sbArea_output, button)
def on_scc_import_confirmed(self, *a): files = self.builder.get_object("lstImportPackage") new_profile_names = {} new_menu_names = {} for enabled, name, importas, trash, obj in files: if enabled != 0: if isinstance(obj.obj, Profile): new_profile_names[name] = importas elif isinstance(obj.obj, MenuData): new_menu_names["%s.menu" % (name,)] = "%s.menu" % (importas,) def apply_replacements(obj): for a in obj.get_all_actions(): if isinstance(a, ChangeProfileAction): if a.profile in new_profile_names: a.profile = new_profile_names[a.profile] a.parameters = tuple([ a.profile ] + list(a.parameters[1:])) elif isinstance(a, MenuAction): if a.menu_id in new_menu_names: a.menu_id = new_menu_names[a.menu_id] a.parameters = tuple([ a.menu_id ] + list(a.parameters[1:])) for enabled, trash, importas, trash, obj in files: if enabled != 0: # TODO: update references if isinstance(obj.obj, Profile): apply_replacements(obj.obj) obj.obj.save(os.path.join(get_profiles_path(), "%s.sccprofile" % (importas,))) elif isinstance(obj.obj, MenuData): apply_replacements(obj.obj) jstr = Encoder(sort_keys=True, indent=4).encode(obj.obj) filename = os.path.join(get_menus_path(), "%s.menu" % (importas,)) open(filename, "w").write(jstr) trash, trash, importas, trash, obj = files[0] # 1st is always profile that's being imported self.app.new_profile(obj.obj, importas) GLib.idle_add(self.window.destroy)
def _parse_profile_list(self, i, filename): """ Parses sharedconfig.vdf and loads game and profile IDs. That is later decoded into name of game and profile name. Called from _load_profiles, in thread. Exceptions are catched and logged from there. Calls GLib.idle_add to send loaded data into UI. """ data = parse_vdf(open(filename, "r")) # Sanity check if "userroamingconfigstore" not in data: return if "controller_config" not in data["userroamingconfigstore"]: return # Grab config cc = data["userroamingconfigstore"]["controller_config"] # Go through all games listitems = [] for gameid in cc: if "selected" in cc[gameid] and cc[gameid]["selected"].startswith("workshop"): profile_id = cc[gameid]["selected"].split("/")[-1] listitems.append(( i, gameid, profile_id, None )) i += 1 if len(listitems) > 10: GLib.idle_add(self.fill_list, listitems) listitems = [] GLib.idle_add(self.fill_list, listitems) return i
def _load_game_names(self): """ Loads names for game ids in q_games. This is done in thread (not in same thread as _load_profiles), because it involves searching for apropriate file and parsing it entirely. Calls GLib.idle_add to send loaded data into UI. """ sa_path = self._find_steamapps() while True: self._s_games.acquire(True) # Wait until something is added to the queue try: index, gameid = self._q_games.popleft() except IndexError: break if gameid.isdigit(): name = _("Unknown App ID %s") % (gameid) filename = os.path.join(sa_path, "appmanifest_%s.acf" % (gameid,)) self._lock.acquire() if os.path.exists(filename): try: data = parse_vdf(open(filename, "r")) name = data['appstate']['name'] except Exception, e: log.error("Failed to load app manifest for '%s'", gameid) log.exception(e) else: log.warning("Skiping non-existing app manifest '%s'", filename) self._lock.release() else: name = gameid GLib.idle_add(self._set_game_name, index, name)
def _load_profile_names(self): """ Loads names for profiles ids in q_profiles. This is same as _load_game_names, but for profiles. """ content_path = os.path.join(self._find_steamapps(), "workshop/content") if not os.path.exists(content_path): log.warning("Cannot find '%s'; Cannot import anything without it", content_path) return while True: self._s_profiles.acquire(True) # Wait until something is added to the queue try: index, gameid, profile_id = self._q_profiles.popleft() except IndexError: break self._lock.acquire() for user in os.listdir(content_path): filename = os.path.join(content_path, user, profile_id, "controller_configuration.vdf") if not os.path.exists(filename): # If there is no 'controller_configuration.vdf', try finding *_legacy.bin filename = self._find_legacy_bin(os.path.join(content_path, user, profile_id)) if not filename or not os.path.exists(filename): # If not even *_legacy.bin is found, skip to next user continue log.info("Reading '%s'", filename) try: data = parse_vdf(open(filename, "r")) name = data['controller_mappings']['title'] GLib.idle_add(self._set_profile_name, index, name, filename) break except Exception, e: log.error("Failed to read profile name from '%s'", filename) log.exception(e) else: log.warning("Profile %s for game %s not found.", profile_id, gameid) name = _("(not found)") GLib.idle_add(self._set_profile_name, index, name, None) self._lock.release()
def on_daemon_reconfigured(self, *a): # config is reloaded in main window 'reconfigured' handler. # Using GLib.idle_add here ensures that main window hanlder will run # *before* self.load_conditions GLib.idle_add(self.load_settings)
def SignalHandler(app): def signal_action(signal): if signal is 1: print("Caught signal SIGHUP(1)") elif signal is 2: print("Caught signal SIGINT(2)") elif signal is 15: print("Caught signal SIGTERM(15)") app.exit_gracefully() def handler(*args): # Activate GLib signal handler signal_action(args[0]) def idle_handler(*args): # Activate python signal handler GLib.idle_add(signal_action, priority=GLib.PRIORITY_HIGH) def install_glib_handler(sig): unix_signal_add = None if hasattr(GLib, "unix_signal_add"): unix_signal_add = GLib.unix_signal_add elif hasattr(GLib, "unix_signal_add_full"): unix_signal_add = GLib.unix_signal_add_full if unix_signal_add: # Register GLib signal handler unix_signal_add(GLib.PRIORITY_HIGH, sig, handler, sig) else: print("WARNING: Can't install GLib signal handler, too old gi.") SIGS = [getattr(signal, s, None) for s in "SIGINT SIGTERM SIGHUP".split()] for sig in filter(None, SIGS): # Register Python signal handler signal.signal(sig, idle_handler) GLib.idle_add(install_glib_handler, sig, priority=GLib.PRIORITY_HIGH)
def read_stdout(self): while True: try: e = ("<-" + self.side + "(" + self.get_running_engine().strip() + "):") line= "" # python2 line = unicode(self.p.stdout.readline(), errors ='ignore') # or: 'your iso 8859-15 text'.decode('iso8859-15') # python3 (doesn't work) lineb = self.p.stdout.readline().encode("utf-8", "ignore") #print(lineb) #line = str(lineb) #print(line, "line") line = self.p.stdout.readline() if line == "": if gv.verbose: print(e + "eof reached") if gv.verbose: print(e + "stderr:", self.p.stderr.read()) break #line = line[2:-3] #print(line) line = line.strip() if gv.verbose or gv.verbose_uci: print(e + line) GObject.idle_add(self.engine_debug.add_to_log, e+line) if line.startswith("info"): GObject.idle_add( self.engine_output.add_to_log, self.side, self.get_running_engine().strip(), line) self.op.append(line) except Exception as e: # line = e + "error" print("subprocess error in uci.py read_stdout:", e, "at:", line)
def build_board(self): GObject.idle_add(self.update) # convert jcchess co-ordinates for square into # standard notation (e.g. (1, 0) -> b1)
def set_square_as_unoccupied(self, x, y): self.dnd = (x, y) # redraw square GLib.idle_add(gv.gui.get_event_box(x, y).queue_draw) # called from gui.py when editing the board position to set the piece # on a square.
def set_piece_at_square(self, x, y, piece, colour): self.chessboard.set_piece_at(chess.square(x, y), chess.Piece(piece, colour)) GLib.idle_add(gv.gui.get_event_box(x, y).queue_draw) # called from gui.py to remove piece during promotion dialog
def remove_piece_at_square(self, x, y): piece=self.chessboard.remove_piece_at(chess.square(x, y)) GLib.idle_add(gv.gui.get_event_box(x, y).queue_draw) return piece # called when user does a "clear board" in board edit
def set_toolbar_time_control(self, txt, side_to_move): if side_to_move is None: return GLib.idle_add(self.tc_lbl[side_to_move].set_text, txt)
def run(self): logging.info('D-Bus process started') GLib.threads_init() # allow threads in GLib GLib.idle_add(self._idleQueueSync) DBusGMainLoop(set_as_default=True) dbusService = SessionDBus(self.taskQueue, self.resultQueue) try: GLib.MainLoop().run() except KeyboardInterrupt: logging.debug("\nThe MainLoop will close...") GLib.MainLoop().quit() return
def update_color(self, bg_color: str, is_dark: bool): """Updates the widget's background color and theme. :bg_color: string representing the color, or 'None'. :is_dark: whether the background is dark, see `:h background`. """ self.get_settings().props.gtk_application_prefer_dark_theme = is_dark if bg_color != 'None': rgba = Gdk.RGBA() rgba.parse(bg_color) self.set_color_background(rgba) else: GLib.idle_add(self.reset_color)