我们从Python开源项目中提取了以下50个代码示例，用于说明如何使用 datetime.datetime.min（注意：min 是类属性而不是方法，使用时不带括号）。
def filter_by_first_message_dt(self, start_dt=datetime.min, end_dt=datetime.max):
    """
    Select the conversations whose first message falls inside a time window.

    Args:
        start_dt: Inclusive lower bound of the window; must not be later
            than end_dt.
        end_dt: Inclusive upper bound of the window; must not be earlier
            than start_dt.

    Returns:
        The result of self.filter_user_conversations() applied with a
        predicate that accepts a conversation when the timestamp of its
        first message lies in the closed interval [start_dt, end_dt].

    Raises:
        ValueError: start_dt is later than end_dt.
        EmptyUserConversationsError: No conversation passes the filter
            (presumably raised by filter_user_conversations, which is not
            visible here).
    """
    if start_dt > end_dt:
        raise ValueError('Must have start_dt <= end_dt')

    def first_message_in_window(conversation):
        opened_at = conversation.messages[0].timestamp
        return opened_at >= start_dt and opened_at <= end_dt

    return self.filter_user_conversations(first_message_in_window)
def filter_by_last_message_dt(self, start_dt=datetime.min, end_dt=datetime.max):
    """
    Select the conversations whose last message falls inside a time window.

    Args:
        start_dt: Inclusive lower bound of the window; must not be later
            than end_dt.
        end_dt: Inclusive upper bound of the window; must not be earlier
            than start_dt.

    Returns:
        The result of self.filter_user_conversations() applied with a
        predicate that accepts a conversation when the timestamp of its
        last message lies in the closed interval [start_dt, end_dt].

    Raises:
        ValueError: start_dt is later than end_dt.
        EmptyUserConversationsError: No conversation passes the filter
            (presumably raised by filter_user_conversations, which is not
            visible here).
    """
    if start_dt > end_dt:
        raise ValueError('Must have start_dt <= end_dt')

    def last_message_in_window(conversation):
        closed_at = conversation.messages[-1].timestamp
        return closed_at >= start_dt and closed_at <= end_dt

    return self.filter_user_conversations(last_message_in_window)
def filter_by_datetime(self, start_dt=datetime.min, end_dt=datetime.max):
    """
    Restrict every conversation to the messages sent inside a time window.

    Args:
        start_dt: Inclusive lower bound of the window; must not be later
            than end_dt.
        end_dt: Inclusive upper bound of the window; must not be earlier
            than start_dt.

    Returns:
        The result of self.filter_conversations() called with a message
        predicate that accepts a message when its timestamp lies in the
        closed interval [start_dt, end_dt]. Per the original documentation
        this is a UserConversations object in which conversations left
        without messages have been discarded.

    Raises:
        ValueError: start_dt is later than end_dt.
        EmptyUserConversationsError: No message in any conversation lies in
            the window (presumably raised by filter_conversations, which is
            not visible here).
    """
    if start_dt > end_dt:
        raise ValueError("start_dt must be less than or equal to end_dt")

    def sent_in_window(message):
        sent_at = message.timestamp
        return sent_at >= start_dt and sent_at <= end_dt

    return self.filter_conversations(message_filter=sent_in_window)
def filter_by_datetime(self, start_dt=datetime.min, end_dt=datetime.max):
    """
    Keep only the messages of this conversation sent inside a time window.

    Args:
        start_dt: Inclusive lower bound of the window; must not be later
            than end_dt.
        end_dt: Inclusive upper bound of the window; must not be earlier
            than start_dt.

    Returns:
        The result of self.filter_conversation() called with a message
        predicate that accepts a message when its timestamp lies in the
        closed interval [start_dt, end_dt].

    Raises:
        ValueError: start_dt is later than end_dt.
        EmptyConversationError: No message lies in the window (presumably
            raised by filter_conversation, which is not visible here).
    """
    if start_dt > end_dt:
        raise ValueError("start_dt must be less than or equal to end_dt")

    def sent_in_window(message):
        sent_at = message.timestamp
        return sent_at >= start_dt and sent_at <= end_dt

    return self.filter_conversation(message_filter=sent_in_window)
