我们从 Python 开源项目中,提取了以下 49 个代码示例,用于说明如何使用 dateutil.tz.tzutc()。
def test_generate_signature(defconfig):
    """The Authorization header must embed the hash type, access key and signature."""
    params = dict(
        method='GET',
        version=defconfig.version,
        endpoint=defconfig.endpoint,
        date=datetime.now(tzutc()),
        request_path='/path/to/api/',
        content=b'"test data"',
        content_type='application/json',
        access_key=defconfig.access_key,
        secret_key=defconfig.secret_key,
        hash_type='md5',
    )
    headers, signature = generate_signature(**params)
    auth_header = headers['Authorization']
    assert params['hash_type'].upper() in auth_header
    assert params['access_key'] in auth_header
    assert signature in auth_header
def testTzAll(self):
    """All public dateutil.tz names import and are non-None (tzwin* only on Windows)."""
    from dateutil.tz import (gettz, tzfile, tzical, tzlocal, tzoffset,
                             tzrange, tzstr, tzutc, tzwin, tzwinlocal)
    names = ["tzutc", "tzoffset", "tzlocal", "tzfile", "tzrange",
             "tzstr", "tzical", "gettz"]
    if sys.platform.startswith("win"):
        names += ["tzwin", "tzwinlocal"]
    scope = locals()
    for name in names:
        self.assertIsNot(scope[name], None)
def testFoldPositiveUTCOffset(self):
    """Ambiguous local times around a DST fold resolve correctly (positive UTC offset)."""
    name = self._get_tzname('Australia/Sydney')
    with self._gettz_context(name):
        # Fresh tzfile objects for each conversion.
        syd_a = self.gettz(name)
        syd_b = self.gettz(name)
        # One UTC hour either side of the 2012-04-01 AEDT -> AEST fold.
        utc_first = datetime(2012, 3, 31, 15, 30, tzinfo=tz.tzutc())   # maps to AEDT
        utc_second = datetime(2012, 3, 31, 16, 30, tzinfo=tz.tzutc())  # maps to AEST
        local_first = utc_first.astimezone(syd_a)
        local_second = utc_second.astimezone(syd_b)
        ambiguous_wall_time = datetime(2012, 4, 1, 2, 30)
        self.assertEqual(local_first.replace(tzinfo=None), ambiguous_wall_time)
        self.assertEqual(local_second.replace(tzinfo=None), ambiguous_wall_time)
        self.assertEqual(local_first.utcoffset(), timedelta(hours=11))
        self.assertEqual(local_second.utcoffset(), timedelta(hours=10))
def testGapPositiveUTCOffset(self):
    """Times straddling the spring-forward gap convert correctly (positive UTC offset)."""
    name = self._get_tzname('Australia/Sydney')
    with self._gettz_context(name):
        # Fresh tzfile objects for each conversion.
        syd_a = self.gettz(name)
        syd_b = self.gettz(name)
        utc_before = datetime(2012, 10, 6, 15, 30, tzinfo=tz.tzutc())  # AEST side
        utc_after = datetime(2012, 10, 6, 16, 30, tzinfo=tz.tzutc())   # AEDT side
        local_before = utc_before.astimezone(syd_a)
        local_after = utc_after.astimezone(syd_b)
        self.assertEqual(local_before.replace(tzinfo=None),
                         datetime(2012, 10, 7, 1, 30))
        self.assertEqual(local_after.replace(tzinfo=None),
                         datetime(2012, 10, 7, 3, 30))
        self.assertEqual(local_before.utcoffset(), timedelta(hours=10))
        self.assertEqual(local_after.utcoffset(), timedelta(hours=11))
def testFoldNegativeUTCOffset(self):
    """Ambiguous local times around a DST fold resolve correctly (negative UTC offset)."""
    name = self._get_tzname('America/Toronto')
    with self._gettz_context(name):
        # fromutc() mutates the tzfile object, hence two fresh instances.
        tor_a = self.gettz(name)
        tor_b = self.gettz(name)
        utc_first = datetime(2011, 11, 6, 5, 30, tzinfo=tz.tzutc())
        utc_second = datetime(2011, 11, 6, 6, 30, tzinfo=tz.tzutc())
        local_first = utc_first.astimezone(tor_a)
        local_second = utc_second.astimezone(tor_b)
        ambiguous_wall_time = datetime(2011, 11, 6, 1, 30)
        self.assertEqual(local_first.replace(tzinfo=None), ambiguous_wall_time)
        self.assertEqual(local_second.replace(tzinfo=None), ambiguous_wall_time)
        self.assertEqual(local_first.utcoffset(), timedelta(hours=-4.0))
        self.assertEqual(local_second.utcoffset(), timedelta(hours=-5.0))
