我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 zlib.crc32()。
def _read_eof(self):
    """Verify the 8-byte trailer of a gzip member and skip zero padding.

    The trailer holds the CRC32 of the uncompressed data followed by its
    size modulo 2**32. Both are compared against the values accumulated
    in ``self.crc`` and ``self.size``.

    Raises:
        IOError: if the stored CRC or the stored size does not match.
    """
    # We've read to the end of the file, so we have to rewind in order
    # to reread the 8 bytes containing the CRC and the file size.
    self.fileobj.seek(-8, 1)
    # NOTE(review): read32 is defined elsewhere in the file; presumably it
    # reads one little-endian unsigned 32-bit integer -- confirm.
    crc32 = read32(self.fileobj)
    isize = read32(self.fileobj)  # may exceed 2GB
    if crc32 != self.crc:
        raise IOError("CRC check failed %s != %s" % (hex(crc32),
                                                     hex(self.crc)))
    elif isize != (self.size & 0xffffffff):
        # The stored size is the true file size mod 2**32.
        raise IOError("Incorrect length of data produced")

    # Gzip files can be padded with zeroes and still have archives.
    # Consume all zero bytes and set the file position to the first
    # non-zero byte. See http://www.gzip.org/#faq8
    # A bytes literal is used so the comparison also works on Python 3,
    # where reading a binary file yields bytes rather than str.
    c = b"\x00"
    while c == b"\x00":
        c = self.fileobj.read(1)
    if c:
        # Un-read the first non-zero byte so the next member starts there.
        self.fileobj.seek(-1, 1)
def crc32(self):
    """Return the CRC32 of the file content as an 8-digit hex string.

    A little bit old-fashioned, but some encodings like yEnc require
    that a crc32 value be used. The content is read from ``self.stream``
    in fixed-size blocks, so the whole file is never held in memory.

    Returns:
        The zero-padded lowercase hexadecimal checksum, or ``None`` when
        ``self.open()`` reports failure.
    """
    # Block size defined as 2**16.
    block_size = 65536

    # Mask applied so the result is always an unsigned 32-bit value
    # (plain int literal: the Python 2 ``L`` suffix is invalid on Python 3).
    BIN_MASK = 0xffffffff

    # Running checksum.
    _crc = 0

    if self.open(mode=NNTPFileMode.BINARY_RO):
        # iter() with a sentinel stops at the first empty read (EOF).
        for chunk in iter(lambda: self.stream.read(block_size), b''):
            # ``crc32`` here is the module-level function, not this method.
            _crc = crc32(chunk, _crc)
        return format(_crc & BIN_MASK, '08x')

    return None
def infer_id(self):
    """Derive an identifier as the CRC32 of the normalised representation.

    The representation is obtained with ``ignore_id=True``, cleaned of
    bracket characters, has ``bytes`` fields renamed to ``string`` and
    ``flags.N?true`` fields removed, and is then checksummed.
    """
    text = self.__repr__(ignore_id=True)

    # Literal substitutions, applied in this exact order.
    substitutions = (
        (':bytes ', ':string '),
        ('?bytes ', '?string '),
        ('<', ' '),
        ('>', ''),
        ('{', ''),
        ('}', ''),
    )
    for old, new in substitutions:
        text = text.replace(old, new)

    # Drop every " name:flags.N?true" field.
    text = re.sub(r' \w+:flags\.\d+\?true', r'', text)
    return crc32(text.encode('ascii'))
def write(self,data):
    """Compress ``data`` and write it to the underlying file object.

    Updates the running uncompressed size, CRC32 and offset.

    Returns:
        The number of uncompressed bytes accepted (``len(data)``).

    Raises:
        IOError: with ``errno.EBADF`` if the object is not in write mode.
        ValueError: if the underlying file object is already gone.
    """
    self._check_closed()
    if self.mode != WRITE:
        import errno
        raise IOError(errno.EBADF, "write() on read-only GzipFile object")

    if self.fileobj is None:
        # Call form of raise: the ``raise X, "msg"`` statement is a
        # SyntaxError on Python 3.
        raise ValueError("write() on closed GzipFile object")

    # Convert data type if called by io.BufferedWriter.
    if isinstance(data, memoryview):
        data = data.tobytes()

    if len(data) > 0:
        self.size = self.size + len(data)
        # Mask keeps the checksum an unsigned 32-bit value on every version.
        self.crc = zlib.crc32(data, self.crc) & 0xffffffff
        self.fileobj.write(self.compress.compress(data))
        self.offset += len(data)

    return len(data)
