我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ctypes.addressof()。
def teb_base(self):
    """The address of the thread's TEB

        :type: :class:`int`

    When the current process is 32-bit and the owner process is 64-bit, the
    query goes through ``windows.syswow64`` and the result is decoded with
    the remote-64-bit variant of ``THREAD_BASIC_INFORMATION``; otherwise the
    regular ``NtQueryInformationThread`` proxy is used.

    :raises ValueError: if the scratch buffer is not aligned on 8 bytes.
    """
    if windows.current_process.bitness == 32 and self.owner.bitness == 64:
        restype = rctypes.transform_type_to_remote64bits(THREAD_BASIC_INFORMATION)
        ressize = ctypes.sizeof(restype)
        # Manual aligned allocation: an array of ULONGLONG is used as the
        # scratch buffer. Floor division is required here: a float length
        # (Python 3 true division) cannot be used to build a ctypes array.
        nb_qword = (ressize + 8) // ctypes.sizeof(ULONGLONG)
        buffer = (nb_qword * ULONGLONG)()
        struct_address = ctypes.addressof(buffer)
        if (struct_address & 0xf) not in [0, 8]:
            raise ValueError("ULONGLONG array not aligned on 8")
        windows.syswow64.NtQueryInformationThread_32_to_64(
            self.handle, ThreadBasicInformation, struct_address, ressize)
        # 'buffer' is still alive here, so reading through the address is safe.
        return restype(struct_address, windows.current_process).TebBaseAddress
    res = THREAD_BASIC_INFORMATION()
    windows.winproxy.NtQueryInformationThread(
        self.handle, ThreadBasicInformation, byref(res), ctypes.sizeof(res))
    return res.TebBaseAddress
def check_args(self, call_flags, shape, typekind, strides, length,
               bufsize, itemsize, offset=0):
    """Build an ``Exporter`` and check it against its ``ArrayInterface``.

    Bit 0 of ``call_flags`` selects ``typekind`` and bit 1 selects
    ``strides``; when a bit is clear the corresponding value is ``None``.
    """
    # NOTE: typekind_arg is selected but never consumed by the code below.
    typekind_arg = typekind if call_flags & 1 else None
    strides_arg = strides if call_flags & 2 else None

    exporter = Exporter(shape, itemsize=itemsize, strides=strides_arg)
    self.assertEqual(sizeof(exporter._data), bufsize)
    self.assertEqual(exporter.data,
                     ctypes.addressof(exporter._data) + offset)

    iface = ArrayInterface(exporter)
    self.assertEqual(iface.data, exporter.data)
    self.assertEqual(iface.itemsize, itemsize)
    self.assertEqual(tuple(iface.shape[0:iface.nd]), shape)
    self.assertEqual(tuple(iface.strides[0:iface.nd]), strides)
def __delitem__(self, i):
    """Delete element ``i`` by moving the following elements one slot left.

    Negative indices count from the end. Raises ``IndexError`` when the
    index falls outside the current size. Calls ``self.compact()`` when the
    preallocated size exceeds twice the new size.
    """
    count = self.size
    if i >= count:
        raise IndexError("list index out of range")
    if i < 0:
        i += count
        if i < 0:
            raise IndexError("list index out of range")

    # shift everything left by one
    base = ctypes.addressof(self.data)
    item_bytes = self._typesize
    dest = base + i * item_bytes
    ctypes.memmove(dest, dest + item_bytes, item_bytes * (count - i - 1))

    count -= 1
    self.size = count
    if self.prealloc_size > count * 2:
        self.compact()
def pcpfastExtractValues(result_p, vsetidx, vlistidx, dtype):
    """Quicker implementation of pmExtractValue than the default provided
    with the pcp python bindings.

    This version saves converting the C indexes to python and back again.

    Returns a tuple ``(outAtom.dref(dtype), inst.value)``.
    Raises ``pmapi.pmErr`` when the C call returns a negative status.
    """
    inst = c_int()
    outAtom = pmapi.pmAtomValue()
    status = LIBPCPFAST.pcpfastExtractValues(result_p, byref(inst), byref(outAtom), vsetidx, vlistidx, dtype)
    if status < 0:
        raise pmapi.pmErr(status)

    if dtype == c_api.PM_TYPE_STRING:
        # Get pointer to C string: copy the raw pointer stored in the 'cp'
        # field of outAtom into c_str so it can be freed afterwards.
        c_str = c_char_p()
        memmove(byref(c_str), addressof(outAtom) + pmapi.pmAtomValue.cp.offset, sizeof(c_char_p))
        # Convert to a python string and have result point to it
        # NOTE(review): relies on the field getter/setter round-trip; the
        # order of this line and the free below must not be swapped.
        outAtom.cp = outAtom.cp
        # Free the C string
        LIBC.free(c_str)
    return outAtom.dref(dtype), inst.value
