Python subprocess 模块,getstatusoutput() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用subprocess.getstatusoutput()

项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_getoutput(self):
        self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
        self.assertEqual(subprocess.getstatusoutput('echo xyzzy'),
                         (0, 'xyzzy'))

        # we use mkdtemp in the next line to create an empty directory
        # under our exclusive control; from that, we can invent a pathname
        # that we _know_ won't exist.  This is guaranteed to fail.
        dir = None
        try:
            dir = tempfile.mkdtemp()
            name = os.path.join(dir, "foo")

            status, output = subprocess.getstatusoutput('cat ' + name)
            self.assertNotEqual(status, 0)
        finally:
            if dir is not None:
                os.rmdir(dir)
项目:SlackBuilds    作者:montagdude    | 项目源码 | 文件源码
def __init__(self):
        """
        Determines whether pkg-config exists on this machine.
        """
        if sys.platform == 'win32':
            self.has_pkgconfig = False
        else:
            try:
                self.pkg_config = os.environ['PKG_CONFIG']
            except KeyError:
                self.pkg_config = 'pkg-config'

            self.set_pkgconfig_path()
            status, output = getstatusoutput(self.pkg_config + " --help")
            self.has_pkgconfig = (status == 0)
            if not self.has_pkgconfig:
                print("IMPORTANT WARNING:")
                print(
                    "    pkg-config is not installed.\n"
                    "    matplotlib may not be able to find some of its dependencies")
项目:SlackBuilds    作者:montagdude    | 项目源码 | 文件源码
def check(self):
        if sys.platform == 'win32':
            check_include_file(get_include_dirs(), 'ft2build.h', 'freetype')
            return 'Using unknown version found on system.'

        status, output = getstatusoutput("freetype-config --ftversion")
        if status == 0:
            version = output
        else:
            version = None

        # Early versions of freetype grep badly inside freetype-config,
        # so catch those cases. (tested with 2.5.3).
        if version is None or 'No such file or directory\ngrep:' in version:
            version = self.version_from_header()

        # pkg_config returns the libtool version rather than the
        # freetype version so we need to explicitly pass the version
        # to _check_for_pkg_config
        return self._check_for_pkg_config(
            'freetype2', 'ft2build.h',
            min_version='2.3', version=version)
项目:SlackBuilds    作者:montagdude    | 项目源码 | 文件源码
def check(self):
        if sys.platform == 'win32':
            check_include_file(get_include_dirs(), 'png.h', 'png')
            return 'Using unknown version found on system.'

        status, output = getstatusoutput("libpng-config --version")
        if status == 0:
            version = output
        else:
            version = None

        try:
            return self._check_for_pkg_config(
                'libpng', 'png.h',
                min_version='1.2', version=version)
        except CheckFailed as e:
            if has_include_file(get_include_dirs(), 'png.h'):
                return str(e) + ' Using unknown version found on system.'
            raise
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_getoutput(self):
        self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
        self.assertEqual(subprocess.getstatusoutput('echo xyzzy'),
                         (0, 'xyzzy'))

        # we use mkdtemp in the next line to create an empty directory
        # under our exclusive control; from that, we can invent a pathname
        # that we _know_ won't exist.  This is guaranteed to fail.
        dir = None
        try:
            dir = tempfile.mkdtemp()
            name = os.path.join(dir, "foo")

            status, output = subprocess.getstatusoutput('cat ' + name)
            self.assertNotEqual(status, 0)
        finally:
            if dir is not None:
                os.rmdir(dir)
项目:automatic-repo    作者:WZQ1397    | 项目源码 | 文件源码
def exec2():
    child.expect('#')
    if subprocess.getstatusoutput('id root >> /dev/null 2&1  && echo $?') != 0:
        __newpasswd = 'edong&1310'
        subprocess.getstatusoutput('useradd zach')
        run('passwd zach',events={'(?i)password:':__newpasswd})
        #TODO run EQUAL TO FOLLOW COMMIT!
        '''
        child.expect('password:')
        child.sendline()
        child.expect('password:')
        child.sendline(__newpasswd)
        '''
        child.expect('#')
        child.sendline('su - zach')
    child.expect('$')
    child.sendline('whomai')
