Python sqlalchemy.sql.expression 模块,cast() 实例源码

我们从Python开源项目中,提取了以下23个代码示例,用于说明如何使用sqlalchemy.sql.expression.cast()

项目:telemetrics-backend    作者:clearlinux    | 项目源码 | 文件源码
def get_top_crash_guilties(classes=None):
        q = db.session.query(Guilty.function, Guilty.module, Build.build, db.func.count(Record.id).label('total'), Guilty.id, Guilty.comment)
        q = q.join(Record)
        q = q.join(Build)
        q = q.join(Classification).filter(Record.classification_id == Classification.id)
        if not classes:
            classes = ['org.clearlinux/crash/clr']
        q = q.filter(Classification.classification.in_(classes))
        q = q.filter(Build.build.op('~')('^[0-9][0-9]+$'))
        q = q.filter(Guilty.hide == False)
        q = q.group_by(Guilty.function, Guilty.module, Guilty.comment, Guilty.id, Build.build)
        q = q.order_by(desc(cast(Build.build, db.Integer)), desc('total'))
        # query for records created in the last week (~ 10 Clear builds)
        q = q.filter(Build.build.in_(sorted(tuple(set([x[2] for x in q.all()])), key=lambda x: int(x))[-8:]))
        interval_sec = 24 * 60 * 60 * 7
        current_time = time()
        sec_in_past = current_time - interval_sec
        q = q.filter(Record.tsp > sec_in_past)
        return q.all()
项目:telemetrics-backend    作者:clearlinux    | 项目源码 | 文件源码
def get_heartbeat_msgs(most_recent=None):
        # These two expressions are SQL CASE conditional expressions, later
        # used within count(distinct ...) aggregates for the query.
        internal_expr = case([(Record.external == False, Record.machine_id), ]).label('internal_count')
        external_expr = case([(Record.external == True, Record.machine_id), ]).label('external_count')

        q = db.session.query(Build.build, db.func.count(db.distinct(internal_expr)), db.func.count(db.distinct(external_expr)))
        q = q.join(Record).join(Classification)
        q = q.filter(Classification.classification == "org.clearlinux/heartbeat/ping")
        q = q.filter(Record.os_name == 'clear-linux-os')
        q = q.group_by(Build.build)

        if most_recent:
            interval_sec = 24 * 60 * 60 * int(most_recent)
            current_time = time()
            sec_in_past = current_time - interval_sec
            q = q.filter(Record.tsp > sec_in_past)

        q = q.order_by(cast(Build.build, db.Integer))
        return q.all()
项目:telemetrics-backend    作者:clearlinux    | 项目源码 | 文件源码
def get_top_crash_guilties(classes=None):
        q = db.session.query(Guilty.function, Guilty.module, Build.build, db.func.count(Record.id).label('total'), Guilty.id, Guilty.comment)
        q = q.join(Record)
        q = q.join(Build)
        q = q.join(Classification).filter(Record.classification_id == Classification.id)
        if not classes:
            classes = ['org.clearlinux/crash/clr']
        q = q.filter(Classification.classification.in_(classes))
        q = q.filter(Build.build.op('~')('^[0-9][0-9]+$'))
        q = q.filter(Guilty.hide == False)
        q = q.group_by(Guilty.function, Guilty.module, Guilty.comment, Guilty.id, Build.build)
        q = q.order_by(desc(cast(Build.build, db.Integer)), desc('total'))
        # query for records created in the last week (~ 10 Clear builds)
        q = q.filter(Build.build.in_(sorted(tuple(set([x[2] for x in q.all()])), key=lambda x: int(x))[-8:]))
        interval_sec = 24 * 60 * 60 * 7
        current_time = time()
        sec_in_past = current_time - interval_sec
        q = q.filter(Record.tsp > sec_in_past)
        return q.all()
项目:telemetrics-backend    作者:clearlinux    | 项目源码 | 文件源码
def get_heartbeat_msgs(most_recent=None):
        # These two expressions are SQL CASE conditional expressions, later
        # used within count(distinct ...) aggregates for the query.
        internal_expr = case([(Record.external == False, Record.machine_id), ]).label('internal_count')
        external_expr = case([(Record.external == True, Record.machine_id), ]).label('external_count')

        q = db.session.query(Build.build, db.func.count(db.distinct(internal_expr)), db.func.count(db.distinct(external_expr)))
        q = q.join(Record).join(Classification)
        q = q.filter(Classification.classification == "org.clearlinux/heartbeat/ping")
        q = q.filter(Record.os_name == 'clear-linux-os')
        q = q.group_by(Build.build)

        if most_recent:
            interval_sec = 24 * 60 * 60 * int(most_recent)
            current_time = time()
            sec_in_past = current_time - interval_sec
            q = q.filter(Record.tsp > sec_in_past)

        q = q.order_by(cast(Build.build, db.Integer))
        return q.all()
