Python datetime.datetime 模块,isoformat() 实例源码

我们从Python开源项目中,提取了以下23个代码示例,用于说明如何使用datetime.datetime.isoformat()

项目:fileflow    作者:industrydive    | 项目源码 | 文件源码
def test_write_timestamp_file(self, mock_dt):
        """
        Assert a TaskRunner calls write_json with the expected args. Implicitly testing that we are using datetime.isoformat to derive
        the value for the "RUN" key.

        :param mock_dt: ??? not sure this patch is necessary
        """
        fake_date_str = "not iso formatted"
        mock_datetime_obj = mock.MagicMock()
        mock_datetime_obj.isoformat = mock.MagicMock(return_value=fake_date_str)
        mock_dt.datetime.now = mock.MagicMock(return_value=mock_datetime_obj)

        self.task_runner_instance.write_json = mock.MagicMock()
        self.task_runner_instance.write_timestamp_file()

        self.task_runner_instance.write_json.assert_called_once_with({'RUN': fake_date_str})
项目:kubernetes-ec2-autoscaler    作者:openai    | 项目源码 | 文件源码
def test_reap_dead_node(self):
        node = copy.deepcopy(self.dummy_node)
        TestInstance = collections.namedtuple('TestInstance', ['launch_time'])
        instance = TestInstance(datetime.now(pytz.utc))

        ready_condition = None
        for condition in node['status']['conditions']:
            if condition['type'] == 'Ready':
                ready_condition = condition
                break
        ready_condition['status'] = 'Unknown'

        ready_condition['lastHeartbeatTime'] = datetime.isoformat(datetime.now(pytz.utc) - timedelta(minutes=30))
        kube_node = KubeNode(pykube.Node(self.api, node))
        kube_node.delete = mock.Mock(return_value="mocked stuff")
        self.cluster.maintain([kube_node], {kube_node.instance_id: instance}, {}, [], [])
        kube_node.delete.assert_not_called()

        ready_condition['lastHeartbeatTime'] = datetime.isoformat(datetime.now(pytz.utc) - timedelta(hours=2))
        kube_node = KubeNode(pykube.Node(self.api, node))
        kube_node.delete = mock.Mock(return_value="mocked stuff")
        self.cluster.maintain([kube_node], {kube_node.instance_id: instance}, {}, [], [])
        kube_node.delete.assert_called_once_with()
项目:kubernetes-ec2-autoscaler    作者:openai    | 项目源码 | 文件源码
def test_max_scale_in(self):
        node1 = copy.deepcopy(self.dummy_node)
        node2 = copy.deepcopy(self.dummy_node)
        TestInstance = collections.namedtuple('TestInstance', ['launch_time'])
        instance1 = TestInstance(datetime.now(pytz.utc))
        instance2 = TestInstance(datetime.now(pytz.utc))

        for node in [node1, node2]:
            for condition in node['status']['conditions']:
                if condition['type'] == 'Ready':
                    condition['status'] = 'Unknown'
                    condition['lastHeartbeatTime'] = datetime.isoformat(datetime.now(pytz.utc) - timedelta(hours=2))
                    break

        kube_node1 = KubeNode(pykube.Node(self.api, node1))
        kube_node1.delete = mock.Mock(return_value="mocked stuff")
        kube_node2 = KubeNode(pykube.Node(self.api, node2))
        kube_node2.delete = mock.Mock(return_value="mocked stuff")
        self.cluster.maintain([kube_node1, kube_node2], {kube_node1.instance_id: instance1, kube_node2.instance_id: instance2}, {}, [], [])
        kube_node1.delete.assert_not_called()
        kube_node2.delete.assert_not_called()
项目:directory-tests    作者:uktrade    | 项目源码 | 文件源码
def take_screenshot(driver: webdriver, page_name: str):
    """Will take a screenshot of current page.

    :param driver: Any of the WebDrivers
    :param page_name: page name which will be used in the screenshot filename
    """
    if TAKE_SCREENSHOTS:
        session_id = driver.session_id
        browser = driver.capabilities.get("browserName", "unknown_browser")
        version = driver.capabilities.get("version", "unknown_version")
        platform = driver.capabilities.get("platform", "unknown_platform")
        stamp = datetime.isoformat(datetime.utcnow())
        filename = ("{}-{}-{}-{}-{}-{}.png"
                    .format(stamp, page_name, browser, version, platform,
                            session_id))
        file_path = abspath(join("screenshots", filename))
        driver.save_screenshot(file_path)
        logging.debug(
            "Screenshot of %s page saved in: %s", page_name, filename)
    else:
        logging.debug(
            "Taking screenshots is disabled. In order to turn it on please set"
            " n environment variable TAKE_SCREENSHOTS=true")
