我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用weakref.WeakMethod()。
def test_hashing(self):
    # A live WeakMethod is hashable whenever the object it wraps is hashable.
    first = Object(1)
    second = Object(1)
    wm_first = weakref.WeakMethod(first.some_method)
    wm_second = weakref.WeakMethod(second.some_method)
    # This one is deliberately never hashed while its referent is alive.
    wm_unhashed = weakref.WeakMethod(second.other_method)

    # The two WeakMethods compare equal, so their hashes must match.
    live_hash = hash(wm_first)
    self.assertEqual(live_hash, hash(wm_second))

    # After the referents die, the previously computed hash is retained.
    del first, second
    gc.collect()
    for dead in (wm_first, wm_second):
        self.assertEqual(hash(dead), live_hash)

    # A WeakMethod that was never hashed while alive cannot be hashed
    # once it is dead.
    with self.assertRaises(TypeError):
        hash(wm_unhashed)
def register(self, receiver, weak=True, dispatch_uid=None):
    """
    Connect receiver to sender for signal.

    :param receiver: A function or an instance method which is to receive
        signals. Receivers must be hashable objects. If weak is True, then
        receiver must be weak referenceable. Receivers must be able to
        accept keyword arguments. If a receiver is connected with a
        dispatch_uid argument, it will not be added if another receiver
        was already connected with that dispatch_uid.
    :param weak: Whether to use weak references to the receiver. By
        default, the module will attempt to use weak references to the
        receiver objects. If this parameter is false, then strong
        references will be used.
    :param dispatch_uid: An identifier used to uniquely identify a
        particular instance of a receiver. This will usually be a string,
        though it may be anything hashable. A falsy value is treated as
        "not given" and the key is derived from the receiver instead.
    """
    if dispatch_uid:
        lookup_key = dispatch_uid
    else:
        lookup_key = _make_id(receiver)

    if weak:
        ref = weakref.ref
        receiver_object = receiver
        # A bound method object is created afresh on each attribute access,
        # so a plain weakref to it would die immediately. Use WeakMethod and
        # tie the cleanup finalizer to the instance that owns the method.
        if hasattr(receiver, '__self__') and hasattr(receiver, '__func__'):
            ref = weakref.WeakMethod
            receiver_object = receiver.__self__
        receiver = ref(receiver)
        weakref.finalize(receiver_object, self._remove_receiver)

    with self.lock:
        self._clear_dead_receivers()
        # Entries are (lookup_key, receiver) pairs, so the duplicate check
        # must compare against the key of each pair, not the pair itself.
        if not any(key == lookup_key for key, _ in self.receivers):
            self.receivers.append((lookup_key, receiver))
        self.sender_receivers_cache.clear()
def register_user_callback(self, func):
    """
    Set the function to be called when a subscription receives a new
    EventAdd command.

    This function will be called by a Task in the main thread. If ``func``
    needs to do CPU-intensive or I/O-related work, it should execute that
    work in a separate thread or process.

    Only a weak reference to ``func`` is stored in ``self._callback``:
    bound methods are wrapped in ``weakref.WeakMethod`` and anything else
    in ``weakref.ref``. ``self._callback_cleared`` is passed as the weak
    reference's callback.
    """
    make_ref = weakref.WeakMethod if inspect.ismethod(func) else weakref.ref
    self._callback = make_ref(func, self._callback_cleared)
def test_alive(self):
    # While the instance is alive, calling the WeakMethod yields a bound
    # method equivalent to the one it was created from.
    obj = Object(1)
    wm = weakref.WeakMethod(obj.some_method)
    self.assertIsInstance(wm, weakref.ReferenceType)

    resolved = wm()
    self.assertIsInstance(resolved, type(obj.some_method))
    self.assertIs(resolved.__self__, obj)
    self.assertIs(resolved.__func__, obj.some_method.__func__)
    self.assertEqual(resolved(), 4)
