我们从Python开源项目中,提取了以下4个代码示例,用于说明如何使用watchdog.events.PatternMatchingEventHandler()。
def __init__(self, runner): self.runner = runner self.event_handler = PatternMatchingEventHandler(patterns=["*.fast5"], ignore_patterns=[], ignore_directories=True) self.event_handler.on_created = self.on_created self.event_handler.on_moved = self.on_moved self.observer = Observer() self.observedPaths = [] for path in self.runner.input: if os.path.isdir(path): self.observer.schedule(self.event_handler, path, recursive=True) self.observedPaths.append(path) log("Monitoring {} in real time. Press Ctrl+C to exit.".format(", ".join(self.observedPaths)))
def __init__(self, container, host_dir, container_dir): """ Initialize a new instance of ContainerNotifier Args: container: Container host_dir (str): Host directory container_dir (str): Container directory """ self.container = container self.host_dir = host_dir self.container_dir = container_dir event_handler = PatternMatchingEventHandler(ignore_directories=False) handler = self.__change_handler event_handler.on_created = handler event_handler.on_moved = handler event_handler.on_modified = handler self.observer = Observer() self.observer.schedule(event_handler, host_dir, recursive=True) self.observer.start()
def __init__(self, loop, *args, **kwargs): self.loop = loop # awaitable future to race on self.changed = asyncio.Future(loop=loop) # Continue init for EventHandler return super(Handler, self).__init__(*args, **kwargs)
def watch(mode, target, only_page="", pdf_file=DEFAULT_PDF_FILE, es_upload=NO_ES_UP): """Look for changed files and re-run the build whenever there's an update. Runs until interrupted.""" target = get_target(target) class UpdaterHandler(PatternMatchingEventHandler): """Updates to pattern-matched files means rendering.""" def on_any_event(self, event): logger.info("got event!") # bypass_errors=True because Watch shouldn't # just die if a file is temporarily not found if mode == "pdf": make_pdf(pdf_file, target=target, bypass_errors=True, only_page=only_page, es_upload=es_upload) else: render_pages(target, mode=mode, bypass_errors=True, only_page=only_page, es_upload=es_upload) logger.info("done rendering") patterns = ["*template-*.html", "*.md", "*code_samples/*"] event_handler = UpdaterHandler(patterns=patterns) observer = Observer() observer.schedule(event_handler, config["template_path"], recursive=True) observer.schedule(event_handler, config["content_path"], recursive=True) observer.start() # The above starts an observing thread, # so the main thread can just wait try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join()