def ppid(self):
    """Parent Process ID

        :type: :class:`int`
    """
    cross_bitness = (windows.current_process.bitness == 32
                     and self.bitness == 64)
    if not cross_bitness:
        information_type = 0
        info = PROCESS_BASIC_INFORMATION()
        winproxy.NtQueryInformationProcess(self.handle, information_type, info)
        return info.InheritedFromUniqueProcessId

    remote_type = windows.remotectypes.transform_type_to_remote64bits(PROCESS_BASIC_INFORMATION)
    # Plain byte buffer sized for the remote-64-bit structure
    data = (ctypes.c_char * ctypes.sizeof(remote_type))()
    windows.syswow64.NtQueryInformationProcess_32_to_64(
        self.handle,
        ProcessInformation=data,
        ProcessInformationLength=ctypes.sizeof(remote_type))
    # Map a remote64bits(PROCESS_BASIC_INFORMATION) at the address of 'data'
    info = remote_type(ctypes.addressof(data), windows.current_process)
    return info.InheritedFromUniqueProcessId
def virtual_protect(self, addr, size, protect, old_protect):
    """Change the access right of one or more page of the process

    :param addr: start address of the region.
    :param size: size of the region; in the 32 -> 64 bits path it is rounded
        up to the next multiple of 0x1000.
    :param protect: the new protection value.
    :param old_protect: ctypes object receiving the previous protection.
        In the 32 -> 64 bits path ``None`` is forwarded unchanged.
    :return: the result of the underlying call (both paths).
    """
    if windows.current_process.bitness == 32 and self.bitness == 64:
        if size & 0x0fff:
            size = ((size >> 12) + 1) << 12
        # The 32 -> 64 wrapper is called with raw addresses, not ctypes objects.
        # NOTE(review): assumes the wrapper tolerates None for the last
        # argument (a function of that name in this file does) -- confirm.
        if old_protect is not None:
            old_protect = ctypes.addressof(old_protect)
        # Keep xaddr / xsize referenced until the call returns.
        xaddr = ULONG64(addr)
        xsize = ULONG(size)
        return windows.syswow64.NtProtectVirtualMemory_32_to_64(
            self.handle, ctypes.addressof(xaddr), ctypes.addressof(xsize),
            protect, old_protect)
    # Return the result here too so both paths behave consistently.
    return winproxy.VirtualProtectEx(self.handle, addr, size, protect, old_protect)
def get_mapped_filename(self, addr):
    """The filename mapped at address ``addr`` or ``None``

        :rtype: :class:`str` or ``None``
    """
    buffer_size = 0x1000
    buffer = ctypes.c_buffer(buffer_size)

    cross_bitness = (windows.current_process.bitness == 32
                     and self.bitness == 64)
    if not cross_bitness:
        try:
            size = winproxy.GetMappedFileNameA(self.handle, addr, buffer, buffer_size)
        except winproxy.Kernel32Error:
            return None
        return buffer[:size]

    target_size = ctypes.c_buffer(buffer_size)
    ignored_codes = [STATUS_FILE_INVALID, STATUS_INVALID_ADDRESS, STATUS_TRANSACTION_NOT_ACTIVE]
    try:
        windows.syswow64.NtQueryVirtualMemory_32_to_64(
            self.handle, addr, MemorySectionName, buffer, buffer_size, target_size)
    except NtStatusException as e:
        # Only these status codes mean "nothing mapped here"
        if e.code not in ignored_codes:
            raise
        return None
    # Decode the 64-bit unicode string structure written at the start of 'buffer'
    remote_winstring = rctypes.transform_type_to_remote64bits(WinUnicodeString)
    mapped_filename = remote_winstring(ctypes.addressof(buffer), windows.current_process)
    return mapped_filename.str
def modules(self):
    """The loaded modules present in the PEB

        :type: [:class:`LoadedModule`] -- List of loaded modules
    """
    entries = []
    flink = self.Ldr.contents.InMemoryOrderModuleList.Flink
    while True:
        entry = ctypes.cast(flink, LIST_ENTRY_PTR).TO_LDR_ENTRY()
        # An entry whose DllBase is null/zero ends the walk
        if not entry.DllBase:
            break
        entries.append(entry)
        flink = entry.InMemoryOrderLinks.Flink
    return [LoadedModule.from_address(addressof(entry)) for entry in entries]

