我们从 Python 开源项目中提取了以下 28 个代码示例，用于说明如何使用 gtk.events_pending()。
def gui_ask_for_api():
    """Gtk dialog for API key insert.

    Shows a question dialog containing a text entry, then writes the text
    held by the entry to ``colorize.HOME + colorize.API_KEY_FILE`` as a
    single ``YOUR_API_KEY=<key>`` line.
    """
    message = gtk.MessageDialog(type=gtk.MESSAGE_QUESTION,
                                buttons=gtk.BUTTONS_OK_CANCEL)
    message.set_markup(
        colorize.MSG_ASK_API.replace(colorize.URL,
                                     "<u>" + colorize.URL + "</u>"))

    entry = gtk.Entry(max=64)
    entry.set_text("Enter your API key")
    entry.show()
    message.vbox.pack_end(entry)
    # Pressing Enter inside the entry acts like clicking OK.  The handler has
    # to call response() on this very dialog object.
    entry.connect("activate", lambda _: message.response(gtk.RESPONSE_OK))
    message.set_default_response(gtk.RESPONSE_OK)
    message.run()

    # NOTE(review): the value returned by run() is not inspected, so the key
    # file is written even when the dialog is cancelled -- confirm intent.
    api_key = entry.get_text().decode('utf8')
    # The context manager closes the file even if the write fails.
    with open(colorize.HOME + colorize.API_KEY_FILE, 'w') as fp:
        fp.write("YOUR_API_KEY={0}{1}".format(api_key, os.linesep))

    # process button click immediately
    message.destroy()
    while gtk.events_pending():
        gtk.main_iteration()
def doIteration(self, delay):
    """Run a single iteration of the gtk main loop.

    If events are already pending, one non-blocking iteration is run.
    Otherwise, unless ``delay`` is 0, a timer of ``int(delay * 1000)`` is
    registered and one blocking iteration is run.
    """
    # Flush some pending events and return if there was something to do.
    # The usual "while gtk.events_pending(): mainiteration()" idiom is not
    # used because lots of IO (in particular test_tcp's
    # ProperlyCloseFilesTestCase) can keep us from ever exiting.
    log.msg(channel='system', event='iteration', reactor=self)
    if gtk.events_pending():
        gtk.mainiteration(0)
    elif delay != 0:
        # Nothing to do right now and we are allowed to wait.
        timeout_ms = int(delay * 1000)
        self.doIterationTimer = gtk.timeout_add(timeout_ms,
                                                self.doIterationTimeout)
        # This will either wake up from IO or from a timeout.
        gtk.mainiteration(1)  # block
        # note: with the .simulate timer below, delays > 0.1 will always be
        # woken up by the .simulate timer
        leftover_timer = self.doIterationTimer
        if leftover_timer:
            # if woken by IO, need to cancel the timer
            gtk.timeout_remove(leftover_timer)
            self.doIterationTimer = None
def stopall_collectors(self, button):
    """Terminate the enabled automatic collectors while showing progress.

    Switches the tray icon and the start/stop controls to the "stopped"
    state, then walks ``self.engine.collectors`` and reports each step in a
    ``ProgressBarDetails`` window, which is closed at the end.
    """
    menu = self.status_context_menu
    menu.tray_ind.set_icon(gtk.STOCK_NO)
    menu.stopall_menu_item.set_sensitive(False)
    menu.startall_menu_item.set_sensitive(True)
    self.stopall_button.set_sensitive(False)
    self.startall_button.set_sensitive(True)

    def flush_events():
        # Keep the GUI responsive between the individual steps.
        while gtk.events_pending():
            gtk.main_iteration()

    progress = ProgressBarDetails()
    progress.setValue(0.0)
    flush_events()

    # NOTE(review): the progress update is assumed to run once per collector
    # (only terminate() is conditional) -- confirm against the original
    # indentation.
    for index, coll in enumerate(self.engine.collectors):
        if coll.is_enabled() and isinstance(
                coll, engine.collector.AutomaticCollector):
            coll.terminate()
        progress.setValue(float(index) / len(self.engine.collectors))
        progress.appendText("stopping " + coll.name)
        flush_events()

    progress.setValue(1.0)
    # Destroy the window ourselves unless a delete-event handler claimed it.
    if not progress.emit("delete-event", gtk.gdk.Event(gtk.gdk.DELETE)):
        progress.destroy()
