我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ctypes.c_ulonglong()。
def curr_write_pos(self): """ long: Indicates the position in the buffer of the next sample to generate. This value is identical for all channels in the task. """ val = ctypes.c_ulonglong() cfunc = lib_importer.windll.DAQmxGetWriteCurrWritePos if cfunc.argtypes is None: with cfunc.arglock: if cfunc.argtypes is None: cfunc.argtypes = [ lib_importer.task_handle, ctypes.POINTER(ctypes.c_ulonglong)] error_code = cfunc( self._handle, ctypes.byref(val)) check_for_error(error_code) return val.value
def total_samp_per_chan_generated(self): """ long: Indicates the total number of samples generated by each channel in the task. This value is identical for all channels in the task. """ val = ctypes.c_ulonglong() cfunc = lib_importer.windll.DAQmxGetWriteTotalSampPerChanGenerated if cfunc.argtypes is None: with cfunc.arglock: if cfunc.argtypes is None: cfunc.argtypes = [ lib_importer.task_handle, ctypes.POINTER(ctypes.c_ulonglong)] error_code = cfunc( self._handle, ctypes.byref(val)) check_for_error(error_code) return val.value
def curr_read_pos(self): """ long: Indicates in samples per channel the current position in the buffer. """ val = ctypes.c_ulonglong() cfunc = lib_importer.windll.DAQmxGetReadCurrReadPos if cfunc.argtypes is None: with cfunc.arglock: if cfunc.argtypes is None: cfunc.argtypes = [ lib_importer.task_handle, ctypes.POINTER(ctypes.c_ulonglong)] error_code = cfunc( self._handle, ctypes.byref(val)) check_for_error(error_code) return val.value
def logging_file_preallocation_size(self): """ long: Specifies a size in samples to be used to pre-allocate space on disk. Pre-allocation can improve file I/O performance, especially in situations where multiple files are being written to disk. For finite tasks, the default behavior is to pre-allocate the file based on the number of samples you configure the task to acquire. """ val = ctypes.c_ulonglong() cfunc = lib_importer.windll.DAQmxGetLoggingFilePreallocationSize if cfunc.argtypes is None: with cfunc.arglock: if cfunc.argtypes is None: cfunc.argtypes = [ lib_importer.task_handle, ctypes.POINTER(ctypes.c_ulonglong)] error_code = cfunc( self._handle, ctypes.byref(val)) check_for_error(error_code) return val.value
def total_samp_per_chan_acquired(self): """ long: Indicates the total number of samples acquired by each channel. NI-DAQmx returns a single value because this value is the same for all channels. For retriggered acquisitions, this value is the cumulative number of samples across all retriggered acquisitions. """ val = ctypes.c_ulonglong() cfunc = lib_importer.windll.DAQmxGetReadTotalSampPerChanAcquired if cfunc.argtypes is None: with cfunc.arglock: if cfunc.argtypes is None: cfunc.argtypes = [ lib_importer.task_handle, ctypes.POINTER(ctypes.c_ulonglong)] error_code = cfunc( self._handle, ctypes.byref(val)) check_for_error(error_code) return val.value
def samp_quant_samp_per_chan(self): """ long: Specifies the number of samples to acquire or generate for each channel if **samp_quant_samp_mode** is **AcquisitionType.FINITE**. If **samp_quant_samp_mode** is **AcquisitionType.CONTINUOUS**, NI-DAQmx uses this value to determine the buffer size. """ val = ctypes.c_ulonglong() cfunc = lib_importer.windll.DAQmxGetSampQuantSampPerChan if cfunc.argtypes is None: with cfunc.arglock: if cfunc.argtypes is None: cfunc.argtypes = [ lib_importer.task_handle, ctypes.POINTER(ctypes.c_ulonglong)] error_code = cfunc( self._handle, ctypes.byref(val)) check_for_error(error_code) return val.value
def getFreeSpace(self): free_space = 0 if "statvfs" in dir(os): # Unix statvfs = os.statvfs(config.data_dir) free_space = statvfs.f_frsize * statvfs.f_bavail else: # Windows try: import ctypes free_space_pointer = ctypes.c_ulonglong(0) ctypes.windll.kernel32.GetDiskFreeSpaceExW( ctypes.c_wchar_p(config.data_dir), None, None, ctypes.pointer(free_space_pointer) ) free_space = free_space_pointer.value except Exception, err: self.log.debug("GetFreeSpace error: %s" % err) return free_space
def make_array(shape=(1,), dtype=np.float32, shared=False, fill_val=None): np_type_to_ctype = {np.float32: ctypes.c_float, np.float64: ctypes.c_double, np.bool: ctypes.c_bool, np.uint8: ctypes.c_ubyte, np.uint64: ctypes.c_ulonglong} if not shared: np_arr = np.empty(shape, dtype=dtype) else: numel = np.prod(shape) arr_ctypes = sharedctypes.RawArray(np_type_to_ctype[dtype], numel) np_arr = np.frombuffer(arr_ctypes, dtype=dtype, count=numel) np_arr.shape = shape if not fill_val is None: np_arr[...] = fill_val return np_arr
def get_free_space(path=settings.KOLIBRI_HOME): if sys.platform.startswith('win'): import ctypes free = ctypes.c_ulonglong(0) check = ctypes.windll.kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(path), None, None, ctypes.pointer(free)) if check == 0: raise ctypes.winError() result = free.value else: st = os.statvfs(path) result = st.f_bavail * st.f_frsize return result # Utility functions for pinging or killing PIDs
def _set(self, dropout, seed): if self.state is None and dropout > 0: dropout_states_size = ctypes.c_long() check_error(lib.cudnnDropoutGetStatesSize( self.handle, ctypes.byref(dropout_states_size))) self.state = torch.cuda.ByteTensor(dropout_states_size.value) state_ptr = self.state.data_ptr() state_size = self.state.size(0) else: state_ptr = None state_size = 0 check_error(lib.cudnnSetDropoutDescriptor( self, self.handle, ctypes.c_float(dropout), ctypes.c_void_p(state_ptr), ctypes.c_size_t(state_size), ctypes.c_ulonglong(seed), )) self.dropout = dropout
def ctypes_getprocesstimes_systimes(): creationtime = ctypes.c_ulonglong() exittime = ctypes.c_ulonglong() kerneltime = ctypes.c_ulonglong() usertime = ctypes.c_ulonglong() rc = ctypes.windll.kernel32.GetProcessTimes( ctypes.windll.kernel32.GetCurrentProcess(), ctypes.byref(creationtime), ctypes.byref(exittime), ctypes.byref(kerneltime), ctypes.byref(usertime)) if not rc: raise TypeError('GetProcessTimes() returned an error') return (usertime.value / WIN32_PROCESS_TIMES_TICKS_PER_SECOND, kerneltime.value / WIN32_PROCESS_TIMES_TICKS_PER_SECOND) # Select the default for the systimes() function
def statfs(self, path): if common.windows: lpSectorsPerCluster = ctypes.c_ulonglong(0) lpBytesPerSector = ctypes.c_ulonglong(0) lpNumberOfFreeClusters = ctypes.c_ulonglong(0) lpTotalNumberOfClusters = ctypes.c_ulonglong(0) ret = windll.kernel32.GetDiskFreeSpaceW(ctypes.c_wchar_p(path), ctypes.pointer(lpSectorsPerCluster), ctypes.pointer(lpBytesPerSector), ctypes.pointer(lpNumberOfFreeClusters), ctypes.pointer(lpTotalNumberOfClusters)) if not ret: raise WindowsError free_blocks = lpNumberOfFreeClusters.value * lpSectorsPerCluster.value result = {'f_bavail': free_blocks, 'f_bfree': free_blocks, 'f_bsize': lpBytesPerSector.value, 'f_frsize': lpBytesPerSector.value, 'f_blocks': lpTotalNumberOfClusters.value * lpSectorsPerCluster.value, 'f_namemax': wintypes.MAX_PATH} return result else: stv = os.statvfs(path) # f_flag causes python interpreter crashes in some cases. i don't get it. return dict((key, getattr(stv, key)) for key in ('f_bavail', 'f_bfree', 'f_blocks', 'f_bsize', 'f_favail', 'f_ffree', 'f_files', 'f_frsize', 'f_namemax'))
def get_total_disk_space(): cwd = os.getcwd() # Windows is the only platform that doesn't support os.statvfs, so # we need to special case this. if sys.platform.startswith('win'): _, total, free = (ctypes.c_ulonglong(), ctypes.c_ulonglong(), \ ctypes.c_ulonglong()) if sys.version_info >= (3,) or isinstance(cwd, unicode): fn = ctypes.windll.kernel32.GetDiskFreeSpaceExW else: fn = ctypes.windll.kernel32.GetDiskFreeSpaceExA ret = fn(cwd, ctypes.byref(_), ctypes.byref(total), ctypes.byref(free)) if ret == 0: # WinError() will fetch the last error code. raise ctypes.WinError() return (total.value, free.value) else: st = os.statvfs(cwd) free = st.f_bavail * st.f_frsize total = st.f_blocks * st.f_frsize return (total, free)
def get_free_space_bytes(folder): """ Return folder/drive free space (in bytes) """ if platform.system() == 'Windows': _free_bytes = ctypes.c_ulonglong(0) _total_bytes = ctypes.c_ulonglong(0) ctypes.windll.kernel32.GetDiskFreeSpaceExW(folder, None, ctypes.pointer(_total_bytes), ctypes.pointer(_free_bytes)) total_bytes = _total_bytes.value free_bytes = _free_bytes.value else: try: st = os.statvfs(folder) total_bytes = st.f_blocks * st.f_frsize free_bytes = st.f_bavail * st.f_frsize except: total_bytes = 0 free_bytes = 0 return total_bytes, free_bytes
def load_libsodium(): global loaded, libsodium, buf libsodium = util.find_library('sodium', 'crypto_stream_salsa20_xor_ic', 'libsodium') if libsodium is None: raise Exception('libsodium not found') libsodium.crypto_stream_salsa20_xor_ic.restype = c_int libsodium.crypto_stream_salsa20_xor_ic.argtypes = (c_void_p, c_char_p, c_ulonglong, c_char_p, c_ulonglong, c_char_p) libsodium.crypto_stream_chacha20_xor_ic.restype = c_int libsodium.crypto_stream_chacha20_xor_ic.argtypes = (c_void_p, c_char_p, c_ulonglong, c_char_p, c_ulonglong, c_char_p) try: libsodium.crypto_stream_chacha20_ietf_xor_ic.restype = c_int libsodium.crypto_stream_chacha20_ietf_xor_ic.argtypes = (c_void_p, c_char_p, c_ulonglong, c_char_p, c_ulong, c_char_p) except: pass buf = create_string_buffer(buf_size) loaded = True
def get_free_space(dir_name): """Get free space in bytes for the path provided :param dir_name: :return: """ if platform.system() == 'Windows': import ctypes free_bytes = ctypes.c_ulonglong(0) ctypes.windll.kernel32.GetDiskFreeSpaceExW( ctypes.c_wchar_p(dir_name), None, None, ctypes.pointer(free_bytes) ) return free_bytes.value else: st = os.statvfs(dir_name) return st.f_bavail * st.f_frsize
def logging_file_preallocation_size(self, val): cfunc = lib_importer.windll.DAQmxSetLoggingFilePreallocationSize if cfunc.argtypes is None: with cfunc.arglock: if cfunc.argtypes is None: cfunc.argtypes = [ lib_importer.task_handle, ctypes.c_ulonglong] error_code = cfunc( self._handle, val) check_for_error(error_code)
def logging_samps_per_file(self, val): cfunc = lib_importer.windll.DAQmxSetLoggingSampsPerFile if cfunc.argtypes is None: with cfunc.arglock: if cfunc.argtypes is None: cfunc.argtypes = [ lib_importer.task_handle, ctypes.c_ulonglong] error_code = cfunc( self._handle, val) check_for_error(error_code)
def samp_quant_samp_per_chan(self, val): cfunc = lib_importer.windll.DAQmxSetSampQuantSampPerChan if cfunc.argtypes is None: with cfunc.arglock: if cfunc.argtypes is None: cfunc.argtypes = [ lib_importer.task_handle, ctypes.c_ulonglong] error_code = cfunc( self._handle, val) check_for_error(error_code)
def cfg_change_detection_timing( self, rising_edge_chan="", falling_edge_chan="", sample_mode=AcquisitionType.FINITE, samps_per_chan=1000): """ Configures the task to acquire samples on the rising and/or falling edges of the lines or ports you specify. To detect both rising and falling edges on a line or port, specify the name of that line or port to both **rising_edge_chan** and **falling_edge_chan**. Args: rising_edge_chan (Optional[str]): Specifies the names of the digital lines or ports on which to detect rising edges. The DAQmx physical channel constant lists all lines and ports for devices installed in your system. falling_edge_chan (Optional[str]): Specifies the names of the digital lines or ports on which to detect falling edges. The DAQmx physical channel constant lists all lines and ports for devices installed in your system. sample_mode (Optional[nidaqmx.constants.AcquisitionType]): Specifies if the task acquires samples continuously or if it acquires a finite number of samples. samps_per_chan (Optional[long]): Specifies the number of samples to acquire from each channel in the task if **sample_mode** is **FINITE_SAMPLES**. This function returns an error if the specified value is negative. """ cfunc = lib_importer.windll.DAQmxCfgChangeDetectionTiming if cfunc.argtypes is None: with cfunc.arglock: if cfunc.argtypes is None: cfunc.argtypes = [ lib_importer.task_handle, ctypes_byte_str, ctypes_byte_str, ctypes.c_int, ctypes.c_ulonglong] error_code = cfunc( self._handle, rising_edge_chan, falling_edge_chan, sample_mode.value, samps_per_chan) check_for_error(error_code)
def cfg_handshaking_timing( self, sample_mode=AcquisitionType.FINITE, samps_per_chan=1000): """ Determines the number of digital samples to acquire or generate using digital handshaking between the device and a peripheral device. Args: sample_mode (Optional[nidaqmx.constants.AcquisitionType]): Specifies if the task acquires or generates samples continuously or if it acquires or generates a finite number of samples. samps_per_chan (Optional[long]): Specifies the number of samples to acquire or generate for each channel in the task if **sample_mode** is **FINITE_SAMPLES**. If **sample_mode** is **CONTINUOUS_SAMPLES**, NI-DAQmx uses this value to determine the buffer size. This function returns an error if the specified value is negative. """ cfunc = lib_importer.windll.DAQmxCfgHandshakingTiming if cfunc.argtypes is None: with cfunc.arglock: if cfunc.argtypes is None: cfunc.argtypes = [ lib_importer.task_handle, ctypes.c_int, ctypes.c_ulonglong] error_code = cfunc( self._handle, sample_mode.value, samps_per_chan) check_for_error(error_code)
def cfg_implicit_timing( self, sample_mode=AcquisitionType.FINITE, samps_per_chan=1000): """ Sets only the number of samples to acquire or generate without specifying timing. Typically, you should use this instance when the task does not require sample timing, such as tasks that use counters for buffered frequency measurement, buffered period measurement, or pulse train generation. For finite counter output tasks, **samps_per_chan** is the number of pulses to generate. Args: sample_mode (Optional[nidaqmx.constants.AcquisitionType]): Specifies if the task acquires or generates samples continuously or if it acquires or generates a finite number of samples. samps_per_chan (Optional[long]): Specifies the number of samples to acquire or generate for each channel in the task if **sample_mode** is **FINITE_SAMPLES**. If **sample_mode** is **CONTINUOUS_SAMPLES**, NI-DAQmx uses this value to determine the buffer size. This function returns an error if the specified value is negative. """ cfunc = lib_importer.windll.DAQmxCfgImplicitTiming if cfunc.argtypes is None: with cfunc.arglock: if cfunc.argtypes is None: cfunc.argtypes = [ lib_importer.task_handle, ctypes.c_int, ctypes.c_ulonglong] error_code = cfunc( self._handle, sample_mode.value, samps_per_chan) check_for_error(error_code)
def _GetTotalMemoryInBytes(): if sys.platform in ('win32', 'cygwin'): import ctypes class MEMORYSTATUSEX(ctypes.Structure): _fields_ = [ ("dwLength", ctypes.c_ulong), ("dwMemoryLoad", ctypes.c_ulong), ("ullTotalPhys", ctypes.c_ulonglong), ("ullAvailPhys", ctypes.c_ulonglong), ("ullTotalPageFile", ctypes.c_ulonglong), ("ullAvailPageFile", ctypes.c_ulonglong), ("ullTotalVirtual", ctypes.c_ulonglong), ("ullAvailVirtual", ctypes.c_ulonglong), ("sullAvailExtendedVirtual", ctypes.c_ulonglong), ] stat = MEMORYSTATUSEX(dwLength=ctypes.sizeof(MEMORYSTATUSEX)) ctypes.windll.kernel32.GlobalMemoryStatusEx(ctypes.byref(stat)) return stat.ullTotalPhys elif sys.platform.startswith('linux'): if os.path.exists("/proc/meminfo"): with open("/proc/meminfo") as meminfo: memtotal_re = re.compile(r'^MemTotal:\s*(\d*)\s*kB') for line in meminfo: match = memtotal_re.match(line) if not match: continue return float(match.group(1)) * 2**10 elif sys.platform == 'darwin': try: return int(subprocess.check_output(['sysctl', '-n', 'hw.memsize'])) except Exception: return 0 # TODO(scottmg): Implement this for other platforms. return 0
def freeSpace(folder): if os.name == "nt": import ctypes free_bytes = ctypes.c_ulonglong(0) ctypes.windll.kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(folder), None, None, ctypes.pointer(free_bytes)) return free_bytes.value else: from os import statvfs s = statvfs(folder) return s.f_bsize * s.f_bavail
def free_space(folder): if os.name == "nt": import ctypes free_bytes = ctypes.c_ulonglong(0) ctypes.windll.kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(folder), None, None, ctypes.pointer(free_bytes)) return free_bytes.value else: s = os.statvfs(folder) return s.f_frsize * s.f_bavail
def FreeSpace(path): if platform.startswith('win'): free = ctypes.c_ulonglong(0) ctypes.windll.kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(path), None, None, ctypes.pointer(free)) return free.value else: stat = os.statvfs(path) free = stat.f_frsize * stat.f_bavail return free
def get_free_disk_space(): """Return the number of bytes free on the given disk in Gigabytes (floating)""" path = os.path.dirname(os.path.realpath(__file__)) if platform.system() == 'Windows': import ctypes free_bytes = ctypes.c_ulonglong(0) ctypes.windll.kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(path), None, None, ctypes.pointer(free_bytes)) return float(free_bytes.value / 1024 / 1024) / 1024.0 else: stat = os.statvfs(path) return float(stat.f_bavail * stat.f_frsize / 1024 / 1024) / 1024.0 # pylint: enable=E1101
def _get_free_diskspace(self): '''https://stackoverflow.com/questions/51658/cross-platform-space-remaining-on-volume-using-python''' if platform.system() == 'Windows': free_bytes = ctypes.c_ulonglong(0) ctypes.windll.kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(self.settings.save_directory), None, None, ctypes.pointer(free_bytes)) return free_bytes.value / 1024 / 1024 st = os.statvfs(self.settings.save_directory) return st.f_bavail * st.f_frsize / 1024 / 1024 / 1024
def get_free_space_mb(folder): """ Return folder/drive free space (in bytes) """ if platform.system() == 'Windows': free_bytes = ctypes.c_ulonglong(0) ctypes.windll.kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(folder), None, None, ctypes.pointer(free_bytes)) return free_bytes.value/1024/1024/1024 else: st = os.statvfs(folder) return st.f_bavail * st.f_frsize/1024/1024
def freespace(self,location): # Return free space in bytes avaialable at specfied location # Source: http://stackoverflow.com/questions/51658/cross-platform-space-remaining-on-volume-using-python if platform.system() == 'Windows': free_bytes = ctypes.c_ulonglong(0) ctypes.windll.kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(folder), None, None, ctypes.pointer(free_bytes)) return free_bytes.value else: return os.statvfs(folder).f_bfree
def get_free_space_mb(dirname): import ctypes # import platform # if platform.system() == 'Windows': if xbmc.getCondVisibility('system.platform.windows'): free_bytes = ctypes.c_ulonglong(0) total_bytes = ctypes.c_int64() ctypes.windll.kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(dirname), None, ctypes.pointer(total_bytes), ctypes.pointer(free_bytes)) return free_bytes.value, total_bytes.value else: st = os.statvfs(dirname) return st.f_bavail * st.f_frsize, st.f_frsize * st.f_blocks
def get_filesystem_pagesize(path): pagesize = 1024 # retrieve filesystem page size for optimizing filesystem based database systems like sqlite try: if is_windows_system(): pagesize = 4096 # almost the best for standard windows systems # try to find the perfect value from the filesystem import ctypes drive = os.path.splitdrive(path) sectorsPerCluster = ctypes.c_ulonglong(0) bytesPerSector = ctypes.c_ulonglong(0) rootPathName = ctypes.c_wchar_p(unicode(drive[0])) ctypes.windll.kernel32.GetDiskFreeSpaceW(rootPathName, ctypes.pointer(sectorsPerCluster), ctypes.pointer(bytesPerSector), None, None, ) pagesize = sectorsPerCluster.value * bytesPerSector.value else: # I could not try it out on non-windows platforms # if it doesn't work the default page size is returned from os import statvfs stats = statvfs(path) pagesize = stats.f_bsize except: log.error('') # adjust page size if pagesize > 32768: pagesize = 32768 if pagesize < 1024: pagesize = 1024 return pagesize
def get_frame_number(self, stream): """Retrieve the frame number for specific stream. Args: stream (int): value from :class:`pyrealsense.constants.rs_stream`. Returns: (double): frame number. """ lrs.rs_get_frame_number.restype = ctypes.c_ulonglong e = ctypes.POINTER(rs_error)() return lrs.rs_get_frame_number(self.dev, stream, ctypes.byref(e))
def __init__(self, volumes): if isinstance(volumes, str): volumes = [volumes, ] volume_array = c_char_p * len(volumes) self.handle = libewf.libewf_open(volume_array(*volumes), c_int(len(volumes)), c_int(1)) if self.handle == 0: raise RuntimeError("Unable to open ewf file") self.readptr = 0 size_p = pointer(c_ulonglong(0)) libewf.libewf_get_media_size(self.handle, size_p) self.size = size_p.contents.value
def read(self, length): buf = create_string_buffer(length) length = libewf.libewf_read_random(self.handle, buf, c_ulong(length), c_ulonglong(self.readptr)) return buf.raw[:length]
def EnumMissingModules(): """Enumerate all modules which match the patterns MODULE_PATTERNS. PyInstaller often fails to locate all dlls which are required at runtime. We import all the client modules here, we simply introdpect all the modules we have loaded in our current running process, and all the ones matching the patterns are copied into the client package. Yields: a source file for a linked dll. """ module_handle = ctypes.c_ulong() count = ctypes.c_ulong() process_handle = ctypes.windll.kernel32.OpenProcess( PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, 0, os.getpid()) ctypes.windll.psapi.EnumProcessModules( process_handle, ctypes.byref(module_handle), ctypes.sizeof(module_handle), ctypes. byref(count)) # The size of a handle is pointer size (i.e. 64 bit on amd64 and 32 bit on # i386). if sys.maxsize > 2 ** 32: handle_type = ctypes.c_ulonglong else: handle_type = ctypes.c_ulong module_list = (handle_type * (count.value / ctypes.sizeof(handle_type)))() ctypes.windll.psapi.EnumProcessModulesEx( process_handle, ctypes.byref(module_list), ctypes.sizeof(module_list), ctypes.byref(count), 2) for x in module_list: module_filename = win32process.GetModuleFileNameEx(process_handle, x).lower() # PyInstaller is pretty bad in finding all the imported pyd files, and dlls. if ("winsxs" in module_filename or "site-packages" in module_filename or module_filename.endswith(".pyd") or "msvc" in module_filename or "\\dlls" in module_filename): yield module_filename else: print "Skipping %s" % module_filename
def GetValueFromAddress(self, processHandle, address, isFloat=False, is64bit=False, isString=False): if isString: data = c.create_string_buffer(16) bytesRead = c.c_ulonglong(16) elif is64bit: data = c.c_ulonglong() bytesRead = c.c_ulonglong() else: data = c.c_ulong() bytesRead = c.c_ulonglong(4) successful = ReadProcessMemory(processHandle, address, c.byref(data), c.sizeof(data), c.byref(bytesRead)) if not successful: e = GetLastError() print("ReadProcessMemory Error: Code " + str(e)) self.ReacquireEverything() value = data.value if isFloat: return struct.unpack("<f", value)[0] elif isString: try: return value.decode('utf-8') except: print("ERROR: Couldn't decode string from memory") return "ERROR" else: return int(value)
def GetBlockOfData(self, processHandle, address, size_of_block): data = c.create_string_buffer(size_of_block) bytesRead = c.c_ulonglong(size_of_block) successful = ReadProcessMemory(processHandle, address, c.byref(data), c.sizeof(data), c.byref(bytesRead)) if not successful: e = GetLastError() print("Getting Block of Data Error: Code " + str(e)) #print('{} : {}'.format(address, self.GetValueFromFrame(data, PlayerDataAddress.simple_move_state))) return data
def disk_usage(path): _, total, free = ctypes.c_ulonglong(), ctypes.c_ulonglong(), \ ctypes.c_ulonglong() if sys.version_info >= (3,) or isinstance(path, unicode): fun = ctypes.windll.kernel32.GetDiskFreeSpaceExW else: fun = ctypes.windll.kernel32.GetDiskFreeSpaceExA ret = fun(path, ctypes.byref(_), ctypes.byref(total), ctypes.byref(free)) if ret == 0: raise ctypes.WinError() used = total.value - free.value return _ntuple_diskusage(total.value, used, free.value)
def cfg_burst_handshaking_timing_export_clock( self, sample_clk_rate, sample_clk_outp_term, sample_mode=AcquisitionType.FINITE, samps_per_chan=1000, sample_clk_pulse_polarity=Polarity.ACTIVE_HIGH, pause_when=Level.HIGH, ready_event_active_level=Polarity.ACTIVE_HIGH): """ Configures when the DAQ device transfers data to a peripheral device, using the onboard Sample Clock of the DAQ device to control burst handshake timing and exporting that clock for use by the peripheral device. Args: sample_clk_rate (float): Specifies in hertz the rate of the Sample Clock. sample_clk_outp_term (str): Specifies the terminal to which to export the Sample Clock. sample_mode (Optional[nidaqmx.constants.AcquisitionType]): Specifies if the task acquires or generates samples continuously or if it acquires or generates a finite number of samples. samps_per_chan (Optional[long]): Specifies the number of samples to acquire or generate for each channel in the task if **sample_mode** is **FINITE_SAMPLES**. If **sample_mode** is **CONTINUOUS_SAMPLES**, NI-DAQmx uses this value to determine the buffer size. This function returns an error if the specified value is negative. sample_clk_pulse_polarity (Optional[nidaqmx.constants.Polarity]): Specifies the polarity of the exported Sample Clock. pause_when (Optional[nidaqmx.constants.Level]): Specifies whether the task pauses while the trigger signal is high or low. ready_event_active_level (Optional[nidaqmx.constants.Polarity]): Specifies the polarity of the Ready for Transfer Event. """ cfunc = lib_importer.windll.DAQmxCfgBurstHandshakingTimingExportClock if cfunc.argtypes is None: with cfunc.arglock: if cfunc.argtypes is None: cfunc.argtypes = [ lib_importer.task_handle, ctypes.c_int, ctypes.c_ulonglong, ctypes.c_double, ctypes_byte_str, ctypes.c_int, ctypes.c_int, ctypes.c_int] error_code = cfunc( self._handle, sample_mode.value, samps_per_chan, sample_clk_rate, sample_clk_outp_term, sample_clk_pulse_polarity.value, pause_when.value, ready_event_active_level.value) check_for_error(error_code)