项目:IWN-Ingest    作者:LimnoTech    | 项目源码 | 文件源码
def accumulate_data(unique_station_sensor,data,date_filter):
    """
    Prep the data in a format easy to push to the server
    """
    rolled_up_data = {}
    with open(data,'r') as fi:
        r = csv.reader(fi)
        for i, row in enumerate(r):
            j = i-1
            if i == 0:
                continue
            else:
                if date_filter[j] and row[4] != "": #only process row if date_filter is true and value is not missing
                    data_id = (row[0],row[3])  #(stationid,parameter)
                    date_time = dateutil.parser.parse("{} {}".format(row[1],row[2]))
                    data_value = (datetime.isoformat(date_time),row[4])
                    #print(data_value)

                    #rolled_up_data.setdefault(data_id, []).append(data_value)
                    rolled_up_data.setdefault(data_id, {}).setdefault('values',[]).append(data_value)
    for k,v in rolled_up_data.items():
        count = len(v['values'])
        rolled_up_data[k]['count']=count
    return rolled_up_data
项目:zun    作者:openstack    | 项目源码 | 文件源码
def create_zun_service(self, values):
        values['created_at'] = datetime.isoformat(timeutils.utcnow())
        zun_service = models.ZunService(values)
        zun_service.save()
        return zun_service
项目:zun    作者:openstack    | 项目源码 | 文件源码
def update_zun_service(self, host, binary, values):
        try:
            target = self.client.read('/zun_services/' + host + '_' + binary)
            target_value = json.loads(target.value)
            values['updated_at'] = datetime.isoformat(timeutils.utcnow())
            target_value.update(values)
            target.value = json.dump_as_bytes(target_value)
            self.client.update(target)
        except etcd.EtcdKeyNotFound:
            raise exception.ZunServiceNotFound(host=host, binary=binary)
        except Exception as e:
            LOG.error('Error occurred while updating service: %s',
                      six.text_type(e))
            raise
项目:zun    作者:openstack    | 项目源码 | 文件源码
def create_compute_node(self, context, values):
        values['created_at'] = datetime.isoformat(timeutils.utcnow())
        if not values.get('uuid'):
            values['uuid'] = uuidutils.generate_uuid()
        compute_node = models.ComputeNode(values)
        compute_node.save()
        return compute_node
项目:Kubernetes-acs-engine-autoscaler    作者:wbuchwalter    | 项目源码 | 文件源码
def setUp(self):
        # load dummy kube specs
        dir_path = os.path.dirname(os.path.realpath(__file__))
        with open(os.path.join(dir_path, 'data/busybox.yaml'), 'r') as f:
            self.dummy_pod = yaml.load(f.read())
        with open(os.path.join(dir_path, 'data/ds-pod.yaml'), 'r') as f:
            self.dummy_ds_pod = yaml.load(f.read())
        with open(os.path.join(dir_path, 'data/rc-pod.yaml'), 'r') as f:
            self.dummy_rc_pod = yaml.load(f.read())
        with open(os.path.join(dir_path, 'data/node.yaml'), 'r') as f:
            self.dummy_node = yaml.load(f.read())
            for condition in self.dummy_node['status']['conditions']:
                if condition['type'] == 'Ready' and condition['status'] == 'True':
                    condition['lastHeartbeatTime'] = datetime.now(condition['lastHeartbeatTime'].tzinfo)
            # Convert timestamps to strings to match PyKube
            for condition in self.dummy_node['status']['conditions']:
                condition['lastHeartbeatTime'] = datetime.isoformat(condition['lastHeartbeatTime'])
                condition['lastTransitionTime'] = datetime.isoformat(condition['lastTransitionTime'])

        # this isn't actually used here
        # only needed to create the KubePod object...
        dir_path = os.path.dirname(os.path.realpath(__file__))
        self.api = pykube.HTTPClient(pykube.KubeConfig.from_file(os.path.join(dir_path, './data/kube_config.yaml')))

        self.cluster = Cluster(
            kubeconfig='~/.kube/config',
            idle_threshold=60,
            spare_agents=1,
            instance_init_time=60,
            resource_group='my-rg',
            notifier=None,
            service_principal_app_id='dummy',
            service_principal_secret='dummy',
            service_principal_tenant_id='dummy',
            kubeconfig_private_key='dummy',
            client_private_key='dummy',
            ca_private_key='dummy',
            ignore_pools='',
            over_provision=0
        )