项目:telemetrics-backend    作者:clearlinux    | 项目源码 | 文件源码
def get_top_crash_guilties(classes=None):
        q = db.session.query(Guilty.function, Guilty.module, Build.build, db.func.count(Record.id).label('total'), Guilty.id, Guilty.comment)
        q = q.join(Record)
        q = q.join(Build)
        q = q.join(Classification).filter(Record.classification_id == Classification.id)
        if not classes:
            classes = ['org.clearlinux/crash/clr']
        q = q.filter(Classification.classification.in_(classes))
        q = q.filter(Build.build.op('~')('^[0-9][0-9]+$'))
        q = q.filter(Guilty.hide == False)
        q = q.group_by(Guilty.function, Guilty.module, Guilty.comment, Guilty.id, Build.build)
        q = q.order_by(desc(cast(Build.build, db.Integer)), desc('total'))
        # query for records created in the last week (~ 10 Clear builds)
        q = q.filter(Build.build.in_(sorted(tuple(set([x[2] for x in q.all()])), key=lambda x: int(x))[-8:]))
        interval_sec = 24 * 60 * 60 * 7
        current_time = time()
        sec_in_past = current_time - interval_sec
        q = q.filter(Record.tsp > sec_in_past)
        return q.all()
项目:telemetrics-backend    作者:clearlinux    | 项目源码 | 文件源码
def get_heartbeat_msgs(most_recent=None):
        # These two expressions are SQL CASE conditional expressions, later
        # used within count(distinct ...) aggregates for the query.
        internal_expr = case([(Record.external == False, Record.machine_id), ]).label('internal_count')
        external_expr = case([(Record.external == True, Record.machine_id), ]).label('external_count')

        q = db.session.query(Build.build, db.func.count(db.distinct(internal_expr)), db.func.count(db.distinct(external_expr)))
        q = q.join(Record).join(Classification)
        q = q.filter(Classification.classification == "org.clearlinux/heartbeat/ping")
        q = q.filter(Record.os_name == 'clear-linux-os')
        q = q.group_by(Build.build)

        if most_recent:
            interval_sec = 24 * 60 * 60 * int(most_recent)
            current_time = time()
            sec_in_past = current_time - interval_sec
            q = q.filter(Record.tsp > sec_in_past)

        q = q.order_by(cast(Build.build, db.Integer))
        return q.all()
项目:Dallinger    作者:Dallinger    | 项目源码 | 文件源码
def generation(self):
        """Make generation queryable."""
        from sqlalchemy import Integer
        from sqlalchemy.sql.expression import cast
        return cast(self.property2, Integer)
项目:Dallinger    作者:Dallinger    | 项目源码 | 文件源码
def generation(self):
        """Make generation queryable."""
        return cast(self.property2, Integer)
项目:Dallinger    作者:Dallinger    | 项目源码 | 文件源码
def score(self):
        """Make score queryable."""
        return cast(self.property3, Integer)
项目:Dallinger    作者:Dallinger    | 项目源码 | 文件源码
def proportion(self):
        """Make proportion queryable."""
        return cast(self.property4, Float)
项目:Dallinger    作者:Dallinger    | 项目源码 | 文件源码
def chosen(self):
        """Retrieve chosen via property1."""
        return cast(self.property1, Boolean)
项目:telemetrics-backend    作者:clearlinux    | 项目源码 | 文件源码
def get_recordcnts_by_build():
        q = db.session.query(Build.build, db.func.count(Record.id)).join(Record.build)
        q = q.filter(Build.build.op('~')('^[0-9]+$'))
        q = q.group_by(Build.build).order_by(cast(Build.build, db.Integer)).all()
        return q
项目:telemetrics-backend    作者:clearlinux    | 项目源码 | 文件源码
def get_crashcnts_by_build(classes=None):
        q = db.session.query(Build.build, db.func.count(Record.id)).join(Record).join(Classification)
        if not classes:
            classes = ['org.clearlinux/crash/clr']
        q = q.filter(Classification.classification.in_(classes))
        q = q.filter(Build.build.op('~')('^[0-9]+$'))
        q = q.group_by(Build.build)
        q = q.order_by(desc(cast(Build.build, db.Integer)))
        q = q.limit(10)
        return q.all()
项目:telemetrics-backend    作者:clearlinux    | 项目源码 | 文件源码
def get_machine_ids_for_guilty(id, most_recent=None):
        q = db.session.query(Build.build, Record.machine_id, db.func.count(Record.id).label('total'), Record.guilty_id)
        q = q.join(Record)
        q = q.filter(Record.guilty_id == id)
        q = q.filter(Record.os_name == 'clear-linux-os')
        q = q.filter(Build.build.op('~')('^[0-9][0-9]+$'))
        q = q.group_by(Build.build, Record.machine_id, Record.guilty_id)
        q = q.order_by(desc(cast(Build.build, db.Integer)), desc('total'))
        if most_recent:
            interval_sec = 24 * 60 * 60 * int(most_recent)
            current_time = time()
            sec_in_past = current_time - interval_sec
            q = q.filter(Record.tsp > sec_in_past)
        return q.all()
