Python freezegun 模块,freeze_time() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用freezegun.freeze_time()

项目:stregsystemet    作者:f-klubben    | 项目源码 | 文件源码
def test_multibuy_hint_two_buys_applicable(self):
        member = Member.objects.get(username="jokke")
        coke = Product.objects.create(
            name="coke",
            price=100,
            active=True
        )
        with freeze_time(timezone.datetime(2018, 1, 1)) as frozen_time:
            for i in range(1, 3):
                Sale.objects.create(
                    member=member,
                    product=coke,
                    price=100,
                )
                frozen_time.tick()
        give_multibuy_hint, sale_hints = stregsystem_views._multibuy_hint(timezone.datetime(2018, 1, 1, tzinfo=pytz.UTC), member)
        self.assertTrue(give_multibuy_hint)
        self.assertEqual(sale_hints, "{} {}:{}".format("jokke", coke.id, 2))
项目:stregsystemet    作者:f-klubben    | 项目源码 | 文件源码
def setUp(self):
        self.flan = Product.objects.create(name="FLan", price=1.0, active=True)
        self.flanmad = Product.objects.create(name="FLan mad", price=2.0, active=True)
        self.notflan = Product.objects.create(name="Ikke Flan", price=2.0, active=True)

        self.alan = Member.objects.create(username="tester", firstname="Alan", lastname="Alansen")
        self.bob = Member.objects.create(username="bob", firstname="bob", lastname="bob")

        with freeze_time('2017-02-02'):
            Sale.objects.create(member=self.alan, product=self.flan, price=1.0)

        with freeze_time('2017-02-15'):
            Sale.objects.create(member=self.alan, product=self.flan, price=1.0)

        with freeze_time('2017-02-07'):
            Sale.objects.create(member=self.alan, product=self.flanmad, price=1.0)

        with freeze_time('2017-02-05'):
            Sale.objects.create(member=self.alan, product=self.notflan, price=1.0)
项目:errcron    作者:attakei    | 项目源码 | 文件源码
def test_default_poller_interval_is_30_seconds(capsys):
    class ActivateImpl(MockedImpl):
        CRONTAB = [
            '0 0 * * * stub.print_datetime',
        ]

    plugin = ActivateImpl()
    plugin.activate()
    with freeze_time('2016-01-01 00:00:01'):
        plugin.poll_crontab()
        out, err = capsys.readouterr()
        assert out == '2016-01-01'
    with freeze_time('2016-01-01 00:00:31'):
        plugin.poll_crontab()
        out, err = capsys.readouterr()
        assert out == ''
项目:errcron    作者:attakei    | 项目源码 | 文件源码
def test_timezone_in_config(capsys):
    class MockConfig(object):
        TIMEZONE = 'Asia/Tokyo'

    class ActivateImpl(MockedImpl):
        CRONTAB = [
            '0 0 * * * .print_datetime',
        ]

        def print_datetime(self, polled_time):
            six.print_(polled_time.strftime('%Y-%m-%d'), end='')

    plugin = ActivateImpl()
    plugin.activate()
    setattr(plugin, 'bot_config', MockConfig())
    with freeze_time('2016-01-01 00:00:01'):
        plugin.poll_crontab()
        out, err = capsys.readouterr()
        assert out != '2016-01-01'
项目:errcron    作者:attakei    | 项目源码 | 文件源码
def test_default_poller_interval_is_30_seconds(capsys):
    class ActivateImpl(MockedImpl):
        CRONTAB = [
            '0 0 * * * stub.print_datetime',
        ]

    plugin = ActivateImpl()
    plugin.activate()
    with freeze_time('2016-01-01 00:00:01'):
        plugin.poll_crontab()
        out, err = capsys.readouterr()
        assert out == '2016-01-01'
    with freeze_time('2016-01-01 00:00:31'):
        plugin.poll_crontab()
        out, err = capsys.readouterr()
        assert out == ''
项目:errcron    作者:attakei    | 项目源码 | 文件源码
def test_timezone_in_plugin(capsys):
    class ActivateImpl(MockedImpl):
        TIMEZONE = 'Asia/Tokyo'
        CRONTAB = [
            '0 0 * * * .print_datetime',
        ]

        def activate(self):
            self.activate_crontab()

        def print_datetime(self, polled_time):
            six.print_(polled_time.strftime('%Y-%m-%d'), end='')

    plugin = ActivateImpl()
    plugin.activate()
    with freeze_time('2016-01-01 00:00:01'):
        plugin.poll_crontab()
        out, err = capsys.readouterr()
        assert out != '2016-01-01'
项目:errcron    作者:attakei    | 项目源码 | 文件源码
def test_timezone_in_config(capsys):
    class MockConfig(object):
        TIMEZONE = 'Asia/Tokyo'

    class ActivateImpl(MockedImpl):
        CRONTAB = [
            '0 0 * * * .print_datetime',
        ]

        def print_datetime(self, polled_time):
            six.print_(polled_time.strftime('%Y-%m-%d'), end='')

    plugin = ActivateImpl()
    plugin.activate()
    setattr(plugin, 'bot_config', MockConfig())
    with freeze_time('2016-01-01 00:00:01'):
        plugin.poll_crontab()
        out, err = capsys.readouterr()
        assert out != '2016-01-01'
