Python sqlalchemy.schema 模块,ForeignKey() 实例源码

我们从Python开源项目中,提取了以下4个代码示例,用于说明如何使用sqlalchemy.schema.ForeignKey()

项目:sqlacodegen    作者:agronholm    | 项目源码 | 文件源码
def add_imports(self, collector):
        if self.table.columns:
            collector.add_import(Column)

        for column in self.table.columns:
            collector.add_import(column.type)
            if column.server_default:
                collector.add_literal_import('sqlalchemy', 'text')

        for constraint in sorted(self.table.constraints, key=_get_constraint_sort_key):
            if isinstance(constraint, ForeignKeyConstraint):
                if len(constraint.columns) > 1:
                    collector.add_literal_import('sqlalchemy', 'ForeignKeyConstraint')
                else:
                    collector.add_literal_import('sqlalchemy', 'ForeignKey')
            elif isinstance(constraint, UniqueConstraint):
                if len(constraint.columns) > 1:
                    collector.add_literal_import('sqlalchemy', 'UniqueConstraint')
            elif not isinstance(constraint, PrimaryKeyConstraint):
                collector.add_import(constraint)

        for index in self.table.indexes:
            if len(index.columns) > 1:
                collector.add_import(index)
项目:sqlacodegen    作者:agronholm    | 项目源码 | 文件源码
def render_constraint(constraint):
        def render_fk_options(*opts):
            opts = [repr(opt) for opt in opts]
            for attr in 'ondelete', 'onupdate', 'deferrable', 'initially', 'match':
                value = getattr(constraint, attr, None)
                if value:
                    opts.append('{0}={1!r}'.format(attr, value))

            return ', '.join(opts)

        if isinstance(constraint, ForeignKey):
            remote_column = '{0}.{1}'.format(constraint.column.table.fullname, constraint.column.name)
            return 'ForeignKey({0})'.format(render_fk_options(remote_column))
        elif isinstance(constraint, ForeignKeyConstraint):
            local_columns = _get_column_names(constraint)
            remote_columns = ['{0}.{1}'.format(fk.column.table.fullname, fk.column.name)
                              for fk in constraint.elements]
            return 'ForeignKeyConstraint({0})'.format(render_fk_options(local_columns, remote_columns))
        elif isinstance(constraint, CheckConstraint):
            return 'CheckConstraint({0!r})'.format(_get_compiled_expression(constraint.sqltext))
        elif isinstance(constraint, UniqueConstraint):
            columns = [repr(col.name) for col in constraint.columns]
            return 'UniqueConstraint({0})'.format(', '.join(columns))
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def reflecttable(self, table):
        # to use information_schema:
        #ischema.reflecttable(self, table, ischema_names, use_mysql=True)

        tabletype, foreignkeyD = self.moretableinfo(table=table)
        table._impl.mysql_engine = tabletype

        c = self.execute("describe " + table.name, {})
        while True:
            row = c.fetchone()
            if row is None:
                break
            #print "row! " + repr(row)
            (name, type, nullable, primary_key, default) = (row[0], row[1], row[2] == 'YES', row[3] == 'PRI', row[4])

            match = re.match(r'(\w+)(\(.*?\))?', type)
            coltype = match.group(1)
            args = match.group(2)

            #print "coltype: " + repr(coltype) + " args: " + repr(args)
            coltype = ischema_names.get(coltype, MSString)
            if args is not None:
                args = re.findall(r'(\d+)', args)
                #print "args! " +repr(args)
                coltype = coltype(*[int(a) for a in args])

            arglist = []
            fkey = foreignkeyD.get(name)
            if fkey is not None:
                arglist.append(schema.ForeignKey(fkey))

            table.append_item(schema.Column(name, coltype, *arglist,
                                            **dict(primary_key=primary_key,
                                                   nullable=nullable,
                                                   default=default
                                                   )))
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def reflecttable(self, table):
        c = self.execute("PRAGMA table_info(" + table.name + ")", {})
        while True:
            row = c.fetchone()
            if row is None:
                break
            #print "row! " + repr(row)
            (name, type, nullable, primary_key) = (row[1], row[2].upper(), not row[3], row[5])

            match = re.match(r'(\w+)(\(.*?\))?', type)
            coltype = match.group(1)
            args = match.group(2)

            #print "coltype: " + repr(coltype) + " args: " + repr(args)
            coltype = pragma_names.get(coltype, SLString)
            if args is not None:
                args = re.findall(r'(\d+)', args)
                #print "args! " +repr(args)
                coltype = coltype(*[int(a) for a in args])
            table.append_item(schema.Column(name, coltype, primary_key = primary_key, nullable = nullable))
        c = self.execute("PRAGMA foreign_key_list(" + table.name + ")", {})
        while True:
            row = c.fetchone()
            if row is None:
                break
            (tablename, localcol, remotecol) = (row[2], row[3], row[4])
            #print "row! " + repr(row)
            remotetable = Table(tablename, self, autoload = True)
            table.c[localcol].append_item(schema.ForeignKey(remotetable.c[remotecol]))
        # check for UNIQUE indexes
        c = self.execute("PRAGMA index_list(" + table.name + ")", {})
        unique_indexes = []
        while True:
            row = c.fetchone()
            if row is None:
                break
            if (row[2] == 1):
                unique_indexes.append(row[1])
        # loop thru unique indexes for one that includes the primary key
        for idx in unique_indexes:
            c = self.execute("PRAGMA index_info(" + idx + ")", {})
            cols = []
            while True:
                row = c.fetchone()
                if row is None:
                    break
                cols.append(row[2])
                col = table.columns[row[2]]
            # unique index that includes the pk is considered a multiple primary key
            for col in cols:
                column = table.columns[col]
                table.columns[col]._set_primary_key()