我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sys.path()。
def pip_execute(*args, **kwargs): """Overriden pip_execute() to stop sys.path being changed. The act of importing main from the pip module seems to cause add wheels from the /usr/share/python-wheels which are installed by various tools. This function ensures that sys.path remains the same after the call is executed. """ try: _path = sys.path try: from pip import main as _pip_execute except ImportError: apt_update() apt_install('python-pip') from pip import main as _pip_execute _pip_execute(*args, **kwargs) finally: sys.path = _path
def shared_locations(self): """ A dictionary of shared locations whose keys are in the set 'prefix', 'purelib', 'platlib', 'scripts', 'headers', 'data' and 'namespace'. The corresponding value is the absolute path of that category for this distribution, and takes into account any paths selected by the user at installation time (e.g. via command-line arguments). In the case of the 'namespace' key, this would be a list of absolute paths for the roots of namespace packages in this distribution. The first time this property is accessed, the relevant information is read from the SHARED file in the .dist-info directory. """ result = {} shared_path = os.path.join(self.path, 'SHARED') if os.path.isfile(shared_path): with codecs.open(shared_path, 'r', encoding='utf-8') as f: lines = f.read().splitlines() for line in lines: key, value = line.split('=', 1) if key == 'namespace': result.setdefault(key, []).append(value) else: result[key] = value return result
def _warn_unsafe_extraction_path(path): """ If the default extraction path is overridden and set to an insecure location, such as /tmp, it opens up an opportunity for an attacker to replace an extracted file with an unauthorized payload. Warn the user if a known insecure location is used. See Distribute #375 for more details. """ if os.name == 'nt' and not path.startswith(os.environ['windir']): # On Windows, permissions are generally restrictive by default # and temp directories are not writable by other users, so # bypass the warning. return mode = os.stat(path).st_mode if mode & stat.S_IWOTH or mode & stat.S_IWGRP: msg = ("%s is writable by group/others and vulnerable to attack " "when " "used with get_resource_filename. Consider a more secure " "location (set with .set_extraction_path or the " "PYTHON_EGG_CACHE environment variable)." % path) warnings.warn(msg, UserWarning)
def zipdir(path, zipn): for root, dirs, files in os.walk(path): for file in files: zipn.write(os.path.join(root, file))
def atomic_writer(file_path, mode): """Atomic file writer. :param file_path: path of file to write to. :type file_path: ``unicode`` :param mode: sames as for `func:open` :type mode: string .. versionadded:: 1.12 Context manager that ensures the file is only written if the write succeeds. The data is first written to a temporary file. """ temp_suffix = '.aw.temp' temp_file_path = file_path + temp_suffix with open(temp_file_path, mode) as file_obj: try: yield file_obj os.rename(temp_file_path, file_path) finally: try: os.remove(temp_file_path) except (OSError, IOError): pass
def cachedir(self): """Path to workflow's cache directory. The cache directory is a subdirectory of Alfred's own cache directory in ``~/Library/Caches``. The full path is: ``~/Library/Caches/com.runningwithcrayons.Alfred-X/Workflow Data/<bundle id>`` ``Alfred-X`` may be ``Alfred-2`` or ``Alfred-3``. :returns: full path to workflow's cache directory :rtype: ``unicode`` """ if self.alfred_env.get('workflow_cache'): dirpath = self.alfred_env.get('workflow_cache') else: dirpath = self._default_cachedir return self._create(dirpath)
def datadir(self): """Path to workflow's data directory. The data directory is a subdirectory of Alfred's own data directory in ``~/Library/Application Support``. The full path is: ``~/Library/Application Support/Alfred 2/Workflow Data/<bundle id>`` :returns: full path to workflow data directory :rtype: ``unicode`` """ if self.alfred_env.get('workflow_data'): dirpath = self.alfred_env.get('workflow_data') else: dirpath = self._default_datadir return self._create(dirpath)
def _delete_directory_contents(self, dirpath, filter_func): """Delete all files in a directory. :param dirpath: path to directory to clear :type dirpath: ``unicode`` or ``str`` :param filter_func function to determine whether a file shall be deleted or not. :type filter_func ``callable`` """ if os.path.exists(dirpath): for filename in os.listdir(dirpath): if not filter_func(filename): continue path = os.path.join(dirpath, filename) if os.path.isdir(path): shutil.rmtree(path) else: os.unlink(path) self.logger.debug('Deleted : %r', path)
def logic(data): # ???????? #print data f1 = open('/tmp/test.txt','a') f1.write(data) f1.write('\n') f1.close() data = json.loads(data) print type(data) mounts = data["MOUNT"] netifaces = data["NET"] for netif in netifaces: if netif["status"] == "up": ip = netif["ip"] #print netifaces['ip'] for mount in mounts: Mount_point= mount['path'] use_rate=mount['used_rate'] if use_rate > '70': send_mail(recvmail_conf['addr'],"mount point"+Mount_point ,used_rate) return("OK")
def removeduppaths(): """ Remove duplicate entries from sys.path along with making them absolute""" # This ensures that the initial path provided by the interpreter contains # only absolute pathnames, even if we're running from the build directory. L = [] known_paths = set() for dir in sys.path: # Filter out duplicate paths (on case-insensitive file systems also # if they only differ in case); turn relative paths into absolute # paths. dir, dircase = makepath(dir) if not dircase in known_paths: L.append(dir) known_paths.add(dircase) sys.path[:] = L return known_paths # XXX This should not be part of site.py, since it is needed even when # using the -S option for Python. See http://www.python.org/sf/586680
def addsitedir(sitedir, known_paths=None): """Add 'sitedir' argument to sys.path if missing and handle .pth files in 'sitedir'""" if known_paths is None: known_paths = _init_pathinfo() reset = 1 else: reset = 0 sitedir, sitedircase = makepath(sitedir) if not sitedircase in known_paths: sys.path.append(sitedir) # Add path component try: names = os.listdir(sitedir) except os.error: return names.sort() for name in names: if name.endswith(os.extsep + "pth"): addpackage(sitedir, name, known_paths) if reset: known_paths = None return known_paths
def __setup(self): if self.__lines: return data = None for dir in self.__dirs: for filename in self.__files: filename = os.path.join(dir, filename) try: fp = open(filename, "rU") data = fp.read() fp.close() break except IOError: pass if data: break if not data: data = self.__data self.__lines = data.split('\n') self.__linecnt = len(self.__lines)
def setcopyright(): """Set 'copyright' and 'credits' in __builtin__""" builtins.copyright = _Printer("copyright", sys.copyright) if _is_jython: builtins.credits = _Printer( "credits", "Jython is maintained by the Jython developers (www.jython.org).") elif _is_pypy: builtins.credits = _Printer( "credits", "PyPy is maintained by the PyPy developers: http://pypy.org/") else: builtins.credits = _Printer("credits", """\ Thanks to CWI, CNRI, BeOpen.com, Zope Corporation and a cast of thousands for supporting Python development. See www.python.org for more information.""") here = os.path.dirname(os.__file__) builtins.license = _Printer( "license", "See http://www.python.org/%.3s/license.html" % sys.version, ["LICENSE.txt", "LICENSE"], [os.path.join(here, os.pardir), here, os.curdir])
def __init__(self, path=None, include_egg=False): """ Create an instance from a path, optionally including legacy (distutils/ setuptools/distribute) distributions. :param path: The path to use, as a list of directories. If not specified, sys.path is used. :param include_egg: If True, this instance will look for and return legacy distributions as well as those based on PEP 376. """ if path is None: path = sys.path self.path = path self._include_dist = True self._include_egg = include_egg self._cache = _Cache() self._cache_egg = _Cache() self._cache_enabled = True self._scheme = get_scheme('default')
def _generate_cache(self): """ Scan the path for distributions and populate the cache with those that are found. """ gen_dist = not self._cache.generated gen_egg = self._include_egg and not self._cache_egg.generated if gen_dist or gen_egg: for dist in self._yield_distributions(): if isinstance(dist, InstalledDistribution): self._cache.add(dist) else: self._cache_egg.add(dist) if gen_dist: self._cache.generated = True if gen_egg: self._cache_egg.generated = True
def get_distributions(self): """ Provides an iterator that looks for distributions and returns :class:`InstalledDistribution` or :class:`EggInfoDistribution` instances for each one of them. :rtype: iterator of :class:`InstalledDistribution` and :class:`EggInfoDistribution` instances """ if not self._cache_enabled: for dist in self._yield_distributions(): yield dist else: self._generate_cache() for dist in self._cache.path.values(): yield dist if self._include_egg: for dist in self._cache_egg.path.values(): yield dist
def get_distribution(self, name): """ Looks for a named distribution on the path. This function only returns the first result found, as no more than one value is expected. If nothing is found, ``None`` is returned. :rtype: :class:`InstalledDistribution`, :class:`EggInfoDistribution` or ``None`` """ result = None name = name.lower() if not self._cache_enabled: for dist in self._yield_distributions(): if dist.key == name: result = dist break else: self._generate_cache() if name in self._cache.name: result = self._cache.name[name][0] elif self._include_egg and name in self._cache_egg.name: result = self._cache_egg.name[name][0] return result
def _get_records(self): """ Get the list of installed files for the distribution :return: A list of tuples of path, hash and size. Note that hash and size might be ``None`` for some entries. The path is exactly as stored in the file (which is as in PEP 376). """ results = [] r = self.get_distinfo_resource('RECORD') with contextlib.closing(r.as_stream()) as stream: with CSVReader(stream=stream) as record_reader: # Base location is parent dir of .dist-info dir #base_location = os.path.dirname(self.path) #base_location = os.path.abspath(base_location) for row in record_reader: missing = [None for i in range(len(row), 3)] path, checksum, size = row + missing #if not os.path.isabs(path): # path = path.replace('/', os.sep) # path = os.path.join(base_location, path) results.append((path, checksum, size)) return results
def get_resource_path(self, relative_path): """ NOTE: This API may change in the future. Return the absolute path to a resource file with the given relative path. :param relative_path: The path, relative to .dist-info, of the resource of interest. :return: The absolute path where the resource is to be found. """ r = self.get_distinfo_resource('RESOURCES') with contextlib.closing(r.as_stream()) as stream: with CSVReader(stream=stream) as resources_reader: for relative, destination in resources_reader: if relative == relative_path: return destination raise KeyError('no resource file with relative path %r ' 'is installed' % relative_path)
def write_shared_locations(self, paths, dry_run=False): """ Write shared location information to the SHARED file in .dist-info. :param paths: A dictionary as described in the documentation for :meth:`shared_locations`. :param dry_run: If True, the action is logged but no file is actually written. :return: The path of the file written to. """ shared_path = os.path.join(self.path, 'SHARED') logger.info('creating %s', shared_path) if dry_run: return None lines = [] for key in ('prefix', 'lib', 'headers', 'scripts', 'data'): path = paths[key] if os.path.isdir(paths[key]): lines.append('%s=%s' % (key, path)) for ns in paths.get('namespace', ()): lines.append('namespace=%s' % ns) with codecs.open(shared_path, 'w', encoding='utf-8') as f: f.write('\n'.join(lines)) return shared_path
def __init__(self, path, env=None): def set_name_and_version(s, n, v): s.name = n s.key = n.lower() # for case-insensitive comparisons s.version = v self.path = path self.dist_path = env if env and env._cache_enabled and path in env._cache_egg.path: metadata = env._cache_egg.path[path].metadata set_name_and_version(self, metadata.name, metadata.version) else: metadata = self._get_metadata(path) # Need to be set before caching set_name_and_version(self, metadata.name, metadata.version) if env and env._cache_enabled: env._cache_egg.add(self) super(EggInfoDistribution, self).__init__(metadata, path, env)
def check_installed_files(self): """ Checks that the hashes and sizes of the files in ``RECORD`` are matched by the files themselves. Returns a (possibly empty) list of mismatches. Each entry in the mismatch list will be a tuple consisting of the path, 'exists', 'size' or 'hash' according to what didn't match (existence is checked first, then size, then hash), the expected value and the actual value. """ mismatches = [] record_path = os.path.join(self.path, 'installed-files.txt') if os.path.exists(record_path): for path, _, _ in self.list_installed_files(): if path == record_path: continue if not os.path.exists(path): mismatches.append((path, 'exists', True, False)) return mismatches
def _build_from_requirements(cls, req_spec): """ Build a working set from a requirement spec. Rewrites sys.path. """ # try it without defaults already on sys.path # by starting with an empty path ws = cls([]) reqs = parse_requirements(req_spec) dists = ws.resolve(reqs, Environment()) for dist in dists: ws.add(dist) # add any missing entries from sys.path for entry in sys.path: if entry not in ws.entries: ws.add_entry(entry) # then copy back to sys.path sys.path[:] = ws.entries return ws
def __iter__(self): """Yield distributions for non-duplicate projects in the working set The yield order is the order in which the items' path entries were added to the working set. """ seen = {} for item in self.entries: if item not in self.entry_keys: # workaround a cache issue continue for key in self.entry_keys[item]: if key not in seen: seen[key] = 1 yield self.by_key[key]
def __init__(self, search_path=None, platform=get_supported_platform(), python=PY_MAJOR): """Snapshot distributions available on a search path Any distributions found on `search_path` are added to the environment. `search_path` should be a sequence of ``sys.path`` items. If not supplied, ``sys.path`` is used. `platform` is an optional string specifying the name of the platform that platform-specific distributions must be compatible with. If unspecified, it defaults to the current platform. `python` is an optional string naming the desired version of Python (e.g. ``'3.3'``); it defaults to the current version. You may explicitly set `platform` (and/or `python`) to ``None`` if you wish to map *all* distributions, not just those compatible with the running platform or Python version. """ self._distmap = {} self.platform = platform self.python = python self.scan(search_path)
def get_cache_path(self, archive_name, names=()): """Return absolute location in cache for `archive_name` and `names` The parent directory of the resulting path will be created if it does not already exist. `archive_name` should be the base filename of the enclosing egg (which may not be the name of the enclosing zipfile!), including its ".egg" extension. `names`, if provided, should be a sequence of path name parts "under" the egg's extraction location. This method should only be called by resource providers that need to obtain an extraction location, and only for names they intend to extract, as it tracks the generated names for possible cleanup later. """ extract_path = self.extraction_path or get_default_cache() target_path = os.path.join(extract_path, archive_name + '-tmp', *names) try: _bypass_ensure_directory(target_path) except: self.extraction_error() self._warn_unsafe_extraction_path(extract_path) self.cached_files[target_path] = 1 return target_path
def set_extraction_path(self, path): """Set the base path where resources will be extracted to, if needed. If you do not call this routine before any extractions take place, the path defaults to the return value of ``get_default_cache()``. (Which is based on the ``PYTHON_EGG_CACHE`` environment variable, with various platform-specific fallbacks. See that routine's documentation for more details.) Resources are extracted to subdirectories of this path based upon information given by the ``IResourceProvider``. You may set this to a temporary directory, but then you must call ``cleanup_resources()`` to delete the extracted files when done. There is no guarantee that ``cleanup_resources()`` will be able to remove all extracted files. (Note: you may not change the extraction path for a given resource manager once resources have been extracted, unless you first call ``cleanup_resources()``.) """ if self.cached_files: raise ValueError( "Can't change extraction path, files already extracted" ) self.extraction_path = path
def find_eggs_in_zip(importer, path_item, only=False): """ Find eggs in zip files; possibly multiple nested eggs. """ if importer.archive.endswith('.whl'): # wheels are not supported with this finder # they don't have PKG-INFO metadata, and won't ever contain eggs return metadata = EggMetadata(importer) if metadata.has_metadata('PKG-INFO'): yield Distribution.from_filename(path_item, metadata=metadata) if only: # don't yield nested distros return for subitem in metadata.resource_listdir('/'): if _is_unpacked_egg(subitem): subpath = os.path.join(path_item, subitem) for dist in find_eggs_in_zip(zipimport.zipimporter(subpath), subpath): yield dist
def _by_version_descending(names): """ Given a list of filenames, return them in descending order by version number. >>> names = 'bar', 'foo', 'Python-2.7.10.egg', 'Python-2.7.2.egg' >>> _by_version_descending(names) ['Python-2.7.10.egg', 'Python-2.7.2.egg', 'foo', 'bar'] >>> names = 'Setuptools-1.2.3b1.egg', 'Setuptools-1.2.3.egg' >>> _by_version_descending(names) ['Setuptools-1.2.3.egg', 'Setuptools-1.2.3b1.egg'] >>> names = 'Setuptools-1.2.3b1.egg', 'Setuptools-1.2.3.post1.egg' >>> _by_version_descending(names) ['Setuptools-1.2.3.post1.egg', 'Setuptools-1.2.3b1.egg'] """ def _by_version(name): """ Parse each component of the filename """ name, ext = os.path.splitext(name) parts = itertools.chain(name.split('-'), [ext]) return [packaging.version.parse(part) for part in parts] return sorted(names, key=_by_version, reverse=True)
def register_namespace_handler(importer_type, namespace_handler): """Register `namespace_handler` to declare namespace packages `importer_type` is the type or class of a PEP 302 "Importer" (sys.path item handler), and `namespace_handler` is a callable like this:: def namespace_handler(importer, path_entry, moduleName, module): # return a path_entry to use for child packages Namespace handlers are only called if the importer object has already agreed that it can handle the relevant path item, and they should only return a subpath if the module __path__ does not already contain an equivalent subpath. For an example namespace handler, see ``pkg_resources.file_ns_handler``. """ _namespace_handlers[importer_type] = namespace_handler
def check_version_conflict(self): if self.key == 'setuptools': # ignore the inevitable setuptools self-conflicts :( return nsp = dict.fromkeys(self._get_metadata('namespace_packages.txt')) loc = normalize_path(self.location) for modname in self._get_metadata('top_level.txt'): if (modname not in sys.modules or modname in nsp or modname in _namespace_packages): continue if modname in ('pkg_resources', 'setuptools', 'site'): continue fn = getattr(sys.modules[modname], '__file__', None) if fn and (normalize_path(fn).startswith(loc) or fn.startswith(self.location)): continue issue_warning( "Module %s was already imported from %s, but %s is being added" " to sys.path" % (modname, fn, self.location), )
def setup_py(self): assert self.source_dir, "No source dir for %s" % self try: import setuptools # noqa except ImportError: if get_installed_version('setuptools') is None: add_msg = "Please install setuptools." else: add_msg = traceback.format_exc() # Setuptools is not available raise InstallationError( "Could not import setuptools which is required to " "install from a source distribution.\n%s" % add_msg ) setup_py = os.path.join(self.setup_py_dir, 'setup.py') # Python2 __file__ should not be unicode if six.PY2 and isinstance(setup_py, six.text_type): setup_py = setup_py.encode(sys.getfilesystemencoding()) return setup_py
def run_script(self, script_name, namespace): script = 'scripts/' + script_name if not self.has_metadata(script): raise ResolutionError("No script named %r" % script_name) script_text = self.get_metadata(script).replace('\r\n', '\n') script_text = script_text.replace('\r', '\n') script_filename = self._fn(self.egg_info, script) namespace['__file__'] = script_filename if os.path.exists(script_filename): source = open(script_filename).read() code = compile(source, script_filename, 'exec') exec(code, namespace, namespace) else: from linecache import cache cache[script_filename] = ( len(script_text), 0, script_text.split('\n'), script_filename ) script_code = compile(script_text, script_filename, 'exec') exec(script_code, namespace, namespace)
def build(cls, path): """ Build a dictionary similar to the zipimport directory caches, except instead of tuples, store ZipInfo objects. Use a platform-specific path separator (os.sep) for the path keys for compatibility with pypy on Windows. """ with ContextualZipFile(path) as zfile: items = ( ( name.replace('/', os.sep), zfile.getinfo(name), ) for name in zfile.namelist() ) return dict(items)
def _index(self): try: return self._dirindex except AttributeError: ind = {} for path in self.zipinfo: parts = path.split(os.sep) while parts: parent = os.sep.join(parts[:-1]) if parent in ind: ind[parent].append(parts[-1]) break else: ind[parent] = [parts.pop()] self._dirindex = ind return ind
def find_eggs_in_zip(importer, path_item, only=False): """ Find eggs in zip files; possibly multiple nested eggs. """ if importer.archive.endswith('.whl'): # wheels are not supported with this finder # they don't have PKG-INFO metadata, and won't ever contain eggs return metadata = EggMetadata(importer) if metadata.has_metadata('PKG-INFO'): yield Distribution.from_filename(path_item, metadata=metadata) if only: # don't yield nested distros return for subitem in metadata.resource_listdir('/'): if _is_egg_path(subitem): subpath = os.path.join(path_item, subitem) for dist in find_eggs_in_zip(zipimport.zipimporter(subpath), subpath): yield dist elif subitem.lower().endswith('.dist-info'): subpath = os.path.join(path_item, subitem) submeta = EggMetadata(zipimport.zipimporter(subpath)) submeta.egg_info = subpath yield Distribution.from_location(path_item, subitem, submeta)
def _handle_ns(packageName, path_item): """Ensure that named package includes a subpath of path_item (if needed)""" importer = get_importer(path_item) if importer is None: return None loader = importer.find_module(packageName) if loader is None: return None module = sys.modules.get(packageName) if module is None: module = sys.modules[packageName] = types.ModuleType(packageName) module.__path__ = [] _set_parent_ns(packageName) elif not hasattr(module, '__path__'): raise TypeError("Not a package:", packageName) handler = _find_adapter(_namespace_handlers, importer) subpath = handler(importer, path_item, packageName, module) if subpath is not None: path = module.__path__ path.append(subpath) loader.load_module(packageName) _rebuild_mod_path(path, packageName, module) return subpath
def pip_install(package, fatal=False, upgrade=False, venv=None, **options): """Install a python package""" if venv: venv_python = os.path.join(venv, 'bin/pip') command = [venv_python, "install"] else: command = ["install"] available_options = ('proxy', 'src', 'log', 'index-url', ) for option in parse_options(options, available_options): command.append(option) if upgrade: command.append('--upgrade') if isinstance(package, list): command.extend(package) else: command.append(package) log("Installing {} package with options: {}".format(package, command)) if venv: subprocess.check_call(command) else: pip_execute(command)
def pip_create_virtualenv(path=None): """Create an isolated Python environment.""" apt_install('python-virtualenv') if path: venv_path = path else: venv_path = os.path.join(charm_dir(), 'venv') if not os.path.exists(venv_path): subprocess.check_call(['virtualenv', venv_path])
def get_cwd(): path = os.getcwd() path = path + '>' return path
def append_slash_if_dir(p): if p and os.path.isdir(p) and p[-1] != os.sep: return p + os.sep else: return p
def find_path(text, line, begidx, endidx, \ dir_only=False, files_only=False, exe_only=False,\ py_only=False, uploads=False, all_dir=False): cur_dir = os.getcwd() before_arg = line.rfind(" ", 0, begidx) if before_arg == -1: return # arg not found fixed = line[before_arg+1:begidx] # fixed portion of the arg arg = line[before_arg+1:endidx] if uploads: os.chdir(uploads_path) pattern = arg + '*' completions = [] for path in glob.glob(pattern): if dir_only: if os.path.isdir(path): path = append_slash_if_dir(path) completions.append(path.replace(fixed, "", 1)) elif files_only: if not os.path.isdir(path): completions.append(path.replace(fixed, "", 1)) elif exe_only: if not os.path.isdir(path): if path.endswith('.exe') or path.endswith('.py'): completions.append(path.replace(fixed, "", 1)) elif py_only: if not os.path.isdir(path): if path.endswith('.py'): completions.append(path.replace(fixed, "", 1)) elif all_dir: if os.path.isdir(path): path = append_slash_if_dir(path) completions.append(path.replace(fixed, "", 1)) os.chdir(cur_dir) return completions
def st_logger(resp,log_path,log_name,verbose=True): if no_error(resp): i = 1 log = os.path.join(log_path,'{}.log'.format(log_name)) while os.path.exists(log): new_log_name = '{} ({}).log'.format(log_name,i) log = os.path.join(log_path,new_log_name) i += 1 if verbose: st_print("[+] Output has been written to {}\n".format(log)) with open(log,'w') as l: l.write(resp) #http://stackoverflow.com/questions/2828953/silence-the-stdout-of-a-function-in-python-without-trashing-sys-stdout-and-resto
def __init__(self, filepath, defaults=None): """Create new :class:`Settings` object.""" super(Settings, self).__init__() self._filepath = filepath self._nosave = False self._original = {} if os.path.exists(self._filepath): self._load() elif defaults: for key, val in defaults.items(): self[key] = val self.save() # save default settings