我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 weakref.finalize()。
def __new__(cls, dir=False, **kwargs):
    """Create a temporary file or directory and return an instance for its path.

    When *dir* is true a temporary directory is created, otherwise a
    temporary file.  Remaining keyword arguments are forwarded unchanged to
    ``tempfile.mkdtemp`` / ``tempfile.mkstemp``.  A ``weakref.finalize``
    callback invoking ``cls._destruct`` with the path string is attached to
    the new instance as ``_destructor``.
    """
    if not dir:
        handle, name = tempfile.mkstemp(**kwargs)
        # mkstemp hands back an already-open descriptor.  The file is
        # reopened by name later (yielding a different descriptor), so
        # this one is closed right away to avoid leaking it.
        os.close(handle)
    else:
        name = tempfile.mkdtemp(**kwargs)

    instance = super().__new__(cls, name)
    # Only the path *string* is handed to the finalizer, so the callback
    # arguments do not keep the instance itself alive.
    path_text = str(instance)
    instance._destructor = weakref.finalize(instance, cls._destruct, path_text)
    return instance
def add_socket(self, socket, target_obj):
    """Register *socket* on behalf of *target_obj* under a fresh object id.

    The socket is switched to non-blocking mode, recorded in the
    id/socket/object maps and queued in ``_register_sockets``.  A finalizer
    on *target_obj* calls ``self._object_removed(<id>)`` once the target is
    garbage-collected.

    Raises:
        ValueError: if *socket* has already been added.
    """
    with self._socket_map_lock:
        if socket in self.socket_to_id:
            raise ValueError('Socket already added')

        socket.setblocking(False)

        # assumption: only one socket per object
        new_id = self._object_id + 1
        self._object_id = new_id

        self.objects[new_id] = target_obj
        self.id_to_socket[new_id] = socket
        self.socket_to_id[socket] = new_id

        def _on_target_collected(obj_id=new_id):
            # The id is bound as a default argument so the callback does
            # not depend on later changes to ``self._object_id``.
            return self._object_removed(obj_id)

        weakref.finalize(target_obj, _on_target_collected)
        self._register_sockets.append(socket)
def __init__(self) -> None:
    '''Create a temporary file.

    The file's path is exposed through the `name` attribute.
    Call `cleanup()` on the object (or use it as a context manager) so the
    file is removed properly after use.
    '''
    descriptor, path = tempfile.mkstemp(prefix='pyaspio_')
    self.name = path
    # Keeping the descriptor and using it later instead of reopening the
    # file by path would be nicer, but then we could not tell whether it
    # has already been closed: we might leak it, or close it twice and
    # thereby close some unrelated file.  So it is closed immediately.
    os.close(descriptor)

    # Finalizer that reports a missing explicit cleanup; it is also allowed
    # to run at interpreter exit.
    warning = weakref.finalize(self, _warn_cleanup, type(self).__name__, self.name)  # type: ignore
    warning.atexit = True
    self._cleanup_warning = warning
def __init__(self, *, process: subprocess.Popen, encoding: str, tmp_input: FilesystemIPC) -> None:
    """Wrap a running subprocess and arrange for it to be closed reliably.

    Args:
        process: the subprocess whose output is read.
        encoding: stored as ``stdout_encoding`` and passed on to the
            close routine.
        tmp_input: passed on to the close routine.
    """
    self.process = process
    self.stdout_encoding = encoding
    self.iterating = False

    # stderr has to be drained in a background thread to avoid deadlocks:
    # dlvhex2 would block once the OS buffer of the stderr pipe fills up,
    # so *both* stdout and stderr must be read continuously.
    capture_thread = StreamCaptureThread(self.process.stderr)
    self.stderr_capture_thread = capture_thread
    capture_thread.start()

    # weakref.finalize turned out to be more robust than __del__ here.
    # (With __del__, Python seemed to finalize self.process and its IO
    # streams first, which produced ResourceWarnings even though the
    # streams were closed properly in __del__.)
    finalizer = weakref.finalize(self, DlvhexLineReader.__close, process, capture_thread, encoding, tmp_input)  # type: ignore
    # Terminate the subprocess if it is still running when Python exits.
    finalizer.atexit = True
    self._finalize = finalizer
