Python ctypes 模块,sizeof() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ctypes.sizeof()

项目:PythonForWindows    作者:hakril    | 项目源码 | 文件源码
def teb_base(self):
        """The address of the thread's TEB

            When the current process is 32-bit and the owning process is
            64-bit, the query goes through the ``syswow64`` 32-to-64 helper
            and the result is read through a remote-64-bit view of
            ``THREAD_BASIC_INFORMATION``; otherwise the native
            ``NtQueryInformationThread`` is used.

            :type: :class:`int`
            :raises ValueError: if the scratch buffer is not 8-byte aligned
        """
        if windows.current_process.bitness == 32 and self.owner.bitness == 64:
            restype = rctypes.transform_type_to_remote64bits(THREAD_BASIC_INFORMATION)
            ressize = ctypes.sizeof(restype)
            # Manual aligned allocation: back the structure with an array of
            # ULONGLONG. Floor division keeps the element count an int; true
            # division returns a float on Python 3, which cannot multiply a
            # ctypes type.
            nb_qword = (ressize + 8) // ctypes.sizeof(ULONGLONG)
            raw_buffer = (nb_qword * ULONGLONG)()
            struct_address = ctypes.addressof(raw_buffer)
            if (struct_address & 0xf) not in [0, 8]:
                raise ValueError("ULONGLONG array not aligned on 8")
            windows.syswow64.NtQueryInformationThread_32_to_64(self.handle, ThreadBasicInformation, struct_address, ressize)
            # raw_buffer stays referenced until the field has been read.
            return restype(struct_address, windows.current_process).TebBaseAddress

        res = THREAD_BASIC_INFORMATION()
        windows.winproxy.NtQueryInformationThread(self.handle, ThreadBasicInformation, byref(res), ctypes.sizeof(res))
        return res.TebBaseAddress
项目:LoLVRSpectate    作者:Fire-Proof    | 项目源码 | 文件源码
def list(self):
        """
        return a list of <PROCESSENTRY32>

        Walks a Toolhelp process snapshot with Process32First/Process32Next
        and returns a copy of every entry, stopping early at the first entry
        whose ``dwFlags`` is non-zero. The snapshot handle is always closed.
        """
        processes = []
        hProcessSnap = CreateToolhelp32Snapshot(TH32CS_CLASS.SNAPPROCESS, 0)
        try:
            pe32 = PROCESSENTRY32()
            pe32.dwSize = sizeof(PROCESSENTRY32)
            ret = Process32First(hProcessSnap, pointer(pe32))
            while ret:
                if pe32.dwFlags != 0:
                    break
                # Record the current entry BEFORE advancing: advancing first
                # would drop the entry returned by Process32First and append
                # the last entry twice once Process32Next fails.
                # copy.copy is needed because pe32 is reused for each entry.
                processes.append(copy.copy(pe32))
                ret = Process32Next(hProcessSnap, pointer(pe32))
        finally:
            # Release the snapshot even if an API wrapper raised.
            CloseHandle(hProcessSnap)
        return processes
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          """Return a copy of this ctypes object's raw memory as a byte string."""
          byte_count = ctypes.sizeof(self)
          start = ctypes.addressof(self)
          return ctypes.string_at(start, byte_count)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def inet_pton(address_family, ip_string):
        """Convert a textual IP address to its packed binary form.

        IPv4 is delegated to ``socket.inet_aton``. Any other family is parsed
        with ``WSAStringToAddressA``; only ``AF_INET6`` results are returned
        (as 16 bytes).

        :raises socket.error: if ``WSAStringToAddressA`` reports failure or
            the family is neither ``AF_INET`` nor ``AF_INET6``
        """
        # IPv4 never reaches the Winsock call.
        if address_family == socket.AF_INET:
            return socket.inet_aton(ip_string)

        parsed = sockaddr()
        parsed.sa_family = address_family
        parsed_len = ctypes.c_int(ctypes.sizeof(parsed))

        status = WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(parsed),
            ctypes.byref(parsed_len),
        )
        if status != 0:
            raise socket.error(ctypes.FormatError())

        if address_family != socket.AF_INET6:
            raise socket.error('unknown address family')
        return ctypes.string_at(parsed.ipv6_addr, 16)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def append(self, obj):
        "append to the list; the object must be assignable to the ctype"
        index = self.size
        if index >= self.prealloc_size:
            # ctypes has no realloc, so growing means allocating a bigger
            # array and copying the old bytes over. The over-allocation
            # formula is, per the original author's note, modelled on
            # Python's own list growth.
            padding = 3 if index < 9 else 6
            capacity = (index >> 3) + padding + index
            grown = (self.c_type * capacity)()
            ctypes.memmove(grown, self.data, ctypes.sizeof(self.data))
            self.data = grown
            self.prealloc_size = capacity

        self.data[index] = obj
        self.size = index + 1
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def append_kwargs(self, **kwargs):
        """append to the list; assign each key/value to the new item

        The new item is the next pre-allocated slot of ``self.data``; each
        keyword argument is set as an attribute on it. The backing array is
        grown first when it is full.
        """
        size = self.size
        if size >= self.prealloc_size:
            # Same growth rule as append(): allocate a larger array and copy
            # the existing bytes, since ctypes offers no realloc.
            if size < 9:
                newsize = (size>>3) + 3 + size
            else:
                newsize = (size>>3) + 6 + size
            newdata = (self.c_type * newsize)()
            ctypes.memmove(newdata, self.data, ctypes.sizeof(self.data))
            self.data = newdata
            self.prealloc_size = newsize

        obj = self.data[size]
        # items() exists on both Python 2 and 3; iteritems() is Python 2 only.
        for k, v in kwargs.items():
            setattr(obj, k, v)
        self.size = size+1
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def wait_until_idle(idle_time=60):
    """Block until the user has been inactive for `idle_time` seconds.

    Polls the last-input timestamp and sleeps for the remaining time
    until that much time has passed since the last detected user
    activity.
    """

    threshold_ms = int(idle_time*1000)
    info = LASTINPUTINFO()
    info.cbSize = ctypes.sizeof(info)

    GetLastInputInfo(ctypes.byref(info))
    since_input = GetTickCount() - info.dwTime
    while since_input < threshold_ms:
        # Sleep for exactly the time still missing, then measure again.
        Sleep(threshold_ms - since_input or 1)
        GetLastInputInfo(ctypes.byref(info))
        since_input = GetTickCount() - info.dwTime
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def wait_until_active(tol=5):
    """Block until new user activity is observed.

    The last-input timestamp is sampled once as a baseline and then
    polled with a doubling delay capped at `tol` seconds, so the
    function may return up to `tol` seconds after the activity began.
    """

    info = LASTINPUTINFO()
    info.cbSize = ctypes.sizeof(info)
    max_delay_ms = int(tol*1000)
    delay_ms = 1

    GetLastInputInfo(ctypes.byref(info))
    baseline = info.dwTime
    while True:
        # Exponential back-off, capped at the requested tolerance.
        delay_ms = min(2*delay_ms, max_delay_ms)
        Sleep(delay_ms)
        GetLastInputInfo(ctypes.byref(info))
        if info.dwTime != baseline:
            break
