我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用sqlalchemy.orm.contains_eager()。
def get_dragents_hosting_bgp_speakers(self, context, bgp_speaker_ids,
                                      active=None, admin_state_up=None):
    """Return the DR agents bound to the given BGP speakers.

    :param context: request context providing ``context.session``.
    :param bgp_speaker_ids: sequence of BGP speaker ids; when it is empty
        no speaker filter is applied.
    :param active: passed through to
        ``AgentSchedulerDbMixin.is_eligible_agent`` for each agent.
    :param admin_state_up: when not None, only agents whose
        ``admin_state_up`` equals this value are considered.
    :returns: list of the ``dragent`` objects of the matching bindings.
    """
    query = context.session.query(BgpSpeakerDrAgentBinding)
    query = query.options(
        orm.contains_eager(BgpSpeakerDrAgentBinding.dragent))
    query = query.join(BgpSpeakerDrAgentBinding.dragent)

    if len(bgp_speaker_ids) == 1:
        query = query.filter(
            BgpSpeakerDrAgentBinding.bgp_speaker_id == bgp_speaker_ids[0])
    elif bgp_speaker_ids:
        # The column's in_() operator must be used here: Python's ``in``
        # keyword evaluates to a plain boolean instead of building an
        # SQL IN clause.
        query = query.filter(
            BgpSpeakerDrAgentBinding.bgp_speaker_id.in_(bgp_speaker_ids))

    if admin_state_up is not None:
        query = query.filter(
            agent_model.Agent.admin_state_up == admin_state_up)

    return [binding.dragent
            for binding in query
            if as_db.AgentSchedulerDbMixin.is_eligible_agent(
                active, binding.dragent)]
def get(self, build: Build):
    """
    Return a list of test cases for a given build.

    An optional ``result`` request argument restricts the listing to the
    ``Result`` member of that name; an unknown name raises
    ``NotImplementedError``.
    """
    query = TestCase.query.options(contains_eager('job'))
    query = query.join(Job, TestCase.job_id == Job.id)
    query = query.filter(Job.build_id == build.id)

    result_name = request.args.get('result')
    if result_name:
        try:
            query = query.filter(
                TestCase.result == getattr(Result, result_name))
        except AttributeError:
            raise NotImplementedError

    # Failed test cases are listed first, then by name.
    ordering = (
        (TestCase.result == Result.failed).desc(),
        TestCase.name.asc(),
    )
    query = query.order_by(*ordering)
    return self.paginate_with_schema(testcases_schema, query)
def get_l3_agents_hosting_routers(self, context, router_ids,
                                  admin_state_up=None, active=None):
    """Return the L3 agents bound to any of the given routers.

    Returns an empty list when ``router_ids`` is empty.  When
    ``admin_state_up`` is not None the agents are restricted to that
    administrative state.  When ``active`` is not None (whatever its
    value), agents reported down by ``AgentDbMixin.is_agent_down`` are
    dropped from the result.
    """
    if not router_ids:
        return []

    binding_cls = RouterL3AgentBinding
    query = (
        context.session.query(binding_cls)
        .options(orm.contains_eager(binding_cls.l3_agent))
        .join(binding_cls.l3_agent)
        .filter(binding_cls.router_id.in_(router_ids))
    )
    if admin_state_up is not None:
        query = query.filter(
            agents_db.Agent.admin_state_up == admin_state_up)

    agents = [row.l3_agent for row in query]
    if active is None:
        return agents

    # NOTE(review): the value of ``active`` is not consulted, only whether
    # it was supplied -- confirm this is intended.
    return [
        agent for agent in agents
        if not agents_db.AgentDbMixin.is_agent_down(
            agent['heartbeat_timestamp'])
    ]