项目:parsec-cloud    作者:Scille    | 项目源码 | 文件源码
def test_perform_block_delete(app):
    content = 'foo'
    eff = app.perform_block_create(EBlockCreate(content))
    block_id = perform_sequence([], eff)
    # Delete from new blocks
    with freeze_time('2012-01-01') as frozen_datetime:
        eff = app.perform_block_delete(EBlockDelete(block_id))
        perform_sequence([], eff)
        assert app.last_modified == Arrow.fromdatetime(frozen_datetime())
    # Delete in cache
    app.block_cache[block_id] = {'foo': 'bar'}
    assert app.block_cache.currsize == 1
    with freeze_time('2012-01-01') as frozen_datetime:
        eff = app.perform_block_delete(EBlockDelete(block_id))
        perform_sequence([], eff)
        assert app.last_modified == Arrow.fromdatetime(frozen_datetime())
    assert app.block_cache.currsize == 0
    # Not found
    with pytest.raises(BlockNotFound):
        eff = app.perform_block_delete(EBlockDelete(block_id))
        perform_sequence([], eff)
项目:parsec-cloud    作者:Scille    | 项目源码 | 文件源码
def test_perform_user_vlob_delete(app):
    eff = app.perform_user_vlob_update(EUserVlobUpdate(1, 'foo'))
    perform_sequence([], eff)
    # Delete from new user vlobs
    with freeze_time('2012-01-01') as frozen_datetime:
        eff = app.perform_user_vlob_delete(EUserVlobDelete())
        perform_sequence([], eff)
        assert app.last_modified == Arrow.fromdatetime(frozen_datetime())
    # Delete in cache
    app.user_vlob_cache[2] = {'foo': 'bar'}
    assert app.user_vlob_cache.currsize == 1
    with freeze_time('2012-01-01') as frozen_datetime:
        eff = app.perform_user_vlob_delete(EUserVlobDelete(2))
        perform_sequence([], eff)
        assert app.last_modified == Arrow.fromdatetime(frozen_datetime())
    assert app.user_vlob_cache.currsize == 0
    # Not found
    with pytest.raises(UserVlobNotFound):
        eff = app.perform_user_vlob_delete(EUserVlobDelete(2))
        perform_sequence([], eff)
项目:S4    作者:MichaelAquilina    | 项目源码 | 文件源码
def test_get(self, s3_client):
        # given
        data = b'#000000'
        frozen_time = datetime.datetime(2016, 10, 23, 10, 30, tzinfo=datetime.timezone.utc)
        with freezegun.freeze_time(frozen_time):
            s3_client.boto.put_object(
                Bucket=s3_client.bucket,
                Key=os.path.join(s3_client.prefix, 'black.color'),
                Body=data,
            )

        # when
        output_object = s3_client.get('black.color')

        # then
        assert output_object.fp.read() == data
        assert output_object.total_size == len(data)
        assert output_object.timestamp == to_timestamp(frozen_time)
项目:pony-standup-bot    作者:alexanderad    | 项目源码 | 文件源码
def test_execute_report_summary_already_sent(self):
        with freezegun.freeze_time('2016-12-23 11:00'):
            self.task.execute(self.bot, self.slack)

            task = self.bot.fast_queue.pop()
            self.assertIsInstance(task, pony.tasks.AskStatus)
            self.assertListEqual(task.teams, ['dev_team1'])
            self.assertEqual(task.user_id, '_sasha_id')

            # mark day as already reported
            self.bot.storage.set('report', {
                date(2016, 12, 23): {
                    'dev_team1': {
                        'reported_at': datetime.utcnow(),
                        'reports': {}
                    }
                }
            })

            self.task.execute(self.bot, self.slack)
            with self.assertRaises(IndexError):
                self.bot.fast_queue.pop()
项目:pony-standup-bot    作者:alexanderad    | 项目源码 | 文件源码
def test_execute_day_is_holiday(self):
        with freezegun.freeze_time('2016-12-01 11:00'):
            self.task.execute(self.bot, self.slack)

            task = self.bot.fast_queue.pop()
            self.assertIsInstance(task, pony.tasks.SendMessage)
            self.assertEqual(task.to, '#dev-team')
            self.assertIn('No Standup Today', task.text)
            self.assertIn('Romanian National Day', task.text)

            # this is sent only once (report is marked reported)
            self.task.execute(self.bot, self.slack)
            with self.assertRaises(IndexError):
                self.bot.fast_queue.pop()

            self.assertIsNotNone(
                self.bot.storage.get('report')[
                    date(2016, 12, 1)
                ]['dev_team1'].get('reported_at')
            )
