我们从Python开源项目中,提取了以下26个代码示例,用于说明如何使用sqlalchemy.func.min()。
def Get(icpe, from_date=None, to_date=None):
    """Return per-date power statistics for one iCPE.

    Args:
        icpe: Value compared against ``iCPEModel.mac_address``.
        from_date: Exclusive lower bound on ``PowerModel.date``. When omitted,
            seven days before the moment of the call.
        to_date: Exclusive upper bound on ``PowerModel.date``. When omitted,
            the moment of the call.

    Returns:
        ``False`` when no rows match; otherwise a dict with the keys
        ``'icpe'`` and ``'power'``, where ``'power'`` is a list of
        ``{'date', 'low', 'high', 'total'}`` dicts, one per grouped date.
    """
    # Defaults are resolved here rather than in the signature: an expression
    # such as ``datetime.now()`` in a default is evaluated once, when the
    # function is defined, so the window would never move afterwards.
    now = datetime.now()
    if from_date is None:
        from_date = now - timedelta(days=7)
    if to_date is None:
        to_date = now

    power_data = (
        SQL.session.query(
            PowerModel,
            label('low', func.min(PowerModel.low)),
            label('high', func.max(PowerModel.high)),
            label('total', func.sum(PowerModel.average)),
            label('date', PowerModel.date),
        )
        .join(iCPEModel)
        .filter(iCPEModel.mac_address == icpe)
        .filter(PowerModel.date > from_date)
        .filter(PowerModel.date < to_date)
        .group_by(PowerModel.date)
        .all()
    )

    if not power_data:
        return False

    return {
        'icpe': icpe,
        'power': [
            {'date': str(data.date), 'low': data.low,
             'high': data.high, 'total': data.total}
            for data in power_data
        ],
    }
def latest(icpe):
    """Return the most recent per-date heat statistics for one iCPE.

    Args:
        icpe: Value compared against ``iCPEModel.mac_address``.

    Returns:
        ``False`` when no row matches; otherwise a dict with the keys
        ``'icpe'``, ``'date'``, ``'low'``, ``'high'`` and ``'total'``.
    """
    heat_data = (
        SQL.session.query(
            HeatModel,
            label('low', func.min(HeatModel.low)),
            label('high', func.max(HeatModel.high)),
            label('total', func.sum(HeatModel.average)),
            label('date', HeatModel.date),
        )
        .join(iCPEModel)
        .filter(iCPEModel.mac_address == icpe)
        .order_by(HeatModel.date.desc())
        .group_by(HeatModel.date)
        .first()
    )

    if not heat_data:
        return False

    # Every field comes from the heat row; the previous version read 'low'
    # and 'total' from an undefined ``power_data`` name and raised NameError.
    return {
        'icpe': icpe,
        'date': str(heat_data.date),
        'low': heat_data.low,
        'high': heat_data.high,
        'total': heat_data.total,
    }
def get(icpe, from_date=None, to_date=None):
    """Return per-date heat statistics for one iCPE.

    Args:
        icpe: Value compared against ``iCPEModel.mac_address``.
        from_date: Exclusive lower bound on ``HeatModel.date``. When omitted,
            seven days before the moment of the call.
        to_date: Exclusive upper bound on ``HeatModel.date``. When omitted,
            the moment of the call.

    Returns:
        ``False`` when no rows match; otherwise a dict with the keys
        ``'icpe'`` and ``'heat'``, where ``'heat'`` is a list of
        ``{'date', 'low', 'high', 'total'}`` dicts, one per grouped date.
    """
    # Defaults are resolved at call time: a ``datetime.now()`` default in the
    # signature is evaluated only once, when the function is defined.
    now = datetime.now()
    if from_date is None:
        from_date = now - timedelta(days=7)
    if to_date is None:
        to_date = now

    heat_data = (
        SQL.session.query(
            HeatModel,
            label('low', func.min(HeatModel.low)),
            label('high', func.max(HeatModel.high)),
            label('total', func.sum(HeatModel.average)),
            label('date', HeatModel.date),
        )
        .join(iCPEModel)
        .filter(iCPEModel.mac_address == icpe)
        .filter(HeatModel.date > from_date)
        .filter(HeatModel.date < to_date)
        .group_by(HeatModel.date)
        .all()
    )

    if not heat_data:
        return False

    return {
        'icpe': icpe,
        'heat': [
            {'date': str(data.date), 'low': data.low,
             'high': data.high, 'total': data.total}
            for data in heat_data
        ],
    }