def fixed_ip_get_by_instance(context, instance_uuid):
    """Return the fixed IPs of an instance, with related rows loaded.

    Raises ``InvalidUUID`` when ``instance_uuid`` is not UUID-like and
    ``FixedIpNotFoundForInstance`` when the query returns no rows.
    """
    if not uuidutils.is_uuid_like(instance_uuid):
        raise exception.InvalidUUID(uuid=instance_uuid)

    # Outer-join only virtual interfaces whose ``deleted`` column is 0.
    vif_join_condition = and_(
        models.VirtualInterface.id == models.FixedIp.virtual_interface_id,
        models.VirtualInterface.deleted == 0)

    query = model_query(context, models.FixedIp, read_deleted="no")
    query = query.filter_by(instance_uuid=instance_uuid)
    query = query.outerjoin(models.VirtualInterface, vif_join_condition)
    query = query.options(contains_eager("virtual_interface"))
    query = query.options(joinedload('network'))
    query = query.options(joinedload('floating_ips'))
    query = query.order_by(asc(models.VirtualInterface.created_at),
                           asc(models.VirtualInterface.id))

    fixed_ips = query.all()
    if fixed_ips:
        return fixed_ips
    raise exception.FixedIpNotFoundForInstance(instance_uuid=instance_uuid)
def aggregate_metadata_get_by_host(context, host, key=None):
    """Collect the metadata of every aggregate that contains ``host``.

    Returns a dict mapping each metadata key to the set of values found
    for it across the matching aggregates.  When ``key`` is truthy only
    metadata rows with that key are included.
    """
    query = (
        model_query(context, models.Aggregate)
        .join("_hosts")
        .join("_metadata")
        .filter(models.AggregateHost.host == host)
        .options(contains_eager("_metadata"))
    )
    if key:
        query = query.filter(models.AggregateMetadata.key == key)

    values_by_key = collections.defaultdict(set)
    for aggregate in query.all():
        for item in aggregate._metadata:
            values_by_key[item['key']].add(item['value'])
    return dict(values_by_key)
def select_reported_list(userid):
    """Return reports matching ``userid``, most recently commented first.

    Only comments with a non-zero ``violation`` are joined and loaded.
    Each returned report gets a ``latest_report`` attribute holding the
    largest ``unixtime`` among its loaded comments.
    """
    query = Report.query.join(ReportComment)
    query = query.options(contains_eager(Report.comments))
    for relationship in ('_target_sub', '_target_char', '_target_journal'):
        query = query.options(joinedload(relationship))
    query = query.filter(ReportComment.violation != 0)
    # NOTE(review): filter_by follows the join above, so ``userid`` is
    # presumably matched against ReportComment -- confirm.
    query = query.filter_by(userid=userid)

    reports = query.all()
    for report in reports:
        report.latest_report = max(
            comment.unixtime for comment in report.comments)
    reports.sort(key=lambda report: report.latest_report, reverse=True)
    return reports
def get_localized_message(login_id, message_id):
    '''
    Get message localization for language of a specific user.

    If translation for language of a user doesn't exist English translation
    is given.  Returns the single matching ``MessageLocalization`` (with its
    ``message`` populated from the same statement) or None.
    '''
    statement = text(
        "select lm.*, m.* "
        "from get_localized_message(:login_id, :message_id) lm "
        "inner join message m on lm.message_id = m.id")

    with session_scope() as session:
        query = session.query(model.MessageLocalization)
        query = query.from_statement(statement)
        query = query.params(login_id=login_id, message_id=message_id)
        query = query.options(
            contains_eager(model.MessageLocalization.message))
        localized_message = query.one_or_none()
    return localized_message
def list_inventories(self, context, filters=None, limit=None, marker=None,
                     sort_key=None, sort_dir=None):
    """Return a page of inventories with their resource provider loaded.

    ``filters`` is applied through ``_add_inventories_filters``; paging and
    sorting arguments are handed to ``_paginate_query`` unchanged.
    """
    session = get_session()
    base_query = model_query(models.Inventory, session=session)
    filtered = self._add_inventories_filters(base_query, filters)
    query = (
        filtered
        .join(models.Inventory.resource_provider)
        .options(contains_eager('resource_provider'))
    )
    return _paginate_query(models.Inventory, limit, marker,
                           sort_key, sort_dir, query)
def get_inventory(self, context, inventory_id):
    """Return the inventory with the given id, resource provider loaded.

    :param context: request context (not used by this method).
    :param inventory_id: primary key of the inventory to fetch.
    :returns: the matching ``models.Inventory`` row.
    :raises exception.InventoryNotFound: when no row matches.
    """
    session = get_session()
    query = model_query(models.Inventory, session=session)
    query = query.join(models.Inventory.resource_provider)
    query = query.options(contains_eager('resource_provider'))
    # Filter on the Inventory column explicitly: filter_by() placed after
    # join() resolves keyword names against the joined entity, which would
    # compare the resource provider's id instead.
    query = query.filter(models.Inventory.id == inventory_id)
    try:
        return query.one()
    except NoResultFound:
        raise exception.InventoryNotFound(inventory=inventory_id)