def test_object_dead(self):
    # Dropping the only strong reference to the instance must make the
    # WeakMethod resolve to None.
    obj = Object(1)
    wm = weakref.WeakMethod(obj.some_method)
    del obj
    gc.collect()
    self.assertIsNone(wm())
def test_method_dead(self):
    # Removing the method from the class must make the WeakMethod resolve
    # to None even though the instance itself is still referenced.
    klass = self._subclass()
    instance = klass(1)
    wm = weakref.WeakMethod(instance.some_method)
    del klass.some_method
    gc.collect()
    self.assertIsNone(wm())
def test_callback_when_object_dead(self):
    # Callback behaviour when the object is the first thing to die.
    klass = self._subclass()
    seen = []

    def on_dead(wm_arg):
        seen.append(wm_arg)

    instance = klass(1)
    wm = weakref.WeakMethod(instance.some_method, on_dead)
    del instance
    gc.collect()
    self.assertEqual(seen, [wm])

    # Replacing the method afterwards must not fire the callback again.
    klass.some_method = Object.some_method
    gc.collect()
    self.assertEqual(seen, [wm])
def test_callback_when_method_dead(self):
    # Callback behaviour when the method is the first thing to die.
    klass = self._subclass()
    seen = []

    def on_dead(wm_arg):
        seen.append(wm_arg)

    instance = klass(1)
    wm = weakref.WeakMethod(instance.some_method, on_dead)
    del klass.some_method
    gc.collect()
    self.assertEqual(seen, [wm])

    # The object dying afterwards must not fire the callback again.
    del instance
    gc.collect()
    self.assertEqual(seen, [wm])
def test_equality(self):
    # Exercise both == and != so neither operator is left untested.
    def check_equal(left, right):
        self.assertTrue(left == right)
        self.assertFalse(left != right)

    def check_unequal(left, right):
        self.assertTrue(left != right)
        self.assertFalse(left == right)

    x = Object(1)
    y = Object(1)
    x_some = weakref.WeakMethod(x.some_method)
    y_some = weakref.WeakMethod(y.some_method)
    x_other = weakref.WeakMethod(x.other_method)
    y_other = weakref.WeakMethod(y.other_method)

    # Objects equal, same method
    check_equal(x_some, y_some)
    check_equal(x_other, y_other)

    # Objects equal, different method
    for left in (x_some, y_some):
        for right in (x_other, y_other):
            check_unequal(left, right)

    # Objects unequal, same or different method
    z = Object(2)
    z_some = weakref.WeakMethod(z.some_method)
    z_other = weakref.WeakMethod(z.other_method)
    for left in (x_some, y_some):
        for right in (z_some, z_other):
            check_unequal(left, right)

    del x, y, z
    gc.collect()

    # Dead WeakMethods compare by identity
    dead_refs = (x_some, y_some, x_other, y_other, z_some, z_other)
    for left in dead_refs:
        for right in dead_refs:
            self.assertEqual(left == right, left is right)
            self.assertEqual(left != right, left is not right)
def test_subscribe_method(self, pubpen):
    """Subscribing a bound method records it as a WeakMethod handler."""
    target = Foo()
    sub_id = pubpen.subscribe('test_event', target.method)

    # Inspect the internal bookkeeping directly.
    assert pubpen._subscriptions[sub_id] == 'test_event'
    assert len(pubpen._event_handlers['test_event']) == 1

    handlers = pubpen._event_handlers['test_event']
    assert list(handlers.keys()) == [sub_id]
    assert list(handlers.values()) == [weakref.WeakMethod(target.method)]
def test_subscribe_diff_callback_same_event(self, pubpen):
    """Two different callbacks on one event are stored as separate handlers."""
    func_id = pubpen.subscribe('test_event', function)
    target = Foo()
    method_id = pubpen.subscribe('test_event', target.method)

    # Inspect the internal bookkeeping directly.
    for sub_id in (func_id, method_id):
        assert pubpen._subscriptions[sub_id] == 'test_event'
    assert len(pubpen._event_handlers['test_event']) == 2

    handlers = pubpen._event_handlers['test_event']
    assert handlers[func_id] != handlers[method_id]
    for sub_id in (func_id, method_id):
        assert handlers[sub_id] in (weakref.ref(function),
                                    weakref.WeakMethod(target.method))
