我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用subprocess.getstatusoutput()。
def test_getoutput(self):
    """Sanity-check subprocess.getoutput/getstatusoutput round trips."""
    self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
    self.assertEqual(subprocess.getstatusoutput('echo xyzzy'), (0, 'xyzzy'))
    # mkdtemp gives us an empty directory under our exclusive control,
    # so a path inside it is guaranteed not to exist and 'cat' must fail.
    tmp_dir = None
    try:
        tmp_dir = tempfile.mkdtemp()
        missing = os.path.join(tmp_dir, "foo")
        status, _output = subprocess.getstatusoutput('cat ' + missing)
        self.assertNotEqual(status, 0)
    finally:
        if tmp_dir is not None:
            os.rmdir(tmp_dir)
def __init__(self):
    """Determine whether a usable pkg-config exists on this machine."""
    if sys.platform == 'win32':
        # pkg-config is effectively unavailable on Windows builds.
        self.has_pkgconfig = False
        return
    # Honour an explicit $PKG_CONFIG override, else rely on the PATH.
    self.pkg_config = os.environ.get('PKG_CONFIG', 'pkg-config')
    self.set_pkgconfig_path()
    status, _output = getstatusoutput(self.pkg_config + " --help")
    self.has_pkgconfig = status == 0
    if not self.has_pkgconfig:
        print("IMPORTANT WARNING:")
        print(
            " pkg-config is not installed.\n"
            " matplotlib may not be able to find some of its dependencies")
def check(self):
    """Verify freetype availability and report the detected version."""
    if sys.platform == 'win32':
        check_include_file(get_include_dirs(), 'ft2build.h', 'freetype')
        return 'Using unknown version found on system.'
    status, output = getstatusoutput("freetype-config --ftversion")
    version = output if status == 0 else None
    # Early freetype-config scripts grep badly internally; fall back to
    # the header version when the output looks like a grep failure
    # (observed with 2.5.3).
    if version is None or 'No such file or directory\ngrep:' in version:
        version = self.version_from_header()
    # pkg-config reports the libtool version, not the freetype release,
    # so pass the version we determined ourselves.
    return self._check_for_pkg_config(
        'freetype2', 'ft2build.h', min_version='2.3', version=version)
def check(self):
    """Verify libpng availability and report the detected version."""
    if sys.platform == 'win32':
        check_include_file(get_include_dirs(), 'png.h', 'png')
        return 'Using unknown version found on system.'
    status, output = getstatusoutput("libpng-config --version")
    version = output if status == 0 else None
    try:
        return self._check_for_pkg_config(
            'libpng', 'png.h', min_version='1.2', version=version)
    except CheckFailed as e:
        # pkg-config failed, but the header alone is enough to proceed.
        if has_include_file(get_include_dirs(), 'png.h'):
            return str(e) + ' Using unknown version found on system.'
        raise
def exec2():
    """Create a 'zach' account over the pexpect session and switch to it.

    Fixes: the original compared the whole (status, output) tuple from
    getstatusoutput() against 0, which is always unequal, so the account
    was created unconditionally; the shell redirect was written '2&1'
    instead of '2>&1'; and 'whomai' was a typo for 'whoami'.
    NOTE(review): the existence probe targets 'root', which always
    exists -- it looks like it should probe 'zach' instead; confirm with
    the author before changing the target.
    """
    child.expect('#')
    status, _output = subprocess.getstatusoutput('id root >> /dev/null 2>&1')
    if status != 0:
        __newpasswd = 'edong&1310'
        subprocess.getstatusoutput('useradd zach')
        run('passwd zach', events={'(?i)password:': __newpasswd})
        # TODO run EQUAL TO FOLLOW COMMIT!
        '''
        child.expect('password:')
        child.sendline()
        child.expect('password:')
        child.sendline(__newpasswd)
        '''
    child.expect('#')
    child.sendline('su - zach')
    child.expect('$')
    child.sendline('whoami')  # was 'whomai' (typo)
def crawl():
    """Run the 'news' scrapy spider forever, loading each res_* JSON
    result file into the database and archiving a copy under ./log.

    NOTE(review): this loops indefinitely with a 30-minute sleep between
    rounds; it is intended to run as a daemon-style process.
    """
    count = 0
    make_dir('./log')  # ensure the archive directory exists
    while True:
        count += 1
        # Launch the crawler via the shell; status 0 means scrapy exited cleanly.
        status, res = subprocess.getstatusoutput('scrapy crawl news')
        if status == 0:
            print(res)
        else:
            print('crawl failed {}'.format(res))
        # Pick up every result file the spider wrote into the CWD.
        for file in os.listdir(os.getcwd()):
            if os.path.isfile(file) and 'res_' in file:
                with open(file, 'r') as fobj:
                    try:
                        res = json.load(fobj)
                    except Exception as e:
                        # Corrupt/partial JSON: report and skip this file.
                        print(e)
                        res = None
                if res:
                    # Keep an audit copy before inserting and deleting.
                    shutil.copy(file, './log/{}'.format(file))
                    insert_value(res)
                    print(res)
                    os.remove(file)
        print('loop {} finished'.format(count))
        time.sleep(60 * 30)  # wait 30 minutes between crawl rounds
