我们从 Python 开源项目中提取了以下 14 个代码示例,用于说明如何使用 sqlalchemy.PickleType()。
def task_table(base):
    """Define the ``Task`` model on top of ``base`` and return the class.

    Args:
        base: Class the ``Task`` model inherits from (expected to be a
            SQLAlchemy declarative base -- confirm against the caller).

    Returns:
        The ``Task`` class, mapped to the table named ``'Task'``.
    """

    class Task(base):
        """Model for the ``'Task'`` table.

        The combination (run_id, step_id, sequence, recovery) is constrained
        to be unique.
        """

        __tablename__ = 'Task'

        # Primary key associated with the 'task_id_seq' sequence.
        id_ = Column(Integer, Sequence('task_id_seq'), primary_key=True)
        run_id = Column(String, default='')
        step_id = Column(String)
        sequence = Column(Integer)
        recovery = Column(Integer, nullable=False)
        pid = Column(Integer, nullable=True)
        status = Column(SQLEnum(TaskStatus))
        # Stored through PickleType, so any picklable Python object fits.
        result = Column(PickleType(), nullable=True)
        created = Column(DateTime(), default=datetime.utcnow)
        updated = Column(DateTime(), nullable=True)

        __table_args__ = (
            UniqueConstraint('run_id', 'step_id', 'sequence', 'recovery'),
        )

        def __repr__(self):
            """Return a debug string listing every column except ``result``."""
            shown_attributes = (
                'id_',
                'run_id',
                'step_id',
                'sequence',
                'recovery',
                'pid',
                'status',
                'created',
                'updated',
            )
            values = tuple(getattr(self, name) for name in shown_attributes)
            return "<Task(id='%s', run_id='%s', step_id='%s', sequence='%s', recovery='%s', pid='%s', status='%s', created='%s', updated='%s')>" % values

    return Task
def upgrade():
    """Create the ``artists``, ``torrents`` and ``albums`` tables.

    ``albums`` is created last because it declares foreign keys that point
    at ``artists.id`` and ``torrents.id``.
    """
    # Short local aliases for the long sqlalchemy_utils type paths.
    uuid_type = sqlalchemy_utils.types.uuid.UUIDType
    ts_vector_type = sqlalchemy_utils.types.ts_vector.TSVectorType
    json_type = sqlalchemy_utils.types.json.JSONType

    op.create_table(
        'artists',
        sa.Column('id', uuid_type(), nullable=False),
        sa.Column('name', sa.String(), nullable=True),
        sa.Column('description', sa.String(), nullable=True),
        sa.Column('search_vector', ts_vector_type(), nullable=True),
        sa.PrimaryKeyConstraint('id'),
    )

    op.create_table(
        'torrents',
        sa.Column('id', sa.String(length=40), nullable=False),
        sa.Column('info', sa.PickleType(), nullable=True),
        sa.PrimaryKeyConstraint('id'),
    )

    op.create_table(
        'albums',
        sa.Column('id', uuid_type(), nullable=False),
        sa.Column('title', sa.String(), nullable=True),
        sa.Column('tracks', json_type(), nullable=True),
        sa.Column('search_vector', ts_vector_type(), nullable=True),
        sa.Column('torrent_id', sa.String(length=40), nullable=True),
        sa.Column('artist_id', uuid_type(), nullable=True),
        sa.ForeignKeyConstraint(['artist_id'], ['artists.id']),
        sa.ForeignKeyConstraint(['torrent_id'], ['torrents.id']),
        sa.PrimaryKeyConstraint('id'),
    )
def downgrade():
    """Drop ``user_id`` and ``role_id`` from ``session_data`` and add ``data``.

    The added ``data`` column is a nullable ``PickleType`` column.
    """
    table = 'session_data'

    # Same drop order as before: user_id first, then role_id.
    for dropped_column in ('user_id', 'role_id'):
        op.drop_column(table, dropped_column)

    data_column = sa.Column('data', sa.PickleType, nullable=True)
    op.add_column(table, data_column)
def make_cache_table(metadata, table_name='beaker_cache', schema_name=None):
    """Build (but do not create) the ``Table`` used to store cached values.

    Args:
        metadata: ``MetaData`` object the table is attached to.
        table_name: Name of the table; defaults to ``'beaker_cache'``.
        schema_name: Schema for the table.  When falsy, ``metadata.schema``
            is used instead.

    Returns:
        The ``sa.Table`` definition.  No DDL is emitted here.
    """
    columns = (
        sa.Column('namespace', sa.String(255), primary_key=True),
        sa.Column('accessed', sa.DateTime, nullable=False),
        sa.Column('created', sa.DateTime, nullable=False),
        # The cached payload itself, stored pickled.
        sa.Column('data', sa.PickleType, nullable=False),
    )
    # Any falsy schema_name (None, '') falls back to the metadata's schema.
    schema = schema_name or metadata.schema
    return sa.Table(table_name, metadata, *columns, schema=schema)
def downgrade():
    """Set the type of ``xcom.value`` to a ``PickleType`` that pickles with dill."""
    # Batch mode is used so the ALTER goes through Alembic's SQLite workaround.
    with op.batch_alter_table("xcom") as xcom_batch:
        dill_pickle_type = sa.PickleType(pickler=dill)
        xcom_batch.alter_column('value', type_=dill_pickle_type)
