我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用commands.getoutput()。
def getConfig(cfgname):
    """Fetch the module-level configuration named ``cfgname`` and resolve its host.

    A ``host`` value starting with ``-cmd-`` is replaced by the output of the
    shell command that follows the marker; one starting with ``-func-`` is
    replaced by the return value of the module-level callable named after the
    marker.  The resulting host is printed.  A falsy configuration is
    returned as-is without being inspected.
    """
    config = globals()[cfgname]
    if not config:
        return config

    # Scan the ip of configuration
    host = config['host']
    if host.startswith('-cmd-'):
        # The text after the 5-character marker is a shell command line.
        config['host'] = commands.getoutput(host[5:])
    elif host.startswith('-func-'):
        # The text after the 6-character marker names a module-level callable.
        resolver = globals()[host[6:]]
        config['host'] = resolver()
    print('IP[%s]: %s' % (cfgname, config['host']))
    return config
def getChildren(parentPid):
    """Return the PIDs of all processes whose parent is ``parentPid``.

    Each numeric entry listed by ``ls /proc`` is inspected: the ``PPid`` row
    of its ``status`` file is compared with ``parentPid``.

    :param parentPid: integer PID of the parent process.
    :return: list of integer child PIDs (empty when there are none).
    """
    children = []
    for entry in commands.getoutput('ls /proc').split('\n'):
        # Entries such as ``self`` also expose a status file but cannot be
        # converted to a PID, so only purely numeric names are considered.
        if not entry.isdigit():
            continue
        try:
            with open('/proc/' + entry + '/status', 'r') as fp:
                rows = fp.read().split('\n')
        except (IOError, OSError):
            # The process went away between the listing and the read.
            continue
        for row in rows:
            if row[:4] != 'PPid':
                continue
            try:
                ppid = int(row.split(':')[1][1:])
            except (IndexError, ValueError):
                # Malformed row: skip this process rather than abort the scan.
                break
            if ppid == parentPid:
                children.append(int(entry))
            break
    return children
def getProcessName(pid):
    """Return the ``Name`` field of ``/proc/<pid>/status``.

    :param pid: PID of the process; ``str(pid)`` is used as the /proc entry name.
    :return: the process name, or ``''`` when the PID is not listed under
        ``/proc``, its status file cannot be read, or it has no ``Name`` row.
    """
    str_pid = str(pid)
    if str_pid not in commands.getoutput('ls /proc').split('\n'):
        return ''
    try:
        with open('/proc/' + str_pid + '/status', 'r') as fp:
            lines = fp.read().split('\n')
    except (IOError, OSError):
        # The process exited after the listing was taken.
        return ''
    for line in lines:
        if line[:4] == 'Name':
            # Keep everything after the first ':' so names that themselves
            # contain a colon are not truncated; [1:] drops the separator
            # character that follows the colon.
            return line.partition(':')[2][1:]
    return ''
def slot_autoload_victim_clients(self):
    """Refill both client-MAC combo boxes from the ``*.csv`` files in ``config_dir``.

    The csv files are filtered through a shell pipeline that keeps rows made
    of two MAC addresses, greps them for ``self.ac`` and prints the first
    column; every resulting line is inserted at the top of both combo boxes.
    When no csv file exists a message is sent to ``self.output`` and nothing
    else happens.
    """
    # clear
    self.combo_wep_mac_cfrag.clear()
    self.combo_wpa_mac_hand.clear()

    # check *.csv files
    csv_glob = config_dir + "*.csv"
    if not glob.glob(csv_glob):
        self.output("no csv files in " + config_dir, 1)
        return

    # open dump file
    octet = "[0-9a-fA-F]{2,2}"
    mac_re = ":".join([octet] * 6)  # six colon-separated hex pairs
    pipeline = (
        "cat " + csv_glob
        + " | egrep -e '^" + mac_re + ".+" + mac_re + ",'"
        + " | grep " + self.ac
        + " | tr ',' ' ' | awk ' { print $1 } '"
    )
    for mac in commands.getoutput(pipeline).split('\n'):
        self.combo_wep_mac_cfrag.insertItem(0, mac)
        self.combo_wpa_mac_hand.insertItem(0, mac)

#
# Add cracked key to database
#
def ping_ip(idx, ip):
    '''
    Ping ``ip`` three times and report the timing found in the summary.

    Runs ``ping -c 3 -t 3 <ip>`` and looks for an ``N packets received``
    count and for the second number after ``stddev = `` in the output
    (presumably the average of a ``min/avg/max/stddev`` summary -- confirm
    against the ping implementation in use).

    param:
        idx: server index, returned unchanged as the first list element
        ip: address handed to ping

    return:
        ``[idx, time, received]`` with ``time`` as float and ``received`` as
        the matched digit string; ``[idx, float('inf'), '0']`` when nothing
        was received or the output could not be parsed.
    '''
    ping_info = commands.getoutput('ping -c 3 -t 3 ' + ip)
    connected = re.findall(r'\b(\d)\b packets received', ping_info)
    # No summary at all (unknown host, ping missing, other output format)
    # is reported like a total loss instead of raising IndexError.
    if not connected or connected[0] == '0':
        return [idx, float('inf'), '0']
    timings = re.findall(r'stddev = [\d|.]+/([\d|.]+)', ping_info)
    if not timings:
        return [idx, float('inf'), '0']
    return [idx, float(timings[0]), connected[0]]
