我们从Python开源项目中,提取了以下9个代码示例,用于说明如何使用win32event.ResetEvent()。
def serialReadEvent(self):
    """Handle completion of the pending overlapped serial read.

    Collects the data delivered by the read that was armed previously,
    drains whatever else the driver reports as already queued, passes the
    combined data to ``self.protocol.dataReceived`` and then arms the next
    one-byte overlapped read.
    """
    # Fetch the byte count of the read armed earlier (final argument 0:
    # do not wait for completion).
    n = win32file.GetOverlappedResult(self._serial.hComPort, self._overlappedRead, 0)
    if n:
        # bytes() instead of str(): on Python 2 the two are the same type,
        # while on Python 3 str() of a buffer slice produces a repr-style
        # string rather than the received payload.
        first = bytes(self.read_buf[:n])
        # Ask the driver how many more bytes are already waiting.
        _flags, comstat = win32file.ClearCommError(self._serial.hComPort)
        if comstat.cbInQue:
            win32event.ResetEvent(self._overlappedRead.hEvent)
            _rc, buf = win32file.ReadFile(
                self._serial.hComPort,
                win32file.AllocateReadBuffer(comstat.cbInQue),
                self._overlappedRead,
            )
            # Final argument 1: wait until this read has completed.
            n = win32file.GetOverlappedResult(self._serial.hComPort, self._overlappedRead, 1)
            # Deliver the first chunk together with the drained remainder.
            self.protocol.dataReceived(first + bytes(buf[:n]))
        else:
            # Nothing else queued: deliver just the first chunk.
            self.protocol.dataReceived(first)

    # Set up the next read, whether or not any data was delivered.
    win32event.ResetEvent(self._overlappedRead.hEvent)
    _rc, self.read_buf = win32file.ReadFile(
        self._serial.hComPort,
        win32file.AllocateReadBuffer(1),
        self._overlappedRead,
    )
def Connect(entryName, bUseCallback):
    """Dial the RAS phone-book entry *entryName*.

    When *bUseCallback* is true, ``Callback`` is passed to the dial call and
    the function waits up to 60 seconds on ``callbackEvent``; otherwise the
    dial result code is checked directly.

    Returns a ``(hras, rc)`` tuple.  ``hras`` is the handle from
    ``win32ras.Dial`` or, if the connection was hung up here, whatever
    ``HangUp`` returned.  If the entry's dial parameters cannot be fetched,
    ``(None, None)`` is returned (previously this path raised
    ``UnboundLocalError`` at the final ``return``).
    """
    if bUseCallback:
        theCallback = Callback
        # Clear any earlier signal so the wait below reflects this dial only.
        win32event.ResetEvent(callbackEvent)
    else:
        theCallback = None

    # Values reported when the entry lookup fails before any dialing happens.
    hras, rc = None, None

    # In order to *use* the username/password of a particular DUN entry, one
    # must explicitly get those params under win95.
    try:
        dp, _ = win32ras.GetEntryDialParams(None, entryName)
    except Exception:
        # Parenthesised single-argument form prints the same text on
        # Python 2 and Python 3.
        print("Couldn't find DUN entry: %s" % entryName)
        return hras, rc

    hras, rc = win32ras.Dial(None, None, (entryName, "", "", dp[3], dp[4], ""), theCallback)
    if not bUseCallback and rc != 0:
        print("Could not dial the RAS connection: %s" % win32ras.GetErrorString(rc))
        hras = HangUp(hras)
    # Don't wait here if there's no need to.
    elif bUseCallback and win32event.WaitForSingleObject(callbackEvent, 60000) != win32event.WAIT_OBJECT_0:
        print("Gave up waiting for the process to complete!")
        # SDK docs state one must explicitly hang up, even if there's an error.
        # NOTE(review): nesting of this status check under the timeout branch
        # is reconstructed from a whitespace-collapsed listing - confirm.
        try:
            cs = win32ras.GetConnectStatus(hras)
        except Exception:
            # On error, attempt a hang up anyway.
            hras = HangUp(hras)
        else:
            if int(cs[0]) == win32ras.RASCS_Disconnected:
                hras = HangUp(hras)
    return hras, rc
