我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 win32api.GetCurrentThreadId()。
def DoTestInterpInThread(cookie):
    """Exercise the shared Python.Interpreter object from the calling thread.

    COM is initialized on this thread, the interpreter's IDispatch is fetched
    from the Global Interface Table using ``cookie``, ``TestInterp`` is run
    against it, and both this thread's id and the id of the thread the
    interpreter evaluates code on are printed.

    Args:
        cookie: value handed to ``GetInterfaceFromGlobal`` to look up the
            interpreter object.

    Any failure is reported with ``traceback.print_exc()`` and not re-raised.
    """
    try:
        pythoncom.CoInitialize()
        try:
            myThread = win32api.GetCurrentThreadId()
            GIT = CreateGIT()

            interp = GIT.GetInterfaceFromGlobal(cookie, pythoncom.IID_IDispatch)
            interp = win32com.client.Dispatch(interp)

            TestInterp(interp)
            interp.Exec("import win32api")
            # Single parenthesized argument: prints identically on Python 2 and 3.
            print("The test thread id is %d, Python.Interpreter's thread ID is %d" % (myThread, interp.Eval("win32api.GetCurrentThreadId()")))
        finally:
            # Drop the interpreter reference, then always balance the
            # successful CoInitialize above - even when the test failed.
            interp = None
            pythoncom.CoUninitialize()
    except Exception:
        traceback.print_exc()