def test_all_freed(self):
    # Checks that a finalizer and its callback stay alive (without any
    # outside references) while the tracked object lives, and that both are
    # released once the object is collected and the callback has run.

    # we want a weakrefable subclass of weakref.finalize
    class MyFinalizer(weakref.finalize):
        pass

    a = self.A()
    res = []
    def callback():
        res.append(123)
    f = MyFinalizer(a, callback)

    # Keep only weak references to the callback and the finalizer, then
    # drop the strong ones held by this frame.
    wr_callback = weakref.ref(callback)
    wr_f = weakref.ref(f)
    del callback, f

    # Both must still be reachable while `a` exists.
    self.assertIsNotNone(wr_callback())
    self.assertIsNotNone(wr_f())

    del a
    self._collect_if_necessary()

    # After `a` is gone, finalizer and callback are gone too and the
    # callback has been invoked exactly once.
    self.assertIsNone(wr_callback())
    self.assertIsNone(wr_f())
    self.assertEqual(res, [123])
def run_in_child(cls):
    # Registers several finalizers on `cls` whose callbacks print markers;
    # the statement order below is significant for whoever inspects the
    # printed output.
    def error():
        # Create an atexit finalizer from inside a finalizer called
        # at exit.  This should be the next to be run.
        g1 = weakref.finalize(cls, print, 'g1')
        print('f3 error')
        1/0  # deliberately raises ZeroDivisionError inside the callback

    # cls should stay alive till atexit callbacks run
    f1 = weakref.finalize(cls, print, 'f1', _global_var)
    f2 = weakref.finalize(cls, print, 'f2', _global_var)
    f3 = weakref.finalize(cls, error)
    f4 = weakref.finalize(cls, print, 'f4', _global_var)

    # `atexit` is true on a freshly created finalizer; only f2 is opted out.
    assert f1.atexit == True
    f2.atexit = False
    assert f3.atexit == True
    assert f4.atexit == True
def __init__(self, suffix=None, prefix=None, dir=None):
    """Create a temporary directory and schedule its implicit cleanup.

    *suffix*, *prefix* and *dir* are passed straight to ``mkdtemp``.  The
    resulting path is stored in ``name``; a finalizer calling
    ``self._cleanup`` with that path and a warning message is stored in
    ``_finalizer``.
    """
    self.name = mkdtemp(suffix, prefix, dir)
    implicit_warning = "Implicitly cleaning up {!r}".format(self)
    self._finalizer = _weakref.finalize(
        self,
        self._cleanup,
        self.name,
        warn_message=implicit_warning,
    )
def register(self, receiver, weak=True, dispatch_uid=None):
    """
    Connect receiver to sender for signal.

    :param receiver: A function or an instance method which is to receive
        signals. Receivers must be hashable objects. If weak is True, then
        receiver must be weak referenceable. Receivers must be able to
        accept keyword arguments. If a receiver is connected with a
        dispatch_uid argument, it will not be added if another receiver
        was already connected with that dispatch_uid.
    :param weak: Whether to use weak references to the receiver. By
        default, the module will attempt to use weak references to the
        receiver objects. If this parameter is false, then strong
        references will be used.
    :param dispatch_uid: An identifier used to uniquely identify a
        particular instance of a receiver. This will usually be a string,
        though it may be anything hashable.
    """
    if dispatch_uid:
        lookup_key = dispatch_uid
    else:
        lookup_key = _make_id(receiver)

    if weak:
        ref = weakref.ref
        receiver_object = receiver
        # Bound methods need WeakMethod: a plain weakref to the transient
        # bound-method object would die immediately.  The finalizer is
        # attached to the instance the method is bound to.
        if hasattr(receiver, '__self__') and hasattr(receiver, '__func__'):
            ref = weakref.WeakMethod
            receiver_object = receiver.__self__
        receiver = ref(receiver)
        weakref.finalize(receiver_object, self._remove_receiver)

    with self.lock:
        self._clear_dead_receivers()
        # Entries are (lookup_key, receiver) pairs, so the key has to be
        # unpacked before comparing; comparing the whole pair against
        # lookup_key never matches and lets duplicates through.
        already_registered = any(
            existing_key == lookup_key
            for existing_key, _ in self.receivers
        )
        if not already_registered:
            self.receivers.append((lookup_key, receiver))
        self.sender_receivers_cache.clear()
