我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.dialects.postgresql.UUID。
def downgrade(): op.add_column('container_states', sa.Column('pod_id', postgresql.UUID(), autoincrement=False, nullable=True)) op.create_foreign_key(u'container_states_pod_id_fkey', 'container_states', 'pods', ['pod_id'], ['id']) downgrade_data() op.drop_column('container_states', 'reason') op.drop_column('container_states', 'exit_code') op.drop_constraint('container_states_pod_state_id_fkey', 'container_states', type_='foreignkey') op.drop_index('ix_pod_id_start_time', table_name='pod_states') op.drop_column('container_states', 'pod_state_id') op.execute("ALTER TABLE pod_states DROP CONSTRAINT pod_states_pkey, " "ADD CONSTRAINT pod_states_pkey PRIMARY KEY (pod_id, start_time);") op.drop_column('pod_states', 'id') op.execute(sa.schema.DropSequence(sa.Sequence('pod_states_id_seq')))
def upgrade(): op.create_table('pd_states', sa.Column('user_id', sa.Integer(), nullable=False), sa.Column('pd_name', sa.String(), nullable=False), sa.Column('size', sa.Integer(), nullable=False), sa.Column('start_time', sa.DateTime(), nullable=False), sa.Column('end_time', sa.DateTime(), nullable=True), sa.ForeignKeyConstraint(['user_id'], ['users.id']), sa.PrimaryKeyConstraint('start_time')) op.create_table('ip_states', sa.Column('pod_id', postgresql.UUID(), nullable=False), sa.Column('ip_address', sa.BigInteger(), nullable=False), sa.Column('start_time', sa.DateTime(), nullable=False), sa.Column('end_time', sa.DateTime(), nullable=True), sa.ForeignKeyConstraint(['pod_id'], ['pods.id']), sa.PrimaryKeyConstraint('pod_id', 'start_time'))
def test_name_postgresql(self) -> None: """ Tests that if the name ``postgresql`` is passed into the UUID database type, then PostgreSQL's UUID type is loaded as a dialect for the UUID type. The returned type descriptor should be the same as the postgresql UUID database type """ uuid_instance = DB_UUID() dialect_impl = uuid_instance.load_dialect_impl(self.pg_sql_dialect) self.assertIsInstance( dialect_impl, self.pg_sql_dialect.type_descriptor( postgresql.UUID() ).__class__ )
def test_bind_uuid_not_uuid_type(self, uuid: UUID) -> None: """ Tests that if the parameter to write in is a string that looks like a UUID, then it is written to the DB as a de-hyphenated UUID. This means that the id ``7490bda6-7c69-47c2-ad97-c7453f15811c`` gets written as ``7490bda67c6947c2ad97c7453f15811c``. The length of the de-hyphenated UUID MUST be 32 characters. :param uuid: The uuid to write, randomly generated """ db_uuid = DB_UUID() value_to_store = db_uuid.process_bind_param( str(uuid), self.sqlite_dialect ) self.assertEqual( value_to_store, value_to_store.replace('-', '') ) self.assertEqual( self.expected_de_hyphenated_uuid_length, len(value_to_store) )
def load_dialect_impl(self, dialect: dialects) -> DialectType: """ SQLAlchemy wraps all database-specific features into dialects, which are then responsible for generating the SQL code for a specific DB type when loading in data. ``load_dialect_impl`` is called when CRUD (create, update, delete operations) needs to be done on the database. This method is responsible for telling SQLAlchemy how to configure the dialect to write this type :param dialect: The loaded dialect :return: The type descriptor for this type. """ if dialect.name == 'postgresql': return dialect.type_descriptor(postgresql.UUID()) else: return dialect.type_descriptor(CHAR(32))
def test_querying_table(metadata): """ Create an object for test table. """ # When using pytest-xdist, we don't want concurrent table creations # across test processes so we assign a unique name for table based on # the current worker id. worker_id = os.environ.get('PYTEST_XDIST_WORKER', 'master') return Table( 'test_querying_table_' + worker_id, metadata, Column('id', types.Integer, autoincrement=True, primary_key=True), Column('t_string', types.String(60)), Column('t_list', types.ARRAY(types.String(60))), Column('t_enum', types.Enum(MyEnum)), Column('t_int_enum', types.Enum(MyIntEnum)), Column('t_datetime', types.DateTime()), Column('t_date', types.DateTime()), Column('t_interval', types.Interval()), Column('uniq_uuid', PG_UUID, nullable=False, unique=True, default=uuid4), )
def _add_column_kwargs(self, kwargs, column): """Add keyword arguments to kwargs (in-place) based on the passed in `Column <sqlalchemy.schema.Column>`. """ if column.nullable: kwargs['allow_none'] = True kwargs['required'] = not column.nullable and not _has_default(column) if hasattr(column.type, 'enums'): kwargs['validate'].append(validate.OneOf(choices=column.type.enums)) # Add a length validator if a max length is set on the column # Skip UUID columns # (see https://github.com/marshmallow-code/marshmallow-sqlalchemy/issues/54) if hasattr(column.type, 'length'): try: python_type = column.type.python_type except (AttributeError, NotImplementedError): python_type = None if not python_type or not issubclass(python_type, uuid.UUID): kwargs['validate'].append(validate.Length(max=column.type.length)) if hasattr(column.type, 'scale'): kwargs['places'] = getattr(column.type, 'scale', None)
def upgrade(): op.create_table('base_string', sa.Column('id', postgresql.UUID(as_uuid=True), server_default=sa.text('uuid_generate_v4()'), nullable=False), sa.Column('resource_pk', postgresql.UUID(as_uuid=True), nullable=False), sa.Column('base_string', sa.Text(), nullable=False), sa.Column('comment', sa.Text(), nullable=False), sa.Column('context', sa.Text(), nullable=False), sa.ForeignKeyConstraint(['resource_pk'], ['resource.id'], ), sa.PrimaryKeyConstraint('id') ) op.drop_table('translated_string') op.create_table('translated_string', sa.Column('id', sa.Integer(), nullable=False, autoincrement=True), sa.Column('base_string_pk', postgresql.UUID(as_uuid=True), nullable=False), sa.Column('language_pk', sa.Integer, nullable=False), sa.Column('translation', sa.Text(), nullable=False), sa.Column('translator_comment', sa.Text(), nullable=False), sa.ForeignKeyConstraint(['base_string_pk'], ['base_string.id'], ), sa.ForeignKeyConstraint(['language_pk'], ['language.id'], ), sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('base_string_pk', 'language_pk', name='base_string_language_uc'), )
def upgrade(): # ### commands auto generated by Alembic - please adjust! ### op.create_table('india_ethnicities', sa.Column('id', postgresql.UUID(), server_default=sa.text(u'uuid_generate_v4()'), nullable=False), sa.Column('patient_id', sa.Integer(), nullable=False), sa.Column('source_group_id', sa.Integer(), nullable=False), sa.Column('source_type', sa.String(), nullable=False), sa.Column('father_ancestral_state', sa.String(), nullable=True), sa.Column('father_language', sa.String(), nullable=True), sa.Column('mother_ancestral_state', sa.String(), nullable=True), sa.Column('mother_language', sa.String(), nullable=True), sa.Column('created_user_id', sa.Integer(), nullable=False), sa.Column('created_date', sa.DateTime(timezone=True), server_default=sa.text(u'now()'), nullable=False), sa.Column('modified_user_id', sa.Integer(), nullable=False), sa.Column('modified_date', sa.DateTime(timezone=True), server_default=sa.text(u'now()'), nullable=False), sa.ForeignKeyConstraint(['created_user_id'], ['users.id'], ), sa.ForeignKeyConstraint(['modified_user_id'], ['users.id'], ), sa.ForeignKeyConstraint(['patient_id'], ['patients.id'], onupdate='CASCADE', ondelete='CASCADE'), sa.ForeignKeyConstraint(['source_group_id'], ['groups.id'], ), sa.PrimaryKeyConstraint('id') ) op.create_index('india_ethnicity_patient_idx', 'india_ethnicities', ['patient_id'], unique=False) # ### end Alembic commands ###
def upgrade(): # ### commands auto generated by Alembic - please adjust! ### op.create_table('web_hook', sa.Column('id', postgresql.UUID(as_uuid=True), nullable=False), sa.Column('name', sa.TEXT(), nullable=False, unique=True), sa.Column('description', sa.TEXT(), nullable=True), sa.Column('url', sa.TEXT(), nullable=False), sa.Column('shared_secret', sa.TEXT(), nullable=False), sa.Column('status', sa.Integer(), nullable=False), sa.Column('consecutive_failure_count', sa.Integer(), nullable=False), sa.Column('register_time', sa.TIMESTAMP(), nullable=False), sa.Column('created_by_uid', postgresql.UUID(as_uuid=True), nullable=True), sa.Column('permissions', sa.TEXT(), nullable=False, server_default='"[]"'), sa.PrimaryKeyConstraint('id') ) op.create_table('web_hook_token', sa.Column('token_id', sa.TEXT(), nullable=False), sa.Column('user_id', postgresql.UUID(as_uuid=True), nullable=False), sa.Column('web_hook_id', postgresql.UUID(as_uuid=True), nullable=False), sa.ForeignKeyConstraint(['user_id'], ['users.id'], ), sa.ForeignKeyConstraint(['web_hook_id'], ['web_hook.id'], ), sa.PrimaryKeyConstraint('user_id', 'web_hook_id') ) # ### end Alembic commands ###
def _update_rels(self, model, links): session = self.DBSession() source = model.rid rels = {(k, uuid.UUID(target)) for k, targets in links.items() for target in targets} existing = { (link.rel, link.target_rid) for link in model.rels } to_remove = existing - rels to_add = rels - existing for rel, target in to_remove: link = session.query(Link).get((source, rel, target)) session.delete(link) for rel, target in to_add: link = Link(source_rid=source, rel=rel, target_rid=target) session.add(link) return to_add, to_remove
def load_dialect_impl(self, dialect): if dialect.name == 'postgresql': return dialect.type_descriptor(UUID()) else: return dialect.type_descriptor(CHAR(32))
def process_bind_param(self, value, dialect): if value is None: return value elif dialect.name == 'postgresql': return str(value) else: if not isinstance(value, uuid.UUID): return "%.32x" % uuid.UUID(value).int else: # hexstring return "%.32x" % value.int
def process_result_value(self, value, dialect): if value is None: return value else: return uuid.UUID(value)
def upgrade(): ### commands auto generated by Alembic - please adjust! ### op.create_table('pod_states', sa.Column('pod_id', postgresql.UUID(), nullable=False), sa.Column('start_time', sa.DateTime(), nullable=False), sa.Column('end_time', sa.DateTime(), nullable=True), sa.Column('last_event_time', sa.DateTime(), nullable=True), sa.Column('last_event', sa.String(length=255), nullable=True), sa.Column('hostname', sa.String(length=255), nullable=True), sa.ForeignKeyConstraint(['pod_id'], ['pods.id'], ), sa.PrimaryKeyConstraint('pod_id', 'start_time') ) ### end Alembic commands ###
def upgrade(): op.create_table( 'persistent_disk', sa.Column('id', sa.String(length=32), nullable=False), sa.Column('drive_name', sa.String(length=64), nullable=False), sa.Column('name', sa.String(length=64), nullable=False), sa.Column('owner_id', sa.Integer(), nullable=False), sa.Column('size', sa.Integer(), nullable=False), sa.Column('pod_id', postgresql.UUID(), nullable=True), sa.ForeignKeyConstraint(['owner_id'], ['users.id'], ), sa.ForeignKeyConstraint(['pod_id'], ['pods.id'], ), sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('drive_name'), sa.UniqueConstraint('name', 'owner_id') )
def downgrade(): op.add_column('dataset', sa.Column('organization_id', postgresql.UUID(), autoincrement=False, nullable=True)) op.execute(''' UPDATE "dataset" SET organization_id = (meta->>'organization_id')::uuid WHERE meta->>'organization_id' IS NOT NULL ''') op.drop_column('dataset', 'meta')
def test_bind_uuid_postgres(self, uuid: UUID) -> None: """ Tests that if the UUID is passed to a Postgres dialect, that it is returned as a string with hyphens :param uuid: A randomly-generated UUID to store """ db_uuid = DB_UUID() value_to_store = db_uuid.process_bind_param( uuid, self.pg_sql_dialect ) self.assertEqual(value_to_store, str(uuid))
def test_bind_uuid_something_else(self, uuid: UUID) -> None: """ Tests that the UUID gets de-hyphenated if using the CHAR 32 type, same as always. :param uuid: A randomly-generated UUID to store """ db_uuid = DB_UUID() value_to_store = db_uuid.process_bind_param( uuid, self.sqlite_dialect ) self.assertEqual( value_to_store, "%.32x" % int(uuid) )
def process_result_value( self, value: Optional[str], dialect: dialects ) -> Optional[dict]: """ :param value: The value to process from the SQL query :param dialect: The dialect to use for the processing :return: The value as a UUID """ if value is None: return value else: return json.loads(value)
def process_result_value( self, value: Optional[str], dialect: dialects ) -> Optional[uuid.UUID]: """ :param value: The value to process from the SQL query :param dialect: The dialect to use for the processing :return: The value as a UUID """ if value is None: return value else: return uuid.UUID(value)
def copy(self, *args, **kwargs) -> 'UUID': """ :param args: The arguments to the UUID constructor :param kwargs: The keyword arguments to the UUID constructor :return: A deep copy of this object """ return UUID(*args, **kwargs)
def UuidColumn(*args, **kwargs): kwargs.setdefault('nullable', False) kwargs['type_'] = UUID(as_uuid=True) return Column(*args, **kwargs)
def downgrade(): # ### commands auto generated by Alembic - please adjust! ### op.drop_constraint(None, 'source', type_='foreignkey') op.drop_index(op.f('ix_source_author_id'), table_name='source') op.drop_column('source', 'author_id') op.add_column( 'build', sa.Column('author_id', postgresql.UUID(), autoincrement=False, nullable=True) ) op.create_foreign_key('build_author_id_fkey', 'build', 'author', ['author_id'], ['id']) op.create_index('ix_build_author_id', 'build', ['author_id'], unique=False) # ### end Alembic commands ###
def downgrade(): # ### commands auto generated by Alembic - please adjust! ### op.add_column('filecoverage', sa.Column( 'job_id', postgresql.UUID(), autoincrement=False, nullable=False)) op.drop_constraint(None, 'filecoverage', type_='foreignkey') op.create_foreign_key('filecoverage_job_id_fkey', 'filecoverage', 'job', [ 'job_id'], ['id'], ondelete='CASCADE') op.create_unique_constraint( 'unq_job_filname', 'filecoverage', ['job_id', 'filename']) op.drop_constraint('unq_coverage_filname', 'filecoverage', type_='unique') op.drop_index(op.f('ix_filecoverage_build_id'), table_name='filecoverage') op.drop_column('filecoverage', 'build_id') # ### end Alembic commands ###
def process_bind_param(self, value, dialect): if value is None: return value elif dialect.name == 'postgresql': return str(value) else: if not isinstance(value, uuid.UUID): return "%.32x" % uuid.UUID(value) else: # hexstring return "%.32x" % value
def __init__(self, as_uuid=True): super(UUID, self).__init__(binary=True, native=True)
def load_dialect_impl(self, dialect): if dialect.name == 'postgresql' and self.native: # Use the native UUID type. return dialect.type_descriptor(postgresql.UUID()) else: # Fallback to either a BINARY or a CHAR. kind = types.BINARY(16) return dialect.type_descriptor(kind)
def auto_uuid(): uuid_gen_expr = sa.text('uuid_generate_v4()') return sa.Column(sap.UUID(as_uuid=True), primary_key=True, server_default=uuid_gen_expr)
def load_dialect_impl(self, dialect): if dialect.name == 'postgresql': return dialect.type_descriptor(UUID()) elif dialect.name == 'oracle': return dialect.type_descriptor(RAW(16)) elif dialect.name == 'mysql': return dialect.type_descriptor(BINARY(16)) else: return dialect.type_descriptor(CHAR(32))
def process_bind_param(self, value, dialect): if value is None: return value elif dialect.name == 'postgresql': return str(value).lower() elif dialect.name == 'oracle': return uuid.UUID(value).bytes elif dialect.name == 'mysql': return uuid.UUID(value).bytes else: if not isinstance(value, uuid.UUID): return "%.32x" % uuid.UUID(value) else: # hexstring return "%.32x" % value
def process_result_value(self, value, dialect): if value is None: return value elif dialect.name == 'oracle': return str(uuid.UUID(bytes=value)).replace('-', '').lower() elif dialect.name == 'mysql': return str(uuid.UUID(bytes=value)).replace('-', '').lower() else: return str(uuid.UUID(value)).replace('-', '').lower()
def load_dialect_impl(self, dialect): if dialect.name == 'postgresql' and self.native: # Use the native UUID type. return dialect.type_descriptor(postgresql.UUID()) else: # Fallback to either a BINARY or a CHAR. kind = self.impl if self.binary else types.CHAR(32) return dialect.type_descriptor(kind)
def _coerce(value): if value and not isinstance(value, uuid.UUID): try: value = uuid.UUID(value) except (TypeError, ValueError): value = uuid.UUID(bytes=value) return value
def process_bind_param(self, value, dialect): if value is None: return value if not isinstance(value, uuid.UUID): value = self._coerce(value) if self.native and dialect.name == 'postgresql': return str(value) return value.bytes if self.binary else value.hex
def process_result_value(self, value, dialect): if value is None: return value if self.native and dialect.name == 'postgresql': if isinstance(value, uuid.UUID): # Some drivers convert PostgreSQL's uuid values to # Python's uuid.UUID objects by themselves return value return uuid.UUID(value) return uuid.UUID(bytes=value) if self.binary else uuid.UUID(value)
def upgrade(): if current_env in ['test', 'dev']: users_table = table( 'users', sa.Column('id', postgresql.UUID, server_default=sa.text('uuid_generate_v1()'), primary_key=True), sa.Column('name', sa.Text, nullable=False, unique=True), sa.Column('password', sa.Text, nullable=False), sa.Column('modified', sa.DateTime, server_default=sa.text('clock_timestamp()')), sa.Column('created', sa.DateTime, server_default=sa.text('now()')) ) op.bulk_insert(users_table, [{'name': "test_user", 'password': hash_password('test123')}])
def upgrade(): connection = op.get_bind() connection.execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp";') op.create_table( 'users', sa.Column('id', postgresql.UUID, server_default=sa.text('uuid_generate_v1()'), primary_key=True), sa.Column('name', sa.Text, nullable=False, unique=True), sa.Column('password', sa.Text, nullable=False), sa.Column('modified', sa.DateTime, server_default=sa.text('clock_timestamp()')), sa.Column('created', sa.DateTime, server_default=sa.text('now()')), )
def get_species_birth_generation(self, scenario_id, species_id, session=None): if session is None: session = self.Session() return self.get_individuals(scenario_id=scenario_id, session=session) \ .with_entities(Individual.generation) \ .filter(Individual.species == uuid.UUID(int=species_id)) \ .order_by(Individual.generation) \ .first() \ .generation
def upgrade(): # ### commands auto generated by Alembic - please adjust! ### op.execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp";') op.create_table('translated_string', sa.Column('id', postgresql.UUID(as_uuid=True), server_default=sa.text('uuid_generate_v4()'), nullable=False), sa.Column('base_string', sa.Text(), nullable=True), sa.Column('translation', sa.Text(), nullable=True), sa.Column('comment', sa.Text(), nullable=True), sa.Column('translator_comment', sa.Text(), nullable=True), sa.Column('context', sa.Text(), nullable=True), sa.PrimaryKeyConstraint('id') ) # ### end Alembic commands ###
def upgrade(): # ### commands auto generated by Alembic - please adjust! ### op.create_table('po_metadata', sa.Column('id', postgresql.UUID(as_uuid=True), server_default=sa.text('uuid_generate_v4()'), nullable=False), sa.Column('resource_pk', postgresql.UUID(as_uuid=True), nullable=False), sa.Column('language_pk', sa.Integer(), nullable=False), sa.Column('key', sa.Text(), nullable=False), sa.Column('value', sa.Text(), nullable=False), sa.ForeignKeyConstraint(['language_pk'], ['language.id'], ), sa.ForeignKeyConstraint(['resource_pk'], ['resource.id'], ), sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('resource_pk', 'language_pk', 'key', name='resource_language_key_uc') ) # ### end Alembic commands ###
def upgrade(): op.create_table('resource', sa.Column('id', postgresql.UUID(as_uuid=True), server_default=sa.text('uuid_generate_v4()'), nullable=False), sa.PrimaryKeyConstraint('id') ) op.add_column('translated_string', sa.Column('resource_pk', postgresql.UUID(as_uuid=True), nullable=False)) op.create_foreign_key(None, 'translated_string', 'resource', ['resource_pk'], ['id'])
def downgrade(): op.drop_table('translated_string') op.create_table('translated_string', sa.Column('id', postgresql.UUID(), server_default=sa.text('uuid_generate_v4()'), autoincrement=False, nullable=False), sa.Column('base_string', sa.TEXT(), autoincrement=False, nullable=False), sa.Column('translation', sa.TEXT(), autoincrement=False, nullable=False), sa.Column('comment', sa.TEXT(), autoincrement=False, nullable=False), sa.Column('translator_comment', sa.TEXT(), autoincrement=False, nullable=False), sa.Column('context', sa.TEXT(), autoincrement=False, nullable=False), sa.Column('resource_pk', postgresql.UUID(), autoincrement=False, nullable=False), sa.ForeignKeyConstraint(['resource_pk'], ['resource.id'], name='translated_string_resource_pk_fkey'), sa.PrimaryKeyConstraint('id', name='translated_string_pkey') ) op.drop_table('base_string')
def process_bind_param(self, value, dialect): if value is None: return value elif dialect.name == 'postgresql': return str(value) else: if not isinstance(value, uuid.UUID): return '%.32x' % uuid.UUID(value).int else: # hexstring return '%.32x' % value.int