我们从 Python 开源项目中，提取了以下 18 个代码示例，用于说明如何使用 sqlalchemy.sql.expression.bindparam()。
def get_updated_account_counters(total_workers, worker_number, session=None):
    """
    Get the (account, rse_id) pairs that have pending account counter updates.

    :param total_workers:      Number of total workers.
    :param worker_number:      id of the executing worker.
    :param session:            Database session in use.
    :returns:                  List of distinct (account, rse_id) rows read from
                               UpdatedAccountCounter.
    """
    query = session.query(models.UpdatedAccountCounter.account,
                          models.UpdatedAccountCounter.rse_id).\
        distinct(models.UpdatedAccountCounter.account,
                 models.UpdatedAccountCounter.rse_id)

    # Split the rows between workers by hashing (account, rse_id); each dialect
    # needs its own hash expression. Unknown dialects get no partitioning.
    if total_workers > 0:
        dialect_name = session.bind.dialect.name
        if dialect_name == 'oracle':
            bindparams = [bindparam('worker_number', worker_number),
                          bindparam('total_workers', total_workers)]
            query = query.filter(text('ORA_HASH(CONCAT(account, rse_id), :total_workers) = :worker_number',
                                      bindparams=bindparams))
        elif dialect_name == 'mysql':
            # Raw SQL fragments are wrapped in text() explicitly, matching the
            # other worker-partitioned queries, instead of relying on filter()
            # coercing a plain string.
            query = query.filter(text('mod(md5(concat(account, rse_id)), %s) = %s'
                                      % (total_workers + 1, worker_number)))
        elif dialect_name == 'postgresql':
            query = query.filter(text('mod(abs((\'x\'||md5(concat(account, rse_id)))::bit(32)::int), %s) = %s'
                                      % (total_workers + 1, worker_number)))

    return query.all()
def test_unicode(self, engine, connection):
    """Send a string through a bound parameter and check it is returned unchanged."""
    # NOTE(review): the '?' characters below look like non-ASCII text that was
    # lost when this snippet was extracted -- confirm against the original test.
    expected = '??'
    source_table = Table('one_row', MetaData(bind=engine))
    bound_value = expression.bindparam('????', expected)
    statement = sqlalchemy.select([bound_value], from_obj=source_table)
    self.assertEqual(statement.scalar(), expected)
def get_updated_dids(total_workers, worker_number, limit=100, blacklisted_dids=[], session=None):
    """
    Fetch updated DIDs, oldest first, skipping blacklisted ones.

    If a limited fetch returns a full page in which every entry is blacklisted,
    the query is repeated without a limit so the caller is not starved.

    :param total_workers:      Number of total workers.
    :param worker_number:      id of the executing worker.
    :param limit:              Maximum number of dids to return.
    :param blacklisted_dids:   Collection of (scope, name) pairs to leave out.
    :param session:            Database session in use.
    :returns:                  List of (id, scope, name, rule_evaluation_action) rows.
    """
    updated = models.UpdatedDID
    query = session.query(updated.id, updated.scope, updated.name,
                          updated.rule_evaluation_action)

    if total_workers > 0:
        # Pick the hash expression understood by the current backend.
        dialect_name = session.bind.dialect.name
        hash_clause = None
        if dialect_name == 'oracle':
            bindparams = [bindparam('worker_number', worker_number),
                          bindparam('total_workers', total_workers)]
            hash_clause = text('ORA_HASH(name, :total_workers) = :worker_number', bindparams=bindparams)
        elif dialect_name == 'mysql':
            hash_clause = text('mod(md5(name), %s) = %s' % (total_workers + 1, worker_number))
        elif dialect_name == 'postgresql':
            hash_clause = text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_workers + 1, worker_number))
        if hash_clause is not None:
            query = query.filter(hash_clause)

    ordered = query.order_by(updated.created_at)

    if not limit:
        return [did for did in ordered.all()
                if (did.scope, did.name) not in blacklisted_dids]

    fetched_dids = ordered.limit(limit).all()
    filtered_dids = [did for did in fetched_dids
                     if (did.scope, did.name) not in blacklisted_dids]
    page_was_full = len(fetched_dids) == limit
    if page_was_full and not filtered_dids:
        # Everything on a full page was blacklisted: retry without a limit.
        return get_updated_dids(total_workers=total_workers,
                                worker_number=worker_number,
                                limit=None,
                                blacklisted_dids=blacklisted_dids,
                                session=session)
    return filtered_dids
