我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 _thread.get_ident()。
def recursive_repr(fillvalue='...'):
    """Build a decorator that stops a repr-style method from recursing forever.

    While the decorated method is already running for a given object on the
    current thread, a nested call for that same object returns *fillvalue*
    instead of calling the method again.

    Args:
        fillvalue: Value returned in place of the nested call.

    Returns:
        A decorator that accepts the one-argument method to protect.
    """

    def decorating_function(user_function):
        # (object id, thread id) pairs whose call is currently in progress.
        repr_running = set()

        def wrapper(self):
            key = id(self), get_ident()
            if key in repr_running:
                return fillvalue
            repr_running.add(key)
            try:
                result = user_function(self)
            finally:
                # Clear the marker even when user_function raises.
                repr_running.discard(key)
            return result

        # Can't use functools.wraps() here because of bootstrap issues
        wrapper.__module__ = getattr(user_function, '__module__')
        wrapper.__doc__ = getattr(user_function, '__doc__')
        wrapper.__name__ = getattr(user_function, '__name__')
        # Copy the qualified name as well, so introspection does not report
        # "...<locals>.wrapper"; skipped when the callable does not have one.
        qualname = getattr(user_function, '__qualname__', None)
        if qualname is not None:
            wrapper.__qualname__ = qualname
        wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
        return wrapper

    return decorating_function
def _recursive_repr(fillvalue='...'):
    """Return a decorator that short-circuits recursive calls of a repr method.

    A nested call for the same object on the same thread returns *fillvalue*
    rather than invoking the decorated method again.

    Args:
        fillvalue: Value handed back for the nested call.

    Returns:
        A decorator for a method taking only ``self``.
    """

    def decorating_function(user_function):
        # Tracks (id(obj), thread ident) for every call still on the stack.
        repr_running = set()

        def wrapper(self):
            key = id(self), get_ident()
            if key in repr_running:
                return fillvalue
            repr_running.add(key)
            try:
                result = user_function(self)
            finally:
                # Runs on both normal return and exception.
                repr_running.discard(key)
            return result

        # Can't use functools.wraps() here because of bootstrap issues
        wrapper.__module__ = getattr(user_function, '__module__')
        wrapper.__doc__ = getattr(user_function, '__doc__')
        wrapper.__name__ = getattr(user_function, '__name__')
        # Preserve the qualified name too; without it the wrapper is reported
        # as "...<locals>.wrapper". Left alone if the callable has none.
        qualname = getattr(user_function, '__qualname__', None)
        if qualname is not None:
            wrapper.__qualname__ = qualname
        wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
        return wrapper

    return decorating_function
def remind(self):
    """Send reminders one after another until there is nothing left to do.

    Each pass asks ``find_next()`` for the next due time and chat. A due time
    of -1 ends the loop. An overdue reminder is reported immediately with an
    apology; otherwise the method sleeps until the due time and sends the
    reminder only if ``self.remindthread`` still matches the calling thread,
    returning silently if it does not. After sending, the entry is removed
    from ``self.reminders[chat]`` and ``save_reminders(chat)`` is called.
    """
    while True:
        self.lowest_time, remind_for, chat = self.find_next()
        if self.lowest_time == -1:
            # no reminder left
            return

        delay = self.lowest_time - time.time()
        if delay <= 0:
            template = "Should have reminded you for \"`{}`\" on {}, sorry..."
        else:
            time.sleep(delay)
            if self.remindthread != thread.get_ident():
                # other thread took over...
                return
            template = "Reminder for \"`{}`\". It is now {}."

        due_key = str(self.lowest_time)
        subjects = "`\" and \"`".join(self.reminders[chat][due_key])
        stamp = time.strftime("%d.%m.%Y - %H:%M:%S",
                              time.localtime(self.lowest_time))
        self.bot.sendMessage(chat, template.format(subjects, stamp),
                             parse_mode="Markdown")
        self.reminders[chat].pop(due_key)
        self.save_reminders(chat)
