我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ctypes.string_at()。
def asString(self): return ctypes.string_at(ctypes.addressof(self),ctypes.sizeof(self))
def inet_pton(address_family, ip_string): if address_family == socket.AF_INET: return socket.inet_aton(ip_string) addr = sockaddr() addr.sa_family = address_family addr_size = ctypes.c_int(ctypes.sizeof(addr)) if WSAStringToAddressA( ip_string, address_family, None, ctypes.byref(addr), ctypes.byref(addr_size) ) != 0: raise socket.error(ctypes.FormatError()) if address_family == socket.AF_INET6: return ctypes.string_at(addr.ipv6_addr, 16) raise socket.error('unknown address family')
def xfer(self, txbuff=None): length = len(txbuff) if PYTHON_MAJOR >= 3: _txbuff = bytes(txbuff) _txptr = ctypes.create_string_buffer(_txbuff) else: _txbuff = str(bytearray(txbuff)) _txptr = ctypes.create_string_buffer(_txbuff) _rxptr = ctypes.create_string_buffer(length) data = struct.pack("QQLLHBBL", #64 64 32 32 16 8 8 32 b = 32B ctypes.addressof(_txptr), ctypes.addressof(_rxptr), length, self.speed, 0, #delay self.bits, 0, # cs_change, 0 # pad ) fcntl.ioctl(self.fd, SPI_IOC_MESSAGE(len(data)), data) _rxbuff = ctypes.string_at(_rxptr, length) return bytearray(_rxbuff)
def utf16tostr(addr, size=-1): """Read UTF-16 string from memory and encode as python string.""" cb = size if size > 0 else 32 bstr = ctypes.string_at(addr, cb) if size >= 0: return bstr.decode('utf-16le') # lookup zero char chunks = [] while True: found = cb for i in range(0, cb, 2): c = bstr[i] if c == 0x00: found = i break pass assert found % 2 == 0, "truncated string with len " + str(found) chunks.append(bstr[0:found].decode('utf-16le')) if found != cb: break addr = addr + cb bstr = ctypes.string_at(addr, cb) continue return "".join(chunks)
def inet_pton(address_family, ip_string): addr = sockaddr() addr.sa_family = address_family addr_size = ctypes.c_int(ctypes.sizeof(addr)) if WSAStringToAddressA( ip_string, address_family, None, ctypes.byref(addr), ctypes.byref(addr_size) ) != 0: raise socket.error(ctypes.FormatError()) if address_family == socket.AF_INET: return ctypes.string_at(addr.ipv4_addr, 4) if address_family == socket.AF_INET6: return ctypes.string_at(addr.ipv6_addr, 16) raise socket.error('unknown address family')
def _callback_write(self, buf, key=PiVideoFrameType.frame): """ Overridden to strip alpha bytes when required. """ if self._strip_alpha: s = ct.string_at(buf[0].data, buf[0].length) s = bytearray(s) del s[3::4] # All this messing around with buffers is to work around some issue # with MMAL or ctypes (I'm not sure which is at fault). Anyway, the # upshot is that if you fiddle with buf[0].data in any way # whatsoever (even if you make every attempt to restore its value # afterward), mmal_port_disable locks up when we call it in stop() new_buf = mmal.MMAL_BUFFER_HEADER_T.from_buffer_copy(buf[0]) new_buf.length = len(s) new_buf.data = ct.pointer(ct.c_uint8.from_buffer(s)) return super(PiRawMixin, self)._callback_write(ct.pointer(new_buf), key) else: return super(PiRawMixin, self)._callback_write(buf, key)
def update(self, dt): sample, self.sample = self.sample, None if sample is None: return try: buf = sample.get_buffer() result, mapinfo = buf.map(Gst.MapFlags.READ) addr = mapinfo.__hash__() c_mapinfo = _MapInfo.from_address(addr) sbuf = string_at(c_mapinfo.data, mapinfo.size) self.texture.blit_buffer(sbuf, colorfmt = 'rgb') finally: if mapinfo is not None: buf.unmap(mapinfo) self.image.canvas.ask_update()
def TraceThreadProc(self): """ Return a list of traces, corresponding to the list of required idx """ while self.PLCStatus == "Started": tick = ctypes.c_uint32() size = ctypes.c_uint32() buff = ctypes.c_void_p() TraceBuffer = None if self.PLClibraryLock.acquire(False): if self._GetDebugData(ctypes.byref(tick), ctypes.byref(size), ctypes.byref(buff)) == 0: if size.value: TraceBuffer = ctypes.string_at(buff.value, size.value) self._FreeDebugData() self.PLClibraryLock.release() if TraceBuffer is not None: self._TracesPush((tick.value, TraceBuffer)) self._TracesAutoSuspend() self._TracesFlush()
def asString(self): return ctypes.string_at(ctypes.addressof(self),ctypes.sizeof(self)) #################################################################################### # # SDDS Payload Containers # ####################################################################################
def surf_same_image(a, b): """Return True if a's pixel buffer is identical to b's""" a_sz = a.get_height() * a.get_pitch() b_sz = b.get_height() * b.get_pitch() if a_sz != b_sz: return False a_bytes = ctypes.string_at(a._pixels_address, a_sz) b_bytes = ctypes.string_at(b._pixels_address, b_sz) return a_bytes == b_bytes
def string_result(result, func, arguments): """Errcheck function. Returns a string and frees the original pointer. It assumes the result is a char *. """ if result: # make a python string copy s = bytes_to_str(ctypes.string_at(result)) # free original string ptr libvlc_free(result) return s return None
def check_string(result, func, cargs): if result: s = string_at(result) free(result) else: s = '' return s.decode(GEOIP_DEFAULT_ENCODING)
def check_string(result, func, cargs, offset=-1, str_result=False): """ Checks the string output returned from the given function, and frees the string pointer allocated by OGR. The `str_result` keyword may be used when the result is the string pointer, otherwise the OGR error code is assumed. The `offset` keyword may be used to extract the string pointer passed in by-reference at the given slice offset in the function arguments. """ if str_result: # For routines that return a string. ptr = result if not ptr: s = None else: s = string_at(result) else: # Error-code return specified. check_err(result) ptr = ptr_byref(cargs, offset) # Getting the string value s = ptr.value # Correctly freeing the allocated memory behind GDAL pointer # with the VSIFree routine. if ptr: lgdal.VSIFree(ptr) return s # ### DataSource, Layer error-checking ### # ### Envelope checking ###
def wkb(self): "Returns the WKB representation of the Geometry." if sys.byteorder == 'little': byteorder = 1 # wkbNDR (from ogr_core.h) else: byteorder = 0 # wkbXDR sz = self.wkb_size # Creating the unsigned character buffer, and passing it in by reference. buf = (c_ubyte * sz)() capi.to_wkb(self.ptr, byteorder, byref(buf)) # Returning a buffer of the string at the pointer. return six.memoryview(string_at(buf, sz))
def check_sized_string(result, func, cargs): """ Error checking for routines that return explicitly sized strings. This frees the memory allocated by GEOS at the result pointer. """ if not result: raise GEOSException('Invalid string pointer returned by GEOS C function "%s"' % func.__name__) # A c_size_t object is passed in by reference for the second # argument on these routines, and its needed to determine the # correct size. s = string_at(result, last_arg_byref(cargs)) # Freeing the memory allocated within GEOS free(result) return s
def check_string(result, func, cargs): """ Error checking for routines that return strings. This frees the memory allocated by GEOS at the result pointer. """ if not result: raise GEOSException('Error encountered checking string return value in GEOS C function "%s".' % func.__name__) # Getting the string value at the pointer address. s = string_at(result) # Freeing the memory allocated within GEOS free(result) return s
def bytes_from_buffer(buffer, maxlen=None): if isinstance(buffer, _pointer_int_types): return ctypes.string_at(buffer, maxlen) if maxlen is not None: return buffer.raw[0:maxlen] return buffer.raw
def native(type_, value): if isinstance(value, type_): return value if sys.version_info < (3,) and type_ == int and isinstance(value, int_types): return value if isinstance(value, ctypes.Array) and value._type_ == ctypes.c_byte: return ctypes.string_at(ctypes.addressof(value), value._length_) return type_(value.value)
def struct_bytes(struct_): return ctypes.string_at(struct_, ctypes.sizeof(struct_.contents))
def nfq_callback(qh, unused_nfmsg, nfad, unused_data): packet = nfq.nfq_get_msg_packet_hdr(nfad).contents packet_id = socket.ntohl(packet.packet_id) payload_pointer = ctypes.c_void_p() size = nfq.nfq_get_payload(nfad, ctypes.byref(payload_pointer)) payload = ctypes.string_at(payload_pointer, size) packet = Packet(packet_id, size, payload, qh) py_callbacks[qh](packet) return 0 # Maps queue handles to user-specified callbacks.
def body_names(self): start_addr = ctypes.addressof(self.names.contents) return [ctypes.string_at(start_addr + int(inc)).decode("utf-8") for inc in self.name_bodyadr.flatten()]