项目:indy-node    作者:hyperledger    | 项目源码 | 文件源码
def validUpgrade(nodeIds, tconf):
    schedule = {}
    unow = datetime.utcnow().replace(tzinfo=dateutil.tz.tzutc())
    startAt = unow + timedelta(seconds=100)
    acceptableDiff = tconf.MinSepBetweenNodeUpgrades + 1
    for i in nodeIds:
        schedule[i] = datetime.isoformat(startAt)
        startAt = startAt + timedelta(seconds=acceptableDiff + 3)
    return dict(name='upgrade-13', version=bumpedVersion(), action=START,
                schedule=schedule,
                # sha256=get_valid_code_hash(),
                sha256='db34a72a90d026dae49c3b3f0436c8d3963476c77468ad955845a1ccf7b03f55',
                timeout=1)
项目:indy-node    作者:hyperledger    | 项目源码 | 文件源码
def invalidUpgrade(nodeIds, tconf):
    schedule = {}
    unow = datetime.utcnow().replace(tzinfo=dateutil.tz.tzutc())
    startAt = unow + timedelta(seconds=60)
    acceptableDiff = tconf.MinSepBetweenNodeUpgrades + 1
    for i in nodeIds:
        schedule[i] = datetime.isoformat(startAt)
        startAt = startAt + timedelta(seconds=acceptableDiff - 3)
    return dict(name='upgrade-14', version=bumpedVersion(), action=START,
                schedule=schedule,
                # sha256=get_valid_code_hash(),
                sha256='46c715a90b1067142d548cb1f1405b0486b32b1a27d418ef3a52bd976e9fae50',
                timeout=10)
项目:exchange    作者:boundlessgeo    | 项目源码 | 文件源码
def _timefmt(val):
        return datetime.isoformat(datetime.utcfromtimestamp(val))
项目:SigMF    作者:gnuradio    | 项目源码 | 文件源码
def get_sigmf_iso8601_datetime_now():
    return datetime.isoformat(datetime.utcnow()) + 'Z'
项目:python-yakumo    作者:yosshy    | 项目源码 | 文件源码
def to_json(manager_class, attr):
        try:
            return datetime.isoformat(attr)
        except:
            return None
项目:pipelines    作者:gis-rpd    | 项目源码 | 文件源码
def generate_timestamp():
    """generate ISO8601 timestamp incl microsends, but with colons
    replaced to avoid problems if used as file name
    """
    return datetime.isoformat(datetime.now()).replace(":", "-")
项目:old-sovrin    作者:sovrin-foundation    | 项目源码 | 文件源码
def validUpgrade(nodeIds, tconf):
    schedule = {}
    unow = datetime.utcnow().replace(tzinfo=dateutil.tz.tzutc())
    startAt = unow + timedelta(seconds=90)
    acceptableDiff = tconf.MinSepBetweenNodeUpgrades + 1
    for i in nodeIds:
        schedule[i] = datetime.isoformat(startAt)
        startAt = startAt + timedelta(seconds=acceptableDiff + 3)
    return dict(name='upgrade-13', version=bumpedVersion(), action=START,
                schedule=schedule, sha256='aad1242', timeout=10)
项目:old-sovrin    作者:sovrin-foundation    | 项目源码 | 文件源码
def invalidUpgrade(nodeIds, tconf):
    schedule = {}
    unow = datetime.utcnow().replace(tzinfo=dateutil.tz.tzutc())
    startAt = unow + timedelta(seconds=60)
    acceptableDiff = tconf.MinSepBetweenNodeUpgrades + 1
    for i in nodeIds:
        schedule[i] = datetime.isoformat(startAt)
        startAt = startAt + timedelta(seconds=acceptableDiff - 3)

    return dict(name='upgrade-14', version=bumpedVersion(), action=START,
                schedule=schedule, sha256='ffd1224', timeout=10)
