Python celery.schedules 模块,schedule() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用celery.schedules.schedule()

项目:flask-celery3-boilerplate    作者:sdg32    | 项目源码 | 文件源码
def __init__(self, model, app=None):
        """Build a scheduler entry from a model object.

        Entry fields are copied from ``model``; the run statistics are read
        from ``model.meta``. The model itself is kept on ``self.model``.

        :param model: object exposing ``name``, ``task``, ``schedule``,
            ``args``, ``kwargs``, ``queue``, ``exchange``, ``routing_key``,
            ``expires_at`` and ``meta``.
        :param app: application to bind to; when falsy, the result of
            ``celery_app._get_current_object()`` is used instead.
        """
        publish_options = {
            'queue': model.queue,
            'exchange': model.exchange,
            'routing_key': model.routing_key,
            'expires': model.expires_at,
        }
        run_stats = model.meta
        bound_app = app or celery_app._get_current_object()

        super().__init__(
            name=model.name,
            task=model.task,
            last_run_at=run_stats.last_run_at,
            total_run_count=run_stats.total_run_count,
            schedule=model.schedule,
            args=model.args,
            kwargs=model.kwargs,
            options=publish_options,
            app=bound_app,
        )

        self.model = model
项目:flask-celery3-boilerplate    作者:sdg32    | 项目源码 | 文件源码
def _unpack_entry_fields(cls, schedule, args=None, kwargs=None,
                             relative=None, options=None, **entry):
        """Flatten a schedule entry into a dict of model field values.

        The schedule is converted with ``cls.to_model_schedule`` and stored
        under the field name that call returns. Only ``queue``, ``exchange``
        and ``routing_key`` are taken from ``options``; any other option is
        dropped. ``relative`` is accepted but not used.

        :return: the ``entry`` keyword dict, updated in place.
        """
        routing = options or {}
        model_schedule, model_field = cls.to_model_schedule(schedule)

        entry[model_field] = model_schedule
        entry['args'] = args or []
        entry['kwargs'] = kwargs or {}
        # Missing routing options are stored explicitly as None.
        for option_name in ('queue', 'exchange', 'routing_key'):
            entry[option_name] = routing.get(option_name)
        return entry
项目:telemetry-analysis-service    作者:mozilla    | 项目源码 | 文件源码
def delete(self):
        """
        Delete this object's entry from the scheduler, if one is found.

        Returns True when an entry was found and deleted, False when
        ``self.get()`` returned None.
        """
        entry = self.get()
        if entry is None:
            return False

        details = (self.name, self.run_every, self.run_at)
        logger.info(
            'Deleting running %s from schedule. '
            'Interval: %s. '
            'Starts at: %s.' % details
        )
        entry.delete()
        return True
项目:celery-beatx    作者:mixkorshun    | 项目源码 | 文件源码
def decode_schedule(obj):
    """Rebuild a schedule object from its serialized mapping.

    ``obj`` is either None (returned as None) or a mapping holding a
    ``'__type__'`` tag and a ``'__value__'`` payload. The tag selects the
    decoder: ``datetime``, ``crontab``, ``solar`` or ``schedule``.

    :raises NotImplementedError: for any other type tag.
    """
    if obj is None:
        return None

    kind, payload = obj['__type__'], obj['__value__']

    if kind == 'datetime':
        return decode_datetime(payload)
    if kind == 'crontab':
        # The crontab fields arrive as one tab-separated string.
        return crontab(*payload.split('\t'))
    if kind == 'solar':
        return solar(**payload)
    if kind == 'schedule':
        return schedule(**payload)

    raise NotImplementedError(
        'Cannot deserialize schedule %(type)s type' % {'type': kind}
    )