def cast_date(value, connection):
    """Convert the server's text form of a date into a ``date`` object.

    Values that ``date`` cannot represent are clamped: ``-infinity`` and
    any value ending in ``BC`` become ``date.min``; ``infinity`` and any
    date text longer than 10 characters become ``date.max``.
    """
    # The textual layout depends on the server's DateStyle setting. ISO and
    # German are unambiguous, but the day/month order of the other styles
    # is not, so the format is taken from the connection instead of being
    # guessed from the value.
    if value == '-infinity':
        return date.min
    if value == 'infinity':
        return date.max
    tokens = value.split()
    if tokens[-1] == 'BC':
        return date.min
    date_text = tokens[0]
    if len(date_text) > 10:
        # More than 10 characters means the year has more than four digits.
        return date.max
    return datetime.strptime(date_text, connection.date_format()).date()
def cast_timestamp(value, connection):
    """Convert the server's text form of a timestamp into a ``datetime``.

    Values that ``datetime`` cannot represent are clamped: ``-infinity``
    and any value ending in ``BC`` become ``datetime.min``; ``infinity``
    and years with more than four digits become ``datetime.max``.
    """
    if value == '-infinity':
        return datetime.min
    if value == 'infinity':
        return datetime.max
    parts = value.split()
    if parts[-1] == 'BC':
        return datetime.min
    date_fmt = connection.date_format()
    if date_fmt.endswith('-%Y') and len(parts) > 2:
        # Layout "<weekday> <day/month> <day/month> <time> <year>": drop
        # the leading token and keep the next four.
        parts = parts[1:5]
        if len(parts[3]) > 4:
            return datetime.max
        day_month = '%d %b' if date_fmt.startswith('%d') else '%b %d'
        clock = '%H:%M:%S.%f' if len(parts[2]) > 8 else '%H:%M:%S'
        layout = [day_month, clock, '%Y']
    else:
        if len(parts[0]) > 10:
            return datetime.max
        # A time token longer than "HH:MM:SS" carries fractional seconds.
        clock = '%H:%M:%S.%f' if len(parts[1]) > 8 else '%H:%M:%S'
        layout = [date_fmt, clock]
    return datetime.strptime(' '.join(parts), ' '.join(layout))
def cast_timestamptz(value, connection):
    """Cast a timestamptz value.

    Parses the text form of a timestamp with time zone into a ``datetime``.
    ``-infinity`` and values ending in ``BC`` yield ``datetime.min``;
    ``infinity`` and years with more than four digits yield
    ``datetime.max``. The date layout is taken from
    ``connection.date_format()``; the time zone token is either parsed with
    ``%z`` (when ``_has_timezone`` is true) or attached afterwards through
    ``_get_timezone``.
    """
    if value == '-infinity':
        return datetime.min
    if value == 'infinity':
        return datetime.max
    value = value.split()
    if value[-1] == 'BC':
        return datetime.min
    fmt = connection.date_format()
    if fmt.endswith('-%Y') and len(value) > 2:
        # Multi-token layout: drop the first token, leaving
        # [day/month, day/month, time, year, zone].
        value = value[1:]
        if len(value[3]) > 4:
            # Year with more than four digits cannot be represented.
            return datetime.max
        fmt = ['%d %b' if fmt.startswith('%d') else '%b %d',
            '%H:%M:%S.%f' if len(value[2]) > 8 else '%H:%M:%S', '%Y']
        # The trailing token is the time zone.
        value, tz = value[:-1], value[-1]
    else:
        if fmt.startswith('%Y-'):
            # The zone is glued to the time token; _re_timezone splits the
            # token into (time, zone). No match means no zone was given,
            # which is treated as '+0000'.
            tz = _re_timezone.match(value[1])
            if tz:
                value[1], tz = tz.groups()
            else:
                tz = '+0000'
        else:
            # The zone is a separate trailing token.
            value, tz = value[:-1], value[-1]
        if len(value[0]) > 10:
            # Date token longer than 10 characters: year out of range.
            return datetime.max
        fmt = [fmt, '%H:%M:%S.%f' if len(value[1]) > 8 else '%H:%M:%S']
    if _has_timezone:
        # Let strptime parse the zone as a numeric offset via %z.
        value.append(_timezone_as_offset(tz))
        fmt.append('%z')
        return datetime.strptime(' '.join(value), ' '.join(fmt))
    # Otherwise parse a naive datetime and attach the tzinfo afterwards.
    return datetime.strptime(' '.join(value), ' '.join(fmt)).replace(
        tzinfo=_get_timezone(tz))
