The following code examples, extracted from open-source Python projects, illustrate how to use pkgutil.get_data().
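For orientation before the project excerpts, here is a minimal sketch of the call pattern they all share: pkgutil.get_data(package, resource) reads a data file bundled inside a package and returns its contents as bytes (or None if the package's loader does not support data access). The package and resource names below are placeholders, not taken from any of the projects.

import pkgutil

# Read a data file bundled with a package. The resource path uses '/'
# separators and is interpreted relative to the package directory.
# 'mypackage' and 'data/config.txt' are placeholder names.
data = pkgutil.get_data('mypackage', 'data/config.txt')
if data is not None:
    text = data.decode('utf-8')  # get_data() returns raw bytes
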
async def http_handler(self, request):
    if request.path.endswith("api.sock"):
        return await self.ws_handler(request)
    if request.path.endswith("/monitor/"):
        data = pkgutil.get_data("rci.services.monitor",
                                "monitor.html").decode("utf8")
        return web.Response(text=data, content_type="text/html")
    if request.path.endswith("/login/github"):
        if request.method == "POST":
            url = self.oauth.generate_request_url(("read:org", ))
            return web.HTTPFound(url)
    if request.path.endswith("/oauth2/github"):
        return (await self._oauth2_handler(request))
    if request.path.endswith("logout"):
        if request.method == "POST":
            sid = request.cookies.get(self.config["cookie_name"])
            del self.sessions[sid]
            return web.HTTPFound("/monitor/")
    return web.HTTPNotFound()

def _monkey_patch_httplib2(extract_dir):
    """Patch things so that httplib2 works properly in a PAR.

    Manually extract certificates to file to make OpenSSL happy and avoid
    error:
      ssl.SSLError: [Errno 185090050] _ssl.c:344: error:0B084002:x509 ...

    Args:
      extract_dir: the directory into which we extract the necessary files.
    """
    if os.path.isfile(httplib2.CA_CERTS):
        # Not inside of a PAR file, so don't bother.
        return
    cacerts_contents = pkgutil.get_data('httplib2', 'cacerts.txt')
    cacerts_filename = os.path.join(extract_dir, 'cacerts.txt')
    with open(cacerts_filename, 'wb') as f:
        f.write(cacerts_contents)
    httplib2.CA_CERTS = cacerts_filename

def get_filename(package, resource):
    """Rewrite of pkgutil.get_data() that returns the file path."""
    loader = pkgutil.get_loader(package)
    if loader is None or not hasattr(loader, 'get_data'):
        return None
    mod = sys.modules.get(package) or loader.load_module(package)
    if mod is None or not hasattr(mod, '__file__'):
        return None

    # Modify the resource name to be compatible with the loader.get_data
    # signature - an os.path format "filename" starting with the dirname of
    # the package's __file__
    parts = resource.split('/')
    parts.insert(0, os.path.dirname(mod.__file__))
    resource_name = os.path.normpath(os.path.join(*parts))
    return resource_name

def install_kernel_resources(destination, resource='gnuplot_kernel',
                             files=None):
    """
    Copy the resource files to the kernelspec folder.
    """
    if files is None:
        files = ['logo-64x64.png', 'logo-32x32.png']

    for filename in files:
        try:
            data = pkgutil.get_data(resource,
                                    os.path.join('images', filename))
            with open(os.path.join(destination, filename), 'wb') as fp:
                fp.write(data)
        except Exception as e:
            sys.stderr.write(str(e))

def _generate_form(self):
    font_xref = self._get_font_reference()

    seal_template = PDFTemplate(pkgutil.get_data("llpdf.resources", "seal.pdft"))
    seal_xref = seal_template.merge_into_pdf(self._pdf)["SealObject"]

    sign_template = PDFTemplate(pkgutil.get_data("llpdf.resources", "sign_form.pdft"))
    sign_template["FontXRef"] = font_xref
    sign_template["SealFormXRef"] = seal_xref
    signform_xref = sign_template.merge_into_pdf(self._pdf)["SignFormObject"]

    signform = self._pdf.lookup(signform_xref)
    signform.content[PDFName("/BBox")] = self._get_signature_bbox()
    signform_data = signform.stream.decode()
    (posx, posy, width, height) = self._get_signature_bbox()
    signform_vars = {
        "WIDTH":  b"%.0f" % (width - 1),
        "HEIGHT": b"%.0f" % (height - 1),
        "TEXT":   self._get_signing_text(),
    }
    for (varname, replacement) in signform_vars.items():
        key = ("${" + varname + "}").encode("ascii")
        signform_data = signform_data.replace(key, replacement)
    signform.set_stream(EncodedObject.create(signform_data, compress=True))
    return signform_xref

async def initialize_kinto(loop, kinto_client, bucket, collection):
    """
    Initialize the remote server with the initialization.yml file.
    """
    # Leverage kinto-wizard async client.
    thread_pool = ThreadPoolExecutor()
    async_client = AsyncKintoClient(kinto_client, loop, thread_pool)

    initialization_manifest = pkgutil.get_data('buildhub', 'initialization.yml')
    config = yaml.safe_load(initialization_manifest)

    # Check that we push the records at the right place.
    if bucket not in config:
        raise ValueError(f"Bucket '{bucket}' not specified in `initialization.yml`.")
    if collection not in config[bucket]['collections']:
        raise ValueError(f"Collection '{collection}' not specified in `initialization.yml`.")

    await initialize_server(async_client,
                            config,
                            bucket=bucket,
                            collection=collection,
                            force=False)

def create_file_from_template(self, relpath, unique=False,
                              template_name=None, append_data=None,
                              subst=True, pathtype='temp'):
    """ Create file from app template using app's conf dict.
        If subst=False no template operations will be performed and the
        file is copied verbatim. """
    if not template_name:
        tname = template_name = os.path.basename(relpath)
    else:
        tname = template_name

    # Try pkgutil resource locator
    tpath = os.path.join('apps', self.__class__.__name__,
                         tname + '.template')
    filedata = pkgutil.get_data('trivup', tpath)
    if filedata is None:
        raise FileNotFoundError('Class %s resource %s not found' %
                                ('trivup', tpath))

    if subst:
        rendered = Template(filedata.decode('ascii')).substitute(self.conf)
    else:
        rendered = filedata.decode('ascii')

    if append_data is not None:
        rendered += '\n' + append_data

    return self.create_file(relpath, unique, data=rendered,
                            pathtype=pathtype)

def test_load(self):
    sm = pkgutil.get_data('smeftrunner', 'tests/data/SMInput-CPV.dat').decode('utf-8')
    wc = pkgutil.get_data('smeftrunner', 'tests/data/WCsInput-CPV-SMEFT.dat').decode('utf-8')
    wcout = pkgutil.get_data('smeftrunner', 'tests/data/Output_SMEFTrunner.dat').decode('utf-8')
    io.sm_lha2dict(pylha.load(sm))
    io.wc_lha2dict(pylha.load(wc))
    CSM = io.sm_lha2dict(pylha.load(wcout))
    C = io.wc_lha2dict(pylha.load(wcout))
    C2 = io.wc_lha2dict(io.wc_dict2lha(C))
    for k in C:
        npt.assert_array_equal(C[k], C2[k])
    smeft = SMEFT()
    smeft.load_initial((wcout,))
    for k in C:
        npt.assert_array_equal(definitions.symmetrize(C)[k], smeft.C_in[k],
                               err_msg="Failed for {}".format(k))
    for k in CSM:
        npt.assert_array_equal(definitions.symmetrize(CSM)[k], smeft.C_in[k],
                               err_msg="Failed for {}".format(k))
    CSM2 = io.sm_lha2dict(io.sm_dict2lha(CSM))
    for k in CSM:
        npt.assert_array_equal(CSM[k], CSM2[k],
                               err_msg="Failed for {}".format(k))

def get_source(self, environment, template_name):
    final_path = template_name
    if not template_name.startswith("/"):
        for tdir in self.template_dirs:
            full_path = os.path.join(tdir, template_name)
            if os.path.isfile(full_path):
                final_path = full_path
                break
            else:
                full_path = os.path.join(tdir, template_name + self.template_extension)
                if os.path.isfile(full_path):
                    final_path = full_path
                    break
        else:
            # See if parent can return it
            if self.parent_loader:
                return self.parent_loader.get_source(environment, template_name)
            else:
                source = pkgutil.get_data("onering",
                                          "data/templates/" + template_name).decode('utf-8')
                return source, final_path, lambda: True

    with open(final_path, 'rb') as f:
        source = f.read().decode('utf-8')
    mtime = getmtime(final_path)
    return source, final_path, lambda: mtime == getmtime(final_path)

def __init__(self, cls, name):
    self.cls = cls
    self.name = name
    self.filename = '%s.csv' % self.name
    self.keys_by_name = {}
    self.raw_by_key = {}
    self.processed_by_key = {}
    data = pkgutil.get_data(__name__, self.filename)
    buf = io.StringIO(data.decode('ascii'), newline=u'')
    reader = csv.DictReader(buf, lineterminator=u'\n')
    self.fieldnames = reader.fieldnames
    for raw in reader:
        key = self.cls(raw['key'])
        assert key not in self.raw_by_key
        self.raw_by_key[key] = raw
        name = self.name_from_raw(key, raw)
        assert name not in self.keys_by_name
        self.keys_by_name[name] = key
    self.accessor = KnowledgeAccessor(self)

def _load_notices():
    lookup = lxml.etree.ElementNamespaceClassLookup()
    parser = lxml.etree.XMLParser()
    parser.set_element_class_lookup(lookup)
    ns = lookup.get_namespace(None)
    for severity in Severity:
        ns[severity.name] = Notice
    ns['title'] = Title
    ns['explain'] = Paragraph
    ns['exception'] = ExceptionDetails
    ns['var'] = Var
    ns['ref'] = Ref
    ns['cite'] = Cite
    ns['rfc'] = CiteRFC
    for tag in known_map:
        ns[tag] = Known
    notices_xml = pkgutil.get_data('httpolice', 'notices.xml')
    root = lxml.etree.fromstring(notices_xml, parser)
    r = {}
    for elem in root:
        if isinstance(elem, Notice):
            assert elem.id not in r
            r[elem.id] = elem
    return r, parser

def reduce_domains(domains):
    # reduce 'www.google.com' to 'google.com'
    # remove invalid domains
    tld_content = pkgutil.get_data('gfwlist2pac', 'resources/tld.txt')
    tlds = set(tld_content.splitlines(False))
    new_domains = set()
    for domain in domains:
        domain_parts = domain.split('.')
        last_root_domain = None
        for i in xrange(0, len(domain_parts)):
            root_domain = '.'.join(domain_parts[len(domain_parts) - i - 1:])
            if i == 0:
                if not tlds.__contains__(root_domain):
                    # root_domain is not a valid tld
                    break
            last_root_domain = root_domain
            if tlds.__contains__(root_domain):
                continue
            else:
                break
        if last_root_domain is not None:
            new_domains.add(last_root_domain)
    return new_domains

def generate_pac_precise(rules, proxy):
    def grep_rule(rule):
        if rule:
            if rule.startswith('!'):
                return None
            if rule.startswith('['):
                return None
            return rule
        return None

    # render the pac file
    proxy_content = pkgutil.get_data('gfwlist2pac', 'resources/abp.js')
    rules = filter(grep_rule, rules)
    proxy_content = proxy_content.replace('__PROXY__', json.dumps(str(proxy)))
    proxy_content = proxy_content.replace('__RULES__', json.dumps(rules, indent=2))
    return proxy_content

def _read_emodic(self):
    """ Load emotion dictionaries """
    self.emodic = {'emotem': {}, 'emotion': {}}
    # Reading dictionaries of syntactical indicator of emotiveness
    emotemy = ('interjections', 'exclamation', 'vulgar', 'endearments',
               'emotikony', 'gitaigo')
    for emotem_class in emotemy:
        data = pkgutil.get_data('mlask',
                                os.path.join('emotemes', '%s_uncoded.txt') % emotem_class)
        phrases = data.decode('utf8').splitlines()
        self.emodic['emotem'][emotem_class] = phrases
    # Reading dictionaries of emotion
    emotions = ('aware', 'haji', 'ikari', 'iya', 'kowa', 'odoroki', 'suki',
                'takaburi', 'yasu', 'yorokobi')
    for emotion_class in emotions:
        data = pkgutil.get_data('mlask',
                                os.path.join('emotions', '%s_uncoded.txt') % emotion_class)
        phrases = data.decode('utf8').splitlines()
        self.emodic['emotion'][emotion_class] = phrases

def has_sorted_training_set(self):
    try:
        pkgutil.get_data('numerai.data',
                         'r' + str(self.round_number) + '_numerai_sorted_training_data.csv')
        return True
    except IOError:
        return False

def getzoneinfofile_stream():
    try:
        return BytesIO(get_data(__name__, _ZONEFILENAME))
    except IOError as e:  # TODO  switch to FileNotFoundError?
        warnings.warn("I/O error({0}): {1}".format(e.errno, e.strerror))
        return None

async def _http_settings(self, request):
    import jinja2
    template = jinja2.Template(
        pkgutil.get_data("rci.services.github",
                         "github_settings.html").decode("utf8"))
    client = self._get_client(request)
    if client is None:
        return web.HTTPUnauthorized(text="fail")
    orgs = []
    for org in (await client.get("user/orgs")):
        orgs.append(org)
    return web.Response(text=template.render(orgs=orgs),
                        content_type="text/html")

def pkgdata(name):
    data = pkgutil.get_data("pythonwhois", name)
    if sys.version_info < (3, 0):
        return data
    else:
        return data.decode("utf-8")

def get_content(file_name, pkg_name='onedrivee', is_text=True):
    """
    Read a resource file in store/.
    :param str file_name:
    :param str pkg_name:
    :param True | False is_text: True to indicate the text is UTF-8 encoded.
    :return str | bytes: Content of the file.
    """
    content = pkgutil.get_data(pkg_name, 'store/' + file_name)
    if is_text:
        content = content.decode('utf-8')
    return content

def getzoneinfofile_stream():
    try:
        return BytesIO(get_data(__name__, ZONEFILENAME))
    except IOError as e:  # TODO  switch to FileNotFoundError?
        warnings.warn("I/O error({0}): {1}".format(e.errno, e.strerror))
        return None

def write_images(self):
    """Write the SVG images."""
    for path in ('file.svg', 'back.svg'):
        source = pkgutil.get_data('flake8_html', 'images/' + path)
        outpath = os.path.join(self.outdir, path)
        with open(outpath, 'wb') as f:
            f.write(source)

def lib():
    print('In dir_shadowing_lib.py lib()')
    # Test resource extraction
    lib_dat = pkgutil.get_data('test_dir_shadowing', 'dir_shadowing_lib_dat.txt')
    assert (lib_dat == b'Dummy data file for dir_shadowing_lib.py\n'), lib_dat

def main():
    print('In dir_shadowing_main.py main()')
    dir_shadowing_lib.lib()
    # Test resource extraction
    dat = pkgutil.get_data('test_dir_shadowing', 'dir_shadowing_main_dat.txt')
    assert (dat == b'Dummy data file for dir_shadowing_main.py\n'), dat

def main():
    a.main()
    a_lib.lib()
    print('In b.py main()')
    # Test resource extraction
    b_dat = pkgutil.get_data('subpar.tests.package_b', 'b_dat.txt')
    assert (b_dat == b'Dummy data file for b.py\n'), b_dat

def lib():
    print('In a_lib.py lib()')
    # Test resource extraction
    a_lib_dat = pkgutil.get_data('subpar.tests.package_a', 'a_lib_dat.txt')
    assert (a_lib_dat == b'Dummy data file for a_lib.py\n'), a_lib_dat

def main():
    print('In a.py main()')
    # Test resource extraction
    a_dat = pkgutil.get_data('subpar.tests.package_a', 'a_dat.txt')
    assert (a_dat == b'Dummy data file for a.py\n'), a_dat

def _generate_from_template(destination, **params):
    template = pkgutil.get_data(__name__, os.path.join(
        'resources', 'logrotate'))
    pretty_params = json.dumps(params, indent=4, sort_keys=True)
    lgr.debug('Rendering logrotate with params: {0}...'.format(pretty_params))
    generated = jinja2.Environment().from_string(template).render(**params)
    lgr.debug('Writing generated file to {0}...'.format(destination))
    with open(destination, 'w') as f:
        f.write(generated)

def get_default_config(self):
    try:
        return get_data(__package__,
                        'default_{section}.conf'.format(section=self.SECTION))
    except IOError:
        pass

def _gen_ffbinary(self, ffname):
    bin_data = pkgutil.get_data("bin", ffname)
    temp = tempfile.NamedTemporaryFile(delete=False)
    temp.write(bin_data)
    temp.close()
    # chmod +x
    os.chmod(temp.name, os.stat(temp.name).st_mode | stat.S_IEXEC)
    return temp

def test_getdata_filesys(self):
    pkg = 'test_getdata_filesys'

    # Include a LF and a CRLF, to test that binary data is read back
    RESOURCE_DATA = b'Hello, world!\nSecond line\r\nThird line'

    # Make a package with some resources
    package_dir = os.path.join(self.dirname, pkg)
    os.mkdir(package_dir)
    # Empty init.py
    f = open(os.path.join(package_dir, '__init__.py'), "wb")
    f.close()
    # Resource files, res.txt, sub/res.txt
    f = open(os.path.join(package_dir, 'res.txt'), "wb")
    f.write(RESOURCE_DATA)
    f.close()
    os.mkdir(os.path.join(package_dir, 'sub'))
    f = open(os.path.join(package_dir, 'sub', 'res.txt'), "wb")
    f.write(RESOURCE_DATA)
    f.close()

    # Check we can read the resources
    res1 = pkgutil.get_data(pkg, 'res.txt')
    self.assertEqual(res1, RESOURCE_DATA)
    res2 = pkgutil.get_data(pkg, 'sub/res.txt')
    self.assertEqual(res2, RESOURCE_DATA)

    del sys.modules[pkg]

def test_getdata_zipfile(self):
    zip = 'test_getdata_zipfile.zip'
    pkg = 'test_getdata_zipfile'

    # Include a LF and a CRLF, to test that binary data is read back
    RESOURCE_DATA = b'Hello, world!\nSecond line\r\nThird line'

    # Make a package with some resources
    zip_file = os.path.join(self.dirname, zip)
    z = zipfile.ZipFile(zip_file, 'w')

    # Empty init.py
    z.writestr(pkg + '/__init__.py', "")
    # Resource files, res.txt, sub/res.txt
    z.writestr(pkg + '/res.txt', RESOURCE_DATA)
    z.writestr(pkg + '/sub/res.txt', RESOURCE_DATA)
    z.close()

    # Check we can read the resources
    sys.path.insert(0, zip_file)
    res1 = pkgutil.get_data(pkg, 'res.txt')
    self.assertEqual(res1, RESOURCE_DATA)
    res2 = pkgutil.get_data(pkg, 'sub/res.txt')
    self.assertEqual(res2, RESOURCE_DATA)

    names = []
    for loader, name, ispkg in pkgutil.iter_modules([zip_file]):
        names.append(name)
    self.assertEqual(names, ['test_getdata_zipfile'])

    del sys.path[0]
    del sys.modules[pkg]

def get_data(self, path):
    return "Hello, world!"

def test_getdata_pep302(self):
    # Use a dummy importer/loader
    self.assertEqual(pkgutil.get_data('foo', 'dummy'), "Hello, world!")
    del sys.modules['foo']

def test_alreadyloaded(self):
    # Ensure that get_data works without reloading - the "loads" module
    # variable in the example loader should count how many times a reload
    # occurs.
    import foo
    self.assertEqual(foo.loads, 1)
    self.assertEqual(pkgutil.get_data('foo', 'dummy'), "Hello, world!")
    self.assertEqual(foo.loads, 1)
    del sys.modules['foo']

def get_version_file():
    """Looks for a file VERSION in the package data and returns its
    contents. Does not check consistency."""
    return pkgutil.get_data('kolibri', 'VERSION').decode('utf-8')

def load_dataset(name):
    """Load example dataset.

    If seaborn is present, its datasets can be loaded.
    Physt also includes some datasets in CSV format.

    Parameters
    ----------
    name : str

    Returns
    -------
    dataset : pandas.DataFrame
    """
    # Our custom datasets:
    try:
        try:
            import pandas as pd
        except ImportError:
            raise RuntimeError("Pandas not installed.")
        import pkgutil
        import io
        binary_data = pkgutil.get_data('physt', 'examples/{0}.csv'.format(name))
        return pd.read_csv(io.BytesIO(binary_data))
    except FileNotFoundError:
        pass

    # Seaborn datasets?
    try:
        import seaborn as sns
        import warnings
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            if name in sns.get_dataset_names():
                return sns.load_dataset(name)
    except ImportError:
        pass

    # Fall through
    raise RuntimeError("Dataset {0} not available.".format(name))

def get_resource(rel_path, pkg_name='onedrived', is_text=True):
    """
    Read a resource file in data/.
    :param str rel_path:
    :param str pkg_name:
    :param True | False is_text: True to indicate the text is UTF-8 encoded.
    :return str | bytes: Content of the file.
    """
    content = pkgutil.get_data(pkg_name, rel_path)
    if is_text:
        content = content.decode('utf-8')
    return content

def load_json_mock(mock_name: str) -> object:
    """Load a JSON mock from package data.

    Arguments:
        mock_name:

    Returns:
        ...
    """
    mock_path = ('osp_api', 'mocks/%s.json' % mock_name)
    mock_obj = json.loads(pkgutil.get_data(*mock_path).decode('utf-8'))
    return mock_obj

def try_load_pkg_data(name):
    try:
        return pkgutil.get_data(__name__, name)
    except ValueError:
        return open(name, "rb").read()

def config_template_contents(self):
    return pkgutil.get_data('bridgy',
                            'config/samples/' + self.config_template_path)

def test_getdata_filesys(self):
    pkg = 'test_getdata_filesys'

    # Include a LF and a CRLF, to test that binary data is read back
    RESOURCE_DATA = 'Hello, world!\nSecond line\r\nThird line'

    # Make a package with some resources
    package_dir = os.path.join(self.dirname, pkg)
    os.mkdir(package_dir)
    # Empty init.py
    f = open(os.path.join(package_dir, '__init__.py'), "wb")
    f.close()
    # Resource files, res.txt, sub/res.txt
    f = open(os.path.join(package_dir, 'res.txt'), "wb")
    f.write(RESOURCE_DATA)
    f.close()
    os.mkdir(os.path.join(package_dir, 'sub'))
    f = open(os.path.join(package_dir, 'sub', 'res.txt'), "wb")
    f.write(RESOURCE_DATA)
    f.close()

    # Check we can read the resources
    res1 = pkgutil.get_data(pkg, 'res.txt')
    self.assertEqual(res1, RESOURCE_DATA)
    res2 = pkgutil.get_data(pkg, 'sub/res.txt')
    self.assertEqual(res2, RESOURCE_DATA)

    del sys.modules[pkg]

def test_getdata_zipfile(self):
    zip = 'test_getdata_zipfile.zip'
    pkg = 'test_getdata_zipfile'

    # Include a LF and a CRLF, to test that binary data is read back
    RESOURCE_DATA = 'Hello, world!\nSecond line\r\nThird line'

    # Make a package with some resources
    zip_file = os.path.join(self.dirname, zip)
    z = zipfile.ZipFile(zip_file, 'w')

    # Empty init.py
    z.writestr(pkg + '/__init__.py', "")
    # Resource files, res.txt, sub/res.txt
    z.writestr(pkg + '/res.txt', RESOURCE_DATA)
    z.writestr(pkg + '/sub/res.txt', RESOURCE_DATA)
    z.close()

    # Check we can read the resources
    sys.path.insert(0, zip_file)
    res1 = pkgutil.get_data(pkg, 'res.txt')
    self.assertEqual(res1, RESOURCE_DATA)
    res2 = pkgutil.get_data(pkg, 'sub/res.txt')
    self.assertEqual(res2, RESOURCE_DATA)

    del sys.path[0]
    del sys.modules[pkg]