我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 django.utils.timezone.datetime()。
def razzia_wizard(request):
    """Show the razzia wizard form, or redirect to the razzia view on POST.

    On POST, the six date parts are coerced to ``int`` and placed, together
    with the submitted ``products`` and ``razzia_title`` values, in the query
    string of the ``razzia_view`` URL.  For any other method the wizard
    template is rendered with two pre-filled date pickers.
    """
    if request.method == 'POST':
        # NOTE(review): 'products' and 'razzia_title' are inserted into the
        # query string without URL-encoding -- confirm this is acceptable.
        return redirect(
            reverse("razzia_view")
            + "?start={0}-{1}-{2}&end={3}-{4}-{5}&products={6}&username=&razzia_title={7}"
            .format(int(request.POST['start_year']),
                    int(request.POST['start_month']),
                    int(request.POST['start_day']),
                    int(request.POST['end_year']),
                    int(request.POST['end_month']),
                    int(request.POST['end_day']),
                    request.POST.get('products'),
                    request.POST.get('razzia_title')))

    now = timezone.now()
    # Suggest the last 180 days.  The previous code subtracted a *negative*
    # timedelta, which placed the suggested start 180 days in the future,
    # i.e. after the suggested end date.
    suggested_start_date = now - datetime.timedelta(days=180)
    suggested_end_date = now

    selectable_years = list(range(2000, now.year + 1))
    start_date_picker = fields.DateField(
        widget=extras.SelectDateWidget(years=selectable_years))
    end_date_picker = fields.DateField(
        widget=extras.SelectDateWidget(years=selectable_years))

    return render(
        request,
        'admin/stregsystem/razzia/wizard.html',
        {
            'start_date_picker': start_date_picker.widget.render("start", suggested_start_date),
            'end_date_picker': end_date_picker.widget.render("end", suggested_end_date),
        },
    )
def test_multibuy_hint_two_buys_applicable(self):
    """Two coke sales for member "jokke" must produce a multibuy hint."""
    buyer = Member.objects.get(username="jokke")
    coke = Product.objects.create(name="coke", price=100, active=True)

    # Create two sales, advancing the frozen clock after each one.
    with freeze_time(timezone.datetime(2018, 1, 1)) as clock:
        for _ in range(2):
            Sale.objects.create(member=buyer, product=coke, price=100)
            clock.tick()

    hint_given, hint_text = stregsystem_views._multibuy_hint(
        timezone.datetime(2018, 1, 1, tzinfo=pytz.UTC), buyer
    )

    self.assertTrue(hint_given)
    expected_hint = "{} {}:{}".format("jokke", coke.id, 2)
    self.assertEqual(hint_text, expected_hint)
def test_order_execute_single_no_remaining(self, fulfill):
    """Ordering 1 unit fails when the product's single unit is already sold.

    ``fulfill`` is expected to be a mock injected by a ``patch`` decorator
    (the decorator is not visible here -- confirm against the class).
    """
    # One existing sale consumes the only unit in stock.
    self.product.sale_set.create(price=100, member=self.member)
    self.product.start_date = datetime.date(year=2017, month=1, day=1)
    self.product.quantity = 1

    order = Order(self.member, self.room)
    order.items.add(OrderItem(self.product, order, 1))

    with self.assertRaises(NoMoreInventoryError):
        order.execute()

    # `was_not_called()` is not a Mock assertion: it merely creates a child
    # mock and always "passes".  `assert_not_called()` really verifies it.
    fulfill.assert_not_called()
def test_order_execute_multi_some_remaining(self, fulfill):
    """Ordering 2 units fails when one of the product's 2 units is sold.

    ``fulfill`` is expected to be a mock injected by a ``patch`` decorator
    (the decorator is not visible here -- confirm against the class).
    """
    # One existing sale leaves a single unit, fewer than the two requested.
    self.product.sale_set.create(price=100, member=self.member)
    self.product.start_date = datetime.date(year=2017, month=1, day=1)
    self.product.quantity = 2

    order = Order(self.member, self.room)
    order.items.add(OrderItem(self.product, order, 2))

    with self.assertRaises(NoMoreInventoryError):
        order.execute()

    # `was_not_called()` is not a Mock assertion: it merely creates a child
    # mock and always "passes".  `assert_not_called()` really verifies it.
    fulfill.assert_not_called()
def test_invoice_get_rates_inconsistent_hourly_paidtask_rates(member):
    """`Invoice.get_rates()` raises `ValueError` when a user's hourly-work
    paid tasks for the month carry different rates.
    """
    PAID_TASK_RATE_ONE = 0.5
    PAID_TASK_RATE_TWO = 0.2

    # Plain decimal literals: `04` / `01` are a SyntaxError on Python 3.
    month = timezone.datetime(2014, 4, 1)

    paid_task_kwargs = {
        'rate': PAID_TASK_RATE_ONE,  # Note how this doesn't match user's rate
        'datetime': month,
        'user': member,
        'task_type': PaidTaskTypes.HOURLY_WORK,
    }
    PaidTaskFactory(**paid_task_kwargs)
    PaidTaskFactory(**dict(paid_task_kwargs, rate=PAID_TASK_RATE_TWO))

    invoice = Invoice(member, FAKE_CONFIG, month=month)

    with pytest.raises(ValueError) as e:
        invoice.get_rates()

    # `str(exc)` works on Python 2 and 3; `exc.message` exists only on 2.
    assert (
        'Multiple HOURLY_WORK rate values for user %s' % (member.username)
        in str(e.value)
    )
