我们从Python开源项目中,提取了以下18个代码示例,用于说明如何使用shutil.unpack_archive()。
def wishbone_my_setup(setup_dir):
    """Unpack the bundled tools, fetch the mouse UMI table and build the SCData object.

    Installs GSEA gene sets and diffusion components from archives located in
    ``<setup_dir>/tools`` (each one is skipped when its target directory is
    already present), downloads ``GSE72857_umitab.txt.gz`` into
    ``<setup_dir>/data/`` when it is missing, and then selects the cells listed
    in ``<setup_dir>/data/sample_scseq_data.csv``.

    :param setup_dir: root directory containing the ``tools`` and ``data`` folders.
    :return: ``wishbone.wb.SCData`` built from the selected cells with
        ``data_type='sc-seq'``.
    """
    # Imported locally so the block does not depend on a file-level import.
    import subprocess

    tools_dir = setup_dir + '/tools'
    if not os.path.exists(tools_dir + '/DiffusionGeometry/'):
        shutil.unpack_archive(tools_dir + '/DiffusionGeometry.zip',
                              tools_dir + '/DiffusionGeometry/')
    if not os.path.exists(tools_dir + '/mouse/'):
        shutil.unpack_archive(tools_dir + '/mouse_gene_sets.tar.gz', tools_dir)
    if not os.path.exists(tools_dir + '/human/'):
        shutil.unpack_archive(tools_dir + '/human_gene_sets.tar.gz', tools_dir)

    umi_path = setup_dir + '/data/GSE72857_umitab.txt.gz'
    if not os.path.exists(umi_path):
        # Downloads mouse UMI from GSE72857: Transcriptional heterogeneity and
        # lineage commitment in myeloid progenitors [single cell RNA-seq].
        # An argument list (no shell) keeps a setup_dir containing spaces or
        # shell metacharacters from being split or interpreted.
        subprocess.run([
            'wget', '-m', '-nH', '-nd',
            '-P', setup_dir + '/data/',
            'ftp://ftp.ncbi.nlm.nih.gov/geo/series/GSE72nnn/GSE72857/suppl/'
            'GSE72857%5Fumitab%2Etxt%2Egz',
        ])

    x = pd.read_csv(umi_path, sep='\t', compression="gzip")
    y = pd.read_csv(setup_dir + '/data/sample_scseq_data.csv', index_col=[0])
    # Transpose the UMI table and keep only the rows named by the sample index.
    scdata_raw = x.T.loc[y.index]
    scdata_raw = wishbone.wb.SCData(scdata_raw.astype('float'), data_type='sc-seq')
    return scdata_raw
def _restore_from_backup(self, version):
    """Unpack the backup archive for ``version`` while keeping preserved files.

    The files named in ``self.files_to_preserve`` are copied from
    ``self.backup_target`` into ``self.tmp_dir``, the archive returned by
    ``self._backup_name_ext(version)`` is unpacked over ``self.backup_target``,
    the preserved files are copied back, and ``self.tmp_dir`` is removed.
    A failed copy is logged as a warning and does not abort the restore.
    """
    logger.debug('Restoring from backup for {}'.format(version))

    def copy_preserved(src_root, dst_root):
        # Best-effort copy of every preserved file between the two roots.
        for rel_path in self.files_to_preserve:
            try:
                shutil.copy2(os.path.join(src_root, rel_path),
                             os.path.join(dst_root, rel_path))
            except IOError as exc:
                logger.warning('Copying {} failed due to {}'
                               .format(rel_path, exc))

    # Stash the preserved files before the archive overwrites the target.
    copy_preserved(self.backup_target, self.tmp_dir)
    archive_path = self._backup_name_ext(version)
    shutil.unpack_archive(archive_path, self.backup_target, self.backup_format)
    # Put the stashed files back on top of the unpacked backup.
    copy_preserved(self.tmp_dir, self.backup_target)
    shutil.rmtree(self.tmp_dir, ignore_errors=True)
