我们从Python开源项目中,提取了以下8个代码示例,用于说明如何使用sqlalchemy.sql.true()。
def flavor_get(self, context, flavor_uuid):
    """Return the single flavor row whose ``uuid`` equals ``flavor_uuid``.

    When ``context.is_admin`` is false the query is additionally limited to
    rows with ``disabled=False`` that are either public
    (``is_public == true()``) or have a ``projects`` entry matching
    ``context.project_id``.

    :param context: request context; ``is_admin`` and ``project_id`` are read
    :param flavor_uuid: value matched against ``Flavors.uuid``
    :returns: the result of ``query.one()``
    :raises exception.FlavorNotFound: when ``query.one()`` raises
        ``NoResultFound``
    """
    query = model_query(context, models.Flavors)
    query = query.filter_by(uuid=flavor_uuid)

    if not context.is_admin:
        query = query.filter_by(disabled=False)
        # Non-admins may see a flavor that is public OR shared with
        # their own project.
        visible_to_caller = or_(
            models.Flavors.is_public == true(),
            models.Flavors.projects.has(project_id=context.project_id),
        )
        query = query.filter(visible_to_caller)

    try:
        flavor = query.one()
    except NoResultFound:
        raise exception.FlavorNotFound(flavor_id=flavor_uuid)
    return flavor
def project_get_networks(context, project_id, associate=True):
    """Return the networks recorded for ``project_id``.

    The lookup goes through ``model_query`` with ``read_deleted="no"``.
    If it finds nothing, the outcome depends on ``associate``: when it is
    truthy a one-element list holding the result of
    ``network_associate(context, project_id)`` is returned, otherwise an
    empty list.
    """
    # NOTE(tr3buchet): as before this function will associate
    #                  a project with a network if it doesn't have one and
    #                  associate is true
    network_query = model_query(context, models.Network, read_deleted="no")
    networks = network_query.filter_by(project_id=project_id).all()

    if networks:
        return networks
    if associate:
        return [network_associate(context, project_id)]
    return []


###################
def flavor_get_all(self, context):
    """Return every flavor row the given ``context`` is allowed to see.

    Admin contexts get the unfiltered ``Flavors`` query. For any other
    context the rows must have ``disabled=False`` and be either public
    (``is_public == true()``) or have a ``projects`` entry matching
    ``context.project_id``.

    :param context: request context; ``is_admin`` and ``project_id`` are read
    :returns: the result of ``query.all()``
    """
    query = model_query(context, models.Flavors)
    if context.is_admin:
        return query.all()

    query = query.filter_by(disabled=False)
    # Either condition is enough for a non-admin to see the flavor.
    is_public = models.Flavors.is_public == true()
    shared_with_project = models.Flavors.projects.has(
        project_id=context.project_id)
    return query.filter(or_(is_public, shared_with_project)).all()
def has_property(self, prop):
    """Build a SQL condition testing whether ``prop`` is effectively granted.

    A base SELECT joins ``Property``, ``PropertyGroup`` and ``Membership``
    on their ids, restricted to ``Property.name == prop``,
    ``Membership.user_id == self.id`` and ``Membership.active``. The returned
    expression is true when no such row has ``granted == false()`` and at
    least one such row has ``granted == true()``.

    :param prop: value compared against ``Property.name``
    :returns: an ``and_(...)`` clause combining the two ``exists`` checks
    """
    joined_tables = [
        Property.__table__,
        PropertyGroup.__table__,
        Membership.__table__,
    ]
    base_select = select([null()], from_obj=joined_tables)

    row_matches = and_(
        Property.name == prop,
        Property.property_group_id == PropertyGroup.id,
        PropertyGroup.id == Membership.group_id,
        Membership.user_id == self.id,
        Membership.active,
    )
    property_granted_select = base_select.where(row_matches)
    # Disabled in the original: .cte("property_granted_select")

    # A single denying row vetoes the property, so check for denials first.
    never_denied = not_(exists(
        property_granted_select.where(Property.granted == false())
    ))
    granted_somewhere = exists(
        property_granted_select.where(Property.granted == true())
    )
    return and_(never_denied, granted_somewhere)