def testTruncateRestart(self):
    # Checks that truncate() rejects a non-boolean `restart` and that the
    # serial column keeps counting after a plain truncate but starts again
    # from 1 after truncate(restart=True).
    truncate = self.db.truncate
    self.assertRaises(TypeError, truncate, 'test_table', restart='invalid')
    run_query = self.db.query
    self.createTable('test_table', 'n serial, t text')
    stats_sql = "select count(n), min(n), max(n) from test_table"

    def add_three_rows():
        for _ in range(3):
            run_query("insert into test_table (t) values ('test')")

    def current_stats():
        return run_query(stats_sql).getresult()[0]

    add_three_rows()
    self.assertEqual(current_stats(), (3, 1, 3))

    # Plain truncate: rows are gone, the sequence is not reset.
    truncate('test_table')
    self.assertEqual(current_stats(), (0, None, None))
    add_three_rows()
    self.assertEqual(current_stats(), (3, 4, 6))

    # Truncate with restart: the sequence starts from 1 again.
    truncate('test_table', restart=True)
    self.assertEqual(current_stats(), (0, None, None))
    add_three_rows()
    self.assertEqual(current_stats(), (3, 1, 3))
def testDate(self):
    # For every DateStyle setting: a date round-trips unchanged, and values
    # outside the range of `date` are clamped to date.max / date.min.
    run_query = self.db.query
    boundary_queries = (
        "select '10000-08-01'::date, '0099-01-08 BC'::date",
        "select 'infinity'::date, '-infinity'::date")
    for datestyle in ('ISO', 'Postgres, MDY', 'Postgres, DMY',
            'SQL, MDY', 'SQL, DMY', 'German'):
        self.db.set_parameter('datestyle', datestyle)
        sent = date(2016, 3, 14)
        received = run_query("select $1::date", (sent,)).getresult()[0][0]
        self.assertIsInstance(received, date)
        self.assertEqual(received, sent)
        for sql in boundary_queries:
            row = run_query(sql).getresult()[0]
            self.assertIsInstance(row[0], date)
            self.assertIsInstance(row[1], date)
            self.assertEqual(row[0], date.max)
            self.assertEqual(row[1], date.min)
def __init__(self, name, description=None,
             start_datetime=datetime.min,
             timestep_period_duration=TimePeriod.month,
             timestep_period_count=1):
    """Set up the clock at timestep index 0.

    :param name: The name, forwarded to the base class initialiser.
    :param description: The description, forwarded to the base class
      initialiser.
    :param start_datetime: The start datetime.
    :param timestep_period_duration: The duration of each time period.
    :param timestep_period_count: The number of periods that makes up a
      timestep.
    """
    super(Clock, self).__init__(name, description)
    self.timestep_ix = 0
    self.start_datetime = start_datetime
    self.timestep_period_count = timestep_period_count
    self.timestep_period_duration = timestep_period_duration
def create_transaction(self, name, description=None,
                       tx_date=datetime.min.date(), dt_account=None,
                       cr_account=None, source=None, amount=0.00):
    """
    Build a transaction and record it in the general ledger.

    :param name: The transaction's name.
    :param description: The transaction's description.
    :param tx_date: The date of the transaction.
    :param dt_account: The transaction's debit account's name.
    :param cr_account: The transaction's credit account's name.
    :param source: The name of source the transaction originated from.
    :param amount: The transaction amount.

    :returns: The created transaction, which has also been appended to
      ``self.transactions``.
    """
    transaction = Transaction(
        name, description, tx_date, dt_account, cr_account, source, amount)
    self.transactions.append(transaction)
    return transaction
def transaction_list(self, start=datetime.min, end=datetime.max,
                     format=ReportFormat.printout, component_path="",
                     output_path=None):
    """
    Produce a transaction list report.

    :param start: The start date to generate the report for.
    :param end: The end date to generate the report for.
    :param format: The format of the report.
    :param component_path: The path of the component to filter the report's
      transactions by.
    :param output_path: The path to the file the report is written to. If
      None, then the report is not written to a file.

    :returns: The rendered report, i.e. the result of
      ``TransactionList(...).render(format)``.
    """
    return TransactionList(
        self, start, end, component_path, output_path).render(format)
