The following 50 code examples, extracted from open-source Python projects, illustrate how to use the `datetime.datetime.max` class attribute.
def test_overflow(self):
    """Verify that timedelta arithmetic raises OverflowError at the type's limits."""
    tiny = timedelta.resolution
    # Nudging min up by one resolution step and back down stays in range ...
    td = timedelta.min + tiny
    td -= tiny  # no problem
    # ... but stepping past min in either direction must overflow.
    self.assertRaises(OverflowError, td.__sub__, tiny)
    self.assertRaises(OverflowError, td.__add__, -tiny)
    # Same dance at the upper bound.
    td = timedelta.max - tiny
    td += tiny  # no problem
    self.assertRaises(OverflowError, td.__add__, tiny)
    self.assertRaises(OverflowError, td.__sub__, -tiny)
    # Negating max overflows because -timedelta.max < timedelta.min.
    self.assertRaises(OverflowError, lambda: -timedelta.max)
    day = timedelta(1)
    # Multiplying or dividing one day far past the representable range
    # must overflow for both int and float operands.
    self.assertRaises(OverflowError, day.__mul__, 10**9)
    self.assertRaises(OverflowError, day.__mul__, 1e9)
    self.assertRaises(OverflowError, day.__truediv__, 1e-20)
    self.assertRaises(OverflowError, day.__truediv__, 1e-10)
    self.assertRaises(OverflowError, day.__truediv__, 9e-10)
def update_heartbeat():
    """POST a heartbeat to the shifthelper webservice, retrying for up to 30s.

    Returns the decoded JSON response on success, or an empty dict if all
    retries fail.
    """
    @retry(
        stop_max_delay=30000,  # 30 seconds max
        wait_exponential_multiplier=100,  # wait 2^i * 100 ms, on the i-th retry
        wait_exponential_max=1000,  # but wait 1 second per try maximum
        wrap_exception=True
    )
    def retry_fetch_fail_after_30sec():
        # NOTE(review): assumes module-level `requests` and `config` names;
        # credentials come from the 'webservice' section of the config.
        return requests.post(
            config['webservice']['shifthelperHeartbeat'],
            auth=(
                config['webservice']['user'],
                config['webservice']['password']
            )
        ).json()

    try:
        return retry_fetch_fail_after_30sec()
    except RetryError as e:
        # Best-effort heartbeat: swallow the retry failure and report no data.
        return {}
def filter_by_first_message_dt(self, start_dt=datetime.min, end_dt=datetime.max):
    """Return a copy of self keeping only conversations whose *first*
    message falls inside the closed datetime interval [start_dt, end_dt].

    Args:
        start_dt: A datetime object satisfying start_dt <= end_dt.
        end_dt: A datetime object satisfying start_dt <= end_dt.

    Returns:
        A UserConversations object equal to self after discarding every
        conversation whose first message lies outside [start_dt, end_dt].

    Raises:
        EmptyUserConversationsError: Filtering self.conversations results
            in an empty list.
    """
    if start_dt > end_dt:
        raise ValueError('Must have start_dt <= end_dt')

    def first_message_in_range(conversation):
        # Chained comparison expresses the closed-interval membership test.
        return start_dt <= conversation.messages[0].timestamp <= end_dt

    return self.filter_user_conversations(first_message_in_range)
def filter_by_last_message_dt(self, start_dt=datetime.min, end_dt=datetime.max):
    """Return a copy of self keeping only conversations whose *last*
    message falls inside the closed datetime interval [start_dt, end_dt].

    Args:
        start_dt: A datetime object satisfying start_dt <= end_dt.
        end_dt: A datetime object satisfying start_dt <= end_dt.

    Returns:
        A UserConversations object equal to self after discarding every
        conversation whose last message lies outside [start_dt, end_dt].

    Raises:
        EmptyUserConversationsError: Filtering self.conversations results
            in an empty list.
    """
    if start_dt > end_dt:
        raise ValueError('Must have start_dt <= end_dt')

    def last_message_in_range(conversation):
        # Chained comparison expresses the closed-interval membership test.
        return start_dt <= conversation.messages[-1].timestamp <= end_dt

    return self.filter_user_conversations(last_message_in_range)
