Shard atAfterSeq(String sequenceNumber, BigIntComparisons condition) { BigInteger atAfter = new BigInteger(sequenceNumber); List<Shard> sorted = new ArrayList<>(); sorted.addAll(shards.values()); Collections.sort(sorted, StartingSequenceNumberComparator.INSTANCE); for (Shard shard : sorted) { if (shard.getSequenceNumberRange().getEndingSequenceNumber() != null) { BigInteger end = new BigInteger(shard.getSequenceNumberRange().getEndingSequenceNumber()); // essentially: after < end or after <= end if (condition.matches(atAfter, end)) { return shard; } } } if (shards.size() > 0) { return sorted.get(sorted.size() - 1); } throw new IllegalStateException("Unable to find a shard with appropriate sequence numbers for " + sequenceNumber + " in " + shards); }
/**
 * Drops every ancestor of the given shard from the store by walking up the
 * chain of parent shard ids. The given shard itself is left in place.
 *
 * The walk stops as soon as a parent id is null or is not present in the
 * store.
 *
 * @param removeBefore the shard whose ancestors should be removed
 */
void removeOlderThan(Shard removeBefore) {
    int removedShards = 0;
    String ancestorId = removeBefore.getParentShardId();
    while (ancestorId != null) {
        Shard ancestor = shards.remove(ancestorId);
        if (ancestor == null) {
            // Unknown parent: the chain ends here.
            break;
        }
        removedShards++;
        ancestorId = ancestor.getParentShardId();
    }
    log.trace("removed {} shards from the store, new size is {}", removedShards, shards.size());
}
/**
 * Adds a shard whose ids duplicate an already-added shard, followed by a new
 * child shard, and checks that the parent/child chain can still be walked
 * with nextAfter.
 */
@Test
public void reAddingEntriesMaintainsOrder() throws Exception {
    ShardList shards = new ShardList();

    Shard first = new Shard().withShardId("first_shard");
    Shard second = new Shard().withShardId("second_shard").withParentShardId("first_shard");
    shards.add(first);
    shards.add(second);

    assertThat(shards.nextAfter(first), is(second));

    // A separate instance carrying the same ids as 'second', plus a new child of it.
    Shard secondAgain = new Shard().withShardId("second_shard").withParentShardId("first_shard");
    Shard third = new Shard().withShardId("third_shard").withParentShardId("second_shard");
    shards.add(secondAgain);
    shards.add(third);

    // The expectations are still expressed against the original 'second' instance.
    assertThat(shards.nextAfter(first), is(second));
    assertThat(shards.nextAfter(second), is(third));
}
/**
 * Builds a chain of shards, each one parented to the shard created before
 * it, with a sequence number range attached to every shard.
 *
 * @param initialParent parent shard id for the first shard (may be null)
 * @param shardIdsAndSeqNos flat list of (shardId, startingSequenceNumber,
 *        endingSequenceNumber) triples; its length must be a multiple of 3
 * @return the shards in the order they were described
 * @throws IllegalArgumentException if the values do not form complete triples
 */
static List<Shard> createShardsWithSequenceNumbers(String initialParent, String... shardIdsAndSeqNos) {
    // Fail fast with a clear message instead of an ArrayIndexOutOfBoundsException
    // part-way through the loop when a triple is incomplete.
    if (shardIdsAndSeqNos.length % 3 != 0) {
        throw new IllegalArgumentException(
                "Expected (shardId, startingSequenceNumber, endingSequenceNumber) triples but got "
                        + shardIdsAndSeqNos.length + " values");
    }

    String previous = initialParent;
    List<Shard> result = new ArrayList<>(shardIdsAndSeqNos.length / 3);
    for (int i = 0; i < shardIdsAndSeqNos.length; i += 3) {
        String id = shardIdsAndSeqNos[i];
        String seqStart = shardIdsAndSeqNos[i + 1];
        String seqEnd = shardIdsAndSeqNos[i + 2];

        result.add(new Shard()
                .withShardId(id)
                .withParentShardId(previous)
                .withSequenceNumberRange(new SequenceNumberRange()
                        .withStartingSequenceNumber(seqStart)
                        .withEndingSequenceNumber(seqEnd)));

        // The shard just created becomes the parent of the next one.
        previous = id;
    }
    return result;
}
/**
 * Produces one configuration map per task. The keys of {@code streamShards}
 * are split into groups via {@code ConnectorUtils.groupPartitions}; each
 * group's map holds the region, the topic format, the comma-separated shard
 * ids of the group, and, per shard, the table name and latest stream ARN
 * under keys prefixed with the shard id.
 *
 * @param maxTasks passed through to {@code ConnectorUtils.groupPartitions}
 * @return the task configuration maps, one per shard group
 */
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    final List<List<Shard>> shardGroups =
            ConnectorUtils.groupPartitions(new ArrayList<>(streamShards.keySet()), maxTasks);

    final List<Map<String, String>> configs = new ArrayList<>();
    for (List<Shard> group : shardGroups) {
        final Map<String, String> taskConfig = new HashMap<>();
        taskConfig.put(TaskConfig.Keys.REGION, config.region.getName());
        taskConfig.put(TaskConfig.Keys.TOPIC_FORMAT, config.topicFormat);

        // Comma-separated list of the shard ids handled by this task.
        final List<String> shardIds = new ArrayList<>();
        for (Shard shard : group) {
            shardIds.add(shard.getShardId());
        }
        taskConfig.put(TaskConfig.Keys.SHARDS, String.join(",", shardIds));

        // Per-shard settings, namespaced by shard id.
        for (Shard shard : group) {
            final TableDescription tableDesc = streamShards.get(shard);
            final String prefix = shard.getShardId() + ".";
            taskConfig.put(prefix + TaskConfig.Keys.TABLE, tableDesc.getTableName());
            taskConfig.put(shard.getShardId() + "." + TaskConfig.Keys.STREAM_ARN, tableDesc.getLatestStreamArn());
        }

        configs.add(taskConfig);
    }
    return configs;
}
/**
 * Returns the first known shard whose parent shard id equals the id of the
 * given shard.
 *
 * @param previous the shard whose child is wanted
 * @return a shard that names {@code previous} as its parent
 * @throws IllegalStateException if no known shard has {@code previous} as its parent
 */
Shard nextAfter(Shard previous) {
    for (Shard candidate : shards.values()) {
        boolean isChild = previous.getShardId().equals(candidate.getParentShardId());
        if (!isChild) {
            continue;
        }
        return candidate;
    }
    throw new IllegalStateException("Unable to find the next shard for " + previous + " in " + shards);
}
Shard first() { // Potential optimisation: if the two provided sequence numbers are the // same then we can skip the shard entirely. Need to confirm this with AWS. for (Shard shard : shards.values()) { if (!shards.containsKey(shard.getParentShardId())) { return shard; } } throw new IllegalStateException("Unable to find an unparented shard in " + shards); }
/**
 * Selects the shard to start from for the given iterator type by delegating
 * to the matching lookup on the shard list.
 *
 * @param type the iterator type; any value without its own case (including
 *        LATEST) resolves to the last shard
 * @param resumeFrom sequence number to resume after; only consulted for
 *        AFTER_SEQUENCE_NUMBER, where a null value falls back to the
 *        endpoint's sequence number
 * @return the shard chosen by the shard list
 */
private Shard resolveNewShard(ShardIteratorType type, String resumeFrom) {
    switch (type) {
    case TRIM_HORIZON:
        return shardList.first();
    case AT_SEQUENCE_NUMBER:
        return shardList.atSeq(getEndpoint().getSequenceNumber());
    case AFTER_SEQUENCE_NUMBER: {
        String startAfter = resumeFrom;
        if (startAfter == null) {
            startAfter = getEndpoint().getSequenceNumber();
        }
        return shardList.afterSeq(startAfter);
    }
    default:
        // Covers LATEST as well as any other iterator type.
        return shardList.last();
    }
}
/**
 * Checks that nextAfter returns the shard that names the given shard as its
 * parent, even when the given shard itself refers to a parent that was never
 * added to the list.
 */
@Test
public void nextReturnsShardWithParent() throws Exception {
    ShardList shards = new ShardList();

    Shard parent = new Shard().withParentShardId("other_shard_id").withShardId("first_shard");
    Shard child = new Shard().withShardId("second_shard").withParentShardId("first_shard");
    shards.add(parent);
    shards.add(child);

    assertThat(shards.nextAfter(parent), is(child));
}
/**
 * Checks that nextAfter, given a shard that has no parent shard id set,
 * returns the shard that names it as parent.
 *
 * NOTE(review): despite the method name, null is never passed to nextAfter
 * here; confirm which scenario this test is meant to cover.
 */
@Test
public void nextWithNullReturnsFirstKnownShard() throws Exception {
    ShardList shards = new ShardList();

    Shard parent = new Shard().withShardId("first_shard");
    Shard child = new Shard().withShardId("second_shard").withParentShardId("first_shard");
    shards.add(parent);
    shards.add(child);

    assertThat(shards.nextAfter(parent), is(child));
}
/**
 * Fills the list with the chain a, b, c, d, removes everything older than
 * "c", and checks that "c" is then reported as the first shard.
 */
@Test
public void removingShards() throws Exception {
    Shard cutOff = new Shard().withParentShardId("b").withShardId("c");

    ShardList shards = new ShardList();
    shards.addAll(createShards(null, "a", "b", "c", "d"));
    shards.removeOlderThan(cutOff);

    assertThat(shards.first().getShardId(), is("c"));
}
/**
 * Builds a chain of shards in which each shard's parent is the shard created
 * just before it.
 *
 * @param initialParent parent shard id for the first shard (may be null)
 * @param shardIds ids of the shards to create, in chain order
 * @return the created shards, in the same order as {@code shardIds}
 */
static List<Shard> createShards(String initialParent, String... shardIds) {
    List<Shard> chain = new ArrayList<>();
    String parentId = initialParent;
    for (int i = 0; i < shardIds.length; i++) {
        String shardId = shardIds[i];
        Shard link = new Shard().withShardId(shardId).withParentShardId(parentId);
        chain.add(link);
        // The shard just created is the parent of the next one.
        parentId = shardId;
    }
    return chain;
}
/**
 * Adds every shard of the given collection to the store, one at a time, via
 * {@code add}.
 *
 * @param shards the shards to add
 */
void addAll(Collection<Shard> shards) {
    for (final Shard each : shards) {
        this.add(each);
    }
}
/**
 * Stores the shard under its shard id, replacing any entry already stored
 * under that id.
 *
 * @param shard the shard to store
 */
void add(Shard shard) {
    final String shardId = shard.getShardId();
    shards.put(shardId, shard);
}
/**
 * Looks up the shard for a position after the given sequence number by
 * delegating to {@code atAfterSeq} with the LT condition.
 *
 * @param sequenceNumber the sequence number, as a decimal string
 * @return the shard chosen by {@code atAfterSeq}
 */
Shard afterSeq(String sequenceNumber) {
    final BigIntComparisons strictlyLess = BigIntComparisons.Conditions.LT;
    return atAfterSeq(sequenceNumber, strictlyLess);
}
/**
 * Looks up the shard for a position at the given sequence number by
 * delegating to {@code atAfterSeq} with the LTEQ condition.
 *
 * @param sequenceNumber the sequence number, as a decimal string
 * @return the shard chosen by {@code atAfterSeq}
 */
Shard atSeq(String sequenceNumber) {
    final BigIntComparisons lessOrEqual = BigIntComparisons.Conditions.LTEQ;
    return atAfterSeq(sequenceNumber, lessOrEqual);
}
/**
 * Orders two shards by the numeric value of their starting sequence numbers.
 *
 * @param o1 the first shard
 * @param o2 the second shard
 * @return negative, zero or positive as o1's starting sequence number is
 *         less than, equal to or greater than o2's
 */
@Override
public int compare(Shard o1, Shard o2) {
    // Sequence numbers are compared as integers, not as strings.
    return new BigInteger(o1.getSequenceNumberRange().getStartingSequenceNumber())
            .compareTo(new BigInteger(o2.getSequenceNumberRange().getStartingSequenceNumber()));
}