Python weakref 模块,WeakKeyDictionary() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用weakref.WeakKeyDictionary()

项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def __init__(self, code=None, namespace=None, process_target=None, use_caching=False):
        """
        Create a new signal.
        """
        if not process_target:
            process_target = self.process
        self.process_target = process_target

        self.receivers = list()
        self.self_refs = dict()
        self.lock = threading.Lock()

        if code:
            self.code = code
        else:
            self.code = self.Meta.code

        if namespace:
            self.namespace = namespace
        else:
            self.namespace = self.Meta.namespace

        self.use_caching = use_caching
        self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {}
        self._dead_receivers = False
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, clickRadius=2, moveDistance=5, parent=None):
        QtGui.QGraphicsScene.__init__(self, parent)
        self.setClickRadius(clickRadius)
        self.setMoveDistance(moveDistance)
        self.exportDirectory = None

        self.clickEvents = []
        self.dragButtons = []
        self.mouseGrabber = None
        self.dragItem = None
        self.lastDrag = None
        self.hoverItems = weakref.WeakKeyDictionary()
        self.lastHoverEvent = None
        self.minDragTime = 0.5  # drags shorter than 0.5 sec are interpreted as clicks

        self.contextMenu = [QtGui.QAction("Export...", self)]
        self.contextMenu[0].triggered.connect(self.showExportDialog)

        self.exportDialog = None
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, clickRadius=2, moveDistance=5, parent=None):
        QtGui.QGraphicsScene.__init__(self, parent)
        self.setClickRadius(clickRadius)
        self.setMoveDistance(moveDistance)
        self.exportDirectory = None

        self.clickEvents = []
        self.dragButtons = []
        self.mouseGrabber = None
        self.dragItem = None
        self.lastDrag = None
        self.hoverItems = weakref.WeakKeyDictionary()
        self.lastHoverEvent = None
        self.minDragTime = 0.5  # drags shorter than 0.5 sec are interpreted as clicks

        self.contextMenu = [QtGui.QAction("Export...", self)]
        self.contextMenu[0].triggered.connect(self.showExportDialog)

        self.exportDialog = None
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def __init__(self, providing_args=None, use_caching=False):
        """
        Create a new signal.

        providing_args
            A list of the arguments this signal can pass along in a send() call.
        """
        self.receivers = []
        if providing_args is None:
            providing_args = []
        self.providing_args = set(providing_args)
        self.lock = threading.Lock()
        self.use_caching = use_caching
        # For convenience we create empty caches even if they are not used.
        # A note about caching: if use_caching is defined, then for each
        # distinct sender we cache the receivers that sender has in
        # 'sender_receivers_cache'. The cache is cleaned when .connect() or
        # .disconnect() is called and populated on send().
        self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {}
        self._dead_receivers = False
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
项目:RPoint    作者:george17-meet    | 项目源码 | 文件源码
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def __init__(self, parent_dispatch_cls, fn):
        self.__name__ = fn.__name__
        argspec = util.inspect_getargspec(fn)
        self.arg_names = argspec.args[1:]
        self.has_kw = bool(argspec.keywords)
        self.legacy_signatures = list(reversed(
            sorted(
                getattr(fn, '_legacy_signatures', []),
                key=lambda s: s[0]
            )
        ))
        self.__doc__ = fn.__doc__ = legacy._augment_fn_docs(
            self, parent_dispatch_cls, fn)

        self._clslevel = weakref.WeakKeyDictionary()
        self._empty_listeners = weakref.WeakKeyDictionary()
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def __init__(self, parent_dispatch_cls, fn):
        self.__name__ = fn.__name__
        argspec = util.inspect_getargspec(fn)
        self.arg_names = argspec.args[1:]
        self.has_kw = bool(argspec.keywords)
        self.legacy_signatures = list(reversed(
            sorted(
                getattr(fn, '_legacy_signatures', []),
                key=lambda s: s[0]
            )
        ))
        self.__doc__ = fn.__doc__ = legacy._augment_fn_docs(
            self, parent_dispatch_cls, fn)

        self._clslevel = weakref.WeakKeyDictionary()
        self._empty_listeners = weakref.WeakKeyDictionary()
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
项目:adaptivemd    作者:markovmodel    | 项目源码 | 文件源码
def __init__(self, size_limit=100, weak_type='value'):
        """
        Parameters
        ----------
        size_limit : int
            integer that defines the size of the LRU cache. Default is 100.
        """

        super(WeakLRUCache, self).__init__()
        self._size_limit = size_limit
        self.weak_type = weak_type

        if weak_type == 'value':
            self._weak_cache = weakref.WeakValueDictionary()
        elif weak_type == 'key':
            self._weak_cache = weakref.WeakKeyDictionary()
        else:
            raise ValueError("weak_type must be either 'key' or 'value'")

        self._cache = OrderedDict()