def get_rules_beyond_eol(date_check, worker_number, total_workers, session):
    """
    Fetch the replication rules whose eol_at lies before a reference date.

    :param date_check:         Reference date; only rules with eol_at earlier than it are selected.
    :param worker_number:      id of the executing worker.
    :param total_workers:      Number of total workers.
    :param session:            Database session in use.
    :returns:                  List of (scope, name, rse_expression, locked, id, eol_at, expires_at) rows.
    """
    rule = models.ReplicationRule
    query = session.query(rule.scope, rule.name, rule.rse_expression,
                          rule.locked, rule.id, rule.eol_at, rule.expires_at)
    query = query.filter(rule.eol_at < date_check)

    # Restrict the result to this worker's share; the hash SQL is backend specific.
    dialect_name = session.bind.dialect.name
    hash_clause = None
    if dialect_name == 'oracle':
        bindparams = [bindparam('worker_number', worker_number),
                      bindparam('total_workers', total_workers)]
        hash_clause = text('ORA_HASH(name, :total_workers) = :worker_number', bindparams=bindparams)
    elif dialect_name == 'mysql':
        hash_clause = text('mod(md5(name), %s) = %s' % (total_workers + 1, worker_number))
    elif dialect_name == 'postgresql':
        hash_clause = text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_workers + 1, worker_number))
    if hash_clause is not None:
        query = query.filter(hash_clause)

    return list(query.all())
def get_expired_rules(total_workers, worker_number, limit=100, blacklisted_rules=[], session=None):
    """
    Fetch expired, unlocked rules without a child rule, ordered by expiry time.

    If a limited fetch returns a full page in which every rule is blacklisted,
    the query is repeated without a limit.

    :param total_workers:      Number of total workers.
    :param worker_number:      id of the executing worker.
    :param limit:              Maximum number of rules to return.
    :param blacklisted_rules:  Collection of rule ids to leave out.
    :param session:            Database session in use.
    :returns:                  List of (id, rse_expression) rows.
    """
    rule = models.ReplicationRule
    query = session.query(rule.id, rule.rse_expression).filter(
        rule.expires_at < datetime.utcnow(),
        rule.locked == False,  # NOQA -- SQL expression, not a Python comparison
        rule.child_rule_id == None)  # NOQA
    query = query.with_hint(rule, "index(rules RULES_EXPIRES_AT_IDX)", 'oracle')
    query = query.order_by(rule.expires_at)

    # Restrict the result to this worker's share; the hash SQL is backend specific.
    dialect_name = session.bind.dialect.name
    hash_clause = None
    if dialect_name == 'oracle':
        bindparams = [bindparam('worker_number', worker_number),
                      bindparam('total_workers', total_workers)]
        hash_clause = text('ORA_HASH(name, :total_workers) = :worker_number', bindparams=bindparams)
    elif dialect_name == 'mysql':
        hash_clause = text('mod(md5(name), %s) = %s' % (total_workers + 1, worker_number))
    elif dialect_name == 'postgresql':
        hash_clause = text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_workers + 1, worker_number))
    if hash_clause is not None:
        query = query.filter(hash_clause)

    if not limit:
        return [row for row in query.all() if row[0] not in blacklisted_rules]

    fetched_rules = query.limit(limit).all()
    filtered_rules = [row for row in fetched_rules if row[0] not in blacklisted_rules]
    if len(fetched_rules) == limit and not filtered_rules:
        # Everything on a full page was blacklisted: retry without a limit.
        return get_expired_rules(total_workers=total_workers,
                                 worker_number=worker_number,
                                 limit=None,
                                 blacklisted_rules=blacklisted_rules,
                                 session=session)
    return filtered_rules
def list_bad_replicas_history(limit=10000, thread=None, total_threads=None, session=None):
    """
    List the bad file replicas history, grouped by RSE. Method only used by necromancer.

    :param limit:          The maximum number of replicas returned.
    :param thread:         The assigned thread for this necromancer.
    :param total_threads:  The total number of threads of all necromancers.
    :param session:        The database session in use.
    :returns:              Dictionary mapping rse_id to a list of {'scope', 'name'} dictionaries.
    """
    bad = models.BadReplicas
    query = session.query(bad.scope, bad.name, bad.rse_id)
    query = query.filter(bad.state == BadFilesStatus.BAD)

    # Partition only when more than one thread shares the work.
    if total_threads and (total_threads - 1) > 0:
        dialect_name = session.bind.dialect.name
        hash_clause = None
        if dialect_name == 'oracle':
            bindparams = [bindparam('thread_number', thread),
                          bindparam('total_threads', total_threads - 1)]
            hash_clause = text('ORA_HASH(name, :total_threads) = :thread_number', bindparams=bindparams)
        elif dialect_name == 'mysql':
            hash_clause = text('mod(md5(name), %s) = %s' % (total_threads - 1, thread))
        elif dialect_name == 'postgresql':
            hash_clause = text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_threads - 1, thread))
        if hash_clause is not None:
            query = query.filter(hash_clause)

    query = query.limit(limit)

    replicas_by_rse = {}
    for scope, name, rse_id in query.yield_per(1000):
        replicas_by_rse.setdefault(rse_id, []).append({'scope': scope, 'name': name})
    return replicas_by_rse
