Python datetime 模块,timedelta() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用datetime.timedelta()

项目:fingerprint-securedrop    作者:freedomofpress    | 项目源码 | 文件源码
def get_lookback(lookback_length):
    """Take a string from the user and return a datetime.timedelta object.

    The string is an integer followed by a single unit character, where the
    character is the first letter of ``weeks``, ``days`` or ``hours`` (for
    example ``"2w"``, ``"10d"`` or ``"36h"``).

    Malformed input is reported through ``panic``.
    NOTE(review): this code assumes ``panic`` does not return; it is defined
    outside this function -- confirm.
    """
    time_units = {'weeks': 0, 'days': 0, 'hours': 0}
    time_shortunits = [x[0] for x in time_units]

    # An empty value has no unit character to look at; report it the same way
    # as any other malformed value instead of failing with an IndexError.
    if not lookback_length:
        panic("hs_history_lookback should be an integer value followed by a "
              "single time unit element from {time_shortunits}.".format(
                  time_shortunits=time_shortunits))

    lookback_unit = lookback_length[-1]
    lookback_value = lookback_length[:-1]

    try:
        # Expand the one-letter unit into the full keyword name.
        lookback_unit = next(i for i in time_units
                             if i.startswith(lookback_unit))
    except StopIteration:
        panic("hs_history_lookback time unit {lookback_unit} is not "
              "supported. Please choose one of: "
              "{time_shortunits}.".format(lookback_unit=lookback_unit,
                                          time_shortunits=time_shortunits))
    try:
        time_units[lookback_unit] = int(lookback_value)
    except ValueError as exc:
        panic("hs_history_lookback should be an integer value followed by a "
              "single time unit element from {time_shortunits}.\n"
              "ValueError: {exc}".format(time_shortunits=time_shortunits,
                                         exc=exc))

    return timedelta(weeks=time_units['weeks'],
                     days=time_units['days'],
                     hours=time_units['hours'])
项目:magdalena    作者:KaiRo-at    | 项目源码 | 文件源码
def getMaxBuildAge(channel, version_overall = False):
    """Return the maximum build age for ``channel`` as a datetime.timedelta.

    ``version_overall`` selects the longer limit for the ``aurora`` and
    ``nightly`` channels; it has no effect on the other channels.
    """
    import datetime
    if channel == 'release':
        weeks = 12
    elif channel == 'beta':
        weeks = 4
    elif channel == 'aurora':
        weeks = 9 if version_overall else 2
    elif channel == 'nightly':
        weeks = 9 if version_overall else 1
    else:
        # Any other channel: almost forever.
        return datetime.timedelta(days=365)
    return datetime.timedelta(weeks=weeks)
项目:rc-niceties    作者:mjec    | 项目源码 | 文件源码
def from_frontend_value(key, value):
    """Convert a JSON-serializable ``value`` into the stored form for ``key``.

    Durations become ``datetime.timedelta`` objects (days, minutes or seconds
    depending on the key), the closing time is parsed from ``'%H:%M'`` into a
    ``datetime.time``, and the two include flags are returned unchanged.

    Raises ``ValueError`` when ``key`` is not one of the known config keys.
    """
    from datetime import datetime, timedelta

    if key == NICETIES_OPEN:
        return timedelta(days=value)
    if key == CLOSING_TIME:
        return datetime.strptime(value, '%H:%M').time()
    if key == CLOSING_BUFFER:
        return timedelta(minutes=value)
    if key == CACHE_TIMEOUT:
        return timedelta(seconds=value)
    if key == INCLUDE_FACULTY or key == INCLUDE_RESIDENTS:
        return value
    raise ValueError('No such config key!')
项目:psyplot    作者:Chilipp    | 项目源码 | 文件源码
def format_time(x):
    """Format date-like values, passing everything else through

    Datetime values (:class:`datetime.datetime` or the numpy equivalent) go
    through ``format_timestamp`` and timedelta values
    (:class:`datetime.timedelta` or the numpy equivalent) go through
    ``format_timedelta``. Arrays are converted to a list, or to their single
    item when they have no dimensions.

    Parameters
    ----------
    x: object
        The value to format

    Returns
    -------
    str or list or `x`
        The formatted value, or `x` itself if it is none of the above"""
    if isinstance(x, (datetime64, datetime)):
        return format_timestamp(x)
    if isinstance(x, (timedelta64, timedelta)):
        return format_timedelta(x)
    if not isinstance(x, ndarray):
        return x
    if x.ndim:
        return list(x)
    # zero-dimensional array: extract the single item
    return x[()]
