我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ctypes.c_uint8()。
def _build_button_map(self, config):
    """
    Build the button map for the HIDDecoder.buttons.button_map field.

    With a truthy `config`, every slot starts as BUTTON_COUNT - 1 (the
    default "maps-to-nothing" mapping) and each in-range entry of
    config['buttons'] that is not a trigger overrides its slot.
    Without a config the identity mapping is generated.

    Returns a (ctypes.c_uint8 * BUTTON_COUNT) array.
    """
    if not config:
        mapping = list(xrange(BUTTON_COUNT))
        return (ctypes.c_uint8 * BUTTON_COUNT)(*mapping)

    # Last possible value is the default "maps-to-nothing" mapping
    mapping = [BUTTON_COUNT - 1] * BUTTON_COUNT
    for raw_code, value in config.get('buttons', {}).items():
        index = int(raw_code) - FIRST_BUTTON
        if not (0 <= index < BUTTON_COUNT):
            # Out of range
            continue
        if value not in TRIGGERS:
            # Triggers are not handled by the button map
            mapping[index] = self.button_to_bit(getattr(SCButtons, value))
    return (ctypes.c_uint8 * BUTTON_COUNT)(*mapping)
def Pokearray(length):
    """
    Return the ``ctypes.c_uint8 * length`` array type with helpers attached.

    The returned type gains a ``fromBytes(data)`` classmethod that builds an
    instance from an iterable of byte values, and a ``bytes()`` method that
    returns the array contents as ``bytes``.
    """
    # A custom base class cannot be used in a ctypes.Structure field, so the
    # helpers are attached to the plain array type instead.
    array_type = ctypes.c_uint8 * length

    def fromBytes(cls, data):
        return cls(*data)

    def asBytes(self):
        return bytes(iter(self))

    array_type.fromBytes = classmethod(fromBytes)
    array_type.bytes = asBytes
    return array_type
def setUp(self):
    """Create a Fruit enum and a one-field PokeStructure that adapts to it."""
    import ctypes
    import enum

    class Fruit(enum.Enum):
        Apple = 0
        Banana = 1
        Citron = 2

    class TestStruct(PokeStructure):
        # Raw storage is one unsigned byte ...
        _fields_ = [("_fruit", ctypes.c_uint8)]
        # ... which is paired with the Fruit enum through _adapters_.
        _adapters_ = [("_fruit", Fruit)]

    self.Fruit = Fruit
    self.TestStruct = TestStruct
def _return_ctype(self):
    """
    Return the ctypes type associated with this datatype.

    Raises KeyError when the datatype has no entry in the table.
    """
    ctype_by_datatype = dict((
        (DataType.Bool, ctypes.c_uint8),
        (DataType.I8, ctypes.c_int8),
        (DataType.U8, ctypes.c_uint8),
        (DataType.I16, ctypes.c_int16),
        (DataType.U16, ctypes.c_uint16),
        (DataType.I32, ctypes.c_int32),
        (DataType.U32, ctypes.c_uint32),
        (DataType.I64, ctypes.c_int64),
        (DataType.U64, ctypes.c_uint64),
        (DataType.Sgl, ctypes.c_float),
        (DataType.Dbl, ctypes.c_double),
    ))
    return ctype_by_datatype[self]
def testCallReadSuccess(self, thread, iokit, cf, GetDeviceIntProperty):  # pylint: disable=invalid-name
    """Data handed to HidReadCallback must be returned by MacOsHidDevice.Read()."""
    init_mock_iokit(iokit)
    init_mock_cf(cf)
    init_mock_get_int_property(GetDeviceIntProperty)

    device = macos.MacOsHidDevice('fakepath')

    # Materialize the expected payload as a list so the comparison below is
    # list-to-list on Python 3 as well (a bare range() never equals a list).
    expected = list(range(64))

    # Call callback for IN report
    report = (ctypes.c_uint8 * 64)()
    report[:] = expected
    q = device.read_queue
    macos.HidReadCallback(q, None, None, None, 0, report, 64)

    # Device read should return the callback data
    read_result = device.Read()
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(read_result, expected,
                     'Read data should match data '
                     'passed into the callback')
def _decode_video_packet(self, packet):
    """
    Decode `packet` into an RGB image and store the result on packet.image.

    packet.image is set to None when avbin_decode_video reports a negative
    result; afterwards the condition variable is notified.
    """
    width = self.video_format.width
    height = self.video_format.height
    row_bytes = width * 3  # three bytes (R, G, B) per pixel
    frame = (ctypes.c_uint8 * (row_bytes * height))()

    status = av.avbin_decode_video(self._video_stream,
                                   packet.data, packet.size,
                                   frame)
    if status >= 0:
        packet.image = image.ImageData(width, height, 'RGB', frame, row_bytes)
    else:
        packet.image = None

    # Notify get_next_video_frame() that another one is ready.
    condition = self._condition
    condition.acquire()
    condition.notify()
    condition.release()
