Python tarfile 模块,PAX_FORMAT 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tarfile.PAX_FORMAT

项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def _test(self, name, link=None):
    """Write one PAX member to ``tmpname`` and verify it after reopening.

    If ``link`` is truthy the member is typed as ``tarfile.LNKTYPE`` with
    ``link`` as its link name, and the stored link name is checked;
    otherwise the stored member name is checked against ``name``.
    See GNUWriteTest.
    """
    member = tarfile.TarInfo(name)
    if link:
        member.type = tarfile.LNKTYPE
        member.linkname = link

    writer = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
    try:
        writer.addfile(member)
    finally:
        writer.close()

    # Reopen and compare the first (only) member with what was written.
    with tarfile.open(tmpname) as reader:
        stored = reader.getmembers()[0]
        if link:
            self.assertTrue(link == stored.linkname, "PAX longlink creation failed")
        else:
            self.assertTrue(name == stored.name, "PAX longname creation failed")
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_pax_extended_header(self):
    """Check that pax header fields take priority over TarInfo attributes."""
    pax_headers = {"path": "foo", "uid": "123"}

    archive = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1")
    try:
        member = tarfile.TarInfo()
        member.name = "\xe4\xf6\xfc"  # non-ASCII
        member.uid = 8**8  # too large
        member.pax_headers = pax_headers
        archive.addfile(member)
    finally:
        archive.close()

    # On read-back, "path" and "uid" from the pax header must win.
    with tarfile.open(tmpname, encoding="iso8859-1") as archive:
        stored = archive.getmembers()[0]
        self.assertEqual(stored.pax_headers, pax_headers)
        self.assertEqual(stored.name, "foo")
        self.assertEqual(stored.uid, 123)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_unicode_filename_error(self):
    """Expect UnicodeError for non-ASCII name/uname with strict ASCII encoding.

    The test is a no-op when ``self.format`` is ``tarfile.PAX_FORMAT``.
    """
    if self.format == tarfile.PAX_FORMAT:
        # PAX_FORMAT ignores encoding in write mode.
        return

    non_ascii = "\xe4\xf6\xfc"
    archive = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict")
    try:
        info = tarfile.TarInfo()

        # Non-ASCII member name.
        info.name = non_ascii
        self.assertRaises(UnicodeError, archive.addfile, info)

        # ASCII member name, non-ASCII user name.
        info.name = "foo"
        info.uname = non_ascii
        self.assertRaises(UnicodeError, archive.addfile, info)
    finally:
        archive.close()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_uname_unicode(self):
    """Round-trip non-ASCII uname/gname through an iso8859-1 encoded archive.

    For formats other than PAX the archive is additionally reopened with
    ``encoding="ascii"`` and the names are compared with the surrogate
    code points asserted below.
    """
    non_ascii = "\xe4\xf6\xfc"
    member = tarfile.TarInfo("foo")
    member.uname = non_ascii
    member.gname = non_ascii

    tar = tarfile.open(tmpname, mode="w", format=self.format, encoding="iso8859-1")
    try:
        tar.addfile(member)
    finally:
        tar.close()

    tar = tarfile.open(tmpname, encoding="iso8859-1")
    try:
        member = tar.getmember("foo")
        for stored in (member.uname, member.gname):
            self.assertEqual(stored, "\xe4\xf6\xfc")

        if self.format != tarfile.PAX_FORMAT:
            # Swap the handle for one that decodes with ASCII; the
            # ``finally`` below closes whichever handle is current.
            tar.close()
            tar = tarfile.open(tmpname, encoding="ascii")
            member = tar.getmember("foo")
            for stored in (member.uname, member.gname):
                self.assertEqual(stored, "\udce4\udcf6\udcfc")
    finally:
        tar.close()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def _test(self, name, link=None):
    """Round-trip a single member through a PAX archive at ``tmpname``.

    With a truthy ``link`` the member is written as ``tarfile.LNKTYPE``
    and its link name is verified after reading; without one, the member
    name is verified instead.  See GNUWriteTest.
    """
    entry = tarfile.TarInfo(name)
    if link:
        entry.linkname = link
        entry.type = tarfile.LNKTYPE

    out = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
    try:
        out.addfile(entry)
    finally:
        out.close()

    src = tarfile.open(tmpname)
    try:
        first = src.getmembers()[0]
        if not link:
            self.assertTrue(name == first.name, "PAX longname creation failed")
        else:
            self.assertTrue(link == first.linkname, "PAX longlink creation failed")
    finally:
        src.close()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_pax_extended_header(self):
    """Verify that values in the pax header override the TarInfo fields."""
    pax_headers = {u"path": u"foo", u"uid": u"123"}

    writer = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1")
    try:
        info = tarfile.TarInfo()
        info.name = u"\xe4\xf6\xfc"  # non-ASCII
        info.uid = 8**8  # too large
        info.pax_headers = pax_headers
        writer.addfile(info)
    finally:
        writer.close()

    reader = tarfile.open(tmpname, encoding="iso8859-1")
    try:
        info = reader.getmembers()[0]
        # The header dict round-trips, and its values replace name/uid.
        self.assertEqual(info.pax_headers, pax_headers)
        self.assertEqual(info.name, "foo")
        self.assertEqual(info.uid, 123)
    finally:
        reader.close()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_unicode_filename_error(self):
    """Check UnicodeError handling for non-ASCII names under strict ASCII.

    A byte-string name is expected to raise only for PAX_FORMAT; unicode
    names and unicode user names are expected to raise for every format.
    """
    archive = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict")
    try:
        info = tarfile.TarInfo()

        # Byte-string name: PAX must reject it, other formats accept it.
        info.name = "\xe4\xf6\xfc"
        if self.format != tarfile.PAX_FORMAT:
            archive.addfile(info)
        else:
            self.assertRaises(UnicodeError, archive.addfile, info)

        # Unicode name.
        info.name = u"\xe4\xf6\xfc"
        self.assertRaises(UnicodeError, archive.addfile, info)

        # ASCII name with a unicode user name.
        info.name = "foo"
        info.uname = u"\xe4\xf6\xfc"
        self.assertRaises(UnicodeError, archive.addfile, info)
    finally:
        archive.close()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def _test(self, name, link=None):
    """Store one member in a PAX archive and confirm it reads back intact.

    ``link`` (when truthy) becomes the link name of a ``tarfile.LNKTYPE``
    member and is the value verified; otherwise ``name`` is verified.
    See GNUWriteTest.
    """
    info = tarfile.TarInfo(name)
    if link:
        info.linkname = link
        info.type = tarfile.LNKTYPE

    archive = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
    try:
        archive.addfile(info)
    finally:
        archive.close()

    with tarfile.open(tmpname) as archive:
        if link:
            read_back = archive.getmembers()[0].linkname
            self.assertTrue(link == read_back, "PAX longlink creation failed")
        else:
            read_back = archive.getmembers()[0].name
            self.assertTrue(name == read_back, "PAX longname creation failed")
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_pax_extended_header(self):
    """The pax header's "path" and "uid" must win over TarInfo.name/uid."""
    pax_headers = {u"path": u"foo", u"uid": u"123"}

    out = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1")
    try:
        entry = tarfile.TarInfo()
        entry.name = u"\xe4\xf6\xfc"     # non-ASCII
        entry.uid = 8**8        # too large
        entry.pax_headers = pax_headers
        out.addfile(entry)
    finally:
        out.close()

    with tarfile.open(tmpname, encoding="iso8859-1") as src:
        entry = src.getmembers()[0]
        self.assertEqual(entry.pax_headers, pax_headers)
        self.assertEqual(entry.name, "foo")
        self.assertEqual(entry.uid, 123)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_unicode_filename_error(self):
    """Exercise strict-ASCII writing with non-ASCII member and user names.

    Adding a byte-string name must raise UnicodeError for PAX_FORMAT and
    succeed otherwise; the unicode name and unicode uname cases must
    raise UnicodeError regardless of format.
    """
    tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict")
    try:
        member = tarfile.TarInfo()
        pax = self.format == tarfile.PAX_FORMAT

        member.name = "\xe4\xf6\xfc"
        if pax:
            self.assertRaises(UnicodeError, tar.addfile, member)
        else:
            tar.addfile(member)

        member.name = u"\xe4\xf6\xfc"
        self.assertRaises(UnicodeError, tar.addfile, member)

        member.name = "foo"
        member.uname = u"\xe4\xf6\xfc"
        self.assertRaises(UnicodeError, tar.addfile, member)
    finally:
        tar.close()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def _test(self, name, link=None):
    """Write a single PAX member and compare it after reading it back.

    A truthy ``link`` makes the member a ``tarfile.LNKTYPE`` entry whose
    link name is compared; otherwise the member name is compared.
    See GNUWriteTest.
    """
    member = tarfile.TarInfo(name)
    if link:
        member.linkname = link
        member.type = tarfile.LNKTYPE

    writer = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
    try:
        writer.addfile(member)
    finally:
        writer.close()

    with tarfile.open(tmpname) as reader:
        stored = reader.getmembers()[0]
        # Pick what to compare, then run one assertion for both cases.
        if link:
            expected, actual, message = link, stored.linkname, "PAX longlink creation failed"
        else:
            expected, actual, message = name, stored.name, "PAX longname creation failed"
        self.assertEqual(expected, actual, message)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_pax_extended_header(self):
    """Fields supplied through the pax header override the TarInfo ones."""
    pax_headers = {"path": "foo", "uid": "123"}

    open_kwargs = dict(format=tarfile.PAX_FORMAT, encoding="iso8859-1")
    tar = tarfile.open(tmpname, "w", **open_kwargs)
    try:
        info = tarfile.TarInfo()
        info.name = "\xe4\xf6\xfc"  # non-ASCII
        info.uid = 8**8  # too large
        info.pax_headers = pax_headers
        tar.addfile(info)
    finally:
        tar.close()

    with tarfile.open(tmpname, encoding="iso8859-1") as tar:
        info = tar.getmembers()[0]
        self.assertEqual(info.pax_headers, pax_headers)
        self.assertEqual(info.name, "foo")
        self.assertEqual(info.uid, 123)