def latest(icpe):
    """Return the most recent per-date power statistics for one iCPE.

    Args:
        icpe: Value compared against ``iCPEModel.mac_address``.

    Returns:
        ``False`` when no row matches; otherwise a dict with the keys
        ``'icpe'``, ``'date'``, ``'low'``, ``'high'`` and ``'total'``.
    """
    columns = (
        PowerModel,
        label('low', func.min(PowerModel.low)),
        label('high', func.max(PowerModel.high)),
        label('total', func.sum(PowerModel.average)),
        label('date', PowerModel.date),
    )
    query = SQL.session.query(*columns).join(iCPEModel)
    query = query.filter(iCPEModel.mac_address == icpe)
    # Newest date first, so the first grouped row is the latest one.
    query = query.order_by(PowerModel.date.desc())
    query = query.group_by(PowerModel.date)
    newest = query.first()

    if not newest:
        return False

    result = {'icpe': icpe, 'date': str(newest.date)}
    result['low'] = newest.low
    result['high'] = newest.high
    result['total'] = newest.total
    return result
def estimate_remaining_time(session, spawn_id, seen):
    """Estimate the shortest and longest remaining time for a spawn.

    Looks up the recorded first/last seconds for ``spawn_id`` through
    ``get_first_last`` and widens that range with ``seen`` when it falls
    outside it.

    Returns:
        A ``(minimum, maximum)`` pair. ``(90, 1800)`` when no first value is
        recorded; otherwise values produced by ``time_until_time``.
    """
    earliest, latest_seen = get_first_last(session, spawn_id)

    if not earliest:
        return 90, 1800

    # Stretch the observed window so that it includes the current sighting.
    if seen > latest_seen:
        latest_seen = seen
    elif seen < earliest:
        earliest = seen

    if latest_seen - earliest > 1710:
        candidates = (
            earliest + 90,
            latest_seen + 90,
            earliest + 1800,
            latest_seen + 1800,
        )
        remaining = [time_until_time(moment, seen) for moment in candidates]
        return min(remaining), max(remaining)

    lower = time_until_time(latest_seen + 90, seen)
    upper = time_until_time(earliest + 1800, seen)
    return lower, upper
def get_new_packages(cls):
    '''
    Return every package together with the date of its earliest revision.

    When ``cache_enabled`` is true the result is fetched through
    ``our_cache`` under a key that embeds the date the current week started.

    @return: Returns list of new pkgs and date when they were created, in
             format: [(id, date_ordinal), ...]
    '''
    def new_packages():
        # Can't filter by time in select because 'min' function has to
        # be 'for all time' else you get first revision in the time period.
        package_revision = table('package_revision')
        revision = table('revision')
        s = select(
            [package_revision.c.id, func.min(revision.c.timestamp)],
            from_obj=[package_revision.join(revision)],
        ).group_by(package_revision.c.id).order_by(func.min(revision.c.timestamp))
        res = model.Session.execute(s).fetchall()  # [(id, datetime), ...]
        # Ordinals instead of datetime objects keep the cached value pickleable.
        return [(pkg_id, created_datetime.toordinal())
                for pkg_id, created_datetime in res]

    if cache_enabled:
        week_commences = cls.get_date_week_started(datetime.date.today())
        # Interpolate the date into the key; plain concatenation used to
        # leave a literal '%s' in the middle of it.
        key = 'all_new_packages_%s' % week_commences.strftime(DATE_FORMAT)
        return our_cache.get_value(key=key, createfunc=new_packages)
    return new_packages()
