我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用sys.getrefcount()。
def getrc(defns, depth=1):
    '''Return the reference counts of the expressions listed in defns.

    defns is a space separated string of expressions.  Each expression is
    evaluated in a namespace assembled from the frame ``depth`` levels above
    this function plus the enclosing caller frames that share its module
    globals.  The result is a space separated string with one count per
    expression; the temporary reference created by handing the object to
    getrefcount() is subtracted from every count.
    '''
    from sys import getrefcount, _getframe
    frame = _getframe(depth)
    module_globals = frame.f_globals
    scope = frame.f_locals
    if scope is not module_globals:
        # Gather the locals of every caller frame that lives in the same
        # module, stopping at module level or at another module's frame.
        chain = [scope]
        while 1:
            frame = frame.f_back
            if frame is None:
                # Reached the top of the stack without meeting a module-level
                # frame; there is nothing further to merge.
                break
            frame_globals = frame.f_globals
            scope = frame.f_locals
            if frame_globals is not module_globals or frame_globals is scope:
                break
            chain.append(scope)
        merged = {}
        # Outermost frame first, so names of inner frames win.
        for frame_locals in reversed(chain):
            merged.update(frame_locals)
    else:
        merged = scope.copy()
    globals_copy = module_globals.copy()
    # eval(expression, globals, locals) searches locals first, so the merged
    # frame locals go last; otherwise a module global would shadow a local
    # variable of the same name.
    return ' '.join([str(getrefcount(eval(x, globals_copy, merged)) - 1)
                     for x in defns.split()])
def test__POINTER_c_char(self):
    """A POINTER(c_char) field accepts a c_buffer and holds a reference to it."""
    from sys import getrefcount as grc

    class X(Structure):
        _fields_ = [("str", POINTER(c_char))]

    holder = X()
    # Dereferencing the NULL pointer must be refused
    self.assertRaises(ValueError, getattr, holder.str, "contents")

    text = c_buffer("Hello, World")
    self.assertEqual(grc(text), 2)
    holder.str = text
    # assigning the buffer to the field adds one reference
    self.assertEqual(grc(text), 3)

    # POINTER(c_char) and Python string is NOT compatible
    # POINTER(c_char) and c_buffer() is compatible
    for pos, char in enumerate(text):
        self.assertEqual(char, holder.str[pos])

    self.assertRaises(TypeError, setattr, holder, "str", "Hello, World")
def test_1(self):
    """The callback's refcount returns to its baseline once the wrapper is gone."""
    from sys import getrefcount as grc

    func = dll._testfunc_callback_i_if
    func.restype = ctypes.c_int
    func.argtypes = [ctypes.c_int, MyCallback]

    def callback(value):
        return value

    baseline = 2
    self.assertEqual(grc(callback), baseline)
    wrapper = MyCallback(callback)
    # wrapping the function must add at least one reference to it
    self.assertGreater(grc(callback), baseline)

    self.assertEqual(func(-10, wrapper), -18)

    # drop the wrapper and make sure every extra reference is released
    wrapper = None
    gc.collect()
    self.assertEqual(grc(callback), baseline)
def test_two_recursive_children(self):
    """A greenlet that starts a child greenlet leaves no stray references."""
    calls = []

    def f():
        calls.append(1)
        greenlet.getcurrent().parent.switch()

    def g():
        calls.append(1)
        child = greenlet(f)
        child.switch()
        calls.append(1)

    # rebinding g means the greenlet is reachable only through this local
    g = greenlet(g)
    g.switch()
    self.assertEqual(len(calls), 3)
    self.assertEqual(sys.getrefcount(g), 2)
def _check_multi_index(self, arr, index):
    """Verify item getting and simple setting for a multi index.

    Parameters
    ----------
    arr : ndarray
        Array to be indexed, must be a reshaped arange.
    index : tuple of indexing objects
        Index being tested.

    """
    try:
        mimic_get, no_copy = self._get_multi_index(arr, index)
    except Exception:
        # self._get_multi_index raised, so indexing arr directly has to raise
        # as well and must leave the refcount of arr untouched.
        refs_before = sys.getrefcount(arr)
        assert_raises(Exception, arr.__getitem__, index)
        assert_raises(Exception, arr.__setitem__, index, 0)
        assert_equal(refs_before, sys.getrefcount(arr))
    else:
        self._compare_index_result(arr, index, mimic_get, no_copy)