def list_allocations(self, context, filters=None, limit=None, marker=None,
                     sort_key=None, sort_dir=None):
    """Return a page of allocations with their resource provider loaded.

    ``filters`` is applied through ``_add_allocations_filters``; paging and
    sorting arguments are handed to ``_paginate_query`` unchanged.
    """
    session = get_session()
    base_query = model_query(models.Allocation, session=session)
    filtered = self._add_allocations_filters(base_query, filters)
    query = (
        filtered
        .join(models.Allocation.resource_provider)
        .options(contains_eager('resource_provider'))
    )
    return _paginate_query(models.Allocation, limit, marker,
                           sort_key, sort_dir, query)
def get_allocation(self, context, allocation_id):
    """Return the allocation with the given id, resource provider loaded.

    :param context: request context (not used by this method).
    :param allocation_id: primary key of the allocation to fetch.
    :returns: the matching ``models.Allocation`` row.
    :raises exception.AllocationNotFound: when no row matches.
    """
    session = get_session()
    query = model_query(models.Allocation, session=session)
    query = query.join(models.Allocation.resource_provider)
    query = query.options(contains_eager('resource_provider'))
    # Filter on the Allocation column explicitly: filter_by() placed after
    # join() resolves keyword names against the joined entity, which would
    # compare the resource provider's id instead.
    query = query.filter(models.Allocation.id == allocation_id)
    try:
        return query.one()
    except NoResultFound:
        raise exception.AllocationNotFound(allocation=allocation_id)
def ports_with_security_groups_find(context):
    """Build a query for ports joined to their security groups.

    The join is an inner join, and the joined rows populate
    ``Port.security_groups``.  The un-executed query is returned.
    """
    groups = models.Port.security_groups
    return (
        context.session.query(models.Port)
        .join(groups)
        .options(orm.contains_eager(groups))
    )
def get(self, build: Build):
    """
    Return a list of style violations for a given build.

    An optional ``severity`` request argument restricts the listing to the
    ``Severity`` member of that name; an unknown name raises
    ``NotImplementedError``.
    """
    query = StyleViolation.query.options(contains_eager('job'))
    query = query.join(Job, StyleViolation.job_id == Job.id)
    query = query.filter(Job.build_id == build.id)

    severity_name = request.args.get('severity')
    if severity_name:
        try:
            query = query.filter(
                StyleViolation.severity == getattr(Severity, severity_name))
        except AttributeError:
            raise NotImplementedError

    # Errors first, then by file name and position within the file.
    ordering = (
        (StyleViolation.severity == Severity.error).desc(),
        StyleViolation.filename.asc(),
        StyleViolation.lineno.asc(),
        StyleViolation.colno.asc(),
    )
    query = query.order_by(*ordering)
    return self.paginate_with_schema(styleviolation_schema, query)
def get(self, revision: Revision):
    """
    Return a list of test cases for a given revision.

    Responds with status 404 when no build is found for the revision.  An
    optional ``result`` request argument restricts the listing to the
    ``Result`` member of that name; an unknown name raises
    ``NotImplementedError``.
    """
    build = fetch_build_for_revision(revision.repository, revision)
    if not build:
        return self.respond(status=404)

    build_ids = [source_build.id for source_build in build.original]

    query = TestCase.query.options(contains_eager('job'))
    query = query.join(Job, TestCase.job_id == Job.id)
    query = query.filter(Job.build_id.in_(build_ids))

    result_name = request.args.get('result')
    if result_name:
        try:
            query = query.filter(
                TestCase.result == getattr(Result, result_name))
        except AttributeError:
            raise NotImplementedError

    # Failed test cases are listed first, then by name.
    ordering = (
        (TestCase.result == Result.failed).desc(),
        TestCase.name.asc(),
    )
    query = query.order_by(*ordering)
    return self.paginate_with_schema(testcases_schema, query)
def aggregate_get_by_metadata_key(self, context, key):
    """Return aggregates that have a metadata row with the given key.

    ``_metadata`` on each result is populated from the joined rows.
    """
    return (
        model_query(context, models.Aggregate)
        .join("_metadata")
        .filter(models.AggregateMetadata.key == key)
        .options(contains_eager("_metadata"))
        .all()
    )
