Python sqlalchemy 模块,case() 实例源码

我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用sqlalchemy.case()

项目:beproudbot    作者:beproud    | 项目源码 | 文件源码
def count_water_stock(message):
    """Report the current water stock and the date it last changed.

    A single query sums ``WaterHistory.delta`` over all rows and takes the
    newest ``WaterHistory.ctime`` among rows whose delta is non-zero.  When
    the total is truthy it is sent together with that date; otherwise a
    fallback message is sent.

    :param message: object whose ``send`` method delivers the reply
        (presumably the slackbot message instance -- the original docstring
        was mis-encoded, confirm against the caller).
    """
    session = Session()

    # Rows with delta == 0 map to NULL, so MAX() only considers timestamps
    # of rows that actually changed the stock.
    last_change = case(
        whens=((WaterHistory.delta != 0, WaterHistory.ctime),),
        else_=None,
    )
    totals = session.query(func.sum(WaterHistory.delta), func.max(last_change))
    stock_number, latest_ctime = totals.first()

    if not stock_number:
        message.send('??????????')
        return

    # NOTE(review): the original comment here was mis-encoded but mentions
    # SQLite; presumably that backend hands the aggregated timestamp back as
    # a string, hence the parse below -- confirm.
    if not isinstance(latest_ctime, datetime.datetime):
        latest_ctime = datetime.datetime.strptime(
            latest_ctime, '%Y-%m-%d %H:%M:%S')

    reply = '??: {}? ({:%Y?%m?%d?} ??)'.format(stock_number, latest_ctime)
    message.send(reply)
项目:witchcraft    作者:llllllllll    | 项目源码 | 文件源码
def pattern_order(column, patterns):
    """Build an ``order by`` clause that ranks rows by the first pattern hit.

    Each pattern is passed through ``fuzzy`` and matched against ``column``
    with ``LIKE``; the resulting ``CASE`` yields the position of the first
    matching pattern, and the clause sorts on that position ascending.

    Parameters
    ----------
    column : sa.sql.ColumnClause
        The column being matched against.
    patterns : iterable[str]
        The patterns to search, in priority order.

    Returns
    -------
    sa.sql.ColumnClause
        The clause to use in an ``order by`` to enforce the given order.
    """
    whens = []
    for rank, pattern in enumerate(patterns):
        matches_pattern = column.like(fuzzy(pattern))
        whens.append((matches_pattern, rank))

    return sa.case(whens).asc()
项目:salicapi    作者:Lafaiet    | 项目源码 | 文件源码
def all(self, PRONAC = None, CgcCpf = None):
        """Return every certificate row, optionally restricted to one PRONAC.

        Each row carries four labelled columns: ``data_emissao``,
        ``data_validade``, ``descricao`` (a text derived from
        ``CodigoCertidao``) and ``situacao`` (``Pendente`` when
        ``cdSituacaoCertidao`` is 0, ``Não Pendente`` otherwise).

        :param PRONAC: when not ``None``, only rows with this PRONAC are
            returned.
        :param CgcCpf: accepted but not used by this method.
        :returns: the result of ``Query.all()`` for the query described above.
        """
        model = CertidoesNegativasModel

        # Certificate code -> human readable description.  Codes outside
        # this table get no description, since the CASE has no ELSE branch.
        descricoes = (
            ('49', u'Quitação de Tributos Federais'),
            ('51', 'FGTS'),
            ('52', 'INSS'),
            ('244', 'CADIN'),
        )
        descricao_case = case([
            (model.CodigoCertidao == codigo, texto)
            for codigo, texto in descricoes
        ])

        pendente = (model.cdSituacaoCertidao == 0, u'Pendente')
        situacao_case = case([pendente], else_=u'Não Pendente')

        colunas = (
            model.DtEmissao.label('data_emissao'),
            model.DtValidade.label('data_validade'),
            descricao_case.label('descricao'),
            situacao_case.label('situacao'),
        )
        res = self.sql_connector.session.query(*colunas)

        if PRONAC is not None:
            res = res.filter(model.PRONAC == PRONAC)

        return res.all()
