我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 sqlalchemy.select()。
def test_write_version(self):
    # Start from an environment whose version table has been emptied.
    env = TradingEnvironment(load=noop_load)
    metadata = sa.MetaData(bind=env.engine)
    version_table = _version_table_schema(metadata)
    version_table.delete().execute()

    def stored_version():
        # Read back whatever version value is currently in the table.
        return sa.select((version_table.c.version,)).scalar()

    # Nothing has been written yet, so no version is present.
    self.assertIsNone(stored_version())

    # A table without version info is considered v0, so checking
    # against -2 has to fail.
    with self.assertRaises(AssetDBVersionError):
        check_version_info(version_table, -2)

    # Once -2 has been written, the same check must pass silently.
    write_version_info(version_table, -2)
    check_version_info(version_table, -2)

    # The table now holds exactly the version that was written.
    self.assertEqual(stored_version(), -2)

    # Writing a second version on top of the first one is rejected.
    with self.assertRaises(sa.exc.IntegrityError):
        write_version_info(version_table, -3)
def test_last_inserted_id(self):
    # Insert one row, then confirm the primary key reported by the
    # insert result matches the id actually stored in the table.
    autoinc_table = self.tables.autoinc_pk
    result = config.db.execute(autoinc_table.insert(), data="some data")
    stored_pk = config.db.scalar(select([autoinc_table.c.id]))
    eq_(result.inserted_primary_key, [stored_pk])

# failed on pypy1.9 but seems to be OK on pypy 2.1
# @exclusions.fails_if(lambda: util.pypy,
#                      "lastrowid not maintained after "
#                      "connection close")
def _offset_or_limit_clause_asint(clause, attrname): """Convert the "offset or limit" clause of a select construct to an integer. This is only possible if the value is stored as a simple bound parameter. Otherwise, a compilation error is raised. """ if clause is None: return None try: value = clause._limit_offset_value except AttributeError: raise exc.CompileError( "This SELECT structure does not use a simple " "integer value for %s" % attrname) else: return util.asint(value)
def columns(self):
    """Return the immutable, name-based column collection.

    The collection is built lazily: if ``_columns`` is not yet stored
    on the instance, the collections are initialised and populated
    first.  It is the entry point for building SQL expressions from
    the columns of a table or other selectable::

        select([mytable]).where(mytable.c.somecolumn == 5)

    """
    already_built = '_columns' in self.__dict__
    if not already_built:
        self._init_collections()
        self._populate_column_collection()

    return self._columns.as_immutable()
def select(self, whereclause=None, **kwargs):
    r"""Build a :class:`.Select` that selects from this :class:`.Join`.

    For a :class:`.Join` object ``j`` the long-hand equivalent is::

        from sqlalchemy import select
        j = select([j.left, j.right], **kw).\
                    where(whereclause).\
                    select_from(j)

    :param whereclause: the WHERE criterion passed on to the
     :func:`select()` construct.

    :param \**kwargs: any further keyword arguments are forwarded
     unchanged to the underlying :func:`select()` construct.

    """
    join_sources = [self]
    return Select(
        [self.left, self.right],
        whereclause,
        from_obj=join_sources,
        **kwargs
    )
def limit(self, limit):
    """Apply the given LIMIT criterion.

    The value is numeric and usually renders as a ``LIMIT``
    expression in the resulting select.  Backends without ``LIMIT``
    support will attempt to provide similar functionality.

    .. versionchanged:: 1.0.0 - :meth:`.Select.limit` can now
       accept arbitrary SQL expressions as well as integer values.

    :param limit: an integer LIMIT parameter, or a SQL expression
     that provides an integer result.

    """
    # NOTE(review): the visible body only stores the clause on ``self``;
    # presumably a decorator outside this view produces the new
    # selectable - confirm.
    limit_clause = _offset_or_limit_clause(limit)
    self._limit_clause = limit_clause
def offset(self, offset):
    """Apply the given OFFSET criterion.

    The value is numeric and usually renders as an ``OFFSET``
    expression in the resulting select.  Backends without ``OFFSET``
    support will attempt to provide similar functionality.

    .. versionchanged:: 1.0.0 - :meth:`.Select.offset` can now
       accept arbitrary SQL expressions as well as integer values.

    :param offset: an integer OFFSET parameter, or a SQL expression
     that provides an integer result.

    """
    # NOTE(review): the visible body only stores the clause on ``self``;
    # presumably a decorator outside this view produces the new
    # selectable - confirm.
    offset_clause = _offset_or_limit_clause(offset)
    self._offset_clause = offset_clause
