The following 50 code examples, extracted from Python open source projects, illustrate how to use sqlalchemy.types.VARCHAR.
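As a quick orientation before the extracted examples, here is a minimal, self-contained sketch of declaring a VARCHAR column directly with sqlalchemy.types.VARCHAR (the table and column names are hypothetical, not taken from any of the projects below):

from sqlalchemy import Column, MetaData, Table
from sqlalchemy.types import INTEGER, VARCHAR

metadata = MetaData()

# 'users' and its columns are illustrative only; VARCHAR(50) bounds the string at 50 characters.
users = Table(
    'users', metadata,
    Column('id', INTEGER, primary_key=True),
    Column('name', VARCHAR(50), nullable=False)
)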
def test_enum_detection(self):
    Table(
        'simple_items', self.metadata,
        Column('enum', VARCHAR(255)),
        CheckConstraint(r"simple_items.enum IN ('A', '\'B', 'C')")
    )
    assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import Column, Enum, MetaData, Table

metadata = MetaData()


t_simple_items = Table(
    'simple_items', metadata,
    Column('enum', Enum('A', "\\\\'B", 'C'))
)
"""
def test_mysql_column_types(self):
    Table(
        'simple_items', self.metadata,
        Column('id', mysql.INTEGER),
        Column('name', mysql.VARCHAR(255))
    )
    assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import Column, Integer, MetaData, String, Table

metadata = MetaData()


t_simple_items = Table(
    'simple_items', metadata,
    Column('id', Integer),
    Column('name', String(255))
)
"""
def test_foreign_key_options(self):
    Table(
        'simple_items', self.metadata,
        Column('name', VARCHAR, ForeignKey(
            'simple_items.name', ondelete='CASCADE', onupdate='CASCADE',
            deferrable=True, initially='DEFERRED'))
    )
    assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import Column, ForeignKey, MetaData, String, Table

metadata = MetaData()


t_simple_items = Table(
    'simple_items', metadata,
    Column('name', String, ForeignKey('simple_items.name', ondelete='CASCADE', onupdate='CASCADE', \
deferrable=True, initially='DEFERRED'))
)
"""
def load_dialect_impl(self, dialect: dialects) -> DialectType:
    """
    SQLAlchemy wraps all database-specific features into dialects, which are
    then responsible for generating the SQL code for a specific DB type when
    loading in data. ``load_dialect_impl`` is called when CRUD (create, read,
    update, and delete) operations need to be done on the database. This
    method is responsible for telling SQLAlchemy how to configure the dialect
    to write this type.

    :param dialect: The loaded dialect
    :return: The type descriptor for this type.
    """
    if dialect.name == 'postgresql':
        return dialect.type_descriptor(postgresql.JSON())
    elif dialect.name == 'mysql':
        if 'JSON' in dialect.ischema_names:
            return dialect.type_descriptor(mysql.JSON())
        else:
            return dialect.type_descriptor(
                VARCHAR(self._MAX_VARCHAR_LIMIT)
            )
    else:
        return dialect.type_descriptor(VARCHAR(self._MAX_VARCHAR_LIMIT))
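The method above is typically defined on a sqlalchemy.types.TypeDecorator subclass. A minimal sketch of where it fits and how such a type might be used in a table definition, assuming a hypothetical class name and fallback length (neither comes from the source project):

from sqlalchemy import Column, Integer, MetaData, Table
from sqlalchemy.types import TypeDecorator, VARCHAR

class JSONFallbackType(TypeDecorator):
    # Hypothetical wrapper type: stores JSON natively where the dialect
    # supports it, otherwise falls back to a bounded VARCHAR column.
    impl = VARCHAR
    _MAX_VARCHAR_LIMIT = 16000  # assumed fallback length, not from the source

    # load_dialect_impl as shown above would be defined here.

metadata = MetaData()
documents = Table(
    'documents', metadata,
    Column('id', Integer, primary_key=True),
    Column('payload', JSONFallbackType())
)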
def str_to_sqltype(expr):
    import re
    import sqlalchemy.types as sqltypes

    norm_expr = expr.lower()
    if norm_expr.startswith('integer'):
        match_result = re.match(r'integer\((\d+)\)', norm_expr)
        if match_result is not None:
            return sqltypes.BIGINT() if int(match_result.group(1)) > 11 else sqltypes.INTEGER()
        return sqltypes.BIGINT()
    if norm_expr == 'decimal':
        return sqltypes.DECIMAL()
    if norm_expr == 'date':
        return sqltypes.DATETIME()
    if norm_expr == 'bool' or norm_expr == 'boolean':
        return sqltypes.BOOLEAN()
    if norm_expr.startswith('string'):
        match_result = re.match(r'string\((\d+)\)', norm_expr)
        if match_result is not None:
            maxlen = int(match_result.group(1))
            return sqltypes.VARCHAR(maxlen) if maxlen < 65536 else sqltypes.TEXT()
        return sqltypes.TEXT()
    raise RuntimeError("Unsupported data type [" + expr + "]")
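A quick, illustrative check of the parser above (the inputs are hypothetical; the expectations simply follow the branches as written):

import sqlalchemy.types as sqltypes

assert isinstance(str_to_sqltype('string(255)'), sqltypes.VARCHAR)   # bounded string -> VARCHAR
assert str_to_sqltype('string(255)').length == 255
assert isinstance(str_to_sqltype('integer(12)'), sqltypes.BIGINT)    # width > 11 promotes to BIGINT
assert isinstance(str_to_sqltype('date'), sqltypes.DATETIME)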
def __init__(self, length=None, **kwargs):
    super(VARCHAR, self).__init__(length=length, **kwargs)
def visit_VARCHAR(self, type_, **kw):
    if not type_.length:
        raise exc.CompileError(
            "VARCHAR requires a length on dialect %s"
            % self.dialect.name)
    basic = super(FBTypeCompiler, self).visit_VARCHAR(type_, **kw)
    return self._extend_string(type_, basic)
def visit_VARCHAR(self, type_):
    if not type_.length:
        raise exc.CompileError(
            "VARCHAR requires a length on dialect %s"
            % self.dialect.name)
    basic = super(FBTypeCompiler, self).visit_VARCHAR(type_)
    return self._extend_string(type_, basic)
def __init__(self, length=None, **kwargs):
    """Construct a VARCHAR.

    :param collation: Optional, a column-level collation for this string
      value. Takes precedence to 'binary' short-hand.

    :param binary: Defaults to False: short-hand, pick the binary
      collation type that matches the column's character set. Generates
      BINARY in schema. This does not affect the type of data stored,
      only the collation of character data.

    """
    super(VARCHAR, self).__init__(length=length, **kwargs)
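The constructor above belongs to the MySQL-specific VARCHAR variant. A brief, hypothetical usage of the collation and binary options it documents (the table and column names are made up):

from sqlalchemy import Column, MetaData, Table
from sqlalchemy.dialects import mysql

metadata = MetaData()

# 'tags' is illustrative; the first column pins an explicit collation,
# the second requests the character set's binary collation instead.
tags = Table(
    'tags', metadata,
    Column('label', mysql.VARCHAR(64, collation='utf8mb4_bin')),
    Column('token', mysql.VARCHAR(64, binary=True))
)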
def _get_column_info(self, name, type_, nullable, default, primary_key):
    match = re.match(r'(\w+)(\(.*?\))?', type_)
    if match:
        coltype = match.group(1)
        args = match.group(2)
    else:
        coltype = "VARCHAR"
        args = ''

    try:
        coltype = self.ischema_names[coltype]
        if args is not None:
            args = re.findall(r'(\d+)', args)
            coltype = coltype(*[int(a) for a in args])
    except KeyError:
        util.warn("Did not recognize type '%s' of column '%s'" %
                  (coltype, name))
        coltype = sqltypes.NullType()

    if default is not None:
        default = unicode(default)

    return {
        'name': name,
        'type': coltype,
        'nullable': nullable,
        'default': default,
        'autoincrement': default is None,
        'primary_key': primary_key
    }
def test_indexes_class(self):
    simple_items = Table(
        'simple_items', self.metadata,
        Column('id', INTEGER, primary_key=True),
        Column('number', INTEGER),
        Column('text', VARCHAR)
    )
    simple_items.indexes.add(Index('idx_number', simple_items.c.number))
    simple_items.indexes.add(Index('idx_text_number', simple_items.c.text,
                                   simple_items.c.number))
    simple_items.indexes.add(Index('idx_text', simple_items.c.text, unique=True))
    assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import Column, Index, Integer, String
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()
metadata = Base.metadata


class SimpleItem(Base):
    __tablename__ = 'simple_items'
    __table_args__ = (
        Index('idx_text_number', 'text', 'number'),
    )

    id = Column(Integer, primary_key=True)
    number = Column(Integer, index=True)
    text = Column(String, unique=True)
"""
def sqltype_to_stdtype(sqltype):
    import sqlalchemy.types as sqltypes

    if isinstance(sqltype, (sqltypes.VARCHAR, sqltypes.CHAR, sqltypes.TEXT, sqltypes.Enum, sqltypes.String)):
        return _STRING_TYPE
    if isinstance(sqltype, (sqltypes.DATETIME, sqltypes.DATE, sqltypes.TIME, sqltypes.TIMESTAMP)):
        return _DATE_TYPE
    if isinstance(sqltype, (sqltypes.INTEGER, sqltypes.BIGINT, sqltypes.SMALLINT, sqltypes.Integer)):
        return _INTEGER_TYPE
    if isinstance(sqltype, (sqltypes.REAL, sqltypes.DECIMAL, sqltypes.NUMERIC, sqltypes.FLOAT)):
        return _DECIMAL_TYPE
    if isinstance(sqltype, sqltypes.BOOLEAN):
        return _BOOLEAN_TYPE
def stdtype_to_sqltype(stdtype):
    import sqlalchemy.types as sqltypes

    if isinstance(stdtype, stdtypes.StringType):
        return sqltypes.VARCHAR(length=stdtype.max_len) if 0 < stdtype.max_len < 65536 else sqltypes.TEXT()
    if isinstance(stdtype, stdtypes.BoolType):
        return sqltypes.BOOLEAN()
    if isinstance(stdtype, stdtypes.DateType):
        return sqltypes.DATE() if stdtype.only_date else sqltypes.TIMESTAMP()
    if isinstance(stdtype, stdtypes.IntegerType):
        return sqltypes.BIGINT() if stdtype.length > 11 else sqltypes.INTEGER()
    if isinstance(stdtype, stdtypes.DecimalType):
        return sqltypes.DECIMAL()
    if isinstance(stdtype, stdtypes.ArrayType):
        return sqltypes.ARRAY(item_type=stdtype.item_type)
def tick_insert(data):
    logging.debug("tick_data_rows: %d", len(data))
    dtypes = {
        k: VARCHAR(32) for k, v in data.dtypes.items() if v.name == 'object'}
    dtypes['time'] = DATETIME
    with db_buffer.connect() as conn:
        data.to_sql('tick_data', conn, if_exists="append", index=False,
                    dtype=dtypes, chunksize=None)
def history_insert(data):
    logging.debug("history_data_rows: %d", len(data))
    dtypes = {
        k: VARCHAR(32) for k, v in data.dtypes.items() if v.name == 'object'}
    with db_buffer.connect() as conn:
        data.to_sql('history', conn, index=False, if_exists="append",
                    dtype=dtypes, chunksize=None)
def history_index_insert(data):
    logging.debug("history_index_data_rows: %d", len(data))
    dtypes = {
        k: VARCHAR(32) for k, v in data.dtypes.items() if v.name == 'object'}
    with db_buffer.connect() as conn:
        data.to_sql('history_index', conn, index=False, if_exists="append",
                    dtype=dtypes, chunksize=None)
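The three helpers above share one pattern: every object-dtype column of the DataFrame is written as VARCHAR(32) instead of the TEXT default. A self-contained, illustrative version of that pattern against an in-memory SQLite engine (the DataFrame contents and table name are made up):

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.types import VARCHAR

engine = create_engine('sqlite://')  # in-memory database, for demonstration only
df = pd.DataFrame({'code': ['600000', '000001'], 'close': [10.2, 13.5]})

# Map every object-dtype column to a bounded VARCHAR.
dtypes = {k: VARCHAR(32) for k, v in df.dtypes.items() if v.name == 'object'}

df.to_sql('history', engine, if_exists='append', index=False, dtype=dtypes)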
def fetch_stock_basics(conn):
    logging.debug("Fetch stocks")
    df = ts.get_stock_basics()
    df['timeToMarket'] = df['timeToMarket'].map(
        lambda s: datetime.strptime(str(s), '%Y%m%d') if s > 0 else None)
    df.to_sql('stock_basics', conn, if_exists="replace", dtype={"code": VARCHAR(32)})
def fetch_index_list(conn):
    logging.debug("Fetch indices")
    df = ts.get_index()[['code', 'name']]
    df.to_sql('stock_index', conn, if_exists="replace", dtype={"code": VARCHAR(32)})