def testGapNegativeUTCOffset(self):
    """Times straddling the spring-forward gap convert correctly (negative UTC offset)."""
    name = self._get_tzname('America/Toronto')
    with self._gettz_context(name):
        # fromutc() mutates the tzfile object, hence two fresh instances.
        tor_a = self.gettz(name)
        tor_b = self.gettz(name)
        utc_before = datetime(2011, 3, 13, 6, 30, tzinfo=tz.tzutc())
        utc_after = datetime(2011, 3, 13, 7, 30, tzinfo=tz.tzutc())
        local_before = utc_before.astimezone(tor_a)
        local_after = utc_after.astimezone(tor_b)
        self.assertEqual(local_before.replace(tzinfo=None),
                         datetime(2011, 3, 13, 1, 30))
        self.assertEqual(local_after.replace(tzinfo=None),
                         datetime(2011, 3, 13, 3, 30))
        self.assertNotEqual(local_before, local_after)
        self.assertEqual(local_before.utcoffset(), timedelta(hours=-5.0))
        self.assertEqual(local_after.utcoffset(), timedelta(hours=-4.0))
def testEqualAmbiguousComparison(self):
    """Two resolutions of the same ambiguous UTC instant compare equal.

    Fix: removed the unused local ``t1_u`` (a second UTC datetime that the
    original built and never referenced).
    """
    tzname = self._get_tzname('Australia/Sydney')
    with self._gettz_context(tzname):
        SYD0 = self.gettz(tzname)
        SYD1 = self.gettz(tzname)
        t0_u = datetime(2012, 3, 31, 14, 30, tzinfo=tz.tzutc())  # AEST
        t0_syd0 = t0_u.astimezone(SYD0)
        t0_syd1 = t0_u.astimezone(SYD1)
        # This is considered an "inter-zone comparison" because it's an
        # ambiguous datetime.
        self.assertEqual(t0_syd0, t0_syd1)
def get_utc_transitions(self, tzi, year, gap):
    """Return (naive probe time, first UTC instant, second UTC instant)
    around a DST transition of ``tzi`` in ``year``.

    ``gap=True`` targets the spring-forward gap; ``gap=False`` targets the
    ambiguous (fold) period after DST ends.
    """
    dston, dstoff = tzi.transitions(year)
    transition = dston if gap else dstoff
    probe = transition - timedelta(minutes=30)
    base_u = probe.replace(tzinfo=tzi).astimezone(tz.tzutc())
    if gap:
        t_n = probe
        t0_u = base_u
    else:
        t_n = probe + timedelta(hours=1)           # naive ambiguous date
        t0_u = base_u + timedelta(hours=1)         # first ambiguous date
    t1_u = t0_u + timedelta(hours=1)               # second instant, 1h later
    return t_n, t0_u, t1_u
def datetime2timestamp(dt, default_timezone=None):
    """Calculate the timestamp based on the given datetime instance.

    :type dt: datetime
    :param dt: A datetime object to be converted into timestamp
    :type default_timezone: tzinfo
    :param default_timezone: If it is provided as None, we treat it as tzutc().
                             But it is only used when dt is a naive datetime.
    :returns: The timestamp
    """
    epoch = datetime.datetime(1970, 1, 1)
    if dt.tzinfo is None:
        # Naive datetimes are interpreted in the default timezone (UTC
        # unless the caller says otherwise).
        if default_timezone is None:
            default_timezone = tzutc()
        dt = dt.replace(tzinfo=default_timezone)
    # Normalize to UTC by subtracting the offset, then diff against epoch.
    delta = dt.replace(tzinfo=None) - dt.utcoffset() - epoch
    if hasattr(delta, "total_seconds"):
        return delta.total_seconds()  # Works in Python 2.7+
    # Pre-2.7 fallback: compute total seconds by hand.
    return (delta.microseconds +
            (delta.seconds + delta.days * 24 * 3600) * 10**6) / 10**6
def make_analyzed_tickets(AnalyzedAgileTicket, datetime, tzutc):
    """Fixture factory: build AnalyzedAgileTicket objects from dicts of
    date strings keyed by 'key', 'committed', 'started' and 'ended'."""
    from dateutil.parser import parse
    fallback = datetime(1979, 8, 15, 0, 0, 0, tzinfo=tzutc)

    def _make_analyzed_tickets(ticket_datas):
        return [
            AnalyzedAgileTicket(
                key=data['key'],
                committed={'state': "Committed",
                           'entered_at': parse(data['committed'], default=fallback)},
                started={'state': "Started",
                         'entered_at': parse(data['started'], default=fallback)},
                ended={'state': "Ended",
                       'entered_at': parse(data['ended'], default=fallback)},
            )
            for data in ticket_datas
        ]

    return _make_analyzed_tickets
