我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 signal.SIGCHLD。
def set_exit_callback(self, callback):
    """Register ``callback`` to be run when this process exits.

    ``callback`` receives a single argument: the return code of the
    process.

    Exit notification relies on a ``SIGCHLD`` handler.  That handler is a
    process-global setting, so it may conflict with other libraries that
    try to handle the same signal.  When more than one ``IOLoop`` is in
    use, it may be necessary to call `Subprocess.initialize` first to
    designate the ``IOLoop`` that runs the signal handlers.

    If the signal handler causes problems, a close callback on the stdout
    or stderr streams can often be used instead of an exit callback.
    """
    self._exit_callback = stack_context.wrap(callback)
    # Make sure the SIGCHLD handler is installed before registering.
    Subprocess.initialize(self.io_loop)
    child_pid = self.pid
    Subprocess._waiting[child_pid] = self
    # NOTE(review): presumably covers a child that already exited before
    # the callback was registered -- confirm against _try_cleanup_process.
    Subprocess._try_cleanup_process(child_pid)
def initialize(cls, io_loop=None):
    """Install the ``SIGCHLD`` handler; does nothing if already installed.

    The handler itself only schedules ``cls._cleanup`` on an `.IOLoop`,
    which avoids locking issues.  The `.IOLoop` used for signal handling
    need not be the same one used by individual Subprocess objects (as
    long as the ``IOLoops`` are each running in separate threads).

    The previously installed handler is remembered in
    ``cls._old_sigchld``.

    .. versionchanged:: 4.1
       The ``io_loop`` argument is deprecated.
    """
    if cls._initialized:
        return

    loop = ioloop.IOLoop.current() if io_loop is None else io_loop

    def _on_sigchld(sig, frame):
        # Only hand the work over to the loop from inside the handler.
        return loop.add_callback_from_signal(cls._cleanup)

    cls._old_sigchld = signal.signal(signal.SIGCHLD, _on_sigchld)
    cls._initialized = True
def __init__(self, master=False):
    """Set up signal plumbing for this object.

    On POSIX a non-blocking pipe is created and its write end registered
    with ``signal.set_wakeup_fd``.  ``SIGINT`` is reset to its default
    action, and ``self._signal_catcher`` is installed for the
    platform-specific set of signals below.  Received signals are meant
    to be queued in ``self._signals_received``.

    ``master`` is accepted but not used by this method.
    """
    on_posix = os.name == 'posix'

    # Setup signal fd, this allows signals to behave correctly.
    if on_posix:
        read_end, write_end = os.pipe()
        self.signal_pipe_r = read_end
        self.signal_pipe_w = write_end
        for pipe_fd in (self.signal_pipe_r, self.signal_pipe_w):
            self._set_nonblock(pipe_fd)
        signal.set_wakeup_fd(self.signal_pipe_w)

    self._signals_received = collections.deque()

    signal.signal(signal.SIGINT, signal.SIG_DFL)
    if on_posix:
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)
        for caught in (signal.SIGTERM, signal.SIGALRM, signal.SIGHUP):
            signal.signal(caught, self._signal_catcher)
    else:
        # Currently a no-op on Windows.
        signal.signal(signal.SIGTERM, self._signal_catcher)
        # FIXME(sileht): should allow catching CTRL_BREAK_EVENT, but the
        # child process has to be created with CREATE_NEW_PROCESS_GROUP
        # for that to work, so for now this is a no-op pending a later fix.
        signal.signal(signal.SIGBREAK, self._signal_catcher)
def getsignal(signalnum):
    """
    Exactly the same as :func:`signal.getsignal` except where
    :const:`signal.SIGCHLD` is concerned.

    For :const:`signal.SIGCHLD`, this cooperates with :func:`signal`
    to provide consistent answers: the module-level ``_child_handler`` is
    returned, after being filled in from the underlying ``getsignal`` the
    first time it is needed (while it still holds ``_INITIAL``).
    """
    global _child_handler

    if signalnum == _signal.SIGCHLD:
        if _child_handler is _INITIAL:
            # First query: seed the cache from the real signal module.
            _child_handler = _signal_getsignal(_signal.SIGCHLD)
        return _child_handler

    return _signal_getsignal(signalnum)
