我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用mmap.ACCESS_READ。
def __init__(self, file_name):
    """Open *file_name*, map it read-only, and build the prefix table.

    Header layout (4-byte little helpers via ``int_from_4byte``):
    start, index_last_offset, prefix_start_offset, prefix_end_offset,
    followed by 9-byte prefix records: prefix(1) + start_index(4) +
    end_index(4).  On any failure a message is printed and the process
    exits, matching the original behavior.
    """
    try:
        path = os.path.abspath(file_name)
        self._handle = open(path, "rb")
        if mmap is not None:
            # Map read-only instead of loading the whole file into memory.
            self.data = mmap.mmap(self._handle.fileno(), 0,
                                  access=mmap.ACCESS_READ)
        else:
            self.data = self._handle.read()
        self.dict = {}
        self.start = self.int_from_4byte(0)
        # Read but unused; kept to document the header layout.
        index_last_offset = self.int_from_4byte(4)
        prefix_start_offset = self.int_from_4byte(8)
        prefix_end_offset = self.int_from_4byte(12)
        # Walk the 9-byte prefix records.
        i = prefix_start_offset
        while i <= prefix_end_offset:
            prefix = self.int_from_1byte(i)
            map_dict = {
                'prefix': prefix,
                'start_index': self.int_from_4byte(i + 1),
                'end_index': self.int_from_4byte(i + 5),
            }
            self.dict[prefix] = map_dict
            i += 9
    except Exception as ex:
        # Fix: the original used Python 2 print statements, referenced the
        # builtin ``file`` instead of ``file_name``, and used the
        # deprecated ``ex.message`` attribute.
        print("cannot open file %s" % file_name)
        print(ex)
        exit(0)
def main():
    """Command-line entry point: carve Prefetch records from a raw file.

    The input is memory-mapped read-only; output format is selected by
    the mutually-prioritized -t / -c / -m flags.
    """
    parser = ArgumentParser()
    parser.add_argument('-f', '--file', required=True,
                        help='Carve Prefetch files from the given file')
    parser.add_argument('-o', '--outfile', required=True,
                        help='Write results to the given file')
    parser.add_argument('-c', '--csv', action='store_true',
                        help='Output results in csv format')
    parser.add_argument('-m', '--mactime', action='store_true',
                        help='Output results in mactime format')
    parser.add_argument('-t', '--tln', action='store_true',
                        help='Output results in tln format')
    parser.add_argument('-s', '--system', help='System name (use with -t)')
    args = parser.parse_args()

    with open(args.file, 'rb') as infile:
        # closing() guarantees the map is released even on error.
        mapped = mmap.mmap(infile.fileno(), 0, access=mmap.ACCESS_READ)
        with contextlib.closing(mapped):
            with open(args.outfile, 'wb') as outfile:
                if args.tln:
                    prefetchCarve(mapped, outfile, "tln",
                                  system_name=args.system)
                elif args.csv:
                    outfile.write(u'last_run_time,prefetch_file_name,run_count\n')
                    prefetchCarve(mapped, outfile, output_type="csv")
                elif args.mactime:
                    prefetchCarve(mapped, outfile, output_type="mactime")
                else:
                    prefetchCarve(mapped, outfile)
def mmapper(filename):
    """A context manager that yields (fd, file_contents) given a file name.
    This ensures that the mmap and file objects are closed at the end of the
    'with' statement.

    Empty files yield an empty string instead of a map, because mmap
    refuses zero-length mappings.
    """
    fileobj = open(filename, 'rb')
    fd = fileobj.fileno()
    is_empty = os.fstat(fd).st_size == 0
    mapped = None if is_empty else mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
    try:
        yield fd, ('' if is_empty else mapped)
    finally:
        if mapped is not None:
            mapped.close()
        fileobj.close()

# elf_info objects are only created by `get_elf_info` or the `copy` or
# `rename` methods.
def compute_etag_from_file_obj(file_obj, offset=0, size=None, chunk_size=1024 * 1024):
    """Return the hex SHA-256 digest of a region of *file_obj*.

    A falsy *size* means "from *offset* to end of file".  When the offset
    is a multiple of mmap.ALLOCATIONGRANULARITY the region is read through
    a read-only memory map; otherwise the file object is read directly and
    its position is restored to *offset* afterwards.
    """
    digest = hashlib.sha256()
    if not size:
        size = os.fstat(file_obj.fileno()).st_size - offset
    aligned = size != 0 and offset % mmap.ALLOCATIONGRANULARITY == 0
    if aligned:
        source = mmap.mmap(file_obj.fileno(), length=size, offset=offset,
                           access=mmap.ACCESS_READ)
    else:
        source = file_obj
        source.seek(offset)
    remaining = size
    while remaining > 0:
        data = source.read(chunk_size)
        # Trim the final chunk so bytes past the region are ignored.
        digest.update(data[:min(len(data), remaining)])
        remaining -= len(data)
    if source is file_obj:
        file_obj.seek(offset)
    else:
        source.close()
    return digest.hexdigest()
