我们从 Python 开源项目中提取了以下 34 个代码示例，用于说明如何使用 sqlalchemy.types.UnicodeText()。
def define_user_notification_table():
    """Build and map the ``ckanext_requestdata_user_notification`` table.

    The resulting ``Table`` is stored in the module-level
    ``user_notification_table`` and ``ckanextUserNotification`` is mapped
    onto it.
    """
    global user_notification_table

    # Columns first, then the index on ``id``; everything is handed to
    # ``Table`` in this order.
    schema_items = (
        Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
        Column('package_maintainer_id', types.UnicodeText, nullable=False),
        Column('seen', types.Boolean, default=False),
        Index('ckanext_requestdata_user_notification_id_idx', 'id'),
    )
    user_notification_table = Table(
        'ckanext_requestdata_user_notification', metadata, *schema_items
    )

    mapper(ckanextUserNotification, user_notification_table)
def define_maintainers_table():
    """Build and map the ``ckanext_requestdata_maintainers`` table.

    The resulting ``Table`` is stored in the module-level
    ``maintainers_table`` and ``ckanextMaintainers`` is mapped onto it.
    """
    global maintainers_table

    id_column = Column(
        'id', types.UnicodeText, primary_key=True, default=make_uuid
    )
    # Each row points at a row of the requests table through this column.
    request_column = Column(
        'request_data_id',
        types.UnicodeText,
        ForeignKey('ckanext_requestdata_requests.id'),
    )
    maintainer_column = Column('maintainer_id', types.UnicodeText)
    email_column = Column('email', types.UnicodeText)
    id_index = Index('ckanext_requestdata_maintainers_id_idx', 'id')

    maintainers_table = Table(
        'ckanext_requestdata_maintainers',
        metadata,
        id_column,
        request_column,
        maintainer_column,
        email_column,
        id_index,
    )

    mapper(ckanextMaintainers, maintainers_table)
def result_processor(self, dialect, coltype):
    """Return the callable applied to result values, or ``None``.

    The LOB processor from ``_LOBMixin`` and the string processor from
    ``sqltypes.UnicodeText`` are combined:

    * no LOB processor -> ``None``;
    * LOB processor only -> the LOB processor itself;
    * both -> a function applying the LOB processor first and the string
      processor to its output.
    """
    from_lob = _LOBMixin.result_processor(self, dialect, coltype)
    if from_lob is None:
        return None

    to_text = sqltypes.UnicodeText.result_processor(self, dialect, coltype)
    if to_text is None:
        return from_lob

    def process(value):
        return to_text(from_lob(value))

    return process
def setup_spatial_table(package_extent_class, db_srid=None):
    """Define the ``package_extent`` table and map a class onto it.

    :param package_extent_class: class passed to ``meta.mapper`` for the
        new table.
    :param db_srid: value forwarded as ``srid`` to the geometry type of
        the ``the_geom`` column.
    :returns: the newly defined ``package_extent`` table.
    """
    if legacy_geoalchemy:
        package_extent_table = Table(
            'package_extent',
            meta.metadata,
            Column('package_id', types.UnicodeText, primary_key=True),
            GeometryExtensionColumn('the_geom', Geometry(2, srid=db_srid)),
        )
        geom_property = GeometryColumn(
            package_extent_table.c.the_geom, comparator=PGComparator
        )
        meta.mapper(
            package_extent_class,
            package_extent_table,
            properties={'the_geom': geom_property},
        )
        GeometryDDL(package_extent_table)
        return package_extent_table

    # PostGIS 1.5 requires management=True when defining the Geometry
    # field
    version_prefix = postgis_version()[:1]
    management = (version_prefix == '1')

    geom_type = Geometry('GEOMETRY', srid=db_srid, management=management)
    package_extent_table = Table(
        'package_extent',
        meta.metadata,
        Column('package_id', types.UnicodeText, primary_key=True),
        Column('the_geom', geom_type),
    )
    meta.mapper(package_extent_class, package_extent_table)
    return package_extent_table
def upgrade(migrate_engine):
    """Create the ``group_extra`` table using ``migrate_engine``.

    :param migrate_engine: engine that the migration's ``MetaData`` is
        bound to.
    """
    metadata = MetaData()
    metadata.bind = migrate_engine

    # Register the existing ``group`` table in this MetaData via
    # autoload; the result itself is not used afterwards (presumably it
    # is needed so that the ``group.id`` foreign key below can be
    # resolved -- confirm before removing).
    Table('group', metadata, autoload=True)

    extra_columns = (
        Column('id', UnicodeText, primary_key=True, default=make_uuid),
        Column('group_id', UnicodeText, ForeignKey('group.id')),
        Column('key', UnicodeText),
        Column('value', JsonType),
    )
    Table('group_extra', metadata, *extra_columns).create()
def define_request_data_table():
    """Build and map the ``ckanext_requestdata_requests`` table.

    The resulting ``Table`` is stored in the module-level
    ``request_data_table`` and ``ckanextRequestdata`` is mapped onto it.
    """
    global request_data_table

    text = types.UnicodeText
    # Both timestamp columns use the same callable as their default.
    timestamp_default = datetime.datetime.now

    schema_items = (
        Column('id', text, primary_key=True, default=make_uuid),
        Column('sender_name', text, nullable=False),
        Column('sender_user_id', text, nullable=False),
        Column('email_address', text, nullable=False),
        Column('message_content', text, nullable=False),
        Column('package_id', text, nullable=False),
        Column('state', text, default=u'new'),
        Column('data_shared', types.Boolean, default=False),
        Column('rejected', types.Boolean, default=False),
        Column('created_at', types.DateTime, default=timestamp_default),
        Column('modified_at', types.DateTime, default=timestamp_default),
        Index('ckanext_requestdata_requests_id_idx', 'id'),
    )
    request_data_table = Table(
        'ckanext_requestdata_requests', metadata, *schema_items
    )

    mapper(ckanextRequestdata, request_data_table)
