The following 50 code examples, extracted from open-source Python projects, illustrate how to use iso8601.parse_date().
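Before the extracted examples, here is a minimal standalone sketch of the call itself, assuming only the iso8601 package is installed: parse_date() returns a timezone-aware datetime; as several examples below rely on, it assumes UTC when the string carries no offset, and passing default_timezone=None yields a naive datetime instead.

import iso8601

# An explicit offset in the string is preserved in the result
aware = iso8601.parse_date("2007-01-25T12:00:00+01:00")

# No offset in the string: UTC is assumed by default
assumed_utc = iso8601.parse_date("2007-01-25T12:00:00")
assert assumed_utc.tzinfo == iso8601.UTC

# default_timezone=None yields a naive datetime instead
naive = iso8601.parse_date("2007-01-25T12:00:00", default_timezone=None)
assert naive.tzinfo is None

# Malformed input raises iso8601.ParseError
try:
    iso8601.parse_date("not-a-date")
except iso8601.ParseError:
    pass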
def cast_primitive_value(spec, value):
    format = spec.get('format')
    type = spec.get('type')
    if type == 'boolean':
        return (force_text(value).lower() in ('1', 'yes', 'true'))
    if type == 'integer' or format in ('integer', 'long'):
        return int(value)
    if type == 'number' or format in ('float', 'double'):
        return float(value)
    if format == 'byte':  # base64 encoded characters
        return base64.b64decode(value)
    if format == 'binary':  # any sequence of octets
        return force_bytes(value)
    if format == 'date':  # ISO8601 date
        return iso8601.parse_date(value).date()
    if format == 'dateTime':  # ISO8601 datetime
        return iso8601.parse_date(value)
    if type == 'string':
        return force_text(value)
    return value
def _process_event(self, repo, event):
    """Process potentially new event for repository

    :param repo: Repository related to event
    :type repo: ``repocribro.models.Repository``
    :param event: GitHub event data
    :type event: dict

    :return: If the event was new or already registered before
    :rtype: bool
    """
    last = pytz.utc.localize(repo.last_event)
    if iso8601.parse_date(event['created_at']) <= last:
        return False
    hook_type = self.event2webhook.get(event['type'], 'unknown')
    for event_processor in self.hooks.get(hook_type, []):
        try:
            event_processor(db=self.db, repo=repo,
                            payload=event['payload'],
                            actor=event['actor'])
            print('Processed {} from {} event for {}'.format(
                event['type'], event['created_at'], repo.full_name
            ))
        except HTTPException:
            print('Error while processing #{}'.format(event['id']))
    return True
def _is_valid_type(self, t, value):
    try:
        if t == 'number':
            float(value)
        elif t == 'integer':
            int(value)
        elif t == 'boolean':
            assert type(value) == bool
        elif t == 'timestamp':
            iso8601.parse_date(value)
        elif t == 'date':
            iso8601.parse_date(value + 'T00:00:00Z')
        elif t == 'string':
            # Allow coercing ints/floats, but nothing else
            assert type(value) in [str, int, float]
    except:
        return False
    return True
def linear_check(self):
    """Action to wait until all checks are done

    Each check has its own timeout of 300 secs"""
    creation_dates = []
    for item in self.__linear_order:
        resource_repr = getattr(item, '__resource_repr')
        helpers.wait(
            lambda: self._linear_check(item),
            timeout=300,
            interval=2,
            timeout_msg="{} creation timeout reached".format(resource_repr)
        )
        k8s_obj = self.get_k8s_object(resource_repr)
        creation_date = iso8601.parse_date(
            k8s_obj.metadata.creation_timestamp)
        creation_dates.append(creation_date)
        if len(creation_dates) > 1:
            assert creation_dates[-2] <= creation_dates[-1], (
                "The order of linear objects is broken!")
    LOG.info("Linear check passed!")
def parse_operators(args):
    """
    Work around a mongoengine issue where, for some reason, the operators
    gte, gt, lt, lte don't work with dates in ISO format
    """
    args = {k: v for k, v in args.items() if k not in ['skip', 'limit']}
    for k, v in args.items():
        try:
            is_data = iso8601.parse_date(v)
        except:
            is_data = False
        if is_data:
            args[k] = is_data
    return args
def make_node(cls, data):
    try:
        key = data['key']
    except KeyError:
        key, kwargs = u'/', {}
    else:
        kwargs = {'modified_index': int(data['modifiedIndex']),
                  'created_index': int(data['createdIndex'])}
        ttl = data.get('ttl')
        if ttl is not None:
            expiration = iso8601.parse_date(data['expiration'])
            kwargs.update(ttl=ttl, expiration=expiration)
    if 'value' in data:
        node_cls = Value
        args = (data['value'],)
    elif data.get('dir', False):
        node_cls = Directory
        args = ([cls.make_node(n) for n in data.get('nodes', ())],)
    else:
        node_cls, args = Node, ()
    return node_cls(key, *args, **kwargs)
def run(self, query, args):
    self.start = args.get(constants.PARAM_START_DATE)
    self.end = args.get(constants.PARAM_END_DATE)
    self.validate_start_end_dates()
    if self.errors:
        return

    start_date, end_date = None, None
    if self.start:
        start_date = str(iso8601.parse_date(self.start).date())
    if self.end:
        end_date = iso8601.parse_date(self.end).date()
        if '-' not in self.end:  # Only a year is specified
            end_date = datetime.date(end_date.year, 12, 31)
        if self.end.count('-') == 1:  # Year and month are specified
            # Get the last day of the month; monthrange returns a
            # (month, last_day) tuple
            days = monthrange(end_date.year, end_date.month)[1]
            end_date = datetime.date(end_date.year, end_date.month, days)

    query.add_filter(start_date, end_date)
def validate_date(self, _date, param):
    """Validates and parses the given date.

    Args:
        _date (str): date string, ISO 8601
        param (str): The parameter being parsed

    Returns:
        date with the parsed date

    Raises:
        ValueError: if the format is not valid
    """
    try:
        parsed_date = iso8601.parse_date(_date)
    except iso8601.ParseError:
        self._append_error(strings.INVALID_DATE.format(param, _date))
        raise ValueError
    return parsed_date
def test_add_collapse(self):
    """Tests that after adding a default collapse, the results are
    annual, i.e. each one a year apart from the previous one"""
    self.query.add_series(self.single_series, self.rep_mode,
                          self.series_periodicity)
    self.query.add_collapse()
    data = self.query.run()

    prev_timestamp = None
    for row in data:
        timestamp = row[0]
        parsed_timestamp = iso8601.parse_date(timestamp)
        if not prev_timestamp:
            prev_timestamp = parsed_timestamp
            continue
        delta = relativedelta(parsed_timestamp, prev_timestamp)
        self.assertTrue(delta.years == 1, timestamp)
        prev_timestamp = parsed_timestamp
def test_query_fills_nulls(self):
    self.query.add_series(self.single_series, self.rep_mode,
                          self.series_periodicity)
    self.query.add_series(self.delayed_series, self.rep_mode,
                          self.series_periodicity)

    query = ESQuery(index=settings.TEST_INDEX)
    query.add_series(self.single_series, self.rep_mode,
                     self.series_periodicity)
    query.sort('asc')
    delayed_first_date = iso8601.parse_date(query.run()[0][0])

    data = self.query.run()
    delayed_series_index = 1  # First series added
    for row in data:
        current_date = iso8601.parse_date(row[0])
        if current_date < delayed_first_date:
            self.assertEqual(row[delayed_series_index], None)
        else:
            break
def test_query_fills_nulls_second_series(self):
    self.query.add_series(self.delayed_series, self.rep_mode,
                          self.series_periodicity)
    self.query.add_series(self.single_series, self.rep_mode,
                          self.series_periodicity)

    query = ESQuery(index=settings.TEST_INDEX)
    query.add_series(self.single_series, self.rep_mode,
                     self.series_periodicity)
    query.sort('asc')
    delayed_first_date = iso8601.parse_date(query.run()[0][0])

    data = self.query.run()
    delayed_series_index = 2  # Second series added
    for row in data:
        current_date = iso8601.parse_date(row[0])
        if current_date < delayed_first_date:
            self.assertEqual(row[delayed_series_index], None)
        else:
            break
def test_end_of_period(self):
    query = ESQuery(index=settings.TEST_INDEX)
    query.add_series(self.single_series, self.rep_mode,
                     self.series_periodicity)
    query.add_pagination(start=0, limit=1000)
    query.sort('asc')
    query.add_filter(start="1970")
    orig_data = query.run()

    self.query.add_series(self.single_series, self.rep_mode,
                          self.series_periodicity, 'end_of_period')
    self.query.add_filter(start="1970")
    self.query.add_collapse('year')
    eop_data = self.query.run()

    for eop_row in eop_data:
        eop_value = eop_row[1]
        year = iso8601.parse_date(eop_row[0]).year
        for row in orig_data:
            row_date = iso8601.parse_date(row[0])
            if row_date.year == year and row_date.month == 12:
                # EOP incurs precision loss
                self.assertAlmostEqual(eop_value, row[1], 5)
                break
def test_query_fills_nulls(self):
    self.query.add_series(self.single_series, self.rep_mode,
                          self.series_periodicity)
    self.query.add_series(self.delayed_series, self.rep_mode,
                          self.series_periodicity)

    query = ESQuery(index=settings.TEST_INDEX)
    query.add_series(self.single_series, self.rep_mode,
                     self.series_periodicity)
    query.sort('asc')
    delayed_first_date = iso8601.parse_date(query.run()[0][0])

    self.query.sort('asc')
    data = self.query.run()
    delayed_series_index = 1  # First series added
    for row in data:
        current_date = iso8601.parse_date(row[0])
        if current_date < delayed_first_date:
            self.assertEqual(row[delayed_series_index], None)
        else:
            break
def test_query_fills_nulls_second_series(self):
    self.query.add_series(self.delayed_series, self.rep_mode,
                          self.series_periodicity)
    self.query.add_series(self.single_series, self.rep_mode,
                          self.series_periodicity)

    query = ESQuery(index=settings.TEST_INDEX)
    query.add_series(self.single_series, self.rep_mode,
                     self.series_periodicity)
    query.sort('asc')
    delayed_first_date = iso8601.parse_date(query.run()[0][0])

    self.query.sort('asc')
    data = self.query.run()
    delayed_series_index = 2  # Second series added
    for row in data:
        current_date = iso8601.parse_date(row[0])
        if current_date < delayed_first_date:
            self.assertEqual(row[delayed_series_index], None)
        else:
            break
def test_index_continuity(self):
    self.query.add_series(self.delayed_series, self.rep_mode,
                          self.series_periodicity)
    self.query.add_series(self.single_series, self.rep_mode,
                          self.series_periodicity)
    self.query.add_filter(start="1910", end="1920")
    self.query.sort('asc')

    query = ESQuery(index=settings.TEST_INDEX)
    query.add_series(self.single_series, self.rep_mode,
                     self.series_periodicity)
    query.add_filter(start="1921")  # Guarantees empty data between 1920-1921
    query.add_pagination(start=0, limit=1000)
    query.sort('asc')

    data = self.query.run()
    current_date = iso8601.parse_date(data[0][0])
    for row in data[1:]:
        row_date = iso8601.parse_date(row[0])
        self.assertEqual(current_date + relativedelta(months=1), row_date)
        current_date = row_date
def get_times(request):
    """Gets start and end time from request

    As we use no timezone in NAV, remove it from parsed timestamps

    :param request: django.http.HttpRequest
    """
    starttime = request.GET.get('starttime')
    endtime = request.GET.get('endtime')
    try:
        if starttime:
            starttime = iso8601.parse_date(starttime).replace(tzinfo=None)
        if endtime:
            endtime = iso8601.parse_date(endtime).replace(tzinfo=None)
    except iso8601.ParseError:
        raise Iso8601ParseError
    return starttime, endtime
def get_timestamp(filename):
    class GMLHandler(xml.sax.ContentHandler):
        timestamp = None

        def startElement(self, name, attrs):
            if name == "wfs:FeatureCollection":
                self.timestamp = attrs['timeStamp']

    handler = GMLHandler()
    parser = xml.sax.make_parser()
    parser.setContentHandler(handler)
    parser.parse(filename)
    timestamp = iso8601.parse_date(handler.timestamp, default_timezone=None)
    return pytz.timezone(settings.TIME_ZONE).localize(timestamp)
def _test_get_job_destruction(self, username):
    '''
    GET /{jobs}/{job-id}/destruction returns the destruction instant for
    {job-id} as [std:iso8601].
    '''
    for job in self.jobs:
        url = reverse(self.url_names['destruction'], kwargs={'pk': job.pk})
        response = self.client.get(url)

        if username == 'user':
            self.assertEqual(response.status_code, 200)
            if job.destruction_time:
                destruction_time = iso8601.parse_date(response.content.decode())
                self.assertEqual(destruction_time, job.destruction_time)
            else:
                self.assertEqual(response.content.decode(), '')
        else:
            self.assertEqual(response.status_code, 404)
def _test_post_job_destruction(self, username):
    '''
    POST /{jobs}/{job-id}/destruction with DESTRUCTION={std:iso8601}
    (application/x-www-form-urlencoded) sets the destruction instant for
    {job-id} and redirects to /{jobs}/{job-id} as 303.
    '''
    destruction_time = '2016-01-01T00:00:00'

    for job in self.jobs:
        url = reverse(self.url_names['destruction'], kwargs={'pk': job.pk})
        response = self.client.post(
            url, urlencode({'DESTRUCTION': destruction_time}),
            content_type='application/x-www-form-urlencoded')

        if username == 'user':
            redirect_url = 'http://testserver' + reverse(
                self.url_names['detail'], kwargs={'pk': job.pk})
            self.assertRedirects(response, redirect_url, status_code=303)
            self.assertEqual(
                self.jobs.get(pk=job.pk).destruction_time,
                iso8601.parse_date('2016-01-01T00:00:00')
            )
        else:
            self.assertEqual(response.status_code, 404)
def filter_queryset(self, request, queryset, view):
    query_dict = make_query_dict_upper_case(request.GET)

    # apply only for list
    if view.action == 'list_jobs':
        phases = query_dict.getlist('PHASE')
        if phases:
            queryset = queryset.filter(phase__in=phases)
        else:
            queryset = queryset.exclude(phase__exact=Job.PHASE_ARCHIVED)

        after = query_dict.get('AFTER')
        if after:
            queryset = queryset.filter(creation_time__gt=iso8601.parse_date(after))

        last = query_dict.get('LAST')
        if last:
            queryset = queryset.filter(start_time__isnull=False) \
                               .order_by('-start_time')[:int(last)]

    return queryset
def _timestamp_parse(ts: ConvertableTimestamp) -> datetime:
    """
    Takes something representing a timestamp and returns a timestamp in the
    representation we want.
    """
    if isinstance(ts, str):
        ts = iso8601.parse_date(ts)
    # Set resolution to milliseconds instead of microseconds
    # (fixes incompatibility with software based on unix time, for example mongodb)
    ts = ts.replace(microsecond=int(ts.microsecond / 1000) * 1000)
    # Add timezone if not set
    if not ts.tzinfo:
        # Needed? All timestamps should be iso8601, so they ought to always
        # contain a timezone. Yes, because it is optional in iso8601.
        logger.warning("timestamp without timezone found, using UTC: {}".format(ts))
        ts = ts.replace(tzinfo=timezone.utc)
    return ts
def test_create_auction_auctionPeriod(self):
    data = self.initial_data.copy()
    #tenderPeriod = data.pop('tenderPeriod')
    #data['auctionPeriod'] = {'startDate': tenderPeriod['endDate']}
    response = self.app.post_json('/auctions', {'data': data})
    self.assertEqual(response.status, '201 Created')
    self.assertEqual(response.content_type, 'application/json')
    auction = response.json['data']
    self.assertIn('tenderPeriod', auction)
    self.assertIn('auctionPeriod', auction)
    self.assertNotIn('startDate', auction['auctionPeriod'])
    self.assertEqual(
        parse_date(data['auctionPeriod']['startDate']).date(),
        parse_date(auction['auctionPeriod']['shouldStartAfter'], TZ).date())
    if SANDBOX_MODE:
        auction_startDate = parse_date(data['auctionPeriod']['startDate'], None)
        if not auction_startDate.tzinfo:
            auction_startDate = TZ.localize(auction_startDate)
        tender_endDate = parse_date(auction['tenderPeriod']['endDate'], None)
        if not tender_endDate.tzinfo:
            tender_endDate = TZ.localize(tender_endDate)
        self.assertLessEqual(
            (auction_startDate - tender_endDate).total_seconds(), 70)
    else:
        self.assertEqual(
            parse_date(auction['tenderPeriod']['endDate']).date(),
            parse_date(data['auctionPeriod']['startDate'], TZ).date() - timedelta(days=1))
        self.assertEqual(
            parse_date(auction['tenderPeriod']['endDate']).time(),
            time(20, 0))
def test_instance_get_all_by_filters_changes_since(self):
    i1 = self.create_instance_with_args(
        updated_at='2013-12-05T15:03:25.000000')
    i2 = self.create_instance_with_args(
        updated_at='2013-12-05T15:03:26.000000')

    changes_since = iso8601.parse_date('2013-12-05T15:03:25.000000')
    result = db.instance_get_all_by_filters(self.ctxt,
                                            {'changes-since': changes_since})
    self._assertEqualListsOfInstances([i1, i2], result)

    changes_since = iso8601.parse_date('2013-12-05T15:03:26.000000')
    result = db.instance_get_all_by_filters(self.ctxt,
                                            {'changes-since': changes_since})
    self._assertEqualListsOfInstances([i2], result)

    db.instance_destroy(self.ctxt, i1['uuid'])
    filters = {}
    filters['changes-since'] = changes_since
    filters['marker'] = i1['uuid']
    result = db.instance_get_all_by_filters(self.ctxt, filters)
    self._assertEqualListsOfInstances([i2], result)
def test_parse_no_timezone():
    """issue 4 - Handle datetime string without timezone

    This tests what happens when you parse a date with no timezone. While not
    strictly correct, this is quite common. I'll assume UTC for the time zone
    in this case.
    """
    d = iso8601.parse_date("2007-01-01T08:00:00")
    assert d.year == 2007
    assert d.month == 1
    assert d.day == 1
    assert d.hour == 8
    assert d.minute == 0
    assert d.second == 0
    assert d.microsecond == 0
    assert d.tzinfo == iso8601.UTC
def load_logs(self, start, end, log_filter):
    try:
        response = urllib.request.urlopen(self.get_url(start, end, log_filter))
    except urllib.error.URLError as err:
        print("Error reading from network: %s" % err)
        return []

    body = response.read().decode('utf-8')
    events = []
    for line in body.split('\n'):
        match = re.match(r'.*?(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}'
                         r'.\d{6}[-+]\d{2}:\d{2}).*path="([^"]+)".*', line)
        if match:
            path = self.filter_path(match.groups()[1])
            when = iso8601.parse_date(match.groups()[0])
            events.append((when, path))
    return events
def as_dict(self):
    details = json.loads(self.details)
    name = "Anonymous Donor"
    if 'donorName' in details and details['donorName']:
        name = details['donorName']
    datetime = iso8601.parse_date(details['createdOn'])
    info = {
        # general
        'name': name,
        'comment': details.get('message', "") or '',
        'donation_amount': float(details['donationAmount']),
        'currency': 'USD',
        # Display-friendly
        'amount': "$%.2f" % details['donationAmount'],
        'timestamp': datetime,
    }
    return info
def as_dict(self):
    details = json.loads(self.details)
    name = "Anonymous"
    amount = " ".join([str(details['amount']), details['currencyCode']])
    if 'user' in details:
        name = details['user']['displayName']
    elif 'username' in details:
        name = details['username']
    timestamp = iso8601.parse_date(details['date'])
    info = {
        'name': name,
        'amount': amount,
        'comment': details['note'],
        'donation_amount': float(details['amount']),
        'currency': details['currencyCode'],
        'timestamp': timestamp,
    }
    return info
def parse_datetime(date):
    """
    Validates that date is in iso8601 format.

    Returns the parsed datetime in UTC as a naive datetime (tzinfo=None).
    """
    if not isinstance(date, basestring):
        raise Invalid('date is not a string')
    try:
        return iso8601.parse_date(date).astimezone(iso8601.UTC).replace(
            tzinfo=None)
    except:
        raise Invalid('date is not in iso8601 format')
def test_sync_all_active_plans():
    all_plans = get_fixture("rest.billingplan.all.active.json")
    models.BillingPlan.objects.sync_data(all_plans["plans"])
    assert models.BillingPlan.objects.count() == len(all_plans["plans"])
    for plan in all_plans["plans"]:
        plan_obj = get_fixture(
            "GET/v1/payments/billing-plans/{id}.json".format(id=plan["id"])
        )
        plan = models.BillingPlan.objects.get(id=plan_obj["id"])
        assert plan.id == plan_obj["id"]
        assert plan.state == getattr(enums.BillingPlanState, plan_obj["state"])
        assert plan.type == getattr(enums.BillingPlanType, plan_obj["type"])
        assert plan.name == plan_obj["name"]
        assert plan.description == plan_obj["description"]
        assert plan.merchant_preferences == plan_obj["merchant_preferences"]
        assert plan.create_time == parse_date(plan_obj["create_time"])
        assert plan.update_time == parse_date(plan_obj["update_time"])
        for definition in plan_obj["payment_definitions"]:
            pd = models.PaymentDefinition.objects.get(id=definition["id"])
            assert pd.id == definition["id"]
            assert pd.name == definition["name"]
            assert pd.type == getattr(enums.PaymentDefinitionType,
                                      definition["type"])
            assert plan.payment_definitions.filter(id=pd.id).count() == 1
def test_sync_executed_billing_agreement():
    ba = get_fixture("rest.billingagreement.execute.json")
    inst, created = models.BillingAgreement.get_or_update_from_api_data(
        ba, always_sync=True)
    assert created
    assert inst.id == ba["id"]
    assert inst.last_payment_date == parse_date("2017-08-24T11:47:17Z")
    assert inst.calculate_end_of_period() == parse_date("2017-09-24T11:47:17Z")
def to_python(self, value, timezone_in_use):
    if isinstance(value, datetime.datetime):
        return value.astimezone(pytz.utc) if value.tzinfo else value.replace(tzinfo=pytz.utc)
    if isinstance(value, datetime.date):
        return datetime.datetime(value.year, value.month, value.day, tzinfo=pytz.utc)
    if isinstance(value, int):
        return datetime.datetime.utcfromtimestamp(value).replace(tzinfo=pytz.utc)
    if isinstance(value, string_types):
        if value == '0000-00-00 00:00:00':
            return self.class_default
        if len(value) == 10:
            try:
                value = int(value)
                return datetime.datetime.utcfromtimestamp(value).replace(tzinfo=pytz.utc)
            except ValueError:
                pass
        try:
            # left the date naive in case of no tzinfo set
            dt = iso8601.parse_date(value, default_timezone=None)
        except iso8601.ParseError as e:
            raise ValueError(text_type(e))
        # convert naive to aware
        if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
            dt = timezone_in_use.localize(dt)
        return dt.astimezone(pytz.utc)
    raise ValueError('Invalid value for %s - %r' % (self.__class__.__name__, value))
def parse_isotime(timestr, default=None):
    """This duplicates oslo timeutils parse_isotime but with a
    @register.filter annotation and a silent fallback on error.
    """
    try:
        return iso8601.parse_date(timestr)
    except (iso8601.ParseError, TypeError):
        return default or ''
def load_keys(args):
    """
    Get the Facebook API keys. Order of precedence is command line,
    environment, config file.
    """
    config = {}
    input_app_id = None
    input_app_secret = None
    input_short_access_token = None
    if args.config:
        config = load_config(args)
    if not config:
        input_app_id, input_app_secret, input_short_access_token = input_keys(args)
        if not input_short_access_token:
            save_config(args, input_app_id, input_app_secret)

    app_id = args.app_id or os.environ.get('APP_ID') or config.get('app_id') or input_app_id
    app_secret = args.app_secret or os.environ.get('APP_SECRET') or config.get('app_secret') or input_app_secret
    short_access_token = args.access_token or os.environ.get('ACCESS_TOKEN') or input_short_access_token
    long_access_token = config.get('access_token')
    expires_at = None
    if 'expires_at' in config:
        expires_at = iso8601.parse_date(config['expires_at'])

    if not (app_id and app_secret):
        sys.exit('App id and secret are required.')

    return app_id, app_secret, short_access_token, long_access_token, expires_at
def str_to_datetime(s):
    """
    :param str s:
    :return datetime.datetime:
    """
    return parse_date(s)
def parse_isotime(timestr):
    """Parse time from ISO 8601 format"""
    try:
        return iso8601.parse_date(timestr)
    except iso8601.ParseError as e:
        raise ValueError(e.message)
    except TypeError as e:
        raise ValueError(e.message)
def datetime_u(s):
    fmt = "%Y-%m-%dT%H:%M:%S"
    try:
        return _strptime(s, fmt)
    except ValueError:
        try:
            # strip zulu timezone suffix or utc offset
            if s[-1] == "Z" or (s[-3] == ":" and s[-6] in (' ', '-', '+')):
                try:
                    import iso8601
                    return iso8601.parse_date(s)
                except ImportError:
                    pass
                try:
                    import isodate
                    return isodate.parse_datetime(s)
                except ImportError:
                    pass
                try:
                    import dateutil.parser
                    return dateutil.parser.parse(s)
                except ImportError:
                    pass
                warnings.warn('removing unsupported "Z" suffix or UTC offset. '
                              'Install `iso8601`, `isodate` or `python-dateutil` '
                              'package to support it', RuntimeWarning)
                s = s[:-1] if s[-1] == "Z" else s[:-6]
            # parse microseconds
            try:
                return _strptime(s, fmt + ".%f")
            except:
                return _strptime(s, fmt)
        except ValueError:
            # strip microseconds (not supported on this platform)
            if "." in s:
                warnings.warn('removing unsupported microseconds', RuntimeWarning)
                s = s[:s.index(".")]
            return _strptime(s, fmt)
def parse_isotime(timestr):
    """Parse time from ISO 8601 format."""
    try:
        return iso8601.parse_date(timestr)
    except iso8601.ParseError as e:
        raise ValueError(six.text_type(e))
    except TypeError as e:
        raise ValueError(six.text_type(e))
def parse_wit_datime(dt):
    value = dt['value']
    return iso8601.parse_date(value)
def timestamp_from_string(str):
    print(str)
    if str.startswith('last ', 0, 5):
        # sample queries: 1m; 1m,2s; 1d,2h,3m,4s
        query = Timeseries_query(str.split('last ')[1])
        diff = datetime.timedelta(seconds=query.s, minutes=query.m,
                                  hours=query.h, days=query.d)
        return utcnow() - diff
    return iso8601.parse_date(str)
def get_context_data(self, **kwargs):
    api_kwargs = self.get_wp_api_kwargs(**kwargs)
    page = api_kwargs.get('page_number', 1)
    search = api_kwargs.get('search', '')
    blogs = WPApiConnector().get_posts(**api_kwargs)
    tags = WPApiConnector().get_tags(lang=self.blog_language)
    categories = WPApiConnector().get_categories(lang=self.blog_language)
    if 'server_error' in blogs or \
            'server_error' in tags:
        messages.add_message(self.request, messages.ERROR,
                             blogs['server_error'])
        raise Http404
    if not blogs['body']:
        raise Http404
    for blog in blogs['body']:
        if blog['excerpt'] is not None:
            position = blog['excerpt'].find('Continue reading')
            if position != -1:
                blog['excerpt'] = blog['excerpt'][:position]
        blog['slug'] = str(blog['slug'])
        blog['bdate'] = iso8601.parse_date(blog['date']).date()
    context = {
        'blogs': blogs['body'],
        'tags': tags,
        'categories': categories,
        'search': search,
        'total_posts': int(blogs['headers']['X-WP-Total']),
        'total_pages': int(blogs['headers']['X-WP-TotalPages']),
        'current_page': page,
        'previous_page': page - 1,
        'next_page': page + 1,
    }
    return context
def get_context_data(self, **kwargs):
    api_kwargs = self.get_wp_api_kwargs(**kwargs)
    page = api_kwargs.get('page_number', 1)
    search = api_kwargs.get('search', '')
    if not isinstance(page, int):
        page = 1
    blogs = WPApiConnector().get_posts(**api_kwargs)
    tags = WPApiConnector().get_tags(lang=self.blog_language)
    categories = WPApiConnector().get_categories(lang=self.blog_language)
    if 'server_error' in blogs or \
            'server_error' in tags:
        raise Http404
    if not blogs['body']:
        raise Http404
    for blog in blogs['body']:
        if blog['excerpt'] is not None:
            position = blog['excerpt'].find('Continue reading')
            if position != -1:
                blog['excerpt'] = blog['excerpt'][:position]
        blog['slug'] = str(blog['slug'])
        blog['bdate'] = iso8601.parse_date(blog['date']).date()
    context = {
        'blogs': blogs['body'],
        'tags': tags,
        'categories': categories,
        'search': search,
        'total_posts': int(blogs['headers']['X-WP-Total']),
        'total_pages': int(blogs['headers']['X-WP-TotalPages']),
        'current_page': page,
        'previous_page': page - 1,
        'next_page': page + 1,
    }
    return context
def parse_date_utc(date, milliseconds=True):
    """Parses dates from ISO8601 or Epoch formats to a standard datetime
    object.

    This is particularly useful since Habitica returns dates in two formats:

    - iso8601 encoded strings
    - Long integer Epoch times

    Args:
        date (str): A date string in either iso8601 or Epoch format.
        milliseconds (bool): If True, then epoch times are treated as
            millisecond values, otherwise they are evaluated as seconds.

    Returns:
        datetime: The parsed date time in UTC.
    """
    parsed_date = None
    try:
        parsed_date = iso8601.parse_date(date)
    except iso8601.ParseError:
        value = int(date)
        # utcfromtimestamp expects values in seconds
        if milliseconds:
            value /= 1000
        parsed_date = datetime.utcfromtimestamp(value)
    return parsed_date.replace(tzinfo=pytz.utc)
def get(self, bucket_id):
    args = request.args
    limit = int(args["limit"]) if "limit" in args else 100
    start = iso8601.parse_date(args["start"]) if "start" in args else None
    end = iso8601.parse_date(args["end"]) if "end" in args else None

    events = app.api.get_events(bucket_id, limit=limit, start=start, end=end)
    return events, 200

# TODO: How to tell expect that it could be a list of events?
#       Until then we can't use validate.
def get(self, viewname):
    args = request.args
    start = iso8601.parse_date(args["start"]) if "start" in args else None
    end = iso8601.parse_date(args["end"]) if "end" in args else None
    result = app.api.query_view(viewname, start, end)
    return result, 200
def to_native(self, value, context=None):
    if isinstance(value, datetime):
        return value
    try:
        date = parse_date(value, None)
        if not date.tzinfo:
            date = TZ.localize(date)
        return date
    except ParseError:
        raise ConversionError(self.messages['parse'].format(value))
    except OverflowError as e:
        raise ConversionError(e.message)
def get(self, request):
    return Response({
        "datetime": iso8601.parse_date(datetime.utcnow().isoformat())
    })
def deserialize_primitive(cls, data):
    if cls is datetime.datetime:
        try:
            d = parse_date(data)
        except iso8601.ParseError:
            raise ValueError("'{}' is not a datetime.".format(data))
    elif cls is datetime.date:
        try:
            d = parse_date(data).date()
        except iso8601.ParseError:
            raise ValueError("'{}' is not a date.".format(data))
    elif cls in {int, float, uuid.UUID, bool}:
        d = cls(data)
    elif cls is numbers.Integral:
        d = data
    elif cls is decimal.Decimal:
        try:
            d = cls(data)
        except decimal.InvalidOperation:
            raise ValueError("'{}' is not a decimal.".format(data))
    elif cls is text_type:
        if not isinstance(data, text_type):
            raise ValueError("'{}' is not a string.".format(data))
        d = cls(data)
    else:
        raise TypeError(
            "'{0}' is not a primitive type.".format(typing._type_repr(cls))
        )
    return d
def _convert_dates(cls, data):
    # NOTE(sileht): browse the aggregates measures dict tree and convert
    # dates when we find timeseries; the dict can look like
    # {"aggregated": ...}, {"metric_id": {"agg": ...}} or
    # {"resource_id": {"metric_name": {"agg": ...}}}
    for key in data:
        if isinstance(data[key], list):
            data[key] = [(iso8601.parse_date(ts), g, value)
                         for ts, g, value in data[key]]
        elif isinstance(data[key], dict):
            cls._convert_dates(data[key])
        else:
            raise RuntimeError("Unexpected aggregates API output %s"
                               % data[key])