def test_true_false(self):
    """Check that ``sql.false()`` compiles to '0' and ``sql.true()`` to '1'."""
    expected_rendering = (
        (sql.false, '0'),
        (sql.true, '1'),
    )
    for make_constant, rendered in expected_rendering:
        self.assertEqual(self.compile(make_constant()), rendered)
def fixed_ip_disassociate_all_by_timeout(context, host, time):
    """Disassociate unallocated fixed IPs tied to ``host`` and older than ``time``.

    First selects the ids of ``FixedIp`` rows with ``allocated == false()``
    and ``updated_at < time`` whose joined network or instance matches
    ``host``; then updates those rows, setting ``instance_uuid`` to None,
    ``leased`` to False and ``updated_at`` to ``timeutils.utcnow()``.

    :returns: 0 when no ids were selected, otherwise whatever the
        ``update()`` call returns (presumably the affected row count --
        confirm against ``model_query``'s query type)
    """
    # NOTE(vish): only update fixed ips that "belong" to this
    #             host; i.e. the network host or the instance
    #             host matches. Two queries necessary because
    #             join with update doesn't work.
    instance_on_multi_host = and_(models.Instance.host == host,
                                  models.Network.multi_host == true())
    host_filter = or_(instance_on_multi_host, models.Network.host == host)

    # Query 1: collect the ids of the fixed IPs to release.
    candidates = model_query(context, models.FixedIp, (models.FixedIp.id,),
                             read_deleted="no")
    candidates = candidates.filter(models.FixedIp.allocated == false())
    candidates = candidates.filter(models.FixedIp.updated_at < time)
    candidates = candidates.join(
        (models.Network,
         models.Network.id == models.FixedIp.network_id))
    candidates = candidates.join(
        (models.Instance,
         models.Instance.uuid == models.FixedIp.instance_uuid))
    candidates = candidates.filter(host_filter)

    fixed_ip_ids = [row[0] for row in candidates.all()]
    if not fixed_ip_ids:
        return 0

    # Query 2: update exactly the rows found above.
    to_release = model_query(context, models.FixedIp)
    to_release = to_release.filter(models.FixedIp.id.in_(fixed_ip_ids))
    new_values = {
        'instance_uuid': None,
        'leased': False,
        'updated_at': timeutils.utcnow(),
    }
    return to_release.update(new_values, synchronize_session='fetch')
def _flavor_get_query(context, read_deleted=None):
    """Build the base ``InstanceTypes`` query for ``context``.

    The query is created via ``model_query`` (forwarding ``read_deleted``)
    with ``joinedload('extra_specs')`` applied. For non-admin contexts it is
    further limited to rows that are public (``is_public == true()``) or have
    a ``projects`` entry matching ``context.project_id``.

    :returns: the query object; it is not executed here
    """
    base_query = model_query(context, models.InstanceTypes,
                             read_deleted=read_deleted)
    base_query = base_query.options(joinedload('extra_specs'))

    if context.is_admin:
        return base_query

    # Non-admins: public flavors plus those shared with their project.
    visibility = or_(
        models.InstanceTypes.is_public == true(),
        models.InstanceTypes.projects.any(project_id=context.project_id),
    )
    return base_query.filter(visibility)
def _flavor_get_query_from_db(context):
    """Build the ``api_models.Flavors`` query for ``context``.

    Starts from ``context.session.query(api_models.Flavors)`` with
    ``joinedload('extra_specs')``. Unless ``context.is_admin`` is true, the
    query only keeps rows that are public (``is_public == true()``) or have
    a ``projects`` entry matching ``context.project_id``.

    :returns: the query object; it is not executed here
    """
    flavors = context.session.query(api_models.Flavors)
    flavors = flavors.options(joinedload('extra_specs'))

    if not context.is_admin:
        public_flavor = api_models.Flavors.is_public == true()
        project_flavor = api_models.Flavors.projects.any(
            project_id=context.project_id)
        flavors = flavors.filter(or_(public_flavor, project_flavor))

    return flavors