我们从 Python 开源项目中提取了以下 10 个代码示例，用于说明如何使用 sqlalchemy.func.date()。
def get_bikes_for_wetday(cls, dbsession, wetdate, station_id):
    """Return hourly average availability for one station on one calendar day.

    Similar in spirit to a per-weekday lookup, except that the day is given
    as a concrete datetime (``wetdate``) rather than a weekday.

    :param dbsession: session used to run the query.
    :param wetdate: datetime-like object; only its ``.date()`` part is used.
    :param station_id: value compared against ``cls.station_id``.
    :returns: a list whose first element is the header tuple
        ``("Time", "Available Bikes", "Available Stands")`` followed by one
        ``(hour, avg_bikes, avg_stands)`` tuple per hour present in the data,
        or by a single ``(0, 0, 0)`` tuple when the query returns no rows.
    """
    hour_of_update = func.hour(cls.last_update)

    # Averages are grouped by hour of day, restricted to the requested
    # station and to rows whose date part equals the requested day.
    hourly_rows = (
        dbsession.query(
            hour_of_update,
            func.avg(cls.available_bikes),
            func.avg(cls.available_bike_stands),
        )
        .filter(
            cls.station_id == station_id,
            func.date(cls.last_update) == wetdate.date(),
        )
        .group_by(hour_of_update)
        .all()
    )

    header = ("Time", "Available Bikes", "Available Stands")
    if not hourly_rows:
        # No data for that day: emit a single all-zero placeholder row.
        return [header, (0, 0, 0)]

    # Convert the averages to plain floats so the rows are easy to consume.
    return [header] + [
        (hour, float(bikes), float(stands))
        for hour, bikes, stands in hourly_rows
    ]
def get_current_station_info(cls, dbsession):
    """Return the most recent usage row of every station.

    "Most recent" is defined as the row with the highest ``UsageData.id``
    within each ``station_id`` group.

    :param dbsession: session used to run the query.
    :returns: list of ``(last_update, available_bike_stands,
        available_bikes)`` rows, one per station.
    """
    # Step 1: for each station, find the id of its newest row.
    newest_per_station = (
        dbsession.query(
            UsageData.station_id,
            func.max(UsageData.id).label('max_update'),
        )
        .group_by(UsageData.station_id)
        .subquery()
    )

    # Step 2: keep only the rows whose id is one of those newest ids.
    is_newest_row = and_(newest_per_station.c.max_update == UsageData.id)
    latest_rows = dbsession.query(
        UsageData.last_update,
        UsageData.available_bike_stands,
        UsageData.available_bikes,
    ).join(newest_per_station, is_newest_row)
    return latest_rows.all()
def weekly_checkins(date, user):
    """Report on which days of a week ``user`` checked in.

    :param date: passed to ``week_magic`` to obtain the days of the week to
        inspect (presumably the Monday-Friday dates of the week containing
        ``date`` -- confirm against ``week_magic``).
    :param user: the user whose check-ins are examined.
    :returns: a list of five booleans indexed by ``weekday()`` (0 = Monday);
        an entry is True when the user has a check-in on that day.
    """
    week = week_magic(date)

    # No ORDER BY: the result below does not depend on row order, and the
    # previous textual form ('Checkin.checkin_timestamp') is rejected by
    # recent SQLAlchemy releases.
    checkins = Checkin.query.filter(
        Checkin.user == user,
        func.date(Checkin.checkin_timestamp) >= week[0],
    )

    # Iterating a Query executes it, so run it exactly once and keep only the
    # calendar dates; the old nested loop re-ran the query for every day.
    checked_in_dates = {
        checkin.checkin_timestamp.date() for checkin in checkins
    }

    user_week = [False] * 5
    for day in week:
        if day in checked_in_dates:
            user_week[day.weekday()] = True
    return user_week
def admin_weekly_checkins(date, grade=None):
    """Build the query for all check-ins made during one week.

    :param date: passed to ``week_magic`` to obtain the days of the week;
        the first and fifth entries bound the date range (inclusive).
    :param grade: when not None, only check-ins whose user has this grade
        are included.
    :returns: an un-executed query of ``Checkin`` rows ordered by
        ``checkin_timestamp``.
    """
    week = week_magic(date)
    checkin_day = func.date(Checkin.checkin_timestamp)

    criteria = [checkin_day >= week[0], checkin_day <= week[4]]
    if grade is not None:
        criteria.insert(0, Checkin.user.has(grade=grade))

    # Order by the mapped column rather than the string
    # 'Checkin.checkin_timestamp': plain strings that do not resolve to a
    # label are rejected by recent SQLAlchemy releases.
    return Checkin.query.filter(and_(*criteria)).order_by(
        Checkin.checkin_timestamp
    )
