We have extracted the following 50 code examples from open-source Python projects to demonstrate how to use sqlalchemy.types.String().
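Before the extracted examples, here is a minimal sketch of the most common use of String: declaring column types on a Table. The table and column names are invented for illustration, and the in-memory SQLite engine is only a convenient target.

from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine

metadata = MetaData()

# 'books' is a made-up example table. String(120) renders as VARCHAR(120) on
# most backends; some databases (e.g. MySQL) require a length for DDL.
books = Table(
    'books', metadata,
    Column('id', Integer, primary_key=True),
    Column('title', String(120), nullable=False),
    Column('isbn', String(17), unique=True),
)

engine = create_engine('sqlite://')  # in-memory database, only for this sketch
metadata.create_all(engine)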
def _render_server_default_for_compare(metadata_default,
                                       metadata_col, autogen_context):
    rendered = _user_defined_render(
        "server_default", metadata_default, autogen_context)
    if rendered is not False:
        return rendered

    if isinstance(metadata_default, sa_schema.DefaultClause):
        if isinstance(metadata_default.arg, compat.string_types):
            metadata_default = metadata_default.arg
        else:
            metadata_default = str(metadata_default.arg.compile(
                dialect=autogen_context.dialect))

    if isinstance(metadata_default, compat.string_types):
        if metadata_col.type._type_affinity is sqltypes.String:
            metadata_default = re.sub(r"^'|'$", "", metadata_default)
            return repr(metadata_default)
        else:
            return metadata_default
    else:
        return None
def url(self):
    return '{}/{}/{}'.format(osm_api_base, self.osm_type, self.osm_id)

# class ItemCandidateTag(Base):
#     __tablename__ = 'item_candidate_tag'
#     __table_args__ = (
#         ForeignKeyConstraint(['item_id', 'osm_id', 'osm_type'],
#                              [ItemCandidate.item_id,
#                               ItemCandidate.osm_id,
#                               ItemCandidate.osm_type]),
#     )
#
#     item_id = Column(Integer, primary_key=True)
#     osm_id = Column(BigInteger, primary_key=True)
#     osm_type = Column(osm_type_enum, primary_key=True)
#     k = Column(String, primary_key=True)
#     v = Column(String, primary_key=True)
#
#     item_candidate = relationship(ItemCandidate,
#                                   backref=backref('tag_table', lazy='dynamic'))
def compare_server_default(self, inspector_column,
                           metadata_column,
                           rendered_metadata_default,
                           rendered_inspector_default):
    # don't do defaults for SERIAL columns
    if metadata_column.primary_key and \
            metadata_column is metadata_column.table._autoincrement_column:
        return False

    conn_col_default = rendered_inspector_default

    if None in (conn_col_default, rendered_metadata_default):
        return conn_col_default != rendered_metadata_default

    if metadata_column.type._type_affinity is not sqltypes.String:
        rendered_metadata_default = re.sub(
            r"^'|'$", "", rendered_metadata_default)

    return not self.connection.scalar(
        "SELECT %s = %s" % (
            conn_col_default,
            rendered_metadata_default
        )
    )
def test_varchar_reflection(self):
    typ = self._type_round_trip(sql_types.String(52))[0]
    assert isinstance(typ, sql_types.String)
    eq_(typ.length, 52)
def _render_server_default_for_compare(metadata_default,
                                       metadata_col, autogen_context):
    rendered = _user_defined_render(
        "server_default", metadata_default, autogen_context)
    if rendered is not False:
        return rendered

    if isinstance(metadata_default, sa_schema.DefaultClause):
        if isinstance(metadata_default.arg, compat.string_types):
            metadata_default = metadata_default.arg
        else:
            metadata_default = str(metadata_default.arg.compile(
                dialect=autogen_context['dialect']))

    if isinstance(metadata_default, compat.string_types):
        if metadata_col.type._type_affinity is sqltypes.String:
            metadata_default = re.sub(r"^'|'$", "", metadata_default)
            return repr(metadata_default)
        else:
            return metadata_default
    else:
        return None
def test_querying_table(metadata):
    """Create an object for test table."""
    # When using pytest-xdist, we don't want concurrent table creations
    # across test processes so we assign a unique name for table based on
    # the current worker id.
    worker_id = os.environ.get('PYTEST_XDIST_WORKER', 'master')
    return Table(
        'test_querying_table_' + worker_id, metadata,
        Column('id', types.Integer, autoincrement=True, primary_key=True),
        Column('t_string', types.String(60)),
        Column('t_list', types.ARRAY(types.String(60))),
        Column('t_enum', types.Enum(MyEnum)),
        Column('t_int_enum', types.Enum(MyIntEnum)),
        Column('t_datetime', types.DateTime()),
        Column('t_date', types.DateTime()),
        Column('t_interval', types.Interval()),
        Column('uniq_uuid', PG_UUID, nullable=False, unique=True, default=uuid4),
    )
def __init__(self, type_in=None, key=None, engine=None, **kwargs):
    """Initialization."""
    if not cryptography:
        raise ImproperlyConfigured(
            "'cryptography' is required to use EncryptedType"
        )
    super(EncryptedType, self).__init__(**kwargs)
    # set the underlying type
    if type_in is None:
        type_in = String()
    elif isinstance(type_in, type):
        type_in = type_in()
    self.underlying_type = type_in
    self._key = key
    if not engine:
        engine = AesEngine
    self.engine = engine()
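The constructor above defaults the underlying type to String() when no type is passed. A short usage sketch, assuming the sqlalchemy_utils package and a placeholder secret_key (neither appears in the snippet above):

from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy_utils import EncryptedType

secret_key = 'not-a-real-key'  # placeholder; use proper key management

metadata = MetaData()
users = Table(
    'users', metadata,
    Column('id', Integer, primary_key=True),
    # values are encrypted at rest but are declared like a String(100) column
    Column('ssn', EncryptedType(String(100), secret_key)),
    # omitting type_in falls back to a plain String(), as in __init__ above
    Column('note', EncryptedType(key=secret_key)),
)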
def bind_processor(self, dialect):
    if dialect._cx_oracle_with_unicode:
        def process(value):
            if value is None:
                return value
            else:
                return unicode(value)
        return process
    else:
        return super(
            _NativeUnicodeMixin, self).bind_processor(dialect)

# we apply a connection output handler that returns
# unicode in all cases, so the "native_unicode" flag
# will be set for the default String.result_processor.
def define_tables(cls, metadata):
    Table('test_table', metadata,
          Column('id', Integer, primary_key=True),
          Column('data', String(50))
          )
def assert_tables_equal(self, table, reflected_table, strict_types=False):
    assert len(table.c) == len(reflected_table.c)
    for c, reflected_c in zip(table.c, reflected_table.c):
        eq_(c.name, reflected_c.name)
        assert reflected_c is reflected_table.c[c.name]
        eq_(c.primary_key, reflected_c.primary_key)
        eq_(c.nullable, reflected_c.nullable)
        if strict_types:
            msg = "Type '%s' doesn't correspond to type '%s'"
            assert isinstance(reflected_c.type, type(c.type)), \
                msg % (reflected_c.type, c.type)
        else:
            self.assert_types_base(reflected_c, c)
        if isinstance(c.type, sqltypes.String):
            eq_(c.type.length, reflected_c.type.length)
        eq_(
            set([f.column.name for f in c.foreign_keys]),
            set([f.column.name for f in reflected_c.foreign_keys])
        )
        if c.server_default:
            assert isinstance(reflected_c.server_default,
                              schema.FetchedValue)

    assert len(table.primary_key) == len(reflected_table.primary_key)
    for c in table.primary_key:
        assert reflected_table.primary_key.columns[c.name] is not None
def with_variant(self, type_, dialect_name):
    """Produce a new type object that will utilize the given
    type when applied to the dialect of the given name.

    e.g.::

        from sqlalchemy.types import String
        from sqlalchemy.dialects import mysql

        s = String()

        s = s.with_variant(mysql.VARCHAR(collation='foo'), 'mysql')

    The construction of :meth:`.TypeEngine.with_variant` is always
    from the "fallback" type to that which is dialect specific.
    The returned type is an instance of :class:`.Variant`, which
    itself provides a :meth:`.Variant.with_variant`
    that can be called repeatedly.

    :param type_: a :class:`.TypeEngine` that will be selected
     as a variant from the originating type, when a dialect
     of the given name is in use.

    :param dialect_name: base name of the dialect which uses
     this type. (i.e. ``'postgresql'``, ``'mysql'``, etc.)

    .. versionadded:: 0.7.2

    """
    return Variant(self, {dialect_name: to_instance(type_)})
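As a follow-up to the docstring example, this small sketch (table name invented) shows how the variant is resolved when DDL is compiled for different dialects:

from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import mysql
from sqlalchemy.schema import CreateTable

# Fallback type is String(50); the MySQL dialect gets a collated VARCHAR instead.
name_type = String(50).with_variant(
    mysql.VARCHAR(50, collation='utf8_bin'), 'mysql')

metadata = MetaData()
items = Table(
    'items', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', name_type),
)

print(CreateTable(items))                                   # generic VARCHAR(50)
print(CreateTable(items).compile(dialect=mysql.dialect()))  # MySQL variant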
def binary_operator_string(self, binary):
    if isinstance(binary.type, sqltypes.String) and binary.operator == '+':
        return '||'
    else:
        return ansisql.ANSICompiler.binary_operator_string(self, binary)
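The compiler hook above rewrites + into the SQL concatenation operator || for String-typed expressions. From the user's side, the same behavior can be observed with an ordinary expression; the table below is made up, and the select([...]) call form is the pre-2.0 style used throughout these examples:

from sqlalchemy import Column, Integer, MetaData, String, Table, select

metadata = MetaData()
# hypothetical table, only to build the expression
users = Table(
    'users', metadata,
    Column('id', Integer, primary_key=True),
    Column('first', String(30)),
    Column('last', String(30)),
)

# '+' between String-typed columns compiles to SQL concatenation; the default
# dialect renders it as '||' (MySQL would instead emit concat()).
stmt = select([users.c.first + ' ' + users.c.last])
print(stmt)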
def _test_get_unique_constraints(self, schema=None):
    uniques = sorted(
        [
            {'name': 'unique_a', 'column_names': ['a']},
            {'name': 'unique_a_b_c', 'column_names': ['a', 'b', 'c']},
            {'name': 'unique_c_a_b', 'column_names': ['c', 'a', 'b']},
            {'name': 'unique_asc_key', 'column_names': ['asc', 'key']},
        ],
        key=operator.itemgetter('name')
    )
    orig_meta = self.metadata
    table = Table(
        'testtbl', orig_meta,
        Column('a', sa.String(20)),
        Column('b', sa.String(30)),
        Column('c', sa.Integer),
        # reserved identifiers
        Column('asc', sa.String(30)),
        Column('key', sa.String(30)),
        schema=schema
    )
    for uc in uniques:
        table.append_constraint(
            sa.UniqueConstraint(*uc['column_names'], name=uc['name'])
        )
    orig_meta.create_all()

    inspector = inspect(orig_meta.bind)
    reflected = sorted(
        inspector.get_unique_constraints('testtbl', schema=schema),
        key=operator.itemgetter('name')
    )

    for orig, refl in zip(uniques, reflected):
        eq_(orig, refl)
def with_variant(self, type_, dialect_name):
    """Produce a new type object that will utilize the given
    type when applied to the dialect of the given name.

    e.g.::

        from sqlalchemy.types import String
        from sqlalchemy.dialects import mysql

        s = String()

        s = s.with_variant(mysql.VARCHAR(collation='foo'), 'mysql')

    The construction of :meth:`.TypeEngine.with_variant` is always
    from the "fallback" type to that which is dialect specific.
    The returned type is an instance of :class:`.Variant`, which
    itself provides a :meth:`~sqlalchemy.types.Variant.with_variant`
    that can be called repeatedly.

    :param type_: a :class:`.TypeEngine` that will be selected
     as a variant from the originating type, when a dialect
     of the given name is in use.

    :param dialect_name: base name of the dialect which uses
     this type. (i.e. ``'postgresql'``, ``'mysql'``, etc.)

    .. versionadded:: 0.7.2

    """
    return Variant(self, {dialect_name: to_instance(type_)})
def bind_processor(self, dialect):
    if dialect._cx_oracle_with_unicode:
        def process(value):
            if value is None:
                return value
            else:
                return unicode(value)
        return process
    else:
        return super(_NativeUnicodeMixin, self).bind_processor(dialect)
# end Py2K

# we apply a connection output handler that returns
# unicode in all cases, so the "native_unicode" flag
# will be set for the default String.result_processor.
def assert_tables_equal(self, table, reflected_table, strict_types=False):
    assert len(table.c) == len(reflected_table.c)
    for c, reflected_c in zip(table.c, reflected_table.c):
        eq_(c.name, reflected_c.name)
        assert reflected_c is reflected_table.c[c.name]
        eq_(c.primary_key, reflected_c.primary_key)
        eq_(c.nullable, reflected_c.nullable)
        if strict_types:
            msg = "Type '%s' doesn't correspond to type '%s'"
            assert type(reflected_c.type) is type(c.type), \
                msg % (reflected_c.type, c.type)
        else:
            self.assert_types_base(reflected_c, c)
        if isinstance(c.type, sqltypes.String):
            eq_(c.type.length, reflected_c.type.length)
        eq_(
            set([f.column.name for f in c.foreign_keys]),
            set([f.column.name for f in reflected_c.foreign_keys])
        )
        if c.server_default:
            assert isinstance(reflected_c.server_default,
                              schema.FetchedValue)

    assert len(table.primary_key) == len(reflected_table.primary_key)
    for c in table.primary_key:
        assert reflected_table.primary_key.columns[c.name] is not None
def __init__(self, table):
    super(Model, self).__init__()
    self.table = table
    self.schema = table.schema

    # Adapt column types to the most reasonable generic types (ie. VARCHAR -> String)
    for column in table.columns:
        cls = column.type.__class__
        for supercls in cls.__mro__:
            if hasattr(supercls, '__visit_name__'):
                cls = supercls
            if supercls.__name__ != supercls.__name__.upper() \
                    and not supercls.__name__.startswith('_'):
                break

        column.type = column.type.adapt(cls)
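The loop above walks the MRO of each column's type looking for the most generic class that still defines __visit_name__, then calls adapt() on it. A minimal standalone sketch of that adapt() step, using a made-up MySQL VARCHAR:

from sqlalchemy.dialects.mysql import VARCHAR
from sqlalchemy.types import String

# A dialect-specific type, as table reflection might produce it.
reflected_type = VARCHAR(length=80, charset='utf8')

# adapt() constructs an equivalent instance of the requested (more generic)
# class, carrying over shared constructor arguments such as length.
generic_type = reflected_type.adapt(String)

print(type(generic_type).__name__, generic_type.length)  # expected: String 80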