def upgrade():
    """Add a nullable ``PickleType`` column named ``conf`` to ``dag_run``."""
    conf_column = sa.Column('conf', sa.PickleType(), nullable=True)
    op.add_column('dag_run', conf_column)
def make_cache_table(metadata, table_name='beaker_cache'):
    """Build (but do not create) the ``Table`` used to store cached values.

    Args:
        metadata: ``MetaData`` object the table is attached to.
        table_name: Name of the table; defaults to ``'beaker_cache'``.

    Returns:
        The ``sa.Table`` definition.  No DDL is emitted here.
    """
    columns = (
        sa.Column('namespace', sa.String(255), primary_key=True),
        sa.Column('accessed', sa.DateTime, nullable=False),
        sa.Column('created', sa.DateTime, nullable=False),
        # The cached payload itself, stored pickled.
        sa.Column('data', sa.PickleType, nullable=False),
    )
    return sa.Table(table_name, metadata, *columns)
def upgrade():
    """Create the ``individual`` table."""
    # Ontology terms are kept as plain strings for now; the externalId,
    # diseases, pheno, etc. mappings are left out for now.
    columns = [
        sa.Column('id', sa.BigInteger, primary_key=True),
        sa.Column('guid', sa.String(36), nullable=False, unique=True),
        sa.Column('name', sa.Text),
        sa.Column('info', sa.PickleType),
        sa.Column('record_create_time', sa.Text),
        sa.Column('record_update_time', sa.Text),
    ]
    op.create_table('individual', *columns)
def upgrade():
    """Create the ``callset_id_seq`` sequence, then the ``callset`` table.

    The table's ``id`` column references the sequence by name, and the table
    has foreign keys to ``individual.id`` and ``db_row.id``.
    """
    # The sequence is created explicitly, starting at 0 (minvalue 0).
    id_sequence = Sequence('callset_id_seq', minvalue=0, start=0)
    op.execute(CreateSequence(id_sequence))

    columns = [
        sa.Column(
            'id',
            sa.BigInteger,
            Sequence('callset_id_seq'),
            primary_key=True,
        ),
        sa.Column('guid', sa.String(36), nullable=False, unique=True),
        sa.Column(
            'individual_id',
            sa.BigInteger,
            sa.ForeignKey('individual.id'),
            nullable=False,
        ),
        sa.Column(
            'dbrow_id',
            sa.BigInteger,
            sa.ForeignKey('db_row.id'),
            nullable=False,
        ),
        sa.Column('name', sa.Text, nullable=False),
        sa.Column('created', sa.BigInteger, nullable=False),
        sa.Column('updated', sa.BigInteger, nullable=False),
        sa.Column('info', sa.PickleType),
    ]
    op.create_table('callset', *columns)
def upgrade():
    """Create the ``vpp_etcd_journal`` table.

    Columns: an auto-incrementing integer ``id`` key, a string key ``k``, a
    pickled value ``v``, a ``retry_count`` and two timestamps.
    """
    id_column = sa.Column(
        'id',
        sa.Integer,
        primary_key=True,
        autoincrement=True,
        nullable=False,
    )
    key_column = sa.Column('k', sa.String(255), nullable=False)
    value_column = sa.Column('v', sa.PickleType, nullable=True)

    # NOTE(review): ``default=`` is a client-side default; confirm whether a
    # ``server_default`` was intended for retry_count / created_at.
    retry_count_column = sa.Column('retry_count', sa.Integer, default=0)
    created_at_column = sa.Column(
        'created_at',
        sa.DateTime,
        default=sa.func.now(),
    )
    last_retried_column = sa.Column(
        'last_retried',
        sa.TIMESTAMP,
        server_default=sa.func.now(),
        onupdate=sa.func.now(),
    )

    op.create_table(
        'vpp_etcd_journal',
        id_column,
        key_column,
        value_column,
        retry_count_column,
        created_at_column,
        last_retried_column,
    )
def upgrade():
    """Add a nullable ``PickleType`` column named ``spd`` to ``users``."""
    # ### commands auto generated by Alembic - please adjust! ###
    spd_column = sa.Column('spd', sa.PickleType(), nullable=True)
    op.add_column('users', spd_column)
    # ### end Alembic commands ###
def upgrade():
    """Create the ``vpp_etcd_journal`` table.

    Columns: an auto-incrementing integer ``id`` key, a string key ``k``, a
    pickled value ``v``, a ``retry_count`` and two ``DateTime`` timestamps
    whose server default is ``now()``.
    """
    id_column = sa.Column(
        'id',
        sa.Integer,
        primary_key=True,
        autoincrement=True,
        nullable=False,
    )
    key_column = sa.Column('k', sa.String(255), nullable=False)
    value_column = sa.Column('v', sa.PickleType, nullable=True)

    # NOTE(review): ``default=0`` is a client-side default; confirm whether a
    # ``server_default`` was intended here as on the timestamp columns.
    retry_count_column = sa.Column('retry_count', sa.Integer, default=0)
    created_at_column = sa.Column(
        'created_at',
        sa.DateTime,
        server_default=sa.func.now(),
    )
    last_retried_column = sa.Column(
        'last_retried',
        sa.DateTime,
        server_default=sa.func.now(),
        onupdate=sa.func.now(),
    )

    op.create_table(
        'vpp_etcd_journal',
        id_column,
        key_column,
        value_column,
        retry_count_column,
        created_at_column,
        last_retried_column,
    )