我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用datetime.datetime.utcnow()。
def add_credentials_options(self): """Add credentials to the Options dictionary (if necessary).""" api_username = self.configuration.username api_key = self.configuration.password self.options['api_username'] = api_username if self.configuration.secure_auth == True: timestamp = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ') uri = '/' + self.configuration.sub_url + ('/' if self.domain_name.strip()=='' else '/' + self.domain_name + '/') + self.service_name self.options['timestamp'] = timestamp params = ''.join([api_username, timestamp, uri]) self.options['signature'] = hmac.new(api_key, params, digestmod=hashlib.sha1).hexdigest() else: self.options['api_key'] = api_key
def check_if_media_sync_offset_satisfied(logger, settings, audit): """ Check if the media sync offset is satisfied. The media sync offset is a duration in seconds specified in the configuration file. This duration is the amount of time audit media is given to sync up with SafetyCulture servers before this tool exports the audit data. :param logger: The logger :param settings: Settings from command line and configuration file :param audit: Audit JSON :return: Boolean - True if the media sync offset is satisfied, otherwise, returns false. """ modified_at = dateutil.parser.parse(audit['modified_at']) now = datetime.utcnow() elapsed_time_difference = (pytz.utc.localize(now) - modified_at) # if the media_sync_offset has been satisfied if not elapsed_time_difference > timedelta(seconds=settings[MEDIA_SYNC_OFFSET_IN_SECONDS]): logger.info('Audit {0} modified too recently, some media may not have completed syncing. Skipping export until next sync cycle'.format( audit['audit_id'])) return False return True
def elapsed(self): """ Returns the elapsed time (as a float) of the threaded execution which includes the number of microseconds. """ if self._execution_begin is None: # No elapsed time has taken place yet return 0.0 if self._execution_finish is not None: # Execution has completed, we only want to calculate # the execution time. elapsed_time = self._execution_finish - self._execution_begin else: # Calculate Elapsed Time elapsed_time = datetime.utcnow() - self._execution_begin elapsed_time = (elapsed_time.days * 86400) \ + elapsed_time.seconds \ + (elapsed_time.microseconds/1e6) return elapsed_time
def export(metadata, start, end, container_image_pattern): queries = [] metadata["start"] = start.isoformat() + "Z" metadata["end"] = end.isoformat() + "Z" metadata["services"] = [] ts = datetime.utcnow().strftime("%Y%m%d%H%M%S-") path = os.path.join(metadata["metrics_export"], ts + metadata["measurement_name"]) if not os.path.isdir(path): os.makedirs(path) for app in APPS: metadata["services"].append(dump_app(app, path, start, end, container_image_pattern)) with open(os.path.join(path, "metadata.json"), "w+") as f: json.dump(metadata, f, cls=Encoder, sort_keys=True, indent=4) f.flush()
def insert_earned_achievement(aid, data): """ Store earned achievement for a user/team. Args: aid: the achievement id data: the data necessary to assess the achievement must include tid, uid """ db = api.common.get_conn() tid, uid = data.pop("tid"), data.pop("uid") name, description = data.pop("name"), data.pop("description") db.earned_achievements.insert({ "aid": aid, "tid": tid, "uid": uid, "data": data, "name": name, "description": description, "timestamp": datetime.utcnow().timestamp(), "seen": False })
def block_before_competition(return_result): """ Wraps a routing function that should be blocked before the start time of the competition """ def decorator(f): """ Inner decorator """ @wraps(f) def wrapper(*args, **kwds): if datetime.utcnow().timestamp() > api.config.get_settings()["start_time"].timestamp(): return f(*args, **kwds) else: return return_result return wrapper return decorator
def block_after_competition(return_result): """ Wraps a routing function that should be blocked after the end time of the competition """ def decorator(f): """ Inner decorator """ @wraps(f) def wrapper(*args, **kwds): if datetime.utcnow().timestamp() < api.config.get_settings()["end_time"].timestamp(): return f(*args, **kwds) else: return return_result return wrapper return decorator
def LOGEXCEPTION(message): if LOGGER: LOGGER.exception(message) else: print( '%sZ [EXC!]: %s\nexception was: %s' % ( datetime.utcnow().isoformat(), message, format_exc() ) ) ####################### ## UTILITY FUNCTIONS ## ####################### # Utility function to report best scores # modified from a snippet taken from: # http://scikit-learn.org/stable/auto_examples/model_selection/plot_randomized_search.html
def LOGEXCEPTION(message): if LOGGER: LOGGER.exception(message) else: print( '%sZ [EXC!]: %s\nexception was: %s' % ( datetime.utcnow().isoformat(), message, format_exc() ) ) ######################## ## COLUMN DEFINITIONS ## ######################## # LC column definitions # the first elem is the column description, the second is the format to use when # writing a CSV LC column, the third is the type to use when parsing a CSV LC # column
def LOGEXCEPTION(message): if LOGGER: LOGGER.exception(message) else: print( '%sZ [EXC!]: %s\nexception was: %s' % ( datetime.utcnow().isoformat(), message, format_exc() ) ) ################### ## USEFUL CONFIG ## ################### # used to find HATIDs
def LOGEXCEPTION(message): if LOGGER: LOGGER.exception(message) else: print( '%sZ [EXC!]: %s\nexception was: %s' % ( dtime.utcnow().isoformat(), message, format_exc() ) ) ############################################### ## MAGIC CONSTANTS FOR 2MASS TO COUSINS/SDSS ## ############################################### # converting from JHK to BVRI
def LOGEXCEPTION(message): if LOGGER: LOGGER.exception(message) else: print( '%sZ [EXC!]: %s\nexception was: %s' % ( datetime.utcnow().isoformat(), message, format_exc() ) ) ################### ## LOCAL IMPORTS ## ################### # LC reading functions
def LOGEXCEPTION(message): if LOGGER: LOGGER.exception(message) else: print( '%sZ [EXC!]: %s\nexception was: %s' % ( datetime.utcnow().isoformat(), message, format_exc() ) ) ################### ## LOCAL IMPORTS ## ###################
def LOGEXCEPTION(message): if LOGGER: LOGGER.exception(message) else: print( '%sZ [EXC!]: %s\nexception was: %s' % ( datetime.utcnow().isoformat(), message, format_exc() ) ) ########################### ## FLARE MODEL FUNCTIONS ## ###########################
def LOGEXCEPTION(message): if LOGGER: LOGGER.exception(message) else: print( '%sZ [EXC!]: %s\nexception was: %s' % ( datetime.utcnow().isoformat(), message, format_exc() ) ) ######################## ## SOME OTHER IMPORTS ## ########################
def LOGEXCEPTION(message): if LOGGER: LOGGER.exception(message) else: print( '%sZ [EXC!]: %s\nexception was: %s' % ( datetime.utcnow().isoformat(), message, format_exc() ) ) ####################### ## UTILITY FUNCTIONS ## ####################### # this is from Tornado's source (MIT License): # http://www.tornadoweb.org/en/stable/_modules/tornado/escape.html#squeeze
def get_package_loader(self, package, package_path): from pkg_resources import DefaultProvider, ResourceManager, \ get_provider loadtime = datetime.utcnow() provider = get_provider(package) manager = ResourceManager() filesystem_bound = isinstance(provider, DefaultProvider) def loader(path): if path is None: return None, None path = posixpath.join(package_path, path) if not provider.has_resource(path): return None, None basename = posixpath.basename(path) if filesystem_bound: return basename, self._opener( provider.get_resource_filename(manager, path)) return basename, lambda: ( provider.get_resource_stream(manager, path), loadtime, 0 ) return loader
def container_sas(self): container_name = self._create_container() self.service.create_blob_from_text(container_name, 'blob1', b'hello world') # Access only to the blobs in the given container # Read permissions to access blobs # Expires in an hour token = self.service.generate_container_shared_access_signature( container_name, ContainerPermissions.READ, datetime.utcnow() + timedelta(hours=1), ) # Create a service and use the SAS sas_service = BlockBlobService( account_name=self.account.account_name, sas_token=token, ) blob = sas_service.get_blob_to_text(container_name, 'blob1') content = blob.content # hello world self.service.delete_container(container_name)
def test_streams_tasks(): mock_scheduler = Mock() streams_tasks(mock_scheduler) mock_scheduler.schedule.assert_called_once_with( scheduled_time=datetime.utcnow(), func=groom_redis_precaches, interval=60 * 60 * 24, )
def streams_tasks(scheduler): scheduler.schedule( scheduled_time=datetime.utcnow(), func=groom_redis_precaches, interval=60*60*24, # a day )
def export_actions(logger, settings, sc_client): """ Export all actions created after date specified :param logger: The logger :param settings: Settings from command line and configuration file :param sc_client: instance of safetypy.SafetyCulture class """ logger.info('Exporting iAuditor actions') last_successful_actions_export = get_last_successful_actions_export(logger) actions_array = sc_client.get_audit_actions(last_successful_actions_export) if actions_array is not None: logger.info('Found ' + str(len(actions_array)) + ' actions') save_exported_actions_to_csv_file(logger, settings[EXPORT_PATH], actions_array) utc_iso_datetime_now = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.000Z') update_actions_sync_marker_file(logger, utc_iso_datetime_now)
def utctimestamp(): ts = datetime.utcnow() - datetime(1970, 1, 1) return ts.seconds + ts.days * 24 * 3600
def send_remind_activating_email(username): #admin_email_address = env.getenv('ADMIN_EMAIL_ADDRESS') nulladdr = ['\'\'', '\"\"', ''] email_from_address = settings.get('EMAIL_FROM_ADDRESS') admin_email_address = settings.get('ADMIN_EMAIL_ADDRESS') if (email_from_address in nulladdr or admin_email_address in nulladdr): return #text = 'Dear '+ username + ':\n' + ' Your account in docklet has been activated' text = '<html><h4>Dear '+ 'admin' + ':</h4>' text += '''<p> An activating request for %s in <a href='%s'>%s</a> has been sent</p> <p> Please check it !</p> <br/><br/> <p> Docklet Team, SEI, PKU</p> ''' % (username, env.getenv("PORTAL_URL"), env.getenv("PORTAL_URL")) text += '<p>'+ str(datetime.utcnow()) + '</p>' text += '</html>' subject = 'An activating request in Docklet has been sent' if admin_email_address[0] == '"': admins_addr = admin_email_address[1:-1].split(" ") else: admins_addr = admin_email_address.split(" ") alladdr="" for addr in admins_addr: alladdr = alladdr+addr+", " alladdr=alladdr[:-2] msg = MIMEMultipart() textmsg = MIMEText(text,'html','utf-8') msg['Subject'] = Header(subject, 'utf-8') msg['From'] = email_from_address msg['To'] = alladdr msg.attach(textmsg) s = smtplib.SMTP() s.connect() try: s.sendmail(email_from_address, admins_addr, msg.as_string()) except Exception as e: logger.error(e) s.close()
def __init__(self, title, content=''): self.title = title self.content = content self.create_date = datetime.utcnow() self.status = 'open'
def check_competition_active(): """ Is the competition currently running """ settings = api.config.get_settings() return settings["start_time"].timestamp() < datetime.utcnow().timestamp() < settings["end_time"].timestamp()
def get_time(): return WebSuccess(data=int(datetime.utcnow().timestamp()))
def gdq(bot, trigger): now = datetime.utcnow() now = now.replace(tzinfo=timezone.utc) delta = datetime(2018,1,7,16,30,tzinfo=timezone.utc) - now textdate = "January 7" url = 'https://gamesdonequick.com/schedule' try: x = requests.get(url).content except: return bot.say("GDQ is {0} days away ({1})".format(delta.days,textdate)) bs = BeautifulSoup(x) try: run = bs.find("table",{"id":"runTable"}).tbody except: return bot.say("GDQ is {0} days away ({1})".format(delta.days, textdate)) try: gdqstart = datetime.strptime(run.td.getText(), '%Y-%m-%dT%H:%M:%SZ') gdqstart = gdqstart.replace(tzinfo=timezone.utc) except: return bot.say("GDQ is {0} days away ({1})".format(delta.days, textdate)) (game, runner, console, comment, eta, nextgame, nextrunner, nexteta, nextconsole, nextcomment) = getinfo(run,now) if not nextgame: return bot.say("GDQ is {0} days away ({1})".format(delta.days,textdate)) if now < gdqstart: tts = gdqstart - now if tts.days <= 3: return bot.say("GDQ is {0}H{1}M away. First game: {2} by {3} ETA: {4} Comment: {5} | https://gamesdonequick.com/schedule".format(int(tts.total_seconds() // 3600),int((tts.total_seconds() % 3600) // 60), nextgame, nextrunner, nexteta, nextcomment)) else: return bot.say("GDQ is {0} days away ({1}) | https://gamesdonequick.com/schedule".format(tts.days,gdqstart.strftime('%m/%d/%Y'))) if nextgame == 'done': return bot.say("GDQ is {0} days away ({1} [estimated])".format(delta.days,textdate)) if game: if comment: bot.say("Current Game: {0} by {1} ETA: {2} Comment: {3} | Next Game: {4} by {5} | http://www.twitch.tv/gamesdonequick | https://gamesdonequick.com/schedule".format(game, runner, eta, comment, nextgame, nextrunner)) else: bot.say("Current Game: {0} by {1} ETA: {2} | Next Game: {3} by {4} | http://www.twitch.tv/gamesdonequick | https://gamesdonequick.com/schedule".format(game, runner, eta, nextgame, nextrunner)) else: bot.say("Current Game: setup?? | Next Game {0} by {1} | http://www.twitch.tv/gamesdonequick | https://gamesdonequick.com/schedule".format(nextgame, nextrunner))
def get_time(time, compare_time): if not time or not time.strip(): return datetime.utcnow() else: time = parse_time(time) if time: if time == compare_time: return None elif time > compare_time: return time if (time - compare_time > ONE_SECOND) else None else: return time if (compare_time - time > ONE_SECOND) else None else: return None
def iso_time_now(): return datetime.utcnow().strftime(ISO_TIME_FORMAT)
def extract_post_time_and_name(item): post_data = {} title_data_raw = item.find("h2") raw_time = title_data_raw.find("span").text raw_time = clean_raw_time(raw_time) time_delta = string_time_to_timedelta_dict(raw_time) post_data["created"] = (datetime.utcnow() - relativedelta(**time_delta)).isoformat() h = title_data_raw.find(href=True) post_data["name"], post_data["url"] = extract_href_name_and_url(h) return post_data
def export_watchlists(cb, args): exported_watchlists = [] if args.watchlists: watchlists_to_export = args.watchlists.split(",") else: watchlists_to_export = [] for watchlist in cb.select(Watchlist): if watchlists_to_export: if watchlist.name not in watchlists_to_export: continue if args.selective: if not confirm(watchlist.name): continue exported_watchlists.append( { "Name": watchlist.name, "URL": watchlist.search_query, "Type": watchlist.index_type, "SearchString": watchlist.query, "Description": "Please fill in if you intend to share this." } ) export = { "Author": args.author or "Fill in author", "ExportDate": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"), "ExportDescription": args.description or "Fill in description", "Watchlists": exported_watchlists, } json.dump(export, open(args.file, "w"), indent=4) print("-> Done exporting! <-")
def remove_expired_responses(self): """ Removes expired responses from storage """ if not self._cache_expire_after: return self.cache.remove_old_entries(datetime.utcnow() - self._cache_expire_after)
def eligible_replays(replays, *, age=None): """Filter replays down to just the replays we want to train with. Parameters ---------- replays : iterable[Replay] The replays to filter. age : datetime.timedelta, optional Only count replays less than this age old. Yields ------ replay : Replay The eligible replays in the directory. Notes ----- The same beatmap may appear more than once if there are multiple replays for this beatmap. """ for replay in replays: if age is not None and datetime.utcnow() - replay.timestamp > age: continue if not (replay.mode != GameMode.standard or replay.failed or replay.autoplay or replay.auto_pilot or replay.cinema or replay.relax or len(replay.beatmap.hit_objects) < 2): # ignore plays with mods that are not representative of user skill yield replay
def str2timestamp(cls, s): d = datetime.utcnow() d0 = datetime(1970, 1, 1) ts = (d - d0).days * 86400 assert len(s) in [8, 12], s ts += int(s[:2]) * 3600 ts += int(s[3:5]) * 60 ts += int(s[6:8]) if len(s) == 12: ts += int(s[9:12]) / 1000. return ts
def date_compare(snap1, snap2): utc = pytz.UTC now = datetime.utcnow().replace(tzinfo=utc) if snap1.get('SnapshotCreateTime', now) < snap2.get('SnapshotCreateTime', now): return -1 elif snap1.get('SnapshotCreateTime', now) == snap2.get('SnapshotCreateTime', now): return 0 return 1
def resolve_snapshot_time(self, resource): now = datetime.utcnow() return resource.get('SnapshotCreateTime', now)
def get_active(cls): query = (Pokemon .select() .where(Pokemon.disappear_time > datetime.utcnow()) .dicts()) pokemons = [] for p in query: p['pokemon_name'] = get_pokemon_name(p['pokemon_id']) pokemons.append(p) return pokemons
def marathon_timestamp(time=datetime.utcnow()): """ Make a Marathon/JodaTime-like timestamp string in ISO8601 format with milliseconds for the current time in UTC. """ return time.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
def IsMarathonEvent(event_type, **kwargs): """ Match a dict (deserialized from JSON) as a Marathon event. Matches the event type and checks for a recent timestamp. :param event_type: The event type ('eventType' field value) :param kwargs: Any other matchers to apply to the dict """ matching_dict = { 'eventType': Equals(event_type), 'timestamp': After(_parse_marathon_event_timestamp, matches_time_or_just_before(datetime.utcnow())) } matching_dict.update(kwargs) return MatchesDict(matching_dict)
def _start_lottery(self, ctx, loadout=None): """Starts a lottery. Use help on lottery start for more info This command defaults to the standard lottery loadout in slot 0. If you wish to use another loadout, specify it when calling the command. Additionally, you may change the default loadout using the [p]setlottery default command. Starting the lottery will apply all parameters set in the creation process. """ author = ctx.message.author settings = self.check_server_settings(author.server) if settings["Config"]["Active"]: return await self.bot.say("I cannot start a new lottery until the current one has " "ended.") if loadout is None: loadout = settings["Config"]["Default Loadout"] if not self.slot_checker(settings, loadout): return await self.bot.say("The load selected or the default loadout ({}) is empty! " "Please pick another loadout, edit the current, or set a new " "default loadout.".format(loadout)) start_params = self.lottery_setup(settings, loadout, author) load_pref = settings["Loadouts"][loadout] lottery_id = str(uuid.uuid4()) settings["Config"]["Lottery ID"] = lottery_id self.save_system() await self.bot.say(load_pref["Start Message"].format_map(Formatter(start_params))) if load_pref["Timer"] > 0: settings["Config"]["Tracker"] = datetime.utcnow().isoformat() self.save_system() await asyncio.sleep(load_pref["Timer"]) if settings["Config"]["Lottery ID"] == lottery_id: end_msg = self.lottery_teardown(settings, load_pref, author.server) await self.bot.say("The lottery is now ending...") await asyncio.sleep(5) await self.bot.say(end_msg)
def _status_lottery(self, ctx): """Check if a lottery is active""" author = ctx.message.author settings = self.check_server_settings(author.server) if settings["Config"]["Active"]: ld = settings["Config"]["Current Loadout"] timer = settings["Loadouts"][ld]["Timer"] if timer == 0: remaining = "no time limit" else: counter = settings["Config"]["Tracker"] seconds = timer - (datetime.utcnow() - parser.parse(counter)).seconds remaining = "{} remaining".format(self.time_formatter(seconds)) winners = settings["Loadouts"][ld]["Winners"] entry_limit = settings["Loadouts"][ld]["Limit"] dos = settings["Loadouts"][ld]["DOS"] role_req = settings["Loadouts"][ld]["Role"] prize = settings["Loadouts"][ld]["Prize"] footer = "There are currently {} users in the lottery.".format(len(settings["Players"])) if author.id in settings["Players"]: desc = "You are currently in the lottery." else: desc = "You have **not** entered into this lottery yet." embed = discord.Embed(title="Loadout {}".format(ld), description=desc, color=0x50bdfe) embed.set_author(name="Lottery System 3.0") embed.add_field(name="Prize", value=prize, inline=True) embed.add_field(name="Possible Winners", value=winners, inline=True) embed.add_field(name="Role", value=role_req, inline=True) embed.add_field(name="Limit", value=entry_limit, inline=True) embed.add_field(name="Time Remaining", value=remaining, inline=True) embed.add_field(name="Days on Server Required", value=dos, inline=True) embed.set_footer(text=footer) await self.bot.say(embed=embed) else: await self.bot.say("There aren't any lotteries running on this server right now.")
def patch_1694(path): """This patch aimed at converting the old cooldown times into unix time.""" for player in path["Players"]: try: for cooldown in path["Players"][player]["Cooldowns"]: s = path["Players"][player]["Cooldowns"][cooldown] convert = datetime.utcnow() - timedelta(seconds=s) path["Players"][player]["Cooldowns"][cooldown] = convert.isoformat() except TypeError: pass
def check_cooldowns(self, user, method, settings, triggered=False, brief=False): user_time = settings["Players"][user.id]["Cooldowns"][method] user_membership = settings["Players"][user.id]["Membership"] try: reduction = settings["Memberships"][user_membership]["Cooldown Reduction"] except KeyError: reduction = 0 # Find the base cooldown by method if method in c_games: base = settings["Games"][method]["Cooldown"] elif method == "Payday": reduction = 0 base = settings["System Config"]["Payday Timer"] else: reduction = 0 base = settings["System Config"]["Transfer Cooldown"] # Begin cooldown logic calculation if user_time == 0: # For new accounts if triggered: settings["Players"][user.id]["Cooldowns"][method] = datetime.utcnow().isoformat() super().save_system() return None elif int((datetime.utcnow() - parser.parse(user_time)).total_seconds()) + reduction < base: diff = int((datetime.utcnow() - parser.parse(user_time)).total_seconds()) seconds = base - diff - reduction if brief: remaining = self.time_format(seconds, True) msg = remaining else: remaining = self.time_format(seconds, False) msg = _("{} is still on a cooldown. You still have: {}").format(method, remaining) return msg else: if triggered: settings["Players"][user.id]["Cooldowns"][method] = datetime.utcnow().isoformat() super().save_system() return None
def _ensure_cfg_structure(self): self.cfg.setdefault('save_stamp', datetime.utcnow()) self.cfg.setdefault('codebase', dict(directory=None, vcs=None)) self.cfg.setdefault('matplotlib', dict(notebook={ 'lines.linewidth': 2.5, 'figure.figsize': (16, 6) })) self.cfg.setdefault('data', dict(frames={})) self.cfg.setdefault('default_data_directory', '.')
def save_cfg(self): self.cfg['save_stamp'] = datetime.utcnow() with open(self.cfg_yaml, 'w') as yaml_file: yaml.dump(self.cfg, yaml_file, default_flow_style=False)
def LOGDEBUG(message): if LOGGER: LOGGER.debug(message) elif DEBUG: print('%sZ [DBUG]: %s' % (datetime.utcnow().isoformat(), message))
def LOGINFO(message): if LOGGER: LOGGER.info(message) else: print('%sZ [INFO]: %s' % (datetime.utcnow().isoformat(), message))
def LOGERROR(message): if LOGGER: LOGGER.error(message) else: print('%sZ [ERR!]: %s' % (datetime.utcnow().isoformat(), message))