def compute_tree_etag_from_file_obj(file_obj, offset=0, size=None, chunk_size=1024 * 1024):
    """Return the TreeHashGenerator digest over a region of *file_obj*.

    Mirrors compute_etag_from_file_obj, but feeds the bytes through
    TreeHashGenerator instead of a flat SHA-256.
    """
    generator = TreeHashGenerator()
    # A falsy size (None or 0) means "from offset to end of file".
    size = size or os.fstat(file_obj.fileno()).st_size - offset
    # mmap offsets must be ALLOCATIONGRANULARITY-aligned; otherwise fall
    # back to reading through the file object itself.
    if size != 0 and offset % mmap.ALLOCATIONGRANULARITY == 0:
        target = mmap.mmap(file_obj.fileno(), length=size, offset=offset,
                           access=mmap.ACCESS_READ)
    else:
        target = file_obj
        target.seek(offset)
    while size > 0:
        data = target.read(chunk_size)
        # Trim the final chunk so no bytes beyond the region are hashed.
        generator.update(data[:min(len(data), size)])
        size -= len(data)
    if target is file_obj:
        # Restore the caller's file position to the region start.
        file_obj.seek(offset)
    else:
        target.close()
    return generator.generate().digest()
def compute_hash_from_file_obj(file_obj, offset=0, size=None, chunk_size=1024 * 1024):
    """Compute both hashes over a region of *file_obj* in one pass.

    Returns:
        (sha256 hex digest, TreeHashGenerator digest) tuple.
    """
    etag = hashlib.sha256()
    generator = TreeHashGenerator()
    # A falsy size (None or 0) means "from offset to end of file".
    size = size or os.fstat(file_obj.fileno()).st_size - offset
    # mmap offsets must be ALLOCATIONGRANULARITY-aligned; otherwise fall
    # back to reading through the file object itself.
    if size != 0 and offset % mmap.ALLOCATIONGRANULARITY == 0:
        target = mmap.mmap(file_obj.fileno(), length=size, offset=offset,
                           access=mmap.ACCESS_READ)
    else:
        target = file_obj
        target.seek(offset)
    while size > 0:
        data = target.read(chunk_size)
        # Feed the same trimmed chunk to both hashers.
        generator.update(data[:min(len(data), size)])
        etag.update(data[:min(len(data), size)])
        size -= len(data)
    if target is file_obj:
        # Restore the caller's file position to the region start.
        file_obj.seek(offset)
    else:
        target.close()
    return etag.hexdigest(), generator.generate().digest()
def __init__(self, fasta_file):
    """Memory-map *fasta_file* and open its samtools ``.fai`` index.

    NOTE(review): open failures are only printed, not raised, so a later
    attribute access will fail if either file is missing — confirm this
    best-effort behavior is intended.
    """
    self.faidx = {}
    self.fasta_file = fasta_file
    try:
        self.fasta_fd = open(fasta_file)
        # Map read-only; sequence data is fetched by offset, not read().
        self.fasta_handle = mmap.mmap(self.fasta_fd.fileno(), 0,
                                      access=mmap.ACCESS_READ)
    except IOError:
        print("Reference sequence doesn't exist")
    try:
        self.faidx_handle = open(fasta_file + ".fai")
    except IOError:
        print("samtools faidx file doesn't exist for reference")
    self.load_faidx()

