我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 sqlalchemy.text()。
def upgrade():
    """Give ``pod_states`` a surrogate integer primary key and re-point
    ``container_states`` at it instead of directly at pods.

    Order matters: the new ``pod_state_id`` column must exist and be
    backfilled (via ``upgrade_data()``) before it can be made NOT NULL.
    """
    # New sequence-backed id column; server_default makes inserts work
    # while the migration is in flight.
    op.execute(sa.schema.CreateSequence(sa.Sequence('pod_states_id_seq')))
    op.add_column('pod_states', sa.Column('id', sa.Integer(), nullable=False, server_default=sa.text("nextval('pod_states_id_seq'::regclass)")))
    # Swap the primary key over to the new id column in a single ALTER.
    op.execute("ALTER TABLE pod_states DROP CONSTRAINT pod_states_pkey, " "ADD CONSTRAINT pod_states_pkey PRIMARY KEY (id);")
    op.add_column('container_states', sa.Column('exit_code', sa.Integer(), nullable=True))
    op.add_column('container_states', sa.Column('pod_state_id', sa.Integer(), nullable=True))
    op.add_column('container_states', sa.Column('reason', sa.Text(), nullable=True))
    op.create_index('ix_pod_id_start_time', 'pod_states', ['pod_id', 'start_time'], unique=True)
    op.create_foreign_key('container_states_pod_state_id_fkey', 'container_states', 'pod_states', ['pod_state_id'], ['id'])
    # Data migration defined elsewhere in this module fills pod_state_id.
    upgrade_data()
    # Only safe after the backfill above.
    op.alter_column('container_states', 'pod_state_id', existing_type=sa.INTEGER(), nullable=False)
    op.drop_constraint(u'container_states_pod_id_fkey', 'container_states', type_='foreignkey')
    op.drop_column('container_states', 'pod_id')
def get_entity(self):
    """Return one Status instance.

    Extends the superclass response with the status ``code`` fetched via
    raw SQL bound to ``self.entity_id``.
    """
    sql = """
    select
        "Statuses".code
    from "Statuses"
    where "Statuses".id = :id
    """
    from sqlalchemy import text
    from stalker.db.session import DBSession
    conn = DBSession.connection()
    # :id is a bound parameter -- no SQL injection risk here.
    result = conn.execute(text(sql), id=self.entity_id)
    data = {
        'code': result.fetchone()[0]
    }
    response = super(StatusViews, self).get_entity()
    return self.update_response_data(response, data)
def get_entity(self):
    """Return the DateRangeMixin portion of this mixed-in class data.

    ``start``/``end`` are converted to epoch milliseconds (UTC) in SQL.
    Returns an empty dict when no row matches ``self.entity_id``.
    """
    # Table name comes from the mapped class itself, not user input,
    # so the %-interpolation is safe; the id is still a bound param.
    sql = """
    select
        (extract(epoch from entity_table.start::timestamp AT TIME ZONE 'UTC') * 1000)::bigint as start,
        (extract(epoch from entity_table.end::timestamp AT TIME ZONE 'UTC') * 1000)::bigint as end
    from "%s" as entity_table
    where entity_table.id = :id
    """ % self.entity.__tablename__
    from sqlalchemy import text
    from stalker.db.session import DBSession
    conn = DBSession.connection()
    r = conn.execute(text(sql), id=self.entity_id).fetchone()
    data = {}
    if r:
        data = {
            'start': r[0],
            'end': r[1],
        }
    return data
def _literal_as_text(element, warn=False):
    """Coerce *element* into a SQL expression construct.

    Accepts, in order of preference: an existing Visitable, an object
    exposing ``__clause_element__``, a plain string (wrapped in
    TextClause, optionally with a deprecation-style warning), or
    None/bool (turned into a constant expression).  Anything else
    raises ``ArgumentError``.
    """
    if isinstance(element, Visitable):
        return element
    elif hasattr(element, '__clause_element__'):
        return element.__clause_element__()
    elif isinstance(element, util.string_types):
        if warn:
            # Nudge callers toward explicit text() for raw SQL strings.
            util.warn_limited(
                "Textual SQL expression %(expr)r should be "
                "explicitly declared as text(%(expr)r)",
                {"expr": util.ellipses_string(element)})
        return TextClause(util.text_type(element))
    elif isinstance(element, (util.NoneType, bool)):
        return _const_expr(element)
    else:
        raise exc.ArgumentError(
            "SQL expression object or string expected, got object of type %r "
            "instead" % type(element)
        )
def _exec(self, construct, execution_options=None, multiparams=(), params=util.immutabledict()):
    """Execute *construct*, or render it as SQL text in ``as_sql`` mode.

    In offline (``as_sql``) mode the statement is compiled against the
    target dialect and written to static output; execution arguments are
    rejected because they cannot be rendered.  Otherwise the statement
    runs on the live connection and the result is returned.
    """
    if isinstance(construct, string_types):
        construct = text(construct)
    if self.as_sql:
        if multiparams or params:
            # TODO: coverage
            raise Exception("Execution arguments not allowed with as_sql")
        # DDL elements render their own parameters; everything else may
        # need bound values inlined as literals for offline output.
        if self.literal_binds and not isinstance(construct, schema.DDLElement):
            compile_kw = dict(compile_kwargs={"literal_binds": True})
        else:
            compile_kw = {}
        self.static_output(text_type(
            construct.compile(dialect=self.dialect, **compile_kw)
        ).replace("\t", "    ").strip() + self.command_terminator)
    else:
        conn = self.connection
        if execution_options:
            conn = conn.execution_options(**execution_options)
        return conn.execute(construct, *multiparams, **params)
