我们从 Python 开源项目中提取了以下 23 个代码示例,用于说明如何使用 datetime.datetime.isoformat()。
def test_write_timestamp_file(self, mock_dt):
    """
    Verify that write_timestamp_file() forwards the stubbed timestamp to write_json.

    ``mock_dt.datetime.now`` is replaced by a mock whose ``isoformat()``
    returns a sentinel string. The test then asserts that write_json is
    called exactly once with ``{'RUN': <sentinel>}``, which implicitly checks
    that the "RUN" value is derived through ``isoformat()``.

    :param mock_dt: object whose ``datetime.now`` is overridden below
        (the original author was not sure this patch is necessary)
    """
    sentinel_stamp = "not iso formatted"

    # A stand-in "now" object that only knows how to answer isoformat().
    stub_now = mock.MagicMock()
    stub_now.isoformat = mock.MagicMock(return_value=sentinel_stamp)
    mock_dt.datetime.now = mock.MagicMock(return_value=stub_now)

    self.task_runner_instance.write_json = mock.MagicMock()
    self.task_runner_instance.write_timestamp_file()

    expected_payload = {'RUN': sentinel_stamp}
    self.task_runner_instance.write_json.assert_called_once_with(expected_payload)
def test_reap_dead_node(self):
    """
    Check when a node whose 'Ready' condition is 'Unknown' gets deleted.

    The same node spec is passed through ``self.cluster.maintain`` twice:
    with a heartbeat 30 minutes old the node's ``delete`` must not be called,
    with a heartbeat 2 hours old it must be called exactly once.
    """
    node_spec = copy.deepcopy(self.dummy_node)
    FakeInstance = collections.namedtuple('TestInstance', ['launch_time'])
    fake_instance = FakeInstance(datetime.now(pytz.utc))

    # First condition of type 'Ready' (None if the fixture has none).
    ready = next(
        (cond for cond in node_spec['status']['conditions']
         if cond['type'] == 'Ready'),
        None,
    )
    ready['status'] = 'Unknown'

    def maintain_with_heartbeat_age(age):
        # Age the heartbeat, wrap the spec and run one maintenance pass.
        ready['lastHeartbeatTime'] = datetime.isoformat(
            datetime.now(pytz.utc) - age)
        wrapped = KubeNode(pykube.Node(self.api, node_spec))
        wrapped.delete = mock.Mock(return_value="mocked stuff")
        self.cluster.maintain(
            [wrapped], {wrapped.instance_id: fake_instance}, {}, [], [])
        return wrapped

    recent = maintain_with_heartbeat_age(timedelta(minutes=30))
    recent.delete.assert_not_called()

    stale = maintain_with_heartbeat_age(timedelta(hours=2))
    stale.delete.assert_called_once_with()
def test_max_scale_in(self):
    """
    Check that two stale nodes are both left alone by one maintenance pass.

    Two copies of the dummy node get their 'Ready' condition set to
    'Unknown' with a heartbeat 2 hours old; after ``self.cluster.maintain``
    neither node's ``delete`` may have been called.
    NOTE(review): the test name suggests a cap on simultaneous scale-in --
    confirm against the Cluster implementation.
    """
    specs = [copy.deepcopy(self.dummy_node) for _ in range(2)]
    FakeInstance = collections.namedtuple('TestInstance', ['launch_time'])
    instances = [FakeInstance(datetime.now(pytz.utc)) for _ in specs]

    for spec in specs:
        # Only the first 'Ready' condition of each node is touched.
        ready = next(
            (cond for cond in spec['status']['conditions']
             if cond['type'] == 'Ready'),
            None,
        )
        if ready is not None:
            ready['status'] = 'Unknown'
            ready['lastHeartbeatTime'] = datetime.isoformat(
                datetime.now(pytz.utc) - timedelta(hours=2))

    kube_nodes = []
    for spec in specs:
        wrapped = KubeNode(pykube.Node(self.api, spec))
        wrapped.delete = mock.Mock(return_value="mocked stuff")
        kube_nodes.append(wrapped)

    running = {
        wrapped.instance_id: inst
        for wrapped, inst in zip(kube_nodes, instances)
    }
    self.cluster.maintain(kube_nodes, running, {}, [], [])

    for wrapped in kube_nodes:
        wrapped.delete.assert_not_called()