def test_getoutput(self):
    """Cross-platform check of getoutput/getstatusoutput behaviour."""
    self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
    self.assertEqual(subprocess.getstatusoutput('echo xyzzy'), (0, 'xyzzy'))
    # An empty mkdtemp directory lets us build a path we know is absent,
    # so reading it is guaranteed to fail with a nonzero status.
    tmp_dir = None
    try:
        tmp_dir = tempfile.mkdtemp()
        missing = os.path.join(tmp_dir, "foo")
        reader = "type " if mswindows else "cat "
        status, _output = subprocess.getstatusoutput(reader + missing)
        self.assertNotEqual(status, 0)
    finally:
        if tmp_dir is not None:
            os.rmdir(tmp_dir)
def create_backup_directory(self):
    """Create a timestamped backup directory owned by mysql:mysql.

    :return: path of the new directory on success, otherwise None.
    """
    stamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    new_backup_dir = join(self.backupdir, stamp)
    try:
        # Creating backup directory
        makedirs(new_backup_dir)
        # Hand the directory over to the mysql user for the backup tools.
        status, output = subprocess.getstatusoutput(
            "chown mysql:mysql %s" % new_backup_dir)
        if status == 0:
            return new_backup_dir
        print("Could not change owner of backup directory!")
    except Exception as err:
        print("Something went wrong in create_backup_directory(): {}".format(err))
def backup_tables(input_date, backup_table_list):
    """Export a DDL backup (via db2look) for each 'schema.table' entry."""
    """Tables subject to destructive changes (DROP, ALTER TABLE, ALTER
    or RENAME in the DELTA scripts) are backed up first so they can be
    recreated if the update fails.  Exits the process on any db2look
    error."""
    for table in backup_table_list:
        print("backup table %s" %table)
        schema, tablename = table.split('.')
        # Skip tables that already have a .ddl.bak from a previous run.
        backup_path = config.backup_path.format(date=input_date)+table+".ddl.bak"
        print(backup_path)
        if os.path.exists(backup_path):
            print("backup exists %s" %table)
        else:
            cmd = "db2look -d {edwdb} -i {edwuser} -w {edwpwd} -z {schema} -e -t {tablename} -nofed -o /etl/etldata/script/yatop_update/{date}/backup/{table}.ddl.bak".format(edwdb=config.edwdb, edwuser=config.edwuser, edwpwd=config.edwpwd, schema=schema,tablename=tablename,date=input_date,table=table)
            status, output = subprocess.getstatusoutput(cmd)
            if status:
                # Red console text: db2look failed -- abort the whole run.
                print("\033[1;31;40mcreate ddl error %s\033[0m" %table)
                print(output)
                sys.exit(-1)
def backup_schedule(input_date):
    """Export the scheduler tables JOB_METADATA and JOB_SEQ to .del files.

    Returns 0 on success, -1 when any export fails (db2 output printed).
    """
    for table in ["JOB_METADATA", "JOB_SEQ"]:
        if table == "JOB_METADATA":
            path = config.job_metadata_path
        elif table == "JOB_SEQ":
            path = config.job_seq_path
        # Skip exports that already exist from a previous run.
        if os.path.exists(path.format(date=input_date)):
            print("backup exists %s" %table)
        else:
            print("export %s..." %table)
            cmd = 'db2 connect to {dwmmdb} user {dwmmuser} using {dwmmpwd} && db2 "export to /etl/etldata/script/yatop_update/{date}/backup/{table}.del of del select * from ETL.{table}"'.format(dwmmdb=config.dwmmdb, dwmmuser=config.dwmmuser, dwmmpwd=config.dwmmpwd, date=input_date, table=table)
            print(cmd)
            status, output = subprocess.getstatusoutput(cmd)
            if status:
                # Nonzero db2 exit status: report in red and bail out.
                print("\033[1;31;40mexport %s error\033[0m" % table)
                print(output)
                return -1
    return 0
