Python subprocess 模块,run() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用subprocess.run()

项目:pygrunt    作者:elementbound    | 项目源码 | 文件源码
def preprocess_source(self, in_file, additional_args=[]):
        import subprocess

        self._args.extend(self._build_compiler_flags())
        self._args.extend(additional_args)

        result = subprocess.run(self._args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)

        if result.returncode == 0:
            return result.stdout
        else:
            if result.stderr:
                Style.error('Preprocess failed: ')
                print(result.stderr)

            return ''
项目:docklet    作者:unias    | 项目源码 | 文件源码
def del_addr(linkname, address):
        try:
            subprocess.run(['ip', 'address', 'del', address, 'dev', str(linkname)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
            return [True, str(linkname)]
        except subprocess.CalledProcessError as suberror:
            return [False, "delete address failed : %s" % suberror.stdout.decode('utf-8')]


# ovs-vsctl list-br
# ovs-vsctl br-exists <Bridge>
# ovs-vsctl add-br <Bridge>
# ovs-vsctl del-br <Bridge>
# ovs-vsctl list-ports <Bridge>
# ovs-vsctl del-port <Bridge> <Port>
# ovs-vsctl add-port <Bridge> <Port> -- set interface <Port> type=gre options:remote_ip=<RemoteIP>
# ovs-vsctl add-port <Bridge> <Port> tag=<ID> -- set interface <Port> type=internal
# ovs-vsctl port-to-br <Port>
# ovs-vsctl set Port <Port> tag=<ID>
# ovs-vsctl clear Port <Port> tag
项目:kubernaut    作者:datawire    | 项目源码 | 文件源码
def build_package(builder_image, package_type, version, out_dir, dependencies):
    """
    Build a deb or RPM package using a fpm-within-docker Docker image.

    :param package_type str: "rpm" or "deb".
    :param version str: The package version.
    :param out_dir Path: Directory where package will be output.
    :param dependencies list: package names the resulting package should depend
        on.
    """
    run([
        "docker", "run", "--rm", "-e", "PACKAGE_VERSION=" + version,
        "-e", "PACKAGE_TYPE=" + package_type,
        "-v", "{}:/build-inside:rw".format(THIS_DIRECTORY),
        "-v", "{}:/source:rw".format(THIS_DIRECTORY.parent),
        "-v", str(out_dir) + ":/out", "-w", "/build-inside", builder_image,
        "/build-inside/build-package.sh", *dependencies
    ],
        check=True)
项目:err-fabric    作者:netquity    | 项目源码 | 文件源码
def execute_task(host, tasks):
        """Call Fabric to execute tasks against a host

        Return: CompletedProcess instance
        """
        # TODO: add support for groups, multiple hosts
        # TODO: add support for providing input data from team files
        return subprocess.run(
            [
                PYTHON2_PATH,
                FABRIC_PATH,
                '--abort-on-prompts',
                '--hosts=%s' % host,
                '--fabfile=%s' % FABFILE_PATH,
                *tasks,
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # Combine out/err into stdout; stderr will be None
            universal_newlines=True,
            check=True,
        )
项目:foremast    作者:gogoair    | 项目源码 | 文件源码
def _upload_artifacts_to_version(self):
        """Recursively upload directory contents to S3."""
        if not os.listdir(self.artifact_path) or not self.artifact_path:
            raise S3ArtifactNotFound

        uploaded = False
        if self.s3props.get("content_metadata"):
            LOG.info("Uploading in multiple parts to set metadata")
            uploaded = self.content_metadata_uploads()

        if not uploaded:
            cmd = 'aws s3 sync {} {} --delete --exact-timestamps --profile {}'.format(self.artifact_path,
                                                                                      self.s3_version_uri, self.env)
            result = subprocess.run(cmd, check=True, shell=True, stdout=subprocess.PIPE)
            LOG.debug("Upload Command Ouput: %s", result.stdout)

        LOG.info("Uploaded artifacts to %s bucket", self.bucket)
项目:foremast    作者:gogoair    | 项目源码 | 文件源码
def _sync_to_uri(self, uri):
        """Copy and sync versioned directory to uri in S3.

        Args:
            uri (str): S3 URI to sync version to.
        """
        cmd_cp = 'aws s3 cp {} {} --recursive --profile {}'.format(self.s3_version_uri, uri, self.env)
        # AWS CLI sync does not work as expected bucket to bucket with exact timestamp sync.
        cmd_sync = 'aws s3 sync {} {} --delete --exact-timestamps --profile {}'.format(
            self.s3_version_uri, uri, self.env)

        cp_result = subprocess.run(cmd_cp, check=True, shell=True, stdout=subprocess.PIPE)
        LOG.debug("Copy to %s before sync output: %s", uri, cp_result.stdout)
        LOG.info("Copied version %s to %s", self.version, uri)

        sync_result = subprocess.run(cmd_sync, check=True, shell=True, stdout=subprocess.PIPE)
        LOG.debug("Sync to %s command output: %s", uri, sync_result.stdout)
        LOG.info("Synced version %s to %s", self.version, uri)
项目:manubot    作者:greenelab    | 项目源码 | 文件源码
def test_example_manuscript(manuscript):
    """
    Test command line execution of manubot to build an example manuscript.
    """
    manuscript_dir = directory.joinpath('manuscripts', manuscript)
    args = [
        'manubot',
        '--log-level', 'INFO',
        '--content-directory', manuscript_dir.joinpath('content'),
        '--output-directory', manuscript_dir.joinpath('output'),
    ]
    if manuscript == 'variables':
        args.extend([
            '--template-variables-path',
            manuscript_dir.joinpath('content/template-variables.json'),
        ])
    process = subprocess.run(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    print(process.args)
    print(process.stderr.decode())
    assert process.returncode == 0
项目:hlsclt    作者:benjmarshall    | 项目源码 | 文件源码
def do_start_build_stuff(ctx):
    config = ctx.obj.config
    solution_num = ctx.obj.solution_num
    try:
        file = click.open_file("run_hls.tcl","w")
        file.write("open_project " + config["project_name"] + "\n")
        file.write("set_top " + config["top_level_function_name"] + "\n")
        for src_file in config["src_files"]:
            file.write("add_files " + config["src_dir_name"] + "/" + src_file + "\n")
        for tb_file in config["tb_files"]:
            file.write("add_files -tb " + config["tb_dir_name"] + "/" + tb_file + "\n")
        if ctx.params['keep']:
            file.write("open_solution -reset \"solution" + str(solution_num) + "\"" + "\n")
        else:
            file.write("open_solution \"solution" + str(solution_num) + "\"" + "\n")
        file.write("set_part \{" + config["part_name"] + "\}" + "\n")
        file.write("create_clock -period " + config["clock_period"] + " -name default" + "\n")
        return file
    except OSError:
        click.echo("Woah! Couldn't create a Tcl run file in the current folder!")
        raise click.Abort()

# Function to write a default build into the HLS Tcl build script.
项目:hlsclt    作者:benjmarshall    | 项目源码 | 文件源码
def build_end_callback(ctx,sub_command_returns,keep,report):
    # Catch the case where no subcommands have been issued and offer a default build
    if not sub_command_returns:
        if click.confirm("No build stages specified, would you like to run a default sequence using all the build stages?", abort=True):
            do_default_build(ctx)
    ctx.obj.file.write("exit" + "\n")
    ctx.obj.file.close()
    # Call the Vivado HLS process
    hls_processs = subprocess.run(["vivado_hls", "-f", "run_hls.tcl"])
    # Check return status of the HLS process.
    if hls_processs.returncode < 0:
        raise click.Abort()
    elif hls_processs.returncode > 0:
        click.echo("Warning: HLS Process returned an error, skipping report opening!")
        raise click.Abort()
    else:
        do_end_build_stuff(ctx,sub_command_returns,report)

# csim subcommand
项目:myDotFiles    作者:GuidoFe    | 项目源码 | 文件源码
def volume_plugin(colors):
    path=os.path.realpath(__file__)
    path=os.path.dirname(path)
    p=subprocess.run(['sh',path+"/volume_level.sh"], stdout=subprocess.PIPE)
    string=''
    for c in str(p.stdout):
        if c.isnumeric():
            string+=c
    level=int(string)
    p=subprocess.run(['sh', path+"/volume_muted.sh"], stdout=subprocess.PIPE)
    if str(p.stdout)[2]=='y':
        muted=True
    else:
        muted=False
    string='%{F'+colors["lwhite"]+'}'
    if level>0 and not muted:
        if level>=50:
            string+='\uf028' 
        else:
            string+='\uf027'
    else:
        string+='\uf026'
    string+="  %{F"+colors["lwhite"]+"}%3.0f"%level+'%'
    return string
项目:doctr    作者:drdoctr    | 项目源码 | 文件源码
def generate_ssh_key(note, keypath='github_deploy_key'):
    """
    Generates an SSH deploy public and private key.

    Returns the public key as a str.
    """
    p = subprocess.run(['ssh-keygen', '-t', 'rsa', '-b', '4096', '-C', note,
        '-f', keypath, '-N', ''])

    if p.returncode:
        raise RuntimeError("SSH key generation failed")

    with open(keypath + ".pub") as f:
        key = f.read()

    os.remove(keypath + ".pub")

    return key
项目:doctr    作者:drdoctr    | 项目源码 | 文件源码
def guess_github_repo():
    """
    Guesses the github repo for the current directory

    Returns False if no guess can be made.
    """
    p = subprocess.run(['git', 'ls-remote', '--get-url', 'origin'],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False)
    if p.stderr or p.returncode:
        return False

    url = p.stdout.decode('utf-8').strip()
    m = GIT_URL.fullmatch(url)
    if not m:
        return False
    return m.group(1)
项目:doctr    作者:drdoctr    | 项目源码 | 文件源码
def checkout_deploy_branch(deploy_branch, canpush=True):
    """
    Checkout the deploy branch, creating it if it doesn't exist.
    """
    # Create an empty branch with .nojekyll if it doesn't already exist
    create_deploy_branch(deploy_branch, push=canpush)
    remote_branch = "doctr_remote/{}".format(deploy_branch)
    print("Checking out doctr working branch tracking", remote_branch)
    clear_working_branch()
    # If gh-pages doesn't exist the above create_deploy_branch() will create
    # it we can push, but if we can't, it won't and the --track would fail.
    if run(['git', 'rev-parse', '--verify', remote_branch], exit=False) == 0:
        extra_args = ['--track', remote_branch]
    else:
        extra_args = []
    run(['git', 'checkout', '-b', DOCTR_WORKING_BRANCH] + extra_args)
    print("Done")

    return canpush
项目:doctr    作者:drdoctr    | 项目源码 | 文件源码
def push_docs(deploy_branch='gh-pages', retries=3):
    """
    Push the changes to the branch named ``deploy_branch``.

    Assumes that :func:`setup_GitHub_push` has been run and returned True, and
    that :func:`commit_docs` has been run. Does not push anything if no changes
    were made.

    """

    code = 1
    while code and retries:
        print("Pulling")
        code = run(['git', 'pull', '-s', 'recursive', '-X', 'ours',
            'doctr_remote', deploy_branch], exit=False)
        print("Pushing commit")
        code = run(['git', 'push', '-q', 'doctr_remote',
            '{}:{}'.format(DOCTR_WORKING_BRANCH, deploy_branch)], exit=False)
        if code:
            retries -= 1
            print("Push failed, retrying")
            time.sleep(1)
        else:
            return
    sys.exit("Giving up...")
项目:python-everywhere    作者:wdv4758h    | 项目源码 | 文件源码
def run_cmd(cmd, quiet=False):
    if not quiet:
        logging.info('command: {}'.format(cmd))

    # use shlex to keep quoted substrings
    result = run(shlex.split(cmd), stdout=PIPE, stderr=PIPE)
    stdout = result.stdout.strip().decode()
    stderr = result.stderr.strip().decode()

    if stdout and not quiet:
        logging.debug(stdout)

    if stderr and not quiet:
        logging.warning(stderr)

    return result.stdout.strip()
项目:zinc    作者:PressLabs    | 项目源码 | 文件源码
def meld(got, expected):
    if got == expected:
        return
    import inspect
    call_frame = inspect.getouterframes(inspect.currentframe(), 2)
    test_name = call_frame[1][3]
    from pprint import pformat
    import os
    from os import path
    os.makedirs(test_name, exist_ok=True)
    got_fn = path.join(test_name, 'got')
    expected_fn = path.join(test_name, 'expected')
    with open(got_fn, 'w') as got_f, open(expected_fn, 'w') as expected_f:
        got_f.write(pformat(got))
        expected_f.write(pformat(expected))
    import subprocess
    subprocess.run(['meld', got_fn, expected_fn])
项目:neoreader    作者:MaxwellBo    | 项目源码 | 文件源码
def call_say(self, txt: str, speed=None, pitch=None, literal=False):
        if self.get_option(self.Options.USE_ESPEAK):
            args = ["espeak"]
            if pitch:
                args += ["-p", str(pitch)]
            if speed:
                args += ["-s", str(speed)]
            if literal:
                txt = " ".join(txt)
            args.append(txt)
        else:
            args = ["say"]
            if pitch:
                txt = f"[[ pbas +{pitch}]] {txt}"
            if speed:
                args += ["-r", str(speed)]
            if literal:
                txt = f"[[ char LTRL ]] {txt}"
            args.append(txt)

        if self.enabled:
            logger.debug(f"Saying '{txt}'")
            subprocess.run(args)
项目:atoolbox    作者:liweitianux    | 项目源码 | 文件源码
def main():
    parser = argparse.ArgumentParser(
        description="Backup system/data using dar and par2")
    parser.add_argument("-c", "--config", dest="config", required=True,
                        help="configuration file for dar and archive. " +
                        "NOTE: the backup archive will be placed under " +
                        "the same directory as this configuration file")
    parser.add_argument("-n", "--dry-run", dest="dry_run", action="store_true",
                        help="dry run, do not perform any action")
    parser.add_argument("-v", "--verbose", dest="verbose", action="store_true",
                        help="show verbose information")
    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.INFO)

    settings = DarSettings(args.config, verbose=args.verbose,
                           dry_run=args.dry_run)
    dar = DarBackup(settings)
    dar.run(dry_run=args.dry_run)
项目:mooq    作者:jeremyarr    | 项目源码 | 文件源码
def run(self):
        '''
        Restarts the RabbitMQ broker using a method derived from the TEST_DISTRIBUTION
        environmental variable.


        If TEST_DISTRIBUTION=="arch", will try to restart rabbitmq using the linux ``systemctl``
        command.

        If TEST_DISTRIBUTION=="ubuntu", will try to restart rabbitmq using the linux ``service``
        command.

        Will wait for 20 seconds after restarting before returning.

        :raises ValueError: if TEST_DISTRIBUTION environmental variable not found

        .. note:: the user who invokes this method will likely require sudo access to the linux commands.
            This can be provided by editing the sudoers file.
        '''

        await self.loop.run_in_executor(None, self._run)
        await asyncio.sleep(20)
项目:NordVPN-NetworkManager    作者:Chadsr    | 项目源码 | 文件源码
def get_vpn_connections():
    try:
        output = subprocess.run(['nmcli', '--mode', 'tabular', '--terse', '--fields', 'TYPE,NAME', 'connection', 'show'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output.check_returncode()

        lines = output.stdout.decode('utf-8').split('\n')

        vpn_connections = []
        for line in lines:
            if line:
                elements = line.strip().split(':')

                if (elements[0] == 'vpn'):
                    vpn_connections.append(elements[1])

        return vpn_connections

    except subprocess.CalledProcessError:
        error = utils.format_std_string(output.stderr)
        logger.error(error)
        return False
项目:NordVPN-NetworkManager    作者:Chadsr    | 项目源码 | 文件源码
def get_interfaces(wifi=True, ethernet=True):
    try:
        output = subprocess.run(['nmcli', '--mode', 'tabular', '--terse', '--fields', 'TYPE,DEVICE', 'device', 'status'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output.check_returncode()

        lines = output.stdout.decode('utf-8').split('\n')

        interfaces = []
        for line in lines:
            if line:
                elements = line.strip().split(':')

                if (wifi and elements[0] == 'wifi') or (ethernet and elements[0] == 'ethernet'):
                    interfaces.append(elements[1])

        return interfaces

    except subprocess.CalledProcessError:
        error = utils.format_std_string(output.stderr)
        logger.error(error)
        return False

    except Exception as ex:
        logger.error(ex)
        return False
项目:NordVPN-NetworkManager    作者:Chadsr    | 项目源码 | 文件源码
def get_num_processes(num_servers):
    # Since each process is not resource heavy and simply takes time waiting for pings, maximise the number of processes (within constraints of the current configuration)

    # Maximum open file descriptors of current configuration
    soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)

    # Find how many file descriptors are already in use by the parent process
    ppid = os.getppid()
    used_file_descriptors = int(subprocess.run('ls -l /proc/' + str(ppid) + '/fd | wc -l', shell=True, stdout=subprocess.PIPE).stdout.decode('utf-8'))

    # Max processes is the number of file descriptors left, before the sof limit (configuration maximum) is reached
    max_processes = int((soft_limit - used_file_descriptors) / 2)

    if num_servers > max_processes:
        return max_processes
    else:
        return num_servers
项目:website-fingerprinting    作者:AxelGoetz    | 项目源码 | 文件源码
def extract_all_features(save_dir, data_dir=DATA_DIR, extension=".cell"):
    from naive_bayes import extract_nb_features
    from random_forest import extract_rf_features
    from svc1 import extract_svc1_features
    from svc2 import extract_svc2_features
    import subprocess

    create_dir_if_not_exists(save_dir + '/knn_cells/')
    subprocess.run([
        'go', 'run', dirname + '/kNN.go', '-folder', data_dir + '/',
        '-new_path', save_dir + '/knn_cells/', '-extension', extension]
    )

    # extract_features(extract_nb_features, save_dir + '/nb_cells', data_dir=data_dir, extension=extension, model_name="naive bayes")
    extract_features(extract_rf_features, save_dir + '/rf_cells', data_dir=data_dir, extension=extension, model_name="random forest")
    extract_features(extract_svc1_features, save_dir + '/svc1_cells', data_dir=data_dir, extension=extension, model_name="svc1")
    extract_features(extract_svc2_features, save_dir + '/svc2_cells', data_dir=data_dir, extension=extension, model_name="svc2")

    stdout.write("Finished extracting features\n")
项目:scm-workbench    作者:barry-scott    | 项目源码 | 文件源码
def fixRPath( self, filename ):
        all_rpaths = self.getRPaths( filename )

        all_replacements = []
        print( 'Checking RPATH for %s' % (filename,) )
        for rpath in all_rpaths:
            print( '    Looking at RPATH %s' % (rpath,) )
            for all_orig_rpaths, replacement in self.all_rpath_replacements:
                all_replacements.append( replacement )
                for orig in all_orig_rpaths:
                    if rpath == orig:
                        print( '    Need to replace %s with %s' % (rpath, replacement) )
                        subprocess.run( ['install_name_tool', '-rpath', rpath, replacement, filename], check=True )

        all_rpaths = self.getRPaths( filename )
        for rpath in all_rpaths:
            if rpath not in all_replacements:
                print( '    Delete unused rpath %s' % (rpath,) )
                subprocess.run( ['install_name_tool', '-delete_rpath', rpath, filename], check=True )
项目:scm-workbench    作者:barry-scott    | 项目源码 | 文件源码
def getRPaths( self, filename ):
        all_rpaths = []
        res = subprocess.run( ['otool', '-l', filename], stdout=subprocess.PIPE, universal_newlines=True )
        state = self.ST_IDLE
        for line in res.stdout.split('\n'):
            words = line.strip().split()
            if state == self.ST_IDLE:
                if words == ['cmd','LC_RPATH']:
                    state = self.ST_RPATH

            elif state == self.ST_RPATH:
                if words[0:1] == ['path']:
                    path = words[1]
                    all_rpaths.append( path )
                    state = self.ST_IDLE

        return all_rpaths
项目:scm-workbench    作者:barry-scott    | 项目源码 | 文件源码
def getDylibs( self, filename ):
        all_dylibs = []
        res = subprocess.run( ['otool', '-l', filename], stdout=subprocess.PIPE, universal_newlines=True )
        state = self.ST_IDLE
        for line in res.stdout.split('\n'):
            words = line.strip().split()
            if state == self.ST_IDLE:
                if words == ['cmd','LC_LOAD_DYLIB']:
                    state = self.ST_DYNLIB

            elif state == self.ST_DYNLIB:
                if words[0:1] == ['name']:
                    path = words[1]
                    all_dylibs.append( path )
                    state = self.ST_IDLE

        return all_dylibs
项目:HTSOHM-dev    作者:akaija    | 项目源码 | 文件源码
def write_raspa_file(filename, uuid, simulation_config):
    """Writes RASPA input file for calculating helium void fraction.

    Args:
        filename (str): path to input file.
        run_id (str): identification string for run.
        material_id (str): uuid for material.

    Writes RASPA input-file.

    """
    # Load simulation parameters from config
    values = {
            'NumberOfCycles'                : simulation_config['simulation_cycles'],
            'FrameworkName'                 : uuid}

    # Load template and replace values
    input_data = load_and_subs_template('helium_void_fraction.input', values)

    # Write simulation input-file
    with open(filename, "w") as raspa_input_file:
        raspa_input_file.write(input_data)
项目:HTSOHM-dev    作者:akaija    | 项目源码 | 文件源码
def write_raspa_file(filename, uuid, simulation_config):
    """Writes RASPA input file for calculating surface area.

    Args:
        filename (str): path to input file.
        run_id (str): identification string for run.
        material_id (str): uuid for material.

    Writes RASPA input-file.

    """
    # Load simulation parameters from config
    values = {
            'NumberOfCycles'                : simulation_config['simulation_cycles'],
            'FrameworkName'                 : uuid}

    # Load template and replace values
    input_data = load_and_subs_template('surface_area.input', values)

    # Write simulation input-file
    with open(filename, "w") as raspa_input_file:
        raspa_input_file.write(input_data)
项目:core-workflow    作者:python    | 项目源码 | 文件源码
def flush_git_rm_files():
    if git_rm_files:
        try:
            subprocess.run(["git", "rm", "-f", *git_rm_files], stdout=subprocess.PIPE, stderr=subprocess.PIPE).check_returncode()
        except subprocess.CalledProcessError:
            pass

        # clean up
        for path in git_rm_files:
            try:
                os.unlink(path)
            except FileNotFoundError:
                pass

        git_rm_files.clear()


# @subcommand
# def noop():
#     "Do-nothing command.  Used for blurb smoke-testing."
#     pass
项目:proxmox-tools    作者:FredHutch    | 项目源码 | 文件源码
def run_chef_knife(host):
    knife = "knife bootstrap --no-host-key-verify " \
        "--ssh-user root --ssh-identity-file %s/.ssh/id_rsa_prox " \
        "--environment=scicomp-env-compute " \
        '--server-url "https://chef.fhcrc.org/organizations/cit" ' \
        "--run-list 'role[cit-base]','role[scicomp-base]' " \
        "--node-name %s " \
        "%s" % (homedir,host,host)
    if host == 'hostname': 
        print('you can also execute this knife command manually:')
        print('************************************')
        print(knife)
        print('************************************')
    else:
        if os.path.exists('%s/.chef' % homedir):
            print('*** executing knife command:')
            print(knife)
            ret = subprocess.run(knife, shell=True)
        else:
            print ('chef/knife config dir %s/.chef does not exist.' % homedir)
项目:osm-wikidata    作者:EdwardBetts    | 项目源码 | 文件源码
def load_into_pgsql(self, capture_stderr=True):
        if not os.path.exists(self.overpass_filename):
            return 'no data from overpass to load with osm2pgsql'

        if os.stat(self.overpass_filename).st_size == 0:
            return 'no data from overpass to load with osm2pgsql'

        cmd = self.osm2pgsql_cmd()

        if not capture_stderr:
            p = subprocess.run(cmd,
                               env={'PGPASSWORD': current_app.config['DB_PASS']})
            return
        p = subprocess.run(cmd,
                           stderr=subprocess.PIPE,
                           env={'PGPASSWORD': current_app.config['DB_PASS']})
        if p.returncode != 0:
            if b'Out of memory' in p.stderr:
                return 'out of memory'
            else:
                return p.stderr.decode('utf-8')
项目:osm-wikidata    作者:EdwardBetts    | 项目源码 | 文件源码
def chunk(self):
        chunk_size = utils.calc_chunk_size(self.area_in_sq_km)
        chunks = self.chunk_n(chunk_size)

        print('chunk size:', chunk_size)

        files = []
        for num, chunk in enumerate(chunks):
            filename = self.chunk_filename(num, len(chunks))
            # print(num, q.count(), len(tags), filename, list(tags))
            full = os.path.join('overpass', filename)
            files.append(full)
            if os.path.exists(full):
                continue
            oql = self.oql_for_chunk(chunk, include_self=(num == 0))

            r = overpass.run_query_persistent(oql)
            if not r:
                print(oql)
            assert r
            open(full, 'wb').write(r.content)

        cmd = ['osmium', 'merge'] + files + ['-o', self.overpass_filename]
        print(' '.join(cmd))
        subprocess.run(cmd)
项目:pynnotator    作者:raonyguimaraes    | 项目源码 | 文件源码
def run(self):

        tstart = datetime.now()
        msg = "{} Starting validator: {}".format(tstart, self.vcf_file)

        self.log(msg)

        std = self.validate()

        tend = datetime.now()
        annotation_time = tend - tstart

        msg = "{} Finished validator, it took: {}".format(tend, annotation_time)
        self.log(msg)

        # print(tend, 'Finished validator, it took: ', annotation_time)

        return std

    # Validate vcf file with Vcftools
项目:pygrunt    作者:elementbound    | 项目源码 | 文件源码
def _get_gitinfo():
    import pygrunt.platform as platform
    import subprocess
    from pathlib import Path

    git = platform.current.find_executable('git')
    if git is None:
        # No git installed; assume we're on master
        return ('master', '')

    cwd = str(Path(__file__).parent)

    args = [git, 'rev-parse', '--abbrev-ref', 'HEAD']
    result = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, cwd=cwd, universal_newlines=True)
    if result.returncode != 0:
        # Quietly return defaults on fail
        return ('master', '')

    branch = result.stdout

    args = [git, 'rev-parse', 'HEAD']
    result = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, cwd=cwd, universal_newlines=True)
    if result.returncode != 0:
        # Quietly return defaults on fail
        return ('master', '')

    commit = result.stdout

    return (branch.strip(), commit.strip())
项目:pygrunt    作者:elementbound    | 项目源码 | 文件源码
def compile_object(self, in_file, out_file):
        import subprocess

        in_file = Path(in_file)
        out_file = Path(out_file)

        # Skip compile if RecompileStrategy says so
        # Since preprocess_source ( possibly used by recompile ) also modifies self._args,
        # we gotta back it up
        # TODO: Maybe use something more elegant than self._args?
        old_args = self._args
        if out_file.is_file():
            if not self.recompile.should_recompile(str(in_file)):
                # Style.info('Nothing to do with', in_file)
                return True
        self._args = old_args

        Path(out_file).parent.mkdir(parents=True, exist_ok=True)
        self._args.extend(self._build_compiler_flags())
        result = subprocess.run(self._args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)

        # TODO: do something useful with output
        if result.stdout:
            print(result.stdout)

        if result.stderr:
            print(result.stderr)

        return result.returncode == 0
项目:pygrunt    作者:elementbound    | 项目源码 | 文件源码
def link_executable(self, in_files, out_file):
        import subprocess

        Path(out_file).parent.mkdir(parents=True, exist_ok=True)
        self._args.extend(self._build_linker_flags())
        result = subprocess.run(self._args)
        return result.returncode == 0
项目:pygrunt    作者:elementbound    | 项目源码 | 文件源码
def link_library(self, in_files, out_file):
        import subprocess

        Path(out_file).parent.mkdir(parents=True, exist_ok=True)
        self._args.extend(self._build_linker_flags())
        result = subprocess.run(self._args)
        return result.returncode == 0
项目:MPIS    作者:KernelPanicBlog    | 项目源码 | 文件源码
def clear():
    subprocess.run(["clear"])
项目:MPIS    作者:KernelPanicBlog    | 项目源码 | 文件源码
def sleep(_time):
    """genera una pausa la cantidad de segundos indicados por _time
    Args:
        _time (int): tiempo de la pausa en segundos
    """
    subprocess.run(["sleep", str(_time)+"s"])
项目:docklet    作者:unias    | 项目源码 | 文件源码
def sys_run(command,check=False):
    Ret = subprocess.run(command, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, shell=True, check=check)
    return Ret
项目:docklet    作者:unias    | 项目源码 | 文件源码
def list_links():
        try:
            ret = subprocess.run(['ip', 'link', 'show'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
            links = ipcontrol.parse(ret.stdout.decode('utf-8'))
            return [True, list(links.keys())]
        except subprocess.CalledProcessError as suberror:
            return [False, "list links failed : %s" % suberror.stdout.decode('utf-8')]
项目:docklet    作者:unias    | 项目源码 | 文件源码
def link_exist(linkname):
        try:
            subprocess.run(['ip', 'link', 'show', 'dev', str(linkname)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
            return True
        except subprocess.CalledProcessError:
            return False
项目:docklet    作者:unias    | 项目源码 | 文件源码
def link_info(linkname):
        try:
            ret = subprocess.run(['ip', 'address', 'show', 'dev', str(linkname)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
            return [True, ipcontrol.parse(ret.stdout.decode('utf-8'))[str(linkname)]]
        except subprocess.CalledProcessError as suberror:
            return [False, "get link info failed : %s" % suberror.stdout.decode('utf-8')]
项目:docklet    作者:unias    | 项目源码 | 文件源码
def link_state(linkname):
        try:
            ret = subprocess.run(['ip', 'link', 'show', 'dev', str(linkname)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
            return [True, ipcontrol.parse(ret.stdout.decode('utf-8'))[str(linkname)]['state']]
        except subprocess.CalledProcessError as suberror:
            return [False, "get link state failed : %s" % suberror.stdout.decode('utf-8')]
项目:docklet    作者:unias    | 项目源码 | 文件源码
def down_link(linkname):
        try:
            subprocess.run(['ip', 'link', 'set', 'dev', str(linkname), 'down'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
            return [True, str(linkname)]
        except subprocess.CalledProcessError as suberror:
            return [False, "set link down failed : %s" % suberror.stdout.decode('utf-8')]
项目:docklet    作者:unias    | 项目源码 | 文件源码
def add_addr(linkname, address):
        try:
            subprocess.run(['ip', 'address', 'add', address, 'dev', str(linkname)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
            return [True, str(linkname)]
        except subprocess.CalledProcessError as suberror:
            return [False, "add address failed : %s" % suberror.stdout.decode('utf-8')]
项目:docklet    作者:unias    | 项目源码 | 文件源码
def list_bridges():
        try:
            ret = subprocess.run(['ovs-vsctl', 'list-br'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
            return [True, ret.stdout.decode('utf-8').split()]
        except subprocess.CalledProcessError as suberror:
            return [False, "list bridges failed : %s" % suberror.stdout.decode('utf-8')]
项目:docklet    作者:unias    | 项目源码 | 文件源码
def bridge_exist(bridge):
        try:
            subprocess.run(['ovs-vsctl', 'br-exists', str(bridge)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
            return True
        except subprocess.CalledProcessError:
            return False
项目:docklet    作者:unias    | 项目源码 | 文件源码
def add_bridge(bridge):
        try:
            subprocess.run(['ovs-vsctl', '--may-exist', 'add-br', str(bridge)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
            return [True, str(bridge)]
        except subprocess.CalledProcessError as suberror:
            return [False, "add bridge failed : %s" % suberror.stdout.decode('utf-8')]
项目:docklet    作者:unias    | 项目源码 | 文件源码
def del_bridge(bridge):
        try:
            subprocess.run(['ovs-vsctl', 'del-br', str(bridge)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
            return [True, str(bridge)]
        except subprocess.CalledProcessError as suberror:
            return [False, "del bridge failed : %s" % suberror.stdout.decode('utf-8')]