def take_screenshot(driver: webdriver, page_name: str):
    """Save a screenshot of the current page when screenshots are enabled.

    The file is written under the ``screenshots`` directory (resolved with
    ``abspath``) and named
    ``<utc-iso-timestamp>-<page_name>-<browser>-<version>-<platform>-<session_id>.png``.
    When ``TAKE_SCREENSHOTS`` is falsy nothing is saved and only a debug
    message is logged.

    :param driver: Any of the WebDrivers
    :param page_name: page name which will be used in the screenshot filename
    """
    if not TAKE_SCREENSHOTS:
        # Fixed typo in the message: " n environment variable" -> "an ...".
        logging.debug(
            "Taking screenshots is disabled. In order to turn it on please set"
            " an environment variable TAKE_SCREENSHOTS=true")
        return

    session_id = driver.session_id
    capabilities = driver.capabilities
    browser = capabilities.get("browserName", "unknown_browser")
    version = capabilities.get("version", "unknown_version")
    platform = capabilities.get("platform", "unknown_platform")

    # NOTE(review): isoformat() output contains ':' characters, which are not
    # valid in Windows file names -- confirm the target platforms.
    stamp = datetime.isoformat(datetime.utcnow())
    filename = "{}-{}-{}-{}-{}-{}.png".format(
        stamp, page_name, browser, version, platform, session_id)
    file_path = abspath(join("screenshots", filename))
    driver.save_screenshot(file_path)
    logging.debug(
        "Screenshot of %s page saved in: %s", page_name, filename)
def accumulate_data(unique_station_sensor, data, date_filter):
    """
    Prep the data in a format easy to push to the server.

    Reads the CSV file at ``data``, skips its first (header) row and groups
    the remaining rows by ``(row[0], row[3])`` -- (stationid, parameter).
    A row is kept only when its ``date_filter`` entry is truthy and
    ``row[4]`` is not the empty string.

    :param unique_station_sensor: not used by this function
    :param data: path of the CSV file to read
    :param date_filter: sequence indexed by data-row position (0 = first row
        after the header)
    :returns: dict mapping ``(row[0], row[3])`` to
        ``{'values': [(iso_datetime, row[4]), ...], 'count': len(values)}``
    """
    grouped = {}
    with open(data, 'r') as handle:
        reader = csv.reader(handle)
        next(reader, None)  # drop the header row, if any
        for position, fields in enumerate(reader):
            # only process row if date_filter is true and value is not missing
            if not date_filter[position] or fields[4] == "":
                continue
            group_key = (fields[0], fields[3])  # (stationid, parameter)
            moment = dateutil.parser.parse(
                "{} {}".format(fields[1], fields[2]))
            sample = (datetime.isoformat(moment), fields[4])
            bucket = grouped.setdefault(group_key, {})
            bucket.setdefault('values', []).append(sample)

    for bucket in grouped.values():
        bucket['count'] = len(bucket['values'])
    return grouped
def create_zun_service(self, values):
    """Stamp ``values`` with a creation time and save it as a ZunService.

    :param values: mapping of service fields; its 'created_at' entry is set
        in place to the ISO-formatted ``timeutils.utcnow()``
    :returns: the saved ``models.ZunService`` object
    """
    created = timeutils.utcnow()
    values['created_at'] = datetime.isoformat(created)

    service = models.ZunService(values)
    service.save()
    return service
def update_zun_service(self, host, binary, values):
    """Merge ``values`` into the stored record of one zun service.

    The record is read from ``/zun_services/<host>_<binary>``, decoded from
    JSON, updated with ``values`` (plus a fresh 'updated_at' ISO timestamp)
    and written back through ``self.client``.

    :param host: host part of the service key
    :param binary: binary part of the service key
    :param values: fields to merge; 'updated_at' is set on it in place
    :raises exception.ZunServiceNotFound: when the key does not exist
    :raises Exception: any other error is logged and re-raised unchanged
    """
    try:
        service_key = '/zun_services/' + host + '_' + binary
        entry = self.client.read(service_key)
        merged = json.loads(entry.value)

        values['updated_at'] = datetime.isoformat(timeutils.utcnow())
        merged.update(values)

        # NOTE(review): dump_as_bytes is not a stdlib json function, so
        # ``json`` here is presumably a project/oslo wrapper -- confirm.
        entry.value = json.dump_as_bytes(merged)
        self.client.update(entry)
    except etcd.EtcdKeyNotFound:
        raise exception.ZunServiceNotFound(host=host, binary=binary)
    except Exception as err:
        LOG.error('Error occurred while updating service: %s',
                  six.text_type(err))
        raise
