Python dateutil.tz 模块,tzutc() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用dateutil.tz.tzutc()

项目:backend.ai-client-py    作者:lablup    | 项目源码 | 文件源码
def test_generate_signature(defconfig):
    """Check that the Authorization header carries hash type, access key and signature."""
    params = {
        'method': 'GET',
        'version': defconfig.version,
        'endpoint': defconfig.endpoint,
        'date': datetime.now(tzutc()),
        'request_path': '/path/to/api/',
        'content': b'"test data"',
        'content_type': 'application/json',
        'access_key': defconfig.access_key,
        'secret_key': defconfig.secret_key,
        'hash_type': 'md5',
    }
    headers, signature = generate_signature(**params)

    # Every one of these fragments must show up in the Authorization header.
    auth_header = headers['Authorization']
    expected_fragments = (
        params['hash_type'].upper(),
        params['access_key'],
        signature,
    )
    for fragment in expected_fragments:
        assert fragment in auth_header
项目:Dshield    作者:ywjt    | 项目源码 | 文件源码
def testTzAll(self):
        """Check that the public timezone names can be imported from dateutil.tz.

        The imports are deliberately local: the loop at the end looks each
        name up in ``locals()``, so the test only passes when every listed
        name was bound by one of the import statements below.
        """
        from dateutil.tz import tzutc
        from dateutil.tz import tzoffset
        from dateutil.tz import tzlocal
        from dateutil.tz import tzfile
        from dateutil.tz import tzrange
        from dateutil.tz import tzstr
        from dateutil.tz import tzical
        from dateutil.tz import gettz
        from dateutil.tz import tzwin
        from dateutil.tz import tzwinlocal

        # Names that are checked on every platform.
        tz_all = ["tzutc", "tzoffset", "tzlocal", "tzfile", "tzrange",
                  "tzstr", "tzical", "gettz"]

        # The Windows-specific names are only checked when running on Windows.
        tz_all += ["tzwin", "tzwinlocal"] if sys.platform.startswith("win") else []
        # Snapshot of the local namespace, used to fetch the imports by name.
        lvars = locals()

        for var in tz_all:
            self.assertIsNot(lvars[var], None)
项目:Dshield    作者:ywjt    | 项目源码 | 文件源码
def testFoldPositiveUTCOffset(self):
        """Two UTC instants mapping to the same Sydney wall time keep distinct offsets."""
        tzname = self._get_tzname('Australia/Sydney')

        with self._gettz_context(tzname):
            # One zone lookup per conversion.
            zone_first = self.gettz(tzname)
            zone_second = self.gettz(tzname)

            # One hour apart in UTC; the assertions below expect UTC+11 for
            # the first instant and UTC+10 for the second.
            utc_first = datetime(2012, 3, 31, 15, 30, tzinfo=tz.tzutc())
            utc_second = datetime(2012, 3, 31, 16, 30, tzinfo=tz.tzutc())

            local_first = utc_first.astimezone(zone_first)
            local_second = utc_second.astimezone(zone_second)

            # Both instants read 02:30 on the local clock.
            shared_wall_time = datetime(2012, 4, 1, 2, 30)
            self.assertEqual(local_first.replace(tzinfo=None), shared_wall_time)
            self.assertEqual(local_second.replace(tzinfo=None), shared_wall_time)

            self.assertEqual(local_first.utcoffset(), timedelta(hours=11))
            self.assertEqual(local_second.utcoffset(), timedelta(hours=10))
项目:Dshield    作者:ywjt    | 项目源码 | 文件源码
def testGapPositiveUTCOffset(self):
        """UTC instants on either side of the Sydney gap convert to the expected wall times."""
        tzname = self._get_tzname('Australia/Sydney')

        with self._gettz_context(tzname):
            # One zone lookup per conversion.
            zone_before = self.gettz(tzname)
            zone_after = self.gettz(tzname)

            # One hour apart in UTC; expected offsets are UTC+10, then UTC+11.
            utc_before = datetime(2012, 10, 6, 15, 30, tzinfo=tz.tzutc())
            utc_after = datetime(2012, 10, 6, 16, 30, tzinfo=tz.tzutc())

            local_before = utc_before.astimezone(zone_before)
            local_after = utc_after.astimezone(zone_after)

            # The local clock jumps from 01:30 to 03:30 across the hour.
            self.assertEqual(local_before.replace(tzinfo=None),
                             datetime(2012, 10, 7, 1, 30))
            self.assertEqual(local_after.replace(tzinfo=None),
                             datetime(2012, 10, 7, 3, 30))

            self.assertEqual(local_before.utcoffset(), timedelta(hours=10))
            self.assertEqual(local_after.utcoffset(), timedelta(hours=11))
项目:Dshield    作者:ywjt    | 项目源码 | 文件源码
def testFoldNegativeUTCOffset(self):
        """Two UTC instants mapping to the same Toronto wall time keep distinct offsets."""
        tzname = self._get_tzname('America/Toronto')

        with self._gettz_context(tzname):
            # One zone lookup per conversion (the original notes that
            # calling fromutc() alters the tzfile object).
            zone_first = self.gettz(tzname)
            zone_second = self.gettz(tzname)

            # One hour apart in UTC; expected offsets are UTC-4, then UTC-5.
            utc_first = datetime(2011, 11, 6, 5, 30, tzinfo=tz.tzutc())
            utc_second = datetime(2011, 11, 6, 6, 30, tzinfo=tz.tzutc())

            local_first = utc_first.astimezone(zone_first)
            local_second = utc_second.astimezone(zone_second)

            # Both instants read 01:30 on the local clock.
            shared_wall_time = datetime(2011, 11, 6, 1, 30)
            self.assertEqual(local_first.replace(tzinfo=None), shared_wall_time)
            self.assertEqual(local_second.replace(tzinfo=None), shared_wall_time)

            self.assertEqual(local_first.utcoffset(), timedelta(hours=-4.0))
            self.assertEqual(local_second.utcoffset(), timedelta(hours=-5.0))
项目:Dshield    作者:ywjt    | 项目源码 | 文件源码
def testGapNegativeUTCOffset(self):
        """UTC instants on either side of the Toronto gap convert to the expected wall times."""
        tzname = self._get_tzname('America/Toronto')

        with self._gettz_context(tzname):
            # One zone lookup per conversion (the original notes that
            # calling fromutc() alters the tzfile object).
            zone_before = self.gettz(tzname)
            zone_after = self.gettz(tzname)

            # One hour apart in UTC; expected offsets are UTC-5, then UTC-4.
            utc_before = datetime(2011, 3, 13, 6, 30, tzinfo=tz.tzutc())
            utc_after = datetime(2011, 3, 13, 7, 30, tzinfo=tz.tzutc())

            local_before = utc_before.astimezone(zone_before)
            local_after = utc_after.astimezone(zone_after)

            # The local clock jumps from 01:30 to 03:30 across the hour.
            self.assertEqual(local_before.replace(tzinfo=None),
                             datetime(2011, 3, 13, 1, 30))
            self.assertEqual(local_after.replace(tzinfo=None),
                             datetime(2011, 3, 13, 3, 30))

            self.assertNotEqual(local_before, local_after)
            self.assertEqual(local_before.utcoffset(), timedelta(hours=-5.0))
            self.assertEqual(local_after.utcoffset(), timedelta(hours=-4.0))
