我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用gobject.timeout_add()。
def _add_watch(self, obj):
    """Install an I/O watch for ``obj``, or schedule a retry if it has no mask.

    Returns ``False`` once a watch or a retry timer has been registered and
    ``True`` otherwise.  The method is also handed to ``gobject.timeout_add``
    as the retry callback, so the return value doubles as the "call me
    again?" answer for that timer.
    """
    mask = self._get_mask(obj)
    if not mask:
        # This should be exceptional. The channel is still open, but is
        # neither readable nor writable. Retry until it is, but with a
        # timeout_add() to preserve CPU.
        if self._watch_map[obj][1] != 'idle_add':
            return True
        retry_id = gobject.timeout_add(200, self._add_watch, obj,
                                       priority=gobject.PRIORITY_LOW)
        self._watch_map[obj] = (retry_id, 'timeout_add')
        return False

    watch_id = gobject.io_add_watch(obj, mask, self._handle_io,
                                    priority=IO_PRIORITY)
    self._watch_map[obj] = (watch_id, mask)
    return False
def play(args):
    """Show ``args['html']`` in a web view window, optionally screenshotting it.

    ``args`` is a mapping.  Keys read here: 'title', 'width', 'height',
    'html' and 'base' (always), plus 'window-type', 'x', 'y' and
    'screenshot-file' (only when present).
    """
    window = main_window(args.get('window-type', 'toplevel'),
                         args['title'],
                         args['width']/2,
                         args['height']/2)
    window.resize(args['width'], args['height'])

    view = create_webview()
    view.load_string(args['html'], 'text/html', 'UTF-8', args['base'])

    has_position = 'x' in args and 'y' in args
    if has_position:
        window.move(args['x'], args['y'])

    window.add(view)
    window.show_all()

    if 'screenshot-file' not in args:
        return

    # Imported lazily: only the screenshot path needs the timer.
    import gobject
    gobject.timeout_add(1500, save_screenshot, window, args['screenshot-file'])
def doIteration(self, delay):
    """Process pending main-loop events, blocking for up to ``delay`` seconds.

    ``delay`` is multiplied by 1000 and truncated to an int before being
    used as the timer interval.
    """
    # Flush some pending events and return if there was something to do.
    # Don't use the usual
    #   "while self.context.pending(): self.context.iteration()"
    # idiom because lots of IO (in particular test_tcp's
    # ProperlyCloseFilesTestCase) can keep us from ever exiting.
    log.msg(channel='system', event='iteration', reactor=self)
    work_pending = self.__pending()
    if work_pending:
        self.__iteration(0)
        return

    # Nothing to do, must delay -- unless we were told not to.
    if delay == 0:
        return

    delay_ms = int(delay * 1000)
    self.doIterationTimer = gobject.timeout_add(delay_ms,
                                                self.doIterationTimeout)

    # This will either wake up from IO or from a timeout.
    self.__iteration(1)  # block

    # Note: with the .simulate timer below, delays > 0.1 will always be
    # woken up by the .simulate timer.
    leftover_timer = self.doIterationTimer
    if leftover_timer:
        # If woken by IO, need to cancel the timer.
        gobject.source_remove(leftover_timer)
        self.doIterationTimer = None
def parse(self):
    """Launch the collector's parser process and start polling its output.

    Creates a ``ProgressBarDetails`` window, spawns ``self.parserInputs``
    as a child process, marks ``self.status`` as "running" and schedules
    ``self.update_textbuffer`` every 100 ms.
    """
    self.pb = ProgressBarDetails()
    self.pb.set_title(self.collector.name + " Parser Output")
    self.pb.appendText("Starting parser for " + self.collector.name + "...\n")
    self.text_buffer = self.pb.text_buffer

    if os.name == 'nt':
        # Keep the process handle on Windows too.  It used to be discarded
        # here, leaving ``self.sub_proc`` unset on this platform while the
        # other branch sets it.
        # NOTE(review): presumably update_textbuffer reads self.sub_proc --
        # confirm against that method.
        self.sub_proc = subprocess.Popen(
            self.parserInputs,
            cwd=os.path.dirname(os.path.realpath(__file__)),
            stdout=subprocess.PIPE,
            shell=True,
            stderr=subprocess.PIPE)
    else:
        self.sub_proc = subprocess.Popen(self.parserInputs,
                                         stdout=subprocess.PIPE,
                                         shell=False)

    self.status = "running"
    gobject.timeout_add(100, self.update_textbuffer)