def _check_single_index(self, arr, index):
    """Verify item getting and simple setting for a single index.

    Parameters
    ----------
    arr : ndarray
        Array to be indexed, must be an arange.
    index : indexing object
        Index being tested. Must be a single index and not a tuple
        of indexing objects (see also `_check_multi_index`).

    """
    try:
        # the helper works on tuples, so wrap the single index
        mimic_get, no_copy = self._get_multi_index(arr, (index,))
    except Exception:
        # self._get_multi_index raised, so indexing arr directly has to raise
        # as well and must leave the refcount of arr untouched.
        refs_before = sys.getrefcount(arr)
        assert_raises(Exception, arr.__getitem__, index)
        assert_raises(Exception, arr.__setitem__, index, 0)
        assert_equal(refs_before, sys.getrefcount(arr))
    else:
        self._compare_index_result(arr, index, mimic_get, no_copy)
def test_dot_3args(self):
    """dot() with an explicit output array fills and returns that same array."""
    from numpy.core.multiarray import dot

    np.random.seed(22)
    lhs = np.random.random_sample((1024, 16))
    rhs = np.random.random_sample((16, 32))
    out = np.empty((1024, 32))

    # repeated calls must not pile up references to the output array
    for _ in range(12):
        dot(lhs, rhs, out)
    assert_equal(sys.getrefcount(out), 2)

    expected = dot(lhs, rhs, out=None)
    assert_array_equal(expected, out)
    assert_(out is dot(lhs, rhs, out=out))

    # same checks with a one dimensional right operand and output
    rhs = rhs[:, 0].copy()  # rhs.shape == (16,)
    out = out[:, 0].copy()  # out.shape == (1024,)
    expected = dot(lhs, rhs)
    assert_(out is dot(lhs, rhs, out))
    assert_array_equal(expected, out)
def _assert_valid_refcount(op):
    """
    Check that ufuncs don't mishandle refcount of object `1`.
    Used in a few regression tests.

    `op` is called fifteen times on a 100x100 integer array paired with
    itself; afterwards the refcount of `1` must not have dropped.
    """
    import numpy as np

    grid = np.arange(100*100).reshape(100, 100)
    same_grid = grid
    one = 1

    baseline = sys.getrefcount(one)
    for _ in range(15):
        result = op(grid, same_grid)
    assert_(sys.getrefcount(one) >= baseline)
    del result  # for pyflakes
def test_set_bases_refcount(self):
    """The base of a patched class has the same refcount as a regular base."""
    def normally_set_base():
        return a_factory(set_b_base=True)

    def patch_set_base():
        patched = a_factory()
        base = b_factory()
        type_set_bases(patched, (base,))
        return patched

    reference_cls = normally_set_base()
    gc.collect()
    normal_ref_count = sys.getrefcount(reference_cls.__bases__[0])

    patched_cls = patch_set_base()
    gc.collect()
    patch_ref_count = sys.getrefcount(patched_cls.__bases__[0])

    self.assertEqual(normal_ref_count, patch_ref_count)
def test_set_bases_mro_refcount(self):
    """The __mro__ of a patched class has the same refcount as a regular one."""
    def normally_set_base():
        return a_factory(set_b_base=True)

    def patch_set_base():
        patched = a_factory()
        base = b_factory()
        type_set_bases(patched, (base,))
        return patched

    reference_cls = normally_set_base()
    gc.collect()
    normal_ref_count = sys.getrefcount(reference_cls.__mro__)

    patched_cls = patch_set_base()
    gc.collect()
    patch_ref_count = sys.getrefcount(patched_cls.__mro__)

    self.assertEqual(normal_ref_count, patch_ref_count)
