我们从Python开源项目中,提取了以下4个代码示例,用于说明如何使用sqlalchemy.schema.ForeignKey()。
def add_imports(self, collector): if self.table.columns: collector.add_import(Column) for column in self.table.columns: collector.add_import(column.type) if column.server_default: collector.add_literal_import('sqlalchemy', 'text') for constraint in sorted(self.table.constraints, key=_get_constraint_sort_key): if isinstance(constraint, ForeignKeyConstraint): if len(constraint.columns) > 1: collector.add_literal_import('sqlalchemy', 'ForeignKeyConstraint') else: collector.add_literal_import('sqlalchemy', 'ForeignKey') elif isinstance(constraint, UniqueConstraint): if len(constraint.columns) > 1: collector.add_literal_import('sqlalchemy', 'UniqueConstraint') elif not isinstance(constraint, PrimaryKeyConstraint): collector.add_import(constraint) for index in self.table.indexes: if len(index.columns) > 1: collector.add_import(index)
def render_constraint(constraint): def render_fk_options(*opts): opts = [repr(opt) for opt in opts] for attr in 'ondelete', 'onupdate', 'deferrable', 'initially', 'match': value = getattr(constraint, attr, None) if value: opts.append('{0}={1!r}'.format(attr, value)) return ', '.join(opts) if isinstance(constraint, ForeignKey): remote_column = '{0}.{1}'.format(constraint.column.table.fullname, constraint.column.name) return 'ForeignKey({0})'.format(render_fk_options(remote_column)) elif isinstance(constraint, ForeignKeyConstraint): local_columns = _get_column_names(constraint) remote_columns = ['{0}.{1}'.format(fk.column.table.fullname, fk.column.name) for fk in constraint.elements] return 'ForeignKeyConstraint({0})'.format(render_fk_options(local_columns, remote_columns)) elif isinstance(constraint, CheckConstraint): return 'CheckConstraint({0!r})'.format(_get_compiled_expression(constraint.sqltext)) elif isinstance(constraint, UniqueConstraint): columns = [repr(col.name) for col in constraint.columns] return 'UniqueConstraint({0})'.format(', '.join(columns))
def reflecttable(self, table): # to use information_schema: #ischema.reflecttable(self, table, ischema_names, use_mysql=True) tabletype, foreignkeyD = self.moretableinfo(table=table) table._impl.mysql_engine = tabletype c = self.execute("describe " + table.name, {}) while True: row = c.fetchone() if row is None: break #print "row! " + repr(row) (name, type, nullable, primary_key, default) = (row[0], row[1], row[2] == 'YES', row[3] == 'PRI', row[4]) match = re.match(r'(\w+)(\(.*?\))?', type) coltype = match.group(1) args = match.group(2) #print "coltype: " + repr(coltype) + " args: " + repr(args) coltype = ischema_names.get(coltype, MSString) if args is not None: args = re.findall(r'(\d+)', args) #print "args! " +repr(args) coltype = coltype(*[int(a) for a in args]) arglist = [] fkey = foreignkeyD.get(name) if fkey is not None: arglist.append(schema.ForeignKey(fkey)) table.append_item(schema.Column(name, coltype, *arglist, **dict(primary_key=primary_key, nullable=nullable, default=default )))
def reflecttable(self, table): c = self.execute("PRAGMA table_info(" + table.name + ")", {}) while True: row = c.fetchone() if row is None: break #print "row! " + repr(row) (name, type, nullable, primary_key) = (row[1], row[2].upper(), not row[3], row[5]) match = re.match(r'(\w+)(\(.*?\))?', type) coltype = match.group(1) args = match.group(2) #print "coltype: " + repr(coltype) + " args: " + repr(args) coltype = pragma_names.get(coltype, SLString) if args is not None: args = re.findall(r'(\d+)', args) #print "args! " +repr(args) coltype = coltype(*[int(a) for a in args]) table.append_item(schema.Column(name, coltype, primary_key = primary_key, nullable = nullable)) c = self.execute("PRAGMA foreign_key_list(" + table.name + ")", {}) while True: row = c.fetchone() if row is None: break (tablename, localcol, remotecol) = (row[2], row[3], row[4]) #print "row! " + repr(row) remotetable = Table(tablename, self, autoload = True) table.c[localcol].append_item(schema.ForeignKey(remotetable.c[remotecol])) # check for UNIQUE indexes c = self.execute("PRAGMA index_list(" + table.name + ")", {}) unique_indexes = [] while True: row = c.fetchone() if row is None: break if (row[2] == 1): unique_indexes.append(row[1]) # loop thru unique indexes for one that includes the primary key for idx in unique_indexes: c = self.execute("PRAGMA index_info(" + idx + ")", {}) cols = [] while True: row = c.fetchone() if row is None: break cols.append(row[2]) col = table.columns[row[2]] # unique index that includes the pk is considered a multiple primary key for col in cols: column = table.columns[col] table.columns[col]._set_primary_key()