我们从Python开源项目中提取了以下50个代码示例，用于说明如何使用os.name（注意：os.name是模块属性，而不是可调用的函数）。
def zip_dir(directory):
    """Pack every file found under ``directory`` into an in-memory zip.

    The tree is walked with ``os.walk``; each file is written with an
    archive name built from the part of its folder path that follows
    ``directory`` plus the file name.  Returns the ``io.BytesIO`` buffer
    holding the archive (the buffer is not rewound before returning).
    """
    buffer = io.BytesIO()
    prefix_length = len(directory)
    with ZipFile(buffer, "w") as archive:
        for current, _subdirs, filenames in os.walk(directory):
            # Everything after the ``directory`` prefix is the in-archive folder.
            relative_folder = current[prefix_length:]
            for filename in filenames:
                source = os.path.join(current, filename)
                archive.write(source, os.path.join(relative_folder, filename))
    return buffer

#
# Simple progress bar
#
def _warn_unsafe_extraction_path(path):
    """Warn when ``path`` is writable by the group or by other users.

    Overriding the default extraction path with an insecure location
    (such as /tmp) lets an attacker replace an extracted file with an
    unauthorized payload.  A ``UserWarning`` is issued when the group- or
    other-write permission bit is set on ``path``.

    See Distribute #375 for more details.
    """
    on_windows = os.name == 'nt'
    if on_windows and not path.startswith(os.environ['windir']):
        # On Windows, permissions are generally restrictive by default
        # and temp directories are not writable by other users, so
        # bypass the warning.
        return
    permissions = os.stat(path).st_mode
    if not (permissions & stat.S_IWOTH or permissions & stat.S_IWGRP):
        return
    msg = (
        "%s is writable by group/others and vulnerable to attack "
        "when "
        "used with get_resource_filename. Consider a more secure "
        "location (set with .set_extraction_path or the "
        "PYTHON_EGG_CACHE environment variable)." % path
    )
    warnings.warn(msg, UserWarning)
def enable_node(self, ip_addr, *setup_args):
    """Ask the scheduler to enable a node.

    If the computation was constructed with 'disabled_nodes=True', nodes
    are not used by the scheduler until they are enabled with this method.

    'ip_addr' must be either the IP address or host name of the node to
    be enabled (a 'pycos.Location' is also accepted; its 'addr' is used).

    'setup_args' is arguments passed to the 'node_setup' function
    specific to that node. If 'node_setup' succeeds (i.e., finishes with
    value 0), the node is used for computations.

    Nothing is sent when there is no scheduler.
    """
    if not self.scheduler:
        return
    addr = ip_addr.addr if isinstance(ip_addr, pycos.Location) else ip_addr
    request = {
        'req': 'enable_node',
        'auth': self._auth,
        'addr': addr,
        'setup_args': setup_args,
    }
    self.scheduler.send(request)
def __init__(self, name, addr):
    """Create the bookkeeping record for a node called ``name`` at ``addr``.

    Counters start at zero, server tables start empty, the status is
    ``Scheduler.NodeClosed`` and the ``avail`` event starts cleared.
    """
    # Identity of the node.
    self.name = name
    self.addr = addr
    # Details that are not known yet.
    self.platform = self.avail_info = self.task = None
    # Usage counters.
    self.cpus = self.cpus_used = 0
    self.load = 0.0
    # Server tables.
    self.servers, self.disabled_servers = {}, {}
    self.status = Scheduler.NodeClosed
    self.last_pulse = time.time()
    self.lock = pycos.Lock()
    availability = pycos.Event()
    availability.clear()
    self.avail = availability
def check_dataset(dirname, name):
    ''' Verify that the validation, test and training-solution files for
    dataset `name` exist in `dirname`.

    Prints a message and calls exit(1) at the first missing file;
    returns True when all three files are present.'''
    required = (
        ('_valid.data', 'No validation file for '),
        ('_test.data', 'No test file for '),
        # Check the training labels are there
        ('_train.solution', 'No training labels for '),
    )
    for suffix, complaint in required:
        candidate = os.path.join(dirname, name + suffix)
        if not os.path.isfile(candidate):
            print(complaint + name)
            exit(1)
    return True
def addsitedir(sitedir, known_paths=None):
    """Append 'sitedir' to sys.path if it is not already known, then
    process every '.pth' file found in 'sitedir'.

    When 'known_paths' is None a fresh path set is obtained from
    _init_pathinfo() and None is returned; otherwise the supplied
    'known_paths' is returned.  If 'sitedir' cannot be listed the
    function returns None without processing anything."""
    reset = 1 if known_paths is None else 0
    if reset:
        known_paths = _init_pathinfo()
    sitedir, sitedircase = makepath(sitedir)
    if sitedircase not in known_paths:
        sys.path.append(sitedir)  # Add path component
    try:
        entries = os.listdir(sitedir)
    except OSError:
        return
    entries.sort()
    pth_suffix = os.extsep + "pth"
    for entry in entries:
        if entry.endswith(pth_suffix):
            addpackage(sitedir, entry, known_paths)
    return None if reset else known_paths