def active_users_week():
    """Create or update active users last week materialized view."""
    # Refresh in place when the view already exists; otherwise create it.
    if _exists_materialized_view('dashboard_week_users'):
        return _refresh_materialized_view('dashboard_week_users')
    else:
        # NOTE(review): '\T' inside the to_date format is a literal
        # backslash + T (not a Python escape); confirm it matches the
        # stored timestamp format, and consider a raw string.
        sql = text('''CREATE MATERIALIZED VIEW dashboard_week_users AS
                   WITH crafters_per_day AS
                        (select to_date(task_run.finish_time, 'YYYY-MM-DD\THH24:MI:SS.US') AS day, user_id,
                                COUNT(task_run.user_id) AS day_crafters
                         FROM task_run
                         WHERE to_date(task_run.finish_time, 'YYYY-MM-DD\THH24:MI:SS.US') >= NOW() - ('1 week'):: INTERVAL
                         GROUP BY day, task_run.user_id)
                   SELECT day, COUNT(crafters_per_day.user_id) AS n_users
                   FROM crafters_per_day GROUP BY day ORDER BY day;''')
        db.session.execute(sql)
        db.session.commit()
        return "Materialized view created"
def active_anon_week():
    """Create or update active anon last week materialized view."""
    # Same shape as active_users_week() but keyed on user_ip for
    # anonymous contributors.
    if _exists_materialized_view('dashboard_week_anon'):
        return _refresh_materialized_view('dashboard_week_anon')
    else:
        # NOTE(review): '\T' is a literal backslash+T in this format
        # string -- verify against the stored finish_time format.
        sql = text('''CREATE MATERIALIZED VIEW dashboard_week_anon AS
                   WITH crafters_per_day AS
                        (select to_date(task_run.finish_time, 'YYYY-MM-DD\THH24:MI:SS.US') AS day, user_ip,
                                COUNT(task_run.user_ip) AS day_crafters
                         FROM task_run
                         WHERE to_date(task_run.finish_time, 'YYYY-MM-DD\THH24:MI:SS.US') >= NOW() - ('1 week'):: INTERVAL
                         GROUP BY day, task_run.user_ip)
                   SELECT day, COUNT(crafters_per_day.user_ip) AS n_users
                   FROM crafters_per_day GROUP BY day ORDER BY day;''')
        db.session.execute(sql)
        db.session.commit()
        return "Materialized view created"
def draft_projects_week():
    """Create or update new created draft projects last week materialized view."""
    if _exists_materialized_view('dashboard_week_project_draft'):
        return _refresh_materialized_view('dashboard_week_project_draft')
    else:
        # Draft == not yet published; join to "user" for owner details.
        sql = text('''CREATE MATERIALIZED VIEW dashboard_week_project_draft AS
                   SELECT TO_DATE(project.created, 'YYYY-MM-DD\THH24:MI:SS.US') AS day,
                          project.id, short_name, project.name,
                          owner_id, "user".name AS u_name, "user".email_addr
                   FROM project, "user"
                   WHERE TO_DATE(project.created, 'YYYY-MM-DD\THH24:MI:SS.US') >= now() - ('1 week')::INTERVAL
                   AND "user".id = project.owner_id
                   AND project.published = false
                   GROUP BY project.id, "user".name, "user".email_addr;''')
        db.session.execute(sql)
        db.session.commit()
        return "Materialized view created"
def update_projects_week():
    """Create or update updated projects last week materialized view."""
    if _exists_materialized_view('dashboard_week_project_update'):
        return _refresh_materialized_view('dashboard_week_project_update')
    else:
        # Same as draft_projects_week() but keyed on project.updated and
        # without the published filter.
        sql = text('''CREATE MATERIALIZED VIEW dashboard_week_project_update AS
                   SELECT TO_DATE(project.updated, 'YYYY-MM-DD\THH24:MI:SS.US') AS day,
                          project.id, short_name, project.name,
                          owner_id, "user".name AS u_name, "user".email_addr
                   FROM project, "user"
                   WHERE TO_DATE(project.updated, 'YYYY-MM-DD\THH24:MI:SS.US') >= now() - ('1 week')::INTERVAL
                   AND "user".id = project.owner_id
                   GROUP BY project.id, "user".name, "user".email_addr;''')
        db.session.execute(sql)
        db.session.commit()
        return "Materialized view created"
def new_tasks_week():
    """Create or update new tasks last week materialized view."""
    if _exists_materialized_view('dashboard_week_new_task'):
        return _refresh_materialized_view('dashboard_week_new_task')
    else:
        # Daily count of tasks created in the last 7 days.
        sql = text('''CREATE MATERIALIZED VIEW dashboard_week_new_task AS
                   SELECT TO_DATE(task.created, 'YYYY-MM-DD\THH24:MI:SS.US') AS day,
                          COUNT(task.id) AS day_tasks
                   FROM task
                   WHERE TO_DATE(task.created, 'YYYY-MM-DD\THH24:MI:SS.US') >= now() - ('1 week'):: INTERVAL
                   GROUP BY day ORDER BY day ASC;''')
        db.session.execute(sql)
        db.session.commit()
        return "Materialized view created"
