我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 subprocess.PIPE。
def cmd(command):
    """Run *command* without a shell and collect its outcome.

    The command string is tokenised with ``shlex.split`` and executed
    directly. The exit code, captured stdout/stderr and the original command
    string are stored on a fresh ``Result`` object (defined elsewhere;
    presumably a plain attribute container).

    On a non-zero exit code a short diagnostic is printed; no exception is
    raised, so callers must inspect ``result.exit_code`` themselves.
    """
    result = Result()
    p = Popen(shlex.split(command), stdin=PIPE, stdout=PIPE, stderr=PIPE)
    (stdout, stderr) = p.communicate()
    result.exit_code = p.returncode
    result.stdout = stdout
    result.stderr = stderr
    result.command = command
    if p.returncode != 0:
        # Single-argument print(...) behaves identically on Python 2 and 3,
        # whereas the former print statements were a SyntaxError on Python 3.
        print('Error executing command [%s]' % command)
        print('stderr: [%s]' % stderr)
        print('stdout: [%s]' % stdout)
    return result
def run_command(args, wait=False):
    """Run *args* as a child process and return its captured stdout.

    :param args: argument list handed to ``subprocess.Popen``.
    :param wait: when true, stdout is captured through a pipe and returned;
        when false, the child inherits the parent's streams and ``None`` is
        returned. The call blocks until the child exits in both cases.
    :return: the captured stdout, or ``None`` when nothing was captured.
    """
    # Pre-bind so the final return is well defined even if the handler below
    # is ever reached.
    result = None
    try:
        if wait:
            p = subprocess.Popen(args, stdout=subprocess.PIPE)
            # No p.wait() here: waiting before draining the pipe deadlocks as
            # soon as the child writes more than the OS pipe buffer holds.
        else:
            p = subprocess.Popen(
                args,
                stdin=None, stdout=None, stderr=None,
                close_fds=True)
        # communicate() drains the pipe (if any) and waits for the child.
        (result, _) = p.communicate()
    except subprocess.CalledProcessError as e:
        # NOTE(review): Popen/communicate do not raise CalledProcessError;
        # the handler is kept only for backward compatibility.
        sys.stderr.write(
            "common::run_command() : [ERROR]: output = %s, error code = %s\n"
            % (e.output, e.returncode))
    return result
def preprocess_source(self, in_file, additional_args=None):
    """Run the preprocessor command held in ``self._args`` and return its output.

    ``self._args`` is extended in place with the compiler flags from
    ``self._build_compiler_flags()`` and then with *additional_args*, so the
    extra arguments remain on ``self._args`` after the call.

    :param in_file: not used by this method itself (presumably already part
        of ``self._args`` -- TODO confirm).
    :param additional_args: optional iterable of extra command-line
        arguments; ``None`` means no extras.
    :return: the captured stdout text on success, ``''`` on failure.
    """
    import subprocess

    self._args.extend(self._build_compiler_flags())
    # ``None`` replaces the former mutable ``[]`` default.
    if additional_args is not None:
        self._args.extend(additional_args)

    result = subprocess.run(
        self._args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True)
    if result.returncode == 0:
        return result.stdout

    if result.stderr:
        Style.error('Preprocess failed: ')
        print(result.stderr)
    return ''
def popen(cmd, mode="r", buffering=-1):
    """Open a pipe to or from the shell command *cmd*.

    With ``mode == "r"`` the returned object wraps the child's stdout; with
    ``mode == "w"`` it wraps the child's stdin. The text stream is handed to
    ``_wrap_close`` together with the process object.

    Raises TypeError if *cmd* is not a string, and ValueError for an
    unsupported *mode* or an unbuffered request (``0`` or ``None``).
    """
    if not isinstance(cmd, str):
        raise TypeError("invalid cmd type (%s, expected string)" % type(cmd))
    if mode not in ("r", "w"):
        raise ValueError("invalid mode %r" % mode)
    if buffering == 0 or buffering is None:
        raise ValueError("popen() does not support unbuffered streams")
    import subprocess, io
    # Exactly one of the child's standard streams is piped, chosen by mode.
    stream_name = "stdout" if mode == "r" else "stdin"
    child = subprocess.Popen(cmd,
                             shell=True,
                             bufsize=buffering,
                             **{stream_name: subprocess.PIPE})
    text_stream = io.TextIOWrapper(getattr(child, stream_name))
    return _wrap_close(text_stream, child)

# Helper for popen() -- a proxy for a file whose close waits for the process
def _git_output(args, cwd):
    """Run a git command in *cwd* and return its stripped stdout as text."""
    try:
        gitproc = Popen(args, stdout=PIPE, cwd=cwd)
        (stdout, _) = gitproc.communicate()
    except OSError as e:
        # git is missing or cwd is invalid: record the failure instead of
        # aborting the caller.
        return ' '.join(args) + ': ' + str(e.strerror)
    # On Python 3 the pipe yields bytes; decode so the report does not
    # contain a "b'...'" repr. On Python 2 the value is already ``str``.
    if not isinstance(stdout, str):
        stdout = stdout.decode('utf-8', 'replace')
    return stdout.strip()


def store_revision_info(src_path, output_dir, arg_string):
    """Write the git revision and local diff of *src_path* to a text file.

    The file ``revision_info.txt`` is created in *output_dir* and contains
    *arg_string*, the output of ``git rev-parse HEAD`` and the output of
    ``git diff HEAD``, both run with *src_path* as working directory.
    """
    # Get git hash
    git_hash = _git_output(['git', 'rev-parse', 'HEAD'], src_path)

    # Get local changes
    git_diff = _git_output(['git', 'diff', 'HEAD'], src_path)

    # Store a text file in the log directory
    rev_info_filename = os.path.join(output_dir, 'revision_info.txt')
    with open(rev_info_filename, "w") as text_file:
        text_file.write('arguments: %s\n--------------------\n' % arg_string)
        text_file.write('git hash: %s\n--------------------\n' % git_hash)
        text_file.write('%s' % git_diff)
