我们从Python开源项目中,提取了以下4个代码示例,用于说明如何使用win32con.FILE_NOTIFY_CHANGE_SIZE。
def watchos():
    """Watch a directory tree and append every reported change to ``textbox``.

    The directory is taken from ``myos.get()``; if that call fails or yields
    an empty value, the current directory (``"."``) is watched instead.
    Each change is written to ``textbox`` as ``<full path>\\t<action>``.

    The directory handle is opened once and reused for every
    ``ReadDirectoryChangesW`` call, so changes that occur between two calls
    stay queued on the handle instead of being dropped with a freshly opened
    one. The handle is closed when the loop is left by an exception.

    This function loops forever and does not return normally.
    """
    # Access right required to open a directory for change notifications.
    FILE_LIST_DIRECTORY = 0x0001

    # Action codes delivered by ReadDirectoryChangesW -> display text.
    ACTIONS = {
        1: "Created",
        2: "Deleted",
        3: "Updated",
        4: "Renamed from something",
        5: "Renamed to something",
    }

    # Kinds of change we ask to be notified about.
    notify_filter = (
        win32con.FILE_NOTIFY_CHANGE_FILE_NAME
        | win32con.FILE_NOTIFY_CHANGE_DIR_NAME
        | win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES
        | win32con.FILE_NOTIFY_CHANGE_SIZE
        | win32con.FILE_NOTIFY_CHANGE_LAST_WRITE
        | win32con.FILE_NOTIFY_CHANGE_SECURITY
    )

    # Get the path from the app, or fall back to the current directory.
    try:
        path_to_watch = myos.get() or "."
    except Exception:
        path_to_watch = "."
    path_to_watch = os.path.abspath(path_to_watch)
    textbox.insert(END, "Watching %s at %s" % (path_to_watch, time.asctime()) + "\n\n")

    # FILE_FLAG_BACKUP_SEMANTICS is what allows CreateFile to open a directory.
    hDir = win32file.CreateFile(
        path_to_watch,
        FILE_LIST_DIRECTORY,
        win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
        None,
        win32con.OPEN_EXISTING,
        win32con.FILE_FLAG_BACKUP_SEMANTICS,
        None,
    )
    try:
        while True:
            # Called without an overlapped structure, so this waits until
            # at least one change is available.
            results = win32file.ReadDirectoryChangesW(
                hDir,
                1024,   # size of the result buffer in bytes
                True,   # include subdirectories
                notify_filter,
                None,
                None,
            )
            for action, files in results:
                full_filename = os.path.join(path_to_watch, files)
                theact = ACTIONS.get(action, "Unknown")
                textbox.insert(END, str(full_filename) + "\t" + str(theact) + "\n")
    finally:
        hDir.Close()
def run(self):
    """Watch ``self.watch_path`` and report changes through ``self.onChange``.

    When ``running_on_linux()`` is true, a pyinotify watch for
    ``IN_CLOSE_WRITE`` is registered with ``self.onChange`` as its callback
    and events are pumped in an endless loop; an error inside that loop is
    printed and ends the method.

    Otherwise the directory is opened through pywin32 and
    ``ReadDirectoryChangesW`` is called in an endless loop;
    ``self.onChange`` receives the full path of every changed entry. The
    directory handle is closed if the loop is left by an exception.

    NOTE(review): the two branches call ``self.onChange`` with different
    argument kinds (pyinotify passes whatever it gives a watch callback, the
    Windows branch passes a path string) - confirm the callback handles both.
    """
    if running_on_linux():
        print("thread: start")
        wm = pyinotify.WatchManager()
        print("wathcing path",self.watch_path)
        ret = wm.add_watch(self.watch_path, pyinotify.IN_CLOSE_WRITE, self.onChange, False, False)
        print(ret)
        # Create the notifier before logging it; reading self.notifier first
        # would fail (or show a stale object) when it has not been set yet.
        self.notifier = pyinotify.Notifier(wm)
        print("thread: start notifyer",self.notifier)
        try:
            while True:
                self.notifier.process_events()
                if self.notifier.check_events():
                    self.notifier.read_events()
        except Exception:
            # Exception (not a bare except) so that SystemExit and
            # KeyboardInterrupt are not swallowed here.
            print("error in notify",sys.exc_info()[0])
        return

    # FILE_FLAG_BACKUP_SEMANTICS is what allows CreateFile to open a directory.
    hDir = win32file.CreateFile(
        self.watch_path,
        FILE_LIST_DIRECTORY,
        win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE | win32con.FILE_SHARE_DELETE,
        None,
        win32con.OPEN_EXISTING,
        win32con.FILE_FLAG_BACKUP_SEMANTICS,
        None,
    )
    notify_filter = (
        win32con.FILE_NOTIFY_CHANGE_FILE_NAME
        | win32con.FILE_NOTIFY_CHANGE_DIR_NAME
        | win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES
        | win32con.FILE_NOTIFY_CHANGE_SIZE
        | win32con.FILE_NOTIFY_CHANGE_LAST_WRITE
        | win32con.FILE_NOTIFY_CHANGE_SECURITY
    )
    try:
        while True:
            # 1024-byte result buffer; True = include subdirectories.
            results = win32file.ReadDirectoryChangesW(
                hDir, 1024, True, notify_filter, None, None)
            # The action code is not used: every kind of change is reported.
            for _action, changed_name in results:
                full_filename = os.path.join(self.watch_path, changed_name)
                self.onChange(full_filename)
    finally:
        hDir.Close()