def income_statement(self, start=datetime.min, end=datetime.max,
                     format=ReportFormat.printout, component_path="",
                     output_path=None):
    """
    Produce an income statement report.

    :param start: The start date to generate the report for.
    :param end: The end date to generate the report for.
    :param format: The format of the report.
    :param component_path: The path of the component to filter the report's
      transactions by.
    :param output_path: The path to the file the report is written to. If
      None, then the report is not written to a file.

    :returns: The rendered report, i.e. the result of
      ``IncomeStatement(...).render(format)``.
    """
    return IncomeStatement(
        self, start, end, component_path, output_path).render(format)
def test_IsoDateTimeType_model(self):
    # Round-trips datetimes through IsoDateTimeType and checks that
    # unparsable and out-of-range values raise ConversionError.
    field = IsoDateTimeType()

    # `now` is provided by the enclosing module.
    primitive = field.to_primitive(now)
    self.assertEqual(now, field.to_native(now))
    self.assertEqual(now, field.to_native(primitive))

    local_now = datetime.now()
    primitive = field.to_primitive(local_now)
    self.assertEqual(local_now, field.to_native(local_now))
    self.assertEqual(now.tzinfo, field.to_native(primitive).tzinfo)

    # ParseError
    for unparsable in (None, '', 2017, "2007-06-23X06:40:34.00Z"):
        expected_message = u'Could not parse %s. Should be ISO8601.' % unparsable
        with self.assertRaisesRegexp(ConversionError, expected_message):
            field.to_native(unparsable)

    # OverflowError
    for extreme in (datetime.max, datetime.min):
        self.assertEqual(extreme, field.to_native(extreme))
        with self.assertRaises(ConversionError):
            field.to_native(field.to_primitive(extreme))
def obtain_flags(self, symbol):
    """ Populate ``self.price_result[symbol]["flags"]`` with the names of
        the checks that the current price result trips.

        The list is reset on every call. A flag is appended when the
        absolute price change exceeds the absolute value of the matching
        asset setting, and ``over_max_age`` is appended when the current
        feed is older than the ``maxage`` setting (a missing feed counts as
        infinitely old).
    """
    result = self.price_result[symbol]
    result["flags"] = []

    # (asset setting, flag recorded when |priceChange| exceeds |setting|)
    change_checks = (
        ("min_change", "min_change"),
        ("warn_change", "over_warn_change"),
        ("skip_change", "skip_change"),
    )
    for setting, flag in change_checks:
        if fabs(result["priceChange"]) > fabs(self.assetconf(symbol, setting)):
            result["flags"].append(flag)

    # Feed too old
    current_feed = result["current_feed"]
    feed_date = current_feed["date"] if current_feed else datetime.min
    age_seconds = (datetime.utcnow() - feed_date).total_seconds()
    if age_seconds > self.assetconf(symbol, "maxage"):
        result["flags"].append("over_max_age")
def map_from_json(json_slots):
    """Turn JSON slot records into a list of ``TimeSlot`` objects.

    Each record supplies ``start`` and ``duration``; both are divided by
    1000 before use (presumably milliseconds to seconds -- confirm against
    the producer of the JSON). The resulting start and end values are
    clamped to the range spanned by ``datetime.min`` and ``datetime.max``
    as expressed by ``TimeSlot.time_to_long`` before being converted with
    ``datetime.fromtimestamp``.
    """
    converted = []
    for entry in json_slots:
        start_value = entry['start'] / 1e3
        if start_value < TimeSlot.time_to_long(datetime.min):
            start_value = TimeSlot.time_to_long(datetime.min)
        end_value = start_value + entry['duration'] / 1e3
        if end_value > TimeSlot.time_to_long(datetime.max):
            end_value = TimeSlot.time_to_long(datetime.max)
        slot_start = datetime.fromtimestamp(start_value)
        slot_end = datetime.fromtimestamp(end_value)
        converted.append(TimeSlot(slot_start, slot_end))
    return converted
