Python signal 模块,setitimer() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用signal.setitimer()

项目:raiden    作者:raiden-network    | 项目源码 | 文件源码
def __init__(
            self,
            callback,
            timer=TIMER,
            interval=INTERVAL_SECONDS,
            timer_signal=TIMER_SIGNAL):

        assert callable(callback), 'callback must be callable'

        signal.signal(timer_signal, self.callback)
        signal.setitimer(timer, interval, interval)

        oldtimer, oldaction = None, None  # cheating for now
        self.oldaction = oldaction
        self.oldtimer = oldtimer
        self._callback = callback
项目:future_data_analyse    作者:ipqhjjybj    | 项目源码 | 文件源码
def execution_timeout(timeout):
    def timed_out(signum, sigframe):
        raise TimeoutError
    delay, interval = signal.setitimer(signal.ITIMER_REAL, timeout, 0)
    old_hdl = signal.signal(signal.SIGALRM, timed_out)
    now = time.time()
    try:
        yield
    finally:
        # inner timeout must be smaller, or the timer event will be delayed
        if delay:
          elapsed = time.time() - now
          delay = max(delay - elapsed, 0.000001)
        else:
          delay = 0
        signal.setitimer(signal.ITIMER_REAL, delay, interval)
        signal.signal(signal.SIGALRM, old_hdl)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def sig_vtalrm(self, *args):
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)
            if support.verbose:
                print("last SIGVTALRM handler call")

        self.hndl_count += 1

        if support.verbose:
            print("SIGVTALRM handler invoked", args)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        signal.signal(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        start_time = time.time()
        while time.time() - start_time < 60.0:
            # do some work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_prof handler stopped this itimer
        else: # Issue 8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def sig_vtalrm(self, *args):
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)
            if test_support.verbose:
                print("last SIGVTALRM handler call")

        self.hndl_count += 1

        if test_support.verbose:
            print("SIGVTALRM handler invoked", args)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        signal.signal(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        start_time = time.time()
        while time.time() - start_time < 60.0:
            # do some work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_prof handler stopped this itimer
        else: # Issue 8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def sig_vtalrm(self, *args):
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)
            if test_support.verbose:
                print("last SIGVTALRM handler call")

        self.hndl_count += 1

        if test_support.verbose:
            print("SIGVTALRM handler invoked", args)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        signal.signal(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        start_time = time.time()
        while time.time() - start_time < 60.0:
            # do some work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_prof handler stopped this itimer
        else: # Issue 8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        signal.signal(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        start_time = time.time()
        while time.time() - start_time < 60.0:
            # do some work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_prof handler stopped this itimer
        else: # Issue 8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)
项目:time2go    作者:twitchyliquid64    | 项目源码 | 文件源码
def set_blocking_signal_threshold(self, seconds, action):
        """Sends a signal if the ioloop is blocked for more than s seconds.

        Pass seconds=None to disable.  Requires python 2.6 on a unixy
        platform.

        The action parameter is a python signal handler.  Read the
        documentation for the python 'signal' module for more information.
        If action is None, the process will be killed if it is blocked for
        too long.
        """
        if not hasattr(signal, "setitimer"):
            logging.error("set_blocking_signal_threshold requires a signal module "
                       "with the setitimer method")
            return
        self._blocking_signal_threshold = seconds
        if seconds is not None:
            signal.signal(signal.SIGALRM,
                          action if action is not None else signal.SIG_DFL)
项目:deprecated_thedap    作者:unitedvote    | 项目源码 | 文件源码
def set_blocking_signal_threshold(self, seconds, action):
        """Sends a signal if the ioloop is blocked for more than s seconds.

        Pass seconds=None to disable.  Requires python 2.6 on a unixy
        platform.

        The action parameter is a python signal handler.  Read the
        documentation for the python 'signal' module for more information.
        If action is None, the process will be killed if it is blocked for
        too long.
        """
        if not hasattr(signal, "setitimer"):
            logging.error("set_blocking_signal_threshold requires a signal module "
                       "with the setitimer method")
            return
        self._blocking_signal_threshold = seconds
        if seconds is not None:
            signal.signal(signal.SIGALRM,
                          action if action is not None else signal.SIG_DFL)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def sig_vtalrm(self, *args):
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)
            if test_support.verbose:
                print("last SIGVTALRM handler call")

        self.hndl_count += 1

        if test_support.verbose:
            print("SIGVTALRM handler invoked", args)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        signal.signal(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        start_time = time.time()
        while time.time() - start_time < 60.0:
            # do some work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_prof handler stopped this itimer
        else: # Issue 8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        signal.signal(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        start_time = time.time()
        while time.time() - start_time < 60.0:
            # do some work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_prof handler stopped this itimer
        else: # Issue 8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def sig_vtalrm(self, *args):
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)
            if test_support.verbose:
                print("last SIGVTALRM handler call")

        self.hndl_count += 1

        if test_support.verbose:
            print("SIGVTALRM handler invoked", args)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        signal.signal(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        start_time = time.time()
        while time.time() - start_time < 60.0:
            # do some work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_prof handler stopped this itimer
        else: # Issue 8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        signal.signal(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        start_time = time.time()
        while time.time() - start_time < 60.0:
            # do some work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_prof handler stopped this itimer
        else: # Issue 8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)
项目:stackimpact-python    作者:stackimpact    | 项目源码 | 文件源码
def destroy(self):
        if not self.setup_done:
            return

        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, self.prev_signal_handler)