项目:celerybeat-sqlalchemy    作者:kindule    | 项目源码 | 文件源码
def from_entry(cls, name, session, skip_fields=('relative', 'options'), **entry):
        """
        Store a schedule entry through ``PeriodicTask.update_or_create`` and
        return it wrapped in ``cls``.

        :param name: name passed to ``PeriodicTask.update_or_create``.
        :param session: session handed to every model helper called here.
        :param skip_fields: entry keys dropped before the model is written.
        :param entry: entry fields; must contain ``schedule``. ``args`` and
            ``kwargs`` are stored as JSON text.
        :return: ``cls(model)`` for the saved model.
        """
        ignored = set(skip_fields)
        fields = {key: value for key, value in entry.items() if key not in ignored}

        model_schedule, model_field = cls.to_model_schedule(fields.pop('schedule'), session)
        fields[model_field] = model_schedule

        # Missing or empty args/kwargs are serialized as "[]" / "{}".
        for json_field, empty in (('args', []), ('kwargs', {})):
            fields[json_field] = json.dumps(fields.get(json_field) or empty)

        model, _ = PeriodicTask.update_or_create(session, name=name, defaults=fields)
        cls.save_model(session, model)
        return cls(model)
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def test_schedule_changed(self):
        """Saved model changes and deletions show up in ``self.s.schedule``."""
        # Changing and saving m2's args must be visible on its entry.
        self.m2.args = '[16, 16]'
        self.m2.save()
        e2 = self.s.schedule[self.m2.name]
        assert e2.args == [16, 16]

        # Same for m1; the entry is looked up twice and must give the
        # same args both times.
        self.m1.args = '[32, 32]'
        self.m1.save()
        e1 = self.s.schedule[self.m1.name]
        assert e1.args == [32, 32]
        e1 = self.s.schedule[self.m1.name]
        assert e1.args == [32, 32]

        # After deleting m3, looking up its entry must raise KeyError.
        self.m3.delete()
        with pytest.raises(KeyError):
            self.s.schedule.__getitem__(self.m3.name)
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def test_sync_syncs_before_save(self):
        """Reading the schedule after a model change flushes the reserved entry first."""
        # Get the entry for m2
        e1 = self.s.schedule[self.m2.name]

        # Increment the entry (but make sure it doesn't sync)
        self.s._last_sync = monotonic()
        e2 = self.s.schedule[e1.name] = self.s.reserve(e1)
        assert self.s.flushed == 1

        # Fetch the raw object from db, change the args
        # and save the changes.
        m2 = PeriodicTask.objects.get(pk=self.m2.pk)
        m2.args = '[16, 16]'
        m2.save()

        # get_schedule should now see the schedule has changed.
        # and also sync the dirty objects.
        e3 = self.s.schedule[self.m2.name]
        assert self.s.flushed == 2
        # The reloaded entry keeps the reserved entry's last_run_at and
        # carries the new args.
        assert e3.last_run_at == e2.last_run_at
        assert e3.args == [16, 16]
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def test_periodic_task_disabled_while_reserved(self):
        """A task disabled after being reserved is dropped from the schedule."""
        # Get the entry for m2
        e1 = self.s.schedule[self.m2.name]

        # Increment the entry (but make sure it doesn't sync)
        self.s._last_sync = monotonic()
        e2 = self.s.schedule[e1.name] = self.s.reserve(e1)
        assert self.s.flushed == 1

        # Fetch the raw object from db, disable it
        # and save the changes.
        m2 = PeriodicTask.objects.get(pk=self.m2.pk)
        m2.enabled = False
        m2.save()

        # reserve is called because the task gets called from
        # tick after the database change is made
        self.s.reserve(e2)

        # get_schedule should now see the schedule has changed.
        # and remove entry for m2
        assert self.m2.name not in self.s.schedule
        assert self.s.flushed == 2
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def test_SolarSchedule_schedule(self):
        """Check ``is_due`` of a solar-noon schedule for two last-run dates."""
        # (year of the last run, expected is_due flag):
        # False means task isn't due, but keep checking;
        # True means task is due and should run.
        cases = ((2050, False), (2000, True))

        for year, expected_due in cases:
            solar_model = SolarSchedule(
                event='solar_noon', latitude=48.06, longitude=12.86,
            )
            last_run = make_aware(
                datetime(day=26, month=7, year=year, hour=1, minute=0),
            )

            assert solar_model.schedule is not None
            is_due, next_check = solar_model.schedule.is_due(last_run)
            assert is_due is expected_due
            assert (next_check > 0) and (is_due is expected_due) or \
                (next_check == solar_model.max_interval) and \
                (is_due is (not expected_due))
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def schedule(self):
        """Return the current schedule, rebuilding it first when needed.

        The schedule is rebuilt with ``self.all_as_schedule()`` on the first
        read and whenever ``self.schedule_changed()`` is true. A rebuild is
        preceded by ``self.sync()`` and resets ``self._heap`` to None.
        """
        if not self._initial_read:
            debug('DatabaseScheduler: initial read')
            self._initial_read = True
            reload_needed = True
        elif self.schedule_changed():
            info('DatabaseScheduler: Schedule changed.')
            reload_needed = True
        else:
            reload_needed = False

        if not reload_needed:
            return self._schedule

        self.sync()
        self._schedule = self.all_as_schedule()
        # the schedule changed, invalidate the heap in Scheduler.tick
        self._heap = None
        if logger.isEnabledFor(logging.DEBUG):
            listing = '\n'.join(
                repr(entry) for entry in values(self._schedule))
            debug('Current schedule:\n%s', listing)
        return self._schedule