def _check_single_paidtask(invoice, amount):
    """Check that the carry-over pair of correction tasks exists for `invoice`.

    Looks up one correction of ``-amount`` dated at ``invoice.month_end`` and
    one of ``+amount`` dated at the start of the current (local) month, then
    asserts that these are the only two correction tasks.  ``objects.get``
    raises if a task is missing or duplicated.
    """
    month_start = timezone.localtime(invoice.now).replace(
        day=1, hour=0, minute=0, second=0
    )

    expected_corrections = (
        ((-1) * amount, invoice.month_end, 'Carryover to the next month'),
        (amount, month_start, 'Carryover from the previous month'),
    )
    for expected_amount, when, description in expected_corrections:
        PaidTask.objects.get(
            task_type=PaidTaskTypes.CORRECTION,
            amount=expected_amount,
            datetime=when,
            description=description,
            user=invoice.user,
        )

    corrections = PaidTask.objects.filter(task_type=PaidTaskTypes.CORRECTION)
    assert corrections.count() == 2
def test_telescope_availability_spans_interval(self, mock_intervals):
    """Availability is computed per rise/set interval, including a day that
    is split into two separate intervals.
    """
    utc = timezone.utc
    mock_intervals.return_value = [
        (datetime(2016, 9, 30, 18, 30, 0, tzinfo=utc),
         datetime(2016, 9, 30, 21, 0, 0, tzinfo=utc)),
        (datetime(2016, 10, 1, 18, 30, 0, tzinfo=utc),
         datetime(2016, 10, 1, 19, 0, 0, tzinfo=utc)),
        (datetime(2016, 10, 1, 19, 10, 0, tzinfo=utc),
         datetime(2016, 10, 1, 19, 20, 0, tzinfo=utc)),
        (datetime(2016, 10, 2, 18, 30, 0, tzinfo=utc),
         datetime(2016, 10, 2, 21, 0, 0, tzinfo=utc)),
    ]

    telescope_availability = get_telescope_availability_per_day(
        datetime(2016, 9, 30, tzinfo=utc),
        datetime(2016, 10, 2, tzinfo=utc),
    )

    for telescope_key in (self.tk1, self.tk2):
        self.assertIn(telescope_key, telescope_availability)

    # Telescope "doma": available for the whole of both 1 October intervals.
    first_window = datetime(2016, 10, 1, 19, 0, 0) - datetime(2016, 10, 1, 18, 30, 0)
    second_window = datetime(2016, 10, 1, 19, 20, 0) - datetime(2016, 10, 1, 19, 10, 0)
    doma_available_time = first_window.total_seconds()
    doma_available_time += second_window.total_seconds()
    doma_total_time = doma_available_time
    self.assertAlmostEqual(
        doma_available_time / doma_total_time,
        telescope_availability[self.tk1][0][1],
    )

    # Telescope "domb" is expected to be fully available.
    self.assertAlmostEqual(1.0, telescope_availability[self.tk2][0][1])
def test_get_for_closing(self):
    """`Event.objects.get_for_closing()` returns the past event, not the
    future one, and every returned event has ended and is fully paid.
    """
    today = arrow.now().floor('day').to('UTC').datetime
    three_days = timezone.timedelta(days=3)
    four_days = timezone.timedelta(days=4)

    past_event = Event()
    past_event.begin = today - three_days
    past_event.end = today - four_days
    past_event.title = 'test_title_now'
    past_event.status = 'open'
    past_event.save()

    # Clone the saved event as a new row shifted into the future.
    future_event = copy.copy(past_event)
    future_event.id = None
    future_event.begin = today + three_days
    future_event.end = today + four_days
    future_event.save()

    closing = Event.objects.get_for_closing()

    self.assertTrue(past_event in closing)
    self.assertTrue(future_event not in closing)
    self.assertEqual(closing.count(), 2)
    for candidate in closing:
        self.assertTrue(candidate.end < today)
        self.assertTrue(candidate.paid >= candidate.total)
def test_calendar(self):
    """`Calendar` spans `EVENTS_CALENDAR_PERIOD` days from today and lists an
    event in every day and hour slot that falls inside the event.
    """
    calendar = Calendar()
    today = arrow.now().floor('day').to('UTC').datetime
    period_end = today + timezone.timedelta(days=settings.EVENTS_CALENDAR_PERIOD)
    self.assertEqual(today, calendar.begin)
    self.assertEqual(period_end, calendar.end)

    event = Event()
    event.begin = today + timezone.timedelta(days=3)
    event.end = today + timezone.timedelta(days=4)
    event.title = 'test_title_now'
    event.status = 'open'
    event.save()

    days = calendar.get_days()
    self.assertEqual(settings.EVENTS_CALENDAR_PERIOD, len(days))

    def inside_event(slot):
        # True when the slot's date lies in [event.begin, event.end).
        return event.begin <= slot.date < event.end

    for day in days:
        if inside_event(day):
            self.assertIn(event, day.events)
        for hour in day.hours:
            if inside_event(hour):
                self.assertIn(event, hour.events)
def get_channel_schedule(self, url):
    """Get movie schedule from CJ E&M channels.

    Returns ``None`` when the page's date does not match the last eight
    characters of `url`, or when the schedule table has no rows; otherwise
    returns the result of ``parse_daily_schedule``.
    """
    scheduler = self.get_original_data(url).find('div', class_='scheduler')
    date_parts = scheduler.find('em').text[:-4].strip().split(".")

    # If date is different from the day of argument, return None.
    if "".join(date_parts) != url[-8:]:
        return None

    schedule_date = timezone.datetime(*[int(date_parts[i]) for i in range(3)])
    rows = scheduler.find('tbody').find_all('tr')
    if not rows:
        # If no schedule exists
        return None

    return self.parse_daily_schedule(rows, schedule_date)
