我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 os.readlink()。
def lookupmodule(self, filename):
    """Helper function for break/clear parsing -- may be overridden.

    lookupmodule() translates (possibly incomplete) file or module name
    into an absolute file name.

    Lookup order:
      1. ``filename`` itself when it is absolute and exists;
      2. ``sys.path[0]/filename`` when it exists and its canonical form
         equals ``self.mainpyfile``;
      3. ``filename`` (with ``.py`` appended when it has no extension)
         inside each ``sys.path`` entry, following symlinked entries.

    Returns the first match, or None when nothing is found.
    """
    if os.path.isabs(filename) and os.path.exists(filename):
        return filename
    f = os.path.join(sys.path[0], filename)
    if os.path.exists(f) and self.canonic(f) == self.mainpyfile:
        return f
    _, ext = os.path.splitext(filename)
    if ext == '':
        filename = filename + '.py'
    if os.path.isabs(filename):
        return filename
    for dirname in sys.path:
        # Follow symlinked sys.path entries by hand.  os.readlink() may
        # return a target relative to the link's own directory, so anchor
        # it there instead of at the current working directory, and
        # remember visited links so a symlink cycle cannot spin forever.
        seen = set()
        while os.path.islink(dirname) and dirname not in seen:
            seen.add(dirname)
            target = os.readlink(dirname)
            if not os.path.isabs(target):
                target = os.path.normpath(
                    os.path.join(os.path.dirname(dirname), target))
            dirname = target
        fullname = os.path.join(dirname, filename)
        if os.path.exists(fullname):
            return fullname
    return None
def _resolve_link(path): """Internal helper function. Takes a path and follows symlinks until we either arrive at something that isn't a symlink, or encounter a path we've seen before (meaning that there's a loop). """ paths_seen = set() while islink(path): if path in paths_seen: # Already seen this path, so we must have a symlink loop return None paths_seen.add(path) # Resolve where the link points to resolved = os.readlink(path) if not isabs(resolved): dir = dirname(path) path = normpath(join(dir, resolved)) else: path = normpath(resolved) return path
def execute(self):
    """Re-point the currently selected symlink at a new target.

    The new target is taken from ``self.rest(1)``.  Nothing happens when
    the link already points there.  Usage errors and OS failures are
    reported through ``self.fm.notify`` with ``bad=True``.  After a
    replacement attempt the file manager is reset and the selection is
    restored to the same file object.
    """
    new_path = self.rest(1)
    cf = self.fm.thisfile
    if not new_path:
        return self.fm.notify('Syntax: relink <newpath>', bad=True)

    if not cf.is_link:
        return self.fm.notify('%s is not a symlink!' % cf.relative_path, bad=True)

    if new_path == os.readlink(cf.path):
        return

    try:
        os.remove(cf.path)
        os.symlink(new_path, cf.path)
    except OSError as err:
        # Flag the failure like the other error notifications above.
        self.fm.notify(err, bad=True)

    self.fm.reset()
    self.fm.thisdir.pointed_obj = cf
    self.fm.thisfile = cf
