我们从 Python 开源项目中提取了以下 32 个代码示例，用于说明如何使用 imp.lock_held()。
def check_parallel_module_init(self):
    """Start batches of threads running ``task`` and check for errors.

    For each batch size N in ``(20, 50) * 3``, ``random`` and
    ``modulefinder`` are removed from ``sys.modules``, N threads running
    ``task`` are started, and the test waits up to 60 seconds for the
    shared ``done`` event before asserting that ``errors`` is empty.

    Raises:
        unittest.SkipTest: if the import lock is already held on entry.
    """
    if imp.lock_held():
        # This triggers on, e.g., from test import autotest.
        raise unittest.SkipTest("can't run when import lock is held")

    done = threading.Event()
    for N in (20, 50) * 3:
        if verbose:
            print("Trying", N, "threads ...", end=' ')
        # Make sure that random and modulefinder get reimported freshly
        for modname in ['random', 'modulefinder']:
            sys.modules.pop(modname, None)
        errors = []
        done_tasks = []
        done.clear()
        for i in range(N):
            thread.start_new_thread(task, (N, done, done_tasks, errors,))
        # Event.wait() returns False when the timeout expires.  Ignoring
        # that result would let a batch that never finished pass as long
        # as no error had been recorded yet, so fail explicitly instead.
        self.assertTrue(done.wait(60),
                        "timed out waiting for %d threads" % N)
        self.assertFalse(errors)
        if verbose:
            print("OK.")
def test_main(): # magic name! see above global N, done import imp if imp.lock_held(): # This triggers on, e.g., from test import autotest. raise unittest.SkipTest("can't run when import lock is held") done.acquire() for N in (20, 50) * 3: if verbose: print "Trying", N, "threads ...", for i in range(N): thread.start_new_thread(task, ()) done.acquire() if verbose: print "OK." done.release() test_import_hangers()
def verify_lock_state(self, expected):
    """Assert that ``imp.lock_held()`` currently equals *expected*."""
    currently_held = imp.lock_held()
    failure_note = "expected imp.lock_held() to be %r" % expected
    self.assertEqual(currently_held, expected, failure_note)
def testLock(self): LOOPS = 50 # The import lock may already be held, e.g. if the test suite is run # via "import test.autotest". lock_held_at_start = imp.lock_held() self.verify_lock_state(lock_held_at_start) for i in range(LOOPS): imp.acquire_lock() self.verify_lock_state(True) for i in range(LOOPS): imp.release_lock() # The original state should be restored now. self.verify_lock_state(lock_held_at_start) if not lock_held_at_start: try: imp.release_lock() except RuntimeError: pass else: self.fail("release_lock() without lock should raise " "RuntimeError")
def test_main():
    """Run ``SocketServerTest`` unless the import lock is currently held.

    Raises:
        unittest.SkipTest: if ``imp.lock_held()`` is true on entry.
    """
    import_lock_taken = imp.lock_held()
    if import_lock_taken:
        # If the import lock is held, the threads will hang
        raise unittest.SkipTest("can't run when import lock is held")

    test.support.run_unittest(SocketServerTest)
def test_main():
    """Run ``SocketServerTest`` through ``test.test_support``.

    Raises:
        unittest.SkipTest: if the import lock is held when called.
    """
    lock_is_held = imp.lock_held()
    if lock_is_held:
        # If the import lock is held, the threads will hang
        raise unittest.SkipTest("can't run when import lock is held")

    test.test_support.run_unittest(SocketServerTest)
def find_module(self, name, path=None):
    """Record one call and bump ``self.x`` non-atomically.

    *name* and *path* are accepted but not used.  Always returns ``None``.
    Asserts that the import lock is held while it runs.
    """
    # Simulate some thread-unsafe behaviour. If calls to find_module()
    # are properly serialized, `x` will end up the same as `numcalls`.
    # Otherwise not.
    assert imp.lock_held()

    # The call counter is the only thing protected by the instance lock.
    with self.lock:
        self.numcalls = self.numcalls + 1

    # Deliberately racy read / sleep / write sequence on ``x``.
    observed = self.x
    time.sleep(0.01)
    self.x = observed + 1
