The following 49 code examples, extracted from open-source Python projects, illustrate how to use zipfile.ZipFile().
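Before the project-specific examples, here is a minimal sketch of the core API using only the standard library; the archive path 'example.zip' and the member name are placeholders:

import zipfile

# Write an archive, then read a member back. ZipFile supports the
# context-manager protocol on Python 2.7+ and 3.x.
with zipfile.ZipFile('example.zip', 'w', zipfile.ZIP_DEFLATED) as zf:
    zf.writestr('hello.txt', 'hello, zip\n')

with zipfile.ZipFile('example.zip') as zf:
    print(zf.namelist())         # ['hello.txt']
    data = zf.read('hello.txt')  # returns bytes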
def fetch_data():
    try:
        r = requests.get(MTG_JSON_URL)
    except requests.ConnectionError:
        r = requests.get(FALLBACK_MTG_JSON_URL)
    with closing(r), zipfile.ZipFile(io.BytesIO(r.content)) as archive:
        unzipped_files = archive.infolist()
        if len(unzipped_files) != 1:
            raise RuntimeError("Found an unexpected number of files in the MTGJSON archive.")
        data = archive.read(archive.infolist()[0])
    decoded_data = data.decode('utf-8')
    sets_data = json.loads(decoded_data)
    return sets_data
def generate_info():
    tickets_archive_path = ROOT_DIR_PATH.joinpath('tickets.zip')
    ensure_data_file(tickets_archive_path, DATA_FILE_INFO['TICKETS_URL'])
    with zipfile.ZipFile(str(tickets_archive_path)) as zf:
        for name in zf.namelist():
            stem, ext = os.path.splitext(name)
            if ext != '.csv':
                continue
            with zf.open(name) as f:
                # Zipfile only opens file in binary mode, but csv only accepts
                # text files, so we need to wrap this.
                # See <https://stackoverflow.com/questions/5627954>.
                textfile = io.TextIOWrapper(f, encoding='utf8', newline='')
                for row in csv.DictReader(textfile):
                    yield Registration(row)
def zipdir(archivename, basedir):
    '''Zip directory, from J.F. Sebastian http://stackoverflow.com/'''
    assert os.path.isdir(basedir)
    with closing(ZipFile(archivename, "w", ZIP_DEFLATED)) as z:
        for root, dirs, files in os.walk(basedir):
            # NOTE: ignore empty directories
            for fn in files:
                if fn[-4:] != '.zip':
                    absfn = os.path.join(root, fn)
                    zfn = absfn[len(basedir) + len(os.sep):]  # XXX: relative path
                    z.write(absfn, zfn)

# ================ Inventory input data and create data structure =================
def pickle_load(path, compression=False):
    """Unpickle a possibly compressed pickle.

    Parameters
    ----------
    path: str
        path to the output file
    compression: bool
        if true, assumes that the pickle was compressed when created and
        attempts decompression.

    Returns
    -------
    obj: object
        the unpickled object
    """
    if compression:
        with zipfile.ZipFile(path, "r", compression=zipfile.ZIP_DEFLATED) as myzip:
            with myzip.open("data") as f:
                return pickle.load(f)
    else:
        with open(path, "rb") as f:
            return pickle.load(f)
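pickle_load() expects the compressed variant to be a ZIP archive whose single member is named "data". A minimal matching writer, sketched here under that assumption (the name pickle_save is hypothetical, not part of the original project):

import pickle
import zipfile

def pickle_save(obj, path, compression=False):
    # Mirror of pickle_load(): store the pickle as a single archive
    # member named "data" when compression is requested.
    if compression:
        with zipfile.ZipFile(path, "w", compression=zipfile.ZIP_DEFLATED) as myzip:
            myzip.writestr("data", pickle.dumps(obj))
    else:
        with open(path, "wb") as f:
            pickle.dump(obj, f)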
def prepare_zip():
    from pkg_resources import resource_filename as resource
    from config import config
    from json import dumps
    logger.info('creating/updating gimel.zip')
    with ZipFile('gimel.zip', 'w', ZIP_DEFLATED) as zipf:
        info = ZipInfo('config.json')
        info.external_attr = 0o664 << 16
        zipf.writestr(info, dumps(config))
        zipf.write(resource('gimel', 'config.py'), 'config.py')
        zipf.write(resource('gimel', 'gimel.py'), 'gimel.py')
        zipf.write(resource('gimel', 'logger.py'), 'logger.py')
        for root, dirs, files in os.walk(resource('gimel', 'vendor')):
            for file in files:
                real_file = os.path.join(root, file)
                relative_file = os.path.relpath(real_file, resource('gimel', ''))
                zipf.write(real_file, relative_file)
def download_current_dataset(self, dest_path='.', unzip=True):
    now = datetime.now().strftime('%Y%m%d')
    file_name = 'numerai_dataset_{0}.zip'.format(now)
    dest_file_path = '{0}/{1}'.format(dest_path, file_name)

    r = requests.get(self._dataset_url)
    if r.status_code != 200:
        return r.status_code

    with open(dest_file_path, "wb") as fp:
        # Write the payload in one call; iterating over r.content byte by
        # byte yields ints on Python 3, which fp.write() rejects.
        fp.write(r.content)

    if unzip:
        with zipfile.ZipFile(dest_file_path, "r") as z:
            z.extractall(dest_path)
    return r.status_code
def __init__(self, file):
    self.file = file
    if file == '':
        self.infile = sys.stdin
    elif file.lower().startswith('http://') or file.lower().startswith('https://'):
        try:
            if sys.hexversion >= 0x020601F0:
                self.infile = urllib23.urlopen(file, timeout=5)
            else:
                self.infile = urllib23.urlopen(file)
        except urllib23.HTTPError:
            print('Error accessing URL %s' % file)
            print(sys.exc_info()[1])
            sys.exit()
    elif file.lower().endswith('.zip'):
        try:
            self.zipfile = zipfile.ZipFile(file, 'r')
            self.infile = self.zipfile.open(self.zipfile.infolist()[0], 'r', C2BIP3('infected'))
        except:
            print('Error opening file %s' % file)
            print(sys.exc_info()[1])
            sys.exit()
    else:
        try:
            self.infile = open(file, 'rb')
        except:
            print('Error opening file %s' % file)
            print(sys.exc_info()[1])
            sys.exit()
    self.ungetted = []
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels
    import jsonschema

    def open_json(filename):
        with open(filename, 'rb') as json_file:
            return json.loads(json_file.read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test', 'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    whl = ZipFile(os.path.join(dirname, filename))
                    for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
                            valid += 1
    assert valid > 0, "No metadata.json found"
def test_zipfile_attributes():
    # With the change from ZipFile.write() to .writestr(), we need to manually
    # set member attributes.
    with temporary_directory() as tempdir:
        files = (('foo', 0o644), ('bar', 0o755))
        for filename, mode in files:
            path = os.path.join(tempdir, filename)
            with codecs.open(path, 'w', encoding='utf-8') as fp:
                fp.write(filename + '\n')
            os.chmod(path, mode)
        zip_base_name = os.path.join(tempdir, 'dummy')
        zip_filename = wheel.archive.make_wheelfile_inner(zip_base_name, tempdir)
        with readable_zipfile(zip_filename) as zf:
            for filename, mode in files:
                info = zf.getinfo(os.path.join(tempdir, filename))
                assert info.external_attr == (mode | 0o100000) << 16
                assert info.compress_type == zipfile.ZIP_DEFLATED
def handle_file(self, fname):
    with open(fname, 'rb') as fd:
        if fd.read(4) == b'dex\n':
            new_jar = self.name + '/classes-dex2jar.jar'
            run([dex2jar, fname, '-f', '-o', new_jar], cwd=self.name, stderr=DEVNULL)
            fname = new_jar
    with ZipFile(fname) as jar:
        jar.extractall(self.name)
        for cls in jar.namelist():
            if cls.endswith('.class'):
                cls = cls.replace('/', '.')[:-6]
                self.classes.append(cls)
            elif cls.endswith('.dex'):
                self.handle_file(self.name + '/' + cls)
            elif cls.endswith('.proto'):
                self.bonus_protos[cls] = jar.read(cls).decode('utf8')
            elif cls.endswith('.so'):
                self.bonus_protos.update(walk_binary(self.name + '/' + cls))
def load_names_data():
    fp = os.path.join(tempfile.gettempdir(), ZIP_NAME)
    if not os.path.exists(fp):
        r = requests.get(URL_NAMES)
        with open(fp, 'wb') as f:
            f.write(r.content)
    post = collections.OrderedDict()
    with zipfile.ZipFile(fp) as zf:
        # get ZipInfo instances
        for zi in sorted(zf.infolist(), key=lambda zi: zi.filename):
            fn = zi.filename
            if fn.startswith('yob'):
                year = int(fn[3:7])
                df = pd.read_csv(
                    zf.open(zi),
                    header=None,
                    names=('name', 'gender', 'count'))
                df['year'] = year
                post[year] = df
    df = pd.concat(post.values())
    df.set_index('name', inplace=True, drop=True)
    return df
def file(self):
    """
    Returns a file pointer to this binary

    :example:

    >>> process_obj = c.select(Process).where("process_name:svch0st.exe").first()
    >>> binary_obj = process_obj.binary
    >>> print(binary_obj.file.read(2))
    MZ
    """
    # TODO: I don't like reaching through to the session...
    with closing(self._cb.session.get("/api/v1/binary/{0:s}".format(self.md5sum), stream=True)) as r:
        z = StringIO(r.content)
        zf = ZipFile(z)
        fp = zf.open('filedata')
        return fp
def _create_lambda(arn, func_name, func_desc, lambda_handler, lambda_main, runtime):
    func = dict()
    lamb = boto3.client('lambda')
    # The deployment package is a zip archive, so read it in binary mode.
    with open(temp_deploy_zip, 'rb') as deploy:
        func['ZipFile'] = deploy.read()
    try:
        resp = lamb.create_function(
            FunctionName=func_name,
            Runtime=runtime,
            Publish=True,
            Description=func_desc,
            Role=arn,
            Code=func,
            Handler='{0}.{1}'.format(lambda_main, lambda_handler))
        logging.info("Create Lambda Function resp:{0}".format(
            json.dumps(resp, indent=4, sort_keys=True)))
        return resp
    except ClientError as ce:
        if ce.response['Error']['Code'] == 'ValidationException':
            logging.warning("Validation Error {0} creating function '{1}'.".format(
                ce, func_name))
        else:
            logging.error("Unexpected Error: {0}".format(ce))
def _update_lambda_function(zip_file, func_name):
    lamb = boto3.client('lambda')
    try:
        resp = lamb.update_function_code(
            FunctionName=func_name,
            ZipFile=zip_file.read(),
            Publish=True
        )
        return resp['Version']
    except ClientError as ce:
        if ce.response['Error']['Code'] == 'ValidationException':
            logging.warning(
                "Validation Error {0} updating function '{1}'.".format(
                    ce, func_name))
        else:
            logging.error("Unexpected Error: {0}".format(ce))
def download_driver_file(whichbin, url, base_path):
    if url.endswith('.tar.gz'):
        ext = '.tar.gz'
    else:
        ext = '.zip'
    print("Downloading from: {}".format(url))
    download_file(url, '/tmp/pwr_temp{}'.format(ext))
    if ext == '.tar.gz':
        import tarfile
        tar = tarfile.open('/tmp/pwr_temp{}'.format(ext), "r:gz")
        tar.extractall('{}/'.format(base_path))
        tar.close()
    else:
        import zipfile
        with zipfile.ZipFile('/tmp/pwr_temp{}'.format(ext), "r") as z:
            z.extractall('{}/'.format(base_path))
    # if whichbin == 'wires' and '/v{}/'.format(latest_gecko_driver) in url:
    #     os.rename('{}/geckodriver'.format(base_path),
    #               '{}/wires'.format(base_path))
    #     os.chmod('{}/wires'.format(base_path), 0o775)
    if whichbin == 'wires':
        os.chmod('{}/geckodriver'.format(base_path), 0o775)
    else:
        os.chmod('{}/chromedriver'.format(base_path), 0o775)
def reseed(self, netdb):
    """Compress netdb entries and set content"""
    zip_file = io.BytesIO()
    dat_files = []

    for root, dirs, files in os.walk(netdb):
        for f in files:
            if f.endswith(".dat"):
                # TODO check modified time
                # may be not older than 10h
                dat_files.append(os.path.join(root, f))

    if len(dat_files) == 0:
        raise PyseederException("Can't get enough netDb entries")
    elif len(dat_files) > 75:
        dat_files = random.sample(dat_files, 75)

    with ZipFile(zip_file, "w", compression=ZIP_DEFLATED) as zf:
        for f in dat_files:
            zf.write(f, arcname=os.path.split(f)[1])

    self.FILE_TYPE = 0x00
    self.CONTENT_TYPE = 0x03
    self.CONTENT = zip_file.getvalue()
    self.CONTENT_LENGTH = len(self.CONTENT)
def _load(self, fn, notify=False):
    "Load the video from a ZIP file"
    with ZipFile(fn) as zf:
        self._loadMeta(zf)
        self._costumes = []
        i = 0
        while i >= 0:
            try:
                if notify:
                    notify(fn, i, self)
                data = zf.read(str(i))
                if data:
                    data = data[:-12], data[-12:]
                else:
                    data = self._costumes[i-1]
                i += 1
                self._costumes.append(data)
            except:
                if notify:
                    notify(fn, None, self)
                i = -1
def compare_component_output(self, input_path, expected_output_path):
    rendering_engine = self.get_rendering_engine()
    temp_dir = tempfile.gettempdir()
    output_dir = os.path.join(temp_dir, str(uuid.uuid4()))
    process_sketch_archive(zip_path=input_path, compress_zip=False,
                           output_path=output_dir, engine=rendering_engine)
    self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
    shutil.rmtree(output_dir)
    storage.clear()
    output_zip = os.path.join(temp_dir, "{}.zip".format(str(uuid.uuid4())))
    process_sketch_archive(zip_path=input_path, compress_zip=True,
                           output_path=output_zip, engine=rendering_engine)
    z = zipfile.ZipFile(output_zip)
    z.extractall(output_dir)
    self.assertTrue(dircmp.is_same(expected_output_path, output_dir))
    shutil.rmtree(output_dir)
    os.remove(output_zip)
def write_file_to_zip_at_path(zip_path=None, archive=None, file_path='', content='', encode=False):
    if zip_path:
        ensure_directories_for_path(zip_path)
        z = zipfile.ZipFile(zip_path, mode="a")
    elif archive:
        z = archive
    else:
        raise Exception('One of zip_path, archive must be provided.')
    print("Adding {} to archive ...".format(file_path))
    if encode:
        z.writestr(file_path, content.encode('utf-8'))
    else:
        z.writestr(file_path, content)
    storage.file_paths.add(file_path)
    if zip_path:
        z.close()
def download_celeb_a(base_path):
    data_path = os.path.join(base_path, 'celebA')
    images_path = os.path.join(data_path, 'images')
    if os.path.exists(data_path):
        print('[!] Found celeb-A - skip')
        return

    filename, drive_id = "img_align_celeba.zip", "0B7EVK8r0v71pZjFTYXZWM3FlRnM"
    save_path = os.path.join(base_path, filename)

    if os.path.exists(save_path):
        print('[*] {} already exists'.format(save_path))
    else:
        download_file_from_google_drive(drive_id, save_path)

    zip_dir = ''
    with zipfile.ZipFile(save_path) as zf:
        zip_dir = zf.namelist()[0]
        zf.extractall(base_path)
    if not os.path.exists(data_path):
        os.mkdir(data_path)
    os.rename(os.path.join(base_path, "img_align_celeba"), images_path)
    os.remove(save_path)
    download_attr_file(data_path)
def listdir(path):
    """Replacement for os.listdir that works in frozen environments."""
    if not hasattr(sys, 'frozen'):
        return os.listdir(path)
    (zipPath, archivePath) = splitZip(path)
    if archivePath is None:
        return os.listdir(path)
    with zipfile.ZipFile(zipPath, "r") as zipobj:
        contents = zipobj.namelist()
    results = set()
    for name in contents:
        # components in zip archive paths are always separated by forward slash
        if name.startswith(archivePath) and len(name) > len(archivePath):
            name = name[len(archivePath):].split('/')[0]
            results.add(name)
    return list(results)
def find_package(self, package):
    for path in self.paths():
        full = os.path.join(path, package)
        if os.path.exists(full):
            return package, full
        if not os.path.isdir(path) and zipfile.is_zipfile(path):
            zip = zipfile.ZipFile(path, 'r')
            try:
                zip.read(os.path.join(package, '__init__.py'))
            except KeyError:
                pass
            else:
                zip.close()
                return package, full
            zip.close()
    ## FIXME: need special error for package.py case:
    raise InstallationError(
        'No package with the name %s found' % package)
def create_zipfile(self, filename):
    zip_file = zipfile.ZipFile(filename, "w")
    try:
        self.mkpath(self.target_dir)  # just in case
        for root, dirs, files in os.walk(self.target_dir):
            if root == self.target_dir and not files:
                raise DistutilsOptionError(
                    "no files found in upload directory '%s'" % self.target_dir)
            for name in files:
                full = os.path.join(root, name)
                relative = root[len(self.target_dir):].lstrip(os.path.sep)
                dest = os.path.join(relative, name)
                zip_file.write(full, dest)
    finally:
        zip_file.close()
def setUp(self):
    if self.datafile is None or self.dataname is None:
        return
    if not os.path.isfile(self.datafile):
        self.old_cwd = None
        return

    self.old_cwd = os.getcwd()
    self.temp_dir = tempfile.mkdtemp()
    zip_file, source, target = [None, None, None]
    try:
        zip_file = zipfile.ZipFile(self.datafile)
        for files in zip_file.namelist():
            _extract(zip_file, files, self.temp_dir)
    finally:
        if zip_file:
            zip_file.close()
        del zip_file
    os.chdir(os.path.join(self.temp_dir, self.dataname))
def test_create_zipfile(self):
    # Test to make sure zipfile creation handles common cases.
    # This explicitly includes a folder containing an empty folder.
    dist = Distribution()
    cmd = upload_docs(dist)
    cmd.upload_dir = self.upload_dir
    cmd.target_dir = self.upload_dir
    tmp_dir = tempfile.mkdtemp()
    tmp_file = os.path.join(tmp_dir, 'foo.zip')
    try:
        zip_file = cmd.create_zipfile(tmp_file)
        assert zipfile.is_zipfile(tmp_file)
        zip_file = zipfile.ZipFile(tmp_file)  # woh...
        assert zip_file.namelist() == ['index.html']
        zip_file.close()
    finally:
        shutil.rmtree(tmp_dir)
def tutor_fpout():
    pklout = os.path.join(RESDIR, TUTORPKL)
    if os.path.exists(pklout):
        with open(pklout, 'rb') as f:
            fpout = pickle.load(f)
    else:
        print('re-creating fp results ... this could take a few minutes')
        zip_archive = os.path.join(DATADIR, ZIPFILE)
        with zipfile.ZipFile(zip_archive, 'r') as zfile:
            zfile.extractall(DATADIR)
        fpout = tutor_example()
        make_clean_dat()
        os.makedirs(RESDIR, exist_ok=True)
        with open(pklout, 'wb') as f:
            pickle.dump(fpout, f)
    return fpout
def hfdata_plot():
    make_quickdata()
    unit_convert = {'q': 1e-3, 'c': 1e-6, 'P': 1e3}
    converters = {
        k: _converter_func(float(v), 0.) for k, v in unit_convert.items()}
    converters['T'] = _converter_func(1., 273.15)
    with zipfile.ZipFile(os.path.join(DATADIR, ZIPFILE), 'r') as zarch:
        with zarch.open(QUICKFILE, 'r') as datafile:
            data = HFData(
                fname=datafile,
                cols=(2, 3, 4, 6, 5, 7, 8),
                converters=converters,
                delimiter=",",
                skip_header=4)
    fig = plot_hfdata(data)
    return fig
def extract_zip_file(zipFilePath, extractDir):
    if not os.path.exists(extractDir):
        os.mkdir(extractDir)
    print('Extracting %s to %s...' % (zipFilePath, extractDir), end='')
    zfile = zipfile.ZipFile(zipFilePath)
    uncompress_size = sum((file.file_size for file in zfile.infolist()))
    extracted_size = 0
    print('\n')
    for _file in zfile.infolist():
        extracted_size += _file.file_size
        sys.stdout.write("  %s%%\t\t%s\n" % (extracted_size * 100 // uncompress_size, _file.filename))
        zfile.extract(_file, path=extractDir)
        # ORIG zip.extractall(path=extractDir)
    print('Ok')
def get_client_version(my):
    '''API Function: get_client_version()

    @return:
    string - Version of TACTIC that this client came from'''
    # may use pkg_resources in 2.6
    if '.zip' in __file__:
        import zipfile
        parts = __file__.split('.zip')
        zip_name = '%s.zip' % parts[0]
        if zipfile.is_zipfile(zip_name):
            z = zipfile.ZipFile(zip_name)
            version = z.read('pyasm/application/common/interpreter/tactic_client_lib/VERSION')
            version = version.strip()
            z.close()
    else:
        dir = os.path.dirname(__file__)
        f = open('%s/VERSION' % dir, 'r')
        version = f.readline().strip()
        f.close()
    return version
def get_client_api_version(my):
    '''API Function: get_client_api_version()

    @return:
    string - client api version'''
    # may use pkg_resources in 2.6
    if '.zip' in __file__:
        import zipfile
        parts = __file__.split('.zip')
        zip_name = '%s.zip' % parts[0]
        if zipfile.is_zipfile(zip_name):
            z = zipfile.ZipFile(zip_name)
            version = z.read('pyasm/application/common/interpreter/tactic_client_lib/VERSION_API')
            version = version.strip()
            z.close()
    else:
        dir = os.path.dirname(__file__)
        f = open('%s/VERSION_API' % dir, 'r')
        version = f.readline().strip()
        f.close()
    return version
def test_export_result_csv_no_tasks_returns_empty_file(self):
    """Test WEB export Result to CSV returns empty file if no results in project."""
    project = ProjectFactory.create(short_name='no_tasks_here')
    uri = "/project/%s/tasks/export?type=result&format=csv" % project.short_name
    res = self.app.get(uri, follow_redirects=True)
    zip = zipfile.ZipFile(StringIO(res.data))
    extracted_filename = zip.namelist()[0]
    csv_content = StringIO(zip.read(extracted_filename))
    csvreader = unicode_csv_reader(csv_content)
    is_empty = True
    for line in csvreader:
        is_empty = False  # any row means the export was not empty
    assert is_empty
def init_zoneinfo():
    """
    Add each zone info to the datastore. This will overwrite existing zones.

    This must be called before the AppengineTimezoneLoader will work.
    """
    import os, logging
    from zipfile import ZipFile
    zoneobjs = []

    zoneinfo_path = os.path.abspath(
        os.path.join(os.path.dirname(__file__), 'zoneinfo.zip'))

    with ZipFile(zoneinfo_path) as zf:
        for zfi in zf.filelist:
            key = ndb.Key('Zoneinfo', zfi.filename, namespace=NDB_NAMESPACE)
            zobj = Zoneinfo(key=key, data=zf.read(zfi))
            zoneobjs.append(zobj)

    logging.info("Adding %d timezones to the pytz-appengine database" % len(zoneobjs))
    ndb.put_multi(zoneobjs)
def get_appliances():
    """
    This will download an archive of all GNS3 appliances and load them
    into a dictionary.
    """
    appliances = {}
    url = 'https://github.com/GNS3/gns3-registry/archive/master.zip'
    response = urllib.request.urlopen(url)
    z = zipfile.ZipFile(io.BytesIO(response.read()))
    for path in z.namelist():
        if path.endswith('.gns3a'):
            with z.open(path, 'r') as f:
                id = os.path.basename(path).split('.')[0]
                appliance = json.loads(f.read().decode())
                if appliance["status"] != "broken":
                    appliances[id] = appliance
    return appliances
def extract_zipfile(archive_name, destpath):
    "Unpack a zip file"
    archive = zipfile.ZipFile(archive_name)
    archive.extractall(destpath)
def get_zip_class():
    """
    Supplement ZipFile class to support context manager for Python 2.6
    """
    class ContextualZipFile(zipfile.ZipFile):
        def __enter__(self):
            return self

        def __exit__(self, type, value, traceback):
            self.close()

    return zipfile.ZipFile if hasattr(zipfile.ZipFile, '__exit__') else \
        ContextualZipFile
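Because get_zip_class() returns a class rather than an instance, callers call it first and then instantiate the result. A brief usage sketch (the archive path is a placeholder):

with get_zip_class()('example.zip') as zf:  # 'example.zip' is hypothetical
    names = zf.namelist()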
def download_glove(glove):
    if os.path.exists(glove):
        return

    print('Downloading glove...')
    with tempfile.TemporaryFile() as tmp:
        with urllib.request.urlopen('http://nlp.stanford.edu/data/glove.42B.300d.zip') as res:
            shutil.copyfileobj(res, tmp)
        with zipfile.ZipFile(tmp, 'r') as glove_zip:
            glove_zip.extract('glove.42B.300d.txt', path=os.path.dirname(glove))
    print('Done')
def _unpack_zipfile(filename, extract_dir):
    """Unpack zip `filename` to `extract_dir`
    """
    try:
        import zipfile
    except ImportError:
        raise ReadError('zlib not supported, cannot unpack this archive.')

    if not zipfile.is_zipfile(filename):
        raise ReadError("%s is not a zip file" % filename)

    zip = zipfile.ZipFile(filename)
    try:
        for info in zip.infolist():
            name = info.filename

            # don't extract absolute paths or ones with .. in them
            if name.startswith('/') or '..' in name:
                continue

            target = os.path.join(extract_dir, *name.split('/'))
            if not target:
                continue

            _ensure_directory(target)
            if not name.endswith('/'):
                # file
                data = zip.read(info.filename)
                f = open(target, 'wb')
                try:
                    f.write(data)
                finally:
                    f.close()
                    del data
    finally:
        zip.close()
def __new__(cls, *args, **kwargs):
    """
    Construct a ZipFile or ContextualZipFile as appropriate
    """
    if hasattr(zipfile.ZipFile, '__exit__'):
        return zipfile.ZipFile(*args, **kwargs)
    return super(ContextualZipFile, cls).__new__(cls)
def unzip_file(filename, location, flatten=True):
    """
    Unzip the file (with path `filename`) to the destination `location`. All
    files are written based on system defaults and umask (i.e. permissions are
    not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written. Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.
    """
    ensure_dir(location)
    zipfp = open(filename, 'rb')
    try:
        zip = zipfile.ZipFile(zipfp, allowZip64=True)
        leading = has_leading_dir(zip.namelist()) and flatten
        for info in zip.infolist():
            name = info.filename
            data = zip.read(name)
            fn = name
            if leading:
                fn = split_leading_dir(name)[1]
            fn = os.path.join(location, fn)
            dir = os.path.dirname(fn)
            if fn.endswith('/') or fn.endswith('\\'):
                # A directory
                ensure_dir(fn)
            else:
                ensure_dir(dir)
                fp = open(fn, 'wb')
                try:
                    fp.write(data)
                finally:
                    fp.close()
                mode = info.external_attr >> 16
                # if mode and regular file and any execute permissions for
                # user/group/world?
                if mode and stat.S_ISREG(mode) and mode & 0o111:
                    # make dest file have execute for user/group/world
                    # (chmod +x) no-op on windows per python docs
                    os.chmod(fn, (0o777 - current_umask() | 0o111))
    finally:
        zipfp.close()
@contextlib.contextmanager
def readable_zipfile(path):
    # zipfile.ZipFile() isn't a context manager under Python 2.
    zf = zipfile.ZipFile(path, 'r')
    try:
        yield zf
    finally:
        zf.close()
def test_verifying_zipfile():
    if not hasattr(zipfile.ZipExtFile, '_update_crc'):
        pytest.skip('No ZIP verification. Missing ZipExtFile._update_crc.')

    sio = StringIO()
    zf = zipfile.ZipFile(sio, 'w')
    zf.writestr("one", b"first file")
    zf.writestr("two", b"second file")
    zf.writestr("three", b"third file")
    zf.close()

    # In default mode, VerifyingZipFile checks the hash of any read file
    # mentioned with set_expected_hash(). Files not mentioned with
    # set_expected_hash() are not checked.
    vzf = wheel.install.VerifyingZipFile(sio, 'r')
    vzf.set_expected_hash("one", hashlib.sha256(b"first file").digest())
    vzf.set_expected_hash("three", "blurble")
    vzf.open("one").read()
    vzf.open("two").read()
    try:
        vzf.open("three").read()
    except wheel.install.BadWheelFile:
        pass
    else:
        raise Exception("expected exception 'BadWheelFile()'")

    # In strict mode, VerifyingZipFile requires every read file to be
    # mentioned with set_expected_hash().
    vzf.strict = True
    try:
        vzf.open("two").read()
    except wheel.install.BadWheelFile:
        pass
    else:
        raise Exception("expected exception 'BadWheelFile()'")

    vzf.set_expected_hash("two", None)
    vzf.open("two").read()
def open(self, name_or_info, mode="r", pwd=None):
    """Return file-like object for 'name'."""
    # A non-monkey-patched version would contain most of zipfile.py
    ef = zipfile.ZipFile.open(self, name_or_info, mode, pwd)
    if isinstance(name_or_info, zipfile.ZipInfo):
        name = name_or_info.filename
    else:
        name = name_or_info
    if (name in self._expected_hashes
            and self._expected_hashes[name] is not None):
        expected_hash = self._expected_hashes[name]
        try:
            _update_crc_orig = ef._update_crc
        except AttributeError:
            warnings.warn('Need ZipExtFile._update_crc to implement '
                          'file hash verification (in Python >= 2.7)')
            return ef
        running_hash = self._hash_algorithm()
        if hasattr(ef, '_eof'):  # py33
            def _update_crc(data):
                _update_crc_orig(data)
                running_hash.update(data)
                if ef._eof and running_hash.digest() != expected_hash:
                    raise BadWheelFile("Bad hash for file %r" % ef.name)
        else:
            def _update_crc(data, eof=None):
                _update_crc_orig(data, eof=eof)
                running_hash.update(data)
                if eof and running_hash.digest() != expected_hash:
                    raise BadWheelFile("Bad hash for file %r" % ef.name)
        ef._update_crc = _update_crc
    elif self.strict and name not in self._expected_hashes:
        raise BadWheelFile("No expected hash for file %r" % ef.name)
    return ef