def _seek_show(self):
    """Show the time-position popup and arm its auto-hide timer.

    Does nothing when seeking is unavailable or the popup is already
    visible.  The popup is placed at the right edge of the screen, 55
    pixels above the bottom edge.
    """
    if not self._seek_available() or self.timepos_popup_visible:
        return

    self.timepos_popup_visible = True
    self._seek_update_vals()

    popup = self.timepos_popup
    width, height = popup.get_size()
    pheight = 55
    left = gtk.gdk.screen_width() - width
    top = gtk.gdk.screen_height() - height - pheight
    popup.move(left, top)
    popup.show()

    self.current_engine.handler_unblock(self.timepos_sync_pos_handler)

    if self.seek_autohide:
        # An auto-hide timer is already pending.
        return

    def hide_popup():
        self._seek_hide()
        self.seek_autohide = None
        return False  # one-shot

    self.seek_autohide = gobject.timeout_add(5000, hide_popup)
# _seek_show()
def test_timeout_add(self):
    """
    A L{reactor.callLater<twisted.internet.interfaces.IReactorTime.callLater>}
    call scheduled from a C{gobject.timeout_add} call is run on time.
    """
    import gobject

    reactor = self.buildReactor()
    fired = []

    def callback():
        # Record the call and end the test run.
        fired.append(True)
        reactor.stop()

    def gschedule():
        # Runs from the GLib timer; the falsy return makes it one-shot.
        reactor.callLater(0, callback)
        return 0

    reactor.callWhenRunning(gobject.timeout_add, 10, gschedule)
    self.runReactor(reactor, 5)
    self.assertEqual(fired, [True])
def _send_next_ssid(self):
    """Send the next not-yet-sent SSID, or restart the cycle when done.

    Returns ``self.is_notifying`` after sending an SSID (or when there are
    no SSIDs at all), and ``False`` after scheduling
    ``self._start_send_ssids`` to begin a new round.

    Empty SSID entries are skipped.  Previously the empty string was also
    the "nothing left" sentinel, so an unsent empty entry restarted the
    cycle and every SSID listed after it was never sent.
    """
    if not self.ssids:
        logger.debug("No SSIDs available.")
        return self.is_notifying

    next_ssid = next(
        (ssid for ssid in self.ssids
         if ssid and ssid not in self._ssids_sent),
        None)

    if next_ssid is None:
        # all SSIDs have been sent at least once, repeat:
        self._ssids_sent = []
        GObject.timeout_add(self._wait_time, self._start_send_ssids)
        return False

    logger.debug("Sending next SSID: %s", next_ssid)
    self.value_update(string_to_dbus_array(next_ssid))
    self._ssid_last_sent = next_ssid
    self._ssids_sent.append(next_ssid)
    return self.is_notifying
def find_word(self, backwards=False):
    """Find the next (or previous) line containing the search word.

    Reads ``self.search['lines']``, ``['word']`` and ``['index']``.  The
    scan starts at the current index and wraps around once.  On a match,
    ``self.search['index']`` is updated, the terminal's adjustment is set
    to the matching line from a zero-delay timeout, and a redraw is queued.
    Without a match the index is reset to ``len(lines)`` (backwards) or 0.

    The scan order is built with ``list(range(...))`` so it works on both
    Python 2 and Python 3; ``range`` objects have no ``reverse``/``extend``.
    """
    search = self.search
    lines = search['lines']
    word = search['word']
    start = search['index']
    total = len(lines)

    if backwards:
        # start-1 .. 0, then wrap: total-1 .. start
        order = (list(range(start - 1, -1, -1)) +
                 list(range(total - 1, start - 1, -1)))
    else:
        # start .. total-1, then wrap: 0 .. start-1
        order = list(range(start, total)) + list(range(0, start))

    for i in order:
        if lines[i].find(word) == -1:
            continue
        search['index'] = i if backwards else i + 1

        def scroll_to_match(line=i):
            self.search['terminal'].get_adjustment().set_value(line)
            return False  # one-shot

        gobject.timeout_add(0, scroll_to_match)
        search['terminal'].queue_draw()
        break
    else:
        # No line matched (or there are no lines): reset the start point.
        search['index'] = total if backwards else 0