def filter_by_datetime(self, start_dt=datetime.min, end_dt=datetime.max):
    """Return a copy of self where every conversation's messages are
    restricted to the closed datetime interval [start_dt, end_dt].

    Conversations left with no messages in the interval are discarded.

    Args:
        start_dt: A datetime object satisfying start_dt <= end_dt.
        end_dt: A datetime object satisfying start_dt <= end_dt.

    Returns:
        A UserConversations object equal to self after filtering each
        conversation's messages down to those with timestamps inside
        [start_dt, end_dt].

    Raises:
        EmptyUserConversationsError: Across all conversations, there is
            no message inside [start_dt, end_dt].
    """
    if start_dt > end_dt:
        raise ValueError("start_dt must be less than or equal to end_dt")

    def in_interval(message):
        # Chained comparison expresses the closed-interval membership test.
        return start_dt <= message.timestamp <= end_dt

    return self.filter_conversations(message_filter=in_interval)
def filter_by_datetime(self, start_dt=datetime.min, end_dt=datetime.max):
    """Return a copy of self keeping only the messages whose timestamps
    fall inside the closed datetime interval [start_dt, end_dt].

    Args:
        start_dt: A datetime object satisfying start_dt <= end_dt.
        end_dt: A datetime object satisfying start_dt <= end_dt.

    Returns:
        A Conversation object equal to self after filtering self.messages
        down to those with timestamps inside [start_dt, end_dt].

    Raises:
        EmptyConversationError: Filtering self.messages results in an
            empty list.
    """
    if start_dt > end_dt:
        raise ValueError("start_dt must be less than or equal to end_dt")

    def in_interval(message):
        # Chained comparison expresses the closed-interval membership test.
        return start_dt <= message.timestamp <= end_dt

    return self.filter_conversation(message_filter=in_interval)
def cast_date(value, connection):
    """Cast a PostgreSQL date output string to a datetime.date.

    The +/-infinity specials and BC dates are clamped to date.max/date.min;
    years wider than four digits are clamped to date.max.  The parse format
    is taken from the connection's DateStyle setting.
    """
    # Special infinite values map straight to the extremes of date.
    if value == '-infinity':
        return date.min
    if value == 'infinity':
        return date.max
    parts = value.split()
    # Dates before the common era cannot be represented; clamp to date.min.
    if parts[-1] == 'BC':
        return date.min
    day_text = parts[0]
    # More than ten characters means a five-or-more-digit year; clamp high.
    if len(day_text) > 10:
        return date.max
    return datetime.strptime(day_text, connection.date_format()).date()
def cast_timestamp(value, connection):
    """Cast a PostgreSQL timestamp output string to a datetime.

    Infinite and BC values are clamped to datetime.max / datetime.min, as
    are years wider than four digits.  The parse format is derived from the
    connection's DateStyle setting.
    """
    if value == '-infinity':
        return datetime.min
    if value == 'infinity':
        return datetime.max
    parts = value.split()
    # Timestamps before the common era cannot be represented; clamp low.
    if parts[-1] == 'BC':
        return datetime.min
    date_fmt = connection.date_format()
    if date_fmt.endswith('-%Y') and len(parts) > 2:
        # Postgres DateStyle: weekday, month/day, time, year (+ optional era).
        parts = parts[1:5]
        if len(parts[3]) > 4:
            # Five-or-more-digit years are out of range; clamp high.
            return datetime.max
        day_fmt = '%d %b' if date_fmt.startswith('%d') else '%b %d'
        time_fmt = '%H:%M:%S.%f' if len(parts[2]) > 8 else '%H:%M:%S'
        formats = [day_fmt, time_fmt, '%Y']
    else:
        if len(parts[0]) > 10:
            # Date portion wider than 'YYYY-MM-DD'; clamp high.
            return datetime.max
        time_fmt = '%H:%M:%S.%f' if len(parts[1]) > 8 else '%H:%M:%S'
        formats = [date_fmt, time_fmt]
    return datetime.strptime(' '.join(parts), ' '.join(formats))
