我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 os.times()。
def cpu_times(percpu=False):
    """Return system-wide CPU times as a namedtuple.

    Every CPU time represents the seconds the CPU has spent in the
    given mode. Which fields are available depends on the platform:

     - user
     - system
     - idle
     - nice (UNIX)
     - iowait (Linux)
     - irq (Linux, FreeBSD)
     - softirq (Linux)
     - steal (Linux >= 2.6.11)
     - guest (Linux >= 2.6.24)
     - guest_nice (Linux >= 3.2.0)

    When *percpu* is true, return a list of namedtuples instead, one
    per CPU: the first element refers to the first CPU, the second to
    the second CPU and so on. The order of the list is consistent
    across calls.
    """
    # Delegate to the platform-specific implementation module.
    if percpu:
        return _psplatform.per_cpu_times()
    return _psplatform.cpu_times()
def test_cmdline(self):
    # Spawn a long-sleeping child and check that Process.cmdline()
    # reports exactly the arguments it was started with.
    cmdline = [PYTHON, "-c", "import time; time.sleep(60)"]
    sproc = get_test_subprocess(cmdline)
    expected = ' '.join(cmdline)
    try:
        reported = ' '.join(psutil.Process(sproc.pid).cmdline())
        self.assertEqual(reported, expected)
    except AssertionError:
        # XXX - most of the times the underlying sysctl() call on Net
        # and Open BSD returns a truncated string.
        # Also /proc/pid/cmdline behaves the same so it looks
        # like this is a kernel bug.
        if not (NETBSD or OPENBSD):
            raise
        # On those BSDs only the executable (first element) is checked.
        self.assertEqual(
            psutil.Process(sproc.pid).cmdline()[0], PYTHON)
def _cpu_tot_time(times):
    """Given a cpu_time() ntuple calculates the total CPU time
    (including idle time).
    """
    total = sum(times)
    if not LINUX:
        return total
    # On Linux guest times are already accounted in "user" or
    # "nice" times, so we subtract them from total.
    # Htop does the same. References:
    # https://github.com/giampaolo/psutil/pull/940
    # http://unix.stackexchange.com/questions/178045
    # https://github.com/torvalds/linux/blob/
    #     447976ef4fd09b1be88b316d1a81553f1aa7cd07/kernel/sched/
    #     cputime.c#L158
    # "guest" exists on Linux 2.6.24+, "guest_nice" on Linux 3.2.0+;
    # a missing field contributes 0.
    for field in ("guest", "guest_nice"):
        total -= getattr(times, field, 0)
    return total
def search(self):
    '''Top level search.

    Creates a log writer and a data writer whose file names are derived
    from the first entry of ``self.data_handle.data_list``, then runs
    ``self.search_data`` on every entry, closing each entry and
    collecting garbage once it has been searched.
    '''
    logger.debug("Start searching...")
    logger.debug(self.get_info())

    # Output files are named after the first data file, with the
    # '.h5', '.fits' and '.fil' substrings removed from its base name.
    first = self.data_handle.data_list[0]
    base_name = first.filename.split('/')[-1]
    for ext in ('.h5', '.fits', '.fil'):
        base_name = base_name.replace(ext, '')
    out_prefix = '%s/%s' % (self.out_dir.rstrip('/'), base_name)

    self.logwriter = file_writers.LogWriter('%s.log' % out_prefix)
    self.filewriter = file_writers.FileWriter('%s.dat' % out_prefix, first.header)

    start_msg = "Start ET search for %s" % first.filename
    logger.info(start_msg)
    self.logwriter.info(start_msg)

    for ii, target_data_obj in enumerate(self.data_handle.data_list):
        self.search_data(target_data_obj)
        # Benchmark variant (disabled):
        # cProfile.runctx('self.search_data(target_data_obj)',globals(),locals(),filename='profile_M%2.1f_S%2.1f_t%i'%(self.max_drift,self.snr,int(os.times()[-1])))

        # Close this data object and collect garbage before moving on.
        self.data_handle.data_list[ii].close()
        gc.collect()