项目:Dshield    作者:ywjt    | 项目源码 | 文件源码
def testEqualAmbiguousComparison(self):
        """The same UTC instant converted through two zone lookups compares equal."""
        tzname = self._get_tzname('Australia/Sydney')

        with self._gettz_context(tzname):
            SYD0 = self.gettz(tzname)
            SYD1 = self.gettz(tzname)

            # Single UTC instant converted twice, once per zone object.
            t0_u = datetime(2012, 3, 31, 14, 30, tzinfo=tz.tzutc())

            t0_syd0 = t0_u.astimezone(SYD0)
            t0_syd1 = t0_u.astimezone(SYD1)

            # The original comment describes this as an "inter-zone
            # comparison" because the datetime is ambiguous.
            # NOTE(review): confirm that 14:30 UTC really falls in the
            # ambiguous local hour for this zone.
            self.assertEqual(t0_syd0, t0_syd1)
项目:Dshield    作者:ywjt    | 项目源码 | 文件源码
def get_utc_transitions(self, tzi, year, gap):
        """Return a naive local time and two UTC instants around a DST transition.

        :param tzi: Zone object exposing ``transitions(year)``, which yields
                    the DST-on and DST-off transition datetimes.
        :param year: Year whose transitions are inspected.
        :param gap: When true, work around the DST-on transition; otherwise
                    work around the DST-off transition.
        :returns: ``(t_n, t0_u, t1_u)`` where ``t1_u`` is one hour after
                  ``t0_u``.
        """
        dston, dstoff = tzi.transitions(year)
        half_hour = timedelta(minutes=30)
        one_hour = timedelta(hours=1)

        # Start 30 minutes before the transition of interest.
        transition = dston if gap else dstoff
        naive_local = transition - half_hour
        first_utc = naive_local.replace(tzinfo=tzi).astimezone(tz.tzutc())

        if not gap:
            # Step both values forward one hour, onto the first ambiguous time.
            naive_local += one_hour
            first_utc = first_utc + one_hour

        second_utc = first_utc + one_hour
        return naive_local, first_utc, second_utc
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def datetime2timestamp(dt, default_timezone=None):
    """Convert a datetime into seconds elapsed since 1970-01-01 00:00 UTC.

    :type dt: datetime
    :param dt: The datetime to convert.  Aware values are converted using
               their own UTC offset.
    :type default_timezone: tzinfo
    :param default_timezone: Zone attached to ``dt`` when it is naive;
                             ``None`` means ``tzutc()``.  Ignored when
                             ``dt`` is already aware.
    :returns: The timestamp
    """
    unix_epoch = datetime.datetime(1970, 1, 1)
    if dt.tzinfo is None:
        zone = tzutc() if default_timezone is None else default_timezone
        dt = dt.replace(tzinfo=zone)
    # Shift the wall-clock reading to UTC, then measure from the epoch.
    naive_utc = dt.replace(tzinfo=None) - dt.utcoffset()
    elapsed = naive_utc - unix_epoch
    if not hasattr(elapsed, "total_seconds"):
        # timedelta.total_seconds() only exists in Python 2.7+.
        return (elapsed.microseconds + (elapsed.seconds + elapsed.days * 24 * 3600) * 10**6) / 10**6
    return elapsed.total_seconds()
项目:agile-analytics    作者:cmheisel    | 项目源码 | 文件源码
def make_analyzed_tickets(AnalyzedAgileTicket, datetime, tzutc):
    """Return a factory that builds tickets from a list of dicts with key data."""
    from dateutil.parser import parse
    # Fallback values for any date component missing from a parsed string.
    default = datetime(1979, 8, 15, 0, 0, 0, tzinfo=tzutc)

    def _phase(state, raw_date):
        # One state entry: the state label plus its parsed entry time.
        return dict(state=state, entered_at=parse(raw_date, default=default))

    def _make_analyzed_tickets(ticket_datas):
        return [
            AnalyzedAgileTicket(
                key=data['key'],
                committed=_phase("Committed", data['committed']),
                started=_phase("Started", data['started']),
                ended=_phase("Ended", data['ended']),
            )
            for data in ticket_datas
        ]

    return _make_analyzed_tickets
项目:agile-analytics    作者:cmheisel    | 项目源码 | 文件源码
def weeks_of_tickets(datetime, tzutc, AnalyzedAgileTicket):
    """Build tickets from ``data/weeks_of_tickets.csv`` next to this file.

    Each CSV row supplies ``committed``, ``started`` and ``ended`` date
    strings.  Tickets are keyed ``FOO-1``, ``FOO-2``, ... in row order.

    :returns: List of ``AnalyzedAgileTicket`` objects, one per CSV row.
    """
    from dateutil.parser import parse
    parsed = []
    # Fallback values for any date component missing from a parsed string.
    default = datetime(1979, 8, 15, 0, 0, 0, tzinfo=tzutc)

    current_path = path.dirname(path.abspath(__file__))
    csv_file = path.join(current_path, 'data', 'weeks_of_tickets.csv')

    # The context manager closes the file; the original left the handle open.
    with open(csv_file, 'r') as handle:
        for count, row in enumerate(csv.DictReader(handle), start=1):
            t = AnalyzedAgileTicket(
                key="FOO-{}".format(count),
                committed=dict(state="committed", entered_at=parse(row['committed'], default=default)),
                started=dict(state="started", entered_at=parse(row['started'], default=default)),
                ended=dict(state="ended", entered_at=parse(row['ended'], default=default))
            )
            parsed.append(t)

    return parsed