项目:flask-celery3-boilerplate    作者:sdg32    | 项目源码 | 文件源码
def to_model_schedule(cls, schedule):
        """Convert a schedule into a model schedule plus the field naming it.

        ``cls.model_schedules`` is scanned in order; the first entry whose
        schedule type matches supplies the model type and field name.

        :return: ``(model_schedule, model_field)``.
        :raises ValueError: when no entry of ``cls.model_schedules`` matches.
        """
        for known_type, model_cls, field_name in cls.model_schedules:
            schedule = schedules.maybe_schedule(schedule)
            if not isinstance(schedule, known_type):
                continue
            return model_cls.from_schedule(schedule), field_name

        raise ValueError(
            'Cannot convert schedule type {0!r} to model'.format(schedule)
        )
项目:flask-celery3-boilerplate    作者:sdg32    | 项目源码 | 文件源码
def setup_schedule(self):
        """Install the default entries, then load ``app.conf.CELERYBEAT_SCHEDULE``."""
        current = self.schedule
        self.install_default_entries(current)

        configured = self.app.conf.CELERYBEAT_SCHEDULE
        self.update_from_dict(configured)
项目:flask-celery3-boilerplate    作者:sdg32    | 项目源码 | 文件源码
def install_default_entries(self, data):
        """Register the built-in entries through ``self.update_from_dict``.

        When ``app.conf.CELERY_TASK_RESULT_EXPIRES`` is truthy, a
        ``celery.backend_cleanup`` entry is included. ``data`` is accepted
        but not used.
        """
        defaults = {}
        if self.app.conf.CELERY_TASK_RESULT_EXPIRES:
            # Add backend clean up
            defaults['celery.backend_cleanup'] = {
                'task': 'celery.backend_cleanup',
                'schedule': schedules.crontab('0', '4', '*'),
            }

        self.update_from_dict(defaults)
项目:flask-celery3-boilerplate    作者:sdg32    | 项目源码 | 文件源码
def schedule(self):
        """Return the cached schedule, rebuilding it when it is stale.

        The schedule is rebuilt with ``self.all_as_schedule()`` on the first
        read and whenever ``self.is_schedule_changed`` is truthy.
        """
        if not self._initial_read:
            logger.info('DatabaseScheduler: initial read')
            self._initial_read = True
            stale = True
        elif self.is_schedule_changed:
            logger.info('DatabaseScheduler: schedule changed')
            stale = True
        else:
            stale = False

        if stale:
            self._schedule = self.all_as_schedule()
        return self._schedule
项目:flask-celery3-boilerplate    作者:sdg32    | 项目源码 | 文件源码
def schedule(self):
        """Return a ``schedules.crontab`` built from this object's five crontab fields."""
        field_names = (
            'minute', 'hour', 'day_of_week', 'day_of_month', 'month_of_year',
        )
        spec = {field: getattr(self, field) for field in field_names}
        return schedules.crontab(**spec)