def __init__(self, defer_atexit=False):
    """Create a temporary directory and arrange for its removal.

    :param bool defer_atexit: remove the directory at interpreter exit
        instead of when this object is garbage-collected.
    """
    self.name = tempfile.mkdtemp('sphinxcontrib_versioning')

    if not defer_atexit:
        try:
            weakref.finalize(self, shutil.rmtree, self.name, True)
        except AttributeError:
            # Fallback for interpreters whose weakref module lacks finalize.
            # NOTE(review): the proxy is not stored anywhere — confirm the
            # callback can still fire on this path.
            weakref.proxy(self, functools.partial(shutil.rmtree, self.name, True))
        return

    atexit.register(shutil.rmtree, self.name, True)
def __init__(self, bus):
    """Attach this object to *bus* and set up listener cleanup.

    A finalizer calling ``_cleanup_listeners`` with the bus and the
    listener list is registered, ``self.close`` is registered with
    ``atexit``, and a listener for the system quit event is installed.
    """
    assert isinstance(bus, Bus)

    listeners = []
    self.__bus = bus
    self.__listeners = listeners

    # The finalizer receives the bus and the list directly rather than
    # reaching them through ``self``.
    weakref.finalize(self, _cleanup_listeners, bus, listeners)
    atexit.register(self.close)

    self._listen(event_ids.SYSTEM__QUIT, target_ids.ANY, self.__on_close)
def finalize(self):
    """Public entry point for finalizing the object.

    This variant is a placeholder only and always raises.

    Raises:
        NotImplementedError: unconditionally.
    """
    # this is the "public" finalize method
    message = "finalize() must be implemented by AutoFinalizedObject."
    raise NotImplementedError(message)
def __del__(self):
    """Run the object's ``finalize()`` when the instance is destroyed."""
    run_finalize = self.finalize
    run_finalize()
def _do_finalize_object_ref(obj_ref):
    """Finalize the object behind a weak reference, if it still exists.

    Intended as the callback for ``weakref.finalize()``: the weak reference
    is resolved and, when the referent is still alive, its
    ``_do_finalize_object()`` method is invoked.  A dead reference is
    silently ignored.

    Arguments:
        * obj_ref -- weakref to an object

    Returns:
        None (implicit)
    """
    target = obj_ref()
    if target is None:
        # The object has already disappeared; nothing left to do.
        return
    target._do_finalize_object()
def __new__(cls, *args, **kwargs):
    """Create the instance and attach its private finalizer.

    Arguments:
        * *args, **kwargs -- forwarded to the parent ``__new__``

    Returns:
        the new object instance, carrying a ``_finalizer`` attribute
    """
    # Note: the finalizer must never receive a hard reference to the
    #       instance (as func/args/kwargs); that would keep the object
    #       alive until the program terminates.  A weak reference is fine.
    #
    # Note 2: with weakrefs and without calling finalize() from __del__,
    #         the object may already be gone by the time
    #         weakref.finalize() fires.  Keep the base class __del__() so
    #         that _finalizer() is guaranteed to be called.
    #
    # Note 3: the _finalize_called attribute is (probably) useless
    #         for this class
    instance = super(AutoFinalizedObject, cls).__new__(cls, *args, **kwargs)
    weak_self = weakref.ref(instance)
    instance._finalizer = weakref.finalize(
        instance, _do_finalize_object_ref, weak_self
    )
    return instance
def finalize(self):
    """Trigger the stored finalizer; its return value is discarded."""
    finalizer = self._finalizer
    finalizer()
def __init__(self, suffix=None, prefix=None, dir=None, path=None):
    """Set up a directory that is cleaned up implicitly.

    With a truthy *path* the directory comes from ``self.mkdir(path)``;
    otherwise ``tempfile.mkdtemp(suffix, prefix, dir)`` creates one.  The
    resulting path is stored in ``name`` and a finalizer calling
    ``self._cleanup`` is stored in ``_finalizer``.
    """
    if path:
        self.name = self.mkdir(path)
    else:
        self.name = tempfile.mkdtemp(suffix, prefix, dir)

    implicit_warning = "Implicitly cleaning up {!r}".format(self)
    self._finalizer = weakref.finalize(
        self,
        self._cleanup,
        self.name,
        warn_message=implicit_warning,
    )