项目:trading_package    作者:abrahamchaibi    | 项目源码 | 文件源码
def __init__(self, product_id: str, sequence_id: int, order_side: OrderSide, size: str, price: str,
                 status: OrderStatus = OrderStatus.open, order_id: str = None, order_type: OrderType = OrderType.limit,
                 created_at: Optional[datetime] = None, historical: bool = False, confirmed: bool = False):
        """Store the order fields, normalising the sequence id, price and creation time.

        ``sequence_id`` is coerced to ``int`` and ``price`` to ``str``.
        ``created_at`` defaults to the current UTC time and is never later
        than it.  Raises ``OrderException`` when ``size`` is negative.
        """
        self.product_id = product_id
        self.order_side = order_side
        self.order_type = order_type
        self.status = status
        self.sequence_id = int(sequence_id)
        self.size = size
        self.filled_size = '0'
        self.price = str(price)
        self.order_id = order_id

        # Orders cannot be created in the future: clamp to the current time.
        current_time = datetime.now(tz.tzutc())
        self.created_at = current_time if created_at is None else min(current_time, created_at)

        self.historical = historical
        self.confirmed = confirmed

        # Only negative sizes are rejected; a size of zero passes this check.
        size_is_negative = float(self.size) < 0
        if size_is_negative:
            raise OrderException('Order size must be positive {}'.format(self.size))
项目:AshsSDK    作者:thehappydinoa    | 项目源码 | 文件源码
def datetime2timestamp(dt, default_timezone=None):
    """Convert a datetime into seconds elapsed since 1970-01-01 00:00 UTC.

    :type dt: datetime
    :param dt: The datetime to convert.  Aware values are converted using
               their own UTC offset.
    :type default_timezone: tzinfo
    :param default_timezone: Zone attached to ``dt`` when it is naive;
                             ``None`` means ``tzutc()``.  Ignored when
                             ``dt`` is already aware.
    :returns: The timestamp
    """
    unix_epoch = datetime.datetime(1970, 1, 1)
    if dt.tzinfo is None:
        zone = tzutc() if default_timezone is None else default_timezone
        dt = dt.replace(tzinfo=zone)
    # Shift the wall-clock reading to UTC, then measure from the epoch.
    naive_utc = dt.replace(tzinfo=None) - dt.utcoffset()
    elapsed = naive_utc - unix_epoch
    if not hasattr(elapsed, "total_seconds"):
        # timedelta.total_seconds() only exists in Python 2.7+.
        return (elapsed.microseconds + (elapsed.seconds + elapsed.days * 24 * 3600) * 10**6) / 10**6
    return elapsed.total_seconds()
项目:Orator-Google-App-Engine    作者:MakarenaLabs    | 项目源码 | 文件源码
def _build_datetime(parts):

        timestamp = parts.get('timestamp')

        if timestamp:
            tz_utc = tz.tzutc()
            return datetime.fromtimestamp(timestamp, tz=tz_utc)

        am_pm = parts.get('am_pm')
        hour = parts.get('hour', 0)

        if am_pm == 'pm' and hour < 12:
            hour += 12
        elif am_pm == 'am' and hour == 12:
            hour = 0

        return datetime(year=parts.get('year', 1), month=parts.get('month', 1),
            day=parts.get('day', 1), hour=hour, minute=parts.get('minute', 0),
            second=parts.get('second', 0), microsecond=parts.get('microsecond', 0),
            tzinfo=parts.get('tzinfo'))
项目:iCalVerter    作者:amsysuk    | 项目源码 | 文件源码
def __init__(self, dt):
        """Wrap a date/time value and record its VALUE (and, if needed, TZID) parameter.

        Raises ``ValueError`` unless ``dt`` is a datetime, date, timedelta
        or time instance.
        """
        if not isinstance(dt, (datetime, date, timedelta, time)):
            raise ValueError('You must use datetime, date, timedelta or time')

        # The first matching type wins, so the order of this table matters.
        value_labels = ((datetime, 'DATE-TIME'), (date, 'DATE'), (time, 'TIME'))
        for kind, label in value_labels:
            if isinstance(dt, kind):
                self.params = Parameters({'value': label})
                break

        if isinstance(dt, (datetime, time)):
            tzinfo = getattr(dt, 'tzinfo', False)
            if tzinfo:
                is_utc = tzinfo is pytz.utc or (
                    tzutc is not None and isinstance(tzinfo, tzutc))
                if not is_utc:
                    # set the timezone as a parameter to the property
                    tzid = tzid_from_dt(dt)
                    if tzid:
                        self.params.update({'TZID': tzid})
        self.dt = dt
项目:aws-ec2rescue-linux    作者:awslabs    | 项目源码 | 文件源码
def datetime2timestamp(dt, default_timezone=None):
    """Convert a datetime into seconds elapsed since 1970-01-01 00:00 UTC.

    :type dt: datetime
    :param dt: The datetime to convert.  Aware values are converted using
               their own UTC offset.
    :type default_timezone: tzinfo
    :param default_timezone: Zone attached to ``dt`` when it is naive;
                             ``None`` means ``tzutc()``.  Ignored when
                             ``dt`` is already aware.
    :returns: The timestamp
    """
    unix_epoch = datetime.datetime(1970, 1, 1)
    if dt.tzinfo is None:
        zone = tzutc() if default_timezone is None else default_timezone
        dt = dt.replace(tzinfo=zone)
    # Shift the wall-clock reading to UTC, then measure from the epoch.
    naive_utc = dt.replace(tzinfo=None) - dt.utcoffset()
    elapsed = naive_utc - unix_epoch
    if not hasattr(elapsed, "total_seconds"):
        # timedelta.total_seconds() only exists in Python 2.7+.
        return (elapsed.microseconds + (elapsed.seconds + elapsed.days * 24 * 3600) * 10**6) / 10**6
    return elapsed.total_seconds()
