我们从 Python 开源项目中提取了以下 17 个代码示例，用于说明如何使用 pefile.DIRECTORY_ENTRY。
def _fixup_pe_header(self, pe):
    """Fixes the PE header from an in-memory representation to an on-disk
    representation.

    Every section gets its raw-data pointer set to its virtual address and
    its raw size widened to at least its virtual size.  When the image has a
    non-empty base relocation directory, that directory entry is zeroed and
    the ``IMAGE_FILE_RELOCS_STRIPPED`` characteristic is set.

    Args:
        pe: parsed PE object exposing ``sections``, ``OPTIONAL_HEADER`` and
            ``FILE_HEADER``; it is modified in place.

    Returns:
        None.
    """
    for section in pe.sections:
        section.PointerToRawData = section.VirtualAddress
        section.SizeOfRawData = max(
            section.Misc_VirtualSize, section.SizeOfRawData
        )

    reloc_index = pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_BASERELOC"]
    data_directory = pe.OPTIONAL_HEADER.DATA_DIRECTORY

    # ``<=`` and not ``<``: a table with exactly ``reloc_index`` entries has
    # no slot at that index, so indexing it below would raise IndexError.
    if len(data_directory) <= reloc_index:
        return

    reloc = data_directory[reloc_index]
    if not reloc.VirtualAddress or not reloc.Size:
        return

    # Disable relocations as those have already been applied.
    reloc.VirtualAddress = reloc.Size = 0
    pe.FILE_HEADER.Characteristics |= \
        pefile.IMAGE_CHARACTERISTICS["IMAGE_FILE_RELOCS_STRIPPED"]
def initialize(self, sample):
    """Parse ``sample`` with pefile once and remember the result on ``self``.

    A repeated call returns the cached ``self.library`` immediately.  The
    first call stores the parsed object in ``self.library`` (or ``None`` when
    pefile raises ``PEFormatError``, which is also logged) and falls off the
    end, i.e. it returns ``None`` itself.
    """
    if self.already_initialized:
        return self.library

    self.already_initialized = True
    wanted_directories = (
        'IMAGE_DIRECTORY_ENTRY_IMPORT',
        'IMAGE_DIRECTORY_ENTRY_EXPORT',
        'IMAGE_DIRECTORY_ENTRY_TLS',
        'IMAGE_DIRECTORY_ENTRY_SECURITY',
        'IMAGE_DIRECTORY_ENTRY_RESOURCE',
    )
    try:
        self.library = pefile.PE(data=sample.getBinary(), fast_load=True)
        # see if this initializations can be done on plugins.
        self.library.parse_data_directories(
            directories=[pefile.DIRECTORY_ENTRY[key]
                         for key in wanted_directories])
    except pefile.PEFormatError:
        self.library = None
        logging.error("Error parsing pefileModule with sample:%s",
                      sample.getID(), exc_info=True)
def get_import_size_stats(self):
    """Summarise how many symbols ``self.pe`` imports and from how many DLLs.

    Returns:
        A ``(total_imports, library_count, average)`` tuple where
        ``average`` is ``total_imports / library_count``.  ``(0, 0, 0)`` is
        returned when the import data directory has a zero address, when the
        parsed import list is missing, or when it is empty.
    """
    import_index = pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT']
    if self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[import_index].VirtualAddress == 0:
        return 0, 0, 0

    # A non-zero directory address does not guarantee a usable descriptor
    # list: treat a missing or empty list as "no imports" instead of failing
    # with AttributeError or dividing by zero below.
    descriptors = getattr(self.pe, 'DIRECTORY_ENTRY_IMPORT', None) or []
    library_count = len(descriptors)
    if library_count == 0:
        return 0, 0, 0

    total_imports = sum(len(entry.imports) for entry in descriptors)
    # Same ``/`` operator as before, so the numeric type of the average is
    # whatever the running interpreter gives for int / int.
    average = total_imports / library_count
    return total_imports, library_count, average