def cast_timestamptz(value, connection):
    """Cast a timestamptz value.

    Parses a PostgreSQL timestamptz output string according to the
    connection's DateStyle setting, clamping +/-infinity, BC dates and
    out-of-range years to datetime.max / datetime.min.  When the runtime
    supports %z (module flag _has_timezone), the offset is parsed into an
    aware datetime; otherwise the tzinfo is attached afterwards.
    """
    if value == '-infinity':
        return datetime.min
    if value == 'infinity':
        return datetime.max
    value = value.split()
    # Timestamps before the common era cannot be represented; clamp low.
    if value[-1] == 'BC':
        return datetime.min
    fmt = connection.date_format()
    if fmt.endswith('-%Y') and len(value) > 2:
        # Postgres DateStyle: weekday, month/day, time, year, timezone.
        value = value[1:]
        if len(value[3]) > 4:
            # Five-or-more-digit years are out of range; clamp high.
            return datetime.max
        fmt = ['%d %b' if fmt.startswith('%d') else '%b %d',
               '%H:%M:%S.%f' if len(value[2]) > 8 else '%H:%M:%S', '%Y']
        value, tz = value[:-1], value[-1]
    else:
        if fmt.startswith('%Y-'):
            # ISO style: the offset is glued onto the time field.
            tz = _re_timezone.match(value[1])
            if tz:
                value[1], tz = tz.groups()
            else:
                tz = '+0000'
        else:
            value, tz = value[:-1], value[-1]
        if len(value[0]) > 10:
            return datetime.max
        fmt = [fmt, '%H:%M:%S.%f' if len(value[1]) > 8 else '%H:%M:%S']
    if _has_timezone:
        # Let strptime handle the offset directly via %z.
        value.append(_timezone_as_offset(tz))
        fmt.append('%z')
        return datetime.strptime(' '.join(value), ' '.join(fmt))
    # Fallback for runtimes without %z support: attach tzinfo manually.
    return datetime.strptime(' '.join(value), ' '.join(fmt)).replace(
        tzinfo=_get_timezone(tz))
def testTruncateRestart(self):
    """Verify truncate(restart=True) resets the table's serial sequence."""
    truncate = self.db.truncate
    # restart must be a boolean.
    self.assertRaises(TypeError, truncate, 'test_table', restart='invalid')
    query = self.db.query
    self.createTable('test_table', 'n serial, t text')
    for n in range(3):
        query("insert into test_table (t) values ('test')")
    q = "select count(n), min(n), max(n) from test_table"
    r = query(q).getresult()[0]
    self.assertEqual(r, (3, 1, 3))
    # Plain truncate empties the table but keeps the sequence position ...
    truncate('test_table')
    r = query(q).getresult()[0]
    self.assertEqual(r, (0, None, None))
    for n in range(3):
        query("insert into test_table (t) values ('test')")
    r = query(q).getresult()[0]
    # ... so new rows continue numbering at 4.
    self.assertEqual(r, (3, 4, 6))
    # truncate with restart=True rewinds the sequence back to 1.
    truncate('test_table', restart=True)
    r = query(q).getresult()[0]
    self.assertEqual(r, (0, None, None))
    for n in range(3):
        query("insert into test_table (t) values ('test')")
    r = query(q).getresult()[0]
    self.assertEqual(r, (3, 1, 3))
def testDate(self):
    """Verify date round-trips and extremes under every DateStyle setting."""
    query = self.db.query
    for datestyle in ('ISO', 'Postgres, MDY', 'Postgres, DMY',
                      'SQL, MDY', 'SQL, DMY', 'German'):
        self.db.set_parameter('datestyle', datestyle)
        # An ordinary date must round-trip unchanged.
        d = date(2016, 3, 14)
        q = "select $1::date"
        r = query(q, (d,)).getresult()[0][0]
        self.assertIsInstance(r, date)
        self.assertEqual(r, d)
        # Out-of-range years and BC dates clamp to date.max / date.min.
        q = "select '10000-08-01'::date, '0099-01-08 BC'::date"
        r = query(q).getresult()[0]
        self.assertIsInstance(r[0], date)
        self.assertIsInstance(r[1], date)
        self.assertEqual(r[0], date.max)
        self.assertEqual(r[1], date.min)
        # The special +/-infinity values also map to the extremes.
        q = "select 'infinity'::date, '-infinity'::date"
        r = query(q).getresult()[0]
        self.assertIsInstance(r[0], date)
        self.assertIsInstance(r[1], date)
        self.assertEqual(r[0], date.max)
        self.assertEqual(r[1], date.min)
def transaction_list(self, start=datetime.min, end=datetime.max,
                     format=ReportFormat.printout, component_path="",
                     output_path=None):
    """Generate a transaction list report.

    :param start: The start date to generate the report for.
    :param end: The end date to generate the report for.
    :param format: The format of the report.
    :param component_path: The path of the component to filter the
        report's transactions by.
    :param output_path: The path to the file the report is written to.
        If None, then the report is not written to a file.

    :returns: The generated report.
    """
    # Build the report over [start, end] and render it in one expression.
    return TransactionList(
        self, start, end, component_path, output_path).render(format)
def income_statement(self, start=datetime.min, end=datetime.max,
                     format=ReportFormat.printout, component_path="",
                     output_path=None):
    """Generate an income statement report.

    (Docstring fixed: it previously said "transaction list report",
    copy-pasted from the sibling method.)

    :param start: The start date to generate the report for.
    :param end: The end date to generate the report for.
    :param format: The format of the report.
    :param component_path: The path of the component to filter the
        report's transactions by.
    :param output_path: The path to the file the report is written to.
        If None, then the report is not written to a file.

    :returns: The generated report.
    """
    rpt = IncomeStatement(self, start, end, component_path, output_path)
    return rpt.render(format)
