We have extracted the following 48 code examples from open-source Python projects to illustrate how to use pytz.UTC.
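Most of the examples below rely on two basic idioms: building a UTC-aware datetime directly (datetime.now(tz=pytz.UTC) or tzinfo=pytz.UTC) and attaching UTC to an existing naive datetime with pytz.UTC.localize(). A minimal, self-contained sketch of both, written for orientation and not taken from any of the projects below:

import datetime
import pytz

# Current time, already UTC-aware.
now_utc = datetime.datetime.now(tz=pytz.UTC)

# Construct an aware datetime directly.
new_year = datetime.datetime(2020, 1, 1, tzinfo=pytz.UTC)

# Attach UTC to an existing naive datetime.
naive = datetime.datetime(2020, 1, 1, 12, 30)
aware = pytz.UTC.localize(naive)

print(now_utc.tzinfo, new_year.isoformat(), aware.isoformat())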
def __init__(self):
    Analyzer.__init__(self)

    # Get config parameters
    self.path = self.getParam('config.blocklistpath', None, 'No path to blocklists provided.')
    self.ignoreolderthandays = self.getParam('config.ignoreolderthandays', 365)
    self.utc = pytz.UTC
    self.now = dt.datetime.now(tz=self.utc)

    # Check if directory exists
    if not os.path.exists(self.path):
        os.mkdir(self.path, 0700)

    # Downloading/updating the list is implemented with an external cronjob which git pulls the repo

    # Read files in the given path and prepare file lists for ip- and netsets
    files = os.listdir(self.path)
    self.ipsets = []
    self.netsets = []
    for file in files:
        if '.ipset' in file:
            self.ipsets.append(file)
        elif '.netset' in file:
            self.netsets.append(file)

def test_multibuy_hint_two_buys_applicable(self):
    member = Member.objects.get(username="jokke")
    coke = Product.objects.create(
        name="coke",
        price=100,
        active=True
    )
    with freeze_time(timezone.datetime(2018, 1, 1)) as frozen_time:
        for i in range(1, 3):
            Sale.objects.create(
                member=member,
                product=coke,
                price=100,
            )
            frozen_time.tick()
    give_multibuy_hint, sale_hints = stregsystem_views._multibuy_hint(
        timezone.datetime(2018, 1, 1, tzinfo=pytz.UTC), member)
    self.assertTrue(give_multibuy_hint)
    self.assertEqual(sale_hints, "{} {}:{}".format("jokke", coke.id, 2))

def _hist_data_req_to_args(self, req: object):
    """Convert marketdata.HistDataReq to IB arguments.
    """
    assert req.DataType.upper() in IB_HIST_DATA_TYPES,\
        'Invalid IB data type requested: %s' % req.DataType
    contract = self._hist_data_req_to_contract(req)
    endDateTime = pytz.UTC.normalize(
        req.TimeEnd).strftime('%Y%m%d %H:%M:%S %Z')
    durationStr = timedur_to_IB(req.TimeDur)
    barSizeSetting = barsize_to_IB(req.BarSize)
    whatToShow = req.DataType.upper()
    useRTH = True if barSizeSetting in ('1 day', '1W', '1M') else False
    formatDate = 2  # enforce UTC output
    keepUpToDate = False
    chartOptions = None
    return (contract, endDateTime, durationStr, barSizeSetting, whatToShow,
            useRTH, formatDate, keepUpToDate, chartOptions)

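The call pytz.UTC.normalize(req.TimeEnd) above takes an already timezone-aware datetime and converts it to UTC (for a pytz zone, normalize is essentially astimezone plus a DST correction). A minimal illustrative sketch of that behaviour, using a hypothetical exchange-local timestamp:

import datetime
import pytz

# A hypothetical exchange-local timestamp (aware, US/Eastern).
eastern = pytz.timezone('US/Eastern')
local_end = eastern.localize(datetime.datetime(2024, 1, 5, 16, 0))

# Convert it to UTC; equivalent to local_end.astimezone(pytz.UTC) here.
utc_end = pytz.UTC.normalize(local_end)
print(utc_end.strftime('%Y%m%d %H:%M:%S %Z'))  # 20240105 21:00:00 UTC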
async def req_hist_data_async(self, *req_list: [object]):
    """ Concurrently downloads historical market data for multiple requests.
    """
    ibparms_list = (self._hist_data_req_to_args(req) for req in req_list)
    bars_list = await asyncio.gather(*(
        self.reqHistoricalDataAsync(*ibparms) for ibparms in ibparms_list))
    df_list = [ib_insync.util.df(bars) for bars in bars_list]
    xchg_tz_list = await asyncio.gather(*(
        self.hist_data_req_timezone(req) for req in req_list))
    blk_list = []
    for req, df, xchg_tz in zip(req_list, df_list, xchg_tz_list):
        _logger.debug(df.iloc[:3])
        if req.BarSize[-1] in ('d', 'W', 'M'):  # not intraday
            dl_tz = xchg_tz  # dates without timezone, init with xchg_tz.
        else:
            dl_tz = pytz.UTC
        blk = MarketDataBlock(df, symbol=req.Symbol, datatype=req.DataType,
                              barsize=req.BarSize, tz=dl_tz)
        blk.tz_convert(xchg_tz)
        blk_list.append(blk)
    return blk_list

async def query_hist_data(
        engine: object, sectype: str, symbol: str, datatype: str,
        barsize: str, start: datetime=None, end: datetime=None
) -> MarketDataBlock:
    """Query database on conditions.
    """
    if start is None:
        start = pytz.UTC.localize(datetime(1, 1, 1))
    if end is None:
        end = pytz.UTC.localize(datetime(9999, 12, 31, 23, 59, 59))
    table = _gen_sa_table(sectype)
    stmt = table.select().where(
        and_(
            table.c.Symbol == symbol,
            table.c.DataType == datatype,
            table.c.BarSize == barsize,
            table.c.TickerTime.between(
                start.astimezone(pytz.UTC), end.astimezone(pytz.UTC))
        )
    )
    async with engine.acquire() as conn:
        result = await conn.execute(stmt)
        df = pd.DataFrame(list(result), columns=table.columns.keys())
    blk = MarketDataBlock(df, tz='UTC')
    blk.tz_convert(start.tzinfo)
    return blk

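query_hist_data above shows both sides of pytz.UTC handling: localize() attaches UTC to a naive datetime, while astimezone(pytz.UTC) converts an already aware datetime into UTC. A minimal sketch of the difference, with an example zone chosen only for illustration:

import datetime
import pytz

naive = datetime.datetime(2020, 6, 1, 12, 0)

# Attach UTC to a naive datetime (wall clock unchanged).
aware_utc = pytz.UTC.localize(naive)

# Convert an aware datetime from another zone into UTC (wall clock shifts).
paris = pytz.timezone('Europe/Paris')
aware_paris = paris.localize(naive)
converted = aware_paris.astimezone(pytz.UTC)

print(aware_utc.isoformat())   # 2020-06-01T12:00:00+00:00
print(converted.isoformat())   # 2020-06-01T10:00:00+00:00 (Paris is UTC+2 in June)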
def get_signature(self, uri, method='GET', body=''):
    """Return a dictionary with the API key and API get_signature to be
    sent for the given request."""
    # What time is it now?
    timestamp = timegm(datetime.datetime.now(tz=pytz.UTC).timetuple())

    # Calculate the base string to use for the signature.
    base_string = unicode(''.join((
        self.auth_key.secret,
        unicode(timestamp),
        method.upper(),
        uri,
        body,
    ))).encode('utf-8')

    # Return a dictionary with the headers to send.
    return {
        'HTTP_X_API_KEY': self.auth_key.auth_key,
        'HTTP_X_API_SIGNATURE': sha1(base_string).hexdigest(),
        'HTTP_X_API_TIMESTAMP': timestamp,
    }

def get_dates_query(args):
    dates = args.get('dates', [])
    query = None
    for date in dates:
        temporal_filter = {
            'temporal_filter': {
                'start_end_dates': {
                    'start_date': date,  # will be transformed to 00:00:00 UTC
                    'end_date': date  # will be transformed to 23:59:59 UTC
                }
            }
        }
        query = or_op(
            query,
            start_end_date_query(
                None,
                **temporal_filter))

    return query

def get_start_end_dates(args):
    dates = args.get('dates')
    if dates:
        start, end = dates[0], dates[-1]
    else:
        start, end = get_dates_range(args)

    if start:
        start = datetime.datetime.combine(
            start,
            datetime.time(0, 0, 0, tzinfo=pytz.UTC))
    if end:
        end = datetime.datetime.combine(
            end,
            datetime.time(23, 59, 59, tzinfo=pytz.UTC))
    return start, end

def get_dates_range_query(args):
    dates = args.get('datesRange')
    if not dates:
        return None

    start, end = get_dates_range(args)
    temporal_filter = {
        'temporal_filter': {
            'start_end_dates': {
                'start_date': start,  # will be transformed to 00:00:00 UTC
                'end_date': end  # will be transformed to 23:59:59 UTC
            }
        }
    }
    query = start_end_date_query(None, **temporal_filter)
    return query

def _init_values(self, **args):
    # get validated data (Filter)
    start_end_dates = args.get('temporal_filter', {}).get('start_end_dates', {})
    if self.start_value is None:
        start_date = start_end_dates.get('start_date', None)
        self.start_value = ((start_date is not None) and start_date) or\
            datetime.datetime.now(tz=pytz.UTC)
        self.start_value = datetime.datetime.combine(
            self.start_value,
            datetime.time(0, 0, 0, tzinfo=pytz.UTC))

    if self.end_value is None:
        end_date = start_end_dates.get('end_date', None)
        if end_date:
            self.end_value = datetime.datetime.combine(
                end_date,
                datetime.time(23, 59, 59, tzinfo=pytz.UTC))
        elif 'ignore_end_date' not in args:
            site = get_site_folder(True)
            days_visibility = getattr(site, 'days_visibility',
                                      DEFAULT_DAYS_VISIBILITY)
            default_date = datetime.timedelta(days_visibility-1) + \
                self.start_value
            self.end_value = default_date

def test_list_date_to_dates(self):
    dates_str = "Du 7 juin 2016 de 12h10 à 13h30 au 30 juin 2016 de 10h10 à 11h25"
    dates = self.get_dates_intervals(dates_str)
    self.assertEqual(len(dates), 2)
    self.assertIn('201667', dates)
    self.assertIn('2016630', dates)
    d1 = dates['201667']
    d2 = dates['2016630']
    self.assertEqual(len(d1), 1)
    self.assertEqual(len(d2), 1)
    d1_start = d1[0]['start']
    d1_end = d1[0]['end']
    d2_start = d2[0]['start']
    d2_end = d2[0]['end']
    self.assertEqual(d1_start,
                     datetime.datetime(2016, 6, 7, 12, 10, tzinfo=pytz.UTC))
    self.assertEqual(d1_end,
                     datetime.datetime(2016, 6, 7, 13, 30, tzinfo=pytz.UTC))
    self.assertEqual(d2_start,
                     datetime.datetime(2016, 6, 30, 10, 10, tzinfo=pytz.UTC))
    self.assertEqual(d2_end,
                     datetime.datetime(2016, 6, 30, 11, 25, tzinfo=pytz.UTC))

def update(self):
    self.execute(None)
    objects = self.context.files
    default = datetime.datetime.now(tz=pytz.UTC)
    objects = sorted(objects,
                     key=lambda e: getattr(e, 'modified_at', default),
                     reverse=True)
    batch = Batch(objects, self.request, default_size=BATCH_DEFAULT_SIZE)
    batch.target = "#results_files"
    len_result = batch.seqlen
    result_body = []
    for obj in batch:
        object_values = {'object': obj}
        body = self.content(args=object_values,
                            template=obj.templates['default'])['body']
        result_body.append(body)

    result = {}
    values = {'bodies': result_body,
              'length': len_result,
              'batch': batch}
    body = self.content(args=values, template=self.template)['body']
    item = self.adapt_item(body, self.viewid)
    result['coordinates'] = {self.coordinates: [item]}
    return result

def update(self):
    self.execute(None)
    root = getSite()
    objects = root.services_definition
    default = datetime.datetime.now(tz=pytz.UTC)
    objects = sorted(objects,
                     key=lambda e: getattr(e, 'modified_at', default),
                     reverse=True)
    result_servicesbody = []
    for obj in objects:
        object_values = {'object': obj, 'state': None}
        body = self.content(args=object_values,
                            template=obj.templates['default'])['body']
        result_servicesbody.append(body)

    result = {}
    values = {
        'services': result_servicesbody,
        'row_len_services': math.ceil(len(objects)/4)
    }
    body = self.content(args=values, template=self.template)['body']
    item = self.adapt_item(body, self.viewid)
    result['coordinates'] = {self.coordinates: [item]}
    return result

def get_user_alerts(self):
    user = get_current()
    site = get_site_folder(True, self.request)
    objects = getattr(user, 'alerts', [])
    objects = [a for a in objects if a.__parent__ is site]
    now = datetime.datetime.now(tz=pytz.UTC)
    objects = sorted(
        objects,
        key=lambda e: getattr(e, 'modified_at', now),
        reverse=True)
    result_body = []
    for obj in objects:
        render_dict = {
            'object': obj,
            'current_user': user
        }
        body = self.content(args=render_dict,
                            template=obj.get_templates()['small'])['body']
        result_body.append(body)

    values = {'bodies': result_body}
    body = self.content(args=values, template=self.alert_template)['body']
    return {'body': body}

def review_deserializer(args):
    artists = args.get('artists', [])
    args['artists'] = merge_artists(artists)
    zone = pytz.UTC.zone
    if 'created_at' in args:
        args['created_at'] = datetime.datetime.strptime(
            args['created_at'], '%Y-%m-%d %H:%M:%S %Z (%z)')
        if not args['created_at'].tzinfo or \
           args['created_at'].tzinfo.tzname(args['created_at']) != zone:
            args['created_at'] = args['created_at'].replace(tzinfo=pytz.UTC)

    if 'modified_at' in args:
        args['modified_at'] = datetime.datetime.strptime(
            args['modified_at'], '%Y-%m-%d %H:%M:%S %Z (%z)')
        if not args['modified_at'].tzinfo or \
           args['modified_at'].tzinfo.tzname(args['modified_at']) != zone:
            args['modified_at'] = args['modified_at'].replace(tzinfo=pytz.UTC)

    return args

def cinema_review_deserializer(args):
    artists = args.get('artists', [])
    args['artists'] = merge_artists(artists)
    directors = args.get('directors', [])
    args['directors'] = merge_artists(directors)
    for director in args['directors']:
        director.is_director = True

    zone = pytz.UTC.zone
    if 'created_at' in args:
        args['created_at'] = datetime.datetime.strptime(
            args['created_at'], '%Y-%m-%d %H:%M:%S %Z (%z)')
        if not args['created_at'].tzinfo or \
           args['created_at'].tzinfo.tzname(args['created_at']) != zone:
            args['created_at'] = args['created_at'].replace(tzinfo=pytz.UTC)

    if 'modified_at' in args:
        args['modified_at'] = datetime.datetime.strptime(
            args['modified_at'], '%Y-%m-%d %H:%M:%S %Z (%z)')
        if not args['modified_at'].tzinfo or \
           args['modified_at'].tzinfo.tzname(args['modified_at']) != zone:
            args['modified_at'] = args['modified_at'].replace(tzinfo=pytz.UTC)

    return args

def generate_ical_rec(event, schedule, start_date, end_date,
                      to_exclude=[], tzinfo=pytz.UTC):
    ical_events = []
    periods = list_date_to_dates(start_date, True, tzinfo)[1]
    end_dates = list_date_to_dates(end_date, True, tzinfo)[1]
    until = end_dates[-1]['end']
    exdate = [datetime.datetime(l[0], l[1], l[2], tzinfo=tzinfo)
              for l in to_exclude]
    for index, period in enumerate(periods):
        ical_event = Event()
        ical_event.add('summary', event.title)
        ical_event.add('dtstart', period['start'])
        ical_event.add('dtend', period['end'])
        ical_event.add('rrule', {'freq': 'daily', 'until': until})
        if exdate:
            ical_event.add('exdate', exdate)

        ical_event['uid'] = str(get_oid(schedule)) + str(index) + '/' + 'lac.org'
        ical_events.append(ical_event)

    return ical_events

def start(self, context, request, appstruct, **kw):
    changepassword = appstruct['change_password']['changepassword']
    current_user_password = appstruct['change_password']['currentuserpassword']
    user = get_current()
    if changepassword and user.check_password(current_user_password):
        password = appstruct['change_password']['password']
        context.set_password(password)

    context.set_title()
    name = name_chooser(name=context.title)
    if not context.name.startswith(name):
        principals = find_service(getSite(), 'principals')
        context.name = name_chooser(principals['users'], name=name)

    context.modified_at = datetime.datetime.now(tz=pytz.UTC)
    context.reindex()
    request.registry.notify(ActivityExecuted(self, [context], user))
    return {}

def start(self, context, request, appstruct, **kw):
    root = getSite()
    sites = root.site_folders
    now = datetime.datetime.combine(
        datetime.datetime.utcnow(),
        datetime.time(23, 59, 59, tzinfo=pytz.UTC))
    for site in sites:
        automatic_newsletters = [n for n in site.newsletters
                                 if getattr(n, 'recurrence', False) and
                                 now >= n.get_sending_date() and
                                 n.can_send()]
        for newsletter in automatic_newsletters:
            send_newsletter_content(newsletter, request)
            log.info('Send: '+site.title+'->'+newsletter.title)

    return {}

def start(self, context, request, appstruct, **kw):
    user = get_current()
    artists, new_artists = extract_artists(
        appstruct.get('artists', []), request)
    context.setproperty('artists', artists)
    context.modified_at = datetime.datetime.now(tz=pytz.UTC)
    context.set_metadata(appstruct)
    if 'published' in context.state:
        not_published_artists = [a for a in context.artists
                                 if 'published' not in a.state]
        for artist in not_published_artists:
            publish_artist(artist, request, user)

    context.reindex()
    new_objects = new_artists
    new_objects.append(context)
    request.registry.notify(ActivityExecuted(self, new_objects, user))
    return {}

def start(self, context, request, appstruct, **kw):
    site_services = context.get_all_services(validate=False, delegation=False)
    user = get_current()
    if 'moderation' in site_services:
        author = getattr(context, 'author', user)
        moderations = site_services['moderation']
        for moderation in moderations:
            moderation.unsubscribe(context, author, service=moderation)

        remove_empty_orders(author)

    context.sumited_to = PersistentList([])
    context.state = PersistentList(['editable'])
    context.modified_at = datetime.datetime.now(tz=pytz.UTC)
    context.reindex()
    request.registry.notify(ActivityExecuted(self, [context], user))
    return {}

def start(self, context, request, appstruct, **kw):
    artists, new_artists = extract_artists(
        appstruct.pop('artists', []), request)
    directors, new_directors = extract_artists(
        appstruct.pop('directors', []), request, is_directors=True)
    appstruct.pop('artists_ids')
    appstruct.pop('directors_ids')
    if appstruct.get('picture', None) is not None and \
       OBJECT_DATA in appstruct['picture']:
        appstruct['picture'] = appstruct['picture'][OBJECT_DATA]
        if not getattr(appstruct['picture'], '__name__', None):
            appstruct['picture'].__name__ = 'picture'

    context.set_metadata(appstruct)
    context.set_data(appstruct)
    context.setproperty('artists', artists)
    context.setproperty('directors', directors)
    context.modified_at = datetime.datetime.now(tz=pytz.UTC)
    context.reindex()
    objects = [context]
    objects.extend(new_artists)
    objects.extend(new_directors)
    request.registry.notify(ActivityExecuted(self, objects, get_current()))
    return {}

def start(self, context, request, appstruct, **kw):
    user = get_current()
    if not hasattr(user, 'customeraccount') and\
       context.author:
        user = context.author

    if '_csrf_token_' in appstruct:
        appstruct.pop('_csrf_token_')

    service = self.service(**appstruct)
    service.configure(context, user)
    service.subscribe(context, user)
    service.setproperty('delegate', user)
    context.modified_at = datetime.datetime.now(tz=pytz.UTC)
    service.reindex()
    return {}

def start(self, context, request, appstruct, **kw):
    user = get_current()
    if not hasattr(user, 'customeraccount') and\
       context.customer:
        user = context.customer.user

    if '_csrf_token_' in appstruct:
        appstruct.pop('_csrf_token_')

    service = self.service(**appstruct)
    service.configure(context, user)
    service.subscribe(context, user)
    service.setproperty('delegate', user)
    context.modified_at = datetime.datetime.now(tz=pytz.UTC)
    service.reindex()
    return {}

def renew_state_validation(process, context):
    type_ = context.subscription.get('subscription_type')
    if type_ != 'subscription':
        return False

    if context.is_expired():
        return True

    end_date = getattr(context, 'end_date', None)
    if end_date:
        now = datetime.datetime.now(tz=pytz.UTC)
        alert_date = (end_date - datetime.timedelta(
            days=2)).replace(tzinfo=pytz.UTC)
        return now >= alert_date

    return False

def start(self, context, request, appstruct, **kw):
    context.state = PersistentList(['active'])
    today = datetime.datetime.now(tz=pytz.UTC)
    end_date = context.end_date.replace(tzinfo=pytz.UTC)
    start_date = today if today > end_date else end_date
    context.end_date = (datetime.timedelta(days=30) + \
                        start_date).replace(tzinfo=pytz.UTC)
    context.modified_at = today
    if isinstance(context.perimeter, SiteFolder):
        grant_roles(
            context.delegate,
            roles=(('Moderator', context.perimeter),))
        context.delegate.reindex()

    context.reindex()
    return {}

def start(self, context, request, appstruct, **kw):
    context.modified_at = datetime.datetime.now(tz=pytz.UTC)
    if isinstance(context.perimeter, SiteFolder):
        revoke_roles(
            context.delegate,
            roles=(('Moderator', context.perimeter),),
            root=context.perimeter)
        context.delegate.reindex()
        context.delegate = appstruct['delegate']
        grant_roles(
            context.delegate,
            roles=(("Moderator", context.perimeter),))
    else:
        context.delegate = appstruct['delegate']

    context.delegate.reindex()
    context.reindex()
    return {}

def start(self, context, request, appstruct, **kw):
    site_folder = appstruct['_object_data']
    site_folder.modified_at = datetime.datetime.now(tz=pytz.UTC)
    site_folder.filters = PersistentList(
        getattr(site_folder, 'filters', []))
    filters = getattr(site_folder, 'filters', [])
    root = getSite()
    for filter_ in filters:
        sources = filter_.get('other_filter', {}).get('sources', [])
        if sources and 'self' in sources:
            sources_ = list(sources)
            sources_.remove('self')
            sources_.append(str(get_oid(site_folder)))
            filter_['other_filter']['sources'] = list(set(sources_))

        tree = filter_.get('metadata_filter', {}).get('tree', None)
        if tree:
            site_folder.merge_tree(tree)
            root.merge_tree(tree)

    site_folder.reindex()
    return {}

def refresh_user_token(user_social):
    """
    Utility function to refresh the access token if it is (almost) expired

    Args:
        user_social (UserSocialAuth): a user social auth instance
    """
    try:
        last_update = datetime.fromtimestamp(user_social.extra_data.get('updated_at'), tz=pytz.UTC)
        expires_in = timedelta(seconds=user_social.extra_data.get('expires_in'))
    except TypeError:
        _send_refresh_request(user_social)
        return
    # small error margin of 5 minutes to be safe
    error_margin = timedelta(minutes=5)
    if now_in_utc() - last_update >= expires_in - error_margin:
        _send_refresh_request(user_social)

def parse_datetime(dt_string):
    """
    Attempts to parse a datetime string with any one of the datetime formats
    that we expect from Pearson

    Args:
        dt_string (str): datetime string to be parsed

    Returns:
        datetime.datetime: parsed datetime

    Raises:
        UnparsableRowException: Thrown if the datetime string cannot be parsed
            with any of the accepted formats
    """
    for dt_format in PEARSON_DATETIME_FORMATS:
        try:
            return datetime.strptime(dt_string, dt_format).replace(tzinfo=pytz.UTC)
        except ValueError:
            pass
    raise UnparsableRowException('Unparsable datetime: {}'.format(dt_string))

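Several of these examples attach UTC via .replace(tzinfo=pytz.UTC) rather than pytz.UTC.localize(). For UTC the two are equivalent, since UTC has no DST transitions; for other pytz zones, replace() silently uses the zone's default (historical LMT) offset, so localize() is the safer call. A small illustrative sketch of that difference, with the zone name chosen only as an example:

from datetime import datetime
import pytz

naive = datetime(2020, 6, 1, 12, 0)

# Equivalent for UTC: both yield 2020-06-01 12:00:00+00:00.
print(naive.replace(tzinfo=pytz.UTC))
print(pytz.UTC.localize(naive))

# Not equivalent for other pytz zones: replace() picks up the zone's
# default LMT offset, while localize() selects the correct one.
paris = pytz.timezone('Europe/Paris')
print(naive.replace(tzinfo=paris))   # offset +00:09 (LMT), usually not intended
print(paris.localize(naive))         # offset +02:00 (CEST)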
def __init__(self, regr, key, meta=None):
    self.key = key
    self.regr = regr
    self.meta = self.Meta(
        # pyrfc3339 drops microseconds, make sure __eq__ is sane
        creation_dt=datetime.datetime.now(
            tz=pytz.UTC).replace(microsecond=0),
        creation_host=socket.getfqdn()) if meta is None else meta
    self.id = hashlib.md5(
        self.key.key.public_key().public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo)
    ).hexdigest()
    # Implementation note: Email? Multiple accounts can have the
    # same email address. Registration URI? Assigned by the
    # server, not guaranteed to be stable over time, nor
    # canonical URI can be generated. ACME protocol doesn't allow
    # account key (and thus its fingerprint) to be updated...

def add_time_interval(base_time, interval, textparser=parsedatetime.Calendar()):
    """Parse the time specified time interval, and add it to the base_time

    The interval can be in the English-language format understood by
    parsedatetime, e.g., '10 days', '3 weeks', '6 months', '9 hours', or
    a sequence of such intervals like '6 months 1 week' or '3 days 12 hours'.
    If an integer is found with no associated unit, it is interpreted by
    default as a number of days.

    :param datetime.datetime base_time: The time to be added with the interval.
    :param str interval: The time interval to parse.

    :returns: The base_time plus the interpretation of the time interval.
    :rtype: :class:`datetime.datetime`"""

    if interval.strip().isdigit():
        interval += " days"

    # try to use the same timezone, but fallback to UTC
    tzinfo = base_time.tzinfo or pytz.UTC

    return textparser.parseDT(interval, base_time, tzinfo=tzinfo)[0]

def test_to_local_timezone(self):
    i18n.get_i18n().set_timezone('US/Eastern')

    format = '%Y-%m-%d %H:%M:%S %Z%z'

    # Test datetime with timezone set
    base = datetime.datetime(2002, 10, 27, 6, 0, 0, tzinfo=pytz.UTC)
    localtime = i18n.to_local_timezone(base)
    result = localtime.strftime(format)
    self.assertEqual(result, '2002-10-27 01:00:00 EST-0500')

    # Test naive datetime - no timezone set
    base = datetime.datetime(2002, 10, 27, 6, 0, 0)
    localtime = i18n.to_local_timezone(base)
    result = localtime.strftime(format)
    self.assertEqual(result, '2002-10-27 01:00:00 EST-0500')

def test_to_utc(self):
    i18n.get_i18n().set_timezone('US/Eastern')

    format = '%Y-%m-%d %H:%M:%S'

    # Test datetime with timezone set
    base = datetime.datetime(2002, 10, 27, 6, 0, 0, tzinfo=pytz.UTC)
    localtime = i18n.to_utc(base)
    result = localtime.strftime(format)
    self.assertEqual(result, '2002-10-27 06:00:00')

    # Test naive datetime - no timezone set
    base = datetime.datetime(2002, 10, 27, 6, 0, 0)
    localtime = i18n.to_utc(base)
    result = localtime.strftime(format)
    self.assertEqual(result, '2002-10-27 11:00:00')

def format_datetime(dttm):
    # 1. Convert to timezone-aware
    # 2. Convert to UTC
    # 3. Format in ISO format
    # 4. Add subsecond value if non-zero
    # 5. Add "Z"

    if dttm.tzinfo is None or dttm.tzinfo.utcoffset(dttm) is None:
        # dttm is timezone-naive; assume UTC
        zoned = pytz.utc.localize(dttm)
    else:
        zoned = dttm.astimezone(pytz.utc)

    ts = zoned.strftime("%Y-%m-%dT%H:%M:%S")
    if zoned.microsecond > 0:
        ms = zoned.strftime("%f")
        ts = ts + '.' + ms.rstrip("0")
    return ts + "Z"

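A hedged usage sketch for format_datetime above (assuming the function and its imports are in scope), showing how naive input is treated as UTC while aware input is converted to UTC first:

from datetime import datetime
import pytz

# Naive input is assumed to already be in UTC.
print(format_datetime(datetime(2021, 3, 14, 15, 9, 26, 535000)))
# -> 2021-03-14T15:09:26.535Z

# Aware input is converted to UTC before formatting.
eastern = pytz.timezone('US/Eastern')
print(format_datetime(eastern.localize(datetime(2021, 3, 14, 10, 9, 26))))
# -> 2021-03-14T14:09:26Z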
def __init__(self, test_id, status, message, timestamp=None):
    """
    Parametrized constructor for the TestProgress model

    :param test_id: The value of the 'test_id' field of TestProgress model
    :type test_id: int
    :param status: The value of the 'status' field of TestProgress model
    :type status: TestStatus
    :param message: The value of the 'message' field of TestProgress model
    :type message: str
    :param timestamp: The value of the 'timestamp' field of TestProgress model
        (None by default)
    :type timestamp: datetime
    """
    self.test_id = test_id
    self.status = status
    tz = get_localzone()
    if timestamp is None:
        timestamp = tz.localize(datetime.datetime.now(), is_dst=None)
        timestamp = timestamp.astimezone(pytz.UTC)
    if timestamp.tzinfo is None:
        timestamp = pytz.utc.localize(timestamp, is_dst=None)
    self.timestamp = timestamp
    self.message = message

def get_gsod_data(self, station, year):
    filename_format = '/pub/data/gsod/{year}/{station}-{year}.op.gz'
    lines = self._retreive_file_lines(filename_format, station, year)

    dates = pd.date_range("{}-01-01 00:00".format(year),
                          "{}-12-31 00:00".format(year),
                          freq='D', tz=pytz.UTC)
    series = pd.Series(None, index=dates, dtype=float)

    for line in lines[1:]:
        columns = line.split()
        date_str = columns[2].decode('utf-8')
        temp_F = float(columns[3])
        temp_C = (5. / 9.) * (temp_F - 32.)
        dt = pytz.UTC.localize(datetime.strptime(date_str, "%Y%m%d"))
        series[dt] = temp_C
    return series

def get_isd_data(self, station, year):
    filename_format = '/pub/data/noaa/{year}/{station}-{year}.gz'
    lines = self._retreive_file_lines(filename_format, station, year)

    dates = pd.date_range("{}-01-01 00:00".format(year),
                          "{}-12-31 23:00".format(int(year) + 1),
                          freq='H', tz=pytz.UTC)
    series = pd.Series(None, index=dates, dtype=float)

    for line in lines:
        if line[87:92].decode('utf-8') == "+9999":
            temp_C = float("nan")
        else:
            temp_C = float(line[87:92]) / 10.
        date_str = line[15:27].decode('utf-8')

        # there can be multiple readings per hour, so set all to minute 0
        dt = pytz.UTC.localize(
            datetime.strptime(date_str, "%Y%m%d%H%M")).replace(minute=0)

        # only set the temp if it's the first encountered in the hour.
        if pd.isnull(series.ix[dt]):
            series[dt] = temp_C
    return series

def to_records(self, df):
    records = []
    if df.shape[0] > 0:
        records.append({
            "end": pytz.UTC.localize(df.index[0].to_pydatetime()),
            "value": np.nan,
            "estimated": False,
        })
        for e, v, est in zip(df.index[1:], df.value, df.estimated):
            records.append({
                "end": pytz.UTC.localize(e.to_pydatetime()),
                "value": v,
                "estimated": bool(est),
            })
    return records

def test_basic_usage():
    start_date = datetime(2000, 1, 1, tzinfo=pytz.UTC)
    end_date = datetime(2001, 1, 1, tzinfo=pytz.UTC)
    interventions = [Intervention(start_date, end_date)]

    mps = get_modeling_period_set(interventions)

    groups = list(mps.iter_modeling_period_groups())
    assert len(groups) == 1
    labels, periods = groups[0]
    assert labels == ('baseline', 'reporting')
    assert periods[0].start_date is None
    assert periods[0].end_date is start_date
    assert periods[1].start_date is end_date
    assert periods[1].end_date is None

    modeling_periods = list(mps.iter_modeling_periods())
    assert len(modeling_periods) == 2

def test_multiple_records_no_gap(serializer):
    records = [
        {
            "start": datetime(2000, 1, 1, tzinfo=pytz.UTC),
            "end": datetime(2000, 1, 2, tzinfo=pytz.UTC),
            "value": 1,
        },
        {
            "start": datetime(2000, 1, 2, tzinfo=pytz.UTC),
            "end": datetime(2000, 1, 3, tzinfo=pytz.UTC),
            "value": 2,
        },
    ]
    df = serializer.to_dataframe(records)
    assert df.value[datetime(2000, 1, 1, tzinfo=pytz.UTC)] == 1
    assert not df.estimated[datetime(2000, 1, 1, tzinfo=pytz.UTC)]
    assert df.value[datetime(2000, 1, 2, tzinfo=pytz.UTC)] == 2
    assert not df.estimated[datetime(2000, 1, 2, tzinfo=pytz.UTC)]
    assert pd.isnull(df.value[datetime(2000, 1, 3, tzinfo=pytz.UTC)])
    assert not df.estimated[datetime(2000, 1, 3, tzinfo=pytz.UTC)]

def test_multiple_records_with_gap(serializer):
    records = [
        {
            "start": datetime(2000, 1, 1, tzinfo=pytz.UTC),
            "end": datetime(2000, 1, 2, tzinfo=pytz.UTC),
            "value": 1,
        },
        {
            "start": datetime(2000, 1, 3, tzinfo=pytz.UTC),
            "end": datetime(2000, 1, 4, tzinfo=pytz.UTC),
            "value": 2,
        },
    ]
    df = serializer.to_dataframe(records)
    assert df.value[datetime(2000, 1, 1, tzinfo=pytz.UTC)] == 1
    assert not df.estimated[datetime(2000, 1, 1, tzinfo=pytz.UTC)]
    assert pd.isnull(df.value[datetime(2000, 1, 2, tzinfo=pytz.UTC)])
    assert not df.estimated[datetime(2000, 1, 2, tzinfo=pytz.UTC)]
    assert df.value[datetime(2000, 1, 3, tzinfo=pytz.UTC)] == 2
    assert not df.estimated[datetime(2000, 1, 3, tzinfo=pytz.UTC)]
    assert pd.isnull(df.value[datetime(2000, 1, 4, tzinfo=pytz.UTC)])
    assert not df.estimated[datetime(2000, 1, 4, tzinfo=pytz.UTC)]

def test_multiple_records(serializer):
    records = [
        {
            "start": datetime(2000, 1, 1, tzinfo=pytz.UTC),
            "value": 1,
        },
        {
            "start": datetime(2000, 1, 2, tzinfo=pytz.UTC),
            "value": 2,
        },
    ]
    df = serializer.to_dataframe(records)
    assert df.value[datetime(2000, 1, 1, tzinfo=pytz.UTC)] == 1
    assert not df.estimated[datetime(2000, 1, 1, tzinfo=pytz.UTC)]
    assert pd.isnull(df.value[datetime(2000, 1, 2, tzinfo=pytz.UTC)])
    assert not df.estimated[datetime(2000, 1, 2, tzinfo=pytz.UTC)]

def test_to_records(serializer):
    data = {"value": [1, np.nan], "estimated": [True, False]}
    columns = ["value", "estimated"]
    index = pd.date_range('2000-01-01', periods=2, freq='D')
    df = pd.DataFrame(data, index=index, columns=columns)
    records = serializer.to_records(df)
    assert len(records) == 2
    assert records[0]["start"] == datetime(2000, 1, 1, tzinfo=pytz.UTC)
    assert records[0]["value"] == 1
    assert records[0]["estimated"]
    assert records[1]["start"] == datetime(2000, 1, 2, tzinfo=pytz.UTC)
    assert pd.isnull(records[1]["value"])
    assert not records[1]["estimated"]

def test_sales_to_user_in_period(self):
    res = views._sales_to_user_in_period(
        self.alan.username,
        timezone.datetime(2017, 2, 1, 0, 0, tzinfo=pytz.UTC),
        timezone.datetime(2017, 2, 17, 0, 0, tzinfo=pytz.UTC),
        [self.flan.id, self.flanmad.id],
        {self.flan.name: 0, self.flanmad.name: 0},
    )

    self.assertEqual(2, res[self.flan.name])
    self.assertEqual(1, res[self.flanmad.name])

def test_sales_to_user_no_results_out_of_period(self):
    res = views._sales_to_user_in_period(
        self.bob.username,
        timezone.datetime(2017, 2, 1, 0, 0, tzinfo=pytz.UTC),
        timezone.datetime(2017, 2, 17, 0, 0, tzinfo=pytz.UTC),
        [self.flan.id, self.flanmad.id],
        {self.flan.name: 0, self.flanmad.name: 0},
    )

    self.assertEqual(0, res[self.flan.name])
    self.assertEqual(0, res[self.flanmad.name])