def attach_loop(self, loop):
    """Move this watcher's ``SIGCHLD`` handling onto ``loop``.

    ``loop`` must be ``None`` or an ``events.AbstractEventLoop``.  The
    handler is removed from the currently attached loop (if any) and, when
    ``loop`` is not ``None``, ``self._sig_chld`` is registered on it.

    A ``RuntimeWarning`` is issued when a loop is detached (``loop`` is
    ``None``) while ``self._callbacks`` is still non-empty.
    """
    assert loop is None or isinstance(loop, events.AbstractEventLoop)

    previous = self._loop
    if previous is not None:
        if loop is None and self._callbacks:
            warnings.warn(
                'A loop is being detached '
                'from a child watcher with pending handlers',
                RuntimeWarning)
        previous.remove_signal_handler(signal.SIGCHLD)

    self._loop = loop
    if loop is None:
        return

    loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
    # Prevent a race condition in case a child terminated
    # during the switch.
    self._do_waitpid_all()
def initialize(cls, io_loop=None):
    """Initializes the ``SIGCHLD`` handler.

    Calling this again once ``cls._initialized`` is set is a no-op.

    The signal handler only schedules ``cls._cleanup`` on an `.IOLoop`
    (the current one when ``io_loop`` is not given), which avoids locking
    issues.  The `.IOLoop` used for signal handling need not be the same
    one used by individual Subprocess objects (as long as the ``IOLoops``
    are each running in separate threads).

    The handler that was installed before is kept in ``cls._old_sigchld``.
    """
    if not cls._initialized:
        target_loop = io_loop
        if target_loop is None:
            target_loop = ioloop.IOLoop.current()
        previous_handler = signal.signal(
            signal.SIGCHLD,
            lambda sig, frame: target_loop.add_callback_from_signal(
                cls._cleanup))
        cls._old_sigchld = previous_handler
        cls._initialized = True
def main ():
    """Ad-hoc experiment: fork a child on a pty, SIGKILL it, and report.

    Installs ``signal_handler`` for SIGCHLD, forks with ``pty.fork()``,
    lets the child write a short message and sleep, then has the parent
    kill the child, probe it with signal 0, and finally read whatever is
    available on the master side of the pty.

    This is Python 2 code (print statements).
    NOTE(review): block structure was reconstructed from a
    whitespace-collapsed source -- confirm the nesting.
    """
    signal.signal (signal.SIGCHLD, signal_handler)
    pid, fd = pty.fork()
    if pid == 0:
        # Child: emit some output, then stay alive until it is killed.
        os.write (sys.stdout.fileno(), 'This is a test.\nThis is a test.')
        time.sleep(10000)
        # NOTE(review): nothing stops the child from continuing into the
        # code below once the sleep returns; presumably it is always
        # killed first -- confirm.
    nonblock (fd)
    tty.setraw(fd) # originally noted: STDIN_FILENO
    print 'Sending SIGKILL to child pid:', pid
    time.sleep(2)
    os.kill (pid, signal.SIGKILL)

    print 'Entering to sleep...'
    try:
        time.sleep(2)
    except:
        # Deliberately broad: any interruption of the sleep is reported.
        print 'Sleep interrupted'
    try:
        # Signal 0 only probes whether the pid can still be signalled.
        os.kill(pid, 0)
        print '\tChild is alive. This is ambiguous because it may be a Zombie.'
    except OSError as e:
        print '\tChild appears to be dead.'
        # (debug aid, disabled) print str(e)
    print
    print 'Reading from master fd:', os.read (fd, 1000)