def run_command(command, wait=False):
    """Run the shell command string *command* and return its captured stdout.

    :param command: command line executed through the shell.
    :param wait: when true, stdout is captured through a pipe and returned;
        when false, the child inherits the parent's streams and ``None`` is
        returned. The call blocks until the child exits in both cases.
    :return: the captured stdout, or ``None`` when nothing was captured.
    """
    # Pre-bind so the final return is well defined even if the handler below
    # is ever reached.
    result = None
    try:
        if wait:
            p = subprocess.Popen(
                [command],
                stdout=subprocess.PIPE,
                shell=True)
            # No p.wait() here: waiting before draining the pipe deadlocks as
            # soon as the child writes more than the OS pipe buffer holds.
        else:
            p = subprocess.Popen(
                [command],
                shell=True,
                stdin=None, stdout=None, stderr=None,
                close_fds=True)
        # communicate() drains the pipe (if any) and waits for the child.
        (result, _) = p.communicate()
    except subprocess.CalledProcessError as e:
        # NOTE(review): Popen/communicate do not raise CalledProcessError;
        # the handler is kept only for backward compatibility.
        sys.stderr.write(
            "common::run_command() : [ERROR]: output = %s, error code = %s\n"
            % (e.output, e.returncode))
    return result
def mathjax(s):
    """Render the math in *s* through the external ``mjpage`` tool.

    The input text is first dumped to ``temp.log`` in the current directory,
    then piped (UTF-8 encoded) into the executable named by
    ``app.config['mjpage']``. The HTML written to stdout is parsed with
    BeautifulSoup.

    :return: ``(style, body)`` -- the ``<style>`` element as a string and the
        concatenated children of ``<body>``.
    """
    with open("temp.log", "w") as log_file:
        log_file.write(s)

    font_url = ("https://cdnjs.cloudflare.com/ajax/libs/"
                "mathjax/2.7.0/fonts/HTML-CSS")
    argv = [
        app.config['mjpage'],
        '--dollars',
        '--output', "CommonHTML",
        '--fontURL', font_url,
    ]
    renderer = Popen(argv, stdout=PIPE, stdin=PIPE, stderr=PIPE)
    raw_out, raw_err = renderer.communicate(input=s.encode('utf-8'))

    html = raw_out.decode('utf-8')
    # stderr is decoded but otherwise unused.
    err = raw_err.decode('utf-8')

    soup = BeautifulSoup(html, 'html.parser')
    style = str(soup.style)
    body = "".join(str(node) for node in soup.body.children)
    return style, body
