Python sqlalchemy.types 模块,Integer() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.types.Integer()

项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def get_column_specification(self, column, override_pk=False, **kwargs):
        colspec = column.name
        if column.primary_key and isinstance(column.type, types.Integer) and (column.default is None or (isinstance(column.default, schema.Sequence) and column.default.optional)):
            colspec += " SERIAL"
        else:
            colspec += " " + column.type.get_col_spec()
            default = self.get_column_default_string(column)
            if default is not None:
                colspec += " DEFAULT " + default

        if not column.nullable:
            colspec += " NOT NULL"
        if column.primary_key and not override_pk:
            colspec += " PRIMARY KEY"
        if column.foreign_key:
            colspec += " REFERENCES %s(%s)" % (column.column.foreign_key.column.table.fullname, column.column.foreign_key.column.name) 
        return colspec
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def get_column_specification(self, column, override_pk=False, first_pk=False):
        colspec = column.name + " " + column.type.get_col_spec()
        default = self.get_column_default_string(column)
        if default is not None:
            colspec += " DEFAULT " + default

        if not column.nullable:
            colspec += " NOT NULL"
        if column.primary_key:
            if not override_pk:
                colspec += " PRIMARY KEY"
            if first_pk and isinstance(column.type, types.Integer):
                colspec += " AUTO_INCREMENT"
        if column.foreign_key:
            colspec += ", FOREIGN KEY (%s) REFERENCES %s(%s)" % (column.name, column.column.foreign_key.column.table.name, column.column.foreign_key.column.name) 
        return colspec
项目:osm-wikidata    作者:EdwardBetts    | 项目源码 | 文件源码
def url(self):
        return '{}/{}/{}'.format(osm_api_base, self.osm_type, self.osm_id)

# class ItemCandidateTag(Base):
#     __tablename__ = 'item_candidate_tag'
#     __table_args__ = (
#         ForeignKeyConstraint(['item_id', 'osm_id', 'osm_type'],
#                              [ItemCandidate.item_id,
#                               ItemCandidate.osm_id,
#                               ItemCandidate.osm_type]),
#     )
#
#     item_id = Column(Integer, primary_key=True)
#     osm_id = Column(BigInteger, primary_key=True)
#     osm_type = Column(osm_type_enum, primary_key=True)
#     k = Column(String, primary_key=True)
#     v = Column(String, primary_key=True)
#
#     item_candidate = relationship(ItemCandidate,
#                                   backref=backref('tag_table', lazy='dynamic'))
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def get_column_specification(self, column, **kw):
        colspec = self.preparer.format_column(column)
        first = None
        if column.primary_key and column.autoincrement:
            try:
                first = [c for c in column.table.primary_key.columns
                         if (c.autoincrement and
                             isinstance(c.type, sqltypes.Integer) and
                             not c.foreign_keys)].pop(0)
            except IndexError:
                pass

        if column is first:
            colspec += " SERIAL"
        else:
            colspec += " " + self.dialect.type_compiler.process(column.type)
            default = self.get_column_default_string(column)
            if default is not None:
                colspec += " DEFAULT " + default

        if not column.nullable:
            colspec += " NOT NULL"

        return colspec
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def get_column_specification(self, column, **kwargs):
        coltype = self.dialect.type_compiler.process(column.type)
        colspec = self.preparer.format_column(column) + " " + coltype
        default = self.get_column_default_string(column)
        if default is not None:
            colspec += " DEFAULT " + default

        if not column.nullable:
            colspec += " NOT NULL"

        if (column.primary_key and
            column.table.kwargs.get('sqlite_autoincrement', False) and
            len(column.table.primary_key.columns) == 1 and
            issubclass(column.type._type_affinity, sqltypes.Integer) and
            not column.foreign_keys):
                colspec += " PRIMARY KEY AUTOINCREMENT"

        return colspec
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def _expression_adaptations(self):
        return {
            operators.add: {
                Integer: self.__class__,
                Interval: DateTime,
                Time: DateTime,
            },
            operators.sub: {
                # date - integer = date
                Integer: self.__class__,

                # date - date = integer.
                Date: Integer,

                Interval: DateTime,

                # date - datetime = interval,
                # this one is not in the PG docs
                # but works
                DateTime: Interval,
            },
        }
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def visit_column(self, delta):
        table = delta.table
        colspec = self.get_column_specification(delta.result_column)
        if delta.result_column.autoincrement:
            primary_keys = [c for c in table.primary_key.columns
                       if (c.autoincrement and
                            isinstance(c.type, sqltypes.Integer) and
                            not c.foreign_keys)]

            if primary_keys:
                first = primary_keys.pop(0)
                if first.name == delta.current_name:
                    colspec += " AUTO_INCREMENT"
        old_col_name = self.preparer.quote(delta.current_name, table.quote)

        self.start_alter_table(table)

        self.append("CHANGE COLUMN %s " % old_col_name)
        self.append(colspec)
        self.execute()
