我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.schema.Column()。
def create_lazy_clause(table, primaryjoin, secondaryjoin, foreignkey): binds = {} def visit_binary(binary): circular = isinstance(binary.left, schema.Column) and isinstance(binary.right, schema.Column) and binary.left.table is binary.right.table if isinstance(binary.left, schema.Column) and ((not circular and binary.left.table is table) or (circular and binary.right is foreignkey)): binary.left = binds.setdefault(binary.left, sql.BindParamClause(binary.right.table.name + "_" + binary.right.name, None, shortname = binary.left.name)) binary.swap() if isinstance(binary.right, schema.Column) and ((not circular and binary.right.table is table) or (circular and binary.left is foreignkey)): binary.right = binds.setdefault(binary.right, sql.BindParamClause(binary.left.table.name + "_" + binary.left.name, None, shortname = binary.right.name)) if secondaryjoin is not None: lazywhere = sql.and_(primaryjoin, secondaryjoin) else: lazywhere = primaryjoin lazywhere = lazywhere.copy_container() li = BinaryVisitor(visit_binary) lazywhere.accept_visitor(li) return (lazywhere, binds)
def insert(table, values = None, **kwargs): """returns an INSERT clause element. This can also be called from a table directly via the table's insert() method. 'table' is the table to be inserted into. 'values' is a dictionary which specifies the column specifications of the INSERT, and is optional. If left as None, the column specifications are determined from the bind parameters used during the compile phase of the INSERT statement. If the bind parameters also are None during the compile phase, then the column specifications will be generated from the full list of table columns. If both 'values' and compile-time bind parameters are present, the compile-time bind parameters override the information specified within 'values' on a per-key basis. The keys within 'values' can be either Column objects or their string identifiers. Each key may reference one of: a literal data value (i.e. string, number, etc.), a Column object, or a SELECT statement. If a SELECT statement is specified which references this INSERT statement's table, the statement will be correlated against the INSERT statement. """ return Insert(table, values, **kwargs)
def __init__(self, engine, statement, parameters): """constructs a new Compiled object. engine - SQLEngine to compile against statement - ClauseElement to be compiled parameters - optional dictionary indicating a set of bind parameters specified with this Compiled object. These parameters are the "default" values corresponding to the ClauseElement's BindParamClauses when the Compiled is executed. In the case of an INSERT or UPDATE statement, these parameters will also result in the creation of new BindParamClause objects for each key and will also affect the generated column list in an INSERT statement and the SET clauses of an UPDATE statement. The keys of the parameter dictionary can either be the string names of columns or actual sqlalchemy.schema.Column objects.""" self.engine = engine self.parameters = parameters self.statement = statement
def url(self): return '{}/{}/{}'.format(osm_api_base, self.osm_type, self.osm_id) # class ItemCandidateTag(Base): # __tablename__ = 'item_candidate_tag' # __table_args__ = ( # ForeignKeyConstraint(['item_id', 'osm_id', 'osm_type'], # [ItemCandidate.item_id, # ItemCandidate.osm_id, # ItemCandidate.osm_type]), # ) # # item_id = Column(Integer, primary_key=True) # osm_id = Column(BigInteger, primary_key=True) # osm_type = Column(osm_type_enum, primary_key=True) # k = Column(String, primary_key=True) # v = Column(String, primary_key=True) # # item_candidate = relationship(ItemCandidate, # backref=backref('tag_table', lazy='dynamic'))
def append_column(self, column): """Append a :class:`~.schema.Column` to this :class:`~.schema.Table`. The "key" of the newly added :class:`~.schema.Column`, i.e. the value of its ``.key`` attribute, will then be available in the ``.c`` collection of this :class:`~.schema.Table`, and the column definition will be included in any CREATE TABLE, SELECT, UPDATE, etc. statements generated from this :class:`~.schema.Table` construct. Note that this does **not** change the definition of the table as it exists within any underlying database, assuming that table has already been created in the database. Relational databases support the addition of columns to existing tables using the SQL ALTER command, which would need to be emitted for an already-existing table that doesn't contain the newly added column. """ column._set_parent_with_dispatch(self)
def __repr__(self): kwarg = [] if self.key != self.name: kwarg.append('key') if self.primary_key: kwarg.append('primary_key') if not self.nullable: kwarg.append('nullable') if self.onupdate: kwarg.append('onupdate') if self.default: kwarg.append('default') if self.server_default: kwarg.append('server_default') return "Column(%s)" % ', '.join( [repr(self.name)] + [repr(self.type)] + [repr(x) for x in self.foreign_keys if x is not None] + [repr(x) for x in self.constraints] + [(self.table is not None and "table=<%s>" % self.table.description or "table=None")] + ["%s=%s" % (k, repr(getattr(self, k))) for k in kwarg])
def matchIndex(self, name=None): # Get index of correct column incols = [x for x in self.cols if x in self.collist] if not any(incols): return None elif len(incols) == 1: idx = self.cols.index(incols[0]) else: if not name: print('Multiple columns found. Column name must be specified!') return None elif name in self.collist: idx = self.cols.index(name) else: return None return idx
def _get_index_rendered_expressions(idx, autogen_context): if compat.sqla_08: return [repr(_ident(getattr(exp, "name", None))) if isinstance(exp, sa_schema.Column) else _render_potential_expr(exp, autogen_context) for exp in idx.expressions] else: return [ repr(_ident(getattr(col, "name", None))) for col in idx.columns]
def _find_columns(clause): """locate Column objects within the given expression.""" cols = set() traverse(clause, {}, {'column': cols.add}) return cols
def _textual_index_column(table, text_): """a workaround for the Index construct's severe lack of flexibility""" if isinstance(text_, compat.string_types): c = Column(text_, sqltypes.NULLTYPE) table.append_column(c) return c elif isinstance(text_, TextClause): return _textual_index_element(table, text_) else: raise ValueError("String or text() construct expected")
def __init__(self, table, text): self.table = table self.text = text self.key = text.text self.fake_column = schema.Column(self.text.text, sqltypes.NULLTYPE) table.append_column(self.fake_column)
def primary_key_constraint(self, name, table_name, cols, schema=None): m = self.metadata() columns = [sa_schema.Column(n, NULLTYPE) for n in cols] t = sa_schema.Table( table_name, m, *columns, schema=schema) p = sa_schema.PrimaryKeyConstraint( *[t.c[n] for n in cols], name=name) t.append_constraint(p) return p
def unique_constraint(self, name, source, local_cols, schema=None, **kw): t = sa_schema.Table( source, self.metadata(), *[sa_schema.Column(n, NULLTYPE) for n in local_cols], schema=schema) kw['name'] = name uq = sa_schema.UniqueConstraint(*[t.c[n] for n in local_cols], **kw) # TODO: need event tests to ensure the event # is fired off here t.append_constraint(uq) return uq
def check_constraint(self, name, source, condition, schema=None, **kw): t = sa_schema.Table(source, self.metadata(), sa_schema.Column('x', Integer), schema=schema) ck = sa_schema.CheckConstraint(condition, name=name, **kw) t.append_constraint(ck) return ck
def column(self, name, type_, **kw): return sa_schema.Column(name, type_, **kw)
def _ensure_table_for_fk(self, metadata, fk): """create a placeholder Table object for the referent of a ForeignKey. """ if isinstance(fk._colspec, string_types): table_key, cname = fk._colspec.rsplit('.', 1) sname, tname = self._parse_table_key(table_key) if table_key not in metadata.tables: rel_t = sa_schema.Table(tname, metadata, schema=sname) else: rel_t = metadata.tables[table_key] if cname not in rel_t.c: rel_t.append_column(sa_schema.Column(cname, NULLTYPE))
def columnimpl(self, column): """returns a new sql.ColumnImpl object to correspond to the given Column object. A ColumnImpl provides SQL statement builder operations on a Column metadata object, and a subclass of this object may be provided by a SQLEngine subclass to provide database-specific behavior.""" return sql.ColumnImpl(column)
def _get_col(self, row, key): if isinstance(key, schema.Column) or isinstance(key, sql.ColumnElement): try: rec = self.props[key._label.lower()] except KeyError: try: rec = self.props[key.key.lower()] except KeyError: rec = self.props[key.name.lower()] elif isinstance(key, str): rec = self.props[key.lower()] else: rec = self.props[key] return rec[0].convert_result_value(row[rec[1]], self.engine)
def reflecttable(self, table): # to use information_schema: #ischema.reflecttable(self, table, ischema_names, use_mysql=True) tabletype, foreignkeyD = self.moretableinfo(table=table) table._impl.mysql_engine = tabletype c = self.execute("describe " + table.name, {}) while True: row = c.fetchone() if row is None: break #print "row! " + repr(row) (name, type, nullable, primary_key, default) = (row[0], row[1], row[2] == 'YES', row[3] == 'PRI', row[4]) match = re.match(r'(\w+)(\(.*?\))?', type) coltype = match.group(1) args = match.group(2) #print "coltype: " + repr(coltype) + " args: " + repr(args) coltype = ischema_names.get(coltype, MSString) if args is not None: args = re.findall(r'(\d+)', args) #print "args! " +repr(args) coltype = coltype(*[int(a) for a in args]) arglist = [] fkey = foreignkeyD.get(name) if fkey is not None: arglist.append(schema.ForeignKey(fkey)) table.append_item(schema.Column(name, coltype, *arglist, **dict(primary_key=primary_key, nullable=nullable, default=default )))
def _find_dependent(self): """searches through the primary join condition to determine which side has the primary key and which has the foreign key - from this we return the "foreign key" for this property which helps determine one-to-many/many-to-one.""" # set as a reference to allow assignment from inside a first-class function dependent = [None] def foo(binary): if binary.operator != '=': return if isinstance(binary.left, schema.Column) and binary.left.primary_key: if dependent[0] is binary.left.table: raise "bidirectional dependency not supported...specify foreignkey" dependent[0] = binary.right.table self.foreignkey= binary.right elif isinstance(binary.right, schema.Column) and binary.right.primary_key: if dependent[0] is binary.right.table: raise "bidirectional dependency not supported...specify foreignkey" dependent[0] = binary.left.table self.foreignkey = binary.left visitor = BinaryVisitor(foo) self.primaryjoin.accept_visitor(visitor) if dependent[0] is None: raise "cant determine primary foreign key in the join relationship....specify foreignkey=<column> or foreignkey=[<columns>]" else: self.foreigntable = dependent[0]
def visit_compound(self, compound): for i in range(0, len(compound.clauses)): if isinstance(compound.clauses[i], schema.Column) and self.tables.has_key(compound.clauses[i].table): compound.clauses[i] = self.get_alias(compound.clauses[i].table)._get_col_by_original(compound.clauses[i]) self.match = True
def visit_binary(self, binary): if isinstance(binary.left, schema.Column) and self.tables.has_key(binary.left.table): binary.left = self.get_alias(binary.left.table)._get_col_by_original(binary.left) self.match = True if isinstance(binary.right, schema.Column) and self.tables.has_key(binary.right.table): binary.right = self.get_alias(binary.right.table)._get_col_by_original(binary.right) self.match = True
def bindparam(key, value = None, type=None): """creates a bind parameter clause with the given key. An optional default value can be specified by the value parameter, and the optional type parameter is a sqlalchemy.types.TypeEngine object which indicates bind-parameter and result-set translation for this bind parameter.""" if isinstance(key, schema.Column): return BindParamClause(key.name, value, type=key.type) else: return BindParamClause(key, value, type=type)
def is_column(col): return isinstance(col, schema.Column) or isinstance(col, ColumnElement)
def is_selectable(self): """returns True if this ClauseElement is Selectable, i.e. it contains a list of Column objects and can be used as the target of a select statement.""" return False
def _compare_self(self): """allows ColumnImpl to return its Column object for usage in ClauseElements, all others to just return self""" return self
def _get_col_by_original(self, column): """given a column which is a schema.Column object attached to a schema.Table object (i.e. an "original" column), return the Column object from this Selectable which corresponds to that original Column, or None if this Selectable does not contain the column.""" return self.original_columns.get(column.original, None)
def compare(self, other): """compares this ColumnImpl's column to the other given Column""" return self.column is other
def _compare_self(self): """allows ColumnImpl to return its Column object for usage in ClauseElements, all others to just return self""" return self.column
def _oid_col(self): # OID remains a little hackish so far if not hasattr(self, '_oid_column'): if self.table.engine.oid_column_name() is not None: self._oid_column = schema.Column(self.table.engine.oid_column_name(), sqltypes.Integer, hidden=True) self._oid_column._set_parent(self.table) else: self._oid_column = None return self._oid_column
def _primary_key_constraint(self, name, table_name, cols, schema=None): m = sa_schema.MetaData() columns = [sa_schema.Column(n, NULLTYPE) for n in cols] t1 = sa_schema.Table(table_name, m, *columns, schema=schema) p = sa_schema.PrimaryKeyConstraint(*columns, name=name) t1.append_constraint(p) return p
def _foreign_key_constraint(self, name, source, referent, local_cols, remote_cols, onupdate=None, ondelete=None, deferrable=None, source_schema=None, referent_schema=None): m = sa_schema.MetaData() if source == referent: t1_cols = local_cols + remote_cols else: t1_cols = local_cols sa_schema.Table(referent, m, *[sa_schema.Column(n, NULLTYPE) for n in remote_cols], schema=referent_schema) t1 = sa_schema.Table(source, m, *[sa_schema.Column(n, NULLTYPE) for n in t1_cols], schema=source_schema) tname = "%s.%s" % (referent_schema, referent) if referent_schema \ else referent f = sa_schema.ForeignKeyConstraint(local_cols, ["%s.%s" % (tname, n) for n in remote_cols], name=name, onupdate=onupdate, ondelete=ondelete, deferrable=deferrable ) t1.append_constraint(f) return f
def _unique_constraint(self, name, source, local_cols, schema=None, **kw): t = sa_schema.Table(source, sa_schema.MetaData(), *[sa_schema.Column(n, NULLTYPE) for n in local_cols], schema=schema) kw['name'] = name uq = sa_schema.UniqueConstraint(*[t.c[n] for n in local_cols], **kw) # TODO: need event tests to ensure the event # is fired off here t.append_constraint(uq) return uq
def _check_constraint(self, name, source, condition, schema=None, **kw): t = sa_schema.Table(source, sa_schema.MetaData(), sa_schema.Column('x', Integer), schema=schema) ck = sa_schema.CheckConstraint(condition, name=name, **kw) t.append_constraint(ck) return ck
def _column(self, name, type_, **kw): return sa_schema.Column(name, type_, **kw)
def _compare_type(schema, tname, cname, conn_col, metadata_col, diffs, autogen_context): conn_type = conn_col.type metadata_type = metadata_col.type if conn_type._type_affinity is sqltypes.NullType: log.info("Couldn't determine database type " "for column '%s.%s'", tname, cname) return if metadata_type._type_affinity is sqltypes.NullType: log.info("Column '%s.%s' has no type within " "the model; can't compare", tname, cname) return isdiff = autogen_context['context']._compare_type(conn_col, metadata_col) if isdiff: diffs.append( ("modify_type", schema, tname, cname, { "existing_nullable": conn_col.nullable, "existing_server_default": conn_col.server_default, }, conn_type, metadata_type), ) log.info("Detected type change from %r to %r on '%s.%s'", conn_type, metadata_type, tname, cname )
def _get_index_rendered_expressions(idx, autogen_context): if util.sqla_08: return [repr(_ident(getattr(exp, "name", None))) if isinstance(exp, sa_schema.Column) else _render_potential_expr(exp, autogen_context) for exp in idx.expressions] else: return [ repr(_ident(getattr(col, "name", None))) for col in idx.columns]