我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用sqlalchemy.orm.undefer()。
def undefer_group(name):
    """Build a :class:`.MapperOption` that un-defers a whole group of columns.

    Every deferred column property belonging to the named group is loaded
    as a regular (non-deferred) column.

    Intended for use with :meth:`.Query.options`, e.g.::

        query(MyClass).options(undefer_group("group_one"))

    See also:

    :ref:`deferred`

    :param name: String name of the deferred group, i.e. the value given
      as the "group" argument of the :func:`.orm.deferred`
      configurational function.

    """
    group_option = strategies.UndeferGroupOption(name)
    return group_option
def _get_instance(auth, owner, package_name, package_hash):
    """Look up a single package instance by hash, owner, name and access.

    The query matches ``Instance`` rows with the given hash whose joined
    package has the given ``owner`` and ``name``, and whose package access
    entries include either ``auth.user`` or ``PUBLIC``.

    :param auth: object exposing the requesting user as ``auth.user``.
    :param owner: package owner to match.
    :param package_name: package name to match.
    :param package_hash: instance hash to match.
    :returns: the matching instance, with its ``contents`` column loaded.
    :raises ApiException: with ``requests.codes.not_found`` when no row
        matches.
    """
    by_hash = Instance.query.filter_by(hash=package_hash)
    # Contents is deferred by default.
    with_contents = by_hash.options(undefer('contents'))
    # filter_by() directly after each join keeps the original chain order.
    for_package = with_contents.join(Instance.package).filter_by(
        owner=owner, name=package_name
    )
    accessible = for_package.join(Package.access).filter(
        Access.user.in_([auth.user, PUBLIC])
    )

    instance = accessible.one_or_none()
    if instance is not None:
        return instance

    raise ApiException(
        requests.codes.not_found,
        "Package hash does not exist"
    )
def _build_instance_get(context, columns_to_join=None):
    """Build the query used to fetch an instance plus selected extras.

    ``security_groups.rules`` and ``info_cache`` are always joined-loaded.
    Each additional entry of ``columns_to_join`` is un-deferred when its
    name contains ``'extra.'`` and joined-loaded otherwise.

    :param context: request context handed to ``model_query``.
    :param columns_to_join: names to load eagerly; when ``None`` it
        defaults to ``['metadata', 'system_metadata']``.
    :returns: the configured query (not yet executed).
    """
    always_joined = ('info_cache', 'security_groups')
    optional_relations = ('metadata', 'system_metadata')

    if columns_to_join is None:
        columns_to_join = ['metadata', 'system_metadata']

    query = model_query(context, models.Instance, project_only=True)
    query = query.options(joinedload_all('security_groups.rules'))
    query = query.options(joinedload('info_cache'))

    for name in columns_to_join:
        if name in always_joined:
            # Already always joined above
            continue
        make_option = undefer if 'extra.' in name else joinedload
        query = query.options(make_option(name))

    # NOTE(alaski) Stop lazy loading of columns not needed.
    for name in optional_relations:
        if name not in columns_to_join:
            query = query.options(noload(name))

    return query
def process_email_ids(self, email_ids):
    """Import the given emails, then re-thread the discussion's posts.

    The reader status is set to ``ReaderStatus.READING`` and the source is
    refreshed before importing. Importing stops early as soon as the
    status is no longer ``READING``. Afterwards the session is flushed,
    all posts of the source's discussion are re-threaded and the
    transaction is committed.

    :param email_ids: sized iterable of ids passed one by one to
        ``self.import_email``.
    """
    self.set_status(ReaderStatus.READING)
    self.refresh_source()

    id_count = len(email_ids)
    log.info("Processing messages from IMAP: %d "% (id_count))

    for email_id in email_ids:
        self.import_email(email_id)
        # The status may have changed while importing; stop if so.
        if self.status != ReaderStatus.READING:
            break

    # We imported mails, we need to re-thread
    self.source.db.flush()

    # Rethread emails globally (sigh)
    posts_query = self.source.db.query(Post)
    posts_query = posts_query.filter_by(
        discussion_id=self.source.discussion_id
    )
    posts_query = posts_query.options(undefer(ImportedPost.imported_blob))
    emails = posts_query.all()

    AbstractMailbox.thread_mails(emails)
    self.source.db.commit()
def dispatch_request(self, test_id: str, *args, **kwargs) -> Response:
    """Resolve ``test_id`` to a ``TestCase`` and dispatch with it.

    The test case is fetched with its ``message`` column un-deferred and
    replaces the id in the call to ``Resource.dispatch_request``.

    :param test_id: primary key handed to ``Query.get``.
    :returns: the response of ``Resource.dispatch_request``, or the result
        of ``self.not_found()`` when the lookup yields a falsy value.
    """
    lookup = TestCase.query.options(undefer('message'))
    test = lookup.get(test_id)
    if test:
        return Resource.dispatch_request(self, test, *args, **kwargs)
    return self.not_found()