def test_element_cyclic_gc_none(self):
    """Collecting a reference cycle holding Elements keeps None's refcount stable.

    Tests if a cyclic reference can crash etree.
    """
    make_element = self.etree.Element

    # must disable tracing as it could change the refcounts
    saved_tracer = sys.gettrace()
    try:
        sys.settrace(None)
        gc.collect()

        none_refs = sys.getrefcount(None)

        # a list that contains itself next to two elements
        ring = [make_element('name'), make_element('name')]
        ring.append(ring)

        del ring
        gc.collect()

        self.assertEqual(sys.getrefcount(None), none_refs)
    finally:
        sys.settrace(saved_tracer)
def testRefCount(self):
    """Storing null in a script variable costs one None reference, and only once."""
    baseline = sys.getrefcount(None)

    class Global(JSClass):
        pass

    script = """ var none = null; """

    with JSContext(Global()) as ctxt:
        ctxt.eval(script)
        self.assertEqual(baseline+1, sys.getrefcount(None))

        # evaluating the same assignment again must not add a further reference
        ctxt.eval(script)
        self.assertEqual(baseline+1, sys.getrefcount(None))
def test_bug1229429(self):
    """reversed() must not leak a reference to a non-callable __reversed__.

    This bug was never in reversed, it was in PyObject_CallMethod, and
    reversed_new calls that sometimes.
    """
    if not hasattr(sys, "getrefcount"):
        # Report a skip instead of returning, so the run does not count this
        # test as passed on interpreters without reference counts.
        self.skipTest("no sys.getrefcount()")

    def f():
        pass
    r = f.__reversed__ = object()
    rc = sys.getrefcount(r)
    for i in range(10):
        try:
            reversed(f)
        except TypeError:
            # expected: the call has to be rejected every time
            pass
        else:
            self.fail("non-callable __reversed__ didn't raise!")
    # ten failed calls later the refcount must be unchanged
    self.assertEqual(rc, sys.getrefcount(r))
def xmltoolkit63():
    """
    Check reference leak.

    Feeds a single element with text content through a TreeBuilder and
    discards the result.

    >>> xmltoolkit63()
    >>> count = sys.getrefcount(None) #doctest: +SKIP
    >>> for i in range(1000):
    ...     xmltoolkit63()
    >>> sys.getrefcount(None) - count #doctest: +SKIP
    0
    """
    builder = ET.TreeBuilder()
    builder.start("tag", {})
    builder.data("text")
    builder.end("tag")

# --------------------------------------------------------------------
def check_getitem_with_type(self, tp):
    """Exercise integer indexing of a view over an instance of ``tp``.

    Covers item values, bounds checking and index type checking, then
    verifies that dropping the view restores the refcount of the object
    it was created from.
    """
    as_item = self.getitem_type
    backing = tp(self._source)
    refs_before = getrefcount(backing)
    view = self._view(backing)

    self.assertEqual(view[0], as_item(b"a"))
    self.assertIsInstance(view[0], bytes)
    for index, char in ((5, b"f"), (-1, b"f"), (-6, b"a")):
        self.assertEqual(view[index], as_item(char))

    # Bounds checking
    for index in (6, -7, sys.maxsize, -sys.maxsize):
        self.assertRaises(IndexError, lambda: view[index])

    # Type checking
    for index in (None, 0.0, "a"):
        self.assertRaises(TypeError, lambda: view[index])

    view = None
    self.assertEqual(getrefcount(backing), refs_before)
def test__POINTER_c_char(self):
    """A POINTER(c_char) field takes a bytes c_buffer and keeps a reference to it."""
    from sys import getrefcount as grc

    class X(Structure):
        _fields_ = [("str", POINTER(c_char))]

    struct = X()
    # The pointer starts out NULL, so reading its contents must fail
    self.assertRaises(ValueError, getattr, struct.str, "contents")

    buf = c_buffer(b"Hello, World")
    self.assertEqual(grc(buf), 2)
    struct.str = buf
    # storing the buffer in the field adds one reference
    self.assertEqual(grc(buf), 3)

    # POINTER(c_char) and Python string is NOT compatible
    # POINTER(c_char) and c_buffer() is compatible
    for offset, byte in enumerate(buf):
        self.assertEqual(byte, struct.str[offset])

    self.assertRaises(TypeError, setattr, struct, "str", "Hello, World")