项目:flask-celery3-boilerplate    作者:sdg32    | 项目源码 | 文件源码
def from_schedule(cls, schedule):
        """Return the instance matching a crontab schedule, creating it if absent.

        The lookup compares the five crontab fields against the schedule's
        ``_orig_*`` attributes. When nothing matches, a new instance is added
        to ``db.session`` and committed.
        """
        field_names = (
            'minute', 'hour', 'day_of_week', 'day_of_month', 'month_of_year',
        )
        data = {
            name: getattr(schedule, '_orig_{0}'.format(name))
            for name in field_names
        }

        criteria = [getattr(cls, name) == value for name, value in data.items()]
        instance = cls.query.filter(*criteria).first()
        if instance:
            return instance

        instance = cls(**data)
        db.session.add(instance)
        db.session.commit()
        return instance
项目:flask-celery3-boilerplate    作者:sdg32    | 项目源码 | 文件源码
def schedule(self):
        """Return a ``schedules.schedule`` running every ``self.every`` units of ``self.period``."""
        # ``self.period`` is used as the timedelta keyword name.
        interval_kwargs = {self.period: self.every}
        run_every = timedelta(**interval_kwargs)
        return schedules.schedule(run_every)
项目:flask-celery3-boilerplate    作者:sdg32    | 项目源码 | 文件源码
def from_schedule(cls, schedule):
        """Return the instance matching an interval schedule, creating it if absent.

        The interval is normalised to whole-schedule seconds (never negative)
        and looked up with ``period='seconds'``. When nothing matches, a new
        instance is added to ``db.session`` and committed.

        :param schedule: object whose ``run_every`` is a timedelta.
        :return: the existing or newly created instance of ``cls``.
        """
        every = max(schedule.run_every.total_seconds(), 0)
        # ``filter_by`` only builds a query; ``first()`` runs it and yields
        # the matching instance or None. Without it the query object itself
        # was returned and the create branch below could never run.
        instance = cls.query.filter_by(every=every, period='seconds').first()
        if instance is None:
            instance = cls(every=every, period='seconds')
            db.session.add(instance)
            db.session.commit()
        return instance
项目:telemetry-analysis-service    作者:mozilla    | 项目源码 | 文件源码
def create(self):
        """Build and return a ``RedBeatSchedulerEntry`` for this job.

        The entry is only constructed here; this method does not call
        ``save()`` on it.
        """
        run_schedule = schedule(
            run_every=self.run_every,
            # setting "now" to the job start datetime
            nowfun=lambda: self.run_at,
            app=celery,
        )
        return RedBeatSchedulerEntry(
            name=self.name,
            task=self.task,
            schedule=run_schedule,
            args=(self.spark_job.pk,),
            kwargs={},
            app=celery,
        )
项目:telemetry-analysis-service    作者:mozilla    | 项目源码 | 文件源码
def add(self):
        """
        Create an entry with ``self.create()``, save it and return it.
        """
        details = (self.name, self.run_every, self.run_at)
        logger.info(
            'Adding running %s to schedule. '
            'Interval: %s. '
            'Starts at: %s.' % details
        )
        new_entry = self.create()
        new_entry.save()
        return new_entry
项目:celery-beatx    作者:mixkorshun    | 项目源码 | 文件源码
def serialize_entry(entry, schedule_encoder=encode_schedule):
    """Turn a schedule entry into a plain dict.

    The schedule goes through ``schedule_encoder`` and ``last_run_at``
    through ``encode_datetime`` (null allowed); every other field is copied
    as-is.
    """
    encoded_schedule = schedule_encoder(entry.schedule)
    encoded_last_run = encode_datetime(entry.last_run_at, allow_null=True)

    payload = {
        'name': entry.name,
        'task': entry.task,
        'schedule': encoded_schedule,
        'args': entry.args,
        'kwargs': entry.kwargs,
        'last_run_at': encoded_last_run,
        'total_run_count': entry.total_run_count,
        'options': entry.options,
    }
    return payload