项目:Spider    作者:poluo    | 项目源码 | 文件源码
def crawl():
    count = 0
    make_dir('./log')
    while True:
        count+=1
        status, res = subprocess.getstatusoutput('scrapy crawl news')
        if status == 0:
            print(res)
        else:
            print('crawl failed {}'.format(res))
        for file in os.listdir(os.getcwd()):
            if  os.path.isfile(file) and 'res_' in file:
                with open(file,'r') as fobj:
                    try:
                        res = json.load(fobj)
                    except Exception as e:
                        print(e)
                        res = None
                    if res:
                        shutil.copy(file,'./log/{}'.format(file))
                        insert_value(res)
                        print(res)
                os.remove(file)
        print('loop {} finished'.format(count))
        time.sleep(60*30)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_getoutput(self):
        self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
        self.assertEqual(subprocess.getstatusoutput('echo xyzzy'),
                         (0, 'xyzzy'))

        # we use mkdtemp in the next line to create an empty directory
        # under our exclusive control; from that, we can invent a pathname
        # that we _know_ won't exist.  This is guaranteed to fail.
        dir = None
        try:
            dir = tempfile.mkdtemp()
            name = os.path.join(dir, "foo")
            status, output = subprocess.getstatusoutput(
                ("type " if mswindows else "cat ") + name)
            self.assertNotEqual(status, 0)
        finally:
            if dir is not None:
                os.rmdir(dir)
项目:autotokubackup    作者:Percona-Lab    | 项目源码 | 文件源码
def create_backup_directory(self):
        """
        Creating timestamped backup directory.
        :return: Newly created backup directory or Error.
        """
        new_backup_dir = join(self.backupdir,
                              datetime.now().strftime('%Y-%m-%d_%H-%M-%S'))
        try:
            # Creating backup directory
            makedirs(new_backup_dir)
            # Changing owner
            chown_command = "chown mysql:mysql %s" % new_backup_dir
            status, output = subprocess.getstatusoutput(chown_command)
            if status == 0:
                return new_backup_dir
            else:
                print("Could not change owner of backup directory!")
        except Exception as err:
            print("Something went wrong in create_backup_directory(): {}".format(err))
项目:python_learn    作者:jetty-guo    | 项目源码 | 文件源码
def backup_tables(input_date, backup_table_list):
    """????????:??"""
    """DELTA??? DROP ??? ALTER TABLE ??? ALTER ? RENAME ??,????????"""

    for table in backup_table_list:
        print("backup table %s" %table)
        schema, tablename = table.split('.')
        backup_path = config.backup_path.format(date=input_date)+table+".ddl.bak"
        print(backup_path)
        if os.path.exists(backup_path):
            print("backup exists %s" %table)
        else:
            cmd = "db2look -d {edwdb} -i {edwuser} -w {edwpwd} -z {schema} -e -t {tablename} -nofed -o /etl/etldata/script/yatop_update/{date}/backup/{table}.ddl.bak".format(edwdb=config.edwdb, edwuser=config.edwuser, edwpwd=config.edwpwd, schema=schema,tablename=tablename,date=input_date,table=table)
            status, output = subprocess.getstatusoutput(cmd)
            if status:
                print("\033[1;31;40mcreate ddl error %s\033[0m" %table)
                print(output)
                sys.exit(-1)
项目:python_learn    作者:jetty-guo    | 项目源码 | 文件源码
def backup_schedule(input_date):
    """??????JOB_METADATA,? JOB_SEQ"""

    for table in ["JOB_METADATA", "JOB_SEQ"]:
        if table == "JOB_METADATA":
            path = config.job_metadata_path
        elif table == "JOB_SEQ":
            path = config.job_seq_path

        if os.path.exists(path.format(date=input_date)):
            print("backup exists %s" %table)
        else:
            print("export %s..." %table)
            cmd = 'db2 connect to {dwmmdb} user {dwmmuser} using {dwmmpwd} && db2 "export to /etl/etldata/script/yatop_update/{date}/backup/{table}.del of del select * from ETL.{table}"'.format(dwmmdb=config.dwmmdb, dwmmuser=config.dwmmuser, dwmmpwd=config.dwmmpwd, date=input_date, table=table)

            print(cmd)

            status, output = subprocess.getstatusoutput(cmd)

            if status:
                print("\033[1;31;40mexport %s error\033[0m" % table)
                print(output)
                return -1
    return 0