项目:supremm    作者:ubccr    | 项目源码 | 文件源码
def pcpfastExtractValues(result_p, vsetidx, vlistidx, dtype):
    """ quicker implementation of pmExtractValue than the default provided with the pcp python bindings
        this version saves converting the C indexes to python and back again

        Returns a 2-tuple: the extracted value (``outAtom.dref(dtype)``) and
        the instance identifier written by the C helper (``inst.value``).
        Raises ``pmapi.pmErr`` when the C helper returns a negative status.
    """

    inst = c_int()
    outAtom = pmapi.pmAtomValue()
    status = LIBPCPFAST.pcpfastExtractValues(result_p, byref(inst), byref(outAtom), vsetidx, vlistidx, dtype)
    if status < 0:
        raise pmapi.pmErr(status)

    if dtype == c_api.PM_TYPE_STRING:
        # Get pointer to C string
        # (copies the raw pointer bytes out of the ``cp`` member so the
        # original C allocation can still be freed below)
        c_str = c_char_p()
        memmove(byref(c_str), addressof(outAtom) + pmapi.pmAtomValue.cp.offset, sizeof(c_char_p))
        # Convert to a python string and have result point to it
        # NOTE(review): the self-assignment looks deliberate -- reading ``cp``
        # presumably yields a Python-owned copy and writing it back re-points
        # the field at that copy; confirm before "simplifying" it away.
        outAtom.cp = outAtom.cp
        # Free the C string
        LIBC.free(c_str)

    return outAtom.dref(dtype), inst.value
项目:openni-python    作者:severin-lemaignan    | 项目源码 | 文件源码
def oni_call(func):
    """Decorator that turns a non-OK OpenNI status into an ``OpenNIError``.

    The wrapped callable is invoked with positional arguments only. When
    its result differs from ``OniStatus.ONI_STATUS_OK``, the extended
    error text and (if it can be queried) the log file name are attached
    to the raised ``OpenNIError``. On success the status is returned.
    """
    @functools.wraps(func)
    def wrapper(*args):
        status = func(*args)
        if status != OniStatus.ONI_STATUS_OK:
            # A missing/empty extended error is reported as an empty string.
            message = oniGetExtendedError() or ''
            name_buf = ctypes.create_string_buffer(1024)
            rc = _oniGetLogFileName(name_buf, ctypes.sizeof(name_buf))
            logfile = name_buf.value if rc == OniStatus.ONI_STATUS_OK else None
            raise OpenNIError(status, message.strip(), logfile)
        return status

    return wrapper
项目:PythonForWindows    作者:hakril    | 项目源码 | 文件源码
def new_aligned(cls):
        """Return a new instance of ``cls`` whose storage is aligned on 16 bytes

           The instance is mapped (``from_address``) onto an over-allocated
           ``ULONGLONG`` array; if that array starts on an address ending in
           8, the structure is placed 8 bytes further in. The backing array
           is kept alive through the ``_buffer`` attribute of the result.

           temporary workaround or horrible hack ? choose your side

           :raises ValueError: if the backing array is not 8-byte aligned
        """
        size = ctypes.sizeof(cls)
        qword_size = ctypes.sizeof(ULONGLONG)
        # Whole qwords needed to hold ``size`` bytes, plus one spare qword so
        # the structure still fits after the optional 8-byte shift below.
        # Floor division keeps the count an int (true division gives a float
        # on Python 3, which cannot multiply a ctypes type).
        nb_qword = (size + qword_size - 1) // qword_size + 1
        buffer = (nb_qword * ULONGLONG)()
        struct_address = ctypes.addressof(buffer)
        if (struct_address & 0xf) not in [0, 8]:
            raise ValueError("ULONGLONG array not aligned on 8")
        if (struct_address & 0xf) == 8:
            struct_address += 8
        self = cls.from_address(struct_address)
        # Keep the raw buffer alive
        self._buffer = buffer
        return self
