Python sqlalchemy.func 模块,date() 实例源码

我们从Python开源项目中,提取了以下10个代码示例,用于说明如何使用sqlalchemy.func.date()

项目:DublinBikeApp    作者:charlawl    | 项目源码 | 文件源码
def get_bikes_for_wetday(cls, dbsession, wetdate, station_id):
    """Return hourly average availability for one station on one calendar day.

    The original author notes this mirrors ``get_bikes_for_weekday``, except
    that rows are selected by the calendar date of ``wetdate`` rather than by
    weekday.

    :param dbsession: session used to run the query.
    :param wetdate: object exposing ``.date()``; rows are kept when the date
        part of ``last_update`` equals that value.
    :param station_id: station whose rows are aggregated.
    :returns: a list starting with the header tuple
        ``("Time", "Available Bikes", "Available Stands")``, followed by one
        ``(hour, avg_bikes, avg_stands)`` tuple per hour that has data, or by
        a single ``(0, 0, 0)`` placeholder when the query returns no rows.
    """
    header = ("Time", "Available Bikes", "Available Stands")

    # One row per hour-of-day: the averages are grouped on the hour part of
    # the update timestamp.
    hourly_query = dbsession.query(
        func.hour(cls.last_update),
        func.avg(cls.available_bikes),
        func.avg(cls.available_bike_stands),
    )
    hourly_query = hourly_query.filter(
        cls.station_id == station_id,
        func.date(cls.last_update) == wetdate.date(),
    )
    hourly_rows = hourly_query.group_by(func.hour(cls.last_update)).all()

    if not hourly_rows:
        # Keep the table shape stable for callers even without data.
        return [header, (0, 0, 0)]

    table = [header]
    for hour, avg_bikes, avg_stands in hourly_rows:
        # The averages are converted to plain floats before being returned.
        table.append((hour, float(avg_bikes), float(avg_stands)))
    return table
项目:DublinBikeApp    作者:charlawl    | 项目源码 | 文件源码
def get_current_station_info(cls, dbsession):
    """Return the most recent usage row recorded for every station.

    "Most recent" is taken to be the row with the highest ``UsageData.id``
    per ``station_id``. The query targets ``UsageData`` directly; ``cls`` is
    not used.

    :param dbsession: session used to run the query.
    :returns: list of ``(last_update, available_bike_stands,
        available_bikes)`` rows, one per station.
    """
    # Subquery: the highest row id seen for each station.
    newest_per_station = (
        dbsession.query(
            UsageData.station_id,
            func.max(UsageData.id).label('max_update'),
        )
        .group_by(UsageData.station_id)
        .subquery()
    )

    # Joining on that id keeps only each station's latest row.
    is_newest_row = and_(newest_per_station.c.max_update == UsageData.id)
    latest_rows = dbsession.query(
        UsageData.last_update,
        UsageData.available_bike_stands,
        UsageData.available_bikes,
    ).join(newest_per_station, is_newest_row)
    return latest_rows.all()
项目:PhoenixNow    作者:ECGHelloWorld    | 项目源码 | 文件源码
def weekly_checkins(date, user):
    """Return which days of the week containing ``date`` the user checked in.

    :param date: any date inside the week of interest; it is passed to
        ``week_magic`` to obtain the week's days.
    :param user: the user whose check-ins are inspected.
    :returns: a five-element list of booleans indexed by ``weekday()``
        (0 = Monday); an entry is True when the user has a check-in on that
        day.

    NOTE(review): assumes ``week_magic`` yields ``datetime.date`` objects for
    Monday through Friday -- confirm against its definition.
    """
    week = week_magic(date)

    # Order by the mapped column itself: a plain string here is not a valid
    # ORDER BY expression in current SQLAlchemy releases.
    checkins = (
        Checkin.query
        .filter(Checkin.user == user,
                func.date(Checkin.checkin_timestamp) >= week[0])
        .order_by(Checkin.checkin_timestamp)
    )

    # Run the query once and keep only the calendar dates; iterating the
    # Query object inside the per-day loop would re-issue it for every day.
    checkin_dates = {checkin.checkin_timestamp.date() for checkin in checkins}

    user_week = [False, False, False, False, False]
    for day in week:
        if day in checkin_dates:
            user_week[day.weekday()] = True

    return user_week