def start_scrsaver(self, saver_type):
    """Start the screen saver in the mode named by ``saver_type``.

    'slideshow' and 'movie' start a repeating slide timer and show the
    first slide immediately; 'launch_scr' runs the external command
    configured as 'scr_file'.  Any other value only shows the window.
    """
    win_main = self.WinMain

    # Set up state shared by all modes.
    artwork_no = win_main.emu_ini.getint('movie_artwork_no')
    self.video_artwork_widget = self._main_images[(artwork_no - 1)]
    self.slide_duration = win_main.wahcade_ini.getint('slide_duration') * 1000
    self.running = True
    self.movie_type = ''
    self.saver_type = saver_type
    win_main.show_window('scrsaver')

    if saver_type == 'launch_scr':
        # Start external screen saver.
        os.system(win_main.emu_ini.get('scr_file'))
    elif saver_type in ['slideshow', 'movie']:
        # Start duration timer, then show the first game right away.
        self.scr_timer_id = gobject.timeout_add(self.slide_duration,
                                                self.on_slide_timer)
        self.on_slide_timer()
def start_timer(self, timer_type):
    """Start the timer named by ``timer_type``.

    Recognised values are 'scrsave', 'video' and 'joystick'; each is only
    started when its feature is enabled (``scrsave_delay > 0``,
    ``video_enabled``, ``joy`` respectively).
    """
    # Screen saver: replace any timer that is already running.
    if timer_type == 'scrsave' and self.scrsave_delay > 0:
        if self.scrsave_timer:
            gobject.source_remove(self.scrsave_timer)
        self.scrsave_timer = gobject.timeout_add(2500, self.on_scrsave_timer)
        return

    # Video preview: stop any playing video first, then restart the timer.
    if timer_type == 'video' and self.video_enabled:
        self.stop_video()
        preview_delay_ms = self.delaymovieprev * 1000
        self.video_timer = gobject.timeout_add(preview_delay_ms,
                                               self.on_video_timer)
        return

    # Joystick polling.
    if timer_type == 'joystick' and self.joy:
        self.joystick_timer = gobject.timeout_add(
            50, self.joy.poll, self.on_winMain_key_press)
def run(self, installSignalHandlers=1):
    """Start the reactor, queue the first ``simulate`` call and run the loop.

    ``installSignalHandlers`` is forwarded unchanged to ``startRunning``.
    """
    self.startRunning(installSignalHandlers=installSignalHandlers)
    # Zero delay: simulate() is queued for the first loop pass.
    first_tick_ms = 0
    gobject.timeout_add(first_tick_ms, self.simulate)
    self.__run()
def simulate(self):
    """Run simulation loops and reschedule callbacks.

    Removes the previously scheduled timer (tracked in the module-level
    ``_simtag``), runs the due calls via ``runUntilCurrent`` and schedules
    itself again after at most 0.1 seconds.
    """
    global _simtag
    if _simtag is not None:
        gobject.source_remove(_simtag)
    self.runUntilCurrent()

    # self.timeout() can be None.  Check that before clamping:
    # min(None, 0.1) only "worked" through Python 2's arbitrary ordering of
    # None and raises TypeError on Python 3.
    timeout = self.timeout()
    timeout = 0.1 if timeout is None else min(timeout, 0.1)

    _simtag = gobject.timeout_add(int(timeout * 1010), self.simulate)
def simulate(self):
    """Run simulation loops and reschedule callbacks.

    Removes the previously scheduled timer (tracked in the module-level
    ``_simtag``), calls ``self.iterate()`` and schedules itself again after
    at most 0.1 seconds.
    """
    global _simtag
    if _simtag is not None:
        gobject.source_remove(_simtag)
    self.iterate()

    # self.timeout() can be None.  Check that before clamping:
    # min(None, 0.1) only "worked" through Python 2's arbitrary ordering of
    # None and raises TypeError on Python 3.
    timeout = self.timeout()
    timeout = 0.1 if timeout is None else min(timeout, 0.1)

    _simtag = gobject.timeout_add(int(timeout * 1010), self.simulate)
