Python sqlalchemy.types 模块,String() 实例源码


项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _render_server_default_for_compare(metadata_default,
                                       metadata_col, autogen_context):
    """Render a metadata server default as a string suitable for comparison.

    A user-defined renderer is consulted first through
    ``_user_defined_render``; any result other than ``False`` is returned
    unchanged.

    :param metadata_default: the server default from the metadata column;
     a ``DefaultClause``, a plain string, or anything else.
    :param metadata_col: the column owning the default; its type affinity
     decides whether the value is re-quoted with ``repr()``.
    :param autogen_context: autogenerate context, forwarded to the user
     renderer and used to find the dialect for compiling SQL expressions.
    :return: the rendered default, or ``None`` when the default cannot be
     reduced to a string.
    """
    rendered = _user_defined_render(
        "server_default", metadata_default, autogen_context)
    if rendered is not False:
        return rendered

    if isinstance(metadata_default, sa_schema.DefaultClause):
        if isinstance(metadata_default.arg, compat.string_types):
            metadata_default = metadata_default.arg
        else:
            # NOTE(review): the compile() arguments were lost from this
            # snippet.  Alembic exposes the dialect either as an attribute
            # or as a mapping key depending on release, so accept both --
            # confirm against the vendored Alembic version.
            try:
                dialect = autogen_context.dialect
            except AttributeError:
                dialect = autogen_context['dialect']
            metadata_default = str(metadata_default.arg.compile(
                dialect=dialect))

    if isinstance(metadata_default, compat.string_types):
        if metadata_col.type._type_affinity is sqltypes.String:
            # Drop one leading/trailing single quote; repr() re-quotes.
            metadata_default = re.sub(r"^'|'$", "", metadata_default)
            return repr(metadata_default)
        else:
            return metadata_default
    else:
        return None
项目:osm-wikidata    作者:EdwardBetts    | 项目源码 | 文件源码
def url(self):
        """Return ``osm_api_base``, ``osm_type`` and ``osm_id`` joined by slashes."""
        segments = (osm_api_base, self.osm_type, self.osm_id)
        return '{}/{}/{}'.format(*segments)

# class ItemCandidateTag(Base):
#     __tablename__ = 'item_candidate_tag'
#     __table_args__ = (
#         ForeignKeyConstraint(['item_id', 'osm_id', 'osm_type'],
#                              [ItemCandidate.item_id,
#                               ItemCandidate.osm_id,
#                               ItemCandidate.osm_type]),
#     )
#     item_id = Column(Integer, primary_key=True)
#     osm_id = Column(BigInteger, primary_key=True)
#     osm_type = Column(osm_type_enum, primary_key=True)
#     k = Column(String, primary_key=True)
#     v = Column(String, primary_key=True)
#     item_candidate = relationship(ItemCandidate,
#                                   backref=backref('tag_table', lazy='dynamic'))
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def compare_server_default(self, inspector_column,
                           metadata_column,
                           rendered_metadata_default,
                           rendered_inspector_default):
        """Return True when the database and metadata server defaults differ.

        NOTE(review): every parameter after ``inspector_column`` was lost
        from this snippet; names are taken from their use in the body and
        the order follows Alembic's dialect hook -- confirm.

        :param inspector_column: the reflected column (unused here).
        :param metadata_column: the column as declared in metadata.
        :param rendered_metadata_default: metadata default as SQL text,
         or ``None``.
        :param rendered_inspector_default: reflected default as SQL text,
         or ``None``.
        :return: ``False`` for the table's autoincrement primary key or
         when the database evaluates both defaults as equal, otherwise
         ``True``.
        """
        # don't do defaults for SERIAL columns
        if metadata_column.primary_key and \
                metadata_column is metadata_column.table._autoincrement_column:
            return False

        conn_col_default = rendered_inspector_default

        # With a missing side there is nothing to evaluate in SQL.
        if None in (conn_col_default, rendered_metadata_default):
            return conn_col_default != rendered_metadata_default

        if metadata_column.type._type_affinity is not sqltypes.String:
            rendered_metadata_default = re.sub(
                r"^'|'$", "", rendered_metadata_default)

        # Both operands are SQL expression text, so they are interpolated
        # into the statement rather than passed as bound parameters.
        return not self.connection.scalar(
            "SELECT %s = %s" % (
                conn_col_default,
                rendered_metadata_default
            )
        )
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def _render_server_default_for_compare(metadata_default,
                                       metadata_col, autogen_context):
    """Render a metadata server default as a string suitable for comparison.

    A user-defined renderer is consulted first through
    ``_user_defined_render``; any result other than ``False`` is returned
    unchanged.

    :param metadata_default: the server default from the metadata column;
     a ``DefaultClause``, a plain string, or anything else.
    :param metadata_col: the column owning the default; its type affinity
     decides whether the value is re-quoted with ``repr()``.
    :param autogen_context: autogenerate context, forwarded to the user
     renderer and used to find the dialect for compiling SQL expressions.
    :return: the rendered default, or ``None`` when the default cannot be
     reduced to a string.
    """
    rendered = _user_defined_render(
        "server_default", metadata_default, autogen_context)
    if rendered is not False:
        return rendered

    if isinstance(metadata_default, sa_schema.DefaultClause):
        if isinstance(metadata_default.arg, compat.string_types):
            metadata_default = metadata_default.arg
        else:
            # NOTE(review): the compile() arguments were lost from this
            # snippet.  Alembic exposes the dialect either as an attribute
            # or as a mapping key depending on release, so accept both --
            # confirm against the vendored Alembic version.
            try:
                dialect = autogen_context.dialect
            except AttributeError:
                dialect = autogen_context['dialect']
            metadata_default = str(metadata_default.arg.compile(
                dialect=dialect))

    if isinstance(metadata_default, compat.string_types):
        if metadata_col.type._type_affinity is sqltypes.String:
            # Drop one leading/trailing single quote; repr() re-quotes.
            metadata_default = re.sub(r"^'|'$", "", metadata_default)
            return repr(metadata_default)
        else:
            return metadata_default
    else:
        return None
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def compare_server_default(self, inspector_column,
                           metadata_column,
                           rendered_metadata_default,
                           rendered_inspector_default):
        """Return True when the database and metadata server defaults differ.

        NOTE(review): every parameter after ``inspector_column`` was lost
        from this snippet; names are taken from their use in the body and
        the order follows Alembic's dialect hook -- confirm.

        :param inspector_column: the reflected column (unused here).
        :param metadata_column: the column as declared in metadata.
        :param rendered_metadata_default: metadata default as SQL text,
         or ``None``.
        :param rendered_inspector_default: reflected default as SQL text,
         or ``None``.
        :return: ``False`` for the table's autoincrement primary key or
         when the database evaluates both defaults as equal, otherwise
         ``True``.
        """
        # don't do defaults for SERIAL columns
        if metadata_column.primary_key and \
                metadata_column is metadata_column.table._autoincrement_column:
            return False

        conn_col_default = rendered_inspector_default

        # With a missing side there is nothing to evaluate in SQL.
        if None in (conn_col_default, rendered_metadata_default):
            return conn_col_default != rendered_metadata_default

        if metadata_column.type._type_affinity is not sqltypes.String:
            rendered_metadata_default = re.sub(
                r"^'|'$", "", rendered_metadata_default)

        # Both operands are SQL expression text, so they are interpolated
        # into the statement rather than passed as bound parameters.
        return not self.connection.scalar(
            "SELECT %s = %s" % (
                conn_col_default,
                rendered_metadata_default
            )
        )
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def test_varchar_reflection(self):
        """Pass ``String(52)`` through ``_type_round_trip`` and check the
        first returned type is a ``String`` of length 52."""
        round_tripped = self._type_round_trip(sql_types.String(52))
        typ = round_tripped[0]
        assert isinstance(typ, sql_types.String)
        eq_(typ.length, 52)
