/**
 * Lists every shard of the given stream.
 *
 * <p>Calls {@code kinesis.describeStream} repeatedly, passing the ID of the last shard
 * received so far as the exclusive start shard, until the service reports that there are
 * no more shards.
 *
 * @param streamName name of the stream whose shards are listed
 * @return all shards collected across the pages; empty if the service returned none
 * @throws TransientKinesisException declared by the signature; presumably raised by
 *     {@code wrapExceptions} (not visible here)
 */
public List<Shard> listShards(final String streamName) throws TransientKinesisException {
    return wrapExceptions(new Callable<List<Shard>>() {
        @Override
        public List<Shard> call() throws Exception {
            List<Shard> shards = Lists.newArrayList();
            String lastShardId = null;
            StreamDescription description;
            do {
                description = kinesis.describeStream(streamName, lastShardId)
                    .getStreamDescription();
                List<Shard> page = description.getShards();
                if (page == null || page.isEmpty()) {
                    // An empty page yields no new cursor: indexing it would throw, and
                    // re-sending the previous cursor could loop forever.
                    break;
                }
                shards.addAll(page);
                lastShardId = page.get(page.size() - 1).getShardId();
                // getHasMoreShards() returns a boxed Boolean; treat null as "no more pages"
                // instead of failing on auto-unboxing.
            } while (Boolean.TRUE.equals(description.getHasMoreShards()));
            return shards;
        }
    });
}
/**
 * Verifies that listShards follows pagination: the first page (two shards, more to come)
 * is requested with a null start shard, the second page with the ID of the last shard
 * from the first page.
 */
@Test
public void shouldListAllShards() throws Exception {
    Shard shard1 = new Shard().withShardId(SHARD_1);
    Shard shard2 = new Shard().withShardId(SHARD_2);
    Shard shard3 = new Shard().withShardId(SHARD_3);

    // First page: two shards, with the service signalling that more follow.
    DescribeStreamResult firstPage = new DescribeStreamResult()
        .withStreamDescription(new StreamDescription()
            .withShards(shard1, shard2)
            .withHasMoreShards(true));
    // Second (final) page: the remaining shard.
    DescribeStreamResult lastPage = new DescribeStreamResult()
        .withStreamDescription(new StreamDescription()
            .withShards(shard3)
            .withHasMoreShards(false));

    given(kinesis.describeStream(STREAM, null)).willReturn(firstPage);
    given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(lastPage);

    List<Shard> shards = underTest.listShards(STREAM);

    assertThat(shards).containsOnly(shard1, shard2, shard3);
}
/**
 * Fake DescribeStream that returns at most one shard per call.
 *
 * <p>Shard IDs are decimal indices into {@code shardedData}. A null
 * {@code exclusiveStartShardId} starts at index 0; otherwise the shard after the given one
 * is returned. {@code hasMoreShards} is true while a further index exists beyond the one
 * returned.
 *
 * @param streamName name echoed back in the stream description
 * @param exclusiveStartShardId numeric ID of the previously returned shard, or null
 * @return a result with HTTP status 200 metadata and the one-shard (or empty) page
 */
@Override
public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) {
    final int nextShardId =
        exclusiveStartShardId == null ? 0 : parseInt(exclusiveStartShardId) + 1;
    final boolean hasMoreShards = nextShardId + 1 < shardedData.size();

    List<Shard> shards = new ArrayList<>();
    if (nextShardId < shardedData.size()) {
        shards.add(new Shard().withShardId(Integer.toString(nextShardId)));
    }

    StreamDescription description = new StreamDescription()
        .withHasMoreShards(hasMoreShards)
        .withShards(shards)
        .withStreamName(streamName);

    HttpResponse response = new HttpResponse(null, null);
    response.setStatusCode(200);

    DescribeStreamResult result = new DescribeStreamResult();
    result.setSdkHttpMetadata(SdkHttpMetadata.from(response));
    result.withStreamDescription(description);
    return result;
}
/**
 * Builds the consumer under test on an endpoint backed by the mocked Kinesis client and
 * stubs the three client calls (getRecords, describeStream, getShardIterator) with fixed
 * responses.
 */
