我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 zipfile.BadZipFile()。
def get_function_root(self, name):
    """Return the directory holding the unpacked package of function *name*.

    The function's zip is looked up in ``self.serverless_package`` and
    extracted into a temporary directory that is created lazily and cached
    on ``self.functions_output``.  If the target directory already exists
    it is returned without extracting again.

    Raises:
        SystemExit: with status 2 when the zip is missing or is not a
            valid zip archive (an error is reported through ``eprint``).
    """
    if not hasattr(self, 'functions_output'):
        # TemporaryDirectory's first positional parameter is ``suffix``;
        # the trailing dash shows this string is meant as the name prefix.
        self.functions_output = TemporaryDirectory(
            prefix="puresec-serverless-functions-")

    package_name = self._get_function_package_name(name)
    function_root = os.path.join(self.functions_output.name, package_name)
    if os.path.exists(function_root):
        return function_root

    archive_path = os.path.join(
        self.serverless_package, "{}.zip".format(package_name))
    try:
        # Named ``archive`` so the ``zipfile`` module name is not shadowed.
        archive = ZipFile(archive_path, 'r')
    except FileNotFoundError:
        eprint("error: serverless package did not create a function zip for '{}'", name)
        raise SystemExit(2)
    except BadZipFile:
        eprint("error: serverless package did not create a valid function zip for '{}'", name)
        raise SystemExit(2)

    with archive:
        archive.extractall(function_root)
    return function_root
def clean_zip_file(self):
    """Validate the ``zip_file`` field and return it unchanged.

    The upload is opened once here purely to check that it is a readable
    zip archive whose members pass ``testzip()``; callers open it again
    later, trading a little duplication for simpler code.

    Raises:
        forms.ValidationError: if the file is not a zip archive or one of
            its members is corrupt.
    """
    zip_file = self.cleaned_data['zip_file']
    try:
        archive = zipfile.ZipFile(zip_file)
    except BadZipFile as e:
        raise forms.ValidationError(str(e)) from e

    # The context manager closes the archive even if testzip() raises.
    with archive:
        bad_file = archive.testzip()

    if bad_file:
        raise forms.ValidationError('"%s" in the .zip archive is corrupt.' % bad_file)
    return zip_file
def get_manifest(self, archive):
    """Read, parse and validate ``manifest.json`` from an uploaded archive.

    The archive is opened from ``archive.temporary_file_path()``.  The
    manifest is looked up under the prefix returned by ``self.get_prefix``
    (or at the archive root when that prefix is empty), decoded as JSON and
    passed to ``validate_manifest`` before being returned.

    Raises:
        ValidationError: for a bad zip, a missing upload file, a missing
            manifest, malformed JSON, or a manifest rejected by avasdk.
    """
    try:
        with ZipFile(archive.temporary_file_path()) as bundle:
            print(bundle.namelist())
            root = self.get_prefix(bundle)
            if len(root):
                root = root + '/'
            else:
                root = ''
            member_name = '{}manifest.json'.format(root)
            with bundle.open(member_name) as handle:
                raw = handle.read()
            parsed = json.loads(raw)
            validate_manifest(parsed)
            return parsed
    except BadZipFile:
        raise ValidationError('Bad .zip format')
    except FileNotFoundError:
        raise ValidationError('Error with upload, please try again')
    except KeyError:
        raise ValidationError('No manifest.json found in archive')
    except json.JSONDecodeError:
        raise ValidationError('Error with manifest.json, bad Json Format')
    except avasdk.exceptions.ValidationError as e:
        raise ValidationError('Error in manifest.json ({})'.format(e))