# Function to cache fasta index in dictionary
# faidx format contains the following columns:
##.the name of the sequence
##.the length of the sequence
##.the offset of the first base in the file
##.the number of bases in each fasta line
##.the number of bytes in each fasta line
def get_list_file(self, projname, callback=None):
    """Scan every folder of the active Sublime window for VHDL files that
    contain an entity or component, and cache the list under *projname*.

    Fix: the original opened a file handle for every candidate (including
    empty ones) and created an mmap per match candidate without ever
    closing either; both are now released via context managers.
    """
    global list_module_files
    global lmf_update_ongoing
    lmf_update_ongoing = True
    lmf = []
    for folder in sublime.active_window().folders():
        for root, dirs, files in os.walk(folder):
            for fn in files:
                if fn.lower().endswith(('.vhd', '.vho', '.vhdl')):
                    ffn = os.path.join(root, fn)
                    if not os.stat(ffn).st_size:
                        continue  # mmap cannot map empty files
                    with open(ffn) as f:
                        with mmap.mmap(f.fileno(), 0,
                                       access=mmap.ACCESS_READ) as s:
                            # Same membership as the original's
                            # if/elif: matched on either keyword.
                            if s.find(b'entity') != -1 or \
                               s.find(b'component') != -1:
                                lmf.append(ffn)
    sublime.status_message('List of module files updated')
    list_module_files[projname] = lmf[:]
    lmf_update_ongoing = False
    if callback:
        callback()
def create(self, access='write'):
    """
    Create a new block of shared memory using the mmap module.

    Any *access* value other than 'write' opens the mapping read-only.
    Returns True when the mapping was created, False otherwise.
    """
    perm = mmap.ACCESS_WRITE if access == 'write' else mmap.ACCESS_READ
    tag = "conque_%s_%s" % (self.mem_type, self.mem_key)
    self.shm = mmap.mmap(0, self.mem_size * self.char_width, tag, perm)
    return bool(self.shm)
def __init__(self, settings):
    """Open the IMU device (via init_imu) and map its data read-only.

    ``_imu_data`` holds the most recent decoded sample; the ``*Valid``
    flags record which readings the last update actually provided.
    """
    self.settings = settings
    self._fd = init_imu()
    self._map = mmap.mmap(self._fd.fileno(), 0, access=mmap.ACCESS_READ)
    self._last_data = None
    # Initial sample: vectors zeroed, scalars NaN, nothing valid yet.
    self._imu_data = {
        'accel': (0.0, 0.0, 0.0),
        'accelValid': False,
        'compass': (0.0, 0.0, 0.0),
        'compassValid': False,
        'fusionPose': (0.0, 0.0, 0.0),
        'fusionPoseValid': False,
        'fusionQPose': (0.0, 0.0, 0.0, 0.0),
        'fusionQPoseValid': False,
        'gyro': (0.0, 0.0, 0.0),
        'gyroValid': False,
        'humidity': float('nan'),
        'humidityValid': False,
        'pressure': float('nan'),
        'pressureValid': False,
        'temperature': float('nan'),
        'temperatureValid': False,
        'timestamp': 0,
    }
def __init__(self):
    """Open the screen device (via init_screen), expose its memory as
    numpy views, and start the touch-polling background thread."""
    self._fd = init_screen()
    self._map = mmap.mmap(self._fd.fileno(), 0, access=mmap.ACCESS_READ)
    # Construct arrays representing the LED states (_screen) and the user
    # controlled gamma lookup table (_gamma)
    self._screen = np.frombuffer(self._map, dtype=np.uint16, count=64).reshape((8, 8))
    self._gamma = np.frombuffer(self._map, dtype=np.uint8, count=32, offset=128)
    # Construct the final gamma correction lookup table. This is equivalent
    # to gamma correction of 1/4 (*much* brighter) because the HAT's RGB
    # LEDs are much brighter than a corresponding LCD display. It also uses
    # a non-zero starting point so that LEDs that are off appear gray
    self._gamma_rgbled = (
        np.sqrt(np.sqrt(np.linspace(0.05, 1, 32))) * 255
    ).astype(np.uint8)
    # Daemon thread polls touch input until _touch_stop is set.
    self._touch_stop = Event()
    self._touch_thread = Thread(target=self._touch_run)
    self._touch_thread.daemon = True
    self._touch_thread.start()
def _load_file_contents(f, size=None):
    """Return (contents, size) for file-like *f*, preferring a read-only
    memory map and falling back to f.read().

    The fallback triggers when *f* has no usable fileno (e.g. an
    in-memory stream), when mmap support is absent, or when mapping
    fails (e.g. the descriptor is a socket).
    """
    try:
        fd = f.fileno()
    except (UnsupportedOperation, AttributeError):
        fd = None
    if fd is not None:
        if size is None:
            size = os.fstat(fd).st_size
        if has_mmap:
            try:
                return mmap.mmap(fd, size, access=mmap.ACCESS_READ), size
            except mmap.error:
                pass  # Perhaps a socket — fall through to plain read.
    data = f.read()
    return data, len(data)