项目:FESetup    作者:halx    | 项目源码 | 文件源码
def extract(self, compression_type='*', direc='.'):
    """
    Unpack the in-memory tar(pax) archive held in ``self.data``.

    :param compression_type: the compression type (*=transparent)
    :type compression_type: string
    :param direc: path to extract to
    :type direc: string
    :raises DataDictError: if ``self.data`` is empty
    """

    if not self.data:
        raise DataDictError('no data files')

    open_mode = 'r:%s' % compression_type
    stream = cStringIO.StringIO(self.data)

    # NOTE(review): extractall() is called without any member filtering;
    # confirm self.data always comes from a trusted source.
    with tarfile.open(mode=open_mode, format=tarfile.PAX_FORMAT,
                      fileobj=stream) as tar:
        tar.extractall(direc)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def _test(self, name, link=None):
    """Write one PAX member to ``tmpname`` and verify it after reopening.

    If ``link`` is truthy the member is typed as ``tarfile.LNKTYPE`` with
    ``link`` as its link name and the stored link name is checked;
    otherwise the stored member name is checked.  See GNUWriteTest.
    """
    tarinfo = tarfile.TarInfo(name)
    if link:
        tarinfo.linkname = link
        tarinfo.type = tarfile.LNKTYPE

    tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
    try:
        tar.addfile(tarinfo)
    finally:
        # Close even when addfile() raises so the handle on tmpname is
        # not leaked into later tests.
        tar.close()

    tar = tarfile.open(tmpname)
    try:
        if link:
            l = tar.getmembers()[0].linkname
            self.assertTrue(link == l, "PAX longlink creation failed")
        else:
            n = tar.getmembers()[0].name
            self.assertTrue(name == n, "PAX longname creation failed")
    finally:
        tar.close()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_pax_extended_header(self):
    """Confirm pax header fields take precedence over TarInfo attributes."""
    pax_headers = {u"path": u"foo", u"uid": u"123"}

    writer = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          encoding="iso8859-1")
    try:
        member = tarfile.TarInfo()
        member.name = u"äöü"     # non-ASCII
        member.uid = 8**8        # too large
        member.pax_headers = pax_headers
        writer.addfile(member)
    finally:
        writer.close()

    reader = tarfile.open(tmpname, encoding="iso8859-1")
    try:
        member = reader.getmembers()[0]
        self.assertEqual(member.pax_headers, pax_headers)
        self.assertEqual(member.name, "foo")
        self.assertEqual(member.uid, 123)
    finally:
        reader.close()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_unicode_filename_error(self):
    """Check UnicodeError behaviour when writing with strict ASCII encoding.

    The byte-string name must raise only for PAX_FORMAT; the unicode
    name and the unicode user name must raise for every format.
    """
    archive = tarfile.open(tmpname, "w", format=self.format,
                           encoding="ascii", errors="strict")
    try:
        info = tarfile.TarInfo()

        info.name = "äöü"
        if self.format != tarfile.PAX_FORMAT:
            archive.addfile(info)
        else:
            self.assertRaises(UnicodeError, archive.addfile, info)

        info.name = u"äöü"
        self.assertRaises(UnicodeError, archive.addfile, info)

        info.name = "foo"
        info.uname = u"äöü"
        self.assertRaises(UnicodeError, archive.addfile, info)
    finally:
        archive.close()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _test(self, name, link=None):
    """Round-trip one member through a PAX archive stored at ``tmpname``.

    When ``link`` is truthy the member is a ``tarfile.LNKTYPE`` entry and
    its link name is asserted equal to ``link``; otherwise its name is
    asserted equal to ``name``.  See GNUWriteTest.
    """
    entry = tarfile.TarInfo(name)
    if link:
        entry.type = tarfile.LNKTYPE
        entry.linkname = link

    out = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
    try:
        out.addfile(entry)
    finally:
        out.close()

    src = tarfile.open(tmpname)
    try:
        first = src.getmembers()[0]
        if not link:
            self.assertEqual(name, first.name, "PAX longname creation failed")
        else:
            self.assertEqual(link, first.linkname, "PAX longlink creation failed")
    finally:
        src.close()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_pax_extended_header(self):
    """Values from the pax header must replace the TarInfo name and uid."""
    pax_headers = {"path": "foo", "uid": "123"}

    out = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                       encoding="iso8859-1")
    try:
        entry = tarfile.TarInfo()
        entry.name = "\xe4\xf6\xfc"  # non-ASCII
        entry.uid = 8**8  # too large
        entry.pax_headers = pax_headers
        out.addfile(entry)
    finally:
        out.close()

    src = tarfile.open(tmpname, encoding="iso8859-1")
    try:
        entry = src.getmembers()[0]
        checks = (
            (entry.pax_headers, pax_headers),
            (entry.name, "foo"),
            (entry.uid, 123),
        )
        for actual, expected in checks:
            self.assertEqual(actual, expected)
    finally:
        src.close()
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def _test(self, name, link=None):
    """Write one PAX member to ``tmpname`` and verify it after reopening.

    If ``link`` is truthy the member is typed as ``tarfile.LNKTYPE`` with
    ``link`` as its link name and the stored link name is checked;
    otherwise the stored member name is checked.  See GNUWriteTest.
    """
    tarinfo = tarfile.TarInfo(name)
    if link:
        tarinfo.linkname = link
        tarinfo.type = tarfile.LNKTYPE

    tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
    try:
        tar.addfile(tarinfo)
    finally:
        # Always release the write handle, even if addfile() fails.
        tar.close()

    tar = tarfile.open(tmpname)
    try:
        if link:
            l = tar.getmembers()[0].linkname
            self.assertTrue(link == l, "PAX longlink creation failed")
        else:
            n = tar.getmembers()[0].name
            self.assertTrue(name == n, "PAX longname creation failed")
    finally:
        # The read handle was previously never closed at all.
        tar.close()
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_pax_extended_header(self):
    """Check that pax header fields take priority over TarInfo attributes."""
    pax_headers = {u"path": u"foo", u"uid": u"123"}

    tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1")
    try:
        t = tarfile.TarInfo()
        # Spelled with escapes so the value really is non-ASCII and does
        # not depend on the source file's encoding (the literal was empty).
        t.name = u"\xe4\xf6\xfc"     # non-ASCII
        t.uid = 8**8        # too large
        t.pax_headers = pax_headers
        tar.addfile(t)
    finally:
        tar.close()

    tar = tarfile.open(tmpname, encoding="iso8859-1")
    try:
        t = tar.getmembers()[0]
        self.assertEqual(t.pax_headers, pax_headers)
        self.assertEqual(t.name, "foo")
        self.assertEqual(t.uid, 123)
    finally:
        # Close the read handle, which was previously leaked.
        tar.close()
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_unicode_filename_error(self):
    """Check UnicodeError handling for non-ASCII names under strict ASCII.

    A byte-string name is expected to raise only for PAX_FORMAT; unicode
    names and unicode user names are expected to raise for every format.
    The non-ASCII values are written as escapes: with the empty literals
    this block previously contained, no UnicodeError could be triggered.
    """
    tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict")
    try:
        tarinfo = tarfile.TarInfo()

        tarinfo.name = "\xe4\xf6\xfc"
        if self.format == tarfile.PAX_FORMAT:
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)
        else:
            tar.addfile(tarinfo)

        tarinfo.name = u"\xe4\xf6\xfc"
        self.assertRaises(UnicodeError, tar.addfile, tarinfo)

        tarinfo.name = "foo"
        tarinfo.uname = u"\xe4\xf6\xfc"
        self.assertRaises(UnicodeError, tar.addfile, tarinfo)
    finally:
        # The archive handle was previously never closed.
        tar.close()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _test(self, name, link=None):
    """Store one member in a PAX archive and check what is read back.

    ``link``, when truthy, is written as the link name of a
    ``tarfile.LNKTYPE`` member and is the value compared; otherwise the
    member name is compared with ``name``.  See GNUWriteTest.
    """
    info = tarfile.TarInfo(name)
    if link:
        info.linkname = link
        info.type = tarfile.LNKTYPE

    archive = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
    try:
        archive.addfile(info)
    finally:
        archive.close()

    with tarfile.open(tmpname) as archive:
        if link:
            read_back = archive.getmembers()[0].linkname
            self.assertEqual(link, read_back, "PAX longlink creation failed")
        else:
            read_back = archive.getmembers()[0].name
            self.assertEqual(name, read_back, "PAX longname creation failed")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_pax_extended_header(self):
    """Pax header entries must take priority over the TarInfo attributes."""
    pax_headers = {"path": "foo", "uid": "123"}

    writer = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          encoding="iso8859-1")
    try:
        member = tarfile.TarInfo()
        member.name = "\xe4\xf6\xfc"  # non-ASCII
        member.uid = 8**8  # too large
        member.pax_headers = pax_headers
        writer.addfile(member)
    finally:
        writer.close()

    with tarfile.open(tmpname, encoding="iso8859-1") as reader:
        member = reader.getmembers()[0]
        self.assertEqual(member.pax_headers, pax_headers)
        self.assertEqual(member.name, "foo")
        self.assertEqual(member.uid, 123)