def fetch_picture(self):
    """Download the poster at ``self.image_url`` into the temp directory.

    On success ``self.image`` holds the random part of the temporary file
    name (the text after ``poster_``); the image itself is stored as
    ``<temp name>.jpg``.  When there is no URL, or the download fails,
    ``self.image`` is set to an empty string.
    """
    if not self.image_url:
        self.image = ""
        return

    # NOTE(review): tempfile.mktemp only reserves a name and is race-prone;
    # kept because the file name is what the caller stores in self.image.
    tmp_dest = tempfile.mktemp(prefix='poster_', dir=self.locations['temp'])
    self.image = tmp_dest.split('poster_', 1)[1]
    dest = "%s.jpg" % tmp_dest
    try:
        self.progress.set_data(self.parent_window, _("Fetching poster"),
                               _("Wait a moment"), False)
        retriever = Retriever(self.image_url, self.parent_window,
                              self.progress, dest,
                              useurllib2=self.useurllib2)
        retriever.start()
        # Pump the GUI while the retriever is still working.
        while retriever.isAlive():
            self.progress.pulse()
            if self.progress.status:
                retriever.join()
            while gtk.events_pending():
                gtk.main_iteration()
        urlcleanup()
    except Exception:
        # Catch Exception rather than everything so that KeyboardInterrupt
        # and SystemExit still propagate.
        log.exception('')
        self.image = ""
        try:
            os.remove(dest)
        except OSError:
            log.info("Can't remove %s file" % tmp_dest)
def start(self):
    """Advance the timeline frame by frame while it is playing.

    Each pass sleeps for one frame period, steps to the next frame, skips
    over frames flagged ``fixed`` and toggles playback off once the last
    frame is reached and replay is disabled.  Pending gtk events are
    processed at the end of every pass.
    """
    def finished_without_replay():
        # True when the last frame is active and looping is switched off.
        if self.timeline.is_replay:
            return False
        return self.timeline.active >= len(self.timeline.frames) - 1

    while self.timeline.is_playing:
        frame_period = 1.0 / self.timeline.framerate
        time.sleep(frame_period)
        self.timeline.on_goto(None, NEXT)

        # while has fixed frames jump to the next
        while self.timeline.frames[self.timeline.active].fixed:
            if finished_without_replay():
                self.timeline.on_toggle_play(self.play_button)
            self.timeline.on_goto(None, NEXT)

        # if all frames are already played, and there is no replay, stop
        # the loop.
        if finished_without_replay():
            self.timeline.on_toggle_play(self.play_button)

        # call gtk event handler.
        while gtk.events_pending():
            gtk.main_iteration()
def keypress(self, widget, event=None):
    """Key-press handler: on keyval 65307, hide the window for 15 seconds.

    NOTE(review): the statement nesting below was reconstructed from a
    flattened source; in particular the position of ``return True`` should
    be confirmed against the original file.
    NOTE(review): ``event`` defaults to None but is dereferenced
    unconditionally, so calling this without an event raises AttributeError.
    """
    # 65307 == 0xFF1B, which should be the GDK keysym for Escape -- confirm.
    if event.keyval == 65307:
        # Only act when no other GUI events are waiting to be processed.
        if not gtk.events_pending():
            self.window.iconify()
            self.window.hide()
            # Blocks the calling thread for the whole 15 seconds.
            time.sleep(15)
            self.window.show()
            self.window.deiconify()
            self.window.present()
        return True
def gui_message(text, message_type):
    """Gtk dialog for error message display.

    Shows ``text`` (Pango markup) in a modal dialog of the given
    ``message_type`` with a single Close button and waits until the dialog
    is dismissed.
    """
    dialog = gtk.MessageDialog(buttons=gtk.BUTTONS_CLOSE, type=message_type)
    dialog.set_markup(text)
    dialog.run()
    dialog.destroy()

    # Drain the event queue so the destroy request is handled right away.
    events_waiting = gtk.events_pending
    while events_waiting():
        gtk.main_iteration()