def main(log_dir='.'):
    """Scan the logs under ``log_dir`` for known issues and e-mail about each hit.

    The ``stack_failed`` entry of the module-level ``known_issues`` dict is
    handled first: only the last line of its gzip log is compared with the
    pattern.  Every remaining entry has its log file read completely and
    searched for its pattern; errors while doing so are printed and skipped.
    """
    # Check stack.sh status
    stack_key = 'stack_failed'
    stack_log = "%s/%s/logs/stack.sh.log.gz" % (
        log_dir, known_issues[stack_key]['file'])
    last_line = commands.getoutput('tail -n1 %s ' % stack_log)
    if known_issues[stack_key]['pattern'] in last_line:
        send_email(stack_key)
    # NOTE(review): the original indentation was lost; this removal is assumed
    # to be unconditional so the loop below never revisits the stack entry.
    del known_issues[stack_key]

    for issue, details in known_issues.iteritems():
        try:
            log = "%s/%s" % (log_dir, details['file'])
            print("Checking %s for %s" % (log, details['pattern']))
            if details['pattern'] in open(log).read():
                print("RM#%s happened again in %s" % (issue, log_dir))
                send_email(issue)
        except Exception as e:
            print("Exception %s" % (e))
def get_chd_mac_addr(self):
    """Return the token following ``HWaddr`` in the ``ifconfig`` line for ``cw-llc``.

    ``None`` is returned when the grepped ifconfig output does not contain
    ``HWaddr`` at all.
    """
    iface = 'cw-llc'
    listing = commands.getoutput("ifconfig | grep " + iface)
    if listing.find('HWaddr') == -1:
        return None
    fields = listing.split()
    # The hardware address is the whitespace-separated field right after the tag.
    return fields[fields.index('HWaddr') + 1]

#-----------------------------------------------------------------------
# Get ip address for COHDA interface
#-----------------------------------------------------------------------
def modem_available(self):
    """Report whether the modem interface currently has an IP address.

    The address is obtained from ``self.get_chd_ip_addr()``.  When one is
    available the DSRC LED is switched on.

    The previous implementation first parsed ``ip address show dev cw-llc``
    by hand; that value was overwritten immediately and the parse raised
    ``ValueError`` whenever the output held no ``inet`` token, so the dead
    lookup has been removed.

    :return: ``1`` when an address is available, ``0`` otherwise.
    """
    if self.get_chd_ip_addr() is None:
        return 0
    # JMA add dsrc_led
    self.dsrc_led.on()
    return 1

#-----------------------------------------------------------------------
# Connect to 801.110 modem and open UDP connections for Tx and Rx
#-----------------------------------------------------------------------
def detect_desktop_environment():
    '''Identify the running desktop environment.

    Checks, in order: the ``KDE_FULL_SESSION`` variable (must equal
    ``true``), the ``GNOME_DESKTOP_SESSION_ID`` variable (must be non-empty)
    and finally the ``_DT_SAVE_MODE`` root-window property reported by
    ``xprop``.

    Returns the lowercase name (kde, gnome, xfce) or "generic".
    '''
    if os.environ.get('KDE_FULL_SESSION') == 'true':
        return 'kde'
    if os.environ.get('GNOME_DESKTOP_SESSION_ID'):
        return 'gnome'
    try:
        root_props = commands.getoutput('xprop -root _DT_SAVE_MODE')
    except (OSError, RuntimeError):
        return 'generic'
    if ' = "xfce4"' in root_props:
        return 'xfce'
    return 'generic'
def initial_run(context):
    """Start bitmask for the given test mode and store the login URL on ``context``.

    In ``virtualenv`` mode ``bitmaskctl`` is stopped, the home path is
    re-initialised and ``bitmaskctl`` is started again.  In ``bundle`` and
    ``bundle-ci`` mode ``context.bundle_path`` is executed.  Afterwards the
    auth token file is read and ``context.login_url`` is set from it.
    """
    mode = context.mode
    if mode == 'virtualenv':
        # Result is not used; the lookup is kept from the original flow.
        commands.getoutput('which bitmaskctl')
        commands.getoutput('bitmaskctl stop')
        # TODO: fix bitmaskctl to only exit once bitmaskd has stopped
        time.sleep(2)
        _initialize_home_path()
        commands.getoutput('bitmaskctl start')
        time.sleep(1)
    elif mode in ('bundle', 'bundle-ci'):
        commands.getoutput(context.bundle_path)
        time.sleep(2)

    # NOTE(review): indentation was lost in this copy; the token lookup is
    # assumed to run for every mode.
    token_file = os.path.join(get_path_prefix(), 'leap', 'authtoken')
    token = open(token_file).read().strip()
    context.login_url = "http://localhost:7070/#%s" % token
