Python ctypes 模块,string_at() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ctypes.string_at()

项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          return ctypes.string_at(ctypes.addressof(self),ctypes.sizeof(self))
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def inet_pton(address_family, ip_string):
        if address_family == socket.AF_INET:
            return socket.inet_aton(ip_string)

        addr = sockaddr()
        addr.sa_family = address_family
        addr_size = ctypes.c_int(ctypes.sizeof(addr))

        if WSAStringToAddressA(
                ip_string,
                address_family,
                None,
                ctypes.byref(addr),
                ctypes.byref(addr_size)
        ) != 0:
            raise socket.error(ctypes.FormatError())

        if address_family == socket.AF_INET6:
            return ctypes.string_at(addr.ipv6_addr, 16)

        raise socket.error('unknown address family')
项目:rpi3-webiopi    作者:thortex    | 项目源码 | 文件源码
def xfer(self, txbuff=None):
        length = len(txbuff)
        if PYTHON_MAJOR >= 3:
            _txbuff = bytes(txbuff)
            _txptr = ctypes.create_string_buffer(_txbuff)
        else:
            _txbuff = str(bytearray(txbuff))
            _txptr = ctypes.create_string_buffer(_txbuff)
        _rxptr = ctypes.create_string_buffer(length)

        data = struct.pack("QQLLHBBL",  #64 64 32 32 16 8 8 32 b = 32B
                    ctypes.addressof(_txptr),
                    ctypes.addressof(_rxptr),
                    length,
                    self.speed,
                    0, #delay
                    self.bits,
                    0, # cs_change,
                    0  # pad
                    )

        fcntl.ioctl(self.fd, SPI_IOC_MESSAGE(len(data)), data)
        _rxbuff = ctypes.string_at(_rxptr, length)
        return bytearray(_rxbuff)
项目:pysciter    作者:sciter-sdk    | 项目源码 | 文件源码
def utf16tostr(addr, size=-1):
    """Read UTF-16 string from memory and encode as python string."""
    cb = size if size > 0 else 32
    bstr = ctypes.string_at(addr, cb)
    if size >= 0:
        return bstr.decode('utf-16le')

    # lookup zero char
    chunks = []
    while True:
        found = cb
        for i in range(0, cb, 2):
            c = bstr[i]
            if c == 0x00:
                found = i
                break
            pass
        assert found % 2 == 0, "truncated string with len " + str(found)
        chunks.append(bstr[0:found].decode('utf-16le'))
        if found != cb:
            break
        addr = addr + cb
        bstr = ctypes.string_at(addr, cb)
        continue
    return "".join(chunks)