def asciiDecompress(code):
    """Decompress the result of asciiCompress.

    Args:
        code: base64 text (embedded newlines allowed) wrapping a
            zlib-compressed payload.

    Returns:
        A ``(data, csum)`` tuple: the decompressed bytes and the CRC32 of
        the *compressed* payload.
    """
    # base64.decodestring() was removed in Python 3.9. b64decode() exists
    # on Python 2 and 3 and likewise ignores the line breaks that the
    # encoder inserts.
    code = base64.b64decode(code)
    csum = zlib.crc32(code)
    data = zlib.decompress(code)
    return data, csum
def test_img_cdn_hard_rewrite(self): """????html?CDN??? https://httpbin.org/""" # ?????, ????CDN self.rv = self.client.get( self.url("/image/jpeg"), environ_base=env(), headers=headers() ) # type: Response # ??flaks???, ???????????, ??????????? self.assertEqual("image/jpeg", self.rv.content_type, msg=self.dump()) self.assertEqual(200, self.rv.status_code, msg=self.dump()) self.assertEqual(0x97ca823f, crc32(self.rv.data), msg=self.dump()) with self.app.test_client() as c: # ???? https://httpbin.org/image/jpeg ???, ??????????????CDN self.rv2 = c.get( self.url("/"), environ_base=env(), headers=headers() ) # type: Response self.assertIn(b"cdn2.zmirror-unittest.com/image/jpeg", self.rv2.data, msg=self.dump())
def load_data(self, record):
    """Populate the station attributes from ``record`` and compute its hash.

    Args:
        record: either a mapping with the station fields used below, or a
            text record, which is first parsed with
            ``self.parse_station_record``.

    Side effects:
        Sets the receiver/antenna attributes and ``self.hash``.
    """
    if isinstance(record,str):
        # parse the text record
        record = self.parse_station_record(record)

    # loads the object from a database object
    self.ReceiverCode = record['ReceiverCode']
    self.ReceiverSerial = record['ReceiverSerial']
    self.ReceiverFirmware = record['ReceiverFirmware']
    self.AntennaCode = record['AntennaCode']
    self.AntennaSerial = record['AntennaSerial']
    self.AntennaHeight = float(record['AntennaHeight'])
    self.AntennaNorth = float(record['AntennaNorth'])
    self.AntennaEast = float(record['AntennaEast'])
    self.HeightCode = record['HeightCode']
    self.RadomeCode = record['RadomeCode']
    self.ReceiverVers = record['ReceiverVers']

    # create a hash record using the station information
    # use only the information that can actually generate a change in the antenna position
    signature = '%.3f %.3f %.3f %s %s' % (self.AntennaNorth,
                                          self.AntennaEast,
                                          self.AntennaHeight,
                                          self.AntennaCode,
                                          self.RadomeCode)
    # zlib.crc32 requires bytes on Python 3; on Python 2 the formatted
    # value is normally already a byte string and is passed through as is.
    if not isinstance(signature, bytes):
        signature = signature.encode('utf-8')
    self.hash = zlib.crc32(signature)
def crc32_value(file_path, block_size=1048576):
    """
    Compute the CRC32 checksum of a file's content.

    The file is read in blocks of ``block_size`` bytes so that large files
    are not loaded into memory at once.

    NOTE: per the original author, the result matches what other software
    generates but not the value reported by OneDrive.

    :param str file_path: path of the file to checksum.
    :param int block_size: number of bytes read per iteration.
    :return str: the checksum as 8 uppercase hexadecimal digits.
    """
    checksum = 0
    with open(file_path, 'rb') as handle:
        # The sentinel form stops at the first empty read, i.e. at EOF.
        for block in iter(lambda: handle.read(block_size), b''):
            checksum = zlib.crc32(block, checksum)
    return format(checksum & 0xFFFFFFFF, '08x').upper()