def parse_daily_schedule(self, table, date):
    """Get daily schedule for t.cast channel.

    Walks the 24 hourly cells (``td`` elements whose id is ``YYYYMMDDHH``)
    and parses each active entry.  Entries in hours before
    ``self.start_hour`` are attributed to the following day.  Returns
    ``None`` as soon as an hourly cell is missing.
    """
    day_prefix = date.strftime("%Y%m%d")
    next_date = date + timezone.timedelta(days=1)

    daily_schedule = []
    for hour in range(24):
        cell = table.find('td', id=day_prefix + '{:02d}'.format(hour))
        if cell is None:
            return None
        for entry in cell.find_all('div', class_='con active'):
            # Hours before the channel's start hour are next day's schedule.
            entry_date = next_date if hour in range(self.start_hour) else date
            daily_schedule.append(self.parse_schedule_item(entry, entry_date))
    return daily_schedule
def test_should_notify_set(self):
    """ Notifications: should_notify() with a service set, before and after
    an API key and a future next-notification date are configured. """
    notification_settings = NotificationSetting.get_solo()
    notification_settings.send_notification = True
    notification_settings.notification_service = NotificationSetting.NOTIFICATION_NMA
    notification_settings.save()
    self.assertTrue(notification_settings.send_notification)

    # Without an API key no notification should be sent.
    self.assertFalse(dsmr_notification.services.should_notify(notification_settings))

    notification_settings.api_key = 'es7sh2d-DSMR-Reader-Rulez-iweu732'
    notification_settings.save()
    self.assertTrue(dsmr_notification.services.should_notify(notification_settings))

    # Base the next notification on a date far in the future (year 2116).
    notification_settings.next_notification = None
    far_future = timezone.make_aware(timezone.datetime(2116, 11, 16))
    dsmr_notification.services.set_next_notification(notification_settings, far_future)
    self.assertFalse(dsmr_notification.services.should_notify(notification_settings))
def test_set_next_notification_date(self, now_mock):
    """ Notifications: set_next_notification() moves the date to tomorrow. """
    now_mock.return_value = timezone.make_aware(timezone.datetime(2016, 11, 16))

    local_now = timezone.localtime(timezone.now())
    one_day = timezone.timedelta(hours=24)
    expected_date = (timezone.localtime(timezone.now()) + one_day).date()

    notification_settings = NotificationSetting.get_solo()
    notification_settings.next_notification = local_now
    notification_settings.save()

    dsmr_notification.services.set_next_notification(notification_settings, local_now)

    self.assertEqual(notification_settings.next_notification, expected_date)
def test_notification_api_fail(self, now_mock, requests_post_mock):
    """ Notifications: notify() raises when the API answers with HTTP 403. """
    now_mock.return_value = timezone.make_aware(
        timezone.datetime(2016, 11, 17, hour=0, minute=5))
    requests_post_mock.return_value = mock.MagicMock(
        status_code=403, text='Forbidden')

    notification_settings = NotificationSetting.get_solo()
    notification_settings.send_notification = True
    notification_settings.notification_service = NotificationSetting.NOTIFICATION_NMA
    notification_settings.api_key = 'es7sh2d-DSMR-Reader-Rulez-iweu732'
    notification_settings.next_notification = timezone.localtime(timezone.now())
    notification_settings.save()

    if not self.fixtures:
        # When having no data, this should NOT raise an exception.
        return dsmr_notification.services.notify()

    with self.assertRaises(AssertionError):
        dsmr_notification.services.notify()

    expected_message = 'Notify API call failed: Forbidden (HTTP403)'
    with self.assertRaisesMessage(AssertionError, expected_message):
        dsmr_notification.services.notify()
def test_grouping(self, now_mock):
    """ Test grouping per minute, instead of the default 10-second interval. """
    now_mock.return_value = timezone.make_aware(
        timezone.datetime(2015, 11, 10, hour=21)
    )

    # Make sure to verify the blocking of read ahead.
    blocking_reading = DsmrReading.objects.get(pk=3)
    blocking_reading.timestamp = timezone.now()
    blocking_reading.save()

    dsmr_consumption.services.compact_all()

    unprocessed = DsmrReading.objects.unprocessed()
    self.assertEqual(unprocessed.count(), 1)
    self.assertTrue(DsmrReading.objects.unprocessed().exists())
    self.assertEqual(ElectricityConsumption.objects.count(), 1)

    # Gas consumption is only expected when gas readings are supported.
    expected_gas_rows = 1 if self.support_gas_readings else 0
    self.assertEqual(GasConsumption.objects.count(), expected_gas_rows)
def test_reading_values(self, now_mock):
    """ Test whether dsmr_datalogger reads the correct values. """
    now_mock.return_value = timezone.make_aware(
        timezone.datetime(2017, 2, 1, hour=0, minute=0, second=0))

    self._fake_dsmr_reading()
    self.assertTrue(DsmrReading.objects.exists())
    reading = DsmrReading.objects.get()

    self.assertEqual(
        reading.timestamp,
        timezone.datetime(2017, 1, 10, 19, 40, 57, tzinfo=pytz.UTC)
    )

    expected_electricity = (
        ('electricity_delivered_1', '9012.345'),
        ('electricity_returned_1', '9123.456'),
        ('electricity_delivered_2', '9067.890'),
        ('electricity_returned_2', '9789.012'),
        ('electricity_currently_delivered', '0.320'),
        ('electricity_currently_returned', '0'),
    )
    for field_name, expected in expected_electricity:
        self.assertEqual(getattr(reading, field_name), Decimal(expected))

    self.assertEqual(
        reading.extra_device_timestamp,
        timezone.datetime(2017, 1, 10, 19, 40, 9, tzinfo=pytz.UTC)
    )
    self.assertEqual(reading.extra_device_delivered, Decimal('123.456'))