def update(self, source):
    """
    Update the overlay with a new source of data.

    The new *source* buffer must have the same size as the original buffer
    used to create the overlay. There is currently no method for changing
    the size of an existing overlay (remove and recreate the overlay if you
    require this).

    Raises PiCameraRuntimeError when no buffer can be taken from the
    overlay's pool.
    """
    port = self.renderer[0].input[0]
    fmt = port[0].format
    frame_bytes = fmt[0].es[0].video.width * fmt[0].es[0].video.height * 3
    frame_type = ct.c_uint8 * frame_bytes
    try:
        # Share the caller's memory when the object allows it ...
        frame = frame_type.from_buffer(source)
    except TypeError:
        # ... otherwise fall back to copying it.
        frame = frame_type.from_buffer_copy(source)

    buf = mmal.mmal_queue_get(self.pool[0].queue)
    if not buf:
        raise PiCameraRuntimeError(
            "Couldn't get a buffer from the overlay's pool")

    ct.memmove(buf[0].data, frame, buf[0].alloc_size)
    buf[0].length = buf[0].alloc_size
    status = mmal.mmal_port_send_buffer(port, buf)
    mmal_check(
        status,
        prefix="Unable to send a buffer to the overlay's port")
def _read_block(self, block):
    """
    Read a block from a Mifare card after authentication.

    Returns the received bytes as a string (one character per byte).
    Raises Exception if Easy Framing cannot be enabled and IOError if the
    transceive call reports a negative result.
    """
    framing = nfc.nfc_device_set_property_bool(self.__device, nfc.NP_EASY_FRAMING, True)
    if framing < 0:
        raise Exception("Error setting Easy Framing property")

    # Command frame: read opcode followed by the block number.
    command = (ctypes.c_uint8 * 2)(self.MC_READ, block)
    reply = (ctypes.c_uint8 * 250)()
    received = nfc.nfc_initiator_transceive_bytes(
        self.__device,
        ctypes.pointer(command), len(command),
        ctypes.pointer(reply), len(reply),
        0)
    if received < 0:
        raise IOError("Error reading data")
    return "".join(chr(reply[i]) for i in range(received))
def __write_block(self, block, data):
    """
    Write a block of data to a Mifare card after authentication.

    `data` may hold at most 16 characters; shorter data is padded with NUL
    characters. Returns the result of nfc_initiator_transceive_bytes.
    Raises Exception if Easy Framing cannot be enabled and ValueError if
    `data` is longer than 16 characters.
    """
    framing = nfc.nfc_device_set_property_bool(self.__device, nfc.NP_EASY_FRAMING, True)
    if framing < 0:
        raise Exception("Error setting Easy Framing property")
    if len(data) > 16:
        raise ValueError("Data value to be written cannot be more than 16 characters.")

    # Frame layout: opcode, block number, then exactly 16 payload bytes.
    command = (ctypes.c_uint8 * 18)()
    command[0] = self.MC_WRITE
    command[1] = block
    reply = (ctypes.c_uint8 * 250)()

    padded = data + "\x00" * (16 - len(data))
    for position in range(16):
        command[position + 2] = ord(padded[position])

    return nfc.nfc_initiator_transceive_bytes(
        self.__device,
        ctypes.pointer(command), len(command),
        ctypes.pointer(reply), len(reply),
        0)
def write(self, addr, data):
    """
    Write `data` into the device starting at address `addr`.

    @param int addr: Start address of the memory block to write.
    @param sequence data: Data to write. Any type that implements the sequence API (i.e. string, list, bytearray...) is valid as input.

    Raises ValueError when self._is_u32 rejects `addr` or
    self._is_valid_buf rejects `data`.
    """
    if not self._is_u32(addr):
        raise ValueError('The addr parameter must be an unsigned 32-bit value.')
    if not self._is_valid_buf(data):
        raise ValueError('The data parameter must be a sequence type with at least one item.')

    c_addr = ctypes.c_uint32(addr)
    c_count = ctypes.c_uint32(len(data))
    # Copy the payload into a contiguous byte array the library can read.
    payload = (ctypes.c_uint8 * c_count.value)(*data)
    self.jlink.JLINKARM_WriteMem(c_addr, c_count, ctypes.byref(payload))
def read(self, addr, data_len):
    """
    Read `data_len` bytes from the device starting at address `addr`.

    @param int addr: Start address of the memory block to read.
    @param int data_len: Number of bytes to read.
    @return: The receive buffer converted with bytes().

    Raises ValueError when self._is_u32 rejects `addr` or `data_len`.
    """
    if not self._is_u32(addr):
        raise ValueError('The addr parameter must be an unsigned 32-bit value.')
    if not self._is_u32(data_len):
        raise ValueError('The data_len parameter must be an unsigned 32-bit value.')

    c_addr = ctypes.c_uint32(addr)
    c_count = ctypes.c_uint32(data_len)
    # Receive buffer filled in by the library call below.
    received = (ctypes.c_uint8 * c_count.value)()
    self.jlink.JLINKARM_ReadMem(c_addr, c_count, ctypes.byref(received))
    return bytes(received)
def callback_friend_request(self, callback, user_data):
    """
    Set the callback for the `friend_request` event.

    This event is triggered when a friend request is received.

    :param callback: Python function. Should take
        pointer (c_void_p) to Tox object,
        the Public Key (c_uint8 array) of the user who sent the friend request,
        the message (c_char_p) they sent along with the request,
        the size (c_size_t) of the message byte array,
        pointer (c_void_p) to user_data
    :param user_data: pointer (c_void_p) to user data
    """
    signature = (c_void_p, POINTER(c_uint8), c_char_p, c_size_t, c_void_p)
    prototype = CFUNCTYPE(None, *signature)
    # The wrapped callback is stored on the instance; ctypes requires callback
    # objects to stay referenced for as long as C code may call them.
    self.friend_request_cb = prototype(callback)
    Tox.libtoxcore.tox_callback_friend_request(
        self._tox_pointer, self.friend_request_cb, c_void_p(user_data))