def get_zip_filesize(filepath: str) -> int:
    """Return the total uncompressed size of the image members of a zip.

    Only members whose lower-cased name ends in ``.jpeg``, ``.jpg``,
    ``.png`` or ``.gif`` and does not contain ``__macosx`` are counted.

    Args:
        filepath: path of the zip archive to inspect.

    Returns:
        The summed ``file_size`` of the matching members, or ``-1`` when
        the file is not a valid zip archive.
    """
    image_suffixes = ('.jpeg', '.jpg', '.png', '.gif')
    try:
        archive = zipfile.ZipFile(filepath, 'r')
    except zipfile.BadZipFile:
        return -1

    # ``with`` guarantees the handle is released on every exit path.
    with archive:
        total_size = 0
        for info in archive.infolist():
            lowered = info.filename.lower()
            if lowered.endswith(image_suffixes) and '__macosx' not in lowered:
                total_size += int(info.file_size)
    return total_size
def get_zip_fileinfo(filepath: str) -> Tuple[int, int]:
    """Return the total size and number of image members in a zip.

    Only members whose lower-cased name ends in ``.jpeg``, ``.jpg``,
    ``.png`` or ``.gif`` and does not contain ``__macosx`` are counted.

    Args:
        filepath: path of the zip archive to inspect.

    Returns:
        ``(total_size, total_count)`` for the matching members, or
        ``(-1, -1)`` when the file is not a valid zip archive.
    """
    image_suffixes = ('.jpeg', '.jpg', '.png', '.gif')
    try:
        archive = zipfile.ZipFile(filepath, 'r')
    except zipfile.BadZipFile:
        return -1, -1

    total_size = 0
    total_count = 0
    # ``with`` guarantees the handle is released on every exit path.
    with archive:
        for info in archive.infolist():
            lowered = info.filename.lower()
            if not lowered.endswith(image_suffixes) or '__macosx' in lowered:
                continue
            total_size += int(info.file_size)
            total_count += 1
    return total_size, total_count
def from_zip(cls, src='/tmp/app.zip', dest='/app'):
    """
    Unzip a zipped app project file and instantiate it.

    :param src: zipfile path
    :param dest: destination folder to extract the zipfile content
    :raises errors.InvalidPathError: if ``src`` does not exist
    :raises errors.InvalidZipFileError: if ``src`` is not a valid zip file
    :returns: the result of ``cls.from_path(dest)`` for the extracted tree
    """
    try:
        archive = zipfile.ZipFile(src, 'r')
    except FileNotFoundError as exc:
        raise errors.InvalidPathError(src) from exc
    except zipfile.BadZipFile as exc:
        raise errors.InvalidZipFileError(src) from exc

    # extractall() replaces the side-effect list comprehension, and the
    # context manager closes the archive even if extraction fails.
    with archive:
        archive.extractall(dest)
    return cls.from_path(dest)
def check_read_with_bad_crc(self, compression):
    """Tests that files with bad CRCs raise a BadZipFile exception when read."""
    payload = self.zips_with_bad_crc[compression]

    # Whole-member read through ZipFile.read().
    with zipfile.ZipFile(io.BytesIO(payload), mode="r") as archive:
        with self.assertRaises(zipfile.BadZipFile):
            archive.read('afile')

    # Whole-member read through ZipExtFile.read().
    with zipfile.ZipFile(io.BytesIO(payload), mode="r") as archive:
        with archive.open('afile', 'r') as member:
            with self.assertRaises(zipfile.BadZipFile):
                member.read()

    # Two-byte reads, to exercise the buffering logic.
    with zipfile.ZipFile(io.BytesIO(payload), mode="r") as archive:
        with archive.open('afile', 'r') as member:
            member.MIN_READ_SIZE = 2
            with self.assertRaises(zipfile.BadZipFile):
                chunk = member.read(2)
                while chunk:
                    chunk = member.read(2)
def test_empty_zipfile(self):
    """Check that an archive closed without members is a valid empty zip.

    The file is created in 'w' mode and then reopened in 'a' mode; after
    each step it must be readable again in 'r' mode.
    """
    for mode in ("w", "a"):
        zipfile.ZipFile(TESTFN, mode=mode).close()
        try:
            zipf = zipfile.ZipFile(TESTFN, mode="r")
        except zipfile.BadZipFile:
            # Both modes catch BadZipFile only, so unrelated errors are no
            # longer reported as a failure of this check.
            self.fail("Unable to create empty ZIP file in '%s' mode" % mode)
        else:
            # Release the read handle instead of leaving it to the GC.
            zipf.close()
