Python zipfile 模块,BadZipfile() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zipfile.BadZipfile()

项目:DCRM    作者:82Flex    | 项目源码 | 文件源码
def clean_zip_file(self):
        """Check that the uploaded ``zip_file`` is a valid, uncorrupted zip archive.

        The archive is opened here only to validate it; it is opened again
        afterwards, which duplicates a little work but keeps the code simple.

        Returns:
            The file object stored under ``self.cleaned_data['zip_file']``.

        Raises:
            forms.ValidationError: if the upload cannot be opened as a zip
                archive, or if one of its members fails the integrity check.
        """
        zip_file = self.cleaned_data['zip_file']
        try:
            archive = zipfile.ZipFile(zip_file)
        except BadZipFile as e:
            raise forms.ValidationError(str(e))
        try:
            bad_file = archive.testzip()
        except BadZipFile as e:
            # testzip() itself raises on a damaged member header; report it
            # as a validation problem instead of letting it escape.
            raise forms.ValidationError(str(e))
        finally:
            # Close the archive in all cases, including when testzip() raises.
            archive.close()
        if bad_file:
            raise forms.ValidationError('"%s" in the .zip archive is corrupt.' % bad_file)
        return zip_file
项目:Comictagger    作者:dickloraine    | 项目源码 | 文件源码
def readArchiveFile( self, archive_file ):
        """Return the contents of member ``archive_file`` from the zip at ``self.path``.

        Any failure while reading the member is reported on stderr and then
        surfaced as ``IOError``. The archive is closed in every case.
        """
        archive = zipfile.ZipFile( self.path, 'r' )
        try:
            return archive.read( archive_file )
        except Exception as err:
            # BadZipfile is an Exception subclass and is reported with the
            # same message, so a single handler covers both failure kinds.
            print >> sys.stderr, u"bad zipfile [{0}]: {1} :: {2}".format(err, self.path, archive_file)
            raise IOError
        finally:
            archive.close()
项目:fexum    作者:KDD-OpenSource    | 项目源码 | 文件源码
def put(self, request):
        """Create a Dataset from the first CSV member of an uploaded zip archive.

        The upload is read from ``request.FILES['file']``. The first member
        whose name ends with ``csv`` is stored as the dataset content, the
        feature-calculation task is queued, and the serialized dataset is
        returned.

        Raises:
            NotZIPFileError: no ``file`` was uploaded or it is not a zip archive.
            NoCSVInArchiveFoundError: the archive has no member ending in ``csv``.
        """
        try:
            archive = zipfile.ZipFile(request.FILES['file'])
        except (MultiValueDictKeyError, zipfile.BadZipfile):
            raise NotZIPFileError

        csv_members = [member for member in archive.namelist() if member.endswith('csv')]
        if not csv_members:
            raise NoCSVInArchiveFoundError
        csv_name = csv_members[0]

        with archive.open(csv_name) as member_handle:
            # Wrap the zip member in a Django File so the model can store it
            wrapped = File(member_handle)
            dataset = Dataset.objects.create(
                name=member_handle.name,
                content=wrapped,
                uploaded_by=request.user)

        # Start tasks for feature calculation
        initialize_from_dataset.delay(dataset_id=dataset.id)

        return Response(DatasetSerializer(instance=dataset).data)
项目:modman    作者:haihala    | 项目源码 | 文件源码
def __init__(self, manager, name, version=None):
        """Set up a mod record and, if a copy is installed, load its metadata.

        Args:
            manager: owning manager object, stored as ``self.manager``.
            name: mod name; the name ``"base"`` marks a pseudo mod that has
                no archive to inspect.
            version: required version, or ``None`` for the newest.

        Raises:
            CorruptedZipFile: the installed mod archive is not a valid zip.
            AssertionError: the archive contains no ``info.json``.
        """
        self.manager = manager
        self.name = name
        self.pseudo = (name == "base")
        self.required_version = version # None means newest
        self._releases = None
        self._installed_version = None
        self._exists = None

        if not self.pseudo and self.any_version_installed:
            try:
                with zipfile.ZipFile(mod_folder.file_path(self.name)) as zf:
                    # Take the last path component with [-1]: a member at the
                    # archive root has no "/" and [1] would raise IndexError.
                    info_json_candidates = [n for n in zf.namelist() if n.rsplit("/", 1)[-1] == "info.json"]
                    # Raised explicitly (instead of `assert`) so the check
                    # still runs under `python -O`; the exception type is
                    # unchanged for callers.
                    if not info_json_candidates:
                        raise AssertionError("Not a mod file")
                    with zf.open(info_json_candidates[0]) as f:
                        data = json.loads(f.read().decode())
            except zipfile.BadZipfile:
                raise CorruptedZipFile(mod_folder.file_path(self.name))

            self.title = data["title"]
            self._installed_version = data["version"]
            self._exists = True
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def _crc32(self, path):
        """Return the CRC recorded for archive member ``path``.

        The per-member CRC table ``self._infodict`` is filled on first use by
        listing the archive at ``self.src`` and is reused on later calls.

        Raises:
            UnarchiveError: the archive's member list could not be read.
            BadZipfile: the archive is invalid for a reason other than a
                "bad magic number".
            KeyError: ``path`` is not present in the CRC table.
        """
        if self._infodict:
            return self._infodict[path]

        try:
            archive = ZipFile(self.src)
        except BadZipfile:
            e = get_exception()
            if e.args[0].lower().startswith('bad magic number'):
                # Python2.4 can't handle zipfiles with > 64K files.  Try using
                # /usr/bin/unzip instead
                # NOTE(review): presumably _legacy_file_list() fills
                # self._infodict -- confirm against its definition.
                self._legacy_file_list()
            else:
                raise
        else:
            try:
                for item in archive.infolist():
                    self._infodict[item.filename] = long(item.CRC)
            except Exception:
                raise UnarchiveError('Unable to list files in the archive')
            finally:
                # Close on success as well as on failure.
                archive.close()

        return self._infodict[path]
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def check_read_with_bad_crc(self, compression):
        """Check that reading a member with a wrong CRC raises BadZipfile."""
        corrupt_bytes = self.zips_with_bad_crc[compression]

        def open_corrupt_archive():
            # Every scenario works on its own fresh in-memory archive.
            return zipfile.ZipFile(io.BytesIO(corrupt_bytes), mode="r")

        # Whole-member read via ZipFile.read()
        with open_corrupt_archive() as archive:
            with self.assertRaises(zipfile.BadZipfile):
                archive.read('afile')

        # Whole-member read via ZipExtFile.read()
        with open_corrupt_archive() as archive:
            with archive.open('afile', 'r') as member:
                with self.assertRaises(zipfile.BadZipfile):
                    member.read()

        # Small reads, in order to exercise the buffering logic
        with open_corrupt_archive() as archive:
            with archive.open('afile', 'r') as member:
                member.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipfile):
                    chunk = member.read(2)
                    while chunk:
                        chunk = member.read(2)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def check_read_with_bad_crc(self, compression):
        """Check that reading a member with a wrong CRC raises BadZipfile."""
        corrupt_bytes = self.zips_with_bad_crc[compression]

        def open_corrupt_archive():
            # Every scenario works on its own fresh in-memory archive.
            return zipfile.ZipFile(io.BytesIO(corrupt_bytes), mode="r")

        # Whole-member read via ZipFile.read()
        with open_corrupt_archive() as archive:
            with self.assertRaises(zipfile.BadZipfile):
                archive.read('afile')

        # Whole-member read via ZipExtFile.read()
        with open_corrupt_archive() as archive:
            with archive.open('afile', 'r') as member:
                with self.assertRaises(zipfile.BadZipfile):
                    member.read()

        # Small reads, in order to exercise the buffering logic
        with open_corrupt_archive() as archive:
            with archive.open('afile', 'r') as member:
                member.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipfile):
                    chunk = member.read(2)
                    while chunk:
                        chunk = member.read(2)
