我们从Python开源项目中,提取了以下13个代码示例,用于说明如何使用win32con.FILE_FLAG_OVERLAPPED。
def testTransactNamedPipeBlocking(self): event = threading.Event() self.startPipeServer(event) open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE hpipe = win32file.CreateFile(self.pipename, open_mode, 0, # no sharing None, # default security win32con.OPEN_EXISTING, 0, # win32con.FILE_FLAG_OVERLAPPED, None) # set to message mode. win32pipe.SetNamedPipeHandleState( hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None) hr, got = win32pipe.TransactNamedPipe(hpipe, str2bytes("foo\0bar"), 1024, None) self.failUnlessEqual(got, str2bytes("bar\0foo")) event.wait(5) self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
def testTransactNamedPipeBlockingBuffer(self): # Like testTransactNamedPipeBlocking, but a pre-allocated buffer is # passed (not really that useful, but it exercises the code path) event = threading.Event() self.startPipeServer(event) open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE hpipe = win32file.CreateFile(self.pipename, open_mode, 0, # no sharing None, # default security win32con.OPEN_EXISTING, 0, # win32con.FILE_FLAG_OVERLAPPED, None) # set to message mode. win32pipe.SetNamedPipeHandleState( hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None) buffer = win32file.AllocateReadBuffer(1024) hr, got = win32pipe.TransactNamedPipe(hpipe, str2bytes("foo\0bar"), buffer, None) self.failUnlessEqual(got, str2bytes("bar\0foo")) event.wait(5) self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
def setUp(self): self.watcher_threads = [] self.watcher_thread_changes = [] self.dir_names = [] self.dir_handles = [] for i in range(self.num_test_dirs): td = tempfile.mktemp("-test-directory-changes-%d" % i) os.mkdir(td) self.dir_names.append(td) hdir = win32file.CreateFile(td, ntsecuritycon.FILE_LIST_DIRECTORY, win32con.FILE_SHARE_READ, None, # security desc win32con.OPEN_EXISTING, win32con.FILE_FLAG_BACKUP_SEMANTICS | win32con.FILE_FLAG_OVERLAPPED, None) self.dir_handles.append(hdir) changes = [] t = threading.Thread(target=self._watcherThreadOverlapped, args=(td, hdir, changes)) t.start() self.watcher_threads.append(t) self.watcher_thread_changes.append(changes)
def __init__(self): self.pty = Pty() self.ready_f = Future() self._input_ready_callbacks = [] self.loop = get_event_loop() self.stdout_handle = win32file.CreateFile( self.pty.conout_name(), win32con.GENERIC_READ, 0, win32security.SECURITY_ATTRIBUTES(), win32con.OPEN_EXISTING, win32con.FILE_FLAG_OVERLAPPED, 0) self.stdin_handle = win32file.CreateFile( self.pty.conin_name(), win32con.GENERIC_WRITE, 0, win32security.SECURITY_ATTRIBUTES(), win32con.OPEN_EXISTING, win32con.FILE_FLAG_OVERLAPPED, 0) self._buffer = []
def testTransactNamedPipeAsync(self): event = threading.Event() overlapped = pywintypes.OVERLAPPED() overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None) self.startPipeServer(event, 0.5) open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE hpipe = win32file.CreateFile(self.pipename, open_mode, 0, # no sharing None, # default security win32con.OPEN_EXISTING, win32con.FILE_FLAG_OVERLAPPED, None) # set to message mode. win32pipe.SetNamedPipeHandleState( hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None) buffer = win32file.AllocateReadBuffer(1024) hr, got = win32pipe.TransactNamedPipe(hpipe, str2bytes("foo\0bar"), buffer, overlapped) self.failUnlessEqual(hr, winerror.ERROR_IO_PENDING) nbytes = win32file.GetOverlappedResult(hpipe, overlapped, True) got = buffer[:nbytes] self.failUnlessEqual(got, str2bytes("bar\0foo")) event.wait(5) self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
def testCompletionPortsNonQueued(self, test_overlapped_death = 0): # In 204 we had a reference count bug when OVERLAPPED objects were # associated with a completion port other than via # PostQueuedCompletionStatus. This test is based on the reproduction # reported with that bug. # Create the pipe. BUFSIZE = 512 pipe_name = r"\\.\pipe\pywin32_test_pipe" handle = win32pipe.CreateNamedPipe(pipe_name, win32pipe.PIPE_ACCESS_DUPLEX| win32file.FILE_FLAG_OVERLAPPED, win32pipe.PIPE_TYPE_MESSAGE| win32pipe.PIPE_READMODE_MESSAGE| win32pipe.PIPE_WAIT, 1, BUFSIZE, BUFSIZE, win32pipe.NMPWAIT_WAIT_FOREVER, None) # Create an IOCP and associate it with the handle. port = win32file.CreateIoCompletionPort(-1, 0, 0, 0) win32file.CreateIoCompletionPort(handle, port, 1, 0) t = threading.Thread(target=self._IOCPServerThread, args=(handle,port, test_overlapped_death)) t.setDaemon(True) # avoid hanging entire test suite on failure. t.start() try: time.sleep(0.1) # let thread do its thing. try: win32pipe.CallNamedPipe(r"\\.\pipe\pywin32_test_pipe", str2bytes("Hello there"), BUFSIZE, 0) except win32pipe.error: # Testing for overlapped death causes this if not test_overlapped_death: raise finally: if not test_overlapped_death: handle.Close() t.join(3) self.failIf(t.isAlive(), "thread didn't finish")
def HttpExtensionProc(self, ecb): # NOTE: If you use a ThreadPoolExtension, you must still perform # this check in HttpExtensionProc - raising the exception from # The "Dispatch" method will just cause the exception to be # rendered to the browser. if self.reload_watcher.change_detected: print "Doing reload" raise InternalReloadException if ecb.GetServerVariable("UNICODE_URL").endswith("test.py"): file_flags = win32con.FILE_FLAG_SEQUENTIAL_SCAN | win32con.FILE_FLAG_OVERLAPPED hfile = win32file.CreateFile(__file__, win32con.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, file_flags, None) flags = isapicon.HSE_IO_ASYNC | isapicon.HSE_IO_DISCONNECT_AFTER_SEND | \ isapicon.HSE_IO_SEND_HEADERS # We pass hFile to the callback simply as a way of keeping it alive # for the duration of the transmission try: ecb.TransmitFile(TransmitFileCallback, hfile, int(hfile), "200 OK", 0, 0, None, None, flags) except: # Errors keep this source file open! hfile.Close() raise else: # default response ecb.SendResponseHeaders("200 OK", "Content-Type: text/html\r\n\r\n", 0) print >> ecb, "<HTML><BODY>" print >> ecb, "The root of this site is at", ecb.MapURLToPath("/") print >> ecb, "</BODY></HTML>" ecb.close() return isapicon.HSE_STATUS_SUCCESS
def HttpExtensionProc(self, ecb): # NOTE: If you use a ThreadPoolExtension, you must still perform # this check in HttpExtensionProc - raising the exception from # The "Dispatch" method will just cause the exception to be # rendered to the browser. if self.reload_watcher.change_detected: print("Doing reload") raise InternalReloadException if ecb.GetServerVariable("UNICODE_URL").endswith("test.py"): file_flags = win32con.FILE_FLAG_SEQUENTIAL_SCAN | win32con.FILE_FLAG_OVERLAPPED hfile = win32file.CreateFile(__file__, win32con.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, file_flags, None) flags = isapicon.HSE_IO_ASYNC | isapicon.HSE_IO_DISCONNECT_AFTER_SEND | \ isapicon.HSE_IO_SEND_HEADERS # We pass hFile to the callback simply as a way of keeping it alive # for the duration of the transmission try: ecb.TransmitFile(TransmitFileCallback, hfile, int(hfile), "200 OK", 0, 0, None, None, flags) except: # Errors keep this source file open! hfile.Close() raise else: # default response ecb.SendResponseHeaders("200 OK", "Content-Type: text/html\r\n\r\n", 0) print("<HTML><BODY>", file=ecb) print("The root of this site is at", ecb.MapURLToPath("/"), file=ecb) print("</BODY></HTML>", file=ecb) ecb.close() return isapicon.HSE_STATUS_SUCCESS
def TestDeviceNotifications(dir_names): wc = win32gui.WNDCLASS() wc.lpszClassName = 'test_devicenotify' wc.style = win32con.CS_GLOBALCLASS|win32con.CS_VREDRAW | win32con.CS_HREDRAW wc.hbrBackground = win32con.COLOR_WINDOW+1 wc.lpfnWndProc={win32con.WM_DEVICECHANGE:OnDeviceChange} class_atom=win32gui.RegisterClass(wc) hwnd = win32gui.CreateWindow(wc.lpszClassName, 'Testing some devices', # no need for it to be visible. win32con.WS_CAPTION, 100,100,900,900, 0, 0, 0, None) hdevs = [] # Watch for all USB device notifications filter = win32gui_struct.PackDEV_BROADCAST_DEVICEINTERFACE( GUID_DEVINTERFACE_USB_DEVICE) hdev = win32gui.RegisterDeviceNotification(hwnd, filter, win32con.DEVICE_NOTIFY_WINDOW_HANDLE) hdevs.append(hdev) # and create handles for all specified directories for d in dir_names: hdir = win32file.CreateFile(d, winnt.FILE_LIST_DIRECTORY, winnt.FILE_SHARE_READ | winnt.FILE_SHARE_WRITE | winnt.FILE_SHARE_DELETE, None, # security attributes win32con.OPEN_EXISTING, win32con.FILE_FLAG_BACKUP_SEMANTICS | # required privileges: SE_BACKUP_NAME and SE_RESTORE_NAME. win32con.FILE_FLAG_OVERLAPPED, None) filter = win32gui_struct.PackDEV_BROADCAST_HANDLE(hdir) hdev = win32gui.RegisterDeviceNotification(hwnd, filter, win32con.DEVICE_NOTIFY_WINDOW_HANDLE) hdevs.append(hdev) # now start a message pump and wait for messages to be delivered. print("Watching", len(hdevs), "handles - press Ctrl+C to terminate, or") print("add and remove some USB devices...") if not dir_names: print("(Note you can also pass paths to watch on the command-line - eg,") print("pass the root of an inserted USB stick to see events specific to") print("that volume)") while 1: win32gui.PumpWaitingMessages() time.sleep(0.01) win32gui.DestroyWindow(hwnd) win32gui.UnregisterClass(wc.lpszClassName, None)
def TestDeviceNotifications(dir_names): wc = win32gui.WNDCLASS() wc.lpszClassName = 'test_devicenotify' wc.style = win32con.CS_GLOBALCLASS|win32con.CS_VREDRAW | win32con.CS_HREDRAW wc.hbrBackground = win32con.COLOR_WINDOW+1 wc.lpfnWndProc={win32con.WM_DEVICECHANGE:OnDeviceChange} class_atom=win32gui.RegisterClass(wc) hwnd = win32gui.CreateWindow(wc.lpszClassName, 'Testing some devices', # no need for it to be visible. win32con.WS_CAPTION, 100,100,900,900, 0, 0, 0, None) hdevs = [] # Watch for all USB device notifications filter = win32gui_struct.PackDEV_BROADCAST_DEVICEINTERFACE( GUID_DEVINTERFACE_USB_DEVICE) hdev = win32gui.RegisterDeviceNotification(hwnd, filter, win32con.DEVICE_NOTIFY_WINDOW_HANDLE) hdevs.append(hdev) # and create handles for all specified directories for d in dir_names: hdir = win32file.CreateFile(d, winnt.FILE_LIST_DIRECTORY, winnt.FILE_SHARE_READ | winnt.FILE_SHARE_WRITE | winnt.FILE_SHARE_DELETE, None, # security attributes win32con.OPEN_EXISTING, win32con.FILE_FLAG_BACKUP_SEMANTICS | # required privileges: SE_BACKUP_NAME and SE_RESTORE_NAME. win32con.FILE_FLAG_OVERLAPPED, None) filter = win32gui_struct.PackDEV_BROADCAST_HANDLE(hdir) hdev = win32gui.RegisterDeviceNotification(hwnd, filter, win32con.DEVICE_NOTIFY_WINDOW_HANDLE) hdevs.append(hdev) # now start a message pump and wait for messages to be delivered. print "Watching", len(hdevs), "handles - press Ctrl+C to terminate, or" print "add and remove some USB devices..." if not dir_names: print "(Note you can also pass paths to watch on the command-line - eg," print "pass the root of an inserted USB stick to see events specific to" print "that volume)" while 1: win32gui.PumpWaitingMessages() time.sleep(0.01) win32gui.DestroyWindow(hwnd) win32gui.UnregisterClass(wc.lpszClassName, None)