我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用commands.getstatusoutput()。
def execCmd(cmd, timeOut = -1):
    """
    Run a command through the UNIX shell and report its exit code and output.

    cmd:      Command line to execute on the shell (string).

    timeOut:  Timeout to apply while waiting for the command. The value
              -1 means that no timeout is applied (float).

    Returns:  Two-element sequence (<exit code>, <output>). Without a
              timeout this is the tuple produced by
              commands.getstatusoutput(); with a timeout it is a list built
              from the first two values returned by ngamsCoreExecCmd().
    """
    logger.debug("Executing command: %s", cmd)
    if timeOut != -1:
        # The third value handed back by ngamsCoreExecCmd() is not passed on.
        code, captured, _ = ngamsCoreExecCmd(cmd, timeOut)
        return [code, captured]
    return commands.getstatusoutput(cmd)
def test_cmdline_props(self):
    """
    Launch the cmdline_node device manager and check that 'testprop'
    appears exactly once on the device's command line and queries as 'abc'.
    """
    _, domMgr = self.launchDomainManager()
    self.assertNotEqual(domMgr, None)
    _, devMgr = self.launchDeviceManager("/nodes/cmdline_node/DeviceManager.dcd.xml")
    self.assertNotEqual(devMgr, None)

    ps_output = commands.getstatusoutput('ps -ww -f | grep cmdline_dev')[1]
    rows = ps_output.split('\n')
    # Use the first process entry mentioning an IOR; when none does, the
    # last line of the listing is inspected instead.
    entry = next((row for row in rows if 'IOR' in row), rows[-1])
    self.assertEqual(entry.split(' ').count('testprop'), 1)

    request = [CF.DataType(id='testprop', value=any.to_any(None))]
    device = devMgr._get_registeredDevices()[0]
    answer = device.query(request)
    self.assertEqual('abc', answer[0].value._v)
def test_nocmdline_props(self):
    """
    Launch the nocmdline_node device manager and check that 'testprop'
    is absent from the device's command line while still querying as 'abc'.
    """
    _, domMgr = self.launchDomainManager()
    self.assertNotEqual(domMgr, None)
    _, devMgr = self.launchDeviceManager("/nodes/nocmdline_node/DeviceManager.dcd.xml")
    self.assertNotEqual(devMgr, None)

    ps_output = commands.getstatusoutput('ps -ww -f | grep cmdline_dev')[1]
    rows = ps_output.split('\n')
    # Use the first process entry mentioning an IOR; when none does, the
    # last line of the listing is inspected instead.
    entry = next((row for row in rows if 'IOR' in row), rows[-1])
    self.assertEqual(entry.split(' ').count('testprop'), 0)

    request = [CF.DataType(id='testprop', value=any.to_any(None))]
    device = devMgr._get_registeredDevices()[0]
    answer = device.query(request)
    self.assertEqual('abc', answer[0].value._v)
def __init__(self, attach=True, **opts):
    """
    Locate gdb and translate keyword options into gdb command-line flags.

    attach: stored on the instance as ``_attach``.
    opts:   option name/value pairs. Names are normalised as follows:
              * a single character ``x``        -> ``-x``
              * a name already starting with a dash is forwarded unchanged
              * anything else, e.g. ``some_opt`` -> ``--some-opt``

    Raises RuntimeError when ``which gdb`` reports a non-zero status.
    """
    status, gdb = commands.getstatusoutput('which gdb')
    if status:
        raise RuntimeError('gdb cannot be found')

    pass_opts = {}
    for name, value in opts.items():
        if len(name) == 1:
            name = '-' + name
        elif not name.startswith('-'):
            name = '--' + name.replace('_', '-')
        # Dash-prefixed names fall through untouched. Previously a name that
        # already started with '--' matched no branch and was silently dropped.
        pass_opts[name] = value

    super(GDB, self).__init__(gdb, '=', **pass_opts)
    self._attach = attach
def run(self):
    """
    Echo the stored command, execute it (wrapped in the default terminal
    emulator when ``use_term`` is set) and then invoke the callback if it
    is callable.
    """
    print (self.command)

    if self.use_term:
        # The trailing 'read' keeps the terminal open until Enter is pressed.
        shell_line = def_term + " -e 'bash -c \"" + self.command + "; read; \"'"
    else:
        shell_line = self.command
    commands.getstatusoutput(shell_line)

    if hasattr(self.callback, '__call__'):
        self.callback()