def execute(self, context):
    """Open the project referenced by ``self.filepath``.

    If ``self.filepath`` is an archive whose name ends with an extension that
    ``shutil`` can unpack, it is extracted into a folder inside the system
    temporary directory and ``self.filepath`` is redirected to that folder.
    A ``project.json`` is then looked for at several locations around
    ``self.filepath``; for a valid one, ``project/main.blend`` next to it is
    opened with ``bpy.ops.wm.open_mainfile``.

    :return: ``{'FINISHED'}`` after opening the main blend file, or
        ``{'CANCELLED'}`` (after reporting an error) when none was found.
    """
    # TODO: put on another file and module
    def check_project_file(path):
        # A valid project file is an existing "project.json" whose loaded
        # data contains the 'bge-project' key.
        if os.path.basename(path) != "project.json":
            return False
        if not os.path.isfile(path):
            return False
        utils.loadProjectFile(path)
        return 'bge-project' in utils.project_data

    def get_main_blend(path):
        # Returns the main blend path beside the project file, or None.
        candidate = os.path.dirname(path) + os.sep + "project" + os.sep + "main.blend"
        if os.path.isfile(candidate):
            return candidate
        return None

    # If we have an archive, unpack it first.
    if os.path.isfile(self.filepath):
        # Ask shutil for the real file extensions it understands (".zip",
        # ".tar.gz", ".tar.bz2", ...) instead of its internal format names;
        # longest first so the most specific suffix is the one stripped.
        extensions = sorted(
            (ext for _, exts, _ in shutil.get_unpack_formats() for ext in exts),
            key=len, reverse=True)
        for ext in extensions:
            if self.filepath.endswith(ext):
                tpath = (tempfile.gettempdir() + os.sep
                         + os.path.basename(self.filepath)[:-len(ext)] + os.sep)
                shutil.unpack_archive(self.filepath, tpath)
                self.filepath = tpath
                print("Extracted to: ", tpath)
                break

    # Once we have a folder, probe the candidate project file locations.
    parent = os.path.dirname(self.filepath)
    candidates = [
        self.filepath,
        self.filepath + "project.json",
        parent + os.sep + "project.json",
        parent + os.sep + "../" + "project.json",
    ]

    endpath = None
    for path in candidates:
        # Every candidate is checked; the last valid project file decides.
        if check_project_file(path):
            endpath = get_main_blend(path)

    if endpath is None:
        self.report({'ERROR_INVALID_INPUT'}, "Error loading project, project file not found.")
        return {'CANCELLED'}

    bpy.ops.wm.open_mainfile(filepath=endpath)
    return {'FINISHED'}
def untar(path, fname, deleteTar=True):
    """Extract archive ``fname`` located in ``path`` into ``path`` itself.

    The archive file is removed afterwards unless ``deleteTar`` is false.
    """
    print('unpacking ' + fname)
    archive = os.path.join(path, fname)
    shutil.unpack_archive(archive, path)
    if not deleteTar:
        return
    os.remove(archive)
def extract_neo4j(database_name, archive_path):
    """Unpack a Neo4j archive into the data directory of ``database_name``.

    The archive is extracted into
    ``settings.POLYGLOT_DATA_DIRECTORY/<database_name>`` and every extracted
    entry whose name starts with ``neo4j`` is renamed to ``neo4j``.

    :return: ``False`` without touching anything when the ``neo4j`` directory
        already exists, ``True`` after extracting.
    """
    target_root = os.path.join(settings.POLYGLOT_DATA_DIRECTORY, database_name)
    final_dir = os.path.join(target_root, 'neo4j')
    if os.path.exists(final_dir):
        return False

    shutil.unpack_archive(archive_path, target_root)
    # Normalise the versioned folder name shipped in the archive.
    unpacked = [entry for entry in os.listdir(target_root)
                if entry.startswith('neo4j')]
    for entry in unpacked:
        os.rename(os.path.join(target_root, entry), final_dir)
    return True
def extract_influxdb(database_name, archive_path):
    """Unpack an InfluxDB archive into the data directory of ``database_name``.

    The archive is extracted into
    ``settings.POLYGLOT_DATA_DIRECTORY/<database_name>`` and every extracted
    entry whose name starts with ``influxdb`` is renamed to ``influxdb``.

    :return: ``False`` without touching anything when the ``influxdb``
        directory already exists, ``True`` after extracting.
    """
    target_root = os.path.join(settings.POLYGLOT_DATA_DIRECTORY, database_name)
    final_dir = os.path.join(target_root, 'influxdb')
    if os.path.exists(final_dir):
        return False

    shutil.unpack_archive(archive_path, target_root)
    # Normalise the versioned folder name shipped in the archive.
    unpacked = [entry for entry in os.listdir(target_root)
                if entry.startswith('influxdb')]
    for entry in unpacked:
        os.rename(os.path.join(target_root, entry), final_dir)
    return True
def zipfile_unzip(file_path=None, extract_dir=None, **filedialog_kwargs):
    """Extract an archive, asking the user for it when no path is given.

    :param file_path: archive to extract; when ``None`` the first element of
        ``QtGui.QFileDialog.getOpenFileName(**filedialog_kwargs)`` is used.
    :param extract_dir: destination directory; defaults to the directory
        containing the archive.
    :param filedialog_kwargs: forwarded to the file dialog.
    """
    archive = file_path
    if archive is None:
        archive = QtGui.QFileDialog.getOpenFileName(**filedialog_kwargs)[0]
    target = os.path.dirname(archive) if extract_dir is None else extract_dir
    shutil.unpack_archive(archive, extract_dir=target)