项目:Indushell    作者:SecarmaLabs    | 项目源码 | 文件源码
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
项目:Liljimbo-Chatbot    作者:chrisjim316    | 项目源码 | 文件源码
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
项目:flask_system    作者:prashasy    | 项目源码 | 文件源码
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_deepcopy_weakkeydict(self):
        class C(object):
            def __init__(self, i):
                self.i = i
        a, b, c, d = [C(i) for i in range(4)]
        u = weakref.WeakKeyDictionary()
        u[a] = b
        u[c] = d
        # Keys aren't copied, values are
        v = copy.deepcopy(u)
        self.assertNotEqual(v, u)
        self.assertEqual(len(v), 2)
        self.assertFalse(v[a] is b)
        self.assertFalse(v[c] is d)
        self.assertEqual(v[a].i, b.i)
        self.assertEqual(v[c].i, d.i)
        del c
        support.gc_collect()
        self.assertEqual(len(v), 1)
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
项目:jltools    作者:ownport    | 项目源码 | 文件源码
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
项目:FileStoreGAE    作者:liantian-cn    | 项目源码 | 文件源码
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
项目:DjangoBlog    作者:0daybug    | 项目源码 | 文件源码
def __init__(self, providing_args=None, use_caching=False):
        """
        Create a new signal.

        providing_args
            A list of the arguments this signal can pass along in a send() call.
        """
        self.receivers = []
        if providing_args is None:
            providing_args = []
        self.providing_args = set(providing_args)
        self.lock = threading.Lock()
        self.use_caching = use_caching
        # For convenience we create empty caches even if they are not used.
        # A note about caching: if use_caching is defined, then for each
        # distinct sender we cache the receivers that sender has in
        # 'sender_receivers_cache'. The cache is cleaned when .connect() or
        # .disconnect() is called and populated on send().
        self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {}
        self._dead_receivers = False