def test_callback(self):
    """Invoking a ctypes callback must not change the refcount of ctypes.c_int."""
    try:
        from sys import getrefcount
    except ImportError:
        # unittest.skip() merely builds a decorator, so returning it would
        # let the test pass silently; skipTest() records an actual skip.
        self.skipTest("no sys.getrefcount()")

    proto = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int, ctypes.c_int)

    def func(a, b):
        return a * b * 2
    f = proto(func)

    # settle pending garbage before taking the baseline
    gc.collect()
    a = getrefcount(ctypes.c_int)
    f(1, 2)
    self.assertEqual(getrefcount(ctypes.c_int), a)
def xmltoolkit63():
    """
    Check reference leak.

    Pushes one element with text content through a TreeBuilder and throws
    the result away.

    >>> xmltoolkit63()
    >>> count = sys.getrefcount(None)
    >>> for i in range(1000):
    ...     xmltoolkit63()
    >>> sys.getrefcount(None) - count
    0
    """
    builder = ET.TreeBuilder()
    builder.start("tag", {})
    builder.data("text")
    builder.end("tag")

# --------------------------------------------------------------------
def test_attributes_writable(self):
    """A view created for the writable type must not be flagged read-only."""
    if not self.rw_type:
        self.skipTest("no writable type to test")
    view = self.check_attributes_with_type(self.rw_type)
    self.assertEqual(view.readonly, False)

# Disabled: unicode uses the old buffer API in 2.x

#def test_getbuffer(self):
    ## Test PyObject_GetBuffer() on a memoryview object.
    #for tp in self._types:
        #b = tp(self._source)
        #oldrefcount = sys.getrefcount(b)
        #m = self._view(b)
        #oldviewrefcount = sys.getrefcount(m)
        #s = unicode(m, "utf-8")
        #self._check_contents(tp, b, s.encode("utf-8"))
        #self.assertEqual(sys.getrefcount(m), oldviewrefcount)
        #m = None
        #self.assertEqual(sys.getrefcount(b), oldrefcount)
def _getRefCounts():
    """Return (refcount, class) pairs for the classes found in loaded modules.

    Every attribute of every module in sys.modules whose type is
    types.ClassType is recorded together with its sys.getrefcount() value.
    This can be used to check a running server for memory (object) leaks.

    Taken from: http://www.nightmare.com/medusa/memory-leaks.html.

    Returns:
        A list of (refcount, class) tuples, highest refcount first.
    """
    T = TRACE()
    counts = {}
    # Collect all classes
    for module in sys.modules.values():
        for attr_name in dir(module):
            candidate = getattr(module, attr_name)
            if type(candidate) is types.ClassType:
                counts[candidate] = sys.getrefcount(candidate)
    # sort by refcount
    ranked = [(refs, cls) for cls, refs in counts.items()]
    ranked.sort()
    ranked.reverse()
    return ranked
