我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用datetime.timedelta()。
def get_lookback(lookback_length):
    """Take a string from the user and return a datetime.timedelta object.

    The string is an integer followed by one unit character taken from the
    first letter of ``weeks``, ``days`` or ``hours`` (for example ``"2w"``
    or ``"36h"``).

    Invalid input -- an empty string, an unknown unit or a non-integer
    value -- is reported through ``panic``.
    """
    time_units = {'weeks': 0, 'days': 0, 'hours': 0}
    time_shortunits = [unit[0] for unit in time_units]

    # Slice instead of index so an empty string yields '' rather than raising
    # IndexError; the empty unit is then rejected like any other unknown unit.
    lookback_unit = lookback_length[-1:]
    lookback_value = lookback_length[:-1]

    try:
        # The explicit truthiness check matters: every name starts with ''.
        lookback_unit = next(name for name in time_units
                             if lookback_unit
                             and name.startswith(lookback_unit))
    except StopIteration:
        panic("hs_history_lookback time unit {lookback_unit} is not "
              "supported. Please choose one of: "
              "{time_shortunits}.".format(lookback_unit=lookback_unit,
                                          time_shortunits=time_shortunits))

    try:
        time_units[lookback_unit] = int(lookback_value)
    except ValueError as exc:
        panic("hs_history_lookback should be an integer value followed by a "
              "single time unit element from {time_shortunits}.\n"
              "ValueError: {exc}".format(time_shortunits=time_shortunits,
                                         exc=exc))

    lookback_timedelta = timedelta(weeks=time_units['weeks'],
                                   days=time_units['days'],
                                   hours=time_units['hours'])
    return lookback_timedelta
def getMaxBuildAge(channel, version_overall = False):
    """Return the maximum build age for ``channel`` as a ``datetime.timedelta``.

    ``release`` gives 12 weeks and ``beta`` 4 weeks. ``aurora`` and
    ``nightly`` give 9 weeks when ``version_overall`` is truthy, otherwise
    2 weeks and 1 week respectively. Any other channel gives 365 days.
    """
    import datetime

    def weeks(count):
        return datetime.timedelta(weeks=count)

    if channel == 'release':
        return weeks(12)
    if channel == 'beta':
        return weeks(4)
    if channel == 'aurora':
        return weeks(9 if version_overall else 2)
    if channel == 'nightly':
        return weeks(9 if version_overall else 1)
    # almost forever
    return datetime.timedelta(days=365)
def from_frontend_value(key, value):
    """Convert a JSON-serializable frontend ``value`` for config ``key``.

    Returns the `SiteConfiguration` object value for ``key``:

    * ``NICETIES_OPEN``   -> ``timedelta(days=value)``
    * ``CLOSING_TIME``    -> ``datetime.time`` parsed from ``'%H:%M'``
    * ``CLOSING_BUFFER``  -> ``timedelta(minutes=value)``
    * ``CACHE_TIMEOUT``   -> ``timedelta(seconds=value)``
    * ``INCLUDE_FACULTY`` / ``INCLUDE_RESIDENTS`` -> ``value`` unchanged

    Raises ``ValueError`` for any other key.
    """
    from datetime import datetime, timedelta

    if key == NICETIES_OPEN:
        return timedelta(days=value)
    if key == CLOSING_TIME:
        return datetime.strptime(value, '%H:%M').time()
    if key == CLOSING_BUFFER:
        return timedelta(minutes=value)
    if key == CACHE_TIMEOUT:
        return timedelta(seconds=value)
    if key == INCLUDE_FACULTY or key == INCLUDE_RESIDENTS:
        return value
    raise ValueError('No such config key!')
def format_time(x):
    """Format time-like values and unwrap numpy arrays.

    Parameters
    ----------
    x: object
        The value to format.

    Returns
    -------
    str or list or `x`
        ``format_timestamp(x)`` for ``datetime64``/``datetime`` values,
        ``format_timedelta(x)`` for ``timedelta64``/``timedelta`` values,
        ``list(x)`` for an ``ndarray`` with at least one dimension, the
        contained scalar for a zero-dimensional ``ndarray``, and `x` itself
        for anything else.
    """
    timestamp_types = (datetime64, datetime)
    timedelta_types = (timedelta64, timedelta)

    if isinstance(x, timestamp_types):
        return format_timestamp(x)
    if isinstance(x, timedelta_types):
        return format_timedelta(x)
    if not isinstance(x, ndarray):
        return x
    if x.ndim:
        return list(x)
    # Zero-dimensional array: index with an empty tuple to get the scalar.
    return x[()]
