我们从Python开源项目中,提取了以下9个代码示例,用于说明如何使用sqlalchemy.sql.or_()。
def get_region_by_id_or_name(self, region_id_or_name): logger.debug("Get region by id or name: {}".format(region_id_or_name)) try: session = self._engine_facade.get_session() with session.begin(): record = session.query(Region) record = record.filter(or_(Region.region_id == region_id_or_name, Region.name == region_id_or_name)) if record.first(): return record.first().to_wsme() return None except Exception as exp: logger.exception("DB error filtering by id/name") raise
def get_relationships(self, with_package=None, type=None, active=True, direction='both'): '''Returns relationships this package has. Keeps stored type/ordering (not from pov of self).''' assert direction in ('both', 'forward', 'reverse') if with_package: assert isinstance(with_package, Package) from package_relationship import PackageRelationship forward_filters = [PackageRelationship.subject==self] reverse_filters = [PackageRelationship.object==self] if with_package: forward_filters.append(PackageRelationship.object==with_package) reverse_filters.append(PackageRelationship.subject==with_package) if active: forward_filters.append(PackageRelationship.state==core.State.ACTIVE) reverse_filters.append(PackageRelationship.state==core.State.ACTIVE) if type: forward_filters.append(PackageRelationship.type==type) reverse_type = PackageRelationship.reverse_type(type) reverse_filters.append(PackageRelationship.type==reverse_type) q = meta.Session.query(PackageRelationship) if direction == 'both': q = q.filter(or_( and_(*forward_filters), and_(*reverse_filters), )) elif direction == 'forward': q = q.filter(and_(*forward_filters)) elif direction == 'reverse': q = q.filter(and_(*reverse_filters)) return q.all()
def in_grid(cls, grid): # check if a point is within the boundaries of the grid return or_(and_(cls.lon_min > grid.lon_min, cls.lon_min < grid.lon_max, cls.lat_min > grid.lat_min, cls.lat_min < grid.lat_max), and_(cls.lon_min > grid.lon_min, cls.lon_min < grid.lon_max, cls.lat_max > grid.lat_min, cls.lat_max < grid.lat_max), and_(cls.lon_max > grid.lon_min, cls.lon_max < grid.lon_max, cls.lat_min > grid.lat_min, cls.lat_min < grid.lat_max), and_(cls.lon_max > grid.lon_min, cls.lon_max < grid.lon_max, cls.lat_max > grid.lat_min, cls.lat_max < grid.lat_max), and_(cls.lon_min < grid.lon_min, cls.lon_max > grid.lon_min, cls.lat_min < grid.lat_min, cls.lat_max > grid.lat_min), and_(cls.lon_min < grid.lon_min, cls.lon_max > grid.lon_min, cls.lat_min < grid.lat_max, cls.lat_max > grid.lat_max), and_(cls.lon_min < grid.lon_max, cls.lon_max > grid.lon_max, cls.lat_min < grid.lat_min, cls.lat_max > grid.lat_min), and_(cls.lon_min < grid.lon_max, cls.lon_max > grid.lon_max, cls.lat_min < grid.lat_max, cls.lat_max > grid.lat_max))
def text_filter(query, value, table): pairs = ((n, c) for n, c in table.c.items() if isinstance(c.type, sa.sql.sqltypes.String)) sub_queries = [] for name, column in pairs: do_compare = op("like", column) sub_queries.append(do_compare(column, value)) query = query.where(or_(*sub_queries)) return query # TODO: validate that value supplied in filter has same type as in table # TODO: use functional style to create query
def get_user_filters(user): clauses=[] sfq = db.session.query(SampleFilter) clauses.append(SampleFilter.user_id==user.user_id) if not user.is_admin: clauses.append(SampleFilter.is_public==True) sfq.filter(or_(*clauses)) sfs = sfq.all() data=[{'name':x.sample_filter_name,'set':x.sample_filter_tag, 'id':x.sample_filter_id, 'filters':json.loads(x.sample_filter_data)} for x in sfs] return data
def _get_comparison(self, values, op): columns = self.__clause_element__().clauses if op in strict_op_map: stricter_op = strict_op_map[op] else: stricter_op = op return sql.or_(stricter_op(columns[0], values[0]), sql.and_(columns[0] == values[0], op(columns[1], values[1])))
def get_flavor_by_id_or_name(self, id_or_name): try: flavor = self.session.query(Flavor).filter(or_(Flavor.id == id_or_name, Flavor.name == id_or_name)) return flavor.first() except Exception as exception: message = "Failed to get_flavor_by_id_or_name: id or name: {0}".format(id_or_name) LOG.log_exception(message, exception) raise
def bookmark_list(): ''' Returns a list of bookmarks ''' search_form = SearchForm(request.args) # Create the base query # After this query is created, we keep iterating over it until we reach the desired level of filtering query = Bookmark.query.filter_by(user=current_user.id, deleted=False).order_by(Bookmark.added_on.desc()) # Get list of tags we'll be filtering by tag_args = request.values.getlist('tags') if len(tag_args) == 0: tag_args = None else: for tag in tag_args: # Check is any of the tags for the bookmark match up with tag query = query.filter(Bookmark.tags.any(tag)) # This means that the search form has been used if search_form.query.data is not None: # Search query type can be either basic, full text, or url # This is basic search, which searches in the bookmark titles and descriptions if search_form.parameter.data == 'basic': query = search(query, search_form.query.data, vector=Bookmark.search_vector) # XXX is this safe? user_count = Bookmark.query.filter_by(user=current_user.id, deleted=False).count() # Postgres full text search seems to fail when using non-ASCII characters # When the failure happens, all the bookmarks are returned instead # We check if this has happened, and if it has, we fall back to non-indexed search instead if query.count() == user_count: query = query.filter(or_(Bookmark.title.contains(search_form.query.data), Bookmark.description.contains(search_form.query.data))) elif search_form.parameter.data == 'ft': # We will search over the entire contents of the page here query = search(query, search_form.query.data, vector=Bookmark.fulltext_vector) user_count = Bookmark.query.filter_by(user=current_user.id, deleted=False).count() if query.count() == user_count: query = query.filter(Bookmark.full_text.contains(search_form.query.data)) # URL search lets you filter by domains or other parts of the url elif search_form.parameter.data == 'url': query = query.filter(Bookmark.main_url.contains(search_form.query.data)) else: pass # Context view takes you to the page the bookmark with a specific id is present on # Here the id is used to know which bookmark should be highlighted try: context_id = request.args['bid'] except KeyError: context_id = 0 # Pagination, with defaulting to the first page page = request.args.get('page', 1, type=int) # Users are allowed to specify how many bookmarks they want per page bookmarks_per_page = User.query.get(current_user.id).bookmarks_per_page # Paginate the results of our query pagination = query.paginate(page, per_page=bookmarks_per_page, error_out=False) delete_form = DeleteForm() return render_template("manager/bookmark_list.html", pagination=pagination, search_form=search_form, delete_form=delete_form, context_id=context_id, tag_args=tag_args)