def aggregate_get_by_metadata(self, context, key, value):
    """Return aggregates that have a metadata row matching key and value.

    ``_metadata`` on each result is populated from the joined rows.
    """
    key_and_value = and_(models.AggregateMetadata.key == key,
                         models.AggregateMetadata.value == value)
    return (
        model_query(context, models.Aggregate)
        .join("_metadata")
        .filter(key_and_value)
        .options(contains_eager("_metadata"))
        .all()
    )
def _show_handins(username=None):
    """Render an administrator page to show submissions.

    The ``page`` and ``perpage`` request arguments select the pagination
    window; values that are not integers fall back to 1 and 10.

    :param username: The interested user's name.  If not given, show
        submissions from all users.
    :type username: :class:`str`
    :return: A :class:`flask.Response` object.
    :raises NotFound: If ``username`` is given but no such user exists.
    """
    def _int_arg(name, default):
        # Read an integer request argument, using the default on bad input.
        try:
            return int(request.args.get(name, default))
        except ValueError:
            return default

    # get pagination argument
    page = _int_arg('page', 1)
    perpage = _int_arg('perpage', 10)

    # query about all handins
    handins = Handin.query.join(Handin.user).options(
        contains_eager(Handin.user))

    # whether we want to view the submissions from one single user?
    if username:
        user = User.query.filter(User.name == username).first()
        if not user:
            raise NotFound()
        handins = handins.filter(Handin.user_id == user.id)

    # Sort the handins
    handins = handins.order_by(-Handin.id)

    # build pagination object
    return render_template(
        'admin.handins.html',
        the_page=handins.paginate(page, perpage),
        username=username
    )
def aggregate_get_by_metadata_key(context, key):
    """Return rows that match metadata key.

    ``_metadata`` on each result is populated from the joined rows and
    ``_hosts`` is loaded with a joined load.

    :param key Matches metadata key.
    """
    return (
        model_query(context, models.Aggregate)
        .join("_metadata")
        .filter(models.AggregateMetadata.key == key)
        .options(contains_eager("_metadata"))
        .options(joinedload("_hosts"))
        .all()
    )
def comment_tree(cls, target):
    """Load the comments on ``target`` and arrange them as a tree.

    Returns a ``(count, roots)`` tuple: the number of comments loaded and
    the list of comments without a parent.  Every comment gets a
    ``subcomments`` list.  Comments are fetched in ascending ``unixtime``
    order and both the root list and each ``subcomments`` list are reversed
    before returning.
    """
    # XXX: needs to filter more stuff out
    query = cls.query.filter_by(**target._comment_criteria())
    query = query.join(Login, cls.userid == Login.userid)
    query = query.options(contains_eager(cls.poster))
    query = query.order_by(cls.unixtime.asc())
    comments = query.all()

    by_id = {}
    roots = []
    posters = set()
    for comment in comments:
        by_id[comment.commentid] = comment
        comment.subcomments = []
        if not comment.parentid:
            roots.append(comment)
        else:
            # The parent must already have been seen in this loop.
            by_id[comment.parentid].subcomments.append(comment)
        posters.add(comment.poster)

    from libweasyl.media import populate_with_user_media
    populate_with_user_media(posters)

    roots.reverse()
    for comment in by_id.values():
        comment.subcomments.reverse()
    return len(by_id), roots