项目:TripletEmbedding    作者:hrantzsch    | 项目源码 | 文件源码
def __init__(self, args, optimizer, name, extra_msg=''):
        self.iteration = 0
        self.sum_loss = 0
        self.sum_acc = 0
        self.sum_mean_diff = 0
        self.sum_max_diff = 0
        self.current_section = ''
        self.optimizer = optimizer

        # setup according to arguments
        self.name = name if name is not '' else 'signdist'
        self.out_file = "{}_{}".format(date.isoformat(date.today()), self.name)
        self.log_file = "{}.log".format(self.out_file)
        # write config to head of the log file
        self.write_config(args, extra_msg)
项目:TripletEmbedding    作者:hrantzsch    | 项目源码 | 文件源码
def write_config(self, args, extra_msg):
        with open(self.log_file, 'a+') as f:
            self._comment("=" * 40, f)
            self._comment("{} initiated at {}".format(
                self.name, datetime.isoformat(datetime.now())
            ), f)
            self._comment("-" * 40, f)  # arguments passed
            self._comment("Data: " + args.data, f)
            self._comment("Batchsize: {}".format(args.batchsize), f)
            self._comment("Test ratio: {}".format(args.test), f)
            self._comment("Hard triplet ratio: {}".format(args.skilled), f)
            dev = "CPU" if args.gpu < 0 else "GPU ".format(args.gpu)
            self._comment("Device: " + dev, f)
            if args.initmodel:
                self._comment("Init model: " + args.initmodel, f)
            if args.resume:
                self._comment("Resume state: " + args.resume, f)
            self._comment("-" * 40, f)  # parameters set in script
            self._comment("Optimizer: " + self.optimizer.__class__.__name__, f)
            self._comment("Initial LR: {}".format(self.optimizer.lr), f)
            self._comment("LR interval: {}".format(args.lrinterval), f)
            self._comment("Weight decay: {}".format(args.weight_decay), f)
            self._comment("Epoch: {}".format(self.optimizer.epoch), f)
            if extra_msg:
                self._comment(extra_msg, f)
            self._comment("-" * 40, f)  # complete call
            self._comment("{}".format(sys.argv), f)
            self._comment("=" * 40, f)
项目:decocare    作者:openaps    | 项目源码 | 文件源码
def big_days(x=0):
  """
    # page 17, RECORD 11
    >>> parse_date( big_days(0) ).isoformat( )
    '2012-11-20T21:53:41'

    # page 17, ~ RECORD 12
    >>> parse_date( big_days(1) ).isoformat( )
    '2012-11-20T22:07:38'

    >>> parse_date( big_days(2) ).isoformat( )
    '2012-11-20T21:53:41'

    >>> parse_date( big_days(3) ).isoformat( )
    '2012-11-20T22:07:38'

    # page 16, RECORD ~15
    >>> parse_date( big_days(4) ).isoformat( )
    '2012-11-25T16:41:34'

    >>> parse_date( big_days(5) ).isoformat( )
    '2012-11-25T13:54:32'

    # page 15
    >>> parse_date( big_days(6) ).isoformat( )
    '2012-11-29T20:25:37'

    # page 0
    >>> parse_date( big_days(7) ).isoformat( )
    '2012-12-20T14:59:02'

    >>> parse_date( big_days(8) ).isoformat( )
    '2012-12-20T15:28:25'

  """
  return _bad_days[x]
项目:stream2segment    作者:rizac    | 项目源码 | 文件源码
def strptime(string, formats=None):
    """
        Converts a date in string format into a datetime python object. The inverse can be obtained
        by calling datetime.isoformat() (which returns 'T' as date time separator, and optionally
        microseconds if they are not zero). This function is an easy version of
        `dateutil.parser.parse` for parsing iso-like datetime format (e.g. fdnsws standard)
        without the need of a module import
        :param: string: if a datetime object, returns it. If date object, converts to datetime
        and returns it. Otherwise must be a string representing a datetime
        :type: string: a string, a date or a datetime object (in that case just returns it)
        :param formats: if list or iterable, it holds the strings denoting the formats to be used
        to convert string (in the order they are declared). If None (the default), the datetime
        format will be guessed from the string length among the following (with optional 'Z', and
        with 'T' replaced by space as vaild option):
           - '%Y-%m-%dT%H:%M:%S.%fZ'
           - '%Y-%m-%dT%H:%M:%SZ'
           - '%Y-%m-%dZ'
        :raise: ValueError if the string cannot be parsed
        :type: on_err_return_none: object or Exception
        :return: a datetime object
        :Example:
strptime("2016-06-01T09:04:00.5600Z")
        strptime("2016-06-01T09:04:00.5600")
        strptime("2016-06-01 09:04:00.5600Z")
        strptime("2016-06-01T09:04:00Z")
        strptime("2016-06-01T09:04:00")
        strptime("2016-06-01 09:04:00Z")
        strptime("2016-06-01")
    ```
"""
if isinstance(string, datetime):
    return string