def unpack_archive(file, extract_dir, format_, msg):
    """Unpack ``file`` into ``extract_dir`` or terminate the program.

    :param file: path of the archive to unpack.
    :param extract_dir: directory the archive is unpacked into.
    :param format_: archive format name passed to ``shutil.unpack_archive``.
    :param msg: additional message logged at critical level on failure.
    :raises SystemExit: with exit status 1 when unpacking raises
        ``ValueError`` or ``OSError``.
    """
    try:
        shutil.unpack_archive(file, extract_dir=extract_dir, format=format_)
    except (ValueError, OSError) as err:
        logging.debug(traceback.format_exc())
        logging.critical(err)
        logging.critical(msg)
        # A bare SystemExit would end the process with status 0 (success);
        # report the failure to the caller's shell and keep the cause chained.
        raise SystemExit(1) from err
def _get_url_unpacked_path_or_null(url): parsed = urllib.parse.urlparse(url) if parsed.scheme == "file" and parsed.path.endswith(".whl"): return Path("/dev/null") try: cache_dir, packed_path = _get_url_impl(url) except CalledProcessError: return Path("/dev/null") if packed_path.is_file(): # pip:// shutil.unpack_archive(str(packed_path), cache_dir.name) unpacked_path, = ( path for path in Path(cache_dir.name).iterdir() if path.is_dir()) return unpacked_path
def untar(fname):
    """Extract archive ``fname`` into the current working directory, then delete it."""
    print('unpacking ' + fname)
    # No extract_dir is given, so shutil unpacks into the current directory.
    shutil.unpack_archive(fname)
    os.remove(fname)
def untar(path, fname):
    """Extract archive ``fname`` located in ``path`` into ``path``, then delete it."""
    print('unpacking ' + fname)
    archive = os.path.join(path, fname)
    shutil.unpack_archive(archive, path)
    os.remove(archive)
def maybe_download_and_extract(data_root: str, url: str) -> None:
    """
    Maybe download the specified file to ``data_root`` and try to unpack it with ``shutil.unpack_archive``.

    Nothing is downloaded when a file with the URL's base name already exists
    in ``data_root``. A download that fails part-way removes the partial file
    so that a later call does not mistake it for a finished download.

    :param data_root: data root to download the files to
    :param url: url to download from
    """
    # make sure data_root exists
    os.makedirs(data_root, exist_ok=True)

    filename = os.path.basename(url)

    # check whether the archive already exists
    filepath = os.path.join(data_root, filename)
    if os.path.exists(filepath):
        logging.info('\t`%s` already exists; skipping', filepath)
        return

    # download with progressbar
    logging.info('\tdownloading %s', filepath)
    chunk_size = 1024
    req = requests.get(url, stream=True)
    try:
        # The header is optional (e.g. chunked transfer encoding); without it
        # the progress bar is shown without a known length.
        content_length = req.headers.get('content-length')
        n_chunks = None
        if content_length is not None:
            # Round up so a trailing partial chunk still counts as one step.
            n_chunks = (int(content_length) + chunk_size - 1) // chunk_size
        with open(filepath, 'wb') as f_out, \
                click.progressbar(req.iter_content(chunk_size=chunk_size), length=n_chunks) as bar:
            for chunk in bar:
                if chunk:
                    f_out.write(chunk)
    except BaseException:
        # Also covers Ctrl-C: drop the incomplete file, then propagate.
        if os.path.exists(filepath):
            os.remove(filepath)
        raise
    finally:
        req.close()

    # extract
    try:
        shutil.unpack_archive(filepath, data_root)
    except (shutil.ReadError, ValueError):
        logging.info('File `%s` could not be extracted by `shutil.unpack_archive`. Please process it manually.',
                     filepath)