def reading_timestamp_to_datetime(string):
    """
    Convert a ``YYMMDDhhmmss`` timestamp followed by a ``W``/``S`` marker,
    found anywhere in `string`, to a timezone aware datetime in UTC.

    The timestamp is interpreted in ``settings.TIME_ZONE``; an ``S`` marker
    selects DST when localizing.
    """
    match = re.search(r'(\d{2,2})(\d{2,2})(\d{2,2})(\d{2,2})(\d{2,2})(\d{2,2})([WS])+', string)
    year, month, day, hour, minute, second = [
        int(match.group(index)) for index in range(1, 7)
    ]

    naive_timestamp = timezone.datetime(
        year=2000 + year,
        month=month,
        day=day,
        hour=hour,
        minute=minute,
        second=second,
    )

    summer_time = match.group(7) == 'S'
    local_zone = pytz.timezone(settings.TIME_ZONE)
    localized = local_zone.localize(naive_timestamp, is_dst=summer_time)
    return localized.astimezone(pytz.utc)
def count_active_days(enable_date, disable_date):
    """Return the number of whole days the segment has been active.

    :param enable_date: The date the segment was enabled
    :type enable_date: timezone.datetime
    :param disable_date: The date the segment was disabled
    :type disable_date: timezone.datetime

    :returns: Days between `enable_date` and `disable_date`; measured up to
        the current time instead when `disable_date` is missing or not later
        than `enable_date`. ``0`` when `enable_date` is ``None``.
    :rtype: int
    """
    if enable_date is None:
        return 0

    if disable_date is not None and disable_date > enable_date:
        # There is a disable date and it is relevant.
        period_end = disable_date
    else:
        # There is no disable date, or it is not relevant.
        period_end = timezone.now()

    return (period_end - enable_date).days
def print_counter(self, user):
    """Record a print of this document by `user` and return how many
    definitive print records exist for it.

    The new record is marked definitive when ``self.lock`` is truthy and
    temporary otherwise.
    """
    record = PrintCounterDocumentAlbaran()
    record.albaran = self
    record.user = user
    # NOTE(review): this is a naive local datetime -- confirm the project
    # does not expect a timezone-aware value here.
    record.date = datetime.datetime.now()
    record.status_document = (
        STATUS_PRINTER_DOCUMENT_DEFINITVE
        if self.lock
        else STATUS_PRINTER_DOCUMENT_TEMPORARY
    )
    record.save()

    definitive_prints = PrintCounterDocumentAlbaran.objects.filter(
        status_document=STATUS_PRINTER_DOCUMENT_DEFINITVE,
        albaran=self
    )
    return definitive_prints.count()

# delivery note lines
def setUp(self):
    """Create a room, member "jokke", two products, three coke sales and two
    payments, all timestamped from a clock frozen at 2000-01-01.
    """
    self.room = Room.objects.create(name="test")
    self.jokke = Member.objects.create(username="jokke")
    self.coke = Product.objects.create(name="coke", price=100, active=True)
    self.flan = Product.objects.create(name="flan", price=200, active=True)

    # Three sales, advancing the frozen clock after each one.
    self.sales = []
    with freeze_time(timezone.datetime(2000, 1, 1)) as clock:
        for _ in range(3):
            sale = Sale.objects.create(
                member=self.jokke,
                product=self.coke,
                price=100,
            )
            self.sales.append(sale)
            clock.tick()

    # Two payments, again starting from the same frozen instant.
    self.payments = []
    with freeze_time(timezone.datetime(2000, 1, 1)) as clock:
        for _ in range(2):
            payment = Payment.objects.create(member=self.jokke, amount=100)
            self.payments.append(payment)
            clock.tick()
def test_is_active_active_not_expired(self):
    """An active product deactivating one hour from now is active."""
    in_one_hour = timezone.now() + datetime.timedelta(hours=1)
    product = Product.objects.create(
        active=True,
        price=100,
        deactivate_date=in_one_hour,
    )
    self.assertTrue(product.is_active())
def test_is_active_active_out_of_stock(self):
    """An active product with quantity 1 and one sale is not active."""
    stock_start = datetime.date(year=2017, month=1, day=1)
    product = Product.objects.create(
        active=True, price=100, quantity=1, start_date=stock_start
    )
    product.sale_set.create(price=100, member=self.jeff)

    self.assertFalse(product.is_active())
def test_is_active_active_in_stock(self):
    """An active product with quantity 2 and one sale is still active."""
    stock_start = datetime.date(year=2017, month=1, day=1)
    product = Product.objects.create(
        active=True, price=100, quantity=2, start_date=stock_start
    )
    product.sale_set.create(price=100, member=self.jeff)

    self.assertTrue(product.is_active())
def test_is_active_deactive_expired(self):
    """An inactive product whose deactivate date passed is not active."""
    one_hour_ago = timezone.now() - datetime.timedelta(hours=1)
    product = Product.objects.create(
        active=False,
        price=100,
        deactivate_date=one_hour_ago,
    )
    self.assertFalse(product.is_active())
def test_is_active_deactive_out_of_stock(self):
    """An inactive product with quantity 1 and one sale is not active."""
    stock_start = datetime.date(year=2017, month=12, day=1)
    product = Product.objects.create(
        active=False, price=100, quantity=1, start_date=stock_start
    )
    product.sale_set.create(price=100, member=self.jeff)

    self.assertFalse(product.is_active())