def test(self):
    """Run tbench against a locally forked ``tbench_srv`` and log the throughput.

    The process forks: the child replaces itself with the server binary, the
    parent runs the ``tbench`` client, keeps its output in ``self.results``,
    terminates the server and logs the parsed throughput and process count.
    """
    # only supports combined server+client model at the moment
    # should support separate I suppose, but nobody uses it
    nprocs = self.params.get('nprocs', default=commands.getoutput("nproc"))
    extra_args = self.params.get('args', default=None)
    args = '%s %s' % (extra_args, nprocs)

    server_pid = os.fork()
    if not server_pid:
        # child: becomes the server and never returns from execlp
        server = os.path.join(self.sourcedir, 'tbench_srv')
        os.execlp(server, server)
    else:
        # parent
        client = os.path.join(self.sourcedir, 'client.txt')
        args = '-c %s %s' % (client, args)
        cmd = os.path.join(self.sourcedir, "tbench") + " " + args
        # Standard output is verbose and merely makes our debug logs huge
        # so we don't retain it.  It gets parsed for the results.
        self.results = process.system_output(cmd, shell=True)
        os.kill(server_pid, signal.SIGTERM)  # clean up the server

    summary = re.compile(r"Throughput (.*?) MB/sec (.*?) procs")
    throughput, procs = summary.findall(self.results)[0]
    self.log.info({'throughput': throughput, 'procs': procs})
def _check_dependencies(dependencies):
    """Verify that each required executable exists and is recent enough.

    ``dependencies`` maps a command name to a minimum integer version (a
    falsy version disables the version check).  Problems are written to
    stderr; if any was found the process exits with status 1.
    """
    old_version = "[WARNING] Old version of %s: %s. Update to version %s+!\n"
    problems = 0
    for cmd, version in dependencies.iteritems():
        out = _check_executable(cmd)
        if "not found" in out:
            problems = 1
            sys.stderr.write("[ERROR] %s\n"%out)
            continue
        if not version:
            continue
        out = commands.getoutput("%s --version"%cmd)
        # The version is taken to be the last whitespace-separated token.
        curver = out.split()[-1]
        if not curver.isdigit():
            problems = 1
            sys.stderr.write("[WARNING] Problem checking %s version: %s\n"%(cmd, out))
        elif int(curver) < version:
            problems = 1
            sys.stderr.write(old_version%(cmd, curver, version))

    message = "Make sure you have installed all dependencies from https://github.com/lpryszcz/pyScaf#dependencies !"
    if problems:
        sys.stderr.write("\n%s\n\n"%message)
        sys.exit(1)
def main():
    """Wait until a ``python <process>`` job disappears from ``ps``, then send an e-mail.

    Command-line options: ``--email`` and ``--process`` (both required),
    ``--title`` and ``--content``.  The process list is polled every 300
    seconds; once no ``ps -ef`` line contains ``python <process>`` any more,
    ``send_email`` is called with the title, host name, address and content.
    """
    parser = argparse.ArgumentParser(description='email notification')
    parser.add_argument('--email', '-e', help='email address', required=True)
    parser.add_argument('--process', '-p', help='process', required=True)
    # The help text used to say "email content", duplicating --content.
    parser.add_argument('--title', '-t', help='email title',
                        default='Your process is done')
    parser.add_argument('--content', '-c', help='email content')
    args = parser.parse_args()

    host = platform.node()
    process = args.process
    email = args.email
    title = args.title
    content = args.content if args.content else '{} is done'.format(process)

    ps_cmd = 'ps -ef | grep %s' % process
    marker = 'python %s' % process
    print(commands.getoutput(ps_cmd).split('\n'))
    # Matching on "python <process>" keeps the grep helper itself from
    # counting as a running instance.
    while any(marker in row for row in commands.getoutput(ps_cmd).split('\n')):
        print('sleep ...')
        sleep(300)
    send_email(title, host, email, content)
def cmmnd_FingerPosition(self, finger_value):
    """Send a finger position by running the ``kinova_demo`` fingers action client.

    :param finger_value: indexable with at least three entries; elements 0..2
        are placed on the command line after ``percent --``.
    """
    first, second, third = finger_value[0], finger_value[1], finger_value[2]
    rosrun = 'rosrun kinova_demo fingers_action_client.py j2n6a300 percent -- {0} {1} {2}'
    commands.getoutput(rosrun.format(first, second, third))

    # Earlier in-process variant, kept for reference:
    #
    # fingers_action_client.getCurrentFingerPosition('j2n6a300_')
    #
    # finger_turn, finger_meter, finger_percent = fingers_action_client.unitParser('percent', finger_value, '-r')
    # finger_number = 3
    # finger_maxDist = 18.9/2/1000  # max distance for one finger in meter
    # finger_maxTurn = 6800  # max thread turn for one finger
    #
    # try:
    #     if finger_number == 0:
    #         print('Finger number is 0, check with "-h" to see how to use this node.')
    #         positions = []  # Get rid of static analysis warning that doesn't see the exit()
    #         exit()
    #     else:
    #         positions_temp1 = [max(0.0, n) for n in finger_turn]
    #         positions_temp2 = [min(n, finger_maxTurn) for n in positions_temp1]
    #         positions = [float(n) for n in positions_temp2]
    #
    #     print('Sending finger position ...')
    #     result = fingers_action_client.gripper_client(positions)
    #     print('Finger position sent!')
    #
    # except rospy.ROSInterruptException:
    #     print('program interrupted before completion')