def get_info(self):
    """Return the TarInfo's attributes as a dictionary.

    The mode is masked to its lowest twelve bits, and the name of a
    directory entry is given a trailing slash if it lacks one.
    """
    attribute_names = (
        "name", "mode", "uid", "gid", "size", "mtime", "chksum", "type",
        "linkname", "uname", "gname", "devmajor", "devminor",
    )
    info = {key: getattr(self, key) for key in attribute_names}
    info["mode"] &= 0o7777

    is_directory = info["type"] == DIRTYPE
    if is_directory and not info["name"].endswith("/"):
        info["name"] += "/"
    return info
def _apply_pax_info(self, pax_headers, encoding, errors):
    """Overwrite fields with the values carried by a pax extended or
    global header, then keep a copy of the headers on the object.
    """
    for keyword, value in pax_headers.items():
        if keyword == "GNU.sparse.name":
            self.path = value
        elif keyword in ("GNU.sparse.size", "GNU.sparse.realsize"):
            self.size = int(value)
        elif keyword in PAX_FIELDS:
            if keyword in PAX_NUMBER_FIELDS:
                converter = PAX_NUMBER_FIELDS[keyword]
                try:
                    value = converter(value)
                except ValueError:
                    # Unparseable numbers fall back to zero.
                    value = 0
            if keyword == "path":
                value = value.rstrip("/")
            setattr(self, keyword, value)

    self.pax_headers = pax_headers.copy()
def _getmember(self, name, tarinfo=None, normalize=False):
    """Search the archive members from last to first for one called
    ``name`` and return it, or None when there is no match.

    If ``tarinfo`` is given, only members that precede it are searched.
    With ``normalize`` set, both names are passed through
    ``os.path.normpath`` before being compared.
    """
    # Ensure that all members have been loaded.
    candidates = self.getmembers()

    # Limit the member search list up to tarinfo.
    if tarinfo is not None:
        candidates = candidates[:candidates.index(tarinfo)]

    if normalize:
        name = os.path.normpath(name)

    for candidate in reversed(candidates):
        candidate_name = candidate.name
        if normalize:
            candidate_name = os.path.normpath(candidate_name)
        if name == candidate_name:
            return candidate
    return None
def resolve(self, s):
    """
    Turn the dotted name ``s`` into the object it refers to.

    The first component is imported with ``self.importer``; each later
    component is looked up as an attribute, importing the longer dotted
    name first when the attribute is missing.  An ImportError is
    re-raised as ValueError with the original error as its cause.
    """
    parts = s.split('.')
    dotted = parts[0]
    try:
        target = self.importer(dotted)
        for part in parts[1:]:
            dotted += '.' + part
            try:
                target = getattr(target, part)
            except AttributeError:
                # Possibly a submodule that has not been imported yet.
                self.importer(dotted)
                target = getattr(target, part)
        return target
    except ImportError:
        exc, trace = sys.exc_info()[1:]
        failure = ValueError('Cannot resolve %r: %s' % (s, exc))
        failure.__cause__, failure.__traceback__ = exc, trace
        raise failure
def _make_script(self, entry, filenames, options=None):
    """Build the script for ``entry`` and hand it to ``_write_script``.

    :param entry: Export entry; its ``name`` is the base script name.
    :param filenames: Passed through unchanged to ``_write_script``.
    :param options: Optional dict. ``interpreter_args`` (a list of
                    strings) is appended to the shebang interpreter;
                    a truthy ``gui`` selects the ``pyw`` extension
                    instead of ``py``.

    One script name is produced per entry in ``self.variants``:
    ``''`` gives ``name``, ``'X'`` gives ``nameX`` and ``'X.Y'`` gives
    ``name-X.Y``, where X and Y are the running interpreter's major and
    minor version.
    """
    post_interp = b''
    if options:
        args = options.get('interpreter_args', [])
        if args:
            args = ' %s' % ' '.join(args)
            post_interp = args.encode('utf-8')
    shebang = self._get_shebang('utf-8', post_interp, options=options)
    script = self._get_script_text(entry).encode('utf-8')
    name = entry.name
    # Take the version numbers from sys.version_info: slicing the
    # sys.version string truncates two-digit minors ("3.10" -> "3.1").
    major, minor = sys.version_info[:2]
    scriptnames = set()
    if '' in self.variants:
        scriptnames.add(name)
    if 'X' in self.variants:
        scriptnames.add('%s%s' % (name, major))
    if 'X.Y' in self.variants:
        scriptnames.add('%s-%s.%s' % (name, major, minor))
    if options and options.get('gui', False):
        ext = 'pyw'
    else:
        ext = 'py'
    self._write_script(scriptnames, shebang, script, filenames, ext)