def __init__(self, description, default=relativedelta()):
    """Store the description, the default date and the date picker options.

    default: passed to ``to_date_model``. According to the original
    documentation it may be provided as a:
        * datetime.date object
        * string in ISO format (YYYY-mm-dd)
        * datetime.timedelta object. The date will be current - delta
        * dateutil.relativedelta object. The date will be current - delta
    If not specified, an empty ``relativedelta()`` is used, documented as
    meaning the current date.

    Note that dateutil is not in the Python standard library. It provides a
    simpler API to specify a duration in days, weeks, months, etc. You can
    install it with pip.
    """
    self.description = description
    self.default = to_date_model(default)

    # see Bootstrap date picker docs for options
    # https://bootstrap-datepicker.readthedocs.io/en/stable/#
    picker_options = {}
    picker_options['data-date-format'] = 'yyyy-mm-dd'
    picker_options['data-date-orientation'] = 'left bottom'
    picker_options['data-date-autoclose'] = 'true'
    picker_options['value'] = self.default.iso()
    self.attributes = picker_options
def update_job_statuses(db, jobs):
    """Update the tracked status of ``jobs`` from SLURM accounting records.

    Runs ``sacct`` for the current user over the last 30 days. For every
    record whose job name, minus its first five characters (the ``gasp_``
    prefix), equals the ``id`` of one of ``jobs``, it calls
    ``Tracker.update_job_status`` with the reported state and exit code.

    :param db: passed through unchanged to ``Tracker.update_job_status``.
    :param jobs: re-iterable collection of objects with an ``id`` attribute.
    """
    since = (datetime.date.today()
             - datetime.timedelta(days=30)).strftime("%Y-%m-%d")
    p = subprocess.Popen(
        ["/usr/cluster/bin/sacct",
         "-u", pwd.getpwuid(os.getuid())[0],
         "--format", "jobid,state,exitcode,jobname",
         "--noheader", "-P",
         "-S", since],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        # Text mode: without it Python 3 returns bytes and the str-based
        # split() calls below raise TypeError.
        universal_newlines=True)
    sacct_out, _ = p.communicate()

    # only keep last record for jobs that were re-run
    slurm_jobs_found = dict()
    for line in sacct_out.rstrip().split("\n"):
        if not line:
            continue
        slurm_job = line.strip().split("|")
        if len(slurm_job) < 4:
            # Malformed record; indexing the job name would fail.
            continue
        # strip off "gasp_"
        slurm_jobs_found[slurm_job[3][5:]] = slurm_job

    for job_id, slurm_job in slurm_jobs_found.items():
        for j in jobs:
            if job_id == j.id:
                Tracker.update_job_status(db, j, slurm_job[1], slurm_job[2])
                break
def schedule2time(schedule):
    """Turn schedule entries into a list of backward-looking time windows.

    Each entry of ``schedule`` is a mapping with ``'delta'`` and
    ``'period'`` items; both are converted with ``parse_period``. The first
    window ends at the current UTC time. Every later window ends where the
    previous one started, and each window starts at ``now - period``.

    Returns a list of dicts with the keys ``name``, ``start``, ``end``,
    ``period`` and ``delta``.
    """
    times = []
    start = now = datetime.utcnow()
    for s in schedule:
        name = '{delta}/{period}'.format(**s)
        delta = parse_period(s['delta'])
        period = parse_period(s['period'])
        # note2: we are counting backward
        end = start
        start = now - period
        times.append({"name": name,
                      "start": start,
                      "end": end,
                      "period": period,
                      "delta": delta})
    return times
# find the names of existing machines
def stop(self):
    """
    Mark the job as completed.

    Safe to call repeatedly: the status and state are only changed while
    the job is not yet in the COMPLETED state.

    Returns
    -------
    state : EStates
    status : EStatuses
    duration : datetime.timedelta
        Duration of the job
    """
    self._set_duration()

    already_completed = self.state is AJob.EStates.COMPLETED
    if not already_completed:
        if self.has_failed():
            self.status = AJob.EStatuses.FAILURE
        else:
            self.status = AJob.EStatuses.SUCCESS
        # This will also save the job
        self.progress(AJob.EStates.COMPLETED)
        message = "{} terminated in {}s with status '{}'".format(
            self, self.duration, self.status.label)
        logger.debug(message)

    return self.state, self.status, self.duration