def callback_friend_lossless_packet(self, callback, user_data):
    """
    Set the callback for the `friend_lossless_packet` event.

    :param callback: Python function. Should take
        pointer (c_void_p) to Tox object,
        friend_number (c_uint32) - the friend number of the friend who sent a lossless packet,
        a byte array (c_uint8 array) containing the received packet data,
        length (c_size_t) - the length of the packet data byte array,
        pointer (c_void_p) to user_data
    :param user_data: pointer (c_void_p) to user data
    """
    signature = (c_void_p, c_uint32, POINTER(c_uint8), c_size_t, c_void_p)
    prototype = CFUNCTYPE(None, *signature)
    # The wrapped callback is stored on the instance; ctypes requires callback
    # objects to stay referenced for as long as C code may call them.
    self.friend_lossless_packet_cb = prototype(callback)
    self.libtoxcore.tox_callback_friend_lossless_packet(
        self._tox_pointer, self.friend_lossless_packet_cb, user_data)

# -----------------------------------------------------------------------------------------------------------------
# Low-level network information
# -----------------------------------------------------------------------------------------------------------------
def __init__(self, name='color', width=640, height=480, fps=30, color_format='rgb'):
    """
    Configure the native color stream.

    :param name: stream name forwarded to the base class
    :param width: frame width in pixels
    :param height: frame height in pixels
    :param fps: frames per second forwarded to the base class
    :param color_format: 'rgb', 'bgr' (3 channels) or 'yuv' (2 channels)
    :raises ValueError: if `color_format` is not one of the values above
    """
    self.native = True
    self.stream = rs_stream.RS_STREAM_COLOR
    if color_format == 'rgb':
        self.format = rs_format.RS_FORMAT_RGB8
        n_channels = 3
    elif color_format == 'bgr':
        self.format = rs_format.RS_FORMAT_BGR8
        n_channels = 3
    elif color_format == 'yuv':
        self.format = rs_format.RS_FORMAT_YUYV
        n_channels = 2
    else:
        # Format the message *before* building the exception: calling
        # .format() on the ValueError instance raised AttributeError instead.
        raise ValueError(
            'Unknown color format. Expected rgb, bgr, or yuv ({} given)'.format(color_format))
    self.shape = (height, width, n_channels)
    self.dtype = ctypes.c_uint8
    super(ColorStream, self).__init__(name, self.native, self.stream, width, height, self.format, fps)
def __init__(self, name='cad', width=640, height=480, fps=30, color_format='rgb'):
    """
    Configure the non-native color-aligned-to-depth stream.

    :param name: stream name forwarded to the base class
    :param width: frame width in pixels
    :param height: frame height in pixels
    :param fps: frames per second forwarded to the base class
    :param color_format: 'rgb', 'bgr' (3 channels) or 'yuv' (2 channels)
    :raises ValueError: if `color_format` is not one of the values above
    """
    self.native = False
    self.stream = rs_stream.RS_STREAM_COLOR_ALIGNED_TO_DEPTH
    if color_format == 'rgb':
        self.format = rs_format.RS_FORMAT_RGB8
        n_channels = 3
    elif color_format == 'bgr':
        self.format = rs_format.RS_FORMAT_BGR8
        n_channels = 3
    elif color_format == 'yuv':
        self.format = rs_format.RS_FORMAT_YUYV
        n_channels = 2
    else:
        # Format the message *before* building the exception: calling
        # .format() on the ValueError instance raised AttributeError instead.
        raise ValueError(
            'Unknown color format. Expected rgb, bgr, or yuv ({} given)'.format(color_format))
    self.shape = (height, width, n_channels)
    self.dtype = ctypes.c_uint8
    super(CADStream, self).__init__(name, self.native, self.stream, width, height, self.format, fps)
def set_leds_state(pixels):
    """Set the state of the LEDs.

    `pixels` is a flat sequence of R, G, B values laid out line by line:
    the first 24 bytes store the first line of LEDs (8 LEDs x RGB), the
    following 24 bytes store the second line, and so on for 8 lines.

    Each value of the array should not be greater than 0x1F.

    Note: An exception is thrown if it fails.
    """
    led_values = (ctypes.c_uint8 * len(pixels))(*pixels)
    status = _LIB.rpisensehat_set_leds_state(led_values)
    if status < 0:
        raise Exception("rpisensehat set leds state failed")
def load_image(ptr, options, data):
    """Load a JPEG image in the FT800 chip memory.

    ptr: Starting address to decompress the image

    options: Can be FT800_OPT_RGB565 or FT800_OPT_MONO

    data: A list of bytes

    Note: An exception is thrown if it fails to load the image in the memory
    of the FT800 chip.
    """
    byte_count = len(data)
    image_bytes = (ctypes.c_uint8 * byte_count)(*data)
    status = _LIB.eve_click_load_image(ptr, options, image_bytes, byte_count)
    if status < 0:
        raise Exception("eve click load image failed")