def monthly_checkins(year_num, month_num, user):
    """Report on which days of a given month ``user`` checked in.

    :param year_num: calendar year of the month to inspect.
    :param month_num: calendar month (1-12) to inspect.
    :param user: the user whose check-ins are examined.
    :returns: a list with one boolean per day of the month (index 0 is the
        1st); an entry is True when the user has a check-in on that day.
    """
    days_in_month = calendar.monthrange(year_num, month_num)[1]
    month = [False] * days_in_month
    start_of_month = datetime.date(year_num, month_num, 1)

    # No ORDER BY: the flags below do not depend on row order, and the
    # previous textual form ('Checkin.checkin_timestamp') is rejected by
    # recent SQLAlchemy releases.
    checkins = Checkin.query.filter(
        Checkin.user == user,
        Checkin.checkin_timestamp >= start_of_month,
    )

    for checkin in checkins:
        checkin_date = checkin.checkin_timestamp.date()
        # The query has no upper bound, so later months must be skipped here.
        if (checkin_date.year, checkin_date.month) == (year_num, month_num):
            month[checkin_date.day - 1] = True
    return month
def checkin_user(user):
    """Record a check-in for ``user`` unless one already exists for today.

    :param user: object exposing a ``checkins`` collection and a
        ``checkedin`` flag.
    :returns: False when one of the user's check-ins is already dated today
        (nothing is written); True after a new check-in has been added and
        committed.
    """
    today = datetime.date.today()
    already_checked_in = any(
        existing.checkin_timestamp.date() == today
        for existing in user.checkins
    )
    if already_checked_in:
        return False

    new_checkin = Checkin()
    user.checkins.append(new_checkin)
    db.session.add(new_checkin)
    user.checkedin = True
    db.session.commit()
    return True
def __init__(self, date):
    """Load the check-ins of each weekday (Monday-Friday) of one ISO week.

    Populates ``monday_checkins`` ... ``friday_checkins`` with the list of
    ``Checkin`` rows whose ``checkin_week`` equals the ISO week number of
    ``date`` and whose ``checkin_day`` equals the weekday number (1-5), both
    compared as strings.

    NOTE(review): the filter matches on week number only, not on year.
    """
    week_number = str(date.isocalendar()[1])
    weekday_names = ("monday", "tuesday", "wednesday", "thursday", "friday")

    # Day numbers start at 1 for Monday, matching the stored checkin_day.
    for day_number, day_name in enumerate(weekday_names, start=1):
        day_checkins = Checkin.query.filter(
            Checkin.checkin_week == week_number,
            Checkin.checkin_day == str(day_number),
        ).all()
        setattr(self, day_name + "_checkins", day_checkins)
def get_bad_replicas_summary(rse_expression=None, from_date=None, to_date=None, session=None):
    """
    List the bad file replicas summary. Method used by the rucio-ui.

    :param rse_expression: The RSE expression.
    :param from_date: The start date (exclusive lower bound on created_at).
    :param to_date: The end date (exclusive upper bound on created_at).
    :param session: The database session in use.
    :returns: A list of dicts, one per (rse, day, reason) combination. Each
              dict maps every state seen for that combination (as a string)
              to its replica count, and also carries the keys 'rse',
              'created_at' (the day) and 'reason'.
    """
    rse_clause = []
    if rse_expression:
        rse_clause = [models.RSE.rse == rse['rse']
                      for rse in parse_expression(expression=rse_expression, session=session)]

    # Truncate created_at to day granularity with the function each backend
    # provides.
    dialect_name = session.bind.dialect.name
    if dialect_name == 'oracle':
        to_days = func.trunc(models.BadReplicas.created_at, str('DD'))
    elif dialect_name == 'mysql':
        to_days = func.date(models.BadReplicas.created_at)
    elif dialect_name == 'postgresql':
        to_days = func.date_trunc('day', models.BadReplicas.created_at)
    else:
        # SQLite's strftime() takes the format first and the time value
        # second; with the arguments swapped it cannot yield the day.
        to_days = func.strftime('%Y-%m-%d', models.BadReplicas.created_at)

    query = session.query(func.count(),
                          to_days,
                          models.RSE.rse,
                          models.BadReplicas.state,
                          models.BadReplicas.reason).filter(models.RSE.id == models.BadReplicas.rse_id)
    # To be added : HINTS
    if rse_clause:
        query = query.filter(or_(*rse_clause))
    if from_date:
        query = query.filter(models.BadReplicas.created_at > from_date)
    if to_date:
        query = query.filter(models.BadReplicas.created_at < to_date)
    summary = query.group_by(to_days,
                             models.RSE.rse,
                             models.BadReplicas.reason,
                             models.BadReplicas.state).all()

    # Fold the per-state rows into one dict per (rse, day, reason) incident.
    incidents = {}
    for count, day, rse_name, state, reason in summary:
        incidents.setdefault((rse_name, day, reason), {})[str(state)] = count

    result = []
    for (rse_name, day, reason), res in incidents.items():
        res['rse'] = rse_name
        res['created_at'] = day
        res['reason'] = reason
        result.append(res)
    return result
