public String getShardIterator(final String streamName, final String shardId, final ShardIteratorType shardIteratorType, final String startingSequenceNumber, final Instant timestamp) throws TransientKinesisException { final Date date = timestamp != null ? timestamp.toDate() : null; return wrapExceptions(new Callable<String>() { @Override public String call() throws Exception { return kinesis.getShardIterator(new GetShardIteratorRequest() .withStreamName(streamName) .withShardId(shardId) .withShardIteratorType(shardIteratorType) .withStartingSequenceNumber(startingSequenceNumber) .withTimestamp(date) ).getShardIterator(); } }); }
private ShardCheckpoint(String streamName, String shardId, ShardIteratorType shardIteratorType, String sequenceNumber, Long subSequenceNumber, Instant timestamp) { this.shardIteratorType = checkNotNull(shardIteratorType, "shardIteratorType"); this.streamName = checkNotNull(streamName, "streamName"); this.shardId = checkNotNull(shardId, "shardId"); if (shardIteratorType == AT_SEQUENCE_NUMBER || shardIteratorType == AFTER_SEQUENCE_NUMBER) { checkNotNull(sequenceNumber, "You must provide sequence number for AT_SEQUENCE_NUMBER" + " or AFTER_SEQUENCE_NUMBER"); } else { checkArgument(sequenceNumber == null, "Sequence number must be null for LATEST, TRIM_HORIZON or AT_TIMESTAMP"); } if (shardIteratorType == AT_TIMESTAMP) { checkNotNull(timestamp, "You must provide timestamp for AT_TIMESTAMP"); } else { checkArgument(timestamp == null, "Timestamp must be null for an iterator type other than AT_TIMESTAMP"); } this.subSequenceNumber = subSequenceNumber; this.sequenceNumber = sequenceNumber; this.timestamp = timestamp; }
@Test public void shouldReturnIteratorStartingWithTimestamp() throws Exception { Instant timestamp = Instant.now(); given(kinesis.getShardIterator(new GetShardIteratorRequest() .withStreamName(STREAM) .withShardId(SHARD_1) .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) .withTimestamp(timestamp.toDate()) )).willReturn(new GetShardIteratorResult() .withShardIterator(SHARD_ITERATOR)); String stream = underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp); assertThat(stream).isEqualTo(SHARD_ITERATOR); }
private void shouldHandleGetShardIteratorError( Exception thrownException, Class<? extends Exception> expectedExceptionClass) { GetShardIteratorRequest request = new GetShardIteratorRequest() .withStreamName(STREAM) .withShardId(SHARD_1) .withShardIteratorType(ShardIteratorType.LATEST); given(kinesis.getShardIterator(request)).willThrow(thrownException); try { underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null); failBecauseExceptionWasNotThrown(expectedExceptionClass); } catch (Exception e) { assertThat(e).isExactlyInstanceOf(expectedExceptionClass); } finally { reset(kinesis); } }
/** * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should * be used for the next call to this method. * * <p>Note: it is important that this method is not called again before all the records from the last result have been * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to * incorrect shard iteration if the iterator had to be refreshed. * * @param shardItr shard iterator to use * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt * @return get records result * @throws InterruptedException */ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException { GetRecordsResult getRecordsResult = null; while (getRecordsResult == null) { try { getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords); // Update millis behind latest so it gets reported by the millisBehindLatest gauge shardMetricsReporter.setMillisBehindLatest(getRecordsResult.getMillisBehindLatest()); } catch (ExpiredIteratorException eiEx) { LOG.warn("Encountered an unexpected expired iterator {} for shard {};" + " refreshing the iterator ...", shardItr, subscribedShard); shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator if (fetchIntervalMillis != 0) { Thread.sleep(fetchIntervalMillis); } } } return getRecordsResult; }
/** * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should * be used for the next call to this method. * * Note: it is important that this method is not called again before all the records from the last result have been * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to * incorrect shard iteration if the iterator had to be refreshed. * * @param shardItr shard iterator to use * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt * @return get records result * @throws InterruptedException */ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException { GetRecordsResult getRecordsResult = null; while (getRecordsResult == null) { try { getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords); } catch (ExpiredIteratorException eiEx) { LOG.warn("Encountered an unexpected expired iterator {} for shard {};" + " refreshing the iterator ...", shardItr, subscribedShard); shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator if (fetchIntervalMillis != 0) { Thread.sleep(fetchIntervalMillis); } } } return getRecordsResult; }
@Before public void setup() throws Exception { KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", component); endpoint.setAmazonKinesisClient(kinesisClient); endpoint.setIteratorType(ShardIteratorType.LATEST); undertest = new KinesisConsumer(endpoint, processor); when(kinesisClient.getRecords(any(GetRecordsRequest.class))) .thenReturn(new GetRecordsResult() .withNextShardIterator("nextShardIterator") ); when(kinesisClient.describeStream(any(DescribeStreamRequest.class))) .thenReturn(new DescribeStreamResult() .withStreamDescription(new StreamDescription() .withShards(new Shard().withShardId("shardId")) ) ); when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))) .thenReturn(new GetShardIteratorResult() .withShardIterator("shardIterator") ); }
@Test public void itObtainsAShardIteratorOnFirstPollForSequenceNumber() throws Exception { undertest.getEndpoint().setSequenceNumber("12345"); undertest.getEndpoint().setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER); undertest.poll(); final ArgumentCaptor<DescribeStreamRequest> describeStreamReqCap = ArgumentCaptor.forClass(DescribeStreamRequest.class); final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap = ArgumentCaptor.forClass(GetShardIteratorRequest.class); verify(kinesisClient).describeStream(describeStreamReqCap.capture()); assertThat(describeStreamReqCap.getValue().getStreamName(), is("streamName")); verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture()); assertThat(getShardIteratorReqCap.getValue().getStreamName(), is("streamName")); assertThat(getShardIteratorReqCap.getValue().getShardId(), is("shardId")); assertThat(getShardIteratorReqCap.getValue().getShardIteratorType(), is("AFTER_SEQUENCE_NUMBER")); assertThat(getShardIteratorReqCap.getValue().getStartingSequenceNumber(), is("12345")); }
@Test public void allTheEndpointParams() throws Exception { KinesisEndpoint endpoint = (KinesisEndpoint) camelContext.getEndpoint("aws-kinesis://some_stream_name" + "?amazonKinesisClient=#kinesisClient" + "&maxResultsPerRequest=101" + "&iteratorType=latest" + "&shardId=abc" + "&sequenceNumber=123" ); assertThat(endpoint.getClient(), is(amazonKinesisClient)); assertThat(endpoint.getStreamName(), is("some_stream_name")); assertThat(endpoint.getIteratorType(), is(ShardIteratorType.LATEST)); assertThat(endpoint.getMaxResultsPerRequest(), is(101)); assertThat(endpoint.getSequenceNumber(), is("123")); assertThat(endpoint.getShardId(), is("abc")); }
@Test public void testAutoCreateStreamForNonExistingStream() throws Exception { KinesisTestBinder binder = getBinder(); DirectChannel output = createBindableChannel("output", new BindingProperties()); ExtendedConsumerProperties<KinesisConsumerProperties> consumerProperties = createConsumerProperties(); Date testDate = new Date(); consumerProperties.getExtension() .setShardIteratorType(ShardIteratorType.AT_TIMESTAMP.name() + ":" + testDate.getTime()); String testStreamName = "nonexisting" + System.currentTimeMillis(); Binding<?> binding = binder.bindConsumer(testStreamName, "test", output, consumerProperties); binding.unbind(); DescribeStreamResult streamResult = localKinesisResource.getResource().describeStream(testStreamName); String createdStreamName = streamResult.getStreamDescription().getStreamName(); int createdShards = streamResult.getStreamDescription().getShards().size(); String createdStreamStatus = streamResult.getStreamDescription().getStreamStatus(); assertThat(createdStreamName).isEqualTo(testStreamName); assertThat(createdShards) .isEqualTo(consumerProperties.getInstanceCount() * consumerProperties.getConcurrency()); assertThat(createdStreamStatus).isEqualTo(StreamStatus.ACTIVE.toString()); KinesisShardOffset shardOffset = TestUtils.getPropertyValue(binding, "lifecycle.streamInitialSequence", KinesisShardOffset.class); assertThat(shardOffset.getIteratorType()).isEqualTo(ShardIteratorType.AT_TIMESTAMP); assertThat(shardOffset.getTimestamp()).isEqualTo(testDate); }
@Override public void start(Map<String, String> settings) { this.config = new KinesisSourceConnectorConfig(settings); this.kinesisClient = this.kinesisClientFactory.create(this.config); this.sourcePartition = ImmutableMap.of(RecordConverter.FIELD_SHARD_ID, this.config.kinesisShardId); Map<String, Object> lastOffset = this.context.offsetStorageReader().offset(this.sourcePartition); GetShardIteratorRequest shardIteratorRequest = new GetShardIteratorRequest() .withShardId(this.config.kinesisShardId) .withStreamName(this.config.kinesisStreamName); if (null != lastOffset && !lastOffset.isEmpty()) { String startingSequenceNumber = (String) lastOffset.get(RecordConverter.FIELD_SEQUENCE_NUMBER); log.info("start() - Starting iterator after last processed sequence number of '{}'", startingSequenceNumber); shardIteratorRequest.setShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER); shardIteratorRequest.setStartingSequenceNumber(startingSequenceNumber); } else { log.info("start() - Setting Shard Iterator Type to {} for {}", this.config.kinesisPosition, this.config.kinesisShardId); shardIteratorRequest.setShardIteratorType(this.config.kinesisPosition); } GetShardIteratorResult shardIteratorResult = this.kinesisClient.getShardIterator(shardIteratorRequest); log.info("start() - Using Shard Iterator {}", shardIteratorResult.getShardIterator()); this.recordsRequest = new GetRecordsRequest() .withLimit(this.config.kinesisRecordLimit) .withShardIterator(shardIteratorResult.getShardIterator()); this.recordConverter = new RecordConverter(this.config); }
public KinesisSourceConnectorConfig(Map<String, String> parsedConfig) { super(config(), parsedConfig); this.awsAccessKeyId = this.getString(AWS_ACCESS_KEY_ID_CONF); this.awsSecretKeyId = this.getPassword(AWS_SECRET_KEY_ID_CONF).value(); this.kafkaTopic = this.getString(TOPIC_CONF); this.kinesisStreamName = this.getString(STREAM_NAME_CONF); this.kinesisPosition = ConfigUtils.getEnum(ShardIteratorType.class, this, KINESIS_POSISTION_CONF); this.kinesisRegion = ConfigUtils.getEnum(Regions.class, this, KINESIS_REGION_CONF); this.kinesisShardId = this.getString(KINESIS_SHARD_ID_CONF); this.kinesisRecordLimit = this.getInt(KINESIS_RECORD_LIMIT_CONF); this.kinesisEmptyRecordsBackoffMs = this.getLong(KINESIS_EMPTY_RECORDS_BACKOFF_MS_CONF); this.kinesisThroughputExceededBackoffMs = this.getLong(KINESIS_THROUGHPUT_EXCEEDED_BACKOFF_MS_CONF); }
public static ConfigDef config() { return new ConfigDef() .define(AWS_ACCESS_KEY_ID_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, AWS_ACCESS_KEY_ID_DOC) .define(AWS_SECRET_KEY_ID_CONF, ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, AWS_SECRET_KEY_ID_DOC) .define(TOPIC_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC) .define(STREAM_NAME_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, STREAM_NAME_DOC) .define(KINESIS_POSISTION_CONF, ConfigDef.Type.STRING, ShardIteratorType.TRIM_HORIZON.toString(), ValidEnum.of(ShardIteratorType.class), ConfigDef.Importance.MEDIUM, KINESIS_POSISTION_DOC) .define(KINESIS_REGION_CONF, ConfigDef.Type.STRING, Regions.US_EAST_1.toString(), ValidEnum.of(Regions.class), ConfigDef.Importance.MEDIUM, KINESIS_REGION_DOC) .define(KINESIS_SHARD_ID_CONF, ConfigDef.Type.STRING, ".*", ConfigDef.Importance.HIGH, KINESIS_SHARD_ID_DOC) .define(KINESIS_RECORD_LIMIT_CONF, ConfigDef.Type.INT, 500, ConfigDef.Range.between(1, 10000), ConfigDef.Importance.MEDIUM, KINESIS_RECORD_LIMIT_DOC) .define(KINESIS_EMPTY_RECORDS_BACKOFF_MS_CONF, ConfigDef.Type.LONG, 5000L, ConfigDef.Range.between(500, Integer.MAX_VALUE), ConfigDef.Importance.MEDIUM, KINESIS_EMPTY_RECORDS_BACKOFF_MS_DOC) .define(KINESIS_THROUGHPUT_EXCEEDED_BACKOFF_MS_CONF, ConfigDef.Type.LONG, 10 * 1000L, ConfigDef.Range.between(500, Integer.MAX_VALUE), ConfigDef.Importance.MEDIUM, KINESIS_THROUGHPUT_EXCEEDED_BACKOFF_MS_DOC); }
@Test public void sourceOffsets() throws InterruptedException { final String SEQUENCE_NUMBER = "asdfasdfddsa"; Map<String, Object> sourceOffset = ImmutableMap.of(RecordConverter.FIELD_SEQUENCE_NUMBER, SEQUENCE_NUMBER); when(this.offsetStorageReader.offset(anyMap())).thenReturn(sourceOffset); when(this.kinesisClient.getShardIterator(any())).thenReturn( new GetShardIteratorResult().withShardIterator("dfasdfsadfasdf") ); this.task.start(settings); GetRecordsResult recordsResult = new GetRecordsResult() .withNextShardIterator("dsfargadsfasdfasda") .withRecords(TestData.record()) .withMillisBehindLatest(0L); when(this.kinesisClient.getRecords(any())).thenReturn(recordsResult); List<SourceRecord> records = this.task.poll(); assertNotNull(records, "records should not be null."); assertFalse(records.isEmpty(), "records should not be empty."); verify(this.offsetStorageReader, atLeastOnce()).offset(anyMap()); GetShardIteratorRequest expectedIteratorRequest = new GetShardIteratorRequest() .withShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER) .withShardId(this.config.kinesisShardId) .withStreamName(this.config.kinesisStreamName) .withStartingSequenceNumber(SEQUENCE_NUMBER); verify(this.kinesisClient, atLeastOnce()).getShardIterator(expectedIteratorRequest); }
@Test public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception { given(kinesis.getShardIterator(new GetShardIteratorRequest() .withStreamName(STREAM) .withShardId(SHARD_1) .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) .withStartingSequenceNumber(SEQUENCE_NUMBER) )).willReturn(new GetShardIteratorResult() .withShardIterator(SHARD_ITERATOR)); String stream = underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null); assertThat(stream).isEqualTo(SHARD_ITERATOR); }
@Override public GetShardIteratorResult getShardIterator( GetShardIteratorRequest getShardIteratorRequest) { ShardIteratorType shardIteratorType = ShardIteratorType.fromValue( getShardIteratorRequest.getShardIteratorType()); String shardIterator; if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) { shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0); } else { throw new RuntimeException("Not implemented"); } return new GetShardIteratorResult().withShardIterator(shardIterator); }
/** * {@inheritDoc} */ @Override public String getShardIterator(StreamShardHandle shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException { GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest() .withStreamName(shard.getStreamName()) .withShardId(shard.getShard().getShardId()) .withShardIteratorType(shardIteratorType); switch (ShardIteratorType.fromValue(shardIteratorType)) { case TRIM_HORIZON: case LATEST: break; case AT_TIMESTAMP: if (startingMarker instanceof Date) { getShardIteratorRequest.setTimestamp((Date) startingMarker); } else { throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_TIMESTAMP. Must be a Date object."); } break; case AT_SEQUENCE_NUMBER: case AFTER_SEQUENCE_NUMBER: if (startingMarker instanceof String) { getShardIteratorRequest.setStartingSequenceNumber((String) startingMarker); } else { throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER. Must be a String."); } } return getShardIterator(getShardIteratorRequest); }
@Override protected void doStart() throws Exception { if ((iteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || iteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) && sequenceNumber.isEmpty()) { throw new IllegalArgumentException("Sequence Number must be specified with iterator Types AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER"); } super.doStart(); }
@Test public void onlyRequiredEndpointParams() throws Exception { KinesisEndpoint endpoint = (KinesisEndpoint) camelContext.getEndpoint("aws-kinesis://some_stream_name" + "?amazonKinesisClient=#kinesisClient" ); assertThat(endpoint.getClient(), is(amazonKinesisClient)); assertThat(endpoint.getStreamName(), is("some_stream_name")); assertThat(endpoint.getIteratorType(), is(ShardIteratorType.TRIM_HORIZON)); assertThat(endpoint.getMaxResultsPerRequest(), is(1)); }
@Test public void afterSequenceNumberRequiresSequenceNumber() throws Exception { KinesisEndpoint endpoint = (KinesisEndpoint) camelContext.getEndpoint("aws-kinesis://some_stream_name" + "?amazonKinesisClient=#kinesisClient" + "&iteratorType=AFTER_SEQUENCE_NUMBER" + "&shardId=abc" + "&sequenceNumber=123" ); assertThat(endpoint.getClient(), is(amazonKinesisClient)); assertThat(endpoint.getStreamName(), is("some_stream_name")); assertThat(endpoint.getIteratorType(), is(ShardIteratorType.AFTER_SEQUENCE_NUMBER)); assertThat(endpoint.getShardId(), is("abc")); assertThat(endpoint.getSequenceNumber(), is("123")); }
@Test public void atSequenceNumberRequiresSequenceNumber() throws Exception { KinesisEndpoint endpoint = (KinesisEndpoint) camelContext.getEndpoint("aws-kinesis://some_stream_name" + "?amazonKinesisClient=#kinesisClient" + "&iteratorType=AT_SEQUENCE_NUMBER" + "&shardId=abc" + "&sequenceNumber=123" ); assertThat(endpoint.getClient(), is(amazonKinesisClient)); assertThat(endpoint.getStreamName(), is("some_stream_name")); assertThat(endpoint.getIteratorType(), is(ShardIteratorType.AT_SEQUENCE_NUMBER)); assertThat(endpoint.getShardId(), is("abc")); assertThat(endpoint.getSequenceNumber(), is("123")); }
/** * Get the records from the particular shard * @param streamName Name of the stream from where the records to be accessed * @param recordsLimit Number of records to return from shard * @param shId Shard Id of the shard * @param iteratorType Shard iterator type * @param seqNo Record sequence number * @return the list of records from the given shard * @throws AmazonClientException */ public List<Record> getRecords(String streamName, Integer recordsLimit, String shId, ShardIteratorType iteratorType, String seqNo) throws AmazonClientException { assert client != null : "Illegal client"; try { // Create the GetShardIteratorRequest instance and sets streamName, shardId and iteratorType to it GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest(); iteratorRequest.setStreamName(streamName); iteratorRequest.setShardId(shId); iteratorRequest.setShardIteratorType(iteratorType); // If the iteratorType is AFTER_SEQUENCE_NUMBER, set the sequence No to the iteratorRequest if (ShardIteratorType.AFTER_SEQUENCE_NUMBER.equals(iteratorType) || ShardIteratorType.AT_SEQUENCE_NUMBER.equals(iteratorType)) { iteratorRequest.setStartingSequenceNumber(seqNo); } // Get the Response from the getShardIterator service method & get the shardIterator from that response GetShardIteratorResult iteratorResponse = client.getShardIterator(iteratorRequest); // getShardIterator() specifies the position in the shard String iterator = iteratorResponse.getShardIterator(); // Create the GetRecordsRequest instance and set the recordsLimit and iterator GetRecordsRequest getRequest = new GetRecordsRequest(); getRequest.setLimit(recordsLimit); getRequest.setShardIterator(iterator); // Get the Response from the getRecords service method and get the data records from that response. GetRecordsResult getResponse = client.getRecords(getRequest); return getResponse.getRecords(); } catch (AmazonClientException e) { throw new RuntimeException(e); } }
/** * This method returns the iterator type of the given shard */ public ShardIteratorType getIteratorType(String shardId) { if (shardPosition.containsKey(shardId)) { return ShardIteratorType.AFTER_SEQUENCE_NUMBER; } return initialOffset.equalsIgnoreCase("earliest") ? ShardIteratorType.TRIM_HORIZON : ShardIteratorType.LATEST; }
@Override protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<KinesisConsumerProperties> properties) { KinesisConsumerProperties kinesisConsumerProperties = properties.getExtension(); Set<KinesisShardOffset> shardOffsets = null; String shardIteratorType = kinesisConsumerProperties.getShardIteratorType(); KinesisShardOffset kinesisShardOffset = KinesisShardOffset.latest(); if (StringUtils.hasText(shardIteratorType)) { String[] typeValue = shardIteratorType.split(":", 2); ShardIteratorType iteratorType = ShardIteratorType.valueOf(typeValue[0]); kinesisShardOffset = new KinesisShardOffset(iteratorType); if (typeValue.length > 1) { if (ShardIteratorType.AT_TIMESTAMP.equals(iteratorType)) { kinesisShardOffset.setTimestamp(new Date(Long.parseLong(typeValue[1]))); } else { kinesisShardOffset.setSequenceNumber(typeValue[1]); } } } if (properties.getInstanceCount() > 1) { shardOffsets = new HashSet<>(); KinesisConsumerDestination kinesisConsumerDestination = (KinesisConsumerDestination) destination; List<Shard> shards = kinesisConsumerDestination.getShards(); for (int i = 0; i < shards.size(); i++) { // divide shards across instances if ((i % properties.getInstanceCount()) == properties.getInstanceIndex()) { KinesisShardOffset shardOffset = new KinesisShardOffset(kinesisShardOffset); shardOffset.setStream(destination.getName()); shardOffset.setShard(shards.get(i).getShardId()); shardOffsets.add(shardOffset); } } } KinesisMessageDrivenChannelAdapter adapter; if (shardOffsets == null) { adapter = new KinesisMessageDrivenChannelAdapter(this.amazonKinesis, destination.getName()); } else { adapter = new KinesisMessageDrivenChannelAdapter(this.amazonKinesis, shardOffsets.toArray(new KinesisShardOffset[shardOffsets.size()])); } boolean anonymous = !StringUtils.hasText(group); String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group; adapter.setConsumerGroup(consumerGroup); adapter.setStreamInitialSequence( anonymous || StringUtils.hasText(shardIteratorType) ? kinesisShardOffset : KinesisShardOffset.trimHorizon()); adapter.setListenerMode(kinesisConsumerProperties.getListenerMode()); adapter.setCheckpointMode(kinesisConsumerProperties.getCheckpointMode()); adapter.setRecordsLimit(kinesisConsumerProperties.getRecordsLimit()); adapter.setIdleBetweenPolls(kinesisConsumerProperties.getIdleBetweenPolls()); adapter.setConsumerBackoff(kinesisConsumerProperties.getConsumerBackoff()); if (this.checkpointStore != null) { adapter.setCheckpointStore(this.checkpointStore); } adapter.setConcurrency(properties.getConcurrency()); adapter.setStartTimeout(kinesisConsumerProperties.getStartTimeout()); adapter.setDescribeStreamBackoff(this.configurationProperties.getDescribeStreamBackoff()); adapter.setDescribeStreamRetries(this.configurationProperties.getDescribeStreamRetries()); // Deffer byte[] conversion to the ReceivingHandler adapter.setConverter(null); return adapter; }
public ShardCheckpoint(String streamName, String shardId, StartingPoint startingPoint) { this(streamName, shardId, ShardIteratorType.fromValue(startingPoint.getPositionName()), startingPoint.getTimestamp()); }
public ShardCheckpoint(String streamName, String shardId, ShardIteratorType shardIteratorType, Instant timestamp) { this(streamName, shardId, shardIteratorType, null, null, timestamp); }
public ShardCheckpoint(String streamName, String shardId, ShardIteratorType shardIteratorType, String sequenceNumber, Long subSequenceNumber) { this(streamName, shardId, shardIteratorType, sequenceNumber, subSequenceNumber, null); }
public String getPositionName() { return position != null ? position.name() : ShardIteratorType.AT_TIMESTAMP.name(); }
private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, String sequenceNumber, Long subSequenceNumber) { return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber, subSequenceNumber); }
private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) { return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp); }
public ShardIteratorType getIteratorType() { return iteratorType; }
public void setIteratorType(ShardIteratorType iteratorType) { this.iteratorType = iteratorType; }
private boolean hasSequenceNumber() { return !getEndpoint().getSequenceNumber().isEmpty() && (getEndpoint().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || getEndpoint().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER)); }