我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用threading.local()。
def run(self): # the code until the while statement does NOT run atomicaly # a thread while loop cycle is atomic # thread safe locals: L = threading.local(), then L.foo="baz" django.setup() self.logger.info('Worker Starts') while not self._stopevent.isSet(): if not self.worker_queue.empty(): try: task = self.worker_queue.get() self.run_task(task) except Exception as e: helpers.save_task_failed(task,e) else: helpers.save_task_success(task) self.worker_queue = None self.logger.warn('Worker stopped, %s tasks handled'%self.tasks_counter)
def do_extend(self, additional_time): pipe = self.redis.pipeline() pipe.watch(self.name) lock_value = pipe.get(self.name) if lock_value != self.local.token: raise LockError("Cannot extend a lock that's no longer owned") expiration = pipe.pttl(self.name) if expiration is None or expiration < 0: # Redis evicted the lock key between the previous get() and now # we'll handle this when we call pexpire() expiration = 0 pipe.multi() pipe.pexpire(self.name, expiration + int(additional_time * 1000)) try: response = pipe.execute() except WatchError: # someone else acquired the lock raise LockError("Cannot extend a lock that's no longer owned") if not response[0]: # pexpire returns False if the key doesn't exist raise LockError("Cannot extend a lock that's no longer owned") return True
def __new__(cls, *args, **kw): self = object.__new__(cls) key = '_local__key', 'thread.local.' + str(id(self)) object.__setattr__(self, '_local__key', key) object.__setattr__(self, '_local__args', (args, kw)) object.__setattr__(self, '_local__lock', RLock()) if (args or kw) and (cls.__init__ is object.__init__): raise TypeError("Initialization arguments are not supported") # We need to create the thread dict in anticipation of # __init__ being called, to make sure we don't call it # again ourselves. dict = object.__getattribute__(self, '__dict__') current_thread().__dict__[key] = dict return self
def process_request(self, request): """ Gets the current user from the request and prepares and connects a signal receiver with the user already attached to it. """ # Initialize thread local storage threadlocal.actionslog = { 'signal_duid': (self.__class__, time.time()), 'remote_ip': request.META.get('REMOTE_ADDR'), } # In case of proxy, set 'original' address if request.META.get('HTTP_X_FORWARDED_FOR'): threadlocal.actionslog['remote_ip'] = request.META.get('HTTP_X_FORWARDED_FOR').split(',')[0] # Connect signal for automatic logging if hasattr(request, 'user') and hasattr(request.user, 'is_authenticated') and request.user.is_authenticated(): set_user = curry(self.set_user, request.user) pre_save.connect(set_user, sender=LogAction, dispatch_uid=threadlocal.actionslog['signal_duid'], weak=False)
def __exit__(self, type, value, traceback): try: self.exit(type, value, traceback) finally: final_contexts = _state.contexts _state.contexts = self.old_contexts # Generator coroutines and with-statements with non-local # effects interact badly. Check here for signs of # the stack getting out of sync. # Note that this check comes after restoring _state.context # so that if it fails things are left in a (relatively) # consistent state. if final_contexts is not self.new_contexts: raise StackContextInconsistentError( 'stack_context inconsistency (may be caused by yield ' 'within a "with StackContext" block)') # Break up a reference to itself to allow for faster GC on CPython. self.new_contexts = None
def __init__(self, parent, overrides=None, threadsafe=False): self.parent = parent self._data = {} if overrides is None else overrides self._old_data = None # merge self.global_data into self._data if threadsafe: for k, v in six.iteritems(self.parent.global_data): if k not in self._data: # A deepcopy is necessary to avoid using the same # objects in globals as we do in thread local storage. # Otherwise, changing one would automatically affect # the other. self._data[k] = copy.deepcopy(v) else: for k, v in six.iteritems(self.parent.global_data): if k not in self._data: self._data[k] = v
def template_localtime(value, use_tz=None): """ Checks if value is a datetime and converts it to local time if necessary. If use_tz is provided and is not None, that will force the value to be converted (or not), overriding the value of settings.USE_TZ. This function is designed for use by the template engine. """ should_convert = (isinstance(value, datetime) and (settings.USE_TZ if use_tz is None else use_tz) and not is_naive(value) and getattr(value, 'convert_to_local_time', True)) return localtime(value) if should_convert else value # Utilities
def localtime(value, timezone=None): """ Converts an aware datetime.datetime to local time. Local time is defined by the current time zone, unless another time zone is specified. """ if timezone is None: timezone = get_current_timezone() # If `value` is naive, astimezone() will raise a ValueError, # so we don't need to perform a redundant check. value = value.astimezone(timezone) if hasattr(timezone, 'normalize'): # This method is available for pytz time zones. value = timezone.normalize(value) return value
def __addcookies(self): '''Add cookies from self.cookies to request in self.local.h ''' for cname, morsel in self.cookies.iteritems(): attrs = [] value = morsel.get('version', '') if value != '' and value != '0': attrs.append('$Version=%s' % value) attrs.append('%s=%s' % (cname, morsel.coded_value)) value = morsel.get('path') if value: attrs.append('$Path=%s' % value) value = morsel.get('domain') if value: attrs.append('$Domain=%s' % value) self.local.h.putheader('Cookie', "; ".join(attrs))
def Receive(self, replytype, **kw): '''Parse message, create Python object. KeyWord data: faults -- list of WSDL operation.fault typecodes wsaction -- If using WS-Address, must specify Action value we expect to receive. ''' self.ReceiveSOAP(**kw) if self.local.ps.IsAFault(): msg = FaultFromFaultMessage(self.local.ps) raise FaultException(msg) tc = replytype if hasattr(replytype, 'typecode'): tc = replytype.typecode reply = self.local.ps.Parse(tc) if self.address is not None: self.address.checkResponse(self.local.ps, kw.get('wsaction')) return reply
def __parse_child(self, node): '''for rpc-style map each message part to a class in typesmodule ''' try: tc = self.gettypecode(self.typesmodule, node) except: self.logger.debug('didnt find typecode for "%s" in typesmodule: %s', node.localName, self.typesmodule) tc = TC.Any(aslist=1) return tc.parse(node, self.local.ps) self.logger.debug('parse child with typecode : %s', tc) try: return tc.parse(node, self.local.ps) except Exception: self.logger.debug('parse failed try Any : %s', tc) tc = TC.Any(aslist=1) return tc.parse(node, self.local.ps)
def __init__(self, *args, **kwargs): class LocalContexts(threading.local): def __init__(self): super(LocalContexts, self).__init__() self._contexts = [] def append(self, item): self._contexts.append(item) def pop(self): return self._contexts.pop() super(ThreadSafeStackContext, self).__init__(*args, **kwargs) if hasattr(self, 'contexts'): # only patch if context exists self.contexts = LocalContexts()
def load(self): # Apply extra parameters before loading the configs self.register_extra_parameters() globalConfigName = ".dallingerconfig" globalConfig = os.path.expanduser(os.path.join("~/", globalConfigName)) localConfig = os.path.join(os.getcwd(), LOCAL_CONFIG) defaults_folder = os.path.join(os.path.dirname(__file__), "default_configs") local_defaults_file = os.path.join(defaults_folder, "local_config_defaults.txt") global_defaults_file = os.path.join(defaults_folder, "global_config_defaults.txt") # Load the configuration, with local parameters overriding global ones. for config_file in [ global_defaults_file, local_defaults_file, globalConfig, ]: self.load_from_file(config_file) if os.path.exists(localConfig): self.load_from_file(localConfig) self.load_from_environment() self.ready = True
def __init__(self, context, core_src): self._context = context self._present = {'mitogen': [ 'mitogen.ansible', 'mitogen.compat', 'mitogen.compat.pkgutil', 'mitogen.fakessh', 'mitogen.master', 'mitogen.ssh', 'mitogen.sudo', 'mitogen.utils', ]} self.tls = threading.local() self._cache = {} if core_src: self._cache['mitogen.core'] = ( None, 'mitogen/core.py', zlib.compress(core_src), )
def __init__(self, maxusage=None, setsession=None, closeable=False, threadlocal=None, *args, **kwargs): """Set up the persistent PostgreSQL connection generator. maxusage: maximum number of reuses of a single connection (0 or None means unlimited reuse) When this maximum usage number of the connection is reached, the connection is automatically reset (closed and reopened). setsession: optional list of SQL commands that may serve to prepare the session, e.g. ["set datestyle to ...", "set time zone ..."] closeable: if this is set to true, then closing connections will be allowed, but by default this will be silently ignored threadlocal: an optional class for representing thread-local data that will be used instead of our Python implementation (threading.local is faster, but cannot be used in all cases) args, kwargs: the parameters that shall be used to establish the PostgreSQL connections using class PyGreSQL pg.DB() """ self._maxusage = maxusage self._setsession = setsession self._closeable = closeable self._args, self._kwargs = args, kwargs self.thread = (threadlocal or local)()
def wrapping(wrapped): # A decorator to decorate a decorator's wrapper. Following the lead # of Twisted and Monocle, this is supposed to make debugging heavily # decorated code easier. We'll see... # TODO(pcostello): This copies the functionality of functools.wraps # following the patch in http://bugs.python.org/issue3445. We can replace # this once upgrading to python 3.3. def wrapping_wrapper(wrapper): try: wrapper.__wrapped__ = wrapped wrapper.__name__ = wrapped.__name__ wrapper.__doc__ = wrapped.__doc__ wrapper.__dict__.update(wrapped.__dict__) # Local functions won't have __module__ attribute. if hasattr(wrapped, '__module__'): wrapper.__module__ = wrapped.__module__ except Exception: pass return wrapper return wrapping_wrapper # Define a base class for classes that need to be thread-local. # This is pretty subtle; we want to use threading.local if threading # is supported, but object if it is not.
def set_thread_local(var_name, val): if val is None and has_thread_local(var_name): gl_storage = _get_greenlet_local_storage() # Delete variable from greenlet local storage. if gl_storage: del gl_storage[var_name] # Delete the entire greenlet local storage from thread local storage. if gl_storage and len(gl_storage) == 0: del _th_loc_storage.greenlet_locals[corolocal.get_ident()] if val is not None: gl_storage = _get_greenlet_local_storage() if not gl_storage: gl_storage = _th_loc_storage.greenlet_locals[ corolocal.get_ident()] = {} gl_storage[var_name] = val
def __init__(self, threads=4, *args, **kwargs): self.local = threading.local() super(ThreadBaseScheduler, self).__init__(*args, **kwargs) if isinstance(self.taskdb, SQLiteMixin): self.threads = 1 else: self.threads = threads self._taskdb = self.taskdb self._projectdb = self.projectdb self._resultdb = self.resultdb self.thread_objs = [] self.thread_queues = [] self._start_threads() assert len(self.thread_queues) > 0
def __init__(self, username, password): self.username = username self.password = password # Keep state in per-thread local storage self._thread_local = threading.local()
def acquire(self, blocking=None, blocking_timeout=None): """ Use Redis to hold a shared, distributed lock named ``name``. Returns True once the lock is acquired. If ``blocking`` is False, always return immediately. If the lock was acquired, return True, otherwise return False. ``blocking_timeout`` specifies the maximum number of seconds to wait trying to acquire the lock. """ sleep = self.sleep token = b(uuid.uuid1().hex) if blocking is None: blocking = self.blocking if blocking_timeout is None: blocking_timeout = self.blocking_timeout stop_trying_at = None if blocking_timeout is not None: stop_trying_at = mod_time.time() + blocking_timeout while 1: if self.do_acquire(token): self.local.token = token return True if not blocking: return False if stop_trying_at is not None and mod_time.time() > stop_trying_at: return False mod_time.sleep(sleep)
def release(self): "Releases the already acquired lock" expected_token = self.local.token if expected_token is None: raise LockError("Cannot release an unlocked lock") self.local.token = None self.do_release(expected_token)
def extend(self, additional_time): """ Adds more time to an already acquired lock. ``additional_time`` can be specified as an integer or a float, both representing the number of seconds to add. """ if self.local.token is None: raise LockError("Cannot extend an unlocked lock") if self.timeout is None: raise LockError("Cannot extend a lock with no timeout") return self.do_extend(additional_time)
def do_extend(self, additional_time): additional_time = int(additional_time * 1000) if not bool(self.lua_extend(keys=[self.name], args=[self.local.token, additional_time], client=self.redis)): raise LockError("Cannot extend a lock that's no longer owned") return True
def local(self, sys=sys): return sys.modules[__name__]
def getcontext(_local=local): """Returns this thread's context. If this thread does not yet have a context, returns a new context and sets this thread's context. New contexts are copies of DefaultContext. """ try: return _local.__decimal_context__ except AttributeError: context = Context() _local.__decimal_context__ = context return context
def setcontext(context, _local=local): """Set this thread's context to context.""" if context in (DefaultContext, BasicContext, ExtendedContext): context = context.copy() context.clear_flags() _local.__decimal_context__ = context
def localcontext(ctx=None): """Return a context manager for a copy of the supplied context Uses a copy of the current context if no context is specified The returned context manager creates a local decimal context in a with statement: def sin(x): with localcontext() as ctx: ctx.prec += 2 # Rest of sin calculation algorithm # uses a precision 2 greater than normal return +s # Convert result to normal precision def sin(x): with localcontext(ExtendedContext): # Rest of sin calculation algorithm # uses the Extended Context from the # General Decimal Arithmetic Specification return +s # Convert result to normal context >>> setcontext(DefaultContext) >>> print getcontext().prec 28 >>> with localcontext(): ... ctx = getcontext() ... ctx.prec += 2 ... print ctx.prec ... 30 >>> with localcontext(ExtendedContext): ... print getcontext().prec ... 9 >>> print getcontext().prec 28 """ if ctx is None: ctx = getcontext() return _ContextManager(ctx) ##### Decimal class #######################################################
def progress(self, current, total): """Updates progress bar with status information""" if self.refresh_needed(current, total): message = self.format_message(current, total) if self.new_line_needed(current, total): self.console.new_line(message) else: self.console.same_line(message) # Shortcut methods using thread-local ProgressBar and Console instance
def get_credentials(self): """A thread local Credentials object. Returns: A client.Credentials object, or None if credentials hasn't been set in this thread yet, which may happen when calling has_credentials inside oauth_aware. """ return getattr(self._tls, 'credentials', None)
def get_flow(self): """A thread local Flow object. Returns: A credentials.Flow object, or None if the flow hasn't been set in this thread yet, which happens in _create_flow() since Flows are created lazily. """ return getattr(self._tls, 'flow', None)
def get_contexts(cls): # no race-condition here, cls.contexts is a thread-local object # be sure not to override contexts in a subclass however! if not hasattr(cls.contexts, 'stack'): cls.contexts.stack = [] return cls.contexts.stack
def gettext_translate( s ): """ Thread-safe version of _(). We look up the thread-local translation function. """ return catalogs.translate(s) # Inject the _() function in the builtins. You could also inject a N_() # function for noop translation markers.
def __init__(self, audience): self._thread_local_store = threading.local() self.audience = audience self._header_name = 'x-w69b-idtoken'
def __init__(self): super(ThreadIdent, self).__init__() self._local = threading.local() if issue1868: self._lock = threading.Lock() else: self._lock = DummyLock() # We watch for thread-death using a weakref callback to a thread local. # Weakrefs are permitted on subclasses of object but not object() itself.