def weeks_of_tickets(datetime, tzutc, AnalyzedAgileTicket):
    """A bunch of tickets loaded from data/weeks_of_tickets.csv.

    Fix: the CSV file handle was opened inline and never closed (resource
    leak); it is now managed with a ``with`` block.
    """
    from dateutil.parser import parse
    parsed = []
    default = datetime(1979, 8, 15, 0, 0, 0, tzinfo=tzutc)
    current_path = path.dirname(path.abspath(__file__))
    csv_file = path.join(current_path, 'data', 'weeks_of_tickets.csv')
    count = 1
    with open(csv_file, 'r') as handle:  # close the handle deterministically
        for row in csv.DictReader(handle):
            t = AnalyzedAgileTicket(
                key="FOO-{}".format(count),
                committed=dict(state="committed",
                               entered_at=parse(row['committed'], default=default)),
                started=dict(state="started",
                             entered_at=parse(row['started'], default=default)),
                ended=dict(state="ended",
                           entered_at=parse(row['ended'], default=default))
            )
            parsed.append(t)
            count += 1
    return parsed
def __init__(self, product_id: str, sequence_id: int, order_side: OrderSide,
             size: str, price: str, status: OrderStatus = OrderStatus.open,
             order_id: str = None, order_type: OrderType = OrderType.limit,
             created_at: Optional[datetime] = None, historical: bool = False,
             confirmed: bool = False):
    """Initialize an order record.

    size and price are kept as strings (price is coerced via str());
    sequence_id is coerced to int. filled_size starts at '0'.
    Raises OrderException when float(size) is negative.
    """
    self.product_id = product_id
    self.order_side = order_side
    self.order_type = order_type
    self.status = status
    self.sequence_id = int(sequence_id)
    self.size = size
    self.filled_size = '0'  # nothing filled yet
    self.price = str(price)
    self.order_id = order_id
    if created_at is None:
        # Default creation time is the current UTC instant.
        self.created_at = datetime.now(tz.tzutc())
    else:
        # orders cannot be created in the future please
        self.created_at = min(datetime.now(tz.tzutc()), created_at)
    self.historical = historical
    self.confirmed = confirmed
    # NOTE(review): a size of exactly 0 passes this check although the
    # message says "must be positive" — confirm whether zero is intended.
    if float(self.size) < 0:
        raise OrderException('Order size must be positive {}'.format(self.size))
def _build_datetime(parts): timestamp = parts.get('timestamp') if timestamp: tz_utc = tz.tzutc() return datetime.fromtimestamp(timestamp, tz=tz_utc) am_pm = parts.get('am_pm') hour = parts.get('hour', 0) if am_pm == 'pm' and hour < 12: hour += 12 elif am_pm == 'am' and hour == 12: hour = 0 return datetime(year=parts.get('year', 1), month=parts.get('month', 1), day=parts.get('day', 1), hour=hour, minute=parts.get('minute', 0), second=parts.get('second', 0), microsecond=parts.get('microsecond', 0), tzinfo=parts.get('tzinfo'))
def __init__(self, dt):
    """Wrap a date/time value, recording its iCalendar VALUE type and,
    for zoned datetimes/times, a TZID parameter.

    Raises ValueError for anything that is not a datetime, date,
    timedelta or time.
    """
    if not isinstance(dt, (datetime, date, timedelta, time)):
        raise ValueError('You must use datetime, date, timedelta or time')
    # Order matters: datetime is a subclass of date, so test it first.
    if isinstance(dt, datetime):
        self.params = Parameters({'value': 'DATE-TIME'})
    elif isinstance(dt, date):
        self.params = Parameters({'value': 'DATE'})
    elif isinstance(dt, time):
        self.params = Parameters({'value': 'TIME'})
    # NOTE(review): a timedelta falls through all branches above, leaving
    # self.params unset — confirm downstream code tolerates that.
    if (isinstance(dt, datetime) or isinstance(dt, time))\
            and getattr(dt, 'tzinfo', False):
        tzinfo = dt.tzinfo
        # UTC needs no TZID; dateutil's tzutc is likewise treated as UTC.
        if tzinfo is not pytz.utc and\
                (tzutc is None or not isinstance(tzinfo, tzutc)):
            # set the timezone as a parameter to the property
            tzid = tzid_from_dt(dt)
            if tzid:
                self.params.update({'TZID': tzid})
    self.dt = dt
def my_local_time():
    """Return the current time converted from UTC to the local timezone."""
    # Zones auto-detected via dateutil; gettz('UTC') / gettz('America/New_York')
    # would work for hardcoded zones instead.
    utc_zone = tz.tzutc()
    local_zone = tz.tzlocal()
    # datetime.utcnow() is naive, so attach the UTC zone explicitly before
    # converting.
    now_utc = datetime.utcnow().replace(tzinfo=utc_zone)
    return now_utc.astimezone(local_zone)