def load_schedule(input_date):
    """Reload scheduler tables JOB_METADATA and JOB_SEQ from the .del
    exports produced by backup_schedule().

    Returns 0 on success; on failure writes the db2 output to
    <TABLE>.error and returns -1.
    """
    print("load JOB_METADATA...")
    # identityoverride preserves the exported identity column values.
    cmd = 'db2 connect to {dwmmdb} user {dwmmuser} using {dwmmpwd} && db2 "load from /etl/etldata/script/yatop_update/{date}/backup/JOB_METADATA.del of del modified by identityoverride replace into ETL.JOB_METADATA"'.format(dwmmdb=config.dwmmdb, dwmmuser=config.dwmmuser, dwmmpwd=config.dwmmpwd, date=input_date)
    print(cmd)
    status, output = subprocess.getstatusoutput(cmd)
    if status:
        print("\033[1;31;40mload JOB_METADATA error, cat JOB_METADATA.error see detail \033[0m")
        with open('JOB_METADATA.error','w') as f:
            f.write(output)
        return -1
    print("load JOB_SEQ...")
    cmd = 'db2 connect to {dwmmdb} user {dwmmuser} using {dwmmpwd} && db2 "load from /etl/etldata/script/yatop_update/{date}/backup/JOB_SEQ.del of del replace into ETL.JOB_SEQ"'.format(dwmmdb=config.dwmmdb, dwmmuser=config.dwmmuser, dwmmpwd=config.dwmmpwd, date=input_date)
    print(cmd)
    status, output = subprocess.getstatusoutput(cmd)
    if status:
        print("\033[1;31;40mload JOB_SEQ error, cat JOB_SEQ.error see detail \033[0m")
        with open('JOB_SEQ.error','w') as f:
            f.write(output)
        return -1
    return 0
def detect_ipmi():
    """Detect IPMI support and return (detected, version_or_detail)."""
    # XXX: andreserl 2013-04-09 bug=1064527: Try to detect if node
    # is a Virtual Machine. If it is, do not try to detect IPMI.
    with open('/proc/cpuinfo', 'r') as cpuinfo:
        for line in cpuinfo:
            if line.startswith('model name') and 'QEMU' in line:
                return (False, None)
    status, output = subprocess.getstatusoutput('ipmi-locate')
    res = re.search('(IPMI\ Version:) (\d\.\d)', output)
    if res is None:
        # ipmi-locate found nothing; fall back to device nodes.
        found = glob.glob("/dev/ipmi[0-9]")
        if found:
            return (True, "UNKNOWN: %s" % " ".join(found))
        return (False, "")
    # We've detected IPMI, but it doesn't necessarily mean we can access
    # the BMC. Let's test if we can.
    cmd = 'bmc-config --checkout --key-pair=Lan_Conf:IP_Address_Source'
    status, output = subprocess.getstatusoutput(cmd)
    if status != 0:
        return (False, "")
    return (True, res.group(2))
def _get_disk_info(self):
    """Parse `df -h` output into a list of self.DiskInfo records.

    :return: list of DiskInfo for /dev/-backed filesystems only;
        empty when `df -h` fails.
    """
    status, output = getstatusoutput("df -h")
    if status:
        return []
    disks = []
    # Skip the header row; keep the first five columns plus the mount
    # point (explicit selection guards against wrapped device lines).
    for line in output.split('\n')[1:]:
        cols = line.split()
        cols = cols[:5] + [cols[-1]]
        if '/dev/' in cols[0]:
            disks.append(self.DiskInfo(*cols))
    return disks
def log_results(clf_ner, description, filen='', subf=''):
    """Apply *clf_ner* to the CoNLL-2003 train/testa/testb splits, score
    each with the official conlleval script, and append all reports to
    data/conll2003_results/output_all_<filen>.txt.

    *subf* selects a results subdirectory suffix; both directories are
    created on demand.
    """
    import os
    if not os.path.exists('data/conll2003_results'):
        os.mkdir('data/conll2003_results')
    if not os.path.exists('data/conll2003_results%s' % subf):
        os.mkdir('data/conll2003_results%s' % subf)
    import subprocess
    print("applying to training set")
    apply_conll2003_ner(clf_ner, 'data/conll2003/ner/eng.train', 'data/conll2003_results%s/eng.out_train.txt' % subf)
    print("applying to test set")
    apply_conll2003_ner(clf_ner, 'data/conll2003/ner/eng.testa', 'data/conll2003_results%s/eng.out_testa.txt' % subf)
    apply_conll2003_ner(clf_ner, 'data/conll2003/ner/eng.testb', 'data/conll2003_results%s/eng.out_testb.txt' % subf)
    # write out results
    with open('data/conll2003_results/output_all_%s.txt' % filen, 'a') as f:
        f.write('%s\n' % description)
        f.write('results on training data\n')
        # [1] keeps only conlleval's text report, discarding the status.
        out = subprocess.getstatusoutput('data/conll2003/ner/bin/conlleval < data/conll2003_results%s/eng.out_train.txt' % subf)[1]
        f.write(out)
        f.write('\n')
        f.write('results on testa\n')
        out = subprocess.getstatusoutput('data/conll2003/ner/bin/conlleval < data/conll2003_results%s/eng.out_testa.txt' % subf)[1]
        f.write(out)
        f.write('\n')
        f.write('results on testb\n')
        out = subprocess.getstatusoutput('data/conll2003/ner/bin/conlleval < data/conll2003_results%s/eng.out_testb.txt' % subf)[1]
        f.write(out)
        f.write('\n')
        f.write('\n')