项目:anchore    作者:anchore    | 项目源码 | 文件源码
def save_files(self, imageId, namespace, rootfsdir, files):
    """Store the given files in a gzipped PAX tarball for an image.

    The archive is written to
    ``<self.imagerootdir>/<imageId>/file_store/<namespace>/stored_files.tar.gz``.
    Each member is renamed so that the ``rootfsdir`` prefix is replaced by
    ``imageroot/``.  Paths in ``files`` that do not exist are reported and
    skipped.

    :param imageId: image identifier used as a directory component
    :param namespace: sub-directory under ``file_store``
    :param rootfsdir: root filesystem prefix stripped from member names
    :param files: iterable of file paths to store
    :returns: ``True``
    """
    # The prefix pattern is identical for every member, so build it once
    # instead of on every filter call.
    subber = re.sub("^/*", "", rootfsdir)
    subber = re.sub("/*$", "", subber)
    prefix_pattern = "^" + re.escape(subber) + "/*"

    def tarfilter(member):
        # Re-root the member under "imageroot/".
        member.name = '/'.join(['imageroot', re.sub(prefix_pattern, "", member.name)])
        return member

    thedir = os.path.join(self.imagerootdir, imageId, "file_store", namespace)
    if not os.path.exists(thedir):
        os.makedirs(thedir)

    tar = tarfile.open('/'.join([thedir, 'stored_files.tar.gz']), mode='w:gz', format=tarfile.PAX_FORMAT)
    try:
        for thefile in files:
            if os.path.exists(thefile):
                # Single-argument print(...) behaves the same on Python 2 and 3.
                print("INFO: storing file: " + str(thefile))
                tar.add(thefile, filter=tarfilter)
            else:
                print("WARN: could not find file ("+str(thefile)+") in image: skipping store")
    finally:
        # Close the archive even if tar.add() raises, so the handle is
        # not leaked.
        tar.close()
    return True
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def uploadPackage(self, buildId, audit, content, verbose):
    """Pack ``audit`` and ``content`` into a gzipped PAX tarball and upload it.

    Returns immediately when ``self.canUploadLocal()`` is false.  Progress
    is printed to stdout; ``verbose > 0`` adds the remote name and, on
    failure, the error text.  An ArtifactExistsError is reported as
    "skipped".  Upload, tar and OS errors are re-raised as BuildError
    unless ``self.__ignoreErrors`` is set.
    """
    if not self.canUploadLocal():
        return

    # Tracks whether the "UPLOAD ..." banner has been printed, so the
    # error paths know whether to finish that line or start a new one.
    shown = False
    try:
        with self._openUploadFile(buildId, ARTIFACT_SUFFIX) as (name, fileobj):
            pax = { 'bob-archive-vsn' : "1" }
            if verbose > 0:
                banner = ("   UPLOAD    {} to {} .. "
                          .format(content, self._remoteName(buildId, ARTIFACT_SUFFIX)))
            else:
                banner = "   UPLOAD    {} .. ".format(content)
            print(colorize(banner, "32"), end="")
            shown = True
            with gzip.open(name or fileobj, 'wb', 6) as gzf:
                with tarfile.open(name, "w", fileobj=gzf,
                                  format=tarfile.PAX_FORMAT, pax_headers=pax) as tar:
                    tar.add(audit, "meta/" + os.path.basename(audit))
                    tar.add(content, arcname="content")
        print(colorize("ok", "32"))
    except ArtifactExistsError:
        if shown:
            template = "skipped ({} exists in archive)"
        else:
            template = "   UPLOAD    skipped ({} exists in archive)"
        print(template.format(content))
    except (ArtifactUploadError, tarfile.TarError, OSError) as e:
        if shown:
            text = "error ("+str(e)+")" if verbose > 0 else "error"
            print(colorize(text, "31"))
        if not self.__ignoreErrors:
            raise BuildError("Cannot upload artifact: " + str(e))