def write_exports(exports, stream):
    """Serialise ``exports`` to ``stream`` in INI format.

    ``exports`` maps a section name to a mapping of entries.  Each entry
    is written as ``name = prefix[:suffix][ [flag, ...]]``.  On Python 3
    the stream is wrapped in a UTF-8 writer first.
    """
    if sys.version_info[0] >= 3:
        # needs to be a text stream
        stream = codecs.getwriter('utf-8')(stream)

    parser = configparser.ConfigParser()
    for section, entries in exports.items():
        # TODO check k, v for valid values
        parser.add_section(section)
        for entry in entries.values():
            spec = entry.prefix
            if entry.suffix is not None:
                spec = '%s:%s' % (spec, entry.suffix)
            if entry.flags:
                spec = '%s [%s]' % (spec, ', '.join(entry.flags))
            parser.set(section, entry.name, spec)
    parser.write(stream)
def convert_path(pathname):
    """Translate a '/'-separated 'pathname' into the native convention.

    Filenames in the setup script are always written Unix style, so they
    have to be converted before they can be used on the filesystem.  The
    path is split on '/', every current-directory component is dropped,
    and the rest is joined with the local separator.  When os.sep is
    already '/', or 'pathname' is empty, it is returned unchanged.

    Raises ValueError on non-Unix-ish systems if 'pathname' either
    starts or ends with a slash.
    """
    if os.sep == '/' or not pathname:
        return pathname
    first, last = pathname[0], pathname[-1]
    if first == '/':
        raise ValueError("path '%s' cannot be absolute" % pathname)
    if last == '/':
        raise ValueError("path '%s' cannot end with '/'" % pathname)

    components = [part for part in pathname.split('/') if part != os.curdir]
    if not components:
        return os.curdir
    return os.path.join(*components)
def split_filename(filename, project_name=None):
    """
    Split an extension-less filename into (name, version, pyver).

    The filename is unquoted and its spaces are replaced with dashes.  A
    Python-version suffix matched by PYTHON_VERSION is removed and
    reported as ``pyver``.  When ``project_name`` is given and is a
    prefix of the filename that ends on a word boundary, the split is
    made right after it; otherwise PROJECT_NAME_AND_VERSION decides.

    Returns None when neither approach matches.
    """
    pyver = None
    filename = unquote(filename).replace(' ', '-')

    version_match = PYTHON_VERSION.search(filename)
    if version_match:
        pyver = version_match.group(1)
        filename = filename[:version_match.start()]

    if project_name and len(filename) > len(project_name) + 1:
        prefix_match = re.match(re.escape(project_name) + r'\b', filename)
        if prefix_match:
            cut = prefix_match.end()
            # Skip the single separator character after the project name.
            return filename[:cut], filename[cut + 1:], pyver

    generic_match = PROJECT_NAME_AND_VERSION.match(filename)
    if generic_match:
        return generic_match.group(1), generic_match.group(3), pyver
    return None

# Allow spaces in name because of legacy dists like "Twisted Core"
def add(self, event, subscriber, append=True):
    """
    Register ``subscriber`` for ``event``.

    :param event: The name of an event.
    :param subscriber: The subscriber to be added (and called when the
                       event is published).
    :param append: Whether to append or prepend the subscriber to an
                   existing subscriber list for the event.
    """
    registry = self._subscribers
    if event in registry:
        queue = registry[event]
        attach = queue.append if append else queue.appendleft
        attach(subscriber)
    else:
        # First subscriber for this event: start a new queue.
        registry[event] = deque([subscriber])
def __init__(self, search_path=None,
             platform=get_supported_platform(),
             python=PY_MAJOR):
    """Snapshot distributions available on a search path

    Distributions found on `search_path` are added to the environment.
    `search_path` should be a sequence of ``sys.path`` items; when it is
    not supplied, ``sys.path`` is used.

    `platform` is an optional string naming the platform that
    platform-specific distributions must be compatible with; it defaults
    to the current platform.  `python` is an optional string naming the
    desired version of Python (e.g. ``'3.3'``); it defaults to the
    current version.

    Pass ``None`` for `platform` (and/or `python`) to map *all*
    distributions, not just those compatible with the running platform
    or Python version.
    """
    self._distmap = {}
    self.platform, self.python = platform, python
    self.scan(search_path)