def list_expired_temporary_dids(rse, limit, worker_number=None, total_workers=None, session=None):
    """
    List expired temporary DIDs on an RSE.

    :param rse:            the rse name.
    :param limit:          The maximum number of replicas returned.
    :param worker_number:  id of the executing worker.
    :param total_workers:  Number of total workers.
    :param session:        The database session in use.

    :returns:              a list of dictionaries with the keys
                           path, rse, rse_id, scope, name and bytes.
    """
    rse_id = get_rse_id(rse, session=session)
    is_none = None  # compared through a variable to keep pep8 quiet about `!= None`
    query = session.query(models.TemporaryDataIdentifier.scope,
                          models.TemporaryDataIdentifier.name,
                          models.TemporaryDataIdentifier.path,
                          models.TemporaryDataIdentifier.bytes).\
        with_hint(models.TemporaryDataIdentifier, "INDEX(tmp_dids TMP_DIDS_EXPIRED_AT_IDX)", 'oracle').\
        filter(case([(models.TemporaryDataIdentifier.expired_at != is_none,
                      models.TemporaryDataIdentifier.rse_id), ]) == rse_id)

    # Partition only when more than one worker shares the work.
    if worker_number and total_workers and total_workers - 1 > 0:
        dialect_name = session.bind.dialect.name
        if dialect_name == 'oracle':
            bindparams = [bindparam('worker_number', worker_number - 1),
                          bindparam('total_workers', total_workers - 1)]
            query = query.filter(text('ORA_HASH(name, :total_workers) = :worker_number', bindparams=bindparams))
        elif dialect_name == 'mysql':
            query = query.filter(text('mod(md5(name), %s) = %s' % (total_workers - 1, worker_number - 1)))
        elif dialect_name == 'postgresql':
            # Hash on `name`, like the oracle and mysql branches; this branch
            # previously hashed `path`, so the partitioning key depended on the
            # backend in use.
            query = query.filter(text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_workers - 1, worker_number - 1)))

    return [{'path': path,
             'rse': rse,
             'rse_id': rse_id,
             'scope': scope,
             'name': name,
             'bytes': size}
            for scope, name, path, size in query.limit(limit)]
def test_unicode(engine, table_one_row):
    """Send a string through a bound parameter and check it is returned unchanged."""
    # NOTE(review): the '?' characters below look like non-ASCII text that was
    # lost when this snippet was extracted -- confirm against the original test.
    expected = "?????"
    bound_value = expression.bindparam("?", expected)
    statement = sqlalchemy.select([bound_value], from_obj=table_one_row)
    assert statement.scalar() == expected
def get_descendants_query(
        cls, root_idea_id=bindparam('root_idea_id', type_=Integer),
        inclusive=True):
    """
    Build an aliased selectable with one column, ``id``, of descendant idea ids.

    :param root_idea_id: id of the root idea, either an int or a SQL expression
        (by default a bind parameter named ``root_idea_id``).
    :param inclusive: when true, the root id itself is added to the result.
    :returns: the aliased select expression.
    """
    if cls.using_virtuoso:
        # Virtuoso branch: use the transitive SELECT written out as raw SQL.
        closure = text(
            """SELECT transitive t_in (1) t_out (2) T_DISTINCT T_NO_CYCLES source_id, target_id FROM idea_idea_link WHERE tombstone_date IS NULL"""
        ).columns(column('source_id'), column('target_id')).alias()
        descendants = select(
            [closure.c.target_id.label('id')]
        ).select_from(closure).where(closure.c.source_id == root_idea_id)
    else:
        # Other backends: recursive CTE seeded with the live links leaving the root.
        live_from_root = ((IdeaLink.tombstone_date == None) &  # NOQA
                          (IdeaLink.source_id == root_idea_id))
        seed = select(
            [IdeaLink.source_id, IdeaLink.target_id]
        ).select_from(IdeaLink).where(live_from_root).cte(recursive=True)
        reached = aliased(seed)
        next_link = aliased(IdeaLink)
        continues_path = next_link.source_id == reached.c.target_id
        step = select(
            [next_link.source_id, next_link.target_id]
        ).select_from(next_link).where(
            continues_path & (next_link.tombstone_date == None))  # NOQA
        closure = seed.union(step)
        descendants = select(
            [closure.c.target_id.label('id')]).select_from(closure)

    if inclusive:
        # A plain int has no .label(); turn it into a SQL literal first.
        if isinstance(root_idea_id, int):
            root_idea_id = literal_column(str(root_idea_id), Integer)
        root_row = select([root_idea_id.label('id')])
        descendants = descendants.union(root_row)
    return descendants.alias()