项目:bawk    作者:jttwnsnd    | 项目源码 | 文件源码
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
项目:infinite-lorem-ipsum    作者:patjm1992    | 项目源码 | 文件源码
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_deepcopy_weakkeydict(self):
        class C(object):
            def __init__(self, i):
                self.i = i
        a, b, c, d = [C(i) for i in xrange(4)]
        u = weakref.WeakKeyDictionary()
        u[a] = b
        u[c] = d
        # Keys aren't copied, values are
        v = copy.deepcopy(u)
        self.assertNotEqual(v, u)
        self.assertEqual(len(v), 2)
        self.assertFalse(v[a] is b)
        self.assertFalse(v[c] is d)
        self.assertEqual(v[a].i, b.i)
        self.assertEqual(v[c].i, d.i)
        del c
        self.assertEqual(len(v), 1)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_trashcan_16602(self):
        # Issue #16602: when a weakref's target was part of a long
        # deallocation chain, the trashcan mechanism could delay clearing
        # of the weakref and make the target object visible from outside
        # code even though its refcount had dropped to 0.  A crash ensued.
        class C(object):
            def __init__(self, parent):
                if not parent:
                    return
                wself = weakref.ref(self)
                def cb(wparent):
                    o = wself()
                self.wparent = weakref.ref(parent, cb)

        d = weakref.WeakKeyDictionary()
        root = c = C(None)
        for n in range(100):
            d[c] = c = C(c)
        del root
        gc.collect()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_deepcopy_weakkeydict(self):
        class C(object):
            def __init__(self, i):
                self.i = i
        a, b, c, d = [C(i) for i in xrange(4)]
        u = weakref.WeakKeyDictionary()
        u[a] = b
        u[c] = d
        # Keys aren't copied, values are
        v = copy.deepcopy(u)
        self.assertNotEqual(v, u)
        self.assertEqual(len(v), 2)
        self.assertFalse(v[a] is b)
        self.assertFalse(v[c] is d)
        self.assertEqual(v[a].i, b.i)
        self.assertEqual(v[c].i, d.i)
        del c
        self.assertEqual(len(v), 1)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_trashcan_16602(self):
        # Issue #16602: when a weakref's target was part of a long
        # deallocation chain, the trashcan mechanism could delay clearing
        # of the weakref and make the target object visible from outside
        # code even though its refcount had dropped to 0.  A crash ensued.
        class C(object):
            def __init__(self, parent):
                if not parent:
                    return
                wself = weakref.ref(self)
                def cb(wparent):
                    o = wself()
                self.wparent = weakref.ref(parent, cb)

        d = weakref.WeakKeyDictionary()
        root = c = C(None)
        for n in range(100):
            d[c] = c = C(c)
        del root
        gc.collect()
项目:hart    作者:akosiorek    | 项目源码 | 文件源码
def __init__(self, queue, enqueue_ops):
        """Create a PyQueueRunner.

        When you later call the `create_threads()` method, the `QueueRunner` will
        create one thread for each op in `enqueue_ops`.  Each thread will run its
        enqueue op in parallel with the other threads.  The enqueue ops do not have
        to all be the same op, but it is expected that they all enqueue tensors in
        `queue`.

        Args:
          qnqueue_handler: a python function that transforms
          queue: A `Queue`.
          enqueue_ops: List of enqueue ops to run in threads later.
        """
        self._queue = queue
        self._enqueue_ops = enqueue_ops

        self._lock = threading.Lock()
        # A map from a session object to the number of outstanding queue runner
        # threads for that session.
        self._runs_per_session = weakref.WeakKeyDictionary()
项目:trydjango18    作者:lucifer-yqh    | 项目源码 | 文件源码
def __init__(self, providing_args=None, use_caching=False):
        """
        Create a new signal.

        providing_args
            A list of the arguments this signal can pass along in a send() call.
        """
        self.receivers = []
        if providing_args is None:
            providing_args = []
        self.providing_args = set(providing_args)
        self.lock = threading.Lock()
        self.use_caching = use_caching
        # For convenience we create empty caches even if they are not used.
        # A note about caching: if use_caching is defined, then for each
        # distinct sender we cache the receivers that sender has in
        # 'sender_receivers_cache'. The cache is cleaned when .connect() or
        # .disconnect() is called and populated on send().
        self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {}
        self._dead_receivers = False
项目:python-group-proj    作者:Sharcee    | 项目源码 | 文件源码
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
项目:islam-buddy    作者:hamir    | 项目源码 | 文件源码
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
项目:covar_me_app    作者:CovarMe    | 项目源码 | 文件源码
def _make_cached_stream_func(src_func, wrapper_func):
    cache = WeakKeyDictionary()
    def func():
        stream = src_func()
        try:
            rv = cache.get(stream)
        except Exception:
            rv = None
        if rv is not None:
            return rv
        rv = wrapper_func()
        try:
            cache[stream] = rv
        except Exception:
            pass
        return rv
    return func