def inquiry_subtitle(since = None, page = 0):
    """Collect subtitle download links from every configured resource.

    The resource names come from the ``name`` option of the ``resource``
    config section (comma separated). Only entries newer than the
    ``since`` option of the ``history`` section are requested; when that
    option is empty the search starts at ``datetime.min``. Afterwards the
    ``since`` option is set to the current time and the configuration is
    written back to ``leecher.config``.

    :param since: Currently ignored (see the note in the body).
    :param page: Currently unused.
    :returns: A dict merging the results of
        ``inquiry_subtitle_on_resource`` for every resource.
    """
    resource_list = [name.strip() for name in config.get('resource', 'name').split(',')]
    logging.info("Resource list: %s", resource_list)
    # NOTE(review): the `since` argument is overwritten here, so callers
    # cannot override the stored value. Left as is to keep behaviour
    # unchanged; confirm whether the parameter should take precedence.
    since = config.get('history', 'since')
    time_format = '%Y-%m-%d %H:%M'
    since_time = datetime.strptime(since, time_format) if since else datetime.min
    logging.info("Start searching subtitle download link since %s",
                 since_time.strftime(time_format))
    result = {}
    for name in resource_list:
        result.update(inquiry_subtitle_on_resource(name, since_time))
    logging.info("Update the history-since time in config file.")
    config.set('history', 'since', datetime.now().strftime(time_format))
    # Use a context manager so the config file is flushed and closed even
    # if writing fails.
    with open('leecher.config', 'w', encoding='utf-8') as config_file:
        config.write(config_file)
    return result
def __init__(self, username, password, gas=DEFAULT_GAS, solar=DEFAULT_SOLAR):
    """Create the Toon client, store the settings and run a first update."""
    # Imported here rather than at module level, as in the original.
    from toonlib import Toon

    self.toon = Toon(username, password)
    self.gas = gas
    self.solar = solar
    self.data = {}
    # Start at the earliest representable time; self.update() follows.
    self.last_update = datetime.min
    self.update()
def datetime_to_long(dt):
    """Return the number of microseconds between ``datetime.min`` and *dt*
    as a long integer.

    Any tzinfo on *dt* is dropped before the difference is taken.
    """
    naive = dt.replace(tzinfo=None)
    elapsed = naive - dt.min
    return timedelta_to_usecs(elapsed)
def long_to_datetime(x):
    """Return the datetime that lies *x* microseconds after
    ``datetime.min``, where *x* is a long integer.

    The result carries no tzinfo.
    """
    # 86400000000 microseconds in a day, 1000000 in a second.
    days, remainder = divmod(x, 86400000000)
    seconds, microseconds = divmod(remainder, 1000000)
    offset = timedelta(days=days, seconds=seconds, microseconds=microseconds)
    return datetime.min + offset
def last_updated(self):
    """Return the most recent value of the last-updated field.

    The query is sorted on ``self.lu_field`` in descending order and
    limited to one document; that document's field value is passed through
    ``self.lu_func[0]``. ``datetime.min`` is returned when there is no
    document or its field value is falsy.
    """
    newest_first = self.query(properties=[self.lu_field]).sort(
        [(self.lu_field, pymongo.DESCENDING)]).limit(1)
    doc = next(newest_first, None)
    # Handle when collection has docs but `NoneType` lu_field.
    if not doc or not doc[self.lu_field]:
        return datetime.min
    return self.lu_func[0](doc[self.lu_field])
def tslice(unit, start=None, end=None, step=1, count=float('inf')):
    """
    tslice(unit, start=None, end=None, step=1, count=None) -> generator of Blackhole object

    An xrange-like generator over points in time: yields ``start``, then
    ``start`` shifted by ``step`` units, and so on, until ``end`` is
    reached or ``count`` items have been produced.

    :param unit: one of 'year', 'month', 'day', 'hour', 'minute', 'second',
        'microsecond' (checked against ``Blackhole._units``).
    :param start: first value to yield; a string is converted with
        ``ben()``. When omitted, ``ben()`` is called when iteration begins
        rather than once when the module is imported.
    :param end: exclusive bound; a string is converted with ``ben()``.
        Defaults to ``ben(datetime.max)`` for a positive step and
        ``ben(datetime.min)`` for a negative step.
    :param step: number of units between consecutive values; a step of 0
        yields nothing.
    :param count: maximum number of values to yield.
    :return: generator of Blackhole objects
    :raises AttributeError: if ``unit`` is not a known unit (raised when
        iteration begins, since this is a generator).
    """
    if unit not in Blackhole._units:
        raise AttributeError()
    if start is None:
        # Resolved per use: a default of ``ben()`` in the signature would
        # be evaluated a single time, at function definition.
        start = ben()
    if isinstance(start, basestring):
        start = ben(start)
    if isinstance(end, basestring):
        end = ben(end)

    cnt = 0
    if step > 0:
        end = end or ben(datetime.max)
        while start < end and cnt < count:
            yield start
            start = start.shifted(**{unit: step})
            cnt += 1
    elif step < 0:
        end = end or ben(datetime.min)
        while start > end and cnt < count:
            yield start
            start = start.shifted(**{unit: step})
            cnt += 1
