我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用sqlalchemy.UnicodeText()。
def _init_jobs_table(): """Initialise the "jobs" table in the db.""" _jobs_table = sqlalchemy.Table( 'jobs', _METADATA, sqlalchemy.Column('job_id', sqlalchemy.UnicodeText, primary_key=True), sqlalchemy.Column('job_type', sqlalchemy.UnicodeText), sqlalchemy.Column('status', sqlalchemy.UnicodeText, index=True), sqlalchemy.Column('data', sqlalchemy.UnicodeText), sqlalchemy.Column('error', sqlalchemy.UnicodeText), sqlalchemy.Column('requested_timestamp', sqlalchemy.DateTime), sqlalchemy.Column('finished_timestamp', sqlalchemy.DateTime), sqlalchemy.Column('sent_data', sqlalchemy.UnicodeText), # Callback URL: sqlalchemy.Column('result_url', sqlalchemy.UnicodeText), # CKAN API key: sqlalchemy.Column('api_key', sqlalchemy.UnicodeText), ) return _jobs_table
def downgrade(pyramid_env): # with context.begin_transaction(): # op.add_column("content", sa.Column( # "subject", sa.Unicode, server_default="")) # op.add_column("content", sa.Column( # "body", sa.UnicodeText, server_default="")) from assembl import models as m db = m.get_session_maker()() with transaction.manager: for target in ("subject", "body"): r = db.execute( """select content.id, langstring_entry.value from content join langstring_entry on content.{0}_id = langstring_entry.langstring_id join locale on langstring_entry.locale_id = locale.id where locale.code not like '%-x-mtfrom-%'""".format(target)) for id, text in r: if len(text): db.execute("UPDATE content set %s = :txt WHERE id= :id" % ( (target,)), dict(txt=text, id=id)) mark_changed()
def upgrade(pyramid_env): from assembl.semantic.virtuoso_mapping import get_session dbsession = get_session() dbsession.execute("SPARQL drop quad map quadnames:col_pattern_Post_subject") dbsession.commit() with context.begin_transaction(): op.add_column('content', sa.Column('subject', sa.Unicode, server_default="")) op.add_column('content', sa.Column('body', sa.UnicodeText, server_default="")) with context.begin_transaction(): op.execute( """UPDATE content SET subject = ( SELECT subject FROM post WHERE post.id = content.id) """) op.execute( """UPDATE content SET body = ( SELECT body FROM post WHERE post.id = content.id) """) from assembl.scripts.rebuild_tables import rebuild_table from assembl.models import Post # Thanks to https://github.com/openlink/virtuoso-opensource/issues/378 # The aim is to remove the columns from the table; make sure they are not in the model when you migrate rebuild_table(Post.__table__, True)
def upgrade(pyramid_env): with context.begin_transaction(): from assembl.models.post import PublicationStates schema = config.get('db_schema')+"."+config.get('db_user') op.add_column("post", sa.Column( "publication_state", PublicationStates.db_type(), nullable=False, server_default=PublicationStates.PUBLISHED.name), schema=schema) op.add_column("post", sa.Column( "moderator_id", sa.Integer, sa.ForeignKey( 'user.id', ondelete='SET NULL', onupdate='CASCADE'), nullable=True,)) op.add_column("post", sa.Column( "moderated_on", sa.DateTime)) op.add_column("post", sa.Column( "moderation_text", sa.UnicodeText)) op.add_column("post", sa.Column( "moderator_comment", sa.UnicodeText))
def upgrade(): table_prefix = context.config.get_main_option('table_prefix') op.create_unique_constraint('_repo_name_unique', table_prefix + 'repos', ['repo_name']) op.alter_column(table_prefix + 'repos', 'user_key', type_=sa.UnicodeText(), existing_type=sa.String(255))
def downgrade(): table_prefix = context.config.get_main_option('table_prefix') op.drop_constraint('_repo_name_unique', table_prefix + 'repos') op.alter_column(table_prefix + 'repos', 'user_key', existing_type=sa.UnicodeText(), type_=sa.String(255))
def upgrade(): op.create_table( 'announcement', sa.Column('id', sa.Integer, primary_key=True), sa.Column('title', sa.Unicode(length=255), nullable=False), sa.Column('body', sa.UnicodeText, nullable=False), sa.Column('user_id', sa.Integer, sa.ForeignKey('user.id')), sa.Column('created', sa.Text, default=make_timestamp), )
def upgrade(): op.alter_column('app', 'long_description', type_=sa.UnicodeText) pass
def upgrade(): op.create_table( 'blogpost', sa.Column('id', sa.Integer, primary_key=True), sa.Column('title', sa.Unicode(length=255), nullable=False), sa.Column('body', sa.UnicodeText, nullable=False), sa.Column('app_id', sa.Integer, sa.ForeignKey('app.id', ondelete='CASCADE'), nullable=False), sa.Column('user_id', sa.Integer, sa.ForeignKey('user.id')), sa.Column('created', sa.Text, default=make_timestamp), )
def _init_metadata_table(): """Initialise the "metadata" table in the db.""" _metadata_table = sqlalchemy.Table( 'metadata', _METADATA, sqlalchemy.Column( 'job_id', sqlalchemy.ForeignKey("jobs.job_id", ondelete="CASCADE"), nullable=False, primary_key=True), sqlalchemy.Column('key', sqlalchemy.UnicodeText, primary_key=True), sqlalchemy.Column('value', sqlalchemy.UnicodeText, index=True), sqlalchemy.Column('type', sqlalchemy.UnicodeText), ) return _metadata_table
def _init_logs_table(): """Initialise the "logs" table in the db.""" _logs_table = sqlalchemy.Table( 'logs', _METADATA, sqlalchemy.Column( 'job_id', sqlalchemy.ForeignKey("jobs.job_id", ondelete="CASCADE"), nullable=False), sqlalchemy.Column('timestamp', sqlalchemy.DateTime), sqlalchemy.Column('message', sqlalchemy.UnicodeText), sqlalchemy.Column('level', sqlalchemy.UnicodeText), sqlalchemy.Column('module', sqlalchemy.UnicodeText), sqlalchemy.Column('funcName', sqlalchemy.UnicodeText), sqlalchemy.Column('lineno', sqlalchemy.Integer) ) return _logs_table
def new_SomeObject(self): class SomeObject(Base): __tablename__ = 'someobject' id = Column(Integer, primary_key=True) done_flag = Column(Boolean, nullable=False, default=False) name = Column(UnicodeText, nullable=False) deadline_flag = Column(Boolean, nullable=False, default=False) def make_done(self): self.done_flag = True def deadline_reached(self): self.deadline_flag = True return SomeObject
def new_MyDeferredAction(self): fixture = self class MyDeferredAction(DeferredAction): __tablename__ = 'mydeferredaction' __mapper_args__ = {'polymorphic_identity': 'mydeferredaction'} id = Column(Integer, ForeignKey(DeferredAction.id), primary_key=True) some_object_key = Column(UnicodeText, nullable=False) def __init__(self, some_object, **kwargs): super(MyDeferredAction, self).__init__(some_object_key=some_object.name, **kwargs) def success_action(self): Session.query(fixture.SomeObject).filter_by(name=self.some_object_key).one().make_done() def deadline_action(self): Session.query(fixture.SomeObject).filter_by(name=self.some_object_key).one().deadline_reached() return MyDeferredAction
def schedule_upgrades(self): self.schedule('alter', op.add_column, 'persistedfile', Column('mime_type', UnicodeText, nullable=False)) self.schedule('data', op.execute, 'update persistedfile set mime_type=content_type') self.schedule('cleanup', op.drop_column, 'persistedfile', 'content_type')
def __init__(self, db_url, tablename, echo=False, aliases={}, prefixes={}): """ aliases -- dict mapping resource aliases to IRIs, e.g. { u'wde:Female' : u'http://www.wikidata.org/entity/Q6581072', u'wde:Male' : u'http://www.wikidata.org/entity/Q6581097', } prefixes -- dict mapping aliases to IRIs, e.g. { 'dbo' : 'http://dbpedia.org/ontology/', 'dbr' : 'http://dbpedia.org/resource/', 'dbp' : 'http://dbpedia.org/property/', } """ self.db_url = db_url self.aliases = aliases self.prefixes = prefixes self.metadata = MetaData() self.quads = Table(tablename, self.metadata, Column('id', Integer, primary_key=True), Column('s', UnicodeText, index=True), Column('p', UnicodeText, index=True), Column('o', UnicodeText, index=True), Column('context', UnicodeText, index=True), Column('lang', String, index=True), Column('datatype', String), ) Index('idx_%s_spo' % tablename, self.quads.c.s, self.quads.c.p, self.quads.c.o) self.engine = create_engine(db_url, echo=echo) self.metadata.create_all(self.engine)
def downgrade(pyramid_env): with context.begin_transaction(): op.add_column('extract', sa.Column('body', sa.UnicodeText))
def downgrade(pyramid_env): with context.begin_transaction(): op.add_column( 'timeline_event', sa.Column('title', sa.Unicode())) op.add_column( 'timeline_event', sa.Column('description', sa.UnicodeText)) # Do stuff with the app's models here. from assembl import models as m db = m.get_session_maker()() with transaction.manager: # This assumes as single LangStringEntry per timeline event. # Not true in general, but enough to revert the upgrade. data = list(db.execute( """SELECT te.id, title.langstring_id, description.langstring_id, title.value, description.value FROM timeline_event AS te JOIN langstring_entry AS title ON title.langstring_id = te.title_id LEFT JOIN langstring_entry AS description ON description.langstring_id = te.description_id""")) ids = [] if data: for ev_id, t_id, d_id, title, description in data: db.execute( 'UPDATE timeline_event SET title=:title, description=:desc WHERE id = :id', {"title": title, "desc": description, "id": ev_id}) ids.extend([str(t_id), str(d_id)]) mark_changed() with context.begin_transaction(): op.drop_column('timeline_event', 'title_id') op.drop_column('timeline_event', 'description_id') op.drop_column('timeline_event', 'image_url') # after title_id and description_id are gone if ids: ids = (",".join(ids),) op.execute( "DELETE FROM langstring_entry WHERE langstring_id IN (%s)" % ids) op.execute( "DELETE FROM langstring WHERE id IN (%s)" % ids) op.alter_column('timeline_event', 'title', nullable=False)
def upgrade(pyramid_env): with context.begin_transaction(): op.add_column('discussion', sa.Column( 'objectives', sa.UnicodeText))
def downgrade(pyramid_env): from assembl.semantic.virtuoso_mapping import get_session from assembl.models import Content assert not ('body' in Content.__table__.c or 'subject' in Content.__table__.c), \ "Comment out the body and subject from Content to run the back migration" dbsession = get_session() try: dbsession.execute("SPARQL drop quad map quadnames:col_pattern_Content_subject") dbsession.commit() except: dbsession.rollback() op.add_column('post', sa.Column('subject', sa.Unicode, server_default="")) op.add_column('post', sa.Column('body', sa.UnicodeText, server_default="")) with context.begin_transaction(): op.execute( """UPDATE post SET subject = ( SELECT subject FROM content WHERE post.id = content.id) """) op.execute( """UPDATE post SET body = ( SELECT body FROM content WHERE content.id = post.id AND content.body IS NOT NULL) """) from assembl.scripts.rebuild_tables import rebuild_table rebuild_table(Content.__table__, True)
def upgrade(pyramid_env): with context.begin_transaction(): op.alter_column('extract', 'body', nullable=True, existing_nullable=False) op.add_column('text_fragment_identifier', sa.Column('body', sa.UnicodeText)) op.execute('''UPDATE text_fragment_identifier SET body = ( SELECT body FROM extract WHERE extract.id = text_fragment_identifier.extract_id AND length(body) > 0)''') op.execute('''UPDATE extract SET body = NULL WHERE id NOT IN ( SELECT idea_content_link.id FROM idea_content_link JOIN content ON (content.id=idea_content_link.content_id) WHERE content.type = 'webpage')''')
def upgrade(pyramid_env): with context.begin_transaction(): op.create_table( 'source_imapmailbox', sa.Column('id', sa.Integer, sa.ForeignKey( 'mailbox.id', ondelete='CASCADE', onupdate='CASCADE' ), primary_key=True), sa.Column('host', sa.String(1024), nullable=False), sa.Column('port', sa.Integer, nullable=False), sa.Column('username', sa.UnicodeText, nullable=False), sa.Column('use_ssl', sa.Boolean, default=True), sa.Column('password', sa.UnicodeText, nullable=False), ) op.drop_constraint('source_mailinglist_mailbox_id_id', 'source_mailinglist') op.execute( "INSERT INTO source_imapmailbox (id, host, port, username, use_ssl, \"password\") " "SELECT id, host, port, username, use_ssl, \"password\" " "FROM mailbox" ) op.drop_column('mailbox', 'host') op.drop_column('mailbox', 'port') op.drop_column('mailbox', 'username') op.drop_column('mailbox', 'use_ssl') op.drop_column('mailbox', 'password') op.create_foreign_key('source_mailinglist_source_imapmailbox_id_id', "source_mailinglist", "source_imapmailbox", ["id"], ["id"]) op.create_table( 'source_filesystemmailbox', sa.Column('id', sa.Integer, sa.ForeignKey( 'mailbox.id', ondelete='CASCADE', onupdate='CASCADE' ), primary_key=True), sa.Column('filesystem_path', sa.Unicode(), nullable=False) ) op.create_table( 'source_maildirmailbox', sa.Column('id', sa.Integer, sa.ForeignKey( 'source_filesystemmailbox.id', ondelete='CASCADE', onupdate='CASCADE' ), primary_key=True), )
def upgrade(pyramid_env): with context.begin_transaction(): op.create_table( "locale", sa.Column("id", sa.Integer, primary_key=True), sa.Column("code", sa.String(20), unique=True), sa.Column("rtl", sa.Boolean, server_default="0")) op.create_table( "locale_label", sa.Column("id", sa.Integer, primary_key=True), sa.Column( "named_locale_id", sa.Integer, sa.ForeignKey( "locale.id", ondelete="CASCADE", onupdate="CASCADE"), nullable=False), sa.Column( "locale_id_of_label", sa.Integer, sa.ForeignKey( "locale.id", ondelete="CASCADE", onupdate="CASCADE"), nullable=False), sa.Column("name", sa.Unicode)) op.create_table( "langstring", sa.Column('id', sa.Integer, primary_key=True)) op.create_table( "langstring_entry", sa.Column("id", sa.Integer, primary_key=True), sa.Column("langstring_id", sa.Integer, sa.ForeignKey( "langstring.id", ondelete="CASCADE"), nullable=False, index=True), sa.Column("locale_id", sa.Integer, sa.ForeignKey( "locale.id", ondelete="CASCADE", onupdate="CASCADE"), nullable=False), sa.Column("locale_identification_data", sa.String), sa.Column("locale_confirmed", sa.Boolean, server_default="0"), sa.Column("tombstone_date", sa.DateTime, server_default=None), sa.Column("value", sa.UnicodeText), sa.schema.UniqueConstraint( "langstring_id", "locale_id", "tombstone_date")) # Do stuff with the app's models here. from assembl import models as m db = m.get_session_maker()() import simplejson as json names = json.load(open('assembl/nlp/data/language-names.json')) with transaction.manager: locales = {x[0] for x in names}.union({x[1] for x in names}) for l in locales: parts = l.split("_") rtl = parts[0] in rtl_locales or "_".join(parts[:2]) in rtl_locales db.add(m.Locale(code=l, rtl=rtl)) with transaction.manager: c = m.Locale.locale_collection for (l, t, n) in names: db.add(m.LocaleLabel(named_locale_id=c[l], locale_id_of_label=c[t], name=n))