def create_compute_node(self, context, values):
    """Stamp ``values``, ensure it has a uuid and save it as a ComputeNode.

    :param context: not used by this method
    :param values: mapping of node fields; 'created_at' is set in place and
        'uuid' is generated when missing or falsy
    :returns: the saved ``models.ComputeNode`` object
    """
    created = timeutils.utcnow()
    values['created_at'] = datetime.isoformat(created)

    existing_uuid = values.get('uuid')
    if not existing_uuid:
        values['uuid'] = uuidutils.generate_uuid()

    node = models.ComputeNode(values)
    node.save()
    return node
def setUp(self):
    """Load the YAML fixtures and build the API client and Cluster under test.

    Populates ``self.dummy_pod``, ``self.dummy_ds_pod``, ``self.dummy_rc_pod``
    and ``self.dummy_node`` from the ``data`` directory next to this file,
    rewrites the node's condition timestamps as ISO strings, and creates
    ``self.api`` and ``self.cluster``.
    """
    dir_path = os.path.dirname(os.path.realpath(__file__))

    def load_spec(relative_path):
        # yaml.load() without an explicit Loader is deprecated since
        # PyYAML 5.1 and raises TypeError on PyYAML 6; safe_load is enough
        # for plain fixture documents.
        with open(os.path.join(dir_path, relative_path), 'r') as f:
            return yaml.safe_load(f)

    # load dummy kube specs
    self.dummy_pod = load_spec('data/busybox.yaml')
    self.dummy_ds_pod = load_spec('data/ds-pod.yaml')
    self.dummy_rc_pod = load_spec('data/rc-pod.yaml')
    self.dummy_node = load_spec('data/node.yaml')

    conditions = self.dummy_node['status']['conditions']

    # Refresh the heartbeat of a healthy 'Ready' condition to "now", keeping
    # the timezone the fixture timestamp was parsed with.
    for condition in conditions:
        if condition['type'] == 'Ready' and condition['status'] == 'True':
            condition['lastHeartbeatTime'] = datetime.now(
                condition['lastHeartbeatTime'].tzinfo)

    # Convert timestamps to strings to match PyKube
    for condition in conditions:
        condition['lastHeartbeatTime'] = datetime.isoformat(
            condition['lastHeartbeatTime'])
        condition['lastTransitionTime'] = datetime.isoformat(
            condition['lastTransitionTime'])

    # this isn't actually used here
    # only needed to create the KubePod object...
    self.api = pykube.HTTPClient(
        pykube.KubeConfig.from_file(
            os.path.join(dir_path, './data/kube_config.yaml')))

    self.cluster = Cluster(
        kubeconfig='~/.kube/config',
        idle_threshold=60,
        spare_agents=1,
        instance_init_time=60,
        resource_group='my-rg',
        notifier=None,
        service_principal_app_id='dummy',
        service_principal_secret='dummy',
        service_principal_tenant_id='dummy',
        kubeconfig_private_key='dummy',
        client_private_key='dummy',
        ca_private_key='dummy',
        ignore_pools='',
        over_provision=0
    )
def validUpgrade(nodeIds, tconf):
    """Build the 'upgrade-13' request with widely spaced per-node start times.

    The first node starts 100 seconds from now (UTC); each following node
    starts ``tconf.MinSepBetweenNodeUpgrades + 1 + 3`` seconds after the
    previous one.

    :param nodeIds: iterable of node identifiers, scheduled in iteration order
    :param tconf: config object providing ``MinSepBetweenNodeUpgrades``
    :returns: dict with name, version, action, schedule, sha256 and timeout
    """
    utc_now = datetime.utcnow().replace(tzinfo=dateutil.tz.tzutc())
    first_start = utc_now + timedelta(seconds=100)
    gap = tconf.MinSepBetweenNodeUpgrades + 1

    # Node k starts k * (gap + 3) seconds after the first one.
    schedule = {
        node_id: datetime.isoformat(
            first_start + position * timedelta(seconds=gap + 3))
        for position, node_id in enumerate(nodeIds)
    }

    return {
        'name': 'upgrade-13',
        'version': bumpedVersion(),
        'action': START,
        'schedule': schedule,
        # sha256=get_valid_code_hash(),
        'sha256': 'db34a72a90d026dae49c3b3f0436c8d3963476c77468ad955845a1ccf7b03f55',
        'timeout': 1,
    }