def test_is_active_deactive_in_stock(self):
    """An inactive product is not active even with stock remaining."""
    stock_start = datetime.date(year=2017, month=12, day=1)
    product = Product.objects.create(
        active=False, price=100, quantity=2, start_date=stock_start
    )
    product.sale_set.create(price=100, member=self.jeff)

    self.assertFalse(product.is_active())
def test_promille_staggered_female(self):
    """Five drinks bought ten minutes apart by a female member give a
    promille of about 1.15 at the time of the last drink.
    """
    drinker = Member.objects.create(username="test", gender='F')
    # (330 ml * 4.6%) = 15.18
    beer = Product.objects.create(
        name="øl", price=2.0, alcohol_content_ml=15.18, active=True
    )

    midnight = timezone.datetime(year=2000, month=1, day=1, hour=0, minute=0)
    with freeze_time(midnight) as clock:
        for _ in range(5):
            clock.tick(delta=datetime.timedelta(minutes=10))
            drinker.sale_set.create(product=beer, price=beer.price)

    # The last drink was at 2000/01/01 00:50:00
    last_drink_time = timezone.datetime(year=2000, month=1, day=1, hour=0, minute=50)
    with freeze_time(last_drink_time):
        self.assertAlmostEqual(
            1.15,
            drinker.calculate_alcohol_promille(),
            places=2
        )
def test_sales_to_user_in_period(self):
    """Alan's sales between 1 and 17 February 2017 are counted per product."""
    period_start = timezone.datetime(2017, 2, 1, 0, 0, tzinfo=pytz.UTC)
    period_end = timezone.datetime(2017, 2, 17, 0, 0, tzinfo=pytz.UTC)
    product_ids = [self.flan.id, self.flanmad.id]
    initial_counts = {self.flan.name: 0, self.flanmad.name: 0}

    counts = views._sales_to_user_in_period(
        self.alan.username, period_start, period_end, product_ids, initial_counts,
    )

    self.assertEqual(2, counts[self.flan.name])
    self.assertEqual(1, counts[self.flanmad.name])
def test_sales_to_user_no_results_out_of_period(self):
    """Bob has no counted sales between 1 and 17 February 2017."""
    period_start = timezone.datetime(2017, 2, 1, 0, 0, tzinfo=pytz.UTC)
    period_end = timezone.datetime(2017, 2, 17, 0, 0, tzinfo=pytz.UTC)
    product_ids = [self.flan.id, self.flanmad.id]
    initial_counts = {self.flan.name: 0, self.flanmad.name: 0}

    counts = views._sales_to_user_in_period(
        self.bob.username, period_start, period_end, product_ids, initial_counts,
    )

    self.assertEqual(0, counts[self.flan.name])
    self.assertEqual(0, counts[self.flanmad.name])
def fjule_party(year):
    """Return 22:00 Copenhagen time on the first Friday on or after
    1 December of `year`, as a timezone-aware datetime.
    """
    copenhagen = pytz.timezone("Europe/Copenhagen")
    # pytz zones must be attached with localize(): passing one as `tzinfo=`
    # to the datetime constructor silently selects the zone's historical
    # local-mean-time offset instead of the correct CET offset.
    first_december = copenhagen.localize(timezone.datetime(year, 12, 1, 22))

    # weekday() is 0 for Monday, so Friday is 4; (11 - weekday) % 7 is the
    # number of days until the next Friday (0 if it already is one).
    days_to_add = (11 - first_december.weekday()) % 7
    return first_december + datetime.timedelta(days=days_to_add)
def late(date):
    """Return a naive datetime for 23:59:59 on the day of `date`."""
    return timezone.datetime(
        year=date.year,
        month=date.month,
        day=date.day,
        hour=23,
        minute=59,
        second=59,
    )
def first_of_month(date):
    """Return a naive datetime for 23:59:59 on the first day of `date`'s
    month.
    """
    return timezone.datetime(
        year=date.year,
        month=date.month,
        day=1,
        hour=23,
        minute=59,
        second=59,
    )
def daily(request):
    """Render the daily report page.

    Gathers the seven latest sales, the top seven products sold since the
    start of the current day, the revenue of the last 24 hours and of the
    last 30 days, and the top seven categories of the last 30 days.

    The template context is ``locals()``: every local variable name in this
    function is part of the template's interface, so do not rename, add or
    remove locals without checking the template.
    """
    # NOTE(review): microseconds are not reset here, so this is marginally
    # after midnight -- confirm whether that is intended.
    current_date = timezone.now().replace(hour=0, minute=0, second=0)
    latest_sales = (Sale.objects
                    .prefetch_related('product', 'member')
                    .order_by('-timestamp')[:7])
    top_today = (Product.objects
                 .filter(sale__timestamp__gt=current_date)
                 .annotate(Count('sale'))
                 .order_by('-sale__count')[:7])

    startTime_day = timezone.now() - datetime.timedelta(hours=24)
    # The aggregate is falsy (e.g. None) when there are no sales; report 0.0.
    revenue_day = (Sale.objects
                   .filter(timestamp__gt=startTime_day)
                   .aggregate(Sum("price"))
                   ["price__sum"]) or 0.0
    startTime_month = timezone.now() - datetime.timedelta(days=30)
    revenue_month = (Sale.objects
                     .filter(timestamp__gt=startTime_month)
                     .aggregate(Sum("price"))
                     ["price__sum"]) or 0.0
    top_month_category = (Category.objects
                          .filter(product__sale__timestamp__gt=startTime_month)
                          .annotate(sale=Count("product__sale"))
                          .order_by("-sale")[:7])

    return render(request, 'admin/stregsystem/report/daily.html', locals())