项目:marvin    作者:sdss    | 项目源码 | 文件源码
def logmass(parameter):
    """Create a hybrid property exposing the logarithm of an attribute.

    On an instance the property returns ``math.log10`` of the attribute
    named ``parameter`` when it is positive, and ``0.`` otherwise.  At class
    level it yields a ``CASE`` cast to ``Float``: ``log(column)`` where the
    column is positive and ``0.`` where it equals zero.  The ``CASE`` has no
    ``ELSE`` branch.

    :param parameter: name of the attribute/column to take the logarithm of.
    :returns: the hybrid property object.
    """

    @hybrid_property
    def mass(self):
        value = getattr(self, parameter)
        if value > 0.:
            return math.log10(value)
        return 0.

    @mass.expression
    def mass(cls):
        column = getattr(cls, parameter)
        whens = [
            (column > 0., func.log(column)),
            (column == 0., 0.),
        ]
        return cast(case(whens), Float)

    return mass
项目:marvin    作者:sdss    | 项目源码 | 文件源码
def HybridRatio(line1, line2):
    """Produce an emission line ratio hybrid property.

    The numerator is the attribute named ``line1``, or, when ``line1`` is a
    tuple, the sum of the two attributes named by its first two items.  The
    denominator is the attribute named ``line2``.

    On an instance the property returns numerator / denominator when the
    denominator is positive and ``-999.`` otherwise.  At class level it
    yields a ``CASE`` cast to ``Float``: the ratio where the denominator is
    positive and ``-999.`` where it equals zero.  The ``CASE`` has no
    ``ELSE`` branch.

    :param line1: attribute name, or a tuple of two attribute names to sum.
    :param line2: attribute name of the denominator.
    :returns: the hybrid property object.
    """

    def _numerator(target):
        # Shared by the instance-level and class-level accessors; works on
        # either because it only uses getattr and ``+``.
        # isinstance (rather than an exact type comparison) also accepts
        # tuple subclasses such as namedtuples.
        if isinstance(line1, tuple):
            return getattr(target, line1[0]) + getattr(target, line1[1])
        return getattr(target, line1)

    @hybrid_property
    def hybridRatio(self):
        myline1 = _numerator(self)
        denominator = getattr(self, line2)

        if denominator > 0:
            return myline1 / denominator
        # Sentinel for a non-positive denominator.
        return -999.

    @hybridRatio.expression
    def hybridRatio(cls):
        myline1 = _numerator(cls)
        denominator = getattr(cls, line2)

        return cast(case([(denominator > 0., myline1 / denominator),
                          (denominator == 0., -999.)]), Float)

    return hybridRatio
项目:knowledge-repo    作者:airbnb    | 项目源码 | 文件源码
def ajax_post_typeahead():
    """Return up to five published posts as JSON typeahead suggestions.

    The ``search`` request argument is split on single spaces.  Every term
    contributes 1 to a post's match score when the post's keywords contain
    it (case-insensitive ``LIKE``); posts are ordered by that score,
    descending.  Callers without the index-view permission get ``'[]'``.

    :returns: a JSON array of objects with ``author``, ``title``, ``path``
        and ``keywords`` entries.
    """
    if not permissions.index_view.can():
        return '[]'

    # Raw search string typed by the user, one CASE expression per term.
    raw_search = request.args.get('search', '')
    case_statements = [
        case([(Post.keywords.ilike('%' + term.strip() + '%'), 1)], else_=0)
        for term in raw_search.split(" ")
    ]
    match_score = sum(case_statements).label("match_score")

    query = db_session.query(Post, match_score)
    query = query.filter(Post.status == current_repo.PostStatus.PUBLISHED.value)
    query = query.order_by(desc(match_score))
    posts = query.limit(5).all()

    # The score is only needed for ordering; it is not part of the payload.
    matches = [
        {'author': [author.format_name for author in post.authors],
         'title': str(post.title),
         'path': str(post.path),
         'keywords': str(post.keywords)}
        for post, _score in posts
    ]
    return json.dumps(matches)