项目:diva    作者:mgriley    | 项目源码 | 文件源码
def __init__(self, description, default=relativedelta()):
        """
        Create a date widget.

        default: the initial date, converted with to_date_model. The accepted
        forms listed by the original author are:

        * datetime.date object
        * string in ISO format (YYYY-mm-dd)
        * datetime.timedelta object. The date will be current - delta
        * dateutil.relativedelta object. The date will be current - delta

        If not specified, it will be the current date.
        Note that dateutil is not in the Python standard library. It provides a simpler
        API to specify a duration in days, weeks, months, etc. You can install it with pip.
        """
        self.description = description
        self.default = to_date_model(default)
        # see Bootstrap date picker docs for options
        # https://bootstrap-datepicker.readthedocs.io/en/stable/#
        attributes = {
            'data-date-format': 'yyyy-mm-dd',
            'data-date-orientation': 'left bottom',
            'data-date-autoclose': 'true',
        }
        # the initial value shown is the ISO form of the default date
        attributes['value'] = self.default.iso()
        self.attributes = attributes
项目:encore    作者:statgen    | 项目源码 | 文件源码
def update_job_statuses(db, jobs):
        """Refresh the status of ``jobs`` from ``sacct`` accounting output.

        Runs ``sacct`` for the current user over the last 30 days. Each output
        line is split on ``|`` into job id, state, exit code and job name. The
        job name with its first five characters removed (the ``gasp_`` prefix)
        is compared with the ``id`` of every entry in ``jobs``; on a match
        ``Tracker.update_job_status`` is called with the state and exit code.
        """
        since = (datetime.date.today() - datetime.timedelta(days=30)).strftime("%Y-%m-%d")
        p = subprocess.Popen(
            ["/usr/cluster/bin/sacct", "-u", pwd.getpwuid(os.getuid())[0],
             "--format", "jobid,state,exitcode,jobname", "--noheader", "-P",
             "-S", since],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            # Text mode: without it the output is bytes on Python 3 and the
            # str-based split below raises a TypeError.
            universal_newlines=True)
        squeue_out, squeue_err = p.communicate()

        # only keep last record for jobs that were re-run
        slurm_jobs_found = dict()
        for line in squeue_out.rstrip().split("\n"):
            if not line:
                continue
            slurm_job = line.strip().split("|")
            if len(slurm_job) < 4:
                # not a complete jobid|state|exitcode|jobname record
                continue
            # strip off "gasp_"
            slurm_jobs_found[slurm_job[3][5:]] = slurm_job

        for job_name, slurm_job in slurm_jobs_found.items():
            for j in jobs:
                if job_name == j.id:
                    Tracker.update_job_status(db, j, slurm_job[1], slurm_job[2])
                    break
项目:smartbackup    作者:baszoetekouw    | 项目源码 | 文件源码
def schedule2time(schedule):
    """Turn a schedule into a list of time windows, counting backward.

    Each entry of ``schedule`` is a mapping with ``delta`` and ``period``
    keys, both converted with ``parse_period``. For every entry a dict is
    produced with:

    * ``name``: ``"<delta>/<period>"`` built from the raw entry values
    * ``start``: the current UTC time minus the entry's period
    * ``end``: the ``start`` of the previous entry (the current UTC time for
      the first entry)
    * ``period`` and ``delta``: the parsed values

    Returns the list of these dicts in schedule order.
    """
    times = []
    start = now = datetime.utcnow()
    for s in schedule:
        name   = '{delta}/{period}'.format(**s)
        delta  = parse_period(s['delta'])
        period = parse_period(s['period'])

        # note: we are counting backward, so this window ends where the
        # previous one started
        end   = start
        start = now - period
        times.append({"name": name, "start": start, "end": end, "period": period, "delta": delta})

    return times


# find the names of existing machines
项目:django-celery-growthmonitor    作者:mbourqui    | 项目源码 | 文件源码
def stop(self):
        """
        Mark the job as finished. Safe to call repeatedly: the status and the
        completed state are only set while the job is not yet completed.

        Returns
        -------
        state : EStates
        status : EStatuses
        duration : datetime.timedelta
            Duration of the job

        """
        self._set_duration()
        if self.state is not AJob.EStates.COMPLETED:
            if self.has_failed():
                self.status = AJob.EStatuses.FAILURE
            else:
                self.status = AJob.EStatuses.SUCCESS
            # progress() also saves the job
            self.progress(AJob.EStates.COMPLETED)
            message = "{} terminated in {}s with status '{}'".format(
                self, self.duration, self.status.label)
            logger.debug(message)
        return self.state, self.status, self.duration
项目:stock    作者:Rockyzsu    | 项目源码 | 文件源码
def __init__(self):
        """Load the new-stock listing and record the ends of its date column.

        ``self.ipo`` holds the table returned by ``ts.new_stocks()`` with its
        ``ipo_date`` column converted to datetimes. ``self.start`` is the last
        value of that column and ``self.end`` the first.
        """
        self.ipo = ts.new_stocks()

        # Convert the date column. The unit is spelled out because recent
        # pandas versions reject the unit-less 'datetime64' dtype.
        self.ipo['ipo_date'] = self.ipo['ipo_date'].astype('datetime64[ns]')
        self.start = self.ipo['ipo_date'].values[-1]
        self.end = self.ipo['ipo_date'].values[0]
        # Debug output; the parenthesised form is valid on Python 2 and 3.
        print(type(self.end))