项目:EasyCrawler    作者:playwolf719    | 项目源码 | 文件源码
def my_local_time():
    """Return the current time as an aware datetime in the local timezone."""
    # Ask for an aware UTC "now" directly.  This gives the same instant as
    # datetime.utcnow() tagged with tzutc(), but avoids utcnow(), which is
    # deprecated since Python 3.12.
    utc_now = datetime.now(tz.tzutc())

    # Convert time zone
    return utc_now.astimezone(tz.tzlocal())
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_index_convert_to_datetime_array_dateutil(self):
        """to_pydatetime() keeps values and tzinfo for naive, Eastern and UTC ranges."""
        tm._skip_if_no_dateutil()
        import dateutil

        def _check_rng(rng):
            as_datetimes = rng.to_pydatetime()
            tm.assertIsInstance(as_datetimes, np.ndarray)
            # Each converted element must match the Timestamp it came from.
            for py_dt, stamp in zip(as_datetimes, rng):
                tm.assertIsInstance(py_dt, datetime)
                self.assertEqual(py_dt, stamp.to_pydatetime())
                self.assertEqual(py_dt.tzinfo, stamp.tzinfo)

        # Build all three ranges first, then check them in the same order.
        ranges = (
            date_range('20090415', '20090519'),
            date_range('20090415', '20090519', tz='dateutil/US/Eastern'),
            date_range('20090415', '20090519', tz=dateutil.tz.tzutc()),
        )
        for rng in ranges:
            _check_rng(rng)
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_period_resample_with_local_timezone_dateutil(self):
        """Daily period resampling after converting a UTC hourly series to Pacific time."""
        # GH5430
        tm._skip_if_no_dateutil()
        import dateutil

        local_timezone = 'dateutil/America/Los_Angeles'

        def _utc_midnight(day_of_month):
            # Midnight UTC on the given day of November 2013.
            return datetime(year=2013, month=11, day=day_of_month, hour=0,
                            minute=0, tzinfo=dateutil.tz.tzutc())

        start = _utc_midnight(1)
        # 1 day later
        end = _utc_midnight(2)

        hourly_index = pd.date_range(start, end, freq='H')
        localized = pd.Series(1, index=hourly_index).tz_convert(local_timezone)
        result = localized.resample('D', kind='period').mean()

        # Create the expected series
        # Index is moved back a day with the timezone conversion from UTC to
        # Pacific
        shifted_periods = (pd.period_range(start=start, end=end, freq='D') - 1)
        expected = pd.Series(1, index=shifted_periods)
        assert_series_equal(result, expected)
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_class_ops_dateutil(self):
        """Compare Timestamp class-level constructors with their datetime counterparts.

        Each pair of values is built back to back and compared via
        ``compare``, which rounds ``Timestamp(...).value / 1e9`` to the
        nearest integer before asserting equality.
        """
        tm._skip_if_no_dateutil()
        from dateutil.tz import tzutc

        def compare(x, y):
            # Both sides go through Timestamp so they are compared on the
            # same rounded ``value / 1e9`` scale.
            self.assertEqual(int(np.round(Timestamp(x).value / 1e9)),
                             int(np.round(Timestamp(y).value / 1e9)))

        # "Now"-style constructors, read from the clock one after the other.
        compare(Timestamp.now(), datetime.now())
        compare(Timestamp.now('UTC'), datetime.now(tzutc()))
        compare(Timestamp.utcnow(), datetime.utcnow())
        compare(Timestamp.today(), datetime.today())
        # Constructors that take an explicit integer timestamp.
        current_time = calendar.timegm(datetime.now().utctimetuple())
        compare(Timestamp.utcfromtimestamp(current_time),
                datetime.utcfromtimestamp(current_time))
        compare(Timestamp.fromtimestamp(current_time),
                datetime.fromtimestamp(current_time))

        # combine(): the time component is taken ten minutes after the date component.
        date_component = datetime.utcnow()
        time_component = (date_component + timedelta(minutes=10)).time()
        compare(Timestamp.combine(date_component, time_component),
                datetime.combine(date_component, time_component))
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_cant_compare_tz_naive_w_aware_dateutil(self):
        """Comparing a naive Timestamp with a dateutil-UTC-aware one must fail."""
        tm._skip_if_no_dateutil()
        from dateutil.tz import tzutc
        utc = tzutc()
        # #1404
        naive = Timestamp('3/12/2012')
        aware = Timestamp('3/12/2012', tz=utc)

        # Every comparison dunder must raise, in both operand orders.
        comparison_names = ('__eq__', '__ne__', '__lt__', '__gt__')
        for left, right in ((naive, aware), (aware, naive)):
            for name in comparison_names:
                self.assertRaises(Exception, getattr(left, name), right)

        if sys.version_info >= (3, 3):
            self.assertFalse(naive == aware.to_pydatetime())
            self.assertFalse(naive.to_pydatetime() == aware)
        else:
            self.assertRaises(Exception, naive.__eq__, aware.to_pydatetime())
            self.assertRaises(Exception, naive.to_pydatetime().__eq__, aware)
项目:cloud-custodian    作者:capitalone    | 项目源码 | 文件源码
def process(self, asgs):
        """Tag the given ASGs with a message naming the operation and its action date.

        The message template, tag key, operation and day offset are read
        from ``self.data``.  The configured template is replaced by
        ``self.default_template`` when formatting it fails.
        """
        msg_tmpl = self.data.get('message', self.default_template)
        key = self.data.get('key', self.data.get('tag', DEFAULT_TAG))
        op = self.data.get('op', 'suspend')
        date = self.data.get('days', 4)

        stop_date = datetime.now(tz=tzutc()) + timedelta(days=date)

        def _render(template):
            # Fill a template with the operation and the formatted stop date.
            return template.format(
                op=op, action_date=stop_date.strftime('%Y/%m/%d'))

        try:
            msg = _render(msg_tmpl)
        except Exception:
            self.log.warning("invalid template %s" % msg_tmpl)
            msg = _render(self.default_template)

        self.log.info("Tagging %d asgs for %s on %s" % (
            len(asgs), op, stop_date.strftime('%Y/%m/%d')))
        self.tag(asgs, key, msg)
项目:cloud-custodian    作者:capitalone    | 项目源码 | 文件源码
def fetch_credential_report(self):
        """Return the IAM credential report content, regenerating it when missing or stale.

        Raises ``ValueError`` when no usable report exists and the
        ``report_generate`` setting is falsy.
        """
        iam = local_session(self.manager.session_factory).client('iam')
        try:
            report = iam.get_credential_report()
        except ClientError as e:
            # A missing report is handled below; any other error propagates.
            if e.response['Error']['Code'] != 'ReportNotPresent':
                raise
            report = None

        if report:
            current_time = datetime.datetime.now(tz=tzutc())
            max_age = timedelta(
                seconds=self.get_value_or_schema_default('report_max_age'))
            threshold = current_time - max_age
            generated = report['GeneratedTime']
            # Strip the timezone from the threshold when the report time has none.
            if not generated.tzinfo:
                threshold = threshold.replace(tzinfo=None)
            # A report generated before the threshold is discarded.
            if generated < threshold:
                report = None

        if report is None:
            if not self.get_value_or_schema_default('report_generate'):
                raise ValueError("Credential Report Not Present")
            iam.generate_credential_report()
            # Wait the configured delay before fetching the new report.
            time.sleep(self.get_value_or_schema_default('report_delay'))
            report = iam.get_credential_report()
        return report['Content']
项目:cloud-custodian    作者:capitalone    | 项目源码 | 文件源码
def test_age(self):
        """The 'age' value filter matches launch times less than 32 days old."""
        now = datetime.now(tz=tz.tzutc())
        three_months = now - timedelta(90)
        two_months = now - timedelta(60)
        one_month = now - timedelta(30)

        def i(d):
            return instance(LaunchTime=d)

        fdata = {
            'type': 'value',
            'key': 'LaunchTime',
            'op': 'less-than',
            'value_type': 'age',
            'value': 32}

        # (launch time, expected filter outcome); the last case passes the
        # launch time as an ISO-format string.
        cases = (
            (three_months, False),
            (two_months, False),
            (one_month, True),
            (now, True),
            (now.isoformat(), True),
        )
        for launch_time, expected in cases:
            self.assertFilter(fdata, i(launch_time), expected)