def test_index_convert_to_datetime_array_dateutil(self):
    """to_pydatetime() must round-trip values and tzinfo for naive,
    dateutil-zoned, and dateutil-UTC DatetimeIndexes."""
    tm._skip_if_no_dateutil()
    import dateutil

    def _check_rng(rng):
        # The conversion must yield an ndarray of stdlib datetimes equal
        # to the original stamps, tzinfo included.
        converted = rng.to_pydatetime()
        tm.assertIsInstance(converted, np.ndarray)
        for x, stamp in zip(converted, rng):
            tm.assertIsInstance(x, datetime)
            self.assertEqual(x, stamp.to_pydatetime())
            self.assertEqual(x.tzinfo, stamp.tzinfo)

    rng = date_range('20090415', '20090519')
    rng_eastern = date_range('20090415', '20090519',
                             tz='dateutil/US/Eastern')
    rng_utc = date_range('20090415', '20090519', tz=dateutil.tz.tzutc())
    _check_rng(rng)
    _check_rng(rng_eastern)
    _check_rng(rng_utc)
def test_period_resample_with_local_timezone_dateutil(self):
    """Daily period resampling in a local timezone shifts the period index
    back one day relative to UTC (GH5430)."""
    tm._skip_if_no_dateutil()
    import dateutil

    local_timezone = 'dateutil/America/Los_Angeles'
    start = datetime(year=2013, month=11, day=1, hour=0, minute=0,
                     tzinfo=dateutil.tz.tzutc())
    # 1 day later
    end = datetime(year=2013, month=11, day=2, hour=0, minute=0,
                   tzinfo=dateutil.tz.tzutc())
    index = pd.date_range(start, end, freq='H')
    series = pd.Series(1, index=index)
    series = series.tz_convert(local_timezone)
    result = series.resample('D', kind='period').mean()
    # Create the expected series.
    # Index is moved back a day with the timezone conversion from UTC to
    # Pacific.
    expected_index = (pd.period_range(start=start, end=end, freq='D') - 1)
    expected = pd.Series(1, index=expected_index)
    assert_series_equal(result, expected)
def test_class_ops_dateutil(self):
    """Timestamp alternate constructors agree with the matching datetime
    constructors to within one second (dateutil tz flavour)."""
    tm._skip_if_no_dateutil()
    from dateutil.tz import tzutc

    def compare(x, y):
        # Compare at whole-second resolution to tolerate call-time skew
        # between the two "now" evaluations.
        self.assertEqual(int(np.round(Timestamp(x).value / 1e9)),
                         int(np.round(Timestamp(y).value / 1e9)))

    compare(Timestamp.now(), datetime.now())
    compare(Timestamp.now('UTC'), datetime.now(tzutc()))
    compare(Timestamp.utcnow(), datetime.utcnow())
    compare(Timestamp.today(), datetime.today())
    current_time = calendar.timegm(datetime.now().utctimetuple())
    compare(Timestamp.utcfromtimestamp(current_time),
            datetime.utcfromtimestamp(current_time))
    compare(Timestamp.fromtimestamp(current_time),
            datetime.fromtimestamp(current_time))
    date_component = datetime.utcnow()
    time_component = (date_component + timedelta(minutes=10)).time()
    compare(Timestamp.combine(date_component, time_component),
            datetime.combine(date_component, time_component))
def test_cant_compare_tz_naive_w_aware_dateutil(self):
    """Naive vs aware Timestamp comparisons raise, in both directions (#1404)."""
    tm._skip_if_no_dateutil()
    from dateutil.tz import tzutc
    utc = tzutc()
    naive = Timestamp('3/12/2012')
    aware = Timestamp('3/12/2012', tz=utc)
    # Every rich comparison must raise, whichever side is naive.
    for op in ('__eq__', '__ne__', '__lt__', '__gt__'):
        self.assertRaises(Exception, getattr(naive, op), aware)
    for op in ('__eq__', '__ne__', '__lt__', '__gt__'):
        self.assertRaises(Exception, getattr(aware, op), naive)
    if sys.version_info < (3, 3):
        self.assertRaises(Exception, naive.__eq__, aware.to_pydatetime())
        self.assertRaises(Exception, naive.to_pydatetime().__eq__, aware)
    else:
        # PEP 393-era semantics: mixed comparisons are simply unequal.
        self.assertFalse(naive == aware.to_pydatetime())
        self.assertFalse(naive.to_pydatetime() == aware)
