Python zipfile 模块,BadZipFile() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zipfile.BadZipFile()

项目:puresec-cli    作者:puresec    | 项目源码 | 文件源码
def get_function_root(self, name):
        if not hasattr(self, 'functions_output'):
            self.functions_output = TemporaryDirectory("puresec-serverless-functions-")

        package_name = self._get_function_package_name(name)
        function_root = os.path.join(self.functions_output.name, package_name)
        if os.path.exists(function_root):
            return function_root

        try:
            zipfile = ZipFile(os.path.join(self.serverless_package, "{}.zip".format(package_name)), 'r')
        except FileNotFoundError:
            eprint("error: serverless package did not create a function zip for '{}'", name)
            raise SystemExit(2)
        except BadZipFile:
            eprint("error: serverless package did not create a valid function zip for '{}'", name)
            raise SystemExit(2)

        with zipfile:
            zipfile.extractall(function_root)
        return function_root
项目:DCRM    作者:82Flex    | 项目源码 | 文件源码
def clean_zip_file(self):
        """Open the zip file a first time, to check that it is a valid zip archive.
        We'll open it again in a moment, so we have some duplication, but let's focus
        on keeping the code easier to read!
        """
        zip_file = self.cleaned_data['zip_file']
        try:
            zip = zipfile.ZipFile(zip_file)
        except BadZipFile as e:
            raise forms.ValidationError(str(e))
        bad_file = zip.testzip()
        if bad_file:
            zip.close()
            raise forms.ValidationError('"%s" in the .zip archive is corrupt.' % bad_file)
        zip.close()  # Close file in all cases.
        return zip_file
项目:ava-website    作者:ava-project    | 项目源码 | 文件源码
def get_manifest(self, archive):
        try:
            with ZipFile(archive.temporary_file_path()) as plugin:
                print(plugin.namelist())
                prefix = self.get_prefix(plugin)
                prefix = prefix + '/' if len(prefix) else ''
                with plugin.open('{}manifest.json'.format(prefix)) as myfile:
                    manifest = json.loads(myfile.read())
                validate_manifest(manifest)
                return manifest
        except BadZipFile:
            raise ValidationError('Bad .zip format')
        except FileNotFoundError:
            raise ValidationError('Error with upload, please try again')
        except KeyError:
            raise ValidationError('No manifest.json found in archive')
        except json.JSONDecodeError:
            raise ValidationError('Error with manifest.json, bad Json Format')
        except avasdk.exceptions.ValidationError as e:
            raise ValidationError('Error in manifest.json ({})'.format(e))
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def get_zip_filesize(filepath: str) -> int:
    try:
        my_zip = zipfile.ZipFile(filepath, 'r')
    except zipfile.BadZipFile:
        return -1

    info_list = my_zip.infolist()
    total_size = 0

    for info in info_list:
        if not info.filename.lower().endswith(
            ('.jpeg', '.jpg', '.png', '.gif')
        ):
            continue
        if '__macosx' in info.filename.lower():
            continue
        total_size += int(info.file_size)

    my_zip.close()

    return total_size
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def get_zip_fileinfo(filepath: str) -> Tuple[int, int]:
    try:
        my_zip = zipfile.ZipFile(filepath, 'r')
    except zipfile.BadZipFile:
        return -1, -1

    total_size = 0
    total_count = 0

    for info in my_zip.infolist():
        if not info.filename.lower().endswith(
                ('.jpeg', '.jpg', '.png', '.gif')
        ):
            continue
        if '__macosx' in info.filename.lower():
            continue
        total_size += int(info.file_size)
        total_count += 1

    my_zip.close()

    return total_size, total_count
