The following 50 code examples, extracted from open source Python projects, illustrate how to use sys.getsizeof().
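Before the project examples, here is a minimal standalone sketch (not taken from any of the projects below) of the basic call: sys.getsizeof() returns the shallow size of an object in bytes, and an optional second argument serves as a default for objects that do not report a size.

import sys

# Shallow sizes of a few built-in objects, in bytes; the exact numbers
# depend on the Python version, build, and platform.
print(sys.getsizeof(0))            # an int
print(sys.getsizeof("hello"))      # a short str
print(sys.getsizeof([1, 2, 3]))    # the list object only, not the ints it holds

# getsizeof() is shallow: two three-element lists report the same size
# even if one holds much larger strings, because only the element
# pointers are counted.
print(sys.getsizeof(["x" * 10000] * 3) == sys.getsizeof(["x"] * 3))  # True

# The optional second argument is returned for objects that provide no
# __sizeof__ (otherwise a TypeError is raised); built-ins always provide
# one, so the default below is ignored.
print(sys.getsizeof([1, 2, 3], 0))

Several of the examples below wrap this shallow measurement in a recursive walk (for example deep_getsizeof, measure_memory, getsize) to approximate the full memory footprint of container objects.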
def allocDelayProg(size, duration):
    """Python program as str that allocates object of the specified size
    in Gigabytes and then waits for the specified time

    size  - allocation size, bytes; None to skip allocation
    duration  - execution duration, sec; None to skip the sleep

    return  - Python program as str
    """
    return """from __future__ import print_function, division  # Required for stderr output, must be the first import
import sys
import time
import array

if {size} is not None:
    a = array.array('b', [0])
    asize = sys.getsizeof(a)  # Note: allocate at least empty size, i.e. empty list
    buffer = a * int(max({size} - asize + 1, 0))
    #if _DEBUG_TRACE:
    #    print(''.join(('  allocDelayProg(), allocated ', str(sys.getsizeof(buffer))
    #        , ' bytes for ', str({duration}), ' sec')), file=sys.stderr)
if {duration} is not None:
    time.sleep({duration})
""".format(size=size, duration=duration)
def set_cache_entry(url, content, metadata):
    if sys.getsizeof(content) > MAX_PAYLOAD_SIZE_BYTES:
        logger.info(u"Not caching {} because payload is too large: {}".format(
            url, sys.getsizeof(content)))
        return

    hash_key = _build_hash_key(url)
    # logger.info(u"*** {} {}".format(url, hash_key))

    k = boto.s3.key.Key(requests_cache_bucket)
    k.key = hash_key
    k.set_contents_from_string(content)

    if metadata:
        k.set_remote_metadata(metadata, {}, True)

    # remote_key = requests_cache_bucket.get_key(hash_key)
    # logger.info(u"metadata: {}".format(remote_key.metadata))
    return
def _get_size(item, seen):
    known_types = {dict: lambda d: chain.from_iterable(d.items())}
    default_size = getsizeof(0)

    def size_walk(item):
        if id(item) in seen:
            return 0
        seen.add(id(item))
        s = getsizeof(item, default_size)
        for _type, fun in known_types.iteritems():
            if isinstance(item, _type):
                s += sum(map(size_walk, fun(item)))
                break
        return s

    return size_walk(item)
def memory_dump():
    import gc
    x = 0
    for obj in gc.get_objects():
        i = id(obj)
        size = sys.getsizeof(obj, 0)
        # referrers = [id(o) for o in gc.get_referrers(obj)]
        try:
            cls = str(obj.__class__)
        except:
            cls = "<no class>"
        if size > 1024 * 50:
            referents = set([id(o) for o in gc.get_referents(obj)])
            x += 1
            print(x, {'id': i, 'class': cls, 'size': size, "ref": len(referents)})
            # if len(referents) < 2000:
            #     print(obj)
def __init__(self, read_cache=False):
    self._cache = None
    self._backup = None

    if read_cache:
        path = os.path.join(options.cfg.cache_path, "coq_cache.db")
        if os.path.exists(path):
            try:
                self._cache = pickle.load(open(path, "rb"))
                s = "Using query cache (current size: {}, max size: {})."
                if options.cfg.verbose:
                    s = s.format(self._cache.currsize, self._cache.maxsize)
                    logger.info(s)
                    print(s)
            except (IOError, ValueError, EOFError):
                S = "Cannot read query cache, creating a new one (size: {})."
                S = S.format(options.cfg.query_cache_size)
                logger.warning(S)
    if self._cache is None:
        self._cache = cachetools.LFUCache(
            maxsize=options.cfg.query_cache_size,
            getsizeof=sys.getsizeof)
def add(self, key, x):
    # if enabled, cache data frame
    if options.cfg.use_cache:
        size = sys.getsizeof(x)

        # do not attempt to cache overly large data frames:
        if size > self._cache.maxsize:
            S = ("Query result too large for the query cache ({} MBytes "
                 "missing).")
            S = S.format((size - self._cache.maxsize) // (1024 * 1024))
            logger.warning(S)

        # remove items from cache if necessary:
        while self._cache.currsize + size > self._cache.maxsize:
            self._cache.popitem()

        # add data frame to cache
        self._cache[key] = x
        self._backup = None
def debug_primes():
    '''
    Generate an infinite stream of primes and log out
    the size of the sieve as it increases
    '''
    max_sieve_size = 0
    sieve_size = 0

    sieve = defaultdict(list)
    k = 2

    while True:
        k_factors = sieve.get(k)
        if k_factors:
            del sieve[k]
            for f in k_factors:
                sieve[f + k].append(f)

            sieve_size = getsizeof(sieve)
            if sieve_size > max_sieve_size:
                max_sieve_size = sieve_size
                print('{}\tincrease: {} bytes'.format(k, max_sieve_size))
        else:
            yield k
            sieve[k ** 2] = [k]
        k += 1
def test_dump_tx_size(self):
    # GIVEN
    tx = Transaction()
    tx.put_data("TEST")
    tx.transaction_type = TransactionStatus.confirmed

    tx_only_data = TransactionDataOnly()
    tx_only_data.data = "TEST"

    # WHEN
    dump_a = pickle.dumps(tx_only_data)
    dump_b = pickle.dumps(tx)

    # THEN
    logging.debug("size of tx_only_data: " + str(sys.getsizeof(dump_a)))
    logging.debug("size of tx: " + str(sys.getsizeof(dump_b)))
    self.assertLessEqual(sys.getsizeof(dump_a), sys.getsizeof(dump_b) * 1.5)
def test_errors(self):
    class BadSizeof(object):
        def __sizeof__(self):
            raise ValueError
    self.assertRaises(ValueError, sys.getsizeof, BadSizeof())

    class InvalidSizeof(object):
        def __sizeof__(self):
            return None
    self.assertRaises(TypeError, sys.getsizeof, InvalidSizeof())
    sentinel = ["sentinel"]
    self.assertIs(sys.getsizeof(InvalidSizeof(), sentinel), sentinel)

    class OverflowSizeof(long):
        def __sizeof__(self):
            return int(self)
    self.assertEqual(sys.getsizeof(OverflowSizeof(sys.maxsize)),
                     sys.maxsize + self.gc_headsize)
    with self.assertRaises(OverflowError):
        sys.getsizeof(OverflowSizeof(sys.maxsize + 1))
    with self.assertRaises(ValueError):
        sys.getsizeof(OverflowSizeof(-1))
    with self.assertRaises((ValueError, OverflowError)):
        sys.getsizeof(OverflowSizeof(-sys.maxsize - 1))
def clean_message(self):
    # Quick check if the message really came encrypted
    message = self.cleaned_data.get("message")
    lines = message.split("\r\n")
    if getsizeof(message) > MAX_MESSAGE_SIZE:
        self.add_error(
            "message",
            _('The message or file exceeds your allowed size limit.')
        )
    begin = "-----BEGIN PGP MESSAGE-----"
    end = "-----END PGP MESSAGE-----"
    try:
        if lines[0] != begin or lines[-1] != end:
            self.add_error("message", _('Invalid PGP message'))
    except IndexError:
        self.add_error("message", _('Invalid PGP message'))
    return message
def _alchemy_image_request(self, method_name, image_file=None,
                           image_url=None, params=None):
    if params is None:
        params = {}
    params['outputMode'] = 'json'
    params = _convert_boolean_values(params)
    headers = {}
    image_contents = None

    if image_file:
        params['imagePostMode'] = 'raw'
        image_contents = image_file.read()
        # headers['content-length'] = sys.getsizeof(image_contents)
        url = '/image/Image' + method_name
    elif image_url:
        params['imagePostMode'] = 'not-raw'
        params['url'] = image_url
        url = '/url/URL' + method_name
    else:
        raise WatsonInvalidArgument(
            'image_file or image_url must be specified')

    return self.request(method='POST', url=url, params=params,
                        data=image_contents, headers=headers,
                        accept_json=True)
def frame_profile(frame_idx, serial_data_path, pickle_path, mol_types, coords,
                  sys_type, assoc_sel_idxs, assoc_type, inx_type):

    inxs, system, assoc = profile_coords(mol_types, coords, sys_type,
                                         assoc_sel_idxs, assoc_type, inx_type)

    # data output
    inx_type.pdb_serial_output(inxs[inx_type], serial_data_path, delim=" ")

    # persistent storage
    with open(pickle_path, 'wb') as f:
        pickle.dump(inxs, f)

    print("--------------------------------------------------------------------------------")
    print("frame", frame_idx)
    print("----------------------------------------")
    print("size of inxs {}".format(sys.getsizeof(inxs)))
    print("size of system {}".format(sys.getsizeof(system)))
    print("size of assoc {}".format(sys.getsizeof(assoc)))
    if len(inxs[inx_type]) > 0:
        print(len(inxs[inx_type]), "intermolecular hydrogen bonds")
        for inx in inxs[inx_type]:
            inx.pp()
    else:
        print(0, "intermolecular hydrogen bonds")
def test_memory_usage(self):
    for o in self.objs:
        res = o.memory_usage()
        res_deep = o.memory_usage(deep=True)

        if (com.is_object_dtype(o) or
                (isinstance(o, Series) and com.is_object_dtype(o.index))):
            # if there are objects, only deep will pick them up
            self.assertTrue(res_deep > res)
        else:
            self.assertEqual(res, res_deep)

        if isinstance(o, Series):
            self.assertEqual(
                (o.memory_usage(index=False) + o.index.memory_usage()),
                o.memory_usage(index=True)
            )

        # sys.getsizeof will call the .memory_usage with
        # deep=True, and add on some GC overhead
        diff = res_deep - sys.getsizeof(o)
        self.assertTrue(abs(diff) < 100)
def save(operations, path):
    print('Saving texthasher model...')
    s = pickle.dumps(operations, pickle.HIGHEST_PROTOCOL)
    z = sys.getsizeof(s)
    print('texthasher model occupies total size of {0:.2f} GB'.format(z / _1GB))

    # In case of huge model, split it into smaller chunks
    # and save separately
    if z > _1GB:
        i = 0
        for c in split_to_chunks(s, _1GB):
            i += 1
            save_chunk(path, i, c)
    else:
        # Small model, just save it with typical method
        with open(path + '.0', 'wb+') as f:
            pickle.dump(operations, f)
def measure_memory(cls, obj, seen=None):
    """Recursively finds size of objects"""
    size = sys.getsizeof(obj)
    if seen is None:
        seen = set()
    obj_id = id(obj)
    if obj_id in seen:
        return 0
    # Important mark as seen *before* entering recursion to gracefully handle
    # self-referential objects
    seen.add(obj_id)
    if isinstance(obj, dict):
        size += sum([cls.measure_memory(v, seen) for v in obj.values()])
        size += sum([cls.measure_memory(k, seen) for k in obj.keys()])
    elif hasattr(obj, '__dict__'):
        size += cls.measure_memory(obj.__dict__, seen)
    elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes, bytearray)):
        size += sum([cls.measure_memory(i, seen) for i in obj])
    return size
def getrecsizeof(o, seen=None):
    if seen is None:
        seen = set()
    if id(o) in seen:
        return 0
    seen.add(id(o))

    c = sys.getsizeof(o)
    if isinstance(o, dict):
        for k, v in o.items():
            c += getrecsizeof(k, seen)
            c += getrecsizeof(v, seen)
    elif isinstance(o, str) or isinstance(o, bytes):
        pass
    elif isinstance(o, collections.Iterable):
        for e in o:
            c += getrecsizeof(e, seen)
    return c
def test_subclasses(self):
    # Verify that subclasses can share keys (per PEP 412)
    class A:
        pass
    class B(A):
        pass

    a, b = A(), B()
    self.assertEqual(sys.getsizeof(vars(a)), sys.getsizeof(vars(b)))
    self.assertLess(sys.getsizeof(vars(a)), sys.getsizeof({}))
    a.x, a.y, a.z, a.w = range(4)
    self.assertNotEqual(sys.getsizeof(vars(a)), sys.getsizeof(vars(b)))
    a2 = A()
    self.assertEqual(sys.getsizeof(vars(a)), sys.getsizeof(vars(a2)))
    self.assertLess(sys.getsizeof(vars(a)), sys.getsizeof({}))
    b.u, b.v, b.w, b.t = range(4)
    self.assertLess(sys.getsizeof(vars(b)), sys.getsizeof({}))
def test_pickler(self):
    basesize = support.calcobjsize('5P2n3i2n3iP')
    p = _pickle.Pickler(io.BytesIO())
    self.assertEqual(object.__sizeof__(p), basesize)
    MT_size = struct.calcsize('3nP0n')
    ME_size = struct.calcsize('Pn0P')
    check = self.check_sizeof
    check(p, basesize +
        MT_size + 8 * ME_size +          # Minimal memo table size.
        sys.getsizeof(b'x' * 4096))      # Minimal write buffer size.
    for i in range(6):
        p.dump(chr(i))
    check(p, basesize +
        MT_size + 32 * ME_size +         # Size of memo table required to
                                         # save references to 6 objects.
        0)                               # Write buffer is cleared after every dump().
def get_file_size(file):
    """
    Returns the file size as a string in the largest applicable unit.
    """
    byte_size = sys.getsizeof(file)
    kb_size = byte_size // 1024
    if kb_size == 0:
        byte_size = "%s Bytes" % (byte_size)
        return byte_size

    mb_size = kb_size // 1024
    if mb_size == 0:
        kb_size = "%s KB" % (kb_size)
        return kb_size

    gb_size = mb_size // 1024
    if gb_size == 0:
        mb_size = "%s MB" % (mb_size)
        return mb_size

    return "%s GB" % (gb_size)
def __call__(self, *args, **kwargs):
    key = hash(repr((args, kwargs)))
    if (not key in self.known_keys):
        # when an amount of arguments can't be identified from keys
        value = self.func(*args, **kwargs)   # compute function
        self.known_keys.append(key)          # add the key to your list of keys
        self.known_values.append(value)      # add the value to your list of values
        if self.size is not None:
            self.size -= sys.getsizeof(value)  # the assigned space has decreased
            # free cache when size of values goes beyond the size limit
            if (sys.getsizeof(self.known_values) > self.size):
                del self.known_keys
                del self.known_values
                del self.size
                self.known_keys = []
                self.known_values = []
                self.size = self.size_copy
        return value
    else:
        # if you've already computed everything
        i = self.known_keys.index(key)  # find your key and return your values
        return self.known_values[i]
def getsize(top_obj):
    visited = set()

    def inner(obj):
        obj_id = id(obj)
        if obj_id in visited:
            return 0
        visited.add(obj_id)

        size = sys.getsizeof(obj)

        if isinstance(obj, (str, bytes, Number, range, bytearray)):
            pass
        elif isinstance(obj, (tuple, list, Set, deque)):
            size += sum(inner(i) for i in obj)
        elif isinstance(obj, Mapping) or hasattr(obj, 'items'):
            size += sum(inner(k) + inner(v) for k, v in obj.items())

        if hasattr(obj, '__dict__'):
            size += inner(vars(obj))
        if hasattr(obj, '__slots__'):
            size += sum(
                inner(getattr(obj, s)) for s in obj.__slots__ if hasattr(obj, s))
        return size

    return inner(top_obj)
def __sizeof__(self):
    sizeof = _sys.getsizeof
    n = len(self) + 1                    # number of links including root
    size = sizeof(self.__dict__)         # instance dictionary
    size += sizeof(self.__map) * 2       # internal dict and inherited dict
    size += sizeof(self.__hardroot) * n  # link objects
    size += sizeof(self.__root) * n      # proxy objects
    return size
def post(self, url='/', data={}, endpoint="http://0.0.0.0:9000"):
    # try:
    data = dict(data)
    data['token'] = session['token']
    logger.info("Docklet Request: user = %s data = %s, url = %s"
                % (session['username'], data, url))
    reqtype = url.split("/")[1]
    userreq = {'login', 'external_login', 'register', 'user', 'beans',
               'notification', 'cloud', 'settings'}
    if ":" not in endpoint:
        endpoint = "http://" + endpoint + ":" + master_port
    if reqtype in userreq:
        result = requests.post(user_endpoint + url, data=data).json()
    else:
        result = requests.post(endpoint + url, data=data).json()
    # logger.info('response content: %s' % response.content)
    # result = response.json()
    if (result.get('success', None) == "false" and
            result.get('reason', None) == "Unauthorized Action"):
        abort(401)
    if (result.get('Unauthorized', None) == 'True'):
        session['401'] = 'Token Expired'
        abort(401)
    logstr = "Docklet Response: user = %s result = %s, url = %s" % (session['username'], result, url)
    if (sys.getsizeof(logstr) > 512):
        logstr = "Docklet Response: user = %s, url = %s" % (session['username'], url)
    logger.info(logstr)
    return result
    # except:
    #     abort(500)
def deep_getsizeof(o, ids):
    """Find the memory footprint of a Python object

    This is a recursive function that drills down a Python object graph
    like a dictionary holding nested dictionaries with lists of lists
    and tuples and sets.

    The sys.getsizeof function only does a shallow size. It counts each
    object inside a container as a pointer only, regardless of how big it
    really is.

    :param o: the object
    :param ids:
    :return:
    """
    d = deep_getsizeof
    if id(o) in ids:
        return 0

    r = getsizeof(o)
    ids.add(id(o))

    if isinstance(o, str) or isinstance(o, unicode):
        return r

    if isinstance(o, Mapping):
        return r + sum(d(k, ids) + d(v, ids) for k, v in o.iteritems())

    if isinstance(o, Container):
        return r + sum(d(x, ids) for x in o)

    return r
def test_user_public_key(user_contract, web3):
    key = encrypt.createKey()
    binPubKey1 = key.publickey().exportKey('DER')
    print(sys.getsizeof(binPubKey1))
    user_contract.transact().setPubKey(binPubKey1)
    binPubKey2 = user_contract.call().userPubkey(web3.eth.accounts[0])
    print(binPubKey2)
    pubKeyObj = RSA.importKey(binPubKey2)
    enc_data = encrypt.encrypt(pubKeyObj, 'Hello there!')
    print(encrypt.decrypt(enc_data, key))
def _getreferents(unused):
    return ()  # sorry, no refs

# sys.getsizeof() new in Python 2.6, adjusted below
def test_clean_file_upload_form_invalid_data(self):
    t = workflows.create_instance.CustomizeAction(self.request, {})
    upload_str = b'\x81'
    files = {'script_upload':
             self.SimpleFile('script_name',
                             upload_str,
                             sys.getsizeof(upload_str))}

    self.assertRaises(
        forms.ValidationError,
        t.clean_uploaded_files,
        'script',
        files)
def test_clean_file_upload_form_valid_data(self):
    t = workflows.create_instance.CustomizeAction(self.request, {})
    precleaned = 'user data'
    upload_str = 'user data'
    files = {'script_upload':
             self.SimpleFile('script_name',
                             upload_str,
                             sys.getsizeof(upload_str))}

    cleaned = t.clean_uploaded_files('script', files)

    self.assertEqual(cleaned, precleaned)
def extract_password(fhandle):
    fpos = 8568
    fhandle.seek(fpos)
    chunk = fhandle.read(sys.getsizeof(fhandle))

    # Decompress chunk
    result, window = lzs.LZSDecompress(chunk)
    print_status('Decompressed chunk: {0}'.format(result))

    # Extract plaintext password
    res = re.findall(b'([\040-\176]{5,})', result)
    return res[0]
def __sizeof__(self):
    return object.__sizeof__(self) + \
        sum(sys.getsizeof(v) for v in self.__dict__.values())
def actionDumpobj(self):
    import gc
    import sys

    self.sendHeader()

    # No more if not in debug mode
    if not config.debug:
        yield "Not in debug mode"
        raise StopIteration

    class_filter = self.get.get("class")

    yield """
    <style>
     * { font-family: monospace; white-space: pre }
     table * { text-align: right; padding: 0px 10px }
    </style>
    """

    objs = gc.get_objects()
    for obj in objs:
        obj_type = str(type(obj))
        if obj_type != "<type 'instance'>" or obj.__class__.__name__ != class_filter:
            continue
        yield "%.1fkb %s... " % (float(sys.getsizeof(obj)) / 1024, cgi.escape(str(obj)))
        for attr in dir(obj):
            yield "- %s: %s<br>" % (attr, cgi.escape(str(getattr(obj, attr))))
        yield "<br>"

    gc.collect()  # Implicit garbage collection
def test_empty_array(self):
    x = np.array([])
    assert_(sys.getsizeof(x) > 0)
def check_array(self, dtype):
    elem_size = dtype(0).itemsize
    for length in [10, 50, 100, 500]:
        x = np.arange(length, dtype=dtype)
        assert_(sys.getsizeof(x) > length * elem_size)
def test_view(self):
    d = np.ones(100)
    assert_(sys.getsizeof(d[...]) < sys.getsizeof(d))
def test_reshape(self):
    d = np.ones(100)
    assert_(sys.getsizeof(d) < sys.getsizeof(d.reshape(100, 1, 1).copy()))
def test_resize(self):
    d = np.ones(100)
    old = sys.getsizeof(d)
    d.resize(50)
    assert_(old > sys.getsizeof(d))
    d.resize(150)
    assert_(old < sys.getsizeof(d))
def buffer_length(arr):
    if isinstance(arr, unicode):
        arr = str(arr)
        return (sys.getsizeof(arr + "a") - sys.getsizeof(arr)) * len(arr)
    v = memoryview(arr)
    if v.shape is None:
        return len(v) * v.itemsize
    else:
        return np.prod(v.shape) * v.itemsize
def test_unused_fields_size(self):
    """ Should return correct size of unused fields values """
    self.model_instance_mock.field2 = 'foobar'
    self.model_instance_mock.field4 = 'spam' * 10
    expected_size = sys.getsizeof('foobar') + sys.getsizeof('spam' * 10)

    self.wrapper.field1
    self.wrapper.field3

    self.assertEqual(self.wrapper.eraserhead_unused_fields_size, expected_size)
def eraserhead_unused_fields_size(self):
    total_size = 0
    for field in self.eraserhead_unused_fields:
        total_size += sys.getsizeof(getattr(self.__wrapped__, field))
    return total_size
def spf_record_len(addresses):
    quote_allowance = '" "' * (len(addresses) // 4)
    return sys.getsizeof(
        'v=spf1 {addresses} {quotes} include:spf1.example.domain.com -all'.format(
            addresses=' ip4:'.join(addresses),
            quotes=quote_allowance
        ))
def resize(self, newsize):
    new_cache = cachetools.LFUCache(
        maxsize=newsize,
        getsizeof=sys.getsizeof)
    cached = [self._cache.popitem() for x in range(len(self._cache))]
    for key, val in cached:
        if sys.getsizeof(val) + new_cache.currsize <= new_cache.maxsize:
            new_cache[key] = val
    self._cache = new_cache
def clear(self):
    self._backup = self._cache
    self._cache = cachetools.LFUCache(
        maxsize=self._backup.maxsize,
        getsizeof=sys.getsizeof)
def size(self):
    """ Return size in GB """
    size = 0
    size += self.data.size * sys.getsizeof(self.data)
    return size / 1024.0 / 1024.0 / 1024.0
def get_image_in_memory_data():
    """ Creates the InMemoryUploadedFile object using the data from io
    to save it into the ImageField of the database """
    io = get_image_data()  # get a red rectangle 200x200px
    # create the InMemoryUploadedFile object with the 'foo.jpg' file
    image_file = InMemoryUploadedFile(io, None, 'foo.jpg', 'jpeg',
                                      sys.getsizeof(io), None)
    image_file.seek(0)  # seek to the beginning
    return image_file