def test_read_with_bad_crc(self): """Tests that files with bad CRCs raise a BadZipFile exception when read.""" zipdata = self.zip_with_bad_crc # Using ZipFile.read() with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile') # Using ZipExtFile.read() with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: with zipf.open('afile', 'r') as corrupt_file: self.assertRaises(zipfile.BadZipFile, corrupt_file.read) # Same with small reads (in order to exercise the buffering logic) with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: with zipf.open('afile', 'r') as corrupt_file: corrupt_file.MIN_READ_SIZE = 2 with self.assertRaises(zipfile.BadZipFile): while corrupt_file.read(2): pass
def _unzip(files, destination):
    """Extract ``archive::member`` entries into uniquely named files.

    Each entry of *files* is split on ``'::'`` into a zip path and a member
    name.  The member is written to *destination* under a fresh UUID that
    keeps the member's file extension.  Progress is logged every ``STEP``
    entries and on the last one.

    Raises:
        RuntimeError: if one of the archives is not a valid zip; the
            original ``BadZipFile`` is attached as the cause.
    """
    size = len(files)
    destination.mkdir(exist_ok=True)
    LOGGER.debug("Unzip %d files into %s", size, destination)
    for pos, zipped_file in enumerate(files, 1):
        if pos % STEP == 0 or pos == size:
            LOGGER.debug("%.2f%%", 100 * pos / size)
        zipped_repo, filename = zipped_file.split('::')
        ext = Path(filename).suffix
        dest_path = destination.joinpath(str(uuid4()) + ext)
        try:
            # Read first: a malformed archive must not leave an empty
            # output file behind.
            with ZipFile(zipped_repo) as zip_file:
                content = zip_file.read(filename)
        except BadZipFile as error:
            LOGGER.error("Malformed file %s, error: %s", zipped_repo, error)
            raise RuntimeError(
                'Extraction failed for {}'.format(zipped_repo)) from error
        with dest_path.open('wb') as dest_file:
            dest_file.write(content)
def get_file_type(f_path):
    """
    Get the Blib type of a file.

    The type is the second space-separated word of the zip archive comment.

    Args:
        f_path (str): Path to the file to be checked.

    Returns:
        str or None
            A string containing the Blib type is returned, if no valid type
            is found, None is returned.
    """
    try:
        archive = zf.ZipFile(f_path, 'r')
    except zf.BadZipFile:
        return None

    # Only the comment is needed, so the archive is closed right away on
    # every path.
    with archive:
        comment = archive.comment

    try:
        return comment.decode("utf-8").split(" ")[1]
    except (ValueError, IndexError):
        # ValueError covers undecodable comments; IndexError covers comments
        # with fewer than two words (including the empty default comment).
        return None
def zipGetFileSize(content):
    """Report the uncompressed size of one member of a zip archive.

    Args:
        content: whitespace-separated ``"<item> <zip path>"`` string.

    Returns:
        A one-element list holding either the size text or an error/usage
        message.
    """
    content = content.split()
    if len(content) <= 1:
        return ['zip getfilesize item file_zip']

    item, archive_path = content[0], content[1]
    try:
        with zipfile.ZipFile(archive_path) as archive:
            info = archive.getinfo(item)
        # NOTE(review): ZipInfo.file_size is a byte count, so the ' KB'
        # label looks wrong -- confirm before changing the user-facing text.
        return [str(info.file_size) + ' KB']
    except FileNotFoundError:
        return ['1f401268File %r Not Found' % (archive_path)]
    except zipfile.BadZipFile:
        return ['1f401268File %r is not a zip file' % (archive_path)]
    except KeyError:
        # Same message zipGetComSize gives for a missing member.
        return ['1f401268There no item named %r in the archive' % (item)]
    except Exception:
        return ['Something went wrong when trying to handel file %r' % (archive_path)]