def instance_get_active_by_window_joined(context, begin, end=None,
                                         project_id=None, host=None,
                                         columns_to_join=None):
    """Return instances (with joins) that were active during a window.

    An instance matches when its ``terminated_at`` is NULL or later than
    ``begin``. Truthy ``end``, ``project_id`` and ``host`` values narrow
    the result further (``launched_at < end``, equal project, equal host).

    :param columns_to_join: when ``None``, ``info_cache`` and
        ``security_groups`` are loaded through the query and ``metadata``
        and ``system_metadata`` are passed on as manual joins; otherwise
        the split is delegated to ``_manual_join_columns``.
    :returns: the result of ``_instances_fill_metadata`` applied to the
        fetched rows and the manual joins.
    """
    if columns_to_join is not None:
        manual_joins, query_joins = _manual_join_columns(columns_to_join)
    else:
        query_joins = ['info_cache', 'security_groups']
        manual_joins = ['metadata', 'system_metadata']

    query = context.session.query(models.Instance)
    for name in query_joins:
        make_option = undefer if 'extra.' in name else joinedload
        query = query.options(make_option(name))

    still_active = or_(models.Instance.terminated_at == null(),
                       models.Instance.terminated_at > begin)
    query = query.filter(still_active)

    if end:
        query = query.filter(models.Instance.launched_at < end)
    if project_id:
        query = query.filter_by(project_id=project_id)
    if host:
        query = query.filter_by(host=host)

    instances = query.all()
    return _instances_fill_metadata(context, instances, manual_joins)
def _instance_get_all_query(context, project_only=False, joins=None):
    """Build the base query for listing instances.

    :param context: request context handed to ``model_query``.
    :param project_only: forwarded unchanged to ``model_query``.
    :param joins: names to load eagerly; defaults to ``info_cache`` and
        ``security_groups``. Names containing ``'extra.'`` are un-deferred,
        all others are joined-loaded.
    :returns: the configured query (not yet executed).
    """
    names = ['info_cache', 'security_groups'] if joins is None else joins

    query = model_query(context, models.Instance, project_only=project_only)
    for name in names:
        make_option = undefer if 'extra.' in name else joinedload
        query = query.options(make_option(name))
    return query
def instance_extra_get_by_instance_uuid(context, instance_uuid, columns=None):
    """Fetch the ``InstanceExtra`` row belonging to an instance.

    :param context: request context handed to ``model_query``.
    :param instance_uuid: value matched against ``instance_uuid``.
    :param columns: names of the columns to un-defer; when ``None`` the
        five default columns listed below are used.
    :returns: the result of ``Query.first()`` on the filtered query.
    """
    if columns is None:
        columns = ['numa_topology', 'pci_requests', 'flavor', 'vcpu_model',
                   'migration_context']

    query = model_query(context, models.InstanceExtra)
    query = query.filter_by(instance_uuid=instance_uuid)
    for name in columns:
        query = query.options(undefer(name))

    return query.first()


###################
def undefer(*key):
    # Raw docstring: the ``\*key`` below is reST markup, not a Python escape.
    # In a non-raw literal it is an invalid escape sequence and triggers a
    # DeprecationWarning/SyntaxWarning on modern Python.
    r"""Return a :class:`.MapperOption` that will convert the column property
    of the given name into a non-deferred (regular column) load.

    Used with :meth:`.Query.options`.

    e.g.::

        from sqlalchemy.orm import undefer

        query(MyClass).options(
                    undefer("attribute_one"),
                    undefer("attribute_two"))

    A class bound descriptor is also accepted::

        query(MyClass).options(
                    undefer(MyClass.attribute_one),
                    undefer(MyClass.attribute_two))

    A "path" can be specified onto a related or collection object using a
    dotted name. The :func:`.orm.undefer` option will be applied to that
    object when loaded::

        query(MyClass).options(
                    undefer("related.attribute_one"),
                    undefer("related.attribute_two"))

    To specify a path via class, send multiple arguments::

        query(MyClass).options(
                    undefer(MyClass.related, MyOtherClass.attribute_one),
                    undefer(MyClass.related, MyOtherClass.attribute_two))

    See also:

    :func:`.orm.undefer_group` as a means to "undefer" a group
    of attributes at once.

    :ref:`deferred`

    :param \*key: A key representing an individual path. Multiple entries
     are accepted to allow a multiple-token path for a single target, not
     multiple targets.

    """
    return strategies.DeferredOption(key, defer=False)