def __init__(self, useGtk=True):
    """Create the reactor and select the main-loop primitives it drives.

    The glib context/loop functions are used only when pygtk is recent
    enough and ``useGtk`` is false; otherwise the gtk functions are used.
    """
    self.context = gobject.main_context_default()
    self.loop = gobject.MainLoop()
    posixbase.PosixReactorBase.__init__(self)

    # pre 2.3.91 the glib iteration and mainloop functions didn't release
    # global interpreter lock, thus breaking thread and signal support.
    glib_usable = (hasattr(gobject, "pygtk_version")
                   and gobject.pygtk_version >= (2, 3, 91))
    if not glib_usable or useGtk:
        import gtk
        self.__pending = gtk.events_pending
        self.__iteration = gtk.main_iteration
        self.__crash = _our_mainquit
        self.__run = gtk.main
    else:
        self.__pending = self.context.pending
        self.__iteration = self.context.iteration
        self.__crash = self.loop.quit
        self.__run = self.loop.run

# The input_add function in pygtk1 checks for objects with a
# 'fileno' method and, if present, uses the result of that method
# as the input source. The pygtk2 input_add does not do this. The
# function below replicates the pygtk1 functionality.
# In addition, pygtk maps gtk.input_add to _gobject.io_add_watch, and
# g_io_add_watch() takes different condition bitfields than
# gtk_input_add(). We use g_io_add_watch() here in case pygtk fixes this
# bug.
def update_view_timeout(self):
    """Periodic callback that refreshes the view from the simulation.

    Returns False (after clearing ``self._update_timeout_id``) when the play
    button is not active, so the update timer stops; returns True otherwise.
    """
    # while the simulator is busy, run the gtk event loop
    # (the lock is polled without blocking so the GUI stays responsive)
    while not self.simulation.lock.acquire(False):
        while gtk.events_pending():
            gtk.main_iteration()
    # Take ownership of the queued pause messages while holding the lock.
    pause_messages = self.simulation.pause_messages
    self.simulation.pause_messages = []
    try:
        self.update_view()
        # Next simulation time target: current simulator time plus one
        # sample period.
        self.simulation.target_time = ns.core.Simulator.Now ().GetSeconds () + self.sample_period
    finally:
        # Always release the lock, even if updating the view failed.
        self.simulation.lock.release()

    if pause_messages:
        # Show the collected messages in a warning dialog and pause playback.
        dialog = gtk.MessageDialog(parent=self.window, flags=0,
                                   type=gtk.MESSAGE_WARNING,
                                   buttons=gtk.BUTTONS_OK,
                                   message_format='\n'.join(pause_messages))
        dialog.connect("response", lambda d, r: d.destroy())
        dialog.show()
        self.play_button.set_active(False)

    # if we're paused, stop the update timer
    if not self.play_button.get_active():
        self._update_timeout_id = None
        return False

    # presumably an event object that lets the simulation proceed -- confirm
    self.simulation.go.set()
    return True
def startall_collectors(self, button):
    """Run the enabled automatic collectors while showing progress.

    Switches the tray icon and the start/stop controls to the "recording"
    state, then walks ``self.engine.collectors`` and reports each step in a
    ``ProgressBarDetails`` window, which is closed at the end.
    """
    menu = self.status_context_menu
    menu.tray_ind.set_icon(gtk.STOCK_MEDIA_RECORD)
    menu.startall_menu_item.set_sensitive(False)
    menu.stopall_menu_item.set_sensitive(True)
    self.startall_button.set_sensitive(False)
    self.stopall_button.set_sensitive(True)

    def flush_events():
        # Keep the GUI responsive between the individual steps.
        while gtk.events_pending():
            gtk.main_iteration()

    progress = ProgressBarDetails()
    progress.setValue(0.0)
    flush_events()

    # NOTE(review): the progress update is assumed to run once per collector
    # (only run() is conditional) -- confirm against the original
    # indentation.
    for index, coll in enumerate(self.engine.collectors):
        if coll.is_enabled() and isinstance(
                coll, engine.collector.AutomaticCollector):
            coll.run()
        progress.setValue(float(index) / len(self.engine.collectors))
        progress.appendText("processing " + coll.name)
        progress.pbar.set_text("Starting " + coll.name)
        flush_events()

    progress.setValue(1.0)
    # Destroy the window ourselves unless a delete-event handler claimed it.
    if not progress.emit("delete-event", gtk.gdk.Event(gtk.gdk.DELETE)):
        progress.destroy()
