我们从 Python 开源项目中提取了以下 12 个代码示例，用于说明如何使用 sqlalchemy.func.avg()。
def calculate_avg_sleep_over_time(user_id, start_date, end_date):
    """Compute average sleep for consecutive 7-day windows.

    Starting at ``start_date``, a window of seven days is evaluated with
    ``calculate_avg_sleep`` and then the window start is advanced by seven
    days. This repeats for as long as the window start is no later than
    ``end_date`` minus seven days.

    Returns a tuple ``(averages, labels)``: the per-window values returned
    by ``calculate_avg_sleep`` and a parallel list of ``"month/day"``
    strings. Both lists are empty when the range is shorter than one window.
    """
    week = timedelta(days=7)
    last_window_start = end_date - week

    weekly_averages = []
    labels = []
    window_start = start_date
    while window_start <= last_window_start:
        window_end = window_start + week
        weekly_averages.append(
            calculate_avg_sleep(user_id, window_start, window_end))
        # NOTE(review): the label is built from the *end* of the window (the
        # next window's start), not from the window's own start -- confirm
        # this is what the consumer of the labels expects.
        labels.append("%s/%s" % (window_end.month, window_end.day))
        # NOTE(review): the next window starts on the day this one ended; if
        # calculate_avg_sleep treats both bounds as inclusive, that boundary
        # day is counted in two windows -- confirm.
        window_start = window_end
    return weekly_averages, labels
# NOTE: Collapse into calculate_median during Phase 3 refactor.
# (Note carried over unchanged; calculate_median is not visible here.)
def get_bikes_for_weekday(cls, dbsession, weekday, station_id):
    """Return hourly average availability for a station on one weekday.

    Rows are filtered to ``station_id`` and to records whose
    ``func.weekday(cls.last_update)`` equals ``weekday``, then grouped by
    ``func.hour(cls.last_update)``, giving one row per hour that has data.

    The returned list always starts with the header tuple
    ``("Time", "Available Bikes", "Available Stands")``. It is followed by
    ``(hour, avg_bikes, avg_stands)`` tuples with the averages converted to
    ``float``, or by a single ``(0, 0, 0)`` row when the query returns
    nothing.
    """
    table = [("Time", "Available Bikes", "Available Stands")]

    hourly = dbsession.query(
        func.hour(cls.last_update),
        func.avg(cls.available_bikes),
        func.avg(cls.available_bike_stands),
    )
    hourly = hourly.filter(
        cls.station_id == station_id,
        func.weekday(cls.last_update) == weekday,
    )
    hourly_rows = hourly.group_by(func.hour(cls.last_update)).all()

    # No data for this station/weekday: emit a single zero placeholder row.
    if not hourly_rows:
        table.append((0, 0, 0))
        return table

    # Convert the database averages to plain floats for the caller.
    for hour, bikes, stands in hourly_rows:
        table.append((hour, float(bikes), float(stands)))
    return table
def get_bikes_for_wetday(cls, dbsession, wetdate, station_id):
    """Return hourly average availability for a station on one calendar date.

    Close to ``get_bikes_for_weekday``, except that records are selected by
    date: ``func.date(cls.last_update)`` must equal ``wetdate.date()``, so
    ``wetdate`` has to provide a ``date()`` method. Rows are grouped by
    ``func.hour(cls.last_update)``, giving one row per hour that has data.

    The returned list always starts with the header tuple
    ``("Time", "Available Bikes", "Available Stands")``. It is followed by
    ``(hour, avg_bikes, avg_stands)`` tuples with the averages converted to
    ``float``, or by a single ``(0, 0, 0)`` row when the query returns
    nothing.
    """
    table = [("Time", "Available Bikes", "Available Stands")]

    hourly = dbsession.query(
        func.hour(cls.last_update),
        func.avg(cls.available_bikes),
        func.avg(cls.available_bike_stands),
    )
    hourly = hourly.filter(
        cls.station_id == station_id,
        func.date(cls.last_update) == wetdate.date(),
    )
    hourly_rows = hourly.group_by(func.hour(cls.last_update)).all()

    # No data for this station/date: emit a single zero placeholder row.
    if not hourly_rows:
        table.append((0, 0, 0))
        return table

    # Convert the database averages to plain floats for the caller.
    for hour, bikes, stands in hourly_rows:
        table.append((hour, float(bikes), float(stands)))
    return table
