我们从Python开源项目中,提取了以下9个代码示例,用于说明如何使用pyinotify.IN_CLOSE_WRITE。
def watch_config_changes(config_path, queues, nuimo_apps, processes, ha_api_url, ble_adapter_name):
    """Block in a pyinotify loop and reload the configuration when it is rewritten.

    A watch for ``IN_CLOSE_WRITE`` is placed on ``config_path``.  Whenever such
    an event arrives whose ``pathname`` equals ``config_path``,
    ``update_from_config_file`` is called with exactly the arguments that were
    passed to this function.

    The call does not return until ``notifier.loop()`` returns.
    """

    class _ConfigReloadHandler(pyinotify.ProcessEvent):
        """Reload the configuration on close-after-write events for ``config_path``."""

        def process_IN_CLOSE_WRITE(self, event):
            # Skip events that carry no path or that refer to another file.
            if not hasattr(event, 'pathname') or event.pathname != config_path:
                return
            logger.info("Config file was changed, reloading it...")
            update_from_config_file(
                config_path,
                queues,
                nuimo_apps,
                processes,
                ha_api_url,
                ble_adapter_name,
            )

    reload_handler = _ConfigReloadHandler()
    manager = pyinotify.WatchManager()
    event_notifier = pyinotify.Notifier(manager, reload_handler)

    # IN_CLOSE_WRITE fires once, when the file is closed after having been
    # modified, whereas IN_MODIFY would fire for every partial write.
    manager.add_watch(config_path, pyinotify.IN_CLOSE_WRITE)

    logger.info("Listening to changes of: %s", config_path)
    event_notifier.loop()
    logger.info("Stopped listening to changes of: %s", config_path)
def run(self):
    """Watch the existing ``DESKTOP_PATHS`` entries and keep ``self.apps`` in sync.

    * ``IN_CLOSE_WRITE`` -> ``self.populate_app(event.pathname)``
    * ``IN_DELETE``      -> drop the first ``self.apps`` entry whose value has
      ``event.pathname`` at index 2 (done while holding ``self.lock``)

    Sets ``self.running`` to True and then blocks in ``notifier.loop()``.
    """
    self.running = True
    manager = pyinotify.WatchManager()
    event_notifier = pyinotify.Notifier(manager)

    def forget_app(removed_path):
        # Remove at most one entry; the loop stops right after the deletion,
        # so the dict is never iterated again after being mutated.
        with self.lock:
            for cmd, data in self.apps.items():
                if data[2] == removed_path:
                    del self.apps[cmd]
                    break

    def inotify_callback(event):
        # The mask is compared for equality, exactly as the events are
        # requested below; events carrying additional flag bits are ignored.
        if event.mask == pyinotify.IN_CLOSE_WRITE:
            self.populate_app(event.pathname)
        elif event.mask == pyinotify.IN_DELETE:
            forget_app(event.pathname)

    watched_events = pyinotify.IN_CLOSE_WRITE | pyinotify.IN_DELETE
    # Only paths that exist at this moment get a watch.
    for existing_path in filter(os.path.exists, DESKTOP_PATHS):
        manager.add_watch(existing_path, watched_events, inotify_callback)

    event_notifier.loop()
def watch(self):
    """Watch ``self.path`` for ``IN_CLOSE_WRITE`` events and dispatch them.

    Events are handed to a ``PipelinesConfigUpdateHandler`` constructed with
    ``config=self``.  The call blocks inside the notifier loop, so it needs
    to be run in a separate thread.
    """
    manager = pyinotify.WatchManager()
    notifier = pyinotify.Notifier(manager)
    manager.add_watch(
        self.path,
        pyinotify.IN_CLOSE_WRITE,
        proc_fun=PipelinesConfigUpdateHandler(config=self),
    )
    notifier.loop()
def watch(self):
    """Watch ``self.path`` for ``IN_CLOSE_WRITE`` events and dispatch them.

    Events are handed to a ``PipelineConfigUpdateHandler`` constructed with
    ``config=self``.  The call blocks inside the notifier loop, so it needs
    to be run in a separate thread.
    """
    manager = pyinotify.WatchManager()
    notifier = pyinotify.Notifier(manager)
    manager.add_watch(
        self.path,
        pyinotify.IN_CLOSE_WRITE,
        proc_fun=PipelineConfigUpdateHandler(config=self),
    )
    notifier.loop()