def _create_union(cls, *selects, **kwargs):
    r"""Combine several selectables into a ``UNION``.

    The result is a :class:`.CompoundSelect` instance.

    A similar :func:`union()` method is available on all
    :class:`.FromClause` subclasses.

    \*selects
      a list of :class:`.Select` instances.

    \**kwargs
       available keyword arguments are the same as those of
       :func:`select`.

    """
    union_keyword = CompoundSelect.UNION
    return CompoundSelect(union_keyword, *selects, **kwargs)
def _create_union_all(cls, *selects, **kwargs):
    r"""Combine several selectables into a ``UNION ALL``.

    The result is a :class:`.CompoundSelect` instance.

    A similar :func:`union_all()` method is available on all
    :class:`.FromClause` subclasses.

    \*selects
      a list of :class:`.Select` instances.

    \**kwargs
      available keyword arguments are the same as those of
      :func:`select`.

    """
    union_all_keyword = CompoundSelect.UNION_ALL
    return CompoundSelect(union_all_keyword, *selects, **kwargs)
def _populate_column_collection(self): for cols in zip(*[s.c._all_columns for s in self.selects]): # this is a slightly hacky thing - the union exports a # column that resembles just that of the *first* selectable. # to get at a "composite" column, particularly foreign keys, # you have to dig through the proxies collection which we # generate below. We may want to improve upon this, such as # perhaps _make_proxy can accept a list of other columns # that are "shared" - schema.column can then copy all the # ForeignKeys in. this would allow the union() to have all # those fks too. proxy = cols[0]._make_proxy( self, name=cols[0]._label if self.use_labels else None, key=cols[0]._key_label if self.use_labels else None) # hand-construct the "_proxies" collection to include all # derived columns place a 'weight' annotation corresponding # to how low in the list of select()s the column occurs, so # that the corresponding_column() operation can resolve # conflicts proxy._proxies = [ c._annotate({'weight': i + 1}) for (i, c) in enumerate(cols)]
def _froms(self):
    # Deliberately not cached: there are enough edge cases, especially
    # now that declarative encourages building SQL expressions without
    # tables present, that the list is simply regenerated every time.
    froms = []
    seen = set()
    translate = self._from_cloned

    column_froms = _from_objects(*self._raw_columns)
    if self._whereclause is not None:
        where_froms = _from_objects(self._whereclause)
    else:
        where_froms = ()
    candidates = itertools.chain(column_froms, where_froms, self._from_obj)

    for candidate in candidates:
        if candidate is self:
            raise exc.InvalidRequestError(
                "select() construct refers to itself as a FROM")

        if translate and candidate in translate:
            candidate = translate[candidate]

        # Skip anything whose cloned set overlaps one already collected.
        if seen.intersection(candidate._cloned_set):
            continue
        froms.append(candidate)
        seen.update(candidate._cloned_set)

    return froms
def distinct(self, *expr):
    r"""Apply DISTINCT to the columns clause of this select().

    :param \*expr: optional column expressions.  When present,
     the Postgresql dialect will render a ``DISTINCT ON (<expressions>)``
     construct.

    """
    # NOTE(review): the visible body only updates ``self._distinct``;
    # presumably a decorator outside this view produces the new
    # select() - confirm.
    if not expr:
        self._distinct = True
        return

    label_refs = [_literal_as_label_reference(e) for e in expr]
    current = self._distinct
    if isinstance(current, list):
        # Expressions from an earlier call are kept and extended.
        self._distinct = current + label_refs
    else:
        self._distinct = label_refs
def append_column(self, column):
    """Add a column expression to this select()'s columns clause.

    This is an **in-place** mutation method; the
    :meth:`~.Select.column` method is preferred, as it provides standard
    :term:`method chaining`.

    """
    self._reset_exported()

    coerced = _interpret_as_column_or_from(column)
    if isinstance(coerced, ScalarSelect):
        # Scalar selects are grouped against the comma operator before
        # being placed in the columns list.
        coerced = coerced.self_group(against=operators.comma_op)

    # A new list is assigned rather than appending to the existing one.
    self._raw_columns = self._raw_columns + [coerced]
def load(self, name, flagtype):
    """
    Fetch the stored value of the flag called ``name``.

    The column read is ``value_<flagtype.value>``.  Within the same
    ``begin()`` block the flag's ``used`` column is set to the
    database's current time.
    """
    field = f"value_{flagtype.value}"
    flags = self.flags

    with self.connection.begin():
        value_query = sa.select([flags.c.get(field)]).where(
            flags.c.name == name
        )
        row = self.connection.execute(value_query).fetchone()

        touch_query = (
            flags.update()
            .where(flags.c.name == name)
            .values(used=sa.func.now())
        )
        self.connection.execute(touch_query)

    # NOTE(review): if no row matches ``name``, ``row`` is presumably
    # None and this subscript fails - confirm callers only pass
    # existing flags.
    return row[field]