def process(self, asgs):
    """Tag each ASG with an op message carrying a future action date."""
    msg_tmpl = self.data.get('message', self.default_template)
    key = self.data.get('key', self.data.get('tag', DEFAULT_TAG))
    op = self.data.get('op', 'suspend')
    days = self.data.get('days', 4)
    action_date = datetime.now(tz=tzutc()) + timedelta(days=days)
    date_str = action_date.strftime('%Y/%m/%d')
    try:
        msg = msg_tmpl.format(op=op, action_date=date_str)
    except Exception:
        # Fall back to the default template on a malformed user template.
        self.log.warning("invalid template %s" % msg_tmpl)
        msg = self.default_template.format(op=op, action_date=date_str)
    self.log.info("Tagging %d asgs for %s on %s" % (len(asgs), op, date_str))
    self.tag(asgs, key, msg)
def fetch_credential_report(self):
    """Fetch the IAM credential report, (re)generating it when it is
    absent or older than the configured maximum age.

    Raises ValueError when no report exists and report_generate is off.
    """
    client = local_session(self.manager.session_factory).client('iam')
    try:
        report = client.get_credential_report()
    except ClientError as e:
        if e.response['Error']['Code'] != 'ReportNotPresent':
            raise
        report = None
    if report:
        # Discard reports older than report_max_age seconds.
        threshold = datetime.datetime.now(tz=tzutc()) - timedelta(
            seconds=self.get_value_or_schema_default(
                'report_max_age'))
        if not report['GeneratedTime'].tzinfo:
            # Compare naive-to-naive when the API returned a naive time.
            threshold = threshold.replace(tzinfo=None)
        if report['GeneratedTime'] < threshold:
            report = None
    if report is None:
        if not self.get_value_or_schema_default('report_generate'):
            raise ValueError("Credential Report Not Present")
        client.generate_credential_report()
        # Generation is asynchronous; wait the configured delay, then refetch.
        time.sleep(self.get_value_or_schema_default('report_delay'))
        report = client.get_credential_report()
    return report['Content']
def test_age(self):
    """value_type 'age' with less-than matches resources newer than the bound."""
    now = datetime.now(tz=tz.tzutc())
    fdata = {
        'type': 'value',
        'key': 'LaunchTime',
        'op': 'less-than',
        'value_type': 'age',
        'value': 32}
    cases = [
        (now - timedelta(90), False),
        (now - timedelta(60), False),
        (now - timedelta(30), True),
        (now, True),
        (now.isoformat(), True),  # string timestamps are accepted too
    ]
    for launch_time, expected in cases:
        self.assertFilter(fdata, instance(LaunchTime=launch_time), expected)
def test_expiration(self):
    """value_type 'expiration' with less-than matches dates inside the window."""
    now = datetime.now(tz=tz.tzutc())
    fdata = {
        'type': 'value',
        'key': 'LaunchTime',
        'op': 'less-than',
        'value_type': 'expiration',
        'value': 61}
    for launch_time, expected in [
            (now + timedelta(90), False),
            (now + timedelta(60), True),
            (now, True),
            (now.isoformat(), True)]:  # string timestamps are accepted too
        self.assertFilter(fdata, instance(LaunchTime=launch_time), expected)
def test_filter_instance_age(self):
    """instance-uptime gte 60 days matches only instances at least 60 days old."""
    now = datetime.now(tz=tz.tzutc())
    fdata = {'type': 'instance-uptime', 'op': 'gte', 'days': 60}
    expectations = [
        (now, False),
        (now - timedelta(90), True),
        (now - timedelta(60), True),
        (now - timedelta(30), False),
    ]
    for launch_time, expected in expectations:
        self.assertFilter(fdata, instance(LaunchTime=launch_time), expected)
def filter_extant_exports(client, bucket, prefix, days, start, end=None):
    """Filter days where the bucket already has extant export keys.

    Reads the 'LastExport' tag on the prefix object and keeps only the days
    strictly after it; with no tag, all days are returned sorted.
    """
    end = end or datetime.now()
    try:
        response = client.get_object_tagging(Bucket=bucket, Key=prefix)
        tag_set = response.get('TagSet', [])
    except ClientError as e:
        # A missing marker object simply means nothing was exported yet.
        if e.response['Error']['Code'] != 'NoSuchKey':
            raise
        tag_set = []
    tags = {t['Key']: t['Value'] for t in tag_set}
    ordered = sorted(days)
    if 'LastExport' not in tags:
        return ordered
    last_export = parse(tags['LastExport'])
    if last_export.tzinfo is None:
        # Treat a naive marker as UTC so the comparison below is aware-aware.
        last_export = last_export.replace(tzinfo=tzutc())
    return [day for day in ordered if day > last_export]
