The following 49 code examples, extracted from open-source Python projects, illustrate how to use datetime.datetime.fromtimestamp().
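For orientation before the project examples, here is a minimal sketch (the variable names are illustrative, not taken from any of the projects below): datetime.fromtimestamp() converts a Unix timestamp in seconds to a datetime, interpreted in local time by default, or in the timezone passed as tz.

from datetime import datetime, timezone
import time

ts = time.time()  # current Unix timestamp, in seconds
local_dt = datetime.fromtimestamp(ts)                 # naive datetime in local time
utc_dt = datetime.fromtimestamp(ts, tz=timezone.utc)  # aware datetime in UTC
print(local_dt.strftime('%Y-%m-%d %H:%M:%S'))
print(utc_dt.isoformat())

Note that many of the examples below divide a millisecond value by 1000 before calling it (e.g. fromtimestamp(millis / 1000.0)), since the function expects seconds.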
def check(request):
    return {
        'hostname': socket.gethostname(),
        'ips': ips,
        'cpus': psutil.cpu_count(),
        'uptime': timesince(datetime.fromtimestamp(psutil.boot_time())),
        'memory': {
            'total': filesizeformat(psutil.virtual_memory().total),
            'available': filesizeformat(psutil.virtual_memory().available),
            'used': filesizeformat(psutil.virtual_memory().used),
            'free': filesizeformat(psutil.virtual_memory().free),
            'percent': psutil.virtual_memory().percent
        },
        'swap': {
            'total': filesizeformat(psutil.swap_memory().total),
            'used': filesizeformat(psutil.swap_memory().used),
            'free': filesizeformat(psutil.swap_memory().free),
            'percent': psutil.swap_memory().percent
        }
    }
def dump_schedule(tasks):
    """Dump schedule content"""
    from .utils import load_manager
    manager = load_manager(tasks)
    count = 5000
    offset = 0
    while True:
        items = manager.queue.get_schedule(offset, count)
        if not items:
            break
        for ts, queue, item in items:
            print(datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                  queue,
                  json.dumps(item, ensure_ascii=False, sort_keys=True),
                  sep='\t')
        offset += count
def __init__(self, start_from, **kwargs):
    self.flags = Flags()
    if start_from == 0 or start_from == 0xFFFFFFFF:
        self.timestamp = start_from
    else:
        try:
            datetime.fromtimestamp(start_from)
        except TypeError as exc:
            raise_from(InvalidTimestampError(
                'Timestamp invalid (must be 0, 0xFFFFFFFF, or a Unix timestamp)'), exc)
        else:
            self.timestamp = start_from
    for k, v in iteritems(kwargs):
        try:
            getattr(self.flags.flag, k)
            setattr(self.flags.flag, k, int(v))
        except AttributeError as exc:
            raise_from(InvalidFlagError('Invalid flag: {}'.format(k)), exc)
    # save the timestamp and flags for reuse (if needed)
    Struct.set_ts(self.timestamp)
    Struct.set_flags(self.flags.from_bytes)
    # build the request
    self.event_request = EventRequest(timestamp=self.timestamp, flags=self.flags.from_bytes)
    self.message_header = MessageHeader(type=2, data=self.event_request.pack())
    self.record = self.message_header.pack()
def read_chrome_history(history_db, tm_min=0, tm_max=10000000000000, google=False):
    command = "SELECT urls.url, title, visit_time, last_visit_time, visit_count FROM urls, visits WHERE (urls.id = visits.id)" \
              + " AND ((visit_time/10000000) > %s AND (visit_time/10000000) < %s);" % (tm_min, tm_max)
    if google:
        command = "SELECT urls.url, title, visit_time, last_visit_time, visit_count FROM urls, visits WHERE (urls.id = visits.id)" \
                  + " AND ((visit_time/10000000) > %s AND (visit_time/10000000) < %s) " % (tm_min, tm_max) \
                  + "AND (title like '%Google%');"
    res = pull_from_db(history_db, command)
    data = init_data("chrome_scanner History", len(res)) \
           + init_table_header("./templates/init_chrome_history_html.html")
    for row in res:
        visit_time = dt.fromtimestamp(row[2]/10000000)
        last_visit_time = dt.fromtimestamp(row[3]/10000000)
        line = "<tr><td>%s</td><td>%s</td><td>%s</td><td>%s</td><td>%s</td></tr>" \
               % (visit_time, last_visit_time, row[1], row[0], row[4])
        data += line
    data += close_table_html()
    saveResult("chrome_history.html", data)
def read_chrome_cookies(cookies_db, tm_min=0, tm_max=10000000000000, host=None):
    command = "SELECT name, host_key, value, creation_utc, expires_utc, last_access_utc, has_expires from cookies " \
              + "WHERE (creation_utc/10000000 > %s AND creation_utc/10000000 < %s);" % (tm_min, tm_max)
    if host:
        command = command[:-1] + " AND (host_key LIKE '%s');" % host
    res = pull_from_db(cookies_db, command)
    data = init_data("chrome_scanner Cookies", len(res)) \
           + init_table_header("./templates/init_chrome_cookies_html.html")
    exp_dict = {"0": "No", "1": "Yes"}
    for row in res:
        creation_date = dt.fromtimestamp(row[3]/10000000)
        exp_date = dt.fromtimestamp(row[4]/10000000)
        last_access_date = dt.fromtimestamp(row[5]/10000000)
        exp_stat = exp_dict[str(row[6])]
        line = "<tr><td>%s</td><td>%s</td><td>%s</td><td>%s</td>" % (row[1], row[0], row[2], creation_date) \
               + "<td>%s</td><td>%s</td><td>%s</td></tr>" % (exp_date, last_access_date, exp_stat)
        data += line
    data += close_table_html()
    saveResult("chrome_cookies.html", data)
def read_chrome_logins(logins_db, tm_min=0, tm_max=10000000000000, domain=None):
    command = "SELECT action_url, username_value, password_value, signon_realm, date_created, times_used, form_data FROM logins " \
              + "WHERE (date_created/10000000 > %s AND date_created/10000000 < %s);" % (tm_min, tm_max)
    if domain:
        command = command[:-1] + " AND (signon_realm LIKE '%s');" % domain
    res = pull_from_db(logins_db, command)
    data = init_data("chrome_scanner Logins", len(res)) \
           + init_table_header("./templates/init_chrome_logins_html.html")
    for row in res:
        creation_date = dt.fromtimestamp(row[4]/10000000)
        form_data = row[6].decode("ISO-8859-1")
        line = "<tr><td>%s</td><td>%s</td><td>%s</td><td>%s</td>" % (creation_date, row[3], row[0], row[1]) \
               + "<td>%s</td><td>%s</td><td>%s</td></tr>" % (row[2].decode("ISO-8859-1"), row[5], form_data)
        data += line
    data += close_table_html()
    saveResult("chrome_logins.html", data)
def weather7(bot, trigger):
    location = trigger.group(2)
    if not location:
        location, forecast, postal, error = get_forecast(bot, trigger)
    else:
        location, forecast, postal, error = get_forecast(bot, trigger, location)
    if error:
        return
    summary = forecast.json()['daily']['summary']
    sevendays = []
    weekdays = {1: 'M', 2: 'Tu', 3: 'W', 4: 'Th', 5: 'F', 6: 'Sa', 7: 'Su'}
    for day in forecast.json()['daily']['data']:
        wkday = weekdays[datetime.fromtimestamp(int(day['time'])).isoweekday()]
        maxtemp = round(day['temperatureMax'])
        mintemp = round(day['temperatureMin'])
        sevendays.append("{0}:({1}|{2})".format(wkday, mintemp, maxtemp))
    del sevendays[0]
    sevendays = ", ".join(sevendays)
    bot.say("{0}: [{1}] {2}".format(location, summary, str(sevendays)))
def test_get_schemas_created_after_date_filter(self, schematizer):
    created_after = self._get_created_after()
    creation_timestamp = long(
        (created_after - datetime.fromtimestamp(0, created_after.tzinfo)).total_seconds()
    )
    day_two = (2016, 6, 10, 19, 10, 26, 0)
    created_after2 = datetime(*day_two, tzinfo=created_after.tzinfo)
    creation_timestamp2 = long(
        (created_after2 - datetime.fromtimestamp(0, created_after.tzinfo)).total_seconds()
    )
    schemas = schematizer.get_schemas_created_after_date(
        creation_timestamp
    )
    schemas_later = schematizer.get_schemas_created_after_date(
        creation_timestamp2
    )
    assert len(schemas) >= len(schemas_later)
def scheduled_times(self, earliest_time='now', latest_time='+1h'):
    """Returns the times when this search is scheduled to run.

    By default this method returns the times in the next hour. For different
    time ranges, set *earliest_time* and *latest_time*. For example, for all
    times in the last day use "earliest_time=-1d" and "latest_time=now".

    :param earliest_time: The earliest time.
    :type earliest_time: ``string``
    :param latest_time: The latest time.
    :type latest_time: ``string``

    :return: The list of search times.
    """
    response = self.get("scheduled_times",
                        earliest_time=earliest_time,
                        latest_time=latest_time)
    data = self._load_atom_entry(response)
    rec = _parse_atom_entry(data)
    times = [datetime.fromtimestamp(int(t))
             for t in rec.content.scheduled_times]
    return times
def compare_attr(self, neoobj, nixobj):
    if neoobj.name:
        if isinstance(neoobj, (AnalogSignal, IrregularlySampledSignal)):
            nix_name = ".".join(nixobj.name.split(".")[:-1])
        else:
            nix_name = nixobj.name
        self.assertEqual(neoobj.name, nix_name)
    self.assertEqual(neoobj.description, nixobj.definition)
    if hasattr(neoobj, "rec_datetime") and neoobj.rec_datetime:
        self.assertEqual(neoobj.rec_datetime,
                         datetime.fromtimestamp(nixobj.created_at))
    if hasattr(neoobj, "file_datetime") and neoobj.file_datetime:
        self.assertEqual(neoobj.file_datetime,
                         datetime.fromtimestamp(nixobj.metadata["file_datetime"]))
    if neoobj.annotations:
        nixmd = nixobj.metadata
        for k, v in neoobj.annotations.items():
            if isinstance(v, pq.Quantity):
                self.assertEqual(nixmd.props[str(k)].unit, str(v.dimensionality))
                np.testing.assert_almost_equal(nixmd[str(k)], v.magnitude)
            else:
                self.assertEqual(nixmd[str(k)], v)
def open(self):
    """
    Setup the internal structure.

    NB: Call this function before extracting data from a file.
    """
    if self.file:
        self.file.close()
    try:
        self.file = open(self.path, 'rb')
    except Exception as e:
        raise Exception("python couldn't open file %s : %s" % (self.path, e))
    self.file_size = path.getsize(self.file.name)
    self.creation_date = datetime.fromtimestamp(path.getctime(self.file.name))
    self.modification_date = datetime.fromtimestamp(path.getmtime(self.file.name))
    self.nomenclature = self.get_nomenclature()
    self.factory = self.get_factory()
    self.layout = self.create_layout()
def fix_task_date():
    """Fix Date format in Task."""
    import re
    from datetime import datetime
    with app.app_context():
        query = text('''SELECT id, created FROM task WHERE created LIKE ('%Date%')''')
        results = db.engine.execute(query)
        tasks = results.fetchall()
        for task in tasks:
            # It's in milliseconds
            timestamp = int(re.findall(r'\d+', task.created)[0])
            print timestamp
            # Postgresql expects this format 2015-05-21T13:19:06.471074
            fixed_created = datetime.fromtimestamp(timestamp/1000)\
                .replace(microsecond=timestamp%1000*1000)\
                .strftime('%Y-%m-%dT%H:%M:%S.%f')
            query = text('''UPDATE task SET created=:created WHERE id=:id''')
            db.engine.execute(query, created=fixed_created, id=task.id)
def _get_local_timestamp(record):
    """
    Get the record's timestamp as an ISO-formatted date / time string
    in the local timezone.

    :param record: The LogRecord.
    :type record: StructuredLogRecord
    :return: The ISO-formatted date / time string.
    :rtype: str
    """
    timestamp = datetime.fromtimestamp(
        timestamp=record.created,
        tz=tzlocal()
    )
    return timestamp.isoformat(sep=' ')
def print_posts(self):
    with open('../data/index.html', 'w') as f:
        for post in self.model1.reddit_post_list:
            date = str(datetime.fromtimestamp(post.created_utc))
            post_url = post.link_url + post.id
            post_karma = str(post.score)
            f.write('<h3 style="color: #5e9ca0; display: inline;">'
                    + 'username:  ' + self.model1.reddit_username + '  </h3>\n')
            f.write('<h3 style="color: #5e9ca0; display: inline;">'
                    + 'date:  ' + date + '  '
                    + 'karma:  ' + post_karma + ' </h3>\n')
            f.write('<p style="color: #5e9ca0; display: inline;"><a href="' + post_url + '"><img\n')
            f.write('src="https://cdn4.iconfinder.com/data/icons/web-links/512/41-512.png" alt="" '
                    'width="14" height="14" /></a></p>')
            f.write('<div class="md">\n')
            f.write(post.body_html.encode('utf-8') + '\n')
            f.write('</div>\n<hr />\n')
def last_seen(msg):
    entries = []
    with open('/home/archangelic/irc/log', 'rb') as f:
        lines = f.readlines()
    for line in lines:
        try:
            l = line.split(b'\t')
            d = l[0]
            u = l[1]
            if u.decode() == msg.arg[:9]:
                entries.append(float(d))
        except:
            continue
    if entries:
        entries.sort()
        last_entry = entries[-1]
        if msg.arg in msg.nick_list:
            leader = '{} is currently online and in the channel!'.format(msg.arg)
        else:
            leader = msg.arg
        out = '{} last spoke in chat on {}'.format(leader, datetime.fromtimestamp(last_entry))
    else:
        out = 'Sorry, {} was not found'.format(msg.arg)
    return pinhook.plugin.message(out)
def __setup(self):
    global module_logger
    filename = store_from_module(self.__calling_module)
    #module_logger.info("Eventor store file: %s" % filename)
    db_mode = DbMode.write if self.__run_mode == RunMode.restart else DbMode.append
    #self.__db_daly_adj = 0
    #if self.__run_mode != RunMode.restart:
    #    try:
    #        db_mtime = os.path.getmtime(self.__filename)
    #    except OSError:
    #        pass
    #    else:
    #        self.__db_daly_adj = (datetime.now() - datetime.fromtimestamp(db_mtime)).total_seconds()
    self.db = DbApi(config=self.__config, modulefile=filename, shared_db=self.shared_db,
                    run_id=self.run_id, userstore=self.store, mode=db_mode,
                    echo=False, logger=module_logger)  # self.debug
    self.__requestors = vrp.Requestors()
    if self.__run_mode == RunMode.restart:
        self.__write_info()
    else:
        self.__read_info(run_mode=self.__run_mode, recovery_run=self.__recovery_run)
def refresh(self):
    try:
        # open the data url
        self.req = urlopen(self.data_url)
        # read data from the url
        self.raw_data = self.req.read()
        # load in the json
        self.json_data = json.loads(self.raw_data.decode())
        # get time from json
        self.time = datetime.fromtimestamp(self.parser.time(self.json_data))
        # load all the aircraft
        self.aircraft = self.parser.aircraft_data(self.json_data, self.time)
    except Exception:
        print("exception in FlightData.refresh():")
        traceback.print_exc()
def process_(child) -> Tuple[str, datetime]:
    name, text = child.name, child.get_text()
    try:
        # Try converting text to an integer
        text = int(text)
    # Ignore if we get a value we can't cast to int
    except ValueError:
        pass
    if name == "my_last_updated":
        text = datetime.fromtimestamp(float(text))
    if name in ('my_finish_date', "my_start_date", "series_end", "series_start"):
        try:
            text = datetime.strptime(text, "%Y-%m-%d")
        except ValueError:
            text = datetime.fromtimestamp(0)
    # Return name and text in tuple
    return name, text
def preprocess_message(self, request):
    """Decrypt an incoming request and parse the message into a dict."""
    component = get_component()
    content = component.crypto.decrypt_message(
        request.body,
        request.query_params['msg_signature'],
        int(request.query_params['timestamp']),
        int(request.query_params['nonce'])
    )
    message = xmltodict.parse(to_text(content))['xml']
    cc = json.loads(json.dumps(message))
    cc['CreateTime'] = int(cc['CreateTime'])
    cc['CreateTime'] = datetime.fromtimestamp(cc['CreateTime'])
    if 'MsgId' in cc:
        cc['MsgId'] = int(cc['MsgId'])
    return cc
def _real_extract(self, url):
    list_id = self._match_id(url)
    info = self.query_api(
        'playlist/detail?id=%s&lv=-1&tv=-1' % list_id,
        list_id, 'Downloading playlist data')['result']
    name = info['name']
    desc = info.get('description')
    if info.get('specialType') == 10:  # is a chart/toplist
        datestamp = datetime.fromtimestamp(
            self.convert_milliseconds(info['updateTime'])).strftime('%Y-%m-%d')
        name = '%s %s' % (name, datestamp)
    entries = [
        self.url_result('http://music.163.com/#/song?id=%s' % song['id'],
                        'NetEaseMusic', song['id'])
        for song in info['tracks']
    ]
    return self.playlist_result(entries, list_id, name, desc)
def set_nasa_wallpaper():
    st = datetime.fromtimestamp(time.time()).strftime('%y%m%d')
    url = URL07.format(st)
    r = requests.get(url)
    if r.status_code == 200:
        try:
            parser = etree.HTMLParser(recover=True)
            html = etree.HTML(r.content, parser)
            images = html.iter('img')
            if images is not None:
                images = list(images)
                if len(images) > 0:
                    image_url = images[0].getparent().attrib['href']
                    image_url = 'https://apod.nasa.gov/' + image_url
                    if download(image_url) is True:
                        set_background(comun.POTD)
        except Exception as e:
            print(e)
def getcommits(self):
    """Return meta data about existing commits.

    Returns:
        A list containing dictionaries with commit meta data
    """
    commits = []
    if len(self.repo.listall_reference_objects()) > 0:
        for commit in self.repo.walk(self.repo.head.target, GIT_SORT_REVERSE):
            commits.append({
                'id': str(commit.oid),
                'message': str(commit.message),
                'commit_date': datetime.fromtimestamp(
                    commit.commit_time).strftime('%Y-%m-%dT%H:%M:%SZ'),
                'author_name': commit.author.name,
                'author_email': commit.author.email,
                'parents': [c.hex for c in commit.parents],
            })
    return commits
def post(self):
    try:
        args = parser.parse_args()
        pass_md5 = hashlib.md5(args['password'].encode('utf-8')).hexdigest()
        user = User.query.filter(User.login == args['login'],
                                 User.password == pass_md5).one_or_none()
        if user is None:
            log.info("Invalid login/password")
            return {'state': 'fail', 'message': 'No such user or password invalid'}
        new_token = Token(token=str(uuid.uuid4()), user_id=user.id, device=args['device'])
        if args['expires'] is not None:
            new_token.expires_at = datetime.fromtimestamp(args['expires'] / 1000.0)
        db.session.add(new_token)
        db.session.commit()
        log.info("Created new token: %s" % new_token.token)
        return {'token': new_token.token}
    except Exception as e:
        db.session.rollback()
        log.exception(e)
        return {'state': 'fail', 'message': str(e)}, 500

# sign up
def put(self):
    try:
        args = parser.parse_args()
        pass_md5 = hashlib.md5(args['password'].encode('utf-8')).hexdigest()
        new_user = User(login=args['login'], password=pass_md5)
        db.session.add(new_user)
        new_token = Token(token=str(uuid.uuid4()), user=new_user, device=args['device'])
        if args['expires'] is not None:
            new_token.expires_at = datetime.fromtimestamp(args['expires'] / 1000.0)
        db.session.add(new_token)
        db.session.commit()
        return {'token': new_token.token}
    except Exception as e:
        db.session.rollback()
        log.error(e)
        return {'state': 'fail', 'message': str(e)}, 500

# close session(remove token)
def edit_table():
    from datetime import datetime as dt
    from collections import OrderedDict as odict
    moz_places_fields = ('id', 'url', 'title', 'rev_host', 'visit_count', 'hidden',
                         'typed', 'favicon_id', 'frecency', 'last_visit_date', 'guid',
                         'foreign_count', 'url_hash', 'description', 'preview_image_url',
                         'last_visit_date_readable')
    bindings_placeholders = '?, ' * len(moz_places_fields)
    with sqlite3.connect('db_for_testing_search.sqlite') as source_conn:
        source_conn.row_factory = sqlite3.Row
        query_source_result = source_conn.execute('SELECT * FROM moz_places')
        with sqlite3.connect('db_for_testing_search_new.sqlite') as sink_conn:
            try:
                query_sink_result = sink_conn.execute(
                    f"CREATE TABLE moz_places ({', '.join(moz_places_fields)})")
            except Exception as excep:
                print(excep)
            finally:
                for row in query_source_result:
                    row = odict(row)
                    row.setdefault('last_visit_date_readable', None)
                    try:
                        row['last_visit_date_readable'] = dt.fromtimestamp(
                            row['last_visit_date'] // 10**6).strftime('%x %X')
                    except TypeError:
                        pass
                    sink_conn.execute(
                        f'INSERT INTO moz_places VALUES ({bindings_placeholders[:-2]})',
                        row)
def call(self, milliseconds, jrdds):
    # Clear the failure
    self.failure = None
    try:
        if self.ctx is None:
            self.ctx = SparkContext._active_spark_context
        if not self.ctx or not self.ctx._jsc:
            # stopped
            return

        # extend deserializers with the first one
        sers = self.deserializers
        if len(sers) < len(jrdds):
            sers += (sers[0],) * (len(jrdds) - len(sers))

        rdds = [self.rdd_wrap_func(jrdd, self.ctx, ser) if jrdd else None
                for jrdd, ser in zip(jrdds, sers)]
        t = datetime.fromtimestamp(milliseconds / 1000.0)
        r = self.func(t, *rdds)
        if r:
            return r._jrdd
    except:
        self.failure = traceback.format_exc()
def send_message(self, **kwargs):
    string = ''
    parameters = kwargs["param"]
    if parameters["key"]:
        string = string + " Parameter '{}' for key={} out of range [{},{}] and equal {}".format(
            parameters["field"], parameters["key"], parameters["lower_bound"],
            parameters["upper_bound"], parameters["value"])
    else:
        string = string + " Parameter '{}' out of range [{},{}] and equal {}".format(
            parameters["field"], parameters["lower_bound"],
            parameters["upper_bound"], parameters["value"])
    string = kwargs["AnalysisModule"] + ": Time: {}".format(
        datetime.fromtimestamp(int(kwargs["timestamp"]))) + "." + string
    print(string)
def _song_from_info(self, info):
    if "id" in info:
        song_id = info['id']
    else:
        song_id = info['storeId']
    songs = self._songs
    if song_id in songs:
        return songs[song_id]
    artist = info['artist']
    title = info['title']
    duration_millis = int(info['durationMillis'])
    duration = datetime.fromtimestamp(duration_millis / 1000).strftime("%M:%S")
    url = None
    if "albumArtRef" in info:
        ref = info["albumArtRef"]
        if ref:
            ref = ref[0]
            if "url" in ref:
                url = ref["url"]
    song = Song(song_id, self, title, artist, url,
                " - ".join([artist, title]), duration)
    songs[song_id] = song
    return song
def _song_from_info(self, info) -> Song:
    """
    Create a song object from the json dict returned by the HTTP API.

    :param info: the json dict
    :return: a Song object, or None
    """
    song_id = str(info.id)
    songs = self._songs
    if song_id in songs:
        return songs[song_id]
    if not info.streamable and not info.downloadable:
        return None
    artist = info.user['username']
    title = info.title
    url = info.artwork_url
    duration_millis = info.duration
    duration = datetime.fromtimestamp(duration_millis / 1000).strftime("%M:%S")
    song = Song(song_id, self, title, artist, url,
                " - ".join([artist, title]), duration=duration)
    songs[song_id] = song
    return song
def __init__(self, id=None, created=None, shards=None, locked=None,
             user=None, size=None, storageSize=None):
    self.id = id
    self.locked = locked
    self.user = user
    self.size = size
    self.storageSize = storageSize

    if created is not None:
        self.created = datetime.fromtimestamp(
            strict_rfc3339.rfc3339_to_timestamp(created))
    else:
        self.created = None

    if shards is None:
        self.shards = []
    else:
        self.shards = shards
def __init__(
    self,
    token=None,
    bucket=None,
    operation=None,
    expires=None,
    encryptionKey=None,
    id=None,
):
    self.token = token
    self.bucket = Bucket(id=bucket)
    self.operation = operation
    self.id = id

    if expires is not None:
        self.expires = datetime.fromtimestamp(
            strict_rfc3339.rfc3339_to_timestamp(expires))
    else:
        self.expires = None

    self.encryptionKey = encryptionKey
def parse_job_list_page(self, response):
    self.get_connector().log(self.name, self.ACTION_CRAWL_LIST, response.url)
    feed_parser = feedparser.parse(response.body)
    for job_entry in feed_parser.entries:
        job_url = job_entry.link
        job_publication_date = datetime.fromtimestamp(mktime(job_entry.published_parsed))
        job_publication_time = mktime(job_publication_date.timetuple())
        last_job_publication_time = mktime(self._last_job_date.timetuple())
        if job_publication_time <= last_job_publication_time:
            self.get_connector().log(
                self.name,
                self.ACTION_MARKER_FOUND,
                "%s <= %s" % (job_publication_time, last_job_publication_time))
            return

        prepared_job = JobItem()
        request = Request(job_url, self.parse_job_page)
        request.meta['item'] = prepared_job
        prepared_job['title'] = job_entry.title
        prepared_job['description'] = job_entry.description
        prepared_job['publication_datetime'] = job_publication_date
        yield request
def refresh_user_token(user_social):
    """
    Utility function to refresh the access token if it is (almost) expired

    Args:
        user_social (UserSocialAuth): a user social auth instance
    """
    try:
        last_update = datetime.fromtimestamp(user_social.extra_data.get('updated_at'),
                                             tz=pytz.UTC)
        expires_in = timedelta(seconds=user_social.extra_data.get('expires_in'))
    except TypeError:
        _send_refresh_request(user_social)
        return
    # small error margin of 5 minutes to be safe
    error_margin = timedelta(minutes=5)
    if now_in_utc() - last_update >= expires_in - error_margin:
        _send_refresh_request(user_social)
def datetime_filter(t):
    date_time = datetime.fromtimestamp(t)
    str_date = date_time.strftime("%Y-%m-%d %X")
    delta = int(time.time() - t)
    if delta < 60:
        return u'<span title="{}">1 minute ago</span>'.format(str_date)
    if delta < 3600:
        return u'<span title="{}">{} minutes ago</span>'.format(str_date, delta // 60)
    if delta < 86400:
        return u'<span title="{}">{} hours ago</span>'.format(str_date, delta // 3600)
    if delta < 604800:
        return u'<span title="{}">{} days ago</span>'.format(str_date, delta // 86400)
    #dt = datetime.fromtimestamp(t)
    return u'<span title="{}">{}</span>'.format(str_date, date_time.strftime("%Y-%m-%d"))

#def index(request):
#    return web.Response(body=b'<h1>Awesome Python3 Web</h1>', content_type='text/html')
def walkSubNodes(self, vcn):
    logging.debug("Inside walkSubNodes: vcn %s" % vcn)
    entries = self.parseIndexBlocks(vcn)
    files = []
    for entry in entries:
        if entry.isSubNode():
            files += self.walkSubNodes(entry.getVCN())
        else:
            if len(entry.getKey()) > 0 and entry.getINodeNumber() > 16:
                fn = NTFS_FILE_NAME_ATTR(entry.getKey())
                if fn['FileNameType'] != FILE_NAME_DOS:
                    #inode = INODE(self.NTFSVolume)
                    #inode.FileAttributes = fn['FileAttributes']
                    #inode.FileSize = fn['DataSize']
                    #inode.LastDataChangeTime = datetime.fromtimestamp(getUnixTime(fn['LastDataChangeTime']))
                    #inode.INodeNumber = entry.getINodeNumber()
                    #inode.FileName = fn['FileName'].decode('utf-16le')
                    #inode.displayName()
                    files.append(fn)
                    #if inode.FileAttributes & FILE_ATTR_I30_INDEX_PRESENT and entry.getINodeNumber() > 16:
                    #    inode2 = self.NTFSVolume.getINode(entry.getINodeNumber())
                    #    inode2.walk()
    return files
def datetime_from_utc(cls, metadata, element=None):
    """
    Generates a human readable time from an item's UTC timestamp

    :param metadata: Item metadata
    :type metadata: dict
    :param element: Item details
    :type element: dict
    :returns: tuple -- Match date & match time
    """
    date_container = None
    if metadata.get('scheduled_start'):
        date_container = metadata.get('scheduled_start', {})
    elif element is not None:
        date_container = element.get('scheduled_start', {})
    if date_container is None:
        return (None, None)
    timestamp = float(date_container.get('date'))
    match_datetime = datetime.fromtimestamp(timestamp)
    match_date = match_datetime.strftime('%d.%m.%Y')
    match_time = match_datetime.strftime('%H:%M')
    return (match_date, match_time)
def get_task_from_cache(domain_to_request, provider):
    """
    Check if there is already a pending/resolved similar request
    """
    defendant = service = expiration = ticket = None
    for entry in utils.redis.lrange(common.CDN_REQUEST_REDIS_QUEUE % provider, 0, -1):
        entry = json.loads(entry, object_pairs_hook=OrderedDict)
        if entry['domain'] == domain_to_request:
            defendant = Defendant.objects.filter(
                id=entry['defendant_id']
            ).last()
            service = Service.objects.filter(
                id=entry['service_id']
            ).last()
            ticket = Ticket.objects.get(
                id=entry['request_ticket_id']
            )
            expiration = datetime.fromtimestamp(entry['expiration'])
            break
    return defendant, service, ticket, expiration
def logout(request):
    """
    Logout a user
    """
    try:
        token = request.environ['HTTP_X_API_TOKEN']
    except (KeyError, IndexError, TypeError):
        raise BadRequest('Missing HTTP X-Api-Token header')
    try:
        data = jwt.decode(token, settings.SECRET_KEY)
        data = json.loads(CRYPTO.decrypt(str(data['data'])))
        user = User.objects.get(id=data['id'])
        user.last_login = datetime.fromtimestamp(0)
        user.save()
        return {'message': 'Logged out'}
    except (utils.CryptoException, KeyError, jwt.DecodeError,
            jwt.ExpiredSignature, User.DoesNotExist):
        raise BadRequest('Invalid token')
def load_jobs(bot, job_queue):
    """Load all existing jobs (pending reminders) into the given 'job_queue',
    and apologise if we missed any.
    """
    if not os.path.isdir(bot.username):
        return

    now = datetime.now()
    for chat_id in os.listdir(bot.username):
        apologise = False
        for reminder in os.listdir(get_user_dir(bot, chat_id)):
            reminder_file = os.path.join(bot.username, chat_id, reminder)
            reminder_date = datetime.fromtimestamp(int(reminder))
            if reminder_date > now:
                queue_message(job_queue, reminder_date, int(chat_id), reminder_file)
            else:
                apologise = True
                os.remove(reminder_file)
        if apologise:
            bot.send_message(chat_id,
                             text='Oops… looks like I missed some reminders. Sorry :(')
def status(bot, update):
    directory = get_user_dir(bot, update.message.chat_id)
    reminders = list(sorted(os.listdir(directory)))
    if not reminders:
        update.message.reply_text('You have no pending reminders. Hooray ^_^')
        return

    reminder = reminders[0]
    diff = format_time_diff(datetime.fromtimestamp(int(reminder)))
    with open(os.path.join(directory, reminder)) as f:
        text = f.read()
    text = ':\n' + text if text else '.'
    amount = ('{} reminders' if len(reminders) > 1 else '{} reminder')\
        .format(len(reminders))
    update.message.reply_text('{}. Next reminder in {}{}'
                              .format(amount, diff, text))
def save(self):
    """Saves the logger's buffered points to a CSV file.

    If the file exists, then the data points are appended.
    """
    # We need to see if enough time has passed since the last save.
    from datetime import datetime
    from time import time
    if self.lastsave is not None:
        elapsed = (datetime.fromtimestamp(time())
                   - datetime.fromtimestamp(self.lastsave)).total_seconds()
    else:
        elapsed = self.logfreq + 1

    if elapsed > self.logfreq:
        self._csv_append()
        self.lastsave = time()
def createTimestamp():
    return datetime.fromtimestamp(time()).strftime('%Y-%m-%d %H:%M:%S')
def writeStdout(s):
    ts = datetime.fromtimestamp(time()).strftime('%Y-%m-%d %H:%M:%S')
    sys.stdout.write('\t'.join([ts, s]) + '\n')
    sys.stdout.flush()
def writeStderr(s):
    ts = datetime.fromtimestamp(time()).strftime('%Y-%m-%d %H:%M:%S')
    sys.stderr.write('\t'.join([ts, s]) + '\n')
    sys.stderr.flush()
def estimate_time(builds):
    """Update the working build with an estimated completion time.

    Takes a simple average over the previous builds, using those
    whose outcome is ``'passed'``.

    Arguments:
      builds (:py:class:`list`): All builds.
    """
    try:
        index, current = next(
            (index, build) for index, build in enumerate(builds[:4])
            if build['outcome'] == 'working'
        )
    except StopIteration:
        return  # no in-progress builds

    if current.get('started_at') is None:
        current['elapsed'] = 'estimate not available'
        return

    usable = [
        current for current in builds[index + 1:]
        if current['outcome'] == 'passed' and current['duration'] is not None
    ]
    if not usable:
        current['elapsed'] = 'estimate not available'
        return

    average_duration = int(sum(build['duration'] for build in usable) / float(len(usable)))
    finish = current['started_at'] + average_duration
    remaining = (datetime.fromtimestamp(finish) - datetime.now()).total_seconds()
    if remaining >= 0:
        current['elapsed'] = '{} left'.format(naturaldelta(remaining))
    else:
        current['elapsed'] = 'nearly done'
def current_time():
    ts = time.time()
    return datetime.fromtimestamp(ts).strftime(TIME_FORMAT)

# converts a string to a timestamp