项目:digger-build-cli    作者:aerogear    | 项目源码 | 文件源码
def from_zip(cls, src='/tmp/app.zip', dest='/app'):
    """
    Unzips a zipped app project file and instantiates it.

    :param src: zipfile path
    :param dest: destination folder to extract the zipfile content

    Returns
      A project instance.
    """
    try:
      zf = zipfile.ZipFile(src, 'r')
    except FileNotFoundError:
      raise errors.InvalidPathError(src)
    except zipfile.BadZipFile:
      raise errors.InvalidZipFileError(src)
    [zf.extract(file, dest) for file in zf.namelist()]
    zf.close()
    return cls.from_path(dest)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def check_read_with_bad_crc(self, compression):
        """Tests that files with bad CRCs raise a BadZipFile exception when read."""
        zipdata = self.zips_with_bad_crc[compression]

        # Using ZipFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile')

        # Using ZipExtFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                self.assertRaises(zipfile.BadZipFile, corrupt_file.read)

        # Same with small reads (in order to exercise the buffering logic)
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                corrupt_file.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipFile):
                    while corrupt_file.read(2):
                        pass
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_empty_zipfile(self):
        # Check that creating a file in 'w' or 'a' mode and closing without
        # adding any files to the archives creates a valid empty ZIP file
        zipf = zipfile.ZipFile(TESTFN, mode="w")
        zipf.close()
        try:
            zipf = zipfile.ZipFile(TESTFN, mode="r")
        except zipfile.BadZipFile:
            self.fail("Unable to create empty ZIP file in 'w' mode")

        zipf = zipfile.ZipFile(TESTFN, mode="a")
        zipf.close()
        try:
            zipf = zipfile.ZipFile(TESTFN, mode="r")
        except:
            self.fail("Unable to create empty ZIP file in 'a' mode")
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_empty_zipfile(self):
        # Check that creating a file in 'w' or 'a' mode and closing without
        # adding any files to the archives creates a valid empty ZIP file
        zipf = zipfile.ZipFile(TESTFN, mode="w")
        zipf.close()
        try:
            zipf = zipfile.ZipFile(TESTFN, mode="r")
        except zipfile.BadZipFile:
            self.fail("Unable to create empty ZIP file in 'w' mode")

        zipf = zipfile.ZipFile(TESTFN, mode="a")
        zipf.close()
        try:
            zipf = zipfile.ZipFile(TESTFN, mode="r")
        except:
            self.fail("Unable to create empty ZIP file in 'a' mode")
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_read_with_bad_crc(self):
        """Tests that files with bad CRCs raise a BadZipFile exception when read."""
        zipdata = self.zip_with_bad_crc

        # Using ZipFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile')

        # Using ZipExtFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                self.assertRaises(zipfile.BadZipFile, corrupt_file.read)

        # Same with small reads (in order to exercise the buffering logic)
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                corrupt_file.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipFile):
                    while corrupt_file.read(2):
                        pass
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_empty_zipfile(self):
        # Check that creating a file in 'w' or 'a' mode and closing without
        # adding any files to the archives creates a valid empty ZIP file
        zipf = zipfile.ZipFile(TESTFN, mode="w")
        zipf.close()
        try:
            zipf = zipfile.ZipFile(TESTFN, mode="r")
        except zipfile.BadZipFile:
            self.fail("Unable to create empty ZIP file in 'w' mode")

        zipf = zipfile.ZipFile(TESTFN, mode="a")
        zipf.close()
        try:
            zipf = zipfile.ZipFile(TESTFN, mode="r")
        except:
            self.fail("Unable to create empty ZIP file in 'a' mode")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_read_with_bad_crc(self):
        """Tests that files with bad CRCs raise a BadZipFile exception when read."""
        zipdata = self.zip_with_bad_crc

        # Using ZipFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile')

        # Using ZipExtFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                self.assertRaises(zipfile.BadZipFile, corrupt_file.read)

        # Same with small reads (in order to exercise the buffering logic)
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                corrupt_file.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipFile):
                    while corrupt_file.read(2):
                        pass
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_empty_zipfile(self):
        # Check that creating a file in 'w' or 'a' mode and closing without
        # adding any files to the archives creates a valid empty ZIP file
        zipf = zipfile.ZipFile(TESTFN, mode="w")
        zipf.close()
        try:
            zipf = zipfile.ZipFile(TESTFN, mode="r")
        except zipfile.BadZipFile:
            self.fail("Unable to create empty ZIP file in 'w' mode")

        zipf = zipfile.ZipFile(TESTFN, mode="a")
        zipf.close()
        try:
            zipf = zipfile.ZipFile(TESTFN, mode="r")
        except:
            self.fail("Unable to create empty ZIP file in 'a' mode")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_read_with_bad_crc(self):
        """Tests that files with bad CRCs raise a BadZipFile exception when read."""
        zipdata = self.zip_with_bad_crc

        # Using ZipFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile')

        # Using ZipExtFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                self.assertRaises(zipfile.BadZipFile, corrupt_file.read)

        # Same with small reads (in order to exercise the buffering logic)
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                corrupt_file.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipFile):
                    while corrupt_file.read(2):
                        pass