项目:cloud-custodian    作者:capitalone    | 项目源码 | 文件源码
def test_expiration(self):
        """The 'expiration' value filter matches dates less than 61 days away."""
        now = datetime.now(tz=tz.tzutc())
        three_months = now + timedelta(90)
        two_months = now + timedelta(60)

        def i(d):
            return instance(LaunchTime=d)

        fdata = {
            'type': 'value',
            'key': 'LaunchTime',
            'op': 'less-than',
            'value_type': 'expiration',
            'value': 61}

        # (date, expected filter outcome); the last case passes the date
        # as an ISO-format string.
        cases = (
            (three_months, False),
            (two_months, True),
            (now, True),
            (now.isoformat(), True),
        )
        for when, expected in cases:
            self.assertFilter(fdata, i(when), expected)
项目:cloud-custodian    作者:capitalone    | 项目源码 | 文件源码
def test_filter_instance_age(self):
        """The instance-uptime filter (gte 60 days) matches only old enough instances."""
        now = datetime.now(tz=tz.tzutc())
        three_months = now - timedelta(90)
        two_months = now - timedelta(60)
        one_month = now - timedelta(30)

        def i(d):
            return instance(LaunchTime=d)

        # Build every instance up front, then run the filter over each.
        launch_cases = (
            (now, False),
            (three_months, True),
            (two_months, True),
            (one_month, False),
        )
        prepared = [(i(launched), expected) for launched, expected in launch_cases]

        for resource, expected in prepared:
            uptime_filter = {'type': 'instance-uptime', 'op': 'gte', 'days': 60}
            self.assertFilter(uptime_filter, resource, expected)
项目:cloud-custodian    作者:capitalone    | 项目源码 | 文件源码
def filter_extant_exports(client, bucket, prefix, days, start, end=None):
    """Filter days where the bucket already has extant export keys.

    The tags of the object at ``prefix`` are read; when a ``LastExport``
    tag is present, only the days strictly after it are kept.

    :param client: Client exposing ``get_object_tagging(Bucket=..., Key=...)``.
    :param bucket: Bucket name passed to the tagging call.
    :param prefix: Object key whose tags are inspected.
    :param days: Iterable of candidate days.
    :param start: Unused; kept so existing callers keep working.
    :param end: Unused; kept so existing callers keep working.
    :returns: Sorted list of the days that still need exporting.
    """
    try:
        tag_set = client.get_object_tagging(Bucket=bucket, Key=prefix).get('TagSet', [])
    except ClientError as e:
        # A missing key just means nothing was exported yet.
        if e.response['Error']['Code'] != 'NoSuchKey':
            raise
        tag_set = []
    tags = {t['Key']: t['Value'] for t in tag_set}

    ordered_days = sorted(days)
    if 'LastExport' not in tags:
        return ordered_days

    last_export = parse(tags['LastExport'])
    # A tag value without timezone information is treated as UTC.
    if last_export.tzinfo is None:
        last_export = last_export.replace(tzinfo=tzutc())
    # NOTE(review): this comparison presumably needs timezone-aware days -- confirm with callers.
    return [d for d in ordered_days if d > last_export]
项目:ros-interop    作者:mcgill-robotics    | 项目源码 | 文件源码
def iso8601_to_rostime(iso):
    """Converts ISO 8601 time to ROS Time.

    Args:
        iso: ISO 8601 encoded string.

    Returns:
        std_msgs/Time.
    """
    # Parse, and tag as UTC whenever there is no (or a zero) UTC offset.
    moment = dateutil.parser.parse(iso)
    if not moment.utcoffset():
        moment = moment.replace(tzinfo=tzutc())

    # Elapsed time since the epoch, both sides expressed in UTC.
    epoch = datetime.utcfromtimestamp(0).replace(tzinfo=tzutc())
    elapsed = moment - epoch

    # Fill in the ROS message: whole seconds plus the sub-second remainder.
    stamp = Time()
    stamp.data.secs = int(elapsed.total_seconds())
    stamp.data.nsecs = elapsed.microseconds * 1000

    return stamp
项目:anom-py    作者:Bogdanp    | 项目源码 | 文件源码
def _deserialize(cls, data):
        kind = data.get(cls._type_field, None)
        if kind is None:
            return data

        value = data.get("value")
        if kind == "blob":
            return base64.b64decode(value.encode("utf-8"))

        elif kind == "datetime":
            return datetime.fromtimestamp(value, tz.tzutc())

        elif kind == "model":
            return cls._entity_from_dict(value)

        else:
            raise ValueError(f"Invalid kind {kind!r}.")
项目:rdsmant    作者:JangYoungWhan    | 项目源码 | 文件源码
def validateLogDate(self, lines):
        """Decide from the first "# Time: " line whether the log is recent enough.

        Returns False when that line's timestamp is more than two hours
        older than ``self._now``; otherwise True.  Also returns True when
        no such line exists.
        """
        delta = timedelta(hours=2)
        marker = "# Time: "

        for line in lines:
            if not line or not line.startswith(marker):
                continue

            # The timestamp is parsed as UTC, moved to the configured zone
            # and made naive again before comparing with self._now.
            parsed = datetime.strptime(line[len(marker):], "%y%m%d %H:%M:%S")
            target_zone = zoneinfo.gettz(self._GENERAL_CONFIG["TIMEZONE"])
            log_time = parsed.replace(tzinfo=tz.tzutc()).astimezone(target_zone)
            log_time = log_time.replace(tzinfo=None)
            print(self._now, log_time)
            print("diff :", self._now - log_time)
            # Only the first marker line decides the outcome.
            return not ((self._now - log_time) > delta)

        return True

    # Initialization.
项目:rdsmant    作者:JangYoungWhan    | 项目源码 | 文件源码
def validateLogDate(self, lines):
  """Decide from the first "# Time: " line whether the log is recent enough.

  Returns False when that line's timestamp is more than two hours older
  than ``self._now``; otherwise True.  Also returns True when no such
  line exists.
  """
  delta = timedelta(hours=2)
  marker = "# Time: "

  for line in lines:
    if not line or not line.startswith(marker):
      continue

    # The timestamp is parsed as UTC, moved to the configured zone and
    # made naive again before comparing with self._now.
    parsed = datetime.strptime(line[len(marker):], "%y%m%d %H:%M:%S")
    target_zone = zoneinfo.gettz(self._GENERAL_CONFIG["TIMEZONE"])
    log_time = parsed.replace(tzinfo=tz.tzutc()).astimezone(target_zone)
    log_time = log_time.replace(tzinfo=None)
    print(self._now, log_time)
    print("diff :", self._now - log_time)
    # Only the first marker line decides the outcome.
    return not ((self._now - log_time) > delta)

  return True