# Memory stuff
def xfer(self, txbuff=None):
    """Send ``txbuff`` through an ioctl on ``self.fd`` and return the bytes
    read back.

    :param txbuff: sequence of byte values to transmit (required despite
        the ``None`` default).
    :return: ``bytearray`` of ``len(txbuff)`` received bytes.
    :raises TypeError: if ``txbuff`` is ``None``.
    """
    if txbuff is None:
        # Same exception type that len(None) raised, with a usable message.
        raise TypeError("txbuff is required")
    length = len(txbuff)
    if PYTHON_MAJOR >= 3:
        _txbuff = bytes(txbuff)
    else:
        _txbuff = str(bytearray(txbuff))
    _txptr = ctypes.create_string_buffer(_txbuff)
    _rxptr = ctypes.create_string_buffer(length)

    # Field widths: 64 64 32 32 16 8 8 32 bits = 32 bytes.
    # 'I' (unsigned int) is used for the 32-bit fields: native 'L' is
    # 8 bytes on LP64 platforms, which would produce a 48-byte structure.
    data = struct.pack("QQIIHBBI",
                       ctypes.addressof(_txptr),
                       ctypes.addressof(_rxptr),
                       length,
                       self.speed,
                       0,  # delay
                       self.bits,
                       0,  # cs_change,
                       0   # pad
                       )

    fcntl.ioctl(self.fd, SPI_IOC_MESSAGE(len(data)), data)
    _rxbuff = ctypes.string_at(_rxptr, length)
    return bytearray(_rxbuff)
def next(self):
    """Return the next event in the trail."""
    event = lib.tdb_cursor_next(self.cursor)
    if not event:
        raise StopIteration()

    # View the event's items as an array of num_items tdb_item values
    item_array_type = tdb_item * event.contents.num_items
    items = item_array_type.from_address(addressof(event.contents.items))

    if self.parsetime:
        timestamp = datetime.fromtimestamp(event.contents.timestamp)
    else:
        timestamp = event.contents.timestamp

    if self.only_timestamp:
        return timestamp
    if self.valuefun:
        return self.cls(timestamp, *(self.valuefun(item) for item in items))
    return self.cls(timestamp, *items)
def arg_split(commandline, posix=False, strict=True):
    """Split a command line's arguments in a shell-like manner.

    This is a special version for windows that uses a ctypes call to
    CommandLineToArgvW to do the argv splitting. The posix parameter is
    ignored.

    If strict=False, process_common.arg_split(...strict=False) is used
    instead.
    """
    # CommandLineToArgvW returns path to executable if called with empty string.
    if commandline.strip() == "":
        return []
    if not strict:
        # not really a cl-arg, fallback on _process_common
        return py_arg_split(commandline, posix=posix, strict=strict)

    argc = c_int()
    argv_pointer = CommandLineToArgvW(
        py3compat.cast_unicode(commandline.lstrip()), ctypes.byref(argc))
    argv_type = LPCWSTR * argc.value
    argv = argv_type.from_address(ctypes.addressof(argv_pointer.contents))
    # Materialise the arguments before the native array is released
    result = list(argv)
    LocalFree(argv_pointer)
    return result
def save(blenderobject, path):
    """Wrap ``blenderobject.data`` in a ``Mesh``, pass its buffers to
    ``dotmesh`` and print the elapsed time."""
    cmesh = Mesh(blenderobject.data)
    started = time.time()
    address_of = ctypes.addressof
    dotmesh(
        path,
        address_of(cmesh.faces),
        address_of(cmesh.faces_smooth),
        address_of(cmesh.faces_material_index),
        address_of(cmesh.vertex_positions),
        address_of(cmesh.vertex_normals),
        cmesh.numFaces,
        cmesh.numVerts,
    )
    elapsed = time.time() - started
    print('Mesh dumped in %s seconds' % (elapsed))

## Make sure we can import from same directory
def create_read_console(get_text):
    """Return a console-read callback that feeds text from ``get_text``.

    Text that does not fit into one callback buffer is kept and delivered
    on the following calls before ``get_text`` is asked again.
    """
    pending = [None]  # one-element list so the closure can rebind the value

    def _read_console(p, buf, buflen, add_history):
        if not pending[0]:
            text = get_text(p.decode(ENCODING), add_history)
            if text is None:
                return 0
            pending[0] = text.encode(ENCODING)

        # View the callback buffer as a writable array of buflen chars
        target = (ctypes.c_char * buflen).from_address(
            ctypes.addressof(buf.contents))
        room = buflen - 2  # space kept for the b'\n\0' terminator
        nb = min(len(pending[0]), room)
        target[:nb] = pending[0][:nb]
        if nb < room:
            target[nb:(nb + 2)] = b'\n\0'
        pending[0] = pending[0][nb:]
        return 1

    return _read_console