项目:celery-beatx    作者:mixkorshun    | 项目源码 | 文件源码
def deserialize_entry(entry, schedule_decoder=decode_schedule):
    """Rebuild a ``ScheduleEntry`` from its serialized dict.

    The schedule goes through ``schedule_decoder`` and ``last_run_at``
    through ``decode_datetime`` (null allowed); every other key is passed
    through unchanged.
    """
    decoded_schedule = schedule_decoder(entry['schedule'])
    decoded_last_run = decode_datetime(entry['last_run_at'], allow_null=True)

    return ScheduleEntry(
        name=entry['name'],
        task=entry['task'],
        schedule=decoded_schedule,
        args=entry['args'],
        kwargs=entry['kwargs'],
        last_run_at=decoded_last_run,
        total_run_count=entry['total_run_count'],
        options=entry['options'],
    )
项目:celerybeat-sqlalchemy    作者:kindule    | 项目源码 | 文件源码
def schedule(self):
        """Return a ``schedules.schedule`` running every ``self.every`` units of ``self.period.code``."""
        # ``self.period.code`` is used as the timedelta keyword name.
        interval_kwargs = {self.period.code: self.every}
        run_every = datetime.timedelta(**interval_kwargs)
        return schedules.schedule(run_every)
项目:celerybeat-sqlalchemy    作者:kindule    | 项目源码 | 文件源码
def from_schedule(cls, session, schedule, period='seconds'):
        """Return the object matching an interval schedule, or a new unsaved one.

        The interval is ``schedule.run_every`` in seconds, never negative.
        A new object is only constructed here, not written through
        ``session``.
        """
        every = max(schedule.run_every.total_seconds(), 0)
        existing = cls.filter_by(session, every=every, period=period).first()
        return cls(every=every, period=period) if existing is None else existing
项目:celerybeat-sqlalchemy    作者:kindule    | 项目源码 | 文件源码
def schedule(self):
        """Return a ``schedules.crontab`` built from this object's five crontab fields."""
        field_names = (
            'minute', 'hour', 'day_of_week', 'day_of_month', 'month_of_year',
        )
        spec = {field: getattr(self, field) for field in field_names}
        return schedules.crontab(**spec)
项目:celerybeat-sqlalchemy    作者:kindule    | 项目源码 | 文件源码
def from_schedule(cls, session, schedule):
        """Return the object matching a crontab schedule, or a new unsaved one.

        The five crontab fields are read from the schedule's ``_orig_*``
        attributes. A new object is only constructed here, not written
        through ``session``.
        """
        field_names = (
            'minute', 'hour', 'day_of_week', 'day_of_month', 'month_of_year',
        )
        spec = {name: getattr(schedule, '_orig_' + name) for name in field_names}
        existing = cls.filter_by(session, **spec).first()
        return cls(**spec) if existing is None else existing
项目:celerybeat-sqlalchemy    作者:kindule    | 项目源码 | 文件源码
def schedule(self):
        """Return the crontab's schedule if set, else the interval's, else None."""
        source = self.crontab or self.interval
        return source.schedule if source else None
项目:celerybeat-sqlalchemy    作者:kindule    | 项目源码 | 文件源码
def is_due(self):
        """Return the schedule's ``is_due`` result, or ``(False, 5.0)`` when the model is disabled."""
        if self.model.enabled:
            return self.schedule.is_due(self.last_run_at)
        # 5 second delay for re-enable.
        return False, 5.0
项目:celerybeat-sqlalchemy    作者:kindule    | 项目源码 | 文件源码
def to_model_schedule(cls, schedule, session):
        """
        Convert a schedule into a saved model schedule plus the field naming it.

        ``cls.model_schedules`` is scanned in order; the first entry whose
        schedule type matches supplies the model type and field name.

        :param schedule: schedule to convert.
        :param session: session passed to ``from_schedule`` and ``save_model``.
        :return: ``(model_schedule, model_field)``.
        :raises ValueError: when no entry of ``cls.model_schedules`` matches.
        """
        for known_type, model_cls, field_name in cls.model_schedules:
            debug(cls.model_schedules)
            schedule = schedules.maybe_schedule(schedule)
            if not isinstance(schedule, known_type):
                continue
            row = model_cls.from_schedule(session, schedule)
            cls.save_model(session, row)
            return row, field_name
        raise ValueError('Cannot convert schedule type {0!r} to model'.format(schedule))