项目:vws-python    作者:adamtheturtle    | 项目源码 | 文件源码
def test_rfc_1123_date(self) -> None:
        """
        ``rfc_1123_date`` returns the date formatted as required by Vuforia.
        This test matches the example date set at
        `<https://library.vuforia.com/articles/Training/Using-the-VWS-API>`_.
        """
        date = datetime.datetime(
            day=22,
            month=4,
            year=2012,
            hour=8,
            minute=49,
            second=37,
        )
        with freeze_time(date):
            assert rfc_1123_date() == 'Sun, 22 Apr 2012 08:49:37 GMT'
项目:TopChef    作者:TopChef    | 项目源码 | 文件源码
def test_has_timed_out_true(
            self, timeout: timedelta, time_to_wait: timedelta
    ) -> None:
        """

        :param timeout: The timeout to set and wait for
        :param time_to_wait: The time to wait on top of the timeout,
             in order to ensure that the server timed out
        :return:
        """
        self.service.timeout = timeout
        self.service.check_in()

        assume(
            self._target_time_in_range(
                datetime.utcnow(), timeout, time_to_wait
            )
        )

        time_to_freeze_to = datetime.utcnow() + timeout + time_to_wait

        with freeze_time(time_to_freeze_to):  # ZA WARUDO!
            self.assertTrue(self.service.has_timed_out)
项目:TopChef    作者:TopChef    | 项目源码 | 文件源码
def test_has_timed_out_false(
            self, timeout: timedelta, time_to_wait: timedelta
    ) -> None:
        self.service.timeout = timeout
        self.service.check_in()

        assume(
            self._target_time_in_range(
                datetime.utcnow(), timeout, time_to_wait
            )
        )

        with freeze_time(
            datetime.utcnow() + timeout + time_to_wait
        ):  # ZA WARUDO!
            self.assertFalse(self.service.has_timed_out)
项目:skills-ml    作者:workforce-data-initiative    | 项目源码 | 文件源码
def test_dataset_stats_counter_empty():
    counter = DatasetStatsCounter(quarter='2013Q1', dataset_id='VA')
    with moto.mock_s3():
        with freeze_time('2017-01-10'):
            s3_conn = boto.connect_s3()
            s3_conn.create_bucket('test-bucket')
            counter.save(s3_conn, 'test-bucket/stats')

            key = s3_conn.get_bucket('test-bucket')\
                .get_key('stats/quarterly/VA_2013Q1')

        expected_stats = {
            'total': 0,
            'output_counts': {},
            'input_counts': {},
            'output_percentages': {},
            'input_percentages': {},
            'last_updated': '2017-01-10T00:00:00',
            'quarter': '2013Q1',
        }
        assert json.loads(key.get_contents_as_string().decode('utf-8')) == expected_stats
项目:skills-ml    作者:workforce-data-initiative    | 项目源码 | 文件源码
def test_dataset_stats_aggregator():
    with moto.mock_s3():
        s3_conn = boto.connect_s3()
        aggregator = DatasetStatsAggregator(dataset_id='CB', s3_conn=s3_conn)

        add_s3_content(
            s3_conn,
            {
                'test-bucket/stats/quarterly/CB_2014Q1':
                    json.dumps(sample_quarter_stats('2014Q1')),
                'test-bucket/stats/quarterly/CB_2014Q2':
                    json.dumps(sample_quarter_stats('2014Q2')),
                'test-bucket/stats/quarterly/VA_2014Q1':
                    json.dumps(sample_quarter_stats('2014Q1')),
            }
        )

        with freeze_time('2017-01-10'):
            aggregator.run('test-bucket/stats')

        expected_stats = sample_dataset_stats()
        key = s3_conn.get_bucket('test-bucket')\
            .get_key('stats/dataset_summaries/CB.json')
        assert json.loads(key.get_contents_as_string().decode('utf-8')) == expected_stats
项目:django-config-models    作者:edx    | 项目源码 | 文件源码
def test_key_values(self, mock_cache):
        mock_cache.get.return_value = None

        with freeze_time('2012-01-01'):
            ExampleKeyedConfig(left='left_a', right='right_a', changed_by=self.user).save()
            ExampleKeyedConfig(left='left_b', right='right_b', changed_by=self.user).save()

        ExampleKeyedConfig(left='left_a', right='right_a', changed_by=self.user).save()
        ExampleKeyedConfig(left='left_b', right='right_b', changed_by=self.user).save()

        unique_key_pairs = ExampleKeyedConfig.key_values()
        self.assertEquals(len(unique_key_pairs), 2)
        self.assertEquals(set(unique_key_pairs), set([('left_a', 'right_a'), ('left_b', 'right_b')]))
        unique_left_keys = ExampleKeyedConfig.key_values('left', flat=True)
        self.assertEquals(len(unique_left_keys), 2)
        self.assertEquals(set(unique_left_keys), set(['left_a', 'left_b']))