项目:PhoenixNow    作者:ECGHelloWorld    | 项目源码 | 文件源码
def admin_weekly_checkins(date, grade=None):
    """Build the query for all check-ins in the week containing ``date``.

    :param date: any date inside the week of interest; it is passed to
        ``week_magic`` to obtain the week's days.
    :param grade: when given, only check-ins whose user has this grade are
        included; ``None`` means every user.
    :returns: an unevaluated ``Checkin`` query, ordered by check-in
        timestamp, covering ``week[0]`` through ``week[4]`` inclusive.
    """
    week = week_magic(date)

    checkin_day = func.date(Checkin.checkin_timestamp)
    conditions = [checkin_day >= week[0], checkin_day <= week[4]]
    if grade is not None:
        conditions.insert(0, Checkin.user.has(grade=grade))

    # Order by the mapped column itself: a plain string here is not a valid
    # ORDER BY expression in current SQLAlchemy releases.
    return Checkin.query.filter(and_(*conditions)).order_by(Checkin.checkin_timestamp)
项目:PhoenixNow    作者:ECGHelloWorld    | 项目源码 | 文件源码
def monthly_checkins(year_num, month_num, user):
    """Return which days of a given month the user checked in.

    :param year_num: calendar year.
    :param month_num: calendar month, 1-12.
    :param user: the user whose check-ins are inspected.
    :returns: a list with one boolean per day of the month (index 0 is the
        1st); an entry is True when the user has a check-in on that day.
    """
    days_in_month = calendar.monthrange(year_num, month_num)[1]
    month = [False] * days_in_month

    start_of_month = datetime.date(year_num, month_num, 1)
    start_of_next_month = start_of_month + datetime.timedelta(days=days_in_month)

    # Bound the query on both sides so later months are not loaded only to be
    # discarded below, and order by the mapped column rather than a plain
    # string, which current SQLAlchemy releases reject.
    checkins = (
        Checkin.query
        .filter(Checkin.user == user,
                Checkin.checkin_timestamp >= start_of_month,
                Checkin.checkin_timestamp < start_of_next_month)
        .order_by(Checkin.checkin_timestamp)
    )

    for checkin in checkins:
        day = checkin.checkin_timestamp.date()
        # Kept as a guard in case the database compares dates differently.
        if day.year == year_num and day.month == month_num:
            month[day.day - 1] = True

    return month
项目:PhoenixNow    作者:ECGHelloWorld    | 项目源码 | 文件源码
def checkin_user(user):
    """Record a check-in for ``user`` unless one already exists for today.

    :param user: object with a ``checkins`` collection and a ``checkedin``
        flag.
    :returns: False, changing nothing, when one of the user's check-ins is
        dated today. Otherwise a new ``Checkin`` is appended to the user,
        added to the session, ``user.checkedin`` is set, the session is
        committed, and True is returned.
    """
    today = datetime.date.today()

    already_checked_in = any(
        entry.checkin_timestamp.date() == today for entry in user.checkins
    )
    if already_checked_in:
        return False

    new_checkin = Checkin()
    user.checkins.append(new_checkin)
    db.session.add(new_checkin)
    user.checkedin = True
    db.session.commit()
    return True
