我们从Python开源项目中,提取了以下39个代码示例,用于说明如何使用threading.main_thread()。
def cron_task_host():
    """Run the cron task scheduler in an endless once-a-minute loop.

    Each cycle sleeps 60 seconds and then calls ``task_scheduler.run()``.
    When ``enable_cron_tasks`` is falsy the loop stops: on a worker thread
    this raises ``SystemExit``; on the main thread it simply returns.
    """
    while True:
        # Cron tasks have been switched off: leave the loop.
        if not enable_cron_tasks:
            if threading.current_thread() != threading.main_thread():
                # ``exit()`` is a helper the ``site`` module provides for
                # interactive sessions; raise SystemExit directly instead.
                raise SystemExit
            return

        sleep(60)
        try:
            task_scheduler.run()
        except Exception:  # coverage: exclude
            # Keep the host loop alive when a task fails, but do not swallow
            # SystemExit / KeyboardInterrupt the way a bare ``except:`` did.
            errprint('ErrorDuringExecutingCronTasks')
            traceback.print_exc()
def ki_manager(deliver_cb, restrict_keyboard_interrupt_to_checkpoints):
    """Generator that swaps in a custom SIGINT handler around its ``yield``.

    Nothing is installed (the generator just yields once) when called off
    the main thread or when SIGINT is not bound to Python's default handler.
    Otherwise the installed handler either calls ``deliver_cb()`` or raises
    ``KeyboardInterrupt`` directly, and the default handler is restored on
    exit if ours is still the active one.
    """
    on_main = threading.current_thread() == threading.main_thread()
    if not on_main or signal.getsignal(signal.SIGINT) != signal.default_int_handler:
        yield
        return

    def _sigint_handler(signum, frame):
        assert signum == signal.SIGINT
        # Evaluate the protection state first, exactly once per signal.
        protected = ki_protection_enabled(frame)
        if not (protected or restrict_keyboard_interrupt_to_checkpoints):
            raise KeyboardInterrupt
        deliver_cb()

    signal.signal(signal.SIGINT, _sigint_handler)
    try:
        yield
    finally:
        # Only undo our own installation; leave foreign handlers alone.
        if signal.getsignal(signal.SIGINT) is _sigint_handler:
            signal.signal(signal.SIGINT, signal.default_int_handler)
def test_threaded(testbot):
    """Check that ``testbot.load_data()`` succeeds on a non-main thread."""
    succeeded = threading.Event()

    def threadtest(flag):
        # If a new event loop isn't created for the thread, this will crash
        try:
            assert threading.current_thread() != threading.main_thread()
            testbot.load_data()
        except Exception as error:
            # Pytest will catch this stdout and print it, and the cleared
            # flag makes the final assertion fail the test.
            print(error)
            flag.clear()
        else:
            flag.set()

    worker = threading.Thread(target=threadtest, args=(succeeded,))
    worker.start()
    worker.join()
    assert succeeded.is_set()
def test_main_thread_after_fork(self):
    """Check ``threading.main_thread()`` in a child forked from the main thread.

    The child prints the main thread's name and whether its ident matches
    both the current thread and ``threading.get_ident()``; the parent
    waits for the child.  The combined stdout must be
    ``MainThread / True / True`` and stderr must be empty.
    """
    # The leading "if 1:" lets the script body stay indented inside the
    # string literal.  The statements must be on separate lines: collapsed
    # onto one line the script is a SyntaxError.
    code = """if 1:
        import os, threading

        pid = os.fork()
        if pid == 0:
            main = threading.main_thread()
            print(main.name)
            print(main.ident == threading.current_thread().ident)
            print(main.ident == threading.get_ident())
        else:
            os.waitpid(pid, 0)
    """
    _, out, err = assert_python_ok("-c", code)
    # Normalise line endings before comparing.
    data = out.decode().replace('\r', '')
    self.assertEqual(err, b"")
    self.assertEqual(data, "MainThread\nTrue\nTrue\n")
