我们从 Python 开源项目中提取了以下 16 个代码示例,用于说明如何使用 time.clock_gettime()。
def test_clock_monotonic(self): a = time.clock_gettime(time.CLOCK_MONOTONIC) b = time.clock_gettime(time.CLOCK_MONOTONIC) self.assertLessEqual(a, b)
def _monotonic_clock():
    """Return the current ``CLOCK_MONOTONIC`` reading from ``time.clock_gettime``."""
    reading = time.clock_gettime(time.CLOCK_MONOTONIC)
    return reading
def run(self):
    """Pump frames from the input pipe to ``self._play`` until ``self._end`` is set.

    Each pass reads up to the outstanding number of bytes from
    ``self._pipe_fd``, forwards whatever was read to ``self._play`` and then
    sleeps until the next slot computed from ``self._frame_period``.
    ``OSError`` with ``EAGAIN`` is logged once per stall; any other
    ``OSError`` propagates to the caller.
    """
    iteration = 0
    starved = False  # True once the current stall has been reported
    wanted = self._frame_len  # bytes still to request so reads stay frame-aligned

    # Reference point for the transmission schedule.
    t0 = time.clock_gettime(time.CLOCK_MONOTONIC_RAW)

    while not self._end.is_set():
        iteration += 1

        # A frame is expected to be available on every pass.
        try:
            data = os.read(self._pipe_fd, wanted)

            # Data arrived, so the stall (if any) is over.
            starved = False
            received = len(data)
            if received not in (0, self._frame_len):
                log.warning('AacProcessor: Got partial buffer of size {}'.format(received))

            # Hand the chunk to the playback callback.
            self._play(data)

            # Ask only for the remainder next time; start over once a frame is complete.
            wanted -= received
            if wanted == 0:
                wanted = self._frame_len
        except OSError as e:
            if e.errno != errno.EAGAIN:
                raise
            # Report a stall only once to keep the log readable.
            if not starved:
                log.error('AacProcessor: Buffer not ready')
            starved = True

        # Work out when the next frame is due and sleep until then.
        next_time = t0 + self._frame_period * iteration
        now = time.clock_gettime(time.CLOCK_MONOTONIC_RAW)
        delay = max(0, self._frame_period + (next_time - now))
        time.sleep(delay)
def test_clock_realtime(self): time.clock_gettime(time.CLOCK_REALTIME)
def test_clock_settime(self): t = time.clock_gettime(time.CLOCK_REALTIME) try: time.clock_settime(time.CLOCK_REALTIME, t) except PermissionError: pass if hasattr(time, 'CLOCK_MONOTONIC'): self.assertRaises(OSError, time.clock_settime, time.CLOCK_MONOTONIC, 0)
def test_monotonic_settime(self):
    """Check that ``time.monotonic`` does not go backwards when CLOCK_REALTIME is rewound.

    The test is skipped when the process is not permitted to set the clock.
    """
    before = time.monotonic()
    saved_realtime = time.clock_gettime(time.CLOCK_REALTIME)

    # Rewind the wall clock by one hour.
    rewound = saved_realtime - 3600
    try:
        time.clock_settime(time.CLOCK_REALTIME, rewound)
    except PermissionError as err:
        self.skipTest(err)

    after = time.monotonic()

    # Put the wall clock back to the value captured above.
    time.clock_settime(time.CLOCK_REALTIME, saved_realtime)

    # System clock updates must not affect the monotonic clock.
    self.assertGreaterEqual(after, before)