项目:glazier    作者:google    | 项目源码 | 文件源码
def Run(self):
    """Extract the zip file named by the first argument into the second.

    ``self._args[0]`` is the archive path and ``self._args[1]`` the output
    directory, which is created before extraction.

    Raises:
      ActionError: an argument is missing, the output path cannot be
        created, or the archive cannot be read.
    """
    try:
      zip_file = self._args[0]
      out_path = self._args[1]
    except IndexError:
      raise ActionError('Unable to determine desired paths from %s.' %
                        str(self._args))

    try:
      file_util.CreateDirectories(out_path)
    except file_util.Error:
      raise ActionError('Unable to create output path %s.' % out_path)

    try:
      # The context manager closes the archive even if extraction fails.
      with zipfile.ZipFile(zip_file) as zf:
        zf.extractall(out_path)
    except (IOError, zipfile.BadZipfile) as e:
      raise ActionError('Bad zip file given as input.  %s' % e)
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def verify(self, password=None):
        """Test the integrity of the zip archive at ``self.filename``.

        Args:
            password: password applied to the archive before testing, or
                ``None`` for no password.

        Raises:
            CRCError: a member failed the integrity test, or a RuntimeError
                unrelated to encryption occurred.
            ArchiveError: the file is not a valid zip, or needs ZIP64 support.
            PasswordError: the archive is encrypted or the password is wrong.
        """
        try:
            with zipfile.ZipFile(self.filename, 'r') as z:
                z.setpassword(password)
                badfile = z.testzip()
                if badfile is not None:
                    raise CRCError(badfile)

        # `except ... as e` is accepted by Python 2.6+ and Python 3; the
        # comma form is a syntax error on Python 3.
        except (zipfile.BadZipfile, zipfile.LargeZipFile) as e:
            raise ArchiveError(e)

        except RuntimeError as e:
            if "encrypted" in e.args[0] or "Bad password" in e.args[0]:
                raise PasswordError(e)
            else:
                raise CRCError(e)
项目:cuckoodroid-2.0    作者:idanr1986    | 项目源码 | 文件源码
def _extract(self, filename, password):
        """Extract the zip archive at ``filename`` into the temp directory.

        Args:
            filename: path of the archive; it is first passed through
                ``_prepare_archive_at_path``.
            password: password used for the first extraction attempt.

        Returns:
            The archive's member names, or ``None`` when
            ``_prepare_archive_at_path`` returns a falsy value.

        Raises:
            Exception: the archive is invalid, or extraction still raises
                ``RuntimeError`` after retrying with the password "infected".
        """
        archive_path = _prepare_archive_at_path(filename)
        if not archive_path:
            return None
        # Extraction.
        # Destination is the TEMP environment variable, falling back to /tmp.
        extract_path = environ.get("TEMP", "/tmp")
        with ZipFile(archive_path, "r") as archive:
            try:
                archive.extractall(path=extract_path, pwd=password)
            except BadZipfile:
                raise Exception("Invalid Zip file")
            # Try to extract it again, but with a default password
            except RuntimeError:
                try:
                    archive.extractall(path=extract_path, pwd="infected")
                except RuntimeError as err:
                    raise Exception("Unable to extract Zip file: %s" % err)
            finally:
                # NOTE(review): being in `finally`, this also runs when the
                # extraction above raised -- confirm that is intended.
                self._extract_nested_archives(archive, extract_path, password)
        # namelist() is called after the `with` block has closed the archive.
        return archive.namelist()
项目:cuckoodroid-2.0    作者:idanr1986    | 项目源码 | 文件源码
def _prepare_archive_at_path(filename):
    """Verify that there is a readable zip archive at the given path.

    Returns the path to use for the archive. In most cases that is
    ``filename`` itself, but if an archive named "foo.zip" contains a file
    named "foo" the archive is renamed first so it is not overwritten.
    Returns ``None`` when the file cannot be opened as a zip archive.
    """
    # Verify that the archive is actually readable; opening it is the whole
    # check, and the context manager closes the handle again.
    try:
        with ZipFile(filename, "r"):
            pass
    except BadZipfile:
        return None
    # Test if zip file contains a file named as itself
    if _is_overwritten(filename):
        # Adjacent literals instead of a backslash continuation, which used
        # to embed the source indentation in the logged message.
        log.debug("ZIP file contains a file with the same name, original is "
                  "going to be overwrite")
        # In this case we just change the file name
        new_zip_path = filename + _random_extension()
        move(filename, new_zip_path)
        filename = new_zip_path
    return filename
项目:Electrify    作者:jyapayne    | 项目源码 | 文件源码
def extract_files(self):
        """Extract the files of every enabled export setting.

        For each entry in ``self.settings['export_settings']`` whose value is
        truthy, the setting is extracted for the selected version and a dot
        is appended to the progress text. If extraction raises a tar or zip
        error, the setting's saved file is deleted when present and the
        error is logged and kept in ``self.extract_error``.

        Returns:
            Always ``True``.
        """
        self.extract_error = None
        download_dir = self.get_setting('download_dir').value
        version = self.selected_version()
        for setting in self.settings['export_settings'].values():
            save_file_path = setting.save_file_path(version, download_dir)
            if not setting.value:
                continue
            try:
                target = get_data_path('files/'+setting.name)
                setting.extract(target, version)
                self.progress_text += '.'
            except (tarfile.ReadError, zipfile.BadZipfile) as e:
                if os.path.exists(save_file_path):
                    os.remove(save_file_path)
                # cannot use GUI in thread to notify user. Save it for later
                self.extract_error = e
                self.logger.error(self.extract_error)
        self.progress_text = '\nDone.\n'
        return True