def hex_dump(hex_cmd):
    """
    Return the hexdump produced by ``hex_cmd`` as HTML-formatted rows.

    Every output line longer than 9 characters is cut into three columns:
    offset (characters 0-7), hex bytes (9-57) and ASCII rendering (58-77).
    The ASCII column is HTML-escaped before being embedded, because it can
    hold markup characters taken from the dumped data.

    :param hex_cmd: shell command line whose output is a hexdump
    :return: str
    """
    # The entity names are assembled at run time ('&' + name + ';') so the
    # replacements cannot be collapsed back into no-ops if this source is
    # ever passed through an HTML-decoding step again.
    # '&' must come first, otherwise the entities added later get re-escaped.
    entities = (('&', 'amp'), ('"', 'quot'), ('<', 'lt'), ('>', 'gt'))

    html_rows = []
    for row in getoutput(hex_cmd).split('\n'):
        if len(row) <= 9:
            continue
        off_str = row[0:8]
        hex_str = row[9:58]
        asc_str = row[58:78]
        for char, name in entities:
            asc_str = asc_str.replace(char, '&' + name + ';')
        html_rows.append(
            '<div class="row"><span class="text-info mono">{0}</span> '
            '<span class="text-primary mono">{1}</span> <span class="text-success mono">'
            '{2}</span></div>'.format(off_str, hex_str, asc_str))
    # return the data
    return ''.join(html_rows)
def main():
    """Print the task JSON, followed by duration lines when the task just ended.

    A task counts as just ended when it has an ``end`` key that the prior
    version lacks.  In that case ``timew`` is asked for the tracked duration
    (stopping the current interval first if it is still open and tagged with
    the task's uuid), and an optional estimate line is added.  When nothing
    was added, an empty second line is printed instead.
    """
    inputs = utils.format_inputs()
    task = inputs['task']
    lines = [json.dumps(task)]

    just_ended = 'end' in task and 'end' not in inputs['prior']
    if just_ended:
        tracked = json.loads(commands.getoutput('timew get dom.tracked.1.json'))
        cmd = 'timew duration "%(uuid)s" from %(entry)s - %(end)s' % task
        if 'end' not in tracked and task['uuid'] in tracked['tags']:
            # The interval is still running: close it before measuring.
            cmd = 'timew stop :quiet && ' + cmd
        if 'estimate' in task:
            lines.append(
                'Estimate Duration: %s' % utils.parse_duration(task['estimate']))
        lines.append('Total Duration: %s' % commands.getoutput(cmd))

    if len(lines) == 1:
        lines.append('')
    print('\n'.join(lines))
def Steg_brute(ifile, dicc):
    """Try every password of a wordlist against a steghide container.

    For each line of ``dicc`` ``steghide extract`` is run on ``ifile``; the
    first password whose output contains neither failure message stops the
    search, and the extracted file (if ``check_file`` confirms it) is
    printed.  A progress bar is advanced after every failed attempt.

    :param ifile: path of the stego file; the output file name is the part
        before the first ``.`` plus ``_flag.txt``.
    :param dicc: path of the wordlist, one password per line.
    """
    ofile = ifile.split('.')[0] + "_flag.txt"
    # Read the wordlist once and close it; the line count for the progress
    # bar comes from the same list instead of a second, never-closed handle.
    with open(dicc, 'r') as passFile:
        candidates = passFile.readlines()

    pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=len(candidates)).start()
    for i, candidate in enumerate(candidates):
        password = candidate.strip('\n')
        r = commands.getoutput("steghide extract -sf %s -p '%s' -xf %s" % (ifile, password, ofile))
        if "no pude extraer" not in r and "could not extract" not in r:
            print(color.GREEN + "\n\n " + r + color.ENDC)
            print("\n\n [+] " + color.INFO + "Information obtained with password:" + color.GREEN + " %s\n" % password + color.ENDC)
            if check_file(ofile):
                with open(ofile, 'r') as outfile:
                    # Separate name: the outer loop variable is not clobbered.
                    for extracted in outfile.readlines():
                        print(extracted)
            break
        pbar.update(i + 1)
