Python signal 模块,NSIG 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用signal.NSIG

项目:code    作者:ActiveState    | 项目源码 | 文件源码
def get_ctypes_strsignal():
        """Strategy 1: If the C library exposes strsignal(), use it."""
        import signal
        import ctypes
        import ctypes.util
        libc = ctypes.CDLL(ctypes.util.find_library("c"))
        strsignal_proto = ctypes.CFUNCTYPE(ctypes.c_char_p,
                                           ctypes.c_int)
        strsignal_c = strsignal_proto(("strsignal", libc), ((1,),))
        NSIG = signal.NSIG
        def strsignal_ctypes_wrapper(signo):
            # The behavior of the C library strsignal() is unspecified if
            # called with an out-of-range argument.  Range-check on entry
            # _and_ NULL-check on exit.
            if 0 <= signo < NSIG:
                s = strsignal_c(signo)
                if s:
                    return s.decode("utf-8")
            return "Unknown signal "+str(signo)

        return strsignal_ctypes_wrapper
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_add_signal_handler_coroutine_error(self, m_signal):
        m_signal.NSIG = signal.NSIG

        @asyncio.coroutine
        def simple_coroutine():
            yield from []

        # callback must not be a coroutine function
        coro_func = simple_coroutine
        coro_obj = coro_func()
        self.addCleanup(coro_obj.close)
        for func in (coro_func, coro_obj):
            self.assertRaisesRegex(
                TypeError, 'coroutines cannot be used with add_signal_handler',
                self.loop.add_signal_handler,
                signal.SIGINT, func)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_add_signal_handler_install_error(self, m_signal):
        m_signal.NSIG = signal.NSIG

        def set_wakeup_fd(fd):
            if fd == -1:
                raise ValueError()
        m_signal.set_wakeup_fd = set_wakeup_fd

        class Err(OSError):
            errno = errno.EFAULT
        m_signal.signal.side_effect = Err

        self.assertRaises(
            Err,
            self.loop.add_signal_handler,
            signal.SIGINT, lambda: True)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _serve(self):
        if hasattr(signal, 'pthread_sigmask'):
            signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
        while 1:
            try:
                with self._listener.accept() as conn:
                    msg = conn.recv()
                    if msg is None:
                        break
                    key, destination_pid = msg
                    send, close = self._cache.pop(key)
                    try:
                        send(conn, destination_pid)
                    finally:
                        close()
            except:
                if not util.is_exiting():
                    sys.excepthook(*sys.exc_info())
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_add_signal_handler_coroutine_error(self, m_signal):
        m_signal.NSIG = signal.NSIG

        @asyncio.coroutine
        def simple_coroutine():
            yield from []

        # callback must not be a coroutine function
        coro_func = simple_coroutine
        coro_obj = coro_func()
        self.addCleanup(coro_obj.close)
        for func in (coro_func, coro_obj):
            self.assertRaisesRegex(
                TypeError, 'coroutines cannot be used with add_signal_handler',
                self.loop.add_signal_handler,
                signal.SIGINT, func)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_add_signal_handler_install_error(self, m_signal):
        m_signal.NSIG = signal.NSIG

        def set_wakeup_fd(fd):
            if fd == -1:
                raise ValueError()
        m_signal.set_wakeup_fd = set_wakeup_fd

        class Err(OSError):
            errno = errno.EFAULT
        m_signal.signal.side_effect = Err

        self.assertRaises(
            Err,
            self.loop.add_signal_handler,
            signal.SIGINT, lambda: True)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _serve(self):
        if hasattr(signal, 'pthread_sigmask'):
            signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
        while 1:
            try:
                with self._listener.accept() as conn:
                    msg = conn.recv()
                    if msg is None:
                        break
                    key, destination_pid = msg
                    send, close = self._cache.pop(key)
                    try:
                        send(conn, destination_pid)
                    finally:
                        close()
            except:
                if not util.is_exiting():
                    sys.excepthook(*sys.exc_info())
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_add_signal_handler_install_error(self, m_signal):
        m_signal.NSIG = signal.NSIG

        def set_wakeup_fd(fd):
            if fd == -1:
                raise ValueError()
        m_signal.set_wakeup_fd = set_wakeup_fd

        class Err(OSError):
            errno = errno.EFAULT
        m_signal.signal.side_effect = Err

        self.assertRaises(
            Err,
            self.loop.add_signal_handler,
            signal.SIGINT, lambda: True)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def get_reversemap_strsignal():
        """Strategy 2: return the name of the signal constant corresponding to
           the value passed in."""
        import signal

        signames = [None] * signal.NSIG
        for constant in dir(signal):
            if not constant.startswith("SIG"):
                continue
            if constant.startswith("SIG_"):
                continue
            # obsolete names for signals
            if constant in ('SIGIOT', 'SIGCLD', 'SIGPOLL'):
                continue
            signames[getattr(signal, constant)] = constant

        for rt in range(signal.SIGRTMIN+1, signal.SIGRTMAX):
            signames[rt] = "SIGRTMIN+"+str(rt - signal.SIGRTMIN)

        for gap in range(len(signames)):
            if signames[gap] is None:
                signames[gap] = "SIG_"+str(gap)

        NSIG = signal.NSIG
        def strsignal_reversemap_wrapper(signo):
            if 0 <= signo < NSIG:
                return signames[signo]
            else:
                return "Unknown signal "+str(signo)

        return strsignal_reversemap_wrapper
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def _signal_number_to_name(signum):
    """
    Return a name for the signal number, or None if no name can be found.

    >>> _signal_number_to_name(signal.SIGINT)
    'SIGINT'
    >>> _signal_number_to_name(signal.NSIG + 1)
    """

    names = _signums.get(signum, [])
    if not names:
        return None
    return "/".join(names)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise ValueError if the signal number is invalid or uncatchable.
        Raise RuntimeError if there is a problem setting up the handler.
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_check_signal(self):
        self.assertRaises(
            TypeError, self.loop._check_signal, '1')
        self.assertRaises(
            ValueError, self.loop._check_signal, signal.NSIG + 1)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_handle_signal_no_handler(self):
        self.loop._handle_signal(signal.NSIG + 1)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_handle_signal_cancelled_handler(self):
        h = asyncio.Handle(mock.Mock(), (),
                           loop=mock.Mock())
        h.cancel()
        self.loop._signal_handlers[signal.NSIG + 1] = h
        self.loop.remove_signal_handler = mock.Mock()
        self.loop._handle_signal(signal.NSIG + 1)
        self.loop.remove_signal_handler.assert_called_with(signal.NSIG + 1)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_add_signal_handler(self, m_signal):
        m_signal.NSIG = signal.NSIG

        cb = lambda: True
        self.loop.add_signal_handler(signal.SIGHUP, cb)
        h = self.loop._signal_handlers.get(signal.SIGHUP)
        self.assertIsInstance(h, asyncio.Handle)
        self.assertEqual(h._callback, cb)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_add_signal_handler_install_error2(self, m_logging, m_signal):
        m_signal.NSIG = signal.NSIG

        class Err(OSError):
            errno = errno.EINVAL
        m_signal.signal.side_effect = Err

        self.loop._signal_handlers[signal.SIGHUP] = lambda: True
        self.assertRaises(
            RuntimeError,
            self.loop.add_signal_handler,
            signal.SIGINT, lambda: True)
        self.assertFalse(m_logging.info.called)
        self.assertEqual(1, m_signal.set_wakeup_fd.call_count)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_add_signal_handler_install_error3(self, m_logging, m_signal):
        class Err(OSError):
            errno = errno.EINVAL
        m_signal.signal.side_effect = Err
        m_signal.NSIG = signal.NSIG

        self.assertRaises(
            RuntimeError,
            self.loop.add_signal_handler,
            signal.SIGINT, lambda: True)
        self.assertFalse(m_logging.info.called)
        self.assertEqual(2, m_signal.set_wakeup_fd.call_count)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_remove_signal_handler_2(self, m_signal):
        m_signal.NSIG = signal.NSIG
        m_signal.SIGINT = signal.SIGINT

        self.loop.add_signal_handler(signal.SIGINT, lambda: True)
        self.loop._signal_handlers[signal.SIGHUP] = object()
        m_signal.set_wakeup_fd.reset_mock()

        self.assertTrue(
            self.loop.remove_signal_handler(signal.SIGINT))
        self.assertFalse(m_signal.set_wakeup_fd.called)
        self.assertTrue(m_signal.signal.called)
        self.assertEqual(
            (signal.SIGINT, m_signal.default_int_handler),
            m_signal.signal.call_args[0])
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_remove_signal_handler_cleanup_error(self, m_logging, m_signal):
        m_signal.NSIG = signal.NSIG
        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)

        m_signal.set_wakeup_fd.side_effect = ValueError

        self.loop.remove_signal_handler(signal.SIGHUP)
        self.assertTrue(m_logging.info)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_remove_signal_handler_error(self, m_signal):
        m_signal.NSIG = signal.NSIG
        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)

        m_signal.signal.side_effect = OSError

        self.assertRaises(
            OSError, self.loop.remove_signal_handler, signal.SIGHUP)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_remove_signal_handler_error2(self, m_signal):
        m_signal.NSIG = signal.NSIG
        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)

        class Err(OSError):
            errno = errno.EINVAL
        m_signal.signal.side_effect = Err

        self.assertRaises(
            RuntimeError, self.loop.remove_signal_handler, signal.SIGHUP)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_close(self, m_signal):
        m_signal.NSIG = signal.NSIG

        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
        self.loop.add_signal_handler(signal.SIGCHLD, lambda: True)

        self.assertEqual(len(self.loop._signal_handlers), 2)

        m_signal.set_wakeup_fd.reset_mock()

        self.loop.close()

        self.assertEqual(len(self.loop._signal_handlers), 0)
        m_signal.set_wakeup_fd.assert_called_once_with(-1)