def __init__(self):
    """Ask for screenshot details, capture the screen and store metadata.

    Calls ``self.save_shot()``; when it returns ``gtk.RESPONSE_ACCEPT`` the
    screen is captured to ``<cwd>/plugins/collectors/manualscreenshot/raw``
    as ``<comment>-<timestamp>.png`` and a hidden ``.<comment>-<timestamp>``
    metadata file is written next to it.
    """
    self.ip_entry_text = ""
    self.port_entry_text = ""
    self.initial_entry_text = ""
    self.comment_entry_text = ""

    # presumably save_shot() fills in the *_entry_text fields -- confirm
    response = self.save_shot()
    if response != gtk.RESPONSE_ACCEPT:
        return

    # wait 1 second before taking the snapshot
    while gtk.events_pending():
        gtk.main_iteration()
    sleep(1)
    timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

    # take the snapshot
    bitmap = autopy.bitmap.capture_screen()
    savepath = os.path.join(os.getcwd(), 'plugins', 'collectors',
                            'manualscreenshot', 'raw')
    # Make sure the raw folder exists before anything is saved into it.
    if not os.path.isdir(savepath):
        os.makedirs(savepath)
    savefilename = (self.comment_entry_text.strip().replace("-", "")
                    + "-" + timestamp)
    savefull = os.path.join(savepath, savefilename + ".png")
    bitmap.save(savefull)

    # write to metafile
    # NOTE(review): the entries are not separated by commas and values are
    # not escaped, so this is not strict JSON; the layout is kept unchanged
    # because the reader of this file is not visible here.
    fields = (
        ("ip", self.ip_entry_text),
        ("port", self.port_entry_text),
        ("initial", self.initial_entry_text),
        ("timestamp", timestamp),
        ("savepath", savefull),
        ("comment", self.comment_entry_text),
    )
    metadata = ("[{\n"
                + "".join(" \"%s\" : \"%s\"\n" % pair for pair in fields)
                + "}]\n")
    # The context manager closes the file even if the write fails.
    with open(os.path.join(savepath, "." + savefilename), 'w') as f:
        f.write(metadata)
def refresh(self):
    """Process all gtk events that are currently pending."""
    events_waiting = gtk.events_pending
    while events_waiting():
        gtk.main_iteration()
def popup_message(message):
    """shows popup message while executing decorated function

    Returns a decorator.  While the decorated function runs, a small modal
    gtk window displays ``message``; when the module-level ``gtk`` name is
    falsy the message is written to standard output instead, followed by
    `` [done]`` once the function has returned.  The decorated function's
    return value is passed through unchanged.
    """
    import functools
    import sys

    def wrap(f):
        # functools.wraps keeps the name and docstring of the wrapped
        # function visible on the wrapper.
        @functools.wraps(f)
        def wrapped_f(*args, **kwargs):
            window = None
            if gtk:
                window = gtk.Window()
                window.set_title('Griffith info')
                window.set_position(gtk.WIN_POS_CENTER)
                window.set_keep_above(True)
                window.stick()
                window.set_default_size(200, 50)
                label = gtk.Label()
                label.set_markup(
                    """<big><b>Griffith:</b> %s</big>""" % message)
                window.add(label)
                window.set_modal(True)
                window.set_type_hint(gtk.gdk.WINDOW_TYPE_HINT_DIALOG)
                window.show_all()
                while gtk.events_pending():  # give GTK some time for updates
                    gtk.main_iteration()
            else:
                # No newline yet: ' [done]' is appended on the same line.
                # Flush so the message is visible while f is running.
                sys.stdout.write('%s ' % (message,))
                sys.stdout.flush()
            try:
                res = f(*args, **kwargs)
            finally:
                # Never leave the modal popup behind, even when f raises.
                if window is not None:
                    window.destroy()
            if window is None:
                sys.stdout.write(' [done]\n')
            return res
        return wrapped_f
    return wrap
def _get(self, url, parent_window, decode=True):
    """Fetch ``url`` with a progress dialog and return the page content.

    Returns the downloaded data (decoded as UTF-8 with replacement when
    ``decode`` is true), or None when nothing was retrieved.
    """
    # initialize the progress dialog once for the following search process
    if not self.progress:
        self.progress = Progress(parent_window)

    data = None
    url = url.encode('utf-8', 'replace')
    log.debug('Using url <%s>', url)
    self.progress.set_data(parent_window, _('Searching'),
                           _('Wait a moment'), True)
    try:
        retriever = Retriever(url, parent_window, self.progress)
        retriever.start()
        # Keep the GUI alive while the retriever is still working.
        while retriever.isAlive():
            self.progress.pulse()
            if self.progress.status:
                retriever.join()
            while gtk.events_pending():
                gtk.main_iteration()
        try:
            if retriever.html:
                downloaded = retriever.html[0]
                with open(downloaded, 'rb') as ifile:
                    data = ifile.read()
                # check for gzip compressed pages before decoding to unicode
                looks_gzipped = len(data) > 2 and data[0:2] == '\037\213'
                if looks_gzipped:
                    data = gutils.decompress(data)
                if decode:
                    data = data.decode('utf-8', 'replace')
                os.remove(retriever.html[0])
        except IOError:
            log.exception('')
    finally:
        self.progress.hide()
    urlcleanup()
    return data