项目:AnkiHub    作者:dayjaby    | 项目源码 | 文件源码
def installZipFile(data, fname):
    """Install an add-on, given as raw bytes, into the ``addons`` folder.

    Args:
        data: file contents, either a single ``.py`` module or a zip archive.
        fname: file name; a name ending in ``.py`` is written as-is, anything
            else is treated as a zip archive.

    Returns:
        ``True`` on success, ``False`` if ``data`` is not a valid zip archive.
    """
    base = os.path.join(defaultBase(),'addons')
    if fname.endswith(".py"):
        path = os.path.join(base, fname)
        # Context manager so the handle is flushed and closed deterministically.
        with open(path, "wb") as f:
            f.write(data)
        return True
    # .zip file
    try:
        z = zipfile.ZipFile(io.BytesIO(data))
    except zipfile.BadZipfile:
        return False
    with z:
        for n in z.namelist():
            if n.endswith("/"):
                # folder; ignore
                continue
            # write
            z.extract(n, base)
    return True
项目:javapackages    作者:fedora-java    | 项目源码 | 文件源码
def _read_manifest(self):
        """Return the manifest text for ``self._path``, or ``None`` if there is none.

        ``self._path`` may point directly at a ``META-INF/MANIFEST.MF`` file
        or at a zip/jar archive containing one. The manifest is decoded as
        UTF-8.

        IOError and BadZipfile raised while opening or reading the archive
        are treated as "no manifest in the archive".
        """
        content = None
        if self._path.endswith("/META-INF/MANIFEST.MF"):
            with open(self._path, "rb") as mf:
                content = mf.read()
        if zipfile.is_zipfile(self._path):
            # looks like "zipfile.is_zipfile()" is not reliable
            # see rhbz#889131 for more details
            try:
                # Read inside the `with` so both the archive and the member
                # handle are closed before returning.
                with ZipFile(self._path) as jarfile:
                    if "META-INF/MANIFEST.MF" in jarfile.namelist():
                        content = jarfile.read("META-INF/MANIFEST.MF")
            except (IOError, BadZipfile):
                pass
        if content is None:
            return None
        return content.decode("utf-8")
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def check_read_with_bad_crc(self, compression):
        """Check that reading a member with a wrong CRC raises BadZipfile."""
        corrupt_bytes = self.zips_with_bad_crc[compression]

        def open_corrupt_archive():
            # Every scenario works on its own fresh in-memory archive.
            return zipfile.ZipFile(io.BytesIO(corrupt_bytes), mode="r")

        # Whole-member read via ZipFile.read()
        with open_corrupt_archive() as archive:
            with self.assertRaises(zipfile.BadZipfile):
                archive.read('afile')

        # Whole-member read via ZipExtFile.read()
        with open_corrupt_archive() as archive:
            with archive.open('afile', 'r') as member:
                with self.assertRaises(zipfile.BadZipfile):
                    member.read()

        # Small reads, in order to exercise the buffering logic
        with open_corrupt_archive() as archive:
            with archive.open('afile', 'r') as member:
                member.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipfile):
                    chunk = member.read(2)
                    while chunk:
                        chunk = member.read(2)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def check_read_with_bad_crc(self, compression):
        """Check that reading a member with a wrong CRC raises BadZipfile."""
        corrupt_bytes = self.zips_with_bad_crc[compression]

        def open_corrupt_archive():
            # Every scenario works on its own fresh in-memory archive.
            return zipfile.ZipFile(io.BytesIO(corrupt_bytes), mode="r")

        # Whole-member read via ZipFile.read()
        with open_corrupt_archive() as archive:
            with self.assertRaises(zipfile.BadZipfile):
                archive.read('afile')

        # Whole-member read via ZipExtFile.read()
        with open_corrupt_archive() as archive:
            with archive.open('afile', 'r') as member:
                with self.assertRaises(zipfile.BadZipfile):
                    member.read()

        # Small reads, in order to exercise the buffering logic
        with open_corrupt_archive() as archive:
            with archive.open('afile', 'r') as member:
                member.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipfile):
                    chunk = member.read(2)
                    while chunk:
                        chunk = member.read(2)
项目:algochecker-engine    作者:algochecker    | 项目源码 | 文件源码
def download_package_from_url(url, dest):
    """Download a package zip from ``url`` and extract it into ``dest``.

    The archive is first saved to a temporary path and removed again once
    extraction has been attempted, whether or not it succeeded.

    Raises:
        PackageLoadError: the server did not answer 200, the download failed
            with a RuntimeError/IOError, or the downloaded file is not a
            valid zip archive.
    """
    logging.info('Attempting to download missing package from: ' + url)
    tmp_path = internal_path("work/dl_package.zip")

    try:
        req = requests.get(url, stream=True, timeout=10)
        try:
            if req.status_code != 200:
                raise PackageLoadError('Package download failed, server said: {} {}'.format(req.status_code, req.reason))
            with open(tmp_path, 'wb') as tmp_file:
                req.raw.decode_content = True
                shutil.copyfileobj(req.raw, tmp_file)
        finally:
            # A streamed response keeps its connection until it is closed.
            req.close()
    except (RuntimeError, IOError) as e:
        raise PackageLoadError('Package download failed due to an error') from e

    logging.info('Extracting package...')
    try:
        with ZipFile(tmp_path, "r") as z:
            z.extractall(dest)
    except BadZipfile as e:
        raise PackageLoadError('Malformed package zip file') from e
    finally:
        # Remove the download even when it turned out to be malformed.
        os.remove(tmp_path)
