我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 _dummy_thread.start_new_thread()。
def test_uncond_acquire_blocking(self):
    """Check that an unconditional acquire of an already-held lock blocks.

    The lock is taken, a helper is started that releases it after ``DELAY``
    seconds, and the second acquire must not return before that much
    (whole-second) time has elapsed.
    """

    def release_later(held_lock, pause):
        """Sleep for ``pause`` seconds, then release ``held_lock``."""
        time.sleep(pause)
        held_lock.release()

    self.lock.acquire()
    began = int(time.time())
    _thread.start_new_thread(release_later, (self.lock, DELAY))
    if support.verbose:
        print()
        print("*** Waiting for thread to release the lock "
              "(approx. %s sec.) ***" % DELAY)
    # Second acquire: only returns once the helper has released the lock.
    self.lock.acquire()
    finished = int(time.time())
    if support.verbose:
        print("done")
    elapsed = finished - began
    self.assertTrue(elapsed >= DELAY,
                    "Blocking by unconditional acquiring failed.")
def test_arg_passing(self):
    """Check that start_new_thread forwards positional and keyword args.

    Three call shapes are exercised in turn: positional tuple only, keyword
    dict only, and a mix of both. Each must deliver ``True`` for both
    ``arg1`` and ``arg2``.
    """

    def report_args(queue, arg1=False, arg2=False):
        """Put the received ``(arg1, arg2)`` pair on ``queue``."""
        # Parameter names are part of the test: they are passed by keyword.
        queue.put((arg1, arg2))

    results = queue.Queue(1)
    # Each entry: (arguments following the function, failure message).
    scenarios = (
        (((results, True, True),),
         "Argument passing for thread creation using tuple failed"),
        ((tuple(), {'queue': results, 'arg1': True, 'arg2': True}),
         "Argument passing for thread creation using kwargs failed"),
        (((results, True), {'arg2': True}),
         "Argument passing for thread creation using both tuple"
         " and kwargs failed"),
    )
    for call_args, failure_message in scenarios:
        _thread.start_new_thread(report_args, *call_args)
        first, second = results.get()
        self.assertTrue(first and second, failure_message)
def start_new_thread(function, args, kwargs={}):
    """Dummy implementation of _thread.start_new_thread().

    Calls ``function(*args, **kwargs)`` synchronously, in the caller, instead
    of starting a real thread.

    Compatibility is maintained by making sure that ``args`` is a tuple and
    ``kwargs`` is a dictionary (subclasses of either are accepted).

    If an exception is raised and it is SystemExit (which can be done by
    _thread.exit()) it is caught and nothing is done; all other exceptions
    are printed out by using traceback.print_exc().

    If the executed function calls interrupt_main the KeyboardInterrupt will
    be raised when the function returns.

    Raises:
        TypeError: if ``args`` is not a tuple or ``kwargs`` is not a dict.
        KeyboardInterrupt: if the ``_interrupt`` flag was set while
            ``function`` was running.
    """
    global _main, _interrupt
    # NOTE: the ``{}`` default is kept for signature compatibility; it is
    # only ever unpacked with ``**`` below, never mutated.
    # isinstance() rather than an exact type comparison, so that tuple/dict
    # subclasses (e.g. namedtuples) are not rejected.
    if not isinstance(args, tuple):
        raise TypeError("2nd arg must be a tuple")
    if not isinstance(kwargs, dict):
        raise TypeError("3rd arg must be a dict")
    # While the function runs we are pretending not to be the main thread,
    # which makes interrupt_main() defer its KeyboardInterrupt.
    _main = False
    try:
        function(*args, **kwargs)
    except SystemExit:
        pass
    except BaseException:
        # Deliberately catch-all: the error is reported, not propagated.
        import traceback
        traceback.print_exc()
    finally:
        # Always restore the flag, even if reporting the error itself fails.
        _main = True
    if _interrupt:
        _interrupt = False
        raise KeyboardInterrupt
def interrupt_main():
    """Interrupt the main thread, immediately or on a deferred basis.

    When the module-level ``_main`` flag is false, only the ``_interrupt``
    flag is set, so that start_new_thread raises KeyboardInterrupt upon
    exiting. Otherwise KeyboardInterrupt is raised right here.
    """
    global _interrupt
    if not _main:
        # Deferred: start_new_thread checks this flag after the call returns.
        _interrupt = True
        return
    raise KeyboardInterrupt