项目:PhoenixNow    作者:ECGHelloWorld    | 项目源码 | 文件源码
def __init__(self, date):
    """Load the check-ins of each weekday for the ISO week containing ``date``.

    Populates ``monday_checkins`` through ``friday_checkins``, each a list of
    ``Checkin`` rows whose ``checkin_week`` equals the ISO week number of
    ``date`` (as a string) and whose ``checkin_day`` equals the weekday
    number ``"1"`` to ``"5"``.
    """
    week_number = str(date.isocalendar()[1])

    def checkins_on(day_number):
        # All check-ins stored for the given weekday number in this week.
        return Checkin.query.filter(
            Checkin.checkin_week == week_number,
            Checkin.checkin_day == str(day_number),
        ).all()

    self.monday_checkins = checkins_on(1)
    self.tuesday_checkins = checkins_on(2)
    self.wednesday_checkins = checkins_on(3)
    self.thursday_checkins = checkins_on(4)
    self.friday_checkins = checkins_on(5)
项目:rucio    作者:rucio01    | 项目源码 | 文件源码
def get_bad_replicas_summary(rse_expression=None, from_date=None, to_date=None, session=None):
    """
    List the bad file replicas summary. Method used by the rucio-ui.

    Bad replicas are counted per (day, RSE, reason, state). The result holds
    one dictionary per (RSE, day, reason) combination, mapping each state
    (as a string) to its count, plus the keys 'rse', 'created_at' and
    'reason'.

    :param rse_expression: The RSE expression; when given, only the RSEs it
                           resolves to are included.
    :param from_date: The start date (exclusive bound on created_at).
    :param to_date: The end date (exclusive bound on created_at).
    :param session: The database session in use.
    :returns: A list of dictionaries as described above.
    """
    rse_clause = []
    if rse_expression:
        for rse in parse_expression(expression=rse_expression, session=session):
            rse_clause.append(models.RSE.rse == rse['rse'])

    # Truncate created_at to day granularity using each backend's function.
    dialect = session.bind.dialect.name
    if dialect == 'oracle':
        to_days = func.trunc(models.BadReplicas.created_at, str('DD'))
    elif dialect == 'mysql':
        to_days = func.date(models.BadReplicas.created_at)
    elif dialect == 'postgresql':
        to_days = func.date_trunc('day', models.BadReplicas.created_at)
    else:
        # SQLite's strftime takes the format first and the time value second;
        # with the arguments swapped it evaluates to NULL for every row.
        to_days = func.strftime('%Y-%m-%d', models.BadReplicas.created_at)

    query = session.query(func.count(), to_days, models.RSE.rse, models.BadReplicas.state, models.BadReplicas.reason)
    query = query.filter(models.RSE.id == models.BadReplicas.rse_id)
    # To be added : HINTS
    if rse_clause:
        query = query.filter(or_(*rse_clause))
    if from_date:
        query = query.filter(models.BadReplicas.created_at > from_date)
    if to_date:
        query = query.filter(models.BadReplicas.created_at < to_date)
    summary = query.group_by(to_days, models.RSE.rse, models.BadReplicas.reason, models.BadReplicas.state).all()

    # Fold the per-state rows of one (rse, day, reason) into one dictionary.
    incidents = {}
    for count, day, rse_name, state, reason in summary:
        incidents.setdefault((rse_name, day, reason), {})[str(state)] = count

    result = []
    for (rse_name, day, reason), res in incidents.items():
        res['rse'] = rse_name
        res['created_at'] = day
        res['reason'] = reason
        result.append(res)
    return result