def get_bikes_for_week(cls, dbsession, station_id):
    """Return the average number of available bikes per weekday for a station.

    Records for ``station_id`` are grouped by
    ``func.weekday(cls.last_update)`` and the bikes column is averaged per
    group.

    The returned list always starts with the header tuple
    ``("Day", "Available Bikes")``. It is followed by ``(day, avg_bikes)``
    tuples, where ``day`` is looked up in the module-level ``days`` sequence
    using the weekday value from the query and the average is converted to
    ``float``. When the query returns nothing, a single ``(0, 0)`` row
    follows the header instead.
    """
    table = [("Day", "Available Bikes")]

    per_day = dbsession.query(
        func.weekday(cls.last_update),
        func.avg(cls.available_bikes),
    )
    per_day = per_day.filter(cls.station_id == station_id)
    per_day_rows = per_day.group_by(func.weekday(cls.last_update)).all()

    # No data for this station: emit a single zero placeholder row.
    if not per_day_rows:
        table.append((0, 0))
        return table

    # Translate the weekday value into its entry in `days`.
    for weekday_index, bikes in per_day_rows:
        table.append((days[weekday_index], float(bikes)))
    return table
def recalibrate_hero_values(session, league_id):
    """Recompute ``value`` for every hero in a league from the league average.

    The average of ``Hero.points`` over the league is passed, together with
    each hero's own points, to ``calibrate_value``. The result is merged with
    the hero's current value through ``combine_calibrations`` and rounded to
    one decimal place.

    The updated values are assigned on the ORM objects only; this function
    does not call ``commit`` itself.

    Args:
        session: SQLAlchemy session used for both queries.
        league_id: value compared against ``Hero.league``.
    """
    average_points = (
        session.query(func.avg(Hero.points))
        .filter(Hero.league == league_id)
        .scalar()
    )
    # AVG over zero rows (or over only NULL points) yields NULL, which
    # arrives here as None; float(None) would raise TypeError. With no
    # average there is nothing to calibrate against, so leave values as is.
    if average_points is None:
        return
    average_points = float(average_points)

    heroes = session.query(Hero).filter(Hero.league == league_id)
    for hero in heroes:
        new_calibration = calibrate_value(average_points, hero.points)
        # Single-argument print(...) emits the same text on Python 2 and 3.
        print("new calbration: %s, from %s" % (new_calibration, hero.value))
        hero.value = round(combine_calibrations(hero.value, new_calibration), 1)
def top_rated_packages(cls, limit=10):
    """Return the best-rated active, non-private packages.

    Builds a SELECT over ``package`` joined to ``rating``, grouped by
    package id and ordered by average rating and then by rating count, both
    descending, capped at ``limit`` rows.

    Returns a list of ``(package, average_rating, rating_count)`` tuples,
    where ``package`` is loaded through ``model.Session`` by its id.
    """
    # NB Not using sqlalchemy as sqla 0.4 doesn't work using both group_by
    # and apply_avg
    package = table('package')
    rating = table('rating')

    statement = select(
        [package.c.id,
         func.avg(rating.c.rating),
         func.count(rating.c.rating)],
        from_obj=[package.join(rating)],
    )
    # `== False` is intentional: it builds a SQL comparison, not a Python one.
    statement = statement.where(
        and_(package.c.private == False, package.c.state == 'active'))
    statement = statement.group_by(package.c.id)
    statement = statement.order_by(
        func.avg(rating.c.rating).desc(),
        func.count(rating.c.rating).desc(),
    )
    statement = statement.limit(limit)

    rated_ids = model.Session.execute(statement).fetchall()

    # One lookup per row to turn each package id into a Package object.
    packages = []
    for pkg_id, avg, num in rated_ids:
        pkg = model.Session.query(model.Package).get(unicode(pkg_id))
        packages.append((pkg, avg, num))
    return packages