def test_main_thread_after_fork_from_nonmain_thread(self):
    """Check ``threading.main_thread()`` in a child forked from a worker thread.

    The fork happens inside thread ``f``; in the child, the forking thread
    is expected to be reported as the main thread, so the printed name is
    the worker's name rather than ``MainThread``.
    """
    # The leading "if 1:" lets the script body stay indented inside the
    # string literal.  The statements must be on separate lines: collapsed
    # onto one line the script is a SyntaxError.
    code = """if 1:
        import os, threading, sys

        def f():
            pid = os.fork()
            if pid == 0:
                main = threading.main_thread()
                print(main.name)
                print(main.ident == threading.current_thread().ident)
                print(main.ident == threading.get_ident())
                # stdout is fully buffered because not a tty,
                # we have to flush before exit.
                sys.stdout.flush()
            else:
                os.waitpid(pid, 0)

        th = threading.Thread(target=f)
        th.start()
        th.join()
    """
    _, out, err = assert_python_ok("-c", code)
    data = out.decode().replace('\r', '')
    self.assertEqual(err, b"")
    # NOTE(review): "Thread-1" is the auto-generated default thread name;
    # newer Python versions append the target name ("Thread-1 (f)") --
    # confirm which interpreter versions this test must support.
    self.assertEqual(data, "Thread-1\nTrue\nTrue\n")
def test_3_join_in_forked_from_thread(self):
    """Join the original main thread from a child forked in a worker thread.

    The script is handed to ``self._run_and_join``; it relies on names the
    script itself does not define (``threading``, ``os``, ``sys``,
    ``joiningfunc``) -- presumably supplied by that helper.
    """
    # Like the test above, but fork() was called from a worker thread
    # In the forked process, the main Thread object must be marked as stopped.
    # The statements must be on separate lines: collapsed onto one line the
    # script is a SyntaxError.
    script = """if 1:
        main_thread = threading.current_thread()
        def worker():
            childpid = os.fork()
            if childpid != 0:
                os.waitpid(childpid, 0)
                sys.exit(0)

            t = threading.Thread(target=joiningfunc,
                                 args=(main_thread,))
            print('end of main')
            t.start()
            t.join() # Should not block: main_thread is already stopped

        w = threading.Thread(target=worker)
        w.start()
        """
    self._run_and_join(script)
def initialize_exception_listener():
    """Bind SIGUSR1 to a handler that re-raises the pending ``LAST_ERROR``.

    Does nothing if a signal was already registered.

    Raises:
        NotMainThread: when called from any thread other than the main one.
        SignalAlreadyBound: when SIGUSR1 already has a non-default handler.
    """
    # must be invoked in main thread in "geventless" runs in order for raise_in_main_thread to work
    global REGISTERED_SIGNAL
    if REGISTERED_SIGNAL:
        return  # already registered

    if threading.current_thread() is not threading.main_thread():
        raise NotMainThread()

    def handle_signal(sig, stack):
        global LAST_ERROR
        # Take the pending error and clear the slot in one step.
        pending, LAST_ERROR = LAST_ERROR, None
        if pending:
            raise pending
        raise LastErrorEmpty(signal=sig)

    custom_signal = signal.SIGUSR1
    # check if signal is already trapped
    if signal.getsignal(custom_signal) not in (signal.SIG_DFL, signal.SIG_IGN):
        raise SignalAlreadyBound(signal=custom_signal)

    signal.signal(custom_signal, handle_signal)
    REGISTERED_SIGNAL = custom_signal
def raise_in_main_thread(exception_type=Exception):
    """Generator that forwards errors raised around its ``yield`` to the main thread.

    ``ProcessExiting`` always propagates unchanged.  Any other exception
    matching ``exception_type`` is re-raised as-is on the main thread; on
    other threads it is stored in ``LAST_ERROR`` and handed to ``_rimt``,
    unless a different error is already pending, in which case it is
    re-raised locally after a warning.
    """
    global LAST_ERROR
    try:
        yield
    except ProcessExiting:
        # this exception is meant to stay within the thread
        raise
    except exception_type as exc:
        if threading.current_thread() is threading.main_thread():
            raise

        exc._raised_asynchronously = True
        if LAST_ERROR:
            _logger.warning("a different error (%s) is pending - skipping", type(LAST_ERROR))
            raise

        LAST_ERROR = exc
        _rimt(exc)