def task_download_report(self, task_item):
    """Download the report named in ``task_item.data`` and verify its MD5.

    ``task_item.data`` must provide ``uri``, ``md5`` and ``save_path``.  The
    file is stored as ``<save_path>/<last path component of uri>``.  While
    downloading, ``task_item.percent`` and ``task_item.consuming`` are
    updated.

    The file name and checksum are computed in-process; earlier revisions
    interpolated the URI and path into ``basename``/``md5sum`` shell
    commands, which broke on (and could be abused through) shell
    metacharacters.

    :return: ``{"status_code": 0, "msg": "OK"}`` when the checksum matches,
        ``{"status_code": -1, "msg": "md5 doesn't match"}`` otherwise.
    """
    import hashlib
    import posixpath

    uri = task_item.data['uri']
    md5 = task_item.data['md5']
    save_path = task_item.data['save_path']

    def report(count, blockSize, totalSize):
        # update percent; skipped when the size is unknown or zero so the
        # hook neither divides by zero nor reports a negative value
        if totalSize > 0:
            task_item.percent = "%.2f" % (count * blockSize * 100 / totalSize)
        task_item.consuming = int(time.time() - self.begin_handle_time)

    # Trailing slashes are dropped first, matching basename(1).
    filename = posixpath.basename(uri.rstrip('/'))
    file_abs_path = save_path + '/' + filename
    urllib.urlretrieve(uri, file_abs_path, reporthook=report)

    digest = hashlib.md5()
    with open(file_abs_path, 'rb') as downloaded:
        # Hash in chunks so large reports are not loaded into memory at once.
        for chunk in iter(lambda: downloaded.read(65536), b''):
            digest.update(chunk)

    if digest.hexdigest() == md5:
        return {"status_code": 0, "msg": "OK"}
    return {"status_code": -1, "msg": "md5 doesn't match"}
def get_sw_info(self):
    """Collect client, libvirt, hypervisor and kernel version strings.

    The two integers returned by ``self.conn`` are decoded as
    ``major * 1000000 + minor * 1000 + release`` and rendered as
    ``major.minor.release``.  The client version is the first line of the
    ``VERSION`` file next to this module; the OS version is ``uname -r``.

    :return: dict with keys ``version``, ``libvirt_version``, ``hv_version``
        and ``os_version``.
    """
    def dotted(packed):
        major = packed / 1000000
        minor = packed % 1000000 / 1000
        release = packed % 1000
        return str(major)+'.'+str(minor)+'.'+str(release)

    hv_ver = dotted(self.conn.getVersion())
    lib_ver = dotted(self.conn.getLibVersion())

    here = os.path.dirname(os.path.abspath(__file__))
    cli_ver = open(here + "/VERSION").readline().rstrip('\r\n')
    os_ver = commands.getoutput("uname -r")

    return {
        'version': cli_ver,
        'libvirt_version': lib_ver,
        'hv_version': hv_ver,
        'os_version': os_ver,
    }
def get_rx_tx(self, itface):
    """Measure the receive/transmit rate of ``itface`` over one second.

    The ``rx_bytes`` and ``tx_bytes`` counters under
    ``/sys/class/net/<itface>/statistics`` are read twice, one second apart.

    :return: ``{"in": rx, "out": tx}`` where both values are the byte
        difference divided by 1024, formatted with two decimals; ``None``
        (after logging the exception) when anything goes wrong.
    """
    try:
        cmd_rx = "cat /sys/class/net/%s/statistics/rx_bytes" % itface
        cmd_tx = "cat /sys/class/net/%s/statistics/tx_bytes" % itface

        def sample():
            # rx first, then tx -- same order for both samples
            return commands.getoutput(cmd_rx), commands.getoutput(cmd_tx)

        rx_before, tx_before = sample()
        time.sleep(1)
        rx_after, tx_after = sample()

        rx = "%.2f" % ((float(rx_after) - float(rx_before))/1024)
        tx = "%.2f" % ((float(tx_after) - float(tx_before))/1024)
    except Exception as e:
        logger.exception(e)
        return None
    return {"in" : rx, "out" : tx}
def kill_process(proc, file=":"):
    """Kill every process whose ``ps auxww`` line matches ``proc`` and ``file``.

    :param proc: text to look for in the process list; must be non-empty.
    :param file: second text the ``ps`` line has to contain (default ``":"``).
    :return: ``True`` when at least one matching line was found and ``kill``
        was issued, ``False`` when nothing matched, ``proc`` is empty, or an
        error occurred.
    """
    if not proc:
        # An empty name cannot be turned into a grep pattern.
        return False
    try:
        # Wrapping the first character in [] keeps the grep process itself
        # from matching its own command line.
        check = "ps auxww | grep [" + proc[0] + "]" + proc[1:] + " | grep " + file
        if not commands.getoutput(check):
            return False
        # Process exists, kill it (second column of ps output is the PID)
        commands.getoutput("kill $(" + check + " | awk '{print $2}')")
        return True
    except Exception:
        # Best effort by design, but KeyboardInterrupt/SystemExit are no
        # longer swallowed as they were with the bare ``except``.
        return False

