我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用ctypes.Array()。
def synchronized(obj, lock=None): assert not isinstance(obj, SynchronizedBase), 'object already synchronized' if isinstance(obj, ctypes._SimpleCData): return Synchronized(obj, lock) elif isinstance(obj, ctypes.Array): if obj._type_ is ctypes.c_char: return SynchronizedString(obj, lock) return SynchronizedArray(obj, lock) else: cls = type(obj) try: scls = class_cache[cls] except KeyError: names = [field[0] for field in cls._fields_] d = dict((name, make_property(name)) for name in names) classname = 'Synchronized' + cls.__name__ scls = class_cache[cls] = type(classname, (SynchronizedBase,), d) return scls(obj, lock) # # Functions for pickling/unpickling #
def __init__(self, *args, **kwds): super(Array, self).__init__(*args, **kwds) try: if self.flags & PAI_NOTSWAPPED: ct = self._ctypes[self.typekind, self.itemsize] elif c_int.__ctype_le__ is c_int: ct = self._ctypes[self.typekind, self.itemsize].__ctype_be__ else: ct = self._ctypes[self.typekind, self.itemsize].__ctype_le__ except KeyError: ct = c_uint8 * self.itemsize self._ctype = ct self._ctype_p = POINTER(ct)
def test_indices(self): a = self.a self.assertEqual(a[0, 0], 0) self.assertEqual(a[19, 0], 0) self.assertEqual(a[0, 14], 0) self.assertEqual(a[19, 14], 0) self.assertEqual(a[5, 8], 0) a[0, 0] = 12 a[5, 8] = 99 self.assertEqual(a[0, 0], 12) self.assertEqual(a[5, 8], 99) self.assertRaises(IndexError, a.__getitem__, (-1, 0)) self.assertRaises(IndexError, a.__getitem__, (0, -1)) self.assertRaises(IndexError, a.__getitem__, (20, 0)) self.assertRaises(IndexError, a.__getitem__, (0, 15)) self.assertRaises(ValueError, a.__getitem__, 0) self.assertRaises(ValueError, a.__getitem__, (0, 0, 0)) a = Array((3,), 'i', 4) a[1] = 333 self.assertEqual(a[1], 333)
def synchronized(obj, lock=None, ctx=None): assert not isinstance(obj, SynchronizedBase), 'object already synchronized' ctx = ctx or get_context() if isinstance(obj, ctypes._SimpleCData): return Synchronized(obj, lock, ctx) elif isinstance(obj, ctypes.Array): if obj._type_ is ctypes.c_char: return SynchronizedString(obj, lock, ctx) return SynchronizedArray(obj, lock, ctx) else: cls = type(obj) try: scls = class_cache[cls] except KeyError: names = [field[0] for field in cls._fields_] d = dict((name, make_property(name)) for name in names) classname = 'Synchronized' + cls.__name__ scls = class_cache[cls] = type(classname, (SynchronizedBase,), d) return scls(obj, lock, ctx) # # Functions for pickling/unpickling #
def __eq__(self, other): if not isinstance(other, ctypes.Array): print("Unexpected type. Expected: {0}. Received: {1}".format(ctypes.Array, type(other))) return False if len(other) < len(self.expected_string_value) + 1: # +1 for NULL terminating character print("Unexpected length in C string. Expected at least: {0}. Received {1}".format(len(other), len(self.expected_string_value) + 1)) return False if not isinstance(other[0], bytes): print("Unexpected type. Not a string. Received: {0}".format(type(other[0]))) return False if other.value.decode("ascii") != self.expected_string_value: print("Unexpected value. Expected {0}. Received: {1}".format(self.expected_string_value, other.value.decode)) return False return True # Custom Type
def Array(typecode_or_type, size_or_initializer, **kwds): ''' Return a synchronization wrapper for a RawArray ''' lock = kwds.pop('lock', None) if kwds: raise ValueError('unrecognized keyword argument(s): %s' % kwds.keys()) obj = RawArray(typecode_or_type, size_or_initializer) if lock is False: return obj if lock in (True, None): lock = RLock() if not hasattr(lock, 'acquire'): raise AttributeError("'%r' has no method 'acquire'" % lock) return synchronized(obj, lock)
def reduce_ctype(obj): assert_spawning(obj) if isinstance(obj, ctypes.Array): return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_) else: return rebuild_ctype, (type(obj), obj._wrapper, None)
def __init__(self, *args, **kwds): unittest.TestCase.__init__(self, *args, **kwds) self.a = Array((20, 15), 'i', 4)
def test_typekind(self): a = Array((1,), 'i', 4) self.assertTrue(a._ctype is c_int32) self.assertTrue(a._ctype_p is POINTER(c_int32)) a = Array((1,), 'u', 4) self.assertTrue(a._ctype is c_uint32) self.assertTrue(a._ctype_p is POINTER(c_uint32)) a = Array((1,), 'f', 4) # float types unsupported: size system dependent ct = a._ctype self.assertTrue(issubclass(ct, ctypes.Array)) self.assertEqual(sizeof(ct), 4)
def test_itemsize(self): for size in [1, 2, 4, 8]: a = Array((1,), 'i', size) ct = a._ctype self.assertTrue(issubclass(ct, ctypes._SimpleCData)) self.assertEqual(sizeof(ct), size)
def test_byteswapped(self): a = Array((1,), 'u', 4, flags=(PAI_ALIGNED | PAI_WRITEABLE)) ct = a._ctype self.assertTrue(ct is not c_uint32) if sys.byteorder == 'little': self.assertTrue(ct is c_uint32.__ctype_be__) else: self.assertTrue(ct is c_uint32.__ctype_le__) i = 0xa0b0c0d n = c_uint32(i) a[0] = i self.assertEqual(a[0], i) self.assertEqual(a._data[0:4], cast(addressof(n), POINTER(c_uint8))[3:-1:-1])
def native(type_, value): if isinstance(value, type_): return value if sys.version_info < (3,) and type_ == int and isinstance(value, int_types): return value if isinstance(value, ctypes.Array) and value._type_ == ctypes.c_byte: return ctypes.string_at(ctypes.addressof(value), value._length_) return type_(value.value)
def from_param(self, param): typename = type(param).__name__ if hasattr(self, 'from_'+typename): return getattr(self, 'from_'+typename)(param) elif isinstance(param, ctypes.Array): return param else: raise TypeError("Can't convert %s" % typename) # Cast from array.array objects
def Array(typecode_or_type, size_or_initializer, **kwds): ''' Return a synchronization wrapper for a RawArray ''' lock = kwds.pop('lock', None) if kwds: raise ValueError('unrecognized keyword argument(s): %s' % list(kwds.keys())) obj = RawArray(typecode_or_type, size_or_initializer) if lock is False: return obj if lock in (True, None): lock = RLock() if not hasattr(lock, 'acquire'): raise AttributeError("'%r' has no method 'acquire'" % lock) return synchronized(obj, lock)
def to_dict(self): result = {} for name, _ in self._fields_: obj = getattr(self, name) if hasattr(obj, 'to_dict'): result[name] = obj.to_dict() elif isinstance(obj, Array): result[name] = str(list(obj)) else: result[name] = getattr(self, name) return result
def Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None): ''' Return a synchronization wrapper for a RawArray ''' obj = RawArray(typecode_or_type, size_or_initializer) if lock is False: return obj if lock in (True, None): ctx = ctx or get_context() lock = ctx.RLock() if not hasattr(lock, 'acquire'): raise AttributeError("'%r' has no method 'acquire'" % lock) return synchronized(obj, lock, ctx=ctx)
def default(self, obj): if isinstance(obj, (SkinningBone, SkinnedImage, skinnedmesh.SkinnedMesh)): d = obj.__dict__.copy() for ignore in JSON_IGNORES + getattr(obj, "jsonIgnore", []): if ignore in d: del d[ignore] return d elif isinstance(obj, euclid.Vector3): return (obj.x, obj.y, obj.z) elif isinstance(obj, ctypes.Array): return list(obj) return json.JSONEncoder.default(self, obj)
def evalonarray(lambdastr, array, length=None, **kwargs): """ Evaluates a function on an array using machine code. array can be a numpy array, a ctypes array or a pointer to an array. In the latter case, the correct length must be specified. array will be overwritten! Make a copy before to avoid this. """ # interpret arguments if hasattr(array, 'ctypes'): # numpy array pointer = array.ctypes.get_as_parameter() length = len(array) elif isinstance(array, ctypes.Array): # ctypes array pointer = ctypes.byref(array) length = len(array) elif isinstance(array, ctypes.c_void_p): # ctypes pointer FIXME pointer = array assert isinstance(length, int) and not length < 0 else: raise ValueError('array type not recognized') # generate code code = """ # include <math.h> # define pi M_PI # define e M_E %s void evalonarray(double *array, int length) { double* MAX; for (MAX = array + length; array < MAX; array++) { *array = f(*array); } } """ % genfcode(lambdastr, **kwargs) # compile an run on array run = _compile(code, fname='evalonarray', fprototype=[None, ctypes.c_void_p, ctypes.c_int]) run(pointer, length) ######### # TESTS # #########