项目:PythonForWindows    作者:hakril    | 项目源码 | 文件源码
def start_address(self):
        """The start address of the thread

            Queried with the ``ThreadQuerySetWin32StartAddress`` information
            class, through the 32-to-64 helper when the current process is
            32-bit and the owner is 64-bit.

            :type: :class:`int`
        """
        info_class = ThreadQuerySetWin32StartAddress
        if windows.current_process.bitness == 32 and self.owner.bitness == 64:
            address = ULONGLONG()
            windows.syswow64.NtQueryInformationThread_32_to_64(self.handle, info_class, byref(address), ctypes.sizeof(address))
            return address.value

        # Size the result after the wider of the two processes involved.
        widest = max(self.owner.bitness, windows.current_process.bitness)
        address = ULONG() if widest == 32 else ULONGLONG()
        winproxy.NtQueryInformationThread(self.handle, info_class, byref(address), ctypes.sizeof(address))
        return address.value
项目:PythonForWindows    作者:hakril    | 项目源码 | 文件源码
def ppid(self):
        """Parent Process ID

        Read from the ``InheritedFromUniqueProcessId`` field of the process
        basic information.

        :type: :class:`int`
        """
        if windows.current_process.bitness == 32 and self.bitness == 64:
            remote_pbi = windows.remotectypes.transform_type_to_remote64bits(PROCESS_BASIC_INFORMATION)
            # Plain byte buffer sized for the 64-bit layout; the helper fills it.
            raw = (ctypes.c_char * ctypes.sizeof(remote_pbi))()
            windows.syswow64.NtQueryInformationProcess_32_to_64(self.handle, ProcessInformation=raw, ProcessInformationLength=ctypes.sizeof(remote_pbi))
            # View the filled bytes as a remote64bits(PROCESS_BASIC_INFORMATION)
            info = remote_pbi(ctypes.addressof(raw), windows.current_process)
            return info.InheritedFromUniqueProcessId

        info = PROCESS_BASIC_INFORMATION()
        winproxy.NtQueryInformationProcess(self.handle, 0, info)
        return info.InheritedFromUniqueProcessId
项目:PythonForWindows    作者:hakril    | 项目源码 | 文件源码
def query_memory(self, addr):
        """Query the memory informations about page at ``addr``

        A 32-bit process inspecting a 64-bit target goes through
        ``NtQueryVirtualMemory_32_to_64``; every other combination uses
        ``VirtualQueryEx`` with a structure matching the current bitness.

        :rtype: :class:`~windows.generated_def.MEMORY_BASIC_INFORMATION`
        """
        if windows.current_process.bitness == 32 and self.bitness == 64:
            info = MEMORY_BASIC_INFORMATION64()
            try:
                windows.syswow64.NtQueryVirtualMemory_32_to_64(ProcessHandle=self.handle, BaseAddress=addr, MemoryInformationClass=MemoryBasicInformation, MemoryInformation=info)
            except NtStatusException as exc:
                # Status 0xC000000D is re-raised as a Kernel32Error;
                # any other status propagates unchanged.
                if (exc.code & 0xffffffff) != 0XC000000D:
                    raise
                raise winproxy.Kernel32Error("NtQueryVirtualMemory_32_to_64")
            return info

        struct_by_bitness = {32 : MEMORY_BASIC_INFORMATION32, 64 : MEMORY_BASIC_INFORMATION64}
        info = struct_by_bitness[windows.current_process.bitness]()
        info_ptr = ctypes.cast(byref(info), POINTER(MEMORY_BASIC_INFORMATION))
        winproxy.VirtualQueryEx(self.handle, addr, info_ptr, sizeof(info))
        return info
项目:PythonForWindows    作者:hakril    | 项目源码 | 文件源码
def peb_syswow_addr(self):
        """Return the ``PebBaseAddress`` reported for a WoW64 process.

        :raises ValueError: if ``self.is_wow_64`` is false
        """
        if not self.is_wow_64:
            raise ValueError("Not a syswow process")

        if windows.current_process.bitness != 64:
            # Current process is 32-bit: query through the 32-to-64 helper
            # into a raw byte buffer and decode the pointer field by hand.
            remote_pbi = windows.remotectypes.transform_type_to_remote64bits(PROCESS_BASIC_INFORMATION)
            raw = (ctypes.c_char * ctypes.sizeof(remote_pbi))()
            windows.syswow64.NtQueryInformationProcess_32_to_64(self.handle, ProcessInformation=raw, ProcessInformationLength=ctypes.sizeof(remote_pbi))
            field_start = remote_pbi.PebBaseAddress.offset
            # The field is decoded as one little-endian 64-bit value.
            (peb_addr,) = struct.unpack("<Q", raw[field_start: field_start+8])
            return peb_addr

        info = PROCESS_BASIC_INFORMATION()
        winproxy.NtQueryInformationProcess(self.handle, 0, info)
        return ctypes.cast(info.PebBaseAddress, PVOID).value

    # Not a fixedpropety to prevent ref-cycle and uncollectable WinProcess
    # Try with a weakref ?