项目:rdsmant    作者:JangYoungWhan    | 项目源码 | 文件源码
def validate_log_date(self, line):
  """Tell whether a log line is recent enough to be processed.

  A timestamp is extracted with ``REG_GENERAL_ERR`` or, for lines
  containing ``BEGIN_DEADLOCK``, with ``REG_DEADLOCK``.  Returns False
  when that timestamp is more than two hours older than ``self._now``.
  Returns True otherwise, including when no timestamp can be extracted.
  """
  delta = timedelta(hours=2)

  def _is_recent(match):
    # Parse group 1 as UTC, move it to the configured zone and make it
    # naive again so it can be compared with self._now.
    log_time = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S")
    log_time = log_time.replace(tzinfo=tz.tzutc()).astimezone(zoneinfo.gettz(self._GENERAL_CONFIG["TIMEZONE"]))
    log_time = log_time.replace(tzinfo=None)
    return not ((self._now - log_time) > delta)

  m = REG_GENERAL_ERR.match(line)
  if m is None and BEGIN_DEADLOCK in line:
    m = REG_DEADLOCK.match(line)

  # A deadlock marker line that does not match REG_DEADLOCK used to crash
  # on ``m.group``; it is now treated like any other undated line.
  if m is None:
    return True

  return _is_recent(m)
项目:python-card-me    作者:tBaxter    | 项目源码 | 文件源码
def test_recurrence(self):
        """First, second and last occurrences of recurrence.ics fall on the expected days."""
        # Ensure date valued UNTILs in rrules are in a reasonable timezone,
        # and include that day (12/28 in this test)
        test_file = get_test_file("recurrence.ics")
        cal = base.readOne(test_file, findBegin=False)
        dates = list(cal.vevent.getrruleset())

        # (index into dates, expected (year, month, day)); always 23:00 UTC.
        expected_by_position = (
            (0, (2006, 1, 26)),
            (1, (2006, 2, 23)),
            (-1, (2006, 12, 28)),
        )
        for position, (year, month, day) in expected_by_position:
            self.assertEqual(
                dates[position],
                datetime.datetime(year, month, day, 23, 0, tzinfo=tzutc())
            )
项目:kscore    作者:liuyichen    | 项目源码 | 文件源码
def datetime2timestamp(dt, default_timezone=None):
    """Convert a datetime into seconds elapsed since 1970-01-01 00:00 UTC.

    :type dt: datetime
    :param dt: The datetime to convert.  Aware values are converted using
               their own UTC offset.
    :type default_timezone: tzinfo
    :param default_timezone: Zone attached to ``dt`` when it is naive;
                             ``None`` means ``tzutc()``.  Ignored when
                             ``dt`` is already aware.
    :returns: The timestamp
    """
    unix_epoch = datetime.datetime(1970, 1, 1)
    if dt.tzinfo is None:
        zone = tzutc() if default_timezone is None else default_timezone
        dt = dt.replace(tzinfo=zone)
    # Shift the wall-clock reading to UTC, then measure from the epoch.
    naive_utc = dt.replace(tzinfo=None) - dt.utcoffset()
    elapsed = naive_utc - unix_epoch
    if not hasattr(elapsed, "total_seconds"):
        # timedelta.total_seconds() only exists in Python 2.7+.
        return (elapsed.microseconds + (elapsed.seconds + elapsed.days * 24 * 3600) * 10**6) / 10**6
    return elapsed.total_seconds()
项目:catchpy    作者:nmaekawa    | 项目源码 | 文件源码
def test_import_anno_ok_2(wa_image):
    """Importing keeps the original creation date; creating stamps the current time."""
    catcha = wa_image
    reference_time = datetime.now(tz.tzutc())

    # import first because CRUD.create changes created time in input
    catcha['id'] = 'naomi-xx-imported'
    CRUD.import_annos([catcha])
    imported = Anno._default_manager.get(pk=catcha['id'])
    assert imported is not None
    assert Anno._default_manager.count() == 1

    # The imported annotation must be dated more than 25 hours in the past.
    assert imported.created < (reference_time - timedelta(hours=25))

    # about to create
    catcha['id'] = 'naomi-xx'
    created = CRUD.create_anno(catcha)
    assert created is not None
    assert Anno._default_manager.count() == 2

    # The created annotation must be dated later than one minute before
    # the reference time.
    assert (reference_time - timedelta(minutes=1)) < created.created
项目:catchpy    作者:nmaekawa    | 项目源码 | 文件源码
def test_import_anno_ok(wa_image):
    """Importing an annotation with a reply keeps the reply and the creation date."""
    catcha = wa_image
    catcha_reply = make_wa_object(age_in_hours=1, reply_to=catcha['id'])

    reference_time = datetime.now(tz.tzutc())

    CRUD.import_annos([catcha, catcha_reply])
    imported = Anno._default_manager.get(pk=catcha['id'])
    assert imported is not None

    # The reply must be attached to the imported annotation.
    assert imported.total_replies == 1
    assert imported.replies[0].anno_id == catcha_reply['id']

    # The imported annotation must be dated more than 25 hours in the past.
    assert imported.created < (reference_time - timedelta(hours=25))
项目:contentful.py    作者:contentful    | 项目源码 | 文件源码
def test_deleted_entry(self):
        """DeletedEntry exposes its id, parsed deletion time and string form."""
        entry_id = "5c6VY0gWg0gwaIeYkUUiqG"
        timestamp = "2013-09-09T16:17:12.600Z"
        space_link = {
            "sys": {
                "type": "Link",
                "linkType": "Space",
                "id": "cfexampleapi"
            }
        }
        payload = {
            "sys": {
                "space": space_link,
                "id": entry_id,
                "type": "DeletedEntry",
                "createdAt": timestamp,
                "updatedAt": timestamp,
                "deletedAt": timestamp,
                "revision": 1
            }
        }
        deleted_entry = DeletedEntry(payload)

        # The ISO string above, as an aware datetime in UTC.
        expected_deleted_at = datetime.datetime(
            2013, 9, 9, 16, 17, 12, 600000, tzinfo=tzutc())

        self.assertEqual(deleted_entry.id, '5c6VY0gWg0gwaIeYkUUiqG')
        self.assertEqual(deleted_entry.deleted_at, expected_deleted_at)
        self.assertEqual(str(deleted_entry), "<DeletedEntry id='5c6VY0gWg0gwaIeYkUUiqG'>")