def destroy(self, path):
    """Report this node as offline to the librarian and close ``self.torms``.

    ``path`` is accepted but not used in the body.
    """
    # fusermount -u or SIGINT aka control-C
    offline = FRDnode.SOC_STATUS_OFFLINE
    self.lfs_status = offline

    # Zero out every SoC metric together with the offline status.
    soc_update = self.lcp(
        'update_node_soc_status',
        status=offline,
        cpu_percent=0,
        rootfs_percent=0,
        network_in=0,
        network_out=0,
        mem_percent=0,
    )
    self.librarian(soc_update)

    mc_update = self.lcp(
        'update_node_mc_status',
        status=FRDFAModule.MC_STATUS_OFFLINE,
    )
    self.librarian(mc_update)

    assert threading.current_thread() is threading.main_thread()
    self.torms.close()
    del self.torms

# helpers
def myChildThread():
    """Sleep five seconds, then print the current thread and the main thread."""
    divider = "-------------------------"
    # Pair each heading with the lookup that produces the thread to show.
    reports = (
        ("Current Thread ----------", threading.current_thread),
        ("Main Thread -------------", threading.main_thread),
    )

    print("Child Thread Starting")
    time.sleep(5)
    for heading, lookup in reports:
        print(heading)
        print(lookup())
        print(divider)
    print("Child Thread Ending")
def on_title_read(self, title):
    """React to a newly read stream title.

    Must be called on the main thread (asserted).  A ``None`` title is
    ignored.  When the title differs from the last one seen, it is
    recorded, checked against the blacklist to enter or leave an
    "ad block" state, and announced via ``dispatchers.player.song_changed``.

    NOTE(review): the nesting below was reconstructed from a
    whitespace-collapsed source -- confirm against the original file.
    """
    assert threading.current_thread() == threading.main_thread()
    if title is None:
        return
    if title != self._last_title:
        self._last_title = title
        # TODO: Fade volume gradually
        # TODO: Allow user to choose what to do when an advertisement block is detected.
        #       Ideas for possible options:
        #       * reduce or mute volume
        #       * play random audio file from a local directory
        #       * switch to another radio station
        #       * repeat part of last song
        print("Title changed to %s" % title)
        # If the title contains a blacklisted tag, reduce volume
        if BlacklistStorage.is_blacklisted(title):
            # Only act on the transition into an ad block, not on every
            # blacklisted title within it.
            if not self._in_ad_block:
                print('Advertisement tag detected.')
                if config.block_mode in (config.BlockMode.REDUCE_VOLUME, config.BlockMode.REDUCE_AND_SWITCH):
                    print('Reducing volume.')
                    self.volume = config.ad_block_volume
                    self._in_ad_block = True
                    self._last_ad_time = time.time()
                elif config.block_mode == config.BlockMode.SWITCH_STATION:
                    self.switch_to_another_station()
        else:
            # A non-blacklisted title ends any ad block in progress.
            if self._in_ad_block:
                print('Restoring volume to maximum.')
                if config.block_mode in (config.BlockMode.REDUCE_VOLUME, config.BlockMode.REDUCE_AND_SWITCH):
                    self.volume = config.max_volume
                self._in_ad_block = False
                self._last_ad_time = None
                self._just_switched = False
        dispatchers.player.song_changed(title)
def fire_state_change(self):
    """Broadcast the current ``is_playing`` value; must run on the main thread."""
    caller = threading.current_thread()
    assert caller == threading.main_thread()
    notify = dispatchers.player.playing_state_changed
    notify(self.is_playing)
def __init__(self):
    """Create the I/O completion port and the bookkeeping around it.

    Sets up the completion queue, waiter tables, completion-key counter and
    a wakeup socket pair; when constructed on the main thread, the write end
    of that pair is also registered as the signal wakeup fd.
    """
    # https://msdn.microsoft.com/en-us/library/windows/desktop/aa363862(v=vs.85).aspx
    # Flagged closed until the port exists -- presumably so a failed
    # CreateIoCompletionPort leaves nothing for close() to release.
    self._closed = True
    self._iocp = _check(
        kernel32.
        CreateIoCompletionPort(INVALID_HANDLE_VALUE, ffi.NULL, 0, 0)
    )
    self._closed = False
    self._iocp_queue = deque()
    self._iocp_thread = None
    self._overlapped_waiters = {}
    self._completion_key_queues = {}
    # Completion key 0 is reserved for regular IO events
    self._completion_key_counter = itertools.count(1)

    # {stdlib socket object: task}
    # except that wakeup socket is mapped to None
    self._socket_waiters = {"read": {}, "write": {}}
    self._main_thread_waker = WakeupSocketpair()
    wakeup_sock = self._main_thread_waker.wakeup_sock
    self._socket_waiters["read"][wakeup_sock] = None

    # This is necessary to allow control-C to interrupt select().
    # https://github.com/python-trio/trio/issues/42
    if threading.current_thread() == threading.main_thread():
        fileno = self._main_thread_waker.write_sock.fileno()
        # Remember the previous wakeup fd so it can be restored later.
        self._old_signal_wakeup_fd = signal.set_wakeup_fd(fileno)