def test_ass_subscript (self):
    """Subscript assignment with ellipsis indices, plus an index refcount check."""
    # pixels sampled after every fill
    corners = ((0, 0), (1, 0), (-1, -1))

    for bpp in (8, 16, 24, 32):
        sf = pygame.Surface ((6, 8), 0, bpp)
        sf.fill ((255, 255, 255))
        ar = pygame.PixelArray (sf)

        # Test ellipse working
        ar[...,...] = (0, 0, 0)
        for pos in corners:
            self.assertEqual (ar[pos], 0)

        ar[...,] = (0, 0, 255)
        for pos in corners:
            self.assertEqual (ar[pos], sf.map_rgb ((0, 0, 255)))

        ar[:,...] = (255, 0, 0)
        for pos in corners:
            self.assertEqual (ar[pos], sf.map_rgb ((255, 0, 0)))

        ar[...] = (0, 255, 0)
        for pos in corners:
            self.assertEqual (ar[pos], sf.map_rgb ((0, 255, 0)))

    # Ensure x and y are freed for array[x, y] = p
    # Bug fix: reference counting
    if hasattr(sys, 'getrefcount'):
        class Int(int):
            """Unique int instances"""
            pass

        sf = pygame.Surface ((2, 2), 0, 32)
        ar = pygame.PixelArray (sf)
        x, y = Int(0), Int(1)
        rx_before, ry_before = sys.getrefcount (x), sys.getrefcount (y)
        ar[x, y] = 0
        rx_after, ry_after = sys.getrefcount (x), sys.getrefcount (y)
        self.assertEqual (rx_after, rx_before)
        self.assertEqual (ry_after, ry_before)
def test_GetView_array_struct(self):
    """Check BufferProxy against objects exporting __array_struct__.

    Exporters of several dimensions, type characters and item sizes are
    wrapped in a BufferProxy and compared with self.assertSame.  Where
    sys.getrefcount is importable, the refcount of a freshly returned
    __array_struct__ object is checked as well.
    """
    from pygame.bufferproxy import BufferProxy

    class Exporter(self.ExporterBase):
        def __init__(self, shape, typechar, itemsize):
            super(Exporter, self).__init__(shape, typechar, itemsize)
            # the proxy is built from the instance dictionary
            self.view = BufferProxy(self.__dict__)

        def get__array_struct__(self):
            return self.view.__array_struct__
        __array_struct__ = property(get__array_struct__)
        # Should not cause PgObject_GetBuffer to fail
        __array_interface__ = property(lambda self: None)

    _shape = [2, 3, 5, 7, 11]  # Some prime numbers
    # one to four dimensions, 2 byte 'i' items
    for ndim in range(1, len(_shape)):
        o = Exporter(_shape[0:ndim], 'i', 2)
        v = BufferProxy(o)
        self.assertSame(v, o)
    ndim = 2
    shape = _shape[0:ndim]
    # two dimensions, every 'i'/'u' item size
    for typechar in ('i', 'u'):
        for itemsize in (1, 2, 4, 8):
            o = Exporter(shape, typechar, itemsize)
            v = BufferProxy(o)
            self.assertSame(v, o)
    # two dimensions, 'f' items
    for itemsize in (4, 8):
        o = Exporter(shape, 'f', itemsize)
        v = BufferProxy(o)
        self.assertSame(v, o)

    # Check returned cobject/capsule reference count
    try:
        from sys import getrefcount
    except ImportError:
        # PyPy: no reference counting
        pass
    else:
        # NOTE: typechar and itemsize are the values left over from the
        # loops above ('u' and 8).
        o = Exporter(shape, typechar, itemsize)
        self.assertEqual(getrefcount(o.__array_struct__), 1)
def skip_if_no_getrefcount(f):
    """Decorate a test method so it is skipped without sys.getrefcount().

    The wrapper calls ``self.skipTest`` when the running interpreter does
    not provide ``sys.getrefcount``; otherwise it calls ``f`` and returns
    its result.  Positional and keyword arguments beyond ``self`` are
    forwarded, so decorated tests taking extra arguments keep working.
    """
    @wraps(f)
    def skip_if_no_getrefcount_(self, *args, **kwargs):
        if not hasattr(sys, 'getrefcount'):
            return self.skipTest('skipped, no sys.getrefcount()')
        return f(self, *args, **kwargs)

    return skip_if_no_getrefcount_
def test_mogrify_leak_on_multiple_reference(self):
    """mogrify() must not leak a value that the query references repeatedly.

    Regression test for issue #81: reference leak when a parameter value is
    referenced more than once from a dict.
    """
    import sys

    cursor = self.conn.cursor()
    # the string is computed at run time rather than written as a literal
    value = (lambda x: x)('foo') * 10

    refs_before = sys.getrefcount(value)
    cursor.mogrify("select %(foo)s, %(foo)s, %(foo)s", {'foo': value})
    refs_after = sys.getrefcount(value)

    self.assertEqual(refs_before, refs_after)
