我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用celery.schedules.schedule()。
def __init__(self, model, app=None): super().__init__( name=model.name, task=model.task, last_run_at=model.meta.last_run_at, total_run_count=model.meta.total_run_count, schedule=model.schedule, args=model.args, kwargs=model.kwargs, options={ 'queue': model.queue, 'exchange': model.exchange, 'routing_key': model.routing_key, 'expires': model.expires_at }, app=app or celery_app._get_current_object() ) self.model = model
def _unpack_entry_fields(cls, schedule, args=None, kwargs=None, relative=None, options=None, **entry): def _unpack_entry_options(queue=None, exchange=None, routing_key=None, **kwargs): return { 'queue': queue, 'exchange': exchange, 'routing_key': routing_key } model_schedule, model_field = cls.to_model_schedule(schedule) entry.update( { model_field: model_schedule }, args=args or [], kwargs=kwargs or {}, **_unpack_entry_options(**options or {}) ) return entry
def delete(self): """ If found, delete the entry from the scheduler """ entry = self.get() if entry is None: return False else: logger.info( 'Deleting running %s from schedule. ' 'Interval: %s. ' 'Starts at: %s.' % (self.name, self.run_every, self.run_at) ) entry.delete() return True
def decode_schedule(obj): if obj is None: return None _type = obj['__type__'] value = obj['__value__'] if _type == 'datetime': return decode_datetime(value) elif _type == 'crontab': return crontab(*value.split('\t')) elif _type == 'solar': return solar(**value) elif _type == 'schedule': return schedule(**value) else: raise NotImplementedError( 'Cannot deserialize schedule %(type)s type' % { 'type': _type } )
def from_entry(cls, name, session, skip_fields=('relative', 'options'), **entry): """ ??????PeriodicTask :param session: :param name: :param skip_fields: :param entry: :return: """ fields = dict(entry) for skip_field in skip_fields: fields.pop(skip_field, None) schedule = fields.pop('schedule') model_schedule, model_field = cls.to_model_schedule(schedule, session) fields[model_field] = model_schedule fields['args'] = json.dumps(fields.get('args') or []) fields['kwargs'] = json.dumps(fields.get('kwargs') or {}) model, _ = PeriodicTask.update_or_create(session, name=name, defaults=fields) cls.save_model(session, model) return cls(model)
def test_schedule_changed(self): self.m2.args = '[16, 16]' self.m2.save() e2 = self.s.schedule[self.m2.name] assert e2.args == [16, 16] self.m1.args = '[32, 32]' self.m1.save() e1 = self.s.schedule[self.m1.name] assert e1.args == [32, 32] e1 = self.s.schedule[self.m1.name] assert e1.args == [32, 32] self.m3.delete() with pytest.raises(KeyError): self.s.schedule.__getitem__(self.m3.name)
def test_sync_syncs_before_save(self): # Get the entry for m2 e1 = self.s.schedule[self.m2.name] # Increment the entry (but make sure it doesn't sync) self.s._last_sync = monotonic() e2 = self.s.schedule[e1.name] = self.s.reserve(e1) assert self.s.flushed == 1 # Fetch the raw object from db, change the args # and save the changes. m2 = PeriodicTask.objects.get(pk=self.m2.pk) m2.args = '[16, 16]' m2.save() # get_schedule should now see the schedule has changed. # and also sync the dirty objects. e3 = self.s.schedule[self.m2.name] assert self.s.flushed == 2 assert e3.last_run_at == e2.last_run_at assert e3.args == [16, 16]
def test_periodic_task_disabled_while_reserved(self): # Get the entry for m2 e1 = self.s.schedule[self.m2.name] # Increment the entry (but make sure it doesn't sync) self.s._last_sync = monotonic() e2 = self.s.schedule[e1.name] = self.s.reserve(e1) assert self.s.flushed == 1 # Fetch the raw object from db, change the args # and save the changes. m2 = PeriodicTask.objects.get(pk=self.m2.pk) m2.enabled = False m2.save() # reserve is called because the task gets called from # tick after the database change is made self.s.reserve(e2) # get_schedule should now see the schedule has changed. # and remove entry for m2 assert self.m2.name not in self.s.schedule assert self.s.flushed == 2
def test_SolarSchedule_schedule(self): s = SolarSchedule(event='solar_noon', latitude=48.06, longitude=12.86) dt = datetime(day=26, month=7, year=2050, hour=1, minute=0) dt_lastrun = make_aware(dt) assert s.schedule is not None isdue, nextcheck = s.schedule.is_due(dt_lastrun) assert isdue is False # False means task isn't due, but keep checking. assert (nextcheck > 0) and (isdue is False) or \ (nextcheck == s.max_interval) and (isdue is True) s2 = SolarSchedule(event='solar_noon', latitude=48.06, longitude=12.86) dt2 = datetime(day=26, month=7, year=2000, hour=1, minute=0) dt2_lastrun = make_aware(dt2) assert s2.schedule is not None isdue2, nextcheck2 = s2.schedule.is_due(dt2_lastrun) assert isdue2 is True # True means task is due and should run. assert (nextcheck2 > 0) and (isdue2 is True) or \ (nextcheck2 == s2.max_interval) and (isdue2 is False)
def schedule(self): update = False if not self._initial_read: debug('DatabaseScheduler: initial read') update = True self._initial_read = True elif self.schedule_changed(): info('DatabaseScheduler: Schedule changed.') update = True if update: self.sync() self._schedule = self.all_as_schedule() # the schedule changed, invalidate the heap in Scheduler.tick self._heap = None if logger.isEnabledFor(logging.DEBUG): debug('Current schedule:\n%s', '\n'.join( repr(entry) for entry in values(self._schedule)), ) return self._schedule
def to_model_schedule(cls, schedule): for schedule_type, model_type, model_field in cls.model_schedules: schedule = schedules.maybe_schedule(schedule) if isinstance(schedule, schedule_type): model_schedule = model_type.from_schedule(schedule) return model_schedule, model_field raise ValueError( 'Cannot convert schedule type {0!r} to model'.format(schedule) )
def setup_schedule(self): self.install_default_entries(self.schedule) self.update_from_dict(self.app.conf.CELERYBEAT_SCHEDULE)
def install_default_entries(self, data): entries = {} if self.app.conf.CELERY_TASK_RESULT_EXPIRES: # Add backend clean up entries.setdefault( 'celery.backend_cleanup', { 'task': 'celery.backend_cleanup', 'schedule': schedules.crontab('0', '4', '*') } ) self.update_from_dict(entries)
def schedule(self): update = False if not self._initial_read: logger.info('DatabaseScheduler: initial read') update = True self._initial_read = True elif self.is_schedule_changed: logger.info('DatabaseScheduler: schedule changed') update = True if update: self._schedule = self.all_as_schedule() return self._schedule
def schedule(self): return schedules.crontab( minute=self.minute, hour=self.hour, day_of_week=self.day_of_week, day_of_month=self.day_of_month, month_of_year=self.month_of_year )
def from_schedule(cls, schedule): data = dict([ (x, getattr(schedule, '_orig_{0}'.format(x))) for x in ( 'minute', 'hour', 'day_of_week', 'day_of_month', 'month_of_year' ) ]) instance = cls.query.filter(*[ getattr(cls, k) == v for k, v in data.items() ]).first() if not instance: instance = cls(**data) db.session.add(instance) db.session.commit() return instance
def schedule(self): return schedules.schedule(timedelta(**{self.period: self.every}))
def from_schedule(cls, schedule): every = max(schedule.run_every.total_seconds(), 0) instance = cls.query.filter_by(every=every, period='seconds') if not instance: instance = cls(every=every, period='seconds') db.session.add(instance) db.session.commit() return instance
def create(self): entry = RedBeatSchedulerEntry( name=self.name, task=self.task, schedule=schedule( run_every=self.run_every, # setting "now" to the job start datetime nowfun=lambda: self.run_at, app=celery, ), args=(self.spark_job.pk,), kwargs={}, app=celery, ) return entry
def add(self): """ Create and save an entry to the scheduler """ logger.info( 'Adding running %s to schedule. ' 'Interval: %s. ' 'Starts at: %s.' % (self.name, self.run_every, self.run_at) ) entry = self.create() entry.save() return entry
def serialize_entry(entry, schedule_encoder=encode_schedule): return { 'name': entry.name, 'task': entry.task, 'schedule': schedule_encoder(entry.schedule), 'args': entry.args, 'kwargs': entry.kwargs, 'last_run_at': encode_datetime(entry.last_run_at, allow_null=True), 'total_run_count': entry.total_run_count, 'options': entry.options }
def deserialize_entry(entry, schedule_decoder=decode_schedule): return ScheduleEntry( name=entry['name'], task=entry['task'], schedule=schedule_decoder(entry['schedule']), args=entry['args'], kwargs=entry['kwargs'], last_run_at=decode_datetime(entry['last_run_at'], allow_null=True), total_run_count=entry['total_run_count'], options=entry['options'], )
def schedule(self): return schedules.schedule(datetime.timedelta(**{self.period.code: self.every}))
def from_schedule(cls, session, schedule, period='seconds'): every = max(schedule.run_every.total_seconds(), 0) obj = cls.filter_by(session, every=every, period=period).first() if obj is None: return cls(every=every, period=period) else: return obj
def schedule(self): return schedules.crontab(minute=self.minute, hour=self.hour, day_of_week=self.day_of_week, day_of_month=self.day_of_month, month_of_year=self.month_of_year)
def from_schedule(cls, session, schedule): spec = {'minute': schedule._orig_minute, 'hour': schedule._orig_hour, 'day_of_week': schedule._orig_day_of_week, 'day_of_month': schedule._orig_day_of_month, 'month_of_year': schedule._orig_month_of_year} obj = cls.filter_by(session, **spec).first() if obj is None: return cls(**spec) else: return obj
def schedule(self): if self.crontab: return self.crontab.schedule if self.interval: return self.interval.schedule
def is_due(self): if not self.model.enabled: return False, 5.0 # 5 second delay for re-enable. return self.schedule.is_due(self.last_run_at)
def to_model_schedule(cls, schedule, session): """ :param session: :param schedule: :return: """ for schedule_type, model_type, model_field in cls.model_schedules: debug(cls.model_schedules) schedule = schedules.maybe_schedule(schedule) if isinstance(schedule, schedule_type): model_schedule = model_type.from_schedule(session, schedule) cls.save_model(session, model_schedule) return model_schedule, model_field raise ValueError('Cannot convert schedule type {0!r} to model'.format(schedule))
def __repr__(self): return '<ModelEntry: {0} {1}(*{2}, **{3}) {{4}}>'.format( safe_str(self.name), self.task, self.args, self.kwargs, self.schedule, )
def sync(self): info('Writing entries...') _tried = set() while self._dirty: try: name = self._dirty.pop() _tried.add(name) self.schedule[name].save() except KeyError: pass
def update_from_dict(self, dict_): s = {} for name, entry in dict_.items(): try: s[name] = self.Entry.from_entry(name, self.session, **entry) except Exception as exc: error(ADD_ENTRY_ERROR, name, exc, entry) self.schedule.update(s)
def install_default_entries(self, data): entries = {} if self.app.conf.CELERY_TASK_RESULT_EXPIRES: entries.setdefault( 'celery.backend_cleanup', { 'task': 'celery.backend_cleanup', 'schedule': schedules.crontab('*/5', '*', '*'), 'options': {'expires': 12 * 3600}, }, ) self.update_from_dict(entries)
def schedule(self): update = False if not self._initial_read: debug('DatabaseScheduler: intial read') update = True self._initial_read = True elif self.schedule_changed(): info('DatabaseScheduler: Schedule changed.') update = True if update: self.sync() self._schedule = self.all_as_schedule() debug('Current schedule:\n%s', '\n'.join(repr(entry) for entry in self._schedule.itervalues())) return self._schedule
def create_model_interval(self, schedule, **kwargs): interval = IntervalSchedule.from_schedule(schedule) interval.save() return self.create_model(interval=interval, **kwargs)
def create_model_crontab(self, schedule, **kwargs): crontab = CrontabSchedule.from_schedule(schedule) crontab.save() return self.create_model(crontab=crontab, **kwargs)
def create_model_solar(self, schedule, **kwargs): solar = SolarSchedule.from_schedule(schedule) solar.save() return self.create_model(solar=solar, **kwargs)
def create_conf_entry(self): name = 'thefoo{0}'.format(next(_ids)) return name, dict( task='djcelery.unittest.add{0}'.format(next(_ids)), schedule=timedelta(0, 600), args=(), relative=False, kwargs={}, options={'queue': 'extra_queue'} )
def test_entry(self): m = self.create_model_interval(schedule(timedelta(seconds=10))) e = self.Entry(m, app=self.app) assert e.args == [2, 2] assert e.kwargs == {'callback': 'foo'} assert e.schedule assert e.total_run_count == 0 assert isinstance(e.last_run_at, datetime) assert e.options['queue'] == 'xaz' assert e.options['exchange'] == 'foo' assert e.options['routing_key'] == 'cpu' right_now = self.app.now() m2 = self.create_model_interval( schedule(timedelta(seconds=10)), last_run_at=right_now, ) assert m2.last_run_at e2 = self.Entry(m2, app=self.app) assert e2.last_run_at is right_now e3 = e2.next() assert e3.last_run_at > e2.last_run_at assert e3.total_run_count == 1
def test_periodic_task_model_disabled_schedule(self): self.m1.enabled = False self.m1.save() s = self.Scheduler(app=self.app) sched = s.schedule assert sched assert len(sched) == 1 assert 'celery.backend_cleanup' in sched assert self.entry_name not in sched
def setup_scheduler(self, app): self.app = app self.app.conf.beat_schedule = {} self.m1 = self.create_model_interval( schedule(timedelta(seconds=10))) self.m1.save() self.m1.refresh_from_db() self.m2 = self.create_model_interval( schedule(timedelta(minutes=20))) self.m2.save() self.m2.refresh_from_db() self.m3 = self.create_model_crontab( crontab(minute='2,4,5')) self.m3.save() self.m3.refresh_from_db() self.m4 = self.create_model_solar( solar('solar_noon', 48.06, 12.86)) self.m4.save() self.m4.refresh_from_db() # disabled, should not be in schedule m5 = self.create_model_interval( schedule(timedelta(seconds=1))) m5.enabled = False m5.save() self.s = self.Scheduler(app=self.app)
def test_all_as_schedule(self): sched = self.s.schedule assert sched assert len(sched) == 5 assert 'celery.backend_cleanup' in sched for n, e in sched.items(): assert isinstance(e, self.s.Entry)
def test_reserve(self): e1 = self.s.schedule[self.m1.name] self.s.schedule[self.m1.name] = self.s.reserve(e1) assert self.s.flushed == 1 e2 = self.s.schedule[self.m2.name] self.s.schedule[self.m2.name] = self.s.reserve(e2) assert self.s.flushed == 1 assert self.m2.name in self.s._dirty
def test_periodic_task_disabled_and_enabled(self): # Get the entry for m2 e1 = self.s.schedule[self.m2.name] # Increment the entry (but make sure it doesn't sync) self.s._last_sync = monotonic() self.s.schedule[e1.name] = self.s.reserve(e1) assert self.s.flushed == 1 # Fetch the raw object from db, change the args # and save the changes. m2 = PeriodicTask.objects.get(pk=self.m2.pk) m2.enabled = False m2.save() # get_schedule should now see the schedule has changed. # and remove entry for m2 assert self.m2.name not in self.s.schedule assert self.s.flushed == 2 m2.enabled = True m2.save() # get_schedule should now see the schedule has changed. # and add entry for m2 assert self.m2.name in self.s.schedule assert self.s.flushed == 3
def test_sync_rollback_on_save_error(self): self.s.schedule[self.m1.name] = EntrySaveRaises(self.m1, app=self.app) self.s._dirty.add(self.m1.name) with pytest.raises(RuntimeError): self.s.sync()
def test_PeriodicTask_unicode_interval(self): p = self.create_model_interval(schedule(timedelta(seconds=10))) assert text_t(p) == '{0}: every 10.0 seconds'.format(p.name)
def test_PeriodicTask_unicode_no_schedule(self): p = self.create_model() assert text_t(p) == '{0}: {{no schedule}}'.format(p.name)
def test_CrontabSchedule_schedule(self): s = CrontabSchedule( minute='3, 7', hour='3, 4', day_of_week='*', day_of_month='1, 16', month_of_year='1, 7', ) assert s.schedule.minute == {3, 7} assert s.schedule.hour == {3, 4} assert s.schedule.day_of_week == {0, 1, 2, 3, 4, 5, 6} assert s.schedule.day_of_month == {1, 16} assert s.schedule.month_of_year == {1, 7}
def test_track_changes(self): assert PeriodicTasks.last_change() is None m1 = self.create_model_interval(schedule(timedelta(seconds=10))) m1.save() x = PeriodicTasks.last_change() assert x m1.args = '(23, 24)' m1.save() y = PeriodicTasks.last_change() assert y assert y > x