def iso8601_to_rostime(iso): """Converts ISO 8601 time to ROS Time. Args: iso: ISO 8601 encoded string. Returns: std_msgs/Time. """ # Convert to datetime in UTC. t = dateutil.parser.parse(iso) if not t.utcoffset(): t = t.replace(tzinfo=tzutc()) # Convert to time from epoch in UTC. epoch = datetime.utcfromtimestamp(0) epoch = epoch.replace(tzinfo=tzutc()) dt = t - epoch # Create ROS message. time = Time() time.data.secs = int(dt.total_seconds()) time.data.nsecs = dt.microseconds * 1000 return time
def _deserialize(cls, data): kind = data.get(cls._type_field, None) if kind is None: return data value = data.get("value") if kind == "blob": return base64.b64decode(value.encode("utf-8")) elif kind == "datetime": return datetime.fromtimestamp(value, tz.tzutc()) elif kind == "model": return cls._entity_from_dict(value) else: raise ValueError(f"Invalid kind {kind!r}.")
def validateLogDate(self, lines):
    """Return False when the first '# Time:' header found is more than two
    hours older than self._now (in the configured timezone); True otherwise.

    Only lines up to the first non-empty, non-header line are examined.
    """
    delta = timedelta(hours=2)
    for line in lines:
        if not line:
            continue
        elif line.startswith("# Time: "):
            # Slow-log timestamps are naive and parsed as '%y%m%d %H:%M:%S';
            # they are treated as UTC, converted to the configured timezone,
            # then made naive again so subtraction against self._now works.
            log_time = datetime.strptime(line[8:], "%y%m%d %H:%M:%S")
            log_time = log_time.replace(tzinfo=tz.tzutc()).astimezone(
                zoneinfo.gettz(self._GENERAL_CONFIG["TIMEZONE"]))
            log_time = log_time.replace(tzinfo=None)
            # NOTE(review): debug prints left in — consider removing or
            # routing through a logger.
            print(self._now, log_time)
            print("diff :", self._now - log_time)
            if (self._now - log_time) > delta:
                return False
            else:
                return True
        return True
    return True

# Initialization.
def validateLogDate(self, lines):
    """Return False when the first '# Time:' header found is more than two
    hours older than self._now (in the configured timezone); True otherwise.

    Only lines up to the first non-empty, non-header line are examined.
    """
    delta = timedelta(hours=2)
    for line in lines:
        if not line:
            continue
        elif line.startswith("# Time: "):
            # Naive '%y%m%d %H:%M:%S' stamps are treated as UTC, converted to
            # the configured timezone, then made naive for the subtraction.
            log_time = datetime.strptime(line[8:], "%y%m%d %H:%M:%S")
            log_time = log_time.replace(tzinfo=tz.tzutc()).astimezone(zoneinfo.gettz(self._GENERAL_CONFIG["TIMEZONE"]))
            log_time = log_time.replace(tzinfo=None)
            # NOTE(review): debug prints left in — consider removing or
            # routing through a logger.
            print(self._now, log_time)
            print("diff :", self._now - log_time)
            if (self._now - log_time) > delta:
                return False
            else:
                return True
        return True
    return True
def validate_log_date(self, line):
    """Return False when the line's embedded timestamp is more than two
    hours older than self._now; True otherwise.

    Fix: a line containing BEGIN_DEADLOCK whose timestamp did not match
    REG_DEADLOCK previously crashed with AttributeError on ``m.group(1)``;
    such lines are now treated as valid (True).
    """
    delta = timedelta(hours=2)

    def _too_old(stamp):
        # Naive '%Y-%m-%d %H:%M:%S' stamps are treated as UTC, converted
        # to the configured timezone, then compared naive-to-naive.
        log_time = datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S")
        log_time = log_time.replace(tzinfo=tz.tzutc()).astimezone(
            zoneinfo.gettz(self._GENERAL_CONFIG["TIMEZONE"]))
        return (self._now - log_time.replace(tzinfo=None)) > delta

    m = REG_GENERAL_ERR.match(line)
    if m:
        if _too_old(m.group(1)):
            return False
    elif BEGIN_DEADLOCK in line:
        m = REG_DEADLOCK.match(line)
        # Guard against a marker line whose timestamp doesn't match.
        if m and _too_old(m.group(1)):
            return False
    return True
def test_recurrence(self):
    # Ensure date valued UNTILs in rrules land in a reasonable timezone
    # and include the final day (12/28 in this test).
    cal = base.readOne(get_test_file("recurrence.ics"), findBegin=False)
    dates = list(cal.vevent.getrruleset())
    expected = {
        0: datetime.datetime(2006, 1, 26, 23, 0, tzinfo=tzutc()),
        1: datetime.datetime(2006, 2, 23, 23, 0, tzinfo=tzutc()),
        -1: datetime.datetime(2006, 12, 28, 23, 0, tzinfo=tzutc()),
    }
    for index, when in expected.items():
        self.assertEqual(dates[index], when)