项目:guesslang    作者:yoeo    | 项目源码 | 文件源码
def _unzip(files, destination):
    size = len(files)
    destination.mkdir(exist_ok=True)
    LOGGER.debug("Unzip %d files into %s", size, destination)
    for pos, zipped_file in enumerate(files, 1):
        if pos % STEP == 0 or pos == size:
            LOGGER.debug("%.2f%%", 100 * pos / size)

        zipped_repo, filename = zipped_file.split('::')
        ext = Path(filename).suffix
        dest_path = destination.joinpath(str(uuid4()) + ext)

        try:
            with ZipFile(zipped_repo) as zip_file:
                with dest_path.open('wb') as dest_file:
                    dest_file.write(zip_file.read(filename))
        except BadZipFile as error:
            LOGGER.error("Malformed file %s, error: %s", zipped_repo, error)
            raise RuntimeError('Extraction failed for {}'.format(zipped_repo))
项目:Blib    作者:LucaRood    | 项目源码 | 文件源码
def get_file_type(f_path):
    """
    Get the Blib type of a file.

    Args:
        f_path (str): Path to the file to be checked.

    Returns:
        str or None
        A string containing the Blib type is returned,
        if no valid type is found, None is returned.
    """

    try:
        archive = zf.ZipFile(f_path, 'r')
    except zf.BadZipFile:
        return None

    try:
        blib_type = archive.comment.decode("utf-8").split(" ")[1]
    except ValueError:
        return None

    archive.close()
    return blib_type
项目:myterminal    作者:graktung    | 项目源码 | 文件源码
def zipGetFileSize(content):
    content = content.split()
    if len(content) > 1:
        try:
            zipFile = zipfile.ZipFile(content[1])
            infoOfFile = zipFile.getinfo(content[0])
            zipFile.close()
            return [str(infoOfFile.file_size) + ' KB']
        except FileNotFoundError:
            return ['1f401268File %r Not Found' %(content[1])]
        except zipfile.BadZipFile:
            return ['1f401268File %r is not a zip file' %(content[1])]
        except:
            return ['Something went wrong when trying to handel file %r' %(content[1])]
    else:
        return ['zip getfilesize item file_zip']
项目:myterminal    作者:graktung    | 项目源码 | 文件源码
def zipGetComSize(content):
    content = content.split()
    if len(content) > 1:
        try:
            zipFile = zipfile.ZipFile(content[1])
            infoOfFile = zipFile.getinfo(content[0])
            zipFile.close()
            return [str(infoOfFile.compress_size) + ' KB']
        except FileNotFoundError:
            return ['1f401268File %r Not Found' %(content[1])]
        except zipfile.BadZipFile:
            return ['1f401268File %r is not a zip file' %(content[1])]
        except KeyError:
            return ['1f401268There no item named %r in the archive' %(content[0])]
        except:
            return ['Something went wrong when trying to handel file %r' %(content[1])]
    else:
        return ['zip getfilesize item file_zip']
项目:myterminal    作者:graktung    | 项目源码 | 文件源码
def unzipAll(content):
    content = content.split()
    if len(content) == 2:
        extractTo = content[1]
    else:
        extractTo = 0
    try:
        zipFile = zipfile.ZipFile(content[0])
        if extractTo == 0:
            zipFile.extractall()
        else:
            zipFile.extractall(r'{}'.format(content[1]))
        zipFile.close()
        return []
    except FileNotFoundError:
        return ['1f401268File %r Not Found' %(content[0])]
    except zipfile.BadZipFile:
        return ['1f401268File %r is not a zip file' %(content[0])]
    except:
        return ['Something went wrong when trying to unzip file %r' %(content[0])]
