我们从 Python 开源项目中提取了以下 19 个代码示例，用于说明如何使用 threading._dangling。
def modules_cleanup(oldmodules):
    """Reset ``sys.modules`` to *oldmodules* plus the loaded ``encodings.*`` entries.

    ``sys.modules`` is emptied in place, the ``encodings.`` submodules that
    were loaded at call time are put back, and finally the entries of
    *oldmodules* are restored on top of them.

    :param oldmodules: mapping (or iterable of ``(name, module)`` pairs)
        accepted by ``dict.update``; typically a snapshot of ``sys.modules``
        taken before the test ran.
    """
    # Encoders/decoders are registered permanently within the internal
    # codec cache. If we destroy the corresponding modules their
    # globals will be set to None which will trip up the cached functions.
    encodings = [(k, v) for k, v in sys.modules.items()
                 if k.startswith('encodings.')]
    # The previous "Py2-compatible" loop called ``sys.modules.pop()`` with no
    # key, which raises TypeError because ``dict.pop`` requires one.
    # ``dict.clear()`` is available on both Py2 and Py3 and empties the
    # mapping in place, so the interpreter keeps using the same dict object.
    sys.modules.clear()
    sys.modules.update(encodings)
    # XXX: This kind of problem can affect more than just encodings. In particular
    # extension modules (such as _ssl) don't cope with reloading properly.
    # Really, test modules should be cleaning out the test specific modules they
    # know they added (ala test_runpy) rather than relying on this function (as
    # test_importhooks and test_pkg do currently).
    # Implicitly imported *real* modules should be left alone (see issue 10556).
    sys.modules.update(oldmodules)


#=======================================================================
# Backported versions of threading_setup() and threading_cleanup() which don't refer
# to threading._dangling (not available on Py2.7).

# Threading support to prevent reporting refleaks when running regrtest.py -R

# NOTE: we use thread._count() rather than threading.enumerate() (or the
# moral equivalent thereof) because a threading.Thread object is still alive
# until its __bootstrap() method has returned, even after it has been
# unregistered from the threading module.
# thread._count(), on the other hand, only gets decremented *after* the
# __bootstrap() method has returned, which gives us reliable reference counts
# at the end of a test run.
def threading_setup():
    """Capture the current thread bookkeeping as a 2-tuple.

    Returns ``(_thread._count(), copy of threading._dangling)`` when the
    ``_thread`` name is truthy, otherwise the placeholder pair ``(1, ())``.
    The result is shaped to be unpacked into ``threading_cleanup(*values)``.
    """
    # No usable _thread module: hand back a fixed placeholder pair.
    if not _thread:
        return 1, ()

    thread_count = _thread._count()
    # Copy the set so later changes to threading._dangling do not alter
    # the snapshot being returned.
    dangling_snapshot = threading._dangling.copy()
    return thread_count, dangling_snapshot
def threading_cleanup(*original_values):
    """Wait for the thread bookkeeping to return to *original_values*.

    Up to 10 times, compares ``(_thread._count(), threading._dangling)``
    against *original_values*; on a mismatch it sleeps 0.1 seconds and calls
    ``gc_collect()`` before trying again. Does nothing when ``_thread`` is
    falsy. Always returns ``None``; a state that never matches is not
    reported.
    """
    if _thread:
        for _attempt in range(10):
            current = (_thread._count(), threading._dangling)
            if current == original_values:
                return
            # Not back to the starting state yet: pause, then run a
            # collection pass before the next comparison.
            time.sleep(0.1)
            gc_collect()
    # XXX print a warning in case of failure?
def threading_cleanup(*original_values):
    """Poll until the thread bookkeeping equals *original_values* again.

    Makes at most 100 comparisons of
    ``(_thread._count(), threading._dangling)`` with *original_values*.
    After each mismatch it sleeps 0.01 seconds and calls ``gc_collect()``.
    Returns ``None`` immediately when ``_thread`` is falsy; a state that
    never matches is not reported.
    """
    if not _thread:
        return

    remaining = 100
    while remaining > 0:
        remaining -= 1
        snapshot = (_thread._count(), threading._dangling)
        if snapshot == original_values:
            break
        # Short pause followed by a collection pass before the next
        # comparison.
        time.sleep(0.01)
        gc_collect()
    # XXX print a warning in case of failure?