我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用signal.CTRL_C_EVENT。
def test_err_in_fun(self): # Test that the original signal this process was hit with # is not returned in case fun raise an exception. Instead, # we're supposed to see retsig = 1. ret = pyrun(textwrap.dedent( """ import os, signal, imp, sys mod = imp.load_source("mod", r"{}") def foo(): sys.stderr = os.devnull 1 / 0 sig = signal.SIGTERM if os.name == 'posix' else \ signal.CTRL_C_EVENT mod.register_exit_fun(foo) os.kill(os.getpid(), sig) """.format(os.path.abspath(__file__), TESTFN) )) if POSIX: self.assertEqual(ret, 1) assert ret != signal.SIGTERM, strfsig(ret)
def run_app(please_stop, server_is_ready): proc = subprocess.Popen( ["python", "active_data\\app.py", "--settings", "tests/config/elasticsearch.json"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=-1 #creationflags=CREATE_NEW_PROCESS_GROUP ) while not please_stop: line = proc.stdout.readline() if not line: continue if line.find(" * Running on") >= 0: server_is_ready.go() Log.note("SERVER: {{line}}", {"line": line.strip()}) proc.send_signal(signal.CTRL_C_EVENT) # read_alternate_settings
def kill(self, sig): '''Sends a Unix signal to the subprocess. Use constants from the :mod:`signal` module to specify which signal. ''' if sys.platform == 'win32': if sig in [signal.SIGINT, signal.CTRL_C_EVENT]: sig = signal.CTRL_C_EVENT elif sig in [signal.SIGBREAK, signal.CTRL_BREAK_EVENT]: sig = signal.CTRL_BREAK_EVENT else: sig = signal.SIGTERM os.kill(self.proc.pid, sig)
def test_CTRL_C_EVENT(self): from ctypes import wintypes import ctypes # Make a NULL value by creating a pointer with no argument. NULL = ctypes.POINTER(ctypes.c_int)() SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int), wintypes.BOOL) SetConsoleCtrlHandler.restype = wintypes.BOOL # Calling this with NULL and FALSE causes the calling process to # handle CTRL+C, rather than ignore it. This property is inherited # by subprocesses. SetConsoleCtrlHandler(NULL, 0) self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
def raise_sigint(): """ Raising the SIGINT signal in the current process and all sub-processes. os.kill() only issues a signal in the current process (without subprocesses). CTRL+C on the console sends the signal to the process group (which we need). """ if hasattr(signal, 'CTRL_C_EVENT'): # windows. Need CTRL_C_EVENT to raise the signal in the whole process group os.kill(os.getpid(), signal.CTRL_C_EVENT) else: # unix. pgid = os.getpgid(os.getpid()) if pgid == 1: os.kill(os.getpid(), signal.SIGINT) else: os.killpg(os.getpgid(os.getpid()), signal.SIGINT)
def test_CTRL_C_EVENT(self): from ctypes import wintypes import ctypes # Make a NULL value by creating a pointer with no argument. NULL = ctypes.POINTER(ctypes.c_int)() SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int), wintypes.BOOL) SetConsoleCtrlHandler.restype = wintypes.BOOL # Calling this with NULL and FALSE causes the calling process to # handle Ctrl+C, rather than ignore it. This property is inherited # by subprocesses. SetConsoleCtrlHandler(NULL, 0) self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
def _default_handler(signum, *args): ''' The default signal handler. Don't register with built-in signal.signal! This needs to be used on the subprocess await death workaround. ''' # All valid cpython windows signals sigs = { signal.SIGABRT: SIGABRT, # signal.SIGFPE: 'fpe', # Don't catch this # signal.SIGSEGV: 'segv', # Don't catch this # signal.SIGILL: 'illegal', # Don't catch this signal.SIGINT: SIGINT, signal.SIGTERM: SIGTERM, # Note that signal.CTRL_C_EVENT and signal.CTRL_BREAK_EVENT are # converted to SIGINT in _await_signal } try: exc = sigs[signum] except KeyError: exc = DaemonikerSignal _sketch_raise_in_main(exc)
def send_signal(self, sig): """Send a signal to the process """ if sig == signal.SIGTERM: self.terminate() elif sig == signal.CTRL_C_EVENT: os.kill(self.pid, signal.CTRL_C_EVENT) elif sig == signal.CTRL_BREAK_EVENT: os.kill(self.pid, signal.CTRL_BREAK_EVENT) else: raise ValueError("Unsupported signal: {}".format(sig))
def _ctrl_handler(sig): """Handle a sig event and return 0 to terminate the process""" if sig == signal.CTRL_C_EVENT: pass elif sig == signal.CTRL_BREAK_EVENT: pass else: print("UNKNOWN EVENT") return 0
def test_ctrl_signals(self): p = psutil.Process(get_test_subprocess().pid) p.send_signal(signal.CTRL_C_EVENT) p.send_signal(signal.CTRL_BREAK_EVENT) p.kill() p.wait() self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.CTRL_C_EVENT) self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.CTRL_BREAK_EVENT)
def pipe_server(): ''' Part of attach/set_attach for Windows ''' while 1: pipe = Pipe('vdb_%d' % os.getpid(), server=True) knock = pipe.read(3) if knock == 'vsi': os.kill(0, signal.CTRL_C_EVENT) #ctypes.windll.kernel32.GenerateConsoleCtrlEvent(0, os.getpid()) pipe.disconnect() pipe.close()
def kill_node(self, id): if self.is_running(id): self._pnodes[node.id].send_signal(signal.CTRL_C_EVENT)
def quit(): mypid = os.getpid() for pid in sorted(psutil.pids(), reverse=True): if pid == mypid: continue try: p = psutil.Process(pid) if p.exe() == sys.executable: p.send_signal(signal.CTRL_C_EVENT) except: pass
def test_signal_waiting(self): ''' Fixture thine self. ''' proc1 = ProcFixture(signal.SIGINT) proc2 = ProcFixture(signal.SIGTERM) proc3 = ProcFixture(signal.SIGABRT) proc4 = ProcFixture(signal.CTRL_C_EVENT) proc5 = ProcFixture(signal.CTRL_BREAK_EVENT) self.assertEqual(_await_signal(proc1), signal.SIGINT) self.assertEqual(_await_signal(proc2), signal.SIGTERM) self.assertEqual(_await_signal(proc3), signal.SIGABRT) self.assertEqual(_await_signal(proc4), signal.SIGINT) self.assertEqual(_await_signal(proc5), signal.SIGINT)
def _await_signal(process): ''' Waits for the process to die, and then returns the exit code for the process, converting CTRL_C_EVENT and CTRL_BREAK_EVENT into SIGINT. ''' # Note that this is implemented with a busy wait process.wait() code = process.returncode if code == signal.CTRL_C_EVENT: code = signal.SIGINT elif code == signal.CTRL_BREAK_EVENT: code = signal.SIGINT return code
def send_signal(self, sig): """Send a signal to the process.""" # Don't signal a process that we know has already died. if self.returncode is not None: return if sig == signal.SIGTERM: self.terminate() elif sig == signal.CTRL_C_EVENT: os.kill(self.pid, signal.CTRL_C_EVENT) elif sig == signal.CTRL_BREAK_EVENT: os.kill(self.pid, signal.CTRL_BREAK_EVENT) else: raise ValueError("Unsupported signal: {}".format(sig))