我们从Python开源项目中，提取了以下49个代码示例，用于说明如何使用os.path模块。
def LoadPlugins(plugins, verbose):
    """Load and execute every plugin named in a comma-separated list.

    Each entry of ``plugins`` is expanded through ``ProcessAt`` and gets a
    ``.py`` extension when it lacks one.  A bare file name that does not
    exist relative to the current directory is looked up next to the
    running script (``sys.argv[0]``).  The plugin source is then executed
    with ``exec``.

    :param plugins: comma-separated plugin names; ``''`` loads nothing.
    :param verbose: when true, re-raise any error hit while loading a
        plugin instead of only printing a message.
    """
    if plugins == '':
        return
    scriptPath = os.path.dirname(sys.argv[0])
    for plugin in sum(map(ProcessAt, plugins.split(',')), []):
        try:
            if not plugin.lower().endswith('.py'):
                plugin += '.py'
            # Only bare file names fall back to the script's own directory.
            if os.path.dirname(plugin) == '' and not os.path.exists(plugin):
                scriptPlugin = os.path.join(scriptPath, plugin)
                if os.path.exists(scriptPlugin):
                    plugin = scriptPlugin
            # Read through a context manager so the handle is always closed.
            with open(plugin, 'r') as pluginFile:
                source = pluginFile.read()
            # NOTE(review): exec runs the plugin file with full privileges;
            # only load plugins from trusted locations.
            exec(source)
        except Exception:
            print('Error loading plugin: %s' % plugin)
            if verbose:
                raise
def _warn_unsafe_extraction_path(path):
    """
    Emit a ``UserWarning`` when `path` is writable by group or others.

    An extraction path in an insecure location (such as /tmp) lets an
    attacker replace an extracted file with an unauthorized payload.
    See Distribute #375 for more details.

    On Windows the check is skipped for any path that does not start
    with the ``windir`` environment variable.
    """
    if os.name == 'nt' and not path.startswith(os.environ['windir']):
        return
    permissions = os.stat(path).st_mode
    group_or_other_writable = (
        permissions & stat.S_IWOTH or permissions & stat.S_IWGRP
    )
    if not group_or_other_writable:
        return
    msg = (
        "%s is writable by group/others and vulnerable to attack "
        "when "
        "used with get_resource_filename. Consider a more secure "
        "location (set with .set_extraction_path or the "
        "PYTHON_EGG_CACHE environment variable)." % path
    )
    warnings.warn(msg, UserWarning)
def atomic_writer(file_path, mode):
    """Atomic file writer.

    :param file_path: path of file to write to.
    :type file_path: ``unicode``
    :param mode: same as for :func:`open`
    :type mode: string

    .. versionadded:: 1.12

    Generator intended to be used as a context manager (e.g. wrapped with
    ``contextlib.contextmanager``).  Data is written to a temporary file
    next to ``file_path``; only when the ``with`` body finishes without an
    error is the temporary file renamed over ``file_path``.  The temporary
    file is removed in every case.
    """
    temp_suffix = '.aw.temp'
    temp_file_path = file_path + temp_suffix
    file_obj = open(temp_file_path, mode)
    try:
        # Close (and therefore flush) the temporary file *before* renaming
        # it, so the destination never receives a half-written file and the
        # rename is not attempted on a still-open handle.
        with file_obj:
            yield file_obj
        os.rename(temp_file_path, file_path)
    finally:
        # After a successful rename the temp file is gone; ignore that.
        try:
            os.remove(temp_file_path)
        except (OSError, IOError):
            pass
def cachedir(self):
    """Path to workflow's cache directory.

    The cache directory is a subdirectory of Alfred's own cache
    directory in ``~/Library/Caches``. The full path is:

    ``~/Library/Caches/com.runningwithcrayons.Alfred-X/Workflow Data/<bundle id>``

    ``Alfred-X`` may be ``Alfred-2`` or ``Alfred-3``.

    The ``workflow_cache`` entry of ``alfred_env`` is used when it is set
    and non-empty; otherwise ``_default_cachedir`` is used.

    :returns: full path to workflow's cache directory
    :rtype: ``unicode``
    """
    configured = self.alfred_env.get('workflow_cache')
    dirpath = configured if configured else self._default_cachedir
    return self._create(dirpath)