def new_task_runs_week():
    """Create or update new task_runs last week materialized view."""
    if _exists_materialized_view('dashboard_week_new_task_run'):
        return _refresh_materialized_view('dashboard_week_new_task_run')
    else:
        # Daily count of task runs finished in the last 7 days.
        sql = text('''CREATE MATERIALIZED VIEW dashboard_week_new_task_run AS
                   SELECT TO_DATE(task_run.finish_time, 'YYYY-MM-DD\THH24:MI:SS.US') AS day,
                          COUNT(task_run.id) AS day_task_runs
                   FROM task_run
                   WHERE TO_DATE(task_run.finish_time, 'YYYY-MM-DD\THH24:MI:SS.US') >= now() - ('1 week'):: INTERVAL
                   GROUP BY day;''')
        db.session.execute(sql)
        db.session.commit()
        return "Materialized view created"
def new_users_week():
    """Create or update new users last week materialized view."""
    if _exists_materialized_view('dashboard_week_new_users'):
        return _refresh_materialized_view('dashboard_week_new_users')
    else:
        # Daily count of accounts created in the last 7 days.
        sql = text('''CREATE MATERIALIZED VIEW dashboard_week_new_users AS
                   SELECT TO_DATE("user".created, 'YYYY-MM-DD\THH24:MI:SS.US') AS day,
                          COUNT("user".id) AS day_users
                   FROM "user"
                   WHERE TO_DATE("user".created, 'YYYY-MM-DD\THH24:MI:SS.US') >= now() - ('1 week'):: INTERVAL
                   GROUP BY day;''')
        db.session.execute(sql)
        db.session.commit()
        return "Materialized view created"
def returning_users_week():
    """Create or update returning users last week materialized view."""
    if _exists_materialized_view('dashboard_week_returning_users'):
        return _refresh_materialized_view('dashboard_week_returning_users')
    else:
        # A "returning" user contributed on more than one distinct day
        # in the last week (HAVING count > 1).
        sql = text('''CREATE MATERIALIZED VIEW dashboard_week_returning_users AS
                   WITH data AS (
                       SELECT user_id,
                              TO_DATE(task_run.finish_time, 'YYYY-MM-DD\THH24:MI:SS.US') AS day
                       FROM task_run
                       WHERE TO_DATE(task_run.finish_time, 'YYYY-MM-DD\THH24:MI:SS.US') >= NOW() - ('1 week')::INTERVAL
                       GROUP BY day, task_run.user_id)
                   SELECT user_id, COUNT(user_id) AS n_days
                   FROM data GROUP BY user_id
                   HAVING(count(user_id) > 1) ORDER by n_days;
                   ''')
        db.session.execute(sql)
        db.session.commit()
        return "Materialized view created"
def update_tasks_redundancy(self, project, n_answer):
    """update the n_answer of every task from a project and their state.
    Use raw SQL for performance"""
    # First pass: reset every task to the new redundancy and 'ongoing'.
    sql = text('''
               UPDATE task SET n_answers=:n_answers,
               state='ongoing' WHERE project_id=:project_id''')
    self.db.session.execute(sql, dict(n_answers=n_answer, project_id=project.id))
    # Second pass: tasks that already have enough task_runs for the new
    # redundancy are flipped straight to 'completed'.
    sql = text('''
               WITH project_tasks AS (
               SELECT task.id, task.n_answers,
               COUNT(task_run.id) AS n_task_runs, task.state
               FROM task, task_run
               WHERE task_run.task_id=task.id
               AND task.project_id=:project_id
               GROUP BY task.id)
               UPDATE task SET state='completed'
               FROM project_tasks
               WHERE (project_tasks.n_task_runs >=:n_answers)
               and project_tasks.id=task.id
               ''')
    self.db.session.execute(sql, dict(n_answers=n_answer, project_id=project.id))
    self.db.session.commit()
    # Cached project stats are now stale.
    cached_projects.clean_project(project.id)
def _build_query(self, engine, session, query_items):
    """Assemble a SQLAlchemy query from the instance's configured
    filter, ordering, grouping and parameters.

    String filters/group-bys are accepted: a string filter is wrapped in
    ``text()``, a string group-by is resolved to a mapped attribute via
    ``self.automap``.
    """
    query = session.query(*query_items)
    if self._query is not None:
        # NOTE(review): types.StringTypes is Python 2 only; under
        # Python 3 this would need `str` -- confirm target runtime.
        if isinstance(self._query, types.StringTypes):
            query = query.filter(text(self._query))
        else:
            query = query.filter(self._query)
    if self._order_by is not None:
        query = query.order_by(self._order_by)
    if self._group_by is not None:
        if isinstance(self._group_by, types.StringTypes):
            # Resolve a column name string against the automapped model.
            self._group_by = self.automap(engine, self._group_by)
        query = query.group_by(self._group_by)
    if self._params is not None:
        query = query.params(**self._params)
    return query
