我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用sqlalchemy.types.NullType()。
def _compare_type( autogen_context, alter_column_op, schema, tname, cname, conn_col, metadata_col): conn_type = conn_col.type alter_column_op.existing_type = conn_type metadata_type = metadata_col.type if conn_type._type_affinity is sqltypes.NullType: log.info("Couldn't determine database type " "for column '%s.%s'", tname, cname) return if metadata_type._type_affinity is sqltypes.NullType: log.info("Column '%s.%s' has no type within " "the model; can't compare", tname, cname) return isdiff = autogen_context.migration_context._compare_type( conn_col, metadata_col) if isdiff: alter_column_op.modify_type = metadata_type log.info("Detected type change from %r to %r on '%s.%s'", conn_type, metadata_type, tname, cname )
def get_columns(self, connection, table_name, schema=None, **kw): query = 'DESCRIBE TABLE {}'.format(table_name) rows = self._execute(connection, query) columns = [] for name, type_, default_type, default_expression in rows: # Get only type without extra modifiers. type_ = re.search(r'^\w+', type_).group(0) try: type_ = ischema_names[type_] except KeyError: type_ = sqltypes.NullType columns.append({ 'name': name, 'type': type_, 'nullable': True, 'default': None, }) return columns
def _compare_type(schema, tname, cname, conn_col, metadata_col, diffs, autogen_context): conn_type = conn_col.type metadata_type = metadata_col.type if conn_type._type_affinity is sqltypes.NullType: log.info("Couldn't determine database type " "for column '%s.%s'", tname, cname) return if metadata_type._type_affinity is sqltypes.NullType: log.info("Column '%s.%s' has no type within " "the model; can't compare", tname, cname) return isdiff = autogen_context['context']._compare_type(conn_col, metadata_col) if isdiff: diffs.append( ("modify_type", schema, tname, cname, { "existing_nullable": conn_col.nullable, "existing_server_default": conn_col.server_default, }, conn_type, metadata_type), ) log.info("Detected type change from %r to %r on '%s.%s'", conn_type, metadata_type, tname, cname )
def _get_column_info(self, name, type_, nullable, default, primary_key): match = re.match(r'(\w+)(\(.*?\))?', type_) if match: coltype = match.group(1) args = match.group(2) else: coltype = "VARCHAR" args = '' try: coltype = self.ischema_names[coltype] if args is not None: args = re.findall(r'(\d+)', args) coltype = coltype(*[int(a) for a in args]) except KeyError: util.warn("Did not recognize type '%s' of column '%s'" % (coltype, name)) coltype = sqltypes.NullType() if default is not None: default = unicode(default) return { 'name': name, 'type': coltype, 'nullable': nullable, 'default': default, 'autoincrement': default is None, 'primary_key': primary_key }
def _create_shadow_tables(migrate_engine): meta = MetaData(migrate_engine) meta.reflect(migrate_engine) table_names = list(meta.tables.keys()) meta.bind = migrate_engine for table_name in table_names: table = Table(table_name, meta, autoload=True) columns = [] for column in table.columns: column_copy = None # NOTE(boris-42): BigInteger is not supported by sqlite, so # after copy it will have NullType, other # types that are used in Nova are supported by # sqlite. if isinstance(column.type, NullType): column_copy = Column(column.name, BigInteger(), default=0) if table_name == 'instances' and column.name == 'locked_by': enum = Enum('owner', 'admin', name='shadow_instances0locked_by') column_copy = Column(column.name, enum) else: column_copy = column.copy() columns.append(column_copy) shadow_table_name = 'shadow_' + table_name shadow_table = Table(shadow_table_name, meta, *columns, mysql_engine='InnoDB') try: shadow_table.create() except Exception: LOG.info(repr(shadow_table)) LOG.exception(_LE('Exception while creating table.')) raise
def create_shadow_table(migrate_engine, table_name=None, table=None, **col_name_col_instance): """This method create shadow table for table with name ``table_name`` or table instance ``table``. :param table_name: Autoload table with this name and create shadow table :param table: Autoloaded table, so just create corresponding shadow table. :param col_name_col_instance: contains pair column_name=column_instance. column_instance is instance of Column. These params are required only for columns that have unsupported types by sqlite. For example BigInteger. :returns: The created shadow_table object. """ meta = MetaData(bind=migrate_engine) if table_name is None and table is None: raise exception.NovaException(_("Specify `table_name` or `table` " "param")) if not (table_name is None or table is None): raise exception.NovaException(_("Specify only one param `table_name` " "`table`")) if table is None: table = Table(table_name, meta, autoload=True) columns = [] for column in table.columns: if isinstance(column.type, NullType): new_column = oslodbutils._get_not_supported_column( col_name_col_instance, column.name) columns.append(new_column) else: columns.append(column.copy()) shadow_table_name = db._SHADOW_TABLE_PREFIX + table.name shadow_table = Table(shadow_table_name, meta, *columns, mysql_engine='InnoDB') try: shadow_table.create() return shadow_table except (db_exc.DBError, OperationalError): # NOTE(ekudryashova): At the moment there is a case in oslo.db code, # which raises unwrapped OperationalError, so we should catch it until # oslo.db would wraps all such exceptions LOG.info(repr(shadow_table)) LOG.exception(_LE('Exception while creating table.')) raise exception.ShadowTableExists(name=shadow_table_name) except Exception: LOG.info(repr(shadow_table)) LOG.exception(_LE('Exception while creating table.'))