def testResetEvent(self):
    """ResetEvent un-signals an event, returns None, and rejects a closed handle."""
    # CreateEvent(security, manual_reset=True, initial_state=True, name=None):
    # the event starts out signaled.
    event = win32event.CreateEvent(None, True, True, None)
    self.assertSignaled(event)
    res = win32event.ResetEvent(event)
    # assertIsNone replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertIsNone(res)
    self.assertNotSignaled(event)
    event.close()
    # Resetting through the closed handle must raise.
    self.assertRaises(pywintypes.error, win32event.ResetEvent, event)
def Connect(entryName, bUseCallback):
    """Dial the RAS phone-book entry *entryName*.

    When *bUseCallback* is true, ``Callback`` is passed to the dial call and
    the function waits up to 60 seconds on ``callbackEvent``; otherwise the
    dial result code is checked directly.

    Returns a ``(hras, rc)`` tuple.  ``hras`` is the handle from
    ``win32ras.Dial`` or, if the connection was hung up here, whatever
    ``HangUp`` returned.  If the entry's dial parameters cannot be fetched,
    ``(None, None)`` is returned (previously this path raised
    ``UnboundLocalError`` at the final ``return``).
    """
    if bUseCallback:
        theCallback = Callback
        # Clear any earlier signal so the wait below reflects this dial only.
        win32event.ResetEvent(callbackEvent)
    else:
        theCallback = None

    # Values reported when the entry lookup fails before any dialing happens.
    hras, rc = None, None

    # In order to *use* the username/password of a particular DUN entry, one
    # must explicitly get those params under win95.
    try:
        dp, _ = win32ras.GetEntryDialParams(None, entryName)
    except Exception:
        print("Couldn't find DUN entry: %s" % entryName)
        return hras, rc

    hras, rc = win32ras.Dial(None, None, (entryName, "", "", dp[3], dp[4], ""), theCallback)
    if not bUseCallback and rc != 0:
        print("Could not dial the RAS connection:", win32ras.GetErrorString(rc))
        hras = HangUp(hras)
    # Don't wait here if there's no need to.
    elif bUseCallback and win32event.WaitForSingleObject(callbackEvent, 60000) != win32event.WAIT_OBJECT_0:
        print("Gave up waiting for the process to complete!")
        # SDK docs state one must explicitly hang up, even if there's an error.
        # NOTE(review): nesting of this status check under the timeout branch
        # is reconstructed from a whitespace-collapsed listing - confirm.
        try:
            cs = win32ras.GetConnectStatus(hras)
        except Exception:
            # On error, attempt a hang up anyway.
            hras = HangUp(hras)
        else:
            if int(cs[0]) == win32ras.RASCS_Disconnected:
                hras = HangUp(hras)
    return hras, rc
def __init__(self, tagName='trader_msg', size = 1024*1024):
    """Attach to the tagged memory mapping and open the two named events.

    tagName -- tag name handed to ``mmap.mmap`` for the anonymous mapping.
    size    -- length of the mapping in bytes.

    Both events ("aauto_trigger" and "python_trigger") are opened, not
    created, and are reset before the constructor returns.
    """
    self.fm = mmap.mmap(-1, size, tagname=tagName, access=mmap.ACCESS_WRITE)

    full_access = win32event.EVENT_ALL_ACCESS
    self.eventAu = win32event.OpenEvent(full_access, 0, "aauto_trigger")
    self.eventPy = win32event.OpenEvent(full_access, 0, "python_trigger")

    # 0xFFFFFFFF is the Win32 INFINITE timeout value; presumably used as a
    # wait timeout elsewhere in the class - verify against callers.
    self.timeOut = 0xFFFFFFFF

    # Start with both events non-signaled, in the same order as before.
    for handle in (self.eventAu, self.eventPy):
        win32event.ResetEvent(handle)
def sendCmd(self, cmd_id, arg=None):
    """Write a command into ``self.fm`` and signal ``self.eventAu``.

    cmd_id -- integer packed with struct format 'i' at offset 0.
    arg    -- optional payload written directly after the command id;
              skipped when falsy.
    """
    buf = self.fm
    # Rewind, then write the packed command id at the start of the buffer.
    buf.seek(0)
    buf.write(struct.pack('i', cmd_id))

    # Append the payload, if one was supplied.
    if arg:
        buf.write(arg)

    # Reset the Python-side event before signaling the other side.
    win32event.ResetEvent(self.eventPy)
    win32event.SetEvent(self.eventAu)