def dashboard(period=None):
    """Render ``dashboard.html`` with the chosen period and the year range.

    ``period`` is looked up in ``PERIODS``; an unknown value falls back to
    the ``'1w'`` entry. The year range is the minimum and maximum year
    extracted from ``Scrobble.played_at``.
    """
    # Each PERIODS entry is a pair; only its first item is used here.
    period, _days = PERIODS.get(period, PERIODS['1w'])

    played_year = func.extract('year', Scrobble.played_at)
    year_bounds = db.session.query(func.min(played_year), func.max(played_year)).first()
    first_year, last_year = year_bounds
    first_year = int(first_year)
    last_year = int(last_year)

    return render_template(
        'dashboard.html',
        period=period,
        year_min=first_year,
        year_max=last_year,
    )
def get_session_stats(session):
    """Summarise the time span covered by the stored sightings.

    Queries the minimum and maximum ``Sighting.expire_timestamp``, limited to
    values greater than ``SINCE_TIME`` when ``conf.REPORT_SINCE`` is truthy.

    Returns:
        A dict with ``'start'`` and ``'end'`` (datetimes built from the two
        timestamps) and ``'length_hours'`` (whole hours between them, with 0
        replaced by 1).
    """
    bounds_query = session.query(
        func.min(Sighting.expire_timestamp),
        func.max(Sighting.expire_timestamp),
    )
    if conf.REPORT_SINCE:
        bounds_query = bounds_query.filter(Sighting.expire_timestamp > SINCE_TIME)

    earliest, newest = bounds_query.one()

    hours = (newest - earliest) // 3600
    # A span shorter than one hour is reported as one hour.
    hours = 1 if hours == 0 else hours

    # Convert to datetime
    return {
        'start': datetime.fromtimestamp(earliest),
        'end': datetime.fromtimestamp(newest),
        'length_hours': hours,
    }
def get_first_last(session, spawn_id):
    """Return the (min first_seconds, max last_seconds) row for a spawn.

    Only ``Mystery`` rows for ``spawn_id`` whose ``first_seen`` is greater
    than ``conf.LAST_MIGRATION`` are considered.
    """
    seconds_range = session.query(
        func.min(Mystery.first_seconds),
        func.max(Mystery.last_seconds),
    )
    for_spawn = seconds_range.filter(Mystery.spawn_id == spawn_id)
    since_migration = for_spawn.filter(Mystery.first_seen > conf.LAST_MIGRATION)
    return since_migration.first()
def get_deleted_packages(cls):
    '''
    Return every deleted package together with the date of its earliest
    revision in the DELETED state.

    When ``cache_enabled`` is true the result is fetched through
    ``our_cache`` under a key that embeds the date the current week started.

    @return: Returns list of deleted pkgs and date when they were deleted, in
             format: [(id, date_ordinal), ...]
    '''
    def deleted_packages():
        # Can't filter by time in select because 'min' function has to
        # be 'for all time' else you get first revision in the time period.
        package_revision = table('package_revision')
        revision = table('revision')
        s = select(
            [package_revision.c.id, func.min(revision.c.timestamp)],
            from_obj=[package_revision.join(revision)],
        ).where(
            package_revision.c.state == model.State.DELETED
        ).group_by(
            package_revision.c.id
        ).order_by(
            func.min(revision.c.timestamp)
        )
        res = model.Session.execute(s).fetchall()  # [(id, datetime), ...]
        # Ordinals instead of datetime objects keep the cached value pickleable.
        return [(pkg_id, deleted_datetime.toordinal())
                for pkg_id, deleted_datetime in res]

    if cache_enabled:
        week_commences = cls.get_date_week_started(datetime.date.today())
        # Interpolate the date into the key; plain concatenation used to
        # leave a literal '%s' in the middle of it.
        key = 'all_deleted_packages_%s' % week_commences.strftime(DATE_FORMAT)
        return our_cache.get_value(key=key, createfunc=deleted_packages)
    return deleted_packages()
def get_begin_day():
    """Return the date part of the smallest ``Article.crawl_date``."""
    earliest_crawl = func.min(Article.crawl_date).label('first')
    row = db.session.query(earliest_crawl).one()
    return row.first.date()