def __init__(self, name, dt_account, cr_account, amount=0,
             start=datetime.min, end=datetime.max, interval=1,
             description=None):
    """Initialise a basic activity that books a fixed amount between
    two accounts on a recurring interval.

    :param name: Name of the activity.
    :param dt_account: Account to debit.
    :param cr_account: Account to credit.
    :param amount: Amount booked per occurrence.
    :param start: First datetime of the activity (default: no lower bound).
    :param end: Last datetime of the activity (default: no upper bound).
    :param interval: Number of periods between occurrences.
    :param description: Optional free-text description.
    """
    super(BasicActivity, self).__init__(
        name, description=description, start=start, end=end,
        interval=interval)
    # NOTE(review): interval is already passed to the base class above;
    # this re-assignment looks redundant -- confirm the base class does
    # not transform it.
    self.interval = interval
    self.amount = amount
    self.dt_account = dt_account
    self.cr_account = cr_account
def test_IsoDateTimeType_model(self):
    """Exercise IsoDateTimeType conversion, parse errors and extremes."""
    dt = IsoDateTimeType()
    # Round-trip the module-level `now` through primitive and native forms.
    value = dt.to_primitive(now)
    self.assertEqual(now, dt.to_native(now))
    self.assertEqual(now, dt.to_native(value))
    date = datetime.now()
    value = dt.to_primitive(date)
    self.assertEqual(date, dt.to_native(date))
    self.assertEqual(now.tzinfo, dt.to_native(value).tzinfo)
    # ParseError
    for date in (None, '', 2017, "2007-06-23X06:40:34.00Z"):
        with self.assertRaisesRegexp(ConversionError,
                u'Could not parse %s. Should be ISO8601.' % date):
            dt.to_native(date)
    # OverflowError
    # datetime.max/min survive to_native directly, but converting their
    # primitive form back raises a ConversionError.
    for date in (datetime.max, datetime.min):
        self.assertEqual(date, dt.to_native(date))
        with self.assertRaises(ConversionError):
            dt.to_native(dt.to_primitive(date))
def map_from_json(json_slots):
    """Build TimeSlot objects from JSON entries with millisecond fields.

    Each entry carries 'start' and 'duration' in milliseconds; both are
    converted to seconds and clamped to the range representable by
    TimeSlot (datetime.min .. datetime.max) before conversion.
    """
    result = []
    for entry in json_slots:
        # Clamp the start to the earliest representable instant.
        begin = max(entry['start'] / 1e3, TimeSlot.time_to_long(datetime.min))
        # Clamp the end to the latest representable instant.
        finish = min(begin + entry['duration'] / 1e3,
                     TimeSlot.time_to_long(datetime.max))
        result.append(TimeSlot(datetime.fromtimestamp(begin),
                               datetime.fromtimestamp(finish)))
    return result
def set(self, key, value):
    """Store *key* and *value* on this cache item.

    The item is marked as a cache hit and given an unbounded expiry:
    datetime.max acts as the "never expires" sentinel.

    Args:
        key: The cache key to store.
        value: The serializable value to be stored; the method of
            serialization is left up to the implementing library.

    Returns:
        A (key, value) tuple for the stored item.
    """
    self.key = key
    self.expire_at = datetime.max  # sentinel: no expiry configured
    self.value = value
    self._setHit()
    return self.key, self.value
def save(self, item):
    """Persist a cache item immediately.

    Items whose expire_at equals datetime.max (the "never expires"
    sentinel) are stored without a TTL; all others are stored with a TTL
    equal to the remaining time until expiry.

    :param item: The cache item to save.
    :return: The client's result if the item was persisted, False if the
        item is already expired.
    """
    key = self.normalize_key(item.key)
    payload = cPickle.dumps(item)
    if item.expire_at == datetime.max:
        # No expiry: plain SET.
        return self.client.set(key, payload)
    # BUG FIX: timedelta.seconds only holds the seconds *component*
    # (0-86399) and is non-negative even for past datetimes, which both
    # truncated multi-day TTLs and stored already-expired items.
    # total_seconds() yields the real, signed remaining lifetime.
    expire_seconds = int((item.expire_at - datetime.utcnow()).total_seconds())
    if expire_seconds > 0:
        return self.client.setex(key, payload, expire_seconds)
    return False
