/**
 * Reloads {@code this.shards} by paging through DescribeStream results for {@code streamName}.
 *
 * <p>Each page is requested with the id of the last shard collected so far as the exclusive
 * start; paging ends when the service reports no more shards or nothing has been collected.
 */
private void refreshShards() {
    DescribeStreamRequest request = new DescribeStreamRequest();
    request.setStreamName(streamName);

    List<Shard> collected = new ArrayList<>();
    String lastSeenShardId = null;
    do {
        request.setExclusiveStartShardId(lastSeenShardId);
        DescribeStreamResult page = kinesisClient.describeStream(request);
        collected.addAll(page.getStreamDescription().getShards());

        boolean morePages = page.getStreamDescription().getHasMoreShards() && collected.size() > 0;
        lastSeenShardId = morePages ? collected.get(collected.size() - 1).getShardId() : null;
    } while (lastSeenShardId != null);

    this.shards = collected;
}
/**
 * With describeStream stubbed to return a single-shard result, provisioning a producer
 * destination must call describeStream once and return a destination named after the stream.
 */
@Test
public void testProvisionProducerSuccessfulWithExistingStream() {
    AmazonKinesis kinesis = mock(AmazonKinesis.class);
    KinesisStreamProvisioner provisioner =
            new KinesisStreamProvisioner(kinesis, new KinesisBinderConfigurationProperties());
    ExtendedProducerProperties<KinesisProducerProperties> producerProperties =
            new ExtendedProducerProperties<>(new KinesisProducerProperties());
    String name = "test-stream";

    DescribeStreamResult existingStream =
            describeStreamResultWithShards(Collections.singletonList(new Shard()));
    when(kinesis.describeStream(any(DescribeStreamRequest.class))).thenReturn(existingStream);

    ProducerDestination destination = provisioner.provisionProducerDestination(name, producerProperties);

    verify(kinesis).describeStream(any(DescribeStreamRequest.class));
    assertThat(destination.getName()).isEqualTo(name);
}
/**
 * With describeStream stubbed to return a single-shard result, provisioning a consumer
 * destination must call describeStream once and return a destination named after the stream.
 */
@Test
public void testProvisionConsumerSuccessfulWithExistingStream() {
    AmazonKinesis kinesis = mock(AmazonKinesis.class);
    KinesisStreamProvisioner provisioner =
            new KinesisStreamProvisioner(kinesis, new KinesisBinderConfigurationProperties());
    ExtendedConsumerProperties<KinesisConsumerProperties> consumerProperties =
            new ExtendedConsumerProperties<>(new KinesisConsumerProperties());
    String name = "test-stream";
    String group = "test-group";

    DescribeStreamResult existingStream =
            describeStreamResultWithShards(Collections.singletonList(new Shard()));
    when(kinesis.describeStream(any(DescribeStreamRequest.class))).thenReturn(existingStream);

    ConsumerDestination destination =
            provisioner.provisionConsumerDestination(name, group, consumerProperties);

    verify(kinesis).describeStream(any(DescribeStreamRequest.class));
    assertThat(destination.getName()).isEqualTo(name);
}
/**
 * Builds the consumer under test on a "streamName" endpoint with iterator type LATEST and
 * stubs the Kinesis client: getRecords yields "nextShardIterator", describeStream yields one
 * shard with id "shardId", and getShardIterator yields "shardIterator".
 */
@Before
public void setup() throws Exception {
    KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", component);
    endpoint.setAmazonKinesisClient(kinesisClient);
    endpoint.setIteratorType(ShardIteratorType.LATEST);
    undertest = new KinesisConsumer(endpoint, processor);

    GetRecordsResult recordsResult = new GetRecordsResult().withNextShardIterator("nextShardIterator");
    when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(recordsResult);

    StreamDescription singleShardStream =
            new StreamDescription().withShards(new Shard().withShardId("shardId"));
    DescribeStreamResult describeResult =
            new DescribeStreamResult().withStreamDescription(singleShardStream);
    when(kinesisClient.describeStream(any(DescribeStreamRequest.class))).thenReturn(describeResult);

    GetShardIteratorResult iteratorResult = new GetShardIteratorResult().withShardIterator("shardIterator");
    when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(iteratorResult);
}
/**
 * After configuring sequence number "12345" with AFTER_SEQUENCE_NUMBER, a single poll must
 * describe "streamName" and request a shard iterator carrying that type and sequence number.
 */