def test_import_anno_ok_2(wa_image):
    """Import preserves the original created date; create_anno stamps now.

    Fix: removed the unused local ``resp`` that captured the import result
    without ever being read.
    """
    catcha = wa_image
    now = datetime.now(tz.tzutc())
    # Import first because CRUD.create changes created time in the input.
    catcha['id'] = 'naomi-xx-imported'
    CRUD.import_annos([catcha])
    x2 = Anno._default_manager.get(pk=catcha['id'])
    assert x2 is not None
    assert Anno._default_manager.count() == 1
    # x2 was created more in the past? import preserves created date?
    delta = timedelta(hours=25)
    assert x2.created < (now - delta)
    # About to create.
    catcha['id'] = 'naomi-xx'
    x1 = CRUD.create_anno(catcha)
    assert x1 is not None
    assert Anno._default_manager.count() == 2
    # x1 was created less than 1m ago?
    delta = timedelta(minutes=1)
    assert (now - delta) < x1.created
def test_import_anno_ok(wa_image):
    """Import preserves replies and the original created date.

    Fix: removed the unused local ``resp`` that captured the import result
    without ever being read.
    """
    catcha = wa_image
    catcha_reply = make_wa_object(
        age_in_hours=1, reply_to=catcha['id'])
    now = datetime.now(tz.tzutc())
    CRUD.import_annos([catcha, catcha_reply])
    x2 = Anno._default_manager.get(pk=catcha['id'])
    assert x2 is not None
    # Preserve replies?
    assert x2.total_replies == 1
    assert x2.replies[0].anno_id == catcha_reply['id']
    # Import preserves created date (well over a day in the past)?
    delta = timedelta(hours=25)
    assert x2.created < (now - delta)
def test_deleted_entry(self):
    """DeletedEntry exposes its id, a parsed deleted_at, and a readable repr."""
    sys_data = {
        "space": {
            "sys": {
                "type": "Link",
                "linkType": "Space",
                "id": "cfexampleapi"
            }
        },
        "id": "5c6VY0gWg0gwaIeYkUUiqG",
        "type": "DeletedEntry",
        "createdAt": "2013-09-09T16:17:12.600Z",
        "updatedAt": "2013-09-09T16:17:12.600Z",
        "deletedAt": "2013-09-09T16:17:12.600Z",
        "revision": 1
    }
    deleted_entry = DeletedEntry({"sys": sys_data})
    self.assertEqual(deleted_entry.id, '5c6VY0gWg0gwaIeYkUUiqG')
    self.assertEqual(
        deleted_entry.deleted_at,
        datetime.datetime(2013, 9, 9, 16, 17, 12, 600000, tzinfo=tzutc()))
    self.assertEqual(str(deleted_entry),
                     "<DeletedEntry id='5c6VY0gWg0gwaIeYkUUiqG'>")
def test_deleted_asset(self):
    """DeletedAsset exposes its id, a parsed deleted_at, and a readable repr."""
    sys_data = {
        "space": {
            "sys": {
                "type": "Link",
                "linkType": "Space",
                "id": "cfexampleapi"
            }
        },
        "id": "5c6VY0gWg0gwaIeYkUUiqG",
        "type": "DeletedAsset",
        "createdAt": "2013-09-09T16:17:12.600Z",
        "updatedAt": "2013-09-09T16:17:12.600Z",
        "deletedAt": "2013-09-09T16:17:12.600Z",
        "revision": 1
    }
    deleted_asset = DeletedAsset({"sys": sys_data})
    self.assertEqual(deleted_asset.id, '5c6VY0gWg0gwaIeYkUUiqG')
    self.assertEqual(
        deleted_asset.deleted_at,
        datetime.datetime(2013, 9, 9, 16, 17, 12, 600000, tzinfo=tzutc()))
    self.assertEqual(str(deleted_asset),
                     "<DeletedAsset id='5c6VY0gWg0gwaIeYkUUiqG'>")
def format_event(device, event):
    """Build a human-readable '<device> <event kind> <local time>' string."""
    # Convert the stored UTC timestamp to the local zone for display.
    local_time = event['created_at'].replace(
        tzinfo=tz.tzutc()).astimezone(tz.tzlocal())
    local_time_string = (local_time.strftime('%I:%M %p ')
                         + ADDON.getLocalizedString(30300)
                         + local_time.strftime(' %A %b %d %Y'))
    # Map event kinds to their localized label ids.
    kind_labels = {'on_demand': 30301, 'motion': 30302, 'ding': 30303}
    event_name = ''
    if event['kind'] in kind_labels:
        event_name = ADDON.getLocalizedString(kind_labels[event['kind']])
    return ' '.join([device.name, event_name, local_time_string])
def _wait_for_completion(self):
    """
    Waits for a stack operation to finish, logging CloudFormation events
    while polling.

    :returns: The final stack status.
    :rtype: sceptre.stack_status.StackStatus
    """
    # Back-date slightly so events fired just before polling starts get logged.
    self.most_recent_event_datetime = (
        datetime.now(tzutc()) - timedelta(seconds=3)
    )
    status = StackStatus.IN_PROGRESS
    while status == StackStatus.IN_PROGRESS:
        status = self._get_simplified_status(self.get_status())
        self._log_new_events()
        time.sleep(4)
    return status
