我们从Python开源项目中,提取了以下6个代码示例,用于说明如何使用sqlalchemy.sql.expression.nullslast()。
def test_oc(): a = asc('a') b = desc('a') c = asc('b') n = nullslast(desc('a')) a = OC(a) b = OC(b) c = OC(c) n = OC(n) assert str(a) == str(OC('a')) assert a.is_ascending assert not b.is_ascending assert not n.reversed.reversed.is_ascending assert str(a.element) == str(b.element) == str(n.element) assert str(a) == str(b.reversed) assert str(n.reversed.reversed) == str(n) assert a.name == 'a' assert n.name == 'a' assert n.quoted_full_name == 'a' assert repr(n) == '<OC: a DESC NULLS LAST>'
def __new__(cls, column_name, mData=None, search_like=True, filter=str, searchable=True, filterarg='cell', nulls_order=None): """Set default values for mData and filter. On creation, sets default None values for mData and string value for filter (cause: Object representation is not JSON serializable). """ # check if allowed value if nulls_order and nulls_order not in ['nullsfirst', 'nullslast']: raise ValueError('`%s` is not an allowed value for nulls_order.' % nulls_order) return super(ColumnDT, cls).__new__( cls, column_name, mData, search_like, filter, searchable, filterarg, nulls_order)
def get_conversion_candidate(ebook_id, to_format): to_format_id = await get_format_id(to_format) async with engine.acquire() as conn: source = model.Source.__table__ format = model.Format.__table__ res = await conn.execute(select([source.c.id, format.c.extension]).where(and_(source.c.ebook_id == ebook_id, source.c.format_id == to_format_id, source.c.format_id == format.c.id))\ .order_by(nullslast(desc(source.c.quality)))) res = await res.first() if res: return res.as_tuple() #TODO: Consider optimal selection of the source # in previous version we first selected format (from available convertable in ebook) # and then one with best quality - so actually the other way around q=select([source.c.id, format.c.extension])\ .where(and_(source.c.format_id == format.c.id, source.c.ebook_id == ebook_id)).order_by(nullslast(desc(source.c.quality))) async for row in conn.execute(q): if row.extension in settings.CONVERTABLE_TYPES: return row.id, row.extension return None, None
def get_existing_conversion(ebook_id, user_id, to_format): format_id = await get_format_id(to_format) async with engine.acquire() as conn: source = model.Source.__table__ conversion = model.Conversion.__table__ res = await conn.execute(select([conversion.c.id]).select_from(conversion.join(source))\ .where(and_(source.c.ebook_id == ebook_id, conversion.c.created_by_id == user_id, conversion.c.format_id == format_id))\ .order_by(nullslast(desc(source.c.quality)))) return await res.scalar()
def _sort_by_key_value(cls, query, column, descending=False): expression = column if descending: expression = desc(expression) if settings.db.url.startswith('sqlite'): return query.order_by(expression) return query.order_by((nullsfirst if descending else nullslast)(expression))
def version_at_time_q(cls, base_id, timestamp, db=None): db = db or cls.default_db # Version that can be used without first # return db.query(cls).distinct(cls.base_id).filter( # cls.base_id == self.base_id, # (cls.tombstone_date == None) || (cls.tombstone_date > timestamp) # ).order_by(cls.base_id, nullslast(asc(cls.tombstone_date))) return db.query(cls).filter( cls.base_id == base_id, (cls.tombstone_date == None) | (cls.tombstone_date > timestamp) ).order_by(nullslast(asc(cls.tombstone_date)))