def distinct(self, *expr):
    r"""Apply DISTINCT to the columns clause of this select().

    :param \*expr: optional column expressions.  When present,
     the Postgresql dialect will render a ``DISTINCT ON (<expressions>)``
     construct.

    """
    # NOTE(review): the visible body only updates ``self._distinct``;
    # presumably a decorator outside this view produces the new
    # select() - confirm.
    if not expr:
        self._distinct = True
        return

    text_clauses = [_literal_as_text(e) for e in expr]
    current = self._distinct
    if isinstance(current, list):
        # Expressions from an earlier call are kept and extended.
        self._distinct = current + text_clauses
    else:
        self._distinct = text_clauses
def check_version_info(version_table, expected_version):
    """
    Verify that the version stored in the version table is the expected one.

    Parameters
    ----------
    version_table : sa.Table
        The version table of the asset database
    expected_version : int
        The expected version of the asset database

    Raises
    ------
    AssetDBVersionError
        If the stored version differs from ``expected_version``.  A table
        holding no version is treated as version 0.
    """
    stored_version = sa.select((version_table.c.version,)).scalar()

    # A db without a version is considered v0
    found_version = 0 if stored_version is None else stored_version

    if found_version != expected_version:
        raise AssetDBVersionError(
            db_version=found_version,
            expected_version=expected_version,
        )
def lookup_asset_types(self, sids):
    """
    Look up the asset type of every sid in ``sids``.

    Cached answers are used where available; the remaining sids are
    queried from the asset router table in chunks, and every result
    (including "not found", stored as None) is written to the cache.

    Parameters
    ----------
    sids : list[int]

    Returns
    -------
    types : dict[sid -> str or None]
        Asset types for the provided sids.
    """
    found = {}
    missing = set()

    for sid in sids:
        try:
            cached_type = self._asset_type_cache[sid]
        except KeyError:
            missing.add(sid)
        else:
            found[sid] = cached_type

    # Everything came from the cache; no query is needed.
    if not missing:
        return found

    router_cols = self.asset_router.c

    for assets in self._group_into_chunks(missing):
        sid_filter = self.asset_router.c.sid.in_(map(int, assets))
        query = sa.select(
            (router_cols.sid, router_cols.asset_type)
        ).where(sid_filter)

        for sid, type_ in query.execute().fetchall():
            missing.remove(sid)
            found[sid] = type_
            self._asset_type_cache[sid] = type_

    # Whatever is still missing was not in the router table.
    for sid in missing:
        found[sid] = None
        self._asset_type_cache[sid] = None

    return found
def _select_assets_by_sid(asset_tbl, sids):
    # Select every column of ``asset_tbl`` for rows whose sid is among
    # ``sids`` (each sid is passed through int()).
    base_query = sa.select([asset_tbl])
    sid_filter = asset_tbl.c.sid.in_(map(int, sids))
    return base_query.where(sid_filter)
def _get_fuzzy_candidates(self, fuzzy_symbol):
    # Sids of equities with the given fuzzy symbol, ordered by start
    # date and then end date, both descending.
    equities_cols = self.equities.c
    query = sa.select((equities_cols.sid,))
    query = query.where(equities_cols.fuzzy_symbol == fuzzy_symbol)
    query = query.order_by(
        equities_cols.start_date.desc(),
        equities_cols.end_date.desc(),
    )
    return query.execute().fetchall()
def _get_fuzzy_candidates_in_range(self, fuzzy_symbol, ad_value):
    # Sids of equities with the given fuzzy symbol whose
    # [start_date, end_date] range contains ``ad_value``, ordered by
    # start date and then end date, both descending.
    equities_cols = self.equities.c
    criteria = sa.and_(
        equities_cols.fuzzy_symbol == fuzzy_symbol,
        equities_cols.start_date <= ad_value,
        equities_cols.end_date >= ad_value,
    )
    query = sa.select((equities_cols.sid,)).where(criteria)
    query = query.order_by(
        equities_cols.start_date.desc(),
        equities_cols.end_date.desc(),
    )
    return query.execute().fetchall()
def _get_split_candidates_in_range(self, company_symbol, share_class_symbol,
                                   ad_value):
    # Sids of equities matching both symbol parts whose
    # [start_date, end_date] range contains ``ad_value``, ordered by
    # start date and then end date, both descending.
    equities_cols = self.equities.c
    criteria = sa.and_(
        equities_cols.company_symbol == company_symbol,
        equities_cols.share_class_symbol == share_class_symbol,
        equities_cols.start_date <= ad_value,
        equities_cols.end_date >= ad_value,
    )
    query = sa.select((equities_cols.sid,)).where(criteria)
    query = query.order_by(
        equities_cols.start_date.desc(),
        equities_cols.end_date.desc(),
    )
    return query.execute().fetchall()
