我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用sqlalchemy.case()。
def count_water_stock(message): """???????????????? :param message: slackbot?????????????class """ s = Session() stock_number, latest_ctime = ( s.query(func.sum(WaterHistory.delta), func.max(case(whens=(( WaterHistory.delta != 0, WaterHistory.ctime),), else_=None))).first() ) if stock_number: # SQLite???????????????? if not isinstance(latest_ctime, datetime.datetime): latest_ctime = datetime.datetime.strptime(latest_ctime, '%Y-%m-%d %H:%M:%S') message.send('??: {}? ({:%Y?%m?%d?} ??)' .format(stock_number, latest_ctime)) else: message.send('??????????')
def pattern_order(column, patterns): """Create a clause suitable for use in an ``order by`` clause which will sort results in the order they are matched by the patterns. Parameters ---------- column : sa.sql.ColumnClause The column being matched against. patterns : iterable[str] The patterns to search in order. Returns ------- sa.sql.ColumnClause The clause used in an ``order by`` to enforce the given order. """ return sa.case([ (column.like(fuzzy(pattern)), n) for n, pattern in enumerate(patterns) ]).asc()
def all(self, PRONAC = None, CgcCpf = None): descricao_case = case([ (CertidoesNegativasModel.CodigoCertidao == '49', u'Quitação de Tributos Federais'), (CertidoesNegativasModel.CodigoCertidao == '51', 'FGTS'), (CertidoesNegativasModel.CodigoCertidao == '52', 'INSS'), (CertidoesNegativasModel.CodigoCertidao == '244', 'CADIN'), ]) situacao_case = case([(CertidoesNegativasModel.cdSituacaoCertidao == 0, u'Pendente')], else_ = u'Não Pendente' ) res = self.sql_connector.session.query(CertidoesNegativasModel.DtEmissao.label('data_emissao'), CertidoesNegativasModel.DtValidade.label('data_validade'), descricao_case.label('descricao'), situacao_case.label('situacao'), ) if PRONAC is not None: res = res.filter(CertidoesNegativasModel.PRONAC == PRONAC) return res.all()
def logmass(parameter): @hybrid_property def mass(self): par = getattr(self, parameter) return math.log10(par) if par > 0. else 0. @mass.expression def mass(cls): par = getattr(cls, parameter) return cast(case([(par > 0., func.log(par)), (par == 0., 0.)]), Float) return mass
def HybridRatio(line1, line2): ''' produces emission line ratio hybrid properties ''' @hybrid_property def hybridRatio(self): if type(line1) == tuple: myline1 = getattr(self, line1[0])+getattr(self, line1[1]) else: myline1 = getattr(self, line1) if getattr(self, line2) > 0: return myline1/getattr(self, line2) else: return -999. @hybridRatio.expression def hybridRatio(cls): if type(line1) == tuple: myline1 = getattr(cls, line1[0])+getattr(cls, line1[1]) else: myline1 = getattr(cls, line1) return cast(case([(getattr(cls, line2) > 0., myline1/getattr(cls, line2)), (getattr(cls, line2) == 0., -999.)]), Float) return hybridRatio
def ajax_post_typeahead(): if not permissions.index_view.can(): return '[]' # this a string of the search term search_terms = request.args.get('search', '') search_terms = search_terms.split(" ") case_statements = [] for term in search_terms: case_stmt = case([(Post.keywords.ilike('%' + term.strip() + '%'), 1)], else_=0) case_statements += [case_stmt] match_score = sum(case_statements).label("match_score") posts = (db_session.query(Post, match_score) .filter(Post.status == current_repo.PostStatus.PUBLISHED.value) .order_by(desc(match_score)) .limit(5) .all()) matches = [] for (post, count) in posts: authors_str = [author.format_name for author in post.authors] typeahead_entry = {'author': authors_str, 'title': str(post.title), 'path': str(post.path), 'keywords': str(post.keywords)} matches += [typeahead_entry] return json.dumps(matches)
def test_session_query(session, table): col_concat = func.concat(table.c.string).label('concat') result = ( session .query( table.c.string, col_concat, func.avg(table.c.integer), func.sum(case([(table.c.boolean == True, 1)], else_=0)) ) .group_by(table.c.string, col_concat) .having(func.avg(table.c.integer) > 10) ).all() assert len(result) > 0
def test_else_required(self): with self.assertRaises(exc.CompileError) as ex: self.compile(case([(literal(1), 0)])) self.assertEqual( str(ex.exception), 'ELSE clause is required in CASE' )
def test_case(self): expression = case([(literal(1), 0)], else_=1) self.assertEqual( self.compile(expression, literal_binds=True), 'CASE WHEN 1 THEN 0 ELSE 1 END' )
def get_grades(cls, user_id, exercise_id, active=None): expr = case([(cls.junk == True, 0.0)], else_=cls.score) query = db.session.query(distinct(Response.id), expr) \ .outerjoin(cls, and_(cls.response_id == Response.id, cls.user_id == user_id)) \ .filter(Response.exercise_id == exercise_id).order_by(Response.id) if active: query = query.filter(Response.active == True) return query.all()
def sort_by_demographics_field(current_user, field, reverse, else_=None): if current_user.is_admin: expression = field else: sub_query = filter_by_group_roles(current_user, get_roles_with_permission(PERMISSION.VIEW_DEMOGRAPHICS)) expression = case([(sub_query, field)], else_=else_) if reverse: expression = desc(expression) return expression