项目:python_learn    作者:jetty-guo    | 项目源码 | 文件源码
def load_schedule(input_date):
    """LOAD JOB_METADATA,? JOB_SEQ"""

    print("load JOB_METADATA...")
    cmd = 'db2 connect to {dwmmdb} user {dwmmuser} using {dwmmpwd} && db2 "load from /etl/etldata/script/yatop_update/{date}/backup/JOB_METADATA.del of del modified by identityoverride replace into ETL.JOB_METADATA"'.format(dwmmdb=config.dwmmdb, dwmmuser=config.dwmmuser, dwmmpwd=config.dwmmpwd, date=input_date)
    print(cmd)
    status, output = subprocess.getstatusoutput(cmd)
    if status:
        print("\033[1;31;40mload JOB_METADATA error, cat JOB_METADATA.error see detail \033[0m")
        with open('JOB_METADATA.error','w') as f:
            f.write(output)
        return -1

    print("load JOB_SEQ...")
    cmd = 'db2 connect to {dwmmdb} user {dwmmuser} using {dwmmpwd} && db2 "load from /etl/etldata/script/yatop_update/{date}/backup/JOB_SEQ.del of del replace into ETL.JOB_SEQ"'.format(dwmmdb=config.dwmmdb, dwmmuser=config.dwmmuser, dwmmpwd=config.dwmmpwd, date=input_date)
    print(cmd)
    status, output = subprocess.getstatusoutput(cmd)
    if status:
        print("\033[1;31;40mload JOB_SEQ error, cat JOB_SEQ.error see detail \033[0m")
        with open('JOB_SEQ.error','w') as f:
            f.write(output)
        return -1
    return 0
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_getoutput(self):
        self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
        self.assertEqual(subprocess.getstatusoutput('echo xyzzy'),
                         (0, 'xyzzy'))

        # we use mkdtemp in the next line to create an empty directory
        # under our exclusive control; from that, we can invent a pathname
        # that we _know_ won't exist.  This is guaranteed to fail.
        dir = None
        try:
            dir = tempfile.mkdtemp()
            name = os.path.join(dir, "foo")
            status, output = subprocess.getstatusoutput(
                ("type " if mswindows else "cat ") + name)
            self.assertNotEqual(status, 0)
        finally:
            if dir is not None:
                os.rmdir(dir)
项目:maas    作者:maas    | 项目源码 | 文件源码
def detect_ipmi():
    # XXX: andreserl 2013-04-09 bug=1064527: Try to detect if node
    # is a Virtual Machine. If it is, do not try to detect IPMI.
    with open('/proc/cpuinfo', 'r') as cpuinfo:
        for line in cpuinfo:
            if line.startswith('model name') and 'QEMU' in line:
                return (False, None)

    (status, output) = subprocess.getstatusoutput('ipmi-locate')
    show_re = re.compile('(IPMI\ Version:) (\d\.\d)')
    res = show_re.search(output)
    if res is None:
        found = glob.glob("/dev/ipmi[0-9]")
        if len(found):
            return (True, "UNKNOWN: %s" % " ".join(found))
        return (False, "")

    # We've detected IPMI, but it doesn't necessarily mean we can access
    # the BMC. Let's test if we can.
    cmd = 'bmc-config --checkout --key-pair=Lan_Conf:IP_Address_Source'
    (status, output) = subprocess.getstatusoutput(cmd)
    if status != 0:
        return (False, "")

    return (True, res.group(2))
项目:watcher    作者:magigo    | 项目源码 | 文件源码
def _get_disk_info(self):
        """
        ??????????

        :return: list, ?????????????
        """
        status, output = getstatusoutput("df -h")
        disk_info_list = []
        if not status:
            lines = output.split('\n')
            for line in lines[1:]:
                line_sp = line.split()
                line_sp = line_sp[:5] + [line_sp[-1]]
                if '/dev/' in line_sp[0]:
                    disk_info_list.append(self.DiskInfo(*line_sp))
        return disk_info_list
项目:conec    作者:cod3licious    | 项目源码 | 文件源码
def log_results(clf_ner, description, filen='', subf=''):
    import os
    if not os.path.exists('data/conll2003_results'):
        os.mkdir('data/conll2003_results')
    if not os.path.exists('data/conll2003_results%s' % subf):
        os.mkdir('data/conll2003_results%s' % subf)
    import subprocess
    print("applying to training set")
    apply_conll2003_ner(clf_ner, 'data/conll2003/ner/eng.train', 'data/conll2003_results%s/eng.out_train.txt' % subf)
    print("applying to test set")
    apply_conll2003_ner(clf_ner, 'data/conll2003/ner/eng.testa', 'data/conll2003_results%s/eng.out_testa.txt' % subf)
    apply_conll2003_ner(clf_ner, 'data/conll2003/ner/eng.testb', 'data/conll2003_results%s/eng.out_testb.txt' % subf)
    # write out results
    with open('data/conll2003_results/output_all_%s.txt' % filen, 'a') as f:
        f.write('%s\n' % description)
        f.write('results on training data\n')
        out = subprocess.getstatusoutput('data/conll2003/ner/bin/conlleval < data/conll2003_results%s/eng.out_train.txt' % subf)[1]
        f.write(out)
        f.write('\n')
        f.write('results on testa\n')
        out = subprocess.getstatusoutput('data/conll2003/ner/bin/conlleval < data/conll2003_results%s/eng.out_testa.txt' % subf)[1]
        f.write(out)
        f.write('\n')
        f.write('results on testb\n')
        out = subprocess.getstatusoutput('data/conll2003/ner/bin/conlleval < data/conll2003_results%s/eng.out_testb.txt' % subf)[1]
        f.write(out)
        f.write('\n')
        f.write('\n')