def init_mmap(self):
    """Lazily open the backing file and memory-map it read-only, then
    install a Python-2/3-uniform unsigned-byte reader."""
    # Open fd.
    if self.fd is None:
        path = os.path.join(self.directory, self.filename) + self.suffix
        # Precedence note: `|` binds tighter than the conditional, so this
        # chooses between (O_RDONLY | O_BINARY) and plain O_RDONLY —
        # correct on both Windows and POSIX.
        self.fd = os.open(path, os.O_RDONLY | os.O_BINARY if hasattr(os, "O_BINARY") else os.O_RDONLY)
    # Open mmap.
    if self.data is None:
        self.data = mmap.mmap(self.fd, 0, access=mmap.ACCESS_READ)
    # Python 3 indexing of an mmap yields ints; Python 2 yields 1-byte
    # strings, so wrap with ord() there for a uniform reader.
    if sys.version_info >= (3, ):
        self.read_ubyte = self.data.__getitem__
    else:
        def read_ubyte(data_ptr):
            return ord(self.data[data_ptr])
        self.read_ubyte = read_ubyte
def __init__(self, database):
    """Reader for the MaxMind DB file format

    Arguments:
    database -- A path to a valid MaxMind DB file such as a GeoIP2
                database file.
    """
    with open(database, 'rb') as db_file:
        # mmap keeps its own reference to the data, so the map remains
        # valid after the with-block closes the file handle.
        self._buffer = mmap.mmap(
            db_file.fileno(), 0, access=mmap.ACCESS_READ)
        # The metadata marker is searched for only within the last
        # 128 KiB of the file (scanning backwards via rfind).
        metadata_start = self._buffer.rfind(self._METADATA_START_MARKER,
                                            self._buffer.size() - 128 * 1024)
        if metadata_start == -1:
            raise InvalidDatabaseError('Error opening database file ({0}). '
                                       'Is this a valid MaxMind DB file?'
                                       ''.format(database))
        metadata_start += len(self._METADATA_START_MARKER)
        metadata_decoder = Decoder(self._buffer, metadata_start)
        (metadata, _) = metadata_decoder.decode(metadata_start)
        self._metadata = Metadata(**metadata)  # pylint: disable=star-args
        # The data section begins after the search tree plus the
        # data-section separator.
        self._decoder = Decoder(self._buffer,
                                self._metadata.search_tree_size
                                + self._DATA_SECTION_SEPARATOR_SIZE)
def map_file(cls, fileobj, offset = 0, size = None):
    """Map a serialized structure from *fileobj* at *offset*.

    The total size comes from the structure's own header; the map start
    is rounded down to an ALLOCATIONGRANULARITY boundary and a buffer()
    view re-aligns to the requested offset.  Python 2 only: relies on
    the ``buffer`` builtin.  *size* is accepted but unused.
    """
    fileobj.seek(offset)
    # First header field holds the total serialized size.
    total_size = cls._Header.unpack(fileobj.read(cls._Header.size))[0]
    # mmap offsets must be multiples of ALLOCATIONGRANULARITY.
    map_start = offset - offset % mmap.ALLOCATIONGRANULARITY
    buf = mmap.mmap(fileobj.fileno(), total_size + offset - map_start,
                    access = mmap.ACCESS_READ, offset = map_start)
    rv = cls(buffer(buf, offset - map_start))
    # Keep references so the file and mapping outlive construction.
    rv._file = fileobj
    rv._mmap = buf
    return rv
def map_file(cls, fileobj, offset=0, size=None):
    """Memory-map *fileobj* read-only and build an instance over the map.

    The map must start on an ALLOCATIONGRANULARITY boundary, so the
    start is rounded down and the remainder is handed to the instance
    as an internal offset.  *size* is accepted for interface symmetry
    but unused.
    """
    aligned_start = offset - offset % mmap.ALLOCATIONGRANULARITY
    fileobj.seek(aligned_start)
    mapping = mmap.mmap(fileobj.fileno(), 0,
                        access=mmap.ACCESS_READ, offset=aligned_start)
    instance = cls(mapping, offset - aligned_start)
    instance._file = fileobj
    return instance
def map_file(self, file, start):
    """Memory-map *file* read-only into ``self.input`` and place the
    read cursor at byte *start*.

    (The parameter name ``file`` shadows the Python 2 builtin but is
    kept for caller compatibility.)
    """
    self.input = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ)
    self.read_cursor = start
