Python sqlalchemy.func 模块,avg() 实例源码

我们从Python开源项目中,提取了以下12个代码示例,用于说明如何使用sqlalchemy.func.avg()

项目:insomnia-app    作者:k-wiz    | 项目源码 | 文件源码
def calculate_avg_sleep_over_time(user_id, start_date, end_date):
    """Compute one sleep average per consecutive 7-day window.

    Windows begin at start_date and advance 7 days at a time for as long as
    the window start is no later than end_date minus 7 days. Each window's
    average is calculate_avg_sleep(user_id, window_start, window_start + 7 days).

    Returns a tuple (averages, labels): the values returned by
    calculate_avg_sleep, and matching "month/day" strings. Note that each
    label is built from the date 7 days after its window's start (the
    window's end date), not from the window's start.
    """
    one_week = timedelta(days=7)
    weekly_averages = []
    labels = []

    window_start = start_date
    last_allowed_start = end_date - one_week
    while window_start <= last_allowed_start:
        window_end = window_start + one_week
        weekly_averages.append(
            calculate_avg_sleep(user_id, window_start, window_end))
        # The label uses the already-advanced date, i.e. the window's end.
        labels.append("%s/%s" % (window_end.month, window_end.day))
        window_start = window_end

    return weekly_averages, labels



# NOTE: Collapse into calculate_median during Phase 3 refactor.
项目:DublinBikeApp    作者:charlawl    | 项目源码 | 文件源码
def get_bikes_for_weekday(cls, dbsession, weekday, station_id):
        """Tabulate per-hour average availability for a station on a weekday.

        Selects rows for station_id where the database's weekday() of
        last_update equals `weekday`, grouped by the hour of last_update,
        averaging available_bikes and available_bike_stands.

        Returns a list starting with the header tuple
        ("Time", "Available Bikes", "Available Stands"), followed by one
        (hour, avg_bikes, avg_stands) tuple per result row with both averages
        converted to float. When the query yields no rows, a single
        (0, 0, 0) row follows the header instead.
        """
        table = [("Time", "Available Bikes", "Available Stands")]

        query = dbsession.query(
            func.hour(cls.last_update),
            func.avg(cls.available_bikes),
            func.avg(cls.available_bike_stands)
        )
        query = query.filter(
            cls.station_id == station_id,
            func.weekday(cls.last_update) == weekday
        )
        hourly_rows = query.group_by(func.hour(cls.last_update)).all()

        # No data: emit one placeholder row so callers still get a data row.
        if not hourly_rows:
            table.append((0,0,0))
            return table

        # Convert the averaged values to plain floats, row by row.
        for hour, bikes, stands in hourly_rows:
            table.append((hour, float(bikes), float(stands)))
        return table
项目:DublinBikeApp    作者:charlawl    | 项目源码 | 文件源码
def get_bikes_for_wetday(cls, dbsession, wetdate, station_id):
        """Tabulate per-hour average availability for a station on one date.

        Like get_bikes_for_weekday, but the filter is a calendar date: rows
        are kept when the database's date() of last_update equals
        wetdate.date(). Results are grouped by the hour of last_update.

        Returns a list starting with the header tuple
        ("Time", "Available Bikes", "Available Stands"), followed by one
        (hour, avg_bikes, avg_stands) tuple per result row with both averages
        converted to float. When the query yields no rows, a single
        (0, 0, 0) row follows the header instead.
        """
        table = [("Time", "Available Bikes", "Available Stands")]

        query = dbsession.query(
            func.hour(cls.last_update),
            func.avg(cls.available_bikes),
            func.avg(cls.available_bike_stands)
        )
        query = query.filter(
            cls.station_id == station_id,
            func.date(cls.last_update) == wetdate.date()
        )
        hourly_rows = query.group_by(func.hour(cls.last_update)).all()

        # No data: emit one placeholder row so callers still get a data row.
        if not hourly_rows:
            table.append((0,0,0))
            return table

        # Convert the averaged values to plain floats, row by row.
        for hour, bikes, stands in hourly_rows:
            table.append((hour, float(bikes), float(stands)))
        return table