def _start_update_timer(self):
    """(Re)start the periodic view-update timer.

    Any previously registered timer is removed first.  The interval is
    ``SAMPLE_PERIOD / min(self.speed, 1)`` converted to milliseconds.
    """
    previous_id = self._update_timeout_id
    if previous_id is not None:
        gobject.source_remove(previous_id)

    interval_ms = int(SAMPLE_PERIOD/min(self.speed, 1)*1e3)
    self._update_timeout_id = gobject.timeout_add(
        interval_ms,
        self.update_view_timeout,
        priority=PRIORITY_UPDATE_VIEW)
def start(self):
    """Scan the topology, start the simulation and enter the GTK main loop.

    When the ``__IPYTHON__`` name exists, ``_monkey_patch_ipython`` is
    called before the main loop starts.
    """
    self.scan_topology()
    self.window.connect("delete-event", self._quit)
    # The update timer is intentionally not started here:
    #self._start_update_timer()
    gobject.timeout_add(200, self.autoscale_view)
    self.simulation.start()

    # Probe for the name IPython defines in its namespace.
    try:
        __IPYTHON__
        inside_ipython = True
    except NameError:
        inside_ipython = False
    if inside_ipython:
        self._monkey_patch_ipython()

    gtk.main()
def _update_hr_msrmt_simulation(self):
    """Schedule ``hr_msrmt_cb`` every second while notifications are on."""
    print('Update HR Measurement Simulation')

    if self.notifying:
        GObject.timeout_add(1000, self.hr_msrmt_cb)
def __init__(self, bus, index, service):
    """Create the characteristic and start the 5-second drain timer.

    The characteristic is registered under ``BATTERY_LVL_UUID`` with the
    'read' and 'notify' flags, and starts at a level of 100 with
    notifications off.
    """
    flags = ['read', 'notify']
    Characteristic.__init__(self, bus, index, self.BATTERY_LVL_UUID,
                            flags, service)
    self.notifying = False
    self.battery_lvl = 100
    drain_period_ms = 5000
    GObject.timeout_add(drain_period_ms, self.drain_battery)
def _on_disconnect(self, client, userdata, rc):
    """Handle a lost broker connection.

    Removes the socket watch, if any, and schedules ``self._reconnect``
    (wrapped by ``exit_on_error``) on a 5-second timer.  ``client``,
    ``userdata`` and ``rc`` are not used here.
    """
    logging.error('[Disconnected] Lost connection to broker')

    watch_id = self._socket_watch
    if watch_id is not None:
        gobject.source_remove(watch_id)
        self._socket_watch = None

    logging.info('[Disconnected] Set timer')
    gobject.timeout_add(5000, exit_on_error, self._reconnect)
def on_btn_startBenchmark_clicked(self, widget):
    """Start the benchmark and the one-second countdown display.

    The duration comes from the 'seconds_adjustment' object; ``widget`` is
    not used.
    """
    seconds = int(self.get('seconds_adjustment').get_value())

    # Half time for upload speed and half for download.
    self.start_benchmark(seconds/2)
    self.timeleft = seconds

    # Lock the inputs while the benchmark runs.
    self.get('seconds_spinbox').set_sensitive(False)
    self.get('hbox_buttons').set_visible(False)

    status_text = _("Benchmark finishing in %d seconds...") % self.timeleft
    self.get('time_left_label').set_text(status_text)
    self.countdown_event = gobject.timeout_add(1000, self.update_time_left)
    self.get('hbox_status').set_visible(True)
def get_results(self):
    """Start polling for output and arm the overall result timeout.

    ``get_more_output`` is scheduled every 200 ms; ``show_results(True)``
    is scheduled after ``self.timeout`` seconds.
    """
    poll_interval_ms = 200
    self.more_output = gobject.timeout_add(poll_interval_ms,
                                           self.get_more_output)

    deadline_ms = self.timeout*1000
    self.output_timeout = gobject.timeout_add(deadline_ms,
                                              self.show_results, True)
