/**
 * Builds a {@link StreamShardHandle} from the fields carried by a {@link StreamShardMetadata}.
 *
 * <p>A new {@link Shard} is populated with the shard id, parent shard id, adjacent parent
 * shard id, hash key range and sequence number range read from the metadata, and is then
 * paired with the metadata's stream name.
 *
 * @param streamShardMetadata the {@link StreamShardMetadata} to read the shard fields from
 * @return a {@link StreamShardHandle} wrapping the stream name and the rebuilt {@link Shard}
 */
public static StreamShardHandle convertToStreamShardHandle(StreamShardMetadata streamShardMetadata) {
    // Fluent form of the same setter sequence; values are read from the metadata in the
    // same order as before (ids, hash key range, sequence number range, stream name).
    Shard rebuiltShard = new Shard()
        .withShardId(streamShardMetadata.getShardId())
        .withParentShardId(streamShardMetadata.getParentShardId())
        .withAdjacentParentShardId(streamShardMetadata.getAdjacentParentShardId())
        .withHashKeyRange(
            new HashKeyRange()
                .withStartingHashKey(streamShardMetadata.getStartingHashKey())
                .withEndingHashKey(streamShardMetadata.getEndingHashKey()))
        .withSequenceNumberRange(
            new SequenceNumberRange()
                .withStartingSequenceNumber(streamShardMetadata.getStartingSequenceNumber())
                .withEndingSequenceNumber(streamShardMetadata.getEndingSequenceNumber()));

    return new StreamShardHandle(streamShardMetadata.getStreamName(), rebuiltShard);
}
/**
 * Checks that a {@link StreamShardMetadata} and a {@link StreamShardHandle} describing the
 * same shard convert into each other through the {@link KinesisDataFetcher} utilities.
 */
@Test
public void testStreamShardMetadataAndHandleConversion() {
    String streamName = "fakeStream1";
    String shardId = "shard-000001";
    String parentShardId = "shard-000002";
    String adjacentParentShardId = "shard-000003";
    String startingHashKey = "key-000001";
    String endingHashKey = "key-000010";
    String startingSequenceNumber = "seq-0000021";
    String endingSequenceNumber = "seq-00000031";

    // Metadata representation of the shard.
    StreamShardMetadata shardMetadata = new StreamShardMetadata();
    shardMetadata.setStreamName(streamName);
    shardMetadata.setShardId(shardId);
    shardMetadata.setParentShardId(parentShardId);
    shardMetadata.setAdjacentParentShardId(adjacentParentShardId);
    shardMetadata.setStartingHashKey(startingHashKey);
    shardMetadata.setEndingHashKey(endingHashKey);
    shardMetadata.setStartingSequenceNumber(startingSequenceNumber);
    shardMetadata.setEndingSequenceNumber(endingSequenceNumber);

    // Handle representation of the same shard.
    HashKeyRange hashKeys = new HashKeyRange()
        .withStartingHashKey(startingHashKey)
        .withEndingHashKey(endingHashKey);
    SequenceNumberRange sequenceNumbers = new SequenceNumberRange()
        .withStartingSequenceNumber(startingSequenceNumber)
        .withEndingSequenceNumber(endingSequenceNumber);
    Shard shard = new Shard()
        .withShardId(shardId)
        .withParentShardId(parentShardId)
        .withAdjacentParentShardId(adjacentParentShardId)
        .withHashKeyRange(hashKeys)
        .withSequenceNumberRange(sequenceNumbers);
    StreamShardHandle shardHandle = new StreamShardHandle(streamName, shard);

    // Conversion must hold in both directions.
    assertEquals(shardMetadata, KinesisDataFetcher.convertToStreamShardMetadata(shardHandle));
    assertEquals(shardHandle, KinesisDataFetcher.convertToStreamShardHandle(shardMetadata));
}
/**
 * Checks that a {@link KinesisStreamShard} converts to the {@link StreamShardMetadata}
 * carrying the same stream name, shard ids, hash key range and sequence number range.
 */
@Test
public void testLegacyKinesisStreamShardToStreamShardMetadataConversion() {
    String streamName = "fakeStream1";
    String shardId = "shard-000001";
    String parentShardId = "shard-000002";
    String adjacentParentShardId = "shard-000003";
    String startingHashKey = "key-000001";
    String endingHashKey = "key-000010";
    String startingSequenceNumber = "seq-0000021";
    String endingSequenceNumber = "seq-00000031";

    // Expected result of the conversion.
    StreamShardMetadata expectedMetadata = new StreamShardMetadata();
    expectedMetadata.setStreamName(streamName);
    expectedMetadata.setShardId(shardId);
    expectedMetadata.setParentShardId(parentShardId);
    expectedMetadata.setAdjacentParentShardId(adjacentParentShardId);
    expectedMetadata.setStartingHashKey(startingHashKey);
    expectedMetadata.setEndingHashKey(endingHashKey);
    expectedMetadata.setStartingSequenceNumber(startingSequenceNumber);
    expectedMetadata.setEndingSequenceNumber(endingSequenceNumber);

    // KinesisStreamShard wrapping an equivalent Shard.
    HashKeyRange hashKeys = new HashKeyRange()
        .withStartingHashKey(startingHashKey)
        .withEndingHashKey(endingHashKey);
    SequenceNumberRange sequenceNumbers = new SequenceNumberRange()
        .withStartingSequenceNumber(startingSequenceNumber)
        .withEndingSequenceNumber(endingSequenceNumber);
    Shard shard = new Shard()
        .withShardId(shardId)
        .withParentShardId(parentShardId)
        .withAdjacentParentShardId(adjacentParentShardId)
        .withHashKeyRange(hashKeys)
        .withSequenceNumberRange(sequenceNumbers);
    KinesisStreamShard legacyShard = new KinesisStreamShard(streamName, shard);

    assertEquals(expectedMetadata, KinesisStreamShard.convertToStreamShardMetadata(legacyShard));
}
public InternalStream(String aName, int nbShards, boolean isActive) { this.streamName = aName; this.streamARN = "local:fake.stream:" + aName; if (isActive) { this.streamStatus = "ACTIVE"; } for (int i = 0; i < nbShards; i++) { InternalShard newShard = new InternalShard(this.streamName, i); newShard.setSequenceNumberRange((new SequenceNumberRange()).withStartingSequenceNumber("100").withEndingSequenceNumber("999")); this.shards.add(newShard); } }