The following 50 code examples, extracted from open-source Python projects, illustrate how to use magic.open().
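Before the examples, one point of orientation: two incompatible Python libraries are distributed under the name "magic". The libmagic bindings that ship with the file(1) sources expose magic.open(flags), which returns a cookie object with .load(), .file(path), .buffer(data) and .close() methods, while the python-magic package on PyPI exposes module-level magic.from_file() and magic.from_buffer() instead. Most examples below are variations on the same compatibility dance. A minimal sketch of that pattern (the identify() helper and the sample path are illustrative, not taken from any of the projects below):

import magic

def identify(path):
    if hasattr(magic, 'open'):
        # libmagic bindings from the file(1) sources
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()  # load the default magic database
        try:
            return ms.file(path)
        finally:
            ms.close()
    # python-magic from PyPI has no magic.open()
    return magic.from_file(path)

print(identify('/bin/ls'))  # e.g. "ELF 64-bit LSB executable, ..."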
def open(self):
    self.__inPos = 0
    self.__inPosOld = 0
    self.__outFile = None
    self.__current = DirHasher.FileIndex.Stat()
    try:
        if os.path.exists(self.__cachePath):
            self.__inFile = open(self.__cachePath, "rb")
            sig = self.__inFile.read(4)
            if sig == DirHasher.FileIndex.SIGNATURE:
                self.__mismatch = False
                self.__inPos = self.__inPosOld = 4
                self.__readEntry() # prefetch first entry
            else:
                logging.getLogger(__name__).info(
                    "Wrong signature at '%s': %s", self.__cachePath, sig)
                self.__inFile.close()
                self.__inFile = None
                self.__mismatch = True
        else:
            self.__inFile = None
            self.__mismatch = True
    except OSError as e:
        raise BuildError("Error opening hash cache: " + str(e))
def parse_icon(self, icon_path=None):
    """
    parse icon.
    :param icon_path: icon storage path
    """
    if not icon_path:
        icon_path = os.path.dirname(os.path.abspath(__file__))
    pkg_name_path = os.path.join(icon_path, self.package)
    if not os.path.exists(pkg_name_path):
        os.mkdir(pkg_name_path)

    aapt_line = "aapt dump badging %s | grep 'application-icon' | awk -F ':' '{print $2}'" % self.get_filename()
    parse_icon_rt = os.popen(aapt_line).read()
    icon_paths = [icon.replace("'", '') for icon in parse_icon_rt.split('\n') if icon]

    zfile = zipfile.ZipFile(StringIO.StringIO(self.__raw), mode='r')
    for icon in icon_paths:
        icon_name = icon.replace('/', '_')
        data = zfile.read(icon)
        with open(os.path.join(pkg_name_path, icon_name), 'w+b') as icon_file:
            icon_file.write(data)
    print "APK ICON in: %s" % pkg_name_path
def readrewritelist(rewritelist):
    ## rewrite is a hash. Key is sha256 of the file.
    rewrite = {}
    try:
        rewritefile = open(rewritelist, 'r')
        rewritelines = rewritefile.readlines()
        rewritefile.close()
        for r in rewritelines:
            rs = r.strip().split()
            ## format error, bail out
            if len(rs) != 7:
                return {}
            (package, version, filename, origin, sha256, newp, newv) = rs
            ## dupe, skip
            if sha256 in rewrite:
                continue
            rewrite[sha256] = {'package': package, 'version': version,
                               'filename': filename, 'origin': origin,
                               'newpackage': newp, 'newversion': newv}
    except:
        return {}
    return rewrite

## split on the special characters, plus remove special control characters that are
## at the beginning and end of the string in escaped form.
## Return a list of strings.
def get_filetype(data): """There are two versions of python-magic floating around, and annoyingly, the interface changed between versions, so we try one method and if it fails, then we try the other. NOTE: you may need to alter the magic_file for your system to point to the magic file.""" if sys.modules.has_key('magic'): try: ms = magic.open(magic.MAGIC_NONE) ms.load() return ms.buffer(data) except: try: return magic.from_buffer(data) except magic.MagicException: magic_custom = magic.Magic(magic_file='C:\windows\system32\magic') return magic_custom.from_buffer(data) return ''
def _get_filetype(self, data):
    """Gets filetype, uses libmagic if available.
    @param data: data to be analyzed.
    @return: file type or None.
    """
    if not HAVE_MAGIC:
        return None

    try:
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        file_type = ms.buffer(data)
    except:
        try:
            file_type = magic.from_buffer(data)
        except Exception:
            return None
    finally:
        try:
            ms.close()
        except:
            pass

    return file_type
def magic(indata, mime=False):
    """
    Performs file magic while maintaining compatibility with different
    libraries.
    """
    try:
        if mime:
            mymagic = magic.open(magic.MAGIC_MIME_TYPE)
        else:
            mymagic = magic.open(magic.MAGIC_NONE)
        mymagic.load()
    except AttributeError:
        mymagic = magic.Magic(mime)
        mymagic.file = mymagic.from_file
    return mymagic.file(indata)
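A hypothetical invocation of the wrapper above (the path is illustrative, and this assumes the wrapper is bound under a name that does not shadow the imported magic module):

print(magic('/etc/hosts'))             # e.g. "ASCII text"
print(magic('/etc/hosts', mime=True))  # e.g. "text/plain"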
def smart_open(filename):
    '''
    Returns an open file object if `filename` is plain text, else assumes
    it is a bzip2 compressed file and returns a file-like object to
    handle it.
    '''
    if isplaintext(filename):
        f = open(filename, 'rt')
    else:
        file_type = mimetype(filename)
        if file_type.find('gzip') > -1:
            f = gzip.GzipFile(filename, 'rt')
        elif file_type.find('bzip2') > -1:
            f = bz2file.open(filename, 'rt')
        else:
            f = None  # Not a supported format; avoid returning an unbound name
    return f
def srm_download_to_file(url, file_):
    '''
    Download the file in `url` storing it in the `file_` file-like object.
    '''
    logger = logging.getLogger('dumper.__init__')
    ctx = gfal2.creat_context()  # pylint: disable=no-member
    infile = ctx.open(url, 'r')

    try:
        chunk = infile.read(CHUNK_SIZE)
    except GError as e:
        if e[1] == 70:
            logger.debug('GError(70) raised, using GRIDFTP PLUGIN:STAT_ON_OPEN=False workaround to download %s', url)
            ctx.set_opt_boolean('GRIDFTP PLUGIN', 'STAT_ON_OPEN', False)
            infile = ctx.open(url, 'r')
            chunk = infile.read(CHUNK_SIZE)
        else:
            raise

    while chunk:
        file_.write(chunk)
        chunk = infile.read(CHUNK_SIZE)
def _hardcode_setup():
    # A user may want to hardcode in and out paths for their analysis files.
    # If no config file exists, it will ask if you want to store your in/out
    # options in one and use them from then on out.
    sane = 0
    home_dir = str(os.path.expanduser('~'))
    input_conf = raw_input('-- : -- Please enter full path to input sample directory: ')
    output_conf = raw_input('-- : -- Please enter full path to sample/analysis output directory: ')
    if not os.path.exists(input_conf) or not os.path.exists(output_conf):
        sys.exit("-- : -- Invalid paths detected. Please check input. Run again to re-enter paths.")
    print '-- : -- Is this input directory correct "'+str(input_conf)+'"?'
    print '-- : -- Is this output directory correct "'+str(output_conf)+'"?'
    while sane == 0:
        dir_choice = raw_input('-- : -- Yes or No: ')
        if dir_choice == 'Yes':
            conf_file = open(home_dir+'/automal_conf.conf', 'w')
            conf_file.write('ipath='+str(input_conf)+'\n')
            conf_file.write('opath='+str(output_conf)+'\n')
            conf_file.close()
            sane = 1
        if dir_choice == 'No':
            sys.exit('-- : -- Exited on incorrect paths. Run again to re-enter.')
        if dir_choice != 'No' and dir_choice != 'Yes':
            print '-- : -- Invalid choice, try again. Yes or No.'
def _get_guest_digital_signers(self):
    retdata = dict()
    cert_data = dict()
    cert_info = os.path.join(CUCKOO_ROOT, "storage", "analyses",
                             str(self.results["info"]["id"]), "aux", "DigiSig.json")

    if os.path.exists(cert_info):
        with open(cert_info, "r") as cert_file:
            buf = cert_file.read()
        if buf:
            cert_data = json.loads(buf)

    if cert_data:
        retdata = {
            "aux_sha1": cert_data["sha1"],
            "aux_timestamp": cert_data["timestamp"],
            "aux_valid": cert_data["valid"],
            "aux_error": cert_data["error"],
            "aux_error_desc": cert_data["error_desc"],
            "aux_signers": cert_data["signers"]
        }

    return retdata
def fuzzy_hash(self):
    if not hasattr(self, '_fuzzy_hash'):
        # tlsh is not meaningful with files smaller than 512 bytes
        if os.stat(self.path).st_size >= 512:
            h = tlsh.Tlsh()
            with open(self.path, 'rb') as f:
                for buf in iter(lambda: f.read(32768), b''):
                    h.update(buf)
            h.final()
            try:
                self._fuzzy_hash = h.hexdigest()
            except ValueError:
                # File must contain a certain amount of randomness.
                self._fuzzy_hash = None
        else:
            self._fuzzy_hash = None
    return self._fuzzy_hash
def has_same_content_as(self, other):
    logger.debug('File.has_same_content: %s %s', self, other)
    if os.path.isdir(self.path) or os.path.isdir(other.path):
        return False
    # try comparing small files directly first
    try:
        my_size = os.path.getsize(self.path)
        other_size = os.path.getsize(other.path)
    except OSError:
        # files not readable (e.g. broken symlinks) or something else,
        # just assume they are different
        return False
    if my_size == other_size and my_size <= SMALL_FILE_THRESHOLD:
        try:
            with profile('command', 'cmp (internal)'):
                with open(self.path, 'rb') as file1, open(other.path, 'rb') as file2:
                    return file1.read() == file2.read()
        except OSError:
            # one or both files could not be opened for some reason,
            # assume they are different
            return False
    return self.cmp_external(other)
def get_type(self):
    try:
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        file_type = ms.file(self.path)
    except:
        try:
            file_type = magic.from_file(self.path)
        except:
            try:
                import subprocess
                file_process = subprocess.Popen(['file', '-b', self.path],
                                                stdout=subprocess.PIPE)
                file_type = file_process.stdout.read().strip()
            except:
                return ''
    finally:
        try:
            ms.close()
        except:
            pass

    return file_type
def get_type(data):
    try:
        ms = magic.open(magic.MAGIC_NONE)
        ms.load()
        file_type = ms.buffer(data)
    except:
        try:
            file_type = magic.from_buffer(data)
        except:
            return ''
    finally:
        try:
            ms.close()
        except:
            pass

    return file_type
def generate_html(p, templateFile, templateDict, outputFile, overwrite, logger):
    if not Util.check_file_exists(templateFile):
        logger.error("Template file does not exist: '%s'" %(templateFile))
        os._exit(-1)
    try:
        template = jinja2.Environment(
            loader=jinja2.FileSystemLoader(searchpath=os.path.split(templateFile)[0])
        ).get_template(os.path.split(templateFile)[1])
    except Exception, e:
        logger.error("Failed to load template file '%s'" %(templateFile), exc_info=True)
        os._exit(-1)
    html = bokeh.embed.file_html(models=p, resources=bokeh.resources.INLINE,
                                 title=templateDict["title"], template=template,
                                 template_variables=templateDict)
    if Util.check_file_exists(outputFile) and not overwrite:
        logger.error("Html file already exists: '%s'. Delete or use overwrite flag." %(outputFile))
        os._exit(-1)
    try:
        file = open(outputFile, "w")
        file.write(html)
        file.close()
    except Exception, e:
        logger.error("Error while writing html file '%s'" %(outputFile), exc_info=True)
        os._exit(-1)

# generates bokeh histogram_data
# gets data from every "LatencyList"
# data2 is just data/2.0
# commented out code is old and better to read but much slower due to "key not in" - if
def openCompressedFile(ycsbfile, dict, key, decompress, overwrite, logger):
    try:
        file = gzip.open(ycsbfile, "r")
        dict[key] = cPickle.load(file)
        file.close()
    except Exception, e:
        logger.error("Can't open '%s'. Is it really a compressed .ydc file?" %(ycsbfile), exc_info=True)
        os._exit(-1)
    # if you truly just want to decompress it, stop after saving plain ycsb file
    if decompress:
        try:
            newFileName = os.path.splitext(os.path.basename(ycsbfile))[0]+".log"
            if (not Util.check_file_exists(newFileName) or overwrite) and os.access(".", os.W_OK):
                if key in dict.keys() and dict[key] != None:
                    decompressFile(dict[key], newFileName, logger)
                else:
                    logger.error("Dictionary does not have filecontent or is null.", exc_info=True)
                    os._exit(-1)
            else:
                logger.error("Can't create '%s' to write. Does it already exist?" %(newFileName), exc_info=True)
                os._exit(-1)
        except Exception, e:
            logger.error("Can't open '%s'." %("%s.log.log" %(os.path.basename(ycsbfile))), exc_info=True)
            os._exit(-1)
def file_type(file_loc):
    try:
        import magic
    except:
        print('[W] ACBS cannot find libmagic bindings, will use bundled one instead.')
        import lib.magic as magic
    mco = magic.open(magic.MIME_TYPE | magic.MAGIC_SYMLINK)
    mco.load()
    try:
        tp = mco.file(file_loc)
        tp_list = tp.decode('utf-8').split('/')
    except:
        print('[W] Unable to determine the file type!')
        return ['unknown', 'unknown']
    return tp_list
def file_type_full(file_loc):
    try:
        import magic
    except:
        print('[W] ACBS cannot find libmagic bindings, will use bundled one instead.')
        import lib.magic as magic
    mco = magic.open(magic.NONE | magic.MAGIC_SYMLINK)
    mco.load()
    try:
        tp = mco.file(file_loc)
    except:
        print('[W] Unable to determine the file type!')
        return 'data'
    return tp.decode('utf-8')
def hashFile(path):
    m = hashlib.sha1()
    try:
        with open(path, 'rb', buffering=0) as f:
            buf = f.read(16384)
            while len(buf) > 0:
                m.update(buf)
                buf = f.read(16384)
    except OSError as e:
        logging.getLogger(__name__).warning("Cannot hash file: %s", str(e))
    return m.digest()
def open(self):
    pass
def hashDirectory(self, path):
    self.__index.open()
    try:
        return self.__hashDir(os.fsencode(path))
    finally:
        self.__index.close()
def hashPath(self, path):
    path = os.fsencode(path)
    try:
        s = os.lstat(path)
    except OSError as err:
        logging.getLogger(__name__).warning("Cannot stat '%s': %s", path, str(err))
        return b''
    self.__index.open()
    try:
        return self.__hashEntry(path, b'', s)
    finally:
        self.__index.close()
def _process_cache(self, split="\n", rstrip=True):
    try:
        ftype = magic.from_file(self.cache, mime=True)
    except AttributeError:
        try:
            mag = magic.open(magic.MAGIC_MIME)
            mag.load()
            ftype = mag.file(self.cache)
        except AttributeError as e:
            raise RuntimeError('unable to detect cached file type')

    if PYVERSION < 3:
        ftype = ftype.decode('utf-8')

    if ftype.startswith('application/x-gzip') or ftype.startswith('application/gzip'):
        from csirtg_smrt.decoders.zgzip import get_lines
        for l in get_lines(self.cache, split=split):
            yield l
        return

    if ftype == "application/zip":
        from csirtg_smrt.decoders.zzip import get_lines
        for l in get_lines(self.cache, split=split):
            yield l
        return

    # all others, mostly txt, etc...
    with open(self.cache) as f:
        for l in f:
            yield l
def _cache_write(self, s):
    with open(self.cache, 'wb') as f:
        auth = False
        if self.username:
            auth = (self.username, self.password)
        resp = self._cache_refresh(s, auth)
        if not resp:
            return
        for block in resp.iter_content(1024):
            f.write(block)
def get_mimetype(f):
    try:
        ftype = magic.from_file(f, mime=True)
    except AttributeError:
        try:
            mag = magic.open(magic.MAGIC_MIME)
            mag.load()
            ftype = mag.file(f)
        except AttributeError as e:
            raise RuntimeError('unable to detect cached file type')

    if PYVERSION < 3:
        ftype = ftype.decode('utf-8')

    return ftype
def get_type(f, mime=None):
    if not mime:
        mime = get_mimetype(f)

    if isinstance(f, str):
        f = open(f)

    t = None
    for tt in TESTS:
        f.seek(0)
        t = tt(f, mime)
        if t:
            return t
def detect_magic():
    global g_m
    if hasattr(magic, 'open'):
        g_m = magic.open(magic.MAGIC_SYMLINK)
        g_m.load()
def gethash(path, filename):
    scanfile = open("%s/%s" % (path, filename), 'r')
    h = hashlib.new('sha256')
    scanfile.seek(0)
    hashdata = scanfile.read(10000000)
    while hashdata != '':
        h.update(hashdata)
        hashdata = scanfile.read(10000000)
    scanfile.close()
    return h.hexdigest()

## method to compare binaries. Returns the amount of bytes that differ
## according to bsdiff, or 0 if the files are identical
def main(argv):
    parser = OptionParser()
    parser.add_option("-f", "--filedir", action="store", dest="filedir",
                      help="path to directory containing files to unpack", metavar="DIR")
    (options, args) = parser.parse_args()
    if options.filedir == None:
        parser.error("Specify dir with files")
    else:
        try:
            filelist = open(os.path.join(options.filedir, "LIST")).readlines()
        except:
            parser.error("'LIST' not found in file dir")

    ## first process the LIST file
    pkgmeta = []
    for unpackfile in filelist:
        try:
            unpacks = unpackfile.strip().split()
            if len(unpacks) == 3:
                origin = "unknown"
                (package, version, filename) = unpacks
            else:
                (package, version, filename, origin) = unpacks
            pkgmeta.append((options.filedir, filename))
        except Exception, e:
            # oops, something went wrong
            print >>sys.stderr, e
    pool = multiprocessing.Pool()
    unpackresults = pool.map(unpack, pkgmeta, 1)
    pool.terminate()
    for i in unpackresults:
        if i != None:
            (filename, result) = i
            if not result:
                print "corrupt archive: %s" % filename
def scanArchitecture(filename, tags, cursor, conn, filehashes, blacklist=[], scanenv={}, scandebug=False, unpacktempdir=None):
    if not 'elf' in tags:
        return
    archres = elfcheck.getArchitecture(filename, tags)
    if archres != None:
        return (['architecture'], archres)

## search markers for various open source programs
## This search is not accurate, but might come in handy in some situations
def searchUnpackLzip(filename, tempdir=None, blacklist=[], offsets={}, scanenv={}, debug=False):
    hints = {}
    if not 'lzip' in offsets:
        return ([], blacklist, [], hints)
    if offsets['lzip'] == []:
        return ([], blacklist, [], hints)
    filesize = os.stat(filename).st_size
    if filesize < 5:
        return ([], blacklist, [], hints)
    diroffsets = []
    tags = []
    counter = 1
    for offset in offsets['lzip']:
        blacklistoffset = extractor.inblacklist(offset, blacklist)
        if blacklistoffset != None:
            continue
        ## sanity check, only versions 0 or 1 are supported
        lzipfile = open(filename, 'rb')
        lzipfile.seek(offset+4)
        lzipversion = lzipfile.read(1)
        lzipfile.close()
        if struct.unpack('<B', lzipversion)[0] > 1:
            continue
        tmpdir = dirsetup(tempdir, filename, "lzip", counter)
        (res, lzipsize) = unpackLzip(filename, offset, tmpdir)
        if res != None:
            diroffsets.append((res, offset, lzipsize))
            blacklist.append((offset, offset+lzipsize))
            counter = counter + 1
            if offset == 0 and lzipsize == filesize:
                tags.append("compressed")
                tags.append("lzip")
        else:
            ## cleanup
            os.rmdir(tmpdir)
    return (diroffsets, blacklist, tags, hints)
def gzipcrc32(filename):
    datafile = open(filename, 'rb')
    datafile.seek(0)
    databuffer = datafile.read(10000000)
    crc32 = binascii.crc32('')
    while databuffer != '':
        crc32 = binascii.crc32(databuffer, crc32)
        databuffer = datafile.read(10000000)
    datafile.close()
    crc32 = crc32 & 0xffffffff
    return crc32
def searchUnpackKnownGzip(filename, tempdir=None, scanenv={}, debug=False):
    ## first check if the file actually could be a valid gzip file
    gzipfile = open(filename, 'rb')
    gzipfile.seek(0)
    gzipheader = gzipfile.read(3)
    gzipfile.close()
    if gzipheader != fsmagic.fsmagic['gzip']:
        return ([], [], [], {})

    ## then try unpacking it.
    res = searchUnpackGzip(filename, tempdir, [], {'gzip': [0]}, scanenv, debug)
    (diroffsets, blacklist, newtags, hints) = res
    failed = False

    ## there were results, so check if they were successful
    if diroffsets != []:
        if len(diroffsets) != 1:
            failed = True
        else:
            (dirpath, startoffset, endoffset) = diroffsets[0]
            if startoffset != 0 or endoffset != os.stat(filename).st_size:
                failed = True

        if failed:
            for i in diroffsets:
                (dirpath, startoffset, endoffset) = i
                try:
                    shutil.rmtree(dirpath)
                except:
                    pass
            return ([], [], [], {})
        else:
            return (diroffsets, blacklist, newtags, hints)
    return ([], [], [], {})
def searchUnpackKnownBzip2(filename, tempdir=None, scanenv={}, debug=False):
    ## first check if the file actually could be a valid bzip2 file
    bzip2file = open(filename, 'rb')
    bzip2file.seek(0)
    bzip2header = bzip2file.read(3)
    bzip2file.close()
    if bzip2header != fsmagic.fsmagic['bz2']:
        return ([], [], [], {})

    ## then try unpacking it.
    res = searchUnpackBzip2(filename, tempdir, [], {'bz2': [0]}, scanenv, debug)
    (diroffsets, blacklist, newtags, hints) = res
    failed = False

    ## there were results, so check if they were successful
    if diroffsets != []:
        if len(diroffsets) != 1:
            failed = True
        else:
            (dirpath, startoffset, endoffset) = diroffsets[0]
            if startoffset != 0 or endoffset != os.stat(filename).st_size:
                failed = True

        if failed:
            for i in diroffsets:
                (dirpath, startoffset, endoffset) = i
                try:
                    shutil.rmtree(dirpath)
                except:
                    pass
            return ([], [], [], {})
        else:
            return (diroffsets, blacklist, newtags, hints)
    return ([], [], [], {})

## search and unpack bzip2 compressed files
def searchUnpackRZIP(filename, tempdir=None, blacklist=[], offsets={}, scanenv={}, debug=False):
    hints = {}
    if not 'rzip' in offsets:
        return ([], blacklist, [], hints)
    if offsets['rzip'] == []:
        return ([], blacklist, [], hints)
    if offsets['rzip'][0] != 0:
        return ([], blacklist, [], hints)
    if os.stat(filename).st_size < 10:
        return ([], blacklist, [], hints)
    diroffsets = []
    tags = []
    offset = 0
    rzipfile = open(filename, 'rb')
    rzipfile.seek(0)
    rzipdata = rzipfile.read(10)
    rzipfile.close()
    rzipsize = struct.unpack('>L', rzipdata[6:10])[0]
    blacklistoffset = extractor.inblacklist(offset, blacklist)
    if blacklistoffset != None:
        return (diroffsets, blacklist, tags, hints)
    tmpdir = dirsetup(tempdir, filename, "rzip", 1)
    res = unpackRZIP(filename, offset, rzipsize, tmpdir)
    if res != None:
        rzipdir = res
        diroffsets.append((rzipdir, offset, 0))
        #blacklist.append((offset, offset + unpackrzipsize))
        #if offset == 0:
        #    tags.append("compressed")
        #    tags.append("rzip")
    else:
        ## cleanup
        os.rmdir(tmpdir)
    return (diroffsets, blacklist, tags, hints)
def searchUnpackAndroidSparse(filename, tempdir=None, blacklist=[], offsets={}, scanenv={}, debug=False):
    hints = {}
    if not 'android-sparse' in offsets:
        return ([], blacklist, [], hints)
    if offsets['android-sparse'] == []:
        return ([], blacklist, [], hints)
    diroffsets = []
    counter = 1
    tags = []
    for offset in offsets['android-sparse']:
        blacklistoffset = extractor.inblacklist(offset, blacklist)
        if blacklistoffset != None:
            continue
        ## first see if the major version is correct
        sparsefile = open(filename, 'rb')
        sparsefile.seek(offset+4)
        sparsedata = sparsefile.read(2)
        sparsefile.close()
        if len(sparsedata) != 2:
            break
        majorversion = struct.unpack('<H', sparsedata)[0]
        if not majorversion == 1:
            continue
        tmpdir = dirsetup(tempdir, filename, "android-sparse", counter)
        res = unpackAndroidSparse(filename, offset, tmpdir)
        if res != None:
            (sparsesize, sparsedir) = res
            diroffsets.append((sparsedir, offset, sparsesize))
            blacklist.append((offset, offset + sparsesize))
            counter = counter + 1
        else:
            ## cleanup
            os.rmdir(tmpdir)
    return (diroffsets, blacklist, tags, hints)
def searchUnpackIHex(filename, tempdir=None, blacklist=[], offsets={}, scanenv={}, debug=False):
    hints = {}
    tags = []
    diroffsets = []
    counter = 1
    filesize = os.stat(filename).st_size
    tmpdir = dirsetup(tempdir, filename, "ihex", counter)
    tmpfile = tempfile.mkstemp(dir=tmpdir)
    datafile = open(filename, 'r')
    foundend = False
    offset = 0
    for d in datafile:
        if foundend:
            os.fdopen(tmpfile[0]).close()
            datafile.close()
            os.rmdir(tmpdir)
            return (diroffsets, blacklist, tags, hints)
        b = d.strip()
        if not b.startswith(':'):
            if not b.startswith('#'):
                break
        if len(b) < 3:
            break
        bytecount = ord(b[1:3].decode('hex'))
        address = struct.unpack('>H', b[3:7].decode('hex'))
        recordtype = ord(b[7:9].decode('hex'))
        if recordtype == 1:
            foundend = True
            break
        if recordtype != 0:
            continue
        databytes = b[9:9+bytecount*2].decode('hex')
        os.write(tmpfile[0], databytes)
    os.fdopen(tmpfile[0]).close()
    datafile.close()
    diroffsets.append((tmpdir, offset, filesize))
    blacklist.append((offset, offset + filesize))
    return (diroffsets, blacklist, tags, hints)

## sometimes MP3 audio files are embedded into binary blobs
def run(self):
    ret = []
    source = open(self.filepath, "rb").read()

    # Get rid of superfluous comments.
    source = re.sub("/\\*.*?\\*/", "", source, flags=re.S)

    for script in re.findall(self.script_re, source, re.I | re.S):
        try:
            x = bs4.BeautifulSoup(script, "html.parser")
            language = x.script.attrs.get("language", "").lower()
        except:
            language = None

        # We can't rely on bs4 or any other HTML/XML parser to provide us
        # with the raw content of the xml tag as they decode html entities
        # and all that, leaving us with a corrupted string.
        source = re.match("<.*>(.*)</.*>$", script, re.S).group(0)

        # Decode JScript.Encode encoding.
        if language in ("jscript.encode", "vbscript.encode"):
            source = self.decode(source)

        ret.append(to_unicode(source))
    return ret
def _get_keys(self):
    """Get any embedded plaintext public and/or private keys."""
    buf = open(self.file_path).read()
    ret = set()
    ret.update(re.findall(self.PUBKEY_RE, buf))
    ret.update(re.findall(self.PRIVKEY_RE, buf))
    return list(ret)
def get_filetype(data): """There are two versions of python-magic floating around, and annoyingly, the interface changed between versions, so we try one method and if it fails, then we try the other""" if sys.modules.has_key('magic'): try: ms = magic.open(magic.MAGIC_NONE) ms.load() return ms.buffer(data) except: return magic.from_buffer(data)
def io_dd(indir, offset, size, outdir):
    """
    Given a path to a target file, extract size bytes from specified offset
    to given output file.
    """
    if not size:
        return

    with open(indir, "rb") as ifp:
        with open(outdir, "wb") as ofp:
            ifp.seek(offset, 0)
            ofp.write(ifp.read(size))
def io_md5(target):
    """
    Performs MD5 with a block size of 64kb.
    """
    blocksize = 65536
    hasher = hashlib.md5()
    with open(target, 'rb') as ifp:
        buf = ifp.read(blocksize)
        while buf:
            hasher.update(buf)
            buf = ifp.read(blocksize)
    return hasher.hexdigest()
def http_download(url, filename):
    '''
    Download the file in `url` storing it in the path given by `filename`.
    '''
    with open(filename, 'w') as f:
        http_download_to_file(url, f)
def srm_download(url, filename):
    '''
    Download the file in `url` storing it in the path given by `filename`.
    '''
    with open(filename, 'w') as f:
        srm_download_to_file(url, f)
def _get_hc_dirs():
    try:
        read_conf = open(str(os.path.expanduser('~'))+'/automal_conf.conf', 'r').readlines()
    except IOError:
        sys.exit("-- : -- Unable to access conf. file. Run and re-enter new conf. file paths.")
    for line in read_conf:
        if line.startswith('ipath'):
            input_path = str(line.split('=')[1]).strip()
        if line.startswith('opath'):
            output_path = str(line.split('=')[1]).strip()
    return (input_path, output_path)
def _get_file_type(full_targ_path):
    # This function takes the full path of a target sample and
    # determines/returns the file type via python-magic.
    try:
        magicObj = magic.open(magic.MAGIC_NONE)
        magicObj.load()
        magic_out = str(magicObj.file(full_targ_path))
    except AttributeError:
        magic_out = str(magic.from_file(full_targ_path))
    return (magic_out)
def _swf_analysis(full_targ_path):
    # This function calls swftools and flasm against SWF samples to extract
    # data and/or perform analysis as needed.
    command_out = subprocess.Popen(["swfdump", "-a", full_targ_path],
                                   stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[0]
    command2_out = subprocess.Popen(["swfextract", full_targ_path],
                                    stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[0]
    command2_list = command2_out.split('\n')
    command_out_list = command_out.split('\n')
    swf_ioc_res = ""
    for out in command_out_list:
        strOut = str(out)
        ioc_list = ["http", "www", ".com", ".net", ".info", "GetVariable", "GetURL",
                    'String:"_post"', 'String:"send"', "\\\\", "pushstring",
                    "url.split", ".php", "urlmon", ".exe"]
        for indi in ioc_list:
            if indi in strOut:
                swf_ioc_res = "Present"
    if len(swf_ioc_res) == 0:
        swf_ioc_res = "None"
    extract_list_fns = []
    for out in command2_list:
        if "JPEG" in out:
            j_id = out.rfind(' ')+1
            j_id = int(out[j_id:len(out)])
            # Sometimes picture extraction doesn't occur correctly, so we
            # suppress the output. If we get it, great; if we don't, whatever,
            # for now.
            os_null = open(os.devnull, 'wb')
            subprocess.Popen(['swfextract', full_targ_path, '-j', str(j_id),
                              '-o', '/tmp/automal/'+str(j_id)+'.jpg'],
                             stdout=os_null, stderr=os_null)
            subprocess.Popen(['swfextract', full_targ_path, '-p', str(j_id),
                              '-o', '/tmp/automal/'+str(j_id)+'.png'],
                             stdout=os_null, stderr=os_null)
            extract_list_fns.append('/tmp/automal/'+str(j_id))
    return (command_out, extract_list_fns, command2_out, swf_ioc_res)
def _c_sample_out_dir(targ, automal_dir):
    # When we analyze samples, an output directory named after the MD5 hash of
    # the sample is created and/or used for the sample's specific exports,
    # output info, etc.
    out_md5 = str(hashlib.md5(targ).hexdigest())
    out_full_path = automal_dir+'/'+out_md5
    if not os.path.exists(out_full_path):
        os.makedirs(out_full_path)
    out_file_Obj = open(out_full_path+'/Output.txt', 'a')
    return (out_file_Obj, out_full_path, out_md5)