项目:myterminal    作者:graktung    | 项目源码 | 文件源码
def unzip(content):
    content = content.split()
    if len(content) < 2:
        return unzipAll(''.join(content))
    else:
        if len(content) == 3:
            extractTo = content[-1]
        else:
            extractTo = 0
        try:
            zipFile = zipfile.ZipFile(content[1])
            if extractTo == 0:
                zipFile.extract(content[0])
            else:
                zipFile.extract(content[0], extractTo)
            zipFile.close()
            return []
        except FileNotFoundError:
            return ['1f401268File %r Not Found' %(content[1])]
        except zipfile.BadZipFile:
            return ['1f401268File %r is not a zip file' %(content[1])]
        except KeyError:
            return ['1f401268There no item named %r in the archive' %(content[0])]
        except:
            return ['Something went wrong when trying to unzip file %r' %(content[1])]
项目:SublimeTerm    作者:percevalw    | 项目源码 | 文件源码
def update_compressed_packages(pkgs):
    multi_importer.loaders = []
    for p in pkgs:
        try:
            multi_importer.loaders.append(ZipLoader(p))
        except (FileNotFoundError, zipfile.BadZipFile) as e:
            print("error loading " + p + ": " + str(e))
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def test_bad_zip_file(self):
        mis = Miz(BAD_ZIP_FILE)
        with pytest.raises(BadZipFile):
            mis.unzip()
项目:jd4    作者:vijos    | 项目源码 | 文件源码
def try_open_zip(file):
    try:
        zip_file = ZipFile(file)
    except BadZipFile:
        return None
    canonical_dict = dict((name.lower(), name)
                          for name in zip_file.namelist())
    def open(name):
        try:
            return zip_file.open(canonical_dict[name.lower()])
        except KeyError:
            raise FileNotFoundError(name) from None
    return open
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def filecount_in_zip(filepath: str) -> int:
    try:
        my_zip = zipfile.ZipFile(filepath, 'r')
    except zipfile.BadZipFile:
        return 0

    filtered_files = list(filter(discard_zipfile_contents, sorted(my_zip.namelist(), key=zfill_to_three)))

    my_zip.close()

    return len(filtered_files)
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def generate_image_set(self, force: bool=False) -> None:

        if not os.path.isfile(self.zipped.path):
            return
        image_set_present = bool(self.image_set.all())
        # large thumbnail and image set
        if not image_set_present or force:
            try:
                my_zip = zipfile.ZipFile(
                    self.zipped.path, 'r')
            except (zipfile.BadZipFile, NotImplementedError):
                return
            if my_zip.testzip():
                my_zip.close()
                return
            filtered_files = list(filter(discard_zipfile_contents, sorted(my_zip.namelist(), key=zfill_to_three)))

            for img in self.image_set.all():
                if img.extracted:
                    img.image.delete(save=False)
                    img.thumbnail.delete(save=False)
                img.delete()
            for count, filename in enumerate(filtered_files, start=1):
                image = Image(archive=self, archive_position=count, position=count)
                image.image = None
                image.save()

            my_zip.close()
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def generate_thumbnails(self) -> bool:

        if not os.path.exists(self.zipped.path):
            return False
        try:
            my_zip = zipfile.ZipFile(self.zipped.path, 'r')
        except (zipfile.BadZipFile, NotImplementedError):
            return False

        if my_zip.testzip():
            my_zip.close()
            return False

        self.thumbnail.delete(save=False)

        filtered_files = list(filter(discard_zipfile_contents, sorted(my_zip.namelist(), key=zfill_to_three)))

        if not filtered_files:
            my_zip.close()
            return False

        if self.image_set.all():
            first_file = filtered_files[self.image_set.all()[0].archive_position - 1]
        else:
            first_file = filtered_files[0]

        with my_zip.open(first_file) as current_img:

            im = PImage.open(current_img)
            if im.mode != 'RGB':
                im = im.convert('RGB')

            im.thumbnail((200, 290), PImage.ANTIALIAS)
            thumb_name = thumb_path_handler(self, "thumb2.jpg")
            os.makedirs(os.path.dirname(pjoin(settings.MEDIA_ROOT, thumb_name)), exist_ok=True)
            im.save(pjoin(settings.MEDIA_ROOT, thumb_name), "JPEG")
            self.thumbnail.name = thumb_name

        my_zip.close()
        super(Archive, self).save()
        return True