def zipGetComSize(content):
    """Report the compressed size of one member of a zip archive.

    Args:
        content: whitespace-separated ``"<item> <zip path>"`` string.

    Returns:
        A one-element list holding either the size text or an error/usage
        message.
    """
    content = content.split()
    if len(content) <= 1:
        return ['zip getfilesize item file_zip']

    item, archive_path = content[0], content[1]
    try:
        with zipfile.ZipFile(archive_path) as archive:
            info = archive.getinfo(item)
        # NOTE(review): ZipInfo.compress_size is a byte count, so the ' KB'
        # label looks wrong -- confirm before changing the user-facing text.
        return [str(info.compress_size) + ' KB']
    except FileNotFoundError:
        return ['1f401268File %r Not Found' % (archive_path)]
    except zipfile.BadZipFile:
        return ['1f401268File %r is not a zip file' % (archive_path)]
    except KeyError:
        return ['1f401268There no item named %r in the archive' % (item)]
    except Exception:
        return ['Something went wrong when trying to handel file %r' % (archive_path)]
def unzipAll(content):
    """Extract every member of a zip archive.

    Args:
        content: whitespace-separated ``"<zip path> [<destination>]"``.
            The destination is used only when exactly two words are given;
            otherwise members go to the current working directory.

    Returns:
        An empty list on success, otherwise a one-element list holding an
        error message.
    """
    content = content.split()
    # An empty request used to raise IndexError from inside the error
    # handler; treating the path as '' reports it as a missing file.
    archive_path = content[0] if content else ''
    target = content[1] if len(content) == 2 else None
    try:
        with zipfile.ZipFile(archive_path) as archive:
            archive.extractall(target)
        return []
    except FileNotFoundError:
        return ['1f401268File %r Not Found' % (archive_path)]
    except zipfile.BadZipFile:
        return ['1f401268File %r is not a zip file' % (archive_path)]
    except Exception:
        return ['Something went wrong when trying to unzip file %r' % (archive_path)]
def unzip(content):
    """Extract a single member of a zip archive.

    Args:
        content: whitespace-separated
            ``"<item> <zip path> [<destination>]"``.  With fewer than two
            words the request is forwarded to ``unzipAll``.  The
            destination is used only when exactly three words are given;
            otherwise the member goes to the current working directory.

    Returns:
        An empty list on success, otherwise a one-element list holding an
        error message.
    """
    content = content.split()
    if len(content) < 2:
        return unzipAll(''.join(content))

    item, archive_path = content[0], content[1]
    target = content[-1] if len(content) == 3 else None
    try:
        with zipfile.ZipFile(archive_path) as archive:
            archive.extract(item, target)
        return []
    except FileNotFoundError:
        return ['1f401268File %r Not Found' % (archive_path)]
    except zipfile.BadZipFile:
        return ['1f401268File %r is not a zip file' % (archive_path)]
    except KeyError:
        return ['1f401268There no item named %r in the archive' % (item)]
    except Exception:
        return ['Something went wrong when trying to unzip file %r' % (archive_path)]
def update_compressed_packages(pkgs):
    """Rebuild ``multi_importer.loaders`` from the given packages.

    The loader list is reset, then a ``ZipLoader`` is created for each
    entry of *pkgs*.  Entries that raise ``FileNotFoundError`` or
    ``BadZipFile`` are reported on stdout and skipped.
    """
    multi_importer.loaders = []
    for package in pkgs:
        try:
            loader = ZipLoader(package)
        except (FileNotFoundError, zipfile.BadZipFile) as exc:
            print("error loading " + package + ": " + str(exc))
        else:
            multi_importer.loaders.append(loader)
def test_bad_zip_file(self):
    """Unzipping a Miz built from BAD_ZIP_FILE must raise BadZipFile."""
    broken = Miz(BAD_ZIP_FILE)
    with pytest.raises(BadZipFile):
        broken.unzip()