项目:grade-oven    作者:mikelmcdaniel    | 项目源码 | 文件源码
def from_zip(cls, file_obj, stages_name, stages_root):
    """Unpack the zip in file_obj into os.path.join(stages_root, stages_name).

    The target directory is created, every member is checked and extracted,
    and an instance built from that directory is returned after each of its
    stages has saved its main script.

    Raises Error when a member is corrupt or the archive is invalid.
    """
    assignment_root = os.path.join(stages_root, stages_name)
    os.mkdir(assignment_root)
    try:
      with zipfile.ZipFile(file_obj, 'r') as archive:
        corrupt_member = archive.testzip()
        if corrupt_member is not None:
          raise Error('Corrupt file in zip: ' + corrupt_member)
        # TODO: Handle case where archive.namelist() uses a lot of memory
        for member in archive.namelist():
          archive.extract(member, assignment_root)
        # TODO: The stage.save_main_script() code below is used as a workaround
        # to ensure that the main script is executable. Ideally, file
        # permissions would be preserved.
        unpacked = cls(assignment_root)
        for stage in unpacked.stages.itervalues():
          stage.save_main_script()
        return unpacked
    except (zipfile.BadZipfile, zipfile.LargeZipFile) as err:
      raise Error(err)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_invalidHeader(self):
        """
        Overwriting one entry's header magic number makes readfile() raise
        BadZipfile for that entry, while the other entry stays readable.
        """
        archivePath = self.makeZipFile(["test contents",
                                        "more contents"])
        with zipfile.ZipFile(archivePath, "r") as pristine:
            firstHeaderAt = pristine.getinfo("0").header_offset
        # Overwrite only the first entry's four-byte signature.
        with open(archivePath, "r+b") as rawArchive:
            rawArchive.seek(firstHeaderAt, 0)
            rawArchive.write(b'0' * 4)
        with zipstream.ChunkingZipFile(archivePath) as chunked:
            self.assertRaises(zipfile.BadZipfile, chunked.readfile, "0")
            with chunked.readfile("1") as intactEntry:
                self.assertEqual(intactEntry.read(), b"more contents")
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_filenameMismatch(self):
        """
        An entry whose local header names a different file than the central
        directory does should make readfile() raise BadZipfile.
        """
        archivePath = self.makeZipFile([b"test contents",
                                        b"more contents"])
        with zipfile.ZipFile(archivePath, "r") as pristine:
            renamedEntry = pristine.getinfo("0")
            renamedEntry.filename = "not zero"
        # Rewrite the first local header so it carries the new name.
        with open(archivePath, "r+b") as rawArchive:
            rawArchive.seek(renamedEntry.header_offset, 0)
            rawArchive.write(renamedEntry.FileHeader())

        with zipstream.ChunkingZipFile(archivePath) as chunked:
            self.assertRaises(zipfile.BadZipfile, chunked.readfile, "0")
            with chunked.readfile("1") as intactEntry:
                self.assertEqual(intactEntry.read(), b"more contents")
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_unsupportedCompression(self):
        """
        An entry recorded with an unsupported compression method should make
        readfile() raise BadZipfile.
        """
        archivePath = self.mktemp()
        with zipfile.ZipFile(archivePath, "w") as writer:
            entry = zipfile.ZipInfo("0")
            writer.writestr(entry, "some data")
            # Mangle its compression type in the central directory; can't do
            # this before the writestr call or zipfile will (correctly) tell us
            # not to pass bad compression types :)
            entry.compress_type = 1234

        with zipstream.ChunkingZipFile(archivePath) as chunked:
            self.assertRaises(zipfile.BadZipfile, chunked.readfile, "0")
项目:pyload-plugins    作者:pyload    | 项目源码 | 文件源码
def verify(self, password=None):
        """Test the integrity of the zip archive at ``self.filename``.

        Args:
            password: password applied to the archive before testing, or
                ``None`` for no password.

        Raises:
            CRCError: a member failed the integrity test, or a RuntimeError
                unrelated to encryption occurred.
            ArchiveError: the file is not a valid zip, or needs ZIP64 support.
            PasswordError: the archive is encrypted or the password is wrong.
        """
        try:
            with zipfile.ZipFile(self.filename, 'r') as z:
                z.setpassword(password)
                badfile = z.testzip()
                if badfile is not None:
                    raise CRCError(badfile)

        # `except ... as e` is accepted by Python 2.6+ and Python 3; the
        # comma form is a syntax error on Python 3.
        except (zipfile.BadZipfile, zipfile.LargeZipFile) as e:
            raise ArchiveError(e)

        except RuntimeError as e:
            if "encrypted" in e.args[0] or "Bad password" in e.args[0]:
                raise PasswordError(e)
            else:
                raise CRCError(e)
项目:pymilter    作者:sdgathman    | 项目源码 | 文件源码
def check_name(msg,savname=None,ckname=check_ext,scan_zip=False):
  """Replace an attachment with a warning when one of its names is suspicious.

  Each (key, name) pair from msg.getnames(scan_zip) is passed to ckname until
  one is flagged.  When nothing is flagged the message is left untouched.
  Otherwise the payload is replaced with the virus warning text and the
  content headers are reset to a plain-text WARNING.TXT.  Milter.CONTINUE is
  returned in both cases.
  """
  try:
    flagged = None
    for key,name in msg.getnames(scan_zip):
      flagged = ckname(name)
      if flagged:
        break
    if not flagged:
      return Milter.CONTINUE
    # For a name found inside a zip, report the attachment's own filename.
    badname = msg.get_filename() if key == 'zipname' else flagged
  except zipfile.BadZipfile:
    # a ZIP that is not a zip is very suspicious
    badname = msg.get_filename()
  hostname = socket.gethostname()
  msg.set_payload(virus_msg % (badname,hostname,savname))
  for header in ("content-type", "content-disposition",
                 "content-transfer-encoding"):
    del msg[header]
  name = "WARNING.TXT"
  msg["Content-Type"] = "text/plain; name="+name
  return Milter.CONTINUE
项目:Security-Research    作者:RhinoSecurityLabs    | 项目源码 | 文件源码
def parsensout(cfdbpath):
    """Parse CrimeFlare's nsout.zip archive into the ``_nsdict`` dictionary.

    Every line of every archive member is split on whitespace. The last
    field is the domain; the fields before it are stored, space-joined, as
    the name-server value for that domain. Lines with fewer than three
    fields are skipped.

    Args:
        cfdbpath: directory that contains ``nsout.zip``.

    Raises:
        SystemExit: the archive is not a valid zip file.
    """
    try:
        with zipfile.ZipFile('{}/nsout.zip'.format(cfdbpath)) as znsout:
            for finfo in znsout.infolist():
                with znsout.open(finfo) as ifile:
                    for record in ifile:
                        fields = record.decode('utf-8').split()
                        if len(fields) < 3:
                            # Malformed line: skip it rather than reuse the
                            # values left over from the previous record.
                            continue
                        if len(fields) == 3:
                            NS1, NS2, DOMAIN = fields
                        else:
                            # pop() takes the last field itself; remove()
                            # would drop an earlier field with the same text.
                            DOMAIN = fields.pop()
                            NS1 = ' '.join(fields[:-2])
                            NS2 = ' '.join(fields[-2:])
                        _nsdict[DOMAIN] = '{} {}'.format(NS1, NS2)
    except zipfile.BadZipfile:
        print("[-] Bad checksum on downloaded archive. Try to update again.")
        raise SystemExit