项目:stackimpact-python    作者:stackimpact    | 项目源码 | 文件源码
def record(self, duration):
        self.agent.log('Activating block profiler.')

        signal.setitimer(signal.ITIMER_REAL, self.SAMPLING_RATE, self.SAMPLING_RATE)
        time.sleep(duration)
        signal.setitimer(signal.ITIMER_REAL, 0)

        self.agent.log('Deactivating block profiler.')

        self.profile_duration += duration

        self.agent.log('Block profiler CPU overhead per activity second: {0} seconds'.format(self.profile._overhead / self.profile_duration))
项目:stackimpact-python    作者:stackimpact    | 项目源码 | 文件源码
def destroy(self):
        if not self.setup_done:
            return

        signal.setitimer(signal.ITIMER_PROF, 0)
        signal.signal(signal.SIGPROF, self.prev_signal_handler)
项目:stackimpact-python    作者:stackimpact    | 项目源码 | 文件源码
def record(self, duration):
        if self.agent.config.is_profiling_disabled():
            return

        self.agent.log('Activating CPU profiler.')

        signal.setitimer(signal.ITIMER_PROF, self.SAMPLING_RATE, self.SAMPLING_RATE)
        time.sleep(duration)
        signal.setitimer(signal.ITIMER_PROF, 0)

        self.agent.log('Deactivating CPU profiler.')

        self.profile_duration += duration

        self.agent.log('CPU profiler CPU overhead per activity second: {0} seconds'.format(self.profile._overhead / self.profile_duration))
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def set_blocking_signal_threshold(self, seconds, action):
        if not hasattr(signal, "setitimer"):
            gen_log.error("set_blocking_signal_threshold requires a signal module "
                          "with the setitimer method")
            return
        self._blocking_signal_threshold = seconds
        if seconds is not None:
            signal.signal(signal.SIGALRM,
                          action if action is not None else signal.SIG_DFL)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def set_blocking_signal_threshold(self, seconds, action):
        if not hasattr(signal, "setitimer"):
            gen_log.error("set_blocking_signal_threshold requires a signal module "
                          "with the setitimer method")
            return
        self._blocking_signal_threshold = seconds
        if seconds is not None:
            signal.signal(signal.SIGALRM,
                          action if action is not None else signal.SIG_DFL)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def alarm(self, t=None):
        """start a timer to fire only once

        like signal.alarm, but with better resolution than integer seconds.
        """
        if t is None:
            t = self.signal_delay
        self.timer_fired = False
        self.orig_handler = signal.signal(signal.SIGALRM, self.stop_timer)
        # signal_period ignored, since only one timer event is allowed to fire
        signal.setitimer(signal.ITIMER_REAL, t, 1000)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def stop_timer(self, *args):
        self.timer_fired = True
        signal.setitimer(signal.ITIMER_REAL, 0, 0)
        signal.signal(signal.SIGALRM, self.orig_handler)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def set_blocking_signal_threshold(self, seconds, action):
        if not hasattr(signal, "setitimer"):
            gen_log.error("set_blocking_signal_threshold requires a signal module "
                          "with the setitimer method")
            return
        self._blocking_signal_threshold = seconds
        if seconds is not None:
            signal.signal(signal.SIGALRM,
                          action if action is not None else signal.SIG_DFL)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_signal_handling_while_selecting(self):
        # Test with a signal actually arriving during a select() call.
        caught = 0

        def my_handler():
            nonlocal caught
            caught += 1
            self.loop.stop()

        self.loop.add_signal_handler(signal.SIGALRM, my_handler)

        signal.setitimer(signal.ITIMER_REAL, 0.01, 0)  # Send SIGALRM once.
        self.loop.run_forever()
        self.assertEqual(caught, 1)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_signal_handling_args(self):
        some_args = (42,)
        caught = 0

        def my_handler(*args):
            nonlocal caught
            caught += 1
            self.assertEqual(args, some_args)

        self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args)

        signal.setitimer(signal.ITIMER_REAL, 0.1, 0)  # Send SIGALRM once.
        self.loop.call_later(0.5, self.loop.stop)
        self.loop.run_forever()
        self.assertEqual(caught, 1)
