The following 50 code examples, extracted from open-source Python projects, illustrate how to use sqlalchemy.desc().
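Before the extracted examples, here is a minimal, self-contained sketch of the pattern they all share: wrap the ordering column with desc() and pass the result to Query.order_by(). The User model and the in-memory SQLite engine below are illustrative assumptions, not taken from any of the quoted projects (the sketch assumes SQLAlchemy 1.4+ for sqlalchemy.orm.declarative_base).

from sqlalchemy import Column, Integer, String, create_engine, desc
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)


engine = create_engine('sqlite://')   # throwaway in-memory database
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

# desc() wraps the column used for ordering: newest rows first.
newest_first = session.query(User).order_by(desc(User.id)).all()

# Equivalent method form, as used in several of the examples below.
newest_first = session.query(User).order_by(User.id.desc()).all()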
def crawl(instance_id):
    instance = Instance.query.filter_by(id=instance_id).first()

    if instance.lock:
        return

    instance.lock = True
    save(instance)

    try:
        toots_query = Toot.query.filter_by(instance_id=instance_id)
        toots_count = toots_query.count()
        if toots_count == 0:
            since_id = None
        else:
            last_toot = toots_query.order_by(desc(Toot.id)).first()
            since_id = last_toot.id

        get_toots(instance, since_id)
    finally:
        instance.lock = False
        save(instance)
def get_revision(self, dataset, revision_id='latest'):
    with self.session_scope() as session:
        if revision_id == 'latest':
            ret = session.query(DatasetRevision).filter_by(dataset_id=dataset)\
                .order_by(desc(DatasetRevision.revision)).first()
        elif revision_id == 'successful':
            ret = session.query(DatasetRevision).filter_by(
                dataset_id=dataset, status=STATE_SUCCESS)\
                .order_by(desc(DatasetRevision.revision)).first()
        else:
            if not isinstance(revision_id, int):
                return None
            ret = session.query(DatasetRevision).filter_by(
                dataset_id=dataset, revision=revision_id).first()
        if ret is not None:
            return FlowRegistry.object_as_dict(ret)
        return None
def editcomment(project, comment_id):
    comment = Comment.get_by_id(comment_id)
    form = CommentForm(request.form, comment=comment.content)

    if current_user != comment.owner:
        flash(_('You are not allowed to edit this comment'), 'error')
        if 'return_url' in request.args:
            return redirect(urllib.unquote(request.args['return_url']))
        else:
            return redirect(url_for('branches.view', project=project.name,
                                    branch='master', filename='index'))

    if request.method == 'POST' and form.validate():
        comment.content = form.comment.data
        db.session.commit()
        flash(_('Comment modified successfully'), 'info')
        if 'return_url' in request.args:
            return redirect(urllib.unquote(request.args['return_url']))
        else:
            return redirect(url_for('branches.view', project=project.name,
                                    branch='master', filename='index'))

    threads = (Thread.query.filter_by(id=comment.thread.id)
               .order_by(desc(Thread.posted_at)))
    return render_template('threads/newcomment.html', form=form)
def _filter_by(self, model, limit=None, offset=0, yielded=False,
               last_id=None, fulltextsearch=None, desc=False,
               orderby='id', **filters):
    """Filter by using several arguments and ordering items."""
    query = self.create_context(filters, fulltextsearch, model)
    if last_id:
        query = query.filter(model.id > last_id)
        query = self._set_orderby_desc(query, model, limit,
                                       last_id, offset,
                                       desc, orderby)
    else:
        query = self._set_orderby_desc(query, model, limit,
                                       last_id, offset,
                                       desc, orderby)
    if yielded:
        limit = limit or 1
        return query.yield_per(limit)
    return query.all()
def test_oc():
    a = asc('a')
    b = desc('a')
    c = asc('b')
    n = nullslast(desc('a'))

    a = OC(a)
    b = OC(b)
    c = OC(c)
    n = OC(n)

    assert str(a) == str(OC('a'))
    assert a.is_ascending
    assert not b.is_ascending
    assert not n.reversed.reversed.is_ascending
    assert str(a.element) == str(b.element) == str(n.element)
    assert str(a) == str(b.reversed)
    assert str(n.reversed.reversed) == str(n)

    assert a.name == 'a'
    assert n.name == 'a'
    assert n.quoted_full_name == 'a'
    assert repr(n) == '<OC: a DESC NULLS LAST>'
def get_latest_failed_jobs(context):
    jobs = []
    query = context.session.query(models.Job.type, models.Job.resource_id,
                                  sql.func.count(models.Job.id))
    query = query.group_by(models.Job.type, models.Job.resource_id)

    for job_type, resource_id, count in query:
        _query = context.session.query(models.Job)
        _query = _query.filter_by(type=job_type, resource_id=resource_id)
        _query = _query.order_by(sql.desc('timestamp'))
        # when timestamps of job entries are the same, sort entries by status
        # so "Fail" job is placed before "New" and "Success" jobs
        _query = _query.order_by(sql.asc('status'))
        latest_job = _query[0].to_dict()
        if latest_job['status'] == constants.JS_Fail:
            jobs.append(latest_job)
    return jobs
def setup_mappers(cls):
    User, users = cls.classes.User, cls.tables.users
    UserInfo, user_infos = cls.classes.UserInfo, cls.tables.user_infos
    Address, addresses = cls.classes.Address, cls.tables.addresses
    Thing, things = cls.classes.Thing, cls.tables.things

    mapper(User, users, properties={
        'addresses': relationship(
            Address,
            backref=backref('user', lazy="bulk"),
            lazy="bulk",
            order_by=[desc(addresses.c.email_address)]
        ),
        'children': relationship(User,
                                 backref=backref('parent',
                                                 remote_side=[users.c.id],
                                                 lazy="bulk"),
                                 lazy="bulk"),
        'user_info': relationship(UserInfo, lazy="bulk",
                                  backref=backref('user', lazy="bulk"),
                                  uselist=False),
        'things': relationship(Thing, secondary=cls.tables.user_to_things,
                               lazy="bulk"),
    })
    mapper(Address, addresses)
    mapper(UserInfo, user_infos)
    mapper(Thing, things, properties={
        'users': relationship(User, secondary=cls.tables.user_to_things,
                              lazy="bulk"),
    })
    configure_mappers()
def test_predictions(self):
    with self.now:
        self.gen_data(offset=10)
        records = HygroRecord.query.filter(HygroRecord.sensor_uuid == 1) \
            .order_by(desc(HygroRecord.timestamp)).all()
        self.assertEqual(21, len(records))

        poly = analytics._get_polynomial(1, 1468810074)

        for r in self.all_records[:-2]:
            prediction = analytics._predict_at(
                float(r.timestamp), poly, 1468810074)
            # err = prediction - float(r.value)
            # print(prediction, r.value, err)
            # Test prediction is ~20% accurate
            self.assertFalse(prediction > float(r.value) * 1.2)
            self.assertFalse(prediction < float(r.value) * 0.8)
            # print(prediction, r.value)

        self.assertEqual(analytics._predict_next_watering(
            poly, 1468810074), 1468889274)
def _get_last_watering_timestamp(sensor_uuid):
    """Try to find the last watering from the last records of the sensor.

    A difference of `watering_thresold` must be found between now and then
    to be considered a watering."""
    watering_thresold = 50  # Minimum fluctuation to consider a watering

    records = HygroRecord.query \
        .filter(HygroRecord.sensor_uuid == sensor_uuid) \
        .order_by(desc(HygroRecord.timestamp)).limit(5000).all()

    last_record = records[0]
    for current in records[1:]:
        if current.value < last_record.value \
                and last_record.value - current.value >= watering_thresold:
            return last_record.timestamp
        last_record = current
def _unpick_search(sort, allowed_fields=None, total=None):
    ''' This is a helper function that takes a sort string
    eg 'name asc, last_modified desc' and returns a list of
    split field order eg [('name', 'asc'), ('last_modified', 'desc')]
    allowed_fields can limit which field names are ok.
    total controls how many sorts can be specified '''
    sorts = []
    split_sort = sort.split(',')
    for part in split_sort:
        split_part = part.strip().split()
        field = split_part[0]
        if len(split_part) > 1:
            order = split_part[1].lower()
        else:
            order = 'asc'
        if allowed_fields:
            if field not in allowed_fields:
                raise ValidationError('Cannot sort by field `%s`' % field)
        if order not in ['asc', 'desc']:
            raise ValidationError('Invalid sort direction `%s`' % order)
        sorts.append((field, order))
    if total and len(sorts) > total:
        raise ValidationError(
            'Too many sort criteria provided only %s allowed' % total)
    return sorts
def files_on_date(date):
    if not DAY_REGEXP.match(date):
        raise BadRequest('Invalid date format')

    order_by = request.args.get('order_by', 'test_start_time')
    order = request.args.get('order', 'desc')
    if order.lower() not in ('desc', 'asc'):
        raise BadRequest('order must be desc or asc')
    if order_by not in ('test_start_time', 'probe_cc', 'report_id',
                        'test_name', 'probe_asn'):
        raise BadRequest()

    return render_template('files/list.html',
                           report_files=_files_on_date(date,
                                                       order_by=order_by,
                                                       order=order),
                           by='date',
                           order=order,
                           order_by=order_by,
                           current_date=date)
def files_in_country(country_code):
    if len(country_code) != 2:
        raise BadRequest('Country code must be two characters')
    country_code = country_code.upper()

    order_by = request.args.get('order_by', 'test_start_time')
    order = request.args.get('order', 'desc')
    if order.lower() not in ('desc', 'asc'):
        raise BadRequest()
    if order_by not in ('test_start_time', 'probe_cc', 'report_id',
                        'test_name', 'probe_asn'):
        raise BadRequest()

    return render_template('files/list.html',
                           report_files=_files_in_country(
                               country_code, order_by=order_by, order=order),
                           by='country',
                           order=order,
                           order_by=order_by,
                           current_country=country_code)
def assignResults(user, assign_id, sort_id):
    assign = session.query(Assignment).filter(
        Assignment.id == assign_id).first()
    if user.admin:
        if sort_id == 0:
            posts = session.query(Post).join(Post.user).filter(
                Post.assignment_id == assign_id).order_by(
                User.l_name, User.f_name)
        elif sort_id == 1:
            posts = session.query(Post).filter(
                Post.assignment_id == assign_id).order_by(Post.created.desc())
    else:
        posts = session.query(Post).filter(and_(
            Post.assignment_id == assign_id,
            Post.user_id == user.id)).order_by(desc(Post.created)).all()
    return render_template('assignResults.html', user=user, posts=posts,
                           assign=assign, sort_id=sort_id)
def recommendations():
    """List of all recommendations; allows users to leave feedback on
    recommendations."""
    current_user = User.query.get(1)

    query = Recommendation.query.filter(
        Recommendation.date_provided <= dt.datetime.now(),
        Recommendation.userbook.has(user_id=current_user.id))
    recs_to_date = query.order_by(desc(Recommendation.date_provided)).all()

    if len(recs_to_date) == 0:
        recs_to_show = None
        today_rec = None
    else:
        if recs_to_date[0].date_provided.date() == dt.date.today():
            recs_to_show = recs_to_date[1:]
            today_rec = recs_to_date[0]
        else:
            recs_to_show = recs_to_date
            today_rec = False

    return render_template("recommendation-list.html",
                           recs_to_date=recs_to_show,
                           today_rec=today_rec)
def mac_address_range_find_allocation_counts(context, address=None,
                                             use_forbidden_mac_range=False):
    count = sql_func.count(models.MacAddress.address)
    query = context.session.query(models.MacAddressRange,
                                  count.label("count")).with_lockmode("update")
    query = query.outerjoin(models.MacAddress)
    query = query.group_by(models.MacAddressRange.id)
    query = query.order_by(desc(count))
    if address:
        query = query.filter(models.MacAddressRange.last_address >= address)
        query = query.filter(models.MacAddressRange.first_address <= address)
    query = query.filter(models.MacAddressRange.next_auto_assign_mac != -1)
    if not use_forbidden_mac_range:
        query = query.filter(models.MacAddressRange.do_not_use == '0')  # noqa
    query = query.limit(1)
    return query.first()
def get_last_n_records(model, n, sort_key='id', sort_dir='desc'):
    query = model_query(model)
    query = query.order_by(desc(model.id))
    query = query.limit(n)
    if sort_dir == 'desc':
        try:
            result = query.all()
            return result
        except Exception as err:
            LOG.error("Database exception: %s" % err.message)
            return []
    else:
        try:
            result = query.from_self(). \
                order_by(getattr(model, sort_key)).all()
            return result
        except Exception as err:
            LOG.error("Database exception: %s" % err.message)
            return []
def get_activity_periods(instances):
    r = []
    start = 0
    running = 0
    duration = 0
    startstop_events = sorted(
        [(i, i.snapshots.order_by(models.FuzzerSnapshot.unix_time)
            .first().unix_time, True) for i in instances] +
        [(i, i.snapshots.order_by(desc(models.FuzzerSnapshot.unix_time))
            .first().unix_time, False) for i in instances],
        key=itemgetter(1))
    for instance, t, event in startstop_events:
        if event:
            if not running:
                instances = []
                start = t
            instances.append(instance)
            running += 1
        else:
            running -= 1
            if not running:
                r.append((start, t, instances))
                duration += t - start
    return r, duration
def groupByCategoryAndCount(queryObject, columnName, sampleRate=0.2,
                            numCats=False, includeNone=False):
    columnObject = getattr(Dataset, columnName)
    query = (
        queryObject.with_entities(columnObject)      # choose only the column we care about
        .filter(func.rand() < sampleRate)            # grab random sample of rows
        .add_columns(func.count(1).label('count'))   # add count to response
        .group_by(columnName)                        # group by
        .order_by(desc('count'))                     # order by the largest first
    )
    if not includeNone:
        query = query.filter(columnObject.isnot(None))  # filter out NULLs
    # If no numCats is passed in, show all the groups
    if numCats:
        query = query.limit(numCats)  # show the top N results
    # TODO maybe: count 'other column' if numCats, where sum counts all but top numCats fields
    return (
        dict((key, val * (1 / sampleRate)) for key, val in  # rescale sampled columns to approx. results on full dataset
             query.all()  # actually run the query
             )
    )


# Create a query object for a complex "histogram in SQL" query with custom
# numerical bin sizes
def get_package_dependents_count(ecosystem_backend, package, db_session=None):
    """Get number of GitHub projects dependent on the `package`.

    :param ecosystem_backend: str, Ecosystem backend from `f8a_worker.enums.EcosystemBackend`
    :param package: str, Package name
    :param db_session: obj, Database session to use for querying
    :return: number of dependent projects, or -1 if the information is not available
    """
    if not db_session:
        storage = StoragePool.get_connected_storage("BayesianPostgres")
        db_session = storage.session

    try:
        count = db_session.query(PackageGHUsage.count).filter(PackageGHUsage.name == package) \
            .filter(PackageGHUsage.ecosystem_backend == ecosystem_backend) \
            .order_by(desc(PackageGHUsage.timestamp)) \
            .first()
    except SQLAlchemyError:
        db_session.rollback()
        raise

    if count:
        return count[0]

    return -1
def get_latest_analysis(ecosystem, package, version, db_session=None):
    """Get latest analysis for the given EPV."""
    if not db_session:
        storage = StoragePool.get_connected_storage("BayesianPostgres")
        db_session = storage.session

    try:
        return db_session.query(Analysis).\
            filter(Ecosystem.name == ecosystem).\
            filter(Package.name == package).\
            filter(Version.identifier == version).\
            order_by(Analysis.started_at.desc()).\
            first()
    except SQLAlchemyError:
        db_session.rollback()
        raise
def deleteDupes(coder):
    """ If a dupe was coded more than once (somehow??) then remove the one
    that was coded second. """
    dupes = db_session.query(SecondPassQueue.article_id,
                             SecondPassQueue.coder_id,
                             func.count(SecondPassQueue.id)).\
        filter(SecondPassQueue.coder_id == coder).\
        group_by(SecondPassQueue.article_id, SecondPassQueue.coder_id).\
        having(func.count(SecondPassQueue.id) > 1).all()

    dupes_all = db_session.query(SecondPassQueue).\
        filter(SecondPassQueue.article_id.in_([x[0] for x in dupes]),
               SecondPassQueue.coder_id == coder,
               SecondPassQueue.coded_dt != None).\
        order_by(desc(SecondPassQueue.coded_dt)).all()

    ## order by descending and delete the first item you come across
    deleted = []
    for aq in dupes_all:
        if aq.article_id not in deleted:
            deleted.append(aq.article_id)
            db_session.delete(aq)

    db_session.commit()
def index():
    yesterday = str(datetime.date.today() - datetime.timedelta(days=1))
    day_count = count_day_status.query.filter_by(count_date=yesterday).first()
    if day_count:
        back_success_file = day_count.back_file_success
        back_customers_success = day_count.back_customers_success
        back_file_failed = day_count.back_file_failed
        back_customers_failed = day_count.back_customers_failed
    else:
        back_success_file = 0
        back_customers_success = 0
        back_file_failed = 0
        back_customers_failed = 0

    mon_count = count_mon_status.query.order_by(desc(count_mon_status.id)).limit(12).all()
    customer_count = []
    customer_count_customer = []
    customer_count_file = []
    for mon_item in mon_count:
        mon_dict = {'y': str(mon_item.count_date),
                    'a': str(mon_item.back_customers),
                    'b': str(mon_item.back_customers_stop)}
        mon_dict_customer = {'period': str(mon_item.count_date),
                             'platform': str(mon_item.back_customers)}
        mon_dict_file = {'period': str(mon_item.count_date),
                         'file': str(mon_item.back_file)}
        customer_count.append(mon_dict)
        customer_count_customer.append(mon_dict_customer)
        customer_count_file.append(mon_dict_file)
    customer_count = customer_count[::-1]
    customer_count_customer = customer_count_customer[::-1]
    customer_count_file = customer_count_file[::-1]

    yes_backfailed = backfailed.query.filter_by(count_date=yesterday).\
        order_by(desc(backfailed.back_failed)).limit(4).all()
    for i in range(0, int(4 - len(yes_backfailed))):
        yes_backfailed.append(0)

    return render_template('index.html',
                           back_success_file=back_success_file,
                           back_customers_success=back_customers_success,
                           back_file_failed=back_file_failed,
                           back_customers_failed=back_customers_failed,
                           customer_count=str(customer_count),
                           customer_count_customer=str(customer_count_customer),
                           customer_count_file=str(customer_count_file),
                           yes_backfailed=yes_backfailed)
def backmanage():
    if request.method == 'POST':
        pass
    else:
        backarchive_all = backarchives.query.order_by(desc(backarchives.id)).all()
        return render_template('backarchives.html', backarchives=backarchive_all)
def failed_customer():
    backfaileds = backfailed.query.order_by(desc(backfailed.count_date)).all()
    return render_template('backfailed.html', backfaileds=backfaileds)
def set_user_rankings(session, league_id, day=None):
    # But is there a better way??
    user_l = LeagueUser if day is None else LeagueUserDay
    l_query = session.query(user_l.username).filter(user_l.league == league_id)
    l_query = l_query.filter(user_l.day == day) if day is not None else l_query

    wins_ranking = l_query.order_by(desc(user_l.wins)).all()
    for i, user in enumerate(wins_ranking):
        # try user.wins_rank += 1?
        l_query.filter(user_l.username == user.username).\
            update({user_l.wins_rank: i + 1})

    points_ranking = l_query.order_by(desc(user_l.points)).all()
    for i, user in enumerate(points_ranking):
        l_query.filter(user_l.username == user.username). \
            update({user_l.points_rank: i + 1})

    picks_ranking = l_query.order_by(desc(user_l.picks)).all()
    for i, user in enumerate(picks_ranking):
        l_query.filter(user_l.username == user.username). \
            update({user_l.picks_rank: i + 1})

    bans_ranking = l_query.order_by(desc(user_l.bans)).all()
    for i, user in enumerate(bans_ranking):
        l_query.filter(user_l.username == user.username). \
            update({user_l.bans_rank: i + 1})
    return
def show_entries():
    query = db.query(Toot, Account)
    query = query.join(Account.id, Toot.account_id)
    query = query.order_by(desc(Toot.creation_date))
    query = query.limit(10)
    return render_template('index.html', toots=query.all())
def _create_nullsfirst(cls, column):
    """Produce the ``NULLS FIRST`` modifier for an ``ORDER BY`` expression.

    :func:`.nullsfirst` is intended to modify the expression produced
    by :func:`.asc` or :func:`.desc`, and indicates how NULL values
    should be handled when they are encountered during ordering::

        from sqlalchemy import desc, nullsfirst

        stmt = select([users_table]).\\
                    order_by(nullsfirst(desc(users_table.c.name)))

    The SQL expression from the above would resemble::

        SELECT id, name FROM user ORDER BY name DESC NULLS FIRST

    Like :func:`.asc` and :func:`.desc`, :func:`.nullsfirst` is typically
    invoked from the column expression itself using
    :meth:`.ColumnElement.nullsfirst`, rather than as its standalone
    function version, as in::

        stmt = (select([users_table]).
                order_by(users_table.c.name.desc().nullsfirst())
                )

    .. seealso::

        :func:`.asc`

        :func:`.desc`

        :func:`.nullslast`

        :meth:`.Select.order_by`

    """
    return UnaryExpression(
        _literal_as_label_reference(column),
        modifier=operators.nullsfirst_op,
        wraps_column_expression=False)
def _create_nullslast(cls, column):
    """Produce the ``NULLS LAST`` modifier for an ``ORDER BY`` expression.

    :func:`.nullslast` is intended to modify the expression produced
    by :func:`.asc` or :func:`.desc`, and indicates how NULL values
    should be handled when they are encountered during ordering::

        from sqlalchemy import desc, nullslast

        stmt = select([users_table]).\\
                    order_by(nullslast(desc(users_table.c.name)))

    The SQL expression from the above would resemble::

        SELECT id, name FROM user ORDER BY name DESC NULLS LAST

    Like :func:`.asc` and :func:`.desc`, :func:`.nullslast` is typically
    invoked from the column expression itself using
    :meth:`.ColumnElement.nullslast`, rather than as its standalone
    function version, as in::

        stmt = select([users_table]).\\
                    order_by(users_table.c.name.desc().nullslast())

    .. seealso::

        :func:`.asc`

        :func:`.desc`

        :func:`.nullsfirst`

        :meth:`.Select.order_by`

    """
    return UnaryExpression(
        _literal_as_label_reference(column),
        modifier=operators.nullslast_op,
        wraps_column_expression=False)
def _create_desc(cls, column):
    """Produce a descending ``ORDER BY`` clause element.

    e.g.::

        from sqlalchemy import desc

        stmt = select([users_table]).order_by(desc(users_table.c.name))

    will produce SQL as::

        SELECT id, name FROM user ORDER BY name DESC

    The :func:`.desc` function is a standalone version of the
    :meth:`.ColumnElement.desc` method available on all SQL expressions,
    e.g.::

        stmt = select([users_table]).order_by(users_table.c.name.desc())

    :param column: A :class:`.ColumnElement` (e.g. scalar SQL expression)
     with which to apply the :func:`.desc` operation.

    .. seealso::

        :func:`.asc`

        :func:`.nullsfirst`

        :func:`.nullslast`

        :meth:`.Select.order_by`

    """
    return UnaryExpression(
        _literal_as_label_reference(column),
        modifier=operators.desc_op,
        wraps_column_expression=False)
def _create_asc(cls, column):
    """Produce an ascending ``ORDER BY`` clause element.

    e.g.::

        from sqlalchemy import asc

        stmt = select([users_table]).order_by(asc(users_table.c.name))

    will produce SQL as::

        SELECT id, name FROM user ORDER BY name ASC

    The :func:`.asc` function is a standalone version of the
    :meth:`.ColumnElement.asc` method available on all SQL expressions,
    e.g.::

        stmt = select([users_table]).order_by(users_table.c.name.asc())

    :param column: A :class:`.ColumnElement` (e.g. scalar SQL expression)
     with which to apply the :func:`.asc` operation.

    .. seealso::

        :func:`.desc`

        :func:`.nullsfirst`

        :func:`.nullslast`

        :meth:`.Select.order_by`

    """
    return UnaryExpression(
        _literal_as_label_reference(column),
        modifier=operators.asc_op,
        wraps_column_expression=False)
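The four docstrings above describe the standalone asc()/desc()/nullsfirst()/nullslast() functions and their method counterparts. As a quick check of what they render to, here is a small sketch that uses the same 1.x-style select([...]) construct shown in those docstrings; the user table definition is an assumption for illustration only.

from sqlalchemy import (Column, Integer, MetaData, String, Table,
                        desc, nullslast, select)

metadata = MetaData()
users_table = Table('user', metadata,
                    Column('id', Integer, primary_key=True),
                    Column('name', String))

# The standalone function and the ColumnElement method compile to the same ORDER BY.
stmt_func = select([users_table]).order_by(desc(users_table.c.name))
stmt_meth = select([users_table]).order_by(users_table.c.name.desc())
print(str(stmt_func) == str(stmt_meth))   # True

# nullslast() wraps the desc() expression and appends NULLS LAST to the clause.
print(select([users_table]).order_by(nullslast(desc(users_table.c.name))))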
def view(project, branch, filename):
    filename, file_extension = os.path.splitext(filename)
    if file_extension == '':
        file_extension = '.html'

    g.menu['right'].insert(0, {'name': _('Edit'),
                               'url': url_for('branches.edit',
                                              project=project.name,
                                              branch=branch.name,
                                              filename=filename)})
    g.menu['left'].append({'name': _('Source'),
                           'url': url_for('branches.source',
                                          project=project.name,
                                          branch=branch.name,
                                          filename=filename + '.rst')})

    # will be deprecated
    if (current_user.is_authenticated and
            (current_user == branch.owner or
             current_user == project.get_master().owner)):
        merge_pendencies = get_merge_pendencies(project.name, branch.name)
        if merge_pendencies:
            return merge_pendencies
    ####################

    try:
        content = load_file(join('repos', project.name, branch.name,
                                 'build/html', filename + file_extension))
    except:
        raise Custom404(_('Could not find file {}').format(filename))

    threads = (Thread.query.join(File_Tag)
               .filter(File_Tag.filename == filename)
               .filter(Thread.project_id == project.id)
               .order_by(desc(Thread.posted_at)).all())
    threads_by_tag = project.get_threads_by_tag(filename)
    named_tags = ','.join([t.name if re.search(t.file_regexp, filename) else ''
                           for t in (Named_Tag.query
                                     .filter(Named_Tag.project_id == project.id)
                                     .all())])
    return render_template('view.html', content=content, threads=threads,
                           filename=filename, threads_by_tag=threads_by_tag,
                           named_tags=named_tags)
def _set_orderby_desc(self, query, model, limit,
                      last_id, offset, descending, orderby):
    """Return an updated query with the proper orderby and desc."""
    if orderby == 'fav_user_ids':
        n_favs = func.coalesce(func.array_length(model.fav_user_ids, 1),
                               0).label('n_favs')
        query = query.add_column(n_favs)
    if orderby in ['created', 'updated', 'finish_time']:
        if descending:
            query = query.order_by(desc(
                cast(getattr(model, orderby), TIMESTAMP)))
        else:
            query = query.order_by(cast(getattr(model, orderby), TIMESTAMP))
    else:
        if orderby != 'fav_user_ids':
            if descending:
                query = query.order_by(desc(getattr(model, orderby)))
            else:
                query = query.order_by(getattr(model, orderby))
        else:
            if descending:
                query = query.order_by(desc("n_favs"))
            else:
                query = query.order_by("n_favs")
    if last_id:
        query = query.limit(limit)
    else:
        query = query.limit(limit).offset(offset)
    return query
def show_items(category):
    items = session.query(Item).filter_by(
        category_id=category.id).order_by(desc(Item.created_at))
    return render('showitems.html', category=category, items=items)
def do_orm_tests(dburl):
    spec = [desc(Book.b), Book.d, Book.id]
    with S(dburl, echo=ECHO) as s:
        q = s.query(Book, Author, Book.id).outerjoin(Author).order_by(*spec)
        q2 = s.query(Book).order_by(Book.id, Book.name)
        q3 = s.query(Book.id, Book.name.label('x')).order_by(Book.name, Book.id)

        check_paging(q=q)
        check_paging(q=q2)
        check_paging(q=q3)
def main():
    with S(DB, echo=False) as s:
        s.execute("""
            drop table if exists single;
        """)
        s.execute("""
            create table if not exists single(id serial, title text, year int, peak_position int)
        """)

    with S(DB, echo=False) as s:
        for line in SINGLES.splitlines():
            title, year, peak = line.rsplit(' ', 2)
            single = Single(
                title=title,
                year=year,
                peak_position=peak
            )
            s.add(single)

    with S(DB, echo=False) as s:
        q = s.query(Single).order_by(Single.peak_position,
                                     desc(Single.year),
                                     Single.title,
                                     desc(Single.id))

        bookmark = None
        while True:
            p = get_page(q, per_page=PER_PAGE, page=bookmark)
            print_page(p)
            bookmark = p.paging.bookmark_next
            if not p.paging.has_next:
                break
def findAllUid(offset=0, limit=None):
    s = select([Lamadb.analysis.c._uid]).order_by(desc("_uid")) \
        .limit(limit).offset(offset)
    result = Lamadb.execute(s)
    return [row[0] for row in result]
def query_resource(context, model, filters, sorts):
    query = context.session.query(model)
    query = _filter_query(model, query, filters)
    for sort_key, sort_dir in sorts:
        sort_dir_func = sql.asc if sort_dir else sql.desc
        query = query.order_by(sort_dir_func(sort_key))
    return [obj.to_dict() for obj in query]
def index():
    """ index loads the home page of the project """
    items = Item.query.order_by(desc(Item.id)).limit(10).all()
    return render_template(
        'home.html', categories=get_category_list(), items=items)
def list_category(category):
    """ lists items in a specific category """
    # spaces were replaced by dashes to make urls look better
    category = category.replace('-', ' ')
    # thanks https://stackoverflow.com/questions/4985762/
    items = Item.query.join(Category).filter(Category.name == category)\
        .order_by(desc(Item.id)).all()
    return render_template(
        'category.html',
        category=category,
        items=items,
        categories=get_category_list()
    )
def tag(name=None):
    sort_by = request.args.get('sort_by')

    if sort_by == 'play_count':
        sort_by = [Artist.playcount.desc()]
    elif sort_by == 'local_play_count':
        sort_by = [Artist.local_playcount.desc()]
    elif sort_by == 'tag_strength':
        sort_by = [desc('strength')]
    else:
        sort_by = [desc('strength'), Artist.local_playcount.desc()]

    name = name.lower()

    top_artists = (
        db.session.query(Artist.name,
                         Artist.tags[name].cast(Integer).label('strength'),
                         Artist.local_playcount,
                         Artist.playcount)
        .filter(Artist.tags.has_key(name))
        .order_by(*sort_by)
        .all()
    )

    top_artists = enumerate(top_artists, start=1)

    return render_template(
        'meta/tag.html',
        tag=tag,
        top_artists=top_artists,
    )
def maintenance_tracks():
    show_ignored = get_argument('show_ignored', arg_type=bool)
    arg_artist = request.args.get('artist', '')

    artist_filter1 = True if arg_artist == '' else (Scrobble.artist == arg_artist)
    artist_filter2 = True if arg_artist == '' else (DiffTracks.artist == arg_artist)

    artists = (
        db.session.query(Scrobble.artist.label('artist'))
        .filter(artist_filter1)
        .group_by('artist')
        .order_by(desc(func.count(Scrobble.artist)))[:50]
    )

    track_count = {}

    for artist in artists:
        tracks_count = (
            db.session.query(
                Scrobble.track.label('track'),
                func.count(Scrobble.artist).label('count')
            )
            .filter(Scrobble.artist == artist)
            .group_by('track')
            .order_by(desc(func.count(Scrobble.track)))
            .all()
        )
        track_count[artist[0]] = {track: count for track, count in tracks_count}

    diffs = (
        db.session.query(DiffTracks.id, DiffTracks.artist,
                         DiffTracks.track1, DiffTracks.track2)
        .filter(artist_filter2, DiffTracks.ignore == show_ignored)
        .order_by(DiffTracks.id.asc())
        .all()
    )

    return render_template('maintenance/tracks.html',
                           diffs=diffs,
                           track_count=track_count)
def download_artist_metadata(limit):
    artists = [obj[0] for obj in (
        db.session.query(Scrobble.artist).group_by(Scrobble.artist)
        .order_by(func.count(Scrobble.artist).desc())
        .limit(limit).all()
    )]

    for artist in artists:
        artist_obj = db.session.query(Artist).filter(Artist.name == artist).count()
        if not artist_obj:
            sync(artist)
def make_shell_context():
    from pprint import pprint
    from flask_sqlalchemy import get_debug_queries
    from sqlalchemy import desc, func
    from scrobbler.models import Album, Artist, NowPlaying, Scrobble, Session, Token, User

    return dict(
        app=app, db=db, pprint=pprint, gq=get_debug_queries,
        func=func, desc=desc,
        User=User, Session=Session, Token=Token, Scrobble=Scrobble,
        NowPlaying=NowPlaying, Artist=Artist, Album=Album,
    )
def maximum(run_id, parameter):
    param = getattr(materials.c, parameter)
    cols = [materials.c.uuid, param]
    rows = and_(materials.c.run_id == run_id,
                or_(materials.c.retest_passed == None,
                    materials.c.retest_passed == True))
    s = select(cols, rows).order_by(desc(param)).limit(1)
    print('\nuuid\t\t\t\t\t%s' % parameter)
    result = engine.execute(s)
    for row in result:
        print('%s\t%s' % (row[0], row[1]))
    result.close()
def get_last_podcasts_thumbs(self):
    podcasts = Podcast.query.with_entities(
        Podcast.name, Podcast.feed, Podcast.image
    ).order_by(desc(Podcast.id)).limit(6)
    return podcasts
def get_top_terms(self, init_date, final_date, num_limit):
    pterm = PopularTerm
    query = db.session.query(
        pterm.term, func.sum(pterm.times).label('total')
    ).filter(
        pterm.date_search.between(init_date, final_date)
    ).group_by(pterm.term).order_by(desc('total')).limit(num_limit)

    terms = []
    for q in query:
        term = {
            'name': q[0],
            'times': q[1]
        }
        terms.append(term)
    return terms