def __init__(self):
    """Load the new-stocks table and record the first and last IPO dates.

    ``self.ipo`` holds the result of ``ts.new_stocks()`` with its
    ``ipo_date`` column converted to datetimes. ``self.start`` is the last
    value of that column and ``self.end`` the first.
    """
    self.ipo = ts.new_stocks()
    # An explicit unit is required: pandas 2.x rejects unit-less 'datetime64'.
    self.ipo['ipo_date'] = self.ipo['ipo_date'].astype('datetime64[ns]')
    # NOTE(review): presumably rows arrive newest-first, which would make
    # the last row the earliest date -- confirm against the data source.
    self.start = self.ipo['ipo_date'].values[-1]
    self.end = self.ipo['ipo_date'].values[0]
    # Function-call form prints the same on Python 2 and is valid on Python 3.
    print(type(self.end))
def add_timedelta(self, delta):
    """
    Add timedelta duration to the instance.

    Only the ``seconds`` and ``microseconds`` components are applied, via
    ``self.add``.

    :param delta: The timedelta instance
    :type delta: datetime.timedelta

    :rtype: Time

    :raises TypeError: if ``delta.days`` is non-zero.
    """
    if delta.days:
        raise TypeError('Cannot add timedelta with days to Time.')

    return self.add(
        seconds=delta.seconds,
        microseconds=delta.microseconds
    )
def subtract_timedelta(self, delta):
    """
    Remove timedelta duration from the instance.

    Only the ``seconds`` and ``microseconds`` components are applied, via
    ``self.subtract``.

    :param delta: The timedelta instance
    :type delta: datetime.timedelta

    :rtype: Time

    :raises TypeError: if ``delta.days`` is non-zero.
    """
    if delta.days:
        raise TypeError('Cannot subtract timedelta with days from Time.')

    return self.subtract(
        seconds=delta.seconds,
        microseconds=delta.microseconds
    )
def test_class_ops_dateutil(self):
    """Check Timestamp class constructors against their datetime twins.

    Each pair is compared after rounding to whole seconds, so small delays
    between the two calls do not matter.
    """
    tm._skip_if_no_dateutil()
    from dateutil.tz import tzutc

    def to_epoch_seconds(stamp):
        return int(np.round(Timestamp(stamp).value / 1e9))

    def compare(x, y):
        self.assertEqual(to_epoch_seconds(x), to_epoch_seconds(y))

    compare(Timestamp.now(), datetime.now())
    compare(Timestamp.now('UTC'), datetime.now(tzutc()))
    compare(Timestamp.utcnow(), datetime.utcnow())
    compare(Timestamp.today(), datetime.today())

    current_time = calendar.timegm(datetime.now().utctimetuple())
    compare(Timestamp.utcfromtimestamp(current_time),
            datetime.utcfromtimestamp(current_time))
    compare(Timestamp.fromtimestamp(current_time),
            datetime.fromtimestamp(current_time))

    date_component = datetime.utcnow()
    ten_minutes_later = date_component + timedelta(minutes=10)
    time_component = ten_minutes_later.time()
    compare(Timestamp.combine(date_component, time_component),
            datetime.combine(date_component, time_component))
def test_date_range_normalize(self):
    """date_range with normalize=False must keep the time of day."""
    snap = datetime.today()
    n = 50

    rng = date_range(snap, periods=n, normalize=False, freq='2D')
    offset = timedelta(2)
    expected_stamps = []
    for i in range(n):
        expected_stamps.append(snap + i * offset)
    values = np.array(expected_stamps, dtype='M8[ns]')
    self.assert_numpy_array_equal(rng, values)

    rng = date_range('1/1/2000 08:15', periods=n, normalize=False,
                     freq='B')
    the_time = time(8, 15)
    for val in rng:
        self.assertEqual(val.time(), the_time)
def test_timedelta(self):
    """Adding and subtracting timedeltas on a DatetimeIndex."""
    # this is valid too
    index = date_range('1/1/2000', periods=50, freq='B')
    shifted = index + timedelta(1)
    back = shifted + timedelta(-1)
    self.assertTrue(tm.equalContents(index, back))
    self.assertEqual(shifted.freq, index.freq)
    self.assertEqual(shifted.freq, back.freq)

    result = index - timedelta(1)
    expected = index + timedelta(-1)
    self.assertTrue(result.equals(expected))

    # GH4134, buggy with timedeltas
    rng = date_range('2013', '2014')
    s = Series(rng)
    index_minus_hour = rng - pd.offsets.Hour(1)
    series_minus_td64 = DatetimeIndex(s - np.timedelta64(100000000))
    index_minus_td64 = rng - np.timedelta64(100000000)
    series_minus_hour = DatetimeIndex(s - pd.offsets.Hour(1))
    self.assertTrue(index_minus_hour.equals(series_minus_hour))
    self.assertTrue(series_minus_td64.equals(index_minus_td64))