def close(self):
    """Release the completion port and wakeup resources; later calls are no-ops."""
    if self._closed:
        return
    self._closed = True

    _check(kernel32.CloseHandle(self._iocp))
    iocp_thread = self._iocp_thread
    if iocp_thread is not None:
        iocp_thread.join()
    self._main_thread_waker.close()

    # Put back the wakeup fd that was active before this object was built.
    if threading.current_thread() == threading.main_thread():
        signal.set_wakeup_fd(self._old_signal_wakeup_fd)
def main_native_thread():
    """Return whatever ``__threading__.main_thread()`` reports as the main thread."""
    native_main = __threading__.main_thread()  # pylint:disable=no-member
    return native_main
def not_on_main_thread() -> bool:
    """Return True when the caller runs on any thread other than the main one."""
    caller = threading.current_thread()
    return caller != threading.main_thread()
def remove_heart_log(*args, **kwargs):
    """Forward the arguments to ``debug_log`` only when called on the main thread.

    On Python 2 the main thread is recognised by its name; otherwise by
    comparing against ``threading.main_thread()``.
    """
    caller = threading.current_thread()
    if six.PY2:
        on_main = caller.name == 'MainThread'
    else:
        on_main = caller == threading.main_thread()

    if on_main:
        debug_log(*args, **kwargs)
def _sketch_raise_in_main(exc):
    ''' Sketchy way to raise an exception in the main thread.

    ``exc`` may be an exception instance (its class is used) or an
    exception class.  The class is scheduled for asynchronous delivery
    in the main thread through ``PyThreadState_SetAsyncExc``.

    Raises TypeError if ``exc`` is neither of those, and SystemError if
    the C API reports that no thread, or more than one thread state,
    was affected.
    '''
    if isinstance(exc, BaseException):
        exc = type(exc)
    elif not (isinstance(exc, type) and issubclass(exc, BaseException)):
        # Checking ``isinstance(exc, type)`` first keeps non-class
        # arguments from tripping issubclass() with an unrelated message.
        raise TypeError('Must raise an exception.')

    set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc

    # Figure out the id of the main thread.  The C API takes an unsigned
    # long, so wrap it explicitly instead of passing a raw Python int.
    thread_ref = ctypes.c_ulong(threading.main_thread().ident)

    result = set_async_exc(thread_ref, ctypes.py_object(exc))

    # 0 Is failed.
    if result == 0:
        raise SystemError('Main thread had invalid ID?')
    # 1 succeeded
    # > 1 failed
    elif result > 1:
        # Undo the request: same wrapped id, and None (NULL) as the
        # exception to clear what was just scheduled.
        set_async_exc(thread_ref, None)
        raise SystemError('Failed to raise in main thread.')
def _watch_for_exit(self):
    ''' Block until the main thread has finished, then call
    ``self._stop_nowait()``.
    '''
    threading.main_thread().join()
    self._stop_nowait()
def test_main_thread(self): main = threading.main_thread() self.assertEqual(main.name, 'MainThread') self.assertEqual(main.ident, threading.current_thread().ident) self.assertEqual(main.ident, threading.get_ident()) def f(): self.assertNotEqual(threading.main_thread().ident, threading.current_thread().ident) th = threading.Thread(target=f) th.start() th.join()
def on_main_thread():
    """Return True if the calling thread is the interpreter's main thread."""
    caller = threading.current_thread()
    return caller is threading.main_thread()
def _on_main_thread():
    """Return True if the calling thread is the interpreter's main thread.

    Duplicated from xonsh.tools here so that this module only relies on
    the Python standard library.
    """
    caller = threading.current_thread()
    return caller is threading.main_thread()