def inflate(ptr, data):
    """Decompress some data in FT800 memory.

    The data must have been compressed using the DEFLATE algorithm.

    ptr: Starting address to decompress data

    data: A list of bytes.

    Note: An exception is thrown if it fails to decompress data.
    """
    byte_count = len(data)
    compressed = (ctypes.c_uint8 * byte_count)(*data)
    status = _LIB.eve_click_inflate(ptr, compressed, byte_count)
    if status < 0:
        raise Exception("eve click inflate failed")
def write(self, data):
    """Writes samples of the given SampleSpec to the PulseAudio server.

    Only SampleFormat.PA_SAMPLE_U8 is implemented; `data` must then be a
    sequence of values that fit in an unsigned byte.

    Raises Exception('Not implemented yet.') for every other sample format
    and PulseAudioException (carrying the reported error code) when
    pa_simple_write returns a negative result.
    """
    self._fail_if_not_connected()

    # TODO: Use self.create_data_array...
    if self._sample_format != SampleFormat.PA_SAMPLE_U8:
        # Covers PA_SAMPLE_S16LE and PA_SAMPLE_FLOAT32LE as before, and also
        # any other format, which previously fell through with `c_data` and
        # `size` unbound.
        raise Exception('Not implemented yet.')

    size = len(data)
    c_data = (ctypes.c_uint8 * size)(*data)

    error = ctypes.c_int(0)
    # The last argument is an out-parameter (int *), so pass a reference;
    # otherwise error.value can never receive the reported code.
    result = _libpulse_simple.pa_simple_write(self._client, c_data, size,
                                              ctypes.byref(error))
    if result < 0:
        raise PulseAudioException('Could not write data.', error.value)
def memalign(cty, align):
    """Allocate a `cty` object whose address is a multiple of `align` bytes.

    Returns ``(buf, mem)``: `buf` is the aligned `cty` instance and `mem` is
    the backing byte array it lives in. `buf` is created with from_address,
    so the caller must keep `mem` alive for as long as `buf` is used.
    """
    # Over-allocate by `align` bytes so an aligned address always fits.
    backing = (c_uint8 * (sizeof(cty) + align))()
    base = addressof(backing)
    # Distance from `base` up to the next multiple of `align` (0 if aligned).
    padding = -base % align
    aligned = cty.from_address(base + padding)
    assert 0 == addressof(aligned) % align
    return aligned, backing
def __init__(self, initializer):
    """Create the (3, 5, 3) byte array and fill it from `initializer`.

    The buffer is exposed as a uint8 pointer in self.content and the values
    of `initializer` are written to consecutive byte offsets.
    """
    from ctypes import cast, POINTER, c_uint8

    Array3D = PixelCopyTestWithArray.Array3D
    super(Array3D, self).__init__((3, 5, 3),
                                  format='B',
                                  strides=(20, 4, 1))
    cells = cast(self.buf, POINTER(c_uint8))
    self.content = cells
    for offset, value in enumerate(initializer):
        cells[offset] = value
def __init__(self, blob):
    """Allocate self.data as a string buffer of len(blob) bytes.

    Only the length of `blob` is used here; its contents are not copied.
    """
    blob_size = len(blob)
    self.data = ctypes.create_string_buffer(blob_size)
def fixed_length(cls, clskey, length):
    """Return a subclass of `cls` with a fixed-size byte payload.

    The subclass has ``key`` set to `clskey` and a single ``data`` field
    that is an array of `length` unsigned bytes.
    """
    payload_type = ctypes.c_uint8 * length

    class foo(cls):
        key = clskey
        _fields_ = [("data", payload_type)]

    return foo
def get_buffer_as_uint8(self):
    """Return self.get_buffer_as() called with ctypes.c_uint8."""
    element_type = ctypes.c_uint8
    return self.get_buffer_as(element_type)
def get_buffer_as_triplet(self):
    """Return self.get_buffer_as() called with a 3-element c_uint8 array type."""
    triplet_type = ctypes.c_uint8 * 3
    return self.get_buffer_as(triplet_type)
def validate_u8(val, p, key=None):
    """Check that `val` is an int that survives a round trip through c_uint8.

    Returns None when the value is accepted; otherwise
    raise_validation_error is called with ErrInvalidU8, `val`, `p` and `key`.
    """
    fits_in_u8 = isinstance(val, int) and ctypes.c_uint8(val).value == val
    if not fits_in_u8:
        raise_validation_error(ErrInvalidU8, val, p, key=key)