def execute(self, context):
    """Download and unpack the Blender release archives for the running version.

    The release index page for ``bpy.app.version_string`` is fetched, every
    link whose target starts with ``blender`` is downloaded into the library
    folder from the scene's publish settings (unless the archive is already
    there) and unpacked into a folder named after the archive without its
    ``zip``/``tar``/``bz2`` suffixes. Finally
    ``bpy.ops.scene.publish_auto_platforms`` is invoked.

    :return: ``{'FINISHED'}``.
    """
    import html.parser
    import urllib.request

    remote_platforms = []

    ps = context.scene.ge_publish_settings

    # create lib folder if not already available
    lib_path = bpy.path.abspath(ps.lib_path)
    if not os.path.exists(lib_path):
        os.makedirs(lib_path)

    print("Retrieving list of platforms from blender.org...", end=" ", flush=True)

    class AnchorParser(html.parser.HTMLParser):
        def handle_starttag(self, tag, attrs):
            if tag == 'a':
                for key, value in attrs:
                    # value is None for a bare attribute such as <a href>.
                    if key == 'href' and value and value.startswith('blender'):
                        remote_platforms.append(value)

    url = 'http://download.blender.org/release/Blender' + bpy.app.version_string.split()[0]
    parser = AnchorParser()
    with urllib.request.urlopen(url) as response:
        data = response.read()
    # Decode the payload; str(bytes) would feed the parser the b'...' repr
    # with escaped newlines and quotes instead of the actual HTML.
    parser.feed(data.decode('utf-8', errors='replace'))

    print("done", flush=True)

    print("Downloading files (this will take a while depending on your internet connection speed).", flush=True)
    archive_suffixes = {'zip', 'tar', 'bz2'}
    for name in remote_platforms:
        src = '/'.join((url, name))
        dst = os.path.join(lib_path, name)
        # Strip the archive suffixes from the file name only, so dots in
        # lib_path cannot influence the unpack folder.
        stem = '.'.join(part for part in name.split('.') if part not in archive_suffixes)
        dst_dir = os.path.join(lib_path, stem)
        # The archive is always unpacked below, so it must be present:
        # download whenever the archive file itself is missing.
        if not os.path.exists(dst):
            print("Downloading " + src + "...", end=" ", flush=True)
            urllib.request.urlretrieve(src, dst)
            print("done", flush=True)
        else:
            print("Reusing existing file: " + dst, flush=True)

        print("Unpacking " + dst + "...", end=" ", flush=True)
        # Start from a clean folder so stale files from an earlier unpack
        # do not linger.
        if os.path.exists(dst_dir):
            shutil.rmtree(dst_dir)
        shutil.unpack_archive(dst, dst_dir)
        print("done", flush=True)

    print("Creating platform from libs...", flush=True)
    bpy.ops.scene.publish_auto_platforms()
    return {'FINISHED'}
def fetch(source, destination, allow_file=False):
    """
    Fetch a group of files either from a file archive or version control
    repository.

    Supported URL schemes:

      - file
      - ftp
      - ftps
      - git
      - git+http
      - git+https
      - git+ssh
      - http
      - https

    :param str source: The source URL to retrieve.
    :param str destination: The directory into which the files should be placed.
    :param bool allow_file: Whether or not to permit the file:// URL for processing local resources.
    :return: The destination directory that was used.
    :rtype: str
    :raises ValueError: If *destination* already exists.
    :raises RuntimeError: If *source* is a file: URL and *allow_file* is false.
    """
    source = source.strip()
    if os.path.exists(destination):
        raise ValueError('destination must not be an existing directory')

    parsed_url = urllib.parse.urlparse(source, scheme='file')
    if parsed_url.username is not None:
        # if the username is not none, then the password will be a string
        creds = Creds(parsed_url.username, parsed_url.password or '')
    else:
        creds = Creds(None, None)

    # Convert to a mutable mapping so the credentials can be stripped from
    # the netloc and the scheme normalised to lower case.
    parsed_url = collections.OrderedDict(
        zip(('scheme', 'netloc', 'path', 'params', 'query', 'fragment'), parsed_url))
    parsed_url['netloc'] = parsed_url['netloc'].split('@', 1)[-1]
    parsed_url['scheme'] = parsed_url['scheme'].lower()

    if parsed_url['scheme'] == 'file':
        if not allow_file:
            raise RuntimeError('file: URLs are not allowed to be processed')
        tmp_path = parsed_url['path']
        if os.path.isdir(tmp_path):
            shutil.copytree(tmp_path, destination, symlinks=True)
        elif os.path.isfile(tmp_path):
            shutil.unpack_archive(tmp_path, destination)
    else:
        # Keep the original file name as suffix so the archive format can be
        # detected from the extension when unpacking.
        tmp_fd, tmp_path = tempfile.mkstemp(suffix='_' + os.path.basename(parsed_url['path']))
        os.close(tmp_fd)
        tmp_file = open(tmp_path, 'wb')
        try:
            _fetch_remote(source, destination, parsed_url, creds, tmp_file, tmp_path)
            # Push buffered writes to disk before the file is inspected and
            # read back through its path.
            if not tmp_file.closed:
                tmp_file.flush()
            # An empty file means nothing was written to it, so there is
            # no archive to unpack.
            if os.stat(tmp_path).st_size:
                shutil.unpack_archive(tmp_path, destination)
        finally:
            tmp_file.close()
            os.remove(tmp_path)
    return destination