def nprocessors():
    """Return the number of processors, or 1 if it cannot be determined.

    First tries ``sysctlbyname('hw.ncpu')`` through libc (Mac OS); if that is
    unavailable or fails, counts the ``processor:`` entries of
    ``/proc/cpuinfo`` (Cygwin and Linuxes).
    """
    try:
        # Mac OS
        libc = ctypes.cdll.LoadLibrary(ctypes.util.find_library('libc'))
        v = ctypes.c_int(0)
        size = ctypes.c_size_t(ctypes.sizeof(v))
        # The name must be a byte string to be passed as 'char *', and the
        # out-parameters are passed by reference: a bare integer address
        # would be converted to a C int and truncated on 64-bit platforms.
        status = libc.sysctlbyname(b'hw.ncpu', ctypes.byref(v),
                                   ctypes.byref(size), None, 0)
        if status == 0:
            return v.value
    except Exception:
        # Fall through to the /proc/cpuinfo strategy below.
        pass

    try:
        # Cygwin (Windows) and Linuxes
        # Could try sysconf(_SC_NPROCESSORS_ONLN) (LSB) next. Instead, count
        # processors in cpuinfo.
        with open('/proc/cpuinfo', 'r') as cpuinfo:
            s = cpuinfo.read()
        return s.replace(' ', '').replace('\t', '').count('processor:')
    except Exception:
        return 1
def marshal(self):
    "serializes this Message into the wire protocol format and returns a bytearray."
    raw_ptr = ct.POINTER(ct.c_ubyte)()
    raw_len = ct.c_int()
    ok = dbus.dbus_message_marshal(self._dbobj, ct.byref(raw_ptr), ct.byref(raw_len))
    if not ok:
        raise CallFailed("dbus_message_marshal")
    size = raw_len.value
    result = bytearray(size)
    # Copy the library-owned bytes into our own bytearray before freeing them
    ct.memmove(
        ct.addressof((ct.c_ubyte * size).from_buffer(result)),
        raw_ptr,
        size,
    )
    dbus.dbus_free(raw_ptr)
    return result
def demarshal(celf, buf, error=None):
    """deserializes a bytes or array-of-bytes object from the wire protocol
    format into a Message object.

    Raises ``TypeError`` when ``buf`` is not ``bytes``, ``bytearray`` or an
    ``array.array`` with typecode ``"B"``. Returns ``None`` when the library
    call returns no message.
    """
    error, my_error = _get_error(error)
    if isinstance(buf, bytes):
        base_address = ct.cast(buf, ct.c_void_p).value
    elif isinstance(buf, bytearray):
        base_address = ct.addressof((ct.c_ubyte * len(buf)).from_buffer(buf))
    elif isinstance(buf, array.array) and buf.typecode == "B":
        base_address = buf.buffer_info()[0]
    else:
        raise TypeError("buf is not bytes, bytearray or array.array of bytes")
    message = dbus.dbus_message_demarshal(base_address, len(buf), error._dbobj)
    my_error.raise_if_set()
    if message is None:
        return message
    return celf(message)
def buffer_info(self):
    """Return ``(address of self.buffer, self.shape[0])``."""
    base_address = addressof(self.buffer)
    first_dim = self.shape[0]
    return (base_address, first_dim)
def _get_buffer(self, view, flags):
    """Fill ``view`` from this object according to the request ``flags``.

    Raises ``BufferError`` when the request cannot be honoured (writable
    view of read-only data, contiguity not satisfied, or shape/strides
    needed but not requested).
    """
    from ctypes import addressof

    def requested(mask):
        # True when every bit of 'mask' is set in 'flags'
        return (flags & mask) == mask

    if requested(PyBUF_WRITABLE) and self.readonly:
        raise BufferError("buffer is read-only")
    if requested(PyBUF_C_CONTIGUOUS) and not self.is_contiguous('C'):
        raise BufferError("data is not C contiguous")
    if requested(PyBUF_F_CONTIGUOUS) and not self.is_contiguous('F'):
        raise BufferError("data is not F contiguous")
    if requested(PyBUF_ANY_CONTIGUOUS) and not self.is_contiguous('A'):
        raise BufferError("data is not contiguous")

    view.buf = self.buf
    view.readonly = self.readonly
    view.len = self.len
    # No flag other than (possibly) PyBUF_WRITABLE was given
    if (flags | PyBUF_WRITABLE) == PyBUF_WRITABLE:
        view.ndim = 0
    else:
        view.ndim = self.ndim
    view.itemsize = self.itemsize

    view.format = addressof(self._format) if requested(PyBUF_FORMAT) else None

    if requested(PyBUF_ND):
        view.shape = addressof(self._shape)
    elif self.is_contiguous('C'):
        view.shape = None
    else:
        raise BufferError(
            "shape required for {} dimensional data".format(self.ndim))

    if requested(PyBUF_STRIDES):
        view.strides = addressof(self._strides)
    elif view.shape is None or self.is_contiguous('C'):
        view.strides = None
    else:
        raise BufferError("strides required for none C contiguous data")

    view.suboffsets = None
    view.internal = None
    view.obj = self