项目:bob    作者:BobBuildTool    | 项目源码 | 文件源码
def __createArtifactByName(self, name, version="1"):
    """Create a small gzipped PAX artifact at ``name`` and return ``name``.

    The archive carries ``version`` in the ``bob-archive-vsn`` global pax
    header, a ``meta/audit.json.gz`` member containing ``b'AUDIT'`` and a
    ``content`` directory holding a ``data`` file containing ``b'DATA'``.
    """
    headers = { 'bob-archive-vsn' : version }
    with tarfile.open(name, "w|gz", format=tarfile.PAX_FORMAT, pax_headers=headers) as archive:
        # The audit member is read from a flushed temporary file.
        with NamedTemporaryFile() as audit_file:
            audit_file.write(b'AUDIT')
            audit_file.flush()
            archive.add(audit_file.name, "meta/audit.json.gz")

        # The content tree is built in a throw-away directory.
        with TemporaryDirectory() as content_dir:
            data_path = os.path.join(content_dir, "data")
            with open(data_path, "wb") as data_file:
                data_file.write(b'DATA')
            archive.add(content_dir, "content")

    return name
项目:abe-bootstrap    作者:TryCoin-Team    | 项目源码 | 文件源码
def handle_download(abe, page):
    """Stream this module's directory to the client as a gzipped PAX tarball.

    The archive's top-level name is ``abe.args.download_name`` or, when
    that is None, a name derived from ABE_APPNAME and ABE_VERSION.
    Always raises ``Streamed`` once the archive has been written.
    """
    import tarfile

    name = abe.args.download_name
    if name is None:
        name = re.sub(r'\W+', '-', ABE_APPNAME.lower()) + '-' + ABE_VERSION

    response_headers = [
        ('Content-type', 'application/x-gtar-compressed'),
        ('Content-disposition', 'filename=' + name + '.tar.gz'),
    ]
    write = page['start_response']('200 OK', response_headers)

    # A bare function object is used as a minimal file-like object whose
    # only attribute is the ``write`` callable returned by start_response.
    fileobj = lambda: None
    fileobj.func_dict['write'] = write

    with tarfile.TarFile.open(fileobj=fileobj, mode='w|gz',
                              format=tarfile.PAX_FORMAT) as tar:
        tar.add(os.path.split(__file__)[0], name)
    raise Streamed()