def _read_fasta(self):
    """Return a readable handle for the FASTA data.

    Gzipped files get a gzip file object; plain files are memory-mapped
    read-only (the mapping keeps the data readable on its own).
    """
    if self.fasta_file.endswith('.gz'):
        return gzip.open(self.fasta_file, 'rb')
    handle = open(self.fasta_file, 'rb')
    return mmap.mmap(handle.fileno(), 0, access=mmap.ACCESS_READ)
def __init__(self, filename, flags=STANDARD, cache=True):
    """
    Create and return an GeoIP instance.

    :arg filename: File path to a GeoIP database
    :arg flags: Flags that affect how the database is processed.
        Currently supported flags are STANDARD (default),
        MEMORY_CACHE (preload the whole file into memory) and
        MMAP_CACHE (access the file via mmap)
    :arg cache: Used in tests to skip instance caching
    """
    self._lock = Lock()
    self._flags = flags
    self._netmask = None
    # mmap may be None on platforms without the module; degrade to
    # STANDARD access with a warning instead of failing.
    if self._flags & const.MMAP_CACHE and mmap is None:  # pragma: no cover
        import warnings
        warnings.warn("MMAP_CACHE cannot be used without a mmap module")
        self._flags &= ~const.MMAP_CACHE
    if self._flags & const.MMAP_CACHE:
        f = codecs.open(filename, 'rb', ENCODING)
        access = mmap.ACCESS_READ
        # mmap keeps its own reference to the data, so the codecs
        # handle can be closed immediately after mapping.
        self._fp = mmap.mmap(f.fileno(), 0, access=access)
        self._type = 'MMAP_CACHE'
        f.close()
    elif self._flags & const.MEMORY_CACHE:
        f = codecs.open(filename, 'rb', ENCODING)
        self._memory = f.read()
        self._fp = util.str2fp(self._memory)
        self._type = 'MEMORY_CACHE'
        f.close()
    else:
        self._fp = codecs.open(filename, 'rb', ENCODING)
        self._type = 'STANDARD'
    try:
        self._lock.acquire()
        self._setup_segments()
    finally:
        self._lock.release()
def __init__(self, dbfile, use_mmap=True, basepos=0):
    """Open a compound storage file: read its directory and options,
    then try to memory-map the whole file for fast access."""
    self._file = dbfile
    self.is_closed = False
    # Seek to the end to get total file size (to check if mmap is OK)
    dbfile.seek(0, os.SEEK_END)
    filesize = self._file.tell()
    dbfile.seek(basepos)
    self._diroffset = self._file.read_long()
    self._dirlength = self._file.read_int()
    self._file.seek(self._diroffset)
    self._dir = self._file.read_pickle()
    self._options = self._file.read_pickle()
    self._locks = {}
    self._source = None
    use_mmap = (
        use_mmap
        and hasattr(self._file, "fileno")  # check file is a real file
        and filesize < sys.maxsize  # check fit on 32-bit Python
    )
    if mmap and use_mmap:
        # Try to open the entire segment as a memory-mapped object
        try:
            fileno = self._file.fileno()
            self._source = mmap.mmap(fileno, 0, access=mmap.ACCESS_READ)
        except (mmap.error, OSError):
            e = sys.exc_info()[1]
            # If we got an error because there wasn't enough memory to
            # open the map, ignore it and fall through, we'll just use the
            # (slower) "sub-file" implementation
            if e.errno == errno.ENOMEM:
                pass
            else:
                raise
        else:
            # If that worked, we can close the file handle we were given
            self._file.close()
            self._file = None
def query(word):
    """Look up *word* in the index/data file pair.

    Returns [status, results] where status is -1 when the data files are
    inaccessible, 1 when the word is not found, and 0 on success with
    results as a list of [group label, [stripped items...]] entries.
    """
    if not _oo_file_access():
        return [-1, []]
    index_file, data_file = _data_pair()
    with open(index_file) as index_opened:
        index = mmap.mmap(
            index_opened.fileno(), 0,
            access=mmap.ACCESS_READ
        )
        # Index entries look like "\nword|offset"; the leading newline
        # anchors the match to the start of a line.
        decorated = encode_utf_8(u''.join([u'\n', word, u'|']))
        found_loc = index.find(decorated)
        if found_loc == -1:
            index.close()
            return [1, []]
        # Skip the anchoring newline, then parse the byte offset into
        # the data file from the "word|offset" line.
        index.seek(found_loc + 1)
        item_index = int(index.readline().strip().split(
            encode_utf_8('|'))[1])
        index.close()
    query_result = list()
    with open(data_file) as data:
        data.seek(item_index)
        # Header line: "...|<number of groups>".
        header = decode_utf_8(data.readline())
        num_groups = int(header.strip().split(u'|')[1])
        for i in range(num_groups):
            # Each group line: "(label)|item|item|..." — the [1:-1]
            # slice strips the delimiters around the label.
            group = decode_utf_8(
                data.readline()).strip().split(u'|')
            query_result.append(
                [group[0][1:-1], [
                    _strip_descriptor(group_item)
                    for group_item in group[1:]]])
    return [0, query_result]
