我们从Python开源项目中,提取了以下5个代码示例,用于说明如何使用shutil.ReadError()。
def test_unpack_archive(self): formats = ['tar', 'gztar', 'zip'] if BZ2_SUPPORTED: formats.append('bztar') for format in formats: tmpdir = self.mkdtemp() base_dir, root_dir, base_name = self._create_files() tmpdir2 = self.mkdtemp() filename = make_archive(base_name, format, root_dir, base_dir) # let's try to unpack it now unpack_archive(filename, tmpdir2) diff = self._compare_dirs(tmpdir, tmpdir2) self.assertEqual(diff, []) # and again, this time with the format specified tmpdir3 = self.mkdtemp() unpack_archive(filename, tmpdir3, format=format) diff = self._compare_dirs(tmpdir, tmpdir3) self.assertEqual(diff, []) self.assertRaises(shutil.ReadError, unpack_archive, TESTFN) self.assertRaises(ValueError, unpack_archive, TESTFN, format='xxx')
def test_unpack_archive(self): formats = ['tar', 'gztar', 'zip'] if BZ2_SUPPORTED: formats.append('bztar') root_dir, base_dir = self._create_files() expected = rlistdir(root_dir) expected.remove('outer') for format in formats: base_name = os.path.join(self.mkdtemp(), 'archive') filename = make_archive(base_name, format, root_dir, base_dir) # let's try to unpack it now tmpdir2 = self.mkdtemp() unpack_archive(filename, tmpdir2) self.assertEqual(rlistdir(tmpdir2), expected) # and again, this time with the format specified tmpdir3 = self.mkdtemp() unpack_archive(filename, tmpdir3, format=format) self.assertEqual(rlistdir(tmpdir3), expected) self.assertRaises(shutil.ReadError, unpack_archive, TESTFN) self.assertRaises(ValueError, unpack_archive, TESTFN, format='xxx')
def maybe_download_and_extract(data_root: str, url: str) -> None: """ Maybe download the specified file to ``data_root`` and try to unpack it with ``shutil.unpack_archive``. :param data_root: data root to download the files to :param url: url to download from """ # make sure data_root exists os.makedirs(data_root, exist_ok=True) filename = os.path.basename(url) # check whether the archive already exists filepath = os.path.join(data_root, filename) if os.path.exists(filepath): logging.info('\t`%s` already exists; skipping', filepath) return # download with progressbar logging.info('\tdownloading %s', filepath) req = requests.get(url, stream=True) expected_size = int(req.headers.get('content-length')) chunk_size = 1024 with open(filepath, 'wb') as f_out,\ click.progressbar(req.iter_content(chunk_size=chunk_size), length=expected_size/chunk_size) as bar: for chunk in bar: if chunk: f_out.write(chunk) f_out.flush() # extract try: shutil.unpack_archive(filepath, data_root) except (shutil.ReadError, ValueError): logging.info('File `%s` could not be extracted by `shutil.unpack_archive`. Please process it manually.', filepath)