def check_parallel_module_init(self):
    """Start batches of ``task`` threads and require a clean, timely finish.

    For every batch size in ``(20, 50) * 3`` the ``random`` and
    ``modulefinder`` entries are dropped from ``sys.modules``, that many
    threads running ``task`` are started, and the shared event must be set
    within 60 seconds with no entries in ``errors``.

    Raises:
        unittest.SkipTest: if the import lock is already held on entry.
    """
    if imp.lock_held():
        # This triggers on, e.g., from test import autotest.
        raise unittest.SkipTest("can't run when import lock is held")

    finished = threading.Event()
    for N in (20, 50) * 3:
        if verbose:
            print("Trying", N, "threads ...", end=' ')

        # Make sure that random and modulefinder get reimported freshly
        for stale_name in ('random', 'modulefinder'):
            sys.modules.pop(stale_name, None)

        errors = []
        done_tasks = []
        finished.clear()

        worker_args = (N, finished, done_tasks, errors)
        for _ in range(N):
            worker = threading.Thread(target=task, args=worker_args)
            worker.start()

        self.assertTrue(finished.wait(60))
        self.assertFalse(errors)
        if verbose:
            print("OK.")
def execute(meth, *args, **kwargs):
    """
    Execute *meth* in a Python thread, blocking the current coroutine/
    greenthread until the method completes.

    The primary use case for this is to wrap an object or module that is not
    amenable to monkeypatching or any of the other tricks that Eventlet uses
    to achieve cooperative yielding.  With tpool, you can force such objects to
    cooperate with green threads by sticking them in native threads, at the cost
    of some overhead.

    *meth* is called directly in the current thread when that thread is
    already one of the pool threads, when the import lock is held, or when
    the pool has zero threads.

    Returns the value obtained for the call.  If that value is a 3-tuple
    whose middle element is an instance of ``EXC_CLASSES``, it is treated
    as ``(type, value, traceback)`` and re-raised with ``six.reraise``;
    unless ``QUIET`` is set, the exception and the current stack are
    printed first.
    """
    setup()
    # if already in tpool, don't recurse into the tpool
    # also, call functions directly if we're inside an import lock, because
    # if meth does any importing (sadly common), it will hang
    # (current_thread() replaces the deprecated currentThread() alias.)
    my_thread = threading.current_thread()
    if my_thread in _threads or imp.lock_held() or _nthreads == 0:
        return meth(*args, **kwargs)

    done = event.Event()
    _reqq.put((done, meth, args, kwargs))

    rv = done.wait()
    if isinstance(rv, tuple) \
            and len(rv) == 3 \
            and isinstance(rv[1], EXC_CLASSES):
        # Distinct names here: the event object must not be rebound to the
        # exception value.
        exc_type, exc_value, exc_tb = rv
        if not QUIET:
            traceback.print_exception(exc_type, exc_value, exc_tb)
            traceback.print_stack()
        six.reraise(exc_type, exc_value, exc_tb)
    return rv
def __start_core(prefer, opts):
    """Load the JVM library, start the JVM thread and wait for it to be ready.

    *prefer* is forwarded to ``jvm_load``; *opts* is passed to ``jvm.run``
    together with the event that is waited on for start-up.  On success the
    created object is stored in the module-level ``__jvm``.
    """
    global __jvm

    # Load the JVM dynamic library
    from ._util import jvm_load
    jvm_load(prefer)

    # Start the JVM in its own thread
    import threading
    import imp
    from ._internal import jvm_create

    # if the JVM is already created, this will raise an exception
    jvm = jvm_create()
    started = threading.Event()

    reacquire_import_lock = imp.lock_held()
    if reacquire_import_lock:
        # If we are in the middle of an import (which is definitely possible) we can't start
        # another thread unless we release the import lock. If we do start the thread and then wait
        # for it to start, it will deadlock Python
        imp.release_lock()

    try:
        jvm_thread = threading.Thread(target=jvm.run, name='JVM-Main',
                                      args=(opts, started))
        jvm_thread.daemon = True
        jvm_thread.start()
        started.wait()  # wait for it to be initialized
    finally:
        # Put the import lock back exactly as it was found on entry.
        if reacquire_import_lock:
            imp.acquire_lock()

    jvm.check_pending_exception()
    __jvm = jvm