我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用shutil.make_archive()。
def archive_model(project): """ Use 'pyb -P model_name="<model_name>" archive_model' to create '<model_name>_CURRENTDATE.tar.gz' in 'build' directory. If model_name == 'deeppavlov_docs', then documentation from build/docs will be archived. """ import tarfile, datetime os.chdir('build') model_name = project.get_property('model_name') archive_name = model_name + '_' + datetime.date.today().strftime("%y%m%d") if model_name == 'deeppavlov_docs': import shutil shutil.make_archive(archive_name, 'gztar', 'docs', 'deeppavlov') os.chdir('..') return with tarfile.open(archive_name + '.tar.gz', "w:gz") as archive: os.chdir(model_name) for f in os.listdir('.'): if os.path.isfile(f) and (('h5' in f) or ('json' in f) or ('pkl' in f)or ('dict' in f) or ('threshold' in f) or ('data' in f) or ('index' in f) or ('meta' in f) or ('checkpoint' in f)): archive.add(f) os.chdir('..') os.chdir('..')
def send_log(): ldir = options['logsfolder'] dirs = [d for d in os.listdir(ldir) if os.path.isdir(os.path.join(ldir, d))] dirs = [os.path.join(ldir, d) for d in dirs] latest_subdir = max(dirs, key=os.path.getmtime) logfolder = latest_subdir logfile = os.path.join(ldir, 'compressedlogs') shutil.make_archive(logfile, 'zip', logfolder) logfile = logfile + '.zip' log_content = open(logfile, 'rb').read() encoded_log = base64.b64encode(bytes(log_content)) data = {'encoded_log': encoded_log, 'sample_hash': options['sample_hash']} request = urllib2.Request(options['log-server-url']) request.add_header('Content-Type', 'application/json') response = urllib2.urlopen(request, json.dumps(data)) if response.getcode() != 200: print 'Unable to send data'
def _create(self, fixture_file_path, less_verbose=0): try: self.write_info('Creating fixture %s' % fixture_file_path, 1+less_verbose) fixture_file_path = re.sub(r'\.zip$', '', fixture_file_path) # we strip away .zip if given tmp_dir = tempfile.mkdtemp() # copy media root shutil.copytree(self._media_root, join(tmp_dir, 'MEDIA_ROOT')) # database dump with open(join(tmp_dir, 'db.sql'), 'w') as fp: return_code = subprocess.call(['pg_dump', '--clean', '--no-owner', self._database_name], stdout=fp) if return_code != 0: raise CommandError('pg_dump failed with exit code {}'.format(return_code)) # creating the fixture archive archive_name = shutil.make_archive(fixture_file_path, 'zip', root_dir=tmp_dir) self.write_debug(subprocess.check_output(['unzip', '-l', archive_name])) except: self.write_debug('Temporary directory %s kept due to exception.' % tmp_dir) raise else: self.write_info('... fixture created', 1+less_verbose) shutil.rmtree(tmp_dir)
def archive(self, temp_dir): """Archive source directory to temp directory. Return archive full path. """ # We want to achive `/my/sub/{some dir}` under unique temp dir: # `/tmp/dEdjnr/{name from config}` # # So we make archive from base path `/my/sub/{some dir}`, # root path `/my/sub/` and archive name # `/tmp/dEdjnr/{name from config}` from_path = self.settings['path'] base_name = self._prepare_data_path(temp_dir) echo("... archive directory %s" % from_path) arch_path = shutil.make_archive( base_name=base_name, root_dir=os.path.dirname(from_path), base_dir=from_path, # logger=log, format=self.get_archive_format()) echo("... archived %s" % arch_path) return arch_path
def save_schema(keyspace_arg=None): host = get_rpc_address() save_path = sys.path[0] + '/.snapshots/schemas' keyspaces = get_keyspaces(host) if keyspace_arg: for ks in keyspace_arg: if ks not in keyspaces: print('ERROR: Invalid keyspace argument') exit(1) print('Saving schema . . .') print_save_path = write_schema(host, save_path) print('Saved schema as %s' % print_save_path) for ks in keyspaces: print_save_path = write_schema(host, save_path, ks) print('Saved keyspace schema as %s' % print_save_path) print('Compressing schema file') shutil.make_archive(save_path, 'zip', save_path) print('Saving ring information . . .') write_ring_info(sys.path[0] + '/.snapshots')
def zipfile_make_archive(dirname, to=None, **kwargs): """ Thin layer over shutil.make_archive. Just defaults the to to the same name as the directory by default. :param dirname: (str) The directory name :param to: (str, default None) The file path to create the archive in. :param kwargs: shutil.make_archive(**kwargs) :return: (str) The :param to file path. """ if to is None: to = dirname return shutil.make_archive(to, "zip", root_dir=dirname, **kwargs)
def ZipAttachments(f_name): arch_name = "C:\Users\Public\Intel\Logs\\" + f_name + "Attachments" files = os.listdir(dir_zip) try: shutil.make_archive(arch_name, 'zip', dir_zip) except Exception as e: pass for j in range(len(files)): try: os.remove(dir_zip + "\\" + files[j]) except Exception as e: print e #Function to take screenshot
def get_backup(): experiment = request.args.get('xp') try: experiment = to_unicode(experiment) except: return wrong_argument("Experiment name should be a non-empty string or unicode instead of '{}'".format(type(experiment))) if not experiment: return wrong_argument("xp argument is required") folder_path = tensorboard_folder.format(experiment) if not path.isdir(folder_path): return wrong_argument("Requested experiment '{}' does not exist".format(experiment)) zip_file = shutil.make_archive("/tmp/{}".format(experiment), 'zip', folder_path) return send_file(zip_file, mimetype='application/zip')
def _create_codebase_snapshot(self): def ignored_files(path, names): # see https://docs.python.org/2/library/shutil.html#shutil.copytree if os.path.abspath(path) == self.export_directory or ".git" in path or ".idea" in path: return names print "\t", path include = [".gml", ".py"] only_files = (name for name in names if os.path.isfile(os.path.join(path, name))) ignored = [name for name in only_files if not any(name.endswith(ext) for ext in include)] return ignored code_source_directory = self.local_base_path code_export_directory = os.path.join(self.export_directory, self.code_base_id) if os.path.exists(code_export_directory): raise DeploymentError("The export directory exists!") code_subdir_name = "sca" src_dir = os.path.join(code_export_directory, code_subdir_name) shutil.copytree(code_source_directory, src_dir, ignore=ignored_files) tar = shutil.make_archive(code_export_directory, format="gztar", root_dir=self.export_directory, base_dir=os.path.join(self.code_base_id, code_subdir_name)) if self.cleanup: shutil.rmtree(code_export_directory) self._generated_files.add(tar)
def __Step_MakePackage(self, overwrite_existing): print 'Making %s package...' % (self.PACKAGE_NAME or '<NONE>') if self.RELEASE_NAME_FREE_FORMAT: release_name = self.__ParseMacros(self.RELEASE_NAME_FREE_FORMAT).format(*self.VERSION) else: release_name = (self.VERSION[3] and self.__ParseMacros(self.RELEASE_NAME_WITH_BUILD_FMT % self.VERSION) or self.__ParseMacros(self.RELEASE_NAME_FMT % self.VERSION[:3])) package_file_name = self.__MakeSrcPath(os.path.join('/', self.ARCHIVE_DEST, release_name)) archive_name = package_file_name + '.zip' if os.path.exists(archive_name): if not overwrite_existing: print 'ERROR: Package for this version already exists: %s' % archive_name exit(-1) print '=> package already exists. DELETING.' os.remove(archive_name) shutil.make_archive(package_file_name, 'zip', self.__GetAbsReleaseFolder(), 'GameData') print '=> stored in:', package_file_name # Fills VERSION given the string or int compinents. The patch and build could be "*".
def zip_built(outdir): """Packages the build folder into a zip""" print("Zipping the built files!") config_file_dir = os.path.join(cwd, "config.py") if not os.path.exists(config_file_dir): sys.exit( "There dosen't seem to be a configuration file. Have you run the init command?") else: sys.path.insert(0, cwd) try: from config import website_name except: sys.exit( "Some of the configuration values could not be found! Maybe your config.py is too old. Run 'blended init' to fix.") # Remove the build folder build_dir = os.path.join(cwd, outdir) zip_dir = os.path.join(cwd, website_name.replace(" ", "_") + "-build-" + str(datetime.now().date())) if os.path.exists(build_dir): shutil.make_archive(zip_dir, 'zip', build_dir) else: print("The " + outdir + "/ folder could not be found! Have you run 'blended build' yet?")
def zip(temp_package_path): """Create the StreamAlert Lambda deployment package archive. Zips all dependency files to run the function, and names this zipfile based on the current date/time, along with the Lambda function module version. example filename: stream_alert_1.0.0_20161010_00:11:22.zip Only package in the `.py` files per AWS's instructions for creation of lambda functions. Args: temp_package_path (str): the temporary file path to store the zip. Returns: str: Deployment package full path """ LOGGER_CLI.debug('Creating Lambda package: %s', temp_package_path + '.zip') package_path = shutil.make_archive(temp_package_path, 'zip', temp_package_path) LOGGER_CLI.info('Package successfully created') return package_path
def repo(name): name = os.path.basename(name) pack_name = os.path.splitext(name)[0] print("Fetching zip archive for: {}".format(pack_name)) if not os.path.exists(os.path.join(cache_path, name)): print("Creating zip package for cache") zip_name = os.path.join(cache_path, pack_name) dir_name = os.path.join(package_path, pack_name) if not os.path.exists(dir_name): print("Such package was not found") abort(404) shutil.make_archive(zip_name, 'zip', dir_name) return send_from_directory(cache_path, name)
def cZip(cDir,tName,zName): ''' creates a zip file of the temporary directory ''' os.chdir(cDir) #zName = 'pipelign.' + time.strftime('%Y-%m-%d-%H%M%S') try: shutil.make_archive(zName,'zip',tName) except OSError as e: sys.exit(e) print('\nArchive for all temporary files created in %s.zip\n' % zName) sys.exit() #******************************************
def asynchronous(): threads = [] threads.append(gevent.spawn(loader)) for i in xrange(0, workers): threads.append(gevent.spawn(worker, i)) threads.append(gevent.spawn(writer_valid)) threads.append(gevent.spawn(state)) if invunma: threads.append(gevent.spawn(writer_invalid)) threads.append(gevent.spawn(writer_unmatched)) if grabactiv: threads.append(gevent.spawn(writer_grabber)) start = timer() gevent.joinall(threads) end = timer() if grabactiv: if snap_shot: output_filename = "grabbed_" + time.strftime("%Y%m%d-%H%M%S") shutil.make_archive(output_filename, 'zip', "grabbed") print "[INFO]Time elapsed: " + str(end - start)[:5], "seconds." print "[INFO] Done." evt.set() # cleaning up
def download(self, path=None): """ Download view method. :param path: File path. """ if not self.can_download: abort(404) logger = logging.getLogger(self.__class__.__name__) base_path, directory, path = self._normalize_path(path) # backward compatibility with base_url base_url = self.get_base_url() if base_url: base_url = urljoin(self.get_url('.index'), base_url) return redirect(urljoin(base_url, path)) if op.isdir(directory): logger.debug("Directory download asked for: %s" %path) shutil.make_archive(directory,'zip',directory) return redirect(self._get_dir_url('.index', op.dirname(path))) return send_file(directory)
def _build_downloader(target_directory): """Build the downloader Lambda deployment package.""" print('Creating downloader deploy package...') pathlib.Path(DOWNLOAD_SOURCE).touch() temp_package_dir = os.path.join(tempfile.gettempdir(), 'tmp_yara_downloader.pkg') if os.path.exists(temp_package_dir): shutil.rmtree(temp_package_dir) # Extract cbapi library. with zipfile.ZipFile(DOWNLOAD_DEPENDENCIES, 'r') as deps: deps.extractall(temp_package_dir) # Pip install backoff library (has no native dependencies). pip.main(['install', '--quiet', '--target', temp_package_dir, 'backoff']) # Copy Lambda code into the package. shutil.copy(DOWNLOAD_SOURCE, temp_package_dir) # Zip up the package and remove temporary directory. shutil.make_archive(os.path.join(target_directory, DOWNLOAD_ZIPFILE), 'zip', temp_package_dir) shutil.rmtree(temp_package_dir)
def archive(self): timestr = time.strftime("%Y%m%d-%H%M%S") archive_name = "{0}_{1}".format(self.work_dir, timestr) shutil.make_archive(archive_name, "zip", str(self.local_root_dir), str(self.work_dir.name))
def export(): """ Create a zip file from the item content """ item = application.getItemByUUID(request.args(0)) export_dir = tempfile.mkdtemp() application.exportItem(item.unique_id, export_dir) tmpdir = tempfile.mkdtemp() try: tmparchive = os.path.join(tmpdir, item.slugline) archive = shutil.make_archive(tmparchive, 'zip', export_dir) response.stream( archive, chunk_size=4096, request=request, attachment=True, filename="{}.zip".format(item.slugline) ) finally: shutil.rmtree(tmpdir) shutil.rmtree(export_dir) return ''
def create_zipdir(zipname, path, extn='zip'): """zip the path and output to same level""" output_filepath = path + os.sep + zipname zip_file_path = shutil.make_archive(output_filepath, extn, path) return zip_file_path
def zipdir(self): shutil.make_archive("media", 'zip', safe_join( settings.BASE_DIR, "media"))
def get_source(): """Download the source of this application.""" # from http://stackoverflow.com/questions/458436/adding-folders-to-a-zip-file-using-python#6511788 directory = tempfile.mkdtemp() temp_path = os.path.join(directory, APPLICATION) zip_path = shutil.make_archive(temp_path, "zip", HERE) return static_file(APPLICATION + ".zip", root=directory)
def execute(self, context): ps = context.scene.ge_publish_settings if ps.publish_default_platform: print("Publishing default platform") blender_bin_path = bpy.app.binary_path blender_bin_dir = os.path.dirname(blender_bin_path) ext = os.path.splitext(blender_bin_path)[-1].lower() WriteRuntime(os.path.join(blender_bin_dir, 'blenderplayer' + ext), os.path.join(ps.output_path, 'default', ps.runtime_name), ps.asset_paths, True, True, True, ps.make_archive, self.report ) else: print("Skipping default platform") for platform in ps.platforms: if platform.publish: print("Publishing", platform.name) WriteRuntime(platform.player_path, os.path.join(ps.output_path, platform.name, ps.runtime_name), ps.asset_paths, True, True, True, ps.make_archive, self.report ) else: print("Skipping", platform.name) return {'FINISHED'}
def move_to_out_folder(project, job, config): """ Moves the final project into the tmp folder """ build_file = config.get("out") if os.path.isfile(build_file): flash("Could not find output file", category="error") return build_folder = os.path.join(project.path, os.path.dirname(build_file)) output_folder = os.path.join(project.output, job.name) shutil.make_archive(output_folder, "zip", build_folder) remove_tree(build_folder) return output_folder + ".zip"
def _make_archive(self, dir_name): """Make archive of the specified directory near that directory. """ return shutil.make_archive(dir_name, root_dir=dir_name, base_dir=None, format=self.get_archive_format())
def zip_folder(folder, name, format='zip'): print('zip %s into %s compressed file' % (folder, name)) shutil.make_archive(name, format, '.', folder) print('remove %s folder' % folder) shutil.rmtree(folder)
def archive(env): print('Archiving LOVE project \'%s\'...' % env.conf.identifier) # Archives the project directory into a .love file if not os.path.exists(env.dist_dir): os.makedirs(env.dist_dir) # Creates the .zip file tmp_dir = tempfile.mkdtemp() copytree(env.project_dir, tmp_dir, ignore=ignored_files) shutil.make_archive(env.love_file, 'zip', tmp_dir) shutil.rmtree(tmp_dir) # Renames to .love, since shutil.make_archive() adds .zip os.rename(env.love_file + '.zip', env.love_file) print('Archival complete!')
def main(src_dir, new_dir, map_d, archer): """ Call out the necessary functions.""" print "Creating a copy of %s in %s" % (src_dir, new_dir) shutil.copytree(src_dir, new_dir, symlinks=False, ignore=shutil.ignore_patterns('*.pyc')) print "Changing the paths according to the mapping table." for key in map_d.keys(): grep_and_sed(key, new_dir, map_d[key]) fn = shutil.make_archive(new_dir, 'tar', new_dir) print "%s can now be copied elsewhere and used." %(fn)
def _create_backup(self, version): logger.debug('Creating backup for {}'.format(version)) shutil.make_archive(self._backup_name(version), self.backup_format, self.backup_target)
def zip_dir(root_path, save_path, title): # use shutil.make_archive in python2.7 rootlength = len(root_path) z = zipfile.ZipFile(save_path + '/' + title + '.zip', 'w', zipfile.ZIP_DEFLATED) for root, dirs, files in os.walk(root_path): for f in files: filename = os.path.join(root_path, f) z.write(filename, filename[rootlength:]) z.close()
def tinc_client_openwrt_config_tar(client): basedir = mkdtemp() tinc_config_base = os.path.join(basedir, 'etc', 'tinc', client.gateway.nickname) os.makedirs(tinc_config_base) os.makedirs(os.path.join(tinc_config_base, 'hosts')) with open(os.path.join(tinc_config_base, 'tinc.conf'), 'w') as conffile: conffile.write(tinc_client_conf(client)) with open(os.path.join(tinc_config_base, 'tinc_up'), 'w') as conffile: conffile.write(tinc_client_tinc_up(client)) with open(os.path.join(tinc_config_base, 'tinc_down'), 'w') as conffile: conffile.write(tinc_client_tinc_down(client)) with open(os.path.join(tinc_config_base, 'hosts', client.gateway.nickname), 'w') as conffile: conffile.write(tinc_gateway_host(client.gateway)) with open(os.path.join(tinc_config_base, 'hosts', client.member.username), 'w') as conffile: conffile.write(tinc_client_host(client)) openwrt_config_base = os.path.join(basedir, 'etc', 'config') os.makedirs(openwrt_config_base) with open(os.path.join(openwrt_config_base, 'firewall'), 'w') as conffile: conffile.write(tinc_client_openwrt_firewall_config(client)) with open(os.path.join(openwrt_config_base, 'tinc'), 'w') as conffile: conffile.write(tinc_client_openwrt_tinc_config(client)) tarfile = make_archive('openwrt_config', 'gztar', root_dir=basedir) with open(tarfile, 'rb') as tarfile: return tarfile.read()
def compress(path): if not os.path.exists(path): print 'Error: {0} does not exist'.format(path) sys.exit(1) shutil.make_archive(path, 'zip', path) return path + '.zip'
def __create_jar(classes_path, jar_path): zip_path = shutil.make_archive(jar_path, 'zip', classes_path) os.rename(zip_path, jar_path)
def artifact(*args, **kwargs): """ Generate an artifact for the app. Will be located at ./build :param args: :param kwargs: :return: """ target = kwargs.pop('target') name = get_config(target)['name'] zip_name = '{}/builds/{}-artifact-{}'.format(target, name, int(time.time())) builds_dir = '{}/builds'.format(target) if os.path.exists(builds_dir): shutil.rmtree(builds_dir) os.mkdir(builds_dir) os.mkdir('{}/{}-artifact'.format(builds_dir, name)) directory_list = ['{}/app'.format(target)] file_list = ['{}/app_index.py'.format(target), '{}/env.dist.yml'.format(target), '{}/tight.yml'.format(target)] create_zip = ['zip', '-9', zip_name] subprocess.call(create_zip) artifact_dir = '{}/builds/{}-artifact/'.format(target, name) for dir in directory_list: cp_dir_command = ['cp', '-R', dir, artifact_dir] subprocess.call(cp_dir_command) for file_name in file_list: cp_file_command = ['cp', file_name, artifact_dir] subprocess.call(cp_file_command) shutil.make_archive(zip_name, 'zip', root_dir=artifact_dir)
def main(pool): options = get_options() options.compose_file.close() syslog.syslog(syslog.LOG_INFO, "Options are {0}".format(options)) compose_cmd = get_compose_cmd(options) container_ids = get_container_id_mapping(pool, compose_cmd) syslog.syslog(syslog.LOG_INFO, "Container ID mapping {0}".format( container_ids)) tmp_dir = tempfile.mkdtemp() atexit.register(lambda: shutil.rmtree(tmp_dir)) syslog.syslog(syslog.LOG_INFO, "Temporary directory: {0}".format(tmp_dir)) snapshot_dir = os.path.join(tmp_dir, "snapshot") for name in container_ids: os.makedirs(os.path.join(snapshot_dir, name)) with closing_pool(pool): process_main_files(pool, snapshot_dir, compose_cmd, container_ids) for name, container_id in container_ids.items(): process_service_files(pool, name, container_id, snapshot_dir, compose_cmd) syslog.syslog(syslog.LOG_INFO, "Information was collected.") make_archive(tmp_dir, options.snapshot_path) syslog.syslog(syslog.LOG_INFO, "Data is collected")
def make_archive(collected_dir, result_path): formats = {name for name, description in shutil.get_archive_formats()} for fmt in "xztar", "bztar", "gztar": if fmt in formats: archive_format = fmt break else: archive_format = "tar" shutil.make_archive(result_path, archive_format, collected_dir, "snapshot")
def exportProj(self, event): self.chooser.setDialogTitle("Save project") Ffilter = FileNameExtensionFilter("Zip files", ["zip"]) self.chooser.setFileFilter(Ffilter) returnVal = self.chooser.showSaveDialog(None) if returnVal == JFileChooser.APPROVE_OPTION: dst = str(self.chooser.getSelectedFile()) shutil.make_archive(dst, "zip", self.getCurrentProjPath()) self.popup("Project exported successfully")
def _check_rootfs(self): """ If this file contains a known filesystem type, extract it. """ if not self.get_rootfs_status(): for module in binwalk.scan(self.item, "-e", "-r", "-y", "filesystem", signature=True, quiet=True): for entry in module.results: self.printf(">>>> %s" % entry.description) break if module.extractor.directory: unix = Extractor.io_find_rootfs(module.extractor.directory) if not unix[0]: self.printf(">>>> Extraction failed!") return False self.printf(">>>> Found Linux filesystem in %s!" % unix[1]) if self.output: shutil.make_archive(self.output, "gztar", root_dir=unix[1]) else: self.extractor.do_rootfs = False return True return False
def createZipFile(source_dir, zip_path): print printer.title("Creating zip File: ") + zip_path try: if os.path.exists(zip_path): os.remove(zip_path) shutil.make_archive(zip_path, 'zip', source_dir) except: e = sys.exc_info()[0] print e print printer.okGreen("DONE!") return
def build(self, ver): var1 = ('apd_ver'+ver, 'apd_ver'+ver+'.zip') var2 = self.www_path home = expanduser("~") pwd = os.getcwd() self.printer('Archiving buildozer/android/app/ contents into %s' % (var1[1])) shutil.make_archive(home+'/'+var1[0], 'zip', root_dir='.buildozer/android/app/') r = self.try_move(home+'/'+var1[1], var2) if r == True: self.printer(colorama.Fore.GREEN + 'Build update successful') else: self.printer(colorama.Fore.RED + 'Build update failed')
def build_no_buildozer(self, ver): var1 = ('apd_ver'+ver, 'apd_ver'+ver+'.zip') var2 = self.www_path home = expanduser("~") pwd = os.getcwd() tempdir = 'temporary_apupdater_folder1111' temppath = '%s/Desktop/%s/' % (home, tempdir) self.printer('Creating tempdir "%s"' % (temppath)) self.mkdir(temppath) self.printer('Moving bin and .buildozer to tempdir') self.try_move('bin', temppath) self.try_move('.buildozer', temppath) self.printer('Archiving active folder contents into %s' % (var1[1])) shutil.make_archive(home+'/'+var1[0], 'zip', root_dir='.') r = self.try_move(home+'/'+var1[1], var2) self.printer('Moving bin and .buildozer back to active dir') self.try_move(temppath+'/bin', pwd+'/') self.try_move(temppath+'/.buildozer', pwd+'/') self.printer('Removing tempdir') self.rmdir(temppath) if r == True: self.printer(colorama.Fore.GREEN + 'Build update successful') else: self.printer(colorama.Fore.RED + 'Build update failed')
def createArchive(path, name): print (text.creatingPackage) shutil.make_archive(name, "tar", path)
def compress_log(cfg): tmp_compressed_path = "/tmp/output.zip" if os.path.exists(tmp_compressed_path): os.remove(tmp_compressed_path) dest = os.path.join(cfg.file_log_dir,"output.zip") if os.path.exists(dest): os.remove(dest) f_name = shutil.make_archive("/tmp/output","zip",cfg.file_log_dir) shutil.move(f_name,dest) log.info("log files were packed into %s",cfg.file_log_dir)
def _zip_and_upload_code(self): self._logger.debug('Zipping project code in preparation to send') archive_dir = os.path.abspath(os.path.join(self._project_path, '..')) place = os.path.join(self._project_path, 'full_code') shutil.make_archive(base_name=place, format='zip', root_dir=archive_dir, base_dir='NASA_Project') self._logger.debug('Zipping done') self._s3_client.upload_file(self._project_bucket, os.path.join(self._project_path, 'full_code.zip'), 'full_code.zip')
def zip(markdown_file: Path): """ Generate a ZIP file with the presentation. """ markdown_file = Path(markdown_file) # We copy the directory because `make_archive` cannot follow symlinks... with TemporaryDirectory() as tmpdir: tmpdir = Path(tmpdir) / 'out' generate(markdown_file) config = load_config() copytree(src=str(config['output_path']), dst=str(tmpdir)) make_archive(markdown_file.stem, format='zip', root_dir=str(tmpdir))
def store_submission(id, dirs: DirectoryStructure): """ Store files submitted by a user and create an archive for workers convenience. Expects that the body of the POST request uses file paths as keys and the content of the files as values. """ # Make a separate directory for the submitted files job_dir = os.path.join(dirs.submission_dir, id) os.makedirs(job_dir, exist_ok=True) # Save each received file for name, content in request.files.items(): # Get the directory of the file path and create it, if necessary dirname = os.path.dirname(name) if dirname: os.makedirs(os.path.join(job_dir, dirname), exist_ok=True) # Save the file with open(os.path.join(job_dir, name), 'wb') as f: content.save(f) # Make an archive that contains the submitted files shutil.make_archive(os.path.join(dirs.archive_dir, id), "zip", root_dir=dirs.submission_dir, base_dir=id) # Return the path to the archive return json.dumps({ "archive_path": url_for('fileserver.get_submission_archive', id=id, ext='zip'), "result_path": url_for('fileserver.store_result', id=id, ext='zip') })
def zip_archive(self): """Create ZIP files for all archive directories.""" dst_folder_list = os.listdir(self.archive_path) for folder in dst_folder_list: folder_path = os.path.join(self.archive_path, folder) if folder in self.saf_folder_list and os.path.isdir(folder_path): shutil.make_archive(folder_path, 'zip', folder_path)