def __init__(self, *, doctype: str = 'html', title: str = 'W-DOM',
             charset: str = 'utf-8', default_class: type = WdomElement,
             autoreload: bool = None, reload_wait: float = None,
             **kwargs: Any) -> None:
    """Create a new document object for a WDOM application.

    .. caution::

        Don't create a new document with the :class:`WdomDocument` class
        constructor. Use the :func:`get_new_document` function instead.

    :arg str doctype: doctype of the document (default: html).
    :arg str title: title of the document.
    :arg str charset: charset of the document.
    :arg type default_class: Default Node class of the document. This
        class is used when nodes are made by :py:meth:`createElement()`.
    :arg bool autoreload: Enable/Disable autoreload (default: False).
    :arg float reload_wait: How long (in seconds) to wait before reloading.
        This parameter is only used when autoreload is enabled.
    """
    # A per-document temporary directory; the finalizer gets the path bound
    # through `partial`, so the callback holds no reference to `self`.
    self.__tempdir = _tempdir = tempfile.mkdtemp()
    self._finalizer = weakref.finalize(self,  # type: ignore
                                       partial(_cleanup, _tempdir))
    self._autoreload = autoreload
    self._reload_wait = reload_wait
    # NOTE(review): **kwargs is accepted but not forwarded to the parent
    # initializer — confirm this is intended.
    super().__init__(doctype=doctype, default_class=default_class)
    self.characterSet = charset
    self.title = title
    # Script elements attached to <body> and <head> respectively.
    self.script = Script(parent=self.body)
    self._autoreload_script = Script(parent=self.head)
    self.addEventListener('mount', self._on_mount)
async def get_cursor(
    self,
    connection=None,
    release_new_connection=True,
    force_release_connection=False,
):
    """Return a cursor, optionally tying the connection's release to it.

    The body awaits several operations, so this has to be a coroutine
    function (``async def``); a plain ``def`` containing ``await`` does not
    compile.

    Args:
        connection: an existing connection to use.  When falsy, a new one
            is obtained via ``await self.get_conn()``.
        release_new_connection: when true and no *connection* was passed,
            the newly obtained connection is handed to ``pool.release``
            once the cursor is garbage-collected.
        force_release_connection: when true, the release-on-collection
            behaviour applies even to a caller-supplied connection.

    Returns:
        The cursor produced by ``await connection.cursor()``.
    """
    # Must be computed before `connection` is replaced by a fresh one.
    need_release_connection = release_new_connection and not connection
    connection = connection or await self.get_conn()
    cursor = await connection.cursor()

    if need_release_connection or force_release_connection:
        pool = await self.pool
        # NOTE(review): the finalizer calls pool.release(connection)
        # synchronously; if release is a coroutine function its result is
        # never awaited — confirm against the pool implementation.
        weakref.finalize(cursor, pool.release, connection)

    return cursor
def __new(cls, *args):
    """Create the instance and attach a destructor finalizer to it.

    The finalizer calls the instance's ``_destruct`` attribute with the
    string form of the instance and is stored as ``_destructor``.
    """
    # NOTE(review): the name is `__new`, not `__new__` — confirm intended.
    instance = super().__new__(cls, *args)
    # NOTE(review): `_destruct` is looked up on the instance; if it is a
    # regular (bound) method the finalizer would hold a reference to the
    # instance — confirm it is a static/class-level callable.
    destruct = instance._destruct
    instance._destructor = weakref.finalize(instance, destruct, str(instance))
    return instance
def add_listener(self, listener):
    """Track *listener* and get notified when it is garbage-collected.

    The listener is added to ``self.listeners`` and a finalizer is
    registered that calls ``self._listener_removed()`` (without arguments)
    once the listener is collected.
    """
    registry = self.listeners
    registry.add(listener)
    on_removed = self._listener_removed
    weakref.finalize(listener, on_removed)
