The following 49 code examples, extracted from open-source Python projects, illustrate how to use django.utils.timezone.utc.

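Before the project examples, here is a minimal orientation sketch, not drawn from any of the projects below, showing the three patterns that recur throughout them: attaching timezone.utc when constructing a datetime, converting an aware datetime to UTC, and promoting a naive value with make_aware. It assumes only Django's standard timezone utilities.

    from datetime import datetime
    from django.utils import timezone

    # Attach UTC directly when constructing a datetime.
    launch = datetime(2016, 10, 1, 12, 0, tzinfo=timezone.utc)

    # Convert any aware datetime to UTC (e.g. before storing it naively).
    as_utc = timezone.now().astimezone(timezone.utc)

    # Promote a naive datetime to an aware one in UTC.
    aware = timezone.make_aware(datetime(2016, 10, 1, 12, 0), timezone.utc)
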
def value_to_db_datetime(self, value):
    if value is None:
        return None

    # MySQL doesn't support tz-aware times
    if timezone.is_aware(value):
        if settings.USE_TZ:
            value = value.astimezone(timezone.utc).replace(tzinfo=None)
        else:
            raise ValueError(
                "MySQL backend does not support timezone-aware times."
            )

    if not self.connection.features.supports_microsecond_precision:
        value = value.replace(microsecond=0)

    if not self.connection.use_pure:
        return datetime_to_mysql(value)
    return self.connection.converter.to_mysql(value)

def test_parking_str(parking_factory):
    parking = parking_factory(
        time_start=datetime.datetime(2014, 1, 1, 6, 0, 0, tzinfo=utc),
        time_end=datetime.datetime(2016, 1, 1, 7, 0, 0, tzinfo=utc),
        registration_number='ABC-123',
    )
    assert all(str(parking).count(val) == 1 for val in ('2014', '2016', '8', '9', 'ABC-123'))

    parking = parking_factory(
        time_start=datetime.datetime(2016, 1, 1, 6, 0, 0, tzinfo=utc),
        time_end=datetime.datetime(2016, 1, 1, 7, 0, 0, tzinfo=utc),
        registration_number='ABC-123',
    )
    assert all(str(parking).count(val) == 1 for val in ('2016', '8', '9', 'ABC-123'))

    parking = parking_factory(
        time_start=datetime.datetime(2016, 1, 1, 6, 0, 0, tzinfo=utc),
        time_end=None,
        registration_number='ABC-123',
    )
    assert all(str(parking).count(val) == 1 for val in ('2016', '8', 'ABC-123'))

def render(self, datestring):
    """Parses a date-like input string into a timezone aware Python
    datetime.
    """
    formats = ["%Y-%m-%dT%H:%M:%S.%f",
               "%Y-%m-%d %H:%M:%S.%f",
               "%Y-%m-%dT%H:%M:%S",
               "%Y-%m-%d %H:%M:%S"]
    if datestring:
        for format in formats:
            try:
                parsed = datetime.strptime(datestring, format)
                if not timezone.is_aware(parsed):
                    parsed = timezone.make_aware(parsed, timezone.utc)
                return parsed
            except Exception:
                pass
    return None

def adapt_datetime_warn_on_aware_datetime(value, conv):
    # Remove this function and rely on the default adapter in Django 2.0.
    if settings.USE_TZ and timezone.is_aware(value):
        warnings.warn(
            "The MySQL database adapter received an aware datetime (%s), "
            "probably from cursor.execute(). Update your code to pass a "
            "naive datetime in the database connection's time zone (UTC by "
            "default).", RemovedInDjango20Warning)
        # This doesn't account for the database connection's timezone,
        # which isn't known. (That's why this adapter is deprecated.)
        value = value.astimezone(timezone.utc).replace(tzinfo=None)
    return Thing2Literal(value.strftime("%Y-%m-%d %H:%M:%S.%f"), conv)

# MySQLdb-1.2.1 returns TIME columns as timedelta -- they are more like
# timedelta in terms of actual behavior as they are signed and include days --
# and Django expects time, so we still need to override that. We also need to
# add special handling for SafeText and SafeBytes as MySQLdb's type
# checking is too tight to catch those (see Django ticket #6052).

def clone(self, **override):
    # See: https://github.com/django/django/commit/a97ecfdea8
    copy = self.__class__.objects.get(pk=self.pk)
    copy.pk = None
    copy.date_complete = datetime(1970, 1, 2, tzinfo=timezone.utc)
    copy.failure_datetime = None
    copy.queued = copy.running = False
    copy.complete_duration = 0
    copy.backup_size_mb = 0

    # Use the overrides.
    for key, value in override.items():
        setattr(copy, key, value)

    copy.save()
    return copy

def test_not_expired(self):
    paper = create_paper()
    render1 = create_render(paper=paper)
    render1.created_at = datetime.datetime(1900, 1, 1).replace(tzinfo=timezone.utc)
    render1.save()
    render2 = create_render(paper=paper)

    # haven't updated expired status yet
    qs = Render.objects.not_expired()
    self.assertIn(render1, qs)
    self.assertIn(render2, qs)

    # batch job which updates the expired flag
    Render.objects.update_is_expired()

    # render1 should have expired now
    qs = Render.objects.not_expired()
    self.assertNotIn(render1, qs)
    self.assertIn(render2, qs)

def get_request_state_from_pond_blocks(request_state, acceptability_threshold, request_blocks):
    active_blocks = False
    future_blocks = False
    now = timezone.now()
    for block in request_blocks:
        start_time = dateutil.parser.parse(block['start']).replace(tzinfo=timezone.utc)
        end_time = dateutil.parser.parse(block['end']).replace(tzinfo=timezone.utc)
        # mark a block as complete if a % of the total exposures of all its molecules are complete
        completion_percent = exposure_completion_percentage_from_pond_block(block)
        if isclose(acceptability_threshold, completion_percent) or completion_percent >= acceptability_threshold:
            return 'COMPLETED'
        if (not block['canceled'] and not any(molecule['failed'] for molecule in block['molecules'])
                and start_time < now < end_time):
            active_blocks = True
        if now < start_time:
            future_blocks = True

    if not (future_blocks or active_blocks):
        return 'FAILED'

    return request_state

def setUp(self):
    super().setUp()
    self.proposal = mixer.blend(Proposal)
    self.user = mixer.blend(User)
    mixer.blend(Profile, user=self.user)
    self.client.force_login(self.user)

    self.semester = mixer.blend(
        Semester, id='2016B',
        start=datetime(2016, 9, 1, tzinfo=timezone.utc),
        end=datetime(2016, 12, 31, tzinfo=timezone.utc)
    )
    self.time_allocation_1m0_sbig = mixer.blend(
        TimeAllocation, proposal=self.proposal, semester=self.semester,
        telescope_class='1m0', instrument_name='1M0-SCICAM-SBIG',
        std_allocation=100.0, std_time_used=0.0,
        too_allocation=10, too_time_used=0.0,
        ipp_limit=10.0, ipp_time_available=5.0
    )
    self.time_allocation_2m0_floyds = mixer.blend(
        TimeAllocation, proposal=self.proposal, semester=self.semester,
        telescope_class='2m0', instrument_name='2M0-FLOYDS-SCICAM',
        std_allocation=100.0, std_time_used=0.0,
        too_allocation=10, too_time_used=0.0,
        ipp_limit=10.0, ipp_time_available=5.0
    )
    self.membership = mixer.blend(Membership, user=self.user, proposal=self.proposal)

    self.generic_payload = copy.deepcopy(generic_payload)
    self.generic_payload['proposal'] = self.proposal.id

def setUp(self):
    super().setUp()
    self.proposal = mixer.blend(Proposal)
    self.user = mixer.blend(User)
    self.client.force_login(self.user)
    mixer.blend(Membership, user=self.user, proposal=self.proposal)

    semester = mixer.blend(
        Semester, id='2016B',
        start=datetime(2016, 9, 1, tzinfo=timezone.utc),
        end=datetime(2016, 12, 31, tzinfo=timezone.utc),
    )
    self.time_allocation_1m0 = mixer.blend(
        TimeAllocation, proposal=self.proposal, semester=semester,
        telescope_class='1m0', std_allocation=100.0, std_time_used=0.0,
        too_allocation=10, too_time_used=0.0,
        ipp_limit=10.0, ipp_time_available=5.0
    )

    self.generic_payload = copy.deepcopy(generic_payload)
    self.generic_payload['proposal'] = self.proposal.id

def setUp(self):
    super().setUp()
    self.proposal = mixer.blend(Proposal)
    self.user = mixer.blend(User)
    self.client.force_login(self.user)

    semester = mixer.blend(
        Semester, id='2016B',
        start=datetime(2016, 9, 1, tzinfo=timezone.utc),
        end=datetime(2016, 12, 31, tzinfo=timezone.utc)
    )
    self.time_allocation_1m0_sbig = mixer.blend(
        TimeAllocation, proposal=self.proposal, semester=semester,
        telescope_class='1m0', instrument_name='1M0-SCICAM-SBIG',
        std_allocation=100.0, std_time_used=0.0,
        too_allocation=10, too_time_used=0.0,
        ipp_limit=10.0, ipp_time_available=5.0
    )
    self.time_allocation_2m0_floyds = mixer.blend(
        TimeAllocation, proposal=self.proposal, semester=semester,
        telescope_class='2m0', instrument_name='2M0-FLOYDS-SCICAM',
        std_allocation=100.0, std_time_used=0.0,
        too_allocation=10, too_time_used=0.0,
        ipp_limit=10.0, ipp_time_available=5.0
    )
    mixer.blend(Membership, user=self.user, proposal=self.proposal)

    self.generic_payload = copy.deepcopy(generic_payload)
    self.generic_payload['proposal'] = self.proposal.id

def setUp(self):
    super().setUp()
    self.proposal = mixer.blend(Proposal, id='temp')
    self.semester = mixer.blend(
        Semester, id='2016B',
        start=datetime(2016, 9, 1, tzinfo=timezone.utc),
        end=datetime(2016, 12, 31, tzinfo=timezone.utc)
    )
    self.time_allocation_1m0_sbig = mixer.blend(
        TimeAllocation, proposal=self.proposal, semester=self.semester,
        telescope_class='1m0', instrument_name='1M0-SCICAM-SBIG',
        std_allocation=100.0, std_time_used=0.0,
        too_allocation=10.0, too_time_used=0.0,
        ipp_limit=10.0, ipp_time_available=1.0
    )
    self.time_allocation_0m4_sbig = mixer.blend(
        TimeAllocation, proposal=self.proposal, semester=self.semester,
        telescope_class='0m4', instrument_name='0M4-SCICAM-SBIG',
        std_allocation=100.0, std_time_used=0.0,
        too_allocation=10.0, too_time_used=0.0,
        ipp_limit=10.0, ipp_time_available=1.0
    )
    self.user = mixer.blend(User)
    mixer.blend(Membership, user=self.user, proposal=self.proposal)
    self.client.force_login(self.user)
    self.generic_payload = copy.deepcopy(generic_payload)

def test_request_intervals_for_one_week(self):
    intervals = get_rise_set_intervals(self.request.as_dict)

    truth_intervals = [
        (datetime(2016, 10, 1, 0, 0, tzinfo=timezone.utc),
         datetime(2016, 10, 1, 3, 20, 31, 366820, tzinfo=timezone.utc)),
        (datetime(2016, 10, 1, 19, 13, 14, 944205, tzinfo=timezone.utc),
         datetime(2016, 10, 2, 3, 19, 9, 181040, tzinfo=timezone.utc)),
        (datetime(2016, 10, 2, 19, 9, 19, 241762, tzinfo=timezone.utc),
         datetime(2016, 10, 3, 3, 17, 47, 117329, tzinfo=timezone.utc)),
        (datetime(2016, 10, 3, 19, 5, 23, 539011, tzinfo=timezone.utc),
         datetime(2016, 10, 4, 3, 16, 25, 202612, tzinfo=timezone.utc)),
        (datetime(2016, 10, 4, 19, 1, 27, 835928, tzinfo=timezone.utc),
         datetime(2016, 10, 5, 3, 15, 3, 464340, tzinfo=timezone.utc)),
        (datetime(2016, 10, 5, 18, 57, 32, 132481, tzinfo=timezone.utc),
         datetime(2016, 10, 6, 3, 12, 5, 895932, tzinfo=timezone.utc)),
        (datetime(2016, 10, 6, 18, 53, 36, 428629, tzinfo=timezone.utc),
         datetime(2016, 10, 7, 3, 8, 10, 183626, tzinfo=timezone.utc)),
        (datetime(2016, 10, 7, 18, 49, 40, 724307, tzinfo=timezone.utc),
         datetime(2016, 10, 8, 0, 0, tzinfo=timezone.utc)),
    ]

    self.assertEqual(intervals, truth_intervals)

def test_telescope_availability_spans_interval(self, mock_intervals):
    mock_intervals.return_value = [
        (datetime(2016, 9, 30, 18, 30, 0, tzinfo=timezone.utc),
         datetime(2016, 9, 30, 21, 0, 0, tzinfo=timezone.utc)),
        (datetime(2016, 10, 1, 18, 30, 0, tzinfo=timezone.utc),
         datetime(2016, 10, 1, 19, 0, 0, tzinfo=timezone.utc)),
        (datetime(2016, 10, 1, 19, 10, 0, tzinfo=timezone.utc),
         datetime(2016, 10, 1, 19, 20, 0, tzinfo=timezone.utc)),
        (datetime(2016, 10, 2, 18, 30, 0, tzinfo=timezone.utc),
         datetime(2016, 10, 2, 21, 0, 0, tzinfo=timezone.utc)),
    ]
    start = datetime(2016, 9, 30, tzinfo=timezone.utc)
    end = datetime(2016, 10, 2, tzinfo=timezone.utc)
    telescope_availability = get_telescope_availability_per_day(start, end)

    self.assertIn(self.tk1, telescope_availability)
    self.assertIn(self.tk2, telescope_availability)

    doma_available_time = (datetime(2016, 10, 1, 19, 0, 0) - datetime(2016, 10, 1, 18, 30, 0)).total_seconds()
    doma_available_time += (datetime(2016, 10, 1, 19, 20, 0) - datetime(2016, 10, 1, 19, 10, 0)).total_seconds()
    doma_total_time = doma_available_time

    doma_expected_availability = doma_available_time / doma_total_time
    self.assertAlmostEqual(doma_expected_availability, telescope_availability[self.tk1][0][1])

    domb_expected_availability = 1.0
    self.assertAlmostEqual(domb_expected_availability, telescope_availability[self.tk2][0][1])

def _order_downtime_by_resource(raw_downtime_intervals):
    ''' Puts the raw downtime interval sets into a dictionary by resource '''
    downtime_intervals = {}
    for interval in raw_downtime_intervals:
        resource = '.'.join([interval['telescope'], interval['observatory'], interval['site']])
        if resource not in downtime_intervals:
            downtime_intervals[resource] = []
        start = datetime.strptime(interval['start'], DOWNTIME_DATE_FORMAT).replace(tzinfo=timezone.utc)
        end = datetime.strptime(interval['end'], DOWNTIME_DATE_FORMAT).replace(tzinfo=timezone.utc)
        downtime_intervals[resource].append({'type': 'start', 'time': start})
        downtime_intervals[resource].append({'type': 'end', 'time': end})

    for resource in downtime_intervals:
        downtime_intervals[resource] = Intervals(downtime_intervals[resource])

    return downtime_intervals

def __init__(self, start, end, telescopes=None, sites=None, instrument_types=None):
    try:
        self.es = Elasticsearch([settings.ELASTICSEARCH_URL])
    except LocationValueError:
        logger.error('Could not find host. Make sure ELASTICSEARCH_URL is set.')
        raise ImproperlyConfigured('ELASTICSEARCH_URL')
    self.instrument_types = instrument_types
    self.available_telescopes = self._get_available_telescopes()

    sites = list({tk.site for tk in self.available_telescopes}) if not sites else sites
    telescopes = list({tk.telescope for tk in self.available_telescopes if tk.site in sites}) \
        if not telescopes else telescopes

    self.start = start.replace(tzinfo=timezone.utc).replace(microsecond=0)
    self.end = end.replace(tzinfo=timezone.utc).replace(microsecond=0)
    cached_event_data = cache.get('tel_event_data')
    if cached_event_data:
        self.event_data = cached_event_data
    else:
        self.event_data = self._get_es_data(sites, telescopes)
        cache.set('tel_event_data', self.event_data, 1800)

def _occurrences_after_generator(self, after=None, tzinfo=pytz.utc):
    """
    returns a generator that produces unpersisted occurrences after the
    datetime ``after``.
    """
    if after is None:
        after = timezone.now()
    rule = self.get_rrule_object()
    if rule is None:
        if self.end > after:
            yield self._create_occurrence(self.start, self.end)
        # A plain return ends the generator; raising StopIteration inside a
        # generator is an error under PEP 479 (Python 3.7+).
        return
    date_iter = iter(rule)
    difference = self.end - self.start
    while True:
        o_start = next(date_iter)
        if self.end_recurring_period and o_start > self.end_recurring_period:
            return
        o_end = o_start + difference
        if o_end > after:
            yield self._create_occurrence(o_start, o_end)

def _format_parameters( self, parameters ):
    parameters = list( parameters )
    for index in range( len( parameters ) ):
        # With raw SQL queries, datetimes can reach this function
        # without being converted by DateTimeField.get_db_prep_value.
        if settings.USE_TZ and isinstance( parameters[index], datetime.datetime ):
            param = parameters[index]
            if timezone.is_naive( param ):
                warnings.warn(u"Received a naive datetime (%s)"
                              u" while time zone support is active." % param,
                              RuntimeWarning)
                default_timezone = timezone.get_default_timezone()
                param = timezone.make_aware( param, default_timezone )
            param = param.astimezone(timezone.utc).replace(tzinfo=None)
            parameters[index] = param
    return tuple( parameters )

# Over-riding this method to modify SQLs which contains format parameter to qmark.

def value_to_db_datetime( self, value ):
    if value is None:
        return None

    if( djangoVersion[0:2] <= ( 1, 3 ) ):
        # DB2 doesn't support time zone aware datetime
        if ( value.tzinfo is not None ):
            raise ValueError( "Timezone aware datetime not supported" )
        else:
            return value
    else:
        if is_aware(value):
            if settings.USE_TZ:
                value = value.astimezone( utc ).replace( tzinfo=None )
            else:
                raise ValueError( "Timezone aware datetime not supported" )
        return unicode( value )

def create(self, request, *args, **kwargs):
    email = request.data.get('email', None)
    try:
        user = self.get_queryset().get(email__iexact=email)
    except:
        user = None

    if user:
        # Allow only 5 requests per hour
        limit = 5
        now = timezone.now()
        to_check = (now - relativedelta(hours=1)).replace(tzinfo=timezone.utc)
        tokens = models.PasswordRecoveryToken.objects.filter(
            user=user, created_date__gte=to_check, channel__slug=request.channel)

        if tokens.count() >= limit:
            will_release = tokens.order_by('-created_date')[limit - 1].created_date + relativedelta(hours=1)
            seconds = abs((will_release - now).seconds)
            return response.Response(
                {'success': False, 'message': 'Five tokens generated last hour.', 'try_again_in': seconds},
                status=status.HTTP_429_TOO_MANY_REQUESTS)

        token = models.PasswordRecoveryToken.objects.create(user=user, object_channel=request.channel)

    return response.Response({'success': True, 'message': 'Token requested successfully(if user exists).'})

def jwt_blacklist_set_handler(payload):
    """
    Default implementation that blacklists a jwt token.

    Should return a black listed token or None.
    """
    try:
        data = {
            'jti': payload.get('jti'),
            'created': now(),
            'expires': datetime.fromtimestamp(payload.get('exp'), tz=utc)
        }
        token = JWTBlacklistToken.objects.create(**data)
    except (TypeError, IntegrityError, Exception):
        return None
    else:
        return token

def create_invoices(self, date):
    # monthly quotas
    start = make_aware(datetime.datetime(date.year, date.month, 1), utc)
    if date.month == 12:
        year = date.year + 1
        month = 1
    else:
        year = date.year
        month = date.month + 1
    end = make_aware(datetime.datetime(year, month, 1), utc)
    for quota in self.filter(quota_type__periodicity=QuotaType.MONTHLY):
        if not quota.quotainvoice_set.filter(date__range=(start, end)).exists():
            print "quota necesita invoice:", quota.create_invoice(date)

    # yearly quotas
    start = make_aware(datetime.datetime(date.year, 1, 1), utc)
    end = make_aware(datetime.datetime(date.year + 1, 1, 1), utc)
    quotainvoices = []
    for quota in self.filter(quota_type__periodicity=QuotaType.ANUAL):
        if not quota.quotainvoice_set.filter(date__range=(start, end)).exists():
            quotainvoice = quota.create_invoice(date)
            quotainvoices.append(quotainvoice)
    return quotainvoices

def needs_invoice(self, obj):
    if obj.last_invoice_date is None:
        return True
    date = now()
    if obj.quota_type.periodicity == obj.quota_type.ANUAL:
        start = make_aware(datetime.datetime(date.year, 1, 1), utc)
        return obj.last_invoice_date < start
    elif obj.quota_type.periodicity == obj.quota_type.MONTHLY:
        # monthly quotas compare against the start of the following month
        if date.month == 12:
            year = date.year + 1
            month = 1
        else:
            year = date.year
            month = date.month + 1
        start = make_aware(datetime.datetime(year, month, 1), utc)
        return obj.last_invoice_date < start
    return False

def adapt_datetime_with_timezone_support(value, conv):
    # Equivalent to DateTimeField.get_db_prep_value. Used only by raw SQL.
    if settings.USE_TZ:
        if timezone.is_naive(value):
            warnings.warn("MySQL received a naive datetime (%s)"
                          " while time zone support is active." % value,
                          RuntimeWarning)
            default_timezone = timezone.get_default_timezone()
            value = timezone.make_aware(value, default_timezone)
        value = value.astimezone(timezone.utc).replace(tzinfo=None)
    return Thing2Literal(value.strftime("%Y-%m-%d %H:%M:%S.%f"), conv)

# MySQLdb-1.2.1 returns TIME columns as timedelta -- they are more like
# timedelta in terms of actual behavior as they are signed and include days --
# and Django expects time, so we still need to override that. We also need to
# add special handling for SafeText and SafeBytes as MySQLdb's type
# checking is too tight to catch those (see Django ticket #6052).
# Finally, MySQLdb always returns naive datetime objects. However, when
# timezone support is active, Django expects timezone-aware datetime objects.

def test_json_stores_user_attribute(self, mock_time):
    mock_time.return_value = datetime.utcnow().replace(tzinfo=utc)
    self._publish_transportation_form()

    # make account require phone auth
    self.user.profile.require_auth = True
    self.user.profile.save()

    # submit instance with a request user
    path = os.path.join(
        self.this_directory, 'fixtures', 'transportation', 'instances',
        self.surveys[0], self.surveys[0] + '.xml')
    auth = DigestAuth(self.login_username, self.login_password)
    self._make_submission(path, auth=auth)

    instances = Instance.objects.filter(xform_id=self.xform).all()
    self.assertTrue(len(instances) > 0)

    for instance in instances:
        self.assertEqual(instance.json[SUBMITTED_BY], 'bob')
        # check that the parsed instance's to_dict_for_mongo also contains
        # the _user key, which is what's used by the JSON REST service
        pi = ParsedInstance.objects.get(instance=instance)
        self.assertEqual(pi.to_dict_for_mongo()[SUBMITTED_BY], 'bob')

def handle(self, *args, **kwargs):
    # Reset all sql deletes to None
    Instance.objects.exclude(
        deleted_at=None, xform__downloadable=True).update(deleted_at=None)

    # Get all mongo deletes
    query = '{"$and": [{"_deleted_at": {"$exists": true}}, ' \
            '{"_deleted_at": {"$ne": null}}]}'
    query = json.loads(query)
    xform_instances = settings.MONGO_DB.instances
    cursor = xform_instances.find(query)
    for record in cursor:
        # update sql instance with deleted_at datetime from mongo
        try:
            i = Instance.objects.get(
                uuid=record["_uuid"], xform__downloadable=True)
        except Instance.DoesNotExist:
            continue
        else:
            deleted_at = parse_datetime(record["_deleted_at"])
            if not timezone.is_aware(deleted_at):
                deleted_at = timezone.make_aware(
                    deleted_at, timezone.utc)
            i.set_deleted(deleted_at)

def do_password_reset(request, token=None):
    try:
        email, timestamp = _password_reset_token_factory.parse_token(token)
    except (signing.BadSignature, signing.SignatureExpired):
        return render(request, 'users/password_reset/reset_token_invalid.html', {})

    try:
        user = get_user(email)
    except User.DoesNotExist:
        raise Http404()

    profile = user.profile

    timestamp = datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc)
    if profile.last_password_change and profile.last_password_change > timestamp:
        return render(request, 'users/password_reset/token_already_used.html', {})

    form = SetPasswordForm(user, request.POST or None)
    if form.is_valid():
        form.save()
        profile.last_password_change = timezone.now()
        profile.save()
        return render(request, 'users/password_reset/reset_complete.html', {})

    return render(request, 'users/password_reset/reset_form.html', {
        'user': user,
        'form': form,
    })

def authenticate(self, request, **kwargs):
    """
    Validates a lost_key request by hash, id, and expiration time
    """
    user_hash = request.QUERY_PARAMS.get('hash', None)
    if not user_hash:
        user_hash = request.DATA.get('hash')
    lost_key_id = request.parser_context.get('view').kwargs.get('pk')
    try:
        lost_key = LostKey.objects.get(
            hash=user_hash,
            pk=lost_key_id,
            expires_at__gte=datetime.utcnow().replace(tzinfo=utc),
            used=False)
        return lost_key.created_by, lost_key.hash
    except:
        return None

def create_from_zfs_name(cls, zfs_name, status=OK, name=None, timestamp=None, **kwargs):
    """Create new snapshot from info gathered from compute node"""
    t, id = zfs_name.split('-', 1)
    t = t[0]

    if t == TT_EXEC:
        type = cls.MANUAL
    elif t == TT_AUTO:
        type = cls.AUTO
    else:
        raise AssertionError('Unknown snapshot type')

    if not name or name == '-':
        name = zfs_name

    snap = cls(id=int(id), name=name, type=type, status=status, **kwargs)

    if timestamp:
        snap.created = datetime.fromtimestamp(timestamp, timezone.utc)

    snap.save(force_insert=True)

    return snap

def key_state(key, gpg):
    if not key:
        return None, "invalid"

    results = gpg.import_keys(key).results
    # Key data is present in the last element of the list
    if not results or not results[-1]["fingerprint"]:
        return None, "invalid"
    key_fingerprint = results[-1]["fingerprint"]

    # Since the keyring is exclusive for this import
    # only the imported key exists in it.
    key = gpg.list_keys()[0]
    exp_timestamp = int(key["expires"]) if key["expires"] else 0
    expires = datetime.fromtimestamp(exp_timestamp, timezone.utc)

    if key["trust"] == "r":
        state = "revoked"
    elif exp_timestamp and expires < timezone.now():
        state = "expired"
    else:
        state = "valid"

    return key_fingerprint, state

def test_feed(self, feed):
    self.assertIsNone(cache.get(self.cache_key))
    stream = self.stream.get_items(config=self.feedconfig)
    self.assertIsNotNone(cache.get(self.cache_key))
    self.assertEqual(len(stream), 12)
    for item in stream:
        assert isinstance(item, FeedItem)

    self.assertEqual(
        stream[0].posted,
        datetime.datetime(2017, 11, 15, 21, 55, 44, tzinfo=timezone.utc))
    self.assertEqual(
        stream[0].image_dict['small']['src'],
        "https://scontent-amt2-1.cdninstagram.com/t51.2885-15/s320x320/e35/c86.0.908.908/23507082_173663316554801_3781761610851287040_n.jpg"  # NOQA
    )
    self.assertEqual(
        stream[0].image_dict['thumb']['src'],
        "https://scontent-amt2-1.cdninstagram.com/t51.2885-15/s240x240/e35/c86.0.908.908/23507082_173663316554801_3781761610851287040_n.jpg"  # NOQA
    )
    self.assertEqual(
        stream[0].image_dict['medium']['src'],
        "https://scontent-amt2-1.cdninstagram.com/t51.2885-15/s480x480/e35/c86.0.908.908/23507082_173663316554801_3781761610851287040_n.jpg"  # NOQA
    )

    # The following data is not explicitly stored, but should still be accessible
    self.assertEqual(stream[0].code, "Bbh7J7JlCRn")

def test_feed(self, feed):
    self.assertIsNone(cache.get(self.cache_key))
    stream = self.stream.get_items(config=self.feedconfig)
    self.assertIsNotNone(cache.get(self.cache_key))
    self.assertEqual(len(stream), 25)
    for item in stream:
        self.assertIsInstance(item, FeedItem)

    self.assertEqual(
        stream[0].posted,
        datetime.datetime(2016, 10, 4, 14, 48, 9, tzinfo=timezone.utc))
    self.assertEqual(
        stream[0].image_dict['thumb']['url'],
        "https://scontent.xx.fbcdn.net/v/t1.0-0/s130x130/14606290_1103282596374848_3084561525150401400_n.jpg?oh=4a993e12211341d2304724a5822b1fbf&oe=58628491"  # NOQA
    )

    # The following data is not explicitly stored, but should still be accessible
    self.assertEqual(stream[0].icon, "https://www.facebook.com/images/icons/photo.gif")

def timezone(self):
    """
    Time zone for datetimes stored as naive values in the database.

    Returns a tzinfo object or None.

    This is only needed when time zone support is enabled and the database
    doesn't support time zones. (When the database supports time zones,
    the adapter handles aware datetimes so Django doesn't need to.)
    """
    if not settings.USE_TZ:
        return None
    elif self.features.supports_timezones:
        return None
    elif self.settings_dict['TIME_ZONE'] is None:
        return timezone.utc
    else:
        # Only this branch requires pytz.
        return pytz.timezone(self.settings_dict['TIME_ZONE'])

def latest_post_date(self):
    """
    Returns the latest item's pubdate or updateddate. If no items have
    either of these attributes this returns the current UTC date/time.
    """
    latest_date = None
    date_keys = ('updateddate', 'pubdate')

    for item in self.items:
        for date_key in date_keys:
            item_date = item.get(date_key)
            if item_date:
                if latest_date is None or item_date > latest_date:
                    latest_date = item_date

    # datetime.now(tz=utc) is slower, as documented in django.utils.timezone.now
    return latest_date or datetime.datetime.utcnow().replace(tzinfo=utc)

def timestamp_for_metadata(dt=None):
    """
    Return a timestamp with a timezone for the configured locale. If all
    else fails, consider localtime to be UTC.

    Originally written by Marco Bonetti.
    """
    dt = dt or datetime.datetime.now()
    if timezone is None:
        return dt.strftime('%Y-%m-%d %H:%M%z')
    if not dt.tzinfo:
        tz = timezone.get_current_timezone()
        if not tz:
            # Fall back to UTC when no current time zone is configured.
            tz = timezone.utc
        dt = dt.replace(tzinfo=tz)
    return dt.strftime("%Y-%m-%d %H:%M%z")

def test_time_filtering(operator, staff_api_client, parking_factory, name):
    p1 = parking_factory(
        registration_number='ABC-123',
        time_start=datetime(2012, 1, 1, 12, 0, 0, tzinfo=utc),
        time_end=datetime(2014, 1, 1, 12, 0, 0, tzinfo=utc),
        operator=operator)
    p2 = parking_factory(
        registration_number='ABC-123',
        time_start=datetime(2014, 1, 1, 12, 0, 0, tzinfo=utc),
        time_end=datetime(2016, 1, 1, 12, 0, 0, tzinfo=utc),
        operator=operator)
    p3 = parking_factory(registration_number='ABC-123')

    (time, expected_parkings) = {
        'before_all': ('2000-01-01T12:00:00Z', []),
        'at_start_of_1st': ('2012-01-01T12:00:00Z', [p1]),
        'after_start_of_1st': ('2012-01-01T12:00:01Z', [p1]),
        'before_end_of_1st': ('2014-01-01T11:59:59Z', [p1]),
        'between_1st_and_2nd': ('2014-01-01T12:00:00Z', [p1, p2]),
        'after_start_of_2nd': ('2014-01-01T12:00:01Z', [p2]),
        'before_end_of_2nd': ('2016-01-01T11:59:59Z', [p2]),
        'at_end_of_2nd': ('2016-01-01T12:00:00Z', [p2]),
        'less_than_15min_after_2nd': ('2016-01-01T12:14:59Z', [p2]),
        'more_than_15min_after_2nd': ('2016-01-02T12:15:01Z', []),
        'less_than_day_after_2nd': ('2016-01-01T22:00:00Z', []),
        'more_than_day_after_2nd': ('2016-01-02T13:00:00Z', []),
        'now': ('', [p3]),
    }[name]

    filtering = '&time={}'.format(time) if time else ''

    response = get(staff_api_client, list_url_for_abc + filtering)
    check_response_objects(response, expected_parkings)

def get_start(year, month, day):
    start = datetime.datetime(year, month, day, 0, 0, 0)
    return timezone.make_aware(start, timezone.utc)

def get_end(year, month, day):
    end = datetime.datetime(year, month, day, 23, 59, 59)
    return timezone.make_aware(end, timezone.utc)

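A short, hypothetical usage sketch for the two helpers above (it is not part of the original project and assumes the same datetime/timezone imports as the helpers): they return aware UTC bounds for a single calendar day.

    day_start = get_start(2016, 10, 1)
    day_end = get_end(2016, 10, 1)

    print(day_start)  # 2016-10-01 00:00:00+00:00
    print(day_end)    # 2016-10-01 23:59:59+00:00
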
def summarize(self, start, end):
    if not api.nova.extension_supported('SimpleTenantUsage', self.request):
        return

    if start <= end and start <= self.today:
        # The API can't handle timezone aware datetime, so convert back
        # to naive UTC just for this last step.
        start = timezone.make_naive(start, timezone.utc)
        end = timezone.make_naive(end, timezone.utc)
        try:
            self.usage_list = self.get_usage_list(start, end)
        except Exception:
            exceptions.handle(self.request,
                              _('Unable to retrieve usage information.'))
    elif end < start:
        messages.error(self.request,
                       _("Invalid time period. The end date should be "
                         "more recent than the start date."))
    elif start > self.today:
        messages.error(self.request,
                       _("Invalid time period. You are requesting "
                         "data from the future which may not exist."))

    for project_usage in self.usage_list:
        project_summary = project_usage.get_summary()
        for key, value in project_summary.items():
            self.summary.setdefault(key, 0)
            self.summary[key] += value

def canonical_time(self):
    if settings.USE_TZ:
        return int((self.uploaded_at - datetime(1970, 1, 1, tzinfo=timezone.utc)).total_seconds())
    else:
        return int((self.uploaded_at - datetime(1970, 1, 1)).total_seconds())

def _last_modification(self):
    """
    Return the modification time of the file storing the session's content.
    """
    modification = os.stat(self._key_to_file()).st_mtime
    if settings.USE_TZ:
        modification = datetime.datetime.utcfromtimestamp(modification)
        modification = modification.replace(tzinfo=timezone.utc)
    else:
        modification = datetime.datetime.fromtimestamp(modification)
    return modification

def set_cookie(self, key, value='', max_age=None, expires=None, path='/',
               domain=None, secure=False, httponly=False):
    """
    Sets a cookie.

    ``expires`` can be:
    - a string in the correct format,
    - a naive ``datetime.datetime`` object in UTC,
    - an aware ``datetime.datetime`` object in any time zone.
    If it is a ``datetime.datetime`` object then ``max_age`` will be calculated.
    """
    value = force_str(value)
    self.cookies[key] = value
    if expires is not None:
        if isinstance(expires, datetime.datetime):
            if timezone.is_aware(expires):
                expires = timezone.make_naive(expires, timezone.utc)
            delta = expires - expires.utcnow()
            # Add one second so the date matches exactly (a fraction of
            # time gets lost between converting to a timedelta and
            # then the date string).
            delta = delta + datetime.timedelta(seconds=1)
            # Just set max_age - the max_age logic will set expires.
            expires = None
            max_age = max(0, delta.days * 86400 + delta.seconds)
        else:
            self.cookies[key]['expires'] = expires
    if max_age is not None:
        self.cookies[key]['max-age'] = max_age
        # IE requires expires, so set it if hasn't been already.
        if not expires:
            self.cookies[key]['expires'] = cookie_date(time.time() + max_age)
    if path is not None:
        self.cookies[key]['path'] = path
    if domain is not None:
        self.cookies[key]['domain'] = domain
    if secure:
        self.cookies[key]['secure'] = True
    if httponly:
        self.cookies[key]['httponly'] = True

def adapt_datetime_warn_on_aware_datetime(value):
    # Remove this function and rely on the default adapter in Django 2.0.
    if settings.USE_TZ and timezone.is_aware(value):
        warnings.warn(
            "The SQLite database adapter received an aware datetime (%s), "
            "probably from cursor.execute(). Update your code to pass a "
            "naive datetime in the database connection's time zone (UTC by "
            "default).", RemovedInDjango20Warning)
        # This doesn't account for the database connection's timezone,
        # which isn't known. (That's why this adapter is deprecated.)
        value = value.astimezone(timezone.utc).replace(tzinfo=None)
    return value.isoformat(str(" "))

def typecast_timestamp(s):  # does NOT store time zone information
    # "2005-07-29 15:48:00.590358-05"
    # "2005-07-29 09:56:00-05"
    if not s:
        return None
    if ' ' not in s:
        return typecast_date(s)
    d, t = s.split()
    # Extract timezone information, if it exists. Currently we just throw
    # it away, but in the future we may make use of it.
    if '-' in t:
        t, tz = t.split('-', 1)
        tz = '-' + tz
    elif '+' in t:
        t, tz = t.split('+', 1)
        tz = '+' + tz
    else:
        tz = ''
    dates = d.split('-')
    times = t.split(':')
    seconds = times[2]
    if '.' in seconds:  # check whether seconds have a fractional part
        seconds, microseconds = seconds.split('.')
    else:
        microseconds = '0'
    tzinfo = utc if settings.USE_TZ else None
    return datetime.datetime(int(dates[0]), int(dates[1]), int(dates[2]),
                             int(times[0]), int(times[1]), int(seconds),
                             int((microseconds + '000000')[:6]), tzinfo)

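A quick illustration of the behavior described in the comments above: the trailing offset is split off and discarded, so the wall-clock value is kept as-is and stamped with utc only when USE_TZ is enabled. This sketch assumes USE_TZ = True and the same module-level names the function already uses (datetime, settings, utc, typecast_date).

    # The "-05" offset is stripped and ignored; the naive wall-clock time survives.
    value = typecast_timestamp("2005-07-29 15:48:00.590358-05")
    assert value == datetime.datetime(2005, 7, 29, 15, 48, 0, 590358, tzinfo=utc)

    # Date-only strings fall through to typecast_date() and return a date object.
    assert typecast_timestamp("2005-07-29") == datetime.date(2005, 7, 29)
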
def serialize_datetime(value):
    """
    Returns a serialized version of a datetime object that is valid,
    executable python code. It converts timezone-aware values to utc with
    an 'executable' utc representation of tzinfo.
    """
    if value.tzinfo is not None and value.tzinfo != utc:
        value = value.astimezone(utc)
    value_repr = repr(value).replace("<UTC>", "utc")
    if isinstance(value, datetime_safe.datetime):
        value_repr = "datetime.%s" % value_repr
    return value_repr

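A hypothetical usage sketch for the function above, assuming the module's own utc name (whose repr is "<UTC>" for both Django's and pytz's UTC singleton): the aware value is rendered as code that references the importable utc symbol.

    value = datetime.datetime(2016, 1, 1, 12, 30, tzinfo=utc)
    print(serialize_datetime(value))
    # datetime.datetime(2016, 1, 1, 12, 30, tzinfo=utc)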