项目:abe-bootstrap    作者:TryCoin-Team    | 项目源码 | 文件源码
def handle_download(abe, page):
    """Send the directory containing this file as a ``.tar.gz`` download.

    Uses ``abe.args.download_name`` as the archive's root name, falling
    back to one built from ABE_APPNAME and ABE_VERSION.  The function
    never returns normally: it raises ``Streamed`` after writing.
    """
    import tarfile

    name = abe.args.download_name
    if name is None:
        slug = re.sub(r'\W+', '-', ABE_APPNAME.lower())
        name = slug + '-' + ABE_VERSION

    start_response = page['start_response']
    writer = start_response(
        '200 OK',
        [('Content-type', 'application/x-gtar-compressed'),
         ('Content-disposition', 'filename=' + name + '.tar.gz')])

    # tarfile only needs an object with ``write``; a lambda's attribute
    # dict serves as that holder.
    sink = lambda: None
    sink.func_dict['write'] = writer

    source_dir = os.path.split(__file__)[0]
    with tarfile.TarFile.open(fileobj=sink, mode='w|gz',
                              format=tarfile.PAX_FORMAT) as tar:
        tar.add(source_dir, name)
    raise Streamed()
项目:build-calibre    作者:kovidgoyal    | 项目源码 | 文件源码
def create_tarfile(env, compression_level='9'):
    """Archive everything under ``env.base`` and compress it with ``xz``.

    A fresh ``dist`` directory is created under SW, a PAX tarball of the
    entries in ``env.base`` is written there, and the external ``xz``
    command compresses it at ``compression_level``.  The result is renamed
    to a ``.txz`` file and its size is printed.
    """
    print('Creating archive...')
    dist_dir = os.path.join(SW, 'dist')
    # Start from an empty dist directory; a missing one is not an error.
    try:
        shutil.rmtree(dist_dir)
    except EnvironmentError as err:
        if err.errno != errno.ENOENT:
            raise
    os.mkdir(dist_dir)

    tar_name = '%s-%s-%s.tar' % (calibre_constants['appname'], calibre_constants['version'], arch)
    tar_path = os.path.join(dist_dir, tar_name)
    with tarfile.open(tar_path, mode='w', format=tarfile.PAX_FORMAT) as tf:
        # Add entries relative to env.base, then restore the old cwd.
        previous_dir = os.getcwd()
        os.chdir(env.base)
        try:
            for entry in os.listdir('.'):
                tf.add(entry)
        finally:
            os.chdir(previous_dir)

    print('Compressing archive...')
    final_path = tar_path.rpartition('.')[0] + '.txz'
    started = time.time()
    subprocess.check_call(['xz', '--threads=0', '-f', '-' + compression_level, tar_path])
    elapsed = time.time() - started
    print('Compressed in %d minutes %d seconds' % (elapsed // 60, elapsed % 60))
    os.rename(tar_path + '.xz', final_path)
    size_mb = os.stat(final_path).st_size / (1024.**2)
    print('Archive %s created: %.2f MB' % (
        os.path.basename(final_path), size_mb))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_pax_global_header(self):
    """Write a global pax header and verify it is read back unchanged.

    Also checks that no key or value is ``bytes`` and that every field
    listed in ``tarfile.PAX_NUMBER_FIELDS`` can be converted by its
    registered converter.
    """
    pax_headers = {
            "foo": "bar",
            "uid": "0",
            "mtime": "1.23",
            "test": "\xe4\xf6\xfc",
            "\xe4\xf6\xfc": "test"}

    writer = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
            pax_headers=pax_headers)
    try:
        writer.addfile(tarfile.TarInfo("test"))
    finally:
        writer.close()

    # Test if the global header was written correctly.
    with tarfile.open(tmpname, encoding="iso8859-1") as reader:
        self.assertEqual(reader.pax_headers, pax_headers)
        self.assertEqual(reader.getmembers()[0].pax_headers, pax_headers)
        # Test if all the fields are strings.
        for field, value in reader.pax_headers.items():
            self.assertTrue(type(field) is not bytes)
            self.assertTrue(type(value) is not bytes)
            if field not in tarfile.PAX_NUMBER_FIELDS:
                continue
            try:
                tarfile.PAX_NUMBER_FIELDS[field](value)
            except (TypeError, ValueError):
                self.fail("unable to convert pax header field")
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_open_nonwritable_fileobj(self):
    """A failing first write must propagate and leave the fileobj open.

    For each exception type, ``tarfile.open`` is given a file object
    whose first ``write`` call raises it; the exception must surface and
    the file object must not have been closed.
    """
    for exctype in IOError, EOFError, RuntimeError:
        class BadFile(StringIO.StringIO):
            first = True

            def write(self, data):
                # Only the very first write fails; later ones do nothing.
                if not self.first:
                    return
                self.first = False
                raise exctype

        broken = BadFile()
        with self.assertRaises(exctype):
            tarfile.open(tmpname, self.mode, fileobj=broken,
                         format=tarfile.PAX_FORMAT,
                         pax_headers={'non': 'empty'})
        self.assertFalse(broken.closed)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_pax_global_header(self):
    """Round-trip a global pax header and validate its contents.

    The header dict must read back equal on both the archive and its
    first member, every key and value must be ``unicode``, and fields in
    ``tarfile.PAX_NUMBER_FIELDS`` must be convertible.
    """
    pax_headers = {
            u"foo": u"bar",
            u"uid": u"0",
            u"mtime": u"1.23",
            u"test": u"\xe4\xf6\xfc",
            u"\xe4\xf6\xfc": u"test"}

    out = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
            pax_headers=pax_headers)
    try:
        out.addfile(tarfile.TarInfo("test"))
    finally:
        out.close()

    # Test if the global header was written correctly.
    src = tarfile.open(tmpname, encoding="iso8859-1")
    try:
        self.assertEqual(src.pax_headers, pax_headers)
        self.assertEqual(src.getmembers()[0].pax_headers, pax_headers)

        # Test if all the fields are unicode.
        for field, value in src.pax_headers.iteritems():
            self.assertTrue(type(field) is unicode)
            self.assertTrue(type(value) is unicode)
            if field not in tarfile.PAX_NUMBER_FIELDS:
                continue
            try:
                tarfile.PAX_NUMBER_FIELDS[field](value)
            except (TypeError, ValueError):
                self.fail("unable to convert pax header field")
    finally:
        src.close()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_open_nonwritable_fileobj(self):
    """Opening on a fileobj whose first write fails must not close it.

    Runs once each for IOError, EOFError and RuntimeError: the error
    raised by the first ``write`` has to reach the caller of
    ``tarfile.open`` and the file object has to remain open.
    """
    for exctype in IOError, EOFError, RuntimeError:
        class BadFile(StringIO.StringIO):
            # Flipped to False after the first (failing) write.
            first = True

            def write(self, data):
                if self.first:
                    self.first = False
                    raise exctype

        fileobj = BadFile()
        open_args = dict(fileobj=fileobj, format=tarfile.PAX_FORMAT,
                         pax_headers={'non': 'empty'})
        with self.assertRaises(exctype):
            tarfile.open(tmpname, self.mode, **open_args)
        self.assertFalse(fileobj.closed)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_pax_global_header(self):
    """Check that a global pax header survives a write/read cycle.

    Beyond equality of the header dicts, each key and value has to be of
    type ``unicode`` and numeric fields (``tarfile.PAX_NUMBER_FIELDS``)
    have to be convertible by their converters.
    """
    pax_headers = {
            u"foo": u"bar",
            u"uid": u"0",
            u"mtime": u"1.23",
            u"test": u"\xe4\xf6\xfc",
            u"\xe4\xf6\xfc": u"test"}

    archive = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
            pax_headers=pax_headers)
    try:
        archive.addfile(tarfile.TarInfo("test"))
    finally:
        archive.close()

    # Test if the global header was written correctly.
    with tarfile.open(tmpname, encoding="iso8859-1") as archive:
        self.assertEqual(archive.pax_headers, pax_headers)
        self.assertEqual(archive.getmembers()[0].pax_headers, pax_headers)

        # Test if all the fields are unicode.
        number_fields = tarfile.PAX_NUMBER_FIELDS
        for key, val in archive.pax_headers.iteritems():
            self.assertTrue(type(key) is unicode)
            self.assertTrue(type(val) is unicode)
            if key in number_fields:
                try:
                    number_fields[key](val)
                except (TypeError, ValueError):
                    self.fail("unable to convert pax header field")
