我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sublime.set_timeout_async()。
def wait_for_open(self, dest): """Wait for new linter window to open in another thread.""" def open_linter_py(): """Wait until the new linter window has opened and open linter.py.""" start = datetime.datetime.now() while True: time.sleep(0.25) delta = datetime.datetime.now() - start # Wait a maximum of 5 seconds if delta.seconds > 5: break window = sublime.active_window() folders = window.folders() if folders and folders[0] == dest: window.open_file(os.path.join(dest, 'linter.py')) break sublime.set_timeout_async(open_linter_py, 0)
def generate_color_scheme(from_reload=True): """ Asynchronously call generate_color_scheme_async. from_reload is True if this is called from the change callback for user settings. """ # If this was called from a reload of prefs, turn off the prefs observer, # otherwise we'll end up back here when ST updates the prefs with the new color. if from_reload: from . import persist def prefs_reloaded(): persist.settings.observe_prefs() persist.settings.observe_prefs(observer=prefs_reloaded) # ST crashes unless this is run async sublime.set_timeout_async(generate_color_scheme_async, 0)
def on_post_save(self, view): file_name = view.file_name() if not ReloadPlugin.PACKAGE_NAME in file_name: return if ReloadPlugin.PLUGIN_PYTHON_FILE in file_name: return original_file_name = view.file_name() plugin_python_file = os.path.join(sublime.packages_path(), ReloadPlugin.PLUGIN_PYTHON_FILE) if not os.path.isfile(plugin_python_file): return def _open_original_file(): view.window().open_file(original_file_name) plugin_view = view.window().open_file(plugin_python_file) print("save", plugin_view.file_name()) plugin_view.run_command("save") sublime.set_timeout_async(_open_original_file, self.PLUGIN_RELOAD_TIME_MS)
def on_modified(self, view): """ """ sel = view.sel() if not view.match_selector(sel[0].a, "source.swift"): return # clear linting self.errors = {} view.erase_regions("swiftkitten.diagnostics") self.query_id = None self.pending += 1 def handle_timeout(): self.handle_timeout(view) sublime.set_timeout_async(handle_timeout, self.delay)
def queue_did_change(view: sublime.View): buffer_id = view.buffer_id() buffer_version = 1 pending_buffer = None if buffer_id in pending_buffer_changes: pending_buffer = pending_buffer_changes[buffer_id] buffer_version = pending_buffer["version"] + 1 pending_buffer["version"] = buffer_version else: pending_buffer_changes[buffer_id] = { "view": view, "version": buffer_version } sublime.set_timeout_async( lambda: purge_did_change(buffer_id, buffer_version), 500)
def on_window_command(self, window, command_name, args): '''Very unstable switching projects, safety measure''' reset = ["prompt_open_project_or_workspace", "prompt_select_workspace", "open_recent_project_or_workspace", "close_workspace", "project_manager", 'close_window'] if ACTIVE and command_name in reset: # resetting variables and stopping CodeMap until CodeMap file is found again reset_globals() sublime.set_timeout_async(lambda: reactivate()) # -----------------
def ensure_reload(): """ Ensure all modules reload to initialize plugin successfully. """ start_upgrade_msg = "Do not close the Sublime Text. Upgrading {}".format( PACKAGE_NAME ) finish_upgrade_msg = "{} upgrade finished.".format(PACKAGE_NAME) active_view = sublime.active_window().active_view() active_view.set_status("afi_status", start_upgrade_msg) def erase_status(): active_view.erase_status("afi_status") def reload(): sublime.run_command("afi_reload") active_view.set_status("afi_status", finish_upgrade_msg) sublime.set_timeout(erase_status, 2000) sublime.set_timeout_async(reload, 5000)
def get(imageurl, user_callback=None): cached = get_cache_for(imageurl) if cached: return cached elif imageurl in ImageManager.loading.keys(): # return None (the file is still loading, already made a request) # return string the base64 of the url (which is going to be cached) temp_cached = ImageManager.loading[imageurl] if temp_cached == 404: return to_base64('404.png') if temp_cached: cache(imageurl, temp_cached) del ImageManager.loading[imageurl] return temp_cached else: # load from internet ImageManager.loading[imageurl] = None callback = get_base64_saver(ImageManager.loading, imageurl) loader = ImageLoader(imageurl, callback) loader.start() sublime.set_timeout_async(lambda: loader.join(), TIMEOUT * 1000)
def run(self, changes=None): # debug('workspace edit', changes) if changes: for uri, file_changes in changes.items(): path = uri_to_filename(uri) view = self.window.open_file(path) if view: if view.is_loading(): # TODO: wait for event instead. sublime.set_timeout_async( lambda: view.run_command('lsp_apply_document_edit', {'changes': file_changes}), 500 ) else: view.run_command('lsp_apply_document_edit', {'changes': file_changes, 'show_status': False}) else: debug('view not found to apply', path, file_changes) message = 'Applied changes to {} documents'.format(len(changes)) self.window.status_message(message) else: self.window.status_message('No changes to apply to workspace')
def plugin_loaded() -> None: """Called directly from sublime on plugin load """ package_folder = os.path.dirname(__file__) if not os.path.exists(os.path.join(package_folder, 'Main.sublime-menu')): template_file = os.path.join( package_folder, 'templates', 'Main.sublime-menu.tpl' ) with open(template_file, 'r', encoding='utf8') as tplfile: template = Template(tplfile.read()) menu_file = os.path.join(package_folder, 'Main.sublime-menu') with open(menu_file, 'w', encoding='utf8') as menu: menu.write(template.safe_substitute({ 'package_folder': os.path.basename(package_folder) })) # unload any conflictive package while anaconda is running sublime.set_timeout_async(monitor_plugins, 0) if not LOOP_RUNNING: ioloop.loop()
def update(self, i): """Update the progress bar """ if self.die: return size = 8 pos = i % size status = '{}={}'.format(' ' * pos, ' ' * ((size - 1) - pos)) sublime.status_message('{} [{}]'.format( self.messages['start'], status) ) if not (size - 1) - pos: self.addition = -1 if not pos: self.addition = 1 i += self.addition sublime.set_timeout_async(lambda: self.update(i), 100)
def update_logs(self): def __update_logs(): logs_json = self.query_logs() if not logs_json: self.poll_url = None return self.poll_url = LogManager.get_poll_url(logs_json) logs_list = logs_json["logs"] i = len(logs_list) if logs_list else 0 while i > 0: i -= 1 log = logs_list[i] if self.last_shown_log and LogManager.logs_are_equal(self.last_shown_log, log): i += 1 break while i < len(logs_list): log = logs_list[i] self.write_to_console(log) self.last_shown_log = log i += 1 sublime.set_timeout_async(__update_logs, 0)
def on_hover(self, view, point, hover_zone): # Popup only on text if hover_zone != sublime.HOVER_TEXT: return # Check file size to optionnaly disable the feature (finding the information can be quite long) threshold = view.settings().get('vhdl.hover_max_size',-1) if view.size() > threshold and threshold!=-1 : return # Only show a popup for vhdl, when not in a string of a comment scope = view.scope_name(point) if 'source.vhdl' not in scope: return if any(w in scope for w in ['comment', 'string', 'keyword']): return popup = VhdlTypePopup(view) sublime.set_timeout_async(lambda r=view.word(point), p=point: popup.show(r,p))
def plugin_loaded(): window.set_status_bar_message("") window.subscribe(sublime.active_window().active_view()) open_settings_window_if_credentials_are_not_set() def run_main_loop(): Spotify( side_effect = window.set_status_bar_message ).run_main_loop(settings_manager) ''' When sublime starts up and the plugin is enabled, the server should start on a different thread, so that sublime doesn't hang until the circuit breaker kicks in ''' sublime.set_timeout_async(run_main_loop, 5000)
def run(self, callback): with self._lock: self._tasks_running += 1 if self._tasks_running == 1: self._progress_animation = ProgressAnimation(self.tasks_running).start() def callback_wrapper(): try: callback() except Exception as e: log_exception( 'Background task failed with exception: [{exception}]'.format( exception=e)) with self._lock: self._tasks_running -= 1 self.log('tasks running = ' + str(self._tasks_running)) sublime.set_timeout_async(callback_wrapper, 0)
def run(self, edit): # check whether the lua files suffix_setting = self.view.settings().get('syntax') file_suffix = suffix_setting.split('.')[0] if file_suffix[-3:].lower() != 'lua': return # get lines of replacement r = sublime.Region(0, self.view.size()) lines = [] for region in self.view.lines(r): cache = self.view.substr(region) lines.append(cache) # get cursor position before the replacement selection = self.view.sel()[0].b row, col = self.view.rowcol(selection) # replace the content after format print("Run Lua Format") self.view.replace(edit, r, lua_format(lines, get_settings())) # deal cursor position selection = self.view.full_line(self.view.text_point(row - 1, 0)).b cursor_pos = sublime.Region(selection, selection) regions = self.view.sel() regions.clear() regions.add(cursor_pos) sublime.set_timeout_async(lambda: self.view.show(cursor_pos), 0)
def generate_menus(**kwargs): """Asynchronously call generate_menus_async.""" sublime.set_timeout_async(generate_menus_async, 0)
def pfile_modified(pfile, view): pfile.dirty = True now = time.time() if now - pfile.last_modified > .5: pfile.last_modified = now if is_st2: sublime.set_timeout(lambda: maybe_save_pfile(pfile, view, now), 5000) else: sublime.set_timeout_async(lambda: maybe_save_pfile(pfile, view, now), 5000) if pfile.cached_completions and sel_start(view.sel()[0]) < pfile.cached_completions[0]: pfile.cached_completions = None if pfile.cached_arguments and sel_start(view.sel()[0]) < pfile.cached_arguments[0]: pfile.cached_arguments = None
def run(self, edit): sublime.set_timeout_async(self.run_async)
def on_selection_modified_async(self, view): self.view = view sublime.set_timeout_async( lambda: self.run_check(view) )
def on_selection_modified_async(self, view): self.view = view sublime.set_timeout_async( lambda: self.run_coverage(view) )
def on_query_completions(self, view, prefix, locations): # Return the pending completions and clear them if self.completions_ready and self.completions: self.completions_ready = False return self.completions sublime.set_timeout_async( lambda: self.on_query_completions_async( view, prefix, locations ) )
def tab_selected(self, selected): if selected > -1: if self.is_file[selected]: self.window.open_file(os.path.join(self.mypath,self.files[selected])) else: folder = self.files[selected] if folder == "`": folder = ".." self.mypath = os.path.join(self.mypath, folder) sublime.set_timeout_async(self.open_for_current_path, 50) return selected
def make_requests(self, requests, env=None): """Make requests concurrently using a `ThreadPool`, which itself runs on an alternate thread so as not to block the UI. """ pools = self.RESPONSE_POOLS pool = ResponseThreadPool(requests, env, self.MAX_WORKERS) # pass along env vars to thread pool pools.put(pool) while pools.qsize() > self.MAX_NUM_RESPONSE_POOLS: old_pool = pools.get() old_pool.is_done = True # don't display responses for a pool which has already been removed sublime.set_timeout_async(pool.run, 0) # run on an alternate thread sublime.set_timeout(lambda: self.gather_responses(pool), 15) # small delay to show activity for requests that are returned in less than REFRESH_MS
def handle_responses(self, responses): """Invoke the real function on a different thread to avoid blocking UI. """ sublime.set_timeout_async(self._handle_responses, 0)
def run(self): view = self.window.active_view() url = view.settings().get('requester.request_url', None) if url is None: return sublime.set_timeout_async(lambda: self.show_options(url, view), 0)
def _startTimer(self): self._updateData() self._showStatus() sublime.set_timeout_async(lambda: self._startTimer(), self._updateInterval * 1e3)
def updateLanguage(self, index=None): if index == -1 or index == None: return languageName = re.search('^\w+', self.languageList[index]).group(0) self.languageName = languageName sublime.set_timeout_async(self.checkoutLanguage, 0)
def on_selection_modified_async(self, view): if not getSetting('auto'): return global currentView currentView = view self.prevTime = time.time() if not self.delaying: sublime.set_timeout_async(self.doAutoShow, getSetting('auto_delay') + 50) self.delaying = True
def doAutoShow(self): delayTime = getSetting('auto_delay') if (time.time() - self.prevTime) * 1000 > delayTime: self.delaying = False if not currentView.is_popup_visible(): currentView.run_command('docphp_show_definition') else: sublime.set_timeout_async(self.doAutoShow, int(delayTime - (time.time() - self.prevTime) * 1000) + 50)
def run(self, **args): async = args['async'] action_id = args['action_id'] action, delay = code_map_marshaler._actions.pop(action_id) if async: sublime.set_timeout_async(action, delay) else: sublime.set_timeout(action, delay) # =============================================================================
def run(self, edit): point = self.view.sel()[0].a line_region = self.view.line(point) self.view.sel().clear() self.view.sel().add(line_region) sublime.set_timeout_async(lambda: self.view.sel().add(line_region), 10) # =============================================================================
def on_close(self, view): if ACTIVE and view.file_name() == code_map_file: reset_globals() if settings().get('close_empty_group_on_closing_map', False): reset_layout() sublime.set_timeout_async(focus_source_code) # -----------------
def init(): log("Initializing icons") if not os.path.exists(path.get_overlay()): _create_dirs() sublime.set_timeout_async(provide, 0) else: sublime.set_timeout_async(_copy_specific, 0) dump("All the necessary icons are provided")
def enable(): log("Enabling aliases") if not _is_enabled(): if settings.is_package_archive(): sublime.set_timeout_async(_extract, 0) else: sublime.set_timeout_async(_copy, 0) else: dump("Aliases already enabled")
def disable(): log("Disabling aliases") if _is_enabled(): sublime.set_timeout_async(_remove, 0) else: dump("Aliases already disabled")
def on_close(self, view): if is_supported_syntax(view.settings().get("syntax")): Events.publish("view.on_close", view) sublime.set_timeout_async(check_window_unloaded, 500)
def run(self): view = self.window.active_view() available_config = get_scope_client_config(view, client_configs.defaults) or get_default_client_config(view) if available_config: client_configs.enable(available_config.name) clear_window_client_configs(self.window) sublime.set_timeout_async(lambda: start_view(view), 500) self.window.status_message("{} enabled, starting server...".format(available_config.name)) return self.window.status_message("No config available to enable")
def run(self): view = self.window.active_view() # if no default_config, nothing we can do. default_config = get_default_client_config(view) if default_config: enable_in_project(self.window, default_config.name) clear_window_client_configs(self.window) sublime.set_timeout_async(lambda: start_view(view), 500) self.window.status_message("{} enabled in project, starting server...".format(default_config.name)) else: self.window.status_message("No config available to enable")
def run(self): view = self.window.active_view() global_config = get_scope_client_config(view, client_configs.defaults) if global_config: disable_in_project(self.window, global_config.name) clear_window_client_configs(self.window) sublime.set_timeout_async(lambda: unload_window_clients(self.window.id()), 500) self.window.status_message("{} disabled in project, shutting down server...".format(global_config.name)) return else: self.window.status_message("No config available to disable")
def finish(self, proc): super(PytestExecCommand, self).finish(proc) view = self.output_view output = get_whole_text(view).strip() last_line = output[output.rfind('\n'):] summary = '' matches = re.finditer(r' ([\d]+) ', last_line) if matches: test_count = sum(int(m.group(1)) for m in matches) summary = "Ran %s tests. " % (test_count) summary += last_line.replace('=', '') failures = proc.exit_code() != 0 if failures: broadcast("pytest_will_fail") broadcast('pytest_finished', { "summary": summary, "failures": failures }) # This is a 'best' guess. Maybe we should parse the output for the # `rootdir` pytest uses. base_dir = view.settings().get('result_base_dir') # For the 'line' go with output regex parsing bc the reporter # isn't useful at all. if self._tb_mode == 'line': sublime.set_timeout_async( functools.partial( parse_output, output, base_dir, Matchers[self._tb_mode])) else: sublime.set_timeout_async( functools.partial( parse_result, base_dir, Matchers[self._tb_mode]))
def prevent_spam(func): """Prevent spamming in the logger """ _last_messages = {} def _remove_from_cache(args): m = _last_messages.pop(args) if m == 1: return method = getattr(Log._logger, args[0]) method( '{}\n ...last message repeated {} in the last 10s'.format( args[1], 'one more time' if m == 2 else '{} times'.format(m) ) ) @functools.wraps(func) def wrapper(cls, *args, **kwargs): if args in _last_messages: _last_messages[args] += 1 return _last_messages[args] = 1 sublime.set_timeout_async(lambda: _remove_from_cache(args), 10000) func(cls, args[0], *args[1:], **kwargs) return wrapper
def execute(cls, callback, **data): """Execute the given method remotely and call the callback with result """ def _start_worker(wk, cb, **d): wk.start() if wk.status == WorkerStatus.healthy: wk._execute(cb, **d) return sublime.set_timeout_async(lambda: _start_worker(wk, cb, **d), 5000) window_id = sublime.active_window().id() worker = cls.get(cls, window_id) if worker is None: # hire a new worker worker = cls.hire(cls) cls.add(cls, window_id, worker) if worker.status == WorkerStatus.faulty: return if worker.status == WorkerStatus.quiting: cls.fire(cls, window_id) return if worker.client is not None: if not worker.client.connected: worker.reconnecting = True worker.state = WorkerStatus.incomplete _start_worker(worker, callback, **data) else: worker._append_context_data(data) worker._execute(callback, **data) if worker.status == WorkerStatus.quiting: # that means that we need to let the worker go cls.fire(cls, window_id) else: _start_worker(worker, callback, **data)
def update_log_windows(restart_timer=True): global project_env_map try: for (project_path, env) in list(project_env_map.items()): # Clean up project windows first if not ProjectManager.is_electric_imp_project_window(env.window): # It's not a windows that corresponds to an EI project, remove it from the list del project_env_map[project_path] log_debug("Removing project window: " + str(env.window) + ", total #: " + str(len(project_env_map))) continue env.log_manager.update_logs() finally: if restart_timer: sublime.set_timeout_async(update_log_windows, PL_LOGS_UPDATE_PERIOD)
def run(self,edit): mname = getModuleName(self.view) if not mname: print('[VHDL.navigation] No entity/architecture found !') return txt = self.view.substr(sublime.Region(0, self.view.size())) inst_l = vhdl_util.get_inst_list(txt,mname) if not inst_l: print('[VHDL.navigation] No hierarchy found !') return sublime.status_message("Show Hierarchy can take some time, please wait ...") sublime.set_timeout_async(lambda inst_l=inst_l, w=self.view.window(), mname=mname : self.showHierarchy(w,inst_l,mname))