def get_injected_rules(total_workers, worker_number, limit=100, blacklisted_rules=[], session=None):
    """
    Fetch the rules in INJECT state, oldest first, skipping blacklisted ones.

    If a limited fetch returns a full page in which every rule is blacklisted,
    the query is repeated without a limit.

    :param total_workers:      Number of total workers.
    :param worker_number:      id of the executing worker.
    :param limit:              Maximum number of rules to return.
    :param blacklisted_rules:  Collection of rule ids not to include.
    :param session:            Database session in use.
    :returns:                  List of single-column (id,) rows.
    """
    rule = models.ReplicationRule
    dialect_name = session.bind.dialect.name

    query = session.query(rule.id).\
        with_hint(rule, "index(rules RULES_INJECTIONSTATE_IDX)", 'oracle')
    if dialect_name == 'oracle':
        # Extra literal CASE predicate used on Oracle only.
        # NOTE(review): presumably this mirrors the expression behind
        # RULES_INJECTIONSTATE_IDX -- confirm against the schema.
        query = query.filter(text("(CASE when rules.state='I' THEN rules.state ELSE null END)= 'I' "))
    query = query.filter(rule.state == RuleState.INJECT).order_by(rule.created_at)

    # Restrict the result to this worker's share; the hash SQL is backend specific.
    hash_clause = None
    if dialect_name == 'oracle':
        bindparams = [bindparam('worker_number', worker_number),
                      bindparam('total_workers', total_workers)]
        hash_clause = text('ORA_HASH(name, :total_workers) = :worker_number', bindparams=bindparams)
    elif dialect_name == 'mysql':
        hash_clause = text('mod(md5(name), %s) = %s' % (total_workers + 1, worker_number))
    elif dialect_name == 'postgresql':
        hash_clause = text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_workers + 1, worker_number))
    if hash_clause is not None:
        query = query.filter(hash_clause)

    if not limit:
        return [row for row in query.all() if row[0] not in blacklisted_rules]

    fetched_rules = query.limit(limit).all()
    filtered_rules = [row for row in fetched_rules if row[0] not in blacklisted_rules]
    if len(fetched_rules) == limit and not filtered_rules:
        # Everything on a full page was blacklisted: retry without a limit.
        return get_injected_rules(total_workers=total_workers,
                                  worker_number=worker_number,
                                  limit=None,
                                  blacklisted_rules=blacklisted_rules,
                                  session=session)
    return filtered_rules
def get_stuck_rules(total_workers, worker_number, delta=600, limit=10, blacklisted_rules=[], session=None):
    """
    Fetch rules in STUCK state that have not been updated for `delta` seconds.

    Only rules that have no expiry, have not yet expired, or are locked are
    selected. If a limited fetch returns a full page in which every rule is
    blacklisted, the query is repeated without a limit.

    :param total_workers:      Number of total workers.
    :param worker_number:      id of the executing worker.
    :param delta:              Delta in seconds to select rules in.
    :param limit:              Maximum number of rules to select.
    :param blacklisted_rules:  Collection of rule ids to filter out.
    :param session:            Database session in use.
    :returns:                  List of single-column (id,) rows.
    """
    rule = models.ReplicationRule
    dialect_name = session.bind.dialect.name

    query = session.query(rule.id).\
        with_hint(rule, "index(rules RULES_STUCKSTATE_IDX)", 'oracle')
    if dialect_name == 'oracle':
        # Extra literal CASE predicate used on Oracle only.
        # NOTE(review): presumably this mirrors the expression behind
        # RULES_STUCKSTATE_IDX -- confirm against the schema.
        query = query.filter(text("(CASE when rules.state='S' THEN rules.state ELSE null END)= 'S' "))
    query = query.filter(rule.state == RuleState.STUCK)
    query = query.filter(rule.updated_at < datetime.utcnow() - timedelta(seconds=delta))
    query = query.filter(or_(rule.expires_at == null(),
                             rule.expires_at > datetime.utcnow(),
                             rule.locked == true()))
    query = query.order_by(rule.updated_at)

    # Restrict the result to this worker's share; the hash SQL is backend specific.
    hash_clause = None
    if dialect_name == 'oracle':
        bindparams = [bindparam('worker_number', worker_number),
                      bindparam('total_workers', total_workers)]
        hash_clause = text('ORA_HASH(name, :total_workers) = :worker_number', bindparams=bindparams)
    elif dialect_name == 'mysql':
        hash_clause = text('mod(md5(name), %s) = %s' % (total_workers + 1, worker_number))
    elif dialect_name == 'postgresql':
        hash_clause = text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_workers + 1, worker_number))
    if hash_clause is not None:
        query = query.filter(hash_clause)

    if not limit:
        return [row for row in query.all() if row[0] not in blacklisted_rules]

    fetched_rules = query.limit(limit).all()
    filtered_rules = [row for row in fetched_rules if row[0] not in blacklisted_rules]
    if len(fetched_rules) == limit and not filtered_rules:
        # Everything on a full page was blacklisted: retry without a limit.
        return get_stuck_rules(total_workers=total_workers,
                               worker_number=worker_number,
                               delta=delta,
                               limit=None,
                               blacklisted_rules=blacklisted_rules,
                               session=session)
    return filtered_rules
