我们从Python开源项目中,提取了以下18个代码示例,用于说明如何使用utils.run_command()。
def make_partition(session, dev, partition_start, partition_end):
    """Create one unbounded partition on ``dev`` by piping a spec to sfdisk.

    :param session: Not used by this function.
    :param dev: Device name; turned into a path via utils.make_dev_path.
    :param partition_start: Start of the partition; sfdisk is invoked with
        ``-uS`` so the value is expressed in sectors.
    :param partition_end: Must be "-"; anything else is rejected.
    :raises pluginlib.PluginError: if ``partition_end`` is not "-".
    """
    # sfdisk 2.23 (present since XS7.0) up to 2.25 miscalculates cylinders
    # when sectors are used as the unit (-uS), which makes the partition
    # operation fail; 2.26 fixed it. For the affected range '--force' makes
    # sfdisk ignore the wrong cylinder value while still working as expected.
    oldest_affected = '2.23'
    newest_affected = '2.25'

    dev_path = utils.make_dev_path(dev)
    if partition_end != "-":
        raise pluginlib.PluginError("Can only create unbounded partitions")

    detected = _get_sfdisk_version()
    needs_force = bool(detected) and (
        StrictVersion(oldest_affected)
        <= StrictVersion(detected)
        <= StrictVersion(newest_affected)
    )

    sfdisk_args = ['sfdisk']
    if needs_force:
        sfdisk_args.append('--force')
    sfdisk_args.extend(['-uS', dev_path])

    utils.run_command(sfdisk_args, '%s,;\n' % (partition_start))
def _mkfs(fs, path, label):
    """Format a file or block device.

    :param fs: Filesystem type (only 'swap' and 'ext3' are supported)
    :param path: Path to the file or block device to format
    :param label: Volume label; applied only for 'ext3' and only when truthy
    :raises pluginlib.PluginError: for any other filesystem type
    """
    if fs not in ('swap', 'ext3'):
        raise pluginlib.PluginError("Partition type %s not supported" % fs)

    if fs == 'swap':
        command = ['mkswap']
    else:
        # -F forces non-interactive execution on a non-block device.
        command = ['mkfs', '-t', fs, '-F']
        if label:
            command += ['-L', label]

    command.append(path)
    utils.run_command(command)
def _get_sfdisk_version():
    """Return the "<major>.<minor>" version printed by ``/sbin/sfdisk -v``.

    Only the first line of the command output is inspected.

    :returns: The first "N.N" match as a string, or None when the command
        produced no output or the first line holds no such pattern.
    """
    out = utils.run_command(['/sbin/sfdisk', '-v'])
    if not out:
        return None

    # Keep only the first two numbers of the version: XS6.5 reports
    # 2.13-pre7, for which 2.13 is returned.
    match = re.search(r"(\d+)\.(\d+)", out.split('\n')[0])
    if match is None:
        return None
    return match.group(0)
def mkfs(session, dev, partnum, fs_type, fs_label):
    """Format partition ``partnum`` of ``dev`` with the given filesystem.

    The device's partitions are mapped with ``kpartx -avspp``, the mapped
    partition under /dev/mapper is formatted through _mkfs, and the mappings
    are removed again with ``kpartx -dvspp`` whether or not formatting
    succeeded.

    :param session: Not used by this function.
    :param dev: Device name; turned into a path via utils.make_dev_path.
    :param partnum: Partition number, used to build the mapper device name.
    :param fs_type: Filesystem type handed to _mkfs.
    :param fs_label: Volume label handed to _mkfs.
    """
    dev_path = utils.make_dev_path(dev)
    kpartx_output = utils.run_command(['kpartx', '-avspp', dev_path])
    try:
        logging.info('kpartx output: %s' % kpartx_output)
        mapped_name = "%sp%s" % (os.path.basename(dev), partnum)
        _mkfs(fs_type, os.path.join('/dev', 'mapper', mapped_name), fs_label)
    finally:
        # The partitions must always be removed, otherwise the VBD cannot
        # be unplugged.
        utils.run_command(['kpartx', '-dvspp', dev_path])