项目:Security-Research    作者:RhinoSecurityLabs    | 项目源码 | 文件源码
def parsecountry(cfdbpath):
    """Parse CrimeFlare's country.zip archive into the ``_countrydict`` dictionary.

    Every line of every archive member is split on whitespace. The first
    field is the domain, the second is ignored, and the remaining fields,
    space-joined, are stored as that domain's country. Lines with fewer
    than three fields are skipped.

    Args:
        cfdbpath: directory that contains ``country.zip``.

    Raises:
        SystemExit: the archive is not a valid zip file.
    """
    try:
        with zipfile.ZipFile('{}/country.zip'.format(cfdbpath)) as zcountry:
            for finfo in zcountry.infolist():
                with zcountry.open(finfo) as ifile:
                    for record in ifile:
                        fields = record.decode('utf-8').split()
                        if len(fields) < 3:
                            continue
                        # Take the domain from this record every time, so a
                        # multi-word country is never filed under the
                        # previous record's domain.
                        DOMAIN = fields[0]
                        COUNTRY = ' '.join(fields[2:])
                        _countrydict[DOMAIN] = '{}'.format(COUNTRY)
    except zipfile.BadZipfile:
        print("[-] Bad checksum on downloaded archive. Try to update again.")
        raise SystemExit
项目:wp-file-analyser    作者:racwn    | 项目源码 | 文件源码
def unzip(zippedFile, outPath):
    """Extract all files from a zip archive to a destination directory.

    Args:
        zippedFile: path of the zip archive.
        outPath: directory to extract into.

    Returns:
        The first name listed in the archive (the toplevel directory name),
        or ``False`` if the archive is empty or could not be processed.
    """
    newDir = False  # the toplevel directory name in the zipfile
    fh = open_file(zippedFile, 'rb')
    with fh:
        try:
            with zipfile.ZipFile(fh) as z:
                namelist = z.namelist()
                # Guard the index: an empty archive has no first entry.
                if namelist:
                    newDir = namelist[0]
                for name in namelist:
                    z.extract(name, outPath)
        except RuntimeError as err:
            msg("Error processing zip (RuntimeError): %s" % (err), True)
            return False
        except IOError as ioe:
            msg("Error opening [%s]: %s" % (zippedFile, ioe.strerror), True)
            return False
        except zipfile.BadZipfile:
            msg("Bad zip file: %s" % zippedFile, True)
            return False
    return newDir
项目:pi_romulus    作者:ArthurMoore85    | 项目源码 | 文件源码
def unzip(self, file_obj):
        """
        Unzips a ZIP archive into ``self.target_dir``.

        If the archive is not a valid zip file, ``self.clean_up`` is called
        with the path formed by joining ``self.target_dir`` and ``file_obj``.

        :param file_obj: file to be unzipped
        """
        try:
            # The context manager closes the archive even if extraction fails.
            with zipfile.ZipFile(file_obj, 'r') as zip_ref:
                zip_ref.extractall(self.target_dir)
        except zipfile.BadZipfile:
            self.clean_up(os.path.join(self.target_dir, file_obj))
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def handle_application_zip(self, headers, fileobj):
        """Parse CSV data found in a ZIP attachment.

        The attachment is decoded and opened as a zip archive. The loop body
        reads a member, hands it to ``self.parse_csv`` and passes the result
        to ``idiokit.stop``. A bad archive is logged and ``idiokit.stop(False)``
        is called.
        """
        self.log.info("Opening a ZIP attachment")
        decoded = self._decode(headers, fileobj)
        try:
            archive = zipfile.ZipFile(decoded)
        except zipfile.BadZipfile as error:
            self.log.error("ZIP handling failed ({0})".format(error))
            idiokit.stop(False)

        for member_name in archive.namelist():
            member_data = StringIO(archive.read(member_name))

            self.log.info("Parsing CSV data from the ZIP attachment")
            parsed = yield self.parse_csv(headers, member_name, member_data)
            idiokit.stop(parsed)
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def handle_application_zip(self, msg):
        """Parse CSV data found in a ZIP attachment.

        The decoded payload of ``msg`` is opened as a zip archive. The loop
        body opens a member, hands it to ``self.parse_csv`` and passes the
        result to ``idiokit.stop``. A bad archive is logged and
        ``idiokit.stop(False)`` is called.
        """
        self.log.info("Opening a ZIP attachment")
        payload = yield msg.get_payload(decode=True)
        try:
            archive = zipfile.ZipFile(StringIO(payload))
        except zipfile.BadZipfile as error:
            self.log.error("ZIP handling failed ({0})".format(error))
            idiokit.stop(False)

        for member_name in archive.namelist():
            member_handle = archive.open(member_name)

            self.log.info("Parsing CSV data from the ZIP attachment")
            parsed = yield self.parse_csv(member_name, member_handle)
            idiokit.stop(parsed)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def readfile(self, name):
        """Return file-like object for name.

        The local file header of the entry is read and checked against the
        central directory before an entry object positioned at the member's
        data is returned.

        Raises:
            RuntimeError: the archive mode is not "r" or "a", the archive is
                already closed, or the entry is deflated and zlib is missing.
            zipfile.BadZipfile: the header magic number is wrong, the header
                file name differs from the directory, or the compression
                method is unsupported.
        """
        # Exceptions are raised with call syntax, which is valid on both
        # Python 2 and Python 3 (the `raise Cls, "msg"` form is 2-only).
        if self.mode not in ("r", "a"):
            raise RuntimeError('read() requires mode "r" or "a"')
        if not self.fp:
            raise RuntimeError(
                "Attempt to read ZIP archive that was already closed")
        zinfo = self.getinfo(name)

        self.fp.seek(zinfo.header_offset, 0)

        # Skip the file header:
        fheader = self.fp.read(30)
        if fheader[0:4] != zipfile.stringFileHeader:
            raise zipfile.BadZipfile("Bad magic number for file header")

        fheader = struct.unpack(zipfile.structFileHeader, fheader)
        fname = self.fp.read(fheader[zipfile._FH_FILENAME_LENGTH])
        if fheader[zipfile._FH_EXTRA_FIELD_LENGTH]:
            self.fp.read(fheader[zipfile._FH_EXTRA_FIELD_LENGTH])

        if fname != zinfo.orig_filename:
            raise zipfile.BadZipfile(
                'File name in directory "%s" and header "%s" differ.' % (
                    zinfo.orig_filename, fname))

        if zinfo.compress_type == zipfile.ZIP_STORED:
            return ZipFileEntry(self.fp, zinfo.compress_size)
        elif zinfo.compress_type == zipfile.ZIP_DEFLATED:
            if not zlib:
                raise RuntimeError(
                    "De-compression requires the (missing) zlib module")
            return DeflatedZipFileEntry(self.fp, zinfo.compress_size)
        else:
            raise zipfile.BadZipfile(
                "Unsupported compression method %d for file %s" %
                (zinfo.compress_type, name))
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def read(self, name):
        """Return file bytes (as a string) for name.

        The whole member is read through ``self.readfile`` and its CRC-32 is
        checked against the value recorded for the entry.

        Raises:
            zipfile.BadZipfile: the computed CRC-32 does not match.
        """
        f = self.readfile(name)
        zinfo = self.getinfo(name)
        data = f.read()
        # Mask to an unsigned 32-bit value: binascii.crc32 can return a
        # negative number on Python 2, which would never equal an unsigned
        # stored CRC.
        crc = binascii.crc32(data) & 0xffffffff
        if crc != (zinfo.CRC & 0xffffffff):
            raise zipfile.BadZipfile("Bad CRC-32 for file %s" % name)
        return data
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def unzip(f):
    """Extract the zip archive at path ``f`` into the current directory.

    Returns a status message: success, extraction failure for an invalid
    archive, or file not found.
    """
    if not os.path.isfile(f):
        return 'Error: File not found.'
    try:
        with zipfile.ZipFile(f) as archive:
            archive.extractall('.')
    except zipfile.BadZipfile:
        return 'Error: Failed to unzip file.'
    return 'File {} extracted.'.format(f)
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def unzip(f):
    """Extract the zip archive at path ``f`` into the current directory.

    Returns a prefixed status message: success, extraction failure for an
    invalid archive, or file not found.
    """
    if not os.path.isfile(f):
        return '[-] Error: File not found.'
    try:
        with zipfile.ZipFile(f) as archive:
            archive.extractall('.')
    except zipfile.BadZipfile:
        return '[!] Error: Failed to unzip file.'
    return '[*] File {} extracted.'.format(f)
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def unzip(f):
    """Extract the zip archive at path ``f`` into the current directory.

    Returns a status message: success, extraction failure for an invalid
    archive, or file not found.
    """
    if not os.path.isfile(f):
        return 'Error: File not found.'
    try:
        with zipfile.ZipFile(f) as archive:
            archive.extractall('.')
    except zipfile.BadZipfile:
        return 'Error: Failed to unzip file.'
    return 'File {} extracted.'.format(f)