项目:PythonForWindows    作者:hakril    | 项目源码 | 文件源码
def get_INT(self):
        """Walk the thunk array at ``OriginalFirstThunk`` and list its entries.

        Returns ``None`` when ``OriginalFirstThunk`` is zero. Otherwise
        returns a list of tuples: ``(ordinal, None)`` for entries with the
        ordinal flag set, ``(hint, name)`` for entries imported by name.
        The walk stops at the first entry whose ``Ordinal`` is zero.
        """
        if not self.OriginalFirstThunk:
            return None

        entries = []
        thunk_addr = self.OriginalFirstThunk + self.baseaddr
        thunk = self.transformers.create_structure_at(THUNK_DATA, thunk_addr)
        while thunk.Ordinal:
            if thunk.Ordinal & self.IMAGE_ORDINAL_FLAG:
                entries.append((thunk.Ordinal & 0x7fffffff, None))
            else:
                by_name_addr = self.baseaddr + thunk.AddressOfData
                by_name = self.transformers.create_structure_at(IMPORT_BY_NAME, by_name_addr)
                name_addr = by_name_addr + type(by_name).Name.offset
                raw_name = get_string(self.target, name_addr)
                # The name is only decoded when a target is set.
                name = raw_name if self.target is None else raw_name.decode()
                entries.append((by_name.Hint, name))
            # Step to the next thunk in the array.
            thunk_addr += ctypes.sizeof(type(thunk))
            thunk = self.transformers.create_structure_at(THUNK_DATA, thunk_addr)
        return entries
项目:PythonForWindows    作者:hakril    | 项目源码 | 文件源码
def get_raw_certificate_chains(self): # Rename to all_chains ?
        """Call ``CertGetCertificateChain`` for this certificate.

        The request uses an AND usage match with an empty usage-identifier
        list, and returns the raw chain context filled by the call.
        """
        # Empty usage list: no identifier is requested.
        usage_ids = gdef.CERT_ENHKEY_USAGE()
        usage_ids.cUsageIdentifier = 0
        usage_ids.rgpszUsageIdentifier = None

        usage_match = gdef.CERT_USAGE_MATCH()
        usage_match.dwType = gdef.USAGE_MATCH_TYPE_AND
        usage_match.Usage = usage_ids

        params = gdef.CERT_CHAIN_PARA()
        params.cbSize = ctypes.sizeof(params)
        params.RequestedUsage = usage_match

        result = EPCCERT_CHAIN_CONTEXT()
        winproxy.CertGetCertificateChain(None, self, None, self[0].hCertStore, ctypes.byref(params), 0, None, ctypes.byref(result))
        return result
项目:PythonForWindows    作者:hakril    | 项目源码 | 文件源码
def enable_privilege(lpszPrivilege, bEnablePrivilege):
    """
    Enable or disable a privilege::

        enable_privilege(SE_DEBUG_NAME, True)

    :param lpszPrivilege: name of the privilege to look up
    :param bEnablePrivilege: truthy to enable the privilege, falsy to clear
        its attributes
    :returns: ``True`` on success
    :raises ValueError: if the last error after ``AdjustTokenPrivileges`` is
        ``ERROR_NOT_ALL_ASSIGNED``
    """
    tp = TOKEN_PRIVILEGES()
    luid = LUID()
    hToken = HANDLE()

    winproxy.OpenProcessToken(winproxy.GetCurrentProcess(), TOKEN_ALL_ACCESS, byref(hToken))
    try:
        winproxy.LookupPrivilegeValueA(None, lpszPrivilege, byref(luid))
        tp.PrivilegeCount = 1
        tp.Privileges[0].Luid = luid
        tp.Privileges[0].Attributes = SE_PRIVILEGE_ENABLED if bEnablePrivilege else 0
        winproxy.AdjustTokenPrivileges(hToken, False, byref(tp), sizeof(TOKEN_PRIVILEGES))
        # Read the last error right away: it is what reports a partial
        # assignment, and a later API call (CloseHandle) could overwrite it.
        last_error = winproxy.GetLastError()
    finally:
        # Never leak the token handle, even if one of the calls raised.
        winproxy.CloseHandle(hToken)
    if last_error == windef.ERROR_NOT_ALL_ASSIGNED:
        raise ValueError("Failed to get privilege {0}".format(lpszPrivilege))
    return True
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def _ram(self):
        """Return ``(total_physical_bytes, available_physical_bytes)``.

        Uses ``GlobalMemoryStatusEx``, whose structure has fixed 64-bit
        counters. The older ``GlobalMemoryStatus`` structure uses
        pointer-sized fields, so declaring them as 32-bit ``c_ulong``
        gives a wrong layout in 64-bit processes.

        :raises OSError: if ``GlobalMemoryStatusEx`` reports failure
        """
        kernel32 = ctypes.windll.kernel32

        class MEMORYSTATUSEX(ctypes.Structure):
            _fields_ = [
                ('dwLength', ctypes.c_ulong),
                ('dwMemoryLoad', ctypes.c_ulong),
                ('ullTotalPhys', ctypes.c_ulonglong),
                ('ullAvailPhys', ctypes.c_ulonglong),
                ('ullTotalPageFile', ctypes.c_ulonglong),
                ('ullAvailPageFile', ctypes.c_ulonglong),
                ('ullTotalVirtual', ctypes.c_ulonglong),
                ('ullAvailVirtual', ctypes.c_ulonglong),
                ('ullAvailExtendedVirtual', ctypes.c_ulonglong),
            ]

        memoryStatus = MEMORYSTATUSEX()
        # The API requires dwLength to be set before the call.
        memoryStatus.dwLength = ctypes.sizeof(MEMORYSTATUSEX)
        if not kernel32.GlobalMemoryStatusEx(ctypes.byref(memoryStatus)):
            raise ctypes.WinError()
        return (memoryStatus.ullTotalPhys, memoryStatus.ullAvailPhys)