def invalidUpgrade(nodeIds, tconf):
    """Build the 'upgrade-14' request with narrowly spaced per-node start times.

    The first node starts 60 seconds from now (UTC); each following node
    starts ``tconf.MinSepBetweenNodeUpgrades + 1 - 3`` seconds after the
    previous one.

    :param nodeIds: iterable of node identifiers, scheduled in iteration order
    :param tconf: config object providing ``MinSepBetweenNodeUpgrades``
    :returns: dict with name, version, action, schedule, sha256 and timeout
    """
    utc_now = datetime.utcnow().replace(tzinfo=dateutil.tz.tzutc())
    first_start = utc_now + timedelta(seconds=60)
    gap = tconf.MinSepBetweenNodeUpgrades + 1

    # Node k starts k * (gap - 3) seconds after the first one.
    schedule = {
        node_id: datetime.isoformat(
            first_start + position * timedelta(seconds=gap - 3))
        for position, node_id in enumerate(nodeIds)
    }

    return {
        'name': 'upgrade-14',
        'version': bumpedVersion(),
        'action': START,
        'schedule': schedule,
        # sha256=get_valid_code_hash(),
        'sha256': '46c715a90b1067142d548cb1f1405b0486b32b1a27d418ef3a52bd976e9fae50',
        'timeout': 10,
    }
def _timefmt(val):
    """Format a POSIX timestamp as a naive-UTC ISO 8601 string.

    :param val: seconds since the epoch, as accepted by
        ``datetime.utcfromtimestamp``
    :returns: ``YYYY-MM-DDTHH:MM:SS[.ffffff]`` with no UTC offset suffix
    """
    utc_moment = datetime.utcfromtimestamp(val)
    return utc_moment.isoformat()
def get_sigmf_iso8601_datetime_now():
    """Return the current UTC time as an ISO 8601 string with a trailing 'Z'."""
    utc_now = datetime.utcnow()
    return "{}Z".format(utc_now.isoformat())
def to_json(manager_class, attr):
    """Serialize a datetime attribute to its ISO 8601 string.

    :param manager_class: not used by this function
    :param attr: value to serialize; expected to be a ``datetime``
    :returns: ``datetime.isoformat(attr)``, or ``None`` when ``attr`` cannot
        be formatted (for example it is ``None`` or not a datetime)
    """
    try:
        return datetime.isoformat(attr)
    except (TypeError, ValueError):
        # Narrowed from a bare ``except:``, which also swallowed
        # KeyboardInterrupt and SystemExit. Calling the unbound method on a
        # non-datetime raises TypeError.
        return None
def generate_timestamp():
    """Generate an ISO 8601 timestamp of the current local time (including
    microseconds when they are non-zero), with colons replaced by dashes to
    avoid problems if used as a file name.
    """
    stamp = datetime.now().isoformat()
    return "-".join(stamp.split(":"))
def validUpgrade(nodeIds, tconf):
    """Build the 'upgrade-13' request with widely spaced per-node start times.

    The first node starts 90 seconds from now (UTC); each following node
    starts ``tconf.MinSepBetweenNodeUpgrades + 1 + 3`` seconds after the
    previous one.

    :param nodeIds: iterable of node identifiers, scheduled in iteration order
    :param tconf: config object providing ``MinSepBetweenNodeUpgrades``
    :returns: dict with name, version, action, schedule, sha256 and timeout
    """
    utc_now = datetime.utcnow().replace(tzinfo=dateutil.tz.tzutc())
    first_start = utc_now + timedelta(seconds=90)
    gap = tconf.MinSepBetweenNodeUpgrades + 1

    # Node k starts k * (gap + 3) seconds after the first one.
    schedule = {
        node_id: datetime.isoformat(
            first_start + position * timedelta(seconds=gap + 3))
        for position, node_id in enumerate(nodeIds)
    }

    return {
        'name': 'upgrade-13',
        'version': bumpedVersion(),
        'action': START,
        'schedule': schedule,
        'sha256': 'aad1242',
        'timeout': 10,
    }
