我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ctypes.c_long()。
def parse_default(field, ftype, fdefault): if not (ftype == 'bool' and fdefault == 'true'): try: fdefault = literal_eval(fdefault.rstrip('LDF')) except (ValueError, SyntaxError): fdefault = None if type(fdefault) is int: if ftype[0] != 'u' and ftype[:5] != 'fixed': if fdefault >> 63: fdefault = c_long(fdefault).value elif fdefault >> 31 and ftype[-2:] != '64': fdefault = c_int(fdefault).value else: fdefault &= (1 << int(ftype[-2:])) - 1 if ftype == 'float' and abs(fdefault) >> 23: fdefault = unpack('=f', pack('=i', fdefault))[0] elif ftype == 'double' and abs(fdefault) >> 52: fdefault = unpack('=d', pack('=q', fdefault))[0] if fdefault: field.default_value = str(fdefault)
def init_dropout_descriptor(fn, handle): dropout_desc = cudnn.DropoutDescriptor() dropout_states_size = ctypes.c_long() check_error(cudnn.lib.cudnnDropoutGetStatesSize( handle, ctypes.byref(dropout_states_size))) dropout_states = torch.cuda.ByteTensor(dropout_states_size.value) dropout_desc.set( handle, fn.dropout, dropout_states, fn.seed ) return dropout_desc
def test_int_pointers(self): from ctypes import c_short, c_uint, c_int, c_long, POINTER, pointer LPINT = POINTER(c_int) ## p = pointer(c_int(42)) ## x = LPINT.from_param(p) x = LPINT.from_param(pointer(c_int(42))) self.assertEqual(x.contents.value, 42) self.assertEqual(LPINT(c_int(42)).contents.value, 42) self.assertEqual(LPINT.from_param(None), None) if c_int != c_long: self.assertRaises(TypeError, LPINT.from_param, pointer(c_long(42))) self.assertRaises(TypeError, LPINT.from_param, pointer(c_uint(42))) self.assertRaises(TypeError, LPINT.from_param, pointer(c_short(42)))
def pre_work( self, task, ): self.update_current_task( task=task, ) interval = self.worker_config['timeouts']['soft_timeout'] if interval == 0: interval = None self.current_timers[threading.get_ident()] = threading.Timer( interval=interval, function=ctypes.pythonapi.PyThreadState_SetAsyncExc, args=( ctypes.c_long(threading.get_ident()), ctypes.py_object(worker.WorkerSoftTimedout), ) ) self.current_timers[threading.get_ident()].start()
def test_array_pointers(self): from ctypes import c_short, c_uint, c_int, c_long, POINTER INTARRAY = c_int * 3 ia = INTARRAY() self.assertEqual(len(ia), 3) self.assertEqual([ia[i] for i in range(3)], [0, 0, 0]) # Pointers are only compatible with arrays containing items of # the same type! LPINT = POINTER(c_int) LPINT.from_param((c_int*3)()) self.assertRaises(TypeError, LPINT.from_param, c_short*3) self.assertRaises(TypeError, LPINT.from_param, c_long*3) self.assertRaises(TypeError, LPINT.from_param, c_uint*3) ## def test_performance(self): ## check_perf()
def raise_exc(self, excobj): """Raise exception processing. """ if not self.isAlive(): return tid = next((k for k, v in threading._active.items() if v is self), None) # pylint: disable=protected-access if tid is None: return with self._thr_lock: res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), ctypes.py_object(excobj)) if res == 0: raise ValueError("nonexistent thread id") elif res > 1: # if it returns a number greater than one, you're in trouble, # and you should call it again with exc=NULL to revert the effect ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), 0) raise SystemError("PyThreadState_SetAsyncExc failed") # the thread was alive when we entered the loop, but was not found # in the dict, hence it must have been already terminated. should we raise # an exception here? silently ignore?
def _set(self, dropout, seed): if self.state is None and dropout > 0: dropout_states_size = ctypes.c_long() check_error(lib.cudnnDropoutGetStatesSize( self.handle, ctypes.byref(dropout_states_size))) self.state = torch.cuda.ByteTensor(dropout_states_size.value) state_ptr = self.state.data_ptr() state_size = self.state.size(0) else: state_ptr = None state_size = 0 check_error(lib.cudnnSetDropoutDescriptor( self, self.handle, ctypes.c_float(dropout), ctypes.c_void_p(state_ptr), ctypes.c_size_t(state_size), ctypes.c_ulonglong(seed), )) self.dropout = dropout
def col_major(self, new_col=None): """ Finds the majority of this CDF file Other Parameters ================ new_col : boolean Specify True to change to column-major, False to change to row major, or do not specify to check the majority rather than changing it. (default is check only) Returns ======= out : boolean True if column-major, false if row-major """ if new_col != None: new_maj = const.COLUMN_MAJOR if new_col else const.ROW_MAJOR self._call(const.PUT_, const.CDF_MAJORITY_, new_maj) maj = ctypes.c_long(0) self._call(const.GET_, const.CDF_MAJORITY_, ctypes.byref(maj)) if not maj.value in (const.ROW_MAJOR.value, const.COLUMN_MAJOR.value): raise CDFError(const.BAD_MAJORITY) return maj.value == const.COLUMN_MAJOR.value
def rv(self, new_rv=None): """ Gets or sets whether this variable has record variance If the variance is unknown, True is assumed (this replicates the apparent behavior of the CDF library on variable creation). Other Parameters ================ new_rv : boolean True to change to record variance, False to change to NRV, unspecified to simply check variance. Returns ======= out : Boolean True if record varying, False if NRV """ if new_rv != None: self._call(const.PUT_, const.zVAR_RECVARY_, const.VARY if new_rv else const.NOVARY) vary = ctypes.c_long(0) self._call(const.GET_, const.zVAR_RECVARY_, ctypes.byref(vary)) return vary.value != const.NOVARY.value
def _call(self, *args, **kwargs): """Select this CDF and variable and call the CDF internal interface Adds call to select this CDF to L{args} and passes all parameters directly through to the CDFlib routine of the CDF library's C internal interface. Checks the return value with L{Library.check_status}. @param args: Passed directly to the CDF library interface. Useful constants are defined in the :py:mod:`pycdf.const` module of this package. @type args: various, see :py:mod:`ctypes`. @return: CDF status from the library @rtype: ctypes.c_long @note: Terminal NULL_ is automatically added to L{args}. @raise CDFError: if CDF library reports an error @raise CDFWarning: if CDF library reports a warning and interpreter is set to error on warnings. """ return self.cdf_file._call(const.SELECT_, const.zVAR_NAME_, self._name, *args, **kwargs)
def select(self): """Selects this hyperslice in the CDF Calls the CDF library to select the CDF, variable, records, and array elements corresponding to this slice. """ args = (const.SELECT_, const.zVAR_RECNUMBER_, ctypes.c_long(self.starts[0]), const.SELECT_, const.zVAR_RECCOUNT_, ctypes.c_long(self.counts[0]), const.SELECT_, const.zVAR_RECINTERVAL_, ctypes.c_long(self.intervals[0])) if self.dims > 1: dims = self.dims - 1 args += (const.SELECT_, const.zVAR_DIMINDICES_, (ctypes.c_long * dims)(*self.starts[1:]), const.SELECT_, const.zVAR_DIMCOUNTS_, (ctypes.c_long * dims)(*self.counts[1:]), const.SELECT_, const.zVAR_DIMINTERVALS_, (ctypes.c_long * dims)(*self.intervals[1:])) self.zvar._call(*args)
def __delitem__(self, key): """Delete a slice of Entries. @param key: index or range of Entry numbers to delete @type key: slice or int @note: Attributes do not 'grow' or 'shrink' as entries are added or removed. Indexes of entries never change and there is no way to 'insert'. """ if key is Ellipsis: key = slice(None, None, None) if not hasattr(key, 'indices'): idx = (key, key + 1, 1) else: idx = key.indices(self.max_idx() + 1) for i in range(*idx): self._call(const.SELECT_, self.ENTRY_, ctypes.c_long(i), const.DELETE_, self.ENTRY_)
def has_entry(self, number): """Check if this attribute has a particular Entry number Parameters ========== number : int number of Entry to check or change Returns ======= out : bool True if ``number`` is a valid entry number; False if not """ status = self._call(const.CONFIRM_, self.ENTRY_EXISTENCE_, ctypes.c_long(number), ignore=(const.NO_SUCH_ENTRY, )) return not status == const.NO_SUCH_ENTRY
def global_scope(self): """Determine scope of this attribute. Returns ======= out : bool True if global (i.e. gAttr), False if zAttr """ scope = ctypes.c_long(0) self._call(const.GET_, const.ATTR_SCOPE_, ctypes.byref(scope)) if scope.value == const.GLOBAL_SCOPE.value: return True elif scope.value == const.VARIABLE_SCOPE.value: return False else: raise CDFError(const.BAD_SCOPE)
def __iter__(self, current=0): """Iterates over all Attr in this CDF or variable Returns name of one L{Attr} at a time until reaches the end. @note: Returned in number order. """ count = ctypes.c_long(0) self._cdf_file._call(const.GET_, const.CDF_NUMATTRS_, ctypes.byref(count)) while current < count.value: candidate = self.AttrType(self._cdf_file, current) if candidate.global_scope() == self.global_scope: if self.special_entry is None or \ candidate.has_entry(self.special_entry()): if str == bytes: value = yield(candidate._name) else: value = yield(candidate._name.decode()) if value != None: current = self[value].number() current += 1
def __len__(self): """Number of zAttributes in this variable @return: number of zAttributes in the CDF which have entries for this variable. @rtype: int """ length = 0 count = ctypes.c_long(0) self._cdf_file._call(const.GET_, const.CDF_NUMATTRS_, ctypes.byref(count)) current = 0 while current < count.value: candidate = zAttr(self._cdf_file, current) if not candidate.global_scope(): if candidate.has_entry(self._zvar._num()): length += 1 current += 1 return length
def type(self, name, new_type=None): """Find or change the CDF type of a zEntry in this zVar @param name: name of the zAttr to check or change @type name: str @param new_type: type to change it to, see :py:mod:`pycdf.const` @type new_type: ctypes.c_long @return: CDF variable type, see :py:mod:`pycdf.const` @rtype: int @note: If changing types, old and new must be equivalent, see CDF User's Guide section 2.5.5 pg. 57 """ attrib = super(zAttrList, self).__getitem__(name) zvar_num = self._zvar._num() if not attrib.has_entry(zvar_num): raise KeyError(name + ': no such attribute for variable ' + self._zvar.name()) return attrib.type(zvar_num, new_type)
def terminate_thread(thread): """Terminates a python thread from another thread. :param thread: a threading.Thread instance """ if not thread.isAlive(): return exc = ctypes.py_object(SystemExit) res = ctypes.pythonapi.PyThreadState_SetAsyncExc( ctypes.c_long(thread.ident), exc) if res == 0: raise ValueError("nonexistent thread id") elif res > 1: # """if it returns a number greater than one, you're in trouble, # and you should call it again with exc=NULL to revert the effect""" ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None) raise SystemError("PyThreadState_SetAsyncExc failed")
def getsig(filename,clib): infile = gzip.open(filename,'rb') try: lines = infile.readlines() except IOError: infile.close() infile = open(filename,'rt') lines = infile.readlines() infile.close() T = len(lines) n = len(bytes.decode(lines[0]).split()[3:]) chrom = (ctypes.c_long * T)() pos = (ctypes.c_long * T)() obs = np.zeros((T,n),dtype=ctypes.c_int8) lines_p = (ctypes.c_char_p * T)() lines_p[:] = lines clib.getsig.restype = None; clib.getsig(chrom,pos,obs.ctypes.data_as(ctypes.c_void_p), ctypes.c_long(T),ctypes.c_long(n),lines_p) return n/2, chrom, pos, obs # retrieve the genotype likelihoods
def terminate_thread(thread): """Terminate a python thread from another thread. :param thread: a threading.Thread instance """ if not thread.isAlive(): return exc = ctypes.py_object(SystemExit) res = ctypes.pythonapi.PyThreadState_SetAsyncExc( ctypes.c_long(thread.ident), exc) if res == 0: raise ValueError("nonexistent thread id") elif res > 1: # If it returns a number greater than one, you're in trouble, # and you should call it again with exc=NULL to revert the effect ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None) raise SystemError("PyThreadState_SetAsyncExc failed")
def DevControl(self, controlType,value): ''' This function is used to change the display properties of the ALP. The default values are assigned during device allocation by AllocateSequence. Usage: Control(self, controlType, value) PARAMETERS ---------- controlType: ctypes c_ulong Specifies the type of value to set. SEE ALSO -------- See AlpDevControl in the ALP API description for control types. ''' self._checkError(self._ALPLib.AlpDevControl(self.ALP_ID, controlType, ct.c_long(value)),'Error sending request.')
def ProjControl(self, controlType, value): ''' This function controls the system parameters that are in effect for all sequences. These parameters are maintained until they are modified again or until the ALP is freed. Default values are in effect after ALP allocation. All parameters can be read out using the AlpProjInquire function. This function is only allowed if the ALP is in idle wait state (ALP_PROJ_IDLE), which can be enforced by the AlpProjHalt function. Usage: Control(self, controlType, value) PARAMETERS ---------- controlType : attribute flag (ctypes c_ulong) Specify the paramter to set. value : c_double Value of the parameter to set. SEE ALSO -------- See AlpProjControl in the ALP API description for control types. ''' self._checkError(self._ALPLib.AlpProjControl(self.ALP_ID, controlType, ct.c_long(value)),'Error sending request.')
def __init__(self, path_dll, serialnumber, hwtype, label='', unit='m'): """ @param str path_dll: the absolute path to the dll of the current operating system @param int serialnumber: serial number of the stage @param str hwtype: name for the type of the hardware device you want to control. The name must be available in hwtype_dict! @param str label: a label which identifies the axis and gives it a meaning. @param str unit: the unit of this axis, possible entries are m, ° or degree """ self.aptdll = windll.LoadLibrary(path_dll) self.aptdll.EnableEventDlg(True) self.aptdll.APTInit() self._HWType = c_long(self.hwtype_dict[hwtype]) self.Connected = False self.verbose = False self.label = label self.setSerialNumber(serialnumber) self._wait_until_done = True self._unit = unit # all apt stages are wither in mm or in degree and # since mm is not an SI unit it has to be converted # here in this hardware file from m to mm.
def setHardwareLimitSwitches(self, switch_reverse, switch_forward): """ Set the Switch Configuration of the axis. @param int switch_reverse: sets the switch in reverse movement @param int switch_forward: sets the switch in forward movement The following values are allowed: 0x01 or 1: Ignore switch or switch not present. 0x02 or 2: Switch makes on contact. 0x03 or 3: Switch breaks on contact. 0x04 or 4: Switch makes on contact - only used for homes (e.g. limit switched rotation stages). 0x05 or 5: Switch breaks on contact - only used for homes (e.g. limit switched rotations stages). 0x06 or 6: For PMD based brushless servo controllers only - uses index mark for homing. """ reverseLimitSwitch = c_long(switch_reverse) forwardLimitSwitch = c_long(switch_forward) self.aptdll.MOT_SetHWLimSwitches(self.SerialNum, reverseLimitSwitch, forwardLimitSwitch) hardwareLimitSwitches = [reverseLimitSwitch.value, forwardLimitSwitch.value] return hardwareLimitSwitches
def set_home_parameter(self, home_dir, switch_dir, home_vel, zero_offset): """ Set the home parameters. @param int home_dir: direction to the home position, 1 = Move forward 2 = Move backward @param int switch_dir: Direction of the switch limit: 4 = Use forward limit switch for home datum 1 = Use forward limit switch for home datum. @param float home_vel = default velocity @param float zero_offset: the distance or offset (in mm or degrees) of the limit switch from the Home position. """ home_dir_c = c_long(home_dir) switch_dir_c = c_long(switch_dir) home_vel_c = c_float(home_vel) zero_offset_c = c_float(zero_offset) self.aptdll.MOT_SetHomeParams(self.SerialNum, home_dir_c, switch_dir_c, home_vel_c, zero_offset_c) return True
def get_status(self): """ Get the status bits of the current axis. @return tuple(int, dict): the current status as an integer and the dictionary explaining the current status. """ status_bits = c_long() self.aptdll.MOT_GetStatusBits(self.SerialNum, pointer(status_bits)) # Check at least whether magnet is moving: if self._test_bit(status_bits.value, 4): return 1, self._create_status_dict() elif self._test_bit(status_bits.value, 5): return 2, self._create_status_dict() else: return 0, self._create_status_dict()
def _coord_byval(coord): """ Turns a COORD object into a c_long. This will cause it to be passed by value instead of by reference. (That is what I think at least.) When runing ``ptipython`` is run (only with IPython), we often got the following error:: Error in 'SetConsoleCursorPosition'. ArgumentError("argument 2: <class 'TypeError'>: wrong type",) argument 2: <class 'TypeError'>: wrong type It was solved by turning ``COORD`` parameters into a ``c_long`` like this. More info: http://msdn.microsoft.com/en-us/library/windows/desktop/ms686025(v=vs.85).aspx """ return c_long(coord.Y * 0x10000 | coord.X & 0xFFFF) #: If True: write the output of the renderer also to the following file. This #: is very useful for debugging. (e.g.: to see that we don't write more bytes #: than required.)
def CellQuality(grid): """ Given a vtk unstructured grid, returns the minimum scaled jacobian cell quality for each cell. Unstructured grid can only contain the following elements tetrahedral pyramid wedge hexahedral Elements may be linear or quadradic """ # Get cells and points from grid cells = grid.GetNumpyCells(ctypes.c_long) points = grid.GetNumpyPoints(ctypes.c_double) return _cellqual.CompScJac_quad(cells, points)
def CoCreateInstanceC2R (self, store, reg, clsid, iid) : # Ugly code to find DLL in C2R version of COM object and get a COM # object despite the fact that COM doesn't handle C2R try: # Get DLL to load from 2R register aReg = winreg.ConnectRegistry(None, store) aKey = winreg.OpenKey(aReg, reg, 0, winreg.KEY_READ | winreg.KEY_WOW64_64KEY) dummy_n, IconvDLL, dummy_t = winreg.EnumValue(aKey, 0) winreg.CloseKey(aKey) winreg.CloseKey(aReg) # Create OLE object from DLL IconvOLE = ctypes.OleDLL(IconvDLL) # Get COM Instance from OLE clsid_class = uuid.UUID(str(clsid)).bytes_le iclassfactory = uuid.UUID(str(pythoncom.IID_IClassFactory)).bytes_le com_classfactory = ctypes.c_long(0) IconvOLE.DllGetClassObject(clsid_class, iclassfactory, ctypes.byref(com_classfactory)) MyFactory = pythoncom.ObjectFromAddress(com_classfactory.value, pythoncom.IID_IClassFactory) return MyFactory.CreateInstance (None, str(iid)) except: return None
def terminate_thread(thread): """Terminate a python thread from another thread. :param thread: a threading.Thread instance """ if not thread.isAlive(): return # print('valet watcher thread: notifier thread is alive... - kill it...') exc = ctypes.py_object(SystemExit) res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(thread.ident), exc) if res == 0: raise ValueError("nonexistent thread id") elif res > 1: # If it returns a number greater than one, you're in trouble, # and you should call it again with exc=NULL to revert the effect ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None) raise SystemError("PyThreadState_SetAsyncExc failed") # print('valet watcher thread exits')
def _async_raise(tid, exctype): '''Raises an exception in the threads with id tid''' if not inspect.isclass(exctype): raise TypeError("Only types can be raised (not instances)") try: res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), ctypes.py_object(exctype)) except AttributeError: # To catch: undefined symbol: PyThreadState_SetAsyncExc return if res == 0: raise ValueError("invalid thread id") elif res != 1: # "if it returns a number greater than one, you're in trouble, # and you should call it again with exc=NULL to revert the effect" ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), 0) raise SystemError("PyThreadState_SetAsyncExc failed")
def ENrunH(self): """ solves hydraulics for conditions at time t. Returns: long * current simulation time (seconds) This function is used in a loop with ENnextH() to run an extended period hydraulic simulation. See ENsolveH() for an example. """ lT = ctypes.c_long() self.errcode = self.ENlib.ENrunH(byref(lT)) self._error() self.cur_time = lT.value return lT.value
def ENnextH(self): """ determines time until next hydraulic event. Returns: * time (seconds) until next hydraulic event (0 marks end of simulation period) This function is used in a loop with ENrunH() to run an extended period hydraulic simulation. See ENsolveH() for an example. """ lTstep = ctypes.c_long() self.errcode = self.ENlib.ENnextH(byref(lTstep)) self._error() return lTstep.value
def ENrunQ(self): """ retrieves hydraulic & WQ results at time t. Returns: long * current simulation time (seconds) This function is used in a loop with ENnextQ() to run an extended period WQ simulation. See ENsolveQ() for an example. """ lT = ctypes.c_long() self.errcode = self.ENlib.ENrunQ(byref(lT)) self._error() return lT.value
def ENnextQ(self): """ advances WQ simulation to next hydraulic event. Returns: long * time (seconds) until next hydraulic event (0 marks end of simulation period) This function is used in a loop with ENrunQ() to run an extended period WQ simulation. See ENsolveQ() for an example. """ lTstep = ctypes.c_long() self.errcode = self.ENlib.ENnextQ(byref(lTstep)) self._error() return lTstep.value
def list_alttab_windows(cls): """ Return the list of the windows handles that are currently guessed to be eligible to the Alt+Tab panel. Raises a OSError exception on error. """ # LPARAM is defined as LONG_PTR (signed type) if ctypes.sizeof(ctypes.c_long) == ctypes.sizeof(ctypes.c_void_p): LPARAM = ctypes.c_long elif ctypes.sizeof(ctypes.c_longlong) == ctypes.sizeof(ctypes.c_void_p): LPARAM = ctypes.c_longlong EnumWindowsProc = ctypes.WINFUNCTYPE( ctypes.c_bool, ctypes.c_void_p, LPARAM) def _enum_proc(hwnd, lparam): try: if cls.is_alttab_window(hwnd): handles.append(hwnd) except OSError: pass return True handles = [] ctypes.windll.user32.EnumWindows(EnumWindowsProc(_enum_proc), 0) return handles
def ptrace(command, pid=0, arg1=0, arg2=0, check_errno=False): if HAS_CPTRACE: try: set_errno(0) result = _ptrace(command, pid, arg1, arg2, check_errno) except ValueError as errobj: message = str(errobj) errno = get_errno() raise PtraceError(message, errno=errno, pid=pid) else: result = _ptrace(command, pid, arg1, arg2) result_signed = c_long(result).value if result_signed == -1: errno = get_errno() # peek operations may returns -1 with errno=0: # it's not an error. For other operations, -1 # is always an error if not(check_errno) or errno: message = "ptrace(cmd=%s, pid=%s, %r, %r) error #%s: %s" % ( command, pid, arg1, arg2, errno, strerror(errno)) raise PtraceError(message, errno=errno, pid=pid) return result