def run(self,**kwargs):
    '''
    Block on self.hDir waiting for directory changes and process each batch.

    Optional keyword arguments:
      function -- callable invoked with no arguments after every batch of
                  changes is read, before the batch is processed
      signal   -- object whose emit() is called whenever
                  check_md5_and_process returns True for a changed file

    For every changed entry:
      1. the full path is built from self.path_to_watch
      2. check_monitor(full path) is consulted; the entry is processed only
         when exactly one item comes back
      3. check_md5_and_process(items, full path, self.backup_dir) is called
         and, on a True outcome, the signal (if any) is emitted

    A FileNotFoundError raised during step 3 is ignored.
    This method loops forever.
    '''
    notify_mask = (
        win32con.FILE_NOTIFY_CHANGE_FILE_NAME
        | win32con.FILE_NOTIFY_CHANGE_DIR_NAME
        | win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES
        | win32con.FILE_NOTIFY_CHANGE_SIZE
        | win32con.FILE_NOTIFY_CHANGE_LAST_WRITE
        | win32con.FILE_NOTIFY_CHANGE_SECURITY
    )
    on_batch = kwargs.get('function')
    changed_signal = kwargs.get('signal')

    while True:
        # 16384-byte result buffer; True = include subdirectories.
        results = win32file.ReadDirectoryChangesW(
            self.hDir, 16384, True, notify_mask, None, None)

        if on_batch:
            on_batch()

        for _action, changed_name in results:
            changed_path = os.path.join(self.path_to_watch, changed_name)
            matches = check_monitor(changed_path)
            if len(matches) != 1:
                continue
            try:
                processed = check_md5_and_process(matches, changed_path, self.backup_dir)
                if processed == True and changed_signal:
                    changed_signal.emit()
            except FileNotFoundError:
                # The file went missing while it was being processed; skip it.
                pass
def startMonitor(pathToWatch):
    """Watch ``pathToWatch`` (recursively) and print every reported change.

    Each change is printed with a tag for its action. ``FILE_CREATED``,
    ``FILE_DELETED``, ``FILE_MODIFIED``, ``FILE_RENAMED_FROM`` and
    ``FILE_RENAMED_TO`` are expected to be defined at module level.

    For a modified file the contents are read and printed. If the read
    succeeded and the file extension is listed in ``fileTypes``, the path,
    extension and contents are passed to ``injectCode``. A file that could
    not be read is reported as failed and is not passed on.

    Errors while reading or handling a batch of changes are printed and the
    loop continues. The directory handle is closed when the loop is left
    (e.g. on KeyboardInterrupt). This function does not return normally.

    Args:
        pathToWatch: directory to monitor.
    """
    # Access right required to open a directory for change notifications.
    FILE_LIST_DIRECTORY = 0x0001

    # FILE_FLAG_BACKUP_SEMANTICS is what allows CreateFile to open a directory.
    hDirectory = win32file.CreateFile(
        pathToWatch,
        FILE_LIST_DIRECTORY,
        win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE | win32con.FILE_SHARE_DELETE,
        None,
        win32con.OPEN_EXISTING,
        win32con.FILE_FLAG_BACKUP_SEMANTICS,
        None)

    notifyFilter = (
        win32con.FILE_NOTIFY_CHANGE_FILE_NAME
        | win32con.FILE_NOTIFY_CHANGE_DIR_NAME
        | win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES
        | win32con.FILE_NOTIFY_CHANGE_SIZE
        | win32con.FILE_NOTIFY_CHANGE_LAST_WRITE
        | win32con.FILE_NOTIFY_CHANGE_SECURITY
    )

    try:
        while True:
            try:
                # 1024-byte result buffer; True = include subdirectories.
                results = win32file.ReadDirectoryChangesW(
                    hDirectory, 1024, True, notifyFilter, None, None)

                for action, fileName in results:
                    fullFileName = os.path.join(pathToWatch, fileName)

                    if action == FILE_CREATED:
                        print("[ + ] Created %s" % fullFileName)
                    elif action == FILE_DELETED:
                        print("[ - ] Deleted %s" % fullFileName)
                    elif action == FILE_MODIFIED:
                        print("[ * ] Modified %s" % fullFileName)
                        print("[vvv] Dumping contents...")
                        try:
                            with open(fullFileName, "rb") as fd:
                                contents = fd.read()
                        except (IOError, OSError):
                            print("[!!!] Failed.")
                        else:
                            # Only reached when the read succeeded, so
                            # `contents` always belongs to this file.
                            print(contents)
                            print("[^^^] Dump complete.")
                            extension = os.path.splitext(fullFileName)[1]
                            if extension in fileTypes:
                                injectCode(fullFileName, extension, contents)
                    elif action == FILE_RENAMED_FROM:
                        print("[ > ] Renamed from: %s" % fullFileName)
                    elif action == FILE_RENAMED_TO:
                        print("[ < ] Renamed to: %s" % fullFileName)
                    else:
                        print("[???] Unkown: %s" % fullFileName)
            except Exception as exc:
                # Keep monitoring on a per-batch failure, but report it
                # instead of hiding it. KeyboardInterrupt and SystemExit are
                # not caught, so the monitor can still be stopped.
                print("[!!!] Monitor error: %s" % exc)
    finally:
        hDirectory.Close()