项目:pendulum    作者:sdispater    | 项目源码 | 文件源码
def add_timedelta(self, delta):
        """
        Add timedelta duration to the instance.

        Only the ``seconds`` and ``microseconds`` components are forwarded to
        ``add``; a delta with a non-zero ``days`` component is rejected.

        :param delta: The timedelta instance
        :type delta: datetime.timedelta

        :rtype: Time

        :raises TypeError: if ``delta.days`` is non-zero
        """
        # NOTE(review): datetime.timedelta normalises negative durations to a
        # negative ``days`` value, so e.g. timedelta(seconds=-1) is rejected
        # here as well -- confirm this is intended.
        if delta.days:
            raise TypeError('Cannot add timedelta with days to Time.')

        return self.add(
            seconds=delta.seconds,
            microseconds=delta.microseconds
        )
项目:pendulum    作者:sdispater    | 项目源码 | 文件源码
def subtract_timedelta(self, delta):
        """
        Remove timedelta duration from the instance.

        Only the ``seconds`` and ``microseconds`` components are forwarded to
        ``subtract``; a delta with a non-zero ``days`` component is rejected.

        :param delta: The timedelta instance
        :type delta: datetime.timedelta

        :rtype: Time

        :raises TypeError: if ``delta.days`` is non-zero
        """
        # NOTE(review): datetime.timedelta normalises negative durations to a
        # negative ``days`` value, so e.g. timedelta(seconds=-1) is rejected
        # here as well -- confirm this is intended.
        if delta.days:
            raise TypeError('Cannot subtract timedelta with days from Time.')

        return self.subtract(
            seconds=delta.seconds,
            microseconds=delta.microseconds
        )
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_class_ops_dateutil(self):
        """Timestamp's class-level constructors agree with datetime's."""
        tm._skip_if_no_dateutil()
        from dateutil.tz import tzutc

        def assert_same_second(left, right):
            # Both sides are built at slightly different instants, so they
            # are compared after rounding to whole seconds.
            left_seconds = int(np.round(Timestamp(left).value / 1e9))
            right_seconds = int(np.round(Timestamp(right).value / 1e9))
            self.assertEqual(left_seconds, right_seconds)

        assert_same_second(Timestamp.now(), datetime.now())
        assert_same_second(Timestamp.now('UTC'), datetime.now(tzutc()))
        assert_same_second(Timestamp.utcnow(), datetime.utcnow())
        assert_same_second(Timestamp.today(), datetime.today())

        epoch_seconds = calendar.timegm(datetime.now().utctimetuple())
        assert_same_second(Timestamp.utcfromtimestamp(epoch_seconds),
                           datetime.utcfromtimestamp(epoch_seconds))
        assert_same_second(Timestamp.fromtimestamp(epoch_seconds),
                           datetime.fromtimestamp(epoch_seconds))

        base = datetime.utcnow()
        ten_minutes_later = (base + timedelta(minutes=10)).time()
        assert_same_second(Timestamp.combine(base, ten_minutes_later),
                           datetime.combine(base, ten_minutes_later))
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_date_range_normalize(self):
        """date_range with normalize=False keeps the time of day of its start."""
        start = datetime.today()
        periods = 50

        rng = date_range(start, periods=periods, normalize=False, freq='2D')

        # the same range built by hand in two-day steps
        step = timedelta(2)
        expected = np.array([start + k * step for k in range(periods)],
                            dtype='M8[ns]')

        self.assert_numpy_array_equal(rng, expected)

        rng = date_range('1/1/2000 08:15', periods=periods, normalize=False,
                         freq='B')
        expected_time = time(8, 15)
        for stamp in rng:
            self.assertEqual(stamp.time(), expected_time)
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def test_timedelta(self):
        """Adding and subtracting timedeltas on a DatetimeIndex is consistent."""
        one_day = timedelta(1)
        minus_one_day = timedelta(-1)

        # this is valid too
        index = date_range('1/1/2000', periods=50, freq='B')
        shifted = index + one_day
        back = shifted + minus_one_day
        self.assertTrue(tm.equalContents(index, back))
        self.assertEqual(shifted.freq, index.freq)
        self.assertEqual(shifted.freq, back.freq)

        # subtracting a day equals adding a negative day
        result = index - one_day
        expected = index + minus_one_day
        self.assertTrue(result.equals(expected))

        # GH4134, buggy with timedeltas
        rng = date_range('2013', '2014')
        s = Series(rng)
        index_minus_hour = rng - pd.offsets.Hour(1)
        series_minus_td64 = DatetimeIndex(s - np.timedelta64(100000000))
        index_minus_td64 = rng - np.timedelta64(100000000)
        series_minus_hour = DatetimeIndex(s - pd.offsets.Hour(1))
        self.assertTrue(index_minus_hour.equals(series_minus_hour))
        self.assertTrue(series_minus_td64.equals(index_minus_td64))
