The following 25 code examples, extracted from open-source Python projects, illustrate how to use sqlalchemy.dialects.postgresql.JSON.
def upgrade():
    """Create the ``project_stats`` table holding per-project metrics."""
    op.create_table(
        'project_stats',
        sa.Column('id', sa.Integer, primary_key=True),
        # Stats are deleted together with their project.
        sa.Column('project_id', sa.Integer,
                  sa.ForeignKey('project.id', ondelete='CASCADE')),
        sa.Column('n_tasks', sa.Integer, default=0),
        sa.Column('n_task_runs', sa.Integer, default=0),
        sa.Column('n_results', sa.Integer, default=0),
        sa.Column('n_volunteers', sa.Integer, default=0),
        sa.Column('n_completed_tasks', sa.Integer, default=0),
        sa.Column('overall_progress', sa.Integer, default=0),
        sa.Column('average_time', sa.Float, default=0),
        sa.Column('n_blogposts', sa.Integer, default=0),
        sa.Column('last_activity', sa.Text, default=make_timestamp),
        sa.Column('info', JSON, nullable=False),
    )
def load_dialect_impl(self, dialect: dialects) -> DialectType:
    """Select the concrete column type for the active database dialect.

    SQLAlchemy wraps database-specific features into dialects; this hook
    is consulted on CRUD so the dialect knows how to render this type.
    PostgreSQL gets its native JSON type; MySQL gets native JSON only when
    the server advertises it; everything else falls back to VARCHAR.

    :param dialect: The loaded dialect
    :return: The type descriptor for this type.
    """
    if dialect.name == 'postgresql':
        return dialect.type_descriptor(postgresql.JSON())
    if dialect.name == 'mysql' and 'JSON' in dialect.ischema_names:
        return dialect.type_descriptor(mysql.JSON())
    # MySQL without JSON support, and every other backend, store text.
    return dialect.type_descriptor(VARCHAR(self._MAX_VARCHAR_LIMIT))
def table():
    """Build and return the ``post`` table definition."""
    metadata = sa.MetaData()
    return sa.Table(
        'post', metadata,
        sa.Column('id', sa.Integer, nullable=False),
        sa.Column('title', sa.String(200), nullable=False),
        sa.Column('body', sa.Text, nullable=False),
        sa.Column('views', sa.Integer, nullable=False),
        sa.Column('average_note', sa.Float, nullable=False),
        sa.Column('pictures', postgresql.JSON, server_default='{}'),
        sa.Column('published_at', sa.Date, nullable=False),
        sa.Column('tags', postgresql.ARRAY(sa.Integer), server_default='[]'),
        # Indexes
        sa.PrimaryKeyConstraint('id', name='post_id_pkey'))