def testTimestamp(self):
    # For every DateStyle setting: timestamps with date-only, second and
    # microsecond precision round-trip unchanged, and values outside the
    # range of `datetime` are clamped to datetime.max / datetime.min.
    run_query = self.db.query
    samples = (
        datetime(2016, 3, 14),
        datetime(2016, 3, 14, 15, 9, 26),
        datetime(2016, 3, 14, 15, 9, 26, 535897),
    )
    boundary_queries = (
        ("select '10000-08-01 AD'::timestamp,"
            " '0099-01-08 BC'::timestamp"),
        "select 'infinity'::timestamp, '-infinity'::timestamp",
    )
    for datestyle in ('ISO', 'Postgres, MDY', 'Postgres, DMY',
            'SQL, MDY', 'SQL, DMY', 'German'):
        self.db.set_parameter('datestyle', datestyle)
        for sent in samples:
            received = run_query(
                "select $1::timestamp", (sent,)).getresult()[0][0]
            self.assertIsInstance(received, datetime)
            self.assertEqual(received, sent)
        for sql in boundary_queries:
            row = run_query(sql).getresult()[0]
            self.assertIsInstance(row[0], datetime)
            self.assertIsInstance(row[1], datetime)
            self.assertEqual(row[0], datetime.max)
            self.assertEqual(row[1], datetime.min)
def testTimestamptz(self):
    # For every session time zone and DateStyle setting: aware timestamps
    # round-trip unchanged, and values outside the range of `datetime` are
    # clamped to datetime.max / datetime.min.
    run_query = self.db.query
    timezones = dict(CET=1, EET=2, EST=-5, UTC=0)
    datestyles = ('ISO', 'Postgres, MDY', 'Postgres, DMY',
        'SQL, MDY', 'SQL, DMY', 'German')
    boundary_queries = (
        ("select '10000-08-01 AD'::timestamptz,"
            " '0099-01-08 BC'::timestamptz"),
        "select 'infinity'::timestamptz, '-infinity'::timestamptz",
    )
    for timezone in sorted(timezones):
        tz = '%+03d00' % timezones[timezone]
        try:
            tzinfo = datetime.strptime(tz, '%z').tzinfo
        except ValueError:  # Python < 3.2
            tzinfo = pg._get_timezone(tz)
        self.db.set_parameter('timezone', timezone)
        samples = (
            datetime(2016, 3, 14, tzinfo=tzinfo),
            datetime(2016, 3, 14, 15, 9, 26, tzinfo=tzinfo),
            datetime(2016, 3, 14, 15, 9, 26, 535897, tzinfo),
        )
        for datestyle in datestyles:
            self.db.set_parameter('datestyle', datestyle)
            for sent in samples:
                received = run_query(
                    "select $1::timestamptz", (sent,)).getresult()[0][0]
                self.assertIsInstance(received, datetime)
                self.assertEqual(received, sent)
            for sql in boundary_queries:
                row = run_query(sql).getresult()[0]
                self.assertIsInstance(row[0], datetime)
                self.assertIsInstance(row[1], datetime)
                self.assertEqual(row[0], datetime.max)
                self.assertEqual(row[1], datetime.min)
def __init__(self, batch_name, sd, mimetype_files=None):
    """Start a batch with empty statistics.

    The running minima start at the largest representable value and the
    running maxima at the smallest. The output queue and domain are looked
    up on ``sd``. ``mimetype_files`` is accepted but not used here.
    """
    self.sd = sd
    self.batch = batch_name
    self.log_fp = None

    # Counters
    self.num_files = 0
    self.total_time = 0

    # Running extremes: (min, max) duration, then (earliest, latest) time
    self.min_time, self.max_time = timedelta.max, timedelta.min
    self.earliest_time, self.latest_time = datetime.max, datetime.min

    self.queue = self.sd.get_obj('output_queue')
    self.domain = self.sd.get_obj('output_domain')