项目:pknx    作者:open-homeautomation    | 项目源码 | 文件源码
def testReadTimeout(self):
        """Test if read timeouts work and group_read operations

        group_read operations should never block
        """
        tunnel = KNXIPTunnel(gwip)
        tunnel.connect()
        try:
            # (timeout given to group_read, exclusive upper bound in seconds
            # on how long the call may take)
            for timeout, upper_bound in ((1, 3), (5, 6)):
                # Read from some random address and hope nothing responds here
                tick = datetime.now()
                res = tunnel.group_read(37000, timeout=timeout)
                tock = datetime.now()
                elapsed = (tock - tick).total_seconds()
                self.assertTrue(timeout <= elapsed < upper_bound)
                self.assertIsNone(res)
        finally:
            # Release the tunnel even when an assertion above fails.
            tunnel.disconnect()
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_lookup_symbol(self):
        """lookup_symbol resolves a symbol shared by several assets by date."""
        # Incrementing by two so that start and end dates for each
        # generated Asset don't overlap (each Asset's end_date is the
        # day after its start date.)
        dates = pd.date_range('2013-01-01', freq='2D', periods=5, tz='UTC')
        records = []
        for sid, start in enumerate(dates):
            records.append({
                'sid': sid,
                'symbol':  'existing',
                'start_date': start.value,
                'end_date': (start + timedelta(days=1)).value,
                'exchange': 'NYSE',
            })
        df = pd.DataFrame.from_records(records)
        self.env.write_data(equities_df=df)
        finder = self.asset_finder_type(self.env.engine)

        for _ in range(2):  # Run checks twice to test for caching bugs.
            with self.assertRaises(SymbolNotFound):
                finder.lookup_symbol('NON_EXISTING', dates[0])

            with self.assertRaises(MultipleSymbolsFound):
                finder.lookup_symbol('EXISTING', None)

            for expected_sid, as_of in enumerate(dates):
                # Verify that we correctly resolve multiple symbols using
                # the supplied date
                found = finder.lookup_symbol('EXISTING', as_of)
                self.assertEqual(found.symbol, 'EXISTING')
                self.assertEqual(found.sid, expected_sid)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_offset(self):
        """ Test the offset method of FutureChain.
        """
        cl = FutureChain(self.asset_finder, lambda: '2005-12-01', 'CL')

        # Test that an offset forward sets as_of_date as expected
        self.assertEqual(
            cl.offset('3 days').as_of_date,
            cl.as_of_date + pd.Timedelta(days=3)
        )

        # Test that an offset backward sets as_of_date as expected, with
        # time delta given as str, datetime.timedelta, and pd.Timedelta.
        backward_specs = (
            '-1000 days',
            timedelta(days=-1000),
            pd.Timedelta('-1000 days'),
        )
        for spec in backward_specs:
            self.assertEqual(
                cl.offset(spec).as_of_date,
                cl.as_of_date + pd.Timedelta(days=-1000)
            )

        # An offset of zero should give the original chain.
        for zero in (0, "0 days"):
            self.assertEqual(cl[0], cl.offset(zero)[0])

        # A string that doesn't represent a time delta should raise a
        # ValueError.
        with self.assertRaises(ValueError):
            cl.offset("blah")
项目:lain    作者:llllllllll    | 项目源码 | 文件源码
def eligible_replays(replays, *, age=None):
    """Yield only the replays that are suitable for training.

    Parameters
    ----------
    replays : iterable[Replay]
        The replays to filter.
    age : datetime.timedelta, optional
        When given, replays whose timestamp is more than this far behind the
        current UTC time are skipped.

    Yields
    ------
    replay : Replay
        Each replay that passes every check.

    Notes
    -----
    The same beatmap may appear more than once if there are multiple replays
    for this beatmap.
    """
    for replay in replays:
        if age is not None:
            if datetime.utcnow() - replay.timestamp > age:
                continue

        # ignore plays with mods that are not representative of user skill
        unrepresentative = (
            replay.mode != GameMode.standard or
            replay.failed or
            replay.autoplay or
            replay.auto_pilot or
            replay.cinema or
            replay.relax or
            len(replay.beatmap.hit_objects) < 2
        )
        if unrepresentative:
            continue

        yield replay
项目:kaira    作者:mulonemartin    | 项目源码 | 文件源码
def utcoffset(self, dt):
        """ Return the offset from UTC, which is always ``ZERO``.

            ``dt`` is not used.

            >>> UTC.utcoffset(None)
            datetime.timedelta(0)
        """
        return ZERO
项目:kaira    作者:mulonemartin    | 项目源码 | 文件源码
def dst(self, dt):
        """ Return the DST adjustment, which is always ``ZERO``.

            ``dt`` is not used.

            >>> UTC.dst(None)
            datetime.timedelta(0)
        """
        return ZERO
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def __init__(self, description, timeuuid, source, source_elapsed, thread_name):
        """Store one trace event.

        ``timeuuid`` is converted to a naive UTC ``datetime`` through
        ``unix_time_from_uuid1``. ``source_elapsed`` is interpreted as
        microseconds and stored as a ``timedelta``, or kept as ``None`` when
        not given.
        """
        self.description = description
        self.datetime = datetime.utcfromtimestamp(unix_time_from_uuid1(timeuuid))
        self.source = source
        self.source_elapsed = (
            None if source_elapsed is None
            else timedelta(microseconds=source_elapsed)
        )
        self.thread_name = thread_name
