Java 类com.amazonaws.services.dynamodbv2.model.Shard 实例源码

项目:Camel    文件:ShardList.java   
Shard atAfterSeq(String sequenceNumber, BigIntComparisons condition) {
    BigInteger atAfter = new BigInteger(sequenceNumber);
    List<Shard> sorted = new ArrayList<>();
    sorted.addAll(shards.values());
    Collections.sort(sorted, StartingSequenceNumberComparator.INSTANCE);
    for (Shard shard : sorted) {
        if (shard.getSequenceNumberRange().getEndingSequenceNumber() != null) {
            BigInteger end = new BigInteger(shard.getSequenceNumberRange().getEndingSequenceNumber());
            // essentially: after < end or after <= end
            if (condition.matches(atAfter, end)) {
                return shard;
            }

        }
    }
    if (shards.size() > 0) {
        return sorted.get(sorted.size() - 1);
    }
    throw new IllegalStateException("Unable to find a shard with appropriate sequence numbers for " + sequenceNumber + " in " + shards);
}
项目:Camel    文件:ShardList.java   
/**
 * Removes shards that are older than the provided shard. Does not remove
 * the provided shard.
 *
 * @param removeBefore
 */
void removeOlderThan(Shard removeBefore) {
    String current = removeBefore.getParentShardId();

    int removedShards = 0;
    while (current != null) {
        Shard s = shards.remove(current);
        if (s == null) {
            current = null;
        } else {
            removedShards++;
            current = s.getParentShardId();
        }
    }
    log.trace("removed {} shards from the store, new size is {}", removedShards, shards.size());
}
项目:Camel    文件:ShardListTest.java   
@Test
public void reAddingEntriesMaintainsOrder() throws Exception {
    Shard first = new Shard()
            .withShardId("first_shard");
    Shard second = new Shard()
            .withParentShardId("first_shard")
            .withShardId("second_shard");

    ShardList shards = new ShardList();
    shards.add(first);
    shards.add(second);

    assertThat(shards.nextAfter(first), is(second));

    Shard second2 = new Shard()
            .withParentShardId("first_shard")
            .withShardId("second_shard");
    Shard third = new Shard()
            .withParentShardId("second_shard")
            .withShardId("third_shard");
    shards.add(second2);
    shards.add(third);

    assertThat(shards.nextAfter(first), is(second));
    assertThat(shards.nextAfter(second), is(third));
}
项目:Camel    文件:ShardListTest.java   
static List<Shard> createShardsWithSequenceNumbers(String initialParent, String... shardIdsAndSeqNos) {
    String previous = initialParent;
    List<Shard> result = new ArrayList<>();
    for (int i = 0; i < shardIdsAndSeqNos.length; i += 3) {
        String id = shardIdsAndSeqNos[i];
        String seqStart = shardIdsAndSeqNos[i + 1];
        String seqEnd = shardIdsAndSeqNos[i + 2];
        result.add(new Shard()
                .withShardId(id)
                .withParentShardId(previous)
                .withSequenceNumberRange(new SequenceNumberRange()
                    .withStartingSequenceNumber(seqStart)
                    .withEndingSequenceNumber(seqEnd)
                )
        );
        previous = id;
    }
    return result;
}
项目:kafka-connect-dynamodb    文件:DynamoDbSourceConnector.java   
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    return ConnectorUtils.groupPartitions(new ArrayList<>(streamShards.keySet()), maxTasks).stream().map(taskShards -> {
        final Map<String, String> taskConfig = new HashMap<>();
        taskConfig.put(TaskConfig.Keys.REGION, config.region.getName());
        taskConfig.put(TaskConfig.Keys.TOPIC_FORMAT, config.topicFormat);
        taskConfig.put(TaskConfig.Keys.SHARDS, taskShards.stream().map(Shard::getShardId).collect(Collectors.joining(",")));
        taskShards.forEach(shard -> {
            final TableDescription tableDesc = streamShards.get(shard);
            taskConfig.put(shard.getShardId() + "." + TaskConfig.Keys.TABLE, tableDesc.getTableName());
            taskConfig.put(shard.getShardId() + "." + TaskConfig.Keys.STREAM_ARN, tableDesc.getLatestStreamArn());
        });
        return taskConfig;
    }).collect(Collectors.toList());
}
项目:Camel    文件:ShardList.java   
Shard nextAfter(Shard previous) {
    for (Shard shard : shards.values()) {
        if (previous.getShardId().equals(shard.getParentShardId())) {
            return shard;
        }
    }
    throw new IllegalStateException("Unable to find the next shard for " + previous + " in " + shards);
}
项目:Camel    文件:ShardList.java   
Shard first() {
    // Potential optimisation: if the two provided sequence numbers are the
    // same then we can skip the shard entirely. Need to confirm this with AWS.
    for (Shard shard : shards.values()) {
        if (!shards.containsKey(shard.getParentShardId())) {
            return shard;
        }
    }
    throw new IllegalStateException("Unable to find an unparented shard in " + shards);
}
项目:Camel    文件:ShardIteratorHandler.java   
private Shard resolveNewShard(ShardIteratorType type, String resumeFrom) {
    switch(type) {
    case AFTER_SEQUENCE_NUMBER:
        return shardList.afterSeq(resumeFrom != null ? resumeFrom : getEndpoint().getSequenceNumber());
    case AT_SEQUENCE_NUMBER:
        return shardList.atSeq(getEndpoint().getSequenceNumber());
    case TRIM_HORIZON:
        return shardList.first();
    case LATEST:
    default:
        return shardList.last();
    }
}
项目:Camel    文件:ShardListTest.java   
@Test
public void nextReturnsShardWithParent() throws Exception {
    Shard first = new Shard()
            .withShardId("first_shard")
            .withParentShardId("other_shard_id");
    Shard second = new Shard()
            .withParentShardId("first_shard")
            .withShardId("second_shard");

    ShardList shards = new ShardList();
    shards.add(first);
    shards.add(second);

    assertThat(shards.nextAfter(first), is(second));
}
项目:Camel    文件:ShardListTest.java   
@Test
public void nextWithNullReturnsFirstKnownShard() throws Exception {
    Shard first = new Shard()
            .withShardId("first_shard");
    Shard second = new Shard()
            .withParentShardId("first_shard")
            .withShardId("second_shard");

    ShardList shards = new ShardList();
    shards.add(first);
    shards.add(second);

    assertThat(shards.nextAfter(first), is(second));
}
项目:Camel    文件:ShardListTest.java   
@Test
public void removingShards() throws Exception {
    ShardList shards = new ShardList();
    shards.addAll(createShards(null, "a", "b", "c", "d"));
    Shard removeBefore = new Shard().withShardId("c").withParentShardId("b");
    shards.removeOlderThan(removeBefore);
    assertThat(shards.first().getShardId(), is("c"));
}
项目:Camel    文件:ShardListTest.java   
static List<Shard> createShards(String initialParent, String... shardIds) {
    String previous = initialParent;
    List<Shard> result = new ArrayList<>();
    for (String s : shardIds) {
        result.add(new Shard().withShardId(s).withParentShardId(previous));
        previous = s;
    }
    return result;
}
项目:Camel    文件:ShardList.java   
void addAll(Collection<Shard> shards) {
    for (Shard shard : shards) {
        add(shard);
    }
}
项目:Camel    文件:ShardList.java   
void add(Shard shard) {
    shards.put(shard.getShardId(), shard);
}
项目:Camel    文件:ShardList.java   
Shard afterSeq(String sequenceNumber) {
    return atAfterSeq(sequenceNumber, BigIntComparisons.Conditions.LT);
}
项目:Camel    文件:ShardList.java   
Shard atSeq(String sequenceNumber) {
    return atAfterSeq(sequenceNumber, BigIntComparisons.Conditions.LTEQ);
}
项目:Camel    文件:ShardList.java   
@Override
public int compare(Shard o1, Shard o2) {
    BigInteger i1 = new BigInteger(o1.getSequenceNumberRange().getStartingSequenceNumber());
    BigInteger i2 = new BigInteger(o2.getSequenceNumberRange().getStartingSequenceNumber());
    return i1.compareTo(i2);
}