项目:pep517    作者:pypa    | 项目源码 | 文件源码
def build_sdist(sdist_directory, config_settings):
    """Build ``pkg2-0.5.tar.gz`` in ``sdist_directory`` and return its name.

    The archive holds ``pyproject.toml`` plus every ``*.py`` and
    ``*.dist-info`` entry of the current directory, each stored under the
    ``pkg2-0.5/`` prefix.  ``config_settings`` is not used.
    """
    target = 'pkg2-0.5.tar.gz'
    archive_path = pjoin(sdist_directory, target)
    with tarfile.open(archive_path, 'w:gz', format=tarfile.PAX_FORMAT) as tf:
        tf.add('pyproject.toml', arcname='pkg2-0.5/' + 'pyproject.toml')
        for pattern in ('*.py', '*.dist-info'):
            for relpath in glob(pattern):
                tf.add(relpath, arcname='pkg2-0.5/' + relpath)

    return target
项目:pep517    作者:pypa    | 项目源码 | 文件源码
def build_sdist(sdist_directory, config_settings):
    """Build ``pkg1-0.5.tar.gz`` in ``sdist_directory`` and return its name.

    Raises UnsupportedOperation when ``config_settings`` has a truthy
    ``test_unsupported`` entry.  Otherwise the archive receives
    ``pyproject.toml`` and every ``*.py`` file of the current directory,
    each stored under the ``pkg1-0.5/`` prefix.
    """
    if config_settings.get('test_unsupported', False):
        raise UnsupportedOperation

    target = 'pkg1-0.5.tar.gz'
    archive_path = pjoin(sdist_directory, target)
    with tarfile.open(archive_path, 'w:gz', format=tarfile.PAX_FORMAT) as tf:
        tf.add('pyproject.toml', arcname='pkg1-0.5/' + 'pyproject.toml')
        for pyfile in glob('*.py'):
            tf.add(pyfile, arcname='pkg1-0.5/' + pyfile)

    return target
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_pax_global_header(self):
    """Verify that a global pax header is written and read back intact.

    The header must match on the archive and on its first member, no key
    or value may be ``bytes``, and any field present in
    ``tarfile.PAX_NUMBER_FIELDS`` must convert without error.
    """
    pax_headers = {
            "foo": "bar",
            "uid": "0",
            "mtime": "1.23",
            "test": "\xe4\xf6\xfc",
            "\xe4\xf6\xfc": "test"}

    out = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
            pax_headers=pax_headers)
    try:
        out.addfile(tarfile.TarInfo("test"))
    finally:
        out.close()

    # Test if the global header was written correctly.
    src = tarfile.open(tmpname, encoding="iso8859-1")
    try:
        self.assertEqual(src.pax_headers, pax_headers)
        self.assertEqual(src.getmembers()[0].pax_headers, pax_headers)
        # Test if all the fields are strings.
        number_fields = tarfile.PAX_NUMBER_FIELDS
        for name, value in src.pax_headers.items():
            self.assertIsNot(type(name), bytes)
            self.assertIsNot(type(value), bytes)
            if name in number_fields:
                try:
                    number_fields[name](value)
                except (TypeError, ValueError):
                    self.fail("unable to convert pax header field")
    finally:
        src.close()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_uname_unicode(self):
    """Check non-ASCII user and group names across encodings.

    The names are written with ``encoding="iso8859-1"`` and must read
    back unchanged with the same encoding.  For non-PAX formats they are
    read once more with ``encoding="ascii"`` and compared with the
    surrogate code points asserted below.
    """
    info = tarfile.TarInfo("foo")
    info.uname = "\xe4\xf6\xfc"
    info.gname = "\xe4\xf6\xfc"

    archive = tarfile.open(tmpname, mode="w", format=self.format,
                           encoding="iso8859-1")
    try:
        archive.addfile(info)
    finally:
        archive.close()

    archive = tarfile.open(tmpname, encoding="iso8859-1")
    try:
        info = archive.getmember("foo")
        self.assertEqual(info.uname, "\xe4\xf6\xfc")
        self.assertEqual(info.gname, "\xe4\xf6\xfc")

        if self.format != tarfile.PAX_FORMAT:
            # Replace the handle with an ASCII-decoding one; the outer
            # ``finally`` closes whichever handle is bound at that point.
            archive.close()
            archive = tarfile.open(tmpname, encoding="ascii")
            info = archive.getmember("foo")
            self.assertEqual(info.uname, "\udce4\udcf6\udcfc")
            self.assertEqual(info.gname, "\udce4\udcf6\udcfc")
    finally:
        archive.close()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_pax_limits(self):
        tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
        tarinfo.tobuf(tarfile.PAX_FORMAT)

        tarinfo = tarfile.TarInfo("longlink")
        tarinfo.linkname = "123/" * 126 + "longname"
        tarinfo.tobuf(tarfile.PAX_FORMAT)

        tarinfo = tarfile.TarInfo("name")
        tarinfo.uid = 0o4000000000000000000
        tarinfo.tobuf(tarfile.PAX_FORMAT)