def datadir(self):
    """Path to workflow's data directory.

    The data directory is a subdirectory of Alfred's own data
    directory in ``~/Library/Application Support``. The full path is:

    ``~/Library/Application Support/Alfred 2/Workflow Data/<bundle id>``

    The ``workflow_data`` entry of ``alfred_env`` is used when it is set
    and non-empty; otherwise ``_default_datadir`` is used.

    :returns: full path to workflow data directory
    :rtype: ``unicode``
    """
    configured = self.alfred_env.get('workflow_data')
    dirpath = configured if configured else self._default_datadir
    return self._create(dirpath)
def _delete_directory_contents(self, dirpath, filter_func):
    """Delete the entries of a directory selected by a filter.

    :param dirpath: path to directory to clear
    :type dirpath: ``unicode`` or ``str``
    :param filter_func: called with each entry name; the entry is
        deleted only when it returns a true value.
    :type filter_func: ``callable``

    Nothing happens when ``dirpath`` does not exist.
    """
    if not os.path.exists(dirpath):
        return
    for entry in os.listdir(dirpath):
        if filter_func(entry):
            path = os.path.join(dirpath, entry)
            # Directories are removed recursively, anything else unlinked.
            remove = shutil.rmtree if os.path.isdir(path) else os.unlink
            remove(path)
            self.logger.debug('Deleted : %r', path)
def test_rar_detection(self):
    """
    Tests the rar file detection process
    """
    from newsreap.codecs.CodecRar import RAR_PART_RE

    # (candidate path, expected value of the 'part' group)
    expectations = (
        ('/path/to/test.rar', None),
        ('/path/to/test.RaR', None),
        ('/path/to/test.r01', '01'),
        ('/path/to/test.R200', '200'),
        ('/path/to/test.part65.rar', '65'),
    )
    for candidate, expected_part in expectations:
        result = RAR_PART_RE.match(candidate)
        assert result is not None
        part = result.group('part')
        if expected_part is None:
            assert part is None
        else:
            assert part == expected_part
def test_7z_detection(self):
    """
    Tests the 7z file detection process
    """
    from newsreap.codecs.Codec7Zip import SEVEN_ZIP_PART_RE

    # (candidate path, regex group to inspect, expected group value)
    expectations = (
        ('/path/to/test.7z', 'part', None),
        ('/path/to/test.7Z', 'part', None),
        ('/path/to/test.7z.001', 'part0', '001'),
        ('/path/to/test.part65.7z', 'part', '65'),
    )
    for candidate, group_name, expected in expectations:
        result = SEVEN_ZIP_PART_RE.match(candidate)
        assert result is not None
        value = result.group(group_name)
        if expected is None:
            assert value is None
        else:
            assert value == expected
def _build_from_requirements(cls, req_spec):
    """
    Build a working set from a requirement spec. Rewrites sys.path.

    The set is first resolved against an empty path, then every
    ``sys.path`` entry it does not already contain is added, and finally
    ``sys.path`` is overwritten in place with the resulting entries.
    """
    # Resolve without the defaults already on sys.path by starting empty.
    ws = cls([])
    for dist in ws.resolve(parse_requirements(req_spec), Environment()):
        ws.add(dist)

    # Append whatever sys.path has that the working set is still missing.
    for entry in sys.path:
        if entry in ws.entries:
            continue
        ws.add_entry(entry)

    # Copy the combined list back, keeping the same list object.
    sys.path[:] = ws.entries
    return ws
def __init__(self, search_path=None, platform=get_supported_platform(), python=PY_MAJOR):
    """Snapshot distributions available on a search path

    Any distributions found on `search_path` are added to the environment.
    `search_path` should be a sequence of ``sys.path`` items.  If not
    supplied, ``sys.path`` is used.

    `platform` is an optional string specifying the name of the platform
    that platform-specific distributions must be compatible with.  If
    unspecified, it defaults to the current platform.  `python` is an
    optional string naming the desired version of Python (e.g. ``'3.3'``);
    it defaults to the current version.

    You may explicitly set `platform` (and/or `python`) to ``None`` if you
    wish to map *all* distributions, not just those compatible with the
    running platform or Python version.

    Note that both defaults are evaluated once, when this function is
    defined, not on every call.
    """
    self._distmap = {}
    self.platform = platform
    self.python = python
    # `platform` and `python` are stored before scanning starts.
    self.scan(search_path)