def _compile_proto(full_path, dest):
    """Compile the protobuf file *full_path* into Python code under *dest*.

    Runs the compiler located by ``find_protoc()`` with the file's own
    directory as ``--proto_path``.

    :return: ``True`` on success, ``False`` if the compiler did not finish
        within 5 seconds (it is killed in that case).
    :raises BadProtobuf: if the compiler exits with a non-zero status.
    """
    include_dir = os.path.dirname(full_path)
    argv = [
        find_protoc(),
        '--python_out={}'.format(dest),
        '--proto_path={}'.format(include_dir),
        full_path,
    ]
    protoc = subprocess.Popen(argv,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
    try:
        stdout_data, stderr_data = protoc.communicate(timeout=5)
    except subprocess.TimeoutExpired:
        protoc.kill()
        # Reap the killed process and drain its pipes.
        protoc.communicate()
        return False

    if protoc.returncode == 0:
        return True

    raise BadProtobuf(
        'Failed compiling "{}": \n\nstderr: {}\nstdout: {}'.format(
            full_path,
            stderr_data.decode('utf-8'),
            stdout_data.decode('utf-8')))
def exec_cmd(cmd, **kwds):
    """Execute *cmd* as a sub-process and capture its result.

    If the keyword argument ``stdin`` is given (and not ``None``) its value
    is fed to the child's standard input; otherwise no stdin pipe is opened.

    :return: ``(returncode, stdout, stderr)``.
    """
    payload = kwds.get('stdin', None)
    child = subprocess.Popen(
        cmd,
        stdin=subprocess.PIPE if payload is not None else None,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    out, err = child.communicate(payload)
    return (child.returncode, out, err)

#=======================================================================
# Extend the Milter Class (where the email is captured)
#=======================================================================
def del_addr(linkname, address):
    """Remove *address* from the network device *linkname* via ``ip``.

    :return: ``[True, linkname]`` on success, or ``[False, message]`` where
        the message includes the combined stdout/stderr of the failed
        ``ip address del`` invocation.
    """
    device = str(linkname)
    argv = ['ip', 'address', 'del', address, 'dev', device]
    try:
        subprocess.run(argv,
                       stdout=subprocess.PIPE,
                       stderr=subprocess.STDOUT,
                       shell=False,
                       check=True)
    except subprocess.CalledProcessError as suberror:
        failure_text = suberror.stdout.decode('utf-8')
        return [False, "delete address failed : %s" % failure_text]
    return [True, device]

# ovs-vsctl list-br
# ovs-vsctl br-exists <Bridge>
# ovs-vsctl add-br <Bridge>
# ovs-vsctl del-br <Bridge>
# ovs-vsctl list-ports <Bridge>
# ovs-vsctl del-port <Bridge> <Port>
# ovs-vsctl add-port <Bridge> <Port> -- set interface <Port> type=gre options:remote_ip=<RemoteIP>
# ovs-vsctl add-port <Bridge> <Port> tag=<ID> -- set interface <Port> type=internal
# ovs-vsctl port-to-br <Port>
# ovs-vsctl set Port <Port> tag=<ID>
# ovs-vsctl clear Port <Port> tag
def epubcheck_help():
    """Return the command-line help text printed by epubcheck.jar.

    Java is invoked with ``-Duser.language=en`` and the jar's ``-h`` switch;
    stderr is discarded.

    :return unicode: decoded stdout of the epubcheck process
    """
    argv = [c.JAVA, '-Duser.language=en', '-jar', c.EPUBCHECK, '-h']
    with open(os.devnull, "w") as sink:
        java = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=sink)
        raw_help = java.communicate()[0]
    return raw_help.decode()
def run_command_with_code(self, cmd, redirect_output=True,
                          check_exit_code=True):
    """Run *cmd* as a child process with ``self.root`` as working directory.

    When *redirect_output* is true the child's stdout is captured; otherwise
    it is inherited and the returned output is ``None``. When
    *check_exit_code* is true and the command fails, ``self.die`` is called
    with a description of the failure.

    Returns a tuple ``(output, returncode)``.
    """
    sink = subprocess.PIPE if redirect_output else None
    child = subprocess.Popen(cmd, cwd=self.root, stdout=sink)
    captured = child.communicate()[0]
    status = child.returncode
    if check_exit_code and status != 0:
        self.die('Command "%s" failed.\n%s', ' '.join(cmd), captured)
    return (captured, status)
def filter_region(view, txt, command):
    """Pipe *txt* through the shell *command* using a temporary batch file.

    The text is written (UTF-8) to a temporary ``.txt`` file and a temporary
    ``.bat`` script runs ``type <file> | <command>``. The script's stdout
    (or stderr when stdout is empty) is decoded with the code page from
    ``get_oem_cp()``, CRLF is normalised to LF, the final character is
    dropped and the result is stripped.

    :param view: not used by this function.
    :return: the filtered text.
    """
    # Pre-bind both handles so cleanup never hits an unbound name when
    # creating or writing one of the temporary files fails.
    contents = None
    script = None
    try:
        contents = tempfile.NamedTemporaryFile(suffix='.txt', delete=False)
        contents.write(txt.encode('utf-8'))
        contents.close()

        script = tempfile.NamedTemporaryFile(suffix='.bat', delete=False)
        script.write(('@echo off\ntype %s | %s' % (contents.name, command)).encode('utf-8'))
        script.close()

        p = subprocess.Popen([script.name],
                             stdout=PIPE,
                             stderr=PIPE,
                             startupinfo=get_startup_info())

        out, err = p.communicate()
        return (out or err).decode(get_oem_cp()).replace('\r\n', '\n')[:-1].strip()
    finally:
        for tmp in (script, contents):
            if tmp is not None:
                # close() is idempotent; the file must be closed before it
                # can be removed on Windows.
                tmp.close()
                os.remove(tmp.name)
def test_loadSADFile_startorder(self):
    """Check the relative PIDs of two components launched from a SAD file.

    After loading the ticket_462 waveform, the PID of ``ticket_462_ac_1`` is
    compared with that of ``ticket_462_1``. The comparison direction flips
    when the first PID has reached the kernel's ``pid_max`` (read from
    /proc, defaulting to 32768), presumably to allow for PID wrap-around.
    """
    maxpid = 32768
    try:
        cat = subprocess.Popen(['cat', '/proc/sys/kernel/pid_max'],
                               stdout=subprocess.PIPE)
        stdout_data = cat.communicate()[0]
        maxpid = int(stdout_data.strip())
    except:
        # Best effort: keep the default when pid_max cannot be read.
        pass

    loaded = sb.loadSADFile('sdr/dom/waveforms/ticket_462_w/ticket_462_w.sad.xml')
    self.assertEquals(loaded, True)

    comp_ac = sb.getComponent('ticket_462_ac_1')
    self.assertNotEquals(comp_ac, None)
    comp = sb.getComponent('ticket_462_1')
    self.assertNotEquals(comp, None)

    ac_pid = comp_ac._pid
    other_pid = comp._pid
    if ac_pid <= maxpid - 1:
        isless = ac_pid < other_pid
    else:
        isless = other_pid < ac_pid
    self.assertTrue(isless)
def test_UserOrGroupNoDaemon(self):
    """Check that nodeBooter rejects --user/--group without the daemon flag.

    nodeBooter is started three times (user and group, group only, user
    only); each time its stderr must contain the message explaining that
    the daemon option is required.
    """
    domainName = scatest.getTestDomainName()

    # Test that we don't already have a bound domain
    try:
        domMgr = self._root.resolve(scatest.getDomainMgrURI())
        self.assertEqual(domMgr, None)
    except CosNaming.NamingContext.NotFound:
        pass  # This exception is expected

    common_args = ["../../control/framework/nodeBooter", "-D", "-debug", "9", "--nopersist"]
    expected_message = 'If either group or user are specified, daemon must be set'
    identity_variants = (
        ['--user', 'domuser', '--group', 'somegroup'],
        ['--group', 'somegroup'],
        ['--user', 'domuser'],
    )
    for identity_args in identity_variants:
        nb = Popen(common_args + identity_args,
                   cwd=scatest.getSdrPath(), stderr=PIPE, stdout=PIPE)
        self.assertNotEqual(nb.stderr.read().find(expected_message), -1)
def test_BadUserOrBadGroup(self):
    """Check that nodeBooter rejects ``--user=...`` / ``--group=...`` syntax.

    nodeBooter is started once with ``--user=domuser`` and once with
    ``--group=somegroup``; each time its stderr must contain the message
    stating that the separator must be a space.
    """
    domainName = scatest.getTestDomainName()

    # Test that we don't already have a bound domain
    try:
        domMgr = self._root.resolve(scatest.getDomainMgrURI())
        self.assertEqual(domMgr, None)
    except CosNaming.NamingContext.NotFound:
        pass  # This exception is expected

    common_args = ["../../control/framework/nodeBooter", "-D", "-debug", "9", "--nopersist"]
    expected_message = 'Separator must be a space'
    for bad_option in ('--user=domuser', '--group=somegroup'):
        nb = Popen(common_args + [bad_option],
                   cwd=scatest.getSdrPath(), stderr=PIPE, stdout=PIPE)
        self.assertNotEqual(nb.stderr.read().find(expected_message), -1)
def print_card(pdf, printer_name):
    """
    Send the PDF to the printer by piping it into ``lpr``.

    :param pdf: Binary PDF buffer
    :param printer_name: Name of the printer on the system (ie. CUPS name)
    :raises PrintingError: if ``lpr`` exits with a non-zero return code
    """
    lpr = subprocess.Popen(['lpr', '-P', printer_name],
                           stdin=subprocess.PIPE)
    lpr.communicate(pdf)
    status = lpr.returncode
    if status == 0:
        return
    raise PrintingError('Return code {}'.format(status))
def pipe_weighted_edgelist_to_convert(matrix, bin_filename, weight_filename):
    """Pipe a weighted edgelist (COO sparse matrix) to Louvain's convert utility.

    Currently disabled: the function raises ``ValueError`` unconditionally.
    The statements after the ``raise`` are the retained, unreachable
    implementation.
    """
    raise ValueError('Unsupported method at the moment')

    # --- unreachable while the method is disabled ---
    convert_argv = [
        LOUVAIN_CONVERT_BINPATH,
        '-i', '/dev/stdin',
        '-o', bin_filename,
        '-w', weight_filename,
    ]
    sink = open(os.devnull, 'w')
    converter = subprocess.Popen(convert_argv,
                                 stdin=subprocess.PIPE,
                                 stdout=sink,
                                 stderr=sink)

    # Stream text triplets to 'convert'
    for triplet in itertools.izip(matrix.row, matrix.col, matrix.data):
        converter.stdin.write('%d\t%d\t%f\n' % triplet)
    converter.stdin.close()
    converter.wait()
    sink.close()
def pipe_unweighted_edgelist_to_convert(matrix, bin_filename):
    """Pipe an unweighted edgelist (COO sparse matrix) to Louvain's convert utility.

    One ``row<TAB>col`` line per entry of ``matrix.row`` / ``matrix.col`` is
    written to the stdin of the binary at ``LOUVAIN_CONVERT_BINPATH``, which
    is asked to write its result to *bin_filename*. The tool's own output
    is discarded.
    """
    # itertools.izip only exists on Python 2; fall back to the lazy builtin
    # zip on Python 3.
    lazy_zip = getattr(itertools, 'izip', zip)

    with open(os.devnull, 'w') as devnull:
        proc = subprocess.Popen([LOUVAIN_CONVERT_BINPATH,
                                 '-i', '/dev/stdin',
                                 '-o', bin_filename,
                                 ],
                                stdin=subprocess.PIPE,
                                stdout=devnull,
                                stderr=devnull)
        try:
            # Stream text pairs to 'convert'. The pipe is binary, so encode
            # each line explicitly (a no-op in effect on Python 2).
            for ij in lazy_zip(matrix.row, matrix.col):
                proc.stdin.write(('%d\t%d\n' % ij).encode('ascii'))
        finally:
            # Always close the pipe and reap the child, even if a write
            # fails part-way through.
            try:
                proc.stdin.close()
            finally:
                proc.wait()
def read_fastq(self):
    """Yield one ``FastqRow`` per four-line record of ``self.file``.

    Files whose name ends in ``gz`` are streamed through ``gunzip --stdout``;
    anything else is opened directly. When ``self.rc`` is true the sequence
    is passed through ``tk_seq.get_rev_comp`` and the quality string is
    reversed. Iteration stops at end of input; a trailing incomplete record
    is dropped.
    """
    if self.file[-2:] == "gz":
        # universal_newlines=True yields text on Python 3, matching the
        # plain-file branch below.
        proc = subprocess.Popen(["gunzip", "--stdout", self.file],
                                stdout=subprocess.PIPE,
                                universal_newlines=True)
        reader = proc.stdout
    else:
        proc = None
        reader = open(self.file, "r")

    try:
        while True:
            # readline() returns '' at EOF instead of raising StopIteration,
            # which would become a RuntimeError inside a generator (PEP 479).
            header = reader.readline()
            seq = reader.readline()
            reader.readline()  # skip the separator line
            qual = reader.readline()
            if not qual:
                break

            header = header.strip()
            seq = seq.strip()
            qual = qual.strip()
            if self.rc:
                seq = tk_seq.get_rev_comp(seq)
                qual = qual[::-1]

            yield FastqRow(header, seq, qual)
    finally:
        # Runs on exhaustion and when the generator is closed early.
        reader.close()
        if proc is not None:
            proc.wait()
def add_to_vcs(self, summary):
    """Stage added and removed files in git after user confirmation.

    ``git add -A`` is run on ``app_settings.SYNC_DIRECTORY`` only when
    ``self.git_add`` is set, *summary* contains ``SyncStatus.DELETED`` or
    ``SyncStatus.ADDED``, this is not a dry run, and ``self.confirm``
    returns a true value.

    Raises the exception built by ``self.error`` when git exits with a
    non-zero status.
    """
    if not self.git_add:
        return
    if SyncStatus.DELETED not in summary and SyncStatus.ADDED not in summary:
        return
    if self.dry_run:
        return
    if not self.confirm(
        question=(
            'Do you want to add created and removed files to GIT?'
        )
    ):
        return

    process = subprocess.Popen(
        ['git', '-C', app_settings.SYNC_DIRECTORY,
         'add', '-A', app_settings.SYNC_DIRECTORY],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    process.communicate()

    # Judge by exit status: git also writes non-fatal warnings (for example
    # line-ending notices) to stderr on success.
    if process.returncode != 0:
        raise self.error('Adding file changes to GIT failed!')
def get_audio_streams(self):
    """Probe ``self.input_video`` with ffprobe and return its audio streams.

    ffprobe's JSON stream description is parsed and handed to the
    ``get_audio_streams`` function that this name resolves to at call time
    (presumably a module-level helper defined elsewhere -- TODO confirm).

    If the ffprobe executable cannot be found an error is logged and the
    process exits; any other ``OSError`` is re-raised.
    """
    # ``os.errno`` was removed in Python 3.7; use the errno module directly.
    import errno

    with open(os.devnull, 'w') as DEV_NULL:
        # Get file info and parse it
        try:
            proc = subprocess.Popen([
                FFPROBE, '-i', self.input_video,
                '-of', 'json',
                '-show_streams'
            ], stdout=subprocess.PIPE, stderr=DEV_NULL)
        except OSError as e:
            if e.errno == errno.ENOENT:
                Logger.error("FFPROBE not found, install on your system to use this script")
                sys.exit(0)
            # Any other failure used to fall through to an unbound ``proc``.
            raise
        # communicate() reads all output and also reaps the child.
        output, _ = proc.communicate()

    return get_audio_streams(json.loads(output))
def build_mkdocs(self):
    """
    Run ``mkdocs build -q`` from inside ``MKDOCS_DIR``.

    The process working directory is changed to ``MKDOCS_DIR`` and stays
    changed after the call. Any output on stderr is treated as a build
    failure and raised as ``Error``.
    """
    # Setting the working directory
    os.chdir(MKDOCS_DIR)

    # Building the MkDocs project
    builder = subprocess.Popen(["mkdocs", "build", "-q"],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    _, failure_text = builder.communicate()
    if not failure_text:
        return
    raise Error("Could not build MkDocs !\n%s" % failure_text)
def goglib_get_games_list():
    """Dump the lgogdownloader game list to the games_nebula config file.

    Runs ``lgogdownloader --exclude 1,2,4,8,16,32 --list-details`` and, when
    it succeeds, writes every output line that does not contain
    ``'Getting game info'`` to ``$HOME/.games_nebula/config/games_list``.

    :return: ``0`` on success, ``1`` if lgogdownloader exits non-zero.
    """
    # universal_newlines=True yields text lines on Python 3, so the ``in``
    # test against a str literal below is valid on both major versions.
    proc = subprocess.Popen(
        ['lgogdownloader', '--exclude', '1,2,4,8,16,32', '--list-details'],
        stdout=subprocess.PIPE,
        universal_newlines=True)
    games_detailed_list = proc.stdout.readlines()
    proc.communicate()

    if proc.returncode != 0:
        return 1

    file_path = os.getenv('HOME') + '/.games_nebula/config/games_list'
    # The context manager guarantees the list is flushed and closed.
    with open(file_path, 'w') as games_list_file:
        for line in games_detailed_list:
            if 'Getting game info' not in line:
                games_list_file.write(line)
    return 0
def win64_available(self):
    """Detect whether a 64-bit wine binary can be executed.

    The binary ``<wine_bin>64`` is started without arguments; an exit status
    of 1 is taken to mean it is present and working. The architecture combo
    box is shown or hidden accordingly, and ``self.winearch`` is forced to
    ``'win32'`` when 64-bit wine is unavailable.

    :return: ``True`` if 64-bit wine is available, ``False`` otherwise.
    """
    wine_bin, \
    wineserver_bin, \
    wine_lib = self.get_wine_bin_path()

    try:
        # The context manager closes devnull even if Popen raises.
        with open(os.devnull, 'w') as dev_null:
            proc = subprocess.Popen([wine_bin + '64'],
                                    stdout=dev_null,
                                    stdin=subprocess.PIPE,
                                    stderr=subprocess.STDOUT)
            proc.communicate()
        available = proc.returncode == 1
    except Exception:
        # Typically OSError when the binary is missing. Unlike a bare
        # ``except`` this lets KeyboardInterrupt/SystemExit propagate.
        available = False

    self.combobox_winearch.set_visible(available)
    if not available:
        self.winearch = 'win32'
    return available
def execute_task(host, tasks):
    """Call Fabric to execute tasks against a host.

    Fabric is run under the interpreter at ``PYTHON2_PATH`` with
    ``--abort-on-prompts``, the given *host* and the fabfile at
    ``FABFILE_PATH``. stderr is merged into stdout and decoded as text.

    Return: CompletedProcess instance
    Raises: subprocess.CalledProcessError on a non-zero exit (``check=True``)
    """
    # TODO: add support for groups, multiple hosts
    # TODO: add support for providing input data from team files
    fabric_argv = [
        PYTHON2_PATH,
        FABRIC_PATH,
        '--abort-on-prompts',
        '--hosts=%s' % host,
        '--fabfile=%s' % FABFILE_PATH,
    ]
    fabric_argv.extend(tasks)
    return subprocess.run(
        fabric_argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # Combine out/err into stdout; stderr will be None
        universal_newlines=True,
        check=True,
    )
def index():
    """Handle the password-recovery landing page.

    Without a ``code`` query parameter the index template is rendered. With
    a code that matches a ``RecoveryEmail`` row, the code is invalidated, a
    random 10-character password is set through ``prosodyctl passwd`` and
    mailed to the stored address. An unknown code, or a failing prosodyctl
    call, renders the error template.
    """
    unlock_code = request.args.get('code')
    if not unlock_code:
        return render_template('index.html')

    # unlock, new password
    # (named ``recovery`` rather than ``re`` to avoid shadowing the stdlib
    # module name)
    recovery = s.query(RecoveryEmail).filter(RecoveryEmail.password_code==unlock_code).one_or_none()
    if not recovery:
        return render_template('error.html', message='link invalid')

    jid = recovery.jid
    recovery.password_code = None
    s.merge(recovery)
    s.commit()

    # set new password and send email
    email_address = recovery.email
    alphabet = string.ascii_lowercase + string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    password = ''.join(rng.choice(alphabet) for _ in range(10))

    p = subprocess.Popen(['/usr/bin/prosodyctl', 'passwd', jid],
                         stdout=subprocess.PIPE,
                         stdin=subprocess.PIPE,
                         stderr=subprocess.STDOUT)
    # prosodyctl asks for the password twice.
    args = bytes("%s\n%s\n" % (password, password), encoding='utf8')
    p.communicate(args)

    # Never mail a password that prosodyctl did not actually set.
    if p.returncode != 0:
        return render_template('error.html', message='password could not be set')

    sendMail(email_address, 'new password', password)
    return render_template('success.html', message='password was sent')
def output_articles(articles):
    """Show *articles* in the ``less`` pager.

    Prints ``'No articles found'`` when the collection is empty. Otherwise
    each article is rendered with ``format_article`` and written to the
    pager's stdin. A non-positive ``reading_time`` is replaced in the
    article dict by ``'Unknown'``.
    """
    if len(articles) == 0:
        print('No articles found')
        return

    try:
        pager = subprocess.Popen(['less'],
                                 stdin=subprocess.PIPE,
                                 stdout=sys.stdout)
        try:
            for article in articles:
                if int(article['reading_time']) <= 0:
                    article['reading_time'] = 'Unknown'
                content = format_article(article, line=True)

                if six.PY3:
                    content = bytearray(content, 'utf-8')

                pager.stdin.write(content)

            pager.stdin.close()
        except IOError:
            # The user quit the pager before everything was written, so the
            # pipe is broken. That is a normal way to leave a pager.
            pass
        pager.wait()
    except (KeyboardInterrupt, ValueError):
        pass
def spawn(self, cmd, stdin_content="", stdin=False, shell=False, timeout=2):
    """
    Spawn a new process using subprocess.

    *cmd* must be a ``list``, *stdin_content* a ``str`` and *stdin* a
    ``bool`` (exact types). The process is started with all three standard
    streams piped, ``self.finish_read`` is called, and ``self.close`` runs
    if the process has already terminated. A KeyboardInterrupt returns
    silently; every other exception leaves as ``PJFBaseException``.
    """
    try:
        # Exact type checks, in the original order.
        expectations = (
            (cmd, list),
            (stdin_content, str),
            (stdin, bool),
        )
        for value, wanted in expectations:
            if type(value) != wanted:
                raise PJFInvalidType(type(value), wanted)

        self._in = stdin_content
        try:
            self.process = subprocess.Popen(cmd,
                                            stdout=PIPE,
                                            stderr=PIPE,
                                            stdin=PIPE,
                                            shell=shell)
            self.finish_read(timeout, stdin_content, stdin)
            if self.process.poll() is not None:
                self.close()
        except KeyboardInterrupt:
            return
        except OSError:
            raise PJFProcessExecutionError("Binary <%s> does not exist" % cmd[0])
    except Exception as e:
        raise PJFBaseException(e.message if hasattr(e, "message") else str(e))
def _upload_artifacts_to_version(self):
    """Recursively upload directory contents to S3.

    If ``self.s3props`` has ``content_metadata`` the upload is first
    attempted through ``self.content_metadata_uploads()``; when that does
    not report success, the artifact directory is synced with
    ``aws s3 sync`` to ``self.s3_version_uri`` using the profile named by
    ``self.env``.

    Raises:
        S3ArtifactNotFound: If the artifact path is unset or the directory
            is empty.
        subprocess.CalledProcessError: If the ``aws`` command fails.
    """
    # Test the path itself first: os.listdir(None) lists the current
    # directory and os.listdir('') raises, so the order matters.
    if not self.artifact_path or not os.listdir(self.artifact_path):
        raise S3ArtifactNotFound

    uploaded = False
    if self.s3props.get("content_metadata"):
        LOG.info("Uploading in multiple parts to set metadata")
        uploaded = self.content_metadata_uploads()

    if not uploaded:
        cmd = 'aws s3 sync {} {} --delete --exact-timestamps --profile {}'.format(self.artifact_path,
                                                                                  self.s3_version_uri, self.env)
        result = subprocess.run(cmd, check=True, shell=True, stdout=subprocess.PIPE)
        LOG.debug("Upload Command Ouput: %s", result.stdout)

    LOG.info("Uploaded artifacts to %s bucket", self.bucket)
def _sync_to_uri(self, uri):
    """Copy and sync versioned directory to uri in S3.

    Runs ``aws s3 cp --recursive`` followed by ``aws s3 sync --delete
    --exact-timestamps`` from ``self.s3_version_uri`` to *uri*, both with
    the profile named by ``self.env``.

    Args:
        uri (str): S3 URI to sync version to.

    Raises:
        subprocess.CalledProcessError: If either ``aws`` command fails.
    """
    source = self.s3_version_uri
    profile = self.env
    copy_command = 'aws s3 cp {} {} --recursive --profile {}'.format(source, uri, profile)
    # AWS CLI sync does not work as expected bucket to bucket with exact timestamp sync.
    sync_command = 'aws s3 sync {} {} --delete --exact-timestamps --profile {}'.format(
        source, uri, profile)

    steps = (
        (copy_command, "Copy to %s before sync output: %s", "Copied version %s to %s"),
        (sync_command, "Sync to %s command output: %s", "Synced version %s to %s"),
    )
    for command, debug_format, info_format in steps:
        completed = subprocess.run(command, check=True, shell=True, stdout=subprocess.PIPE)
        LOG.debug(debug_format, uri, completed.stdout)
        LOG.info(info_format, self.version, uri)
def spawn(self, lines, additional_args=['-p', ''], width=None):
    """Open a rofi dmenu near the mouse cursor and feed it *lines*.

    The menu's left edge is centred on the mouse but clamped to the panel
    (``self.x`` .. ``self.x + self.panel_width - width``); its top edge is
    ``self.y``. Entries are written NUL-separated to rofi's stdin and the
    call waits for rofi to exit.

    NOTE(review): rofi's stdout is piped but never read here -- confirm
    whether the selection is meant to be consumed.
    """
    (mouse_x, mouse_y) = get_mouse_location()
    if not width:
        width = 100  # some default width
    # The original comment says rofi needs a width of at least 100; the
    # value enforced here is 101.
    width = max(width, 101)

    # Top-left corner of the menu in absolute coordinates ...
    leftmost = self.x
    rightmost = self.x + self.panel_width - width
    absolute_x = min(max(mouse_x - width/2, leftmost), rightmost)
    absolute_y = self.y
    # ... then expressed relative to the mouse cursor.
    menu_x = absolute_x - mouse_x
    menu_y = absolute_y - mouse_y

    # compile rofi arguments
    cmd = [
        'rofi', '-dmenu', '-sep', '\\0',
        '-monitor', '-3',   # position relative to mouse cursor
        '-layout', '1',     # specify top left corner of the menu
        '-width', str(width),
        '-xoffset', str(menu_x), '-yoffset', str(menu_y),
    ]
    cmd.extend(self.rofi_args)
    cmd.extend(additional_args)

    rofi = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
    separator = struct.pack('B', 0)
    for entry in lines:
        rofi.stdin.write(entry.encode('utf-8'))
        rofi.stdin.write(separator)
    rofi.stdin.close()
    rofi.wait()
def run_app(please_stop, server_is_ready):
    """Run the ActiveData app as a child process and relay its output.

    Each output line is logged; once a line containing ``" * Running on"``
    is seen, ``server_is_ready.go()`` is called. The loop runs until
    *please_stop* becomes true, then the child is sent ``CTRL_C_EVENT``.
    """
    server = subprocess.Popen(
        ["python", "active_data\\app.py", "--settings", "tests/config/elasticsearch.json"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        bufsize=-1
        #creationflags=CREATE_NEW_PROCESS_GROUP
    )

    while not please_stop:
        line = server.stdout.readline()
        if line:
            if line.find(" * Running on") >= 0:
                server_is_ready.go()
            Log.note("SERVER: {{line}}", {"line": line.strip()})

    server.send_signal(signal.CTRL_C_EVENT)

# read_alternate_settings
def pquery(command, stdin=None, **kwargs):
    """Run *command* and return its captured stdout.

    :param command: argument list for ``subprocess.Popen``.
    :param stdin: optional data fed to the child's standard input.
    :param kwargs: extra keyword arguments forwarded to ``Popen``.
    :return: the child's stdout.
    :raises ProcessException: if the command exits with a non-zero status.
    :raises OSError: if the process cannot be started (after ``error`` has
        been called when the executable was not found).
    """
    if very_verbose:
        info('Query "'+' '.join(command)+'" in '+getcwd())

    # communicate() can only deliver input through a pipe; without this the
    # stdin data was silently dropped.
    if stdin is not None:
        kwargs['stdin'] = subprocess.PIPE

    try:
        proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)
    except OSError as e:
        # ``e.errno`` works on Python 2 and 3; ``e[0]`` only on Python 2.
        if e.errno == errno.ENOENT:
            error(
                "Could not execute \"%s\".\n"
                "Please verify that it's installed and accessible from your current path by executing \"%s\".\n" % (command[0], command[0]), e.errno)
        # Re-raise in every case so an unbound ``proc`` is never used below,
        # should ``error`` return instead of exiting.
        raise

    stdout, _ = proc.communicate(stdin)

    if very_verbose:
        log(str(stdout).strip()+"\n")

    if proc.returncode != 0:
        raise ProcessException(proc.returncode, command[0], ' '.join(command), getcwd())

    return stdout
def test_example_manuscript(manuscript):
    """
    Build the example *manuscript* by running the ``manubot`` command line
    and assert that it exits successfully. The ``variables`` manuscript
    additionally passes its ``template-variables.json`` file.
    """
    manuscript_dir = directory.joinpath('manuscripts', manuscript)
    content_dir = manuscript_dir.joinpath('content')
    output_dir = manuscript_dir.joinpath('output')
    args = [
        'manubot',
        '--log-level', 'INFO',
        '--content-directory', content_dir,
        '--output-directory', output_dir,
    ]
    if manuscript == 'variables':
        args += [
            '--template-variables-path',
            manuscript_dir.joinpath('content/template-variables.json'),
        ]
    completed = subprocess.run(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    print(completed.args)
    print(completed.stderr.decode())
    assert completed.returncode == 0
def communicate(input, coro=None):
    """Coroutine: compute the sha1sum of the file *input* via an async pipe.

    A ``sha1sum`` process is started (through ``asyncoro.asyncfile.Popen``
    on Windows), wrapped in an ``AsyncPipe``, fed the opened file through
    ``communicate`` and its stdout is printed.

    :param input: path of the file to hash.
    :param coro: coroutine handle; not used in the body.
    """
    if platform.system() == 'Windows':
        # asyncfile.Popen must be used instead of subprocess.Popen
        pipe = asyncoro.asyncfile.Popen([r'\cygwin64\bin\sha1sum.exe'],
                                        stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    else:
        pipe = subprocess.Popen(['sha1sum'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    # convert pipe to asynchronous version
    async_pipe = asyncoro.asyncfile.AsyncPipe(pipe)
    # 'communicate' takes either the data or file descriptor with data
    # (if file is too large to read in full) as input.
    # The context manager closes the source file once the exchange is over.
    with open(input) as source:
        stdout, stderr = yield async_pipe.communicate(source)
    print('communicate sha1sum: %s' % stdout)
def custom_feeder(input, coro=None):
    """Coroutine: compute the sha1sum of file *input* with explicit feeders.

    Two helper coroutines are started: ``read_proc`` collects the child's
    stdout until EOF, ``write_proc`` copies the file to the child's stdin in
    8 KiB chunks. The digest delivered by the reader is printed.

    :param input: path of the file to hash.
    :param coro: coroutine handle; not used in the body.
    """
    def write_proc(fin, pipe, coro=None):
        # Copy the source file into the pipe chunk by chunk.
        while True:
            chunk = yield os.read(fin.fileno(), 8*1024)
            if not chunk:
                break
            written = yield pipe.write(chunk, full=True)
            assert written == len(chunk)
        fin.close()
        pipe.stdin.close()

    def read_proc(pipe, coro=None):
        # output from sha1sum is small, so read until EOF
        digest = yield pipe.stdout.read()
        pipe.stdout.close()
        raise StopIteration(digest)

    on_windows = platform.system() == 'Windows'
    if on_windows:
        # asyncfile.Popen must be used instead of subprocess.Popen
        pipe = asyncoro.asyncfile.Popen([r'\cygwin64\bin\sha1sum.exe'],
                                        stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    else:
        pipe = subprocess.Popen(['sha1sum'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    async_pipe = asyncoro.asyncfile.AsyncPipe(pipe)

    # The reader is created before the writer, as in the original.
    reader = asyncoro.Coro(read_proc, async_pipe)
    writer = asyncoro.Coro(write_proc, open(input), async_pipe)
    stdout = yield reader.finish()
    print('     feeder sha1sum: %s' % stdout)

# asyncoro.logger.setLevel(asyncoro.Logger.DEBUG)
# simpler version using 'communicate'
def execute(cmd, success_msg="", failure_msg="", exitcode=-1):
    """
    Generic wrapper to execute the CLI commands.

    On success the captured stdout is returned, after reporting
    *success_msg* through ``output_ok`` if one was given. On failure
    ``output_notok`` is called with *failure_msg*, the child's stderr (or
    its stdout when stderr is empty) and *exitcode*.
    """
    child = subprocess.Popen(cmd,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    out, err = child.communicate()

    if child.returncode != 0:
        output_notok(failure_msg, err=err or out, exitcode=exitcode)
        return None

    if success_msg:
        output_ok(success_msg)
    return out
def get_glusterd_workdir():
    """
    Command to get Glusterd working dir. If failed returns the default
    directory ``DEFAULT_GLUSTERD_WORKDIR``.

    "Failed" covers both a non-zero exit of ``gluster system:: getwd`` and
    the ``gluster`` executable not being runnable at all.
    """
    try:
        p = subprocess.Popen(["gluster", "system::", "getwd"],
                             stdout=subprocess.PIPE)
    except OSError:
        # gluster is not installed or not executable: honour the documented
        # fallback instead of propagating the error.
        return DEFAULT_GLUSTERD_WORKDIR

    out, _ = p.communicate()

    if p.returncode == 0:
        return out.strip()
    return DEFAULT_GLUSTERD_WORKDIR
def _get_gitinfo():
    """Return ``(branch, commit)`` of the git checkout containing this file.

    Both values come from ``git rev-parse`` run in this file's directory.
    If git is not installed, or either query fails, the default
    ``('master', '')`` is returned.
    """
    import pygrunt.platform as platform
    import subprocess
    from pathlib import Path

    fallback = ('master', '')

    git = platform.current.find_executable('git')
    if git is None:
        # No git installed; assume we're on master
        return fallback

    cwd = str(Path(__file__).parent)

    # First query yields the branch name, second the commit hash.
    queries = (
        ['rev-parse', '--abbrev-ref', 'HEAD'],
        ['rev-parse', 'HEAD'],
    )
    answers = []
    for query in queries:
        result = subprocess.run([git] + query,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.DEVNULL,
                                cwd=cwd,
                                universal_newlines=True)
        if result.returncode != 0:
            # Quietly return defaults on fail
            return fallback
        answers.append(result.stdout)

    branch, commit = answers
    return (branch.strip(), commit.strip())
def compile_object(self, in_file, out_file):
    """Compile *in_file* by running the command held in ``self._args``.

    If *out_file* already exists, ``self.recompile.should_recompile`` is
    consulted first and the compile is skipped when it says no. The parent
    directory of *out_file* is created if needed. Compiler stdout/stderr
    are printed when non-empty.

    :return: ``True`` if the compile was skipped or the compiler exited
        with status 0, ``False`` otherwise.
    """
    import subprocess

    in_file = Path(in_file)
    out_file = Path(out_file)

    # Skip compile if RecompileStrategy says so
    # Since preprocess_source ( possibly used by recompile ) also modifies self._args,
    # we gotta back it up
    # TODO: Maybe use something more elegant than self._args?
    # Take a real copy: a plain alias would see every in-place modification
    # and the restore below would do nothing.
    old_args = list(self._args)
    if out_file.is_file():
        must_compile = self.recompile.should_recompile(str(in_file))
        self._args = old_args
        if not must_compile:
            # Style.info('Nothing to do with', in_file)
            return True

    Path(out_file).parent.mkdir(parents=True, exist_ok=True)
    self._args.extend(self._build_compiler_flags())
    result = subprocess.run(self._args,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            universal_newlines=True)

    # TODO: do something useful with output
    if result.stdout:
        print(result.stdout)
    if result.stderr:
        print(result.stderr)

    return result.returncode == 0
def parse_tox(self):
    """Collect the tox environment list and parse the Python versions.

    Runs ``tox -l`` in ``self.cwd``, stores the output lines in
    ``self.tox_lines`` and then calls ``self.parse_python_versions()``.
    """
    # universal_newlines=True gives text on Python 3, so splitting on the
    # str '\n' below is valid on both major versions.
    proc = subprocess.Popen(
        "tox -l", shell=True, stdout=subprocess.PIPE, cwd=self.cwd,
        universal_newlines=True)
    # communicate() reads everything and reaps the child.
    output, _ = proc.communicate()
    self.tox_lines = output.strip().split('\n')
    self.parse_python_versions()
def run_command(command):
    """Run the shell command string *command* and report its outcome.

    :return: the child's stdout when it produced any; a fixed success
        message when there was no output at all; ``"[!] <stderr>"`` when the
        command wrote anything to stderr; ``None`` if interrupted with
        Ctrl-C.
    """
    try:
        subp = subprocess.Popen(command, shell=True,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        subp_output, errors = subp.communicate()
        if errors:
            return "[!] {}".format(errors)
        # Truthiness works for both bytes (Python 3) and str (Python 2);
        # comparing with '' never matched bytes.
        if not subp_output:
            return '[+] Command successfully executed.\n'
        return subp_output
    except KeyboardInterrupt:
        # Single-argument print(...) is valid on Python 2 and 3.
        print("Terminated command.")
def check_dep():
    """Describe the Windows Data Execution Prevention support policy.

    Queries ``wmic OS get DataExecutionPrevention_SupportPolicy`` and maps
    the first of the digits 0-3 found in the output (checked in that
    order) to a description.

    :return: the description; the captured stderr when the query failed or
        produced no output; ``None`` if no known digit is present.
    """
    dep = subprocess.Popen('wmic OS get DataExecutionPrevention_SupportPolicy',
                           shell=True,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE)
    dep_mode, errors = dep.communicate()

    if errors or not dep_mode:
        return errors

    # Bytes literals keep the membership test valid on Python 3, where the
    # pipe yields bytes; on Python 2 they are ordinary str.
    policies = (
        (b"0", "DEP is off for the whole system."),
        (b"1", "Full DEP coverage for the whole system with no exceptions."),
        (b"2", "DEP is limited to Windows system binaries."),
        (b"3", "DEP is on for all programs and services."),
    )
    for digit, description in policies:
        if digit in dep_mode:
            return description
    return None
def checklocalfw():
    """Return the state of the Windows built-in firewall profiles.

    Runs ``netsh advfirewall show all state``.

    :return: the command's stdout when it produced output without errors,
        otherwise the captured stderr.
    """
    print("Getting Windows Built in Firewall configuration...")
    fw = subprocess.Popen('netsh advfirewall show all state',
                          shell=True,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE)
    fw_mode, errors = fw.communicate()
    if not errors and fw_mode:
        return fw_mode
    # Was ``errros``, a typo that raised NameError on this path.
    return errors