项目:autoinjection    作者:ChengWiLL    | 项目源码 | 文件源码
def inet_pton(address_family, ip_string):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    if WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(addr),
            ctypes.byref(addr_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        return ctypes.string_at(addr.ipv4_addr, 4)
    if address_family == socket.AF_INET6:
        return ctypes.string_at(addr.ipv6_addr, 16)

    raise socket.error('unknown address family')
项目:Cayenne-Agent    作者:myDevicesIoT    | 项目源码 | 文件源码
def xfer(self, txbuff=None):
        length = len(txbuff)
        if PYTHON_MAJOR >= 3:
            _txbuff = bytes(txbuff)
            _txptr = ctypes.create_string_buffer(_txbuff)
        else:
            _txbuff = str(bytearray(txbuff))
            _txptr = ctypes.create_string_buffer(_txbuff)
        _rxptr = ctypes.create_string_buffer(length)

        data = struct.pack("QQLLHBBL",  #64 64 32 32 16 8 8 32 b = 32B
                    ctypes.addressof(_txptr),
                    ctypes.addressof(_rxptr),
                    length,
                    self.speed,
                    0, #delay
                    self.bits,
                    0, # cs_change,
                    0  # pad
                    )

        fcntl.ioctl(self.fd, SPI_IOC_MESSAGE(len(data)), data)
        _rxbuff = ctypes.string_at(_rxptr, length)
        return bytearray(_rxbuff)
项目:Tor    作者:r0oth3x49    | 项目源码 | 文件源码
def inet_pton(address_family, ip_string):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    if WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(addr),
            ctypes.byref(addr_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        return ctypes.string_at(addr.ipv4_addr, 4)
    if address_family == socket.AF_INET6:
        return ctypes.string_at(addr.ipv6_addr, 16)

    raise socket.error('unknown address family')
项目:ivport-v2    作者:ivmech    | 项目源码 | 文件源码
def _callback_write(self, buf, key=PiVideoFrameType.frame):
        """
        Overridden to strip alpha bytes when required.
        """
        if self._strip_alpha:
            s = ct.string_at(buf[0].data, buf[0].length)
            s = bytearray(s)
            del s[3::4]
            # All this messing around with buffers is to work around some issue
            # with MMAL or ctypes (I'm not sure which is at fault). Anyway, the
            # upshot is that if you fiddle with buf[0].data in any way
            # whatsoever (even if you make every attempt to restore its value
            # afterward), mmal_port_disable locks up when we call it in stop()
            new_buf = mmal.MMAL_BUFFER_HEADER_T.from_buffer_copy(buf[0])
            new_buf.length = len(s)
            new_buf.data = ct.pointer(ct.c_uint8.from_buffer(s))
            return super(PiRawMixin, self)._callback_write(ct.pointer(new_buf), key)
        else:
            return super(PiRawMixin, self)._callback_write(buf, key)
项目:displayminion    作者:cedarsuite    | 项目源码 | 文件源码
def update(self, dt):
        sample, self.sample = self.sample, None
        if sample is None:
            return

        try:
            buf = sample.get_buffer()
            result, mapinfo = buf.map(Gst.MapFlags.READ)

            addr = mapinfo.__hash__()
            c_mapinfo = _MapInfo.from_address(addr)

            sbuf = string_at(c_mapinfo.data, mapinfo.size)
            self.texture.blit_buffer(sbuf, colorfmt = 'rgb')
        finally:
            if mapinfo is not None:
                buf.unmap(mapinfo)

        self.image.canvas.ask_update()
项目:beremiz    作者:nucleron    | 项目源码 | 文件源码
def TraceThreadProc(self):
        """
        Return a list of traces, corresponding to the list of required idx
        """
        while self.PLCStatus == "Started":
            tick = ctypes.c_uint32()
            size = ctypes.c_uint32()
            buff = ctypes.c_void_p()
            TraceBuffer = None
            if self.PLClibraryLock.acquire(False):
                if self._GetDebugData(ctypes.byref(tick),
                                      ctypes.byref(size),
                                      ctypes.byref(buff)) == 0:
                    if size.value:
                        TraceBuffer = ctypes.string_at(buff.value, size.value)
                    self._FreeDebugData()
                self.PLClibraryLock.release()
            if TraceBuffer is not None:
                self._TracesPush((tick.value, TraceBuffer))
            self._TracesAutoSuspend()
        self._TracesFlush()
项目:Helix    作者:3lackrush    | 项目源码 | 文件源码
def inet_pton(address_family, ip_string):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    if WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(addr),
            ctypes.byref(addr_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        return ctypes.string_at(addr.ipv4_addr, 4)
    if address_family == socket.AF_INET6:
        return ctypes.string_at(addr.ipv6_addr, 16)

    raise socket.error('unknown address family')
项目:PySyncObj    作者:bakwc    | 项目源码 | 文件源码
def inet_pton(address_family, ip_string):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    if WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(addr),
            ctypes.byref(addr_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        return ctypes.string_at(addr.ipv4_addr, 4)
    if address_family == socket.AF_INET6:
        return ctypes.string_at(addr.ipv6_addr, 16)

    raise socket.error('unknown address family')
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          return ctypes.string_at(ctypes.addressof(self),ctypes.sizeof(self))
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          return ctypes.string_at(ctypes.addressof(self),ctypes.sizeof(self))
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          return ctypes.string_at(ctypes.addressof(self),ctypes.sizeof(self))
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          return ctypes.string_at(ctypes.addressof(self),ctypes.sizeof(self))
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          return ctypes.string_at(ctypes.addressof(self),ctypes.sizeof(self))
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          return ctypes.string_at(ctypes.addressof(self),ctypes.sizeof(self))
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          return ctypes.string_at(ctypes.addressof(self),ctypes.sizeof(self))
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          return ctypes.string_at(ctypes.addressof(self),ctypes.sizeof(self))
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          return ctypes.string_at(ctypes.addressof(self),ctypes.sizeof(self))
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          return ctypes.string_at(ctypes.addressof(self),ctypes.sizeof(self))


####################################################################################
#
# SDDS Payload Containers
#
####################################################################################
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          return ctypes.string_at(ctypes.addressof(self),ctypes.sizeof(self))
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          return ctypes.string_at(ctypes.addressof(self),ctypes.sizeof(self))
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          return ctypes.string_at(ctypes.addressof(self),ctypes.sizeof(self))
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          return ctypes.string_at(ctypes.addressof(self),ctypes.sizeof(self))
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          return ctypes.string_at(ctypes.addressof(self),ctypes.sizeof(self))
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          return ctypes.string_at(ctypes.addressof(self),ctypes.sizeof(self))
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def surf_same_image(a, b):
    """Return True if a's pixel buffer is identical to b's"""

    a_sz = a.get_height() * a.get_pitch()
    b_sz = b.get_height() * b.get_pitch()
    if a_sz != b_sz:
        return False
    a_bytes = ctypes.string_at(a._pixels_address, a_sz)
    b_bytes = ctypes.string_at(b._pixels_address, b_sz)
    return a_bytes == b_bytes
项目:AlexaOPi    作者:dony71    | 项目源码 | 文件源码
def string_result(result, func, arguments):
    """Errcheck function. Returns a string and frees the original pointer.

    It assumes the result is a char *.
    """
    if result:
        # make a python string copy
        s = bytes_to_str(ctypes.string_at(result))
        # free original string ptr
        libvlc_free(result)
        return s
    return None
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def check_string(result, func, cargs):
    if result:
        s = string_at(result)
        free(result)
    else:
        s = ''
    return s.decode(GEOIP_DEFAULT_ENCODING)
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def check_string(result, func, cargs, offset=-1, str_result=False):
    """
    Checks the string output returned from the given function, and frees
    the string pointer allocated by OGR.  The `str_result` keyword
    may be used when the result is the string pointer, otherwise
    the OGR error code is assumed.  The `offset` keyword may be used
    to extract the string pointer passed in by-reference at the given
    slice offset in the function arguments.
    """
    if str_result:
        # For routines that return a string.
        ptr = result
        if not ptr:
            s = None
        else:
            s = string_at(result)
    else:
        # Error-code return specified.
        check_err(result)
        ptr = ptr_byref(cargs, offset)
        # Getting the string value
        s = ptr.value
    # Correctly freeing the allocated memory behind GDAL pointer
    # with the VSIFree routine.
    if ptr:
        lgdal.VSIFree(ptr)
    return s

# ### DataSource, Layer error-checking ###


# ### Envelope checking ###
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def wkb(self):
        "Returns the WKB representation of the Geometry."
        if sys.byteorder == 'little':
            byteorder = 1  # wkbNDR (from ogr_core.h)
        else:
            byteorder = 0  # wkbXDR
        sz = self.wkb_size
        # Creating the unsigned character buffer, and passing it in by reference.
        buf = (c_ubyte * sz)()
        capi.to_wkb(self.ptr, byteorder, byref(buf))
        # Returning a buffer of the string at the pointer.
        return six.memoryview(string_at(buf, sz))
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def check_sized_string(result, func, cargs):
    """
    Error checking for routines that return explicitly sized strings.

    This frees the memory allocated by GEOS at the result pointer.
    """
    if not result:
        raise GEOSException('Invalid string pointer returned by GEOS C function "%s"' % func.__name__)
    # A c_size_t object is passed in by reference for the second
    # argument on these routines, and its needed to determine the
    # correct size.
    s = string_at(result, last_arg_byref(cargs))
    # Freeing the memory allocated within GEOS
    free(result)
    return s
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def check_string(result, func, cargs):
    """
    Error checking for routines that return strings.

    This frees the memory allocated by GEOS at the result pointer.
    """
    if not result:
        raise GEOSException('Error encountered checking string return value in GEOS C function "%s".' % func.__name__)
    # Getting the string value at the pointer address.
    s = string_at(result)
    # Freeing the memory allocated within GEOS
    free(result)
    return s
项目:NarshaTech    作者:KimJangHyeon    | 项目源码 | 文件源码
def check_string(result, func, cargs):
    if result:
        s = string_at(result)
        free(result)
    else:
        s = ''
    return s.decode(GEOIP_DEFAULT_ENCODING)
项目:NarshaTech    作者:KimJangHyeon    | 项目源码 | 文件源码
def check_string(result, func, cargs, offset=-1, str_result=False):
    """
    Checks the string output returned from the given function, and frees
    the string pointer allocated by OGR.  The `str_result` keyword
    may be used when the result is the string pointer, otherwise
    the OGR error code is assumed.  The `offset` keyword may be used
    to extract the string pointer passed in by-reference at the given
    slice offset in the function arguments.
    """
    if str_result:
        # For routines that return a string.
        ptr = result
        if not ptr:
            s = None
        else:
            s = string_at(result)
    else:
        # Error-code return specified.
        check_err(result)
        ptr = ptr_byref(cargs, offset)
        # Getting the string value
        s = ptr.value
    # Correctly freeing the allocated memory behind GDAL pointer
    # with the VSIFree routine.
    if ptr:
        lgdal.VSIFree(ptr)
    return s

# ### DataSource, Layer error-checking ###


# ### Envelope checking ###
项目:NarshaTech    作者:KimJangHyeon    | 项目源码 | 文件源码
def wkb(self):
        "Returns the WKB representation of the Geometry."
        if sys.byteorder == 'little':
            byteorder = 1  # wkbNDR (from ogr_core.h)
        else:
            byteorder = 0  # wkbXDR
        sz = self.wkb_size
        # Creating the unsigned character buffer, and passing it in by reference.
        buf = (c_ubyte * sz)()
        capi.to_wkb(self.ptr, byteorder, byref(buf))
        # Returning a buffer of the string at the pointer.
        return six.memoryview(string_at(buf, sz))
项目:NarshaTech    作者:KimJangHyeon    | 项目源码 | 文件源码
def check_sized_string(result, func, cargs):
    """
    Error checking for routines that return explicitly sized strings.

    This frees the memory allocated by GEOS at the result pointer.
    """
    if not result:
        raise GEOSException('Invalid string pointer returned by GEOS C function "%s"' % func.__name__)
    # A c_size_t object is passed in by reference for the second
    # argument on these routines, and its needed to determine the
    # correct size.
    s = string_at(result, last_arg_byref(cargs))
    # Freeing the memory allocated within GEOS
    free(result)
    return s
项目:NarshaTech    作者:KimJangHyeon    | 项目源码 | 文件源码
def check_string(result, func, cargs):
    """
    Error checking for routines that return strings.

    This frees the memory allocated by GEOS at the result pointer.
    """
    if not result:
        raise GEOSException('Error encountered checking string return value in GEOS C function "%s".' % func.__name__)
    # Getting the string value at the pointer address.
    s = string_at(result)
    # Freeing the memory allocated within GEOS
    free(result)
    return s
项目:Scrum    作者:prakharchoudhary    | 项目源码 | 文件源码
def check_string(result, func, cargs):
    if result:
        s = string_at(result)
        free(result)
    else:
        s = ''
    return s.decode(GEOIP_DEFAULT_ENCODING)
项目:Scrum    作者:prakharchoudhary    | 项目源码 | 文件源码
def check_string(result, func, cargs, offset=-1, str_result=False):
    """
    Checks the string output returned from the given function, and frees
    the string pointer allocated by OGR.  The `str_result` keyword
    may be used when the result is the string pointer, otherwise
    the OGR error code is assumed.  The `offset` keyword may be used
    to extract the string pointer passed in by-reference at the given
    slice offset in the function arguments.
    """
    if str_result:
        # For routines that return a string.
        ptr = result
        if not ptr:
            s = None
        else:
            s = string_at(result)
    else:
        # Error-code return specified.
        check_err(result)
        ptr = ptr_byref(cargs, offset)
        # Getting the string value
        s = ptr.value
    # Correctly freeing the allocated memory behind GDAL pointer
    # with the VSIFree routine.
    if ptr:
        lgdal.VSIFree(ptr)
    return s

# ### DataSource, Layer error-checking ###


# ### Envelope checking ###
项目:Scrum    作者:prakharchoudhary    | 项目源码 | 文件源码
def wkb(self):
        "Returns the WKB representation of the Geometry."
        if sys.byteorder == 'little':
            byteorder = 1  # wkbNDR (from ogr_core.h)
        else:
            byteorder = 0  # wkbXDR
        sz = self.wkb_size
        # Creating the unsigned character buffer, and passing it in by reference.
        buf = (c_ubyte * sz)()
        capi.to_wkb(self.ptr, byteorder, byref(buf))
        # Returning a buffer of the string at the pointer.
        return six.memoryview(string_at(buf, sz))
项目:Scrum    作者:prakharchoudhary    | 项目源码 | 文件源码
def check_sized_string(result, func, cargs):
    """
    Error checking for routines that return explicitly sized strings.

    This frees the memory allocated by GEOS at the result pointer.
    """
    if not result:
        raise GEOSException('Invalid string pointer returned by GEOS C function "%s"' % func.__name__)
    # A c_size_t object is passed in by reference for the second
    # argument on these routines, and its needed to determine the
    # correct size.
    s = string_at(result, last_arg_byref(cargs))
    # Freeing the memory allocated within GEOS
    free(result)
    return s
项目:Scrum    作者:prakharchoudhary    | 项目源码 | 文件源码
def check_string(result, func, cargs):
    """
    Error checking for routines that return strings.

    This frees the memory allocated by GEOS at the result pointer.
    """
    if not result:
        raise GEOSException('Error encountered checking string return value in GEOS C function "%s".' % func.__name__)
    # Getting the string value at the pointer address.
    s = string_at(result)
    # Freeing the memory allocated within GEOS
    free(result)
    return s
项目:pyShadowsocks    作者:FTwOoO    | 项目源码 | 文件源码
def bytes_from_buffer(buffer, maxlen=None):
    if isinstance(buffer, _pointer_int_types):
        return ctypes.string_at(buffer, maxlen)
    if maxlen is not None:
        return buffer.raw[0:maxlen]
    return buffer.raw
项目:pyShadowsocks    作者:FTwOoO    | 项目源码 | 文件源码
def native(type_, value):
    if isinstance(value, type_):
        return value
    if sys.version_info < (3,) and type_ == int and isinstance(value, int_types):
        return value
    if isinstance(value, ctypes.Array) and value._type_ == ctypes.c_byte:
        return ctypes.string_at(ctypes.addressof(value), value._length_)
    return type_(value.value)
项目:pyShadowsocks    作者:FTwOoO    | 项目源码 | 文件源码
def struct_bytes(struct_):
    return ctypes.string_at(struct_, ctypes.sizeof(struct_.contents))
项目:packet-queue    作者:google    | 项目源码 | 文件源码
def nfq_callback(qh, unused_nfmsg, nfad, unused_data):
  packet = nfq.nfq_get_msg_packet_hdr(nfad).contents
  packet_id = socket.ntohl(packet.packet_id)

  payload_pointer = ctypes.c_void_p()
  size = nfq.nfq_get_payload(nfad, ctypes.byref(payload_pointer))
  payload = ctypes.string_at(payload_pointer, size)

  packet = Packet(packet_id, size, payload, qh)
  py_callbacks[qh](packet)
  return 0

# Maps queue handles to user-specified callbacks.
项目:third_person_im    作者:bstadie    | 项目源码 | 文件源码
def body_names(self):
        start_addr = ctypes.addressof(self.names.contents)
        return [ctypes.string_at(start_addr + int(inc)).decode("utf-8")
                for inc in self.name_bodyadr.flatten()]