def animal_info(identifier, database, db_path=None, ):
    """Print the string representation of the matching Animal.

    The animal is looked up either by the ``Animal.id`` column or, when
    ``database`` is given, by its external identifier.  If nothing matches,
    a "No entry was found" message is printed instead.  Nothing is returned.

    Parameters
    ----------
    identifier : int or string
        The identifier of the animal.
    database : string or None
        If specified, gives a constraint on the
        AnimalExternalIdentifier.database column AND turns the identifier
        argument into a constraint on the
        AnimalExternalIdentifier.identifier column.  If None, the identifier
        argument is used as a constraint on the Animal.id column.
    db_path : string, optional
        Path to a LabbookDB formatted database.  When not given, the
        ``LDB_PATH`` environment variable is used.
    """
    if not db_path:
        db_path = os.environ["LDB_PATH"]

    if database and identifier:
        # Translate the external identifier into the Animal.id value.
        session, engine = load_session(db_path)
        try:
            sql_query = session.query(Animal)
            sql_query = sql_query.join(Animal.external_ids)\
                .filter(AnimalExternalIdentifier.database == database)\
                .filter(AnimalExternalIdentifier.identifier == identifier)\
                .options(contains_eager('external_ids'))
            matches = list(sql_query)
            # Read the id while the session is still open.
            animal_id = matches[0].id if matches else None
        finally:
            session.close()
            engine.dispose()
        if not matches:
            # Report the failed external lookup here; otherwise indexing
            # the empty result would raise before any message is printed.
            print("No entry was found with {id} in the AnimalExternalIdentifier.identifier column and {db} in the AnimalExternalIdentifier.database column of the database located at {loc}.".format(id=identifier,db=database,loc=db_path))
            return
        identifier = animal_id

    session, engine = load_session(db_path)
    try:
        sql_query = session.query(Animal)
        sql_query = sql_query.filter(Animal.id == identifier)
        rows = list(sql_query)
        # Build the string while the session is still open.
        animal = str(rows[0]) if rows else None
    finally:
        session.close()
        engine.dispose()

    if rows:
        print(animal)
    elif database is None:
        print("No entry was found with {id} in the Animal.id column of the database located at {loc}.".format(id=identifier,loc=db_path))
    else:
        print("No entry was found with {id} in the AnimalExternalIdentifier.identifier column and {db} in the AnimalExternalIdentifier.database column of the database located at {loc}.".format(id=identifier,db=database,loc=db_path))
def select_list(userid, form):
    """Return reports matching the filters in ``form``, newest first.

    Each element of the returned list is a tuple of the report, the count
    taken from the per-report comment subquery, and the result of mapping
    ``_convert_violation`` over that report's distinct violations.
    ``userid`` is not used by this function.
    """
    # Find the unique violation types and the number of reporters. This will
    # be joined against the Report model to get the violations/reporters for
    # each selected report.
    distinct_violations = sa.type_coerce(
        sa.func.array_agg(ReportComment.violation.distinct()),
        ARRAY(sa.Integer, as_tuple=True)).label('violations')
    subq = (
        ReportComment.dbsession.query(
            ReportComment.reportid, sa.func.count(), distinct_violations)
        .filter(ReportComment.violation != 0)
        .group_by(ReportComment.reportid)
        .subquery())

    # Find reports, joining against the aforementioned subquery, and
    # eager-load the reports' owners.
    q = Report.dbsession.query(Report, subq)
    q = q.options(joinedload(Report.owner))
    q = q.join(subq, Report.reportid == subq.c.reportid)
    q = q.reset_joinpoint()

    # For each type of report, eagerly load the content reported and the
    # content's owner. Also, keep track of the Login model aliases used for
    # each report type so they can be filtered against later.
    login_aliases = []
    for column_name in _report_types:
        login_alias = aliased(Login)
        login_aliases.append(login_alias)
        q = q.outerjoin(getattr(Report, column_name))
        q = q.outerjoin(login_alias)
        q = q.options(
            contains_eager(column_name + '.owner', alias=login_alias))
        q = q.reset_joinpoint()

    # Filter by report status. form.status can also be 'all', in which case
    # no filter is applied.
    if form.status == 'closed':
        q = q.filter_by(is_closed=True)
    elif form.status == 'open':
        q = q.filter_by(is_closed=False)

    # If filtering by the report's content's owner, iterate over the
    # previously collected Login model aliases to compare against
    # Login.login_name.
    if form.submitter:
        submitter = legacy.login_name(form.submitter)
        q = q.filter(sa.or_(
            alias.login_name == submitter for alias in login_aliases))

    # If filtering by violation type, see if the violation is in the array
    # aggregate of unique violations for this report.
    if form.violation and form.violation != '-1':
        wanted_violation = sa.literal(int(form.violation))
        q = q.filter(wanted_violation == sa.func.any(subq.c.violations))

    q = q.order_by(Report.opened_at.desc())

    results = []
    # The second element of each row (unpacked as ``_``) is not needed.
    for report, _, report_count, violations in q.all():
        results.append(
            (report, report_count, map(_convert_violation, violations)))
    return results