项目:celerybeat-sqlalchemy    作者:kindule    | 项目源码 | 文件源码
def __repr__(self):
        """Return a debug representation: name, task call signature and schedule."""
        # ``{4}`` has to be a real replacement field. The doubled braces used
        # before are str.format's escape for literal braces, so the repr
        # ended in the text "{4}" and never showed the schedule.
        return '<ModelEntry: {0} {1}(*{2}, **{3}) {4}>'.format(
            safe_str(self.name), self.task, self.args, self.kwargs, self.schedule,
        )
项目:celerybeat-sqlalchemy    作者:kindule    | 项目源码 | 文件源码
def setup_schedule(self):
        """Install the default entries, then load ``app.conf.CELERYBEAT_SCHEDULE``."""
        current = self.schedule
        self.install_default_entries(current)

        configured = self.app.conf.CELERYBEAT_SCHEDULE
        self.update_from_dict(configured)
项目:celerybeat-sqlalchemy    作者:kindule    | 项目源码 | 文件源码
def sync(self):
        """Save every entry named in ``self._dirty``, emptying the set as it goes.

        Names are popped one at a time; a ``KeyError`` raised while looking
        up or saving an entry is ignored and the loop moves on.
        """
        info('Writing entries...')
        while self._dirty:
            # The loop condition guarantees the set is non-empty here.
            name = self._dirty.pop()
            try:
                self.schedule[name].save()
            except KeyError:
                # Deliberate best effort: presumably the dirty name no
                # longer has an entry in the schedule, so there is nothing
                # to save for it.
                pass
项目:celerybeat-sqlalchemy    作者:kindule    | 项目源码 | 文件源码
def update_from_dict(self, dict_):
        """Build entries from a ``{name: entry_kwargs}`` dict and merge them into the schedule.

        Each entry is created with ``self.Entry.from_entry``. An entry that
        raises is reported through ``error`` and left out; the others are
        still added.
        """
        loaded = {}
        for name, entry in dict_.items():
            try:
                model_entry = self.Entry.from_entry(name, self.session, **entry)
            except Exception as exc:
                error(ADD_ENTRY_ERROR, name, exc, entry)
            else:
                loaded[name] = model_entry
        self.schedule.update(loaded)
项目:celerybeat-sqlalchemy    作者:kindule    | 项目源码 | 文件源码
def install_default_entries(self, data):
        """Register the built-in entries through ``self.update_from_dict``.

        When ``app.conf.CELERY_TASK_RESULT_EXPIRES`` is truthy, a
        ``celery.backend_cleanup`` entry is included. ``data`` is accepted
        but not used.
        """
        defaults = {}
        if self.app.conf.CELERY_TASK_RESULT_EXPIRES:
            defaults['celery.backend_cleanup'] = {
                'task': 'celery.backend_cleanup',
                'schedule': schedules.crontab('*/5', '*', '*'),
                'options': {'expires': 12 * 3600},
            }
        self.update_from_dict(defaults)
