我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用datetime.time()。
def __preptime(self,when): """ Extract information in a suitable format from when, a datetime.datetime object. """ # datetime days are numbered in the Gregorian calendar # while the calculations from NOAA are distibuted as # OpenOffice spreadsheets with days numbered from # 1/1/1900. The difference are those numbers taken for # 18/12/2010 self.day = when.toordinal()-(734124-40529) t=when.time() self.time= (t.hour + t.minute/60.0 + t.second/3600.0)/24.0 self.timezone=0 offset=when.utcoffset() if not offset is None: self.timezone=offset.seconds/3600.0
def get_minute_in_day_to_message_statistic(self, message_statistic): """ Maps each minute in a day to the sum of the values of a message statistic over all messages from that minute. Args: message_statistic: A function mapping a Message object to an int or a float. Returns: minute_in_day_to_message_statistic: A dict mapping a time object representing a minute in a day to the sum of the values of message_statistic over all messages in self.messages from that minute. """ minute_in_day_to_message_statistic = {} for hour in range(self.HOURS_PER_DAY): for minute in range(self.MINUTES_PER_HOUR): minute_in_day = time(hour, minute) minute_in_day_to_message_statistic[minute_in_day] = 0 for message in self.messages: minute_in_day = time(message.timestamp.hour, message.timestamp.minute) minute_in_day_to_message_statistic[minute_in_day] += message_statistic(message) return minute_in_day_to_message_statistic
def n_messages_chi_square(self, time_interval): """ Computes a chi square test against the null hypothesis that the number of messages is uniformly distributed across the time interval. Only makes sense for the time intervals 'minute in hour', 'minute in day', 'hour' since those ones have a fixed number of values. Args: time_interval: One of 'minute in hour', 'minute in day', 'hour'. Returns: chisq: A float representing the chi square statistic where the observations consist of the number of messages in each value of time_interval and the null hypothesis is that the number of messages is uniformly distributed. p: A float representing the p-value of the chi square test. """ valid_time_intervals = ['minute in hour', 'minute in day', 'hour'] if time_interval not in valid_time_intervals: raise ValueError('time_interval must be in {}'.format(valid_time_intervals)) result = chisquare(self.get_n_messages_in_time_interval(time_interval)) return (result.statistic, result.pvalue)
def QA_data_tick_resample(tick, type_='1min'): data = tick['price'].resample( type_, label='right', closed='left').ohlc() data['volume'] = tick['vol'].resample( type_, label='right', closed='left').sum() data['code'] = tick['code'][0] __data_ = pd.DataFrame() _temp = tick.drop_duplicates('date')['date'] for item in _temp: __data = data[item] _data = __data[time(9, 31):time(11, 30)].append( __data[time(13, 1):time(15, 0)]) __data_ = __data_.append(_data) __data_['datetime'] = __data_.index __data_['date'] = __data_['datetime'].apply(lambda x: str(x)[0:10]) return __data_.fillna(method='ffill').set_index(['datetime', 'code'], drop=False)
def _check_annotations(value): """ Recursively check that value is either of a "simple" type (number, string, date/time) or is a (possibly nested) dict, list or numpy array containing only simple types. """ if isinstance(value, np.ndarray): if not issubclass(value.dtype.type, ALLOWED_ANNOTATION_TYPES): raise ValueError("Invalid annotation. NumPy arrays with dtype %s" "are not allowed" % value.dtype.type) elif isinstance(value, dict): for element in value.values(): _check_annotations(element) elif isinstance(value, (list, tuple)): for element in value: _check_annotations(element) elif not isinstance(value, ALLOWED_ANNOTATION_TYPES): raise ValueError("Invalid annotation. Annotations of type %s are not" "allowed" % type(value))
def __init__(self, value): """ Initializer value can be: - integer_type: absolute nanoseconds in the day - datetime.time: built-in time - string_type: a string time of the form "HH:MM:SS[.mmmuuunnn]" """ if isinstance(value, six.integer_types): self._from_timestamp(value) elif isinstance(value, datetime.time): self._from_time(value) elif isinstance(value, six.string_types): self._from_timestring(value) else: raise TypeError('Time arguments must be a whole number, datetime.time, or string')
def __init__(self, value): """ Initializer value can be: - integer_type: absolute days from epoch (1970, 1, 1). Can be negative. - datetime.date: built-in date - string_type: a string time of the form "yyyy-mm-dd" """ if isinstance(value, six.integer_types): self.days_from_epoch = value elif isinstance(value, (datetime.date, datetime.datetime)): self._from_timetuple(value.timetuple()) elif isinstance(value, six.string_types): self._from_datestring(value) else: raise TypeError('Date arguments must be a whole number, datetime.date, or string')
def test_time(): """Test a simple timeline""" time_chart = TimeLine(truncate_label=1000) time_chart.add('times', [ (time(1, 12, 29), 2), (time(21, 2, 29), 10), (time(12, 30, 59), 7) ]) q = time_chart.render_pyquery() assert list( map(lambda t: t.split(' ')[0], q(".axis.x text").map(texts))) == [ '02:46:40', '05:33:20', '08:20:00', '11:06:40', '13:53:20', '16:40:00', '19:26:40']
def bind_processor(self, dialect): datetime_time = datetime.time format = self._storage_format def process(value): if value is None: return None elif isinstance(value, datetime_time): return format % { 'hour': value.hour, 'minute': value.minute, 'second': value.second, 'microsecond': value.microsecond, } else: raise TypeError("SQLite Time type only accepts Python " "time objects as input.") return process
def __init__(self, isolation_level=None, native_datetime=False, **kwargs): default.DefaultDialect.__init__(self, **kwargs) self.isolation_level = isolation_level # this flag used by pysqlite dialect, and perhaps others in the # future, to indicate the driver is handling date/timestamp # conversions (and perhaps datetime/time as well on some hypothetical # driver ?) self.native_datetime = native_datetime if self.dbapi is not None: self.supports_default_values = ( self.dbapi.sqlite_version_info >= (3, 3, 8)) self.supports_cast = ( self.dbapi.sqlite_version_info >= (3, 2, 3)) self.supports_multivalues_insert = ( # http://www.sqlite.org/releaselog/3_7_11.html self.dbapi.sqlite_version_info >= (3, 7, 11)) # see http://www.sqlalchemy.org/trac/ticket/2568 # as well as http://www.sqlite.org/src/info/600482d161 self._broken_fk_pragma_quotes = ( self.dbapi.sqlite_version_info < (3, 6, 14))
def result_processor(self, dialect, coltype): time = datetime.time def process(value): # convert from a timedelta value if value is not None: microseconds = value.microseconds seconds = value.seconds minutes = seconds // 60 return time(minutes // 60, minutes % 60, seconds - minutes * 60, microsecond=microseconds) else: return None return process
def default(self, o, dates=(datetime.datetime, datetime.date), times=(datetime.time,), textual=(decimal.Decimal, uuid.UUID, DjangoPromise), isinstance=isinstance, datetime=datetime.datetime, text_type=text_type): if isinstance(o, dates): if not isinstance(o, datetime): o = datetime(o.year, o.month, o.day, 0, 0, 0, 0) r = o.isoformat() if r.endswith("+00:00"): r = r[:-6] + "Z" return r elif isinstance(o, times): return o.isoformat() elif isinstance(o, textual): return text_type(o) else: return super(JsonEncoder, self).default(o)
def test_escape_any(self): now = datetime.datetime.now() self.assertEqual(escape_any('foo\n\r\\bar'), r'"foo\n\r\\bar"') self.assertEqual(escape_any(now), '"%s"^^xsd:dateTime' % now.isoformat()) self.assertEqual(escape_any(now.date()), '"%s"^^xsd:date' % now.date().isoformat()) self.assertEqual(escape_any(now.time()), '"%s"^^xsd:time' % now.time().isoformat()) self.assertEqual(escape_any(True), 'true') self.assertEqual(escape_any(5), '5') self.assertEqual(escape_any(Decimal(5.5)), '5.5') self.assertEqual(escape_any(5.5), '"5.5"^^xsd:double') self.assertEqual(escape_any(RDFTerm("raw")), 'raw') self.assertEqual(escape_any(Node("subject", {})), 'subject') with self.assertRaises(TypeError): escape_any(int)
def put(self, job, next_t=None): """Queue a new job. Args: job (telegram.ext.Job): The ``Job`` instance representing the new job next_t (Optional[int, float, datetime.timedelta, datetime.datetime, datetime.time]): Time in or at which the job should run for the first time. This parameter will be interpreted depending on its type. ``int`` or ``float`` will be interpreted as "seconds from now" in which the job should run. ``datetime.timedelta`` will be interpreted as "time from now" in which the job should run. ``datetime.datetime`` will be interpreted as a specific date and time at which the job should run. ``datetime.time`` will be interpreted as a specific time at which the job should run. This could be either today or, if the time has already passed, tomorrow. """ warnings.warn("'JobQueue.put' is being deprecated, use 'JobQueue.run_once', " "'JobQueue.run_daily' or 'JobQueue.run_repeating' instead") if job.job_queue is None: job.job_queue = self self._put(job, next_t=next_t)
def _main_loop(self): """ Thread target of thread ``job_queue``. Runs in background and performs ticks on the job queue. """ while self._running: # self._next_peek may be (re)scheduled during self.tick() or self.put() with self.__next_peek_lock: tmout = self._next_peek - time.time() if self._next_peek else None self._next_peek = None self.__tick.clear() self.__tick.wait(tmout) # If we were woken up by self.stop(), just bail out if not self._running: break self.tick() self.logger.debug('%s thread stopped', self.__class__.__name__)
def test_serial(): assert s.serialize_value(None) == 'x' assert s.serialize_value(True) == 'true' assert s.serialize_value(False) == 'false' assert s.serialize_value(5) == 'i:5' assert s.serialize_value(5.0) == 'f:5.0' assert s.serialize_value(decimal.Decimal('5.5')) == 'n:5.5' assert s.serialize_value('abc') == 's:abc' assert s.serialize_value(b'abc') == 'b:YWJj' assert s.serialize_value(b'abc') == 'b:YWJj' assert s.serialize_value(datetime.date(2007, 12, 5)) == 'd:2007-12-05' assert s.serialize_value(datetime.datetime(2007, 12, 5, 12, 30, 30, tzinfo=utc)) \ == 'dt:2007-12-05 12:30:30+00:00' assert s.serialize_value(datetime.time(12, 34, 56)) == 't:12:34:56' with raises(NotImplementedError): s.serialize_value(csv.reader)
def __excel_date_dt(self, date): adj = False if isinstance(date, dt.date): if self.__parent_wb.dates_1904: epoch_tuple = (1904, 1, 1) else: epoch_tuple = (1899, 12, 31) adj = True if isinstance(date, dt.datetime): epoch = dt.datetime(*epoch_tuple) else: epoch = dt.date(*epoch_tuple) else: # it's a datetime.time instance date = dt.datetime.combine(dt.datetime(1900, 1, 1), date) epoch = dt.datetime(1900, 1, 1) delta = date - epoch xldate = delta.days + delta.seconds / 86400.0 # Add a day for Excel's missing leap day in 1900 if adj and xldate > 59: xldate += 1 return xldate
def readDate(self): """ Reads a UTC date from the data stream. Client and servers are responsible for applying their own timezones. Date: C{0x0B T7 T6} .. C{T0 Z1 Z2 T7} to C{T0} form a 64 bit Big Endian number that specifies the number of nanoseconds that have passed since 1/1/1970 0:00 to the specified time. This format is UTC 1970. C{Z1} and C{Z0} for a 16 bit Big Endian number indicating the indicated time's timezone in minutes. """ ms = self.stream.read_double() / 1000.0 self.stream.read_short() # tz # Timezones are ignored d = util.get_datetime(ms) if self.timezone_offset: d = d + self.timezone_offset self.context.addObject(d) return d
def writeDate(self, d): """ Writes a date to the data stream. @type d: Instance of C{datetime.datetime} @param d: The date to be encoded to the AMF0 data stream. """ if isinstance(d, datetime.time): raise pyamf.EncodeError( 'A datetime.time instance was found but AMF0 has no way to ' 'encode time objects. Please use datetime.datetime instead ' '(got:%r)' % (d,) ) # According to the Red5 implementation of AMF0, dates references are # created, but not used. if self.timezone_offset is not None: d -= self.timezone_offset secs = util.get_timestamp(d) tz = 0 self.writeType(TYPE_DATE) self.stream.write_double(secs * 1000.0) self.stream.write_short(tz)
def get_timestamp(d): """ Returns a UTC timestamp for a C{datetime.datetime} object. @type d: C{datetime.datetime} @return: UTC timestamp. @rtype: C{float} @see: Inspiration taken from the U{Intertwingly blog <http://intertwingly.net/blog/2007/09/02/Dealing-With-Dates>}. """ if isinstance(d, datetime.date) and not isinstance(d, datetime.datetime): d = datetime.datetime.combine(d, datetime.time(0, 0, 0, 0)) msec = str(d.microsecond).rjust(6).replace(' ', '0') return float('%s.%s' % (calendar.timegm(d.utctimetuple()), msec))
def date2values(date): ''' Convert a date object into values for create or write ''' res = {} if not isinstance(date, datetime.datetime): res['date'] = True res['datetime'] = datetime.datetime.combine(date, datetime.time()) else: res['date'] = False if date.tzinfo: res['datetime'] = date.astimezone(tzlocal) else: res['datetime'] = date return res
def sql_format(value): if isinstance(value, (Query, Expression)): return value if value is None: return None if isinstance(value, basestring): year, month, day = map(int, value.split("-", 2)) return datetime.date(year, month, day) assert(isinstance(value, datetime.date)) # Allow datetime with min time for XML-RPC # datetime must be tested separately because datetime is a # subclass of date assert(not isinstance(value, datetime.datetime) or value.time() == datetime.time()) return value
def test_parse_time_timezone(self): self.check_time_tz("+01", 3600) self.check_time_tz("-01", -3600) self.check_time_tz("+01:15", 4500) self.check_time_tz("-01:15", -4500) # The Python datetime module does not support time zone # offsets that are not a whole number of minutes. # We round the offset to the nearest minute. self.check_time_tz("+01:15:00", 60 * (60 + 15)) self.check_time_tz("+01:15:29", 60 * (60 + 15)) self.check_time_tz("+01:15:30", 60 * (60 + 16)) self.check_time_tz("+01:15:59", 60 * (60 + 16)) self.check_time_tz("-01:15:00", -60 * (60 + 15)) self.check_time_tz("-01:15:29", -60 * (60 + 15)) self.check_time_tz("-01:15:30", -60 * (60 + 16)) self.check_time_tz("-01:15:59", -60 * (60 + 16))
def check_datetime_tz(self, str_offset, offset): from datetime import datetime, timedelta base = datetime(2007, 1, 1, 13, 30, 29) base_str = '2007-01-01 13:30:29' value = self.DATETIME(base_str + str_offset, self.curs) # Value has time zone info and correct UTC offset. self.assertNotEqual(value.tzinfo, None), self.assertEqual(value.utcoffset(), timedelta(seconds=offset)) # Datetime is correct. self.assertEqual(value.replace(tzinfo=None), base) # Conversion to UTC produces the expected offset. UTC = FixedOffsetTimezone(0, "UTC") value_utc = value.astimezone(UTC).replace(tzinfo=None) self.assertEqual(base - value_utc, timedelta(seconds=offset))
def test_parse_datetime_timezone(self): self.check_datetime_tz("+01", 3600) self.check_datetime_tz("-01", -3600) self.check_datetime_tz("+01:15", 4500) self.check_datetime_tz("-01:15", -4500) # The Python datetime module does not support time zone # offsets that are not a whole number of minutes. # We round the offset to the nearest minute. self.check_datetime_tz("+01:15:00", 60 * (60 + 15)) self.check_datetime_tz("+01:15:29", 60 * (60 + 15)) self.check_datetime_tz("+01:15:30", 60 * (60 + 16)) self.check_datetime_tz("+01:15:59", 60 * (60 + 16)) self.check_datetime_tz("-01:15:00", -60 * (60 + 15)) self.check_datetime_tz("-01:15:29", -60 * (60 + 15)) self.check_datetime_tz("-01:15:30", -60 * (60 + 16)) self.check_datetime_tz("-01:15:59", -60 * (60 + 16))
def escape_parameter(v): if v is None: return 'NULL' t = type(v) if t == str: return u"'" + v.replace(u"'", u"''") + u"'" elif t == bool: return u"TRUE" if v else u"FALSE" elif t == time.struct_time: return u'%04d-%02d-%02d %02d:%02d:%02d' % ( v.tm_year, v.tm_mon, v.tm_mday, v.tm_hour, v.tm_min, v.tm_sec) elif t == datetime.datetime: return "timestamp '" + v.isoformat() + "'" elif t == datetime.date: return "date '" + str(v) + "'" elif t == datetime.timedelta: return u"interval '" + str(v) + "'" elif t == int or t == float: return str(v) elif t == decimal.Decimal: return "decimal '" + str(v) + "'" else: return "'" + str(v) + "'"
def value(self): """ Returns the value for this BoundField, using the initial value if the form is not bound or the data otherwise. """ if not self.form.is_bound: data = self.form.initial.get(self.name, self.field.initial) if callable(data): if self._initial_value is not UNSET: data = self._initial_value else: data = data() # If this is an auto-generated default date, nix the # microseconds for standardized handling. See #22502. if (isinstance(data, (datetime.datetime, datetime.time)) and not self.field.widget.supports_microseconds): data = data.replace(microsecond=0) self._initial_value = data else: data = self.field.bound_data( self.data, self.form.initial.get(self.name, self.field.initial) ) return self.field.prepare_value(data)
def adapt_unknown_value(self, value): """ Transforms a value to something compatible with the backend driver. This method only depends on the type of the value. It's designed for cases where the target type isn't known, such as .raw() SQL queries. As a consequence it may not work perfectly in all circumstances. """ if isinstance(value, datetime.datetime): # must be before date return self.adapt_datetimefield_value(value) elif isinstance(value, datetime.date): return self.adapt_datefield_value(value) elif isinstance(value, datetime.time): return self.adapt_timefield_value(value) elif isinstance(value, decimal.Decimal): return self.adapt_decimalfield_value(value) else: return value
def executemany(self, sql, param_list): start = time() try: return super(CursorDebugWrapper, self).executemany(sql, param_list) finally: stop = time() duration = stop - start try: times = len(param_list) except TypeError: # param_list could be an iterator times = '?' self.db.queries_log.append({ 'sql': '%s times: %s' % (times, sql), 'time': "%.3f" % duration, }) logger.debug('(%.3f) %s; args=%s' % (duration, sql, param_list), extra={'duration': duration, 'sql': sql, 'params': param_list} ) ############################################### # Converters from database (string) to Python # ###############################################
def get_prep_value(self, value): value = super(DateTimeField, self).get_prep_value(value) value = self.to_python(value) if value is not None and settings.USE_TZ and timezone.is_naive(value): # For backwards compatibility, interpret naive datetimes in local # time. This won't work during DST change, but we can't do much # about it, so we let the exceptions percolate up the call stack. try: name = '%s.%s' % (self.model.__name__, self.name) except AttributeError: name = '(unbound)' warnings.warn("DateTimeField %s received a naive datetime (%s)" " while time zone support is active." % (name, value), RuntimeWarning) default_timezone = timezone.get_default_timezone() value = timezone.make_aware(value, default_timezone) return value
def localize(value, use_l10n=None): """ Checks if value is a localizable type (date, number...) and returns it formatted as a string using current locale format. If use_l10n is provided and is not None, that will force the value to be localized (or not), overriding the value of settings.USE_L10N. """ if isinstance(value, bool): return mark_safe(six.text_type(value)) elif isinstance(value, (decimal.Decimal, float) + six.integer_types): return number_format(value, use_l10n=use_l10n) elif isinstance(value, datetime.datetime): return date_format(value, 'DATETIME_FORMAT', use_l10n=use_l10n) elif isinstance(value, datetime.date): return date_format(value, use_l10n=use_l10n) elif isinstance(value, datetime.time): return time_format(value, 'TIME_FORMAT', use_l10n=use_l10n) else: return value
def localize_input(value, default=None): """ Checks if an input value is a localizable type and returns it formatted with the appropriate formatting string of the current locale. """ if isinstance(value, (decimal.Decimal, float) + six.integer_types): return number_format(value) elif isinstance(value, datetime.datetime): value = datetime_safe.new_datetime(value) format = force_str(default or get_format('DATETIME_INPUT_FORMATS')[0]) return value.strftime(format) elif isinstance(value, datetime.date): value = datetime_safe.new_date(value) format = force_str(default or get_format('DATE_INPUT_FORMATS')[0]) return value.strftime(format) elif isinstance(value, datetime.time): format = force_str(default or get_format('TIME_INPUT_FORMATS')[0]) return value.strftime(format) return value
def parse_time(value): """Parses a string and return a datetime.time. This function doesn't support time zone offsets. Raises ValueError if the input is well formatted but not a valid time. Returns None if the input isn't well formatted, in particular if it contains an offset. """ match = time_re.match(value) if match: kw = match.groupdict() if kw['microsecond']: kw['microsecond'] = kw['microsecond'].ljust(6, '0') kw = {k: int(v) for k, v in six.iteritems(kw) if v is not None} return datetime.time(**kw)
def display_for_value(value, empty_value_display, boolean=False): from django.contrib.admin.templatetags.admin_list import _boolean_icon if boolean: return _boolean_icon(value) elif value is None: return empty_value_display elif isinstance(value, datetime.datetime): return formats.localize(timezone.template_localtime(value)) elif isinstance(value, (datetime.date, datetime.time)): return formats.localize(value) elif isinstance(value, six.integer_types + (decimal.Decimal, float)): return formats.number_format(value) elif isinstance(value, (list, tuple)): return ', '.join(force_text(v) for v in value) else: return smart_text(value)
def getDaysFromTimes(times): days = set() for time in times: try: for day in time.get("days"): if type(day) == int: days.add(day % 7) except: pass return sorted(days) ## ## @brief Convert days to their text representation in a list. ## ## @param days (int)(list) The days ## ## @return A list of strings. ##
def parse_time(time_string): try: return datetime.datetime.strptime(time_string, "%I:%M%p").time() except: pass try: return datetime.datetime.strptime(time_string, "%H:%M").time() except: return None ## ## @brief Get the mini term. ## ## @return (int) The current mini if no date is provided. ##
def db_value(self, value): if value is None: return if isinstance(value, datetime.datetime): pass elif isinstance(value, datetime.date): value = datetime.datetime(value.year, value.month, value.day) else: return int(round(value * self.resolution)) if self.utc: timestamp = calendar.timegm(value.utctimetuple()) else: timestamp = time.mktime(value.timetuple()) timestamp += (value.microsecond * .000001) if self.resolution > 1: timestamp *= self.resolution return int(round(timestamp))
def __init__(self): """SecurityOptions() Initialize. """ # I don't believe any of these types can ever pose a security hazard, # except perhaps "reference"... self.allowedTypes = {"None": 1, "bool": 1, "boolean": 1, "string": 1, "str": 1, "int": 1, "float": 1, "datetime": 1, "time": 1, "date": 1, "timedelta": 1, "NoneType": 1} if hasattr(types, 'UnicodeType'): self.allowedTypes['unicode'] = 1 self.allowedModules = {} self.allowedClasses = {}
def occurrences(self): import datetime from dateutil import rrule datelist = [] if self.rrule: rr = rrule.rrulestr(self.rrule, dtstart=self.start_date) start = datetime.datetime.combine(self.start_date, datetime.datetime.min.time()) end = start + datetime.timedelta(days=365) datelist = rr.between(start, end, inc=True) else: if self.start_date >= datetime.date.today(): datelist.append(datetime.datetime.combine(self.start_date, datetime.time(0, 0, 0))) if not self.start_time: return datelist return [datetime.datetime.combine(x.date(), self.start_time) for x in datelist]
def format_time(hour_tuple): """ Convert to a 12hour time """ hour = hour_tuple[0] duration = hour_tuple[1] if hour is None: return '' elif isinstance(hour, datetime.time): time = _parse_time_to_string(hour) if duration: end_hour = _add_minutes(hour, duration) end_time = _parse_time_to_string(end_hour) time = '%s - %s' % (time, end_time) return time else: try: if hour < 12: return "%s AM" % str(hour) elif hour == 12: return "%s PM" % str(hour) else: return "%s PM" % str(hour - 12) except ValueError: return hour
def make_schedule_array(event): """ Convert an event's schedules into a schedule_array """ # Since the schedules in the event are time-fixed, events with multiple # times will have the same start_date, rrule and place combination # This groups the times according to the start_date/rrule combination # { # (<start_date>, "<rrule>", <place>): [<start_time>, ] # } schedules = OrderedDict() for s in event.dates.all().order_by("start_date", "start_time"): key = (s.start_date, s.rrule, s.place, s.more_info_url) if key in schedules: schedules[key].append((s.start_time, s.duration)) else: schedules[key] = [(s.start_time, s.duration), ] return schedules
def at(self, time_str): """Schedule the job every day at a specific time. Calling this is only valid for jobs scheduled to run every N day(s). """ assert self.unit in ('days', 'hours') or self.start_day hour, minute = time_str.split(':') minute = int(minute) if self.unit == 'days' or self.start_day: hour = int(hour) assert 0 <= hour <= 23 elif self.unit == 'hours': hour = 0 assert 0 <= minute <= 59 self.at_time = datetime.time(hour, minute) return self
def do(self, job_func, *args, **kwargs): """Specifies the job_func that should be called every time the job runs. Any additional arguments are passed on to job_func when the job runs. """ self.job_func = functools.partial(job_func, *args, **kwargs) try: functools.update_wrapper(self.job_func, job_func) except AttributeError: # job_funcs already wrapped by functools.partial won't have # __name__, __module__ or __doc__ and the update_wrapper() # call will fail. pass self._schedule_next_run() return self
def get_time_interval_to_message_statistic(self, time_interval, message_statistic): """ Maps each value of time interval to the sum of the values of a message statistic over all messages from that time interval value. Wrapper function for the functions 'get_' + time_interval.replace(' ', '_') + '_to_message_statistic'. Args: time_interval: One of 'minute in hour', 'minute in day', 'hour', 'date', 'week', 'month', 'year'. message_statistic: A function mapping a Message object to an int or a float. Returns: time_interval_to_message_statistic: A dict mapping each value of a time interval to the sum of the values of message_statistic over all messages in self.messages from that time interval value. """ if time_interval not in self.TIME_INTERVALS: raise ValueError('time_interval must be in {}'.format(self.TIME_INTERVALS)) getter = getattr(self, 'get_' + time_interval.replace(' ', '_') + '_to_message_statistic') time_interval_to_message_statistic = getter(message_statistic) return time_interval_to_message_statistic
def ensure_utc(time, tz='UTC'): """ Normalize a time. If the time is tz-naive, assume it is UTC. """ if not time.tzinfo: time = time.replace(tzinfo=pytz.timezone(tz)) return time.replace(tzinfo=pytz.utc)
def _build_time(time, kwargs): """ Builds the time argument for event rules. """ tz = kwargs.pop('tz', 'UTC') if time: if kwargs: raise ValueError('Cannot pass kwargs and a time') else: return ensure_utc(time, tz) elif not kwargs: raise ValueError('Must pass a time or kwargs') else: return datetime.time(**kwargs)