def calculateID(self, file_name_fullpath):
    """Derive an instance ID from the audit file's creation date.

    Reads the ``created`` attribute of the first ``PersistenceItem``
    element (they will all be the same) and parses it as
    ``%Y-%m-%dT%H:%M:%SZ``.

    Returns:
        The parsed datetime; ``datetime.max`` when the attribute does not
        match the expected format (a warning is logged); ``datetime.min``
        when the file cannot be loaded or parsed or the element is missing
        (the traceback is printed to stdout).
    """
    instanceID = datetime.min
    try:
        file_object = loadFile(file_name_fullpath)
        try:
            root = ET.parse(file_object).getroot()
        finally:
            # Close the file even when parsing fails.
            file_object.close()
        reg_key = root.find('PersistenceItem')
        reg_modified = reg_key.get('created')
        try:
            instanceID = datetime.strptime(reg_modified, "%Y-%m-%dT%H:%M:%SZ")
        except ValueError:
            instanceID = datetime.max
            logger.warning("Invalid reg_modified date found!: %s (%s)" % (reg_modified, file_name_fullpath))
    except Exception:
        traceback.print_exc(file=sys.stdout)

    # If we found no PersistenceItem date we go with plan B (but most
    # probably this is corrupt and will fail later).
    # NOTE(review): instanceID starts as datetime.min and is never set to
    # None, so this fallback is currently unreachable. It is kept because
    # enabling it would change the value returned on failure -- confirm the
    # intended behaviour.
    if instanceID is None:
        file_object = loadFile(file_name_fullpath)
        try:
            content = file_object.read()
        finally:
            file_object.close()
        instanceID = hashlib.md5(content).hexdigest()
    return instanceID
def calculateID(self, file_name_fullpath):
    """Derive an instance ID from the audit file's creation date.

    Reads the ``created`` attribute of the first ``AppCompatItemExtended``
    element and parses it as ``%Y-%m-%dT%H:%M:%SZ``.

    Returns:
        The parsed datetime; ``datetime.max`` when the attribute does not
        match the expected format (a warning is logged); ``datetime.min``
        when the file cannot be loaded or parsed or the element is missing
        (the traceback is printed to stdout).
    """
    instanceID = datetime.min
    try:
        file_object = loadFile(file_name_fullpath)
        try:
            root = ET.parse(file_object).getroot()
        finally:
            # Close the file even when parsing fails.
            file_object.close()
        reg_key = root.find('AppCompatItemExtended')
        reg_modified = reg_key.get('created')
        try:
            instanceID = datetime.strptime(reg_modified, "%Y-%m-%dT%H:%M:%SZ")
        except ValueError:
            instanceID = datetime.max
            logger.warning("Invalid reg_modified date found!: %s (%s)" % (reg_modified, file_name_fullpath))
    except Exception:
        traceback.print_exc(file=sys.stdout)

    # If we found no creation date we go with plan B (but most probably
    # this is corrupt and will fail later).
    # NOTE(review): instanceID starts as datetime.min and is never set to
    # None, so this fallback is currently unreachable. It is kept because
    # enabling it would change the value returned on failure -- confirm the
    # intended behaviour.
    if instanceID is None:
        file_object = loadFile(file_name_fullpath)
        try:
            content = file_object.read()
        finally:
            file_object.close()
        instanceID = hashlib.md5(content).hexdigest()
    return instanceID
def calculateID(self, file_name_fullpath):
    """Derive an instance ID from the newest ``Modified`` date in the file.

    Scans every ``RegistryItem`` element, parses the text of its
    ``Modified`` child as ``%Y-%m-%dT%H:%M:%SZ`` and keeps the latest
    value. A date that does not match the format counts as
    ``datetime.max`` (a warning is logged); items without a ``Modified``
    child are skipped with a warning.

    Returns:
        The latest date found, or ``datetime.min`` when none was found or
        the file could not be loaded or parsed (the error is logged).
    """
    instanceID = datetime.min
    try:
        file_object = loadFile(file_name_fullpath)
        try:
            root = ET.parse(file_object).getroot()
        finally:
            # Close the file even when parsing fails.
            file_object.close()
        for reg_key in root.findall('RegistryItem'):
            tmp_reg_key = reg_key.find('Modified')
            if tmp_reg_key is None:
                logger.warning("Found RegistryItem with no Modified date (Mir bug?): %s" % file_name_fullpath)
                continue
            reg_modified = tmp_reg_key.text
            try:
                tmp_instanceID = datetime.strptime(reg_modified, "%Y-%m-%dT%H:%M:%SZ")
            except ValueError:
                tmp_instanceID = datetime.max
                logger.warning("Invalid reg_modified date found!: %s (%s)" % (reg_modified, file_name_fullpath))
            if instanceID < tmp_instanceID:
                instanceID = tmp_instanceID
    except Exception:
        logger.exception("Error on calculateID for: %s" % file_name_fullpath)

    # If we found no Modified date in any of the RegistryItems we go with
    # plan B (but most probably ShimCacheParser will fail to parse anyway).
    # NOTE(review): instanceID starts as datetime.min and is never set to
    # None, so this fallback is currently unreachable. It is kept because
    # enabling it would change the value returned on failure -- confirm the
    # intended behaviour.
    if instanceID is None:
        file_object = loadFile(file_name_fullpath)
        try:
            content = file_object.read()
        finally:
            file_object.close()
        instanceID = hashlib.md5(content).hexdigest()
    return instanceID