项目:FESetup    作者:halx    | 项目源码 | 文件源码
def list(self, compression_type='*'):
        """
        Print a listing of the internal tar(pax) archive held in memory.

        :param compression_type: the compression type (*=transparent)
        :type compression_type: string
        :raises DataDictError: if there is no archive data to list
        """

        if not self.data:
            raise DataDictError('no data files')

        # Build the open() arguments up front so the ``with`` line stays short.
        open_mode = 'r:%s' % compression_type
        buffer = cStringIO.StringIO(self.data)

        with tarfile.open(mode=open_mode, format=tarfile.PAX_FORMAT,
                          fileobj=buffer) as archive:
            archive.list()
项目:SigMF    作者:gnuradio    | 项目源码 | 文件源码
def fromarchive(archive_path, dir=None):
    """Extract an archive and return a SigMFFile.

    If `dir` is given, extract the archive to that directory. Otherwise,
    the archive will be extracted to a temporary directory. For example,
    `dir` == "." will extract the archive into the current working
    directory.

    The archive handle is always closed before returning, including when
    listing or extracting its members fails.

    """
    if not dir:
        dir = tempfile.mkdtemp()

    archive = tarfile.open(archive_path, mode="r", format=tarfile.PAX_FORMAT)

    try:
        # Listing the members reads the archive and may raise, so it has to
        # happen inside the try block for the handle to be released.
        members = archive.getmembers()

        # NOTE(review): extractall() writes to whatever paths the member
        # names describe; only call this on archives from a trusted source.
        archive.extractall(path=dir)

        data_file = None
        metadata = None

        for member in members:
            if member.name.endswith(SIGMF_DATASET_EXT):
                data_file = path.join(dir, member.name)
            elif member.name.endswith(SIGMF_METADATA_EXT):
                bytestream_reader = codecs.getreader("utf-8")  # bytes -> str
                mdfile_reader = bytestream_reader(archive.extractfile(member))
                metadata = json.load(mdfile_reader)
    finally:
        archive.close()

    return SigMFFile(metadata=metadata, data_file=data_file)
项目:SigMF    作者:gnuradio    | 项目源码 | 文件源码
def __init__(self, sigmffile, name=None, fileobj=None):
        """Write ``sigmffile`` out as a PAX-format tar archive.

        The metadata dump and a copy of the data file are staged in a
        temporary directory, added to the archive under the archive name
        with fixed permissions, and the staging directory is removed again.
        The staging directory, the tar archive and (when it was not
        supplied by the caller) the output file object are released even if
        staging or archiving fails.

        On success ``self.path`` is set to the ``name`` attribute of the
        tar archive object.

        :param sigmffile: object providing ``dump(file, pretty=True)`` and
            a ``data_file`` path
        :param name: stored on the instance as ``self.name``
        :param fileobj: stored on the instance as ``self.fileobj``; when it
            is falsy the output file object obtained from
            ``_get_output_fileobj`` is closed here
        """
        self.sigmffile = sigmffile
        self.name = name
        self.fileobj = fileobj

        self._check_input()

        archive_name = self._get_archive_name()
        sigmf_fileobj = self._get_output_fileobj()

        def chmod(tarinfo):
            # Normalise permissions so the archive does not depend on the
            # modes of the freshly created staging files.
            if tarinfo.isdir():
                tarinfo.mode = 0o755  # drwxr-xr-x
            else:
                tarinfo.mode = 0o644  # -rw-r--r--
            return tarinfo

        try:
            sigmf_archive = tarfile.TarFile(mode="w",
                                            fileobj=sigmf_fileobj,
                                            format=tarfile.PAX_FORMAT)
            try:
                tmpdir = tempfile.mkdtemp()
                try:
                    sigmf_md_filename = archive_name + SIGMF_METADATA_EXT
                    sigmf_md_path = os.path.join(tmpdir, sigmf_md_filename)
                    sigmf_data_filename = archive_name + SIGMF_DATASET_EXT
                    sigmf_data_path = os.path.join(tmpdir,
                                                   sigmf_data_filename)

                    with open(sigmf_md_path, "w") as mdfile:
                        self.sigmffile.dump(mdfile, pretty=True)

                    shutil.copy(self.sigmffile.data_file, sigmf_data_path)

                    sigmf_archive.add(tmpdir, arcname=archive_name,
                                      filter=chmod)
                finally:
                    # add() has already copied the file contents into the
                    # archive stream, so the staging area can go now.
                    shutil.rmtree(tmpdir)
            finally:
                sigmf_archive.close()
        finally:
            # Only close the output file when this object obtained it
            # itself; a caller-supplied file object stays open.
            if not fileobj:
                sigmf_fileobj.close()

        self.path = sigmf_archive.name