def t(s, sinceBegin=False):
    """Pretty-print timing information.

    Prints one line of the form ``# TIME <s> : ..min, ..sec (wall) ...``
    with the wall-clock, user-CPU and system-CPU time elapsed since a
    reference point.

    s -- label printed after ``# TIME``.
    sinceBegin -- when true, measure from the module-level BEGIN_TIME /
        BEGIN_TIMES reference; otherwise measure from the previous
        checkpoint (CTIME / CTIMES) and move that checkpoint to "now".
    """
    global CTIME, CTIMES
    nt = int(time.time())
    ntimes = os.times()
    if sinceBegin:
        ct, cts = BEGIN_TIME, BEGIN_TIMES
    else:
        ct, cts = CTIME, CTIMES
    # _minSec is defined elsewhere; it presumably expands the three
    # durations into the six (minutes, seconds) values the format needs.
    elapsed = _minSec(nt - ct, ntimes[0] - cts[0], ntimes[1] - cts[1])
    timing = (': %dmin, %dsec (wall) %dmin, %dsec (user) %dmin, %dsec (system)'
              % elapsed)
    # A single pre-formatted argument prints the same text under both the
    # Python 2 print statement and the Python 3 print function.
    print('# TIME %s %s' % (s, timing))
    if not sinceBegin:
        CTIME = nt
        CTIMES = ntimes
def _get_time_times(timer=os.times):
    """Return the sum of the first two values reported by *timer*.

    With the default ``os.times`` timer those are the user and system
    CPU times of the current process.
    """
    sample = timer()
    user, system = sample[0], sample[1]
    return user + system

# Using getrusage(3) is better than clock(3) if available:
# on some systems (e.g. FreeBSD), getrusage has a higher resolution
# Furthermore, on a POSIX system, returns microseconds, which
# wrap around after 36min.
def changeCycle(self):
    """Wallpaper change cycle.

    Loops forever. On each pass it calls ``self.change_next()`` (and
    stores the current time in ``self.time``) once ``self.deltaTime()``
    has reached ``self.timeout``, then sleeps 0.1s if the value returned
    by ``get_percentage`` exceeds 30.0.
    """
    # NOTE(review): block nesting was reconstructed from a flattened
    # listing -- confirm against the upstream file.
    # Baseline sample; only the user-time field (uold) is used below.
    uold, sold, cold, c, e = os.times()
    while True:
        delta = self.deltaTime()
        if delta >= self.timeout:
            self.change_next()
            self.time = time.time()
        unew, snew, cnew, c, e = os.times()
        start = time.time()
        # get_percentage is defined elsewhere; presumably a CPU-usage
        # percentage derived from the user-time samples -- TODO confirm.
        # NOTE(review): uold is never refreshed inside the loop.
        percentage = get_percentage(unew, uold, start)
        if percentage > 30.0:
            # Back off briefly when the reported percentage is high.
            time.sleep(0.1)
def threads(self):
    """Return the threads opened by the process.

    The result is a list of (id, user_time, system_time) namedtuples
    giving each thread's id and its CPU times (user/system).
    On OpenBSD this method requires root access.
    """
    # Delegates to the wrapped platform-specific process object.
    thread_info = self._proc.threads()
    return thread_info
def cpu_times(self):
    """Return the accumulated process CPU times, in seconds.

    The result is a (user, system, children_user, children_system)
    namedtuple. This is similar to os.times() but per-process.
    On OSX and Windows children_user and children_system are always
    set to 0.
    """
    # Delegates to the wrapped platform-specific process object.
    process_times = self._proc.cpu_times()
    return process_times
def _monotonic_time():
    """Return the fifth field of os.times() (elapsed real time)."""
    elapsed = os.times()[4]
    return elapsed
def test_cpu_times(self):
    # The current process must have consumed some user or system CPU
    # time, and the children counters can never be negative.
    cpu = psutil.Process().cpu_times()
    assert cpu.user > 0.0 or cpu.system > 0.0, cpu
    assert cpu.children_user >= 0.0, cpu
    assert cpu.children_system >= 0.0, cpu
    # make sure returned values can be pretty printed with strftime
    for field in cpu._fields:
        seconds = getattr(cpu, field)
        time.strftime("%H:%M:%S", time.localtime(seconds))

# Test Process.cpu_times() against os.times()
# os.times() is broken on Python 2.6
# http://bugs.python.org/issue1040026
# XXX fails on OSX: not sure if it's for os.times(). We should
# try this with Python 2.7 and re-enable the test.
def test_cpu_times_2(self):
    user_time, kernel_time = psutil.Process().cpu_times()[:2]
    utime, ktime = os.times()[:2]

    # Use os.times()[:2] as base values to compare our results
    # using a tolerance of +/- 0.1 seconds.
    # It will fail if the difference between the values is > 0.1s.
    comparisons = ((utime, user_time), (ktime, kernel_time))
    for expected, found in comparisons:
        if abs(expected - found) > 0.1:
            self.fail("expected: %s, found: %s" % (expected, found))