项目:gipc    作者:jgehrcke    | 项目源码 | 文件源码
def _reset_signal_handlers():
    for s in _signals_to_reset:
        # On FreeBSD, the numerical value of SIGRT* is larger than NSIG
        # from signal.h (which is a bug in my opinion). Do not change
        # action for these signals. This prevents a ValueError raised
        # in the signal module.
        if s < signal.NSIG:
            signal.signal(s, signal.SIG_DFL)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def __init__(self, loop, signalnum, ref=True, priority=None):
        if signalnum < 1 or signalnum >= signalmodule.NSIG:
            raise ValueError('illegal signal number: %r' % signalnum)
        # still possible to crash on one of libev's asserts:
        # 1) "libev: ev_signal_start called with illegal signal number"
        #    EV_NSIG might be different from signal.NSIG on some platforms
        # 2) "libev: a signal must not be attached to two different loops"
        #    we probably could check that in LIBEV_EMBED mode, but not in general
        watcher.__init__(self, loop, ref=ref, priority=priority, args=(signalnum, ))
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def __init__(self, loop, signalnum, ref=True, priority=None):
        if signalnum < 1 or signalnum >= signalmodule.NSIG:
            raise ValueError('illegal signal number: %r' % signalnum)
        # still possible to crash on one of libev's asserts:
        # 1) "libev: ev_signal_start called with illegal signal number"
        #    EV_NSIG might be different from signal.NSIG on some platforms
        # 2) "libev: a signal must not be attached to two different loops"
        #    we probably could check that in LIBEV_EMBED mode, but not in general
        watcher.__init__(self, loop, ref=ref, priority=priority, args=(signalnum, ))