项目:LoLVRSpectate    作者:Fire-Proof    | 项目源码 | 文件源码
def write_bytes(self, address, data):
        """Write ``data`` into the opened process at ``address``.

        The target range is made PAGE_EXECUTE_READWRITE on a best-effort
        basis before the write, and the previous protection is restored
        afterwards when it was obtained.

        :param address: destination address (converted with ``int``)
        :param data: bytes to write
        :returns: the result of ``WriteProcessMemory``
        :raises ProcessException: if the process is not open
        """
        address = int(address)
        if not self.isProcessOpen:
            raise ProcessException("Can't write_bytes(%s, %s), process %s is not open" % (address, data, self.pid))
        buffer = create_string_buffer(data)
        sizeWriten = c_ulong(0)
        # create_string_buffer appends a NUL terminator; do not write it.
        bufferSize = sizeof(buffer) - 1
        protect_length = bufferSize + 1

        # Changing the protection is best-effort. Tracking None explicitly
        # avoids the NameError that was previously hidden by a bare except
        # when the first VirtualProtectEx call failed.
        old_protect = None
        try:
            old_protect = self.VirtualProtectEx(address, protect_length, PAGE_EXECUTE_READWRITE)
        except Exception:
            pass

        try:
            res = windll.kernel32.WriteProcessMemory(self.h_process, address, buffer, bufferSize, byref(sizeWriten))
        finally:
            if old_protect is not None:
                try:
                    self.VirtualProtectEx(address, protect_length, old_protect)
                except Exception:
                    # Restoring is best-effort too; the write result matters.
                    pass

        return res
项目:LoLVRSpectate    作者:Fire-Proof    | 项目源码 | 文件源码
def list_modules(self):
        """
        return a list of <MODULEENTRY32>

        Only entries whose ``th32ProcessID`` matches ``self.process32`` are
        kept. An empty list is returned when ``self.process32`` is None or
        the snapshot is None.
        """
        modules = []
        if self.process32 is None:
            return modules

        snapshot = CreateToolhelp32Snapshot(TH32CS_CLASS.SNAPMODULE, self.process32.th32ProcessID)
        if snapshot is None:
            return modules

        entry = MODULEENTRY32()
        entry.dwSize = sizeof(entry)
        has_entry = Module32First(snapshot, byref(entry))
        while has_entry:
            if entry.th32ProcessID == self.process32.th32ProcessID:
                # ``entry`` is reused for every module, so store a copy.
                modules.append(copy.copy(entry))
            has_entry = Module32Next(snapshot, byref(entry))

        CloseHandle(snapshot)
        return modules
项目:SwiftKitten    作者:johncsnyder    | 项目源码 | 文件源码
def test_bitfield(self):
        """Check size and value-range enforcement of signed bitfields."""
        # struct bitfield { int a:10, b:20, c:3; };
        assert ffi.sizeof("struct bitfield") == 8
        s = ffi.new("struct bitfield *")
        # ``a`` is 10 bits wide: 511 is accepted, 512 must overflow.
        # NOTE(review): the statements passed to py.test.raises as strings
        # refer to the local ``s`` -- presumably they are executed in this
        # frame, so do not rename it.
        s.a = 511
        py.test.raises(OverflowError, "s.a = 512")
        py.test.raises(OverflowError, "s[0].a = 512")
        assert s.a == 511
        # Lower bound of ``a``: -512 is accepted, -513 must overflow.
        s.a = -512
        py.test.raises(OverflowError, "s.a = -513")
        py.test.raises(OverflowError, "s[0].a = -513")
        assert s.a == -512
        # ``c`` is 3 bits wide: accepted values run from -4 to 3.
        s.c = 3
        assert s.c == 3
        py.test.raises(OverflowError, "s.c = 4")
        py.test.raises(OverflowError, "s[0].c = 4")
        s.c = -4
        assert s.c == -4
项目:SwiftKitten    作者:johncsnyder    | 项目源码 | 文件源码
def test_ffi_buffer_with_file(self):
        """Round-trip an ``int[]`` through a real file using ``ffi.buffer``.

        Skipped when the backend does not implement ``ffi.buffer``. The
        temporary file is closed and removed whatever the outcome.
        """
        import tempfile, os, array
        fd, filename = tempfile.mkstemp()
        f = os.fdopen(fd, 'r+b')
        try:
            a = ffi.new("int[]", list(range(1005)))
            try:
                ffi.buffer(a, 512)
            except NotImplementedError as e:
                py.test.skip(str(e))
            f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
            f.seek(0)
            expected = array.array('i', range(1000))
            # array.tostring() was removed in Python 3.9; tobytes() does not
            # exist on Python 2, so pick whichever is available.
            if hasattr(expected, 'tobytes'):
                expected = expected.tobytes()
            else:
                expected = expected.tostring()
            assert f.read() == expected
            f.seek(0)
            b = ffi.new("int[]", 1005)
            f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
            # Only the first 1000 ints were written; the tail of b stays 0.
            assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
        finally:
            f.close()
            os.unlink(filename)