项目:sublime-text-3-packages    作者:nickjj    | 项目源码 | 文件源码
def _get_nix_font_path(self, name, style):
        try:
            from commands import getstatusoutput
        except ImportError:
            from subprocess import getstatusoutput
        exit, out = getstatusoutput('fc-list "%s:style=%s" file' %
                                    (name, style))
        if not exit:
            lines = out.splitlines()
            if lines:
                path = lines[0].strip().strip(':')
                return path
项目:civet    作者:TheJacksonLaboratory    | 项目源码 | 文件源码
def expandName(fn):
        # Many of the files we'll be testing are executables, and therefore
        # looked up on the path.  Try to look it up there (use the shell).
        # Note: This uses a deprecated interface, but boy is it handy.
        # If the returned status is non-zero, the lookup did not succeed.
        (status, result) = subprocess.getstatusoutput('which ' + fn)
        if status == 0:
            fn = result

        # The file may be a link, or the path may contain one...
        return os.path.realpath(fn)
项目:civet    作者:TheJacksonLaboratory    | 项目源码 | 文件源码
def getDLLs(self):
        if not self.effectivelyReadable():
            return []
        # Uses the system's ldd command to get all the supporting libraries.
        # Note: This uses a deprecated interface, but boy is it handy.
        (status, result) = subprocess.getstatusoutput('ldd ' + self.name)
        if status != 0:
            return []

        parts = result.split('\n')
        processed = []
        for n in range(len(parts)):
            # One dll at a time. Prune the address from the right.
            # If there is a path, it is the second element, use it.
            # If that's empty, use the first element.
            if 'ldd: warning:' in parts[n]:
                continue
            names = parts[n].split(' (')[0]
            names = names.split('=>')
            if len(names) > 1:
                name = names[1].strip()
            else:
                name = ''
            if name == '':
                name = names[0].strip()
            processed.append(name)
        return processed
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def getoutput(cmd, successful_status=(0,), stacklevel=1):
    try:
        status, output = getstatusoutput(cmd)
    except EnvironmentError:
        e = get_exception()
        warnings.warn(str(e), UserWarning, stacklevel=stacklevel)
        return False, output
    if os.WIFEXITED(status) and os.WEXITSTATUS(status) in successful_status:
        return True, output
    return False, output
项目:gprime    作者:GenealogyCollective    | 项目源码 | 文件源码
def intltool_version():
    '''
    Return the version of intltool as a tuple.
    '''
    import subprocess
    if sys.platform == 'win32':
        cmd = ["perl", "-e print qx(intltool-update --version) =~ m/(\d+.\d+.\d+)/;"]
        try:
            ver, ret = subprocess.Popen(cmd ,stdout=subprocess.PIPE,
                stderr=subprocess.PIPE, shell=True).communicate()
            ver = ver.decode("utf-8")
            if ver > "":
                version_str = ver
            else:
                return (0,0,0)
        except:
            return (0,0,0)
    else:
        cmd = 'intltool-update --version 2> /dev/null' # pathological case
        retcode, version_str = subprocess.getstatusoutput(cmd)
        if retcode != 0:
            return None
        cmd = 'intltool-update --version 2> /dev/null | head -1 | cut -d" " -f3'
        retcode, version_str = subprocess.getstatusoutput(cmd)
        if retcode != 0: # unlikely but just barely imaginable, so leave it
            return None
    return tuple([int(num) for num in version_str.split('.')])
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def shell_run(text):
    import subprocess
    val, output = subprocess.getstatusoutput(text)

    if not val:
        style = 'OUTPUT'
    else:
        style = 'ERROR'

    add_scrollback(output, style)
项目:site    作者:alphageek-xyz    | 项目源码 | 文件源码
def get_uwsgi_version():
    status, output = subprocess.getstatusoutput(['uwsgi --version'])
    return None if status else output
项目:mdb    作者:edb-gjengen    | 项目源码 | 文件源码
def check_zone(self, zone, contents):
        zonefile = self.checkzone_dir + '/' + zone.domain_name
        with open(zonefile, 'w') as f:
            f.write(contents)

        return getstatusoutput("%s %s %s" % (self.checkzone_bin, zone.domain_name, zonefile))
项目:macos-st-packages    作者:zce    | 项目源码 | 文件源码
def _get_nix_font_path(self, name, style):
        try:
            from commands import getstatusoutput
        except ImportError:
            from subprocess import getstatusoutput
        exit, out = getstatusoutput('fc-list "%s:style=%s" file' %
                                    (name, style))
        if not exit:
            lines = out.splitlines()
            if lines:
                path = lines[0].strip().strip(':')
                return path