项目:q2cli    作者:qiime2    | 项目源码 | 文件源码
def extract(path, output_dir):
    import zipfile
    import qiime2.sdk

    try:
        extracted_dir = qiime2.sdk.Result.extract(path, output_dir)
    except (zipfile.BadZipFile, ValueError):
        raise click.BadParameter(
            '%s is not a valid QIIME 2 Result. Only QIIME 2 Artifacts and '
            'Visualizations can be extracted.' % path)
    else:
        click.echo('Extracted to %s' % extracted_dir)
项目:forget    作者:codl    | 项目源码 | 文件源码
def upload_tweet_archive():
    ta = TwitterArchive(
            account=g.viewer.account,
            body=request.files['file'].read())
    db.session.add(ta)
    db.session.commit()

    try:
        files = libforget.twitter.chunk_twitter_archive(ta.id)

        ta.chunks = len(files)
        db.session.commit()

        if not ta.chunks > 0:
            raise TweetArchiveEmptyException()

        for filename in files:
            tasks.import_twitter_archive_month.s(ta.id, filename).apply_async()

        return redirect(url_for('index', _anchor='recent_archives'))
    except (BadZipFile, TweetArchiveEmptyException):
        if sentry:
            sentry.captureException()
        return redirect(
                url_for('index', tweet_archive_failed='',
                        _anchor='tweet_archive_import'))
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def run(self):
        try:
            zipf = zipfile.ZipFile(variables["file"][0])

        except FileNotFoundError:
            self.pwdh.error = "zip file not found"
            return
        for word in self.words:
            if self.pwdh.pwd != None:
                return
            elif self.pwdh.error != None:
                return
            elif self.pwdh.kill == True:
                return
            try:
                word = word.decode("utf-8").replace("\n", "")
                if word[0] == "#":
                    continue
                #animline("trying password: "+word)
                zipf.extractall(variables["exto"][0], pwd=word.encode("utf-8"))
                self.pwdh.pwd = word
                return
            except RuntimeError:
                pass
            except zipfile.BadZipFile:
                pass
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_close_on_exception(self):
        """Check that the zipfile is closed if an exception is raised in the
        'with' block."""
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                zipfp.writestr(fpath, fdata)

        try:
            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
                raise zipfile.BadZipFile()
        except zipfile.BadZipFile:
            self.assertTrue(zipfp2.fp is None, 'zipfp is not closed')
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_close_erroneous_file(self):
        # This test checks that the ZipFile constructor closes the file object
        # it opens if there's an error in the file.  If it doesn't, the
        # traceback holds a reference to the ZipFile object and, indirectly,
        # the file object.
        # On Windows, this causes the os.unlink() call to fail because the
        # underlying file is still open.  This is SF bug #412214.
        #
        with open(TESTFN, "w") as fp:
            fp.write("this is not a legal zip file\n")
        try:
            zf = zipfile.ZipFile(TESTFN)
        except zipfile.BadZipFile:
            pass
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_empty_file_raises_BadZipFile(self):
        f = open(TESTFN, 'w')
        f.close()
        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)

        with open(TESTFN, 'w') as fp:
            fp.write("short file")
        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
项目:DLink_Harvester    作者:MikimotoH    | 项目源码 | 文件源码
def zip_namelist(f):
    try:
        return zipfile.ZipFile(f).namelist()
    except zipfile.BadZipFile:
        return []