def try_open_zip(file):
    """Return a case-insensitive member opener for a zip, or None.

    Returns None when *file* is not a valid zip archive.  Otherwise the
    returned callable takes a member name, matches it case-insensitively
    against the archive's names and returns the opened member; unknown
    names raise ``FileNotFoundError``.  The archive stays open for as long
    as the callable is alive.
    """
    try:
        archive = ZipFile(file)
    except BadZipFile:
        return None

    # Lower-cased name -> name as stored in the archive.
    stored_names = {member.lower(): member for member in archive.namelist()}

    def open(name):
        try:
            stored = stored_names[name.lower()]
            return archive.open(stored)
        except KeyError:
            raise FileNotFoundError(name) from None

    return open
def filecount_in_zip(filepath: str) -> int:
    """Count the archive members accepted by ``discard_zipfile_contents``.

    Args:
        filepath: path of the zip archive to inspect.

    Returns:
        The number of member names for which ``discard_zipfile_contents``
        returns a truthy value, or ``0`` when the file is not a valid zip.
    """
    try:
        archive = zipfile.ZipFile(filepath, 'r')
    except zipfile.BadZipFile:
        return 0

    # Only the names are needed, so the handle is released immediately.
    with archive:
        names = archive.namelist()

    # Ordering does not affect a count, so the names are not sorted first.
    return sum(1 for name in names if discard_zipfile_contents(name))
def generate_image_set(self, force: bool=False) -> None:
    """Recreate the ``Image`` rows describing the members of the zip.

    Nothing happens when the zip file is missing, when an image set already
    exists and *force* is false, when the archive cannot be opened, or when
    ``testzip()`` reports a bad member.  Otherwise every existing image row
    is deleted and one new row is saved per filtered member, with
    ``archive_position`` and ``position`` numbered from 1.

    Args:
        force: rebuild the set even if one is already present.
    """
    if not os.path.isfile(self.zipped.path):
        return
    image_set_present = bool(self.image_set.all())
    if image_set_present and not force:
        return

    try:
        my_zip = zipfile.ZipFile(self.zipped.path, 'r')
    except (zipfile.BadZipFile, NotImplementedError):
        return

    # Only the member names are needed; ``with`` closes the archive on
    # every path, including an exception from testzip().
    with my_zip:
        if my_zip.testzip():
            return
        filtered_files = list(filter(
            discard_zipfile_contents,
            sorted(my_zip.namelist(), key=zfill_to_three)))

    for img in self.image_set.all():
        if img.extracted:
            # Extracted images also own files on storage.
            img.image.delete(save=False)
            img.thumbnail.delete(save=False)
        img.delete()

    for count in range(1, len(filtered_files) + 1):
        image = Image(archive=self, archive_position=count, position=count)
        image.image = None
        image.save()
def generate_thumbnails(self) -> bool:
    """Create the archive thumbnail from one image inside the zip.

    The source image is the member at the first image-set entry's
    ``archive_position`` when an image set exists, otherwise the first
    filtered member.  It is converted to RGB if needed, shrunk to fit
    200x290 and saved as JPEG under ``settings.MEDIA_ROOT``.

    Returns:
        True when a thumbnail was written and the model saved; False when
        the zip is missing, cannot be opened, fails ``testzip()`` or has no
        usable members.
    """
    if not os.path.exists(self.zipped.path):
        return False
    try:
        my_zip = zipfile.ZipFile(self.zipped.path, 'r')
    except (zipfile.BadZipFile, NotImplementedError):
        return False

    # ``with`` replaces the manual close() calls and also covers exceptions
    # raised while decoding or saving the image.
    with my_zip:
        if my_zip.testzip():
            return False

        self.thumbnail.delete(save=False)

        filtered_files = list(filter(
            discard_zipfile_contents,
            sorted(my_zip.namelist(), key=zfill_to_three)))
        if not filtered_files:
            return False

        if self.image_set.all():
            first_file = filtered_files[self.image_set.all()[0].archive_position - 1]
        else:
            first_file = filtered_files[0]

        with my_zip.open(first_file) as current_img:
            im = PImage.open(current_img)
            if im.mode != 'RGB':
                im = im.convert('RGB')
            # NOTE(review): presumably PImage is PIL.Image; ANTIALIAS was
            # removed in Pillow 10 -- confirm the pinned version.
            im.thumbnail((200, 290), PImage.ANTIALIAS)
            thumb_name = thumb_path_handler(self, "thumb2.jpg")
            os.makedirs(os.path.dirname(pjoin(settings.MEDIA_ROOT, thumb_name)), exist_ok=True)
            im.save(pjoin(settings.MEDIA_ROOT, thumb_name), "JPEG")
            self.thumbnail.name = thumb_name

    super(Archive, self).save()
    return True