项目:rucio    作者:rucio01    | 项目源码 | 文件源码
def list_bad_replicas_status(state=BadFilesStatus.BAD, rse=None, younger_than=None, older_than=None, limit=None, list_pfns=False, session=None):
    """
    List the bad file replicas history states. Method used by the rucio-ui.

    :param state: The state of the file (SUSPICIOUS or BAD).
    :param rse: The RSE name.
    :param younger_than: datetime object to select bad replicas younger than this date.
    :param older_than:  datetime object to select bad replicas older than this date.
    :param limit: The maximum number of replicas returned.
    :param list_pfns: If True, return the PFNs of the matching replicas instead
                      of their history records.
    :param session: The database session in use.
    :returns: A list of dictionaries with the keys scope, name, rse, state,
              created_at and updated_at; or, when list_pfns is set, a list of
              distinct PFNs.
    """
    result = []
    rse_id = None
    if rse:
        rse_id = get_rse_id(rse, session=session)
    query = session.query(models.BadReplicas.scope, models.BadReplicas.name, models.RSE.rse, models.BadReplicas.state, models.BadReplicas.created_at, models.BadReplicas.updated_at)
    if state:
        query = query.filter(models.BadReplicas.state == state)
    if rse_id:
        query = query.filter(models.BadReplicas.rse_id == rse_id)
    if younger_than:
        query = query.filter(models.BadReplicas.created_at >= younger_than)
    if older_than:
        query = query.filter(models.BadReplicas.created_at <= older_than)
    query = query.filter(models.RSE.id == models.BadReplicas.rse_id)
    if limit:
        query = query.limit(limit)
    for badfile in query.yield_per(1000):
        if list_pfns:
            # Only the DID is needed here; it is fed to list_replicas below.
            result.append({'scope': badfile.scope, 'name': badfile.name, 'type': DIDType.FILE})
        else:
            result.append({'scope': badfile.scope, 'name': badfile.name, 'rse': badfile.rse, 'state': badfile.state, 'created_at': badfile.created_at, 'updated_at': badfile.updated_at})

    if not list_pfns:
        return result

    pfns = []
    for rep in list_replicas(result, schemes=['srm', ], unavailable=False, request_id=None, ignore_availability=True, all_states=True, session=session):
        if rse in rep['rses'] and rep['rses'][rse]:
            # Replica known on the requested RSE: keep its first PFN only.
            pfn = rep['rses'][rse][0]
            if pfn:
                pfns.append(pfn)
        else:
            # Otherwise collect the PFNs from every RSE holding the file.
            pfns.extend(item for row in rep['rses'].values() for item in row)

    # De-duplicate while keeping first-seen order. The previous
    # `list(set(reps))` threw its result away, so duplicates were returned.
    seen = set()
    unique_pfns = []
    for pfn in pfns:
        if pfn not in seen:
            seen.add(pfn)
            unique_pfns.append(pfn)
    return unique_pfns
项目:dcss-scoreboard    作者:zxc23    | 项目源码 | 文件源码
def get_streaks(s: sqlalchemy.orm.session.Session,
                active: Optional[bool]=None,
                limit: Optional[int]=None,
                max_age: Optional[int]=None,
                ) \
        -> Sequence[Streak]:
    """Get streaks of more than one game, ordered by length (longest first).

    Parameters:
        active: when not None, only return streaks whose active flag
            matches this value
        limit: only return (up to) limit results
        max_age: only return streaks whose latest game end is newer than
            this many days ago

    Returns:
        List of Streak objects matching the given filters.
    """
    # Equivalent SQL, roughly:
    #   SELECT streaks.*, count(games.streak_id) AS streak_length
    #   FROM streaks JOIN games ON (streaks.id = games.streak_id)
    #   GROUP BY streaks.id
    #   HAVING streak_length > 1
    #   ORDER BY streak_length DESC
    games_in_streak = func.count(Game.streak_id).label('streak_length')
    latest_game_end = func.max(Game.end).label('streak_last_activity')

    query = s.query(Streak, games_in_streak).join(Streak.games)

    if active is not None:
        wanted_flag = sqlalchemy.true() if active else sqlalchemy.false()
        query = query.filter(Streak.active == wanted_flag)

    query = query.group_by(Streak.id).having(games_in_streak > 1)

    if max_age is not None:
        cutoff = func.date('now', '-%s day' % max_age)
        query = query.having(latest_game_end > cutoff)

    query = query.order_by(games_in_streak.desc())

    if limit is not None:
        query = query.limit(limit)

    # Each row is (Streak, length). The length column exists only to drive
    # HAVING and ORDER BY, so callers receive the Streak objects alone.
    return [streak for streak, _length in query.all()]