我们从 Python 开源项目中提取了以下 17 个代码示例，用于说明如何使用 celery.schedules.crontab()。
def scheduler(cls,
              interval: (crontab, float),
              *args,
              queue: Enum = TaskQueue.SHORT,
              **kwargs):  # pragma: no cover
    """
    Decorator factory that registers the decorated function as a periodic
    celery task. The task should not accept any arguments.

    ``cls`` is not used by the body.

    :param interval: Periodic interval in seconds as float or crontab object
                     specifying task trigger time. See
                     http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab
    :param queue:    Queue to use for the scheduled task; its ``.value`` is
                     handed to celery.
    :param args:     Arguments to pass to scheduled task.
    :param kwargs:   Keyword arguments to pass to scheduled task.
    :return:         A decorator that returns the decorated function
                     unchanged after registering it.
    """
    def _register(function: Callable):
        # Turn the plain function into a celery task bound to the queue.
        periodic_task = celery.task(
            function,
            base=ExceptionLoggerTask,
            queue=queue.value,
        )
        signature = periodic_task.s()
        celery.add_periodic_task(interval, signature, args, kwargs)
        # Callers keep the original callable, not the task object.
        return function

    return _register
def decode_schedule(obj):
    """
    Rebuild a schedule object from its serialized mapping.

    :param obj: ``None`` or a mapping with ``'__type__'`` and ``'__value__'``
                keys.
    :return:    ``None`` for ``None`` input, otherwise whatever the decoder
                for the given type produces (``decode_datetime``, ``crontab``,
                ``solar`` or ``schedule``).
    :raises NotImplementedError: when ``'__type__'`` is not one of the
                                 supported type names.
    """
    if obj is None:
        return None

    # Both keys are read up front, so a missing key fails before dispatch.
    kind = obj['__type__']
    payload = obj['__value__']

    if kind == 'datetime':
        return decode_datetime(payload)
    if kind == 'crontab':
        # The payload is a tab-separated list of positional crontab fields.
        crontab_fields = payload.split('\t')
        return crontab(*crontab_fields)
    if kind == 'solar':
        return solar(**payload)
    if kind == 'schedule':
        return schedule(**payload)

    raise NotImplementedError(
        'Cannot deserialize schedule %(type)s type' % {'type': kind}
    )
def install_default_entries(self, data):
    """
    Install the built-in schedule entries.

    When ``CELERY_TASK_RESULT_EXPIRES`` is truthy in the app configuration,
    a ``celery.backend_cleanup`` entry is added with the crontab
    ``('0', '4', '*')``. The resulting mapping (possibly empty) is always
    passed to ``self.update_from_dict``.

    :param data: Not used by this method.
    """
    defaults = {}
    if self.app.conf.CELERY_TASK_RESULT_EXPIRES:
        # Add backend clean up
        defaults['celery.backend_cleanup'] = {
            'task': 'celery.backend_cleanup',
            'schedule': schedules.crontab('0', '4', '*'),
        }
    self.update_from_dict(defaults)
def schedule(self):
    """
    Build a ``schedules.crontab`` from this object's five crontab fields.

    :return: The crontab created from ``minute``, ``hour``, ``day_of_week``,
             ``day_of_month`` and ``month_of_year``.
    """
    spec = {
        'minute': self.minute,
        'hour': self.hour,
        'day_of_week': self.day_of_week,
        'day_of_month': self.day_of_month,
        'month_of_year': self.month_of_year,
    }
    return schedules.crontab(**spec)
def schedule(self):
    """
    Return the schedule of whichever schedule source is set.

    ``self.crontab`` takes precedence over ``self.interval``.

    :return: The ``schedule`` attribute of the chosen source, or ``None``
             when neither is set.
    """
    source = self.crontab or self.interval
    if not source:
        return None
    return source.schedule
def scheduled_responder(cls,
                        plugin: str,
                        interval: (crontab, float),
                        queue: Enum = TaskQueue.SHORT,
                        **kwargs):
    """
    Decorator factory that registers the decorated function as a responder
    and schedules `run_plugin_for_all_repos` as a periodic task, with the
    plugin name and the responder event as its arguments.

    :param plugin:   Name of plugin with which responder will be registered.
    :param interval: Periodic interval in seconds as float or crontab object
                     specifying task trigger time. See
                     http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab
    :param queue:    Queue to use for the scheduled_responder's tasks.
    :param kwargs:   Keyword arguments to pass to
                     `run_plugin_for_all_repos`.
    :return:         A decorator returning whatever
                     ``cls.responder(plugin, action)`` returns for the
                     decorated function.

    >>> from gitmate_hooks.utils import ResponderRegistrar
    >>> @ResponderRegistrar.scheduled_responder('test', 10.0)
    ... def test_responder(igitt_repo):
    ...     print('Hello, World!')

    This will register a `test.test_responder` responder and schedule
    `run_plugin_for_all_repos` with arguments
    `('test', 'test.test_responder')` with 10 seconds interval.
    """
    def _register(function: Callable):
        # The responder event name is "<plugin>.<function name>".
        action = '{}.{}'.format(plugin, function.__name__)
        task_args = (plugin, action)

        # Register the responder before scheduling the periodic runner.
        function = cls.responder(plugin, action)(function)

        runner = celery.task(
            run_plugin_for_all_repos,
            base=ExceptionLoggerTask,
            queue=queue.value,
        )
        celery.add_periodic_task(interval, runner.s(), task_args, kwargs)
        return function

    return _register
