The following code examples, extracted from open-source Python projects, illustrate how to use threading.get_ident().
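Before the extracted examples, here is a minimal, self-contained sketch (written for this page, not taken from any of the projects below; the names `idents` and `record_ident` are purely illustrative). It shows the basic behavior: threading.get_ident() returns an integer identifying the calling thread, which is commonly used as a dictionary key for per-thread bookkeeping.

import threading

# Illustrative per-thread registry (not part of any project below).
idents = {}

def record_ident():
    # get_ident() returns an integer unique to the calling thread
    # for as long as that thread is alive.
    idents[threading.get_ident()] = threading.current_thread().name

threads = [threading.Thread(target=record_ident) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

record_ident()  # also record the main thread
print(idents)   # each thread's ident maps to its thread name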
def __init__(self, graph=None):
    """
    Construct the graph object.

    Parameters
    ----------
    graph : networkx.DiGraph or NNGraph
        Graph to build the object from (optional).
    """
    super(Graph, self).__init__()
    # Privates
    self._thread_to_graph_mapping = {}
    self._creator_thread = threading.get_ident()
    self._creator_pid = mp.current_process().pid
    # Publics
    if graph is not None:
        self.graph = graph
    else:
        self.graph = NNGraph()
def _recursive_repr(fillvalue='...'):
    'Decorator to make a repr function return fillvalue for a recursive call'

    def decorating_function(user_function):
        repr_running = set()

        def wrapper(self):
            key = id(self), get_ident()
            if key in repr_running:
                return fillvalue
            repr_running.add(key)
            try:
                result = user_function(self)
            finally:
                repr_running.discard(key)
            return result

        # Can't use functools.wraps() here because of bootstrap issues
        wrapper.__module__ = getattr(user_function, '__module__')
        wrapper.__doc__ = getattr(user_function, '__doc__')
        wrapper.__name__ = getattr(user_function, '__name__')
        wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
        return wrapper

    return decorating_function
def async_test_scarlett_os(loop):
    """Return a ScarlettOS object pointing at test config dir."""
    # loop._thread_ident = threading.get_ident()
    ss = s.ScarlettSystem(loop)

    ss.config.location_name = 'test scarlett'
    ss.config.config_dir = get_test_config_dir()
    ss.config.latitude = 32.87336
    ss.config.longitude = -117.22743
    ss.config.elevation = 0
    ss.config.time_zone = date_utility.get_time_zone('US/Pacific')
    ss.config.units = METRIC_SYSTEM
    ss.config.skip_pip = True

    # if 'custom_automations.test' not in loader.AVAILABLE_COMPONENTS:
    #     yield from loop.run_in_executor(None, loader.prepare, ss)

    ss.state = s.CoreState.running

    return ss
def pre_work(
    self,
    task,
):
    self.update_current_task(
        task=task,
    )

    interval = self.worker_config['timeouts']['soft_timeout']
    if interval == 0:
        interval = None

    self.current_timers[threading.get_ident()] = threading.Timer(
        interval=interval,
        function=ctypes.pythonapi.PyThreadState_SetAsyncExc,
        args=(
            ctypes.c_long(threading.get_ident()),
            ctypes.py_object(worker.WorkerSoftTimedout),
        ),
    )
    self.current_timers[threading.get_ident()].start()
def __repr__(self, _repr_running={}):
    'repr as "MIDict(items, names)"'
    call_key = id(self), _get_ident()
    if call_key in _repr_running:  # pragma: no cover
        return '<%s(...)>' % self.__class__.__name__
    _repr_running[call_key] = 1
    try:
        try:
            if self.indices:
                names = force_list(self.indices.keys())
                items = force_list(self.items())
                return '%s(%s, %s)' % (self.__class__.__name__, items, names)
        except AttributeError:  # pragma: no cover
            # may not have attr ``indices`` yet
            pass
        return '%s()' % self.__class__.__name__
    finally:
        del _repr_running[call_key]
def reject_recursive_repeats(to_wrap):
    '''
    Prevent simple cycles by returning None when called recursively with same instance
    '''
    to_wrap.__already_called = {}

    @functools.wraps(to_wrap)
    def wrapped(*args):
        arg_instances = tuple(map(id, args))
        thread_id = threading.get_ident()
        thread_local_args = (thread_id,) + arg_instances
        if thread_local_args in to_wrap.__already_called:
            raise ValueError('Recursively called %s with %r' % (to_wrap, args))
        to_wrap.__already_called[thread_local_args] = True
        try:
            wrapped_val = to_wrap(*args)
        finally:
            del to_wrap.__already_called[thread_local_args]
        return wrapped_val

    return wrapped
def test_pthread_kill_main_thread(self):
    # Test that a signal can be sent to the main thread with pthread_kill()
    # before any other thread has been created (see issue #12392).
    code = """if True:
        import threading
        import signal
        import sys

        def handler(signum, frame):
            sys.exit(3)

        signal.signal(signal.SIGUSR1, handler)
        signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
        sys.exit(2)
    """
    with spawn_python('-c', code) as process:
        stdout, stderr = process.communicate()
        exitcode = process.wait()
        if exitcode != 3:
            raise Exception("Child error (exit code %s): %s"
                            % (exitcode, stdout))
def __init__(self, f, n, wait_before_exit=False):
    """
    Construct a bunch of `n` threads running the same function `f`.
    If `wait_before_exit` is True, the threads won't terminate until
    do_finish() is called.
    """
    self.f = f
    self.n = n
    self.started = []
    self.finished = []
    self._can_exit = not wait_before_exit

    def task():
        tid = threading.get_ident()
        self.started.append(tid)
        try:
            f()
        finally:
            self.finished.append(tid)
            while not self._can_exit:
                _wait()

    for i in range(n):
        start_new_thread(task, ())
def task(N, done, done_tasks, errors):
    try:
        # We don't use modulefinder but still import it in order to stress
        # importing of different modules from several threads.
        if len(done_tasks) % 2:
            import modulefinder
            import random
        else:
            import random
            import modulefinder
        # This will fail if random is not completely initialized
        x = random.randrange(1, 3)
    except Exception as e:
        errors.append(e.with_traceback(None))
    finally:
        done_tasks.append(threading.get_ident())
        finished = len(done_tasks) == N
        if finished:
            done.set()

# Create a circular import structure: A -> C -> B -> D -> A
# NOTE: `time` is already loaded and therefore doesn't threaten to deadlock.
def test_thread_state(self):
    # some extra thread-state tests driven via _testcapi
    def target():
        idents = []

        def callback():
            idents.append(threading.get_ident())

        _testcapi._test_thread_state(callback)
        a = b = callback
        time.sleep(1)
        # Check our main thread is in the list exactly 3 times.
        self.assertEqual(idents.count(threading.get_ident()), 3,
                         "Couldn't find main thread correctly in the list")

    target()
    t = threading.Thread(target=target)
    t.start()
    t.join()
def _check_thread(self):
    """Check that the current thread is the thread running the event loop.

    Non-thread-safe methods of this class make this assumption and will
    likely behave incorrectly when the assumption is violated.

    Should only be called when (self._debug == True).  The caller is
    responsible for checking this condition for performance reasons.
    """
    if self._thread_id is None:
        return
    thread_id = threading.get_ident()
    if thread_id != self._thread_id:
        raise RuntimeError(
            "Non-thread-safe operation invoked on an event loop other "
            "than the current one")
def _green_existing_locks():
    """Make locks created before monkey-patching safe.

    RLocks rely on a Lock and on Python 2, if an unpatched Lock blocks, it
    blocks the native thread. We need to replace these with green Locks.

    This was originally noticed in the stdlib logging module."""
    import gc
    import threading
    import eventlet.green.thread
    lock_type = type(threading.Lock())
    rlock_type = type(threading.RLock())
    if sys.version_info[0] >= 3:
        pyrlock_type = type(threading._PyRLock())
    # We're monkey-patching so there can't be any greenlets yet, ergo our thread
    # ID is the only valid owner possible.
    tid = eventlet.green.thread.get_ident()
    for obj in gc.get_objects():
        if isinstance(obj, rlock_type):
            if (sys.version_info[0] == 2 and
                    isinstance(obj._RLock__block, lock_type)):
                _fix_py2_rlock(obj, tid)
            elif (sys.version_info[0] >= 3 and
                    not isinstance(obj, pyrlock_type)):
                _fix_py3_rlock(obj)
def timeout_response() -> chalice.Response:
    """
    Produce a chalice Response object that indicates a timeout.

    Stacktraces for all running threads, other than the current thread, are
    provided in the response object.
    """
    frames = sys._current_frames()
    current_threadid = threading.get_ident()
    trace_dump = {
        thread_id: traceback.format_stack(frame)
        for thread_id, frame in frames.items()
        if thread_id != current_threadid}

    problem = {
        'status': requests.codes.gateway_timeout,
        'code': "timed_out",
        'title': "Timed out processing request.",
        'traces': trace_dump,
    }
    return chalice.Response(
        status_code=problem['status'],
        headers={"Content-Type": "application/problem+json"},
        body=json.dumps(problem),
    )
def test_with_multi_threading():
    test_truncate()

    def task(n):
        print('In thread {}'.format(threading.get_ident()))
        for _ in range(n):
            test_insert_one()

    threads = [threading.Thread(target=task, args=(100,)) for _ in range(50)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    test_query()
def run_forever(self):
    """Run until stop() is called."""
    self._check_closed()
    if self.is_running():
        raise RuntimeError('Event loop is running.')
    self._set_coroutine_wrapper(self._debug)
    self._thread_id = threading.get_ident()
    try:
        while True:
            self._run_once()
            if self._stopping:
                break
    finally:
        self._stopping = False
        self._thread_id = None
        self._set_coroutine_wrapper(False)
def test_main_thread_after_fork(self):
    code = """if 1:
        import os, threading
        pid = os.fork()
        if pid == 0:
            main = threading.main_thread()
            print(main.name)
            print(main.ident == threading.current_thread().ident)
            print(main.ident == threading.get_ident())
        else:
            os.waitpid(pid, 0)
    """
    _, out, err = assert_python_ok("-c", code)
    data = out.decode().replace('\r', '')
    self.assertEqual(err, b"")
    self.assertEqual(data, "MainThread\nTrue\nTrue\n")
def test_main_thread_after_fork_from_nonmain_thread(self):
    code = """if 1:
        import os, threading, sys

        def f():
            pid = os.fork()
            if pid == 0:
                main = threading.main_thread()
                print(main.name)
                print(main.ident == threading.current_thread().ident)
                print(main.ident == threading.get_ident())
                # stdout is fully buffered because not a tty,
                # we have to flush before exit.
                sys.stdout.flush()
            else:
                os.waitpid(pid, 0)

        th = threading.Thread(target=f)
        th.start()
        th.join()
    """
    _, out, err = assert_python_ok("-c", code)
    data = out.decode().replace('\r', '')
    self.assertEqual(err, b"")
    self.assertEqual(data, "Thread-1\nTrue\nTrue\n")
def __init__(self, uri):
    if isinstance(uri, str):
        uri = core.URI(uri)
    elif not isinstance(uri, core.URI):
        raise TypeError("expected Pyro URI")
    self._pyroUri = uri
    self._pyroConnection = None
    self._pyroSerializer = None  # can be set to the name of a serializer to override the global one per-proxy
    self._pyroMethods = set()  # all methods of the remote object, gotten from meta-data
    self._pyroAttrs = set()  # attributes of the remote object, gotten from meta-data
    self._pyroOneway = set()  # oneway-methods of the remote object, gotten from meta-data
    self._pyroSeq = 0  # message sequence number
    self._pyroRawWireResponse = False  # internal switch to enable wire level responses
    self._pyroHandshake = "hello"  # the data object that should be sent in the initial connection handshake message
    self._pyroMaxRetries = config.MAX_RETRIES
    self.__pyroTimeout = config.COMMTIMEOUT
    self.__pyroOwnerThread = get_ident()  # the thread that owns this proxy
    if config.SERIALIZER not in serializers.serializers:
        raise ValueError("unknown serializer configured")
    core.current_context.annotations = {}
    core.current_context.response_annotations = {}
def schedule(self, f, *args, **kwargs):
    """
    Try to acquire connection access lock.  Then call protocol method.
    Return concurrent Future instance you can wait in the other thread.
    """
    self.wait_open()
    # RabbitMQ operations are multiplexed between different AMQP
    # method callbacks.  Final result of the protocol method call
    # will be set inside one of this callbacks.  So other thread
    # will be able to wait unless this event happens in the
    # connection event loop.
    future = Future()
    with self.lock:
        self.process(get_ident(), (f, args, kwargs), future)
    return future
def format_html(fp, exclude=()):
    frames = stackframes()
    fp.write('<!DOCTYPE html>\n')
    fp.write('<html><head><title>{} Traces</title></head><body>\n'.format(len(frames)))
    for thread_id, stack in sorted(frames.items(), key=lambda x: x[0]):
        name = 'Thread {}'.format(thread_id)
        if thread_id == threading.get_ident():
            name += ' (tracing thread)'
        elif thread_id == main_thread.ident:
            name += ' (main)'
        fp.write('<h3>{}</h3>\n'.format(name))
        tbstr = format_stack(stack)
        if pygments:
            formatter = pygments.formatters.HtmlFormatter(full=False, noclasses=True)
            lexer = pygments.lexers.PythonLexer()
            tbstr = pygments.highlight(tbstr, lexer, formatter)
        fp.write(tbstr)
        fp.write('\n')
    fp.write('</body>\n')
def test_async():
    from threading import get_ident
    main = get_ident()

    @on_test.register(async=True)
    def a1():
        assert get_ident() != main

    @on_test.register()
    def a2():
        assert get_ident() == main

    on_test()

    on_test.async = True

    @on_test.register()  # follows the current setting
    def a3():
        assert get_ident() != main

    on_test()
    on_test.async = False
def __init__(self, pool_size):
    assert not self.__class__.POOL, "Can't create more than one: %s" % self.__class__
    self.__class__.POOL = self
    self.active_jobs = set()
    self.pool_size = pool_size
    self.jobs_queue = Queue()

    def work():
        while True:
            func, name, job_id, parent_uuid = self.jobs_queue.get()
            _set_thread_uuid(threading.get_ident(), parent_uuid)
            _logger.debug('Starting job in real thread: %s', name or "<anonymous>")
            func()
            self.active_jobs.remove(job_id)
            _logger.debug('ready for the next job')

    for i in range(self.pool_size):
        name = 'real-thread-%s' % i
        thread = threading.Thread(target=work, name=name, daemon=True)
        thread.start()
def fire_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    This method does not provide a way to retrieve the result and
    is intended for fire-and-forget use. This reduces the
    work involved to fire the function on the loop.
    """
    ident = loop.__dict__.get("_thread_ident")
    if ident is not None and ident == threading.get_ident():
        raise RuntimeError('Cannot be called from within the event loop')

    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required: %s' % coro)

    def callback():
        """Callback to fire coroutine."""
        # pylint: disable=deprecated-method
        ensure_future(coro, loop=loop)

    loop.call_soon_threadsafe(callback)
    return
def run_callback_threadsafe(loop, callback, *args):
    """Submit a callback object to a given event loop.

    Return a concurrent.futures.Future to access the result.
    """
    ident = loop.__dict__.get("_thread_ident")
    if ident is not None and ident == threading.get_ident():
        raise RuntimeError('Cannot be called from within the event loop')

    future = concurrent.futures.Future()

    def run_callback():
        """Run callback and store result."""
        try:
            future.set_result(callback(*args))
        # pylint: disable=broad-except
        except Exception as exc:
            if future.set_running_or_notify_cancel():
                future.set_exception(exc)
            else:
                _LOGGER.warning("Exception on lost future: ", exc_info=True)

    loop.call_soon_threadsafe(run_callback)
    return future
def game_loop(asyncio_loop):
    print("Game loop thread id {}".format(threading.get_ident()))

    # a coroutine to run in main thread
    async def notify():
        print("Notify thread id {}".format(threading.get_ident()))
        await tick.acquire()
        tick.notify_all()
        tick.release()

    while 1:
        task = asyncio.run_coroutine_threadsafe(notify(), asyncio_loop)
        # blocking the thread
        sleep(1)
        # make sure the task has finished
        task.result()
def get_prelogin_bytes(use_ssl, instance_name):
    instance_name = instance_name.encode('ascii') + b'\00'
    pos = 26
    # version
    buf = b'\x00' + _bint_to_2bytes(pos) + _bint_to_2bytes(6)
    pos += 6
    # encryption
    buf += b'\x01' + _bint_to_2bytes(pos) + _bint_to_2bytes(1)
    pos += 1
    # instance name
    buf += b'\x02' + _bint_to_2bytes(pos) + _bint_to_2bytes(len(instance_name))
    pos += len(instance_name)
    # thread id
    buf += b'\x03' + _bint_to_2bytes(pos) + _bint_to_2bytes(4)
    pos += 4
    # MARS
    buf += b'\x04' + _bint_to_2bytes(pos) + _bint_to_2bytes(1)
    pos += 1
    # terminator
    buf += b'\xff'

    assert len(buf) == 26

    buf += _bin_version + _bint_to_2bytes(0)
    if use_ssl is None:
        buf += b'\x03'  # ENCRYPT_REQ
    elif use_ssl:
        buf += b'\x01'  # ENCRYPT_ON
    else:
        buf += b'\x02'  # ENCRYPT_NOT_SUP
    buf += instance_name
    buf += _bint_to_4bytes(threading.get_ident())  # thread id
    buf += b'\x00'  # not use MARS

    return buf
def test_run_in_executor(self):
    def run(arg):
        return (arg, threading.get_ident())

    f2 = self.loop.run_in_executor(None, run, 'yo')
    res, thread_id = self.loop.run_until_complete(f2)
    self.assertEqual(res, 'yo')
    self.assertNotEqual(thread_id, threading.get_ident())
def test_wrap_future(self):
    def run(arg):
        return (arg, threading.get_ident())

    ex = concurrent.futures.ThreadPoolExecutor(1)
    f1 = ex.submit(run, 'oi')
    f2 = asyncio.wrap_future(f1, loop=self.loop)
    res, ident = self.loop.run_until_complete(f2)
    self.assertIsInstance(f2, asyncio.Future)
    self.assertEqual(res, 'oi')
    self.assertNotEqual(ident, threading.get_ident())
def test_wrap_future_use_global_loop(self, m_events):
    def run(arg):
        return (arg, threading.get_ident())

    ex = concurrent.futures.ThreadPoolExecutor(1)
    f1 = ex.submit(run, 'oi')
    f2 = asyncio.wrap_future(f1)
    self.assertIs(m_events.get_event_loop.return_value, f2._loop)
def get_current_tenant(cls):
    try:
        return cls._threadmap[threading.get_ident()]
    except KeyError:
        return None
def set_tenant(cls, tenant_slug):
    cls._threadmap[threading.get_ident()] = SimpleLazyObject(
        lambda: Tenant.objects.filter(slug=tenant_slug).first())
def clear_tenant(cls):
    del cls._threadmap[threading.get_ident()]
def process_request(self, request):
    request.tenant = SimpleLazyObject(lambda: get_tenant(request))
    self._threadmap[threading.get_ident()] = request.tenant
    return request
def process_exception(self, request, exception):
    try:
        del self._threadmap[threading.get_ident()]
    except KeyError:
        pass
def _on_read_thread(self):
    return self._recv_thread is not None and \
        threading.get_ident() == self._recv_thread.ident
def graph(self):
    # `graph` needs to be different for every thread, because torch.nn.parallel.replicate does
    # not make a copy.
    graph = self._thread_to_graph_mapping.get(threading.get_ident())
    if graph is None:
        creator_thread_graph = self._thread_to_graph_mapping.get(self._creator_thread)
        assert creator_thread_graph is not None
        graph = creator_thread_graph.copy()
        # We don't need to clear payloads because the copy method of NNGraph copies only the
        # graph structure and not the attributes
        self._thread_to_graph_mapping.update({threading.get_ident(): graph})
    return graph
def graph(self, value):
    assert_(isinstance(value, NNGraph), exception_type=TypeError)
    self._thread_to_graph_mapping.update({threading.get_ident(): value})
def log(s):
    """
    Central log fn, lets modify logging behavior in a single place.
    """
    print("[asset-flinger, %5i] - %s" % (threading.get_ident(), s))
def signal_raise(signum):
    # Use pthread_kill to make sure we're actually using the wakeup fd on
    # Unix
    signal.pthread_kill(threading.get_ident(), signum)
def current_task(
    self,
):
    return self.current_tasks[threading.get_ident()]