The following 50 code examples, extracted from Python open source projects, illustrate how to use lzma.open().
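Before the project code, here is a minimal orientation sketch of the lzma.open() API itself (the filename example.xz is only an illustration, not from any project below). lzma.open() mirrors the built-in open(): binary mode by default, with a text wrapper when a 't' mode is given.

import lzma

# write a compressed text file; 'wt' wraps the underlying LZMAFile
# in a TextIOWrapper, so str (not bytes) is expected
with lzma.open('example.xz', 'wt', encoding='utf-8') as f:
    f.write('hello, xz\n')

# read it back; 'rt' decompresses and decodes transparently
with lzma.open('example.xz', 'rt', encoding='utf-8') as f:
    print(f.read())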
def read_header(dispout):
    """Read header (first 3 words) from disp.dat

    :param dispout: disp.dat filename
    :returns: header (num_nodes, num_dims, num_timesteps)
    """
    import struct

    word_size = 4  # bytes
    if dispout.endswith('.xz'):
        import lzma
        d = lzma.open(dispout, 'rb')
    else:
        d = open(dispout, 'rb')
    num_nodes = struct.unpack('f', d.read(word_size))
    num_dims = struct.unpack('f', d.read(word_size))
    num_timesteps = struct.unpack('f', d.read(word_size))
    header = {'num_nodes': int(num_nodes[0]),
              'num_dims': int(num_dims[0]),
              'num_timesteps': int(num_timesteps[0])}
    return header
def extract_dt(dyn_file):
    """Extract time step (dt) from dyna input deck.

    Assumes that the input deck is comma-delimited.

    :param dyn_file: input.dyn filename
    :returns: dt from input.dyn binary data save parameter
    """
    found_database = False
    with open(dyn_file, 'r') as d:
        for dyn_line in d:
            if found_database:
                line_items = dyn_line.split(',')
                # make sure we're not dealing with a comment
                if '$' in line_items[0]:
                    continue
                else:
                    dt = float(line_items[0])
                    break
            elif '*DATABASE_NODOUT' in dyn_line:
                found_database = True

    return dt
def save_companies(self):
    """
    Receives the path to the dataset file and creates a Company object
    for each row of each file. It creates the related activity when
    needed.
    """
    skip = ('main_activity', 'secondary_activity')
    # compare field names (not field objects) against the skip list
    keys = tuple(f.name for f in Company._meta.fields if f.name not in skip)
    with lzma.open(self.path, mode='rt', encoding='utf-8') as file_handler:
        for row in csv.DictReader(file_handler):
            main, secondary = self.save_activities(row)

            filtered = {k: v for k, v in row.items() if k in keys}
            obj = Company.objects.create(**self.serialize(filtered))

            for activity in main:
                obj.main_activity.add(activity)

            for activity in secondary:
                obj.secondary_activity.add(activity)

            obj.save()
            self.count += 1
            self.print_count(Company, count=self.count)
def load_embeddings(self, filename, xz=False):
    if not os.path.isfile(filename):
        print(filename, "does not exist")
        return self

    if xz:
        f = lzma.open(filename, "rt", encoding="utf-8", errors="ignore")
    else:
        f = open(filename, "r")

    found_set = set()
    for line in f:
        l = line.split()
        word = strong_normalize(l[0])
        vec = [float(x) for x in l[1:]]
        if word in self._vocab:
            found_set.add(word)
            self._word_lookup.init_row(self._vocab[word], vec)
    f.close()

    print("Loaded embeddings from", filename)
    print(len(found_set), "hits with vocab size of", len(self._vocab))
    return self
def _token_to_filenames(token):
    if token[0] == '!':
        pattern = token[1:]
        filenames = glob.glob(pattern)
        if not filenames:
            raise RuntimeError('No filenames matched "%s" pattern' % pattern)
    elif token[0] == '@':
        filelist_name = sys.stdin if token == '@-' else token[1:]
        with open(filelist_name) as filelist:
            filenames = [line.rstrip('\n') for line in filelist]
        # filenames in the list are relative to the filelist's directory,
        # so prefix relative (non-absolute) paths with that directory
        directory = os.path.dirname(token[1:])
        if directory != '':
            filenames = [f if f[0] == '/' else directory + '/' + f
                         for f in filenames]
    else:
        filenames = token
    return filenames
def next_filehandle(self):
    """Go to the next file and return its filehandle or None (meaning no more files)."""
    filename = self.next_filename()
    if filename is None:
        fhandle = None
    elif filename == '-':
        fhandle = sys.stdin
    else:
        filename_extension = filename.split('.')[-1]
        if filename_extension == 'gz':
            myopen = gzip.open
        elif filename_extension == 'xz':
            myopen = lzma.open
        elif filename_extension == 'bz2':
            myopen = bz2.open
        else:
            myopen = open
        fhandle = myopen(filename, 'rt', encoding=self.encoding)
    self.filehandle = fhandle
    return fhandle
def open_regular_or_compressed(filename):
    if filename is None:
        return sys.stdin

    if hasattr(filename, 'read'):
        fobj = filename
    else:
        f = filename.lower()
        ext = f.rsplit('.', 1)[-1]
        if ext == 'gz':
            import gzip
            fobj = gzip.GzipFile(filename)
        elif ext == 'bz2':
            import bz2
            fobj = bz2.BZ2File(filename)
        elif ext == 'xz':
            import lzma
            fobj = lzma.open(filename)
        else:
            fobj = open(filename)
    return fobj
def _convert_any_to_vw(source, format, output, weights, preprocessor,
                       columnspec, named_labels, remap_label, ignoreheader):
    if named_labels is not None:
        assert not isinstance(named_labels, basestring)
        named_labels = set(named_labels)

    rows_source = open_anything(source, format, ignoreheader=ignoreheader)
    output = open(output, 'wb')

    for row in rows_source:
        try:
            vw_line = convert_row_to_vw(
                row,
                columnspec,
                preprocessor=preprocessor,
                weights=weights,
                named_labels=named_labels,
                remap_label=remap_label)
        except Exception:
            log_always('Failed to parse: %r', row)
            raise
        output.write(vw_line)

    flush_and_close(output)
def open_compressed_file(filename, mode):
    """Open a compressed file, determining the compression type based on
    the file name.

    Args:
        filename: The file to open.
        mode: The file open mode.

    Returns:
        The opened file.
    """
    # os.path.splitext returns (root, ext); the opener lookup needs the
    # extension string, and the error message needs the value interpolated
    ext = os.path.splitext(filename)[1]
    opener = get_file_opener(ext)
    if not opener:
        raise ValueError("{} is not a recognized compression format".format(ext))
    return opener(filename, mode)
def __init__(self, path, mode='w'):
    self.outfile = open(path, mode)
    self.devnull = open(os.devnull, 'w')
    self.closed = False

    # Setting close_fds to True in the Popen arguments is necessary due to
    # <http://bugs.python.org/issue12786>.
    kwargs = dict(stdin=PIPE, stdout=self.outfile, stderr=self.devnull,
                  close_fds=True)
    try:
        self.process = Popen(['pigz'], **kwargs)
        self.program = 'pigz'
    except OSError as e:
        # pigz binary not found, try regular gzip
        try:
            self.process = Popen(['gzip'], **kwargs)
            self.program = 'gzip'
        except (IOError, OSError) as e:
            self.outfile.close()
            self.devnull.close()
            raise
    except IOError as e:
        self.outfile.close()
        self.devnull.close()
        raise
def download(self):
    """
    Downloads the latest iOS gadget.

    :return:
    """
    download_url = self._get_download_url()

    # stream the download using requests
    dylib = requests.get(download_url, stream=True)

    # save the requests stream to file
    with open(self.ios_dylib_gadget_archive_path, 'wb') as f:
        click.secho('Downloading iOS dylib to {0}...'.format(
            self.ios_dylib_gadget_archive_path), fg='green', dim=True)

        shutil.copyfileobj(dylib.raw, f)

    return self
def set_application_binary(self, binary: str = None) -> None:
    """
    Sets the binary that will be patched.

    If a binary is not defined, the application's Info.plist is parsed
    and the CFBundleExecutable key read.

    :param binary:
    :return:
    """
    if binary is not None:
        click.secho('Using user provided binary name of: {0}'.format(binary))
        self.app_binary = os.path.join(self.app_folder, binary)
        return

    with open(os.path.join(self.app_folder, 'Info.plist'), 'rb') as f:
        info_plist = plistlib.load(f)

    # print the bundle identifier
    click.secho('Bundle identifier is: {0}'.format(
        info_plist['CFBundleIdentifier']), fg='green', bold=True)

    self.app_binary = os.path.join(self.app_folder, info_plist['CFBundleExecutable'])
def download(self):
    """
    Downloads the latest Android gadget for this architecture.

    :return:
    """
    download_url = self._get_download_url()

    # stream the download using requests
    library = requests.get(download_url, stream=True)
    library_destination = self.get_frida_library_path(packed=True)

    # save the requests stream to file
    with open(library_destination, 'wb') as f:
        click.secho('Downloading {0} library to {1}...'.format(
            self.architecture, library_destination), fg='green', dim=True)

        shutil.copyfileobj(library.raw, f)

    return self
def open_dispout(dispout):
    """open dispout file for reading

    :param dispout: (str) dispout filename (disp.dat)
    :return: dispout file object
    """
    if dispout.endswith('.xz'):
        import lzma
        dispout = lzma.open(dispout, 'rb')
    else:
        dispout = open(dispout, 'rb')

    return dispout
def create_dat(nodout="nodout", dispout="disp.dat", legacynodes=False):
    """create binary data file

    :param str nodout: nodout file created by ls-dyna (default="nodout")
    :param str dispout: default = "disp.dat"
    :param boolean legacynodes: node IDs written every timestep (default=False)
    """
    header_written = False
    timestep_read = False
    timestep_count = 0
    writenode = True

    with open(nodout, 'r') as nodout:
        with open_dispout(dispout) as dispout:
            for line in nodout:
                if 'nodal' in line:
                    timestep_read = True
                    timestep_count += 1
                    data = []
                    continue
                if timestep_read is True:
                    if line[0:2] == '\n':
                        # done reading the time step
                        timestep_read = False
                        # if this was the first time, everything needed to be
                        # read to get the node count for the header
                        if not header_written:
                            header = generate_header(data, nodout)
                            write_headers(dispout, header)
                            header_written = True
                            print('Time Step: ', end="", flush=True)
                        if timestep_count > 1 and not legacynodes:
                            writenode = False
                        print("%i, " % timestep_count, end="", flush=True)
                        process_timestep_data(data, dispout, writenode)
                    else:
                        raw_data = parse_line(line)
                        data.append(list(raw_data))

    print("done.", flush=True)

    return 0
def count_timesteps(outfile):
    """count timesteps written to nodout

    Searches for 'time' in lines, and then removes 1 extra entry that
    occurs for t = 0. grep will be used on linux systems (way faster).

    :param outfile: usually 'nodout'
    :returns: int ts_count
    """
    from sys import platform

    print("Reading number of time steps... ", end="", flush=True)

    if platform == "linux":
        from subprocess import PIPE, Popen
        p = Popen('grep time %s | wc -l' % outfile, shell=True, stdout=PIPE)
        ts_count = int(p.communicate()[0].strip().decode())
    else:
        print("Non-linux OS detected -> using slower python implementation",
              flush=True)
        ts_count = 0
        with open(outfile, 'r') as f:
            for line in f:
                if 'time' in line:
                    ts_count += 1

    ts_count -= 1  # rm extra time count
    print('there are {}.'.format(ts_count), flush=True)

    return ts_count
def receipts(self):
    """Returns a Generator with batches of receipts text."""
    print('Loading receipts text dataset…', end='\r')
    with lzma.open(self.path, mode='rt') as file_handler:
        batch = []
        for row in csv.DictReader(file_handler):
            batch.append(self.serialize(row))
            if len(batch) >= self.batch_size:
                yield batch
                batch = []
        yield batch
def suspicions(self):
    """Returns a Generator with batches of suspicions."""
    print('Loading suspicions dataset…', end='\r')
    with lzma.open(self.path, mode='rt', encoding='utf-8') as file_handler:
        batch = []
        for row in csv.DictReader(file_handler):
            batch.append(self.serialize(row))
            if len(batch) >= self.batch_size:
                yield batch
                batch = []
        yield batch
def reimbursements(self):
    """Returns a Generator with a dict object for each row."""
    with lzma.open(self.path, 'rt') as file_handler:
        yield from DictReader(file_handler)
def verify_contents(thefile, tgt_hostname=None, callback=None):
    """
    Given a sysstat binary data file, verify that it contains a set of
    well-formed data values.

    The optional 'tgt_hostname' argument is checked against the file
    header's stored hostname value.

    The optional 'callback' argument, if provided, should be an instance
    of the ContentAction class, where for each magic structure, file
    header, file activity set, record header and record payload read, the
    appropriate method will be invoked, with the 'eof' method invoked at
    the end.

    One of the following exceptions will be raised if a problem is found
    with the file:

        Invalid: The file header or record header metadata values do not
            make sense in relation to each other

        Corruption: The file appears to be corrupted in some way

        Truncated: The file does not appear to contain all the data as
            described by the file header or a given record header
    """
    try:
        with lzma.open(thefile, "rb") as fp:
            verify_contents_fp(fp, tgt_hostname, callback)
    except lzma.LZMAError:
        with open(thefile, "rb") as fp:
            verify_contents_fp(fp, tgt_hostname, callback)
def fetch_fileheader(thefile):
    """
    Fetch the sysstat FileHeader object for the given file path.
    """
    try:
        with lzma.open(thefile, "rb") as fp:
            res = fetch_fileheader_with_fp(fp)
    except lzma.LZMAError:
        with open(thefile, "rb") as fp:
            res = fetch_fileheader_with_fp(fp)
    return res
def load_vocab(self, filename):
    with open(filename, "rb") as f:
        vocab = pickle.load(f)
    self._load_vocab(vocab)
    return self
def save_vocab(self, filename):
    with open(filename, "wb") as f:
        pickle.dump(self._fullvocab, f)
    return self
def save_model(self, filename):
    self.save_vocab(filename + ".vocab")
    with open(filename + ".params", "wb") as f:
        pickle.dump(self._args, f)
    self._model.save(filename + ".model")
    return self
def load_model(self, filename, **kwargs):
    self.load_vocab(filename + ".vocab")
    with open(filename + ".params", "rb") as f:
        args = pickle.load(f)
    args.update(kwargs)
    self.create_parser(**args)
    self.init_model()
    self._model.load(filename + ".model")
    return self
def write_file(filename, data):
    if isinstance(data, list):
        data = ''.join(data)
    else:
        assert isinstance(data, str), type(data)

    if filename in STDOUT_NAMES:
        sys.stdout.write(data)
    else:
        fobj = open(filename, 'w')
        fobj.write(data)
        flush_and_close(fobj)
def get_num_features(filename):
    counting = False
    count = 0
    for line in open(filename):
        if counting:
            count += 1
        else:
            if line.strip() == ':0':
                counting = True
    return count
def _load_erdm_ground_truth(outdir):
    """A helper function to load Legal TREC 2009 data"""
    with open(os.path.join(outdir, 'seed_relevant.txt'), 'rt') as fh:
        relevant_files = [el.strip() for el in fh.readlines()]

    with open(os.path.join(outdir, 'seed_non_relevant.txt'), 'rt') as fh:
        non_relevant_files = [el.strip() for el in fh.readlines()]

    if platform.system() == 'Windows':
        relevant_files = [el.replace('/', '\\') for el in relevant_files]
        non_relevant_files = [el.replace('/', '\\') for el in non_relevant_files]

    return non_relevant_files, relevant_files
def __init__(self, path, mode='w'):
    self.name = path
    self.outfile = open(path, mode)
    self.devnull = open(os.devnull, 'w')
    self.closed = False
    try:
        # Setting close_fds to True is necessary due to
        # http://bugs.python.org/issue12786
        self.process = Popen(
            [get_program_path('gzip')],
            stdin=PIPE,
            stdout=self.outfile,
            stderr=self.devnull,
            close_fds=True)
    except IOError:
        self.outfile.close()
        self.devnull.close()
        raise
def open_gzip_file(filename, mode, use_system=True):
    """Open a gzip file, preferring the system gzip program if
    `use_system` is True, falling back to the gzip python library.

    Args:
        filename: The file to open.
        mode: The file open mode.
        use_system: Whether to try to use the system gzip program.
    """
    if use_system:
        try:
            if 'r' in mode:
                gzfile = GzipReader(filename)
            else:
                gzfile = GzipWriter(filename)
            if 't' in mode:
                gzfile = io.TextIOWrapper(gzfile)
            return gzfile
        except:
            pass

    gzfile = gzip.open(filename, mode)
    if 'b' in mode:
        if 'r' in mode:
            gzfile = io.BufferedReader(gzfile)
        else:
            gzfile = io.BufferedWriter(gzfile)
    return gzfile
def open_lzma_file(filename, mode, **kwargs):
    """Open an LZMA (xz) file."""
    return lzma.open(filename, mode)
def load(filename):
    with lzma.open(filename, 'rb') as dataset:
        while True:
            try:
                yield pickle.load(dataset)
            except EOFError:
                break
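The generator above reads pickled records back-to-back from a single compressed stream until EOFError. For context, a matching writer might look like the sketch below (save() and its records argument are hypothetical names, not from the project above): repeated pickle.dump() calls onto one lzma.open(..., 'wb') handle produce exactly the kind of stream that load() consumes.

import lzma
import pickle

def save(filename, records):
    # hypothetical counterpart to load(): append one pickle per record
    # to a single xz-compressed stream
    with lzma.open(filename, 'wb') as dataset:
        for record in records:
            pickle.dump(record, dataset)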
def main():
    parser = argparse.ArgumentParser(
        description='dataset generator'
    )
    parser.add_argument(
        '-p', '--possibility', type=float, default=0.9,
        help='possibility to add train dataset'
    )
    parser.add_argument(
        'source',
        help='path to mecab-processed corpus (xz compressed)'
    )
    parser.add_argument(
        'train',
        help='path for writing training dataset (xz compressed)'
    )
    parser.add_argument(
        'test',
        help='path for writing testing dataset (xz compressed)'
    )
    args = parser.parse_args()

    with lzma.open(args.source, 'rt') as source,\
            lzma.open(args.train, 'wb') as train,\
            lzma.open(args.test, 'wb') as test:
        separate(source, args.possibility, train, test)
def test_translate_csv_with_reimbursement_with_net_value_with_decimal_comma(self):
    csv_with_decimal_comma = os.path.join(
        self.fixtures_path, 'Ano-with-decimal-comma.csv')
    path_with_decimal_point = os.path.join(
        self.fixtures_path, 'reimbursements-with-decimal-point.csv')

    with open(path_with_decimal_point, 'r') as csv_expected:
        expected = csv_expected.read()

    xz_path = Dataset('')._translate_file(csv_with_decimal_comma)
    with lzma.open(xz_path) as xz_file:
        output = xz_file.read().decode('utf-8')

    self.assertEqual(output, expected)
def glove_():
    vecs = np.memmap("glovesmall.arr", np.float32).reshape((-1, 300))
    words = open("glovewords.txt").read().splitlines()
    return dict(zip(words, vecs))
def germanw2v_():
    vecs = np.memmap("german.vecbin", np.float32).reshape((-1, 300))
    words = open("german.words").read().splitlines()
    return dict(zip(words, vecs))
def get_book(name, language):
    book = lzma.open('../data/{}-common.vpl.xz'.format(name), 'rt').read().splitlines()
    book = [[language.get(w, veczero) for w in words(l)] +
            ([veczero] * (n_steps - len(words(l))))
            for l in book]
    lens = np.array([len(l) for l in book], dtype=np.int32)
    for verse in book:
        assert len(verse) <= n_steps, \
            "n_steps should be at least {}".format(len(verse))
    return (book, lens)
def get_book(name, language):
    book = lzma.open('../data/{}-common.vpl.xz'.format(name), 'rt').read().splitlines()
    book = [[language(w, 0) for w in l] +
            ([veczero] * (n_steps - len(l)))
            for l in book]
    for verse in book:
        assert len(verse) <= n_steps, \
            "n_steps should be at least {}".format(len(verse))
    return book
def get_book(name, language):
    book = lzma.open('../data/{}-common.vpl.xz'.format(name), 'rt').read().splitlines()
    book = [[language(w) for w in l] +
            ([language(' ')] * (n_steps - len(l)))
            for l in book]
    lens = np.array([len(l) for l in book], dtype=np.int32)
    for verse in book:
        assert len(verse) <= n_steps, \
            "n_steps should be at least {}".format(len(verse))
    return (book, lens)
def benchmark_screed(fn):
    import screed
    total_seq = int(0)
    t0 = time.time()
    it = screed.open(fn)
    for i, e in enumerate(it):
        total_seq += len(e.sequence)
        if i % REFRESH_RATE == 0:
            t1 = time.time()
            print('\r%.2fMB/s' % (total_seq / (1E6) / (t1 - t0)),
                  end='', flush=True)
    print()
    print('%i entries' % (i + 1))
def _opener(filename):
    if filename.endswith('.gz'):
        import gzip
        return gzip.open
    elif filename.endswith('.bz2'):
        import bz2
        return bz2.open
    elif filename.endswith('.lzma'):
        import lzma
        return lzma.open
    else:
        return open
def _screed_iter(fn):
    import screed
    it = screed.open(fn)
    for i, e in enumerate(it):
        yield (i, e.name.encode('ascii'), str(e.sequence).encode('ascii'))
def _ngs_plumbing_iter(fn, mode, buffering):
    import ngs_plumbing.fastq
    openfunc = _opener(fn)
    with open(fn, mode, buffering=buffering) as f:
        with openfunc(f) as fh:
            it = ngs_plumbing.fastq.read_fastq(fh)
            for i, e in enumerate(it):
                yield (i, e.header[1:], e.sequence)
def _fastqandfurious_iter(fn, mode, buffering):
    from fastqandfurious import fastqandfurious
    bufsize = int(5E4)
    openfunc = _opener(fn)
    with open(fn, mode, buffering=buffering) as f:
        with openfunc(f) as fh:
            it = fastqandfurious.readfastq_iter(fh, bufsize)
            for i, e in enumerate(it):
                yield (i, e.header, e.sequence)
def hashFile(file):
    block = 64 * 1024
    hash = hashlib.sha256()
    with open(file, 'rb') as f:
        buf = f.read(block)
        while len(buf) > 0:
            hash.update(buf)
            buf = f.read(block)
    return hash.hexdigest()
def unpack(self):
    """
    Unpacks a downloaded .xz gadget.

    :return:
    """
    click.secho('Unpacking {0}...'.format(
        self.ios_dylib_gadget_archive_path), dim=True)

    with lzma.open(self.ios_dylib_gadget_archive_path) as f:
        with open(self.ios_dylib_gadget_path, 'wb') as g:
            g.write(f.read())

    return self
def unpack(self):
    """
    Unpacks a downloaded .xz gadget.

    :return:
    """
    click.secho('Unpacking {0}...'.format(
        self.get_frida_library_path(packed=True)), dim=True)

    with lzma.open(self.get_frida_library_path(packed=True)) as f:
        with open(self.get_frida_library_path(), 'wb') as g:
            g.write(f.read())

    return self
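Both unpack() variants above hold the entire decompressed gadget in memory via f.read() before writing it out. For large archives, a streaming copy keeps memory use flat; a minimal sketch (with hypothetical src/dst paths) using shutil.copyfileobj, which the download methods earlier already rely on:

import lzma
import shutil

def unpack_streaming(src, dst):
    # copy in chunks instead of buffering the whole file in memory
    with lzma.open(src) as f, open(dst, 'wb') as g:
        shutil.copyfileobj(f, g)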
def get_temp_file(suffix="", name=None, delete=False):
    """Creates a temporary file under /tmp."""
    if name:
        name = os.path.join("/tmp", name)
        t = open(name, "w")
        cleanup.register_tmp_file(name)
    else:
        _suffix = "_nmtpy_%d" % os.getpid()
        if suffix != "":
            _suffix += suffix

        t = tempfile.NamedTemporaryFile(suffix=_suffix, delete=delete)
        cleanup.register_tmp_file(t.name)

    return t
def fopen(filename, mode=None):
    """GZ/BZ2/XZ-aware file opening function."""
    # NOTE: Mode is not used but kept for not breaking iterators.
    if filename.endswith('.gz'):
        return gzip.open(filename, 'rt')
    elif filename.endswith('.bz2'):
        return bz2.open(filename, 'rt')
    elif filename.endswith(('.xz', '.lzma')):
        return lzma.open(filename, 'rt')
    else:
        # Plain text
        return open(filename, 'r')
def split_file(source, nfolds=None, ignoreheader=False, importance=0,
               minfoldsize=10000):
    if nfolds is None:
        nfolds = 10

    if isinstance(source, basestring):
        ext = get_real_ext(source)
    else:
        ext = 'xxx'

    if hasattr(source, 'seek'):
        source.seek(0)

    # XXX already have examples_count
    total_lines = 0
    for line in open_regular_or_compressed(source):
        total_lines += 1

    if hasattr(source, 'seek'):
        source.seek(0)

    source = open_regular_or_compressed(source)

    if ignoreheader:
        source.next()
        total_lines -= 1

    foldsize = int(math.ceil(total_lines / float(nfolds)))
    foldsize = max(foldsize, minfoldsize)
    nfolds = int(math.ceil(total_lines / float(foldsize)))

    folds = []
    current_fold = -1
    count = foldsize
    current_fileobj = None
    total_count = 0

    for line in source:
        if count >= foldsize:
            if current_fileobj is not None:
                flush_and_close(current_fileobj)
                current_fileobj = None
            current_fold += 1
            if current_fold >= nfolds:
                break
            fname = get_temp_filename('fold%s.%s' % (current_fold, ext))
            current_fileobj = open(fname, 'w')
            count = 0
            folds.append(fname)
        current_fileobj.write(line)
        count += 1
        total_count += 1

    if current_fileobj is not None:
        flush_and_close(current_fileobj)

    if total_count != total_lines:
        sys.exit('internal error: total_count=%r total_lines=%r source=%r' %
                 (total_count, total_lines, source))

    return folds, total_lines