我们从Python开源项目中,提取了以下39个代码示例,用于说明如何使用os.chroot()。
def change_root_directory(directory): """ Change the root directory of this process. :param directory: The target directory path. :return: ``None``. Set the current working directory, then the process root directory, to the specified `directory`. Requires appropriate OS privileges for this process. """ try: os.chdir(directory) os.chroot(directory) except Exception as exc: error = DaemonOSEnvironmentError( "Unable to change root directory ({exc})".format(exc=exc)) raise error
def orphansKill(rootToKill, killsig=signal.SIGTERM): """kill off anything that is still chrooted.""" getLog().debug("kill orphans") if USE_NSPAWN is False: for fn in [d for d in os.listdir("/proc") if d.isdigit()]: try: root = os.readlink("/proc/%s/root" % fn) if os.path.realpath(root) == os.path.realpath(rootToKill): getLog().warning("Process ID %s still running in chroot. Killing...", fn) pid = int(fn, 10) os.kill(pid, killsig) os.waitpid(pid, 0) except OSError: pass else: m_uuid = get_machinectl_uuid(rootToKill) if m_uuid: getLog().warning("Machine %s still running. Killing...", m_uuid) os.system("/usr/bin/machinectl terminate %s" % m_uuid)
def create_patched_packages(queue): """ Patches the given package using rpmrebuild and the patching root. @param root The root to be used. """ root = queue.get() logging.debug("Chrooting to {0}".format(root)) os.chroot(root) logging.debug("Chrooting to {0} done.".format(root)) os.chdir("/") if not os.path.isfile("/Makefile"): logging.info("Chroot has no jobs to perform.") queue.task_done() return make_command = ["make"] if not hidden_subprocess.visible_mode: make_command.append("--silent") subprocess.call(make_command) logging.debug("Exiting from {0}".format(root)) queue.task_done()
def __unpack_qemu_packages(self): """ Looks for all qemu packages in the given list of repositories and unpacks them to the given directory. """ initial_directory = os.getcwd() qemu_packages = [] qemu_package = self.qemu_path if qemu_package is None: expression = "^qemu.*\.{0}\.rpm$".format(self.architecture) for repository in self.repositories: qemu_packages_portion = files.find_fast(repository, expression) qemu_packages.extend(qemu_packages_portion) logging.warning("The following qemu packages will be unpacked in " "chroot:") for package in qemu_packages: logging.warning(" * {0}".format(package)) else: qemu_packages.append(qemu_package) for package in qemu_packages: files.unrpm(package, self.patching_root)
def setupEnvironment(config): if config['chroot'] is not None: os.chroot(config['chroot']) if config['rundir'] == '.': config['rundir'] = '/' os.chdir(config['rundir']) if not config['nodaemon']: daemonize() if config['pidfile']: open(config['pidfile'],'wb').write(str(os.getpid()))
def contain(command, image_name, image_dir, container_id, container_dir): linux.unshare(linux.CLONE_NEWNS) # create a new mount namespace # TODO: we added MS_REC here. wanna guess why? linux.mount(None, '/', None, linux.MS_PRIVATE | linux.MS_REC, None) new_root = create_container_root( image_name, image_dir, container_id, container_dir) print('Created a new root fs for our container: {}'.format(new_root)) # Create mounts (/proc, /sys, /dev) under new_root linux.mount('proc', os.path.join(new_root, 'proc'), 'proc', 0, '') linux.mount('sysfs', os.path.join(new_root, 'sys'), 'sysfs', 0, '') linux.mount('tmpfs', os.path.join(new_root, 'dev'), 'tmpfs', linux.MS_NOSUID | linux.MS_STRICTATIME, 'mode=755') # Add some basic devices devpts_path = os.path.join(new_root, 'dev', 'pts') if not os.path.exists(devpts_path): os.makedirs(devpts_path) linux.mount('devpts', devpts_path, 'devpts', 0, '') makedev(os.path.join(new_root, 'dev')) os.chroot(new_root) # TODO: replace with pivot_root os.chdir('/') # TODO: umount2 old root (HINT: see MNT_DETACH in man 2 umount) os.execvp(command[0], command)
def contain(command, image_name, image_dir, container_id, container_dir): new_root = create_container_root( image_name, image_dir, container_id, container_dir) print('Created a new root fs for our container: {}'.format(new_root)) # TODO: time to say goodbye to the old mount namespace, # see "man 2 unshare" to get some help # HINT 1: there is no os.unshare(), time to use the linux module we made # just for you! # HINT 2: the linux module includes both functions and constants! # e.g. linux.CLONE_NEWNS # TODO: remember shared subtrees? # (https://www.kernel.org/doc/Documentation/filesystems/sharedsubtree.txt) # Make / a private mount to avoid littering our host mount table. # Create mounts (/proc, /sys, /dev) under new_root linux.mount('proc', os.path.join(new_root, 'proc'), 'proc', 0, '') linux.mount('sysfs', os.path.join(new_root, 'sys'), 'sysfs', 0, '') linux.mount('tmpfs', os.path.join(new_root, 'dev'), 'tmpfs', linux.MS_NOSUID | linux.MS_STRICTATIME, 'mode=755') # Add some basic devices devpts_path = os.path.join(new_root, 'dev', 'pts') if not os.path.exists(devpts_path): os.makedirs(devpts_path) linux.mount('devpts', devpts_path, 'devpts', 0, '') for i, dev in enumerate(['stdin', 'stdout', 'stderr']): os.symlink('/proc/self/fd/%d' % i, os.path.join(new_root, 'dev', dev)) # TODO: add more devices (e.g. null, zero, random, urandom) using os.mknod. os.chroot(new_root) os.chdir('/') os.execvp(command[0], command)
def system_jail(): """ A simple chroot jail """ os.chroot('safe_root/') yield os.chroot('/')
def chroot(self): """ .. seealso:: :func:`os.chroot` """ os.chroot(self)
def condChroot(chrootPath): if chrootPath is not None: saved = {"ruid": os.getuid(), "euid": os.geteuid()} setresuid(0, 0, 0) os.chdir(chrootPath) os.chroot(chrootPath) setresuid(saved['ruid'], saved['euid'])
def _prepare_nspawn_command(chrootPath, user, cmd, nspawn_args=None, env=None, cwd=None): cmd_is_list = isinstance(cmd, list) if nspawn_args is None: nspawn_args = [] if user: # user can be either id or name if cmd_is_list: cmd = ['-u', str(user)] + cmd else: raise exception.Error('Internal Error: command must be list or shell=True.') elif not cmd_is_list: cmd = [cmd] nspawn_argv = ['/usr/bin/systemd-nspawn', '-q', '-M', uuid.uuid4().hex, '-D', chrootPath] distro_label = distro.linux_distribution(full_distribution_name=False)[0] if (distro_label != 'centos') and (distro_label != 'ol') and (distro_label != 'rhel') and (distro_label != 'deskos'): # EL7 does not support it (yet). See BZ 1417387 nspawn_argv += ['-a'] nspawn_argv.extend(nspawn_args) if cwd: nspawn_argv.append('--chdir={0}'.format(cwd)) if env: # BZ 1312384 workaround env['PROMPT_COMMAND'] = r'printf "\033]0;<mock-chroot>\007"' env['PS1'] = r'<mock-chroot> \s-\v\$ ' for k, v in env.items(): nspawn_argv.append('--setenv={0}={1}'.format(k, v)) cmd = nspawn_argv + cmd if cmd_is_list: return cmd else: return " ".join(cmd)
def doshell(chrootPath=None, environ=None, uid=None, gid=None, cmd=None, nspawn_args=None, unshare_ipc=True, unshare_net=False): log = getLog() log.debug("doshell: chrootPath:%s, uid:%d, gid:%d", chrootPath, uid, gid) if environ is None: environ = clean_env() if 'PROMPT_COMMAND' not in environ: environ['PROMPT_COMMAND'] = r'printf "\033]0;<mock-chroot>\007"' if 'PS1' not in environ: environ['PS1'] = r'<mock-chroot> \s-\v\$ ' if 'SHELL' not in environ: environ['SHELL'] = '/bin/sh' log.debug("doshell environment: %s", environ) if cmd: if not isinstance(cmd, list): cmd = [cmd] cmd = ['/bin/sh', '-c'] + cmd else: cmd = ["/bin/sh", "-i", "-l"] if USE_NSPAWN: # nspawn cannot set gid cmd = _prepare_nspawn_command(chrootPath, uid, cmd, nspawn_args=nspawn_args, env=environ) preexec = ChildPreExec(personality=None, chrootPath=chrootPath, cwd=None, uid=uid, gid=gid, env=environ, shell=True, unshare_ipc=unshare_ipc, unshare_net=unshare_net) log.debug("doshell: command: %s", cmd) return subprocess.call(cmd, preexec_fn=preexec, env=environ, shell=False)
def do_update_config(log, config_opts, cfg, uidManager, name, skipError=True): if os.path.exists(cfg): config_opts['config_paths'].append(cfg) update_config_from_file(config_opts, cfg, uidManager) check_macro_definition(config_opts) elif not skipError: log.error("Could not find required config file: %s", cfg) if name == "default": log.error(" Did you forget to specify the chroot to use with '-r'?") if "/" in cfg: log.error(" If you're trying to specify a path, include the .cfg extension, e.g. -r ./target.cfg") sys.exit(1)
def umount(self): try: subprocess.check_call(['umount', os.path.join(self.chroot, 'proc')]) except subprocess.CalledProcessError: pass
def mount(self): try: mount_point = os.path.join(self.chroot, 'proc') if not os.path.ismount(mount_point): subprocess.check_call(['mount', '-t', 'proc', 'none', mount_point]) except subprocess.CalledProcessError: pass
def __init__(self, path): self.chroot = path self.orig_fd = os.open("/", os.O_RDONLY) self.chroot_fd = os.open(self.chroot, os.O_RDONLY)
def __enter__(self): self.mount() os.chroot(self.chroot) os.fchdir(self.chroot_fd) return self
def __exit__(self, exc_type, exc_val, exc_tb): os.fchdir(self.orig_fd) os.chroot(".") os.close(self.orig_fd) os.close(self.chroot_fd) self.umount()
def get_inside_path(self, path): return path.replace(self.chroot, "")
def enterChroot(path): os.chdir(path) os.chroot(path)
def executeCommand(command): cwd = os.getcwd() rr = os.open("/", os.O_RDONLY) os.chroot(getRootDir.getEnvsDir() + getEnvName()) os.chdir("/") os.system(command) os.fchdir(rr) os.chroot(".") os.chdir(cwd)
def __install_rpmrebuild(self, queue): """ Chroots to the given path and installs rpmrebuild in it. @param queue The queue where the result will be put. """ os.chroot(self.patching_root) os.chdir("/") os.chdir("/rpmrebuild/src") hidden_subprocess.call("Making the rpmrebuild.", ["make"]) hidden_subprocess.call("Installing the rpmrebuild.", ["make", "install"]) if not check.command_exists("rpmrebuild"): sys.exit("Error.") queue.put(True)
def __deploy_packages(self): """ Deploys packages to chroot clones and generates makefiles for them. """ self._tasks.sort(key=lambda task: os.stat(task[1]).st_size) copy_tasks = [] for i in range(repository_combiner.jobs_number): tasks = [] i_task = i while i_task < len(self._tasks): tasks.append(self._tasks[i_task]) i_task += repository_combiner.jobs_number if len(tasks) == 0: continue directories = {} for task in tasks: package_name, package_path, target, _, _ = task copy_tasks.append((package_name, package_path, self.patching_root_clones[i])) self._targets[package_name] = target basename = os.path.basename(target) self._package_names[basename] = package_name self._generate_makefile(self.patching_root_clones[i], tasks) hidden_subprocess.function_call_list( "Copying to patcher", shutil.copy, copy_tasks)
def __use_cached_root_or_prepare(self): """ Tries to find cached root and uses it in case it exists and prepares it otherwise. """ image_info = "{0}".format((self.names, self.repositories, self.architecture, os.path.basename(self.kickstart_file_path))) cached_images_info_paths = files.find_fast( patching_cache_path, ".*preliminary_image.info.txt") matching_images_path = None for info_path in cached_images_info_paths: cached_images_path = info_path.replace(".info.txt", "") if not os.path.isdir(cached_images_path): logging.error("Directory {0} not " "found!".format(cached_images_path)) continue lines = [] with open(info_path, "r") as info_file: for line in info_file: lines.append(line) if lines[0] == image_info: matching_images_path = cached_images_path break if matching_images_path is not None: self.patching_root = matching_images_path logging.info("Found already prepared patching root: " "{0}".format(matching_images_path)) else: self.__prepare() cached_chroot_path = os.path.join( patching_cache_path, os.path.basename( self.patching_root) + "preliminary_image") hidden_subprocess.call( "Saving chroot to cache", ["cp", "-a", self.patching_root, cached_chroot_path]) info_path = cached_chroot_path + ".info.txt" with open(info_path, "wb") as info_file: info_file.write(image_info)
def copy_boot(self): cluster = Cluster(mongo_db = self._mongo_db) image_path = str(self.get('path')) kernver = str(self.get('kernver')) tmp_path = '/tmp' # in chroot env initrdfile = str(self.name) + '-initramfs-' + kernver kernfile = str(self.name) + '-vmlinuz-' + kernver path = cluster.get('path') if not path: self._logger.error("Path needs to be configured.") return None path = str(path) user = cluster.get('user') if not user: self._logger.error("User needs to be configured.") return None path_to_store = path + "/boot" user_id = pwd.getpwnam(user).pw_uid grp_id = pwd.getpwnam(user).pw_gid if not os.path.exists(path_to_store): os.makedirs(path_to_store) os.chown(path_to_store, user_id, grp_id) shutil.copy(image_path + '/boot/initramfs-' + kernver + '.img', path_to_store + '/' + initrdfile) shutil.copy(image_path + '/boot/vmlinuz-' + kernver, path_to_store + '/' + kernfile) os.chown(path_to_store + '/' + initrdfile, user_id, grp_id) os.chmod(path_to_store + '/' + initrdfile, 0644) os.chown(path_to_store + '/' + kernfile, user_id, grp_id) os.chmod(path_to_store + '/' + kernfile, 0644) self.set('kernfile', kernfile) self.set('initrdfile', initrdfile) self._logger.warning("Boot files was copied, but luna module might not being added to initrd. Please check /etc/dracut.conf.d in image") return True
def setupEnvironment(self, chroot, rundir, nodaemon, umask, pidfile): """ Set the filesystem root, the working directory, and daemonize. @type chroot: C{str} or L{None} @param chroot: If not None, a path to use as the filesystem root (using L{os.chroot}). @type rundir: C{str} @param rundir: The path to set as the working directory. @type nodaemon: C{bool} @param nodaemon: A flag which, if set, indicates that daemonization should not be done. @type umask: C{int} or L{None} @param umask: The value to which to change the process umask. @type pidfile: C{str} or L{None} @param pidfile: If not L{None}, the path to a file into which to put the PID of this process. """ daemon = not nodaemon if chroot is not None: os.chroot(chroot) if rundir == '.': rundir = '/' os.chdir(rundir) if daemon and umask is None: umask = 0o077 if umask is not None: os.umask(umask) if daemon: from twisted.internet import reactor self.config["statusPipe"] = self.daemonize(reactor) if pidfile: with open(pidfile, 'wb') as f: f.write(intToBytes(os.getpid()))
def startApplication(self, application): """ Configure global process state based on the given application and run the application. @param application: An object which can be adapted to L{service.IProcess} and L{service.IService}. """ process = service.IProcess(application) if not self.config['originalname']: launchWithName(process.processName) self.setupEnvironment( self.config['chroot'], self.config['rundir'], self.config['nodaemon'], self.config['umask'], self.config['pidfile']) service.IService(application).privilegedStartService() uid, gid = self.config['uid'], self.config['gid'] if uid is None: uid = process.uid if gid is None: gid = process.gid self.shedPrivileges(self.config['euid'], uid, gid) app.startApplication(application, not self.config['no_save'])
def setUp(self): self.root = self.unset self.cwd = self.unset self.mask = self.unset self.daemon = False self.pid = os.getpid() self.patch(os, 'chroot', lambda path: setattr(self, 'root', path)) self.patch(os, 'chdir', lambda path: setattr(self, 'cwd', path)) self.patch(os, 'umask', lambda mask: setattr(self, 'mask', mask)) self.runner = UnixApplicationRunner(twistd.ServerOptions()) self.runner.daemonize = self.daemonize
def test_chroot(self): """ L{UnixApplicationRunner.setupEnvironment} changes the root of the filesystem if passed a non-L{None} value for the C{chroot} parameter. """ self.runner.setupEnvironment("/foo/bar", ".", True, None, None) self.assertEqual(self.root, "/foo/bar")
def test_noChroot(self): """ L{UnixApplicationRunner.setupEnvironment} does not change the root of the filesystem if passed L{None} for the C{chroot} parameter. """ self.runner.setupEnvironment(None, ".", True, None, None) self.assertIs(self.root, self.unset)
def chroot(self): os.chroot(self)
def __init__(self, options): self.ports = options.ports self.password = options.password self.ssl_pem_file = options.ssl_pem_file self.motdfile = options.motd self.verbose = options.verbose self.debug = options.debug self.logdir = options.logdir self.chroot = options.chroot self.setuid = options.setuid self.statedir = options.statedir if self.ssl_pem_file: self.ssl = __import__("ssl") # Find certificate after daemonization if path is relative: if self.ssl_pem_file and os.path.exists(self.ssl_pem_file): self.ssl_pem_file = os.path.abspath(self.ssl_pem_file) # else: might exist in the chroot jail, so just continue if options.listen: self.address = socket.gethostbyname(options.listen) else: self.address = "" server_name_limit = 63 # From the RFC. self.name = socket.getfqdn(self.address)[:server_name_limit] self.channels = {} # irc_lower(Channel name) --> Channel instance. self.clients = {} # Socket --> Client instance. self.nicknames = {} # irc_lower(Nickname) --> Client instance. if self.logdir: create_directory(self.logdir) if self.statedir: create_directory(self.statedir)
def doScript(section, script, scriptnum): fname = '/run/ks-script-%s-%d' % (section, scriptnum) f = open(fname, 'w') f.write('#!%s\n\n' % script.interp) s = '%s' % script for line in s.split('\n'): if line.startswith('%pre') or line.startswith('%post') \ or line.startswith('%end'): # # skip # continue f.write('%s\n' % line) f.close() os.chmod(fname, 0700) pid = os.fork() if pid == 0: if section == 'post' and script.inChroot: shutil.copy(fname, '/run/mnt/sysimage%s' % fname) os.chroot('/run/mnt/sysimage') # # set stdout and stderr to files on the disk so we can examine # the output and errors # fout = open('%s.out' % fname, 'w') ferr = open('%s.err' % fname, 'w') subprocess.call([ fname ], stdout = fout, stderr = ferr) fout.close() ferr.close() sys.exit(0) else: os.wait()
def chroot(self, path): '''Change root directory path @param path: Path to chroot() to @type path: string ''' if not path or not j.data.types.unixdirpath.check(path): raise ValueError('Path %s is invalid' % path) j.logger.logging.info('Change root to %s' % path) os.chroot(path)
def daemonize( work_dir = None, chroot_dir = None, umask = None, uid = None, gid = None, pidfile = None, files_preserve = [], signals = {}): # Dirs, limits, users if chroot_dir is not None: os.chdir(chroot_dir) os.chroot(chroot_dir) if umask is not None: os.umask(umask) if work_dir is not None: os.chdir(work_dir) if gid is not None: os.setgid(gid) if uid is not None: os.setuid(uid) # Doublefork, split session if os.fork()>0: os._exit(0) os.setsid() if os.fork()>0: os._exit(0) # Setup signal handlers for (signum, handler) in signals.items(): signal.signal(signum, handler) # Close descriptors descr_preserve = set(f.fileno() for f in files_preserve) maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] if maxfd==resource.RLIM_INFINITY: maxfd = 65535 for fd in range(maxfd, 3, -1): # 3 means omit stdin, stdout, stderr if fd not in descr_preserve: try: os.close(fd) except Exception: pass # Redirect stdin, stdout, stderr to /dev/null devnull = os.open(os.devnull, os.O_RDWR) for fd in range(3): os.dup2(devnull, fd) # PID file if pidfile is not None: pidd = os.open(pidfile, os.O_RDWR|os.O_CREAT|os.O_EXCL|os.O_TRUNC) os.write(pidd, str(os.getpid())+"\n") os.close(pidd) # Define and setup atexit closure @atexit.register def unlink_pid(): try: os.unlink(pidfile) except Exception: pass
def prepare_minimal_packages_list(graphs): """ Prepares the minimal list of package names that are needed to be installed in the chroot so that rpmrebuild can be used inside it. @param graphs The list of dependency graphs of repositories. @return The list of packages. """ symbols = ["useradd", "mkdir", "awk", "cpio", "make", "rpmbuild", "sed"] deprecated_substrings = ["mic-bootstrap", "x86", "x64"] providers = {} for symbol in symbols: for graph in graphs: if providers.get(symbol) is None: providers[symbol] = Set() names = graph.get_provider_names(symbol) if names is not None: providers[symbol] = providers[symbol] | Set(names) logging.debug("Got providers {0} for symbol " "{1}".format(names, symbol)) logging.debug("{0}".format(providers[symbol])) packages = [] for symbol in symbols: provider = None if len(providers[symbol]) < 1: for graph in graphs: for key in graph.symbol_providers.keys(): logging.debug("{0} : {1}".format( key, graph.symbol_providers[key])) logging.error("Failed to find symbol {0}".format(symbol)) logging.error("size: {0}".format(len(graph.symbol_providers))) sys.exit("Error.") elif len(providers[symbol]) > 1: logging.debug("Analyzing symbol {0}:".format(symbol)) for provider in providers[symbol]: logging.debug(" Provided by {0}".format(provider)) for substring in deprecated_substrings: if substring in provider: logging.debug(" ^--- will be ignored, contains " "{0}".format(substring)) providers[symbol] = providers[symbol] - Set([provider]) logging.debug(" {0}".format(providers[symbol])) if len(providers[symbol]) > 1: logging.warning("Multiple provider names for symbol " "\"{0}\":".format(symbol)) for provider in providers[symbol]: logging.warning(" * {0}".format(provider)) provider = next(iter(providers[symbol])) # FIXME: is it correct? else: provider = next(iter(providers[symbol])) packages.append(provider) logging.debug("Minimal packages list:") for package in packages: logging.debug(" * {0}".format(package)) return packages
def __prepare(self): """ Prepares the patching root ready for RPM patching. """ global developer_disable_patching if developer_disable_patching: logging.debug("RPM patcher will not be prepared.") return graphs = self._graphs self.__prepare_image(graphs) self.patching_root = temporaries.mount_firmware(self.images_directory) host_arch = platform.machine() host_arches = self.__produce_architecture_synonyms_list(host_arch) if self.architecture not in host_arches: self.__deploy_qemu_package() combirepo_dir = os.path.abspath(os.path.dirname(__file__)) rpmrebuild_file = os.path.join(combirepo_dir, 'data/rpmrebuild.tar') already_present_rpmrebuilds = files.find_fast(self.patching_root, "rpmrebuild.*") for already_present_rpmrebuild in already_present_rpmrebuilds: if os.path.isdir(already_present_rpmrebuild): shutil.rmtree(already_present_rpmrebuild) elif os.path.isfile(already_present_rpmrebuild): os.remove(already_present_rpmrebuild) hidden_subprocess.call("Extracting the rpmrebuild ", ["tar", "xf", rpmrebuild_file, "-C", self.patching_root]) queue = multiprocessing.Queue() child = multiprocessing.Process(target=self.__install_rpmrebuild, args=(queue,)) child.start() child.join() if queue.empty(): logging.error("Failed to install rpmrebuild into chroot.") sys.exit("Error.") else: result = queue.get() if result: logging.debug("Installation of rpmrebuild successfully " "completed.") else: raise Exception("Impossible happened.")