def setUp(self):
    """
    Save the current SIGCHLD handler as reported by L{signal.getsignal}
    and the current file descriptor registered with L{installHandler}.

    A non-default handler is stored on C{self.signalModuleHandler} and
    SIGCHLD is reset to C{signal.SIG_DFL}; otherwise C{None} is stored.
    The previous descriptor is stored on C{self.oldFD}.  If both a
    handler and a descriptor were found, a message is logged.
    """
    current = signal.getsignal(signal.SIGCHLD)
    customised = current != signal.SIG_DFL
    self.signalModuleHandler = current if customised else None
    if customised:
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)

    # Passing -1 returns the previously installed descriptor.
    # NOTE(review): presumably -1 also unregisters it -- confirm against
    # installHandler.
    self.oldFD = installHandler(-1)

    if self.signalModuleHandler is None or self.oldFD == -1:
        return
    msg("Previous test didn't clean up after its SIGCHLD setup: %r %r"
        % (self.signalModuleHandler, self.oldFD))
def save_signal_handlers():
    """Return a dict of the current SIGINT, SIGTERM and SIGCHLD handlers.

    Raises ``Exception`` if any saved handler's repr mentions "twisted"
    while the configured IOLoop class is not a ``TwistedIOLoop`` subclass.
    """
    watched = (signal.SIGINT, signal.SIGTERM, signal.SIGCHLD)
    saved = {signum: signal.getsignal(signum) for signum in watched}

    # When the global ioloop is twisted, we expect the signal handlers to
    # be installed.  Otherwise, it means we're not cleaning up after
    # twisted properly.
    if ("twisted" in repr(saved)
            and not issubclass(IOLoop.configured_class(), TwistedIOLoop)):
        raise Exception("twisted signal handlers already installed")

    return saved
def uninitialize(cls):
    """Removes the ``SIGCHLD`` handler.

    Restores the handler saved in ``cls._old_sigchld`` and clears
    ``cls._initialized``.  Does nothing when the handler is not installed.
    """
    if cls._initialized:
        signal.signal(signal.SIGCHLD, cls._old_sigchld)
        cls._initialized = False
def _analyze(self):
    """Analyze the signal in ``self.signum`` and return ``self.error``.

    SIGSEGV, SIGFPE and SIGCHLD are delegated to ``memoryFault``,
    ``mathError`` and ``childExit`` respectively; SIGABRT stores an
    ``Abort()`` instance in ``self.error``.  Any other signal leaves
    ``self.error`` untouched.
    """
    signum = self.signum
    if signum == SIGABRT:
        self.error = Abort()
    else:
        # NOTE(review): the delegated methods presumably populate
        # self.error themselves -- confirm.
        dispatch = (
            (SIGSEGV, "memoryFault"),
            (SIGFPE, "mathError"),
            (SIGCHLD, "childExit"),
        )
        for candidate, method_name in dispatch:
            if signum == candidate:
                getattr(self, method_name)()
                break
    return self.error
def attach_loop(self, loop):
    """Move this watcher's ``SIGCHLD`` handling onto ``loop``.

    ``loop`` must be ``None`` or an ``events.AbstractEventLoop``.  The
    handler is removed from the currently attached loop (if any); when
    ``loop`` is not ``None``, ``self._sig_chld`` is registered on it.
    """
    assert loop is None or isinstance(loop, events.AbstractEventLoop)

    detached = self._loop
    if detached is not None:
        detached.remove_signal_handler(signal.SIGCHLD)

    self._loop = loop
    if loop is None:
        return

    loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
    # Prevent a race condition in case a child terminated
    # during the switch.
    self._do_waitpid_all()
