我们从Python开源项目中,提取了以下28个代码示例,用于说明如何使用ctypes.c_ssize_t()。
def _to_ssize_tuple(self, addr):
    """Read ``self._view.ndim`` ``Py_ssize_t`` values starting at *addr*.

    @param addr: anything ``ctypes.cast`` accepts as a pointer source
        (integer address, ctypes array, ctypes pointer), or None.
    @return: a tuple of Python ints, or None when *addr* is None.
    """
    from ctypes import POINTER, c_ssize_t, cast

    if addr is None:
        return None

    # Reinterpret the address as a Py_ssize_t array and copy out one entry
    # per dimension of the wrapped view.
    ssize_ptr = cast(addr, POINTER(c_ssize_t))
    dim_count = self._view.ndim
    return tuple(ssize_ptr[0:dim_count])
def _NumOfRows(self):
    """Query the row count of the current statement and cache it.

    Calls ``SQLRowCount`` on ``self.stmt_h``, stores the reported value in
    ``self.rowcount`` and returns it.  Any return code other than
    ``SQL_SUCCESS`` is handed to ``check_success``.
    """
    if not self.connection:
        self.close()

    row_counter = c_ssize_t()
    status = SQLRowCount(self.stmt_h, ADDR(row_counter))
    if status != SQL_SUCCESS:
        check_success(self, status)

    self.rowcount = row_counter.value
    return self.rowcount
def repr_offset():
    """Return how many bytes after id(type) the tp_repr pointer is found.

    The offset is expressed as eleven ``Py_ssize_t``-sized slots.
    """
    slot_size = ctypes.sizeof(ctypes.c_ssize_t)
    slots_before_repr = 11
    return slots_before_repr * slot_size
def str_offset():
    """Return how many bytes after id(type) the tp_str pointer is found."""
    base = repr_offset()
    slot_size = ctypes.sizeof(ctypes.c_ssize_t)
    # tp_str sits six slots after tp_repr in the type specification.
    slots_after_repr = 6
    return base + slots_after_repr * slot_size
def assure_generic_c_level_function(tt, offset):
    """
    Point one C-level slot of type ``tt`` at the implementation ``Object`` uses.

    Reads the ``Py_ssize_t``-sized value stored ``offset`` bytes past
    ``id(Object)`` and writes it to the same offset past ``id(tt)``, so that
    magic methods such as repr() or str() go through the generic C function.

    NOTE: this writes directly into the memory of ``tt`` and relies on
    ``id()`` being the object's address (as it is on CPython).
    """
    slot_at = ctypes.c_ssize_t.from_address
    generic_slot = slot_at(id(Object) + offset)
    target_slot = slot_at(id(tt) + offset)
    target_slot.value = generic_slot.value