def search(self, parent_window):
    """Search Amazon through its web API and return the result page.

    Returns False when the access or secret key is not configured;
    otherwise returns ``self.page``, which is set to the retriever result
    when the lookup succeeds.
    """
    # dont use base functionality
    # use the Amazon Web API
    self.titles = [""]
    self.ids = [""]
    try:
        accesskey = self.config.get('amazon_accesskey', None,
                                    section='extensions')
        secretkey = self.config.get('amazon_secretkey', None,
                                    section='extensions')
        if not accesskey or not secretkey:
            gutils.error(_('Please configure your Amazon Access Key ID and Secret Key correctly in the preferences dialog.'))
            return False
        amazon.setLicense(accesskey, secretkey)

        # Configured value (index or country code) -> locale passed on to
        # the retriever; anything else maps to None.
        locale_by_setting = {
            '1': 'uk', 'UK': 'uk',
            '2': 'de', 'DE': 'de',
            '3': 'ca', 'CA': 'ca',
            '4': 'fr', 'FR': 'fr',
            '5': 'jp', 'JP': 'jp',
        }
        locale = locale_by_setting.get(
            self.config.get('amazon_locale', 0, section='extensions'))

        retriever = AmazonRetriever(self.title, locale, parent_window,
                                    self.progress)
        retriever.start()
        # Keep the GUI alive while the retriever is still working.
        while retriever.isAlive():
            self.progress.pulse()
            while gtk.events_pending():
                gtk.main_iteration()
        self.page = retriever.result
    except Exception:
        # Catch Exception rather than everything so that KeyboardInterrupt
        # and SystemExit still propagate.
        try:
            log.exception('Error retrieving results from amazon.')
            log.error(retriever.result.Request.Errors.Error.Message)
        except Exception:
            # Best effort only: the retriever or its error details may not
            # exist at this point.
            pass
    # NOTE(review): assumes self.page was initialised elsewhere for the
    # failure path -- confirm.
    return self.page
def do_events(self):
    """Process any outstanding gtk events without blocking."""
    events_waiting = gtk.events_pending
    while events_waiting():
        gtk.main_iteration(False)
def do_dmrmarc(self, do_import):
    """Query DMR-MARC and either import the result or open it live.

    Shows a busy cursor, runs ``self.do_dmrmarc_prompt()`` and returns early
    if it reports false.  With ``do_import`` true the data is imported into
    the current editor set through a ``dmrmarc://`` URL; otherwise a
    read-only live radio is opened.  A ``RadioError`` raised while opening
    the live radio is shown to the user.
    """
    self.window.set_cursor(gtk.gdk.Cursor(gtk.gdk.WATCH))
    try:
        if not self.do_dmrmarc_prompt():
            return

        city = CONF.get("city", "dmrmarc")
        state = CONF.get("state", "dmrmarc")
        country = CONF.get("country", "dmrmarc")

        # Do this in case the import process is going to take a while
        # to make sure we process events leading up to this
        gtk.gdk.window_process_all_updates()
        while gtk.events_pending():
            gtk.main_iteration(False)

        if do_import:
            eset = self.get_current_editorset()
            dmrmarcstr = "dmrmarc://%s/%s/%s" % (city, state, country)
            eset.do_import(dmrmarcstr)
        else:
            try:
                from chirp import dmrmarc
                radio = dmrmarc.DMRMARCRadio(None)
                radio.set_params(city, state, country)
                self.do_open_live(radio, read_only=True)
            except errors.RadioError as e:
                common.show_error(e)
    finally:
        # Restore the default cursor on every exit path, including
        # unexpected exceptions, so the window is never left "busy".
        self.window.set_cursor(None)