项目:asyncpgsa    作者:CanopyTax    | 项目源码 | 文件源码
def test_querying_table(metadata):
    """
    Create an object for test table.

    """

    # When using pytest-xdist, we don't want concurrent table creations
    # across test processes so we assign a unique name for table based on
    # the current worker id.
    worker_id = os.environ.get('PYTEST_XDIST_WORKER', 'master')
    return Table(
        'test_querying_table_' + worker_id, metadata,
        Column('id', types.Integer, autoincrement=True, primary_key=True),
        Column('t_string', types.String(60)),
        Column('t_list', types.ARRAY(types.String(60))),
        Column('t_enum', types.Enum(MyEnum)),
        Column('t_int_enum', types.Enum(MyIntEnum)),
        Column('t_datetime', types.DateTime()),
        Column('t_date', types.DateTime()),
        Column('t_interval', types.Interval()),
        Column('uniq_uuid', PG_UUID, nullable=False, unique=True, default=uuid4),
    )
项目:Hawkeye    作者:tozhengxq    | 项目源码 | 文件源码
def visit_column(self, delta):
        table = delta.table
        colspec = self.get_column_specification(delta.result_column)
        if delta.result_column.autoincrement:
            primary_keys = [c for c in table.primary_key.columns
                       if (c.autoincrement and
                            isinstance(c.type, sqltypes.Integer) and
                            not c.foreign_keys)]

            if primary_keys:
                first = primary_keys.pop(0)
                if first.name == delta.current_name:
                    colspec += " AUTO_INCREMENT"
        q = util.safe_quote(table)
        old_col_name = self.preparer.quote(delta.current_name, q)

        self.start_alter_table(table)

        self.append("CHANGE COLUMN %s " % old_col_name)
        self.append(colspec)
        self.execute()
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def define_tables(cls, metadata):
        Table('test_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('data', String(50))
              )
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_nullable_reflection(self):
        t = Table('t', self.metadata,
                  Column('a', Integer, nullable=True),
                  Column('b', Integer, nullable=False))
        t.create()
        eq_(
            dict(
                (col['name'], col['nullable'])
                for col in inspect(self.metadata.bind).get_columns('t')
            ),
            {"a": True, "b": False}
        )
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def compare_server_default(self, inspector_column,
                               metadata_column,
                               rendered_metadata_default,
                               rendered_inspector_default):
        # partially a workaround for SQLAlchemy issue #3023; if the
        # column were created without "NOT NULL", MySQL may have added
        # an implicit default of '0' which we need to skip
        if metadata_column.type._type_affinity is sqltypes.Integer and \
            inspector_column.primary_key and \
                not inspector_column.autoincrement and \
                not rendered_metadata_default and \
                rendered_inspector_default == "'0'":
            return False
        else:
            return rendered_inspector_default != rendered_metadata_default
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def check_constraint(self, name, source, condition, schema=None, **kw):
        t = sa_schema.Table(source, self.metadata(),
                            sa_schema.Column('x', Integer), schema=schema)
        ck = sa_schema.CheckConstraint(condition, name=name, **kw)
        t.append_constraint(ck)
        return ck
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def get_column_default(self, column):
        if column.primary_key and isinstance(column.type, types.Integer) and (column.default is None or (isinstance(column.default, schema.Sequence) and column.default.optional)):
            c = self.proxy("select nextval('%s_%s_seq')" % (column.table.name, column.name))
            return c.fetchone()[0]
        else:
            return ansisql.ANSIDefaultRunner.get_column_default(self, column)
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def _oid_col(self):
        # OID remains a little hackish so far
        if not hasattr(self, '_oid_column'):
            if self.table.engine.oid_column_name() is not None:
                self._oid_column = schema.Column(self.table.engine.oid_column_name(), sqltypes.Integer, hidden=True)
                self._oid_column._set_parent(self.table)
            else:
                self._oid_column = None
        return self._oid_column