项目:telemetrics-backend    作者:clearlinux    | 项目源码 | 文件源码
def get_crashcnts_by_build(classes=None):
        q = db.session.query(Build.build, db.func.count(Record.id)).join(Record).join(Classification)
        if not classes:
            classes = ['org.clearlinux/crash/clr']
        q = q.filter(Classification.classification.in_(classes))
        q = q.filter(Build.build.op('~')('^[0-9]+$'))
        q = q.group_by(Build.build)
        q = q.order_by(desc(cast(Build.build, db.Integer)))
        q = q.limit(10)
        return q.all()
项目:telemetrics-backend    作者:clearlinux    | 项目源码 | 文件源码
def get_machine_ids_for_guilty(id, most_recent=None):
        q = db.session.query(Build.build, Record.machine_id, db.func.count(Record.id).label('total'), Record.guilty_id)
        q = q.join(Record)
        q = q.filter(Record.guilty_id == id)
        q = q.filter(Record.os_name == 'clear-linux-os')
        q = q.filter(Build.build.op('~')('^[0-9][0-9]+$'))
        q = q.group_by(Build.build, Record.machine_id, Record.guilty_id)
        q = q.order_by(desc(cast(Build.build, db.Integer)), desc('total'))
        if most_recent:
            interval_sec = 24 * 60 * 60 * int(most_recent)
            current_time = time()
            sec_in_past = current_time - interval_sec
            q = q.filter(Record.tsp > sec_in_past)
        return q.all()
项目:telemetrics-backend    作者:clearlinux    | 项目源码 | 文件源码
def get_recordcnts_by_build():
        q = db.session.query(Build.build, db.func.count(Record.id)).join(Record.build)
        q = q.filter(Build.build.op('~')('^[0-9]+$'))
        q = q.group_by(Build.build).order_by(cast(Build.build, db.Integer)).all()
        return q
项目:telemetrics-backend    作者:clearlinux    | 项目源码 | 文件源码
def get_machine_ids_for_guilty(id, most_recent=None):
        q = db.session.query(Build.build, Record.machine_id, db.func.count(Record.id).label('total'), Record.guilty_id)
        q = q.join(Record)
        q = q.filter(Record.guilty_id == id)
        q = q.filter(Record.os_name == 'clear-linux-os')
        q = q.filter(Build.build.op('~')('^[0-9][0-9]+$'))
        q = q.group_by(Build.build, Record.machine_id, Record.guilty_id)
        q = q.order_by(desc(cast(Build.build, db.Integer)), desc('total'))
        if most_recent:
            interval_sec = 24 * 60 * 60 * int(most_recent)
            current_time = time()
            sec_in_past = current_time - interval_sec
            q = q.filter(Record.tsp > sec_in_past)
        return q.all()
项目:deb-python-sqlalchemy-utils    作者:openstack    | 项目源码 | 文件源码
def ancestor_of(self, other):
            if isinstance(other, list):
                return self.op('@>')(expression.cast(other, ARRAY(LtreeType)))
            else:
                return self.op('@>')(other)
项目:deb-python-sqlalchemy-utils    作者:openstack    | 项目源码 | 文件源码
def descendant_of(self, other):
            if isinstance(other, list):
                return self.op('<@')(expression.cast(other, ARRAY(LtreeType)))
            else:
                return self.op('<@')(other)
项目:deb-python-sqlalchemy-utils    作者:openstack    | 项目源码 | 文件源码
def lquery(self, other):
            if isinstance(other, list):
                return self.op('?')(expression.cast(other, ARRAY(LQUERY)))
            else:
                return self.op('~')(other)
项目:antares    作者:CONABIO    | 项目源码 | 文件源码
def acquisitions_by_mapgrid_and_date(date, mapgrid_target, day_buffer):
    from sqlalchemy import Integer, func, Date
    from sqlalchemy.sql.expression import cast
    session = SESSION_MAKER()
    images_paths = session.query(RawProduct.product_path, RapidEyeFootPrintsMexicoOld.code, RapidEyeFootPrintsMexicoOld.mapgrid2).distinct().join(RawProduct.information).filter(RawProduct.satellite_id == 1, RapidEyeFootPrintsMexicoOld.mapgrid2 == mapgrid_target, cast(RapidEyeFootPrintsMexicoOld.code, Integer) == cast(Information.grid_id, Integer), func.abs(cast(RawProduct.acquisition_date, Date) - date) < day_buffer).all()
    #RawProduct.sensor_id
    return images_paths
项目:idealoom    作者:conversence    | 项目源码 | 文件源码
def maybe_cast(column):
    cast_to = column_casts.get(column.table.name, {}).get(column.name, None)
    return column if cast_to is None else cast(column, cast_to)