def __init__(self) -> None:
    '''Create a temporary named pipe.

    The pipe's path is exposed through the `name` attribute.
    Call `cleanup()` on the object (or use it as a context manager) so the
    pipe is removed properly after use.

    Raises `OSError` if the named pipe could not be created.
    Raises `NotImplementedError` if `mkfifo` from the builtin `os` module is not available.
    '''
    if not hasattr(os, 'mkfifo'):
        # Notes about a possible Windows implementation:
        # Windows provides named pipes, see the CreateNamedPipe function in the Windows API:
        # https://msdn.microsoft.com/en-us/library/aa365150(v=vs.85).aspx
        # The `ctypes` module can be used to call this function, e.g.:
        #
        #     import ctypes
        #     kernel32 = ctypes.windll.kernel32
        #     pipe_handle = kernel32.CreateNamedPipeW(name, ...)
        #
        # Related functions: ConnectNamedPipe, WriteFile, DisconnectNamedPipe, CloseHandle.
        #
        # Overall the Windows pipe system seems more complicated, and it
        # could not yet be made to work without also changing the client
        # code (which expects to read from a file).
        #
        # For now, fall back to a temporary file on Windows.
        raise NotImplementedError()

    # A private temporary directory avoids race conditions on the pipe name.
    self.tmpdir = tempfile.TemporaryDirectory(prefix='pyaspio_')
    self.name = os.path.join(self.tmpdir.name, 'pipe')
    os.mkfifo(self.name)  # may raise OSError

    # Finalizer that reports a missing explicit cleanup; it is also allowed
    # to run at interpreter exit.
    warning = weakref.finalize(self, _warn_cleanup, type(self).__name__, self.name)  # type: ignore
    warning.atexit = True
    self._cleanup_warning = warning
def close(self) -> None:
    """Invoke the stored finalizer; its return value is discarded."""
    run_finalizer = self._finalize
    run_finalizer()

# We cannot have a reference to `self` because we must avoid reference cycles here (see weakref.finalize documentation).
def __init__(self, deviceNumber, controlPeriodMs=10, enablePeriodMs=50):
    """Set up the controller object for the given device number.

    :param deviceNumber: passed to the underlying ``_CanTalonSRX`` handle
        and used for LiveWindow / usage reporting.
    :param controlPeriodMs: passed to ``_CanTalonSRX`` (default 10).
    :param enablePeriodMs: passed to ``_CanTalonSRX`` (default 50).
    """
    MotorSafety.__init__(self)

    self.pidSource = self.PIDSourceType.kDisplacement
    self.deviceNumber = deviceNumber

    # HAL bounds period to be within [1 ms,95 ms]
    self._handle = _CanTalonSRX(deviceNumber, controlPeriodMs, enablePeriodMs)
    # Free the handle when this object is collected; the finalizer receives
    # the handle itself, not `self`.
    self.__finalizer = weakref.finalize(self, _freeCANTalon, self._handle)

    # Initial controller state.
    self.controlMode = self.ControlMode.PercentVbus
    self.minimumInput = 0
    self.maximumInput = 0
    self.controlEnabled = True
    self.stopped = False
    self.isInverted = False
    self.profile = 0
    self.setPoint = 0.0
    self.codesPerRev = 0
    self.numPotTurns = 0
    self.feedbackDevice = self.FeedbackDevice.QuadEncoder

    # Apply the initial profile and control mode via the regular setters.
    self.setProfile(self.profile)
    self._applyControlMode(self.ControlMode.PercentVbus)

    LiveWindow.addActuatorChannel("CANTalon", deviceNumber, self)
    hal.report(hal.UsageReporting.kResourceType_CANTalonSRX, deviceNumber)

    # Need this to free on unit test wpilib reset
    Resource._add_global_resource(self)
