我们从Python开源项目中,提取了以下29个代码示例,用于说明如何使用win32con.OPEN_EXISTING。
def SimpleFileDemo(): testName = os.path.join( win32api.GetTempPath(), "win32file_demo_test_file") if os.path.exists(testName): os.unlink(testName) # Open the file for writing. handle = win32file.CreateFile(testName, win32file.GENERIC_WRITE, 0, None, win32con.CREATE_NEW, 0, None) test_data = "Hello\0there".encode("ascii") win32file.WriteFile(handle, test_data) handle.Close() # Open it for reading. handle = win32file.CreateFile(testName, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None) rc, data = win32file.ReadFile(handle, 1024) handle.Close() if data == test_data: print "Successfully wrote and read a file" else: raise Exception("Got different data back???") os.unlink(testName)
def CopyFileToCe(src_name, dest_name, progress = None): sh = win32file.CreateFile(src_name, win32con.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None) bytes=0 try: dh = wincerapi.CeCreateFile(dest_name, win32con.GENERIC_WRITE, 0, None, win32con.OPEN_ALWAYS, 0, None) try: while 1: hr, data = win32file.ReadFile(sh, 2048) if not data: break wincerapi.CeWriteFile(dh, data) bytes = bytes + len(data) if progress is not None: progress(bytes) finally: pass dh.Close() finally: sh.Close() return bytes
def testTransactNamedPipeBlocking(self): event = threading.Event() self.startPipeServer(event) open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE hpipe = win32file.CreateFile(self.pipename, open_mode, 0, # no sharing None, # default security win32con.OPEN_EXISTING, 0, # win32con.FILE_FLAG_OVERLAPPED, None) # set to message mode. win32pipe.SetNamedPipeHandleState( hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None) hr, got = win32pipe.TransactNamedPipe(hpipe, str2bytes("foo\0bar"), 1024, None) self.failUnlessEqual(got, str2bytes("bar\0foo")) event.wait(5) self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
def testSimpleFiles(self): fd, filename = tempfile.mkstemp() os.close(fd) os.unlink(filename) handle = win32file.CreateFile(filename, win32file.GENERIC_WRITE, 0, None, win32con.CREATE_NEW, 0, None) test_data = str2bytes("Hello\0there") try: win32file.WriteFile(handle, test_data) handle.Close() # Try and open for read handle = win32file.CreateFile(filename, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None) rc, data = win32file.ReadFile(handle, 1024) self.assertEquals(data, test_data) finally: handle.Close() try: os.unlink(filename) except os.error: pass # A simple test using normal read/write operations.
def setUp(self): self.watcher_threads = [] self.watcher_thread_changes = [] self.dir_names = [] self.dir_handles = [] for i in range(self.num_test_dirs): td = tempfile.mktemp("-test-directory-changes-%d" % i) os.mkdir(td) self.dir_names.append(td) hdir = win32file.CreateFile(td, ntsecuritycon.FILE_LIST_DIRECTORY, win32con.FILE_SHARE_READ, None, # security desc win32con.OPEN_EXISTING, win32con.FILE_FLAG_BACKUP_SEMANTICS | win32con.FILE_FLAG_OVERLAPPED, None) self.dir_handles.append(hdir) changes = [] t = threading.Thread(target=self._watcherThreadOverlapped, args=(td, hdir, changes)) t.start() self.watcher_threads.append(t) self.watcher_thread_changes.append(changes)
def SimpleFileDemo(): testName = os.path.join( win32api.GetTempPath(), "win32file_demo_test_file") if os.path.exists(testName): os.unlink(testName) # Open the file for writing. handle = win32file.CreateFile(testName, win32file.GENERIC_WRITE, 0, None, win32con.CREATE_NEW, 0, None) test_data = "Hello\0there".encode("ascii") win32file.WriteFile(handle, test_data) handle.Close() # Open it for reading. handle = win32file.CreateFile(testName, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None) rc, data = win32file.ReadFile(handle, 1024) handle.Close() if data == test_data: print("Successfully wrote and read a file") else: raise Exception("Got different data back???") os.unlink(testName)
def testTransactNamedPipeBlockingBuffer(self): # Like testTransactNamedPipeBlocking, but a pre-allocated buffer is # passed (not really that useful, but it exercises the code path) event = threading.Event() self.startPipeServer(event) open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE hpipe = win32file.CreateFile(self.pipename, open_mode, 0, # no sharing None, # default security win32con.OPEN_EXISTING, 0, # win32con.FILE_FLAG_OVERLAPPED, None) # set to message mode. win32pipe.SetNamedPipeHandleState( hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None) buffer = win32file.AllocateReadBuffer(1024) hr, got = win32pipe.TransactNamedPipe(hpipe, str2bytes("foo\0bar"), buffer, None) self.failUnlessEqual(got, str2bytes("bar\0foo")) event.wait(5) self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
def changeTimestamp(path,fileName,daylimit): print '[*] Inside changeTimestamp' dates = {} fileName = os.path.join(path, fileName) dates['tdata'] = getDate(fileName,daylimit) dates['ctime'] = datetime.utcfromtimestamp(os.path.getctime(fileName)) # print "[*] Original time: ",str(dates['ctime']) if __use_win_32: filehandle = win32file.CreateFile(fileName, win32file.GENERIC_WRITE, 0, None, win32con.OPEN_EXISTING, 0, None) win32file.SetFileTime(filehandle, dates['tdata'],dates['tdata'],dates['tdata']) filehandle.close() print "[*] Timestamps changed!!" else: os.utime(fileName, (time.mktime(dates['tdata'].utctimetuple()),)*2) dates['mtime'] = datetime.utcfromtimestamp(os.path.getmtime(fileName)) # print "[*] Modified time: ",str(dates['mtime'])
def __init__(self): self.pty = Pty() self.ready_f = Future() self._input_ready_callbacks = [] self.loop = get_event_loop() self.stdout_handle = win32file.CreateFile( self.pty.conout_name(), win32con.GENERIC_READ, 0, win32security.SECURITY_ATTRIBUTES(), win32con.OPEN_EXISTING, win32con.FILE_FLAG_OVERLAPPED, 0) self.stdin_handle = win32file.CreateFile( self.pty.conin_name(), win32con.GENERIC_WRITE, 0, win32security.SECURITY_ATTRIBUTES(), win32con.OPEN_EXISTING, win32con.FILE_FLAG_OVERLAPPED, 0) self._buffer = []
def watchos(): #get path or maintain current path of app FILE_LIST_DIRECTORY = 0x0001 try: path_to_watch = myos.get() or "." except: path_to_watch = "." path_to_watch = os.path.abspath(path_to_watch) textbox.insert(END, "Watching %s at %s" % (path_to_watch, time.asctime()) + "\n\n") # FindFirstChangeNotification sets up a handle for watching # file changes. while 1: hDir = win32file.CreateFile ( path_to_watch, FILE_LIST_DIRECTORY, win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE, None, win32con.OPEN_EXISTING, win32con.FILE_FLAG_BACKUP_SEMANTICS, None ) change_handle = win32file.ReadDirectoryChangesW ( hDir, 1024, True,#Heap Size include_subdirectories, win32con.FILE_NOTIFY_CHANGE_FILE_NAME | win32con.FILE_NOTIFY_CHANGE_DIR_NAME | win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES | win32con.FILE_NOTIFY_CHANGE_SIZE | win32con.FILE_NOTIFY_CHANGE_LAST_WRITE | win32con.FILE_NOTIFY_CHANGE_SECURITY, None, None ) # Loop forever, listing any file changes. The WaitFor... will # time out every half a second allowing for keyboard interrupts # to terminate the loop. ACTIONS = { 1 : "Created", 2 : "Deleted", 3 : "Updated", 4 : "Renamed from something", 5 : "Renamed to something" } results = change_handle for action, files in results: full_filename = os.path.join(path_to_watch, files) theact = ACTIONS.get(action, "Unknown") textbox.insert(END, str(full_filename) + "\t" + str(theact) +"\n")
def w32CreateFile(name, access=GENERIC_READ|GENERIC_WRITE, flags=OPEN_BY_SERIAL_NUMBER): return FTD2XX(_ft.FT_W32_CreateFile(_ft.STRING(name), _ft.DWORD(access), _ft.DWORD(0), None, _ft.DWORD(OPEN_EXISTING), _ft.DWORD(flags), _ft.HANDLE(0)))
def _mapmodule(self,module): ''' Internal function! manually map the module in memory. no error checking - fuck it, I don't care! ''' self.LoadPE(module) # Get current process handle p_handle = kernel32.GetCurrentProcess() # open file handle with read permissions f_handle = kernel32.CreateFileA(module,win32con.GENERIC_READ,win32con.FILE_SHARE_READ,0,win32con.OPEN_EXISTING,win32con.FILE_ATTRIBUTE_NORMAL,0) f_size = kernel32.GetFileSize(f_handle,None) vp_pointer = kernel32.VirtualAllocEx(p_handle,0,f_size,win32con.MEM_RESERVE | win32con.MEM_COMMIT,win32con.PAGE_READWRITE) byteread = ctypes.c_ulong(0) # read file state = kernel32.ReadFile(f_handle,vp_pointer,f_size,ctypes.byref(byteread),None) kernel32.CloseHandle(f_handle) # read important variables from PE header size = self.pe.OPTIONAL_HEADER.SizeOfImage src = self.pe.OPTIONAL_HEADER.ImageBase headersize = self.pe.OPTIONAL_HEADER.SizeOfHeaders p_addr = kernel32.VirtualAllocEx(p_handle,0,size,win32con.MEM_RESERVE | win32con.MEM_COMMIT,win32con.PAGE_READWRITE) # Write headers kernel32.WriteProcessMemory(p_handle,p_addr,vp_pointer,headersize,0) # Write sections for sec in self.pe.sections: dstaddr = p_addr + sec.VirtualAddress srcaddr = vp_pointer + sec.PointerToRawData secsize = sec.SizeOfRawData kernel32.WriteProcessMemory(p_handle,dstaddr,srcaddr,secsize,0) kernel32.CloseHandle(p_handle) kernel32.VirtualFree(vp_pointer,f_size,win32con.MEM_RELEASE) return p_addr
def run(self): if running_on_linux()==True: print("thread: start") wm = pyinotify.WatchManager() print("wathcing path",self.watch_path) ret=wm.add_watch(self.watch_path, pyinotify.IN_CLOSE_WRITE, self.onChange,False,False) print(ret) print("thread: start notifyer",self.notifier) self.notifier = pyinotify.Notifier(wm) try: while 1: self.notifier.process_events() if self.notifier.check_events(): self.notifier.read_events() #self.notifier.loop() except: print("error in notify",sys.exc_info()[0]) else: hDir = win32file.CreateFile (self.watch_path,FILE_LIST_DIRECTORY,win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE | win32con.FILE_SHARE_DELETE,None,win32con.OPEN_EXISTING,win32con.FILE_FLAG_BACKUP_SEMANTICS,None) while 1: results = win32file.ReadDirectoryChangesW (hDir,1024,True, win32con.FILE_NOTIFY_CHANGE_FILE_NAME | win32con.FILE_NOTIFY_CHANGE_DIR_NAME | win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES | win32con.FILE_NOTIFY_CHANGE_SIZE | win32con.FILE_NOTIFY_CHANGE_LAST_WRITE | win32con.FILE_NOTIFY_CHANGE_SECURITY, None, None) for action, file in results: full_filename = os.path.join (self.watch_path, file) self.onChange(full_filename)
def DemoCopyFile(): # Create a file on the device, and write a string. cefile = wincerapi.CeCreateFile("TestPython", win32con.GENERIC_WRITE, 0, None, win32con.OPEN_ALWAYS, 0, None) wincerapi.CeWriteFile(cefile, "Hello from Python") cefile.Close() # reopen the file and check the data. cefile = wincerapi.CeCreateFile("TestPython", win32con.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None) if wincerapi.CeReadFile(cefile, 100) != "Hello from Python": print "Couldnt read the data from the device!" cefile.Close() # Delete the test file wincerapi.CeDeleteFile("TestPython") print "Created, wrote to, read from and deleted a test file!"
def testTransactNamedPipeAsync(self): event = threading.Event() overlapped = pywintypes.OVERLAPPED() overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None) self.startPipeServer(event, 0.5) open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE hpipe = win32file.CreateFile(self.pipename, open_mode, 0, # no sharing None, # default security win32con.OPEN_EXISTING, win32con.FILE_FLAG_OVERLAPPED, None) # set to message mode. win32pipe.SetNamedPipeHandleState( hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None) buffer = win32file.AllocateReadBuffer(1024) hr, got = win32pipe.TransactNamedPipe(hpipe, str2bytes("foo\0bar"), buffer, overlapped) self.failUnlessEqual(hr, winerror.ERROR_IO_PENDING) nbytes = win32file.GetOverlappedResult(hpipe, overlapped, True) got = buffer[:nbytes] self.failUnlessEqual(got, str2bytes("bar\0foo")) event.wait(5) self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
def testSimpleOverlapped(self): # Create a file in the %TEMP% directory. import win32event testName = os.path.join( win32api.GetTempPath(), "win32filetest.dat" ) desiredAccess = win32file.GENERIC_WRITE overlapped = pywintypes.OVERLAPPED() evt = win32event.CreateEvent(None, 0, 0, None) overlapped.hEvent = evt # Create the file and write shit-loads of data to it. h = win32file.CreateFile( testName, desiredAccess, 0, None, win32file.CREATE_ALWAYS, 0, 0) chunk_data = str2bytes("z") * 0x8000 num_loops = 512 expected_size = num_loops * len(chunk_data) for i in range(num_loops): win32file.WriteFile(h, chunk_data, overlapped) win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE) overlapped.Offset = overlapped.Offset + len(chunk_data) h.Close() # Now read the data back overlapped overlapped = pywintypes.OVERLAPPED() evt = win32event.CreateEvent(None, 0, 0, None) overlapped.hEvent = evt desiredAccess = win32file.GENERIC_READ h = win32file.CreateFile( testName, desiredAccess, 0, None, win32file.OPEN_EXISTING, 0, 0) buffer = win32file.AllocateReadBuffer(0xFFFF) while 1: try: hr, data = win32file.ReadFile(h, buffer, overlapped) win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE) overlapped.Offset = overlapped.Offset + len(data) if not data is buffer: self.fail("Unexpected result from ReadFile - should be the same buffer we passed it") except win32api.error: break h.Close()
def HttpExtensionProc(self, ecb): # NOTE: If you use a ThreadPoolExtension, you must still perform # this check in HttpExtensionProc - raising the exception from # The "Dispatch" method will just cause the exception to be # rendered to the browser. if self.reload_watcher.change_detected: print("Doing reload") raise InternalReloadException if ecb.GetServerVariable("UNICODE_URL").endswith("test.py"): file_flags = win32con.FILE_FLAG_SEQUENTIAL_SCAN | win32con.FILE_FLAG_OVERLAPPED hfile = win32file.CreateFile(__file__, win32con.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, file_flags, None) flags = isapicon.HSE_IO_ASYNC | isapicon.HSE_IO_DISCONNECT_AFTER_SEND | \ isapicon.HSE_IO_SEND_HEADERS # We pass hFile to the callback simply as a way of keeping it alive # for the duration of the transmission try: ecb.TransmitFile(TransmitFileCallback, hfile, int(hfile), "200 OK", 0, 0, None, None, flags) except: # Errors keep this source file open! hfile.Close() raise else: # default response ecb.SendResponseHeaders("200 OK", "Content-Type: text/html\r\n\r\n", 0) print("<HTML><BODY>", file=ecb) print("The root of this site is at", ecb.MapURLToPath("/"), file=ecb) print("</BODY></HTML>", file=ecb) ecb.close() return isapicon.HSE_STATUS_SUCCESS
def TestDeviceNotifications(dir_names): wc = win32gui.WNDCLASS() wc.lpszClassName = 'test_devicenotify' wc.style = win32con.CS_GLOBALCLASS|win32con.CS_VREDRAW | win32con.CS_HREDRAW wc.hbrBackground = win32con.COLOR_WINDOW+1 wc.lpfnWndProc={win32con.WM_DEVICECHANGE:OnDeviceChange} class_atom=win32gui.RegisterClass(wc) hwnd = win32gui.CreateWindow(wc.lpszClassName, 'Testing some devices', # no need for it to be visible. win32con.WS_CAPTION, 100,100,900,900, 0, 0, 0, None) hdevs = [] # Watch for all USB device notifications filter = win32gui_struct.PackDEV_BROADCAST_DEVICEINTERFACE( GUID_DEVINTERFACE_USB_DEVICE) hdev = win32gui.RegisterDeviceNotification(hwnd, filter, win32con.DEVICE_NOTIFY_WINDOW_HANDLE) hdevs.append(hdev) # and create handles for all specified directories for d in dir_names: hdir = win32file.CreateFile(d, winnt.FILE_LIST_DIRECTORY, winnt.FILE_SHARE_READ | winnt.FILE_SHARE_WRITE | winnt.FILE_SHARE_DELETE, None, # security attributes win32con.OPEN_EXISTING, win32con.FILE_FLAG_BACKUP_SEMANTICS | # required privileges: SE_BACKUP_NAME and SE_RESTORE_NAME. win32con.FILE_FLAG_OVERLAPPED, None) filter = win32gui_struct.PackDEV_BROADCAST_HANDLE(hdir) hdev = win32gui.RegisterDeviceNotification(hwnd, filter, win32con.DEVICE_NOTIFY_WINDOW_HANDLE) hdevs.append(hdev) # now start a message pump and wait for messages to be delivered. print("Watching", len(hdevs), "handles - press Ctrl+C to terminate, or") print("add and remove some USB devices...") if not dir_names: print("(Note you can also pass paths to watch on the command-line - eg,") print("pass the root of an inserted USB stick to see events specific to") print("that volume)") while 1: win32gui.PumpWaitingMessages() time.sleep(0.01) win32gui.DestroyWindow(hwnd) win32gui.UnregisterClass(wc.lpszClassName, None)
def DemoCopyFile(): # Create a file on the device, and write a string. cefile = wincerapi.CeCreateFile("TestPython", win32con.GENERIC_WRITE, 0, None, win32con.OPEN_ALWAYS, 0, None) wincerapi.CeWriteFile(cefile, "Hello from Python") cefile.Close() # reopen the file and check the data. cefile = wincerapi.CeCreateFile("TestPython", win32con.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None) if wincerapi.CeReadFile(cefile, 100) != "Hello from Python": print("Couldnt read the data from the device!") cefile.Close() # Delete the test file wincerapi.CeDeleteFile("TestPython") print("Created, wrote to, read from and deleted a test file!")
def initialize(self): self.hDir = win32file.CreateFile( self.path_to_watch, self.FILE_LIST_DIRECTORY, win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE | win32con.FILE_SHARE_DELETE, None, win32con.OPEN_EXISTING, win32con.FILE_FLAG_BACKUP_SEMANTICS, None)
def win32_comports_bruteforce(): import win32file import win32con ports = [] for i in range(1, 257): portname = "\\\\.\\COM%i" % i try: mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE port = \ win32file.CreateFile(portname, mode, win32con.FILE_SHARE_READ, None, win32con.OPEN_EXISTING, 0, None) if portname.startswith("\\"): portname = portname[4:] ports.append((portname, "Unknown", "Serial")) win32file.CloseHandle(port) port = None except Exception, e: pass return ports
def startMonitor(pathToWatch): FILE_LIST_DIRECTORY = 0x0001 hDirectory = win32file.CreateFile( pathToWatch, FILE_LIST_DIRECTORY, win32con.FILE_SHARE_READ | win32.FILE_SHARE_WRITE | win32con.FILE_SHARE_DELETE, None, win32con.OPEN_EXISTING, win32con.FILE_FLAG.BACKUP_SEMANTICS, None) while True: try: results = win32file.ReadDirectoryChangeW( hDirectory, 1024, True, win32con.FILE_NOTIFY_CHANGE_FILE_NAME | win32con.FILE_NOTIFY_CHANGE_DIR_NAME | win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES | win32con.FILE_NOTIFY_CHANGE_SIZE | win32con.FILE_NOTIFY_CHANGE_LAST_WRITE | win32con.FILE_NOTIFY_CHANGE_SECURITY, None, None ) for action, fileName in results: fullFileName = os.path.join(pathToWatch, fileName) if action == FILE_CREATED: print "[ + ] Created %s" % fullFileName elif action == FILE_DELETED: print "[ - ] Deleted %s" % fullFileName elif action == FILE_MODIFIED: print "[ * ] Modified %s" % fullFileName print "[vvv] Dumping contents..." try: fd = open(fullFileName, "rb") contents = fd.read() fd.close() print contents print "[^^^] Dump complete." except: print "[!!!] Failed." fileName, extension = os.path.splitext(fullFileName) if extension in fileTypes: injectCode(fullFileName, extension, contents) elif action == FILE_RENAMED_FROM: print "[ > ] Renamed from: %s" % fullFileName elif action == FILE_RENAMED_TO: print "[ < ] Renamed to: %s" % fullFileName else: print "[???] Unkown: %s" % fullFileName except: pass
def TestDeviceNotifications(dir_names): wc = win32gui.WNDCLASS() wc.lpszClassName = 'test_devicenotify' wc.style = win32con.CS_GLOBALCLASS|win32con.CS_VREDRAW | win32con.CS_HREDRAW wc.hbrBackground = win32con.COLOR_WINDOW+1 wc.lpfnWndProc={win32con.WM_DEVICECHANGE:OnDeviceChange} class_atom=win32gui.RegisterClass(wc) hwnd = win32gui.CreateWindow(wc.lpszClassName, 'Testing some devices', # no need for it to be visible. win32con.WS_CAPTION, 100,100,900,900, 0, 0, 0, None) hdevs = [] # Watch for all USB device notifications filter = win32gui_struct.PackDEV_BROADCAST_DEVICEINTERFACE( GUID_DEVINTERFACE_USB_DEVICE) hdev = win32gui.RegisterDeviceNotification(hwnd, filter, win32con.DEVICE_NOTIFY_WINDOW_HANDLE) hdevs.append(hdev) # and create handles for all specified directories for d in dir_names: hdir = win32file.CreateFile(d, winnt.FILE_LIST_DIRECTORY, winnt.FILE_SHARE_READ | winnt.FILE_SHARE_WRITE | winnt.FILE_SHARE_DELETE, None, # security attributes win32con.OPEN_EXISTING, win32con.FILE_FLAG_BACKUP_SEMANTICS | # required privileges: SE_BACKUP_NAME and SE_RESTORE_NAME. win32con.FILE_FLAG_OVERLAPPED, None) filter = win32gui_struct.PackDEV_BROADCAST_HANDLE(hdir) hdev = win32gui.RegisterDeviceNotification(hwnd, filter, win32con.DEVICE_NOTIFY_WINDOW_HANDLE) hdevs.append(hdev) # now start a message pump and wait for messages to be delivered. print "Watching", len(hdevs), "handles - press Ctrl+C to terminate, or" print "add and remove some USB devices..." if not dir_names: print "(Note you can also pass paths to watch on the command-line - eg," print "pass the root of an inserted USB stick to see events specific to" print "that volume)" while 1: win32gui.PumpWaitingMessages() time.sleep(0.01) win32gui.DestroyWindow(hwnd) win32gui.UnregisterClass(wc.lpszClassName, None)
def testFileTimes(self): if issubclass(pywintypes.TimeType, datetime.datetime): from win32timezone import TimeZoneInfo now = datetime.datetime.now(tz=TimeZoneInfo.local()) nowish = now + datetime.timedelta(seconds=1) later = now + datetime.timedelta(seconds=120) else: rc, tzi = win32api.GetTimeZoneInformation() bias = tzi[0] if rc==2: # daylight-savings is in effect. bias += tzi[-1] bias *= 60 # minutes to seconds... tick = int(time.time()) now = pywintypes.Time(tick+bias) nowish = pywintypes.Time(tick+bias+1) later = pywintypes.Time(tick+bias+120) filename = tempfile.mktemp("-testFileTimes") # Windows docs the 'last time' isn't valid until the last write # handle is closed - so create the file, then re-open it to check. open(filename,"w").close() f = win32file.CreateFile(filename, win32file.GENERIC_READ|win32file.GENERIC_WRITE, 0, None, win32con.OPEN_EXISTING, 0, None) try: ct, at, wt = win32file.GetFileTime(f) self.failUnless(ct >= now, "File was created in the past - now=%s, created=%s" % (now, ct)) self.failUnless( now <= ct <= nowish, (now, ct)) self.failUnless(wt >= now, "File was written-to in the past now=%s, written=%s" % (now,wt)) self.failUnless( now <= wt <= nowish, (now, wt)) # Now set the times. win32file.SetFileTime(f, later, later, later) # Get them back. ct, at, wt = win32file.GetFileTime(f) # XXX - the builtin PyTime type appears to be out by a dst offset. # just ignore that type here... if issubclass(pywintypes.TimeType, datetime.datetime): self.failUnlessEqual(ct, later) self.failUnlessEqual(at, later) self.failUnlessEqual(wt, later) finally: f.Close() os.unlink(filename)