def extract(path, output_dir):
    """Extract a QIIME 2 Result into *output_dir* and echo where it went.

    Raises:
        click.BadParameter: if ``qiime2.sdk.Result.extract`` raises
            ``BadZipFile`` or ``ValueError`` for *path*.
    """
    import zipfile
    import qiime2.sdk

    try:
        extracted_dir = qiime2.sdk.Result.extract(path, output_dir)
    except (zipfile.BadZipFile, ValueError):
        message = (
            '%s is not a valid QIIME 2 Result. Only QIIME 2 Artifacts and '
            'Visualizations can be extracted.' % path)
        raise click.BadParameter(message)

    click.echo('Extracted to %s' % extracted_dir)
def upload_tweet_archive():
    """Store an uploaded tweet archive and queue one import task per chunk.

    The upload is saved as a ``TwitterArchive`` for the current viewer's
    account, split with ``chunk_twitter_archive`` and its chunk count
    recorded.  On success the user is redirected to the recent-archives
    anchor.  A bad zip or an archive without chunks is reported to sentry
    (when configured) and redirects to the import form with the
    ``tweet_archive_failed`` flag set.
    """
    upload_body = request.files['file'].read()
    archive = TwitterArchive(account=g.viewer.account, body=upload_body)
    db.session.add(archive)
    db.session.commit()

    try:
        chunk_names = libforget.twitter.chunk_twitter_archive(archive.id)
        archive.chunks = len(chunk_names)
        db.session.commit()
        if not archive.chunks > 0:
            raise TweetArchiveEmptyException()

        for chunk_name in chunk_names:
            signature = tasks.import_twitter_archive_month.s(
                archive.id, chunk_name)
            signature.apply_async()

        return redirect(url_for('index', _anchor='recent_archives'))
    except (BadZipFile, TweetArchiveEmptyException):
        if sentry:
            sentry.captureException()
        failure_url = url_for(
            'index', tweet_archive_failed='', _anchor='tweet_archive_import')
        return redirect(failure_url)
def run(self):
    """Try each candidate in ``self.words`` as the archive password.

    The zip path and extraction directory are read from
    ``variables["file"][0]`` and ``variables["exto"][0]``.  Candidates are
    bytes lines decoded as UTF-8; blank lines and lines starting with ``#``
    are skipped.  The first password that extracts the archive is stored in
    ``self.pwdh.pwd``.  The loop stops early once ``self.pwdh`` already
    carries a password, an error, or a kill request.  A missing zip file is
    reported through ``self.pwdh.error``.
    """
    try:
        zipf = zipfile.ZipFile(variables["file"][0])
    except FileNotFoundError:
        self.pwdh.error = "zip file not found"
        return

    # ``with`` releases the archive handle on every way out of the loop.
    with zipf:
        for word in self.words:
            if self.pwdh.pwd is not None:
                return
            if self.pwdh.error is not None:
                return
            # Comparison kept as written: the flag's type is not visible here.
            if self.pwdh.kill == True:  # noqa: E712
                return

            word = word.decode("utf-8").replace("\n", "")
            # A blank line used to crash on word[0]; skip it like a comment.
            if not word or word[0] == "#":
                continue
            try:
                zipf.extractall(variables["exto"][0], pwd=word.encode("utf-8"))
            except (RuntimeError, zipfile.BadZipFile):
                # Wrong password: move on to the next candidate.
                continue
            self.pwdh.pwd = word
            return
