我们从Python开源项目中，提取了以下12个代码示例，用于说明如何使用sqlalchemy.sql.func.max()。
def fetch_buy_or_sell_item_market_stats(item_id, is_buy_order):
    """Aggregate statistics over the unexpired market orders of one item.

    Only orders whose ``expire_time`` is later than the current UTC time and
    that match both ``item_id`` and ``is_buy_order`` are taken into account.

    Args:
        item_id: Value matched against ``MarketOrder.item_id``.
        is_buy_order: Value matched against ``MarketOrder.is_buy_order``.

    Returns:
        A dict with the keys ``total_volume``, ``price_average`` (rounded to
        two decimals), ``price_minimum``, ``price_maximum`` and
        ``order_count``. Every aggregate except ``order_count`` is replaced
        by ``0`` when it comes back falsy (for example ``None``).
    """
    aggregates = (
        func.sum(MarketOrder.amount).label('total_volume'),
        func.avg(MarketOrder.price).label('price_average'),
        func.min(MarketOrder.price).label('price_minimum'),
        func.max(MarketOrder.price).label('price_maximum'),
        func.count(MarketOrder.id).label('order_count'),
    )
    active_orders = MarketOrder.query.filter(
        MarketOrder.expire_time > datetime.utcnow()
    )
    matching_orders = active_orders.filter_by(
        item_id=item_id, is_buy_order=is_buy_order
    )
    row = matching_orders.with_entities(*aggregates).one()

    # The average is converted to float before rounding; a falsy average
    # (None or zero) is reported as plain 0.
    if row.price_average:
        average = round(float(row.price_average), 2)
    else:
        average = 0

    return {
        'total_volume': row.total_volume or 0,
        'price_average': average,
        'price_minimum': row.price_minimum or 0,
        'price_maximum': row.price_maximum or 0,
        'order_count': row.order_count,
    }
def register():
    """Register new user.

    Validates the submitted ``RegisterForm``. On failure the form errors are
    flashed and the registration page is rendered again. On success a user
    is created with the next free ``user_id`` and the client is redirected
    to ``public.home``.
    """
    form = RegisterForm(request.form, csrf_enabled=False)

    if not form.validate_on_submit():
        flash_errors(form)
        return render_template('public/register.html', form=form)

    # The next id is the current maximum plus one; an empty table (scalar()
    # gives a falsy value) starts at 1.
    # NOTE(review): max()+1 is computed without any locking visible here, so
    # two concurrent registrations could pick the same id -- confirm.
    highest_user_id = db.session.query(func.max(User.user_id)).scalar()
    next_user_id = (highest_user_id or 0) + 1

    User.create(
        user_id=next_user_id,
        username=form.username.data,
        email=form.email.data,
        password=form.password.data,
        first_name=form.first_name.data,
        last_name=form.last_name.data,
        active=True
    )
    flash("Thanks for registering! You're now logged in.", 'success')
    return redirect(url_for('public.home'))
def get_max_group_id(self, session):
    """Return the maximum group_id of table `article`.

    Parameters
    ----------
    session : object
        A SQLAlchemy Session instance.

    Returns
    -------
    int
        The largest ``Article.group_id``, or 0 when the query yields
        ``None``.
    """
    max_group_id = session.query(func.max(Article.group_id)).scalar()
    if max_group_id is None:
        return 0
    return max_group_id
def last_generation(run_id):
    """Find the latest generation stored in the database for a run.

    Args:
        run_id (str): identification string for run.

    Returns:
        The maximum ``Material.generation`` among rows whose ``run_id``
        matches, as returned by the database.
    """
    generation_query = session.query(func.max(Material.generation)).filter(
        Material.run_id == run_id,
    )
    # Indexing the query fetches its first row; the aggregate is that row's
    # only column.
    first_row = generation_query[0]
    return first_row[0]
def mls_draft_format(self, dtype):
    """Return the highest round and highest selection for a draft type.

    The lookup is restricted to ``PlayerDrafts`` rows whose ``path`` equals
    *dtype* and whose ``year`` equals ``self.year``.

    :param dtype: Draft type (AcquisitionType)
    :return: The single result row ``(max round, max selection)``.
    """
    highest_round = func.max(PlayerDrafts.round)
    highest_selection = func.max(PlayerDrafts.selection)
    draft_query = self.session.query(highest_round, highest_selection)
    draft_query = draft_query.filter(
        PlayerDrafts.path == dtype,
        PlayerDrafts.year == self.year,
    )
    return draft_query.one()
