我们从Python开源项目中,提取了以下19个代码示例,用于说明如何使用win32file.OPEN_EXISTING。
def testSimpleFiles(self): fd, filename = tempfile.mkstemp() os.close(fd) os.unlink(filename) handle = win32file.CreateFile(filename, win32file.GENERIC_WRITE, 0, None, win32con.CREATE_NEW, 0, None) test_data = str2bytes("Hello\0there") try: win32file.WriteFile(handle, test_data) handle.Close() # Try and open for read handle = win32file.CreateFile(filename, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None) rc, data = win32file.ReadFile(handle, 1024) self.assertEquals(data, test_data) finally: handle.Close() try: os.unlink(filename) except os.error: pass # A simple test using normal read/write operations.
def setUp(self): self.watcher_threads = [] self.watcher_thread_changes = [] self.dir_names = [] self.dir_handles = [] for i in range(self.num_test_dirs): td = tempfile.mktemp("-test-directory-changes-%d" % i) os.mkdir(td) self.dir_names.append(td) hdir = win32file.CreateFile(td, ntsecuritycon.FILE_LIST_DIRECTORY, win32con.FILE_SHARE_READ, None, # security desc win32con.OPEN_EXISTING, win32con.FILE_FLAG_BACKUP_SEMANTICS | win32con.FILE_FLAG_OVERLAPPED, None) self.dir_handles.append(hdir) changes = [] t = threading.Thread(target=self._watcherThreadOverlapped, args=(td, hdir, changes)) t.start() self.watcher_threads.append(t) self.watcher_thread_changes.append(changes)
def _OpenFileForRead(self, path): try: fhandle = self.fhandle = win32file.CreateFile( path, win32file.GENERIC_READ, win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE, None, win32file.OPEN_EXISTING, win32file.FILE_ATTRIBUTE_NORMAL, None) self._closer = weakref.ref( self, lambda x: win32file.CloseHandle(fhandle)) self.write_enabled = False return fhandle except pywintypes.error as e: raise IOError("Unable to open %s: %s" % (path, e))
def _OpenFileForWrite(self, path): try: fhandle = self.fhandle = win32file.CreateFile( path, win32file.GENERIC_READ | win32file.GENERIC_WRITE, win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE, None, win32file.OPEN_EXISTING, win32file.FILE_ATTRIBUTE_NORMAL, None) self.write_enabled = True self._closer = weakref.ref( self, lambda x: win32file.CloseHandle(fhandle)) return fhandle except pywintypes.error as e: raise IOError("Unable to open %s: %s" % (path, e))
def pipe(bufsize=8192): """Creates overlapped (asynchronous) pipe. """ name = r'\\.\pipe\pycos-pipe-%d-%d' % (os.getpid(), next(_pipe_id)) openmode = (win32pipe.PIPE_ACCESS_INBOUND | win32file.FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE) pipemode = (win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_READMODE_BYTE) rh = wh = None try: rh = win32pipe.CreateNamedPipe( name, openmode, pipemode, 1, bufsize, bufsize, win32pipe.NMPWAIT_USE_DEFAULT_WAIT, None) wh = win32file.CreateFile( name, win32file.GENERIC_WRITE | winnt.FILE_READ_ATTRIBUTES, 0, None, win32file.OPEN_EXISTING, win32file.FILE_FLAG_OVERLAPPED, None) overlapped = pywintypes.OVERLAPPED() # 'yield' can't be used in constructor so use sync wait # (in this case it is should be okay) overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None) rc = win32pipe.ConnectNamedPipe(rh, overlapped) if rc == winerror.ERROR_PIPE_CONNECTED: win32event.SetEvent(overlapped.hEvent) rc = win32event.WaitForSingleObject(overlapped.hEvent, 1000) overlapped = None if rc != win32event.WAIT_OBJECT_0: pycos.logger.warning('connect failed: %s' % rc) raise Exception(rc) return (rh, wh) except: if rh is not None: win32file.CloseHandle(rh) if wh is not None: win32file.CloseHandle(wh) raise
def opencom1(): # returns a handle to the Windows file return win32file.CreateFile(u'COM1:', win32file.GENERIC_READ, \ 0, None, win32file.OPEN_EXISTING, 0, None)
def connect(self, address): win32pipe.WaitNamedPipe(address, self._timeout) try: handle = win32file.CreateFile( address, win32file.GENERIC_READ | win32file.GENERIC_WRITE, 0, None, win32file.OPEN_EXISTING, cSECURITY_ANONYMOUS | cSECURITY_SQOS_PRESENT, 0 ) except win32pipe.error as e: # See Remarks: # https://msdn.microsoft.com/en-us/library/aa365800.aspx if e.winerror == cERROR_PIPE_BUSY: # Another program or thread has grabbed our pipe instance # before we got to it. Wait for availability and attempt to # connect again. win32pipe.WaitNamedPipe(address, RETRY_WAIT_TIMEOUT) return self.connect(address) raise e self.flags = win32pipe.GetNamedPipeInfo(handle)[0] self._handle = handle self._address = address
def testSimpleOverlapped(self): # Create a file in the %TEMP% directory. import win32event testName = os.path.join( win32api.GetTempPath(), "win32filetest.dat" ) desiredAccess = win32file.GENERIC_WRITE overlapped = pywintypes.OVERLAPPED() evt = win32event.CreateEvent(None, 0, 0, None) overlapped.hEvent = evt # Create the file and write shit-loads of data to it. h = win32file.CreateFile( testName, desiredAccess, 0, None, win32file.CREATE_ALWAYS, 0, 0) chunk_data = str2bytes("z") * 0x8000 num_loops = 512 expected_size = num_loops * len(chunk_data) for i in range(num_loops): win32file.WriteFile(h, chunk_data, overlapped) win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE) overlapped.Offset = overlapped.Offset + len(chunk_data) h.Close() # Now read the data back overlapped overlapped = pywintypes.OVERLAPPED() evt = win32event.CreateEvent(None, 0, 0, None) overlapped.hEvent = evt desiredAccess = win32file.GENERIC_READ h = win32file.CreateFile( testName, desiredAccess, 0, None, win32file.OPEN_EXISTING, 0, 0) buffer = win32file.AllocateReadBuffer(0xFFFF) while 1: try: hr, data = win32file.ReadFile(h, buffer, overlapped) win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE) overlapped.Offset = overlapped.Offset + len(data) if not data is buffer: self.fail("Unexpected result from ReadFile - should be the same buffer we passed it") except win32api.error: break h.Close()
def NullFile(inheritable): """Create a null file handle.""" if inheritable: sa = pywintypes.SECURITY_ATTRIBUTES() sa.bInheritHandle = 1 else: sa = None return win32file.CreateFile("nul", win32file.GENERIC_READ | win32file.GENERIC_WRITE, win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE, sa, win32file.OPEN_EXISTING, 0, None)
def get_read_handle(filename): if os.path.isdir(filename): dwFlagsAndAttributes = win32file.FILE_FLAG_BACKUP_SEMANTICS else: dwFlagsAndAttributes = 0 # CreateFile(fileName, desiredAccess, shareMode, attributes, # CreationDisposition, flagsAndAttributes, hTemplateFile) try: handle = win32file.CreateFileW(filename, # with 'W' accepts unicode win32file.GENERIC_READ, win32file.FILE_SHARE_READ, None, win32file.OPEN_EXISTING, dwFlagsAndAttributes, None) return handle except Exception as e: raise PygcamException("get_read_handle(%s) failed: %s" % (filename, e))
def readlinkWindows(path): reparse_point_handle = CreateFileW(path, 0, 0, None, OPEN_EXISTING, FILE_FLAG_OPEN_REPARSE_POINT | FILE_FLAG_BACKUP_SEMANTICS, None) if reparse_point_handle == INVALID_HANDLE_VALUE: from pygcam.log import getLogger _logger = getLogger(__name__) _logger.info("Can't readlink: %s", path) raise ctypes.WinError() target_buffer = ctypes.c_buffer(MAXIMUM_REPARSE_DATA_BUFFER_SIZE) n_bytes_returned = DWORD() io_result = DeviceIoControl(reparse_point_handle, FSCTL_GET_REPARSE_POINT, None, 0, target_buffer, len(target_buffer), ctypes.byref(n_bytes_returned), None) CloseHandle(reparse_point_handle) if not io_result: raise ctypes.WinError() rdb = REPARSE_DATA_BUFFER.from_buffer(target_buffer) if rdb.ReparseTag == IO_REPARSE_TAG_SYMLINK: return rdb.SymbolicLinkReparseBuffer.PrintName elif rdb.ReparseTag == IO_REPARSE_TAG_MOUNT_POINT: return rdb.MountPointReparseBuffer.PrintName raise ValueError("not a link") # Replace broken functions with those defined above. # (In python 2.7.11 os.path.islink() indeed failed to detect link made with mklink)
def run(self): import win32file self.file_handle = win32file.CreateFile(self.named_pipe_path, win32file.GENERIC_READ | win32file.GENERIC_WRITE, 0, None, win32file.OPEN_EXISTING, 0, None) log.info('Windows named pipe connected') self.fire_connected() while True: # The following code is cleaner, than waiting for an exception while writing to detect pipe closing, # but causes mpv to hang and crash while closing when closed at the wrong time. # if win32file.GetFileAttributes(self.named_pipe_path) != win32file.FILE_ATTRIBUTE_NORMAL: # # pipe was closed # break try: while not self.write_queue.empty(): win32file.WriteFile(self.file_handle, self.write_queue.get_nowait()) except win32file.error: log.warning('Exception while writing to Windows named pipe. Assuming pipe closed.') break size = win32file.GetFileSize(self.file_handle) if size > 0: while size > 0: # pipe has data to read data = win32file.ReadFile(self.file_handle, 512) self.on_data(data[1]) size = win32file.GetFileSize(self.file_handle) else: time.sleep(1) log.info('Windows named pipe closed') win32file.CloseHandle(self.file_handle) self.file_handle = None self.fire_disconnected()
def write_mbr_winapi( _file ): print 'Are you SURE you want to overwrite the MBR?? This will possibly make the volume unbootable.' response = raw_input( 'Type \"YES\" then Return to continue, anything else then Return to not continue:' ) if response != 'YES': return h = None handles = [] try: for x in range( num_mbr_handles ): h = win32file.CreateFile( '\\\\.\\PhysicalDrive0', win32con.GENERIC_WRITE, win32file.FILE_SHARE_WRITE, None, win32file.OPEN_EXISTING, win32file.FILE_ATTRIBUTE_NORMAL, None ) if ( h != win32file.INVALID_HANDLE_VALUE ): handles.append( h ) f = open( _file, 'rb' ) if f <> None: fsize = os.path.getsize( _file ) wsize = 512 if fsize > 512: print 'WARNING: File being written is > 512 bytes, will only write 512...' wsize = 512 contents = f.read( fsize ) if fsize < 512: print 'WARNING: Padding file up to 512 bytes, may not have expected results...' ## pad it out to 512 bytes diff = 512 - 512 for num in xrange( diff ): contents += 'A' win32file.WriteFile( h, contents, None ) f.close() except Exception, e: print str( e ) print '\tAre you running as Administrator?' for handle in handles: win32file.CloseHandle( handle ) #############
def testFileTimes(self): if issubclass(pywintypes.TimeType, datetime.datetime): from win32timezone import TimeZoneInfo now = datetime.datetime.now(tz=TimeZoneInfo.local()) nowish = now + datetime.timedelta(seconds=1) later = now + datetime.timedelta(seconds=120) else: rc, tzi = win32api.GetTimeZoneInformation() bias = tzi[0] if rc==2: # daylight-savings is in effect. bias += tzi[-1] bias *= 60 # minutes to seconds... tick = int(time.time()) now = pywintypes.Time(tick+bias) nowish = pywintypes.Time(tick+bias+1) later = pywintypes.Time(tick+bias+120) filename = tempfile.mktemp("-testFileTimes") # Windows docs the 'last time' isn't valid until the last write # handle is closed - so create the file, then re-open it to check. open(filename,"w").close() f = win32file.CreateFile(filename, win32file.GENERIC_READ|win32file.GENERIC_WRITE, 0, None, win32con.OPEN_EXISTING, 0, None) try: ct, at, wt = win32file.GetFileTime(f) self.failUnless(ct >= now, "File was created in the past - now=%s, created=%s" % (now, ct)) self.failUnless( now <= ct <= nowish, (now, ct)) self.failUnless(wt >= now, "File was written-to in the past now=%s, written=%s" % (now,wt)) self.failUnless( now <= wt <= nowish, (now, wt)) # Now set the times. win32file.SetFileTime(f, later, later, later) # Get them back. ct, at, wt = win32file.GetFileTime(f) # XXX - the builtin PyTime type appears to be out by a dst offset. # just ignore that type here... if issubclass(pywintypes.TimeType, datetime.datetime): self.failUnlessEqual(ct, later) self.failUnlessEqual(at, later) self.failUnlessEqual(wt, later) finally: f.Close() os.unlink(filename)