def initStats(self):
    """Reset the search statistics before a search starts.

    Resets the class-level counters ``sNode.n`` and ``StateSpace.n``,
    clears this object's cycle-check pruning counter and records the
    current user CPU time in ``self.total_search_time``.
    """
    sNode.n = 0
    StateSpace.n = 1  # initial state already generated on call to search
    self.cycle_check_pruned = 0
    # Holds the process user CPU time at the start of the search;
    # presumably subtracted from a later sample to get the duration.
    # (The earlier placeholder assignment of 0 was a dead store.)
    self.total_search_time = os.times()[0]
def run_during(duration, func):
    """Call *func* repeatedly until more than *duration* seconds pass.

    *func* is always called at least once. Returns a tuple
    ``(calls, real, cpu)``: the number of calls made, the elapsed real
    time, and the user+system CPU time consumed while looping.
    """
    now = time.time
    cpu_before = os.times()
    wall_start = now()
    # Prefer the elapsed field of os.times(); fall back to the wall
    # clock when that field is falsy (0).
    real_start = cpu_before[4] or wall_start

    calls = 0
    expired = False
    while not expired:
        func()
        calls += 1
        expired = now() - wall_start > duration

    cpu_after = os.times()
    if cpu_before[4]:
        real_end = cpu_after[4]
    else:
        real_end = time.time()
    cpu_used = sum(cpu_after[0:2]) - sum(cpu_before[0:2])
    return calls, real_end - real_start, cpu_used
def TSTART():
    """Record the starting times in the global ``t0``.

    ``t0`` becomes a 3-tuple: user CPU time (process + children),
    system CPU time (process + children) and the wall-clock time.
    """
    global t0, t1
    # os.times() yields five values (the last one is elapsed real time);
    # only the four CPU fields are wanted, so slice before unpacking.
    u, s, cu, cs = os.times()[:4]
    t0 = u + cu, s + cs, time.time()
def TSTOP(*label):
    """Record the stop times in the global ``t1`` and report the deltas.

    Writes one line to stderr: every string in *label* followed by a
    space, then the user, system and real time elapsed since the values
    stored in the global ``t0`` (set by TSTART).
    """
    global t0, t1
    # os.times() yields five values (the last one is elapsed real time);
    # only the four CPU fields are wanted, so slice before unpacking.
    u, s, cu, cs = os.times()[:4]
    t1 = u + cu, s + cs, time.time()
    # Element-wise difference of the (user, system, real) triples.
    u, s, r = (t1[i] - t0[i] for i in range(3))
    prefix = ''.join(x + ' ' for x in label)
    sys.stderr.write(prefix + '%r user, %r sys, %r real\n' % (u, s, r))
def initStats(self):
    """Reset the search statistics before a search starts.

    Resets the class-level counters ``sNode.n`` and ``StateSpace.n``,
    clears this object's pruning counters and records the current user
    CPU time in ``self.total_search_time``.
    """
    sNode.n = 0
    StateSpace.n = 1  # initial state already generated on call to search
    self.cycle_check_pruned = 0
    self.cost_bound_pruned = 0
    # Holds the process user CPU time at the start of the search;
    # presumably subtracted from a later sample to get the duration.
    # (The earlier placeholder assignment of 0 was a dead store.)
    self.total_search_time = os.times()[0]
def main():
    """Print a few process and system facts exposed by the os module."""
    # (os function name, positional arguments), printed in this order.
    # Names are resolved one at a time so a function missing on the
    # current platform only fails when its turn comes.
    probes = (
        ("nice", (0,)),      # get relative process priority
        ("nice", (1,)),      # change relative priority
        ("times", ()),       # process times: user, system etc...
        ("isatty", (0,)),    # is the file descriptor arg a tty? (0 = stdin)
        ("isatty", (4,)),    # 4 is just an arbitrary test value
        ("getloadavg", ()),  # UNIX only - number of processes in queue
        ("cpu_count", ()),   # New in Python 3.4
    )
    for name, args in probes:
        print(getattr(os, name)(*args))