项目:golightan    作者:shirou    | 项目源码 | 文件源码
def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise ValueError if the signal number is invalid or uncatchable.
        Raise RuntimeError if there is a problem setting up the handler.
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise ValueError if the signal number is invalid or uncatchable.
        Raise RuntimeError if there is a problem setting up the handler.
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_check_signal(self):
        self.assertRaises(
            TypeError, self.loop._check_signal, '1')
        self.assertRaises(
            ValueError, self.loop._check_signal, signal.NSIG + 1)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_handle_signal_no_handler(self):
        self.loop._handle_signal(signal.NSIG + 1)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_add_signal_handler_setup_error(self, m_signal):
        m_signal.NSIG = signal.NSIG
        m_signal.set_wakeup_fd.side_effect = ValueError

        self.assertRaises(
            RuntimeError,
            self.loop.add_signal_handler,
            signal.SIGINT, lambda: True)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_add_signal_handler(self, m_signal):
        m_signal.NSIG = signal.NSIG

        cb = lambda: True
        self.loop.add_signal_handler(signal.SIGHUP, cb)
        h = self.loop._signal_handlers.get(signal.SIGHUP)
        self.assertIsInstance(h, asyncio.Handle)
        self.assertEqual(h._callback, cb)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_add_signal_handler_install_error2(self, m_logging, m_signal):
        m_signal.NSIG = signal.NSIG

        class Err(OSError):
            errno = errno.EINVAL
        m_signal.signal.side_effect = Err

        self.loop._signal_handlers[signal.SIGHUP] = lambda: True
        self.assertRaises(
            RuntimeError,
            self.loop.add_signal_handler,
            signal.SIGINT, lambda: True)
        self.assertFalse(m_logging.info.called)
        self.assertEqual(1, m_signal.set_wakeup_fd.call_count)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_remove_signal_handler(self, m_signal):
        m_signal.NSIG = signal.NSIG

        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)

        self.assertTrue(
            self.loop.remove_signal_handler(signal.SIGHUP))
        self.assertTrue(m_signal.set_wakeup_fd.called)
        self.assertTrue(m_signal.signal.called)
        self.assertEqual(
            (signal.SIGHUP, m_signal.SIG_DFL), m_signal.signal.call_args[0])
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_remove_signal_handler_2(self, m_signal):
        m_signal.NSIG = signal.NSIG
        m_signal.SIGINT = signal.SIGINT

        self.loop.add_signal_handler(signal.SIGINT, lambda: True)
        self.loop._signal_handlers[signal.SIGHUP] = object()
        m_signal.set_wakeup_fd.reset_mock()

        self.assertTrue(
            self.loop.remove_signal_handler(signal.SIGINT))
        self.assertFalse(m_signal.set_wakeup_fd.called)
        self.assertTrue(m_signal.signal.called)
        self.assertEqual(
            (signal.SIGINT, m_signal.default_int_handler),
            m_signal.signal.call_args[0])
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_remove_signal_handler_cleanup_error(self, m_logging, m_signal):
        m_signal.NSIG = signal.NSIG
        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)

        m_signal.set_wakeup_fd.side_effect = ValueError

        self.loop.remove_signal_handler(signal.SIGHUP)
        self.assertTrue(m_logging.info)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_remove_signal_handler_error(self, m_signal):
        m_signal.NSIG = signal.NSIG
        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)

        m_signal.signal.side_effect = OSError

        self.assertRaises(
            OSError, self.loop.remove_signal_handler, signal.SIGHUP)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_remove_signal_handler_error2(self, m_signal):
        m_signal.NSIG = signal.NSIG
        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)

        class Err(OSError):
            errno = errno.EINVAL
        m_signal.signal.side_effect = Err

        self.assertRaises(
            RuntimeError, self.loop.remove_signal_handler, signal.SIGHUP)
