Python sqlalchemy.func 模块,min() 实例源码

我们从Python开源项目中,提取了以下26个代码示例,用于说明如何使用sqlalchemy.func.min()

项目:NodeDefender    作者:CTSNE    | 项目源码 | 文件源码
def Get(icpe, from_date=None, to_date=None):
    """Return per-date power statistics for one iCPE.

    Args:
        icpe: value compared against ``iCPEModel.mac_address``.
        from_date: exclusive lower bound on ``PowerModel.date``. Defaults to
            seven days before the moment of the call.
        to_date: exclusive upper bound on ``PowerModel.date``. Defaults to
            the moment of the call.

    Returns:
        ``False`` when no rows match; otherwise a dict
        ``{'icpe': icpe, 'power': [...]}`` where each list entry holds the
        stringified date, the minimum ``low``, the maximum ``high`` and the
        sum of ``average`` for that date.
    """
    # Defaults are resolved per call. Writing ``datetime.now()`` directly in
    # the signature would evaluate it only once, when the function is defined,
    # leaving a long-running process with a stale window.
    now = datetime.now()
    if from_date is None:
        from_date = now - timedelta(days=7)
    if to_date is None:
        to_date = now

    power_data = SQL.session.query(PowerModel, \
                                  label('low', func.min(PowerModel.low)),
                                  label('high', func.max(PowerModel.high)),
                                  label('total', func.sum(PowerModel.average)),
                                  label('date', PowerModel.date)).\
            join(iCPEModel).\
            filter(iCPEModel.mac_address == icpe).\
            filter(PowerModel.date > from_date).\
            filter(PowerModel.date < to_date).\
            group_by(PowerModel.date).all()

    if not power_data:
        return False

    return {
        'icpe' : icpe,
        'power' : [
            {'date' : str(data.date), 'low' : data.low, 'high' : data.high,
             'total' : data.total}
            for data in power_data
        ],
    }
项目:NodeDefender    作者:CTSNE    | 项目源码 | 文件源码
def latest(icpe):
    """Return the heat statistics for the most recent date of one iCPE.

    Rows are grouped by ``HeatModel.date`` and ordered by date descending;
    only the first group is used.

    Args:
        icpe: value compared against ``iCPEModel.mac_address``.

    Returns:
        ``False`` when no row matches; otherwise a dict with the keys
        ``icpe``, ``date`` (stringified), ``low``, ``high`` and ``total``.
    """
    heat_data = SQL.session.query(HeatModel, \
                                  label('low', func.min(HeatModel.low)),
                                  label('high', func.max(HeatModel.high)),
                                  label('total', func.sum(HeatModel.average)),
                                  label('date', HeatModel.date)).\
            join(iCPEModel).\
            filter(iCPEModel.mac_address == icpe).\
            order_by(HeatModel.date.desc()).\
            group_by(HeatModel.date).first()

    if not heat_data:
        return False

    # Every field comes from heat_data; reading 'low' and 'total' from an
    # undefined power_data name would raise NameError.
    return {'icpe' : icpe, 'date' : str(heat_data.date), 'low' : heat_data.low,
            'high' : heat_data.high, 'total' : heat_data.total}
项目:NodeDefender    作者:CTSNE    | 项目源码 | 文件源码
def get(icpe, from_date=None, to_date=None):
    """Return per-date heat statistics for one iCPE.

    Args:
        icpe: value compared against ``iCPEModel.mac_address``.
        from_date: exclusive lower bound on ``HeatModel.date``. Defaults to
            seven days before the moment of the call.
        to_date: exclusive upper bound on ``HeatModel.date``. Defaults to
            the moment of the call.

    Returns:
        ``False`` when no rows match; otherwise a dict
        ``{'icpe': icpe, 'heat': [...]}`` where each list entry holds the
        stringified date, the minimum ``low``, the maximum ``high`` and the
        sum of ``average`` for that date.
    """
    # Defaults are resolved per call. Writing ``datetime.now()`` directly in
    # the signature would evaluate it only once, when the function is defined,
    # leaving a long-running process with a stale window.
    now = datetime.now()
    if from_date is None:
        from_date = now - timedelta(days=7)
    if to_date is None:
        to_date = now

    heat_data = SQL.session.query(HeatModel, \
                                  label('low', func.min(HeatModel.low)),
                                  label('high', func.max(HeatModel.high)),
                                  label('total', func.sum(HeatModel.average)),
                                  label('date', HeatModel.date)).\
            join(iCPEModel).\
            filter(iCPEModel.mac_address == icpe).\
            filter(HeatModel.date > from_date).\
            filter(HeatModel.date < to_date).\
            group_by(HeatModel.date).all()

    if not heat_data:
        return False

    return {
        'icpe' : icpe,
        'heat' : [
            {'date' : str(data.date), 'low' : data.low, 'high' : data.high,
             'total' : data.total}
            for data in heat_data
        ],
    }
