Python datetime.datetime 模块,utcfromtimestamp() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用datetime.datetime.utcfromtimestamp()

项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def get(self):
        self.response.headers['Content-Type'] = 'application/json; charset=utf-8'
        self.response.headers['Access-Control-Allow-Origin'] = '*'
        try:
            data = json.loads(memcache.get(MC_OSCARS_TOP10))
        except:
            self.response.write(json.dumps({'entities':[]}))
            return
        entities = data['entities']
        if not entities:
            self.response.write(json.dumps({'entities':[]}))
            return
        self.response.write(json.dumps({
            'entities': zip(*entities)[0],
            "time": datetime.utcfromtimestamp(data['timestamp']).isoformat()+'.000Z'
            }))
项目:kaira    作者:mulonemartin    | 项目源码 | 文件源码
def __init__(self, name, value=None, path='/',
                 expires=None, max_age=None,
                 domain=None, secure=None, httponly=None,
                 options=None):
        self.name = name
        self.value = value
        self.path = path
        if max_age is None:
            self.expires = expires
        else:
            self.expires = datetime.utcfromtimestamp(time() + max_age)
        if domain is None:
            self.domain = options['HTTP_COOKIE_DOMAIN']
        else:
            self.domain = domain
        if secure is None:
            self.secure = options['HTTP_COOKIE_SECURE']
        else:
            self.secure = secure
        if httponly is None:
            self.httponly = options['HTTP_COOKIE_HTTPONLY']
        else:
            self.httponly = httponly
项目:PokeAlarm    作者:PokeAlarm    | 项目源码 | 文件源码
def pokestop(data):
        log.debug("Converting to pokestop: \n {}".format(data))
        if data.get('lure_expiration') is None:
            log.debug("Un-lured pokestop... ignoring.")
            return None
        stop = {
            'type': "pokestop",
            'id': data['pokestop_id'],
            'expire_time':  datetime.utcfromtimestamp(data['lure_expiration']),
            'lat': float(data['latitude']),
            'lng': float(data['longitude']),
            'lat_5': "{:.5f}".format(float(data['latitude'])),
            'lng_5': "{:.5f}".format(float(data['longitude']))
        }
        stop['gmaps'] = get_gmaps_link(stop['lat'], stop['lng'])
        stop['applemaps'] = get_applemaps_link(stop['lat'], stop['lng'])
        return stop