项目:scrobbler    作者:hatarist    | 项目源码 | 文件源码
def tag(name=None):
    sort_by = request.args.get('sort_by')

    if sort_by == 'play_count':
        sort_by = [Artist.playcount.desc()]
    elif sort_by == 'local_play_count':
        sort_by = [Artist.local_playcount.desc()]
    elif sort_by == 'tag_strength':
        sort_by = [desc('strength')]
    else:
        sort_by = [desc('strength'), Artist.local_playcount.desc()]

    name = name.lower()

    top_artists = (
        db.session.query(Artist.name, Artist.tags[name].cast(Integer).label('strength'), Artist.local_playcount, Artist.playcount)
        .filter(Artist.tags.has_key(name))
        .order_by(*sort_by)
        .all()
    )

    top_artists = enumerate(top_artists, start=1)

    return render_template(
        'meta/tag.html',
        tag=tag,
        top_artists=top_artists,
    )
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def define_tables(cls, metadata):
        Table('test_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('data', String(50))
              )
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def test_nullable_reflection(self):
        t = Table('t', self.metadata,
                  Column('a', Integer, nullable=True),
                  Column('b', Integer, nullable=False))
        t.create()
        eq_(
            dict(
                (col['name'], col['nullable'])
                for col in inspect(self.metadata.bind).get_columns('t')
            ),
            {"a": True, "b": False}
        )
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def visit_typeclause(self, typeclause):
        type_ = typeclause.type.dialect_impl(self.dialect)
        if isinstance(type_, sqltypes.Integer):
            return 'INTEGER'
        else:
            return super(DrizzleCompiler, self).visit_typeclause(typeclause)
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def define_tables(cls, metadata):
        Table('test_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('data', String(50))
              )
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def test_nullable_reflection(self):
        t = Table('t', self.metadata,
                  Column('a', Integer, nullable=True),
                  Column('b', Integer, nullable=False))
        t.create()
        eq_(
            dict(
                (col['name'], col['nullable'])
                for col in inspect(self.metadata.bind).get_columns('t')
            ),
            {"a": True, "b": False}
        )
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def _check_constraint(self, name, source, condition, schema=None, **kw):
        t = sa_schema.Table(source, sa_schema.MetaData(),
                    sa_schema.Column('x', Integer), schema=schema)
        ck = sa_schema.CheckConstraint(condition, name=name, **kw)
        t.append_constraint(ck)
        return ck
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def bulk_insert(self, table, rows):
        """Issue a "bulk insert" operation using the current
        migration context.

        This provides a means of representing an INSERT of multiple rows
        which works equally well in the context of executing on a live
        connection as well as that of generating a SQL script.   In the
        case of a SQL script, the values are rendered inline into the
        statement.

        e.g.::

            from alembic import op
            from datetime import date
            from sqlalchemy.sql import table, column
            from sqlalchemy import String, Integer, Date

            # Create an ad-hoc table to use for the insert statement.
            accounts_table = table('account',
                column('id', Integer),
                column('name', String),
                column('create_date', Date)
            )

            op.bulk_insert(accounts_table,
                [
                    {'id':1, 'name':'John Smith',
                            'create_date':date(2010, 10, 5)},
                    {'id':2, 'name':'Ed Williams',
                            'create_date':date(2007, 5, 27)},
                    {'id':3, 'name':'Wendy Jones',
                            'create_date':date(2008, 8, 15)},
                ]
            )
          """
        self.impl.bulk_insert(table, rows)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def visit_typeclause(self, typeclause):
        type_ = typeclause.type.dialect_impl(self.dialect)
        if isinstance(type_, sqltypes.Integer):
            return 'INTEGER'
        else:
            return super(DrizzleCompiler, self).visit_typeclause(typeclause)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def define_tables(cls, metadata):
        Table('test_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('data', String(50))
              )
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_nullable_reflection(self):
        t = Table('t', self.metadata,
                  Column('a', Integer, nullable=True),
                  Column('b', Integer, nullable=False))
        t.create()
        eq_(
            dict(
                (col['name'], col['nullable'])
                for col in inspect(self.metadata.bind).get_columns('t')
            ),
            {"a": True, "b": False}
        )
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def define_tables(cls, metadata):
        Table('test_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('data', String(50))
              )
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def test_nullable_reflection(self):
        t = Table('t', self.metadata,
                  Column('a', Integer, nullable=True),
                  Column('b', Integer, nullable=False))
        t.create()
        eq_(
            dict(
                (col['name'], col['nullable'])
                for col in inspect(self.metadata.bind).get_columns('t')
            ),
            {"a": True, "b": False}
        )
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def compare_server_default(self, inspector_column,
                               metadata_column,
                               rendered_metadata_default,
                               rendered_inspector_default):
        # partially a workaround for SQLAlchemy issue #3023; if the
        # column were created without "NOT NULL", MySQL may have added
        # an implicit default of '0' which we need to skip
        if metadata_column.type._type_affinity is sqltypes.Integer and \
            inspector_column.primary_key and \
                not inspector_column.autoincrement and \
                not rendered_metadata_default and \
                rendered_inspector_default == "'0'":
            return False
        else:
            return rendered_inspector_default != rendered_metadata_default
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def check_constraint(self, name, source, condition, schema=None, **kw):
        t = sa_schema.Table(source, self.metadata(),
                            sa_schema.Column('x', Integer), schema=schema)
        ck = sa_schema.CheckConstraint(condition, name=name, **kw)
        t.append_constraint(ck)
        return ck
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def define_tables(cls, metadata):
        Table('test_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('data', String(50))
              )
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def test_nullable_reflection(self):
        t = Table('t', self.metadata,
                  Column('a', Integer, nullable=True),
                  Column('b', Integer, nullable=False))
        t.create()
        eq_(
            dict(
                (col['name'], col['nullable'])
                for col in inspect(self.metadata.bind).get_columns('t')
            ),
            {"a": True, "b": False}
        )
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def define_tables(cls, metadata):
        Table(
            quoted_name('t1', quote=True), metadata,
            Column('id', Integer, primary_key=True),
        )
        Table(
            quoted_name('t2', quote=True), metadata,
            Column('id', Integer, primary_key=True),
            Column('t1id', ForeignKey('t1.id'))
        )
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def _adapt_expression(self, op, other_comparator):
            """evaluate the return type of <self> <op> <othertype>,
            and apply any adaptations to the given operator.

            This method determines the type of a resulting binary expression
            given two source types and an operator.   For example, two
            :class:`.Column` objects, both of the type :class:`.Integer`, will
            produce a :class:`.BinaryExpression` that also has the type
            :class:`.Integer` when compared via the addition (``+``) operator.
            However, using the addition operator with an :class:`.Integer`
            and a :class:`.Date` object will produce a :class:`.Date`, assuming
            "days delta" behavior by the database (in reality, most databases
            other than PostgreSQL don't accept this particular operation).

            The method returns a tuple of the form <operator>, <type>.
            The resulting operator and type will be those applied to the
            resulting :class:`.BinaryExpression` as the final operator and the
            right-hand side of the expression.

            Note that only a subset of operators make usage of
            :meth:`._adapt_expression`,
            including math operators and user-defined operators, but not
            boolean comparison or special SQL keywords like MATCH or BETWEEN.

            """
            return op, self.type
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def compare_server_default(self, inspector_column,
                               metadata_column,
                               rendered_metadata_default,
                               rendered_inspector_default):
        # partially a workaround for SQLAlchemy issue #3023; if the
        # column were created without "NOT NULL", MySQL may have added
        # an implicit default of '0' which we need to skip
        if metadata_column.type._type_affinity is sqltypes.Integer and \
            inspector_column.primary_key and \
                not inspector_column.autoincrement and \
                not rendered_metadata_default and \
                rendered_inspector_default == "'0'":
            return False
        else:
            return rendered_inspector_default != rendered_metadata_default
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def check_constraint(self, name, source, condition, schema=None, **kw):
        t = sa_schema.Table(source, self.metadata(),
                            sa_schema.Column('x', Integer), schema=schema)
        ck = sa_schema.CheckConstraint(condition, name=name, **kw)
        t.append_constraint(ck)
        return ck