项目:SigMF    作者:gnuradio    | 项目源码 | 文件源码
def create_test_archive(test_sigmffile, tmpfile):
    """Archive ``test_sigmffile`` into ``tmpfile`` and reopen it for reading.

    Returns the open ``tarfile`` object; the caller is responsible for
    closing it.
    """
    archive_location = test_sigmffile.archive(fileobj=tmpfile)
    return tarfile.open(archive_location, mode="r",
                        format=tarfile.PAX_FORMAT)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_open_nonwritable_fileobj(self):
        """Check that a failing first write does not close the file object.

        For each exception type a file object is built whose first
        ``write`` call raises it.  ``tarfile.open`` must let that exception
        propagate and must leave the file object open.
        """
        for exctype in (IOError, EOFError, RuntimeError):
            class BadFile(StringIO.StringIO):
                first = True

                def write(self, data):
                    # Only the very first write fails; later calls do
                    # nothing and return None.
                    if not self.first:
                        return
                    self.first = False
                    raise exctype

            fileobj = BadFile()
            with self.assertRaises(exctype):
                tarfile.open(tmpname, self.mode, fileobj=fileobj,
                             format=tarfile.PAX_FORMAT,
                             pax_headers={'non': 'empty'})
            self.assertFalse(fileobj.closed)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_pax_global_header(self):
        """Write a PAX global header and verify it reads back unchanged.

        The headers include non-ASCII keys and values.  After reopening the
        archive, both the archive-level and the first member's
        ``pax_headers`` must equal what was written, every key and value
        must be ``unicode``, and every field listed in
        ``tarfile.PAX_NUMBER_FIELDS`` must be convertible by its converter.
        """
        pax_headers = {
                u"foo": u"bar",
                u"uid": u"0",
                u"mtime": u"1.23",
                u"test": u"äöü",
                u"äöü": u"test"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                pax_headers=pax_headers)
        try:
            tar.addfile(tarfile.TarInfo("test"))
        finally:
            # Close the archive even when addfile() fails so the handle on
            # the temporary file is not leaked.
            tar.close()

        # Test if the global header was written correctly.
        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            self.assertEqual(tar.pax_headers, pax_headers)
            self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)
            # Test if all the fields are unicode.
            for key, val in tar.pax_headers.iteritems():
                self.assertTrue(type(key) is unicode)
                self.assertTrue(type(val) is unicode)
                if key in tarfile.PAX_NUMBER_FIELDS:
                    try:
                        tarfile.PAX_NUMBER_FIELDS[key](val)
                    except (TypeError, ValueError):
                        self.fail("unable to convert pax header field")
        finally:
            tar.close()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_open_nonwritable_fileobj(self):
        """Check that a failing first write does not close the file object.

        For each exception type a ``BytesIO`` subclass is built whose first
        ``write`` call raises it.  ``tarfile.open`` must let that exception
        propagate and must leave the file object open.
        """
        for exctype in (OSError, EOFError, RuntimeError):
            class BadFile(io.BytesIO):
                first = True

                def write(self, data):
                    # Only the very first write fails; later calls do
                    # nothing and return None.
                    if not self.first:
                        return
                    self.first = False
                    raise exctype

            fileobj = BadFile()
            with self.assertRaises(exctype):
                tarfile.open(tmpname, self.mode, fileobj=fileobj,
                             format=tarfile.PAX_FORMAT,
                             pax_headers={'non': 'empty'})
            self.assertFalse(fileobj.closed)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_pax_global_header(self):
        """Write a PAX global header and verify it reads back unchanged.

        The headers include non-ASCII keys and values.  After reopening the
        archive, both the archive-level and the first member's
        ``pax_headers`` must equal what was written, no key or value may be
        ``bytes``, and every field listed in ``tarfile.PAX_NUMBER_FIELDS``
        must be convertible by its converter.
        """
        expected = {
                "foo": "bar",
                "uid": "0",
                "mtime": "1.23",
                "test": "\xe4\xf6\xfc",
                "\xe4\xf6\xfc": "test"}

        writer = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                              pax_headers=expected)
        try:
            writer.addfile(tarfile.TarInfo("test"))
        finally:
            writer.close()

        # Read the archive back and compare against what was written.
        with tarfile.open(tmpname, encoding="iso8859-1") as reader:
            self.assertEqual(reader.pax_headers, expected)
            self.assertEqual(reader.getmembers()[0].pax_headers, expected)

            # Every key and value must be text, never bytes.
            for key, val in reader.pax_headers.items():
                self.assertIsNot(type(key), bytes)
                self.assertIsNot(type(val), bytes)
                if key not in tarfile.PAX_NUMBER_FIELDS:
                    continue
                convert = tarfile.PAX_NUMBER_FIELDS[key]
                try:
                    convert(val)
                except (TypeError, ValueError):
                    self.fail("unable to convert pax header field")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_uname_unicode(self):
        """Verify that non-ASCII uname/gname values survive a write and read.

        The member is written with the iso8859-1 encoding and read back
        with the same encoding, where both names must be unchanged.  Unless
        the format is PAX, the archive is then reopened with the ASCII
        encoding and both names must equal the ``\\udcXX`` surrogate form.
        """
        original_name = "\xe4\xf6\xfc"

        info = tarfile.TarInfo("foo")
        info.uname = original_name
        info.gname = original_name

        archive = tarfile.open(tmpname, mode="w", format=self.format,
                               encoding="iso8859-1")
        try:
            archive.addfile(info)
        finally:
            archive.close()

        with tarfile.open(tmpname, encoding="iso8859-1") as archive:
            info = archive.getmember("foo")
            self.assertEqual(info.uname, original_name)
            self.assertEqual(info.gname, original_name)

        if self.format != tarfile.PAX_FORMAT:
            escaped_name = "\udce4\udcf6\udcfc"
            with tarfile.open(tmpname, encoding="ascii") as archive:
                info = archive.getmember("foo")
                self.assertEqual(info.uname, escaped_name)
                self.assertEqual(info.gname, escaped_name)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_pax_global_header(self):
        """Write a PAX global header and verify it reads back unchanged.

        The headers include a non-ASCII key and a non-ASCII value, written
        as ``\\x`` escapes so the literals do not depend on the source file
        encoding.  After reopening the archive, both the archive-level and
        the first member's ``pax_headers`` must equal what was written,
        every key and value must be ``unicode``, and every field listed in
        ``tarfile.PAX_NUMBER_FIELDS`` must be convertible by its converter.
        """
        pax_headers = {
                u"foo": u"bar",
                u"uid": u"0",
                u"mtime": u"1.23",
                u"test": u"\xe4\xf6\xfc",
                u"\xe4\xf6\xfc": u"test"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                pax_headers=pax_headers)
        try:
            tar.addfile(tarfile.TarInfo("test"))
        finally:
            # Close the archive even when addfile() fails so the handle on
            # the temporary file is not leaked.
            tar.close()

        # Test if the global header was written correctly.
        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            self.assertEqual(tar.pax_headers, pax_headers)
            self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)

            # Test if all the fields are unicode.
            for key, val in tar.pax_headers.iteritems():
                self.assertTrue(type(key) is unicode)
                self.assertTrue(type(val) is unicode)
                if key in tarfile.PAX_NUMBER_FIELDS:
                    try:
                        tarfile.PAX_NUMBER_FIELDS[key](val)
                    except (TypeError, ValueError):
                        self.fail("unable to convert pax header field")
        finally:
            # The read handle was previously never closed.
            tar.close()
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_pax_limits(self):
        tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
        tarinfo.tobuf(tarfile.PAX_FORMAT)

        tarinfo = tarfile.TarInfo("longlink")
        tarinfo.linkname = "123/" * 126 + "longname"
        tarinfo.tobuf(tarfile.PAX_FORMAT)

        tarinfo = tarfile.TarInfo("name")
        tarinfo.uid = 04000000000000000000L
        tarinfo.tobuf(tarfile.PAX_FORMAT)