def struct(cls, ea, **sid):
    """Return the structure_t at address ``ea`` as a dict of ctypes.

    If the structure ``sid`` is specified, then use that specific structure type.
    The structure may be given under any of the keywords ``sid``, ``struc``,
    ``structure`` or ``id`` (checked in that order), either as a
    ``structure.structure_t`` (its ``.id`` is used) or as a value that is used
    directly as the identifier.

    The result maps each member name either to the raw value returned by
    ``read`` or to a ctypes object reinterpreting a buffer copy of that value.
    """
    # presumably validates/normalizes the address -- confirm against interface.address
    ea = interface.address.within(ea)

    # Resolve the structure identifier from the keyword aliases, falling back
    # to whatever structure is applied at ``ea``.
    if any(n in sid for n in ('sid','struc','structure','id')):
        res = sid['sid'] if 'sid' in sid else sid['struc'] if 'struc' in sid else sid['structure'] if 'structure' in sid else sid['id'] if 'id' in sid else None
        # NOTE: ``sid`` is rebound here from the keyword dict to the identifier.
        sid = res.id if isinstance(res, structure.structure_t) else res
    else:
        sid = type.structure.id(ea)

    st = structure.instance(sid, offset=ea)

    # Lookup keyed by (python type, size); a negative size selects the signed
    # ctypes integer of that width.
    typelookup = {
        (int,-1) : ctypes.c_int8, (int,1) : ctypes.c_uint8,
        (int,-2) : ctypes.c_int16, (int,2) : ctypes.c_uint16,
        (int,-4) : ctypes.c_int32, (int,4) : ctypes.c_uint32,
        (int,-8) : ctypes.c_int64, (int,8) : ctypes.c_uint64,
        (float,4) : ctypes.c_float, (float,8) : ctypes.c_double,
    }

    res = {}
    for m in st.members:
        # A falsy result from read() is replaced by an empty string.
        t, val = m.type, read(m.offset, m.size) or ''
        try:
            ct = typelookup[t]
        except KeyError:
            # Not a plain scalar entry: split a tuple type into (type, size),
            # otherwise use the member type itself with a size of 0.
            ty, sz = t if isinstance(t, __builtin__.tuple) else (m.type, 0)
            if isinstance(t, __builtin__.list):
                # NOTE(review): on this path ``sz`` is 0 because ``t`` is a
                # list rather than a tuple, and a failed lookup here raises
                # KeyError from inside the handler -- confirm this is intended.
                t = typelookup[tuple(ty)]
                ct = t*sz
            elif ty in (chr,str):
                ct = ctypes.c_char*sz
            else:
                ct = None
        finally:
            # Runs for every member: keep the raw value when no ctype was
            # chosen, otherwise copy the value into a ctypes buffer and view
            # that buffer as ``ct``.
            res[m.name] = val if any(_ is None for _ in (ct,val)) else ctypes.cast(ctypes.pointer(ctypes.c_buffer(val)),ctypes.POINTER(ct)).contents
    return res
def image_u8_get_array(img_ptr):
    """Return the pixel buffer behind `img_ptr` via ptr_to_array2d.

    The image's ``buf`` contents, ``height`` and ``stride`` fields are passed
    along with c_uint8 as the element type.
    """
    pixels = img_ptr.contents.buf.contents
    rows = img_ptr.contents.height
    row_stride = img_ptr.contents.stride
    return ptr_to_array2d(c_uint8, pixels, rows, row_stride)
def Write(self, packet):
    """See base class.

    Copies `packet` into a buffer of internal_max_out_report_len bytes and
    sends it as an output report with report id 0. Raises errors.OsHidError
    when IOHIDDeviceSetReport does not return K_IO_RETURN_SUCCESS.
    """
    report_len = self.internal_max_out_report_len
    report_id = 0
    out_report = (ctypes.c_uint8 * report_len)()
    out_report[:] = packet[:]

    status = iokit.IOHIDDeviceSetReport(self.device_handle,
                                        K_IO_HID_REPORT_TYPE_OUTPUT,
                                        report_id,
                                        out_report,
                                        self.internal_max_out_report_len)

    # Non-zero status indicates failure
    if status != K_IO_RETURN_SUCCESS:
        raise errors.OsHidError('Failed to write report to device')
def __init__(self, packet):
    """Copy `packet` into a buffer owned by this object.

    The timestamp is converted with timestamp_from_avbin, packet.size bytes
    of packet.data are copied into self.data, and the object receives the
    next id from the class-level counter.
    """
    self.timestamp = timestamp_from_avbin(packet.timestamp)

    payload = (ctypes.c_uint8 * packet.size)()
    self.data = payload
    self.size = packet.size
    ctypes.memmove(payload, packet.data, self.size)

    # Decoded image. 0 == not decoded yet; None == Error or discarded
    self.image = 0

    # Take the current id, then advance the counter stored on the class.
    self.id = self._next_id
    owner = self.__class__
    owner._next_id += 1
def _encode(cls, payload):
    """
    Protect a stream of data with escape characters.

    Every byte that is >= cls.ESCAPE is emitted as cls.ESCAPE followed by
    (byte - cls.ESCAPE); all other bytes are emitted unchanged.

    :param payload: bytestring to encode
    :return: the escaped data as ``bytes``
    """
    # Accumulate in a bytearray: repeatedly extending an immutable bytes
    # object copies the whole output on every step.
    out = bytearray()
    escape = cls.ESCAPE
    for byte in payload:
        if byte >= escape:
            out.append(escape)
            # c_uint8 keeps the difference within a single unsigned byte.
            byte = ctypes.c_uint8(byte - escape).value
        out.append(byte)
    return bytes(out)