def schedule(self):
    """
    Create the ``schedules.crontab`` described by this object.

    :return: A crontab built from the ``minute``, ``hour``, ``day_of_week``,
             ``day_of_month`` and ``month_of_year`` attributes.
    """
    field_names = (
        'minute',
        'hour',
        'day_of_week',
        'day_of_month',
        'month_of_year',
    )
    # Each attribute is forwarded under the keyword of the same name.
    crontab_kwargs = {name: getattr(self, name) for name in field_names}
    return schedules.crontab(**crontab_kwargs)
def __str__(self):
    """Return ``"<name>: <crontab>"`` for this object."""
    return '{}: {}'.format(self.name, self.crontab)
def install_default_entries(self, data):
    """
    Install the built-in schedule entries.

    When ``CELERY_TASK_RESULT_EXPIRES`` is truthy in the app configuration,
    a ``celery.backend_cleanup`` entry is added with the crontab
    ``('*/5', '*', '*')`` and an ``expires`` option of 12 hours expressed in
    seconds. The resulting mapping (possibly empty) is always passed to
    ``self.update_from_dict``.

    :param data: Not used by this method.
    """
    defaults = {}
    if self.app.conf.CELERY_TASK_RESULT_EXPIRES:
        cleanup_entry = {
            'task': 'celery.backend_cleanup',
            'schedule': schedules.crontab('*/5', '*', '*'),
            'options': {'expires': 12 * 3600},
        }
        defaults['celery.backend_cleanup'] = cleanup_entry
    self.update_from_dict(defaults)
def install_default_entries(self, data):
    """
    Install the built-in schedule entries.

    When ``result_expires`` is truthy in the app configuration, a
    ``celery.backend_cleanup`` entry is added with the crontab
    ``('0', '4', '*')`` and an ``expires`` option of 12 hours expressed in
    seconds. The resulting mapping (possibly empty) is always passed to
    ``self.update_from_dict``.

    :param data: Not used by this method.
    """
    defaults = {}
    if self.app.conf.result_expires:
        cleanup_entry = {
            'task': 'celery.backend_cleanup',
            'schedule': schedules.crontab('0', '4', '*'),
            'options': {'expires': 12 * 3600},
        }
        defaults['celery.backend_cleanup'] = cleanup_entry
    self.update_from_dict(defaults)
def schedule(self):
    """
    Build a ``schedules.crontab`` from this object's five crontab fields.

    The crontab is given a ``nowfun`` that returns ``make_aware(now())``.

    :return: The configured crontab.
    """
    def _aware_now():
        # Evaluated each time the crontab asks for the current time.
        return make_aware(now())

    spec = {
        'minute': self.minute,
        'hour': self.hour,
        'day_of_week': self.day_of_week,
        'day_of_month': self.day_of_month,
        'month_of_year': self.month_of_year,
    }
    return schedules.crontab(nowfun=_aware_now, **spec)
def validate_unique(self, *args, **kwargs):
    """
    Run the inherited uniqueness validation, then require that exactly one
    of ``interval``, ``crontab`` or ``solar`` is set.

    :param args:   Positional arguments forwarded to the parent
                   ``validate_unique``.
    :param kwargs: Keyword arguments forwarded to the parent
                   ``validate_unique``.
    :raises ValidationError: keyed on ``'interval'`` when no schedule is
                             set; keyed on every set schedule field when
                             more than one is set.
    """
    super(PeriodicTask, self).validate_unique(*args, **kwargs)

    selected = [
        field for field in ('interval', 'crontab', 'solar')
        if getattr(self, field)
    ]

    if not selected:
        raise ValidationError({
            'interval': [
                'One of interval, crontab, or solar must be set.'
            ]
        })

    # The previous check required all three fields to be set at once, so any
    # two set together passed validation. Any count above one is a conflict.
    if len(selected) > 1:
        message = 'Only one of interval, crontab, or solar must be set'
        raise ValidationError({field: [message] for field in selected})