def define_request_data_counters_table():
    """Build and map the ``ckanext_requestdata_counters`` table.

    The resulting ``Table`` is stored in the module-level
    ``request_data_counters_table`` and ``ckanextRequestDataCounters`` is
    mapped onto it.
    """
    global request_data_counters_table

    key_columns = [
        Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
        Column('package_id', types.UnicodeText),
        Column('org_id', types.UnicodeText),
    ]
    # Integer counters, each defaulting to zero.
    counter_columns = [
        Column('requests', types.Integer, default=0),
        Column('replied', types.Integer, default=0),
        Column('declined', types.Integer, default=0),
        Column('shared', types.Integer, default=0),
    ]
    id_index = Index('ckanext_requestdata_counters_id_idx', 'id')

    schema_items = key_columns + counter_columns + [id_index]
    request_data_counters_table = Table(
        'ckanext_requestdata_counters', metadata, *schema_items
    )

    mapper(ckanextRequestDataCounters, request_data_counters_table)
def result_processor(self, dialect, coltype):
    """Combine the LOB and unicode-text result processors.

    Returns ``None`` when ``_LOBMixin`` supplies no processor. Otherwise
    returns the LOB processor alone when ``sqltypes.UnicodeText``
    supplies none, or a function that feeds the LOB processor's output
    into the string processor.
    """
    read_lob = _LOBMixin.result_processor(self, dialect, coltype)
    if read_lob is None:
        return None

    decode = sqltypes.UnicodeText.result_processor(self, dialect, coltype)
    if decode is not None:
        def process(value):
            # LOB handling runs first; its result is then passed on to
            # the string processor.
            lob_value = read_lob(value)
            return decode(lob_value)

        return process

    return read_lob
def define_security_member_table():
    """Build and map the ``security_member`` table.

    The resulting ``Table`` is stored in the module-level
    ``security_member_table`` and ``SecurityMember`` is mapped onto it.
    """
    global security_member_table

    # ``id`` and ``member_id`` are both flagged as primary-key columns.
    columns = (
        Column('id', types.Integer, primary_key=True, autoincrement=True),
        Column('member_id', types.UnicodeText, primary_key=True,
               nullable=False),
        Column('user_id', types.UnicodeText, nullable=False),
        Column('group_id', types.UnicodeText, nullable=False),
        Column('state', types.UnicodeText, nullable=False),
        Column('dataset_type', types.UnicodeText, nullable=False),
        Column('classification', types.UnicodeText, nullable=False),
    )
    constraints = (
        # Updates and deletes of the referenced ``member.id`` cascade.
        ForeignKeyConstraint(
            ['member_id'],
            ['member.id'],
            onupdate="CASCADE",
            ondelete="CASCADE",
        ),
        UniqueConstraint(
            'member_id', 'user_id', 'group_id', 'dataset_type',
            name='uix_1',
        ),
    )

    security_member_table = Table(
        'security_member', meta.metadata, *(columns + constraints)
    )
    meta.mapper(SecurityMember, security_member_table)
def setup():
    """
    Set up ORM.

    Defines and maps the resource metadata table and the resource
    metadatum table, each only if its module-level variable is still
    ``None``; otherwise a debug message is logged and the existing
    definition is left alone.

    Does not create any database tables, see :py:func:`create_tables`
    for that.
    """
    global resource_metadata_table
    global resource_metadatum_table

    if resource_metadata_table is not None:
        log.debug('Resource metadata table already defined')
    else:
        log.debug('Defining resource metadata table')
        resource_fk = ForeignKey(
            'resource.id', ondelete='CASCADE', onupdate='CASCADE'
        )
        resource_metadata_table = Table(
            RESOURCE_METADATA_TABLE_NAME,
            metadata,
            Column('resource_id', types.UnicodeText, resource_fk,
                   nullable=False, primary_key=True),
            Column('last_extracted', types.DateTime),
            Column('last_url', types.UnicodeText),
            Column('last_format', types.UnicodeText),
            Column('task_id', types.UnicodeText),
        )
        # Related metadatum objects are exposed as a collection keyed by
        # their ``key`` attribute.
        metadatum_relationship = relationship(
            ResourceMetadatum,
            collection_class=attribute_mapped_collection('key'),
            cascade='all, delete, delete-orphan',
        )
        mapper(
            ResourceMetadata,
            resource_metadata_table,
            properties={'_meta': metadatum_relationship},
        )

    if resource_metadatum_table is not None:
        log.debug('Resource metadatum table already defined')
    else:
        log.debug('Defining resource metadatum table')
        parent_fk = ForeignKey(
            RESOURCE_METADATA_TABLE_NAME + '.resource_id',
            ondelete='CASCADE',
            onupdate='CASCADE',
        )
        resource_metadatum_table = Table(
            RESOURCE_METADATUM_TABLE_NAME,
            metadata,
            Column('id', types.Integer, nullable=False, primary_key=True),
            Column('resource_id', types.UnicodeText, parent_fk,
                   nullable=False),
            Column('key', types.UnicodeText, nullable=False),
            Column('value', types.UnicodeText),
        )
        mapper(ResourceMetadatum, resource_metadatum_table)