def get_cache_path(self, archive_name, names=()):
    """Return absolute location in cache for `archive_name` and `names`

    The parent directory of the resulting path will be created if it does
    not already exist.  `archive_name` should be the base filename of the
    enclosing egg (which may not be the name of the enclosing zipfile!),
    including its ".egg" extension.  `names`, if provided, should be a
    sequence of path name parts "under" the egg's extraction location.

    This method should only be called by resource providers that need to
    obtain an extraction location, and only for names they intend to
    extract, as it tracks the generated names for possible cleanup later.

    Any ordinary exception raised while creating the directory is handed
    to ``self.extraction_error()``.
    """
    extract_path = self.extraction_path or get_default_cache()
    target_path = os.path.join(extract_path, archive_name + '-tmp', *names)
    try:
        _bypass_ensure_directory(target_path)
    except Exception:
        # Catch Exception rather than everything, so KeyboardInterrupt
        # and SystemExit are not reported as extraction failures.
        self.extraction_error()

    self._warn_unsafe_extraction_path(extract_path)

    # Remember the generated name for possible cleanup later.
    self.cached_files[target_path] = 1
    return target_path
def postprocess(self, tempname, filename):
    """Perform any platform-specific postprocessing of `tempname`

    This is where Mac header rewrites should be done; other platforms
    don't have anything special they should do.

    Resource providers should call this method ONLY after successfully
    extracting a compressed resource.  They must NOT call it on resources
    that are already in the filesystem.

    `tempname` is the current (temporary) name of the file, and
    `filename` is the name it will be renamed to by the caller after this
    routine returns.
    """
    if os.name != 'posix':
        return
    # Make the resource executable
    current = os.stat(tempname).st_mode
    os.chmod(tempname, (current | 0o555) & 0o7777)
def _by_version_descending(names):
    """
    Sort filenames so that the highest version number comes first.

    >>> names = 'bar', 'foo', 'Python-2.7.10.egg', 'Python-2.7.2.egg'
    >>> _by_version_descending(names)
    ['Python-2.7.10.egg', 'Python-2.7.2.egg', 'foo', 'bar']
    >>> names = 'Setuptools-1.2.3b1.egg', 'Setuptools-1.2.3.egg'
    >>> _by_version_descending(names)
    ['Setuptools-1.2.3.egg', 'Setuptools-1.2.3b1.egg']
    >>> names = 'Setuptools-1.2.3b1.egg', 'Setuptools-1.2.3.post1.egg'
    >>> _by_version_descending(names)
    ['Setuptools-1.2.3.post1.egg', 'Setuptools-1.2.3b1.egg']
    """
    def _version_key(filename):
        """
        Parse every dash-separated piece of the stem, plus the extension
        """
        stem, extension = os.path.splitext(filename)
        pieces = stem.split('-') + [extension]
        return [packaging.version.parse(piece) for piece in pieces]

    return sorted(names, key=_version_key, reverse=True)
def parse_map(cls, data, dist=None):
    """Parse a map of entry point groups

    `data` is either a dict of group name to lines, or anything that
    split_sections() accepts.  Returns a dict mapping each stripped
    group name to the result of ``cls.parse_group``.  Raises ValueError
    for lines outside any group and for duplicate group names.
    """
    sections = data.items() if isinstance(data, dict) else split_sections(data)
    maps = {}
    for group, lines in sections:
        if group is None:
            if lines:
                raise ValueError("Entry points must be listed in groups")
            continue
        group = group.strip()
        if group in maps:
            raise ValueError("Duplicate group name", group)
        maps[group] = cls.parse_group(group, lines, dist)
    return maps
def _dep_map(self):
    """Return the cached mapping of extra name (None for the base
    requirements) to requirement list, building it from 'requires.txt'
    and 'depends.txt' on first use.

    A section written as ``extra:marker`` contributes no requirements
    when the marker is invalid or evaluates false.
    """
    try:
        return self.__dep_map
    except AttributeError:
        dep_map = self.__dep_map = {None: []}
        for metadata_name in ('requires.txt', 'depends.txt'):
            sections = split_sections(self._get_metadata(metadata_name))
            for extra, reqs in sections:
                if extra:
                    if ':' in extra:
                        extra, marker = extra.split(':', 1)
                        # XXX warn (when the marker is invalid)
                        if invalid_marker(marker) or not evaluate_marker(marker):
                            reqs = []
                    extra = safe_extra(extra) or None
                dep_map.setdefault(extra, []).extend(parse_requirements(reqs))
        return dep_map