def _load_existing_mappings(self):
    """Load the existing ARP records for this box from the db.

    Returns:
      A deferred whose result is a dictionary: { (ip, mac): arpid }
    """
    self._logger.debug("Loading open arp records from database")
    # end_time >= datetime.max selects only "open" records: datetime.max
    # is used as the still-active sentinel for end_time (see the code
    # that creates new mappings).
    open_arp_records_queryset = manage.Arp.objects.filter(
        netbox__id=self.netbox.id,
        end_time__gte=datetime.max).values('id', 'ip', 'mac')
    # Run the blocking ORM query in a thread to keep the reactor free.
    open_arp_records = yield db.run_in_thread(
        storage.shadowify_queryset_and_commit,
        open_arp_records_queryset)
    self._logger.debug("Loaded %d open records from arp",
                       len(open_arp_records))
    open_mappings = dict(((IP(arp['ip']), arp['mac']), arp['id'])
                         for arp in open_arp_records)
    defer.returnValue(open_mappings)
def _make_new_mappings(self, mappings):
    """Convert a sequence of (ip, mac) tuples into a Arp shadow containers.

    Arguments:
      mappings -- An iterable containing tuples: (ip, mac)
    """
    netbox = self.containers.factory(None, shadows.Netbox)
    timestamp = datetime.now()
    # datetime.max marks a record as still open; a later run closes it
    # by setting a real end_time.
    infinity = datetime.max
    for (ip, mac) in mappings:
        if not ip or not mac:
            # Some devices seem to return empty results!
            continue
        arp = self.containers.factory((ip, mac), shadows.Arp)
        arp.netbox = netbox
        arp.sysname = self.netbox.sysname
        arp.ip = ip.strCompressed()
        arp.mac = mac
        arp.prefix_id = self._find_largest_matching_prefix(ip)
        arp.start_time = timestamp
        arp.end_time = infinity
def test_time_since(self):
    """Verify time_since() renders None, past and future datetimes."""
    def timestamp_calc(*args, **kwargs):
        # A timestamp the given timedelta in the past.
        return datetime.now() - timedelta(*args, **kwargs)
    minute = 60
    # (Removed an unused `hour` local that was never referenced.)
    self.assertEqual(
        time_since(None), "Never")
    self.assertEqual(
        time_since(timestamp_calc(seconds=(10 * minute + 10))),
        u"10\xa0mins")
    self.assertEqual(
        time_since(timestamp_calc(seconds=(1 * minute + 5))),
        u"1\xa0min")
    self.assertEqual(
        time_since(timestamp_calc(0)),
        u"Now")
    # A timestamp in the far future is also rendered as "Now".
    self.assertEqual(
        time_since(datetime.max),
        u"Now")
def has_notification(self):
    """Yield notification data for every currently active notification.

    Reads the 'notifications' list from self.settings_json; each entry
    must have a parseable 'start' and may have an 'end'.  Entries with
    malformed or missing fields are skipped silently.
    """
    settings = self.settings_json
    notifications = settings.get('notifications', [])
    now = datetime.utcnow()
    for notification in notifications:
        try:
            start = parse_datetime(notification['start'])
            end = notification.get('end', None)
            # A missing/empty end means the notification never expires.
            end = parse_datetime(end) if end else datetime.max
            if now < start or now > end:
                continue
        except (ValueError, TypeError, KeyError) as e:
            # Malformed entry: skip rather than fail the whole listing.
            continue
        notification_data = self.notification_data(notification)
        if notification_data:
            yield notification_data
def get_user_to_damped_n_messages(self, dt_max, alpha):
    """
    Maps each user to the number of messages before a reference datetime,
    where each message count is exponentially damped by a constant times
    the difference between the reference datetime and the datetime of the
    message.

    Args:
        dt_max: A datetime representing the max datetime of messages to
            consider.
        alpha: A nonnegative float representing the damping factor.

    Returns:
        user_to_damped_n_messages: A dict mapping each user in
            self.users_union to the damped number of messages by that user
            before dt_max. The contribution of a message is a float equal
            to exp(-alpha * t), where t is the difference in days between
            dt_max and the datetime of the message.

    Raises:
        ValueError: If alpha is negative.
    """
    if alpha < 0:
        raise ValueError('Must have alpha >= 0')
    try:
        # Only keep messages with datetimes <= dt_max
        filtered = self.filter_by_datetime(end_dt=dt_max)
    except EmptyConversationError:
        # Map all users to 0 if dt_max occurs before all messages
        return self.get_user_to_message_statistic(lambda x: 0)
    # Each message contributes exp(-alpha * days_before_dt_max).
    damped_message_count = lambda x: self.exp_damped_day_difference(dt_max, x.timestamp, alpha)
    user_to_damped_n_messages = filtered.get_user_to_message_statistic(damped_message_count)
    return user_to_damped_n_messages
