我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用pytz.utc()。
def datetime_from_simple_time(cls, el, datetime_field): """ The function takes a datetime_fieldname and returns a datetime aware of the timezone. Returns None if fields do not exist in el. """ if (datetime_field not in el or el[datetime_field] is None or el[datetime_field] == '0000-00-00 00:00:00' or el[datetime_field] == ''): return None else: return timezone.make_aware( datetime.datetime.strptime(u"{} UTC".format(el[datetime_field]), "%Y-%m-%d %H:%M:%S %Z"), pytz.utc)
def _create_daily_stats(self, perfs): # create daily and cumulative stats dataframe daily_perfs = [] # TODO: the loop here could overwrite expected properties # of daily_perf. Could potentially raise or log a # warning. for perf in perfs: if 'daily_perf' in perf: perf['daily_perf'].update( perf['daily_perf'].pop('recorded_vars') ) perf['daily_perf'].update(perf['cumulative_risk_metrics']) daily_perfs.append(perf['daily_perf']) else: self.risk_report = perf daily_dts = [np.datetime64(perf['period_close'], utc=True) for perf in daily_perfs] daily_stats = pd.DataFrame(daily_perfs, index=daily_dts) return daily_stats
def on_dt_changed(self, dt): """ Callback triggered by the simulation loop whenever the current dt changes. Any logic that should happen exactly once at the start of each datetime group should happen here. """ assert isinstance(dt, datetime), \ "Attempt to set algorithm's current time with non-datetime" assert dt.tzinfo == pytz.utc, \ "Algorithm expects a utc datetime" self.datetime = dt self.perf_tracker.set_date(dt) self.blotter.set_date(dt)
def create_test_panel_ohlc_source(sim_params, env): start = sim_params.first_open \ if sim_params else pd.datetime(1990, 1, 3, 0, 0, 0, 0, pytz.utc) end = sim_params.last_close \ if sim_params else pd.datetime(1990, 1, 8, 0, 0, 0, 0, pytz.utc) index = env.days_in_range(start, end) price = np.arange(0, len(index)) + 100 high = price * 1.05 low = price * 0.95 open_ = price + .1 * (price % 2 - .5) volume = np.ones(len(index)) * 1000 arbitrary = np.ones(len(index)) df = pd.DataFrame({'price': price, 'high': high, 'low': low, 'open': open_, 'volume': volume, 'arbitrary': arbitrary}, index=index) panel = pd.Panel.from_dict({0: df}) return DataPanelSource(panel), panel
def _coerce_datetime(maybe_dt): if isinstance(maybe_dt, datetime.datetime): return maybe_dt elif isinstance(maybe_dt, datetime.date): return datetime.datetime( year=maybe_dt.year, month=maybe_dt.month, day=maybe_dt.day, tzinfo=pytz.utc, ) elif isinstance(maybe_dt, (tuple, list)) and len(maybe_dt) == 3: year, month, day = maybe_dt return datetime.datetime( year=year, month=month, day=day, tzinfo=pytz.utc, ) else: raise TypeError('Cannot coerce %s into a datetime.datetime' % type(maybe_dt).__name__)
def test_generator_dates(self): """ Ensure the pipeline of generators are in sync, at least as far as their current dates. """ sim_params = factory.create_simulation_parameters( start=datetime(2011, 7, 30, tzinfo=pytz.utc), end=datetime(2012, 7, 30, tzinfo=pytz.utc), env=self.env, ) algo = TestAlgo(self, sim_params=sim_params, env=self.env) trade_source = factory.create_daily_trade_source( [8229], sim_params, env=self.env, ) algo.set_sources([trade_source]) gen = algo.get_generator() self.assertTrue(list(gen)) self.assertTrue(algo.slippage.latest_date) self.assertTrue(algo.latest_date)
def test_progress(self): """ Ensure the pipeline of generators are in sync, at least as far as their current dates. """ sim_params = factory.create_simulation_parameters( start=datetime(2008, 1, 1, tzinfo=pytz.utc), end=datetime(2008, 1, 5, tzinfo=pytz.utc), env=self.env, ) algo = TestAlgo(self, sim_params=sim_params, env=self.env) trade_source = factory.create_daily_trade_source( [8229], sim_params, env=self.env, ) algo.set_sources([trade_source]) gen = algo.get_generator() results = list(gen) self.assertEqual(results[-2]['progress'], 1.0)
def test_serialization(self): start_dt = datetime(year=2008, month=10, day=9, tzinfo=pytz.utc) end_dt = datetime(year=2008, month=10, day=16, tzinfo=pytz.utc) sim_params = SimulationParameters( period_start=start_dt, period_end=end_dt, env=self.env, ) perf_tracker = perf.PerformanceTracker( sim_params, env=self.env ) check_perf_tracker_serialization(perf_tracker)
def test_yahoo_bars_to_panel_source(self): env = TradingEnvironment() finder = AssetFinder(env.engine) stocks = ['AAPL', 'GE'] env.write_data(equities_identifiers=stocks) start = pd.datetime(1993, 1, 1, 0, 0, 0, 0, pytz.utc) end = pd.datetime(2002, 1, 1, 0, 0, 0, 0, pytz.utc) data = factory.load_bars_from_yahoo(stocks=stocks, indexes={}, start=start, end=end) check_fields = ['sid', 'open', 'high', 'low', 'close', 'volume', 'price'] copy_panel = data.copy() sids = finder.map_identifier_index_to_sids( data.items, data.major_axis[0] ) copy_panel.items = sids source = DataPanelSource(copy_panel) for event in source: for check_field in check_fields: self.assertIn(check_field, event) self.assertTrue(isinstance(event['volume'], (integer_types))) self.assertTrue(event['sid'] in sids)
def test_load_bars_from_yahoo(self): stocks = ['AAPL', 'GE'] start = pd.datetime(1993, 1, 1, 0, 0, 0, 0, pytz.utc) end = pd.datetime(2002, 1, 1, 0, 0, 0, 0, pytz.utc) data = load_bars_from_yahoo(stocks=stocks, start=start, end=end) assert data.major_axis[0] == pd.Timestamp('1993-01-04 00:00:00+0000') assert data.major_axis[-1] == pd.Timestamp('2001-12-31 00:00:00+0000') for stock in stocks: assert stock in data.items for ohlc in ['open', 'high', 'low', 'close', 'volume', 'price']: assert ohlc in data.minor_axis np.testing.assert_raises( AssertionError, load_bars_from_yahoo, stocks=stocks, start=end, end=start )
def test_day_after_thanksgiving(self): early_closes = tradingcalendar.get_early_closes( tradingcalendar.start, tradingcalendar.end.replace(year=tradingcalendar.end.year + 1) ) # November 2012 # Su Mo Tu We Th Fr Sa # 1 2 3 # 4 5 6 7 8 9 10 # 11 12 13 14 15 16 17 # 18 19 20 21 22 23 24 # 25 26 27 28 29 30 fourth_friday = datetime.datetime(2012, 11, 23, tzinfo=pytz.utc) self.assertIn(fourth_friday, early_closes) # November 2013 # Su Mo Tu We Th Fr Sa # 1 2 # 3 4 5 6 7 8 9 # 10 11 12 13 14 15 16 # 17 18 19 20 21 22 23 # 24 25 26 27 28 29 30 fifth_friday = datetime.datetime(2013, 11, 29, tzinfo=pytz.utc) self.assertIn(fifth_friday, early_closes)
def test_partial_month(self): start = datetime.datetime( year=1991, month=1, day=1, hour=0, minute=0, tzinfo=pytz.utc) # 1992 and 1996 were leap years total_days = 365 * 5 + 2 end = start + datetime.timedelta(days=total_days) sim_params90s = SimulationParameters( period_start=start, period_end=end, env=self.env, ) returns = factory.create_returns_from_range(sim_params90s) returns = returns[:-10] # truncate the returns series to end mid-month metrics = risk.RiskReport(returns, sim_params90s, env=self.env) total_months = 60 self.check_metrics(metrics, total_months, start)
def use_testing_credentials(args, credentials): print("Skipping AWS API calls because AWSMFA_TESTING_MODE is set.", file=sys.stderr) # AWS returns offset-aware UTC times, so we fake that in order to # verify consistent code paths between py2 and py3 datetime. fake_expiration = (datetime.datetime.now(tz=pytz.utc) + datetime.timedelta(minutes=5)) fake_credentials = { 'AccessKeyId': credentials.get(args.identity_profile, 'aws_access_key_id'), 'SecretAccessKey': credentials.get(args.identity_profile, 'aws_secret_access_key'), 'SessionToken': "420", 'Expiration': fake_expiration, } print_expiration_time(fake_expiration) update_credentials_file(args.aws_credentials, args.target_profile, args.identity_profile, credentials, fake_credentials)
def add_emails(self, topic_ids): ''' add all emails and email_blobs ''' # loop over emails for email in range(0, self.num_emails): # loop over emails em = self.metadata[email] dtime_orig = dateparse(em['Date']) dtime_utc = dtime_orig.astimezone(pytz.utc) values = nparray([em['Subject'], em['From'], em['To'], em['Cc'], em['Bcc'], dtime_orig, dtime_utc]) values = nparray([value.replace("'", " ") if (value and isinstance(value, str)) else value for value in values]) rows = nparray(['subject', 'sender', 'receiver', 'cc', 'bcc', 'send_time', 'send_time_utc']) bool = nparray([True if a else False for a in values]) self.add_email('email', rows[bool], values[bool]) for idx2, t_id in enumerate(topic_ids): # loop over topics rows = nparray(['topic_id', 'topic_probability']) values = nparray([t_id, self.email_prob[idx2, email]]) self.add_blob('email_blob', rows, values)
def test_stats_hours(self): """Test CACHE PROJECT STATS hours works.""" pr = ProjectFactory.create() task = TaskFactory.create(n_answers=1) today = datetime.now(pytz.utc) TaskFactory.create() TaskRunFactory.create(project=pr, task=task) AnonymousTaskRunFactory.create(project=pr) hours, hours_anon, hours_auth, max_hours, \ max_hours_anon, max_hours_auth = stats_hours(pr.id) assert len(hours) == 24, len(hours) assert hours[today.strftime('%H')] == 2, hours[today.strftime('%H')] assert hours_anon[today.strftime('%H')] == 1, hours_anon[today.strftime('%H')] assert hours_auth[today.strftime('%H')] == 1, hours_auth[today.strftime('%H')] assert max_hours == 2 assert max_hours_anon == 1 assert max_hours_auth == 1
def test_stats_hours_with_period(self): """Test CACHE PROJECT STATS hours with period works.""" pr = ProjectFactory.create() today = datetime.now(pytz.utc) d = date.today() - timedelta(days=6) task = TaskFactory.create(n_answers=1, created=d) TaskRunFactory.create(project=pr, task=task, created=d, finish_time=d) d = date.today() - timedelta(days=16) AnonymousTaskRunFactory.create(project=pr, created=d, finish_time=d) hours, hours_anon, hours_auth, max_hours, \ max_hours_anon, max_hours_auth = stats_hours(pr.id) assert len(hours) == 24, len(hours) # We use 00 as the timedelta sets the hour to 00 assert hours['00'] == 1, hours[today.strftime('%H')] assert hours_anon['00'] == 0, hours_anon[today.strftime('%H')] assert hours_auth['00'] == 1, hours_auth[today.strftime('%H')] assert max_hours == 1 assert max_hours_anon is None assert max_hours_auth == 1
def test_fromutc(self): # naive datetime. dt1 = datetime(2011, 10, 31) # localized datetime, same timezone. dt2 = self.tz.localize(dt1) # Both should give the same results. Note that the standard # Python tzinfo.fromutc() only supports the second. for dt in [dt1, dt2]: loc_dt = self.tz.fromutc(dt) loc_dt2 = pytz.utc.localize(dt1).astimezone(self.tz) self.assertEqual(loc_dt, loc_dt2) # localized datetime, different timezone. new_tz = pytz.timezone('Europe/Paris') self.assertTrue(self.tz is not new_tz) dt3 = new_tz.localize(dt1) self.assertRaises(ValueError, self.tz.fromutc, dt3)
def test_encode_datetime(): now = datetime.utcnow() now_utc = now.replace(tzinfo=pytz.utc) stripped = datetime(*now.timetuple()[:3]) serialized = loads(dumps({ 'datetime': now, 'tz': now_utc, 'date': now.date(), 'time': now.time()}, )) assert serialized == { 'datetime': now.isoformat(), 'tz': '{0}Z'.format(now_utc.isoformat().split('+', 1)[0]), 'time': now.time().isoformat(), 'date': stripped.isoformat(), }
def _sync_character(server, character, data): has_no_changes = ( (data['x'] == character.x) or (data['y'] == character.y) or (data['z'] == character.y)) last_online = (datetime .utcfromtimestamp(data['last_online']) .replace(tzinfo=pytz.utc)) character.server = server character.conan_id = data['conan_id'] character.name = data['name'] character.level = data['level'] character.is_online = data['is_online'] character.steam_id = data['steam_id'] character.last_killed_by = data['last_killed_by'] character.x = data['x'] character.y = data['y'] character.z = data['z'] character.last_online = last_online character.save() return not has_no_changes
def test_to_dict(self): instance = SimpleModel(date_field='1973-12-06', int_field='100', float_field='7') self.assertDictEqual(instance.to_dict(), { "date_field": datetime.date(1973, 12, 6), "int_field": 100, "float_field": 7.0, "datetime_field": datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc), "alias_field": 0.0, 'str_field': 'dozo' }) self.assertDictEqual(instance.to_dict(include_readonly=False), { "date_field": datetime.date(1973, 12, 6), "int_field": 100, "float_field": 7.0, "datetime_field": datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc), 'str_field': 'dozo' }) self.assertDictEqual( instance.to_dict(include_readonly=False, field_names=('int_field', 'alias_field', 'datetime_field')), { "int_field": 100, "datetime_field": datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc) })
def test_datetime_field(self): f = DateTimeField() epoch = datetime(1970, 1, 1, tzinfo=pytz.utc) # Valid values for value in (date(1970, 1, 1), datetime(1970, 1, 1), epoch, epoch.astimezone(pytz.timezone('US/Eastern')), epoch.astimezone(pytz.timezone('Asia/Jerusalem')), '1970-01-01 00:00:00', '1970-01-17 00:00:17', '0000-00-00 00:00:00', 0, '2017-07-26T08:31:05', '2017-07-26T08:31:05Z', '2017-07-26 08:31', '2017-07-26T13:31:05+05', '2017-07-26 13:31:05+0500'): dt = f.to_python(value, pytz.utc) self.assertEquals(dt.tzinfo, pytz.utc) # Verify that conversion to and from db string does not change value dt2 = f.to_python(f.to_db_string(dt, quote=False), pytz.utc) self.assertEquals(dt, dt2) # Invalid values for value in ('nope', '21/7/1999', 0.5, '2017-01 15:06:00', '2017-01-01X15:06:00', '2017-13-01T15:06:00'): with self.assertRaises(ValueError): f.to_python(value, pytz.utc)
def test_date_field(self): f = DateField() epoch = date(1970, 1, 1) # Valid values for value in (datetime(1970, 1, 1), epoch, '1970-01-01', '0000-00-00', 0): d = f.to_python(value, pytz.utc) self.assertEquals(d, epoch) # Verify that conversion to and from db string does not change value d2 = f.to_python(f.to_db_string(d, quote=False), pytz.utc) self.assertEquals(d, d2) # Invalid values for value in ('nope', '21/7/1999', 0.5): with self.assertRaises(ValueError): f.to_python(value, pytz.utc) # Range check for value in (date(1900, 1, 1), date(2900, 1, 1)): with self.assertRaises(ValueError): f.validate(value)
def from_tsv(cls, line, field_names=None, timezone_in_use=pytz.utc, database=None): ''' Create a model instance from a tab-separated line. The line may or may not include a newline. The `field_names` list must match the fields defined in the model, but does not have to include all of them. If omitted, it is assumed to be the names of all fields in the model, in order of definition. - `line`: the TSV-formatted data. - `field_names`: names of the model fields in the data. - `timezone_in_use`: the timezone to use when parsing dates and datetimes. - `database`: if given, sets the database that this instance belongs to. ''' from six import next field_names = field_names or [name for name, field in cls._fields] values = iter(parse_tsv(line)) kwargs = {} for name in field_names: field = getattr(cls, name) kwargs[name] = field.to_python(next(values), timezone_in_use) obj = cls(**kwargs) if database is not None: obj.set_database(database) return obj
def filter(self, message): # First filter by weekday: we don't work on weekends date = TZ.normalize(message.date.replace(tzinfo=pytz.utc)) if not is_workday(date): return False # Then filter by time: we work in range of [9.am, 18.pm] if date.hour < START or date.hour >= END: return False # Then filter by message text text = message.text if self._bad_keyword_detect(text) or self._keyword_detect(text): return True return False
def workday_end_time(bot, update): bot.sendChatAction( chat_id=update.message.chat_id, action=ChatAction.TYPING ) text_templates = ( '{}?????????????', '?????{}', '??{}????', '??????{}', '???{}????', '????{}', '??????: {}', ) now = TZ.normalize(update.message.date.replace(tzinfo=pytz.utc)) end_time = TZ.localize(datetime(now.year, now.month, now.day, hour=18)) duration = end_time - now hour = duration.seconds // 3600 minute = (duration.seconds % 3600) // 60 time_remaining = ' {} ??'.format(hour) if hour else '' time_remaining += ' {} ??'.format(minute) text = random.choice(text_templates).format(time_remaining) update.message.reply_text(text, quote=True)
def test_serial(): assert s.serialize_value(None) == 'x' assert s.serialize_value(True) == 'true' assert s.serialize_value(False) == 'false' assert s.serialize_value(5) == 'i:5' assert s.serialize_value(5.0) == 'f:5.0' assert s.serialize_value(decimal.Decimal('5.5')) == 'n:5.5' assert s.serialize_value('abc') == 's:abc' assert s.serialize_value(b'abc') == 'b:YWJj' assert s.serialize_value(b'abc') == 'b:YWJj' assert s.serialize_value(datetime.date(2007, 12, 5)) == 'd:2007-12-05' assert s.serialize_value(datetime.datetime(2007, 12, 5, 12, 30, 30, tzinfo=utc)) \ == 'dt:2007-12-05 12:30:30+00:00' assert s.serialize_value(datetime.time(12, 34, 56)) == 't:12:34:56' with raises(NotImplementedError): s.serialize_value(csv.reader)
def test_unserial(): def twoway(x): assert s.unserialize_value(s.serialize_value(x)) == x twoway(None) twoway(True) twoway(False) twoway(5) twoway(5.0) twoway(decimal.Decimal('5.5')) twoway('abc') twoway(b'abc') twoway(b'abc') twoway(datetime.date(2007, 12, 5)) twoway(datetime.datetime(2007, 12, 5, 12, 30, 30, tzinfo=utc)) twoway(Z('abc')) with raises(ValueError): s.unserialize_value('zzzz:abc')
def utc_week_limits(utc_dt): """Returns US/Pacific start (12:00 am Sunday) and end (11:59 pm Saturday) of the week containing utc_dt, in UTC.""" local_now = utc_dt.replace(tzinfo=pytz.utc).astimezone(pytz.timezone('US/Pacific')) local_week_start = local_now - timedelta( days=local_now.weekday() + 1, hours=local_now.hour, minutes=local_now.minute, seconds=local_now.second, microseconds=local_now.microsecond, ) local_week_end = local_week_start + timedelta(days=7, minutes=-1) utc_week_start = local_week_start.astimezone(pytz.utc).replace(tzinfo=None) utc_week_end = local_week_end.astimezone(pytz.utc).replace(tzinfo=None) return (utc_week_start, utc_week_end)
def parse(cls, report, objects, data, localcontext): Company = Pool().get('company.company') timezone = None company_id = Transaction().context.get('company') if company_id: company = Company(company_id) if company.timezone: timezone = pytz.timezone(company.timezone) dt = datetime.now() localcontext['print_date'] = datetime.astimezone(dt.replace( tzinfo=pytz.utc), timezone) localcontext['print_time'] = localcontext['print_date'].time() return super(PatientEvaluationReport, cls).parse(report, objects, data, localcontext)
def get_date_ymd(dt): """Get a datetime from a string 'Year-month-day' Args: dt (str): a date Returns: datetime: a datetime object """ assert dt if isinstance(dt, datetime): return as_utc(dt) if dt == 'today': today = datetime.utcnow() return pytz.utc.localize(datetime(today.year, today.month, today.day)) elif dt == 'tomorrow': tomorrow = datetime.utcnow() + timedelta(1) return pytz.utc.localize(datetime(tomorrow.year, tomorrow.month, tomorrow.day)) elif dt == 'yesterday': yesterday = datetime.utcnow() - timedelta(1) return pytz.utc.localize(datetime(yesterday.year, yesterday.month, yesterday.day)) return as_utc(dateutil.parser.parse(dt))
def get_date_from_buildid(bid): """Get a date from buildid Args: bid (str): build_id Returns: date: date object """ # 20160407164938 == 2016 04 07 16 49 38 bid = str(bid) year = int(bid[:4]) month = int(bid[4:6]) day = int(bid[6:8]) hour = int(bid[8:10]) minute = int(bid[10:12]) second = int(bid[12:14]) return __pacific.localize(datetime(year, month, day, hour, minute, second)).astimezone(pytz.utc)
def test_create(self, request): resp = create_mock_json('tests/resources/task_router/activities_instance.json') resp.status_code = 201 request.return_value = resp activities = Activities(BASE_URI, AUTH) activity = activities.create("Test Activity", True) self.assertTrue(activity is not None) self.assertEqual(activity.date_created, datetime(2014, 5, 14, 10, 50, 2, tzinfo=pytz.utc)) exp_params = { 'FriendlyName': "Test Activity", 'Available': "true" } request.assert_called_with("POST", "{0}/Activities".format(BASE_URI), data=exp_params, auth=AUTH, use_json_extension=False)
def __init__(self, clip_id, project_id, rank, title, descr, clip_start_utc, url, category=None, timecodes=None): url_parts = url.split('_') start_ts = url_parts[-2] start_ts = datetime.strptime(start_ts, '%Y%m%d%H%M%S') start_ts = start_ts.replace(tzinfo=pytz.utc) duration = url_parts[-1].replace('.mp4', '') duration = timedelta(hours=int(duration[0:2]), minutes=int(duration[2:4]), seconds=int(duration[4:6])) super().__init__(clip_id, title, category, start_ts.isoformat(), (start_ts + duration).isoformat(), timecodes, url) self.project_id = project_id self.rank = rank self.descr = descr self.clip_start_utc = clip_start_utc self.duration = duration
def parse_time_range_from_url(adaptive_url): """ Parse the time information available in a video URL. :param adaptive_url: Video URL, which contains time info. :return: Tuple of start time, end time, and duration """ filename = adaptive_url.split('/')[-1] filename = filename.replace('.mp4', '') find_part = '_pc_' ts_part = filename[filename.find(find_part) + len(find_part):] start_ts, duration = ts_part.split('_') start_ts = datetime.strptime(start_ts, '%Y%m%d%H%M%S') start_ts = start_ts.replace(tzinfo=pytz.utc) duration = timedelta(hours=int(duration[:2]), minutes=int(duration[2:4]), seconds=int(duration[4:])) end_ts = start_ts + duration return start_ts, end_ts, duration
def _generate_cached_enrollments(user, program, num_course_runs=1, data=None): """ Helper method to generate CachedEnrollments for test cases """ fake = faker.Factory.create() course = CourseFactory.create(program=program) course_run_params = dict(before_now=True, after_now=False, tzinfo=pytz.utc) course_runs = [ CourseRunFactory.create( course=course, enrollment_start=fake.date_time_this_month(**course_run_params), start_date=fake.date_time_this_month(**course_run_params), enrollment_end=fake.date_time_this_month(**course_run_params), end_date=fake.date_time_this_year(**course_run_params), ) for _ in range(num_course_runs) ] factory_kwargs = dict(user=user) if data is not None: factory_kwargs['data'] = data return [CachedEnrollmentFactory.create(course_run=course_run, **factory_kwargs) for course_run in course_runs]
def __init__(self, data_file='', reader=septo3d_reader, wind_screen=2, temperature_screen=2, localisation={'city': 'Montpellier', 'latitude': 43.61, 'longitude': 3.87}, timezone='UTC'): self.data_path = data_file self.models = {'global_radiation': PPFD_to_global, 'vapor_pressure': humidity_to_vapor_pressure, 'PPFD': global_to_PPFD, 'degree_days': linear_degree_days} self.timezone = pytz.timezone(timezone) if data_file is '': self.data = None else: self.data = reader(data_file) date = self.data['date'] date = map(lambda x: self.timezone.localize(x), date) utc = map(lambda x: x.astimezone(pytz.utc), date) self.data.index = utc self.data.index.name = 'date_utc' self.wind_screen = wind_screen self.temperature_screen = temperature_screen self.localisation = localisation
def test_success_datetime_from_timestamp(self, datetime_tuple, timestamp): """Test successful calls to datetime_from_timestamp().""" if os.name == 'nt' and timestamp > TS_3001_LIMIT: # Skip this test case, due to the lower limit on Windows return # Create the expected datetime result (always timezone-aware in UTC) dt_unaware = datetime(*datetime_tuple) exp_dt = pytz.utc.localize(dt_unaware) # Execute the code to be tested dt = datetime_from_timestamp(timestamp) # Verify the result assert dt == exp_dt
def test_time_function(): dt = datetime(1970, 1, 1) timestamp = _datetime_to_utc_timestamp(dt) assert timestamp == 0.0 assert type(timestamp) is float assert time() != timestamp with immobilus(dt): assert time() == timestamp assert time() != timestamp with immobilus(dt.replace(tzinfo=pytz.utc)): assert time() == timestamp assert time() != timestamp
def test_create_notification(self, m): register_uris({'account': ['create_notification']}, m) subject = 'Subject' notif_dict = { 'subject': subject, 'message': 'Message', 'start_at': '2015-04-01T00:00:00Z', 'end_at': '2018-04-01T00:00:00Z' } notif = self.account.create_notification(notif_dict) self.assertIsInstance(notif, AccountNotification) self.assertTrue(hasattr(notif, 'subject')) self.assertEqual(notif.subject, subject) self.assertTrue(hasattr(notif, 'start_at_date')) self.assertIsInstance(notif.start_at_date, datetime.datetime) self.assertEqual(notif.start_at_date.tzinfo, pytz.utc)
def set_timezone(self, timezone = None): with self.tree_lock: if isinstance(timezone, datetime.tzinfo): self.timezone = timezone else: if timezone == None: timezone = self.data_value(["timezone"], str, default='utc') try: oldtz = self.timezone self.timezone = pytz.timezone(timezone) except: if isinstance(oldtz, datetime.tzinfo): self.warn('Invalid timezone "%s" suplied. Falling back to the old timezone "%s"' \ % (timezone, oldtz.tzname), dtdata_defWarning, 2) self.timezone = oldtz else: self.warn('Invalid timezone "%s" suplied. Falling back to UTC' % (timezone, ), dtdata_defWarning, 2) self.timezone = pytz.utc self.set_current_date() self.set_current_weekdays()
def set_current_date(self, cdate = None): with self.tree_lock: if isinstance(cdate, datetime.datetime): if cdate.tzinfo == None: self.current_date = self.timezone.localize(cdate).date() else: self.current_date = self.timezone.normalize(cdate.astimezone(self.timezone)).date() self.current_ordinal = self.current_date.toordinal() elif isinstance(cdate, datetime.date): self.current_date = cdate self.current_ordinal = self.current_date.toordinal() elif isinstance(cdate, int): self.current_ordinal = cdate datetime.datetime.fromordinal(cdate) else: if cdate != None: self.warn('Invalid or no current_date "%s" suplied. Falling back to NOW' % (cdate, ), dtdata_defWarning, 2) self.current_date = self.timezone.normalize(datetime.datetime.now(pytz.utc).astimezone(self.timezone)).date() self.current_ordinal = self.current_date.toordinal()
def __init__(self, data_def, data = None, warnaction = "default", warngoal = sys.stderr, caller_id = 0): self.tree_lock = RLock() with self.tree_lock: self.dtc = DataTreeConstants() self.ddconv = DataDef_Convert(warnaction = warnaction , warngoal = warngoal, caller_id = caller_id) self.caller_id = caller_id self.print_tags = False self.print_searchtree = False self.show_result = False self.fle = sys.stdout if sys.modules['DataTreeGrab']._warnings == None: sys.modules['DataTreeGrab']._warnings = _Warnings(warnaction, warngoal, caller_id) else: sys.modules['DataTreeGrab']._warnings.set_warnaction(warnaction, caller_id) self.searchtree = None self.timezone = pytz.utc self.errorcode = dte.dtDataInvalid self.result = [] self.data_def = None self.init_data_def(data_def) if data != None: self.init_data(data)
def format_datetime(dttm): # 1. Convert to timezone-aware # 2. Convert to UTC # 3. Format in ISO format # 4. Add subsecond value if non-zero # 5. Add "Z" if dttm.tzinfo is None or dttm.tzinfo.utcoffset(dttm) is None: # dttm is timezone-naive; assume UTC zoned = pytz.utc.localize(dttm) else: zoned = dttm.astimezone(pytz.utc) ts = zoned.strftime("%Y-%m-%dT%H:%M:%S") if zoned.microsecond > 0: ms = zoned.strftime("%f") ts = ts + '.' + ms.rstrip("0") return ts + "Z"
def test_partial(self): """ You can use Filter macros to create partials from other Filter types. """ MyDatetime = filter_macro(f.Datetime, timezone=12) self.assertEqual( MyDatetime().apply('2015-10-13 15:22:18'), # By default, MyDatetime assumes a timezone of UTC+12... datetime(2015, 10, 13, 3, 22, 18, tzinfo=utc), ) self.assertEqual( # ... however, we can override it. MyDatetime(timezone=6).apply('2015-10-13 15:22:18'), datetime(2015, 10, 13, 9, 22, 18, tzinfo=utc), )
def test_pass_naive_timestamp_default_timezone(self): """ The incoming value is a naive timestamp, but the Filter is configured not to treat naive timestamps as UTC. """ self.assertFilterPasses( # The incoming value is a naive timestamp, and the Filter # is configured to use UTC+8 by default. self._filter( '2015-05-12 09:20:03', timezone = tzoffset('UTC+8', 8*3600), ), # The resulting datetime is still converted to UTC. datetime(2015, 5, 12, 1, 20, 3, tzinfo=utc), )
def _ToTimeZone(t, tzinfo): """Converts 't' to the time zone 'tzinfo'. Arguments: t: a datetime object. It may be in any pytz time zone, or it may be timezone-naive (interpreted as UTC). tzinfo: a pytz timezone object, or None. Returns: a datetime object in the time zone 'tzinfo' """ if pytz is None: return t.replace(tzinfo=None) elif tzinfo: if not t.tzinfo: t = pytz.utc.localize(t) return tzinfo.normalize(t.astimezone(tzinfo)) elif t.tzinfo: return pytz.utc.normalize(t.astimezone(pytz.utc)).replace(tzinfo=None) else: return t
def convert_to_local_from_utc(utc_time): local_timezone = tzlocal.get_localzone() local_time = utc_time.replace(tzinfo=pytz.utc).astimezone(local_timezone) return local_time
def convert_to_tz_from_utc(utc_time, tz): local_time = utc_time.replace(tzinfo=pytz.utc).astimezone(pytz.timezone(tz)) return local_time
def test_datetime_from_fields_correct(self): el = { "next_activity_date": "2017-03-20", "next_activity_time": "14:01:02" } result = Organization.datetime_from_fields(el, 'next_activity_date', 'next_activity_time') expected = datetime.datetime(2017, 3, 20, 14, 1, 2, 0, tzinfo=pytz.utc) self.assertEquals(result, expected)
def test_datetime_from_simple_time_correct(self): el = {u'update_time': u'2017-04-03 16:21:12'} result = Organization.datetime_from_simple_time(el, u'update_time') expected = datetime.datetime(2017, 4, 3, 16, 21, 12, 0, tzinfo=pytz.utc) self.assertEquals(result, expected)