项目:NodeDefender    作者:CTSNE    | 项目源码 | 文件源码
def latest(icpe):
    """Return the power statistics for the most recent date of one iCPE.

    Rows are grouped by ``PowerModel.date`` and ordered by date descending;
    only the first group is used.

    Args:
        icpe: value compared against ``iCPEModel.mac_address``.

    Returns:
        ``False`` when no row matches; otherwise a dict with the keys
        ``icpe``, ``date`` (stringified), ``low``, ``high`` and ``total``.
    """
    query = SQL.session.query(
        PowerModel,
        label('low', func.min(PowerModel.low)),
        label('high', func.max(PowerModel.high)),
        label('total', func.sum(PowerModel.average)),
        label('date', PowerModel.date),
    )
    query = query.join(iCPEModel)
    query = query.filter(iCPEModel.mac_address == icpe)
    query = query.order_by(PowerModel.date.desc())
    query = query.group_by(PowerModel.date)

    newest = query.first()
    if not newest:
        return False

    return {
        'icpe' : icpe,
        'date' : str(newest.date),
        'low' : newest.low,
        'high' : newest.high,
        'total' : newest.total,
    }
项目:Monocle    作者:Noctem    | 项目源码 | 文件源码
def estimate_remaining_time(session, spawn_id, seen):
    """Estimate a (lower, upper) pair of remaining times for a spawn.

    The first/last values come from ``get_first_last``; ``seen`` widens that
    range when it falls outside it. Each candidate moment is converted with
    ``time_until_time(moment, seen)``.

    Returns:
        ``(90, 1800)`` when no first value is known, otherwise a 2-tuple of
        ``time_until_time`` results.
    """
    earliest, newest = get_first_last(session, spawn_id)

    if not earliest:
        return 90, 1800

    # Stretch the known window so that it contains the current sighting.
    if seen > newest:
        newest = seen
    elif seen < earliest:
        earliest = seen

    if newest - earliest <= 1710:
        return (time_until_time(newest + 90, seen),
                time_until_time(earliest + 1800, seen))

    # Window wider than 1710: consider all four candidates and report the
    # extremes.
    candidates = [
        time_until_time(moment, seen)
        for moment in (earliest + 90, newest + 90, earliest + 1800, newest + 1800)
    ]
    return min(candidates), max(candidates)
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def get_new_packages(cls):
        '''
        List every package id with the date of its earliest revision.

        When ``cache_enabled`` is true the result is obtained through
        ``our_cache`` under a key that embeds the start date of the current
        week; otherwise the query is executed directly.

        @return: Returns list of new pkgs and date when they were created, in
                 format: [(id, date_ordinal), ...]
        '''
        def new_packages():
            # Can't filter by time in select because 'min' function has to
            # be 'for all time' else you get first revision in the time period.
            package_revision = table('package_revision')
            revision = table('revision')
            s = select([package_revision.c.id, func.min(revision.c.timestamp)],
                       from_obj=[package_revision.join(revision)]).\
                group_by(package_revision.c.id).\
                order_by(func.min(revision.c.timestamp))
            res = model.Session.execute(s).fetchall() # [(id, datetime), ...]
            # Datetimes are reduced to ordinals before being returned.
            return [(pkg_id, created_datetime.toordinal())
                    for pkg_id, created_datetime in res]
        if cache_enabled:
            week_commences = cls.get_date_week_started(datetime.date.today())
            # Interpolate the week into the key; concatenating would leave a
            # literal '%s' inside it.
            key = 'all_new_packages_%s' % week_commences.strftime(DATE_FORMAT)
            packages = our_cache.get_value(key=key,
                                           createfunc=new_packages)
        else:
            packages = new_packages()
        return packages