def wait_pyinotify(self, bld):
    """Block until pyinotify delivers an event for a watched path.

    Every item yielded by ``self.enumerate(bld.srcnode)`` is watched for
    ``IN_DELETE`` and ``IN_CLOSE_WRITE``.  The handler marks the notifier with
    an ``ev`` attribute, stops it and raises ``ValueError`` to break out of
    the notifier loop; that ``ValueError`` is swallowed here.

    Raises:
        KeyboardInterrupt: if the loop ended without the handler having run
            (the notifier then has no ``ev`` attribute).
    """

    class _StopOnEvent(w_pyinotify.ProcessEvent):
        """Handler that ends the notifier loop on any event it receives."""

        def stop(self, event):
            self.notif.ev = True
            self.notif.stop()
            raise ValueError("stop for delete")

        # Same reaction for deletes, closes and everything else.
        process_IN_DELETE = process_IN_CLOSE = process_default = stop

    handler = _StopOnEvent()
    manager = w_pyinotify.WatchManager()
    notifier = w_pyinotify.Notifier(manager, handler)
    handler.notif = notifier

    # well, we should add all the folders to watch here
    watched_events = w_pyinotify.IN_DELETE | w_pyinotify.IN_CLOSE_WRITE
    for watched in self.enumerate(bld.srcnode):
        manager.add_watch(watched, watched_events)

    try:
        # pyinotify runs an endless loop, so the handler uses an exception
        # to get out of it.
        notifier.loop()
    except ValueError:
        pass

    if not hasattr(notifier, 'ev'):
        raise KeyboardInterrupt
def main():
    """Watch ``SAMPLES_DIR`` recursively and run the asyncore loop.

    ``IN_CREATE`` and ``IN_CLOSE_WRITE`` events are delivered to a fresh
    ``EventHandler`` through a ``pyinotify.AsyncNotifier``.  The call blocks
    in ``asyncore.loop()``.
    """
    # watched events
    event_mask = pyinotify.IN_CLOSE_WRITE | pyinotify.IN_CREATE

    manager = pyinotify.WatchManager()
    # Both results are bound to locals, as before, although nothing below
    # reads them.
    async_notifier = pyinotify.AsyncNotifier(manager, EventHandler())
    watch_descriptors = manager.add_watch(SAMPLES_DIR, event_mask, rec=True)

    asyncore.loop()
def run(self):
    """Watch ``self.watch_path`` and call ``self.onChange`` when it changes.

    On Linux (``running_on_linux()`` is truthy) a pyinotify watch for
    ``IN_CLOSE_WRITE`` is installed with ``self.onChange`` as its processing
    function, the notifier is stored in ``self.notifier`` and events are
    pumped in an endless loop.  An ordinary exception raised inside that loop
    is reported on stdout and ends the method.

    Otherwise the Windows API is used: the directory is opened with
    ``win32file.CreateFile`` and ``ReadDirectoryChangesW`` is polled forever;
    ``self.onChange`` is called with the joined path of every reported entry.

    NOTE(review): the two branches call ``self.onChange`` with different
    argument kinds (whatever pyinotify passes to ``proc_fun`` on Linux, a path
    string on Windows) -- confirm ``onChange`` copes with both.
    """
    if running_on_linux():
        print("thread: start")
        wm = pyinotify.WatchManager()
        print("wathcing path", self.watch_path)
        ret = wm.add_watch(
            self.watch_path,
            pyinotify.IN_CLOSE_WRITE,
            proc_fun=self.onChange,
            rec=False,
            auto_add=False,
        )
        print(ret)
        # Create the notifier *before* reporting it: previously the message
        # was printed first and showed a stale value (or raised
        # AttributeError if the attribute had never been set).
        self.notifier = pyinotify.Notifier(wm)
        print("thread: start notifyer", self.notifier)
        try:
            # Hand-rolled equivalent of notifier.loop().
            while True:
                self.notifier.process_events()
                if self.notifier.check_events():
                    self.notifier.read_events()
        except Exception:
            # Deliberately best-effort: report and let the method end.
            # KeyboardInterrupt/SystemExit are no longer swallowed here.
            print("error in notify", sys.exc_info()[0])
    else:
        share_mode = (
            win32con.FILE_SHARE_READ
            | win32con.FILE_SHARE_WRITE
            | win32con.FILE_SHARE_DELETE
        )
        notify_filter = (
            win32con.FILE_NOTIFY_CHANGE_FILE_NAME
            | win32con.FILE_NOTIFY_CHANGE_DIR_NAME
            | win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES
            | win32con.FILE_NOTIFY_CHANGE_SIZE
            | win32con.FILE_NOTIFY_CHANGE_LAST_WRITE
            | win32con.FILE_NOTIFY_CHANGE_SECURITY
        )
        # FILE_LIST_DIRECTORY is expected to be defined elsewhere in the module.
        hDir = win32file.CreateFile(
            self.watch_path,
            FILE_LIST_DIRECTORY,
            share_mode,
            None,
            win32con.OPEN_EXISTING,
            win32con.FILE_FLAG_BACKUP_SEMANTICS,
            None,
        )
        while True:
            results = win32file.ReadDirectoryChangesW(
                hDir,
                1024,
                True,
                notify_filter,
                None,
                None,
            )
            # Each result is an (action, name) pair; only the name is used.
            for _action, changed_name in results:
                full_filename = os.path.join(self.watch_path, changed_name)
                self.onChange(full_filename)