def _get_nix_font_path(self, name, style): try: from commands import getstatusoutput except ImportError: from subprocess import getstatusoutput exit, out = getstatusoutput('fc-list "%s:style=%s" file' % (name, style)) if not exit: lines = out.splitlines() if lines: path = lines[0].strip().strip(':') return path
def expandName(fn):
    """Resolve *fn* through the shell's `which`, then through symlinks.

    Executables are usually referenced by bare name; `which` maps that
    to a full path (a deprecated interface, but boy is it handy).  A
    nonzero status means the lookup failed and *fn* is kept as-is.
    """
    status, result = subprocess.getstatusoutput('which ' + fn)
    if status == 0:
        fn = result
    # The file itself, or a path component, may be a symlink.
    return os.path.realpath(fn)
def getDLLs(self):
    """List the shared libraries this binary links against, via ldd.

    Returns [] when the file is unreadable or ldd fails.  (ldd is a
    deprecated interface, but it is very handy.)
    """
    if not self.effectivelyReadable():
        return []
    status, result = subprocess.getstatusoutput('ldd ' + self.name)
    if status != 0:
        return []
    processed = []
    for entry in result.split('\n'):
        if 'ldd: warning:' in entry:
            continue
        # Drop the load address: everything from ' (' onward.
        names = entry.split(' (')[0].split('=>')
        # Prefer the resolved path (right of '=>'); fall back to the
        # bare soname when there is no resolution.
        name = names[1].strip() if len(names) > 1 else ''
        if name == '':
            name = names[0].strip()
        processed.append(name)
    return processed
def getoutput(cmd, successful_status=(0,), stacklevel=1):
    """Run *cmd* through getstatusoutput and classify the result.

    Returns (ok, output): ok is True only when the command exited
    normally with a status listed in *successful_status*.

    Fix: the original `except` branch returned the unbound name
    `output`, raising NameError whenever getstatusoutput itself failed;
    an empty string is now returned in that case.
    """
    try:
        status, output = getstatusoutput(cmd)
    except EnvironmentError:
        e = get_exception()
        warnings.warn(str(e), UserWarning, stacklevel=stacklevel)
        return False, ""
    # NOTE(review): modern getstatusoutput already returns a decoded
    # exit code, so WIFEXITED/WEXITSTATUS operate on an exit code here,
    # not a raw wait status -- preserved for compatibility; verify on
    # Python 2 / old Python 3.
    if os.WIFEXITED(status) and os.WEXITSTATUS(status) in successful_status:
        return True, output
    return False, output
def intltool_version():
    '''
    Return the version of intltool as a tuple.

    Returns (0, 0, 0) on Windows when the probe fails, and None on other
    platforms when intltool-update is unavailable.
    '''
    import subprocess
    if sys.platform == 'win32':
        # On Windows, run intltool-update through perl and extract the
        # dotted version number from its output with a regex.
        cmd = ["perl", "-e print qx(intltool-update --version) =~ m/(\d+.\d+.\d+)/;"]
        try:
            # communicate() returns (stdout, stderr); only stdout is used.
            ver, ret = subprocess.Popen(cmd ,stdout=subprocess.PIPE,
                stderr=subprocess.PIPE, shell=True).communicate()
            ver = ver.decode("utf-8")
            if ver > "":
                version_str = ver
            else:
                return (0,0,0)
        except:
            return (0,0,0)
    else:
        # First probe: does intltool-update exist and run at all?
        cmd = 'intltool-update --version 2> /dev/null' # pathological case
        retcode, version_str = subprocess.getstatusoutput(cmd)
        if retcode != 0:
            return None
        # Second invocation extracts just the dotted version number.
        cmd = 'intltool-update --version 2> /dev/null | head -1 | cut -d" " -f3'
        retcode, version_str = subprocess.getstatusoutput(cmd)
        if retcode != 0:
            # unlikely but just barely imaginable, so leave it
            return None
    return tuple([int(num) for num in version_str.split('.')])
def shell_run(text):
    """Execute *text* in a shell and append its output to the
    scrollback, styled OUTPUT on success or ERROR on nonzero status."""
    import subprocess
    status, output = subprocess.getstatusoutput(text)
    style = 'OUTPUT' if not status else 'ERROR'
    add_scrollback(output, style)
def get_uwsgi_version():
    """Return the uwsgi version string, or None if the command fails.

    Fix: getstatusoutput() takes a shell command *string*; the original
    wrapped it in a single-element list, which only worked by accident
    on POSIX (the list's first element becomes the `sh -c` argument).
    """
    status, output = subprocess.getstatusoutput('uwsgi --version')
    return None if status else output
