我们从Python开源项目中,提取了以下40个代码示例,用于说明如何使用win32file.WriteFile()。
def SimpleFileDemo(): testName = os.path.join( win32api.GetTempPath(), "win32file_demo_test_file") if os.path.exists(testName): os.unlink(testName) # Open the file for writing. handle = win32file.CreateFile(testName, win32file.GENERIC_WRITE, 0, None, win32con.CREATE_NEW, 0, None) test_data = "Hello\0there".encode("ascii") win32file.WriteFile(handle, test_data) handle.Close() # Open it for reading. handle = win32file.CreateFile(testName, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None) rc, data = win32file.ReadFile(handle, 1024) handle.Close() if data == test_data: print "Successfully wrote and read a file" else: raise Exception("Got different data back???") os.unlink(testName)
def testSimpleFiles(self): fd, filename = tempfile.mkstemp() os.close(fd) os.unlink(filename) handle = win32file.CreateFile(filename, win32file.GENERIC_WRITE, 0, None, win32con.CREATE_NEW, 0, None) test_data = str2bytes("Hello\0there") try: win32file.WriteFile(handle, test_data) handle.Close() # Try and open for read handle = win32file.CreateFile(filename, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None) rc, data = win32file.ReadFile(handle, 1024) self.assertEquals(data, test_data) finally: handle.Close() try: os.unlink(filename) except os.error: pass # A simple test using normal read/write operations.
def _IOCPServerThread(self, handle, port, drop_overlapped_reference): overlapped = pywintypes.OVERLAPPED() win32pipe.ConnectNamedPipe(handle, overlapped) if drop_overlapped_reference: # Be naughty - the overlapped object is now dead, but # GetQueuedCompletionStatus will still find it. Our check of # reference counting should catch that error. overlapped = None # even if we fail, be sure to close the handle; prevents hangs # on Vista 64... try: self.failUnlessRaises(RuntimeError, win32file.GetQueuedCompletionStatus, port, -1) finally: handle.Close() return result = win32file.GetQueuedCompletionStatus(port, -1) ol2 = result[-1] self.failUnless(ol2 is overlapped) data = win32file.ReadFile(handle, 512)[1] win32file.WriteFile(handle, data)
def SimpleFileDemo(): testName = os.path.join( win32api.GetTempPath(), "win32file_demo_test_file") if os.path.exists(testName): os.unlink(testName) # Open the file for writing. handle = win32file.CreateFile(testName, win32file.GENERIC_WRITE, 0, None, win32con.CREATE_NEW, 0, None) test_data = "Hello\0there".encode("ascii") win32file.WriteFile(handle, test_data) handle.Close() # Open it for reading. handle = win32file.CreateFile(testName, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None) rc, data = win32file.ReadFile(handle, 1024) handle.Close() if data == test_data: print("Successfully wrote and read a file") else: raise Exception("Got different data back???") os.unlink(testName)
def WriteFile(handle, data, ol = None): c_written = DWORD() success = ctypes.windll.kernel32.WriteFile(handle, ctypes.create_string_buffer(encode(data)), len(data), ctypes.byref(c_written), ol) return ctypes.windll.kernel32.GetLastError(), c_written.value
def send(self, input): if not self.stdin: return None try: x = msvcrt.get_osfhandle(self.stdin.fileno()) (errCode, written) = WriteFile(x, input) except ValueError: return self._close('stdin') except (subprocess.pywintypes.error, Exception): if geterror()[0] in (109, errno.ESHUTDOWN): return self._close('stdin') raise return written
def checkWork(self): numBytesWritten = 0 if not self.outQueue: if self.disconnecting: self.writeConnectionLost() return 0 try: win32file.WriteFile(self.writePipe, '', None) except pywintypes.error: self.writeConnectionLost() return numBytesWritten while self.outQueue: data = self.outQueue.pop(0) errCode = 0 try: errCode, nBytesWritten = win32file.WriteFile(self.writePipe, data, None) except win32api.error: self.writeConnectionLost() break else: # assert not errCode, "wtf an error code???" numBytesWritten += nBytesWritten if len(data) > nBytesWritten: self.outQueue.insert(0, data[nBytesWritten:]) break else: resumed = self.bufferEmpty() if not resumed and self.disconnecting: self.writeConnectionLost() return numBytesWritten
def write(self, data): if data: if self.writeInProgress: self.outQueue.append(data) else: self.writeInProgress = 1 win32file.WriteFile(self._serial.hComPort, data, self._overlappedWrite)
def serialWriteEvent(self): try: dataToWrite = self.outQueue.pop(0) except IndexError: self.writeInProgress = 0 return else: win32file.WriteFile(self._serial.hComPort, dataToWrite, self._overlappedWrite)
def WriteFile(handle, s): return _WriteFile(handle, s.encode('ascii'))
def send(self, input): if not self.stdin: return None try: x = msvcrt.get_osfhandle(self.stdin.fileno()) (errCode, written) = WriteFile(x, input) except ValueError: return self._close('stdin') except (subprocess.pywintypes.error, Exception), why: if why[0] in (109, errno.ESHUTDOWN): return self._close('stdin') raise return written
def send(self, string, flags=0): err, nbytes = win32file.WriteFile(self._handle, string) return nbytes
def _serverThread(self, pipe_handle, event, wait_time): # just do one connection and terminate. hr = win32pipe.ConnectNamedPipe(pipe_handle) self.failUnless(hr in (0, winerror.ERROR_PIPE_CONNECTED), "Got error code 0x%x" % (hr,)) hr, got = win32file.ReadFile(pipe_handle, 100) self.failUnlessEqual(got, str2bytes("foo\0bar")) time.sleep(wait_time) win32file.WriteFile(pipe_handle, str2bytes("bar\0foo")) pipe_handle.Close() event.set()
def testMoreFiles(self): # Create a file in the %TEMP% directory. testName = os.path.join( win32api.GetTempPath(), "win32filetest.dat" ) desiredAccess = win32file.GENERIC_READ | win32file.GENERIC_WRITE # Set a flag to delete the file automatically when it is closed. fileFlags = win32file.FILE_FLAG_DELETE_ON_CLOSE h = win32file.CreateFile( testName, desiredAccess, win32file.FILE_SHARE_READ, None, win32file.CREATE_ALWAYS, fileFlags, 0) # Write a known number of bytes to the file. data = str2bytes("z") * 1025 win32file.WriteFile(h, data) self.failUnless(win32file.GetFileSize(h) == len(data), "WARNING: Written file does not have the same size as the length of the data in it!") # Ensure we can read the data back. win32file.SetFilePointer(h, 0, win32file.FILE_BEGIN) hr, read_data = win32file.ReadFile(h, len(data)+10) # + 10 to get anything extra self.failUnless(hr==0, "Readfile returned %d" % hr) self.failUnless(read_data == data, "Read data is not what we wrote!") # Now truncate the file at 1/2 its existing size. newSize = len(data)//2 win32file.SetFilePointer(h, newSize, win32file.FILE_BEGIN) win32file.SetEndOfFile(h) self.failUnlessEqual(win32file.GetFileSize(h), newSize) # GetFileAttributesEx/GetFileAttributesExW tests. self.failUnlessEqual(win32file.GetFileAttributesEx(testName), win32file.GetFileAttributesExW(testName)) attr, ct, at, wt, size = win32file.GetFileAttributesEx(testName) self.failUnless(size==newSize, "Expected GetFileAttributesEx to return the same size as GetFileSize()") self.failUnless(attr==win32file.GetFileAttributes(testName), "Expected GetFileAttributesEx to return the same attributes as GetFileAttributes") h = None # Close the file by removing the last reference to the handle! self.failUnless(not os.path.isfile(testName), "After closing the file, it still exists!")
def testFilePointer(self): # via [ 979270 ] SetFilePointer fails with negative offset # Create a file in the %TEMP% directory. filename = os.path.join( win32api.GetTempPath(), "win32filetest.dat" ) f = win32file.CreateFile(filename, win32file.GENERIC_READ|win32file.GENERIC_WRITE, 0, None, win32file.CREATE_ALWAYS, win32file.FILE_ATTRIBUTE_NORMAL, 0) try: #Write some data data = str2bytes('Some data') (res, written) = win32file.WriteFile(f, data) self.failIf(res) self.assertEqual(written, len(data)) #Move at the beginning and read the data win32file.SetFilePointer(f, 0, win32file.FILE_BEGIN) (res, s) = win32file.ReadFile(f, len(data)) self.failIf(res) self.assertEqual(s, data) #Move at the end and read the data win32file.SetFilePointer(f, -len(data), win32file.FILE_END) (res, s) = win32file.ReadFile(f, len(data)) self.failIf(res) self.failUnlessEqual(s, data) finally: f.Close() os.unlink(filename)
def checkWork(self): numBytesWritten = 0 if not self.outQueue: if self.disconnecting: self.writeConnectionLost() return 0 try: win32file.WriteFile(self.writePipe, b'', None) except pywintypes.error: self.writeConnectionLost() return numBytesWritten while self.outQueue: data = self.outQueue.pop(0) errCode = 0 try: errCode, nBytesWritten = win32file.WriteFile(self.writePipe, data, None) except win32api.error: self.writeConnectionLost() break else: # assert not errCode, "wtf an error code???" numBytesWritten += nBytesWritten if len(data) > nBytesWritten: self.outQueue.insert(0, data[nBytesWritten:]) break else: resumed = self.bufferEmpty() if not resumed and self.disconnecting: self.writeConnectionLost() return numBytesWritten
def write(self, offset, data): win32file.SetFilePointer(self.fhandle, offset, 0) # The WinPmem driver returns bytes_written == 0 always. This is probably # a bug in its write routine, so we ignore it here. If the operation was # successful we assume all bytes were written. err, _bytes_written = win32file.WriteFile(self.fhandle, data) if err == 0: return len(data) return 0
def write_text(self, text): " Write text to the stdin of the process. " win32file.WriteFile(self.stdin_handle, text.encode('utf-8'))
def write(self, string): win32file.WriteFile(self.Fd, string)
def run(self): import win32file self.file_handle = win32file.CreateFile(self.named_pipe_path, win32file.GENERIC_READ | win32file.GENERIC_WRITE, 0, None, win32file.OPEN_EXISTING, 0, None) log.info('Windows named pipe connected') self.fire_connected() while True: # The following code is cleaner, than waiting for an exception while writing to detect pipe closing, # but causes mpv to hang and crash while closing when closed at the wrong time. # if win32file.GetFileAttributes(self.named_pipe_path) != win32file.FILE_ATTRIBUTE_NORMAL: # # pipe was closed # break try: while not self.write_queue.empty(): win32file.WriteFile(self.file_handle, self.write_queue.get_nowait()) except win32file.error: log.warning('Exception while writing to Windows named pipe. Assuming pipe closed.') break size = win32file.GetFileSize(self.file_handle) if size > 0: while size > 0: # pipe has data to read data = win32file.ReadFile(self.file_handle, 512) self.on_data(data[1]) size = win32file.GetFileSize(self.file_handle) else: time.sleep(1) log.info('Windows named pipe closed') win32file.CloseHandle(self.file_handle) self.file_handle = None self.fire_disconnected()
def send(self, input): if not self.stdin: return None try: x = msvcrt.get_osfhandle(self.stdin.fileno()) (errCode, written) = WriteFile(x, input) except ValueError: print("close stdin") return self._close('stdin') except (subprocess.pywintypes.error, Exception) as why: if why[0] in (109, errno.ESHUTDOWN): print("close stdin") return self._close('stdin') raise return written
def write_mbr_winapi( _file ): print 'Are you SURE you want to overwrite the MBR?? This will possibly make the volume unbootable.' response = raw_input( 'Type \"YES\" then Return to continue, anything else then Return to not continue:' ) if response != 'YES': return h = None handles = [] try: for x in range( num_mbr_handles ): h = win32file.CreateFile( '\\\\.\\PhysicalDrive0', win32con.GENERIC_WRITE, win32file.FILE_SHARE_WRITE, None, win32file.OPEN_EXISTING, win32file.FILE_ATTRIBUTE_NORMAL, None ) if ( h != win32file.INVALID_HANDLE_VALUE ): handles.append( h ) f = open( _file, 'rb' ) if f <> None: fsize = os.path.getsize( _file ) wsize = 512 if fsize > 512: print 'WARNING: File being written is > 512 bytes, will only write 512...' wsize = 512 contents = f.read( fsize ) if fsize < 512: print 'WARNING: Padding file up to 512 bytes, may not have expected results...' ## pad it out to 512 bytes diff = 512 - 512 for num in xrange( diff ): contents += 'A' win32file.WriteFile( h, contents, None ) f.close() except Exception, e: print str( e ) print '\tAre you running as Administrator?' for handle in handles: win32file.CloseHandle( handle ) #############
def _stub(cmd_name, stdin_name, stdout_name, stderr_name): """INTERNAL: Stub process that will start up the child process.""" # Open the 4 pipes (command, stdin, stdout, stderr) cmd_pipe = CreateFile(cmd_name, GENERIC_READ|GENERIC_WRITE, 0, None, OPEN_EXISTING, 0, None) SetHandleInformation(cmd_pipe, HANDLE_FLAG_INHERIT, 1) stdin_pipe = CreateFile(stdin_name, GENERIC_READ, 0, None, OPEN_EXISTING, 0, None) SetHandleInformation(stdin_pipe, HANDLE_FLAG_INHERIT, 1) stdout_pipe = CreateFile(stdout_name, GENERIC_WRITE, 0, None, OPEN_EXISTING, 0, None) SetHandleInformation(stdout_pipe, HANDLE_FLAG_INHERIT, 1) stderr_pipe = CreateFile(stderr_name, GENERIC_WRITE, 0, None, OPEN_EXISTING, 0, None) SetHandleInformation(stderr_pipe, HANDLE_FLAG_INHERIT, 1) # Learn what we need to do.. header = _read_header(cmd_pipe) input = _parse_header(header) if 'command' not in input or 'args' not in input: ExitProcess(2) # http://msdn.microsoft.com/en-us/library/ms682499(VS.85).aspx startupinfo = STARTUPINFO() startupinfo.dwFlags |= STARTF_USESTDHANDLES | STARTF_USESHOWWINDOW startupinfo.hStdInput = stdin_pipe startupinfo.hStdOutput = stdout_pipe startupinfo.hStdError = stderr_pipe startupinfo.wShowWindow = SW_HIDE # Grant access so that our parent can open its grandchild. if 'parent_sid' in input: mysid = _get_current_sid() parent = ConvertStringSidToSid(input['parent_sid']) sattrs = _create_security_attributes(mysid, parent, access=PROCESS_ALL_ACCESS) else: sattrs = None try: res = CreateProcess(input['command'], input['args'], sattrs, None, True, CREATE_NEW_CONSOLE, os.environ, os.getcwd(), startupinfo) except WindowsError, e: message = _quote_header(str(e)) WriteFile(cmd_pipe, 'status=error\nmessage=%s\n\n' % message) ExitProcess(3) else: pid = res[2] # Pass back results and exit err, nbytes = WriteFile(cmd_pipe, 'status=ok\npid=%s\n\n' % pid) ExitProcess(0)
def SpoolWorker(srcHandle, destHandle, outFiles, doneEvent): """Thread entry point for implementation of MakeSpyPipe""" try: buffer = win32file.AllocateReadBuffer(SPOOL_BYTES) while 1: try: #print >> SPOOL_ERROR, "Calling ReadFile..."; SPOOL_ERROR.flush() hr, data = win32file.ReadFile(srcHandle, buffer) #print >> SPOOL_ERROR, "ReadFile returned '%s', '%s'" % (str(hr), str(data)); SPOOL_ERROR.flush() if hr != 0: raise Exception("win32file.ReadFile returned %i, '%s'" % (hr, data)) elif len(data) == 0: break except pywintypes.error, e: #print >> SPOOL_ERROR, "ReadFile threw '%s'" % str(e); SPOOL_ERROR.flush() if e.args[0] == winerror.ERROR_BROKEN_PIPE: break else: raise e #print >> SPOOL_ERROR, "Writing to %i file objects..." % len(outFiles); SPOOL_ERROR.flush() for f in outFiles: f.write(data) #print >> SPOOL_ERROR, "Done writing to file objects."; SPOOL_ERROR.flush() #print >> SPOOL_ERROR, "Writing to destination %s" % str(destHandle); SPOOL_ERROR.flush() if destHandle: #print >> SPOOL_ERROR, "Calling WriteFile..."; SPOOL_ERROR.flush() hr, bytes = win32file.WriteFile(destHandle, data) #print >> SPOOL_ERROR, "WriteFile() passed %i bytes and returned %i, %i" % (len(data), hr, bytes); SPOOL_ERROR.flush() if hr != 0 or bytes != len(data): raise Exception("win32file.WriteFile() passed %i bytes and returned %i, %i" % (len(data), hr, bytes)) srcHandle.Close() if doneEvent: win32event.SetEvent(doneEvent) if destHandle: destHandle.Close() except: info = sys.exc_info() SPOOL_ERROR.writelines(apply(traceback.format_exception, info), '') SPOOL_ERROR.flush() del info