项目:django-config-models    作者:edx    | 项目源码 | 文件源码
def test_current_set(self, mock_cache):
        mock_cache.get.return_value = None

        with freeze_time('2012-01-01'):
            ExampleKeyedConfig(left='left_a', right='right_a', int_field=0, changed_by=self.user).save()
            ExampleKeyedConfig(left='left_b', right='right_b', int_field=0, changed_by=self.user).save()

        ExampleKeyedConfig(left='left_a', right='right_a', int_field=1, changed_by=self.user).save()
        ExampleKeyedConfig(left='left_b', right='right_b', int_field=2, changed_by=self.user).save()

        queryset = ExampleKeyedConfig.objects.current_set()
        self.assertEqual(len(queryset.all()), 2)
        self.assertEqual(
            set(queryset.order_by('int_field').values_list('int_field', flat=True)),
            set([1, 2])
        )
项目:adhocracy4    作者:liqd    | 项目源码 | 文件源码
def test_action_create(question_factory):
    question = question_factory()
    project = question.module.project
    now = timezone.now()
    with freeze_time(now):
        action = Action(
            actor=question.creator,
            verb=Verbs.ADD.value,
            obj=question,
            target=project,
            description='description'
        )
    assert action.actor == question.creator
    assert action.verb == Verbs.ADD.value
    assert action.obj == question
    assert action.target == project
    assert action.timestamp == now
    assert action.public is True
    assert action.description == 'description'
项目:adhocracy4    作者:liqd    | 项目源码 | 文件源码
def test_phase_end_tomorrow(phase_factory):

    phase = phase_factory(
        start_date=parse('2013-01-01 17:00:00 UTC'),
        end_date=parse('2013-01-01 18:00:00 UTC')
    )

    project = phase.module.project

    action_count = Action.objects.filter(verb=SCHEDULE).count()
    assert action_count == 0

    with freeze_time('2013-01-01 17:30:00 UTC'):
        call_command('create_system_actions')
        action_count = Action.objects.filter(verb=SCHEDULE).count()
        action = Action.objects.filter(verb=SCHEDULE).last()
        assert action_count == 1
        assert action.obj == phase
        assert action.verb == SCHEDULE
        assert action.project == project
项目:adhocracy4    作者:liqd    | 项目源码 | 文件源码
def test_project_starts_later_or_earlier(phase_factory):

    phase = phase_factory(
        start_date=parse('2013-01-01 17:00:00 UTC'),
        end_date=parse('2013-01-02 18:00:00 UTC')
    )

    with freeze_time(phase.start_date - timedelta(days=1)):
        call_command('create_system_actions')
        action_count = Action.objects.filter(verb=START).count()
        assert action_count == 0

    with freeze_time(phase.start_date + timedelta(days=1)):
        call_command('create_system_actions')
        action_count = Action.objects.filter(verb=START).count()
        assert action_count == 0
项目:adhocracy4    作者:liqd    | 项目源码 | 文件源码
def test_project_start_single_action(phase_factory):

    phase = phase_factory(
        start_date=parse('2013-01-01 17:00:00 UTC'),
        end_date=parse('2013-01-01 18:00:00 UTC')
    )

    action_count = Action.objects.filter(verb=START).count()
    assert action_count == 0

    with freeze_time(phase.start_date + timedelta(minutes=30)):
        call_command('create_system_actions')
        action_count = Action.objects.filter(verb=START).count()
        assert action_count == 1

    # first phase starts within the last hour but script has already run
    with freeze_time(phase.start_date + timedelta(minutes=45)):
        call_command('create_system_actions')
        action_count = Action.objects.filter(verb=START).count()
        assert action_count == 1
项目:adhocracy4    作者:liqd    | 项目源码 | 文件源码
def test_project_start_reschedule(phase_factory):

    phase = phase_factory(
        start_date=parse('2013-01-01 17:00:00 UTC'),
        end_date=parse('2013-01-01 18:00:00 UTC')
    )

    # first phase starts within an hour
    with freeze_time(phase.start_date + timedelta(minutes=30)):
        call_command('create_system_actions')
        action_count = Action.objects.filter(verb=START).count()
        assert action_count == 1

    # first phases start date has been moved forward
    # and the start actions timestamp has to be adapted
    phase.start_date = phase.start_date + timedelta(days=1)
    phase.save()
    with freeze_time(phase.start_date + timedelta(minutes=30)):
        call_command('create_system_actions')
        action_count = Action.objects.filter(verb=START).count()
        assert action_count == 1
        action = Action.objects.filter(verb=START).first()
        assert action.timestamp == phase.start_date
项目:adhocracy4    作者:liqd    | 项目源码 | 文件源码
def test_days_left(project, phase_factory):
    phase1 = phase_factory(
        start_date=parse('2013-01-01 18:00:00 UTC'),
        end_date=parse('2013-01-02 18:00:00 UTC'),
        module__project=project,
    )
    phase2 = phase_factory(
        start_date=parse('2013-02-01 18:00:00 UTC'),
        end_date=parse('2013-02-02 18:00:00 UTC'),
        module__project=project,
    )

    with freeze_time(phase1.start_date):
        assert project.days_left is 1
    with freeze_time(phase1.end_date):
        assert project.days_left is None
    with freeze_time(phase2.start_date):
        assert project.days_left is 1
    with freeze_time(phase2.end_date):
        assert project.days_left is None