def _render_table_html(table, metadata, show_indexes, show_datatypes):
    """Render one table as a Graphviz HTML-like label.

    Emits a one-column table: header row, separator, one row per column
    (optionally with its type), and -- on PostgreSQL only -- the table's
    indexes (queried from pg_indexes, since the PG dialect does not
    reflect them).
    """
    def format_col_type(col):
        # Not every type implements get_col_spec(); fall back to str().
        try:
            return col.type.get_col_spec()
        except (AttributeError, NotImplementedError):
            return str(col.type)
    def format_col_str(col):
        if show_datatypes:
            return "- %s : %s" % (col.name, format_col_type(col))
        else:
            return "- %s" % col.name
    html = '<<TABLE BORDER="1" CELLBORDER="0" CELLSPACING="0"><TR><TD ALIGN="CENTER">%s</TD></TR><TR><TD BORDER="1" CELLPADDING="0"></TD></TR>' % table.name
    html += ''.join('<TR><TD ALIGN="LEFT" PORT="%s">%s</TD></TR>' % (col.name, format_col_str(col)) for col in table.columns)
    if metadata.bind and isinstance(metadata.bind.dialect, PGDialect):
        # postgres engine doesn't reflect indexes
        # NOTE(review): table.name is interpolated into the SQL string;
        # safe only because it comes from reflected metadata.
        indexes = dict((name,defin) for name,defin in metadata.bind.execute(text("SELECT indexname, indexdef FROM pg_indexes WHERE tablename = '%s'" % table.name)))
        if indexes and show_indexes:
            html += '<TR><TD BORDER="1" CELLPADDING="0"></TD></TR>'
            for index, defin in list(indexes.items()):
                ilabel = 'UNIQUE' in defin and 'UNIQUE ' or 'INDEX '
                ilabel += defin[defin.index('('):]
                html += '<TR><TD ALIGN="LEFT">%s</TD></TR>' % ilabel
    html += '</TABLE>>'
    return html
def _query_class(self, q):
    """Build and return a Query subclass of *q* with msearch support.

    The returned class closes over this search extension instance
    (``_self``) so queries can delegate full-text lookups to it.
    """
    _self = self
    class Query(q):
        def whoosh_search(self, query, fields=None, limit=None, or_=False):
            # Deprecated alias kept for backward compatibility.
            logger.warning(
                'whoosh_search has been replaced by msearch.please use msearch'
            )
            return self.msearch(query, fields, limit, or_)
        def msearch(self, query, fields=None, limit=None, or_=False):
            """Filter this query down to the full-text search hits."""
            model = self._mapper_zero().class_
            results = _self.msearch(model, query, fields, limit, or_)
            if not results:
                # 'null' is always false -> guaranteed-empty result set.
                return self.filter(sqlalchemy.text('null'))
            result_set = set()
            for i in results:
                result_set.add(i[DEFAULT_PRIMARY_KEY])
            return self.filter(
                getattr(model, DEFAULT_PRIMARY_KEY).in_(result_set))
    return Query
def query(sql, **kwargs):
    """Execute *sql* on the shared connection with bound parameters.

    Records timing via ``QueryStats`` and prints a highlighted warning
    for queries slower than 100 ms.  On failure the connection is closed,
    the error logged, and the exception re-raised.
    """
    ti = time.perf_counter()
    _query = text(sql).execution_options(autocommit=False)
    try:
        res = conn.execute(_query, **kwargs)
        ms = int((time.perf_counter() - ti) * 1000)
        QueryStats.log(sql, ms)
        if ms > 100:
            # Collapse whitespace for a one-line preview of the slow query.
            # FIX: raw string -- '\s' in a plain literal is an invalid
            # escape (DeprecationWarning on Python 3.6+).
            disp = re.sub(r'\s+', ' ', sql).strip()[:250]
            print("\033[93m[SQL][{}ms] {}\033[0m".format(ms, disp))
        logger.debug(res)
        return res
    except Exception as e:
        print("[SQL] Error in query {} ({})".format(sql, kwargs))
        conn.close()
        logger.exception(e)
        raise e

# n*m
def raw_query(query, **kwargs):
    """Execute a raw query against the database.

    Any keyword ending in ``_id`` whose value parses as an integer is
    coerced, because bindparam typing matters to the database driver.
    """
    for key in kwargs:
        if not key.endswith('_id'):
            continue
        try:
            kwargs[key] = int(kwargs[key])
        except ValueError:
            # Leave non-numeric values untouched.
            pass
    stmt = text(query).bindparams(**kwargs)
    return get_engine().execute(stmt)
def revision_delete_all():
    """Delete all revisions and resets primary key index back to 1 for
    each table in the database.

    .. warning::

        Effectively purges all data from database.

    :returns: None
    """
    engine = get_engine()
    if engine.name == 'postgresql':
        # NOTE(fmontei): While cascade should delete all data from all tables,
        # we also need to reset the index to 1 for each table.
        # Table names are a fixed whitelist, so %-interpolation is safe.
        for table in ['buckets', 'revisions', 'revision_tags', 'documents',
                      'validations']:
            engine.execute(
                text("TRUNCATE TABLE %s RESTART IDENTITY CASCADE;" % table)
                .execution_options(autocommit=True))
    else:
        # Non-PG backends: cascading FKs handle the dependent tables.
        raw_query("DELETE FROM revisions;")
def get_most_frequently_keywords(cls, limit=10):
    """Return the top *limit* keywords ranked by how many jobs use them.

    Each returned ``KeywordModel`` carries an extra ``times`` attribute
    holding its occurrence count.
    """
    sql = text("""SELECT keyword.id, keyword.name, COUNT(*) AS count
        FROM keyword, job_keyword
        WHERE keyword.id = job_keyword.keyword_id
        GROUP BY keyword.id, keyword.name
        ORDER BY count DESC
        LIMIT :limit_count""")
    rows = cls.session.execute(sql, {'limit_count': limit}).fetchall()
    keywords = []
    for kw_id, kw_name, kw_count in rows:
        model = KeywordModel(id=kw_id, name=kw_name)
        model.times = kw_count
        keywords.append(model)
    return keywords