def test_close_on_exception(self):
    """Check that the zipfile is closed if an exception is raised in the
    'with' block."""
    with zipfile.ZipFile(TESTFN2, "w") as zipfp:
        for fpath, fdata in SMALL_TEST_DATA:
            zipfp.writestr(fpath, fdata)

    try:
        with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
            raise zipfile.BadZipFile()
    except zipfile.BadZipFile:
        # assertIsNone reports the offending value on failure, unlike
        # assertTrue(x is None).
        self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
def test_close_erroneous_file(self):
    # Regression check for SF bug #412214: the ZipFile constructor must
    # close the file object it opened when the file turns out to be
    # invalid.  Otherwise the traceback keeps the ZipFile object, and
    # through it the file object, alive; on Windows the later os.unlink()
    # then fails because the underlying file is still open.
    with open(TESTFN, "w") as fp:
        fp.writelines(["this is not a legal zip file\n"])
    try:
        zipfile.ZipFile(TESTFN)
    except zipfile.BadZipFile:
        pass
def test_empty_file_raises_BadZipFile(self):
    """Opening an empty or too-short file must raise BadZipFile."""
    # Zero-length file.
    with open(TESTFN, 'w'):
        pass
    with self.assertRaises(zipfile.BadZipFile):
        zipfile.ZipFile(TESTFN)

    # A few bytes that are not a zip archive.
    with open(TESTFN, 'w') as fp:
        fp.write("short file")
    with self.assertRaises(zipfile.BadZipFile):
        zipfile.ZipFile(TESTFN)
def zip_namelist(f):
    """Return the member names of zip *f*, or ``[]`` if it is not a zip.

    Args:
        f: anything ``zipfile.ZipFile`` accepts (it is passed straight
            through).
    """
    try:
        # The context manager closes the archive instead of leaving the
        # handle to the garbage collector.
        with zipfile.ZipFile(f) as archive:
            return archive.namelist()
    except zipfile.BadZipFile:
        return []
def extract_manifest_of_file(crx_file):
    """Load ``manifest.json`` from a CRX/zip file as a Python object.

    The manifest bytes are decoded as UTF-8 (with optional BOM), falling
    back to Latin-1.  If JSON parsing fails, the error is printed and the
    text is parsed again after being passed through ``uncomment``.

    Args:
        crx_file: path of the file to read.

    Returns:
        The parsed manifest, or ``None`` (after printing a message) when
        the file does not exist, is empty, or is not a valid zip.

    Raises:
        KeyError: if the archive has no ``manifest.json`` member.
    """
    debug = False
    if not os.path.isfile(crx_file):
        print(crx_file, 'does not exist')
        return None
    # TODO: add filesize
    if os.path.getsize(crx_file) == 0:
        print('###################!!!! fucking empty downloads', crx_file)
        return None

    try:
        with ZipFile(crx_file) as myzip:
            if debug:
                # Keep a copy of the raw manifest for later inspection.
                with myzip.open('manifest.json') as source, \
                        open('lol.json', "wb") as target:
                    shutil.copyfileobj(source, target)
            with myzip.open('manifest.json') as myfile:
                text = myfile.read()
    except BadZipFile:
        print('fucking interrupted downloads')
        return None

    try:
        text = text.decode('utf-8-sig')
    except UnicodeDecodeError:
        # Latin-1 maps every byte, so this fallback cannot fail.
        text = text.decode('latin-1')
    if debug:
        print(text)

    try:
        return json.loads(text, strict=False)
    except Exception as e:
        print(e)
        text = uncomment(text)
        return json.loads(text, strict=False)