def check_zone(self, zone, contents):
    """Write *contents* to the zone's file and validate it with the
    configured checkzone binary.

    Returns the (status, output) pair from the checker command.
    """
    zonefile = self.checkzone_dir + '/' + zone.domain_name
    with open(zonefile, 'w') as f:
        f.write(contents)
    cmd = "%s %s %s" % (self.checkzone_bin, zone.domain_name, zonefile)
    return getstatusoutput(cmd)
def _twitting(self):
    """Compose a tweet from the buffered URL (buf[3]) and note text
    (buf[4]), trimming the text to fit TWEET_LIMIT, then post it via the
    external twitter CLI command.  Returns the tweet string."""
    # Strip optional "**URL:**" / "**Notes:**" markdown prefixes.
    url = shorten_url(self.buf[3] if self.buf[3][:8] != "**URL:**" else self.buf[3][9:])
    text = self.buf[4] if self.buf[4][:10] != "**Notes:**" else self.buf[4][11:]
    if len(text) > TWEET_LIMIT - len(url) - 1 - 3: # one symbol for space, three symbols more
        premature_ending = "... "
        # FIXME: for some reason twitter counts for three symbols more, than len()
        # Drop trailing words until the text plus ellipsis and URL fit.
        while len(text) > TWEET_LIMIT - len(premature_ending) - len(url) - 3:
            text = str.rsplit(text, " ", 1)[0]
        twit = "\"" + text + premature_ending + url + "\""
    else:
        twit = "\"" + text + " " + url + "\""
    # Post through the external CLI client (e.g. `t update "..."`).
    cmd.getstatusoutput(self.twitter_command + " " + twit)
    return twit