def _create_iso(mkisofs_cmd, filename, path):
    """Build an ISO image named ``filename`` from the contents of ``path``.

    The process temporarily changes its working directory to ``path`` so
    that '.' can be passed as the source tree, and switches back afterwards
    even if the command fails.

    :param mkisofs_cmd: Executable used to create the ISO.
    :param filename: Output file passed to the ``-o`` option.
    :param path: Directory to change into before running the command.
    """
    logging.debug("Creating ISO '%s'..." % filename)

    iso_command = [
        mkisofs_cmd,
        '-quiet',
        '-l',
        '-o', filename,
        '-c', 'boot.cat',
        '-b', 'isolinux.bin',
        '-no-emul-boot',
        '-boot-load-size', '4',
        '-boot-info-table',
        '.',
    ]

    previous_cwd = os.getcwd()
    os.chdir(path)
    try:
        utils.run_command(iso_command)
    finally:
        os.chdir(previous_cwd)
def _run_command(cmd, cmd_input=None):
    """Wrap utils.run_command to raise PluginError on failure.

    :param cmd: Command forwarded unchanged to utils.run_command.
    :param cmd_input: Optional input forwarded as ``cmd_input``.
    :returns: Whatever utils.run_command returns.
    :raises pluginlib.PluginError: carrying the ``err`` attribute of the
        utils.SubprocessException raised by the wrapped call.
    """
    try:
        return utils.run_command(cmd, cmd_input=cmd_input)
    # "except X as e" is accepted by Python 2.6+ and Python 3, unlike the
    # Python-2-only "except X, e" spelling.
    except utils.SubprocessException as e:
        raise pluginlib.PluginError(e.err)
def checkout_from(cls, remote_repo_url, repo_path):
    """
    Checkout a repository from a remote URL into a local path.

    When the 'http_proxy' configuration value is set, its host and port are
    passed to svn through ``--config-option`` arguments.

    Args:
        remote_repo_url (str): URL of the repository to check out
        repo_path (str): local path to check out into

    Returns:
        SvnRepository: object built from the URL and the local path

    Raises:
        exception.RepositoryError: if the checkout, or building the
            repository object, fails
    """
    LOG.info("Checking out repository from '%s' into '%s'" %
             (remote_repo_url, repo_path))

    command = 'svn checkout '

    proxy = CONF.get('http_proxy')
    if proxy:
        url = urlparse.urlparse(proxy)
        host = url.scheme + '://' + url.hostname
        # NOTE(review): url.port is None when the proxy URL has no explicit
        # port, which would be rendered as 'None' below - confirm whether
        # callers always configure a port.
        port = url.port
        options = ("servers:global:http-proxy-host='%s'" % host,
                   "servers:global:http-proxy-port='%s'" % port)
        proxy_conf = ['--config-option ' + option for option in options]

        command += ' '.join(proxy_conf) + ' '

    command += '%(remote_repo_url)s %(local_target_path)s' % \
        {'remote_repo_url': remote_repo_url,
         'local_target_path': repo_path}
    try:
        utils.run_command(command)
        return SvnRepository(remote_repo_url, repo_path)
    # Catch Exception rather than everything, so KeyboardInterrupt and
    # SystemExit are not disguised as a repository failure.
    except Exception:
        message = "Failed to clone repository"
        LOG.exception(message)
        raise exception.RepositoryError(message=message)
def make_partition(session, dev, partition_start, partition_end):
    """Create one unbounded partition on ``dev`` by piping a spec to sfdisk.

    :param session: Not used by this function.
    :param dev: Device name; turned into a path via utils.make_dev_path.
    :param partition_start: Start of the partition; sfdisk is invoked with
        ``-uS`` so the value is expressed in sectors.
    :param partition_end: Must be "-"; anything else is rejected.
    :raises pluginlib.PluginError: if ``partition_end`` is not "-".
    """
    dev_path = utils.make_dev_path(dev)

    if partition_end != "-":
        raise pluginlib.PluginError("Can only create unbounded partitions")

    sfdisk_cmd = ['sfdisk', '-uS', dev_path]
    sfdisk_input = '%s,;\n' % (partition_start)
    utils.run_command(sfdisk_cmd, sfdisk_input)
def _create_ssh_keys(self):
    """
    Generate a pair of ssh keys for this prefix

    Runs ``ssh-keygen`` for an RSA key with an empty passphrase, writing to
    the path given by ``self.paths.ssh_id_rsa()``.

    Returns:
        None

    Raises:
        RuntimeError: if it fails to create the keys
    """
    ret, _, _ = utils.run_command(
        [
            'ssh-keygen',
            '-t',
            'rsa',
            '-N',
            '',
            '-f',
            self.paths.ssh_id_rsa(),
        ]
    )

    if ret != 0:
        # Interpolate the path into the message; passing it as a second
        # exception argument would leave the '%s' placeholder unformatted.
        raise RuntimeError(
            'Failed to create ssh keys at %s' % self.paths.ssh_id_rsa()
        )
