我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用weakref.WeakKeyDictionary()。
def __init__(self, code=None, namespace=None, process_target=None, use_caching=False): """ Create a new signal. """ if not process_target: process_target = self.process self.process_target = process_target self.receivers = list() self.self_refs = dict() self.lock = threading.Lock() if code: self.code = code else: self.code = self.Meta.code if namespace: self.namespace = namespace else: self.namespace = self.Meta.namespace self.use_caching = use_caching self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {} self._dead_receivers = False
def __init__(self, clickRadius=2, moveDistance=5, parent=None): QtGui.QGraphicsScene.__init__(self, parent) self.setClickRadius(clickRadius) self.setMoveDistance(moveDistance) self.exportDirectory = None self.clickEvents = [] self.dragButtons = [] self.mouseGrabber = None self.dragItem = None self.lastDrag = None self.hoverItems = weakref.WeakKeyDictionary() self.lastHoverEvent = None self.minDragTime = 0.5 # drags shorter than 0.5 sec are interpreted as clicks self.contextMenu = [QtGui.QAction("Export...", self)] self.contextMenu[0].triggered.connect(self.showExportDialog) self.exportDialog = None
def _make_cached_stream_func(src_func, wrapper_func): cache = WeakKeyDictionary() def func(): stream = src_func() try: rv = cache.get(stream) except Exception: rv = None if rv is not None: return rv rv = wrapper_func() try: cache[stream] = rv except Exception: pass return rv return func
def __init__(self, providing_args=None, use_caching=False): """ Create a new signal. providing_args A list of the arguments this signal can pass along in a send() call. """ self.receivers = [] if providing_args is None: providing_args = [] self.providing_args = set(providing_args) self.lock = threading.Lock() self.use_caching = use_caching # For convenience we create empty caches even if they are not used. # A note about caching: if use_caching is defined, then for each # distinct sender we cache the receivers that sender has in # 'sender_receivers_cache'. The cache is cleaned when .connect() or # .disconnect() is called and populated on send(). self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {} self._dead_receivers = False
def __init__(self, parent_dispatch_cls, fn): self.__name__ = fn.__name__ argspec = util.inspect_getargspec(fn) self.arg_names = argspec.args[1:] self.has_kw = bool(argspec.keywords) self.legacy_signatures = list(reversed( sorted( getattr(fn, '_legacy_signatures', []), key=lambda s: s[0] ) )) self.__doc__ = fn.__doc__ = legacy._augment_fn_docs( self, parent_dispatch_cls, fn) self._clslevel = weakref.WeakKeyDictionary() self._empty_listeners = weakref.WeakKeyDictionary()
def __init__(self, size_limit=100, weak_type='value'): """ Parameters ---------- size_limit : int integer that defines the size of the LRU cache. Default is 100. """ super(WeakLRUCache, self).__init__() self._size_limit = size_limit self.weak_type = weak_type if weak_type == 'value': self._weak_cache = weakref.WeakValueDictionary() elif weak_type == 'key': self._weak_cache = weakref.WeakKeyDictionary() else: raise ValueError("weak_type must be either 'key' or 'value'") self._cache = OrderedDict()
def test_deepcopy_weakkeydict(self): class C(object): def __init__(self, i): self.i = i a, b, c, d = [C(i) for i in range(4)] u = weakref.WeakKeyDictionary() u[a] = b u[c] = d # Keys aren't copied, values are v = copy.deepcopy(u) self.assertNotEqual(v, u) self.assertEqual(len(v), 2) self.assertFalse(v[a] is b) self.assertFalse(v[c] is d) self.assertEqual(v[a].i, b.i) self.assertEqual(v[c].i, d.i) del c support.gc_collect() self.assertEqual(len(v), 1)
def test_deepcopy_weakkeydict(self): class C(object): def __init__(self, i): self.i = i a, b, c, d = [C(i) for i in xrange(4)] u = weakref.WeakKeyDictionary() u[a] = b u[c] = d # Keys aren't copied, values are v = copy.deepcopy(u) self.assertNotEqual(v, u) self.assertEqual(len(v), 2) self.assertFalse(v[a] is b) self.assertFalse(v[c] is d) self.assertEqual(v[a].i, b.i) self.assertEqual(v[c].i, d.i) del c self.assertEqual(len(v), 1)
def test_trashcan_16602(self): # Issue #16602: when a weakref's target was part of a long # deallocation chain, the trashcan mechanism could delay clearing # of the weakref and make the target object visible from outside # code even though its refcount had dropped to 0. A crash ensued. class C(object): def __init__(self, parent): if not parent: return wself = weakref.ref(self) def cb(wparent): o = wself() self.wparent = weakref.ref(parent, cb) d = weakref.WeakKeyDictionary() root = c = C(None) for n in range(100): d[c] = c = C(c) del root gc.collect()
def __init__(self, queue, enqueue_ops): """Create a PyQueueRunner. When you later call the `create_threads()` method, the `QueueRunner` will create one thread for each op in `enqueue_ops`. Each thread will run its enqueue op in parallel with the other threads. The enqueue ops do not have to all be the same op, but it is expected that they all enqueue tensors in `queue`. Args: qnqueue_handler: a python function that transforms queue: A `Queue`. enqueue_ops: List of enqueue ops to run in threads later. """ self._queue = queue self._enqueue_ops = enqueue_ops self._lock = threading.Lock() # A map from a session object to the number of outstanding queue runner # threads for that session. self._runs_per_session = weakref.WeakKeyDictionary()
def __init__(self, parent=None): self._children = WeakKeyDictionary() self._parents = [] self._local_registry = {} self._local_reserved_words = {} self._local_group_words = {} self._local_list_words = {} self._registry = {} self._reserved_words = {} self._group_words = {} self._list_words = {} if parent: self._parents.extend(parent._parents) self._parents.append(parent) parent._children[self] = True self._update_cache()
def test_deepcopy_weakkeydict(self): class C(object): def __init__(self, i): self.i = i a, b, c, d = [C(i) for i in range(4)] u = weakref.WeakKeyDictionary() u[a] = b u[c] = d # Keys aren't copied, values are v = copy.deepcopy(u) self.assertNotEqual(v, u) self.assertEqual(len(v), 2) self.assertIsNot(v[a], b) self.assertIsNot(v[c], d) self.assertEqual(v[a].i, b.i) self.assertEqual(v[c].i, d.i) del c self.assertEqual(len(v), 1)
def test_trashcan_16602(self): # Issue #16602: when a weakref's target was part of a long # deallocation chain, the trashcan mechanism could delay clearing # of the weakref and make the target object visible from outside # code even though its refcount had dropped to 0. A crash ensued. class C: def __init__(self, parent): if not parent: return wself = weakref.ref(self) def cb(wparent): o = wself() self.wparent = weakref.ref(parent, cb) d = weakref.WeakKeyDictionary() root = c = C(None) for n in range(100): d[c] = c = C(c) del root gc.collect()
def __init__(self, get): self._get = get self._cache = WeakKeyDictionary()
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): threading.Thread.__init__(self, group, target, name, args, kwargs) self._pid = None self._children = weakref.WeakKeyDictionary() self._start_called = False self._parent = current_process()
def dns_bulk_resolve(candidates, reverse=False, ip_version=None, threads=50): """ Resolve a list of host names to IPs or, if reverse is true, IPs to host names. Return a map of each result keyed to its candidate. WARNING: This function will create a pool of up to 'threads' threads. """ # This is based loosely on http://stackoverflow.com/a/34377198 if reverse and ip_version is not None: raise ValueError("Unable to force IP version when reverse-resolving") if ip_version is None: ip_version = 4 __check_ip_version__(ip_version) result = {} if len(candidates) == 0: return result # Work around a bug in 2.6 # TODO: Get rid of this when 2.6 is no longer in the picture. if not hasattr(threading.current_thread(), "_children"): threading.current_thread()._children = weakref.WeakKeyDictionary() pool = multiprocessing.dummy.Pool( processes=min(len(candidates), threads) ) candidate_args = [ (candidate, ip_version) for candidate in candidates ] for ip, name in pool.imap( __reverser__ if reverse else __forwarder__, candidate_args, chunksize=1): result[ip] = name pool.close() return result
def __init__(self, area): object.__init__(self) self.area = area self._container = None self._stretch = (10, 10) self.stretches = weakref.WeakKeyDictionary()
def __init__(self, widgetList=None): """Initialize WidgetGroup, adding specified widgets into this group. widgetList can be: - a list of widget specifications (widget, [name], [scale]) - a dict of name: widget pairs - any QObject, and all compatible child widgets will be added recursively. The 'scale' parameter for each widget allows QSpinBox to display a different value than the value recorded in the group state (for example, the program may set a spin box value to 100e-6 and have it displayed as 100 to the user) """ QtCore.QObject.__init__(self) self.widgetList = weakref.WeakKeyDictionary() # Make sure widgets don't stick around just because they are listed here self.scales = weakref.WeakKeyDictionary() self.cache = {} ## name:value pairs self.uncachedWidgets = weakref.WeakKeyDictionary() if isinstance(widgetList, QtCore.QObject): self.autoAdd(widgetList) elif isinstance(widgetList, list): for w in widgetList: self.addWidget(*w) elif isinstance(widgetList, dict): for name, w in widgetList.items(): self.addWidget(w, name) elif widgetList is None: return else: raise Exception("Wrong argument type %s" % type(widgetList))
def _type_memos(self): return weakref.WeakKeyDictionary()
def _take_snapshot(self): if not self._is_transaction_boundary: self._new = self._parent._new self._deleted = self._parent._deleted self._dirty = self._parent._dirty self._key_switches = self._parent._key_switches return if not self.session._flushing: self.session.flush() self._new = weakref.WeakKeyDictionary() self._deleted = weakref.WeakKeyDictionary() self._dirty = weakref.WeakKeyDictionary() self._key_switches = weakref.WeakKeyDictionary()
def __init__(self, parent_dispatch_cls, fn): self.name = fn.__name__ argspec = util.inspect_getargspec(fn) self.arg_names = argspec.args[1:] self.has_kw = bool(argspec.keywords) self.legacy_signatures = list(reversed( sorted( getattr(fn, '_legacy_signatures', []), key=lambda s: s[0] ) )) fn.__doc__ = legacy._augment_fn_docs(self, parent_dispatch_cls, fn) self._clslevel = weakref.WeakKeyDictionary()
def _parents(self): """Dictionary of parent object->attribute name on the parent. This attribute is a so-called "memoized" property. It initializes itself with a new ``weakref.WeakKeyDictionary`` the first time it is accessed, returning the same object upon subsequent access. """ return weakref.WeakKeyDictionary()
def __init__(self, *args, **kw): init_dict = dict(*args, **kw) self._d = weakref.WeakKeyDictionary( (key, self._create_value(key, value)) for key, value in init_dict.iteritems())
def __init__(self, Base): self.base = Base self.dbs = WeakKeyDictionary()
def _async_clients(cls): attr_name = '_async_client_dict_' + cls.__name__ if not hasattr(cls, attr_name): setattr(cls, attr_name, weakref.WeakKeyDictionary()) return getattr(cls, attr_name)