@Before
public void setup() throws Exception {
    KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", component);
    endpoint.setAmazonKinesisClient(kinesisClient);
    endpoint.setIteratorType(ShardIteratorType.LATEST);
    undertest = new KinesisConsumer(endpoint, processor);

    // Canned responses returned by the mocked client for any request.
    GetRecordsResult recordsResult = new GetRecordsResult()
        .withNextShardIterator("nextShardIterator");
    DescribeStreamResult streamResult = new DescribeStreamResult()
        .withStreamDescription(new StreamDescription()
            .withShards(new Shard().withShardId("shardId")));
    GetShardIteratorResult iteratorResult = new GetShardIteratorResult()
        .withShardIterator("shardIterator");

    when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(recordsResult);
    when(kinesisClient.describeStream(any(DescribeStreamRequest.class)))
        .thenReturn(streamResult);
    when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
        .thenReturn(iteratorResult);
}
/**
 * Returns all shards of {@code stream}, polling until the stream is ACTIVE.
 *
 * <p>While the reported status is not ACTIVE the method sleeps 100 ms and retries with the
 * same cursor. Once ACTIVE, pages are accumulated using the last shard ID as the exclusive
 * start shard until the service reports no more shards. There is no upper bound on the
 * number of retries.
 *
 * @param stream name of the stream to describe
 * @return the shards collected across all pages
 * @throws IllegalStateException if the thread is interrupted while waiting; the interrupt
 *     flag is restored first
 */