def list_bad_replicas_status(state=BadFilesStatus.BAD, rse=None, younger_than=None, older_than=None, limit=None, list_pfns=False, session=None):
    """
    List the bad file replicas history states. Method used by the rucio-ui.

    :param state: The state of the file (SUSPICIOUS or BAD).
    :param rse: The RSE name.
    :param younger_than: datetime object to select bad replicas younger than this date.
    :param older_than: datetime object to select bad replicas older than this date.
    :param limit: The maximum number of replicas returned.
    :param list_pfns: If true, return a de-duplicated list of PFNs obtained
                      from list_replicas instead of the list of dicts.
    :param session: The database session in use.
    :returns: A list of dicts with scope, name, rse, state, created_at and
              updated_at; or a list of PFNs when list_pfns is true.
    """
    result = []
    rse_id = None
    if rse:
        rse_id = get_rse_id(rse, session=session)

    query = session.query(models.BadReplicas.scope,
                          models.BadReplicas.name,
                          models.RSE.rse,
                          models.BadReplicas.state,
                          models.BadReplicas.created_at,
                          models.BadReplicas.updated_at)
    if state:
        query = query.filter(models.BadReplicas.state == state)
    if rse_id:
        query = query.filter(models.BadReplicas.rse_id == rse_id)
    if younger_than:
        query = query.filter(models.BadReplicas.created_at >= younger_than)
    if older_than:
        query = query.filter(models.BadReplicas.created_at <= older_than)
    query = query.filter(models.RSE.id == models.BadReplicas.rse_id)
    if limit:
        query = query.limit(limit)

    for badfile in query.yield_per(1000):
        if list_pfns:
            # Only the DID is needed as input for list_replicas below.
            result.append({'scope': badfile.scope, 'name': badfile.name, 'type': DIDType.FILE})
        else:
            result.append({'scope': badfile.scope, 'name': badfile.name, 'rse': badfile.rse,
                           'state': badfile.state, 'created_at': badfile.created_at,
                           'updated_at': badfile.updated_at})

    if list_pfns:
        # Collect PFNs without duplicates while keeping first-seen order.
        # The previous code computed list(set(reps)) and threw the result
        # away, so duplicates were returned.
        seen = set()
        reps = []
        for rep in list_replicas(result, schemes=['srm', ], unavailable=False, request_id=None,
                                 ignore_availability=True, all_states=True, session=session):
            if rse in rep['rses'] and rep['rses'][rse]:
                # The requested RSE holds the replica: keep its first PFN.
                first_pfn = rep['rses'][rse][0]
                candidates = [first_pfn] if first_pfn else []
            else:
                # Otherwise keep every PFN reported for every RSE.
                candidates = [item for row in rep['rses'].values() for item in row]
            for pfn in candidates:
                if pfn not in seen:
                    seen.add(pfn)
                    reps.append(pfn)
        result = reps

    return result
def get_streaks(
    s: sqlalchemy.orm.session.Session,
    active: Optional[bool] = None,
    limit: Optional[int] = None,
    max_age: Optional[int] = None,
) -> Sequence[Streak]:
    """Get streaks of more than one game, ordered by length (longest first).

    Parameters:
        s: session used to run the query
        active: when not None, only return streaks with this active flag
        limit: when not None, only return (up to) limit results
        max_age: when not None, only return streaks whose latest game end
            is less than this many days old

    Returns:
        List of Streak objects matching the filters.
    """
    # Roughly equivalent SQL:
    #   SELECT streaks.*, count(games.streak_id) AS streak_length
    #   FROM streaks
    #   JOIN games ON (streaks.id = games.streak_id)
    #   GROUP BY streaks.id
    #   HAVING streak_length > 1
    #   ORDER BY streak_length DESC
    length_col = func.count(Game.streak_id).label('streak_length')
    last_activity_col = func.max(Game.end).label('streak_last_activity')

    query = (
        s.query(Streak, length_col)
        .join(Streak.games)
        .group_by(Streak.id)
        .having(length_col > 1)
    )

    if max_age is not None:
        cutoff = func.date('now', '-%s day' % max_age)
        query = query.having(last_activity_col > cutoff)

    query = query.order_by(length_col.desc())

    if active is not None:
        if active:
            wanted_flag = sqlalchemy.true()
        else:
            wanted_flag = sqlalchemy.false()
        query = query.filter(Streak.active == wanted_flag)

    if limit is not None:
        query = query.limit(limit)

    # Each result row is (Streak, length) because of the extra column; callers
    # only want the Streak objects, so the length is dropped here.
    return [row.Streak for row in query.all()]