项目:scrobbler    作者:hatarist    | 项目源码 | 文件源码
def dashboard(period=None):
    """Render ``dashboard.html`` with the chosen period and the year range.

    ``period`` is looked up in ``PERIODS`` (falling back to the ``'1w'``
    entry). The minimum and maximum year of ``Scrobble.played_at`` are passed
    to the template as ``year_min`` and ``year_max``.
    """
    # Each PERIODS entry unpacks into two values; only the first is used here.
    period, _days = PERIODS.get(period, PERIODS['1w'])

    year_column = func.extract('year', Scrobble.played_at)
    bounds_query = db.session.query(func.min(year_column), func.max(year_column))
    first_year, last_year = bounds_query.first()

    context = {
        'period': period,
        'year_min': int(first_year),
        'year_max': int(last_year),
    }
    return render_template('dashboard.html', **context)
项目:Monocle    作者:Noctem    | 项目源码 | 文件源码
def get_session_stats(session):
    """Summarise the span covered by ``Sighting.expire_timestamp`` values.

    When ``conf.REPORT_SINCE`` is set, only timestamps greater than
    ``SINCE_TIME`` are considered.

    Returns:
        dict with ``start`` and ``end`` (datetimes built from the minimum and
        maximum timestamp) and ``length_hours`` (whole hours between them,
        never reported as 0).
    """
    expiry = Sighting.expire_timestamp
    query = session.query(func.min(expiry), func.max(expiry))
    if conf.REPORT_SINCE:
        query = query.filter(expiry > SINCE_TIME)

    earliest, latest = query.one()

    # A span shorter than one hour is reported as a single hour.
    length_hours = (latest - earliest) // 3600 or 1

    return {
        'start': datetime.fromtimestamp(earliest),
        'end': datetime.fromtimestamp(latest),
        'length_hours': length_hours
    }
项目:Monocle    作者:Noctem    | 项目源码 | 文件源码
def get_first_last(session, spawn_id):
    """Return the first result row holding (min first_seconds, max last_seconds).

    Only ``Mystery`` rows for ``spawn_id`` whose ``first_seen`` is greater
    than ``conf.LAST_MIGRATION`` are aggregated.
    """
    query = session.query(
        func.min(Mystery.first_seconds),
        func.max(Mystery.last_seconds),
    )
    query = query.filter(Mystery.spawn_id == spawn_id)
    query = query.filter(Mystery.first_seen > conf.LAST_MIGRATION)
    return query.first()
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def get_deleted_packages(cls):
        '''
        List every deleted package id with the date of its earliest revision
        in the deleted state.

        When ``cache_enabled`` is true the result is obtained through
        ``our_cache`` under a key that embeds the start date of the current
        week; otherwise the query is executed directly.

        @return: Returns list of deleted pkgs and date when they were deleted, in
                 format: [(id, date_ordinal), ...]
        '''
        def deleted_packages():
            # Can't filter by time in select because 'min' function has to
            # be 'for all time' else you get first revision in the time period.
            package_revision = table('package_revision')
            revision = table('revision')
            s = select([package_revision.c.id, func.min(revision.c.timestamp)], from_obj=[package_revision.join(revision)]).\
                where(package_revision.c.state==model.State.DELETED).\
                group_by(package_revision.c.id).\
                order_by(func.min(revision.c.timestamp))
            res = model.Session.execute(s).fetchall() # [(id, datetime), ...]
            # Datetimes are reduced to ordinals before being returned.
            return [(pkg_id, deleted_datetime.toordinal())
                    for pkg_id, deleted_datetime in res]
        if cache_enabled:
            week_commences = cls.get_date_week_started(datetime.date.today())
            # Interpolate the week into the key; concatenating would leave a
            # literal '%s' inside it.
            key = 'all_deleted_packages_%s' % week_commences.strftime(DATE_FORMAT)
            packages = our_cache.get_value(key=key,
                                           createfunc=deleted_packages)
        else:
            packages = deleted_packages()
        return packages