def tslice(unit, start=ben(), end=None, step=1, count=float('inf')):
    """tslice(unit, start=None, end=None, step=1, count=None)
        -> generator of Blackhole objects

    An xrange-like generator that yields successive time points shifted
    by `step` units each, until `end` is passed or `count` items have
    been produced.

    :param unit: one of 'year', 'month', 'day', 'hour', 'minute',
        'second', 'microsecond'
    :param start: starting point (Blackhole or parseable string).
        NOTE(review): the default `ben()` is evaluated once at module
        import time, not per call -- confirm this is intended.
    :param end: stopping bound (exclusive); defaults to the datetime
        extreme matching the sign of `step`
    :param step: signed number of units to advance per iteration
    :param count: maximum number of items to yield
    :return: generator of Blackhole objects
    """
    if unit not in Blackhole._units:
        raise AttributeError()
    # Accept string arguments and coerce them to Blackhole objects.
    if isinstance(start, basestring):
        start = ben(start)
    if isinstance(end, basestring):
        end = ben(end)
    cnt = 0
    if step > 0:
        # Counting forward: default bound is the maximum representable time.
        end = end or ben(datetime.max)
        while start < end and cnt < count:
            yield start
            start = start.shifted(**{unit: step})
            cnt += 1
    elif step < 0:
        # Counting backward: default bound is the minimum representable time.
        end = end or ben(datetime.min)
        while start > end and cnt < count:
            yield start
            start = start.shifted(**{unit: step})
            cnt += 1