def test_attributes(self):
    """Check the attributes an ``Exporter`` exposes for several inputs."""
    shape4 = (13, 5, 11, 3)
    strides4 = (440, 88, 8, 2)

    # Explicit format and strides given as tuples
    a = Exporter(shape4, '=h', strides4)
    self.assertEqual(a.ndim, 4)
    self.assertEqual(a.itemsize, 2)
    self.assertFalse(a.readonly)
    self.assertEqual(a.shape, shape4)
    self.assertEqual(a.format, '=h')
    self.assertEqual(a.strides, strides4)
    self.assertEqual(a.len, 4290)
    self.assertEqual(a.buflen, 5720)
    self.assertEqual(a.buf, ctypes.addressof(a._buf))

    # Shape only
    a = Exporter((8,))
    self.assertEqual(a.ndim, 1)
    self.assertEqual(a.itemsize, 1)
    self.assertFalse(a.readonly)
    self.assertEqual(a.shape, (8,))
    self.assertEqual(a.format, 'B')
    self.assertTrue(isinstance(a.strides, tuple))
    self.assertEqual(a.strides, (1,))
    self.assertEqual(a.len, 8)
    self.assertEqual(a.buflen, 8)

    # Lists are accepted and exposed as tuples
    a = Exporter(list(shape4), '=h', list(strides4))
    self.assertTrue(isinstance(a.shape, tuple))
    self.assertTrue(isinstance(a.strides, tuple))
    self.assertEqual(a.shape, shape4)
    self.assertEqual(a.strides, strides4)
def capsule_new(p):
    """Call ``PyCapsule_New`` with the address of ``p`` and two ``None``
    arguments, returning its result."""
    address = addressof(p)
    return PyCapsule_New(address, None, None)
def capsule_new(p):
    """Call ``PyCObject_FromVoidPtr`` with the address of ``p`` and ``None``,
    returning its result."""
    address = addressof(p)
    return PyCObject_FromVoidPtr(address, None)
def test_byteswapped(self):
    """An unaligned-byte-order ``Array`` uses the swapped ctype and stores
    its data in reversed byte order."""
    a = Array((1,), 'u', 4, flags=(PAI_ALIGNED | PAI_WRITEABLE))
    ct = a._ctype
    self.assertTrue(ct is not c_uint32)
    # The expected ctype is the one opposite to the host byte order
    if sys.byteorder == 'little':
        swapped = c_uint32.__ctype_be__
    else:
        swapped = c_uint32.__ctype_le__
    self.assertTrue(ct is swapped)

    i = 0xa0b0c0d
    n = c_uint32(i)
    a[0] = i
    self.assertEqual(a[0], i)
    native_bytes_reversed = cast(addressof(n), POINTER(c_uint8))[3:-1:-1]
    self.assertEqual(a._data[0:4], native_bytes_reversed)
def NEWBUF_test_bad_format(self):
    """Importing with ``PyBUF_FORMAT`` raises ``ValueError`` for each of the
    listed format strings."""
    from pygame.bufferproxy import BufferProxy
    from pygame.newbuffer import BufferMixin
    from ctypes import create_string_buffer, addressof

    buftools = self.buftools
    Exporter = buftools.Exporter
    Importer = buftools.Importer
    PyBUF_FORMAT = buftools.PyBUF_FORMAT

    bad_formats = ['', '=', '1', ' ', '2h', '=2h', '0x', '11x', '=!',
                   'h ', ' h', 'hh', '?']
    for fmt in bad_formats:
        exporter = Exporter((1,), fmt, itemsize=2)
        proxy = BufferProxy(exporter)
        self.assertRaises(ValueError, Importer, proxy, PyBUF_FORMAT)
