我们从Python开源项目中,提取了以下3个代码示例,用于说明如何使用celery.signals()。
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options): # Pass girder related job information through to # the signals by adding this information to options['headers'] # This sets defaults for reserved_options based on the class defaults, # or values defined by the girder_job() dectorator headers = { 'girder_job_title': self._girder_job_title, 'girder_job_type': self._girder_job_type, 'girder_job_public': self._girder_job_public, 'girder_job_handler': self._girder_job_handler, 'girder_job_other_fields': self._girder_job_other_fields, } # Certain keys may show up in either kwargs (e.g. via # .delay(girder_token='foo') or in options (e.g. # .apply_async(args=(), kwargs={}, girder_token='foo') For # those special headers, pop them out of kwargs or options and # put them in headers so they can be picked up by the # before_task_publish signal. for key in self.reserved_headers + self.reserved_options: if kwargs is not None and key in kwargs: headers[key] = kwargs.pop(key) if key in options: headers[key] = options.pop(key) if 'headers' in options: options['headers'].update(headers) else: options['headers'] = headers return super(Task, self).apply_async( args=args, kwargs=kwargs, task_id=task_id, producer=producer, link=link, link_error=link_error, shadow=shadow, **options)
def enable_signals(): """Best effort enabling of metrics, logging, sentry signals for celery.""" try: from celery import signals from raven.contrib.celery import CeleryFilter except ImportError: # pragma: no cover return signals.setup_logging.connect(celery_logging_handler) signals.before_task_publish.connect(before_task_publish) signals.after_task_publish.connect(after_task_publish) signals.task_prerun.connect(task_prerun) signals.task_postrun.connect(task_postrun) signals.task_retry.connect(task_retry) signals.task_success.connect(task_success) signals.task_failure.connect(task_failure) signals.task_revoked.connect(task_revoked) # install celery error handler get_sentry_handler().install() talisker.sentry.register_client_update(sentry_handler_update) # de-dup celery errors log_handler = talisker.sentry.get_log_handler() for filter in log_handler.filters: if isinstance(filter, CeleryFilter): break else: log_handler.addFilter(CeleryFilter()) logging.getLogger(__name__).info('enabled talisker celery signals')
def disable_signals(): from celery import signals get_sentry_handler().uninstall() signals.setup_logging.disconnect(celery_logging_handler) signals.before_task_publish.disconnect(before_task_publish) signals.after_task_publish.disconnect(after_task_publish) signals.task_prerun.disconnect(task_prerun) signals.task_postrun.disconnect(task_postrun) signals.task_retry.disconnect(task_retry) signals.task_success.disconnect(task_success) signals.task_failure.disconnect(task_failure) signals.task_revoked.disconnect(task_revoked)