项目:adhocracy4    作者:liqd    | 项目源码 | 文件源码
def test_react_rating_anonymous(rf, question, comment):
    with freeze_time('2013-01-02 18:00:00 UTC'):
        user = AnonymousUser()
        props = react_comment_render_for_props(rf, user, question)
        comments_content_type = ContentType.objects.get_for_model(comment)
        request = rf.get('/')
        request.user = user

        comments = ThreadSerializer(
            question.comments.all().order_by('-created'),
            many=True, context={'request': request}).data

        assert props == {
            'comments': comments,
            'comments_contenttype': comments_content_type.pk,
            'isAuthenticated': False,
            'isModerator': False,
            'isReadOnly': True,
            'user_name': '',
        }
项目:adhocracy4    作者:liqd    | 项目源码 | 文件源码
def test_react_rating_user(rf, user, phase, question, comment):
    with freeze_time('2013-01-02 18:00:00 UTC'):
        props = react_comment_render_for_props(rf, user, question)
        comments_content_type = ContentType.objects.get_for_model(comment)
        request = rf.get('/')
        request.user = user

        comments = ThreadSerializer(
            question.comments.all().order_by('-created'),
            many=True, context={'request': request}).data

        assert props == {
            'comments': comments,
            'comments_contenttype': comments_content_type.pk,
            'isAuthenticated': True,
            'isModerator': False,
            'isReadOnly': False,
            'user_name': user.username,
        }
项目:adhocracy4    作者:liqd    | 项目源码 | 文件源码
def test_manager_finished_phases(phase_factory):
    old_phase = phase_factory(
        start_date=parse('2013-01-01 17:00:00 UTC'),
        end_date=parse('2013-01-01 18:00:00 UTC')
    )

    new_phase = phase_factory(
        start_date=parse('2013-01-01 18:00:00 UTC'),
        end_date=parse('2013-01-01 19:00:00 UTC')
    )

    with freeze_time(new_phase.start_date):
        finished_phases = models.Phase.objects.finished_phases()
        assert list(finished_phases) == [old_phase]

    with freeze_time(new_phase.end_date):
        finished_phases = models.Phase.objects.finished_phases()
        assert list(finished_phases) == [old_phase, new_phase]
项目:adhocracy4    作者:liqd    | 项目源码 | 文件源码
def test_manager_finish_next(phase_factory):

    phase_today = phase_factory(
        start_date=parse('2013-01-01 17:00:00 UTC'),
        end_date=parse('2013-01-01 17:00:01 UTC')
    )

    phase_tomorrow = phase_factory(
        start_date=parse('2013-01-01 17:00:00 UTC'),
        end_date=parse('2013-01-02 17:00:00 UTC')
    )

    phase_factory(
        start_date=parse('2013-01-01 17:00:00 UTC'),
        end_date=parse('2013-01-02 17:00:03 UTC')
    )

    with freeze_time(phase_today.start_date):
        finish_phases = models.Phase.objects.finish_next()
        assert list(finish_phases) == [phase_today, phase_tomorrow]

    with freeze_time(phase_today.end_date):
        finish_phases = models.Phase.objects.finish_next()
        assert list(finish_phases) == [phase_tomorrow]
项目:adhocracy4    作者:liqd    | 项目源码 | 文件源码
def test_past_phases(phase_factory):
    phase1 = phase_factory(
        start_date=parse('2013-01-01 18:00:00 UTC'),
        end_date=parse('2013-01-10 18:00:00 UTC'),
    )
    phase2 = phase_factory(
        start_date=parse('2013-01-05 18:00:00 UTC'),
        end_date=parse('2013-01-15 18:00:00 UTC'),
    )

    with freeze_time(phase1.start_date):
        assert list(models.Phase.objects.past_phases()) == []
    with freeze_time(phase1.end_date):
        assert list(models.Phase.objects.past_phases()) == [phase1]
    with freeze_time(phase2.end_date):
        assert list(models.Phase.objects.past_phases()) == [phase1, phase2]
项目:adhocracy4    作者:liqd    | 项目源码 | 文件源码
def test_future_phases(phase_factory):
    phase1 = phase_factory(
        start_date=parse('2013-01-01 18:00:00 UTC'),
        end_date=parse('2013-01-10 18:00:00 UTC'),
    )
    phase2 = phase_factory(
        start_date=parse('2013-01-05 18:00:00 UTC'),
        end_date=parse('2013-01-15 18:00:00 UTC'),
    )
    phase3 = phase_factory(
        start_date=None,
        end_date=None
    )

    with freeze_time(phase1.start_date - timedelta(minutes=1)):
        assert (list(models.Phase.objects.future_phases())
                == [phase3, phase1, phase2])
    with freeze_time(phase2.start_date - timedelta(minutes=1)):
        assert list(models.Phase.objects.future_phases()) == [phase3, phase2]
    with freeze_time(phase2.end_date):
        assert list(models.Phase.objects.future_phases()) == [phase3]
