我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用pytz.AmbiguousTimeError()。
def timezone_offset(timezone): """Takes a tz name like 'US/Eastern' and returns 'US/Eastern - UTC-05:00'. Intended to be used by the timezone notification page fragment. The pytz package should gracefully handle DST so the above will render 'US/Eastern - UTC-04:00' when DST is in effect. """ if timezone == 'UTC': return 'UTC' now = datetime.datetime.now() try: seconds = pytz.timezone(timezone).utcoffset(now).total_seconds() except (pytz.NonExistentTimeError, pytz.AmbiguousTimeError): # If we're in the midst of a DST transition, add an hour and try again. now = now + datetime.timedelta(hours=1) seconds = pytz.timezone(timezone).utcoffset(now).total_seconds() sign = '+' if seconds < 0: # The minus sign is added automatically! sign = '' hours, remainder = divmod(seconds, 60*60) minutes, _ = divmod(remainder, 60) offset = '%02d:%02d' % (hours, minutes) display = '%s (UTC%s%s)' % (timezone, sign, offset) return display
def test_tz_localize_dti(self): dti = DatetimeIndex(start='1/1/2005', end='1/1/2005 0:00:30.256', freq='L') dti2 = dti.tz_localize(self.tzstr('US/Eastern')) dti_utc = DatetimeIndex(start='1/1/2005 05:00', end='1/1/2005 5:00:30.256', freq='L', tz='utc') self.assert_numpy_array_equal(dti2.values, dti_utc.values) dti3 = dti2.tz_convert(self.tzstr('US/Pacific')) self.assert_numpy_array_equal(dti3.values, dti_utc.values) dti = DatetimeIndex(start='11/6/2011 1:59', end='11/6/2011 2:00', freq='L') self.assertRaises(pytz.AmbiguousTimeError, dti.tz_localize, self.tzstr('US/Eastern')) dti = DatetimeIndex(start='3/13/2011 1:59', end='3/13/2011 2:00', freq='L') self.assertRaises(pytz.NonExistentTimeError, dti.tz_localize, self.tzstr('US/Eastern'))
def test_with_tz_ambiguous_times(self): tz = self.tz('US/Eastern') # March 13, 2011, spring forward, skip from 2 AM to 3 AM dr = date_range(datetime(2011, 3, 13, 1, 30), periods=3, freq=datetools.Hour()) self.assertRaises(pytz.NonExistentTimeError, dr.tz_localize, tz) # after dst transition, it works dr = date_range(datetime(2011, 3, 13, 3, 30), periods=3, freq=datetools.Hour(), tz=tz) # November 6, 2011, fall back, repeat 2 AM hour dr = date_range(datetime(2011, 11, 6, 1, 30), periods=3, freq=datetools.Hour()) self.assertRaises(pytz.AmbiguousTimeError, dr.tz_localize, tz) # UTC is OK dr = date_range(datetime(2011, 3, 13), periods=48, freq=datetools.Minute(30), tz=pytz.utc)
def _CombineDateAndTime(date, time, tzinfo): """Creates a datetime object from date and time objects. This is similar to the datetime.combine method, but its timezone calculations are designed to work with pytz. Arguments: date: a datetime.date object, in any timezone time: a datetime.time object, in any timezone tzinfo: a pytz timezone object, or None Returns: a datetime.datetime object, in the timezone 'tzinfo' """ naive_result = datetime.datetime( date.year, date.month, date.day, time.hour, time.minute, time.second) if tzinfo is None: return naive_result try: return tzinfo.localize(naive_result, is_dst=None) except AmbiguousTimeError: return min(tzinfo.localize(naive_result, is_dst=True), tzinfo.localize(naive_result, is_dst=False)) except NonExistentTimeError: while True: naive_result += datetime.timedelta(minutes=1) try: return tzinfo.localize(naive_result, is_dst=None) except NonExistentTimeError: pass
def parse_time(input_time): """Parse an Atom time stamp.""" parsed = None try: parsed = timezone.make_aware(timezone.datetime(*input_time[:-3]), timezone.utc) except (pytz.NonExistentTimeError, pytz.AmbiguousTimeError): added_hour = (timezone.datetime(*input_time[:-3]) + datetime.timedelta(hours=1)) parsed = timezone.make_aware(added_hour, timezone.utc) return parsed.astimezone(TZ)
def test_end_dst_double_occurrence(self): cron_expression = "30 1 * * * *" timezone = pytz.timezone("Europe/London") start = dt.datetime.strptime('2015-10-25T00:00:00', "%Y-%m-%dT%H:%M:%S") start = timezone.localize(start, is_dst=True) testee = tzcron.Schedule(cron_expression, timezone, start) self.assertRaises(pytz.AmbiguousTimeError, lambda: next(testee))
def test_ambiguous_infer(self): # November 6, 2011, fall back, repeat 2 AM hour # With no repeated hours, we cannot infer the transition tz = self.tz('US/Eastern') dr = date_range(datetime(2011, 11, 6, 0), periods=5, freq=datetools.Hour()) self.assertRaises(pytz.AmbiguousTimeError, dr.tz_localize, tz) # With repeated hours, we can infer the transition dr = date_range(datetime(2011, 11, 6, 0), periods=5, freq=datetools.Hour(), tz=tz) times = ['11/06/2011 00:00', '11/06/2011 01:00', '11/06/2011 01:00', '11/06/2011 02:00', '11/06/2011 03:00'] di = DatetimeIndex(times) localized = di.tz_localize(tz, ambiguous='infer') self.assert_numpy_array_equal(dr, localized) with tm.assert_produces_warning(FutureWarning): localized_old = di.tz_localize(tz, infer_dst=True) self.assert_numpy_array_equal(dr, localized_old) self.assert_numpy_array_equal(dr, DatetimeIndex(times, tz=tz, ambiguous='infer')) # When there is no dst transition, nothing special happens dr = date_range(datetime(2011, 6, 1, 0), periods=10, freq=datetools.Hour()) localized = dr.tz_localize(tz) localized_infer = dr.tz_localize(tz, ambiguous='infer') self.assert_numpy_array_equal(localized, localized_infer) with tm.assert_produces_warning(FutureWarning): localized_infer_old = dr.tz_localize(tz, infer_dst=True) self.assert_numpy_array_equal(localized, localized_infer_old)
def infer_freq(index, warn=True): """ Infer the most likely frequency given the input index. If the frequency is uncertain, a warning will be printed. Parameters ---------- index : DatetimeIndex or TimedeltaIndex if passed a Series will use the values of the series (NOT THE INDEX) warn : boolean, default True Returns ------- freq : string or None None if no discernible frequency TypeError if the index is not datetime-like ValueError if there are less than three values. """ import pandas as pd if isinstance(index, com.ABCSeries): values = index._values if not (com.is_datetime64_dtype(values) or com.is_timedelta64_dtype(values) or values.dtype == object): raise TypeError("cannot infer freq from a non-convertible " "dtype on a Series of {0}".format(index.dtype)) index = values if com.is_period_arraylike(index): raise TypeError("PeriodIndex given. Check the `freq` attribute " "instead of using infer_freq.") elif isinstance(index, pd.TimedeltaIndex): inferer = _TimedeltaFrequencyInferer(index, warn=warn) return inferer.get_freq() if isinstance(index, pd.Index) and not isinstance(index, pd.DatetimeIndex): if isinstance(index, (pd.Int64Index, pd.Float64Index)): raise TypeError("cannot infer freq from a non-convertible index " "type {0}".format(type(index))) index = index.values if not isinstance(index, pd.DatetimeIndex): try: index = pd.DatetimeIndex(index) except AmbiguousTimeError: index = pd.DatetimeIndex(index.asi8) inferer = _FrequencyInferer(index, warn=warn) return inferer.get_freq()
def _process_task(self, task): # 1. Fetch schedule self.org = self.client.get_organization( task.data.get("organization_id")) self.loc = self.org.get_location(task.data.get("location_id")) self.role = self.loc.get_role(task.data.get("role_id")) self.sched = self.role.get_schedule(task.data.get("schedule_id")) self._compute_demand() self._subtract_existing_shifts_from_demand() # Run the calculation s = Splitter(self.demand, self.sched.data.get("min_shift_length_hour"), self.sched.data.get("max_shift_length_hour")) s.calculate() s.efficiency() # Naive becuase not yet datetimes naive_shifts = s.get_shifts() logger.info("Starting upload of %s shifts", len(naive_shifts)) local_start_time = self._get_local_start_time() for shift in naive_shifts: # We have to think of daylight savings time here, so we need to # guarantee that we don't have any errors. We do this by overshooting # the timedelta by an extra two hours, then rounding back to midnight. logger.debug("Processing shift %s", shift) start_day = normalize_to_midnight( deepcopy(local_start_time) + timedelta(days=shift["day"])) # Beware of time changes - duplicate times are possible try: start = start_day.replace(hour=shift["start"]) except pytz.AmbiguousTimeError: # Randomly pick one. Minor tech debt. start = start_day.replace(hour=shift["start"], is_dst=False) stop = start + timedelta(hours=shift["length"]) # Convert to the strings we are passing up to the cLoUd utc_start_str = start.astimezone(self.default_tz).isoformat() utc_stop_str = stop.astimezone(self.default_tz).isoformat() logger.info("Creating shift with start %s stop %s", start, stop) self.role.create_shift(start=utc_start_str, stop=utc_stop_str)
def _subtract_existing_shifts_from_demand(self): logger.info("Starting demand: %s", self.demand) demand_copy = deepcopy(self.demand) search_start = (self._get_local_start_time() - timedelta( hours=config.MAX_SHIFT_LENGTH_HOURS)).astimezone(self.default_tz) # 1 week search_end = (self._get_local_start_time() + timedelta( days=7, hours=config.MAX_SHIFT_LENGTH_HOURS) ).astimezone(self.default_tz) shifts = self.role.get_shifts(start=search_start, end=search_end) logger.info("Checking %s shifts for existing demand", len(shifts)) # Search hour by hour throughout the weeks for day in range(len(self.demand)): start_day = normalize_to_midnight(self._get_local_start_time() + timedelta(days=day)) for start in range(len(self.demand[0])): # Beware of time changes - duplicate times are possible try: start_hour = deepcopy(start_day).replace(hour=start) except pytz.AmbiguousTimeError: # Randomly pick one - cause phucket. Welcome to chomp. start_hour = deepcopy(start_day).replace( hour=start, is_dst=False) try: stop_hour = start_hour + timedelta(hours=1) except pytz.AmbiguousTimeError: stop_hour = start_hour + timedelta(hours=1, is_dst=False) # Find shift current_staffing_level = 0 for shift in shifts: shift_start = iso8601.parse_date( shift.data.get("start")).replace( tzinfo=self.default_tz) shift_stop = iso8601.parse_date( shift.data.get("stop")).replace(tzinfo=self.default_tz) if ((shift_start <= start_hour and shift_stop > stop_hour) or (shift_start >= start_hour and shift_start < stop_hour) or (shift_stop > start_hour and shift_stop <= stop_hour)): # increment staffing level during that bucket current_staffing_level += 1 logger.debug("Current staffing level at day %s time %s is %s", day, start, current_staffing_level) demand_copy[day][start] -= current_staffing_level # demand cannot be less than zero if demand_copy[day][start] < 0: demand_copy[day][start] = 0 logger.info("Demand minus existing shifts: %s", demand_copy) self.demand = demand_copy