项目:swjtu-oj-insite    作者:swjtuacmer    | 项目源码 | 文件源码
def strsignal_ctypes_wrapper(signo):
        # The behavior of the C library strsignal() is unspecified if
        # called with an out-of-range argument.  Range-check on entry
        # _and_ NULL-check on exit.
        if 0 <= signo < NSIG:
            s = strsignal_c(signo)
            if s:
                return s.decode("utf-8")
        return "Unknown signal %d" % signo
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _resetSignalDisposition(self):
        # The Python interpreter ignores some signals, and our child
        # process will inherit that behaviour. To have a child process
        # that responds to signals normally, we need to reset our
        # child process's signal handling (just) after we fork and
        # before we execvpe.
        for signalnum in xrange(1, signal.NSIG):
            if signal.getsignal(signalnum) == signal.SIG_IGN:
                # Reset signal handling to the default
                signal.signal(signalnum, signal.SIG_DFL)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self, loop, signalnum, ref=True, priority=None):
        if signalnum < 1 or signalnum >= signalmodule.NSIG:
            raise ValueError('illegal signal number: %r' % signalnum)
        # still possible to crash on one of libev's asserts:
        # 1) "libev: ev_signal_start called with illegal signal number"
        #    EV_NSIG might be different from signal.NSIG on some platforms
        # 2) "libev: a signal must not be attached to two different loops"
        #    we probably could check that in LIBEV_EMBED mode, but not in general
        watcher.__init__(self, loop, ref=ref, priority=priority, args=(signalnum, ))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise ValueError if the signal number is invalid or uncatchable.
        Raise RuntimeError if there is a problem setting up the handler.
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_check_signal(self):
        self.assertRaises(
            TypeError, self.loop._check_signal, '1')
        self.assertRaises(
            ValueError, self.loop._check_signal, signal.NSIG + 1)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_handle_signal_no_handler(self):
        self.loop._handle_signal(signal.NSIG + 1)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_add_signal_handler_setup_error(self, m_signal):
        m_signal.NSIG = signal.NSIG
        m_signal.set_wakeup_fd.side_effect = ValueError

        self.assertRaises(
            RuntimeError,
            self.loop.add_signal_handler,
            signal.SIGINT, lambda: True)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_add_signal_handler(self, m_signal):
        m_signal.NSIG = signal.NSIG

        cb = lambda: True
        self.loop.add_signal_handler(signal.SIGHUP, cb)
        h = self.loop._signal_handlers.get(signal.SIGHUP)
        self.assertIsInstance(h, asyncio.Handle)
        self.assertEqual(h._callback, cb)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_add_signal_handler_install_error2(self, m_logging, m_signal):
        m_signal.NSIG = signal.NSIG

        class Err(OSError):
            errno = errno.EINVAL
        m_signal.signal.side_effect = Err

        self.loop._signal_handlers[signal.SIGHUP] = lambda: True
        self.assertRaises(
            RuntimeError,
            self.loop.add_signal_handler,
            signal.SIGINT, lambda: True)
        self.assertFalse(m_logging.info.called)
        self.assertEqual(1, m_signal.set_wakeup_fd.call_count)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_add_signal_handler_install_error3(self, m_logging, m_signal):
        class Err(OSError):
            errno = errno.EINVAL
        m_signal.signal.side_effect = Err
        m_signal.NSIG = signal.NSIG

        self.assertRaises(
            RuntimeError,
            self.loop.add_signal_handler,
            signal.SIGINT, lambda: True)
        self.assertFalse(m_logging.info.called)
        self.assertEqual(2, m_signal.set_wakeup_fd.call_count)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_remove_signal_handler_2(self, m_signal):
        m_signal.NSIG = signal.NSIG
        m_signal.SIGINT = signal.SIGINT

        self.loop.add_signal_handler(signal.SIGINT, lambda: True)
        self.loop._signal_handlers[signal.SIGHUP] = object()
        m_signal.set_wakeup_fd.reset_mock()

        self.assertTrue(
            self.loop.remove_signal_handler(signal.SIGINT))
        self.assertFalse(m_signal.set_wakeup_fd.called)
        self.assertTrue(m_signal.signal.called)
        self.assertEqual(
            (signal.SIGINT, m_signal.default_int_handler),
            m_signal.signal.call_args[0])
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_remove_signal_handler_cleanup_error(self, m_logging, m_signal):
        m_signal.NSIG = signal.NSIG
        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)

        m_signal.set_wakeup_fd.side_effect = ValueError

        self.loop.remove_signal_handler(signal.SIGHUP)
        self.assertTrue(m_logging.info)