def get_first_aid(category):
    """Return the smallest ``Article.id`` among rows filtered by ``category``."""
    lowest_id = func.min(Article.id).label('min_aid')
    row = db.session.query(lowest_id).filter_by(category=category).one()
    return row.min_aid
def __init__(self, service):
    """Store ``service`` and start with an empty reconnect-time table."""
    self.service = service
    # UID -> min time.time; a UID that has not been stored yet reads as 0.
    self.minReconnectTime = collections.defaultdict(int)
# Worker loop: runs for as long as ``self.running`` is truthy.
#
# Each pass opens ``self.service.sessionCtx()`` and calls
# ``self.reconnector.step(session)`` and ``self.actionWriter.step(session)``;
# an exception from either call is logged with ``self.log.exception`` and the
# loop carries on. It then reads the smallest ``db.Action.scheduled_at`` among
# rows whose ``prev_action`` compares equal to None and derives ``waitTime``
# from it: 30 when there is no such row, 0 when that time is already in the
# past, otherwise the ``.seconds`` of the remaining delta. ``waitTime`` is
# passed to ``self._wakeup.wait`` and ``self._wakeup`` is cleared afterwards.
#
# NOTE(review): the original indentation is lost on this line, so it is not
# visible whether the ``min(max(waitTime, 1), 10)`` clamp applies to every
# branch or only to the final ``else`` -- confirm against the upstream file.
# NOTE(review): ``timedelta.seconds`` excludes whole days; a delta of more
# than a day can therefore yield a small value here. ``total_seconds()`` may
# have been intended -- confirm.
def _run(self): """A separate daemon thread that writes back actions scheduled in the db.""" while self.running: with self.service.sessionCtx() as session: try: self.reconnector.step(session) except Exception: self.log.exception("Error reconnecting to a microbot.") try: self.actionWriter.step(session) except Exception: self.log.exception("Write actions exception.") nextActionTime = session.query( func.min(db.Action.scheduled_at) ).filter( db.Action.prev_action == None ).scalar() now = datetime.datetime.utcnow() if nextActionTime is None: waitTime = 30 elif nextActionTime < now: waitTime = 0; else: waitTime = (nextActionTime - now).seconds waitTime = min(max(waitTime, 1), 10) self._wakeup.wait(waitTime) self._wakeup.clear()
def minmax_tasks(self):
    """Find tasks minimum and maximum.

    Reads the smallest ``Task.started_on`` and the largest
    ``Task.completed_on`` through a fresh session, which is always closed
    before returning.

    @return: unix timestamps of minimum and maximum as a 2-tuple of ints, or
             0 when the query fails or either value is missing.
    """
    # Imported locally so the block does not rely on a module-level import.
    import time

    session = self.Session()
    try:
        _min = session.query(func.min(Task.started_on).label("min")).first()
        _max = session.query(func.max(Task.completed_on).label("max")).first()
        started, completed = _min[0], _max[0]
        # Aggregates yield NULL when there is nothing to aggregate; report
        # that the same way as a failed query instead of raising on None.
        if started is None or completed is None:
            return 0
        # time.mktime() on the local-time tuple is the portable spelling of
        # strftime("%s"), which is a platform extension and not available
        # everywhere.
        return (int(time.mktime(started.timetuple())),
                int(time.mktime(completed.timetuple())))
    except SQLAlchemyError as e:
        log.debug("Database error counting tasks: {0}".format(e))
        return 0
    finally:
        session.close()
def recruited_date(cls, group=None):
    """Build a scalar subquery for the earliest ``GroupPatient.from_date``.

    The subquery is correlated on ``GroupPatient.patient_id == cls.id``. It
    is restricted to ``group`` when one is given, otherwise to groups whose
    type is ``GROUP_TYPE.SYSTEM``.
    """
    memberships = join(GroupPatient, Group, GroupPatient.group_id == Group.id)

    if group is None:
        group_clause = Group.type == GROUP_TYPE.SYSTEM
    else:
        group_clause = Group.id == group.id

    earliest = (
        select([func.min(GroupPatient.from_date)])
        .select_from(memberships)
        .where(GroupPatient.patient_id == cls.id)
        .where(group_clause)
    )
    return earliest.as_scalar()
