我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用gzip.GzipFile()。
def get_tokens(self, text):
    """Yield (tokentype, value) pairs from a raw token stream.

    *text* may be gzip- or bz2-compressed depending on ``self.compress``;
    it is decompressed before being handed to get_tokens_unprocessed().
    """
    if isinstance(text, text_type):
        # raw token stream never has any non-ASCII characters
        text = text.encode('ascii')
    if self.compress == 'gz':
        import gzip
        gzipfile = gzip.GzipFile('', 'rb', 9, BytesIO(text))
        text = gzipfile.read()
    elif self.compress == 'bz2':
        import bz2
        text = bz2.decompress(text)
    # do not call Lexer.get_tokens() because we do not want Unicode
    # decoding to occur, and stripping is not optional.
    text = text.strip(b'\n') + b'\n'
    for i, t, v in self.get_tokens_unprocessed(text):
        yield t, v
def encode_public_key(self):
    """Return the public key PEM, gzip-compressed and base64-encoded.

    Based on spotnab: the key is gzipped and then base64 is applied.
    Returns None if the key has not been initialized yet.
    """
    fileobj = StringIO()
    with GzipFile(fileobj=fileobj, mode="wb") as f:
        try:
            f.write(self.public_pem())
        except TypeError:
            # It wasn't initialized yet
            return None
    return b64encode(fileobj.getvalue())
def getWebPage(url, headers, cookies, postData=None):
    """Fetch *url* and return the (gzip-decoded) response body.

    When *postData* is given the request is sent as a urlencoded POST
    through a cookie-aware opener; otherwise a plain GET is issued.
    Returns None on any error (best-effort fetch).
    """
    try:
        if (postData):
            params = urllib.parse.urlencode(postData)
            params = params.encode('utf-8')
            request = urllib.request.Request(url, data=params, headers=headers)
        else:
            print('Fetching '+url)
            request = urllib.request.Request(url, None, headers)
            request.add_header('Cookie', cookies)
        if (postData):
            response = urllib.request.build_opener(urllib.request.HTTPCookieProcessor).open(request)
        else:
            response = urllib.request.urlopen(request)
        # transparently gunzip compressed responses
        if response.info().get('Content-Encoding') == 'gzip':
            buf = BytesIO(response.read())
            f = gzip.GzipFile(fileobj=buf)
            r = f.read()
        else:
            r = response.read()
        return r
    except Exception as e:
        print("Error processing webpage: "+str(e))
        return None