项目:youtube    作者:FishyFing    | 项目源码 | 文件源码
async def info(self, ctx):
        """Information about the bot"""
        # Must be a coroutine: the body awaits ctx.send() and msg.edit().
        # Sends a placeholder message, gathers the statistics below and then
        # replaces the placeholder with an embed.
        msg = await ctx.send('Getting statistics...')
        shards = self.bot.shard_count
        shard_id = ctx.message.guild.shard_id
        guilds = len(list(self.bot.guilds))
        # count distinct members / channels
        users = str(len(set(self.bot.get_all_members())))
        channels = str(len(set(self.bot.get_all_channels())))

        # NOTE(review): presumably self.bot.uptime holds the perf_counter
        # value recorded at start-up, in seconds -- confirm.
        up = abs(self.bot.uptime - int(time.perf_counter()))
        up = str(datetime.timedelta(seconds=up))

        data = discord.Embed(title="__**Information**__",
                             colour=discord.Colour(value=11735575))
        data.add_field(name="Version", value="2.5-beta", inline=False)
        data.add_field(name="Shard ID", value=shard_id)
        data.add_field(name="Total Shards", value=shards)
        data.add_field(name="Total Servers", value=guilds)
        data.add_field(name="Users", value=users)
        data.add_field(name="Channels", value=channels)
        data.add_field(name="Uptime", value=up)
        data.add_field(name="Support Development",
                       value="Donate on [Patreon](https://www.patreon.com/franc_ist) or [PayPal](https://paypal.me/MLAutomod/5)")
        data.set_footer(
            text="Made with \U00002665 by Francis#6565. Support server: https://discord.gg/yp8WpMh")
        try:
            await msg.edit(content=None, embed=data)
            statsd.increment('bot.commands.run', 1)
        except discord.HTTPException:
            logger.exception("Missing embed links perms")
            statsd.increment('bot.commands.errored', 1)
            await ctx.send("Looks like the bot doesn't have embed links perms... It kinda needs these, so I'd suggest adding them!")
项目:youtube    作者:FishyFing    | 项目源码 | 文件源码
async def uptime(self, ctx):
        """Shows the bot's uptime"""
        # Must be a coroutine: the body awaits ctx.send().
        # NOTE(review): presumably self.bot.uptime holds the perf_counter
        # value recorded at start-up, in seconds -- confirm.
        up = abs(self.bot.uptime - int(time.perf_counter()))
        up = str(datetime.timedelta(seconds=up))
        await ctx.send("`Uptime: {}`".format(up))
        statsd.increment('bot.commands.run', 1)
项目:fingerprint-securedrop    作者:freedomofpress    | 项目源码 | 文件源码
def get_cur_runtime(self):
        """Return how long the test case has been running, as a
        datetime.timedelta measured from self.test_start_time to now."""
        elapsed = datetime.now() - self.test_start_time
        return elapsed

    # Sorter
项目:TerminalView    作者:Wramberg    | 项目源码 | 文件源码
def total_seconds(td):
    """
    Given a timedelta (*td*) return its length in seconds, as a stand-in for
    Python 2.7's :meth:`datetime.timedelta.total_seconds`.

    The value is the total number of microseconds divided by one million with
    the ``/`` operator.
    """
    whole_seconds = td.seconds + td.days * 24 * 3600
    total_microseconds = td.microseconds + whole_seconds * 10**6
    return total_microseconds / 10**6

# NOTE:  This is something I'm investigating as a way to use the new go_async
# module.  A work-in-progress.  Ignore for now...
项目:TerminalView    作者:Wramberg    | 项目源码 | 文件源码
def timeout(self):
        """
        How long a key lasts before being automatically removed.

        Described by the original author as a `property` that may be set from
        a `datetime.timedelta` object or a string like "1d" or "30s" (passed
        through the `convert_to_timedelta` function). When nothing has been
        stored yet, a one-hour `timedelta` is stored and returned.
        """
        if hasattr(self, "_timeout"):
            return self._timeout
        # Default is 1-hour timeout
        self._timeout = timedelta(hours=1)
        return self._timeout
项目:TerminalView    作者:Wramberg    | 项目源码 | 文件源码
def interval(self):
        """
        How often expired keys are checked for.

        Described by the original author as a `property` that may be set from
        milliseconds (integer), a `datetime.timedelta` object, or a string
        like "1d" or "30s" (passed through the `convert_to_timedelta`
        function). When nothing has been stored yet, 10000 is stored and
        returned.
        """
        if hasattr(self, "_interval"):
            return self._interval
        # Default is every 10 seconds
        self._interval = 10000
        return self._interval