项目:adhocracy4    作者:liqd    | 项目源码 | 文件源码
def test_past_and_active_phases(phase_factory):

    phase1 = phase_factory(
        start_date=parse('2013-01-01 18:00:00 UTC'),
        end_date=parse('2013-01-10 18:00:00 UTC'),
    )
    phase2 = phase_factory(
        start_date=parse('2013-01-05 18:00:00 UTC'),
        end_date=parse('2013-01-15 18:00:00 UTC'),
    )
    phase_factory(
        start_date=None,
        end_date=None
    )

    with freeze_time(phase1.start_date - timedelta(minutes=1)):
        assert list(models.Phase.objects.past_and_active_phases()) == []
    with freeze_time(phase1.start_date):
        assert list(models.Phase.objects.past_and_active_phases()) == [phase1]
    with freeze_time(phase2.start_date):
        assert (list(models.Phase.objects.past_and_active_phases())
                == [phase1, phase2])
    with freeze_time(phase2.end_date):
        assert (list(models.Phase.objects.past_and_active_phases())
                == [phase1, phase2])
项目:uclapi    作者:uclapi    | 项目源码 | 文件源码
def test_seconds_until_midnight(self):
        arg_list = [
            "2017-05-29 23:59:59",
            "2017-05-29 00:00:00",
            "2017-05-29 00:00:01"
        ]

        expected = [
            1,
            0,
            86399
        ]

        for idx, arg in enumerate(arg_list):
            with freeze_time(arg):
                self.assertEqual(
                    how_many_seconds_until_midnight(),
                    expected[idx]
                )
项目:shifthelper    作者:fact-project    | 项目源码 | 文件源码
def test_NightlyResettingDefaultdict():
    from shifthelper.tools import NightlyResettingDefaultdict

    initial_datetime = datetime.datetime(2016, 1, 1)
    two_days_later = datetime.datetime(2016, 1, 3, 0, 0, 0)

    with freeze_time(initial_datetime) as frozen_datetime:
        nightly_max_rate = NightlyResettingDefaultdict(lambda: -np.inf)
        nightly_max_rate['foo'] = 5

        assert nightly_max_rate['foo'] == 5
        frozen_datetime.move_to(two_days_later)

        assert nightly_max_rate['foo'] == -np.inf
        nightly_max_rate['foo'] = 6
        assert nightly_max_rate['foo'] == 6
项目:weixin_server    作者:duoduo369    | 项目源码 | 文件源码
def test_key_values(self, mock_cache):
        mock_cache.get.return_value = None

        with freeze_time('2012-01-01'):
            ExampleKeyedConfig(left='left_a', right='right_a', changed_by=self.user).save()
            ExampleKeyedConfig(left='left_b', right='right_b', changed_by=self.user).save()

        ExampleKeyedConfig(left='left_a', right='right_a', changed_by=self.user).save()
        ExampleKeyedConfig(left='left_b', right='right_b', changed_by=self.user).save()

        unique_key_pairs = ExampleKeyedConfig.key_values()
        self.assertEquals(len(unique_key_pairs), 2)
        self.assertEquals(set(unique_key_pairs), set([('left_a', 'right_a'), ('left_b', 'right_b')]))
        unique_left_keys = ExampleKeyedConfig.key_values('left', flat=True)
        self.assertEquals(len(unique_left_keys), 2)
        self.assertEquals(set(unique_left_keys), set(['left_a', 'left_b']))
项目:weixin_server    作者:duoduo369    | 项目源码 | 文件源码
def test_current_set(self, mock_cache):
        mock_cache.get.return_value = None

        with freeze_time('2012-01-01'):
            ExampleKeyedConfig(left='left_a', right='right_a', int_field=0, changed_by=self.user).save()
            ExampleKeyedConfig(left='left_b', right='right_b', int_field=0, changed_by=self.user).save()

        ExampleKeyedConfig(left='left_a', right='right_a', int_field=1, changed_by=self.user).save()
        ExampleKeyedConfig(left='left_b', right='right_b', int_field=2, changed_by=self.user).save()

        queryset = ExampleKeyedConfig.objects.current_set()
        self.assertEqual(len(queryset.all()), 2)
        self.assertEqual(
            set(queryset.order_by('int_field').values_list('int_field', flat=True)),
            set([1, 2])
        )
项目:a4-opin    作者:liqd    | 项目源码 | 文件源码
def test_list_view(rf, phase, module_factory, idea_factory):
    module = phase.module
    project = module.project
    idea = idea_factory(module=module)
    other_module = module_factory()
    other_idea = idea_factory(module=other_module)

    with freeze_time(phase.start_date):
        view = views.IdeaListView.as_view()
        request = rf.get('/ideas')
        response = view(request, project=project, module=module)

        assert idea in response.context_data['idea_list']
        assert other_idea not in response.context_data['idea_list']
        assert response.context_data['idea_list'][0].comment_count == 0
        assert response.context_data['idea_list'][0].positive_rating_count == 0
        assert response.context_data['idea_list'][0].negative_rating_count == 0
项目:a4-opin    作者:liqd    | 项目源码 | 文件源码
def test_create_view(client, phase, user):
    module = phase.module
    with freeze_time(phase.start_date):
        count = models.Idea.objects.all().count()
        assert count == 0
        url = reverse('idea-create', kwargs={'slug': module.slug})
        response = client.get(url)
        assert response.status_code == 302
        assert redirect_target(response) == 'account_login'
        client.login(username=user.email, password='password')
        response = client.get(url)
        assert response.status_code == 200
        idea = {'name': 'Idea', 'description': 'description'}
        response = client.post(url, idea)
        assert response.status_code == 302
        assert redirect_target(response) == 'idea-detail'
        count = models.Idea.objects.all().count()
        assert count == 1
项目:a4-opin    作者:liqd    | 项目源码 | 文件源码
def test_update_view(client, phase, idea):
    idea.module = phase.module
    idea.save()
    user = idea.creator
    with freeze_time(phase.start_date):
        url = reverse('idea-update', kwargs={'slug': idea.slug})
        response = client.get(url)
        assert response.status_code == 302
        client.login(username=user.email, password='password')
        response = client.get(url)
        assert response.status_code == 200
        data = {'description': 'description', 'name': idea.name}
        response = client.post(url, data)
        id = idea.pk
        updated_idea = models.Idea.objects.get(id=id)
        assert updated_idea.description == 'description'
        assert response.status_code == 302
项目:a4-opin    作者:liqd    | 项目源码 | 文件源码
def test_ideas_download_contains_right_data(rf, idea_factory, admin):
    idea = idea_factory()
    module = idea.module
    idea_factory(module=module)
    idea_factory(module=module)

    now = timezone.now()
    with freeze_time(now):
        request = rf.get('/ideas/download/module/{}'.format(module.slug))
        request.user = admin
        response = views.IdeaDownloadView.as_view()(request, slug=module.slug)
        assert response.status_code == 200
        assert (response._headers['content-type'] ==
                ('Content-Type',
                'application/vnd.openxmlformats-officedocument'
                    '.spreadsheetml.sheet'))
        assert (response._headers['content-disposition'] ==
                ('Content-Disposition',
                 'attachment; filename="{}_{}.xlsx"'
                 .format(
                     module.project.slug,
                     now.strftime('%Y%m%dT%H%M%S'))))
项目:a4-opin    作者:liqd    | 项目源码 | 文件源码
def test_phase_dispatch_mixin_return_active_phase(
    rf,
    project_detail_view,
    phase1,
    phase2
):
    project = phase1.module.project
    project_url = reverse('project-detail', args=[project.slug])

    with freeze_time(phase1.start_date):
        # Requesting garbage should return the currently active phase.
        request = rf.get("{0}?phase={1}".format(project_url, "A"*100))
        response = project_detail_view(request, slug=project.slug)
        assert FakePhase0View.template_name in response.template_name
        assert FakePhase1View.template_name not in response.template_name

        # Without any further specification via '?phase=' return the
        # active phase.
        request = rf.get(project_url)
        response = project_detail_view(request, slug=project.slug)
        assert FakePhase0View.template_name in response.template_name
        assert FakePhase1View.template_name not in response.template_name
项目:PyAPNs2    作者:Pr0Ger    | 项目源码 | 文件源码
def setUp(self):
        # Create an 'ephemeral' token so we can test token timeouts. We
        # want a timeout long enough to last the test, but we don't want to
        # slow down the tests too much either.
        self.normal_creds = TokenCredentials(self.key_path, self.key_id,
                                             self.team_id)
        self.lasting_header = self.normal_creds.get_authorization_header(
            self.topics[0])

        with freeze_time('2012-01-14'):
            self.expiring_creds = \
                TokenCredentials(self.key_path, self.key_id,
                                 self.team_id,
                                 token_lifetime=self.token_lifetime)
            self.expiring_header = self.expiring_creds.get_authorization_header(
                self.topics[0])
项目:hakoblog-python    作者:hakobe    | 项目源码 | 文件源码
def test_find_entries():
    "EntryLoader.find_entries can find entries for the specified blog"
    db = DB()
    blog = create_blog()

    entries = []
    with freeze_time('2017-01-13 12:00:02'):
        entries.append(create_entry(blog=blog))
    with freeze_time('2017-01-13 12:00:01'):
        entries.append(create_entry(blog=blog))
    with freeze_time('2017-01-13 12:00:00'):
        entries.append(create_entry(blog=blog))

    found_entries = EntryLoader.find_entries(db, blog.id)
    assert len(found_entries) == len(entries)
    assert [e.id for e in found_entries] == [e.id for e in entries]

    found_entries_with_limit = EntryLoader.find_entries(db, blog.id, limit=2)
    assert len(found_entries_with_limit) == len(entries[0:2])
    assert [
        e.id for e in found_entries_with_limit
    ] == [e.id for e in entries[0:2]]