def _authenticate(self, block, uid, key="\xff\xff\xff\xff\xff\xff", use_b_key=False):
    """
    Authenticate to a particular block using a specified key.

    The first 6 characters of `key` and the first 4 characters of `uid` are
    sent. Returns the result of nfc_initiator_transceive_bytes. Raises
    Exception if Easy Framing cannot be enabled.
    """
    framing = nfc.nfc_device_set_property_bool(self.__device, nfc.NP_EASY_FRAMING, True)
    if framing < 0:
        raise Exception("Error setting Easy Framing property")

    # Frame layout: opcode, block number, 6 key bytes, 4 uid bytes.
    command = (ctypes.c_uint8 * 12)()
    command[0] = self.MC_AUTH_B if use_b_key else self.MC_AUTH_A
    command[1] = block
    command[2:8] = [ord(key[i]) for i in range(6)]
    command[8:12] = [ord(uid[i]) for i in range(4)]

    reply = (ctypes.c_uint8 * 250)()
    return nfc.nfc_initiator_transceive_bytes(
        self.__device,
        ctypes.pointer(command), len(command),
        ctypes.pointer(reply), len(reply),
        0)
def _worker_init(mjb_bytes, worker_id, device_ids, shared_rgbs, shared_depths, modder):
    """
    Initializes the global state for the workers.

    Each worker takes the next value of the shared `worker_id` counter,
    picks its device from `device_ids` round-robin, wraps the shared
    rgb/depth buffers as numpy arrays, builds its own simulation with a
    render context and, when `modder` is given, a modder instance.
    """
    global _render_pool_storage

    storage = RenderPoolStorage()

    # Claim a unique id under the counter's lock.
    with worker_id.get_lock():
        my_id = worker_id.value
        worker_id.value += 1
    storage.device_id = device_ids[my_id % len(device_ids)]

    storage.shared_rgbs_array = np.frombuffer(
        shared_rgbs.get_obj(), dtype=ctypes.c_uint8)
    storage.shared_depths_array = np.frombuffer(
        shared_depths.get_obj(), dtype=ctypes.c_float)

    # avoid a circular import
    from mujoco_py import load_model_from_mjb, MjRenderContext, MjSim
    storage.sim = MjSim(load_model_from_mjb(mjb_bytes))
    # attach a render context to the sim (needs to happen before
    # modder is called, since it might need to upload textures
    # to the GPU).
    MjRenderContext(storage.sim, device_id=storage.device_id)

    if modder is None:
        storage.modder = None
    else:
        storage.modder = modder(storage.sim, random_state=my_id)
        storage.modder.whiten_materials()

    _render_pool_storage = storage
def nonlinear_function(byte, constant):
    """Mix `byte` with `constant` and two shifted copies of an 8-bit state.

    The state is ((byte ^ constant) + 1) modulo 256; the result is
    byte ^ constant xored with the state shifted left by 5 and right by 3,
    each passed through eight_bit_integer.
    """
    mixed = byte ^ constant
    state = (mixed + 1) % 256
    left = eight_bit_integer(state << 5).value
    right = eight_bit_integer(state >> 3).value
    return mixed ^ left ^ right
def nonlinear_function2(state_and_constant):
    """Advance a word-sized state and return (low 8-bit value, new state).

    `state_and_constant` is a (state, constant) pair. The state is xored
    with the constant, incremented, xored with itself shifted right and
    then left by 8 (each through `word`), and finally bitwise inverted.
    """
    current, constant = state_and_constant
    current = (current ^ constant) + 1
    current = current ^ word(current >> 8).value
    current = current ^ word(current << 8).value
    current = word(~current).value
    return (eight_bit_integer(current).value, current)
def read_8(self, addr, buf, len, status):
    """Read one byte at `addr` through JLINKARM_ReadMemU8 and return it.

    The `buf`, `len` and `status` arguments are not used: fresh ctypes
    objects are created for the data byte and the status byte.
    """
    c_addr = ctypes.c_uint32(addr)
    value = ctypes.c_uint8()
    item_status = ctypes.c_byte()
    self.jlink.JLINKARM_ReadMemU8(c_addr, 1,
                                  ctypes.byref(value),
                                  ctypes.byref(item_status))
    return value.value
def write_8(self, addr, data):
    """Write the single byte `data` at `addr` through JLINKARM_WriteU8."""
    c_addr = ctypes.c_uint32(addr)
    c_value = ctypes.c_uint8(data)
    self.jlink.JLINKARM_WriteU8(c_addr, c_value)
def callback_file_recv_chunk(self, callback, user_data):
    """
    Set the callback for the `file_recv_chunk` event.

    This event is first triggered when a file transfer request is received, and subsequently when a chunk of file
    data for an accepted request was received.

    :param callback: Python function.
        When length is 0, the transfer is finished and the client should release the resources it acquired for the
        transfer. After a call with length = 0, the file number can be reused for new file transfers.

        If position is equal to file_size (received in the file_receive callback) when the transfer finishes, the file
        was received completely. Otherwise, if file_size was UINT64_MAX, streaming ended successfully when length is 0.

        Should take
        pointer (c_void_p) to Tox object,
        the friend number (c_uint32) of the friend who is sending the file,
        the friend-specific file number (c_uint32) the data received is associated with,
        the file position (c_uint64) of the first byte in data,
        a byte array (POINTER(c_uint8)) containing the received chunk,
        the length (c_size_t) of the received chunk,
        pointer (c_void_p) to user_data
    :param user_data: pointer (c_void_p) to user data
    """
    signature = (c_void_p, c_uint32, c_uint32, c_uint64, POINTER(c_uint8), c_size_t, c_void_p)
    prototype = CFUNCTYPE(None, *signature)
    # The wrapped callback is stored on the instance; ctypes requires callback
    # objects to stay referenced for as long as C code may call them.
    self.file_recv_chunk_cb = prototype(callback)
    self.libtoxcore.tox_callback_file_recv_chunk(
        self._tox_pointer, self.file_recv_chunk_cb, user_data)