项目:BlogSpider    作者:hack4code    | 项目源码 | 文件源码
def get_begin_day():
    """Return the date part of the smallest ``Article.crawl_date``."""
    earliest = func.min(Article.crawl_date).label('first')
    row = db.session.query(earliest).one()
    return row.first.date()
项目:BlogSpider    作者:hack4code    | 项目源码 | 文件源码
def get_first_aid(category):
    """Return the smallest ``Article.id`` among articles of ``category``."""
    smallest_id = func.min(Article.id).label('min_aid')
    query = db.session.query(smallest_id).filter_by(category=category)
    row = query.one()
    return row.min_aid
项目:PyPush    作者:VRGhost    | 项目源码 | 文件源码
def __init__(self, service):
        """Store the service and start with an empty reconnect-time table."""
        # UID -> min time.time; UIDs that were never recorded read as 0.
        self.minReconnectTime = collections.defaultdict(int)
        self.service = service
项目:PyPush    作者:VRGhost    | 项目源码 | 文件源码
def _run(self):
        """A separate daemon thread that writes back actions scheduled in the db.

        Each iteration runs the reconnector and action-writer steps inside one
        session, then waits on ``self._wakeup`` for a period derived from the
        earliest scheduled action that has no previous action:

        * no such action: 30 seconds,
        * action already due: no wait,
        * otherwise: time until the action, clamped to between 1 and 10 seconds.
        """
        while self.running:
            with self.service.sessionCtx() as session:
                try:
                    self.reconnector.step(session)
                except Exception:
                    self.log.exception("Error reconnecting to a microbot.")

                try:
                    self.actionWriter.step(session)
                except Exception:
                    self.log.exception("Write actions exception.")

                # '== None' is intentional: the comparison is handed to the
                # ORM as a filter expression, so it must not become 'is None'.
                nextActionTime = session.query(
                    func.min(db.Action.scheduled_at)
                ).filter(
                    db.Action.prev_action == None  # noqa: E711
                ).scalar()

            now = datetime.datetime.utcnow()
            if nextActionTime is None:
                waitTime = 30
            elif nextActionTime < now:
                waitTime = 0
            else:
                # total_seconds() covers the whole interval; the .seconds
                # attribute drops the days component, so an action a day or
                # more away could yield a misleadingly small value.
                waitTime = (nextActionTime - now).total_seconds()
                waitTime = min(max(waitTime, 1), 10)

            self._wakeup.wait(waitTime)
            self._wakeup.clear()
项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def minmax_tasks(self):
         """Find the earliest task start and the latest task completion.

         @return: tuple of two ints produced from ``strftime("%s")`` of the
                  minimum ``started_on`` and maximum ``completed_on``; ``0``
                  when a database error occurs.

         NOTE(review): "%s" is not among Python's documented strftime
         directives and depends on the platform C library; confirm
         portability.
         """
         session = self.Session()
         try:
             earliest_row = session.query(func.min(Task.started_on).label("min")).first()
             latest_row = session.query(func.max(Task.completed_on).label("max")).first()
             started, completed = earliest_row[0], latest_row[0]
             return int(started.strftime("%s")), int(completed.strftime("%s"))
         except SQLAlchemyError as e:
             log.debug("Database error counting tasks: {0}".format(e))
             return 0
         finally:
             # The session is closed on every path, including the early returns.
             session.close()
项目:radar    作者:renalreg    | 项目源码 | 文件源码
def recruited_date(cls, group=None):
        """Build a scalar subquery for the earliest ``GroupPatient.from_date``.

        The subquery is correlated on ``GroupPatient.patient_id == cls.id``.
        With ``group`` given it is restricted to that group's id; without it,
        to groups whose type is ``GROUP_TYPE.SYSTEM``.
        """
        if group is None:
            group_condition = Group.type == GROUP_TYPE.SYSTEM
        else:
            group_condition = Group.id == group.id

        membership = join(GroupPatient, Group, GroupPatient.group_id == Group.id)

        earliest = select([func.min(GroupPatient.from_date)])
        earliest = earliest.select_from(membership)
        earliest = earliest.where(GroupPatient.patient_id == cls.id)
        earliest = earliest.where(group_condition)

        return earliest.as_scalar()
