我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用subprocess.STDOUT。
def service_running(service_name): """Determine whether a system service is running""" if init_is_systemd(): return service('is-active', service_name) else: try: output = subprocess.check_output( ['service', service_name, 'status'], stderr=subprocess.STDOUT).decode('UTF-8') except subprocess.CalledProcessError: return False else: # This works for upstart scripts where the 'service' command # returns a consistent string to represent running 'start/running' if ("start/running" in output or "is running" in output or "up and running" in output): return True # Check System V scripts init script return codes if service_name in systemv_services_running(): return True return False
def convert_image(inpath, outpath, size): """Convert an image file using `sips`. Args: inpath (str): Path of source file. outpath (str): Path to destination file. size (int): Width and height of destination image in pixels. Raises: RuntimeError: Raised if `sips` exits with non-zero status. """ cmd = [ b'sips', b'-z', b'{0}'.format(size), b'{0}'.format(size), inpath, b'--out', outpath] # log().debug(cmd) with open(os.devnull, 'w') as pipe: retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT) if retcode != 0: raise RuntimeError('sips exited with {0}'.format(retcode))
def is_crm_dc(): """ Determine leadership by querying the pacemaker Designated Controller """ cmd = ['crm', 'status'] try: status = subprocess.check_output(cmd, stderr=subprocess.STDOUT) if not isinstance(status, six.text_type): status = six.text_type(status, "utf-8") except subprocess.CalledProcessError as ex: raise CRMDCNotFound(str(ex)) current_dc = '' for line in status.split('\n'): if line.startswith('Current DC'): # Current DC: juju-lytrusty-machine-2 (168108163) - partition with quorum current_dc = line.split(':')[1].split()[0] if current_dc == get_unit_hostname(): return True elif current_dc == 'NONE': raise CRMDCNotFound('Current DC: NONE') return False
def compiler_is_clang(comp) : print("check for clang compiler ...", end=' ') try: cc_output = subprocess.check_output(comp+['--version'], stderr = subprocess.STDOUT, shell=False) except OSError as ex: print("compiler test call failed with error {0:d} msg: {1}".format(ex.errno, ex.strerror)) print("no") return False ret = re.search(b'clang', cc_output) is not None if ret : print("yes") else: print("no") return ret
def del_addr(linkname, address): try: subprocess.run(['ip', 'address', 'del', address, 'dev', str(linkname)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True) return [True, str(linkname)] except subprocess.CalledProcessError as suberror: return [False, "delete address failed : %s" % suberror.stdout.decode('utf-8')] # ovs-vsctl list-br # ovs-vsctl br-exists <Bridge> # ovs-vsctl add-br <Bridge> # ovs-vsctl del-br <Bridge> # ovs-vsctl list-ports <Bridge> # ovs-vsctl del-port <Bridge> <Port> # ovs-vsctl add-port <Bridge> <Port> -- set interface <Port> type=gre options:remote_ip=<RemoteIP> # ovs-vsctl add-port <Bridge> <Port> tag=<ID> -- set interface <Port> type=internal # ovs-vsctl port-to-br <Port> # ovs-vsctl set Port <Port> tag=<ID> # ovs-vsctl clear Port <Port> tag
def is_crm_leader(resource, retry=False): """ Returns True if the charm calling this is the elected corosync leader, as returned by calling the external "crm" command. We allow this operation to be retried to avoid the possibility of getting a false negative. See LP #1396246 for more info. """ if resource == DC_RESOURCE_NAME: return is_crm_dc() cmd = ['crm', 'resource', 'show', resource] try: status = subprocess.check_output(cmd, stderr=subprocess.STDOUT) if not isinstance(status, six.text_type): status = six.text_type(status, "utf-8") except subprocess.CalledProcessError: status = None if status and get_unit_hostname() in status: return True if status and "resource %s is NOT running" % (resource) in status: raise CRMResourceNotFound("CRM resource %s not found" % (resource)) return False
def get_bcl2fastq_v2(hostname): try: subprocess.check_call(["which", "bcl2fastq"]) # Restore the LD_LIBRARY_PATH set aside by sourceme.bash/shell10x. # Required for some installations of bcl2fastq. new_environ = dict(os.environ) new_environ['LD_LIBRARY_PATH'] = os.environ.get('_TENX_LD_LIBRARY_PATH', '') output = subprocess.check_output(["bcl2fastq", "--version"], env=new_environ, stderr=subprocess.STDOUT) match = None for l in output.split("\n"): match = re.match("bcl2fastq v([0-9.]+)", l) if match is not None: return (match.groups()[0], None) return (None, "bcl2fastq version not recognized -- please check the output of bcl2fastq --version") except subprocess.CalledProcessError: msg = "On machine: %s, bcl2fastq not found on PATH." % hostname return (None, msg)
def launch(self, cfg, path, flags): logging.debug("Determine the OS and Architecture this application is currently running on") hostOS = platform.system().lower() logging.debug("hostOS: " + str(hostOS)) is_64bits = sys.maxsize > 2 ** 32 if is_64bits: hostArchitecture = 'x64' else: hostArchitecture = 'ia32' logging.debug("hostArchitecture: " + str(hostArchitecture)) if(self.validateConfig(cfg, hostOS, hostArchitecture)): fnull = open(os.devnull, 'w') if os.environ.get("WPW_HOME") is not None: cmd = [os.environ["WPW_HOME"] + '/bin/rpc-agent-' + platform.system().lower() + '-' + self.detectHostArchitecture()] else: cmd = [path + '/wpwithinpy/iot-core-component/bin/rpc-agent-' + platform.system().lower() + '-' + self.detectHostArchitecture()] cmd.extend(flags) proc = subprocess.Popen(cmd, stdin=None, stdout=fnull, stderr=subprocess.STDOUT) return proc else: logging.debug("Invalid OS/Architecture combination detected")
def win64_available(self): wine_bin, \ wineserver_bin, \ wine_lib = self.get_wine_bin_path() dev_null = open(os.devnull, 'w') try: proc = subprocess.Popen([wine_bin + '64'], stdout=dev_null, \ stdin=subprocess.PIPE, stderr=subprocess.STDOUT) dev_null.close() stdoutdata, stderrdata = proc.communicate() if proc.returncode == 1: self.combobox_winearch.set_visible(True) return True else: self.combobox_winearch.set_visible(False) self.winearch = 'win32' return False except: self.combobox_winearch.set_visible(False) self.winearch = 'win32' return False
def execute_task(host, tasks): """Call Fabric to execute tasks against a host Return: CompletedProcess instance """ # TODO: add support for groups, multiple hosts # TODO: add support for providing input data from team files return subprocess.run( [ PYTHON2_PATH, FABRIC_PATH, '--abort-on-prompts', '--hosts=%s' % host, '--fabfile=%s' % FABFILE_PATH, *tasks, ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, # Combine out/err into stdout; stderr will be None universal_newlines=True, check=True, )
def index(): if request.args.get('code'): unlock_code = request.args.get('code') # unlock, new password re = s.query(RecoveryEmail).filter(RecoveryEmail.password_code==unlock_code).one_or_none() if re: jid = re.jid re.password_code = None s.merge(re) s.commit() # set new password and send email email_address = re.email password = ''.join(random.SystemRandom().choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for _ in range(10)) p = subprocess.Popen(['/usr/bin/prosodyctl', 'passwd', jid], stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT) args = bytes("%s\n%s\n" % (password, password), encoding='utf8') p.communicate(args) sendMail(email_address, 'new password', password) content = render_template('success.html', message='password was sent') else: content = render_template('error.html', message='link invalid') else: content = render_template('index.html') return content
def run_app(please_stop, server_is_ready): proc = subprocess.Popen( ["python", "active_data\\app.py", "--settings", "tests/config/elasticsearch.json"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=-1 #creationflags=CREATE_NEW_PROCESS_GROUP ) while not please_stop: line = proc.stdout.readline() if not line: continue if line.find(" * Running on") >= 0: server_is_ready.go() Log.note("SERVER: {{line}}", {"line": line.strip()}) proc.send_signal(signal.CTRL_C_EVENT) # read_alternate_settings
def _install(self, pkg): # to have a unified string which we can query # we need to execute the command with LANG=en_US.UTF-8 cmd = 'LANG=en_US.UTF-8 yaourt --needed --noconfirm -S {}'.format(pkg) self._log.info("Installing \"{}\". Please wait...".format(pkg)) # needed to avoid conflicts due to locking time.sleep(1) proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) out = proc.stdout.read() proc.stdout.close() for item in self._strings.keys(): if out.decode("utf-8").find(self._strings[item]) >= 0: return item self._log.warning("Could not determine what happened with package {}".format(pkg)) return PkgStatus.NOT_SURE
def invoke(command, success_codes=(0,)): try: output = subprocess.check_output(command, stderr=subprocess.STDOUT) status = 0 except subprocess.CalledProcessError as error: output = error.output status = error.returncode output = output.decode('utf-8') if status not in success_codes: raise Exception( 'Command %r return exit code %d and output: """%s""".' % ( command, status, output, ) ) return status, output
def get_verifier_id(): """ Returns verifier id for current Tempest """ create_rally_deployment() create_verifier() cmd = ("rally verify list-verifiers | awk '/" + CONST.__getattribute__('tempest_verifier_name') + "/ {print $2}'") p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) deployment_uuid = p.stdout.readline().rstrip() if deployment_uuid == "": logger.error("Tempest verifier not found.") raise Exception('Error with command:%s' % cmd) return deployment_uuid
def run_defcore_default(self): """Run default defcore sys command.""" options = ["-v"] if not self.insecure else ["-v", self.insecure] cmd = (["refstack-client", "test", "-c", self.confpath] + options + ["--test-list", self.defcorelist]) LOGGER.info("Starting Refstack_defcore test case: '%s'.", cmd) with open(os.path.join(conf_utils.REFSTACK_RESULTS_DIR, "environment.log"), 'w+') as f_env: f_env.write( ("Refstack environment:\n" " SUT: {}\n Scenario: {}\n Node: {}\n Date: {}\n").format( CONST.__getattribute__('INSTALLER_TYPE'), CONST.__getattribute__('DEPLOY_SCENARIO'), CONST.__getattribute__('NODE_NAME'), time.strftime("%a %b %d %H:%M:%S %Z %Y"))) with open(os.path.join(conf_utils.REFSTACK_RESULTS_DIR, "refstack.log"), 'w+') as f_stdout: subprocess.call(cmd, shell=False, stdout=f_stdout, stderr=subprocess.STDOUT)
def popen4(cmd, mode="t", bufsize=-1): """Execute the shell command 'cmd' in a sub-process. On UNIX, 'cmd' may be a sequence, in which case arguments will be passed directly to the program without shell intervention (as with os.spawnv()). If 'cmd' is a string it will be passed to the shell (as with os.system()). If 'bufsize' is specified, it sets the buffer size for the I/O pipes. The file objects (child_stdin, child_stdout_stderr) are returned.""" import warnings msg = "os.popen4 is deprecated. Use the subprocess module." warnings.warn(msg, DeprecationWarning, stacklevel=2) import subprocess PIPE = subprocess.PIPE p = subprocess.Popen(cmd, shell=isinstance(cmd, basestring), bufsize=bufsize, stdin=PIPE, stdout=PIPE, stderr=subprocess.STDOUT, close_fds=True) return p.stdin, p.stdout
def start_cfssl(cmdline=""): global cfsslproc cmd = "cfssl serve -loglevel=1 %s "%cmdline env = os.environ.copy() env['PATH']=env['PATH']+":/usr/local/bin" # make sure cfssl isn't running os.system('pkill -f cfssl') cfsslproc = subprocess.Popen(cmd,env=env,shell=True,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,universal_newlines=True) if cfsslproc.returncode is not None: raise Exception("Unable to launch %: failed with code "%(cmd,cfsslproc.returncode)) logger.debug("Waiting for cfssl to start...") while True: line = cfsslproc.stdout.readline() if "Now listening on" in line: break time.sleep(0.2)# give cfssl a little more time to get started logger.debug("cfssl started successfully")
def designPrimers(p3_args, input_log=None, output_log=None, err_log=None): ''' Return the raw primer3_core output for the provided primer3 args. Returns an ordered dict of the boulderIO-format primer3 output file ''' sp = subprocess.Popen([pjoin(PRIMER3_HOME, 'primer3_core')], stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT) p3_args.setdefault('PRIMER_THERMODYNAMIC_PARAMETERS_PATH', pjoin(PRIMER3_HOME, 'primer3_config/')) in_str = _formatBoulderIO(p3_args) if input_log: input_log.write(in_str) input_log.flush() out_str, err_str = sp.communicate(input=in_str) if output_log: output_log.write(out_str) output_log.flush() if err_log and err_str is not None: err_log.write(err_str) err_log.flush() return _parseBoulderIO(out_str)
def call(self, args, devnull=False): """Call other processes. args - list of command args devnull - whether to pipe stdout to /dev/null (or equivalent) """ if self.debug: click.echo(subprocess.list2cmdline(args)) click.confirm('Continue?', default=True, abort=True) try: kwargs = {} if devnull: # Pipe to /dev/null (or equivalent). kwargs['stderr'] = subprocess.STDOUT kwargs['stdout'] = self.FNULL ret_code = subprocess.call(args, **kwargs) except subprocess.CalledProcessError: return False return ret_code
def _start_from_profile_path(self, path): self._firefox_env["XRE_PROFILE_PATH"] = path if platform.system().lower() == 'linux': self._modify_link_library_path() command = [self._start_cmd, "-silent"] if self.command_line is not None: for cli in self.command_line: command.append(cli) # The following exists upstream and is known to create hanging # firefoxes, leading to zombies. # subprocess.Popen(command, stdout=self._log_file, # stderr=subprocess.STDOUT, # env=self._firefox_env).communicate() command[1] = '-foreground' self.process = subprocess.Popen( command, stdout=self._log_file, stderr=subprocess.STDOUT, env=self._firefox_env)
def __init__(self, project, arguments=None, stdout="file", stdin=None, timeout=10.0, name=None): if not name: name = "process:%s" % basename(arguments[0]) ProjectAgent.__init__(self, project, name) self.env = Environment(self) if arguments is None: arguments = [] self.cmdline = CommandLine(self, arguments) self.timeout = timeout self.max_memory = 100*1024*1024 self.stdout = stdout self.popen_args = { 'stderr': STDOUT, } if stdin is not None: if stdin == "null": self.popen_args['stdin'] = open('/dev/null', 'r') else: raise ValueError("Invalid stdin value: %r" % stdin)
def test_should_trigger_on_connect_if_client_connect_valid(server_with_mocks): node_script = ''' module.paths.push('{0}') WebSocket = require('ws') const SubscriptionClient = require('subscriptions-transport-ws').SubscriptionClient new SubscriptionClient('ws://localhost:{1}/socket') '''.format( os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) try: subprocess.check_output( ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) except: mock = server_with_mocks.get_nowait() assert mock.name == 'on_connect' mock.assert_called_once()
def test_should_trigger_on_connect_with_correct_cxn_params(server_with_mocks): node_script = ''' module.paths.push('{0}') WebSocket = require('ws') const SubscriptionClient = require('subscriptions-transport-ws').SubscriptionClient const connectionParams = {{test: true}} new SubscriptionClient('ws://localhost:{1}/socket', {{ connectionParams, }}) '''.format( os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT) try: subprocess.check_output( ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2) except: mock = server_with_mocks.get_nowait() assert mock.name == 'on_connect' mock.assert_called_once() mock.assert_called_with({'test': True})
def interface_exists(interface): ''' Checks if interface exists on node. ''' try: subprocess.check_call(['ip', 'link', 'show', interface], stdout=open(os.devnull, 'w'), stderr=subprocess.STDOUT) except subprocess.CalledProcessError: return False return True
def systemv_services_running(): output = subprocess.check_output( ['service', '--status-all'], stderr=subprocess.STDOUT).decode('UTF-8') return [row.split()[-1] for row in output.split('\n') if '[ + ]' in row]
def service_available(service_name): """Determine whether a system service is available""" try: subprocess.check_output( ['service', service_name, 'status'], stderr=subprocess.STDOUT).decode('UTF-8') except subprocess.CalledProcessError as e: return b'unrecognized service' not in e.output else: return True
def _call_security(self, action, service, account, *args): """Call ``security`` CLI program that provides access to keychains. May raise `PasswordNotFound`, `PasswordExists` or `KeychainError` exceptions (the first two are subclasses of `KeychainError`). :param action: The ``security`` action to call, e.g. ``add-generic-password`` :type action: ``unicode`` :param service: Name of the service. :type service: ``unicode`` :param account: name of the account the password is for, e.g. "Pinboard" :type account: ``unicode`` :param password: the password to secure :type password: ``unicode`` :param *args: list of command line arguments to be passed to ``security`` :type *args: `list` or `tuple` :returns: ``(retcode, output)``. ``retcode`` is an `int`, ``output`` a ``unicode`` string. :rtype: `tuple` (`int`, ``unicode``) """ cmd = ['security', action, '-s', service, '-a', account] + list(args) p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) stdout, _ = p.communicate() if p.returncode == 44: # password does not exist raise PasswordNotFound() elif p.returncode == 45: # password already exists raise PasswordExists() elif p.returncode > 0: err = KeychainError('Unknown Keychain error : %s' % stdout) err.retcode = p.returncode raise err return stdout.strip().decode('utf-8')
def run_script(self, script): fd, self.tmp_script_filename = tempfile.mkstemp() os.close(fd) f = open(self.tmp_script_filename, 'w') f.write(script) f.close() os.chmod(self.tmp_script_filename, 0o744) p = subprocess.Popen( self.tmp_script_filename, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) self.process = p self.stream = p.stdout
def scrapeDoi(url): env = os.environ.copy() cmd_line = ['timeout', '30s', 'google-chrome-unstable', '--headless', '--dump-dom', url] p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env) out, err = p.communicate() if p.returncode: print('UTOH') return None elif b'ERROR:headless_shell.cc' in out: print(out) raise IOError('Something is wrong...') qurl = quote(url, '') if len(qurl) > 200: qurl = qurl[:200] with open(os.path.expanduser(f'~/files/scibot/{qurl}'), 'wb') as f: f.write(out) both = BeautifulSoup(out, 'lxml') doi = getDoi(both, both) return doi
def sys_run(command,check=False): Ret = subprocess.run(command, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, shell=True, check=check) return Ret
def list_links(): try: ret = subprocess.run(['ip', 'link', 'show'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True) links = ipcontrol.parse(ret.stdout.decode('utf-8')) return [True, list(links.keys())] except subprocess.CalledProcessError as suberror: return [False, "list links failed : %s" % suberror.stdout.decode('utf-8')]
def link_exist(linkname): try: subprocess.run(['ip', 'link', 'show', 'dev', str(linkname)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True) return True except subprocess.CalledProcessError: return False
def link_info(linkname): try: ret = subprocess.run(['ip', 'address', 'show', 'dev', str(linkname)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True) return [True, ipcontrol.parse(ret.stdout.decode('utf-8'))[str(linkname)]] except subprocess.CalledProcessError as suberror: return [False, "get link info failed : %s" % suberror.stdout.decode('utf-8')]
def link_state(linkname): try: ret = subprocess.run(['ip', 'link', 'show', 'dev', str(linkname)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True) return [True, ipcontrol.parse(ret.stdout.decode('utf-8'))[str(linkname)]['state']] except subprocess.CalledProcessError as suberror: return [False, "get link state failed : %s" % suberror.stdout.decode('utf-8')]
def down_link(linkname): try: subprocess.run(['ip', 'link', 'set', 'dev', str(linkname), 'down'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True) return [True, str(linkname)] except subprocess.CalledProcessError as suberror: return [False, "set link down failed : %s" % suberror.stdout.decode('utf-8')]
def add_addr(linkname, address): try: subprocess.run(['ip', 'address', 'add', address, 'dev', str(linkname)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True) return [True, str(linkname)] except subprocess.CalledProcessError as suberror: return [False, "add address failed : %s" % suberror.stdout.decode('utf-8')]
def list_bridges(): try: ret = subprocess.run(['ovs-vsctl', 'list-br'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True) return [True, ret.stdout.decode('utf-8').split()] except subprocess.CalledProcessError as suberror: return [False, "list bridges failed : %s" % suberror.stdout.decode('utf-8')]