def _patch_module_locks():
    """Wrap ``_ModuleLock.release`` so locks owned by the pre-patch main thread
    are re-attributed to the current main thread before being released.
    """
    # gevent will not patch existing locks (including ModuleLocks) when it's not single threaded
    # our solution is to monkey patch the release method for ModuleLocks objects
    # we assume that patching is done early enough so no other locks are present
    import importlib

    module_lock_cls = importlib._bootstrap._ModuleLock
    original_release = module_lock_cls.release

    def _release(*args, **kw):
        lock = args[0]
        # Owner still recorded under the old ident: rewrite it first.
        if lock.owner == main_thread_ident_before_patching:
            lock.owner = threading.main_thread().ident
        original_release(*args, **kw)

    module_lock_cls.release = _release
def _rimt(exc):
    """Kill the main thread's greenlet with ``exc``, then raise ``exc`` here too."""
    _logger.info('YELLOW<<killing main thread greenlet>>')
    main_thread_greenlet = threading.main_thread()._greenlet
    orig_throw = main_thread_greenlet.throw

    # we must override "throw" method so exception will be raised with the original traceback
    def throw(*args):
        if len(args) != 1:
            return orig_throw(*args)
        (ex,) = args
        return orig_throw(ex.__class__, ex, ex.__traceback__)

    main_thread_greenlet.throw = throw
    gevent.kill(main_thread_greenlet, exc)
    _logger.debug('exiting the thread that failed')
    raise exc
async def test_call_in_executor(executor):
    """Test that call_in_executor actually runs the target in a worker thread."""
    # ``await`` is only legal inside a coroutine function, so this must be
    # ``async def``.
    # NOTE(review): presumably collected by an asyncio-aware test plugin --
    # confirm the marker/decorator in the original file.
    ran_on_main = await call_in_executor(
        lambda: current_thread() is main_thread(), executor=executor)
    assert not ran_on_main
def handleOOB(self):
    """Log every entry of ``self.torms.inOOB`` as a warning, then clear them.

    Must be called on the main thread (asserted).
    """
    # ALTERNATIVE: Put the message on a Queue for the main thread.
    assert threading.current_thread() is threading.main_thread()
    torms = self.torms
    banner = '\t\t!!!!!!!!!!!!!!!!!!!!!!!! %s'
    for oob in torms.inOOB:
        self.logger.warning(banner % oob)
    torms.clearOOB()
def _run(name, queue, options):
    """
    The actual process that runs the separate controller instance.

    :param name: name of the process
    :param queue: Queue of the binding parent.
    :param options: Custom Options
    :type name: str
    """
    from pyplanet.core.instance import Controller
    from pyplanet.utils.log import initiate_logger, QueueHandler
    import logging

    # Tokio Asyncio (EXPERIMENTAL).
    if 'tokio' in options and options['tokio'] is True:
        import tokio
        import asyncio
        policy = tokio.TokioLoopPolicy()
        asyncio.set_event_loop_policy(policy)
        asyncio.set_event_loop(tokio.new_event_loop())
        logging.warning('Using experimental Tokio Asyncio Loop!')

    # Logging to queue.
    # NOTE(review): the extent of this branch was reconstructed from a
    # whitespace-collapsed source -- confirm against the original file.
    if multiprocessing.get_start_method() != 'fork':  # pragma: no cover
        initiate_logger()
        root_logger = logging.getLogger()
        formatter = ColoredFormatter(
            '%(log_color)s%(levelname)-8s%(reset)s %(yellow)s[%(threadName)s][%(name)s]%(reset)s %(blue)s%(message)s'
        )
        queue_handler = QueueHandler(queue)
        queue_handler.setFormatter(formatter)
        root_logger.addHandler(queue_handler)

    logging.getLogger(__name__).info('Starting pool process for \'{}\'...'.format(name))

    # Setting thread name to our process name.  Assign the ``name``
    # attribute directly; ``Thread.setName()`` is a deprecated alias for it.
    threading.main_thread().name = name

    # Start instance.
    instance = Controller.prepare(name).instance
    instance._queue = queue
    instance.start()
