我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用os.popen()。
def popen(cmd, mode="r", buffering=-1): if not isinstance(cmd, str): raise TypeError("invalid cmd type (%s, expected string)" % type(cmd)) if mode not in ("r", "w"): raise ValueError("invalid mode %r" % mode) if buffering == 0 or buffering is None: raise ValueError("popen() does not support unbuffered streams") import subprocess, io if mode == "r": proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, bufsize=buffering) return _wrap_close(io.TextIOWrapper(proc.stdout), proc) else: proc = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, bufsize=buffering) return _wrap_close(io.TextIOWrapper(proc.stdin), proc) # Helper for popen() -- a proxy for a file whose close waits for the process
def ssh_setup_agent(config, envkeys=None): """ Starts the ssh-agent """ envkeys = envkeys or ['SSH_PRIVATE_KEY'] output = os.popen('ssh-agent -s').readlines() for line in output: matches = re.search(r"(\S+)\=(\S+)\;", line) if matches: config.environ[matches.group(1)] = matches.group(2) for envkey in envkeys: key = os.environ.get(envkey) if key: ssh_add_key(config.environ, key) else: logging.warning('%s is missing', envkey)
def status(args): with os.popen("docker inspect -f '{{json .State}}' " + args.container + " 2>&1") as pipe: status = pipe.read().strip() if "No such image or container" in status: print "0" else: statusjs = json.loads(status) if statusjs["Running"]: print "1" elif statusjs["ExitCode"] == 0: print "2" else: print "3" # get the uptime in seconds, if the container is running
def multi_stat_update(args, container_dir, filename): dict = {} try: pipe = os.popen("docker exec " + args.container + " cat " + container_dir + "/" + filename + " 2>&1") for line in pipe: m = _STAT_RE.match(line) if m: dict[m.group(1)] = m.group(2) pipe.close() f = open(args.container + "/" + filename,"w") for key in dict.keys(): f.write(key + " " + dict[key] + "\n") f.close() except Exception, e: debug(args.container + ": could not update " + filename) debug(str(sys.exc_info())) return dict
def get_numa_spec(): maxnode=0 maxcpu=1 numasupport=False try: lines = [line.rstrip() for line in os.popen('numactl --show')] if len(lines) > 0 : numasupport=True for l in lines: if l.startswith('nodebind'): maxnode=int(l.split()[-1]) if l.startswith('physcpubind'): maxcpu=int(l.split()[-1]) if "NO NUMA" in l.upper(): numasupport=False except: numasupport=False return numasupport,maxnode,maxcpu
def open(self, url, new=0, autoraise=True): if self._name == 'default': script = 'open location "%s"' % url.replace('"', '%22') # opens in default browser else: script = ''' tell application "%s" activate open location "%s" end '''%(self._name, url.replace('"', '%22')) osapipe = os.popen("osascript", "w") if osapipe is None: return False osapipe.write(script) rc = osapipe.close() return not rc # Don't clear _tryorder or _browsers since OS X can use above Unix support # (but we prefer using the OS X specific stuff)
def _find_mac(command, args, hw_identifiers, get_index): import os for dir in ['', '/sbin/', '/usr/sbin']: executable = os.path.join(dir, command) if not os.path.exists(executable): continue try: # LC_ALL to get English output, 2>/dev/null to # prevent output on stderr cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args) with os.popen(cmd) as pipe: for line in pipe: words = line.lower().split() for i in range(len(words)): if words[i] in hw_identifiers: return int( words[get_index(i)].replace(':', ''), 16) except IOError: continue return None
def _findLib_gcc(name): expr = r'[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name) fdout, ccout = tempfile.mkstemp() os.close(fdout) cmd = 'if type gcc >/dev/null 2>&1; then CC=gcc; elif type cc >/dev/null 2>&1; then CC=cc;else exit 10; fi;' \ '$CC -Wl,-t -o ' + ccout + ' 2>&1 -l' + name try: f = os.popen(cmd) try: trace = f.read() finally: rv = f.close() finally: try: os.unlink(ccout) except OSError, e: if e.errno != errno.ENOENT: raise if rv == 10: raise OSError, 'gcc or cc command not found' res = re.search(expr, trace) if not res: return None return res.group(0)
def _get_soname(f): # assuming GNU binutils / ELF if not f: return None cmd = 'if ! type objdump >/dev/null 2>&1; then exit 10; fi;' \ "objdump -p -j .dynamic 2>/dev/null " + f f = os.popen(cmd) dump = f.read() rv = f.close() if rv == 10: raise OSError, 'objdump command not found' f = os.popen(cmd) try: data = f.read() finally: f.close() res = re.search(r'\sSONAME\s+([^\s]+)', data) if not res: return None return res.group(1)
def terminal_size(): """ Return the number of terminal columns and rows, defaulting to 80x24 if the standard output is not a TTY. NOTE THAT THIS ONLY WORKS ON UNIX. """ if sys.stdout.isatty(): # TODO: This only works on Unix. rows, columns = [int(x) for x in os.popen('stty size', 'r').read().split()] else: rows = 24 columns = 80 return rows, columns
def do_osx_install(srcdir, targetdir): if os.path.exists(targetdir): print 'Target dir %s already exists! Removing...' shutil.rmtree(targetdir) install_script = os.popen('find '+ srcdir +' -iname install.sh').read().strip() print 'DBG install_script:', install_script os.popen('chmod +x "%s"' % install_script) cmd_install = '%s %s %s' % (pipes.quote(install_script), srcdir, targetdir) print 'DBG cmd: "%s"' % cmd_install cmd_chmod_chromium = 'find %s -name Chromium -exec chmod +x {} \;' % (targetdir) cmd_chmod_chromium_helper = 'find %s -name Chromium\ Helper -exec chmod +x {} \;' % (targetdir) for cmd in [cmd_install, cmd_chmod_chromium, cmd_chmod_chromium_helper]: proc = subprocess.Popen(cmd, shell=True) proc.wait() if proc.returncode: print "returncode " + str(proc.returncode)
def about(self, ctx): '''About me''' from clients import application_info changes = os.popen(r'git show -s HEAD~3..HEAD --format="[`%h`](https://github.com/Harmon758/Harmonbot/commit/%H) %s (%cr)"').read().strip() embed = discord.Embed(title = "About Me", color = clients.bot_color) embed.description = "[Changelog (Harmonbot Server)]({})\n[Invite Link]({})".format(clients.changelog, discord.utils.oauth_url(application_info.id)) # avatar = ctx.message.author.avatar_url or ctx.message.author.default_avatar_url # embed.set_author(name = ctx.message.author.display_name, icon_url = avatar) avatar = self.bot.user.avatar_url or self.bot.user.default_avatar_url # embed.set_thumbnail(url = avatar) embed.set_author(name = "Harmonbot (Discord ID: {})".format(self.bot.user.id), icon_url = avatar) if changes: embed.add_field(name = "Latest Changes:", value = changes, inline = False) embed.add_field(name = "Created on:", value = "February 10th, 2016") embed.add_field(name = "Version", value = clients.version) embed.add_field(name = "Library", value = "[discord.py](https://github.com/Rapptz/discord.py) v{0}\n([Python](https://www.python.org/) v{1.major}.{1.minor}.{1.micro})".format(discord.__version__, sys.version_info)) me = discord.utils.get(self.bot.get_all_members(), id = clients.owner_id) avatar = me.default_avatar_url if not me.avatar else me.avatar_url embed.set_footer(text = "Developer/Owner: {0} (Discord ID: {0.id})".format(me), icon_url = avatar) await self.bot.reply("", embed = embed) await self.bot.say("Changelog (Harmonbot Server): {}".format(clients.changelog))
def __init__(self, prog, **kw): try: self.prog = prog self.kw = kw self.popen = None if Popen.verbose: sys.stdout.write("Popen created: %r, kw=%r..." % (prog, kw)) do_delegate = kw.get('stdout', None) == -1 and kw.get('stderr', None) == -1 if do_delegate: if Popen.verbose: print("Delegating to real Popen") self.popen = self.real_Popen(prog, **kw) else: if Popen.verbose: print("Emulating") except Exception as e: if Popen.verbose: print("Exception: %s" % e) raise
def __getattr__(self, name): if Popen.verbose: sys.stdout.write("Getattr: %s..." % name) if name in Popen.__slots__: if Popen.verbose: print("In slots!") return object.__getattr__(self, name) else: if self.popen is not None: if Popen.verbose: print("from Popen") return getattr(self.popen, name) else: if name == "wait": return self.emu_wait else: raise Exception("subprocess emulation: not implemented: %s" % name)
def post(self, *args, **kwargs): try: servidor = self.get_argument('grupo') except: servidor ="all" item="" corpo="{\"hosts\":[" import os hosts=os.popen("ansible "+servidor+" --list-hosts").read() item=hosts.replace("\n","\",") item=item.replace(" "," \"") #for options in hosts: # item=item+" \""+options+"\"," item=item[1:-1] corpo=corpo+item+"]}" self.set_header("Content-Type", "application/json") self.write(corpo);
def step3(): key_vec = {} maxx = 12505807 size = 10000 for i in range(size, maxx, size): print(i, maxx) res = os.popen("head -n {i} ./dataset/bind.txt | tail -n {size} | ./fasttext print-sentence-vectors ./models/model.bin".format(i=i, size=size)).read() for line in res.split("\n"): if line == "": continue vec = list(map(float, line.split()[-100:])) txt = line.split()[:-100] key = " ".join(txt) if key_vec.get(key) is None: key_vec[key] = vec open("key_vec.pkl", "wb").write(pickle.dumps(key_vec))
def _step5(arr): kmeans = pickle.loads(open("kmeans.model", "rb").read()) key, lines, tipe = arr print(key) open("./tmp/tmp.{tipe}.{key}.txt".format(tipe=tipe,key=key), "w").write("\n".join(lines)) res = os.popen("./fasttext print-sentence-vectors ./models/model.bin < tmp/tmp.{tipe}.{key}.txt".format(tipe=tipe, key=key)).read() w = open("tmp/tmp.{tipe}.{key}.json".format(tipe=tipe,key=key), "w") for line in res.split("\n"): try: vec = list(map(float, line.split()[-100:])) except: print(line) print(res) continue x = np.array(vec) if np.isnan(x).any(): continue cluster = kmeans.predict([vec]) txt = line.split()[:-100] obj = {"txt": txt, "cluster": cluster.tolist()} data = json.dumps(obj, ensure_ascii=False) w.write( data + "\n" )
def main(): file_name = "dnsbee.txt" dot = "." if (os.path.exists(file_name)): file = open(file_name,"a") else: file = open(file_name,"w") host = sys.argv[1] nameserver = sys.argv[2] print "Trying to query", for i in range(0,len(records)): string = "dig " + records[i] + " " + host + " @" + nameserver command = os.popen(string,"r") while(1): line = command.readline() #line = line.strip() if line: print ".", file.write("Query> " + string + ":\n") file.write(line) else: break
def find_jmpreg(self,dir="c:\windows\system32"): k = os.listdir("c:\windows\system32") for z in k: if z.endswith(".dll"): data = [] cmd = "%s -D C:\windows\system32\%s" % (self.objdump,z) fh = os.popen(cmd,"r") data = fh.read(100000).split("\n") fh.close() for r in data: y = "eb\x20%s" % self.reg if r.find("jmp") != -1 and r.find(y) != -1: (addy,null) = r.split(":") temp = "0x%s" % addy print temp self.addrs[z] = int(temp,0) self.found = 1 if self.found != 1: print "found no jmp [reg]!" return 0 return 1 # pre defined database of jmp's
def gpu_status(self,av_type_list): for t in av_type_list: cmd='nvidia-smi -q --display='+t #print('\nCMD:',cmd,'\n') r=os.popen(cmd) info=r.readlines() r.close() content = " ".join(info) #print('\ncontent:',content,'\n') index=content.find('Attached GPUs') s=content[index:].replace(' ','').rstrip('\n') self.t_send(s) time.sleep(.5) #th.exit() #============================================================================== # #==============================================================================
def WriteResult(s_file, s_string): LOCK.acquire() try: found = False datafile = file(s_file) for dataline in datafile: if s_string in dataline: found = True break if found == False: f = open(s_file, 'a') f.write(str(s_string) + "\n") f.close() s = str("pkill -SIGHUP hostapd") p = os.popen(s, "r") while 1: line = p.readline() if not line: break except: PrintResult(1, "Error writing cracked user credentials to EAP users file. :( Sorz") LOCK.release() # This is the worker thread section.
def _syscmd_uname(option,default=''): """ Interface to the system's uname command. """ if sys.platform in ('dos','win32','win16','os2'): # XXX Others too ? return default try: f = os.popen('uname %s 2> %s' % (option, DEV_NULL)) except (AttributeError,os.error): return default output = string.strip(f.read()) rc = f.close() if not output or rc: return default else: return output
def _popen(command, args): import os path = os.environ.get("PATH", os.defpath).split(os.pathsep) path.extend(('/sbin', '/usr/sbin')) for dir in path: executable = os.path.join(dir, command) if (os.path.exists(executable) and os.access(executable, os.F_OK | os.X_OK) and not os.path.isdir(executable)): break else: return None # LC_ALL to ensure English output, 2>/dev/null to prevent output on # stderr (Note: we don't have an example where the words we search for # are actually localized, but in theory some system could do so.) cmd = 'LC_ALL=C %s %s 2>/dev/null' % (executable, args) return os.popen(cmd)
def _ipconfig_getnode(): """Get the hardware address on Windows by running ipconfig.exe.""" import os, re dirs = ['', r'c:\windows\system32', r'c:\winnt\system32'] try: import ctypes buffer = ctypes.create_string_buffer(300) ctypes.windll.kernel32.GetSystemDirectoryA(buffer, 300) dirs.insert(0, buffer.value.decode('mbcs')) except: pass for dir in dirs: try: pipe = os.popen(os.path.join(dir, 'ipconfig') + ' /all') except IOError: continue with pipe: for line in pipe: value = line.split(':')[-1].strip().lower() if re.match('([0-9a-f][0-9a-f]-){5}[0-9a-f][0-9a-f]', value): return int(value.replace('-', ''), 16)
def _read_output(commandstring): """Output from successful command execution or None""" # Similar to os.popen(commandstring, "r").read(), # but without actually using os.popen because that # function is not usable during python bootstrap. # tempfile is also not available then. import contextlib try: import tempfile fp = tempfile.NamedTemporaryFile() except ImportError: fp = open("/tmp/_osx_support.%s"%( os.getpid(),), "w+b") with contextlib.closing(fp) as fp: cmd = "%s 2>/dev/null >'%s'" % (commandstring, fp.name) return fp.read().strip() if not os.system(cmd) else None
def _findLib_gcc(name): expr = r'[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name) fdout, ccout = tempfile.mkstemp() os.close(fdout) cmd = 'if type gcc >/dev/null 2>&1; then CC=gcc; elif type cc >/dev/null 2>&1; then CC=cc;else exit 10; fi;' \ 'LANG=C LC_ALL=C $CC -Wl,-t -o ' + ccout + ' 2>&1 -l' + name try: f = os.popen(cmd) try: trace = f.read() finally: rv = f.close() finally: try: os.unlink(ccout) except OSError, e: if e.errno != errno.ENOENT: raise if rv == 10: raise OSError, 'gcc or cc command not found' res = re.search(expr, trace) if not res: return None return res.group(0)
def _findLib_crle(name, is64): if not os.path.exists('/usr/bin/crle'): return None if is64: cmd = 'env LC_ALL=C /usr/bin/crle -64 2>/dev/null' else: cmd = 'env LC_ALL=C /usr/bin/crle 2>/dev/null' for line in os.popen(cmd).readlines(): line = line.strip() if line.startswith('Default Library Path (ELF):'): paths = line.split()[4] if not paths: return None for dir in paths.split(":"): libfile = os.path.join(dir, "lib%s.so" % name) if os.path.exists(libfile): return libfile return None
def iTunes_backup_looker(): backupPath = globber("/Users/*/Library/Application Support/MobileSync/Backup/*/Info.plist") if len(backupPath) > 0: backups = True returnable = "%sLooking for backups! %s\n" % (blue_star, blue_star) for z, y in enumerate(backupPath): returnable += "\n----- Device " + str(z + 1) + " -----\n\n" returnable += "Product Name: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Product Name\"' '%s'" % y).read().replace("\n", "") returnable += "Product Version: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Product Version\"' '%s'" % y).read().replace("\n", "") returnable += "Last Backup Date: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Last Backup Date\"' '%s'" % y).read().replace("\n", "") returnable += "Device Name: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Device Name\"' '%s'" % y).read().replace("\n", "") returnable += "Phone Number: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Phone Number\"' '%s'" % y).read().replace("\n", "") returnable += "Serial Number: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Serial Number\"' '%s'" % y).read().replace("\n", "") returnable += "IMEI/MEID: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"IMEI\"' '%s'" % y).read().replace("\n", "") returnable += "UDID: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Target Identifier\"' '%s'" % y).read().replace("\n", "") returnable += "iTunes Version: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"iTunes Version\"' '%s'" % y).read().replace("\n", "") #iTunesBackupString += "Installed Apps: %s\n" % os.popen("/usr/libexec/PlistBuddy -c 'Print :\"Installed Applications\"' '%s'" % y).read().replace("\n", "") else: backups = False returnable = "%sNo local backups found %s\n" % (blue_star, blue_star) return (returnable, backups, len(backupPath))
def __print_log_event(self, event): time_string = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(event['timestamp']/1000)) rows, columns = os.popen('stty size', 'r').read().split() line_wrap = int(columns) - 22 message_lines = str(event['message']).splitlines() formatted_lines = [] for line in message_lines: line = line.replace('\t',' ') formatted_lines.append('\n'.join(line[i:i+line_wrap] for i in range(0, len(line), line_wrap))) message_string = '\n'.join(formatted_lines) message_string = message_string.replace('\n','\n ') print(time_string + " - " + message_string)
def __init__(self): self.client = socket(AF_INET, SOCK_STREAM) self.client.connect(self.ADDR) while True: data = self.client.recv(BUFSIZ) if data: try: if data == "exit": self.client.close() break ME = os.popen(data.decode('utf8')) os_result = ME.read() #print(os_result) self.client.sendall(os_result) except: self.client.close()
def test_no_memory_leak(): import gc import os def rss(): gc.collect() out = os.popen("ps -o rss= -p %d" % os.getpid()).read() return int(out.strip()) before = rss() for _ in range(100000): n = Name.parse("Reallyverylongfirstname Reallyverylonglastname") n.given_name n.surname after = rss() assert after < 1.25 * before
def Update(): # Check for ROOT # If "uls --update" is not run as ROOT, notify user & exit. print('Checking for ROOT...') if os.geteuid() != 0: print("ERR_1005: 'uls --update' must be run as ROOT. Use 'sudo uls --update' instead.") exit(1005) # Check for Internet connection before update print('Checking for Internet connection...') rPing = os.popen("ping -c 3 raw.githubusercontent.com | grep '0 received' | wc -l") strPing = rPing.read().strip('\n') rPing.close() # If Internet is unavailable, exit. if strPing == '1': print('ERR_1003: Internet is unavailable. Check your connection before running update.') exit(1003) # Now, do the update os.system("wget --no-check-certificate -O /usr/share/uls/uls_update.sh https://raw.githubusercontent.com/CYRO4S/Universal-Linux-Script/master/uls_update.sh && bash /usr/share/uls/uls_update.sh") exit(0) # Echo
def test_no_unknown_exported_symbols(): if not hasattr(_cffi_backend, '__file__'): py.test.skip("_cffi_backend module is built-in") if not sys.platform.startswith('linux'): py.test.skip("linux-only") g = os.popen("objdump -T '%s'" % _cffi_backend.__file__, 'r') for line in g: if not line.startswith('0'): continue if '*UND*' in line: continue name = line.split()[-1] if name.startswith('_') or name.startswith('.'): continue if name not in ('init_cffi_backend', 'PyInit__cffi_backend'): raise Exception("Unexpected exported name %r" % (name,)) g.close()
def parse_batch(self, sentenceDumpedFileName, parsingDumpedFileName): if os.path.exists('../stanford-parser-2012-03-09') == False: print >> sys.stderr, 'can not find Stanford parser directory' sys.exit(1) # tokenized cmd = r'java -server -mx4096m -cp "../stanford-parser-2012-03-09/*:" edu.stanford.nlp.parser.lexparser.LexicalizedParser -retainTMPSubcategories -sentences newline -tokenized -escaper edu.stanford.nlp.process.PTBEscapingProcessor -outputFormat "wordsAndTags, penn, typedDependencies" -outputFormatOptions "basicDependencies" edu/stanford/nlp/models/lexparser/englishPCFG.ser.gz ' + sentenceDumpedFileName r = os.popen(cmd).read().strip().decode('utf-8') f = open(parsingDumpedFileName, 'w') f.write(r.encode('utf-8')) f.close() rlist = r.replace('\n\n\n', '\n\n\n\n').split('\n\n') return rlist
def monkey_func(self, url, new=0, autoraise=True): if self._name == 'default': script = 'do shell script "open %s"' % url.replace('"', '%22') # opens in default browser else: script = ''' tell application "%s" activate open location "%s" end ''' % (self._name, url.replace('"', '%22')) osapipe = os.popen("osascript", "w") if osapipe is None: return False osapipe.write(script) rc = osapipe.close() return not rc
def console_setup(): global COLOR_WINDOW_TX global COLOR_WINDOW_BG global WINDOW_ROWS global WINDOW_COLS global WINDOW_HANDLE OUTPUT_ROW_BUFFER=WINDOW_ROWS os.system("@title FileBot v"+__version__) os.system("color "+str(hex(COLOR_WINDOW_TX+COLOR_WINDOW_BG*16))[2:]) os.popen("mode con: lines="+str(WINDOW_ROWS)+" cols="+str(WINDOW_COLS)) WINDOW_HANDLE=ctypes.windll.kernel32.GetStdHandle(-12) ctypes.windll.kernel32.SetConsoleScreenBufferSize(WINDOW_HANDLE,OUTPUT_ROW_BUFFER*(2**16)+WINDOW_COLS) cursor_visibility(False) return
def translate(word): if not word: return if re.match(u'.*[\u4e00-\u9fa5].*', word) or ' ' in word: p = {'wd': word} return "http://dict.baidu.com/s?" + urllib.parse.urlencode(p) reses = os.popen('sdcv -n ' + word).readlines() if not re.match(u'^Found 1 items.*', reses[0]): p = {'wd': word} return "http://dict.baidu.com/s?" + urllib.parse.urlencode(p) res = '' for i in range(4, len(reses)): res += reses[i] res = re.sub(u'\[.+\]', '', res) res = res.replace('\n', '') res = res.replace('//', '\r') return res
def getXmlInfo(self): xml_cmd = '' self.lastError = '' if 'Windows' in platform.system(): xml_cmd = "%s d xmltree \"%s\" AndroidManifest.xml " % (self.aapt_path, self.apk_path) # xml_cmd = "%s%saapt.exe d xmltree \"%s\" AndroidManifest.xml "%(self.aapt_path,os.sep, self.apk_path) if 'Linux' in platform.system(): xml_cmd = "%s%saapt d xmltree %s AndroidManifest.xml " % (self.aapt_path, os.sep, self.apk_path) try: strxml = os.popen(xml_cmd) self.xmltree = strxml.read() except Exception as e: # print "aapt Mainfestxml error" self.lastError = 'aapt get AndroidManifest.xml error' return False return True # ?xml???????
def parse_icon(self, icon_path=None): """ parse icon. :param icon_path: icon storage path """ if not icon_path: icon_path = os.path.dirname(os.path.abspath(__file__)) pkg_name_path = os.path.join(icon_path, self.package) if not os.path.exists(pkg_name_path): os.mkdir(pkg_name_path) aapt_line = "aapt dump badging %s | grep 'application-icon' | awk -F ':' '{print $2}'" % self.get_filename() parse_icon_rt = os.popen(aapt_line).read() icon_paths = [icon.replace("'", '') for icon in parse_icon_rt.split('\n') if icon] zfile = zipfile.ZipFile(StringIO.StringIO(self.__raw), mode='r') for icon in icon_paths: icon_name = icon.replace('/', '_') data = zfile.read(icon) with open(os.path.join(pkg_name_path, icon_name), 'w+b') as icon_file: icon_file.write(data) print "APK ICON in: %s" % pkg_name_path