def _get_own_repr(self):
    """Return ``self._addr_repr`` applied to the address of ``self._blob``."""
    blob_address = ctypes.addressof(self._blob)
    return self._addr_repr(blob_address)
def _convert_to_address(self, BClass):
    """Return the address of ``self._blob`` when ``BClass._BItem`` is this
    object's class; otherwise defer to ``CTypesData._convert_to_address``."""
    item_class = getattr(BClass, '_BItem', None)
    if item_class is not self.__class__:
        return CTypesData._convert_to_address(self, BClass)
    return ctypes.addressof(self._blob)
def write_variable(self, BType, name, value):
    """Overwrite the variable ``name`` of ``self.cdll`` with ``value``.

    ``value`` is converted with ``BType._to_ctypes`` and its bytes are copied
    over the in-library object of type ``BType._ctype``.
    """
    source = BType._to_ctypes(value)
    target = BType._ctype.in_dll(self.cdll, name)
    byte_count = ctypes.sizeof(BType._ctype)
    ctypes.memmove(ctypes.addressof(target),
                   ctypes.addressof(source),
                   byte_count)
def read(cls, byte_object):
    """Create a ``cls`` instance and fill it from ``byte_object``.

    At most ``ctypes.sizeof(cls)`` bytes are copied; a shorter input leaves
    the remaining bytes of the new instance untouched.
    """
    instance = cls()
    byte_count = min(len(byte_object), ctypes.sizeof(cls))
    ctypes.memmove(ctypes.addressof(instance), bytes(byte_object), byte_count)
    return instance

# Mixin to allow conversion of a ctypes structure to and from a dictionary.
def load_settings(cls, b):
    """Create a ``cls`` instance and copy ``b`` into it, starting at its
    ``start_of_settings`` field.

    The copy is limited to the bytes that remain between that field and the
    end of the instance, so an oversized ``b`` is truncated rather than
    written past the end of the structure.

    :param b: the settings bytes.
    :return: the new instance.
    """
    a = cls()
    destination = ctypes.addressof(a.start_of_settings)
    # Bytes available from the field to the end of 'a'. Bounding by the
    # whole sizeof(a) would overflow whenever the field is not at offset 0.
    room = ctypes.sizeof(a) - (destination - ctypes.addressof(a))
    ctypes.memmove(destination, bytes(b), min(len(b), room))
    return a
def read(cls, byte_object):
    """Create a ``cls`` instance from ``byte_object``.

    ``body_length`` is set to the input length minus ``sizeof(Command)``
    before at most ``sizeof(cls)`` bytes are copied over the instance.
    """
    instance = cls()
    total = len(byte_object)
    instance.body_length = total - ctypes.sizeof(Command)
    ctypes.memmove(ctypes.addressof(instance),
                   bytes(byte_object),
                   min(total, ctypes.sizeof(cls)))
    return instance
def __bytes__(self):
    """Return ``body_length + sizeof(Command)`` bytes read from the start of
    this object."""
    # NOTE(review): nothing here checks that the length fits inside this
    # object's own memory -- confirm body_length is always consistent.
    total = self.body_length + ctypes.sizeof(Command)
    scratch = ctypes.create_string_buffer(total)
    ctypes.memmove(ctypes.addressof(scratch), ctypes.addressof(self), total)
    return bytes(scratch)
def __new__(self):
    """Create the instance and preload ``set_settings_request`` with a fixed
    byte template."""
    b = "00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00"\
        " 00 00 00 00 00 01 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00"\
        " 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00"\
        " 00 00 00 00 00 00 00"
    # Each space-separated token is one byte written in hexadecimal
    template = bytes(int(token, 16) for token in b.split(" "))
    instance = super().__new__(self)
    ctypes.memmove(ctypes.addressof(instance.set_settings_request),
                   template,
                   len(template))
    return instance

# Should equal 2 for off, 1 for on.
def load_payload(self, b):
    """Copy ``b`` into ``self.data_reply.data`` and update the lengths.

    ``body_length`` becomes ``len(b) + 8`` and ``command.packet_length`` is
    set to the same value.
    """
    payload_length = len(b)
    self.body_length = payload_length + 8  # II from position, length
    # NOTE(review): no check that the payload fits in data_reply.data --
    # confirm callers bound it.
    destination = ctypes.addressof(self.data_reply.data)
    ctypes.memmove(destination, bytes(b), payload_length)
    self.command.packet_length = self.body_length
