The following code examples, extracted from open-source Python projects, illustrate how to use freezegun.freeze_time().
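Before the project-specific examples, here is a minimal, self-contained sketch (not taken from any of the listed projects) of the freeze_time patterns these tests rely on: the context-manager form, and the frozen-clock handle with tick() and move_to().

import datetime
from freezegun import freeze_time

# Freeze the clock at a fixed instant; datetime.now()/utcnow() return it.
with freeze_time('2017-01-10 12:00:00') as frozen_clock:
    assert datetime.datetime.utcnow() == datetime.datetime(2017, 1, 10, 12, 0, 0)

    # Advance the frozen clock explicitly, as several tests below do.
    frozen_clock.tick(delta=datetime.timedelta(seconds=1))
    assert datetime.datetime.utcnow().second == 1

    frozen_clock.move_to('2017-01-11')
    assert datetime.datetime.utcnow().date() == datetime.date(2017, 1, 11)
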
def test_multibuy_hint_two_buys_applicable(self):
    member = Member.objects.get(username="jokke")
    coke = Product.objects.create(
        name="coke",
        price=100,
        active=True
    )
    with freeze_time(timezone.datetime(2018, 1, 1)) as frozen_time:
        for i in range(1, 3):
            Sale.objects.create(
                member=member,
                product=coke,
                price=100,
            )
            frozen_time.tick()

    give_multibuy_hint, sale_hints = stregsystem_views._multibuy_hint(timezone.datetime(2018, 1, 1, tzinfo=pytz.UTC), member)
    self.assertTrue(give_multibuy_hint)
    self.assertEqual(sale_hints, "{} {}:{}".format("jokke", coke.id, 2))

def setUp(self):
    self.flan = Product.objects.create(name="FLan", price=1.0, active=True)
    self.flanmad = Product.objects.create(name="FLan mad", price=2.0, active=True)
    self.notflan = Product.objects.create(name="Ikke Flan", price=2.0, active=True)

    self.alan = Member.objects.create(username="tester", firstname="Alan", lastname="Alansen")
    self.bob = Member.objects.create(username="bob", firstname="bob", lastname="bob")

    with freeze_time('2017-02-02'):
        Sale.objects.create(member=self.alan, product=self.flan, price=1.0)

    with freeze_time('2017-02-15'):
        Sale.objects.create(member=self.alan, product=self.flan, price=1.0)

    with freeze_time('2017-02-07'):
        Sale.objects.create(member=self.alan, product=self.flanmad, price=1.0)

    with freeze_time('2017-02-05'):
        Sale.objects.create(member=self.alan, product=self.notflan, price=1.0)

def test_default_poller_interval_is_30_seconds(capsys):
    class ActivateImpl(MockedImpl):
        CRONTAB = [
            '0 0 * * * stub.print_datetime',
        ]

    plugin = ActivateImpl()
    plugin.activate()

    with freeze_time('2016-01-01 00:00:01'):
        plugin.poll_crontab()
        out, err = capsys.readouterr()
        assert out == '2016-01-01'

    with freeze_time('2016-01-01 00:00:31'):
        plugin.poll_crontab()
        out, err = capsys.readouterr()
        assert out == ''

def test_timezone_in_config(capsys):
    class MockConfig(object):
        TIMEZONE = 'Asia/Tokyo'

    class ActivateImpl(MockedImpl):
        CRONTAB = [
            '0 0 * * * .print_datetime',
        ]

        def print_datetime(self, polled_time):
            six.print_(polled_time.strftime('%Y-%m-%d'), end='')

    plugin = ActivateImpl()
    plugin.activate()
    setattr(plugin, 'bot_config', MockConfig())

    with freeze_time('2016-01-01 00:00:01'):
        plugin.poll_crontab()
        out, err = capsys.readouterr()
        assert out != '2016-01-01'

def test_timezone_in_plugin(capsys):
    class ActivateImpl(MockedImpl):
        TIMEZONE = 'Asia/Tokyo'
        CRONTAB = [
            '0 0 * * * .print_datetime',
        ]

        def activate(self):
            self.activate_crontab()

        def print_datetime(self, polled_time):
            six.print_(polled_time.strftime('%Y-%m-%d'), end='')

    plugin = ActivateImpl()
    plugin.activate()

    with freeze_time('2016-01-01 00:00:01'):
        plugin.poll_crontab()
        out, err = capsys.readouterr()
        assert out != '2016-01-01'

def test_perform_block_delete(app):
    content = 'foo'
    eff = app.perform_block_create(EBlockCreate(content))
    block_id = perform_sequence([], eff)
    # Delete from new blocks
    with freeze_time('2012-01-01') as frozen_datetime:
        eff = app.perform_block_delete(EBlockDelete(block_id))
        perform_sequence([], eff)
        assert app.last_modified == Arrow.fromdatetime(frozen_datetime())
    # Delete in cache
    app.block_cache[block_id] = {'foo': 'bar'}
    assert app.block_cache.currsize == 1
    with freeze_time('2012-01-01') as frozen_datetime:
        eff = app.perform_block_delete(EBlockDelete(block_id))
        perform_sequence([], eff)
        assert app.last_modified == Arrow.fromdatetime(frozen_datetime())
    assert app.block_cache.currsize == 0
    # Not found
    with pytest.raises(BlockNotFound):
        eff = app.perform_block_delete(EBlockDelete(block_id))
        perform_sequence([], eff)

def test_perform_user_vlob_delete(app):
    eff = app.perform_user_vlob_update(EUserVlobUpdate(1, 'foo'))
    perform_sequence([], eff)
    # Delete from new user vlobs
    with freeze_time('2012-01-01') as frozen_datetime:
        eff = app.perform_user_vlob_delete(EUserVlobDelete())
        perform_sequence([], eff)
        assert app.last_modified == Arrow.fromdatetime(frozen_datetime())
    # Delete in cache
    app.user_vlob_cache[2] = {'foo': 'bar'}
    assert app.user_vlob_cache.currsize == 1
    with freeze_time('2012-01-01') as frozen_datetime:
        eff = app.perform_user_vlob_delete(EUserVlobDelete(2))
        perform_sequence([], eff)
        assert app.last_modified == Arrow.fromdatetime(frozen_datetime())
    assert app.user_vlob_cache.currsize == 0
    # Not found
    with pytest.raises(UserVlobNotFound):
        eff = app.perform_user_vlob_delete(EUserVlobDelete(2))
        perform_sequence([], eff)

def test_get(self, s3_client):
    # given
    data = b'#000000'
    frozen_time = datetime.datetime(2016, 10, 23, 10, 30, tzinfo=datetime.timezone.utc)
    with freezegun.freeze_time(frozen_time):
        s3_client.boto.put_object(
            Bucket=s3_client.bucket,
            Key=os.path.join(s3_client.prefix, 'black.color'),
            Body=data,
        )

    # when
    output_object = s3_client.get('black.color')

    # then
    assert output_object.fp.read() == data
    assert output_object.total_size == len(data)
    assert output_object.timestamp == to_timestamp(frozen_time)

def test_execute_report_summary_already_sent(self):
    with freezegun.freeze_time('2016-12-23 11:00'):
        self.task.execute(self.bot, self.slack)

        task = self.bot.fast_queue.pop()
        self.assertIsInstance(task, pony.tasks.AskStatus)
        self.assertListEqual(task.teams, ['dev_team1'])
        self.assertEqual(task.user_id, '_sasha_id')

        # mark day as already reported
        self.bot.storage.set('report', {
            date(2016, 12, 23): {
                'dev_team1': {
                    'reported_at': datetime.utcnow(),
                    'reports': {}
                }
            }
        })

        self.task.execute(self.bot, self.slack)
        with self.assertRaises(IndexError):
            self.bot.fast_queue.pop()

def test_execute_day_is_holiday(self):
    with freezegun.freeze_time('2016-12-01 11:00'):
        self.task.execute(self.bot, self.slack)

        task = self.bot.fast_queue.pop()
        self.assertIsInstance(task, pony.tasks.SendMessage)
        self.assertEqual(task.to, '#dev-team')
        self.assertIn('No Standup Today', task.text)
        self.assertIn('Romanian National Day', task.text)

        # this is sent only once (report is marked reported)
        self.task.execute(self.bot, self.slack)
        with self.assertRaises(IndexError):
            self.bot.fast_queue.pop()

        self.assertIsNotNone(
            self.bot.storage.get('report')[
                date(2016, 12, 1)
            ]['dev_team1'].get('reported_at')
        )

def test_rfc_1123_date(self) -> None:
    """
    ``rfc_1123_date`` returns the date formatted as required by Vuforia.

    This test matches the example date set at
    `<https://library.vuforia.com/articles/Training/Using-the-VWS-API>`_.
    """
    date = datetime.datetime(
        day=22,
        month=4,
        year=2012,
        hour=8,
        minute=49,
        second=37,
    )
    with freeze_time(date):
        assert rfc_1123_date() == 'Sun, 22 Apr 2012 08:49:37 GMT'

def test_has_timed_out_true(
        self, timeout: timedelta, time_to_wait: timedelta
) -> None:
    """
    :param timeout: The timeout to set and wait for
    :param time_to_wait: The time to wait on top of the timeout, in order
        to ensure that the server timed out
    :return:
    """
    self.service.timeout = timeout
    self.service.check_in()
    assume(
        self._target_time_in_range(
            datetime.utcnow(), timeout, time_to_wait
        )
    )
    time_to_freeze_to = datetime.utcnow() + timeout + time_to_wait
    with freeze_time(time_to_freeze_to):  # ZA WARUDO!
        self.assertTrue(self.service.has_timed_out)

def test_has_timed_out_false(
        self, timeout: timedelta, time_to_wait: timedelta
) -> None:
    self.service.timeout = timeout
    self.service.check_in()
    assume(
        self._target_time_in_range(
            datetime.utcnow(), timeout, time_to_wait
        )
    )
    with freeze_time(
        datetime.utcnow() + timeout + time_to_wait
    ):  # ZA WARUDO!
        self.assertFalse(self.service.has_timed_out)

def test_dataset_stats_counter_empty():
    counter = DatasetStatsCounter(quarter='2013Q1', dataset_id='VA')
    with moto.mock_s3():
        with freeze_time('2017-01-10'):
            s3_conn = boto.connect_s3()
            s3_conn.create_bucket('test-bucket')
            counter.save(s3_conn, 'test-bucket/stats')

        key = s3_conn.get_bucket('test-bucket')\
            .get_key('stats/quarterly/VA_2013Q1')
        expected_stats = {
            'total': 0,
            'output_counts': {},
            'input_counts': {},
            'output_percentages': {},
            'input_percentages': {},
            'last_updated': '2017-01-10T00:00:00',
            'quarter': '2013Q1',
        }
        assert json.loads(key.get_contents_as_string().decode('utf-8')) == expected_stats

def test_dataset_stats_aggregator():
    with moto.mock_s3():
        s3_conn = boto.connect_s3()
        aggregator = DatasetStatsAggregator(dataset_id='CB', s3_conn=s3_conn)
        add_s3_content(
            s3_conn,
            {
                'test-bucket/stats/quarterly/CB_2014Q1': json.dumps(sample_quarter_stats('2014Q1')),
                'test-bucket/stats/quarterly/CB_2014Q2': json.dumps(sample_quarter_stats('2014Q2')),
                'test-bucket/stats/quarterly/VA_2014Q1': json.dumps(sample_quarter_stats('2014Q1')),
            }
        )

        with freeze_time('2017-01-10'):
            aggregator.run('test-bucket/stats')

        expected_stats = sample_dataset_stats()
        key = s3_conn.get_bucket('test-bucket')\
            .get_key('stats/dataset_summaries/CB.json')
        assert json.loads(key.get_contents_as_string().decode('utf-8')) == expected_stats

def test_key_values(self, mock_cache):
    mock_cache.get.return_value = None
    with freeze_time('2012-01-01'):
        ExampleKeyedConfig(left='left_a', right='right_a', changed_by=self.user).save()
        ExampleKeyedConfig(left='left_b', right='right_b', changed_by=self.user).save()

    ExampleKeyedConfig(left='left_a', right='right_a', changed_by=self.user).save()
    ExampleKeyedConfig(left='left_b', right='right_b', changed_by=self.user).save()

    unique_key_pairs = ExampleKeyedConfig.key_values()
    self.assertEquals(len(unique_key_pairs), 2)
    self.assertEquals(
        set(unique_key_pairs),
        set([('left_a', 'right_a'), ('left_b', 'right_b')])
    )
    unique_left_keys = ExampleKeyedConfig.key_values('left', flat=True)
    self.assertEquals(len(unique_left_keys), 2)
    self.assertEquals(set(unique_left_keys), set(['left_a', 'left_b']))

def test_current_set(self, mock_cache):
    mock_cache.get.return_value = None
    with freeze_time('2012-01-01'):
        ExampleKeyedConfig(left='left_a', right='right_a', int_field=0, changed_by=self.user).save()
        ExampleKeyedConfig(left='left_b', right='right_b', int_field=0, changed_by=self.user).save()

    ExampleKeyedConfig(left='left_a', right='right_a', int_field=1, changed_by=self.user).save()
    ExampleKeyedConfig(left='left_b', right='right_b', int_field=2, changed_by=self.user).save()

    queryset = ExampleKeyedConfig.objects.current_set()
    self.assertEqual(len(queryset.all()), 2)
    self.assertEqual(
        set(queryset.order_by('int_field').values_list('int_field', flat=True)),
        set([1, 2])
    )

def test_action_create(question_factory):
    question = question_factory()
    project = question.module.project
    now = timezone.now()

    with freeze_time(now):
        action = Action(
            actor=question.creator,
            verb=Verbs.ADD.value,
            obj=question,
            target=project,
            description='description'
        )

    assert action.actor == question.creator
    assert action.verb == Verbs.ADD.value
    assert action.obj == question
    assert action.target == project
    assert action.timestamp == now
    assert action.public is True
    assert action.description == 'description'

def test_phase_end_tomorrow(phase_factory):
    phase = phase_factory(
        start_date=parse('2013-01-01 17:00:00 UTC'),
        end_date=parse('2013-01-01 18:00:00 UTC')
    )
    project = phase.module.project

    action_count = Action.objects.filter(verb=SCHEDULE).count()
    assert action_count == 0

    with freeze_time('2013-01-01 17:30:00 UTC'):
        call_command('create_system_actions')
        action_count = Action.objects.filter(verb=SCHEDULE).count()
        action = Action.objects.filter(verb=SCHEDULE).last()
        assert action_count == 1
        assert action.obj == phase
        assert action.verb == SCHEDULE
        assert action.project == project

def test_project_starts_later_or_earlier(phase_factory):
    phase = phase_factory(
        start_date=parse('2013-01-01 17:00:00 UTC'),
        end_date=parse('2013-01-02 18:00:00 UTC')
    )

    with freeze_time(phase.start_date - timedelta(days=1)):
        call_command('create_system_actions')
        action_count = Action.objects.filter(verb=START).count()
        assert action_count == 0

    with freeze_time(phase.start_date + timedelta(days=1)):
        call_command('create_system_actions')
        action_count = Action.objects.filter(verb=START).count()
        assert action_count == 0

def test_project_start_single_action(phase_factory):
    phase = phase_factory(
        start_date=parse('2013-01-01 17:00:00 UTC'),
        end_date=parse('2013-01-01 18:00:00 UTC')
    )

    action_count = Action.objects.filter(verb=START).count()
    assert action_count == 0

    with freeze_time(phase.start_date + timedelta(minutes=30)):
        call_command('create_system_actions')
        action_count = Action.objects.filter(verb=START).count()
        assert action_count == 1

    # first phase starts within the last hour but script has already run
    with freeze_time(phase.start_date + timedelta(minutes=45)):
        call_command('create_system_actions')
        action_count = Action.objects.filter(verb=START).count()
        assert action_count == 1

def test_project_start_reschedule(phase_factory):
    phase = phase_factory(
        start_date=parse('2013-01-01 17:00:00 UTC'),
        end_date=parse('2013-01-01 18:00:00 UTC')
    )

    # first phase starts within an hour
    with freeze_time(phase.start_date + timedelta(minutes=30)):
        call_command('create_system_actions')
        action_count = Action.objects.filter(verb=START).count()
        assert action_count == 1

    # first phases start date has been moved forward
    # and the start actions timestamp has to be adapted
    phase.start_date = phase.start_date + timedelta(days=1)
    phase.save()

    with freeze_time(phase.start_date + timedelta(minutes=30)):
        call_command('create_system_actions')
        action_count = Action.objects.filter(verb=START).count()
        assert action_count == 1
        action = Action.objects.filter(verb=START).first()
        assert action.timestamp == phase.start_date

def test_days_left(project, phase_factory):
    phase1 = phase_factory(
        start_date=parse('2013-01-01 18:00:00 UTC'),
        end_date=parse('2013-01-02 18:00:00 UTC'),
        module__project=project,
    )
    phase2 = phase_factory(
        start_date=parse('2013-02-01 18:00:00 UTC'),
        end_date=parse('2013-02-02 18:00:00 UTC'),
        module__project=project,
    )

    with freeze_time(phase1.start_date):
        assert project.days_left == 1
    with freeze_time(phase1.end_date):
        assert project.days_left is None
    with freeze_time(phase2.start_date):
        assert project.days_left == 1
    with freeze_time(phase2.end_date):
        assert project.days_left is None

def test_react_rating_anonymous(rf, question, comment):
    with freeze_time('2013-01-02 18:00:00 UTC'):
        user = AnonymousUser()
        props = react_comment_render_for_props(rf, user, question)
        comments_content_type = ContentType.objects.get_for_model(comment)

        request = rf.get('/')
        request.user = user
        comments = ThreadSerializer(
            question.comments.all().order_by('-created'),
            many=True,
            context={'request': request}).data

        assert props == {
            'comments': comments,
            'comments_contenttype': comments_content_type.pk,
            'isAuthenticated': False,
            'isModerator': False,
            'isReadOnly': True,
            'user_name': '',
        }

def test_react_rating_user(rf, user, phase, question, comment):
    with freeze_time('2013-01-02 18:00:00 UTC'):
        props = react_comment_render_for_props(rf, user, question)
        comments_content_type = ContentType.objects.get_for_model(comment)

        request = rf.get('/')
        request.user = user
        comments = ThreadSerializer(
            question.comments.all().order_by('-created'),
            many=True,
            context={'request': request}).data

        assert props == {
            'comments': comments,
            'comments_contenttype': comments_content_type.pk,
            'isAuthenticated': True,
            'isModerator': False,
            'isReadOnly': False,
            'user_name': user.username,
        }

def test_manager_finished_phases(phase_factory):
    old_phase = phase_factory(
        start_date=parse('2013-01-01 17:00:00 UTC'),
        end_date=parse('2013-01-01 18:00:00 UTC')
    )
    new_phase = phase_factory(
        start_date=parse('2013-01-01 18:00:00 UTC'),
        end_date=parse('2013-01-01 19:00:00 UTC')
    )

    with freeze_time(new_phase.start_date):
        finished_phases = models.Phase.objects.finished_phases()
        assert list(finished_phases) == [old_phase]

    with freeze_time(new_phase.end_date):
        finished_phases = models.Phase.objects.finished_phases()
        assert list(finished_phases) == [old_phase, new_phase]

def test_manager_finish_next(phase_factory):
    phase_today = phase_factory(
        start_date=parse('2013-01-01 17:00:00 UTC'),
        end_date=parse('2013-01-01 17:00:01 UTC')
    )
    phase_tomorrow = phase_factory(
        start_date=parse('2013-01-01 17:00:00 UTC'),
        end_date=parse('2013-01-02 17:00:00 UTC')
    )
    phase_factory(
        start_date=parse('2013-01-01 17:00:00 UTC'),
        end_date=parse('2013-01-02 17:00:03 UTC')
    )

    with freeze_time(phase_today.start_date):
        finish_phases = models.Phase.objects.finish_next()
        assert list(finish_phases) == [phase_today, phase_tomorrow]

    with freeze_time(phase_today.end_date):
        finish_phases = models.Phase.objects.finish_next()
        assert list(finish_phases) == [phase_tomorrow]

def test_past_phases(phase_factory):
    phase1 = phase_factory(
        start_date=parse('2013-01-01 18:00:00 UTC'),
        end_date=parse('2013-01-10 18:00:00 UTC'),
    )
    phase2 = phase_factory(
        start_date=parse('2013-01-05 18:00:00 UTC'),
        end_date=parse('2013-01-15 18:00:00 UTC'),
    )

    with freeze_time(phase1.start_date):
        assert list(models.Phase.objects.past_phases()) == []
    with freeze_time(phase1.end_date):
        assert list(models.Phase.objects.past_phases()) == [phase1]
    with freeze_time(phase2.end_date):
        assert list(models.Phase.objects.past_phases()) == [phase1, phase2]

def test_future_phases(phase_factory):
    phase1 = phase_factory(
        start_date=parse('2013-01-01 18:00:00 UTC'),
        end_date=parse('2013-01-10 18:00:00 UTC'),
    )
    phase2 = phase_factory(
        start_date=parse('2013-01-05 18:00:00 UTC'),
        end_date=parse('2013-01-15 18:00:00 UTC'),
    )
    phase3 = phase_factory(
        start_date=None,
        end_date=None
    )

    with freeze_time(phase1.start_date - timedelta(minutes=1)):
        assert (list(models.Phase.objects.future_phases())
                == [phase3, phase1, phase2])
    with freeze_time(phase2.start_date - timedelta(minutes=1)):
        assert list(models.Phase.objects.future_phases()) == [phase3, phase2]
    with freeze_time(phase2.end_date):
        assert list(models.Phase.objects.future_phases()) == [phase3]

def test_past_and_active_phases(phase_factory):
    phase1 = phase_factory(
        start_date=parse('2013-01-01 18:00:00 UTC'),
        end_date=parse('2013-01-10 18:00:00 UTC'),
    )
    phase2 = phase_factory(
        start_date=parse('2013-01-05 18:00:00 UTC'),
        end_date=parse('2013-01-15 18:00:00 UTC'),
    )
    phase_factory(
        start_date=None,
        end_date=None
    )

    with freeze_time(phase1.start_date - timedelta(minutes=1)):
        assert list(models.Phase.objects.past_and_active_phases()) == []
    with freeze_time(phase1.start_date):
        assert list(models.Phase.objects.past_and_active_phases()) == [phase1]
    with freeze_time(phase2.start_date):
        assert (list(models.Phase.objects.past_and_active_phases())
                == [phase1, phase2])
    with freeze_time(phase2.end_date):
        assert (list(models.Phase.objects.past_and_active_phases())
                == [phase1, phase2])

def test_seconds_until_midnight(self):
    arg_list = [
        "2017-05-29 23:59:59",
        "2017-05-29 00:00:00",
        "2017-05-29 00:00:01"
    ]
    expected = [
        1,
        0,
        86399
    ]

    for idx, arg in enumerate(arg_list):
        with freeze_time(arg):
            self.assertEqual(
                how_many_seconds_until_midnight(),
                expected[idx]
            )

def test_NightlyResettingDefaultdict():
    from shifthelper.tools import NightlyResettingDefaultdict

    initial_datetime = datetime.datetime(2016, 1, 1)
    two_days_later = datetime.datetime(2016, 1, 3, 0, 0, 0)

    with freeze_time(initial_datetime) as frozen_datetime:
        nightly_max_rate = NightlyResettingDefaultdict(lambda: -np.inf)
        nightly_max_rate['foo'] = 5
        assert nightly_max_rate['foo'] == 5

        frozen_datetime.move_to(two_days_later)

        assert nightly_max_rate['foo'] == -np.inf
        nightly_max_rate['foo'] = 6
        assert nightly_max_rate['foo'] == 6

def test_list_view(rf, phase, module_factory, idea_factory):
    module = phase.module
    project = module.project
    idea = idea_factory(module=module)
    other_module = module_factory()
    other_idea = idea_factory(module=other_module)

    with freeze_time(phase.start_date):
        view = views.IdeaListView.as_view()
        request = rf.get('/ideas')
        response = view(request, project=project, module=module)
        assert idea in response.context_data['idea_list']
        assert other_idea not in response.context_data['idea_list']
        assert response.context_data['idea_list'][0].comment_count == 0
        assert response.context_data['idea_list'][0].positive_rating_count == 0
        assert response.context_data['idea_list'][0].negative_rating_count == 0

def test_create_view(client, phase, user):
    module = phase.module
    with freeze_time(phase.start_date):
        count = models.Idea.objects.all().count()
        assert count == 0

        url = reverse('idea-create', kwargs={'slug': module.slug})
        response = client.get(url)
        assert response.status_code == 302
        assert redirect_target(response) == 'account_login'

        client.login(username=user.email, password='password')
        response = client.get(url)
        assert response.status_code == 200

        idea = {'name': 'Idea', 'description': 'description'}
        response = client.post(url, idea)
        assert response.status_code == 302
        assert redirect_target(response) == 'idea-detail'

        count = models.Idea.objects.all().count()
        assert count == 1

def test_update_view(client, phase, idea):
    idea.module = phase.module
    idea.save()
    user = idea.creator
    with freeze_time(phase.start_date):
        url = reverse('idea-update', kwargs={'slug': idea.slug})
        response = client.get(url)
        assert response.status_code == 302

        client.login(username=user.email, password='password')
        response = client.get(url)
        assert response.status_code == 200

        data = {'description': 'description', 'name': idea.name}
        response = client.post(url, data)
        id = idea.pk
        updated_idea = models.Idea.objects.get(id=id)
        assert updated_idea.description == 'description'
        assert response.status_code == 302

def test_ideas_download_contains_right_data(rf, idea_factory, admin):
    idea = idea_factory()
    module = idea.module
    idea_factory(module=module)
    idea_factory(module=module)

    now = timezone.now()
    with freeze_time(now):
        request = rf.get('/ideas/download/module/{}'.format(module.slug))
        request.user = admin
        response = views.IdeaDownloadView.as_view()(request, slug=module.slug)

        assert response.status_code == 200
        assert (response._headers['content-type'] ==
                ('Content-Type',
                 'application/vnd.openxmlformats-officedocument'
                 '.spreadsheetml.sheet'))
        assert (response._headers['content-disposition'] ==
                ('Content-Disposition',
                 'attachment; filename="{}_{}.xlsx"'.format(
                     module.project.slug, now.strftime('%Y%m%dT%H%M%S'))))

def test_phase_dispatch_mixin_return_active_phase(
        rf, project_detail_view, phase1, phase2
):
    project = phase1.module.project
    project_url = reverse('project-detail', args=[project.slug])

    with freeze_time(phase1.start_date):
        # Requesting garbage should return the currently active phase.
        request = rf.get("{0}?phase={1}".format(project_url, "A"*100))
        response = project_detail_view(request, slug=project.slug)
        assert FakePhase0View.template_name in response.template_name
        assert FakePhase1View.template_name not in response.template_name

        # Without any further specification via '?phase=' return the
        # active phase.
        request = rf.get(project_url)
        response = project_detail_view(request, slug=project.slug)
        assert FakePhase0View.template_name in response.template_name
        assert FakePhase1View.template_name not in response.template_name

def setUp(self):
    # Create an 'ephemeral' token so we can test token timeouts. We
    # want a timeout long enough to last the test, but we don't want to
    # slow down the tests too much either.
    self.normal_creds = TokenCredentials(self.key_path, self.key_id,
                                         self.team_id)
    self.lasting_header = self.normal_creds.get_authorization_header(
        self.topics[0])

    with freeze_time('2012-01-14'):
        self.expiring_creds = \
            TokenCredentials(self.key_path, self.key_id, self.team_id,
                             token_lifetime=self.token_lifetime)
        self.expiring_header = self.expiring_creds.get_authorization_header(
            self.topics[0])

def test_find_entries():
    "EntryLoader.find_entries can find entries for the specified blog"
    db = DB()
    blog = create_blog()
    entries = []
    with freeze_time('2017-01-13 12:00:02'):
        entries.append(create_entry(blog=blog))
    with freeze_time('2017-01-13 12:00:01'):
        entries.append(create_entry(blog=blog))
    with freeze_time('2017-01-13 12:00:00'):
        entries.append(create_entry(blog=blog))

    found_entries = EntryLoader.find_entries(db, blog.id)
    assert len(found_entries) == len(entries)
    assert [e.id for e in found_entries] == [e.id for e in entries]

    found_entries_with_limit = EntryLoader.find_entries(db, blog.id, limit=2)
    assert len(found_entries_with_limit) == len(entries[0:2])
    assert [
        e.id for e in found_entries_with_limit
    ] == [e.id for e in entries[0:2]]

def test_index_with_entries():
    db = DB()
    with global_user(random_string(5)):
        blog = BlogAction.ensure_global_blog_created(db)
        entries = []
        with freeze_time('2017-01-13 12:00:02'):
            entries.append(create_entry(blog=blog))
        with freeze_time('2017-01-13 12:00:01'):
            entries.append(create_entry(blog=blog))
        with freeze_time('2017-01-13 12:00:00'):
            entries.append(create_entry(blog=blog))

        res = web_client().get('/')
        assert res.status == '200 OK'

        d = pq(res.data)
        assert [
            int(d(a).attr('data-entry-id')) for a in d('.entry')
        ] == [e.id for e in entries]

def test_publish_update_rotating():
    """Test if update rotating publishes works."""
    with test.clean_and_config(os.path.join(
            _test_base,
            b"publish-current.yml",
    )) as (tyml, config):
        do_publish_create_rotating(config)
        with freezegun.freeze_time("2012-10-11 10:10:10"):
            args = [
                '-c',
                config,
                'publish',
                'update',
            ]
            main(args)
            state = SystemStateReader()
            state.read()
            expect = {
                u'fake/current stable': set([u'fake-current']),
                u'fakerepo01/current stable': set([u'fakerepo01-current']),
                u'fakerepo02/current stable': set([u'fakerepo02-current'])
            }
            assert expect == state.publish_map

def test_publish_snapshot_update_rotating():
    """Test if update rotating publishes via snapshot works."""
    with test.clean_and_config(os.path.join(
            _test_base,
            b"publish-current.yml",
    )) as (tyml, config):
        do_publish_create_rotating(config)
        with freezegun.freeze_time("2012-10-11 10:10:10"):
            args = [
                '-c',
                config,
                'snapshot',
                'update',
            ]
            main(args)
            state = SystemStateReader()
            state.read()
            expect = {
                u'fake/current stable': set([u'fake-current']),
                u'fakerepo01/current stable': set([u'fakerepo01-current']),
                u'fakerepo02/current stable': set([u'fakerepo02-current'])
            }
            assert expect == state.publish_map

def test_retrieve_api_token_expiring_memo(token_retrieve_200):
    authenticator = AlexaVoiceServiceTokenAuthenticator(
        client_id='debug', secret='debug', refresh_token='debug'
    )

    with freeze_time('3012-01-14 12:00:00'):
        authenticator.retrieve_api_token()
        authenticator.retrieve_api_token()
        assert len(token_retrieve_200.request_history) == 1

    with freeze_time('3012-01-14 13:00:00'):
        authenticator.retrieve_api_token()
        assert len(token_retrieve_200.request_history) == 2

    with freeze_time('3012-01-14 13:30:00'):
        authenticator.retrieve_api_token()
        assert len(token_retrieve_200.request_history) == 2

    with freeze_time('3012-01-14 14:00:00'):
        authenticator.retrieve_api_token()
        assert len(token_retrieve_200.request_history) == 3

def test_edited_is_false_for_newly_created_content_within_15_minutes_grace_period(self):
    with freeze_time(self.public_content.created + datetime.timedelta(minutes=14)):
        self.public_content.save()
        self.assertFalse(self.public_content.edited)

def test_edited_is_true_for_newly_created_content_after_15_minutes_grace_period(self):
    with freeze_time(self.public_content.created + datetime.timedelta(minutes=16)):
        self.public_content.save()
        self.assertTrue(self.public_content.edited)

def test_dict_for_view_edited_post(self):
    with freeze_time(self.public_content.created + datetime.timedelta(minutes=16)):
        self.public_content.save()
        self.assertEqual(self.public_content.dict_for_view(self.user), {
            "author": self.public_content.author_id,
            "author_guid": self.public_content.author.guid,
            "author_handle": self.public_content.author.handle,
            "author_home_url": self.public_content.author.home_url,
            "author_image": self.public_content.author.safer_image_url_small,
            "author_is_local": bool(self.public_content.author.user),
            "author_name": self.public_content.author.handle,
            "author_profile_url": self.public_content.author.get_absolute_url(),
            "content_type": self.public_content.content_type.string_value,
            "delete_url": reverse("content:delete", kwargs={"pk": self.public_content.id}),
            "detail_url": self.public_content.get_absolute_url(),
            "formatted_timestamp": self.public_content.timestamp,
            "guid": self.public_content.guid,
            "has_shared": False,
            "humanized_timestamp": "%s (edited)" % self.public_content.humanized_timestamp,
            "id": self.public_content.id,
            "is_authenticated": True,
            "is_author": True,
            "is_following_author": False,
            "parent": "",
            "profile_id": self.public_content.author.id,
            "rendered": self.public_content.rendered,
            "reply_count": 0,
            "reply_url": reverse("content:reply", kwargs={"pk": self.public_content.id}),
            "shares_count": 0,
            "slug": self.public_content.slug,
            "through": self.public_content.id,
            "update_url": reverse("content:update", kwargs={"pk": self.public_content.id}),
        })