The following 21 code examples, extracted from open-source Python projects, illustrate how to use sqlalchemy.sql.expression.select().
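Before the project excerpts, here is a minimal, self-contained sketch of the core API they all build on. The table, column names, and data are made up purely for illustration, and the snippet assumes the legacy 1.x calling convention (a list of columns passed to select()) that the excerpts below also use:

    from sqlalchemy import MetaData, Table, Column, Integer, String, create_engine
    from sqlalchemy.sql.expression import select

    # Hypothetical in-memory table, used only to demonstrate select()
    engine = create_engine('sqlite://')
    metadata = MetaData()
    users = Table('users', metadata,
                  Column('id', Integer, primary_key=True),
                  Column('name', String(50)))
    metadata.create_all(engine)

    # Build a SELECT statement: columns to fetch, then optional WHERE / ORDER BY
    stmt = select([users.c.id, users.c.name]).\
        where(users.c.name.like('a%')).\
        order_by(users.c.id)

    with engine.connect() as conn:
        conn.execute(users.insert(), [{'name': 'alice'}, {'name': 'bob'}])
        for row in conn.execute(stmt):
            print(row['id'], row['name'])

The same pattern recurs throughout the examples: build a Select object from columns and a from_obj, refine it with where(), group_by(), limit(), or label(), and hand it to an engine, connection, or session for execution.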
def get_selects(self):
    """
    Constructs select queries for this aggregation

    Returns: a dictionary of group : queries pairs where
        group are the same keys as groups
        queries is a list of Select queries, one for each date in dates
    """
    queries = {}

    for group, groupby in self.groups.items():
        columns = [groupby]
        columns += self._get_aggregates_sql(group)

        gb_clause = make_sql_clause(groupby, ex.literal_column)
        query = ex.select(columns=columns, from_obj=self.from_obj)\
                  .group_by(gb_clause)
        queries[group] = [query]

    return queries
def get_creates(self):
    """
    Construct create queries for this aggregation

    Args:
        selects: the dictionary of select queries to use
            if None, use self.get_selects()
            this allows you to customize select queries before creation

    Returns:
        a dictionary of group : create pairs where
            group are the same keys as groups
            create is a CreateTableAs object
    """
    return {group: CreateTableAs(self.get_table_name(group),
                                 next(iter(sels)).limit(0))
            for group, sels in self.get_selects().items()}
def get_join_table(self):
    """
    Generates a join table, consisting of an entry for each combination of
    groups and dates in the from_obj
    """
    groups = list(self.groups.values())
    intervals = list(set(chain(*self.intervals.values())))

    queries = []
    for date in self.dates:
        columns = groups + [ex.literal_column("'%s'::date" % date).label(
            self.output_date_column)]
        queries.append(ex.select(columns, from_obj=self.from_obj)
                         .where(self.where(date, intervals))
                         .group_by(*groups))

    return str.join("\nUNION ALL\n", map(str, queries))
def seed_identifier(cls):  # @NoSelf
    '''returns data_identifier if the latter is not None, else
    net.sta.loc.cha by querying the relative channel and station'''
    # Needed note: To know what we are doing in 'sel' below, please look:
    # http://docs.sqlalchemy.org/en/latest/orm/extensions/hybrid.html#correlated-subquery-relationship-hybrid
    # Notes
    # - we use limit(1) cause we might get more than one result.
    #   Regardless of why it happens (because we don't join or apply a distinct?)
    #   it is relevant for us to get the first result which has the requested
    #   network + station and location + channel strings
    # - the label(...) at the end makes all the difference. The doc is, as always, unclear
    #   http://docs.sqlalchemy.org/en/latest/core/sqlelement.html#sqlalchemy.sql.expression.label
    dot = text("'.'")
    sel = select([concat(Station.network, dot, Station.station, dot,
                         Channel.location, dot, Channel.channel)]).\
        where((Channel.id == cls.channel_id) & (Station.id == Channel.station_id)).\
        limit(1).label('seedidentifier')
    return case([(cls.data_identifier.isnot(None), cls.data_identifier)],
                else_=sel)
def get_inserts(self):
    """
    Construct insert queries from this aggregation

    Args:
        selects: the dictionary of select queries to use
            if None, use self.get_selects()
            this allows you to customize select queries before creation

    Returns:
        a dictionary of group : inserts pairs where
            group are the same keys as groups
            inserts is a list of InsertFromSelect objects
    """
    return {group: [InsertFromSelect(self.get_table_name(group), sel)
                    for sel in sels]
            for group, sels in self.get_selects().items()}
def get_selects(self):
    """
    Constructs select queries for this aggregation

    Returns: a dictionary of group : queries pairs where
        group are the same keys as groups
        queries is a list of Select queries, one for each date in dates
    """
    queries = {}

    for group, groupby in self.groups.items():
        intervals = self.intervals[group]
        queries[group] = []
        for date in self.dates:
            columns = [groupby,
                       ex.literal_column("'%s'::date" % date).label(self.output_date_column)]
            columns += list(chain(*[self._get_aggregates_sql(
                i, date, group) for i in intervals]))

            gb_clause = make_sql_clause(groupby, ex.literal_column)
            query = ex.select(columns=columns, from_obj=self.from_obj)\
                      .group_by(gb_clause)
            query = query.where(self.where(date, intervals))

            queries[group].append(query)

    return queries
def _loadAttributes(self):
    for row in self._connection.execute(
            ex.select([md.InventoryClasses.c.class_namespace,
                       md.InventoryClasses.c.class_name,
                       md.InventoryClassAttributes])
            .select_from(ex.join(md.InventoryClassAttributes, md.InventoryClasses,
                                 md.InventoryClassAttributes.c.class_id == md.InventoryClasses.c.class_id))
            .where(and_(md.InventoryClasses.c.class_namespace == self._namespace,
                        md.InventoryClasses.c.class_name == self._class_name))):
        self._classId = row["class_id"]
        self._attributes[row["attr_key"]] = {}
        for i in ["attr_name", "attr_type", "attr_default", "attr_mandatory"]:
            self._attributes[row["attr_key"]][i] = row[i]
def getObjectIdByName(self, object_name, object_subname=None):
    andList = [
        md.InventoryObjects.c.class_id == self._classId,
        md.InventoryObjects.c.object_name == object_name
    ]
    if not object_subname is None:
        andList.append(md.InventoryObjects.c.object_subname == object_subname)

    object_id = None
    i = 0
    for row in self._connection.execute(md.InventoryObjects.select().where(and_(*andList))):
        i = i + 1
        object_id = row["object_id"]

    if i > 1:
        raise LookupException("Too many objects were found")
    if i == 0:
        raise EmptyLookupException("No objects were found")

    return object_id
def search(self, object_id=None, object_name=None, object_subname=None, **kwargs):
    andList = [
        md.InventoryObjects.c.class_id == self._classId
    ]
    if not object_id is None:
        andList.append(md.InventoryObjects.c.object_id == object_id)
    if not object_name is None:
        andList.append(md.InventoryObjects.c.object_name.like(object_name))
    if not object_subname is None:
        andList.append(md.InventoryObjects.c.object_subname.like(object_subname))

    # append attributes subqueries
    for k in kwargs:
        if k in self._attributes:
            andList.append(md.InventoryObjects.c.object_id.in_(
                ex.select([md.InventoryObjectAttributes.c.object_id])
                .select_from(md.InventoryObjectAttributes)
                .where(and_(
                    md.InventoryObjectAttributes.c.class_id == self._classId,
                    md.InventoryObjectAttributes.c.attr_key == k,
                    md.InventoryObjectAttributes.c.attr_value.like(kwargs[k])
                ))
            ))

    data = []
    for row in self._connection.execute(md.InventoryObjects.select().where(and_(*andList))):
        data.append({
            "object_id": row["object_id"],
            self._objectName: row["object_name"],
            self._objectSubName: row["object_subname"]
        })

    return data
def attributeExists(self, object_id, attribute_name):
    assert not (object_id is None), "At least one identifier must be set"

    for count in self._connection.execute(
            ex.select([func.count()])
            .select_from(md.InventoryObjectAttributes)
            .where(and_(md.InventoryObjectAttributes.c.class_id == self._classId,
                        md.InventoryObjectAttributes.c.object_id == object_id,
                        md.InventoryObjectAttributes.c.attr_key == attribute_name))):
        count = count[0]

    if count == 0:
        return False
    else:
        return True
def _lookupAttribtue(self, index):
    if isinstance(index, dict):
        if "object_id" in index:
            index["attributes"] = {}
            for row in self._connection.execute(
                    md.InventoryObjectAttributes.select().where(
                        and_(md.InventoryObjectAttributes.c.object_id == index["object_id"]))):
                index["attributes"][row["attr_key"]] = row["attr_value"]
        return index
    else:
        d = {}
        for row in self._connection.execute(
                md.InventoryObjectAttributes.select().where(
                    and_(md.InventoryObjectAttributes.c.object_id == index))):
            d[row["attr_key"]] = row["attr_value"]
        return d
def get_database_engine(config: AttrDict) -> Engine:
    try:
        return config.DATABASE_ENGINE
    except AttributeError:
        db_url = config.DATABASE_URL
        echo = config.DATABASE_ECHO
        engine = create_engine(db_url, echo=echo)

        @listens_for(engine, 'engine_connect')
        def ping_connection(connection, branch):
            """Disconnect handling

            http://docs.sqlalchemy.org/en/latest/core/pooling.html#disconnect-handling-pessimistic
            """
            if branch:
                return

            save_should_close_with_result = connection.should_close_with_result
            connection.should_close_with_result = False
            try:
                connection.scalar(select([1]))
            except DBAPIError as err:
                if err.connection_invalidated:
                    connection.scalar(select([1]))
                else:
                    raise
            finally:
                connection.should_close_with_result = \
                    save_should_close_with_result

        return engine
def get_account_verification_token(email=None, username=None):
    email = email and emailer.normalize_address(email)
    username = username and d.get_sysname(username)

    logincreate = d.meta.tables['logincreate']
    statement = select([logincreate.c.token])

    if email:
        statement = statement.where(logincreate.c.email.ilike(email))
    else:
        statement = statement.where(logincreate.c.login_name == username)

    return d.engine.scalar(statement)
def _build_total_expressions(self, queryset, totals):
    mapper = inspect(self.objects_class)
    primary_keys = mapper.primary_key
    relationships = {
        'aliases': {},
        'join_chains': [],
        'prefix': 'totals_',
    }
    aggregates = []
    group_cols = OrderedDict()
    group_by = []
    group_limit = None
    for total in totals:
        for aggregate, columns in total.items():
            if aggregate == self.AGGR_GROUPLIMIT:
                if not isinstance(columns, int):
                    raise HTTPBadRequest('Invalid attribute',
                                         'Group limit option requires an integer value')
                group_limit = columns
                continue
            if not columns:
                if aggregate == self.AGGR_GROUPBY:
                    raise HTTPBadRequest('Invalid attribute',
                                         'Group by option requires at least one column name')
                if len(primary_keys) > 1:
                    aggregates.append(Function(aggregate, func.row(*primary_keys)).label(aggregate))
                else:
                    aggregates.append(Function(aggregate, *primary_keys).label(aggregate))
                continue
            if not isinstance(columns, list):
                columns = [columns]
            for column in columns:
                expression = self._parse_tokens(self.objects_class, column.split('__'), None,
                                                relationships, lambda c, n, v: n)
                if expression is not None:
                    if aggregate == self.AGGR_GROUPBY:
                        group_cols[column] = expression.label(column)
                        group_by.append(expression)
                    else:
                        aggregates.append(Function(aggregate, expression).label(aggregate))
    agg_query = self._apply_joins(queryset, relationships, distinct=False)
    group_cols_expr = list(group_cols.values())
    columns = group_cols_expr + aggregates
    if group_limit:
        row_order = list(map(lambda c: c.desc(), aggregates))
        columns.append(func.row_number().over(partition_by=group_cols_expr[:-1],
                                              order_by=row_order).label('row_number'))
    order = ','.join(list(map(str, range(1, len(group_cols_expr) + 1))) +
                     list(map(lambda c: str(c) + ' DESC',
                              range(1 + len(group_cols_expr),
                                    len(aggregates) + len(group_cols_expr) + 1))))
    agg_query = agg_query.statement.with_only_columns(columns).order_by(None).order_by(order)
    if group_by:
        agg_query = agg_query.group_by(*group_by)
    if group_limit:
        subquery = agg_query.alias()
        agg_query = select([subquery]).where(subquery.c.row_number <= group_limit)
    return agg_query, list(group_cols.keys())
def list_bad_replicas_status(state=BadFilesStatus.BAD, rse=None, younger_than=None,
                             older_than=None, limit=None, list_pfns=False, session=None):
    """
    List the bad file replicas history states. Method used by the rucio-ui.

    :param state: The state of the file (SUSPICIOUS or BAD).
    :param rse: The RSE name.
    :param younger_than: datetime object to select bad replicas younger than this date.
    :param older_than: datetime object to select bad replicas older than this date.
    :param limit: The maximum number of replicas returned.
    :param session: The database session in use.
    """
    result = []
    rse_id = None
    if rse:
        rse_id = get_rse_id(rse, session=session)

    query = session.query(models.BadReplicas.scope, models.BadReplicas.name, models.RSE.rse,
                          models.BadReplicas.state, models.BadReplicas.created_at,
                          models.BadReplicas.updated_at)
    if state:
        query = query.filter(models.BadReplicas.state == state)
    if rse_id:
        query = query.filter(models.BadReplicas.rse_id == rse_id)
    if younger_than:
        query = query.filter(models.BadReplicas.created_at >= younger_than)
    if older_than:
        query = query.filter(models.BadReplicas.created_at <= older_than)
    query = query.filter(models.RSE.id == models.BadReplicas.rse_id)
    if limit:
        query = query.limit(limit)

    for badfile in query.yield_per(1000):
        if list_pfns:
            result.append({'scope': badfile.scope, 'name': badfile.name, 'type': DIDType.FILE})
        else:
            result.append({'scope': badfile.scope, 'name': badfile.name, 'rse': badfile.rse,
                           'state': badfile.state, 'created_at': badfile.created_at,
                           'updated_at': badfile.updated_at})

    if list_pfns:
        reps = []
        for rep in list_replicas(result, schemes=['srm', ], unavailable=False, request_id=None,
                                 ignore_availability=True, all_states=True, session=session):
            pfn = None
            if rse in rep['rses'] and rep['rses'][rse]:
                pfn = rep['rses'][rse][0]
                if pfn and pfn not in reps:
                    reps.append(pfn)
            else:
                reps.extend([item for row in rep['rses'].values() for item in row])
        list(set(reps))
        result = reps
    return result
def list_unlocked_replicas(rse, limit, bytes=None, rse_id=None, worker_number=None,
                           total_workers=None, delay_seconds=0, session=None):
    """
    List RSE File replicas with no locks.

    :param rse: the rse name.
    :param bytes: the amount of needed bytes.
    :param session: The database session in use.

    :returns: a list of dictionary replica.
    """
    if not rse_id:
        rse_id = get_rse_id(rse=rse, session=session)

    # filter(models.RSEFileAssociation.state != ReplicaState.BEING_DELETED).\
    none_value = None  # Hack to get pep8 happy...
    query = session.query(models.RSEFileAssociation.scope, models.RSEFileAssociation.name,
                          models.RSEFileAssociation.path, models.RSEFileAssociation.bytes,
                          models.RSEFileAssociation.tombstone,
                          models.RSEFileAssociation.state).\
        with_hint(models.RSEFileAssociation,
                  "INDEX_RS_ASC(replicas REPLICAS_TOMBSTONE_IDX) NO_INDEX_FFS(replicas REPLICAS_TOMBSTONE_IDX)",
                  'oracle').\
        filter(models.RSEFileAssociation.tombstone < datetime.utcnow()).\
        filter(models.RSEFileAssociation.lock_cnt == 0).\
        filter(case([(models.RSEFileAssociation.tombstone != none_value, models.RSEFileAssociation.rse_id), ]) == rse_id).\
        filter(or_(models.RSEFileAssociation.state.in_((ReplicaState.AVAILABLE,
                                                        ReplicaState.UNAVAILABLE,
                                                        ReplicaState.BAD)),
                   and_(models.RSEFileAssociation.state == ReplicaState.BEING_DELETED,
                        models.RSEFileAssociation.updated_at < datetime.utcnow() - timedelta(seconds=delay_seconds)))).\
        order_by(models.RSEFileAssociation.tombstone)

    # do no delete files used as sources
    stmt = exists(select([1]).prefix_with("/*+ INDEX(requests REQUESTS_SCOPE_NAME_RSE_IDX) */", dialect='oracle')).\
        where(and_(models.RSEFileAssociation.scope == models.Request.scope,
                   models.RSEFileAssociation.name == models.Request.name))
    query = query.filter(not_(stmt))

    if worker_number and total_workers and total_workers - 1 > 0:
        if session.bind.dialect.name == 'oracle':
            bindparams = [bindparam('worker_number', worker_number - 1),
                          bindparam('total_workers', total_workers - 1)]
            query = query.filter(text('ORA_HASH(name, :total_workers) = :worker_number', bindparams=bindparams))
        elif session.bind.dialect.name == 'mysql':
            query = query.filter(text('mod(md5(name), %s) = %s' % (total_workers - 1, worker_number - 1)))
        elif session.bind.dialect.name == 'postgresql':
            query = query.filter(text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_workers - 1, worker_number - 1)))

    needed_space = bytes
    total_bytes, total_files = 0, 0
    rows = []
    for (scope, name, path, bytes, tombstone, state) in query.yield_per(1000):
        if state != ReplicaState.UNAVAILABLE:
            total_bytes += bytes
            if tombstone != OBSOLETE and needed_space is not None and total_bytes > needed_space:
                break

            total_files += 1
            if total_files > limit:
                break

        rows.append({'scope': scope, 'name': name, 'path': path,
                     'bytes': bytes, 'tombstone': tombstone,
                     'state': state})
    return rows
def list_quarantined_replicas(rse, limit, worker_number=None, total_workers=None, session=None):
    """
    List RSE Quarantined File replicas.

    :param rse: the rse name.
    :param limit: The maximum number of replicas returned.
    :param worker_number: id of the executing worker.
    :param total_workers: Number of total workers.
    :param session: The database session in use.

    :returns: a list of dictionary replica.
    """
    rse_id = get_rse_id(rse, session=session)

    query = session.query(models.QuarantinedReplica.path,
                          models.QuarantinedReplica.bytes,
                          models.QuarantinedReplica.scope,
                          models.QuarantinedReplica.name,
                          models.QuarantinedReplica.created_at).\
        filter(models.QuarantinedReplica.rse_id == rse_id)

    # do no delete valid replicas
    stmt = exists(select([1]).prefix_with("/*+ index(REPLICAS REPLICAS_PK) */", dialect='oracle')).\
        where(and_(models.RSEFileAssociation.scope == models.QuarantinedReplica.scope,
                   models.RSEFileAssociation.name == models.QuarantinedReplica.name,
                   models.RSEFileAssociation.rse_id == models.QuarantinedReplica.rse_id))
    query = query.filter(not_(stmt))

    if worker_number and total_workers and total_workers - 1 > 0:
        if session.bind.dialect.name == 'oracle':
            bindparams = [bindparam('worker_number', worker_number - 1),
                          bindparam('total_workers', total_workers - 1)]
            query = query.filter(text('ORA_HASH(path, :total_workers) = :worker_number', bindparams=bindparams))
        elif session.bind.dialect.name == 'mysql':
            query = query.filter('mod(md5(path), %s) = %s' % (total_workers - 1, worker_number - 1))
        elif session.bind.dialect.name == 'postgresql':
            query = query.filter('mod(abs((\'x\'||md5(path))::bit(32)::int), %s) = %s' % (total_workers - 1, worker_number - 1))

    return [{'path': path,
             'rse': rse,
             'rse_id': rse_id,
             'created_at': created_at,
             'scope': scope,
             'name': name,
             'bytes': bytes}
            for path, bytes, scope, name, created_at in query.limit(limit)]
def add_volatile_replicas(rse, replicas, session=None):
    """
    Bulk add volatile replicas.

    :param rse: the rse name.
    :param replicas: the list of volatile replicas.
    :param session: The database session in use.
    :returns: True is successful.
    """
    # first check that the rse is a volatile one
    try:
        rse_id = session.query(models.RSE.id).filter_by(rse=rse, volatile=True).one()[0]
    except NoResultFound:
        raise exception.UnsupportedOperation('No volatile rse found for %(rse)s !' % locals())

    file_clause, replica_clause = [], []
    for replica in replicas:
        file_clause.append(and_(models.DataIdentifier.scope == replica['scope'],
                                models.DataIdentifier.name == replica['name'],
                                ~exists(select([1]).prefix_with("/*+ INDEX(REPLICAS REPLICAS_PK) */", dialect='oracle')).
                                where(and_(models.RSEFileAssociation.scope == replica['scope'],
                                           models.RSEFileAssociation.name == replica['name'],
                                           models.RSEFileAssociation.rse_id == rse_id))))
        replica_clause.append(and_(models.RSEFileAssociation.scope == replica['scope'],
                                   models.RSEFileAssociation.name == replica['name'],
                                   models.RSEFileAssociation.rse_id == rse_id))

    if replica_clause:
        now = datetime.utcnow()
        session.query(models.RSEFileAssociation).\
            with_hint(models.RSEFileAssociation, "index(REPLICAS REPLICAS_PK)", 'oracle').\
            filter(or_(*replica_clause)).\
            update({'updated_at': now, 'tombstone': now}, synchronize_session=False)

    if file_clause:
        file_query = session.query(models.DataIdentifier.scope,
                                   models.DataIdentifier.name,
                                   models.DataIdentifier.bytes,
                                   models.DataIdentifier.md5,
                                   models.DataIdentifier.adler32).\
            filter(or_(*file_clause))

        session.bulk_insert_mappings(
            models.RSEFileAssociation,
            [{'rse_id': rse_id, 'adler32': adler32, 'state': ReplicaState.AVAILABLE,
              'scope': scope, 'name': name, 'lock_cnt': 0,
              'tombstone': datetime.utcnow(), 'bytes': bytes, 'md5': md5}
             for scope, name, bytes, md5, adler32 in file_query])
def list_new_dids(did_type, thread=None, total_threads=None, chunk_size=1000, session=None):
    """
    List recent identifiers.

    :param did_type : The DID type.
    :param thread: The assigned thread for this necromancer.
    :param total_threads: The total number of threads of all necromancers.
    :param chunk_size: Number of requests to return per yield.
    :param session: The database session in use.
    """
    stmt = select([1]).\
        prefix_with("/*+ INDEX(RULES ATLAS_RUCIO.RULES_SCOPE_NAME_IDX) */",
                    dialect='oracle').\
        where(and_(models.DataIdentifier.scope == models.ReplicationRule.scope,
                   models.DataIdentifier.name == models.ReplicationRule.name,
                   models.ReplicationRule.state == RuleState.INJECT))
    query = session.query(models.DataIdentifier).\
        with_hint(models.DataIdentifier, "index(dids DIDS_IS_NEW_IDX)", 'oracle').\
        filter_by(is_new=True).\
        filter(~exists(stmt))

    if did_type:
        if isinstance(did_type, str) or isinstance(did_type, unicode):
            query = query.filter_by(did_type=DIDType.from_sym(did_type))
        elif isinstance(did_type, EnumSymbol):
            query = query.filter_by(did_type=did_type)

    if total_threads and (total_threads - 1) > 0:
        if session.bind.dialect.name == 'oracle':
            bindparams = [bindparam('thread_number', thread),
                          bindparam('total_threads', total_threads - 1)]
            query = query.filter(text('ORA_HASH(name, :total_threads) = :thread_number', bindparams=bindparams))
        elif session.bind.dialect.name == 'mysql':
            query = query.filter(text('mod(md5(name), %s) = %s' % (total_threads - 1, thread)))
        elif session.bind.dialect.name == 'postgresql':
            query = query.filter(text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_threads - 1, thread)))

    row_count = 0
    for chunk in query.yield_per(10):
        row_count += 1
        if row_count <= chunk_size:
            yield {'scope': chunk.scope, 'name': chunk.name, 'did_type': chunk.did_type}  # TODO Change this to the proper filebytes [RUCIO-199]
        else:
            break
def rebuild_table(table, delete_missing=False):
    from virtuoso.alchemy import AddForeignKey, DropForeignKey
    print("rebuilding", table)
    session = get_session_maker()()
    incoming = set(get_incoming_fks(table))
    outgoing = set(table.foreign_keys)
    all_fkeys = incoming | outgoing
    self_ref = incoming & outgoing
    try:
        for fk in all_fkeys:
            if not delete_rows_with_missing_fkey(fk, delete_missing):
                print("There are missing keys, will not rebuild " + table.name)
                return
    except Exception as e:
        traceback.print_exc()
        print("Could not delete missing keys")
        raise e
    # Booleans with NULL values
    for col in table.c:
        if isinstance(col.type, Boolean):
            session.execute(table.update().where(col == None).values(**{col.name: 0}))
    # Drop all keys
    for fk in all_fkeys:
        try:
            session.execute(DropForeignKey(fk))
        except Exception as e:
            print("Could not drop fkey %s, maybe does not exist." % (fk_as_str(fk),))
            print(e)
    clone = clone_table(table, table.name + "_temp", False, False)
    clone.create(session.bind)
    column_names = [c.name for c in table.columns]
    sel = select([getattr(table.c, cname) for cname in column_names])
    with transaction.manager:
        session.execute(clone.insert().from_select(column_names, sel))
        mark_changed(session)
    session.execute(DropTable(table))
    # Should we create it without outgoing first?
    table.create(session.bind)
    # self ref will make the insert fail.
    for fk in self_ref:
        try:
            session.execute(DropForeignKey(fk))
        except Exception as e:
            print("Could not drop fkey %s, maybe does not exist." % (fk_as_str(fk),))
            print(e)
    sel = select([getattr(clone.c, cname) for cname in column_names])
    with transaction.manager:
        session.execute(table.insert().from_select(column_names, sel))
        mark_changed(session)
    session.execute(DropTable(clone))
    if delete_missing:
        # Delete a second time, in case.
        for fk in outgoing:
            assert delete_rows_with_missing_fkey(fk, True), "OUCH"
    for fk in incoming:  # includes self_ref
        session.execute(AddForeignKey(fk))
def verify(token):
    lo = d.meta.tables["login"]
    lc = d.meta.tables["logincreate"]
    query = d.engine.execute(lc.select().where(lc.c.token == token)).first()

    if not query:
        raise WeasylError("logincreateRecordMissing")

    db = d.connect()
    with db.begin():
        # Create login record
        userid = db.scalar(lo.insert().returning(lo.c.userid), {
            "login_name": d.get_sysname(query.username),
            "last_login": arrow.now(),
            "email": query.email,
        })

        # Create profile records
        db.execute(d.meta.tables["authbcrypt"].insert(), {
            "userid": userid,
            "hashsum": query.hashpass,
        })
        db.execute(d.meta.tables["profile"].insert(), {
            "userid": userid,
            "username": query.username,
            "full_name": query.username,
            "unixtime": arrow.now(),
            "config": "kscftj",
        })
        db.execute(d.meta.tables["userinfo"].insert(), {
            "userid": userid,
            "birthday": query.birthday,
        })
        db.execute(d.meta.tables["userstats"].insert(), {
            "userid": userid,
        })
        db.execute(d.meta.tables["welcomecount"].insert(), {
            "userid": userid,
        })

        # Update logincreate records
        db.execute(lc.delete().where(lc.c.token == token))

    d.metric('increment', 'verifiedusers')