项目:krpcScripts    作者:jwvanderbeck    | 项目源码 | 文件源码
def getoutput(cmd, successful_status=(0,), stacklevel=1):
    try:
        status, output = getstatusoutput(cmd)
    except EnvironmentError:
        e = get_exception()
        warnings.warn(str(e), UserWarning, stacklevel=stacklevel)
        return False, output
    if os.WIFEXITED(status) and os.WEXITSTATUS(status) in successful_status:
        return True, output
    return False, output
项目:deep-learning-nlp-rl-papers    作者:madrugado    | 项目源码 | 文件源码
def _twitting(self):
        url = shorten_url(self.buf[3] if self.buf[3][:8] != "**URL:**" else self.buf[3][9:])
        text = self.buf[4] if self.buf[4][:10] != "**Notes:**" else self.buf[4][11:]
        if len(text) > TWEET_LIMIT - len(url) - 1 - 3:  # one symbol for space, three symbols more
            premature_ending = "... "
            # FIXME: for some reason twitter counts for three symbols more, than len()
            while len(text) > TWEET_LIMIT - len(premature_ending) - len(url) - 3:
                text = str.rsplit(text, " ", 1)[0]

            twit = "\"" + text + premature_ending + url + "\""
        else:
            twit = "\"" + text + " " + url + "\""

        cmd.getstatusoutput(self.twitter_command + " " + twit)
        return twit
项目:deep-learning-nlp-rl-papers    作者:madrugado    | 项目源码 | 文件源码
def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--toc-maker", help="path to ToC making tool")
    parser.add_argument("--twitter-poster", default="t update", help="twitter poster command")
    parser.add_argument("-t", "--use-twitter", action="store_true")

    known_args, unknown_args = parser.parse_known_args()

    if not known_args.toc_maker:
        known_args.toc_maker = "./gh-md-toc"
        if not os.path.isfile(known_args.toc_maker):
            s = cmd.getoutput("uname -s").lower()
            f = "gh-md-toc.%s.amd64.tgz" % s
            URL = "https://github.com/ekalinin/github-markdown-toc.go/releases/download/0.6.0/%s" % f
            if not os.path.isfile(f):
                if cmd.getstatusoutput("wget %s" % URL)[0] != 0:
                    raise EnvironmentError("Cannot download toc maker from URL: %s" % URL)
            if cmd.getstatusoutput("tar xzf %s" % f)[0] != 0:
                    raise EnvironmentError("Cannot untar toc maker from file %s" % f)
            os.remove(f)

            current_permissions = stat.S_IMODE(os.lstat(known_args.toc_maker).st_mode)
            os.chmod(known_args.toc_maker, current_permissions & stat.S_IXUSR)

    if unknown_args:
        filepath = unknown_args[0]
    else:
        print("You should specify the path for file to work with!")
        quit(1)

    return known_args, filepath
项目:SlackBuilds    作者:montagdude    | 项目源码 | 文件源码
def get_version(self, package):
        """
        Get the version of the package from pkg-config.
        """
        if not self.has_pkgconfig:
            return None

        status, output = getstatusoutput(
            self.pkg_config + " %s --modversion" % (package))
        if status == 0:
            return output
        return None


# The PkgConfig class should be used through this singleton
项目:automatic-repo    作者:WZQ1397    | 项目源码 | 文件源码
def dynamicinfo():
    info = {}
    info['sysver'] = subprocess.getstatusoutput("head -1 /etc/issue | awk '{ for(;i++<NF;) \
    if ($i==\"\\n\" ||  $i==\"\l\") continue ; else print $i }'")
    #info['hostname'] = subprocess.getstatusoutput("hostname")
    #currentproccessnum = "(ps -ef | wc -l) -1"
    info['loadavg'] = subprocess.getstatusoutput("more /proc/loadavg  | cut -d \" \" -f 1-3")
    info['uptime'] = subprocess.getstatusoutput("uptime | cut -d \",\" -f 1")
    info['diskusage'] = subprocess.getstatusoutput("df -h | grep ^/dev/* | awk '{print $4,$5}'")
    info['ipv4'] = subprocess.getstatusoutput("ip -4 a | grep inet | grep -v \"127.0.0.1\" | cut -d \" \" -f 6,11 | head -1")


    return info
项目:automatic-repo    作者:WZQ1397    | 项目源码 | 文件源码
def win_agent_serv_info():
    wininfo = {}
    wininfo['Cpu'] = subprocess.getstatusoutput('wmic cpu list brief')

    wininfo['PsyMem'] = subprocess.getstatusoutput('wmic memphysical list brief')

    wininfo['VirtMem'] =subprocess.getstatusoutput('wmic pagefile list brief')

    wininfo['disk'] = subprocess.getstatusoutput('wmic volume get name,freespace')

    wininfo['IPv4'] = subprocess.getstatusoutput('ipconfig | findstr IPv4')

    return wininfo