项目:radar    作者:renalreg    | 项目源码 | 文件源码
def primary_patient_number(self):
        """Return the most recently modified recruitment patient number.

        Only numbers whose ``number_group.is_recruitment_number_group`` is
        truthy are considered. Entries are ranked by ``modified_date`` (a
        missing date ranks lowest) and then by ``id``. Returns ``None`` when
        there is no candidate.
        """
        candidates = [
            number for number in self.patient_numbers
            if number.number_group.is_recruitment_number_group
        ]

        if not candidates:
            return None

        return max(
            candidates,
            key=lambda number: (number.modified_date or datetime.min, number.id),
        )
项目:radar    作者:renalreg    | 项目源码 | 文件源码
def latest_demographics(self):
        """Return the most recently modified demographics entry.

        Entries are ranked by ``modified_date`` (a missing date ranks lowest)
        and then by ``id``. Returns ``None`` when there are no entries.
        """
        entries = self.patient_demographics

        if len(entries) == 0:
            return None

        return max(
            entries,
            key=lambda entry: (entry.modified_date or datetime.min, entry.id),
        )
项目:radar    作者:renalreg    | 项目源码 | 文件源码
def patients_by_recruitment_group_date(group, interval='month'):
    """
    Number of patients recruited by each group over time.

    Builds a CTE of distinct (patient_id, group_id, date) rows for non-test,
    current patients of ``group`` and passes it, with ``interval``, to
    ``_get_results``.
    """

    patient_id = GroupPatient.patient_id

    # Per patient: the created_group_id of the row with the earliest from_date.
    recruiting_group = func.first_value(GroupPatient.created_group_id).over(
        partition_by=patient_id,
        order_by=GroupPatient.from_date,
    ).label('group_id')

    # Per patient: the earliest from_date.
    recruited_on = func.min(GroupPatient.from_date).over(
        partition_by=patient_id,
    ).label('date')

    if group.type == GROUP_TYPE.SYSTEM:
        is_current = Patient.current(group)
    else:
        is_current = Patient.current()

    query = (
        db.session.query(patient_id, recruiting_group, recruited_on)
        .distinct()
        .join(GroupPatient.patient)
        .filter(GroupPatient.group_id == group.id)
        .filter(Patient.test == false())
        .filter(is_current == true())
    )

    return _get_results(query.cte(), interval)
项目:radar    作者:renalreg    | 项目源码 | 文件源码
def sort_by_group(group):
    """Return a scalar subquery for a patient's earliest date in ``group``.

    The subquery selects the minimum ``GroupPatient.from_date`` for rows that
    match ``Patient.id`` and the id of ``group``.
    """
    earliest = (
        db.session.query(func.min(GroupPatient.from_date))
        .filter(GroupPatient.patient_id == Patient.id)
        .filter(GroupPatient.group_id == group.id)
    )
    return earliest.as_scalar()
