The following 50 code examples, extracted from open-source Python projects, illustrate how to use datetime.datetime.utcfromtimestamp().
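Before the project examples, here is a minimal sketch of the call itself (the timestamp value is arbitrary, chosen only for illustration): datetime.utcfromtimestamp() takes a POSIX timestamp in seconds since the Unix epoch and returns a naive datetime expressed in UTC.

from datetime import datetime, timezone

ts = 1500000000  # arbitrary POSIX timestamp (seconds since the Unix epoch)
naive_utc = datetime.utcfromtimestamp(ts)  # naive datetime, wall clock in UTC
print(naive_utc.isoformat())               # '2017-07-14T02:40:00'

# Timezone-aware equivalent; since Python 3.12, utcfromtimestamp()
# is deprecated in favor of this form.
aware_utc = datetime.fromtimestamp(ts, tz=timezone.utc)

Note that the examples below import the class directly (from datetime import datetime), so the call appears as datetime.utcfromtimestamp(...) rather than datetime.datetime.utcfromtimestamp(...).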
def get(self):
    self.response.headers['Content-Type'] = 'application/json; charset=utf-8'
    self.response.headers['Access-Control-Allow-Origin'] = '*'
    try:
        data = json.loads(memcache.get(MC_OSCARS_TOP10))
    except:
        self.response.write(json.dumps({'entities': []}))
        return
    entities = data['entities']
    if not entities:
        self.response.write(json.dumps({'entities': []}))
        return
    self.response.write(json.dumps({
        'entities': zip(*entities)[0],
        "time": datetime.utcfromtimestamp(data['timestamp']).isoformat() + '.000Z'
    }))
def __init__(self, name, value=None, path='/', expires=None, max_age=None,
             domain=None, secure=None, httponly=None, options=None):
    self.name = name
    self.value = value
    self.path = path
    if max_age is None:
        self.expires = expires
    else:
        self.expires = datetime.utcfromtimestamp(time() + max_age)
    if domain is None:
        self.domain = options['HTTP_COOKIE_DOMAIN']
    else:
        self.domain = domain
    if secure is None:
        self.secure = options['HTTP_COOKIE_SECURE']
    else:
        self.secure = secure
    if httponly is None:
        self.httponly = options['HTTP_COOKIE_HTTPONLY']
    else:
        self.httponly = httponly
def pokestop(data):
    log.debug("Converting to pokestop: \n {}".format(data))
    if data.get('lure_expiration') is None:
        log.debug("Un-lured pokestop... ignoring.")
        return None
    stop = {
        'type': "pokestop",
        'id': data['pokestop_id'],
        'expire_time': datetime.utcfromtimestamp(data['lure_expiration']),
        'lat': float(data['latitude']),
        'lng': float(data['longitude']),
        'lat_5': "{:.5f}".format(float(data['latitude'])),
        'lng_5': "{:.5f}".format(float(data['longitude']))
    }
    stop['gmaps'] = get_gmaps_link(stop['lat'], stop['lng'])
    stop['applemaps'] = get_applemaps_link(stop['lat'], stop['lng'])
    return stop
def __get_result_set_from_mock(mock, query):
    result_wrapper = Mock()
    influx_points = []
    result = re.search(r"(\d{19}).*?(\d{19})", query)
    start, end = result.groups()  # extract range
    points = list(filter(lambda point: point["time"] > int(start) and point["time"] < int(end),
                         mock.points))
    country_result = re.search(r"\"country\"=\'(\w+)\'", query)
    if country_result:
        country = country_result.groups()[0]
        points = list(filter(lambda point: point["tags"]["country"] == country, points))
    for point in points:
        d = {**point["fields"], **point.get("tags", {})}
        d["time"] = datetime.utcfromtimestamp(point["time"] // 1000000000).strftime('%Y-%m-%dT%H:%M:%SZ')
        influx_points.append(d)
    result_wrapper.get_points.return_value = influx_points
    return result_wrapper
def _get_message_date(self, message):
    """Finds date and time information for `message` and converts it to
    ISO-8601 format and UTC timezone.
    """
    mail_date = message.get('Date', '').decode('utf-8')
    if not mail_date:
        # The get_from() result always (so far as I have seen!) has the
        # date string in the last 30 characters
        mail_date = message.get_from().strip()[-30:]
    datetime_tuple = email.utils.parsedate_tz(mail_date)
    if datetime_tuple:
        unix_time = email.utils.mktime_tz(datetime_tuple)
        mail_date_iso8601 = datetime.utcfromtimestamp(unix_time).isoformat(' ')
    else:
        mail_date_iso8601 = ''
    return mail_date_iso8601
def pprint(rate_limit):
    """
    Pretty print rate limit dictionary to be easily parsable and
    readable across multiple lines
    """

    # Ignoring the 'rate' key b/c github API claims this will be removed in
    # next major version:
    # https://developer.github.com/v3/rate_limit/#deprecation-notice

    def print_(name, limits):
        date_ = datetime.utcfromtimestamp(limits[name]['reset'])
        print '%8s remaining: %4s limit: %4s reset: %s' % (
            name, limits[name]['remaining'], limits[name]['limit'],
            date_.strftime('%d-%m-%Y %H:%M:%S'))

    print_('core', rate_limit['resources'])
    print_('search', rate_limit['resources'])
def parse_trade(cls, instmt, raw):
    """
    :param instmt: Instrument
    :param raw: Raw data in JSON
    :return:
    """
    trade = Trade()
    trade_id = raw[0]
    timestamp = raw[1]
    trade_price = raw[2]
    trade_volume = raw[3]
    trade.date_time = datetime.utcfromtimestamp(timestamp).strftime("%Y%m%d %H:%M:%S.%f")
    trade.trade_side = Trade.Side.BUY if trade_volume > 0 else Trade.Side.SELL
    trade.trade_volume = abs(trade_volume)
    trade.trade_id = str(trade_id)
    trade.trade_price = trade_price
    return trade
def parse_trade(cls, instmt, raw):
    """
    :param instmt: Instrument
    :param raw: Raw data in JSON
    :return:
    """
    trade = Trade()
    trade_id = raw['order_id']
    trade_price = float(raw['counter']) / float(raw['base'])
    trade_volume = float(raw['base'])
    timestamp = float(raw[cls.get_trades_timestamp_field_name()]) / 1000.0
    trade.date_time = datetime.utcfromtimestamp(timestamp).strftime("%Y%m%d %H:%M:%S.%f")
    trade.trade_volume = trade_volume
    trade.trade_id = trade_id
    trade.trade_price = trade_price
    return trade
def parse_trade(cls, instmt, raw):
    """
    :param instmt: Instrument
    :param raw: Raw data in JSON
    :return:
    """
    trade = Trade()

    # Trade price
    trade.trade_price = float(str(raw[0]))

    # Trade volume
    trade.trade_volume = float(str(raw[1]))

    # Timestamp
    date_time = float(raw[2])
    trade.date_time = datetime.utcfromtimestamp(date_time).strftime("%Y%m%d %H:%M:%S.%f")

    # Trade side
    trade.trade_side = Trade.parse_side(raw[3])

    # Trade id
    trade.trade_id = trade.date_time + '-' + str(instmt.get_exch_trade_id())
    return trade
def dump(self, *args, **kwargs):
    print("file_header (0x%04x):" % (self.sa_magic,))
    print("\tsa_ust_time", repr(self.sa_ust_time), datetime.utcfromtimestamp(self.sa_ust_time))
    print("\tsa_actflag", repr(self.sa_actflag))
    print("\tsa_nr_pid", repr(self.sa_nr_pid))
    print("\tsa_irqcpu", repr(self.sa_irqcpu))
    print("\tsa_nr_disk", repr(self.sa_nr_disk))
    print("\tsa_proc", repr(self.sa_proc))
    print("\tsa_serial", repr(self.sa_serial))
    print("\tsa_iface", repr(self.sa_iface))
    print("\tsa_magic 0x%04x" % self.sa_magic)
    print("\tsa_st_size", repr(self.sa_st_size))
    print("\tsa_day", repr(self.sa_day))
    print("\tsa_month", repr(self.sa_month))
    print("\tsa_year", repr(self.sa_year))
    print("\tsa_sizeof_long", repr(self.sa_sizeof_long))
    print("\tsa_sysname", repr(self.sa_sysname))
    print("\tsa_nodename", repr(self.sa_nodename))
    print("\tsa_release", repr(self.sa_release))
def check_for_update():
    if os.path.exists(FILE_UPDATE):
        mtime = os.path.getmtime(FILE_UPDATE)
        last = datetime.utcfromtimestamp(mtime).strftime('%Y-%m-%d')
        today = datetime.utcnow().strftime('%Y-%m-%d')
        if last == today:
            return
    try:
        with open(FILE_UPDATE, 'a'):
            os.utime(FILE_UPDATE, None)
        request = urllib2.Request(
            CORE_VERSION_URL,
            urllib.urlencode({'version': main.__version__}),
        )
        response = urllib2.urlopen(request)
        with open(FILE_UPDATE, 'w') as update_json:
            update_json.write(response.read())
    except (urllib2.HTTPError, urllib2.URLError):
        pass
def make_new_entry(self, rel_path, id_handler):
    """
    Generates a new entry for the specified path.

    Note: This will mutate the id_handler!
    """
    # Try to match to an existing book.
    e_id = id_handler.new_id()
    abs_path = os.path.join(read_from_config('media_loc').path, rel_path)
    lmtime = os.path.getmtime(abs_path)
    added_dt = datetime.utcfromtimestamp(lmtime)
    last_modified = added_dt.replace(tzinfo=timezone.utc)

    entry_obj = oh.Entry(id=e_id, path=rel_path,
                         date_added=datetime.now(timezone.utc),
                         last_modified=last_modified,
                         type='Book',
                         table=self.BOOK_TABLE_NAME,
                         data_id=None,
                         hashseed=_rand.randint(0, 2**32))
    return entry_obj
def load(self, raw_value):
    """Converts an input raw value into a timestamp.

    Returns:
        Datetime object, if the conversion succeeds;
        None, if the conversion fails.

    raw_value -- The raw value, in string format (eg. '2014-12-20 15:01:02'),
                 or in milliseconds since Epoch (eg. 1293581619000)
    """
    if isinstance(raw_value, str):
        try:
            timestamp = datetime.strptime(raw_value, "%Y-%m-%d %H:%M:%S")
        except:
            timestamp = None
    else:
        try:
            timestamp = datetime.utcfromtimestamp(float(raw_value) / 1000)
        except:
            timestamp = None
    return timestamp
def test_datetimefield(self):
    class A(Model):
        date = DatetimeField()

    time = 1478390400
    dt = datetime.utcfromtimestamp(time)
    a = A(date=dt)
    self.assertEqual(a.date, dt)
    with self.assertRaises(FieldTypeError):
        a.date = 'hello'
    pj = a.to_pyjson()
    self.assertEqual(pj['date'], time)
    aa = A.from_json(a.to_json())
    self.assertEqual(aa.date, dt)
def get_seconds_from_date(date):
    """
    The number of seconds from the epoch.

    Parameters
    ----------
    date: datetime

    Returns
    -------
    int
    """
    epoch = datetime.utcfromtimestamp(0)
    epoch = epoch.replace(tzinfo=pytz.UTC)
    return int((date - epoch).total_seconds())
def get_minutebar(filesource):
    output = []
    with open(filesource, 'rb') as file_:
        a = C_PERIODIC_BAR()
        while file_.readinto(a) == sizeof(C_PERIODIC_BAR):
            open_ = Quote(a.open.bid_price, a.open.bid_size, a.open.ask_price, a.open.ask_size)
            close_ = Quote(a.close.bid_price, a.close.bid_size, a.close.ask_price, a.close.ask_size)
            ts = datetime.utcfromtimestamp(a.ts.time.tv_sec + a.ts.time.tv_usec / 1000000)
            ts_with_tz = datetime(year=ts.year, month=ts.month, day=ts.day,
                                  hour=ts.hour, minute=ts.minute, second=ts.second,
                                  tzinfo=pytz.UTC)
            elem = PeriodicBar(open_, close_, a.high, a.low, a.volume, ts_with_tz)
            output.append(elem)
    return output

## @brief Make quote objects from futures data
# date,product,specific_ticker,open,high,low,close,contract_volume,contract_oi,total_volume,total_oi
#
def check_for_update():
    if os.path.exists(FILE_UPDATE):
        mtime = os.path.getmtime(FILE_UPDATE)
        last = datetime.utcfromtimestamp(mtime).strftime('%Y-%m-%d')
        today = datetime.utcnow().strftime('%Y-%m-%d')
        if last == today:
            return
    try:
        with open(FILE_UPDATE, 'a'):
            os.utime(FILE_UPDATE, None)
        request = urllib2.Request(
            CORE_VERSION_URL,
            urllib.urlencode({'version': __version__}),
        )
        response = urllib2.urlopen(request)
        with open(FILE_UPDATE, 'w') as update_json:
            update_json.write(response.read())
    except (urllib2.HTTPError, urllib2.URLError):
        pass
def from_soup(cls, tweet):
    return cls(
        user=tweet.find('span', 'username').text[1:],
        fullname=tweet.find('strong', 'fullname').text,
        id=tweet['data-item-id'],
        url=tweet.find('div', 'tweet')['data-permalink-path'],
        timestamp=datetime.utcfromtimestamp(
            int(tweet.find('span', '_timestamp')['data-time'])),
        text=tweet.find('p', 'tweet-text').text or "",
        replies=tweet.find(
            'span', 'ProfileTweet-action--reply u-hiddenVisually').find(
            'span', 'ProfileTweet-actionCount')['data-tweet-stat-count'] or '0',
        retweets=tweet.find(
            'span', 'ProfileTweet-action--retweet u-hiddenVisually').find(
            'span', 'ProfileTweet-actionCount')['data-tweet-stat-count'] or '0',
        likes=tweet.find(
            'span', 'ProfileTweet-action--favorite u-hiddenVisually').find(
            'span', 'ProfileTweet-actionCount')['data-tweet-stat-count'] or '0',
    )
def format_date(unix_timestamp):
    """ Return a standardized date format for use in the two1 library.

    This function produces a localized datetime string that includes the UTC
    timezone offset. This offset is computed as the difference between the
    local version of the timestamp (python's datetime.fromtimestamp) and the
    UTC representation of the input timestamp.

    Args:
        unix_timestamp (float): a floating point unix timestamp

    Returns:
        string: A string formatted with "%Y-%m-%d %H:%M:%S %Z"
    """
    local_datetime = datetime.fromtimestamp(unix_timestamp)
    utz_offset = local_datetime - datetime.utcfromtimestamp(unix_timestamp)
    local_date = local_datetime.replace(
        tzinfo=timezone(utz_offset)
    ).strftime("%Y-%m-%d %H:%M:%S %Z")
    return local_date
def listsnapshots(self, sortbycreation=False, sortreverse=False):
    output = self._srv.invoke("volume-size", "volume", self._volname)
    self._check_netapp_error(output, "Failed to get volume size information")
    volsize = self._volsize_to_num(output.child_get_string("volume-size"))
    pct_limit = round(2147483648 * 100 / (volsize / self._blocksize))
    output = self._srv.invoke("snapshot-list-info", "volume", self._volname)
    self._check_netapp_error(output, "Failed to list snapshots")
    snapshotlist = output.child_get("snapshots")
    snapshots = []
    if snapshotlist is not None and snapshotlist:
        for ss in snapshotlist.children_get():
            snapshots.append({
                'id': ss.child_get_string("name"),
                'creation': datetime.utcfromtimestamp(float(ss.child_get_int("access-time"))),
                'numclones': 1 if ss.child_get_string("busy") == "true" else 0,
                'space_total': (ss.child_get_int("cumulative-total") * self._blocksize
                                if ss.child_get_int("cumulative-percentage-of-total-blocks") < pct_limit
                                else round(volsize * ss.child_get_int("cumulative-percentage-of-total-blocks") / 100)),
                'space_unique': (ss.child_get_int("total") * self._blocksize
                                 if ss.child_get_int("percentage-of-total-blocks") < pct_limit
                                 else round(volsize * ss.child_get_int("percentage-of-total-blocks") / 100)),
            })
    if not sortbycreation:
        return snapshots
    else:
        return sorted(snapshots, key=operator.itemgetter('creation'), reverse=sortreverse)
def get_stamp(stampdir):
    stampfile = os.path.join(stampdir, 'stamp')
    # read first line of stamp file
    f = open(stampfile, mode='rt')
    line = f.readline()
    f.close()
    # extract timestamp (first field of line) and compare to others
    stamp = int(line.split().pop(0))
    if not stamp > 0:
        raise BackupError("Can't read `{}'".format(stampfile))
    dt = datetime.utcfromtimestamp(stamp)
    return dt

# find a list of backups for a particular machine
def unix_time(zulu_time_string):
    dt = datetime.strptime(zulu_time_string, "%Y-%m-%dT%H:%M:%SZ")
    epoch = datetime.utcfromtimestamp(0)
    delta = dt - epoch
    return int(delta.total_seconds() * 1000)
def timestamp_to_datetime(timestamp):
    # UTC datetime
    return datetime.utcfromtimestamp(timestamp)
def localize_utc_timestamp(utc_datetime):
    '''
    Convert timestamp in UTC to local timezone.
    '''
    now = time.time()
    offset = datetime.fromtimestamp(now) - datetime.utcfromtimestamp(now)
    return utc_datetime + offset
def _create_list_zerofill(self, data, start, stop):
    next = self._to_timestamp(start) / 10 * 10 + 10
    stop = self._to_timestamp(stop) / 10 * 10 - 180
    now = 0
    prev = 0
    prev_t = next
    for i in data:
        # now = self._to_timestamp(i.timestamp)
        # while now > next:
        #     w = 1. * (now - next) / (now - prev_t)
        #     yield {
        #         "time": datetime.utcfromtimestamp(next).isoformat() + '.000Z',
        #         "count": int(prev * w + i.frequency * (1 - w)),
        #     }
        #     next += 10
        yield {
            "time": i.timestamp.isoformat() + '.000Z',
            "count": int(i.frequency),
        }
        prev = i.frequency
        prev_t = now
        next = now + 10
    # while next < stop:
    #     yield {
    #         "time": datetime.utcfromtimestamp(next).isoformat() + '.000Z',
    #         "count": 0,
    #     }
    #     next += 10
def test_can_insert_udts_with_all_datatypes(self):
    """
    Test for inserting all column types into a UserType

    test_can_insert_udts_with_all_datatypes tests that each cqlengine column
    type can be inserted into a UserType. It first creates a UserType that has
    each cqlengine column type, and a corresponding table/Model. It then
    creates a UserType instance where all the fields have corresponding data,
    and inserts the UserType as an instance of the Model. Finally, it verifies
    that each column read from the UserType from Cassandra is the same as the
    input parameters.

    @since 2.5.0
    @jira_ticket PYTHON-251
    @expected_result The UserType is inserted with each column type, and the
    resulting read yields proper data for each column.

    @test_category data_types:udt
    """
    sync_table(AllDatatypesModel)
    self.addCleanup(drop_table, AllDatatypesModel)

    input = AllDatatypes(a='ascii', b=2 ** 63 - 1, c=bytearray(b'hello world'),
                         d=True, e=datetime.utcfromtimestamp(872835240),
                         f=Decimal('12.3E+7'), g=2.39,
                         h=3.4028234663852886e+38, i='123.123.123.123',
                         j=2147483647, k='text',
                         l=UUID('FE2B4360-28C6-11E2-81C1-0800200C9A66'),
                         m=UUID('067e6162-3b6f-4ae2-a171-2470b63dff00'),
                         n=int(str(2147483647) + '000'))
    AllDatatypesModel.create(id=0, data=input)

    self.assertEqual(1, AllDatatypesModel.objects.count())
    output = AllDatatypesModel.objects.first().data

    for i in range(ord('a'), ord('a') + 14):
        self.assertEqual(input[chr(i)], output[chr(i)])
def test_datetime_timestamp(self):
    dt_value = 1454520554
    self.DatetimeTest.objects.create(test_id=5, created_at=dt_value)
    dt2 = self.DatetimeTest.objects(test_id=5).first()
    self.assertEqual(dt2.created_at, datetime.utcfromtimestamp(dt_value))
def test_conversion_specific_date(self):
    dt = datetime(1981, 7, 11, microsecond=555000)

    uuid = util.uuid_from_time(dt)

    from uuid import UUID
    assert isinstance(uuid, UUID)

    ts = (uuid.time - 0x01b21dd213814000) / 1e7  # back to a timestamp
    new_dt = datetime.utcfromtimestamp(ts)

    # checks that we created a UUID1 with the proper timestamp
    assert new_dt == dt
def to_python(self, value):
    if value is None:
        return
    if isinstance(value, datetime):
        if DateTime.truncate_microseconds:
            us = value.microsecond
            truncated_us = us // 1000 * 1000
            return value - timedelta(microseconds=us - truncated_us)
        else:
            return value
    elif isinstance(value, date):
        return datetime(*(value.timetuple()[:6]))
    return datetime.utcfromtimestamp(value)
def test_utc_timestamping():
    assert timestamp(
        datetime(2017, 7, 14, 2, 40).replace(tzinfo=utc)
    ) == 1500000000

    for d in (
            datetime.now(), datetime.utcnow(),
            datetime(1999, 12, 31, 23, 59, 59),
            datetime(2000, 1, 1, 0, 0, 0)
    ):
        assert datetime.utcfromtimestamp(
            timestamp(d)) - d < timedelta(microseconds=10)
def _x_format(self):
    """Return the value formatter for this graph"""
    def datetime_to_str(x):
        dt = datetime.utcfromtimestamp(x)
        return self.x_value_formatter(dt)
    return datetime_to_str
def _opener(self, filename):
    return lambda: (
        open(filename, 'rb'),
        datetime.utcfromtimestamp(os.path.getmtime(filename)),
        int(os.path.getsize(filename))
    )
def timestamp_to_datetime(self, ts):
    """Used to convert the timestamp from `get_timestamp` into a
    datetime object.
    """
    return datetime.utcfromtimestamp(ts + EPOCH)
def get_issue_date(self, header):
    rv = header.get('iat')
    if isinstance(rv, number_types):
        return datetime.utcfromtimestamp(int(rv))
def get_last_qso(cursor):
    cursor.execute('SELECT timestamp, callsign, exchange, section, operator.name, band_id \n'
                   'FROM qso_log JOIN operator WHERE operator.id = operator_id \n'
                   'ORDER BY timestamp DESC LIMIT 1')
    last_qso_time = int(time.time()) - 60
    message = ''
    for row in cursor:
        last_qso_time = row[0]
        message = 'Last QSO: %s %s %s on %s by %s at %s' % (
            row[1], row[2], row[3], constants.Bands.BANDS_TITLE[row[5]], row[4],
            datetime.utcfromtimestamp(row[0]).strftime('%H:%M:%S'))
        logging.debug(message)
    return last_qso_time, message
def get_qsos_per_hour_per_band(cursor):
    qsos_per_hour = []
    qsos_by_band = [0] * constants.Bands.count()
    slice_minutes = 15
    slices_per_hour = 60 / slice_minutes
    window_seconds = slice_minutes * 60

    logging.debug('Load QSOs per Hour by Band')
    cursor.execute('SELECT timestamp / %d * %d AS ts, band_id, COUNT(*) AS qso_count \n'
                   'FROM qso_log GROUP BY ts, band_id;' % (window_seconds, window_seconds))
    for row in cursor:
        if len(qsos_per_hour) == 0:
            qsos_per_hour.append([0] * constants.Bands.count())
            qsos_per_hour[-1][0] = row[0]
        while qsos_per_hour[-1][0] != row[0]:
            ts = qsos_per_hour[-1][0] + window_seconds
            qsos_per_hour.append([0] * constants.Bands.count())
            qsos_per_hour[-1][0] = ts
        qsos_per_hour[-1][row[1]] = row[2] * slices_per_hour
        qsos_by_band[row[1]] += row[2]

    for rec in qsos_per_hour:  # FIXME
        rec[0] = datetime.utcfromtimestamp(rec[0])
        t = rec[0].strftime('%H:%M:%S')

    return qsos_per_hour, qsos_by_band
def memorized_datetime(seconds):
    '''Create only one instance of each distinct datetime'''
    try:
        return _datetime_cache[seconds]
    except KeyError:
        # NB. We can't just do datetime.utcfromtimestamp(seconds) as this
        # fails with negative values under Windows (Bug #90096)
        dt = _epoch + timedelta(seconds=seconds)
        _datetime_cache[seconds] = dt
        return dt
def wait_rate_limit_reset(now):
    reset = (
        datetime.utcfromtimestamp(GITHUB.x_ratelimit_reset)
        .replace(tzinfo=timezone.utc)
    )
    delta = reset - now
    wait = delta.total_seconds() + .5
    if wait < 1 or 3500 < wait:
        # Our data is outdated. Just go on.
        return 0
    logger.warning("Waiting rate limit reset in %s seconds.", wait)
    time.sleep(wait)
    GITHUB._instance.x_ratelimit_remaining = -1
    return wait
def _yahoo(self, quote, exchange=None):
    """ Collects data from Yahoo Finance API """

    query = quote + "." + exchange.upper() if exchange else quote

    if not hasattr(self, '_session_y'):
        self._session_y = requests.Session()
    r = self._session_y.get(__class__._Y_API + query)

    if r.status_code == 404:
        raise LookupError('Ticker symbol not found.')
    else:
        r.raise_for_status()

    jayson = r.json()['optionChain']['result'][0]['quote']

    self.ticker = jayson['symbol']
    self._price = jayson['regularMarketPrice']
    self.currency = jayson['currency']
    self.exchange = jayson['exchange']
    self.change = jayson['regularMarketChange']
    self.cp = jayson['regularMarketChangePercent']
    self._last_trade = datetime.utcfromtimestamp(jayson['regularMarketTime'])
    self.name = jayson['longName']
    self.dy = jayson.get('trailingAnnualDividendYield', 0)
def _yahoo(self, quote, d, m, y):
    """ Collects data from Yahoo Finance API """

    epoch = int(round(mktime(date(y, m, d).timetuple()) / 86400, 0) * 86400)

    if not hasattr(self, '_session_y'):
        self._session_y = requests.Session()
    r = self._session_y.get(__class__._Y_API + quote + '?date=' + str(epoch))

    if r.status_code == 404:
        raise LookupError('Ticker symbol not found.')
    else:
        r.raise_for_status()

    json = r.json()

    try:
        self.data = json['optionChain']['result'][0]['options'][0]
    except IndexError:
        raise LookupError('No options listed for this stock.')

    self._exp = [datetime.utcfromtimestamp(i).date()
                 for i in json['optionChain']['result'][0]['expirationDates']]
def swift_get_container(request, container_name, with_data=True):
    if with_data:
        headers, data = swift_api(request).get_object(container_name, "")
    else:
        data = None
        headers = swift_api(request).head_container(container_name)
    timestamp = None
    is_public = False
    public_url = None
    try:
        is_public = GLOBAL_READ_ACL in headers.get('x-container-read', '')
        if is_public:
            swift_endpoint = base.url_for(request,
                                          'object-store',
                                          endpoint_type='publicURL')
            parameters = urlparse.quote(container_name.encode('utf8'))
            public_url = swift_endpoint + '/' + parameters
        ts_float = float(headers.get('x-timestamp'))
        timestamp = datetime.utcfromtimestamp(ts_float).isoformat()
    except Exception:
        pass
    container_info = {
        'name': container_name,
        'container_object_count': headers.get('x-container-object-count'),
        'container_bytes_used': headers.get('x-container-bytes-used'),
        'timestamp': timestamp,
        'data': data,
        'is_public': is_public,
        'public_url': public_url,
    }
    return Container(container_info)
def swift_get_object(request, container_name, object_name, with_data=True,
                     resp_chunk_size=CHUNK_SIZE):
    if with_data:
        headers, data = swift_api(request).get_object(
            container_name, object_name, resp_chunk_size=resp_chunk_size)
    else:
        data = None
        headers = swift_api(request).head_object(container_name, object_name)
    orig_name = headers.get("x-object-meta-orig-filename")
    timestamp = None
    try:
        ts_float = float(headers.get('x-timestamp'))
        timestamp = datetime.utcfromtimestamp(ts_float).isoformat()
    except Exception:
        pass
    obj_info = {
        'name': object_name,
        'bytes': headers.get('content-length'),
        'content_type': headers.get('content-type'),
        'etag': headers.get('etag'),
        'timestamp': timestamp,
    }
    return StorageObject(obj_info,
                         container_name,
                         orig_name=orig_name,
                         data=data)
def test_datetime_timestamp(self):
    dt_value = 1454520554
    self.DatetimeTest.objects.create(
        self.conn,
        test_id=5,
        created_at=dt_value,
    )
    dt2 = self.DatetimeTest.objects(test_id=5).first(self.conn)
    assert dt2.created_at == datetime.utcfromtimestamp(dt_value)