#
# Delayed Kill
#
def slot_monitor(self):
    """
    Toggle monitor mode on the selected wireless interface.

    When ``intf_mode`` is "Monitor" the interface is stopped with
    ``airmon-ng stop``; otherwise ``airmon-ng check kill`` is run and
    monitor mode is started. The result is reported through
    ``self.output`` and the interface list is reloaded afterwards.
    """
    if self.check_options(self.periferica_opt) != 0:
        if self.intf_mode == "Monitor":
            shell_line = 'airmon-ng stop ' + self.periferica
            success_prefix = "Monitor off: "
        else:
            shell_line = 'airmon-ng check kill && echo y | airmon-ng start ' + self.periferica
            success_prefix = "Monitor on: "

        code, text = commands.getstatusoutput(shell_line)
        if code == 0:
            self.output(success_prefix + self.periferica, code)
        else:
            # On failure the command's own output is shown to the user.
            self.output(text, code)

    self.slot_reload_interfaces()

#
# Start Client Fragmentation Attack
#
def slot_mac_change(self):
    """
    Assign the MAC address ``change_mac_mac`` to the interface
    ``change_mac_int``.

    The current address is first saved to ``.macaddress-backup`` in the
    config directory (an existing backup file is left untouched). The
    interface is then brought down with the new address and up again.
    Any failing step is reported through ``self.output`` and aborts the
    procedure.
    """
    # NOTE(review): 'mymon_opn' looks like a typo for 'mymon_opt' - confirm
    # against the class definition.
    required = self.change_mac_int_opt | self.change_mac_mac_opt | self.mymon_opn | self.intf_mode_opt
    if self.check_options(required) == 0:
        return

    # backup of old MAC...
    backup_file = config_dir + '.macaddress-backup'
    commands.getstatusoutput('if [ -e ' + backup_file + ' ]; then echo ""; else ifconfig ' + self.change_mac_int + ' | grep HWaddr | sed \'s/^.*HWaddr //\' > ' + backup_file + '; fi')

    code, text = commands.getstatusoutput('ifconfig ' + self.change_mac_int + ' down hw ether ' + self.change_mac_mac)
    if code != 0:
        self.output(text, code)
        return

    code, text = commands.getstatusoutput('ifconfig ' + self.change_mac_int + ' up')
    if code != 0:
        self.output(text, code)
        return

    self.output('Mac address of interface ' + self.change_mac_int + ' changed in ' + self.change_mac_mac, code)

#
# Enable ip forwarding
#
def brute(ip): print "\n[+] Attempting BruteForce:",ip try: for n in names: response = StringIO.StringIO(commands.getstatusoutput('snmpwalk '+ip+" "+n)[1]).readlines() if re.search("command not found",response[0].lower()): print "\n[-] snmpwalk not installed!!!\n" sys.exit(1) else: if verbose ==1: print "\t{- Trying:",n if len(response) > 1: print "\n\tSuccess:",ip,"Community Name:",n print "\n\tTry: snmpwalk",ip,n,"\n" except(), msg: #print "Error:",msg pass
def log(self,path,number=None):
    """
    Collect revision summaries from ``svn log -q`` run inside *path*.

    path:   directory to ``cd`` into before calling svn.
    number: maximum number of entries, passed to svn as ``-l``; when None
            the limit is omitted (the literal "None" used to be passed,
            which made svn fail).

    Returns a list of dicts with the keys 'ver', 'user', 'comid' and
    'desc'. 'ver' and 'comid' both hold the first '|'-separated column,
    'user' the second, and 'desc' the first two space-separated tokens of
    the third. The list is empty when svn exits with a non-zero status.
    """
    cmd = "cd {path} && svn log".format(path=path)
    if number is not None:
        cmd += " -l {number}".format(number=number)
    cmd += " -q"

    vList = []
    status,result = commands.getstatusoutput(cmd)
    if status != 0:
        return vList

    for line in result.split('\n'):
        if line.startswith('---'):
            continue
        fields = line.split('|')
        # Blank or otherwise malformed lines are skipped instead of
        # raising IndexError.
        if len(fields) < 3:
            continue
        stamp = fields[2].strip().split(' ',2)
        if len(stamp) < 2:
            continue
        vList.append({
            'ver': fields[0].strip(),
            'user': fields[1].strip(),
            'comid': fields[0].strip(),
            'desc': stamp[0] + ' ' + stamp[1],
        })
    return vList
