我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.types.Integer()。
def get_column_specification(self, column, override_pk=False, **kwargs): colspec = column.name if column.primary_key and isinstance(column.type, types.Integer) and (column.default is None or (isinstance(column.default, schema.Sequence) and column.default.optional)): colspec += " SERIAL" else: colspec += " " + column.type.get_col_spec() default = self.get_column_default_string(column) if default is not None: colspec += " DEFAULT " + default if not column.nullable: colspec += " NOT NULL" if column.primary_key and not override_pk: colspec += " PRIMARY KEY" if column.foreign_key: colspec += " REFERENCES %s(%s)" % (column.column.foreign_key.column.table.fullname, column.column.foreign_key.column.name) return colspec
def get_column_specification(self, column, override_pk=False, first_pk=False): colspec = column.name + " " + column.type.get_col_spec() default = self.get_column_default_string(column) if default is not None: colspec += " DEFAULT " + default if not column.nullable: colspec += " NOT NULL" if column.primary_key: if not override_pk: colspec += " PRIMARY KEY" if first_pk and isinstance(column.type, types.Integer): colspec += " AUTO_INCREMENT" if column.foreign_key: colspec += ", FOREIGN KEY (%s) REFERENCES %s(%s)" % (column.name, column.column.foreign_key.column.table.name, column.column.foreign_key.column.name) return colspec
def url(self): return '{}/{}/{}'.format(osm_api_base, self.osm_type, self.osm_id) # class ItemCandidateTag(Base): # __tablename__ = 'item_candidate_tag' # __table_args__ = ( # ForeignKeyConstraint(['item_id', 'osm_id', 'osm_type'], # [ItemCandidate.item_id, # ItemCandidate.osm_id, # ItemCandidate.osm_type]), # ) # # item_id = Column(Integer, primary_key=True) # osm_id = Column(BigInteger, primary_key=True) # osm_type = Column(osm_type_enum, primary_key=True) # k = Column(String, primary_key=True) # v = Column(String, primary_key=True) # # item_candidate = relationship(ItemCandidate, # backref=backref('tag_table', lazy='dynamic'))
def get_column_specification(self, column, **kw): colspec = self.preparer.format_column(column) first = None if column.primary_key and column.autoincrement: try: first = [c for c in column.table.primary_key.columns if (c.autoincrement and isinstance(c.type, sqltypes.Integer) and not c.foreign_keys)].pop(0) except IndexError: pass if column is first: colspec += " SERIAL" else: colspec += " " + self.dialect.type_compiler.process(column.type) default = self.get_column_default_string(column) if default is not None: colspec += " DEFAULT " + default if not column.nullable: colspec += " NOT NULL" return colspec
def get_column_specification(self, column, **kwargs): coltype = self.dialect.type_compiler.process(column.type) colspec = self.preparer.format_column(column) + " " + coltype default = self.get_column_default_string(column) if default is not None: colspec += " DEFAULT " + default if not column.nullable: colspec += " NOT NULL" if (column.primary_key and column.table.kwargs.get('sqlite_autoincrement', False) and len(column.table.primary_key.columns) == 1 and issubclass(column.type._type_affinity, sqltypes.Integer) and not column.foreign_keys): colspec += " PRIMARY KEY AUTOINCREMENT" return colspec
def _expression_adaptations(self): return { operators.add: { Integer: self.__class__, Interval: DateTime, Time: DateTime, }, operators.sub: { # date - integer = date Integer: self.__class__, # date - date = integer. Date: Integer, Interval: DateTime, # date - datetime = interval, # this one is not in the PG docs # but works DateTime: Interval, }, }
def visit_column(self, delta): table = delta.table colspec = self.get_column_specification(delta.result_column) if delta.result_column.autoincrement: primary_keys = [c for c in table.primary_key.columns if (c.autoincrement and isinstance(c.type, sqltypes.Integer) and not c.foreign_keys)] if primary_keys: first = primary_keys.pop(0) if first.name == delta.current_name: colspec += " AUTO_INCREMENT" old_col_name = self.preparer.quote(delta.current_name, table.quote) self.start_alter_table(table) self.append("CHANGE COLUMN %s " % old_col_name) self.append(colspec) self.execute()
def test_querying_table(metadata): """ Create an object for test table. """ # When using pytest-xdist, we don't want concurrent table creations # across test processes so we assign a unique name for table based on # the current worker id. worker_id = os.environ.get('PYTEST_XDIST_WORKER', 'master') return Table( 'test_querying_table_' + worker_id, metadata, Column('id', types.Integer, autoincrement=True, primary_key=True), Column('t_string', types.String(60)), Column('t_list', types.ARRAY(types.String(60))), Column('t_enum', types.Enum(MyEnum)), Column('t_int_enum', types.Enum(MyIntEnum)), Column('t_datetime', types.DateTime()), Column('t_date', types.DateTime()), Column('t_interval', types.Interval()), Column('uniq_uuid', PG_UUID, nullable=False, unique=True, default=uuid4), )
def visit_column(self, delta): table = delta.table colspec = self.get_column_specification(delta.result_column) if delta.result_column.autoincrement: primary_keys = [c for c in table.primary_key.columns if (c.autoincrement and isinstance(c.type, sqltypes.Integer) and not c.foreign_keys)] if primary_keys: first = primary_keys.pop(0) if first.name == delta.current_name: colspec += " AUTO_INCREMENT" q = util.safe_quote(table) old_col_name = self.preparer.quote(delta.current_name, q) self.start_alter_table(table) self.append("CHANGE COLUMN %s " % old_col_name) self.append(colspec) self.execute()
def define_tables(cls, metadata): Table('test_table', metadata, Column('id', Integer, primary_key=True), Column('data', String(50)) )
def test_nullable_reflection(self): t = Table('t', self.metadata, Column('a', Integer, nullable=True), Column('b', Integer, nullable=False)) t.create() eq_( dict( (col['name'], col['nullable']) for col in inspect(self.metadata.bind).get_columns('t') ), {"a": True, "b": False} )
def compare_server_default(self, inspector_column, metadata_column, rendered_metadata_default, rendered_inspector_default): # partially a workaround for SQLAlchemy issue #3023; if the # column were created without "NOT NULL", MySQL may have added # an implicit default of '0' which we need to skip if metadata_column.type._type_affinity is sqltypes.Integer and \ inspector_column.primary_key and \ not inspector_column.autoincrement and \ not rendered_metadata_default and \ rendered_inspector_default == "'0'": return False else: return rendered_inspector_default != rendered_metadata_default
def check_constraint(self, name, source, condition, schema=None, **kw): t = sa_schema.Table(source, self.metadata(), sa_schema.Column('x', Integer), schema=schema) ck = sa_schema.CheckConstraint(condition, name=name, **kw) t.append_constraint(ck) return ck
def get_column_default(self, column): if column.primary_key and isinstance(column.type, types.Integer) and (column.default is None or (isinstance(column.default, schema.Sequence) and column.default.optional)): c = self.proxy("select nextval('%s_%s_seq')" % (column.table.name, column.name)) return c.fetchone()[0] else: return ansisql.ANSIDefaultRunner.get_column_default(self, column)
def _oid_col(self): # OID remains a little hackish so far if not hasattr(self, '_oid_column'): if self.table.engine.oid_column_name() is not None: self._oid_column = schema.Column(self.table.engine.oid_column_name(), sqltypes.Integer, hidden=True) self._oid_column._set_parent(self.table) else: self._oid_column = None return self._oid_column
def tag(name=None): sort_by = request.args.get('sort_by') if sort_by == 'play_count': sort_by = [Artist.playcount.desc()] elif sort_by == 'local_play_count': sort_by = [Artist.local_playcount.desc()] elif sort_by == 'tag_strength': sort_by = [desc('strength')] else: sort_by = [desc('strength'), Artist.local_playcount.desc()] name = name.lower() top_artists = ( db.session.query(Artist.name, Artist.tags[name].cast(Integer).label('strength'), Artist.local_playcount, Artist.playcount) .filter(Artist.tags.has_key(name)) .order_by(*sort_by) .all() ) top_artists = enumerate(top_artists, start=1) return render_template( 'meta/tag.html', tag=tag, top_artists=top_artists, )
def visit_typeclause(self, typeclause): type_ = typeclause.type.dialect_impl(self.dialect) if isinstance(type_, sqltypes.Integer): return 'INTEGER' else: return super(DrizzleCompiler, self).visit_typeclause(typeclause)
def _check_constraint(self, name, source, condition, schema=None, **kw): t = sa_schema.Table(source, sa_schema.MetaData(), sa_schema.Column('x', Integer), schema=schema) ck = sa_schema.CheckConstraint(condition, name=name, **kw) t.append_constraint(ck) return ck
def bulk_insert(self, table, rows): """Issue a "bulk insert" operation using the current migration context. This provides a means of representing an INSERT of multiple rows which works equally well in the context of executing on a live connection as well as that of generating a SQL script. In the case of a SQL script, the values are rendered inline into the statement. e.g.:: from alembic import op from datetime import date from sqlalchemy.sql import table, column from sqlalchemy import String, Integer, Date # Create an ad-hoc table to use for the insert statement. accounts_table = table('account', column('id', Integer), column('name', String), column('create_date', Date) ) op.bulk_insert(accounts_table, [ {'id':1, 'name':'John Smith', 'create_date':date(2010, 10, 5)}, {'id':2, 'name':'Ed Williams', 'create_date':date(2007, 5, 27)}, {'id':3, 'name':'Wendy Jones', 'create_date':date(2008, 8, 15)}, ] ) """ self.impl.bulk_insert(table, rows)
def define_tables(cls, metadata): Table( quoted_name('t1', quote=True), metadata, Column('id', Integer, primary_key=True), ) Table( quoted_name('t2', quote=True), metadata, Column('id', Integer, primary_key=True), Column('t1id', ForeignKey('t1.id')) )
def _adapt_expression(self, op, other_comparator): """evaluate the return type of <self> <op> <othertype>, and apply any adaptations to the given operator. This method determines the type of a resulting binary expression given two source types and an operator. For example, two :class:`.Column` objects, both of the type :class:`.Integer`, will produce a :class:`.BinaryExpression` that also has the type :class:`.Integer` when compared via the addition (``+``) operator. However, using the addition operator with an :class:`.Integer` and a :class:`.Date` object will produce a :class:`.Date`, assuming "days delta" behavior by the database (in reality, most databases other than PostgreSQL don't accept this particular operation). The method returns a tuple of the form <operator>, <type>. The resulting operator and type will be those applied to the resulting :class:`.BinaryExpression` as the final operator and the right-hand side of the expression. Note that only a subset of operators make usage of :meth:`._adapt_expression`, including math operators and user-defined operators, but not boolean comparison or special SQL keywords like MATCH or BETWEEN. """ return op, self.type
def define_request_data_counters_table(): global request_data_counters_table request_data_counters_table = Table('ckanext_requestdata_counters', metadata, Column('id', types.UnicodeText, primary_key=True, default=make_uuid), Column('package_id', types.UnicodeText), Column('org_id', types.UnicodeText), Column('requests', types.Integer, default=0), Column('replied', types.Integer, default=0), Column('declined', types.Integer, default=0), Column('shared', types.Integer, default=0), Index('ckanext_requestdata_counters_' 'id_idx', 'id')) mapper( ckanextRequestDataCounters, request_data_counters_table )
def get_column_default_string(self, column): if (isinstance(column.server_default, schema.DefaultClause) and isinstance(column.server_default.arg, basestring)): if isinstance(column.type, (sqltypes.Integer, sqltypes.Numeric)): return self.sql_compiler.process(text(column.server_default.arg)) return super(InfoDDLCompiler, self).get_column_default_string(column) ### Informix wants the constraint name at the end, hence this ist c&p from sql/compiler.py
def visit_primary_key_constraint(self, constraint): # for columns with sqlite_autoincrement=True, # the PRIMARY KEY constraint can only be inline # with the column itself. if len(constraint.columns) == 1: c = list(constraint)[0] if c.primary_key and \ c.table.kwargs.get('sqlite_autoincrement', False) and \ issubclass(c.type._type_affinity, sqltypes.Integer) and \ not c.foreign_keys: return None return super(SQLiteDDLCompiler, self).\ visit_primary_key_constraint(constraint)
def _expression_adaptations(self): return { operators.mul: { Interval: Interval, Numeric: self.__class__, Integer: self.__class__, }, # Py2K operators.div: { Numeric: self.__class__, Integer: self.__class__, }, # end Py2K operators.truediv: { Numeric: self.__class__, Integer: self.__class__, }, operators.add: { Numeric: self.__class__, Integer: self.__class__, }, operators.sub: { Numeric: self.__class__, Integer: self.__class__, } }
def coerce_compared_value(self, op, value): # NOTE(mdietz): If left unimplemented, the column is coerced into a # string every time, causing the next_auto_assign_increment to be a # string concatenation rather than an addition. 'value' in the # signature is the "other" value being compared for the purposes of # casting. if isinstance(value, int): return types.Integer() return self