项目:DublinBikeApp    作者:charlawl    | 项目源码 | 文件源码
def get_bikes_for_week(cls, dbsession, station_id):
        """Tabulate average available bikes per weekday for a station.

        Groups the station's rows by the database's weekday() of last_update
        and averages available_bikes within each group.

        Returns a list starting with the header tuple
        ("Day", "Available Bikes"), followed by one (day, avg_bikes) tuple per
        result row, where day is days[weekday_value] (`days` is a lookup
        defined outside this function) and the average is converted to float.
        When the query yields no rows, a single (0, 0) row follows the header.
        """
        table = [("Day", "Available Bikes")]

        query = dbsession.query(
            func.weekday(cls.last_update),
            func.avg(cls.available_bikes)
        )
        query = query.filter(cls.station_id == station_id)
        daily_rows = query.group_by(func.weekday(cls.last_update)).all()

        # No data: emit one placeholder row so callers still get a data row.
        if not daily_rows:
            table.append((0,0))
            return table

        # Translate the weekday value via `days` and convert the average.
        for weekday_value, bikes in daily_rows:
            table.append((days[weekday_value], float(bikes)))
        return table
项目:fantasy-dota-heroes    作者:ThePianoDentist    | 项目源码 | 文件源码
def recalibrate_hero_values(session, league_id):
    """Recalibrate every hero's value in a league against the league mean.

    Computes the average of Hero.points over heroes whose league equals
    league_id, then for each such hero derives a new calibration with
    calibrate_value(average_points, hero.points), blends it with the current
    value via combine_calibrations, and stores the result rounded to one
    decimal place in hero.value. Each new calibration is printed.

    If the average is unavailable (the aggregate returns None, e.g. the
    league has no heroes or every points value is NULL), the function returns
    without touching any hero instead of failing on float(None).

    The function only assigns hero.value; it does not commit the session.
    """
    average_points = session.query(func.avg(Hero.points)) \
        .filter(Hero.league == league_id) \
        .scalar()
    if average_points is None:
        # AVG over zero rows / all-NULL values is NULL: nothing to calibrate.
        return
    average_points = float(average_points)

    heroes = session.query(Hero).filter(Hero.league == league_id)
    for hero in heroes:
        new_calibration = calibrate_value(average_points, hero.points)
        # Single-argument print call: same output on Python 2 and Python 3.
        print("new calbration: %s, from %s" % (new_calibration, hero.value))
        hero.value = round(combine_calibrations(hero.value, new_calibration), 1)
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def top_rated_packages(cls, limit=10):
        """Return the best-rated packages with their rating statistics.

        Joins the 'package' and 'rating' tables, keeps packages that are not
        private and whose state is 'active', groups by package id, and orders
        by average rating then by number of ratings (both descending),
        returning at most `limit` rows.

        Returns a list of (package, average_rating, rating_count) tuples,
        where package is fetched with
        model.Session.query(model.Package).get(...) for each resulting id.
        """
        # NB: built as a plain select rather than through the ORM query API;
        # per the original note, sqla 0.4 doesn't work using both group_by
        # and apply_avg.
        package = table('package')
        rating = table('rating')

        selected = [
            package.c.id,
            func.avg(rating.c.rating),
            func.count(rating.c.rating),
        ]
        sql = select(selected, from_obj=[package.join(rating)])
        sql = sql.where(and_(package.c.private==False, package.c.state=='active'))
        sql = sql.group_by(package.c.id)
        sql = sql.order_by(func.avg(rating.c.rating).desc(),
                           func.count(rating.c.rating).desc())
        sql = sql.limit(limit)

        rows = model.Session.execute(sql).fetchall()

        # Resolve each id to its Package object, one lookup per row.
        res_pkgs = []
        for pkg_id, avg, num in rows:
            pkg = model.Session.query(model.Package).get(unicode(pkg_id))
            res_pkgs.append((pkg, avg, num))
        return res_pkgs
项目:dataviva-api    作者:DataViva    | 项目源码 | 文件源码
def aggregate(cls, value):
        """Return the SQL aggregate expression registered under `value`.

        Supported keys: 'average_age', 'average_wage', 'wage', 'jobs' and
        'average_establishment_size' (employee count divided by the count of
        distinct establishments). All expressions are built on every call;
        an unknown key raises KeyError.
        """
        expressions = {
            'average_age': func.avg(cls.age),
            'average_wage': func.avg(cls.wage),
            'wage': func.sum(cls.wage),
            'jobs': func.count(cls.employee),
            'average_establishment_size': (
                func.count(cls.employee) / func.count(distinct(cls.establishment))
            ),
        }
        return expressions[value]