def makeDataObject(num):
    """
    Create the ctypes data object used by both processes.

    The returned ``ctypes.Structure`` subclass defines the layout of the
    shared memory:

    * ``y``     -- ``num * 3`` floats
    * ``y10``   -- ``num // 10 * 3`` floats
    * ``y100``  -- ``num // 100 * 3`` floats
    * ``endpoint``, ``startpoint``, ``pltcurrent`` -- ``Py_ssize_t`` values

    @param num: number of samples the block is sized for.
    @return: the ``DataBlock`` structure class (not an instance), so callers
        can instantiate it or map it onto an existing buffer.
    """
    class DataBlock(ctypes.Structure):
        _fields_ = [
            ("y", ctypes.c_float * (num * 3)),
            ("y10", ctypes.c_float * (num // 10 * 3)),
            ("y100", ctypes.c_float * (num // 100 * 3)),
            ("endpoint", ctypes.c_ssize_t),
            ("startpoint", ctypes.c_ssize_t),
            ("pltcurrent", ctypes.c_ssize_t),
        ]

    # Previously the freshly built class was discarded and ``num`` was handed
    # back unchanged, which made the function a no-op for its callers.
    return DataBlock
def libvlc_media_discoverer_list_get(p_inst, i_cat, ppp_services):
    '''Fetch the media discoverer services that belong to one category.
    @param p_inst: libvlc instance.
    @param i_cat: category of services to fetch.
    @param ppp_services: address to store an allocated array of media discoverer services (the caller must free it with L{libvlc_media_discoverer_list_release}()) [OUT].
    @return: the number of media discoverer services or -1 on error.
    @version: LibVLC 3.0.0 and later.
    '''
    # Reuse the cached foreign-function wrapper when one is registered;
    # otherwise build it from the prototype description.
    native_call = _Cfunctions.get('libvlc_media_discoverer_list_get', None)
    if not native_call:
        native_call = _Cfunction(
            'libvlc_media_discoverer_list_get',
            ((1,), (1,), (1,),),
            None,
            ctypes.c_ssize_t,
            Instance,
            MediaDiscovererCategory,
            ctypes.POINTER(ctypes.POINTER(MediaDiscovererDescription)),
        )
    return native_call(p_inst, i_cat, ppp_services)
def __init__(self, shape, format=None, strides=None, readonly=None, itemsize=None):
    """Set up an in-memory buffer described by shape, format and strides.

    @param shape: sequence giving the extent of each dimension.
    @param format: item format string; defaults to 'B'.  An optional leading
        character found in ``self.prefixes``, an optional '1', and a single
        type code are recognised when deriving the item size.
    @param strides: per-dimension byte strides.  When None, they are
        derived from *shape* and the item size with the last dimension
        varying fastest.
    @param readonly: stored as ``bool(readonly)``; defaults to False.
    @param itemsize: item size in bytes; when None it is looked up from
        ``self.types`` using the parsed format.
    @raise ValueError: if the format is not in ``self.types`` (and no
        *itemsize* was given) or if *strides* and *shape* differ in length.
    """
    if format is None:
        format = 'B'
    if readonly is None:
        readonly = False

    # --- split the format into an optional prefix and a type code ---
    prefix = ''
    typecode = ''
    pos = 0
    if pos < len(format):
        try:
            prefix = self.prefixes[format[pos]]
        except LookupError:
            # First character is not a recognised prefix; leave it in place.
            pass
        else:
            pos += 1
    if pos < len(format) and format[pos] == '1':
        pos += 1  # an explicit repeat count of one is skipped
    if pos == len(format) - 1:
        typecode = format[pos]

    if itemsize is None:
        try:
            itemsize = ctypes.sizeof(self.types[prefix + typecode])
        except KeyError:
            raise ValueError("Unknown item format '" + format + "'")

    # --- public attributes and their ctypes mirrors ---
    self.readonly = bool(readonly)
    self.format = format
    self._format = ctypes.create_string_buffer(format.encode('latin_1'))
    self.ndim = len(shape)
    self.itemsize = itemsize
    self.len = reduce(operator.mul, shape, 1) * self.itemsize
    self.shape = tuple(shape)
    ssize_array = ctypes.c_ssize_t * self.ndim
    self._shape = ssize_array(*self.shape)

    if strides is None:
        # Fill in strides from the last axis backwards: each axis steps over
        # one full extent of the axis that follows it.
        self._strides = ssize_array()
        self._strides[self.ndim - 1] = itemsize
        for axis in range(self.ndim - 1, 0, -1):
            self._strides[axis - 1] = self.shape[axis] * self._strides[axis]
        self.strides = tuple(self._strides)
    elif len(strides) == self.ndim:
        self.strides = tuple(strides)
        self._strides = ssize_array(*self.strides)
    else:
        raise ValueError("Mismatch in length of strides and shape")

    # --- backing storage ---
    extents = zip(self.shape, self.strides)
    self.buflen = max(dim * abs(step) for dim, step in extents)
    self._buf = (ctypes.c_ubyte * self.buflen)()
    # For negative strides the first element lives at the far end of that
    # axis, so the exported start address is shifted forward accordingly.
    offset = sum(
        (dim - 1) * abs(step)
        for dim, step in zip(self.shape, self.strides)
        if step < 0
    )
    self.buf = ctypes.addressof(self._buf) + offset
def _CreateColBuf(self):
    """Allocate an output buffer for every result column and bind the leading ones.

    Rebuilds ``self._ColBufferList`` with one entry per column::

        [col_name, target_type, used_buf_len, ADDR(used_buf_len),
         alloc_buffer, ADDR(alloc_buffer), total_buf_len, buf_cvt_func,
         bind_data]

    Columns are bound with ``SQLBindCol`` until the first dynamic-length
    column is met; that column and every later one are left unbound
    (``bind_data`` False in their entries).
    """
    if not self.connection:
        self.close()
    self._free_stmt(SQL_UNBIND)
    column_count = self._NumOfCols()
    self._ColBufferList = []
    bind_data = True
    for col_num in range(column_count):
        col_desc = self.description[col_num]
        col_name = col_desc[0]
        col_size = col_desc[2]
        col_sql_data_type = self._ColTypeCodeList[col_num]
        type_info = SQL_data_type_dict[col_sql_data_type]

        target_type = type_info[2]
        dynamic_length = type_info[5]
        # Start from the default buffer size for this SQL data type.
        total_buf_len = type_info[4]

        # A size registered under the key None overrides the default for
        # "large columns".
        if total_buf_len > 20500:
            total_buf_len = self._outputsize.get(None, total_buf_len)
        # A size registered for this specific column index wins over both.
        total_buf_len = self._outputsize.get(col_num, total_buf_len)

        # Very wide columns are not bound: a large bound buffer hurts
        # performance and sometimes only yields NULL, so such columns are
        # read with SQLGetData instead.
        if col_size >= 1024:
            dynamic_length = True

        alloc_buffer = type_info[3](total_buf_len)
        used_buf_len = c_ssize_t()

        force_unicode = self.connection.unicode_results
        is_char_type = col_sql_data_type in (SQL_CHAR, SQL_VARCHAR, SQL_LONGVARCHAR)
        if force_unicode and is_char_type:
            # Fetch character data as wide characters when unicode results
            # are requested.
            target_type = SQL_C_WCHAR
            alloc_buffer = create_buffer_u(total_buf_len)

        buf_cvt_func = self.connection.output_converter[col_sql_data_type]

        # Once a dynamic-length column is seen, binding stops for good.
        if bind_data and dynamic_length:
            bind_data = False

        self._ColBufferList.append([
            col_name,
            target_type,
            used_buf_len,
            ADDR(used_buf_len),
            alloc_buffer,
            ADDR(alloc_buffer),
            total_buf_len,
            buf_cvt_func,
            bind_data,
        ])

        if bind_data:
            ret = ODBC_API.SQLBindCol(
                self.stmt_h, col_num + 1, target_type,
                ADDR(alloc_buffer), total_buf_len, ADDR(used_buf_len))
            if ret != SQL_SUCCESS:
                check_success(self, ret)
def _UpdateDesc(self):
    """Refresh ``self.description`` from the driver's column metadata.

    Each column yields a tuple
    (name, type_code, display_size, internal_size, col_precision, scale,
    null_ok).  ``self._ColTypeCodeList`` receives the raw type code of every
    column.  When at least one column exists, ``self._row_type`` is rebuilt
    through ``self.row_type_callable``; otherwise ``self.description`` is
    set to None.  Column buffers are always recreated at the end.
    """
    if not self.connection:
        self.close()

    force_unicode = self.connection.unicode_results
    name_buf = create_buffer_u(1024) if force_unicode else create_buffer(1024)

    # Output slots reused for every column.
    name_len = c_short()
    type_code = c_short()
    col_size = ctypes.c_size_t()
    disp_size = c_ssize_t(0)
    decimal_digits = c_short()
    nullable = c_short()

    col_descr = []
    self._ColTypeCodeList = []
    column_count = self._NumOfCols()
    for col in range(1, column_count + 1):
        ret = ODBC_API.SQLColAttribute(
            self.stmt_h, col, SQL_DESC_DISPLAY_SIZE,
            ADDR(create_buffer(10)), 10, ADDR(c_short()), ADDR(disp_size))
        if ret != SQL_SUCCESS:
            check_success(self, ret)

        # Use the wide-character entry point when unicode results are on.
        if force_unicode:
            describe = ODBC_API.SQLDescribeColW
        else:
            describe = ODBC_API.SQLDescribeCol
        ret = describe(
            self.stmt_h, col, name_buf, len(name_buf), ADDR(name_len),
            ADDR(type_code), ADDR(col_size), ADDR(decimal_digits),
            ADDR(nullable))
        if ret != SQL_SUCCESS:
            check_success(self, ret)

        col_name = name_buf.value
        if lowercase:
            col_name = col_name.lower()

        raw_code = type_code.value
        # Unknown type codes fall back to the raw numeric code itself.
        described_type = SQL_data_type_dict.get(raw_code, (raw_code,))[0]
        col_descr.append((
            col_name,
            described_type,
            disp_size.value,
            col_size.value,
            col_size.value,
            decimal_digits.value,
            nullable.value == 1,
        ))
        self._ColTypeCodeList.append(type_code.value)

    if col_descr:
        self.description = col_descr
        # Create the row type before fetching.
        self._row_type = self.row_type_callable(self)
    else:
        self.description = None
    self._CreateColBuf()