def _recv_tcp_full(self): """ Receives a message from the network, internally encoded using the TCP full protocol. May raise InvalidChecksumError if the received data doesn't match its valid checksum. :return: the read message payload. """ packet_len_seq = self.read(8) # 4 and 4 packet_len, seq = struct.unpack('<ii', packet_len_seq) body = self.read(packet_len - 12) checksum = struct.unpack('<I', self.read(4))[0] valid_checksum = crc32(packet_len_seq + body) if checksum != valid_checksum: raise InvalidChecksumError(checksum, valid_checksum) return body
def write(self,data):
    """Compress ``data`` and write it to the underlying file object.

    The compressed bytes are written first; the running size, CRC32 and
    offset are updated afterwards.

    Returns:
        The number of uncompressed bytes accepted (``len(data)``).

    Raises:
        IOError: with ``errno.EBADF`` if the object is not in write mode.
        ValueError: if the underlying file object is already gone.
    """
    self._check_closed()
    if self.mode != WRITE:
        import errno
        raise IOError(errno.EBADF, "write() on read-only GzipFile object")

    if self.fileobj is None:
        # Call form of raise: the ``raise X, "msg"`` statement is a
        # SyntaxError on Python 3.
        raise ValueError("write() on closed GzipFile object")

    # Convert data type if called by io.BufferedWriter.
    if isinstance(data, memoryview):
        data = data.tobytes()

    if len(data) > 0:
        self.fileobj.write(self.compress.compress(data))
        self.size += len(data)
        # Mask keeps the checksum an unsigned 32-bit value on every version.
        self.crc = zlib.crc32(data, self.crc) & 0xffffffff
        self.offset += len(data)

    return len(data)
def compute_campaign_digests(test_campaign):
    """Attach checksums to a campaign, its test sets and its tests.

    Sets ``crc`` on every test (over its stripped, ASCII-encoded text), on
    every test set (over its tests joined with NUL prefixes) and on the
    campaign itself, then sets ``test_campaign.sha`` from the raw campaign
    source.

    Raises:
        Exception: if ``test_campaign.filename`` is neither a string path
            nor ``'<stdin>'``.
    """
    campaign_parts = []
    for ts in test_campaign:
        set_parts = []
        for t in ts:
            dt = t.test.strip().encode('ascii')
            t.crc = crc32(dt)
            set_parts.append(b"\0" + dt)
        # Joining once avoids the quadratic cost of repeated bytes "+=".
        dts = b"".join(set_parts)
        ts.crc = crc32(dts)
        campaign_parts.append(b"\0\x01" + dts)
    test_campaign.crc = crc32(b"".join(campaign_parts))

    filename = test_campaign.filename
    if isinstance(filename, str) and filename != '<stdin>':
        # The context manager closes the handle even if read() fails.
        with open(filename, 'rb') as source:
            test = source.read()
    elif filename == '<stdin>':
        test = sys.stdin.read().encode('ascii')
    else:
        raise Exception("Unknown test source %s" % filename)
    test_campaign.sha = sha1(test)


#### FILTER CAMPAIGN #####
def pack_into(self, buff, offset):
    """Serialize and write to ``buff`` starting at offset ``offset``.

    Intentionally follows the pattern of ``struct.pack_into``.

    Layout: a 4-byte big-endian CRC32 at ``offset``, followed by magic
    byte, compression type, key length, key, value length and value. The
    CRC covers everything after the CRC field itself.

    :param buff: The buffer to write into
    :param offset: The offset to start the write at
    """
    # NB a length of 0 means an empty string, whereas -1 means null
    len_key = -1 if self.partition_key is None else len(self.partition_key)
    len_value = -1 if self.value is None else len(self.value)
    fmt = '!BBi%dsi%ds' % (max(len_key, 0), max(len_value, 0))
    args = (self.MAGIC,
            self.compression_type,
            len_key,
            self.partition_key or b"",
            len_value,
            self.value or b"")
    body_start = offset + 4  # leave room for the CRC field
    struct.pack_into(fmt, buff, body_start, *args)
    fmt_size = struct.calcsize(fmt)
    # bytes() replaces the Python 2-only buffer() builtin, which raises
    # NameError on Python 3.
    data = bytes(buff[body_start:body_start + fmt_size])
    crc = crc32(data) & 0xffffffff
    struct.pack_into('!I', buff, offset, crc)
