我们从Python开源项目中,提取了以下6个代码示例,用于说明如何使用django.conf.settings.CELERY_ALWAYS_EAGER。
def worker(**options): "Run background worker instance." from django.conf import settings if hasattr(settings, 'CELERY_ALWAYS_EAGER') and \ settings.CELERY_ALWAYS_EAGER: raise click.ClickException( 'Disable CELERY_ALWAYS_EAGER in your ' 'settings file to spawn workers.') from munch.core.celery import app os.environ['WORKER_TYPE'] = ','.join(options.pop('worker_type')).lower() pool_cls = options.pop('pool') worker = app.Worker( pool_cls=pool_cls, queues=settings.CELERY_DEFAULT_QUEUE, **options) worker.start() try: sys.exit(worker.exitcode) except AttributeError: # `worker.exitcode` was added in a newer version of Celery: # https://github.com/celery/celery/commit/dc28e8a5 # so this is an attempt to be forwards compatible pass
def setUp(self): super(BaseFilerCelery, self).setUp() settings.CELERY_ALWAYS_EAGER = True current_app.conf.CELERY_ALWAYS_EAGER = True
def send_task(name, args=None, kwargs=None, **opts): """ Send a task by name. Contrary to app.send_task, this function respects the CELERY_ALWAYS_EAGER settings, which is necessary in tests. As a consequence, it works only for registered tasks. """ if settings.CELERY_ALWAYS_EAGER: task = app.tasks[name] # Raises a NotRegistered exception for unregistered tasks return task.apply(args=args, kwargs=kwargs, **opts) else: return app.send_task(name, args=args, kwargs=kwargs)
def basic_test(): logger = basic_test.get_logger() logger.info("celery is running task, we write to the celery logger, and by the way, CELERY_ALWAYS_EAGER is %s" % str(settings.CELERY_ALWAYS_EAGER)) return 'success'
def cron(**options): "Run periodic task dispatcher." from django.conf import settings if hasattr(settings, 'CELERY_ALWAYS_EAGER') and \ settings.CELERY_ALWAYS_EAGER: raise click.ClickException( 'Disable CELERY_ALWAYS_EAGER in your ' 'settings file to spawn workers.') from munch.core.celery import app app.Beat(**options).run()
def TaskRunner(): settings.CELERY_ALWAYS_EAGER = True current_app.conf.CELERY_ALWAYS_EAGER = True yield current_app.conf.CELERY_ALWAYS_EAGER = False settings.CELERY_ALWAYS_EAGER = False