def get_attribute(self, attribute):
    """Return the structure for ``attribute`` located after this object.

    Walks ``ATTRIBUTE_BY_FLAG`` in order, skipping over the size of every
    allocated attribute that precedes the requested one.

    Raises ``ValueError`` if the attribute is not allocated or not found.
    """
    if not self.is_allocated(attribute):
        raise ValueError("Cannot get non-allocated attribute <{0}>".format(attribute))
    # Attributes start right after this structure
    offset = ctypes.sizeof(self)
    for flag, attr_type in self.ATTRIBUTE_BY_FLAG:
        if flag == attribute:
            return attr_type.from_address(ctypes.addressof(self) + offset)
        if self.is_allocated(flag):
            offset += ctypes.sizeof(attr_type)
    raise ValueError("ALPC Attribute <{0}> not found :(".format(attribute))
def get_raw(self):
    """Return the value of a ``DWORD`` read at this object's address."""
    raw_view = DWORD.from_address(ctypes.addressof(self))
    return raw_view.value
def set_raw(self, value):
    """Write ``value`` through a ``DWORD`` view at this object's address.

    Returns ``None``.
    """
    raw_view = DWORD.from_address(ctypes.addressof(self))
    raw_view.value = value
    return None
def EEFlags(self):
    """Enhanced view of the Eflags (you also have ``EFlags`` for the raw value)

        :type: :class:`EEflags`
    """
    field_offset = type(self).EFlags.offset
    view = EEflags.from_address(ctypes.addressof(self) + field_offset)
    # The view stores a reference to its parent object
    view.self = self
    return view
def EDr7(self):
    """Enhanced view of the DR7 register (you also have ``Dr7`` for the raw value)

        :type: :class:`EDr7`
    """
    field_offset = type(self).Dr7.offset
    view = EDr7.from_address(ctypes.addressof(self) + field_offset)
    # The view stores a reference to its parent object
    view.self = self
    return view
def get_DataDirectory(self):
    """Return the data-directory array located at the end of the optional
    header."""
    # Simply returning self.get_OptionalHeader().DataDirectory won't work
    # if we load a PE32 in a 64bit process (PE32 .NET...), so the array is
    # located from SizeOfOptionalHeader instead.
    directory_array_type = IMAGE_DATA_DIRECTORY * IMAGE_NUMBEROF_DIRECTORY_ENTRIES
    optional_header_size = self.get_NT_HEADER().FileHeader.SizeOfOptionalHeader
    if self.target is not None:
        optional_header_addr = self.get_NT_HEADER().OptionalHeader._base_addr
    else:
        optional_header_addr = ctypes.addressof(self.get_NT_HEADER().OptionalHeader)
    # The directory array occupies the last bytes of the optional header
    directory_addr = (optional_header_addr + optional_header_size
                      - ctypes.sizeof(directory_array_type))
    return self.transformers.create_structure_at(directory_array_type, directory_addr)
def sections(self):
    """Return one ``PESection`` per section header.

    The section headers start right after the optional header and are
    ``sizeof(IMAGE_SECTION_HEADER)`` bytes apart.
    """
    nt_header = self.get_NT_HEADER()
    section_count = nt_header.FileHeader.NumberOfSections
    optional_header_size = self.get_NT_HEADER().FileHeader.SizeOfOptionalHeader
    if self.target is not None:
        optional_header_addr = self.get_NT_HEADER().OptionalHeader._base_addr
    else:
        optional_header_addr = ctypes.addressof(self.get_NT_HEADER().OptionalHeader)
    first_section_addr = optional_header_addr + optional_header_size
    result = []
    for i in range(section_count):
        section_addr = first_section_addr + (sizeof(IMAGE_SECTION_HEADER) * i)
        result.append(PESection.create(self, section_addr))
    return result
def NtProtectVirtualMemory_32_to_64(ProcessHandle, BaseAddress, NumberOfBytesToProtect, NewAccessProtection, OldAccessProtection=None):
    """Forward the call to ``NtProtectVirtualMemory_32_to_64.ctypes_function``.

    When ``OldAccessProtection`` is ``None`` the address of a local scratch
    ``DWORD`` is passed instead.
    """
    if OldAccessProtection is None:
        # The local name keeps the scratch DWORD alive until the call returns
        scratch_old_protection = DWORD()
        OldAccessProtection = ctypes.addressof(scratch_old_protection)
    native_call = NtProtectVirtualMemory_32_to_64.ctypes_function
    return native_call(ProcessHandle, BaseAddress, NumberOfBytesToProtect,
                       NewAccessProtection, OldAccessProtection)