def list_bad_replicas(limit=10000, thread=None, total_threads=None, session=None):
    """
    List file replicas that are in the BAD state.

    :param limit:          The maximum number of replicas returned.
    :param thread:         The assigned thread for this necromancer.
    :param total_threads:  The total number of threads of all necromancers.
    :param session:        The database session in use.

    :returns:              a list of dictionary {'scope' scope, 'name': name, 'rse_id': rse_id, 'rse': rse}.
    """
    replica = models.RSEFileAssociation
    dialect_name = session.bind.dialect.name

    query = session.query(replica.scope, replica.name, replica.rse_id)
    if dialect_name == 'oracle':
        # The filter(text(...)) is needed; otherwise SQLA uses bind variables
        # and the index is not used.
        state_index_clause = text("CASE WHEN (%s.replicas.state != 'A') THEN %s.replicas.rse_id END IS NOT NULL"
                                  % (DEFAULT_SCHEMA_NAME, DEFAULT_SCHEMA_NAME))
        query = query.with_hint(replica, "+ index(replicas REPLICAS_STATE_IDX)", 'oracle').\
            filter(state_index_clause)
    query = query.filter(replica.state == ReplicaState.BAD)

    # Partition only when more than one thread shares the work.
    if total_threads and (total_threads - 1) > 0:
        hash_clause = None
        if dialect_name == 'oracle':
            bindparams = [bindparam('thread_number', thread),
                          bindparam('total_threads', total_threads - 1)]
            hash_clause = text('ORA_HASH(name, :total_threads) = :thread_number', bindparams=bindparams)
        elif dialect_name == 'mysql':
            hash_clause = text('mod(md5(name), %s) = %s' % (total_threads - 1, thread))
        elif dialect_name == 'postgresql':
            hash_clause = text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_threads - 1, thread))
        if hash_clause is not None:
            query = query.filter(hash_clause)

    query = query.limit(limit)

    result = []
    rse_names = {}  # rse_id -> rse name, so each name is looked up only once
    for scope, name, rse_id in query.yield_per(1000):
        if rse_id not in rse_names:
            rse_names[rse_id] = get_rse_name(rse_id=rse_id, session=session)
        result.append({'scope': scope,
                       'name': name,
                       'rse_id': rse_id,
                       'rse': rse_names[rse_id]})
    return result
