我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用weakref.WeakValueDictionary()。
def __init__(self, session): super(Connection, self).__init__() self._connectionStatus = ConnectionState.LoggedOut spotifyconnect._connection_instance = self self._cache = weakref.WeakValueDictionary() self._emitters = [] self._callback_handles = set() spotifyconnect.Error.maybe_raise( lib.SpRegisterConnectionCallbacks( _ConnectionCallbacks.get_struct(), session)) spotifyconnect.Error.maybe_raise( lib.SpRegisterDebugCallbacks( _DebugCallbacks.get_struct(), session))
def __init__(self, size_limit=100, weak_type='value'): """ Parameters ---------- size_limit : int integer that defines the size of the LRU cache. Default is 100. """ super(WeakLRUCache, self).__init__() self._size_limit = size_limit self.weak_type = weak_type if weak_type == 'value': self._weak_cache = weakref.WeakValueDictionary() elif weak_type == 'key': self._weak_cache = weakref.WeakKeyDictionary() else: raise ValueError("weak_type must be either 'key' or 'value'") self._cache = OrderedDict()
def test_deepcopy_weakvaluedict(self): class C(object): def __init__(self, i): self.i = i a, b, c, d = [C(i) for i in range(4)] u = weakref.WeakValueDictionary() u[a] = b u[c] = d # Keys are copied, values aren't v = copy.deepcopy(u) self.assertNotEqual(v, u) self.assertEqual(len(v), 2) (x, y), (z, t) = sorted(v.items(), key=lambda pair: pair[0].i) self.assertFalse(x is a) self.assertEqual(x.i, a.i) self.assertTrue(y is b) self.assertFalse(z is c) self.assertEqual(z.i, c.i) self.assertTrue(t is d) del x, y, z, t del d support.gc_collect() self.assertEqual(len(v), 1)
def test_deepcopy_weakvaluedict(self): class C(object): def __init__(self, i): self.i = i a, b, c, d = [C(i) for i in xrange(4)] u = weakref.WeakValueDictionary() u[a] = b u[c] = d # Keys are copied, values aren't v = copy.deepcopy(u) self.assertNotEqual(v, u) self.assertEqual(len(v), 2) (x, y), (z, t) = sorted(v.items(), key=lambda pair: pair[0].i) self.assertFalse(x is a) self.assertEqual(x.i, a.i) self.assertTrue(y is b) self.assertFalse(z is c) self.assertEqual(z.i, c.i) self.assertTrue(t is d) del x, y, z, t del d self.assertEqual(len(v), 1)
def test_weak_valued_dict_update(self): self.check_update(weakref.WeakValueDictionary, {1: C(), 'a': C(), C(): C()}) # errors self.assertRaises(TypeError, weakref.WeakValueDictionary.update) d = weakref.WeakValueDictionary() self.assertRaises(TypeError, d.update, {}, {}) self.assertRaises(TypeError, d.update, (), ()) self.assertEqual(list(d.keys()), []) # special keyword arguments o = Object(3) for kw in 'self', 'dict', 'other', 'iterable': d = weakref.WeakValueDictionary() d.update(**{kw: o}) self.assertEqual(list(d.keys()), [kw]) self.assertEqual(d[kw], o)
def __init__(self, token: str, *, bot: bool = True, max_connections: int = 10): #: The token used for all requests. self.token = token # Calculated headers headers = { "User-Agent": self.USER_AGENT, "Authorization": "{}{}".format("Bot " if bot else "", self.token) } self.session = asks.Session(base_location=Endpoints.BASE, endpoint=Endpoints.API_BASE, connections=max_connections) self.headers = headers #: The global ratelimit lock. self.global_lock = multio.Lock() self._rate_limits = weakref.WeakValueDictionary() self._ratelimit_remaining = lru(1024) self._is_bot = bot
def __init__(self, dirpath, **kwargs): self.dirpath = dirpath self.dirpath = os.path.abspath(self.dirpath) self.dirpath = os.path.normpath(self.dirpath) self.source = self.create_com_object() self.filter = self.create_com_object_filter() try: self.source.OpenDataFile(self.dirpath) except comtypes.COMError as err: raise IOError(str(err)) self._TIC = self.source.GetTIC() self.device = self._TIC.DeviceName self._n_spectra = self._TIC.TotalDataPoints self._scan_types_flags = self.source.MSScanFileInformation.ScanTypes self._producer = self._scan_group_iterator() self._scan_cache = WeakValueDictionary() self._index = self._pack_index() self._get_instrument_info()
def __init__(self, client, config): self.client = client self.config = config self.ready = Event() self.guilds_waiting_sync = 0 self.me = None self.dms = HashMap() self.guilds = HashMap() self.channels = HashMap(weakref.WeakValueDictionary()) self.users = HashMap(weakref.WeakValueDictionary()) self.voice_states = HashMap(weakref.WeakValueDictionary()) # If message tracking is enabled, listen to those events if self.config.track_messages: self.messages = DefaultHashMap(lambda: deque(maxlen=self.config.track_messages_size)) self.EVENTS += ['MessageDelete', 'MessageDeleteBulk'] # The bound listener objects self.listeners = [] self.bind()
def test_deepcopy_weakvaluedict(self): class C(object): def __init__(self, i): self.i = i a, b, c, d = [C(i) for i in range(4)] u = weakref.WeakValueDictionary() u[a] = b u[c] = d # Keys are copied, values aren't v = copy.deepcopy(u) self.assertNotEqual(v, u) self.assertEqual(len(v), 2) (x, y), (z, t) = sorted(v.items(), key=lambda pair: pair[0].i) self.assertIsNot(x, a) self.assertEqual(x.i, a.i) self.assertIs(y, b) self.assertIsNot(z, c) self.assertEqual(z.i, c.i) self.assertIs(t, d) del x, y, z, t del d self.assertEqual(len(v), 1)
def stop(self): """Stops all child threads and activities and closes associated sockets. Re-initializes this activity to be able to start again. Raise `ActivityException` if activity is not currently started. """ if not self.started: raise ActivityException(desc='Cannot call stop when activity is ' 'not started or has been stopped already.') LOG.debug('Stopping activity %s.', self.name) self._stop_timers() self._stop_child_activities() self._stop_child_threads() self._close_asso_sockets() # Setup activity for start again. self._started = False self._asso_socket_map = weakref.WeakValueDictionary() self._child_activity_map = weakref.WeakValueDictionary() self._child_thread_map = weakref.WeakValueDictionary() self._timers = weakref.WeakValueDictionary() LOG.debug('Stopping activity %s finished.', self.name)
def __init__(self, gen=None): super().__init__() if gen is None: def gen(): yield self._check_on_close = False else: self._check_on_close = True self._gen = gen() next(self._gen) self._time = 0 self._clock_resolution = 1e-9 self._timers = [] self._selector = TestSelector() self.readers = {} self.writers = {} self.reset_counters() self._transports = weakref.WeakValueDictionary()
def test_deepcopy_weakvaluedict(self): class C(object): def __init__(self, i): self.i = i a, b, c, d = [C(i) for i in xrange(4)] u = weakref.WeakValueDictionary() u[a] = b u[c] = d # Keys are copied, values aren't v = copy.deepcopy(u) self.assertNotEqual(v, u) self.assertEqual(len(v), 2) (x, y), (z, t) = sorted(v.items(), key=lambda pair: pair[0].i) self.assertFalse(x is a) self.assertEqual(x.i, a.i) self.assertTrue(y is b) self.assertFalse(z is c) self.assertEqual(z.i, c.i) self.assertTrue(t is d) del x, y, z, t del d test_support.gc_collect() self.assertEqual(len(v), 1)
def reset(self): "Reset the state." self.offset = 0 self.current = {} self.previous = {} self.shared = {} self.callback = weakref.WeakValueDictionary()
def __init__( self, loop=None, max_frame_size=None, bakery_client=None, jujudata=None, ): """Instantiate a new Model. The connect method will need to be called before this object can be used for anything interesting. If jujudata is None, jujudata.FileJujuData will be used. :param loop: an asyncio event loop :param max_frame_size: See `juju.client.connection.Connection.MAX_FRAME_SIZE` :param bakery_client httpbakery.Client: The bakery client to use for macaroon authorization. :param jujudata JujuData: The source for current controller information. """ self._connector = connector.Connector( loop=loop, max_frame_size=max_frame_size, bakery_client=bakery_client, jujudata=jujudata, ) self._observers = weakref.WeakValueDictionary() self.state = ModelState(self) self._info = None self._watch_stopping = asyncio.Event(loop=self._connector.loop) self._watch_stopped = asyncio.Event(loop=self._connector.loop) self._watch_received = asyncio.Event(loop=self._connector.loop) self._watch_stopped.set() self._charmstore = CharmStore(self._connector.loop)
def test_cache_create(self): self.assertIsNone(self.plot._cache) cache = self.plot.cache self.assertIsInstance(cache, WeakValueDictionary) self.assertEqual(cache, {})
def test_bad_cache_setter(self): emsg = ("Require cache to be a 'WeakValueDictionary', " "got 'dict'") with self.assertRaisesRegexp(TypeError, emsg): self.plot.cache = dict()
def factory_constructed(class_): cache = WeakValueDictionary() def factory(*args, **kwargs): key = (args, frozenset(kwargs.items())) instance = cache.get(key) if instance is not None: return instance instance = class_(*args, **kwargs) cache[key] = instance return instance factory.type = class_ return factory
def __init__(self, temporary=False, home=None): Container.__init__(self, self) QtGui.QWidget.__init__(self) DockDrop.__init__(self, allowedAreas=['left', 'right', 'top', 'bottom']) self.layout = QtGui.QVBoxLayout() self.layout.setContentsMargins(0,0,0,0) self.layout.setSpacing(0) self.setLayout(self.layout) self.docks = weakref.WeakValueDictionary() self.topContainer = None self.raiseOverlay() self.temporary = temporary self.tempAreas = [] self.home = home
def __init__(self): # symbol key : QRect(...) coordinates where symbol can be found in atlas. # note that the coordinate list will always be the same list object as # long as the symbol is in the atlas, but the coordinates may # change if the atlas is rebuilt. # weak value; if all external refs to this list disappear, # the symbol will be forgotten. self.symbolMap = weakref.WeakValueDictionary() self.atlasData = None # numpy array of atlas image self.atlas = None # atlas as QPixmap self.atlasValid = False self.max_width=0
def __init__(self): self.objs = weakref.WeakValueDictionary() self.allNames = []
def __init__(self, moveEvent, acceptable): self.enter = False self.acceptable = acceptable self.exit = False self.__clickItems = weakref.WeakValueDictionary() self.__dragItems = weakref.WeakValueDictionary() self.currentItem = None if moveEvent is not None: self._scenePos = moveEvent.scenePos() self._screenPos = moveEvent.screenPos() self._lastScenePos = moveEvent.lastScenePos() self._lastScreenPos = moveEvent.lastScreenPos() self._buttons = moveEvent.buttons() self._modifiers = moveEvent.modifiers() else: self.exit = True
def __init__(self): if CanvasManager.SINGLETON is not None: raise Exception("Can only create one canvas manager.") CanvasManager.SINGLETON = self QtCore.QObject.__init__(self) self.canvases = weakref.WeakValueDictionary()
def prune(self): """prune unreferenced, non-dirty states.""" ref_count = len(self) dirty = [s.obj() for s in self.all_states() if s.modified] # work around http://bugs.python.org/issue6149 keepers = weakref.WeakValueDictionary() keepers.update(self) self._dict.clear() self._dict.update(keepers) self.modified = bool(dirty) return ref_count - len(self)
def __init__(self): self.__slots = WeakValueDictionary()
def __new__(typ, *args, **kw): #check the class has an __instances__ dict, if not, #create it and initialize __instance_id. try: typ.__instances__ except AttributeError: typ.__instance_id = 0 typ.__instances__ = weakref.WeakValueDictionary() obj = object.__new__(typ, *args, **kw) obj.id = typ.__instance_id typ.__instances__[typ.__instance_id] = obj typ.__instance_id += 1 return obj
def __init__(self, function, capacity=50): self.cache = weakref.WeakValueDictionary() self.next = self.previous = self self.function = function self.capacity = capacity
def observable__getattribute__(f): """ decoratore per Observable.__getattribute__""" # https://bugs.python.org/issue3445 # from functools import wraps # @wraps(f) def wrapper(*args, **kwargs): instance, attribute = args[:2] result = f(*args, **kwargs) if attribute == '_Observable__observers' and result is None: result = weakref.WeakValueDictionary() setattr(instance, '_Observable__observers', result) return result return wrapper
def global_cache(srctype, ffi, funcname, *args, **kwds): key = kwds.pop('key', (funcname, args)) assert not kwds try: return ffi._backend.__typecache[key] except KeyError: pass except AttributeError: # initialize the __typecache attribute, either at the module level # if ffi._backend is a module, or at the class level if ffi._backend # is some instance. if isinstance(ffi._backend, types.ModuleType): ffi._backend.__typecache = weakref.WeakValueDictionary() else: type(ffi._backend).__typecache = weakref.WeakValueDictionary() try: res = getattr(ffi._backend, funcname)(*args) except NotImplementedError as e: raise NotImplementedError("%s: %r: %s" % (funcname, srctype, e)) # note that setdefault() on WeakValueDictionary is not atomic # and contains a rare bug (http://bugs.python.org/issue19542); # we have to use a lock and do it ourselves cache = ffi._backend.__typecache with global_lock: res1 = cache.get(key) if res1 is None: cache[key] = res return res else: return res1
def getReference(self, obj): return weakref.WeakValueDictionary(obj)
def __init__(self, root_name, int_starts_with): # dict of {'id': obj } for items to be evaluated eventually. self.object_to_eval = weakref.WeakValueDictionary() self.evaluated_items = {} # dict of {'path': evaluated object} self.root_name = root_name self.int_regex = re.compile('^{}([\d]+)$'.format(int_starts_with))
def _load_wrapper(self): self._threadLock.acquire() paths_to_eval = tuple( set(i._item_key for i in self._registry.object_to_eval.values())) self._registry.object_to_eval = weakref.WeakValueDictionary() new_items = self.load(paths_to_eval) if isinstance(new_items, MutableMapping): self._registry.evaluated_items.update(new_items) self._threadLock.release() else: self._threadLock.release() raise Exception( "load method needs to return a dictionary of {path: value}")
def __init__(self, rowClasses): """ Initialize me against a database. @param rowClasses: a list of row class objects that describe the database schema. """ self.rowCache = weakref.WeakValueDictionary() # does not hold references to cached rows. self.rowClasses = rowClasses self.schema = {} self._populate()
def __setstate__(self, state): self.__dict__ = state self.rowCache = weakref.WeakValueDictionary() self._populate()
def __init__(self): self.__memo = weakref.WeakValueDictionary()
def cache(func=..., *, key='received'): """Memoize the function using weak references. Once the decorated function has been called with a given signature, it will return the same object for all subsequent calls with that signature, as long as another non-weak reference to the object still exists. >>> class WeakReferenceableList(list): ... pass # Built-in lists can't be weak-referenced >>> @cache ... def say(word, also='lmao'): ... return WeakReferenceableList([word, also]) >>> say('ayy') is say('ayy') True >>> say('ayy') is say('ayy', 'lmao') True >>> say('ayy') is say('ayy', also='lmao') True >>> say('ayy') is say(also='lmao', word='ayy') True """ return memoize(func, cache=weakref.WeakValueDictionary(), key=key)
def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.__cache = weakref.WeakValueDictionary()
def __init__(self, context=None, **kwds): super(DjangoTranslator, self).__init__(**kwds) if context is not None: self.context = context self._django_hasher_cache = weakref.WeakKeyDictionary() self._passlib_hasher_cache = weakref.WeakValueDictionary()
def __init__(self, name, aliases=(), defaults=None): self.name = name self.aliases = aliases #: Maps (src, dst) -> transformation function self.funcs = {} #: Maps defaults variable names to values self.defaults = defaults or {} #: Maps (src, dst) -> self #: Used as a convenience dictionary to be composed by ContextChain self.relation_to_context = weakref.WeakValueDictionary()
def __init__(self, identity_map=None): if identity_map is not None: self.identity_map = identity_map else: self.identity_map = weakref.WeakValueDictionary() self.attributes = global_attributes self.new = util.HashSet(ordered = True) self.dirty = util.HashSet() self.modified_lists = util.HashSet() self.deleted = util.HashSet()
def __init__(self, echo = False, use_threadlocal = True): self._threadconns = weakref.WeakValueDictionary() self._use_threadlocal = use_threadlocal self._echo = echo