def _upload_data_to_request(self, source):
    """Memory-map the file at *source* read-only and return the mapping.

    Any failure (open or mmap) is wrapped in NetstorageError, after
    closing the mapping if it was already created.
    """
    mapping = None
    try:
        with open(source, 'rb') as fh:
            mapping = mmap.mmap(fh.fileno(), 0, access=mmap.ACCESS_READ)
    except Exception as exc:
        if mapping:
            mapping.close()
        raise NetstorageError(exc)
    return mapping
def contains_text(self, text):
    """Return True if any ``*.txt`` page under ``self.path_pages``
    contains *text* (case-insensitive, literal match)."""
    needle = bytes(re.escape(text), 'utf-8')
    pattern = br'(?i)%b' % needle
    for page_file in glob.iglob(path.join(self.path_pages, '*.txt')):
        with open(page_file, 'rb', 0) as fh:
            with mmap.mmap(fh.fileno(), 0, access=mmap.ACCESS_READ) as view:
                if re.search(pattern, view):
                    return True
    return False
def __init__(self, filename, flags=0):
    """
    Initialize the class.

    @param filename: path to a geoip database. If MEMORY_CACHE is used,
        the file can be gzipped.
    @type filename: str
    @param flags: flags that affect how the database is processed.
        Currently the only supported flags are STANDARD (the default),
        MEMORY_CACHE (preload the whole file into memory), and
        MMAP_CACHE (access the file via mmap).
    @type flags: int
    """
    self._filename = filename
    self._flags = flags
    if self._flags & const.MMAP_CACHE:
        # mmap keeps its own reference to the data, so the with-block
        # can safely close the file handle.
        with open(filename, 'rb') as f:
            self._filehandle = mmap.mmap(f.fileno(), 0,
                                         access=mmap.ACCESS_READ)
    elif self._flags & const.MEMORY_CACHE:
        # Gzipped databases are only supported in MEMORY_CACHE mode.
        if filename.endswith('.gz'):
            opener = gzip.open
        else:
            opener = open
        with opener(filename, 'rb') as f:
            self._memoryBuffer = f.read()
            self._filehandle = StringIO(self._memoryBuffer)
    else:
        # STANDARD: read lazily straight from disk (latin-1 decoded).
        self._filehandle = codecs.open(filename, 'rb', 'latin_1')
    self._setup_segments()
def _initialize_mmap(self, ttdir):
    """Memory-map every travel-time file in *ttdir*, keyed by station
    and phase (file names are "<station>.<phase>...").

    The first file's header initializes the grid geometry; every later
    header must match it exactly or a ValueError is raised.
    """
    mmttf = {}
    init = True
    for infile in os.listdir(ttdir):
        station, phase = infile.split(".")[:2]
        infile = open(os.path.abspath(os.path.join(ttdir, infile)), 'rb')
        mmf = mmap.mmap(infile.fileno(), 0, access=mmap.ACCESS_READ)
        if station not in mmttf:
            mmttf[station] = {}
        mmttf[station][phase] = mmf
        if init:
            init = False
            # First file defines the canonical grid geometry.
            self._initialize_grid(mmf)
        else:
            # Header: 3 ints (grid counts), 3 floats (grid steps),
            # 3 floats (grid origin) — 12 bytes per record.
            nr, nlat, nlon = struct.unpack("3i", mmf.read(12))
            dr, dlat, dlon = struct.unpack("3f", mmf.read(12))
            r0, lat0, lon0 = struct.unpack("3f", mmf.read(12))
            if not nr == self.nr\
                    or not nlat == self.nlat\
                    or not nlon == self.nlon\
                    or not dr == self.dr\
                    or not dlat == self.dlat\
                    or not dlon == self.dlon\
                    or not r0 == self.r0\
                    or not lat0 == self.lat0\
                    or not lon0 == self.lon0:
                raise ValueError("travel-time headers do not match")
    self.mmttf = mmttf