def list_unlocked_replicas(rse, limit, bytes=None, rse_id=None, worker_number=None, total_workers=None, delay_seconds=0, session=None):
    """
    List RSE File replicas with no locks.

    Selects replicas on the RSE whose tombstone is in the past and whose
    lock_cnt is 0, ordered by tombstone, and that have no matching request
    (same scope and name). Replicas in BEING_DELETED state are only selected
    when their updated_at is older than `delay_seconds`.

    :param rse: the rse name.
    :param limit: threshold on the number of counted replicas after which the scan stops.
    :param bytes: the amount of needed bytes.
    :param rse_id: the rse id; looked up from `rse` when not given.
    :param worker_number: id of the executing worker (decremented by one before hashing).
    :param total_workers: Number of total workers.
    :param delay_seconds: minimum age, in seconds, of the last update of a BEING_DELETED replica.
    :param session: The database session in use.

    :returns: a list of dictionary replica.
    """
    if not rse_id:
        rse_id = get_rse_id(rse=rse, session=session)

    # Disabled filter, kept for reference:
    # filter(models.RSEFileAssociation.state != ReplicaState.BEING_DELETED).\
    none_value = None  # Hack to get pep8 happy...
    query = session.query(models.RSEFileAssociation.scope, models.RSEFileAssociation.name, models.RSEFileAssociation.path, models.RSEFileAssociation.bytes, models.RSEFileAssociation.tombstone, models.RSEFileAssociation.state).\
        with_hint(models.RSEFileAssociation, "INDEX_RS_ASC(replicas REPLICAS_TOMBSTONE_IDX) NO_INDEX_FFS(replicas REPLICAS_TOMBSTONE_IDX)", 'oracle').\
        filter(models.RSEFileAssociation.tombstone < datetime.utcnow()).\
        filter(models.RSEFileAssociation.lock_cnt == 0).\
        filter(case([(models.RSEFileAssociation.tombstone != none_value, models.RSEFileAssociation.rse_id), ]) == rse_id).\
        filter(or_(models.RSEFileAssociation.state.in_((ReplicaState.AVAILABLE, ReplicaState.UNAVAILABLE, ReplicaState.BAD)),
                   and_(models.RSEFileAssociation.state == ReplicaState.BEING_DELETED, models.RSEFileAssociation.updated_at < datetime.utcnow() - timedelta(seconds=delay_seconds)))).\
        order_by(models.RSEFileAssociation.tombstone)

    # Do not delete files used as sources: exclude replicas that have a request
    # with the same scope and name.
    stmt = exists(select([1]).prefix_with("/*+ INDEX(requests REQUESTS_SCOPE_NAME_RSE_IDX) */", dialect='oracle')).\
        where(and_(models.RSEFileAssociation.scope == models.Request.scope,
                   models.RSEFileAssociation.name == models.Request.name))
    query = query.filter(not_(stmt))

    # Partition the rows between workers; only done when more than one worker
    # shares the work. The hash SQL is backend specific.
    if worker_number and total_workers and total_workers - 1 > 0:
        if session.bind.dialect.name == 'oracle':
            bindparams = [bindparam('worker_number', worker_number - 1), bindparam('total_workers', total_workers - 1)]
            query = query.filter(text('ORA_HASH(name, :total_workers) = :worker_number', bindparams=bindparams))
        elif session.bind.dialect.name == 'mysql':
            query = query.filter(text('mod(md5(name), %s) = %s' % (total_workers - 1, worker_number - 1)))
        elif session.bind.dialect.name == 'postgresql':
            query = query.filter(text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_workers - 1, worker_number - 1)))

    # Remember the requested space now: the loop below reuses the name `bytes`
    # for the size of each replica.
    needed_space = bytes
    total_bytes, total_files = 0, 0
    rows = []
    # NOTE(review): the nesting of this loop was reconstructed from a flattened
    # listing; confirm that rows.append() sits outside the state check.
    for (scope, name, path, bytes, tombstone, state) in query.yield_per(1000):
        if state != ReplicaState.UNAVAILABLE:

            # Stop once the accumulated size exceeds the requested space,
            # unless the replica's tombstone equals OBSOLETE.
            total_bytes += bytes
            if tombstone != OBSOLETE and needed_space is not None and total_bytes > needed_space:
                break

            # Stop once more than `limit` replicas have been counted.
            total_files += 1
            if total_files > limit:
                break

        rows.append({'scope': scope, 'name': name, 'path': path,
                     'bytes': bytes, 'tombstone': tombstone,
                     'state': state})
    return rows
def list_quarantined_replicas(rse, limit, worker_number=None, total_workers=None, session=None):
    """
    List RSE Quarantined File replicas.

    Quarantined entries that still have a replica with the same scope, name
    and rse_id are left out.

    :param rse:            the rse name.
    :param limit:          The maximum number of replicas returned.
    :param worker_number:  id of the executing worker.
    :param total_workers:  Number of total workers.
    :param session:        The database session in use.

    :returns:              a list of dictionaries with the keys
                           path, rse, rse_id, created_at, scope, name and bytes.
    """
    rse_id = get_rse_id(rse, session=session)

    query = session.query(models.QuarantinedReplica.path,
                          models.QuarantinedReplica.bytes,
                          models.QuarantinedReplica.scope,
                          models.QuarantinedReplica.name,
                          models.QuarantinedReplica.created_at).\
        filter(models.QuarantinedReplica.rse_id == rse_id)

    # Do not delete valid replicas.
    stmt = exists(select([1]).prefix_with("/*+ index(REPLICAS REPLICAS_PK) */", dialect='oracle')).\
        where(and_(models.RSEFileAssociation.scope == models.QuarantinedReplica.scope,
                   models.RSEFileAssociation.name == models.QuarantinedReplica.name,
                   models.RSEFileAssociation.rse_id == models.QuarantinedReplica.rse_id))
    query = query.filter(not_(stmt))

    # Partition only when more than one worker shares the work.
    if worker_number and total_workers and total_workers - 1 > 0:
        dialect_name = session.bind.dialect.name
        if dialect_name == 'oracle':
            bindparams = [bindparam('worker_number', worker_number - 1),
                          bindparam('total_workers', total_workers - 1)]
            query = query.filter(text('ORA_HASH(path, :total_workers) = :worker_number', bindparams=bindparams))
        elif dialect_name == 'mysql':
            # Raw SQL fragments are wrapped in text() explicitly, matching the
            # other worker-partitioned queries, instead of relying on filter()
            # coercing a plain string.
            query = query.filter(text('mod(md5(path), %s) = %s' % (total_workers - 1, worker_number - 1)))
        elif dialect_name == 'postgresql':
            query = query.filter(text('mod(abs((\'x\'||md5(path))::bit(32)::int), %s) = %s' % (total_workers - 1, worker_number - 1)))

    return [{'path': path,
             'rse': rse,
             'rse_id': rse_id,
             'created_at': created_at,
             'scope': scope,
             'name': name,
             'bytes': size}
            for path, size, scope, name, created_at in query.limit(limit)]