项目:chrome-extensions-archive    作者:mdamien    | 项目源码 | 文件源码
def extract_manifest_of_file(crx_file):
    debug = False
    if os.path.isfile(crx_file):
        size = os.path.getsize(crx_file)
        if size == 0:
            print('###################!!!! fucking empty downloads', crx_file)
            return
        try:
            with ZipFile(crx_file) as myzip:
                if debug:
                    # """ #save file to debug later
                    source = myzip.open('manifest.json')
                    target = open('lol.json', "wb")
                    with source, target:
                        shutil.copyfileobj(source, target)
                    #"""
                with myzip.open('manifest.json') as myfile:
                    text = myfile.read()
                    #if debug: print(text)
                    try:
                        text = text.decode('utf-8-sig')
                    except:
                        text = text.decode('latin-1')
                    if debug: print(text)
                    try:
                        return json.loads(text, strict=False)
                    except Exception as e:
                        print(e)
                        text = uncomment(text)
                        return json.loads(text, strict=False)
        except BadZipFile:
            print('fucking interrupted downloads')
    else:
        print(crx_file,'does not exist')
    #TODO: add filesize
项目:territoriali-backend    作者:algorithm-ninja    | 项目源码 | 文件源码
def extract(self, filename:str, password:str):
        """
        POST /admin/extract
        """
        if os.path.exists(Config.contest_path):
            self.raise_exc(Forbidden, "CONTEST", "Contest already loaded")
        os.makedirs(Config.contest_path)
        wd = os.getcwd()
        z = os.path.abspath(os.path.join(Config.contest_zips, filename))
        os.chdir(Config.contest_path)
        try:
            with zipfile.ZipFile(z) as f:
                f.extractall(pwd=password.encode())
            Logger.info("CONTEST", "Contest extracted")
        except FileNotFoundError:
            BaseHandler.raise_exc(NotFound, "NOT_FOUND", "Archive %s not found" % z)
        except RuntimeError as ex:
            BaseHandler.raise_exc(Forbidden, "FAILED", str(ex))
        except PermissionError as ex:
            BaseHandler.raise_exc(Forbidden, "FAILED", str(ex))
        except zipfile.BadZipFile as ex:
            BaseHandler.raise_exc(Forbidden, "FAILED", str(ex))
        finally:
            os.chdir(wd)
        ContestManager.read_from_disk()
        return {}
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_close_on_exception(self):
        """Check that the zipfile is closed if an exception is raised in the
        'with' block."""
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                zipfp.writestr(fpath, fdata)

        try:
            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
                raise zipfile.BadZipFile()
        except zipfile.BadZipFile:
            self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_close_erroneous_file(self):
        # This test checks that the ZipFile constructor closes the file object
        # it opens if there's an error in the file.  If it doesn't, the
        # traceback holds a reference to the ZipFile object and, indirectly,
        # the file object.
        # On Windows, this causes the os.unlink() call to fail because the
        # underlying file is still open.  This is SF bug #412214.
        #
        with open(TESTFN, "w") as fp:
            fp.write("this is not a legal zip file\n")
        try:
            zf = zipfile.ZipFile(TESTFN)
        except zipfile.BadZipFile:
            pass
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_damaged_zipfile(self):
        """Check that zipfiles with missing bytes at the end raise BadZipFile."""
        # - Create a valid zip file
        fp = io.BytesIO()
        with zipfile.ZipFile(fp, mode="w") as zipf:
            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
        zipfiledata = fp.getvalue()

        # - Now create copies of it missing the last N bytes and make sure
        #   a BadZipFile exception is raised when we try to open it
        for N in range(len(zipfiledata)):
            fp = io.BytesIO(zipfiledata[:N])
            self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_empty_file_raises_BadZipFile(self):
        f = open(TESTFN, 'w')
        f.close()
        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)

        with open(TESTFN, 'w') as fp:
            fp.write("short file")
        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
项目:windows-st-portable-x64    作者:zce    | 项目源码 | 文件源码
def update_compressed_packages(pkgs):
    multi_importer.loaders = []
    for p in pkgs:
        try:
            multi_importer.loaders.append(ZipLoader(p))
        except (FileNotFoundError, zipfile.BadZipFile) as e:
            print("error loading " + p + ": " + str(e))