def parse_args():
    """Parse CLI options, fetching gh-md-toc when it is not present.

    Returns (known_args, filepath); exits when no file path is given.

    Fix: the chmod used `&` (bitwise AND) with S_IXUSR, which *cleared*
    every permission bit except owner-execute instead of adding it; the
    execute bit is now ORed into the existing mode.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--toc-maker", help="path to ToC making tool")
    parser.add_argument("--twitter-poster", default="t update",
                        help="twitter poster command")
    parser.add_argument("-t", "--use-twitter", action="store_true")
    known_args, unknown_args = parser.parse_known_args()
    if not known_args.toc_maker:
        known_args.toc_maker = "./gh-md-toc"
    if not os.path.isfile(known_args.toc_maker):
        # Download and unpack the platform-specific toc maker binary.
        s = cmd.getoutput("uname -s").lower()
        f = "gh-md-toc.%s.amd64.tgz" % s
        URL = "https://github.com/ekalinin/github-markdown-toc.go/releases/download/0.6.0/%s" % f
        if not os.path.isfile(f):
            if cmd.getstatusoutput("wget %s" % URL)[0] != 0:
                raise EnvironmentError("Cannot download toc maker from URL: %s" % URL)
        if cmd.getstatusoutput("tar xzf %s" % f)[0] != 0:
            raise EnvironmentError("Cannot untar toc maker from file %s" % f)
        os.remove(f)
        current_permissions = stat.S_IMODE(os.lstat(known_args.toc_maker).st_mode)
        # Add the owner-execute bit (was `&`, which stripped permissions).
        os.chmod(known_args.toc_maker, current_permissions | stat.S_IXUSR)
    if unknown_args:
        filepath = unknown_args[0]
    else:
        print("You should specify the path for file to work with!")
        quit(1)
    return known_args, filepath
def get_version(self, package):
    """Return pkg-config's --modversion for *package*, or None when
    pkg-config is unavailable or the query fails."""
    if not self.has_pkgconfig:
        return None
    status, output = getstatusoutput(
        self.pkg_config + " %s --modversion" % (package))
    return output if status == 0 else None

# The PkgConfig class should be used through this singleton
def dynamicinfo():
    """Collect dynamic host information by shelling out; every value is
    the raw (status, output) tuple returned by getstatusoutput."""
    info = {}
    # First line of /etc/issue minus the \n / \l escape tokens.
    info['sysver'] = subprocess.getstatusoutput("head -1 /etc/issue | awk '{ for(;i++<NF;) \
if ($i==\"\\n\" || $i==\"\l\") continue ; else print $i }'")
    #info['hostname'] = subprocess.getstatusoutput("hostname")
    #currentproccessnum = "(ps -ef | wc -l) -1"
    # 1-, 5- and 15-minute load averages.
    info['loadavg'] = subprocess.getstatusoutput("more /proc/loadavg | cut -d \" \" -f 1-3")
    info['uptime'] = subprocess.getstatusoutput("uptime | cut -d \",\" -f 1")
    # Available space and use% for every /dev/-backed filesystem.
    info['diskusage'] = subprocess.getstatusoutput("df -h | grep ^/dev/* | awk '{print $4,$5}'")
    # First non-loopback IPv4 address (with interface column).
    info['ipv4'] = subprocess.getstatusoutput("ip -4 a | grep inet | grep -v \"127.0.0.1\" | cut -d \" \" -f 6,11 | head -1")
    return info
def win_agent_serv_info():
    """Gather basic Windows host facts via wmic/ipconfig; each value is
    the raw (status, output) tuple from getstatusoutput."""
    probes = (
        ('Cpu', 'wmic cpu list brief'),
        ('PsyMem', 'wmic memphysical list brief'),
        ('VirtMem', 'wmic pagefile list brief'),
        ('disk', 'wmic volume get name,freespace'),
        ('IPv4', 'ipconfig | findstr IPv4'),
    )
    return {key: subprocess.getstatusoutput(command)
            for key, command in probes}
def _handle_path_run_cmd(self):
    """Runs an arbitrary command, and returns the output along with the
    return code as a JSON reply {"status": ..., "output": ...}.

    SECURITY NOTE(review): this executes whatever the client sends,
    through a shell, with the server's privileges.  Only expose this
    handler on a fully trusted, access-controlled interface.
    """
    # The body length comes from the Content-Length header.
    length = int(self.headers['Content-Length'])
    cmd = self.rfile.read(length).decode('utf-8')
    (status, output) = subprocess.getstatusoutput(cmd)
    data = {"status": status, "output": output}
    self._send_reply(data)
def get_version(self):
    """Query `adb version` and return (version_string, version_tuple).

    Raises RuntimeError when the output cannot be parsed; re-raises
    FileNotFoundError / CalledProcessError from the adb invocation.
    """
    try:
        _status, output = sp.getstatusoutput(self.adbcmd(('version',)))
    except FileNotFoundError:
        raise
    except sp.CalledProcessError:
        raise
    match = re.search(r'^Android Debug Bridge version ((?:\d+.)+\d+)', output)
    if not match:
        raise RuntimeError("could not parse 'adb version' output")
    version_string = match.group(1)
    version_tuple = tuple(int(part) for part in version_string.split('.'))
    return version_string, version_tuple
def require_options():
    """Check that blastn/makeblastdb are on PATH and parse CLI options.

    Returns (options, args) from OptionParser; exits when a required
    tool or argument is missing.

    Fix: Python 3's getstatusoutput() reports a missing command as exit
    status 127, while Python 2's commands module reported the raw wait
    status 32512; both values are now recognised.
    """
    try:
        # python3
        blast_in_path = subprocess.getstatusoutput('blastn')
    except AttributeError:
        # python2
        blast_in_path = commands.getstatusoutput('blastn')
    if blast_in_path[0] in (127, 32512):
        sys.stdout.write('\nError: blastn not in the path!')
        exit()
    try:
        # python3
        makeblastdb_in_path = subprocess.getstatusoutput('makeblastdb')
    except AttributeError:
        # python2
        makeblastdb_in_path = commands.getstatusoutput('makeblastdb')
    if makeblastdb_in_path[0] in (127, 32512):
        sys.stdout.write('\nError: makeblastdb not in the path!')
        exit()
    usage = "Usage: rm_low_coverage_duplicated_contigs.py *.fastg"
    parser = OptionParser(usage=usage)
    parser.add_option('--cov-t', dest='coverage_threshold', default=0.12,
                      help='With ratio (coverage of query/coverage of subject) below which, '
                           'the query would be exposed to discarded. Default: 0.12')
    parser.add_option('--len-t', dest='length_threshold', default=0.9,
                      help='With overlap (length of hit of query/ length of query) above which, '
                           'the query would be exposed to discarded. Default: 0.9')
    parser.add_option('--blur', dest='blur_bases', default=False, action='store_true',
                      help='Replace hit low-coverage bases with N.')
    parser.add_option('--keep-temp', dest='keep_temp', default=False, action='store_true',
                      help='Keep temp blast files.')
    parser.add_option('-o', dest='output', help='Output file. Default: *.purified.fastg')
    parser.add_option('-t', '--threads', dest="threads", default=4, type=int,
                      help="Threads of blastn.")
    options, args = parser.parse_args()
    if not args:
        parser.print_help()
        sys.stdout.write('\n######################################\nERROR: Insufficient REQUIRED arguments!\n\n')
        exit()
    return options, args
def require_commands():
    """Check for blastn/makeblastdb on PATH and parse CLI options into
    the module-global `options`.

    Fix: recognise Python 3's exit status 127 for a missing command in
    addition to Python 2's raw wait status 32512 (the old `== 32512`
    test never fired on Python 3).
    """
    global options
    try:
        # python3
        blast_in_path = subprocess.getstatusoutput('blastn')
    except AttributeError:
        # python2
        blast_in_path = commands.getstatusoutput('blastn')
    if blast_in_path[0] in (127, 32512):
        sys.stdout.write('\nError: blastn not in the path!')
        exit()
    try:
        # python3
        makeblastdb_in_path = subprocess.getstatusoutput('makeblastdb')
    except AttributeError:
        # python2
        makeblastdb_in_path = commands.getstatusoutput('makeblastdb')
    if makeblastdb_in_path[0] in (127, 32512):
        sys.stdout.write('\nError: makeblastdb not in the path!')
        exit()
    usage = 'python '+str(os.path.basename(__file__))+' -g input.fastg -f refernce.fasta'
    parser = OptionParser(usage=usage)
    parser.add_option('-g', dest='in_fastg_file', help='followed by your input fastg file')
    parser.add_option('-f', dest='reference_fa_base', help='followed by Fasta index format')
    parser.add_option('--keep-temp', dest='keep_temp', default=False, action='store_true',
                      help='Choose to disable deleting temp files produced by blast and this script')
    parser.add_option('--bt', dest='blast_hits_threshold', default=0.60,
                      help='Default: 0.60', type=float)
    parser.add_option('--max-gap', dest='max_gap_to_add', default=1500,
                      help='Default: 1500', type=int)
    parser.add_option('--con-all', dest='connect_inner_contig', default=False, action='store_true',
                      help='Choose to activate connecting all possible contigs. Default: False')
    parser.add_option('--depth', dest='depth_to_connect', default=1.0,
                      help='Default: 1.0', type=float)
    # parser.add_option('--merge-overlaps', default=False, action='store_true', help='Choose to activate automatically merging overlapping contigs')
    # parser.add_option('--min-os', dest='min_overlap_similarity', default=0.9, help='The similarity threshold to merge overlapping contigs. Default: 0.9', type=float)
    # parser.add_option('--min-ol', dest='min_overlap_length', default=15, help='The length threshold to merge overlapping contigs. Default: 15', type=int)
    try:
        (options, args) = parser.parse_args()
    except Exception as e:
        sys.stdout.write('\n######################################'+str(e))
        sys.stdout.write('\n"-h" for more usage')
        exit()
def check_db():
    """Build a BLAST nucleotide database from the reference fasta.

    If the reference holds multiple sequences, only the first is used
    (written out as *.1st.fasta).  Returns the database index prefix;
    exits on a missing or illegal reference.
    """
    global options
    if options.reference_fa_base:
        time0 = time.time()
        ref_fasta = read_fasta(options.reference_fa_base)
        if len(ref_fasta[0]) > 1:
            # Multiple sequences: keep only the first one.
            options.reference_fa_base += '.1st.fasta'
            write_fasta(out_dir=options.reference_fa_base, matrix=[[ref_fasta[0][0]], [ref_fasta[1][0]], ref_fasta[2]], overwrite=True)
            sys.stdout.write('\nWarning: multi-seqs in reference file, only use the 1st sequence.')
        elif len(ref_fasta[0]) == 0:
            sys.stdout.write('\nError: illegal reference file!')
            exit()
        try:
            # python3 (NOTE(review): the original labels were swapped --
            # subprocess.getstatusoutput is the Python 3 interface)
            makedb_result = subprocess.getstatusoutput('makeblastdb -dbtype nucl -in '+options.reference_fa_base+' -out '+options.reference_fa_base+'.index')
        except AttributeError:
            # python2 fallback via the commands module
            makedb_result = commands.getstatusoutput('makeblastdb -dbtype nucl -in ' + options.reference_fa_base + ' -out ' + options.reference_fa_base + '.index')
        # NOTE(review): '?????????' appears to be a mis-encoded non-ASCII
        # error marker (presumably localized makeblastdb output) --
        # TODO restore the original text.
        if 'Error' in str(makedb_result[1]) or 'error' in str(makedb_result[1]) or '?????????' in str(makedb_result[1]):
            # Retry once directly; then verify the index file exists.
            os.system('makeblastdb -dbtype nucl -in '+options.reference_fa_base+' -out '+options.reference_fa_base+'.index')
            if not os.path.exists(options.reference_fa_base+'.index.nhr'):
                sys.stdout.write('Blast terminated with following info:\n'+str(makedb_result[1]))
                exit()
        in_index = options.reference_fa_base+'.index'
        sys.stdout.write('\nMaking BLAST db cost '+str(time.time()-time0))
    else:
        sys.stdout.write('\nError: No reference input!')
        exit()
    return in_index
def executable(test_this):
    """Report whether *test_this* can be run: either the OS marks it
    executable, or invoking it does not yield the dead_code status."""
    if os.access(test_this, os.X_OK):
        return True
    return getstatusoutput(test_this)[0] != dead_code
def change_proxy():
    """Re-dial the ADSL link (stop, brief pause, start) to rotate the
    external IP, logging the outcome of each step."""
    stop_status, _ = subprocess.getstatusoutput('adsl-stop')
    if stop_status == 0:
        log.debug('adsl stop success')
    else:
        log.warning('adsl stop failed')
    time.sleep(0.5)  # give the link a moment to drop completely
    start_status, _ = subprocess.getstatusoutput('adsl-start')
    if start_status == 0:
        log.debug('adsl start success')
    else:
        log.warning('adsl start failed')
def change_ip(self):
    """Reset the per-IP request counter.

    The ADSL re-dial that used to live here is disabled; only the
    bookkeeping remains.
    """
    self.request_count = 0
def change_ip(self):
    """Pick a fresh user agent, reset the request counter, and re-dial
    the ADSL link (stop, pause, start) to obtain a new IP."""
    self.agent = random.choice(AGENTS_ALL)
    self.request_count = 0
    if subprocess.getstatusoutput('adsl-stop')[0] == 0:
        logger.debug('adsl stop success')
    else:
        logger.warning('adsl stop failed')
    time.sleep(0.5)  # let the link drop before re-dialling
    if subprocess.getstatusoutput('adsl-start')[0] == 0:
        logger.debug('adsl start success')
    else:
        logger.warning('adsl start failed')
def execute(date, file_name):
    """Run a db2 script via `db2 -tvf`; on failure dump the output to
    <file_name>.error and return -1, otherwise return 0."""
    print("execute %s" %file_name)
    cmd = "db2 -tvf /etl/etldata/script/yatop_update/"+date+"/"+file_name
    print(cmd)
    status, output = subprocess.getstatusoutput(cmd)
    if not status:
        return 0
    # Red console text pointing at the saved error transcript.
    print("\033[1;31;40m execute %s error, cat %s.error to see detail \033[0m" % (file_name, file_name))
    with open(file_name+'.error', 'w') as f:
        f.write(output)
    return -1
def transform_async(filename, email, model):
    """Apply a style-transfer model to an uploaded image and e-mail the
    result (or an error notice), then schedule cleanup of both files.

    Fix: the mime-type check used `is not '.png'` -- identity comparison
    against a string literal, which is implementation-dependent and may
    silently misclassify -- it now uses `!=`.
    """
    # Resolve input, model and output locations.
    content_file_path = join(app.config['UPLOAD_FOLDER'], filename)
    model_file_path = join(app.config['MODEL_FOLDER'], model)
    output_folder = app.config['OUTPUT_FOLDER']
    (shotname, extension) = splitext(filename)
    output_filename = shotname + '-' + model + extension
    output_file_path = join(output_folder, output_filename)
    command = 'python eval.py --CONTENT_IMAG %s --MODEL_PATH %s --OUTPUT_FOLDER %s' % (
        content_file_path, model_file_path, output_folder)
    status, output = subprocess.getstatusoutput(command)
    print(status, output)
    if status == 0:
        # Success: mail the transformed image as an attachment.
        with app.app_context():
            msg = Message("IMAGE-STYLE-TRANSFER", sender=app.config['MAIL_USERNAME'], recipients=[email])
            msg.body = filename
            with app.open_resource(output_file_path) as f:
                # `!=` here, not `is not`: never identity-compare strings.
                mime_type = 'image/jpg' if splitext(filename)[1] != '.png' else 'image/png'
                msg.attach(filename, mime_type, f.read())
            mail.send(msg)
    else:
        # Failure: notify the user by mail instead.
        with app.app_context():
            msg = Message("IMAGE-STYLE-TRANSFER", sender=app.config['MAIL_USERNAME'], recipients=[email])
            msg.body = "CONVERT ERROR\n" + filename + "\n HELP - http://host:port/help"
            mail.send(msg)
    # Remove both files a minute later.
    remove_files.apply_async(args=[[content_file_path, output_file_path]], countdown=60)
def get_output(cmd):
    """Run *cmd* through the shell and return its combined output.

    Raises Exception (with status, command and output in the message)
    when the command exits with a nonzero status.
    """
    status, output = subprocess.getstatusoutput(cmd)
    if status == 0:
        return output
    msg = "command failed with status {}: {}\noutput was:\n{}"
    raise Exception(msg.format(status, cmd, output))
def get_status_text_output(cmd):
    """Run *cmd* in the shell and return (ok, lines).

    ok is True when the command exited with status 0; lines is the
    output split into a list of lines (empty on failure or when the
    command produced no output).
    """
    status, output = subprocess.getstatusoutput(cmd)
    if status != 0:
        return (False, [])
    lines = output.split('\n') if output else []
    return (True, lines)