项目:contentful.py    作者:contentful    | 项目源码 | 文件源码
def test_deleted_asset(self):
        """DeletedAsset exposes its id, parsed deletion time and string form."""
        asset_id = "5c6VY0gWg0gwaIeYkUUiqG"
        timestamp = "2013-09-09T16:17:12.600Z"
        space_link = {
            "sys": {
                "type": "Link",
                "linkType": "Space",
                "id": "cfexampleapi"
            }
        }
        payload = {
            "sys": {
                "space": space_link,
                "id": asset_id,
                "type": "DeletedAsset",
                "createdAt": timestamp,
                "updatedAt": timestamp,
                "deletedAt": timestamp,
                "revision": 1
            }
        }
        deleted_asset = DeletedAsset(payload)

        # The ISO string above, as an aware datetime in UTC.
        expected_deleted_at = datetime.datetime(
            2013, 9, 9, 16, 17, 12, 600000, tzinfo=tzutc())

        self.assertEqual(deleted_asset.id, '5c6VY0gWg0gwaIeYkUUiqG')
        self.assertEqual(deleted_asset.deleted_at, expected_deleted_at)
        self.assertEqual(str(deleted_asset), "<DeletedAsset id='5c6VY0gWg0gwaIeYkUUiqG'>")
项目:jepsen-training-vpc    作者:bloomberg    | 项目源码 | 文件源码
def datetime2timestamp(dt, default_timezone=None):
    """Convert a datetime into seconds elapsed since the Unix epoch.

    :type dt: datetime
    :param dt: The datetime to convert.
    :type default_timezone: tzinfo
    :param default_timezone: Timezone assumed for a naive ``dt``; ``None``
                             means ``tzutc()``. Ignored when ``dt`` is
                             already timezone-aware.
    :returns: Seconds between 1970-01-01 00:00:00 UTC and ``dt``
    """
    if dt.tzinfo is None:
        assumed_tz = tzutc() if default_timezone is None else default_timezone
        dt = dt.replace(tzinfo=assumed_tz)

    # Shift the wall-clock value to UTC, then measure it against a naive epoch.
    naive_utc = dt.replace(tzinfo=None) - dt.utcoffset()
    elapsed = naive_utc - datetime.datetime(1970, 1, 1)

    if not hasattr(elapsed, "total_seconds"):
        # timedelta objects without total_seconds(): compute it by hand.
        whole_seconds = elapsed.seconds + elapsed.days * 24 * 3600
        return (elapsed.microseconds + whole_seconds * 10**6) / 10**6
    return elapsed.total_seconds()  # Works in Python 2.7+
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def _build_datetime(parts):
        """Build a datetime from a dict of parsed date/time components.

        :param parts: Mapping of parsed components. Recognised keys are
            ``timestamp``, ``year``, ``month``, ``day``, ``hour``,
            ``minute``, ``second``, ``microsecond``, ``am_pm`` (``'am'`` or
            ``'pm'``) and ``tzinfo``. Every key is optional.
        :returns: If ``timestamp`` is present, the UTC-aware datetime for
            that Unix timestamp (all other keys are ignored). Otherwise a
            datetime assembled from the individual fields, with missing
            date fields defaulting to 1 and missing time fields to 0.
        """
        timestamp = parts.get('timestamp')

        # Test against None, not truthiness: a timestamp of 0 is the Unix
        # epoch and must not fall through to the field-based branch.
        if timestamp is not None:
            tz_utc = tz.tzutc()
            return datetime.fromtimestamp(timestamp, tz=tz_utc)

        am_pm = parts.get('am_pm')
        hour = parts.get('hour', 0)

        # Convert a 12-hour clock reading to 24-hour form.
        if am_pm == 'pm' and hour < 12:
            hour += 12
        elif am_pm == 'am' and hour == 12:
            hour = 0

        return datetime(year=parts.get('year', 1), month=parts.get('month', 1),
            day=parts.get('day', 1), hour=hour, minute=parts.get('minute', 0),
            second=parts.get('second', 0), microsecond=parts.get('microsecond', 0),
            tzinfo=parts.get('tzinfo'))
项目:repository.jlippold    作者:jlippold    | 项目源码 | 文件源码
def format_event(device, event):
    """Return a one-line description of a device event.

    The result is ``device.name``, a localized name for the event kind and
    the event's creation time converted to the local timezone, joined by
    single spaces. ``event['created_at']`` is treated as UTC.
    """
    created_utc = event['created_at'].replace(tzinfo=tz.tzutc())
    local_time = created_utc.astimezone(tz.tzlocal())

    # Time of day, then localized string 30300, then the calendar date.
    local_time_string = (
        local_time.strftime('%I:%M %p ')
        + ADDON.getLocalizedString(30300)
        + local_time.strftime(' %A %b %d %Y')
    )

    # Localized string id for each known event kind; other kinds get no name.
    kind_string_ids = {
        'on_demand': 30301,
        'motion': 30302,
        'ding': 30303,
    }
    string_id = kind_string_ids.get(event['kind'])
    if string_id is None:
        event_name = ''
    else:
        event_name = ADDON.getLocalizedString(string_id)

    return ' '.join([device.name, event_name, local_time_string])
项目:sceptre    作者:cloudreach    | 项目源码 | 文件源码
def _wait_for_completion(self):
        """
        Blocks until the stack operation is no longer in progress, logging
        new CloudFormation events on every polling round.

        :returns: The final stack status.
        :rtype: sceptre.stack_status.StackStatus
        """
        # Start the event window three seconds in the past.
        window_start = datetime.now(tzutc()) - timedelta(seconds=3)
        self.most_recent_event_datetime = window_start

        while True:
            status = self._get_simplified_status(self.get_status())
            self._log_new_events()
            time.sleep(4)
            if status != StackStatus.IN_PROGRESS:
                return status
项目:AWS-AutoTag    作者:cpollard0    | 项目源码 | 文件源码
def datetime2timestamp(dt, default_timezone=None):
    """Return the Unix timestamp (in seconds) for the given datetime.

    :type dt: datetime
    :param dt: The datetime instance to convert.
    :type default_timezone: tzinfo
    :param default_timezone: Only consulted when ``dt`` is naive; ``None``
                             is treated as ``tzutc()``.
    :returns: The timestamp
    """
    if dt.tzinfo is None:
        if default_timezone is None:
            dt = dt.replace(tzinfo=tzutc())
        else:
            dt = dt.replace(tzinfo=default_timezone)

    unix_epoch = datetime.datetime(1970, 1, 1)
    # Drop the tzinfo and subtract its offset so the value is naive UTC.
    delta = (dt.replace(tzinfo=None) - dt.utcoffset()) - unix_epoch

    total_seconds = getattr(delta, "total_seconds", None)
    if total_seconds is not None:
        return total_seconds()  # Works in Python 2.7+
    micros = delta.microseconds + (delta.seconds + delta.days * 24 * 3600) * 10**6
    return micros / 10**6