def await_gc(obj, rc):
    """Poll until the refcount of obj has dropped to the expected value.

    Necessary because of the zero-copy gc thread, which can take some time
    to receive its DECREF message.  Returns as soon as the count is low
    enough and otherwise gives up after 50 polls spaced 0.05 seconds apart.
    """
    polls_left = 50
    while polls_left:
        # rc + 2 because of the refs in this function
        if grc(obj) <= rc + 2:
            return
        time.sleep(0.05)
        polls_left -= 1
def test_above_30(self):
    """Messages above 30 bytes are never copied by 0MQ."""
    for power in range(5, 16):  # 32, 64,..., 65536
        payload = (2**power)*x
        self.assertEqual(grc(payload), 2)

        # the frame is expected to hold two references to the payload
        frame = zmq.Frame(payload)
        self.assertEqual(grc(payload), 4)

        # once the frame is gone, wait for the count to fall back
        del frame
        await_gc(payload, 2)
        self.assertEqual(grc(payload), 2)
        del payload
def test_lifecycle1(self):
    """Run through a ref counting cycle with a copy.

    ``rc`` tracks the refcount expected for the message bytes ``s`` and is
    compared with ``grc(s)`` after every step: creating a Frame, copying
    it, taking the copy's buffer, then releasing the copy, the buffer and
    the original frame.
    """
    for i in range(5, 16):  # 32, 64,..., 65536
        s = (2**i)*x
        rc = 2
        self.assertEqual(grc(s), rc)
        # creating the frame is expected to add two references
        m = zmq.Frame(s)
        rc += 2
        self.assertEqual(grc(s), rc)
        # copying the frame is expected to add one more
        m2 = copy.copy(m)
        rc += 1
        self.assertEqual(grc(s), rc)
        # the buffer is taken from the copy here
        buf = m2.buffer

        # view_rc is defined outside this block
        rc += view_rc
        self.assertEqual(grc(s), rc)

        # every way of reading the frames must give back the original bytes
        self.assertEqual(s, b(str(m)))
        self.assertEqual(s, bytes(m2))
        self.assertEqual(s, m.bytes)
        # self.assert_(s is str(m))
        # self.assert_(s is str(m2))
        del m2
        rc -= 1
        self.assertEqual(grc(s), rc)
        rc -= view_rc
        del buf
        self.assertEqual(grc(s), rc)
        del m
        rc -= 2
        # give the count time to fall before the final comparison
        await_gc(s, rc)
        self.assertEqual(grc(s), rc)
        # the bookkeeping must end where it started
        self.assertEqual(rc, 2)
        del s
def test_lifecycle2(self):
    """Run through a different ref counting cycle with a copy.

    ``rc`` tracks the refcount expected for the message bytes ``s`` and is
    compared with ``grc(s)`` after every step.  Here the buffer is taken
    from the original frame, and the original frame is deleted before its
    copy.
    """
    for i in range(5, 16):  # 32, 64,..., 65536
        s = (2**i)*x
        rc = 2
        self.assertEqual(grc(s), rc)
        # creating the frame is expected to add two references
        m = zmq.Frame(s)
        rc += 2
        self.assertEqual(grc(s), rc)
        # copying the frame is expected to add one more
        m2 = copy.copy(m)
        rc += 1
        self.assertEqual(grc(s), rc)
        # the buffer is taken from the original frame here
        buf = m.buffer
        # view_rc is defined outside this block
        rc += view_rc
        self.assertEqual(grc(s), rc)
        # every way of reading the frames must give back the original bytes
        self.assertEqual(s, b(str(m)))
        self.assertEqual(s, bytes(m2))
        self.assertEqual(s, m2.bytes)
        self.assertEqual(s, m.bytes)
        # self.assert_(s is str(m))
        # self.assert_(s is str(m2))
        del buf
        # deleting the local name alone must not change the count yet
        self.assertEqual(grc(s), rc)
        del m
        # m.buffer is kept until m is del'd
        rc -= view_rc
        rc -= 1
        self.assertEqual(grc(s), rc)
        del m2
        rc -= 2
        # give the count time to fall before the final comparison
        await_gc(s, rc)
        self.assertEqual(grc(s), rc)
        # the bookkeeping must end where it started
        self.assertEqual(rc, 2)
        del s