项目:TerminalView    作者:Wramberg    | 项目源码 | 文件源码
def interval(self, value):
        """Store a new check interval and (re)start the key watcher.

        A string is first converted with `convert_to_timedelta`; a
        `timedelta` is then converted to milliseconds. Any other value is
        stored as given.
        """
        interval_ms = value
        if isinstance(interval_ms, basestring):
            interval_ms = convert_to_timedelta(interval_ms)
        if isinstance(interval_ms, timedelta):
            # PeriodicCallback uses ms
            interval_ms = total_seconds(interval_ms) * 1000
        self._interval = interval_ms

        # Restart the PeriodicCallback: stop the previous watcher, if any,
        # before replacing it
        if hasattr(self, '_key_watcher'):
            self._key_watcher.stop()
        self._key_watcher = PeriodicCallback(
            self._timeout_checker, interval_ms, io_loop=self.io_loop)
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def memorized_timedelta(seconds):
    '''Return the shared timedelta for `seconds`, creating it on first use.

    Results are kept in `_timedelta_cache`, so equal arguments always yield
    the same object.
    '''
    cached = _timedelta_cache.get(seconds)
    if cached is None:
        cached = timedelta(seconds=seconds)
        _timedelta_cache[seconds] = cached
    return cached
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def memorized_datetime(seconds):
    '''Return the shared datetime that lies `seconds` after `_epoch`.

    Results are kept in `_datetime_cache`, so equal arguments always yield
    the same object.
    '''
    cached = _datetime_cache.get(seconds)
    if cached is None:
        # NB. We can't just do datetime.utcfromtimestamp(seconds) as this
        # fails with negative values under Windows (Bug #90096)
        cached = _epoch + timedelta(seconds=seconds)
        _datetime_cache[seconds] = cached
    return cached
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def _to_seconds(td):
    '''Return the days and seconds of a timedelta as a number of seconds.

    The microseconds component is not included.
    '''
    seconds_in_days = td.days * 24 * 60 * 60
    return td.seconds + seconds_in_days
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def utcoffset(self, dt, is_dst=None):
        '''See datetime.tzinfo.utcoffset

        Returns None when dt is None. A dt that does not already carry this
        tzinfo is localized first, with is_dst passed on to localize() to
        remove ambiguity during DST transitions.

        >>> from pytz import timezone
        >>> tz = timezone('America/St_Johns')
        >>> ambiguous = datetime(2009, 10, 31, 23, 30)

        >>> tz.utcoffset(ambiguous, is_dst=False)
        datetime.timedelta(-1, 73800)

        >>> tz.utcoffset(ambiguous, is_dst=True)
        datetime.timedelta(-1, 77400)

        >>> try:
        ...     tz.utcoffset(ambiguous)
        ... except AmbiguousTimeError:
        ...     print('Ambiguous')
        Ambiguous

        '''
        if dt is None:
            return None
        if dt.tzinfo is self:
            return self._utcoffset
        localized = self.localize(dt, is_dst)
        return localized.tzinfo._utcoffset
项目:magdalena    作者:KaiRo-at    | 项目源码 | 文件源码
def dayList(backlog_days, forced_dates = None):
    """Return the sorted list of day strings to analyze.

    The list holds one ``beforeTodayString(days=n)`` entry for every ``n``
    from 0 up to ``backlog_days - 1``, followed by every entry of
    ``forced_dates`` that starts with a ``digits-digits-digits`` pattern and
    is not already in the list. The result is sorted as strings.
    """
    import re
    # range() instead of xrange() so this also runs on Python 3.
    days_to_analyze = [beforeTodayString(days=daysback)
                       for daysback in range(backlog_days)]
    for anaday in forced_dates or []:
        if re.match(r"\d+-\d+-\d+", anaday) and anaday not in days_to_analyze:
            days_to_analyze.append(anaday)
    days_to_analyze.sort()
    return days_to_analyze
项目:magdalena    作者:KaiRo-at    | 项目源码 | 文件源码
def beforeTodayString(days = 0, weeks = 0):
    """Return the UTC date ``days`` and ``weeks`` before now as 'YYYY-MM-DD'."""
    from datetime import datetime, timedelta
    offset = timedelta(days=days, weeks=weeks)
    target = datetime.utcnow() - offset
    return target.strftime('%Y-%m-%d')
项目:magdalena    作者:KaiRo-at    | 项目源码 | 文件源码
def dayStringAdd(anaday, days = 0, weeks = 0):
    """Shift the 'YYYY-MM-DD' string ``anaday`` forward by ``days`` and
    ``weeks`` and return the result in the same format."""
    from datetime import datetime, timedelta
    base = datetime.strptime(anaday, '%Y-%m-%d')
    shifted = base + timedelta(days=days, weeks=weeks)
    return shifted.strftime('%Y-%m-%d')