def __init__(self, requirement_string):
    """DO NOT CALL THIS UNDOCUMENTED METHOD; use Requirement.parse()!"""
    try:
        super(Requirement, self).__init__(requirement_string)
    except packaging.requirements.InvalidRequirement as e:
        raise RequirementParseError(str(e))

    self.unsafe_name = self.name
    normalized = safe_name(self.name)
    self.project_name = normalized
    self.key = normalized.lower()
    self.specs = [(item.operator, item.version) for item in self.specifier]
    self.extras = tuple(safe_extra(extra) for extra in self.extras)

    if self.marker:
        marker_text = str(self.marker)
    else:
        marker_text = None
    # Tuple used for comparison; its hash is computed once, up front.
    self.hashCmp = (
        self.key,
        self.specifier,
        frozenset(self.extras),
        marker_text,
    )
    self.__hash = hash(self.hashCmp)
def default_environment():
    """Return a dict describing the running interpreter and platform.

    The keys are ``implementation_name``, ``implementation_version``,
    ``os_name``, ``platform_machine``, ``platform_release``,
    ``platform_system``, ``platform_version``, ``python_full_version``,
    ``platform_python_implementation``, ``python_version`` and
    ``sys_platform``.

    When ``sys.implementation`` is missing, the implementation name is
    ``''`` and its version is ``'0'``.
    """
    if hasattr(sys, 'implementation'):
        iver = format_full_version(sys.implementation.version)
        implementation_name = sys.implementation.name
    else:
        iver = '0'
        implementation_name = ''

    return {
        "implementation_name": implementation_name,
        "implementation_version": iver,
        "os_name": os.name,
        "platform_machine": platform.machine(),
        "platform_release": platform.release(),
        "platform_system": platform.system(),
        "platform_version": platform.version(),
        "python_full_version": platform.python_version(),
        "platform_python_implementation": platform.python_implementation(),
        # Join major and minor explicitly: slicing the first three
        # characters turns "3.10.x" into "3.1".
        "python_version": ".".join(platform.python_version_tuple()[:2]),
        "sys_platform": sys.platform,
    }
def build(cls, path):
    """
    Build a dictionary similar to the zipimport directory
    caches, except instead of tuples, store ZipInfo objects.

    Keys are the archive member names with '/' replaced by the
    platform-specific path separator (os.sep), for compatibility
    with pypy on Windows.
    """
    with ContextualZipFile(path) as archive:
        manifest = {}
        for member in archive.namelist():
            manifest[member.replace('/', os.sep)] = archive.getinfo(member)
        return manifest
def parse(cls, src, dist=None):
    """Parse a single entry point from string `src`

    Entry point syntax follows the form::

        name = some.module:some.attr [extra1, extra2]

    The entry name and module name are required, but the ``:attrs`` and
    ``[extras]`` parts are optional.  Raises ValueError when `src` does
    not match ``cls.pattern``.
    """
    matched = cls.pattern.match(src)
    if not matched:
        msg = "EntryPoint must be in 'name=module:attrs [extras]' format"
        raise ValueError(msg, src)
    fields = matched.groupdict()
    extras = cls._parse_extras(fields['extras'])
    if fields['attr']:
        attrs = fields['attr'].split('.')
    else:
        attrs = ()
    return cls(fields['name'], fields['module'], attrs, extras, dist)
def config_file(kind="local"):
    """Get the filename of the distutils, local, global, or per-user config

    `kind` must be one of "local", "global", or "user"; any other value
    raises ValueError.
    """
    if kind == 'local':
        return 'setup.cfg'
    if kind == 'global':
        distutils_dir = os.path.dirname(distutils.__file__)
        return os.path.join(distutils_dir, 'distutils.cfg')
    if kind == 'user':
        # The per-user file is a dot-file on posix only.
        dot = '.' if os.name == 'posix' else ''
        return os.path.expanduser(convert_path("~/%spydistutils.cfg" % dot))
    raise ValueError(
        "config_file() type must be 'local', 'global', or 'user'", kind
    )
def _expand(self, *attrs):
    """Substitute config variables into each attribute named in `attrs`.

    The variables come from the finalized 'install' command.  When
    ``self.prefix`` is set, a copy of them is used with 'base' pointing
    at the prefix, and unset attributes are filled in from the install
    scheme for this os.name.  On posix, '~' is expanded in the results.
    Attributes whose value is None are left alone.
    """
    config_vars = self.get_finalized_command('install').config_vars

    if self.prefix:
        # Set default install_dir/scripts from --prefix
        config_vars = config_vars.copy()
        config_vars['base'] = self.prefix
        scheme = self.INSTALL_SCHEMES.get(os.name, self.DEFAULT_SCHEME)
        for option, default in scheme.items():
            if getattr(self, option, None) is None:
                setattr(self, option, default)

    from distutils.util import subst_vars

    expand_user = os.name == 'posix'
    for attr in attrs:
        current = getattr(self, attr)
        if current is None:
            continue
        expanded = subst_vars(current, config_vars)
        if expand_user:
            expanded = os.path.expanduser(expanded)
        setattr(self, attr, expanded)
