The following 50 code examples, collected from open source Python projects, illustrate how to use zlib.adler32().
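Before the project examples, here is a minimal sketch of the zlib.adler32() API itself: it takes bytes plus an optional running checksum (which starts at 1), and in Python 3 it always returns an unsigned 32-bit integer.

import zlib

# Checksum of a whole buffer.
data = b'hello world'
full = zlib.adler32(data)

# The same value computed incrementally: feed chunks and pass the previous
# result back in as the starting value.
running = zlib.adler32(b'hello ')
running = zlib.adler32(b'world', running)
assert running == full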
def generate_ip_verify_hash(input_dict):
    """Generate a verification hash, used together with human_ip_verification.

    The hash is 14 characters long: hash(first 7 chars + salt) = last 7 chars,
    which is how it is later verified.

    :rtype: str
    """
    strbuff = human_ip_verification_answers_hash_str
    for key in input_dict:
        strbuff += key + input_dict[key] + str(random.randint(0, 9000000))
    input_key_hash = hex(zlib.adler32(strbuff.encode(encoding='utf-8')))[2:]
    while len(input_key_hash) < 7:
        input_key_hash += '0'
    output_hash = hex(zlib.adler32(
        (input_key_hash + human_ip_verification_answers_hash_str).encode(encoding='utf-8')))[2:]
    while len(output_hash) < 7:
        output_hash += '0'
    return input_key_hash + output_hash
def verify_ip_hash_cookie(hash_cookie_value):
    """Verify whether the hash carried in the cookie is valid,
    used together with human_ip_verification.

    The hash is 14 characters long: hash(first 7 chars + salt) = last 7 chars.

    :type hash_cookie_value: str
    :rtype: bool
    """
    try:
        input_key_hash = hash_cookie_value[:8]
        output_hash = hash_cookie_value[8:]
        calculated_hash = hex(zlib.adler32(
            (input_key_hash + human_ip_verification_answers_hash_str).encode(encoding='utf-8')
        ))[2:]
        if output_hash == calculated_hash:
            return True
        else:
            return False
    except:
        return False
def options(self, object):
    def crc(data):
        value = adler32(data.encode('utf-8', 'replace'))
        if value < 0:
            value += 0x100000000
        return value

    ret = [object.user]
    if object.use_sudo:
        ret.append('sudo')
    if object.use_ionice:
        ret.append('ionice')
    if object.includes:
        ret.append('inc=%d:%x' % (
            len(object.includes.split(' ')), crc(object.includes)))
    if object.excludes:
        ret.append('exc=%d:%x' % (
            len(object.excludes.split(' ')), crc(object.excludes)))
    return ', '.join(ret)
def __init__(self, documents=None, id_range=32000, myhash=zlib.adler32, debug=True):
    """
    By default, keep track of debug statistics and mappings. If you find yourself
    running out of memory (or are sure you don't need the debug info), set
    `debug=False`.
    """
    self.myhash = myhash  # hash fnc: string->integer
    self.id_range = id_range  # hash range: id = myhash(key) % id_range
    self.debug = debug

    # the following (potentially massive!) dictionaries are only formed if `debug` is True
    self.token2id = {}
    self.id2token = {}  # reverse mapping int->set(words)
    self.dfs = {}  # token_id -> how many documents this token_id appeared in
    self.dfs_debug = {}  # token_string -> how many documents this word appeared in

    self.num_docs = 0  # number of documents processed
    self.num_pos = 0  # total number of corpus positions
    self.num_nnz = 0  # total number of non-zeroes in the BOW matrix
    self.allow_update = True

    if documents is not None:
        self.add_documents(documents)
def get_data_hash(self):
    """
    Generates a 'hash' that can be used instead of addr_old as block id, and that
    should be 'stable' across .blend file load & save (i.e. it does not change due
    to pointer address variations).
    """
    # TODO This implementation is most likely far from optimal... and CRC32 is not
    #      renowned as the best hashing algo either. But for now it does the job!
    import zlib

    def _is_pointer(self, k):
        return self.file.structs[self.sdna_index].field_from_path(
            self.file.header, self.file.handle, k).dna_name.is_pointer

    hsh = 1
    for k, v in self.items_recursive_iter():
        if not _is_pointer(self, k):
            hsh = zlib.adler32(str(v).encode(), hsh)
    return hsh
def hash(self, key):
    """Compute portable hash for `key`.

    :param key: key to hash
    :return: hash value
    """
    mask = 0xFFFFFFFF
    disk_key, _ = self.put(key)
    type_disk_key = type(disk_key)

    if type_disk_key is sqlite3.Binary:
        return zlib.adler32(disk_key) & mask
    elif type_disk_key is TextType:
        return zlib.adler32(disk_key.encode('utf-8')) & mask  # pylint: disable=no-member
    elif type_disk_key in INT_TYPES:
        return disk_key % mask
    else:
        assert type_disk_key is float
        return zlib.adler32(struct.pack('!d', disk_key)) & mask
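The `& mask` in the example above is the usual way to keep the result inside the unsigned 32-bit range on any interpreter. Below is a stand-alone sketch of the same idea; the portable_hash() helper is hypothetical and not part of the library shown above.

import struct
import zlib

def portable_hash(key):
    # Normalize the key to bytes, then mask the checksum into the
    # unsigned 32-bit range so the value is stable across platforms.
    mask = 0xFFFFFFFF
    if isinstance(key, bytes):
        return zlib.adler32(key) & mask
    elif isinstance(key, str):
        return zlib.adler32(key.encode('utf-8')) & mask
    elif isinstance(key, int):
        return key % mask
    elif isinstance(key, float):
        return zlib.adler32(struct.pack('!d', key)) & mask
    raise TypeError('unsupported key type: %r' % type(key))

print(portable_hash('some-key'), portable_hash(42), portable_hash(3.14))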
def testBuild(self):
    d = HashDictionary(self.texts, myhash=zlib.adler32)
    expected = {5232: 2, 5798: 3, 10608: 2, 12466: 2, 12736: 3, 15001: 2,
                18451: 3, 23844: 3, 28591: 2, 29104: 2, 31002: 2, 31049: 2}
    self.assertEqual(d.dfs, expected)

    expected = {'minors': 15001, 'graph': 18451, 'system': 5798, 'trees': 23844,
                'eps': 31049, 'computer': 10608, 'survey': 28591, 'user': 12736,
                'human': 31002, 'time': 29104, 'interface': 12466, 'response': 5232}
    for ex in expected:
        self.assertEqual(d.token2id[ex], expected[ex])
def parse(cls, data):
    if len(data) < (cls.header_size + 4):
        raise BasebinaryError("not enough data to parse")

    header_data = data[:cls.header_size]
    inner_data = data[cls.header_size:-4]  # XXX: and here??
    stored_checksum, = struct.unpack(">I", data[-4:])

    (byte_0x0F, model, version, byte_0x18, byte_0x19,
     byte_0x1A, flags, unk_0x1C) = cls.parse_header(header_data)

    if flags & 2:
        inner_data = cls.decrypt(inner_data, model, byte_0x0F)

    # XXX: why is Python so shitty about this comparison <.<
    checksum = cast_u32(zlib.adler32(header_data + inner_data))

    logging.debug("stored checksum {0:#x}".format(stored_checksum))
    logging.debug("calculated checksum {0:#x}".format(checksum))
    logging.debug("data length {0:#x}".format(len(header_data + inner_data)))

    if stored_checksum != checksum:
        raise BasebinaryError("bad checksum")

    return inner_data
def __init__(self, version, flags, unused, command, error_code, key,
             body=None, body_size=None):
    self.version = version
    self.flags = flags
    self.unused = unused
    self.command = command
    self.error_code = error_code

    # body is not specified, this is a stream header
    if body is None:
        # the body size is already specified, don't override it
        self.body_size = body_size if body_size is not None else -1
        self.body_checksum = 1  # equivalent to zlib.adler32("")
    else:
        # the body size is already specified, don't override it
        self.body_size = body_size if body_size is not None else len(body)
        self.body_checksum = zlib.adler32(body)

    self.key = key
    self.body = body
def _compress(self, fileobj, body):
    """Compress ctb-file-body and write it to <fileobj>."""
    def writestr(s):
        if PYTHON3:
            fileobj.write(s.encode())
        else:
            fileobj.write(s)

    if PYTHON3:
        body = body.encode()
    comp_body = zlib.compress(body)
    adler_chksum = zlib.adler32(comp_body)
    writestr('PIAFILEVERSION_2.0,CTBVER1,compress\r\npmzlibcodec')
    fileobj.write(pack('LLL', adler_chksum, len(body), len(comp_body)))
    fileobj.write(comp_body)
def fix_checksums(self, buff):
    """
    Fix a dex format buffer by setting all checksums

    :rtype: string
    """
    import zlib
    import hashlib

    signature = hashlib.sha1(buff[32:]).digest()
    buff = buff[:12] + signature + buff[32:]

    checksum = zlib.adler32(buff[12:])
    buff = buff[:8] + pack("=i", checksum) + buff[12:]

    debug("NEW SIGNATURE %s" % repr(signature))
    debug("NEW CHECKSUM %x" % checksum)

    return buff
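For reference, the Adler-32 written above lives at offset 8 of the DEX header and covers everything from offset 12 to the end of the buffer. A small sketch of checking that invariant; the helper name is illustrative, not part of the library above.

import struct
import zlib

def dex_checksum_ok(buff):
    # The 4 bytes at offset 8 hold a little-endian Adler-32 computed over
    # everything from offset 12 (after the magic and the checksum itself).
    stored = struct.unpack('<I', buff[8:12])[0]
    return stored == (zlib.adler32(buff[12:]) & 0xFFFFFFFF)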
def __init__(self, seed):
    if not seed:
        seed = "%.1f" % time.time()
    if hasattr(seed, "encode"):
        seed = seed.encode('ascii')

    # A note on hash functions.
    # We don't need cryptographic quality, so we won't use hashlib -
    # that'd be way too slow. The zlib module contains two hash
    # functions. Adler32 is fast, but not very uniform for short
    # strings. Crc32 is slower, but has better bit distribution.
    # So, we use crc32 whenever the hash is converted into an
    # exportable number, but we use adler32 when we're producing
    # intermediate values.
    self.seed = zlib.adler32(seed)
    self.text_seed = seed

    # Default, typically overridden
    self.size = 1024 + 786j
def adler32(file):
    """
    An Adler-32 checksum is obtained by calculating two 16-bit checksums A and B
    and concatenating their bits into a 32-bit integer. A is the sum of all bytes
    in the stream plus one, and B is the sum of the individual values of A from
    each step.

    :returns: Hexified string, padded to 8 values.
    """
    # adler starting value is _not_ 0
    adler = 1
    try:
        openFile = open(file, 'rb')
        for line in openFile:
            adler = zlib.adler32(line, adler)
    except:
        raise Exception('FATAL - could not get checksum of file %s' % file)
    # backflip on 32bit
    if adler < 0:
        adler = adler + 2 ** 32
    return str('%08x' % adler)
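A Python 3 variant of the same idea that reads fixed-size binary chunks instead of iterating over lines; the function name and chunk size here are just for illustration.

import zlib

def adler32_of_file(path, chunk_size=64 * 1024):
    # Start from 1, the Adler-32 initial value, and fold in fixed-size chunks.
    checksum = 1
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            checksum = zlib.adler32(chunk, checksum)
    # zlib.adler32 is already unsigned in Python 3; format as 8 hex digits.
    return '%08x' % checksum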
def pack_auth_data(self, auth_data, buf):
    if len(buf) == 0:
        return b''
    if len(buf) > 400:
        rnd_len = common.ord(os.urandom(1)[0]) % 512
    else:
        rnd_len = struct.unpack('<H', os.urandom(2))[0] % 1024
    data = auth_data
    data_len = 4 + 16 + 10 + len(buf) + rnd_len + 4
    data = data + struct.pack('<H', data_len) + struct.pack('<H', rnd_len)
    uid = os.urandom(4)
    encryptor = encrypt.Encryptor(
        to_bytes(base64.b64encode(uid + self.server_info.key)) + self.salt,
        'aes-128-cbc', b'\x00' * 16)
    data = uid + encryptor.encrypt(data)[16:]
    data += hmac.new(self.server_info.iv + self.server_info.key, data, hashlib.sha1).digest()[:10]
    data += os.urandom(rnd_len) + buf
    data += struct.pack('<I', (zlib.adler32(data) & 0xFFFFFFFF))
    return data
def _checkStreamStart(self):
    if self._startChecked:
        return

    # ok, we've not started yet, either read or write. Start back at prefix start.
    # the way "tell()/seek()" is written, we might have gotten moved, so we have to reset
    self._stream.seek(self._prefixStart)
    if self._rawAvailable() < self.PREFIX_SIZE:
        self._dataSize = None
    else:
        magic = self._stream.read(4)
        if magic != self.MAGIC:
            self._fail("Bad Magic Number at Start")
        check = zlib.adler32(magic)
        sizeBytes = self._stream.read(4)
        size = struct.unpack("!I", sizeBytes)[0]
        check = zlib.adler32(sizeBytes, check)
        checkBytes = self._stream.read(4)
        cmpCheck = struct.unpack("!I", checkBytes)[0]
        if cmpCheck != check:
            self._fail("Bad Prefix Checksum")
        self._dataSize = size
    self._startChecked = True
def writeFrame(self):
    if self._endChecked:
        raise Exception("Cannot write frame data. Already present")
    self._stream.seek(self._prefixStart + 4)
    check = zlib.adler32(self.MAGIC)
    sizeBytes = struct.pack("!I", self._dataSize)
    check = zlib.adler32(sizeBytes, check)
    self._stream.write(sizeBytes)
    self._stream.write(struct.pack("!I", check))
    self._stream.seek(self._prefixStart + self.PREFIX_SIZE + self._dataSize)
    check = zlib.adler32(sizeBytes)
    check = zlib.adler32(self.REV_MAGIC, check)
    self._stream.write(struct.pack("!I", check))
    self._stream.write(sizeBytes)
    self._stream.write(self.REV_MAGIC)
    self._endChecked = True
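The two methods above read and write a 12-byte prefix: 4 magic bytes, a big-endian size, and an Adler-32 over both. A self-contained sketch of that layout using io.BytesIO and an illustrative magic value (not the constants used by the class above):

import io
import struct
import zlib

MAGIC = b'FRAM'  # illustrative 4-byte magic

def write_prefix(stream, data_size):
    size_bytes = struct.pack('!I', data_size)
    # Run the checksum over the magic, then the size bytes.
    check = zlib.adler32(size_bytes, zlib.adler32(MAGIC))
    stream.write(MAGIC + size_bytes + struct.pack('!I', check))

def read_prefix(stream):
    magic = stream.read(4)
    assert magic == MAGIC, 'bad magic'
    size_bytes = stream.read(4)
    check = zlib.adler32(size_bytes, zlib.adler32(magic))
    assert struct.unpack('!I', stream.read(4))[0] == check, 'bad prefix checksum'
    return struct.unpack('!I', size_bytes)[0]

buf = io.BytesIO()
write_prefix(buf, 128)
buf.seek(0)
assert read_prefix(buf) == 128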
def pack_data(self, buf):
    data = self.rnd_data(len(buf)) + buf
    data_len = len(data) + 8
    crc = binascii.crc32(struct.pack('>H', data_len)) & 0xFFFF
    data = struct.pack('<H', crc) + data
    data = struct.pack('>H', data_len) + data
    adler32 = zlib.adler32(data) & 0xFFFFFFFF
    data += struct.pack('<I', adler32)
    return data
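The last two lines above show a common framing trick: append the little-endian Adler-32 of everything that precedes it, so the receiver can reject a corrupted packet before parsing it (as the decoder below does). A minimal roundtrip of just that trailer, under those assumptions:

import struct
import zlib

def seal(payload):
    # Append a little-endian Adler-32 of the payload as a 4-byte trailer.
    return payload + struct.pack('<I', zlib.adler32(payload) & 0xFFFFFFFF)

def unseal(packet):
    payload, trailer = packet[:-4], packet[-4:]
    if struct.pack('<I', zlib.adler32(payload) & 0xFFFFFFFF) != trailer:
        raise ValueError('corrupted packet: bad Adler-32 trailer')
    return payload

assert unseal(seal(b'some obfuscated bytes')) == b'some obfuscated bytes'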
def client_post_decrypt(self, buf):
    if self.raw_trans:
        return buf
    self.recv_buf += buf
    out_buf = b''
    while len(self.recv_buf) > 4:
        crc = struct.pack('<H', binascii.crc32(self.recv_buf[:2]) & 0xFFFF)
        if crc != self.recv_buf[2:4]:
            raise Exception('client_post_decrypt data uncorrect crc')
        length = struct.unpack('>H', self.recv_buf[:2])[0]
        if length >= 8192 or length < 7:
            self.raw_trans = True
            self.recv_buf = b''
            raise Exception('client_post_decrypt data error')
        if length > len(self.recv_buf):
            break

        if struct.pack('<I', zlib.adler32(self.recv_buf[:length - 4]) & 0xFFFFFFFF) != self.recv_buf[length - 4:length]:
            self.raw_trans = True
            self.recv_buf = b''
            raise Exception('client_post_decrypt data uncorrect checksum')

        pos = common.ord(self.recv_buf[4])
        if pos < 255:
            pos += 4
        else:
            pos = struct.unpack('>H', self.recv_buf[5:7])[0] + 4
        out_buf += self.recv_buf[pos:length - 4]
        self.recv_buf = self.recv_buf[length:]

    if out_buf:
        self.decrypt_packet_num += 1
    return out_buf
async def encode_adler32(self, *, message: str):
    '''Compute Adler-32 checksum'''
    await self.bot.embed_reply(zlib.adler32(message.encode("utf-8")))
def hash(self, token):
    return zlib.adler32(scrub(token)) % self.id_range
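In other words, the token id is simply the Adler-32 of the UTF-8 bytes of the (scrubbed) token reduced modulo id_range. A stand-alone illustration, which reproduces the id 31002 seen for 'human' in the surrounding HashDictionary tests:

import zlib

def token_to_id(token, id_range=32000):
    # Hash the UTF-8 bytes of the token, then bucket into [0, id_range).
    # Different tokens may collide by design.
    return zlib.adler32(token.encode('utf-8')) % id_range

assert token_to_id('human') == 31002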
def adler32(data):
    return zlib.adler32(data) & 0xFFFFFFFF
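A wrapper like this exists because Python 2's zlib.adler32() could return a signed (negative) value, while Python 3 always returns an unsigned 32-bit integer; masking with 0xFFFFFFFF yields the same number either way. Assuming that is the intent, a brief sketch:

import zlib

def adler32_u32(data):
    # Normalize the checksum into [0, 2**32) so it can be compared with
    # values read from files or the network on any Python version.
    return zlib.adler32(data) & 0xFFFFFFFF

# A negative 32-bit result (as older interpreters could return) and its
# unsigned counterpart mask to the same value.
assert (-1 & 0xFFFFFFFF) == 0xFFFFFFFF
print('%08x' % adler32_u32(b'zlib.adler32'))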
def createMsg(self, body, replyflags=0):
    pflgs = replyflags
    if _has_compression and Pyro.config.PYRO_COMPRESSION:
        before = len(body)
        bz = zlib.compress(body)  # default compression level
        if len(bz) < before:
            pflgs |= PFLG_COMPRESSED
            body = bz
    crc = 0
    if Pyro.config.PYRO_CHECKSUM and _has_compression:
        crc = zlib.adler32(body)
        pflgs |= PFLG_CHECKSUM
    if Pyro.config.PYRO_XML_PICKLE == 'gnosis':
        pflgs |= PFLG_XMLPICKLE_GNOSIS
    return struct.pack(self.headerFmt, self.headerID, self.version,
                       self.headerSize, len(body), pflgs, crc) + body
def _decode_key_block(self, key_block_compressed, key_block_info_list):
    key_list = []
    i = 0
    for compressed_size, decompressed_size in key_block_info_list:
        start = i
        end = i + compressed_size
        # 4 bytes : compression type
        key_block_type = key_block_compressed[start:start + 4]
        # 4 bytes : adler checksum of decompressed key block
        adler32 = unpack('>I', key_block_compressed[start + 4:start + 8])[0]
        if key_block_type == b'\x00\x00\x00\x00':
            key_block = key_block_compressed[start + 8:end]
        elif key_block_type == b'\x01\x00\x00\x00':
            if lzo is None:
                print("LZO compression is not supported")
                break
            # decompress key block
            header = b'\xf0' + pack('>I', decompressed_size)
            key_block = lzo.decompress(key_block_compressed[start + 8:end],
                                       initSize=decompressed_size, blockSize=1308672)
        elif key_block_type == b'\x02\x00\x00\x00':
            # decompress key block
            key_block = zlib.decompress(key_block_compressed[start + 8:end])
        # extract one single key block into a key list
        key_list += self._split_key_block(key_block)
        # notice that adler32 returns signed value
        assert(adler32 == zlib.adler32(key_block) & 0xffffffff)
        i += compressed_size
    return key_list
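A reduced sketch of the zlib branch above: each block stores a 4-byte type tag, a big-endian Adler-32 of the decompressed payload, then the compressed payload, and the checksum is re-verified after inflating. The container shown here is illustrative, not the full MDX layout.

import struct
import zlib

def pack_block(payload):
    compressed = zlib.compress(payload)
    checksum = zlib.adler32(payload) & 0xFFFFFFFF
    # type tag b'\x02\x00\x00\x00' marks a zlib-compressed block.
    return b'\x02\x00\x00\x00' + struct.pack('>I', checksum) + compressed

def unpack_block(block):
    assert block[:4] == b'\x02\x00\x00\x00', 'not a zlib-compressed block'
    stored = struct.unpack('>I', block[4:8])[0]
    payload = zlib.decompress(block[8:])
    assert stored == zlib.adler32(payload) & 0xFFFFFFFF, 'checksum mismatch'
    return payload

assert unpack_block(pack_block(b'key1\x00key2\x00')) == b'key1\x00key2\x00'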
def render(self, output='html', **kwargs):
    if self.cache:
        cache_entry = 'ponyconf-%d' % adler32(
            '|'.join(map(str, [self.site.domain, output, self.pending] + list(kwargs.values()))).encode('utf-8'))
        result = cache.get(cache_entry)
        if not result:
            result = getattr(self, '_as_%s' % output)(**kwargs)
            cache.set(cache_entry, result, 3 * 60 * 60)  # 3H
        return result
    else:
        return getattr(self, '_as_%s' % output)(**kwargs)
def testDocFreqOneDoc(self):
    texts = [['human', 'interface', 'computer']]
    d = HashDictionary(texts, myhash=zlib.adler32)
    expected = {10608: 1, 12466: 1, 31002: 1}
    self.assertEqual(d.dfs, expected)
def testDocFreqAndToken2IdForSeveralDocsWithOneWord(self):
    # two docs
    texts = [['human'], ['human']]
    d = HashDictionary(texts, myhash=zlib.adler32)
    expected = {31002: 2}
    self.assertEqual(d.dfs, expected)
    # only one token (human) should exist
    expected = {'human': 31002}
    self.assertEqual(d.token2id['human'], expected['human'])
    self.assertEqual(d.token2id.keys(), expected.keys())

    # three docs
    texts = [['human'], ['human'], ['human']]
    d = HashDictionary(texts, myhash=zlib.adler32)
    expected = {31002: 3}
    self.assertEqual(d.dfs, expected)
    # only one token (human) should exist
    expected = {'human': 31002}
    self.assertEqual(d.token2id['human'], expected['human'])
    self.assertEqual(d.token2id.keys(), expected.keys())

    # four docs
    texts = [['human'], ['human'], ['human'], ['human']]
    d = HashDictionary(texts, myhash=zlib.adler32)
    expected = {31002: 4}
    self.assertEqual(d.dfs, expected)
    # only one token (human) should exist
    expected = {'human': 31002}
    self.assertEqual(d.token2id['human'], expected['human'])
    self.assertEqual(d.token2id.keys(), expected.keys())
def testDocFreqForOneDocWithSeveralWord(self):
    # two words
    texts = [['human', 'cat']]
    d = HashDictionary(texts, myhash=zlib.adler32)
    expected = {9273: 1, 31002: 1}
    self.assertEqual(d.dfs, expected)

    # three words
    texts = [['human', 'cat', 'minors']]
    d = HashDictionary(texts, myhash=zlib.adler32)
    expected = {9273: 1, 15001: 1, 31002: 1}
    self.assertEqual(d.dfs, expected)
def testDebugMode(self):
    # two words
    texts = [['human', 'cat']]
    d = HashDictionary(texts, debug=True, myhash=zlib.adler32)
    expected = {9273: set(['cat']), 31002: set(['human'])}
    self.assertEqual(d.id2token, expected)

    # now the same thing, with debug off
    texts = [['human', 'cat']]
    d = HashDictionary(texts, debug=False, myhash=zlib.adler32)
    expected = {}
    self.assertEqual(d.id2token, expected)
def testFilter(self):
    d = HashDictionary(self.texts, myhash=zlib.adler32)
    d.filter_extremes()
    expected = {}
    self.assertEqual(d.dfs, expected)

    d = HashDictionary(self.texts, myhash=zlib.adler32)
    d.filter_extremes(no_below=0, no_above=0.3)
    expected = {29104: 2, 31049: 2, 28591: 2, 5232: 2, 10608: 2,
                12466: 2, 15001: 2, 31002: 2}
    self.assertEqual(d.dfs, expected)

    d = HashDictionary(self.texts, myhash=zlib.adler32)
    d.filter_extremes(no_below=3, no_above=1.0, keep_n=4)
    expected = {5798: 3, 12736: 3, 18451: 3, 23844: 3}
    self.assertEqual(d.dfs, expected)