def random_data():
    """Return up to 12 randomly picked comments together with their song.

    A random pivot between the smallest and largest ``Comment163.id`` is
    drawn for every sample, and one comment with an id above the pivot is
    fetched, joined with its ``music163`` row.

    Returns:
        A list of dicts shaped like
        ``{"txt", "like", "author", "song": {"name", "author", "id"}}``.
        The list is empty when the comment table has no rows. It is shorter
        than 12 when a pivot finds no joined row, for instance when the
        pivot lands on the largest id.
    """
    id_range = settings.Session.query(
        func.min(Comment163.id), func.max(Comment163.id)
    ).all()[0]
    lowest_id, highest_id = id_range[0], id_range[1]

    # min()/max() over an empty table give None, which random.uniform()
    # cannot handle; there is nothing to sample in that case.
    if lowest_id is None or highest_id is None:
        return []

    data = []
    for _ in range(12):
        pivot = random.uniform(lowest_id, highest_id)
        # The interpolated value is a number produced by random.uniform(),
        # not external input, so the statement is left string-built.
        row = settings.engine.execute(
            "select txt,liked,a.author,song_name,a.song_id,b.author from comment163 a inner join music163 b on a.song_id= b.song_id where a.id>"
            + str(pivot) + " limit 1"
        ).fetchone()
        if row is None:
            # No joined comment above this pivot: skip the sample instead
            # of failing on row[0].
            continue
        data.append({
            "txt": row[0],
            "like": row[1],
            "author": row[2],
            "song": {"name": row[3], "author": row[5], "id": row[4]},
        })
    return data
def revision(self):
    """Return the most recent ``PostRef.updated_at`` value as a string.

    The aggregate is passed through ``str()`` unconditionally, so a ``None``
    result is returned as the text ``'None'``.
    """
    newest_update = func.max(self.PostRef.updated_at)
    first_row = self.session.query(newest_update).first()
    return str(first_row[0])
def get_date_range_from_analysis(self):
    """Return the newest and oldest ``Exercise_Analysis.date`` in one row.

    The row exposes the values as ``end_date`` (maximum) and ``start_date``
    (minimum). Any exception raised while querying is logged and swallowed,
    in which case ``None`` is returned.
    """
    date_range = None
    try:
        # The column expressions are built inside the try block so that
        # failures while building them are handled like query failures.
        newest = func.max(Exercise_Analysis.date).label("end_date")
        oldest = func.min(Exercise_Analysis.date).label("start_date")
        date_range = self.session.query(newest, oldest).one()
    except Exception as exc:
        log.error("---------------------------------------")
        log.error("Error - get_date_range_from_analysis(): %s", exc)
    log.debug("get_date_range_from_analysis(): %s", date_range)
    return date_range
def _get_max_flavor_id(context):
    """Return the largest ``Flavors.id`` visible through ``context.session``."""
    highest_id = func.max(api_models.Flavors.id)
    only_row = context.session.query(highest_id).one()
    return only_row[0]
def _iter_rows_by_id_window(session, model, step):
    """Yield every row of *model*, loaded in id windows of width *step*.

    Windows are half-open (``low <= id < low + step``) so that a row sitting
    on a window boundary is loaded exactly once. Nothing is yielded for an
    empty table. After each window the new lower bound is printed as a
    progress marker.
    """
    low, high = session.query(func.min(model.id), func.max(model.id)).one()
    if low is None or high is None:
        # min()/max() over an empty table give None; nothing to iterate.
        return
    while low <= high:
        upper = low + step
        window = session.query(model).filter(
            model.id >= low, model.id < upper
        ).all()
        for row in window:
            yield row
        low = upper
        print('min_id:%s' % low)


def _income_pair(row):
    """Return ``(year_total_income, person_year_total_income)`` of *row*."""
    return (row.year_total_income, row.person_year_total_income)