项目:pybigquery    作者:mxmzdlv    | 项目源码 | 文件源码
def test_session_query(session, table):
    """Run a grouped query with AVG, SUM(CASE ...) and HAVING; expect rows.

    The query selects the ``string`` column, its ``concat`` label, the
    average of ``integer`` and the number of rows whose ``boolean`` is true,
    grouped by the first two and restricted to groups whose average exceeds
    10.  The test passes when at least one row comes back.
    """
    string_col = table.c.string
    col_concat = func.concat(string_col).label('concat')
    # ``== True`` is deliberate: it builds a SQL comparison expression.
    true_count = func.sum(case([(table.c.boolean == True, 1)], else_=0))

    query = session.query(
        string_col,
        col_concat,
        func.avg(table.c.integer),
        true_count,
    )
    query = query.group_by(string_col, col_concat)
    query = query.having(func.avg(table.c.integer) > 10)

    result = query.all()
    assert len(result) > 0
项目:clickhouse-sqlalchemy    作者:xzkostyan    | 项目源码 | 文件源码
def test_else_required(self):
        """Compiling a CASE without an ELSE must raise ``CompileError``.

        Also pins the exact error message produced by ``self.compile``.
        """
        with self.assertRaises(exc.CompileError) as caught:
            self.compile(case([(literal(1), 0)]))

        message = str(caught.exception)
        self.assertEqual(message, 'ELSE clause is required in CASE')
项目:clickhouse-sqlalchemy    作者:xzkostyan    | 项目源码 | 文件源码
def test_case(self):
        """A CASE with one WHEN and an ELSE compiles to the expected SQL."""
        expression = case([(literal(1), 0)], else_=1)
        compiled = self.compile(expression, literal_binds=True)

        self.assertEqual(compiled, 'CASE WHEN 1 THEN 0 ELSE 1 END')
项目:research-eGrader    作者:openstax    | 项目源码 | 文件源码
def get_grades(cls, user_id, exercise_id, active=None):
        """Fetch one user's grades for the responses of an exercise.

        Responses of ``exercise_id`` are outer-joined to this class's rows
        for ``user_id`` and ordered by ``Response.id``.  Each result row
        pairs the distinct response id with a score: ``0.0`` when the row is
        flagged ``junk``, otherwise ``cls.score``.

        :param user_id: user whose rows of this class are joined in.
        :param exercise_id: exercise whose responses are listed.
        :param active: when truthy, only responses with ``active`` set are
            returned.
        :returns: the result of ``Query.all()`` for the query described above.
        """
        # ``== True`` is deliberate: it builds a SQL comparison expression.
        expr = case([(cls.junk == True, 0.0)], else_=cls.score)
        grade_join = and_(cls.response_id == Response.id,
                          cls.user_id == user_id)

        query = (
            db.session.query(distinct(Response.id), expr)
            .outerjoin(cls, grade_join)
            .filter(Response.exercise_id == exercise_id)
            .order_by(Response.id)
        )

        if active:
            query = query.filter(Response.active == True)

        return query.all()
项目:radar    作者:renalreg    | 项目源码 | 文件源码
def sort_by_demographics_field(current_user, field, reverse, else_=None):
    """Build a sort expression for ``field`` that respects permissions.

    Admins sort on ``field`` directly.  For everyone else the field is
    wrapped in a ``CASE`` that yields ``field`` only where the
    ``VIEW_DEMOGRAPHICS`` group-role filter holds and ``else_`` elsewhere.

    :param current_user: user the ordering is built for; ``is_admin`` is read.
    :param field: column or expression to sort on.
    :param reverse: when truthy, the expression is wrapped in ``desc``.
    :param else_: value used where the user may not view demographics.
    :returns: the (possibly descending) sort expression.
    """
    ordering = field

    if not current_user.is_admin:
        roles = get_roles_with_permission(PERMISSION.VIEW_DEMOGRAPHICS)
        may_view = filter_by_group_roles(current_user, roles)
        ordering = case([(may_view, field)], else_=else_)

    return desc(ordering) if reverse else ordering