def __init__(self, filename):
    """Open *filename* read-only, memory-map its contents, and record
    the total size; the read offset starts at zero."""
    self.file = open(filename, "rb")
    self.datamap = mmap.mmap(self.file.fileno(), 0,
                             access=mmap.ACCESS_READ)
    self.filesize = self.datamap.size()
    self.offset = 0
def setUp(self):
    """Create a file whose logical size exceeds 4 GiB and map it
    read-only for the large-offset tests."""
    with open(support.TESTFN, "wb+") as f:
        # Seek past 4 GiB and write a marker so the file's logical size
        # exceeds 32 bits (stays sparse on filesystems that support it).
        f.seek(_4G)
        f.write(b"asdf")
        f.flush()
        self.mapping = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
def __init__(self, fingerprint_file, mac_file):
    """Initialize the parser by memory-mapping both data files.

    Args:
        fingerprint_file : nmap-os-db file
        mac_file : nmap-mac-prefixes file
    """
    logger.debug('Initializing Nmap fingerprint parser.')

    def _map_readonly(name):
        # mmap holds its own reference to the data, so the local file
        # handle going out of scope is harmless.
        handle = open(name)
        return mmap.mmap(handle.fileno(), 0, access=mmap.ACCESS_READ)

    self.fingerprint_file = _map_readonly(fingerprint_file)
    self.mac_file = _map_readonly(mac_file)
def run_re_file(re_str, fn):
    """Run ``re.findall`` with *re_str* over the memory-mapped contents
    of the file at path *fn* and return the matches.

    Fix: the original never closed the mapping (and relied on the
    with-block only for the file handle); both are now released
    deterministically.  Note: an mmap exposes bytes, so on Python 3
    *re_str* must be a bytes pattern.
    """
    size = os.stat(fn).st_size
    with open(fn, 'r') as tf:
        with mmap.mmap(tf.fileno(), size, access=mmap.ACCESS_READ) as data:
            # findall materializes its results, so the map may close
            # as soon as this returns.
            return re.findall(re_str, data)
def __init__(self, buf = None, pos = 0, filename = None, fp = None):
    """Initialize the scanner state from a buffer or an open file.

    When *fp* is given, the whole file is memory-mapped if possible and
    falls back to reading it into memory otherwise.  Python 2-era code:
    the bare ``except`` and the deferred ``import mmap`` are deliberate
    fallback probes, and ``buf`` may end up a str or an mmap.
    """
    if fp is not None:
        try:
            fileno = fp.fileno()
            length = os.path.getsize(fp.name)
            import mmap
        except:
            # read whole file into memory
            buf = fp.read()
            pos = 0
        else:
            # map the whole file into memory
            if length:
                # length must not be zero
                buf = mmap.mmap(fileno, length, access = mmap.ACCESS_READ)
                # Start from the OS-level file position (lseek, whence=1).
                pos = os.lseek(fileno, 0, 1)
            else:
                buf = ''
                pos = 0
        if filename is None:
            try:
                filename = fp.name
            except AttributeError:
                filename = None
    self.buf = buf
    self.pos = pos
    self.line = 1
    self.col = 1
    self.filename = filename
def test_mmap():
    """Check buf_filled_with against a memory-mapped copy of each
    fixture in the module-level ``tests`` table."""
    for test, expectation in tests:
        f = tempfile.NamedTemporaryFile()
        f.write(test)
        f.flush()
        # Map the temp file; f stays referenced so the map stays valid
        # for the duration of the assertion.
        test_mmap = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        assert buf_filled_with(test_mmap, test_mmap[0]) == expectation
def __init__(self, source, bytelength=None, byteoffset=None):
    """Wrap *source*, memory-mapping the entire file read-only.

    *byteoffset* defaults to 0 and *bytelength* to the remainder of the
    file past that offset; the map itself always covers the whole file.
    """
    self.source = source
    source.seek(0, os.SEEK_END)
    self.filelength = source.tell()
    self.byteoffset = 0 if byteoffset is None else byteoffset
    self.bytelength = (self.filelength - self.byteoffset
                       if bytelength is None else bytelength)
    self.filemap = mmap.mmap(source.fileno(), 0, access=mmap.ACCESS_READ)