def testReadTimeout(self):
    """Test if read timeouts work.

    group_read operations should never block: each read must return
    ``None`` after roughly the requested timeout.
    """
    tunnel = KNXIPTunnel(gwip)
    tunnel.connect()
    try:
        # (timeout in seconds, exclusive upper bound on the elapsed seconds)
        for timeout, upper_bound in ((1, 3), (5, 6)):
            # Read from some random address and hope nothing responds here
            tick = datetime.now()
            res = tunnel.group_read(37000, timeout=timeout)
            tock = datetime.now()
            diff = tock - tick  # the result is a datetime.timedelta object
            self.assertTrue(timeout <= diff.total_seconds() < upper_bound)
            self.assertIsNone(res)
    finally:
        # Release the tunnel even when an assertion above fails.
        tunnel.disconnect()
def test_lookup_symbol(self):
    """lookup_symbol must resolve one symbol shared by several assets by date."""
    # Incrementing by two so that start and end dates for each
    # generated Asset don't overlap (each Asset's end_date is the
    # day after its start date.)
    dates = pd.date_range('2013-01-01', freq='2D', periods=5, tz='UTC')

    records = []
    for i, date in enumerate(dates):
        records.append({
            'sid': i,
            'symbol': 'existing',
            'start_date': date.value,
            'end_date': (date + timedelta(days=1)).value,
            'exchange': 'NYSE',
        })
    df = pd.DataFrame.from_records(records)

    self.env.write_data(equities_df=df)
    finder = self.asset_finder_type(self.env.engine)

    # Run checks twice to test for caching bugs.
    for _ in range(2):
        with self.assertRaises(SymbolNotFound):
            finder.lookup_symbol('NON_EXISTING', dates[0])

        with self.assertRaises(MultipleSymbolsFound):
            finder.lookup_symbol('EXISTING', None)

        for i, date in enumerate(dates):
            # Verify that we correctly resolve multiple symbols using
            # the supplied date
            result = finder.lookup_symbol('EXISTING', date)
            self.assertEqual(result.symbol, 'EXISTING')
            self.assertEqual(result.sid, i)
def test_offset(self):
    """ Test the offset method of FutureChain.
    """
    cl = FutureChain(self.asset_finder, lambda: '2005-12-01', 'CL')

    # Test that an offset forward sets as_of_date as expected
    self.assertEqual(
        cl.offset('3 days').as_of_date,
        cl.as_of_date + pd.Timedelta(days=3)
    )

    # Test that an offset backward sets as_of_date as expected, with
    # time delta given as str, datetime.timedelta, and pd.Timedelta.
    backward_offsets = (
        '-1000 days',
        timedelta(days=-1000),
        pd.Timedelta('-1000 days'),
    )
    for backward in backward_offsets:
        self.assertEqual(
            cl.offset(backward).as_of_date,
            cl.as_of_date + pd.Timedelta(days=-1000)
        )

    # An offset of zero should give the original chain.
    for zero_offset in (0, "0 days"):
        self.assertEqual(cl[0], cl.offset(zero_offset)[0])

    # A string that doesn't represent a time delta should raise a
    # ValueError.
    with self.assertRaises(ValueError):
        cl.offset("blah")
def eligible_replays(replays, *, age=None):
    """Yield only the replays that are suitable for training.

    Parameters
    ----------
    replays : iterable[Replay]
        The replays to filter.
    age : datetime.timedelta, optional
        Only count replays less than this age old.

    Yields
    ------
    replay : Replay
        Each replay that is recent enough, was played in standard mode,
        did not fail, used none of the autoplay, auto pilot, cinema or
        relax mods, and whose beatmap has at least two hit objects.

    Notes
    -----
    The same beatmap may appear more than once if there are multiple replays
    for this beatmap.
    """
    for replay in replays:
        too_old = (age is not None and
                   datetime.utcnow() - replay.timestamp > age)
        if too_old:
            continue

        if replay.mode != GameMode.standard:
            continue

        # ignore plays with mods that are not representative of user skill
        if (replay.failed or
                replay.autoplay or
                replay.auto_pilot or
                replay.cinema or
                replay.relax):
            continue

        if len(replay.beatmap.hit_objects) < 2:
            continue

        yield replay