# autoSsWap: toggle the SwapManager autostart setting.
# Visible behaviour: config.plugins.ldteam.swapautostart is flipped, 'update-rc.d'
# and the /etc/init.d/SwapManager script are invoked through
# commands.getstatusoutput(), and the setting is saved; the MessageBox branch
# presumably runs when self.swap_place is not set - confirm.
# NOTE(review): the whole method sits on one physical line, so the original
# indentation (and with it the exact nesting of the fileExists() checks) is lost.
# Restore the layout from the upstream source before relying on this block.
def autoSsWap(self): if self.swap_place: if config.plugins.ldteam.swapautostart.value: config.plugins.ldteam.swapautostart.setValue(False) if fileExists('/etc/init.d/SwapManager'): if fileExists('/etc/rc3.d/S98SwapManager'): commands.getstatusoutput('update-rc.d -f SwapManager remove') commands.getstatusoutput('sh /etc/init.d/SwapManager stop') config.plugins.ldteam.swapautostart.save() else: config.plugins.ldteam.swapautostart.setValue(True) if fileExists('/etc/init.d/SwapManager'): if not fileExists('/etc/rc3.d/S98SwapManager'): commands.getstatusoutput('update-rc.d -f SwapManager defaults 98') commands.getstatusoutput('sh /etc/init.d/SwapManager start') config.plugins.ldteam.swapautostart.save() configfile.save() else: mybox = self.session.open(MessageBox, _("You have to create a Swap File before activating autostart"), MessageBox.TYPE_INFO) mybox.setTitle(_("Info")) self.updateSwap()
def __transform_xsltproc(self, file, xsl_file, output, params = None):
    """
    Run ``xsltproc`` to transform *file* with the stylesheet *xsl_file*.

    file:      path of the input document.
    xsl_file:  path of the XSLT stylesheet.
    output:    path handed to xsltproc's ``-o`` option.
    params:    optional mapping of stylesheet parameter names to string
               values; each pair becomes its own ``--stringparam`` option.

    Returns the tuple ``(exit_status, out_text)`` produced by
    commands.getstatusoutput().
    """
    if params is None:
        params = {}

    command = 'xsltproc'
    # xsltproc takes one name/value pair per --stringparam. The flag used to
    # be emitted only once, so every additional pair was parsed as a file
    # argument.
    for key in params.keys():
        command += ' --stringparam %s "%s"' % (key, params[key])
    command += ' -o %s %s %s' % (output, xsl_file, file)

    return commands.getstatusoutput(command)
