Python subprocess 模块,check_call() 实例源码

我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 subprocess.check_call()。

项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def configure_analyst_opsvm():
    '''
    Configures Analyst for OPSVM.

    Restarts plumgrid (via restart_pg) when it is not running, then uses
    nsenter to run commands inside the namespaces of the plumgrid LXC
    container: plumgrid-sigmund is stopped if its status reports
    'start/running', and sigmund-configure is run with the OPSVM IP taken
    from the plumgrid director context.

    Command failures are logged, not raised.
    '''
    if not service_running('plumgrid'):
        restart_pg()
    opsvm_ip = pg_gw_context._pg_dir_context()['opsvm_ip']
    # Prefix that enters the namespaces of the process whose parent pid is
    # the one recorded in /var/run/libvirt/lxc/plumgrid.pid.
    NS_ENTER = ('/opt/local/bin/nsenter -t $(ps ho pid --ppid $(cat '
                '/var/run/libvirt/lxc/plumgrid.pid)) -m -n -u -i -p ')
    sigmund_stop = NS_ENTER + '/usr/bin/service plumgrid-sigmund stop'
    sigmund_status = NS_ENTER \
        + '/usr/bin/service plumgrid-sigmund status'
    sigmund_autoboot = NS_ENTER \
        + '/usr/bin/sigmund-configure --ip {0} --start --autoboot' \
        .format(opsvm_ip)
    try:
        # universal_newlines=True returns text, so the substring test below
        # works whether check_output would otherwise give str or bytes.
        status = subprocess.check_output(sigmund_status, shell=True,
                                         universal_newlines=True)
        if 'start/running' in status:
            if subprocess.call(sigmund_stop, shell=True):
                log('plumgrid-sigmund couldn\'t be stopped!')
                return
        subprocess.check_call(sigmund_autoboot, shell=True)
        # Re-query the status only for its exit code: a non-zero status
        # raises and is reported below.
        subprocess.check_output(sigmund_status, shell=True)
    except (subprocess.CalledProcessError, OSError):
        # Only command failures are reported here; KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        log('plumgrid-sigmund couldn\'t be started!')
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def import_key(keyid):
    """Import a PGP key with apt-key.

    :param keyid: either a complete ASCII-armored public key block, which
        is written to a temporary file and passed to ``apt-key add``, or
        any other string, which is passed to ``apt-key adv --recv-keys``
        against hkp://keyserver.ubuntu.com:80.

    error_out() is called when apt-key exits with a non-zero status.
    """
    key = keyid.strip()
    armor_begin = '-----BEGIN PGP PUBLIC KEY BLOCK-----'
    armor_end = '-----END PGP PUBLIC KEY BLOCK-----'
    looks_armored = key.startswith(armor_begin) and key.endswith(armor_end)

    if not looks_armored:
        juju_log("PGP key found (looks like Radix64 format)", level=DEBUG)
        juju_log("Importing PGP key from keyserver", level=DEBUG)
        recv_cmd = ['apt-key', 'adv', '--keyserver',
                    'hkp://keyserver.ubuntu.com:80', '--recv-keys', key]
        try:
            subprocess.check_call(recv_cmd)
        except subprocess.CalledProcessError:
            error_out("Error importing PGP key '%s'" % key)
        return

    juju_log("PGP key found (looks like ASCII Armor format)", level=DEBUG)
    juju_log("Importing ASCII Armor PGP key", level=DEBUG)
    with tempfile.NamedTemporaryFile() as keyfile:
        # Re-open by name in text mode; closing it flushes the key to disk
        # before apt-key reads the file.
        with open(keyfile.name, 'w') as fd:
            fd.write(key)
            fd.write("\n")

        add_cmd = ['apt-key', 'add', keyfile.name]
        try:
            subprocess.check_call(add_cmd)
        except subprocess.CalledProcessError:
            error_out("Error importing PGP key '%s'" % key)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def _git_update_requirements(venv, package_dir, reqs_dir):
    """
    Update from global requirements.

    Update an OpenStack git directory's requirements.txt and
    test-requirements.txt from global-requirements.txt.

    :param venv: virtualenv directory; its ``bin/python`` runs update.py.
    :param package_dir: directory passed to update.py as its argument.
    :param reqs_dir: directory used as the working directory while
        ``update.py`` runs.

    error_out() is called if update.py exits with a non-zero status.
    """
    orig_dir = os.getcwd()
    os.chdir(reqs_dir)
    try:
        python = os.path.join(venv, 'bin/python')
        cmd = [python, 'update.py', package_dir]
        try:
            subprocess.check_call(cmd)
        except subprocess.CalledProcessError:
            package = os.path.basename(package_dir)
            error_out("Error updating {} from "
                      "global-requirements.txt".format(package))
    finally:
        # Always restore the caller's working directory, even when the
        # failure path raises or exits.
        os.chdir(orig_dir)
项目:Adafruit_Python_PureIO    作者:adafruit    | 项目源码 | 文件源码
def _clean_check(cmd, target):
    """
    Execute the download command ``cmd``.

    When the command exits with a non-zero status, ``target`` is removed if
    it exists (a failed download may have left it behind) and the
    CalledProcessError is re-raised to the caller.
    """
    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError:
        leftover_present = os.access(target, os.F_OK)
        if leftover_present:
            os.unlink(target)
        raise
