The following code examples, extracted from open-source Python projects, demonstrate how to use sqlalchemy.types.to_instance().
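Before the examples, a minimal sketch of the behavior they all rely on: to_instance() accepts either a TypeEngine subclass or an instance and always returns an instance, which is why the constructors below can accept both forms.

from sqlalchemy.types import String, to_instance

# to_instance() accepts either a TypeEngine subclass or an instance and
# always returns an instance: a class is instantiated with defaults, an
# instance passes through unchanged (None maps to the NULL type).
assert isinstance(to_instance(String), String)   # class -> String()
assert to_instance(String(10)).length == 10      # instance -> unchanged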
def __init__(self, name, column_name, schema=None, newname=None, type_=None, nullable=None, default=False, autoincrement=None): super(AlterColumn, self).__init__(name, schema=schema) self.column_name = column_name self.nullable = nullable self.newname = newname self.default = default self.autoincrement = autoincrement if type_ is None: raise util.CommandError( "All MySQL CHANGE/MODIFY COLUMN operations " "require the existing type." ) self.type_ = sqltypes.to_instance(type_)
def __init__(self, name, column_name, schema=None, newname=None, type_=None, nullable=None, default=False, autoincrement=None): super(AlterColumn, self).__init__(name, schema=schema) self.column_name = column_name self.nullable = nullable self.newname = newname self.default = default self.autoincrement = autoincrement if type_ is None: raise util.CommandError( "All MySQL ALTER COLUMN operations " "require the existing type." ) self.type_ = sqltypes.to_instance(type_)
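Both constructors above guard Alembic's MySQL support: CHANGE/MODIFY COLUMN statements must restate the full column definition, so the existing type is mandatory. A hypothetical migration step that satisfies the requirement (table and column names are assumptions):

import sqlalchemy as sa
from alembic import op

# Hypothetical migration: on MySQL, alter_column() needs existing_type
# so the emitted CHANGE/MODIFY COLUMN statement can restate the
# column's current definition.
op.alter_column(
    "accounts",                           # assumed table name
    "email",                              # assumed column name
    existing_type=sa.String(length=120),
    nullable=False,
)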
def __init__(self, name, column_name, schema=None,
             existing_type=None,
             existing_nullable=None,
             existing_server_default=None):
    super(AlterColumn, self).__init__(name, schema=schema)
    self.column_name = column_name
    self.existing_type = sqltypes.to_instance(existing_type) \
        if existing_type is not None else None
    self.existing_nullable = existing_nullable
    self.existing_server_default = existing_server_default
def __init__(self, name, column_name, type_, **kw):
    super(ColumnType, self).__init__(name, column_name, **kw)
    self.type_ = sqltypes.to_instance(type_)
def alter_column(self, table_name, column_name, nullable=None, server_default=False, name=None, type_=None, autoincrement=None, **kw ): existing = self.columns[column_name] existing_transfer = self.column_transfers[column_name] if name is not None and name != column_name: # note that we don't change '.key' - we keep referring # to the renamed column by its old key in _create(). neat! existing.name = name existing_transfer["name"] = name if type_ is not None: type_ = sqltypes.to_instance(type_) # old type is being discarded so turn off eventing # rules. Alternatively we can # erase the events set up by this type, but this is simpler. # we also ignore the drop_constraint that will come here from # Operations.implementation_for(alter_column) if isinstance(existing.type, SchemaEventTarget): existing.type._create_events = \ existing.type.create_constraint = False existing.type = type_ # we *dont* however set events for the new type, because # alter_column is invoked from # Operations.implementation_for(alter_column) which already # will emit an add_constraint() existing_transfer["expr"] = cast(existing_transfer["expr"], type_) if nullable is not None: existing.nullable = nullable if server_default is not False: sql_schema.DefaultClause(server_default)._set_parent(existing) if autoincrement is not None: existing.autoincrement = bool(autoincrement)
def __init__(self, name, column_name, type_, **kw):
    using = kw.pop('using', None)
    super(PostgresqlColumnType, self).__init__(name, column_name, **kw)
    self.type_ = sqltypes.to_instance(type_)
    self.using = using
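The using value captured here corresponds to PostgreSQL's ALTER COLUMN ... TYPE ... USING clause, which Alembic exposes as the postgresql_using keyword. A hypothetical migration call (table, column, and cast expression are assumptions):

import sqlalchemy as sa
from alembic import op

# Hypothetical migration: convert a text column to an integer.
# PostgreSQL cannot cast implicitly here, so a USING expression is
# supplied; it becomes the `using` attribute captured above.
op.alter_column(
    "events",                              # assumed table name
    "attendee_count",                      # assumed column name
    type_=sa.Integer(),
    postgresql_using="attendee_count::integer",
)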
def alter_column(self, table_name, column_name, nullable=None, server_default=False, name=None, type_=None, autoincrement=None, **kw ): existing = self.columns[column_name] existing_transfer = self.column_transfers[column_name] if name is not None and name != column_name: # note that we don't change '.key' - we keep referring # to the renamed column by its old key in _create(). neat! existing.name = name existing_transfer["name"] = name if type_ is not None: type_ = sqltypes.to_instance(type_) existing.type = type_ existing_transfer["expr"] = cast(existing_transfer["expr"], type_) if nullable is not None: existing.nullable = nullable if server_default is not False: existing.server_default = server_default if autoincrement is not None: existing.autoincrement = bool(autoincrement)
def as_mutable(cls, orig_sqltype):
    """Mark the value as nested mutable value.

    What happens here

    * We coerce the return value - the type value set on
      sqlalchemy.Column() - to the underlying SQL type

    * We mark this type value with a marker attribute

    * Then we set a global SQLAlchemy mapper event handler

    * When the mapper is done setting up our model classes, it will
      call the event handler for all models

    * We check if any of the model's columns have our marked type
      value as the value

    * If so we call ``associate_with_attribute`` for this model and
      column, which sets up ``MutableBase._listen_on_attribute`` event
      handlers. These event handlers take care of taking the raw dict
      coming out of the database and wrapping it into a
      NestedMutableDict.

    :param orig_sqltype: Usually websauna.system.model.column.JSONB instance

    :return: Marked and coerced type value
    """
    # Create an instance of this type and add a marker attribute,
    # so we can find it later.
    # We cannot directly compare the resulting type values, as it looks
    # like the type value might be mangled by dialect-specific
    # implementations or lost somewhere. Never figured this out 100%.
    sqltype = types.to_instance(orig_sqltype)
    sqltype._column_value_id = id(sqltype)

    def listen_for_type(mapper, class_):
        for prop in mapper.column_attrs:
            # The original implementation uses a SQLAlchemy type
            # comparator. Here we need to be a little more complex,
            # because we define a type alias for the generic JSONB
            # implementation.
            if getattr(prop.columns[0].type, "_column_value_id", None) == sqltype._column_value_id:
                cls.associate_with_attribute(getattr(class_, prop.key))

    event.listen(mapper, 'mapper_configured', listen_for_type)

    return sqltype
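This mirrors SQLAlchemy's stock sqlalchemy.ext.mutable.MutableDict.as_mutable(); the custom class above is invoked the same way but additionally tracks nested dicts. A runnable sketch using the stock class (model and column names are illustrative):

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.mutable import MutableDict

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = sa.Column(sa.Integer, primary_key=True)
    # Wrapping the column type through as_mutable() means in-place
    # edits such as user.preferences["theme"] = "dark" are detected
    # as changes and flushed; the class above plays the same role
    # for nested structures.
    preferences = sa.Column(MutableDict.as_mutable(JSONB()))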
def alter_column(self, table_name, column_name, nullable=None, server_default=False, name=None, type_=None, autoincrement=None, **kw ): existing = self.columns[column_name] existing_transfer = self.column_transfers[column_name] if name is not None and name != column_name: # note that we don't change '.key' - we keep referring # to the renamed column by its old key in _create(). neat! existing.name = name existing_transfer["name"] = name if type_ is not None: type_ = sqltypes.to_instance(type_) # old type is being discarded so turn off eventing # rules. Alternatively we can # erase the events set up by this type, but this is simpler. # we also ignore the drop_constraint that will come here from # Operations.implementation_for(alter_column) if isinstance(existing.type, SchemaEventTarget): existing.type._create_events = \ existing.type.create_constraint = False existing.type = type_ # we *dont* however set events for the new type, because # alter_column is invoked from # Operations.implementation_for(alter_column) which already # will emit an add_constraint() existing_transfer["expr"] = cast(existing_transfer["expr"], type_) if nullable is not None: existing.nullable = nullable if server_default is not False: if server_default is None: existing.server_default = None else: sql_schema.DefaultClause(server_default)._set_parent(existing) if autoincrement is not None: existing.autoincrement = bool(autoincrement)
def alter_column(self, table_name, column_name, nullable=None, server_default=False, name=None, type_=None, autoincrement=None, **kw ): existing = self.columns[column_name] existing_transfer = self.column_transfers[column_name] if name is not None and name != column_name: # note that we don't change '.key' - we keep referring # to the renamed column by its old key in _create(). neat! existing.name = name existing_transfer["name"] = name if type_ is not None: type_ = sqltypes.to_instance(type_) # old type is being discarded so turn off eventing # rules. Alternatively we can # erase the events set up by this type, but this is simpler. # we also ignore the drop_constraint that will come here from # Operations.implementation_for(alter_column) if isinstance(existing.type, SchemaEventTarget): existing.type._create_events = \ existing.type.create_constraint = False if existing.type._type_affinity is not type_._type_affinity: existing_transfer["expr"] = cast( existing_transfer["expr"], type_) existing.type = type_ # we *dont* however set events for the new type, because # alter_column is invoked from # Operations.implementation_for(alter_column) which already # will emit an add_constraint() if nullable is not None: existing.nullable = nullable if server_default is not False: if server_default is None: existing.server_default = None else: sql_schema.DefaultClause(server_default)._set_parent(existing) if autoincrement is not None: existing.autoincrement = bool(autoincrement)
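The alter_column() variants above belong to Alembic's batch ("move and copy") path, used where in-place ALTER support is limited, most notably on SQLite. A hypothetical batch migration exercising the type-change branch (table and column names are assumptions):

import sqlalchemy as sa
from alembic import op

# Hypothetical batch migration: batch_alter_table recreates the table
# and copies rows over; alter_column() records the new type and, when
# the type affinity actually changes, applies a CAST to the copy
# expression as shown above.
with op.batch_alter_table("users") as batch_op:     # assumed table name
    batch_op.alter_column(
        "age",                                      # assumed column name
        existing_type=sa.String(),
        type_=sa.Integer(),
    )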
def to_sql(self, frame, name, if_exists='fail', index=True,
           index_label=None, schema=None, chunksize=None, dtype=None):
    """
    Write records stored in a DataFrame to a SQL database.

    Parameters
    ----------
    frame : DataFrame
    name : string
        Name of SQL table
    if_exists : {'fail', 'replace', 'append'}, default 'fail'
        - fail: If table exists, do nothing.
        - replace: If table exists, drop it, recreate it, and insert data.
        - append: If table exists, insert data. Create if does not exist.
    index : boolean, default True
        Write DataFrame index as a column
    index_label : string or sequence, default None
        Column label for index column(s). If None is given (default) and
        `index` is True, then the index names are used.
        A sequence should be given if the DataFrame uses MultiIndex.
    schema : string, default None
        Name of SQL schema in database to write to (if database flavor
        supports this). If specified, this overwrites the default
        schema of the SQLDatabase object.
    chunksize : int, default None
        If not None, then rows will be written in batches of this size at a
        time. If None, all rows will be written at once.
    dtype : dict of column name to SQL type, default None
        Optional specifying the datatype for columns. The SQL type should
        be a SQLAlchemy type.
    """
    if dtype is not None:
        from sqlalchemy.types import to_instance, TypeEngine
        for col, my_type in dtype.items():
            if not isinstance(to_instance(my_type), TypeEngine):
                raise ValueError('The type of %s is not a SQLAlchemy '
                                 'type ' % col)

    table = SQLTable(name, self, frame=frame, index=index,
                     if_exists=if_exists, index_label=index_label,
                     schema=schema, dtype=dtype)
    table.create()
    table.insert(chunksize)
    # check for potentially case sensitivity issues (GH7815)
    engine = self.connectable.engine
    with self.connectable.connect() as conn:
        table_names = engine.table_names(
            schema=schema or self.meta.schema,
            connection=conn,
        )
    if name not in table_names:
        warnings.warn("The provided table name '{0}' is not found exactly "
                      "as such in the database after writing the table, "
                      "possibly due to case sensitivity issues. Consider "
                      "using lower case table names.".format(name),
                      UserWarning)
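At the call site, to_instance() is what lets the dtype mapping accept type classes and type instances interchangeably. A short sketch against the public pandas API (engine URL and names are assumptions):

import pandas as pd
import sqlalchemy as sa

engine = sa.create_engine("sqlite://")      # assumed in-memory database
df = pd.DataFrame({"code": ["a1", "b2"], "qty": [3, 7]})

# Both forms pass the isinstance(to_instance(...), TypeEngine) check
# above: a type instance (String(10)) and a bare type class (Integer).
df.to_sql("items", engine, index=False,
          dtype={"code": sa.String(10), "qty": sa.Integer})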