项目:jcchess    作者:johncheetham    | 项目源码 | 文件源码
def archive_context(filename):
    """
    Extract filename into a fresh temporary directory, then yield with the
    first entry of that directory as the current working directory.

    Afterwards the previous working directory is restored and the temporary
    directory is deleted.  An invalid archive re-raises BadZipfile with an
    explanatory message appended to its args.
    """
    workdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', workdir)
    previous_cwd = os.getcwd()
    try:
        os.chdir(workdir)
        try:
            with ContextualZipFile(filename) as archive:
                archive.extractall()
        except zipfile.BadZipfile as err:
            # Keep any existing args (or an empty placeholder) and append
            # the friendlier explanation.
            hint = MEANINGFUL_INVALID_ZIP_ERR_MSG.format(filename)
            err.args = (err.args or ('', )) + (hint, )
            raise

        # going in the directory
        unpacked = os.path.join(workdir, os.listdir(workdir)[0])
        os.chdir(unpacked)
        log.warn('Now working in %s', unpacked)
        yield

    finally:
        os.chdir(previous_cwd)
        shutil.rmtree(workdir)
项目:whoseline    作者:bmorris3    | 项目源码 | 文件源码
def archive_context(filename):
    """
    Extract filename into a fresh temporary directory, then yield with the
    first entry of that directory as the current working directory.

    Afterwards the previous working directory is restored and the temporary
    directory is deleted.  An invalid archive re-raises BadZipfile with an
    explanatory message appended to its args.
    """
    workdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', workdir)
    previous_cwd = os.getcwd()
    try:
        os.chdir(workdir)
        try:
            with ContextualZipFile(filename) as archive:
                archive.extractall()
        except zipfile.BadZipfile as err:
            # Keep any existing args (or an empty placeholder) and append
            # the friendlier explanation.
            hint = MEANINGFUL_INVALID_ZIP_ERR_MSG.format(filename)
            err.args = (err.args or ('', )) + (hint, )
            raise

        # going in the directory
        unpacked = os.path.join(workdir, os.listdir(workdir)[0])
        os.chdir(unpacked)
        log.warn('Now working in %s', unpacked)
        yield

    finally:
        os.chdir(previous_cwd)
        shutil.rmtree(workdir)
项目:dust_extinction    作者:karllark    | 项目源码 | 文件源码
def archive_context(filename):
    """
    Extract filename into a fresh temporary directory, then yield with the
    first entry of that directory as the current working directory.

    Afterwards the previous working directory is restored and the temporary
    directory is deleted.  An invalid archive re-raises BadZipfile with an
    explanatory message appended to its args.
    """
    workdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', workdir)
    previous_cwd = os.getcwd()
    try:
        os.chdir(workdir)
        try:
            with ContextualZipFile(filename) as archive:
                archive.extractall()
        except zipfile.BadZipfile as err:
            # Keep any existing args (or an empty placeholder) and append
            # the friendlier explanation.
            hint = MEANINGFUL_INVALID_ZIP_ERR_MSG.format(filename)
            err.args = (err.args or ('', )) + (hint, )
            raise

        # going in the directory
        unpacked = os.path.join(workdir, os.listdir(workdir)[0])
        os.chdir(unpacked)
        log.warn('Now working in %s', unpacked)
        yield

    finally:
        os.chdir(previous_cwd)
        shutil.rmtree(workdir)