def write(self,data):
    """Compress ``data``, write it out and return the number of bytes taken.

    Raises IOError (EBADF) when the object is not open for writing and
    ValueError when the underlying file object has been released.
    """
    self._check_closed()

    if self.mode != WRITE:
        import errno
        raise IOError(errno.EBADF, "write() on read-only GzipFile object")
    if self.fileobj is None:
        raise ValueError("write() on closed GzipFile object")

    # io.BufferedWriter hands over memoryview objects; turn them into bytes.
    if isinstance(data, memoryview):
        data = data.tobytes()

    if len(data) > 0:
        # Bookkeeping happens in the same order as before: size, CRC,
        # the actual write, then the offset.
        self.size = self.size + len(data)
        self.crc = zlib.crc32(data, self.crc) & 0xffffffff
        compressed = self.compress.compress(data)
        self.fileobj.write(compressed)
        self.offset += len(data)

    return len(data)
def __init__(self, *args):
    """Forward ``args`` to the parent initializer and declare the header fields.

    Every field starts out unset (``None``), except ``comment`` and
    ``ntsecurity``, which default to empty bytes. The trailing comments
    give each field's width and meaning as noted by the original author.
    """
    super().__init__(*args)
    self.packsize   = None      # uint32|64 packed size
    self.origsize   = None      # uint32|64 original size
    self.datetime   = None      # uint32    ctime
    self.attribs    = None      # uint32    file attributes
    self.crc32      = None      # uint32    checksum over compressed file
    self.comptype   = None      # uint8     compression type
    self.compqual   = None      # uint8     compression quality
    self.params     = None      # uint16    decompression parameters
    self.reserved1  = None      # uint16
    self.filename   = None      # [uint16]
    self.comment    = b''       # [uint16]  optional, compressed
    self.ntsecurity = b''       # [uint16]  optional
    self.reserved2  = None      # ?
    self.dataoffset = None      #           position of data after hdr
def __init__(self, idx, filehdrs, f):
    """
    Build an :class:`AceMember` for the archive member at index *idx*.

    *filehdrs* is the sequence of file headers belonging to the member and
    *f* is the underlying file-like object. Most metadata comes from the
    first header, the checksum from the last one, and the packed size is
    the sum over all of them.
    """
    self._idx = idx
    self._file = f
    self._headers = filehdrs

    first = filehdrs[0]
    last = filehdrs[-1]

    self.__attribs = first.attribs
    self.__comment = first.comment.decode('utf-8', errors='replace')
    self.__crc32 = last.crc32
    self.__comptype = first.comptype
    self.__compqual = first.compqual
    self.__datetime = _dt_fromdos(first.datetime)
    # The low four bits of params encode the dictionary size as a power
    # of two, offset by 10.
    self.__dicsizebits = (first.params & 15) + 10
    self.__dicsize = 1 << self.__dicsizebits
    self.__raw_filename = first.filename
    self.__filename = self._sanitize_filename(first.filename)
    self.__ntsecurity = first.ntsecurity
    self.__size = first.origsize
    self.__packsize = sum(hdr.packsize for hdr in filehdrs)