def sales_api(request):
    """Return per-day sale counts and revenue for the last 30 days as JSON.

    The response holds three parallel lists, ``day``, ``sales`` and
    ``revenue``, ordered from today backwards; days without sales are
    reported as 0 for both figures.
    """
    window_start = timezone.now() - datetime.timedelta(days=30)
    daily_rows = (
        Sale.objects
        .filter(timestamp__gt=window_start)
        .annotate(day=TruncDay('timestamp'))
        .values('day')
        .annotate(c=Count('*'))
        .annotate(r=Sum('price'))
    )
    totals_by_date = {
        row["day"].date(): (row["c"], money(row["r"])) for row in daily_rows
    }

    today = timezone.now().date()
    date_list = [today - datetime.timedelta(days=offset) for offset in range(30)]
    # Days missing from the query result get zero sales and zero revenue.
    totals = [totals_by_date.get(day, (0, 0)) for day in date_list]

    return JsonResponse({
        "day": date_list,
        "sales": [count for count, _ in totals],
        "revenue": [revenue for _, revenue in totals],
    })
def test_invoice_get_rates_paidtask_rates(member):
    """Tests that `Invoice.get_rates()` returns the rates set for users in
    their `PaidTask` entries.
    """
    USER_RATE_ONE = 0.5
    USER_RATE_TWO = 0.2

    # Set some user rate
    member.hourly_rate = USER_RATE_ONE
    member.save()

    # Plain decimal literals: `04` / `01` are a SyntaxError on Python 3.
    month = timezone.datetime(2014, 4, 1)

    paid_task_kwargs = {
        'rate': USER_RATE_ONE,
        'datetime': month,
        'user': member,
        'task_type': PaidTaskTypes.HOURLY_WORK,
    }
    PaidTaskFactory(**paid_task_kwargs)

    invoice = Invoice(member, FAKE_CONFIG, month=month)

    # Set user rate to something else to ensure we get the recorded rates
    member.hourly_rate = USER_RATE_TWO
    member.save()

    rate, review_rate, hourly_rate = invoice.get_rates()
    assert hourly_rate == USER_RATE_ONE
def test_aggregate_states_1(self):
    """Aggregated telescope states for 1 October 2016 contain the expected
    AVAILABLE periods for both test telescopes.
    """
    utc = timezone.utc

    def available(telescope, start, end):
        # Build the expected AVAILABLE state record for one period.
        return {
            'telescope': telescope,
            'event_type': 'AVAILABLE',
            'event_reason': 'Available for scheduling',
            'start': start,
            'end': end,
        }

    telescope_states = TelescopeStates(
        datetime(2016, 10, 1), datetime(2016, 10, 2)
    ).get()

    self.assertIn(self.tk1, telescope_states)
    self.assertIn(self.tk2, telescope_states)

    doma_expected_available_state = available(
        'tst.doma.1m0a',
        datetime(2016, 10, 1, 18, 24, 58, tzinfo=utc),
        datetime(2016, 10, 1, 20, 44, 58, tzinfo=utc),
    )
    self.assertIn(doma_expected_available_state, telescope_states[self.tk1])

    domb_expected_available_state1 = available(
        'tst.domb.1m0a',
        datetime(2016, 10, 1, 18, 30, 0, tzinfo=utc),
        datetime(2016, 10, 1, 19, 24, 59, tzinfo=utc),
    )
    self.assertIn(domb_expected_available_state1, telescope_states[self.tk2])

    domb_expected_available_state2 = available(
        'tst.domb.1m0a',
        datetime(2016, 10, 1, 20, 24, 59, tzinfo=utc),
        datetime(2016, 10, 1, 20, 44, 58, tzinfo=utc),
    )
    self.assertIn(domb_expected_available_state2, telescope_states[self.tk2])
def test_telescope_availability_limits_interval(self, mock_intervals):
    """Per-day availability is the available time divided by the length of
    that day's rise/set interval, for each telescope.
    """
    utc = timezone.utc
    mock_intervals.return_value = [
        (datetime(2016, 9, 30, 18, 30, 0, tzinfo=utc),
         datetime(2016, 9, 30, 21, 0, 0, tzinfo=utc)),
        (datetime(2016, 10, 1, 18, 30, 0, tzinfo=utc),
         datetime(2016, 10, 1, 21, 0, 0, tzinfo=utc)),
        (datetime(2016, 10, 2, 18, 30, 0, tzinfo=utc),
         datetime(2016, 10, 2, 21, 0, 0, tzinfo=utc)),
    ]

    telescope_availability = get_telescope_availability_per_day(
        datetime(2016, 9, 30, tzinfo=utc),
        datetime(2016, 10, 2, tzinfo=utc),
    )
    self.assertIn(self.tk1, telescope_availability)
    self.assertIn(self.tk2, telescope_availability)

    def seconds_between(later, earlier):
        return (later - earlier).total_seconds()

    interval_start = datetime(2016, 10, 1, 18, 30, 0)
    interval_end = datetime(2016, 10, 1, 21, 0, 0)

    doma_available_time = seconds_between(datetime(2016, 10, 1, 20, 44, 58), interval_start)
    doma_total_time = seconds_between(interval_end, interval_start)
    self.assertAlmostEqual(
        doma_available_time / doma_total_time,
        telescope_availability[self.tk1][0][1],
    )

    # "domb" has two separate available periods within the interval.
    domb_available_time = seconds_between(datetime(2016, 10, 1, 19, 24, 59), interval_start)
    domb_available_time += seconds_between(
        datetime(2016, 10, 1, 20, 44, 58), datetime(2016, 10, 1, 20, 24, 59)
    )
    domb_total_time = seconds_between(interval_end, interval_start)
    self.assertAlmostEqual(
        domb_available_time / domb_total_time,
        telescope_availability[self.tk2][0][1],
    )