项目:thejoker    作者:adrn    | 项目源码 | 文件源码
def archive_context(filename):
    """
    Extract filename into a fresh temporary directory, then yield with the
    first entry of that directory as the current working directory.

    Afterwards the previous working directory is restored and the temporary
    directory is deleted.  An invalid archive re-raises BadZipfile with an
    explanatory message appended to its args.
    """
    workdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', workdir)
    previous_cwd = os.getcwd()
    try:
        os.chdir(workdir)
        try:
            with ContextualZipFile(filename) as archive:
                archive.extractall()
        except zipfile.BadZipfile as err:
            # Keep any existing args (or an empty placeholder) and append
            # the friendlier explanation.
            hint = MEANINGFUL_INVALID_ZIP_ERR_MSG.format(filename)
            err.args = (err.args or ('', )) + (hint, )
            raise

        # going in the directory
        unpacked = os.path.join(workdir, os.listdir(workdir)[0])
        os.chdir(unpacked)
        log.warn('Now working in %s', unpacked)
        yield

    finally:
        os.chdir(previous_cwd)
        shutil.rmtree(workdir)
项目:AshsSDK    作者:thehappydinoa    | 项目源码 | 文件源码
def _should_contain_zip_content(value):
    """Raise ``ValueError(ERROR_MSG)`` unless ``value`` opens as a zip archive."""
    # A non-bytes value is almost certainly not zip content, but it is
    # still encoded and checked rather than rejected outright, to be
    # absolutely sure.
    raw = value if isinstance(value, bytes) else value.encode('utf-8')
    try:
        with closing(zipfile.ZipFile(six.BytesIO(raw))) as archive:
            archive.infolist()
    except zipfile.BadZipfile:
        raise ValueError(ERROR_MSG)
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def _extract_zip(self, file_obj, path):
            """Extract the zip archive in ``file_obj`` into ``path``.

            Raises ``self.BadArchive`` when ``file_obj`` is not a valid zip.
            """
            try:
                archive = zipfile.ZipFile(file_obj)
                try:
                    archive.extractall(path)
                finally:
                    archive.close()
            except zipfile.BadZipfile:
                raise self.BadArchive
项目:huhamhire-hosts    作者:jiangsile    | 项目源码 | 文件源码
def __init__(self):
        """
        Initialize a new TUI session.

        * Load the server list from a configuration file under the working
          directory.
        * Try to load the hosts data file under the working directory if it
          exists.

        .. note:: If the hosts data file is missing or invalid in the current
            working directory, a warning message box pops up, and operations
            that change the hosts file on the current system cannot be done
            until a new data file has been downloaded.

        .. seealso:: :meth:`~tui.curses_d.CursesDaemon.session_daemon` method
            in :class:`~tui.curses_d.CursesDaemon`.

        .. seealso:: :meth:`~gui.hostsutil.HostsUtil.init_main` in
            :class:`~gui.hostsutil.HostsUtil` class.
        """
        super(HostsUtil, self).__init__()
        # Set mirrors: store whatever CommonUtil.set_network() builds from
        # "network.conf" into the settings table.
        self.settings[0][2] = CommonUtil.set_network("network.conf")
        # Read data file and set function list. The whole sequence shares one
        # try block so that a failure at any step is reported to the user.
        try:
            self.set_platform()
            RetrieveData.unpack()
            RetrieveData.connect_db()
            self.set_info()
            self.set_func_list()
        except IOError:
            # Reported to the user as a missing data file.
            self.messagebox("No data file found! Press F6 to get data file "
                            "first.", 1)
        except BadZipfile:
            # Reported to the user as an invalid data file.
            # NOTE(review): presumably raised by RetrieveData.unpack() --
            # confirm against its implementation.
            self.messagebox("Incorrect Data file! Press F6 to get a new data "
                            "file first.", 1)
项目:huhamhire-hosts    作者:jiangsile    | 项目源码 | 文件源码
def refresh_info(self, refresh=0):
        """
        Reload the data file information and show it on the main dialog. The
        information here includes both metadata and hosts module info from the
        data file.

        :param refresh: A flag indicating whether the information on main
            dialog needs to be reloaded or not. The value could be `0` or `1`.

                =======  =============
                refresh  operation
                =======  =============
                0        Do NOT reload
                1        Reload
                =======  =============

        :type refresh: int
        """
        # Only call RetrieveData.clear() when a reload was requested and
        # RetrieveData.conn is currently set.
        if refresh and RetrieveData.conn is not None:
            RetrieveData.clear()
        try:
            RetrieveData.unpack()
            RetrieveData.connect_db()
            # The refresh flag is forwarded unchanged to set_func_list().
            self.set_func_list(refresh)
            self.refresh_func_list()
            self.set_info()
        except (BadZipfile, IOError, OSError):
            # All three failure types get the same "incorrect data file"
            # warning here.
            self.warning_incorrect_datafile()
项目:huhamhire-hosts    作者:jiangsile    | 项目源码 | 文件源码
def init_main(self):
        """
        Set up the elements on the main dialog. Check the environment of
        current operating system and current session.

        * Load the server list from a configuration file under the working
          directory.
        * Try to load the hosts data file under the working directory if it
          exists.

        .. note:: If the hosts data file is missing or invalid in the current
            working directory, a warning message box pops up, and operations
            that change the hosts file on the current system cannot be done
            until a new data file has been downloaded.

        .. seealso:: Method :meth:`~tui.hostsutil.HostsUtil.__init__` in
            :class:`~tui.hostsutil.HostsUtil` class.
        """
        # Start from an empty mirror selector before it is repopulated below.
        self.ui.SelectMirror.clear()
        self.set_version()
        # Set mirrors
        self.mirrors = CommonUtil.set_network("network.conf")
        self.set_mirrors()
        # Read data file and set function list
        try:
            RetrieveData.unpack()
            RetrieveData.connect_db()
            self.set_func_list(1)
            self.refresh_func_list()
            self.set_info()
        except IOError:
            # Reported to the user as a missing data file.
            self.warning_no_datafile()
        except BadZipfile:
            # Reported to the user as an invalid data file.
            self.warning_incorrect_datafile()
        # Check if current session have root privileges
        self.check_writable()
        # Incremented on every call, whether or not the data file loaded.
        self.init_flag += 1
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def files_in_archive(self, force_refresh=False):
        """Return the member names of the zip archive at ``self.src``.

        The result is cached on ``self._files_in_archive``; a non-empty cache
        is returned as-is unless ``force_refresh`` is true.  Names listed in
        ``self.excludes`` are left out, and every kept name is passed through
        ``to_native``.

        :param force_refresh: re-read the archive even if a cached list exists.
        :returns: the (cached) list of member names.
        :raises BadZipfile: when the archive cannot be opened and the error is
            not the "bad magic number" case handled by the legacy fallback.
        :raises UnarchiveError: when listing the opened archive fails.
        """
        if self._files_in_archive and not force_refresh:
            return self._files_in_archive

        self._files_in_archive = []
        try:
            archive = ZipFile(self.src)
        except BadZipfile:
            e = get_exception()
            # Guard against an exception raised without a message so the
            # original BadZipfile is re-raised instead of an IndexError.
            if e.args and e.args[0].lower().startswith('bad magic number'):
                # Python2.4 can't handle zipfiles with > 64K files.  Try using
                # /usr/bin/unzip instead
                self._legacy_file_list(force_refresh)
            else:
                raise
        else:
            # Nested try/finally (rather than try/except/finally) keeps this
            # valid on the very old interpreters this code still caters for.
            try:
                try:
                    for member in archive.namelist():
                        if member not in self.excludes:
                            self._files_in_archive.append(to_native(member))
                except Exception:
                    # Only real errors are translated; KeyboardInterrupt and
                    # SystemExit propagate untouched.
                    raise UnarchiveError('Unable to list files in the archive')
            finally:
                # Single close for both the success and the failure path.
                archive.close()
        return self._files_in_archive