def utcoffset(self, dt):
    """
    Return the offset from UTC, which is always the module-level ``ZERO``.

    ``dt`` is accepted for interface compatibility and is not inspected.

    >>> UTC.utcoffset(None)
    datetime.timedelta(0)
    """
    return ZERO
def dst(self, dt):
    """
    Return the DST adjustment, which is always the module-level ``ZERO``
    because DST is not in effect.

    ``dt`` is accepted for interface compatibility and is not inspected.

    >>> UTC.dst(None)
    datetime.timedelta(0)
    """
    return ZERO
def __init__(self, description, timeuuid, source, source_elapsed, thread_name):
    """Store the event fields, converting the raw time values.

    ``timeuuid`` is converted with ``unix_time_from_uuid1`` and stored as a
    naive UTC ``datetime``. ``source_elapsed`` is stored as a ``timedelta``
    of that many microseconds, or ``None`` when it is ``None``.
    """
    self.description = description
    self.datetime = datetime.utcfromtimestamp(unix_time_from_uuid1(timeuuid))
    self.source = source
    self.source_elapsed = (
        None if source_elapsed is None
        else timedelta(microseconds=source_elapsed)
    )
    self.thread_name = thread_name
async def info(self, ctx):
    """Information about the bot"""
    # The docstring above is kept verbatim: it may be used as the command's
    # help text. The function must be a coroutine because the body awaits.
    msg = await ctx.send('Getting statistics...')

    shards = self.bot.shard_count
    guilds = len(list(self.bot.guilds))
    # De-duplicate through a set before counting.
    users = str(len(set(self.bot.get_all_members())))
    channels = str(len(set(self.bot.get_all_channels())))

    up = abs(self.bot.uptime - int(time.perf_counter()))
    up = str(datetime.timedelta(seconds=up))

    data = discord.Embed(title="__**Information**__",
                         colour=discord.Colour(value=11735575))
    data.add_field(name="Version", value="2.5-beta", inline=False)
    data.add_field(name="Shard ID", value=ctx.message.guild.shard_id)
    data.add_field(name="Total Shards", value=shards)
    data.add_field(name="Total Servers", value=guilds)
    data.add_field(name="Users", value=users)
    data.add_field(name="Channels", value=channels)
    data.add_field(name="Uptime", value=up)
    data.add_field(name="Support Development",
                   value="Donate on [Patreon](https://www.patreon.com/franc_ist) or [PayPal](https://paypal.me/MLAutomod/5)")
    data.set_footer(
        text="Made with \U00002665 by Francis#6565. Support server: https://discord.gg/yp8WpMh")

    try:
        await msg.edit(content=None, embed=data)
        statsd.increment('bot.commands.run', 1)
    except discord.HTTPException:
        logger.exception("Missing embed links perms")
        statsd.increment('bot.commands.errored', 1)
        await ctx.send("Looks like the bot doesn't have embed links perms... It kinda needs these, so I'd suggest adding them!")
async def uptime(self, ctx):
    """Shows the bot's uptime"""
    # The docstring above is kept verbatim: it may be used as the command's
    # help text. The function must be a coroutine because the body awaits.
    up = abs(self.bot.uptime - int(time.perf_counter()))
    up = str(datetime.timedelta(seconds=up))
    await ctx.send("`Uptime: {}`".format(up))
    statsd.increment('bot.commands.run', 1)
def get_cur_runtime(self):
    """Return how long the test case has been running.

    The result is a ``datetime.timedelta``: the current local time minus
    ``self.test_start_time``.
    """
    right_now = datetime.now()
    return right_now - self.test_start_time
# Sorter
def total_seconds(td):
    """
    Given a timedelta (*td*) return its total length in seconds, as a
    stand-in for Python 2.7's :meth:`datetime.timedelta.total_seconds`.

    The value comes from a plain ``/`` division of the total microseconds by
    10**6, so its type follows the interpreter's division rules (a float
    under true division).
    """
    microseconds_per_second = 10**6
    whole_seconds = td.seconds + td.days * 24 * 3600
    total_microseconds = (td.microseconds +
                          whole_seconds * microseconds_per_second)
    return total_microseconds / microseconds_per_second