def test_interrupt_main(self):
    """Check that interrupt_main inside a started function interrupts.

    Calling start_new_thread with a function that executes interrupt_main
    should raise KeyboardInterrupt upon completion.
    """

    def trigger_interrupt():
        _thread.interrupt_main()

    with self.assertRaises(KeyboardInterrupt):
        _thread.start_new_thread(trigger_interrupt, ())
def test_multi_creation(self):
    """Check that several threads can be created and that each one runs.

    Five functions are started, each of which waits briefly and then records
    its thread identifier in a shared queue; afterwards the queue must hold
    one entry per started function.
    """

    def mark_queue(sink, pause):
        """Wait for ``pause`` seconds and then put something into ``sink``."""
        time.sleep(pause)
        sink.put(_thread.get_ident())

    thread_count = 5
    marks = queue.Queue(thread_count)
    if support.verbose:
        print()
        print("*** Testing multiple thread creation "
              "(will take approx. %s to %s sec.) ***"
              % (DELAY, thread_count))
    for _ in range(thread_count):
        # Only draw a random pause when delays are enabled at all.
        pause = round(random.random(), 1) if DELAY else 0
        _thread.start_new_thread(mark_queue, (marks, pause))
    time.sleep(DELAY)
    if support.verbose:
        print('done')
    all_ran = marks.qsize() == thread_count
    self.assertTrue(all_ran,
                    "Not all %s threads executed properly after %s sec."
                    % (thread_count, DELAY))
def interrupt_main():
    """Raise KeyboardInterrupt, or schedule it when ``_main`` is false.

    With ``_main`` true the exception is raised directly. Otherwise the
    ``_interrupt`` flag is set to True to have start_new_thread raise
    KeyboardInterrupt upon exiting.
    """
    global _interrupt
    if _main:
        raise KeyboardInterrupt
    # Not flagged as main: leave a marker instead of raising here.
    _interrupt = True


# Brython-specific to avoid circular references between threading and _threading_local
def start_video(self, video):
    """Queue a background download for ``video`` and update its UI state.

    A callable is put on ``self.queue`` which, when invoked, starts
    ``self.background_download(video)`` via ``thread.start_new_thread``.
    The video's ``start_pause`` control is then set to ``'Pauze'`` and a
    status of 4 is switched to 1.
    """

    def launch_download():
        # Attribute lookup happens when the queued callable is invoked.
        return thread.start_new_thread(self.background_download, (video,))

    self.queue.put(launch_download)
    video['start_pause'].set('Pauze')
    # NOTE(review): 4 -> 1 presumably means paused -> in progress; confirm
    # against the status table used where videos are created.
    was_status_four = video['status'] == 4
    if was_status_four:
        video['status'] = 1
def append_download_to_queue(self, url):
    """Register a new download entry for ``url`` and queue a metadata fetch.

    The entry captures the current values of the ``filename``, ``outdir``,
    ``subtitles``, ``overwrite`` and ``quality`` settings, is appended to
    ``self._videos`` and gets its 1-based position stored under ``'row'``.
    Finally a callable is put on ``self.queue`` which, when invoked, starts
    ``self.fetch_meta(video)`` via ``thread.start_new_thread``.
    """
    # TODO: reuse an existing entry with the same URL whose status is not
    # 3 (error) or 5 (cancelled) and update its options, instead of always
    # creating a new entry.
    video = {'url': url}
    for option in ('filename', 'outdir', 'subtitles', 'overwrite', 'quality'):
        video[option] = getattr(self, option).get()
    # Status codes:
    #   0: not yet started
    #   1: in progress
    #   2: finished
    #   3: error
    #   4: paused
    #   5: cancelled
    video['status'] = 0

    self._videos.append(video)
    video['row'] = len(self._videos)

    def launch_meta_fetch():
        # Attribute lookup happens when the queued callable is invoked.
        return thread.start_new_thread(self.fetch_meta, (video,))

    self.queue.put(launch_meta_fetch)