def sim_run_flush(tFlush,nFlush):
    '''
    Run simulation for nFlush*tFlush seconds,
    Flush probes every tFlush of simulation time,
      (only flush those that don't have 'weights' in their label names)
    '''
    last_saved_idx = 0
    #doubledLearningRate = False
    for _ in range(nFlush):
        # realtimeold is a module-level reference time set elsewhere.
        realtime = os.times()[4]
        print("Finished till",sim.time,'s, in',realtime-realtimeold,'s')
        sys.stdout.flush()

        # save weights if weightdt or more has passed since last save
        current_idx = int(sim.time/weightdt)
        if current_idx > last_saved_idx:
            last_saved_idx = current_idx
            save_current_weights(False,sim.time)

        # flush probes
        for probe in sim.model.probes:
            # except weight probes (flushed in save_current_weights)
            # except error probe which is saved fully in ..._end.shelve
            label = probe.label
            if label is not None and ('weights' in label or 'error' in label):
                # NOTE(review): this stops at the first weights/error
                # probe, so later probes are not flushed either; the
                # docstring suggests `continue` -- confirm intent.
                break
            del sim._probe_outputs[probe][:]

        ## if time > 1000s, double learning rate
        #if sim.time>1000. and not doubledLearningRate:
        #    changeLearningRate(4.)  # works only if excPESDecayRate = None
        #    doubledLearningRate = True

        # run simulation for tFlush duration
        sim.run(tFlush,progress_bar=False)

###
### run the simulation, with flushing for learning simulations ###
###
def _cpu_busy_time(times):
    """Given a cpu_time() ntuple calculates the busy CPU time.
    We do so by subtracting all idle CPU times.
    """
    total = _cpu_tot_time(times)
    idle = times.idle
    # Linux: "iowait" is time during which the CPU does not do anything
    # (waits for IO to complete). On Linux IO wait is *not* accounted
    # in "idle" time so we subtract it. Htop does the same.
    # References:
    # https://github.com/torvalds/linux/blob/
    #     447976ef4fd09b1be88b316d1a81553f1aa7cd07/kernel/sched/cputime.c#L244
    iowait = getattr(times, "iowait", 0)
    return total - idle - iowait
def get_user_cputime(self):
    """Return the first field of os.times() (user CPU time)."""
    user_seconds = os.times()[0]
    return user_seconds
def burn_cpu(self):
    """Consume REQUEST_CPUTIME_SEC core seconds.

    Busy-waits until the user CPU time has grown by REQUEST_CPUTIME_SEC.
    If that has not happened within REQUEST_TIMEOUT_SEC walltime
    seconds, the process is terminated with exit status 1.
    """
    wall_deadline = self.get_walltime() + REQUEST_TIMEOUT_SEC
    cpu_target = self.get_user_cputime() + REQUEST_CPUTIME_SEC
    while self.get_user_cputime() < cpu_target:
        self.busy_wait()
        # The timeout is checked after every busy-wait slice.
        if self.get_walltime() > wall_deadline:
            sys.exit(1)
def __init__(self, timer=None, bias=None):
    """Set up the profiler state and choose a timer and dispatcher.

    timer -- optional callable returning the current time, either as a
        single number or as a sequence of numbers. When omitted (or
        falsy), ``time.process_time`` is used.
    bias -- optional value stored as ``self.bias``; when None the
        existing ``self.bias`` attribute is used.
    """
    self.timings = {}
    self.cur = None
    self.cmd = ""
    self.c_func_name = ""

    if bias is None:
        bias = self.bias
    self.bias = bias     # Materialize in local dict for lookup speed.

    if not timer:
        # No timer supplied: use process_time for both attributes.
        self.timer = self.get_time = time.process_time
        self.dispatcher = self.trace_dispatch_i
    else:
        self.timer = timer
        t = self.timer() # test out timer function
        try:
            length = len(t)
        except TypeError:
            # The timer returns a value without a length (a plain
            # number): use it directly.
            self.get_time = timer
            self.dispatcher = self.trace_dispatch_i
        else:
            # The timer returns a sequence: pick the dispatcher by its
            # length and make get_time() return the sum of its items.
            if length == 2:
                self.dispatcher = self.trace_dispatch
            else:
                self.dispatcher = self.trace_dispatch_l
            # This get_time() implementation needs to be defined
            # here to capture the passed-in timer in the parameter
            # list (for performance).  Note that we can't assume
            # the timer() result contains two values in all
            # cases.
            def get_time_timer(timer=timer, sum=sum):
                return sum(timer())
            self.get_time = get_time_timer
    # Take the initial timestamp and register the starting call.
    self.t = self.get_time()
    self.simulate_call('profiler')

# Heavily optimized dispatch routine for os.times() timer