# NOTE: This is something I'm investigating as a way to use the new go_async
# module. A work-in-progress. Ignore for now...
def timeout(self):
    """
    A `property` that controls how long a key will last before being
    automatically removed. May be given as a `datetime.timedelta` object or
    a string like, "1d", "30s" (will be passed through the
    `convert_to_timedelta` function).

    When no value has been stored yet, a one-hour `timedelta` is stored and
    returned.
    """
    if hasattr(self, "_timeout"):
        return self._timeout
    # Default is 1-hour timeout
    self._timeout = timedelta(hours=1)
    return self._timeout
def interval(self):
    """
    A `property` that controls how often we check for expired keys. May be
    given as milliseconds (integer), a `datetime.timedelta` object, or a
    string like, "1d", "30s" (will be passed through the
    `convert_to_timedelta` function).

    When no value has been stored yet, 10000 (milliseconds) is stored and
    returned.
    """
    if hasattr(self, "_interval"):
        return self._interval
    # Default is every 10 seconds
    self._interval = 10000
    return self._interval
def interval(self, value):
    """Set the expiry-check interval and recreate the periodic callback.

    ``value`` may be a string (converted with ``convert_to_timedelta``), a
    ``timedelta`` (converted to milliseconds) or a number that is already
    in milliseconds. Any existing ``_key_watcher`` is stopped before a new
    ``PeriodicCallback`` is created with the new interval.
    """
    if isinstance(value, basestring):
        value = convert_to_timedelta(value)
    if isinstance(value, timedelta):
        # PeriodicCallback uses ms
        value = total_seconds(value) * 1000
    self._interval = value

    # Restart the PeriodicCallback
    watcher_exists = hasattr(self, '_key_watcher')
    if watcher_exists:
        self._key_watcher.stop()
    self._key_watcher = PeriodicCallback(
        self._timeout_checker, value, io_loop=self.io_loop)
def memorized_timedelta(seconds):
    '''Return the shared timedelta for ``seconds``, creating it on first use.

    Instances are kept in the module-level ``_timedelta_cache`` so that each
    distinct value exists only once.
    '''
    if seconds not in _timedelta_cache:
        _timedelta_cache[seconds] = timedelta(seconds=seconds)
    return _timedelta_cache[seconds]
def memorized_datetime(seconds):
    '''Return the shared datetime for ``seconds``, creating it on first use.

    Instances are kept in the module-level ``_datetime_cache`` so that each
    distinct value exists only once.
    '''
    if seconds not in _datetime_cache:
        # NB. We can't just do datetime.utcfromtimestamp(seconds) as this
        # fails with negative values under Windows (Bug #90096)
        _datetime_cache[seconds] = _epoch + timedelta(seconds=seconds)
    return _datetime_cache[seconds]
def _to_seconds(td):
    '''Return the days and seconds of a timedelta as a number of seconds.

    The ``microseconds`` component is not included.
    '''
    seconds_per_day = 24 * 60 * 60
    return td.seconds + td.days * seconds_per_day
def utcoffset(self, dt, is_dst=None):
    '''See datetime.tzinfo.utcoffset

    Returns ``None`` when ``dt`` is ``None``. When ``dt`` already uses this
    tzinfo, this instance's offset is returned. Otherwise ``dt`` is
    localized first and the offset of the resulting tzinfo is returned.

    The is_dst parameter may be used to remove ambiguity during DST
    transitions.

    >>> from pytz import timezone
    >>> tz = timezone('America/St_Johns')
    >>> ambiguous = datetime(2009, 10, 31, 23, 30)

    >>> tz.utcoffset(ambiguous, is_dst=False)
    datetime.timedelta(-1, 73800)

    >>> tz.utcoffset(ambiguous, is_dst=True)
    datetime.timedelta(-1, 77400)

    >>> try:
    ...     tz.utcoffset(ambiguous)
    ... except AmbiguousTimeError:
    ...     print('Ambiguous')
    Ambiguous

    '''
    if dt is None:
        return None
    if dt.tzinfo is self:
        return self._utcoffset
    localized = self.localize(dt, is_dst)
    return localized.tzinfo._utcoffset
def dayList(backlog_days, forced_dates = None):
    """Return a sorted list of day strings to analyze.

    The list holds one ``beforeTodayString`` result for each of the last
    ``backlog_days`` days (offsets 0 to ``backlog_days - 1``). Every entry
    of ``forced_dates`` that starts with a ``digits-digits-digits`` pattern
    and is not already in the list is added as well.

    :param backlog_days: number of most recent days to include.
    :param forced_dates: optional iterable of extra day strings.
    :return: the day strings in ascending lexical order.
    """
    import re

    # range() rather than xrange(): the latter does not exist on Python 3.
    days_to_analyze = [beforeTodayString(days=daysback)
                       for daysback in range(backlog_days)]
    for anaday in forced_dates or []:
        if re.match(r"\d+-\d+-\d+", anaday) and anaday not in days_to_analyze:
            days_to_analyze.append(anaday)
    days_to_analyze.sort()
    return days_to_analyze
