我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.dialects.postgresql.ARRAY。
def table(): meta = sa.MetaData() post = sa.Table( 'post', meta, sa.Column('id', sa.Integer, nullable=False), sa.Column('title', sa.String(200), nullable=False), sa.Column('body', sa.Text, nullable=False), sa.Column('views', sa.Integer, nullable=False), sa.Column('average_note', sa.Float, nullable=False), sa.Column('pictures', postgresql.JSON, server_default='{}'), sa.Column('published_at', sa.Date, nullable=False), sa.Column('tags', postgresql.ARRAY(sa.Integer), server_default='[]'), # Indexes # sa.PrimaryKeyConstraint('id', name='post_id_pkey')) return post
def User(Base): class User(Base): __tablename__ = 'user' id = sa.Column('_id', sa.Integer, primary_key=True) name = sa.Column('_name', sa.String(20)) age = sa.Column('_age', sa.Integer, nullable=False) email = sa.Column( '_email', sa.String(200), nullable=False, unique=True ) fav_numbers = sa.Column('_fav_numbers', ARRAY(sa.Integer)) __table_args__ = ( sa.CheckConstraint(sa.and_(age >= 0, age <= 150)), sa.CheckConstraint( sa.and_( sa.func.array_length(fav_numbers, 1) <= 8 ) ) ) return User
def upgrade(): op.create_table('icdcm', # Meta sa.Column('meta_id', sa.Text, unique=True), sa.Column('meta_source', sa.Text), sa.Column('meta_created', sa.DateTime(timezone=True)), sa.Column('meta_updated', sa.DateTime(timezone=True)), # General sa.Column('name', sa.Text, primary_key=True), sa.Column('desc', sa.Text), sa.Column('terms', ARRAY(sa.Text)), sa.Column('version', sa.Text), sa.Column('last_updated', sa.Date), )
def upgrade(): op.add_column('task', sa.Column(field, postgresql.ARRAY(sa.Integer)))
def upgrade(): op.create_table( 'result', sa.Column('id', sa.Integer, primary_key=True), sa.Column('created', sa.Text, default=make_timestamp), sa.Column('project_id', sa.Integer, sa.ForeignKey('project.id'), nullable=False), sa.Column('task_id', sa.Integer, sa.ForeignKey('task.id'), nullable=False), sa.Column('task_run_ids', ARRAY(sa.Integer), nullable=False), sa.Column('last_version', sa.Boolean, default=True), sa.Column('info', JSON) )
def upgrade(): context = op.get_context() connection = op.get_bind() op.create_table('message_blacklist', sa.Column('id', sa.Integer(), nullable=False), sa.Column('login_id', sa.BigInteger(), nullable=False), sa.Column('blacklist', postgresql.ARRAY(sa.Integer)), sa.ForeignKeyConstraint(['login_id'], ['login.id'], ondelete='CASCADE', name="ref_message_blacklist_login_id_to_login"), sa.PrimaryKeyConstraint('id') ) op.create_index(op.f('idx_message_blacklist_login_id'), 'message_blacklist', ['login_id'], unique=True)
def Any(other, arrexpr, operator=operators.eq): """A synonym for the :meth:`.ARRAY.Comparator.any` method. This method is legacy and is here for backwards-compatibility. .. seealso:: :func:`.expression.any_` """ return arrexpr.any(other, operator)
def All(other, arrexpr, operator=operators.eq): """A synonym for the :meth:`.ARRAY.Comparator.all` method. This method is legacy and is here for backwards-compatibility. .. seealso:: :func:`.expression.all_` """ return arrexpr.all(other, operator)
def __init__(self, clauses, **kw): super(array, self).__init__(*clauses, **kw) self.type = ARRAY(self.type)
def upgrade(): op.add_column('bookmark', sa.Column('tags', postgresql.ARRAY(sa.Text()), nullable=True))
def upgrade(): ### commands auto generated by Alembic - please adjust! ### op.create_table('rule', sa.Column('id', sa.Integer(), nullable=False), sa.Column('type', sa.String(), nullable=False), sa.Column('name', sa.String(), nullable=False), sa.Column('action', sa.Enum('added', 'removed', 'both', name='rule_actions'), nullable=False), sa.Column('alerters', postgresql.ARRAY(sa.String()), nullable=False), sa.Column('config', postgresql.JSONB(), nullable=True), sa.PrimaryKeyConstraint('id') ) ### end Alembic commands ###
def build_trafaret(sa_type, **kwargs): if isinstance(sa_type, sa.sql.sqltypes.Enum): trafaret = t.Enum(*sa_type.enums, **kwargs) # check for Text should be before String elif isinstance(sa_type, sa.sql.sqltypes.Text): trafaret = t.String(**kwargs) elif isinstance(sa_type, sa.sql.sqltypes.String): trafaret = t.String(max_length=sa_type.length, **kwargs) elif isinstance(sa_type, sa.sql.sqltypes.Integer): trafaret = t.Int(**kwargs) elif isinstance(sa_type, sa.sql.sqltypes.Float): trafaret = t.Float(**kwargs) elif isinstance(sa_type, sa.sql.sqltypes.DateTime): trafaret = DateTime(**kwargs) # RFC3339 elif isinstance(sa_type, sa.sql.sqltypes.Date): trafaret = DateTime(**kwargs) # RFC3339 elif isinstance(sa_type, sa.sql.sqltypes.Boolean): trafaret = t.StrBool(**kwargs) # Add PG related JSON and ARRAY elif isinstance(sa_type, postgresql.JSON): trafaret = AnyDict | t.List(AnyDict) # Add PG related JSON and ARRAY elif isinstance(sa_type, postgresql.ARRAY): item_trafaret = build_trafaret(sa_type.item_type) trafaret = t.List(item_trafaret) else: type_ = str(sa_type) msg = 'Validator for type {} not implemented'.format(type_) raise NotImplementedError(msg) return trafaret
def upgrade(): # ### commands auto generated by Alembic - please adjust! ### op.add_column( 'identity', sa.Column( 'scopes', postgresql.ARRAY( sa.String( length=64)), nullable=True)) # ### end Alembic commands ###
def upgrade(): ### commands auto generated by Alembic - please adjust! ### op.create_table('draft', sa.Column('id', sa.Integer(), nullable=False), sa.Column('user_id', sa.Integer(), nullable=False), sa.Column('link_ids', postgresql.ARRAY(sa.Integer()), nullable=True), sa.Column('created_at', sa.DateTime(), nullable=True), sa.Column('updated_at', sa.DateTime(), nullable=True), sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('user_id', name='ux_draft_user') ) ### end Alembic commands ###
def test_get_field_type_can_map_pg_array_column(self): column = sqa.Column(postgresql.ARRAY(item_type=sqa.Integer())) field = field_mapping.get_field_type(column) self.assertTrue(issubclass(field, fields.ListField)) self.assertIsInstance(field().child, fields.IntegerField)
def test_get_field_type_pg_array_column_raises_when_item_type_not_found(self): class DummyType(object): python_type = None column = sqa.Column(postgresql.ARRAY(item_type=DummyType)) with self.assertRaises(KeyError): field_mapping.get_field_type(column)
def get_field_type(column): """ Returns the field type to be used determined by the sqlalchemy column type or the column type's python type """ if isinstance(column.type, sqltypes.Enum) and not column.type.enum_class: return fields.ChoiceField if isinstance(column.type, postgresql.ARRAY): child_field = SERIALIZER_FIELD_MAPPING.get(column.type.item_type.__class__ ) or SERIALIZER_FIELD_MAPPING.get(column.type.item_type.python_type) if child_field is None: raise KeyError("Could not figure out field for ARRAY item type '{}'".format(column.type.__class__)) class ArrayField(fields.ListField): """Nested array field for PostreSQL's ARRAY type""" def __init__(self, *args, **kwargs): kwargs['child'] = child_field() super(ArrayField, self).__init__(*args, **kwargs) return ArrayField if column.type.__class__ in SERIALIZER_FIELD_MAPPING: return SERIALIZER_FIELD_MAPPING.get(column.type.__class__) if issubclass(column.type.python_type, bool): return fields.NullBooleanField if column.nullable else fields.BooleanField return SERIALIZER_FIELD_MAPPING.get(column.type.python_type)
def ancestor_of(self, other): if isinstance(other, list): return self.op('@>')(expression.cast(other, ARRAY(LtreeType))) else: return self.op('@>')(other)
def descendant_of(self, other): if isinstance(other, list): return self.op('<@')(expression.cast(other, ARRAY(LtreeType))) else: return self.op('<@')(other)
def lquery(self, other): if isinstance(other, list): return self.op('?')(expression.cast(other, ARRAY(LQUERY))) else: return self.op('~')(other)
def _repeated_value(type_): if isinstance(type_, ARRAY): if isinstance(type_.item_type, sa.Integer): return [0] elif isinstance(type_.item_type, sa.String): return [u'a'] elif isinstance(type_.item_type, sa.Numeric): return [Decimal('0')] else: raise TypeError('Unknown array item type') else: return u'a'
def _expected_exception(type_): if isinstance(type_, ARRAY): return IntegrityError else: return DataError
def __init__(self, arg, default=None, **kw): self.type = postgresql.ARRAY(arg.type) self.default = default GenericFunction.__init__(self, arg, **kw)
def test_type(self): assert isinstance( sa.func.array_agg(sa.text('u.name')).type, postgresql.ARRAY )
def test_array_agg_with_default(self): Base = sa.ext.declarative.declarative_base() class Article(Base): __tablename__ = 'article' id = sa.Column(sa.Integer, primary_key=True) assert str(sa.func.array_agg(Article.id, [1]).compile( dialect=postgresql.dialect() )) == ( 'coalesce(array_agg(article.id), CAST(ARRAY[%(param_1)s]' ' AS INTEGER[]))' )
def upgrade(): op.create_table('trials', sa.Column('uuid', UUID, primary_key=True), sa.Column('updated', sa.DateTime(timezone=True), nullable=False), sa.Column('records', ARRAY(sa.Text), nullable=False, unique=True), sa.Column('nct_id', sa.Text, unique=True), sa.Column('euctr_id', sa.Text, unique=True), sa.Column('isrctn_id', sa.Text, unique=True), sa.Column('scientific_title', sa.Text, unique=True), )
def downgrade(): op.create_table('trials', sa.Column('uuid', UUID, primary_key=True), sa.Column('updated', sa.DateTime(timezone=True), nullable=False), sa.Column('records', ARRAY(sa.Text), nullable=False, unique=True), sa.Column('nct_id', sa.Text, unique=True), sa.Column('euctr_id', sa.Text, unique=True), sa.Column('isrctn_id', sa.Text, unique=True), sa.Column('scientific_title', sa.Text, unique=True), )
def upgrade(): op.create_table('ictrp', # Meta sa.Column('meta_uuid', sa.Text), sa.Column('meta_source', sa.Text), sa.Column('meta_created', sa.DateTime(timezone=True)), sa.Column('meta_updated', sa.DateTime(timezone=True)), # Main sa.Column('register', sa.Text, primary_key=True), sa.Column('last_refreshed_on', sa.Date), sa.Column('main_id', sa.Text, primary_key=True), sa.Column('date_of_registration', sa.Text), sa.Column('primary_sponsor', sa.Text), sa.Column('public_title', sa.Text), sa.Column('scientific_title', sa.Text), sa.Column('date_of_first_enrollment', sa.Text), sa.Column('target_sample_size', sa.Integer), sa.Column('recruitment_status', sa.Text), sa.Column('url', sa.Text), sa.Column('study_type', sa.Text), sa.Column('study_design', sa.Text), sa.Column('study_phase', sa.Text), # Additional sa.Column('countries_of_recruitment', ARRAY(sa.Text)), sa.Column('contacts', JSONB), sa.Column('key_inclusion_exclusion_criteria', sa.Text), sa.Column('health_conditions_or_problems_studied', ARRAY(sa.Text)), sa.Column('interventions', ARRAY(sa.Text)), sa.Column('primary_outcomes', ARRAY(sa.Text)), sa.Column('secondary_outcomes', ARRAY(sa.Text)), sa.Column('secondary_ids', ARRAY(sa.Text)), sa.Column('sources_of_monetary_support', ARRAY(sa.Text)), sa.Column('secondary_sponsors', ARRAY(sa.Text)), )
def __init__(self, field=None, **params): super(Array, self).__init__(**params) if field is None: field = Text() self.__field = field self.__column_type = ARRAY(field.column_type)
def upgrade(): ### commands auto generated by Alembic - please adjust! ### op.add_column('exercises', sa.Column('features', postgresql.ARRAY(sa.Integer()), nullable=True)) op.add_column('exercises', sa.Column('forest_name', sa.String(), nullable=True)) ### end Alembic commands ###
def upgrade(): ### commands auto generated by Alembic - please adjust! ### op.add_column('exercises', sa.Column('vocab', postgresql.ARRAY(sa.String()), nullable=True)) ### end Alembic commands ###