def test_telescope_availability_combine(self, mock_intervals):
    """Combining availabilities by site and class yields the mean of the
    two telescopes' individual availabilities.
    """
    utc = timezone.utc
    mock_intervals.return_value = [
        (datetime(2016, 9, 30, 18, 30, 0, tzinfo=utc),
         datetime(2016, 9, 30, 21, 0, 0, tzinfo=utc)),
        (datetime(2016, 10, 1, 18, 30, 0, tzinfo=utc),
         datetime(2016, 10, 1, 21, 0, 0, tzinfo=utc)),
        (datetime(2016, 10, 2, 18, 30, 0, tzinfo=utc),
         datetime(2016, 10, 2, 21, 0, 0, tzinfo=utc)),
    ]

    telescope_availability = get_telescope_availability_per_day(
        datetime(2016, 9, 30, tzinfo=utc),
        datetime(2016, 10, 2, tzinfo=utc),
    )
    self.assertIn(self.tk1, telescope_availability)
    self.assertIn(self.tk2, telescope_availability)

    combined_telescope_availability = combine_telescope_availabilities_by_site_and_class(
        telescope_availability)
    # The combined key has an empty observatory and the telescope name
    # without its last character.
    combined_key = TelescopeKey(self.tk1.site, '', self.tk1.telescope[:-1])
    self.assertIn(combined_key, combined_telescope_availability)

    def seconds_between(later, earlier):
        return (later - earlier).total_seconds()

    interval_start = datetime(2016, 10, 1, 18, 30, 0)
    interval_end = datetime(2016, 10, 1, 21, 0, 0)

    doma_available_time = seconds_between(datetime(2016, 10, 1, 20, 44, 58), interval_start)
    doma_total_time = seconds_between(interval_end, interval_start)
    doma_expected_availability = doma_available_time / doma_total_time

    domb_available_time = seconds_between(datetime(2016, 10, 1, 19, 24, 59), interval_start)
    domb_available_time += seconds_between(
        datetime(2016, 10, 1, 20, 44, 58), datetime(2016, 10, 1, 20, 24, 59)
    )
    domb_total_time = seconds_between(interval_end, interval_start)
    domb_expected_availability = domb_available_time / domb_total_time

    total_expected_availability = (
        doma_expected_availability + domb_expected_availability
    ) / 2.0
    self.assertAlmostEqual(
        total_expected_availability,
        combined_telescope_availability[combined_key][0][1],
    )
def setUp(self):
    """Patch ConfigDB and the Elasticsearch lookup with canned data and set
    the time window attributes used by the tests.
    """
    # Raw ConfigDB data is replaced by an empty dict.
    self.configdb_null_patcher = patch(
        'valhalla.common.configdb.ConfigDB._get_configdb_data')
    self.configdb_null_patcher.start().return_value = {}

    # Instrument types per telescope come from a fixed mapping.
    self.configdb_patcher = patch(
        'valhalla.common.configdb.ConfigDB.get_instrument_types_per_telescope')
    self.mock_configdb = self.configdb_patcher.start()
    self.mock_configdb.return_value = {
        TelescopeKey(site='coj', observatory='clma', telescope='2m0a'):
            ['2M0-FLOYDS-SCICAM', '2M0-SCICAM-SPECTRAL'],
        TelescopeKey(site='coj', observatory='doma', telescope='1m0a'):
            ['1M0-SCICAM-SINISTRO'],
        TelescopeKey(site='coj', observatory='domb', telescope='1m0a'):
            ['1M0-SCICAM-SINISTRO'],
        TelescopeKey(site='cpt', observatory='domb', telescope='1m0a'):
            ['1M0-SCICAM-SINISTRO'],
        TelescopeKey(site='cpt', observatory='domc', telescope='1m0a'):
            ['1M0-SCICAM-SINISTRO'],
        TelescopeKey(site='elp', observatory='doma', telescope='1m0a'):
            ['1M0-SCICAM-SINISTRO'],
        TelescopeKey(site='lsc', observatory='domb', telescope='1m0a'):
            ['1M0-SCICAM-SINISTRO'],
        TelescopeKey(site='lsc', observatory='domc', telescope='1m0a'):
            ['1M0-SCICAM-SINISTRO'],
        TelescopeKey(site='ogg', observatory='clma', telescope='0m4b'):
            ['0M4-SCICAM-SBIG'],
        TelescopeKey(site='ogg', observatory='clma', telescope='2m0a'):
            ['2M0-FLOYDS-SCICAM'],
        TelescopeKey(site='sqa', observatory='doma', telescope='0m8a'):
            ['0M8-SCICAM-SBIG', '0M8-NRES-SCICAM'],
    }

    # Canned Elasticsearch output returned by the patched lookup below.
    with open('valhalla/common/test_data/es_telescope_states_data.txt', 'r') as es_file:
        self.es_output = json.load(es_file)

    utc = timezone.utc
    self.start = datetime(2016, 10, 1, tzinfo=utc)
    self.end = datetime(2016, 10, 10, tzinfo=utc)
    self.short_end = datetime(2016, 10, 4, tzinfo=utc)

    self.es_patcher = patch(
        'valhalla.common.telescope_states.TelescopeStates._get_es_data')
    self.mock_es = self.es_patcher.start()
    self.mock_es.return_value = self.es_output