def upgrade():
    """Upgrade the database to a newer revision."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        'api_requests',
        sa.Column('id', sa.String(length=64), nullable=False),
        sa.Column('api_name', sa.String(length=256), nullable=True),
        sa.Column('submit_time', sa.DateTime(), nullable=False),
        sa.Column('user_email', sa.String(length=256), nullable=True),
        sa.Column('user_profile_digest', sa.String(length=128), nullable=True),
        sa.Column('origin', sa.String(length=64), nullable=True),
        sa.Column('team', sa.String(length=64), nullable=True),
        sa.Column('recommendation', postgresql.JSON(astext_type=sa.Text()),
                  nullable=True),
        sa.Column('request_digest', sa.String(length=128), nullable=False),
        sa.PrimaryKeyConstraint('id'),
    )
    # ### end Alembic commands ###
def upgrade():
    """Create the ``pending_ride`` table and its ``uid`` index."""
    ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        'pending_ride',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('uid', sa.String(), nullable=True),
        sa.Column('user_integration_id', sa.Integer(), nullable=False),
        sa.Column('start_coord', postgresql.JSON(), nullable=True),
        sa.Column('end_coord', postgresql.JSON(), nullable=True),
        sa.Column('fare', sa.Numeric(), nullable=True),
        sa.Column('meta', postgresql.JSON(), nullable=True),
        sa.Column('status', sa.Integer(), nullable=False),
        sa.Column('created_at', sa.DateTime(), nullable=True),
        sa.Column('is_destroyed', sa.Boolean(), server_default='f',
                  nullable=True),
        sa.PrimaryKeyConstraint('id'),
    )
    op.create_index(op.f('ix_pending_ride_uid'), 'pending_ride', ['uid'],
                    unique=False)
    ### end Alembic commands ###
def upgrade():
    """Create the ``pending_ride`` table (JSON ``fare``) and its index."""
    ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        'pending_ride',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('uid', sa.String(), nullable=True),
        sa.Column('user_integration_id', sa.Integer(), nullable=False),
        sa.Column('start_coord', postgresql.JSON(), nullable=True),
        sa.Column('end_coord', postgresql.JSON(), nullable=True),
        sa.Column('fare', postgresql.JSON(), nullable=True),
        sa.Column('meta', postgresql.JSON(), nullable=True),
        sa.Column('status', sa.Integer(), nullable=False),
        sa.Column('created_at', sa.DateTime(), nullable=True),
        sa.Column('is_destroyed', sa.Boolean(), server_default='f',
                  nullable=True),
        sa.PrimaryKeyConstraint('id'),
    )
    op.create_index(op.f('ix_pending_ride_uid'), 'pending_ride', ['uid'],
                    unique=False)
    ### end Alembic commands ###
def upgrade():
    """Create the ``webhook`` table recording payloads and responses."""
    op.create_table(
        'webhook',
        sa.Column('id', sa.Integer, primary_key=True),
        sa.Column('created', sa.Text, default=make_timestamp),
        sa.Column('updated', sa.Text, default=make_timestamp),
        sa.Column('project_id', sa.Integer, sa.ForeignKey('project.id')),
        sa.Column('payload', JSON),
        sa.Column('response', sa.Text),
        sa.Column('response_status_code', sa.Integer),
    )
def upgrade():
    """Create the ``result`` table linking tasks to their task runs."""
    op.create_table(
        'result',
        sa.Column('id', sa.Integer, primary_key=True),
        sa.Column('created', sa.Text, default=make_timestamp),
        sa.Column('project_id', sa.Integer, sa.ForeignKey('project.id'),
                  nullable=False),
        sa.Column('task_id', sa.Integer, sa.ForeignKey('task.id'),
                  nullable=False),
        sa.Column('task_run_ids', ARRAY(sa.Integer), nullable=False),
        sa.Column('last_version', sa.Boolean, default=True),
        sa.Column('info', JSON),
    )
def upgrade():
    """Create the ``helpingmaterial`` table."""
    op.create_table(
        'helpingmaterial',
        sa.Column('id', sa.Integer, primary_key=True),
        # Materials are removed together with their project.
        sa.Column('project_id', sa.Integer,
                  sa.ForeignKey('project.id', ondelete='CASCADE'),
                  nullable=False),
        sa.Column('created', TIMESTAMP, default=make_timestamp),
        sa.Column('info', JSON, nullable=False),
        sa.Column('media_url', sa.Text),
    )
def upgrade():
    """Add a string column plus a JSON ``info`` column to ``blogpost``."""
    # NOTE(review): ``field`` is not defined anywhere in this view of the
    # file — presumably a module-level constant naming the column; confirm.
    op.add_column('blogpost', sa.Column(field, sa.String))
    op.add_column('blogpost', sa.Column('info', JSON))
def upgrade():
    """Add a nullable JSON ``to_json`` column to ``episode``."""
    ### commands auto generated by Alembic - please adjust! ###
    op.add_column('episode',
                  sa.Column('to_json', postgresql.JSON(), nullable=True))
    ### end Alembic commands ###
def upgrade():
    """Create the ``results`` table storing per-URL JSON analyses."""
    ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        'results',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('url', sa.String(), nullable=True),
        sa.Column('result_all', postgresql.JSON(astext_type=Text()),
                  nullable=True),
        sa.Column('result_no_stop_words', postgresql.JSON(astext_type=Text()),
                  nullable=True),
        sa.PrimaryKeyConstraint('id'),
    )
    ### end Alembic commands ###
def copy(self, *args, **kwargs) -> 'JSON':
    """Create a copy of this type.

    :param args: Positional arguments forwarded to the JSON constructor
    :param kwargs: Keyword arguments forwarded to the JSON constructor
    :return: A new JSON instance
    """
    # Fixed docstring: the original one was copy-pasted from a UUID type
    # and referred to "the UUID constructor", but this method builds JSON.
    return JSON(*args, **kwargs)
def build_trafaret(sa_type, **kwargs):
    """Map an SQLAlchemy column type to the matching trafaret validator.

    Raises NotImplementedError for column types with no mapping.
    """
    if isinstance(sa_type, sa.sql.sqltypes.Enum):
        return t.Enum(*sa_type.enums, **kwargs)
    # check for Text should be before String (Text subclasses String)
    if isinstance(sa_type, sa.sql.sqltypes.Text):
        return t.String(**kwargs)
    if isinstance(sa_type, sa.sql.sqltypes.String):
        return t.String(max_length=sa_type.length, **kwargs)
    if isinstance(sa_type, sa.sql.sqltypes.Integer):
        return t.Int(**kwargs)
    if isinstance(sa_type, sa.sql.sqltypes.Float):
        return t.Float(**kwargs)
    if isinstance(sa_type, sa.sql.sqltypes.DateTime):
        return DateTime(**kwargs)  # RFC3339
    if isinstance(sa_type, sa.sql.sqltypes.Date):
        return DateTime(**kwargs)  # RFC3339
    if isinstance(sa_type, sa.sql.sqltypes.Boolean):
        return t.StrBool(**kwargs)
    # PG-specific JSON and ARRAY types
    if isinstance(sa_type, postgresql.JSON):
        return AnyDict | t.List(AnyDict)
    if isinstance(sa_type, postgresql.ARRAY):
        return t.List(build_trafaret(sa_type.item_type))
    msg = 'Validator for type {} not implemented'.format(str(sa_type))
    raise NotImplementedError(msg)
def upgrade():
    """Upgrade the database to a newer revision."""
    # commands auto generated by Alembic - please adjust! ###
    # op.alter_column('stack_analyses_request', 'requestJson',
    #                 existing_type=sa.VARCHAR(length=4096),
    #                 type_=postgresql.JSON(astext_type=postgresql.JSON()),
    #                 existing_nullable=False)
    # Raw SQL is used because alter_column cannot express the USING cast.
    op.execute('ALTER TABLE stack_analyses_request ALTER COLUMN "requestJson" TYPE json '
               'using CAST("requestJson" as json)')
    # end Alembic commands ###
def downgrade():
    """Downgrade the database to an older revision."""
    # commands auto generated by Alembic - please adjust! ###
    # op.alter_column('stack_analyses_request', 'requestJson',
    #                 existing_type=postgresql.JSON(astext_type=postgresql.JSON()),
    #                 type_=sa.VARCHAR(length=4096),
    #                 existing_nullable=False)
    # NOTE(review): the SQL casts to varchar(4098) while the commented-out
    # alter_column says length=4096 — confirm which length is intended.
    op.execute('ALTER TABLE stack_analyses_request ALTER COLUMN "requestJson" TYPE varchar(4098) '
               'using CAST("requestJson" as varchar(4098))')
    # end Alembic commands ###
def upgrade():
    """Create the ``queue`` table and its scheduling index."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        'queue',
        sa.Column('id', sa.BIGINT(), nullable=False),
        sa.Column('enqueued_at', postgresql.TIMESTAMP(timezone=True),
                  server_default=sa.text('now()'), autoincrement=False,
                  nullable=False),
        sa.Column('dequeued_at', postgresql.TIMESTAMP(timezone=True),
                  autoincrement=False, nullable=True),
        sa.Column('expected_at', postgresql.TIMESTAMP(timezone=True),
                  autoincrement=False, nullable=True),
        sa.Column('schedule_at', postgresql.TIMESTAMP(timezone=True),
                  autoincrement=False, nullable=True),
        sa.Column('q_name', sa.TEXT(), autoincrement=False, nullable=False),
        sa.Column('data', postgresql.JSON(astext_type=sa.Text()),
                  autoincrement=False, nullable=False),
        sa.PrimaryKeyConstraint('id'),
    )
    op.create_index('priority_idx', 'queue', ['schedule_at', 'expected_at'],
                    unique=False)
    # ### end Alembic commands ###