def do_rfinder(self, do_import):
    """Query RFinder and either import the result or open it live.

    Shows a busy cursor, runs ``self.do_rfinder_prompt()`` and returns early
    if it reports false.  With ``do_import`` true the data is imported into
    the current editor set through an ``rfinder://`` URL; otherwise a
    read-only live radio is opened.
    """
    self.window.set_cursor(gtk.gdk.Cursor(gtk.gdk.WATCH))
    try:
        if not self.do_rfinder_prompt():
            return

        lat = CONF.get_float("Latitude", "rfinder")
        lon = CONF.get_float("Longitude", "rfinder")
        passwd = CONF.get("Password", "rfinder")
        email = CONF.get("Email", "rfinder")
        miles = CONF.get_int("Range_in_Miles", "rfinder")

        # Do this in case the import process is going to take a while
        # to make sure we process events leading up to this
        gtk.gdk.window_process_all_updates()
        while gtk.events_pending():
            gtk.main_iteration(False)

        if do_import:
            eset = self.get_current_editorset()
            rfstr = "rfinder://%s/%s/%f/%f/%i" % \
                (email, passwd, lat, lon, miles)
            eset.do_import(rfstr)
        else:
            from chirp.drivers import rfinder
            radio = rfinder.RFinderRadio(None)
            radio.set_params((lat, lon), miles, email, passwd)
            self.do_open_live(radio, read_only=True)
    finally:
        # Restore the default cursor on every exit path, including
        # unexpected exceptions, so the window is never left "busy".
        self.window.set_cursor(None)
def do_radioreference(self, do_import):
    """Query RadioReference and either import the result or open it live.

    Shows a busy cursor, runs ``self.do_radioreference_prompt()`` and
    returns early if it reports false.  With ``do_import`` true the data is
    imported into the current editor set through a ``radioreference://``
    URL; otherwise a read-only live radio is opened.  A ``RadioError``
    raised while opening the live radio is shown to the user.
    """
    self.window.set_cursor(gtk.gdk.Cursor(gtk.gdk.WATCH))
    try:
        if not self.do_radioreference_prompt():
            return

        username = CONF.get("Username", "radioreference")
        passwd = CONF.get("Password", "radioreference")
        zipcode = CONF.get("Zipcode", "radioreference")

        # Do this in case the import process is going to take a while
        # to make sure we process events leading up to this
        gtk.gdk.window_process_all_updates()
        while gtk.events_pending():
            gtk.main_iteration(False)

        if do_import:
            eset = self.get_current_editorset()
            rrstr = "radioreference://%s/%s/%s" % (zipcode, username,
                                                   passwd)
            eset.do_import(rrstr)
        else:
            try:
                from chirp import radioreference
                radio = radioreference.RadioReferenceRadio(None)
                radio.set_params(zipcode, username, passwd)
                self.do_open_live(radio, read_only=True)
            except errors.RadioError as e:
                common.show_error(e)
    finally:
        # Restore the default cursor on every exit path, including
        # unexpected exceptions, so the window is never left "busy".
        self.window.set_cursor(None)
def set_text(self, text):
    """Show ``text`` in the label and let gtk redraw immediately."""
    self.label.set_text(text)
    self.queue_draw()

    # Run the pending events (non-blocking) so the new text becomes visible.
    events_waiting = gtk.events_pending
    while events_waiting():
        gtk.main_iteration_do(False)
def grind(self):
    """Process pending gtk events, then pulse the progress bar once."""
    events_waiting = gtk.events_pending
    while events_waiting():
        gtk.main_iteration(False)

    self.prog.pulse()
def set(self, fraction):
    """Process pending gtk events, then set the progress bar fraction."""
    events_waiting = gtk.events_pending
    while events_waiting():
        gtk.main_iteration(False)

    self.prog.set_fraction(fraction)