项目:celerybeat-sqlalchemy    作者:kindule    | 项目源码 | 文件源码
def schedule(self):
        """Return the current schedule, rebuilding it first when needed.

        The schedule is rebuilt with ``self.all_as_schedule()`` on the first
        read and whenever ``self.schedule_changed()`` is true. A rebuild is
        preceded by ``self.sync()`` and followed by a debug listing of the
        new entries.
        """
        update = False
        if not self._initial_read:
            debug('DatabaseScheduler: initial read')
            update = True
            self._initial_read = True
        elif self.schedule_changed():
            info('DatabaseScheduler: Schedule changed.')
            update = True

        if update:
            self.sync()
            self._schedule = self.all_as_schedule()
            # ``values()`` works on both Python 2 and Python 3 dicts, whereas
            # ``itervalues()`` only exists on Python 2.
            debug('Current schedule:\n%s', '\n'.join(
                repr(entry) for entry in self._schedule.values()))
        return self._schedule
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def create_model_interval(self, schedule, **kwargs):
        """Save an ``IntervalSchedule`` built from ``schedule`` and create a model using it."""
        saved_interval = IntervalSchedule.from_schedule(schedule)
        saved_interval.save()
        return self.create_model(interval=saved_interval, **kwargs)
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def create_model_crontab(self, schedule, **kwargs):
        """Save a ``CrontabSchedule`` built from ``schedule`` and create a model using it."""
        saved_crontab = CrontabSchedule.from_schedule(schedule)
        saved_crontab.save()
        return self.create_model(crontab=saved_crontab, **kwargs)
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def create_model_solar(self, schedule, **kwargs):
        """Save a ``SolarSchedule`` built from ``schedule`` and create a model using it."""
        saved_solar = SolarSchedule.from_schedule(schedule)
        saved_solar.save()
        return self.create_model(solar=saved_solar, **kwargs)
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def create_conf_entry(self):
        """Return a ``(name, entry_dict)`` pair whose name and task each use a fresh id from ``_ids``."""
        # The name draws from ``_ids`` before the task does.
        name = 'thefoo{0}'.format(next(_ids))
        task_name = 'djcelery.unittest.add{0}'.format(next(_ids))
        entry = {
            'task': task_name,
            'schedule': timedelta(0, 600),
            'args': (),
            'relative': False,
            'kwargs': {},
            'options': {'queue': 'extra_queue'},
        }
        return name, entry
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def test_entry(self):
        """An entry mirrors its model's fields, and ``next()`` advances the run data."""
        model = self.create_model_interval(schedule(timedelta(seconds=10)))
        entry = self.Entry(model, app=self.app)

        assert entry.args == [2, 2]
        assert entry.kwargs == {'callback': 'foo'}
        assert entry.schedule
        assert entry.total_run_count == 0
        assert isinstance(entry.last_run_at, datetime)
        options = entry.options
        assert options['queue'] == 'xaz'
        assert options['exchange'] == 'foo'
        assert options['routing_key'] == 'cpu'

        # A model created with an explicit last_run_at keeps that object.
        right_now = self.app.now()
        model_with_run = self.create_model_interval(
            schedule(timedelta(seconds=10)),
            last_run_at=right_now,
        )

        assert model_with_run.last_run_at
        entry_with_run = self.Entry(model_with_run, app=self.app)
        assert entry_with_run.last_run_at is right_now

        following = entry_with_run.next()
        assert following.last_run_at > entry_with_run.last_run_at
        assert following.total_run_count == 1
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def test_periodic_task_model_disabled_schedule(self):
        """With m1 disabled, a new scheduler holds only the backend cleanup entry."""
        self.m1.enabled = False
        self.m1.save()

        scheduler = self.Scheduler(app=self.app)
        entries = scheduler.schedule
        assert entries
        assert len(entries) == 1
        assert 'celery.backend_cleanup' in entries
        assert self.entry_name not in entries
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def setup_scheduler(self, app):
        """Create four enabled models and one disabled model, then build the scheduler."""
        def persisted(model):
            # Save the model, then reload its field values.
            model.save()
            model.refresh_from_db()
            return model

        self.app = app
        self.app.conf.beat_schedule = {}

        self.m1 = persisted(self.create_model_interval(
            schedule(timedelta(seconds=10))))
        self.m2 = persisted(self.create_model_interval(
            schedule(timedelta(minutes=20))))
        self.m3 = persisted(self.create_model_crontab(
            crontab(minute='2,4,5')))
        self.m4 = persisted(self.create_model_solar(
            solar('solar_noon', 48.06, 12.86)))

        # disabled, should not be in schedule
        disabled = self.create_model_interval(
            schedule(timedelta(seconds=1)))
        disabled.enabled = False
        disabled.save()

        self.s = self.Scheduler(app=self.app)
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def test_all_as_schedule(self):
        """The schedule has five entries, all of the scheduler's Entry type."""
        entries = self.s.schedule
        assert entries
        assert len(entries) == 5
        assert 'celery.backend_cleanup' in entries
        for entry in entries.values():
            assert isinstance(entry, self.s.Entry)
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def test_reserve(self):
        """Reserving entries marks them dirty without triggering another flush."""
        e1 = self.s.schedule[self.m1.name]
        self.s.schedule[self.m1.name] = self.s.reserve(e1)
        assert self.s.flushed == 1

        # Reserving a second entry must not raise the flush count, and the
        # entry's name must be recorded in the dirty set.
        e2 = self.s.schedule[self.m2.name]
        self.s.schedule[self.m2.name] = self.s.reserve(e2)
        assert self.s.flushed == 1
        assert self.m2.name in self.s._dirty
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def test_periodic_task_disabled_and_enabled(self):
        """Disabling a task removes its entry; re-enabling it adds the entry back."""
        # Get the entry for m2
        e1 = self.s.schedule[self.m2.name]

        # Increment the entry (but make sure it doesn't sync)
        self.s._last_sync = monotonic()
        self.s.schedule[e1.name] = self.s.reserve(e1)
        assert self.s.flushed == 1

        # Fetch the raw object from db, disable it
        # and save the changes.
        m2 = PeriodicTask.objects.get(pk=self.m2.pk)
        m2.enabled = False
        m2.save()

        # get_schedule should now see the schedule has changed.
        # and remove entry for m2
        assert self.m2.name not in self.s.schedule
        assert self.s.flushed == 2

        m2.enabled = True
        m2.save()

        # get_schedule should now see the schedule has changed.
        # and add entry for m2
        assert self.m2.name in self.s.schedule
        assert self.s.flushed == 3
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def test_sync_rollback_on_save_error(self):
        """``sync()`` raises RuntimeError when a dirty entry is an ``EntrySaveRaises``."""
        failing_entry = EntrySaveRaises(self.m1, app=self.app)
        self.s.schedule[self.m1.name] = failing_entry
        self.s._dirty.add(self.m1.name)
        with pytest.raises(RuntimeError):
            self.s.sync()
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def test_PeriodicTask_unicode_interval(self):
        """The text form of an interval task is "<name>: every 10.0 seconds"."""
        task = self.create_model_interval(schedule(timedelta(seconds=10)))
        rendered = text_t(task)
        assert rendered == '{0}: every 10.0 seconds'.format(task.name)
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def test_PeriodicTask_unicode_no_schedule(self):
        """The text form of a task without a schedule ends in "{no schedule}"."""
        task = self.create_model()
        rendered = text_t(task)
        assert rendered == '{0}: {{no schedule}}'.format(task.name)
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def test_CrontabSchedule_schedule(self):
        """Comma-separated crontab fields parse to the expected sets; ``*`` covers every weekday."""
        model = CrontabSchedule(
            minute='3, 7',
            hour='3, 4',
            day_of_week='*',
            day_of_month='1, 16',
            month_of_year='1, 7',
        )
        expected = (
            ('minute', {3, 7}),
            ('hour', {3, 4}),
            ('day_of_week', {0, 1, 2, 3, 4, 5, 6}),
            ('day_of_month', {1, 16}),
            ('month_of_year', {1, 7}),
        )
        for field, values_set in expected:
            assert getattr(model.schedule, field) == values_set
项目:django-celery-beat    作者:celery    | 项目源码 | 文件源码
def test_track_changes(self):
        """``PeriodicTasks.last_change()`` starts as None and advances with each save."""
        assert PeriodicTasks.last_change() is None

        task = self.create_model_interval(schedule(timedelta(seconds=10)))
        task.save()
        first_change = PeriodicTasks.last_change()
        assert first_change

        task.args = '(23, 24)'
        task.save()
        second_change = PeriodicTasks.last_change()
        assert second_change
        assert second_change > first_change