def gotScreenshot(self, handle, reply):
    """Store a thumbnail received for the client identified by ``handle``.

    ``reply`` is parsed as ``"<rowstride>\\n<width>x<height>\\n<pixels>"``.
    If the handle is still in ``self.cstore``, the next screenshot request
    is scheduled and the decoded pixbuf is stored in both
    ``self.currentScreenshots`` and the store row.  Otherwise any cached
    screenshot for the handle is dropped.

    Empty or malformed replies are ignored.  The whole header is parsed
    inside one guarded block, so a non-numeric rowstride or a bad size
    field no longer raises out of the callback.
    """
    for row in self.cstore:
        if handle != row[C_SESSION_HANDLE]:
            continue

        # We want to ask for thumbnails every 5 sec after the last one.
        # So if the client is too stressed and needs 7 secs to
        # send a thumbnail, we'll ask for one every 12 secs.
        gobject.timeout_add(5000, self.askScreenshot, handle)

        if not reply:
            return
        try:
            rowstride, size, pixels = reply.split('\n', 2)
            rowstride = int(rowstride)
            width, height = size.split('x')
            width, height = int(width), int(height)
        except (AttributeError, TypeError, ValueError):
            # Not a well-formed screenshot reply; wait for the next one.
            return

        pxb = gtk.gdk.pixbuf_new_from_data(pixels, gtk.gdk.COLORSPACE_RGB,
                                           False, 8, width, height,
                                           rowstride)
        self.currentScreenshots[handle] = pxb
        self.cstore[row.path][C_PIXBUF] = pxb
        return

    # That handle is no longer in the cstore, remove it
    try:
        del self.currentScreenshots[handle]
    except KeyError:
        pass
def on_preferences_changed(self, preferences):
    """Re-attach the index widget according to the new ``preferences``.

    Reads the 'pane' and 'show_lines' keys.  Does nothing when there is no
    widget yet.
    """
    widget = self.widget
    if widget is None:
        return

    # Detach first; a ValueError from remove() is ignored.
    try:
        self.window.remove(widget)
    except ValueError:
        pass

    self.window.add_tab(_('icIndex'), widget, preferences['pane'])
    widget.show_lines(preferences['show_lines'])

    def deferred_init(*a):
        return self._init_index_ext(preferences)

    # Start after a while to give time to close previous window.
    gobject.timeout_add(100, deferred_init)
    widget.show_all()
def _clear_pagenames_cache(self):
    """Schedule a one-shot trim of the page-names cache in 500 ms.

    Only one trim is pending at a time; calls made while one is scheduled
    are no-ops.
    """
    if self._clear_cache_scheduled:
        return

    def trim_cache():
        '''Empty the cache once it holds more than 200 entries.'''
        if len(self._pagenames_cache) > 200:
            self._pagenames_cache = {}
        self._clear_cache_scheduled = False
        return False  # to not call again

    gobject.timeout_add(500, trim_cache)
    self._clear_cache_scheduled = True
def __init__( self, w, h, speed ):
    """Set up a ``w`` x ``h`` screen widget whose ``tick`` runs on a timer.

    ``speed`` is passed to ``gobject.timeout_add`` as the tick interval.
    """
    super( Screen, self ).__init__( )

    ## Old fashioned way to connect expose.
    self.connect( "expose_event", self.do_expose_event )
    ## We want to know where the mouse is:
    self.connect( "motion_notify_event", self._mouseMoved )

    ## Unmask the events this widget should receive.
    wanted_events = ( gdk.BUTTON_PRESS_MASK
                      | gdk.BUTTON_RELEASE_MASK
                      | gdk.POINTER_MOTION_MASK )
    self.add_events( wanted_events )

    ## This is what gives the animation life!
    gobject.timeout_add( speed, self.tick )

    self.width = w
    self.height = h
    self.set_size_request( w, h )

    ## Unlikely first coord to prevent false hits.
    self.x = 11110
    self.y = 11111110
def _seekbar_changed(self, seekbar):
    """Debounce seek-bar changes: perform the seek 200 ms later.

    While a seek is already pending, further changes are ignored.
    ``seekbar`` is not used here.
    """
    if self.seekbar_seek_timeout:
        return

    def perform_seek():
        self._do_seek()
        self.seekbar_seek_timeout = None
        return False  # one-shot

    self.seekbar_seek_timeout = gobject.timeout_add(200, perform_seek)