项目:dataviva-api    作者:DataViva    | 项目源码 | 文件源码
def aggregate(cls, value):
        """Return the SQL aggregate expression registered under `value`.

        Supported keys: 'enrolleds' (row count), 'entrants' (sum of entrant),
        'graduates' (sum of graduate) and 'average_age'. All expressions are
        built on every call; an unknown key raises KeyError.
        """
        expressions = {
            'enrolleds': func.count(),
            'entrants': func.sum(cls.entrant),
            'graduates': func.sum(cls.graduate),
            'average_age': func.avg(cls.age),
        }
        return expressions[value]
项目:dataviva-api    作者:DataViva    | 项目源码 | 文件源码
def aggregate(cls, value):
        """Return the SQL aggregate expression registered under `value`.

        Supported keys: 'average_age', 'students' (row count), 'classes'
        (distinct sc_class count), 'average_class_size' (row count divided by
        distinct sc_class count) and 'schools' (distinct sc_school count).
        All expressions are built on every call; an unknown key raises
        KeyError.
        """
        expressions = {
            'average_age': func.avg(cls.age),
            'students': func.count(),
            'classes': func.count(distinct(cls.sc_class)),
            'average_class_size': (
                func.count() / func.count(distinct(cls.sc_class))
            ),
            'schools': func.count(distinct(cls.sc_school)),
        }
        return expressions[value]
项目:pybigquery    作者:mxmzdlv    | 项目源码 | 文件源码
def test_session_query(session, table):
    """Check that a grouped aggregate query through the session returns rows.

    Selects the string column, a labelled concat() of it, the average of the
    integer column and a sum that counts rows whose boolean column is True;
    groups by the string column and the concat label; keeps groups whose
    integer average exceeds 10; and asserts at least one row comes back.
    """
    concat_col = func.concat(table.c.string).label('concat')

    selected = (
        table.c.string,
        concat_col,
        func.avg(table.c.integer),
        # 1 for True rows, 0 otherwise, so the sum is a count of True rows.
        func.sum(case([(table.c.boolean == True, 1)], else_=0)),
    )
    query = session.query(*selected)
    query = query.group_by(table.c.string, concat_col)
    query = query.having(func.avg(table.c.integer) > 10)

    rows = query.all()
    assert len(rows) > 0
项目:insomnia-app    作者:k-wiz    | 项目源码 | 文件源码
def calculate_avg_sleep(user_id, start_date, end_date):
    """Return the user's average sleep, in hours, between two dates.

    Averages Entry.minutes_asleep over the user's entries with
    start_date <= Entry.date <= end_date (both bounds inclusive). The
    average is truncated to whole minutes with int() and then divided by
    60.0 to give hours as a float.

    Returns None when there is no average to report, i.e. the aggregate is
    NULL because no entries match (or every matching value is NULL).
    Previously this case raised TypeError from int(None).
    """
    query = db.session.query(func.avg(Entry.minutes_asleep)).filter(
        Entry.user_id == user_id,
        Entry.date >= start_date,
        Entry.date <= end_date)

    # The aggregate query yields one row with one column.
    avg_minutes = query[0][0]
    if avg_minutes is None:
        # AVG over an empty set is NULL; report "no data" instead of crashing.
        return None
    return int(avg_minutes) / 60.0
项目:insomnia-app    作者:k-wiz    | 项目源码 | 文件源码
def calculate_avg(user_id, start_date, end_date, column_name):
    """Return the average of a field over a user's entries between two dates.

    Filters entries by Entry.user_id == user_id and
    start_date <= Entry.date <= end_date (both bounds inclusive), and returns
    the first column of the first result row of the avg() query unchanged
    (no type conversion is applied here).

    NOTE(review): column_name is passed directly to func.avg(); presumably
    callers supply a column expression (e.g. an Entry attribute) rather than
    a plain string -- confirm against callers.
    """
    query = db.session.query(func.avg(column_name))
    query = query.filter(
        Entry.user_id == user_id,
        Entry.date >= start_date,
        Entry.date <= end_date)

    first_row = query[0]
    return first_row[0]