项目:dcos    作者:dcos    | 项目源码 | 文件源码
def _handle_path_run_cmd(self):
        """Runs an arbitrary command, and returns the output along with the return code

        Sometimes there isn't enough time to write code
        """
        length = int(self.headers['Content-Length'])
        cmd = self.rfile.read(length).decode('utf-8')
        (status, output) = subprocess.getstatusoutput(cmd)
        data = {"status": status, "output": output}
        self._send_reply(data)
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def getoutput(cmd, successful_status=(0,), stacklevel=1):
    try:
        status, output = getstatusoutput(cmd)
    except EnvironmentError:
        e = get_exception()
        warnings.warn(str(e), UserWarning, stacklevel=stacklevel)
        return False, output
    if os.WIFEXITED(status) and os.WEXITSTATUS(status) in successful_status:
        return True, output
    return False, output
项目:tetherback    作者:dlenski    | 项目源码 | 文件源码
def get_version(self):
         try:
             s, output = sp.getstatusoutput(self.adbcmd(('version',)))
         except FileNotFoundError:
             raise
         except sp.CalledProcessError:
             raise

         m = re.search(r'^Android Debug Bridge version ((?:\d+.)+\d+)', output)
         if not m:
             raise RuntimeError("could not parse 'adb version' output")

         adbversions = m.group(1)
         adbversion = tuple(int(x) for x in adbversions.split('.'))
         return adbversions, adbversion
项目:sublimeTextConfig    作者:luoye-fe    | 项目源码 | 文件源码
def _get_nix_font_path(self, name, style):
        try:
            from commands import getstatusoutput
        except ImportError:
            from subprocess import getstatusoutput
        exit, out = getstatusoutput('fc-list "%s:style=%s" file' %
                                    (name, style))
        if not exit:
            lines = out.splitlines()
            if lines:
                path = lines[0].strip().strip(':')
                return path
项目:aws-lambda-numpy    作者:vitolimandibhrata    | 项目源码 | 文件源码
def getoutput(cmd, successful_status=(0,), stacklevel=1):
    try:
        status, output = getstatusoutput(cmd)
    except EnvironmentError:
        e = get_exception()
        warnings.warn(str(e), UserWarning, stacklevel=stacklevel)
        return False, output
    if os.WIFEXITED(status) and os.WEXITSTATUS(status) in successful_status:
        return True, output
    return False, output
项目:GetOrganelle    作者:Kinggerm    | 项目源码 | 文件源码
def require_options():
    try:
        # python3
        blast_in_path = subprocess.getstatusoutput('blastn')
    except AttributeError:
        # python2
        blast_in_path = commands.getstatusoutput('blastn')
    if blast_in_path[0] == 32512:
        sys.stdout.write('\nError: blastn not in the path!')
        exit()
    try:
        # python3
        makeblastdb_in_path = subprocess.getstatusoutput('makeblastdb')
    except AttributeError:
        # python2
        makeblastdb_in_path = commands.getstatusoutput('makeblastdb')
    if makeblastdb_in_path[0] == 32512:
        sys.stdout.write('\nError: makeblastdb not in the path!')
        exit()
    usage = "Usage: rm_low_coverage_duplicated_contigs.py *.fastg"
    parser = OptionParser(usage=usage)
    parser.add_option('--cov-t', dest='coverage_threshold', default=0.12,
                      help='With ratio (coverage of query/coverage of subject) below which, '
                           'the query would be exposed to discarded. Default: 0.12')
    parser.add_option('--len-t', dest='length_threshold', default=0.9,
                      help='With overlap (length of hit of query/ length of query) above which, '
                           'the query would be exposed to discarded. Default: 0.9')
    parser.add_option('--blur', dest='blur_bases', default=False, action='store_true',
                      help='Replace hit low-coverage bases with N.')
    parser.add_option('--keep-temp', dest='keep_temp', default=False, action='store_true',
                      help='Keep temp blast files.')
    parser.add_option('-o', dest='output',
                      help='Output file. Default: *.purified.fastg')
    parser.add_option('-t', '--threads', dest="threads", default=4, type=int,
                      help="Threads of blastn.")
    options, args = parser.parse_args()
    if not args:
        parser.print_help()
        sys.stdout.write('\n######################################\nERROR: Insufficient REQUIRED arguments!\n\n')
        exit()
    return options, args