# -----------------------------------------------------------------------------------------------------------------
# Low-level custom packet sending and receiving
# -----------------------------------------------------------------------------------------------------------------
def callback_friend_lossy_packet(self, callback, user_data):
    """
    Set the callback for the `friend_lossy_packet` event.

    :param callback: Python function. Should take
        pointer (c_void_p) to Tox object,
        friend_number (c_uint32) - the friend number of the friend who sent a lossy packet,
        a byte array (c_uint8 array) containing the received packet data,
        length (c_size_t) - the length of the packet data byte array,
        pointer (c_void_p) to user_data
    :param user_data: pointer (c_void_p) to user data
    """
    signature = (c_void_p, c_uint32, POINTER(c_uint8), c_size_t, c_void_p)
    prototype = CFUNCTYPE(None, *signature)
    # The wrapped callback is stored on the instance; ctypes requires callback
    # objects to stay referenced for as long as C code may call them.
    self.friend_lossy_packet_cb = prototype(callback)
    self.libtoxcore.tox_callback_friend_lossy_packet(
        self._tox_pointer, self.friend_lossy_packet_cb, user_data)
def audio_send_frame(self, friend_number, pcm, sample_count, channels, sampling_rate):
    """
    Send an audio frame to a friend.

    The expected format of the PCM data is: [s1c1][s1c2][...][s2c1][s2c2][...]...
    Meaning: sample 1 for channel 1, sample 1 for channel 2, ...
    For mono audio, this has no meaning, every sample is subsequent. For stereo, this means the expected format is
    LRLRLR... with samples for left and right alternating.

    :param friend_number: The friend number of the friend to which to send an audio frame.
    :param pcm: An array of audio samples. The size of this array must be sample_count * channels.
    :param sample_count: Number of samples in this frame. Valid numbers here are
        ((sample rate) * (audio length) / 1000), where audio length can be 2.5, 5, 10, 20, 40 or 60 milliseconds.
    :param channels: Number of audio channels. Supported values are 1 and 2.
    :param sampling_rate: Audio sampling rate used in this frame. Valid sampling rates are 8000, 12000, 16000,
        24000, or 48000.
    :return: bool(result) of the library call when the reported error is OK
    :raises ArgumentError: for the NULL, FRIEND_NOT_FOUND and INVALID error codes
    :raises RuntimeError: for the FRIEND_NOT_IN_CALL, SYNC, PAYLOAD_TYPE_DISABLED and RTP_FAILED error codes
    """
    toxav_err_send_frame = c_int()
    result = ToxAV.libtoxav.toxav_audio_send_frame(self._toxav_pointer, c_uint32(friend_number),
                                                   cast(pcm, c_void_p),
                                                   c_size_t(sample_count), c_uint8(channels),
                                                   c_uint32(sampling_rate), byref(toxav_err_send_frame))
    toxav_err_send_frame = toxav_err_send_frame.value
    if toxav_err_send_frame == TOXAV_ERR_SEND_FRAME['OK']:
        return bool(result)
    elif toxav_err_send_frame == TOXAV_ERR_SEND_FRAME['NULL']:
        raise ArgumentError('The samples data pointer was NULL.')
    elif toxav_err_send_frame == TOXAV_ERR_SEND_FRAME['FRIEND_NOT_FOUND']:
        raise ArgumentError('The friend_number passed did not designate a valid friend.')
    elif toxav_err_send_frame == TOXAV_ERR_SEND_FRAME['FRIEND_NOT_IN_CALL']:
        raise RuntimeError('This client is currently not in a call with the friend.')
    elif toxav_err_send_frame == TOXAV_ERR_SEND_FRAME['SYNC']:
        raise RuntimeError('Synchronization error occurred.')
    elif toxav_err_send_frame == TOXAV_ERR_SEND_FRAME['INVALID']:
        raise ArgumentError('One of the frame parameters was invalid. E.g. the resolution may be too small or too '
                            'large, or the audio sampling rate may be unsupported.')
    elif toxav_err_send_frame == TOXAV_ERR_SEND_FRAME['PAYLOAD_TYPE_DISABLED']:
        # The two literals are concatenated, so the first must end with a space.
        raise RuntimeError('Either friend turned off audio or video receiving or we turned off sending for the said '
                           'payload.')
    elif toxav_err_send_frame == TOXAV_ERR_SEND_FRAME['RTP_FAILED']:
        # Previously the exception was constructed but never raised, so this
        # failure was silently reported as a None return value.
        raise RuntimeError('Failed to push frame through rtp interface.')
def callback_video_receive_frame(self, callback, user_data):
    """
    Set the callback for the `video_receive_frame` event.

    :param callback: Python function.
        The function type for the video_receive_frame callback.

        Should take
            toxAV           pointer (c_void_p) to ToxAV object
            friend_number   The friend number (c_uint32) of the friend who sent a video frame.
            width           Width (c_uint16) of the frame in pixels.
            height          Height (c_uint16) of the frame in pixels.
            y
            u
            v               Plane data (POINTER(c_uint8)).
                                The size of plane data is derived from width and height where
                                Y = MAX(width, abs(ystride)) * height,
                                U = MAX(width/2, abs(ustride)) * (height/2) and
                                V = MAX(width/2, abs(vstride)) * (height/2).
            ystride
            ustride
            vstride         Strides data (c_int32). Strides represent padding for each plane that may or may not be
                            present. You must handle strides in your image processing code. Strides are negative if
                            the image is bottom-up hence why you MUST abs() it when calculating plane buffer size.
            user_data       pointer (c_void_p) to user_data
    :param user_data: pointer (c_void_p) to user data
    """
    plane = POINTER(c_uint8)
    signature = (c_void_p, c_uint32, c_uint16, c_uint16,
                 plane, plane, plane,
                 c_int32, c_int32, c_int32,
                 c_void_p)
    prototype = CFUNCTYPE(None, *signature)
    # The wrapped callback is stored on the instance; ctypes requires callback
    # objects to stay referenced for as long as C code may call them.
    self.video_receive_frame_cb = prototype(callback)
    ToxAV.libtoxav.toxav_callback_video_receive_frame(
        self._toxav_pointer, self.video_receive_frame_cb, user_data)
def setup(self):
    """Prepare pyclibrary: patch it, register search paths and extra types.

    Header and library locations are registered only when include_path /
    library_path are set, and auto_init is called only while neither
    CParser nor CLibrary has been initialized.
    """
    # counter "ValueError: number of bits invalid for bit field"
    monkeypatch_pyclibrary_ctypes_struct()

    # register header- and library paths
    # https://pyclibrary.readthedocs.org/en/latest/get_started/configuration.html#specifying-headers-and-libraries-locations
    # TODO: this probably acts on a global basis; think about it
    if self.include_path:
        add_header_locations([self.include_path])
    if self.library_path:
        add_library_locations([self.library_path])

    # define extra types suitable for embedded use
    embedded_types = dict(
        uint8_t=c_uint8,
        uint16_t=c_uint16,
        uint32_t=c_uint32,
        int8_t=c_int8,
        int16_t=c_int16,
        int32_t=c_int32,
    )

    # TODO: this probably acts on a global basis; think about it
    if not CParser._init and not CLibrary._init:
        auto_init(extra_types=embedded_types)
def __init__(self, name='infrared', width=640, height=480, fps=30):
    """Configure the native infrared stream (Y8 format, one byte per pixel).

    :param name: stream name forwarded to the base class
    :param width: frame width in pixels
    :param height: frame height in pixels
    :param fps: frames per second forwarded to the base class
    """
    self.native = True
    self.stream = rs_stream.RS_STREAM_INFRARED
    self.format = rs_format.RS_FORMAT_Y8
    self.shape = (height, width)
    self.dtype = ctypes.c_uint8
    super(InfraredStream, self).__init__(
        name, self.native, self.stream, width, height, self.format, fps)
def transfer(tx_data):
    """Transfers data using the current SPI bus. Returns a list of bytes.

    tx_data: A list of bytes to send.

    The returned list has the same length as tx_data.

    Note: An exception is thrown if an error occurs during the transfer.
    """
    byte_count = len(tx_data)
    outgoing = (ctypes.c_uint8 * byte_count)(*tx_data)
    incoming = (ctypes.c_uint8 * byte_count)()
    status = _LIB.spi_transfer(outgoing, incoming, byte_count)
    if status < 0:
        raise Exception("spi transfer failed")
    return list(incoming)
def get_mode(index):
    """Returns the mode of a LED.

    The LED must be initialised before calling this function.

    index: must be an integer in range 0..7

    Note: An exception is thrown if it fails to retrieve the mode of a LED.
    """
    # Out-parameter filled in by the library call.
    led_mode = ctypes.c_uint8(0)
    status = _LIB.led_get_mode(index, ctypes.byref(led_mode))
    if status < 0:
        raise Exception("led get mode failed")
    return led_mode.value
def get_pin(mikrobus_index, pin_type):
    """Returns GPIO index of a pin on provided mikrobus

    Note: An exception is thrown if the gpio cannot be found.
    """
    # Out-parameter filled in by the library call.
    gpio_index = ctypes.c_uint8(0)
    status = _LIB.gpio_get_pin(mikrobus_index, pin_type, ctypes.byref(gpio_index))
    if status < 0:
        raise Exception("gpio get pin failed")
    return gpio_index.value
def get_type(gpio_pin):
    """Returns the type of the GPIO

    Some GPIOs on the Mikrobus have a type (AN, PWM, INT, RST or CS).
    Other GPIOs don't have a type.

    Note: An exception is thrown if the type of the gpio cannot be found.
    """
    # Out-parameter filled in by the library call.
    gpio_type = ctypes.c_uint8(0)
    status = _LIB.gpio_get_type(gpio_pin, ctypes.byref(gpio_type))
    if status < 0:
        raise Exception("gpio get type failed")
    return gpio_type.value