项目:Callandtext    作者:iaora    | 项目源码 | 文件源码
def _render_server_default_for_compare(metadata_default,
                                       metadata_col, autogen_context):
    """Render a metadata server default as a string suitable for comparison.

    A user-defined renderer is consulted first through
    ``_user_defined_render``; any result other than ``False`` is returned
    unchanged.

    :param metadata_default: the server default from the metadata column;
     a ``DefaultClause``, a plain string, or anything else.
    :param metadata_col: the column owning the default; its type affinity
     decides whether the value is re-quoted with ``repr()``.
    :param autogen_context: autogenerate context, forwarded to the user
     renderer and used to find the dialect for compiling SQL expressions.
    :return: the rendered default, or ``None`` when the default cannot be
     reduced to a string.
    """
    rendered = _user_defined_render(
        "server_default", metadata_default, autogen_context)
    if rendered is not False:
        return rendered

    if isinstance(metadata_default, sa_schema.DefaultClause):
        if isinstance(metadata_default.arg, compat.string_types):
            metadata_default = metadata_default.arg
        else:
            # NOTE(review): the compile() arguments were lost from this
            # snippet.  Alembic exposes the dialect either as an attribute
            # or as a mapping key depending on release, so accept both --
            # confirm against the vendored Alembic version.
            try:
                dialect = autogen_context.dialect
            except AttributeError:
                dialect = autogen_context['dialect']
            metadata_default = str(metadata_default.arg.compile(
                dialect=dialect))

    if isinstance(metadata_default, compat.string_types):
        if metadata_col.type._type_affinity is sqltypes.String:
            # Drop one leading/trailing single quote; repr() re-quotes.
            metadata_default = re.sub(r"^'|'$", "", metadata_default)
            return repr(metadata_default)
        else:
            return metadata_default
    else:
        return None
项目:asyncpgsa    作者:CanopyTax    | 项目源码 | 文件源码
def test_querying_table(metadata):
    """
    Create an object for test table.

    :param metadata: the metadata collection the table is attached to.
    :return: a ``Table`` whose name is suffixed with the pytest-xdist
     worker id.
    """

    # When using pytest-xdist, we don't want concurrent table creations
    # across test processes so we assign a unique name for table based on
    # the current worker id.
    worker_id = os.environ.get('PYTEST_XDIST_WORKER', 'master')
    return Table(
        'test_querying_table_' + worker_id, metadata,
        Column('id', types.Integer, autoincrement=True, primary_key=True),
        Column('t_string', types.String(60)),
        Column('t_list', types.ARRAY(types.String(60))),
        Column('t_enum', types.Enum(MyEnum)),
        Column('t_int_enum', types.Enum(MyIntEnum)),
        Column('t_datetime', types.DateTime()),
        Column('t_date', types.DateTime()),
        Column('t_interval', types.Interval()),
        Column('uniq_uuid', PG_UUID, nullable=False, unique=True, default=uuid4),
    )