def to_png(self, data, output):
    ''' Dump data to the image file. Data is bytes(RGBRGB...RGB).
        Pure python PNG implementation.
        http://inaps.org/journal/comment-fonctionne-le-png

        :param data: raw pixel bytes, ``self.width * self.height * 3`` long.
        :param output: path of the PNG file to create.

        Errors raised while opening or writing the file propagate to the
        caller unchanged. The original code had a trailing
        ``raise ScreenshotError`` that sat after an unconditional
        ``return`` and could never run; it has been removed.
    '''

    def _chunk(tag, payload=b''):
        ''' Assemble one PNG chunk: length, tag, payload, CRC32(tag + payload). '''
        checksum = crc32(tag + payload) & 0xffffffff
        return pack('>I', len(payload)) + tag + payload + pack('>I', checksum)

    line = self.width * 3
    # Every scanline is prefixed with filter type 0 (no filtering).
    png_filter = pack('>B', 0)
    scanlines = b''.join(
        png_filter + data[y * line:(y + 1) * line]
        for y in range(self.height))

    magic = pack('>8B', 137, 80, 78, 71, 13, 10, 26, 10)

    # IHDR fields after the dimensions: 8-bit depth, colour type 2 (RGB),
    # then compression, filter and interlace methods all 0.
    ihdr = _chunk(b'IHDR', pack('>2I5B', self.width, self.height, 8, 2, 0, 0, 0))
    idat = _chunk(b'IDAT', compress(scanlines))
    iend = _chunk(b'IEND')

    with open(output, 'wb') as fileh:
        fileh.write(magic)
        fileh.write(ihdr)
        fileh.write(idat)
        fileh.write(iend)
def write(self, s):
    """Write string s to the stream.

    For "gz" streams the running CRC32 is updated first. The position
    always advances by the uncompressed length, and the data is passed
    through the compressor unless the stream is a plain "tar".
    """
    uncompressed = self.comptype == "tar"
    if self.comptype == "gz":
        self.crc = self.zlib.crc32(s, self.crc)
    self.pos += len(s)
    payload = s if uncompressed else self.cmp.compress(s)
    self.__write(payload)
def write_chunk(outfile, tag, data=b''):
    """
    Emit one PNG chunk to `outfile`: 4-byte big-endian length, the tag,
    the data, then the CRC32 of tag + data.
    """
    # http://www.w3.org/TR/PNG/#5Chunk-layout
    length_field = struct.pack("!I", len(data))
    outfile.write(length_field)
    outfile.write(tag)
    outfile.write(data)

    # Continue the checksum of the tag with the data and force it into
    # the unsigned 32-bit range.
    crc = zlib.crc32(data, zlib.crc32(tag)) & (2**32 - 1)
    outfile.write(struct.pack("!I", crc))
def send(self, packet):
    """Frame `packet` (a bytes array) and send it to the connected peer.

    The frame is: total length, send counter, payload, CRC32 of those
    three parts. The send counter is incremented for every frame built.

    Raises ConnectionError if the TCP client is not connected.
    """
    if not self.tcp_client.connected:
        raise ConnectionError('Client not connected to server.')

    with BinaryWriter() as writer:
        # 12 extra bytes: the length, counter and checksum integers.
        total_length = len(packet) + 12
        writer.write_int(total_length)
        writer.write_int(self.send_counter)
        writer.write(packet)

        checksum = crc32(writer.get_bytes())
        writer.write_int(checksum, signed=False)

        self.send_counter += 1
        frame = writer.get_bytes()
        self.tcp_client.write(frame)
def receive(self, **kwargs):
    """Read one TCP message and return it as ``(sequence number, body)``.

    A 'timeout' keyword argument, when given, overrides 'self.timeout';
    it can be a 'timedelta' or 'None'.

    Raises InvalidChecksumError when the checksum carried by the frame
    does not match the one computed over length + sequence + body.
    """
    timeout = kwargs.get('timeout', self.timeout)

    def _read(count):
        return self.tcp_client.read(count, timeout)

    # Pull the whole frame off the wire first.
    length_raw = _read(4)
    length = int.from_bytes(length_raw, byteorder='little')

    seq_raw = _read(4)
    seq = int.from_bytes(seq_raw, byteorder='little')

    # The length field also counts the 12 bytes of length/seq/checksum.
    body = _read(length - 12)

    received = int.from_bytes(_read(4), byteorder='little', signed=False)

    # Then validate it.
    expected = crc32(length_raw + seq_raw + body)
    if received != expected:
        raise InvalidChecksumError(received, expected)

    return seq, body
def _init_write(self, filename): self.name = filename self.crc = zlib.crc32("") & 0xffffffffL self.size = 0 self.writebuf = [] self.bufsize = 0
def _init_read(self): self.crc = zlib.crc32("") & 0xffffffffL self.size = 0
def _add_read_data(self, data): self.crc = zlib.crc32(data, self.crc) & 0xffffffffL offset = self.offset - self.extrastart self.extrabuf = self.extrabuf[offset:] + data self.extrasize = self.extrasize + len(data) self.extrastart = self.offset self.size = self.size + len(data)
def write(self, s):
    """Write string s to the stream.

    For "gz" streams the running CRC32 is updated (kept as an unsigned
    32-bit value). The position always advances by the uncompressed
    length, and the data is compressed unless the stream is a plain "tar".
    """
    if self.comptype == "gz":
        # Plain int mask: the Python 2 ``L`` suffix is a SyntaxError on
        # Python 3.
        self.crc = self.zlib.crc32(s, self.crc) & 0xffffffff
    self.pos += len(s)
    if self.comptype != "tar":
        s = self.cmp.compress(s)
    self.__write(s)
def write_chunk(outfile, tag, data=strtobytes('')):
    """
    Emit one PNG chunk to `outfile`: 4-byte big-endian length, the tag
    (passed through ``strtobytes`` first), the data, then the CRC32 of
    tag + data.
    """
    # http://www.w3.org/TR/PNG/#5Chunk-layout
    length_field = struct.pack("!I", len(data))
    outfile.write(length_field)

    tag_bytes = strtobytes(tag)
    outfile.write(tag_bytes)
    outfile.write(data)

    # Continue the checksum of the tag with the data and force it into
    # the unsigned 32-bit range.
    crc = zlib.crc32(data, zlib.crc32(tag_bytes)) & (2**32 - 1)
    outfile.write(struct.pack("!I", crc))
def asciiCompress(data, level=9):
    """Compress data to printable ascii-code.

    Args:
        data: the bytes to compress.
        level: zlib compression level.

    Returns:
        A ``(code, csum)`` tuple: the base64 text of the compressed
        payload (broken into lines, newline-terminated) and the CRC32 of
        the compressed payload.
    """
    code = zlib.compress(data, level)
    csum = zlib.crc32(code)
    # base64.encodestring() was removed in Python 3.9; encodebytes() is
    # its replacement and produces the same line-wrapped output. Fall back
    # to the old name only where the new one does not exist (Python 2).
    encode = getattr(base64, "encodebytes", None)
    if encode is None:
        encode = base64.encodestring
    code = encode(code)
    return code, csum
def update(self, block):
    """Fold ``block`` into the running CRC32 kept in ``self.crc``."""
    # Imported locally, as in the original, so the module is only needed
    # when a checksum is actually updated.
    from zlib import crc32 as _crc32
    self.crc = _crc32(block, self.crc)
async def encode_crc32(self, *, message : str):
    '''Compute CRC32 checksum'''
    # Declared as a coroutine: ``await`` inside a plain ``def`` is a
    # SyntaxError. The message is UTF-8 encoded because zlib.crc32 only
    # accepts bytes-like input.
    await self.bot.embed_reply(zlib.crc32(message.encode("utf-8")))
def crc32(data, start=0):
    """Return the zlib CRC32 of ``data``, continuing from ``start``."""
    checksum = zlib.crc32(data, start)
    return checksum
def seqCrc32(seq, start=0):
    """Return the CRC32 of all items of ``seq`` chained in order.

    ``start`` is the initial checksum and is returned unchanged when
    ``seq`` is empty.
    """
    checksum = start
    for piece in seq:
        checksum = crc32(piece, checksum)
    return checksum
def update(self):
    """Recompute ``self.size`` and ``self.crc`` from ``self.data``."""
    payload = self.data
    self.size = len(payload)
    self.crc = crc32(payload)
def _hash_py_argv(): return zlib.crc32('\x00'.join(sys.argv[1:]))
def pngCRC32(data):
    """Return the CRC32 of ``data`` as an unsigned 32-bit integer."""
    unsigned_mask = 0xFFFFFFFF
    return unsigned_mask & crc32(data)