def list_new_dids(did_type, thread=None, total_threads=None, chunk_size=1000, session=None):
    """
    Yield recent identifiers: DIDs flagged is_new that have no rule in INJECT state.

    :param did_type:       The DID type, as a string or an EnumSymbol; falsy for no type filter.
    :param thread:         The assigned thread for this necromancer.
    :param total_threads:  The total number of threads of all necromancers.
    :param chunk_size:     Maximum number of identifiers yielded.
    :param session:        The database session in use.
    :returns:              Generator of {'scope', 'name', 'did_type'} dictionaries.
    """
    did = models.DataIdentifier
    rule = models.ReplicationRule

    injecting_rule = select([1]).\
        prefix_with("/*+ INDEX(RULES ATLAS_RUCIO.RULES_SCOPE_NAME_IDX) */", dialect='oracle').\
        where(and_(did.scope == rule.scope,
                   did.name == rule.name,
                   rule.state == RuleState.INJECT))

    query = session.query(did).\
        with_hint(did, "index(dids DIDS_IS_NEW_IDX)", 'oracle').\
        filter_by(is_new=True).\
        filter(~exists(injecting_rule))

    if did_type:
        if isinstance(did_type, (str, unicode)):
            query = query.filter_by(did_type=DIDType.from_sym(did_type))
        elif isinstance(did_type, EnumSymbol):
            query = query.filter_by(did_type=did_type)

    # Partition only when more than one thread shares the work.
    if total_threads and (total_threads - 1) > 0:
        dialect_name = session.bind.dialect.name
        hash_clause = None
        if dialect_name == 'oracle':
            bindparams = [bindparam('thread_number', thread),
                          bindparam('total_threads', total_threads - 1)]
            hash_clause = text('ORA_HASH(name, :total_threads) = :thread_number', bindparams=bindparams)
        elif dialect_name == 'mysql':
            hash_clause = text('mod(md5(name), %s) = %s' % (total_threads - 1, thread))
        elif dialect_name == 'postgresql':
            hash_clause = text('mod(abs((\'x\'||md5(name))::bit(32)::int), %s) = %s' % (total_threads - 1, thread))
        if hash_clause is not None:
            query = query.filter(hash_clause)

    for position, row in enumerate(query.yield_per(10), 1):
        if position > chunk_size:
            break
        # TODO Change this to the proper filebytes [RUCIO-199]
        yield {'scope': row.scope, 'name': row.name, 'did_type': row.did_type}