def _subnet_find(context, limit, sorts, marker, page_reverse, fields,
                 defaults=None, provider_query=False, **filters):
    """Build and paginate a ``Subnet`` query from filters and default ids.

    :param context: request context providing ``context.session``.
    :param limit: forwarded to ``paginate_query``.
    :param sorts: forwarded to ``paginate_query``.
    :param marker: forwarded to ``paginate_query``.
    :param page_reverse: not used by this function.
    :param fields: not used by this function.
    :param defaults: optional list of subnet ids. If it contains the
        ``INVERT_DEFAULTS`` marker, the marker is removed from the list
        (in place) and the id filter is negated.
    :param provider_query: when true, the "defaults OR filters" branch is
        skipped so that only the defaults filter applies.
    :param filters: model filters; the keys ``join_dns``, ``join_routes``
        and ``join_pool`` additionally switch on eager loading options.
    :returns: the result of ``paginate_query``.
    """
    query = context.session.query(models.Subnet)
    model_filters = _model_query(context, models.Subnet, filters, query)

    if defaults:
        invert_defaults = False
        if INVERT_DEFAULTS in defaults:
            invert_defaults = True
            # Remove the marker itself rather than whatever sits at index 0:
            # the membership test above does not guarantee its position.
            # Still mutates the caller's list, as before.
            defaults.remove(INVERT_DEFAULTS)

        # when 'invert_defaults' were the only entry in defaults,
        # defaults will be empty now. The next 4 lines optimize
        # performance by avoiding running the in_ filter on an empty set:
        # like so: models.Subnet.id.in_([])
        if defaults:
            subnet_filter = models.Subnet.id.in_(defaults)
        else:
            # if defaults is an empty list, just create a False
            # BinaryExpression
            subnet_filter = models.Subnet.id != models.Subnet.id

        if filters and invert_defaults:
            query = query.filter(and_(not_(subnet_filter),
                                      and_(*model_filters)))
        elif not provider_query and filters and not invert_defaults:
            query = query.filter(or_(subnet_filter, and_(*model_filters)))
        elif not invert_defaults:
            query = query.filter(subnet_filter)
        # NOTE(review): with invert_defaults set and no filters, no filter
        # is applied at all -- unchanged from the original; confirm intent.
    else:
        query = query.filter(*model_filters)

    if "join_dns" in filters:
        query = query.options(orm.joinedload(models.Subnet.dns_nameservers))

    if "join_routes" in filters:
        query = query.options(orm.joinedload(models.Subnet.routes))

    if "join_pool" in filters:
        query = query.options(orm.undefer('_allocation_pool_cache'))

    return paginate_query(query, models.Subnet, limit, sorts, marker)
def update_fn(cls, method_name, obj_id_list, shortcut_data=None, index=1):
    """Run ``method_name`` on every ``cls`` row in ``obj_id_list``, then commit.

    All columns are un-deferred when loading the rows. Progress is printed
    before and after each method call. ``print`` is called in the
    single-argument parenthesised form, which behaves the same on Python 2
    and is valid syntax on Python 3.

    :param cls: mapped class to query; must expose an ``id`` column.
    :param method_name: name of the method to call on each loaded object.
    :param obj_id_list: ids selecting the rows to process.
    :param shortcut_data: if truthy, passed as the only argument to the
        method; otherwise the method is called without arguments.
    :param index: multiplied by the number of loaded rows to offset the
        counter shown in the progress output.
    :returns: always ``None``.
    """
    # we are in a fork! dispose of our engine.
    # will get a new one automatically
    db.engine.dispose()

    start = time()

    q = db.session.query(cls).options(orm.undefer('*')).filter(
        cls.id.in_(obj_id_list))
    obj_rows = q.all()
    num_obj_rows = len(obj_rows)
    print("{repr}.{method_name}() got {num_obj_rows} objects in {elapsed}sec".format(
        repr=cls.__name__,
        method_name=method_name,
        num_obj_rows=num_obj_rows,
        elapsed=elapsed(start)
    ))

    for count, obj in enumerate(obj_rows):
        start_time = time()

        if obj is None:
            # NOTE(review): this early exit skips the commit and the
            # session cleanup below -- unchanged from the original.
            return None

        method_to_run = getattr(obj, method_name)

        print(u"\n***\n{count}: starting {repr}.{method_name}() method".format(
            count=count + (num_obj_rows*index),
            repr=obj,
            method_name=method_name
        ))

        if shortcut_data:
            method_to_run(shortcut_data)
        else:
            method_to_run()

        print(u"finished {repr}.{method_name}(). took {elapsed}sec".format(
            repr=obj,
            method_name=method_name,
            elapsed=elapsed(start_time, 4)
        ))

    commit_success = safe_commit(db)
    if not commit_success:
        print(u"COMMIT fail")
    db.session.remove()  # close connection nicely
    return None  # important for if we use this on RQ