def find_exec(name, error='error', extra=""):
    """Locate executable ``name`` on $PATH by running the ``which`` tool.

    name  -- program name handed to ``which``.
    error -- 'error': write a message to stderr and call ``sys.exit()``;
             'warning': write the message and return False;
             anything else: return False silently.
    extra -- optional additional line written to stderr after the message.

    Returns the stripped output of ``which`` (the raw value read from the
    subprocess pipe).  If that path is a symlink, one level of the link is
    followed and the link target is returned instead.  Returns False when
    the program is not found and the call does not exit.
    """
    proc = subprocess.Popen(["which", name], stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    path = proc.communicate()[0]
    if path:
        path = path.strip()
        if os.path.islink(path):
            # readlink() may return a target relative to the link's own
            # directory (e.g. /usr/bin/python -> python3); anchor it there
            # so the caller gets a usable path.  join() leaves absolute
            # targets untouched.
            path = os.path.join(os.path.dirname(path), os.readlink(path))
        return path

    if error in ('error', 'warning'):
        sys.stderr.write("%s: can't find %s in $PATH.\n" % (error, name))
        if extra:
            sys.stderr.write("%s\n" % (extra,))
    if error == 'error':
        sys.exit()
    return False


# Classes ##########
def copy(self, target, mode=False):
    """Copy this path to ``target``.

    If this path is a file it is copied with ``copychunked``; when
    ``target`` is a directory the copy is placed inside it under this
    path's basename.  Otherwise every entry returned by ``self.visit`` is
    reproduced below ``target``: symlinks are re-created (and not recursed
    into), files are copied, directories are ensured.  With ``mode`` true,
    ``copymode`` is applied to each copied file and directory.
    """
    if not self.check(file=1):
        # Tree copy: walk everything, but never descend through symlinks.
        for entry in self.visit(rec=lambda p: p.check(link=0)):
            dest = target.join(entry.relto(self))
            dest.dirpath().ensure(dir=1)
            if entry.check(link=1):
                dest.mksymlinkto(entry.readlink())
                continue
            if entry.check(file=1):
                copychunked(entry, dest)
            elif entry.check(dir=1):
                dest.ensure(dir=1)
            if mode:
                copymode(entry.strpath, dest.strpath)
        return

    # Single-file copy.
    if target.check(dir=1):
        target = target.join(self.basename)
    assert self!=target
    copychunked(self, target)
    if mode:
        copymode(self.strpath, target.strpath)
def _getAppDirConfig(self, appname): try: appconfig = self.app.configuration.get("SplunkDeployment.apps")[appname] appdir = appconfig.get("directory") if os.path.islink(appdir): appdir = os.readlink(appdir) if not os.path.isdir(appdir): raise AppNotFoundException("The app \"" + appname + "\" does not exist in \"" + appdir + "\".") return [appdir, appconfig] except AppNotFoundException as e: raise e except: pass appconfig = self.app.configuration.get("SplunkDeployment.apps.default") appdir = os.path.join( appconfig.get("directory"), appname ) if os.path.islink(appdir): appdir = os.readlink(appdir) if not os.path.isdir(appdir): raise AppNotFoundException("The app \"" + appname + "\" does not exist in \"" + appdir + "\".") return [appdir, appconfig]
def readlink(path):
    """Wrapper around os.readlink() that cleans up the returned target.

    Everything from the first NUL byte onwards is discarded, and a trailing
    ' (deleted)' marker is removed unless a path with that exact name
    really exists (as judged by ``path_exists_strict``).
    """
    assert isinstance(path, basestring), path
    target = os.readlink(path)
    # readlink() might return paths containing null bytes ('\x00')
    # resulting in "TypeError: must be encoded string without NULL
    # bytes, not str" errors when the string is passed to other
    # fs-related functions (os.*, open(), ...).
    # Apparently everything after '\x00' is garbage (we can have
    # ' (deleted)', 'new' and possibly others), see:
    # https://github.com/giampaolo/psutil/issues/717
    target = target.partition('\x00')[0]
    # Certain paths have ' (deleted)' appended. Usually this is
    # bogus as the file actually exists. Even if it doesn't we
    # don't care.
    marker = ' (deleted)'
    if target.endswith(marker) and not path_exists_strict(target):
        target = target[:-len(marker)]
    return target
def get_proc_inodes(self, pid):
    """Collect the socket inodes referenced by the fds of process ``pid``.

    Returns a defaultdict mapping each socket inode (as a string, taken
    from a 'socket:[<inode>]' link target) to a list of ``(pid, fd)``
    tuples.  File descriptors that vanish or are not links are skipped.
    """
    inodes = defaultdict(list)
    fd_dir = "%s/%s/fd" % (self._procfs_path, pid)
    # ENOENT/ESRCH == file which is gone in the meantime;
    # os.stat('/proc/%s' % self.pid) will be done later
    # to force NSP (if it's the case).  EINVAL == not a link.
    skippable = (errno.ENOENT, errno.ESRCH, errno.EINVAL)
    for fd in os.listdir(fd_dir):
        try:
            target = readlink("%s/%s/fd/%s" % (self._procfs_path, pid, fd))
        except OSError as err:
            if err.errno in skippable:
                continue
            raise
        if target.startswith('socket:['):
            # the process is using a socket; keep only the inode number
            inodes[target[8:-1]].append((pid, int(fd)))
    return inodes
def exe(self):
    """Return the target of this process's ``exe`` link in procfs.

    Returns "" when the link is missing but the process directory still
    exists.  Raises NoSuchProcess or ZombieProcess when the directory is
    gone (depending on ``pid_exists``), AccessDenied on EPERM/EACCES, and
    re-raises any other OSError.
    """
    proc_dir = "%s/%s" % (self._procfs_path, self.pid)
    try:
        return readlink("%s/%s/exe" % (self._procfs_path, self.pid))
    except OSError as err:
        code = err.errno
        if code in (errno.ENOENT, errno.ESRCH):
            # no such file error; might be raised also if the
            # path actually exists for system processes with
            # low pids (about 0-20)
            if os.path.lexists(proc_dir):
                return ""
            if pid_exists(self.pid):
                raise ZombieProcess(self.pid, self._name, self._ppid)
            raise NoSuchProcess(self.pid, self._name)
        if code in (errno.EPERM, errno.EACCES):
            raise AccessDenied(self.pid, self._name)
        raise
def cwd(self):
    """Return process current working directory.

    Returns None for pid 0 on OpenBSD and whenever the extension reports
    an empty value.  Raises NotImplementedError when the extension module
    lacks ``proc_open_files``.
    """
    if OPENBSD and self.pid == 0:
        # ...else it would raise EINVAL
        return None
    elif NETBSD:
        with wrap_exceptions_procfs(self):
            return os.readlink("/proc/%s/cwd" % self.pid)
    elif not hasattr(cext, 'proc_open_files'):
        # FreeBSD < 8 does not support functions based on
        # kinfo_getfile() and kinfo_getvmmap()
        reason = "supported only starting from FreeBSD 8" if FREEBSD else ""
        raise NotImplementedError(reason)
    else:
        # sometimes we get an empty string, in which case we turn
        # it into None
        return cext.proc_cwd(self.pid) or None
def terminal(self):
    """Return the path behind one of the process's standard descriptors.

    When the tty value obtained from ``cext.proc_basic_info`` differs from
    ``cext.PRNODEV``, the links ``<procfs>/<pid>/path/{0,1,2,255}`` are
    tried in order and the first readable target is returned.  Returns
    None otherwise.
    """
    procfs_path = self._procfs_path
    tty = wrap_exceptions(
        cext.proc_basic_info(self.pid, self._procfs_path)[0])
    if tty == cext.PRNODEV:
        return None

    saw_enoent = False
    for fd in (0, 1, 2, 255):
        try:
            return os.readlink(
                '%s/%d/path/%d' % (procfs_path, self.pid, fd))
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise
            saw_enoent = True
    if saw_enoent:
        # raise NSP if the process disappeared on us
        os.stat('%s/%s' % (procfs_path, self.pid))
    return None
def open_files(self):
    """Return ``popenfile`` entries for this process's open regular files.

    Every name in ``<procfs>/<pid>/fd`` is looked up as a symlink in
    ``<procfs>/<pid>/path``; link targets accepted by ``isfile_strict``
    are reported together with the integer fd.
    """
    procfs_path = self._procfs_path
    pathdir = '%s/%d/path' % (procfs_path, self.pid)
    found = []
    saw_enoent = False
    for fd in os.listdir('%s/%d/fd' % (procfs_path, self.pid)):
        link = os.path.join(pathdir, fd)
        if not os.path.islink(link):
            continue
        try:
            target = os.readlink(link)
        except OSError as err:
            # ENOENT == file which is gone in the meantime
            if err.errno != errno.ENOENT:
                raise
            saw_enoent = True
            continue
        if isfile_strict(target):
            found.append(_common.popenfile(target, int(fd)))
    if saw_enoent:
        # raise NSP if the process disappeared on us
        os.stat('%s/%s' % (procfs_path, self.pid))
    return found
def _SetCurrent(self, image): filename = '%d.iso' % (image['timestamp']) path = os.path.join(self._image_dir, filename) current_path = os.path.join(self._image_dir, 'current') try: link = os.readlink(current_path) link_path = os.path.join(self._image_dir, link) if link_path == path: return except FileNotFoundError: pass print('Changing current link to:', filename, flush=True) temp_path = tempfile.mktemp(dir=self._image_dir) os.symlink(filename, temp_path) os.rename(temp_path, current_path)
def get_function_by_ifname(ifname):
    """Look up the PCI function behind a network interface.

    Reads the ``/sys/class/net/<ifname>/device`` link.

    Returns a tuple ``(address, is_physical)``: ``address`` is the link
    target with leading/trailing '.' and '/' characters stripped, and
    ``is_physical`` is True when the device's total-VFs file holds a
    number greater than zero.  Returns ``(None, False)`` when the device
    directory does not exist.
    """
    dev_path = "/sys/class/net/%s/device" % ifname
    if not os.path.isdir(dev_path):
        return None, False
    try:
        # sriov_totalvfs contains the maximum possible VFs for this PF
        with open(os.path.join(dev_path, _SRIOV_TOTALVFS)) as fd:
            total_vfs = int(fd.read())
            pci_address = os.readlink(dev_path).strip("./")
            return (pci_address, total_vfs > 0)
    except (IOError, ValueError):
        # File missing or unparsable: report the address, not a PF.
        return os.readlink(dev_path).strip("./"), False
def get_vf_num_by_pci_address(pci_addr):
    """Get the VF number based on a VF's pci address

    A VF is associated with an VF number, which ip link command uses to
    configure it. This number can be obtained from the PCI device filesystem.

    The ``virtfn<N>`` links under the device's ``physfn`` directory are
    scanned; the first one whose target contains ``pci_addr`` supplies N.

    Returns N as a string.  Raises exception.PciDeviceNotFoundById when no
    link matches (or when scanning fails for any reason).
    """
    VIRTFN_RE = re.compile(r"virtfn(\d+)")
    virtfns_path = "/sys/bus/pci/devices/%s/physfn/virtfn*" % (pci_addr)
    vf_num = None
    try:
        for vf_path in glob.iglob(virtfns_path):
            # Literal containment: a PCI address contains '.', which would
            # match any character if the address were used as a regex.
            if pci_addr in os.readlink(vf_path):
                vf_num = VIRTFN_RE.search(vf_path).group(1)
                break
    except Exception:
        # Best effort: any failure is reported as "not found" below.
        pass
    if vf_num is None:
        raise exception.PciDeviceNotFoundById(id=pci_addr)
    return vf_num
def test_update_link_to(self):
    # Write versions 1..5 of every kind; after each write the current
    # version must be the one just written.
    for version in six.moves.range(1, 6):
        for kind in ALL_FOUR:
            self._write_out_kind(kind, version)
            self.assertEqual(version, self.test_rc.current_version(kind))
    # pylint: disable=protected-access
    self.test_rc._update_link_to("cert", 3)
    self.test_rc._update_link_to("privkey", 2)
    # Only the two re-pointed kinds moved; the others stay at 5.
    expected_versions = (
        ("cert", 3),
        ("privkey", 2),
        ("chain", 5),
        ("fullchain", 5),
    )
    for kind, expected in expected_versions:
        self.assertEqual(expected, self.test_rc.current_version(kind))
    # Currently we are allowed to update to a version that doesn't exist
    self.test_rc._update_link_to("chain", 3000)
    # However, current_version doesn't allow querying the resulting
    # version (because it's a broken link).
    chain_target = os.readlink(self.test_rc.chain)
    self.assertEqual(os.path.basename(chain_target), "chain3000.pem")
def get_link_target(link):
    """Get an absolute path to the target of link.

    A relative target is interpreted relative to the directory that
    contains ``link``.

    :param str link: Path to a symbolic link

    :returns: Absolute path to the target of link
    :rtype: str

    :raises .CertStorageError: If link cannot be read as a symlink.

    """
    try:
        raw_target = os.readlink(link)
    except OSError:
        raise errors.CertStorageError(
            "Expected {0} to be a symlink".format(link))

    if os.path.isabs(raw_target):
        resolved = raw_target
    else:
        resolved = os.path.join(os.path.dirname(link), raw_target)
    return os.path.abspath(resolved)
def _fix_symlinks(self): """Fixes symlinks in the event of an incomplete version update. If there is no problem with the current symlinks, this function has no effect. """ previous_symlinks = self._previous_symlinks() if all(os.path.exists(link[1]) for link in previous_symlinks): for kind, previous_link in previous_symlinks: current_link = getattr(self, kind) if os.path.lexists(current_link): os.unlink(current_link) os.symlink(os.readlink(previous_link), current_link) for _, link in previous_symlinks: if os.path.exists(link): os.unlink(link)
def packet_socket():
    '''
    Function to return a list of pids and process names utilizing packet sockets.

    Each line from ``_packetload()`` is split on spaces; field 8 is used
    as the socket inode and mapped to a pid via ``_get_pid_of_inode``.

    Returns a list of ``[pid, exe]`` pairs, where ``exe`` is the target of
    ``/proc/<pid>/exe`` or None when it cannot be determined.
    '''
    packetresult = []
    for line in _packetload():
        line_array = _remove_empty(line.split(' '))
        inode = line_array[8].rstrip()
        pid = _get_pid_of_inode(inode)
        try:
            exe = os.readlink('/proc/' + pid + '/exe')
        except Exception:
            # Best effort (process gone, no permission, no pid found), but
            # let KeyboardInterrupt/SystemExit propagate.
            exe = None
        packetresult.append([pid, exe])
    return packetresult
def need_to_create_symlink(directory, checksums, filetype, symlink_path):
    """Check if we need to create a symlink for an existing file.

    Returns False when ``symlink_path`` is None, or when a symlink with
    the file's name already exists in ``symlink_path`` and its target is
    exactly ``directory/<filename>``.  Returns True otherwise.
    """
    # Without a symlink path there is nothing to link.
    if symlink_path is None:
        return False

    filename, _ = get_name_and_checksum(checksums, EFormats.get_content(filetype))
    wanted_target = os.path.join(directory, filename)
    symlink_name = os.path.join(symlink_path, filename)

    already_linked = (
        os.path.islink(symlink_name)
        and wanted_target == os.readlink(symlink_name)
    )
    return not already_linked
def is_git_dir(d):
    """Tell whether ``d`` looks like a git repository directory.

    This is taken from the git setup.c:is_git_directory function.

    ``d`` qualifies when it contains ``objects`` and ``refs`` directories
    plus a ``HEAD`` that is either a file or a symlink whose target starts
    with 'refs'.

    @throws WorkTreeRepositoryUnsupported if it sees a worktree directory
            (one holding ``gitdir``, ``commondir`` and ``gitfile`` files).
            It's quite hacky to do that here, but at least clearly
            indicates that we don't support it.  There is the unlikely
            danger to throw if we see directories which just look like a
            worktree dir, but are none."""
    if not osp.isdir(d):
        return False

    has_object_store = (osp.isdir(osp.join(d, 'objects')) and
                        osp.isdir(osp.join(d, 'refs')))
    if has_object_store:
        headref = osp.join(d, 'HEAD')
        if osp.isfile(headref):
            return True
        return osp.islink(headref) and os.readlink(headref).startswith('refs')

    worktree_markers = ('gitdir', 'commondir', 'gitfile')
    if all(osp.isfile(osp.join(d, marker)) for marker in worktree_markers):
        raise WorkTreeRepositoryUnsupported(d)
    return False
def _read_fstab(self, line, **params): if not line or line.startswith('#'): return fields = line.split() dev = fields[0] config = { 'dev': fields[0], 'mount': fields[1], 'fstype': fields[2], } if dev.startswith('/dev/'): try: devlink = os.readlink(dev) dev = os.path.abspath(os.path.join(os.path.dirname(devlink), dev)) except: pass dev = dev.replace('/dev/', '') if dev == params['devname']: return config elif dev.startswith('UUID='): uuid = dev.replace('UUID=', '') partinfo = si.Server.partinfo(devname=params['devname']) if partinfo['uuid'] == uuid: return config
def load_models(self):
    """(Re)load the LDA model, topic graph and candidate list from disk.

    Loading happens when ``self.last_model_update`` is None or at least
    one day old; the files are read from
    ``<settings.PORTRAIT_FOLDER>/it-topics``.  On a reload
    ``self.lda_model``, ``self.topic_graph``, ``self.rec_candidates`` and
    ``self.last_model_update`` are replaced.
    """
    now = datetime.now()
    # NOTE(review): the candidate list is cleared on every call, including
    # calls that do not reload -- confirm this is intended.
    self.rec_candidates = []
    if self.last_model_update is None or (now - self.last_model_update).days >= 1:
        print('loading model', now)
        model_path = '{0}/it-topics'.format(settings.PORTRAIT_FOLDER)
        # The model is loaded through the link's target name rather than
        # the link itself (presumably so companion files are found next to
        # the real file -- TODO confirm).  readlink() may return a target
        # relative to the link's directory, so anchor it at model_path;
        # join() leaves absolute targets unchanged.
        link_target = os.readlink('{0}/current_lda_model.gensim'.format(model_path))
        lda_filename = os.path.join(model_path, link_target)
        self.lda_model = gensim.models.ldamulticore.LdaMulticore.load(lda_filename)
        self.topic_graph = nx.read_gpickle('{0}/current_topic_graph.nx'.format(model_path))
        with gzip.open('{0}/current_candidates.json.gz'.format(model_path), 'rt') as f:
            self.rec_candidates = json.load(f)
        print('loaded', len(self.rec_candidates), 'candidates')
        self.last_model_update = datetime.now()
def copyfile(src, dest, symlink=True):
    """Make ``src`` available at ``dest`` by symlinking or copying.

    Does nothing (beyond logging) when ``src`` does not exist or ``dest``
    already exists.  Missing parent directories of ``dest`` are created.
    A symlink is attempted when ``symlink`` is true, the platform offers
    ``os.symlink`` and ``is_win`` is false; if that is not possible or
    fails, ``copyfileordir`` is used instead.
    """
    if not os.path.exists(src):
        # Some bad symlink in the src
        logger.warn('Cannot find file %s (bad symlink)', src)
        return
    if os.path.exists(dest):
        logger.debug('File %s already exists', dest)
        return

    parent = os.path.dirname(dest)
    if not os.path.exists(parent):
        logger.info('Creating parent directories for %s' % parent)
        os.makedirs(parent)

    # A new link points at the same target as an existing link, or at the
    # absolute path of a regular source.
    srcpath = os.readlink(src) if os.path.islink(src) else os.path.abspath(src)

    can_symlink = symlink and hasattr(os, 'symlink') and not is_win
    if not can_symlink:
        logger.info('Copying to %s', dest)
        copyfileordir(src, dest)
        return

    logger.info('Symlinking %s', dest)
    try:
        os.symlink(srcpath, dest)
    except (OSError, NotImplementedError):
        logger.info('Symlinking failed, copying to %s', dest)
        copyfileordir(src, dest)
def mount_lowerdirs(self):
    """Build the colon-separated lower-directory list for this container.

    Walks ``self.dependencies`` transitively, identifying every dependency
    by ``<imageName>/<path>`` where ``path`` is the dependency's ``path``
    entry or, when absent, the target of the image's ``overlay.fs`` link.
    The paths are ordered topologically, reversed, and each is prefixed
    with ``self.metadata_dir``.
    """
    # First, build a dependency graph in order to avoid duplicate entries
    dependencies = {}

    def dependency_path(dep):
        # Only consult the overlay.fs link when no explicit path is given,
        # so a missing link cannot break a dependency that names its path.
        path = dep.get('path')
        if path is None:
            container = self.__class__(dep['imageName'])
            path = os.readlink(os.path.join(container.path, 'overlay.fs'))
        return os.path.join(dep['imageName'], path)

    pending_deps = set(map(dependency_path, self.dependencies))
    while pending_deps:
        path = pending_deps.pop()
        # The graph is keyed by path, so the visited check must use the
        # path too; this also guarantees termination on cyclic graphs.
        if path in dependencies:
            continue
        name = path.split('/')[-2]
        dependencies[path] = set(map(dependency_path, self.__class__(name).dependencies))
        pending_deps |= dependencies[path]

    # Then sort it topologically. The list is reversed, because overlayfs
    # will check the mounts in order they are given, so the base fs has to
    # be the last one.
    dependencies = reversed(list(toposort_flatten(dependencies)))
    return ':'.join(os.path.join(self.metadata_dir, dep) for dep in dependencies)
def recursive_set_attributes(module, b_path, follow, file_args):
    """Apply ``file_args`` to everything found below ``b_path``.

    Every directory and file reported by ``os.walk`` gets
    ``module.set_fs_attributes_if_different`` applied with its own path.
    For symlinks, when ``follow`` is true, the link target (joined onto
    the walk root) is processed as well, recursing into it if it is a
    directory.

    Returns the accumulated "changed" flag.
    """
    changed = False

    def apply_to(b_target, changed_so_far):
        # Run the attribute update against one path with a private copy
        # of the arguments.
        args = file_args.copy()
        args['path'] = to_native(b_target, errors='surrogate_or_strict')
        return module.set_fs_attributes_if_different(args, changed_so_far)

    for b_root, b_dirs, b_files in os.walk(b_path):
        for b_fsobj in b_dirs + b_files:
            b_fsname = os.path.join(b_root, b_fsobj)
            is_link = os.path.islink(b_fsname)
            changed |= apply_to(b_fsname, changed)
            if not (is_link and follow):
                continue
            b_fsname = os.path.join(b_root, os.readlink(b_fsname))
            if os.path.isdir(b_fsname):
                changed |= recursive_set_attributes(module, b_fsname, follow, file_args)
            changed |= apply_to(b_fsname, changed)
    return changed
def lcopy(src, dst, no_hardlinks=False):
    """Reproduce ``src`` at ``dst`` without following symlinks.

    A symlink is re-created with the same target.  Anything else is
    hard-linked, or copied with ``shutil.copy`` when ``no_hardlinks`` is
    true.  If ``dst`` already exists it is removed and the operation is
    retried with the same options.

    Returns True when a symlink was created, False otherwise.
    """
    try:
        if issymlink(src):
            linkto = os.readlink(src)
            os.symlink(linkto, dst)
            return True
        else:
            if no_hardlinks:
                shutil.copy(src, dst)
            else:
                hardlink(src, dst)
            return False
    except EnvironmentError as err:
        if err.errno == errno.EEXIST:
            os.unlink(dst)
            # Retry with the caller's choice; the flag used to be dropped.
            return lcopy(src, dst, no_hardlinks)
        raise
def get_file_contents(code: models.File) -> bytes:
    """Get the contents of the given :class:`.models.File`.

    :param code: The file object to read.
    :returns: The raw bytes of the file as stored on disk.
    :raises APIException: If the file is a directory (400) or its on-disk
        path is a symlink (410).
    """
    if code.is_directory:
        raise APIException(
            'Cannot display this file as it is a directory.',
            f'The selected file with id {code.id} is a directory.',
            APICodes.OBJECT_WRONG_TYPE, 400
        )

    disk_path = code.get_diskname()
    if not os.path.islink(disk_path):
        with open(disk_path, 'rb') as handle:
            return handle.read()

    # Symlinks are reported, never followed.
    link_target = os.readlink(disk_path)
    raise APIException(
        f'This file is a symlink to `{link_target}`.',
        'The file {} is a symlink'.format(code.id),
        APICodes.INVALID_STATE, 410
    )