项目:GetOrganelle    作者:Kinggerm    | 项目源码 | 文件源码
def require_commands():
    global options
    try:
        # python3
        blast_in_path = subprocess.getstatusoutput('blastn')
    except AttributeError:
        # python2
        blast_in_path = commands.getstatusoutput('blastn')
    if blast_in_path[0] == 32512:
        sys.stdout.write('\nError: blastn not in the path!')
        exit()
    try:
        # python3
        makeblastdb_in_path = subprocess.getstatusoutput('makeblastdb')
    except AttributeError:
        # python2
        makeblastdb_in_path = commands.getstatusoutput('makeblastdb')
    if makeblastdb_in_path[0] == 32512:
        sys.stdout.write('\nError: makeblastdb not in the path!')
        exit()
    usage = 'python '+str(os.path.basename(__file__))+' -g input.fastg -f refernce.fasta'
    parser = OptionParser(usage=usage)
    parser.add_option('-g', dest='in_fastg_file', help='followed by your input fastg file')
    parser.add_option('-f', dest='reference_fa_base', help='followed by Fasta index format')
    parser.add_option('--keep-temp', dest='keep_temp', default=False, action='store_true', help='Choose to disable deleting temp files produced by blast and this script')
    parser.add_option('--bt', dest='blast_hits_threshold', default=0.60, help='Default: 0.60', type=float)
    parser.add_option('--max-gap', dest='max_gap_to_add', default=1500, help='Default: 1500', type=int)
    parser.add_option('--con-all', dest='connect_inner_contig', default=False, action='store_true', help='Choose to activate connecting all possible contigs. Default: False')
    parser.add_option('--depth', dest='depth_to_connect', default=1.0, help='Default: 1.0', type=float)
    # parser.add_option('--merge-overlaps', default=False, action='store_true', help='Choose to activate automatically merging overlapping contigs')
    # parser.add_option('--min-os', dest='min_overlap_similarity', default=0.9, help='The similarity threshold to merge overlapping contigs. Default: 0.9', type=float)
    # parser.add_option('--min-ol', dest='min_overlap_length', default=15, help='The length threshold to merge overlapping contigs. Default: 15', type=int)
    try:
        (options, args) = parser.parse_args()
    except Exception as e:
        sys.stdout.write('\n######################################'+str(e))
        sys.stdout.write('\n"-h" for more usage')
        exit()
项目:GetOrganelle    作者:Kinggerm    | 项目源码 | 文件源码
def check_db():
    global options
    if options.reference_fa_base:
        time0 = time.time()
        ref_fasta = read_fasta(options.reference_fa_base)
        if len(ref_fasta[0]) > 1:
            options.reference_fa_base += '.1st.fasta'
            write_fasta(out_dir=options.reference_fa_base, matrix=[[ref_fasta[0][0]], [ref_fasta[1][0]], ref_fasta[2]], overwrite=True)
            sys.stdout.write('\nWarning: multi-seqs in reference file, only use the 1st sequence.')
        elif len(ref_fasta[0]) == 0:
            sys.stdout.write('\nError: illegal reference file!')
            exit()
        try:
            # python2
            makedb_result = subprocess.getstatusoutput('makeblastdb -dbtype nucl -in '+options.reference_fa_base+' -out '+options.reference_fa_base+'.index')
        except AttributeError:
            # python3
            makedb_result = commands.getstatusoutput('makeblastdb -dbtype nucl -in ' + options.reference_fa_base + ' -out ' + options.reference_fa_base + '.index')
        if 'Error' in str(makedb_result[1]) or 'error' in str(makedb_result[1]) or '?????????' in str(makedb_result[1]):
            os.system('makeblastdb -dbtype nucl -in '+options.reference_fa_base+' -out '+options.reference_fa_base+'.index')
            if not os.path.exists(options.reference_fa_base+'.index.nhr'):
                sys.stdout.write('Blast terminated with following info:\n'+str(makedb_result[1]))
                exit()
        in_index = options.reference_fa_base+'.index'
        sys.stdout.write('\nMaking BLAST db cost '+str(time.time()-time0))
    else:
        sys.stdout.write('\nError: No reference input!')
        exit()
    return in_index
项目:GetOrganelle    作者:Kinggerm    | 项目源码 | 文件源码
def executable(test_this):
    return True if os.access(test_this, os.X_OK) or getstatusoutput(test_this)[0] != dead_code else False
项目:Spider    作者:poluo    | 项目源码 | 文件源码
def change_proxy():
        status, res = subprocess.getstatusoutput('adsl-stop')
        if status == 0:
            log.debug('adsl stop success')
        else:
            log.warning('adsl stop failed')
        time.sleep(0.5)
        status, res = subprocess.getstatusoutput('adsl-start')
        if status == 0:
            log.debug('adsl start success')
        else:
            log.warning('adsl start failed')
项目:Spider    作者:poluo    | 项目源码 | 文件源码
def change_proxy():
        status, res = subprocess.getstatusoutput('adsl-stop')
        if status == 0:
            log.debug('adsl stop success')
        else:
            log.warning('adsl stop failed')
        time.sleep(0.5)
        status, res = subprocess.getstatusoutput('adsl-start')
        if status == 0:
            log.debug('adsl start success')
        else:
            log.warning('adsl start failed')
