We have extracted the following 50 code examples from open source Python projects to illustrate how to use sqlalchemy.sql.text().
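Before the extracted examples, here is a minimal, self-contained sketch of the pattern they all share: wrap a raw SQL string in text() and bind values by name instead of interpolating them into the string. The engine URL, table, and column names below are invented purely for illustration; passing parameters as a dictionary works on recent SQLAlchemy versions, while many of the snippets below use the older 1.x style of passing them as keyword arguments to execute().

# Minimal usage sketch (not one of the extracted examples). The in-memory
# SQLite engine, table name, and column names are made up for illustration.
from sqlalchemy import create_engine
from sqlalchemy.sql import text

engine = create_engine("sqlite://")
with engine.connect() as conn:
    conn.execute(text("CREATE TABLE project (id INTEGER, name TEXT)"))
    conn.execute(text("INSERT INTO project (id, name) VALUES (:id, :name)"),
                 {"id": 1, "name": "demo"})
    # Named bind parameters (:id) are filled in by the DBAPI driver,
    # avoiding manual string formatting of user-supplied values.
    result = conn.execute(text("SELECT name FROM project WHERE id = :id"),
                          {"id": 1})
    print(result.scalar())  # prints: demo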
def get_temp_table_names(self, connection, **kw):
    schema = self.denormalize_name(self.default_schema_name)

    sql_str = "SELECT table_name FROM all_tables WHERE "
    if self.exclude_tablespaces:
        sql_str += (
            "nvl(tablespace_name, 'no tablespace') "
            "NOT IN (%s) AND " % (
                ', '.join(["'%s'" % ts for ts in self.exclude_tablespaces])
            )
        )
    sql_str += (
        "OWNER = :owner "
        "AND IOT_NAME IS NULL "
        "AND DURATION IS NOT NULL")

    cursor = connection.execute(sql.text(sql_str), owner=schema)
    return [self.normalize_name(row[0]) for row in cursor]
def define_constraint_cascades(self, constraint):
    text = ""
    if constraint.ondelete is not None:
        text += " ON DELETE %s" % constraint.ondelete

    # oracle has no ON UPDATE CASCADE -
    # its only available via triggers
    # http://asktom.oracle.com/tkyte/update_cascade/index.html
    if constraint.onupdate is not None:
        util.warn(
            "Oracle does not contain native UPDATE CASCADE "
            "functionality - onupdates will not be rendered for foreign "
            "keys. Consider using deferrable=True, initially='deferred' "
            "or triggers.")

    return text
def get_view_definition(self, connection, view_name, schema=None,
                        resolve_synonyms=False, dblink='', **kw):
    info_cache = kw.get('info_cache')
    (view_name, schema, dblink, synonym) = \
        self._prepare_reflection_args(connection, view_name, schema,
                                      resolve_synonyms, dblink,
                                      info_cache=info_cache)

    params = {'view_name': view_name}
    text = "SELECT text FROM all_views WHERE view_name=:view_name"

    if schema is not None:
        text += " AND owner = :schema"
        params['schema'] = schema

    rp = connection.execute(sql.text(text), **params).scalar()
    if rp:
        if util.py2k:
            rp = rp.decode(self.encoding)
        return rp
    else:
        return None
def get_table_names(self, connection, schema=None, **kw):
    if schema is None:
        schema = self.default_schema_name

    TABLE_SQL = text("""
      SELECT o.name AS name
        FROM sysobjects o JOIN sysusers u ON o.uid = u.uid
       WHERE u.name = :schema_name
         AND o.type = 'U'
    """)

    if util.py2k:
        if isinstance(schema, unicode):
            schema = schema.encode("ascii")

    tables = connection.execute(TABLE_SQL, schema_name=schema)
    return [t["name"] for t in tables]
def get_view_definition(self, connection, view_name, schema=None, **kw):
    if schema is None:
        schema = self.default_schema_name

    VIEW_DEF_SQL = text("""
      SELECT c.text
        FROM syscomments c JOIN sysobjects o ON c.id = o.id
       WHERE o.name = :view_name
         AND o.type = 'V'
    """)

    if util.py2k:
        if isinstance(view_name, unicode):
            view_name = view_name.encode("ascii")

    view = connection.execute(VIEW_DEF_SQL, view_name=view_name)
    return view.scalar()
def get_view_names(self, connection, schema=None, **kw):
    if schema is None:
        schema = self.default_schema_name

    VIEW_SQL = text("""
      SELECT o.name AS name
        FROM sysobjects o JOIN sysusers u ON o.uid = u.uid
       WHERE u.name = :schema_name
         AND o.type = 'V'
    """)

    if util.py2k:
        if isinstance(schema, unicode):
            schema = schema.encode("ascii")

    views = connection.execute(VIEW_SQL, schema_name=schema)
    return [v["name"] for v in views]
def _literal_as_text(element, warn=False):
    if isinstance(element, Visitable):
        return element
    elif hasattr(element, '__clause_element__'):
        return element.__clause_element__()
    elif isinstance(element, util.string_types):
        if warn:
            util.warn_limited(
                "Textual SQL expression %(expr)r should be "
                "explicitly declared as text(%(expr)r)",
                {"expr": util.ellipses_string(element)})

        return TextClause(util.text_type(element))
    elif isinstance(element, (util.NoneType, bool)):
        return _const_expr(element)
    else:
        raise exc.ArgumentError(
            "SQL expression object or string expected, got object of type %r "
            "instead" % type(element)
        )
def fix_task_date():
    """Fix Date format in Task."""
    import re
    from datetime import datetime
    with app.app_context():
        query = text('''SELECT id, created FROM task
                        WHERE created LIKE ('%Date%')''')
        results = db.engine.execute(query)
        tasks = results.fetchall()
        for task in tasks:
            # It's in milliseconds
            timestamp = int(re.findall(r'\d+', task.created)[0])
            print(timestamp)
            # Postgresql expects this format 2015-05-21T13:19:06.471074
            fixed_created = datetime.fromtimestamp(timestamp/1000)\
                .replace(microsecond=timestamp%1000*1000)\
                .strftime('%Y-%m-%dT%H:%M:%S.%f')
            query = text('''UPDATE task SET created=:created
                            WHERE id=:id''')
            db.engine.execute(query, created=fixed_created, id=task.id)
def add_custom_contrib_button_to(project, user_id_or_ip):
    """Add a customized contrib button for a project."""
    if type(project) != dict:
        project = project.dictize()
    project['contrib_button'] = check_contributing_state(project,
                                                         **user_id_or_ip)
    query = text('''
        SELECT COUNT(id) as ct from blogpost
        WHERE project_id=:project_id;
        ''')
    results = session.execute(query, dict(project_id=project['id']))
    for row in results:
        project['n_blogposts'] = row.ct
    project['n_results'] = n_results(project['id'])
    return project
def rank_and_score(user_id):
    """Return rank and score for a user."""
    # See: https://gist.github.com/tokumine/1583695
    sql = text('''
               WITH global_rank AS (
                    WITH scores AS (
                        SELECT user_id, COUNT(*) AS score FROM task_run
                        WHERE user_id IS NOT NULL GROUP BY user_id)
                    SELECT user_id, score, rank() OVER (ORDER BY score desc)
                    FROM scores)
               SELECT * from global_rank WHERE user_id=:user_id;
               ''')
    results = session.execute(sql, dict(user_id=user_id))
    rank_and_score = dict(rank=None, score=None)
    for row in results:
        rank_and_score['rank'] = row.rank
        rank_and_score['score'] = row.score
    return rank_and_score
def projects_contributed(user_id):
    """Return projects that user_id has contributed to."""
    sql = text('''
               WITH projects_contributed as
                    (SELECT DISTINCT(project_id) FROM task_run
                     WHERE user_id=:user_id)
               SELECT project.id, project.name, project.short_name,
                      project.owner_id, project.description, project.info
               FROM project, projects_contributed
               WHERE project.id=projects_contributed.project_id
               ORDER BY project.name DESC;
               ''')
    results = session.execute(sql, dict(user_id=user_id))
    projects_contributed = []
    for row in results:
        project = dict(id=row.id, name=row.name, short_name=row.short_name,
                       owner_id=row.owner_id,
                       description=row.description,
                       overall_progress=overall_progress(row.id),
                       n_tasks=n_tasks(row.id),
                       n_volunteers=n_volunteers(row.id),
                       info=row.info)
        projects_contributed.append(project)
    return projects_contributed
def published_projects(user_id):
    """Return published projects for user_id."""
    sql = text('''
               SELECT project.id, project.name, project.short_name,
                      project.description, project.owner_id, project.info
               FROM project
               WHERE project.published=true
               AND project.owner_id=:user_id;
               ''')
    projects_published = []
    results = session.execute(sql, dict(user_id=user_id))
    for row in results:
        project = dict(id=row.id, name=row.name, short_name=row.short_name,
                       owner_id=row.owner_id,
                       description=row.description,
                       overall_progress=overall_progress(row.id),
                       n_tasks=n_tasks(row.id),
                       n_volunteers=n_volunteers(row.id),
                       info=row.info)
        projects_published.append(project)
    return projects_published
def draft_projects(user_id):
    """Return draft projects for user_id."""
    sql = text('''
               SELECT project.id, project.name, project.short_name,
                      project.description, project.owner_id, project.info
               FROM project
               WHERE project.owner_id=:user_id
               AND project.published=false;
               ''')
    projects_draft = []
    results = session.execute(sql, dict(user_id=user_id))
    for row in results:
        project = dict(id=row.id, name=row.name, short_name=row.short_name,
                       owner_id=row.owner_id,
                       description=row.description,
                       overall_progress=overall_progress(row.id),
                       n_tasks=n_tasks(row.id),
                       n_volunteers=n_volunteers(row.id),
                       info=row.info)
        projects_draft.append(project)
    return projects_draft
def get_top5_projects_24_hours():
    """Return the 5 most active projects in the last 24 hours."""
    # Top 5 most active projects in last 24 hours
    sql = text('''SELECT project.id, project.name, project.short_name,
               project.info,
               COUNT(task_run.project_id) AS n_answers
               FROM project, task_run
               WHERE project.id=task_run.project_id
               AND DATE(task_run.finish_time) > NOW() - INTERVAL '24 hour'
               AND DATE(task_run.finish_time) <= NOW()
               GROUP BY project.id
               ORDER BY n_answers DESC LIMIT 5;''')
    results = session.execute(sql, dict(limit=5))
    top5_apps_24_hours = []
    for row in results:
        tmp = dict(id=row.id, name=row.name, short_name=row.short_name,
                   info=row.info, n_answers=row.n_answers)
        top5_apps_24_hours.append(tmp)
    return top5_apps_24_hours
def get_top5_users_24_hours():
    """Return top 5 users in last 24 hours."""
    # Top 5 most active users in last 24 hours
    sql = text('''SELECT "user".id, "user".fullname, "user".name,
               COUNT(task_run.project_id) AS n_answers
               FROM "user", task_run
               WHERE "user".id=task_run.user_id
               AND DATE(task_run.finish_time) > NOW() - INTERVAL '24 hour'
               AND DATE(task_run.finish_time) <= NOW()
               GROUP BY "user".id
               ORDER BY n_answers DESC LIMIT 5;''')
    results = session.execute(sql, dict(limit=5))
    top5_users_24_hours = []
    for row in results:
        user = dict(id=row.id, fullname=row.fullname,
                    name=row.name,
                    n_answers=row.n_answers)
        top5_users_24_hours.append(user)
    return top5_users_24_hours
def get_top(n=4):
    """Return top n=4 projects."""
    sql = text('''SELECT project.id, project.name, project.short_name,
               project.description, project.info,
               COUNT(project_id) AS total
               FROM task_run, project
               WHERE project_id IS NOT NULL
               AND project.id=project_id
               AND (project.info->>'passwd_hash') IS NULL
               GROUP BY project.id
               ORDER BY total DESC LIMIT :limit;''')
    results = session.execute(sql, dict(limit=n))
    top_projects = []
    for row in results:
        project = dict(id=row.id, name=row.name, short_name=row.short_name,
                       description=row.description,
                       info=row.info,
                       n_volunteers=n_volunteers(row.id),
                       n_completed_tasks=n_completed_tasks(row.id))
        top_projects.append(Project().to_public_json(project))
    return top_projects


#@memoize(timeout=timeouts.get('BROWSE_TASKS_TIMEOUT'))
def browse_tasks(project_id, limit=10, offset=0):
    """Cache browse tasks view for a project."""
    sql = text('''
               SELECT task.id, task.n_answers,
                      sum(counter.n_task_runs) as n_task_runs
               FROM task, counter
               WHERE task.id=counter.task_id
               AND task.project_id=:project_id
               GROUP BY task.id
               ORDER BY task.id ASC LIMIT :limit OFFSET :offset
               ''')
    results = session.execute(sql, dict(project_id=project_id,
                                        limit=limit,
                                        offset=offset))
    tasks = []
    for row in results:
        task = dict(id=row.id, n_task_runs=row.n_task_runs,
                    n_answers=row.n_answers)
        task['pct_status'] = _pct_status(row.n_task_runs, row.n_answers)
        tasks.append(task)
    return tasks
def n_count(category):
    """Count the number of projects in a given category."""
    if category == 'featured':
        return _n_featured()
    if category == 'draft':
        return _n_draft()
    sql = text('''
               WITH uniq AS (
                    SELECT COUNT(project.id) FROM project
                    LEFT OUTER JOIN category ON project.category_id=category.id
                    WHERE category.short_name=:category
                    AND project.published=true
                    AND (project.info->>'passwd_hash') IS NULL
                    GROUP BY project.id)
               SELECT COUNT(*) FROM uniq
               ''')
    results = session.execute(sql, dict(category=category))
    count = 0
    for row in results:
        count = row[0]
    return count
def test_create_dict_job(self):
    """Test JOB create dict job works."""
    user = UserFactory.create(pro=True)
    project = ProjectFactory.create(owner=user)
    from sqlalchemy.sql import text
    from pybossa.core import db
    sql = text('''SELECT project.id, project.short_name FROM project, "user"
               WHERE project.owner_id="user".id AND "user".pro=True;''')
    results = db.slave_session.execute(sql)
    jobs_generator = create_dict_jobs(results,
                                      get_project_stats,
                                      (10 * 60))
    jobs = []
    for job in jobs_generator:
        jobs.append(job)
    err_msg = "There should be only one job"
    assert len(jobs) == 1, err_msg
    job = jobs[0]
    assert 'get_project_stats' in job['name'].__name__
    assert job['args'] == [project.id, project.short_name]
def tabularasa(self):
    """Truncate all tables.

    Used for testing to truncate all tables so the database is clean.
    """
    table_names = [
        'tasks',
        'result_message',
        'active_instance',
        'boot_action',
        'boot_action_status',
    ]

    conn = self.db_engine.connect()
    for t in table_names:
        query_text = sql.text(
            "TRUNCATE TABLE %s" % t).execution_options(autocommit=True)
        conn.execute(query_text)
    conn.close()
def update_node_ceph_flags():
    hosts_query = text("SELECT hostname from nodes")
    hosts = [item[0] for item in db.session.execute(hosts_query).fetchall()]
    hosts_with_ceph = []
    for host in hosts:
        if not tasks.is_ceph_installed_on_node(host):
            continue
        hosts_with_ceph.append(host)

    if not hosts_with_ceph:
        return

    flags_query = \
        "INSERT INTO node_flags (node_id, created, deleted, flag_name, flag_value) " \
        "SELECT id, NOW() at time zone 'utc', NULL, 'ceph_installed', 'true' FROM nodes " \
        "WHERE hostname IN ({})".format(
            ', '.join("'" + host + "'" for host in hosts_with_ceph)
        )
    db.session.execute(text(flags_query))
    db.session.commit()

    for host in hosts_with_ceph:
        tasks.add_k8s_node_labels(
            host, {NODE_CEPH_AWARE_KUBERDOCK_LABEL: "True"})
def get_table_names(self, connection, schema=None, **kw):
    schema = self.denormalize_name(schema or self.default_schema_name)

    # note that table_names() isn't loading DBLINKed or synonym'ed tables
    if schema is None:
        schema = self.default_schema_name

    sql_str = "SELECT table_name FROM all_tables WHERE "
    if self.exclude_tablespaces:
        sql_str += (
            "nvl(tablespace_name, 'no tablespace') "
            "NOT IN (%s) AND " % (
                ', '.join(["'%s'" % ts for ts in self.exclude_tablespaces])
            )
        )
    sql_str += (
        "OWNER = :owner "
        "AND IOT_NAME IS NULL "
        "AND DURATION IS NULL")

    cursor = connection.execute(sql.text(sql_str), owner=schema)
    return [self.normalize_name(row[0]) for row in cursor]
def get_select_hint_text(self, byfroms):
    return " ".join(
        "/*+ %s */" % text for table, text in byfroms.items()
    )
def visit_create_index(self, create):
    index = create.element
    self._verify_index_table(index)
    preparer = self.preparer
    text = "CREATE "
    if index.unique:
        text += "UNIQUE "
    if index.dialect_options['oracle']['bitmap']:
        text += "BITMAP "
    text += "INDEX %s ON %s (%s)" % (
        self._prepared_index_name(index, include_schema=True),
        preparer.format_table(index.table, use_schema=True),
        ', '.join(
            self.sql_compiler.process(
                expr,
                include_table=False, literal_binds=True)
            for expr in index.expressions)
    )
    if index.dialect_options['oracle']['compress'] is not False:
        if index.dialect_options['oracle']['compress'] is True:
            text += " COMPRESS"
        else:
            text += " COMPRESS %d" % (
                index.dialect_options['oracle']['compress']
            )
    return text
def has_table(self, connection, table_name, schema=None):
    if not schema:
        schema = self.default_schema_name
    cursor = connection.execute(
        sql.text("SELECT table_name FROM all_tables "
                 "WHERE table_name = :name AND owner = :schema_name"),
        name=self.denormalize_name(table_name),
        schema_name=self.denormalize_name(schema))
    return cursor.first() is not None