def removedirs(name):
    """removedirs(name)

    Super-rmdir; remove a leaf directory and all empty intermediate
    ones.  Works like rmdir except that, once the leaf directory has
    been removed, the directories named by the rightmost remaining path
    segments are pruned one by one until either the whole path is
    consumed or an error occurs.  Errors during this pruning phase are
    ignored -- they generally mean that a directory was not empty.

    """
    rmdir(name)
    parent, leaf = path.split(name)
    if not leaf:
        # `name` ended with a separator; split once more.
        parent, leaf = path.split(parent)
    while parent and leaf:
        try:
            rmdir(parent)
        except OSError:
            return
        parent, leaf = path.split(parent)
def renames(old, new):
    """renames(old, new)

    Super-rename; create directories as necessary and delete any left
    empty.  Works like rename, except that any intermediate directories
    needed to make the new pathname good are created first.  After the
    rename, the directories named by the rightmost path segments of the
    old name are pruned until either the whole path is consumed or a
    nonempty directory is found.

    Note: this function can fail with the new directory structure made
    if you lack permissions needed to unlink the leaf directory or
    file.

    """
    target_dir, target_leaf = path.split(new)
    if target_dir and target_leaf and not path.exists(target_dir):
        makedirs(target_dir)
    rename(old, new)

    source_dir, source_leaf = path.split(old)
    if not (source_dir and source_leaf):
        return
    try:
        removedirs(source_dir)
    except OSError:
        # Pruning the old location is best-effort.
        pass
def __setattr__(self, name, value):
    """Allow assignment only to attributes that already exist.

    Raises AttributeError for any other name.
    """
    if not hasattr(self, name):
        raise AttributeError('Invalid attribute "%s"' % name)
    self.__dict__[name] = value
def allocate(self, ip_addr, name, platform, cpus, memory, disk):
    """When a node is found, scheduler calls this method with IP address,
    name, CPUs, memory and disk available on that node. This method
    should return a number indicating number of CPUs to use. If return
    value is 0, the node is not used; if the return value is < 0, this
    allocation is ignored (next allocation in the 'node_allocations'
    list, if any, is applied).

    A positive 'self.cpus' requests exactly that many CPUs (0 is
    returned if the node has fewer); a negative value leaves that many
    CPUs unused.
    """
    if not re.match(self.ip_rex, ip_addr):
        return -1
    if self.platform and not re.search(self.platform, platform):
        return -1

    short_on_memory = self.memory and memory and self.memory > memory
    if short_on_memory or (self.disk and disk and self.disk > disk):
        return 0

    wanted = self.cpus
    if wanted > 0:
        return 0 if wanted > cpus else wanted
    if wanted == 0:
        return 0
    remaining = cpus + wanted
    return 0 if remaining < 0 else remaining
def __init__(self, request, client, name, where, cpu, code, args=None, kwargs=None):
    """Store the given fields on the instance.

    'args' and 'kwargs' are kept in serialized form (as produced by
    pycos.serialize); 'done' starts as None.
    """
    self.request, self.client = request, client
    self.name, self.where = name, where
    self.cpu, self.code = cpu, code
    serialized_args = pycos.serialize(args)
    serialized_kwargs = pycos.serialize(kwargs)
    self.args = serialized_args
    self.kwargs = serialized_kwargs
    self.done = None
def __init__(self, name, location):
    """Create the bookkeeping record for a server called ``name``.

    The status starts as ``Scheduler.ServerClosed``, the task tables
    start empty and the ``avail`` event starts cleared.  ``location`` is
    accepted but not stored by this constructor.
    """
    self.name = name
    self.task = None
    self.status = Scheduler.ServerClosed
    self.rtasks, self.askew_results = {}, {}
    self.xfer_files = []
    availability = pycos.Event()
    availability.clear()
    self.avail = availability
    self.scheduler = Scheduler._instance
