我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zipfile.BadZipfile()。
def clean_zip_file(self): """Open the zip file a first time, to check that it is a valid zip archive. We'll open it again in a moment, so we have some duplication, but let's focus on keeping the code easier to read! """ zip_file = self.cleaned_data['zip_file'] try: zip = zipfile.ZipFile(zip_file) except BadZipFile as e: raise forms.ValidationError(str(e)) bad_file = zip.testzip() if bad_file: zip.close() raise forms.ValidationError('"%s" in the .zip archive is corrupt.' % bad_file) zip.close() # Close file in all cases. return zip_file
def readArchiveFile( self, archive_file ): data = "" zf = zipfile.ZipFile( self.path, 'r' ) try: data = zf.read( archive_file ) except zipfile.BadZipfile as e: print >> sys.stderr, u"bad zipfile [{0}]: {1} :: {2}".format(e, self.path, archive_file) zf.close() raise IOError except Exception as e: zf.close() print >> sys.stderr, u"bad zipfile [{0}]: {1} :: {2}".format(e, self.path, archive_file) raise IOError finally: zf.close() return data
def put(self, request): try: zip_file = request.FILES['file'] archive = zipfile.ZipFile(zip_file) except (MultiValueDictKeyError, zipfile.BadZipfile): raise NotZIPFileError try: csv_name = [item for item in archive.namelist() if item.endswith('csv')][0] except IndexError: raise NoCSVInArchiveFoundError with archive.open(csv_name) as zip_csv_file: # Convert zipfile handle to Django file handle csv_file = File(zip_csv_file) dataset = Dataset.objects.create( name=zip_csv_file.name, content=csv_file, uploaded_by=request.user) # Start tasks for feature calculation initialize_from_dataset.delay(dataset_id=dataset.id) serializer = DatasetSerializer(instance=dataset) return Response(serializer.data)
def __init__(self, manager, name, version=None): self.manager = manager self.name = name self.pseudo = (name == "base") self.required_version = version # None means newest self._releases = None self._installed_version = None self._exists = None if not self.pseudo and self.any_version_installed: try: with zipfile.ZipFile(mod_folder.file_path(self.name)) as zf: info_json_candidates = [n for n in zf.namelist() if n.rsplit("/", 1)[1] == "info.json"] assert info_json_candidates, "Not a mod file" with zf.open(info_json_candidates[0]) as f: data = json.loads(f.read().decode()) except zipfile.BadZipfile: raise CorruptedZipFile(mod_folder.file_path(self.name)) self.title = data["title"] self._installed_version = data["version"] self._exists = True
def _crc32(self, path): if self._infodict: return self._infodict[path] try: archive = ZipFile(self.src) except BadZipfile: e = get_exception() if e.args[0].lower().startswith('bad magic number'): # Python2.4 can't handle zipfiles with > 64K files. Try using # /usr/bin/unzip instead self._legacy_file_list() else: raise else: try: for item in archive.infolist(): self._infodict[item.filename] = long(item.CRC) except: archive.close() raise UnarchiveError('Unable to list files in the archive') return self._infodict[path]
def check_read_with_bad_crc(self, compression): """Tests that files with bad CRCs raise a BadZipfile exception when read.""" zipdata = self.zips_with_bad_crc[compression] # Using ZipFile.read() with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: self.assertRaises(zipfile.BadZipfile, zipf.read, 'afile') # Using ZipExtFile.read() with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: with zipf.open('afile', 'r') as corrupt_file: self.assertRaises(zipfile.BadZipfile, corrupt_file.read) # Same with small reads (in order to exercise the buffering logic) with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: with zipf.open('afile', 'r') as corrupt_file: corrupt_file.MIN_READ_SIZE = 2 with self.assertRaises(zipfile.BadZipfile): while corrupt_file.read(2): pass
def Run(self): try: zip_file = self._args[0] out_path = self._args[1] except IndexError: raise ActionError('Unable to determine desired paths from %s.' % str(self._args)) try: file_util.CreateDirectories(out_path) except file_util.Error: raise ActionError('Unable to create output path %s.' % out_path) try: zf = zipfile.ZipFile(zip_file) zf.extractall(out_path) except (IOError, zipfile.BadZipfile) as e: raise ActionError('Bad zip file given as input. %s' % e)
def verify(self, password=None): try: with zipfile.ZipFile(self.filename, 'r') as z: z.setpassword(password) badfile = z.testzip() if badfile is not None: raise CRCError(badfile) except (zipfile.BadZipfile, zipfile.LargeZipFile), e: raise ArchiveError(e) except RuntimeError, e: if "encrypted" in e.args[0] or "Bad password" in e.args[0]: raise PasswordError(e) else: raise CRCError(e)
def _extract(self, filename, password): archive_path = _prepare_archive_at_path(filename) if not archive_path: return None # Extraction. extract_path = environ.get("TEMP", "/tmp") with ZipFile(archive_path, "r") as archive: try: archive.extractall(path=extract_path, pwd=password) except BadZipfile: raise Exception("Invalid Zip file") # Try to extract it again, but with a default password except RuntimeError: try: archive.extractall(path=extract_path, pwd="infected") except RuntimeError as err: raise Exception("Unable to extract Zip file: %s" % err) finally: self._extract_nested_archives(archive, extract_path, password) return archive.namelist()
def _prepare_archive_at_path(filename): """ Verifies that there's a readable zip archive at the given path. This function returns a new name for the archive (for most cases it's the same as the original one; but if an archive named "foo.zip" contains a file named "foo" this archive will be renamed to avoid being overwrite. """ # Verify that the archive is actually readable try: with ZipFile(filename, "r") as archive: archive.close() except BadZipfile: return None # Test if zip file contains a file named as itself if _is_overwritten(filename): log.debug("ZIP file contains a file with the same name, original is \ going to be overwrite") # In this case we just change the file name new_zip_path = filename + _random_extension() move(filename, new_zip_path) filename = new_zip_path return filename
def extract_files(self): self.extract_error = None location = self.get_setting('download_dir').value version = self.selected_version() for setting_name, setting in self.settings['export_settings'].items(): save_file_path = setting.save_file_path(version, location) try: if setting.value: extract_path = get_data_path('files/'+setting.name) setting.extract(extract_path, version) self.progress_text += '.' except (tarfile.ReadError, zipfile.BadZipfile) as e: if os.path.exists(save_file_path): os.remove(save_file_path) self.extract_error = e self.logger.error(self.extract_error) # cannot use GUI in thread to notify user. Save it for later self.progress_text = '\nDone.\n' return True
def installZipFile(data, fname): base = os.path.join(defaultBase(),'addons') if fname.endswith(".py"): path = os.path.join(base, fname) open(path, "wb").write(data) return True # .zip file try: z = zipfile.ZipFile(io.BytesIO(data)) except zipfile.BadZipfile: return False for n in z.namelist(): if n.endswith("/"): # folder; ignore continue # write z.extract(n, base) return True
def _read_manifest(self): mf = None if self._path.endswith("/META-INF/MANIFEST.MF"): mf = open(self._path, "rb") if zipfile.is_zipfile(self._path): # looks like "zipfile.is_zipfile()" is not reliable # see rhbz#889131 for more details try: jarfile = ZipFile(self._path) if "META-INF/MANIFEST.MF" in jarfile.namelist(): mf = jarfile.open("META-INF/MANIFEST.MF", "r") except (IOError, BadZipfile): pass if mf is None: return None content = mf.read() mf.close() return content.decode("utf-8")
def download_package_from_url(url, dest): logging.info('Attempting to download missing package from: ' + url) tmp_path = internal_path("work/dl_package.zip") try: req = requests.get(url, stream=True, timeout=10) if req.status_code == 200: with open(tmp_path, 'wb') as tmp_file: req.raw.decode_content = True shutil.copyfileobj(req.raw, tmp_file) else: raise PackageLoadError('Package download failed, server said: {} {}'.format(req.status_code, req.reason)) except (RuntimeError, IOError) as e: raise PackageLoadError('Package download failed due to an error') from e logging.info('Extracting package...') try: with ZipFile(tmp_path, "r") as z: z.extractall(dest) except BadZipfile as e: raise PackageLoadError('Malformed package zip file') from e os.remove(tmp_path)
def from_zip(cls, file_obj, stages_name, stages_root): "Unpack zip from file_obj into os.path.join(stages_root, stages_name)." try: assignment_root = os.path.join(stages_root, stages_name) os.mkdir(assignment_root) with zipfile.ZipFile(file_obj, 'r') as zf: bad_filename = zf.testzip() if bad_filename is not None: raise Error('Corrupt file in zip: ' + bad_filename) # TODO: Handle case where zf.namelist() uses a lot of memory archived_files = zf.namelist() for af in archived_files: zf.extract(af, assignment_root) # TODO: The stage.save_main_script() code below is used as a workaround # to ensure that the main script is executable. Ideally, file # permissions would be preserved. stages = cls(assignment_root) for stage in stages.stages.itervalues(): stage.save_main_script() return stages except (zipfile.BadZipfile, zipfile.LargeZipFile) as e: raise Error(e)
def test_invalidHeader(self): """ A zipfile entry with the wrong magic number should raise BadZipfile for readfile(), but that should not affect other files in the archive. """ fn = self.makeZipFile(["test contents", "more contents"]) with zipfile.ZipFile(fn, "r") as zf: zeroOffset = zf.getinfo("0").header_offset # Zero out just the one header. with open(fn, "r+b") as scribble: scribble.seek(zeroOffset, 0) scribble.write(b'0' * 4) with zipstream.ChunkingZipFile(fn) as czf: self.assertRaises(zipfile.BadZipfile, czf.readfile, "0") with czf.readfile("1") as zfe: self.assertEqual(zfe.read(), b"more contents")
def test_filenameMismatch(self): """ A zipfile entry with a different filename than is found in the central directory should raise BadZipfile. """ fn = self.makeZipFile([b"test contents", b"more contents"]) with zipfile.ZipFile(fn, "r") as zf: info = zf.getinfo("0") info.filename = "not zero" with open(fn, "r+b") as scribble: scribble.seek(info.header_offset, 0) scribble.write(info.FileHeader()) with zipstream.ChunkingZipFile(fn) as czf: self.assertRaises(zipfile.BadZipfile, czf.readfile, "0") with czf.readfile("1") as zfe: self.assertEqual(zfe.read(), b"more contents")
def test_unsupportedCompression(self): """ A zipfile which describes an unsupported compression mechanism should raise BadZipfile. """ fn = self.mktemp() with zipfile.ZipFile(fn, "w") as zf: zi = zipfile.ZipInfo("0") zf.writestr(zi, "some data") # Mangle its compression type in the central directory; can't do # this before the writestr call or zipfile will (correctly) tell us # not to pass bad compression types :) zi.compress_type = 1234 with zipstream.ChunkingZipFile(fn) as czf: self.assertRaises(zipfile.BadZipfile, czf.readfile, "0")
def check_name(msg,savname=None,ckname=check_ext,scan_zip=False): "Replace attachment with a warning if its name is suspicious." try: for key,name in msg.getnames(scan_zip): badname = ckname(name) if badname: if key == 'zipname': badname = msg.get_filename() break else: return Milter.CONTINUE except zipfile.BadZipfile: # a ZIP that is not a zip is very suspicious badname = msg.get_filename() hostname = socket.gethostname() msg.set_payload(virus_msg % (badname,hostname,savname)) del msg["content-type"] del msg["content-disposition"] del msg["content-transfer-encoding"] name = "WARNING.TXT" msg["Content-Type"] = "text/plain; name="+name return Milter.CONTINUE
def parsensout(cfdbpath): """Parse CrimeFlare's nsout.zip archive into a dictionary.""" # Open nsout archive, parse out required data, store into a list try: znsout = zipfile.ZipFile('{}/nsout.zip'.format(cfdbpath)) for finfo in znsout.infolist(): ifile = znsout.open(finfo) for record in ifile.readlines(): try: NS1, NS2, DOMAIN = record.decode('utf-8').split() except Exception: NSPLIT = record.decode('utf-8').split() if len(NSPLIT) > 3: DOMAIN = NSPLIT[-1] NSPLIT.remove(DOMAIN) NS1 = ' '.join(NSPLIT[:-2]) NS2 = ' '.join(NSPLIT[-2:]) pass _nsdict[DOMAIN] = '{} {}'.format(NS1, NS2) except(zipfile.BadZipfile): print("[-] Bad checksum on downloaded archive. Try to update again.") raise SystemExit
def parsecountry(cfdbpath): """Parse CrimeFlare's country.zip archive into a dictionary.""" # Open country archive, parse out required data, store into a list try: zcountry = zipfile.ZipFile('{}/country.zip'.format(cfdbpath)) for finfo in zcountry.infolist(): ifile = zcountry.open(finfo) for record in ifile.readlines(): try: DOMAIN, IP, COUNTRY = record.decode('utf-8').split() except(ValueError): COUNTRY = ' '.join(map(str, record.decode('utf-8').split()[2:])) _countrydict[DOMAIN] = '{}'.format(COUNTRY) except(zipfile.BadZipfile): print("[-] Bad checksum on downloaded archive. Try to update again.") raise SystemExit
def unzip(zippedFile, outPath): """Extract all files from a zip archive to a destination directory.""" newDir = False # the toplevel directory name in the zipfile fh = open_file(zippedFile, 'rb') with fh: try: z = zipfile.ZipFile(fh) namelist = z.namelist() newDir = namelist[0] for name in namelist: z.extract(name, outPath) except RuntimeError as re: msg("Error processing zip (RuntimeError): %s" % (re), True) return False except IOError as ioe: msg("Error opening [%s]: %s" % (zippedFile, ioe.strerror), True) return False except zipfile.BadZipfile as bzf: msg("Bad zip file: %s" % zippedFile, True) return False return newDir
def unzip(self, file_obj): """ Unzips a ZIP archive :param file_obj: file to be unzipped """ try: zip_ref = zipfile.ZipFile(file_obj, 'r') zip_ref.extractall(self.target_dir) zip_ref.close() except zipfile.BadZipfile: self.clean_up(os.path.join(self.target_dir, file_obj))
def handle_application_zip(self, headers, fileobj): self.log.info("Opening a ZIP attachment") fileobj = self._decode(headers, fileobj) try: zip = zipfile.ZipFile(fileobj) except zipfile.BadZipfile as error: self.log.error("ZIP handling failed ({0})".format(error)) idiokit.stop(False) for filename in zip.namelist(): csv_data = StringIO(zip.read(filename)) self.log.info("Parsing CSV data from the ZIP attachment") result = yield self.parse_csv(headers, filename, csv_data) idiokit.stop(result)
def handle_application_zip(self, msg): self.log.info("Opening a ZIP attachment") data = yield msg.get_payload(decode=True) try: zip = zipfile.ZipFile(StringIO(data)) except zipfile.BadZipfile as error: self.log.error("ZIP handling failed ({0})".format(error)) idiokit.stop(False) for filename in zip.namelist(): csv_data = zip.open(filename) self.log.info("Parsing CSV data from the ZIP attachment") result = yield self.parse_csv(filename, csv_data) idiokit.stop(result)
def readfile(self, name): """Return file-like object for name.""" if self.mode not in ("r", "a"): raise RuntimeError, 'read() requires mode "r" or "a"' if not self.fp: raise RuntimeError, \ "Attempt to read ZIP archive that was already closed" zinfo = self.getinfo(name) self.fp.seek(zinfo.header_offset, 0) # Skip the file header: fheader = self.fp.read(30) if fheader[0:4] != zipfile.stringFileHeader: raise zipfile.BadZipfile, "Bad magic number for file header" fheader = struct.unpack(zipfile.structFileHeader, fheader) fname = self.fp.read(fheader[zipfile._FH_FILENAME_LENGTH]) if fheader[zipfile._FH_EXTRA_FIELD_LENGTH]: self.fp.read(fheader[zipfile._FH_EXTRA_FIELD_LENGTH]) if fname != zinfo.orig_filename: raise zipfile.BadZipfile, \ 'File name in directory "%s" and header "%s" differ.' % ( zinfo.orig_filename, fname) if zinfo.compress_type == zipfile.ZIP_STORED: return ZipFileEntry(self.fp, zinfo.compress_size) elif zinfo.compress_type == zipfile.ZIP_DEFLATED: if not zlib: raise RuntimeError, \ "De-compression requires the (missing) zlib module" return DeflatedZipFileEntry(self.fp, zinfo.compress_size) else: raise zipfile.BadZipfile, \ "Unsupported compression method %d for file %s" % \ (zinfo.compress_type, name)
def read(self, name): """Return file bytes (as a string) for name.""" f = self.readfile(name) zinfo = self.getinfo(name) bytes = f.read() crc = binascii.crc32(bytes) if crc != zinfo.CRC: raise zipfile.BadZipfile, "Bad CRC-32 for file %s" % name return bytes
def unzip(f): if os.path.isfile(f): try: with zipfile.ZipFile(f) as zf: zf.extractall('.') return 'File {} extracted.'.format(f) except zipfile.BadZipfile: return 'Error: Failed to unzip file.' else: return 'Error: File not found.'
def unzip(f): if os.path.isfile(f): try: with zipfile.ZipFile(f) as zf: zf.extractall('.') return '[*] File {} extracted.'.format(f) except zipfile.BadZipfile: return '[!] Error: Failed to unzip file.' else: return '[-] Error: File not found.'
def archive_context(filename): """ Unzip filename to a temporary directory, set to the cwd. The unzipped target is cleaned up after. """ tmpdir = tempfile.mkdtemp() log.warn('Extracting in %s', tmpdir) old_wd = os.getcwd() try: os.chdir(tmpdir) try: with ContextualZipFile(filename) as archive: archive.extractall() except zipfile.BadZipfile as err: if not err.args: err.args = ('', ) err.args = err.args + ( MEANINGFUL_INVALID_ZIP_ERR_MSG.format(filename), ) raise # going in the directory subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0]) os.chdir(subdir) log.warn('Now working in %s', subdir) yield finally: os.chdir(old_wd) shutil.rmtree(tmpdir)
def _should_contain_zip_content(value): if not isinstance(value, bytes): # If it's not bytes it's basically impossible for # this to be valid zip content, but we'll at least # still try to load the contents as a zip file # to be absolutely sure. value = value.encode('utf-8') fileobj = six.BytesIO(value) try: with closing(zipfile.ZipFile(fileobj)) as f: f.infolist() except zipfile.BadZipfile: raise ValueError(ERROR_MSG)
def _extract_zip(self, file_obj, path): try: with zipfile.ZipFile(file_obj) as archive: archive.extractall(path) except zipfile.BadZipfile: raise self.BadArchive
def __init__(self): """ Initialize a new TUI session. * Load server list from a configuration file under working directory. * Try to load the hosts data file under working directory if it exists. .. note:: IF hosts data file does not exists correctly in current working directory, a warning message box would popup. And operations to change the hosts file on current system could be done only until a new data file has been downloaded. .. seealso:: :meth:`~tui.curses_d.CursesDaemon.session_daemon` method in :class:`~tui.curses_d.CursesDaemon`. .. seealso:: :meth:`~gui.hostsutil.HostsUtil.init_main` in :class:`~gui.hostsutil.HostsUtil` class. """ super(HostsUtil, self).__init__() # Set mirrors self.settings[0][2] = CommonUtil.set_network("network.conf") # Read data file and set function list try: self.set_platform() RetrieveData.unpack() RetrieveData.connect_db() self.set_info() self.set_func_list() except IOError: self.messagebox("No data file found! Press F6 to get data file " "first.", 1) except BadZipfile: self.messagebox("Incorrect Data file! Press F6 to get a new data " "file first.", 1)
def refresh_info(self, refresh=0): """ Reload the data file information and show them on the main dialog. The information here includes both metadata and hosts module info from the data file. :param refresh: A flag indicating whether the information on main dialog needs to be reloaded or not. The value could be `0` or `1`. ======= ============= refresh operation ======= ============= 0 Do NOT reload 1 Reload ======= ============= :type refresh: int """ if refresh and RetrieveData.conn is not None: RetrieveData.clear() try: RetrieveData.unpack() RetrieveData.connect_db() self.set_func_list(refresh) self.refresh_func_list() self.set_info() except (BadZipfile, IOError, OSError): self.warning_incorrect_datafile()
def init_main(self): """ Set up the elements on the main dialog. Check the environment of current operating system and current session. * Load server list from a configuration file under working directory. * Try to load the hosts data file under working directory if it exists. .. note:: IF hosts data file does not exists correctly in current working directory, a warning message box would popup. And operations to change the hosts file on current system could be done only until a new data file has been downloaded. .. seealso:: Method :meth:`~tui.hostsutil.HostsUtil.__init__` in :class:`~tui.hostsutil.HostsUtil` class. """ self.ui.SelectMirror.clear() self.set_version() # Set mirrors self.mirrors = CommonUtil.set_network("network.conf") self.set_mirrors() # Read data file and set function list try: RetrieveData.unpack() RetrieveData.connect_db() self.set_func_list(1) self.refresh_func_list() self.set_info() except IOError: self.warning_no_datafile() except BadZipfile: self.warning_incorrect_datafile() # Check if current session have root privileges self.check_writable() self.init_flag += 1
def files_in_archive(self, force_refresh=False): if self._files_in_archive and not force_refresh: return self._files_in_archive self._files_in_archive = [] try: archive = ZipFile(self.src) except BadZipfile: e = get_exception() if e.args[0].lower().startswith('bad magic number'): # Python2.4 can't handle zipfiles with > 64K files. Try using # /usr/bin/unzip instead self._legacy_file_list(force_refresh) else: raise else: try: for member in archive.namelist(): if member not in self.excludes: self._files_in_archive.append(to_native(member)) except: archive.close() raise UnarchiveError('Unable to list files in the archive') archive.close() return self._files_in_archive
def processArchives(filename, file_filter): # Process zip file if required and return a list of files to process files_to_process = [] if filename.endswith('.zip'): try: zip_archive_filename = filename # Open the zip archive: zip_archive = zipfile.ZipFile(zip_archive_filename, "r") zipFileList = zip_archive.namelist() zip_archive.close() countTotalFiles = len(zipFileList) logger.info("Total files in %s: %d" % (zip_archive_filename, countTotalFiles)) logger.info("Hold on while we check the zipped files...") for zipped_filename in zipFileList: if re.match(file_filter, zipped_filename): files_to_process.append(os.path.join(zip_archive_filename, zipped_filename)) if len(files_to_process) == 0: logger.error("No valid files found!") except (IOError, zipfile.BadZipfile, struct.error), err: logger.error("Error reading zip archive: %s" % zip_archive_filename) exit(-1) else: files_to_process.append(filename) return files_to_process
def test_close_on_exception(self): """Check that the zipfile is closed if an exception is raised in the 'with' block.""" with zipfile.ZipFile(TESTFN2, "w") as zipfp: for fpath, fdata in SMALL_TEST_DATA: zipfp.writestr(fpath, fdata) try: with zipfile.ZipFile(TESTFN2, "r") as zipfp2: raise zipfile.BadZipfile() except zipfile.BadZipfile: self.assertTrue(zipfp2.fp is None, 'zipfp is not closed')
def test_close_erroneous_file(self): # This test checks that the ZipFile constructor closes the file object # it opens if there's an error in the file. If it doesn't, the # traceback holds a reference to the ZipFile object and, indirectly, # the file object. # On Windows, this causes the os.unlink() call to fail because the # underlying file is still open. This is SF bug #412214. # with open(TESTFN, "w") as fp: fp.write("this is not a legal zip file\n") try: zf = zipfile.ZipFile(TESTFN) except zipfile.BadZipfile: pass
def test_damaged_zipfile(self): """Check that zipfiles with missing bytes at the end raise BadZipFile.""" # - Create a valid zip file fp = io.BytesIO() with zipfile.ZipFile(fp, mode="w") as zipf: zipf.writestr("foo.txt", b"O, for a Muse of Fire!") zipfiledata = fp.getvalue() # - Now create copies of it missing the last N bytes and make sure # a BadZipFile exception is raised when we try to open it for N in range(len(zipfiledata)): fp = io.BytesIO(zipfiledata[:N]) self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, fp)
def test_empty_file_raises_BadZipFile(self): with open(TESTFN, 'w') as f: pass self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN) with open(TESTFN, 'w') as fp: fp.write("short file") self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN)