我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 sqlalchemy.schema 模块。
def render_table(self, model):
    """Render ``model.table`` as the source of a ``t_<name> = Table(...)`` statement.

    Every column is rendered.  Primary-key constraints are skipped, as are
    foreign-key/unique constraints that span exactly one column; only
    indexes covering more than one column are rendered.  A ``schema=``
    argument is appended when ``model.schema`` is truthy.

    :param model: object exposing ``table`` and ``schema``.
    :returns: the rendered statement, terminated by ``"\\n)\\n"``.
    """
    indent = self.indentation
    table = model.table
    pieces = ['t_{0} = Table(\n{1}{0!r}, metadata,\n'.format(table.name, indent)]

    pieces.extend(
        '{0}{1},\n'.format(indent, self.render_column(column, True))
        for column in table.columns
    )

    for constraint in sorted(table.constraints, key=_get_constraint_sort_key):
        if isinstance(constraint, PrimaryKeyConstraint):
            continue
        if (not isinstance(constraint, (ForeignKeyConstraint, UniqueConstraint))
                or len(constraint.columns) != 1):
            pieces.append('{0}{1},\n'.format(indent, self.render_constraint(constraint)))

    pieces.extend(
        '{0}{1},\n'.format(indent, self.render_index(index))
        for index in table.indexes
        if len(index.columns) > 1
    )

    if model.schema:
        pieces.append("{0}schema='{1}',\n".format(indent, model.schema))

    # Drop the trailing ",\n" left by the last rendered item before closing.
    return ''.join(pieces).rstrip('\n,') + '\n)\n'
def test_check(self):
    """Can create columns with check constraint"""
    check = sqlalchemy.schema.CheckConstraint('data > 4')
    col = Column('data', Integer, check)
    col.create(self.table)

    # check if constraint was added (cannot test on objects)
    # A value satisfying the constraint must be accepted ...
    self.table.insert(values={'data': 5}).execute()

    # ... and a violating value must be rejected by the database.
    rejected = False
    try:
        self.table.insert(values={'data': 3}).execute()
    except (sqlalchemy.exc.IntegrityError, sqlalchemy.exc.ProgrammingError):
        rejected = True
    if not rejected:
        self.fail()

    col.drop()
def _setup(self, url):
    """Prepare a fresh two-column table for the column-change tests.

    Calls the parent ``_setup``, binds new metadata to ``self.engine`` and
    (re)creates ``self.table`` with an ``id`` primary key and a nullable
    ``data`` column whose server default is ``"tluafed"``.

    :param url: database URL, forwarded to the parent ``_setup``.
    :raises sqlalchemy.exc.SQLError: if table creation fails and
        ``self.url`` is not a ``sqlite://`` URL.
    """
    super(TestColumnChange, self)._setup(url)
    self.meta = MetaData(self.engine)
    self.table = Table(
        self.table_name, self.meta,
        Column('id', Integer, primary_key=True),
        Column('data', String(40), server_default=DefaultClause("tluafed"),
               nullable=True),
    )
    if self.table.exists():
        self.table.drop()
    try:
        self.table.create()
    except sqlalchemy.exc.SQLError:
        # SQLite: database schema has changed
        # Only SQLite URLs are allowed to swallow this error.
        if not self.url.startswith('sqlite://'):
            raise
def test_datetime(self):
    """Round-trip a datetime column through ``to_sql`` and both read paths."""
    frame = DataFrame({'A': date_range('2013-01-01 09:00:00', periods=3),
                       'B': np.arange(3.0)})
    frame.to_sql('test_datetime', self.conn)

    # with read_table -> type information from schema used
    via_table = sql.read_sql_table('test_datetime', self.conn)
    via_table = via_table.drop('index', axis=1)
    tm.assert_frame_equal(via_table, frame)

    # with read_sql -> no type information -> sqlite has no native
    via_query = sql.read_sql_query('SELECT * FROM test_datetime', self.conn)
    via_query = via_query.drop('index', axis=1)
    if self.flavor == 'sqlite':
        # Values come back as strings and must be parsed before comparing.
        self.assertTrue(isinstance(via_query.loc[0, 'A'], string_types))
        via_query['A'] = to_datetime(via_query['A'])
    tm.assert_frame_equal(via_query, frame)
def test_datetime_NaT(self):
    """Round-trip a datetime column containing a missing value."""
    frame = DataFrame({'A': date_range('2013-01-01 09:00:00', periods=3),
                       'B': np.arange(3.0)})
    frame.loc[1, 'A'] = np.nan
    frame.to_sql('test_datetime', self.conn, index=False)

    # with read_table -> type information from schema used
    via_table = sql.read_sql_table('test_datetime', self.conn)
    tm.assert_frame_equal(via_table, frame)

    # with read_sql -> no type information -> sqlite has no native
    via_query = sql.read_sql_query('SELECT * FROM test_datetime', self.conn)
    if self.flavor == 'sqlite':
        # Values come back as strings; coerce so the missing entry parses.
        self.assertTrue(isinstance(via_query.loc[0, 'A'], string_types))
        via_query['A'] = to_datetime(via_query['A'], errors='coerce')
    tm.assert_frame_equal(via_query, frame)
def test_notnull_dtype(self):
    """Check the reflected SQL types of columns that contain ``None``."""
    tbl = 'notnull_dtype_test'
    df = DataFrame({
        'Bool': Series([True, None]),
        'Date': Series([datetime(2012, 5, 1), None]),
        'Int': Series([1, None], dtype='object'),
        'Float': Series([1.1, None]),
    })
    df.to_sql(tbl, self.conn)
    returned_df = sql.read_sql_table(tbl, self.conn)  # noqa

    meta = sqlalchemy.schema.MetaData(bind=self.conn)
    meta.reflect()

    # The boolean column is expected to reflect as Integer on mysql.
    bool_type = sqltypes.Integer if self.flavor == 'mysql' else sqltypes.Boolean

    reflected = meta.tables[tbl].columns
    expected = (
        ('Bool', bool_type),
        ('Date', sqltypes.DateTime),
        ('Int', sqltypes.Integer),
        ('Float', sqltypes.Float),
    )
    for col_name, sql_type in expected:
        self.assertTrue(isinstance(reflected[col_name].type, sql_type))
def _create_table_setup(self):
    """Build the SQLAlchemy ``Table`` object describing this table.

    Columns come from ``self._get_column_names_and_types``; when
    ``self.keys`` is set, a primary-key constraint named ``<name>_pk`` is
    added.  The table is attached to a new ``MetaData`` instance.

    :returns: the constructed ``sqlalchemy.Table``.
    """
    from sqlalchemy import Table, Column, PrimaryKeyConstraint

    specs = self._get_column_names_and_types(self._sqlalchemy_type)
    columns = []
    for name, typ, is_index in specs:
        columns.append(Column(name, typ, index=is_index))

    if self.keys is not None:
        # A scalar key is wrapped so it can be star-expanded below.
        keys = self.keys if com.is_list_like(self.keys) else [self.keys]
        columns.append(PrimaryKeyConstraint(*keys, name=self.name + '_pk'))

    schema = self.schema or self.pd_sql.meta.schema

    # At this point, attach to new metadata, only attach to self.meta
    # once table is created.
    from sqlalchemy.schema import MetaData
    fresh_meta = MetaData(self.pd_sql, schema=schema)

    return Table(self.name, fresh_meta, *columns, schema=schema)
def _setup(self, url):
    """Prepare a fresh ``id``/``data`` table for the column-change tests.

    :param url: database URL, forwarded to the parent ``_setup``.
    """
    super(TestColumnChange, self)._setup(url)

    metadata = MetaData(self.engine)
    self.meta = metadata
    self.table = Table(
        self.table_name,
        metadata,
        Column('id', Integer, primary_key=True),
        Column('data', String(40), server_default=DefaultClause("tluafed"),
               nullable=True),
    )

    # Start from a clean slate if a previous run left the table behind.
    if self.table.exists():
        self.table.drop()

    try:
        self.table.create()
    except sqlalchemy.exc.SQLError:
        # SQLite: database schema has changed
        if self.url.startswith('sqlite://'):
            return
        raise
def has_table(self, connection, tablename, schema=None):
    """Return whether ``msysobjects`` lists a type-1 object named ``tablename``.

    :param connection: object providing ``scalar(statement, **params)``.
    :param tablename: name bound to the ``:name`` parameter.
    :param schema: accepted but not used by the query.
    :returns: ``True`` when the count is non-zero, else ``False``.
    """
    count_query = sql.text(
        "select count(*) from msysobjects where type=1 and name=:name"
    )
    match_count = connection.scalar(count_query, name=tablename)
    return bool(match_count)
def get_table_names(self, connection, schema=None, **kw):
    """Return the names of type-1 ``msysobjects`` rows not prefixed ``MSys``.

    :param connection: object providing ``execute(statement)``.
    :param schema: accepted but not used by the query.
    :param kw: accepted and ignored.
    :returns: list holding the first column of every result row.
    """
    statement = ("select name from msysobjects where "
                 "type=1 and name not like 'MSys%'")
    return [row[0] for row in connection.execute(statement)]
def __init__(self, table):
    """Store ``table`` and replace its column types with generic ones.

    :param table: table object exposing ``schema`` and ``columns``.
    """
    super(Model, self).__init__()
    self.table = table
    self.schema = table.schema

    # Adapt column types to the most reasonable generic types (ie. VARCHAR -> String)
    for column in table.columns:
        type_cls = column.type.__class__
        generic_cls = type_cls
        for candidate in type_cls.__mro__:
            # Remember the most recent class that carries a visit name.
            if hasattr(candidate, '__visit_name__'):
                generic_cls = candidate

            # Stop at the first class whose name is neither ALL-UPPERCASE
            # nor underscore-prefixed.
            name = candidate.__name__
            if name != name.upper() and not name.startswith('_'):
                break

        column.type = column.type.adapt(generic_cls)
def _meta_key(self):
    """Get the meta key for this table."""
    table_key = sqlalchemy.schema._get_table_key(self.name, self.schema)
    return table_key
def deregister(self):
    """Remove this table from its metadata"""
    if SQLA_07:
        self.metadata._remove_table(self.name, self.schema)
        return

    # Otherwise drop the entry from the metadata's table mapping by hand.
    key = self._meta_key()
    tables = self.metadata.tables
    if key in tables:
        del tables[key]
def test_fk(self):
    """Can create columns with foreign keys"""
    # create FK's target
    target = Table('tmp_ref', self.meta,
                   Column('id', Integer, primary_key=True))
    if self.engine.has_table(target.name):
        target.drop()
    target.create()

    # create column with fk
    col = Column('data', Integer, ForeignKey(target.c.id))
    col.create(self.table)

    # check if constraint is added
    fk_found = any(
        isinstance(cons, sqlalchemy.schema.ForeignKeyConstraint)
        for cons in self.table.constraints
    )
    if not fk_found:
        self.fail('No constraint found')

    # TODO: test on db level if constraints work

    # Under SQLA_07 the collection is materialised before indexing.
    fks = list(col.foreign_keys) if SQLA_07 else col.foreign_keys
    self.assertEqual(target.c.id.name, fks[0].column.name)

    col.drop(self.table)

    if self.engine.has_table(target.name):
        target.drop()
def _teardown(self):
    """Drop the test table if it exists, then run the parent teardown.

    :raises sqlalchemy.exc.SQLError: if dropping fails and ``self.url`` is
        not a ``sqlite://`` URL.
    """
    if self.table.exists():
        try:
            self.table.drop(self.engine)
        except sqlalchemy.exc.SQLError:
            # SQLite: database schema has changed
            # Only SQLite URLs are allowed to swallow this error.
            if not self.url.startswith('sqlite://'):
                raise
    super(TestColumnChange, self)._teardown()
def _get_all_tables(self):
    """Reflect the database behind ``self.conn`` and return its table keys."""
    reflected = sqlalchemy.schema.MetaData(bind=self.conn)
    reflected.reflect()
    return reflected.tables.keys()
def _get_sqlite_column_type(self, schema, column):
    """Return the token following ``column`` in a CREATE TABLE statement.

    Each line of ``schema`` is split on whitespace.  The first token, with
    surrounding double quotes removed, is compared against ``column``; on
    a match the second token of that line is returned unchanged.

    :param schema: text of the statement, one column definition per line.
    :param column: unquoted column name to look up.
    :returns: the second whitespace-separated token of the matching line.
    :raises ValueError: if no line defines ``column``.
    """
    for line in schema.split('\n'):
        tokens = line.split()
        # Blank lines and single-token lines (such as a closing ")") hold
        # no "<name> <type>" pair; skipping them avoids an IndexError.
        if len(tokens) < 2:
            continue
        if tokens[0].strip('"') == column:
            return tokens[1]
    raise ValueError('Column %s not found' % (column,))
def test_sqlite_type_mapping(self):
    """A UTC-aware timestamp column is declared as ``TIMESTAMP``."""
    # Test Timestamp objects (no datetime64 because of timezone) (GH9085)
    stamps = to_datetime(['201412120154', '201412110254'], utc=True)
    df = DataFrame({'time': stamps})
    db = sql.SQLiteDatabase(self.conn, self.flavor)
    schema = sql.SQLiteTable("test_type", db, frame=df).sql_schema()
    declared_type = self._get_sqlite_column_type(schema, 'time')
    self.assertEqual(declared_type, "TIMESTAMP")


# -----------------------------------------------------------------------------
# -- Database flavor specific tests
def test_double_precision(self):
    """Check float precision and the reflected SQL types of numeric columns."""
    V = 1.23456789101112131415

    df = DataFrame({
        'f32': Series([V], dtype='float32'),
        'f64': Series([V], dtype='float64'),
        'f64_as_f32': Series([V], dtype='float64'),
        'i32': Series([5], dtype='int32'),
        'i64': Series([5], dtype='int64'),
    })

    df.to_sql('test_dtypes', self.conn, index=False, if_exists='replace',
              dtype={'f64_as_f32': sqlalchemy.Float(precision=23)})
    res = sql.read_sql_table('test_dtypes', self.conn)

    # check precision of float64
    written = np.round(df['f64'].iloc[0], 14)
    read_back = np.round(res['f64'].iloc[0], 14)
    self.assertEqual(written, read_back)

    # check sql types
    meta = sqlalchemy.schema.MetaData(bind=self.conn)
    meta.reflect()
    col_dict = meta.tables['test_dtypes'].columns
    # The explicitly narrowed float64 column must reflect like the float32 one.
    self.assertEqual(str(col_dict['f32'].type),
                     str(col_dict['f64_as_f32'].type))
    expected = (
        ('f32', sqltypes.Float),
        ('f64', sqltypes.Float),
        ('i32', sqltypes.Integer),
        ('i64', sqltypes.BigInteger),
    )
    for col_name, sql_type in expected:
        self.assertTrue(isinstance(col_dict[col_name].type, sql_type))
def has_table(table_name, con, flavor='sqlite', schema=None):
    """
    Check if DataBase has named table.

    Parameters
    ----------
    table_name: string
        Name of SQL table
    con: SQLAlchemy connectable(engine/connection) or sqlite3 DBAPI2 connection
        Using SQLAlchemy makes it possible to use any DB supported by that
        library.
        If a DBAPI2 object, only sqlite3 is supported.
    flavor: {'sqlite', 'mysql'}, default 'sqlite'
        The flavor of SQL to use. Ignored when using SQLAlchemy connectable.
        'mysql' is deprecated and will be removed in future versions, but it
        will be further supported through SQLAlchemy connectables.
    schema : string, default None
        Name of SQL schema in database to write to (if database flavor supports
        this). If None, use default schema (default).

    Returns
    -------
    boolean
    """
    # Delegate to whichever backend object the builder selects for ``con``.
    backend = pandasSQL_builder(con, flavor=flavor, schema=schema)
    table_present = backend.has_table(table_name)
    return table_present
def pandasSQL_builder(con, flavor=None, schema=None, meta=None, is_cursor=False):
    """
    Convenience function to return the correct PandasSQL subclass based on the
    provided parameters
    """
    # When support for DBAPI connections is removed,
    # is_cursor should not be necessary.
    con = _engine_builder(con)

    if not _is_sqlalchemy_connectable(con):
        # Fallback path: warn about the deprecated 'mysql' flavor first.
        if flavor == 'mysql':
            warnings.warn(_MYSQL_WARNING, FutureWarning, stacklevel=3)
        return SQLiteDatabase(con, flavor, is_cursor=is_cursor)

    return SQLDatabase(con, schema=schema, meta=meta)
def exists(self):
    """Return what ``self.pd_sql.has_table`` reports for this name and schema."""
    backend = self.pd_sql
    return backend.has_table(self.name, self.schema)
def sql_schema(self):
    """Return the CREATE TABLE statement for ``self.table`` as a string.

    The statement is compiled against ``self.pd_sql.connectable``.
    """
    from sqlalchemy.schema import CreateTable

    compiled = CreateTable(self.table).compile(self.pd_sql.connectable)
    return str(compiled)
def create(self):
    """Create the table, honouring ``self.if_exists`` when it already exists.

    * ``'fail'``    -- raise ``ValueError``.
    * ``'replace'`` -- drop the existing table, then create it again.
    * ``'append'``  -- leave the existing table untouched.

    :raises ValueError: if the table exists and ``if_exists`` is ``'fail'``
        or is not one of the three recognised values.
    """
    if not self.exists():
        self._execute_create()
        return

    mode = self.if_exists
    if mode == 'fail':
        raise ValueError("Table '%s' already exists." % self.name)
    if mode == 'replace':
        self.pd_sql.drop_table(self.name, self.schema)
        self._execute_create()
    elif mode != 'append':
        raise ValueError(
            "'{0}' is not valid for if_exists".format(self.if_exists))
def __init__(self, engine, schema=None, meta=None):
    """Store the connectable and the metadata object to use with it.

    :param engine: stored as ``self.connectable``.
    :param schema: passed to the ``MetaData`` created when ``meta`` is falsy.
    :param meta: existing metadata; used as-is when truthy.
    """
    self.connectable = engine
    if meta:
        self.meta = meta
        return

    # No usable metadata supplied: bind a new one to the connectable.
    from sqlalchemy.schema import MetaData
    self.meta = MetaData(self.connectable, schema=schema)
def has_table(self, name, schema=None):
    """Run the dialect's ``has_table`` through ``connectable.run_callable``.

    :param name: table name to look for.
    :param schema: schema to search; falls back to ``self.meta.schema``
        when falsy.
    :returns: whatever ``run_callable`` returns.
    """
    connectable = self.connectable
    effective_schema = schema or self.meta.schema
    return connectable.run_callable(
        connectable.dialect.has_table, name, effective_schema)
def get_table(self, table_name, schema=None):
    """Look up a table in ``self.meta.tables`` and disable decimal casting.

    :param table_name: name of the table.
    :param schema: schema name; falls back to ``self.meta.schema`` when
        falsy.  A truthy schema is joined to the name as ``schema.table``.
    :returns: the table object found in the metadata.
    """
    schema = schema or self.meta.schema
    key = '.'.join([schema, table_name]) if schema else table_name
    tbl = self.meta.tables.get(key)

    # Avoid casting double-precision floats into decimals
    from sqlalchemy import Numeric
    numeric_columns = [col for col in tbl.columns
                       if isinstance(col.type, Numeric)]
    for col in numeric_columns:
        col.type.asdecimal = False

    return tbl
def drop_table(self, table_name, schema=None):
    """Drop ``table_name`` if it exists, then clear the metadata.

    :param table_name: name of the table to drop.
    :param schema: schema name; falls back to ``self.meta.schema`` when falsy.
    """
    schema = schema or self.meta.schema
    if not self.has_table(table_name, schema):
        return

    # Reflect only this table so it can be looked up and dropped.
    self.meta.reflect(only=[table_name], schema=schema)
    table = self.get_table(table_name, schema)
    table.drop()
    self.meta.clear()
def to_sql(self, frame, name, if_exists='fail', index=True, index_label=None, schema=None, chunksize=None, dtype=None):
    """
    Write records stored in a DataFrame to a SQL database.

    Parameters
    ----------
    frame: DataFrame
    name: name of SQL table
    if_exists: {'fail', 'replace', 'append'}, default 'fail'
        Policy for a table that already exists.  The value is forwarded
        unchanged to the ``SQLiteTable`` created here, whose ``create()``
        applies it.
    index : boolean, default True
        Write DataFrame index as a column
    index_label : string or sequence, default None
        Column label for index column(s). If None is given (default) and
        `index` is True, then the index names are used.
        A sequence should be given if the DataFrame uses MultiIndex.
    schema : string, default None
        Ignored parameter included for compatibility with SQLAlchemy
        version of ``to_sql``.
    chunksize : int, default None
        Passed to ``table.insert``.  If not None, then rows will be written
        in batches of this size at a time. If None, all rows will be
        written at once.
    dtype : dict of column name to SQL type, default None
        Optional specifying the datatype for columns. The SQL type should
        be a string.

    Raises
    ------
    ValueError
        If any value in ``dtype`` is not a ``str``.
    """
    # Validate the requested column types before touching the database.
    if dtype is not None:
        for col, my_type in dtype.items():
            if isinstance(my_type, str):
                continue
            raise ValueError('%s (%s) not a string' % (col, str(my_type)))

    table = SQLiteTable(name, self, frame=frame, index=index,
                        if_exists=if_exists, index_label=index_label,
                        dtype=dtype)
    table.create()
    table.insert(chunksize)
def get_table(self, table_name, schema=None):
    """Always return ``None``: table lookup is not supported in fallback mode.

    :param table_name: accepted and ignored.
    :param schema: accepted and ignored.
    """
    # not supported in fallback mode
    return None
def drop_table(self, name, schema=None):
    """Execute ``DROP TABLE`` for ``name``, escaped for this flavor.

    :param name: table name; escaped with the ``_SQL_GET_IDENTIFIER`` entry
        for ``self.flavor``.
    :param schema: accepted but not used.
    """
    quote_identifier = _SQL_GET_IDENTIFIER[self.flavor]
    statement = "DROP TABLE %s" % quote_identifier(name)
    self.execute(statement)
def get_schema(frame, name, flavor='sqlite', keys=None, con=None, dtype=None):
    """
    Get the SQL db table schema for the given frame.

    Parameters
    ----------
    frame : DataFrame
    name : string
        name of SQL table
    flavor : {'sqlite', 'mysql'}, default 'sqlite'
        The flavor of SQL to use. Ignored when using SQLAlchemy connectable.
        'mysql' is deprecated and will be removed in future versions, but it
        will be further supported through SQLAlchemy engines.
    keys : string or sequence, default: None
        columns to use a primary key
    con: an open SQL database connection object or a SQLAlchemy connectable
        Using SQLAlchemy makes it possible to use any DB supported by that
        library, default: None
        If a DBAPI2 object, only sqlite3 is supported.
    dtype : dict of column name to SQL type, default None
        Optional specifying the datatype for columns. The SQL type should
        be a SQLAlchemy type, or a string for sqlite3 fallback connection.

    Returns
    -------
    The result of the selected backend's ``_create_sql_schema``.
    """
    backend = pandasSQL_builder(con=con, flavor=flavor)
    schema_sql = backend._create_sql_schema(frame, name, keys=keys, dtype=dtype)
    return schema_sql
def _set_search_path(schema, dbapi_connection, connection_record, connection_proxy):
    """Issue ``SET search_path TO <schema>;`` on a DBAPI connection and commit.

    :param schema: schema name interpolated verbatim into the statement.
        NOTE(review): the value is not quoted or escaped, so it must come
        from trusted configuration -- confirm with callers.
    :param dbapi_connection: object providing ``cursor()`` and ``commit()``.
    :param connection_record: accepted but not used.
    :param connection_proxy: accepted but not used.
    """
    cursor = dbapi_connection.cursor()
    try:
        cursor.execute("SET search_path TO {};".format(schema))
        dbapi_connection.commit()
    finally:
        # Release the cursor even when execute() or commit() raises.
        cursor.close()
def _db_engine(conn_string, schema):
    """Create an engine for ``conn_string`` and attach its event listeners.

    ``_ping_postgres`` is registered for ``engine_connect``, ``_record_pid``
    for ``connect`` and ``_check_pid`` for ``checkout``.  When ``schema`` is
    truthy, a further ``checkout`` listener applies that schema through
    ``_set_search_path``.

    :param conn_string: URL passed to ``sqlalchemy.create_engine``.
    :param schema: schema name for the search path, or a falsy value to
        skip that listener.
    :returns: the configured engine.
    """
    db_engine = sqlalchemy.create_engine(conn_string, json_serializer=Encoder().encode)
    sqlalchemy.event.listen(db_engine, "engine_connect", _ping_postgres)
    sqlalchemy.event.listen(db_engine, "connect", _record_pid)
    sqlalchemy.event.listen(db_engine, "checkout", _check_pid)

    if schema:
        # Register a listener bound to ``schema``.  Calling
        # ``_set_search_path(schema)`` directly would invoke the
        # four-argument function with one argument instead of registering it.
        def _apply_search_path(dbapi_connection, connection_record, connection_proxy):
            _set_search_path(schema, dbapi_connection, connection_record,
                             connection_proxy)

        sqlalchemy.event.listen(db_engine, "checkout", _apply_search_path)

    return db_engine
def _to_index(index, table=None, engine=None):
    """Return if instance of Index, else construct new with metadata"""
    if not isinstance(index, sqlalchemy.Index):
        # Given: index name; table name required
        owner = _to_table(table, engine)
        built = sqlalchemy.Index(index)
        built.table = owner
        return built

    return index


# Python3: if we just use:
#
#    class ColumnDelta(DictMixin, sqlalchemy.schema.SchemaItem):
#        ...
#
# We get the following error:
# TypeError: metaclass conflict: the metaclass of a derived class must be a
# (non-strict) subclass of the metaclasses of all its bases.
#
# The complete inheritance/metaclass relationship list of ColumnDelta can be
# summarized by this following dot file:
#
# digraph test123 {
#    ColumnDelta -> MutableMapping;
#    MutableMapping -> Mapping;
#    Mapping -> {Sized Iterable Container};
#    {Sized Iterable Container} -> ABCMeta[style=dashed];
#
#    ColumnDelta -> SchemaItem;
#    SchemaItem -> {SchemaEventTarget Visitable};
#    SchemaEventTarget -> object;
#    Visitable -> {VisitableType object} [style=dashed];
#    VisitableType -> type;
# }
#
# We need to use a metaclass that inherits from all the metaclasses of
# DictMixin and sqlalchemy.schema.SchemaItem. Let's call it "MyMeta".
def test_fk(self):
    """Can create columns with foreign keys"""
    # create FK's target
    target = Table('tmp_ref', self.meta,
                   Column('id', Integer, primary_key=True))
    if self.engine.has_table(target.name):
        target.drop()
    target.create()

    # create column with fk
    col = Column('data', Integer, ForeignKey(target.c.id, name='testfk'))
    col.create(self.table)

    # check if constraint is added
    fk_found = any(
        isinstance(cons, sqlalchemy.schema.ForeignKeyConstraint)
        for cons in self.table.constraints
    )
    if not fk_found:
        self.fail('No constraint found')

    # TODO: test on db level if constraints work

    # Under SQLA_07 the collection is materialised before indexing.
    fks = list(col.foreign_keys) if SQLA_07 else col.foreign_keys
    self.assertEqual(target.c.id.name, fks[0].column.name)

    # On mysql the named constraint is dropped explicitly before the column.
    if self.engine.name == 'mysql':
        named_fk = constraint.ForeignKeyConstraint([self.table.c.data],
                                                   [target.c.id],
                                                   name='testfk')
        named_fk.drop()
    col.drop(self.table)

    if self.engine.has_table(target.name):
        target.drop()
def _actual_foreign_keys(self):
    """Return the sorted column-name lists of the table's foreign keys.

    :returns: one list of column names per ``ForeignKeyConstraint`` found
        in ``self.table.constraints``, with the outer list sorted.
    """
    from sqlalchemy.schema import ForeignKeyConstraint

    def _as_name(col):
        # Entries may already be plain strings; otherwise use their .name.
        return col if isinstance(col, six.string_types) else col.name

    return sorted(
        [_as_name(col) for col in cons.columns]
        for cons in self.table.constraints
        if isinstance(cons, ForeignKeyConstraint)
    )
def _teardown(self):
    """Drop the test table if present, then run the parent teardown."""
    table_present = self.table.exists()
    if table_present:
        try:
            self.table.drop(self.engine)
        except sqlalchemy.exc.SQLError:
            # SQLite: database schema has changed
            tolerated = self.url.startswith('sqlite://')
            if not tolerated:
                raise
    super(TestColumnChange, self)._teardown()
def render_class(self, model):
    """Render ``model`` (and, recursively, its children) as class source text.

    The output consists of the class header, ``__tablename__``, an optional
    ``__table_args__``, one line per column attribute, one line per
    relationship attribute, and finally every child class separated by
    ``self.model_separator``.

    :param model: object exposing ``name``, ``parent_name``, ``table``,
        ``schema``, ``attributes`` and ``children``.
    :returns: the rendered source as a string.
    """
    rendered = 'class {0}({1}):\n'.format(model.name, model.parent_name)
    rendered += '{0}__tablename__ = {1!r}\n'.format(self.indentation, model.table.name)

    # Render constraints and indexes as __table_args__
    table_args = []
    for constraint in sorted(model.table.constraints, key=_get_constraint_sort_key):
        # Primary keys and single-column FK/unique constraints are skipped here.
        if isinstance(constraint, PrimaryKeyConstraint):
            continue
        if isinstance(constraint, (ForeignKeyConstraint, UniqueConstraint)) and len(constraint.columns) == 1:
            continue
        table_args.append(self.render_constraint(constraint))
    for index in model.table.indexes:
        # Only indexes spanning more than one column are rendered here.
        if len(index.columns) > 1:
            table_args.append(self.render_index(index))

    table_kwargs = {}
    if model.schema:
        table_kwargs['schema'] = model.schema

    # Keyword arguments are rendered as a dict literal, or None when empty.
    kwargs_items = ', '.join('{0!r}: {1!r}'.format(key, table_kwargs[key]) for key in table_kwargs)
    kwargs_items = '{{{0}}}'.format(kwargs_items) if kwargs_items else None
    if table_kwargs and not table_args:
        # Keyword arguments only: __table_args__ is the bare dict.
        rendered += '{0}__table_args__ = {1}\n'.format(self.indentation, kwargs_items)
    elif table_args:
        # Positional arguments: a tuple, with the kwargs dict as last element.
        if kwargs_items:
            table_args.append(kwargs_items)
        if len(table_args) == 1:
            # Trailing comma so that a single element still forms a tuple.
            table_args[0] += ','
        table_args_joined = ',\n{0}{0}'.format(self.indentation).join(table_args)
        rendered += '{0}__table_args__ = (\n{0}{0}{1}\n{0})\n'.format(self.indentation, table_args_joined)

    # Render columns
    rendered += '\n'
    for attr, column in model.attributes.items():
        if isinstance(column, Column):
            # The column name is rendered only when it differs from the attribute.
            show_name = attr != column.name
            rendered += '{0}{1} = {2}\n'.format(self.indentation, attr, self.render_column(column, show_name))

    # Render relationships
    if any(isinstance(value, Relationship) for value in model.attributes.values()):
        rendered += '\n'
    for attr, relationship in model.attributes.items():
        if isinstance(relationship, Relationship):
            rendered += '{0}{1} = {2}\n'.format(self.indentation, attr, self.render_relationship(relationship))

    # Render subclasses
    for child_class in model.children:
        rendered += self.model_separator + self.render_class(child_class)

    return rendered
def alter_column(*p, **k):
    """Alter a column.

    This is a helper function that creates a :class:`ColumnDelta` and
    runs it.

    :argument column:
      The name of the column to be altered or a
      :class:`ChangesetColumn` column representing it.

    :param table:
      A :class:`~sqlalchemy.schema.Table` or table name for the table
      where the column will be changed.

    :param engine:
      The :class:`~sqlalchemy.engine.base.Engine` to use for table
      reflection and schema alterations.

    :returns: A :class:`ColumnDelta` instance representing the change.
    """
    # Fill in 'table' and 'engine' from the column/table when omitted.
    table_missing = 'table' not in k
    if table_missing and isinstance(p[0], sqlalchemy.Column):
        k['table'] = p[0].table
    if 'engine' not in k:
        k['engine'] = k['table'].bind

    # deprecation
    column_passed = len(p) >= 2 and isinstance(p[1], sqlalchemy.Column)
    if column_passed:
        message = ("Passing a Column object to alter_column is deprecated."
                   " Just pass in keyword parameters instead.")
        warnings.warn(message, MigrateDeprecationWarning)

    engine = k['engine']

    # enough tests seem to break when metadata is always altered
    # that this crutch has to be left in until they can be sorted
    # out
    k['alter_metadata'] = True

    delta = ColumnDelta(*p, **k)
    engine._run_visitor(get_engine_visitor(engine, 'schemachanger'), delta)
    return delta
def test_drop_with_complex_foreign_keys(self):
    """Dropping one column of a composite foreign key removes the constraint."""
    from sqlalchemy.schema import ForeignKeyConstraint
    from sqlalchemy.schema import UniqueConstraint

    self.table.drop()
    self.meta.clear()

    # create FK's target
    target = Table(
        'tmp_ref', self.meta,
        Column('id', Integer, primary_key=True),
        Column('jd', Integer),
        UniqueConstraint('id', 'jd'),
    )
    if self.engine.has_table(target.name):
        target.drop()
    target.create()

    # add a table with a complex foreign key constraint
    composite_fk = ForeignKeyConstraint(['r1', 'r2'],
                                        [target.c.id, target.c.jd],
                                        name='test_fk')
    self.table = Table(
        self.table_name, self.meta,
        Column('id', Integer, primary_key=True),
        Column('r1', Integer),
        Column('r2', Integer),
        composite_fk,
    )
    self.table.create()

    # paranoid check
    self.assertEqual([['r1', 'r2']], self._actual_foreign_keys())

    # delete one
    self.table.c.r2.drop()

    # check the constraint is gone, since part of it
    # is no longer there - if people hit this,
    # they may be confused, maybe we should raise an error
    # and insist that the constraint is deleted first, separately?
    self.assertEqual([], self._actual_foreign_keys())
def to_sql(frame, name, con, flavor='sqlite', schema=None, if_exists='fail', index=True, index_label=None, chunksize=None, dtype=None):
    """
    Write records stored in a DataFrame to a SQL database.

    Parameters
    ----------
    frame : DataFrame or Series
        A Series is converted with ``to_frame()`` before writing.
    name : string
        Name of SQL table
    con : SQLAlchemy connectable(engine/connection) or database string URI
        or sqlite3 DBAPI2 connection
        Using SQLAlchemy makes it possible to use any DB supported by that
        library.
        If a DBAPI2 object, only sqlite3 is supported.
    flavor : {'sqlite', 'mysql'}, default 'sqlite'
        The flavor of SQL to use. Ignored when using SQLAlchemy connectable.
        'mysql' is deprecated and will be removed in future versions, but it
        will be further supported through SQLAlchemy connectables.
    schema : string, default None
        Name of SQL schema in database to write to (if database flavor
        supports this). If None, use default schema (default).
    if_exists : {'fail', 'replace', 'append'}, default 'fail'
        Policy for a table that already exists.  Validated here, then
        forwarded unchanged to the selected backend's ``to_sql``.
    index : boolean, default True
        Write DataFrame index as a column
    index_label : string or sequence, default None
        Column label for index column(s). If None is given (default) and
        `index` is True, then the index names are used.
        A sequence should be given if the DataFrame uses MultiIndex.
    chunksize : int, default None
        If not None, then rows will be written in batches of this size at a
        time.  If None, all rows will be written at once.
    dtype : dict of column name to SQL type, default None
        Optional specifying the datatype for columns. The SQL type should
        be a SQLAlchemy type, or a string for sqlite3 fallback connection.

    Raises
    ------
    ValueError
        If ``if_exists`` is not one of the three accepted values.
    NotImplementedError
        If ``frame`` is neither a Series nor a DataFrame.
    """
    valid_policies = ('fail', 'replace', 'append')
    if if_exists not in valid_policies:
        raise ValueError("'{0}' is not valid for if_exists".format(if_exists))

    pandas_sql = pandasSQL_builder(con, schema=schema, flavor=flavor)

    if not isinstance(frame, (Series, DataFrame)):
        raise NotImplementedError(
            "'frame' argument should be either a Series or a DataFrame")
    if isinstance(frame, Series):
        frame = frame.to_frame()

    pandas_sql.to_sql(frame, name, if_exists=if_exists, index=index,
                      index_label=index_label, schema=schema,
                      chunksize=chunksize, dtype=dtype)
def to_sql(self, frame, name, if_exists='fail', index=True, index_label=None, schema=None, chunksize=None, dtype=None):
    """
    Write records stored in a DataFrame to a SQL database.

    Parameters
    ----------
    frame : DataFrame
    name : string
        Name of SQL table
    if_exists : {'fail', 'replace', 'append'}, default 'fail'
        Policy for a table that already exists.  The value is forwarded
        unchanged to the ``SQLTable`` created here, whose ``create()``
        applies it.
    index : boolean, default True
        Write DataFrame index as a column
    index_label : string or sequence, default None
        Column label for index column(s). If None is given (default) and
        `index` is True, then the index names are used.
        A sequence should be given if the DataFrame uses MultiIndex.
    schema : string, default None
        Name of SQL schema in database to write to (if database flavor
        supports this). If specified, this overwrites the default
        schema of the SQLDatabase object.
    chunksize : int, default None
        Passed to ``table.insert``.  If not None, then rows will be written
        in batches of this size at a time.  If None, all rows will be
        written at once.
    dtype : dict of column name to SQL type, default None
        Optional specifying the datatype for columns. The SQL type should
        be a SQLAlchemy type.

    Raises
    ------
    ValueError
        If a value in ``dtype`` is not a SQLAlchemy type.
    """
    if dtype is not None:
        from sqlalchemy.types import to_instance, TypeEngine
        for col, my_type in dtype.items():
            if isinstance(to_instance(my_type), TypeEngine):
                continue
            raise ValueError('The type of %s is not a SQLAlchemy type ' % col)

    table = SQLTable(name, self, frame=frame, index=index,
                     if_exists=if_exists, index_label=index_label,
                     schema=schema, dtype=dtype)
    table.create()
    table.insert(chunksize)

    # check for potentially case sensitivity issues (GH7815)
    engine = self.connectable.engine
    with self.connectable.connect() as conn:
        known_tables = engine.table_names(
            schema=schema or self.meta.schema,
            connection=conn,
        )
    if name in known_tables:
        return
    warnings.warn("The provided table name '{0}' is not found exactly "
                  "as such in the database after writing the table, "
                  "possibly due to case sensitivity issues. Consider "
                  "using lower case table names.".format(name), UserWarning)
def objectify(self, dict_, context=None):
    """Extended to handle JSON properties.

    Copy the entries of ``dict_`` onto ``context`` and return it.

    :param dict_: mapping of attribute names to values.  If
        ``sqlalchemy.inspect`` recognises it, it is returned unchanged.
    :param context: object to populate; when ``None`` a new instance of
        ``self.inspector.class_`` is created.
    :returns: the populated ``context``, or ``dict_`` itself in the
        pass-through case.
    """
    mapper = self.inspector
    context = mapper.class_() if context is None else context

    # If our schema and widgets want to pass us back full objects instead of
    # their dictified versions, let them pass through
    if sqlalchemy.inspect(dict_, raiseerr=False) is not None:
        return dict_

    for attr in dict_:
        if mapper.has_property(attr):
            prop = mapper.get_property(attr)
            if hasattr(prop, 'mapper'):
                # The property has its own mapper, i.e. it refers to other
                # mapped objects.
                # NOTE(review): ``cls`` is assigned but not used below.
                cls = prop.mapper.class_
                value = dict_[attr]
                if prop.uselist:
                    # Sequence of objects
                    if isinstance(value, ModelSetResultList):
                        # We know we do not need to try to convert these
                        pass
                    else:
                        # Try to map incoming colander items back to SQL items
                        value = [self[attr].children[0].objectify(obj)
                                 for obj in dict_[attr]]
                else:
                    if hasattr(value, "__tablename__"):
                        # Raw SQLAlchemy object - do not try to convert
                        pass
                    else:
                        # Single object
                        # Falsy values are set on the context unconverted.
                        if value:
                            value = self[attr].objectify(value)
            else:
                value = dict_[attr]
                if value is colander.null:
                    # `colander.null` is never an appropriate
                    # value to be placed on an SQLAlchemy object
                    # so we translate it into `None`.
                    value = None
            setattr(context, attr, value)
        elif hasattr(context, attr):
            # Set any properties on the object which are not SQLAlchemy column based.
            # These are JSONBProperty like user_data and password on the user model
            # (actual column is called _password, but we mangle the password hash
            # before pushing it through)
            value = dict_[attr]
            setattr(context, attr, value)
        else:
            # Ignore attributes if they are not mapped
            logger.debug(
                'SQLAlchemySchemaNode.objectify: %s not found on '
                '%s. This property has been ignored.',
                attr, self
            )
            continue

    return context
def __init__(self, db, store, read=True, write=True, read_through_write=True, delete=True, create_db=False, schema=None, create_schema=True):
    """Set up the repo's database access and blob store.

    :param db: a database URL string (a ``string_type`` instance), or any
        other object, which is stored as ``self._session``.
    :param store: stored as ``self.blobstore``.
    :param read: forwarded to the parent initialiser.
    :param write: forwarded to the parent initialiser.
    :param read_through_write: forwarded to the parent initialiser.
    :param delete: forwarded to the parent initialiser.
    :param create_db: when true and ``db`` is a URL, call
        ``_create_db_if_needed`` and initialise the database.
    :param schema: schema name; only accepted together with a URL.
    :param create_schema: when true and ``schema`` is given, create the
        schema if ``information_schema.schemata`` does not list it.
    :raises ValueError: if ``schema`` is given while ``db`` is not a URL.
    """
    # ``upgrade_db`` is only ever assigned False in this method, so the
    # ``_db_upgrade()`` call further down is never reached from here.
    upgrade_db=False
    super(PostgresRepo, self).__init__(read=read, write=write, read_through_write=read_through_write, delete=delete)

    if not isinstance(db, string_type) and schema is not None:
        raise ValueError("You can only provide a schema with a DB url.")

    init_db = False
    if create_db and isinstance(db, string_type):
        _create_db_if_needed(db)
        init_db = True
        upgrade_db = False

    self._run = None
    if isinstance(db, string_type):
        if create_db:
            init_db = True
        # A URL gets its own engine and session factory.
        self._db_engine = _db_engine(db, schema)
        self._sessionmaker = sqlalchemy.orm.sessionmaker(bind=self._db_engine)
    else:
        # Anything else is stored directly as the session.
        self._session = db

    if create_schema and schema is not None:
        with self.session() as session:
            # Look the schema up in information_schema before creating it.
            q = sa.exists(sa.select([("schema_name")]).select_from(sa.text("information_schema.schemata"))
                          .where(sa.text("schema_name = :schema")
                                 .bindparams(schema=schema)))
            if not session.query(q).scalar():
                session.execute(CreateSchema(schema))
                session.commit()
                # NOTE(review): the nesting of the two assignments below was
                # reconstructed from a whitespace-mangled source; they are
                # assumed to belong to the "schema was just created" branch.
                init_db = True
                upgrade_db = False

    if init_db:
        self._db_init()

    if upgrade_db:
        self._db_upgrade()

    self.blobstore = store
def test_drop_with_complex_foreign_keys(self):
    """Dropping one column of a composite foreign key removes the constraint."""
    from sqlalchemy.schema import ForeignKeyConstraint
    from sqlalchemy.schema import UniqueConstraint

    self.table.drop()
    self.meta.clear()

    # NOTE(mriedem): DB2 does not currently support unique constraints
    # on nullable columns, so the columns that are used to create the
    # foreign keys here need to be non-nullable for testing with DB2
    # to work.

    # create FK's target
    target = Table(
        'tmp_ref', self.meta,
        Column('id', Integer, primary_key=True),
        Column('jd', Integer, nullable=False),
        UniqueConstraint('id', 'jd'),
    )
    if self.engine.has_table(target.name):
        target.drop()
    target.create()

    # add a table with a complex foreign key constraint
    composite_fk = ForeignKeyConstraint(['r1', 'r2'],
                                        [target.c.id, target.c.jd],
                                        name='test_fk')
    self.table = Table(
        self.table_name, self.meta,
        Column('id', Integer, primary_key=True),
        Column('r1', Integer, nullable=False),
        Column('r2', Integer, nullable=False),
        composite_fk,
    )
    self.table.create()

    # paranoid check
    self.assertEqual([['r1', 'r2']], self._actual_foreign_keys())

    # delete one
    # On mysql the named constraint is dropped explicitly before the column.
    if self.engine.name == 'mysql':
        named_fk = constraint.ForeignKeyConstraint(
            [self.table.c.r1, self.table.c.r2],
            [target.c.id, target.c.jd],
            name='test_fk')
        named_fk.drop()
    self.table.c.r2.drop()

    # check the constraint is gone, since part of it
    # is no longer there - if people hit this,
    # they may be confused, maybe we should raise an error
    # and insist that the constraint is deleted first, separately?
    self.assertEqual([], self._actual_foreign_keys())