我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用django.utils.timezone.timedelta()。
def test_update_and_stop_seeding_that_seeding_return_none(self, mock_get_torrent, mock_hset): mock_get_torrent.return_value = self.torrent( status='seeding', progress=Decimal('100.00'), ratio=Decimal('9.99'), rateUpload=10500, rateDownload=105000, stop=mock_stop ) self.torrent_model.created = timezone.now() + timezone.timedelta(hours=-24, seconds=-1) self.torrent_model.save() self.assertIsNone(update_and_stop_seeding(self.torrent_model.pk)) mock_get_torrent.assert_called_with(self.torrent_model.hash) mock_hset.assert_called_with('torrent:{}'.format(self.torrent_model.pk), 'rate_upload', 0)
def setUp(self): division = Division.objects.create( id='ocd-division/country:us', name='USA') jur1 = Jurisdiction.objects.create( id="ocd-division/country:us/state:mo", name="Missouri State Senate", url="http://www.senate.mo.gov", division=division, ) jur2 = Jurisdiction.objects.create( id="ocd-division/country:us/state:da", name="Dausa State Senate", url="http://www.senate.da.gov", division=division, ) start_time = timezone.now() end_time = start_time + timezone.timedelta(minutes=10) RunPlan.objects.create(jurisdiction=jur1, success=True, start_time=start_time, end_time=end_time) start_time = end_time + timezone.timedelta(minutes=10) end_time = start_time + timezone.timedelta(minutes=10) RunPlan.objects.create(jurisdiction=jur2, success=True, start_time=start_time, end_time=end_time)
def setUp(self): division = Division.objects.create( id='ocd-division/country:us', name='USA') jur1 = Jurisdiction.objects.create( id="ocd-division/country:us/state:mo", name="Missouri State Senate", url="http://www.senate.mo.gov", division=division, ) start_time = timezone.now() end_time = start_time + timezone.timedelta(minutes=10) RunPlan.objects.create(jurisdiction=jur1, success=True, start_time=start_time, end_time=end_time) LegislativeSession.objects.create(jurisdiction=jur1, identifier="2017", name="2017 Test Session", start_date="2017-06-25", end_date="2017-06-26")
def test_project_create(me, api): project_data = { 'name': 'my_project', 'description': 'blah', } resp = api.post('/api/projects/', project_data) created_project_data = resp.json() assert resp.status_code == 201, created_project_data resp = api.get('/api/projects/{}/'.format(created_project_data['id'])) fetched_project_data = resp.json() assert resp.status_code == 200, fetched_project_data assert fetched_project_data == created_project_data assert fetched_project_data['name'] == project_data['name'] assert fetched_project_data['description'] == project_data['description'] assert fetched_project_data['owner'] == me.id creation_date = parse_date(fetched_project_data['creationDate']) assert (timezone.now() - creation_date) < timedelta(seconds=5)
def check_user_state(): '''?????????????????????''' users = User.objects.filter(level__gt=0) for user in users: # ?????????????? if timezone.now() - timezone.timedelta(days=1) > user.level_expire_time: user.ss_user.enable = False user.ss_user.upload_traffic = 0 user.ss_user.download_traffic = 0 user.ss_user.transfer_enable = settings.DEFAULT_TRAFFIC user.ss_user.save() user.level = 0 user.save() logs = 'time: {} use: {} level timeout '.format(timezone.now().strftime('%Y-%m-%d'), user.username).encode('utf8') print(logs) print('Time:{} CHECKED'.format(timezone.now()))
def delete_old(request): """Delete old messages""" old_at = timezone.now() - timezone.timedelta( days=settings.OLD_MSG_THRESHOLD) q_sender = Q(sender__username__iexact=request.user.username) & Q( sender_status='1normal') & Q(created_at__lte=old_at) msgs = Msg.objects.filter(q_sender) for msg in msgs: msg.sender_status = '6deleted' msg.save() q_recipient = Q(recipient__username__iexact=request.user.username) \ & (Q(recipient_status='1normal') | Q( recipient_status='2read')) & Q(created_at__lte=old_at) msgs = Msg.objects.filter(q_recipient) for msg in msgs: msg.recipient_status = '6deleted' msg.save() return redirect('msgs:inbox', page=1)
def shortdate(value): now = timezone.template_localtime(timezone.now()) diff = now - value if diff < timezone.timedelta(): return date_format(value, "DATE_FORMAT") elif diff.days < 1: return _('today') elif diff.days < 2: return _('yesterday') elif diff.days < 30: return ungettext_lazy('%d day ago', '%d days ago') % diff.days else: months = diff.days // 30 if months <= 6: return ungettext_lazy('a month ago', '%d months ago') % months else: return date_format(value, "DATE_FORMAT")
def open_bets(request): # used for expiring soon and new bet tags tomorrow = timezone.now() + timezone.timedelta(days=1) yesterday = timezone.now() + timezone.timedelta(days=-1) # get the current user current_user = request.user # get all open prop bets from other users open_bets = ProposedBet.objects.filter(remaining_wagers__gt=0, end_date__gt=timezone.now(), won_bet__isnull=True).exclude(user=current_user) # get all bets created in past 24 hours new_bets = ProposedBet.objects.filter(remaining_wagers__gt=0, end_date__gt=timezone.now(), created_on__gt=yesterday, won_bet__isnull=True).exclude(user=current_user) # get all bets expiring in next 24 hours closing_soon_bets = ProposedBet.objects.filter(remaining_wagers__gt=0, end_date__gt=timezone.now(), end_date__lt=tomorrow, won_bet__isnull=True).exclude(user=current_user) return render(request, 'bets/base_open_bets.html', {'nbar': 'open_bets', 'open_bets': open_bets, 'new_bets': new_bets, 'closing_soon_bets': closing_soon_bets})
def change(self, request): if not self.is_mine(request.user) or self.has_edit: return 1 if len(request.POST['body']) > 2200 or len(request.POST['body']) < 1: return 1 if not self.befores: befores_json = [] else: befores_json = json.loads(self.befores) befores_json.append(self.body) self.befores = json.dumps(befores_json) self.body = request.POST['body'] self.spoils = request.POST.get('is_spoiler', False) self.feeling = request.POST.get('feeling_id', 0) if not timezone.now() < self.created + timezone.timedelta(minutes=2): self.has_edit = True return self.save()
def give_notification(user, type, to, post=None, comment=None): # Just keeping this simple for now, might want to make it better later # If the user sent a notification to this user at least 5 seconds ago, return False # Or if user is to # Or if yeah notifications are off and this is a yeah notification user_is_self_unk = (not type == 3 and user == to) is_notification_too_fast = (user.notification_sender.filter(created__gt=timezone.now() - timedelta(seconds=5), type=type).exclude(type=4) and not type == 3) user_no_yeahnotif = (not to.let_yeahnotifs() and (type == 0 or type == 1)) if user_is_self_unk or is_notification_too_fast or user_no_yeahnotif: return False # Search for my own notifiaction. If it exists, set it as unread. merge_own = user.notification_sender.filter(created__gt=timezone.now() - timedelta(hours=8), to=to, type=type, context_post=post, context_comment=comment) if merge_own: # If it's merged, don't unread that one, but unread what it's merging. return merge_own.first().set_unread() # Search for a notification already there so we can merge with it if it exists merge_s = Notification.objects.filter(created__gt=timezone.now() - timedelta(hours=8), to=to, type=type, context_post=post, context_comment=comment) # If it exists, merge with it. Else, create a new notification. if merge_s: return merge_s.first().merge(user) else: return user.notification_sender.create(source=user, type=type, to=to, context_post=post, context_comment=comment)
def get_friendships(user, limit=50, offset=0, latest=False, online_only=False): if not limit: return Friendship.objects.filter(Q(source=user) | Q(target=user)).order_by('-created') if latest: if online_only: delta = timezone.now() - timedelta(seconds=48) awman = [] for friend in Friendship.objects.filter(Q(source=user) | Q(target=user)).order_by('-latest')[offset:offset + limit]: if friend.other(user).last_login > delta: awman.append(friend) return awman # Fix all of this at some point # return Friendship.objects.filter( #source=When(source__ne=user, source__last_login__gt=delta), #target=When(target__ne=user, target__last_login__gt=delta) #).order_by('-latest')[offset:offset + limit] else: return Friendship.objects.filter(Q(source=user) | Q(target=user)).order_by('-latest')[offset:offset + limit] else: return Friendship.objects.filter(Q(source=user) | Q(target=user)).order_by('-created')[offset:offset + limit]
def make_message(self, request): if not request.user.has_freedom() and (request.POST.get('url') or request.FILES.get('screen')): return 6 if Message.real.filter(creator=request.user, created__gt=timezone.now() - timedelta(seconds=2)).exists(): return 3 if len(request.POST['body']) > 50000 or (len(request.POST['body']) < 1 and not request.POST.get('_post_type') == 'painting'): return 1 upload = None drawing = None if request.FILES.get('screen'): upload = util.image_upload(request.FILES['screen'], True) if upload == 1: return 2 if request.POST.get('_post_type') == 'painting': if not request.POST.get('painting'): return 2 drawing = util.image_upload(request.POST['painting'], False, True) if drawing == 1: return 2 new_post = self.message_set.create(body=request.POST.get('body'), creator=request.user, feeling=int(request.POST.get('feeling_id', 0)), drawing=drawing, screenshot=upload) new_post.mine = True return new_post
def __init__(self, begin=None, end=None): if begin: self.begin = arrow.Arrow.strptime(begin, '%Y-%m-%d', settings.TIME_ZONE) if begin else arrow.now() self.begin = self.begin.floor('day').to('UTC').datetime elif end: to = arrow.Arrow.strptime(end, '%Y-%m-%d', settings.TIME_ZONE).floor('day').to('UTC').datetime self.begin = to - timezone.timedelta(days=settings.EVENTS_CALENDAR_PERIOD) else: self.begin = arrow.now() self.begin = self.begin.floor('day').to('UTC').datetime if end: self.end = arrow.Arrow.strptime(end, '%Y-%m-%d', settings.TIME_ZONE).floor('day').to('UTC').datetime else: self.end = self.begin + timezone.timedelta(days=settings.EVENTS_CALENDAR_PERIOD) self.events = Event.objects.get_by_dates(begin=self.begin, end=self.end)
def test_get_for_closing(self): now = arrow.now().floor('day').to('UTC').datetime event_before = Event() event_before.begin = now - timezone.timedelta(days=3) event_before.end = now - timezone.timedelta(days=4) event_before.title = 'test_title_now' event_before.status = 'open' event_before.save() event_after = copy.copy(event_before) event_after.id = None event_after.begin = now + timezone.timedelta(days=3) event_after.end = now + timezone.timedelta(days=4) event_after.save() queryset = Event.objects.get_for_closing() self.assertTrue(event_before in queryset) self.assertTrue(event_after not in queryset) self.assertEqual(queryset.count(), 2) for event in queryset: self.assertTrue(event.end < now) self.assertTrue(event.paid >= event.total)
def test_calendar(self): calendar = Calendar() now = arrow.now().floor('day').to('UTC').datetime week = now + timezone.timedelta(days=settings.EVENTS_CALENDAR_PERIOD) self.assertEqual(now, calendar.begin) self.assertEqual(week, calendar.end) event = Event() event.begin = now + timezone.timedelta(days=3) event.end = now + timezone.timedelta(days=4) event.title = 'test_title_now' event.status = 'open' event.save() days = calendar.get_days() self.assertEqual(settings.EVENTS_CALENDAR_PERIOD, len(days)) for element in days: if event.begin <= element.date < event.end: self.assertIn(event, element.events) for hour in element.hours: if event.begin <= hour.date < event.end: self.assertIn(event, hour.events)
def test_check_callback_and_save(self): # we ensure a user may by default not send more than 50 messages a day Message.objects.filter(sender=self.participant1).count() r = 100 for i in range(r): try: m = Message(sender=self.participant1, thread=self.thread1, body="hi") m.save() except Exception: pass # we have more than 50 messages (more in setUp) self.assertEqual(50, Message.objects.filter(sender=self.participant1).count()) # the limit is only in the last 24 hours Message.objects.filter(sender=self.participant1).update(sent_at=now() - timedelta(days=1, seconds=1)) last = Message.objects.filter(sender=self.participant1).latest('id') new = Message.objects.create(sender=self.participant1, thread=self.thread1, body="hi") self.assertEqual(new.id, last.id + 1)
def test_get_lasts_messages_of_threads_check_who_read(self): # participant 1 and 2 have read the messages, 3 no self.participation1.date_last_check = now() + timedelta(days=1) self.participation1.save() self.participation2.date_last_check = now() + timedelta(days=1) self.participation2.save() # we do not modify self.participation3.date_last_check # this means participant 1 and 2 have read the message, not 3 with self.assertNumQueries(5): messages = Message.managers.get_lasts_messages_of_threads(self.participant1.id, check_who_read=True, check_is_notification=False) # the ordering has not been modified self.assertEqual([self.m33.id, self.m22.id, self.m11.id], [m.id for m in messages]) # participant 1 and 2 have read Thread 1 self.assertTrue(self.participant1.id in messages[2].readers) self.assertTrue(self.participant2.id in messages[2].readers) self.assertFalse(self.participant3.id in messages[0].readers)
def test_get_lasts_messages_of_threads_check_is_notification_check_who_read(self): # participant 1 and 2 have read the messages, 3 no self.participation1.date_last_check = now() + timedelta(days=1) self.participation1.save() self.participation2.date_last_check = now() + timedelta(days=1) self.participation2.save() # we create a notification check with self.assertNumQueries(6): messages = Message.managers.get_lasts_messages_of_threads(self.participant1.id, check_who_read=True, check_is_notification=True) # the ordering has not been modified self.assertEqual([self.m33.id, self.m22.id, self.m11.id], [m.id for m in messages]) # the second conversation has no notification because last reply comes from the current participant self.assertEqual(messages[1].is_notification, False) # the third (first in ordering) conversation has new messages self.assertEqual(messages[0].is_notification, True) # participant 1 and 2 have read Thread 1 self.assertTrue(self.participant1.id in messages[2].readers)
def test_list_messages_in_thread(self): # no authentication response = self.client_unauthenticated.get("{0}{1}/list_messages_in_thread/".format(self.url, self.thread1.id)) self.assertEqual(403, response.status_code) # no permission response = self.client_authenticated.get("{0}{1}/list_messages_in_thread/".format(self.url, self.thread_unrelated.id)) self.assertEqual(403, response.status_code) # ok # participant 3 has read the 2 last messages, 1 only the first p1 = Participation.objects.create(participant=self.participant3, thread=self.thread3) p1.date_last_check = now() - timedelta(days=1) p1.save() p2 = Participation.objects.create(participant=self.participant1, thread=self.thread3) p2.date_last_check = now() - timedelta(days=2) p2.save() # we change the date of the messages self.m31.sent_at = p1.date_last_check = now() - timedelta(days=3) self.m31.save() self.m32.sent_at = p1.date_last_check = now() - timedelta(days=1, hours=12) self.m32.save() response = self.client_authenticated.get("{0}{1}/list_messages_in_thread/".format(self.url, self.thread3.id)) messages_dct = parse_json_response(response.data) messages = messages_dct["results"] self.assertEqual([self.m33.id, self.m32.id, self.m31.id], [m["id"] for m in messages]) self.assertEqual([set([]), set([self.participant3.id]), set([self.participant1.id, self.participant3.id])], [set(m["readers"]) for m in messages])
def form_valid(self, form): # Get data latitude = form.cleaned_data['latitude'] longitude = form.cleaned_data['longitude'] # Get today's date now = timezone.now() # Get next week's date next_week = now + timezone.timedelta(weeks=1) # Get Point location = Point(longitude, latitude, srid=4326) # Look up events events = Event.objects.filter(datetime__gte=now).filter(datetime__lte=next_week).annotate(distance=Distance('venue__location', location)).order_by('distance')[0:5] # Render the template return render_to_response('gigs/lookupresults.html', { 'events': events })
def parse_daily_schedule(self, table, date): """Get daily schedule for t.cast channel.""" date_format = timezone.datetime.strftime(date, "%Y%m%d") next_date = date + timezone.timedelta(days=1) daily_schedule = [] # Get schedule for hour in range(24): date_hour_string = date_format + '{:02d}'.format(hour) cell = table.find('td', id=date_hour_string) if cell is None: return None schedules = cell.find_all('div', class_='con active') for schedule in schedules: if hour in range(self.start_hour): # Next day's schedule. daily_schedule.append(self.parse_schedule_item(schedule, next_date)) else: daily_schedule.append(self.parse_schedule_item(schedule, date)) # Add to list return daily_schedule
def test_access_is_forbidden_when_interviewer_for_already_passed_course(self): old_app_info = ApplicationInfoFactory( start_date=timezone.now() - timezone.timedelta(days=10), end_date=timezone.now() - timezone.timedelta(days=5), start_interview_date=timezone.now() - timezone.timedelta(days=4), end_interview_date=timezone.now() - timezone.timedelta(days=2), ) other_user = BaseUserFactory(password=self.test_password) other_interviewer = Interviewer.objects.create_from_user(other_user) add_course_to_interviewer_courses(interviewer=other_interviewer, course=old_app_info.course) with self.login(email=other_interviewer.email, password=self.test_password): response = self.get(self.url) self.response_403(response)
def test_can_add_free_time_if_interviewer(self): free_time_count = InterviewerFreeTime.objects.count() start_time = faker.time_object() end_time = (timezone.datetime.combine(timezone.now(), start_time) + timezone.timedelta(seconds=1)).time() field_names = ('interview_time_length', 'break_time') data = { 'date': (timezone.now() + timezone.timedelta(days=1)).date(), 'start_time': start_time, 'end_time': end_time, field_names[0]: InterviewerFreeTime._meta.get_field(field_names[0]).get_default(), field_names[1]: InterviewerFreeTime._meta.get_field(field_names[1]).get_default() } with self.login(email=self.interviewer.email, password=self.test_password): response = self.post(self.url, data=data) self.assertEqual(302, response.status_code) self.assertEqual(free_time_count + 1, InterviewerFreeTime.objects.count()) self.assertIsNotNone(self.interviewer.free_time_slots)
def test_creates_competition_when_data_is_valid(self): competition_count = Competition.objects.count() start_date = timezone.now() end_date = start_date + timezone.timedelta(days=2) data = { 'name': faker.name(), 'start_date': start_date.date(), 'end_date': end_date.date(), 'slug_url': faker.slug() } with self.login(email=self.superuser.email, password=self.test_password): response = self.post(url_name=self.url, data=data) self.response_302(response) self.assertEqual(competition_count + 1, Competition.objects.count())
def test_course_is_created_successfully_on_post(self): self.assertEqual(0, Course.objects.count()) start_date = faker.date_object() data = { 'name': faker.word(), 'start_date': start_date, 'end_date': start_date + timezone.timedelta(days=faker.pyint()), 'repository': faker.url(), 'video_channel': faker.url(), 'facebook_group': faker.url(), 'slug_url': faker.slug(), } with self.login(email=self.user.email, password=self.test_password): response = self.post(self.url, data=data) self.assertRedirects(response, expected_url=reverse('dashboard:education:user-course-detail', kwargs={'course_id': Course.objects.last().id})) self.assertEqual(1, Course.objects.count())
def test_create_application_info_creates_application_info_when_data_is_valid(self): self.start_date = timezone.now().date() + timezone.timedelta(days=1) self.end_date = timezone.now().date() + timezone.timedelta(days=2) self.start_interview_date = timezone.now().date() + timezone.timedelta(days=1) self.end_interview_date = timezone.now().date() + timezone.timedelta(days=2) current_app_info_count = ApplicationInfo.objects.count() create_application_info(start_date=self.start_date, end_date=self.end_date, course=self.course, start_interview_date=self.start_interview_date, end_interview_date=self.end_interview_date) self.assertEqual(current_app_info_count + 1, ApplicationInfo.objects.count())
def test_post_creates_instance_when_data_is_valid(self): add_teacher(course=self.course, teacher=self.teacher) self.course.start_date = timezone.now().date() + timezone.timedelta(days=1) self.course.end_date = timezone.now().date() + timezone.timedelta(days=2) self.course.save() current_app_info_count = ApplicationInfo.objects.count() self.start_date = timezone.now().date() + timezone.timedelta(days=1) self.end_date = timezone.now().date() + timezone.timedelta(days=2) data = { 'start_date': self.start_date, 'end_date': self.end_date, } with self.login(email=self.user.email, password=self.test_password): response = self.post(self.url, data=data) self.assertRedirects(response, expected_url=reverse('dashboard:education:user-course-detail', kwargs={'course_id': self.course.id})) self.assertEqual(current_app_info_count + 1, ApplicationInfo.objects.count())
def test_post_successfully_creates_application_when_apply_is_open(self): self.app_info.start_date = timezone.now().date() self.app_info.end_date = timezone.now().date() + timezone.timedelta(days=2) self.app_info.save() current_app_count = Application.objects.count() data = { 'phone': faker.phone_number(), 'works_at': faker.job(), 'skype': faker.word() } with self.login(email=self.user.email, password=self.test_password): response = self.post(self.url, data=data) self.assertRedirects(response, expected_url=reverse('dashboard:applications:user-applications')) self.assertEqual(current_app_count + 1, Application.objects.count())
def test_post_does_not_create_application_when_apply_is_closed(self): self.app_info.start_date = timezone.now().date() - timezone.timedelta(days=2) self.app_info.end_date = timezone.now().date() - timezone.timedelta(days=1) self.app_info.save() current_app_count = Application.objects.count() data = { 'phone': faker.phone_number(), 'works_at': faker.job() } with self.login(email=self.user.email, password=self.test_password): response = self.post(self.url, data=data) self.assertFalse(response.context_data['form'].is_valid()) self.assertEqual(current_app_count, Application.objects.count())
def test_post_redirects_when_user_has_already_applied(self): self.app_info.start_date = timezone.now().date() self.app_info.end_date = timezone.now().date() + timezone.timedelta(days=2) self.app_info.save() data = { 'phone': faker.phone_number(), 'works_at': faker.job(), 'skype': faker.word() } create_application(user=self.user, application_info=self.app_info, skype=data.get('skype')) with self.login(email=self.user.email, password=self.test_password): response = self.post(self.url, data=data) expected_url = reverse('dashboard:applications:user-applications') self.assertRedirects(response, expected_url=expected_url)
def test_sends_mail_to_address_upon_successful_application(self, mock_send_mail): self.app_info.start_date = timezone.now().date() self.app_info.end_date = timezone.now().date() + timezone.timedelta(days=2) self.app_info.save() data = { 'phone': faker.phone_number(), 'works_at': faker.job(), 'skype': faker.word } with self.login(email=self.user.email, password=self.test_password): response = self.post(self.url, data=data) self.assertRedirects(response, expected_url=reverse('dashboard:applications:user-applications')) self.assertEqual(mock_send_mail.called, True) (template_name, recipients, context), kwargs = mock_send_mail.call_args self.assertEqual([self.user.email], recipients)
def setUp(self): self.test_password = faker.password() self.user = BaseUserFactory(password=self.test_password) self.user.is_active = True self.user.save() self.course = CourseFactory( start_date=timezone.now() + timezone.timedelta(days=10), end_date=timezone.now() + timezone.timedelta(days=20) ) self.app_info = ApplicationInfoFactory( start_date=timezone.now(), end_date=timezone.now() + timezone.timedelta(days=4), start_interview_date=timezone.now() + timezone.timedelta(days=5), end_interview_date=timezone.now() + timezone.timedelta(days=6), course=self.course ) self.application = ApplicationFactory(application_info=self.app_info, user=self.user) self.url = reverse('dashboard:applications:user-applications')
def setUp(self): self.test_password = faker.password() self.user = BaseUserFactory(password=self.test_password) self.user.is_active = True self.user.save() self.course = CourseFactory( start_date=timezone.now() + timezone.timedelta(days=10), end_date=timezone.now() + timezone.timedelta(days=20) ) self.app_info = ApplicationInfoFactory( start_date=timezone.now(), end_date=timezone.now() + timezone.timedelta(days=4), start_interview_date=timezone.now() + timezone.timedelta(days=5), end_interview_date=timezone.now() + timezone.timedelta(days=6), course=self.course ) self.application = ApplicationFactory(application_info=self.app_info, user=self.user) self.url = reverse('dashboard:applications:edit-application', kwargs={ 'course_id': self.course.id })
def setUp(self): self.start_date = timezone.now().date() self.end_date = timezone.now().date() + timezone.timedelta(days=2) self.open_applications = ApplicationInfoFactory.create_batch(size=5, start_date=self.start_date, end_date=self.end_date, start_interview_date=self.start_date, end_interview_date=self.end_date) false_start_date = timezone.now().date() - timezone.timedelta(days=2) false_end_date = timezone.now().date() - timezone.timedelta(days=1) self.closed_applications = ApplicationInfoFactory.create_batch( size=5, start_date=false_start_date, end_date=false_end_date, start_interview_date=false_start_date, end_interview_date=false_end_date)
def test_set_next_notification_date(self, now_mock): """ Notifications: Test if next notification date is set """ now_mock.return_value = timezone.make_aware( timezone.datetime(2016, 11, 16)) now = timezone.localtime(timezone.now()) tomorrow = (timezone.localtime(timezone.now()) + timezone.timedelta(hours=24)).date() settings = NotificationSetting.get_solo() settings.next_notification = now settings.save() dsmr_notification.services.set_next_notification(settings, now) self.assertEqual(settings.next_notification, tomorrow)
def create_notification_message(day, stats): """ Create the action notification message :param day: :param stats: :return: """ capabilities = dsmr_backend.services.get_capabilities() day_date = (day - timezone.timedelta(hours=1)).strftime("%d-%m-%Y") message = _('Your daily usage statistics for {}\n').format(day_date) if capabilities['electricity']: electricity_merged = dsmr_consumption.services.round_decimal(stats.electricity_merged) message += _('Electricity consumed: {} kWh\n').format(electricity_merged) if capabilities['electricity_returned']: electricity_returned_merged = dsmr_consumption.services.round_decimal(stats.electricity_returned_merged) message += _('Electricity returned: {} kWh\n').format(electricity_returned_merged) if capabilities['gas']: gas = dsmr_consumption.services.round_decimal(stats.gas) message += _('Gas consumed: {} m3\n').format(gas) message += _('Total cost: € {}').format(dsmr_consumption.services.round_decimal(stats.total_cost)) return message
def test_check_interval_restriction(self, now_mock, create_backup_mock): """ Test whether backups are restricted by one backup per day. """ now_mock.return_value = timezone.make_aware(timezone.datetime(2016, 1, 1, hour=1, minute=5)) # Fake latest backup. now = timezone.localtime(timezone.now()) backup_settings = BackupSettings.get_solo() backup_settings.latest_backup = now backup_settings.backup_time = (now - timezone.timedelta(minutes=1)).time() backup_settings.save() self.assertIsNotNone(BackupSettings.get_solo().latest_backup) self.assertFalse(create_backup_mock.called) # Should not do anything. dsmr_backup.services.backup.check() self.assertFalse(create_backup_mock.called) backup_settings.latest_backup = now - timezone.timedelta(days=1) backup_settings.save() # Should be fine to backup now. dsmr_backup.services.backup.check() self.assertTrue(create_backup_mock.called)
def test_check_backup_time_restriction(self, now_mock, create_backup_mock): """ Test whether backups are restricted by user's backup time preference. """ now_mock.return_value = timezone.make_aware(timezone.datetime(2016, 1, 1, hour=1, minute=5)) now = timezone.localtime(timezone.now()) backup_settings = BackupSettings.get_solo() backup_settings.latest_backup = now - timezone.timedelta(days=1) backup_settings.backup_time = (now + timezone.timedelta(seconds=15)).time() backup_settings.save() # Should not do anything, we should backup a minute from now. self.assertFalse(create_backup_mock.called) dsmr_backup.services.backup.check() self.assertFalse(create_backup_mock.called) # Should be fine to backup now. Passed prefered time of user. backup_settings.backup_time = now.time() backup_settings.save() dsmr_backup.services.backup.check() self.assertTrue(create_backup_mock.called)
def test_sync_last_modified(self, now_mock, upload_chunked_mock, get_backup_directory_mock): """ Test whether syncs are skipped when file was not modified. """ now_mock.return_value = timezone.make_aware(timezone.datetime(2016, 1, 1)) dropbox_settings = DropboxSettings.get_solo() dropbox_settings.latest_sync = timezone.now() - timezone.timedelta(weeks=1) dropbox_settings.save() with tempfile.TemporaryDirectory() as temp_dir: get_backup_directory_mock.return_value = temp_dir temp_file = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False) temp_file.write(b'Meh.') temp_file.flush() # 1420070400: 01 Jan 2015 00:00:00 GMT os.utime(temp_file.name, times=(1420070400, 1420070400)) self.assertFalse(upload_chunked_mock.called) # File should be ignored, as it's modification timestamp is before latest sync. dsmr_backup.services.dropbox.sync() self.assertFalse(upload_chunked_mock.called)
def insert_notifications(apps, schema_editor): import dsmr_frontend.services Notification = apps.get_model('dsmr_frontend', 'Notification') # Search for any applied migrations in the past. This should indicate a long(er) living instance of the project. existing_project = MigrationRecorder.Migration.objects.filter( applied__lt=timezone.now() - timezone.timedelta(hours=24) ).exists() if existing_project: return Notification.objects.create( message=dsmr_frontend.services.get_translated_string( text=_('Welcome to DSMR-reader! Please make sure to check your settings in the Configuration page!') ), redirect_to='admin:index' ) Notification.objects.create( message=dsmr_frontend.services.get_translated_string( text=_('You may check the status of your readings and data in the Status page.') ), redirect_to='frontend:status' )
def test_status_back_to_the_future(self, now_mock): """ Test some weird situation having the smart meter reporting a future timestamp in the telegrams. """ if not self.support_data: self.skipTest('No data') now_mock.return_value = timezone.make_aware(timezone.datetime(2017, 2, 1)) latest_reading = DsmrReading.objects.all().order_by('-timestamp')[0] DsmrReading.objects.exclude(pk=latest_reading.pk).delete() latest_reading.timestamp = timezone.now() + timezone.timedelta(seconds=15) # Future reading. latest_reading.save() response = self.client.get( reverse('{}:status'.format(self.namespace)) ) self.assertEqual(response.status_code, 200) # These should be reset for convenience. self.assertEqual(response.context['latest_reading'].timestamp, timezone.now()) self.assertEqual(response.context['delta_since_latest_reading'], 0)
def test_next_sync_setting_retroactive(self): """ Test whether the migration can also handle existing data. """ now = timezone.now().replace(microsecond=0) TemperatureReading.objects.create( read_at=now + timezone.timedelta(hours=1), degrees_celcius=20, ) TemperatureReading.objects.create( read_at=now, degrees_celcius=20, ) self.assertIsNone(WeatherSettings.get_solo().next_sync) # Now we fake applying the migration (again for this test). MigrationRecorder.Migration.objects.filter( app='dsmr_weather', name='0004_next_sync_setting_retroactive' ).delete() MigrationExecutor(connection=connection).migrate([(self.app, '0004_next_sync_setting_retroactive')]) # When having existing data, next_sync should be based on latest reading. self.assertEqual(WeatherSettings.get_solo().next_sync, now + timezone.timedelta(hours=2))
def _convert_legacy_dsmr_gas_line(parsed_reading, current_line, next_line): """ Legacy support for DSMR 2.x gas. """ legacy_gas_line = current_line if next_line.startswith('('): # pragma: no cover legacy_gas_line = current_line + next_line legacy_gas_result = re.search( r'[^(]+\((\d+)\)\(\d+\)\(\d+\)\(\d+\)\([0-9-.:]+\)\(m3\)\(([0-9.]+)\)', legacy_gas_line ) gas_timestamp = legacy_gas_result.group(1) if timezone.now().dst() != timezone.timedelta(0): gas_timestamp += 'S' else: gas_timestamp += 'W' parsed_reading['extra_device_timestamp'] = reading_timestamp_to_datetime( string=gas_timestamp ) parsed_reading['extra_device_delivered'] = legacy_gas_result.group(2) return parsed_reading
def test_export_fail(self, now_mock, should_export_mock, requests_post_mock): """ Test export() failing by denied API call. """ now_mock.return_value = timezone.make_aware(timezone.datetime(2015, 12, 12, hour=4, minute=45)) should_export_mock.return_value = True settings = MinderGasSettings.get_solo() self.assertFalse(settings.export) self.assertIsNone(settings.next_export) self.assertFalse(requests_post_mock.called) # Mindergas error codes according to docs. for current_error_code in (401, 422): requests_post_mock.return_value = mock.MagicMock(status_code=current_error_code, text='Error message') dsmr_mindergas.services.export() settings = MinderGasSettings.get_solo() # This should be set one hour forward now. self.assertEqual(settings.next_export, timezone.now() + timezone.timedelta(hours=1)) self.assertTrue(requests_post_mock.called)
def setUp(self): calendar = Calendar.objects.create_calendar('default') now = timezone.now() day = timezone.timedelta(days=1) hour = timezone.timedelta(hours=1) self.event = Event.objects.create( name='Test', nb_path='/pseudo/test', start_time=now + 3 * day, end_time=now + 3 * day + 4 * hour, calendar=calendar ) self.group = SupportGroup.objects.create( name='Test', nb_path='/12/grouptest' )
def setUp(self): self.calendar = Calendar.objects.create( name="My calendar", slug="my_calendar", ) now = timezone.now() day = timezone.timedelta(days=1) hour = timezone.timedelta(hours=1) for i in range(20): Event.objects.create( name="Event {}".format(i), calendar=self.calendar, start_time=now + i * day, end_time=now + i * day + hour )