项目:SwiftKitten    作者:johncsnyder    | 项目源码 | 文件源码
def test_nested_anonymous_struct(self):
        """Fields of anonymous nested struct/union are reachable directly."""
        # struct nested_anon {
        #     struct { int a, b; };
        #     union { int c, d; };
        # };
        assert ffi.sizeof("struct nested_anon") == 3 * SIZE_OF_INT

        # Positional initialisation: c and d share storage in the union.
        obj = ffi.new("struct nested_anon *", [1, 2, 3])
        assert obj.a == 1
        assert obj.b == 2
        assert obj.c == 3
        assert obj.d == 3

        # Writing one union member is visible through the other.
        obj.d = 17
        assert obj.c == 17

        # Writing b leaves every other field untouched.
        obj.b = 19
        assert obj.a == 1
        assert obj.b == 19
        assert obj.c == 17
        assert obj.d == 17

        # Keyword initialisation: unspecified fields are zero.
        obj = ffi.new("struct nested_anon *", {'b': 12, 'd': 14})
        assert obj.a == 0
        assert obj.b == 12
        assert obj.c == 14
        assert obj.d == 14
项目:SwiftKitten    作者:johncsnyder    | 项目源码 | 文件源码
def test_struct_packed(self):
        """Packed structs drop padding and keep values in adjacent elements."""
        # struct nonpacked { char a; int b; };
        # struct is_packed { char a; int b; } __attribute__((packed));
        assert ffi.sizeof("struct nonpacked") == 8
        assert ffi.sizeof("struct is_packed") == 5
        assert ffi.alignof("struct nonpacked") == 4
        assert ffi.alignof("struct is_packed") == 1

        pair = ffi.new("struct is_packed[2]")
        expected = [(42623381, b'X'), (-4892220, b'Y')]
        # Fill both elements first, then verify neither write clobbered
        # its neighbour.
        for idx, (number, char) in enumerate(expected):
            pair[idx].b = number
            pair[idx].a = char
        for idx, (number, char) in enumerate(expected):
            assert pair[idx].b == number
            assert pair[idx].a == char
项目:SwiftKitten    作者:johncsnyder    | 项目源码 | 文件源码
def test_bitfield(self):
        """Check size and value-range enforcement of signed bitfields."""
        ffi = FFI(backend=self.Backend())
        ffi.cdef("struct foo { int a:10, b:20, c:3; };")
        assert ffi.sizeof("struct foo") == 8
        s = ffi.new("struct foo *")
        # ``a`` is 10 bits wide: 511 is accepted, 512 must overflow.
        # NOTE(review): the statements passed to py.test.raises as strings
        # refer to the local ``s`` -- presumably they are executed in this
        # frame, so do not rename it.
        s.a = 511
        py.test.raises(OverflowError, "s.a = 512")
        py.test.raises(OverflowError, "s[0].a = 512")
        assert s.a == 511
        # Lower bound of ``a``: -512 is accepted, -513 must overflow.
        s.a = -512
        py.test.raises(OverflowError, "s.a = -513")
        py.test.raises(OverflowError, "s[0].a = -513")
        assert s.a == -512
        # ``c`` is 3 bits wide: accepted values run from -4 to 3.
        s.c = 3
        assert s.c == 3
        py.test.raises(OverflowError, "s.c = 4")
        py.test.raises(OverflowError, "s[0].c = 4")
        s.c = -4
        assert s.c == -4
项目:SwiftKitten    作者:johncsnyder    | 项目源码 | 文件源码
def test_ffi_buffer_with_file(self):
        """Round-trip an ``int[]`` through a real file using ``ffi.buffer``.

        Skipped when the backend does not implement ``ffi.buffer``. The
        temporary file is closed and removed whatever the outcome.
        """
        ffi = FFI(backend=self.Backend())
        import tempfile, os, array
        fd, filename = tempfile.mkstemp()
        f = os.fdopen(fd, 'r+b')
        try:
            a = ffi.new("int[]", list(range(1005)))
            try:
                ffi.buffer(a, 512)
            except NotImplementedError as e:
                py.test.skip(str(e))
            f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
            f.seek(0)
            expected = array.array('i', range(1000))
            # array.tostring() was removed in Python 3.9; tobytes() does not
            # exist on Python 2, so pick whichever is available.
            if hasattr(expected, 'tobytes'):
                expected = expected.tobytes()
            else:
                expected = expected.tostring()
            assert f.read() == expected
            f.seek(0)
            b = ffi.new("int[]", 1005)
            f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
            # Only the first 1000 ints were written; the tail of b stays 0.
            assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
        finally:
            f.close()
            os.unlink(filename)