def _exec(self, construct, execution_options=None, multiparams=(), params=util.immutabledict()):
    """Execute *construct*, or render it as SQL text in ``as_sql`` mode.

    Offline (``as_sql``) mode compiles against the target dialect and
    writes to static output; execution arguments are rejected there
    because they cannot be rendered.
    """
    if isinstance(construct, string_types):
        construct = text(construct)
    if self.as_sql:
        if multiparams or params:
            # TODO: coverage
            raise Exception("Execution arguments not allowed with as_sql")
        self.static_output(text_type(
            construct.compile(dialect=self.dialect)
        ).replace("\t", "    ").strip() + self.command_terminator)
    else:
        conn = self.connection
        if execution_options:
            conn = conn.execution_options(**execution_options)
        conn.execute(construct, *multiparams, **params)
def upgrade():
    """Tighten kubes/packages constraints.

    Replaces the plain unique constraint on ``kubes.is_default`` with a
    partial unique index (only one row may be default), makes several
    name columns NOT NULL, and purges orphaned package_kube rows before
    making its FK columns NOT NULL.
    """
    op.alter_column('kubes', 'name', existing_type=sa.VARCHAR(length=64), nullable=False)
    # Partial index: uniqueness applies only to rows where is_default IS true.
    op.create_index('one_default', 'kubes', ['is_default'], unique=True, postgresql_where=sa.text(u'kubes.is_default IS true'))
    op.drop_constraint(u'kubes_is_default_key', 'kubes', type_='unique')
    op.alter_column('packages', 'name', existing_type=sa.VARCHAR(length=64), nullable=False)
    op.alter_column('packages', 'prefix', existing_type=sa.VARCHAR(), nullable=False)
    op.alter_column('packages', 'suffix', existing_type=sa.VARCHAR(), nullable=False)
    # Remove rows with NULL FKs before the NOT NULL alteration below.
    session = Session(bind=op.get_bind())
    session.query(PackageKube).filter(sa.or_(
        PackageKube.package_id.is_(None),
        PackageKube.kube_id.is_(None),
    )).delete()
    session.commit()
    op.alter_column('package_kube', 'kube_id', existing_type=sa.INTEGER(), nullable=False)
    op.alter_column('package_kube', 'package_id', existing_type=sa.INTEGER(), nullable=False)
def member_roles_list(context, data_dict):
    '''Return the possible roles for members of groups and organizations.

    :param group_type: the group type, either ``"group"`` or
        ``"organization"`` (optional, default ``"organization"``)
    :type id: string
    :returns: a list of dictionaries each with two keys: ``"text"`` (the
        display name of the role, e.g. ``"Admin"``) and ``"value"`` (the
        internal name of the role, e.g. ``"admin"``)
    :rtype: list of dictionaries

    '''
    group_type = data_dict.get('group_type', 'organization')
    roles = authz.roles_list()
    # Groups have no "editor" role; drop it for that group type.
    if group_type == 'group':
        roles = [entry for entry in roles if entry['value'] != 'editor']
    _check_access('member_roles_list', context, data_dict)
    return roles
def search(self, query, limit=None, fields=None, or_=False):
    """ Perform a woosh index search. """
    if not isinstance(query, str):
        raise WhooshAlchemyError('query parameter must be string-like')
    results = self._searcher(query, limit, fields, or_)
    if not results:
        # 'null' is always false in SQL -> guaranteed-empty query.
        return self.filter(sqlalchemy.text('null'))
    result_set = set()
    result_rank = {}
    # Remember each hit's rank so callers can sort by relevance.
    for rank, result in enumerate(results):
        result_set.add(result)
        result_rank[result] = rank
    f = self.filter(getattr(self._model_cls, self._pk).in_(result_set))
    f._whoosh_results = result_rank
    return f