def _get_split_candidates(self, company_symbol, share_class_symbol):
    # Sids of equities matching both symbol parts, ordered by start
    # date and then end date, both descending.
    equities_cols = self.equities.c
    criteria = sa.and_(
        equities_cols.company_symbol == company_symbol,
        equities_cols.share_class_symbol == share_class_symbol,
    )
    query = sa.select((equities_cols.sid,)).where(criteria)
    query = query.order_by(
        equities_cols.start_date.desc(),
        equities_cols.end_date.desc(),
    )
    return query.execute().fetchall()
def _resolve_no_matching_candidates(self, company_symbol,
                                    share_class_symbol, ad_value):
    # Sids of equities matching both symbol parts that started on or
    # before ``ad_value``, ordered by end date descending.
    equities_cols = self.equities.c
    criteria = sa.and_(
        equities_cols.company_symbol == company_symbol,
        equities_cols.share_class_symbol == share_class_symbol,
        equities_cols.start_date <= ad_value,
    )
    query = sa.select((equities_cols.sid,)).where(criteria)
    query = query.order_by(equities_cols.end_date.desc())
    return query.execute().fetchall()
def _compute_asset_lifetimes(self):
    """
    Compute a recarray of asset lifetimes.

    Each record carries ``sid``, ``start`` and ``end`` as 64-bit
    integers.  Missing starts become 0 and missing ends become the
    maximum value of ``int``.
    """
    equities_cols = self.equities.c
    lifetime_rows = sa.select((
        equities_cols.sid,
        equities_cols.start_date,
        equities_cols.end_date,
    )).execute()

    # use doubles so we get NaNs
    buf = np.array(tuple(lifetime_rows), dtype='<f8')

    float_fields = [('sid', '<f8'), ('start', '<f8'), ('end', '<f8')]
    lifetimes = np.recarray(
        buf=buf,
        shape=(len(buf),),
        dtype=float_fields,
    )

    start_view = lifetimes.start
    end_view = lifetimes.end
    # convert missing starts to 0
    start_view[np.isnan(start_view)] = 0
    # convert missing end to INTMAX
    end_view[np.isnan(end_view)] = np.iinfo(int).max

    # Cast the results back down to int.
    int_fields = [('sid', '<i8'), ('start', '<i8'), ('end', '<i8')]
    return lifetimes.astype(int_fields)
def __init__(self, engine):
    # Parent initialisation first, then empty lookup tables, then all
    # equity rows are fetched once before the lookup tables are loaded.
    super(AssetFinderCachedEquities, self).__init__(engine)

    self.fuzzy_symbol_hashed_equities = dict()
    self.company_share_class_hashed_equities = dict()

    all_equities_query = sa.select(self.equities.c)
    self.hashed_equities = all_equities_query.execute().fetchall()

    self._load_hashed_equities()
def _assert_result(self, select, result):
    # Execute the statement and compare every fetched row with the
    # expected ``result``.
    fetched_rows = config.db.execute(select).fetchall()
    eq_(fetched_rows, result)
def test_plain(self):
    # Order by a plain labelled column.
    some_table = self.tables.some_table
    lx = some_table.c.x.label('lx')
    stmt = select([lx]).order_by(lx)
    expected = [(1, ), (2, ), (3, )]
    self._assert_result(stmt, expected)
def test_composed_int(self):
    # Order by a labelled sum of two columns.
    some_table = self.tables.some_table
    lx = (some_table.c.x + some_table.c.y).label('lx')
    stmt = select([lx]).order_by(lx)
    expected = [(3, ), (5, ), (7, )]
    self._assert_result(stmt, expected)
def test_composed_multiple(self):
    # Order by two labelled expressions, the second one descending.
    some_table = self.tables.some_table
    cols = some_table.c
    lx = (cols.x + cols.y).label('lx')
    ly = (func.lower(cols.q) + cols.p).label('ly')
    stmt = select([lx, ly]).order_by(lx, ly.desc())
    expected = [
        (3, util.u('q1p3')),
        (5, util.u('q2p2')),
        (7, util.u('q3p1')),
    ]
    self._assert_result(stmt, expected)
def test_plain_desc(self):
    # Order by a plain labelled column, descending.
    some_table = self.tables.some_table
    lx = some_table.c.x.label('lx')
    stmt = select([lx]).order_by(lx.desc())
    expected = [(3, ), (2, ), (1, )]
    self._assert_result(stmt, expected)
def test_group_by_composed(self):
    # Group and order by the same labelled sum, counting ids per group.
    some_table = self.tables.some_table
    expr = (some_table.c.x + some_table.c.y).label('lx')
    id_count = func.count(some_table.c.id)
    stmt = select([id_count, expr]).group_by(expr).order_by(expr)
    expected = [(1, 3), (1, 5), (1, 7)]
    self._assert_result(stmt, expected)