项目:ISB-CGC-pipelines    作者:isb-cgc    | 项目源码 | 文件源码
def stopScheduler():
    """Stop the scheduler services with ``sudo service <name> stop``.

    supervisor is stopped first, then rabbitmq-server. If either command
    exits with a non-zero status, an error is printed and the process exits
    with status -1 without attempting the remaining service.
    """
    # (service name, message printed when stopping it fails)
    steps = (
        ("supervisor",
         "ERROR: couldn't stop the scheduler (supervisor): {reason}"),
        ("rabbitmq-server",
         "ERROR: couldn't stop the scheduler (rabbitmq): {reason}"),
    )
    for service_name, failure_message in steps:
        try:
            subprocess.check_call(["sudo", "service", service_name, "stop"])
        except subprocess.CalledProcessError as e:
            print(failure_message.format(reason=e))
            exit(-1)

    print("Scheduler stopped successfully!")
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def sed(filename, before, after, flags='g'):
    """
    Search and replace the given pattern in filename, editing it in place.

    Runs ``sed -i -r -e 's/<before>/<after>/<flags>' <filename>``.

    :param filename: relative or absolute file path; ``~`` is expanded.
    :param before: expression to be replaced (see 'man sed')
    :param after: expression to replace with (see 'man sed')
    :param flags: flags appended to the ``s`` command. Defaults to ``g``;
        a value passed here replaces that default, so include ``g``
        yourself if you still want every match replaced.
    :returns: 0 if the sed command exit code was zero,
        otherwise raises CalledProcessError.
    """
    target = os.path.expanduser(filename)
    script = r's/{0}/{1}/{2}'.format(before, after, flags)
    argv = ["sed", "-i", "-r", "-e", script, target]
    return subprocess.check_call(argv)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def add_group(group_name, system_group=False):
    """Return the grp entry for ``group_name``, creating the group if needed.

    When the group does not exist, ``addgroup`` is run with ``--system`` if
    ``system_group`` is true and with ``--group`` otherwise, and the new
    entry is looked up again.
    """
    try:
        existing = grp.getgrnam(group_name)
        log('group {0} already exists!'.format(group_name))
        return existing
    except KeyError:
        log('creating group {0}'.format(group_name))

    kind_flag = '--system' if system_group else '--group'
    subprocess.check_call(['addgroup', kind_flag, group_name])
    return grp.getgrnam(group_name)
项目:BIO309_Spring2017    作者:ianmisner    | 项目源码 | 文件源码
def signalp():
    """Run SignalP on singleline.fasta and list the sequences it keeps.

    Calls singleline() first, then runs signalp3.0 through the shell,
    keeping only output lines containing ' S ' in ``signalpOUT.txt``. The
    first space-separated field of each kept line is then written to
    ``goodlistSigP.txt``. All file names are prefixed with the module-level
    ``path``.

    Raises subprocess.CalledProcessError if either shell pipeline exits
    with a non-zero status.
    """
    singleline()
    command = ("signalp3.0 -t euk -short -m hmm " + path + "singleline.fasta | grep ' S ' > " + path + "signalpOUT.txt")
    print("\nRunning SignalP")
    subprocess.check_call(command, shell=True)
    print("SignalP Complete")

    # Generate the list of sequences with signal peptides using the mature sequences

    print("\nCreating SignalP protein list")
    command_list = ("cut -d ' ' -f 1 " + path + "signalpOUT.txt")
    # The context manager closes the output file even when cut fails.
    with open(path + "goodlistSigP.txt", "w") as file_out4:
        subprocess.check_call(command_list, stdout=file_out4, shell=True)

# This function creates a fasta file containing the complete sequences with signal peptides
项目:BIO309_Spring2017    作者:ianmisner    | 项目源码 | 文件源码
def tmhmm():
    """Run tmhmm on signalP_pass.fasta and record sequences with no helices.

    The tmhmm output is written to ``tmhmmOUT.txt``. For every output line
    containing the tab-delimited field ``PredHel=0``, the text before the
    first tab is appended to ``tmhmmGoodlist.txt``. All file names are
    prefixed with the module-level ``path``.

    Raises subprocess.CalledProcessError if tmhmm exits with a non-zero
    status.
    """
    command = ("tmhmm -short " + path + "signalP_pass.fasta")
    # Context managers close every file even if a step raises.
    with open(path + "tmhmmOUT.txt", "w") as file_out:
        print("\nRunning tmhmm on mature signalp sequences only")
        subprocess.check_call(command, stdout=file_out, shell=True)
    print("tmhmm complete")
    print("\nIdentifying sequences without tm regions.")
    # This section of code parses the output from tmhmm and collects fastas with no TM regions
    # The good list is opened in append mode, so earlier entries are kept.
    with open(path + "tmhmmOUT.txt", "r") as openfile, \
            open(path + "tmhmmGoodlist.txt", "a") as file_out2:
        for line in openfile:
            if "\tPredHel=0\t" in line:
                goodname = line.partition('\t')[0] + '\n'
                file_out2.write(goodname)