def recursive_repr(func):
    """Wrap a repr-style method so a nested call on the same object yields '...'.

    The guard is keyed on the object's id together with the current thread
    ident, so it only triggers for re-entry on the same thread.
    """
    active = set()

    def guarded(self):
        marker = (id(self), get_ident())
        if marker in active:
            return '...'
        active.add(marker)
        try:
            return func(self)
        finally:
            # Executed on return and on exception alike.
            active.discard(marker)

    return wraps(func)(guarded)
def __init__(self, loop=None, default=None):
    """Initialise the greenlet and attach an event loop to it.

    Args:
        loop: Either an object with a ``run`` attribute, which is used as the
            loop directly, or a value passed as ``flags`` to the loop class
            (``self.backend`` is used when it is None).
        default: Passed as ``default`` to the loop class. When left as None
            and the calling thread is not ``MAIN_THREAD``, False is used.

    Raises:
        TypeError: If *loop* has a ``run`` attribute and *default* is given.
    """
    greenlet.__init__(self)
    if not hasattr(loop, 'run'):
        # No ready-made loop was supplied, so one is constructed here.
        if default is None and get_ident() != MAIN_THREAD:
            default = False
        loop_factory = _import(self.loop_class)
        flags = self.backend if loop is None else loop
        self.loop = loop_factory(flags=flags, default=default)
    elif default is None:
        self.loop = loop
    else:
        raise TypeError("Unexpected argument: default")
    self._resolver = None
    self._threadpool = None
    self.format_context = _import(self.format_context)
def __init__(self, loop=None, default=None):
    """Initialise the greenlet and pick the event loop it will use.

    The loop is chosen in this order: a *loop* argument that has a ``run``
    attribute; otherwise ``_threadlocal.loop`` when it is not None; otherwise
    a new instance of ``self.loop_class``.

    Args:
        loop: A loop object (anything with ``run``) or a value passed as
            ``flags`` to the loop class; None selects ``self.backend``.
        default: Passed as ``default`` to the loop class. When left as None
            and the calling thread is not ``MAIN_THREAD``, False is used.

    Raises:
        TypeError: If *loop* has a ``run`` attribute and *default* is given.
    """
    greenlet.__init__(self)
    caller_supplied = hasattr(loop, 'run')
    if caller_supplied and default is not None:
        raise TypeError("Unexpected argument: default")

    if caller_supplied:
        self.loop = loop
    elif _threadlocal.loop is not None:
        # Reuse a loop instance previously set by destroying a hub
        # without destroying the associated loop. See #237 and #238.
        self.loop = _threadlocal.loop
    else:
        if default is None and get_ident() != MAIN_THREAD:
            default = False
        loop_factory = _import(self.loop_class)
        flags = self.backend if loop is None else loop
        self.loop = loop_factory(flags=flags, default=default)

    self._resolver = None
    self._threadpool = None
    self.format_context = _import(self.format_context)
def __init__(self, f, n, wait_before_exit=False):
    """Start `n` threads that each run the callable `f`.

    Every thread appends its ident to ``self.started`` before calling `f`
    and to ``self.finished`` afterwards, even if `f` raises.

    If `wait_before_exit` is True, each thread then keeps calling ``_wait()``
    until ``self._can_exit`` becomes true instead of terminating.
    """
    self.f, self.n = f, n
    self.started, self.finished = [], []
    self._can_exit = not wait_before_exit

    def task():
        ident = get_ident()
        self.started.append(ident)
        try:
            f()
        finally:
            self.finished.append(ident)
            # Hold the thread open until it is allowed to exit.
            while not self._can_exit:
                _wait()

    for _ in range(n):
        start_new_thread(task, ())
def release(self):
    """Give up one level of ownership of the lock.

    The recursion count is decremented. When it reaches zero the owner is
    cleared and the underlying ``_block`` lock is released. While the count
    is still nonzero the lock stays owned by the calling thread.

    Raises:
        RuntimeError: If the calling thread is not the current owner.

    There is no return value.
    """
    if get_ident() != self._owner:
        raise RuntimeError("cannot release un-acquired lock")
    remaining = self._count - 1
    self._count = remaining
    if remaining:
        # Still held recursively by this thread.
        return
    self._owner = None
    self._block.release()