def test_get_site_rise_set_intervals_should_not_return_an_interval(self):
    """No rise/set intervals are returned for site 'bpl' on 5 May 2017."""
    window_start = timezone.datetime(year=2017, month=5, day=5, tzinfo=timezone.utc)
    window_end = timezone.datetime(year=2017, month=5, day=6, tzinfo=timezone.utc)

    intervals = rise_set_utils.get_site_rise_set_intervals(
        start=window_start, end=window_end, site_code='bpl'
    )

    self.assertFalse(intervals)
def test_if_url_mapping_is_removed(self):
    """A URL mapping last used in December 1999 is removed by the
    `remove_expired_redirects` command.
    """
    last_used = timezone.datetime(1999, 12, 10, 22, 11, tzinfo=pytz.utc)
    usage_log = factories.LastUsageLogFactory(used_date=last_used)
    factories.UrlMappingFactory(last_usage=usage_log, id=199)

    management.call_command('remove_expired_redirects')

    still_exists = models.UrlMapping.objects.filter(pk=199).exists()
    self.assertFalse(still_exists)
def test_if_regexp_mapping_is_removed(self):
    """A regexp mapping last used in December 1999 is removed by the
    `remove_expired_redirects` command.
    """
    last_used = timezone.datetime(1999, 12, 10, 22, 11, tzinfo=pytz.utc)
    usage_log = factories.LastUsageLogFactory(used_date=last_used)
    factories.UrlRegexpMappingFactory(last_usage=usage_log, id=233)

    management.call_command('remove_expired_redirects')

    still_exists = models.UrlRegexpMapping.objects.filter(pk=233).exists()
    self.assertFalse(still_exists)
def test_if_used_mapping_is_not_removed(self):
    """A URL mapping last used in December 2001 survives the
    `remove_expired_redirects` command.
    """
    last_used = timezone.datetime(2001, 12, 10, 22, 11, tzinfo=pytz.utc)
    usage_log = factories.LastUsageLogFactory(used_date=last_used)
    factories.UrlMappingFactory(last_usage=usage_log, id=344)

    management.call_command('remove_expired_redirects')

    still_exists = models.UrlMapping.objects.filter(pk=344).exists()
    self.assertTrue(still_exists)
def test_if_used_regexp_mapping_is_not_removed(self):
    """A regexp mapping last used in December 2001 survives the
    `remove_expired_redirects` command.
    """
    last_used = timezone.datetime(2001, 12, 10, 22, 11, tzinfo=pytz.utc)
    usage_log = factories.LastUsageLogFactory(used_date=last_used)
    factories.UrlRegexpMappingFactory(last_usage=usage_log, id=422)

    management.call_command('remove_expired_redirects')

    still_exists = models.UrlRegexpMapping.objects.filter(pk=422).exists()
    self.assertTrue(still_exists)
def test_if_removing_regexp_mapping_does_not_remove_generated_mappings(self):
    """A mapping generated from a regexp mapping last used in 1999 still
    exists after the `remove_expired_redirects` command runs.
    """
    last_used = timezone.datetime(1999, 12, 10, 22, 11, tzinfo=pytz.utc)
    usage_log = factories.LastUsageLogFactory(used_date=last_used)
    regexp_mapping = factories.UrlRegexpMappingFactory(last_usage=usage_log, id=988)
    factories.RegexpGeneratedMappingFactory(regexp=regexp_mapping, id=1022)

    management.call_command('remove_expired_redirects')

    still_exists = models.RegexpGeneratedMapping.objects.filter(pk=1022).exists()
    self.assertTrue(still_exists)
def get_default_value_due():
    """Return the default due value: seven days after the current time."""
    current_time = timezone.now()
    return current_time + datetime.timedelta(days=7)
def get_default_value_mref():
    """Return ``"ONDON"`` followed by the number of seconds elapsed since
    1970-01-01 00:00 UTC, formatted with ``str()``.
    """
    elapsed = timezone.now() - timezone.datetime(1970, 1, 1, tzinfo=timezone.utc)
    seconds_since_epoch = elapsed.total_seconds()
    return "ONDON" + str(seconds_since_epoch)
def test_syncsession_creation_fails_with_expired_nonce(self):
    """Sync session creation fails once every nonce is back-dated to 2000."""
    data = self.get_initial_syncsession_data_for_request()

    # Back-date all nonces to 1 January 2000 in the current timezone.
    long_ago = timezone.datetime(2000, 1, 1, tzinfo=timezone.get_current_timezone())
    Nonce.objects.all().update(timestamp=long_ago)

    self.assertSyncSessionCreationFails(data)
def test_get_video_processing_state_started_at_truncated_microseconds(self):
    """The video detail API reports `started_at` without microseconds."""
    factories.VideoFactory(public_id='videoid', title="Some title", owner=self.user)

    # Store a start time that carries 1516 microseconds.
    started_at = datetime(2016, 1, 1, 12, 13, 14, 1516, get_current_timezone())
    processing_states = models.ProcessingState.objects.filter(video__public_id='videoid')
    processing_states.update(started_at=started_at)

    detail_url = reverse('api:v1:video-detail', kwargs={'id': 'videoid'})
    video = self.client.get(detail_url).json()

    # Check that microseconds are truncated
    self.assertEqual('2016-01-01T12:13:14Z', video['processing']['started_at'])
def test_get_for_notification(self):
    """`Event.objects.get_for_notification(date)` returns at least one
    event, each beginning at or after `date` and not yet notified.
    """
    reference = timezone.datetime(2016, 12, 8, 10, 0, 0, 0, pytz.UTC)

    pending = Event.objects.get_for_notification(reference)

    self.assertGreater(pending.count(), 0)
    for candidate in pending:
        self.assertTrue(candidate.begin >= reference)
        self.assertTrue(candidate.notified_at is None)