#This function uses targetp to verify the destination of the signal peptide
#NOTE for plant networks use -P over -N
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def try_initialize_swauth():
    """Run ``swauth-prep`` once from the leader unit when swauth is in use.

    Does nothing unless this unit is the leader, the ``auth-type`` config
    is ``swauth`` and the ``swauth-init`` leader setting is not ``True``.

    The admin key comes from the ``swauth-admin-key`` config option, then
    from the leader setting of the same name, and is otherwise generated.
    The key is stored in leader settings before ``swauth-prep`` runs, and
    ``swauth-init`` is set once it succeeds.

    A CalledProcessError is logged rather than raised.
    """
    if not (is_leader() and config('auth-type') == 'swauth'):
        return
    if leader_get('swauth-init') is True:
        return

    try:
        admin_key = config('swauth-admin-key')
        if admin_key == '' or admin_key is None:
            admin_key = leader_get('swauth-admin-key')
            if admin_key is None:
                # Convert to str: the key is passed as a subprocess
                # argument below, which must be a string, not a UUID.
                admin_key = str(uuid.uuid4())
        leader_set({'swauth-admin-key': admin_key})

        bind_port = config('bind-port')
        bind_port = determine_api_port(bind_port, singlenode_mode=True)
        subprocess.check_call([
            'swauth-prep',
            '-A',
            'http://localhost:{}/auth'.format(bind_port),
            '-K', admin_key])
        leader_set({'swauth-init': True})
    except subprocess.CalledProcessError:
        log("had a problem initializing swauth!")
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def sed(filename, before, after, flags='g'):
    """
    Search and replace the given pattern in filename, editing it in place.

    Runs ``sed -i -r -e 's/<before>/<after>/<flags>' <filename>``.

    :param filename: relative or absolute file path; ``~`` is expanded.
    :param before: expression to be replaced (see 'man sed')
    :param after: expression to replace with (see 'man sed')
    :param flags: flags appended to the ``s`` command. Defaults to ``g``;
        a value passed here replaces that default, so include ``g``
        yourself if you still want every match replaced.
    :returns: 0 if the sed command exit code was zero,
        otherwise raises CalledProcessError.
    """
    target = os.path.expanduser(filename)
    script = r's/{0}/{1}/{2}'.format(before, after, flags)
    argv = ["sed", "-i", "-r", "-e", script, target]
    return subprocess.check_call(argv)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def add_metric(*args, **kwargs):
    """Add metric values by invoking the ``add-metric`` command.

    Values may be given as keyword arguments. Metric names containing
    dashes can instead be passed as 'key=value' positional arguments. The
    original documentation states this may only be called from the
    collect-metrics hook.

    If the ``add-metric`` executable is not found (ENOENT), the failure is
    logged at INFO level. Any other EnvironmentError is re-raised.
    """
    pairs = list(args)
    pairs += ['{}={}'.format(name, value) for name, value in kwargs.items()]
    command = ['add-metric'] + sorted(pairs)
    try:
        subprocess.check_call(command)
    except EnvironmentError as e:
        if e.errno != errno.ENOENT:
            raise
    else:
        return
    # Reached only when the add-metric executable is missing.
    log('add-metric failed: {}'.format(' '.join(pairs)), level='INFO')
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def _git_update_requirements(venv, package_dir, reqs_dir):
    """
    Update from global requirements.

    Update an OpenStack git directory's requirements.txt and
    test-requirements.txt from global-requirements.txt.

    :param venv: virtualenv directory; its ``bin/python`` runs update.py.
    :param package_dir: directory passed to update.py as its argument.
    :param reqs_dir: directory used as the working directory while
        ``update.py`` runs.

    error_out() is called if update.py exits with a non-zero status.
    """
    orig_dir = os.getcwd()
    os.chdir(reqs_dir)
    try:
        python = os.path.join(venv, 'bin/python')
        cmd = [python, 'update.py', package_dir]
        try:
            subprocess.check_call(cmd)
        except subprocess.CalledProcessError:
            package = os.path.basename(package_dir)
            error_out("Error updating {} from "
                      "global-requirements.txt".format(package))
    finally:
        # Always restore the caller's working directory, even when the
        # failure path raises or exits.
        os.chdir(orig_dir)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def sed(filename, before, after, flags='g'):
    """
    Search and replace the given pattern in filename, editing it in place.

    Runs ``sed -i -r -e 's/<before>/<after>/<flags>' <filename>``.

    :param filename: relative or absolute file path; ``~`` is expanded.
    :param before: expression to be replaced (see 'man sed')
    :param after: expression to replace with (see 'man sed')
    :param flags: flags appended to the ``s`` command. Defaults to ``g``;
        a value passed here replaces that default, so include ``g``
        yourself if you still want every match replaced.
    :returns: 0 if the sed command exit code was zero,
        otherwise raises CalledProcessError.
    """
    target = os.path.expanduser(filename)
    script = r's/{0}/{1}/{2}'.format(before, after, flags)
    argv = ["sed", "-i", "-r", "-e", script, target]
    return subprocess.check_call(argv)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def add_metric(*args, **kwargs):
    """Add metric values by invoking the ``add-metric`` command.

    Values may be given as keyword arguments. Metric names containing
    dashes can instead be passed as 'key=value' positional arguments. The
    original documentation states this may only be called from the
    collect-metrics hook.

    If the ``add-metric`` executable is not found (ENOENT), the failure is
    logged at INFO level. Any other EnvironmentError is re-raised.
    """
    pairs = list(args)
    pairs += ['{}={}'.format(name, value) for name, value in kwargs.items()]
    command = ['add-metric'] + sorted(pairs)
    try:
        subprocess.check_call(command)
    except EnvironmentError as e:
        if e.errno != errno.ENOENT:
            raise
    else:
        return
    # Reached only when the add-metric executable is missing.
    log('add-metric failed: {}'.format(' '.join(pairs)), level='INFO')
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def _git_update_requirements(venv, package_dir, reqs_dir):
    """
    Update from global requirements.

    Update an OpenStack git directory's requirements.txt and
    test-requirements.txt from global-requirements.txt.

    :param venv: virtualenv directory; its ``bin/python`` runs update.py.
    :param package_dir: directory passed to update.py as its argument.
    :param reqs_dir: directory used as the working directory while
        ``update.py`` runs.

    error_out() is called if update.py exits with a non-zero status.
    """
    orig_dir = os.getcwd()
    os.chdir(reqs_dir)
    try:
        python = os.path.join(venv, 'bin/python')
        cmd = [python, 'update.py', package_dir]
        try:
            subprocess.check_call(cmd)
        except subprocess.CalledProcessError:
            package = os.path.basename(package_dir)
            error_out("Error updating {} from "
                      "global-requirements.txt".format(package))
    finally:
        # Always restore the caller's working directory, even when the
        # failure path raises or exits.
        os.chdir(orig_dir)
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def index_reference(self, in_fasta_fn, in_gtf_fn, num_threads=1, sa_sparse_d=None, sa_index_n_bases=None,
                    chr_bin_n_bits=None, limit_ram=None):
    """Generate a STAR genome index in ``self.reference_star_path``.

    The directory must not exist yet; it is created and then
    ``STAR --runMode genomeGenerate`` is run with the FASTA and GTF inputs.
    Each optional tuning argument is passed to STAR only when it is not
    None.

    Raises Exception if the reference path already exists, and
    subprocess.CalledProcessError if STAR exits with a non-zero status.
    """
    star_dir = self.reference_star_path
    if os.path.exists(star_dir):
        raise Exception('STAR reference path %s already exists' % star_dir)

    os.mkdir(star_dir)

    argv = [
        'STAR', '--runMode', 'genomeGenerate',
        '--genomeDir', star_dir,
        '--runThreadN', str(num_threads),
        '--genomeFastaFiles', in_fasta_fn,
        '--sjdbGTFfile', in_gtf_fn,
    ]
    # (STAR flag, value) pairs, in the order they are added to the command.
    optional_settings = (
        ('--limitGenomeGenerateRAM', limit_ram),
        ('--genomeSAsparseD', sa_sparse_d),
        ('--genomeSAindexNbases', sa_index_n_bases),
        ('--genomeChrBinNbits', chr_bin_n_bits),
    )
    for flag, value in optional_settings:
        if value is not None:
            argv += [flag, str(value)]

    subprocess.check_call(argv)
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def _index_reference(self, index_path, **kwargs):
    """
    Build a bowtie2 index for ``self.reference_fasta_path``.

    Args:
        index_path (str): path to index prefix
        **kwargs: Extra options for bowtie2-build. They are converted to
            command line options by cr_utils.kwargs_to_command_line_options
            with underscores replaced by hyphens, so a parameter with
            hyphens in its name is written with underscores. Per the
            original notes, flags may have their value set to None and
            values are not validated.

    Notes:
        Per the original notes, bowtie2 generates temporary files for
        indexing as a side effect.
        The command is run through the shell. On success, self.index_path
        and self.indexed are updated.

    Examples:
        myBowtie2._index_reference(index_path, large_index=None, bmax=4)

    """
    extra_options = cr_utils.kwargs_to_command_line_options(set(), replace_chars={'_': '-'}, **kwargs)
    build_parts = (extra_options, self.reference_fasta_path, index_path)
    build_command = 'bowtie2-build %s %s %s' % build_parts
    subprocess.check_call(build_command, shell=True)

    # Record the result only after bowtie2-build has succeeded.
    self.index_path = index_path
    self.indexed = True
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def main(args, outs):
    """Split a read chunk into FASTQ chunks with the ``chunk_reads`` tool.

    ``args.read_chunk`` is dumped to ``chunk_args.json`` and ``chunk_reads``
    is run on it. The tool's ``read_chunks.json`` is then loaded from the
    output path, and one copy of ``args.read_chunk`` per resulting chunk,
    with its ``read_chunks`` key set to that chunk, is stored in
    ``outs.out_chunks``.
    """
    chunk_args_fn = "chunk_args.json"

    # Write read_chunk for consumption by Rust
    with open(chunk_args_fn, "w") as f:
        json.dump(args.read_chunk, f)

    output_path = martian.make_path("")
    prefix = "fastq_chunk"
    chunk_reads_args = [
        'chunk_reads',
        '--reads-per-fastq', str(args.reads_per_file),
        output_path,
        prefix,
        "--martian-args", chunk_args_fn,
    ]
    print("running chunk reads: [%s]" % str(chunk_reads_args))
    subprocess.check_call(chunk_reads_args)

    results_fn = os.path.join(output_path, "read_chunks.json")
    with open(results_fn) as f:
        chunk_results = json.load(f)

    outs.out_chunks = []

    # Write out a new chunk entry for each resulting chunk
    for chunk in chunk_results:
        print(args.read_chunk)
        chunk_copy = args.read_chunk.copy()
        print(chunk_copy)
        chunk_copy['read_chunks'] = chunk
        outs.out_chunks.append(chunk_copy)
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def get_bcl2fastq_v2(hostname):
    """Detect the installed bcl2fastq version.

    Returns a ``(version, error_message)`` tuple in which exactly one
    element is None:

    * ``(version, None)`` when a ``bcl2fastq v<digits and dots>`` line is
      found in the output of ``bcl2fastq --version``;
    * ``(None, message)`` when no such line is found, or when ``which`` or
      ``bcl2fastq --version`` exits with a non-zero status (reported as
      bcl2fastq not being on PATH of ``hostname``).
    """
    try:
        subprocess.check_call(["which", "bcl2fastq"])
        # Restore the LD_LIBRARY_PATH set aside by sourceme.bash/shell10x.
        # Required for some installations of bcl2fastq.
        child_env = dict(os.environ)
        child_env['LD_LIBRARY_PATH'] = os.environ.get('_TENX_LD_LIBRARY_PATH', '')
        output = subprocess.check_output(["bcl2fastq", "--version"], env=child_env, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError:
        return (None, "On machine: %s, bcl2fastq not found on PATH." % hostname)

    for line in output.split("\n"):
        found = re.match("bcl2fastq v([0-9.]+)", line)
        if found is not None:
            return (found.groups()[0], None)

    return (None, "bcl2fastq version not recognized -- please check the output of bcl2fastq --version")
项目:cellranger    作者:10XGenomics    | 项目源码 | 文件源码
def combine_vcfs(output_filename, input_vcf_filenames):
    """Concatenate VCF files into one sorted, indexed VCF.

    The first input is copied whole to ``<output_filename>.tmp``. For every
    later input only the lines not starting with ``#`` are appended. The
    temporary file is then sorted into ``output_filename`` and indexed via
    tk_tabix, and finally removed.

    Raises Exception if grep exits with status 2, and
    subprocess.CalledProcessError if copying the first file fails.
    """
    tmp_filename = output_filename + ".tmp"

    for position, vcf_fn in enumerate(input_vcf_filenames):
        if position > 0:
            args = 'grep -v "^#" ' + vcf_fn
            status = subprocess.call(args + " >> " + tmp_filename, shell=True)
            # Only status 2 is treated as a failure; other non-zero
            # statuses from grep are ignored.
            if status == 2:
                raise Exception("grep call failed: " + args)
            continue
        args = 'cat ' + vcf_fn
        subprocess.check_call(args + " > " + tmp_filename, shell=True)

    # Sort and index the files
    tk_tabix.sort_vcf(tmp_filename, output_filename)
    tk_tabix.index_vcf(output_filename)

    os.remove(tmp_filename)
项目:acbs    作者:AOSC-Dev    | 项目源码 | 文件源码
def start_ab3(tmp_dir_loc, repo_dir, pkg_info, rm_abdir=False):
    """Run ``autobuild`` for a package inside ``tmp_dir_loc``.

    Changes the working directory to ``tmp_dir_loc``, runs copy_abd() and
    parser_pass_through(), then runs ``autobuild`` and prints the elapsed
    time in seconds.

    :param tmp_dir_loc: directory to build in (becomes the cwd).
    :param repo_dir: passed through to copy_abd().
    :param pkg_info: mapping with at least a 'NAME' key.
    :param rm_abdir: when exactly True, remove ``./autobuild/`` afterwards.
    :returns: True on success; False if a preparation step returns a falsy
        value or autobuild fails.
    """
    start_time = int(time.time())
    os.chdir(tmp_dir_loc)
    if not copy_abd(tmp_dir_loc, repo_dir, pkg_info):
        return False
    # For logging support: ptyprocess.PtyProcessUnicode.spawn(['autobuild'])
    shadow_defines_loc = os.path.abspath(os.path.curdir)
    if not parser_pass_through(pkg_info, shadow_defines_loc):
        return False
    try:
        subprocess.check_call(['autobuild'])
    except Exception:
        # Build failures become False. KeyboardInterrupt and SystemExit are
        # not Exception subclasses, so they still reach the caller.
        return False
    time_span = int(time.time()) - start_time
    print('>>>>>>>>>>>>>>>>>> Time for building\033[36m {} \033[0m:\033[36m {} \033[0mseconds'.format(
        pkg_info['NAME'], time_span))
    if rm_abdir is True:
        shutil.rmtree(os.path.abspath(os.path.curdir) + '/autobuild/')
    # Will get better display later
    return True
项目:acbs    作者:AOSC-Dev    | 项目源码 | 文件源码
def aria_get(url, threads=3, output=None):
    """Download ``url`` with aria2c.

    :param url: URL to fetch.
    :param threads: value for ``--max-connection-per-server``.
    :param output: optional target path. Only its final component is used
        as the output file name; the file is saved into the module-level
        ``dump_loc`` directory. If ``output`` already exists and has no
        ``.aria2`` control file next to it, nothing is downloaded.
    :raises AssertionError: if aria2c cannot be run or exits non-zero.
    """
    # Guard against output=None (the default): os.path.exists(None) and
    # None + '.aria2' both raise TypeError.
    if output is not None and os.path.exists(output) \
            and not os.path.exists(output + '.aria2'):
        return
    aria_cmd = ['aria2c', '--max-connection-per-server={}'.format(threads)]
    if output is not None:
        aria_cmd += ['-d', dump_loc, '-o', output.split('/')[-1]]
    aria_cmd += [url, '--auto-file-renaming=false']  # ,'--check-certificate=false'
    try:
        subprocess.check_call(aria_cmd)
    except Exception:
        # KeyboardInterrupt is not an Exception subclass, so it still
        # propagates unchanged to the caller.
        raise AssertionError('Failed to fetch source with Aria2!')
    return
项目:rain    作者:scizzorz    | 项目源码 | 文件源码
def compile_so(libs):
  '''Link the named libraries into one shared object and return its path.

  The target is ``lib<sorted names joined by '.'>.so`` in the system
  temporary directory. If it already exists it is reused; otherwise the
  compiler named by $CLANG (default ``clang``) is run with ``-shared`` and
  one ``-l<lib>`` option per library.
  '''
  # I don't know how else to find these .so files other than just asking clang
  # to make a .so file out of all of them

  compiler = os.getenv('CLANG', 'clang')

  so_name = 'lib' + '.'.join(sorted(libs)) + '.so'
  target = join(tempfile.gettempdir(), so_name)

  if os.path.exists(target):
    return target

  link_flags = ['-l' + lib for lib in libs]
  subprocess.check_call([compiler, '-o', target, '-shared'] + link_flags)
  return target
项目:rain    作者:scizzorz    | 项目源码 | 文件源码
def share(self):
    '''Compile a single Rain file into a shared object file.

    Calls build() first. If this object is already marked as compiled,
    nothing more is done. Otherwise it is marked compiled, its links are
    compiled, and the compiler named by $CLANG (default ``clang``) turns
    ``self.ll`` into ``self.target``, or ``<qname>.so`` when no target is
    set.
    '''
    self.build()

    if self.compiled:
      return
    self.compiled = True

    self.compile_links()

    with self.okay('sharing'):
      output = self.target or self.qname + '.so'
      compiler = os.getenv('CLANG', 'clang')
      flags = ['-O2', '-shared', '-fPIC']

      self.vprint('{:>10} {}', 'target', X(output, 'yellow'))
      self.vprint('{:>10} {}', 'flags', X('  '.join(flags), 'yellow'))
      self.vprint('{:>10} {}', 'src', X(self.ll, 'yellow'))

      subprocess.check_call([compiler, '-o', output, self.ll] + flags)
项目:benchmarks    作者:tensorflow    | 项目源码 | 文件源码
def CreatePods(pod_name, yaml_file):
  """Creates pods from a kubernetes config and waits for them.

  Runs ``kubectl create --filename=<yaml_file>`` and then polls through
  _WaitUntil(100, _GetPodNames, pod_name).

  Args:
    pod_name: 'name-prefix' selector for the pods.
    yaml_file: kubernetes yaml config.

  Raises:
    TimeoutError: if _WaitUntil reports that the pods did not come up.
    subprocess.CalledProcessError: if kubectl exits with a non-zero status.
  """
  create_cmd = [_KUBECTL, 'create', '--filename=%s' % yaml_file]
  logging.info('Creating pods: %s', subprocess.list2cmdline(create_cmd))
  subprocess.check_call(create_cmd)

  pods_ready = _WaitUntil(100, _GetPodNames, pod_name)
  if pods_ready:
    return
  raise TimeoutError(
      'Timed out waiting for %s pod to come up.' % pod_name)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def clone_helpers(work_dir, branch):
    """Check out ``branch`` into ``<work_dir>/charm-helpers`` with bzr.

    Performs a lightweight ``bzr checkout`` and returns the destination
    path. Raises subprocess.CalledProcessError if bzr exits non-zero.
    """
    checkout_dir = os.path.join(work_dir, 'charm-helpers')
    logging.info('Checking out %s to %s.' % (branch, checkout_dir))
    subprocess.check_call(
        ['bzr', 'checkout', '--lightweight', branch, checkout_dir])
    return checkout_dir
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def interface_exists(interface):
    '''
    Checks if interface exists on node.

    Runs ``ip link show <interface>`` with its output discarded.

    :param interface: name of the network interface to look up.
    :returns: True if the command exits with status zero, False otherwise.
    '''
    # Open the null device in a context manager so the handle is closed
    # after every call instead of being leaked.
    with open(os.devnull, 'w') as devnull:
        try:
            subprocess.check_call(['ip', 'link', 'show', interface],
                                  stdout=devnull,
                                  stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError:
            return False
    return True
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def _exec_cmd(cmd=None, error_msg='Command exited with ERRORs', fatal=False):
    '''
    Run ``cmd`` with subprocess.check_call.

    :param cmd: command to run; when None a message is logged and nothing
        is executed.
    :param error_msg: message logged when the command fails and ``fatal``
        is false.
    :param fatal: when true, a CalledProcessError propagates to the caller
        instead of being logged.
    '''
    if cmd is None:
        log("No command specified")
        return

    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError:
        if fatal:
            raise
        log(error_msg)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def service_resume(service_name, init_dir="/etc/init",
                   initd_dir="/etc/init.d"):
    """Resume a system service.

    Re-enables the service at boot and starts it if it is not running:

    * systemd: ``service('enable', service_name)``
    * Upstart (``<init_dir>/<name>.conf`` exists): the ``<name>.override``
      file is removed if present
    * SysV (``<initd_dir>/<name>`` exists): ``update-rc.d <name> enable``

    :returns: the result of service_running(), or of service_start() when
        the service was not running.
    :raises ValueError: if none of the three init systems is detected.
    """
    upstart_conf = os.path.join(init_dir, "{}.conf".format(service_name))
    sysv_script = os.path.join(initd_dir, service_name)

    if init_is_systemd():
        service('enable', service_name)
    elif os.path.exists(upstart_conf):
        override = os.path.join(
            init_dir, '{}.override'.format(service_name))
        if os.path.exists(override):
            os.unlink(override)
    elif os.path.exists(sysv_script):
        subprocess.check_call(["update-rc.d", service_name, "enable"])
    else:
        raise ValueError(
            "Unable to detect {0} as SystemD, Upstart {1} or"
            " SysV {2}".format(
                service_name, upstart_conf, sysv_script))

    # Only start the service when it is not already running.
    return service_running(service_name) or service_start(service_name)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def add_user_to_group(username, group):
    """Add ``username`` to ``group`` by running ``gpasswd -a``.

    Raises subprocess.CalledProcessError if gpasswd exits non-zero.
    """
    log("Adding user {} to group {}".format(username, group))
    subprocess.check_call(['gpasswd', '-a', username, group])
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def symlink(source, destination):
    """Create a symbolic link at ``destination`` pointing to ``source``.

    Runs ``ln -sf``. Raises subprocess.CalledProcessError if ln exits
    non-zero.
    """
    log("Symlinking {} as {}".format(source, destination))
    subprocess.check_call(['ln', '-sf', source, destination])
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def modprobe(module, persist=True):
    """Load a kernel module and configure for auto-load on reboot.

    :param module: name of the kernel module passed to ``modprobe``.
    :param persist: when true, append the module name to /etc/modules
        unless that file's content already contains the name.
    :raises CalledProcessError: if modprobe exits with a non-zero status.
    """
    cmd = ['modprobe', module]

    log('Loading kernel module %s' % module, level=INFO)

    check_call(cmd)
    if persist:
        with open('/etc/modules', 'r+') as modules:
            # read() leaves the file position at the end, so the write
            # below appends.
            if module not in modules.read():
                # End the entry with a newline so a later append starts on
                # its own line instead of being glued to this name.
                modules.write(module + "\n")
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def rmmod(module, force=False):
    """Remove a module from the linux kernel.

    Runs ``rmmod [-f] <module>``; ``-f`` is added when ``force`` is true.
    Returns the result of check_call, which raises if rmmod fails.
    """
    force_flag = ['-f'] if force else []
    cmd = ['rmmod'] + force_flag + [module]
    log('Removing kernel module %s' % module, level=INFO)
    return check_call(cmd)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def update_initramfs(version='all'):
    """Update an initramfs image with ``update-initramfs -k <version> -u``.

    Returns the result of check_call, which raises if the command fails.
    """
    cmd = ["update-initramfs", "-k", version, "-u"]
    return check_call(cmd)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def relation_set(relation_id=None, relation_settings=None, **kwargs):
    """Set relation information for the current unit.

    :param relation_id: optional relation id, passed as ``-r <id>``.
    :param relation_settings: optional dict of settings; it is copied, not
        modified.
    :param kwargs: extra settings, overriding ``relation_settings``.

    Values that are not None are converted to strings. When the
    ``relation-set`` tool advertises ``--file`` in its help output, the
    settings are passed through a temporary YAML file; otherwise they are
    passed as ``key=value`` arguments, with None sent as ``key=``. The
    cache for the local unit is flushed afterwards.
    """
    relation_settings = relation_settings if relation_settings else {}
    relation_cmd_line = ['relation-set']
    accepts_file = "--file" in subprocess.check_output(
        relation_cmd_line + ["--help"], universal_newlines=True)
    if relation_id is not None:
        relation_cmd_line.extend(('-r', relation_id))
    settings = relation_settings.copy()
    settings.update(kwargs)
    for key, value in settings.items():
        # Force value to be a string: it always should, but some call
        # sites pass in things like dicts or numbers.
        if value is not None:
            settings[key] = "{}".format(value)
    if accepts_file:
        # --file was introduced in Juju 1.23.2. Use it by default if
        # available, since otherwise we'll break if the relation data is
        # too big. Ideally we should tell relation-set to read the data from
        # stdin, but that feature is broken in 1.23.2: Bug #1454678.
        settings_file = tempfile.NamedTemporaryFile(delete=False)
        try:
            with settings_file:
                settings_file.write(yaml.safe_dump(settings).encode("utf-8"))
            subprocess.check_call(
                relation_cmd_line + ["--file", settings_file.name])
        finally:
            # Remove the temporary file even when writing it or running
            # relation-set fails.
            os.remove(settings_file.name)
    else:
        for key, value in settings.items():
            if value is None:
                relation_cmd_line.append('{}='.format(key))
            else:
                relation_cmd_line.append('{}={}'.format(key, value))
        subprocess.check_call(relation_cmd_line)
    # Flush cache of any relation-gets for local unit
    flush(local_unit())
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def open_port(port, protocol="TCP"):
    """Open a service network port by running ``open-port <port>/<protocol>``.

    Raises subprocess.CalledProcessError if the command exits non-zero.
    """
    port_spec = '{}/{}'.format(port, protocol)
    subprocess.check_call(['open-port', port_spec])
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def action_set(values):
    """Set the values to be returned after the action finishes.

    Each item of ``values`` is passed to ``action-set`` as a ``key=value``
    argument.
    """
    pairs = ['{}={}'.format(k, v) for k, v in list(values.items())]
    subprocess.check_call(['action-set'] + pairs)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def action_fail(message):
    """Mark the action as failed with ``message`` via ``action-fail``.

    Per the original documentation, results already set by action_set are
    preserved.
    """
    cmd = ['action-fail', message]
    subprocess.check_call(cmd)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def leader_set(settings=None, **kwargs):
    """Juju leader set value(s).

    :param settings: optional dict of settings; it is not modified.
    :param kwargs: extra settings, overriding entries in ``settings``.

    Each setting is passed to ``leader-set`` as ``key=value``; a value of
    None is sent as ``key=``. Raises subprocess.CalledProcessError if the
    command exits with a non-zero status.
    """
    # Don't log secrets.
    # log("Juju leader-set '%s'" % (settings), level=DEBUG)
    cmd = ['leader-set']
    # Work on a copy so the caller's dict does not pick up the kwargs.
    settings = dict(settings or {})
    settings.update(kwargs)
    for k, v in settings.items():
        if v is None:
            cmd.append('{}='.format(k))
        else:
            cmd.append('{}={}'.format(k, v))
    subprocess.check_call(cmd)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def payload_register(ptype, klass, pid):
    """Tell Juju that a payload has been started.

    Runs ``payload-register <ptype> <klass> <pid>``. Per the original
    documentation, it is used while a hook is running.
    """
    subprocess.check_call(['payload-register', ptype, klass, pid])
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def payload_unregister(klass, pid):
    """Tell Juju that a payload has been manually stopped.

    Runs ``payload-unregister <klass> <pid>``. Per the original
    documentation, it is used while a hook is running, and the class and id
    must match a payload previously registered with payload-register.
    """
    subprocess.check_call(['payload-unregister', klass, pid])
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def pip_install(package, fatal=False, upgrade=False, venv=None, **options):
    """Install a python package.

    :param package: a package name or a list of package names.
    :param fatal: accepted but not used by this function.
    :param upgrade: when true, ``--upgrade`` is added to the command.
    :param venv: optional virtualenv directory. When given, its ``bin/pip``
        is run as a subprocess; otherwise the command is handed to
        pip_execute().
    :param options: extra options, filtered through parse_options() against
        'proxy', 'src', 'log' and 'index-url'.
    """
    if venv:
        command = [os.path.join(venv, 'bin/pip'), "install"]
    else:
        command = ["install"]

    supported = ('proxy', 'src', 'log', 'index-url', )
    command.extend(parse_options(options, supported))

    if upgrade:
        command.append('--upgrade')

    packages = package if isinstance(package, list) else [package]
    command.extend(packages)

    log("Installing {} package with options: {}".format(package,
                                                        command))
    runner = subprocess.check_call if venv else pip_execute
    runner(command)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def pip_create_virtualenv(path=None):
    """Create an isolated Python environment.

    Installs the ``python-virtualenv`` package, then runs ``virtualenv`` on
    ``path`` (or ``<charm_dir>/venv`` when ``path`` is falsy) unless that
    directory already exists.
    """
    apt_install('python-virtualenv')

    venv_path = path if path else os.path.join(charm_dir(), 'venv')

    if os.path.exists(venv_path):
        return
    subprocess.check_call(['virtualenv', venv_path])
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def install_ca_cert(ca_cert):
    """Install ``ca_cert`` as a system CA certificate.

    A falsy ``ca_cert`` is ignored. Otherwise the text is written to
    /usr/local/share/ca-certificates/keystone_juju_ca_cert.crt and
    ``update-ca-certificates --fresh`` is run.
    """
    if not ca_cert:
        return
    cert_path = '/usr/local/share/ca-certificates/keystone_juju_ca_cert.crt'
    with open(cert_path, 'w') as crt:
        crt.write(ca_cert)
    subprocess.check_call(['update-ca-certificates', '--fresh'])
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def add_bridge(name, datapath_type=None):
    ''' Add the named bridge to openvswitch.

    Runs ``ovs-vsctl -- --may-exist add-br <name>``. When ``datapath_type``
    is not None, a ``set bridge <name> datapath_type=<type>`` command is
    added to the same invocation.
    '''
    log('Creating bridge {}'.format(name))
    create = ["ovs-vsctl", "--", "--may-exist", "add-br", name]
    if datapath_type is None:
        subprocess.check_call(create)
        return
    set_type = ['--', 'set', 'bridge', name,
                'datapath_type={}'.format(datapath_type)]
    subprocess.check_call(create + set_type)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def del_bridge(name):
    ''' Delete the named bridge from openvswitch.

    Runs ``ovs-vsctl -- --if-exists del-br <name>``.
    '''
    log('Deleting bridge {}'.format(name))
    cmd = ["ovs-vsctl", "--", "--if-exists", "del-br", name]
    subprocess.check_call(cmd)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def del_bridge_port(name, port):
    ''' Delete a port from the named openvswitch bridge.

    Removes the port with ``ovs-vsctl -- --if-exists del-port``, then uses
    ``ip link set`` to bring the interface down and turn promiscuous mode
    off. The first command that fails raises CalledProcessError and the
    remaining ones are not run.
    '''
    log('Deleting port {} from bridge {}'.format(port, name))
    steps = (
        ["ovs-vsctl", "--", "--if-exists", "del-port", name, port],
        ["ip", "link", "set", port, "down"],
        ["ip", "link", "set", port, "promisc", "off"],
    )
    for cmd in steps:
        subprocess.check_call(cmd)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def set_manager(manager):
    ''' Set the controller for the local openvswitch.

    Runs ``ovs-vsctl set-manager ssl:<manager>``.
    '''
    log('Setting manager for local ovs to {}'.format(manager))
    target = 'ssl:{}'.format(manager)
    subprocess.check_call(['ovs-vsctl', 'set-manager', target])