def recursive_repr(func):
    """Decorate a repr-style method so that re-entrant calls return '...'.

    Re-entry is detected per object and per thread.
    """
    in_progress = set()

    @wraps(func)
    def wrapper(self):
        """Call *func* unless it is already running for this object and thread."""
        token = (id(self), get_ident())
        if token not in in_progress:
            in_progress.add(token)
            try:
                return func(self)
            finally:
                in_progress.discard(token)
        return '...'

    return wrapper
def recursive_repr(fillvalue='...'):
    """Create a decorator that returns *fillvalue* for a recursive repr call.

    The decorated method is tracked per object and per thread. A nested call
    made while the outer call is still running gets *fillvalue* back.
    """

    def decorating_function(user_function):
        pending = set()

        def wrapper(self):
            marker = (id(self), get_ident())
            if marker in pending:
                return fillvalue
            pending.add(marker)
            try:
                return user_function(self)
            finally:
                pending.discard(marker)

        # Can't use functools.wraps() here because of bootstrap issues
        for attr in ('__module__', '__doc__', '__name__'):
            setattr(wrapper, attr, getattr(user_function, attr))
        return wrapper

    return decorating_function
def recursive_repr(fillvalue='...'):
    """Create a decorator that returns *fillvalue* for a recursive repr call.

    Re-entry is detected per object and per thread. The wrapper takes over
    the wrapped function's module, docstring, name, qualified name and
    annotations.
    """

    def decorating_function(user_function):
        pending = set()

        def wrapper(self):
            marker = (id(self), get_ident())
            if marker in pending:
                return fillvalue
            pending.add(marker)
            try:
                return user_function(self)
            finally:
                pending.discard(marker)

        # Can't use functools.wraps() here because of bootstrap issues
        for attr in ('__module__', '__doc__', '__name__', '__qualname__'):
            setattr(wrapper, attr, getattr(user_function, attr))
        wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
        return wrapper

    return decorating_function
def get_ident():
    """Stand-in for ``_thread.get_ident()`` that always returns -1.

    According to the original note, this module is only meant to be used
    when the real thread module is unavailable, in which case the process
    has a single thread and one constant identifier is enough.
    """
    return -1
def __init__(self):
    """Set up an empty ``__storage__`` dict and the ``__ident_func__`` hook.

    Both attributes are written with ``object.__setattr__`` directly rather
    than through normal attribute assignment on ``self``.
    """
    assign = object.__setattr__
    assign(self, '__storage__', {})
    assign(self, '__ident_func__', get_ident)
def get_ident(self):
    """Return the identifier of the current context.

    The value comes from calling ``self.ident_func`` with no arguments.
    According to the original documentation, this method is not meant to be
    overridden. It can instead be used to tie other context-local objects
    (for example SQLAlchemy scoped sessions) to the same context.

    .. versionchanged:: 0.7
       A different ident function can be passed to the local manager, which
       then propagates it to all locals given to the constructor.
    """
    ident_func = self.ident_func
    return ident_func()
def __enter__(self):
    """Flag the calling thread as sending, then take ``send_lock``."""
    # mark that we're about to do socket I/O so we won't deliver
    # debug events when we're debugging the standard library
    sender = get_thread_from_id(thread.get_ident())
    if sender is not None:
        sender.is_sending = True
    send_lock.acquire()
def __exit__(self, exc_type, exc_value, tb):
    """Release ``send_lock`` and clear the calling thread's sending flag.

    If the ``with`` body raised, threads and process are detached and True
    is returned so the exception is suppressed. Otherwise None is returned.
    """
    send_lock.release()

    # start sending debug events again
    sender = get_thread_from_id(thread.get_ident())
    if sender is not None:
        sender.is_sending = False

    if exc_type is None:
        return None

    detach_threads()
    detach_process()
    # swallow the exception, we're no longer debugging
    return True
