我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ctypes.c_buffer()。
def dpapi_encrypt_data(self, input_bytes, entropy = extra_entropy): ''' Encrypts data and returns byte string :param input_bytes: The data to be encrypted :type input_bytes: String or Bytes :param entropy: Extra entropy to add to the encryption process (optional) :type entropy: String or Bytes ''' if not isinstance(input_bytes, bytes) or not isinstance(entropy, bytes): self.fatal('The inputs to dpapi must be bytes') buffer_in = c_buffer(input_bytes, len(input_bytes)) buffer_entropy = c_buffer(entropy, len(entropy)) blob_in = DATA_BLOB(len(input_bytes), buffer_in) blob_entropy = DATA_BLOB(len(entropy), buffer_entropy) blob_out = DATA_BLOB() if CryptProtectData(byref(blob_in), 'python_data', byref(blob_entropy), None, None, CRYPTPROTECT_UI_FORBIDDEN, byref(blob_out)): return get_data(blob_out) else: self.fatal('Failed to decrypt data')
def dpapi_decrypt_data(self, encrypted_bytes, entropy = extra_entropy): ''' Decrypts data and returns byte string :param encrypted_bytes: The encrypted data :type encrypted_bytes: Bytes :param entropy: Extra entropy to add to the encryption process (optional) :type entropy: String or Bytes ''' if not isinstance(encrypted_bytes, bytes) or not isinstance(entropy, bytes): self.fatal('The inputs to dpapi must be bytes') buffer_in = c_buffer(encrypted_bytes, len(encrypted_bytes)) buffer_entropy = c_buffer(entropy, len(entropy)) blob_in = DATA_BLOB(len(encrypted_bytes), buffer_in) blob_entropy = DATA_BLOB(len(entropy), buffer_entropy) blob_out = DATA_BLOB() if CryptUnprotectData(byref(blob_in), None, byref(blob_entropy), None, None, CRYPTPROTECT_UI_FORBIDDEN, byref(blob_out)): return get_data(blob_out) else: self.fatal('Failed to decrypt data')
def __init__(self, msg_or_size=0x1000, attributes=None): # Init the PORT_MESSAGE if isinstance(msg_or_size, (long, int)): self.port_message_buffer_size = msg_or_size self.port_message_raw_buffer = ctypes.c_buffer(msg_or_size) self.port_message = AlpcMessagePort.from_buffer(self.port_message_raw_buffer) self.port_message.set_datalen(0) elif isinstance(msg_or_size, AlpcMessagePort): self.port_message = msg_or_size self.port_message_raw_buffer = self.port_message.raw_buffer self.port_message_buffer_size = len(self.port_message_raw_buffer) else: raise NotImplementedError("Uneexpected type for <msg_or_size>: {0}".format(msg_or_size)) # Init the MessageAttributes if attributes is None: # self.attributes = MessageAttribute.with_all_attributes() self.attributes = MessageAttribute.with_all_attributes() ## Testing else: self.attributes = attributes # PORT_MESSAGE wrappers
def enumerate_handles(): size_needed = ULONG() size = 0x1000 buffer = ctypes.c_buffer(size) try: winproxy.NtQuerySystemInformation(16, buffer, size, ReturnLength=ctypes.byref(size_needed)) except WindowsError as e: pass size = size_needed.value + 0x1000 buffer = ctypes.c_buffer(size) winproxy.NtQuerySystemInformation(16, buffer, size, ReturnLength=ctypes.byref(size_needed)) x = SYSTEM_HANDLE_INFORMATION.from_buffer(buffer) class _GENERATED_SYSTEM_HANDLE_INFORMATION(ctypes.Structure): _fields_ = [ ("HandleCount", ULONG), ("Handles", Handle * x.HandleCount), ] return list(_GENERATED_SYSTEM_HANDLE_INFORMATION.from_buffer_copy(buffer[:size_needed.value]).Handles)
def get_mapped_filename(self, addr): """The filename mapped at address ``addr`` or ``None`` :rtype: :class:`str` or ``None`` """ buffer_size = 0x1000 buffer = ctypes.c_buffer(buffer_size) if windows.current_process.bitness == 32 and self.bitness == 64: target_size = ctypes.c_buffer(buffer_size) try: windows.syswow64.NtQueryVirtualMemory_32_to_64(self.handle, addr, MemorySectionName, buffer, buffer_size, target_size) except NtStatusException as e: if e.code not in [STATUS_FILE_INVALID, STATUS_INVALID_ADDRESS, STATUS_TRANSACTION_NOT_ACTIVE]: raise return None remote_winstring = rctypes.transform_type_to_remote64bits(WinUnicodeString) mapped_filename = remote_winstring(ctypes.addressof(buffer), windows.current_process) return mapped_filename.str try: size = winproxy.GetMappedFileNameA(self.handle, addr, buffer, buffer_size) except winproxy.Kernel32Error as e: return None return buffer[:size]
def query_link(linkpath): utf16_len = len(linkpath) * 2 obj_attr = OBJECT_ATTRIBUTES() obj_attr.Length = ctypes.sizeof(obj_attr) obj_attr.RootDirectory = 0 obj_attr.ObjectName = pointer(LSA_UNICODE_STRING(utf16_len, utf16_len, linkpath)) obj_attr.Attributes = OBJ_CASE_INSENSITIVE obj_attr.SecurityDescriptor = 0 obj_attr.SecurityQualityOfService = 0 res = HANDLE() x = winproxy.NtOpenSymbolicLinkObject(res, DIRECTORY_QUERY | READ_CONTROL , obj_attr) v = LSA_UNICODE_STRING(0x1000, 0x1000, ctypes.cast(ctypes.c_buffer(0x1000), ctypes.c_wchar_p)) s = ULONG() winproxy.NtQuerySymbolicLinkObject(res, v, s) return v.Buffer
def listDevices(flags=0): """Return a list of serial numbers(default), descriptions or locations (Windows only) of the connected FTDI devices depending on value of flags""" n = _ft.DWORD() call_ft(_ft.FT_ListDevices, c.byref(n), None, _ft.DWORD(LIST_NUMBER_ONLY)) devcount = n.value if devcount: # since ctypes has no pointer arithmetic. bd = [c.c_buffer(MAX_DESCRIPTION_SIZE) for i in range(devcount)] +\ [None] # array of pointers to those strings, initially all NULL ba = (c.c_char_p *(devcount + 1))() for i in range(devcount): ba[i] = c.cast(bd[i], c.c_char_p) call_ft(_ft.FT_ListDevices, ba, c.byref(n), _ft.DWORD(LIST_ALL|flags)) return [res for res in ba[:devcount]] else: return None
def eeRead(self): """Get the program information from the EEPROM""" ## if self.devInfo['type'] == 4: ## version = 1 ## elif self.devInfo['type'] == 5: ## version = 2 ## else: ## version = 0 progdata = _ft.ft_program_data( Signature1=0, Signature2=0xffffffff, Version=2, Manufacturer = c.cast(c.c_buffer(256), c.c_char_p), ManufacturerId = c.cast(c.c_buffer(256), c.c_char_p), Description = c.cast(c.c_buffer(256), c.c_char_p), SerialNumber = c.cast(c.c_buffer(256), c.c_char_p)) call_ft(_ft.FT_EE_Read, self.handle, c.byref(progdata)) return progdata
def parse_events(data): """Parse data read from an inotify file descriptor into list of :attr:`~inotify_simple.Event` namedtuples (wd, mask, cookie, name). This function can be used if you have decided to call ``os.read()`` on the inotify file descriptor yourself, instead of calling :func:`~inotify_simple.INotify.read`. Args: data (bytes): A bytestring as read from an inotify file descriptor Returns: list: list of :attr:`~inotify_simple.Event` namedtuples""" events = [] offset = 0 buffer_size = len(data) while offset < buffer_size: wd, mask, cookie, namesize = struct.unpack_from(_EVENT_STRUCT_FORMAT, data, offset) offset += _EVENT_STRUCT_SIZE name = _fsdecode(ctypes.c_buffer(data[offset:offset + namesize], namesize).value) offset += namesize events.append(Event(wd, mask, cookie, name)) return events
def get_data(blob_out): cbData = int(blob_out.cbData) pbData = blob_out.pbData buffer = c_buffer(cbData) memcpy(buffer, pbData, cbData) LocalFree(pbData); return buffer.raw
def from_buffer_size(cls, buffer_size): buffer = ctypes.c_buffer(buffer_size) return cls.from_buffer(buffer)
def with_attributes(cls, attributes): """Create a new :class:`MessageAttribute` with ``attributes`` allocated :returns: :class:`MessageAttribute` """ size = cls._get_required_buffer_size(attributes) buffer = ctypes.c_buffer(size) self = cls.from_buffer(buffer) self.raw_buffer = buffer res = gdef.DWORD() winproxy.AlpcInitializeMessageAttribute(attributes, self, len(self.raw_buffer), res) return self
def computer_name(self): """The name of the computer :type: :class:`str` """ size = DWORD(0x1000) buf = ctypes.c_buffer(size.value) winproxy.GetComputerNameA(buf, ctypes.byref(size)) return buf[:size.value]
def windir(self): buffer = ctypes.c_buffer(0x100) reslen = winproxy.GetWindowsDirectoryA(buffer) return buffer[:reslen]
def _get_object_name(self): lh = self.local_handle size_needed = DWORD() yyy = ctypes.c_buffer(0x1000) winproxy.NtQueryObject(lh, ObjectNameInformation, ctypes.byref(yyy), ctypes.sizeof(yyy), ctypes.byref(size_needed)) return WinUnicodeString.from_buffer_copy(yyy[:size_needed.value]).str
def _get_object_type(self): lh = self.local_handle xxx = EPUBLIC_OBJECT_TYPE_INFORMATION() size_needed = DWORD() try: winproxy.NtQueryObject(lh, ObjectTypeInformation, ctypes.byref(xxx), ctypes.sizeof(xxx), ctypes.byref(size_needed)) except WindowsError as e: if e.code != STATUS_INFO_LENGTH_MISMATCH: # print("ERROR WITH {0:x}".format(lh)) raise size = size_needed.value buffer = ctypes.c_buffer(size) winproxy.NtQueryObject(lh, ObjectTypeInformation, buffer, size, ctypes.byref(size_needed)) xxx = EPUBLIC_OBJECT_TYPE_INFORMATION.from_buffer_copy(buffer) return xxx.TypeName.str
def name(self): """Name of the process :type: :class:`str` """ buffer = ctypes.c_buffer(0x1024) rsize = winproxy.GetProcessImageFileNameA(self.handle, buffer) # GetProcessImageFileNameA returns the fullpath return buffer[:rsize].decode().split("\\")[-1]
def token_user(self): buffer_size = self.get_required_information_size(TokenUser) buffer = ctypes.c_buffer(buffer_size) self.get_informations(TokenUser, buffer) return ctypes.cast(buffer, POINTER(TOKEN_USER))[0]
def _user_and_computer_name(self): tok_usr = self.token_user sid = tok_usr.User.Sid usernamesize = DWORD(0x1000) computernamesize = DWORD(0x1000) username = ctypes.c_buffer(usernamesize.value) computername = ctypes.c_buffer(computernamesize.value) peUse = SID_NAME_USE() winproxy.LookupAccountSidA(None, sid, username, usernamesize, computername, computernamesize, peUse) return username[:usernamesize.value], computername[:computernamesize.value]
def name(self): size = 0x1024 buffer = ctypes.c_buffer(size) res = windows.winproxy.GetWindowTextA(self.handle, buffer, size) return buffer[:res]
def at_point(cls, point): handle = winproxy.WindowFromPoint(point) return cls(handle) # I don't understand the interest: # Either return "" or C:\Python27\python.exe #def module(self): # size = 0x1024 # buffer = ctypes.c_buffer(size) # res = windows.winproxy.GetWindowModuleFileNameA(self.handle, buffer, size) # return buffer[:res]
def get_logical_drive_names(): size = 0x100 buffer = ctypes.c_buffer(size) rsize = winproxy.GetLogicalDriveStringsA(0x1000, buffer) return buffer[:rsize].rstrip("\x00").split("\x00")
def query_dos_device(name): size = 0x1000 buffer = ctypes.c_buffer(size) rsize = winproxy.QueryDosDeviceA(name, buffer, size) return buffer[:rsize].rstrip("\x00").split("\x00")
def entries(self): """Todo: better name ?""" path = self.fullname utf16_len = len(path) * 2 obj_attr = OBJECT_ATTRIBUTES() obj_attr.Length = ctypes.sizeof(obj_attr) obj_attr.RootDirectory = None obj_attr.ObjectName = pointer(LSA_UNICODE_STRING(utf16_len, utf16_len, path)) obj_attr.Attributes = OBJ_CASE_INSENSITIVE obj_attr.SecurityDescriptor = 0 obj_attr.SecurityQualityOfService = 0 res = HANDLE() x = winproxy.NtOpenDirectoryObject(res, DIRECTORY_QUERY | READ_CONTROL , obj_attr) size = 0x1000 buf = ctypes.c_buffer(size) rres = ULONG() ctx = ULONG() while True: try: winproxy.NtQueryDirectoryObject(res, buf, size, False, False, ctx, rres) break except windows.generated_def.ntstatus.NtStatusException as e: if e.code == STATUS_NO_MORE_ENTRIES: return {} if e.code == STATUS_MORE_ENTRIES: size *= 2 buf = ctypes.c_buffer(size) continue raise t = OBJECT_DIRECTORY_INFORMATION.from_buffer(buf) t = POBJECT_DIRECTORY_INFORMATION(t) res = {} for v in t: if v.Name.Buffer is None: break x = KernelObject(path, v.Name.Buffer, v.TypeName.Buffer) res[x.name] = x return res
def get_name(self, nametype=gdef.CERT_NAME_SIMPLE_DISPLAY_TYPE, flags=0): """Retrieve the subject or issuer name of the certificate. See ``CertGetNameStringA`` :returns: :class:`str` """ size = winproxy.CertGetNameStringA(self, nametype, flags, None, None, 0) namebuff = ctypes.c_buffer(size) size = winproxy.CertGetNameStringA(self, nametype, flags, None, namebuff, size) return namebuff[:-1]
def get_param(self, param_type, index=0): data_size = gdef.DWORD() # https://msdn.microsoft.com/en-us/library/windows/desktop/aa380227(v=vs.85).aspx winproxy.CryptMsgGetParam(self, param_type, index, None, data_size) buffer = ctypes.c_buffer(data_size.value) winproxy.CryptMsgGetParam(self, param_type, index, buffer, data_size) if param_type in self.MSG_PARAM_KNOW_TYPES: buffer = self.MSG_PARAM_KNOW_TYPES[param_type].from_buffer(buffer) if isinstance(buffer, gdef.DWORD): # DWORD -> return the Python int return buffer.value return buffer # Certificate accessors
def get_long_path(path): """Return the long path form for ``path`` :returns: :class:`str` """ size = 0x1000 buffer = ctypes.c_buffer(size) rsize = winproxy.GetLongPathNameA(path, buffer, size) return buffer[:rsize]
def get_short_path(path): """Return the short path form for ``path`` :returns: :class:`str` """ size = 0x1000 buffer = ctypes.c_buffer(size) rsize = winproxy.GetShortPathNameA(path, buffer, size) return buffer[:rsize]
def decompress_buffer(comptype, buffer, uncompress_size=None): if uncompress_size is None: uncompress_size = len(buffer) * 10 result_size = DWORD() uncompressed = ctypes.c_buffer(uncompress_size) windows.winproxy.RtlDecompressBuffer(comptype, uncompressed, uncompress_size, buffer, len(buffer), result_size) return uncompressed[:result_size.value] # sid.py + real SID type ?
def get_known_sid(sid_type): size = DWORD() try: windows.winproxy.CreateWellKnownSid(sid_type, None, None, size) except WindowsError: pass buffer = ctypes.c_buffer(size.value) windows.winproxy.CreateWellKnownSid(sid_type, None, buffer, size) return ctypes.cast(buffer, PSID)
def getDeviceInfoDetail(devnum=0): """Get an entry from the internal device info list. """ f = _ft.DWORD() t = _ft.DWORD() i = _ft.DWORD() l = _ft.DWORD() h = _ft.FT_HANDLE() n = c.c_buffer(MAX_DESCRIPTION_SIZE) d = c.c_buffer(MAX_DESCRIPTION_SIZE) createDeviceInfoList() call_ft(_ft.FT_GetDeviceInfoDetail, _ft.DWORD(devnum), c.byref(f), c.byref(t), c.byref(i), c.byref(l), n, d, c.byref(h)) return {'index': devnum, 'flags': f.value, 'type': t.value, 'id': i.value, 'location': l.value, 'serial': n.value, 'description': d.value, 'handle': h}
def read(self, nchars, raw=True): """Read up to nchars bytes of data from the device. Can return fewer if timedout. Use getQueueStatus to find how many bytes are available""" b_read = _ft.DWORD() b = c.c_buffer(nchars) call_ft(_ft.FT_Read, self.handle, b, nchars, c.byref(b_read)) return b.raw[:b_read.value] if raw else b.value[:b_read.value]
def getDeviceInfo(self): """Returns a dictionary describing the device. """ deviceType = _ft.DWORD() deviceId = _ft.DWORD() desc = c.c_buffer(MAX_DESCRIPTION_SIZE) serial = c.c_buffer(MAX_DESCRIPTION_SIZE) call_ft(_ft.FT_GetDeviceInfo, self.handle, c.byref(deviceType), c.byref(deviceId), serial, desc, None) return {'type': deviceType.value, 'id': deviceId.value, 'description': desc.value, 'serial': serial.value}
def __init__(self, ArcName=None, ArcNameW=u'', OpenMode=RAR_OM_LIST): self.CmtBuf = ctypes.c_buffer(64*1024) ctypes.Structure.__init__(self, ArcName=ArcName, ArcNameW=ArcNameW, OpenMode=OpenMode, _CmtBuf=ctypes.addressof(self.CmtBuf), CmtBufSize=ctypes.sizeof(self.CmtBuf))
def __init__(self): self.CmtBuf = ctypes.c_buffer(64*1024) ctypes.Structure.__init__(self, _CmtBuf=ctypes.addressof(self.CmtBuf), CmtBufSize=ctypes.sizeof(self.CmtBuf))
def struct(cls, ea, **sid): """Return the structure_t at address ``ea`` as a dict of ctypes. If the structure ``sid`` is specified, then use that specific structure type. """ ea = interface.address.within(ea) if any(n in sid for n in ('sid','struc','structure','id')): res = sid['sid'] if 'sid' in sid else sid['struc'] if 'struc' in sid else sid['structure'] if 'structure' in sid else sid['id'] if 'id' in sid else None sid = res.id if isinstance(res, structure.structure_t) else res else: sid = type.structure.id(ea) st = structure.instance(sid, offset=ea) typelookup = { (int,-1) : ctypes.c_int8, (int,1) : ctypes.c_uint8, (int,-2) : ctypes.c_int16, (int,2) : ctypes.c_uint16, (int,-4) : ctypes.c_int32, (int,4) : ctypes.c_uint32, (int,-8) : ctypes.c_int64, (int,8) : ctypes.c_uint64, (float,4) : ctypes.c_float, (float,8) : ctypes.c_double, } res = {} for m in st.members: t, val = m.type, read(m.offset, m.size) or '' try: ct = typelookup[t] except KeyError: ty, sz = t if isinstance(t, __builtin__.tuple) else (m.type, 0) if isinstance(t, __builtin__.list): t = typelookup[tuple(ty)] ct = t*sz elif ty in (chr,str): ct = ctypes.c_char*sz else: ct = None finally: res[m.name] = val if any(_ is None for _ in (ct,val)) else ctypes.cast(ctypes.pointer(ctypes.c_buffer(val)),ctypes.POINTER(ct)).contents return res
def _task_list(): psapi = ctypes.windll.psapi kernel = ctypes.windll.kernel32 hModule = ctypes.c_ulong() count = ctypes.c_ulong() modname = ctypes.c_buffer(30) PROCESS_QUERY_INFORMATION = 0x0400 PROCESS_VM_READ = 0x0010 pid_list = win32process.EnumProcesses() info_list = [] for pid in pid_list: hProcess = kernel.OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, pid) if hProcess: psapi.EnumProcessModules(hProcess, ctypes.byref(hModule), ctypes.sizeof(hModule), ctypes.byref(count)) psapi.GetModuleBaseNameA(hProcess, hModule.value, modname, ctypes.sizeof(modname)) pname = ctypes.string_at(modname) procmeminfo = win32process.GetProcessMemoryInfo(hProcess) procmemusage = (procmeminfo["WorkingSetSize"]/1024) info_list.append((pid, pname, procmemusage)) kernel.CloseHandle(hProcess) return info_list
def cfstring_to_string(value_string): value_length = carbon.CFStringGetLength(value_string) buffer_length = carbon.CFStringGetMaximumSizeForEncoding( value_length, kCFStringEncodingUTF8) buffer = ctypes.c_buffer(buffer_length + 1) result = carbon.CFStringGetCString(value_string, buffer, len(buffer), kCFStringEncodingUTF8) if not result: return return buffer.value
def getHardwareInformation(self): ''' Get information from the hardware''' model = c_buffer(255) softwareVersion = c_buffer(255) hardwareNotes = c_buffer(255) self.aptdll.GetHWInfo(self.SerialNum, model, 255, softwareVersion, 255, hardwareNotes, 255) hwinfo = [model.value, softwareVersion.value, hardwareNotes.value] return hwinfo
def test_memory_read(): data = 'A'*0x40 buf = ctypes.c_buffer(data) ea = ctypes.addressof(buf) z = provider.memory() z.seek(ea) if z.consume(len(data)) == data: raise Success raise Failure
def test_memory_write(): data = 'A'*0x40 buf = ctypes.c_buffer(data) ea = ctypes.addressof(buf) z = provider.memory() z.seek(ea) z.store('B'*len(data)) if buf.value == 'B'*len(data): raise Success raise Failure
def test_memory_readwrite(): data = 'A'*0x40 buf = ctypes.c_buffer(data) ea = ctypes.addressof(buf) z = provider.memory() z.seek(ea) z.store('B'*len(data)) z.seek(ea) if z.consume(len(data)) == 'B'*len(data): raise Success
def decrypt(cls, data, key): clen = cls.align_size(len(data)) ckey = c_buffer(key) cin = c_buffer(data) cout = c_buffer(clen) cls.LIB.decrypt(cin, clen, ckey, cout) return cout.raw