项目:garage-door    作者:denniskline    | 项目源码 | 文件源码
def combine_histories_and_messages(histories, smsMessages):
    """Merge history records and SMS messages into one timestamp-sorted list.

    Each returned item is a dict with the keys ``action``, ``timestamp``
    and ``style``:

    * history records contribute their ``state`` and ``changedAt`` values
      with an empty style;
    * SMS messages contribute their lower-cased ``body``, their
      ``createdAt`` value (labelled as UTC, then converted to the local
      timezone) and a success or error CSS class attribute depending on
      whether their ``status`` is ``processed`` (case-insensitive).

    :returns: The combined list, sorted by ``timestamp``.
    """
    utc_zone = tz.tzutc()
    local_zone = tz.tzlocal()

    entries = [
        {"action": record.get('state'), "timestamp": record.get('changedAt'), "style": ''}
        for record in histories
    ]

    for message in smsMessages:
        if message.get('status').lower() == 'processed':
            css_class = 'class="commandSuccess"'
        else:
            css_class = 'class="commandError"'
        created_utc = message.get('createdAt').replace(tzinfo=utc_zone)
        entries.append({
            "action": message.get('body').lower(),
            "timestamp": created_utc.astimezone(local_zone),
            "style": css_class,
        })

    return sorted(entries, key=lambda entry: entry.get("timestamp"))
项目:hadroid    作者:hadroid    | 项目源码 | 文件源码
def time_params_to_dates(args):
    """Turn the time-range command-line options into a (from, until) pair.

    Precedence of the options read from ``args``:

    1. ``--days`` (converted to hours) or ``--hours``: the window ends now
       and reaches that many hours back. ``--days`` wins if both are set.
    2. ``--from``, optionally with ``--until``: both are parsed with
       ``parse_date``; a missing ``--until`` means now.
    3. Otherwise: the last 24 hours.

    :returns: Tuple ``(from_date, until_date)``.
    """
    now = datetime.now(tz=tzutc())
    hours_arg = args['--hours']
    days_arg = args['--days']

    if days_arg:
        return now - timedelta(hours=int(days_arg) * 24), now
    if hours_arg:
        return now - timedelta(hours=int(hours_arg)), now

    if not args['--from']:
        return now - timedelta(days=1), now

    from_date = parse_date(args['--from'])
    until_date = parse_date(args['--until']) if args['--until'] else now
    return from_date, until_date
项目:py-xbox-webapi    作者:tuxuser    | 项目源码 | 文件源码
def __init__(self, token):
        """
        Container holding a Windows Live Refresh Token.

        Subclass of :class:`Token`

        WARNING: Only call this for a FRESHLY issued token!
        Do not use it to turn a previously saved token back into an object,
        because the issue date is always set to the current time.
        The token is given a validity of 14 days from now.

        Args:
            token (str): The JWT Refresh-Token
        """
        issued_at = datetime.now(tzutc())
        valid_until = issued_at + timedelta(days=14)
        super(RefreshToken, self).__init__(token, issued_at, valid_until)
项目:vobject    作者:eventable    | 项目源码 | 文件源码
def test_scratchbuild(self):
        """
        Build a 2.0 calendar from scratch and compare its serialization
        with the contents of simple_2_0_test.ics
        """
        expected_ics = get_test_file("simple_2_0_test.ics")

        calendar = base.newFromBehavior('vcalendar', '2.0')
        calendar.add('vevent')
        calendar.vevent.add('dtstart').value = datetime.datetime(2006, 5, 9)
        calendar.vevent.add('description').value = "Test event"

        # The creation time is expressed in the US/Pacific zone defined by
        # the bundled timezone file.
        pacific = dateutil.tz.tzical("test_files/timezones.ics").get('US/Pacific')
        created_at = datetime.datetime(2006, 1, 1, 10, tzinfo=pacific)
        calendar.vevent.add('created').value = created_at

        calendar.vevent.add('uid').value = "Not very random UID"
        stamped_at = datetime.datetime(2017, 6, 26, 0, tzinfo=tzutc())
        calendar.vevent.add('dtstamp').value = stamped_at

        # Line endings are normalized on both sides before comparing.
        serialized = calendar.serialize().replace('\r\n', '\n')
        self.assertEqual(serialized, expected_ics.replace('\r\n', '\n'))
项目:vobject    作者:eventable    | 项目源码 | 文件源码
def test_recurrence(self):
        """
        Date valued UNTILs in rrules must land in a reasonable timezone
        and include the UNTIL day itself (12/28 in this test)
        """
        calendar = base.readOne(get_test_file("recurrence.ics"))
        occurrences = list(calendar.vevent.getrruleset())

        # (index into the occurrence list, expected UTC datetime)
        expectations = (
            (0, datetime.datetime(2006, 1, 26, 23, 0, tzinfo=tzutc())),
            (1, datetime.datetime(2006, 2, 23, 23, 0, tzinfo=tzutc())),
            (-1, datetime.datetime(2006, 12, 28, 23, 0, tzinfo=tzutc())),
        )
        for index, expected in expectations:
            self.assertEqual(occurrences[index], expected)
项目:rpc-gating    作者:rcbops    | 项目源码 | 文件源码
def cleanup_instances(self):
        """Cleanup Instances.

        Delete instances if they are in an error state or are over the
        age defined in INSTANCE_AGE_LIMIT.

        Instances that don't match PROTECTED_PREFIX are cleaned up

        Concretely: every server in ``self.unprotected_servers`` whose
        status is ``"ERROR"`` or whose age exceeds ``self.age_limit`` hours
        is reported via ``_indp`` and deleted through
        ``self.conn.compute.delete_server``.
        """
        # Timezone-aware "now" in UTC, used as the reference for every age.
        current_time = datetime.datetime.now(tzutc())
        # self.age_limit is interpreted as a number of hours.
        max_age = datetime.timedelta(hours=self.age_limit)

        for server in self.unprotected_servers:
            # NOTE(review): the subtraction below needs created_at to parse
            # to a timezone-aware datetime -- confirm the API always
            # includes an offset.
            created_time = dateutil.parser.parse(server.created_at)
            age = current_time - created_time
            errored = server.status == "ERROR"
            # Errored servers are deleted regardless of their age.
            if errored or age > max_age:
                _indp("Deleting {name} Errored: {error} Age: {age}".format(
                    name=server.name, error=errored, age=age))
                self.conn.compute.delete_server(server.id)