def test_subscribe_diff_callback_diff_event(self, pubpen):
    """Callbacks subscribed to different events are stored under their own event."""
    func_id = pubpen.subscribe('test_event1', function)
    target = Foo()
    method_id = pubpen.subscribe('test_event2', target.method)

    # Inspect the internal bookkeeping directly.
    assert pubpen._subscriptions[func_id] == 'test_event1'
    assert pubpen._subscriptions[method_id] == 'test_event2'
    assert len(pubpen._event_handlers['test_event1']) == 1
    assert len(pubpen._event_handlers['test_event2']) == 1

    func_handlers = pubpen._event_handlers['test_event1']
    assert func_handlers[func_id] == weakref.ref(function)

    method_handlers = pubpen._event_handlers['test_event2']
    assert method_handlers[method_id] == weakref.WeakMethod(target.method)
def register(self, slot):
    """
    Register ``slot`` with this object.

    Bound methods (as reported by ``inspect.ismethod``) are wrapped in a
    ``weakref.WeakMethod`` and added to ``self._methods``. Anything else
    is added unchanged to ``self._functions``.
    """
    if not inspect.ismethod(slot):
        self._functions.add(slot)
        return
    self._methods.add(weakref.WeakMethod(slot))
def __init__(self, slot, weak=False):
    """
    Store ``slot``, optionally behind a weak reference.

    :param slot: The callable to store, or an existing ``weakref.ref``
        to one.
    :param weak: When true and ``slot`` is not already a weak reference,
        wrap it: bound methods in ``WeakMethod``, anything else in
        ``weakref.ref``.

    ``self._weak`` is truthy when ``weak`` was requested or ``slot`` was
    already a weak reference. ``self._slot`` holds the possibly wrapped
    slot.
    """
    already_ref = isinstance(slot, weakref.ref)
    self._weak = weak or already_ref
    if weak and not already_ref:
        wrap = WeakMethod if isinstance(slot, types.MethodType) else weakref.ref
        slot = wrap(slot)
    self._slot = slot
def subscribe(self, event, callback):
    """
    Subscribe a callback to an event

    :arg event: String name of an event to subscribe to
    :arg callback: The function to call when the event is published.
        This can be any python callable. Only a weak reference to it is
        stored.
    :returns: The id assigned to this subscription.
    :raises EventNotFoundError: If this object has a non-empty event list
        and ``event`` is not in it.
    :raises TypeError: If ``callback`` cannot be weakly referenced. No
        subscription is recorded in that case.

    Use :func:`functools.partial` to call the callback with any other
    arguments.

    .. note:: The callback is registered with the event each time this
        method is called. The callback is called each time it has been
        registered when the event is published. For example::

            >>> import asyncio
            >>> import pubmarine
            >>> pubpen = pubmarine.PubPen(asyncio.get_event_loop)
            >>> def message():
            ...     print('message called')
            >>> pubpen.subscribe('test', message)
            >>> pubpen.subscribe('test', message)
            >>> pubpen.publish('test')
            message called
            message called

        If the caller wants the callback to only be called once, it is the
        caller's responsibility to only subscribe the callback once.
    """
    if self._event_list and event not in self._event_list:
        raise EventNotFoundError('{} is not a registered event'
                                 .format(event))

    # Build the weak reference before touching any bookkeeping, so a
    # callback that cannot be weakly referenced does not consume an id or
    # leave a subscription entry with no handler behind it.
    try:
        # Bound method
        handler = WeakMethod(callback)
    except TypeError:
        # WeakMethod rejects anything that is not a bound method; fall back
        # to a plain weak reference for functions and other callables.
        handler = ref(callback)

    # Get an id for the subscription
    sub_id = next(self._next_id)
    self._subscriptions[sub_id] = event
    self._event_handlers[event][sub_id] = handler
    return sub_id