@Test
public void itObtainsAShardIteratorOnFirstPollForSequenceNumber() throws Exception {
    undertest.getEndpoint().setSequenceNumber("12345");
    undertest.getEndpoint().setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);

    undertest.poll();

    final ArgumentCaptor<DescribeStreamRequest> describeCaptor =
            ArgumentCaptor.forClass(DescribeStreamRequest.class);
    final ArgumentCaptor<GetShardIteratorRequest> iteratorCaptor =
            ArgumentCaptor.forClass(GetShardIteratorRequest.class);

    verify(kinesisClient).describeStream(describeCaptor.capture());
    assertThat(describeCaptor.getValue().getStreamName(), is("streamName"));

    verify(kinesisClient).getShardIterator(iteratorCaptor.capture());
    final GetShardIteratorRequest iteratorRequest = iteratorCaptor.getValue();
    assertThat(iteratorRequest.getStreamName(), is("streamName"));
    assertThat(iteratorRequest.getShardId(), is("shardId"));
    assertThat(iteratorRequest.getShardIteratorType(), is("AFTER_SEQUENCE_NUMBER"));
    assertThat(iteratorRequest.getStartingSequenceNumber(), is("12345"));
}
/**
 * Creates the Kinesis stream backing a topic when it does not exist yet.
 *
 * <p>The stream is probed with DescribeStream; only when that raises
 * {@link ResourceNotFoundException} is the stream created and awaited.
 *
 * @param topicName  name of the topic / Kinesis stream
 * @param partitions shard count to create the stream with
 */
public void createTopic(String topicName, int partitions) {
    LOGGER.info("Determining if Kinesis topic: {} already exists...", topicName);
    try {
        this.client.describeStream(new DescribeStreamRequest().withStreamName(topicName));
    } catch (ResourceNotFoundException rnf) {
        LOGGER.info("Kinesis stream for topic: {} does not exist, creating now with shard count: {}", topicName, partitions);
        final CreateStreamRequest createRequest = new CreateStreamRequest()
                .withStreamName(topicName)
                .withShardCount(partitions);
        this.client.createStream(createRequest);
        this.waitForStreamToBecomeAvailable(topicName, DEFAULT_WAIT_TIME_MINUTES);
        LOGGER.info("Create topic completed for topic: {}", topicName);
    }
}
/**
 * Polls DescribeStream every five seconds, for at most five minutes, and returns as soon as
 * the stream status is "ACTIVE". Returns normally as well when the deadline passes.
 *
 * @param myStreamName name of the stream to poll
 * @throws InterruptedException if the thread is interrupted while sleeping between polls
 */
private static void waitForStreamToBecomeAvailable(String myStreamName) throws InterruptedException {
    LOG.info("Waiting for " + myStreamName + " to become ACTIVE...");

    final long deadline = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5);
    while (System.currentTimeMillis() < deadline) {
        Thread.sleep(TimeUnit.SECONDS.toMillis(5));

        DescribeStreamRequest request = new DescribeStreamRequest();
        request.setStreamName(myStreamName);
        request.setLimit(10);

        String status = kinesisClient.describeStream(request)
                .getStreamDescription()
                .getStreamStatus();
        if ("ACTIVE".equals(status)) {
            return;
        }
    }
}
/**
 * Collects every shard of {@code stream}, retrying every 100 ms while the stream status is
 * not ACTIVE and following the exclusive-start-shard cursor while more shards are reported.
 *
 * @param stream name of the stream to describe
 * @return all shards gathered across the described pages
 * @throws IllegalStateException if the thread is interrupted while waiting to retry
 */
private List<Shard> describeStream(String stream) {
    AmazonKinesisAsync kinesis = localKinesisResource.getResource();
    DescribeStreamRequest request = new DescribeStreamRequest().withStreamName(stream);
    List<Shard> collected = new ArrayList<>();
    String lastShardId = null;

    for (;;) {
        request.withExclusiveStartShardId(lastShardId);
        StreamDescription description = kinesis.describeStream(request).getStreamDescription();

        boolean active = StreamStatus.ACTIVE.toString().equals(description.getStreamStatus());
        if (!active) {
            // Not ready yet: pause briefly, then describe again with the same cursor.
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            continue;
        }

        collected.addAll(description.getShards());
        if (!description.getHasMoreShards()) {
            return collected;
        }
        lastShardId = collected.get(collected.size() - 1).getShardId();
    }
}
/**
 * With minShardCount 2 and autoAddShards enabled, provisioning a consumer destination on a
 * stream first described with one shard must issue exactly one UpdateShardCount request for
 * that stream, using UNIFORM_SCALING and a target of 2 shards.
 */