项目:deb-python-sqlalchemy-utils    作者:openstack    | 项目源码 | 文件源码
def __init__(self, type_in=None, key=None, engine=None, **kwargs):
        """Set up the encrypted type.

        :param type_in: underlying type, as an instance or a class; a
         class is instantiated with no arguments, and ``None`` means
         ``String()``.
        :param key: stored on ``self._key``.
        :param engine: engine class, instantiated with no arguments;
         a falsy value means ``AesEngine``.
        :param kwargs: forwarded to the parent ``__init__``.
        :raises ImproperlyConfigured: when ``cryptography`` is falsy.
        """
        if not cryptography:
            raise ImproperlyConfigured(
                "'cryptography' is required to use EncryptedType"
            )
        super(EncryptedType, self).__init__(**kwargs)
        # set the underlying type
        if type_in is None:
            type_in = String()
        elif isinstance(type_in, type):
            type_in = type_in()
        self.underlying_type = type_in
        self._key = key
        if not engine:
            engine = AesEngine
        self.engine = engine()
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def bind_processor(self, dialect):
            """Return the bind-parameter processor for *dialect*.

            When ``dialect._cx_oracle_with_unicode`` is set, the returned
            callable passes ``None`` through and converts every other
            value with ``unicode()``; otherwise the parent class's
            processor is used.

            NOTE(review): ``unicode`` only exists on Python 2 -- this
            method is presumably defined under a Python 2 guard; confirm.
            """
            if dialect._cx_oracle_with_unicode:
                def process(value):
                    if value is None:
                        return value
                    else:
                        return unicode(value)
                return process
            else:
                return super(
                    _NativeUnicodeMixin, self).bind_processor(dialect)

    # we apply a connection output handler that returns
    # unicode in all cases, so the "native_unicode" flag
    # will be set for the default String.result_processor.
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def define_tables(cls, metadata):
        """Register ``test_table`` on *metadata*: an integer primary key
        ``id`` and a ``String(50)`` column ``data``."""
        Table('test_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('data', String(50))
              )
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def assert_tables_equal(self, table, reflected_table, strict_types=False):
        """Assert that *reflected_table* matches *table* column by column.

        Compares column count, name, primary-key and nullable flags,
        types, string lengths, foreign-key target names, the presence of
        a server default, and the primary-key columns.

        NOTE(review): the ``.name`` lookups, the ``eq_`` wrappers and the
        ``schema.FetchedValue`` argument were lost from this snippet and
        are restored from the upstream SQLAlchemy helper -- confirm.

        :param table: the table as originally defined.
        :param reflected_table: the table obtained by reflection.
        :param strict_types: when true, the reflected type must be an
         instance of the original type's class; otherwise
         ``assert_types_base`` is used.
        """
        assert len(table.c) == len(reflected_table.c)
        for c, reflected_c in zip(table.c, reflected_table.c):
            eq_(c.name, reflected_c.name)
            assert reflected_c is reflected_table.c[c.name]
            eq_(c.primary_key, reflected_c.primary_key)
            eq_(c.nullable, reflected_c.nullable)

            if strict_types:
                msg = "Type '%s' doesn't correspond to type '%s'"
                assert isinstance(reflected_c.type, type(c.type)), \
                    msg % (reflected_c.type, c.type)
            else:
                self.assert_types_base(reflected_c, c)

            if isinstance(c.type, sqltypes.String):
                eq_(c.type.length, reflected_c.type.length)

            eq_(
                set([f.column.name for f in c.foreign_keys]),
                set([f.column.name for f in reflected_c.foreign_keys])
            )
            if c.server_default:
                assert isinstance(reflected_c.server_default,
                                  schema.FetchedValue)

        assert len(table.primary_key) == len(reflected_table.primary_key)
        for c in table.primary_key:
            assert reflected_table.primary_key.columns[c.name] is not None
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def with_variant(self, type_, dialect_name):
        """Produce a new type object that will utilize the given
        type when applied to the dialect of the given name.

        e.g.::

            from sqlalchemy.types import String
            from sqlalchemy.dialects import mysql

            s = String()

            s = s.with_variant(mysql.VARCHAR(collation='foo'), 'mysql')

        The construction of :meth:`.TypeEngine.with_variant` is always
        from the "fallback" type to that which is dialect specific.
        The returned type is an instance of :class:`.Variant`, which
        itself provides a :meth:`.Variant.with_variant`
        that can be called repeatedly.

        :param type_: a :class:`.TypeEngine` that will be selected
         as a variant from the originating type, when a dialect
         of the given name is in use.
        :param dialect_name: base name of the dialect which uses
         this type. (i.e. ``'postgresql'``, ``'mysql'``, etc.)

        .. versionadded:: 0.7.2

        """
        return Variant(self, {dialect_name: to_instance(type_)})
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def binary_operator_string(self, binary):
        """Return the operator text for *binary*.

        ``'+'`` on a ``String``-typed expression is rendered as ``'||'``;
        everything else is delegated to ``ansisql.ANSICompiler``.
        """
        if isinstance(binary.type, sqltypes.String) and binary.operator == '+':
            return '||'
        else:
            return ansisql.ANSICompiler.binary_operator_string(self, binary)
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def binary_operator_string(self, binary):
        """Return the operator text for *binary*.

        ``'+'`` on a ``String``-typed expression is rendered as ``'||'``;
        everything else is delegated to ``ansisql.ANSICompiler``.
        """
        if isinstance(binary.type, sqltypes.String) and binary.operator == '+':
            return '||'
        else:
            return ansisql.ANSICompiler.binary_operator_string(self, binary)
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def bind_processor(self, dialect):
            """Return the bind-parameter processor for *dialect*.

            When ``dialect._cx_oracle_with_unicode`` is set, the returned
            callable passes ``None`` through and converts every other
            value with ``unicode()``; otherwise the parent class's
            processor is used.

            NOTE(review): ``unicode`` only exists on Python 2 -- this
            method is presumably defined under a Python 2 guard; confirm.
            """
            if dialect._cx_oracle_with_unicode:
                def process(value):
                    if value is None:
                        return value
                    else:
                        return unicode(value)
                return process
            else:
                return super(
                    _NativeUnicodeMixin, self).bind_processor(dialect)

    # we apply a connection output handler that returns
    # unicode in all cases, so the "native_unicode" flag
    # will be set for the default String.result_processor.
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def define_tables(cls, metadata):
        """Register ``test_table`` on *metadata*: an integer primary key
        ``id`` and a ``String(50)`` column ``data``."""
        Table('test_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('data', String(50))
              )
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def assert_tables_equal(self, table, reflected_table, strict_types=False):
        """Assert that *reflected_table* matches *table* column by column.

        Compares column count, name, primary-key and nullable flags,
        types, string lengths, foreign-key target names, the presence of
        a server default, and the primary-key columns.

        NOTE(review): the ``.name`` lookups, the ``eq_`` wrappers and the
        ``schema.FetchedValue`` argument were lost from this snippet and
        are restored from the upstream SQLAlchemy helper -- confirm.

        :param table: the table as originally defined.
        :param reflected_table: the table obtained by reflection.
        :param strict_types: when true, the reflected type must be an
         instance of the original type's class; otherwise
         ``assert_types_base`` is used.
        """
        assert len(table.c) == len(reflected_table.c)
        for c, reflected_c in zip(table.c, reflected_table.c):
            eq_(c.name, reflected_c.name)
            assert reflected_c is reflected_table.c[c.name]
            eq_(c.primary_key, reflected_c.primary_key)
            eq_(c.nullable, reflected_c.nullable)

            if strict_types:
                msg = "Type '%s' doesn't correspond to type '%s'"
                assert isinstance(reflected_c.type, type(c.type)), \
                    msg % (reflected_c.type, c.type)
            else:
                self.assert_types_base(reflected_c, c)

            if isinstance(c.type, sqltypes.String):
                eq_(c.type.length, reflected_c.type.length)

            eq_(
                set([f.column.name for f in c.foreign_keys]),
                set([f.column.name for f in reflected_c.foreign_keys])
            )
            if c.server_default:
                assert isinstance(reflected_c.server_default,
                                  schema.FetchedValue)

        assert len(table.primary_key) == len(reflected_table.primary_key)
        for c in table.primary_key:
            assert reflected_table.primary_key.columns[c.name] is not None
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def with_variant(self, type_, dialect_name):
        """Produce a new type object that will utilize the given
        type when applied to the dialect of the given name.

        e.g.::

            from sqlalchemy.types import String
            from sqlalchemy.dialects import mysql

            s = String()

            s = s.with_variant(mysql.VARCHAR(collation='foo'), 'mysql')

        The construction of :meth:`.TypeEngine.with_variant` is always
        from the "fallback" type to that which is dialect specific.
        The returned type is an instance of :class:`.Variant`, which
        itself provides a :meth:`.Variant.with_variant`
        that can be called repeatedly.

        :param type_: a :class:`.TypeEngine` that will be selected
         as a variant from the originating type, when a dialect
         of the given name is in use.
        :param dialect_name: base name of the dialect which uses
         this type. (i.e. ``'postgresql'``, ``'mysql'``, etc.)

        .. versionadded:: 0.7.2

        """
        return Variant(self, {dialect_name: to_instance(type_)})
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def bind_processor(self, dialect):
            """Return the bind-parameter processor for *dialect*.

            When ``dialect._cx_oracle_with_unicode`` is set, the returned
            callable passes ``None`` through and converts every other
            value with ``unicode()``; otherwise the parent class's
            processor is used.

            NOTE(review): ``unicode`` only exists on Python 2 -- this
            method is presumably defined under a Python 2 guard; confirm.
            """
            if dialect._cx_oracle_with_unicode:
                def process(value):
                    if value is None:
                        return value
                    else:
                        return unicode(value)
                return process
            else:
                return super(
                    _NativeUnicodeMixin, self).bind_processor(dialect)

    # we apply a connection output handler that returns
    # unicode in all cases, so the "native_unicode" flag
    # will be set for the default String.result_processor.
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def define_tables(cls, metadata):
        """Register ``test_table`` on *metadata*: an integer primary key
        ``id`` and a ``String(50)`` column ``data``."""
        Table('test_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('data', String(50))
              )
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def _test_get_unique_constraints(self, schema=None):
        """Create ``testtbl`` with four named unique constraints and check
        that the inspector reflects them with the same names and columns.

        NOTE(review): the list brackets, sort keys, ``schema=schema``,
        ``append_constraint`` and ``create_all`` calls were lost from this
        snippet and are restored from the upstream SQLAlchemy test suite
        -- confirm.

        :param schema: optional schema name passed to the inspector (and,
         in the restored code, to the ``Table``).
        """
        def by_name(constraint):
            # Sort both sides the same way so zip() pairs like with like.
            return constraint['name']

        uniques = sorted(
            [
                {'name': 'unique_a', 'column_names': ['a']},
                {'name': 'unique_a_b_c', 'column_names': ['a', 'b', 'c']},
                {'name': 'unique_c_a_b', 'column_names': ['c', 'a', 'b']},
                {'name': 'unique_asc_key', 'column_names': ['asc', 'key']},
            ],
            key=by_name
        )
        orig_meta = self.metadata
        table = Table(
            'testtbl', orig_meta,
            Column('a', sa.String(20)),
            Column('b', sa.String(30)),
            Column('c', sa.Integer),
            # reserved identifiers
            Column('asc', sa.String(30)),
            Column('key', sa.String(30)),
            schema=schema
        )
        for uc in uniques:
            table.append_constraint(
                sa.UniqueConstraint(*uc['column_names'], name=uc['name'])
            )
        orig_meta.create_all()

        inspector = inspect(orig_meta.bind)
        reflected = sorted(
            inspector.get_unique_constraints('testtbl', schema=schema),
            key=by_name
        )

        for orig, refl in zip(uniques, reflected):
            eq_(orig, refl)
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def assert_tables_equal(self, table, reflected_table, strict_types=False):
        """Assert that *reflected_table* matches *table* column by column.

        Compares column count, name, primary-key and nullable flags,
        types, string lengths, foreign-key target names, the presence of
        a server default, and the primary-key columns.

        NOTE(review): the ``.name`` lookups, the ``eq_`` wrappers and the
        ``schema.FetchedValue`` argument were lost from this snippet and
        are restored from the upstream SQLAlchemy helper -- confirm.

        :param table: the table as originally defined.
        :param reflected_table: the table obtained by reflection.
        :param strict_types: when true, the reflected type must be an
         instance of the original type's class; otherwise
         ``assert_types_base`` is used.
        """
        assert len(table.c) == len(reflected_table.c)
        for c, reflected_c in zip(table.c, reflected_table.c):
            eq_(c.name, reflected_c.name)
            assert reflected_c is reflected_table.c[c.name]
            eq_(c.primary_key, reflected_c.primary_key)
            eq_(c.nullable, reflected_c.nullable)

            if strict_types:
                msg = "Type '%s' doesn't correspond to type '%s'"
                assert isinstance(reflected_c.type, type(c.type)), \
                    msg % (reflected_c.type, c.type)
            else:
                self.assert_types_base(reflected_c, c)

            if isinstance(c.type, sqltypes.String):
                eq_(c.type.length, reflected_c.type.length)

            eq_(
                set([f.column.name for f in c.foreign_keys]),
                set([f.column.name for f in reflected_c.foreign_keys])
            )
            if c.server_default:
                assert isinstance(reflected_c.server_default,
                                  schema.FetchedValue)

        assert len(table.primary_key) == len(reflected_table.primary_key)
        for c in table.primary_key:
            assert reflected_table.primary_key.columns[c.name] is not None
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def with_variant(self, type_, dialect_name):
        """Produce a new type object that will utilize the given
        type when applied to the dialect of the given name.

        e.g.::

            from sqlalchemy.types import String
            from sqlalchemy.dialects import mysql

            s = String()

            s = s.with_variant(mysql.VARCHAR(collation='foo'), 'mysql')

        The construction of :meth:`.TypeEngine.with_variant` is always
        from the "fallback" type to that which is dialect specific.
        The returned type is an instance of :class:`.Variant`, which
        itself provides a :meth:`.Variant.with_variant`
        that can be called repeatedly.

        :param type_: a :class:`.TypeEngine` that will be selected
         as a variant from the originating type, when a dialect
         of the given name is in use.
        :param dialect_name: base name of the dialect which uses
         this type. (i.e. ``'postgresql'``, ``'mysql'``, etc.)

        .. versionadded:: 0.7.2

        """
        return Variant(self, {dialect_name: to_instance(type_)})
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def bind_processor(self, dialect):
            """Return the bind-parameter processor for *dialect*.

            When ``dialect._cx_oracle_with_unicode`` is set, the returned
            callable passes ``None`` through and converts every other
            value with ``unicode()``; otherwise the parent class's
            processor is used.

            NOTE(review): ``unicode`` only exists on Python 2 -- this
            method is presumably defined under a Python 2 guard; confirm.
            """
            if dialect._cx_oracle_with_unicode:
                def process(value):
                    if value is None:
                        return value
                    else:
                        return unicode(value)
                return process
            else:
                return super(
                    _NativeUnicodeMixin, self).bind_processor(dialect)

    # we apply a connection output handler that returns
    # unicode in all cases, so the "native_unicode" flag
    # will be set for the default String.result_processor.
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def define_tables(cls, metadata):
        """Register ``test_table`` on *metadata*: an integer primary key
        ``id`` and a ``String(50)`` column ``data``."""
        Table('test_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('data', String(50))
              )
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def _test_get_unique_constraints(self, schema=None):
        """Create ``testtbl`` with four named unique constraints and check
        that the inspector reflects them with the same names and columns.

        NOTE(review): the list brackets, sort keys, ``schema=schema``,
        ``append_constraint`` and ``create_all`` calls were lost from this
        snippet and are restored from the upstream SQLAlchemy test suite
        -- confirm.

        :param schema: optional schema name passed to the inspector (and,
         in the restored code, to the ``Table``).
        """
        def by_name(constraint):
            # Sort both sides the same way so zip() pairs like with like.
            return constraint['name']

        uniques = sorted(
            [
                {'name': 'unique_a', 'column_names': ['a']},
                {'name': 'unique_a_b_c', 'column_names': ['a', 'b', 'c']},
                {'name': 'unique_c_a_b', 'column_names': ['c', 'a', 'b']},
                {'name': 'unique_asc_key', 'column_names': ['asc', 'key']},
            ],
            key=by_name
        )
        orig_meta = self.metadata
        table = Table(
            'testtbl', orig_meta,
            Column('a', sa.String(20)),
            Column('b', sa.String(30)),
            Column('c', sa.Integer),
            # reserved identifiers
            Column('asc', sa.String(30)),
            Column('key', sa.String(30)),
            schema=schema
        )
        for uc in uniques:
            table.append_constraint(
                sa.UniqueConstraint(*uc['column_names'], name=uc['name'])
            )
        orig_meta.create_all()

        inspector = inspect(orig_meta.bind)
        reflected = sorted(
            inspector.get_unique_constraints('testtbl', schema=schema),
            key=by_name
        )

        for orig, refl in zip(uniques, reflected):
            eq_(orig, refl)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def assert_tables_equal(self, table, reflected_table, strict_types=False):
        """Assert that *reflected_table* matches *table* column by column.

        Compares column count, name, primary-key and nullable flags,
        types, string lengths, foreign-key target names, the presence of
        a server default, and the primary-key columns.

        NOTE(review): the ``.name`` lookups, the ``eq_`` wrappers and the
        ``schema.FetchedValue`` argument were lost from this snippet and
        are restored from the upstream SQLAlchemy helper -- confirm.

        :param table: the table as originally defined.
        :param reflected_table: the table obtained by reflection.
        :param strict_types: when true, the reflected type must be an
         instance of the original type's class; otherwise
         ``assert_types_base`` is used.
        """
        assert len(table.c) == len(reflected_table.c)
        for c, reflected_c in zip(table.c, reflected_table.c):
            eq_(c.name, reflected_c.name)
            assert reflected_c is reflected_table.c[c.name]
            eq_(c.primary_key, reflected_c.primary_key)
            eq_(c.nullable, reflected_c.nullable)

            if strict_types:
                msg = "Type '%s' doesn't correspond to type '%s'"
                assert isinstance(reflected_c.type, type(c.type)), \
                    msg % (reflected_c.type, c.type)
            else:
                self.assert_types_base(reflected_c, c)

            if isinstance(c.type, sqltypes.String):
                eq_(c.type.length, reflected_c.type.length)

            eq_(
                set([f.column.name for f in c.foreign_keys]),
                set([f.column.name for f in reflected_c.foreign_keys])
            )
            if c.server_default:
                assert isinstance(reflected_c.server_default,
                                  schema.FetchedValue)

        assert len(table.primary_key) == len(reflected_table.primary_key)
        for c in table.primary_key:
            assert reflected_table.primary_key.columns[c.name] is not None
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def with_variant(self, type_, dialect_name):
        """Produce a new type object that will utilize the given
        type when applied to the dialect of the given name.

        e.g.::

            from sqlalchemy.types import String
            from sqlalchemy.dialects import mysql

            s = String()

            s = s.with_variant(mysql.VARCHAR(collation='foo'), 'mysql')

        The construction of :meth:`.TypeEngine.with_variant` is always
        from the "fallback" type to that which is dialect specific.
        The returned type is an instance of :class:`.Variant`, which
        itself provides a :meth:`~sqlalchemy.types.Variant.with_variant`
        that can be called repeatedly.

        :param type_: a :class:`.TypeEngine` that will be selected
         as a variant from the originating type, when a dialect
         of the given name is in use.
        :param dialect_name: base name of the dialect which uses
         this type. (i.e. ``'postgresql'``, ``'mysql'``, etc.)

        .. versionadded:: 0.7.2

        """
        return Variant(self, {dialect_name: to_instance(type_)})
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def bind_processor(self, dialect):
            """Return the bind-parameter processor for *dialect*.

            When ``dialect._cx_oracle_with_unicode`` is set, the returned
            callable passes ``None`` through and converts every other
            value with ``unicode()``; otherwise the parent class's
            processor is used.

            NOTE(review): ``unicode`` only exists on Python 2 -- this
            method is presumably defined under a Python 2 guard; confirm.
            """
            if dialect._cx_oracle_with_unicode:
                def process(value):
                    if value is None:
                        return value
                    else:
                        return unicode(value)
                return process
            else:
                return super(
                    _NativeUnicodeMixin, self).bind_processor(dialect)

    # we apply a connection output handler that returns
    # unicode in all cases, so the "native_unicode" flag
    # will be set for the default String.result_processor.
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def define_tables(cls, metadata):
        """Register ``test_table`` on *metadata*: an integer primary key
        ``id`` and a ``String(50)`` column ``data``."""
        Table('test_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('data', String(50))
              )
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def assert_tables_equal(self, table, reflected_table, strict_types=False):
        """Assert that *reflected_table* matches *table* column by column.

        Compares column count, name, primary-key and nullable flags,
        types, string lengths, foreign-key target names, the presence of
        a server default, and the primary-key columns.

        NOTE(review): the ``.name`` lookups, the ``eq_`` wrappers and the
        ``schema.FetchedValue`` argument were lost from this snippet and
        are restored from the upstream SQLAlchemy helper -- confirm.

        :param table: the table as originally defined.
        :param reflected_table: the table obtained by reflection.
        :param strict_types: when true, the reflected type must be an
         instance of the original type's class; otherwise
         ``assert_types_base`` is used.
        """
        assert len(table.c) == len(reflected_table.c)
        for c, reflected_c in zip(table.c, reflected_table.c):
            eq_(c.name, reflected_c.name)
            assert reflected_c is reflected_table.c[c.name]
            eq_(c.primary_key, reflected_c.primary_key)
            eq_(c.nullable, reflected_c.nullable)

            if strict_types:
                msg = "Type '%s' doesn't correspond to type '%s'"
                assert isinstance(reflected_c.type, type(c.type)), \
                    msg % (reflected_c.type, c.type)
            else:
                self.assert_types_base(reflected_c, c)

            if isinstance(c.type, sqltypes.String):
                eq_(c.type.length, reflected_c.type.length)

            eq_(
                set([f.column.name for f in c.foreign_keys]),
                set([f.column.name for f in reflected_c.foreign_keys])
            )
            if c.server_default:
                assert isinstance(reflected_c.server_default,
                                  schema.FetchedValue)

        assert len(table.primary_key) == len(reflected_table.primary_key)
        for c in table.primary_key:
            assert reflected_table.primary_key.columns[c.name] is not None
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def with_variant(self, type_, dialect_name):
        """Produce a new type object that will utilize the given
        type when applied to the dialect of the given name.

        e.g.::

            from sqlalchemy.types import String
            from sqlalchemy.dialects import mysql

            s = String()

            s = s.with_variant(mysql.VARCHAR(collation='foo'), 'mysql')

        The construction of :meth:`.TypeEngine.with_variant` is always
        from the "fallback" type to that which is dialect specific.
        The returned type is an instance of :class:`.Variant`, which
        itself provides a :meth:`.Variant.with_variant`
        that can be called repeatedly.

        :param type_: a :class:`.TypeEngine` that will be selected
         as a variant from the originating type, when a dialect
         of the given name is in use.
        :param dialect_name: base name of the dialect which uses
         this type. (i.e. ``'postgresql'``, ``'mysql'``, etc.)

        .. versionadded:: 0.7.2

        """
        return Variant(self, {dialect_name: to_instance(type_)})
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def bind_processor(self, dialect):
            """Return the bind-parameter processor for *dialect*.

            When ``dialect._cx_oracle_with_unicode`` is set, the returned
            callable passes ``None`` through and converts every other
            value with ``unicode()``; otherwise the parent class's
            processor is used.

            NOTE(review): ``unicode`` only exists on Python 2 -- this
            method is presumably defined under a Python 2 guard; confirm.
            """
            if dialect._cx_oracle_with_unicode:
                def process(value):
                    if value is None:
                        return value
                    else:
                        return unicode(value)
                return process
            else:
                return super(
                    _NativeUnicodeMixin, self).bind_processor(dialect)

    # we apply a connection output handler that returns
    # unicode in all cases, so the "native_unicode" flag
    # will be set for the default String.result_processor.
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def define_tables(cls, metadata):
        """Register ``test_table`` on *metadata*: an integer primary key
        ``id`` and a ``String(50)`` column ``data``."""
        Table('test_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('data', String(50))
              )
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def assert_tables_equal(self, table, reflected_table, strict_types=False):
        """Assert that *reflected_table* matches *table* column by column.

        Compares column count, name, primary-key and nullable flags,
        types, string lengths, foreign-key target names, the presence of
        a server default, and the primary-key columns.

        NOTE(review): the ``.name`` lookups, the ``eq_`` wrappers and the
        ``schema.FetchedValue`` argument were lost from this snippet and
        are restored from the upstream SQLAlchemy helper -- confirm.

        :param table: the table as originally defined.
        :param reflected_table: the table obtained by reflection.
        :param strict_types: when true, the reflected type must be an
         instance of the original type's class; otherwise
         ``assert_types_base`` is used.
        """
        assert len(table.c) == len(reflected_table.c)
        for c, reflected_c in zip(table.c, reflected_table.c):
            eq_(c.name, reflected_c.name)
            assert reflected_c is reflected_table.c[c.name]
            eq_(c.primary_key, reflected_c.primary_key)
            eq_(c.nullable, reflected_c.nullable)

            if strict_types:
                msg = "Type '%s' doesn't correspond to type '%s'"
                assert isinstance(reflected_c.type, type(c.type)), \
                    msg % (reflected_c.type, c.type)
            else:
                self.assert_types_base(reflected_c, c)

            if isinstance(c.type, sqltypes.String):
                eq_(c.type.length, reflected_c.type.length)

            eq_(
                set([f.column.name for f in c.foreign_keys]),
                set([f.column.name for f in reflected_c.foreign_keys])
            )
            if c.server_default:
                assert isinstance(reflected_c.server_default,
                                  schema.FetchedValue)

        assert len(table.primary_key) == len(reflected_table.primary_key)
        for c in table.primary_key:
            assert reflected_table.primary_key.columns[c.name] is not None
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def with_variant(self, type_, dialect_name):
        """Produce a new type object that will utilize the given
        type when applied to the dialect of the given name.

        e.g.::

            from sqlalchemy.types import String
            from sqlalchemy.dialects import mysql

            s = String()

            s = s.with_variant(mysql.VARCHAR(collation='foo'), 'mysql')

        The construction of :meth:`.TypeEngine.with_variant` is always
        from the "fallback" type to that which is dialect specific.
        The returned type is an instance of :class:`.Variant`, which
        itself provides a :meth:`.Variant.with_variant`
        that can be called repeatedly.

        :param type_: a :class:`.TypeEngine` that will be selected
         as a variant from the originating type, when a dialect
         of the given name is in use.
        :param dialect_name: base name of the dialect which uses
         this type. (i.e. ``'postgresql'``, ``'mysql'``, etc.)

        .. versionadded:: 0.7.2

        """
        return Variant(self, {dialect_name: to_instance(type_)})
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def bind_processor(self, dialect):
        """Return the bind-parameter processor for *dialect*.

        When ``dialect._cx_oracle_with_unicode`` is set, the returned
        callable passes ``None`` through and converts every other value
        with ``unicode()``; otherwise the parent class's processor is
        used.

        NOTE(review): ``unicode`` only exists on Python 2 -- the trailing
        ``# end Py2K`` marker suggests a Python 2-only section; confirm.
        """
        if dialect._cx_oracle_with_unicode:
            def process(value):
                if value is None:
                    return value
                else:
                    return unicode(value)
            return process
        else:
            return super(_NativeUnicodeMixin, self).bind_processor(dialect)
    # end Py2K

    # we apply a connection output handler that returns
    # unicode in all cases, so the "native_unicode" flag
    # will be set for the default String.result_processor.
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def define_tables(cls, metadata):
        """Register ``test_table`` on *metadata*: an integer primary key
        ``id`` and a ``String(50)`` column ``data``."""
        Table('test_table', metadata,
                Column('id', Integer, primary_key=True),
                Column('data', String(50))
            )
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def assert_tables_equal(self, table, reflected_table, strict_types=False):
        """Assert that *reflected_table* matches *table* column by column.

        Compares column count, name, primary-key and nullable flags,
        types, string lengths, foreign-key target names, the presence of
        a server default, and the primary-key columns.

        NOTE(review): the ``.name`` lookups, the ``eq_`` wrappers and the
        ``schema.FetchedValue`` argument were lost from this snippet and
        are restored from the upstream SQLAlchemy helper -- confirm.

        :param table: the table as originally defined.
        :param reflected_table: the table obtained by reflection.
        :param strict_types: when true, the reflected type's class must
         be exactly the original type's class; otherwise
         ``assert_types_base`` is used.
        """
        assert len(table.c) == len(reflected_table.c)
        for c, reflected_c in zip(table.c, reflected_table.c):
            eq_(c.name, reflected_c.name)
            assert reflected_c is reflected_table.c[c.name]
            eq_(c.primary_key, reflected_c.primary_key)
            eq_(c.nullable, reflected_c.nullable)

            if strict_types:
                msg = "Type '%s' doesn't correspond to type '%s'"
                # Exact class identity is deliberate here: subclasses
                # do not count as a match in strict mode.
                assert type(reflected_c.type) is type(c.type), \
                    msg % (reflected_c.type, c.type)
            else:
                self.assert_types_base(reflected_c, c)

            if isinstance(c.type, sqltypes.String):
                eq_(c.type.length, reflected_c.type.length)

            eq_(
                set([f.column.name for f in c.foreign_keys]),
                set([f.column.name for f in reflected_c.foreign_keys])
            )
            if c.server_default:
                assert isinstance(reflected_c.server_default,
                                  schema.FetchedValue)

        assert len(table.primary_key) == len(reflected_table.primary_key)
        for c in table.primary_key:
            assert reflected_table.primary_key.columns[c.name] is not None
