The following code examples, extracted from open-source Python projects, illustrate how to use datetime.timezone.utc.
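Before the project samples, here is a minimal sketch of the patterns they all build on (variable names invented for illustration). Note that timezone.utc is an attribute holding a fixed-offset tzinfo instance, not a callable; it is used either to create an aware datetime directly or to tag a naive one as UTC.

from datetime import datetime, timezone

now_utc = datetime.now(timezone.utc)        # timezone-aware "now" in UTC
naive = datetime(2017, 1, 18, 14, 40)       # no tzinfo attached
aware = naive.replace(tzinfo=timezone.utc)  # declare the naive value to be UTC
print(aware.isoformat())                    # 2017-01-18T14:40:00+00:00
print(int(aware.timestamp()))               # seconds since the Unix epoch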
def to_utc_timestamp(date_time):
    """Convert a naive or timezone-aware datetime to UTC timestamp.

    Arguments:
      date_time (:py:class:`datetime.datetime`): The datetime to convert.

    Returns:
      :py:class:`int`: The timestamp (in seconds).

    """
    if date_time is None:
        return
    if date_time.tzname() is None:
        timestamp = date_time.replace(tzinfo=timezone.utc).timestamp()
    else:
        timestamp = date_time.timestamp()
    return int(round(timestamp, 0))
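A quick sanity check of the example above (hypothetical usage; assumes to_utc_timestamp is in scope):

from datetime import datetime, timezone

naive = datetime(2020, 6, 1, 12, 0, 0)
aware = naive.replace(tzinfo=timezone.utc)
# A naive value is assumed to already be in UTC, so both calls agree:
assert to_utc_timestamp(naive) == to_utc_timestamp(aware) == 1591012800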
def occurred(at_):
    """Calculate when a service event occurred.

    Arguments:
      at_ (:py:class:`str`): When the event occurred.

    Returns:
      :py:class:`str`: The humanized occurrence time.

    """
    try:
        occurred_at = parse(at_)
    except (TypeError, ValueError):
        logger.warning('failed to parse occurrence time %r', at_)
        return 'time not available'
    utc_now = datetime.now(tz=timezone.utc)
    try:
        return naturaltime((utc_now - occurred_at).total_seconds())
    except TypeError:  # at_ is a naive datetime
        return naturaltime((datetime.now() - occurred_at).total_seconds())
def calculate_timeout(http_date):
    """Extract request timeout from e.g. ``Retry-After`` header.

    Note:
      Per :rfc:`2616#section-14.37`, the ``Retry-After`` header can
      be either an integer number of seconds or an HTTP date. This
      function can handle either.

    Arguments:
      http_date (:py:class:`str`): The date to parse.

    Returns:
      :py:class:`int`: The timeout, in seconds.

    """
    try:
        return int(http_date)
    except ValueError:
        date_after = parse(http_date)
        utc_now = datetime.now(tz=timezone.utc)
        return int((date_after - utc_now).total_seconds())
def reset(self, variantId, buildId, resultHash):
    self.__variantId = variantId
    self.__buildId = buildId
    self.__resultHash = resultHash
    self.__recipes = None
    self.__defines = {}
    u = os.uname()
    self.__build = {
        'sysname'  : u.sysname,
        'nodename' : u.nodename,
        'release'  : u.release,
        'version'  : u.version,
        'machine'  : u.machine,
        'date'     : datetime.now(timezone.utc).isoformat(),
    }
    self.__env = ""
    self.__metaEnv = {}
    self.__scms = []
    self.__deps = []
    self.__tools = {}
    self.__sandbox = None
    self.__id = None
def test_throttling_compute_fine(SETTINGS):
    SETTINGS.RATE_LIMIT_THRESHOLD = 0
    from jenkins_epo.procedures import compute_throttling
    # Consumed 1/5 calls at 2/3 of the time.
    now = datetime(2017, 1, 18, 14, 40, tzinfo=timezone.utc)
    reset = datetime(2017, 1, 18, 15, tzinfo=timezone.utc)
    remaining = 4000
    seconds = compute_throttling(
        now=now,
        rate_limit=dict(rate=dict(
            limit=5000, remaining=remaining,
            reset=reset.timestamp(),
        )),
    )
    assert 0 == seconds  # Fine!
def convert_time(time):
    # 2014-11-05T09:00:00Z
    # yyyy-mm-ddThh:mm:ssZ
    # 2013-06-12T16:00:00+09:00
    if time is None:
        return None
    if len(time) < 10:
        return None
    year = int(time[:4])
    month = int(time[5:7])
    date = int(time[8:10])
    hour = int(time[11:13])
    minute = int(time[14:16])
    second = int(time[17:19])
    if len(time) > 21:
        tz = timezone(timedelta(hours=9))
    else:
        tz = timezone.utc
    dt = datetime.datetime(year, month, date, hour, minute, second, 0, tz)
    return dt
def event_remaining(dt_start, dt_end):
    now = datetime.datetime.now(timezone.utc)
    diff_end = dt_end - now
    diff_start = now - dt_start
    if diff_start.total_seconds() < 0:
        return "Event has not started yet!"
    elif diff_end.total_seconds() < 0:
        return "Event has ended!"
    else:
        seconds = diff_end.seconds
        hours = seconds // 3600
        seconds -= hours * 3600
        minutes = seconds // 60
        seconds -= minutes * 60
        return "Event ends in {} days, {} hours, {} minutes and {} seconds.".format(
            diff_end.days, hours, minutes, seconds)

###### Sunshine ######
def header(self):
    dDc = self.configs.Dc_cell
    Dc_min, Dc_max = self.configs.Dc_limit
    header = fits.Header()
    header["BUNIT"] = (self.configs.unit, "Data unit")
    header["zmin"] = (self.configs.zmin, "HI simulation minimum redshift")
    header["zmax"] = (self.configs.zmax, "HI simulation maximum redshift")
    header["dz"] = (self.configs.dz, "HI simulation redshift step size")
    header["Dc_min"] = (Dc_min, "[cMpc] comoving distance at zmin")
    header["Dc_max"] = (Dc_max, "[cMpc] comoving distance at zmax")
    header["Dc_step"] = (dDc, "[cMpc] comoving distance between slices")
    header["Lside"] = (self.configs.Lside, "[cMpc] Simulation side length")
    header["Nside"] = (self.configs.Nside, "Number of cells at each side")
    header["DATE"] = (datetime.now(timezone.utc).astimezone().isoformat(),
                      "File creation date")
    header.add_history(" ".join(sys.argv))
    header.extend(self.wcs.to_header(), update=True)
    return header
def write_slice(self, outfile, data, z, clobber=False):
    freq = z2freq(z)
    Dc = cosmo.comoving_distance(z).value  # [Mpc]
    header = fits.Header()
    header["BUNIT"] = (self.header["BUNIT"], self.header.comments["BUNIT"])
    header["Lside"] = (self.header["Lside"], self.header.comments["Lside"])
    header["Nside"] = (self.header["Nside"], self.header.comments["Nside"])
    header["REDSHIFT"] = (z, "redshift of this slice")
    header["FREQ"] = (freq, "[MHz] observed HI signal frequency")
    header["Dc"] = (Dc, "[cMpc] comoving distance")
    header["DATE"] = (datetime.now(timezone.utc).astimezone().isoformat(),
                      "File creation date")
    header.add_history(" ".join(sys.argv))
    hdu = fits.PrimaryHDU(data=data, header=header)
    try:
        hdu.writeto(outfile, overwrite=clobber)
    except TypeError:
        hdu.writeto(outfile, clobber=clobber)
    logger.info("Wrote slice to file: %s" % outfile)
def header(self):
    dDc = self.Dc_cell
    header = fits.Header()
    header["BUNIT"] = (str(self.unit), "Data unit")
    header["zmin"] = (self.zmin, "HI simulation minimum redshift")
    header["zmax"] = (self.zmax, "HI simulation maximum redshift")
    header["Dc_min"] = (self.Dc_min, "[cMpc] comoving distance at zmin")
    header["Dc_max"] = (self.Dc_max, "[cMpc] comoving distance at zmax")
    header["Dc_step"] = (dDc, "[cMpc] comoving distance between slices")
    header["Lside"] = (self.Lside, "[cMpc] Simulation side length")
    header["Nside"] = (self.Nside, "Number of cells at each side")
    header["DATE"] = (datetime.now(timezone.utc).astimezone().isoformat(),
                      "File creation date")
    header.add_history(" ".join(sys.argv))
    header.extend(self.wcs.to_header(), update=True)
    return header
def create_game():
    """Create a new game."""
    form = GameCreateForm(request.form)
    _set_game_create_choices(form)
    if form.validate_on_submit():
        white = Player.get_by_id(form.white_id.data)
        black = Player.get_by_id(form.black_id.data)
        played_at = None
        if form.played_at.data is not None:
            played_at = form.played_at.data.astimezone(timezone.utc)
        game = Game.create(
            white=white,
            black=black,
            winner=form.winner.data,
            handicap=form.handicap.data,
            komi=form.komi.data,
            season=form.season.data,
            episode=form.episode.data,
            played_at=played_at
        )
        messenger.notify_slack(_slack_game_msg(game))
        return jsonify(game.to_dict()), 201
    else:
        return jsonify(**form.errors), 404
def _slack_game_msg(game):
    if game.winner is Color.white:
        msg = '<{w_url}|{w_name}> (W) defeated <{b_url}|{b_name}> (B)'
    else:
        msg = '<{b_url}|{b_name}> (B) defeated <{w_url}|{w_name}> (W)'
    result = (msg + ' at {handicap} stones, {komi}.5 komi at <!date^{date_val}'
              '^{{time}} on {{date_num}}|{date_string}> '
              '(S{season:0>2}E{episode:0>2})')
    # Gross hack around the fact that we retrieve as naive DateTimes.
    # See: https://github.com/massgo/league/issues/93
    utc_time = int(game.played_at.replace(tzinfo=timezone.utc).timestamp())
    return result.format(w_name=game.white.full_name,
                         w_url=url_for('dashboard.get_player',
                                       player_id=game.white.id,
                                       _external=True),
                         b_name=game.black.full_name,
                         b_url=url_for('dashboard.get_player',
                                       player_id=game.black.id,
                                       _external=True),
                         handicap=game.handicap,
                         komi=game.komi,
                         date_string=game.played_at,
                         date_val=utc_time,
                         season=game.season,
                         episode=game.episode)
def update_game():
    """Update an existing game."""
    form = GameUpdateForm(request.form)
    _set_game_create_choices(form)
    if form.validate_on_submit():
        white = Player.get_by_id(form.white_id.data)
        black = Player.get_by_id(form.black_id.data)
        played_at = None
        if form.played_at.data is not None:
            played_at = form.played_at.data.astimezone(timezone.utc)
        game = Game.get_by_id(form.game_id.data)
        game.update(
            white=white,
            black=black,
            winner=form.winner.data,
            handicap=form.handicap.data,
            komi=form.komi.data,
            season=form.season.data,
            episode=form.episode.data,
            played_at=played_at
        )
        return jsonify(game.to_dict()), 200
    else:
        return jsonify(**form.errors), 404
def _step(self, exc=None):
    """
    Wrapper around `Task._step()` to automatically dispatch a
    `TaskExecState.BEGIN` event.
    """
    if not self._in_progress:
        self._start = datetime.now(timezone.utc)
        source = {'task_exec_id': self.uid}
        if self._template:
            source['task_template_id'] = self._template.uid
        if self._workflow:
            source['workflow_template_id'] = self._workflow.template.uid
            source['workflow_exec_id'] = self._workflow.uid
        self._source = EventSource(**source)
        self._in_progress = True
        data = {
            'type': TaskExecState.BEGIN.value,
            'content': self._inputs
        }
        self._broker.dispatch(
            data,
            topics=workflow_exec_topics(self._source._workflow_exec_id),
            source=self._source,
        )
    super()._step(exc)
def get_next_event(self):
    """Access to the next Event in the calendar.

    Returns:
        The Event object corresponding to the next event in the
        calendar or None if there is no event.
    """
    now = datetime.now(timezone.utc)
    while self.event_list and self.event_list[0].end < now:
        self.event_list.pop(0)
    if len(self.event_list) == 0:
        return None
    elif self.event_list[0].start > now:
        return self.event_list[0]
    elif len(self.event_list) == 1:
        return None
    else:
        return self.event_list[1]
def get_now_event(self):
    """Access to the current Event in the calendar.

    Returns:
        The Event object corresponding to the current event in the
        calendar or None if there is no event.
    """
    now = datetime.now(timezone.utc)
    while self.event_list and self.event_list[0].end < now:
        self.event_list.pop(0)
    if len(self.event_list) == 0:
        return None
    elif self.event_list[0].start < now < self.event_list[0].end:
        return self.event_list[0]
    else:
        return None
def make_new_entry(self, rel_path, id_handler):
    """
    Generates a new entry for the specified path.

    Note: This will mutate the id_handler!
    """
    # Try to match to an existing book.
    e_id = id_handler.new_id()
    abs_path = os.path.join(read_from_config('media_loc').path, rel_path)
    lmtime = os.path.getmtime(abs_path)
    added_dt = datetime.utcfromtimestamp(lmtime)
    last_modified = added_dt.replace(tzinfo=timezone.utc)
    entry_obj = oh.Entry(id=e_id, path=rel_path,
                         date_added=datetime.now(timezone.utc),
                         last_modified=last_modified,
                         type='Book',
                         table=self.BOOK_TABLE_NAME,
                         data_id=None,
                         hashseed=_rand.randint(0, 2**32))
    return entry_obj
def map_external_gallery_data_to_internal(gallery_data: DataDict) -> GalleryData:
    internal_gallery_data = GalleryData(
        gallery_data['gid'],
        token=gallery_data['token'],
        archiver_key=gallery_data['archiver_key'],
        title=unescape(gallery_data['title']),
        title_jpn=unescape(gallery_data['title_jpn']),
        thumbnail_url=gallery_data['thumb'],
        category=gallery_data['category'],
        provider=constants.provider_name,
        uploader=gallery_data['uploader'],
        posted=datetime.fromtimestamp(int(gallery_data['posted']), timezone.utc),
        filecount=gallery_data['filecount'],
        filesize=gallery_data['filesize'],
        expunged=gallery_data['expunged'],
        rating=gallery_data['rating'],
        tags=translate_tag_list(gallery_data['tags']),
    )
    m = re.search(constants.default_fjord_tags, ",".join(internal_gallery_data.tags))
    if m:
        internal_gallery_data.fjord = True
    if constants.ex_thumb_url in internal_gallery_data.thumbnail_url:
        internal_gallery_data.thumbnail_url = internal_gallery_data.thumbnail_url.replace(
            constants.ex_thumb_url, constants.ge_thumb_url)
    return internal_gallery_data
def t_date(s):
    """
    TaskWarrior provides times as UTC timestamps in ISO 8601
    """
    year = int(s[0:4])
    month = int(s[4:6])
    day = int(s[6:8])
    hour = int(s[9:11])
    minute = int(s[11:13])
    second = int(s[13:15])
    # This is UTC time
    ts = datetime(year, month, day, hour, minute, second)
    # Convert to local time
    local_time = ts.replace(tzinfo=timezone.utc).astimezone(tz=None)
    # Convert to ISO display format, and remove timezone offset
    iso_format = local_time.isoformat(sep=" ")[:-6]
    return iso_format

# TODO: move to separate module
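The core move in t_date() is worth isolating: tag a parsed value as UTC, then render it in the machine's local zone. A minimal sketch (values invented; the printed offset depends on where it runs):

from datetime import datetime, timezone

utc_value = datetime(2020, 6, 1, 12, 0, 0, tzinfo=timezone.utc)
local_value = utc_value.astimezone(tz=None)  # tz=None converts to the local timezone
print(local_value.isoformat(sep=" "))        # e.g. "2020-06-01 14:00:00+02:00"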
def convert_between_tz_and_utc(self, tz, utc):
    dston = self.dston.replace(tzinfo=tz)
    # Because 1:MM on the day DST ends is taken as being standard time,
    # there is no spelling in tz for the last hour of daylight time.
    # For purposes of the test, the last hour of DST is 0:MM, which is
    # taken as being daylight time (and 1:MM is taken as being standard
    # time).
    dstoff = self.dstoff.replace(tzinfo=tz)
    for delta in (timedelta(weeks=13),
                  DAY,
                  HOUR,
                  timedelta(minutes=1),
                  timedelta(microseconds=1)):
        self.checkinside(dston, tz, utc, dston, dstoff)
        for during in dston + delta, dstoff - delta:
            self.checkinside(during, tz, utc, dston, dstoff)
        self.checkoutside(dstoff, tz, utc)
        for outside in dston - delta, dstoff + delta:
            self.checkoutside(outside, tz, utc)
def test_easy(self):
    # Despite the name of this test, the endcases are excruciating.
    self.convert_between_tz_and_utc(Eastern, utc_real)
    self.convert_between_tz_and_utc(Pacific, utc_real)
    self.convert_between_tz_and_utc(Eastern, utc_fake)
    self.convert_between_tz_and_utc(Pacific, utc_fake)
    # The next is really dancing near the edge. It works because
    # Pacific and Eastern are far enough apart that their "problem
    # hours" don't overlap.
    self.convert_between_tz_and_utc(Eastern, Pacific)
    self.convert_between_tz_and_utc(Pacific, Eastern)
    # OTOH, these fail! Don't enable them. The difficulty is that
    # the edge case tests assume that every hour is representable in
    # the "utc" class. This is always true for a fixed-offset tzinfo
    # class (like utc_real and utc_fake), but not for Eastern or Central.
    # For these adjacent DST-aware time zones, the range of time offsets
    # tested ends up creating hours in the one that aren't representable
    # in the other. For the same reason, we would see failures in the
    # Eastern vs Pacific tests too if we added 3*HOUR to the list of
    # offset deltas in convert_between_tz_and_utc().
    #
    # self.convert_between_tz_and_utc(Eastern, Central)  # can't work
    # self.convert_between_tz_and_utc(Central, Eastern)  # can't work
def parse_match(self, response):
    item = MatchInfo()
    item['id'] = parse_qs(
        response.xpath('//div[@class="clearfix subnav level-1"]//li//a/@href').extract()[3]
    )['id'][0]
    item['area'] = response.xpath('//div[@class="clearfix subnav level-1"]//li//a/text()').extract()[1]
    item['competition'] = response.xpath('//div[@class="clearfix subnav level-1"]//li//a/text()').extract()[2]
    item['home_team'] = response.xpath('//div[@class="container left"]//a/text()').extract_first()
    item['away_team'] = response.xpath('//div[@class="container right"]//a/text()').extract_first()
    item['ht_last5'] = ''.join(response.xpath('//div[@class="container left"]//a/text()').extract()[1:6])
    item['at_last5'] = ''.join(response.xpath('//div[@class="container right"]//a/text()').extract()[1:6])
    item['datetime'] = datetime.fromtimestamp(
        int(response.xpath('//div[@class="details clearfix"]/dl/dt[.="Date"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Date"]//span/@data-value').extract_first()),
        timezone.utc
    ).isoformat(' ')
    #item['competition'] = response.xpath('//div[@class="details clearfix"]/dl/dt[.="Competition"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Competition"]/a/text()').extract_first()
    item['game_week'] = response.xpath('//div[@class="details clearfix"]/dl/dt[.="Game week"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Game week"]/text()').extract_first()
    item['kick_off'] = response.xpath('//div[@class="details clearfix"]/dl/dt[.="Kick-off"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Kick-off"]//span/text()').extract_first()
    item['venue'] = response.xpath('//div[@class="details clearfix"]/dl/dt[.="Venue"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Venue"]//a/text()').extract_first()
    item['updated'] = datetime.utcnow().isoformat(' ')
    yield item
    #self.log('URL: {}'.format(response.url))
def _decode(self, o):
    if isinstance(o, dict):
        if len(o) == 1:
            if "$escape" in o:
                return self._decode_escaped(o['$escape'])
            if "$date" in o:
                return datetime.fromtimestamp(o["$date"] / 1000.0, timezone.utc)
            if "$binary" in o:
                return b64decode(o['$binary'])
        if len(o) == 2 and "$type" in o and "$value" in o:
            try:
                reviver = self.custom_type_hooks[o['$type']]
            except KeyError:
                raise UnknownTypeError(o["$type"])
            return reviver(o["$value"])
        if self.object_pairs_hook is not None:
            return self.object_pairs_hook(
                (k, self._decode(v)) for k, v in o.items())
        return {k: self._decode(v) for k, v in o.items()}
    if isinstance(o, (list, tuple)):
        return [self._decode(v) for v in o]
    return o
def benchmark():
    ds = aw_datastore.Datastore(aw_datastore.storages.PeeweeStorage, testing=True)
    api = aw_server.api.ServerAPI(ds, testing=True)
    print(api.get_info())

    bucket_id = "test-benchmark"
    try:
        api.create_bucket(bucket_id, "test", "test", "test")
    except Exception as e:
        print(e)

    print("Benchmarking... this will take 30 seconds")
    for i in range(120):
        sleep(0.1)
        api.heartbeat(bucket_id,
                      Event(timestamp=datetime.now(tz=tz.utc),
                            data={"test": str(int(i))}),
                      pulsetime=0.3)
def _create_heartbeat_events(start=datetime.now(tz=timezone.utc),
                             delta=timedelta(seconds=1)):
    e1_ts = start
    e2_ts = e1_ts + delta

    # Needed since server (or underlying datastore) drops precision up to milliseconds.
    # Update: Even with millisecond precision it sometimes fails. (tried using `round` and `int`)
    #         Now rounding down to 10ms precision to prevent random failure.
    #         10ms precision at least seems to work well.
    # TODO: Figure out why it sometimes fails with millisecond precision. Would probably
    #       be useful to find the microsecond values where it consistently always fails.
    e1_ts = e1_ts.replace(microsecond=int(e1_ts.microsecond / 10000) * 100)
    e2_ts = e2_ts.replace(microsecond=int(e2_ts.microsecond / 10000) * 100)

    e1 = Event(timestamp=e1_ts, data={"label": "test"})
    e2 = Event(timestamp=e2_ts, data={"label": "test"})
    return e1, e2
def test_midnight_heartbeats(client, bucket):
    now = datetime.now(tz=timezone.utc)
    midnight = now.replace(hour=23, minute=50)
    events = _create_periodic_events(20, start=midnight, delta=timedelta(minutes=1))

    label_ring = ["1", "1", "2", "3", "4"]
    for i, e in enumerate(events):
        e.data["label"] = label_ring[i % len(label_ring)]
        client.heartbeat(bucket, e, pulsetime=90)

    recv_events_merged = client.get_events(bucket, limit=-1)
    assert len(recv_events_merged) == 4 / 5 * len(events)

    recv_events_after_midnight = client.get_events(
        bucket, start=midnight + timedelta(minutes=10))
    pprint(recv_events_after_midnight)
    assert len(recv_events_after_midnight) == int(len(recv_events_merged) / 2)
def test_astimezone(self):
    d = Pendulum(2015, 1, 15, 18, 15, 34)
    now = Pendulum(2015, 1, 15, 18, 15, 34)
    self.assertEqual('UTC', d.timezone_name)
    self.assertPendulum(d, now.year, now.month, now.day, now.hour, now.minute)

    d = d.astimezone('Europe/Paris')
    self.assertEqual('Europe/Paris', d.timezone_name)
    self.assertPendulum(d, now.year, now.month, now.day, now.hour + 1, now.minute)

    if sys.version_info >= (3, 2):
        d = d.astimezone(timezone.utc)
        self.assertEqual('+00:00', d.timezone_name)
        self.assertPendulum(d, now.year, now.month, now.day, now.hour, now.minute)

        d = d.astimezone(timezone(timedelta(hours=-8)))
        self.assertEqual('-08:00', d.timezone_name)
        self.assertPendulum(d, now.year, now.month, now.day, now.hour - 8, now.minute)
def parse_tweets(raw_tweets, source, now=None):
    """
    Parses a list of raw tweet lines from a twtxt file
    and returns a list of :class:`Tweet` objects.

    :param list raw_tweets: list of raw tweet lines
    :param Source source: the source of the given tweets
    :param Datetime now: the current datetime

    :returns: a list of parsed tweets :class:`Tweet` objects
    :rtype: list
    """
    if now is None:
        now = datetime.now(timezone.utc)

    tweets = []
    for line in raw_tweets:
        try:
            tweet = parse_tweet(line, source, now)
        except (ValueError, OverflowError) as e:
            logger.debug("{0} - {1}".format(source.url, e))
        else:
            tweets.append(tweet)

    return tweets
def parse_tweet(raw_tweet, source, now=None):
    """
    Parses a single raw tweet line from a twtxt file
    and returns a :class:`Tweet` object.

    :param str raw_tweet: a single raw tweet line
    :param Source source: the source of the given tweet
    :param Datetime now: the current datetime

    :returns: the parsed tweet
    :rtype: Tweet
    """
    if now is None:
        now = datetime.now(timezone.utc)

    raw_created_at, text = raw_tweet.split("\t", 1)
    created_at = parse_iso8601(raw_created_at)

    if created_at > now:
        raise ValueError("Tweet is from the future")

    return Tweet(click.unstyle(text.strip()), created_at, source)
def set_time_range(self, time_range):
    # allow a single time to be passed in place of a range
    if type(time_range) not in [tuple, list]:
        time_range = (time_range, time_range)

    # translate the times to unix timestamps
    def parse_time(time):
        if type(time) in [int, float, str]:
            time = int(time)
            # realistic timestamp range
            if 10**8 < time < 10**13:
                return time
            # otherwise archive.org timestamp format (possibly truncated)
            time_string = str(time)[::-1].zfill(14)[::-1]
            time = datetime.strptime(time_string, self.timestamp_format)
        time = time.replace(tzinfo=timezone.utc)
        return time.timestamp()

    self.time_range = [parse_time(time) for time in time_range]
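The reversed zfill inside parse_time right-pads a truncated archive.org timestamp with zeros so a full 14-digit pattern can parse it. A standalone check, assuming the instance's timestamp_format is the usual 14-digit '%Y%m%d%H%M%S' (input value invented):

from datetime import datetime

s = "20160101"                    # truncated archive.org timestamp (date only)
padded = s[::-1].zfill(14)[::-1]  # right-pads with zeros: "20160101000000"
print(datetime.strptime(padded, "%Y%m%d%H%M%S"))  # 2016-01-01 00:00:00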
def create_service_principal(options, sub_config):
    credentials = ServicePrincipalCredentials(
        tenant=options['tenant_id'],
        client_id=options['script_service_principal_client_id'],
        secret=options['script_service_principal_secret'],
        resource='https://graph.windows.net'
    )
    rbac_client = GraphRbacManagementClient(
        credentials, tenant_id=options['tenant_id'])

    # Create Service Principal
    current_time = datetime.now(timezone.utc)
    key = {
        'start_date': current_time.isoformat(),
        'end_date': current_time.replace(year=current_time.year + 3).isoformat(),
        'key_id': str(uuid.uuid4()),
        'value': str(uuid.uuid4())
    }
    sub_config['secret_key'] = key['value']
    sub_config['service_principal'] = rbac_client.service_principals.create({
        'app_id': sub_config['application'].app_id,
        'account_enabled': True,
        'password_credentials': [key]
    })
    return sub_config
def test_model_with_history_creates_changes_on_creation(self):
    model_data = {
        'name': 'Daffy Duck',
    }
    response = self.client.post('/animal/', data=json.dumps(model_data),
                                content_type='application/json')
    self.assertEqual(response.status_code, 200)

    self.assertEqual(1, Changeset.objects.count())
    cs = Changeset.objects.get()
    self.assertEqual('testuser', cs.user.username)
    self.assertAlmostEqual(datetime.now(tz=timezone.utc), cs.date,
                           delta=timedelta(seconds=1))

    self.assertEqual(5, Change.objects.count())
    self.assertEqual(1, Change.objects.filter(changeset=cs, model='Animal',
                                              field='name', before='null',
                                              after='"Daffy Duck"').count())
    self.assertEqual(1, Change.objects.filter(changeset=cs, model='Animal',
                                              field='id', before='null',
                                              after=Animal.objects.get().id).count())
    self.assertEqual(1, Change.objects.filter(changeset=cs, model='Animal',
                                              field='caretaker', before='null',
                                              after='null').count())
    self.assertEqual(1, Change.objects.filter(changeset=cs, model='Animal',
                                              field='zoo', before='null',
                                              after='null').count())
    self.assertEqual(1, Change.objects.filter(changeset=cs, model='Animal',
                                              field='deleted', before='null',
                                              after='false').count())
def extract_event_info(ctxt, event, mode='good'):
    global gbl_channel
    my_list = []
    create_field(my_list, "namespaces", event.metadata.namespace)
    create_field(my_list, "name", event.metadata.name)
    create_field(my_list, "labels", event.metadata.labels)
    create_field(my_list, "status.message", str(event.status.container_statuses[0].state))
    create_field(my_list, "status.start_time", event.status.start_time)
    create_field(my_list, "status.phase", event.status.phase)
    c = datetime.now(timezone.utc) - event.status.start_time
    create_field(my_list, "Running since", str(c))
    text = [
        {
            "color": mode,
            "pretext": "Just some information from k8s" + ctxt,
            "fields": my_list
        }
    ]
    send_slack_msg(gbl_channel, ":k8s:", text)
def get_volume_from_history(history, candle_size):
    """
    Returns volume for given candle_size

    :param history: history data
    :param candle_size: in minutes
    :return: Calculated volume for given candle_size
    """
    volume = 0.0
    epoch_now = int(time.time())
    epoch_candle_start = epoch_now - candle_size * 60
    pattern = '%Y-%m-%dT%H:%M:%S'
    for item in history:
        time_string = item['TimeStamp'].split('.', 1)[0]
        dt = datetime.datetime.strptime(time_string, pattern)
        item_epoch = dt.replace(tzinfo=timezone.utc).timestamp()
        if item_epoch >= epoch_candle_start:
            quantity = item['Quantity']
            volume += quantity
    return volume
def tinkers_construct_file(tinkers_construct) -> addon.File:
    """Tinkers construct file."""
    return addon.File(
        id=2338518,
        mod=tinkers_construct,
        name='TConstruct-1.10.2-2.5.6b.jar',
        date=datetime(
            year=2016, month=10, day=22,
            hour=15, minute=11, second=19,
            tzinfo=timezone.utc,
        ),
        release=proxy.Release.Release,
        url='https://addons.cursecdn.com/files/2338/518/TConstruct-1.10.2-2.5.6b.jar',
        dependencies=[74924],
    )
def tinkers_update(tinkers_construct) -> addon.File:
    """Update for tinkers_construct_file."""
    return addon.File(
        id=2353329,
        mod=tinkers_construct,
        name='TConstruct-1.10.2-2.6.1.jar',
        date=datetime(
            year=2016, month=12, day=7,
            hour=18, minute=35, second=45,
            tzinfo=timezone.utc,
        ),
        release=proxy.Release.Release,
        url='https://addons.cursecdn.com/files/2353/329/TConstruct-1.10.2-2.6.1.jar',
        dependencies=[74924],
    )
def mantle_file(mantle) -> addon.File:
    """Mantle (Tinkers dependency) file."""
    return addon.File(
        id=2366244,
        mod=mantle,
        name='Mantle-1.10.2-1.1.4.jar',
        date=datetime(
            year=2017, month=1, day=9,
            hour=19, minute=40, second=41,
            tzinfo=timezone.utc,
        ),
        release=proxy.Release.Release,
        url='https://addons.cursecdn.com/files/2366/244/Mantle-1.10.2-1.1.4.jar',
        dependencies=[],
    )
def test_file_init():
    """Does the File initialization behave as expected?"""
    m = addon.Mod(id=42, name=str(), summary=str())

    addon.File(
        id=42,
        mod=m,
        name='test.jar',
        date=datetime.now(tz=timezone.utc),
        release=addon.Release.Release,
        url='https://httpbin.org',
    )
    addon.File(
        id=43,
        mod=m,
        name='test.jar',
        date=datetime.now(tz=timezone.utc),
        release=addon.Release.Alpha,
        url='https://httpbin.org',
    )

    with pytest.raises(TypeError):
        addon.File(
            id='43',
            mod=m,
            name='test.jar',
            date=datetime.now(tz=timezone.utc),
            release=addon.Release.Beta,
            url=None,
        )

    assert len(responses.calls) == 0
def test_calculate_timeout_http_date():
    three_minutes_later = datetime.now(tz=timezone.utc) + timedelta(minutes=3)
    http_date = '%a, %d %b %Y %H:%M:%S %Z'
    assert 179 <= Service.calculate_timeout(
        three_minutes_later.strftime(http_date),
    ) <= 181
def getinfo(run, now):
    schedule = run.find_all('tr', attrs={'class': None})
    game, runner, console, comment, eta = '', '', '', '', ''
    nextgame, nextrunner, nextconsole, nexteta, nextcomment = '', '', '', '', ''
    for item in schedule:
        group = item.find_all('td')
        try:
            group2 = item.find_next_sibling().find_all('td')
        except:
            nextgame = False
            return (game, runner, console, comment, eta,
                    nextgame, nextrunner, nexteta, nextconsole, nextcomment)
        st = group[0].getText()
        #estfix = timedelta(hours=-5)
        starttime = datetime.strptime(st, '%Y-%m-%dT%H:%M:%SZ')
        starttime = starttime.replace(tzinfo=timezone.utc)
        #starttime = starttime + estfix
        try:
            offset = datetime.strptime(group2[0].getText().strip(), "%H:%M:%S")
            endtime = starttime + timedelta(hours=offset.hour,
                                            minutes=offset.minute,
                                            seconds=offset.second)
        except:
            # Fallback must be timezone-aware, or comparing it with the
            # aware `now` below raises TypeError.
            endtime = datetime(2011, 1, 1, 12, 0, tzinfo=timezone.utc)
        if starttime < now and endtime > now:
            game = group[1].getText()
            runner = group[2].getText()
            #console = group[3].getText()
            comment = group2[1].getText()
            eta = group2[0].getText().strip()
        if starttime > now:
            nextgame = group[1].getText()
            nextrunner = group[2].getText()
            #nextconsole = group[3].getText()
            nexteta = group2[0].getText().strip()
            nextcomment = group2[1].getText()
            break
    else:
        nextgame = 'done'
        nextrunner = 'done'
    return (game, runner, console, comment, eta,
            nextgame, nextrunner, nexteta, nextconsole, nextcomment)
def gdq(bot, trigger):
    now = datetime.utcnow()
    now = now.replace(tzinfo=timezone.utc)
    delta = datetime(2018, 1, 7, 16, 30, tzinfo=timezone.utc) - now
    textdate = "January 7"
    url = 'https://gamesdonequick.com/schedule'
    try:
        x = requests.get(url).content
    except:
        return bot.say("GDQ is {0} days away ({1})".format(delta.days, textdate))
    bs = BeautifulSoup(x)
    try:
        run = bs.find("table", {"id": "runTable"}).tbody
    except:
        return bot.say("GDQ is {0} days away ({1})".format(delta.days, textdate))
    try:
        gdqstart = datetime.strptime(run.td.getText(), '%Y-%m-%dT%H:%M:%SZ')
        gdqstart = gdqstart.replace(tzinfo=timezone.utc)
    except:
        return bot.say("GDQ is {0} days away ({1})".format(delta.days, textdate))
    (game, runner, console, comment, eta,
     nextgame, nextrunner, nexteta, nextconsole, nextcomment) = getinfo(run, now)
    if not nextgame:
        return bot.say("GDQ is {0} days away ({1})".format(delta.days, textdate))
    if now < gdqstart:
        tts = gdqstart - now
        if tts.days <= 3:
            return bot.say("GDQ is {0}H{1}M away. First game: {2} by {3} ETA: {4} Comment: {5} | https://gamesdonequick.com/schedule".format(
                int(tts.total_seconds() // 3600),
                int((tts.total_seconds() % 3600) // 60),
                nextgame, nextrunner, nexteta, nextcomment))
        else:
            return bot.say("GDQ is {0} days away ({1}) | https://gamesdonequick.com/schedule".format(
                tts.days, gdqstart.strftime('%m/%d/%Y')))
    if nextgame == 'done':
        return bot.say("GDQ is {0} days away ({1} [estimated])".format(delta.days, textdate))
    if game:
        if comment:
            bot.say("Current Game: {0} by {1} ETA: {2} Comment: {3} | Next Game: {4} by {5} | http://www.twitch.tv/gamesdonequick | https://gamesdonequick.com/schedule".format(
                game, runner, eta, comment, nextgame, nextrunner))
        else:
            bot.say("Current Game: {0} by {1} ETA: {2} | Next Game: {3} by {4} | http://www.twitch.tv/gamesdonequick | https://gamesdonequick.com/schedule".format(
                game, runner, eta, nextgame, nextrunner))
    else:
        bot.say("Current Game: setup?? | Next Game {0} by {1} | http://www.twitch.tv/gamesdonequick | https://gamesdonequick.com/schedule".format(
            nextgame, nextrunner))
def get_processed_at_dict():
    return {"processed_at": datetime.now(timezone.utc).isoformat()}
def read(self):
    out = {k: None for k in self.FIELDS + self.EXTRA_FIELD}
    start = datetime.now()
    self.ser.reset_input_buffer()

    while not all(out.values()):
        line = self.readline()
        if (datetime.now() - start).total_seconds() > self.timeout:
            break
        line = re.sub(r'[\x00-\x1F]|\r|\n|\t|\$', "", line)
        cmd = line.split(',')[0]
        if cmd not in ['GNGGA', 'GNRMC']:
            continue
        try:
            msg = pynmea2.parse(line)
            for key in out:
                if hasattr(msg, key):
                    out[key] = getattr(msg, key)
        except pynmea2.ParseError as e:
            print("Parse error:", e)

    if out['datestamp'] is not None and out['timestamp'] is not None:
        timestamp = datetime.combine(
            out['datestamp'], out['timestamp']).replace(tzinfo=timezone.utc)
        out['timestamp'] = timestamp.isoformat()
    else:
        del out['timestamp']

    if out[self.FIELDS[-1]] is not None:
        out[self.FIELDS[-1]] *= self.KNOTS_PER_KMPH

    if out.get('latitude') is not None and out.get('longitude') is not None:
        if out['latitude'] != 0.0 and out['longitude'] != 0.0:
            out['pos'] = {
                'type': 'Point',
                'coordinates': [out['longitude'], out['latitude']]
            }
    del out['latitude']
    del out['longitude']

    for f in self.EXTRA_FIELD:
        if f in out:
            del out[f]

    return out
def time_date(msg):
    if msg == b'\xff\xff\xff\xff\xff\xff\xff\xff':
        return 'err'
    return datetime(
        year=msg[5] + 1985,
        month=msg[3],
        day=msg[4] // 4 + 1,
        hour=msg[2],
        minute=msg[1],
        second=msg[0] // 4,
        tzinfo=timezone.utc
    ).isoformat()
def _wrap_payload(payload_key, payload):
    # Use a UTC "now" so the attached tzinfo matches the actual wall time;
    # datetime.now() with no argument would return local time, which
    # replace(tzinfo=timezone.utc) would then mislabel as UTC.
    now = datetime.utcnow()
    timestamp = now.replace(tzinfo=timezone.utc).timestamp()
    wrapper = {
        'meta': {
            'method': 'fernet',
            'timestamp': timestamp,
            'timezone': 'utc'
        },
        payload_key: payload
    }
    return wrapper