def get_cache_path(self, archive_name, names=()):
    """Return absolute location in cache for `archive_name` and `names`

    The parent directory of the resulting path will be created if it does
    not already exist.  `archive_name` should be the base filename of the
    enclosing egg (which may not be the name of the enclosing zipfile!),
    including its ".egg" extension.  `names`, if provided, should be a
    sequence of path name parts "under" the egg's extraction location.

    This method should only be called by resource providers that need to
    obtain an extraction location, and only for names they intend to
    extract, as it tracks the generated names for possible cleanup later.

    Any ``Exception`` raised while creating the parent directory is handed
    to ``self.extraction_error()``.
    """
    extract_path = self.extraction_path or get_default_cache()
    target_path = os.path.join(extract_path, archive_name + '-tmp', *names)
    try:
        _bypass_ensure_directory(target_path)
    except Exception:
        # Not a bare ``except``: KeyboardInterrupt and SystemExit must
        # propagate instead of being reported as extraction errors.
        self.extraction_error()

    self._warn_unsafe_extraction_path(extract_path)

    # Remember the generated name so it can be cleaned up later.
    self.cached_files[target_path] = 1
    return target_path
def build(cls, path):
    """
    Build a dictionary similar to the zipimport directory
    caches, except instead of tuples, store ZipInfo objects.

    Use a platform-specific path separator (os.sep) for the path keys
    for compatibility with pypy on Windows.
    """
    with ContextualZipFile(path) as zfile:
        return {
            name.replace('/', os.sep): zfile.getinfo(name)
            for name in zfile.namelist()
        }
def _index(self):
    """Return a mapping of directory path to the names directly inside it.

    The mapping is built from the keys of ``self.zipinfo`` on first use
    and cached on the instance as ``_dirindex``.
    """
    try:
        return self._dirindex
    except AttributeError:
        pass

    listing = {}
    for entry in self.zipinfo:
        segments = entry.split(os.sep)
        # Walk from the leaf towards the root, stopping at the first
        # directory that is already known.
        while segments:
            leaf = segments.pop()
            directory = os.sep.join(segments)
            if directory in listing:
                listing[directory].append(leaf)
                break
            listing[directory] = [leaf]
    self._dirindex = listing
    return listing
def find_eggs_in_zip(importer, path_item, only=False):
    """
    Find eggs in zip files; possibly multiple nested eggs.

    Yields a distribution for `path_item` when it carries ``PKG-INFO``
    metadata and, unless `only` is true, recurses into every top-level
    entry recognised by ``_is_unpacked_egg``.
    """
    if importer.archive.endswith('.whl'):
        # wheels are not supported with this finder
        # they don't have PKG-INFO metadata, and won't ever contain eggs
        return
    metadata = EggMetadata(importer)
    if metadata.has_metadata('PKG-INFO'):
        yield Distribution.from_filename(path_item, metadata=metadata)
    if only:
        # don't yield nested distros
        return
    for subitem in metadata.resource_listdir('/'):
        if not _is_unpacked_egg(subitem):
            continue
        subpath = os.path.join(path_item, subitem)
        nested_importer = zipimport.zipimporter(subpath)
        for dist in find_eggs_in_zip(nested_importer, subpath):
            yield dist
def _by_version_descending(names):
    """
    Given a list of filenames, return them in descending order
    by version number.

    >>> names = 'bar', 'foo', 'Python-2.7.10.egg', 'Python-2.7.2.egg'
    >>> _by_version_descending(names)
    ['Python-2.7.10.egg', 'Python-2.7.2.egg', 'foo', 'bar']
    >>> names = 'Setuptools-1.2.3b1.egg', 'Setuptools-1.2.3.egg'
    >>> _by_version_descending(names)
    ['Setuptools-1.2.3.egg', 'Setuptools-1.2.3b1.egg']
    >>> names = 'Setuptools-1.2.3b1.egg', 'Setuptools-1.2.3.post1.egg'
    >>> _by_version_descending(names)
    ['Setuptools-1.2.3.post1.egg', 'Setuptools-1.2.3b1.egg']
    """
    def _sort_key(filename):
        """
        Parse every dash-separated piece of the stem, then the extension.
        """
        stem, extension = os.path.splitext(filename)
        pieces = stem.split('-') + [extension]
        return [packaging.version.parse(piece) for piece in pieces]

    return sorted(names, key=_sort_key, reverse=True)
