我们从Python开源项目中，提取了以下8个代码示例，用于说明如何使用pyinotify.ThreadedNotifier()。
def __init__(self, rt):
    """Initialise the skill plugins and watch the skills directory.

    Runs both base-class initialisers, creates the git repository handle
    for the skills checkout, appends the skills directory to ``sys.path``,
    attaches ``rt`` to every class in ``self._classes`` and finally starts
    a daemon ``pyinotify.ThreadedNotifier`` over the skills directory.

    Args:
        rt: Runtime object assigned to each skill class as ``cls.rt``.
            ``self.rt`` and ``self.config`` are read here and are
            presumably set by ``ManagerPlugin.__init__`` -- TODO confirm.
    """
    ManagerPlugin.__init__(self, rt)

    self.git_repo = GitRepo(
        directory=self.rt.paths.skills,
        url='https://github.com/MatthewScholefield/mycroft-light.git',
        branch='skills',
        update_freq=1,
    )
    self.blacklist = self.config['blacklist']
    sys.path.append(self.rt.paths.skills)

    GroupPlugin.__init__(self, SkillPlugin, 'mycroft.skills', '_skill')
    for skill_cls in self._classes.values():
        skill_cls.rt = rt
    self.init_plugins()

    # The watch manager stores the watches and provides operations on them.
    watch_manager = pyinotify.WatchManager()
    watched_events = (
        pyinotify.IN_MODIFY
        | pyinotify.IN_CREATE
        | pyinotify.IN_DELETE
        | pyinotify.IN_MOVED_TO
    )
    skills_root = self.rt.paths.skills

    watcher = pyinotify.ThreadedNotifier(watch_manager, EventHandler(self))
    # Daemon thread: it must not keep the process alive on shutdown.
    watcher.daemon = True
    # rec=True also registers the sub-directories that exist right now.
    watch_manager.add_watch(skills_root, watched_events, rec=True)
    watcher.start()
def setup_inotify(configuration):
    """Start a daemon inotify thread that reacts to files moved into '.'.

    Args:
        configuration: Object providing ``parse_node_json()`` and
            ``parse_config_json()``; the matching method is called when a
            file named ``node.json`` or ``config.json`` is reported.
    """

    class EventHandler(pyinotify.ProcessEvent):
        # Dispatches on the base name of the path carried by the event.

        def process_default(self, event):
            changed = os.path.basename(event.pathname)

            if changed == 'node.json':
                log("node.json changed")
                configuration.parse_node_json()
                return

            if changed == 'config.json':
                log("config.json changed!")
                configuration.parse_config_json()
                return

            if changed.endswith('.py'):
                abort_service("python file changed")

    watch_manager = pyinotify.WatchManager()
    watcher = pyinotify.ThreadedNotifier(watch_manager, EventHandler())
    watcher.daemon = True
    watcher.start()
    # Only IN_MOVED_TO is watched; the watch is added after the thread starts.
    watch_manager.add_watch('.', pyinotify.IN_MOVED_TO)
def start_rtsync(self):
    """Create the inotify notifier, watch every sync path and start it.

    Does nothing except log a debug message when ``self.notifier`` is
    already set. Otherwise each entry returned by
    ``self.syncevt.get_wpaths()`` is watched recursively and handed to
    ``self.rsync.sync`` once (with ``wait=False``) before the notifier
    thread is started.
    """
    if self.notifier is not None:
        self.logger.debug("start_rtsync called by notifier not none.")
        return

    self.watchman = pyinotify.WatchManager()
    self.notifier = pyinotify.ThreadedNotifier(
        self.watchman,
        self.syncevt,
        read_freq=self.watch_delay,
    )
    self.notifier.coalesce_events(True)

    event_mask = self.get_mask()
    sync_entries = self.syncevt.get_wpaths()
    for entry in sync_entries:
        self.logger.debug("adding %s to sync watches" % entry['src'])
        # Work on a copy so the entry owned by syncevt is not modified.
        sync_kwargs = copy.deepcopy(entry)
        self.watchman.add_watch(
            sync_kwargs['src'], event_mask, rec=True, auto_add=True)
        # Initial sync; wait=False is meant to avoid blocking until rsync
        # finishes (semantics of rsync.sync are defined elsewhere).
        sync_kwargs['wait'] = False
        self.rsync.sync(**sync_kwargs)

    self.logger.debug("starting notifier...")
    self.notifier.start()
def __init__(self):
    """Create the inotify watch manager and start its notifier thread.

    The notifier dispatches events to ``self.fileChanged`` and runs as a
    daemon thread.
    """
    # NOTE(review): presumably the last monitor id handed out -- confirm
    # against the methods that register monitors.
    self.monitorId = 0
    self.wm = pyinotify.WatchManager()
    # Both start empty; they are filled by code outside this method.
    self.wdds = {}
    self.callbacks = {}
    self.notifier = pyinotify.ThreadedNotifier(self.wm, self.fileChanged)
    # Thread.setDaemon() is deprecated since Python 3.10 and emits a
    # DeprecationWarning; assigning the ``daemon`` attribute is equivalent.
    self.notifier.daemon = True
    self.notifier.start()
def watch_shakemaps_push(
        working_dir, timeout=None, handler=None, daemon=False):
    """Build a notifier that watches ``working_dir`` recursively.

    The notifier is returned without being started or looped; that is left
    to the caller.

    Args:
        working_dir: Directory to watch for create, modify and moved-to
            events; sub-directories are included and new ones are added
            automatically.
        timeout: Passed through unchanged to the notifier constructor.
        handler: Event processor passed to the notifier constructor.
        daemon: When truthy a ``pyinotify.ThreadedNotifier`` is built,
            otherwise a plain ``pyinotify.Notifier``.

    Returns:
        The notifier instance that was created.
    """
    watch_manager = pyinotify.WatchManager()

    # Both notifier flavours take the same constructor arguments here.
    notifier_cls = pyinotify.ThreadedNotifier if daemon else pyinotify.Notifier
    notifier = notifier_cls(watch_manager, handler, timeout=timeout)

    event_mask = (
        pyinotify.IN_CREATE | pyinotify.IN_MODIFY | pyinotify.IN_MOVED_TO
    )
    watch_manager.add_watch(working_dir, event_mask, rec=True, auto_add=True)
    return notifier