def calculateID(self, file_name_fullpath):
    """Derive an instance ID from the audit file's creation date.

    Reads the ``created`` attribute of the first ``AmCacheItem`` element
    and parses it as ``%Y-%m-%dT%H:%M:%SZ``.

    Returns:
        The parsed datetime; ``datetime.max`` when the attribute does not
        match the expected format (a warning is logged); ``datetime.min``
        when the file cannot be loaded or parsed or the element is missing
        (the traceback is printed to stdout).
    """
    instanceID = datetime.min
    try:
        file_object = loadFile(file_name_fullpath)
        try:
            root = ET.parse(file_object).getroot()
        finally:
            # Close the file even when parsing fails.
            file_object.close()
        reg_key = root.find('AmCacheItem')
        reg_modified = reg_key.get('created')
        try:
            instanceID = datetime.strptime(reg_modified, "%Y-%m-%dT%H:%M:%SZ")
        except ValueError:
            instanceID = datetime.max
            logger.warning("Invalid reg_modified date found!: %s (%s)" % (reg_modified, file_name_fullpath))
    except Exception:
        traceback.print_exc(file=sys.stdout)

    # If we found no creation date we go with plan B (but most probably
    # this is corrupt and will fail later).
    # NOTE(review): instanceID starts as datetime.min and is never set to
    # None, so this fallback is currently unreachable. It is kept because
    # enabling it would change the value returned on failure -- confirm the
    # intended behaviour.
    if instanceID is None:
        file_object = loadFile(file_name_fullpath)
        try:
            content = file_object.read()
        finally:
            file_object.close()
        instanceID = hashlib.md5(content).hexdigest()
    return instanceID
def make_windows_timestamp_value_getter(value_name): """ return a function that fetches the value from the registry key as a Windows timestamp. """ f = make_value_getter(value_name) def _value_getter(key): try: if key[0].get_value_by_name(value_name) != None: return parse_windows_timestamp(key[0].get_value_by_name(value_name).get_data_as_integer() or 0) else: return datetime.min except ValueError: return datetime.min return _value_getter
def make_unix_timestamp_value_getter(value_name): """ return a function that fetches the value from the registry key as a UNIX timestamp. """ f = make_value_getter(value_name) def _value_getter(key): try: if key[0].get_value_by_name(value_name) != None: return parse_unix_timestamp(key[0].get_value_by_name(value_name).get_data_as_integer() or 0) else: return datetime.min except ValueError: return datetime.min return _value_getter
def find_date_range(self):
    """Return the earliest and latest ``created_at`` among the tweets.

    Iterates ``self.get_collection_iterators()`` and parses every tweet's
    ``created_at`` field. The result is a dict with keys ``date_min`` and
    ``date_max``; with no tweets these stay at ``datetime.max`` and
    ``datetime.min`` respectively.
    """
    earliest, latest = datetime.max, datetime.min
    for tweet in self.get_collection_iterators():
        created = datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y')
        earliest = min(earliest, created)
        latest = max(latest, created)
    return {"date_min": earliest, "date_max": latest}
def find_date_range(self):
    """Return the earliest and latest ``created_at`` among the tweets.

    Iterates ``self.collection.get_iterator()`` and parses every tweet's
    ``created_at`` field. The result is a dict with keys ``date_min`` and
    ``date_max``; with no tweets these stay at ``datetime.max`` and
    ``datetime.min`` respectively.
    """
    earliest, latest = datetime.max, datetime.min
    for tweet in self.collection.get_iterator():
        created = datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y')
        earliest = min(earliest, created)
        latest = max(latest, created)
    return {"date_min": earliest, "date_max": latest}