def _apply_next_year(row, next_year_incomes):
    """Copy the following year's incomes onto *row* when its card is known.

    Rows whose ``card_number`` is missing from *next_year_incomes* are left
    untouched and reported on stdout.
    """
    incomes = next_year_incomes.get(row.card_number)
    if incomes is None:
        # The '???' marker text is kept exactly as it appears in the
        # original code.
        print('%s %s' % (row.id, '???'))
        return
    row.ny_is_poor = 1
    row.ny_total_income, row.ny_person_income = incomes


def update_next_year():
    """Fill the "next year" columns of the 2015 and 2014 tables.

    For every ``Pinkunhu2015`` row whose ``card_number`` also occurs in
    ``Pinkunhu2016``, ``ny_is_poor`` is set to 1 and ``ny_total_income`` /
    ``ny_person_income`` receive the 2016 income figures. ``Pinkunhu2014``
    rows are updated the same way from the 2015 figures. All changes are
    flushed and committed once at the end.
    """
    step = 100  # rows are loaded in id windows of this width
    session = get_db_session()

    # 2016: only collected, never modified.
    incomes_2016 = {}
    for row in _iter_rows_by_id_window(session, Pinkunhu2016, step):
        incomes_2016[row.card_number] = _income_pair(row)

    # 2015: collected for the 2014 pass and updated from 2016.
    incomes_2015 = {}
    for row in _iter_rows_by_id_window(session, Pinkunhu2015, step):
        incomes_2015[row.card_number] = _income_pair(row)
        _apply_next_year(row, incomes_2016)

    # 2014: updated from 2015.
    for row in _iter_rows_by_id_window(session, Pinkunhu2014, step):
        _apply_next_year(row, incomes_2015)

    session.flush()
    session.commit()
def init_table(table, year, kind, start_id=2543595):
    """Copy the rows of *table* into the ``families`` Mongo collection.

    Rows are read in id order, ``step`` at a time. Every mapped attribute of
    a row is collected into a dict and stored on the family document whose
    ``family_id`` equals the row's ``sid``, under the field
    ``"<year>.<kind>"``. The document is created if it does not exist.

    :param table: Mapped class to read; must expose ``id`` and ``sid``.
    :param year: First component of the target field name.
    :param kind: Second component of the target field name. For
        ``'bangfuren'`` and ``'pinkunhu'`` the field is overwritten
        (``$set``); for ``'pinkunjiating'`` the dict is appended
        (``$push``). Any other value writes nothing.
    :param start_id: Only rows with an id greater than this are copied.
        Pass ``None`` to start from the first row of the table.
        NOTE(review): the default is the start id that was hard-coded in
        the original code, kept so existing callers behave the same;
        presumably it was a resume point for one particular table.
    :return: None
    """
    step = 100  # rows fetched per query
    session = get_db_session()
    mongo = get_db()

    min_id, max_id = session.query(
        func.min(table.id), func.max(table.id)
    ).one()
    if not min_id:
        return

    last_id = min_id - 1 if start_id is None else start_id

    # Loop-invariant: attribute names of the mapped class and target field.
    column_keys = [col.key for col in inspect(table).attrs]
    field = '%s.%s' % (year, kind)

    while last_id < max_id:
        # Paging on "id > last_id" is only correct when each page is ordered
        # by id; otherwise the last row of a page need not carry the largest
        # id and rows could be skipped.
        batch = (
            session.query(table)
            .filter(table.id > last_id)
            .order_by(table.id)
            .limit(step)
            .all()
        )
        if not batch:
            # No progress is possible (e.g. rows disappeared meanwhile);
            # stop instead of looping forever.
            break

        for item in batch:
            last_id = item.id
            fid = item.sid
            data = {key: getattr(item, key) for key in column_keys}
            print(fid)
            print('-------------------------')
            if kind in ['bangfuren', 'pinkunhu']:
                mongo.families.update(
                    {'family_id': fid},
                    {'$set': {field: data}},
                    upsert=True
                )
            elif kind in ['pinkunjiating']:
                mongo.families.update(
                    {'family_id': fid},
                    {'$push': {field: data}},
                    upsert=True
                )
        print('min_id:%s' % last_id)