项目:scrobbler    作者:hatarist    | 项目源码 | 文件源码
def top_yearly_artists():
    """Render the per-year artist charts for the current user.

    For every year between the minimum and maximum year of
    ``Scrobble.played_at`` the artists are ranked by scrobble count (at most
    ``stat_count`` rows per year). For every year after the first,
    ``position_changes`` records, per artist, either the rank difference
    against the previous year or ``'new'`` when the artist appears in no
    earlier chart.
    """
    stat_count = 1000
    show_count = 100

    play_count = func.count(Scrobble.artist).label('count')
    year_column = func.extract('year', Scrobble.played_at)

    raw_from, raw_to = db.session.query(
        func.min(year_column), func.max(year_column)
    ).first()
    year_from, year_to = int(raw_from), int(raw_to)

    charts = {}
    for year in range(year_from, year_to + 1):
        year_start = datetime.datetime(year, 1, 1)
        year_end = datetime.datetime(year, 12, 31, 23, 59, 59, 999999)

        yearly = db.session.query(Scrobble.artist, play_count)
        yearly = yearly.filter(Scrobble.user_id == current_user.id)
        yearly = yearly.filter(Scrobble.played_at >= year_start, Scrobble.played_at <= year_end)
        yearly = yearly.group_by(Scrobble.artist)
        yearly = yearly.order_by(play_count.desc())
        yearly = yearly.limit(stat_count)
        charts[year] = yearly.all()

    position_changes = {}
    for year in range(year_from + 1, year_to + 1):
        # artist -> 1-based rank, for this year and for the year before it
        ranks = {artist: rank for rank, (artist, _) in enumerate(charts[year], 1)}
        previous_ranks = {
            artist: rank for rank, (artist, _) in enumerate(charts[year - 1], 1)
        }

        # Every artist that appears in any chart of an earlier year.
        seen_before = {
            artist
            for past_year, past_chart in charts.items()
            if past_year < year
            for (artist, _) in past_chart
        }

        changes = position_changes.setdefault(year, {})
        for artist, rank in ranks.items():
            if artist in previous_ranks:
                changes[artist] = previous_ranks[artist] - rank
            elif artist not in seen_before:
                changes[artist] = 'new'

    charts = sorted(charts.items())

    return render_template(
        'charts/top_yearly_artists.html',
        charts=charts,
        position_changes=position_changes,
        show_count=show_count
    )
项目:scrobbler    作者:hatarist    | 项目源码 | 文件源码
def unique_monthly():
    """Render per-month scrobble statistics for the current user.

    For every month of every year between the minimum and maximum year of
    ``Scrobble.played_at`` this counts the scrobbles, the distinct artists
    and the distinct (artist, track) pairs. The template receives ``stats``
    as a list of ``('YYYY-MM', (scrobbles, unique_artists, unique_tracks))``
    sorted by key.
    """
    stats = {}

    col_year = func.extract('year', Scrobble.played_at)
    year_from, year_to = db.session.query(func.min(col_year), func.max(col_year)).first()
    year_from, year_to = int(year_from), int(year_to)

    for year in range(year_from, year_to + 1):
        for month in range(1, 13):
            time_from = datetime.datetime(year, month, 1)
            # time_to is the first instant of the following month, so it must
            # be an exclusive bound; an inclusive comparison would count a
            # scrobble played exactly at that instant in two months.
            time_to = time_from + datetime.timedelta(days=calendar.monthrange(year, month)[1])

            in_month = (
                Scrobble.user_id == current_user.id,
                Scrobble.played_at >= time_from,
                Scrobble.played_at < time_to,
            )

            scrobbles = (
                db.session.query(Scrobble)
                .filter(*in_month)
                .count()
            )
            unique_artists = (
                db.session.query(Scrobble.artist)
                .filter(*in_month)
                .group_by(Scrobble.artist)
                .count()
            )
            unique_tracks = (
                db.session.query(Scrobble.artist, Scrobble.track)
                .filter(*in_month)
                .group_by(Scrobble.artist, Scrobble.track)
                .count()
            )

            key = '{:d}-{:02d}'.format(year, month)
            stats[key] = (scrobbles, unique_artists, unique_tracks)

    stats = sorted(stats.items())

    return render_template(
        'stats/unique.html',
        stats=stats
    )