# _seekbar_changed()
def _register_state_timeout(self):
    """Poll the player every 500 ms for media length and playback position.

    The length is requested on at most 20 polls and only while
    ``self.length`` is still ``None``; the position is requested on every
    poll.  Polling stops once there is no process or playback has stopped.
    """
    self.length_tries = 20

    def poll_player():
        if not self.proc or not self.playing:
            return False  # stop polling

        if self.length is None and self.length_tries:
            self.length_tries -= 1
            self._cmd("get_time_length")
            length_ans = self._read_ans("ANS_LENGTH=", timeout=1000)
            if length_ans:
                self.length = float(length_ans)
                self.info["length"] = self.length
                self.emit("media-info", self.info)

        self._cmd("get_time_pos")
        pos_ans = self._read_ans("ANS_TIME_POSITION=", timeout=1000)
        if pos_ans:
            self.pos = float(pos_ans)
            self.emit("pos", self.pos)
        return True  # keep polling

    self.state_timeout = gobject.timeout_add(500, poll_player)
# _register_state_timeout()
def start(self):
    """Schedule ``self.tick`` every ``self.step`` seconds.

    The timer id is kept in ``self.timeout_id``.
    """
    interval_ms = int(self.step * 1000)
    self.timeout_id = gobject.timeout_add(interval_ms, self.tick)
def __init__(self):
    """Create the drawing area, hook up input events and start polling.

    ``self.update`` is scheduled once per second.
    """
    gtk.DrawingArea.__init__(self)

    self.graph = Graph()
    self.openfilename = None

    self.set_flags(gtk.CAN_FOCUS)

    # Mouse buttons.
    button_events = gtk.gdk.BUTTON_PRESS_MASK | gtk.gdk.BUTTON_RELEASE_MASK
    self.add_events(button_events)
    self.connect("button-press-event", self.on_area_button_press)
    self.connect("button-release-event", self.on_area_button_release)

    # Pointer motion.
    motion_events = (gtk.gdk.POINTER_MOTION_MASK
                     | gtk.gdk.POINTER_MOTION_HINT_MASK
                     | gtk.gdk.BUTTON_RELEASE_MASK)
    self.add_events(motion_events)

    # Remaining signals.
    for signal_name, handler in (
            ("motion-notify-event", self.on_area_motion_notify),
            ("scroll-event", self.on_area_scroll_event),
            ("size-allocate", self.on_area_size_allocate),
            ('key-press-event', self.on_key_press_event)):
        self.connect(signal_name, handler)

    self.last_mtime = None
    gobject.timeout_add(1000, self.update)

    # View state.
    self.x = 0.0
    self.y = 0.0
    self.zoom_ratio = 1.0
    self.zoom_to_fit_on_resize = False
    self.animation = NoAnimation(self)
    self.drag_action = NullAction(self)
    self.presstime = None
    self.highlight = None
def _update_temp_value(self):
    """Schedule ``temperature_cb`` every 500 ms while 'Notifying' is set."""
    notifying = self.props[constants.GATT_CHRC_IFACE]['Notifying']
    if notifying:
        print('Starting timer event')
        GObject.timeout_add(500, self.temperature_cb)
def __init__(self):
    """Create the tray icon and its menu, then start periodic refreshes.

    ``refresh`` is called once immediately and then on a timer with an
    interval of ``REFRESH_INTERVAL``.
    """
    self.tray = gtk.StatusIcon()
    self.tray.connect('activate', self.refresh)

    # Create menu
    menu = gtk.Menu()
    entries = (
        ("Configure", self.ConfigurePiJuice),
        ("About...", self.show_about),
    )
    for label, on_activate in entries:
        item = gtk.MenuItem(label)
        item.show()
        item.connect("activate", on_activate)
        menu.append(item)
    self.tray.connect('popup-menu', self.show_menu, menu)

    self.pijuice = PiJuice(1,0x14)

    # Initialise and start battery display
    self.refresh(None)
    self.tray.set_visible(True)
    gobject.timeout_add(REFRESH_INTERVAL, self.refresh, False)