def __init__(self, filename):
    """Read the given TrueType file.

    :Parameters:
        `filename`
            The name of any Windows, OS2 or Macintosh Truetype file.

    The object must be closed (see `close`) after use.

    An exception will be raised if the file does not exist or cannot
    be read.
    """
    if not filename:
        filename = ''
    # Fix: the original bound the file size to a local named ``len``,
    # shadowing the builtin for the rest of the method.
    file_size = os.stat(filename).st_size
    self._fileno = os.open(filename, os.O_RDONLY)
    if hasattr(mmap, 'MAP_SHARED'):
        # POSIX signature: mmap(fileno, length, flags, prot)
        self._data = mmap.mmap(self._fileno, file_size,
                               mmap.MAP_SHARED, mmap.PROT_READ)
    else:
        # Windows signature: mmap(fileno, length, tagname, access)
        self._data = mmap.mmap(self._fileno, file_size,
                               None, mmap.ACCESS_READ)

    # Parse the offset table and index every table record by its tag.
    offsets = _read_offset_table(self._data, 0)
    self._tables = {}
    for table in _read_table_directory_entry.array(self._data,
                                                   offsets.size,
                                                   offsets.num_tables):
        self._tables[table.tag] = table

    # Lazily-populated caches, filled on first use elsewhere.
    self._names = None
    self._horizontal_metrics = None
    self._character_advances = None
    self._character_kernings = None
    self._glyph_kernings = None
    self._character_map = None
    self._glyph_map = None
    self._font_selection_flags = None

    self.header = \
        _read_head_table(self._data, self._tables['head'].offset)
    self.horizontal_header = \
        _read_horizontal_header(self._data, self._tables['hhea'].offset)
def __init__(self, file_name, open_mode='rb', mmap_access=mmap.ACCESS_READ, **kwargs):
    """Open *file_name* (exiting the program on OSError) and record the
    mmap access mode; the mapping itself is created later."""
    try:
        self.file = open(file_name, mode=open_mode, **kwargs)
    except OSError as err:
        raise SystemExit(err)
    self.file_name = file_name
    self.mmap_access = mmap_access
    self.mmap = None  # created lazily elsewhere
def encode_multipart_data(file_dir, file_name, progress=Progress()):
    """Build a multipart/form-data request body and headers for
    uploading the file at ``file_dir/file_name``.

    Returns (body, headers): the body embeds the raw file bytes between
    a generated boundary header and footer, and headers carries the
    matching content-type and content-length.

    Fix: the memory map used to read the file is now closed (it leaked
    on every call).  NOTE(review): the default ``progress=Progress()``
    is evaluated once at import and shared across calls — confirm
    intended.
    """
    boundary = random_string(30)
    CRLF = '\r\n'
    file_path = os.path.join(file_dir, file_name)

    def encode_body_header(file_name):
        # Form-field header lines preceding the raw file payload.
        return ('--' + boundary,
                'Content-Disposition: form-data; name="Filedata"; '
                'filename="%s"' % file_name,
                'Content-Type: "application/x-compressed"',
                '')

    def encode_file_data(file_path):
        # Map the file read-only, copy the contents out, and release
        # both the map and the file deterministically.
        with open(file_path, "rb") as f:
            mapped = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
            try:
                return mapped.read(mapped.size())
            finally:
                mapped.close()

    lines = []
    lines.extend(encode_body_header(file_name))
    body_header = (CRLF.join(lines) + CRLF).encode('utf-8')
    body_footer = (CRLF + '--%s--' % boundary + CRLF + '').encode('utf-8')
    file_data = encode_file_data(file_path)
    body = body_header + file_data + body_footer
    headers = {'content-type': 'multipart/form-data; boundary=' + boundary,
               'content-length': str(len(body))}
    return body, headers
def findPackage(self, filepath):
    """Return the package declared in the Java source at *filepath*,
    with dots replaced by '/' (e.g. "com/example/app").

    Fixes: the file and mapping are now closed deterministically (the
    original leaked both, and on an exception as well); the regex is a
    bytes pattern since an mmap exposes bytes on Python 3; the match is
    decoded back to str so the return type is unchanged.  Raises
    AttributeError if no package declaration is found (as before).
    """
    size = os.stat(filepath).st_size
    with open(filepath, 'rb') as fp:
        with mmap.mmap(fp.fileno(), size, access=mmap.ACCESS_READ) as filedata:
            match = re.search(rb'package (.+);', filedata)
            # group() must run while the map is still open — the match
            # object slices the underlying buffer lazily.
            package = match.group(1).decode().replace('.', '/')
    return package