def register_namespace_handler(importer_type, namespace_handler):
    """Register `namespace_handler` to declare namespace packages

    `importer_type` is the type or class of a PEP 302 "Importer" (sys.path item
    handler), and `namespace_handler` is a callable like this::

        def namespace_handler(importer, path_entry, moduleName, module):
            # return a path_entry to use for child packages

    Namespace handlers are only called if the importer object has already
    agreed that it can handle the relevant path item, and they should only
    return a subpath if the module __path__ does not already contain an
    equivalent subpath.  For an example namespace handler, see
    ``pkg_resources.file_ns_handler``.

    Registering a second handler for the same `importer_type` replaces the
    earlier one, because the registry is keyed by `importer_type`.
    """
    _namespace_handlers[importer_type] = namespace_handler
def _handle_ns(packageName, path_item):
    """Ensure that named package includes a subpath of path_item (if needed)

    Returns the subpath produced by the registered namespace handler, or
    ``None`` when no importer or loader is available for the package on
    `path_item`.  Raises ``TypeError`` when `packageName` is already
    imported but has no ``__path__``.
    """
    importer = get_importer(path_item)
    if importer is None:
        return None
    loader = importer.find_module(packageName)
    if loader is None:
        return None
    module = sys.modules.get(packageName)
    if module is None:
        # Not imported yet: register an empty package module so a
        # ``__path__`` exists to extend below.
        module = sys.modules[packageName] = types.ModuleType(packageName)
        module.__path__ = []
        _set_parent_ns(packageName)
    elif not hasattr(module, '__path__'):
        raise TypeError("Not a package:", packageName)
    # Pick the handler registered for this importer and ask it for the
    # subpath to add.
    handler = _find_adapter(_namespace_handlers, importer)
    subpath = handler(importer, path_item, packageName, module)
    if subpath is not None:
        # Keep a reference to the path list taken *before* the module is
        # loaded; it is handed to _rebuild_mod_path afterwards.
        path = module.__path__
        path.append(subpath)
        loader.load_module(packageName)
        _rebuild_mod_path(path, packageName, module)
    return subpath
def check_version_conflict(self):
    """Warn about top-level modules that were already imported elsewhere.

    For each module listed in ``top_level.txt`` that is already in
    ``sys.modules``, is not a namespace package and is not one of
    ``pkg_resources``/``setuptools``/``site``, a warning is issued unless
    the module's ``__file__`` lies under this distribution's location.
    """
    if self.key == 'setuptools':
        # ignore the inevitable setuptools self-conflicts  :(
        return

    nsp = dict.fromkeys(self._get_metadata('namespace_packages.txt'))
    loc = normalize_path(self.location)
    always_ignored = ('pkg_resources', 'setuptools', 'site')
    for modname in self._get_metadata('top_level.txt'):
        skip = (
            modname not in sys.modules
            or modname in nsp
            or modname in _namespace_packages
            or modname in always_ignored
        )
        if skip:
            continue
        fn = getattr(sys.modules[modname], '__file__', None)
        imported_from_here = fn and (
            normalize_path(fn).startswith(loc)
            or fn.startswith(self.location)
        )
        if imported_from_here:
            continue
        issue_warning(
            "Module %s was already imported from %s, but %s is being added"
            " to sys.path" % (modname, fn, self.location),
        )
def __init__(self, name=None, delete=None):
    """Record the build directory and whether it should be deleted.

    :param name: explicit directory to use; when ``None`` a fresh
        temporary directory with the ``pip-build-`` prefix is created.
    :param delete: whether the directory is to be deleted later.  When
        both `name` and `delete` are ``None`` it defaults to ``True``;
        with an explicit `name` it is stored exactly as given.
    """
    if name is None:
        # We realpath here because some systems have their default tmpdir
        # symlinked to another directory.  This tends to confuse build
        # scripts, so we canonicalize the path by traversing potential
        # symlinks here.
        name = os.path.realpath(tempfile.mkdtemp(prefix="pip-build-"))
        # If we were not given an explicit directory, and we were not given
        # an explicit delete option, then we'll default to deleting.
        if delete is None:
            delete = True

    self.name = name
    self.delete = delete