def open_page(self, parent_window=None, url=None):
    """Download a page with a progress dialog and return its content.

    Fetches ``url`` or, when it is None, ``self.url``.  The result is
    decoded with ``self.encode`` when that is set.  When no explicit ``url``
    was given the result is also stored in ``self.page``.  Returns None when
    nothing could be retrieved.
    """
    url_to_fetch = self.url if url is None else url
    if parent_window is not None:
        self.parent_window = parent_window

    self.progress.set_data(parent_window, _("Fetching data"),
                           _("Wait a moment"), False)
    retriever = Retriever(url_to_fetch, self.parent_window, self.progress,
                          useurllib2=self.useurllib2)
    retriever.start()
    # Keep the GUI alive while the retriever is still working.
    while retriever.isAlive():
        self.progress.pulse()
        if self.progress.status:
            retriever.join()
        while gtk.events_pending():
            gtk.main_iteration()

    data = None
    try:
        if retriever.exception is not None:
            gutils.urllib_error(_("Connection error"), self.parent_window)
        elif retriever.html:
            with open(retriever.html[0], "rb") as ifile:
                data = ifile.read()
            # check for gzip compressed pages before decoding to unicode
            looks_gzipped = len(data) > 2 and data[0:2] == '\037\213'
            if looks_gzipped:
                data = gutils.decompress(data)
            try:
                # try to decode it strictly
                if self.encode:
                    data = data.decode(self.encode)
            except UnicodeDecodeError as exc:
                # something is wrong, perhaps a wrong character set
                # or some pages are not as strict as they should be
                # (like OFDb, mixes utf8 with iso8859-1)
                # the error is logged so that it can be found later,
                # but the program should not terminate
                log.error(exc)
                data = data.decode(self.encode, 'ignore')
    except IOError:
        log.exception('')

    if url is None:
        self.page = data
    urlcleanup()
    return data
def open_search(self, parent_window, destination=None):
    """Run the search request and store the result page in ``self.page``.

    Returns False on a connection error or when nothing was downloaded.
    Returns True otherwise, including when ``destination`` is given (the
    content is then not read) and when an IOError was logged.
    """
    self.titles = [""]
    self.ids = [""]

    # Build the final search URL from the template and the title.
    if self.url.find('%s') > 0:
        self.url = self.url % self.title
        self.url = self.url.replace(' ', '%20')
    elif not self.usepostrequest:
        self.url = (self.url + self.title).replace(' ', '%20')

    try:
        url = self.url.encode(self.encode)
    except UnicodeEncodeError:
        url = self.url.encode('utf-8')

    self.progress.set_data(parent_window, _("Searching"),
                           _("Wait a moment"), True)

    # POST data is only passed on when the plugin asks for a POST request.
    extra_args = {}
    if self.usepostrequest:
        extra_args['postdata'] = self.get_postdata()
    retriever = Retriever(url, parent_window, self.progress, destination,
                          useurllib2=self.useurllib2, **extra_args)
    retriever.start()
    # Keep the GUI alive while the retriever is still working.
    while retriever.isAlive():
        self.progress.pulse()
        if self.progress.status:
            retriever.join()
        while gtk.events_pending():
            gtk.main_iteration()

    try:
        if retriever.exception is not None:
            self.progress.hide()
            gutils.urllib_error(_("Connection error"), parent_window)
            return False
        if destination:
            # caller gave an explicit destination file
            # don't care about the content
            return True
        if not retriever.html:
            return False
        with open(retriever.html[0], 'rb') as ifile:
            self.page = ifile.read()
        # check for gzip compressed pages before decoding to unicode
        if len(self.page) > 2 and self.page[0:2] == '\037\213':
            self.page = gutils.decompress(self.page)
        self.page = self.page.decode(self.encode, 'replace')
    except IOError:
        log.exception('')
    finally:
        urlcleanup()
    return True