def test_finalize(self):
    # Exercises the weakref.finalize API in three scenarios: explicit call,
    # detach, and invocation triggered by collection of the tracked object.
    def add(x,y,z):
        res.append(x + y + z)
        return x + y + z

    a = self.A()

    # Scenario 1: calling the finalizer explicitly runs the callback once
    # and returns its result; later calls, peek() and detach() give None.
    res = []
    f = weakref.finalize(a, add, 67, 43, z=89)
    self.assertEqual(f.alive, True)
    self.assertEqual(f.peek(), (a, add, (67,43), {'z':89}))
    self.assertEqual(f(), 199)
    self.assertEqual(f(), None)
    self.assertEqual(f(), None)
    self.assertEqual(f.peek(), None)
    self.assertEqual(f.detach(), None)
    self.assertEqual(f.alive, False)
    self.assertEqual(res, [199])

    # Scenario 2: detach() returns the registration and disables the
    # finalizer, so the callback never runs.
    res = []
    f = weakref.finalize(a, add, 67, 43, 89)
    self.assertEqual(f.peek(), (a, add, (67,43,89), {}))
    self.assertEqual(f.detach(), (a, add, (67,43,89), {}))
    self.assertEqual(f(), None)
    self.assertEqual(f(), None)
    self.assertEqual(f.peek(), None)
    self.assertEqual(f.detach(), None)
    self.assertEqual(f.alive, False)
    self.assertEqual(res, [])

    # Scenario 3: the callback runs when the tracked object is collected;
    # afterwards the finalizer is dead.
    res = []
    f = weakref.finalize(a, add, x=67, y=43, z=89)
    del a
    self._collect_if_necessary()
    self.assertEqual(f(), None)
    self.assertEqual(f(), None)
    self.assertEqual(f.peek(), None)
    self.assertEqual(f.detach(), None)
    self.assertEqual(f.alive, False)
    self.assertEqual(res, [199])
def test_order(self):
    # Verifies which finalizers run, and in which order, when some are
    # called or detached manually and the rest fire on collection of `a`.
    a = self.A()
    res = []

    f1 = weakref.finalize(a, res.append, 'f1')
    f2 = weakref.finalize(a, res.append, 'f2')
    f3 = weakref.finalize(a, res.append, 'f3')
    f4 = weakref.finalize(a, res.append, 'f4')
    f5 = weakref.finalize(a, res.append, 'f5')

    # make sure finalizers can keep themselves alive
    del f1, f4

    self.assertTrue(f2.alive)
    self.assertTrue(f3.alive)
    self.assertTrue(f5.alive)

    self.assertTrue(f5.detach())
    self.assertFalse(f5.alive)

    f5()                       # nothing because previously unregistered
    res.append('A')
    f3()                       # => res.append('f3')
    self.assertFalse(f3.alive)
    res.append('B')
    f3()                       # nothing because previously called
    res.append('C')
    del a
    self._collect_if_necessary()
                               # => res.append('f4')
                               # => res.append('f2')
                               # => res.append('f1')
    self.assertFalse(f2.alive)
    res.append('D')
    f2()                       # nothing because previously called by gc

    # The remaining finalizers ran in reverse order of creation (f4, f2, f1).
    expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D']
    self.assertEqual(res, expected)
def __init__(self, suffix="", prefix=template, dir=None):
    """Create a temporary directory and schedule its implicit cleanup.

    *suffix*, *prefix* and *dir* are passed straight to ``mkdtemp``.  The
    resulting path is stored in ``name``; a finalizer calling
    ``self._cleanup`` with that path and a warning message is stored in
    ``_finalizer``.
    """
    directory = mkdtemp(suffix, prefix, dir)
    self.name = directory
    finalizer_kwargs = {
        "warn_message": "Implicitly cleaning up {!r}".format(self),
    }
    self._finalizer = _weakref.finalize(
        self, self._cleanup, self.name, **finalizer_kwargs
    )
def connect(self, receiver, **kwargs):
    """Connect *receiver* through a weak reference.

    A weak reference to the receiver is appended to ``self.receivers`` and
    a finalizer calling ``self.flag()`` is registered for the moment the
    receiver is garbage-collected.  ``self.clear()`` is called afterwards.
    Extra keyword arguments are accepted and ignored.
    """
    weak_receiver = weakref.ref(receiver)
    on_collected = self.flag
    weakref.finalize(receiver, on_collected)
    self.receivers.append(weak_receiver)
    self.clear()