def extract(self, filename:str, password:str):
    """
    POST /admin/extract

    Extract the password-protected archive *filename* from
    ``Config.contest_zips`` into a freshly created ``Config.contest_path``
    and reload the contest from disk.

    Errors are reported through ``raise_exc``: ``Forbidden`` when the
    contest directory already exists or extraction fails, ``NotFound`` when
    the archive does not exist.

    :returns: an empty dict
    """
    if os.path.exists(Config.contest_path):
        self.raise_exc(Forbidden, "CONTEST", "Contest already loaded")
    os.makedirs(Config.contest_path)
    z = os.path.abspath(os.path.join(Config.contest_zips, filename))
    try:
        with zipfile.ZipFile(z) as f:
            # Extract to an explicit path instead of os.chdir(): the
            # working directory is process-wide state.
            f.extractall(Config.contest_path, pwd=password.encode())
        Logger.info("CONTEST", "Contest extracted")
    except FileNotFoundError:
        BaseHandler.raise_exc(NotFound, "NOT_FOUND", "Archive %s not found" % z)
    except (RuntimeError, PermissionError, zipfile.BadZipFile) as ex:
        # NOTE(review): the directory created above is left behind on
        # failure, so a retry hits "Contest already loaded" -- confirm
        # whether it should be removed here.
        BaseHandler.raise_exc(Forbidden, "FAILED", str(ex))
    ContestManager.read_from_disk()
    return {}
def test_close_on_exception(self):
    """Check that the zipfile is closed if an exception is raised in the
    'with' block."""
    # Build an archive to reopen.
    with zipfile.ZipFile(TESTFN2, "w") as writer:
        for member_name, member_data in SMALL_TEST_DATA:
            writer.writestr(member_name, member_data)

    # Leave the 'with' block through an exception, then inspect the handle.
    try:
        with zipfile.ZipFile(TESTFN2, "r") as reader:
            raise zipfile.BadZipFile()
    except zipfile.BadZipFile:
        underlying = reader.fp
        self.assertIsNone(underlying, 'zipfp is not closed')
def test_damaged_zipfile(self): """Check that zipfiles with missing bytes at the end raise BadZipFile.""" # - Create a valid zip file fp = io.BytesIO() with zipfile.ZipFile(fp, mode="w") as zipf: zipf.writestr("foo.txt", b"O, for a Muse of Fire!") zipfiledata = fp.getvalue() # - Now create copies of it missing the last N bytes and make sure # a BadZipFile exception is raised when we try to open it for N in range(len(zipfiledata)): fp = io.BytesIO(zipfiledata[:N]) self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp)
def get(self, key):
    """Return the bytes stored for *key*.

    *key* is mapped through ``self._names_to_info`` (a ``KeyError``
    propagates for unknown keys) and the entry is read from ``self.file``
    while holding ``self._lock``.  If the read raises ``BadZipFile`` the
    file is reopened via ``self._open_file()`` and the read is tried once
    more.
    """
    entry = self._names_to_info[key]
    with self._lock:
        try:
            data = self.file.read(entry)
        except zipfile.BadZipFile:
            # Reopen the file and retry once.
            self._open_file()
            data = self.file.read(entry)
    return data
def _list_files(repos, languages, remove): repo_files = {lang: [] for lang in languages} exts = {ext: lang for lang, exts in languages.items() for ext in exts} for pos, zipped_repo in enumerate(repos, 1): current_repo_files = {lang: [] for lang in languages} size = len(repos) if pos % STEP == 0 or pos == size: LOGGER.debug("%.2f%%", 100 * pos / size) try: with ZipFile(zipped_repo) as zip_file: for filename in zip_file.namelist(): lang = exts.get(Path(filename).suffix.lstrip('.')) if not lang or len(current_repo_files[lang]) >= MAX_FILES: continue current_repo_files[lang].append( '{}::{}'.format(zipped_repo, filename)) except BadZipFile as error: LOGGER.warning("Malformed file %s, error: %s", zipped_repo, error) if remove: Path(zipped_repo).unlink() LOGGER.debug("%s removed", zipped_repo) continue for lang, zipped_files in current_repo_files.items(): repo_files[lang].extend(zipped_files) return repo_files