我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.utils.dateparse.parse_datetime()。
def test_stop_entry(self): DESCRIPTION = 'EXAMPLE' startTime = timezone.now() currentEntry = TimeEntry(user=self.TestUser, description=DESCRIPTION, start=startTime) currentEntry.save() url = reverse("api:time-entry-stop") data = {} response = self.client.post(url, data) response_start_time = dateparse.parse_datetime(response.data['start']) response_stop_time = dateparse.parse_datetime(response.data['stop']) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data['description'], DESCRIPTION) self.assertEqual(response_start_time, startTime) self.assertGreater(response_stop_time, response_start_time) self.assertEqual(response.data['duration'],(response_stop_time- response_start_time).total_seconds())
def update_segment(lsegment, segments): s = find(lsegment, segments) segment, created=Segment.objects.get_or_create( departure_place=Place.objects.get(pk=s['OriginStation']), arrival_place=Place.objects.get(pk=s['DestinationStation']), departure=UTC.localize(parse_datetime(s['DepartureDateTime']), is_dst=True), arrival=UTC.localize(parse_datetime(s['ArrivalDateTime']), is_dst=True), duration=s['Duration'], directionality=s['Directionality'], journey_mode=JourneyMode.objects.get_or_create(name=s['JourneyMode'])[0], flight_number=s['FlightNumber'], carrier=Carrier.objects.get(pk=s['Carrier']), operating_carrier=Carrier.objects.get(pk=s['OperatingCarrier']) ) return segment
def handle(self, *args, **kwargs): # Reset all sql deletes to None Instance.objects.exclude( deleted_at=None, xform__downloadable=True).update(deleted_at=None) # Get all mongo deletes query = '{"$and": [{"_deleted_at": {"$exists": true}}, ' \ '{"_deleted_at": {"$ne": null}}]}' query = json.loads(query) xform_instances = settings.MONGO_DB.instances cursor = xform_instances.find(query) for record in cursor: # update sql instance with deleted_at datetime from mongo try: i = Instance.objects.get( uuid=record["_uuid"], xform__downloadable=True) except Instance.DoesNotExist: continue else: deleted_at = parse_datetime(record["_deleted_at"]) if not timezone.is_aware(deleted_at): deleted_at = timezone.make_aware( deleted_at, timezone.utc) i.set_deleted(deleted_at)
def dispatch(self, request, *args, **kwargs): ''' Require session data to be set to proceed, otherwise go back to step 1. Because they have the same expiration date, this also implies that the TemporaryRegistration object is not yet expired. ''' if REG_VALIDATION_STR not in request.session: return HttpResponseRedirect(reverse('registration')) try: self.temporaryRegistration = TemporaryRegistration.objects.get( id=self.request.session[REG_VALIDATION_STR].get('temporaryRegistrationId') ) except ObjectDoesNotExist: messages.error(request,_('Invalid registration identifier passed to sign-up form.')) return HttpResponseRedirect(reverse('registration')) expiry = parse_datetime( self.request.session[REG_VALIDATION_STR].get('temporaryRegistrationExpiry',''), ) if not expiry or expiry < timezone.now(): messages.info(request,_('Your registration session has expired. Please try again.')) return HttpResponseRedirect(reverse('registration')) return super(StudentInfoView,self).dispatch(request,*args,**kwargs)
def dispatch(self,request,*args,**kwargs): ''' Handle the session data passed by the prior view. ''' lessonSession = request.session.get(PRIVATELESSON_VALIDATION_STR,{}) try: self.lesson = PrivateLessonEvent.objects.get(id=lessonSession.get('lesson')) except (ValueError, ObjectDoesNotExist): messages.error(request,_('Invalid lesson identifier passed to sign-up form.')) return HttpResponseRedirect(reverse('bookPrivateLesson')) expiry = parse_datetime(lessonSession.get('expiry',''),) if not expiry or expiry < timezone.now(): messages.info(request,_('Your registration session has expired. Please try again.')) return HttpResponseRedirect(reverse('bookPrivateLesson')) self.payAtDoor = lessonSession.get('payAtDoor',False) return super(PrivateLessonStudentInfoView,self).dispatch(request,*args,**kwargs)
def deserialize_instance(model, data): ret = model() for k, v in data.items(): if v is not None: try: f = model._meta.get_field(k) if isinstance(f, DateTimeField): v = dateparse.parse_datetime(v) elif isinstance(f, TimeField): v = dateparse.parse_time(v) elif isinstance(f, DateField): v = dateparse.parse_date(v) elif isinstance(f, BinaryField): v = force_bytes( base64.b64decode( force_bytes(v))) except FieldDoesNotExist: pass setattr(ret, k, v) return ret
def __init__(self, manifest, manifest_url): dict.__init__(self) self['manifest'] = manifest self['manifest_url'] = manifest_url self['uuid'] = manifest['uuid'] self['name'] = manifest['name'] self['version'] = manifest.get('version', '') self['created'] = parse_datetime(manifest.get('published_at', '')) or '' self['ostype'] = Image.os_to_ostype(manifest) self['desc'] = manifest.get('description', '') self['homepage'] = manifest.get('homepage') self['size'] = manifest.get('image_size', Image.DEFAULT_SIZE) self['state'] = manifest.get('state', '') try: self['download_size'] = manifest['files'][0]['size'] except (KeyError, IndexError): self['download_size'] = 0
def date_condition(date_or_str, **kwargs): """ Does the current date match the given date? date_or_str is either a date object or an ISO 8601 string """ try: date = dateparse.parse_datetime(date_or_str) except TypeError: date = date_or_str now = timezone.now() try: date_test = (now >= date) except TypeError: date_test = False return date_test
def datetime_diff_seconds(older_time, newer_time=None): """ Return the seconds elapsed between older_time and newer_time. If newer_time is unset, return the seconds elapsed between older_time and now. older_time and newer_time are expected to be ISO-formatted datetime strings. """ older_datetime = parse_datetime(older_time) if not older_datetime.tzinfo: # if no timezone set (naive datetime) we'll assume it to be UTC. older_datetime = pytz.UTC.localize(older_datetime) newer_datetime = parse_datetime(newer_time if newer_time else localized_datetime_string_now()) if not newer_datetime.tzinfo: newer_datetime = pytz.UTC.localize(newer_datetime) return (newer_datetime - older_datetime).total_seconds()
def test_earliest_start_date(self): program = { "courses": [ { "course_runs": [ { "start": "2016-01-01T00:00:00Z", }, { "start": "2017-01-01T00:00:00Z", } ] } ] } assert get_earliest_start_date_from_program(program) == parse_datetime('2016-01-01 00:00:00+0000')
def comp_sync(): for round in range(get_current_round(), ROUNDS+1): logger.info("Fetching round %s/%s...", round, ROUNDS) start = datetime.now() r = requests.get("http://api.stats.foxsports.com.au/3.0/api/sports/league/series/1/seasons/115/rounds/"+str(round)+"/fixturesandresultswithbyes.json?userkey=A00239D3-45F6-4A0A-810C-54A347F144C2") logger.info("%s", r.text) for game in json.loads(r.text): logger.info("%s", game["fixture_id"]) stored_game = Game.objects.get(fixture_id=game["fixture_id"]) logger.info("Syncing game %s vs. %s", str(stored_game.home_team), str(stored_game.away_team)) if stored_game.start_time != parse_datetime(game["match_start_date"]): logger.info("Start time has changed... updating") stored_game.start_time = parse_datetime(game["match_start_date"]) if stored_game.stadium != game["venue"]["name"]: logger.info("Venue has changed... updating") stored_game.stadium = game["venue"]["name"] stored_game.save() end = datetime.now() elapsed_time = end-start if elapsed_time.total_seconds()<5: time.sleep(5 - elapsed_time.total_seconds())
def get_govuk_capture_time(self, govuk_payment): try: capture_submit_time = parse_datetime( govuk_payment['settlement_summary'].get('capture_submit_time', '') ) captured_date = parse_date( govuk_payment['settlement_summary'].get('captured_date', '') ) if captured_date is not None: capture_submit_time = ( capture_submit_time or timezone.now() ).astimezone(timezone.utc) if capture_submit_time.date() < captured_date: return datetime.combine( captured_date, time.min ).replace(tzinfo=timezone.utc) elif capture_submit_time.date() > captured_date: return datetime.combine( captured_date, time.max ).replace(tzinfo=timezone.utc) else: return capture_submit_time except (KeyError, TypeError): pass raise GovUkPaymentStatusException( 'Capture date not yet available for payment %s' % govuk_payment['reference'] )
def test_has_current_entry(self): '''An entry exists''' startTime = timezone.now() currentEntry = TimeEntry(user=self.TestUser, description="Example", start=startTime) currentEntry.save() url = reverse("api:time-entry-current") data = {} response = self.client.get(url, data) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data['description'], "Example") self.assertEqual(dateparse.parse_datetime(response.data['start']), startTime) self.assertEqual(response.data['stop'], None) self.assertGreater(response.data['duration'], 0)
def test_starts_entry_if_none_exists(self): DESCRIPTION = 'description' startTime = timezone.now() url = reverse("api:time-entry-start") data = {'description': DESCRIPTION} response = self.client.post(url, data) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data['description'], DESCRIPTION) self.assertGreater(dateparse.parse_datetime(response.data['start']), startTime) self.assertEqual(response.data['stop'], None) self.assertGreater(response.data['duration'], 0)
def test_start_entry_with_current_running(self): DESCRIPTION = 'EXAMPLE' startTime = timezone.now() currentEntry = TimeEntry(user=self.TestUser, description=DESCRIPTION, start=startTime) currentEntry.save() NEWDESCRIPTION = "NEW DESCRIPTION" url = reverse("api:time-entry-start") data = {'description': NEWDESCRIPTION } response = self.client.post(url, data) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data['description'], NEWDESCRIPTION) self.assertEqual(response.data['stop'], None) self.assertGreater(dateparse.parse_datetime(response.data['start']), startTime) self.assertGreater(response.data['duration'], 0) url = reverse("api:time-entry-get", args=(currentEntry.id,)) response = self.client.get(url, data) response_start_time = dateparse.parse_datetime(response.data['start']) response_stop_time = dateparse.parse_datetime(response.data['stop']) response_duration = response.data['duration'] self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data['description'], DESCRIPTION) self.assertEqual(response.data['user'], self.TestUser.id) self.assertEqual(response_start_time, startTime) self.assertGreater(response_stop_time, response_start_time) self.assertEqual(response_duration,(response_stop_time- response_start_time).total_seconds()) response = self.client.get(url, data) self.assertEqual(response_duration, response.data['duration'], "We checked the duration twice but it changed from %s to %s" % (response_duration, response.data['duration']))
def convert_datetimefield_value(self, value, expression, connection, context): if value is not None: if not isinstance(value, datetime.datetime): value = parse_datetime(value) if settings.USE_TZ: value = timezone.make_aware(value, self.connection.timezone) return value
def parse_datetime(s): if not s: return s if ciso8601: return ciso8601.parse_datetime(s) return dateparse.parse_datetime(s)
def _date_from_string(self, field_name, val): try: setattr(self, field_name, dateparse.parse_datetime(val)) except Exception as e: setattr(self, field_name, val) logger.warning('can not parse date (raw value used) %s: %s', field_name, e)
def get_legs(result): legs = result['Legs'] segments = result['Segments'] for leg in legs: l, created = Leg.objects.get_or_create( id = leg['Id'], departure_place = Place.objects.get(pk=leg['OriginStation']), arrival_place = Place.objects.get(pk=leg['DestinationStation']), departure = UTC.localize(parse_datetime(leg['Departure']), is_dst=True), arrival = UTC.localize(parse_datetime(leg['Arrival']), is_dst=True), duration = leg['Duration'], directionality = leg['Directionality'], journey_mode = JourneyMode.objects.get_or_create(name=leg['JourneyMode'])[0] ) carriers = leg['Carriers'] ocarriers = leg['OperatingCarriers'] stops = leg['Stops'] lsegments = leg['SegmentIds'] for carrier in carriers: l.carriers.add(Carrier.objects.get(pk=carrier)) for ocarrier in ocarriers: l.operating_carriers.add(Carrier.objects.get(pk=ocarrier)) for stop in stops: if stop != 0: l.stops.add(Place.objects.get(pk=stop)) for lsegment in lsegments: l.segments.add(update_segment(lsegment, segments)) l.save()
def test_clean_dates(self): """ Cleaning models dates follow different type of rules, also depending on event """ event = Event.objects.get(title='PyCon SK 2016') # PyCon SK 2016 is in the past tt = TicketType(title='Test Ticket', price=12, event=event) # Past event and no dates raises error self.assertRaises(ValidationError, tt.clean) # Past event and dates after it raises error tt.date_from = now tt.date_to = now + 3 * day self.assertRaises(ValidationError, tt.clean) tt.date_from = parse_datetime('2016-01-11T09:00:00Z') self.assertRaises(ValidationError, tt.clean) # End date can not be before start date tt.date_to = tt.date_from - 3 * day self.assertRaises(ValidationError, tt.clean) # Correct data in the past before event, SAVE. tt.date_to = tt.date_from + 9 * day tt.clean() tt.save() # PyCon SK 2054 is in the future future_event = Event.objects.create( title='PyCon SK 2054', description='test', event_type=Event.MEETUP, status=Event.PUBLISHED, location=event.location, cfp_end=distant_future - 7 * day, date_from=distant_future, date_to=distant_future + 7 * day) ftt = TicketType(title='Test Future Ticket', price=120, event=future_event) # Future event pre-populate the dates ftt.clean() self.assertEquals(abs(ftt.date_from - timezone.now()).seconds, 0) self.assertEquals(abs(ftt.date_to - ftt.event.date_to).seconds, 0) ftt.save()
def convert_time_from_deployd(d_time): t_time = parse_datetime(d_time) tzchina = timezone('Asia/Shanghai') utc = timezone('UTC') t_time = t_time.replace(tzinfo=utc).astimezone(tzchina) try: return t_time.strftime('%Y-%m-%d %H:%M:%S') except Exception as e: logger.error("strftime error:%s d_time:%s", str(e), d_time) return orc_convert_time_from_deployd(d_time)
def dateStringsToQ(self, field_name, date_from_str, date_to_str): """ Convert the date strings from_date_str and to_date_str into a set of args in the form {'<field_name>__gte': <date from>, '<field_name>__lte': <date to>} where date_from and date_to are Django-timezone-aware dates; then convert that into a Django Q object Returns the Q object based on those criteria """ # one of the values required for the filter is missing, so set # it to the one which was supplied if date_from_str == '': date_from_str = date_to_str elif date_to_str == '': date_to_str = date_from_str date_from_naive = dateparse.parse_datetime(date_from_str + ' 00:00:00') date_to_naive = dateparse.parse_datetime(date_to_str + ' 23:59:59') tz = timezone.get_default_timezone() date_from = timezone.make_aware(date_from_naive, tz) date_to = timezone.make_aware(date_to_naive, tz) args = {} args[field_name + '__gte'] = date_from args[field_name + '__lte'] = date_to return Q(**args)
def to_python_datetime(value): datetime = parse_datetime(value) if value and not datetime: raise ValueError('Can\'t convert "{}" to datetime'.format(value)) return datetime
def _parse_datetime(self, strdate): tz = pytz.timezone(self.event.timezone) obj = parse_datetime(strdate) assert obj if obj.tzinfo is None: obj = tz.localize(obj) return obj
def parse_datetime_with_timezone_support(value): dt = parse_datetime(value) # Confirm that dt is naive before overwriting its tzinfo. if dt is not None and settings.USE_TZ and timezone.is_naive(dt): dt = dt.replace(tzinfo=timezone.utc) return dt
def insert(self, samples): try: outdated = Stats.insert((id, dateparse.parse_datetime(dt), value) for id, dt, value in samples) except db.IntegrityError: log.error("Duplicate stats insert: " + db.connection.queries[-1]['sql']) db.transaction.rollback() # allow future stats to still work except: log.error("Error handling stats insert: " + traceback.format_exc()) else: if outdated: log.warn("Outdated samples ignored: {0}".format(outdated))
def setUp(self): self._create_user_and_login() self.fixture_dir = os.path.join( self.this_directory, 'fixtures', 'csv_export') self._submission_time = parse_datetime('2013-02-18 15:54:01Z')
def setUp(self): super(TestExports, self).setUp() self._submission_time = parse_datetime('2013-02-18 15:54:01Z')
def setUp(self): self._create_user_and_login() self._submission_time = parse_datetime('2013-02-18 15:54:01Z')
def dispatch(self,request,*args,**kwargs): ''' Always check that the temporary registration has not expired ''' regSession = self.request.session.get(REG_VALIDATION_STR,{}) if not regSession: return HttpResponseRedirect(reverse('registration')) try: reg = TemporaryRegistration.objects.get( id=self.request.session[REG_VALIDATION_STR].get('temporaryRegistrationId') ) except ObjectDoesNotExist: messages.error(request,_('Invalid registration identifier passed to summary view.')) return HttpResponseRedirect(reverse('registration')) expiry = parse_datetime( self.request.session[REG_VALIDATION_STR].get('temporaryRegistrationExpiry',''), ) if not expiry or expiry < timezone.now(): messages.info(request,_('Your registration session has expired. Please try again.')) return HttpResponseRedirect(reverse('registration')) # If OK, pass the registration and proceed kwargs.update({ 'reg': reg, }) return super(RegistrationSummaryView,self).dispatch(request, *args, **kwargs)
def to_internal_value(self, value): input_formats = getattr(self, 'input_formats', api_settings.DATETIME_INPUT_FORMATS) if isinstance(value, datetime.date) and not isinstance(value, datetime.datetime): self.fail('date') if isinstance(value, datetime.datetime): return self.enforce_timezone(value) for input_format in input_formats: if input_format.lower() == ISO_8601: try: parsed = parse_datetime(value) except (ValueError, TypeError): pass else: if parsed is not None: return self.enforce_timezone(parsed) else: try: parsed = self.datetime_parser(value, input_format) except (ValueError, TypeError): pass else: return self.enforce_timezone(parsed) humanized_format = humanize_datetime.datetime_formats(input_formats) self.fail('invalid', format=humanized_format)
def _getFeeds(self, user, time, fields=None): returnvalue = [] data = { 'access_token': user.access_token, 'fields': fields, 'limit': 1, } fb_request_url = Config.get("API_BASE_URI") + "/me/feed" fb_user_last_post_id = user.last_post_id fb_user_last_post_time = user.last_post_time try: resp = requests.get(fb_request_url, params=data) break_loop = False while resp.ok and not break_loop: for feed in resp.json()['data']: log.debug(feed) if feed['id'] == user.last_post_id \ or parse_datetime(feed['created_time']) <= user.last_post_time \ or feed['id'] == fb_user_last_post_id: break_loop = True break returnvalue.append(feed) if fb_user_last_post_time < parse_datetime(feed['created_time']): fb_user_last_post_id = feed['id'] fb_user_last_post_time = parse_datetime(feed['created_time']) resp = requests.get(resp.json()['paging']['next'], timeout=5.00) except requests.exceptions.RequestException: pass user.last_post_id = fb_user_last_post_id user.last_post_time = fb_user_last_post_time user.save() return returnvalue[::-1]
def test_get_feeds(self, mock_get): fb_response_fields = 'message,actions,full_picture,picture,from,created_time,' \ 'link,permalink_url,type,description,source,object_id' data = {'paging': {'next': 'https://graph.facebook.com/v2.7/1ZD', 'previous': 'https://graph.facebook.com/v2.7/101' }, 'data': [{'message': 'Tet', 'id': '101915710270588_133598340435658', 'created_time': '2016-10-04T15:26:44+0000', }] } mock_get.return_value = MagicMock() mock_get.return_value.json = MagicMock() mock_get.return_value.json.return_value = data mock_get.return_value.ok = True self.facebook_account.last_post_id = 1 self.facebook_account.last_post_time = parse_datetime('2016-01-04T15:26:44+0000') self.facebook_account.save() resp = self.channel._getFeeds(self.facebook_account, None, fb_response_fields) self.assertEqual(data['data'], resp) mock_get.side_effect = requests.exceptions.ConnectTimeout resp = self.channel._getFeeds(self.facebook_account, None, fb_response_fields) self.assertEqual([], resp)
def string_to_date(value): return parse_datetime(value)
def from_native(self, value): if value in validators.EMPTY_VALUES: return None if isinstance(value, datetime.datetime): return value if isinstance(value, datetime.date): value = datetime.datetime(value.year, value.month, value.day) if settings.USE_TZ: # For backwards compatibility, interpret naive datetimes in # local time. This won't work during DST change, but we can't # do much about it, so we let the exceptions percolate up the # call stack. warnings.warn("DateTimeField received a naive datetime (%s)" " while time zone support is active." % value, RuntimeWarning) default_timezone = timezone.get_default_timezone() value = timezone.make_aware(value, default_timezone) return value for fmt in self.input_formats: if fmt.lower() == ISO_8601: try: parsed = parse_datetime(value) except (ValueError, TypeError): pass else: if parsed is not None: return parsed else: try: parsed = datetime.datetime.strptime(value, fmt) except (ValueError, TypeError): pass else: return parsed msg = self.error_messages['invalid'] % readable_datetime_formats(self.input_formats) raise ValidationError(msg)
def dtparse(s): return parse_datetime(s)
def _github_repos(self): repos = cache.get('github_projects', [])[:6] for repo in repos: repo['updated_at'] = dateparse.parse_datetime(repo['updated_at']) return repos
def _convert(self, value: Any): if self.input_format is not None: return datetime.datetime.strptime(value, self.input_format) else: return parse_datetime(value)
def convert_datetimefield_value(self, value, expression, connection, context): if value is not None: if not isinstance(value, datetime.datetime): value = parse_datetime(value) if settings.USE_TZ and not timezone.is_aware(value): value = timezone.make_aware(value, self.connection.timezone) return value