def primary_patient_number(self):
    """Return the most recently modified recruitment patient number.

    Only entries of ``self.patient_numbers`` whose ``number_group`` has a
    truthy ``is_recruitment_number_group`` are considered. Entries are ranked
    by ``(modified_date, id)``, a missing ``modified_date`` counting as
    ``datetime.min``. Returns ``None`` when nothing qualifies.
    """
    candidates = [
        number
        for number in self.patient_numbers
        if number.number_group.is_recruitment_number_group
    ]

    if not candidates:
        return None

    # The id breaks ties between entries with equal modification dates.
    return max(
        candidates,
        key=lambda number: (number.modified_date or datetime.min, number.id),
    )
def latest_demographics(self):
    """Return the most recently modified entry of ``self.patient_demographics``.

    Entries are ranked by ``(modified_date, id)``, a missing
    ``modified_date`` counting as ``datetime.min``. Returns ``None`` when the
    collection is empty.
    """
    records = self.patient_demographics

    if len(records) < 1:
        return None

    # The id breaks ties between entries with equal modification dates.
    return max(
        records,
        key=lambda record: (record.modified_date or datetime.min, record.id),
    )
def patients_by_recruitment_group_date(group, interval='month'):
    """
    Number of patients recruited by each group over time.

    Builds a CTE of distinct ``(patient_id, group_id, date)`` rows for
    non-test, current patients in ``group`` and passes it, with ``interval``,
    to ``_get_results``.
    """
    # First created_group_id per patient, ordered by membership start date.
    recruiting_group = func.first_value(GroupPatient.created_group_id).over(
        partition_by=GroupPatient.patient_id,
        order_by=GroupPatient.from_date,
    ).label('group_id')

    # Earliest membership start date per patient.
    recruited_on = func.min(GroupPatient.from_date).over(
        partition_by=GroupPatient.patient_id,
    ).label('date')

    recruits = (
        db.session.query(GroupPatient.patient_id, recruiting_group, recruited_on)
        .distinct()
        .join(GroupPatient.patient)
        .filter(GroupPatient.group_id == group.id)
        .filter(Patient.test == false())
    )

    if group.type == GROUP_TYPE.SYSTEM:
        recruits = recruits.filter(Patient.current(group) == true())
    else:
        recruits = recruits.filter(Patient.current() == true())

    return _get_results(recruits.cte(), interval)
def sort_by_group(group):
    """Build a scalar subquery for a patient's earliest ``from_date`` in ``group``.

    The subquery is correlated on ``GroupPatient.patient_id == Patient.id``.
    """
    earliest_membership = (
        db.session.query(func.min(GroupPatient.from_date))
        .filter(GroupPatient.patient_id == Patient.id)
        .filter(GroupPatient.group_id == group.id)
    )
    return earliest_membership.as_scalar()
def top_yearly_artists():
    """Render the yearly top-artist charts of the current user.

    For every year between the smallest and largest year found in
    ``Scrobble.played_at`` the artists are ranked by scrobble count. For each
    year after the first, an artist gets either its position change relative
    to the previous year's chart or ``'new'`` when it appears in no earlier
    chart.
    """
    stat_count = 1000
    show_count = 100

    play_count = func.count(Scrobble.artist).label('count')
    played_year = func.extract('year', Scrobble.played_at)
    first_year, last_year = db.session.query(
        func.min(played_year), func.max(played_year)
    ).first()
    first_year = int(first_year)
    last_year = int(last_year)

    charts = {}
    for year in range(first_year, last_year + 1):
        year_start = datetime.datetime(year, 1, 1)
        year_end = datetime.datetime(year, 12, 31, 23, 59, 59, 999999)
        yearly = db.session.query(Scrobble.artist, play_count)
        yearly = yearly.filter(Scrobble.user_id == current_user.id)
        yearly = yearly.filter(Scrobble.played_at >= year_start, Scrobble.played_at <= year_end)
        yearly = yearly.group_by(Scrobble.artist).order_by(play_count.desc())
        charts[year] = yearly.limit(stat_count).all()

    position_changes = {}
    for year in range(first_year + 1, last_year + 1):
        # artist -> 1-based position, for this year and the year before it.
        ranks = {artist: rank for rank, (artist, _plays) in enumerate(charts[year], 1)}
        previous_ranks = {
            artist: rank for rank, (artist, _plays) in enumerate(charts[year - 1], 1)
        }

        # Every artist charted in any earlier year.
        earlier_artists = set()
        for chart_year, rows in charts.items():
            if chart_year < year:
                earlier_artists.update(artist for artist, _plays in rows)

        changes = position_changes.setdefault(year, {})
        for artist, rank in ranks.items():
            if artist in previous_ranks:
                changes[artist] = previous_ranks[artist] - rank
            elif artist not in earlier_artists:
                changes[artist] = 'new'

    return render_template(
        'charts/top_yearly_artists.html',
        charts=sorted(charts.items()),
        position_changes=position_changes,
        show_count=show_count,
    )