def invalidUpgrade(nodeIds, tconf):
    """Build the 'upgrade-14' request with narrowly spaced per-node start times.

    The first node starts 60 seconds from now (UTC); each following node
    starts ``tconf.MinSepBetweenNodeUpgrades + 1 - 3`` seconds after the
    previous one.

    :param nodeIds: iterable of node identifiers, scheduled in iteration order
    :param tconf: config object providing ``MinSepBetweenNodeUpgrades``
    :returns: dict with name, version, action, schedule, sha256 and timeout
    """
    utc_now = datetime.utcnow().replace(tzinfo=dateutil.tz.tzutc())
    first_start = utc_now + timedelta(seconds=60)
    gap = tconf.MinSepBetweenNodeUpgrades + 1

    # Node k starts k * (gap - 3) seconds after the first one.
    schedule = {
        node_id: datetime.isoformat(
            first_start + position * timedelta(seconds=gap - 3))
        for position, node_id in enumerate(nodeIds)
    }

    return {
        'name': 'upgrade-14',
        'version': bumpedVersion(),
        'action': START,
        'schedule': schedule,
        'sha256': 'ffd1224',
        'timeout': 10,
    }
def __init__(self, args, optimizer, name, extra_msg=''):
    """Reset the running statistics, derive the output file names and write
    the run configuration to the head of the log file.

    :param args: parsed arguments, forwarded to ``write_config``
    :param optimizer: optimizer object kept on the instance
    :param name: run name; the empty string selects the default 'signdist'
    :param extra_msg: optional extra text forwarded to ``write_config``
    """
    self.iteration = 0
    self.sum_loss = 0
    self.sum_acc = 0
    self.sum_mean_diff = 0
    self.sum_max_diff = 0
    self.current_section = ''
    self.optimizer = optimizer

    # setup according to arguments
    # Compare by value: ``name is not ''`` tested object identity, which is
    # implementation-dependent and a SyntaxWarning on Python 3.8+.
    self.name = name if name != '' else 'signdist'
    # Output files are prefixed with today's date, e.g. 2020-01-31_<name>.
    self.out_file = "{}_{}".format(date.isoformat(date.today()), self.name)
    self.log_file = "{}.log".format(self.out_file)

    # write config to head of the log file
    self.write_config(args, extra_msg)
def write_config(self, args, extra_msg):
    """Append a header describing this run's configuration to the log file.

    Every line is emitted through ``self._comment(text, f)``. The header
    covers the start time, the command-line arguments, the optimizer
    settings, the optional ``extra_msg`` and the full ``sys.argv``.

    :param args: object with ``data``, ``batchsize``, ``test``, ``skilled``,
        ``gpu``, ``initmodel``, ``resume``, ``lrinterval`` and
        ``weight_decay`` attributes
    :param extra_msg: extra text to include; skipped when falsy
    """
    with open(self.log_file, 'a+') as f:
        self._comment("=" * 40, f)
        self._comment("{} initiated at {}".format(
            self.name, datetime.isoformat(datetime.now())
        ), f)
        self._comment("-" * 40, f)

        # arguments passed
        self._comment("Data: " + args.data, f)
        self._comment("Batchsize: {}".format(args.batchsize), f)
        self._comment("Test ratio: {}".format(args.test), f)
        self._comment("Hard triplet ratio: {}".format(args.skilled), f)
        # Bug fix: the original "GPU ".format(args.gpu) had no placeholder,
        # so the GPU id was silently dropped from the log.
        dev = "CPU" if args.gpu < 0 else "GPU {}".format(args.gpu)
        self._comment("Device: " + dev, f)
        if args.initmodel:
            self._comment("Init model: " + args.initmodel, f)
        if args.resume:
            self._comment("Resume state: " + args.resume, f)
        self._comment("-" * 40, f)

        # parameters set in script
        self._comment("Optimizer: " + self.optimizer.__class__.__name__, f)
        self._comment("Initial LR: {}".format(self.optimizer.lr), f)
        self._comment("LR interval: {}".format(args.lrinterval), f)
        self._comment("Weight decay: {}".format(args.weight_decay), f)
        self._comment("Epoch: {}".format(self.optimizer.epoch), f)
        if extra_msg:
            self._comment(extra_msg, f)
        self._comment("-" * 40, f)

        # complete call
        self._comment("{}".format(sys.argv), f)
        self._comment("=" * 40, f)
