我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用pkg_resources.resource_exists()。
def _load_sklearn_default_classifier(): if sys.version_info[0] == 2: file_name = "sklearn_classifier_py2.pklz" protocol = 2 else: file_name = "sklearn_classifier_py3.pklz" protocol = 3 file_path = resource_filename('sudokuextract.data', file_name) if resource_exists('sudokuextract.data', file_name): f = gzip.open(file_path, 'rb') classifier = pickle.load(f) f.close() else: classifier = KNeighborsClassifier(n_neighbors=10) classifier = fit_combined_classifier(classifier) f = gzip.open(file_path, 'wb') pickle.dump(classifier, f, protocol=protocol) f.close() return classifier
def _load_sudokuextract_default_classifier(): file_name = "sudokuextract_classifier.pklz" protocol = 2 file_path = resource_filename('sudokuextract.data', file_name) if resource_exists('sudokuextract.data', file_name): f = gzip.open(file_path, 'rb') classifier_json = pickle.load(f) classifier = KNeighborsClassifier(classifier_json.get('n_neighbors'), classifier_json.get('weights'), classifier_json.get('metric'), classifier_json.get('p')) classifier.fit(np.array(classifier_json.get('data')), np.array(classifier_json.get('labels'))) f.close() else: classifier = KNeighborsClassifier(n_neighbors=10) classifier = fit_combined_classifier(classifier) f = gzip.open(file_path, 'wb') pickle.dump(classifier.to_json(), f, protocol=protocol) f.close() return classifier
def validate_absolute_path(self, root, absolute_path): """ An override of :meth:`tornado.web.StaticFileHandler.validate_absolute_path`; Validate and returns the given *absolute_path* using `pkg_resources` if ``self.use_pkg`` is set otherwise performs a normal filesystem validation. """ # We have to generate the real absolute path in this method since the # Tornado devs--for whatever reason--decided that get_absolute_path() # must be a classmethod (we need access to self.use_pkg). if self.use_pkg: if not resource_exists(self.use_pkg, absolute_path): raise HTTPError(404) return resource_filename(self.use_pkg, absolute_path) return super( StaticHandler, self).validate_absolute_path(root, absolute_path)
def test_resource_api(easter_fixture): test_file = NamedTemporaryFile(mode='wb+') dirname, file_name = os.path.split(test_file.name) easter_fixture.stub_egg.location = dirname easter_fixture.stub_egg.activate() assert pkg_resources.resource_exists(easter_fixture.stub_egg.as_requirement(), file_name) assert not pkg_resources.resource_exists(easter_fixture.stub_egg.as_requirement(), 'IDoNotExist') contents = b'asdd ' test_file.write(contents) test_file.flush() as_string = pkg_resources.resource_string(easter_fixture.stub_egg.as_requirement(), file_name) assert as_string == contents as_file = pkg_resources.resource_stream(easter_fixture.stub_egg.as_requirement(), file_name) assert as_file.read() == contents
def resource_exists(package_or_requirement, resource_name): return False
def getResource(identifier, pkgname=__name__): """ Acquire a readable object for a given package name and identifier. An IOError will be raised if the resource can not be found. For example: mydata = getResource('mypkgdata.jpg').read() Note that the package name must be fully qualified, if given, such that it would be found in sys.modules. In some cases, getResource will return a real file object. In that case, it may be useful to use its name attribute to get the path rather than use it as a file-like object. For example, you may be handing data off to a C API. """ if resource_exists(pkgname, identifier): return resource_stream(pkgname, identifier) mod = sys.modules[pkgname] fn = getattr(mod, '__file__', None) if fn is None: raise IOError("%s has no __file__!" % repr(mod)) path = os.path.join(os.path.dirname(fn), identifier) if sys.version_info < (3, 3): loader = getattr(mod, '__loader__', None) if loader is not None: try: data = loader.get_data(path) except IOError: pass else: return BytesIO(data) return open(os.path.normpath(path), 'rb')
def _mnist_raw_data(): fname = resource_filename('sudokuextract.data', "train-images-idx3-ubyte.gz") if resource_exists('sudokuextract.data', "train-images-idx3-ubyte.gz"): f = gzip.open(fname, mode='rb') data = f.read() f.close() else: sio = StringIO(urlopen(_url_to_mnist_train_data).read()) sio.seek(0) f = gzip.GzipFile(fileobj=sio, mode='rb') data = f.read() f.close() try: sio.seek(0) with open(fname, 'wb') as f: f.write(sio.read()) except: pass correct_magic_number = 2051 magic_number = _toS32(data[:4]) if magic_number != correct_magic_number: raise ValueError("Error parsing images file. Read magic number {0} != {1}!".format( magic_number, correct_magic_number)) n_images = _toS32(data[4:8]) n_rows = _toS32(data[8:12]) n_cols = _toS32(data[12:16]) images = np.fromstring(data[16:], 'uint8').reshape(n_images, n_rows*n_cols) return [imrow.reshape(28, 28) for imrow in images]
def _mnist_raw_labels(): fname = resource_filename('sudokuextract.data', "train-labels-idx1-ubyte.gz") if resource_exists('sudokuextract.data', "train-labels-idx1-ubyte.gz"): f = gzip.open(fname, mode='rb') data = f.read() f.close() else: sio = StringIO(urlopen(_url_to_mnist_train_labels).read()) sio.seek(0) f = gzip.GzipFile(fileobj=sio, mode='rb') data = f.read() f.close() try: sio.seek(0) with open(fname, 'wb') as f: f.write(sio.read()) except: pass correct_magic_number = 2049 magic_number = _toS32(data[:4]) if magic_number != correct_magic_number: raise ValueError("Error parsing labels file. Read magic number {0} != {1}!".format( magic_number, correct_magic_number)) n_labels = _toS32(data[4:8]) return np.fromstring(data[8:], 'uint8')
def _sudokuextract_data(): fname = resource_filename('sudokuextract.data', "se-train-data.gz") if resource_exists('sudokuextract.data', "se-train-data.gz"): f = gzip.open(fname, mode='rb') data = np.load(f) f.close() else: raise IOError("SudokuExtract Training data file was not present!") return data
def _mnist_data(): fname = resource_filename('sudokuextract.data', "mnist-train-data.gz") if resource_exists('sudokuextract.data', "mnist-train-data.gz"): f = gzip.open(fname, mode='rb') data = np.load(f) f.close() else: raise IOError("MNIST Training data file was not present!") return data
def _mnist_labels(): fname = resource_filename('sudokuextract.data', "mnist-train-labels.gz") if resource_exists('sudokuextract.data', "mnist-train-labels.gz"): f = gzip.open(fname, mode='rb') data = np.load(f) f.close() else: raise IOError("MNIST Training labels file was not present!") return data
def _get_env(self, template): tmppath = os.path.join(_DEFAULT_RES_PATH, self._get_template_filename(template)) if resource_exists(__package__, tmppath): return Environment( loader=FileSystemLoader(resource_filename(__package__, _DEFAULT_RES_PATH)), trim_blocks=True ) else: raise FileNotFoundError('No such template')
def getwords(wordlists): """return a list of all unique words in all specified files""" words = [] for wl in wordlists: if os.path.exists(wl): words.extend(slurp(wl)) continue rp = 'wordlists/' + wl if pkg_resources.resource_exists(__name__, rp): words.extend(pkg_resources.resource_string( __name__, rp).decode('utf-8').splitlines()) continue click.echo('cannot find word list "{}"'.format(wl)) return list(set(words))
def send_extra(self): """ Sends any extra JS/CSS files placed in Gate One's 'static/extra' directory. Can be useful if you want to use Gate One's file synchronization and caching capabilities in your app. .. note:: You may have to create the 'static/extra' directory before putting files in there. """ #extra_path = resource_filename('gateone', 'static/extra') extra_path = os.path.join(getsettings('BASE_DIR'), 'static/extra') #if not resource_exists('gateone', '/static/extra'): #return # Nothing to do if not os.path.exists(extra_path): return # Nothing to do #for f in resource_listdir('gateone', '/static/extra'): #filepath = resource_filename('gateone', '/static/extra/%s' % f) #if filepath.endswith('.js'): #self.send_js(filepath, force=True) #elif filepath.endswith('.css'): #self.send_css(filepath, force=True) for f in os.listdir(extra_path): filepath = os.path.join(extra_path,f) if filepath.endswith('.js'): self.send_js(filepath, force=True) elif filepath.endswith('.css'): self.send_css(filepath, force=True)
def getResource(identifier, pkgname=__name__): """ Acquire a readable object for a given package name and identifier. An IOError will be raised if the resource can not be found. For example: mydata = getResource('mypkgdata.jpg').read() Note that the package name must be fully qualified, if given, such that it would be found in sys.modules. In some cases, getResource will return a real file object. In that case, it may be useful to use its name attribute to get the path rather than use it as a file-like object. For example, you may be handing data off to a C API. """ if resource_exists(pkgname, identifier): return resource_stream(pkgname, identifier) mod = sys.modules[pkgname] fn = getattr(mod, '__file__', None) if fn is None: raise IOError("%s has no __file__!" % repr(mod)) path = os.path.join(os.path.dirname(fn), identifier) loader = getattr(mod, '__loader__', None) if loader is not None: try: data = loader.get_data(path) except IOError: pass else: return BytesIO(data) return open(os.path.normpath(path), 'rb')
def test_resource_api_from_module_name(easter_fixture): test_file = NamedTemporaryFile(mode='wb+', suffix='.py') dirname, file_name = os.path.split(test_file.name) easter_fixture.stub_egg.location = dirname easter_fixture.stub_egg.activate() module_name = file_name.split('.')[0] assert pkg_resources.resource_exists(module_name, '') assert pkg_resources.resource_filename(module_name, '') == dirname
def exists(self): try: return pkg_resources.resource_exists(self.containing_package, self.relative_path.replace(os.sep, '/')) except ImportError as ex: if six.PY2 and str(ex).endswith(self.name): return False elif six.PY3 and ex.name == '.'.join([self.containing_package, self.name]): return False raise
def asset_exists(name: str) -> bool: """ Return True only if the desired asset is known to exist. """ if name: return pkg_resources.resource_exists(__name__, name) else: return False # don't allow access to directory
def get_release_info() -> str: """ Read the release file and return all lines bunched into one comma-separated value. Life's exciting. And short. But mostly exciting. """ if pkg_resources.resource_exists(__name__, "release.txt"): content = pkg_resources.resource_string(__name__, "release.txt") text = content.decode(errors='ignore').strip() lines = [line.strip() for line in text.split('\n')] release_info = ", ".join(lines) else: release_info = "Not available. Hint: release info will be created by upload_env.sh" return "Release information: " + release_info
def parameters_from_yaml(name, input_key=None, expected_key=None): package_name, resource_name = name.split('.', 1) resources = [] if resource_isdir(package_name, resource_name): resources.extend([resource_name + '/' + r for r in resource_listdir(package_name, resource_name) if r.endswith(('.yml', '.yaml'))]) elif resource_exists(package_name, resource_name + '.yml'): resources.append(resource_name + '.yml') elif resource_exists(package_name, resource_name + '.yaml'): resources.append(resource_name + '.yaml') if not resources: raise RuntimeError('Not able to load any yaml file for {0}'.format(name)) parameters = [] for resource_name in resources: with resource_stream(package_name, resource_name) as stream: data = yaml.load(stream, Loader=serializer.YAMLLoader) if input_key and expected_key: parameters.append((data[expected_key], data[input_key])) continue for root_key, root_value in data.items(): if isinstance(root_value, Mapping): for expected, data_input in root_value.items(): for properties in data_input if isinstance(data_input, (tuple, list)) else [data_input]: parameters.append((root_key, expected, properties)) else: for properties in root_value if isinstance(root_value, (tuple, list)) else [root_value]: parameters.append((root_key, properties)) return parameters
def _validate_config_files(self): """ Validates the configuration files necessary for the application. An exception is thrown if any of the required files are inaccessible. """ # Determine the module of the derived class mod = self.__class__.__module__ # If the configuration directory exists in the library, create config files as necessary # This check also provides backwards compatibility for projects that don't have the # configuration files in the library. if pkg_resources.resource_exists(mod, self.LIB_CONFIG_DIR): # Create configuration directory if not found if not os.access(self._config_dir, os.R_OK): logger.info("Configuration directory '{0}' not found, creating...".format(self._config_dir)) os.makedirs(self._config_dir) # Count of current configuration files config_files_count = len([name for name in os.listdir(self._config_dir) if os.path.isfile(os.path.join(self._config_dir, name))]) # Create configuration files if not found files = pkg_resources.resource_listdir(mod, self.LIB_APP_CONFIG_DIR) for f in files: config_path = os.path.join(self._config_dir, f) if not os.access(config_path, os.R_OK): f_lower = f.lower() # Copy configuration file. Only copy logging file if the directory was empty if not(f_lower.endswith(".py") or f_lower.endswith(".pyc")) and \ (f_lower != Application.LOGGING_CONFIG_FILE or config_files_count == 0): logger.info("Configuration file '{0}' not found, creating...".format(f)) shutil.copyfile(pkg_resources.resource_filename( mod, self.LIB_APP_CONFIG_DIR + "/" + f), config_path) if not os.access(self._dxlclient_config_path, os.R_OK): raise Exception( "Unable to access client configuration file: {0}".format( self._dxlclient_config_path)) if not os.access(self._app_config_path, os.R_OK): raise Exception( "Unable to access application configuration file: {0}".format( self._app_config_path))