# start: pick a dnsmasq command-line template, fill it in and run it with
# commands.getstatusoutput(), returning (status, output).
# The template is selected by `drive` ('ovs' or 'brctl') and `mode` ('int' or 'ext');
# all templates run dnsmasq via 'ip netns exec {netnsName}', and the 'ext' templates
# additionally fill in `gateway` and `dns`.
# NOTE(review): no template is assigned for any other drive/mode combination, so
# `cmd` would be unbound at the getstatusoutput() call - confirm callers never do that.
# NOTE(review): the original layout (indentation and the line breaks inside the
# triple-quoted templates) was lost; verify the templates against the upstream source.
def start(self,netnsName,drive,iprange,port,mode,brName,gateway=None,dns=None): ''' @param iprange: 172.16.0.100,172.16.0.254 @param mode: int|ext ''' if drive == 'ovs': if mode == 'int': cmd = '''ip netns exec {netnsName} /usr/sbin/dnsmasq -u root -g root --no-hosts --no-resolv --strict-order --bind-interfaces --except-interface lo --interface {port} --dhcp-range={iprange},infinite --dhcp-leasefile=/var/run/dnsmasq/{port}.lease --pid-file=/var/run/dnsmasq-{mode}.pid --dhcp-lease-max=253 --dhcp-no-override --log-queries --log-facility=/var/run/dnsmasq/dnsmasq-{mode}.log --dhcp-option-force=3,6 --conf-file='''.format(iprange=iprange,port=port,netnsName=netnsName,mode=mode) elif mode == 'ext': cmd = '''ip netns exec {netnsName} /usr/sbin/dnsmasq -u root -g root --no-hosts --no-resolv --strict-order --bind-interfaces --except-interface lo --interface {port} --dhcp-range={iprange},infinite --dhcp-leasefile=/var/run/dnsmasq/{port}.lease --pid-file=/var/run/dnsmasq-{mode}.pid --dhcp-lease-max=253 --dhcp-no-override --log-queries --log-facility=/var/run/dnsmasq/dnsmasq-{mode}.log --dhcp-option=3,{gateway} --dhcp-option=6,{dns} --conf-file= '''.format(iprange=iprange,port=port,netnsName=netnsName,mode=mode,gateway=gateway,dns=dns) elif drive == 'brctl': if mode == 'int': cmd = '''ip netns exec {netnsName} /usr/sbin/dnsmasq -u root -g root --no-hosts --no-resolv --strict-order --bind-interfaces --except-interface lo --interface {brName}-tap --dhcp-range={iprange},infinite --dhcp-leasefile=/var/run/dnsmasq/{port}.lease --pid-file=/var/run/dnsmasq-{mode}.pid --dhcp-lease-max=253 --dhcp-no-override --log-queries --log-facility=/var/run/dnsmasq/dnsmasq-{mode}.log --dhcp-option-force=3,6 --conf-file='''.format(iprange=iprange,brName=brName,port=port,netnsName=netnsName,mode=mode) elif mode == 'ext': cmd = '''ip netns exec {netnsName} /usr/sbin/dnsmasq -u root -g root --no-hosts --no-resolv --strict-order --bind-interfaces --except-interface lo --interface {brName}-tap 
--dhcp-range={iprange},infinite --dhcp-leasefile=/var/run/dnsmasq/{port}.lease --pid-file=/var/run/dnsmasq-{mode}.pid --dhcp-lease-max=253 --dhcp-no-override --log-queries --log-facility=/var/run/dnsmasq/dnsmasq-{mode}.log --dhcp-option=3,{gateway} --dhcp-option=6,{dns} --conf-file= '''.format(iprange=iprange,brName=brName,port=port,netnsName=netnsName,mode=mode,gateway=gateway,dns=dns) status,output = commands.getstatusoutput(cmd) return status,output
def _helper_installer(action):
    """
    Install or uninstall the bitmask helpers (Linux only).

    action: either 'install' or 'uninstall'.

    Raises Exception for any other action, when IS_LINUX is false, or
    when the helper command exits with a non-zero status.
    """
    if action != 'install' and action != 'uninstall':
        raise Exception('Wrong action: %s' % action)
    if not IS_LINUX:
        raise Exception('No install mechanism for this platform')

    cmd = 'bitmask_helpers ' + action
    if STANDALONE:
        # Prefix the call with the 'bitmask' binary located via here().
        cmd = "%s %s" % (os.path.join(here(), "bitmask"), cmd)
    if os.getuid() != 0:
        # Not running as root: go through pkexec.
        cmd = 'pkexec ' + cmd

    retcode, output = commands.getstatusoutput(cmd)
    if retcode == 0:
        return
    log.error('Error installing/uninstalling helpers: %s' % output)
    log.error('Command was: %s' % cmd)
    raise Exception('Could not install/uninstall helpers')
def need_bootstrap():
    """
    Decide whether the environment still has to be bootstrapped.

    :return: True if the avocado command, one of the vt plugins or the
             checkout directory of any test repository is missing
    """
    logger.debug("Check if bootstrap required")
    missing = False

    # avocado itself: look for the shell's "command not ..." complaint
    shell_reply = commands.getstatusoutput('avocado')[1]
    if 'command not ' in shell_reply:
        logger.info("Avocado needs to be installed")
        missing = True

    # avocado-vt plugins
    for plugin in ('vt', 'vt-list', 'vt-bootstrap'):
        if is_avocado_plugin_avl(plugin):
            continue
        logger.info("Avocado %s plugin needs to installed", plugin)
        missing = True

    # test repositories: directory name is the repo basename without suffix
    for repo in TEST_REPOS:
        checkout = os.path.join(TEST_DIR, repo.split('/')[-1].split('.')[0])
        if not os.path.isdir(checkout):
            logger.info("Test needs to be downloaded/updated")
            missing = True

    return missing
def install_repo(path):
    """
    Build and install the repository located at *path*.

    Runs ``make requirements`` and ``python setup.py install`` inside the
    directory. The process is terminated with exit status 1 when the
    command returns a non-zero status or an exception is raised.

    :param path: repository path
    """
    logger.info("Installing repo: %s", path)
    shell_line = "cd %s;make requirements;python setup.py install" % path
    try:
        exit_code, captured = commands.getstatusoutput(shell_line)
        logger.debug("%s", captured)
        if exit_code == 0:
            return
        logger.error("Error while installing: %s\n%s", path.split('/')[-1], exit_code)
        sys.exit(1)
    except Exception as error:
        # sys.exit() raises SystemExit, which is not caught here.
        logger.error("Failed with exception during installing %s\n%s",
                     path.split('/')[-1], error)
        sys.exit(1)
