我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用codecs.getreader()。
def metadata(self): pathname = os.path.join(self.dirname, self.filename) name_ver = '%s-%s' % (self.name, self.version) info_dir = '%s.dist-info' % name_ver wrapper = codecs.getreader('utf-8') with ZipFile(pathname, 'r') as zf: wheel_metadata = self.get_wheel_metadata(zf) wv = wheel_metadata['Wheel-Version'].split('.', 1) file_version = tuple([int(i) for i in wv]) if file_version < (1, 1): fn = 'METADATA' else: fn = METADATA_FILENAME try: metadata_filename = posixpath.join(info_dir, fn) with zf.open(metadata_filename) as bf: wf = wrapper(bf) result = Metadata(fileobj=wf) except KeyError: raise ValueError('Invalid wheel, because %s is ' 'missing' % fn) return result
def sensu_event_resolve(message): API_URL = settings.SENSU_API_URL + '/resolve' userAndPass = base64.b64encode(str.encode("%s:%s" % (settings.SENSU_API_USER, settings.SENSU_API_PASSWORD))).decode("ascii") headers = { 'X_REQUESTED_WITH' :'XMLHttpRequest', 'Accept': 'application/json, text/javascript, */*; q=0.01', 'Authorization' : 'Basic %s' % userAndPass } try: client_name, check_name = message['entity'].split(':') post_params = {"client": client_name, "check": check_name} request = http.request('POST', API_URL, body=json.dumps(post_params), headers=headers) response = request.status if response == 202: #reader = codecs.getreader('utf-8') #data = json.load(reader(request)) request.release_conn() else: logger.error('response: %s' % str(response)) except: logger.error("sensu_event_resolve failed resolving entity: %s" % message['entity']) raise
def _get_external_data(url): result = {} try: # urlopen might fail if it runs into redirections, # because of Python issue #13696. Fixed in locators # using a custom redirect handler. resp = urlopen(url) headers = resp.info() if headers.get('Content-Type') != 'application/json': logger.debug('Unexpected response for JSON request') else: reader = codecs.getreader('utf-8')(resp) #data = reader.read().decode('utf-8') #result = json.loads(data) result = json.load(reader) except Exception as e: logger.exception('Failed to get external data for %s: %s', url, e) return result
def reset(self): self.dataStream = codecs.getreader(self.charEncoding[0])(self.rawStream, 'replace') self.chunk = u"" self.chunkSize = 0 self.chunkOffset = 0 self.errors = [] # number of (complete) lines in previous chunks self.prevNumLines = 0 # number of columns in the last line of the previous chunk self.prevNumCols = 0 #Deal with CR LF and surrogates split over chunk boundaries self._bufferedCharacter = None
def main(): filenames = ParseArguments(sys.argv[1:]) # Change stderr to write with replacement characters so we don't die # if we try to print something containing non-ASCII characters. sys.stderr = codecs.StreamReaderWriter(sys.stderr, codecs.getreader('utf8'), codecs.getwriter('utf8'), 'replace') _cpplint_state.ResetErrorCounts() for filename in filenames: ProcessFile(filename, _cpplint_state.verbose_level) _cpplint_state.PrintErrorCounts() sys.exit(_cpplint_state.error_count > 0)
def _get_external_data(url): result = {} try: # urlopen might fail if it runs into redirections, # because of Python issue #13696. Fixed in locators # using a custom redirect handler. resp = urlopen(url) headers = resp.info() ct = headers.get('Content-Type') if not ct.startswith('application/json'): logger.debug('Unexpected response for JSON request: %s', ct) else: reader = codecs.getreader('utf-8')(resp) #data = reader.read().decode('utf-8') #result = json.loads(data) result = json.load(reader) except Exception as e: logger.exception('Failed to get external data for %s: %s', url, e) return result
def _open_asset_path(path, encoding=None): """ :param asset_path: string containing absolute path to file, or package-relative path using format ``"python.module:relative/file/path"``. :returns: filehandle opened in 'rb' mode (unless encoding explicitly specified) """ if encoding: return codecs.getreader(encoding)(_open_asset_path(path)) if os.path.isabs(path): return open(path, "rb") package, sep, subpath = path.partition(":") if not sep: raise ValueError("asset path must be absolute file path " "or use 'pkg.name:sub/path' format: %r" % (path,)) return pkg_resources.resource_stream(package, subpath) #: type aliases
def _resolve_version(version): """ Resolve LATEST version """ if version is not LATEST: return version meta_url = urljoin(DEFAULT_URL, '/pypi/setuptools/json') resp = urlopen(meta_url) with contextlib.closing(resp): try: charset = resp.info().get_content_charset() except Exception: # Python 2 compat; assume UTF-8 charset = 'UTF-8' reader = codecs.getreader(charset) doc = json.load(reader(resp)) return str(doc['info']['version'])
def openshift_main(): import sys import json import codecs import urllib3 from collections import OrderedDict pool = urllib3.PoolManager() reader = codecs.getreader('utf-8') spec_url = 'https://raw.githubusercontent.com/openshift/origin/' \ '%s/api/swagger-spec/openshift-openapi-spec.json' % sys.argv[1] output_path = sys.argv[2] print("writing to {}".format(output_path)) with pool.request('GET', spec_url, preload_content=False) as response: if response.status != 200: print("Error downloading spec file. Reason: %s" % response.reason) return 1 in_spec = json.load(reader(response), object_pairs_hook=OrderedDict) out_spec = process_swagger(process_openshift_swagger(in_spec, output_path)) update_codegen_ignore(out_spec, output_path) with open(output_path, 'w') as out: json.dump(out_spec, out, sort_keys=True, indent=2, separators=(',', ': '), ensure_ascii=True) return 0
def get_wheel_metadata(self, zf): name_ver = '%s-%s' % (self.name, self.version) info_dir = '%s.dist-info' % name_ver metadata_filename = posixpath.join(info_dir, 'WHEEL') with zf.open(metadata_filename) as bf: wf = codecs.getreader('utf-8')(bf) message = message_from_file(wf) return dict(message)
def _get_extensions(self): pathname = os.path.join(self.dirname, self.filename) name_ver = '%s-%s' % (self.name, self.version) info_dir = '%s.dist-info' % name_ver arcname = posixpath.join(info_dir, 'EXTENSIONS') wrapper = codecs.getreader('utf-8') result = [] with ZipFile(pathname, 'r') as zf: try: with zf.open(arcname) as bf: wf = wrapper(bf) extensions = json.load(wf) cache = self._get_dylib_cache() prefix = cache.prefix_to_dir(pathname) cache_base = os.path.join(cache.base, prefix) if not os.path.isdir(cache_base): os.makedirs(cache_base) for name, relpath in extensions.items(): dest = os.path.join(cache_base, convert_path(relpath)) if not os.path.exists(dest): extract = True else: file_time = os.stat(dest).st_mtime file_time = datetime.datetime.fromtimestamp(file_time) info = zf.getinfo(relpath) wheel_time = datetime.datetime(*info.date_time) extract = wheel_time > file_time if extract: zf.extract(relpath, cache_base) result.append((name, dest)) except KeyError: pass return result
def __init__(self, **kwargs): if 'stream' in kwargs: stream = kwargs['stream'] if sys.version_info[0] >= 3: # needs to be a text stream stream = codecs.getreader('utf-8')(stream) self.stream = stream else: self.stream = _csv_open(kwargs['path'], 'r') self.reader = csv.reader(self.stream, **self.defaults)
def __init__(self, f, encoding): self.reader = codecs.getreader(encoding)(f)
def check_result(request): mimetype = 'application/json' data = {} if request.method == 'POST' and 'entity' in request.POST and request.POST['entity'] != '': client_name, check_name = request.POST['entity'].split(':') API_URL = settings.SENSU_API_URL + '/results/' + client_name + '/' + check_name userAndPass = base64.b64encode(str.encode("%s:%s" % (settings.SENSU_API_USER, settings.SENSU_API_PASSWORD))).decode("ascii") headers = { 'X_REQUESTED_WITH' :'XMLHttpRequest', 'Accept': 'application/json, text/javascript, */*; q=0.01', 'Authorization' : 'Basic %s' % userAndPass } try: request = http.request('GET', API_URL, None, headers, preload_content=False) response = request.status if response == 200: reader = codecs.getreader('utf-8') data = json.load(reader(request)) request.release_conn() else: logger.error('check_result response: %s' % str(response)) except: logger.error("check_result failed") raise return HttpResponse(json.dumps(data), mimetype)
def sensu_client_list(): API_URL = settings.SENSU_API_URL + '/clients' userAndPass = base64.b64encode(str.encode("%s:%s" % (settings.SENSU_API_USER, settings.SENSU_API_PASSWORD))).decode("ascii") headers = { 'X_REQUESTED_WITH' :'XMLHttpRequest', 'Accept': 'application/json, text/javascript, */*; q=0.01', 'Authorization' : 'Basic %s' % userAndPass } try: request = http.request('GET', API_URL, None, headers, preload_content=False) response = request.status if response == 200: reader = codecs.getreader('utf-8') data = json.load(reader(request)) request.release_conn() else: logger.error('response: %s' % str(response)) except: logger.error("sensu_client_list failed") raise subscriptions = [] [ r.delete(subscription) for subscription in r.keys("subscription_*") ] #[ cache.delete(client) for client in cache.keys("client_*") ] for object in data: cache.set('client_' + object['name'], object, timeout=settings.CACHE_CLIENT_TTL + 300) if 'subscriptions' in object: subscriptions.extend(object['subscriptions']) for subscription in object['subscriptions']: logger.debug("sensu_client_list update subscription_%s adding %s" % (subscription, object['name'])) r.rpush('subscription_' + subscription, object['name']) cache.set('subscriptions', list(set(subscriptions)), timeout=settings.CACHE_CLIENT_TTL + 300)
def sensu_check_list(): API_URL = settings.SENSU_API_URL + '/checks' userAndPass = base64.b64encode(str.encode("%s:%s" % (settings.SENSU_API_USER, settings.SENSU_API_PASSWORD))).decode("ascii") headers = { 'X_REQUESTED_WITH' :'XMLHttpRequest', 'Accept': 'application/json, text/javascript, */*; q=0.01', 'Authorization' : 'Basic %s' % userAndPass } try: request = http.request('GET', API_URL, None, headers, preload_content=False) response = request.status if response == 200: reader = codecs.getreader('utf-8') data = json.load(reader(request)) request.release_conn() else: logger.error('response: %s' % str(response)) except: logger.error("sensu_check_list failed") raise for object in data: logger.debug("sensu_check_list update check: %s" % object['name']) cache.set('check_' + object['name'], object, timeout=settings.CACHE_CHECK_TTL + 300)
def metadata(self): pathname = os.path.join(self.dirname, self.filename) name_ver = '%s-%s' % (self.name, self.version) info_dir = '%s.dist-info' % name_ver wrapper = codecs.getreader('utf-8') metadata_filename = posixpath.join(info_dir, METADATA_FILENAME) with ZipFile(pathname, 'r') as zf: try: with zf.open(metadata_filename) as bf: wf = wrapper(bf) result = Metadata(fileobj=wf) except KeyError: raise ValueError('Invalid wheel, because %s is ' 'missing' % METADATA_FILENAME) return result
def info(self): pathname = os.path.join(self.dirname, self.filename) name_ver = '%s-%s' % (self.name, self.version) info_dir = '%s.dist-info' % name_ver metadata_filename = posixpath.join(info_dir, 'WHEEL') wrapper = codecs.getreader('utf-8') with ZipFile(pathname, 'r') as zf: with zf.open(metadata_filename) as bf: wf = wrapper(bf) message = message_from_file(wf) result = dict(message) return result