#This function is used to check individual processes, not services.
def get_output(cmd):
    """Run ``cmd`` (a sequence of arguments) and return what it printed.

    Sublime builds before 3000 are treated as Python 2: on Windows the
    command is started through ``subprocess.Popen`` with a hidden console
    window, elsewhere it is quoted and run through ``commands.getoutput``.
    Newer builds quote the arguments and use ``subprocess.check_output``
    with stderr folded into stdout.
    """
    legacy = int(sublime.version()) < 3000

    if legacy and sublime.platform() == "windows":
        # Handle Windows in Python 2.
        # Prevent console window from showing.
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        child = subprocess.Popen(cmd,
                                 stdout=subprocess.PIPE,
                                 startupinfo=startupinfo)
        return child.communicate()[0]

    # Every argument is wrapped in double quotes and joined with spaces.
    quoted = '"' + '" "'.join(cmd) + '"'
    if legacy:
        # Handle Linux and OS X in Python 2.
        return commands.getoutput(quoted)
    # Handle all OS in Python 3.
    return subprocess.check_output(quoted, stderr=subprocess.STDOUT, shell=True, env=os.environ)
def main():
    """Rename one key inside a NumPy archive, in place.

    Command-line arguments: archive file name, old key, new key.  All
    entries are copied in order; the one named like the old key is stored
    under the new key.  The file is only rewritten when the old key exists.
    """
    fname, key_old, key_new = sys.argv[1], sys.argv[2], sys.argv[3]

    params = np.load(fname)
    renamed = OrderedDict()
    found = False
    for name, value in params.iteritems():
        if name == key_old:
            name = key_new
            found = True
        renamed[name] = value

    if not found:
        print('[info] %s not found in %s'%(key_old, fname))
        return

    print('[info] %s found in %s'%(key_old, fname))
    commands.getoutput('rm -f %s'%(fname))
    np.savez(fname, **renamed)
def my_cmd(self, query, flag):
    """Run ``query`` in a shell and pick two space-separated fields per line.

    Only the text after the last carriage return is used, and the final
    line of that text is dropped.  Each remaining line is split on single
    spaces: with ``flag`` set the result row is ``[field1, field2]``,
    otherwise ``[field1, field0]``.

    :return: list of two-element lists.
    """
    output = commands.getoutput(query)
    print(query)
    tail = output.split('\r')[-1]
    rows = tail.split('\n')[:-1]
    partner = 2 if flag else 0
    return [[row.split(' ')[1], row.split(' ')[partner]] for row in rows]
def get_dupreport_byins(insname):
    """Run ``pt-duplicate-key-checker`` against an instance using its first admin account.

    The databases of ``insname`` and their accounts are searched for the
    first account whose role is ``admin``; its password is decrypted and
    used, with ``insname.port`` and ``insname.ip``, to build the command.

    :return: the tool's output.
        NOTE(review): indentation was lost in this copy; when no admin
        account exists the function is assumed to fall through and return
        ``None``.
    """
    pc = prpcrypt()
    credentials = None
    for database in insname.db_name_set.all():
        for account in database.db_account_set.all():
            if account.role == 'admin':
                credentials = (account.user, pc.decrypt(account.passwd))
                break
        if credentials is not None:
            break

    if credentials is None:
        return None

    tar_username, tar_passwd = credentials
    cmd = incept.pttool_path + '/pt-duplicate-key-checker' + ' -u %s -p %s -P %d -h %s ' % (tar_username, tar_passwd, int(insname.port), insname.ip)
    return commands.getoutput(cmd)
def get_diff(dbtag1,tb1,dbtag2,tb2):
    """Return the SQL diff ``mysqldiff`` reports between two tables.

    Connection details for both sides come from ``get_conn_info``.  When the
    ``mysqldiff`` executable at ``path_mysqldiff`` is missing, the string
    ``"mysqldiff not installed"`` is returned instead.
    """
    if not os.path.isfile(path_mysqldiff):
        return "mysqldiff not installed"

    host1, port1, user1, passwd1, dbname1 = get_conn_info(dbtag1)
    host2, port2, user2, passwd2, dbname2 = get_conn_info(dbtag2)

    server1 = ' -q --server1={}:{}@{}:{}'.format(user1, passwd1, host1, str(port1))
    server2 = ' --server2={}:{}@{}:{}'.format(user2, passwd2, host2, str(port2))
    option = ' --difftype=sql'
    table = ' {}.{}:{}.{}'.format(dbname1, tb1, dbname2, tb2)

    pipe = os.popen(path_mysqldiff + server1 + server2 + option + table)
    return pipe.read()
def check_node_status():
    """Run the status command in ``PATH`` and track node health in the flag file.

    The output counts as unhealthy when it contains ``DN`` or lacks
    ``Status=Up/Down``.  On an unhealthy result the flag is set to
    ``abnormal`` and a mail with the output is sent, unless the flag already
    says ``abnormal``.  A healthy result resets the flag to ``normal``.  Any
    error triggers a generic failure mail.
    """
    try:
        report = commands.getoutput(PATH)
        logging.info(report)
        healthy = "DN" not in report and "Status=Up/Down" in report
        if healthy:
            write_flag('normal')
        elif read_flag() == 'normal' or read_flag() != 'abnormal':
            # Flag is 'normal' or holds an unexpected value: record the
            # failure and notify.  Already 'abnormal' means a mail went out.
            write_flag('abnormal')
            send_mail_tool(report)
    except:
        send_mail_tool("Check node status error!")