def x86_cpuid(req):
    """Performs a CPUID in 32bits mode

        :rtype: :class:`X86CpuidResult`
    """
    result = X86CpuidResult()
    # do_cpuid32 receives the address of the result structure
    result_address = ctypes.addressof(result)
    do_cpuid32(req, result_address)
    return result
def x64_cpuid(req):
    """Performs a CPUID in 64bits mode

        :rtype: :class:`X86CpuidResult`
    """
    wide_result = X64CpuidResult()
    do_cpuid64(req, ctypes.addressof(wide_result))
    # For now assembler cannot do 32bits register in x64
    registers = (wide_result.RAX, wide_result.RBX,
                 wide_result.RCX, wide_result.RDX)
    return X86CpuidResult(*registers)
def raw_value(self):
    """Return the value of a ``c_ulong`` read at this object's address."""
    # Bypass our own 'value' implementation
    # Even if we are a subclass of c_ulonglong
    # NOTE(review): the read uses c_ulong, not c_ulonglong -- confirm the
    # intended width.
    raw_view = ctypes.c_ulong.from_address(ctypes.addressof(self))
    return raw_view.value
def get_kernel_modules():
    """Return the list of ``SYSTEM_MODULE`` reported by
    ``NtQuerySystemInformation(SystemModuleInformation)``.

    Delegates to ``get_kernel_modules_syswow64`` when the current process is
    a WoW64 process.
    """
    if windows.current_process.is_wow_64:
        return get_kernel_modules_syswow64()
    cbsize = DWORD()
    # First call only retrieves the required buffer size
    winproxy.NtQuerySystemInformation(SystemModuleInformation, None, 0, byref(cbsize))
    raw_buffer = (cbsize.value * c_char)()
    buffer = SYSTEM_MODULE_INFORMATION.from_address(ctypes.addressof(raw_buffer))
    winproxy.NtQuerySystemInformation(SystemModuleInformation, byref(raw_buffer), sizeof(raw_buffer), byref(cbsize))
    # Copy the module array into memory owned by the array object itself:
    # a from_address() view would point into 'raw_buffer', which is released
    # when this function returns, leaving the returned structures dangling.
    modules_type = SYSTEM_MODULE * buffer.ModulesCount
    modules = modules_type.from_buffer_copy(raw_buffer, SYSTEM_MODULE_INFORMATION.Modules.offset)
    return list(modules)
def get_kernel_modules_syswow64():
    """Return the list of ``SYSTEM_MODULE64`` reported by the 32 -> 64 bits
    ``NtQuerySystemInformation(SystemModuleInformation)`` call."""
    cbsize = DWORD()
    # First call only retrieves the required buffer size
    windows.syswow64.NtQuerySystemInformation_32_to_64(SystemModuleInformation, None, 0, ctypes.addressof(cbsize))
    raw_buffer = (cbsize.value * c_char)()
    buffer = SYSTEM_MODULE_INFORMATION64.from_address(ctypes.addressof(raw_buffer))
    windows.syswow64.NtQuerySystemInformation_32_to_64(SystemModuleInformation, byref(raw_buffer), sizeof(raw_buffer), byref(cbsize))
    # Copy the module array into memory owned by the array object itself:
    # a from_address() view would point into 'raw_buffer', which is released
    # when this function returns, leaving the returned structures dangling.
    modules_type = SYSTEM_MODULE64 * buffer.ModulesCount
    modules = modules_type.from_buffer_copy(raw_buffer, SYSTEM_MODULE_INFORMATION64.Modules.offset)
    return list(modules)

# String stuff
def __init__(self):
    """Check ``IMPLEMENT``, build the vtable and expose the address of the
    vtable pointer as ``_as_parameter_``."""
    self.verify_implem(self.IMPLEMENT)
    self.vtable, self.implems = self._create_vtable(self.IMPLEMENT)
    # The pointer is stored on self, so the address taken below stays valid
    # for as long as this object is alive.
    self.vtable_pointer = ctypes.pointer(self.vtable)
    self._as_parameter_ = ctypes.addressof(self.vtable_pointer)
def readBytes(self, address, size):
    """Read ``size`` bytes at ``address`` through ``ptrace_io`` and return
    them as the raw content of the receiving buffer."""
    debug("Read %s bytes at %s" % (size, formatAddress(address)))
    received = create_string_buffer(size)
    request = ptrace_io_desc(
        piod_op=PIOD_READ_D,
        piod_offs=address,
        piod_addr=addressof(received),
        piod_len=size,
    )
    ptrace_io(self.pid, request)
    return received.raw