项目:scrobbler    作者:hatarist    | 项目源码 | 文件源码
def unique_yearly():
    """Render per-year scrobble statistics for the current user.

    For every year between the minimum and maximum year of
    ``Scrobble.played_at`` this counts the scrobbles, the distinct artists
    and the distinct (artist, track) pairs. The template receives ``stats``
    as a list of ``(year, (scrobbles, unique_artists, unique_tracks))``
    sorted by year.
    """
    year_column = func.extract('year', Scrobble.played_at)
    raw_from, raw_to = db.session.query(
        func.min(year_column), func.max(year_column)
    ).first()
    first_year, last_year = int(raw_from), int(raw_to)

    stats = {}
    for year in range(first_year, last_year + 1):
        year_start = datetime.datetime(year, 1, 1)
        year_end = datetime.datetime(year, 12, 31, 23, 59, 59, 999999)

        # Conditions shared by the three counts below.
        own_scrobbles_in_year = (
            Scrobble.user_id == current_user.id,
            Scrobble.played_at >= year_start,
            Scrobble.played_at <= year_end,
        )

        # Original author's SQL sketch:
        # select extract(year from played_at) as year, count(id) from scrobbles group by year;
        total = db.session.query(Scrobble).filter(*own_scrobbles_in_year).count()

        # Original author's SQL sketch:
        # select extract(year from played_at) as year, sum(1) from scrobbles group by year, artist;
        artists = db.session.query(Scrobble.artist)
        artists = artists.filter(*own_scrobbles_in_year)
        artists = artists.group_by(Scrobble.artist)

        tracks = db.session.query(Scrobble.artist, Scrobble.track)
        tracks = tracks.filter(*own_scrobbles_in_year)
        tracks = tracks.group_by(Scrobble.artist, Scrobble.track)

        stats[year] = (total, artists.count(), tracks.count())

    stats = sorted(stats.items())

    return render_template(
        'stats/unique.html',
        stats=stats
    )
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def get_num_packages_by_week(cls):
        '''
        Build week-by-week package counts from the 'new_packages' and
        'deleted_packages' series returned by ``cls.get_by_week``.

        When ``cache_enabled`` is true the result is obtained through
        ``our_cache`` under a key that embeds the start date of the current
        week; otherwise it is computed directly.

        @return: list of (week_commences, num_packages, cumulative_num_pkgs),
                 where week_commences is formatted with DATE_FORMAT
        '''
        def num_packages():
            new_packages_by_week = cls.get_by_week('new_packages')
            deleted_packages_by_week = cls.get_by_week('deleted_packages')
            # Start from the earlier of the two series' first weeks.
            first_date = (min(datetime.datetime.strptime(new_packages_by_week[0][0], DATE_FORMAT),
                              datetime.datetime.strptime(deleted_packages_by_week[0][0], DATE_FORMAT))).date()
            cls._cumulative_num_pkgs = 0
            new_pkgs = []
            deleted_pkgs = []
            def build_weekly_stats(week_commences, new_pkg_ids, deleted_pkg_ids):
                # Net change for the week: additions minus deletions.
                num_pkgs = len(new_pkg_ids) - len(deleted_pkg_ids)
                new_pkgs.extend([model.Session.query(model.Package).get(pkg_id).name for pkg_id in new_pkg_ids])
                deleted_pkgs.extend([model.Session.query(model.Package).get(pkg_id).name for pkg_id in deleted_pkg_ids])
                cls._cumulative_num_pkgs += num_pkgs
                return (week_commences.strftime(DATE_FORMAT),
                        num_pkgs, cls._cumulative_num_pkgs)
            week_ends = first_date
            today = datetime.date.today()
            new_package_week_index = 0
            deleted_package_week_index = 0
            weekly_numbers = [] # [(week_commences, num_packages, cumulative_num_pkgs])]
            while week_ends <= today:
                week_commences = week_ends
                week_ends = week_commences + datetime.timedelta(days=7)
                # Consume the next entry of each series only when it belongs
                # to the week being processed.
                if datetime.datetime.strptime(new_packages_by_week[new_package_week_index][0], DATE_FORMAT).date() == week_commences:
                    new_pkg_ids = new_packages_by_week[new_package_week_index][1]
                    new_package_week_index += 1
                else:
                    new_pkg_ids = []
                if datetime.datetime.strptime(deleted_packages_by_week[deleted_package_week_index][0], DATE_FORMAT).date() == week_commences:
                    deleted_pkg_ids = deleted_packages_by_week[deleted_package_week_index][1]
                    deleted_package_week_index += 1
                else:
                    deleted_pkg_ids = []
                weekly_numbers.append(build_weekly_stats(week_commences, new_pkg_ids, deleted_pkg_ids))
            # just check we got to the end of each count
            assert new_package_week_index == len(new_packages_by_week)
            assert deleted_package_week_index == len(deleted_packages_by_week)
            return weekly_numbers
        if cache_enabled:
            week_commences = cls.get_date_week_started(datetime.date.today())
            # Interpolate the week into the key; concatenating would leave a
            # literal '%s' inside it.
            key = 'number_packages_%s' % week_commences.strftime(DATE_FORMAT)
            weekly_numbers = our_cache.get_value(key=key,
                                                 createfunc=num_packages)
        else:
            weekly_numbers = num_packages()
        return weekly_numbers