def discoverhostparams():
    """Print the host name as a JSON object and exit with the plugin OK status.

    The host name is whatever the command in ``utils.hostnameCmdPath.cmd``
    prints; the output has the shape ``{"hostname": ...}``.
    """
    hostname = commands.getoutput(utils.hostnameCmdPath.cmd)
    print(json.dumps({'hostname': hostname}))
    sys.exit(utils.PluginStatusCode.OK)

###
# This plugin discovers all the host specific parameters
# Currently it gets only the hostname from the node
# but when we add support for discovering physical
# components like cpu,network,disk etc, all those will be
# added as part of this module
###
def get_output(cmd):
    """Run ``cmd`` (a sequence of arguments) and return what it printed.

    Sublime builds before 3000 are treated as Python 2: on Windows the
    command is started through ``subprocess.Popen`` with a hidden console
    window, elsewhere it is quoted and run through ``commands.getoutput``.
    Newer builds quote the arguments and use ``subprocess.check_output``
    with stderr folded into stdout.

    :return: the command output; ``None`` on newer builds when the command
        fails (the failure is printed instead of raised).
    """
    if int(sublime.version()) < 3000:
        if sublime.platform() != "windows":
            # Handle Linux and OS X in Python 2.
            run = '"' + '" "'.join(cmd) + '"'
            return commands.getoutput(run)
        # Handle Windows in Python 2.
        # Prevent console window from showing.
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        return subprocess.Popen(cmd,
                                stdout=subprocess.PIPE,
                                startupinfo=startupinfo).communicate()[0]

    # Handle all OS in Python 3.
    run = '"' + '" "'.join(cmd) + '"'
    try:
        return subprocess.check_output(run, stderr=subprocess.STDOUT, shell=True, env=os.environ)
    except subprocess.CalledProcessError as exception:
        # Non-zero exit status: show what the command printed.
        print(exception.output)
    except Exception as exception:
        # Other failures carry no ``output`` attribute; accessing it used to
        # raise AttributeError and hide the real error.
        print(exception)
    return None
def pidExists(pid):
    """Tell whether ``str(pid)`` is one of the entries printed by ``ls /proc``."""
    wanted = str(pid)
    for entry in commands.getoutput('ls /proc').split('\n'):
        if entry == wanted:
            return True
    return False
def pidExists(pid):
    """Return ``True`` when ``ls /proc`` lists an entry equal to ``str(pid)``."""
    proc_entries = commands.getoutput('ls /proc').split('\n')
    return any(entry == str(pid) for entry in proc_entries)

# This test suite requires log4cxx support because it checks the domain's log
# output
def test_OrphanProcesses(self):
    """
    Tests that all child processes associated with a component get
    terminated with that component.

    The children of the component process are recorded with ``ps`` before
    the application is released; afterwards each of them is sent signal 9.
    Every kill that succeeds means the child outlived the component.
    """
    nb, domMgr = self.launchDomainManager()
    self.assertNotEqual(domMgr, None)

    nb, devMgr = self.launchDeviceManager('/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml')
    self.assertNotEqual(devMgr, None)

    domMgr.installApplication('/waveforms/orphaned_child/orphaned_child.sad.xml')
    appFact = domMgr._get_applicationFactories()[0]
    self.assertEqual(appFact._get_name(), 'orphaned_child')

    app = appFact.create(appFact._get_name(), [], [])
    component_pid = app._get_componentProcessIds()[0].processId
    ps_cmd = 'ps --ppid %d --no-headers -o pid' % (component_pid,)
    children = [int(field) for field in commands.getoutput(ps_cmd).split()]

    app.releaseObject()

    orphans = 0
    for child_pid in children:
        try:
            os.kill(child_pid, 9)
        except OSError:
            # Already gone -- the expected outcome.
            continue
        orphans += 1

    self.assertEqual(orphans, 0)
def test_CppGppOrphanProcesses(self):
    """
    Tests that all child processes associated with a component get
    terminated with that component.

    Same check as for the basic test device, run on ``test_GPP_node``: the
    children seen by ``ps`` before release must not be killable afterwards.
    """
    nb, domMgr = self.launchDomainManager()
    self.assertNotEqual(domMgr, None)

    nb, devMgr = self.launchDeviceManager('/nodes/test_GPP_node/DeviceManager.dcd.xml')
    self.assertNotEqual(devMgr, None)

    domMgr.installApplication('/waveforms/orphaned_child/orphaned_child.sad.xml')
    appFact = domMgr._get_applicationFactories()[0]
    self.assertEqual(appFact._get_name(), 'orphaned_child')

    app = appFact.create(appFact._get_name(), [], [])
    component_pid = app._get_componentProcessIds()[0].processId
    listing = commands.getoutput('ps --ppid %d --no-headers -o pid' % (component_pid,))
    children = [int(token) for token in listing.split()]

    app.releaseObject()

    orphans = 0
    for child_pid in children:
        try:
            os.kill(child_pid, 9)
        except OSError:
            # The child no longer exists, which is what the test expects.
            pass
        else:
            orphans += 1

    self.assertEqual(orphans, 0)