## https://stackoverflow.com/questions/480214/how-do-you-remove-duplicates-from-a-list-in-python-whilst-preserving-order
def gzopen(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
    """Open gzip compressed tar archive name for reading or writing.
       Appending is not allowed.
    """
    if len(mode) > 1 or mode not in "rw":
        raise ValueError("mode must be 'r' or 'w'")

    try:
        import gzip
        gzip.GzipFile
    except (ImportError, AttributeError):
        raise CompressionError("gzip module is not available")

    # Remember whether the caller supplied the file object: an external
    # fileobj must not be closed by us on error and must survive closing
    # the archive (recorded in t._extfileobj).  The previous version
    # unconditionally set _extfileobj = False and leaked the internal
    # GzipFile when taropen() failed.
    extfileobj = fileobj is not None
    try:
        fileobj = gzip.GzipFile(name, mode + "b", compresslevel, fileobj)
        t = cls.taropen(name, mode, fileobj, **kwargs)
    except IOError:
        # Only close objects we created; re-raise unchanged if gzip
        # failed before producing a file object.
        if not extfileobj and fileobj is not None:
            fileobj.close()
        if fileobj is None:
            raise
        raise ReadError("not a gzip file")
    except:
        if not extfileobj and fileobj is not None:
            fileobj.close()
        raise
    t._extfileobj = extfileobj
    return t
def gzip_encode(data):
    """data -> gzip encoded data

    Encode data using the gzip content encoding as described in RFC 1952
    """
    if not gzip:
        # the module-level gzip import failed; feature unavailable
        raise NotImplementedError
    f = StringIO.StringIO()
    gzf = gzip.GzipFile(mode="wb", fileobj=f, compresslevel=1)
    gzf.write(data)
    gzf.close()
    encoded = f.getvalue()
    f.close()
    return encoded

##
# Decode a string using the gzip content encoding such as specified by the
# Content-Encoding: gzip
# in the HTTP header, as described in RFC 1952
#
# @param data The encoded data
# @return the unencoded data
# @raises ValueError if data is not correctly coded.
def gzip_decode(data):
    """gzip encoded data -> unencoded data

    Decode data using the gzip content encoding as described in RFC 1952
    """
    if not gzip:
        # the module-level gzip import failed; feature unavailable
        raise NotImplementedError
    f = StringIO.StringIO(data)
    gzf = gzip.GzipFile(mode="rb", fileobj=f)
    try:
        decoded = gzf.read()
    except IOError:
        # truncated / corrupt gzip stream
        raise ValueError("invalid data")
    f.close()
    gzf.close()
    return decoded

##
# Return a decoded file-like object for the gzip encoding
# as described in RFC 1952.
#
# @param response A stream supporting a read() method
# @return a file-like object that the decoded data can be read() from
def _get_result(response, limit=None): if limit == '0': result = response.read(224 * 1024) elif limit: result = response.read(int(limit) * 1024) else: result = response.read(5242880) try: encoding = response.info().getheader('Content-Encoding') except: encoding = None if encoding == 'gzip': result = gzip.GzipFile(fileobj=StringIO.StringIO(result)).read() return result
def bytes2zip(bytes):
    """
    RETURN COMPRESSED BYTES

    Accepts either a file-like object (anything with .read, treated as an
    iterable of chunks and spooled through a temp file) or a plain bytes
    value (compressed in memory).
    """
    if hasattr(bytes, "read"):
        # streaming input: spill the gzip output to a temporary file so
        # arbitrarily large inputs never live fully in memory
        buff = TemporaryFile()
        archive = gzip.GzipFile(fileobj=buff, mode='w')
        for b in bytes:
            archive.write(b)
        archive.close()
        buff.seek(0)
        from pyLibrary.env.big_data import FileString, safe_size
        return FileString(buff)
    buff = BytesIO()
    archive = gzip.GzipFile(fileobj=buff, mode='w')
    archive.write(bytes)
    archive.close()
    return buff.getvalue()
def _decompressContent(response, new_content):
    """Decompress an HTTP body per its content-encoding header (gzip/deflate).

    On success the header is renamed to '-content-encoding' and
    'content-length' is updated; a corrupt body raises
    FailedToDecompressContent.
    """
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                content = zlib.decompress(content)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way the won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except IOError:
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
def _extract_images(filename):
    """Extract MNIST images from a gzipped IDX file.

    :param filename: path to the gzipped image file
    :return: 4-D numpy array [index, y, x, depth], dtype np.float32,
             pixel values scaled to [0, 1]
    """
    images = []
    print('Extracting {}'.format(filename))
    with gzip.GzipFile(fileobj=open(filename, 'rb')) as f:
        buf = f.read()
    index = 0
    # IDX header: magic, image count, rows, cols (big-endian uint32)
    magic, num_images, rows, cols = struct.unpack_from('>IIII', buf, index)
    if magic != 2051:
        raise ValueError('Invalid magic number {} in MNIST image file: {}'.format(magic, filename))
    index += struct.calcsize('>IIII')
    for i in range(num_images):
        # 784 = 28*28 unsigned bytes per image
        img = struct.unpack_from('>784B', buf, index)
        index += struct.calcsize('>784B')
        img = np.array(img, dtype=np.float32)
        # scale pixel values from [0, 255] to [0, 1]
        img = np.multiply(img, 1.0 / 255.0)
        img = img.reshape(rows, cols, 1)
        images.append(img)
    return np.array(images, dtype=np.float32)
def _extract_labels(filename, num_classes=10):
    """Extract MNIST labels from a gzipped IDX file as one-hot vectors.

    :param filename: path to the gzipped label file
    :param num_classes: width of the one-hot encoding (default 10)
    :return: 2-D numpy array [index, num_classes], dtype np.float32
    """
    labels = []
    print('Extracting {}'.format(filename))
    with gzip.GzipFile(fileobj=open(filename, 'rb')) as f:
        buf = f.read()
    index = 0
    # IDX header: magic, label count (big-endian uint32)
    magic, num_labels = struct.unpack_from('>II', buf, index)
    if magic != 2049:
        raise ValueError('Invalid magic number {} in MNIST label file: {}'.format(magic, filename))
    index += struct.calcsize('>II')
    for i in range(num_labels):
        label = struct.unpack_from('>B', buf, index)
        index += struct.calcsize('>B')
        label_one_hot = np.zeros(num_classes, dtype=np.float32)
        label_one_hot[label[0]] = 1
        labels.append(label_one_hot)
    return np.array(labels, dtype=np.float32)
def update_sideshow_file(fname, server_fname, server=SIDESHOW_SERVER, temp_path=gettempdir()):
    """
    Update the JPL side show file stored locally at *fname*. The remote
    file is accessed via FTP on *server* at *server_fname*. The path
    *temp_path* is used to store intermediate files. Return *fname*.
    """
    dest_fname = replace_path(temp_path, server_fname)
    logger.info('opening connection to {}'.format(server))
    # 'wb': RETR delivers raw bytes; a text-mode handle would corrupt the
    # gzip payload (and fail outright on Python 3)
    with closing(FTP(server)) as ftp, open(dest_fname, 'wb') as fid:
        logger.info('logging in')
        ftp.login()
        logger.info('writing to {}'.format(dest_fname))
        ftp.retrbinary('RETR ' + server_fname, fid.write)
    logger.info('uncompressing file to {}'.format(fname))
    # GzipFile.read() returns bytes, so the output is written binary too
    with GzipFile(dest_fname) as gzip_fid, open(fname, 'wb') as fid:
        fid.write(gzip_fid.read())
    return fname
def parse_file(self):
    """Parse the (optionally gzipped) XML capture at self.path and pair
    each SUBMIT urb with its COMPLETED counterpart via usb_transaction().
    """
    if self.path.endswith("gz"):
        f = gzip.GzipFile(self.path)
    else:
        f = open(self.path)
    tree = ET.parse(f)
    f.close()
    root = tree.getroot()
    # urb_id -> pending SUBMIT packet awaiting its completion
    current_urbs = {}
    for child in root:
        p = USBPacket(child)
        urb_id = p["usb.urb_id"]
        urb_status = p["usb.urb_status"]
        urb_type = p["usb.urb_type"]
        if (urb_type == URB_TYPE_SUBMIT):
            current_urbs[urb_id] = p
        if (urb_type == URB_TYPE_COMPLETED):
            if (urb_id not in current_urbs):
                print("Urb id not present: {:x}".format(urb_id))
            else:
                submit = current_urbs[urb_id]
                completed = p
                self.usb_transaction(submit, completed)
                del current_urbs[urb_id]
def setUp(self):
    """Prepare a large gzipped AMF response body and advertise it via the
    Content-Encoding header for the gzip-handling tests.
    """
    import gzip

    env = remoting.Envelope(pyamf.AMF3)
    r = remoting.Response(['foo' * 50000] * 200)
    env['/1'] = r
    response = remoting.encode(env).getvalue()
    buf = util.BufferedByteStream()
    x = gzip.GzipFile(fileobj=buf, mode='wb')
    x.write(response)
    x.close()
    self.canned_response = buf.getvalue()
    BaseServiceTestCase.setUp(self)
    self.headers['Content-Encoding'] = 'gzip'
def _compress(path):
    """Gzip the file at *path* next to it under a unique ``.gz`` name,
    then best-effort remove the original. Return the new gz path.
    """
    with open(path, "rb") as archive:
        directory, filename = _split_compress_path(path)
        prefix, suffix = os.path.splitext(filename)
        with _unique_writable_file(directory, prefix, suffix + ".gz") as (gz_path, gz_file):
            # GzipFile infers its mode from gz_file — presumably opened
            # writable by _unique_writable_file; TODO confirm
            compressed = gzip.GzipFile(fileobj=gz_file)
            try:
                compressed.writelines(archive)
            finally:
                compressed.close()
    try:
        os.remove(path)
    except OSError:
        # best effort: leave the original in place if it can't be removed
        pass
    return gz_path
def compress_sequence(sequence):
    """Lazily gzip an iterable of byte chunks, yielding compressed chunks
    as they become available from the streaming buffer.
    """
    buf = StreamingBuffer()
    zfile = GzipFile(mode='wb', compresslevel=6, fileobj=buf)
    # Output headers...
    yield buf.read()
    for item in sequence:
        zfile.write(item)
        data = buf.read()
        if data:
            yield data
    # closing flushes the remaining compressed data and the gzip trailer
    zfile.close()
    yield buf.read()

# Expression to match some_token and some_token="with spaces" (and similarly
# for single-quoted strings).
def compress(self, current=False):
    """Gzip a backup log file and remove the uncompressed original.

    With current=True the active backup log file is compressed;
    otherwise the previous log (self.last_log) is, and the call is a
    no-op when there is no distinct previous log.
    """
    gz_log = None
    try:
        compress_file = self.backup_log_file
        if not current:
            compress_file = self.last_log
            # nothing to do if the previous log is missing or is the
            # same file as the current backup log
            if not os.path.isfile(self.last_log) or self.last_log == self.backup_log_file:
                return
        logging.info("Compressing log file: %s" % compress_file)
        gz_file = "%s.gz" % compress_file
        gz_log = GzipFile(gz_file, "w+")
        with open(compress_file) as f:
            for line in f:
                gz_log.write(line)
        # only remove the source once it was fully written out
        os.remove(compress_file)
    finally:
        if gz_log:
            gz_log.close()
def retrieve_content(url, data=None):
    """
    Retrieves page content from given URL
    """
    try:
        # percent-encode spaces only in the query part of the URL
        req = urllib2.Request("".join(url[i].replace(' ', "%20") if i > url.find('?') else url[i] for i in xrange(len(url))), data, {"User-agent": NAME, "Accept-encoding": "gzip, deflate"})
        resp = urllib2.urlopen(req, timeout=TIMEOUT)
        retval = resp.read()
        encoding = resp.headers.get("Content-Encoding")
        if encoding:
            if encoding.lower() == "deflate":
                # raw deflate stream (no zlib header), hence wbits=-15
                data = StringIO.StringIO(zlib.decompress(retval, -15))
            else:
                data = gzip.GzipFile("", "rb", 9, StringIO.StringIO(retval))
            retval = data.read()
    except Exception, ex:
        # HTTP errors still carry a readable body; otherwise fall back to
        # the exception message
        retval = ex.read() if hasattr(ex, "read") else getattr(ex, "msg", str())
    return retval or ""
def handleResponse(self, data):
    """Gunzip the server response if needed, rewrite secure links, and
    relay the result to the client before shutting the connection down.
    """
    if (self.isCompressed):
        logging.debug("Decompressing content...")
        data = gzip.GzipFile('', 'rb', 9, StringIO.StringIO(data)).read()
    logging.log(self.getLogLevel(), "Read from server:\n" + data)
    #logging.log(self.getLogLevel(), "Read from server:\n <large data>" )
    data = self.replaceSecureLinks(data)
    if (self.contentLength != None):
        # the rewrite may have changed the body size
        self.client.setHeader('Content-Length', len(data))
    self.client.write(data)
    self.shutdown()
def handleResponse(self, data):
    """Gunzip the server response if needed, rewrite secure links, and
    relay the result to the client before shutting the connection down.
    (Variant that does not log the full body.)
    """
    if (self.isCompressed):
        logging.debug("Decompressing content...")
        data = gzip.GzipFile('', 'rb', 9, StringIO.StringIO(data)).read()
    #logging.log(self.getLogLevel(), "Read from server:\n" + data)
    logging.log(self.getLogLevel(), "Read from server:\n <large data>" )
    data = self.replaceSecureLinks(data)
    if (self.contentLength != None):
        # the rewrite may have changed the body size
        self.client.setHeader('Content-Length', len(data))
    self.client.write(data)
    self.shutdown()
def testProcess(self):
    """Spawn the system gzip, feed it text on stdin, and verify that the
    accumulated stdout gunzips back to the original input.
    """
    if os.path.exists('/bin/gzip'):
        cmd = '/bin/gzip'
    elif os.path.exists('/usr/bin/gzip'):
        cmd = '/usr/bin/gzip'
    else:
        raise RuntimeError("gzip not found in /bin or /usr/bin")
    s = "there's no place like home!\n" * 3
    p = Accumulator()
    d = p.endedDeferred = defer.Deferred()
    reactor.spawnProcess(p, cmd, [cmd, "-c"], env=None, path="/tmp", usePTY=self.usePTY)
    p.transport.write(s)
    p.transport.closeStdin()

    def processEnded(ign):
        f = p.outF
        f.seek(0, 0)
        gf = gzip.GzipFile(fileobj=f)
        self.assertEquals(gf.read(), s)
    return d.addCallback(processEnded)
def respond_to_checkpoint(self, response_code):
    """POST the checkpoint *response_code* back to the endpoint and
    return (http status, decoded body), gunzipping when necessary.
    """
    headers = {
        'User-Agent': self.USER_AGENT,
        'Origin': 'https://i.instagram.com',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US',
        'Accept-Encoding': 'gzip',
        'Referer': self.endpoint,
        'Cookie': self.cookie,
    }
    req = Request(self.endpoint, headers=headers)
    data = {'csrfmiddlewaretoken': self.csrftoken, 'response_code': response_code}
    res = urlopen(req, data=urlencode(data).encode('ascii'), timeout=self.timeout)
    if res.info().get('Content-Encoding') == 'gzip':
        buf = BytesIO(res.read())
        content = gzip.GzipFile(fileobj=buf).read().decode('utf-8')
    else:
        content = res.read().decode('utf-8')
    return res.code, content
def anyfile(infile, mode='r', encoding="utf8"):
    '''
    Return a text-mode file handle for *infile*, transparently handling
    gzip- and zip-compressed files.

    *infile* may also be a two-element tuple ``(archive, member)`` where
    the first element is the compressed file and the second the actual
    filename inside it, e.g. ``('a.zip', 'aa.txt')``.
    '''
    # Work out the archive path and the member name inside it (for zip).
    if isinstance(infile, tuple):
        infile, rawfile = infile[:2]
    else:
        rawfile = os.path.splitext(infile)[0]
    suffix = os.path.splitext(infile)[1].lower()

    if suffix == '.gz':
        import gzip
        handle = io.TextIOWrapper(gzip.GzipFile(infile, 'r'), encoding=encoding)
    elif suffix == '.zip':
        import zipfile
        handle = io.TextIOWrapper(zipfile.ZipFile(infile, 'r').open(rawfile, 'r'), encoding=encoding)
    else:
        handle = open(infile, mode, encoding=encoding)
    return handle
def __parse_server_data(self):
    """Parse the server-to-client byte stream as an HTTP response.

    If the body claims gzip encoding and carries the gzip magic bytes
    (1f 8b 08), it is decompressed in place.

    :return: a dpkt.http.Response, or None when there is no data or
             parsing fails (the failure is logged)
    """
    if self.data_s2c:
        try:
            resp = dpkt.http.Response(self.data_s2c)
            if resp.headers.get("content-encoding") == "gzip":
                data = resp.body
                data_arrays = mills.str2hex(data)
                # verify the gzip magic + deflate method before trusting
                # the header
                if data_arrays[0:3] == ["1f", "8b", "08"]:
                    data_unzip = gzip.GzipFile(fileobj=StringIO(data)).read()
                    resp.body = data_unzip
            return resp
        except Exception as e:
            logging.error("[dpkt_http_resp_parse_failed]: %s %r" % (self.data_s2c, e))
def OpenFileForRead(path, logtext):
    """ Opens a text file, be it GZip or plain

    Returns (frame, file): *frame* is the underlying binary handle for
    gzipped input (None otherwise) and *file* the readable text stream,
    or (None, None) on failure.
    """
    frame = None
    file = None
    if not path:
        return (frame, file)
    try:
        if path.endswith('.gz'):
            frame = open(path, 'rb')
            # NOTE(review): GzipFile rejects mode 'rt' on Python 3
            # (ValueError) — this only works on Python 2; confirm target.
            file = gzip.GzipFile(fileobj=frame, mode='rt')
        else:
            file = open(path, 'rt')
        if logtext:
            output.Log('Opened %s file: %s' % (logtext, path), 1)
        else:
            output.Log('Opened file: %s' % path, 1)
    except IOError:
        output.Error('Can not open file: %s' % path)
    return (frame, file)
#end def OpenFileForRead
def _decompressContent(response, new_content):
    """Decompress an HTTP body per its content-encoding header.

    Supports gzip and raw deflate (wbits=-MAX_WBITS). On success the
    header is renamed to '-content-encoding' and 'content-length'
    updated; a corrupt body raises FailedToDecompressContent.
    """
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                content = zlib.decompress(content, -zlib.MAX_WBITS)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way the won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except IOError:
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
def _decompressContent(response, new_content): content = new_content try: encoding = response.get('content-encoding', None) if encoding in ['gzip', 'deflate']: if encoding == 'gzip': content = gzip.GzipFile(fileobj=io.BytesIO(new_content)).read() if encoding == 'deflate': content = zlib.decompress(content, -zlib.MAX_WBITS) response['content-length'] = str(len(content)) # Record the historical presence of the encoding in a way the won't interfere. response['-content-encoding'] = response['content-encoding'] del response['content-encoding'] except IOError: content = "" raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content) return content
def extract_images(filename):
    """Extract the images into a 4D uint8 numpy array [index, y, x, depth]."""
    print('Extracting', filename)
    with tf.gfile.Open(filename, 'rb') as f, gzip.GzipFile(fileobj=f) as bytestream:
        # IDX header: magic 2051 identifies an MNIST image file
        magic = _read32(bytestream)
        if magic != 2051:
            raise ValueError(
                'Invalid magic number %d in MNIST image file: %s' %
                (magic, filename))
        num_images = _read32(bytestream)
        rows = _read32(bytestream)
        cols = _read32(bytestream)
        buf = bytestream.read(rows * cols * num_images)
        data = numpy.frombuffer(buf, dtype=numpy.uint8)
        data = data.reshape(num_images, rows, cols, 1)
        return data
def get_title2id(self, dump_date):
    """Build a {page title -> page id} map from the enwiki page.sql dump
    for namespace 0, matching INSERT rows with a regex.
    """
    print('get_title2id...')
    title2id = {}
    regex = re.compile(r"\((\d+),0,'(.+?)','")
    # NOTE(review): the first assignment is dead — the .sql.gz path is
    # immediately overwritten by the uncompressed .sql path below.
    fname = '/home/ddimitrov/data/enwiki20150304_plus_clickstream/enwiki-' + dump_date + '-page.sql.gz'
    fname = '/home/ddimitrov/data/enwiki20150304_plus_clickstream/enwiki-' + dump_date + '-page.sql'
    #with gzip.GzipFile(fname, 'rb') as infile:
    with open(fname) as f:
        content = f.readlines()
    for line in content:
        line = line.decode('utf-8')
        if not line.startswith('INSERT'):
            continue
        for pid, title in regex.findall(line):
            title2id[DataHandler.unescape_mysql(title)] = int(pid)
    return title2id
def get_rpid2pid(self, dump_date):
    """Build a {redirect page id -> target page id} map from the enwiki
    redirect.sql.gz dump, resolving targets through get_title2id().
    """
    print('get_rpid2pid...')
    title2id = self.get_title2id(dump_date)
    rpid2pid = {}
    regex = re.compile(r"\((\d+),0,'(.+?)','")
    fname = '/home/ddimitrov/data/enwiki20150304_plus_clickstream/enwiki-' + dump_date + '-redirect.sql.gz'
    with gzip.GzipFile(fname, 'rb') as infile:
        for line in infile:
            line = line.decode('utf-8')
            if not line.startswith('INSERT'):
                continue
            # normalize SQL NULLs so the regex still matches
            line = line.replace('NULL', "''")
            for pid, title in regex.findall(line):
                try:
                    rpid2pid[pid] = title2id[DataHandler.unescape_mysql(title)]
                except KeyError:
                    # redirect target not present in the page dump
                    print(pid, title)
                    # pdb.set_trace()
    return rpid2pid
def read_lett_iter(f, decode=True):
    """Iterate over a .lett file handle, yielding Page objects.

    Each line holds tab-separated: lang, mime, encoding, url, and
    base64-encoded html and text. With decode=True the html/text are
    additionally UTF-8 decoded to str.
    """
    fh = f
    fh.seek(0)
    # transparently wrap gzipped input based on the file name
    if f.name.endswith('.gz'):
        fh = gzip.GzipFile(fileobj=fh, mode='r')
    for line in fh:
        lang, mime, enc, url, html, text = line[:-1].split("\t")
        html = base64.b64decode(html)
        text = base64.b64decode(text)
        if decode:
            html = html.decode("utf-8")
            text = text.decode("utf-8")
        p = Page(url, html, text, mime, enc, lang)
        yield p
def test_gzip_loadtxt():
    # Thanks to another windows brokeness, we can't use
    # NamedTemporaryFile: a file created from this function cannot be
    # reopened by another open call. So we first put the gzipped string
    # of the test reference array, write it to a securely opened file,
    # which is then read from by the loadtxt function
    s = BytesIO()
    g = gzip.GzipFile(fileobj=s, mode='w')
    g.write(b'1 2 3\n')
    g.close()
    s.seek(0)
    with temppath(suffix='.gz') as name:
        with open(name, 'wb') as f:
            f.write(s.read())
        res = np.loadtxt(name)
    s.close()
    # loadtxt must transparently gunzip .gz paths
    assert_array_equal(res, [1, 2, 3])
def _read_datafile(self, path, expected_dims):
    """Helper function to read a file in IDX format.

    *path* is a gzipped IDX file; *expected_dims* is the expected number
    of dimensions, validated against the magic number (2048 + ndims).
    Returns a uint8 numpy array shaped per the header dimensions.
    """
    base_magic_num = 2048
    with gzip.GzipFile(path) as f:
        magic_num = struct.unpack('>I', f.read(4))[0]
        expected_magic_num = base_magic_num + expected_dims
        if magic_num != expected_magic_num:
            raise ValueError('Incorrect MNIST magic number (expected '
                             '{}, got {})'
                             .format(expected_magic_num, magic_num))
        # one big-endian uint32 per dimension
        dims = struct.unpack('>' + 'I' * expected_dims, f.read(4 * expected_dims))
        buf = f.read(reduce(operator.mul, dims))
        data = np.frombuffer(buf, dtype=np.uint8)
        data = data.reshape(*dims)
        return data
def to_file(self, dir_path='.', fname='amsetrun', force_write=True):
    """Serialize kgrid/egrid to ``<dir_path>/<fname>.json.gz``.

    With force_write=False an unused numbered filename is chosen instead
    of overwriting, and a warning is emitted for each collision.
    """
    if not force_write:
        n = 1
        fname0 = fname
        while os.path.exists(os.path.join(dir_path, '{}.json.gz'.format(fname))):
            warnings.warn('The file, {} exists. AMSET outputs will be '
                          'written in {}'.format(fname, fname0+'_'+str(n)))
            fname = fname0 + '_' + str(n)
            n += 1

    # make the output dict
    out_d = {'kgrid': self.kgrid, 'egrid': self.egrid}

    # write the output dict to file
    # NOTE(review): json.dumps returns str; GzipFile opened 'w' expects
    # bytes on Python 3 — presumably this ran on Python 2; confirm.
    with gzip.GzipFile(
            os.path.join(dir_path, '{}.json.gz'.format(fname)), 'w') as fp:
        jsonstr = json.dumps(out_d, cls=MontyEncoder)
        fp.write(jsonstr)
def _get_data(url):
    """Helper function to get data over http or from a local file"""
    if url.startswith('http://'):
        # Try Python 2, use Python 3 on exception
        try:
            resp = urllib.urlopen(url)
            encoding = resp.headers.dict.get('content-encoding', 'plain')
        except AttributeError:
            resp = urllib.request.urlopen(url)
            encoding = resp.headers.get('content-encoding', 'plain')
        data = resp.read()
        if encoding == 'plain':
            pass
        elif encoding == 'gzip':
            data = StringIO(data)
            data = gzip.GzipFile(fileobj=data).read()
        else:
            raise RuntimeError('unknown encoding')
    else:
        # anything that isn't an http URL is treated as a local path
        with open(url, 'r') as fid:
            data = fid.read()
    return data
def write_handle(self, handle):
    """
    Write the database to the specified file handle.

    Wraps *handle* in a GzipFile when compression is requested and
    available, then in a utf8 writer, and emits the XML data.
    Returns 1 on completion.
    """
    if self.compress and _gzip_ok:
        try:
            g = gzip.GzipFile(mode="wb", fileobj=handle)
        except:
            # best effort: fall back to the raw handle if gzip wrapping
            # fails for any reason
            g = handle
    else:
        g = handle
    self.g = codecs.getwriter("utf8")(g)
    self.write_xml_data()
    g.close()
    return 1
def py_gunzip(gz_file, outdir = "."):
    # extract a .gz file
    # (Python 2: uses print statements)
    # read in the contents
    print gz_file
    print outdir
    print "Reading file:\t" + gz_file
    input_file = gzip.GzipFile(gz_file, 'rb')
    file_contents = input_file.read()
    input_file.close()
    print "Finished reading file"
    # get the output path from the outdir and filename
    # strip the .gz suffix to recover the original file name
    output_file_base = os.path.basename(os.path.splitext(gz_file)[0])
    print output_file_base
    output_file_path = os.path.join(outdir, output_file_base)
    print output_file_path
    # write the contents
    print "Writing contents to file:\t" + output_file_path
    print type(output_file_path)
    # output_file = file(output_file_path, 'wb')
    output_file = open(output_file_path, 'wb')
    output_file.write(file_contents)
    output_file.close()
def saveIDL(filename, annotations):
    """Write *annotations* to an IDL file.

    The output format is chosen by extension: plain text for ``.idl``,
    gzip for ``.gz``, bzip2 for ``.bz2``.  Entries are separated by
    ";\\n" and the last one is terminated by ".\\n".

    Raises ValueError for an unsupported extension (the original code
    left its output handle unbound and crashed with NameError), and
    always closes the handle, even if writeIDL() fails.
    """
    name, ext = os.path.splitext(filename)
    if ext == ".idl":
        out = open(filename, 'w')
    elif ext == ".gz":
        out = gzip.GzipFile(filename, 'w')
    elif ext == ".bz2":
        out = bz2.BZ2File(filename, 'w')
    else:
        raise ValueError("unsupported annotation file extension: %r" % ext)
    try:
        for i, annotation in enumerate(annotations):
            annotation.writeIDL(out)
            # ";" between entries, "." after the final one
            if i + 1 < len(annotations):
                out.write(";\n")
            else:
                out.write(".\n")
    finally:
        out.close()
def encode_private_key(self):
    """Return the private key PEM, gzip-compressed and base64-encoded.

    Based on spotnab: the key is gzipped and then base64 is applied.
    Returns None if the key has not been initialized yet.
    """
    fileobj = StringIO()
    with GzipFile(fileobj=fileobj, mode="wb") as f:
        try:
            f.write(self.private_pem())
        except TypeError:
            # It wasn't initialized yet
            return None
    return b64encode(fileobj.getvalue())
def decode_private_key(self, encoded):
    """Load a private key from its base64-encoded gzipped form.

    Based on spotnab. Returns True on success, False if the payload
    cannot be base64-decoded, is empty, or fails to load.
    """
    fileobj = StringIO()
    try:
        fileobj.write(b64decode(encoded))
    except TypeError:
        # not valid base64
        return False
    fileobj.seek(0L, SEEK_SET)
    private_key = None
    with GzipFile(fileobj=fileobj, mode="rb") as f:
        private_key = f.read()
    if not private_key:
        return False
    # We were successful
    if not self.load(private_key=private_key):
        return False
    return True
def decode_public_key(self, encoded):
    """Load self.public_key from its base64-encoded gzipped PEM form.

    Based on spotnab. Returns True on success, False if the payload
    cannot be base64-decoded or is not a valid PEM public key.
    """
    fileobj = StringIO()
    try:
        fileobj.write(b64decode(encoded))
    except TypeError:
        # not valid base64
        return False
    fileobj.seek(0L, SEEK_SET)
    self.public_key = None
    with GzipFile(fileobj=fileobj, mode="rb") as f:
        try:
            self.public_key = serialization.load_pem_public_key(
                f.read(),
                backend=default_backend()
            )
        except ValueError:
            # Could not decrypt content
            return False
    if not self.public_key:
        return False
    return True
def _init_read_gz(self):
    """Initialize for reading a gzip compressed fileobj.

    Validates the gzip header (RFC 1952) and skips its optional
    fields, leaving the stream positioned at the deflate data.
    """
    self.cmp = self.zlib.decompressobj(-self.zlib.MAX_WBITS)
    self.dbuf = b""

    # taken from gzip.GzipFile with some alterations
    if self.__read(2) != b"\037\213":
        # magic bytes 0x1f 0x8b
        raise ReadError("not a gzip file")
    if self.__read(1) != b"\010":
        # compression method must be 8 (deflate)
        raise CompressionError("unsupported compression method")

    flag = ord(self.__read(1))
    self.__read(6)  # skip mtime (4), XFL (1), OS (1)

    if flag & 4:
        # FEXTRA: 2-byte little-endian length, then that many bytes
        xlen = ord(self.__read(1)) + 256 * ord(self.__read(1))
        self.read(xlen)
    if flag & 8:
        # FNAME: zero-terminated original file name
        while True:
            s = self.__read(1)
            if not s or s == NUL:
                break
    if flag & 16:
        # FCOMMENT: zero-terminated comment
        while True:
            s = self.__read(1)
            if not s or s == NUL:
                break
    if flag & 2:
        # FHCRC: 2-byte header CRC
        self.__read(2)
def seekable(self):
    """Report whether the underlying file object supports seeking.

    Objects lacking a ``seekable`` method are assumed seekable.
    """
    probe = getattr(self.fileobj, "seekable", None)
    if probe is None:
        # XXX gzip.GzipFile and bz2.BZ2File
        return True
    return probe()
def gzopen(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
    """Open gzip compressed tar archive name for reading or writing.
       Appending is not allowed.
    """
    if len(mode) > 1 or mode not in "rw":
        raise ValueError("mode must be 'r' or 'w'")

    try:
        import gzip
        gzip.GzipFile  # probe: some builds ship a stripped-down gzip module
    except (ImportError, AttributeError):
        raise CompressionError("gzip module is not available")

    # An externally supplied fileobj must not be closed on error, and
    # must outlive the archive (t._extfileobj).
    extfileobj = fileobj is not None
    try:
        fileobj = gzip.GzipFile(name, mode + "b", compresslevel, fileobj)
        t = cls.taropen(name, mode, fileobj, **kwargs)
    except IOError:
        # only close the GzipFile we created; re-raise unchanged if gzip
        # failed before producing a file object
        if not extfileobj and fileobj is not None:
            fileobj.close()
        if fileobj is None:
            raise
        raise ReadError("not a gzip file")
    except:
        if not extfileobj and fileobj is not None:
            fileobj.close()
        raise
    t._extfileobj = extfileobj
    return t