def unique_monthly():
    """Render per-month scrobble, unique-artist and unique-track counts.

    Covers every month of every year between the smallest and largest year
    found in ``Scrobble.played_at``, counting only the current user's rows.
    The template receives ``stats`` as a sorted list of
    ``('YYYY-MM', (scrobbles, unique_artists, unique_tracks))`` pairs.
    """
    stats = {}

    col_year = func.extract('year', Scrobble.played_at)
    year_from, year_to = db.session.query(func.min(col_year), func.max(col_year)).first()
    year_from, year_to = int(year_from), int(year_to)

    for year in range(year_from, year_to + 1):
        for month in range(1, 13):
            time_from = datetime.datetime(year, month, 1)
            # First instant of the following month.
            time_to = time_from + datetime.timedelta(days=calendar.monthrange(year, month)[1])

            # The upper bound is exclusive: with '<=' a scrobble stamped at
            # exactly midnight on the 1st was counted in two adjacent months.
            in_month = (
                Scrobble.user_id == current_user.id,
                Scrobble.played_at >= time_from,
                Scrobble.played_at < time_to,
            )

            scrobbles = db.session.query(Scrobble).filter(*in_month).count()
            unique_artists = (
                db.session.query(Scrobble.artist)
                .filter(*in_month)
                .group_by(Scrobble.artist)
                .count()
            )
            unique_tracks = (
                db.session.query(Scrobble.artist, Scrobble.track)
                .filter(*in_month)
                .group_by(Scrobble.artist, Scrobble.track)
                .count()
            )

            key = '{:d}-{:02d}'.format(year, month)
            stats[key] = (scrobbles, unique_artists, unique_tracks)

    stats = sorted(stats.items())

    return render_template(
        'stats/unique.html',
        stats=stats
    )
def unique_yearly():
    """Render per-year scrobble, unique-artist and unique-track counts.

    Covers every year between the smallest and largest year found in
    ``Scrobble.played_at``, counting only the current user's rows. The
    template receives ``stats`` as a sorted list of
    ``(year, (scrobbles, unique_artists, unique_tracks))`` pairs.
    """
    played_year = func.extract('year', Scrobble.played_at)
    first_year, last_year = db.session.query(
        func.min(played_year), func.max(played_year)
    ).first()
    first_year = int(first_year)
    last_year = int(last_year)

    yearly_counts = {}
    for year in range(first_year, last_year + 1):
        year_start = datetime.datetime(year, 1, 1)
        year_end = datetime.datetime(year, 12, 31, 23, 59, 59, 999999)

        # select extract(year from played_at) as year, count(id) from scrobbles group by year;
        total_query = db.session.query(Scrobble)
        total_query = total_query.filter(Scrobble.user_id == current_user.id)
        total_query = total_query.filter(
            Scrobble.played_at >= year_start, Scrobble.played_at <= year_end
        )
        total = total_query.count()

        # select extract(year from played_at) as year, sum(1) from scrobbles group by year, artist;
        artist_query = db.session.query(Scrobble.artist)
        artist_query = artist_query.filter(Scrobble.user_id == current_user.id)
        artist_query = artist_query.filter(
            Scrobble.played_at >= year_start, Scrobble.played_at <= year_end
        )
        artists = artist_query.group_by(Scrobble.artist).count()

        track_query = db.session.query(Scrobble.artist, Scrobble.track)
        track_query = track_query.filter(Scrobble.user_id == current_user.id)
        track_query = track_query.filter(
            Scrobble.played_at >= year_start, Scrobble.played_at <= year_end
        )
        tracks = track_query.group_by(Scrobble.artist, Scrobble.track).count()

        yearly_counts[year] = (total, artists, tracks)

    return render_template(
        'stats/unique.html',
        stats=sorted(yearly_counts.items())
    )