def __init__(self, id = None):
    """Initialise per-thread debugger state.

    Args:
        id: Thread identifier to record; when None, the ident of the calling
            thread is used.
    """
    self.id = thread.get_ident() if id is None else id

    # Map each trace event name to the matching ``handle_<event>`` method.
    event_names = ('call', 'line', 'return', 'exception',
                   'c_call', 'c_return', 'c_exception')
    self._events = dict(
        (event, getattr(self, 'handle_' + event)) for event in event_names
    )

    self.cur_frame = None
    self.stepping = STEPPING_NONE
    self.unblock_work = None

    # _block_lock starts out held.
    self._block_lock = thread.allocate_lock()
    self._block_lock.acquire()
    self._block_starting_lock = thread.allocate_lock()

    self._is_blocked = False
    self._is_working = False
    self.stopped_on_line = None
    self.detach = False

    # replace self.trace_func w/ a bound method so we don't need to
    # re-create these regularly
    self.trace_func = self.trace_func
    self.prev_trace_func = None
    self.trace_func_stack = []

    self.reported_process_loaded = False
    self.django_stepping = None
    self.is_sending = False

    # stackless changes
    if stackless is not None:
        self._stackless_attach()

    if sys.platform == 'cli':
        self.frames = []
def block(self, block_lambda, keep_stopped_on_line = False):
    """Block the calling thread until the debugger lets it continue.

    Args:
        block_lambda: Callable invoked with no arguments while
            ``_block_starting_lock`` is held, right after the thread is
            marked as blocked.
        keep_stopped_on_line: When false, ``self.stopped_on_line`` is set
            to the current frame's line number before blocking.
    """
    assert not self._is_blocked
    #assert self.id == thread.get_ident(), 'wrong thread identity' + str(self.id) + ' ' + str(thread.get_ident())    # we should only ever block ourselves

    # send thread frames before we block
    self.enum_thread_frames_locally()

    if not keep_stopped_on_line:
        self.stopped_on_line = self.cur_frame.f_lineno

    # need to synchronize w/ sending the reason we're blocking
    self._block_starting_lock.acquire()
    self._is_blocked = True
    block_lambda()
    self._block_starting_lock.release()

    while not DETACHED:
        # NOTE(review): presumably _block_lock is already held at this point,
        # so this acquire waits until another thread releases it - confirm.
        self._block_lock.acquire()
        if self.unblock_work is None:
            break

        # the debugger wants us to do something, do it, and then block again
        self._is_working = True
        self.unblock_work()
        self.unblock_work = None
        self._is_working = False

    # Clear the blocked flag under the same lock that guarded setting it.
    self._block_starting_lock.acquire()
    assert self._is_blocked
    self._is_blocked = False
    self._block_starting_lock.release()
def unblock(self):
    """Release ``_block_lock`` so this blocked thread can continue.

    Asserts that the thread is currently marked as blocked and that the
    caller is a different thread from the one identified by ``self.id``.
    """
    assert self._is_blocked
    caller = thread.get_ident()
    assert self.id != caller    # only someone else should unblock us
    self._block_lock.release()
def write(self, data):
    """Send *data* over the debugger connection and write it to ``self.buffer``.

    While ``DETACHED`` is false, the decoded text is sent on ``conn`` as an
    ``OUTP`` message tagged with the calling thread's ident.
    """
    # NOTE(review): the nesting below was reconstructed from a listing with
    # collapsed whitespace; confirm that self.buffer.write(data) is meant to
    # run even when DETACHED is true.
    if not DETACHED:
        probe_stack(3)
        # decode() returns a tuple; its first element is used as the text
        str_data = utf_8.decode(data)[0]
        # The three writes form one message and go out under _SendLockCtx.
        with _SendLockCtx:
            write_bytes(conn, OUTP)
            write_int(conn, thread.get_ident())
            write_string(conn, str_data)
    self.buffer.write(data)
def break_into_debugger():
    """Request a break in every thread, with the calling thread as the active one.

    Does nothing while ``vspd.DETACHED`` is true, i.e. when no PTVS remote
    debugger is attached. Otherwise the calling thread's ident is stored in
    ``vspd.SEND_BREAK_COMPLETE`` and all threads are marked for break.
    """
    if vspd.DETACHED:
        return
    vspd.SEND_BREAK_COMPLETE = thread.get_ident()
    vspd.mark_all_threads_for_break()