def __init__(self, **kwargs):
    """Set up the scheduler state and start its system tasks.

    The keyword arguments 'pulse_interval', 'ping_interval',
    'zombie_period', 'dispycosnode_port', 'clean' and 'nodes' are
    consumed here; 'name' is forced to 'dispycos_scheduler'; everything
    that remains is passed on to pycos.Pycos.instance().
    """
    # Publish this object on its class as '_instance'.
    self.__class__._instance = self
    self._nodes = {}
    self._disabled_nodes = {}
    self._avail_nodes = set()
    self._nodes_avail = pycos.Event()
    self._nodes_avail.clear()
    self._shared = False
    self._cur_computation = None
    self.__cur_client_auth = None
    self.__cur_node_allocations = []
    # Timing options, with module-level defaults.
    self.__pulse_interval = kwargs.pop('pulse_interval', MaxPulseInterval)
    self.__ping_interval = kwargs.pop('ping_interval', 0)
    self.__zombie_period = kwargs.pop('zombie_period', 100 * MaxPulseInterval)
    self._node_port = kwargs.pop('dispycosnode_port', 51351)
    self.__server_locations = set()
    self.__job_scheduler_task = None
    kwargs['name'] = 'dispycos_scheduler'
    # Pop scheduler-only options so they are not passed to Pycos.
    clean = kwargs.pop('clean', False)
    nodes = kwargs.pop('nodes', [])
    self.pycos = pycos.Pycos.instance(**kwargs)
    self.__dest_path = os.path.join(self.pycos.dest_path, 'dispycos', 'dispycosscheduler')
    if clean:
        # NOTE(review): rmtree raises if the directory does not exist --
        # confirm that is acceptable when 'clean' is requested.
        shutil.rmtree(self.__dest_path)
    self.pycos.dest_path = self.__dest_path
    self.__computation_sched_event = pycos.Event()
    # System tasks; 'nodes' is handed to the computation scheduler task.
    self.__computation_scheduler_task = SysTask(self.__computation_scheduler_proc, nodes)
    self.__client_task = SysTask(self.__client_proc)
    self.__timer_task = SysTask(self.__timer_proc)
    # The status task is stored on both the class and the instance.
    Scheduler.__status_task = self.__status_task = SysTask(self.__status_proc)
    self.__client_task.register('dispycos_scheduler')
    self.pycos.discover_peers(port=self._node_port)
def __node_allocate(self, node):
    """Return how many CPUs of 'node' to use.

    The current node allocations are tried in order; the first one whose
    allocate() gives a non-negative count decides, capped at
    'node.cpus'.  If none decides, all of the node's CPUs are used.
    """
    for allocation in self.__cur_node_allocations:
        granted = allocation.allocate(node.addr, node.name, node.platform, node.cpus,
                                      node.avail_info.memory, node.avail_info.disk)
        if granted >= 0:
            return min(granted, node.cpus)
    return node.cpus
def __discover_node(self, msg, task=None):
    """Task body: locate the 'dispycos_node' task at 'msg.location' and
    record the node.

    Up to 10 attempts are made, sleeping 0.1 after each failed lookup.
    On success any existing record for that address is discarded; if a
    computation with a status task is current, that task is told about
    the abandoned remote tasks, servers and node.  The node is then kept
    in '_disabled_nodes' and its information is requested.
    """
    for _ in range(10):
        node_task = yield Task.locate('dispycos_node', location=msg.location,
                                      timeout=MsgTimeout)
        if not isinstance(node_task, Task):
            yield task.sleep(0.1)
            continue
        self._disabled_nodes.pop(msg.location.addr, None)
        node = self._nodes.pop(msg.location.addr, None)
        if node:
            logger.warning('Rediscovered dispycosnode at %s; discarding previous incarnation!',
                           msg.location.addr)
            self._disabled_nodes.pop(node.addr, None)
            if self._cur_computation:
                status_task = self._cur_computation.status_task
            else:
                status_task = None
            if status_task:
                # Report everything that ran on the previous incarnation
                # as abandoned.
                for server in node.servers.values():
                    for rtask, job in server.rtasks.values():
                        status = pycos.MonitorException(rtask, (Scheduler.TaskAbandoned, None))
                        status_task.send(status)
                    status_task.send(DispycosStatus(Scheduler.ServerAbandoned,
                                                    server.task.location))
                info = DispycosNodeInfo(node.name, node.addr, node.cpus, node.platform,
                                        node.avail_info)
                status_task.send(DispycosStatus(Scheduler.NodeAbandoned, info))
        node = self._disabled_nodes.get(msg.location.addr, None)
        if not node:
            node = Scheduler._Node(msg.name, msg.location.addr)
            self._disabled_nodes[msg.location.addr] = node
        node.task = node_task
        yield self.__get_node_info(node, task=task)
        # Finish with a plain return: raising StopIteration inside a
        # generator is turned into RuntimeError on Python 3.7+ (PEP 479).
        return
def inventory_data_nodir(input_dir):
    ''' Inventory data, assuming flat directory structure.

    Lists the '*_train.data' files directly inside input_dir, reduces
    each path to the text between its last file separator and its last
    underscore, runs check_dataset on that name and returns the names.'''
    training_names = ls(os.path.join(input_dir, '*_train.data'))
    for position, full_path in enumerate(training_names):
        mirrored = full_path[::-1]
        # Offsets are counted from the end of the path.
        start = -mirrored.index(filesep)
        stop = -mirrored.index('_') - 1
        dataset = full_path[start:stop]
        training_names[position] = dataset
        check_dataset(input_dir, dataset)
    return training_names