项目:sqlacodegen    作者:agronholm    | 项目源码 | 文件源码
def __init__(self, table):
        """Store *table* and its schema, then adapt each column's type.

        For every column the type class's MRO is walked: each class
        carrying ``__visit_name__`` becomes the candidate, and the walk
        stops at the first class whose name is neither all-uppercase nor
        underscore-prefixed.  The column type is then adapted to the
        candidate.

        :param table: the table this model wraps; its column types are
         replaced in place.
        """
        super(Model, self).__init__()
        self.table = table
        self.schema = table.schema

        # Adapt column types to the most reasonable generic types (ie. VARCHAR -> String)
        for column in table.columns:
            cls = column.type.__class__
            for supercls in cls.__mro__:
                if hasattr(supercls, '__visit_name__'):
                    cls = supercls
                if supercls.__name__ != supercls.__name__.upper() and not supercls.__name__.startswith('_'):
                    # NOTE(review): this statement was lost from the
                    # snippet; ``break`` follows upstream sqlacodegen.
                    break

            column.type = column.type.adapt(cls)
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def bind_processor(self, dialect):
            """Return the bind-parameter processor for *dialect*.

            When ``dialect._cx_oracle_with_unicode`` is set, the returned
            callable passes ``None`` through and converts every other
            value with ``unicode()``; otherwise the parent class's
            processor is used.

            NOTE(review): ``unicode`` only exists on Python 2 -- this
            method is presumably defined under a Python 2 guard; confirm.
            """
            if dialect._cx_oracle_with_unicode:
                def process(value):
                    if value is None:
                        return value
                    else:
                        return unicode(value)
                return process
            else:
                return super(
                    _NativeUnicodeMixin, self).bind_processor(dialect)

    # we apply a connection output handler that returns
    # unicode in all cases, so the "native_unicode" flag
    # will be set for the default String.result_processor.
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def define_tables(cls, metadata):
        """Register ``test_table`` on *metadata*: an integer primary key
        ``id`` and a ``String(50)`` column ``data``."""
        Table('test_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('data', String(50))
              )
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def assert_tables_equal(self, table, reflected_table, strict_types=False):
        """Assert that *reflected_table* matches *table* column by column.

        Compares column count, name, primary-key and nullable flags,
        types, string lengths, foreign-key target names, the presence of
        a server default, and the primary-key columns.

        NOTE(review): the ``.name`` lookups, the ``eq_`` wrappers and the
        ``schema.FetchedValue`` argument were lost from this snippet and
        are restored from the upstream SQLAlchemy helper -- confirm.

        :param table: the table as originally defined.
        :param reflected_table: the table obtained by reflection.
        :param strict_types: when true, the reflected type must be an
         instance of the original type's class; otherwise
         ``assert_types_base`` is used.
        """
        assert len(table.c) == len(reflected_table.c)
        for c, reflected_c in zip(table.c, reflected_table.c):
            eq_(c.name, reflected_c.name)
            assert reflected_c is reflected_table.c[c.name]
            eq_(c.primary_key, reflected_c.primary_key)
            eq_(c.nullable, reflected_c.nullable)

            if strict_types:
                msg = "Type '%s' doesn't correspond to type '%s'"
                assert isinstance(reflected_c.type, type(c.type)), \
                    msg % (reflected_c.type, c.type)
            else:
                self.assert_types_base(reflected_c, c)

            if isinstance(c.type, sqltypes.String):
                eq_(c.type.length, reflected_c.type.length)

            eq_(
                set([f.column.name for f in c.foreign_keys]),
                set([f.column.name for f in reflected_c.foreign_keys])
            )
            if c.server_default:
                assert isinstance(reflected_c.server_default,
                                  schema.FetchedValue)

        assert len(table.primary_key) == len(reflected_table.primary_key)
        for c in table.primary_key:
            assert reflected_table.primary_key.columns[c.name] is not None
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def with_variant(self, type_, dialect_name):
    """Produce a new type object that will utilize the given
    type when applied to the dialect of the given name.

    e.g.::

        from sqlalchemy.types import String
        from sqlalchemy.dialects import mysql

        s = String()

        s = s.with_variant(mysql.VARCHAR(collation='foo'), 'mysql')

    The construction of :meth:`.TypeEngine.with_variant` is always
    from the "fallback" type to that which is dialect specific.
    The returned type is an instance of :class:`.Variant`, which
    itself provides a :meth:`.Variant.with_variant`
    that can be called repeatedly.

    :param type_: a :class:`.TypeEngine` that will be selected
     as a variant from the originating type, when a dialect
     of the given name is in use.
    :param dialect_name: base name of the dialect which uses
     this type. (i.e. ``'postgresql'``, ``'mysql'``, etc.)

    .. versionadded:: 0.7.2

    """
    return Variant(self, {dialect_name: to_instance(type_)})
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def bind_processor(self, dialect):
    """Return a converter for bound values, or defer to the parent type.

    When ``dialect._cx_oracle_with_unicode`` is true, the returned
    callable passes ``None`` through unchanged and coerces every other
    value with ``unicode()``.  Otherwise the ``bind_processor`` that
    follows ``_NativeUnicodeMixin`` in the MRO is used.

    :param dialect: the dialect in use; only its
     ``_cx_oracle_with_unicode`` flag is read here.
    :return: a one-argument callable, or whatever the parent
     ``bind_processor`` returns.
    """
    if dialect._cx_oracle_with_unicode:
        def process(value):
            # None is passed through untouched; anything else is coerced.
            if value is None:
                return value
            else:
                return unicode(value)
        return process
    else:
        return super(
            _NativeUnicodeMixin, self).bind_processor(dialect)

    # we apply a connection output handler that returns
    # unicode in all cases, so the "native_unicode" flag
    # will be set for the default String.result_processor.
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def define_tables(cls, metadata):
    """Attach the ``test_table`` fixture table to ``metadata``.

    The table has an ``Integer`` primary-key column ``id`` and a
    ``data`` column of type ``String(50)``.

    :param metadata: the collection the new ``Table`` is registered on.
    """
    Table('test_table', metadata,
          Column('id', Integer, primary_key=True),
          Column('data', String(50))
          )
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def assert_tables_equal(self, table, reflected_table, strict_types=False):
    """Assert that ``reflected_table`` matches ``table`` column by column.

    Columns are compared pairwise in order on name, primary-key flag,
    nullability, type, string length (for ``String`` columns), foreign-key
    targets and the presence of a server default.  The primary-key
    constraints are then compared by size and by column name.

    :param table: the table as originally defined.
    :param reflected_table: the table to check against ``table``.
    :param strict_types: when true, each reflected column type must be an
     instance of the original column's type class; otherwise the looser
     ``self.assert_types_base`` check is used.
    """
    # NOTE(review): the ``c.name`` lookups, the ``f.column.name`` set
    # elements, the ``else`` branch and the ``schema.FetchedValue`` check
    # were restored after this listing lost them -- confirm against the
    # upstream SQLAlchemy testing assertions.
    assert len(table.c) == len(reflected_table.c)
    for c, reflected_c in zip(table.c, reflected_table.c):
        eq_(c.name, reflected_c.name)
        assert reflected_c is reflected_table.c[c.name]
        eq_(c.primary_key, reflected_c.primary_key)
        eq_(c.nullable, reflected_c.nullable)

        if strict_types:
            msg = "Type '%s' doesn't correspond to type '%s'"
            assert isinstance(reflected_c.type, type(c.type)), \
                msg % (reflected_c.type, c.type)
        else:
            self.assert_types_base(reflected_c, c)

        if isinstance(c.type, sqltypes.String):
            eq_(c.type.length, reflected_c.type.length)

        eq_(
            set([f.column.name for f in c.foreign_keys]),
            set([f.column.name for f in reflected_c.foreign_keys])
        )
        if c.server_default:
            assert isinstance(reflected_c.server_default,
                              schema.FetchedValue)

    assert len(table.primary_key) == len(reflected_table.primary_key)
    for c in table.primary_key:
        assert reflected_table.primary_key.columns[c.name] is not None
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def with_variant(self, type_, dialect_name):
    """Produce a new type object that will utilize the given
    type when applied to the dialect of the given name.

    e.g.::

        from sqlalchemy.types import String
        from sqlalchemy.dialects import mysql

        s = String()

        s = s.with_variant(mysql.VARCHAR(collation='foo'), 'mysql')

    The construction of :meth:`.TypeEngine.with_variant` is always
    from the "fallback" type to that which is dialect specific.
    The returned type is an instance of :class:`.Variant`, which
    itself provides a :meth:`.Variant.with_variant`
    that can be called repeatedly.

    :param type_: a :class:`.TypeEngine` that will be selected
     as a variant from the originating type, when a dialect
     of the given name is in use.
    :param dialect_name: base name of the dialect which uses
     this type. (i.e. ``'postgresql'``, ``'mysql'``, etc.)

    .. versionadded:: 0.7.2

    """
    return Variant(self, {dialect_name: to_instance(type_)})
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def bind_processor(self, dialect):
    """Return a converter for bound values, or defer to the parent type.

    When ``dialect._cx_oracle_with_unicode`` is true, the returned
    callable passes ``None`` through unchanged and coerces every other
    value with ``unicode()``.  Otherwise the ``bind_processor`` that
    follows ``_NativeUnicodeMixin`` in the MRO is used.

    :param dialect: the dialect in use; only its
     ``_cx_oracle_with_unicode`` flag is read here.
    :return: a one-argument callable, or whatever the parent
     ``bind_processor`` returns.
    """
    if dialect._cx_oracle_with_unicode:
        def process(value):
            # None is passed through untouched; anything else is coerced.
            if value is None:
                return value
            else:
                return unicode(value)
        return process
    else:
        return super(
            _NativeUnicodeMixin, self).bind_processor(dialect)

    # we apply a connection output handler that returns
    # unicode in all cases, so the "native_unicode" flag
    # will be set for the default String.result_processor.