def retrieve_messages(bulk=1000, thread=None, total_threads=None, event_type=None, lock=False, session=None):
    """
    Retrieve up to $bulk messages.

    :param bulk:           Number of messages as an integer.
    :param thread:         Identifier of the caller thread as an integer.
    :param total_threads:  Maximum number of threads as an integer.
    :param event_type:     Return only specified event_type. If None, returns everything except email.
    :param lock:           Select exclusively some rows.
    :param session:        The database session to use.

    :returns messages:     List of dictionaries {id, created_at, event_type, payload}
    :raises RucioException: if the database raises an IntegrityError.
    """
    # NOTE(review): `lock` is accepted but never consulted here; the rows are
    # always selected with FOR UPDATE NOWAIT. Confirm whether that is intended.
    messages = []
    try:
        dialect_name = session.bind.dialect.name
        subquery = session.query(Message.id)

        # Partition only when more than one thread shares the work.
        if total_threads and (total_threads - 1) > 0:
            if dialect_name == 'oracle':
                bindparams = [bindparam('thread_number', thread),
                              bindparam('total_threads', total_threads - 1)]
                subquery = subquery.filter(text('ORA_HASH(id, :total_threads) = :thread_number', bindparams=bindparams))
            elif dialect_name == 'mysql':
                # Raw SQL fragments are wrapped in text() explicitly instead of
                # relying on filter() coercing a plain string.
                subquery = subquery.filter(text('mod(md5(id), %s) = %s' % (total_threads - 1, thread)))
            elif dialect_name == 'postgresql':
                subquery = subquery.filter(text('mod(abs((\'x\'||md5(id))::bit(32)::int), %s) = %s' % (total_threads - 1, thread)))

        if event_type:
            subquery = subquery.filter_by(event_type=event_type)
        else:
            subquery = subquery.filter(Message.event_type != 'email')

        # Step 1:
        # MySQL does not support limits in nested queries, limit on the outer query instead.
        # This is not as performant, but the best we can get from MySQL.
        if dialect_name == 'mysql':
            subquery = subquery.order_by(Message.created_at)
        else:
            subquery = subquery.order_by(Message.created_at).limit(bulk)

        query = session.query(Message.id,
                              Message.created_at,
                              Message.event_type,
                              Message.payload)\
            .filter(Message.id.in_(subquery))\
            .with_for_update(nowait=True)

        # Step 2:
        # MySQL does not support limits in nested queries, limit on the outer query instead.
        # This is not as performant, but the best we can get from MySQL.
        if dialect_name == 'mysql':
            query = query.limit(bulk)

        # Distinct loop names avoid shadowing the `id` builtin and the
        # `event_type` parameter.
        for message_id, created_at, message_type, payload in query:
            messages.append({'id': message_id,
                             'created_at': created_at,
                             'event_type': message_type,
                             'payload': json.loads(str(payload))})

        return messages

    except IntegrityError as e:
        # `as` form works on Python 2.6+ and Python 3; the comma form is Python 2 only.
        raise RucioException(e.args)
def get_ancestors_query(
        cls, target_id=bindparam('root_id', type_=Integer),
        inclusive=True, tombstone_date=None):
    """
    Build an aliased selectable with one column, ``id``, of ancestor idea ids.

    :param target_id: id of the idea whose ancestors are wanted: an int, a SQL
        expression (by default a bind parameter named ``root_id``), or a list
        of ids. A list is supported only on the non-virtuoso path and only
        with ``inclusive=False``.
    :param inclusive: when true, ``target_id`` itself is added to the result.
    :param tombstone_date: value the links' tombstone_date is compared against
        on the non-virtuoso path (None selects rows where it is NULL).
    :returns: the aliased select expression.
    :raises NotImplementedError: for a list ``target_id`` on virtuoso, or with
        ``inclusive=True``.
    """
    if cls.using_virtuoso:
        if isinstance(target_id, list):
            raise NotImplementedError()
        # NOTE(review): this raw SQL always filters on tombstone_date IS NULL
        # and does not use the tombstone_date argument -- confirm that is intended.
        sql = text(
            """SELECT transitive t_in (1) t_out (2) T_DISTINCT T_NO_CYCLES source_id, target_id FROM idea_idea_link WHERE tombstone_date IS NULL"""
        ).columns(column('source_id'), column('target_id')).alias()
        select_exp = select(
            [sql.c.source_id.label('id')]
        ).select_from(sql).where(sql.c.target_id == target_id)
    else:
        if isinstance(target_id, list):
            root_condition = IdeaLink.target_id.in_(target_id)
        else:
            root_condition = (IdeaLink.target_id == target_id)
        # Recursive CTE seeded with the links that point at the target(s).
        link = select(
            [IdeaLink.source_id, IdeaLink.target_id]
        ).select_from(
            IdeaLink
        ).where(
            (IdeaLink.tombstone_date == tombstone_date) &
            (root_condition)
        ).cte(recursive=True)
        target_alias = aliased(link)
        sources_alias = aliased(IdeaLink)
        parent_link = sources_alias.target_id == target_alias.c.source_id
        parents = select(
            [sources_alias.source_id, sources_alias.target_id]
        ).select_from(sources_alias).where(
            parent_link &
            (sources_alias.tombstone_date == tombstone_date))
        with_parents = link.union(parents)
        select_exp = select(
            [with_parents.c.source_id.label('id')]
        ).select_from(with_parents)

    if inclusive:
        if isinstance(target_id, list):
            # postgres: select * from unnest(ARRAY[1,6,7]) as id
            raise NotImplementedError()
        if isinstance(target_id, int):
            # A plain int has no .label(); turn it into a SQL literal first.
            target_id = literal_column(str(target_id), Integer)
        # The union now also runs for an int target_id. Previously the int was
        # converted and then dropped, so the result was not inclusive in that
        # case, unlike get_descendants_query.
        select_exp = select_exp.union(
            select([target_id.label('id')]))
    return select_exp.alias()