def cron_task_container(task_dict, add_task_only=False):
    """
    Run one cron task (unless told not to) and schedule its next run.

    Errors raised by the task are reported and do not prevent the task
    from being re-scheduled.  When ``enable_cron_tasks`` is falsy nothing
    is scheduled: a worker thread gets ``SystemExit``, the main thread
    simply returns.

    :param task_dict: description of the task, a dict with the keys
        {
        "target": callable to run (required when the task is executed),
        "interval": delay handed to ``task_scheduler.enter`` (default 300;
                    presumably seconds -- depends on the scheduler's clock),
        "priority": scheduler priority (default 999),
        "name": label used in the log line (defaults to ``str(target)``),
        "args": positional arguments for the target, e.g. (arg1, arg2),
        "kwargs": keyword arguments for the target, e.g. {key: value},
        }
    :param add_task_only: when true, skip running the task now and only
        schedule it
    """
    global task_scheduler
    if not add_task_only:
        # Execute the task
        try:
            infoprint('CronTask:', task_dict.get('name', str(task_dict['target'])),
                      'Target:', str(task_dict['target']))

            target_func = task_dict.get('target')
            if target_func is None:
                raise ValueError("target is not given in " + str(task_dict))

            target_func(
                *(task_dict.get('args', ())),  # positional arguments
                **(task_dict.get('kwargs', {}))
            )
        except Exception:  # coverage: exclude
            # Report task failures, but do not swallow SystemExit /
            # KeyboardInterrupt the way a bare ``except:`` did.
            errprint('ErrorWhenProcessingCronTasks', task_dict)
            traceback.print_exc()

    # Cron tasks have been switched off: stop instead of re-scheduling.
    if not enable_cron_tasks:
        if threading.current_thread() != threading.main_thread():
            # ``exit()`` is a helper the ``site`` module provides for
            # interactive sessions; raise SystemExit directly instead.
            raise SystemExit
        return

    # Schedule the next run of this same task
    task_scheduler.enter(
        task_dict.get('interval', 300),
        task_dict.get('priority', 999),
        cron_task_container,
        (task_dict,)
    )
def catch_signals(signals):
    """Context-manager generator that collects the given signals.

    While the context is active, the listed signals are captured instead of
    being handled normally, and the yielded object is an async iterator.
    Each iteration waits until at least one signal has arrived and then
    produces a :class:`set` with every signal received since the previous
    iteration.

    Signals still sitting unread in the iterator when the ``with`` block is
    left are re-delivered through Python's regular signal handling, which
    closes the race with a signal arriving just before exit.

    Args:
      signals: a set of signals to listen for.

    Raises:
      RuntimeError: when used anywhere but Python's main thread (a Python
          limitation).

    Example:

      Unix daemons conventionally reload their configuration on
      ``SIGHUP``.  A sketch using :func:`catch_signals`::

         with trio.catch_signals({signal.SIGHUP}) as batched_signal_aiter:
             async for batch in batched_signal_aiter:
                 # Only one signal is being listened for, so the batch is
                 # always {signal.SIGHUP}; with more signals it could vary.
                 for signum in batch:
                     assert signum == signal.SIGHUP
                     reload_configuration()

    """
    on_main = threading.current_thread() == threading.main_thread()
    if not on_main:
        raise RuntimeError(
            "Sorry, catch_signals is only possible when running in the "
            "Python interpreter's main thread"
        )

    token = _core.current_trio_token()
    queue = SignalQueue()

    def _enqueue(signum, _):
        # Hand the signal number over to the queue via the token.
        token.run_sync_soon(queue._add, signum, idempotent=True)

    try:
        with _signal_handler(signals, _enqueue):
            yield queue
    finally:
        queue._redeliver_remaining()