def __init__(self, node, *args, **kwargs):
    """Create the drawing area for ``node`` and hook into its protocol.

    The protocol's ``sendRPC`` and ``datagramReceived`` are replaced by GUI
    wrappers; the originals are kept on the protocol object.  Extra
    arguments go to ``gtk.DrawingArea.__init__``.
    """
    gtk.DrawingArea.__init__(self, *args, **kwargs)
    self.node = node
    self.timeoutID = gobject.timeout_add(5000, self.timeout)
    self.comms = {}
    self.incomingComms = {}

    # poison the node with our GUI hooks
    protocol = self.node._protocol
    protocol.__gui = self
    # Save each real method before installing its wrapper.
    protocol.__realSendRPC = protocol.sendRPC
    protocol.sendRPC = self.__guiSendRPC
    protocol.__realDatagramReceived = protocol.datagramReceived
    protocol.datagramReceived = self.__guiDatagramReceived

    self.msgCounter = 0
    self.printMsgCount = False
def drawMsgCounter(self):
    """Turn the message counter on and schedule ``removeMsgCount`` in 3 s."""
    self.printMsgCount = True
    display_ms = 3000
    gobject.timeout_add(display_ms, self.removeMsgCount)
def drawComms(self, contactID, msg):
    """Record an outgoing message for ``contactID`` and request a redraw.

    A contact that already has an entry in ``self.comms`` is left alone.
    New entries get a 750 ms timer that calls ``removeComm``.
    """
    if contactID in self.comms:
        return

    self.comms[contactID] = msg
    gobject.timeout_add(750, self.removeComm, contactID)
    self.window.invalidate_rect(self.allocation, False)
def drawIncomingComms(self, contactID, msg):
    """Record an incoming message for ``contactID`` and request a redraw.

    A contact that already has an entry in ``self.incomingComms`` is left
    alone.  New entries get a 750 ms timer that calls
    ``removeIncomingComm``.
    """
    if contactID in self.incomingComms:
        return

    self.incomingComms[contactID] = msg
    gobject.timeout_add(750, self.removeIncomingComm, contactID)
    self.window.invalidate_rect(self.allocation, False)
def update_do_btns(self, refresh):
    """Refresh the do-button state and optionally arm the auto-refresh timer.

    The timer is only started when the auto-refresh action is active and
    ``refresh`` is truthy; its interval is ``self.pref.timeout_value``.
    """
    self.set_do_buttons_state()

    auto_refresh_on = self.actionAutoRefresh.get_active()
    if auto_refresh_on and refresh:
        self.timeout = gobject.timeout_add(self.pref.timeout_value,
                                           self.autorefresh_call)
def emulate_key(self, widget, key=None):
    """Send a synthetic key press to ``self.entry`` and start key repeat.

    The key value comes from ``key`` (a key name) when given, otherwise
    from ``widget``'s label.  Any exception is logged and the keyboard
    window is hidden.  Afterwards an active left shift is released.
    """
    try:
        event = gtk.gdk.Event(gtk.gdk.KEY_PRESS)
        if key:
            keyval = int(gtk.gdk.keyval_from_name(key))
        else:
            keyval = ord(widget.get_label())
        event.keyval = keyval
        event.hardware_keycode = _keymap.get_entries_for_keyval(event.keyval)[0][0]

        # Add control mask if ctrl is active, then release ctrl.
        if self.builder.get_object('ctrl').get_active():
            event.state = gtk.gdk.CONTROL_MASK
            self.builder.get_object('ctrl').set_active(False)

        event.window = self.entry.window
        self.entry.event(event)  # Do the initial event

        # Call key repeat function every 50ms
        self.wait_counter = 0  # Set counter for repeat timeout
        gobject.timeout_add(50, self.key_repeat, widget, event)
    except Exception as e:
        log.exception(e)
        self.window.hide()

    # Unshift if left shift is active, right shift is "sticky"
    left_shift = self.builder.get_object('left_shift')
    if left_shift.get_active():
        self.shift(False)
def map(self, *args):
    """Start calling ``self.poll`` every 50 ms; ``args`` are ignored."""
    poll_interval_ms = 50
    gobject.timeout_add(poll_interval_ms, self.poll)
def on_zoom_in_pressed(self, widget, data=None):
    """Mark zoom-in as pressed, start the 50 ms zoom timer, swap the image.

    ``widget`` and ``data`` are not used here.
    """
    self.zoom_in_pressed = True
    zoom_interval_ms = 50
    gobject.timeout_add(zoom_interval_ms, self.zoom)
    self.set_image('zoom_in_image', 'plus_selected.png')