项目:Spider    作者:poluo    | 项目源码 | 文件源码
def change_ip(self):
        self.request_count = 0
        # status, res = subprocess.getstatusoutput('adsl-stop')
        # if status == 0:
        #     logger.debug('adsl stop success')
        # else:
        #     logger.warning('adsl stop failed')
        # time.sleep(0.5)
        # status, res = subprocess.getstatusoutput('adsl-start')
        # if status == 0:
        #     logger.debug('adsl start success')
        # else:
        #     logger.warning('adsl start failed')
项目:Spider    作者:poluo    | 项目源码 | 文件源码
def change_ip(self):
        self.agent = random.choice(AGENTS_ALL)
        self.request_count = 0
        status, res = subprocess.getstatusoutput('adsl-stop')
        if status == 0:
            logger.debug('adsl stop success')
        else:
            logger.warning('adsl stop failed')
        time.sleep(0.5)
        status, res = subprocess.getstatusoutput('adsl-start')
        if status == 0:
            logger.debug('adsl start success')
        else:
            logger.warning('adsl start failed')
项目:lambda-numba    作者:rlhotovy    | 项目源码 | 文件源码
def getoutput(cmd, successful_status=(0,), stacklevel=1):
    try:
        status, output = getstatusoutput(cmd)
    except EnvironmentError:
        e = get_exception()
        warnings.warn(str(e), UserWarning, stacklevel=stacklevel)
        return False, output
    if os.WIFEXITED(status) and os.WEXITSTATUS(status) in successful_status:
        return True, output
    return False, output
项目:python_learn    作者:jetty-guo    | 项目源码 | 文件源码
def execute(date, file_name):
    print("execute %s" %file_name)
    cmd = "db2 -tvf /etl/etldata/script/yatop_update/"+date+"/"+file_name
    print(cmd)

    status, output = subprocess.getstatusoutput(cmd)


    if status:
        print("\033[1;31;40m execute %s error, cat %s.error to see detail \033[0m" % (file_name, file_name))
        with open(file_name+'.error', 'w') as f:
            f.write(output)
        return -1
    return 0
项目:prisma    作者:hijkzzz    | 项目源码 | 文件源码
def transform_async(filename, email, model):
    # ????
    content_file_path = join(app.config['UPLOAD_FOLDER'], filename)
    model_file_path = join(app.config['MODEL_FOLDER'], model)
    output_folder = app.config['OUTPUT_FOLDER']

    output_filename = filename
    (shotname, extension) = splitext(output_filename)
    output_filename = shotname + '-' + model + extension
    output_file_path = join(output_folder, output_filename)

    command = 'python eval.py --CONTENT_IMAG %s --MODEL_PATH %s --OUTPUT_FOLDER %s' % (
        content_file_path, model_file_path, output_folder)
    status, output = subprocess.getstatusoutput(command)

    # ????
    print(status, output)

    # ????
    if status == 0:
        with app.app_context():
            msg = Message("IMAGE-STYLE-TRANSFER",
                          sender=app.config['MAIL_USERNAME'], recipients=[email])
            msg.body = filename
            with app.open_resource(output_file_path) as f:
                mime_type = 'image/jpg' if splitext(
                    filename)[1] is not '.png' else 'image/png'
                msg.attach(filename, mime_type, f.read())
            mail.send(msg)
    else:
        with app.app_context():
            msg = Message("IMAGE-STYLE-TRANSFER",
                          sender=app.config['MAIL_USERNAME'], recipients=[email])
            msg.body = "CONVERT ERROR\n" + filename + "\n HELP - http://host:port/help"
            mail.send(msg)

    remove_files.apply_async(
        args=[[content_file_path, output_file_path]], countdown=60)
项目:regen    作者:biojppm    | 项目源码 | 文件源码
def get_output(cmd):
    status, output = subprocess.getstatusoutput(cmd)
    if status != 0:
        msg = "command failed with status {}: {}\noutput was:\n{}"
        msg = msg.format(status, cmd, output)
        raise Exception(msg)
    return output
项目:log-with-git    作者:iesugrace    | 项目源码 | 文件源码
def get_status_text_output(cmd):
    """ Run the cmd, return the output as a list of lines
    as well as the stat of the cmd (True or False), content
    of the out will be decoded.
    """
    stat, output = subprocess.getstatusoutput(cmd)
    if stat == 0:
        output = output.split('\n') if output else []
        res    = (True, output)
    else:
        res    = (False, [])
    return res
项目:deliver    作者:orchestor    | 项目源码 | 文件源码
def getoutput(cmd, successful_status=(0,), stacklevel=1):
    try:
        status, output = getstatusoutput(cmd)
    except EnvironmentError:
        e = get_exception()
        warnings.warn(str(e), UserWarning, stacklevel=stacklevel)
        return False, output
    if os.WIFEXITED(status) and os.WEXITSTATUS(status) in successful_status:
        return True, output
    return False, output