def install_optional_plugin(plugin):
    """
    Install an optional avocado plugin unless it is already available.

    :param plugin: optional plugin name
    """
    if is_avocado_plugin_avl(plugin):
        # plugin already installed
        return

    logger.info("Installing optional plugin: %s", plugin)
    plugin_path = "%s/avocado/optional_plugins/%s" % (BASE_PATH, plugin)
    if not os.path.isdir(plugin_path):
        logger.warning("optional plugin %s is not present in path %s,"
                       " skipping install", plugin, plugin_path)
        return

    exit_code = commands.getstatusoutput("cd %s;python setup.py install" % plugin_path)[0]
    if exit_code != 0:
        logger.error("Error installing optional plugin: %s", plugin)
def codesignapp(app_path):
    """
    Get the codesign information included in an app.

    About codesign:
    https://developer.apple.com/legacy/library/technotes/tn2250/_index.html#//apple_ref/doc/uid/DTS40009933

    Args:
        app_path: Mach-O path handed to ``/usr/bin/codesign -dvvv``

    Returns:
        the output of codesign, or '' when the command did not exit with 0
    """
    # NOTE(review): app_path is interpolated unquoted, so a path containing
    # spaces is split by the shell - confirm whether callers pre-escape it.
    result = commands.getstatusoutput("/usr/bin/codesign -dvvv %s" % app_path)
    if not result or len(result) != 2 or result[0] != 0:
        return ''
    return result[1]
def create_daisy_server_image(self):
    """
    Build the Daisy Server image with daisy-img-modify.sh and move the
    resulting ``daisy/centos7.qcow2`` from the work directory to the
    location configured in ``daisy_server_info['image']``.

    Calls err_exit() when the script returns a non-zero status.
    """
    LI('Begin to create Daisy Server image')
    info = self.daisy_server_info
    cmd = '{script} -c {sub_script} -a {address} -g {gateway} -s {disk_size}'.format(
        script=path_join(CREATE_QCOW2_PATH, 'daisy-img-modify.sh'),
        sub_script=path_join(CREATE_QCOW2_PATH, 'centos-img-modify.sh'),
        address=info['address'],
        gateway=info['gateway'],
        disk_size=info['disk_size'])
    LI('Command is: ')
    LI(' %s' % cmd)

    if run_shell(cmd):
        err_exit('Failed to create Daisy Server image')

    # Remove a previous image at the target location before moving the new one.
    target = info['image']
    if os.access(target, os.R_OK):
        os.remove(target)
    shutil.move(path_join(self.work_dir, 'daisy/centos7.qcow2'), target)
    LI('Daisy Server image is created %s' % target)
# cmd: run a git command line and return (status, output).
# Visible behaviour: a leading "git " is rewritten to carry --work-tree/--git-dir
# options built from self.work_tree; the command is echoed through Echo.echo()
# when self._enable_echo is set; unless `force` is given, a missing ".git"
# directory yields status 1 with a "Not a git repository" message instead of
# running the command; a non-zero status ends in exit(status).
# NOTE(review): the method sits on one physical line, so the original nesting
# (e.g. which statements depend on `self.work_tree and no_work == False`, and
# what the second `_enable_echo` check guards) cannot be recovered from here.
def cmd(self, cmd, force=False, no_work=False): if self.work_tree and no_work == False: cmd = re.sub(r"^git ", "git --work-tree="+self.work_tree+" --git-dir="+os.path.join(self.work_tree, ".git")+" ", cmd.strip()) if self._enable_echo: Echo.echo("#[hello git #] "+cmd) if not force and not os.path.exists(os.path.join(self.work_tree, ".git")): (status, output) = (1, "Not a git repository: !["+self.work_tree+"]") else: (status, output) = commands.getstatusoutput(cmd) if self._enable_echo: self._enable_echo = False Echo.echo("#[hello git -> " + output.replace("\n", "\nhello git -> ")+"]") if status != 0: exit(status) return (status, output)
# generate_payload: write a C source file named file_name + ".c" whose
# samba_init_module() passes `cmd` to system(), compile it with gcc into the
# shared object file_name + ".so", and return file_name on success; when gcc
# exits with a non-zero status an error is printed and exit() is called.
# NOTE(review): the whitespace inside the triple-quoted C template was collapsed
# together with the rest of this block; restore the original layout before use.
def generate_payload(file_name, cmd): content = ''' # include <stdio.h> # include <stdlib.h> int samba_init_module() { system("%s"); return 0; }''' % cmd payload = open(file_name + ".c", 'wb') payload.write(content.strip()) payload.close() compile_cmd = "gcc %s.c -shared -fPIC -o %s.so" % (file_name, file_name) (status, output) = commands.getstatusoutput(compile_cmd) if status == 0: print "[*] Generate payload succeed: %s.so" % (os.path.dirname(os.path.realpath(__file__)) + '/' + file_name) return file_name else: print "[!] Generate payload failed!" exit()
def init_dst_node(self, task_id, label):
    """
    Enter the task directory (creating it if needed) and create a fresh
    docker container described by *label*.

    task_id: directory name to work in; also stored as ``self.task_id``.
    label:   '-'-separated string whose first three fields are the
             container name, the base image and the image id.

    The output of ``docker create`` is stored as ``self.container_id``.
    """
    if not os.path.isdir(task_id):
        os.mkdir(task_id)
    os.chdir(task_id)
    self.task_id = task_id

    # Parse the label info.
    label_array = label.split('-')
    container_name = label_array[0]
    base_image = label_array[1]
    image_id = label_array[2]
    logging.debug('The docker image id is %s' %image_id)
    logging.debug(label_array)

    # Drop any container with the same name, then create the new one.
    rmv_sh = 'docker rm -f ' + container_name +' >/dev/null 2>&1'
    logging.debug(rmv_sh)
    os.system(rmv_sh)

    cre_sh = 'docker create --name=' + container_name + ' ' + base_image
    logging.debug(cre_sh)
    self.container_id = commands.getstatusoutput(cre_sh)[1]

    #----untar the last dump directory.----#