def aggregate(cls, value):
    """Return the SQL aggregate expression registered under ``value``.

    Supported names: ``average_age``, ``average_wage``, ``wage``, ``jobs``
    and ``average_establishment_size``. Any other name raises ``KeyError``.
    All expressions are built on every call; only the requested one is
    returned.
    """
    expressions = dict(
        average_age=func.avg(cls.age),
        average_wage=func.avg(cls.wage),
        wage=func.sum(cls.wage),
        jobs=func.count(cls.employee),
        # Employees counted per distinct establishment.
        average_establishment_size=(
            func.count(cls.employee) / func.count(distinct(cls.establishment))
        ),
    )
    return expressions[value]
def aggregate(cls, value):
    """Return the SQL aggregate expression registered under ``value``.

    Supported names: ``enrolleds``, ``entrants``, ``graduates`` and
    ``average_age``. Any other name raises ``KeyError``. All expressions are
    built on every call; only the requested one is returned.
    """
    expressions = dict(
        enrolleds=func.count(),
        entrants=func.sum(cls.entrant),
        graduates=func.sum(cls.graduate),
        average_age=func.avg(cls.age),
    )
    return expressions[value]
def aggregate(cls, value):
    """Return the SQL aggregate expression registered under ``value``.

    Supported names: ``average_age``, ``students``, ``classes``,
    ``average_class_size`` and ``schools``. Any other name raises
    ``KeyError``. All expressions are built on every call; only the
    requested one is returned.
    """
    expressions = dict(
        average_age=func.avg(cls.age),
        students=func.count(),
        classes=func.count(distinct(cls.sc_class)),
        # Row count divided by the number of distinct classes.
        average_class_size=(
            func.count() / func.count(distinct(cls.sc_class))
        ),
        schools=func.count(distinct(cls.sc_school)),
    )
    return expressions[value]
def test_session_query(session, table):
    """Check that a grouped query with avg/sum/case and HAVING returns rows.

    Selects the string column, a labelled ``concat`` of it, the average of
    the integer column and a count of true booleans (a sum over a CASE).
    Groups by the string column and the labelled concat, and keeps only
    groups whose integer average is greater than 10. The test passes when at
    least one row comes back.
    """
    col_concat = func.concat(table.c.string).label('concat')

    query = session.query(
        table.c.string,
        col_concat,
        func.avg(table.c.integer),
        # `== True` is intentional: it builds a SQL comparison.
        func.sum(case([(table.c.boolean == True, 1)], else_=0)),
    )
    query = query.group_by(table.c.string, col_concat)
    query = query.having(func.avg(table.c.integer) > 10)

    rows = query.all()
    assert len(rows) > 0
def calculate_avg_sleep(user_id, start_date, end_date):
    """Return the user's average hours of sleep per night in a date range.

    Averages ``Entry.minutes_asleep`` over the user's entries whose date lies
    between ``start_date`` and ``end_date`` (both bounds inclusive), truncates
    the average to whole minutes and converts it to hours.

    Args:
        user_id: value compared against ``Entry.user_id``.
        start_date: first date included in the range.
        end_date: last date included in the range.

    Returns:
        The average as a float number of hours, or ``0.0`` when there is no
        sleep data in the range.
    """
    avg_minutes = (
        db.session.query(func.avg(Entry.minutes_asleep))
        .filter(
            Entry.user_id == user_id,
            Entry.date >= start_date,
            Entry.date <= end_date,
        )
        .scalar()
    )
    # AVG over zero rows is NULL, which arrives here as None; int(None)
    # would raise TypeError, so report "no sleep recorded" as 0.0 hours.
    if avg_minutes is None:
        return 0.0
    # int() drops fractional minutes before the conversion to hours.
    return int(avg_minutes) / 60.0
def calculate_avg(user_id, start_date, end_date, column_name):
    """Return the average of one field over a user's entries in a date range.

    ``column_name`` is handed directly to ``func.avg``. Entries are
    restricted to ``user_id`` and to dates between ``start_date`` and
    ``end_date`` (both bounds inclusive). The first column of the first
    result row is returned as delivered by the database, without conversion.
    """
    # NOTE(review): despite its name, column_name presumably needs to be a
    # column expression (e.g. an Entry attribute) rather than a plain string
    # for AVG to apply to the column -- confirm against callers.
    query = db.session.query(func.avg(column_name))
    query = query.filter(
        Entry.user_id == user_id,
        Entry.date >= start_date,
        Entry.date <= end_date,
    )
    first_row = query[0]
    return first_row[0]