def test(fn):
    """Register a Python.Interpreter in the GIT and wait for ``fn``'s events.

    Args:
        fn: callable invoked as ``fn(4, cookie)``; it must return the sequence
            of event handles this function waits on.

    The loop ends once as many event signals have been seen as there are
    events, or when a KeyboardInterrupt arrives.  The GIT registration is
    revoked on every exit path.
    """
    # Single parenthesized argument: prints identically on Python 2 and 3.
    print("The main thread is %d" % (win32api.GetCurrentThreadId()))
    GIT = CreateGIT()
    interp = win32com.client.Dispatch("Python.Interpreter")
    cookie = GIT.RegisterInterfaceInGlobal(interp._oleobj_, pythoncom.IID_IDispatch)

    try:
        events = fn(4, cookie)
        numFinished = 0
        while 1:
            try:
                rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
                if rc >= win32event.WAIT_OBJECT_0 and rc < win32event.WAIT_OBJECT_0 + len(events):
                    numFinished = numFinished + 1
                    if numFinished >= len(events):
                        break
                elif rc == win32event.WAIT_OBJECT_0 + len(events):  # a message
                    # This is critical - whole apartment model demo will hang.
                    pythoncom.PumpWaitingMessages()
                else:  # Timeout
                    print("Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount()))
            except KeyboardInterrupt:
                break
    finally:
        # Revoke even if waiting failed, so the registration never outlives
        # this function.
        GIT.RevokeInterfaceFromGlobal(cookie)
        del interp
        del GIT
def ResetAXDebugging(self):
    """Undo one level of SetupAXDebugging on the debugging thread.

    Calls made from any thread other than ``self.debuggingThread`` are traced
    and ignored.  When no saved context remains in ``self.recursiveData`` the
    debugging state is cleared; otherwise the most recently saved context is
    restored and removed from the list.
    """
    traceenter("ResetAXDebugging", self, "with refcount", len(self.recursiveData))
    if win32api.GetCurrentThreadId() != self.debuggingThread:
        trace("ResetAXDebugging called on other thread")
        return

    if len(self.recursiveData) == 0:
        # Final reset: nothing saved, so forget the debugging thread entirely.
        self.logicalbotframe = None
        self.debuggingThread = None
        self.currentframe = None
        self.debuggingThreadStateHandle = None
        return

    # Saved tuples are (logicalbotframe, stopframe, currentframe,
    # debuggingThreadStateHandle), as pushed by SetupAXDebugging.  The first
    # slot used to be unpacked into a misspelled "logbotframe" attribute,
    # which left logicalbotframe unrestored.
    (self.logicalbotframe, self.stopframe, self.currentframe,
     self.debuggingThreadStateHandle) = self.recursiveData[0]
    self.recursiveData = self.recursiveData[1:]
def HandleOutput(self,message):
    """Queue ``message`` for output and flush or wake the main thread.

    The message is always put on ``self.outputQueue``.  When called on the
    main thread the queue is flushed immediately if the queueing mode is
    ``flags.WQ_LINE`` and the message contains a newline, or if the mode is
    ``flags.WQ_NONE``.  In every other case a WM_USER message is posted to
    the main frame so the idle handler picks the output up.
    """
    self.outputQueue.put(message)

    # Queueing options only apply to calls made on the main thread.
    if win32api.GetCurrentThreadId() == self.mainThreadId:
        if self.writeQueueing == flags.WQ_LINE:
            # Line queueing - a newline forces a flush.
            flush_now = message.rfind('\n') >= 0
        else:
            # No queueing at all - flush straight away.
            flush_now = self.writeQueueing == flags.WQ_NONE
        if flush_now:
            self.QueueFlush()
            return

    # Let our idle handler get it - wake it up
    try:
        main_frame = win32ui.GetMainFrame()
        main_frame.PostMessage(win32con.WM_USER)  # Kick main thread off.
    except win32ui.error:
        # This can happen as the app is shutting down, so we send it to the
        # C++ debugger
        win32api.OutputDebugString(message)

# delegate certain fns to my view.
def DoTestInterpInThread(cookie):
    """Run the interpreter test against the GIT-registered object.

    Initializes COM for the calling thread, retrieves the interpreter's
    IDispatch from the Global Interface Table via ``cookie``, runs
    ``TestInterp`` on it and prints the id of this thread next to the id of
    the thread the interpreter evaluates code on.

    Args:
        cookie: value handed to ``GetInterfaceFromGlobal`` to look up the
            interpreter object.

    Any failure is reported with ``traceback.print_exc()`` and not re-raised.
    """
    try:
        pythoncom.CoInitialize()
        try:
            myThread = win32api.GetCurrentThreadId()
            GIT = CreateGIT()

            interp = GIT.GetInterfaceFromGlobal(cookie, pythoncom.IID_IDispatch)
            interp = win32com.client.Dispatch(interp)

            TestInterp(interp)
            interp.Exec("import win32api")
            print("The test thread id is %d, Python.Interpreter's thread ID is %d" % (myThread, interp.Eval("win32api.GetCurrentThreadId()")))
        finally:
            # Release the interpreter reference first, then balance the
            # successful CoInitialize regardless of how the test ended.
            interp = None
            pythoncom.CoUninitialize()
    except Exception:
        traceback.print_exc()
def test(fn):
    """Register a Python.Interpreter in the GIT and wait for ``fn``'s events.

    Args:
        fn: callable invoked as ``fn(4, cookie)``; it must return the sequence
            of event handles this function waits on.

    Waiting stops once as many event signals have been counted as there are
    events, or on KeyboardInterrupt.  The GIT registration is revoked on
    every exit path.
    """
    print("The main thread is %d" % (win32api.GetCurrentThreadId()))
    GIT = CreateGIT()
    interp = win32com.client.Dispatch("Python.Interpreter")
    cookie = GIT.RegisterInterfaceInGlobal(interp._oleobj_, pythoncom.IID_IDispatch)

    try:
        events = fn(4, cookie)
        numFinished = 0
        while True:
            try:
                rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
                if win32event.WAIT_OBJECT_0 <= rc < win32event.WAIT_OBJECT_0 + len(events):
                    numFinished += 1
                    if numFinished >= len(events):
                        break
                elif rc == win32event.WAIT_OBJECT_0 + len(events):  # a message
                    # This is critical - whole apartment model demo will hang.
                    pythoncom.PumpWaitingMessages()
                else:  # Timeout
                    print("Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount()))
            except KeyboardInterrupt:
                break
    finally:
        # Always undo the registration, even when the wait loop failed.
        GIT.RevokeInterfaceFromGlobal(cookie)
        del interp
        del GIT
def serve(clsids):
    """Serve the COM classes in ``clsids`` until the message pump exits.

    Class factories are registered for ``clsids``, the quit message is
    enabled for the current thread, class objects are resumed and messages
    are pumped.  The factories are revoked when pumping ends - including
    when it ends with an exception - and COM is uninitialized after a
    normal shutdown.

    Args:
        clsids: passed straight through to ``factory.RegisterClassFactories``.
    """
    infos = factory.RegisterClassFactories(clsids)
    try:
        pythoncom.EnableQuitMessage(win32api.GetCurrentThreadId())
        pythoncom.CoResumeClassObjects()

        pythoncom.PumpMessages()
    finally:
        # Never leave the factories registered once we stop serving.
        factory.RevokeClassFactories(infos)

    pythoncom.CoUninitialize()
def _doTestInThread(self, interp):
    """Use ``interp`` from the calling thread with COM initialized.

    Args:
        interp: the interpreter object; when the module-level
            ``freeThreaded`` flag is set it is instead the stream to pass to
            ``CoGetInterfaceAndReleaseStream``.

    COM is uninitialized again on every exit path.
    """
    pythoncom.CoInitialize()
    try:
        if freeThreaded:
            interp = pythoncom.CoGetInterfaceAndReleaseStream(interp, pythoncom.IID_IDispatch)
            # NOTE(review): nesting reconstructed from whitespace-collapsed
            # source - confirm the Dispatch wrap belongs inside this branch.
            interp = win32com.client.Dispatch(interp)

        interp.Exec("import win32api")
    finally:
        pythoncom.CoUninitialize()
def _DoTestMarshal(self, fn, bCoWait = 0):
    """Start the marshalling threads via ``fn`` and wait for them to finish.

    Args:
        fn: callable invoked as ``fn(2)``; must return ``(threads, events)``.
        bCoWait: when true, wait with ``pythoncom.CoWaitForMultipleHandles``
            instead of ``win32event.MsgWaitForMultipleObjects``.

    After the wait loop each thread is joined for up to 2 seconds and the
    test fails if a thread is still alive.
    """
    threads, events = fn(2)
    numFinished = 0
    while 1:
        try:
            if bCoWait:
                rc = pythoncom.CoWaitForMultipleHandles(0, 2000, events)
            else:
                # Specifying "bWaitAll" here will wait for messages *and* all events
                # (which is pretty useless)
                rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
            if rc >= win32event.WAIT_OBJECT_0 and rc < win32event.WAIT_OBJECT_0 + len(events):
                numFinished = numFinished + 1
                if numFinished >= len(events):
                    break
            elif rc == win32event.WAIT_OBJECT_0 + len(events):  # a message
                # This is critical - whole apartment model demo will hang.
                pythoncom.PumpWaitingMessages()
            else:  # Timeout
                # Single parenthesized argument: same output on Python 2 and 3.
                print("Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount()))
        except KeyboardInterrupt:
            break
    for t in threads:
        t.join(2)
        # assertFalse / is_alive replace the removed failIf / isAlive aliases.
        self.assertFalse(t.is_alive(), "thread failed to stop!?")
    threads = None  # threads hold references to args
    # Seems to be a leak here I can't locate :(
    #self.failUnlessEqual(pythoncom._GetInterfaceCount(), 0)
    #self.failUnlessEqual(pythoncom._GetGatewayCount(), 0)
def OnQuit(self):
    """Report which thread handled the OnQuit event and set ``self.event``."""
    thread = win32api.GetCurrentThreadId()
    # Single parenthesized argument: prints identically on Python 2 and 3.
    print("OnQuit event processed on thread %d" % thread)
    win32event.SetEvent(self.event)
def TestExplorerEvents():
    """Drive Internet Explorer and check that its events reach ExplorerEvents.

    Creates the IE object with events, navigates to ``..\\readme.htm``, waits
    (pumping messages) for the event to be set, then quits IE and waits
    again.  Each missing event is reported with a printed message.
    """
    iexplore = win32com.client.DispatchWithEvents(
        "InternetExplorer.Application", ExplorerEvents)

    thread = win32api.GetCurrentThreadId()
    print('TestExplorerEvents created IE object on thread %d' % thread)

    iexplore.Visible = 1
    try:
        iexplore.Navigate(win32api.GetFullPathName('..\\readme.htm'))
    except pythoncom.com_error as details:
        # Formatting into one string keeps the output of the old
        # two-argument print statement on both Python 2 and 3.
        print("Warning - could not open the test HTML file %s" % (details,))

    # Wait for the event to be signalled while pumping messages.
    if not WaitWhileProcessingMessages(iexplore.event):
        print("Document load event FAILED to fire!!!")

    iexplore.Quit()
    #
    # Give IE a chance to shutdown, else it can get upset on fast machines.
    # Note, Quit generates events.  Although this test does NOT catch them
    # it is NECESSARY to pump messages here instead of a sleep so that the Quit
    # happens properly!
    if not WaitWhileProcessingMessages(iexplore.event):
        print("OnQuit event FAILED to fire!!!")

    iexplore = None
def OnDocumentComplete(self, pDisp=pythoncom.Empty, URL=pythoncom.Empty):
    """Report which thread handled OnDocumentComplete and set ``self.event``.

    Args:
        pDisp: event argument; not used here.
        URL: event argument; not used here.
    """
    #
    # Caution:  Since the main thread and events thread(s) are different
    # it may be necessary to serialize access to shared data.  Because
    # this is a simple test case, that is not required here.  Your
    # situation may be different.   Caveat programmer.
    #
    thread = win32api.GetCurrentThreadId()
    # Single parenthesized argument: prints identically on Python 2 and 3.
    print("OnDocumentComplete event processed on thread %d" % thread)
    # Set the event our main thread is waiting on.
    win32event.SetEvent(self.event)
def TestExplorerEvents():
    """Drive Internet Explorer and wait on its events without a message pump.

    Creates the IE object with events, navigates to ``..\\readme.htm`` and
    waits up to 2 seconds for the event to be set; then quits IE and waits
    up to 2 seconds again.  Each missing event is reported with a printed
    message.
    """
    iexplore = win32com.client.DispatchWithEvents(
        "InternetExplorer.Application", ExplorerEvents)

    thread = win32api.GetCurrentThreadId()
    print('TestExplorerEvents created IE object on thread %d' % thread)

    iexplore.Visible = 1
    try:
        iexplore.Navigate(win32api.GetFullPathName('..\\readme.htm'))
    except pythoncom.com_error as details:
        # Formatting into one string keeps the output of the old
        # two-argument print statement on both Python 2 and 3.
        print("Warning - could not open the test HTML file %s" % (details,))

    # In this free-threaded example, we can simply wait until an event has
    # been set - we will give it 2 seconds before giving up.
    rc = win32event.WaitForSingleObject(iexplore.event, 2000)
    if rc != win32event.WAIT_OBJECT_0:
        print("Document load event FAILED to fire!!!")

    iexplore.Quit()
    # Now we can do the same thing to wait for exit!
    # Although Quit generates events, in this free-threaded world we
    # do *not* need to run any message pumps.
    rc = win32event.WaitForSingleObject(iexplore.event, 2000)
    if rc != win32event.WAIT_OBJECT_0:
        print("OnQuit event FAILED to fire!!!")

    iexplore = None
    print("Finished the IE event sample!")
def waitForEvents(self, timeout):
    """Wait for a handle signal or window messages on the initializing thread.

    Args:
        timeout: wait time passed to ``MsgWaitForMultipleObjects`` (which
            takes milliseconds); a negative value waits indefinitely.

    Returns:
        0 if messages were waiting and have been pumped,
        1 on timeout (or any other wait result), or when
          ``self.fInterrupted`` was set (the flag is cleared),
        2 if one of ``self.handles`` was signalled.

    Raises:
        TypeError: if ``timeout`` is not an integer.
        Exception: if called from a thread other than ``self.tid``.
    """
    # Validate the argument first so bad input fails fast, before any
    # platform module is imported.
    if not isinstance(timeout, int):
        raise TypeError("The timeout argument is not an integer")

    from win32api import GetCurrentThreadId
    from win32event import INFINITE
    from win32event import MsgWaitForMultipleObjects, QS_ALLINPUT, WAIT_OBJECT_0
    from pythoncom import PumpWaitingMessages

    if self.tid != GetCurrentThreadId():
        raise Exception("wait for events from the same thread you inited!")

    if timeout < 0:
        cMsTimeout = INFINITE
    else:
        cMsTimeout = timeout
    rc = MsgWaitForMultipleObjects(self.handles, 0, cMsTimeout, QS_ALLINPUT)
    if WAIT_OBJECT_0 <= rc < WAIT_OBJECT_0 + len(self.handles):
        # is it possible?
        rc = 2
    elif rc == WAIT_OBJECT_0 + len(self.handles):
        # Waiting messages
        PumpWaitingMessages()
        rc = 0
    else:
        # Timeout
        rc = 1

    # check for interruption
    self.oIntCv.acquire()
    try:
        if self.fInterrupted:
            self.fInterrupted = False
            rc = 1
    finally:
        # Release on every path so the lock can never be left held.
        self.oIntCv.release()

    return rc
def trace(*args):
    """A function used instead of "print" for debugging output.

    Does nothing unless the module-level ``debuggingTrace`` flag is true.
    Otherwise prints the current thread id followed by every argument,
    separated by single spaces, on one line.
    """
    if not debuggingTrace:
        return
    # "%s" formatting converts each item the way the print statement did.
    # One print call with one argument works on both Python 2 and 3.
    pieces = ["%s" % (item,) for item in (win32api.GetCurrentThreadId(),) + args]
    print(" ".join(pieces))

# Note that the DebugManager is not a COM gateway class for the
# debugger - but it does create and manage them.
def SetupAXDebugging(self, baseFrame = None, userFrame = None):
    """Get ready for potential debugging.  Must be called on the thread
    that is being debugged.

    Args:
        baseFrame: frame stored as both ``botframe`` and ``logicalbotframe``.
        userFrame: frame stored as ``stopframe``; defaults to ``baseFrame``.

    A call from a thread other than the current debugging thread is traced
    and ignored.
    """
    # userFrame is for non AXScript debugging.  This is the first frame of the
    # users code.
    if userFrame is not None:
        # We have missed the "dispatch_call" function, so set this up now!
        userFrame.f_locals['__axstack_address__'] = axdebug.GetStackAddress()
    else:
        userFrame = baseFrame

    traceenter("SetupAXDebugging", self)
    self._threadprotectlock.acquire()
    try:
        current_tid = win32api.GetCurrentThreadId()
        if self.debuggingThread is None:
            # First entry: this thread becomes the debugging thread.
            self.debuggingThread = current_tid
        elif self.debuggingThread != current_tid:
            trace("SetupAXDebugging called on other thread - ignored!")
            return
        else:
            # NOTE(review): nesting reconstructed from whitespace-collapsed
            # source - the push is taken to happen on re-entry only; confirm.
            # push our context.
            saved_context = (
                self.logicalbotframe,
                self.stopframe,
                self.currentframe,
                self.debuggingThreadStateHandle,
            )
            self.recursiveData.insert(0, saved_context)
    finally:
        self._threadprotectlock.release()

    trace("SetupAXDebugging has base frame as", _dumpf(baseFrame))
    self.botframe = baseFrame
    self.stopframe = userFrame
    self.logicalbotframe = baseFrame
    self.currentframe = None
    self.debuggingThreadStateHandle = axdebug.GetThreadStateHandle()

    self._BreakFlagsChanged()

# RemoteDebugApplicationEvents
def _BreakFlagsChanged(self):
    """Re-install the trace function after breakpoints or break flags change.

    When there are breakpoints or break flags, ``self.trace_dispatch`` is set
    as ``f_trace`` on the logical bottom frame, or on the stop frame when
    there is no bottom frame.  If break flags are set, a thread-state handle
    is held and the call comes from a thread other than the debugging
    thread, the trace function is also set on that thread state.
    """
    traceenter("_BreakFlagsChanged to %s with our thread = %s, and debugging thread = %s" % (self.breakFlags, self.debuggingThread, win32api.GetCurrentThreadId()))
    trace("_BreakFlagsChanged has breaks", self.breaks)

    if len(self.breaks) or self.breakFlags:
        bottom = self.logicalbotframe
        if not bottom:
            trace("BreakFlagsChanged, but no bottom frame")
            # NOTE(review): nesting reconstructed from whitespace-collapsed
            # source - confirm the stopframe fallback belongs to this branch.
            if self.stopframe is not None:
                self.stopframe.f_trace = self.trace_dispatch
        else:
            trace("BreakFlagsChange with bot frame", _dumpf(bottom))
            # We have frames not to be debugged (eg, Scripting engine frames
            # (sys.settrace will be set when out logicalbotframe is hit -
            #  this may not be the right thing to do, as it may not cause the
            #  immediate break we desire.)
            bottom.f_trace = self.trace_dispatch

    # If we have the thread-state for the thread being debugged, then
    # we dynamically set its trace function - it is possible that the thread
    # being debugged is in a blocked call (eg, a message box) and we
    # want to hit the debugger the instant we return
    if self.debuggingThreadStateHandle is None or not self.breakFlags:
        return
    if self.debuggingThread != win32api.GetCurrentThreadId():
        axdebug.SetThreadStateTrace(self.debuggingThreadStateHandle, self.trace_dispatch)
def trace(*args):
    """Print the arguments prefixed with ``<thread id>:`` when debugging.

    Does nothing unless the module-level ``debugging`` flag is true.  The
    prefix and the arguments are separated by single spaces on one line.
    """
    if not debugging:
        return
    # "%s" formatting converts each item the way the print statement did.
    # One print call with one argument works on both Python 2 and 3.
    prefix = str(win32api.GetCurrentThreadId()) + ":"
    print(" ".join([prefix] + ["%s" % (arg,) for arg in args]))

# The AXDebugging implementation assumes that the returned COM pointers are in
# some cases identical.  Eg, from a C++ perspective:
# p->GetSomeInterface( &p1 );
# p->GetSomeInterface( &p2 );
# p1==p2
# By default, this is _not_ true for Python.
# (Now this is only true for Document objects, and Python
# now does ensure this.
def thread_test(o):
    """Write five greetings tagged with the current thread id to ``o``.

    Args:
        o: object with a ``write`` method that accepts a string.

    ``win32api.Sleep(100)`` is called after every write.
    """
    writes_left = 5
    while writes_left > 0:
        greeting = "Hi from thread %d\n" % (win32api.GetCurrentThreadId())
        o.write(greeting)
        win32api.Sleep(100)
        writes_left -= 1
def __init__(self, title, msg = "", maxticks = 100, tickincr = 1):
    """Store the dialog settings and the creating thread, then init the base.

    Args:
        title: kept on ``self.title`` and forwarded to the base initializer.
        msg: kept on ``self.msg`` and forwarded to the base initializer.
        maxticks: forwarded to ``CStatusProgressDialog.__init__``.
        tickincr: forwarded to ``CStatusProgressDialog.__init__``.
    """
    self.title, self.msg = title, msg
    # Remember the id of the thread this object is constructed on.
    creating_thread = win32api.GetCurrentThreadId()
    self.threadid = creating_thread
    CStatusProgressDialog.__init__(self, title, msg, maxticks, tickincr)
def OnPaint (self):
    """Draw the first ``self.index`` characters of ``self.text`` at mid-window.

    ``self.width`` and ``self.height`` are taken from the client rectangle
    the first time through (while both are still 0).  ``EndPaint`` is called
    for every successful ``BeginPaint``, even when drawing raises.
    """
    dc, paintStruct = self.BeginPaint()
    try:
        self.OnPrepareDC(dc, None)

        if self.width == 0 and self.height == 0:
            left, top, right, bottom = self.GetClientRect()
            self.width = right - left
            self.height = bottom - top
        x, y = self.width // 2, self.height // 2
        dc.TextOut(x, y, self.text[:self.index])
    finally:
        # Always pair BeginPaint with EndPaint so the paint is never left open.
        self.EndPaint(paintStruct)
def OnDocumentComplete(self, pDisp=pythoncom.Empty, URL=pythoncom.Empty):
    """Report which thread handled OnDocumentComplete and set ``self.event``.

    Args:
        pDisp: event argument; not used here.
        URL: event argument; not used here.
    """
    thread = win32api.GetCurrentThreadId()
    # Single parenthesized argument: prints identically on Python 2 and 3.
    print("OnDocumentComplete event processed on thread %d" % thread)
    # Set the event our main thread is waiting on.
    win32event.SetEvent(self.event)
def trace(*args):
    """Debug-output helper used in place of "print".

    Silent unless the module-level ``debuggingTrace`` flag is true.  Emits
    the current thread id and then each argument, every item followed by a
    single space, and ends the line.
    """
    if debuggingTrace:
        # Space-separated items, then a trailing space and the newline.
        print(win32api.GetCurrentThreadId(), *args, end=' \n')

# Note that the DebugManager is not a COM gateway class for the
# debugger - but it does create and manage them.
def trace(*args):
    """Print ``<thread id>:`` and the arguments when ``debugging`` is true.

    Every item, including the prefix, is followed by a single space and the
    line is then terminated.
    """
    if debugging:
        thread_prefix = "{}:".format(win32api.GetCurrentThreadId())
        # Space-separated items, then a trailing space and the newline.
        print(thread_prefix, *args, end=' \n')

# The AXDebugging implementation assumes that the returned COM pointers are in
# some cases identical.  Eg, from a C++ perspective:
# p->GetSomeInterface( &p1 );
# p->GetSomeInterface( &p2 );
# p1==p2
# By default, this is _not_ true for Python.
# (Now this is only true for Document objects, and Python
# now does ensure this.