def getUnitTestFiles(rootpath, testFileGlob="test_*.py"):
    """Return the sorted paths below ``rootpath`` whose names match ``testFileGlob``.

    The search is delegated to ``find <rootpath>/ -name '<glob>'``; an empty
    result yields an empty list.
    """
    rootpath = os.path.normpath(rootpath) + "/"
    print("Searching for files in %s with prefix %s" % (rootpath, testFileGlob))
    found = commands.getoutput("find %s -name '%s'" % (rootpath, testFileGlob))
    if not found:
        return []
    return sorted(found.split('\n'))
def uuidgen():
    """Return whatever the ``uuidgen`` command prints."""
    generated = commands.getoutput('uuidgen')
    return generated
def terminateChildrenPidOnly(self, pid, signals=(signal.SIGINT, signal.SIGTERM)):
    # Signal the process tree rooted at ``pid``, descendants first.
    #
    # ``pid`` is handled as a string: it is compared with text read from
    # /proc and concatenated into a path; it is converted with int() only for
    # os.kill.  ``signals`` is the sequence of signals sent to each process.
    #
    # NOTE(review): the indentation of this function was lost in this copy and
    # has been reconstructed from the statement order -- confirm against the
    # upstream file.
    ls = commands.getoutput('ls /proc')
    entries = ls.split('\n')
    for entry in entries:
        filename = '/proc/'+entry+'/status'
        try:
            fp = open(filename,'r')
            stuff = fp.readlines()
            fp.close()
        except:
            # No readable status file for this /proc entry: skip it.
            continue
        ret = ''
        for line in stuff:
            if 'PPid' in line:
                ret=line
                break
        if ret != '':
            # Last tab-separated field of the PPid line, minus its final
            # character (the line terminator kept by readlines()).
            parentPid = ret.split('\t')[-1][:-1]
            if parentPid == pid:
                # ``entry`` is a child of ``pid``: handle its subtree first.
                self.terminateChildrenPidOnly(entry, signals)
    filename = '/proc/'+pid+'/status'
    for sig in signals:
        try:
            os.kill(int(pid), sig)
        except:
            # Sending failed; move on to the next signal.
            continue
        done = False
        attemptCount = 0
        # Poll until the status file can no longer be opened, giving up after
        # the tenth successful open (0.1 s pause between attempts).
        while not done:
            try:
                fp = open(filename,'r')
                fp.close()
                attemptCount += 1
                if attemptCount == 10:
                    break
                time.sleep(0.1)
            except:
                done = True
        # Last statement of the loop body, so it has no effect of its own: the
        # next signal is sent whether or not the process disappeared.
        if not done:
            continue
def uuidgen():
    """Run the ``uuidgen`` command and return its output."""
    output = commands.getoutput('uuidgen')
    return output

# Finds SDR root directory
def uuid1(node=None, clock_seq=None):
    """
    Generate a UUID from a host ID, sequence number, and the current time.
    The 'node' and 'clock_seq' arguments are ignored.

    ``uuidgen -t`` is tried first; when it exits with a non-zero status the
    output of ``uuid -v 1 -m`` (for debian) is returned instead.
    """
    status, text = _commands.getstatusoutput('uuidgen -t')
    if status != 0:
        # uuidgen unavailable or failed: fall back to the 'uuid' tool.
        text = _commands.getoutput('uuid -v 1 -m')
    return text
def uuid4():
    """
    Generate a random UUID.

    ``uuidgen -r`` is tried first; when it exits with a non-zero status the
    output of ``uuid -v 4 -m`` (for debian) is returned instead.
    """
    status, text = _commands.getstatusoutput('uuidgen -r')
    if status != 0:
        # uuidgen unavailable or failed: fall back to the 'uuid' tool.
        text = _commands.getoutput('uuid -v 4 -m')
    return text
def add_key_to_database(self):
    """Insert the key found in the aircrack log as a new first row of the database table.

    The key is extracted from ``aircrack-log.txt`` in ``config_dir`` with a
    shell pipeline.  The new row holds, in column order: ``essid``,
    ``self.ac``, ``self.canale``, the key, and ``key_to_ascii(key)``.
    """
    aircrack_log = config_dir + 'aircrack-log.txt'

    # read cracked key
    key = commands.getoutput("cat " + aircrack_log + " | grep 'KEY FOUND' | tr '[]' '\n' | egrep '([a-fA-F0-9]:)+' | tr -d ' \t'")

    # insert a row in the database
    self.table_database.insertRow(0)

    def put(column, text):
        # One fresh table item per cell of row 0.
        cell = QtGui.QTableWidgetItem()
        cell.setText(text)
        self.table_database.setItem(0, column, cell)

    put(0, essid)
    put(1, self.ac)
    put(2, self.canale)
    put(3, key)
    put(4, (key_to_ascii(key)))

#
# Database changed
#