项目:plexivity    作者:mutschler    | 项目源码 | 文件源码
def startScheduler():
    """Create the database tables and default roles, then start the scheduler.

    The scheduler runs ``notify.task`` every ``config.SCAN_INTERVAL`` seconds,
    starting two seconds from now, in the local timezone when it can be
    determined and in UTC otherwise.
    """
    db.create_all()
    #create default roles!
    if not db.session.query(models.Role).filter(models.Role.name == "admin").first():
        admin_role = models.Role(name='admin', description='Administrator Role')
        user_role = models.Role(name='user', description='User Role')
        db.session.add(admin_role)
        db.session.add(user_role)
        db.session.commit()

    try:
        import tzlocal

        tz = tzlocal.get_localzone()
        logger.info("local timezone: %s" % tz)
    except Exception:
        # Best effort: any failure to detect the zone falls back to UTC below.
        tz = None

    # getattr: not every tzinfo object has a ``zone`` attribute.
    if not tz or getattr(tz, "zone", None) == "local":
        logger.error('Local timezone name could not be determined. Scheduler will display times in UTC for any log '
                 'messages. To resolve this set up /etc/timezone with correct time zone name.')
        tz = pytz.utc
    #in debug mode this is executed twice :(
    #DONT run flask in auto reload mode when testing this!
    scheduler = BackgroundScheduler(logger=sched_logger, timezone=tz)
    scheduler.add_job(notify.task, 'interval', seconds=config.SCAN_INTERVAL, max_instances=1,
                      start_date=datetime.datetime.now(tz) + datetime.timedelta(seconds=2))
    scheduler.start()
    # NOTE(review): this binds a local name only; if a module-level ``sched``
    # is meant to be updated, a ``global sched`` declaration is needed.
    sched = scheduler
    #notify.task()
    #notify.task()
项目:aschedule    作者:eightnoteight    | 项目源码 | 文件源码
def every(self, get_coro_or_fut, interval: _timedelta_cls, start_at: datetime,
              count=float('inf'), loop=None):
        """
        Schedule a job at (start_at, start_at + interval, start_at + 2 * interval, ....)

        If start_at already lies in the past it is first moved forward by a
        whole number of intervals.

        :param get_coro_or_fut: a callable which returns a co-routine or a future
        :param interval: a datetime.timedelta object denoting the interval between
                         consecutive job executions
        :param start_at: a datetime.datetime object denoting the start time to start
                         this schedule
        :param count: the number of times to execute this job, by default the job will be
                      executed continuously
        :param loop: event loop if the given future by get_coro_or_fut is hooked up
                     with a custom event loop
        :return: JobSchedule object so that the user can control the schedule based
                 on its future
        :raises AScheduleException: if interval is negative or count is not positive
        """
        interval_seconds = interval.total_seconds()
        if interval_seconds < 0:
            raise AScheduleException("error: invalid interval ({} seconds).".format(
                interval_seconds))
        if count <= 0:
            raise AScheduleException("error: invalid count({}).".format(count))

        until_start = start_at - datetime.now()
        if round(until_start.total_seconds()) < 0:
            # NOTE(review): a zero interval passes the check above and makes
            # this division raise ZeroDivisionError.
            missed_intervals = ceil((-until_start) / interval)
            start_at += interval * missed_intervals

        # first wait: seconds until start_at; afterwards: the interval itself,
        # repeated count - 1 times
        first_wait = round((start_at - datetime.now()).total_seconds())
        period = round(interval_seconds)
        repeats = itertools.takewhile(
            lambda pair: pair[0] < (count - 1),
            enumerate(itertools.repeat(period)))
        intervals = itertools.chain(
            [first_wait],
            (wait for _, wait in repeats))

        schedule = JobSchedule(get_coro_or_fut, intervals, loop=loop)
        self.add_schedule(schedule)
        return schedule
项目:aschedule    作者:eightnoteight    | 项目源码 | 文件源码
def every_random_interval(job, interval: timedelta, loop=None):
    """
    Run the job once per interval, at a random moment within each interval.
    example:
            run a job every day at random time
            run a job every hour at random time
    :param job: a callable(co-routine function) which returns
                a co-routine or a future or an awaitable
    :param interval: a datetime.timedelta giving the length of each window
    :param loop: io loop if the provided job is a custom future linked up
                 with a different event loop.
    :return: schedule object, so it could be cancelled at will of the user by
             aschedule.cancel(schedule)
    """
    loop = asyncio.get_event_loop() if loop is None else loop
    start = loop.time()

    def wait_time_gen():
        # For window number k, pick a random second offset inside the window
        # and yield how long to wait from now until that moment.
        window_index = 0
        while True:
            offset = random.randrange(round(interval.total_seconds()))
            yield round(start + interval.total_seconds() * window_index + offset - loop.time())
            window_index += 1

    schedule = JobSchedule(job, wait_time_gen(), loop=loop)
    # add it to default_schedule_manager, so that user can aschedule.cancel it
    default_schedule_manager.add_schedule(schedule)
    return schedule
项目:aschedule    作者:eightnoteight    | 项目源码 | 文件源码
def every_day(job, loop=None):
    """Schedule ``job`` through ``every`` with a one-day interval."""
    one_day = timedelta(days=1)
    return every(job, timedelta=one_day, loop=loop)