@Test
public void testProvisionConsumerExistingStreamUpdateShards() {
    String name = "test-stream";
    String group = "test-group";
    int targetShardCount = 2;

    AmazonKinesis kinesis = mock(AmazonKinesis.class);
    ArgumentCaptor<UpdateShardCountRequest> updateCaptor =
            ArgumentCaptor.forClass(UpdateShardCountRequest.class);

    KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties();
    binderProperties.setMinShardCount(targetShardCount);
    binderProperties.setAutoAddShards(true);
    KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(kinesis, binderProperties);
    ExtendedConsumerProperties<KinesisConsumerProperties> consumerProperties =
            new ExtendedConsumerProperties<>(new KinesisConsumerProperties());

    DescribeStreamResult beforeUpdate =
            describeStreamResultWithShards(Collections.singletonList(new Shard()));
    DescribeStreamResult afterUpdate =
            describeStreamResultWithShards(Arrays.asList(new Shard(), new Shard()));
    when(kinesis.describeStream(any(DescribeStreamRequest.class)))
            .thenReturn(beforeUpdate)
            .thenReturn(afterUpdate);

    provisioner.provisionConsumerDestination(name, group, consumerProperties);

    verify(kinesis, times(1)).updateShardCount(updateCaptor.capture());
    UpdateShardCountRequest sent = updateCaptor.getValue();
    assertThat(sent.getStreamName()).isEqualTo(name);
    assertThat(sent.getScalingType()).isEqualTo(ScalingType.UNIFORM_SCALING.name());
    assertThat(sent.getTargetShardCount()).isEqualTo(targetShardCount);
}
/**
 * When the first describeStream call raises ResourceNotFoundException, provisioning a producer
 * destination must create the stream with one shard, describe it a second time, and return a
 * destination named after the stream.
 */
@Test
public void testProvisionProducerSuccessfulWithNewStream() {
    AmazonKinesis kinesis = mock(AmazonKinesis.class);
    KinesisStreamProvisioner provisioner =
            new KinesisStreamProvisioner(kinesis, new KinesisBinderConfigurationProperties());
    ExtendedProducerProperties<KinesisProducerProperties> producerProperties =
            new ExtendedProducerProperties<>(new KinesisProducerProperties());
    String name = "test-stream";
    Integer shards = 1;

    DescribeStreamResult createdStream =
            describeStreamResultWithShards(Collections.singletonList(new Shard()));
    when(kinesis.describeStream(any(DescribeStreamRequest.class)))
            .thenThrow(new ResourceNotFoundException("I got nothing"))
            .thenReturn(createdStream);
    when(kinesis.createStream(name, shards)).thenReturn(new CreateStreamResult());

    ProducerDestination destination = provisioner.provisionProducerDestination(name, producerProperties);

    verify(kinesis, times(2)).describeStream(any(DescribeStreamRequest.class));
    verify(kinesis).createStream(name, shards);
    assertThat(destination.getName()).isEqualTo(name);
}
/**
 * With minShardCount 2 and autoAddShards enabled, provisioning a <em>producer</em> destination
 * on a stream first described with one shard must issue exactly one UpdateShardCount request
 * for that stream, using UNIFORM_SCALING and a target of 2 shards.
 *
 * <p>This test previously built consumer properties and called
 * {@code provisionConsumerDestination}, so the producer path was never exercised.
 * NOTE(review): assumes the producer path honours minShardCount/autoAddShards the same way
 * the consumer path does; confirm against KinesisStreamProvisioner.
 */
