private void refreshShards() { DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(streamName); String exclusiveStartShardId = null; List<Shard> shards = new ArrayList<>(); do { describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId); DescribeStreamResult describeStreamResult = kinesisClient.describeStream(describeStreamRequest); shards.addAll(describeStreamResult.getStreamDescription().getShards()); if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) { exclusiveStartShardId = shards.get(shards.size() - 1).getShardId(); } else { exclusiveStartShardId = null; } } while (exclusiveStartShardId != null); this.shards = shards; }
@Test @Ignore("Kinesalite doesn't support updateShardCount. Test only against real AWS Kinesis") public void testPartitionCountIncreasedIfAutoAddPartitionsSet() { KinesisBinderConfigurationProperties configurationProperties = new KinesisBinderConfigurationProperties(); String stream = "existing" + System.currentTimeMillis(); AmazonKinesisAsync amazonKinesis = localKinesisResource.getResource(); amazonKinesis.createStream(stream, 1); List<Shard> shards = describeStream(stream); assertThat(shards.size()).isEqualTo(1); configurationProperties.setMinShardCount(6); configurationProperties.setAutoAddShards(true); KinesisTestBinder binder = getBinder(configurationProperties); ExtendedConsumerProperties<KinesisConsumerProperties> consumerProperties = createConsumerProperties(); Binding<?> binding = binder.bindConsumer(stream, "test", new NullChannel(), consumerProperties); binding.unbind(); shards = describeStream(stream); assertThat(shards.size()).isEqualTo(6); }
@Test public void testProvisionProducerSuccessfulWithExistingStream() { AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class); KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties(); KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties); ExtendedProducerProperties<KinesisProducerProperties> extendedProducerProperties = new ExtendedProducerProperties<>(new KinesisProducerProperties()); String name = "test-stream"; DescribeStreamResult describeStreamResult = describeStreamResultWithShards( Collections.singletonList(new Shard())); when(amazonKinesisMock.describeStream(any(DescribeStreamRequest.class))) .thenReturn(describeStreamResult); ProducerDestination destination = provisioner.provisionProducerDestination(name, extendedProducerProperties); verify(amazonKinesisMock) .describeStream(any(DescribeStreamRequest.class)); assertThat(destination.getName()).isEqualTo(name); }
@Test public void testProvisionConsumerSuccessfulWithExistingStream() { AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class); KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties(); KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties); ExtendedConsumerProperties<KinesisConsumerProperties> extendedConsumerProperties = new ExtendedConsumerProperties<>(new KinesisConsumerProperties()); String name = "test-stream"; String group = "test-group"; DescribeStreamResult describeStreamResult = describeStreamResultWithShards(Collections.singletonList(new Shard())); when(amazonKinesisMock.describeStream(any(DescribeStreamRequest.class))) .thenReturn(describeStreamResult); ConsumerDestination destination = provisioner.provisionConsumerDestination(name, group, extendedConsumerProperties); verify(amazonKinesisMock) .describeStream(any(DescribeStreamRequest.class)); assertThat(destination.getName()).isEqualTo(name); }
public List<Shard> listShards(final String streamName) throws TransientKinesisException { return wrapExceptions(new Callable<List<Shard>>() { @Override public List<Shard> call() throws Exception { List<Shard> shards = Lists.newArrayList(); String lastShardId = null; StreamDescription description; do { description = kinesis.describeStream(streamName, lastShardId) .getStreamDescription(); shards.addAll(description.getShards()); lastShardId = shards.get(shards.size() - 1).getShardId(); } while (description.getHasMoreShards()); return shards; } }); }
@Test public void shouldListAllShards() throws Exception { Shard shard1 = new Shard().withShardId(SHARD_1); Shard shard2 = new Shard().withShardId(SHARD_2); Shard shard3 = new Shard().withShardId(SHARD_3); given(kinesis.describeStream(STREAM, null)).willReturn(new DescribeStreamResult() .withStreamDescription(new StreamDescription() .withShards(shard1, shard2) .withHasMoreShards(true))); given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult() .withStreamDescription(new StreamDescription() .withShards(shard3) .withHasMoreShards(false))); List<Shard> shards = underTest.listShards(STREAM); assertThat(shards).containsOnly(shard1, shard2, shard3); }
@Override public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) { int nextShardId = 0; if (exclusiveStartShardId != null) { nextShardId = parseInt(exclusiveStartShardId) + 1; } boolean hasMoreShards = nextShardId + 1 < shardedData.size(); List<Shard> shards = new ArrayList<>(); if (nextShardId < shardedData.size()) { shards.add(new Shard().withShardId(Integer.toString(nextShardId))); } HttpResponse response = new HttpResponse(null, null); response.setStatusCode(200); DescribeStreamResult result = new DescribeStreamResult(); result.setSdkHttpMetadata(SdkHttpMetadata.from(response)); result.withStreamDescription( new StreamDescription() .withHasMoreShards(hasMoreShards) .withShards(shards) .withStreamName(streamName)); return result; }
/** * Utility function to convert {@link StreamShardMetadata} into {@link StreamShardHandle}. * * @param streamShardMetadata the {@link StreamShardMetadata} to be converted * @return a {@link StreamShardHandle} object */ public static StreamShardHandle convertToStreamShardHandle(StreamShardMetadata streamShardMetadata) { Shard shard = new Shard(); shard.withShardId(streamShardMetadata.getShardId()); shard.withParentShardId(streamShardMetadata.getParentShardId()); shard.withAdjacentParentShardId(streamShardMetadata.getAdjacentParentShardId()); HashKeyRange hashKeyRange = new HashKeyRange(); hashKeyRange.withStartingHashKey(streamShardMetadata.getStartingHashKey()); hashKeyRange.withEndingHashKey(streamShardMetadata.getEndingHashKey()); shard.withHashKeyRange(hashKeyRange); SequenceNumberRange sequenceNumberRange = new SequenceNumberRange(); sequenceNumberRange.withStartingSequenceNumber(streamShardMetadata.getStartingSequenceNumber()); sequenceNumberRange.withEndingSequenceNumber(streamShardMetadata.getEndingSequenceNumber()); shard.withSequenceNumberRange(sequenceNumberRange); return new StreamShardHandle(streamShardMetadata.getStreamName(), shard); }
private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException { List<StreamShardHandle> shardsOfStream = new ArrayList<>(); DescribeStreamResult describeStreamResult; do { describeStreamResult = describeStream(streamName, lastSeenShardId); List<Shard> shards = describeStreamResult.getStreamDescription().getShards(); for (Shard shard : shards) { shardsOfStream.add(new StreamShardHandle(streamName, shard)); } if (shards.size() != 0) { lastSeenShardId = shards.get(shards.size() - 1).getShardId(); } } while (describeStreamResult.getStreamDescription().isHasMoreShards()); return shardsOfStream; }
public NonReshardedStreamsKinesis(Map<String, Integer> streamsToShardCount) { for (Map.Entry<String, Integer> streamToShardCount : streamsToShardCount.entrySet()) { String streamName = streamToShardCount.getKey(); int shardCount = streamToShardCount.getValue(); if (shardCount == 0) { // don't do anything } else { List<StreamShardHandle> shardsOfStream = new ArrayList<>(shardCount); for (int i = 0; i < shardCount; i++) { shardsOfStream.add( new StreamShardHandle( streamName, new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i)))); } streamsWithListOfShards.put(streamName, shardsOfStream); } } }
private List<KinesisStreamShard> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException { List<KinesisStreamShard> shardsOfStream = new ArrayList<>(); DescribeStreamResult describeStreamResult; do { describeStreamResult = describeStream(streamName, lastSeenShardId); List<Shard> shards = describeStreamResult.getStreamDescription().getShards(); for (Shard shard : shards) { shardsOfStream.add(new KinesisStreamShard(streamName, shard)); } if (shards.size() != 0) { lastSeenShardId = shards.get(shards.size() - 1).getShardId(); } } while (describeStreamResult.getStreamDescription().isHasMoreShards()); return shardsOfStream; }
public NonReshardedStreamsKinesis(Map<String,Integer> streamsToShardCount) { for (Map.Entry<String,Integer> streamToShardCount : streamsToShardCount.entrySet()) { String streamName = streamToShardCount.getKey(); int shardCount = streamToShardCount.getValue(); if (shardCount == 0) { // don't do anything } else { List<KinesisStreamShard> shardsOfStream = new ArrayList<>(shardCount); for (int i=0; i < shardCount; i++) { shardsOfStream.add( new KinesisStreamShard( streamName, new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i)))); } streamsWithListOfShards.put(streamName, shardsOfStream); } } }
@Before public void setup() throws Exception { KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", component); endpoint.setAmazonKinesisClient(kinesisClient); endpoint.setIteratorType(ShardIteratorType.LATEST); undertest = new KinesisConsumer(endpoint, processor); when(kinesisClient.getRecords(any(GetRecordsRequest.class))) .thenReturn(new GetRecordsResult() .withNextShardIterator("nextShardIterator") ); when(kinesisClient.describeStream(any(DescribeStreamRequest.class))) .thenReturn(new DescribeStreamResult() .withStreamDescription(new StreamDescription() .withShards(new Shard().withShardId("shardId")) ) ); when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))) .thenReturn(new GetShardIteratorResult() .withShardIterator("shardIterator") ); }
@Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout) { KinesisTableLayoutHandle kinesislayout = handleResolver.convertLayout(layout); KinesisTableHandle kinesisTableHandle = kinesislayout.getTable(); InternalStreamDescription desc = this.getStreamDescription(kinesisTableHandle.getStreamName()); ImmutableList.Builder<ConnectorSplit> builder = ImmutableList.builder(); for (Shard shard : desc.getShards()) { KinesisSplit split = new KinesisSplit(connectorId, kinesisTableHandle.getStreamName(), kinesisTableHandle.getMessageDataFormat(), shard.getShardId(), shard.getSequenceNumberRange().getStartingSequenceNumber(), shard.getSequenceNumberRange().getEndingSequenceNumber()); builder.add(split); } return new FixedSplitSource(builder.build()); }
private List<Shard> describeStream(String stream) { AmazonKinesisAsync amazonKinesis = localKinesisResource.getResource(); String exclusiveStartShardId = null; DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest() .withStreamName(stream); List<Shard> shardList = new ArrayList<>(); while (true) { DescribeStreamResult describeStreamResult = null; describeStreamRequest.withExclusiveStartShardId(exclusiveStartShardId); describeStreamResult = amazonKinesis.describeStream(describeStreamRequest); StreamDescription streamDescription = describeStreamResult.getStreamDescription(); if (StreamStatus.ACTIVE.toString().equals(streamDescription.getStreamStatus())) { shardList.addAll(streamDescription.getShards()); if (streamDescription.getHasMoreShards()) { exclusiveStartShardId = shardList.get(shardList.size() - 1).getShardId(); continue; } else { return shardList; } } try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(e); } } }
@Test public void testProvisionConsumerExistingStreamUpdateShards() { AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class); ArgumentCaptor<UpdateShardCountRequest> updateShardCaptor = ArgumentCaptor.forClass(UpdateShardCountRequest.class); String name = "test-stream"; String group = "test-group"; int targetShardCount = 2; KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties(); binderProperties.setMinShardCount(targetShardCount); binderProperties.setAutoAddShards(true); KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties); ExtendedConsumerProperties<KinesisConsumerProperties> extendedConsumerProperties = new ExtendedConsumerProperties<>(new KinesisConsumerProperties()); DescribeStreamResult describeOriginalStream = describeStreamResultWithShards(Collections.singletonList(new Shard())); DescribeStreamResult describeUpdatedStream = describeStreamResultWithShards(Arrays.asList(new Shard(), new Shard())); when(amazonKinesisMock.describeStream(any(DescribeStreamRequest.class))) .thenReturn(describeOriginalStream) .thenReturn(describeUpdatedStream); provisioner.provisionConsumerDestination(name, group, extendedConsumerProperties); verify(amazonKinesisMock, times(1)) .updateShardCount(updateShardCaptor.capture()); assertThat(updateShardCaptor.getValue().getStreamName()).isEqualTo(name); assertThat(updateShardCaptor.getValue().getScalingType()).isEqualTo(ScalingType.UNIFORM_SCALING.name()); assertThat(updateShardCaptor.getValue().getTargetShardCount()).isEqualTo(targetShardCount); }
@Test public void testProvisionProducerSuccessfulWithNewStream() { AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class); KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties(); KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties); ExtendedProducerProperties<KinesisProducerProperties> extendedProducerProperties = new ExtendedProducerProperties<>(new KinesisProducerProperties()); String name = "test-stream"; Integer shards = 1; DescribeStreamResult describeStreamResult = describeStreamResultWithShards(Collections.singletonList(new Shard())); when(amazonKinesisMock.describeStream(any(DescribeStreamRequest.class))) .thenThrow(new ResourceNotFoundException("I got nothing")) .thenReturn(describeStreamResult); when(amazonKinesisMock.createStream(name, shards)) .thenReturn(new CreateStreamResult()); ProducerDestination destination = provisioner.provisionProducerDestination(name, extendedProducerProperties); verify(amazonKinesisMock, times(2)) .describeStream(any(DescribeStreamRequest.class)); verify(amazonKinesisMock) .createStream(name, shards); assertThat(destination.getName()).isEqualTo(name); }
@Test public void testProvisionProducerUpdateShards() { AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class); ArgumentCaptor<UpdateShardCountRequest> updateShardCaptor = ArgumentCaptor.forClass(UpdateShardCountRequest.class); String name = "test-stream"; String group = "test-group"; int targetShardCount = 2; KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties(); binderProperties.setMinShardCount(targetShardCount); binderProperties.setAutoAddShards(true); KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties); ExtendedConsumerProperties<KinesisConsumerProperties> extendedConsumerProperties = new ExtendedConsumerProperties<>(new KinesisConsumerProperties()); DescribeStreamResult describeOriginalStream = describeStreamResultWithShards(Collections.singletonList(new Shard())); DescribeStreamResult describeUpdatedStream = describeStreamResultWithShards(Arrays.asList(new Shard(), new Shard())); when(amazonKinesisMock.describeStream(any(DescribeStreamRequest.class))) .thenReturn(describeOriginalStream) .thenReturn(describeUpdatedStream); provisioner.provisionConsumerDestination(name, group, extendedConsumerProperties); verify(amazonKinesisMock, times(1)) .updateShardCount(updateShardCaptor.capture()); assertThat(updateShardCaptor.getValue().getStreamName()).isEqualTo(name); assertThat(updateShardCaptor.getValue().getScalingType()).isEqualTo(ScalingType.UNIFORM_SCALING.name()); assertThat(updateShardCaptor.getValue().getTargetShardCount()).isEqualTo(targetShardCount); }
@Test public void testProvisionConsumerSuccessfulWithNewStream() { AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class); KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties(); KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties); int instanceCount = 1; int concurrency = 1; ExtendedConsumerProperties<KinesisConsumerProperties> extendedConsumerProperties = new ExtendedConsumerProperties<>(new KinesisConsumerProperties()); extendedConsumerProperties.setInstanceCount(instanceCount); extendedConsumerProperties.setConcurrency(concurrency); String name = "test-stream"; String group = "test-group"; DescribeStreamResult describeStreamResult = describeStreamResultWithShards(Collections.singletonList(new Shard())); when(amazonKinesisMock.describeStream(any(DescribeStreamRequest.class))) .thenThrow(new ResourceNotFoundException("I got nothing")) .thenReturn(describeStreamResult); when(amazonKinesisMock.createStream(name, instanceCount * concurrency)) .thenReturn(new CreateStreamResult()); ConsumerDestination destination = provisioner.provisionConsumerDestination(name, group, extendedConsumerProperties); verify(amazonKinesisMock, times(2)) .describeStream(any(DescribeStreamRequest.class)); verify(amazonKinesisMock) .createStream(name, instanceCount * concurrency); assertThat(destination.getName()).isEqualTo(name); }
private static DescribeStreamResult describeStreamResultWithShards(List<Shard> shards) { return new DescribeStreamResult() .withStreamDescription( new StreamDescription() .withShards(shards) .withStreamStatus(StreamStatus.ACTIVE) .withHasMoreShards(Boolean.FALSE)); }
@Test public void start() { final DescribeStreamRequest expectedDescribeStreamRequest = new DescribeStreamRequest() .withStreamName(TestData.EXPECTED_STREAM_NAME); final int SHARD_COUNT = 50; List<Shard> shards = new ArrayList<>(SHARD_COUNT); for (int i = 0; i < SHARD_COUNT; i++) { String shardId = String.format("%03d", i); final Shard shard = new Shard() .withShardId(shardId); shards.add(shard); } final StreamDescription streamDescription = new StreamDescription() .withStreamName(TestData.EXPECTED_STREAM_NAME) .withShards(shards); final DescribeStreamResult expectedStreamRequest = new DescribeStreamResult() .withStreamDescription(streamDescription); when(this.kinesisClient.describeStream(any(DescribeStreamRequest.class))).thenReturn(expectedStreamRequest); this.connector.start(TestData.settings()); List<Map<String, String>> taskConfigs = this.connector.taskConfigs(SHARD_COUNT); assertEquals(SHARD_COUNT, taskConfigs.size()); verify(this.kinesisClient, atLeastOnce()).describeStream(expectedDescribeStreamRequest); }
@Override public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis) throws TransientKinesisException { return new KinesisReaderCheckpoint( transform(kinesis.listShards(streamName), new Function<Shard, ShardCheckpoint>() { @Override public ShardCheckpoint apply(Shard shard) { return new ShardCheckpoint(streamName, shard.getShardId(), startingPoint); } }) ); }
/** * Create a new StreamShardHandle. * * @param streamName * the name of the Kinesis stream that this shard belongs to * @param shard * the actual AWS Shard instance that will be wrapped within this StreamShardHandle */ public StreamShardHandle(String streamName, Shard shard) { this.streamName = checkNotNull(streamName); this.shard = checkNotNull(shard); // since our description of Kinesis Streams shards can be fully defined with the stream name and shard id, // our hash doesn't need to use hash code of Amazon's description of Shards, which uses other info for calculation int hash = 17; hash = 37 * hash + streamName.hashCode(); hash = 37 * hash + shard.getShardId().hashCode(); this.cachedHash = hash; }
/** * Create a new KinesisStreamShard. * * @param streamName * the name of the Kinesis stream that this shard belongs to * @param shard * the actual AWS Shard instance that will be wrapped within this KinesisStreamShard */ public KinesisStreamShard(String streamName, Shard shard) { this.streamName = checkNotNull(streamName); this.shard = checkNotNull(shard); // since our description of Kinesis Streams shards can be fully defined with the stream name and shard id, // our hash doesn't need to use hash code of Amazon's description of Shards, which uses other info for calculation int hash = 17; hash = 37 * hash + streamName.hashCode(); hash = 37 * hash + shard.getShardId().hashCode(); this.cachedHash = hash; }
private static StreamShardHandle getMockStreamShard(String streamName, int shardId) { return new StreamShardHandle( streamName, new Shard() .withShardId(KinesisShardIdGenerator.generateFromShardOrder(shardId)) .withHashKeyRange( new HashKeyRange() .withStartingHashKey("0") .withEndingHashKey(new BigInteger(StringUtils.repeat("FF", 16), 16).toString()))); }
@Test public void testStreamShardMetadataAndHandleConversion() { String streamName = "fakeStream1"; String shardId = "shard-000001"; String parentShardId = "shard-000002"; String adjacentParentShardId = "shard-000003"; String startingHashKey = "key-000001"; String endingHashKey = "key-000010"; String startingSequenceNumber = "seq-0000021"; String endingSequenceNumber = "seq-00000031"; StreamShardMetadata kinesisStreamShard = new StreamShardMetadata(); kinesisStreamShard.setStreamName(streamName); kinesisStreamShard.setShardId(shardId); kinesisStreamShard.setParentShardId(parentShardId); kinesisStreamShard.setAdjacentParentShardId(adjacentParentShardId); kinesisStreamShard.setStartingHashKey(startingHashKey); kinesisStreamShard.setEndingHashKey(endingHashKey); kinesisStreamShard.setStartingSequenceNumber(startingSequenceNumber); kinesisStreamShard.setEndingSequenceNumber(endingSequenceNumber); Shard shard = new Shard() .withShardId(shardId) .withParentShardId(parentShardId) .withAdjacentParentShardId(adjacentParentShardId) .withHashKeyRange(new HashKeyRange() .withStartingHashKey(startingHashKey) .withEndingHashKey(endingHashKey)) .withSequenceNumberRange(new SequenceNumberRange() .withStartingSequenceNumber(startingSequenceNumber) .withEndingSequenceNumber(endingSequenceNumber)); StreamShardHandle streamShardHandle = new StreamShardHandle(streamName, shard); assertEquals(kinesisStreamShard, KinesisDataFetcher.convertToStreamShardMetadata(streamShardHandle)); assertEquals(streamShardHandle, KinesisDataFetcher.convertToStreamShardHandle(kinesisStreamShard)); }
@Test public void testLegacyKinesisStreamShardToStreamShardMetadataConversion() { String streamName = "fakeStream1"; String shardId = "shard-000001"; String parentShardId = "shard-000002"; String adjacentParentShardId = "shard-000003"; String startingHashKey = "key-000001"; String endingHashKey = "key-000010"; String startingSequenceNumber = "seq-0000021"; String endingSequenceNumber = "seq-00000031"; StreamShardMetadata streamShardMetadata = new StreamShardMetadata(); streamShardMetadata.setStreamName(streamName); streamShardMetadata.setShardId(shardId); streamShardMetadata.setParentShardId(parentShardId); streamShardMetadata.setAdjacentParentShardId(adjacentParentShardId); streamShardMetadata.setStartingHashKey(startingHashKey); streamShardMetadata.setEndingHashKey(endingHashKey); streamShardMetadata.setStartingSequenceNumber(startingSequenceNumber); streamShardMetadata.setEndingSequenceNumber(endingSequenceNumber); Shard shard = new Shard() .withShardId(shardId) .withParentShardId(parentShardId) .withAdjacentParentShardId(adjacentParentShardId) .withHashKeyRange(new HashKeyRange() .withStartingHashKey(startingHashKey) .withEndingHashKey(endingHashKey)) .withSequenceNumberRange(new SequenceNumberRange() .withStartingSequenceNumber(startingSequenceNumber) .withEndingSequenceNumber(endingSequenceNumber)); KinesisStreamShard kinesisStreamShard = new KinesisStreamShard(streamName, shard); assertEquals(streamShardMetadata, KinesisStreamShard.convertToStreamShardMetadata(kinesisStreamShard)); }
private HashMap<StreamShardHandle, SequenceNumber> getFakeRestoredStore(String streamName) { HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = new HashMap<>(); if (streamName.equals("fakeStream1") || streamName.equals("all")) { fakeRestoredState.put( new StreamShardHandle("fakeStream1", new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))), new SequenceNumber(UUID.randomUUID().toString())); fakeRestoredState.put( new StreamShardHandle("fakeStream1", new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))), new SequenceNumber(UUID.randomUUID().toString())); fakeRestoredState.put( new StreamShardHandle("fakeStream1", new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))), new SequenceNumber(UUID.randomUUID().toString())); } if (streamName.equals("fakeStream2") || streamName.equals("all")) { fakeRestoredState.put( new StreamShardHandle("fakeStream2", new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))), new SequenceNumber(UUID.randomUUID().toString())); fakeRestoredState.put( new StreamShardHandle("fakeStream2", new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))), new SequenceNumber(UUID.randomUUID().toString())); } return fakeRestoredState; }
/** * Create a new KinesisStreamShard * * @param streamName * the name of the Kinesis stream that this shard belongs to * @param shard * the actual AWS Shard instance that will be wrapped within this KinesisStreamShard */ public KinesisStreamShard(String streamName, Shard shard) { this.streamName = checkNotNull(streamName); this.shard = checkNotNull(shard); // since our description of Kinesis Streams shards can be fully defined with the stream name and shard id, // our hash doesn't need to use hash code of Amazon's description of Shards, which uses other info for calculation int hash = 17; hash = 37 * hash + streamName.hashCode(); hash = 37 * hash + shard.getShardId().hashCode(); this.cachedHash = hash; }
@Test public void testCorrectNumOfCollectedRecordsAndUpdatedState() { KinesisStreamShard fakeToBeConsumedShard = new KinesisStreamShard( "fakeStream", new Shard() .withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)) .withHashKeyRange( new HashKeyRange() .withStartingHashKey("0") .withEndingHashKey(new BigInteger(StringUtils.repeat("FF", 16), 16).toString()))); LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>(); subscribedShardsStateUnderTest.add( new KinesisStreamShardState(fakeToBeConsumedShard, new SequenceNumber("fakeStartingState"))); TestableKinesisDataFetcher fetcher = new TestableKinesisDataFetcher( Collections.singletonList("fakeStream"), new Properties(), 10, 2, new AtomicReference<Throwable>(), subscribedShardsStateUnderTest, KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")), Mockito.mock(KinesisProxyInterface.class)); new ShardConsumer<>( fetcher, 0, subscribedShardsStateUnderTest.get(0).getKinesisStreamShard(), subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(), FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9)).run(); assertTrue(fetcher.getNumOfElementsCollected() == 1000); assertTrue(subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum().equals( SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())); }
@Test public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() { KinesisStreamShard fakeToBeConsumedShard = new KinesisStreamShard( "fakeStream", new Shard() .withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)) .withHashKeyRange( new HashKeyRange() .withStartingHashKey("0") .withEndingHashKey(new BigInteger(StringUtils.repeat("FF", 16), 16).toString()))); LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>(); subscribedShardsStateUnderTest.add( new KinesisStreamShardState(fakeToBeConsumedShard, new SequenceNumber("fakeStartingState"))); TestableKinesisDataFetcher fetcher = new TestableKinesisDataFetcher( Collections.singletonList("fakeStream"), new Properties(), 10, 2, new AtomicReference<Throwable>(), subscribedShardsStateUnderTest, KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")), Mockito.mock(KinesisProxyInterface.class)); new ShardConsumer<>( fetcher, 0, subscribedShardsStateUnderTest.get(0).getKinesisStreamShard(), subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(), // Get a total of 1000 records with 9 getRecords() calls, // and the 7th getRecords() call will encounter an unexpected expired shard iterator FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(1000, 9, 7)).run(); assertTrue(fetcher.getNumOfElementsCollected() == 1000); assertTrue(subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum().equals( SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())); }
/** * Get the available shards from the kinesis * @param streamName Name of the stream from where the shards to be accessed * @return the list of shards from the given stream */ public List<Shard> getShardList(String streamName) { assert client != null : "Illegal client"; DescribeStreamRequest describeRequest = new DescribeStreamRequest(); describeRequest.setStreamName(streamName); DescribeStreamResult describeResponse = client.describeStream(describeRequest); return describeResponse.getStreamDescription().getShards(); }
/** * This method is called in setup method of the operator */ public void create() { holdingBuffer = new ArrayBlockingQueue<Pair<String, Record>>(bufferSize); boolean defaultSelect = (shardIds == null) || (shardIds.size() == 0); final List<Shard> pms = KinesisUtil.getInstance().getShardList(streamName); for (final Shard shId: pms) { if ((shardIds.contains(shId.getShardId()) || defaultSelect) && !closedShards.contains(shId)) { simpleConsumerThreads.add(shId); } } }
private List<Shard> getOpenShards(Collection<Partition<AbstractKinesisInputOperator>> partitions) { List<Shard> closedShards = new ArrayList<Shard>(); for (Partition<AbstractKinesisInputOperator> op : partitions) { closedShards.addAll(op.getPartitionedInstance().getConsumer().getClosedShards()); } List<Shard> shards = KinesisUtil.getInstance().getShardList(getStreamName()); List<Shard> openShards = new ArrayList<Shard>(); for (Shard shard :shards) { if (!closedShards.contains(shard)) { openShards.add(shard); } } return openShards; }
public void split(final String streamName, final String awsAccessKey, final String awsSecretKey, long secsToWait) throws InterruptedException { AWSCredentialsProvider creds = createAwsCredentialsProvider(awsAccessKey, awsSecretKey); AmazonKinesisClient client = new AmazonKinesisClient(creds); // Describes the stream to get the information about each shard. DescribeStreamResult result = client.describeStream(streamName); List<Shard> shards = result.getStreamDescription().getShards(); log.log(Level.INFO, "Splitting the Stream: [{0}], there are [{1}] shards to split.", new Object[]{streamName, shards.size()}); for (final Shard shard : shards) { // Gets the new shard start key. BigInteger startKey = new BigInteger(shard.getHashKeyRange().getStartingHashKey()); BigInteger endKey = new BigInteger(shard.getHashKeyRange().getEndingHashKey()); String newStartKey = startKey.add(endKey).divide(DENOMINATOR).toString(); log.log(Level.INFO, "Processing the Shard:[{0}], StartKey:[{1}] EndKey:[{2}] - NewStartKey:[{3}]", new String[]{shard.getShardId(), shard.getHashKeyRange().getStartingHashKey(), shard.getHashKeyRange().getEndingHashKey(), newStartKey}); // Split the shard. client.splitShard(new SplitShardRequest() .withStreamName(streamName) .withShardToSplit(shard.getShardId()) .withNewStartingHashKey(newStartKey)); // Give some time to kinesis to process. TimeUnit.SECONDS.sleep(secsToWait); } log.info("Done!"); }
public static long getShardCount( ClientConfiguration awsClientConfig, KinesisConfigBean conf, String streamName ) throws StageException { AmazonKinesis kinesisClient = getKinesisClient(awsClientConfig, conf); try { long numShards = 0; String lastShardId = null; StreamDescription description; do { if (lastShardId == null) { description = kinesisClient.describeStream(streamName).getStreamDescription(); } else { description = kinesisClient.describeStream(streamName, lastShardId).getStreamDescription(); } for (Shard shard : description.getShards()) { if (shard.getSequenceNumberRange().getEndingSequenceNumber() == null) { // Then this shard is open, so we should count it. Shards with an ending sequence number // are closed and cannot be written to, so we skip counting them. ++numShards; } } int pageSize = description.getShards().size(); lastShardId = description.getShards().get(pageSize - 1).getShardId(); } while (description.getHasMoreShards()); LOG.debug("Connected successfully to stream: '{}' with '{}' shards.", streamName, numShards); return numShards; } finally { kinesisClient.shutdown(); } }
/** * Internal method to retrieve the stream description and get the shards from AWS. * * Gets from the internal cache unless not yet created or too old. * * @param streamName * @return */ protected InternalStreamDescription getStreamDescription(String streamName) { InternalStreamDescription desc = this.streamMap.get(streamName); if (desc == null || System.currentTimeMillis() - desc.getCreateTimeStamp() >= MAX_CACHE_AGE_MILLIS) { desc = new InternalStreamDescription(streamName); DescribeStreamRequest describeStreamRequest = clientManager.getDescribeStreamRequest(); describeStreamRequest.setStreamName(streamName); // Collect shards from Kinesis String exclusiveStartShardId = null; List<Shard> shards = new ArrayList<>(); do { describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId); DescribeStreamResult describeStreamResult = clientManager.getClient().describeStream(describeStreamRequest); String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); if (!streamStatus.equals("ACTIVE") && !streamStatus.equals("UPDATING")) { throw new ResourceNotFoundException("Stream not Active"); } desc.addAllShards(describeStreamResult.getStreamDescription().getShards()); if (describeStreamResult.getStreamDescription().getHasMoreShards() && (shards.size() > 0)) { exclusiveStartShardId = shards.get(shards.size() - 1).getShardId(); } else { exclusiveStartShardId = null; } } while (exclusiveStartShardId != null); this.streamMap.put(streamName, desc); } return desc; }
protected ArrayList<Shard> getShards(InternalStream theStream) { ArrayList<Shard> externalList = new ArrayList<Shard>(); for (InternalShard intshard : theStream.getShards()) { externalList.add(intshard); } return externalList; }
protected ArrayList<Shard> getShards(InternalStream theStream, String fromShardId) { ArrayList<Shard> externalList = new ArrayList<Shard>(); for (InternalShard intshard : theStream.getShardsFrom(fromShardId)) { externalList.add(intshard); } return externalList; }
@Override protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<KinesisConsumerProperties> properties) { KinesisConsumerProperties kinesisConsumerProperties = properties.getExtension(); Set<KinesisShardOffset> shardOffsets = null; String shardIteratorType = kinesisConsumerProperties.getShardIteratorType(); KinesisShardOffset kinesisShardOffset = KinesisShardOffset.latest(); if (StringUtils.hasText(shardIteratorType)) { String[] typeValue = shardIteratorType.split(":", 2); ShardIteratorType iteratorType = ShardIteratorType.valueOf(typeValue[0]); kinesisShardOffset = new KinesisShardOffset(iteratorType); if (typeValue.length > 1) { if (ShardIteratorType.AT_TIMESTAMP.equals(iteratorType)) { kinesisShardOffset.setTimestamp(new Date(Long.parseLong(typeValue[1]))); } else { kinesisShardOffset.setSequenceNumber(typeValue[1]); } } } if (properties.getInstanceCount() > 1) { shardOffsets = new HashSet<>(); KinesisConsumerDestination kinesisConsumerDestination = (KinesisConsumerDestination) destination; List<Shard> shards = kinesisConsumerDestination.getShards(); for (int i = 0; i < shards.size(); i++) { // divide shards across instances if ((i % properties.getInstanceCount()) == properties.getInstanceIndex()) { KinesisShardOffset shardOffset = new KinesisShardOffset(kinesisShardOffset); shardOffset.setStream(destination.getName()); shardOffset.setShard(shards.get(i).getShardId()); shardOffsets.add(shardOffset); } } } KinesisMessageDrivenChannelAdapter adapter; if (shardOffsets == null) { adapter = new KinesisMessageDrivenChannelAdapter(this.amazonKinesis, destination.getName()); } else { adapter = new KinesisMessageDrivenChannelAdapter(this.amazonKinesis, shardOffsets.toArray(new KinesisShardOffset[shardOffsets.size()])); } boolean anonymous = !StringUtils.hasText(group); String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group; adapter.setConsumerGroup(consumerGroup); adapter.setStreamInitialSequence( anonymous || StringUtils.hasText(shardIteratorType) ? kinesisShardOffset : KinesisShardOffset.trimHorizon()); adapter.setListenerMode(kinesisConsumerProperties.getListenerMode()); adapter.setCheckpointMode(kinesisConsumerProperties.getCheckpointMode()); adapter.setRecordsLimit(kinesisConsumerProperties.getRecordsLimit()); adapter.setIdleBetweenPolls(kinesisConsumerProperties.getIdleBetweenPolls()); adapter.setConsumerBackoff(kinesisConsumerProperties.getConsumerBackoff()); if (this.checkpointStore != null) { adapter.setCheckpointStore(this.checkpointStore); } adapter.setConcurrency(properties.getConcurrency()); adapter.setStartTimeout(kinesisConsumerProperties.getStartTimeout()); adapter.setDescribeStreamBackoff(this.configurationProperties.getDescribeStreamBackoff()); adapter.setDescribeStreamRetries(this.configurationProperties.getDescribeStreamRetries()); // Deffer byte[] conversion to the ReceivingHandler adapter.setConverter(null); return adapter; }