项目:thunderingplains    作者:lyctc    | 项目源码 | 文件源码
def post_add_group_member_sql(gid, uid):
    """Move user ``uid`` into group ``gid`` and clean up the group they left.

    If the user's previous group has no users left, its ``Group`` and
    ``Role_assigned`` rows are deleted and each of its projects (lowest pid
    first) is re-homed into ``gid`` under the next free pid, together with
    the matching ``Task``, ``Assign``, ``Comment``, ``Workflow``, ``Stage``
    and ``Log`` rows. The transaction is committed at the end.
    """
    member = DBS.query(User).filter_by(uid=uid)
    gid_old = member.first().gid
    member.update({'gid': gid})

    remaining_member = DBS.query(User).filter_by(gid=gid_old).first()

    if not remaining_member:
        DBS.query(Group).filter_by(gid=gid_old).delete()
        DBS.query(Role_assigned).filter_by(gid=gid_old).delete()

        # Tables keyed by (gid, pid); updated in this order for each project.
        project_scoped = (Project, Task, Assign, Comment, Workflow, Stage, Log)

        while True:
            pid_old = DBS.query(func.min(Project.pid)).filter_by(gid=gid_old) \
                .first()[0]
            if not pid_old:
                break

            # Next pid in the target group: one past its current maximum.
            highest = DBS.query(func.max(Project.pid)).filter_by(gid=gid).first()[0]
            pid = highest + 1 if highest else 1

            for model in project_scoped:
                rows = DBS.query(model).filter_by(gid=gid_old, pid=pid_old)
                rows.update({'gid': gid, 'pid': pid})

    transaction.commit()


# remove member from group and add them to their own group
项目:radar    作者:renalreg    | 项目源码 | 文件源码
def patients_by_group_date(group=None, group_type=None, interval='month'):
    """
    Number of patients in each group over time.

    Builds a CTE of (group_id, patient_id, earliest from_date) rows for
    non-test, current patients and passes it, with ``interval``, to
    ``_get_results``.

    :param group: when given, only patients that belong to this group are
        included; for a system group, "current" is evaluated against it.
    :param group_type: when given, only memberships of groups with this type
        are counted.
    :param interval: forwarded unchanged to ``_get_results``.
    :return: whatever ``_get_results`` returns.
    """

    query = db.session.query(
        GroupPatient.group_id,
        GroupPatient.patient_id,
        func.min(GroupPatient.from_date).label('date')
    )
    query = query.join(GroupPatient.patient)
    query = query.filter(Patient.test == false())

    # Compare against the GROUP_TYPE constant, as the other group-type checks
    # do, rather than a bare string literal.
    if group is not None and group.type == GROUP_TYPE.SYSTEM:
        query = query.filter(Patient.current(group) == true())
    else:
        query = query.filter(Patient.current() == true())

    # Filter by patients belonging to the specified group
    if group is not None:
        patient_alias = aliased(Patient)
        group_subquery = db.session.query(patient_alias)
        group_subquery = group_subquery.join(patient_alias.group_patients)
        group_subquery = group_subquery.filter(
            patient_alias.id == Patient.id,
            GroupPatient.group == group,
        )
        group_subquery = group_subquery.exists()

        query = query.filter(group_subquery)

    # Filter by group type
    if group_type is not None:
        query = query.join(GroupPatient.group)
        query = query.filter(Group.type == group_type)

    query = query.group_by(GroupPatient.group_id, GroupPatient.patient_id)
    query = query.cte()

    results = _get_results(query, interval)

    return results