def get_num_packages_by_week(cls):
    '''
    Return the weekly and cumulative change in the number of packages.

    Walks week by week from the earliest week present in either
    ``cls.get_by_week('new_packages')`` or
    ``cls.get_by_week('deleted_packages')`` up to today. When
    ``cache_enabled`` is true the result is fetched through ``our_cache``
    under a key that embeds the date the current week started.

    @return: [(week_commences, num_packages, cumulative_num_pkgs), ...]
    '''
    def num_packages():
        new_packages_by_week = cls.get_by_week('new_packages')
        deleted_packages_by_week = cls.get_by_week('deleted_packages')
        first_date = (min(datetime.datetime.strptime(new_packages_by_week[0][0], DATE_FORMAT),
                          datetime.datetime.strptime(deleted_packages_by_week[0][0], DATE_FORMAT))).date()
        cls._cumulative_num_pkgs = 0
        # NOTE(review): these two lists are filled below but never read
        # inside this block; kept so the per-package lookups still happen.
        new_pkgs = []
        deleted_pkgs = []

        def build_weekly_stats(week_commences, new_pkg_ids, deleted_pkg_ids):
            num_pkgs = len(new_pkg_ids) - len(deleted_pkg_ids)
            new_pkgs.extend([model.Session.query(model.Package).get(pkg_id).name
                             for pkg_id in new_pkg_ids])
            deleted_pkgs.extend([model.Session.query(model.Package).get(pkg_id).name
                                 for pkg_id in deleted_pkg_ids])
            cls._cumulative_num_pkgs += num_pkgs
            return (week_commences.strftime(DATE_FORMAT),
                    num_pkgs, cls._cumulative_num_pkgs)

        def take_week(by_week, index, week_commences):
            # Return (ids, next_index) when the entry at ``index`` belongs to
            # ``week_commences``; otherwise ([], index). The bounds check
            # avoids an IndexError once a list has been fully consumed.
            if index < len(by_week):
                entry = by_week[index]
                if datetime.datetime.strptime(entry[0], DATE_FORMAT).date() == week_commences:
                    return entry[1], index + 1
            return [], index

        week_ends = first_date
        today = datetime.date.today()
        new_package_week_index = 0
        deleted_package_week_index = 0
        weekly_numbers = []  # [(week_commences, num_packages, cumulative_num_pkgs])]
        while week_ends <= today:
            week_commences = week_ends
            week_ends = week_commences + datetime.timedelta(days=7)
            new_pkg_ids, new_package_week_index = take_week(
                new_packages_by_week, new_package_week_index, week_commences)
            deleted_pkg_ids, deleted_package_week_index = take_week(
                deleted_packages_by_week, deleted_package_week_index, week_commences)
            weekly_numbers.append(build_weekly_stats(week_commences, new_pkg_ids, deleted_pkg_ids))
        # just check we got to the end of each count
        assert new_package_week_index == len(new_packages_by_week)
        assert deleted_package_week_index == len(deleted_packages_by_week)
        return weekly_numbers

    if cache_enabled:
        week_commences = cls.get_date_week_started(datetime.date.today())
        # Interpolate the date into the key; plain concatenation used to
        # leave a literal '%s' in the middle of it.
        key = 'number_packages_%s' % week_commences.strftime(DATE_FORMAT)
        return our_cache.get_value(key=key, createfunc=num_packages)
    return num_packages()