def view_get_refcount(self, perspective):
    """Return sys.getrefcount() of this object; ``perspective`` is not used."""
    refs = sys.getrefcount(self)
    return refs
def test03_leak_assignment(self) :
    """set_private() takes one reference and gives it back when replaced."""
    private = "example of private object"
    baseline = sys.getrefcount(private)

    # storing the object adds exactly one reference
    self.obj.set_private(private)
    self.assertEqual(baseline+1, sys.getrefcount(private))

    # replacing it with None releases that reference again
    self.obj.set_private(None)
    self.assertEqual(baseline, sys.getrefcount(private))
def test04_leak_GC(self) :
    """Dropping the owning object releases its reference to the private object."""
    private = "example of private object"
    baseline = sys.getrefcount(private)

    self.obj.set_private(private)
    # letting go of the owner must release the stored reference
    self.obj = None

    self.assertEqual(baseline, sys.getrefcount(private))
def test_PyString_FromString(self):
    """PyString_FromString() returns an equal string without touching the source."""
    from_string = pythonapi.PyString_FromString
    from_string.restype = py_object
    from_string.argtypes = (c_char_p,)

    text = "abc"
    baseline = grc(text)
    converted = from_string(text)
    self.assertEqual(grc(text), baseline)
    self.assertEqual(text, converted)

    # releasing the result must not change the source's refcount either
    del converted
    self.assertEqual(grc(text), baseline)

# This test is unreliable, because it is possible that code in
# unittest changes the refcount of the '42' integer.  So, it
# is disabled by default.
def test_PyObj_FromPtr(self):
    """PyObj_FromPtr() returns the very same object and owns one reference to it."""
    text = "abc def ghi jkl"
    baseline = grc(text)

    # id(python-object) is the address
    address = id(text)
    recovered = PyObj_FromPtr(address)
    self.assertIs(text, recovered)
    self.assertEqual(grc(text), baseline + 1)

    del recovered
    self.assertEqual(grc(text), baseline)
def test_pyobject(self):
    """A second py_object check must leave the refcount of the value unchanged."""
    from sys import getrefcount as grc

    obj = ()
    for obj in ((), [], object()):
        initial = grc(obj)
        # This call leaks a reference to 'obj'...
        self.check_type(py_object, obj)
        refs_before = grc(obj)
        # ...but this call doesn't leak any more.  Where is the refcount?
        self.check_type(py_object, obj)
        refs_after = grc(obj)
        self.assertEqual((refs_after, obj), (refs_before, obj))
def test_X(self):
    # Exploratory check that prints reference counts and the contents of a
    # POINTER(c_char_p) field.  It makes no assertions.  Written for
    # Python 2 (print statements).
    class X(Structure):
        _fields_ = [("p", POINTER(c_char_p))]
    x = X()
    i = c_char_p("abc def")
    from sys import getrefcount as grc
    # the printed labels are the refcounts the author expected
    print "2?", grc(i)
    x.p = pointer(i)
    print "3?", grc(i)
    # NOTE: the loop rebinds i, so the c_char_p is no longer reachable
    # through that name from here on
    for i in range(320):
        c_int(99)
        x.p[0]
    print x.p[0]
##    del x
##    print "2?", grc(i)
##    del i
    import gc
    gc.collect()
    # same access pattern again, after a collection
    for i in range(320):
        c_int(99)
        x.p[0]
    print x.p[0]
    print x.p.contents
##    print x._objects

    x.p[0] = "spam spam"
##    print x.p[0]
    print "+" * 42
    print x._objects