项目:kafka-spark-influx-csv-analysis    作者:bwsw    | 项目源码 | 文件源码
def __get_result_set_from_mock(mock, query):
        result_wrapper = Mock()
        influx_points = []

        result = re.search(r"(\d{19}).*?(\d{19})", query)
        start, end = result.groups()

        # extract range
        points = list(filter(lambda point: point["time"] > int(start) and point["time"] < int(end),
                             mock.points))

        country_result = re.search(r"\"country\"=\'(\w+)\'", query)
        if country_result:
            country = country_result.groups()[0]
            points = list(filter(lambda point: point["tags"]["country"] == country, points))

        for point in points:
            d = {**point["fields"], **point.get("tags", {})}
            d["time"] = datetime.utcfromtimestamp(point["time"] // 1000000000).strftime('%Y-%m-%dT%H:%M:%SZ')
            influx_points.append(d)

        result_wrapper.get_points.return_value = influx_points
        return result_wrapper
项目:takeout-inspector    作者:cdubz    | 项目源码 | 文件源码
def _get_message_date(self, message):
        """Finds date and time information for `message` and converts it to ISO-8601 format and UTC timezone.
        """
        mail_date = message.get('Date', '').decode('utf-8')
        if not mail_date:
            """The get_from() result always (so far as I have seen!) has the date string in the last 30 characters"""
            mail_date = message.get_from().strip()[-30:]

        datetime_tuple = email.utils.parsedate_tz(mail_date)
        if datetime_tuple:
            unix_time = email.utils.mktime_tz(datetime_tuple)
            mail_date_iso8601 = datetime.utcfromtimestamp(unix_time).isoformat(' ')
        else:
            mail_date_iso8601 = ''

        return mail_date_iso8601
项目:guides-cms    作者:pluralsight    | 项目源码 | 文件源码
def pprint(rate_limit):
    """
    Pretty print rate limit dictionary to be easily parsable and readable
    across multiple lines
    """

    # Ignoring the 'rate' key b/c github API claims this will be removed in
    # next major version:
    # https://developer.github.com/v3/rate_limit/#deprecation-notice

    def print_(name, limits):
        date_ = datetime.utcfromtimestamp(limits[name]['reset'])
        print '%8s remaining: %4s limit: %4s reset: %s' % (
                name,
                limits[name]['remaining'],
                limits[name]['limit'],
                date_.strftime('%d-%m-%Y %H:%M:%S'))

    print_('core', rate_limit['resources'])
    print_('search', rate_limit['resources'])
项目:BitcoinExchangeFH    作者:Aurora-Team    | 项目源码 | 文件源码
def parse_trade(cls, instmt, raw):
        """
        :param instmt: Instrument
        :param raw: Raw data in JSON
        :return:
        """
        trade = Trade()
        trade_id = raw[0]
        timestamp = raw[1]
        trade_price = raw[2]
        trade_volume = raw[3]

        trade.date_time = datetime.utcfromtimestamp(timestamp).strftime("%Y%m%d %H:%M:%S.%f")
        trade.trade_side = Trade.Side.BUY if trade_volume > 0 else Trade.Side.SELL
        trade.trade_volume = abs(trade_volume)
        trade.trade_id = str(trade_id)
        trade.trade_price = trade_price

        return trade
项目:BitcoinExchangeFH    作者:Aurora-Team    | 项目源码 | 文件源码
def parse_trade(cls, instmt, raw):
        """
        :param instmt: Instrument
        :param raw: Raw data in JSON
        :return:
        """

        trade = Trade()
        trade_id = raw['order_id']
        trade_price = float(raw['counter']) / float(raw['base'])
        trade_volume = float(raw['base'])

        timestamp = float(raw[cls.get_trades_timestamp_field_name()]) / 1000.0
        trade.date_time = datetime.utcfromtimestamp(timestamp).strftime("%Y%m%d %H:%M:%S.%f")
        trade.trade_volume = trade_volume
        trade.trade_id = trade_id
        trade.trade_price = trade_price

        return trade
项目:BitcoinExchangeFH    作者:Aurora-Team    | 项目源码 | 文件源码
def parse_trade(cls, instmt, raw):
        """
        :param instmt: Instrument
        :param raw: Raw data in JSON
        :return:
        """
        trade = Trade()

        # Trade price
        trade.trade_price = float(str(raw[0]))

        # Trade volume
        trade.trade_volume = float(str(raw[1]))

        # Timestamp
        date_time = float(raw[2])
        trade.date_time = datetime.utcfromtimestamp(date_time).strftime("%Y%m%d %H:%M:%S.%f")

        # Trade side
        trade.trade_side = Trade.parse_side(raw[3])

        # Trade id
        trade.trade_id = trade.date_time + '-' + str(instmt.get_exch_trade_id())

        return trade
项目:sarjitsu    作者:distributed-system-analysis    | 项目源码 | 文件源码
def dump(self, *args, **kwargs):
        print("file_header (0x%04x):" % (self.sa_magic,))
        print("\tsa_ust_time", repr(self.sa_ust_time), datetime.utcfromtimestamp(self.sa_ust_time))
        print("\tsa_actflag", repr(self.sa_actflag))
        print("\tsa_nr_pid", repr(self.sa_nr_pid))
        print("\tsa_irqcpu", repr(self.sa_irqcpu))
        print("\tsa_nr_disk", repr(self.sa_nr_disk))
        print("\tsa_proc", repr(self.sa_proc))
        print("\tsa_serial", repr(self.sa_serial))
        print("\tsa_iface", repr(self.sa_iface))
        print("\tsa_magic 0x%04x" % self.sa_magic)
        print("\tsa_st_size", repr(self.sa_st_size))
        print("\tsa_day", repr(self.sa_day))
        print("\tsa_month", repr(self.sa_month))
        print("\tsa_year", repr(self.sa_year))
        print("\tsa_sizeof_long", repr(self.sa_sizeof_long))
        print("\tsa_sysname", repr(self.sa_sysname))
        print("\tsa_nodename", repr(self.sa_nodename))
        print("\tsa_release", repr(self.sa_release))
项目:electron-crash-reporter    作者:lipis    | 项目源码 | 文件源码
def check_for_update():
  if os.path.exists(FILE_UPDATE):
    mtime = os.path.getmtime(FILE_UPDATE)
    last = datetime.utcfromtimestamp(mtime).strftime('%Y-%m-%d')
    today = datetime.utcnow().strftime('%Y-%m-%d')
    if last == today:
      return
  try:
    with open(FILE_UPDATE, 'a'):
      os.utime(FILE_UPDATE, None)
    request = urllib2.Request(
      CORE_VERSION_URL,
      urllib.urlencode({'version': main.__version__}),
    )
    response = urllib2.urlopen(request)
    with open(FILE_UPDATE, 'w') as update_json:
      update_json.write(response.read())
  except (urllib2.HTTPError, urllib2.URLError):
    pass
项目:audio-feeder    作者:pganssle    | 项目源码 | 文件源码
def make_new_entry(self, rel_path, id_handler):
        """
        Generates a new entry for the specified path.

        Note: This will mutate the id_handler!
        """
        # Try to match to an existing book.
        e_id = id_handler.new_id()

        abs_path = os.path.join(read_from_config('media_loc').path, rel_path)
        lmtime = os.path.getmtime(abs_path)
        added_dt = datetime.utcfromtimestamp(lmtime)
        last_modified = added_dt.replace(tzinfo=timezone.utc)

        entry_obj = oh.Entry(id=e_id, path=rel_path,
            date_added=datetime.now(timezone.utc),
            last_modified=last_modified,
            type='Book', table=self.BOOK_TABLE_NAME, data_id=None,
            hashseed=_rand.randint(0, 2**32))

        return entry_obj
项目:FCParser    作者:josecamachop    | 项目源码 | 文件源码
def load(self, raw_value):
        """Converts an input raw value into a timestamp.
        Returns: Datetime object, if the conversion succeeds;
                 None, if the conversion fails.

        raw_value -- The raw value, in string format (eg. '2014-12-20 15:01:02'),
                     or in milliseconds since Epoch  (eg. 1293581619000)
        """
        if isinstance(raw_value, str):
            try:
                timestamp = datetime.strptime(raw_value, "%Y-%m-%d %H:%M:%S")
            except:
                timestamp = None
        else:
            try:
                timestamp = datetime.utcfromtimestamp(float(raw_value)/1000)
            except:
                timestamp = None
        return timestamp
项目:pyjo    作者:marcopaz    | 项目源码 | 文件源码
def test_datetimefield(self):

        class A(Model):
            date = DatetimeField()

        time = 1478390400
        dt = datetime.utcfromtimestamp(time)

        a = A(date=dt)
        self.assertEqual(a.date, dt)

        with self.assertRaises(FieldTypeError):
            a.date = 'hello'

        pj = a.to_pyjson()
        self.assertEqual(pj['date'], time)
        aa = A.from_json(a.to_json())
        self.assertEqual(aa.date, dt)
项目:meet-notes    作者:lipis    | 项目源码 | 文件源码
def check_for_update():
  if os.path.exists(FILE_UPDATE):
    mtime = os.path.getmtime(FILE_UPDATE)
    last = datetime.utcfromtimestamp(mtime).strftime('%Y-%m-%d')
    today = datetime.utcnow().strftime('%Y-%m-%d')
    if last == today:
      return
  try:
    with open(FILE_UPDATE, 'a'):
      os.utime(FILE_UPDATE, None)
    request = urllib2.Request(
      CORE_VERSION_URL,
      urllib.urlencode({'version': main.__version__}),
    )
    response = urllib2.urlopen(request)
    with open(FILE_UPDATE, 'w') as update_json:
      update_json.write(response.read())
  except (urllib2.HTTPError, urllib2.URLError):
    pass
项目:catalyst    作者:enigmampc    | 项目源码 | 文件源码
def get_seconds_from_date(date):
    """
    The number of seconds from the epoch.

    Parameters
    ----------
    date: datetime

    Returns
    -------
    int

    """
    epoch = datetime.utcfromtimestamp(0)
    epoch = epoch.replace(tzinfo=pytz.UTC)

    return int((date - epoch).total_seconds())
项目:trade-analysis    作者:qplum    | 项目源码 | 文件源码
def get_minutebar(filesource):
        output = []
        with open(filesource, 'rb') as file_:
            a = C_PERIODIC_BAR()
            while (file_.readinto(a) == sizeof(C_PERIODIC_BAR)):
                open_ = Quote(a.open.bid_price, a.open.bid_size, a.open.ask_price, a.open.ask_size)
                close_ = Quote(a.close.bid_price, a.close.bid_size, a.close.ask_price, a.close.ask_size)
                ts = datetime.utcfromtimestamp(a.ts.time.tv_sec + a.ts.time.tv_usec / 1000000)
                ts_with_tz = datetime(year=ts.year,
                                      month=ts.month,
                                      day=ts.day,
                                      hour=ts.hour,
                                      minute=ts.minute,
                                      second=ts.second,
                                      tzinfo=pytz.UTC)
                elem = PeriodicBar(open_, close_, a.high, a.low, a.volume, ts_with_tz)
                output.append(elem)

        return output

    ## @brief Make quote objects from futures data
    #  date,product,specific_ticker,open,high,low,close,contract_volume,contract_oi,total_volume,total_oi
    #
项目:vote4code    作者:welovecoding    | 项目源码 | 文件源码
def check_for_update():
  if os.path.exists(FILE_UPDATE):
    mtime = os.path.getmtime(FILE_UPDATE)
    last = datetime.utcfromtimestamp(mtime).strftime('%Y-%m-%d')
    today = datetime.utcnow().strftime('%Y-%m-%d')
    if last == today:
      return
  try:
    with open(FILE_UPDATE, 'a'):
      os.utime(FILE_UPDATE, None)
    request = urllib2.Request(
      CORE_VERSION_URL,
      urllib.urlencode({'version': __version__}),
    )
    response = urllib2.urlopen(request)
    with open(FILE_UPDATE, 'w') as update_json:
      update_json.write(response.read())
  except (urllib2.HTTPError, urllib2.URLError):
    pass
项目:twitterscraper    作者:taspinar    | 项目源码 | 文件源码
def from_soup(cls, tweet):
        return cls(
            user=tweet.find('span', 'username').text[1:],
            fullname=tweet.find('strong', 'fullname').text,
            id=tweet['data-item-id'],
            url = tweet.find('div', 'tweet')['data-permalink-path'],
            timestamp=datetime.utcfromtimestamp(
                int(tweet.find('span', '_timestamp')['data-time'])),
            text=tweet.find('p', 'tweet-text').text or "",
            replies = tweet.find(
                'span', 'ProfileTweet-action--reply u-hiddenVisually').find(
                    'span', 'ProfileTweet-actionCount')['data-tweet-stat-count'] or '0',
            retweets = tweet.find(
                'span', 'ProfileTweet-action--retweet u-hiddenVisually').find(
                    'span', 'ProfileTweet-actionCount')['data-tweet-stat-count'] or '0',
            likes = tweet.find(
                'span', 'ProfileTweet-action--favorite u-hiddenVisually').find(
                    'span', 'ProfileTweet-actionCount')['data-tweet-stat-count'] or '0',
        )
项目:two1-python    作者:21dotco    | 项目源码 | 文件源码
def format_date(unix_timestamp):
    """ Return a standardized date format for use in the two1 library.

    This function produces a localized datetime string that includes the UTC timezone offset. This offset is
    computed as the difference between the local version of the timestamp (python's datatime.fromtimestamp)
    and the utc representation of the input timestamp.

    Args:
        unix_timestamp (float): a floating point unix timestamp

    Returns:
        string: A string formatted with "%Y-%m-%d %H:%M:%S %Z"
    """

    local_datetime = datetime.fromtimestamp(unix_timestamp)
    utz_offset = local_datetime - datetime.utcfromtimestamp(unix_timestamp)
    local_date = local_datetime.replace(
        tzinfo=timezone(utz_offset)
    ).strftime("%Y-%m-%d %H:%M:%S %Z")

    return local_date
项目:oracle-imagecopy-backup    作者:unibet    | 项目源码 | 文件源码
def listsnapshots(self, sortbycreation=False, sortreverse=False):
        output = self._srv.invoke("volume-size", "volume", self._volname)
        self._check_netapp_error(output, "Failed to get volume size information")
        volsize = self._volsize_to_num(output.child_get_string("volume-size"))
        pct_limit = round(2147483648*100/(volsize/self._blocksize))
        output = self._srv.invoke("snapshot-list-info", "volume", self._volname)
        self._check_netapp_error(output, "Failed to list snapshots")
        snapshotlist = output.child_get("snapshots")
        snapshots = []
        if (snapshotlist is not None and snapshotlist):
            for ss in snapshotlist.children_get():
                snapshots.append( {'id': ss.child_get_string("name"),
                    'creation': datetime.utcfromtimestamp(float(ss.child_get_int("access-time"))),
                    'numclones':  1 if ss.child_get_string("busy") == "true" else 0,
                    'space_total': ss.child_get_int("cumulative-total")*self._blocksize if ss.child_get_int("cumulative-percentage-of-total-blocks") < pct_limit else round(volsize*ss.child_get_int("cumulative-percentage-of-total-blocks")/100),
                    'space_unique': ss.child_get_int("total")*self._blocksize if ss.child_get_int("percentage-of-total-blocks") < pct_limit else round(volsize*ss.child_get_int("percentage-of-total-blocks")/100) 
                } )
        if not sortbycreation:
            return snapshots
        else:
            return sorted(snapshots, key=operator.itemgetter('creation'), reverse=sortreverse)
项目:smartbackup    作者:baszoetekouw    | 项目源码 | 文件源码
def get_stamp(stampdir):
    stampfile = os.path.join(stampdir,'stamp')

    # read first line of stamp file
    f = open(stampfile,mode='rt')
    line = f.readline()
    f.close()

    # extract timestamp (first field of line) and compare to others
    stamp = int( line.split().pop(0) )
    if not stamp>0:
        raise BackupError("Can't read `{}'".format(stampfile))

    dt = datetime.utcfromtimestamp(stamp)
    return dt

# find a list of backups for a particular machine
项目:WikiLoves    作者:wmes    | 项目源码 | 文件源码
def unix_time(zulu_time_string):
    dt = datetime.strptime(zulu_time_string, "%Y-%m-%dT%H:%M:%SZ")
    epoch = datetime.utcfromtimestamp(0)
    delta = dt - epoch
    return int(delta.total_seconds() * 1000)
项目:WikiLoves    作者:wmes    | 项目源码 | 文件源码
def unix_time(zulu_time_string):
    dt = datetime.strptime(zulu_time_string, "%Y-%m-%dT%H:%M:%SZ")
    epoch = datetime.utcfromtimestamp(0)
    delta = dt - epoch
    return int(delta.total_seconds() * 1000)
项目:WikiLoves    作者:wmes    | 项目源码 | 文件源码
def unix_time(zulu_time_string):
    dt = datetime.strptime(zulu_time_string, "%Y-%m-%dT%H:%M:%SZ")
    epoch = datetime.utcfromtimestamp(0)
    delta = dt - epoch
    return int(delta.total_seconds() * 1000)
项目:tornado-ssdb-project    作者:ego008    | 项目源码 | 文件源码
def timestamp_to_datetime(timestamp):  # UTC datetime
    return datetime.utcfromtimestamp(timestamp)
项目:boss    作者:kabirbaidhya    | 项目源码 | 文件源码
def localize_utc_timestamp(utc_datetime):
    ''' Convert timestamp in UTC to local timezone. '''
    now = time.time()
    offset = datetime.fromtimestamp(now) - datetime.utcfromtimestamp(now)
    return utc_datetime + offset
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def _create_list_zerofill(self, data, start,stop):
        next = self._to_timestamp(start)/10*10+10
        stop = self._to_timestamp(stop)/10*10-180
        now = 0
        prev = 0
        prev_t = next
        for i in data:
            # now = self._to_timestamp(i.timestamp)
            # while now > next:
            #     w = 1.*(now - next)/(now - prev_t)
            #     yield {
            #         "time": datetime.utcfromtimestamp(next).isoformat()+'.000Z',
            #         "count": int(prev * w + i.frequency * (1-w)),
            #         }
            #     next += 10
            yield {
                "time": i.timestamp.isoformat()+'.000Z',
                "count": int(i.frequency),
            }
            prev = i.frequency
            prev_t = now
            next = now+10
        # while next < stop:
        #     yield {
        #         "time": datetime.utcfromtimestamp(next).isoformat()+'.000Z',
        #         "count": 0,
        #         }
        #     next += 10
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_can_insert_udts_with_all_datatypes(self):
        """
        Test for inserting all column types into a UserType

        test_can_insert_udts_with_all_datatypes tests that each cqlengine column type can be inserted into a UserType.
        It first creates a UserType that has each cqlengine column type, and a corresponding table/Model. It then creates
        a UserType instance where all the fields have corresponding data, and inserts the UserType as an instance of the Model.
        Finally, it verifies that each column read from the UserType from Cassandra is the same as the input parameters.

        @since 2.5.0
        @jira_ticket PYTHON-251
        @expected_result The UserType is inserted with each column type, and the resulting read yields proper data for each column.

        @test_category data_types:udt
        """
        sync_table(AllDatatypesModel)
        self.addCleanup(drop_table, AllDatatypesModel)

        input = AllDatatypes(a='ascii', b=2 ** 63 - 1, c=bytearray(b'hello world'), d=True,
                             e=datetime.utcfromtimestamp(872835240), f=Decimal('12.3E+7'), g=2.39,
                             h=3.4028234663852886e+38, i='123.123.123.123', j=2147483647, k='text',
                             l=UUID('FE2B4360-28C6-11E2-81C1-0800200C9A66'),
                             m=UUID('067e6162-3b6f-4ae2-a171-2470b63dff00'), n=int(str(2147483647) + '000'))
        AllDatatypesModel.create(id=0, data=input)

        self.assertEqual(1, AllDatatypesModel.objects.count())
        output = AllDatatypesModel.objects.first().data

        for i in range(ord('a'), ord('a') + 14):
            self.assertEqual(input[chr(i)], output[chr(i)])
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_datetime_timestamp(self):
        dt_value = 1454520554
        self.DatetimeTest.objects.create(test_id=5, created_at=dt_value)
        dt2 = self.DatetimeTest.objects(test_id=5).first()
        self.assertEqual(dt2.created_at, datetime.utcfromtimestamp(dt_value))
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_conversion_specific_date(self):
        dt = datetime(1981, 7, 11, microsecond=555000)

        uuid = util.uuid_from_time(dt)

        from uuid import UUID
        assert isinstance(uuid, UUID)

        ts = (uuid.time - 0x01b21dd213814000) / 1e7 # back to a timestamp
        new_dt = datetime.utcfromtimestamp(ts)

        # checks that we created a UUID1 with the proper timestamp
        assert new_dt == dt
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def to_python(self, value):
        if value is None:
            return
        if isinstance(value, datetime):
            if DateTime.truncate_microseconds:
                us = value.microsecond
                truncated_us = us // 1000 * 1000
                return value - timedelta(microseconds=us - truncated_us)
            else:
                return value
        elif isinstance(value, date):
            return datetime(*(value.timetuple()[:6]))

        return datetime.utcfromtimestamp(value)
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def test_utc_timestamping():
    assert timestamp(
        datetime(2017, 7, 14, 2, 40).replace(tzinfo=utc)
    ) == 1500000000

    for d in (
        datetime.now(),
        datetime.utcnow(),
        datetime(1999, 12, 31, 23, 59, 59),
        datetime(2000, 1, 1, 0, 0, 0)
    ):
        assert datetime.utcfromtimestamp(
            timestamp(d)) - d < timedelta(microseconds=10)
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def _x_format(self):
        """Return the value formatter for this graph"""
        def datetime_to_str(x):
            dt = datetime.utcfromtimestamp(x)
            return self.x_value_formatter(dt)
        return datetime_to_str
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _opener(self, filename):
        return lambda: (
            open(filename, 'rb'),
            datetime.utcfromtimestamp(os.path.getmtime(filename)),
            int(os.path.getsize(filename))
        )
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def timestamp_to_datetime(self, ts):
        """Used to convert the timestamp from `get_timestamp` into a
        datetime object.
        """
        return datetime.utcfromtimestamp(ts + EPOCH)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def get_issue_date(self, header):
        rv = header.get('iat')
        if isinstance(rv, number_types):
            return datetime.utcfromtimestamp(int(rv))
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def _opener(self, filename):
        return lambda: (
            open(filename, 'rb'),
            datetime.utcfromtimestamp(os.path.getmtime(filename)),
            int(os.path.getsize(filename))
        )
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def timestamp_to_datetime(self, ts):
        """Used to convert the timestamp from `get_timestamp` into a
        datetime object.
        """
        return datetime.utcfromtimestamp(ts + EPOCH)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def get_issue_date(self, header):
        rv = header.get('iat')
        if isinstance(rv, number_types):
            return datetime.utcfromtimestamp(int(rv))
项目:n1mm_view    作者:n1kdo    | 项目源码 | 文件源码
def get_last_qso(cursor) :
    cursor.execute('SELECT timestamp, callsign, exchange, section, operator.name, band_id \n'
                   'FROM qso_log JOIN operator WHERE operator.id = operator_id \n'
                   'ORDER BY timestamp DESC LIMIT 1')
    last_qso_time = int(time.time()) - 60
    message = ''
    for row in cursor:
        last_qso_time = row[0]
    message = 'Last QSO: %s %s %s on %s by %s at %s' % (
        row[1], row[2], row[3], constants.Bands.BANDS_TITLE[row[5]], row[4],
        datetime.utcfromtimestamp(row[0]).strftime('%H:%M:%S'))
    logging.debug(message)
    return last_qso_time, message
项目:n1mm_view    作者:n1kdo    | 项目源码 | 文件源码
def get_qsos_per_hour_per_band(cursor):
    qsos_per_hour = []
    qsos_by_band = [0] * constants.Bands.count()
    slice_minutes = 15
    slices_per_hour = 60 / slice_minutes
    window_seconds = slice_minutes * 60

    logging.debug('Load QSOs per Hour by Band')
    cursor.execute('SELECT timestamp / %d * %d AS ts, band_id, COUNT(*) AS qso_count \n'
                   'FROM qso_log GROUP BY ts, band_id;' % (window_seconds, window_seconds))
    for row in cursor:
        if len(qsos_per_hour) == 0:
            qsos_per_hour.append([0] * constants.Bands.count())
            qsos_per_hour[-1][0] = row[0]
        while qsos_per_hour[-1][0] != row[0]:
            ts = qsos_per_hour[-1][0] + window_seconds
            qsos_per_hour.append([0] * constants.Bands.count())
            qsos_per_hour[-1][0] = ts
        qsos_per_hour[-1][row[1]] = row[2] * slices_per_hour
        qsos_by_band[row[1]] += row[2]

    for rec in qsos_per_hour:  # FIXME
        rec[0] = datetime.utcfromtimestamp(rec[0])
        t = rec[0].strftime('%H:%M:%S')

    return qsos_per_hour, qsos_by_band
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def memorized_datetime(seconds):
    '''Create only one instance of each distinct datetime'''
    try:
        return _datetime_cache[seconds]
    except KeyError:
        # NB. We can't just do datetime.utcfromtimestamp(seconds) as this
        # fails with negative values under Windows (Bug #90096)
        dt = _epoch + timedelta(seconds=seconds)
        _datetime_cache[seconds] = dt
        return dt
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def wait_rate_limit_reset(now):
    reset = (
        datetime.utcfromtimestamp(GITHUB.x_ratelimit_reset)
        .replace(tzinfo=timezone.utc)
    )
    delta = reset - now
    wait = delta.total_seconds() + .5
    if wait < 1 or 3500 < wait:
        # Our data is outdated. Just go on.
        return 0

    logger.warning("Waiting rate limit reset in %s seconds.", wait)
    time.sleep(wait)
    GITHUB._instance.x_ratelimit_remaining = -1
    return wait
项目:wallstreet    作者:mcdallas    | 项目源码 | 文件源码
def _yahoo(self, quote, exchange=None):
        """ Collects data from Yahoo Finance API """

        query = quote + "." + exchange.upper() if exchange else quote

        if not hasattr(self, '_session_y'):
            self._session_y = requests.Session()
        r = self._session_y.get(__class__._Y_API + query)

        if r.status_code == 404:
            raise LookupError('Ticker symbol not found.')
        else:
            r.raise_for_status()

        jayson = r.json()['optionChain']['result'][0]['quote']

        self.ticker = jayson['symbol']
        self._price = jayson['regularMarketPrice']
        self.currency = jayson['currency']
        self.exchange = jayson['exchange']
        self.change = jayson['regularMarketChange']
        self.cp = jayson['regularMarketChangePercent']
        self._last_trade = datetime.utcfromtimestamp(jayson['regularMarketTime'])
        self.name = jayson['longName']
        self.dy = jayson.get('trailingAnnualDividendYield', 0)
项目:wallstreet    作者:mcdallas    | 项目源码 | 文件源码
def _yahoo(self, quote, d, m, y):
        """ Collects data from Yahoo Finance API """

        epoch = int(round(mktime(date(y, m, d).timetuple())/86400, 0)*86400)

        if not hasattr(self, '_session_y'):
            self._session_y = requests.Session()
        r = self._session_y.get(__class__._Y_API + quote + '?date=' + str(epoch))

        if r.status_code == 404:
            raise LookupError('Ticker symbol not found.')
        else:
            r.raise_for_status()

        json = r.json()

        try:
            self.data = json['optionChain']['result'][0]['options'][0]
        except IndexError:
            raise LookupError('No options listed for this stock.')

        self._exp = [datetime.utcfromtimestamp(i).date() for i in json['optionChain']['result'][0]['expirationDates']]
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def swift_get_container(request, container_name, with_data=True):
    if with_data:
        headers, data = swift_api(request).get_object(container_name, "")
    else:
        data = None
        headers = swift_api(request).head_container(container_name)
    timestamp = None
    is_public = False
    public_url = None
    try:
        is_public = GLOBAL_READ_ACL in headers.get('x-container-read', '')
        if is_public:
            swift_endpoint = base.url_for(request,
                                          'object-store',
                                          endpoint_type='publicURL')
            parameters = urlparse.quote(container_name.encode('utf8'))
            public_url = swift_endpoint + '/' + parameters
        ts_float = float(headers.get('x-timestamp'))
        timestamp = datetime.utcfromtimestamp(ts_float).isoformat()
    except Exception:
        pass
    container_info = {
        'name': container_name,
        'container_object_count': headers.get('x-container-object-count'),
        'container_bytes_used': headers.get('x-container-bytes-used'),
        'timestamp': timestamp,
        'data': data,
        'is_public': is_public,
        'public_url': public_url,
    }
    return Container(container_info)
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def swift_get_object(request, container_name, object_name, with_data=True,
                     resp_chunk_size=CHUNK_SIZE):
    if with_data:
        headers, data = swift_api(request).get_object(
            container_name, object_name, resp_chunk_size=resp_chunk_size)
    else:
        data = None
        headers = swift_api(request).head_object(container_name,
                                                 object_name)
    orig_name = headers.get("x-object-meta-orig-filename")
    timestamp = None
    try:
        ts_float = float(headers.get('x-timestamp'))
        timestamp = datetime.utcfromtimestamp(ts_float).isoformat()
    except Exception:
        pass
    obj_info = {
        'name': object_name,
        'bytes': headers.get('content-length'),
        'content_type': headers.get('content-type'),
        'etag': headers.get('etag'),
        'timestamp': timestamp,
    }
    return StorageObject(obj_info,
                         container_name,
                         orig_name=orig_name,
                         data=data)
项目:cqlmapper    作者:reddit    | 项目源码 | 文件源码
def test_datetime_timestamp(self):
        dt_value = 1454520554
        self.DatetimeTest.objects.create(
            self.conn,
            test_id=5,
            created_at=dt_value,
        )
        dt2 = self.DatetimeTest.objects(test_id=5).first(self.conn)
        assert dt2.created_at == datetime.utcfromtimestamp(dt_value)
项目:cqlmapper    作者:reddit    | 项目源码 | 文件源码
def test_conversion_specific_date(self):
        dt = datetime(1981, 7, 11, microsecond=555000)

        uuid = util.uuid_from_time(dt)

        from uuid import UUID
        assert isinstance(uuid, UUID)

        ts = (uuid.time - 0x01b21dd213814000) / 1e7 # back to a timestamp
        new_dt = datetime.utcfromtimestamp(ts)

        # checks that we created a UUID1 with the proper timestamp
        assert new_dt == dt