def upgrade():
    """Upgrade the database to a newer revision."""
    # ### commands auto generated by Alembic - please adjust! ###
    # op.alter_column('stack_analyses_request', 'requestJson',
    #                 existing_type=sa.VARCHAR(length=4096),
    #                 type_=postgresql.JSON(astext_type=postgresql.JSON()),
    #                 existing_nullable=False)
    # Raw SQL is used because alter_column cannot express the USING cast.
    op.execute('ALTER TABLE stack_analyses_request ALTER COLUMN "requestJson" TYPE json using '
               'CAST("requestJson" as json)')
    # ### end Alembic commands ###
def downgrade():
    """Downgrade the database to an older revision."""
    # ### commands auto generated by Alembic - please adjust! ###
    # op.alter_column('stack_analyses_request', 'requestJson',
    #                 existing_type=postgresql.JSON(astext_type=postgresql.JSON()),
    #                 type_=sa.VARCHAR(length=4096),
    #                 existing_nullable=False)
    # NOTE(review): the SQL casts to varchar(4098) while the commented-out
    # alter_column says length=4096 — confirm which length is intended.
    op.execute('ALTER TABLE stack_analyses_request ALTER COLUMN "requestJson" TYPE varchar(4098) '
               'using CAST("requestJson" as varchar(4098))')
    # ### end Alembic commands ###