def __str__(self):
    """
    Return ``"<name>: <schedule>"``.

    The schedule shown is the last set one among ``interval``, ``crontab``
    and ``solar`` (so ``solar`` wins over ``crontab``, which wins over
    ``interval``). With none set the text is ``"<name>: {no schedule}"``.
    """
    # Escaped braces render as a literal "{no schedule}".
    schedule_part = '{{no schedule}}'
    for field in ('interval', 'crontab', 'solar'):
        if getattr(self, field):
            # Later fields deliberately override earlier ones.
            schedule_part = '{0.%s}' % field
    return ('{0.name}: ' + schedule_part).format(self)
def schedule(self):
    """
    Return the schedule of the first set schedule source.

    Sources are checked in the order ``interval``, ``crontab``, ``solar``.

    :return: The ``schedule`` attribute of the first truthy source, or
             ``None`` when none is set.
    """
    for field in ('interval', 'crontab', 'solar'):
        source = getattr(self, field)
        if source:
            return source.schedule
    return None
def test_worker_schedule():
    """
    Check that ``WorkerConfiguration`` turns textual ``celerybeat_schedule``
    entries into schedule objects, for a timedelta expression, a crontab
    expression and a crontab given by import path, and that the result is
    mirrored under ``worker_config['CELERYBEAT_SCHEDULE']``.
    """
    def build_conf(beat_schedule):
        # Every case shares the same broker/backend settings.
        return WorkerConfiguration({
            'worker': {
                'broker_url': 'redis://',
                'celery_result_backend': 'redis://',
                'celerybeat_schedule': beat_schedule,
            }
        })

    # timedelta
    timedelta_conf = build_conf({
        'add-every-30-seconds': {
            'task': 'tasks.add',
            'schedule': 'timedelta(seconds=30)',
        },
    })
    expected_timedelta = {
        'add-every-30-seconds': {
            'task': 'tasks.add',
            'schedule': datetime.timedelta(seconds=30),
            'args': (),
        },
    }
    assert timedelta_conf.worker_schedule == expected_timedelta
    assert (timedelta_conf.worker_config['CELERYBEAT_SCHEDULE']
            == timedelta_conf.worker_schedule)

    # crontab
    crontab_conf = build_conf({
        'add-every-minute': {
            'task': 'tasks.add',
            'schedule': "crontab(minute='*')",
            'args': [16, 16],
        },
    })
    expected_crontab = {
        'add-every-minute': {
            'task': 'tasks.add',
            'schedule': crontab(minute='*'),
            'args': (16, 16),
        },
    }
    assert crontab_conf.worker_schedule == expected_crontab
    assert (crontab_conf.worker_config['CELERYBEAT_SCHEDULE']
            == crontab_conf.worker_schedule)

    # import path
    import_path_conf = build_conf({
        'add-every-minute': {
            'task': 'tasks.add',
            'schedule': "celery.schedules:crontab(minute='*')",
            'args': [16, 16],
        },
    })
    assert import_path_conf.worker_schedule == crontab_conf.worker_schedule
    assert (import_path_conf.worker_config['CELERYBEAT_SCHEDULE']
            == import_path_conf.worker_schedule)
def encode_schedule(value):
    """
    Serialize a schedule value into a ``{'__type__', '__value__'}`` mapping.

    :param value: ``None``, a ``datetime``, a ``crontab``, a ``solar`` or a
                  ``schedule`` instance.
    :return:      ``None`` for ``None`` input, otherwise a dict tagging the
                  encoded payload with its type name.
    :raises NotImplementedError: for any other type of ``value``.
    """
    def tagged(type_name, payload):
        return {'__type__': type_name, '__value__': payload}

    if value is None:
        return None

    if isinstance(value, datetime):
        return tagged('datetime', encode_datetime(value))

    # crontab and solar are tested before the generic schedule type.
    if isinstance(value, crontab):
        fields = {
            'minute': value._orig_minute,
            'hour': value._orig_hour,
            'day_of_week': value._orig_day_of_week,
            'day_of_month': value._orig_day_of_month,
            'month_of_year': value._orig_month_of_year,
        }
        # The five original field expressions are joined with tabs.
        return tagged(
            'crontab',
            '%(minute)s\t%(hour)s\t%(day_of_week)s\t'
            '%(day_of_month)s\t%(month_of_year)s' % fields,
        )

    if isinstance(value, solar):
        return tagged('solar', {
            'event': value.event,
            'lat': value.lat,
            'lon': value.lon,
        })

    if isinstance(value, schedule):
        return tagged('schedule', {
            'run_every': value.run_every.total_seconds(),
            'relative': bool(value.relative),
        })

    raise NotImplementedError(
        'Cannot serialize schedule %(type)s type' % {
            'type': type(value).__name__
        }
    )