The following code examples, extracted from open-source Python projects, illustrate how to use gzip.decompress().
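Before the project examples, here is a minimal, self-contained sketch of the round trip (not taken from any of the projects below): gzip.compress() produces a gzip-compressed bytes object and gzip.decompress() restores the original payload. Both functions work on bytes, so text has to be encoded and decoded explicitly; the variable names are illustrative only.

import gzip

# Compress a bytes payload, then recover it; decompress() expects bytes
# and returns bytes, so text must be encoded/decoded around the call.
original = 'hello gzip'.encode('utf-8')
compressed = gzip.compress(original)
restored = gzip.decompress(compressed)

assert restored == original
print(restored.decode('utf-8'))  # -> hello gzip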
def show():
    city = e1.get()
    url = 'http://wthrcdn.etouch.cn/WeatherApi?city=' + urllib.parse.quote(city)
    weather = urllib.request.urlopen(url).read()
    weather_data = gzip.decompress(weather).decode('utf-8')
    try:
        soup = BeautifulSoup(weather_data)
        wheater = soup.find_all('weather')
        Text = (('Humidity: %s' % soup.shidu.text),
                ('Wind: %s' % soup.fengli.text),
                wheater[0].high.text,
                wheater[0].low.text,
                ('Weather: %s' % wheater[0].type.text))
        e2['state'] = 'normal'
        e2.delete(1.0, tk.END)
        e2.insert(tk.END, Text)
        e2['state'] = 'disabled'
    except:
        Text = 'Failed to fetch weather data; please check the city name'
        e2['state'] = 'normal'
        e2.delete(1.0, tk.END)
        e2.insert(tk.END, Text)
        e2['state'] = 'disabled'
def download_sifts_xml(pdb_id, outdir='', outfile=''):
    """Download the SIFTS file for a PDB ID.

    Args:
        pdb_id:
        outdir:
        outfile:

    Returns:

    """
    baseURL = 'ftp://ftp.ebi.ac.uk/pub/databases/msd/sifts/xml/'
    filename = '{}.xml.gz'.format(pdb_id)

    if outfile:
        outfile = op.join(outdir, outfile)
    else:
        outfile = op.join(outdir, filename.split('.')[0] + '.sifts.xml')

    if not op.exists(outfile):
        response = urlopen(baseURL + filename)
        with open(outfile, 'wb') as f:
            f.write(gzip.decompress(response.read()))

    return outfile
def test_brotli_dynamic_cache(br_client):
    from brotli import decompress
    from time import sleep
    br_client.get('/', headers=[('accept-encoding', 'gzip, br')])
    sleep(0.5)
    resp = br_client.get('/', headers=[('accept-encoding', 'gzip, br')])
    assert resp.headers.get('x-brotli-cache') == 'HIT'
    assert resp.headers.get('content-encoding') == 'br'
    assert b"Hello, world!" in decompress(resp.data)
def saveContentOfURL(target_url):
    # Skip URLs that have already been crawled
    if target_url in searched_url:
        return
    try:
        # Issue a GET request for target_url
        article_response = urllib.request.urlopen(target_url)
        raw_data = article_response.read()
        # Decompress the response body if it is gzip-encoded
        if article_response.getheader("Content-Encoding") == "gzip":
            raw_data = gzip.decompress(raw_data)
        # Decode as gb2312, ignoring characters that cannot be decoded
        article_data = raw_data.decode('gb2312', 'ignore')
        # Clean and save the text found inside each <p></p> pair
        forEachMatch(pattern_str='<p>(.*?)</p>', to_match_str=article_data,
                     func=lambda match: file_operator.writeFile(cleanArticle(match.group(1))))
    except urllib.error.URLError:
        print(target_url, 'is a wrong url')
    except BaseException as message:
        print(message)
    # Remember that this URL has been visited
    searched_url.add(target_url)
def import_to_store(self, compressed_nar):
    """Given a compressed NAR, extract it and import it into the nix store.

    :param compressed_nar: The bytes of a NAR, compressed.
    :type compressed_nar: ``str``
    """
    # Figure out how to extract the content.
    if self.compression.lower() in ("xz", "xzip"):
        data = lzma.decompress(compressed_nar)
    elif self.compression.lower() in ("bz2", "bzip2"):
        data = bz2.decompress(compressed_nar)
    else:
        data = gzip.decompress(compressed_nar)

    # Once extracted, convert it into a nix export object and import.
    export = self.nar_to_export(data)
    imported_path = export.import_to_store()
def get_page(self, _url):
    '''Fetch a single page.

    return str
    '''
    header = {'Accept-Encoding': 'gzip'}
    header['User-Agent'] = self.ualist[random.randint(0, len(self.ualist) - 1)]
    if opts['user_agent']:
        header['User-Agent'] = opts['user_agent']

    with (yield from semaphore):
        response = yield from aiohttp.request('GET', _url, headers=header)
        page = yield from response.read()

    try:
        if self.url_type == "2":
            return "None Content"
        if self.url_type == "4":
            return gzip.decompress(page).decode('gb2312').encode('utf-8')
        else:
            return gzip.decompress(page)
    except OSError:
        return page
def _process_response(self, res):
    """
    Take the response object and return JSON
    :param res:
    :return:
    """
    # TODO Figure out exceptions here
    if res.headers['Content-Encoding'] == 'gzip':
        self.send_log('Detected gzipped response', 'debug')
        raw_output = gzip.decompress(res.read()).decode('utf-8')
    else:
        self.send_log('Detected other type of response encoding: {}'.format(
            res.headers['Content-Encoding']), 'debug')
        raw_output = res.read().decode('utf-8')

    json_output = json.loads(raw_output)
    return json_output
def _load_data():
    # https://github.com/raumkraut/python-debian/blob/master/README.deb822
    global _data
    mirror = BP.config['L4TM_MIRROR']
    release = BP.config['L4TM_RELEASE']
    repo = '%s/dists/%s/%%s/%%s/Packages.gz' % (mirror, release)
    _data = {}
    for area in BP.config['L4TM_AREAS']:
        for arch in ('binary-all', 'binary-arm64'):
            BP.logger.info('Loading/processing %s/%s/Packages.gz...' % (area, arch))
            pkgarea = repo % (area, arch)
            pkgresp = HTTP_REQUESTS.get(pkgarea)
            if pkgresp.status_code != 200:
                BP.logger.error('%s not found' % arch)
                continue

            BP.logger.debug('Uncompressing %s bytes' % pkgresp.headers['content-length'])
            unzipped = gzip.decompress(pkgresp.content)   # bytes all around
            BP.logger.debug('Parsing %d bytes of package data' % len(unzipped))
            unzipped = BytesIO(unzipped)                  # the next step needs read()
            tmp = [src for src in Packages.iter_paragraphs(unzipped)]
            _data.update(dict((pkg['Package'], pkg) for pkg in tmp))
def extract_features(doc):
    html = doc['html'] or ''
    if not doc_is_extra_sampled(doc):
        try:
            html = gzip.decompress(base64.b64decode(html)).decode('utf8')
        except Exception:
            pass  # support not compressed html too
    text = html_text.extract_text(html)
    try:
        lang = langdetect.detect(text)
    except LangDetectException:
        lang = None
    return {
        'text': text,
        'language': lang,
    }
def msg_recv(conn, sendfunc, closefunc):
    '''
    Function msg_recv reads null-delimited series of bytes from `conn`, which
    is a socket. Each series of bytes is then de-serialized into a json object,
    and `sendfunc` is called with that json object.

    `closefunc` is called if/when the socket `conn` is closed.
    '''
    buf = bytes()
    while True:
        try:
            data = conn.recv(8192)

            # No data means the connection is closed
            if not data:
                closefunc()
                return

            inbuf = buf + data
            if SEP in inbuf:
                parts = inbuf.split(SEP)
                # logging.debug("Length of parts: {}".format(len(parts)))
                tosend = [parts[0]]
                for p in parts[1:-1]:
                    tosend.append(p)
                buf = parts[-1]
                for msg in tosend:
                    m = gzip.decompress(msg)
                    m = m.decode('utf-8')
                    logging.debug("Msg: {}".format(m[:150] + '...' if len(m) > 150 else m))
                    obj = json.loads(m)
                    sendfunc(obj)
            else:
                buf += data
        except Exception as e:
            logging.exception(e)
def _handle_gzip_packed(self, msg_id, sequence, reader, updates): self._logger.debug('Handling gzip packed data') reader.read_int(signed=False) # code packed_data = reader.tgread_bytes() unpacked_data = gzip.decompress(packed_data) with BinaryReader(unpacked_data) as compressed_reader: return self._process_msg( msg_id, sequence, compressed_reader, updates) # endregion
def _lzma(self):
    '''LZMA processor'''
    try:
        archive = lzma.decompress(self.cur_attachment.file_obj.read())
        new_fn, ext = os.path.splitext(self.cur_attachment.orig_filename)
        cur_file = File(archive, new_fn)
        self.process_payload(cur_file)
    except:
        self.cur_attachment.make_dangerous()
    return self.cur_attachment
def _bzip(self):
    '''BZip2 processor'''
    try:
        archive = bz2.decompress(self.cur_attachment.file_obj.read())
        new_fn, ext = os.path.splitext(self.cur_attachment.orig_filename)
        cur_file = File(archive, new_fn)
        self.process_payload(cur_file)
    except:
        self.cur_attachment.make_dangerous()
    return self.cur_attachment
def download_biological_assemblies(pdb_id, outdir):
    """Downloads biological assembly file from:
    `ftp://ftp.wwpdb.org/pub/pdb/data/biounit/coordinates/divided/`

    Args:
        outdir (str): Output directory of the decompressed assembly

    """
    # TODO: not tested yet
    if not op.exists(outdir):
        raise ValueError('{}: output directory does not exist'.format(outdir))

    folder = pdb_id[1:3]
    server = 'ftp://ftp.wwpdb.org/pub/pdb/data/biounit/coordinates/divided/{}/'.format(folder)
    html_folder = urlopen(server).readlines()
    for line in html_folder:
        if pdb_id in str(line).strip():
            file_name = '%s' % (pdb_id + str(line).strip().split(pdb_id)[1].split('\r\n')[0])
            outfile_name = file_name.replace('.', '_')
            outfile_name = outfile_name.replace('_gz', '.pdb')
            f = urlopen(op.join(server, file_name))
            decompressed_data = zlib.decompress(f.read(), 16 + zlib.MAX_WBITS)
            with open(op.join(outdir, outfile_name), 'wb') as f:
                f.write(decompressed_data)
                f.close()
            # Original code omitted the format argument here; pass pdb_id so the log message is filled in
            log.debug('{}: downloaded biological assembly'.format(pdb_id))
            return op.join(outdir, outfile_name)
def decode(value, decompress=False):
    """
    Decodes response from Base64 encoded string.
    """
    decoded = base64.b64decode(value)
    if decompress:
        decoded = gzip.decompress(decoded)
    return json.loads(decoded.decode())
def read(reader):
    assert reader.read_int(signed=False) == GzipPacked.CONSTRUCTOR_ID
    return gzip.decompress(reader.tgread_bytes())
def get_vxstream_report(sha256, envid, type_):
    # XML, HTML, BIN and PCAP are GZipped
    Sample.query.filter_by(sha256=sha256).first_or_404()
    headers = {
        'Accept': 'text/html',
        'User-Agent': 'VxStream Sandbox API Client'}
    params = {'type': type_, 'environmentId': envid}
    vx = vxstream.api.get('result/{}'.format(sha256), params=params,
                          headers=headers)
    if type_ in ['xml', 'html', 'bin', 'pcap']:
        return gzip.decompress(vx)
    return vx
def get_fireeye_report(sha256, envid, type):
    raise ApiException({}, 501)
    # XML, HTML, BIN and PCAP are GZipped
    Sample.query.filter_by(sha256=sha256).first_or_404()
    headers = {
        'Accept': 'text/html',
        'User-Agent': 'FireEye Sandbox API Client'}
    params = {'type': type, 'environmentId': envid}
    vx = fireeye.api.get('result/{}'.format(sha256), params=params,
                         headers=headers)
    if type in ['xml', 'html', 'bin', 'pcap']:
        return gzip.decompress(vx)
    return vx
def get_cp_vxstream_report(sha256, envid, type_):
    # XML, HTML, BIN and PCAP are GZipped
    Sample.query.filter_by(sha256=sha256, user_id=g.user.id).first_or_404()
    headers = {
        'Accept': 'text/html',
        'User-Agent': 'VxStream Sandbox API Client'}
    params = {'type': type_, 'environmentId': envid}
    vx = vxstream.api.get('result/{}'.format(sha256), params=params,
                          headers=headers)
    if type_ in ['xml', 'html', 'bin', 'pcap']:
        return gzip.decompress(vx)
    return vx
def fetch_page(query):
    url = REQUEST_URL.format(BASE_URL, query)
    request = urllib.request.Request(url)
    request.add_header('User-agent', _random_user_agent())
    request.add_header('connection', 'keep-alive')
    request.add_header('Accept-Encoding', 'gzip, deflate, sdch, br')
    request.add_header('referer', REQUEST_URL.format(BASE_URL, ""))
    print(url)
    response = urllib.request.urlopen(request)
    data = response.read()
    print(type(data))
    return gzip.decompress(data)
def read_bytes_data():
    with open('sample.bytes', 'rb') as file:
        content = file.read()
    # dom = BeautifulSoup(gzip.decompress(content))
    data = gzip.decompress(content)
    # soup = BeautifulSoup(data, 'html.parser')
    # links = soup.find_all('a', {'class': 'l _HId'})
    # for link in links:
    #     print(link.get('href'))
    # print(soup.prettify())
def decode_gzip(content):
    assert isinstance(content, bytes)
    return gzip.decompress(content)
def decode_deflate(content):
    assert isinstance(content, bytes)
    try:
        return zlib.decompress(content)
    except Exception:
        return zlib.decompress(content, -zlib.MAX_WBITS)
def test_brotli_static_gzip(br_client):
    from gzip import decompress
    gzip_resp = br_client.get('/static/bee.txt', headers=[('accept-encoding', 'gzip')])
    assert gzip_resp.headers.get('content-encoding') == 'gzip'
    assert BEE_SCRIPT in decompress(gzip_resp.data)
def test_brotli_static_br(br_client):
    from brotli import decompress
    br_resp = br_client.get('/static/bee.txt', headers=[('accept-encoding', 'gzip, br')])
    assert br_resp.headers.get('content-encoding') == 'br'
    assert BEE_SCRIPT in decompress(br_resp.data)
def test_brotli_dynamic(br_client):
    from brotli import decompress
    resp = br_client.get('/', headers=[('accept-encoding', 'gzip, br')])
    assert resp.headers.get('x-brotli-cache') == 'MISS'
    assert resp.headers.get('content-encoding') == 'br'
    assert b"Hello, world!" in decompress(resp.data)
def test_main(bucket, crypto, processor):
    _p = functools.partial(_payload, crypto)
    today = date.today()
    user = 'foo'
    prefix = 'v2/sessions/%s/%s/%s/' % (today.year, today.month, user)

    records = [
        make_record('2', _p({'user': user, 'extra': 1})),
    ]
    assert main(processor, records) == ('2', 0)

    objs = list(bucket.filter(Prefix=prefix))
    assert len(objs) == 1
    assert objs[0].key.endswith('.json.gz')

    obj = bucket.get(objs[0].key)
    assert obj['ContentEncoding'] == 'gzip'
    assert obj['ContentType'] == 'application/json'
    body = obj['Body'].read()
    obj['Body'].close()
    body = json.loads(gzip.decompress(body).decode('utf-8'))
    assert body == {'user': user, 'extra': 1}

    # Upload a second time
    records = [
        make_record('3', _p({'user': user, 'extra': 2})),
    ]
    assert main(processor, records) == ('3', 0)

    objs = list(bucket.filter(Prefix=prefix))
    assert len(objs) == 2
def _decompress_result(self, compressed_result):
    result_bytes = bytes([int(c, 16) for c in compressed_result.split('x')])
    result = gzip.decompress(result_bytes).decode()
    result = json.loads(result)
    return result
def decompress(
    data,
):
    decompressed_object = gzip.decompress(data)

    return decompressed_object
def log_file_s3_object(record):
    return gzip.decompress(s3.get_object(
        Bucket=record['s3']['bucket']['name'],
        Key=record['s3']['object']['key'])['Body'].read())
def urlopen(url, headers={}, data=None, retries=RETRIES, timeout=TIMEOUT):
    '''Open an HTTP connection and return the Request.

    headers is a dict. Some default items such as User-Agent and Referer are
    already provided and do not need to be added again.

    This function is only meant for ordinary HTTP requests, not for
    downloading large files. If the server compresses the response with gzip
    or deflate, it is decompressed locally before being returned.

    req.data holds the final body of the HTTP response, usually UTF-8 text.
    '''
    headers_merged = default_headers.copy()
    for key in headers.keys():
        headers_merged[key] = headers[key]
    opener = urllib.request.build_opener(ForbiddenHandler)
    opener.addheaders = [(k, v) for k, v in headers_merged.items()]

    for i in range(retries):
        try:
            req = opener.open(url, data=data, timeout=timeout)
            encoding = req.headers.get('Content-encoding')
            req.data = req.read()
            if encoding == 'gzip':
                req.data = gzip.decompress(req.data)
            elif encoding == 'deflate':
                req.data = zlib.decompress(req.data, -zlib.MAX_WBITS)
            return req
        except OSError:
            logger.error(traceback.format_exc())
        except:
            logger.error(traceback.format_exc())
    return None
def post_multipart(url, headers, fields, files, retries=RETRIES):
    content_type, body = encode_multipart_formdata(fields, files)
    schema = urllib.parse.urlparse(url)

    headers_merged = default_headers.copy()
    for key in headers.keys():
        headers_merged[key] = headers[key]
    headers_merged['Content-Type'] = content_type
    headers_merged['Content-length'] = str(len(body))

    for i in range(retries):
        try:
            h = http.client.HTTPConnection(schema.netloc)
            h.request('POST', url, body=body, headers=headers_merged)
            req = h.getresponse()
            encoding = req.getheader('Content-encoding')
            req.data = req.read()
            if encoding == 'gzip':
                req.data = gzip.decompress(req.data)
            elif encoding == 'deflate':
                req.data = zlib.decompress(req.data, -zlib.MAX_WBITS)
            return req
        except OSError:
            logger.error(traceback.format_exc())
        except:
            logger.error(traceback.format_exc())
    #return None
    return None
def rehash_release(_filelist, fdesc, rmstr):
    """
    Calculates checksums of a given filelist and writes them to the given
    file descriptor. Takes rmstr as the third argument, which is a string
    to remove from the path of the hashed file when writing it to a file.
    """
    info('Hashing checksums')
    for csum in checksums:
        fdesc.write('%s:\n' % csum['name'])
        for i in _filelist:
            if isfile(i):
                cont = open(i, 'rb').read()
                fdesc.write(' %s %8s %s\n' % (csum['f'](cont).hexdigest(),
                                              getsize(i),
                                              i.replace(rmstr+'/', '')))
            elif i.endswith('.xz') and isfile(i.replace('.xz', '.gz')):
                xzstr = lzma_comp(open(i.replace('.xz', '.gz'), 'rb').read())
                fdesc.write(' %s %8s %s\n' % (csum['f'](xzstr).hexdigest(),
                                              len(xzstr),
                                              i.replace(rmstr+'/', '')))
            elif not i.endswith('.gz') and isfile(i+'.gz'):
                uncomp = gzip_decomp(open(i+'.gz', 'rb').read())
                fdesc.write(' %s %8s %s\n' % (csum['f'](uncomp).hexdigest(),
                                              len(uncomp),
                                              i.replace(rmstr+'/', '')))
    return
def ungzip(data):
    try:
        # Try to gunzip the response body
        print('Decompressing...')
        data = gzip.decompress(data)
        print('Done decompressing!')
    except:
        print('Data is not gzip-compressed, nothing to do')
    return data
def __ungzip(self, data):
    try:
        # Try to gunzip the response body
        # print('Decompressing...')
        data = gzip.decompress(data)
        # print('Done decompressing!')
    except:
        logger.error('Decompression failed; the data may not be gzip-compressed')
        # print('Data is not gzip-compressed, nothing to do')
    return data
def Get_weather(city):
    url = 'http://wthrcdn.etouch.cn/WeatherApi?city=' + urllib.parse.quote(city)
    weather = urllib.request.urlopen(url).read()
    weather_data = gzip.decompress(weather).decode('utf-8')
    soup = BeautifulSoup(weather_data)
    return soup
def get_huochepiao(url1):
    headers = {
        'Accept': 'text/javascript, application/javascript, application/ecmascript, application/x-ecmascript, */*; q=0.01',
        'Accept-Encoding': 'gzip, deflate, sdch',
        'Accept-Language': 'zh-CN,zh;q=0.8',
        'Connection': 'keep-alive',
        'Cookie': 'QN99=3733; QN1=eIQiP1dEAASbOq51LyeNAg==; QunarGlobal=192.168.31.105_-4c70ffc_154e15d8dc8_2de|1464074245311; ag_fid=AsWRT9vZYYLJ23qF; __ag_cm_=1464074247005; QN269=906FD473217F11E6B393C4346BAC1530; PHPSESSID=epq85mhbfeg12b3t6q8rkic702; QN25=5cfd26dc-8670-44ec-aafc-94923235a6fc-9f992f90; QN42=zbua0851; _q=U.ryzxozi0081; _t=24542281; csrfToken=QxdjaQNPcDnkhaMMMwxbGbpwWeKXNtET; _s=s_2QHWQF6G6AI3QWPVO6UBTX2LZE; _v=-8JqPkXGW-Vsgcr1koBOn0mWlXDIk6gdgRyueLvJJO3C0Ru2ALnLJw7DFu6Y6FUrAWf8tU-PZtj1Dc2l_o50sSp6YyMnlDQ4dVpPmDi0QMz_XOGK0loLwpTeCoe0wvE0aHJKPGHtArx4jlrdtgWSX9O2IfI8qnNi3-wHXEY6rVEN; QN44=ryzxozi0081; _i=RBTjeomvkDExEx-xsOrmQxSvMXex; _vi=7AZYnlCS385W7Z8-IQdjp5sbVR1PFm8kL0-Qi39HR1-wvJEvexvDP9L5vcTyfiBM9AUeWbCi1osGa2UEs6aMSu-IrejFGqde7L7Y04s8z115RVvdF0h-VmYrWg5Ni-nNZVw8xz3rFA7Jcv-ASn9aff2fhGbtS_0JFDKWQkwggWMx; Hm_lvt_2e7c38479e4def08a8ea1c0ebdb3e0d6=1472535537; Hm_lpvt_2e7c38479e4def08a8ea1c0ebdb3e0d6=1472541016; QN268=|1472541016285_e1523dd1fcbd8c01',
        'Host': 'train.qunar.com',
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.80 Safari/537.36',
        'X-Requested-With': 'XMLHttpRequest'
    }
    req = urllib.request.Request(url1, headers=headers)
    html = urllib.request.urlopen(req).read()
    decompressed_data = zlib.decompress(html, 16 + zlib.MAX_WBITS)
    text = decompressed_data.decode("utf-8")
    soup = BeautifulSoup(text)
    m = str(soup)[46:-2]
    htm = json.loads(m)
    try:
        lines = htm['data']['s2sBeanList']
        print('Found %s trains' % len(lines))
        i = 1
        for item in lines:
            print("------------- Train %s -------------" % i)
            print('-------------- Basic info --------------')
            print('Departure station: %s' % item['dptStationName'])
            print('Arrival station: %s' % item['arrStationName'])
            print('Train number: %s' % item['trainNo'])
            print('Departure time: %s' % item['dptTime'])
            print('Arrival time: %s' % item['arrTime'])
            print('Duration: %s' % item['extraBeanMap']['interval'])
            print('Ticket type: %s' % item['extraBeanMap']['ticketType'])
            print('Station type: %s' % item['extraBeanMap']['stationType'])
            b = item['seats']
            for key, value in b.items():
                print('Seat type: %s, price: %s, remaining: %s' % (key, value['price'], value['count']))
            i += 1
    except:
        print('Query failed; please check the departure/arrival stations and date')
def test_middleware_compress_response(self):
    fake_request = FakeRequestAcceptsBrotli()
    response_content = UTF8_LOREM_IPSUM_IN_CZECH
    fake_response = FakeResponse(content=response_content)

    brotli_middleware = BrotliMiddleware()
    brotli_response = brotli_middleware.process_response(fake_request, fake_response)

    decompressed_response = brotli.decompress(data=brotli_response.content)  # type: bytes
    self.assertEqual(response_content, decompressed_response.decode(encoding='utf-8'))
def test_etag_is_updated_if_present(self):
    fake_request = FakeRequestAcceptsBrotli()
    response_content = UTF8_LOREM_IPSUM_IN_CZECH * 5
    fake_etag_content = "\"foo\""
    fake_response = FakeResponse(content=response_content, headers={"ETag": fake_etag_content})

    self.assertEqual(fake_response['ETag'], fake_etag_content)

    brotli_middleware = BrotliMiddleware()
    brotli_response = brotli_middleware.process_response(fake_request, fake_response)

    decompressed_response = brotli.decompress(data=brotli_response.content)  # type: bytes
    self.assertEqual(response_content, decompressed_response.decode(encoding='utf-8'))

    self.assertEqual(brotli_response['ETag'], '"foo;br\\"')
def test_middleware_wont_compress_if_response_is_already_compressed(self):
    fake_request = FakeRequestAcceptsBrotli()
    response_content = UTF8_LOREM_IPSUM_IN_CZECH
    fake_response = FakeResponse(content=response_content)

    brotli_middleware = BrotliMiddleware()
    django_gzip_middleware = GZipMiddleware()

    gzip_response = django_gzip_middleware.process_response(fake_request, fake_response)
    brotli_response = brotli_middleware.process_response(fake_request, gzip_response)

    self.assertEqual(response_content, gzip.decompress(brotli_response.content).decode(encoding='utf-8'))
def get_json_response(url):
    with urllib.request.urlopen(url) as response:
        result = gzip.decompress(response.read())
    return json.loads(result.decode('utf-8'))
def ungzip(data, url):
    try:
        print(url, "decompressing...")
        data = gzip.decompress(data)
        print(url, "decompressed")
    except:
        print(url, "is not gzip-compressed, skipping decompression")
    return data
def ungzip(data, url):
    try:
        # print(url, "decompressing...")
        data = gzip.decompress(data)
        # print(url, "decompressed")
    except:
        # print(url, "is not gzip-compressed, skipping decompression")
        pass
    return data
def ungzip(data, url):
    try:
        data = gzip.decompress(data)
    except:
        pass
    return data
def ungzip(data):
    try:
        # print("Decompressing...")
        data = gzip.decompress(data)
        # print("Done decompressing")
    except:
        print("Data is not gzip-compressed, skipping decompression")
    return data
def verify_unit_response(zip_ext_file, min_lines):
    assert isinstance(zip_ext_file, zipfile.ZipExtFile)

    unit_output = gzip.decompress(zip_ext_file.read())
    assert len(unit_output.decode().split('\n')) >= min_lines, (
        'Expect at least {} lines. Full unit output {}'.format(min_lines, unit_output))
def resolve_msg(msg_bytes):
    try:
        msg = simplejson.loads(gzip.decompress(crypto.decrypt(msg_bytes)))
        return msg if msg['proto'] == 'iddp' else None
    except Exception as e:
        logging.error("Received an invalid message: %s" % str(e))
        return None

# iot_force_download_msg = {'proto': 'iddp', 'role': 'force_download', 'routine': ':19001/your_file'}
def ungzip(data):
    data = gzip.decompress(data)
    return data