def big_days(x=0):
    """
    Return entry ``x`` of ``_bad_days``; the doctests decode each entry.

    # page 17, RECORD 11
    >>> parse_date( big_days(0) ).isoformat( )
    '2012-11-20T21:53:41'

    # page 17, ~ RECORD 12
    >>> parse_date( big_days(1) ).isoformat( )
    '2012-11-20T22:07:38'

    >>> parse_date( big_days(2) ).isoformat( )
    '2012-11-20T21:53:41'

    >>> parse_date( big_days(3) ).isoformat( )
    '2012-11-20T22:07:38'

    # page 16, RECORD ~15
    >>> parse_date( big_days(4) ).isoformat( )
    '2012-11-25T16:41:34'

    >>> parse_date( big_days(5) ).isoformat( )
    '2012-11-25T13:54:32'

    # page 15
    >>> parse_date( big_days(6) ).isoformat( )
    '2012-11-29T20:25:37'

    # page 0
    >>> parse_date( big_days(7) ).isoformat( )
    '2012-12-20T14:59:02'

    >>> parse_date( big_days(8) ).isoformat( )
    '2012-12-20T15:28:25'

    """
    selected = _bad_days[x]
    return selected
def strptime(string, formats=None):
    """
    Convert a date given as a string into a python ``datetime`` object.

    The inverse can be obtained by calling datetime.isoformat() (which
    returns 'T' as date time separator, and optionally microseconds if they
    are not zero). This function is an easy version of
    `dateutil.parser.parse` for parsing iso-like datetime formats (e.g. the
    fdsnws standard) without the need of a module import.

    The returned datetime is always naive: a trailing 'Z' is only matched
    literally and does not attach any tzinfo.

    :param string: a string representing a datetime, or a datetime object
        (returned as it is), or a date object (converted to a datetime at
        midnight). Leading and trailing whitespace of a string is ignored
    :param formats: if list or iterable, it holds the strings denoting the
        formats to be used to convert string (in the order they are
        declared). If None (the default), the datetime format is guessed
        from the string content among the following (with optional 'Z',
        and with 'T' replaced by space as valid option):

        - '%Y-%m-%dT%H:%M:%S.%fZ'
        - '%Y-%m-%dT%H:%M:%SZ'
        - '%Y-%m-%dZ'

    :raise: ValueError if the string cannot be parsed
    :return: a datetime object

    :Example:

    ::

        strptime("2016-06-01T09:04:00.5600Z")
        strptime("2016-06-01T09:04:00.5600")
        strptime("2016-06-01 09:04:00.5600Z")
        strptime("2016-06-01T09:04:00Z")
        strptime("2016-06-01T09:04:00")
        strptime("2016-06-01 09:04:00Z")
        strptime("2016-06-01")
    """
    if isinstance(string, datetime):
        return string

    # Imported locally: only the ``datetime`` class is known to be in scope.
    from datetime import date
    if isinstance(string, date):
        # Documented contract: a plain date is promoted to midnight.
        return datetime(string.year, string.month, string.day)

    try:
        string = string.strip()

        if formats is None:
            # endswith() instead of string[-1]: an empty string then falls
            # through to the regular "invalid date time" error below.
            has_z = string.endswith('Z')
            has_t = 'T' in string
            if has_t or has_z or ' ' in string:
                t_str, z_str = 'T' if has_t else ' ', 'Z' if has_z else ''
                formats = ['%Y-%m-%d{}%H:%M:%S.%f{}'.format(t_str, z_str),
                           '%Y-%m-%d{}%H:%M:%S{}'.format(t_str, z_str)]
                if has_z and not has_t:
                    # Date-only input with a 'Z' suffix, e.g. "2016-06-01Z".
                    formats.append('%Y-%m-%dZ')
            else:
                formats = ['%Y-%m-%d']

        for dtformat in formats:
            try:
                return datetime.strptime(string, dtformat)
            except ValueError:  # try the next candidate format
                pass

        raise ValueError("invalid date time '%s'" % str(string))
    except ValueError:
        raise
    except Exception as exc:
        # Anything else (e.g. a non-string argument) is reported as a
        # ValueError too, so callers have a single exception type to catch.
        raise ValueError(str(exc))