项目:SwiftKitten    作者:johncsnyder    | 项目源码 | 文件源码
def test_ffi_buffer_with_io(self):
        """Round-trip an ``int[]`` through ``io.BytesIO`` using ``ffi.buffer``.

        Skipped when the backend does not implement ``ffi.buffer``.
        """
        ffi = FFI(backend=self.Backend())
        import io, array
        f = io.BytesIO()
        a = ffi.new("int[]", list(range(1005)))
        try:
            ffi.buffer(a, 512)
        except NotImplementedError as e:
            py.test.skip(str(e))
        f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
        f.seek(0)
        expected = array.array('i', range(1000))
        # array.tostring() was removed in Python 3.9; tobytes() does not
        # exist on Python 2, so pick whichever is available.
        if hasattr(expected, 'tobytes'):
            expected = expected.tobytes()
        else:
            expected = expected.tostring()
        assert f.read() == expected
        f.seek(0)
        b = ffi.new("int[]", 1005)
        f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
        # Only the first 1000 ints were written; the tail of b stays 0.
        assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
        f.close()
项目:SwiftKitten    作者:johncsnyder    | 项目源码 | 文件源码
def test_struct_packed(self):
        """Packed structs drop padding and keep values in adjacent elements."""
        ffi = FFI(backend=self.Backend())
        ffi.cdef("struct nonpacked { char a; int b; };")
        ffi.cdef("struct is_packed { char a; int b; };", packed=True)
        assert ffi.sizeof("struct nonpacked") == 8
        assert ffi.sizeof("struct is_packed") == 5
        assert ffi.alignof("struct nonpacked") == 4
        assert ffi.alignof("struct is_packed") == 1

        pair = ffi.new("struct is_packed[2]")
        expected = [(42623381, b'X'), (-4892220, b'Y')]
        # Fill both elements first, then verify neither write clobbered
        # its neighbour.
        for idx, (number, char) in enumerate(expected):
            pair[idx].b = number
            pair[idx].a = char
        for idx, (number, char) in enumerate(expected):
            assert pair[idx].b == number
            assert pair[idx].a == char
项目:MCExtractor    作者:platomav    | 项目源码 | 文件源码
def get_struct(input_stream, start_offset, class_name, param_list = None) :
    """Build a ``class_name`` instance from bytes at ``start_offset``.

    ``param_list`` (optional) is unpacked as constructor arguments. If the
    offset lies beyond ``file_end`` or fewer bytes than the structure size
    are available, an error is printed and ``mce_exit(1)`` is called.
    """
    ctor_args = [] if param_list is None else param_list

    structure = class_name(*ctor_args) # Unpack parameter list
    struct_len = ctypes.sizeof(structure)
    end_offset = start_offset + struct_len
    struct_data = input_stream[start_offset:end_offset]
    # Slicing past the end silently shortens the data, so compare lengths.
    fit_len = min(len(struct_data), struct_len)

    if (start_offset > file_end) or (fit_len < struct_len) :
        print(col_r + "Error: Offset 0x%X out of bounds, possibly incomplete image!" % start_offset + col_e)

        mce_exit(1)

    # Copy the raw bytes over the freshly constructed structure.
    ctypes.memmove(ctypes.addressof(structure), struct_data, fit_len)

    return structure
项目:python-doublescript    作者:fdintino    | 项目源码 | 文件源码
def extra_ivars(cls, base):
    """Tell whether ``cls`` differs in instance size from ``base``.

    Compares ``tp_basicsize`` (and ``tp_itemsize`` when either is set).
    For heap types, a trailing weaklist or dict slot that ``base`` lacks
    is discounted before the comparison.
    """
    derived = c_typeobj(cls)
    parent = c_typeobj(base)
    derived_size = derived.tp_basicsize
    parent_size = parent.tp_basicsize

    slot_size = ctypes.sizeof(ctypes.py_object)

    # Variable-size objects: any difference in either size counts.
    if derived.tp_itemsize or parent.tp_itemsize:
        return derived_size != parent_size or derived.tp_itemsize != parent.tp_itemsize

    if is_heap_type(cls):
        # Discount a slot only when it sits at the very end of the object
        # and the base does not have one.
        adds_weaklist = derived.tp_weaklistoffset and parent.tp_weaklistoffset == 0
        if adds_weaklist and derived.tp_weaklistoffset + slot_size == derived_size:
            derived_size -= slot_size
        adds_dict = derived.tp_dictoffset and parent.tp_dictoffset == 0
        if adds_dict and derived.tp_dictoffset + slot_size == derived_size:
            derived_size -= slot_size

    return derived_size != parent_size
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          """Return a copy of this ctypes object's raw memory as a byte string."""
          byte_count = ctypes.sizeof(self)
          start = ctypes.addressof(self)
          return ctypes.string_at(start, byte_count)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          """Return a copy of this ctypes object's raw memory as a byte string."""
          byte_count = ctypes.sizeof(self)
          start = ctypes.addressof(self)
          return ctypes.string_at(start, byte_count)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          """Return a copy of this ctypes object's raw memory as a byte string."""
          byte_count = ctypes.sizeof(self)
          start = ctypes.addressof(self)
          return ctypes.string_at(start, byte_count)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          """Return a copy of this ctypes object's raw memory as a byte string."""
          byte_count = ctypes.sizeof(self)
          start = ctypes.addressof(self)
          return ctypes.string_at(start, byte_count)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          """Return a copy of this ctypes object's raw memory as a byte string."""
          byte_count = ctypes.sizeof(self)
          start = ctypes.addressof(self)
          return ctypes.string_at(start, byte_count)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          """Return a copy of this ctypes object's raw memory as a byte string."""
          byte_count = ctypes.sizeof(self)
          start = ctypes.addressof(self)
          return ctypes.string_at(start, byte_count)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          """Return a copy of this ctypes object's raw memory as a byte string."""
          byte_count = ctypes.sizeof(self)
          start = ctypes.addressof(self)
          return ctypes.string_at(start, byte_count)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          """Return a copy of this ctypes object's raw memory as a byte string."""
          byte_count = ctypes.sizeof(self)
          start = ctypes.addressof(self)
          return ctypes.string_at(start, byte_count)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          """Return a copy of this ctypes object's raw memory as a byte string."""
          byte_count = ctypes.sizeof(self)
          start = ctypes.addressof(self)
          return ctypes.string_at(start, byte_count)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          """Return a copy of this ctypes object's raw memory as a byte string."""
          byte_count = ctypes.sizeof(self)
          start = ctypes.addressof(self)
          return ctypes.string_at(start, byte_count)