def __init__(self):
    """
    Determines whether pkg-config exists on this machine.

    On win32 ``has_pkgconfig`` is simply set to False. Elsewhere the
    executable named by the PKG_CONFIG environment variable (default
    'pkg-config') is probed with ``--help`` and a warning is printed when
    that fails.
    """
    if sys.platform == 'win32':
        self.has_pkgconfig = False
        return

    self.pkg_config = os.environ.get('PKG_CONFIG', 'pkg-config')
    self.set_pkgconfig_path()

    exit_code = getstatusoutput(self.pkg_config + " --help")[0]
    self.has_pkgconfig = (exit_code == 0)
    if self.has_pkgconfig:
        return

    print("IMPORTANT WARNING:")
    print(
        " pkg-config is not installed.\n"
        " matplotlib may not be able to find some of its dependencies")
def check(self):
    """
    Check for FreeType and return the result of the pkg-config check.

    On win32 only the presence of ft2build.h is checked. Elsewhere the
    version is taken from ``freetype-config --ftversion``, falling back to
    the header when that output is unusable.
    """
    if sys.platform == 'win32':
        check_include_file(get_include_dirs(), 'ft2build.h', 'freetype')
        return 'Using unknown version found on system.'

    exit_code, reported = getstatusoutput("freetype-config --ftversion")
    version = reported if exit_code == 0 else None

    # Early versions of freetype grep badly inside freetype-config,
    # so catch those cases. (tested with 2.5.3).
    unusable = version is None or 'No such file or directory\ngrep:' in version
    if unusable:
        version = self.version_from_header()

    # pkg_config returns the libtool version rather than the
    # freetype version so we need to explicitly pass the version
    # to _check_for_pkg_config
    return self._check_for_pkg_config(
        'freetype2', 'ft2build.h',
        min_version='2.3', version=version)
def check(self):
    """
    Check for libpng and return the result of the pkg-config check.

    On win32 only the presence of png.h is checked. Elsewhere the version
    comes from ``libpng-config --version``; when the pkg-config check
    fails but png.h can still be found, the failure text is returned
    instead of being raised.
    """
    if sys.platform == 'win32':
        check_include_file(get_include_dirs(), 'png.h', 'png')
        return 'Using unknown version found on system.'

    exit_code, reported = getstatusoutput("libpng-config --version")
    version = reported if exit_code == 0 else None

    try:
        return self._check_for_pkg_config(
            'libpng', 'png.h',
            min_version='1.2', version=version)
    except CheckFailed as e:
        if not has_include_file(get_include_dirs(), 'png.h'):
            raise
        return str(e) + ' Using unknown version found on system.'
def tearDown(self):
    """
    Run the base-class teardown, kill child processes, restore the
    permissions of the device cache directory and remove the runtime
    properties file.
    """
    scatest.CorbaTestCase.tearDown(self)
    killChildProcesses(os.getpid())

    cache_dir = os.getcwd()+'/sdr/cache/.BasicTestDevice_node'
    for shell_line in ('chmod 755 '+cache_dir, 'rm -rf devmgr_runtest.props'):
        commands.getstatusoutput(shell_line)