项目:appcompatprocessor    作者:mbevilacqua    | 项目源码 | 文件源码
def processArchives(filename, file_filter):
    """Return the list of paths that should be processed for ``filename``.

    * If ``filename`` does not end in ``.zip`` the result is ``[filename]``.
    * Otherwise the archive's member names are read and every name for which
      ``re.match(file_filter, name)`` succeeds is returned as
      ``os.path.join(filename, name)``.

    :param filename: path of a plain file or of a ``.zip`` archive.
    :param file_filter: regular expression applied with ``re.match`` to each
        member name of the archive.
    :returns: list of paths to process (may be empty for an archive with no
        matching members, in which case an error is logged).
    :raises SystemExit: with status ``-1`` when the archive cannot be read
        (``IOError``, ``zipfile.BadZipfile`` or ``struct.error``).
    """
    if not filename.endswith('.zip'):
        return [filename]

    files_to_process = []
    zip_archive_filename = filename
    try:
        # Open the zip archive; the handle is closed even if listing fails.
        zip_archive = zipfile.ZipFile(zip_archive_filename, "r")
        try:
            zipFileList = zip_archive.namelist()
        finally:
            zip_archive.close()
        countTotalFiles = len(zipFileList)
        logger.info("Total files in %s: %d" % (zip_archive_filename, countTotalFiles))
        logger.info("Hold on while we check the zipped files...")

        files_to_process = [
            os.path.join(zip_archive_filename, zipped_filename)
            for zipped_filename in zipFileList
            if re.match(file_filter, zipped_filename)
        ]
        if not files_to_process:
            logger.error("No valid files found!")
    except (IOError, zipfile.BadZipfile, struct.error):
        logger.error("Error reading zip archive: %s" % zip_archive_filename)
        # Same effect as exit(-1) without depending on the `site` helper.
        raise SystemExit(-1)
    return files_to_process
项目:fiasco    作者:wtbarnes    | 项目源码 | 文件源码
def archive_context(filename):
    """
    Unpack the zip archive *filename* inside a freshly created temporary
    directory, then yield once with the cwd set to the first entry of that
    directory.

    Whether the body finishes normally or raises, the original working
    directory is restored and the temporary directory is removed.  A
    ``zipfile.BadZipfile`` from the extraction is re-raised after an extra
    explanatory message has been appended to its ``args``.

    NOTE(review): only the generator is visible here; it is presumably
    decorated with ``contextlib.contextmanager`` where it is defined --
    confirm.
    """
    workdir = tempfile.mkdtemp()
    log.warn('Extracting in %s', workdir)
    saved_cwd = os.getcwd()
    try:
        os.chdir(workdir)
        try:
            with ContextualZipFile(filename) as bundle:
                bundle.extractall()
        except zipfile.BadZipfile as failure:
            # Preserve existing args (or use an empty placeholder) and add
            # the readable explanation as the final element.
            explanation = MEANINGFUL_INVALID_ZIP_ERR_MSG.format(filename)
            failure.args = (failure.args or ('',)) + (explanation,)
            raise

        # Move into the first entry created by the extraction.
        leading_entry = os.listdir(workdir)[0]
        extracted_root = os.path.join(workdir, leading_entry)
        os.chdir(extracted_root)
        log.warn('Now working in %s', extracted_root)
        yield

    finally:
        os.chdir(saved_cwd)
        shutil.rmtree(workdir)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_close_on_exception(self):
        """Check that the zipfile is closed if an exception is raised in the
        'with' block."""
        # Write a small archive that the second half of the test reopens.
        with zipfile.ZipFile(TESTFN2, "w") as writer:
            for member_name, member_data in SMALL_TEST_DATA:
                writer.writestr(member_name, member_data)

        try:
            with zipfile.ZipFile(TESTFN2, "r") as reader:
                # Leave the 'with' block through an exception on purpose.
                raise zipfile.BadZipfile()
        except zipfile.BadZipfile:
            # A closed ZipFile no longer holds its underlying file object.
            self.assertTrue(reader.fp is None, 'zipfp is not closed')
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_close_erroneous_file(self):
        # Regression check for SF bug #412214: when the ZipFile constructor
        # fails on a malformed file it must close the file object it opened.
        # Otherwise the traceback keeps the ZipFile (and, through it, the
        # open file) alive, and on Windows a later os.unlink() of the path
        # fails because the file is still open.
        not_a_zip = "this is not a legal zip file\n"
        with open(TESTFN, "w") as bogus:
            bogus.write(not_a_zip)
        try:
            zf = zipfile.ZipFile(TESTFN)
        except zipfile.BadZipfile:
            # Expected: the constructor rejects the file.
            pass
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_damaged_zipfile(self):
        """Check that zipfiles with missing bytes at the end raise BadZipFile."""
        # Serialize one small, valid archive entirely in memory.
        pristine = io.BytesIO()
        with zipfile.ZipFile(pristine, mode="w") as archive:
            archive.writestr("foo.txt", b"O, for a Muse of Fire!")
        intact_bytes = pristine.getvalue()

        # Every strict prefix of the archive (including the empty one) has
        # lost bytes from its tail and must be rejected on open.
        for kept in range(len(intact_bytes)):
            truncated = io.BytesIO(intact_bytes[:kept])
            with self.assertRaises(zipfile.BadZipfile):
                zipfile.ZipFile(truncated)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_empty_file_raises_BadZipFile(self):
        # A zero-length file: opening it as a zip archive must raise.
        with open(TESTFN, 'w') as f:
            pass
        self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN)

        # Same expectation for a file holding only a few bytes of plain text.
        with open(TESTFN, 'w') as fp:
            fp.write("short file")
        self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN)