我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ctypes.c_uint32()。
def libvlc_media_player_set_xwindow(p_mi, drawable): '''Set an X Window System drawable where the media player should render its video output. The call takes effect when the playback starts. If it is already started, it might need to be stopped before changes apply. If LibVLC was built without X11 output support, then this function has no effects. By default, LibVLC will capture input events on the video rendering area. Use L{libvlc_video_set_mouse_input}() and L{libvlc_video_set_key_input}() to disable that and deliver events to the parent window / to the application instead. By design, the X11 protocol delivers input events to only one recipient. @warning The application must call the XInitThreads() function from Xlib before L{libvlc_new}(), and before any call to XOpenDisplay() directly or via any other library. Failure to call XInitThreads() will seriously impede LibVLC performance. Calling XOpenDisplay() before XInitThreads() will eventually crash the process. That is a limitation of Xlib. @param p_mi: media player. @param drawable: X11 window ID @note The specified identifier must correspond to an existing Input/Output class X11 window. Pixmaps are B{not} currently supported. The default X11 server is assumed, i.e. that specified in the DISPLAY environment variable. @warning LibVLC can deal with invalid X11 handle errors, however some display drivers (EGL, GLX, VA and/or VDPAU) can unfortunately not. Thus the window handle must remain valid until playback is stopped, otherwise the process may abort or crash. @bug No more than one window handle per media player instance can be specified. If the media has multiple simultaneously active video tracks, extra tracks will be rendered into external windows beyond the control of the application. ''' f = _Cfunctions.get('libvlc_media_player_set_xwindow', None) or \ _Cfunction('libvlc_media_player_set_xwindow', ((1,), (1,),), None, None, MediaPlayer, ctypes.c_uint32) return f(p_mi, drawable)
def symlink_ms(source, linkname): """Python 2 doesn't have os.symlink on windows so we do it ourselfs :param source: sourceFile :type source: str :param linkname: symlink path :type linkname: str :raises: WindowsError, raises when it fails to create the symlink if the user permissions are incorrect """ import ctypes csl = ctypes.windll.kernel32.CreateSymbolicLinkW csl.argtypes = (ctypes.c_wchar_p, ctypes.c_wchar_p, ctypes.c_uint32) csl.restype = ctypes.c_ubyte flags = 1 if os.path.isdir(source) else 0 try: if csl(linkname, source.replace('/', '\\'), flags) == 0: raise ctypes.WinError() except WindowsError: raise WindowsError("Failed to create symbolicLink due to user permissions")
def get(self): arg = DrmModeObjGetPropertiesC() arg.obj_id = self.obj_id arg.obj_type = self.obj_type fcntl.ioctl(self._drm.fd, DRM_IOCTL_MODE_OBJ_GETPROPERTIES, arg) prop_ids = (ctypes.c_uint32*arg.count_props)() arg.props_ptr = ctypes.cast(ctypes.pointer(prop_ids), ctypes.c_void_p).value prop_values = (ctypes.c_uint64*arg.count_props)() arg.prop_values_ptr = ctypes.cast(ctypes.pointer(prop_values), ctypes.c_void_p).value fcntl.ioctl(self._drm.fd, DRM_IOCTL_MODE_OBJ_GETPROPERTIES, arg) self._arg = arg vals = [v for i, v in zip(prop_ids, prop_values) if i == self.id] return vals[0]
def set(self, fb, x, y, mode, *conns): arg = DrmModeCrtcC() arg.crtc_id = self.id arg.fb_id = fb.id arg.x = x arg.y = y arg.mode_valid = 1 arg.mode = mode._arg connector_ids = (ctypes.c_uint32 * len(conns))(*[conn.id for conn in conns]) arg.set_connectors_ptr = ctypes.cast(ctypes.pointer(connector_ids), ctypes.c_void_p).value arg.count_connectors = len(conns) fcntl.ioctl(self._drm.fd, DRM_IOCTL_MODE_SETCRTC, arg) self.fetch()
def float_to_rgb(float_rgb): """ Converts a packed float RGB format to an RGB list Args: float_rgb: RGB value packed as a float Returns: color (list): 3-element list of integers [0-255,0-255,0-255] """ s = struct.pack('>f', float_rgb) i = struct.unpack('>l', s)[0] pack = ctypes.c_uint32(i).value r = (pack & 0x00FF0000) >> 16 g = (pack & 0x0000FF00) >> 8 b = (pack & 0x000000FF) color = [r,g,b] return color
def log_entries(self): """This method yields LogEntry objects.""" current_sequence_number = self.baseblock.get_primary_sequence_number() curr_pos = BASE_BLOCK_LENGTH_LOG while curr_pos < self.file_size: try: curr_logentry = LogEntry(self.file_object, curr_pos, current_sequence_number) except (LogEntryException, ReadException): break # We could read garbage at the end of the file, this is normal. yield curr_logentry curr_pos += curr_logentry.get_size() current_sequence_number = c_uint32(current_sequence_number + 1).value # Handle a possible overflow.
def netmask(self, interface_name, netmask): """ Checking interface first, if interface name found in Get().interfaces() validating Ipv4. After that applied ip address to interace interface_name = Applied Interface netmask = New netmask ip address """ interface_check = Get().interfaces valid_ipv4 = validators.ipv4(netmask) if not interface_name in interface_check: raise WrongInterfaceName("Wrong Interface Name %s" % interface_name) elif not valid_ipv4 is True: raise NotValidIPv4Address("Not Valid IPv4 Address %s" % netmask) else: prefix_len = self.get_net_size(netmask.split('.')) ifname = interface_name.encode(encoding='UTF-8') netmask = ctypes.c_uint32(~((2 ** (32 - prefix_len)) - 1)).value nmbytes = socket.htonl(netmask) ifreq = struct.pack('16sH2sI8s', ifname, AF_INET, b'\x00'*2, nmbytes, b'\x00'*8) fcntl.ioctl(self.sock, SIOCSIFNETMASK, ifreq)
def _return_ctype(self): """ Returns the associated ctype of a given datatype. """ _datatype_ctype = { DataType.Bool: ctypes.c_uint8, DataType.I8: ctypes.c_int8, DataType.U8: ctypes.c_uint8, DataType.I16: ctypes.c_int16, DataType.U16: ctypes.c_uint16, DataType.I32: ctypes.c_int32, DataType.U32: ctypes.c_uint32, DataType.I64: ctypes.c_int64, DataType.U64: ctypes.c_uint64, DataType.Sgl: ctypes.c_float, DataType.Dbl: ctypes.c_double, } return _datatype_ctype[self]
def setUp(self, mock_cdll, mock_find_library): """ Setup up self._library so that self._library.AwesomeFunction(int, str) can be called, and the return value can be changed by setting self._mock_awesome_function.return_value. """ mock_loaded_library = mock.Mock() mock_cdll.LoadLibrary.return_value = mock_loaded_library self._mock_awesome_function = mock.Mock() self._mock_awesome_function.__name__ = "Entrypoint_AwesomeFunction" mock_loaded_library.Entrypoint_AwesomeFunction = self._mock_awesome_function self._library = StatusCheckedLibrary( library_name="CoolLibrary", library_function_infos=[ LibraryFunctionInfo( pretty_name="AwesomeFunction", name_in_library="Entrypoint_AwesomeFunction", named_argtypes=[NamedArgtype("some_integer", ctypes.c_uint32), NamedArgtype("some_string", ctypes.c_char_p)]) ])
def test_good_error_message_from_memory_full_error(self): """ Tests a good error message from a library call that fails. 1. Correctly converts -52000 to NiFpgaMemoryFullError 2. An integer arg gets printed as hex (easier to debug than decimal) 3. A string arg gets printed with quotes surrounding it (so it's obviously a string) """ self._mock_awesome_function.return_value = -52000 try: self._library.AwesomeFunction(ctypes.c_uint32(33), ctypes.c_char_p(b"2")) self.fail("AwesomeFunction should have raised MemoryFull") except nifpga.MemoryFullError as e: if python_version == 2: self.assertEqual( "Error: MemoryFull (-52000) when calling 'Entrypoint_AwesomeFunction' with arguments:" "\n\tsome_integer: 0x21L" "\n\tsome_string: '2'", str(e)) else: self.assertEqual( "Error: MemoryFull (-52000) when calling 'Entrypoint_AwesomeFunction' with arguments:" "\n\tsome_integer: 0x21" "\n\tsome_string: b'2'", str(e))
def __init__(self, pretty_name, name_in_library, named_argtypes): """ A struct describing a library entry point function to be used in StatusCheckedLibrary. pretty_name: e.g. "Run" A "pretty" name by which a StatusCheckedLibrary object will call the function. name_in_library: e.g. "NiFpgaDll_Run" The name of the actual DLL entry point used to call the function. named_argtypes: e.g. [NamedArgtype("session", _SessionType), NamedArgtype("fifo", ctypes.c_uint32)] A list of NamedArgtype structs used to call the function. """ self.pretty_name = pretty_name self.name_in_library = name_in_library self.named_argtypes = named_argtypes
def __init__(self): v_posix.PosixMixin.__init__(self) v_posix.PtraceMixin.__init__(self) self.libc = ctypes.CDLL(c_util.find_library('c')) self.myport = self.libc.mach_task_self() self.libc.mach_port_allocate.argtypes = [ipc_space_t, mach_port_right_t, ctypes.POINTER(mach_port_name_t)] self.libc.mach_port_allocate.restype = kern_return_t self.libc.mach_vm_read.argtypes = [ mach_port_t, size_t, size_t, ctypes.POINTER(ctypes.c_void_p), ctypes.POINTER(ctypes.c_uint32)] self.libc.mach_vm_read.restype = kern_return_t self.libc.ptrace.restype = ctypes.c_int self.libc.ptrace.argtypes = [ctypes.c_int, ctypes.c_uint32, ctypes.c_size_t, ctypes.c_int] machhelp_path = os.path.join(darwindir, 'machhelper.dylib') self.machhelper = ctypes.CDLL(machhelp_path) self.machhelper.platformPs.restype = ctypes.POINTER(ProcessListEntry) self.useptrace = False self.portset = self.newMachPort(MACH_PORT_RIGHT_PORT_SET) self.excport = self.newMachRWPort() self.addPortToSet(self.excport)
def platformGetRegCtx(self, tid): ctx = self.archGetRegCtx() # NOTE: the tid *is* the port... state = STRUCT_X86_THREAD_STATE32() scount = ctypes.c_uint32(ctypes.sizeof(state) / 4) ret = self.libc.thread_get_state(tid, x86_THREAD_STATE32, addrof(state), addrof(scount)); if ret != 0: raise Exception('thread_get_state (THREAD_STATE32) failed: 0x%.8x' % ret) ctx._rctx_Import(state) state = STRUCT_X86_DEBUG_STATE32() scount = ctypes.c_uint32(ctypes.sizeof(state) / 4) ret = self.libc.thread_get_state(tid, x86_DEBUG_STATE32, addrof(state), addrof(scount)); if ret != 0: raise Exception('thread_get_state (DEBUG_STATE32) failed: 0x%.8x' % ret) ctx._rctx_Import(state) return ctx
def platformSetRegCtx(self, tid, ctx): state = STRUCT_X86_THREAD_STATE32() # Sync up a struct first... scount = ctypes.c_uint32(ctypes.sizeof(state) / 4) ret = self.libc.thread_get_state(tid, x86_THREAD_STATE32, addrof(state), addrof(scount)); if ret != 0: raise Exception('thread_get_state (THREAD_STATE32) failed: 0x%.8x' % ret) # Export our shit into it... ctx._rctx_Export(state) scount = ctypes.sizeof(state) / 4 r = self.libc.thread_set_state(tid, x86_THREAD_STATE32, addrof(state), scount) if r != 0: raise Exception('thread_set_state (THREAD_STATE32) failed: 0x%.8x' % r) state = STRUCT_X86_DEBUG_STATE32() ctx._rctx_Export(state) scount = ctypes.sizeof(state) / 4 r = self.libc.thread_set_state(tid, x86_DEBUG_STATE32, addrof(state), scount) if r != 0: raise Exception('thread_set_state (DEBUG_STATE32) failed: 0x%.8x' % r)
def platformGetRegCtx(self, tid): ctx = self.archGetRegCtx() # NOTE: the tid *is* the port... state = STRUCT_X86_THREAD_STATE64() scount = ctypes.c_uint32(ctypes.sizeof(state) / 4) ret = self.libc.thread_get_state(tid, x86_THREAD_STATE64, addrof(state), addrof(scount)); if ret != 0: self.libc.mach_error("thread_get_state x86_THREAD_STATE64 failed:", ret) raise Exception('thread_get_state (THREAD_STATE64) failed: 0x%.8x' % ret) ctx._rctx_Import(state) state = STRUCT_X86_DEBUG_STATE64() scount = ctypes.c_uint32(ctypes.sizeof(state) / 4) ret = self.libc.thread_get_state(tid, x86_DEBUG_STATE64, addrof(state), addrof(scount)); if ret != 0: self.libc.mach_error("thread_get_state x86_DEBUG_STATE64 failed:", ret) raise Exception('thread_get_state (DEBUG_STATE64) failed: 0x%.8x' % ret) ctx._rctx_Import(state) return ctx
def getch_impl(): # TODO: This windows impl keeps pipes/redirects from working. Need ReadFile for that, # with more complicated handling (personally, I'm just going to keep using unix/cygwin # for pipe-y debug stuff...) # TODO: Windows escape seqs via ReadConsoleInput, convert to VT100 seqs for more commonality. if is_windows: stdin_handle = ctypes.windll.kernel32.GetStdHandle(ctypes.c_ulong(-10)) one_char_buf = ctypes.c_uint32() chars_read = ctypes.c_uint32() # NOTE: W version of this function == ERROR_NOACCESS after text color set in photopia!? result = ctypes.windll.kernel32.ReadConsoleA(stdin_handle, ctypes.byref(one_char_buf), 1, ctypes.byref(chars_read), 0) if result == 0 or chars_read.value != 1: last_err = ctypes.windll.kernel32.GetLastError() print('LAST ERR', last_err) err('failed to read console') return chr(one_char_buf.value) else: #Unix return sys.stdin.read(1)
def find_generic_password(kc_name, service, username): username = username.encode('utf-8') service = service.encode('utf-8') with open(kc_name) as keychain: length = c_uint32() data = c_void_p() status = SecKeychainFindGenericPassword( keychain, len(service), service, len(username), username, length, data, None) msg = "Can't fetch password from Keychain" NotFound.raise_for_status(status, msg) password = ctypes.create_string_buffer(length.value) ctypes.memmove(password, data.value, length.value) SecKeychainItemFreeContent(None, data) return password.raw.decode('utf-8')
def GetLogMessage(self, level, msgid): tick = ctypes.c_uint32() tv_sec = ctypes.c_uint32() tv_nsec = ctypes.c_uint32() if self._GetLogMessage is not None: maxsz = len(self._log_read_buffer)-1 sz = self._GetLogMessage(level, msgid, self._log_read_buffer, maxsz, ctypes.byref(tick), ctypes.byref(tv_sec), ctypes.byref(tv_nsec)) if sz and sz <= maxsz: self._log_read_buffer[sz] = '\x00' return self._log_read_buffer.value, tick.value, tv_sec.value, tv_nsec.value elif self._loading_error is not None and level == 0: return self._loading_error, 0, 0, 0 return None
def TraceThreadProc(self): """ Return a list of traces, corresponding to the list of required idx """ while self.PLCStatus == "Started": tick = ctypes.c_uint32() size = ctypes.c_uint32() buff = ctypes.c_void_p() TraceBuffer = None if self.PLClibraryLock.acquire(False): if self._GetDebugData(ctypes.byref(tick), ctypes.byref(size), ctypes.byref(buff)) == 0: if size.value: TraceBuffer = ctypes.string_at(buff.value, size.value) self._FreeDebugData() self.PLClibraryLock.release() if TraceBuffer is not None: self._TracesPush((tick.value, TraceBuffer)) self._TracesAutoSuspend() self._TracesFlush()
def set_register(self, register_name, value): """ Writes a CPU register. @param int, str, or CPURegister(IntEnum) register_name: CPU register to write. @param int value: Value to write. """ if not self._is_u32(value): raise ValueError('The value parameter must be an unsigned 32-bit value.') if not self._is_enum(register_name, CpuRegister): raise ValueError('Parameter register_name must be of type int, str or CpuRegister enumeration.') register_name = self._decode_enum(register_name, CpuRegister) if register_name is None: raise ValueError('Parameter register_name must be of type int, str or CpuRegister enumeration.') register_name = ctypes.c_int(register_name.value) value = ctypes.c_uint32(value) self.jlink.JLINKARM_WriteReg(register_name, value)
def get_register(self, register_name): """ Reads a CPU register. @param int, str, or CPURegister(IntEnum) register_name: CPU register to read. @return int: Value read. """ if not self._is_enum(register_name, CpuRegister): raise ValueError('Parameter register_name must be of type int, str or CpuRegister enumeration.') register_name = self._decode_enum(register_name, CpuRegister) if register_name is None: raise ValueError('Parameter register_name must be of type int, str or CpuRegister enumeration.') register_name = ctypes.c_int(register_name.value) #value = ctypes.c_uint32() return self.jlink.JLINKARM_ReadReg(register_name)
def write(self, addr, data): """ Writes data from the array into the device starting at the given address. @param int addr: Start address of the memory block to write. @param sequence data: Data to write. Any type that implements the sequence API (i.e. string, list, bytearray...) is valid as input. """ if not self._is_u32(addr): raise ValueError('The addr parameter must be an unsigned 32-bit value.') if not self._is_valid_buf(data): raise ValueError('The data parameter must be a sequence type with at least one item.') addr = ctypes.c_uint32(addr) data_len = ctypes.c_uint32(len(data)) data = (ctypes.c_uint8 * data_len.value)(*data) self.jlink.JLINKARM_WriteMem(addr, data_len, ctypes.byref(data))
def read(self, addr, data_len): """ Reads data_len bytes from the device starting at the given address. @param int addr: Start address of the memory block to read. @param int data_len: Number of bytes to read. @return [int]: List of values read. """ if not self._is_u32(addr): raise ValueError('The addr parameter must be an unsigned 32-bit value.') if not self._is_u32(data_len): raise ValueError('The data_len parameter must be an unsigned 32-bit value.') addr = ctypes.c_uint32(addr) data_len = ctypes.c_uint32(data_len) data = (ctypes.c_uint8 * data_len.value)() self.jlink.JLINKARM_ReadMem(addr, data_len, ctypes.byref(data)) return bytes(data)
def friend_delete(self, friend_number): """ Remove a friend from the friend list. This does not notify the friend of their deletion. After calling this function, this client will appear offline to the friend and no communication can occur between the two. :param friend_number: Friend number for the friend to be deleted. :return: True on success. """ tox_err_friend_delete = c_int() result = Tox.libtoxcore.tox_friend_delete(self._tox_pointer, c_uint32(friend_number), byref(tox_err_friend_delete)) tox_err_friend_delete = tox_err_friend_delete.value if tox_err_friend_delete == TOX_ERR_FRIEND_DELETE['OK']: return bool(result) elif tox_err_friend_delete == TOX_ERR_FRIEND_DELETE['FRIEND_NOT_FOUND']: raise ArgumentError('There was no friend with the given friend number. No friends were deleted.') # ----------------------------------------------------------------------------------------------------------------- # Friend list queries # -----------------------------------------------------------------------------------------------------------------
def self_get_friend_list(self, friend_list=None): """ Copy a list of valid friend numbers into an array. Call tox_self_get_friend_list_size to determine the number of elements to allocate. :param friend_list: pointer (c_char_p) to a memory region with enough space to hold the friend list. If this parameter is None, this function allocates memory for the friend list. :return: friend list """ friend_list_size = self.self_get_friend_list_size() if friend_list is None: friend_list = create_string_buffer(sizeof(c_uint32) * friend_list_size) friend_list = POINTER(c_uint32)(friend_list) Tox.libtoxcore.tox_self_get_friend_list(self._tox_pointer, friend_list) return friend_list[0:friend_list_size]
def friend_get_public_key(self, friend_number, public_key=None): """ Copies the Public Key associated with a given friend number to a byte array. :param friend_number: The friend number you want the Public Key of. :param public_key: pointer (c_char_p) to a memory region of at least TOX_PUBLIC_KEY_SIZE bytes. If this parameter is None, this function allocates memory for Tox Public Key. :return: Tox Public Key """ if public_key is None: public_key = create_string_buffer(TOX_PUBLIC_KEY_SIZE) tox_err_friend_get_public_key = c_int() Tox.libtoxcore.tox_friend_get_public_key(self._tox_pointer, c_uint32(friend_number), public_key, byref(tox_err_friend_get_public_key)) tox_err_friend_get_public_key = tox_err_friend_get_public_key.value if tox_err_friend_get_public_key == TOX_ERR_FRIEND_GET_PUBLIC_KEY['OK']: return bin_to_string(public_key, TOX_PUBLIC_KEY_SIZE) elif tox_err_friend_get_public_key == TOX_ERR_FRIEND_GET_PUBLIC_KEY['FRIEND_NOT_FOUND']: raise ArgumentError('No friend with the given number exists on the friend list.')
def friend_get_name_size(self, friend_number): """ Return the length of the friend's name. If the friend number is invalid, the return value is unspecified. The return value is equal to the `length` argument received by the last `friend_name` callback. """ tox_err_friend_query = c_int() result = Tox.libtoxcore.tox_friend_get_name_size(self._tox_pointer, c_uint32(friend_number), byref(tox_err_friend_query)) tox_err_friend_query = tox_err_friend_query.value if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['OK']: return result elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['NULL']: raise ArgumentError('The pointer parameter for storing the query result (name, message) was NULL. Unlike' ' the `_self_` variants of these functions, which have no effect when a parameter is' ' NULL, these functions return an error in that case.') elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['FRIEND_NOT_FOUND']: raise ArgumentError('The friend_number did not designate a valid friend.')
def callback_friend_name(self, callback, user_data): """ Set the callback for the `friend_name` event. Pass None to unset. This event is triggered when a friend changes their name. :param callback: Python function. Should take pointer (c_void_p) to Tox object, The friend number (c_uint32) of the friend whose name changed, A byte array (c_char_p) containing the same data as tox_friend_get_name would write to its `name` parameter, A value (c_size_t) equal to the return value of tox_friend_get_name_size, pointer (c_void_p) to user_data :param user_data: pointer (c_void_p) to user data """ c_callback = CFUNCTYPE(None, c_void_p, c_uint32, c_char_p, c_size_t, c_void_p) self.friend_name_cb = c_callback(callback) Tox.libtoxcore.tox_callback_friend_name(self._tox_pointer, self.friend_name_cb, user_data)
def friend_get_status_message_size(self, friend_number): """ Return the length of the friend's status message. If the friend number is invalid, the return value is SIZE_MAX. :return: length of the friend's status message """ tox_err_friend_query = c_int() result = Tox.libtoxcore.tox_friend_get_status_message_size(self._tox_pointer, c_uint32(friend_number), byref(tox_err_friend_query)) tox_err_friend_query = tox_err_friend_query.value if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['OK']: return result elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['NULL']: raise ArgumentError('The pointer parameter for storing the query result (name, message) was NULL. Unlike' ' the `_self_` variants of these functions, which have no effect when a parameter is' ' NULL, these functions return an error in that case.') elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['FRIEND_NOT_FOUND']: raise ArgumentError('The friend_number did not designate a valid friend.')
def friend_get_status(self, friend_number): """ Return the friend's user status (away/busy/...). If the friend number is invalid, the return value is unspecified. The status returned is equal to the last status received through the `friend_status` callback. :return: TOX_USER_STATUS """ tox_err_friend_query = c_int() result = Tox.libtoxcore.tox_friend_get_status(self._tox_pointer, c_uint32(friend_number), byref(tox_err_friend_query)) tox_err_friend_query = tox_err_friend_query.value if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['OK']: return result elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['NULL']: raise ArgumentError('The pointer parameter for storing the query result (name, message) was NULL. Unlike' ' the `_self_` variants of these functions, which have no effect when a parameter is' ' NULL, these functions return an error in that case.') elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['FRIEND_NOT_FOUND']: raise ArgumentError('The friend_number did not designate a valid friend.')
def friend_get_connection_status(self, friend_number): """ Check whether a friend is currently connected to this client. The result of this function is equal to the last value received by the `friend_connection_status` callback. :param friend_number: The friend number for which to query the connection status. :return: the friend's connection status (TOX_CONNECTION) as it was received through the `friend_connection_status` event. """ tox_err_friend_query = c_int() result = Tox.libtoxcore.tox_friend_get_connection_status(self._tox_pointer, c_uint32(friend_number), byref(tox_err_friend_query)) tox_err_friend_query = tox_err_friend_query.value if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['OK']: return result elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['NULL']: raise ArgumentError('The pointer parameter for storing the query result (name, message) was NULL. Unlike' ' the `_self_` variants of these functions, which have no effect when a parameter is' ' NULL, these functions return an error in that case.') elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['FRIEND_NOT_FOUND']: raise ArgumentError('The friend_number did not designate a valid friend.')
def callback_friend_connection_status(self, callback, user_data): """ Set the callback for the `friend_connection_status` event. Pass NULL to unset. This event is triggered when a friend goes offline after having been online, or when a friend goes online. This callback is not called when adding friends. It is assumed that when adding friends, their connection status is initially offline. :param callback: Python function. Should take pointer (c_void_p) to Tox object, The friend number (c_uint32) of the friend whose connection status changed, The result of calling tox_friend_get_connection_status (TOX_CONNECTION) on the passed friend_number, pointer (c_void_p) to user_data :param user_data: pointer (c_void_p) to user data """ c_callback = CFUNCTYPE(None, c_void_p, c_uint32, c_int, c_void_p) self.friend_connection_status_cb = c_callback(callback) Tox.libtoxcore.tox_callback_friend_connection_status(self._tox_pointer, self.friend_connection_status_cb, c_void_p(user_data))
def friend_get_typing(self, friend_number): """ Check whether a friend is currently typing a message. :param friend_number: The friend number for which to query the typing status. :return: true if the friend is typing. """ tox_err_friend_query = c_int() result = Tox.libtoxcore.tox_friend_get_typing(self._tox_pointer, c_uint32(friend_number), byref(tox_err_friend_query)) tox_err_friend_query = tox_err_friend_query.value if tox_err_friend_query == TOX_ERR_FRIEND_QUERY['OK']: return bool(result) elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['NULL']: raise ArgumentError('The pointer parameter for storing the query result (name, message) was NULL. Unlike' ' the `_self_` variants of these functions, which have no effect when a parameter is' ' NULL, these functions return an error in that case.') elif tox_err_friend_query == TOX_ERR_FRIEND_QUERY['FRIEND_NOT_FOUND']: raise ArgumentError('The friend_number did not designate a valid friend.')
def self_set_typing(self, friend_number, typing): """ Set the client's typing status for a friend. The client is responsible for turning it on or off. :param friend_number: The friend to which the client is typing a message. :param typing: The typing status. True means the client is typing. :return: True on success. """ tox_err_set_typing = c_int() result = Tox.libtoxcore.tox_self_set_typing(self._tox_pointer, c_uint32(friend_number), c_bool(typing), byref(tox_err_set_typing)) tox_err_set_typing = tox_err_set_typing.value if tox_err_set_typing == TOX_ERR_SET_TYPING['OK']: return bool(result) elif tox_err_set_typing == TOX_ERR_SET_TYPING['FRIEND_NOT_FOUND']: raise ArgumentError('The friend number did not designate a valid friend.')
def callback_friend_read_receipt(self, callback, user_data): """ Set the callback for the `friend_read_receipt` event. Pass None to unset. This event is triggered when the friend receives the message sent with tox_friend_send_message with the corresponding message ID. :param callback: Python function. Should take pointer (c_void_p) to Tox object, The friend number (c_uint32) of the friend who received the message, The message ID (c_uint32) as returned from tox_friend_send_message corresponding to the message sent, pointer (c_void_p) to user_data :param user_data: pointer (c_void_p) to user data """ c_callback = CFUNCTYPE(None, c_void_p, c_uint32, c_uint32, c_void_p) self.friend_read_receipt_cb = c_callback(callback) Tox.libtoxcore.tox_callback_friend_read_receipt(self._tox_pointer, self.friend_read_receipt_cb, c_void_p(user_data)) # ----------------------------------------------------------------------------------------------------------------- # Receiving private messages and friend requests # -----------------------------------------------------------------------------------------------------------------
def callback_friend_message(self, callback, user_data): """ Set the callback for the `friend_message` event. Pass None to unset. This event is triggered when a message from a friend is received. :param callback: Python function. Should take pointer (c_void_p) to Tox object, The friend number (c_uint32) of the friend who sent the message, Message type (TOX_MESSAGE_TYPE), The message data (c_char_p) they sent, The size (c_size_t) of the message byte array. pointer (c_void_p) to user_data :param user_data: pointer (c_void_p) to user data """ c_callback = CFUNCTYPE(None, c_void_p, c_uint32, c_int, c_char_p, c_size_t, c_void_p) self.friend_message_cb = c_callback(callback) Tox.libtoxcore.tox_callback_friend_message(self._tox_pointer, self.friend_message_cb, c_void_p(user_data)) # ----------------------------------------------------------------------------------------------------------------- # File transmission: common between sending and receiving # -----------------------------------------------------------------------------------------------------------------