项目:ckanext-requestdata    作者:ViderumGlobal    | 项目源码 | 文件源码
def define_request_data_counters_table():
    global request_data_counters_table

    request_data_counters_table = Table('ckanext_requestdata_counters',
                                        metadata,
                                        Column('id', types.UnicodeText,
                                               primary_key=True,
                                               default=make_uuid),
                                        Column('package_id',
                                               types.UnicodeText),
                                        Column('org_id', types.UnicodeText),
                                        Column('requests', types.Integer,
                                               default=0),
                                        Column('replied', types.Integer,
                                               default=0),
                                        Column('declined', types.Integer,
                                               default=0),
                                        Column('shared', types.Integer,
                                               default=0),
                                        Index('ckanext_requestdata_counters_'
                                              'id_idx', 'id'))

    mapper(
        ckanextRequestDataCounters,
        request_data_counters_table
    )
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def get_column_default_string(self, column):
        if (isinstance(column.server_default, schema.DefaultClause) and
            isinstance(column.server_default.arg, basestring)):
                if isinstance(column.type, (sqltypes.Integer, sqltypes.Numeric)):
                    return self.sql_compiler.process(text(column.server_default.arg))

        return super(InfoDDLCompiler, self).get_column_default_string(column)

    ### Informix wants the constraint name at the end, hence this ist c&p from sql/compiler.py
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def visit_primary_key_constraint(self, constraint):
        # for columns with sqlite_autoincrement=True,
        # the PRIMARY KEY constraint can only be inline
        # with the column itself.
        if len(constraint.columns) == 1:
            c = list(constraint)[0]
            if c.primary_key and \
                c.table.kwargs.get('sqlite_autoincrement', False) and \
                issubclass(c.type._type_affinity, sqltypes.Integer) and \
                not c.foreign_keys:
                return None

        return super(SQLiteDDLCompiler, self).\
                    visit_primary_key_constraint(constraint)
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def visit_typeclause(self, typeclause):
        type_ = typeclause.type.dialect_impl(self.dialect)
        if isinstance(type_, sqltypes.Integer):
            return 'INTEGER'
        else:
            return super(DrizzleCompiler, self).visit_typeclause(typeclause)
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def _expression_adaptations(self):
        return {
            operators.mul: {
                Interval: Interval,
                Numeric: self.__class__,
                Integer: self.__class__,
            },
            # Py2K
            operators.div: {
                Numeric: self.__class__,
                Integer: self.__class__,
            },
            # end Py2K
            operators.truediv: {
                Numeric: self.__class__,
                Integer: self.__class__,
            },
            operators.add: {
                Numeric: self.__class__,
                Integer: self.__class__,
            },
            operators.sub: {
                Numeric: self.__class__,
                Integer: self.__class__,
            }
        }
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def define_tables(cls, metadata):
        Table('test_table', metadata,
                Column('id', Integer, primary_key=True),
                Column('data', String(50))
            )
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def _check_constraint(self, name, source, condition, schema=None, **kw):
        t = sa_schema.Table(source, sa_schema.MetaData(),
                    sa_schema.Column('x', Integer), schema=schema)
        ck = sa_schema.CheckConstraint(condition, name=name, **kw)
        t.append_constraint(ck)
        return ck
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def bulk_insert(self, table, rows):
        """Issue a "bulk insert" operation using the current
        migration context.

        This provides a means of representing an INSERT of multiple rows
        which works equally well in the context of executing on a live
        connection as well as that of generating a SQL script.   In the
        case of a SQL script, the values are rendered inline into the
        statement.

        e.g.::

            from alembic import op
            from datetime import date
            from sqlalchemy.sql import table, column
            from sqlalchemy import String, Integer, Date

            # Create an ad-hoc table to use for the insert statement.
            accounts_table = table('account',
                column('id', Integer),
                column('name', String),
                column('create_date', Date)
            )

            op.bulk_insert(accounts_table,
                [
                    {'id':1, 'name':'John Smith',
                            'create_date':date(2010, 10, 5)},
                    {'id':2, 'name':'Ed Williams',
                            'create_date':date(2007, 5, 27)},
                    {'id':3, 'name':'Wendy Jones',
                            'create_date':date(2008, 8, 15)},
                ]
            )
          """
        self.impl.bulk_insert(table, rows)
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def define_tables(cls, metadata):
        Table('test_table', metadata,
              Column('id', Integer, primary_key=True),
              Column('data', String(50))
              )
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def test_nullable_reflection(self):
        t = Table('t', self.metadata,
                  Column('a', Integer, nullable=True),
                  Column('b', Integer, nullable=False))
        t.create()
        eq_(
            dict(
                (col['name'], col['nullable'])
                for col in inspect(self.metadata.bind).get_columns('t')
            ),
            {"a": True, "b": False}
        )
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def _adapt_expression(self, op, other_comparator):
            """evaluate the return type of <self> <op> <othertype>,
            and apply any adaptations to the given operator.

            This method determines the type of a resulting binary expression
            given two source types and an operator.   For example, two
            :class:`.Column` objects, both of the type :class:`.Integer`, will
            produce a :class:`.BinaryExpression` that also has the type
            :class:`.Integer` when compared via the addition (``+``) operator.
            However, using the addition operator with an :class:`.Integer`
            and a :class:`.Date` object will produce a :class:`.Date`, assuming
            "days delta" behavior by the database (in reality, most databases
            other than PostgreSQL don't accept this particular operation).

            The method returns a tuple of the form <operator>, <type>.
            The resulting operator and type will be those applied to the
            resulting :class:`.BinaryExpression` as the final operator and the
            right-hand side of the expression.

            Note that only a subset of operators make usage of
            :meth:`._adapt_expression`,
            including math operators and user-defined operators, but not
            boolean comparison or special SQL keywords like MATCH or BETWEEN.

            """
            return op, self.type
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def compare_server_default(self, inspector_column,
                               metadata_column,
                               rendered_metadata_default,
                               rendered_inspector_default):
        # partially a workaround for SQLAlchemy issue #3023; if the
        # column were created without "NOT NULL", MySQL may have added
        # an implicit default of '0' which we need to skip
        if metadata_column.type._type_affinity is sqltypes.Integer and \
            inspector_column.primary_key and \
                not inspector_column.autoincrement and \
                not rendered_metadata_default and \
                rendered_inspector_default == "'0'":
            return False
        else:
            return rendered_inspector_default != rendered_metadata_default
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def check_constraint(self, name, source, condition, schema=None, **kw):
        t = sa_schema.Table(source, self.metadata(),
                            sa_schema.Column('x', Integer), schema=schema)
        ck = sa_schema.CheckConstraint(condition, name=name, **kw)
        t.append_constraint(ck)
        return ck
项目:quark    作者:openstack    | 项目源码 | 文件源码
def coerce_compared_value(self, op, value):
        # NOTE(mdietz): If left unimplemented, the column is coerced into a
        # string every time, causing the next_auto_assign_increment to be a
        # string concatenation rather than an addition. 'value' in the
        # signature is the "other" value being compared for the purposes of
        # casting.
        if isinstance(value, int):
            return types.Integer()
        return self