def test_NoWriteCache(self):
    """
    Make the device cache directory inaccessible and expect the device
    manager launch to end with return code 255 and no device manager.
    """
    cachedir = os.getcwd()+'/sdr/cache/.BasicTestDevice_node'
    for prefix in ('mkdir -p ', 'chmod 000 '):
        commands.getstatusoutput(prefix+cachedir)
    still_accessible = os.access(cachedir, os.R_OK|os.W_OK|os.X_OK)
    self.assertFalse(still_accessible, 'Current user can still access directory')

    devmgr_nb, devMgr = self.launchDeviceManager("/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml")
    self.assertEquals(255, devmgr_nb.returncode)
    self.assertEquals(devMgr, None)
def test_DeviceConnections(self):
    """
    Run py2prf on the reference input and check that the generated PRF
    holds exactly one property of each kind with the expected id.
    """
    exit_code, generated = commands.getstatusoutput('../base/framework/python/ossie/apps/py2prf app_input/py_prf_check_base.py')
    self.assertEquals(exit_code, 0)

    prf = parsers.PRFParser.parseString(generated)
    self.assertNotEquals(prf,None)

    expectations = (
        (prf.get_simple, 'some_simple'),
        (prf.get_simplesequence, 'some_sequence'),
        (prf.get_struct, 'some_struct'),
        (prf.get_structsequence, 'some_struct_seq'),
    )
    for getter, expected_id in expectations:
        self.assertEquals(len(getter()),1)
        self.assertEquals(getter()[0].get_id(),expected_id)
def _xmllint(self, fPath, fType):
    """
    Validate *fPath* with xmllint against the SCA V2.2.2 DTD for *fType*.

    Sets SGML_CATALOG_FILES in the environment, prints the xmllint output
    when validation fails and returns the command's exit status.
    """
    os.environ['SGML_CATALOG_FILES'] = os.path.abspath("../xml/dtd/catalog.xml")
    docType = "-//JTRS//DTD SCA V2.2.2 %s//EN" % (fType.upper())
    cmd = "xmllint --nowarning --nonet --catalogs --noout --dropdtd --dtdvalidfpi '%s' %s" % (docType, fPath)
    exit_code, report = commands.getstatusoutput(cmd)
    if exit_code != 0:
        print(report)
    return exit_code
def test_NoInteractiveJavaService(self):
    """Starting the Java service with -i must print ``self.message``."""
    result = commands.getstatusoutput('sdr/dev/services/BasicService_java/java/startJava.sh -i')
    position = result[1].find(self.message)
    self.assertNotEquals(position,-1)
def test_NoInteractiveJavaDevice(self):
    """Starting the Java device with -i must print ``self.message``."""
    result = commands.getstatusoutput('sdr/dev/devices/BasicTestDevice_java/java/startJava.sh -i')
    position = result[1].find(self.message)
    self.assertNotEquals(position,-1)
def test_NoInteractiveJavaComponent(self):
    """Starting the Java component with -i must print ``self.message``."""
    result = commands.getstatusoutput('sdr/dom/components/ECM_JAVA/java/startJava.sh -i')
    position = result[1].find(self.message)
    self.assertNotEquals(position,-1)
def test_NoInteractivePythonService(self):
    """Starting the Python service with -i must print ``self.message``."""
    result = commands.getstatusoutput('sdr/dev/services/S1/python/S1.py -i')
    position = result[1].find(self.message)
    self.assertNotEquals(position,-1)
def test_NoInteractivePythonComponent(self):
    """Starting the Python component with -i must print ``self.message``."""
    result = commands.getstatusoutput('sdr/dom/components/ECM_PY/python/ECM_PY.py -i')
    position = result[1].find(self.message)
    self.assertNotEquals(position,-1)
def test_NoInteractiveCppService(self):
    """Starting the C++ service with -i must print ``self.message``."""
    result = commands.getstatusoutput('sdr/dev/services/BasicService_cpp/cpp/BasicService_cpp -i')
    position = result[1].find(self.message)
    self.assertNotEquals(position,-1)
def test_NoInteractiveCppDevice(self):
    """Starting the C++ device with -i must print ``self.message``."""
    result = commands.getstatusoutput('sdr/dev/devices/cpp_dev/cpp/cpp_dev -i')
    position = result[1].find(self.message)
    self.assertNotEquals(position,-1)
def test_NoInteractiveCppComponent(self):
    """Starting the C++ component with -i must print ``self.message``."""
    result = commands.getstatusoutput('sdr/dom/components/ECM_CPP/cpp/ECM_CPP -i')
    position = result[1].find(self.message)
    self.assertNotEquals(position,-1)