def _run_qemu(qemu_cmd, disk_path):
    """Run a qemu-img command and make the resulting disk world read/write.

    Args:
        qemu_cmd (list): command handed to utils.run_command
        disk_path (str): path of the disk file whose mode is changed

    Returns:
        The result object returned by utils.run_command

    Raises:
        RuntimeError: if the result's ``code`` attribute is not 0
    """
    ret = utils.run_command(qemu_cmd)
    if ret.code != 0:
        # NOTE(review): formatting with "% ret" relies on the result being a
        # 3-item tuple of (code, out, err) - confirm against
        # utils.run_command.
        raise RuntimeError(
            'Failed to create image, qemu-img returned %d:\n'
            'out:%s\nerr:%s' % ret,
        )
    # To avoid losing access to the file. The 0o prefix is valid on
    # Python 2.6+ and Python 3, unlike the Python-2-only 0666 literal.
    os.chmod(disk_path, 0o666)
    return ret
def _brctl(command, *args):
    """Run a brctl sub-command and return its result.

    Args:
        command (str): brctl sub-command to run
        *args: extra arguments appended after the sub-command

    Returns:
        tuple: the (return code, stdout, stderr) triple from
            utils.run_command

    Raises:
        RuntimeError: if the return code is truthy
    """
    full_cmd = _BRCTL + [command] + list(args)
    rc, stdout, stderr = utils.run_command(full_cmd)

    if not rc:
        return rc, stdout, stderr

    raise RuntimeError(
        'brctl %s failed\nrc: %d\n\nout:\n%s\n\nerr:\n%s' %
        (command, rc, stdout, stderr)
    )
def _set_link(name, state):
    """Set the link state of a device through ``link set dev``.

    Args:
        name (str): device name
        state (str): state argument passed after the device name

    Raises:
        RuntimeError: if the command's return code is truthy
    """
    link_cmd = _IP + ['link', 'set', 'dev', name, state]
    exit_code, _out, _err = utils.run_command(link_cmd)

    if not exit_code:
        return

    raise RuntimeError('Could not set %s to state %s' % (name, state))
def find_repo_by_name(name, repo_dir=None):
    """
    Look for a template repository with the given name among the ``*.json``
    files found under ``repo_dir``.

    Every file reported by ``find`` is loaded through
    ``TemplateRepository.from_url`` before any name is compared.

    Args:
        name (str): Name of the repo to search
        repo_dir (str): Directory where to search the repo; when None, the
            'template_repos' config value is used

    Return:
        The first loaded repository object whose ``name`` equals ``name``

    Raises:
        RuntimeError: if not found
    """
    if repo_dir is None:
        repo_dir = config.get('template_repos')

    find_cmd = ['find', repo_dir, '-name', '*.json']
    _, listing, _ = utils.run_command(find_cmd)

    candidates = []
    for raw_line in listing.split('\n'):
        location = raw_line.strip()
        if location:
            candidates.append(TemplateRepository.from_url(location))

    found = next(
        (candidate for candidate in candidates if candidate.name == name),
        None,
    )
    if found is None:
        raise RuntimeError('Could not find repo %s' % name)
    return found