def beforeTodayString(days = 0, weeks = 0):
    """Return the UTC date ``days`` and ``weeks`` ago as ``YYYY-MM-DD``."""
    from datetime import datetime, timedelta

    lookback = timedelta(days=days, weeks=weeks)
    moment = datetime.utcnow() - lookback
    return moment.strftime('%Y-%m-%d')
def dayStringAdd(anaday, days = 0, weeks = 0):
    """Shift a ``YYYY-MM-DD`` day string forward by ``days`` and ``weeks``.

    The result is a string in the same format. Negative values move
    backward.
    """
    from datetime import datetime, timedelta

    day_format = '%Y-%m-%d'
    base_day = datetime.strptime(anaday, day_format)
    shifted_day = base_day + timedelta(days=days, weeks=weeks)
    return shifted_day.strftime(day_format)
def startScheduler():
    """Create the tables and default roles, then start the scheduler.

    Steps:
    1. ``db.create_all()`` and, when no ``admin`` role exists, insert the
       ``admin`` and ``user`` roles.
    2. Determine the local timezone through ``tzlocal``, falling back to
       UTC (with an error log entry) when it cannot be determined.
    3. Start a ``BackgroundScheduler`` that runs ``notify.task`` every
       ``config.SCAN_INTERVAL`` seconds, first firing two seconds from now.
    """
    db.create_all()

    # create default roles!
    if not db.session.query(models.Role).filter(models.Role.name == "admin").first():
        admin_role = models.Role(name='admin', description='Administrator Role')
        user_role = models.Role(name='user', description='User Role')
        db.session.add(admin_role)
        db.session.add(user_role)
        db.session.commit()

    try:
        import tzlocal

        tz = tzlocal.get_localzone()
        logger.info("local timezone: %s" % tz)
    except Exception:
        # Best effort: any failure here means "timezone unknown". Unlike a
        # bare ``except`` this lets KeyboardInterrupt/SystemExit through.
        tz = None

    # getattr(): not every tz object exposes ``.zone``; a missing attribute
    # must not be mistaken for the "local" placeholder or raise.
    if not tz or getattr(tz, "zone", None) == "local":
        logger.error('Local timezone name could not be determined. Scheduler will display times in UTC for any log '
                     'messages. To resolve this set up /etc/timezone with correct time zone name.')
        tz = pytz.utc

    # in debug mode this is executed twice :(
    # DONT run flask in auto reload mode when testing this!
    scheduler = BackgroundScheduler(logger=sched_logger, timezone=tz)
    scheduler.add_job(notify.task, 'interval', seconds=config.SCAN_INTERVAL, max_instances=1,
                      start_date=datetime.datetime.now(tz) + datetime.timedelta(seconds=2))
    scheduler.start()
    # NOTE(review): this binds a local name only; if a module-level ``sched``
    # is meant to be updated, a ``global sched`` declaration is missing.
    sched = scheduler
def every(self, get_coro_or_fut, interval: _timedelta_cls, start_at: datetime, count=float('inf'), loop=None):
    """
    Schedule a job at (start_at, start_at + interval, start_at + 2 * interval, ....).

    When ``start_at`` lies in the past (after rounding to whole seconds) it
    is moved forward by a whole number of intervals first.

    :param get_coro_or_fut: a callable which returns a co-routine or a future
    :param interval: a datetime.timedelta object denoting the interval between
                     consecutive job executions
    :param start_at: a datetime.datetime object denoting the start time of
                     this schedule
    :param count: the number of times to execute this job; by default the job
                  is executed continuously
    :param loop: event loop if the future returned by get_coro_or_fut is
                 hooked up with a custom event loop
    :return: JobSchedule object so that the user can control the schedule
             based on its future
    :raises AScheduleException: if ``interval`` is negative or ``count`` is
                                not positive
    """
    if interval.total_seconds() < 0:
        raise AScheduleException("error: invalid interval ({} seconds).".format(
            interval.total_seconds()))
    if count <= 0:
        raise AScheduleException("error: invalid count({}).".format(count))

    diff = start_at - datetime.now()
    if round(diff.total_seconds()) < 0:
        missed_intervals = ceil((-diff) / interval)
        start_at += interval * missed_intervals

    # Both waits are computed once, up front, in whole seconds.
    first_wait = round((start_at - datetime.now()).total_seconds())
    repeat_wait = round(interval.total_seconds())

    def later_waits():
        # Yields the repeat wait (count - 1) times, or forever when count
        # is infinite.
        produced = 0
        while produced < (count - 1):
            yield repeat_wait
            produced += 1

    intervals = itertools.chain(iter([first_wait]), later_waits())
    schedule = JobSchedule(get_coro_or_fut, intervals, loop=loop)
    self.add_schedule(schedule)
    return schedule