def find_max_value(test_func, initial_value):
    """
    Starting from an initial number (integer or float), find the maximum
    value for which the test function does not yet fail, and return that
    maximum value.
    """
    assert isinstance(initial_value, int) and initial_value > 0
    fails = FailsArray(test_func)
    # Phase 1 -- exponential probing: double until the test first fails,
    # so the 0 -> 1 transition of `fails` lies in (upper // 2, upper].
    upper = initial_value
    while fails[upper] == 0:
        upper *= 2
    # Phase 2 -- binary-search that bracket for the 0 -> 1 boundary.
    crossing = binary_search(fails, 0.5, upper // 2, upper)
    max_value = crossing - 1
    # Sanity check: max_value still passes, max_value + 1 fails.
    assert fails[max_value] == 0 and fails[max_value + 1] == 1, \
        "max_value={}, fails[+-2]: {}, {}, {}, {}, {}".\
        format(max_value, fails[max_value - 2], fails[max_value - 1],
               fails[max_value], fails[max_value + 1], fails[max_value + 2])
    return max_value
def x_test_print_datetime_max(self):
    """Print datetime.max."""
    # NOTE(review): the x_ prefix presumably keeps the test runner from
    # collecting this; run it manually to inspect the platform's
    # datetime.max -- confirm the naming convention.
    print("\nMax value for Python datetime (datetime.max): {!r}".
          format(datetime.max))
    # Flush so the output appears immediately amid test-runner output.
    sys.stdout.flush()
def test_datetime_max(self): """Test timestamp_from_datetime() with datetime.max.""" # The test is that it does not raise an exception: timestamp_from_datetime(datetime.max)
def _get_iteration_params(cls, end, limit):
    """Resolve the (end, limit) pair that bounds an iteration.

    :param end: The upper bound, or None to iterate up to cls.max
        (in which case a limit is mandatory).
    :param limit: The maximum number of steps, or None for no limit
        (only meaningful when end is given).
    :returns: A (end, limit) tuple with both values concrete.
    :raises ValueError: If both end and limit are None.
    """
    if end is not None:
        # An explicit end bounds the iteration; the step count is
        # effectively unlimited.
        return end, sys.maxsize
    if limit is None:
        # BUG FIX: raise ValueError instead of the bare Exception; it is
        # an Exception subclass, so existing `except Exception` callers
        # still work, while the error type is now precise.
        raise ValueError('one of \'end\' or \'limit\' is required')
    return cls.max, limit
def testTimestamp(self):
    """Verify timestamp round-trips and extremes under every DateStyle."""
    query = self.db.query
    for datestyle in ('ISO', 'Postgres, MDY', 'Postgres, DMY',
                      'SQL, MDY', 'SQL, DMY', 'German'):
        self.db.set_parameter('datestyle', datestyle)
        # Date-only, second and microsecond precision must all round-trip.
        d = datetime(2016, 3, 14)
        q = "select $1::timestamp"
        r = query(q, (d,)).getresult()[0][0]
        self.assertIsInstance(r, datetime)
        self.assertEqual(r, d)
        d = datetime(2016, 3, 14, 15, 9, 26)
        q = "select $1::timestamp"
        r = query(q, (d,)).getresult()[0][0]
        self.assertIsInstance(r, datetime)
        self.assertEqual(r, d)
        d = datetime(2016, 3, 14, 15, 9, 26, 535897)
        q = "select $1::timestamp"
        r = query(q, (d,)).getresult()[0][0]
        self.assertIsInstance(r, datetime)
        self.assertEqual(r, d)
        # Out-of-range years and BC timestamps clamp to the extremes.
        q = ("select '10000-08-01 AD'::timestamp,"
             " '0099-01-08 BC'::timestamp")
        r = query(q).getresult()[0]
        self.assertIsInstance(r[0], datetime)
        self.assertIsInstance(r[1], datetime)
        self.assertEqual(r[0], datetime.max)
        self.assertEqual(r[1], datetime.min)
        # The special +/-infinity values also map to the extremes.
        q = "select 'infinity'::timestamp, '-infinity'::timestamp"
        r = query(q).getresult()[0]
        self.assertIsInstance(r[0], datetime)
        self.assertIsInstance(r[1], datetime)
        self.assertEqual(r[0], datetime.max)
        self.assertEqual(r[1], datetime.min)
def testTimestamptz(self):
    """Verify timestamptz round-trips and extremes across timezones and
    DateStyle settings."""
    query = self.db.query
    timezones = dict(CET=1, EET=2, EST=-5, UTC=0)
    for timezone in sorted(timezones):
        # Build a '+HH00'-style offset string for the zone.
        tz = '%+03d00' % timezones[timezone]
        try:
            tzinfo = datetime.strptime(tz, '%z').tzinfo
        except ValueError:  # Python < 3.2
            tzinfo = pg._get_timezone(tz)
        self.db.set_parameter('timezone', timezone)
        for datestyle in ('ISO', 'Postgres, MDY', 'Postgres, DMY',
                          'SQL, MDY', 'SQL, DMY', 'German'):
            self.db.set_parameter('datestyle', datestyle)
            # Date-only, second and microsecond precision must round-trip.
            d = datetime(2016, 3, 14, tzinfo=tzinfo)
            q = "select $1::timestamptz"
            r = query(q, (d,)).getresult()[0][0]
            self.assertIsInstance(r, datetime)
            self.assertEqual(r, d)
            d = datetime(2016, 3, 14, 15, 9, 26, tzinfo=tzinfo)
            q = "select $1::timestamptz"
            r = query(q, (d,)).getresult()[0][0]
            self.assertIsInstance(r, datetime)
            self.assertEqual(r, d)
            d = datetime(2016, 3, 14, 15, 9, 26, 535897, tzinfo)
            q = "select $1::timestamptz"
            r = query(q, (d,)).getresult()[0][0]
            self.assertIsInstance(r, datetime)
            self.assertEqual(r, d)
            # Out-of-range years and BC timestamps clamp to the extremes.
            q = ("select '10000-08-01 AD'::timestamptz,"
                 " '0099-01-08 BC'::timestamptz")
            r = query(q).getresult()[0]
            self.assertIsInstance(r[0], datetime)
            self.assertIsInstance(r[1], datetime)
            self.assertEqual(r[0], datetime.max)
            self.assertEqual(r[1], datetime.min)
            # The special +/-infinity values also map to the extremes.
            q = "select 'infinity'::timestamptz, '-infinity'::timestamptz"
            r = query(q).getresult()[0]
            self.assertIsInstance(r[0], datetime)
            self.assertIsInstance(r[1], datetime)
            self.assertEqual(r[0], datetime.max)
            self.assertEqual(r[1], datetime.min)
def __init__(self, batch_name, sd, mimetype_files=None):
    """Initialise per-batch processing statistics and output channels.

    :param batch_name: Name identifying this batch.
    :param sd: Service/state directory object providing get_obj().
    :param mimetype_files: Unused in this initialiser -- NOTE(review):
        confirm whether a subclass or later code consumes it.
    """
    self.sd = sd
    self.batch = batch_name
    self.log_fp = None
    self.num_files = 0
    self.total_time = 0
    # Running min/max accumulators start at the opposite extremes so the
    # first observed value always replaces them.
    self.min_time = timedelta.max
    self.max_time = timedelta.min
    self.earliest_time = datetime.max
    self.latest_time = datetime.min
    self.queue = self.sd.get_obj('output_queue')
    self.domain = self.sd.get_obj('output_domain')
def calculateID(self, file_name_fullpath):
    """Derive an instance ID for an audit file from its PersistenceItem
    'created' attribute, falling back to an MD5 of the file contents."""
    # Get the creation date for the first PersistenceItem in the audit (they will all be the same)
    instanceID = datetime.min
    tmp_instanceID = None
    try:
        file_object = loadFile(file_name_fullpath)
        root = ET.parse(file_object).getroot()
        file_object.close()
        reg_key = root.find('PersistenceItem')
        reg_modified = reg_key.get('created')
        try:
            tmp_instanceID = datetime.strptime(reg_modified, "%Y-%m-%dT%H:%M:%SZ")
        except ValueError as e:
            # Unparseable date: use the max sentinel and log it.
            tmp_instanceID = datetime.max
            logger.warning("Invalid reg_modified date found!: %s (%s)" % (reg_modified, file_name_fullpath))
        instanceID = tmp_instanceID
    except Exception:
        traceback.print_exc(file=sys.stdout)
    # If we found no PersistenceItem date we go with plan B (but most probably this is corrupt and will fail later)
    # NOTE(review): instanceID is initialised to datetime.min and never set
    # to None above, so this fallback is unreachable as written -- confirm
    # whether the initial value was meant to be None.
    if instanceID is None:
        file_object = loadFile(file_name_fullpath)
        content = file_object.read()
        instanceID = hashlib.md5(content).hexdigest()
        file_object.close()
    return instanceID
def calculateID(self, file_name_fullpath):
    """Derive an instance ID for an audit file from its
    AppCompatItemExtended 'created' attribute, falling back to an MD5 of
    the file contents."""
    # Get the creation date for the first PersistenceItem in the audit (they will all be the same)
    instanceID = datetime.min
    tmp_instanceID = None
    try:
        file_object = loadFile(file_name_fullpath)
        root = ET.parse(file_object).getroot()
        file_object.close()
        reg_key = root.find('AppCompatItemExtended')
        reg_modified = reg_key.get('created')
        try:
            tmp_instanceID = datetime.strptime(reg_modified, "%Y-%m-%dT%H:%M:%SZ")
        except ValueError as e:
            # Unparseable date: use the max sentinel and log it.
            tmp_instanceID = datetime.max
            logger.warning("Invalid reg_modified date found!: %s (%s)" % (reg_modified, file_name_fullpath))
        instanceID = tmp_instanceID
    except Exception:
        traceback.print_exc(file=sys.stdout)
    # If we found no PersistenceItem date we go with plan B (but most probably this is corrupt and will fail later)
    # NOTE(review): instanceID is initialised to datetime.min and never set
    # to None above, so this fallback is unreachable as written -- confirm
    # whether the initial value was meant to be None.
    if instanceID is None:
        file_object = loadFile(file_name_fullpath)
        content = file_object.read()
        instanceID = hashlib.md5(content).hexdigest()
        file_object.close()
    return instanceID
def calculateID(self, file_name_fullpath):
    """Derive an instance ID from the latest RegistryItem 'Modified' date
    in the file, falling back to an MD5 of the file contents."""
    instanceID = datetime.min
    tmp_instanceID = None
    try:
        file_object = loadFile(file_name_fullpath)
        root = ET.parse(file_object).getroot()
        file_object.close()
        for reg_key in root.findall('RegistryItem'):
            tmp_reg_key = reg_key.find('Modified')
            if tmp_reg_key is not None:
                reg_modified = tmp_reg_key.text
                try:
                    tmp_instanceID = datetime.strptime(reg_modified, "%Y-%m-%dT%H:%M:%SZ")
                except ValueError as e:
                    # Unparseable date: use the max sentinel and log it.
                    tmp_instanceID = datetime.max
                    logger.warning("Invalid reg_modified date found!: %s (%s)" % (reg_modified, file_name_fullpath))
                # Keep the most recent Modified date seen so far.
                if instanceID < tmp_instanceID:
                    instanceID = tmp_instanceID
            else:
                logger.warning("Found RegistryItem with no Modified date (Mir bug?): %s" % file_name_fullpath)
    except Exception:
        logger.exception("Error on calculateID for: %s" % file_name_fullpath)
    # If we found no Modified date in any of the RegistryItems we go with plan B (but most probably ShimCacheParser will fail to parse anyway)
    # NOTE(review): instanceID is initialised to datetime.min and never set
    # to None above, so this fallback is unreachable as written -- confirm
    # whether the initial value was meant to be None.
    if instanceID is None:
        file_object = loadFile(file_name_fullpath)
        content = file_object.read()
        instanceID = hashlib.md5(content).hexdigest()
        file_object.close()
    return instanceID