我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用subprocess.getoutput()。
def get_proc_etime(self, pid):
    """Return how long process *pid* has been running, in seconds.

    The elapsed time is taken from the ``etime`` column of ``ps``, whose
    text is parsed as ``[[days-]hours:]minutes:seconds``.

    :param pid: integer process id to look up.
    :return: elapsed seconds, or -1 when ``ps`` prints no row for *pid*.
    """
    query = "ps -A -opid,etime | grep '^ *%d ' | awk '{print $NF}'" % pid
    etime = subprocess.getoutput(query).strip()
    if etime == '':
        return -1

    # An optional "days-" prefix precedes the clock part.
    day_split = etime.split('-')
    days = 0
    if len(day_split) == 2:
        days = int(day_split[0])

    # The clock part is either "hh:mm:ss" or "mm:ss".
    clock = day_split[-1].split(':')
    hours = 0
    if len(clock) == 3:
        hours = int(clock[0])
    tail = clock[len(clock) - 2:]
    minutes = int(tail[0])
    seconds = int(tail[1])

    total_hours = days * 24 + hours
    total_minutes = total_hours * 60 + minutes
    return total_minutes * 60 + seconds
# compute the billing val this running hour
# if isreal is True, it will also make users' beans decrease to pay for the bill.
# return the billing value in this running hour
def extend_swap(size):
    """Create a file-backed loop device and enable it as swap.

    :param size: value passed to ``dd`` as ``seek`` with ``bs=1G``. When
        negative it is derived from ``system_manager.get_memory_sample()``
        as ``(mem_total + mem_total // 8) // 1024``.
    :return: dict with ``"setup"`` (True when ``swapon`` exited with
        status 0) and ``"time"`` (seconds spent running the setup commands).
    """
    if size < 0:
        (mem_free, mem_total) = system_manager.get_memory_sample()
        size = (mem_total + mem_total // 8) // 1024

    # Find the first cg-loop id, starting at 128, not listed in /proc/swaps.
    # Raw string: the awk separator ``-F\-`` is not a valid Python escape.
    probe = r"cat /proc/swaps | grep cg-loop | awk '{print $1}' | awk -F\- '{print $NF}' | grep %d$"
    nid = 128
    while subprocess.getoutput(probe % nid) != "":
        nid = nid + 1

    start_time = time.time()
    # setup
    os.system('dd if=/dev/zero of=/tmp/cg-swap-%d bs=1G count=0 seek=%d >/dev/null 2>&1' % (nid, size))
    os.system('mknod -m 0660 /dev/cg-loop-%d b 7 %d >/dev/null 2>&1' % (nid, nid))
    os.system('losetup /dev/cg-loop-%d /tmp/cg-swap-%d >/dev/null 2>&1' % (nid, nid))
    os.system('mkswap /dev/cg-loop-%d >/dev/null 2>&1' % nid)
    success = os.system('swapon /dev/cg-loop-%d >/dev/null 2>&1' % nid) == 0
    # detach (kept for reference; not executed)
    # os.system('swapoff /dev/cg-loop-%d >/dev/null 2>&1' % nid)
    # os.system('losetup -d /dev/cg-loop-%d >/dev/null 2>&1' % nid)
    # os.system('rm -f /dev/cg-loop-%d /tmp/cg-swap-%d >/dev/null 2>&1' % (nid, nid))
    end_time = time.time()
    return {"setup": success, "time": end_time - start_time}
def scan(network):
    """Ping-scan the locally attached subnets and list the hosts found.

    The subnets are taken from ``/sbin/ip`` (global-scope IPv4 addresses)
    and handed to the module-level scanner ``nm``. The *network* argument
    is not used by this implementation.

    :return: list of ``{'ip_addr': ..., 'host': ...}`` dicts, starting
        with an entry for localhost.
    """
    subnet_query = "/sbin/ip -o -f inet addr show | awk '/scope global/ {print $4}'"
    myip_subnet = subprocess.getoutput(subnet_query)
    nm.scan(hosts=myip_subnet, arguments='nmap -sn')

    def describe(host):
        # Each lookup falls back to a placeholder when the key is missing.
        try:
            address = nm[host]['addresses']['ipv4']
        except KeyError:
            address = "[Unknown IP]"
        try:
            name = nm[host].hostname()
        except KeyError:
            name = "[Unknown hostname]"
        return {'ip_addr': address, 'host': name}

    # localhost is always reported first
    found = [{'ip_addr': '127.0.0.1', 'host': 'localhost'}]
    for host in nm.all_hosts():
        found.append(describe(host))
    return found
def test_getoutput(self): self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy') self.assertEqual(subprocess.getstatusoutput('echo xyzzy'), (0, 'xyzzy')) # we use mkdtemp in the next line to create an empty directory # under our exclusive control; from that, we can invent a pathname # that we _know_ won't exist. This is guaranteed to fail. dir = None try: dir = tempfile.mkdtemp() name = os.path.join(dir, "foo") status, output = subprocess.getstatusoutput('cat ' + name) self.assertNotEqual(status, 0) finally: if dir is not None: os.rmdir(dir)
def hex_dump(hex_cmd):
    """
    Run a hexdump command and return its output as HTML rows.

    Each output line longer than 9 characters is sliced into an offset
    (columns 0-8), a hex column (9-58) and an ASCII column (58-78); every
    slice is HTML-escaped before being placed in the markup.

    :param hex_cmd: shell command line whose output is the hexdump
    :return: str
    """
    import html  # local import keeps the block self-contained

    hex_string = getoutput(hex_cmd)

    # Format the data
    html_rows = []
    for row in hex_string.split('\n'):
        if len(row) <= 9:
            continue
        # The dump can contain '<', '>', '&' and quotes; escape them so the
        # text cannot be interpreted as markup.
        off_str = html.escape(row[0:8])
        hex_str = html.escape(row[9:58])
        asc_str = html.escape(row[58:78])
        html_rows.append(
            '<div class="row"><span class="text-info mono">{0}</span> '
            '<span class="text-primary mono">{1}</span> <span class="text-success mono">'
            '{2}</span></div>'.format(off_str, hex_str, asc_str)
        )

    # return the data
    return ''.join(html_rows)
def api_synced():
    """Return a JSON response holding the output of ``electrum is_synchronized``.

    The response has three sections: ``data`` (the raw command output under
    ``electrum_synced``), ``meta`` (endpoint description, including the
    command that was run) and an empty ``links`` dict.
    """
    command = "electrum is_synchronized"
    root_meta_dict = {
        "version": 1,
        "name": "Synced",
        "state": "In Progress",
        "Underlying Command": command,
    }
    root_data_dict = {"electrum_synced": subprocess.getoutput(command)}
    return jsonify(data=root_data_dict, meta=root_meta_dict, links=dict())
def git_info():
    """Collect the HEAD commit, working-tree diff and remotes of the current repo.

    ``subprocess.getoutput`` never raises ``CalledProcessError``; on failure
    it returns git's error text. The exit status is therefore checked
    explicitly so that a failing git command yields ``'Unknown'``.

    :return: dict with keys ``'head'``, ``'diff'`` and ``'remote'``; each
        value is the command output, or ``'Unknown'`` when the command
        failed or printed nothing.
    """
    def run_git(command, strip):
        status, output = subprocess.getstatusoutput(command)
        if status != 0:
            return None
        return output.strip() if strip else output

    head = run_git('git rev-parse HEAD', True)
    diff = run_git('git diff --no-color', False)
    remote = run_git('git remote -v', True)
    return {
        'head': head or 'Unknown',
        'diff': diff or 'Unknown',
        'remote': remote or 'Unknown',
    }
def add_to_desktop(self, widget, desktopEntry):
    """Copy the launcher of *desktopEntry* into the user's Desktop folder.

    The folder is read from ``XDG_DESKTOP_DIR`` in
    ``~/.config/user-dirs.dirs`` (it may be localized); ``~/Desktop`` is
    used when the configured path does not exist. After copying, every
    ``*.desktop`` file in that folder gets read and execute permission for
    all users. Any error is printed rather than raised.

    Variable expansion, copying and chmod are done in Python instead of
    through a shell, so paths containing spaces or quotes are handled
    correctly and configuration values are never executed.

    :param widget: not used here.
    :param desktopEntry: object exposing the launcher path as ``desktopFile``.
    """
    try:
        import glob
        import shutil
        #sys.path.append('/usr/lib/ubuntu-mate/common')
        from configobj import ConfigObj

        home = GLib.get_home_dir()
        # Determine where the Desktop folder is (could be localized)
        config = ConfigObj(home + "/.config/user-dirs.dirs")
        desktopDir = home + "/Desktop"
        # Values look like "$HOME/Desktop"; expand the variable ourselves.
        tmpdesktopDir = os.path.expandvars(config['XDG_DESKTOP_DIR'])
        if os.path.exists(tmpdesktopDir):
            desktopDir = tmpdesktopDir

        # Copy the desktop file to the desktop
        shutil.copy(desktopEntry.desktopFile, desktopDir)
        for launcher in glob.glob(os.path.join(desktopDir, "*.desktop")):
            try:
                # equivalent of "chmod a+rx"
                os.chmod(launcher, os.stat(launcher).st_mode | 0o555)
            except OSError as detail:
                # One launcher we cannot change must not block the others.
                print(detail)
    except Exception as detail:
        print(detail)
def get_workspaces(self):
    """Read the Unity/Compiz workspace grid size from dconf.

    ``subprocess.getoutput`` also captures stderr, so a missing ``dconf``
    binary or any other non-numeric output is treated like an unset key and
    falls back to 1 instead of crashing in ``int()``.

    :return: ``Workspaces(horz, vert, total)`` named tuple of ints, where
        ``total`` is ``horz * vert``.
    """
    Workspaces = namedtuple('Workspaces', 'horz vert total')

    def read_count(key, label):
        raw = subprocess.getoutput('dconf read /org/compiz/profiles/unity/plugins/core/' + key).strip()
        try:
            return int(raw)
        except ValueError:
            # If workspace changer hasn't been enabled h- and v-size doesn't seem to be set.
            self.logger.debug("unity/plugins/core/%s not set - setting %s workspaces to '1'" % (key, label))
            return 1

    workspaces_horz = read_count('hsize', 'horisontal')
    workspaces_vert = read_count('vsize', 'vertical')
    workspaces_total = workspaces_vert * workspaces_horz

    self.logger.debug("Horisontal number of workspaces is: " + str(workspaces_horz))
    self.logger.debug("Vertical number of workspaces is: " + str(workspaces_vert))
    self.logger.debug("Total number of workspaces is: " + str(workspaces_total))
    return Workspaces(workspaces_horz, workspaces_vert, workspaces_total)
def test_getoutput(self):
    """Check getoutput/getstatusoutput, using ``type`` instead of ``cat`` on Windows."""
    expected = 'xyzzy'
    self.assertEqual(subprocess.getoutput('echo xyzzy'), expected)
    self.assertEqual(subprocess.getstatusoutput('echo xyzzy'), (0, expected))

    # mkdtemp gives an empty directory under our exclusive control, so a
    # name invented inside it cannot exist and reading it must fail.
    tmp_root = None
    try:
        tmp_root = tempfile.mkdtemp()
        name = os.path.join(tmp_root, "foo")
        if mswindows:
            reader = "type "
        else:
            reader = "cat "
        status, _ = subprocess.getstatusoutput(reader + name)
        self.assertNotEqual(status, 0)
    finally:
        if tmp_root is not None:
            os.rmdir(tmp_root)
def run(self, edit, query):
    """BLAST *query* against the text of the current view and append the hits.

    The view's text is written out as a FASTA database, *query* as a FASTA
    query, ``makeblastdb`` and ``blastn`` (tabular ``-outfmt 6``) are run on
    them, and a header line plus the blastn output are inserted at the end
    of the view.

    All scratch files, including the index files makeblastdb writes next to
    the database, live in a private temporary directory that is removed
    when the command finishes.

    :param edit: edit token grouping the inserts into one undoable edit.
    :param query: sequence typed into the console.
    """
    import os
    import shlex
    import tempfile

    ## get the contents of the tab that is the focus in sublime, as a string
    allcontent = self.view.substr(sublime.Region(0, self.view.size()))

    with tempfile.TemporaryDirectory(prefix='sublime_blast.') as workdir:
        db_handle = os.path.join(workdir, 'tmp_db.fa')
        with open(db_handle, 'w') as fo:
            fo.write(allcontent)
        query_handle = os.path.join(workdir, 'tmp_query.fa')
        with open(query_handle, 'w') as fo:
            fo.write('>query\n%s\n' % query)

        ## make a blastdb of the page open in sublime
        subprocess.call(['/usr/local/bin/makeblastdb', '-dbtype', 'nucl', '-in', db_handle])
        ## run blast, taking the query as what is input in the console;
        ## getoutput runs a shell, so it gets one properly quoted string
        blast_output = subprocess.getoutput(
            '/usr/local/bin/blastn -db {0} -query {1} -outfmt 6'.format(
                shlex.quote(db_handle), shlex.quote(query_handle)))

    ## self.view.size() is the end of the file, which is where the header
    ## and the blast output are appended
    self.view.insert(edit, self.view.size(), '\nqseqid sseqid pident length mismatch gapopen qstart qend sstart send evalue bitscore\n')
    self.view.insert(edit, self.view.size(), blast_output)
def __get_remote_index():
    """
    Gets the index of news crawl files from commoncrawl.org and returns an array of names

    The listing is written to ``.tmpaws.txt`` in the working directory and
    the fourth column of each line is printed from it. The initial cleanup
    removes that same file: a failed earlier run leaves it behind because
    the trailing ``rm`` is only reached when every earlier step succeeded.

    :return: list of the lines printed by the command
    """
    # cleanup (must target the same file the command below writes)
    subprocess.getoutput("rm -f .tmpaws.txt")
    # get the remote info
    cmd = "aws s3 ls --recursive s3://commoncrawl/crawl-data/CC-NEWS/ --no-sign-request > .tmpaws.txt && " \
          "awk '{ print $4 }' .tmpaws.txt && " \
          "rm .tmpaws.txt"
    __logger.info('executing: %s', cmd)
    stdout_data = subprocess.getoutput(cmd)
    lines = stdout_data.splitlines()
    return lines
def __get_remote_index(self):
    """
    List the news crawl files published on commoncrawl.org.

    The S3 listing is written to ``tmpaws.txt``, its fourth column is
    printed from that file, and the file is then removed. The captured
    output is echoed to stdout before being split.

    :return: list of the lines printed by the command
    """
    # cleanup
    subprocess.getoutput("rm tmpaws.txt")

    # get the remote info
    steps = [
        "aws s3 ls --recursive s3://commoncrawl/crawl-data/CC-NEWS/ --no-sign-request > tmpaws.txt",
        "awk '{ print $4 }' tmpaws.txt",
        "rm tmpaws.txt",
    ]
    cmd = " && ".join(steps)
    self.__logger.info('executing: %s', cmd)

    stdout_data = subprocess.getoutput(cmd)
    print(stdout_data)
    return stdout_data.splitlines()
def get_our_pids(this_base):
    ''' Runs ps ax in the Vagrant box, returns the list of (pid, command)
    tuples whose command starts with "/res_binaries/".

    Lines of the ps output that do not consist of a pid followed by a
    command (blank lines, for instance) are skipped instead of raising.
    Calls die() when no box id matching this_base is found.
    '''
    # Get the id of the running box; Vagrant should accept the name but doesn't; https://github.com/mitchellh/vagrant/issues/8691
    id_of_base = ""
    for this_line in subprocess.getoutput("vagrant global-status").splitlines():
        if this_base in this_line:
            (id_of_base, _) = this_line.split(" ", 1)
            break
    if not id_of_base:
        die("No id could be found for {}.".format(this_base))

    this_command = "vagrant ssh --command \"ps ax o pid,args --no-headers\" {}".format(id_of_base)
    this_ps = subprocess.getoutput(this_command)

    pids_to_return = []
    for this_line in this_ps.splitlines():
        fields = this_line.split(None, 1)
        if len(fields) != 2:
            continue  # not a "pid command" row
        (this_pid, this_cmd) = fields
        if this_cmd.startswith("/res_binaries/"):
            pids_to_return.append((this_pid, this_cmd))
    return pids_to_return
def detect(hostname):
    """
    Performs CDN detection through the DNS, using the host and nslookup
    commands.

    The network location of the given URL is resolved with ``host``; every
    IPv4-looking token in that output is passed to ``nslookup`` and the
    result is handed to ``CDNEngine.find``.

    Parameters
    ----------
    hostname : str
        URL whose network location is assessed
    """
    try:
        from shlex import quote
    except ImportError:  # Python 2
        from pipes import quote

    print('[+] DNS detection\n')

    hostname = urlparse.urlparse(hostname).netloc
    regexp = re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b')

    # The hostname comes from caller input: quote it so shell
    # metacharacters cannot inject extra commands.
    out = commands.getoutput("host " + quote(hostname))

    # Matches consist only of digits and dots, so they are shell-safe.
    for addr in regexp.finditer(out):
        CDNEngine.find(commands.getoutput('nslookup ' + addr.group()))
def detect(hostname):
    """
    Performs CDN detection through the whois command.

    The lower-cased ``whois`` output for the URL's network location is
    handed to ``CDNEngine.find``.

    Parameters
    ----------
    hostname : str
        URL whose network location is assessed
    """
    try:
        from shlex import quote
    except ImportError:  # Python 2
        from pipes import quote

    print('[+] Whois detection\n')

    hostname = urlparse.urlparse(hostname).netloc

    # Quote the caller-supplied hostname before it reaches the shell.
    out = commands.getoutput("whois " + quote(hostname))

    CDNEngine.find(out.lower())
def detect(hostname):
    """
    Performs CDN detection by trying to access the cdn subdomain of the
    specified hostname.

    ``host -a`` is run on ``cdn.<netloc>`` and its lower-cased output is
    handed to ``CDNEngine.find``.

    Parameters
    ----------
    hostname : str
        URL whose network location is assessed
    """
    try:
        from shlex import quote
    except ImportError:  # Python 2
        from pipes import quote

    print('[+] CDN subdomain detection\n')

    hostname = "cdn." + urlparse.urlparse(hostname).netloc

    # Quote the caller-supplied hostname before it reaches the shell.
    out = commands.getoutput("host -a " + quote(hostname))

    CDNEngine.find(out.lower())
def detect(hostname):
    """
    Performs CDN detection thanks to information disclosure from server error.

    Every IPv4-looking token in the ``host`` output is requested over plain
    HTTP; bodies of responses with status 500 are lower-cased and handed to
    ``CDNEngine.find``.

    Parameters
    ----------
    hostname : str
        URL whose network location is assessed
    """
    try:
        from shlex import quote
    except ImportError:  # Python 2
        from pipes import quote

    print('[+] Error server detection\n')

    hostname = urlparse.urlparse(hostname).netloc
    regexp = re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b')

    # Quote the caller-supplied hostname before it reaches the shell.
    out = commands.getoutput("host " + quote(hostname))

    for addr in regexp.finditer(out):
        res = request.do('http://' + addr.group())
        if res is not None and res.status_code == 500:
            CDNEngine.find(res.text.lower())
def getcommands(helpcmd):
    """Print *helpcmd* and recurse into every subgroup its help output lists.

    The command is expected to end with -h, for example 'az vm -h'. It is
    printed indented by four spaces per word beyond the third, then
    executed; each indented line following a 'Subgroups:' heading is turned
    into a new help command via subcommand() and crawled in turn.
    """
    depth = len(helpcmd.split(' ')) - 3
    print((4 * depth) * ' ' + helpcmd)

    help_lines = iter(subprocess.getoutput(helpcmd).split('\n'))

    # Skip everything up to and including the 'Subgroups:' heading.
    for line in help_lines:
        if line.strip() == 'Subgroups:':
            break
    else:
        return  # no subgroup section at all

    for line in help_lines:
        stripped = line.strip()
        if stripped == 'Subgroups:':
            continue  # a repeated heading is ignored
        if not stripped or line.lstrip() == line:
            break  # blank or non-indented line ends the subgroup section
        getcommands(subcommand(helpcmd, line))
def _deSubstPath(path):
    """Desubstitute a craft short path.

    On Windows, a path whose drive appears in the output of ``subst`` is
    rewritten to start with the mapped target instead. The mapping is read
    once and cached in ``CraftStandardDirs._SUBST``. On other platforms, or
    when the drive is not substituted, *path* is returned unchanged.
    """
    if not OsDetection.isWin():
        return path
    drive, tail = os.path.splitdrive(path)
    drive = drive.upper()
    if CraftStandardDirs._SUBST is None:
        mapping = {}
        for entry in subprocess.getoutput("subst").split("\n"):
            # Lines look like "X:\: => C:\real\path"; anything without the
            # separator is ignored rather than raising on unpacking.
            key, separator, val = entry.partition("\\: => ")
            if separator:
                mapping[key] = val
        # Publish the cache only once it is complete.
        CraftStandardDirs._SUBST = mapping
    if drive in CraftStandardDirs._SUBST:
        return CraftStandardDirs._SUBST[drive] + tail
    return path
def __init__(self,test=False):
    # Set up the collector thread and restore per-container accounting
    # state from the VNode records.
    # NOTE(review): the source arrived flattened onto one line; the nesting
    # below (in particular the position of the final `return`) is the most
    # plausible reading — confirm against the upstream file.
    global laststopcpuval
    global workercinfo
    threading.Thread.__init__(self)
    self.thread_stop = False
    self.interval = 2
    self.billingtime = 3600 # billing interval
    self.test = test
    self.cpu_last = {}
    self.cpu_quota = {}
    self.mem_quota = {}
    self.net_stats = {}
    # Count the "processor" lines in /proc/cpuinfo.
    self.cores_num = int(subprocess.getoutput("grep processor /proc/cpuinfo | wc -l"))
    containers = self.list_container()
    for container in containers:
        # recovery
        if not container == '':
            try:
                vnode = VNode.query.get(container)
                laststopcpuval[container] = vnode.laststopcpuval
                laststopruntime[container] = vnode.laststopruntime
                workercinfo[container] = {}
                workercinfo[container]['basic_info'] = {}
                workercinfo[container]['basic_info']['billing'] = vnode.billing
                workercinfo[container]['basic_info']['billing_history'] = get_billing_history(container)
                workercinfo[container]['basic_info']['RunningTime'] = vnode.laststopruntime
                # a_cpu, b_mem, c_disk and d_port are not defined in this
                # block; presumably module-level values — TODO confirm.
                workercinfo[container]['basic_info']['a_cpu'] = a_cpu
                workercinfo[container]['basic_info']['b_mem'] = b_mem
                workercinfo[container]['basic_info']['c_disk'] = c_disk
                workercinfo[container]['basic_info']['d_port'] = d_port
            except:
                # Any failure while restoring resets this container's
                # counters to zero. The bare except also hides programming
                # errors such as a NameError.
                laststopcpuval[container] = 0
                laststopruntime[container] = 0
    return
# list containers on this worker
def user_live_add(form, args):
    """Mark a user as live by writing ``live`` to the user's status file.

    The file is written directly instead of through ``echo`` in a shell, so
    the user name from the request can never be executed as a command.
    Names that are empty, ``.``/``..`` or contain a path separator are
    rejected.

    :param form: mapping with the user name under ``'user'``.
    :param args: not used here.
    :return: False when the name is rejected, the user directory does not
        exist or the file cannot be written; True otherwise.
    """
    user = str(form['user'])
    if not user or user in ('.', '..') or os.sep in user:
        return False
    user_dir = '/var/lib/docklet/global/users/%s' % user
    if not os.path.exists(user_dir):
        return False
    try:
        with open(os.path.join(user_dir, 'status'), 'w') as status_file:
            status_file.write('live\n')
    except OSError:
        return False
    return True
# curl -L -X POST -F user=docklet http://0.0.0.0:1728/v1/user/live/remove
def user_live_remove(form, args):
    """Remove a user's status file; a missing file is not an error.

    The file is removed with ``os.remove`` instead of ``rm -f`` in a shell,
    so the user name from the request can never be executed as a command.
    Names that are empty, ``.``/``..`` or contain a path separator are
    ignored.

    :param form: mapping with the user name under ``'user'``.
    :param args: not used here.
    :return: always True.
    """
    import os  # local import keeps the block self-contained

    user = str(form['user'])
    if user and user not in ('.', '..') and os.sep not in user:
        try:
            os.remove('/var/lib/docklet/global/users/%s/status' % user)
        except OSError:
            # Same best-effort semantics as "rm -f": nothing to report.
            pass
    return True
# curl -L -X POST http://0.0.0.0:1728/v1/user/live/list
def user_live_list(form, args):
    """Return the names of all users that currently have a status file.

    A user counts when ``/var/lib/docklet/global/users/<name>/status``
    exists. The directories are globbed in Python rather than parsed from
    an ``ls | awk`` pipeline, whose ``\\/`` was an invalid Python escape.

    :param form: not used here.
    :param args: not used here.
    :return: sorted list of user names (empty when there are none).
    """
    import glob
    import os

    status_files = glob.glob('/var/lib/docklet/global/users/*/status')
    return sorted(os.path.basename(os.path.dirname(path)) for path in status_files)
def get_cpu_sample():
    """Sample the aggregate CPU counters from ``/proc/stat``.

    The 2nd, 3rd, 4th and 6th columns of the line whose first field is
    ``cpu`` are summed. The file is read directly rather than through a
    ``cat | grep | awk`` pipeline, whose ``\\ `` was an invalid Python
    escape.

    :return: ``(cpu_time, timestamp)`` where ``timestamp`` is ``time.time()``.
    :raises ValueError: when no aggregate ``cpu`` line is found.
    """
    with open('/proc/stat') as stat_file:
        for line in stat_file:
            fields = line.split()
            if fields and fields[0] == 'cpu':
                # indexes 1, 2, 3, 5 correspond to awk's $2, $3, $4, $6
                cpu_time = sum(int(fields[index]) for index in (1, 2, 3, 5))
                return (cpu_time, time.time())
    raise ValueError("no aggregate 'cpu' line in /proc/stat")
def get_memory_sample():
    """Read available and total memory from ``/proc/meminfo``.

    Each value is the number found in the file integer-divided by 1024.
    The file is parsed once in Python. When it has no ``MemAvailable:``
    row, ``MemFree:`` is used instead of failing on an empty string.

    :return: ``(mem_free, mem_total)`` tuple of ints.
    :raises ValueError: when the required rows are missing.
    """
    wanted = ('MemAvailable:', 'MemFree:', 'MemTotal:')
    values = {}
    with open('/proc/meminfo') as meminfo:
        for line in meminfo:
            fields = line.split()
            if len(fields) >= 2 and fields[0] in wanted:
                values[fields[0]] = int(fields[1])

    available = values.get('MemAvailable:', values.get('MemFree:'))
    if available is None or 'MemTotal:' not in values:
        raise ValueError("memory rows missing from /proc/meminfo")
    return (available // 1024, values['MemTotal:'] // 1024)
def get_swap_sample():
    """Read free and total swap from ``/proc/meminfo`` via awk.

    Each value is the number printed for the row integer-divided by 1024.

    :return: ``(swap_free, swap_total)`` tuple of ints.
    """
    def read_row(label):
        query = "awk '{if ($1==\"%s\") print $2}' /proc/meminfo 2>/dev/null" % label
        return int(subprocess.getoutput(query)) // 1024

    swap_free = read_row("SwapFree:")
    swap_total = read_row("SwapTotal:")
    return (swap_free, swap_total)
def get_system_loads():
    """Return a snapshot of memory, swap and CPU availability.

    CPU usage is the difference between the current CPU sample and the one
    stored on ``system_manager`` by the previous call. The very first call
    stores a sample and sleeps one second so that a difference exists.

    :return: dict with ``mem_free``, ``mem_total``, ``swap_free``,
        ``swap_total``, ``cpu_free`` (never below 0.0) and ``cpu_total``.
    """
    if 'last_cpu_sample' not in vars(system_manager):
        system_manager.last_cpu_sample = system_manager.get_cpu_sample()
        time.sleep(1)

    current = system_manager.get_cpu_sample()
    (mem_free, mem_total) = system_manager.get_memory_sample()
    (swap_free, swap_total) = system_manager.get_swap_sample()
    ncpus = int(subprocess.getoutput("grep processor /proc/cpuinfo | wc -l"))

    previous = system_manager.last_cpu_sample
    cpu_free = ncpus - (current[0] - previous[0]) * 0.01 / (current[1] - previous[1])
    if cpu_free <= 0.0:
        cpu_free = 0.0
    system_manager.last_cpu_sample = current

    loads = {}
    loads["mem_free"] = mem_free
    loads["mem_total"] = mem_total
    loads["swap_free"] = swap_free
    loads["swap_total"] = swap_total
    loads["cpu_free"] = cpu_free
    loads["cpu_total"] = ncpus
    return loads
def get_cgroup_containers():
    """List container identifiers found under the cpu cgroup hierarchy.

    Directories matching ``cgroup_manager.__default_prefix__`` (filled with
    ``'cpu'``, ``'*'`` and ``'.'``) are located with ``find``, and the
    second-to-last path component of each is taken as the container name.
    Names of the form ``docker-<id>.scope`` longer than 64 characters are
    reduced to the bare ``<id>``.

    :return: list of container identifiers.
    """
    pattern = cgroup_manager.__default_prefix__ % ('cpu', '*', '.')
    # Raw string: awk's ``-F\/`` is not a valid Python escape sequence.
    listing = subprocess.getoutput(
        r"find %s -type d 2>/dev/null | awk -F\/ '{print $(NF-1)}'" % pattern)

    def normalize(item):
        if item.startswith('docker-') and item.endswith('.scope') and len(item) > 64:
            return item[7:-6]  # drop the "docker-" prefix and ".scope" suffix
        return item

    return [normalize(item) for item in listing.split()]
def signal_handler(signal, frame):
    """Delete this process's Open vSwitch bridge and exit with status 0.

    The bridge is ``ovs-master`` when the first command-line argument is
    ``'master'`` and ``ovs-minion`` otherwise. Both parameters are unused.
    """
    bridge = 'ovs-master' if sys.argv[1] == 'master' else 'ovs-minion'
    subprocess.getoutput('ovs-vsctl del-br %s >/dev/null 2>&1' % bridge)
    sys.exit(0)
def get_score_by_uuid(uuid):
    """Score a uuid by whether its owning user is marked live.

    The user name is the part of *uuid* before the first ``-``. The user is
    online when ``/var/lib/docklet/global/users/<user>/status`` contains
    ``live``. The file is read directly instead of through ``cat`` in a
    shell, so uuid text can never be executed as a command.

    :return: 10.0 when the user is online, otherwise 1.0 (also when the
        status file is missing or unreadable).
    """
    user = uuid.split('-')[0]
    try:
        with open('/var/lib/docklet/global/users/%s/status' % user) as status_file:
            online = status_file.read().rstrip('\n') == 'live'
    except (OSError, UnicodeDecodeError):
        online = False
    return 10.0 if online else 1.0
def mpv_pause_status():
    """Ask mpv over its IPC socket whether playback is paused.

    The request is piped through ``socat`` to ``mpv_socket`` and the
    ``data`` field of the JSON reply is returned. A reply that cannot be
    parsed is retried a bounded number of times with a short pause; the
    original retried through unbounded recursion inside a bare ``except``.

    :return: value of the ``data`` field of mpv's reply.
    :raises RuntimeError: when no usable reply arrives.
    """
    import time

    request = 'echo \'{ "command": ["get_property", "pause"] }\' | socat - "' + mpv_socket + '"'
    max_attempts = 50
    for _ in range(max_attempts):
        stdoutdata = subprocess.getoutput(request)
        try:
            return loads(stdoutdata)['data']
        except (ValueError, KeyError, TypeError):
            # mpv not ready yet or an unexpected reply: wait and retry
            time.sleep(0.1)
    raise RuntimeError("no valid reply from mpv for property 'pause'")
def mpv_fullscreen_status():
    """Ask mpv over its IPC socket whether it is in fullscreen mode.

    The request is piped through ``socat`` to ``mpv_socket`` and the
    ``data`` field of the JSON reply is returned. A reply that cannot be
    parsed is retried a bounded number of times with a short pause; the
    original retried through unbounded recursion inside a bare ``except``.

    :return: value of the ``data`` field of mpv's reply.
    :raises RuntimeError: when no usable reply arrives.
    """
    import time

    request = 'echo \'{ "command": ["get_property", "fullscreen"] }\' | socat - "' + mpv_socket + '"'
    max_attempts = 50
    for _ in range(max_attempts):
        stdoutdata = subprocess.getoutput(request)
        try:
            return loads(stdoutdata)['data']
        except (ValueError, KeyError, TypeError):
            # mpv not ready yet or an unexpected reply: wait and retry
            time.sleep(0.1)
    raise RuntimeError("no valid reply from mpv for property 'fullscreen'")
def create_jp2(img_name, img_id):
    # Convert <img_name>.tiff to <img_name>.jp2 with the Kakadu command
    # templates and upload the result under '{image_base}{img_id}.jp2'.
    # Raises ValueError(img_name) when the conversion still reports an
    # error after the retry.
    # NOTE(review): the source arrived flattened onto one line; the nesting
    # of the final error check inside the retry branch is the most
    # plausible reading — confirm against the upstream file.
    image = Image.open('{}.tiff'.format(img_name))
    # RGB images use a dedicated command template.
    if image.mode == 'RGB':
        kdu_com = kdu_command_rgb.format(img_name, img_name)
    else:
        kdu_com = kdu_command.format(img_name, img_name)
    # Backslash-escape braces before the command is handed to the shell.
    kdu_com = kdu_com.replace('{', '\{')
    kdu_com = kdu_com.replace('}', '\}')
    res = subprocess.getoutput(kdu_com)
    if res.startswith('Kakadu Error'):
        # Probably not uncompressed tiff
        print('probably not uncompressed tiff')
        print(res)
        # Move the original aside, run the convert command template, then
        # retry with the non-RGB template.
        subprocess.getoutput('mv {}.tiff {}-2.tiff'.format(img_name, img_name))
        subprocess.getoutput(convert_command.format(img_name, img_name))
        kdu_com2 = kdu_command.format(img_name, img_name)
        kdu_com2 = kdu_com2.replace('{', '\{')
        kdu_com2 = kdu_com2.replace('}', '\}')
        res = subprocess.getoutput(kdu_com2)
        print('new response')
        print(res)
        if res.startswith('Kakadu Error') or res.startswith('Kakadu Core Error'):
            print('Still broken :(')
            raise ValueError(img_name)
    # `b` and `image_base` are defined outside this block; presumably a
    # storage bucket and a key prefix — TODO confirm.
    k = Key(b, '{}{}.jp2'.format(image_base, img_id))
    k.set_contents_from_filename('{}.jp2'.format(img_name))
def get_gitversion() -> dict:
    """
    Infer the project's current version with GitVersion
    (https://github.com/GitTools/GitVersion).

    When the APPVEYOR environment variable is set, the executable is looked
    up in the Chocolatey bin directory; otherwise the default lookup of
    ``find_executable`` is used. If it cannot be found, installation
    instructions are printed to stderr and ``exit(-1)`` is called.

    Returns GitVersion's JSON output as a dict
    """
    on_appveyor = os.environ.get('APPVEYOR')
    if on_appveyor:
        exe = find_executable('gitversion', r'C:\ProgramData\chocolatey\bin')
    else:
        exe = find_executable('gitversion')

    if not exe:
        install_help = (
            '"gitversion.exe" not been found in your PATH.\n'
            'GitVersion is used to infer the current version from the Git repository.\n'
            'setuptools_scm plans on switching to using the Semver scheme in the future; when that happens, '
            'I\'ll remove the dependency to GitVersion.\n'
            'In the meantime, GitVersion can be obtained via Chocolatey (recommended): '
            'https://chocolatey.org/packages/GitVersion.Portable\n'
            'If you already have chocolatey installed, you can simply run the following command (as admin):\n\n'
            '\t\t"choco install gitversion.portable -pre -y"\n\n'
            'If you\'re not comfortable using the command line, there is a GUI tool for Chocolatey available at:\n\n'
            '\t\thttps://github.com/chocolatey/ChocolateyGUI/releases\n\n'
            'Or you can install directly from :\n\n'
            '\t\thttps://github.com/GitTools/GitVersion/releases'
        )
        click.secho(install_help, err=True)
        exit(-1)

    raw_json = subprocess.getoutput([exe])
    return loads(raw_json.rstrip())
def get_gitversion() -> dict:
    """
    Run the gitversion executable and return its output parsed as a dict.

    Example "gitversion" output::

        "Major":0,
        "Minor":4,
        "Patch":4,
        "PreReleaseTag":"dev.11",
        "PreReleaseTagWithDash":"-dev.11",
        "PreReleaseLabel":"dev",
        "PreReleaseNumber":11,
        "BuildMetaData":"",
        "BuildMetaDataPadded":"",
        "FullBuildMetaData":"Branch.develop.Sha.b22387288a19ac67641fac2711b940c4cab6d021",
        "MajorMinorPatch":"0.4.4",
        "SemVer":"0.4.4-dev.11",
        "LegacySemVer":"0.4.4-dev11",
        "LegacySemVerPadded":"0.4.4-dev0011",
        "AssemblySemVer":"0.4.4.0",
        "FullSemVer":"0.4.4-dev.11",
        "InformationalVersion":"0.4.4-dev.11+Branch.develop.Sha.b22387288a19ac67641fac2711b940c4cab6d021",
        "BranchName":"develop",
        "Sha":"b22387288a19ac67641fac2711b940c4cab6d021",
        "NuGetVersionV2":"0.4.4-dev0011",
        "NuGetVersion":"0.4.4-dev0011",
        "NuGetPreReleaseTagV2":"dev0011",
        "NuGetPreReleaseTag":"dev0011",
        "CommitsSinceVersionSource":11,
        "CommitsSinceVersionSourcePadded":"0011",
        "CommitDate":"2017-07-18"
    """
    # The author flagged this call as a potential security breach and left
    # it in place because it should only run on a dev machine or Appveyor.
    if os.environ.get('APPVEYOR'):
        cmd = r'C:\ProgramData\chocolatey\bin\gitversion.exe'
    else:
        cmd = 'gitversion'
    raw_json = subprocess.getoutput([cmd])
    return loads(raw_json.rstrip())
def _minify(self, extension):
    """Minify in place every file under ``settings.STATIC_ROOT`` ending in *extension*.

    Each file is run through ``yui-compressor`` and overwritten with the
    result. The compressor is invoked without a shell and the file is only
    overwritten when it exits successfully. Previously a failing run
    replaced the file's contents with the error text, and paths containing
    spaces broke the command. When ``self.dry_run`` is true, files are only
    reported.

    :param extension: filename suffix selecting the files, e.g. ``'.js'``.
    """
    import subprocess

    for root, dirs, files in os.walk(settings.STATIC_ROOT):
        for f in files:
            path = os.path.join(root, f)
            if not path.endswith(extension):
                continue
            if getattr(self, 'dry_run', False):
                print('[dry run] skipping minifying ' + path + '...')
                continue
            # Announce the file before the (slow) compressor runs.
            print('minifying "' + path + '" ... ', end='')
            try:
                result = subprocess.run(
                    ['yui-compressor', path],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    universal_newlines=True,
                )
            except OSError:
                # compressor not installed or not executable
                print('failed')
                continue
            if result.returncode != 0:
                print('failed')
                continue
            mini = result.stdout
            if mini.endswith('\n'):
                mini = mini[:-1]  # getoutput() dropped one trailing newline
            with open(path, 'w') as p:
                p.write(mini)
            print('done')
def time(self):
    # Kill a running browser when `u.uid` is the empty string.
    # NOTE(review): `u` is defined outside this block and the imported
    # Ui_Dialog is never used here; the intent is unclear — confirm with
    # the caller. The source arrived flattened onto one line; the nesting
    # below is the most plausible reading.
    output=subprocess.getoutput("ps aux")
    if u.uid == "":
        # The process listing is searched as plain text, so "google" or
        # "firefox" anywhere in any command line triggers the kill.
        if "google" in output:
            subprocess.getoutput("killall -9 chrome")
            from gir import Ui_Dialog
        elif "firefox" in output:
            subprocess.getoutput("killall -9 firefox")
            from gir import Ui_Dialog
def resolve(self, request, handler):
    """Answer a DNS query with the output of the command routed to its name.

    The query name is looked up in ``self.routes``. When a command is
    registered, its output (encoded, truncated to 254 bytes) is returned as
    a TXT record with ``self.ttl``; otherwise the reply carries NXDOMAIN.

    :param request: incoming DNS request.
    :param handler: not used here.
    :return: the reply built from *request*.
    """
    reply = request.reply()
    qname = request.q.qname
    cmd = self.routes.get(qname)

    if not cmd:
        reply.header.rcode = RCODE.NXDOMAIN
        return reply

    payload = getoutput(cmd).encode()
    answer = RR(qname, QTYPE.TXT, ttl=self.ttl, rdata=TXT(payload[:254]))
    reply.add_answer(answer)
    return reply
def _update_info(self):
    """Scan the network for devices.

    Runs ``arp-scan`` with ``self._options`` and stores one ``Device`` per
    output line that contains an IPv4 address, using the first column as
    address (and name) and the second as MAC. Hosts listed in
    ``self.exclude`` are skipped; the original ``continue`` sat in an inner
    loop and therefore never skipped anything.

    Returns boolean if scanning successful.
    """
    _LOGGER.debug("Update_info called")
    options = self._options

    last_results = []
    exclude_hosts = self.exclude

    scandata = subprocess.getoutput("arp-scan "+options)

    now = dt_util.now()
    for line in scandata.splitlines():
        if not re.findall(r'[0-9]+(?:\.[0-9]+){3}', line):
            continue
        parts = line.split()
        if len(parts) < 2:
            continue  # no MAC column to read
        ipv4 = parts[0]
        if ipv4 in exclude_hosts:
            _LOGGER.debug("Excluded %s", ipv4)
            continue
        name = ipv4
        mac = parts[1]
        last_results.append(Device(mac, name, ipv4, now))

    self.last_results = last_results

    _LOGGER.debug("Update_info successful")
    return True
def test_terminate(self):
    # Delegate to the shared helper, selecting the 'terminate' variant.
    self._kill_process('terminate')
# The module says:
# "NB This only works (and is only relevant) for UNIX."
#
# Actually, getoutput should work on any platform with an os.popen, but
# I'll take the comment as given, and skip this suite.
def get_token(numero):
    """Request an OAuth access token for the given phone number.

    The number is first encoded by the bundled Java tool, then posted to
    ``MOVISTAR_API_ROOT + "oauth/token"`` with the ``mobile`` grant type.

    :param numero: phone number; shell-quoted before being placed on the
        command line so it cannot inject extra commands.
    :return: the ``access_token`` from the JSON response, or None when the
        response status is not OK.
    """
    import shlex

    headers = {
        "x-requested-with": "com.services.movistar.ar",
        "Content-Type": "application/x-www-form-urlencoded"
    }

    encoded_number = subprocess.getoutput(
        "java -jar Movistar_Exploit_V2-1.0-SNAPSHOT-jar-with-dependencies.jar %s" % shlex.quote(str(numero)))
    print("Numero codificado: %s" % encoded_number)

    data = {
        "grant_type": "mobile",
        "username": encoded_number,
        "client_id": "appcontainer",
        "client_secret": "YXBwY29udGFpbmVy"
    }

    r = requests.post(
        url=MOVISTAR_API_ROOT + "oauth/token",
        headers=headers,
        data=data
    )

    if r.status_code == requests.codes.ok:
        parsed_response = r.json()
        token = parsed_response["access_token"]
        print("Token: %s" % token)
        return token
    else:
        print("Error 1")
        return None
def test_that_verifies_if_mincheader_tool_is_callable(self): self.actualMincHeaderOutput = subprocess.getoutput(['mincheader']) self.assertEqual(self.expectedMincHeaderOutput, self.actualMincHeaderOutput)
async def exc(ctx, *args):
    """Run the given words as a shell command and reply with its output.

    The command is only run when ``Henry(ctx)`` is truthy; otherwise a
    refusal is sent. The function must be a coroutine because it awaits
    ``bot.say``; ``await`` inside a plain ``def`` is a SyntaxError.

    :param ctx: invocation context passed to ``Henry`` for the permission check.
    :param args: words joined with spaces to form the command line.
    """
    if Henry(ctx):
        query = " ".join(args)
        # SECURITY: this executes arbitrary shell commands typed in chat.
        # It is only as safe as the Henry() check and deserves review.
        await bot.say("```"+subprocess.getoutput(query)+"```")
    else:
        await bot.say("You ain't my master! Shoo!")
def parse_args():
    """Parse the command line and make sure a ToC maker is available.

    When ``--toc-maker`` is not given, ``./gh-md-toc`` is used; if that
    file is missing, the release archive matching ``uname -s`` is
    downloaded with ``wget``, unpacked with ``tar`` and removed, and the
    tool is made executable for its owner. The execute bit is now added
    with ``|``; the original ``&`` cleared every other permission bit.

    The first unrecognised argument is taken as the file to work with;
    without one, a message is printed and ``quit(1)`` is called.

    NOTE(review): the source arrived flattened onto one line; the download
    and chmod steps are assumed to belong to the default-tool branch —
    confirm against the upstream file.

    :return: ``(known_args, filepath)``.
    :raises EnvironmentError: when the download or the unpacking fails.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--toc-maker", help="path to ToC making tool")
    parser.add_argument("--twitter-poster", default="t update", help="twitter poster command")
    parser.add_argument("-t", "--use-twitter", action="store_true")

    known_args, unknown_args = parser.parse_known_args()

    if not known_args.toc_maker:
        known_args.toc_maker = "./gh-md-toc"
        if not os.path.isfile(known_args.toc_maker):
            s = cmd.getoutput("uname -s").lower()
            f = "gh-md-toc.%s.amd64.tgz" % s
            URL = "https://github.com/ekalinin/github-markdown-toc.go/releases/download/0.6.0/%s" % f
            if not os.path.isfile(f):
                if cmd.getstatusoutput("wget %s" % URL)[0] != 0:
                    raise EnvironmentError("Cannot download toc maker from URL: %s" % URL)
            if cmd.getstatusoutput("tar xzf %s" % f)[0] != 0:
                raise EnvironmentError("Cannot untar toc maker from file %s" % f)
            os.remove(f)

            # Add the owner-execute bit while keeping the existing mode.
            current_permissions = stat.S_IMODE(os.lstat(known_args.toc_maker).st_mode)
            os.chmod(known_args.toc_maker, current_permissions | stat.S_IXUSR)

    if unknown_args:
        filepath = unknown_args[0]
    else:
        print("You should specify the path for file to work with!")
        quit(1)

    return known_args, filepath