项目:pencroft    作者:lukeyeager    | 项目源码 | 文件源码
def get(self, key):
        info = self._names_to_info[key]
        with self._lock:
            try:
                return self.file.read(info)
            except zipfile.BadZipFile:
                # Try re-opening the file
                self._open_file()
                return self.file.read(info)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_close_on_exception(self):
        """Check that the zipfile is closed if an exception is raised in the
        'with' block."""
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                zipfp.writestr(fpath, fdata)

        try:
            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
                raise zipfile.BadZipFile()
        except zipfile.BadZipFile:
            self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_close_erroneous_file(self):
        # This test checks that the ZipFile constructor closes the file object
        # it opens if there's an error in the file.  If it doesn't, the
        # traceback holds a reference to the ZipFile object and, indirectly,
        # the file object.
        # On Windows, this causes the os.unlink() call to fail because the
        # underlying file is still open.  This is SF bug #412214.
        #
        with open(TESTFN, "w") as fp:
            fp.write("this is not a legal zip file\n")
        try:
            zf = zipfile.ZipFile(TESTFN)
        except zipfile.BadZipFile:
            pass
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_damaged_zipfile(self):
        """Check that zipfiles with missing bytes at the end raise BadZipFile."""
        # - Create a valid zip file
        fp = io.BytesIO()
        with zipfile.ZipFile(fp, mode="w") as zipf:
            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
        zipfiledata = fp.getvalue()

        # - Now create copies of it missing the last N bytes and make sure
        #   a BadZipFile exception is raised when we try to open it
        for N in range(len(zipfiledata)):
            fp = io.BytesIO(zipfiledata[:N])
            self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_empty_file_raises_BadZipFile(self):
        f = open(TESTFN, 'w')
        f.close()
        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)

        with open(TESTFN, 'w') as fp:
            fp.write("short file")
        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_close_on_exception(self):
        """Check that the zipfile is closed if an exception is raised in the
        'with' block."""
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                zipfp.writestr(fpath, fdata)

        try:
            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
                raise zipfile.BadZipFile()
        except zipfile.BadZipFile:
            self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_close_erroneous_file(self):
        # This test checks that the ZipFile constructor closes the file object
        # it opens if there's an error in the file.  If it doesn't, the
        # traceback holds a reference to the ZipFile object and, indirectly,
        # the file object.
        # On Windows, this causes the os.unlink() call to fail because the
        # underlying file is still open.  This is SF bug #412214.
        #
        with open(TESTFN, "w") as fp:
            fp.write("this is not a legal zip file\n")
        try:
            zf = zipfile.ZipFile(TESTFN)
        except zipfile.BadZipFile:
            pass
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_damaged_zipfile(self):
        """Check that zipfiles with missing bytes at the end raise BadZipFile."""
        # - Create a valid zip file
        fp = io.BytesIO()
        with zipfile.ZipFile(fp, mode="w") as zipf:
            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
        zipfiledata = fp.getvalue()

        # - Now create copies of it missing the last N bytes and make sure
        #   a BadZipFile exception is raised when we try to open it
        for N in range(len(zipfiledata)):
            fp = io.BytesIO(zipfiledata[:N])
            self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_empty_file_raises_BadZipFile(self):
        f = open(TESTFN, 'w')
        f.close()
        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)

        with open(TESTFN, 'w') as fp:
            fp.write("short file")
        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
项目:guesslang    作者:yoeo    | 项目源码 | 文件源码
def _list_files(repos, languages, remove):
    repo_files = {lang: [] for lang in languages}
    exts = {ext: lang for lang, exts in languages.items() for ext in exts}

    for pos, zipped_repo in enumerate(repos, 1):
        current_repo_files = {lang: [] for lang in languages}
        size = len(repos)
        if pos % STEP == 0 or pos == size:
            LOGGER.debug("%.2f%%", 100 * pos / size)

        try:
            with ZipFile(zipped_repo) as zip_file:
                for filename in zip_file.namelist():
                    lang = exts.get(Path(filename).suffix.lstrip('.'))
                    if not lang or len(current_repo_files[lang]) >= MAX_FILES:
                        continue

                    current_repo_files[lang].append(
                        '{}::{}'.format(zipped_repo, filename))
        except BadZipFile as error:
            LOGGER.warning("Malformed file %s, error: %s", zipped_repo, error)
            if remove:
                Path(zipped_repo).unlink()
                LOGGER.debug("%s removed", zipped_repo)
            continue

        for lang, zipped_files in current_repo_files.items():
            repo_files[lang].extend(zipped_files)

    return repo_files