我们从Python开源项目中,提取了以下12个代码示例,用于说明如何使用mmap.error()。
def _load_file_contents(f, size=None): try: fd = f.fileno() except (UnsupportedOperation, AttributeError): fd = None # Attempt to use mmap if possible if fd is not None: if size is None: size = os.fstat(fd).st_size if has_mmap: try: contents = mmap.mmap(fd, size, access=mmap.ACCESS_READ) except mmap.error: # Perhaps a socket? pass else: return contents, size contents = f.read() size = len(contents) return contents, size
def _walk_ref_chains(self): if not self._resolve_ext_ref: self._ensure_no_pending() return for base_sha, pending in sorted(self._pending_ref.items()): if base_sha not in self._pending_ref: continue try: type_num, chunks = self._resolve_ext_ref(base_sha) except KeyError: # Not an external ref, but may depend on one. Either it will get # popped via a _follow_chain call, or we will raise an error # below. continue self._ext_refs.append(base_sha) self._pending_ref.pop(base_sha) for new_offset in pending: for result in self._follow_chain(new_offset, type_num, chunks): yield result self._ensure_no_pending()
def __init__(self, dbfile, use_mmap=True, basepos=0): self._file = dbfile self.is_closed = False # Seek to the end to get total file size (to check if mmap is OK) dbfile.seek(0, os.SEEK_END) filesize = self._file.tell() dbfile.seek(basepos) self._diroffset = self._file.read_long() self._dirlength = self._file.read_int() self._file.seek(self._diroffset) self._dir = self._file.read_pickle() self._options = self._file.read_pickle() self._locks = {} self._source = None use_mmap = ( use_mmap and hasattr(self._file, "fileno") # check file is a real file and filesize < sys.maxsize # check fit on 32-bit Python ) if mmap and use_mmap: # Try to open the entire segment as a memory-mapped object try: fileno = self._file.fileno() self._source = mmap.mmap(fileno, 0, access=mmap.ACCESS_READ) except (mmap.error, OSError): e = sys.exc_info()[1] # If we got an error because there wasn't enough memory to # open the map, ignore it and fall through, we'll just use the # (slower) "sub-file" implementation if e.errno == errno.ENOMEM: pass else: raise else: # If that worked, we can close the file handle we were given self._file.close() self._file = None
def __init__(self, filename): self.fd = os.open(filename, os.O_RDONLY | os.O_BINARY if hasattr(os, "O_BINARY") else os.O_RDONLY) try: self.mmap = mmap.mmap(self.fd, 0, access=mmap.ACCESS_READ) except (ValueError, mmap.error): # Can not memory map empty opening books. self.mmap = None
def __getitem__(self, key): if self.mmap is None: raise IndexError() if key < 0: key = len(self) + key try: key, raw_move, weight, learn = ENTRY_STRUCT.unpack_from(self.mmap, key * ENTRY_STRUCT.size) except struct.error: raise IndexError() return Entry(key, raw_move, weight, learn)
def read_zlib_chunks(read_some, unpacked, include_comp=False, buffer_size=_ZLIB_BUFSIZE): """Read zlib data from a buffer. This function requires that the buffer have additional data following the compressed data, which is guaranteed to be the case for git pack files. :param read_some: Read function that returns at least one byte, but may return less than the requested size. :param unpacked: An UnpackedObject to write result data to. If its crc32 attr is not None, the CRC32 of the compressed bytes will be computed using this starting CRC32. After this function, will have the following attrs set: * comp_chunks (if include_comp is True) * decomp_chunks * decomp_len * crc32 :param include_comp: If True, include compressed data in the result. :param buffer_size: Size of the read buffer. :return: Leftover unused data from the decompression. :raise zlib.error: if a decompression error occurred. """ if unpacked.decomp_len <= -1: raise ValueError('non-negative zlib data stream size expected') decomp_obj = zlib.decompressobj() comp_chunks = [] decomp_chunks = unpacked.decomp_chunks decomp_len = 0 crc32 = unpacked.crc32 while True: add = read_some(buffer_size) if not add: raise zlib.error('EOF before end of zlib stream') comp_chunks.append(add) decomp = decomp_obj.decompress(add) decomp_len += len(decomp) decomp_chunks.append(decomp) unused = decomp_obj.unused_data if unused: left = len(unused) if crc32 is not None: crc32 = binascii.crc32(add[:-left], crc32) if include_comp: comp_chunks[-1] = add[:-left] break elif crc32 is not None: crc32 = binascii.crc32(add, crc32) if crc32 is not None: crc32 &= 0xffffffff if decomp_len != unpacked.decomp_len: raise zlib.error('decompressed data does not match expected size') unpacked.crc32 = crc32 if include_comp: unpacked.comp_chunks = comp_chunks return unused
def read_objects(self, compute_crc32=False): """Read the objects in this pack file. :param compute_crc32: If True, compute the CRC32 of the compressed data. If False, the returned CRC32 will be None. :return: Iterator over UnpackedObjects with the following members set: offset obj_type_num obj_chunks (for non-delta types) delta_base (for delta types) decomp_chunks decomp_len crc32 (if compute_crc32 is True) :raise ChecksumMismatch: if the checksum of the pack contents does not match the checksum in the pack trailer. :raise zlib.error: if an error occurred during zlib decompression. :raise IOError: if an error occurred writing to the output file. """ pack_version, self._num_objects = read_pack_header(self.read) if pack_version is None: return for i in range(self._num_objects): offset = self.offset unpacked, unused = unpack_object( self.read, read_some=self.recv, compute_crc32=compute_crc32, zlib_bufsize=self._zlib_bufsize) unpacked.offset = offset # prepend any unused data to current read buffer buf = BytesIO() buf.write(unused) buf.write(self._rbuf.read()) buf.seek(0) self._rbuf = buf yield unpacked if self._buf_len() < 20: # If the read buffer is full, then the last read() got the whole # trailer off the wire. If not, it means there is still some of the # trailer to read. We need to read() all 20 bytes; N come from the # read buffer and (20 - N) come from the wire. self.read(20) pack_sha = bytearray(self._trailer) if pack_sha != self.sha.digest(): raise ChecksumMismatch(sha_to_hex(pack_sha), self.sha.hexdigest())
def resolve_object(self, offset, type, obj, get_ref=None): """Resolve an object, possibly resolving deltas when necessary. :return: Tuple with object type and contents. """ # Walk down the delta chain, building a stack of deltas to reach # the requested object. base_offset = offset base_type = type base_obj = obj delta_stack = [] while base_type in DELTA_TYPES: prev_offset = base_offset if get_ref is None: get_ref = self.get_ref if base_type == OFS_DELTA: (delta_offset, delta) = base_obj # TODO: clean up asserts and replace with nicer error messages assert ( isinstance(base_offset, int) or isinstance(base_offset, long)) assert ( isinstance(delta_offset, int) or isinstance(base_offset, long)) base_offset = base_offset - delta_offset base_type, base_obj = self.get_object_at(base_offset) assert isinstance(base_type, int) elif base_type == REF_DELTA: (basename, delta) = base_obj assert isinstance(basename, bytes) and len(basename) == 20 base_offset, base_type, base_obj = get_ref(basename) assert isinstance(base_type, int) delta_stack.append((prev_offset, base_type, delta)) # Now grab the base object (mustn't be a delta) and apply the # deltas all the way up the stack. chunks = base_obj for prev_offset, delta_type, delta in reversed(delta_stack): chunks = apply_delta(chunks, delta) # TODO(dborowitz): This can result in poor performance if # large base objects are separated from deltas in the pack. # We should reorganize so that we apply deltas to all # objects in a chain one after the other to optimize cache # performance. if prev_offset is not None: self._offset_cache[prev_offset] = base_type, chunks return base_type, chunks