项目:aschedule    作者:eightnoteight    | 项目源码 | 文件源码
def every_week(job, loop=None):
    """Schedule ``job`` through ``every`` with a seven-day interval."""
    one_week = timedelta(days=7)
    return every(job, timedelta=one_week, loop=loop)
项目:aschedule    作者:eightnoteight    | 项目源码 | 文件源码
def _every_weekday(job, weekday, loop=None):
    """Schedule ``job`` through ``every`` with a seven-day interval, starting
    at the time returned by ``_nearest_weekday(weekday)``."""
    one_week = timedelta(days=7)
    first_run = _nearest_weekday(weekday)
    return every(job, timedelta=one_week, start_at=first_run, loop=loop)
项目:rc-niceties    作者:mjec    | 项目源码 | 文件源码
def set_to_default():
    """Reset every configuration key to its default value and commit."""
    import datetime
    defaults = (
        (NICETIES_OPEN, datetime.timedelta(days=14)),
        (CLOSING_TIME, datetime.time(18, 0)),
        (CLOSING_BUFFER, datetime.timedelta(minutes=30)),
        (CACHE_TIMEOUT, datetime.timedelta(days=7)),
        (INCLUDE_FACULTY, False),
        (INCLUDE_RESIDENTS, False),
    )
    # applied in the order listed above
    for key, value in defaults:
        set(key, value)
    db.session.commit()
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def memorized_timedelta(seconds):
    '''Return the shared timedelta for `seconds`, creating it on first use.

    Results are kept in `_timedelta_cache`, so equal arguments always yield
    the same object.
    '''
    cached = _timedelta_cache.get(seconds)
    if cached is None:
        cached = timedelta(seconds=seconds)
        _timedelta_cache[seconds] = cached
    return cached
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def memorized_datetime(seconds):
    '''Return the shared datetime that lies `seconds` after `_epoch`.

    Results are kept in `_datetime_cache`, so equal arguments always yield
    the same object.
    '''
    cached = _datetime_cache.get(seconds)
    if cached is None:
        # NB. We can't just do datetime.utcfromtimestamp(seconds) as this
        # fails with negative values under Windows (Bug #90096)
        cached = _epoch + timedelta(seconds=seconds)
        _datetime_cache[seconds] = cached
    return cached
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def _to_seconds(td):
    '''Return the days and seconds of a timedelta as a number of seconds.

    The microseconds component is not included.
    '''
    seconds_in_days = td.days * 24 * 60 * 60
    return td.seconds + seconds_in_days
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def utcoffset(self, dt, is_dst=None):
        '''See datetime.tzinfo.utcoffset

        Returns None when dt is None. A dt that does not already carry this
        tzinfo is localized first, with is_dst passed on to localize() to
        remove ambiguity during DST transitions.

        >>> from pytz import timezone
        >>> tz = timezone('America/St_Johns')
        >>> ambiguous = datetime(2009, 10, 31, 23, 30)

        >>> tz.utcoffset(ambiguous, is_dst=False)
        datetime.timedelta(-1, 73800)

        >>> tz.utcoffset(ambiguous, is_dst=True)
        datetime.timedelta(-1, 77400)

        >>> try:
        ...     tz.utcoffset(ambiguous)
        ... except AmbiguousTimeError:
        ...     print('Ambiguous')
        Ambiguous

        '''
        if dt is None:
            return None
        if dt.tzinfo is self:
            return self._utcoffset
        localized = self.localize(dt, is_dst)
        return localized.tzinfo._utcoffset
项目:MIT-Thesis    作者:alec-heif    | 项目源码 | 文件源码
def __init__(self, offset=0):
        """Store ``offset`` hours as a ``datetime.timedelta`` in ``self.ZERO``."""
        hours_offset = datetime.timedelta(hours=offset)
        self.ZERO = hours_offset
项目:Dshield    作者:ywjt    | 项目源码 | 文件源码
def _total_seconds(td):
    """Return the length of ``td`` in whole seconds, rounded down."""
    # Python 2.6 doesn't have a total_seconds() method on timedelta objects
    whole_seconds = td.seconds + td.days * 86400
    total_microseconds = whole_seconds * 1000000 + td.microseconds
    return total_microseconds // 1000000
项目:diva    作者:mgriley    | 项目源码 | 文件源码
def to_date_model(date):
    """Reject ``date``: this implementation always raises ``ValueError``.

    NOTE(review): presumably the fallback for unsupported argument types, with
    the supported ones handled elsewhere -- confirm.
    """
    message = "given date must be: a) ISO format string, b) datetime.date object, c) datetime.timedelta object, or d) dateutil.relativedelta object"
    raise ValueError(message)

# date obj converts to absolute date
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def _total_seconds(td):
    """Return the length of ``td`` in whole seconds, rounded down."""
    # Python 2.6 doesn't have a total_seconds() method on timedelta objects
    whole_seconds = td.seconds + td.days * 86400
    total_microseconds = whole_seconds * 1000000 + td.microseconds
    return total_microseconds // 1000000