我们从Python开源项目中,提取了以下36个代码示例,用于说明如何使用daemon.DaemonContext()。
def run_daemon(args): ctx = daemon.DaemonContext( working_directory=args.working_directory, pidfile=lockfile.LockFile(args.pidfile), stdout=open_io_redirect_file(args.stdout), stderr=open_io_redirect_file(args.stderr), ) if args.command == "status": sys,exit(1 if ctx.is_open else 0) elif args.command == "stop": ctx.close() elif args.command == "start": with ctx: PyPush.web.main.main( False, # I hereby force debug-less mode on the daemon. args.host, args.port, args.db_uri, args.ble_driver, args.ble_device ) else: raise Exception("Unexpected daemon command {!r}".format(args.command))
def kerberos(args): # noqa print(settings.HEADER) import airflow.security.kerberos if args.daemon: pid, stdout, stderr, log_file = setup_locations("kerberos", args.pid, args.stdout, args.stderr, args.log_file) stdout = open(stdout, 'w+') stderr = open(stderr, 'w+') ctx = daemon.DaemonContext( pidfile=TimeoutPIDLockFile(pid, -1), stdout=stdout, stderr=stderr, ) with ctx: airflow.security.kerberos.run() stdout.close() stderr.close() else: airflow.security.kerberos.run()
def main(): args = get_options() config = ConfigParser.ConfigParser() config.read(args.conffile) if config.has_option('default', 'pidfile'): pid_fn = os.path.expanduser(config.get('default', 'pidfile')) else: pid_fn = '/var/run/germqtt.pid' if args.foreground: _main(args, config) else: pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10) with daemon.DaemonContext(pidfile=pid): _main(args, config)
def start(args, cfg, pidfile): # Ensure the directory is there to keep the PID file try: os.makedirs(os.path.join(cfg.getRootDirectory(), 'var', 'run')) except OSError: pass if os.path.isfile(pidfile): pid = lockfile.pidlockfile.read_pid_from_pidfile(pidfile) err('PID file %s already exists (pid=%d), not overwriting possibly existing instance' % (pidfile, pid,)) return 1 # Go, go, go! with daemon.DaemonContext(pidfile=lockfile.pidlockfile.PIDLockFile(pidfile, timeout=1)): ngamsSrv = ngamsServer.ngamsServer() ngamsSrv.init(args) return 0
def main(): parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, description='A cmdline tool to enhance PL9K theme for ZSH\n') subparsers = parser.add_subparsers(help='Use {subcommand} -h for each subcommand\'s optional arguments details', dest='command') subparsers.add_parser('init', help='Init the settings in `~/.zshrc` file') subparsers.add_parser('polling', help='Start the polling daemon process') display_parser = subparsers.add_parser('display', help='Print the corresponding info on the terminal') display_parser.add_argument('widget', help='The widget to display, e.g. weather') args = parser.parse_args() if args.command == 'polling': with daemon.DaemonContext(pidfile=daemon.pidfile.PIDLockFile(constants.PID_PATH)): polling() elif args.command == 'display': widgets = sys.modules['thunderbolt100k.widgets'] assert hasattr(widgets, args.widget), 'There is no widget called {0}'.format(args.widget) assert hasattr(getattr(widgets, args.widget), 'display'), 'The widget {0} must contains a `display` method'.format(args.widget) result = getattr(widgets, args.widget).display() if result: print(result) elif args.command == 'init': init_zshrc()
def main(): args = parse_args() config = yaml.load(open(args.config, 'r').read()) if not args.foreground: fh, logger = setup_logger(config.get('logfile', args.logfile)) try: with daemon.DaemonContext( files_preserve=[fh.stream, sys.stdout], pidfile=pid.PidFile('arwn', args.piddir)): logger.debug("Starting arwn in daemon mode") event_loop(config) except Exception: logger.exception("Something went wrong!") else: fh, logger = setup_logger() logger.debug("Starting arwn in foreground") event_loop(config)
def handle(name, tag, delay, aid, origin, elnk): delay_value = 10 aid_value = None origin_value = None # # print "Name: {0}".format(name) # # print "tag: {0}".format(tag) stamp = str(datetime.datetime.now()) if delay: delay_value = int(delay) if name and tag and api and elnk and clnk: clnk_module = core.extend_load(clnk) elnk_module = core.extend_load(elnk) api_module = core.extend_load(api) task = CoRRTask(name=name, tag=tag, clnk_module=clnk_module, api_module=api_module, elnk_module=elnk_module) # task.run() try: # # print "Loading watcher: {0}".format(task.tag) with daemon.DaemonContext(): task.run() except: pass
def start(self): if 'DAEMON' in self.config: if self.config.get('DAEMON'): import daemon with daemon.DaemonContext(): self._start() self._start()
def open(self): self._addLoggerFiles() daemon.DaemonContext.open(self) if self.stdout_logger: fileLikeObj = FileLikeLogger(self.stdout_logger) sys.stdout = fileLikeObj if self.stderr_logger: fileLikeObj = FileLikeLogger(self.stderr_logger) sys.stderr = fileLikeObj #---------------------------------------------------------------
def _exec(self, detach=True): """ daemonize and exec main() """ kwargs = { 'pidfile': self.pidfile, 'working_directory': self.home_dir, } # FIXME - doesn't work if not detach: kwargs.update({ 'detach_process': False, 'files_preserve': [0,1,2], 'stdout': sys.stdout, 'stderr': sys.stderr, }) ctx = daemon.DaemonContext(**kwargs) with ctx: self._main()
def launch(): """ Launch a polkit authentication agent as a daemon. """ with daemon.DaemonContext(): _launch_agent()
def scheduler(args): print(settings.HEADER) job = jobs.SchedulerJob( dag_id=args.dag_id, subdir=process_subdir(args.subdir), run_duration=args.run_duration, num_runs=args.num_runs, do_pickle=args.do_pickle) if args.daemon: pid, stdout, stderr, log_file = setup_locations("scheduler", args.pid, args.stdout, args.stderr, args.log_file) handle = setup_logging(log_file) stdout = open(stdout, 'w+') stderr = open(stderr, 'w+') ctx = daemon.DaemonContext( pidfile=TimeoutPIDLockFile(pid, -1), files_preserve=[handle], stdout=stdout, stderr=stderr, ) with ctx: job.run() stdout.close() stderr.close() else: signal.signal(signal.SIGINT, sigint_handler) signal.signal(signal.SIGTERM, sigint_handler) signal.signal(signal.SIGQUIT, sigquit_handler) job.run()
def flower(args): broka = conf.get('celery', 'BROKER_URL') address = '--address={}'.format(args.hostname) port = '--port={}'.format(args.port) api = '' if args.broker_api: api = '--broker_api=' + args.broker_api flower_conf = '' if args.flower_conf: flower_conf = '--conf=' + args.flower_conf if args.daemon: pid, stdout, stderr, log_file = setup_locations("flower", args.pid, args.stdout, args.stderr, args.log_file) stdout = open(stdout, 'w+') stderr = open(stderr, 'w+') ctx = daemon.DaemonContext( pidfile=TimeoutPIDLockFile(pid, -1), stdout=stdout, stderr=stderr, ) with ctx: os.execvp("flower", ['flower', '-b', broka, address, port, api, flower_conf]) stdout.close() stderr.close() else: signal.signal(signal.SIGINT, sigint_handler) signal.signal(signal.SIGTERM, sigint_handler) os.execvp("flower", ['flower', '-b', broka, address, port, api, flower_conf])
def execute(self, *args, **options): if options['daemon']: pid_file = options['pid-file'] if os.path.exists(pid_file + ".lock") or os.path.exists(pid_file): try: pid = int(open(pid_file).read()) os.kill(pid, 0) except (ValueError, OSError, IOError): # Not running, delete stale PID file sys.stderr.write("Removing stale PID file\n") import errno try: os.remove(pid_file) except OSError, e: if e.errno != errno.ENOENT: raise e try: os.remove(pid_file + ".lock") except OSError, e: if e.errno != errno.ENOENT: raise e else: # Running, we should refuse to run raise RuntimeError("Daemon is already running (PID %s)" % pid) with DaemonContext(pidfile = PIDLockFile(pid_file)): self._execute_inner(*args, **options) else: self._execute_inner(*args, **options)
def daemonize(args, callback): with DaemonContext(): from Pyflix.utils.logger import log_set_up log_set_up(True) log = logging.getLogger('pyflix.daemon') log.info("running daemon") create_process = False lock = Lock(LOCKFILE, os.getpid(), args.name, args.sea_ep[0], args.sea_ep[1], args.port) if lock.is_locked(): log.debug("lock active") lock_pid = lock.get_pid() if not lock.is_same_file(args.name, args.sea_ep[0], args.sea_ep[1]) \ or not is_process_running(lock_pid): try: log.debug("killing process %s" % lock_pid) os.kill(lock_pid, signal.SIGQUIT) except OSError: pass except TypeError: pass lock.break_lock() create_process = True else: create_process = True if create_process: log.debug("creating proccess") lock.acquire() callback() lock.release() else: log.debug("same daemon process")
def daemonize(logfile, pidfile): needl.log.info('Daemonizing and logging to %s', logfile) with daemon.DaemonContext(working_directory=os.getcwd(), stderr=logfile, umask=0o002, pidfile=daemon.pidfile.PIDLockFile(pidfile)) as dc: start()
def run(): opts.load_options() if opts.VIZ_LUMA_MAP: import luma_vis luma_vis.visualize(models.LUMA_FILE) else: opts.print_config() models.load_luma_observations() models.load_luma_map(models.LUMA_FILE) if opts.RUN_AS_DAEMON: import os try: import daemon import daemon.pidfile except: print "MISSING DAEMON MODULE. PLEASE INSTALL PYTHON-DAEMON TO ENABLE DAEMON MODE" import sys sys.exit(1) uid = os.getuid() lock_file = os.path.join(os.path.sep, "tmp", "autolux.%s.pid" % uid) print "RUNNING IN DAEMON MODE" print "LOCKFILE", lock_file with daemon.DaemonContext(pidfile=daemon.pidfile.PIDLockFile(lock_file)): monitor_luma() else: monitor_luma()
def main_daemon(options): pidfile = lockfile.pidlockfile.PIDLockFile(options.pid_file) context = daemon.DaemonContext(pidfile=pidfile) context.signal_map = { signal.SIGTERM: mainloop.shutdown_callback, signal.SIGINT: mainloop.shutdown_callback } with context: return main_script(options, False)
def main(): """Resource manager main method. Loads the Django models if needed and starts a manager server. """ django.setup() args = docopt.docopt(__doc__) server_port = args["--server-port"] if server_port is None: server_port = RESOURCE_MANAGER_PORT else: server_port = int(server_port) run_django_server = args["--run-django-server"] django_port = int(args["--django-port"]) run_as_daemon = args["--daemon"] if run_as_daemon: if sys.platform == "win32": raise ValueError("Cannot run as daemon on Windows") print "Running in detached mode (as daemon)" with daemon.DaemonContext(stdout=None): start_server(server_port=server_port, run_django_server=run_django_server, django_port=django_port) else: print "Running in attached mode" start_server(server_port=server_port, run_django_server=run_django_server, django_port=django_port)
def daemonized(verbosity, logPath): process = get_i3configger_process() if process: sys.exit(f"i3configger already running ({process})") context = daemon.DaemonContext(working_directory=Path(__file__).parent) if verbosity > 2: # spew output to terminal from where daemon was started context.stdout = sys.stdout context.stderr = sys.stderr with context: base.configure_logging(verbosity, logPath, isDaemon=True) watch_guarded()
def main(): parser = argparse.ArgumentParser(description='Listens to an SQS queue and accepts and removes Salt minion keys') parser.add_argument('-v', '--verbose', action='store_true', help='Enable debug logging') parser.add_argument('-d', '--daemon', action='store_true', help='Daemonize and enable logging to file') parser.add_argument('--syslog', action='store_true', help='Log to syslog rather than file, only in daemon mode') parser.add_argument('--purge', action='store_true', help='Purge all message from queue at startup') args = parser.parse_args() logger.setLevel(logging.INFO) if args.verbose: logger.setLevel(logging.DEBUG) # Log to file if daemonized if args.daemon: if args.syslog: lh = logging.handlers.SysLogHandler(address='/dev/log', facility='daemon') lh.setFormatter(logging.Formatter('%(filename)s[%(process)d]: %(levelname)s - %(message)s')) log_fh = lh.socket.fileno() else: lh = logging.FileHandler('/var/log/aws_ork.log') lh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) log_fh = lh.stream.fileno() else: lh = logging.StreamHandler() lh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) log_fh = lh.stream.fileno() logger.addHandler(lh) if args.daemon: context = daemon.DaemonContext() context.files_preserve = [log_fh] with context: run(args.purge) else: run(args.purge)
def start(self): """ If configured, converts to a daemon. Otherwise start connected to the current console. """ run_as_daemon = self.config.getboolean('Runtime', 'daemon', fallback=False) uid = self.config.get('Runtime', 'daemon user', fallback=None) gid = self.config.get('Runtime', 'daemon group', fallback=None) pidfile = self.config.get('Runtime', 'daemon pid file', fallback=None) if uid: uid = getpwnam(uid).pw_uid if gid: gid = getgrnam(gid).gr_gid if run_as_daemon: LOGGER.info('Starting as a daemon process') import daemon import daemon.pidfile if pidfile: # We need to create the enclosing directory (if it does not exist) before # starting the daemon context as we might lose the permissions to create # that directory piddir = os.path.dirname(pidfile) if not os.path.isdir(piddir): os.makedirs(piddir, 0o0755) os.chown(piddir, uid, gid) # Convert the filename to the appropriate type for the deamon module. pidfile = daemon.pidfile.TimeoutPIDLockFile(pidfile) with daemon.DaemonContext(uid=uid, gid=gid, pidfile=pidfile): self._start() self._start()
def main(): parser = argparse.ArgumentParser() parser.add_argument("vnic_address", help="Playground address of the VNIC") parser.add_argument("switch_address", help="IP address of the Playground Switch") parser.add_argument("switch_port", type=int, help="TCP port of the Playground Switch") parser.add_argument("--port", type=int, default=0, help="TCP port for serving VNIC connections") parser.add_argument("--statusfile", help="file to record status; useful for communications") parser.add_argument("--pidfile", help="file to record pid; useful for communciations") parser.add_argument("--no-daemon", action="store_true", default=False, help="do not launch VNIC in a daemon; remain in foreground") args = parser.parse_args() pidFileName = os.path.expanduser(os.path.expandvars(args.pidfile)) statusFileName = os.path.expanduser(os.path.expandvars(args.statusfile)) pidFileDir = os.path.dirname(pidFileName) if args.no_daemon: runVnic(args.vnic_address, args.port, statusFileName, args.switch_address, args.switch_port, False) else: with daemon.DaemonContext( working_directory=pidFileDir, umask=0o002, pidfile=pidfile.TimeoutPIDLockFile(pidFileName), ) as context: runVnic(args.vnic_address, args.port, statusFileName, args.switch_address, args.switch_port, True)
def main(): parser = argparse.ArgumentParser() parser.add_argument("--private", action="store_true", help="Only accept local connections.") parser.add_argument("--port", type=int, default=0, help="listening port for switch") parser.add_argument("--statusfile", help="file to record status; useful for communications") parser.add_argument("--pidfile", help="file to record pid; useful for communciations") parser.add_argument("--unreliable", action="store_true", default=False, help="Introduce errors on the wire") parser.add_argument("--no-daemon", action="store_true", default=False, help="do not launch switch in a daemon; remain in foreground") args = parser.parse_args() pidFileName = os.path.expanduser(os.path.expandvars(args.pidfile)) statusFileName = os.path.expanduser(os.path.expandvars(args.statusfile)) pidFileDir = os.path.dirname(pidFileName) host = None if args.private: host = "127.0.0.1" if args.no_daemon: runSwitch(args.unreliable, host, args.port, statusFileName) else: with daemon.DaemonContext( working_directory=pidFileDir, umask=0o002, pidfile=pidfile.TimeoutPIDLockFile(pidFileName), ) as context: runSwitch(args.unreliable, host, args.port, statusFileName)
def start(self, domain_name): domain_path = os.path.join(utils.CONFIG_PATH, domain_name) if not os.path.exists(domain_path): raise exception.DomainNotFound(domain=domain_name) bmc_config = self._parse_config(domain_name) # check libvirt's connection and domain prior to starting the BMC utils.check_libvirt_connection_and_domain( bmc_config['libvirt_uri'], domain_name, sasl_username=bmc_config['libvirt_sasl_username'], sasl_password=bmc_config['libvirt_sasl_password']) # mask the passwords if requested log_config = bmc_config.copy() if not CONF['default']['show_passwords']: log_config = utils.mask_dict_password(bmc_config) LOG.debug('Starting a Virtual BMC for domain %(domain)s with the ' 'following configuration options: %(config)s', {'domain': domain_name, 'config': ' '.join(['%s="%s"' % (k, log_config[k]) for k in log_config])}) with daemon.DaemonContext(stderr=sys.stderr, files_preserve=[LOG.handler.stream, ]): # FIXME(lucasagomes): pyghmi start the sockets when the # class is instantiated, therefore we need to create the object # within the daemon context try: vbmc = VirtualBMC(**bmc_config) except Exception as e: msg = ('Error starting a Virtual BMC for domain %(domain)s. ' 'Error: %(error)s' % {'domain': domain_name, 'error': e}) LOG.error(msg) raise exception.VirtualBMCError(msg) # Save the PID number pidfile_path = os.path.join(domain_path, 'pid') with open(pidfile_path, 'w') as f: f.write(str(os.getpid())) LOG.info('Virtual BMC for domain %s started', domain_name) vbmc.listen()
def __init__( self, chroot_directory=None, working_directory='/', umask=0, uid=None, gid=None, prevent_core=True, detach_process=None, files_preserve=[], # changed default loggers_preserve=[], # new pidfile=None, stdout_logger = None, # new stderr_logger = None, # new #stdin, omitted! #stdout, omitted! #sterr, omitted! signal_map=None, ): self.stdout_logger = stdout_logger self.stderr_logger = stderr_logger self.loggers_preserve = loggers_preserve devnull_in = open(os.devnull, 'r+') devnull_out = open(os.devnull, 'w+') files_preserve.extend([devnull_in, devnull_out]) daemon.DaemonContext.__init__(self, chroot_directory = chroot_directory, working_directory = working_directory, umask = umask, uid = uid, gid = gid, prevent_core = prevent_core, detach_process = detach_process, files_preserve = files_preserve, pidfile = pidfile, stdin = devnull_in, stdout = devnull_out, stderr = devnull_out, signal_map = signal_map)
def handle_daemon(self, name, daemon): """ Executes the daemon command. :param str name: The name of the daemon. :param * daemon: The daemon, i.e. object with main method. """ self.output = EnarkshStyle(self.input, self.output) log = logging.getLogger('enarksh') log.setLevel(logging.INFO) log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') if self.option('daemonize'): config = Config.get() log_file_name = os.path.join(C.HOME, config.get_enarksh_log_dir(), name + '.log') pid_file_name = os.path.join(C.HOME, config.get_enarksh_lock_dir(), name + '.pid') log_handler = logging.handlers.RotatingFileHandler(log_file_name, maxBytes=config.get_enarksh_max_log_size(), backupCount=config.get_enarksh_log_back()) log_handler.setLevel(logging.DEBUG) log_handler.setFormatter(log_formatter) log.addHandler(log_handler) output = open(log_file_name, 'ab', 0) context = DaemonContext() context.working_directory = C.HOME context.umask = 0o002 context.pidfile = PIDLockFile(pid_file_name, False) context.stdout = output context.stderr = output context.files_preserve = [log_handler.stream] with context: daemon.main() else: log_handler = logging.StreamHandler(sys.stdout) log_handler.setLevel(logging.DEBUG) log_handler.setFormatter(log_formatter) log.addHandler(log_handler) daemon.main() # ----------------------------------------------------------------------------------------------------------------------
def _run_job(name, config, gpu=None, prog_args=None, background=False): import socket import subprocess import daemon exper_dir = _expath(name) runem_cmd = ([config['experiment']['prog']] + config['experiment']['prog_args'] + (prog_args or [])) env = os.environ if gpu: env['CUDA_VISIBLE_DEVICES'] = gpu def _do_run_job(): try: job = subprocess.Popen(runem_cmd, cwd=exper_dir, env=env, stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr) with shelve.open('.em', writeback=True) as emdb: emdb[name] = { 'started': _tstamp(), 'status': 'running', 'pid': job.pid, 'hostname': socket.getfqdn(), } if gpu: emdb[name]['gpu'] = gpu job.wait() with shelve.open('.em', writeback=True) as emdb: status = 'completed' if job.returncode == 0 else 'error' emdb[name]['status'] = status except KeyboardInterrupt: with shelve.open('.em', writeback=True) as emdb: emdb[name]['status'] = 'interrupted' finally: with shelve.open('.em', writeback=True) as emdb: emdb[name].pop('pid', None) emdb[name]['ended'] = _tstamp() if background: curdir = osp.abspath(os.curdir) with daemon.DaemonContext(working_directory=curdir): _do_run_job() else: _do_run_job()
def worker(args): env = os.environ.copy() env['AIRFLOW_HOME'] = settings.AIRFLOW_HOME # Celery worker from airflow.executors.celery_executor import app as celery_app from celery.bin import worker worker = worker.worker(app=celery_app) options = { 'optimization': 'fair', 'O': 'fair', 'queues': args.queues, 'concurrency': args.concurrency, 'hostname': args.celery_hostname, } if args.daemon: pid, stdout, stderr, log_file = setup_locations("worker", args.pid, args.stdout, args.stderr, args.log_file) handle = setup_logging(log_file) stdout = open(stdout, 'w+') stderr = open(stderr, 'w+') ctx = daemon.DaemonContext( pidfile=TimeoutPIDLockFile(pid, -1), files_preserve=[handle], stdout=stdout, stderr=stderr, ) with ctx: sp = subprocess.Popen(['airflow', 'serve_logs'], env=env, close_fds=True) worker.run(**options) sp.kill() stdout.close() stderr.close() else: signal.signal(signal.SIGINT, sigint_handler) signal.signal(signal.SIGTERM, sigint_handler) sp = subprocess.Popen(['airflow', 'serve_logs'], env=env, close_fds=True) worker.run(**options) sp.kill()
def daemon_run(host="localhost", port="8080", pidfile=None, logfile=None, keyfile='priv.key', certfile='pub.crt', cafile='ca.crt', action="start"): """ Get the bottle 'run' function running in the background as a daemonized process. :host: The host interface to listen for connections on. Enter 0.0.0.0 to listen on all interfaces. Defaults to localhost. :port: The host port to listen on. Defaults to 8080. :pidfile: The file to use as the process id file. Defaults to "bottle.pid" :logfile: The file to log stdout and stderr from bottle to. Defaults to "bottle.log" """ if pidfile is None: pidfile = os.path.join( os.getcwd(), "bottle.pid" ) if logfile is None: logfile = os.path.join( os.getcwd(), "bottle.log" ) if action == "start": log = open(logfile, "w+") context = daemon.DaemonContext( pidfile=__locked_pidfile(pidfile), stdout=log, stderr=log ) with context: # bottle.run(host=host, port=port) srv = SSLWSGIRefServer(host=host, port=port, keyfile=keyfile, certfile=certfile, cafile=cafile) bottle.run(server=srv) else: with open(pidfile, "r") as p: pid = int(p.read()) os.kill(pid, signal.SIGTERM)
def worker(args): env = os.environ.copy() env['AIRFLOW_HOME'] = settings.AIRFLOW_HOME # Celery worker from airflow.executors.celery_executor import app as celery_app from celery.bin import worker worker = worker.worker(app=celery_app) options = { 'optimization': 'fair', 'O': 'fair', 'queues': args.queues, 'concurrency': args.concurrency, } if args.daemon: pid, stdout, stderr, log_file = setup_locations("worker", args.pid, args.stdout, args.stderr, args.log_file) handle = setup_logging(log_file) stdout = open(stdout, 'w+') stderr = open(stderr, 'w+') ctx = daemon.DaemonContext( pidfile=TimeoutPIDLockFile(pid, -1), files_preserve=[handle], stdout=stdout, stderr=stderr, ) with ctx: sp = subprocess.Popen(['airflow', 'serve_logs'], env=env) worker.run(**options) sp.kill() stdout.close() stderr.close() else: signal.signal(signal.SIGINT, sigint_handler) signal.signal(signal.SIGTERM, sigint_handler) sp = subprocess.Popen(['airflow', 'serve_logs'], env=env) worker.run(**options) sp.kill()