def test_pid(self):
    """
    Launch 'comp_src' in the sandbox and check that the pid column of its
    ``ps`` entry matches the pid recorded on the launched object.
    """
    a = sb.launch('comp_src')
    ps_output = commands.getstatusoutput('ps -ww -f | grep comp_src ')[1]
    rows = ps_output.split('\n')
    # Use the first entry mentioning an IOR; when none does, the last line
    # of the listing is inspected instead.
    entry = next((row for row in rows if 'IOR' in row), rows[-1])
    self.assertEquals(int(entry.split()[1]), a._pid)
def uuid1(node=None, clock_seq=None):
    """
    Generate a UUID from a host ID, sequence number, and the current time.
    The 'node' and 'clock_seq' arguments are ignored.
      First choice:  'uuidgen -t'
      Fallback:      'uuid -v 1 -m' (for debian)
    """
    exit_code, generated = _commands.getstatusoutput('uuidgen -t')
    if exit_code != 0:
        return _commands.getoutput('uuid -v 1 -m')
    return generated
def uuid4():
    """
    Generate a random UUID.
      First choice:  'uuidgen -r'
      Fallback:      'uuid -v 4 -m' (for debian)
    """
    exit_code, generated = _commands.getstatusoutput('uuidgen -r')
    if exit_code != 0:
        return _commands.getoutput('uuid -v 4 -m')
    return generated
def __init__(self, command, title):
    """
    Resolve *command* with ``which`` and remember its output and *title*.

    Raises RuntimeError when ``which`` reports a non-zero status.
    """
    lookup = commands.getstatusoutput('which '+command)
    # The output is stored before the status is checked.
    self.__command = lookup[1]
    if lookup[0]:
        raise RuntimeError(command + ' cannot be found')
    self._title = title
def __init__(self, attach=True, **opts):
    """
    Locate jdb and translate keyword options into jdb command-line flags.

    attach: stored on the instance as ``_attach``.
    opts:   option name/value pairs. A name without a leading dash, e.g.
            ``source_path``, is rewritten to ``-source-path``; names that
            already start with a dash are forwarded unchanged.

    Raises RuntimeError when ``which jdb`` reports a non-zero status.
    """
    status, jdb = commands.getstatusoutput('which jdb')
    if status:
        raise RuntimeError('jdb cannot be found')

    pass_opts = {}
    for name, value in opts.items():
        if not name.startswith('-'):
            name = '-' + name.replace('_', '-')
        pass_opts[name] = value

    # Forward the normalised options. The raw ``opts`` used to be passed
    # here, which left ``pass_opts`` computed but unused.
    super(JDB, self).__init__(jdb, None, **pass_opts)
    self._lastport = 5680
    self._attach = attach
def run(self):
    """Sleep for ``self.sec`` seconds, then run ``killall`` on ``self.prog``."""
    time.sleep(self.sec)
    kill_line = "killall " + self.prog
    commands.getstatusoutput(kill_line)

#
# For the callbacks function
# extend Main_window class (that contains the GUI)
#
def slot_gath_clean(self):
    """
    Delete the capture files (*.cap, *.csv, *.xor, *.netxml) from the
    config directory and report that the logs were cleaned.
    """
    patterns = ('*.cap ', '*.csv ', '*.xor ', '*.netxml ')
    targets = ''.join([config_dir + pattern for pattern in patterns])
    commands.getstatusoutput('rm -f ' + targets)
    self.direct_output('Logs cleaned')

#
# WPA Rainbow Tables Cracking
#
def init_config_dir():
    """Create the config directory when it does not exist yet."""
    global def_term

    # check config dir
    already_there = os.path.exists(config_dir)
    if not already_there:
        os.mkdir(config_dir)

    print ('\nConfig directory OK\n')

#
# This function perform various checks
# on program load
#
def slot_random_mac(self):
    """
    Give the selected interface a random MAC address.

    The interface is brought down, ``macchanger --random`` is applied and
    the interface is brought up again. The first failing step is reported
    through ``self.output`` and aborts the procedure; on success the
    interface list is reloaded.
    """
    if self.check_options(self.periferica_opt) == 0:
        return

    # (prefix, suffix) around the interface name for each step:
    # disable interface, random MAC address, re-enable interface
    steps = (
        ('ifconfig ', ' down'),
        ('macchanger --random ', ''),
        ('ifconfig ', ' up'),
    )
    for prefix, suffix in steps:
        code, text = commands.getstatusoutput(prefix + self.periferica + suffix)
        if code != 0:
            self.output(text, code)
            return

    self.output("MAC Address changed: " + self.periferica, code)
    self.slot_reload_interfaces()

#
# Select an interface
#