项目:hakoblog-python    作者:hakobe    | 项目源码 | 文件源码
def test_index_with_entries():
    db = DB()
    with global_user(random_string(5)):
        blog = BlogAction.ensure_global_blog_created(db)

        entries = []
        with freeze_time('2017-01-13 12:00:02'):
            entries.append(create_entry(blog=blog))
        with freeze_time('2017-01-13 12:00:01'):
            entries.append(create_entry(blog=blog))
        with freeze_time('2017-01-13 12:00:00'):
            entries.append(create_entry(blog=blog))

        res = web_client().get('/')
        assert res.status == '200 OK'

        d = pq(res.data)

        assert [
            int(d(a).attr('data-entry-id')) for a in d('.entry')
        ] == [e.id for e in entries]
项目:pyaptly    作者:adfinis-sygroup    | 项目源码 | 文件源码
def test_publish_update_rotating():
    """Test if update rotating publishes works."""
    with test.clean_and_config(os.path.join(
            _test_base,
            b"publish-current.yml",
    )) as (tyml, config):
        do_publish_create_rotating(config)
        with freezegun.freeze_time("2012-10-11 10:10:10"):
            args = [
                '-c',
                config,
                'publish',
                'update',
            ]
            main(args)
            state = SystemStateReader()
            state.read()
            expect = {
                u'fake/current stable': set([u'fake-current']),
                u'fakerepo01/current stable': set([u'fakerepo01-current']),
                u'fakerepo02/current stable': set([u'fakerepo02-current'])
            }
            assert expect == state.publish_map
项目:pyaptly    作者:adfinis-sygroup    | 项目源码 | 文件源码
def test_publish_snapshot_update_rotating():
    """Test if update rotating publishes via snapshot works."""
    with test.clean_and_config(os.path.join(
            _test_base,
            b"publish-current.yml",
    )) as (tyml, config):
        do_publish_create_rotating(config)
        with freezegun.freeze_time("2012-10-11 10:10:10"):
            args = [
                '-c',
                config,
                'snapshot',
                'update',
            ]
            main(args)
            state = SystemStateReader()
            state.read()
            expect = {
                u'fake/current stable': set([u'fake-current']),
                u'fakerepo01/current stable': set([u'fakerepo01-current']),
                u'fakerepo02/current stable': set([u'fakerepo02-current'])
            }
            assert expect == state.publish_map
项目:alexa-voice-service-client    作者:richtier    | 项目源码 | 文件源码
def test_retrieve_api_token_expiring_memo(token_retrieve_200):
    authenticator = AlexaVoiceServiceTokenAuthenticator(
        client_id='debug', secret='debug', refresh_token='debug'
    )

    with freeze_time('3012-01-14 12:00:00'):
        authenticator.retrieve_api_token()
        authenticator.retrieve_api_token()
    assert len(token_retrieve_200.request_history) == 1

    with freeze_time('3012-01-14 13:00:00'):
        authenticator.retrieve_api_token()
    assert len(token_retrieve_200.request_history) == 2

    with freeze_time('3012-01-14 13:30:00'):
        authenticator.retrieve_api_token()
    assert len(token_retrieve_200.request_history) == 2

    with freeze_time('3012-01-14 14:00:00'):
        authenticator.retrieve_api_token()
    assert len(token_retrieve_200.request_history) == 3
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_edited_is_false_for_newly_created_content_within_15_minutes_grace_period(self):
        with freeze_time(self.public_content.created + datetime.timedelta(minutes=14)):
            self.public_content.save()
            self.assertFalse(self.public_content.edited)
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_edited_is_true_for_newly_created_content_after_15_minutes_grace_period(self):
        with freeze_time(self.public_content.created + datetime.timedelta(minutes=16)):
            self.public_content.save()
            self.assertTrue(self.public_content.edited)
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_dict_for_view_edited_post(self):
        with freeze_time(self.public_content.created + datetime.timedelta(minutes=16)):
            self.public_content.save()
            self.assertEqual(self.public_content.dict_for_view(self.user), {
                "author": self.public_content.author_id,
                "author_guid": self.public_content.author.guid,
                "author_handle": self.public_content.author.handle,
                "author_home_url": self.public_content.author.home_url,
                "author_image": self.public_content.author.safer_image_url_small,
                "author_is_local": bool(self.public_content.author.user),
                "author_name": self.public_content.author.handle,
                "author_profile_url": self.public_content.author.get_absolute_url(),
                "content_type": self.public_content.content_type.string_value,
                "delete_url": reverse("content:delete", kwargs={"pk": self.public_content.id}),
                "detail_url": self.public_content.get_absolute_url(),
                "formatted_timestamp": self.public_content.timestamp,
                "guid": self.public_content.guid,
                "has_shared": False,
                "humanized_timestamp": "%s (edited)" % self.public_content.humanized_timestamp,
                "id": self.public_content.id,
                "is_authenticated": True,
                "is_author": True,
                "is_following_author": False,
                "parent": "",
                "profile_id": self.public_content.author.id,
                "rendered": self.public_content.rendered,
                "reply_count": 0,
                "reply_url": reverse("content:reply", kwargs={"pk": self.public_content.id}),
                "shares_count": 0,
                "slug": self.public_content.slug,
                "through": self.public_content.id,
                "update_url": reverse("content:update", kwargs={"pk": self.public_content.id}),
            })