def archive(self, archive_name, build_dir):
    """
    Archive repository and its submodules into a single compressed file.

    A tar file is written for the repository and one for each submodule;
    the submodule tars are then concatenated into the main one, which is
    finally passed to gzip.

    Args:
        archive_name (str): prefix of the resulting archive file name
        build_dir (str): path to the directory to place the archive file

    Returns:
        str: the main tar file path with ".gz" appended
    """
    tar_path = os.path.join(build_dir, archive_name + ".tar")
    LOG.info("Archiving {name} into {file}"
             .format(name=self.name, file=tar_path))
    with open(tar_path, "wb") as tar_file:
        super(GitRepository, self).archive(
            tar_file, prefix=archive_name + "/", format="tar")

    # One tar file is generated per submodule
    submodule_tars = []
    for submodule in self.submodules:
        flat_name = submodule.name.replace("/", "_")
        submodule_tar = os.path.join(
            build_dir, "%s-%s.tar" % (archive_name, flat_name))
        LOG.info("Archiving submodule {name} into {file}".format(
            name=submodule.name, file=submodule_tar))
        with open(submodule_tar, "wb") as tar_file:
            submodule_repo = submodule.module()
            submodule_repo.archive(
                tar_file,
                prefix=os.path.join(archive_name, submodule.path) + "/",
                format="tar")
        submodule_tars.append(submodule_tar)

    if submodule_tars:
        LOG.info("Concatenating {name} archive with submodules"
                 .format(name=self.name))
        # tar --concatenate has a bug that produces an undesired result
        # when more than two files are concatenated at once, so the
        # submodule archives are appended one by one:
        # https://lists.gnu.org/archive/html/bug-tar/2008-08/msg00002.html
        for submodule_tar in submodule_tars:
            utils.run_command("tar --concatenate --file %s %s" % (
                tar_path, submodule_tar))

    gz_path = tar_path + ".gz"
    LOG.info("Compressing {name} archive into {file}"
             .format(name=self.name, file=gz_path))
    utils.run_command("gzip --fast %s" % tar_path)
    return gz_path
def sysprep(disk, distro, loader=None, backend='direct', **kwargs):
    """
    Run virt-sysprep on the ``disk``, taking its commands from a file
    rendered for ``distro`` with the values passed in ``kwargs``.

    Args:
        disk(str): path to disk
        distro(str): distro to render template for
        loader(jinja2.BaseLoader): Jinja2 template loader; when not passed,
            a ``PackageLoader('lago', 'templates')`` is used.
        backend(str): value for the ``LIBGUESTFS_BACKEND`` environment
            variable, used only when that variable is not already set
        **kwargs(dict): extra values forwarded to the template rendering

    Returns:
        None

    Raises:
        RuntimeError: when the virt-sysprep command result is truthy
            (presumably a non-zero exit code)
    """
    if loader is None:
        loader = PackageLoader('lago', 'templates')

    commands_file = _render_template(distro, loader=loader, **kwargs)
    cmd = ['virt-sysprep', '-a', disk, '--commands-from-file', commands_file]

    env = os.environ.copy()
    env.setdefault('LIBGUESTFS_BACKEND', backend)

    ret = utils.run_command(cmd, env=env)
    if not ret:
        return

    quoted_cmd = ' '.join('"%s"' % elem for elem in cmd)
    raise RuntimeError(
        'Failed to bootstrap %s\ncommand:%s\nstdout:%s\nstderr:%s' %
        (disk, quoted_cmd, ret.out, ret.err)
    )
def download(self, temp_ver, store_metadata=True):
    """
    Retrieve the given template version

    The image is downloaded to a temporary ``<dest>.tmp`` file, checked
    against the hash reported by the template version, and converted to raw
    format at its final destination with ``qemu-img convert``.

    Args:
        temp_ver (TemplateVersion): template version to retrieve
        store_metadata (bool): If set to ``False``, will not refresh the
            local metadata with the retrieved one

    Returns:
        None

    Raises:
        RuntimeError: if the downloaded image does not match the expected
            hash, or if the conversion command result is truthy
    """
    dest = self._prefixed(temp_ver.name)
    temp_dest = '%s.tmp' % dest

    with lockfile.LockFile(dest):
        # Image was downloaded while we were waiting
        if os.path.exists(dest):
            return

        temp_ver.download(temp_dest)
        if store_metadata:
            with open('%s.metadata' % dest, 'w') as f:
                utils.json_dump(temp_ver.get_metadata(), f)

        sha1 = utils.get_hash(temp_dest)
        expected_sha1 = temp_ver.get_hash()
        if expected_sha1 != sha1:
            # Report the hash that was expected, as the message says. The
            # computed value is written to a file as-is below, so it is
            # presumably already a string and has no hexdigest() method.
            raise RuntimeError(
                'Image %s does not match the expected hash %s' % (
                    temp_ver.name,
                    expected_sha1,
                )
            )

        with open('%s.hash' % dest, 'w') as f:
            f.write(sha1)

        with log_utils.LogTask('Convert image', logger=LOGGER):
            result = utils.run_command(
                [
                    'qemu-img',
                    'convert',
                    '-O',
                    'raw',
                    temp_dest,
                    dest,
                ],
            )

            # The temporary download is removed whether or not the
            # conversion succeeded.
            os.unlink(temp_dest)
            if result:
                raise RuntimeError(result.err)