def getImports(self):
    """Map each imported DLL name to the list of names imported from it.

    Returns ``None`` when the import data directory has a zero address.
    Otherwise returns a dict whose keys are unicode DLL names (characters
    with a code point of 128 or above are replaced by ``'.'``) and whose
    values are lists of ``str(imp.name)``.
    """
    import_index = pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT']
    if self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[import_index].VirtualAddress == 0:
        return None

    imports_by_dll = {}
    for descriptor in self.pe.DIRECTORY_ENTRY_IMPORT:
        # Keep the name decodable: anything outside the 7-bit range
        # becomes a dot before the unicode conversion.
        sanitized = "".join(
            '.' if ord(char) >= 128 else char for char in descriptor.dll)
        imports_by_dll[unicode(str(sanitized), "utf-8")] = [
            str(imported.name) for imported in descriptor.imports]
    return imports_by_dll
def test_write_header_fields(self):
    """Verify correct field data modification.

    Three version-information strings are overwritten, the image is
    serialised again, and the bytes that differ from the first
    serialisation must spell exactly the three new strings.
    """
    # Test version information writing
    control_file = os.path.join(REGRESSION_TESTS_DIR, 'MSVBVM60.DLL')
    pe = pefile.PE(control_file, fast_load=True)
    # try/finally: release the PE object even when an assertion fails.
    try:
        pe.parse_data_directories(
            directories=[
                pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE']])

        original_data = pe.write()

        str1 = b'string1'
        str2 = b'str2'
        str3 = b'string3'

        pe.FileInfo[0].StringTable[0].entries['FileDescription'] = str1
        pe.FileInfo[0].StringTable[0].entries['FileVersion'] = str2
        pe.FileInfo[0].StringTable[0].entries['InternalName'] = str3

        new_data = pe.write()

        # The pairwise comparison below is only meaningful when both
        # serialisations have the same size.
        self.assertEqual(len(original_data), len(new_data))

        # Collect every changed byte, skipping the zeroes that pefile
        # automatically adds to pad a new, shorter string, into the space
        # occupied by a longer one.
        differences = [
            chr(new_byte)
            for old_byte, new_byte in zip(original_data, new_data)
            if old_byte != new_byte and new_byte != 0
        ]

        # Verify all modifications in the file were the ones we just made
        self.assertEqual(
            ''.join(differences).encode('utf-8', 'backslashreplace'),
            str1 + str2 + str3)
    finally:
        pe.close()
def ScanFile(filename, signatures, minimumEntropy):
    """Log one report line for ``filename`` when it is a high-entropy PE.

    Files whose contents do not start with MZ are skipped silently.  Parse
    failures (``PEFormatError``, ``TypeError``) and a ``MemoryError`` while
    serialising are logged by name and end the scan.  When the entropy of
    the serialised image is at least ``minimumEntropy``, a line with section
    counts, checksum data, timestamp, version info and MD5 is logged.
    ``signatures`` is accepted but not used here.
    """
    global oLogger

    if not FileContentsStartsWithMZ(filename):
        return

    try:
        pe = GetPEObject(filename)
    except pefile.PEFormatError:
        oLogger.PrintAndLog(('%s', '%s'), (filename, 'PEFormatError'))
        return
    except TypeError:
        oLogger.PrintAndLog(('%s', '%s'), (filename, 'TypeError'))
        return

    try:
        raw = pe.write()
    except MemoryError:
        oLogger.PrintAndLog(('%s', '%s'), (filename, 'MemoryError'))
        return

    # entropy_H is reached through the first section but is fed the whole
    # serialised image.
    entropy = pe.sections[0].entropy_H(raw)
    if not entropy >= minimumEntropy:
        return

    executable_sections = [
        section for section in pe.sections if section.IMAGE_SCN_MEM_EXECUTE]
    countFlagsExecute = len(executable_sections)
    countFlagsExecuteAndWrite = len(
        [section for section in executable_sections
         if section.IMAGE_SCN_MEM_WRITE])

    calculatedCRC = pe.generate_checksum()
    storedCRC = pe.OPTIONAL_HEADER.CheckSum
    crcDifferent = storedCRC != 0 and storedCRC != calculatedCRC
    info = GetVersionInfo(pe)

    security_index = pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_SECURITY']
    formats = ('%s', '%f', '%d', '%d', '%d', '%d', '%08X', '%08X', '%d',
               '%s', '%s', '%s', '%s')
    values = (
        filename,
        entropy,
        len(pe.sections),
        countFlagsExecute,
        countFlagsExecuteAndWrite,
        pe.OPTIONAL_HEADER.DATA_DIRECTORY[security_index].Size,
        pe.OPTIONAL_HEADER.CheckSum,
        calculatedCRC,
        crcDifferent,
        time.asctime(time.gmtime(pe.FILE_HEADER.TimeDateStamp)),
        repr(RVOES(info, 'CompanyName')),
        repr(RVOES(info, 'ProductName')),
        hashlib.md5(raw).hexdigest(),
    )
    oLogger.PrintAndLog(formats, values)
def get_reloc_diff(rinstrs):
    """Build the byte patches the relocation section needs after reordering.

    When relocatable instructions are moved, the relocation entries that
    point into them have to follow.  The result is a list of
    ``(file_offset, old_byte, new_byte)`` tuples, two per affected entry
    (high byte first, then low byte).
    """
    patches = []
    reloc_file_offsets = {}

    # TODO cache relocations
    pe = pefile.PE(inp_dump.get_input_file_path(), fast_load=True)
    pe.parse_data_directories(directories=[
        pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_BASERELOC']])

    # Index 5 is the base relocation slot of the data directory table.
    if pe.OPTIONAL_HEADER.DATA_DIRECTORY[5].Size > 0:
        for block in pe.DIRECTORY_ENTRY_BASERELOC:
            for entry in block.entries:
                if entry.type == 3:  # HIGHLOW
                    reloc_file_offsets[entry.rva] = \
                        entry.struct.get_file_offset()

    image_base = pe.OPTIONAL_HEADER.ImageBase

    # now check if we reordered any relocatable data
    for rins in rinstrs:
        if not rins.inst_len >= 5:
            continue
        # a relocatable ref can be found after the first byte (opcode) and is
        # 4 bytes long (no need to check the last three bytes of the
        # instruction)
        first_rva = rins.addr - image_base + 1
        stop_rva = rins.addr - image_base + rins.inst_len - 3
        for rva in xrange(first_rva, stop_rva):
            if rva not in reloc_file_offsets:
                continue
            foff = reloc_file_offsets[rva]
            new_rva = rva + rins.raddr - rins.addr
            # high nibble carries the relocation type, 3 is HIGHLOW
            new_rva_h = (3 << 4) | ((new_rva >> 8) & 0xf)
            patches.append(
                (foff + 1, chr((rva >> 8) & 0xff), chr(new_rva_h)))
            patches.append(
                (foff, chr(rva & 0xff), chr(new_rva & 0xff)))

    return patches
def _getImports_pe(pth):
    """
    Find the binary dependencies of PTH.

    This implementation walks through the PE header
    and uses library pefile for that and supports
    32/64bit Windows

    Returns a set of DLL names: every DLL listed in the import table plus
    the target DLL of every forwarded export.
    """
    import pefile
    dlls = set()
    # By default library pefile parses all PE information.
    # We are only interested in the list of dependent dlls.
    # Performance is improved by reading only needed information.
    # https://code.google.com/p/pefile/wiki/UsageExamples
    pe = pefile.PE(pth, fast_load=True)
    # try/finally: release the PE object even if parsing raises.
    try:
        pe.parse_data_directories(directories=[
            pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT'],
            pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT'],
        ])

        # Some libraries have no other binary dependencies. Use empty list
        # in that case. Otherwise pefile would return None.
        # e.g. C:\windows\system32\kernel32.dll on Wine
        for entry in getattr(pe, 'DIRECTORY_ENTRY_IMPORT', []):
            dlls.add(winutils.convert_dll_name_to_str(entry.dll))

        # We must also read the exports table to find forwarded symbols:
        # http://blogs.msdn.com/b/oldnewthing/archive/2006/07/19/671238.aspx
        exportSymbols = getattr(pe, 'DIRECTORY_ENTRY_EXPORT', None)
        if exportSymbols:
            for sym in exportSymbols.symbols:
                if sym.forwarder is None:
                    continue
                # sym.forwarder is for example 'KERNEL32.EnterCriticalSection'.
                # Convert before splitting so the separator type always
                # matches, and keep only the part before the first dot:
                # tuple-unpacking the split raised ValueError whenever the
                # forwarder did not contain exactly one dot.
                forwarder = winutils.convert_dll_name_to_str(sym.forwarder)
                dlls.add(forwarder.split('.', 1)[0] + ".dll")
    finally:
        pe.close()
    return dlls
def checkTSL(self):
    """Return the address stored in the TLS data directory of ``self.pe``.

    ``None`` is returned when that address is zero (or otherwise falsy).
    """
    tls_index = pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_TLS']
    tls_address = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[tls_index].VirtualAddress
    return tls_address if tls_address else None
def process(self):
    """List the imports of the PE provided by the PEFileModule library.

    Returns:
        A list with one ``{"lib": ..., "functions": [...]}`` dictionary per
        import descriptor; both the DLL name and the function names are the
        lower-cased ``repr`` of the parsed values.  An empty string is
        returned when no library is available, when the import directory
        address is zero, or when reading the import data raises (the error
        text is printed in that case).
    """
    pelib = self._getLibrary(PEFileModule().getName())
    if pelib is None:
        return ""

    # ``except ... as`` and ``print(...)`` are valid on Python 2.6+ and on
    # Python 3; the former ``except Exception, e`` / ``print x`` forms are
    # syntax errors on Python 3.
    try:
        import_index = pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT']
        if pelib.OPTIONAL_HEADER.DATA_DIRECTORY[import_index].VirtualAddress == 0:
            return ""
        descriptors = pelib.DIRECTORY_ENTRY_IMPORT
    except Exception as e:
        print(str(e))
        return ""

    return [
        {
            "lib": repr(entry.dll).lower(),
            "functions": [repr(imp.name).lower() for imp in entry.imports],
        }
        for entry in descriptors
    ]
def hash_imports(hashfun, path, data=None):
    """Hash the library name and all exported symbol names of a PE image.

    Args:
        hashfun: callable applied to every name; its result becomes the key
            under which the name is stored.
        path: path of the image.  The base name up to its first dot is used
            as the library name, and the file itself is parsed when ``data``
            is falsy.
        data: optional raw image contents, parsed instead of reading
            ``path``.

    Returns:
        ``{dll_name: {hash: name, ...}}`` covering the library name (as is,
        lower case, upper case, and with a ``.dll``/``.DLL`` suffix) and
        each named export (lower case, upper case, as is).  An image
        without an export table contributes only the library-name entries.
    """
    pe = pefile.PE(data=data) if data else pefile.PE(path)
    dll_name = os.path.split(path)[-1].split('.')[0]

    # Parse the export directory only if it has not been parsed yet.  The
    # former probe read ``pe.IMAGE_DIRECTORY_ENTRY_EXPORT`` under a bare
    # ``except``; the parsed attribute is ``DIRECTORY_ENTRY_EXPORT``.
    export_index = pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT']
    if (not hasattr(pe, 'DIRECTORY_ENTRY_EXPORT')
            and pe.OPTIONAL_HEADER.DATA_DIRECTORY[export_index].VirtualAddress != 0):
        pe.parse_data_directories(directories=[export_index])

    ret = {}

    # write name of library as well
    library_names = [
        dll_name,
        dll_name.lower(),
        dll_name.upper(),
        # dll_name was cut at the first dot, so it never carries an
        # extension: always add the suffixed spellings.  These hashes used
        # to be computed and then thrown away without being stored.
        dll_name.lower() + '.dll',
        dll_name.upper() + '.DLL',
    ]
    for name in library_names:
        ret[hashfun(name)] = name

    # The attribute is absent when the image exports nothing.
    exports = getattr(pe, 'DIRECTORY_ENTRY_EXPORT', None)
    for entry in (exports.symbols if exports else ()):
        if entry.name is None:
            continue
        for name in (entry.name.lower(), entry.name.upper(), entry.name):
            ret[hashfun(name)] = name

    return {dll_name: ret}
def get_debug_data(pe, type=DEBUG_TYPE[u'IMAGE_DEBUG_TYPE_CODEVIEW']):
    """Return the raw bytes of the first debug entry whose Type is ``type``.

    The debug directory is parsed on demand when ``pe`` does not expose
    ``DIRECTORY_ENTRY_DEBUG`` yet.  ``PENoDebugDirectoryEntriesError`` is
    raised if the attribute is still missing afterwards; ``None`` is
    returned when no entry has the requested type.
    """
    if not hasattr(pe, u'DIRECTORY_ENTRY_DEBUG'):
        # fast loaded - load directory
        pe.parse_data_directories(DIRECTORY_ENTRY[u'IMAGE_DIRECTORY_ENTRY_DEBUG'])
        if not hasattr(pe, u'DIRECTORY_ENTRY_DEBUG'):
            raise PENoDebugDirectoryEntriesError()

    for debug_entry in pe.DIRECTORY_ENTRY_DEBUG:
        header = debug_entry.struct
        if header.Type == type:
            start = header.PointerToRawData
            return pe.__data__[start:start + header.SizeOfData]
    return None
def reorder_imports(input_dir, output_dir, architecture):
    """Make chrome_elf.dll the first import of chrome.exe.

    ``chrome.exe`` is read from ``input_dir``; when the import descriptor of
    chrome_elf.dll is not already first, its fields are exchanged with those
    of the first descriptor, and the image is written to ``output_dir``.
    Every ``chrome.exe.*`` file in ``input_dir`` (pdbs, manifests etc.) is
    copied alongside.  Returns 0.
    """
    # TODO(thakis): See if there is a reliable way to write the
    # correct executable in the first place, so that this script
    # only needs to verify that and not write a whole new exe.

    source_image = os.path.join(input_dir, 'chrome.exe')
    target_image = os.path.join(output_dir, 'chrome.exe')

    # pefile mmap()s the whole executable, and then parses parts of
    # it into python data structures for ease of processing.
    # To write the file again, only the mmap'd data is written back,
    # so modifying the parsed python objects generally has no effect.
    # However, parsed raw data ends up in pe.Structure instances,
    # and these all get serialized back when the file gets written.
    # So things that are in a Structure must have their data set
    # through the Structure, while other data must be set through
    # the set_bytes_*() methods.
    pe = pefile.PE(source_image, fast_load=True)
    expected_type = (pefile.OPTIONAL_HEADER_MAGIC_PE_PLUS
                     if architecture == 'x64'
                     else pefile.OPTIONAL_HEADER_MAGIC_PE)
    assert pe.PE_TYPE == expected_type

    pe.parse_data_directories(directories=[
        pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT']])

    # Morally we want to swap the two descriptor structs wholesale, but the
    # pe module doesn't expose a public method on Structure to get all data
    # of a Structure without explicitly listing all field names.
    # NB: OriginalFirstThunk and Characteristics are an union both at
    # offset 0, handling just one of them is enough.
    descriptor_fields = (
        'OriginalFirstThunk',
        'TimeDateStamp',
        'ForwarderChain',
        'Name',
        'FirstThunk',
    )

    found_elf = False
    for position, descriptor in enumerate(pe.DIRECTORY_ENTRY_IMPORT):
        if descriptor.dll.lower() != 'chrome_elf.dll':
            continue
        assert not found_elf, 'only one chrome_elf.dll import expected'
        found_elf = True
        if position > 0:
            first = pe.DIRECTORY_ENTRY_IMPORT[0]
            for field in descriptor_fields:
                first_value = getattr(first.struct, field)
                elf_value = getattr(descriptor.struct, field)
                setattr(descriptor.struct, field, first_value)
                setattr(first.struct, field, elf_value)

    assert found_elf, 'chrome_elf.dll import not found'

    pe.write(filename=target_image)

    for companion in glob.iglob(os.path.join(input_dir, 'chrome.exe.*')):
        destination = os.path.join(output_dir, os.path.basename(companion))
        shutil.copy(companion, destination)
    return 0
def _get_signature(self):
    """If this executable is signed, get its signature(s).

    Returns:
        A list with one dictionary per signer certificate (serial number,
        subject fields, SHA-1 and MD5 fingerprints, and ``full_name`` when
        the subject has a given name and/or surname).  The list is empty
        when the security directory is absent or empty, when M2Crypto is
        not available, or when the signature data cannot be read.
    """
    dir_index = pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_SECURITY"]
    data_directory = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY

    # ``<=`` and not ``<``: a table with exactly ``dir_index`` entries has
    # no slot at that index, so indexing it below would raise IndexError.
    if len(data_directory) <= dir_index:
        return []

    dir_entry = data_directory[dir_index]
    if not dir_entry or not dir_entry.VirtualAddress or not dir_entry.Size:
        return []

    if not HAVE_MCRYPTO:
        log.critical("You do not have the m2crypto library installed "
                     "preventing certificate extraction: "
                     "pip install m2crypto")
        return []

    # NOTE(review): the 8 bytes skipped here are presumably the
    # WIN_CERTIFICATE header in front of the DER blob - confirm.
    signatures = self.pe.write()[dir_entry.VirtualAddress+8:]
    bio = M2Crypto.BIO.MemoryBuffer(signatures)
    if not bio:
        return []

    pkcs7_obj = M2Crypto.m2.pkcs7_read_bio_der(bio.bio_ptr())
    if not pkcs7_obj:
        return []

    ret = []
    p7 = M2Crypto.SMIME.PKCS7(pkcs7_obj)
    for cert in p7.get0_signers(M2Crypto.X509.X509_Stack()) or []:
        subject = cert.get_subject()
        ret.append({
            "serial_number": "%032x" % cert.get_serial_number(),
            "common_name": subject.CN,
            "country": subject.C,
            "locality": subject.L,
            "organization": subject.O,
            "email": subject.Email,
            "sha1": "%040x" % int(cert.get_fingerprint("sha1"), 16),
            "md5": "%032x" % int(cert.get_fingerprint("md5"), 16),
        })

        if subject.GN and subject.SN:
            ret[-1]["full_name"] = "%s %s" % (subject.GN, subject.SN)
        elif subject.GN:
            ret[-1]["full_name"] = subject.GN
        elif subject.SN:
            ret[-1]["full_name"] = subject.SN

    return ret
def process2(self):
    """Print issuer attributes of the signers found in the PE signature.

    The data after the first 8 bytes at the security directory address is
    decoded as a PKCS#7 ``ContentInfo``.  For signed data, the country,
    organization, organizational unit and common name of every signer's
    issuer are printed; any other attribute type is printed as is.

    Returns:
        ``""`` when no PE library is available, otherwise ``None``.
    """
    pe = self._getLibrary(PEFileModule().getName())
    if pe is None:
        return ""

    # get the security directory entry
    address = pe.OPTIONAL_HEADER.DATA_DIRECTORY[
        pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_SECURITY']].VirtualAddress

    if not address > 0:
        logging.debug("address 0")
        return

    # Always in DER format AFAIK
    derData = pe.write()[address + 8:]

    contentInfo, _ = decoder.decode(
        derData, asn1Spec=rfc2315.ContentInfo())

    contentType = contentInfo.getComponentByName('contentType')

    # Nothing to walk unless the payload is signed data; returning here
    # keeps ``signedData`` from being used while unbound.
    if not contentType == rfc2315.signedData:
        return

    # NOTE(review): ``decode`` presumably yields a (value, remainder) pair
    # like ``decoder.decode`` above; the loop skips an empty remainder.
    signedData = decode(contentInfo.getComponentByName(
        'content'), asn1Spec=rfc2315.SignedData())

    for sd in signedData:
        if sd == '':
            continue

        signerInfos = sd.getComponentByName('signerInfos')
        for si in signerInfos:
            issuerAndSerial = si.getComponentByName(
                'issuerAndSerialNumber')
            issuer = issuerAndSerial.getComponentByName(
                'issuer').getComponent()
            for i in issuer:
                for r in i:
                    at = r.getComponentByName('type')
                    if rfc2459.id_at_countryName == at:
                        cn = decode(r.getComponentByName(
                            'value'), asn1Spec=rfc2459.X520countryName())
                        print(cn[0])
                    elif rfc2459.id_at_organizationName == at:
                        on = decode(r.getComponentByName(
                            'value'), asn1Spec=rfc2459.X520OrganizationName())
                        print(on[0].getComponent())
                    elif rfc2459.id_at_organizationalUnitName == at:
                        ou = decode(r.getComponentByName(
                            'value'), asn1Spec=rfc2459.X520OrganizationalUnitName())
                        print(ou[0].getComponent())
                    elif rfc2459.id_at_commonName == at:
                        cn = decode(r.getComponentByName(
                            'value'), asn1Spec=rfc2459.X520CommonName())
                        print(cn[0].getComponent())
                    else:
                        # print(...) with one argument behaves the same on
                        # Python 2 and 3; the bare ``print at`` statement
                        # was a syntax error on Python 3.
                        print(at)
def main():
    """Blank the build timestamp and debug information of the file at ``path``.

    Every occurrence of the packed ``TimeDateStamp`` value is replaced by
    four zero bytes.  When the image has parsed debug entries, the data of
    each entry, the debug directory itself and its slot in the data
    directory table are zero-filled.  The result is saved back to ``path``.
    """
    # set current dir
    set_home()

    # read file
    data = get_file_data(path)

    # get pefile infos
    pe_info = pefile.PE(data=data, fast_load=False)

    # --------------------------------------
    # TimeDateStamp: clean from whole file
    # '<I' pins the little-endian layout used by PE headers; a bare 'I'
    # follows the byte order of the machine running the script.
    packed_timestamp = struct.pack('<I', pe_info.FILE_HEADER.TimeDateStamp)
    # Bytes literal so the replacement type matches binary file data on
    # Python 3 as well (on Python 2 it is the same ``str``).
    new_data = data.replace(packed_timestamp, b'\x00\x00\x00\x00')

    # --------------------------------------
    # DEBUG info
    if hasattr(pe_info, 'DIRECTORY_ENTRY_DEBUG'):
        # clean debug datas
        for debug in pe_info.DIRECTORY_ENTRY_DEBUG:
            new_data = fill_zero(new_data,
                                 debug.struct.PointerToRawData,
                                 debug.struct.SizeOfData)

        # clean debug dir
        debug_index = pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_DEBUG']
        debug_dir = pe_info.OPTIONAL_HEADER.DATA_DIRECTORY[debug_index]
        debug_offset = pe_info.get_offset_from_rva(debug_dir.VirtualAddress)
        new_data = fill_zero(new_data, debug_offset, debug_dir.Size)

        # clean links to dir
        new_data = fill_zero(new_data,
                             debug_dir.__file_offset__,
                             debug_dir.__format_length__)

    SaveFile(path, new_data)
    print('[OK] post-compile\n')
def _get_digital_signers(self):
    """Describe the certificates that signed ``self.pe``.

    Returns:
        ``None`` when there is no PE object, when the crypto support flag
        is off, when the security directory address is zero, or when the
        signature data cannot be loaded.  Otherwise a list (possibly
        empty) of dictionaries with the keys ``sn``, ``cn``,
        ``sha1_fingerprint`` and ``md5_fingerprint``; signers whose subject
        has no ``/CN=`` component are left out.
    """
    if not self.pe:
        return None
    if not HAVE_CRYPTO:
        return None

    security_index = pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_SECURITY']
    address = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[security_index].VirtualAddress

    # check if file is digitally signed
    if address == 0:
        return None

    signature = self.pe.write()[address+8:]

    # BIO.MemoryBuffer expects an argument of type 'str'.  bytes() is that
    # very type on Python 2 and yields the raw bytes on Python 3, where
    # str() of a bytearray would produce its repr instead.
    if isinstance(signature, bytearray):
        signature = bytes(signature)

    bio = BIO.MemoryBuffer(signature)
    if not bio:
        return None

    swig_pkcs7 = m2.pkcs7_read_bio_der(bio.bio_ptr())
    if not swig_pkcs7:
        return None

    p7 = SMIME.PKCS7(swig_pkcs7)
    signers = p7.get0_signers(X509.X509_Stack())

    retlist = []
    cn_marker = "/CN="
    for cert in signers or ():
        sn = cert.get_serial_number()
        sha1_fingerprint = cert.get_fingerprint('sha1').lower().rjust(40, '0')
        md5_fingerprint = cert.get_fingerprint('md5').lower().rjust(32, '0')
        subject_str = str(cert.get_subject())
        try:
            cn = subject_str[subject_str.index(cn_marker) + len(cn_marker):]
        except ValueError:
            # str.index raises ValueError when the subject has no /CN=
            # part; such a signer is skipped.
            continue

        retlist.append({
            "sn": str(sn),
            "cn": cn,
            "sha1_fingerprint": sha1_fingerprint,
            "md5_fingerprint": md5_fingerprint,
        })

    return retlist