@Test
public void testProvisionProducerUpdateShards() {
    AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class);
    ArgumentCaptor<UpdateShardCountRequest> updateShardCaptor =
            ArgumentCaptor.forClass(UpdateShardCountRequest.class);
    String name = "test-stream";
    int targetShardCount = 2;

    KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties();
    binderProperties.setMinShardCount(targetShardCount);
    binderProperties.setAutoAddShards(true);
    KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties);
    ExtendedProducerProperties<KinesisProducerProperties> extendedProducerProperties =
            new ExtendedProducerProperties<>(new KinesisProducerProperties());

    DescribeStreamResult describeOriginalStream =
            describeStreamResultWithShards(Collections.singletonList(new Shard()));
    DescribeStreamResult describeUpdatedStream =
            describeStreamResultWithShards(Arrays.asList(new Shard(), new Shard()));
    when(amazonKinesisMock.describeStream(any(DescribeStreamRequest.class)))
            .thenReturn(describeOriginalStream)
            .thenReturn(describeUpdatedStream);

    // Exercise the producer provisioning path, as the test name promises.
    provisioner.provisionProducerDestination(name, extendedProducerProperties);

    verify(amazonKinesisMock, times(1)).updateShardCount(updateShardCaptor.capture());
    assertThat(updateShardCaptor.getValue().getStreamName()).isEqualTo(name);
    assertThat(updateShardCaptor.getValue().getScalingType()).isEqualTo(ScalingType.UNIFORM_SCALING.name());
    assertThat(updateShardCaptor.getValue().getTargetShardCount()).isEqualTo(targetShardCount);
}
/**
 * When the first describeStream call raises ResourceNotFoundException, provisioning a consumer
 * destination must create the stream with instanceCount * concurrency shards, describe it a
 * second time, and return a destination named after the stream.
 */
@Test
public void testProvisionConsumerSuccessfulWithNewStream() {
    AmazonKinesis kinesis = mock(AmazonKinesis.class);
    KinesisStreamProvisioner provisioner =
            new KinesisStreamProvisioner(kinesis, new KinesisBinderConfigurationProperties());

    int instanceCount = 1;
    int concurrency = 1;
    ExtendedConsumerProperties<KinesisConsumerProperties> consumerProperties =
            new ExtendedConsumerProperties<>(new KinesisConsumerProperties());
    consumerProperties.setInstanceCount(instanceCount);
    consumerProperties.setConcurrency(concurrency);

    String name = "test-stream";
    String group = "test-group";

    DescribeStreamResult createdStream =
            describeStreamResultWithShards(Collections.singletonList(new Shard()));
    when(kinesis.describeStream(any(DescribeStreamRequest.class)))
            .thenThrow(new ResourceNotFoundException("I got nothing"))
            .thenReturn(createdStream);
    when(kinesis.createStream(name, instanceCount * concurrency)).thenReturn(new CreateStreamResult());

    ConsumerDestination destination =
            provisioner.provisionConsumerDestination(name, group, consumerProperties);

    verify(kinesis, times(2)).describeStream(any(DescribeStreamRequest.class));
    verify(kinesis).createStream(name, instanceCount * concurrency);
    assertThat(destination.getName()).isEqualTo(name);
}
/**
 * Stores the settings, builds the connector config and Kinesis client from them, and caches
 * the description of the configured stream in {@code streamDescription}.
 *
 * @param settings raw connector settings
 */
@Override
public void start(Map<String, String> settings) {
    log.info("start()");
    this.settings = settings;
    this.config = new KinesisSourceConnectorConfig(settings);
    this.kinesisClient = this.kinesisClientFactory.create(this.config);

    DescribeStreamRequest request = new DescribeStreamRequest();
    request.setStreamName(this.config.kinesisStreamName);
    this.streamDescription = this.kinesisClient.describeStream(request).getStreamDescription();
}
/**
 * Starts the connector against a stubbed stream of 50 shards and checks that requesting 50
 * task configs yields 50 entries and that describeStream was called with the expected request.
 */