```
def _test_decode_bolus( ):
    """
    Doctest container: decode the timestamps captured in
    ``_bewest_dates['page-19']``.

    ## correct
    >>> parse_date( bytearray( _bewest_dates['page-19'][6] ) ).isoformat( )
    '2012-11-12T00:55:42'

    ## correct
    >>> parse_date( bytearray( _bewest_dates['page-19'][0] ) ).isoformat( )
    '2012-11-12T00:55:42'

    ## this is wrong
    >>> parse_date( bytearray( _bewest_dates['page-19'][1] ) ).isoformat( )
    '2012-04-10T12:12:00'

    ## day,month is wrong, time H:M:S is correct
    # expected:
    >>> parse_date( bytearray( _bewest_dates['page-19'][2] ) ).isoformat( )
    '2012-02-08T03:11:12'

    ## correct
    >>> parse_date( bytearray( _bewest_dates['page-19'][3] ) ).isoformat( )
    '2012-11-12T08:03:11'

    #### not a valid date
    # >>> parse_date( bytearray( _bewest_dates['page-19'][4] ) ).isoformat( )

    ## correct
    >>> parse_date( bytearray( _bewest_dates['page-19'][5] ) ).isoformat( )
    '2012-11-12T08:03:13'

    """
    # Annotated hex dump kept as an inert string literal for reference only;
    # it is never evaluated or returned.
    """
    0x5b 0x7e # bolus wizard,
    0xaa 0xf7 0x00 0x0c 0x0c # page-19[0]
    0x0f 0x50 0x0d 0x2d 0x6a 0x00 0x0b 0x00
    0x00 0x07 0x00 0x0b 0x7d 0x5c 0x08 0x58
    0x97 0x04 0x30 0x05 0x14 0x34 0xc8
    0x91 0xf8 # 0x91, 0xf8 = month=11, minute=56, seconds=17!
    0x00 # general parsing fails here
    0x00 0xaa 0xf7 0x40 0x0c 0x0c # expected - page-19[6]
    0x0a 0x0c
    0x8b 0xc3 0x28 0x0c 0x8c # page-19[3]
    0x5b 0x0c
    0x8d 0xc3 0x08 0x0c 0x0c # page-19[5]
    0x00 0x51 0x0d 0x2d 0x6a 0x1f 0x00 0x00 0x00 0x00 0x00
    """

### csv deconstructed
def decode_wizard(data):
    """
    Decode a bolus wizard record into a dict of its fields.

    Layout as read by this function (0-based offsets into ``data``):

    - ``data[0:2]``  head; ``head[1]`` is the second value given to
      ``lib.BangInt`` for the bg reading
    - ``data[2:7]``  timestamp bytes, decoded with ``parse_date``
    - ``data[7:]``   body:
        - ``body[0]``         carb_input
        - ``body[1] & 0x0f``  first value given to ``lib.BangInt`` for bg
        - ``body[2]``         carb_ratio
        - ``body[3]``         bg_target_high
        - ``body[4]``         insulin_sensitivity
        - ``body[5]``         bg_target_low

    NOTE(review): how ``lib.BangInt`` combines its two values is not visible
    here -- presumably high byte first; confirm in ``lib``.

    The decoded timestamp is only printed, it is not part of the result.

    :param data: indexable byte sequence holding one record
    :returns: dict with keys bg_input, carb_input, carb_ratio,
        insulin_sensitivity, bg_target_low and bg_target_high
    """
    head = data[:2]
    # Local names chosen so they do not shadow the date/datetime classes.
    stamp_bytes = data[2:7]
    stamp = parse_date(stamp_bytes)
    body = data[7:]

    bg = lib.BangInt([ body[1] & 0x0f, head[1] ])
    carb_input = int(body[0])
    carb_ratio = int(body[2])
    bg_target_low = int(body[5])
    bg_target_high = int(body[3])
    sensitivity = int(body[4])

    # Single-argument print: same output under the Python 2 print statement
    # and the Python 3 print function (the old statement form is a
    # SyntaxError on Python 3).
    print("BOLUS WIZARD %s" % stamp.isoformat( ))

    wizard = {
        'bg_input': bg,
        'carb_input': carb_input,
        'carb_ratio': carb_ratio,
        'insulin_sensitivity': sensitivity,
        'bg_target_low': bg_target_low,
        'bg_target_high': bg_target_high,
    }
    return wizard