def check_path_owner(path):
    """Report whether the current user may be treated as owning `path`.

    Walks from `path` up through its parents until an existing entry is
    found.  For root (effective uid 0) the entry must itself be owned by
    uid 0; for any other user it must be writable.  Returns ``False`` when
    no existing ancestor is found.
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    if not hasattr(os, "geteuid"):
        return True

    previous = None
    while path != previous:
        if os.path.lexists(path):
            # Check if path is writable by current user.
            if os.geteuid() == 0:
                # Special handling for root user in order to handle properly
                # cases where users use sudo without -H flag.
                try:
                    path_uid = get_path_uid(path)
                except OSError:
                    return False
                return path_uid == 0
            return os.access(path, os.W_OK)
        previous, path = path, os.path.dirname(path)
    # Ran out of parents without finding anything that exists: be explicit
    # instead of implicitly returning None.
    return False
def fix_script(path):
    """Replace #!python with #!/path/to/python

    Return True if file was changed, False otherwise (including when
    `path` is not a regular file)."""
    # XXX RECORD hashes will need to be updated
    if not os.path.isfile(path):
        return False

    with open(path, 'rb') as script:
        firstline = script.readline()
        if not firstline.startswith(b'#!python'):
            return False
        exename = sys.executable.encode(sys.getfilesystemencoding())
        firstline = b'#!' + exename + os.linesep.encode("ascii")
        rest = script.read()
    # Rewrite the file with the new shebang followed by the untouched rest.
    with open(path, 'wb') as script:
        script.write(firstline)
        script.write(rest)
    return True
def uninstallation_paths(dist):
    """
    Yield all the uninstallation paths for dist based on RECORD-without-.pyc

    Yield paths to all the files in RECORD. For each .py file in RECORD, add
    the .pyc in the same directory.

    UninstallPathSet.add() takes care of the __pycache__ .pyc.
    """
    from pip.utils import FakeFile  # circular import
    record_rows = csv.reader(FakeFile(dist.get_metadata_lines('RECORD')))
    for row in record_rows:
        recorded = os.path.join(dist.location, row[0])
        yield recorded
        if not recorded.endswith('.py'):
            continue
        # Same directory, same stem, '.pyc' in place of '.py'.
        directory, filename = os.path.split(recorded)
        yield os.path.join(directory, filename[:-3] + '.pyc')
def _build_one(self, req, output_dir, python_tag=None):
    """Build one wheel.

    The wheel is built in a temporary directory and then moved into
    `output_dir`.  The temporary directory is always removed.

    :return: The filename of the built wheel, or None if the build failed.
    """
    # mkdtemp's first positional parameter is the *suffix*; pass the marker
    # by keyword so it really is a prefix.
    tempd = tempfile.mkdtemp(prefix='pip-wheel-')
    try:
        if self.__build_one(req, tempd, python_tag=python_tag):
            try:
                wheel_name = os.listdir(tempd)[0]
                wheel_path = os.path.join(output_dir, wheel_name)
                shutil.move(os.path.join(tempd, wheel_name), wheel_path)
                logger.info('Stored in directory: %s', output_dir)
                return wheel_path
            except Exception:
                # Best effort: fall through to the failure path below, but
                # let KeyboardInterrupt/SystemExit propagate.
                pass
        # Ignore return, we can't do anything else useful.
        self._clean_one(req)
        return None
    finally:
        rmtree(tempd)
def load(self):
    """Load the first existing config file found on the 'wheel' config paths.

    The parsed JSON is stored in ``self.data``; missing ``signers`` and
    ``verifiers`` entries default to empty lists and a missing ``schema``
    to ``self.SCHEMA``.  Raises ``ValueError`` on a schema mismatch.
    Returns ``self``.
    """
    # XXX JSON is not a great database
    for path in load_config_paths('wheel'):
        conf = os.path.join(native(path), self.CONFIG_NAME)
        if not os.path.exists(conf):
            continue
        with open(conf, 'r') as infile:
            self.data = json.load(infile)
            for section in ('signers', 'verifiers'):
                if section not in self.data:
                    self.data[section] = []
            if 'schema' not in self.data:
                self.data['schema'] = self.SCHEMA
            elif self.data['schema'] != self.SCHEMA:
                raise ValueError(
                    "Bad wheel.json version {0}, expected {1}".format(
                        self.data['schema'], self.SCHEMA))
        # Only the first config file that exists is used.
        break
    return self
def run_script(self, script_name, namespace):
    """Execute the metadata script `script_name` inside `namespace`.

    ``namespace['__file__']`` is set to the script's file name.  When that
    file exists on disk it is read and executed; otherwise the script text
    taken from the metadata (with line endings normalised to ``\\n``) is
    registered in ``linecache`` and executed.

    Raises ``ResolutionError`` when the distribution has no such script.
    """
    script = 'scripts/' + script_name
    if not self.has_metadata(script):
        raise ResolutionError("No script named %r" % script_name)
    script_text = self.get_metadata(script).replace('\r\n', '\n')
    script_text = script_text.replace('\r', '\n')
    script_filename = self._fn(self.egg_info, script)
    namespace['__file__'] = script_filename
    if os.path.exists(script_filename):
        # Read via a context manager so the handle is closed promptly.
        with open(script_filename) as script_file:
            source = script_file.read()
        code = compile(source, script_filename, 'exec')
        exec(code, namespace, namespace)
    else:
        from linecache import cache
        # Seed linecache so tracebacks can show source lines for a script
        # that has no file on disk.
        cache[script_filename] = (
            len(script_text), 0, script_text.split('\n'), script_filename
        )
        script_code = compile(script_text, script_filename, 'exec')
        exec(script_code, namespace, namespace)
def find_eggs_in_zip(importer, path_item, only=False):
    """
    Find eggs in zip files; possibly multiple nested eggs.

    Besides nested eggs, top-level ``.dist-info`` entries are yielded as
    distributions located at `path_item`.
    """
    if importer.archive.endswith('.whl'):
        # wheels are not supported with this finder
        # they don't have PKG-INFO metadata, and won't ever contain eggs
        return
    metadata = EggMetadata(importer)
    if metadata.has_metadata('PKG-INFO'):
        yield Distribution.from_filename(path_item, metadata=metadata)
    if only:
        # don't yield nested distros
        return
    for subitem in metadata.resource_listdir('/'):
        subpath = os.path.join(path_item, subitem)
        if _is_egg_path(subitem):
            nested_importer = zipimport.zipimporter(subpath)
            for dist in find_eggs_in_zip(nested_importer, subpath):
                yield dist
        elif subitem.lower().endswith('.dist-info'):
            submeta = EggMetadata(zipimport.zipimporter(subpath))
            submeta.egg_info = subpath
            yield Distribution.from_location(path_item, subitem, submeta)
def maybe_move(self, spec, dist_filename, setup_base):
    """Move the unpacked source into ``<build_directory>/<spec.key>``.

    Returns the new location, or `setup_base` unchanged (after logging a
    warning) when the destination already exists.
    """
    dst = os.path.join(self.build_directory, spec.key)
    if os.path.exists(dst):
        msg = (
            "%r already exists in %s; build directory %s will not be kept"
        )
        log.warn(msg, spec.key, self.build_directory, setup_base)
        return setup_base

    source = setup_base
    if os.path.isdir(dist_filename):
        source = dist_filename
    else:
        if os.path.dirname(dist_filename) == setup_base:
            os.unlink(dist_filename)  # get it out of the tmp dir
        contents = os.listdir(setup_base)
        if len(contents) == 1:
            only_entry = os.path.join(setup_base, contents[0])
            if os.path.isdir(only_entry):
                # if the only thing there is a directory, move it instead
                source = only_entry
    ensure_directory(dst)
    shutil.move(source, dst)
    return dst
def build_and_install(self, setup_script, setup_base):
    """Run ``bdist_egg`` for `setup_script` and install every egg it built.

    The eggs are built into a temporary directory next to the setup
    script, which is removed afterwards.  Returns the list of values
    returned by ``install_egg``.
    """
    script_dir = os.path.dirname(setup_script)
    dist_dir = tempfile.mkdtemp(prefix='egg-dist-tmp-', dir=script_dir)
    try:
        self._set_fetcher_options(script_dir)
        args = ['bdist_egg', '--dist-dir', dist_dir]

        self.run_setup(setup_script, setup_base, args)
        all_eggs = Environment([dist_dir])
        eggs = [
            self.install_egg(dist.location, setup_base)
            for key in all_eggs
            for dist in all_eggs[key]
        ]
        if not eggs and not self.dry_run:
            log.warn("No eggs found in %s (setup script problem?)",
                     dist_dir)
        return eggs
    finally:
        rmtree(dist_dir)
        log.set_verbosity(self.verbose)  # restore our log verbosity
def _set_fetcher_options(self, base):
    """
    When easy_install is about to run bdist_egg on a source dist, that
    source dist might have 'setup_requires' directives, requiring
    additional fetching. Ensure the fetcher options given to easy_install
    are available to that command as well.
    """
    # find the fetch options from easy_install and write them out
    # to the setup.cfg file.
    ei_opts = self.distribution.get_option_dict('easy_install').copy()
    fetch_directives = (
        'find_links', 'site_dirs', 'index_url', 'optimize', 'allow_hosts',
    )
    # Option names use dashes in setup.cfg; the value is the second item
    # of each option entry.
    fetch_options = {
        key.replace('_', '-'): val[1]
        for key, val in ei_opts.items()
        if key in fetch_directives
    }
    # create a settings dictionary suitable for `edit_config`
    settings = dict(easy_install=fetch_options)
    cfg_filename = os.path.join(base, 'setup.cfg')
    setopt.edit_config(cfg_filename, settings)
def _expand(self, *attrs):
    """Substitute config variables into the named attributes.

    With ``self.prefix`` set, ``base`` is overridden in a copy of the
    install command's config vars and any scheme attribute still ``None``
    is filled from the install scheme for ``os.name``.  Each attribute in
    `attrs` that is not ``None`` is then expanded in place (plus ``~``
    expansion on posix).
    """
    config_vars = self.get_finalized_command('install').config_vars

    if self.prefix:
        # Set default install_dir/scripts from --prefix
        config_vars = config_vars.copy()
        config_vars['base'] = self.prefix
        scheme = self.INSTALL_SCHEMES.get(os.name, self.DEFAULT_SCHEME)
        for name, default in scheme.items():
            if getattr(self, name, None) is None:
                setattr(self, name, default)

    from distutils.util import subst_vars

    on_posix = os.name == 'posix'
    for name in attrs:
        current = getattr(self, name)
        if current is None:
            continue
        expanded = subst_vars(current, config_vars)
        if on_posix:
            expanded = os.path.expanduser(expanded)
        setattr(self, name, expanded)
def sanitize(self):
    """Fill in missing settings and normalise the configured paths.

    Defaults the working and output directories, falls back to the first
    allowed output type when ``self.type`` is not recognised, derives the
    output file name for the chosen type, and converts both directories
    to real paths.

    NOTE(review): this block was recovered from whitespace-collapsed text.
    Whether the type dispatch below sits inside ``if not self.executable``
    could not be determined, and the trailing comments suggest the body
    continues past what is shown - confirm against the original file.
    """
    import os.path
    if not self.working_dir:
        self.working_dir = os.path.curdir
    if not self.output_dir:
        # The trailing '' makes the joined path end with a separator.
        self.output_dir = os.path.join(self.working_dir, 'build', '')
    allowed_types = ['executable', 'library', 'shared']
    if self.type not in allowed_types:
        # TODO: exceptions?
        Style.error('Invalid output type:', self.type)
        Style.error('Allowed types:', allowed_types)
        self.type = allowed_types[0]
        Style.error('Reverting to', self.type)
    if not self.executable:
        self.executable = os.path.join(self.output_dir, self.name)
    # Let the current platform turn the base name into the file name for
    # the selected output type.
    if self.type == 'executable':
        self.executable = platform.current.as_executable(self.executable)
    elif self.type == 'library':
        self.executable = platform.current.as_static_library(self.executable)
    elif self.type == 'shared':
        self.executable = platform.current.as_shared_library(self.executable)
    self.working_dir = os.path.realpath(self.working_dir)
    self.output_dir = os.path.realpath(self.output_dir)
    self.sources.working_directory = Path(self.working_dir)
    self._init_hooks()
    # Stage hooks
    # Create an empty list for each stage
def storage(path):
    """Return ``send_from_directory('storage', path)`` for `path`.

    NOTE(review): presumably a web view serving files from the
    ``storage`` directory; the route registration is not visible here -
    confirm.
    """
    return send_from_directory('storage', path)
def tail():
    """Render ``tail.html`` with the last ``n`` characters of a file.

    On a POST request the form fields ``file`` (a path) and ``n`` (an
    integer) are read.  When the path is an existing file its last ``n``
    characters are passed to the template as ``taildata``; a non-positive
    ``n`` yields an empty string.  Otherwise ``taildata`` is the message
    ``"No such file."``.  Requests with any other method fall through and
    return ``None``, as before.

    Raises ``ValueError`` when ``n`` is not an integer.
    """
    if request.method == 'POST':
        fi = request.form['file']
        # NOTE(review): `fi` comes straight from the request and is opened
        # without any restriction, so any file readable by the server
        # process can be disclosed. Restrict it to an allowed directory.
        if os.path.isfile(fi):
            n = int(request.form['n'])
            # Context manager: the handle is closed even if reading or
            # decoding fails.
            with io.open(fi, 'r', encoding='utf-8') as le:
                content = le.read()
            # content[-0:] would be the whole file, so handle n <= 0
            # explicitly.
            taildata = content[-n:] if n > 0 else ''
        else:
            taildata = "No such file."
        return render_template('tail.html', taildata=taildata)
def _create_home(self):
    """Create the configuration directory and its two files if missing.

    The directory ``<_HOME>/<_CONFIG_DIR>`` is created when absent.  The
    config and credentials files inside it are opened with
    ``O_WRONLY | O_CREAT`` and mode ``0o600`` and closed again, which
    creates them when they do not exist.
    """
    config_dir = self._HOME + '/' + self._CONFIG_DIR
    if not os.path.isdir(config_dir):
        os.makedirs(config_dir)
    for file_name in (self._CONFIG_FILE_NAME, self._CREDENTIALS_FILE_NAME):
        descriptor = os.open(
            config_dir + '/' + file_name, os.O_WRONLY | os.O_CREAT, 0o600)
        with os.fdopen(descriptor, 'w'):
            pass
def grab_dump_and_convert(dev, target_name, dest_path, pkg):
    """Grab a heap dump from a device, pull it locally and convert it.

    :param dev: device argument handed to the ``Res`` adb command builders
    :param target_name: file name for the pulled dump; the hprof suffix is
        appended when missing
    :param dest_path: local directory the dump is pulled into
    :param pkg: package argument handed to the grab command builder

    When the target file already exists, the current time is inserted in
    front of the suffix so the existing dump is not overwritten.
    """
    Fun.p_open(Res.adb_grab_heap_dump_file_with_pkg(dev, absolute_tmp_dump_path, pkg))
    # NOTE(review): fixed pause after issuing the grab command, presumably
    # to let the dump finish on the device - confirm.
    Fun.sleep(4)
    suffix = Res.hprof_suffix
    dump_file = os.path.join(dest_path, target_name)
    if not dump_file.endswith(suffix):
        dump_file += suffix
    if os.path.exists(dump_file):
        # Rewrite only the trailing suffix; a plain str.replace would also
        # alter any directory component that happens to contain it.
        stem = dump_file[:len(dump_file) - len(suffix)]
        dump_file = stem + Fun.current_time() + suffix
    Fun.p_open(Res.adb_pull_heap_file_to_dest(dev, absolute_tmp_dump_path, dump_file))
    convert_file_into_directory(dump_file)
def convert_and_store(file_or_path):
    """
    Convert one heap dump file, or every heap dump file in a directory.

    :param file_or_path: either the full path of a single file, or a
        directory; in a directory every entry whose name ends with the
        hprof suffix is converted.  Each file is handed to
        ``convert_file_into_directory``.
    """
    if os.path.isdir(file_or_path):
        for filename in os.listdir(file_or_path):
            if not filename.endswith(Res.hprof_suffix):
                continue
            convert_file_into_directory(os.path.join(file_or_path, filename))
    else:
        convert_file_into_directory(file_or_path)
def convert_file_into_directory(file_with_full_path):
    """Split a full file path into directory and file name, then convert.

    The directory part and the final path component are passed to
    ``convert_and_store_in_directory``.
    """
    containing_dir = dirname(file_with_full_path)
    # basename() is the second element of os.path.split().
    file_name = os.path.basename(file_with_full_path)
    convert_and_store_in_directory(containing_dir, file_name)
def convert_and_store_in_directory(dest_path, target_name):
    """Convert a heap dump and store the result in a directory named after it.

    :param dest_path: directory containing the dump file
    :param target_name: dump file name; every occurrence of the hprof
        suffix is stripped from it to form the output directory name
    """
    suffix = Res.hprof_suffix
    bare_name = target_name.replace(suffix, '')
    output_dir = os.path.join(dest_path, bare_name)
    if output_dir.endswith(suffix):
        dump_file = output_dir
    else:
        dump_file = output_dir + suffix
    Fun.make_dir_if_not_exist(output_dir)
    convert_file = os.path.join(output_dir, bare_name + Res.convert_suffix)
    Fun.p_open(Res.adb_convert_heap_profile(dump_file, convert_file))
def execute(heap_dump_name): destination_path = '/Users/elex/Documents/dump' # change directory path to your favorites grab_dump_and_convert(destination_path, heap_dump_name)
def __init__(self, filepath, defaults=None):
    """Create new :class:`Settings` object.

    Existing settings are loaded when `filepath` exists.  Otherwise, when
    `defaults` is non-empty, its items are copied in and saved.
    """
    super(Settings, self).__init__()
    self._filepath = filepath
    self._nosave = False
    self._original = {}
    if os.path.exists(self._filepath):
        self._load()
        return
    if not defaults:
        return
    for key, val in defaults.items():
        self[key] = val
    self.save()  # save default settings