@Test
public void start() {
    final int shardCount = 50;

    List<Shard> shards = new ArrayList<>(shardCount);
    for (int index = 0; index < shardCount; index++) {
        shards.add(new Shard().withShardId(String.format("%03d", index)));
    }

    final StreamDescription streamDescription = new StreamDescription()
            .withStreamName(TestData.EXPECTED_STREAM_NAME)
            .withShards(shards);
    final DescribeStreamResult stubbedResult = new DescribeStreamResult()
            .withStreamDescription(streamDescription);
    when(this.kinesisClient.describeStream(any(DescribeStreamRequest.class))).thenReturn(stubbedResult);

    this.connector.start(TestData.settings());
    List<Map<String, String>> taskConfigs = this.connector.taskConfigs(shardCount);
    assertEquals(shardCount, taskConfigs.size());

    final DescribeStreamRequest expectedRequest = new DescribeStreamRequest()
            .withStreamName(TestData.EXPECTED_STREAM_NAME);
    verify(this.kinesisClient, atLeastOnce()).describeStream(expectedRequest);
}
private String getShardItertor() { // either return a cached one or get a new one via a GetShardIterator request. if (currentShardIterator == null) { String shardId; //If ShardId supplied use it, else choose first one if (!getEndpoint().getShardId().isEmpty()) { shardId = getEndpoint().getShardId(); } else { DescribeStreamRequest req1 = new DescribeStreamRequest() .withStreamName(getEndpoint().getStreamName()); DescribeStreamResult res1 = getClient().describeStream(req1); shardId = res1.getStreamDescription().getShards().get(0).getShardId(); } LOG.debug("ShardId is: {}", shardId); GetShardIteratorRequest req = new GetShardIteratorRequest() .withStreamName(getEndpoint().getStreamName()) .withShardId(shardId) .withShardIteratorType(getEndpoint().getIteratorType()); if (hasSequenceNumber()) { req.withStartingSequenceNumber(getEndpoint().getSequenceNumber()); } GetShardIteratorResult result = getClient().getShardIterator(req); currentShardIterator = result.getShardIterator(); } LOG.debug("Shard Iterator is: {}", currentShardIterator); return currentShardIterator; }
/**
 * A single poll must describe "streamName" once and request a LATEST shard iterator for
 * shard "shardId" of that stream.
 */
@Test
public void itObtainsAShardIteratorOnFirstPoll() throws Exception {
    undertest.poll();

    final ArgumentCaptor<DescribeStreamRequest> describeCaptor =
            ArgumentCaptor.forClass(DescribeStreamRequest.class);
    final ArgumentCaptor<GetShardIteratorRequest> iteratorCaptor =
            ArgumentCaptor.forClass(GetShardIteratorRequest.class);

    verify(kinesisClient).describeStream(describeCaptor.capture());
    assertThat(describeCaptor.getValue().getStreamName(), is("streamName"));

    verify(kinesisClient).getShardIterator(iteratorCaptor.capture());
    final GetShardIteratorRequest iteratorRequest = iteratorCaptor.getValue();
    assertThat(iteratorRequest.getStreamName(), is("streamName"));
    assertThat(iteratorRequest.getShardId(), is("shardId"));
    assertThat(iteratorRequest.getShardIteratorType(), is("LATEST"));
}
/**
 * When a shard id is set on the endpoint, a poll must never call describeStream and must
 * request a LATEST shard iterator for exactly that shard id.
 */
@Test
public void itDoesNotMakeADescribeStreamRequestIfShardIdIsSet() throws Exception {
    undertest.getEndpoint().setShardId("shardIdPassedAsUrlParam");

    undertest.poll();

    verify(kinesisClient, never()).describeStream(any(DescribeStreamRequest.class));

    final ArgumentCaptor<GetShardIteratorRequest> iteratorCaptor =
            ArgumentCaptor.forClass(GetShardIteratorRequest.class);
    verify(kinesisClient).getShardIterator(iteratorCaptor.capture());

    final GetShardIteratorRequest iteratorRequest = iteratorCaptor.getValue();
    assertThat(iteratorRequest.getStreamName(), is("streamName"));
    assertThat(iteratorRequest.getShardId(), is("shardIdPassedAsUrlParam"));
    assertThat(iteratorRequest.getShardIteratorType(), is("LATEST"));
}
/**
 * Two polls must describe the stream and fetch a shard iterator only once, while issuing two
 * getRecords calls: the first with "shardIterator", the second with "nextShardIterator".
 */
@Test
public void itUsesTheShardIteratorOnSubsiquentPolls() throws Exception {
    undertest.poll();
    undertest.poll();

    verify(kinesisClient, times(1)).describeStream(any(DescribeStreamRequest.class));
    verify(kinesisClient, times(1)).getShardIterator(any(GetShardIteratorRequest.class));

    final ArgumentCaptor<GetRecordsRequest> recordsCaptor =
            ArgumentCaptor.forClass(GetRecordsRequest.class);
    verify(kinesisClient, times(2)).getRecords(recordsCaptor.capture());

    final GetRecordsRequest firstRequest = recordsCaptor.getAllValues().get(0);
    final GetRecordsRequest secondRequest = recordsCaptor.getAllValues().get(1);
    assertThat(firstRequest.getShardIterator(), is("shardIterator"));
    assertThat(secondRequest.getShardIterator(), is("nextShardIterator"));
}
/**
 * Checks whether an Amazon Kinesis stream exists by issuing a DescribeStream call.
 *
 * @param kinesisClient
 *        The {@link AmazonKinesisClient} with Amazon Kinesis read privileges
 * @param streamName
 *        The Amazon Kinesis stream to check for
 * @return true if DescribeStream succeeds, false if it raises {@link ResourceNotFoundException}
 */
private static boolean streamExists(AmazonKinesisClient kinesisClient, String streamName) {
    DescribeStreamRequest request = new DescribeStreamRequest();
    request.setStreamName(streamName);

    boolean exists;
    try {
        kinesisClient.describeStream(request);
        exists = true;
    } catch (ResourceNotFoundException e) {
        exists = false;
    }
    return exists;
}
/**
 * Looks up the current status of an Amazon Kinesis stream.
 *
 * @param kinesisClient
 *        The {@link AmazonKinesisClient} with Amazon Kinesis read privileges
 * @param streamName
 *        The Amazon Kinesis stream to get the state of
 * @return the stream status string, or {@code null} when DescribeStream raises an
 *         {@link AmazonServiceException}
 */
private static String streamState(AmazonKinesisClient kinesisClient, String streamName) {
    DescribeStreamRequest request = new DescribeStreamRequest();
    request.setStreamName(streamName);

    String status;
    try {
        status = kinesisClient.describeStream(request).getStreamDescription().getStreamStatus();
    } catch (AmazonServiceException e) {
        status = null;
    }
    return status;
}
/**
 * Reports whether the named stream currently has status "ACTIVE".
 *
 * @param client     Kinesis client used for the DescribeStream call
 * @param streamName stream to inspect
 * @return true only when the reported status equals "ACTIVE"; false for any other status or
 *         when any exception is raised while describing the stream
 */
private static boolean streamActive(AmazonKinesis client, String streamName) {
    try {
        DescribeStreamRequest request = new DescribeStreamRequest();
        request.setStreamName(streamName);
        String status = client.describeStream(request).getStreamDescription().getStreamStatus();
        return "ACTIVE".equals(status);
    } catch (Exception e) {
        return false;
    }
}
/** * Internal method to retrieve the stream description and get the shards from AWS. * * Gets from the internal cache unless not yet created or too old. * * @param streamName * @return */ protected InternalStreamDescription getStreamDescription(String streamName) { InternalStreamDescription desc = this.streamMap.get(streamName); if (desc == null || System.currentTimeMillis() - desc.getCreateTimeStamp() >= MAX_CACHE_AGE_MILLIS) { desc = new InternalStreamDescription(streamName); DescribeStreamRequest describeStreamRequest = clientManager.getDescribeStreamRequest(); describeStreamRequest.setStreamName(streamName); // Collect shards from Kinesis String exclusiveStartShardId = null; List<Shard> shards = new ArrayList<>(); do { describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId); DescribeStreamResult describeStreamResult = clientManager.getClient().describeStream(describeStreamRequest); String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); if (!streamStatus.equals("ACTIVE") && !streamStatus.equals("UPDATING")) { throw new ResourceNotFoundException("Stream not Active"); } desc.addAllShards(describeStreamResult.getStreamDescription().getShards()); if (describeStreamResult.getStreamDescription().getHasMoreShards() && (shards.size() > 0)) { exclusiveStartShardId = shards.get(shards.size() - 1).getShardId(); } else { exclusiveStartShardId = null; } } while (exclusiveStartShardId != null); this.streamMap.put(streamName, desc); } return desc; }
@Override public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) throws AmazonServiceException, AmazonClientException { InternalStream theStream = this.getStream(describeStreamRequest.getStreamName()); if (theStream != null) { StreamDescription desc = new StreamDescription(); desc = desc.withStreamName(theStream.getStreamName()).withStreamStatus(theStream.getStreamStatus()).withStreamARN(theStream.getStreamARN()); if (describeStreamRequest.getExclusiveStartShardId() == null || describeStreamRequest.getExclusiveStartShardId().isEmpty()) { desc.setShards(this.getShards(theStream)); desc.setHasMoreShards(false); } else { // Filter from given shard Id, or may not have any more String startId = describeStreamRequest.getExclusiveStartShardId(); desc.setShards(this.getShards(theStream, startId)); desc.setHasMoreShards(false); } DescribeStreamResult result = new DescribeStreamResult(); result = result.withStreamDescription(desc); return result; } else { throw new AmazonClientException("This stream does not exist!"); } }
/**
 * Fetches the current status string of the named stream via DescribeStream.
 *
 * @param streamName stream to inspect
 * @return the status reported in the stream description
 */
private String checkStreamStatus(String streamName) {
    DescribeStreamRequest request = new DescribeStreamRequest();
    request.setStreamName(streamName);
    return amazonKinesisClient.describeStream(request)
            .getStreamDescription()
            .getStreamStatus();
}
/** * Wait up to the specified time for the stream to be created * @param topic * @param minutesToWait */ private void waitForStreamToBecomeAvailable(final String topic, final int minutesToWait) { /** Ask for no more than 10 shards at a time -- this is an optional parameter **/ final int shardsToQuery = 10; LOGGER.info("Waiting for topic {} to become ACTIVE...", topic); final long startTime = System.currentTimeMillis(); final long endTime = startTime + (minutesToWait * 60L * 1000L); final long sleepTime = (1000L * 10L); while (System.currentTimeMillis() < endTime) { try { Thread.sleep(sleepTime); } catch (InterruptedException e) { // Ignore interruption (doesn't impact stream creation) } try { final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(topic); describeStreamRequest.setLimit(shardsToQuery); /** stream response **/ final DescribeStreamResult describeStreamResponse = this.client.describeStream(describeStreamRequest); final String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus(); LOGGER.info("Topic: {} Current state: {}", topic, streamStatus); if (streamStatus.equals("ACTIVE")) { return; } } catch (AmazonServiceException ase) { if (ase.getErrorCode().equalsIgnoreCase("ResourceNotFoundException") == false) { throw ase; } throw new RuntimeException("Stream " + topic + " never became active"); } } }
/**
 * Creates a one-shard stream and blocks, for up to one minute, until the client's
 * "stream exists" waiter completes for it.
 *
 * @param client     Kinesis client used to create and await the stream
 * @param streamName name of the stream to create
 * @throws Exception if the waiter fails or does not complete within one minute
 */
public static void createStream(AmazonKinesisClient client, String streamName) throws Exception {
    client.createStream(streamName, 1);

    Waiter<DescribeStreamRequest> streamExistsWaiter = client.waiters().streamExists();
    DescribeStreamRequest describeRequest = new DescribeStreamRequest().withStreamName(streamName);
    Assert.assertNotNull("Cannot obtain stream description", describeRequest);

    WaiterParameters<DescribeStreamRequest> waiterParameters =
            new WaiterParameters<DescribeStreamRequest>(describeRequest);
    Future<Void> completion = streamExistsWaiter.runAsync(waiterParameters, new NoOpWaiterHandler());
    completion.get(1, TimeUnit.MINUTES);
}
/**
 * Unsupported operation: always fails.
 *
 * @param describeStreamRequest ignored
 * @return never returns normally
 * @throws RuntimeException always, with the message "Not implemented"
 */
@Override
public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) {
    final String message = "Not implemented";
    throw new RuntimeException(message);
}
/** * Get metainfo for a Kinesis stream, which contains information about which shards this Kinesis stream possess. * * <p>This method is using a "full jitter" approach described in AWS's article, * <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">"Exponential Backoff and Jitter"</a>. * This is necessary because concurrent calls will be made by all parallel subtask's fetcher. This * jitter backoff approach will help distribute calls across the fetchers over time. * * @param streamName the stream to describe * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result) * @return the result of the describe stream operation */ private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException { final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(streamName); describeStreamRequest.setExclusiveStartShardId(startShardId); DescribeStreamResult describeStreamResult = null; // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException). int attemptCount = 0; while (describeStreamResult == null) { // retry until we get a result try { describeStreamResult = kinesisClient.describeStream(describeStreamRequest); } catch (LimitExceededException le) { long backoffMillis = fullJitterBackoff( describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++); LOG.warn("Got LimitExceededException when describing stream " + streamName + ". 
Backing off for " + backoffMillis + " millis."); Thread.sleep(backoffMillis); } catch (ResourceNotFoundException re) { throw new RuntimeException("Error while getting stream details", re); } } String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) { if (LOG.isWarnEnabled()) { LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " + "describeStream operation will not contain any shard information."); } } // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive // start shard id in the returned shards list; check if we need to remove these erroneously returned shards if (startShardId != null) { List<Shard> shards = describeStreamResult.getStreamDescription().getShards(); Iterator<Shard> shardItr = shards.iterator(); while (shardItr.hasNext()) { if (StreamShardHandle.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) { shardItr.remove(); } } } return describeStreamResult; }
/** * Get metainfo for a Kinesis stream, which contains information about which shards this Kinesis stream possess. * * This method is using a "full jitter" approach described in AWS's article, * <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">"Exponential Backoff and Jitter"</a>. * This is necessary because concurrent calls will be made by all parallel subtask's fetcher. This * jitter backoff approach will help distribute calls across the fetchers over time. * * @param streamName the stream to describe * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result) * @return the result of the describe stream operation */ private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException { final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(streamName); describeStreamRequest.setExclusiveStartShardId(startShardId); DescribeStreamResult describeStreamResult = null; // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException). int attemptCount = 0; while (describeStreamResult == null) { // retry until we get a result try { describeStreamResult = kinesisClient.describeStream(describeStreamRequest); } catch (LimitExceededException le) { long backoffMillis = fullJitterBackoff( describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++); LOG.warn("Got LimitExceededException when describing stream " + streamName + ". 
Backing off for " + backoffMillis + " millis."); Thread.sleep(backoffMillis); } catch (ResourceNotFoundException re) { throw new RuntimeException("Error while getting stream details", re); } } String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) { if (LOG.isWarnEnabled()) { LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " + "describeStream operation will not contain any shard information."); } } // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive // start shard id in the returned shards list; check if we need to remove these erroneously returned shards if (startShardId != null) { List<Shard> shards = describeStreamResult.getStreamDescription().getShards(); Iterator<Shard> shardItr = shards.iterator(); while (shardItr.hasNext()) { if (KinesisStreamShard.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) { shardItr.remove(); } } } return describeStreamResult; }
/**
 * Creates a new {@link DescribeStreamRequest} using its no-argument constructor.
 *
 * @return a newly constructed request on every call
 */
@Override
public DescribeStreamRequest getDescribeStreamRequest() {
    final DescribeStreamRequest request = new DescribeStreamRequest();
    return request;
}
public static void main(String[] args) { AmazonKinesisClient kinesisClient = Helper.setupKinesisClient(); // Retrieve the Shards from a Stream DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(Helper.properties().getProperty("kinesisStreamName")); DescribeStreamResult describeStreamResult; List<Shard> shards = new ArrayList<>(); String lastShardId = null; do { describeStreamRequest.setExclusiveStartShardId(lastShardId); describeStreamResult = kinesisClient.describeStream(describeStreamRequest); shards.addAll(describeStreamResult.getStreamDescription().getShards()); if (shards.size() > 0) { lastShardId = shards.get(shards.size() - 1).getShardId(); } } while (describeStreamResult.getStreamDescription().getHasMoreShards()); // Get Data from the Shards in a Stream // Hard-coded to use only 1 shard String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(Helper.properties().getProperty("kinesisStreamName")); getShardIteratorRequest.setShardId(shards.get(0).getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator(); // Continuously read data records from shard. List<Record> records; while (true) { // Create new GetRecordsRequest with existing shardIterator. // Set maximum records to return to 1000. GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(1000); GetRecordsResult result = kinesisClient.getRecords(getRecordsRequest); // Put result into record list. Result may be empty. 
records = result.getRecords(); // Print records for (Record record : records) { ByteBuffer byteBuffer = record.getData(); System.out.println(String.format("Seq No: %s - %s", record.getSequenceNumber(), new String(byteBuffer.array()))); } try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); } }
/**
 * Supplies a {@link DescribeStreamRequest}.
 *
 * <p>NOTE(review): whether each call yields a new instance, and which fields are already set
 * on it, is up to the implementation; confirm before reusing the returned request.
 *
 * @return the request object provided by the implementation
 */
DescribeStreamRequest getDescribeStreamRequest();