def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    # Users get UUID primary keys generated server-side; requires the
    # uuid-ossp extension for uuid_generate_v4().
    op.create_table('user',
    sa.Column('id', sqlalchemy_utils.types.uuid.UUIDType(), server_default=sa.text('uuid_generate_v4()'), nullable=False),
    sa.Column('username', sa.Unicode(length=255), nullable=False),
    sa.Column('password', sqlalchemy_utils.types.password.PasswordType(), nullable=False),
    sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False),
    sa.Column('is_superuser', sa.Boolean(), server_default='FALSE', nullable=False),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_user_username'), 'user', ['username'], unique=True)
    # Association table linking users to the problems they have solved.
    op.create_table('user_problem',
    sa.Column('id', sqlalchemy_utils.types.uuid.UUIDType(), server_default=sa.text('uuid_generate_v4()'), nullable=False),
    sa.Column('problem_id', sqlalchemy_utils.types.uuid.UUIDType(), nullable=False),
    sa.Column('user_id', sqlalchemy_utils.types.uuid.UUIDType(), nullable=False),
    sa.ForeignKeyConstraint(['problem_id'], ['problem.id'], ),
    sa.ForeignKeyConstraint(['user_id'], ['user.id'], ),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_user_problem_user_id'), 'user_problem', ['user_id'], unique=False)
    # ### end Alembic commands ###
def upgrade():
    ### commands auto generated by Alembic - please adjust! ###
    # New comments table with FKs to users and posts.
    op.create_table('comments',
    sa.Column('id', sa.Integer(), nullable=False),
    sa.Column('body', sa.Text(), nullable=True),
    sa.Column('body_html', sa.Text(), nullable=True),
    sa.Column('timestamp', sa.DateTime(), nullable=True),
    sa.Column('disabled', sa.Boolean(), nullable=True),
    sa.Column('author_id', sa.Integer(), nullable=True),
    sa.Column('post_id', sa.Integer(), nullable=True),
    sa.ForeignKeyConstraint(['author_id'], ['users.id'], ),
    sa.ForeignKeyConstraint(['post_id'], ['posts.id'], ),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_comments_timestamp'), 'comments', ['timestamp'], unique=False)
    # Unrelated autogen drift: relax 'referred' to nullable.
    op.alter_column(u'statistic_visitor', 'referred',
               existing_type=mysql.VARCHAR(collation=u'utf8_unicode_ci', length=128),
               nullable=True,
               existing_server_default=sa.text(u"''"))
    ### end Alembic commands ###
def upgrade():
    ### commands auto generated by Alembic - please adjust! ###
    # Login-event log table; 'id' is both PK and (redundantly) unique.
    op.create_table('machine_statistic',
    sa.Column('id', sa.Integer(), nullable=False),
    sa.Column('type', sa.String(length=64), nullable=True),
    sa.Column('timestamp', sa.String(length=64), nullable=True),
    sa.Column('userid', sa.String(length=64), nullable=True),
    sa.Column('netcode', sa.String(length=64), nullable=True),
    sa.Column('login_time', sa.DateTime(), nullable=True),
    sa.PrimaryKeyConstraint('id'),
    sa.UniqueConstraint('id')
    )
    op.create_index(op.f('ix_machine_statistic_login_time'), 'machine_statistic', ['login_time'], unique=False)
    # Unrelated autogen drift: relax 'referred' to nullable.
    op.alter_column(u'statistic_visitor', 'referred',
               existing_type=mysql.VARCHAR(collation=u'utf8_unicode_ci', length=128),
               nullable=True,
               existing_server_default=sa.text(u"''"))
    ### end Alembic commands ###
def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    # Undo the mailing_list url column and restore the email tables.
    op.drop_constraint(None, 'mailing_list', type_='unique')
    op.drop_column('mailing_list', 'url')
    op.create_table('email',
    sa.Column('id', sa.INTEGER(), server_default=sa.text(u"nextval('email_id_seq'::regclass)"), nullable=False),
    sa.Column('value', sa.VARCHAR(length=120), autoincrement=False, nullable=True),
    sa.PrimaryKeyConstraint('id', name=u'email_pkey'),
    sa.UniqueConstraint('value', name=u'email_value_key'),
    # Keep search_path untouched so the sequence default resolves.
    postgresql_ignore_search_path=False
    )
    op.create_table('email_mailing_list',
    sa.Column('mailing_list_id', sa.INTEGER(), autoincrement=False, nullable=True),
    sa.Column('email_id', sa.INTEGER(), autoincrement=False, nullable=True),
    sa.ForeignKeyConstraint(['email_id'], [u'email.id'], name=u'email_mailing_list_email_id_fkey'),
    sa.ForeignKeyConstraint(['mailing_list_id'], [u'mailing_list.id'], name=u'email_mailing_list_mailing_list_id_fkey')
    )
    # ### end Alembic commands ###
def test_pk_default(self):
    """A server_default of uuid_generate_v4() must round-trip into the
    generated model code as text("uuid_generate_v4()")."""
    Table(
        'simple_items', self.metadata,
        Column('id', INTEGER, primary_key=True, server_default=text('uuid_generate_v4()'))
    )
    # NOTE(review): the expected string below is whitespace-sensitive;
    # layout reconstructed from sqlacodegen's standard output format.
    assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import Column, Integer, text
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()
metadata = Base.metadata


class SimpleItem(Base):
    __tablename__ = 'simple_items'

    id = Column(Integer, primary_key=True, server_default=text("uuid_generate_v4()"))
"""
def relations(self, schema):
    """Return (table, referenced_table) pairs for every foreign key
    relationship in *schema*, read from INFORMATION_SCHEMA.
    """
    sql = u"""
        SELECT
            KCU.TABLE_NAME,
            COLUMN_NAME,
            KCU.CONSTRAINT_NAME,
            KCU.REFERENCED_TABLE_NAME,
            KCU.REFERENCED_COLUMN_NAME
        FROM
            INFORMATION_SCHEMA.KEY_COLUMN_USAGE KCU
        LEFT JOIN INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS RC
            ON KCU.CONSTRAINT_NAME = RC.CONSTRAINT_NAME
        WHERE
            RC.CONSTRAINT_SCHEMA = :schema
    """
    rows = self.engine.execute(text(sql), schema=schema)
    # Only the table pair matters to callers; the other columns are
    # selected for debugging convenience.
    return [(row.TABLE_NAME, row.REFERENCED_TABLE_NAME) for row in rows]
def create_view(self, name, sql): sql_ = """ IF exists(SELECT * FROM sysobjects WHERE name = '{name}') BEGIN DROP VIEW {name} END """.format(name=name) # because of sqlalchemy can't detect this is not a simple query, so autocommit must be True self.engine.execute(text(sql_).execution_options(autocommit=True)) sql_ = """ CREATE VIEW {name} AS ({sql}) """.format(name=name, sql=sql) self.engine.execute(sql_)
def get_entity(self):
    """Return one ImageFormat instance data as json.

    Adds width/height/pixel_aspect/print_resolution to the superclass
    response.
    """
    sql = """select
        "ImageFormats".id,
        "ImageFormats".width,
        "ImageFormats".height,
        "ImageFormats".pixel_aspect,
        "ImageFormats".print_resolution
    from "ImageFormats"
    where "ImageFormats".id = :id
    """
    from stalker.db.session import DBSession
    conn = DBSession.connection()
    from sqlalchemy import text
    result = conn.execute(text(sql), id=self.entity_id)
    r = result.fetchone()
    # Row order matches the SELECT column order above.
    data = {
        'id': r[0],
        'width': r[1],
        'height': r[2],
        'pixel_aspect': r[3],
        'print_resolution': r[4],
    }
    response = super(ImageFormatViews, self).get_entity()
    return self.update_response_data(response, data)
def get_entity(self):
    """Return one StatusList instance as json.

    Adds the target entity type and a JSON reference (with count) to
    the list's statuses.
    """
    sql = """select
        "StatusLists".target_entity_type
    from "StatusLists"
    where "StatusLists".id = :id
    """
    from sqlalchemy import text
    from stalker.db.session import DBSession
    conn = DBSession.connection()
    result = conn.execute(text(sql), id=self.entity_id)
    r = result.fetchone()
    # get statuses count
    from stalker.models.status import StatusList_Statuses
    statuses_count = DBSession.query(StatusList_Statuses.c.status_id)\
        .filter(StatusList_Statuses.c.status_list_id == self.entity_id)\
        .count()
    from stalker_pyramid import entity_type_to_url
    # '$ref' lets the client lazily fetch the statuses collection.
    data = {
        'statuses': {
            '$ref': '%s/%s/statuses' % (entity_type_to_url['StatusList'], self.entity_id),
            'length': statuses_count
        },
        'target_entity_type': r[0]
    }
    # update with super data
    response = super(StatusListViews, self).get_entity()
    return self.update_response_data(response, data)
def get_entity(self): """return one Group instance data as JSON """ # get supers response response = super(GroupViews, self).get_entity() # get entity type entity_type = response.json_body['entity_type'] # get user count from stalker.db.session import DBSession from stalker.models.auth import Group_Users user_count = DBSession.query(Group_Users.c.uid)\ .filter(Group_Users.c.gid == self.entity_id)\ .count() # get permission count sql = """select "Permissions".access || '_' || "Permissions".action || '_' || "Permissions".class_name from "Group_Permissions" join "Permissions" on "Group_Permissions".permission_id = "Permissions".id where "Group_Permissions".group_id = :id """ from sqlalchemy import text conn = DBSession.connection() permissions = conn.execute(text(sql), id=self.entity_id).fetchall() # prepare data from stalker_pyramid import entity_type_to_url data = { 'permissions': [r[0] for r in permissions], 'users': { '$ref': '%s/%s/users' % (entity_type_to_url[entity_type], self.entity_id), 'length': user_count } } # update supers response with our data and return return self.update_response_data(response, data)
def get_entity(self):
    """Return the ReferenceMixin portion of this mixed-in class data.

    Produces a JSON reference to the entity's references collection,
    including its length.
    """
    # FIX: the previous version selected the raw link ids and used
    # fetchone()[0] as the "length" -- that yields an arbitrary link id,
    # not the number of references.  Count them in SQL instead (the
    # sibling Group view computes its 'length' with a count as well).
    sql = """select
        count(er.link_id)
    from "%s_References" as er
    where er.%s_id = :id
    """ % (
        self.entity.entity_type,
        self.entity.entity_type.lower()
    )
    from stalker.db.session import DBSession
    conn = DBSession.connection()
    from sqlalchemy import text
    r = conn.execute(text(sql), id=self.entity_id).fetchone()
    reference_count = r[0] if r else 0
    from stalker_pyramid import entity_type_to_url
    data = {
        'references': {
            '$ref': '%s/%s/references' % (
                entity_type_to_url[self.entity.entity_type],
                self.entity_id
            ),
            'length': reference_count
        },
    }
    return data
def get_references(self):
    """Return the ReferenceMixin portion of this mixed-in class data.

    Lists the entity's references as dicts of id/name/entity_type plus
    a '$ref' URL for each.
    """
    # The reference association table is named after the concrete class
    # (e.g. "Tasks_References"); class name comes from the mapped class,
    # not user input, so the %-interpolation is safe.
    sql = """select
        entity_references.link_id,
        "SimpleEntities".name,
        "SimpleEntities".entity_type
    from "%s_References" as entity_references
    join "SimpleEntities" on entity_references.link_id = "SimpleEntities".id
    where entity_references.%s_id = :id
    """ % (
        self.entity.__class__.__name__,
        self.entity.__class__.__name__.lower()
    )
    from sqlalchemy import text
    from stalker.db.session import DBSession
    conn = DBSession.connection()
    result = conn.execute(text(sql), id=self.entity_id)
    from stalker_pyramid import entity_type_to_url
    data = [
        {
            'id': r[0],
            'name': r[1],
            'entity_type': r[2],
            '$ref': '%s/%s' % (entity_type_to_url[r[2]], r[0])
        } for r in result.fetchall()
    ]
    return data
def get_entity(self):
    """Return stalker.models.vacation.Vacation instance data.

    start/end are converted to epoch milliseconds (UTC) in SQL; the
    owning user is embedded as a nested reference dict.
    """
    sql = """
    select
        "Vacations".id,
        (extract(epoch from "Vacations".start::timestamp at time zone 'UTC') * 1000)::bigint as start,
        (extract(epoch from "Vacations".end::timestamp at time zone 'UTC') * 1000)::bigint as end,
        "Vacations".user_id,
        "User_SimpleEntities".name,
        "User_SimpleEntities".entity_type
    from "Vacations"
    left outer join "SimpleEntities" as "User_SimpleEntities" on "Vacations".user_id = "User_SimpleEntities".id
    where "Vacations".id = :id
    """
    from stalker.db.session import DBSession
    conn = DBSession.connection()
    from sqlalchemy import text
    result = conn.execute(text(sql), id=self.entity_id)
    r = result.fetchone()
    from stalker_pyramid import entity_type_to_url
    data = {
        'id': r[0],
        'start': r[1],
        'end': r[2],
        'user': {
            'id': r[3],
            '$ref': '%s/%s' % (entity_type_to_url[r[5]], r[3]),
            'name': r[4],
            'entity_type': r[5]
        }
    }
    response = super(VacationViews, self).get_entity()
    return self.update_response_data(response, data)
def get_references(self):
    """Return the references of this project.

    Unions the project's own references with the references of all of
    its tasks, and returns them directly as a JSON Response.
    """
    sql = """select
        rse.id, rse.name, rse.entity_type
    from "Project_References" as pr
    join "SimpleEntities" as rse on pr.link_id = rse.id
    where pr.project_id = :id
    union
    select
        rse.id, rse.name, rse.entity_type
    from "Task_References" as tr
    join "Tasks" as t on tr.task_id = t.id
    join "SimpleEntities" as rse on tr.link_id = rse.id
    where t.project_id = :id
    """
    from stalker.db.session import DBSession
    from sqlalchemy import text
    conn = DBSession.connection()
    result = conn.execute(text(sql), id=self.entity_id).fetchall()
    from stalker_pyramid import entity_type_to_url
    project_ref_data = [
        {
            'id': r[0],
            'name': r[1],
            'entity_type': r[2],
            '$ref': '%s/%s' % (entity_type_to_url[r[2]], r[0])
        } for r in result
    ]
    from pyramid.response import Response
    return Response(json_body=project_ref_data, status=200)
def get_entity(self):
    """Return one TimeLog instance as JSON.

    Embeds the resource as a nested reference dict (None when the row
    is missing) and merges in the DateRangeMixin data.
    """
    sql = """
    select
        "TimeLogs".resource_id,
        "SimpleEntities".name,
        "SimpleEntities".entity_type
    from "TimeLogs"
    left join "SimpleEntities" on "TimeLogs".resource_id = "SimpleEntities".id
    where "TimeLogs".id = :id
    """
    from sqlalchemy import text
    from stalker.db.session import DBSession
    conn = DBSession.connection()
    r = conn.execute(text(sql), id=self.entity_id).fetchone()
    from stalker_pyramid import entity_type_to_url
    data = {
        'resource': {
            'id': r[0],
            'name': r[1],
            'entity_type': r[2],
            '$ref': '%s/%s' % (entity_type_to_url[r[2]], r[0]),
        } if r else None,
    }
    # Pull in start/end from the date-range mixin view explicitly.
    data.update(DateRangeMixinViews.get_entity(self))
    entity_response = super(TimeLogViews, self).get_entity()
    return self.update_response_data(entity_response, data)
def get_vacations(self):
    """Return this user's vacations as a JSON Response.

    Each entry carries id/name/entity_type plus a '$ref' URL.
    """
    sql = """
    select
        "Vacations".id,
        "SimpleEntities".name,
        "SimpleEntities".entity_type
    from "Vacations"
    join "SimpleEntities" on "Vacations".id = "SimpleEntities".id
    where "Vacations".user_id = :id
    """
    from stalker.db.session import DBSession
    from sqlalchemy import text
    conn = DBSession.connection()
    rows = conn.execute(text(sql), id=self.entity_id).fetchall()
    from stalker_pyramid import entity_type_to_url
    data = []
    for vac_id, vac_name, vac_type in rows:
        data.append({
            'id': vac_id,
            '$ref': '%s/%s' % (entity_type_to_url[vac_type], vac_id),
            'name': vac_name,
            'entity_type': vac_type
        })
    from pyramid.response import Response
    return Response(
        json_body=data,
        status=200
    )

# User <-> Task
def get_time_logs(self):
    """Return the user's time logs as a JSON Response.

    Each entry carries id/name/entity_type plus a '$ref' URL.
    """
    sql = """
    select
        "TimeLogs".id,
        "SimpleEntities".name,
        "SimpleEntities".entity_type
    from "TimeLogs"
    join "SimpleEntities" on "TimeLogs".id = "SimpleEntities".id
    where "TimeLogs".resource_id = :id
    """
    from sqlalchemy import text
    from stalker.db.session import DBSession
    conn = DBSession.connection()
    rows = conn.execute(text(sql), id=self.entity_id).fetchall()
    from stalker_pyramid import entity_type_to_url
    data = []
    for log_id, log_name, log_type in rows:
        data.append({
            'id': log_id,
            'name': log_name,
            'entity_type': log_type,
            '$ref': '%s/%s' % (entity_type_to_url[log_type], log_id)
        })
    from pyramid.response import Response
    return Response(
        json_body=data,
        status=200
    )
def get_from_hint_text(self, table, text):
    """Return the FROM-clause hint text verbatim.

    MySQL index hints are emitted exactly as the caller wrote them; no
    per-table rewriting is needed.
    """
    return text
def visit_primary_key_constraint(self, constraint):
    """Render PRIMARY KEY DDL, appending MySQL's ``USING <index_type>``
    option when the constraint declares one via dialect options.
    """
    text = super(MySQLDDLCompiler, self).\
        visit_primary_key_constraint(constraint)
    using = constraint.dialect_options['mysql']['using']
    if using:
        # e.g. "PRIMARY KEY (id) USING BTREE"
        text += " USING %s" % (self.preparer.quote(using))
    return text