def load_dialect_impl(self, dialect):
    """Return the PostgreSQL JSON implementation, or the default impl."""
    if dialect.name != 'postgresql':
        return dialect.type_descriptor(self.impl)
    # Prefer the native JSON type when this SQLAlchemy version provides it.
    impl = JSON() if has_postgres_json else PostgresJSONType()
    return dialect.type_descriptor(impl)
def test_type(self):
    """row_to_json() expressions should be typed as PostgreSQL JSON."""
    expr = sa.func.row_to_json(sa.text('article.*'))
    assert isinstance(expr.type, postgresql.JSON)
def upgrade():
    """Add JSON ``meta`` and ``refresh_token`` columns to user_integration."""
    ### commands auto generated by Alembic - please adjust! ###
    op.add_column('user_integration',
                  sa.Column('meta', postgresql.JSON(), nullable=True))
    op.add_column('user_integration',
                  sa.Column('refresh_token', sa.String(), nullable=True))
    ### end Alembic commands ###
def upgrade():
    """Create the ``verification`` table plus its lookup indexes."""
    op.create_table(
        'verification',
        sa.Column('verification_id', sa.String(length=32), nullable=False),
        sa.Column('ip4', sa.BigInteger(), nullable=False),
        sa.Column('expires', sa.BigInteger(), nullable=False),
        sa.Column('data', postgresql.JSON(), nullable=False),
        sa.PrimaryKeyConstraint('verification_id'),
    )
    op.create_index(op.f('ix_verification_expires'), 'verification',
                    ['expires'], unique=False)
    op.create_index(op.f('ix_verification_ip4'), 'verification',
                    ['ip4'], unique=False)
def load_dialect_impl(self, dialect):
    """Choose JSONB (PG >= 9.4), JSON (PG >= 9.2), or plain Text storage.

    Side effect: sets ``self.using_native_json`` when a native PG type is
    selected.
    """
    if dialect.name == 'postgresql':
        version = dialect.server_version_info
        if version >= (9, 4):
            self.using_native_json = True
            return dialect.type_descriptor(postgresql.JSONB())
        if version >= (9, 2):
            self.using_native_json = True
            return dialect.type_descriptor(postgresql.JSON())
    # Older PostgreSQL servers and all other backends store serialized text.
    return dialect.type_descriptor(types.Text())
def get_parsed_exercise(exercise_id):
    """
    Get the JSON exercise data from the exercise.data field

    This is where the feedback, answers, and other info is located

    Parameters
    ----------
    exercise_id

    Returns
    -------
    """
    exercise = Exercise.get(exercise_id)
    subject = Subject.get(exercise.subject_id)

    if exercise.book_row_id:
        book_url = '{0}:{1}'.format(subject.book_url, exercise.book_row_id)
    else:
        book_url = subject.book_url

    # Only the first question of the exercise is parsed.
    question = exercise.data['questions'][0]

    answer_html = 'Question not supplied with a correct answer'
    feedback_choices = []
    for answer in question['answers']:
        # Collect (answer_id, feedback) pairs for answers carrying feedback.
        if answer.get('feedback_html'):
            feedback_choices.append((str(answer['id']),
                                     answer['feedback_html']))
        # NOTE(review): correctness appears to be stored as the string '1.0'
        # rather than a number — confirm against the data source.
        if answer['correctness'] == '1.0':
            answer_html = answer['content_html']

    return dict(id=exercise.id,
                exercise_html=question['stem_html'],
                answer_html=answer_html,
                feedback_choices=feedback_choices,
                uid=exercise.uid,
                book_url=book_url,
                chapter_id=exercise.chapter_id
                )