def inventory_data_dir(input_dir):
    ''' Inventory data, assuming one sub-directory per dataset.

    Lists the '*_train.data' files one level below input_dir, reduces
    each path to the text between its last file separator and its last
    underscore, runs check_dataset on the sub-directory of that name and
    returns the names.'''
    # This supports subdirectory structures obtained by concatenating bundles
    training_names = ls(input_dir + '/*/*_train.data')
    for position, full_path in enumerate(training_names):
        mirrored = full_path[::-1]
        # Offsets are counted from the end of the path.
        start = -mirrored.index(filesep)
        stop = -mirrored.index('_') - 1
        dataset = full_path[start:stop]
        training_names[position] = dataset
        check_dataset(os.path.join(input_dir, dataset), dataset)
    return training_names
def addpackage(sitedir, name, known_paths):
    """Process the .pth file 'name' found in 'sitedir'.

    Lines starting with '#' are skipped and lines starting with 'import'
    are executed.  Every other line is combined with 'sitedir'; the
    result is appended to sys.path and recorded in 'known_paths' when it
    exists and is not already known.

    When 'known_paths' is None a fresh set is obtained from
    _init_pathinfo() and None is returned; otherwise the supplied
    'known_paths' is returned.  Returns None if the file cannot be
    opened.
    """
    if known_paths is None:
        # Keep the result: the loop below needs a real container.
        known_paths = _init_pathinfo()
        reset = 1
    else:
        reset = 0
    fullname = os.path.join(sitedir, name)
    try:
        # Plain "r": the "U" flag was removed in Python 3.11 and text
        # mode already translates newlines on Python 3.
        f = open(fullname, "r")
    except IOError:
        return
    with f:
        for line in f:
            if line.startswith("#"):
                continue
            if line.startswith("import"):
                # NOTE: executes code from the .pth file; such files must
                # come from a trusted location.
                exec(line)
                continue
            line = line.rstrip()
            dir, dircase = makepath(sitedir, line)
            if dircase not in known_paths and os.path.exists(dir):
                sys.path.append(dir)
                known_paths.add(dircase)
    if reset:
        known_paths = None
    return known_paths
def setquit():
    """Install new built-ins 'quit' and 'exit'.

    Their repr is a hint on how to leave the interpreter (the key
    combination depends on os.sep); calling one closes sys.stdin and
    raises SystemExit with the given code.
    """
    eof_hints = {':': 'Cmd-Q', '\\': 'Ctrl-Z plus Return'}
    eof = eof_hints.get(os.sep, 'Ctrl-D (i.e. EOF)')

    class Quitter(object):
        def __init__(self, name):
            self.name = name

        def __repr__(self):
            return 'Use %s() or %s to exit' % (self.name, eof)

        def __call__(self, code=None):
            # Shells like IDLE catch the SystemExit, but listen when their
            # stdin wrapper is closed.
            try:
                sys.stdin.close()
            except:
                pass
            raise SystemExit(code)

    for builtin_name in ('quit', 'exit'):
        setattr(builtins, builtin_name, Quitter(builtin_name))
def __init__(self, name, data, files=(), dirs=()):
    """Store name, data, files and dirs in private attributes; the
    private 'lines' attribute starts as None."""
    self.__name, self.__data = name, data
    self.__files, self.__dirs = files, dirs
    self.__lines = None
def __init__(self, name, mode):
    """Open the file `name` with os.open() and keep the descriptor in
    `self.fd`.

    `mode` is "r" (read only) or "w" (write only, create, truncate); any
    other value raises KeyError.  os.O_BINARY is added where the
    platform defines it.
    """
    flags_by_mode = {
        "r": os.O_RDONLY,
        "w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
    }
    flags = flags_by_mode[mode] | getattr(os, "O_BINARY", 0)
    self.fd = os.open(name, flags, 0o666)
def _init_write_gz(self):
    """Prepare for writing with gzip compression: create the compressor
    and write the header, followed by the NUL-terminated file name.
    """
    zlib = self.zlib
    self.cmp = zlib.compressobj(9, zlib.DEFLATED,
                                -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL,
                                0)
    # Current time as a little-endian unsigned 32-bit value.
    timestamp = struct.pack("<L", int(time.time()))
    header = b"\037\213\010\010" + timestamp + b"\002\377"
    self.__write(header)
    if self.name.endswith(".gz"):
        self.name = self.name[:-3]
    # RFC1952 says we must use ISO-8859-1 for the FNAME field.
    encoded_name = self.name.encode("iso-8859-1", "replace")
    self.__write(encoded_name + NUL)