def _sig_chld(self): try: self._do_waitpid_all() except Exception as exc: # self._loop should always be available here # as '_sig_chld' is added as a signal handler # in 'attach_loop' self._loop.call_exception_handler({ 'message': 'Unknown exception in SIGCHLD handler', 'exception': exc, })
def test_close(self, m_signal):
    """Closing the loop drops its signal handlers and resets the wakeup fd.

    ``m_signal`` is presumably a mock of the ``signal`` module as seen by
    the loop implementation -- confirm against the decorator on this test.
    """
    m_signal.NSIG = signal.NSIG

    for signum in (signal.SIGHUP, signal.SIGCHLD):
        self.loop.add_signal_handler(signum, lambda: True)
    self.assertEqual(len(self.loop._signal_handlers), 2)

    # Forget earlier set_wakeup_fd calls so that the final assertion only
    # sees calls made from here on.
    m_signal.set_wakeup_fd.reset_mock()

    self.loop.close()

    self.assertEqual(len(self.loop._signal_handlers), 0)
    m_signal.set_wakeup_fd.assert_called_once_with(-1)
def test_create_watcher(self):
    """The watcher's ``_sig_chld`` was registered for SIGCHLD exactly once."""
    registrar = self.m_add_signal_handler
    registrar.assert_called_once_with(
        signal.SIGCHLD, self.watcher._sig_chld)
def test_set_loop(self, m):
    """Attaching a new loop moves the SIGCHLD handler and keeps callbacks.

    A handler registered for pid 60 before the switch must still fire,
    with ``(60, 9)``, once the child is reported after the switch.
    """
    # Register a handler for child pid 60.
    on_exit = mock.Mock()
    with self.watcher:
        self.running = True
        self.watcher.add_child_handler(60, on_exit)

    # Swap in a fresh test loop, keeping a reference to the previous one.
    previous_loop = self.loop
    self.loop = self.new_test_loop()

    with mock.patch.object(
            previous_loop, "remove_signal_handler") as m_old_remove, \
            mock.patch.object(
                self.loop, "add_signal_handler") as m_new_add:
        self.watcher.attach_loop(self.loop)

        m_old_remove.assert_called_once_with(signal.SIGCHLD)
        m_new_add.assert_called_once_with(
            signal.SIGCHLD, self.watcher._sig_chld)

    # The child terminates after the switch.
    self.running = False
    self.add_zombie(60, 9)
    self.watcher._sig_chld()

    on_exit.assert_called_once_with(60, 9)
def test_close(self, m):
    """Closing the watcher removes the SIGCHLD handler and clears state.

    After ``close()``, ``_callbacks`` must be empty and, for a
    ``FastChildWatcher``, ``_zombies`` as well.

    NOTE(review): the nesting of the ``with`` blocks was reconstructed
    from a whitespace-collapsed source -- confirm against the original.
    """
    # one shared callback for both registered child handlers
    callback1 = mock.Mock()

    with self.watcher:
        self.running = True
        # child 1 terminates
        self.add_zombie(63, 9)
        # other child terminates
        self.add_zombie(65, 18)
        self.watcher._sig_chld()

        # handlers are registered for pids 63 and 64 (65 gets none)
        self.watcher.add_child_handler(63, callback1)
        self.watcher.add_child_handler(64, callback1)

        # exactly one callback is expected to remain pending
        self.assertEqual(len(self.watcher._callbacks), 1)
        if isinstance(self.watcher, asyncio.FastChildWatcher):
            self.assertEqual(len(self.watcher._zombies), 1)

        with mock.patch.object(
                self.loop,
                "remove_signal_handler") as m_remove_signal_handler:

            self.watcher.close()

            m_remove_signal_handler.assert_called_once_with(
                signal.SIGCHLD)
            self.assertFalse(self.watcher._callbacks)
            if isinstance(self.watcher, asyncio.FastChildWatcher):
                self.assertFalse(self.watcher._zombies)
def setupCommands():
    """Install the SIGCHLD handler once, to get rid of zombie processes.

    Later calls are no-ops: the module-level flag
    ``__sigchld_handler_installed`` records that the handler is in place.
    """
    global __sigchld_handler_installed

    if __sigchld_handler_installed:
        return

    signal.signal(signal.SIGCHLD, __sigchld_handler)
    __sigchld_handler_installed = True