def _index_subjects(self):
    """Query the triplestore for all subject URIs and index them in batches.

    Returns early when the subjects are already indexed and there are no
    new ones.  Otherwise every result row is handed to
    ``self._index_subject_item`` on its own thread, one batch of up to
    ``batch_size`` rows at a time; after each batch all non-main threads
    are joined and the batch's collected data is bulk-saved through
    ``self.es_worker``.

    NOTE(review): the loop nesting was reconstructed from a
    whitespace-collapsed source -- confirm against the original file.
    """
    lg = logging.getLogger("%s.%s" % (self.ln, inspect.stack()[0][3]))
    lg.setLevel(self.log_level)
    # if the subjects have been indexed and there are no new subjects exit
    if self.data_status.get("indexed") and not self.new_subjects:
        return
    # get a list of all the loc_subject URIs
    sparql = """ SELECT ?s { ?s skos:inScheme <http://id.loc.gov/authorities/subjects> . }"""
    results = run_sparql_query(sparql=sparql)
    # Start processing through
    self.time_start = datetime.datetime.now()
    batch_size = 12000
    # Slice ends are exclusive, so a result set that fits in one batch must
    # end at len(results); ``len(results) - 1`` dropped the last subject.
    batch_end = min(batch_size, len(results))
    batch_start = 0
    batch_num = 1
    self.batch_data = {}
    self.batch_data[batch_num] = []
    end = False
    last = False
    while not end:
        lg.debug("batch %s: %s-%s", batch_num, batch_start, batch_end)
        for i, subj in enumerate(results[batch_start:batch_end]):
            th = threading.Thread(name=batch_start + i + 1,
                                  target=self._index_subject_item,
                                  args=(iri(subj['s']['value']),
                                        i+1, batch_num,))
            th.start()
            #self._index_subject_item(iri(subj['s']['value']),i+1)
        print(datetime.datetime.now() - self.time_start)
        # Wait for every thread except the main one before saving the batch.
        main_thread = threading.main_thread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            #print('joining %s', t.getName())
            t.join()
        action_list = \
                self.es_worker.make_action_list(self.batch_data[batch_num])
        self.es_worker.bulk_save(action_list)
        del self.batch_data[batch_num]
        # Advance the window; ``last`` marks that the window has been
        # clamped to the end, so the following pass is the final one.
        batch_end += batch_size
        batch_start += batch_size
        if last:
            end = True
        if len(results) <= batch_end:
            batch_end = len(results)
            last = True
        batch_num += 1
        self.batch_data[batch_num] = []
        print(datetime.datetime.now() - self.time_start)
def get_thread_tree(including_this=True):
    """Build a tree of ``Bunch`` nodes describing the live threads.

    Threads are grouped under the parent reported by ``get_thread_parent``;
    each node carries the thread's name, daemon marker, ident, logging
    context line, formatted stack and a timestamp, plus its children.

    :param including_this: when false, the calling thread's stack is
        replaced by a placeholder instead of being formatted.
    :return: a root ``Bunch`` whose ``children`` are the top-level threads.

    NOTE(review): the nesting below was reconstructed from a
    whitespace-collapsed source -- confirm against the original file.
    """
    from .logging import THREAD_LOGGING_CONTEXT
    from .bunch import Bunch

    tree = {}              # parent thread -> list of child threads
    dead_threads = set()   # DeadThread parents already added to the tree
    contexts = {}          # thread ident -> flattened logging context
    stacks = {}            # thread ident -> (formatted stack, timestamp)

    def add_to_tree(thread):
        contexts[thread.ident] = THREAD_LOGGING_CONTEXT.flatten(thread.uuid)
        parent = get_thread_parent(thread)
        # Dead parents are not returned by threading.enumerate(), so they
        # are added (once) when first seen as somebody's parent.
        if isinstance(parent, DeadThread) and parent not in dead_threads:
            dead_threads.add(parent)
            add_to_tree(parent)
        tree.setdefault(parent, []).append(thread)

    for thread in threading.enumerate():
        add_to_tree(thread)

    current_ident = threading.current_thread().ident
    main_ident = threading.main_thread().ident

    for thread_ident, frame in iter_thread_frames():
        if not including_this and thread_ident == current_ident:
            formatted = " <this frame>"
        else:
            # show the entire stack if it's this thread, don't skip ('after_module') anything
            show_all = thread_ident in (current_ident, main_ident)
            formatted = format_thread_stack(frame, skip_modules=[] if show_all else _BOOTSTRAPPERS) if frame else ''
        stacks[thread_ident] = formatted, time.time()

    def add_thread(parent_thread, parent):
        # Attach the children of ``parent_thread`` to the ``parent`` node,
        # sorted by name, recursing into threads that have children.
        for thread in sorted(tree[parent_thread], key=lambda thread: thread.name):
            ident = thread.ident or 0
            stack, ts = stacks.get(ident, ("", 0))
            context = contexts.get(ident, {})
            context_line = ", ".join("%s: %s" % (k, context[k]) for k in "host context".split() if context.get(k))

            this = Bunch(
                name=thread.name,
                daemon="[D]" if getattr(thread, "daemon", False) else "",
                ident=ident,
                context_line="({})".format(context_line) if context_line else "",
                stack=stack,
                timestamp=ts,
                children=[],
            )
            parent.children.append(this)
            if thread in tree:
                add_thread(thread, this)
        return parent

    # The root's children are the threads filed under the ``None`` key.
    return add_thread(None, Bunch(children=[]))