def every_random_interval(job, interval: timedelta, loop=None):
    """
    Execute the job once at a random moment inside every interval.
    example:
        run a job every day at random time
        run a job every hour at random time

    :param job: a callable(co-routine function) which returns
                a co-routine or a future or an awaitable
    :param interval: a datetime.timedelta giving the length of each window;
                     a random whole-second offset inside the window is drawn
                     for every run
    :param loop: io loop if the provided job is a custom future linked up
                 with a different event loop.
    :return: schedule object, so it could be cancelled at will of the user by
             aschedule.cancel(schedule)
    """
    if loop is None:
        loop = asyncio.get_event_loop()
    start = loop.time()

    def wait_time_gen():
        window_index = 0
        while True:
            offset_in_window = random.randrange(round(interval.total_seconds()))
            target = start + interval.total_seconds() * window_index + offset_in_window
            # Wait time is relative to the loop clock at the moment it is asked for.
            yield round(target - loop.time())
            window_index += 1

    schedule = JobSchedule(job, wait_time_gen(), loop=loop)
    # add it to default_schedule_manager, so that user can aschedule.cancel it
    default_schedule_manager.add_schedule(schedule)
    return schedule
def every_day(job, loop=None):
    """Schedule ``job`` through ``every`` with a one-day interval."""
    one_day = timedelta(days=1)
    return every(job, timedelta=one_day, loop=loop)
def every_week(job, loop=None):
    """Schedule ``job`` through ``every`` with a seven-day interval."""
    one_week = timedelta(days=7)
    return every(job, timedelta=one_week, loop=loop)
def _every_weekday(job, weekday, loop=None):
    """Schedule ``job`` weekly through ``every``.

    The schedule starts at ``_nearest_weekday(weekday)`` and repeats every
    seven days.
    """
    one_week = timedelta(days=7)
    first_run = _nearest_weekday(weekday)
    return every(job, timedelta=one_week, start_at=first_run, loop=loop)
def set_to_default():
    """Write the default value for every config key and commit.

    Each value is stored through the module's own ``set`` function; the
    session is committed once at the end.
    """
    import datetime

    defaults = (
        (NICETIES_OPEN, datetime.timedelta(days=14)),
        (CLOSING_TIME, datetime.time(18, 0)),
        (CLOSING_BUFFER, datetime.timedelta(minutes=30)),
        (CACHE_TIMEOUT, datetime.timedelta(days=7)),
        (INCLUDE_FACULTY, False),
        (INCLUDE_RESIDENTS, False),
    )
    for config_key, default_value in defaults:
        set(config_key, default_value)
    db.session.commit()
def __init__(self, offset=0):
    """Store ``offset`` hours as a ``datetime.timedelta`` in ``self.ZERO``."""
    hours_offset = datetime.timedelta(hours=offset)
    self.ZERO = hours_offset
def _total_seconds(td):
    """Return the length of ``td`` in whole seconds, rounded down.

    Python 2.6 doesn't have a total_seconds() method on timedelta objects,
    so the value is computed from the components with integer floor
    division.
    """
    microseconds_per_second = 1000000
    whole_seconds = td.seconds + td.days * 86400
    total_microseconds = whole_seconds * microseconds_per_second + td.microseconds
    return total_microseconds // microseconds_per_second
def to_date_model(date):
    """Reject ``date`` with a ``ValueError`` listing the accepted inputs.

    NOTE(review): as visible here the function raises unconditionally; the
    branches that convert the accepted input types appear to be missing
    from this excerpt -- confirm against the full source.
    """
    message = "given date must be: a) ISO format string, b) datetime.date object, c) datetime.timedelta object, or d) dateutil.relativedelta object"
    raise ValueError(message)
# date obj converts to absolute date