private void refreshShards() { DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(streamName); String exclusiveStartShardId = null; List<Shard> shards = new ArrayList<>(); do { describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId); DescribeStreamResult describeStreamResult = kinesisClient.describeStream(describeStreamRequest); shards.addAll(describeStreamResult.getStreamDescription().getShards()); if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) { exclusiveStartShardId = shards.get(shards.size() - 1).getShardId(); } else { exclusiveStartShardId = null; } } while (exclusiveStartShardId != null); this.shards = shards; }
@Test public void testProvisionProducerSuccessfulWithExistingStream() { AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class); KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties(); KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties); ExtendedProducerProperties<KinesisProducerProperties> extendedProducerProperties = new ExtendedProducerProperties<>(new KinesisProducerProperties()); String name = "test-stream"; DescribeStreamResult describeStreamResult = describeStreamResultWithShards( Collections.singletonList(new Shard())); when(amazonKinesisMock.describeStream(any(DescribeStreamRequest.class))) .thenReturn(describeStreamResult); ProducerDestination destination = provisioner.provisionProducerDestination(name, extendedProducerProperties); verify(amazonKinesisMock) .describeStream(any(DescribeStreamRequest.class)); assertThat(destination.getName()).isEqualTo(name); }
@Test public void testProvisionConsumerSuccessfulWithExistingStream() { AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class); KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties(); KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties); ExtendedConsumerProperties<KinesisConsumerProperties> extendedConsumerProperties = new ExtendedConsumerProperties<>(new KinesisConsumerProperties()); String name = "test-stream"; String group = "test-group"; DescribeStreamResult describeStreamResult = describeStreamResultWithShards(Collections.singletonList(new Shard())); when(amazonKinesisMock.describeStream(any(DescribeStreamRequest.class))) .thenReturn(describeStreamResult); ConsumerDestination destination = provisioner.provisionConsumerDestination(name, group, extendedConsumerProperties); verify(amazonKinesisMock) .describeStream(any(DescribeStreamRequest.class)); assertThat(destination.getName()).isEqualTo(name); }
@Test public void shouldListAllShards() throws Exception { Shard shard1 = new Shard().withShardId(SHARD_1); Shard shard2 = new Shard().withShardId(SHARD_2); Shard shard3 = new Shard().withShardId(SHARD_3); given(kinesis.describeStream(STREAM, null)).willReturn(new DescribeStreamResult() .withStreamDescription(new StreamDescription() .withShards(shard1, shard2) .withHasMoreShards(true))); given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult() .withStreamDescription(new StreamDescription() .withShards(shard3) .withHasMoreShards(false))); List<Shard> shards = underTest.listShards(STREAM); assertThat(shards).containsOnly(shard1, shard2, shard3); }
@Override public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) { int nextShardId = 0; if (exclusiveStartShardId != null) { nextShardId = parseInt(exclusiveStartShardId) + 1; } boolean hasMoreShards = nextShardId + 1 < shardedData.size(); List<Shard> shards = new ArrayList<>(); if (nextShardId < shardedData.size()) { shards.add(new Shard().withShardId(Integer.toString(nextShardId))); } HttpResponse response = new HttpResponse(null, null); response.setStatusCode(200); DescribeStreamResult result = new DescribeStreamResult(); result.setSdkHttpMetadata(SdkHttpMetadata.from(response)); result.withStreamDescription( new StreamDescription() .withHasMoreShards(hasMoreShards) .withShards(shards) .withStreamName(streamName)); return result; }
private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException { List<StreamShardHandle> shardsOfStream = new ArrayList<>(); DescribeStreamResult describeStreamResult; do { describeStreamResult = describeStream(streamName, lastSeenShardId); List<Shard> shards = describeStreamResult.getStreamDescription().getShards(); for (Shard shard : shards) { shardsOfStream.add(new StreamShardHandle(streamName, shard)); } if (shards.size() != 0) { lastSeenShardId = shards.get(shards.size() - 1).getShardId(); } } while (describeStreamResult.getStreamDescription().isHasMoreShards()); return shardsOfStream; }
private List<KinesisStreamShard> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException { List<KinesisStreamShard> shardsOfStream = new ArrayList<>(); DescribeStreamResult describeStreamResult; do { describeStreamResult = describeStream(streamName, lastSeenShardId); List<Shard> shards = describeStreamResult.getStreamDescription().getShards(); for (Shard shard : shards) { shardsOfStream.add(new KinesisStreamShard(streamName, shard)); } if (shards.size() != 0) { lastSeenShardId = shards.get(shards.size() - 1).getShardId(); } } while (describeStreamResult.getStreamDescription().isHasMoreShards()); return shardsOfStream; }
@Before public void setup() throws Exception { KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", component); endpoint.setAmazonKinesisClient(kinesisClient); endpoint.setIteratorType(ShardIteratorType.LATEST); undertest = new KinesisConsumer(endpoint, processor); when(kinesisClient.getRecords(any(GetRecordsRequest.class))) .thenReturn(new GetRecordsResult() .withNextShardIterator("nextShardIterator") ); when(kinesisClient.describeStream(any(DescribeStreamRequest.class))) .thenReturn(new DescribeStreamResult() .withStreamDescription(new StreamDescription() .withShards(new Shard().withShardId("shardId")) ) ); when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))) .thenReturn(new GetShardIteratorResult() .withShardIterator("shardIterator") ); }
@Override protected void validateStreamName(AmazonKinesisAsyncClient client, String streamName) { DescribeStreamResult describeResult = null; try { describeResult = getClient().describeStream(streamName); String streamStatus = describeResult.getStreamDescription().getStreamStatus(); if(!StreamStatus.ACTIVE.name().equals(streamStatus) && !StreamStatus.UPDATING.name().equals(streamStatus)) { setInitializationFailed(true); addError("Stream " + streamName + " is not ready (in active/updating status) for appender: " + name); } } catch(ResourceNotFoundException rnfe) { setInitializationFailed(true); addError("Stream " + streamName + " doesn't exist for appender: " + name, rnfe); } }
private static void waitForStreamToBecomeAvailable(String myStreamName) throws InterruptedException { LOG.info("Waiting for " + myStreamName + " to become ACTIVE..."); long startTime = System.currentTimeMillis(); long endTime = startTime + TimeUnit.MINUTES.toMillis(5); while (System.currentTimeMillis() < endTime) { Thread.sleep( TimeUnit.SECONDS.toMillis(5)); DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(myStreamName); describeStreamRequest.setLimit(10); DescribeStreamResult describeStreamResponse = kinesisClient .describeStream(describeStreamRequest); String streamStatus = describeStreamResponse .getStreamDescription().getStreamStatus(); if ("ACTIVE".equals(streamStatus)) { return; } } }
@Test public void testAutoCreateStreamForNonExistingStream() throws Exception { KinesisTestBinder binder = getBinder(); DirectChannel output = createBindableChannel("output", new BindingProperties()); ExtendedConsumerProperties<KinesisConsumerProperties> consumerProperties = createConsumerProperties(); Date testDate = new Date(); consumerProperties.getExtension() .setShardIteratorType(ShardIteratorType.AT_TIMESTAMP.name() + ":" + testDate.getTime()); String testStreamName = "nonexisting" + System.currentTimeMillis(); Binding<?> binding = binder.bindConsumer(testStreamName, "test", output, consumerProperties); binding.unbind(); DescribeStreamResult streamResult = localKinesisResource.getResource().describeStream(testStreamName); String createdStreamName = streamResult.getStreamDescription().getStreamName(); int createdShards = streamResult.getStreamDescription().getShards().size(); String createdStreamStatus = streamResult.getStreamDescription().getStreamStatus(); assertThat(createdStreamName).isEqualTo(testStreamName); assertThat(createdShards) .isEqualTo(consumerProperties.getInstanceCount() * consumerProperties.getConcurrency()); assertThat(createdStreamStatus).isEqualTo(StreamStatus.ACTIVE.toString()); KinesisShardOffset shardOffset = TestUtils.getPropertyValue(binding, "lifecycle.streamInitialSequence", KinesisShardOffset.class); assertThat(shardOffset.getIteratorType()).isEqualTo(ShardIteratorType.AT_TIMESTAMP); assertThat(shardOffset.getTimestamp()).isEqualTo(testDate); }
private List<Shard> describeStream(String stream) { AmazonKinesisAsync amazonKinesis = localKinesisResource.getResource(); String exclusiveStartShardId = null; DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest() .withStreamName(stream); List<Shard> shardList = new ArrayList<>(); while (true) { DescribeStreamResult describeStreamResult = null; describeStreamRequest.withExclusiveStartShardId(exclusiveStartShardId); describeStreamResult = amazonKinesis.describeStream(describeStreamRequest); StreamDescription streamDescription = describeStreamResult.getStreamDescription(); if (StreamStatus.ACTIVE.toString().equals(streamDescription.getStreamStatus())) { shardList.addAll(streamDescription.getShards()); if (streamDescription.getHasMoreShards()) { exclusiveStartShardId = shardList.get(shardList.size() - 1).getShardId(); continue; } else { return shardList; } } try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(e); } } }
@Test public void testProvisionConsumerExistingStreamUpdateShards() { AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class); ArgumentCaptor<UpdateShardCountRequest> updateShardCaptor = ArgumentCaptor.forClass(UpdateShardCountRequest.class); String name = "test-stream"; String group = "test-group"; int targetShardCount = 2; KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties(); binderProperties.setMinShardCount(targetShardCount); binderProperties.setAutoAddShards(true); KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties); ExtendedConsumerProperties<KinesisConsumerProperties> extendedConsumerProperties = new ExtendedConsumerProperties<>(new KinesisConsumerProperties()); DescribeStreamResult describeOriginalStream = describeStreamResultWithShards(Collections.singletonList(new Shard())); DescribeStreamResult describeUpdatedStream = describeStreamResultWithShards(Arrays.asList(new Shard(), new Shard())); when(amazonKinesisMock.describeStream(any(DescribeStreamRequest.class))) .thenReturn(describeOriginalStream) .thenReturn(describeUpdatedStream); provisioner.provisionConsumerDestination(name, group, extendedConsumerProperties); verify(amazonKinesisMock, times(1)) .updateShardCount(updateShardCaptor.capture()); assertThat(updateShardCaptor.getValue().getStreamName()).isEqualTo(name); assertThat(updateShardCaptor.getValue().getScalingType()).isEqualTo(ScalingType.UNIFORM_SCALING.name()); assertThat(updateShardCaptor.getValue().getTargetShardCount()).isEqualTo(targetShardCount); }
@Test public void testProvisionProducerSuccessfulWithNewStream() { AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class); KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties(); KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties); ExtendedProducerProperties<KinesisProducerProperties> extendedProducerProperties = new ExtendedProducerProperties<>(new KinesisProducerProperties()); String name = "test-stream"; Integer shards = 1; DescribeStreamResult describeStreamResult = describeStreamResultWithShards(Collections.singletonList(new Shard())); when(amazonKinesisMock.describeStream(any(DescribeStreamRequest.class))) .thenThrow(new ResourceNotFoundException("I got nothing")) .thenReturn(describeStreamResult); when(amazonKinesisMock.createStream(name, shards)) .thenReturn(new CreateStreamResult()); ProducerDestination destination = provisioner.provisionProducerDestination(name, extendedProducerProperties); verify(amazonKinesisMock, times(2)) .describeStream(any(DescribeStreamRequest.class)); verify(amazonKinesisMock) .createStream(name, shards); assertThat(destination.getName()).isEqualTo(name); }
@Test public void testProvisionProducerUpdateShards() { AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class); ArgumentCaptor<UpdateShardCountRequest> updateShardCaptor = ArgumentCaptor.forClass(UpdateShardCountRequest.class); String name = "test-stream"; String group = "test-group"; int targetShardCount = 2; KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties(); binderProperties.setMinShardCount(targetShardCount); binderProperties.setAutoAddShards(true); KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties); ExtendedConsumerProperties<KinesisConsumerProperties> extendedConsumerProperties = new ExtendedConsumerProperties<>(new KinesisConsumerProperties()); DescribeStreamResult describeOriginalStream = describeStreamResultWithShards(Collections.singletonList(new Shard())); DescribeStreamResult describeUpdatedStream = describeStreamResultWithShards(Arrays.asList(new Shard(), new Shard())); when(amazonKinesisMock.describeStream(any(DescribeStreamRequest.class))) .thenReturn(describeOriginalStream) .thenReturn(describeUpdatedStream); provisioner.provisionConsumerDestination(name, group, extendedConsumerProperties); verify(amazonKinesisMock, times(1)) .updateShardCount(updateShardCaptor.capture()); assertThat(updateShardCaptor.getValue().getStreamName()).isEqualTo(name); assertThat(updateShardCaptor.getValue().getScalingType()).isEqualTo(ScalingType.UNIFORM_SCALING.name()); assertThat(updateShardCaptor.getValue().getTargetShardCount()).isEqualTo(targetShardCount); }
@Test public void testProvisionConsumerSuccessfulWithNewStream() { AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class); KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties(); KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties); int instanceCount = 1; int concurrency = 1; ExtendedConsumerProperties<KinesisConsumerProperties> extendedConsumerProperties = new ExtendedConsumerProperties<>(new KinesisConsumerProperties()); extendedConsumerProperties.setInstanceCount(instanceCount); extendedConsumerProperties.setConcurrency(concurrency); String name = "test-stream"; String group = "test-group"; DescribeStreamResult describeStreamResult = describeStreamResultWithShards(Collections.singletonList(new Shard())); when(amazonKinesisMock.describeStream(any(DescribeStreamRequest.class))) .thenThrow(new ResourceNotFoundException("I got nothing")) .thenReturn(describeStreamResult); when(amazonKinesisMock.createStream(name, instanceCount * concurrency)) .thenReturn(new CreateStreamResult()); ConsumerDestination destination = provisioner.provisionConsumerDestination(name, group, extendedConsumerProperties); verify(amazonKinesisMock, times(2)) .describeStream(any(DescribeStreamRequest.class)); verify(amazonKinesisMock) .createStream(name, instanceCount * concurrency); assertThat(destination.getName()).isEqualTo(name); }
private static DescribeStreamResult describeStreamResultWithShards(List<Shard> shards) { return new DescribeStreamResult() .withStreamDescription( new StreamDescription() .withShards(shards) .withStreamStatus(StreamStatus.ACTIVE) .withHasMoreShards(Boolean.FALSE)); }
@Override public void start(Map<String, String> settings) { log.info("start()"); this.settings = settings; this.config = new KinesisSourceConnectorConfig(settings); this.kinesisClient = this.kinesisClientFactory.create(this.config); DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest() .withStreamName(this.config.kinesisStreamName); DescribeStreamResult describeStreamResult = this.kinesisClient.describeStream(describeStreamRequest); this.streamDescription = describeStreamResult.getStreamDescription(); }
@Test public void start() { final DescribeStreamRequest expectedDescribeStreamRequest = new DescribeStreamRequest() .withStreamName(TestData.EXPECTED_STREAM_NAME); final int SHARD_COUNT = 50; List<Shard> shards = new ArrayList<>(SHARD_COUNT); for (int i = 0; i < SHARD_COUNT; i++) { String shardId = String.format("%03d", i); final Shard shard = new Shard() .withShardId(shardId); shards.add(shard); } final StreamDescription streamDescription = new StreamDescription() .withStreamName(TestData.EXPECTED_STREAM_NAME) .withShards(shards); final DescribeStreamResult expectedStreamRequest = new DescribeStreamResult() .withStreamDescription(streamDescription); when(this.kinesisClient.describeStream(any(DescribeStreamRequest.class))).thenReturn(expectedStreamRequest); this.connector.start(TestData.settings()); List<Map<String, String>> taskConfigs = this.connector.taskConfigs(SHARD_COUNT); assertEquals(SHARD_COUNT, taskConfigs.size()); verify(this.kinesisClient, atLeastOnce()).describeStream(expectedDescribeStreamRequest); }
private static boolean streamExists(AmazonKinesis client, String streamName) { try { DescribeStreamResult describeStreamResult = client.describeStream(streamName); return (describeStreamResult != null && describeStreamResult.getSdkHttpMetadata().getHttpStatusCode() == 200); } catch (Exception e) { LOG.warn("Error checking whether stream {} exists.", streamName, e); } return false; }
private String getShardItertor() { // either return a cached one or get a new one via a GetShardIterator request. if (currentShardIterator == null) { String shardId; //If ShardId supplied use it, else choose first one if (!getEndpoint().getShardId().isEmpty()) { shardId = getEndpoint().getShardId(); } else { DescribeStreamRequest req1 = new DescribeStreamRequest() .withStreamName(getEndpoint().getStreamName()); DescribeStreamResult res1 = getClient().describeStream(req1); shardId = res1.getStreamDescription().getShards().get(0).getShardId(); } LOG.debug("ShardId is: {}", shardId); GetShardIteratorRequest req = new GetShardIteratorRequest() .withStreamName(getEndpoint().getStreamName()) .withShardId(shardId) .withShardIteratorType(getEndpoint().getIteratorType()); if (hasSequenceNumber()) { req.withStartingSequenceNumber(getEndpoint().getSequenceNumber()); } GetShardIteratorResult result = getClient().getShardIterator(req); currentShardIterator = result.getShardIterator(); } LOG.debug("Shard Iterator is: {}", currentShardIterator); return currentShardIterator; }
public void split(final String streamName, final String awsAccessKey, final String awsSecretKey, long secsToWait) throws InterruptedException { AWSCredentialsProvider creds = createAwsCredentialsProvider(awsAccessKey, awsSecretKey); AmazonKinesisClient client = new AmazonKinesisClient(creds); // Describes the stream to get the information about each shard. DescribeStreamResult result = client.describeStream(streamName); List<Shard> shards = result.getStreamDescription().getShards(); log.log(Level.INFO, "Splitting the Stream: [{0}], there are [{1}] shards to split.", new Object[]{streamName, shards.size()}); for (final Shard shard : shards) { // Gets the new shard start key. BigInteger startKey = new BigInteger(shard.getHashKeyRange().getStartingHashKey()); BigInteger endKey = new BigInteger(shard.getHashKeyRange().getEndingHashKey()); String newStartKey = startKey.add(endKey).divide(DENOMINATOR).toString(); log.log(Level.INFO, "Processing the Shard:[{0}], StartKey:[{1}] EndKey:[{2}] - NewStartKey:[{3}]", new String[]{shard.getShardId(), shard.getHashKeyRange().getStartingHashKey(), shard.getHashKeyRange().getEndingHashKey(), newStartKey}); // Split the shard. client.splitShard(new SplitShardRequest() .withStreamName(streamName) .withShardToSplit(shard.getShardId()) .withNewStartingHashKey(newStartKey)); // Give some time to kinesis to process. TimeUnit.SECONDS.sleep(secsToWait); } log.info("Done!"); }
public static void checkStream(DescribeStreamResult result) { String statusText = result.getStreamDescription().getStreamStatus(); String streamName = result.getStreamDescription().getStreamName(); if (!statusText.equals("ACTIVE")) { System.err.println("Inactive Stream: " + streamName); System.exit(1); } else { System.out.println("Stream " + streamName + " is ACTIVE!"); } }
private static boolean streamActive(AmazonKinesis client, String streamName) { try { DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(streamName); DescribeStreamResult describeStreamResult = client.describeStream(describeStreamRequest); String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); if ("ACTIVE".equals(streamStatus)) { return true; } } catch (Exception e) { return false; } return false; }
/** * Internal method to retrieve the stream description and get the shards from AWS. * * Gets from the internal cache unless not yet created or too old. * * @param streamName * @return */ protected InternalStreamDescription getStreamDescription(String streamName) { InternalStreamDescription desc = this.streamMap.get(streamName); if (desc == null || System.currentTimeMillis() - desc.getCreateTimeStamp() >= MAX_CACHE_AGE_MILLIS) { desc = new InternalStreamDescription(streamName); DescribeStreamRequest describeStreamRequest = clientManager.getDescribeStreamRequest(); describeStreamRequest.setStreamName(streamName); // Collect shards from Kinesis String exclusiveStartShardId = null; List<Shard> shards = new ArrayList<>(); do { describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId); DescribeStreamResult describeStreamResult = clientManager.getClient().describeStream(describeStreamRequest); String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); if (!streamStatus.equals("ACTIVE") && !streamStatus.equals("UPDATING")) { throw new ResourceNotFoundException("Stream not Active"); } desc.addAllShards(describeStreamResult.getStreamDescription().getShards()); if (describeStreamResult.getStreamDescription().getHasMoreShards() && (shards.size() > 0)) { exclusiveStartShardId = shards.get(shards.size() - 1).getShardId(); } else { exclusiveStartShardId = null; } } while (exclusiveStartShardId != null); this.streamMap.put(streamName, desc); } return desc; }
@Override public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) throws AmazonServiceException, AmazonClientException { InternalStream theStream = this.getStream(describeStreamRequest.getStreamName()); if (theStream != null) { StreamDescription desc = new StreamDescription(); desc = desc.withStreamName(theStream.getStreamName()).withStreamStatus(theStream.getStreamStatus()).withStreamARN(theStream.getStreamARN()); if (describeStreamRequest.getExclusiveStartShardId() == null || describeStreamRequest.getExclusiveStartShardId().isEmpty()) { desc.setShards(this.getShards(theStream)); desc.setHasMoreShards(false); } else { // Filter from given shard Id, or may not have any more String startId = describeStreamRequest.getExclusiveStartShardId(); desc.setShards(this.getShards(theStream, startId)); desc.setHasMoreShards(false); } DescribeStreamResult result = new DescribeStreamResult(); result = result.withStreamDescription(desc); return result; } else { throw new AmazonClientException("This stream does not exist!"); } }
/** * Wait up to the specified time for the stream to be created * @param topic * @param minutesToWait */ private void waitForStreamToBecomeAvailable(final String topic, final int minutesToWait) { /** Ask for no more than 10 shards at a time -- this is an optional parameter **/ final int shardsToQuery = 10; LOGGER.info("Waiting for topic {} to become ACTIVE...", topic); final long startTime = System.currentTimeMillis(); final long endTime = startTime + (minutesToWait * 60L * 1000L); final long sleepTime = (1000L * 10L); while (System.currentTimeMillis() < endTime) { try { Thread.sleep(sleepTime); } catch (InterruptedException e) { // Ignore interruption (doesn't impact stream creation) } try { final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(topic); describeStreamRequest.setLimit(shardsToQuery); /** stream response **/ final DescribeStreamResult describeStreamResponse = this.client.describeStream(describeStreamRequest); final String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus(); LOGGER.info("Topic: {} Current state: {}", topic, streamStatus); if (streamStatus.equals("ACTIVE")) { return; } } catch (AmazonServiceException ase) { if (ase.getErrorCode().equalsIgnoreCase("ResourceNotFoundException") == false) { throw ase; } throw new RuntimeException("Stream " + topic + " never became active"); } } }
@Override public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) { throw new RuntimeException("Not implemented"); }
@Override public DescribeStreamResult describeStream(String streamName) { return describeStream(streamName, null); }
@Override public DescribeStreamResult describeStream(String streamName, Integer limit, String exclusiveStartShardId) { throw new RuntimeException("Not implemented"); }
/** * Get metainfo for a Kinesis stream, which contains information about which shards this Kinesis stream possess. * * <p>This method is using a "full jitter" approach described in AWS's article, * <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">"Exponential Backoff and Jitter"</a>. * This is necessary because concurrent calls will be made by all parallel subtask's fetcher. This * jitter backoff approach will help distribute calls across the fetchers over time. * * @param streamName the stream to describe * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result) * @return the result of the describe stream operation */ private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException { final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(streamName); describeStreamRequest.setExclusiveStartShardId(startShardId); DescribeStreamResult describeStreamResult = null; // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException). int attemptCount = 0; while (describeStreamResult == null) { // retry until we get a result try { describeStreamResult = kinesisClient.describeStream(describeStreamRequest); } catch (LimitExceededException le) { long backoffMillis = fullJitterBackoff( describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++); LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for " + backoffMillis + " millis."); Thread.sleep(backoffMillis); } catch (ResourceNotFoundException re) { throw new RuntimeException("Error while getting stream details", re); } } String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) { if (LOG.isWarnEnabled()) { LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " + "describeStream operation will not contain any shard information."); } } // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive // start shard id in the returned shards list; check if we need to remove these erroneously returned shards if (startShardId != null) { List<Shard> shards = describeStreamResult.getStreamDescription().getShards(); Iterator<Shard> shardItr = shards.iterator(); while (shardItr.hasNext()) { if (StreamShardHandle.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) { shardItr.remove(); } } } return describeStreamResult; }
/** * Get metainfo for a Kinesis stream, which contains information about which shards this Kinesis stream possess. * * This method is using a "full jitter" approach described in AWS's article, * <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">"Exponential Backoff and Jitter"</a>. * This is necessary because concurrent calls will be made by all parallel subtask's fetcher. This * jitter backoff approach will help distribute calls across the fetchers over time. * * @param streamName the stream to describe * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result) * @return the result of the describe stream operation */ private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException { final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(streamName); describeStreamRequest.setExclusiveStartShardId(startShardId); DescribeStreamResult describeStreamResult = null; // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException). int attemptCount = 0; while (describeStreamResult == null) { // retry until we get a result try { describeStreamResult = kinesisClient.describeStream(describeStreamRequest); } catch (LimitExceededException le) { long backoffMillis = fullJitterBackoff( describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++); LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for " + backoffMillis + " millis."); Thread.sleep(backoffMillis); } catch (ResourceNotFoundException re) { throw new RuntimeException("Error while getting stream details", re); } } String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) { if (LOG.isWarnEnabled()) { LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " + "describeStream operation will not contain any shard information."); } } // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive // start shard id in the returned shards list; check if we need to remove these erroneously returned shards if (startShardId != null) { List<Shard> shards = describeStreamResult.getStreamDescription().getShards(); Iterator<Shard> shardItr = shards.iterator(); while (shardItr.hasNext()) { if (KinesisStreamShard.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) { shardItr.remove(); } } } return describeStreamResult; }
@Override public DescribeStreamResult describeStream(String s) throws AmazonServiceException, AmazonClientException { return null; }
@Override public DescribeStreamResult describeStream(String s, String s1) throws AmazonServiceException, AmazonClientException { return null; }
@Override public DescribeStreamResult describeStream(String s, Integer integer, String s1) throws AmazonServiceException, AmazonClientException { return null; }
public static void main(String[] args) { AmazonKinesisClient kinesisClient = Helper.setupKinesisClient(); // Retrieve the Shards from a Stream DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(Helper.properties().getProperty("kinesisStreamName")); DescribeStreamResult describeStreamResult; List<Shard> shards = new ArrayList<>(); String lastShardId = null; do { describeStreamRequest.setExclusiveStartShardId(lastShardId); describeStreamResult = kinesisClient.describeStream(describeStreamRequest); shards.addAll(describeStreamResult.getStreamDescription().getShards()); if (shards.size() > 0) { lastShardId = shards.get(shards.size() - 1).getShardId(); } } while (describeStreamResult.getStreamDescription().getHasMoreShards()); // Get Data from the Shards in a Stream // Hard-coded to use only 1 shard String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(Helper.properties().getProperty("kinesisStreamName")); getShardIteratorRequest.setShardId(shards.get(0).getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator(); // Continuously read data records from shard. List<Record> records; while (true) { // Create new GetRecordsRequest with existing shardIterator. // Set maximum records to return to 1000. GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(1000); GetRecordsResult result = kinesisClient.getRecords(getRecordsRequest); // Put result into record list. Result may be empty. records = result.getRecords(); // Print records for (Record record : records) { ByteBuffer byteBuffer = record.getData(); System.out.println(String.format("Seq No: %s - %s", record.getSequenceNumber(), new String(byteBuffer.array()))); } try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); } }
/** * Configures this appender instance and makes it ready for use by the * consumers. It validates mandatory parameters and confirms if the configured * stream is ready for publishing data yet. * * Error details are made available through the fallback handler for this * appender * * @throws IllegalStateException * if we encounter issues configuring this appender instance */ @Override public void activateOptions() { if (streamName == null) { initializationFailed = true; error("Invalid configuration - streamName cannot be null for appender: " + name); } if (layout == null) { initializationFailed = true; error("Invalid configuration - No layout for appender: " + name); } ClientConfiguration clientConfiguration = new ClientConfiguration(); clientConfiguration = setProxySettingsFromSystemProperties(clientConfiguration); clientConfiguration.setMaxErrorRetry(maxRetries); clientConfiguration.setRetryPolicy(new RetryPolicy(PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION, PredefinedRetryPolicies.DEFAULT_BACKOFF_STRATEGY, maxRetries, true)); clientConfiguration.setUserAgent(AppenderConstants.USER_AGENT_STRING); BlockingQueue<Runnable> taskBuffer = new LinkedBlockingDeque<Runnable>(bufferSize); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(threadCount, threadCount, AppenderConstants.DEFAULT_THREAD_KEEP_ALIVE_SEC, TimeUnit.SECONDS, taskBuffer, new BlockFastProducerPolicy()); threadPoolExecutor.prestartAllCoreThreads(); kinesisClient = new AmazonKinesisAsyncClient(new CustomCredentialsProviderChain(), clientConfiguration, threadPoolExecutor); boolean regionProvided = !Validator.isBlank(region); if (!regionProvided) { region = AppenderConstants.DEFAULT_REGION; } if (!Validator.isBlank(endpoint)) { if (regionProvided) { LOGGER .warn("Received configuration for both region as well as Amazon Kinesis endpoint. (" + endpoint + ") will be used as endpoint instead of default endpoint for region (" + region + ")"); } kinesisClient.setEndpoint(endpoint, AppenderConstants.DEFAULT_SERVICE_NAME, region); } else { kinesisClient.setRegion(Region.getRegion(Regions.fromName(region))); } DescribeStreamResult describeResult = null; try { describeResult = kinesisClient.describeStream(streamName); String streamStatus = describeResult.getStreamDescription().getStreamStatus(); if (!StreamStatus.ACTIVE.name().equals(streamStatus) && !StreamStatus.UPDATING.name().equals(streamStatus)) { initializationFailed = true; error("Stream " + streamName + " is not ready (in active/updating status) for appender: " + name); } } catch (ResourceNotFoundException rnfe) { initializationFailed = true; error("Stream " + streamName + " doesn't exist for appender: " + name, rnfe); } asyncCallHander = new AsyncPutCallStatsReporter(name); }
@Test(enabled = false) public void testKinesisConnection() { DescribeStreamResult describeStreamResult = client.describeStream(awsStreamName); System.out.println("AWS Kinesis stream " + awsStreamName + ": " + (describeStreamResult == null ? "null" : describeStreamResult.toString())); }