# Move the user identified by ``uid`` into group ``gid``.
#
# The user's previous gid is read from the first matching ``User`` row and the
# row is updated to ``gid``. The code then looks for any remaining ``User``
# with the previous gid; ``Group`` and ``Role_assigned`` rows for that gid are
# deleted. While a ``Project`` with the previous gid still exists, the one
# with the smallest pid gets a new pid (one more than the largest pid already
# in ``gid``, or 1 when there is none), and the same gid/pid change is applied
# to ``Project``, ``Task``, ``Assign``, ``Comment``, ``Workflow``, ``Stage``
# and ``Log`` rows. ``transaction.commit()`` is called at the end.
#
# NOTE(review): the original indentation is lost on this line. Presumably the
# deletes and the ``while`` loop all sit under ``if not r:`` (the previous
# group has no members left) -- confirm against the upstream file.
# NOTE(review): ``q.first().gid`` assumes a user with ``uid`` exists.
# NOTE(review): the trailing comment on this line appears to introduce the
# definition that follows in the original file, not this function.
def post_add_group_member_sql(gid, uid): q = DBS.query(User).filter_by(uid=uid) gid_old = q.first().gid q.update({'gid': gid}) q = DBS.query(User).filter_by(gid=gid_old) r = q.first() if not r: q = DBS.query(Group).filter_by(gid=gid_old) q.delete() q = DBS.query(Role_assigned).filter_by(gid=gid_old) q.delete() pid_old = DBS.query(func.min(Project.pid)).filter_by(gid=gid_old) \ .first()[0] while pid_old: r = DBS.query(func.max(Project.pid)).filter_by(gid=gid).first()[0] pid = 1 if not r else r + 1 q = DBS.query(Project).filter_by(gid=gid_old, pid=pid_old) q.update({'gid': gid, 'pid': pid}) q = DBS.query(Task).filter_by(gid=gid_old, pid=pid_old) q.update({'gid': gid, 'pid': pid}) q = DBS.query(Assign).filter_by(gid=gid_old, pid=pid_old) q.update({'gid': gid, 'pid': pid}) q = DBS.query(Comment).filter_by(gid=gid_old, pid=pid_old) q.update({'gid': gid, 'pid': pid}) q = DBS.query(Workflow).filter_by(gid=gid_old, pid=pid_old) q.update({'gid': gid, 'pid': pid}) q = DBS.query(Stage).filter_by(gid=gid_old, pid=pid_old) q.update({'gid': gid, 'pid': pid}) q = DBS.query(Log).filter_by(gid=gid_old, pid=pid_old) q.update({'gid': gid, 'pid': pid}) pid_old = DBS.query(func.min(Project.pid)).filter_by(gid=gid_old) \ .first()[0] transaction.commit() # remove member from group and add them to their own group
def patients_by_group_date(group=None, group_type=None, interval='month'):
    """
    Number of patients in each group over time.

    Builds a CTE of ``(group_id, patient_id, date)`` rows, where ``date`` is
    the earliest ``GroupPatient.from_date`` per group and patient, for
    non-test, current patients, and passes it with ``interval`` to
    ``_get_results``.
    """
    first_membership = func.min(GroupPatient.from_date).label('date')
    memberships = (
        db.session.query(GroupPatient.group_id, GroupPatient.patient_id, first_membership)
        .join(GroupPatient.patient)
        .filter(Patient.test == false())
    )

    if group is not None and group.type == 'SYSTEM':
        memberships = memberships.filter(Patient.current(group) == true())
    else:
        memberships = memberships.filter(Patient.current() == true())

    # Filter by patients belonging to the specified group
    if group is not None:
        patient_alias = aliased(Patient)
        belongs_to_group = (
            db.session.query(patient_alias)
            .join(patient_alias.group_patients)
            .filter(
                patient_alias.id == Patient.id,
                GroupPatient.group == group,
            )
            .exists()
        )
        memberships = memberships.filter(belongs_to_group)

    # Filter by group type
    if group_type is not None:
        memberships = memberships.join(GroupPatient.group).filter(Group.type == group_type)

    grouped = memberships.group_by(GroupPatient.group_id, GroupPatient.patient_id)
    return _get_results(grouped.cte(), interval)