####################################################################################
#
# SDDS Payload Containers
#
####################################################################################
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          """Return a copy of this ctypes object's raw memory as a byte string."""
          byte_count = ctypes.sizeof(self)
          start = ctypes.addressof(self)
          return ctypes.string_at(start, byte_count)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def set_data(self, samples ):
           """Load ``samples`` into this payload.

           A list is assigned element by element into ``self.data``,
           ignoring anything past ``sdds_cb_payload.NUM_SAMPLES``. Any
           other object is copied as raw memory over this structure,
           limited to the smaller of ``len(samples)`` and the structure
           size; bytes beyond the copied range are left unchanged.
           """
           if type(samples) is not list:
                byte_count = min(len(samples), ctypes.sizeof(self))
                ctypes.memmove(ctypes.addressof(self), samples, byte_count)
                return
           for idx, value in enumerate(samples):
                # Extra samples beyond the payload capacity are dropped.
                if idx >= sdds_cb_payload.NUM_SAMPLES:
                     break
                self.data[idx] = value
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          """Return a copy of this ctypes object's raw memory as a byte string."""
          byte_count = ctypes.sizeof(self)
          start = ctypes.addressof(self)
          return ctypes.string_at(start, byte_count)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def set_data(self, samples ):
           """Load ``samples`` into this payload.

           A list is assigned element by element into ``self.data``,
           ignoring anything past ``sdds_si_payload.NUM_SAMPLES``. Any
           other object is copied as raw memory over this structure,
           limited to the smaller of ``len(samples)`` and the structure
           size; bytes beyond the copied range are left unchanged.
           """
           if type(samples) is not list:
                byte_count = min(len(samples), ctypes.sizeof(self))
                ctypes.memmove(ctypes.addressof(self), samples, byte_count)
                return
           for idx, value in enumerate(samples):
                # Extra samples beyond the payload capacity are dropped.
                if idx >= sdds_si_payload.NUM_SAMPLES:
                     break
                self.data[idx] = value
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          """Return a copy of this ctypes object's raw memory as a byte string."""
          byte_count = ctypes.sizeof(self)
          start = ctypes.addressof(self)
          return ctypes.string_at(start, byte_count)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          """Return a copy of this ctypes object's raw memory as a byte string."""
          byte_count = ctypes.sizeof(self)
          start = ctypes.addressof(self)
          return ctypes.string_at(start, byte_count)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def set_data(self, samples ):
           """Load ``samples`` into this payload.

           A list is assigned element by element into ``self.data``,
           ignoring anything past ``sdds_sn_payload.NUM_SAMPLES``. Any
           other object is copied as raw memory over this structure,
           limited to the smaller of ``len(samples)`` and the structure
           size; bytes beyond the copied range are left unchanged.
           """
           if type(samples) is not list:
                byte_count = min(len(samples), ctypes.sizeof(self))
                ctypes.memmove(ctypes.addressof(self), samples, byte_count)
                return
           for idx, value in enumerate(samples):
                # Extra samples beyond the payload capacity are dropped.
                if idx >= sdds_sn_payload.NUM_SAMPLES:
                     break
                self.data[idx] = value
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def asString(self):
          """Return a copy of this ctypes object's raw memory as a byte string."""
          byte_count = ctypes.sizeof(self)
          start = ctypes.addressof(self)
          return ctypes.string_at(start, byte_count)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def set_data(self, samples ):
           """Load ``samples`` into this payload.

           A list is assigned element by element into ``self.data``,
           ignoring anything past ``sdds_sf_payload.NUM_SAMPLES``. Any
           other object is copied as raw memory over this structure,
           limited to the smaller of ``len(samples)`` and the structure
           size; bytes beyond the copied range are left unchanged.
           """
           if type(samples) is not list:
                byte_count = min(len(samples), ctypes.sizeof(self))
                ctypes.memmove(ctypes.addressof(self), samples, byte_count)
                return
           for idx, value in enumerate(samples):
                # Extra samples beyond the payload capacity are dropped.
                if idx >= sdds_sf_payload.NUM_SAMPLES:
                     break
                self.data[idx] = value