private List<Shard> describeStream(String stream) {
    AmazonKinesisAsync amazonKinesis = localKinesisResource.getResource();
    DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest()
        .withStreamName(stream);
    List<Shard> shardList = new ArrayList<>();
    String exclusiveStartShardId = null;

    while (true) {
        describeStreamRequest.withExclusiveStartShardId(exclusiveStartShardId);
        DescribeStreamResult describeStreamResult =
            amazonKinesis.describeStream(describeStreamRequest);
        StreamDescription streamDescription = describeStreamResult.getStreamDescription();

        if (StreamStatus.ACTIVE.toString().equals(streamDescription.getStreamStatus())) {
            List<Shard> page = streamDescription.getShards();
            boolean pageEmpty = page == null || page.isEmpty();
            if (!pageEmpty) {
                shardList.addAll(page);
            }
            // getHasMoreShards() is a boxed Boolean: treat null as "no more pages" rather
            // than failing on unboxing. An empty page gives no new cursor, so stop there
            // too instead of indexing an empty list or re-sending the old cursor.
            if (pageEmpty || !Boolean.TRUE.equals(streamDescription.getHasMoreShards())) {
                return shardList;
            }
            exclusiveStartShardId = shardList.get(shardList.size() - 1).getShardId();
            continue;
        }

        // Stream not ACTIVE yet: back off briefly before asking again.
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
/**
 * Builds a single-page DescribeStream result for an ACTIVE stream.
 *
 * @param shards shards to place in the stream description
 * @return a result whose description carries {@code shards}, status ACTIVE and
 *     {@code hasMoreShards == false}
 */
private static DescribeStreamResult describeStreamResultWithShards(List<Shard> shards) {
    StreamDescription description = new StreamDescription()
        .withShards(shards)
        .withStreamStatus(StreamStatus.ACTIVE)
        .withHasMoreShards(Boolean.FALSE);
    DescribeStreamResult result = new DescribeStreamResult();
    return result.withStreamDescription(description);
}
/**
 * Starts the connector against a mocked stream of 50 shards and checks that asking for 50
 * task configs yields 50, and that describeStream was called for the expected stream.
 */
@Test
public void start() {
    final int shardCount = 50;

    // Shards are identified by their zero-padded index: "000" .. "049".
    List<Shard> shards = new ArrayList<>(shardCount);
    for (int index = 0; index < shardCount; index++) {
        shards.add(new Shard().withShardId(String.format("%03d", index)));
    }

    final DescribeStreamResult describeResult = new DescribeStreamResult()
        .withStreamDescription(new StreamDescription()
            .withStreamName(TestData.EXPECTED_STREAM_NAME)
            .withShards(shards));
    when(this.kinesisClient.describeStream(any(DescribeStreamRequest.class)))
        .thenReturn(describeResult);

    this.connector.start(TestData.settings());
    List<Map<String, String>> taskConfigs = this.connector.taskConfigs(shardCount);

    assertEquals(shardCount, taskConfigs.size());

    final DescribeStreamRequest expectedRequest = new DescribeStreamRequest()
        .withStreamName(TestData.EXPECTED_STREAM_NAME);
    verify(this.kinesisClient, atLeastOnce()).describeStream(expectedRequest);
}
public static long getShardCount( ClientConfiguration awsClientConfig, KinesisConfigBean conf, String streamName ) throws StageException { AmazonKinesis kinesisClient = getKinesisClient(awsClientConfig, conf); try { long numShards = 0; String lastShardId = null; StreamDescription description; do { if (lastShardId == null) { description = kinesisClient.describeStream(streamName).getStreamDescription(); } else { description = kinesisClient.describeStream(streamName, lastShardId).getStreamDescription(); } for (Shard shard : description.getShards()) { if (shard.getSequenceNumberRange().getEndingSequenceNumber() == null) { // Then this shard is open, so we should count it. Shards with an ending sequence number // are closed and cannot be written to, so we skip counting them. ++numShards; } } int pageSize = description.getShards().size(); lastShardId = description.getShards().get(pageSize - 1).getShardId(); } while (description.getHasMoreShards()); LOG.debug("Connected successfully to stream: '{}' with '{}' shards.", streamName, numShards); return numShards; } finally { kinesisClient.shutdown(); } }
/**
 * Gets the ID of the last shard in the given stream.
 *
 * <p>In preview mode, the Kinesis source uses the last shard ID to get records from Kinesis.
 * The stream is described page by page, using the last shard ID of each page as the
 * exclusive start shard of the next request. The client created for the call is shut down
 * before returning.
 *
 * @param awsClientConfig generic AWS client configuration
 * @param conf Kinesis configuration passed to {@code getKinesisClient}
 * @param streamName name of the stream to inspect
 * @return the ID of the last shard on the last non-empty page, or {@code null} if the
 *     service returned no shards at all
 * @throws StageException declared by the signature; presumably raised by
 *     {@code getKinesisClient} (not visible here)
 */
public static String getLastShardId(
    ClientConfiguration awsClientConfig,
    KinesisConfigBean conf,
    String streamName
) throws StageException {
    AmazonKinesis kinesisClient = getKinesisClient(awsClientConfig, conf);

    String lastShardId = null;
    try {
        StreamDescription description;
        do {
            if (lastShardId == null) {
                description = kinesisClient.describeStream(streamName).getStreamDescription();
            } else {
                description = kinesisClient.describeStream(streamName, lastShardId).getStreamDescription();
            }

            int pageSize = description.getShards() == null ? 0 : description.getShards().size();
            if (pageSize == 0) {
                // An empty page provides no new cursor; indexing it would throw
                // IndexOutOfBoundsException, so keep the last ID seen and stop.
                break;
            }
            lastShardId = description.getShards().get(pageSize - 1).getShardId();
            // getHasMoreShards() is a boxed Boolean; a null value is treated as "no more pages"
            // rather than triggering a NullPointerException on unboxing.
        } while (Boolean.TRUE.equals(description.getHasMoreShards()));

        return lastShardId;
    } finally {
        kinesisClient.shutdown();
    }
}
@Override public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) throws AmazonServiceException, AmazonClientException { InternalStream theStream = this.getStream(describeStreamRequest.getStreamName()); if (theStream != null) { StreamDescription desc = new StreamDescription(); desc = desc.withStreamName(theStream.getStreamName()).withStreamStatus(theStream.getStreamStatus()).withStreamARN(theStream.getStreamARN()); if (describeStreamRequest.getExclusiveStartShardId() == null || describeStreamRequest.getExclusiveStartShardId().isEmpty()) { desc.setShards(this.getShards(theStream)); desc.setHasMoreShards(false); } else { // Filter from given shard Id, or may not have any more String startId = describeStreamRequest.getExclusiveStartShardId(); desc.setShards(this.getShards(theStream, startId)); desc.setHasMoreShards(false); } DescribeStreamResult result = new DescribeStreamResult(); result = result.withStreamDescription(desc); return result; } else { throw new AmazonClientException("This stream does not exist!"); } }
/**
 * Fetches the current status of a stream via a DescribeStream call.
 *
 * @param streamName name of the stream to query
 * @return the stream status string reported in the stream description
 */
private String checkStreamStatus(String streamName) {
    DescribeStreamRequest request = new DescribeStreamRequest();
    request.setStreamName(streamName);
    return amazonKinesisClient.describeStream(request)
        .getStreamDescription()
        .getStreamStatus();
}