项目:aiotools    作者:achimnol    | 项目源码 | 文件源码
def set_timeout():
    def make_timeout(sec, callback):

        def _callback(signum, frame):
            signal.alarm(0)
            callback()

        signal.signal(signal.SIGALRM, _callback)
        signal.setitimer(signal.ITIMER_REAL, sec)

    yield make_timeout
项目:raiden    作者:raiden-network    | 项目源码 | 文件源码
def stop(self):
        self._callback = None

        if self.oldaction and callable(self.oldaction):
            signal.signal(TIMER_SIGNAL, self.oldaction)
            signal.setitimer(
                TIMER_SIGNAL,
                self.oldtimer[0],
                self.oldtimer[1],
            )
        else:
            signal.signal(TIMER_SIGNAL, signal.SIG_IGN)
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def test_exit_code_signal(self):
        self.mktmp("import signal, time\n"
                   "signal.setitimer(signal.ITIMER_REAL, 0.1)\n"
                   "time.sleep(1)\n")
        self.system("%s %s" % (sys.executable, self.fname))
        self.assertEqual(ip.user_ns['_exit_code'], -signal.SIGALRM)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def tearDown(self):
        signal.signal(signal.SIGALRM, self.old_alarm)
        if self.itimer is not None: # test_itimer_exc doesn't change this attr
            # just ensure that itimer is stopped
            signal.setitimer(self.itimer, 0)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def sig_prof(self, *args):
        self.hndl_called = True
        signal.setitimer(signal.ITIMER_PROF, 0)

        if support.verbose:
            print("SIGPROF handler invoked", args)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_itimer_exc(self):
        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
        # defines it ?
        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
        # Negative times are treated as zero on some platforms.
        if 0:
            self.assertRaises(signal.ItimerError,
                              signal.setitimer, signal.ITIMER_REAL, -1)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        if support.verbose:
            print("\ncall pause()...")
        signal.pause()

        self.assertEqual(self.hndl_called, True)

    # Issue 3864, unknown if this affects earlier versions of freebsd also
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def set_blocking_signal_threshold(self, seconds, action):
        if not hasattr(signal, "setitimer"):
            gen_log.error("set_blocking_signal_threshold requires a signal module "
                          "with the setitimer method")
            return
        self._blocking_signal_threshold = seconds
        if seconds is not None:
            signal.signal(signal.SIGALRM,
                          action if action is not None else signal.SIG_DFL)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def set_blocking_signal_threshold(self, seconds, action):
        if not hasattr(signal, "setitimer"):
            gen_log.error("set_blocking_signal_threshold requires a signal module "
                          "with the setitimer method")
            return
        self._blocking_signal_threshold = seconds
        if seconds is not None:
            signal.signal(signal.SIGALRM,
                          action if action is not None else signal.SIG_DFL)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def set_blocking_signal_threshold(self, seconds, action):
        if not hasattr(signal, "setitimer"):
            gen_log.error("set_blocking_signal_threshold requires a signal module "
                          "with the setitimer method")
            return
        self._blocking_signal_threshold = seconds
        if seconds is not None:
            signal.signal(signal.SIGALRM,
                          action if action is not None else signal.SIG_DFL)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def set_blocking_signal_threshold(self, seconds, action):
        if not hasattr(signal, "setitimer"):
            gen_log.error("set_blocking_signal_threshold requires a signal module "
                          "with the setitimer method")
            return
        self._blocking_signal_threshold = seconds
        if seconds is not None:
            signal.signal(signal.SIGALRM,
                          action if action is not None else signal.SIG_DFL)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def set_blocking_signal_threshold(self, seconds, action):
        if not hasattr(signal, "setitimer"):
            gen_log.error("set_blocking_signal_threshold requires a signal module "
                          "with the setitimer method")
            return
        self._blocking_signal_threshold = seconds
        if seconds is not None:
            signal.signal(signal.SIGALRM,
                          action if action is not None else signal.SIG_DFL)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def tearDown(self):
        signal.signal(signal.SIGALRM, self.old_alarm)
        if self.itimer is not None: # test_itimer_exc doesn't change this attr
            # just ensure that itimer is stopped
            signal.setitimer(self.itimer, 0)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def sig_prof(self, *args):
        self.hndl_called = True
        signal.setitimer(signal.ITIMER_PROF, 0)

        if test_support.verbose:
            print("SIGPROF handler invoked", args)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_itimer_exc(self):
        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
        # defines it ?
        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
        # Negative times are treated as zero on some platforms.
        if 0:
            self.assertRaises(signal.ItimerError,
                              signal.setitimer, signal.ITIMER_REAL, -1)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        if test_support.verbose:
            print("\ncall pause()...")
        signal.pause()

        self.assertEqual(self.hndl_called, True)

    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def tearDown(self):
        signal.signal(signal.SIGALRM, self.old_alarm)
        if self.itimer is not None: # test_itimer_exc doesn't change this attr
            # just ensure that itimer is stopped
            signal.setitimer(self.itimer, 0)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def sig_prof(self, *args):
        self.hndl_called = True
        signal.setitimer(signal.ITIMER_PROF, 0)

        if test_support.verbose:
            print("SIGPROF handler invoked", args)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_itimer_exc(self):
        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
        # defines it ?
        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
        # Negative times are treated as zero on some platforms.
        if 0:
            self.assertRaises(signal.ItimerError,
                              signal.setitimer, signal.ITIMER_REAL, -1)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        if test_support.verbose:
            print("\ncall pause()...")
        signal.pause()

        self.assertEqual(self.hndl_called, True)

    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
项目:GLaDOS2    作者:TheComet    | 项目源码 | 文件源码
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        def wrapper(*args, **kwargs):
            signal.signal(signal.SIGALRM, _handle_timeout)
            signal.setitimer(signal.ITIMER_REAL,seconds) #used timer instead of alarm
            try:
                result = func(*args, **kwargs)
            finally:
                signal.alarm(0)
            return result
        return wraps(func)(wrapper)
    return decorator
项目:My-Web-Server-Framework-With-Python2.7    作者:syjsu    | 项目源码 | 文件源码
def set_blocking_signal_threshold(self, seconds, action):
        if not hasattr(signal, "setitimer"):
            gen_log.error("set_blocking_signal_threshold requires a signal module "
                          "with the setitimer method")
            return
        self._blocking_signal_threshold = seconds
        if seconds is not None:
            signal.signal(signal.SIGALRM,
                          action if action is not None else signal.SIG_DFL)