def main_loop(bar, inputs=None):
    """Drive the bar: repaint when needed and dispatch input events until shutdown.

    Args:
        bar: object providing ``widget``, ``painter()``, ``is_running()`` and
            ``proc`` as used below.
        inputs: selectable objects offering ``process()`` and ``kill()``;
            defaults to the module-level ``global_inputs`` when ``None``.

    The loop ends when ``core.shutdown_requested()`` becomes true or
    ``bar.is_running()`` becomes false. Afterwards ``bar.proc`` and every
    input are killed and ``bar.proc`` is waited on.
    """
    # TODO: remove eventinputs again?
    #inputs += bar.widget.eventinputs()
    if inputs is None:
        inputs = global_inputs
    global_update = True

    # Parameter is named ``signum`` so it does not shadow the ``signal`` module.
    def signal_quit(signum, frame):
        quit_main_loop()

    signal.signal(signal.SIGINT, signal_quit)
    signal.signal(signal.SIGTERM, signal_quit)

    # main loop
    while not core.shutdown_requested() and bar.is_running():
        now = time.clock_gettime(time.CLOCK_MONOTONIC)
        if bar.widget.maybe_timeout(now):
            global_update = True

        data_ready = []
        if global_update:
            painter = bar.painter()
            painter.widget(bar.widget)
            # Zero-timeout poll: flush only when no further input is pending,
            # otherwise handle that input first and repaint on a later pass.
            data_ready = select.select(inputs, [], [], 0.00)[0]
            if not data_ready:
                painter.flush()
                global_update = False

        if not data_ready:
            # Wait for new data, but never past the cap below.
            # NOTE(review): the original comment said "at most one hour",
            # yet 360 s is six minutes -- confirm which one is intended.
            next_timeout = now + 360
            widget_deadline = bar.widget.next_timeout()
            if widget_deadline is not None:
                next_timeout = min(next_timeout, widget_deadline)
            # Convert the absolute deadline into a relative timeout of at least 0.1 s.
            now = time.clock_gettime(time.CLOCK_MONOTONIC)
            next_timeout -= now
            next_timeout = max(next_timeout, 0.1)
            data_ready = select.select(inputs, [], [], next_timeout)[0]
            if core.shutdown_requested():
                break

        if data_ready:
            for ready_input in data_ready:
                ready_input.process()
            global_update = True

    bar.proc.kill()
    for inp in inputs:
        inp.kill()
    bar.proc.wait()
def memory_cpu_stats_middleware(get_response):
    """Build a middleware that reports CPU time and memory-map totals to statsd.

    A module-level ``SAMPLE_COUNT`` is incremented on every request. When it
    reaches ``settings.SAMPLE_RATE`` it is reset and that request is measured:
    process CPU time and summed ``psutil`` memory-map fields are taken before
    and after ``get_response`` and sent as gauges. Other requests pass
    straight through.

    Args:
        get_response: the next callable in the middleware chain.

    Returns:
        The ``middleware(request)`` callable.
    """
    import time
    import psutil
    from django_statsd.clients import statsd
    from .global_request import get_view_name
    from django.conf import settings

    mem_entries = (
        'rss',
        'shared_clean',
        'shared_dirty',
        'private_clean',
        'private_dirty',
    )

    def summed(info):
        """Sum each field named in ``mem_entries`` over all entries of ``info``."""
        totals = dict.fromkeys(mem_entries, 0)
        for path_info in info:
            for name in mem_entries:
                totals[name] += getattr(path_info, name)
        return totals

    def middleware(request):
        global SAMPLE_COUNT
        SAMPLE_COUNT += 1
        if SAMPLE_COUNT < settings.SAMPLE_RATE:
            return get_response(request)

        SAMPLE_COUNT = 0
        cpu_before = time.clock_gettime(time.CLOCK_PROCESS_CPUTIME_ID)
        mem_before = summed(psutil.Process().memory_maps())
        try:
            return get_response(request)
        finally:
            # Runs even when the view raises, so failed requests are reported too.
            cpu_after = time.clock_gettime(time.CLOCK_PROCESS_CPUTIME_ID)
            view_name = get_view_name()
            statsd.gauge('cpu.{}'.format(view_name), cpu_after - cpu_before)
            mem_after = summed(psutil.Process().memory_maps())
            for name, after in mem_after.items():
                # Single-pass format: a view name containing braces can no
                # longer break a second ``.format`` call.
                key = 'memory.{}.{}'.format(view_name, name)
                statsd.gauge(key + '.total', after)
                statsd.gauge(key + '.change', after - mem_before[name])

    return middleware