def combine_histories_and_messages(histories, smsMessages):
    """Merge device history entries and SMS commands into one list sorted
    by timestamp, tagging each entry with a CSS style."""
    utc_zone = tz.tzutc()
    local_zone = tz.tzlocal()
    combined = [
        {"action": history.get('state'),
         "timestamp": history.get('changedAt'),
         "style": ''}
        for history in histories
    ]
    for sms in smsMessages:
        processed = sms.get('status').lower() == 'processed'
        style = 'class="commandSuccess"' if processed else 'class="commandError"'
        # SMS timestamps are stored in UTC; show them in local time.
        created_at = sms.get('createdAt').replace(tzinfo=utc_zone)
        combined.append({"action": sms.get('body').lower(),
                         "timestamp": created_at.astimezone(local_zone),
                         "style": style})
    combined.sort(key=lambda entry: entry.get("timestamp"))
    return combined
def time_params_to_dates(args):
    """Translate CLI time arguments into a (from_date, until_date) pair.

    Precedence: --days (converted to hours) beats --hours; both define a
    window ending now. Otherwise --from/--until are parsed. With no flags
    at all, the window is the last 24 hours.
    """
    now = datetime.now(tz=tzutc())
    if args['--hours'] or args['--days']:
        # --days wins when both are supplied.
        if args['--days']:
            hours = int(args['--days']) * 24
        else:
            hours = int(args['--hours'])
        until_date = now
        from_date = until_date - timedelta(hours=hours)
    else:
        if args['--from']:
            from_date = parse_date(args['--from'])
            if args['--until']:
                until_date = parse_date(args['--until'])
            else:
                until_date = now
        else:
            # NOTE(review): --until without --from is silently ignored here
            # and the default last-24h window is used — confirm intended.
            until_date = now
            from_date = until_date - timedelta(days=1)
    return from_date, until_date
def __init__(self, token):
    """
    Container for storing a Windows Live Refresh Token. Subclass of
    :class:`Token`.

    WARNING: Only invoke when creating a FRESH token! Don't use to convert
    a saved token into an object. A Refresh Token usually has a lifetime
    of 14 days.

    Args:
        token (str): The JWT Refresh-Token
    """
    issued = datetime.now(tzutc())
    expires = issued + timedelta(days=14)
    super(RefreshToken, self).__init__(token, issued, expires)
def test_scratchbuild(self):
    """CreateCalendar 2.0 format from scratch."""
    expected = get_test_file("simple_2_0_test.ics")
    cal = base.newFromBehavior('vcalendar', '2.0')
    cal.add('vevent')
    event = cal.vevent
    event.add('dtstart').value = datetime.datetime(2006, 5, 9)
    event.add('description').value = "Test event"
    pacific = dateutil.tz.tzical("test_files/timezones.ics").get('US/Pacific')
    event.add('created').value = datetime.datetime(2006, 1, 1, 10,
                                                   tzinfo=pacific)
    event.add('uid').value = "Not very random UID"
    event.add('dtstamp').value = datetime.datetime(2017, 6, 26, 0,
                                                   tzinfo=tzutc())
    # Normalize line endings before comparing serializations.
    self.assertEqual(
        cal.serialize().replace('\r\n', '\n'),
        expected.replace('\r\n', '\n')
    )
def test_recurrence(self):
    """
    Ensure date valued UNTILs in rrules are in a reasonable timezone,
    and include that day (12/28 in this test)
    """
    cal = base.readOne(get_test_file("recurrence.ics"))
    occurrences = list(cal.vevent.getrruleset())
    for index, expected in [
            (0, datetime.datetime(2006, 1, 26, 23, 0, tzinfo=tzutc())),
            (1, datetime.datetime(2006, 2, 23, 23, 0, tzinfo=tzutc())),
            (-1, datetime.datetime(2006, 12, 28, 23, 0, tzinfo=tzutc()))]:
        self.assertEqual(occurrences[index], expected)
def cleanup_instances(self):
    """Cleanup Instances.

    Delete unprotected instances that are in an ERROR state or older than
    the configured age limit; instances matching PROTECTED_PREFIX are not
    in self.unprotected_servers and are therefore untouched.
    """
    now = datetime.datetime.now(tzutc())
    max_age = datetime.timedelta(hours=self.age_limit)
    for server in self.unprotected_servers:
        age = now - dateutil.parser.parse(server.created_at)
        errored = server.status == "ERROR"
        if errored or age > max_age:
            _indp("Deleting {name} Errored: {error} Age: {age}".format(
                name=server.name, error=errored, age=age))
            self.conn.compute.delete_server(server.id)