def on_terminal_keypress(self, widget, event, *args):
    """Handle a key press in a terminal widget by running its shortcut.

    Looks the key name up in ``shortcuts``.  List-valued entries are the
    predefined commands dispatched below; any other value is fed to the
    terminal's child process.  Returns True when the key matched a
    shortcut and False otherwise.

    NOTE(review): the nesting inside the _CONSOLE_RECONNECT branch was
    reconstructed from a flattened source -- confirm against the original.
    """
    if shortcuts.has_key(get_key_name(event)):
        cmd = shortcuts[get_key_name(event)]
        if type(cmd) == list:
            # predefined commands
            if cmd == _COPY:
                self.terminal_copy(widget)
            elif cmd == _PASTE:
                self.terminal_paste(widget)
            elif cmd == _COPY_ALL:
                self.terminal_copy_all(widget)
            elif cmd == _SAVE:
                self.show_save_buffer(widget)
            elif cmd == _FIND:
                # Select the whole search box content and focus it.
                self.get_widget('txtSearch').select_region(0, -1)
                self.get_widget('txtSearch').grab_focus()
            elif cmd == _FIND_NEXT:
                # Only search when a 'search' attribute has been set.
                if hasattr(self, 'search'):
                    self.find_word()
            elif cmd == _CLEAR:
                widget.reset(True, True)
            elif cmd == _FIND_BACK:
                if hasattr(self, 'search'):
                    self.find_word(backwards=True)
            elif cmd == _CONSOLE_PREV:
                widget.get_parent().get_parent().prev_page()
            elif cmd == _CONSOLE_NEXT:
                widget.get_parent().get_parent().next_page()
            elif cmd == _CONSOLE_CLOSE:
                # Remove the page holding this terminal and destroy it.
                wid = widget.get_parent()
                page = widget.get_parent().get_parent().page_num(wid)
                if page != -1:
                    widget.get_parent().get_parent().remove_page(page)
                    wid.destroy()
            elif cmd == _CONSOLE_RECONNECT:
                if not hasattr(widget, "command"):
                    widget.fork_command(SHELL)
                else:
                    widget.fork_command(widget.command[0], widget.command[1])
                    while gtk.events_pending():
                        gtk.main_iteration(False)

                    # wait 2 seconds before sending the password to give
                    # expect time to start and to prevent the password from
                    # being displayed
                    if widget.command[2]!=None and widget.command[2]!='':
                        gobject.timeout_add(2000, self.send_data, widget, widget.command[2])
                widget.get_parent().get_parent().get_tab_label(widget.get_parent()).mark_tab_as_active()
                return True
            elif cmd == _CONNECT:
                self.on_btnConnect_clicked(None)
            elif cmd[0][0:8] == "console_":
                # "console_<n>" switches to the n-th page (pages are 0-based).
                page = int(cmd[0][8:]) - 1
                widget.get_parent().get_parent().set_current_page(page)
        else:
            # user-defined commands
            widget.feed_child(cmd)
        return True
    return False
def do_repeaterbook_proximity(self, do_import):
    """Run a RepeaterBook proximity query and import or open the result.

    Shows a busy cursor, runs ``self.do_repeaterbook_proximity_prompt()``
    and returns early if it reports false.  The query result is downloaded
    to a temporary CSV file and validated by loading it as an ``RBRadio``.
    With ``do_import`` true the file is imported into the current editor
    set; otherwise the radio is opened live and read-only.
    """
    self.window.set_cursor(gtk.gdk.Cursor(gtk.gdk.WATCH))
    radio = None
    try:
        if not self.do_repeaterbook_proximity_prompt():
            return

        loc = CONF.get("location", "repeaterbook")

        # Fall back to defaults when a setting is missing or not numeric.
        try:
            dist = int(CONF.get("distance", "repeaterbook"))
        except Exception:
            dist = 20

        try:
            band = int(CONF.get("band", "repeaterbook")) or '%'
            band = str(band)
        except Exception:
            band = '%'

        # NOTE(review): loc is inserted into the URL without any quoting --
        # confirm whether the prompt already restricts its characters.
        query = "https://www.repeaterbook.com/repeaters/downloads/CHIRP/" \
                "app_direct.php?loc=%s&band=%s&dist=%s" % (loc, band, dist)
        print(query)

        # Do this in case the import process is going to take a while
        # to make sure we process events leading up to this
        gtk.gdk.window_process_all_updates()
        while gtk.events_pending():
            gtk.main_iteration(False)

        # NOTE(review): tempfile.mktemp only reserves a name and is
        # race-prone; kept because the existence check below relies on the
        # download creating the file.
        fn = tempfile.mktemp(".csv")
        filename, headers = urllib.urlretrieve(query, fn)
        if not os.path.exists(filename):
            LOG.error("Failed, headers were: %s", headers)
            common.show_error(_("RepeaterBook query failed"))
            return

        try:
            # Validate CSV
            radio = repeaterbook.RBRadio(filename)
            if radio.errors:
                reporting.report_misc_error("repeaterbook",
                                            ("query=%s\n" % query) +
                                            ("\n") +
                                            ("\n".join(radio.errors)))
        except errors.InvalidDataError as e:
            common.show_error(str(e))
            return
        except Exception:
            common.log_exception()
            if radio is None:
                # The radio object could not be created at all, so there is
                # nothing to report or import.
                common.show_error(_("RepeaterBook query failed"))
                return

        reporting.report_model_usage(radio, "import", True)
    finally:
        # Restore the default cursor on every exit path, including
        # unexpected exceptions, before the import or live view starts.
        self.window.set_cursor(None)

    if do_import:
        eset = self.get_current_editorset()
        eset.do_import(filename)
    else:
        self.do_open_live(radio, read_only=True)