项目:trydjango18    作者:wei0104    | 项目源码 | 文件源码
def __init__(self, providing_args=None, use_caching=False):
        """
        Create a new signal.

        providing_args
            A list of the arguments this signal can pass along in a send() call.
        """
        self.receivers = []
        if providing_args is None:
            providing_args = []
        self.providing_args = set(providing_args)
        self.lock = threading.Lock()
        self.use_caching = use_caching
        # For convenience we create empty caches even if they are not used.
        # A note about caching: if use_caching is defined, then for each
        # distinct sender we cache the receivers that sender has in
        # 'sender_receivers_cache'. The cache is cleaned when .connect() or
        # .disconnect() is called and populated on send().
        self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {}
        self._dead_receivers = False
项目:sqlbuilder    作者:emacsway    | 项目源码 | 文件源码
def __init__(self, parent=None):
        self._children = WeakKeyDictionary()
        self._parents = []

        self._local_registry = {}
        self._local_reserved_words = {}
        self._local_group_words = {}
        self._local_list_words = {}

        self._registry = {}
        self._reserved_words = {}
        self._group_words = {}
        self._list_words = {}

        if parent:
            self._parents.extend(parent._parents)
            self._parents.append(parent)
            parent._children[self] = True
            self._update_cache()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_deepcopy_weakkeydict(self):
        class C(object):
            def __init__(self, i):
                self.i = i
        a, b, c, d = [C(i) for i in range(4)]
        u = weakref.WeakKeyDictionary()
        u[a] = b
        u[c] = d
        # Keys aren't copied, values are
        v = copy.deepcopy(u)
        self.assertNotEqual(v, u)
        self.assertEqual(len(v), 2)
        self.assertIsNot(v[a], b)
        self.assertIsNot(v[c], d)
        self.assertEqual(v[a].i, b.i)
        self.assertEqual(v[c].i, d.i)
        del c
        self.assertEqual(len(v), 1)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_trashcan_16602(self):
        # Issue #16602: when a weakref's target was part of a long
        # deallocation chain, the trashcan mechanism could delay clearing
        # of the weakref and make the target object visible from outside
        # code even though its refcount had dropped to 0.  A crash ensued.
        class C:
            def __init__(self, parent):
                if not parent:
                    return
                wself = weakref.ref(self)
                def cb(wparent):
                    o = wself()
                self.wparent = weakref.ref(parent, cb)

        d = weakref.WeakKeyDictionary()
        root = c = C(None)
        for n in range(100):
            d[c] = c = C(c)
        del root
        gc.collect()
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def __init__(self, get):
        self._get = get
        self._cache = WeakKeyDictionary()
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        threading.Thread.__init__(self, group, target, name, args, kwargs)
        self._pid = None
        self._children = weakref.WeakKeyDictionary()
        self._start_called = False
        self._parent = current_process()
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def dns_bulk_resolve(candidates, reverse=False, ip_version=None, threads=50):
    """
    Resolve a list of host names to IPs or, if reverse is true, IPs to
    host names.  Return a map of each result keyed to its candidate.

    WARNING: This function will create a pool of up to 'threads'
    threads.
    """

    # This is based loosely on http://stackoverflow.com/a/34377198

    if reverse and ip_version is not None:
        raise ValueError("Unable to force IP version when reverse-resolving")

    if ip_version is None:
        ip_version = 4
    __check_ip_version__(ip_version)

    result = {}

    if len(candidates) == 0:
        return result

    # Work around a bug in 2.6
    # TODO: Get rid of this when 2.6 is no longer in the picture.
    if not hasattr(threading.current_thread(), "_children"):
        threading.current_thread()._children = weakref.WeakKeyDictionary()

    pool = multiprocessing.dummy.Pool(
        processes=min(len(candidates), threads) )

    candidate_args = [ (candidate, ip_version) for candidate in candidates ]

    for ip, name in pool.imap(
        __reverser__ if reverse else __forwarder__,
        candidate_args,
        chunksize=1):
        result[ip] = name
    pool.close()
    return result
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, area):
        object.__init__(self)
        self.area = area
        self._container = None
        self._stretch = (10, 10)
        self.stretches = weakref.WeakKeyDictionary()
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, widgetList=None):
        """Initialize WidgetGroup, adding specified widgets into this group.
        widgetList can be: 
         - a list of widget specifications (widget, [name], [scale])
         - a dict of name: widget pairs
         - any QObject, and all compatible child widgets will be added recursively.

        The 'scale' parameter for each widget allows QSpinBox to display a different value than the value recorded
        in the group state (for example, the program may set a spin box value to 100e-6 and have it displayed as 100 to the user)
        """
        QtCore.QObject.__init__(self)
        self.widgetList = weakref.WeakKeyDictionary() # Make sure widgets don't stick around just because they are listed here
        self.scales = weakref.WeakKeyDictionary()
        self.cache = {}  ## name:value pairs
        self.uncachedWidgets = weakref.WeakKeyDictionary()
        if isinstance(widgetList, QtCore.QObject):
            self.autoAdd(widgetList)
        elif isinstance(widgetList, list):
            for w in widgetList:
                self.addWidget(*w)
        elif isinstance(widgetList, dict):
            for name, w in widgetList.items():
                self.addWidget(w, name)
        elif widgetList is None:
            return
        else:
            raise Exception("Wrong argument type %s" % type(widgetList))
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, area):
        object.__init__(self)
        self.area = area
        self._container = None
        self._stretch = (10, 10)
        self.stretches = weakref.WeakKeyDictionary()
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _type_memos(self):
        return weakref.WeakKeyDictionary()
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _take_snapshot(self):
        if not self._is_transaction_boundary:
            self._new = self._parent._new
            self._deleted = self._parent._deleted
            self._dirty = self._parent._dirty
            self._key_switches = self._parent._key_switches
            return

        if not self.session._flushing:
            self.session.flush()

        self._new = weakref.WeakKeyDictionary()
        self._deleted = weakref.WeakKeyDictionary()
        self._dirty = weakref.WeakKeyDictionary()
        self._key_switches = weakref.WeakKeyDictionary()
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def __init__(self, parent_dispatch_cls, fn):
        self.name = fn.__name__
        argspec = util.inspect_getargspec(fn)
        self.arg_names = argspec.args[1:]
        self.has_kw = bool(argspec.keywords)
        self.legacy_signatures = list(reversed(
            sorted(
                getattr(fn, '_legacy_signatures', []),
                key=lambda s: s[0]
            )
        ))
        fn.__doc__ = legacy._augment_fn_docs(self, parent_dispatch_cls, fn)

        self._clslevel = weakref.WeakKeyDictionary()
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _parents(self):
        """Dictionary of parent object->attribute name on the parent.

        This attribute is a so-called "memoized" property.  It initializes
        itself with a new ``weakref.WeakKeyDictionary`` the first time
        it is accessed, returning the same object upon subsequent access.

        """

        return weakref.WeakKeyDictionary()
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def __init__(self, *args, **kw):
        init_dict = dict(*args, **kw)

        self._d = weakref.WeakKeyDictionary(
            (key, self._create_value(key, value))
            for key, value in init_dict.iteritems())
项目:oriole-service    作者:zhouxiaoxiang    | 项目源码 | 文件源码
def __init__(self, Base):
        self.base = Base
        self.dbs = WeakKeyDictionary()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _async_clients(cls):
        attr_name = '_async_client_dict_' + cls.__name__
        if not hasattr(cls, attr_name):
            setattr(cls, attr_name, weakref.WeakKeyDictionary())
        return getattr(cls, attr_name)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _async_clients(cls):
        attr_name = '_async_client_dict_' + cls.__name__
        if not hasattr(cls, attr_name):
            setattr(cls, attr_name, weakref.WeakKeyDictionary())
        return getattr(cls, attr_name)