try:
    string = string.strip()

    if formats is None:
        has_z = string[-1] == 'Z'
        has_t = 'T' in string
        if has_t or has_z or ' ' in string:
            t_str, z_str = 'T' if has_t else ' ', 'Z' if has_z else ''
            formats = ['%Y-%m-%d{}%H:%M:%S.%f{}'.format(t_str, z_str),
                       '%Y-%m-%d{}%H:%M:%S{}'.format(t_str, z_str)]
        else:
            formats = ['%Y-%m-%d']

    for dtformat in formats:
        try:
            return datetime.strptime(string, dtformat)
        except ValueError:  # as exce:
            pass
    raise ValueError("invalid date time '%s'" % str(string))
except ValueError:
    raise
except Exception as exc:
    raise ValueError(str(exc))

```

项目:decocare    作者:openaps    | 项目源码 | 文件源码
def _test_decode_bolus( ):
  """
  ## correct
  >>> parse_date( bytearray( _bewest_dates['page-19'][6] ) ).isoformat( )
  '2012-11-12T00:55:42'

  ## correct
  >>> parse_date( bytearray( _bewest_dates['page-19'][0] ) ).isoformat( )
  '2012-11-12T00:55:42'

  ## this is wrong
  >>> parse_date( bytearray( _bewest_dates['page-19'][1] ) ).isoformat( )
  '2012-04-10T12:12:00'

  ## day,month is wrong, time H:M:S is correct
  # expected:
  >>> parse_date( bytearray( _bewest_dates['page-19'][2] ) ).isoformat( )
  '2012-02-08T03:11:12'

  ## correct
  >>> parse_date( bytearray( _bewest_dates['page-19'][3] ) ).isoformat( )
  '2012-11-12T08:03:11'

  #### not a valid date
  # >>> parse_date( bytearray( _bewest_dates['page-19'][4] ) ).isoformat( )

  ## correct
  >>> parse_date( bytearray( _bewest_dates['page-19'][5] ) ).isoformat( )
  '2012-11-12T08:03:13'


  """
  """

  0x5b 0x7e # bolus wizard,
  0xaa 0xf7 0x00 0x0c 0x0c # page-19[0]

  0x0f 0x50 0x0d 0x2d 0x6a 0x00 0x0b 0x00
  0x00 0x07 0x00 0x0b 0x7d 0x5c 0x08 0x58
  0x97 0x04 0x30 0x05 0x14 0x34 0xc8
  0x91 0xf8      # 0x91, 0xf8 = month=11, minute=56, seconds=17!
  0x00           # general parsing fails here
  0x00
  0xaa 0xf7 0x40 0x0c 0x0c # expected  - page-19[6]

  0x0a 0x0c
  0x8b 0xc3 0x28 0x0c 0x8c # page-19[3]


  0x5b 0x0c

  0x8d 0xc3 0x08 0x0c 0x0c # page-19[5]
  0x00 0x51 0x0d 0x2d 0x6a 0x1f 0x00 0x00 0x00 0x00 0x00
  """

### csv deconstructed
项目:decocare    作者:openaps    | 项目源码 | 文件源码
def decode_wizard(data):
  """
  BYTE
  01:
  02:
  03:
  04:
  05:
  06:
  07:
  08:
  09:
  10:
  12:
  13:
  14:
  15:
  16:
  17:
  18:
  19:
  20:
  21:
  22:
  """
  head = data[:2]
  date = data[2:7]
  datetime = parse_date(date)
  body = data[7:]
  bg = lib.BangInt([ body[1] & 0x0f, head[1] ])
  carb_input = int(body[0])
  carb_ratio = int(body[2])
  bg_target_low = int(body[5])
  bg_target_high = int(body[3])
  sensitivity = int(body[4])

  print "BOLUS WIZARD", datetime.isoformat( )
  wizard = { 'bg_input': bg, 'carb_input': carb_input,
             'carb_ratio': carb_ratio,
             'insulin_sensitivity': sensitivity,
             'bg_target_low': bg_target_low,
             'bg_target_high': bg_target_high,
  }
  return wizard