Java 类com.amazonaws.services.kinesis.model.ShardIteratorType 实例源码

项目:beam    文件:SimplifiedKinesisClient.java   
/**
 * Asks Kinesis for a shard iterator on the given stream and shard.
 *
 * <p>The optional {@code timestamp} is converted to a {@link Date} (or left {@code null})
 * before the call, and the actual request is executed inside {@code wrapExceptions}.
 *
 * @param streamName name of the stream
 * @param shardId id of the shard within the stream
 * @param shardIteratorType iterator type to request
 * @param startingSequenceNumber sequence number passed to the request, may be {@code null}
 * @param timestamp timestamp passed to the request, may be {@code null}
 * @return the shard iterator contained in the Kinesis response
 * @throws TransientKinesisException as declared; presumably raised by {@code wrapExceptions}
 */
public String getShardIterator(final String streamName, final String shardId,
    final ShardIteratorType shardIteratorType,
    final String startingSequenceNumber, final Instant timestamp)
    throws TransientKinesisException {
  final Date date;
  if (timestamp == null) {
    date = null;
  } else {
    date = timestamp.toDate();
  }

  return wrapExceptions(new Callable<String>() {

    @Override
    public String call() throws Exception {
      final GetShardIteratorRequest request = new GetShardIteratorRequest()
          .withStreamName(streamName)
          .withShardId(shardId)
          .withShardIteratorType(shardIteratorType)
          .withStartingSequenceNumber(startingSequenceNumber)
          .withTimestamp(date);
      return kinesis.getShardIterator(request).getShardIterator();
    }
  });
}
项目:beam    文件:ShardCheckpoint.java   
private ShardCheckpoint(String streamName, String shardId, ShardIteratorType shardIteratorType,
    String sequenceNumber, Long subSequenceNumber, Instant timestamp) {
  this.shardIteratorType = checkNotNull(shardIteratorType, "shardIteratorType");
  this.streamName = checkNotNull(streamName, "streamName");
  this.shardId = checkNotNull(shardId, "shardId");
  if (shardIteratorType == AT_SEQUENCE_NUMBER || shardIteratorType == AFTER_SEQUENCE_NUMBER) {
    checkNotNull(sequenceNumber,
        "You must provide sequence number for AT_SEQUENCE_NUMBER"
            + " or AFTER_SEQUENCE_NUMBER");
  } else {
    checkArgument(sequenceNumber == null,
        "Sequence number must be null for LATEST, TRIM_HORIZON or AT_TIMESTAMP");
  }
  if (shardIteratorType == AT_TIMESTAMP) {
    checkNotNull(timestamp,
        "You must provide timestamp for AT_TIMESTAMP");
  } else {
    checkArgument(timestamp == null,
        "Timestamp must be null for an iterator type other than AT_TIMESTAMP");
  }

  this.subSequenceNumber = subSequenceNumber;
  this.sequenceNumber = sequenceNumber;
  this.timestamp = timestamp;
}
项目:beam    文件:SimplifiedKinesisClientTest.java   
/**
 * Verifies that a timestamp-based iterator request is forwarded to Kinesis and that the
 * iterator from the response is returned.
 *
 * <p>The request uses {@code AT_TIMESTAMP}, the iterator type that pairs with a timestamp;
 * the stubbed request and the call under test must stay in sync for the stub to match.
 */
@Test
public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
  Instant timestamp = Instant.now();
  given(kinesis.getShardIterator(new GetShardIteratorRequest()
      .withStreamName(STREAM)
      .withShardId(SHARD_1)
      .withShardIteratorType(ShardIteratorType.AT_TIMESTAMP)
      .withTimestamp(timestamp.toDate())
  )).willReturn(new GetShardIteratorResult()
      .withShardIterator(SHARD_ITERATOR));

  String stream = underTest.getShardIterator(STREAM, SHARD_1,
      ShardIteratorType.AT_TIMESTAMP, null, timestamp);

  assertThat(stream).isEqualTo(SHARD_ITERATOR);
}
项目:beam    文件:SimplifiedKinesisClientTest.java   
/**
 * Shared helper: stubs the Kinesis client to throw {@code thrownException} for a
 * {@code LATEST} iterator request and asserts that the client under test surfaces exactly
 * {@code expectedExceptionClass}. The mock is reset afterwards in all cases.
 */
private void shouldHandleGetShardIteratorError(
    Exception thrownException,
    Class<? extends Exception> expectedExceptionClass) {
  given(kinesis.getShardIterator(
      new GetShardIteratorRequest()
          .withStreamName(STREAM)
          .withShardId(SHARD_1)
          .withShardIteratorType(ShardIteratorType.LATEST)))
      .willThrow(thrownException);

  try {
    underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null);
    failBecauseExceptionWasNotThrown(expectedExceptionClass);
  } catch (Exception caught) {
    assertThat(caught).isExactlyInstanceOf(expectedExceptionClass);
  } finally {
    reset(kinesis);
  }
}
项目:flink    文件:ShardConsumer.java   
/**
 * Fetches records through {@link KinesisProxyInterface#getRecords(String, int)}, retrying with a
 * refreshed iterator whenever AWS reports an {@link ExpiredIteratorException}. The loop only ends
 * once a non-null {@link GetRecordsResult} has been obtained; callers should use the shard
 * iterator carried by that result for their next call.
 *
 * <p>Note: do not call this method again before every record of the previous result has been
 * collected via {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}.
 * Otherwise {@link ShardConsumer#lastSequenceNum} may point at a sub-record in the middle of an
 * aggregated record, and a refreshed iterator would resume from the wrong position.
 *
 * @param shardItr shard iterator to use
 * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
 * @return get records result
 * @throws InterruptedException if interrupted, e.g. while sleeping before a retry
 */
private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
    GetRecordsResult fetched = null;
    do {
        try {
            fetched = kinesis.getRecords(shardItr, maxNumberOfRecords);

            // Feed the millisBehindLatest gauge with the value from this fetch
            shardMetricsReporter.setMillisBehindLatest(fetched.getMillisBehindLatest());
        } catch (ExpiredIteratorException eiEx) {
            LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
                " refreshing the iterator ...", shardItr, subscribedShard);

            // Resume right after the last sequence number that was recorded
            final String refreshType = ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString();
            shardItr = kinesis.getShardIterator(subscribedShard, refreshType, lastSequenceNum.getSequenceNumber());

            // Honour the fetch interval before retrying with the refreshed iterator
            if (fetchIntervalMillis != 0) {
                Thread.sleep(fetchIntervalMillis);
            }
        }
    } while (fetched == null);
    return fetched;
}
项目:flink    文件:ShardConsumer.java   
/**
 * Fetches records through {@link KinesisProxyInterface#getRecords(String, int)}, retrying with a
 * refreshed iterator whenever AWS reports an {@link ExpiredIteratorException}. The loop only ends
 * once a non-null {@link GetRecordsResult} has been obtained; callers should use the shard
 * iterator carried by that result for their next call.
 *
 * <p>Note: do not call this method again before every record of the previous result has been
 * collected via {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}.
 * Otherwise {@link ShardConsumer#lastSequenceNum} may point at a sub-record in the middle of an
 * aggregated record, and a refreshed iterator would resume from the wrong position.
 *
 * @param shardItr shard iterator to use
 * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
 * @return get records result
 * @throws InterruptedException if interrupted, e.g. while sleeping before a retry
 */
private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
    GetRecordsResult fetched = null;
    do {
        try {
            fetched = kinesis.getRecords(shardItr, maxNumberOfRecords);
        } catch (ExpiredIteratorException eiEx) {
            LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
                " refreshing the iterator ...", shardItr, subscribedShard);

            // Resume right after the last sequence number that was recorded
            final String refreshType = ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString();
            shardItr = kinesis.getShardIterator(subscribedShard, refreshType, lastSequenceNum.getSequenceNumber());

            // Honour the fetch interval before retrying with the refreshed iterator
            if (fetchIntervalMillis != 0) {
                Thread.sleep(fetchIntervalMillis);
            }
        }
    } while (fetched == null);
    return fetched;
}
项目:Camel    文件:KinesisConsumerTest.java   
/**
 * Builds the consumer under test on a {@code LATEST} endpoint and stubs the Kinesis client so
 * that describeStream yields a single shard ("shardId"), getShardIterator yields
 * "shardIterator" and getRecords yields a result whose next iterator is "nextShardIterator".
 */
@Before
public void setup() throws Exception {
    KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", component);
    endpoint.setAmazonKinesisClient(kinesisClient);
    endpoint.setIteratorType(ShardIteratorType.LATEST);
    undertest = new KinesisConsumer(endpoint, processor);

    final GetRecordsResult recordsResult = new GetRecordsResult()
        .withNextShardIterator("nextShardIterator");
    when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(recordsResult);

    final Shard onlyShard = new Shard().withShardId("shardId");
    final StreamDescription description = new StreamDescription().withShards(onlyShard);
    final DescribeStreamResult describeResult = new DescribeStreamResult()
        .withStreamDescription(description);
    when(kinesisClient.describeStream(any(DescribeStreamRequest.class))).thenReturn(describeResult);

    final GetShardIteratorResult iteratorResult = new GetShardIteratorResult()
        .withShardIterator("shardIterator");
    when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(iteratorResult);
}
项目:Camel    文件:KinesisConsumerTest.java   
/**
 * With iterator type {@code AFTER_SEQUENCE_NUMBER} and sequence number "12345" configured, the
 * first poll must describe the stream and then request a shard iterator carrying that type and
 * sequence number.
 */
@Test
public void itObtainsAShardIteratorOnFirstPollForSequenceNumber() throws Exception {
    undertest.getEndpoint().setSequenceNumber("12345");
    undertest.getEndpoint().setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);

    undertest.poll();

    final ArgumentCaptor<DescribeStreamRequest> describeStreamReqCap = ArgumentCaptor.forClass(DescribeStreamRequest.class);
    final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap = ArgumentCaptor.forClass(GetShardIteratorRequest.class);

    verify(kinesisClient).describeStream(describeStreamReqCap.capture());
    final DescribeStreamRequest describeRequest = describeStreamReqCap.getValue();
    assertThat(describeRequest.getStreamName(), is("streamName"));

    verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());
    final GetShardIteratorRequest iteratorRequest = getShardIteratorReqCap.getValue();
    assertThat(iteratorRequest.getStreamName(), is("streamName"));
    assertThat(iteratorRequest.getShardId(), is("shardId"));
    assertThat(iteratorRequest.getShardIteratorType(), is("AFTER_SEQUENCE_NUMBER"));
    assertThat(iteratorRequest.getStartingSequenceNumber(), is("12345"));

}
项目:Camel    文件:KinesisEndpointTest.java   
/**
 * Resolves an endpoint URI that sets every supported option and checks that each value is
 * reflected by the corresponding endpoint getter.
 */
@Test
public void allTheEndpointParams() throws Exception {
    final String uri = "aws-kinesis://some_stream_name"
            + "?amazonKinesisClient=#kinesisClient"
            + "&maxResultsPerRequest=101"
            + "&iteratorType=latest"
            + "&shardId=abc"
            + "&sequenceNumber=123";
    KinesisEndpoint endpoint = (KinesisEndpoint) camelContext.getEndpoint(uri);

    assertThat(endpoint.getClient(), is(amazonKinesisClient));
    assertThat(endpoint.getStreamName(), is("some_stream_name"));
    assertThat(endpoint.getIteratorType(), is(ShardIteratorType.LATEST));
    assertThat(endpoint.getMaxResultsPerRequest(), is(101));
    assertThat(endpoint.getSequenceNumber(), is("123"));
    assertThat(endpoint.getShardId(), is("abc"));
}
项目:spring-cloud-stream-binder-aws-kinesis    文件:KinesisBinderTests.java   
/**
 * Binds a consumer to a stream name that does not exist yet, using an
 * {@code AT_TIMESTAMP:<millis>} iterator specification, and then checks that the stream was
 * created, is ACTIVE, has {@code instanceCount * concurrency} shards, and that the binding's
 * initial sequence carries the AT_TIMESTAMP type and the same date.
 */
@Test
public void testAutoCreateStreamForNonExistingStream() throws Exception {
    KinesisTestBinder kinesisBinder = getBinder();
    DirectChannel outputChannel = createBindableChannel("output", new BindingProperties());
    ExtendedConsumerProperties<KinesisConsumerProperties> props = createConsumerProperties();
    Date startDate = new Date();

    // Iterator spec format: "<TYPE>:<value>", here the epoch millis of startDate
    String iteratorSpec = ShardIteratorType.AT_TIMESTAMP.name() + ":" + startDate.getTime();
    props.getExtension().setShardIteratorType(iteratorSpec);

    String streamName = "nonexisting" + System.currentTimeMillis();
    Binding<?> consumerBinding = kinesisBinder.bindConsumer(streamName, "test", outputChannel, props);
    consumerBinding.unbind();

    DescribeStreamResult described = localKinesisResource.getResource().describeStream(streamName);

    assertThat(described.getStreamDescription().getStreamName()).isEqualTo(streamName);
    assertThat(described.getStreamDescription().getShards().size())
            .isEqualTo(props.getInstanceCount() * props.getConcurrency());
    assertThat(described.getStreamDescription().getStreamStatus())
            .isEqualTo(StreamStatus.ACTIVE.toString());

    KinesisShardOffset initialOffset =
            TestUtils.getPropertyValue(consumerBinding, "lifecycle.streamInitialSequence", KinesisShardOffset.class);
    assertThat(initialOffset.getIteratorType()).isEqualTo(ShardIteratorType.AT_TIMESTAMP);
    assertThat(initialOffset.getTimestamp()).isEqualTo(startDate);
}
项目:kafka-connect-kinesis    文件:KinesisSourceTask.java   
/**
 * Initialises the task: builds the config and Kinesis client, looks up the stored offset for
 * this shard, obtains a shard iterator and prepares the records request and record converter.
 *
 * <p>If a non-empty stored offset exists, iteration resumes {@code AFTER_SEQUENCE_NUMBER} of
 * the stored sequence number; otherwise the configured {@code kinesisPosition} is used.
 *
 * @param settings raw connector settings used to build the task configuration
 */
@Override
public void start(Map<String, String> settings) {
  this.config = new KinesisSourceConnectorConfig(settings);
  this.kinesisClient = this.kinesisClientFactory.create(this.config);
  this.sourcePartition = ImmutableMap.of(RecordConverter.FIELD_SHARD_ID, this.config.kinesisShardId);

  final Map<String, Object> storedOffset = this.context.offsetStorageReader().offset(this.sourcePartition);
  final boolean resumeFromOffset = null != storedOffset && !storedOffset.isEmpty();

  final GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest()
      .withShardId(this.config.kinesisShardId)
      .withStreamName(this.config.kinesisStreamName);

  if (!resumeFromOffset) {
    // Nothing stored yet: start from the configured position
    log.info("start() - Setting Shard Iterator Type to {} for {}", this.config.kinesisPosition, this.config.kinesisShardId);
    iteratorRequest.setShardIteratorType(this.config.kinesisPosition);
  } else {
    final String resumeAfter = (String) storedOffset.get(RecordConverter.FIELD_SEQUENCE_NUMBER);
    log.info("start() - Starting iterator after last processed sequence number of '{}'", resumeAfter);
    iteratorRequest.setShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
    iteratorRequest.setStartingSequenceNumber(resumeAfter);
  }

  final String shardIterator = this.kinesisClient.getShardIterator(iteratorRequest).getShardIterator();
  log.info("start() - Using Shard Iterator {}", shardIterator);

  this.recordsRequest = new GetRecordsRequest()
      .withLimit(this.config.kinesisRecordLimit)
      .withShardIterator(shardIterator);

  this.recordConverter = new RecordConverter(this.config);
}
项目:kafka-connect-kinesis    文件:KinesisSourceConnectorConfig.java   
/**
 * Builds the connector configuration from raw settings and copies each configured value into
 * the corresponding field.
 *
 * <p>The settings are handed to the superclass together with the definition from
 * {@link #config()}; the position and region values are read as enums via
 * {@code ConfigUtils.getEnum}.
 *
 * @param parsedConfig raw key/value settings for the connector
 */
public KinesisSourceConnectorConfig(Map<String, String> parsedConfig) {
  super(config(), parsedConfig);
  this.awsAccessKeyId = this.getString(AWS_ACCESS_KEY_ID_CONF);
  // The secret key is declared as a PASSWORD type; only its raw value is stored here.
  this.awsSecretKeyId = this.getPassword(AWS_SECRET_KEY_ID_CONF).value();
  this.kafkaTopic = this.getString(TOPIC_CONF);
  this.kinesisStreamName = this.getString(STREAM_NAME_CONF);
  this.kinesisPosition = ConfigUtils.getEnum(ShardIteratorType.class, this, KINESIS_POSISTION_CONF);
  this.kinesisRegion = ConfigUtils.getEnum(Regions.class, this, KINESIS_REGION_CONF);
  this.kinesisShardId = this.getString(KINESIS_SHARD_ID_CONF);
  this.kinesisRecordLimit = this.getInt(KINESIS_RECORD_LIMIT_CONF);
  this.kinesisEmptyRecordsBackoffMs = this.getLong(KINESIS_EMPTY_RECORDS_BACKOFF_MS_CONF);
  this.kinesisThroughputExceededBackoffMs = this.getLong(KINESIS_THROUGHPUT_EXCEEDED_BACKOFF_MS_CONF);
}
项目:kafka-connect-kinesis    文件:KinesisSourceConnectorConfig.java   
/**
 * Creates the configuration definition for the Kinesis source connector.
 *
 * <p>Credentials, topic and stream name have no default. The position defaults to
 * {@code TRIM_HORIZON}, the region to {@code US_EAST_1}, the shard id to {@code ".*"}, the
 * record limit to 500 (range 1..10000) and the two back-off settings to 5000 ms and 10000 ms
 * (both at least 500 ms).
 *
 * @return the populated {@link ConfigDef}
 */
public static ConfigDef config() {
  ConfigDef definition = new ConfigDef();

  // Settings without defaults: credentials, target topic and source stream
  definition = definition
      .define(AWS_ACCESS_KEY_ID_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, AWS_ACCESS_KEY_ID_DOC)
      .define(AWS_SECRET_KEY_ID_CONF, ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, AWS_SECRET_KEY_ID_DOC)
      .define(TOPIC_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
      .define(STREAM_NAME_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, STREAM_NAME_DOC);

  // Enum-backed settings, validated against the enum constants
  definition = definition
      .define(KINESIS_POSISTION_CONF, ConfigDef.Type.STRING, ShardIteratorType.TRIM_HORIZON.toString(), ValidEnum.of(ShardIteratorType.class), ConfigDef.Importance.MEDIUM, KINESIS_POSISTION_DOC)
      .define(KINESIS_REGION_CONF, ConfigDef.Type.STRING, Regions.US_EAST_1.toString(), ValidEnum.of(Regions.class), ConfigDef.Importance.MEDIUM, KINESIS_REGION_DOC);

  // Shard selection, fetch size and back-off tuning
  definition = definition
      .define(KINESIS_SHARD_ID_CONF, ConfigDef.Type.STRING, ".*", ConfigDef.Importance.HIGH, KINESIS_SHARD_ID_DOC)
      .define(KINESIS_RECORD_LIMIT_CONF, ConfigDef.Type.INT, 500, ConfigDef.Range.between(1, 10000), ConfigDef.Importance.MEDIUM, KINESIS_RECORD_LIMIT_DOC)
      .define(KINESIS_EMPTY_RECORDS_BACKOFF_MS_CONF, ConfigDef.Type.LONG, 5000L, ConfigDef.Range.between(500, Integer.MAX_VALUE), ConfigDef.Importance.MEDIUM, KINESIS_EMPTY_RECORDS_BACKOFF_MS_DOC)
      .define(KINESIS_THROUGHPUT_EXCEEDED_BACKOFF_MS_CONF, ConfigDef.Type.LONG, 10 * 1000L, ConfigDef.Range.between(500, Integer.MAX_VALUE), ConfigDef.Importance.MEDIUM, KINESIS_THROUGHPUT_EXCEEDED_BACKOFF_MS_DOC);

  return definition;
}
项目:kafka-connect-kinesis    文件:KinesisSourceTaskTest.java   
/**
 * When the offset storage returns a stored sequence number, starting the task must request a
 * shard iterator of type {@code AFTER_SEQUENCE_NUMBER} for that sequence number, and a
 * subsequent poll must return a non-empty list of records.
 */
@Test
public void sourceOffsets() throws InterruptedException {
  final String storedSequenceNumber = "asdfasdfddsa";
  Map<String, Object> storedOffset = ImmutableMap.of(RecordConverter.FIELD_SEQUENCE_NUMBER, storedSequenceNumber);
  when(this.offsetStorageReader.offset(anyMap())).thenReturn(storedOffset);

  GetShardIteratorResult iteratorResult = new GetShardIteratorResult().withShardIterator("dfasdfsadfasdf");
  when(this.kinesisClient.getShardIterator(any())).thenReturn(iteratorResult);
  this.task.start(settings);

  GetRecordsResult stubbedRecords = new GetRecordsResult()
      .withNextShardIterator("dsfargadsfasdfasda")
      .withRecords(TestData.record())
      .withMillisBehindLatest(0L);
  when(this.kinesisClient.getRecords(any())).thenReturn(stubbedRecords);

  List<SourceRecord> polled = this.task.poll();

  assertNotNull(polled, "records should not be null.");
  assertFalse(polled.isEmpty(), "records should not be empty.");

  verify(this.offsetStorageReader, atLeastOnce()).offset(anyMap());

  // The iterator request must resume after the stored sequence number
  GetShardIteratorRequest expectedRequest = new GetShardIteratorRequest()
      .withShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
      .withShardId(this.config.kinesisShardId)
      .withStreamName(this.config.kinesisStreamName)
      .withStartingSequenceNumber(storedSequenceNumber);
  verify(this.kinesisClient, atLeastOnce()).getShardIterator(expectedRequest);
}
项目:beam    文件:SimplifiedKinesisClientTest.java   
/**
 * Verifies that an {@code AT_SEQUENCE_NUMBER} request with a starting sequence number is
 * forwarded to Kinesis and that the iterator from the response is returned.
 */
@Test
public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
  GetShardIteratorRequest expectedRequest = new GetShardIteratorRequest()
      .withStreamName(STREAM)
      .withShardId(SHARD_1)
      .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
      .withStartingSequenceNumber(SEQUENCE_NUMBER);
  GetShardIteratorResult stubbedResult = new GetShardIteratorResult()
      .withShardIterator(SHARD_ITERATOR);
  given(kinesis.getShardIterator(expectedRequest)).willReturn(stubbedResult);

  String iterator = underTest.getShardIterator(STREAM, SHARD_1,
      ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null);

  assertThat(iterator).isEqualTo(SHARD_ITERATOR);
}
项目:beam    文件:AmazonKinesisMock.java   
/**
 * Mock implementation that only supports {@code TRIM_HORIZON}: the returned iterator is the
 * string {@code "<shardId>:0"}. Any other iterator type results in a {@link RuntimeException}.
 */
@Override
public GetShardIteratorResult getShardIterator(
    GetShardIteratorRequest getShardIteratorRequest) {
  final ShardIteratorType requestedType = ShardIteratorType.fromValue(
      getShardIteratorRequest.getShardIteratorType());

  if (requestedType != ShardIteratorType.TRIM_HORIZON) {
    throw new RuntimeException("Not implemented");
  }

  final String iterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0);
  return new GetShardIteratorResult().withShardIterator(iterator);
}
项目:flink    文件:KinesisProxy.java   
/**
 * {@inheritDoc}
 *
 * <p>Builds a {@link GetShardIteratorRequest} for the shard and iterator type, attaches the
 * starting marker where the type needs one, and delegates to the request-based overload.
 * {@code AT_TIMESTAMP} requires the marker to be a {@link Date}; {@code AT_SEQUENCE_NUMBER}
 * and {@code AFTER_SEQUENCE_NUMBER} require a {@link String}; {@code TRIM_HORIZON} and
 * {@code LATEST} ignore the marker.
 *
 * @throws IllegalArgumentException if the marker has the wrong type for the iterator type
 */
@Override
public String getShardIterator(StreamShardHandle shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
    final GetShardIteratorRequest request = new GetShardIteratorRequest()
        .withStreamName(shard.getStreamName())
        .withShardId(shard.getShard().getShardId())
        .withShardIteratorType(shardIteratorType);

    switch (ShardIteratorType.fromValue(shardIteratorType)) {
        case TRIM_HORIZON:
        case LATEST:
            // no starting marker needed
            break;
        case AT_TIMESTAMP:
            if (!(startingMarker instanceof Date)) {
                throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_TIMESTAMP. Must be a Date object.");
            }
            request.setTimestamp((Date) startingMarker);
            break;
        case AT_SEQUENCE_NUMBER:
        case AFTER_SEQUENCE_NUMBER:
            if (!(startingMarker instanceof String)) {
                throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER. Must be a String.");
            }
            request.setStartingSequenceNumber((String) startingMarker);
            break;
    }
    return getShardIterator(request);
}
项目:Camel    文件:KinesisEndpoint.java   
/**
 * Rejects start-up when a sequence-number based iterator type is configured with an empty
 * sequence number, then delegates to the superclass.
 *
 * @throws IllegalArgumentException if the iterator type is AFTER_SEQUENCE_NUMBER or
 *         AT_SEQUENCE_NUMBER and the sequence number is empty
 */
@Override
protected void doStart() throws Exception {
    final boolean sequenceBasedIterator = iteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
            || iteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER);
    if (sequenceBasedIterator && sequenceNumber.isEmpty()) {
        throw new IllegalArgumentException("Sequence Number must be specified with iterator Types AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER");
    }
    super.doStart();
}
项目:Camel    文件:KinesisEndpointTest.java   
/**
 * Resolves an endpoint URI carrying only the Kinesis client reference and checks the values
 * the endpoint reports for the unspecified options: iterator type {@code TRIM_HORIZON} and
 * one result per request.
 */
@Test
public void onlyRequiredEndpointParams() throws Exception {
    final String uri = "aws-kinesis://some_stream_name"
            + "?amazonKinesisClient=#kinesisClient";
    KinesisEndpoint endpoint = (KinesisEndpoint) camelContext.getEndpoint(uri);

    assertThat(endpoint.getClient(), is(amazonKinesisClient));
    assertThat(endpoint.getStreamName(), is("some_stream_name"));
    assertThat(endpoint.getIteratorType(), is(ShardIteratorType.TRIM_HORIZON));
    assertThat(endpoint.getMaxResultsPerRequest(), is(1));
}
项目:Camel    文件:KinesisEndpointTest.java   
/**
 * Resolves an endpoint configured with {@code AFTER_SEQUENCE_NUMBER}, a shard id and a
 * sequence number, and checks that all of them are exposed by the endpoint getters.
 */
@Test
public void afterSequenceNumberRequiresSequenceNumber() throws Exception {
    final String uri = "aws-kinesis://some_stream_name"
            + "?amazonKinesisClient=#kinesisClient"
            + "&iteratorType=AFTER_SEQUENCE_NUMBER"
            + "&shardId=abc"
            + "&sequenceNumber=123";
    KinesisEndpoint endpoint = (KinesisEndpoint) camelContext.getEndpoint(uri);

    assertThat(endpoint.getClient(), is(amazonKinesisClient));
    assertThat(endpoint.getStreamName(), is("some_stream_name"));
    assertThat(endpoint.getIteratorType(), is(ShardIteratorType.AFTER_SEQUENCE_NUMBER));
    assertThat(endpoint.getShardId(), is("abc"));
    assertThat(endpoint.getSequenceNumber(), is("123"));
}
项目:Camel    文件:KinesisEndpointTest.java   
/**
 * Resolves an endpoint configured with {@code AT_SEQUENCE_NUMBER}, a shard id and a sequence
 * number, and checks that all of them are exposed by the endpoint getters.
 */
@Test
public void atSequenceNumberRequiresSequenceNumber() throws Exception {
    final String uri = "aws-kinesis://some_stream_name"
            + "?amazonKinesisClient=#kinesisClient"
            + "&iteratorType=AT_SEQUENCE_NUMBER"
            + "&shardId=abc"
            + "&sequenceNumber=123";
    KinesisEndpoint endpoint = (KinesisEndpoint) camelContext.getEndpoint(uri);

    assertThat(endpoint.getClient(), is(amazonKinesisClient));
    assertThat(endpoint.getStreamName(), is("some_stream_name"));
    assertThat(endpoint.getIteratorType(), is(ShardIteratorType.AT_SEQUENCE_NUMBER));
    assertThat(endpoint.getShardId(), is("abc"));
    assertThat(endpoint.getSequenceNumber(), is("123"));
}
项目:apex-malhar    文件:KinesisUtil.java   
/**
 * Reads records from a single shard: first obtains a shard iterator for the requested
 * position, then issues one getRecords call with that iterator.
 *
 * @param streamName Name of the stream from where the records to be accessed
 * @param recordsLimit Number of records to return from shard
 * @param shId Shard Id of the shard
 * @param iteratorType Shard iterator type
 * @param seqNo Record sequence number; only applied for AFTER_SEQUENCE_NUMBER and
 *        AT_SEQUENCE_NUMBER iterator types
 * @return the list of records from the given shard
 * @throws AmazonClientException declared on the signature; note that the catch block below
 *         rethrows any AmazonClientException wrapped in a plain RuntimeException
 */
public List<Record> getRecords(String streamName, Integer recordsLimit, String shId, ShardIteratorType iteratorType, String seqNo)
  throws AmazonClientException
{
  assert client != null : "Illegal client";
  try {
    // Describe where in the shard reading should start
    final GetShardIteratorRequest shardIteratorRequest = new GetShardIteratorRequest();
    shardIteratorRequest.setStreamName(streamName);
    shardIteratorRequest.setShardId(shId);
    shardIteratorRequest.setShardIteratorType(iteratorType);

    // Only the sequence-number based iterator types take a starting sequence number
    final boolean positionedBySequenceNumber = ShardIteratorType.AFTER_SEQUENCE_NUMBER.equals(iteratorType)
        || ShardIteratorType.AT_SEQUENCE_NUMBER.equals(iteratorType);
    if (positionedBySequenceNumber) {
      shardIteratorRequest.setStartingSequenceNumber(seqNo);
    }

    // The returned iterator identifies the position in the shard
    final GetShardIteratorResult shardIteratorResult = client.getShardIterator(shardIteratorRequest);
    final String shardIterator = shardIteratorResult.getShardIterator();

    // Fetch up to recordsLimit records starting at that position
    final GetRecordsRequest recordsRequest = new GetRecordsRequest();
    recordsRequest.setLimit(recordsLimit);
    recordsRequest.setShardIterator(shardIterator);

    final GetRecordsResult recordsResult = client.getRecords(recordsRequest);
    return recordsResult.getRecords();
  } catch (AmazonClientException e) {
    throw new RuntimeException(e);
  }
}
项目:apex-malhar    文件:KinesisConsumer.java   
/**
 * Determines the iterator type to use for the given shard.
 *
 * <p>A shard with an entry in {@code shardPosition} is read AFTER_SEQUENCE_NUMBER. Otherwise
 * the type follows {@code initialOffset}: "earliest" (case-insensitive) maps to TRIM_HORIZON,
 * anything else to LATEST.
 */
public ShardIteratorType getIteratorType(String shardId)
{
  if (shardPosition.containsKey(shardId)) {
    return ShardIteratorType.AFTER_SEQUENCE_NUMBER;
  }
  if (initialOffset.equalsIgnoreCase("earliest")) {
    return ShardIteratorType.TRIM_HORIZON;
  }
  return ShardIteratorType.LATEST;
}
项目:spring-cloud-stream-binder-aws-kinesis    文件:KinesisMessageChannelBinder.java   
/**
 * Creates and configures the {@link KinesisMessageDrivenChannelAdapter} that consumes from the
 * given destination.
 *
 * <p>The starting offset is derived from the {@code shardIteratorType} property, whose format is
 * {@code TYPE} or {@code TYPE:value}; the value is parsed as epoch milliseconds for
 * {@code AT_TIMESTAMP} and used as a sequence number for any other type. When more than one
 * instance is configured, shards are assigned to this instance by index modulo the instance
 * count and the adapter is built from explicit shard offsets; otherwise it is built from the
 * stream name.
 *
 * @param destination the consumer destination (stream) to read from
 * @param group the consumer group; when blank, an "anonymous.&lt;uuid&gt;" group is generated
 * @param properties consumer properties including the Kinesis-specific extension
 * @return the configured adapter
 */
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
        ExtendedConsumerProperties<KinesisConsumerProperties> properties) {

    KinesisConsumerProperties kinesisConsumerProperties = properties.getExtension();

    Set<KinesisShardOffset> shardOffsets = null;

    String shardIteratorType = kinesisConsumerProperties.getShardIteratorType();

    // Default starting point when no iterator type is configured
    KinesisShardOffset kinesisShardOffset = KinesisShardOffset.latest();

    if (StringUtils.hasText(shardIteratorType)) {
        // Split into at most two parts: the type name and an optional value
        String[] typeValue = shardIteratorType.split(":", 2);
        ShardIteratorType iteratorType = ShardIteratorType.valueOf(typeValue[0]);
        kinesisShardOffset = new KinesisShardOffset(iteratorType);
        if (typeValue.length > 1) {
            if (ShardIteratorType.AT_TIMESTAMP.equals(iteratorType)) {
                // The value is interpreted as epoch milliseconds
                kinesisShardOffset.setTimestamp(new Date(Long.parseLong(typeValue[1])));
            }
            else {
                kinesisShardOffset.setSequenceNumber(typeValue[1]);
            }
        }
    }

    if (properties.getInstanceCount() > 1) {
        shardOffsets = new HashSet<>();
        KinesisConsumerDestination kinesisConsumerDestination = (KinesisConsumerDestination) destination;
        List<Shard> shards = kinesisConsumerDestination.getShards();
        for (int i = 0; i < shards.size(); i++) {
            // Divide shards across instances: this instance takes every shard whose index
            // modulo the instance count equals its own instance index
            if ((i % properties.getInstanceCount()) == properties.getInstanceIndex()) {
                KinesisShardOffset shardOffset = new KinesisShardOffset(kinesisShardOffset);
                shardOffset.setStream(destination.getName());
                shardOffset.setShard(shards.get(i).getShardId());
                shardOffsets.add(shardOffset);
            }
        }
    }

    KinesisMessageDrivenChannelAdapter adapter;

    // Without explicit shard offsets the adapter is bound to the stream by name
    if (shardOffsets == null) {
        adapter = new KinesisMessageDrivenChannelAdapter(this.amazonKinesis, destination.getName());
    }
    else {
        adapter = new KinesisMessageDrivenChannelAdapter(this.amazonKinesis,
                shardOffsets.toArray(new KinesisShardOffset[shardOffsets.size()]));
    }

    boolean anonymous = !StringUtils.hasText(group);
    String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
    adapter.setConsumerGroup(consumerGroup);

    // Anonymous consumers, or consumers with an explicit iterator type, start from the offset
    // computed above; named groups without an explicit type start from TRIM_HORIZON
    adapter.setStreamInitialSequence(
            anonymous || StringUtils.hasText(shardIteratorType)
                    ? kinesisShardOffset
                    : KinesisShardOffset.trimHorizon());

    adapter.setListenerMode(kinesisConsumerProperties.getListenerMode());
    adapter.setCheckpointMode(kinesisConsumerProperties.getCheckpointMode());
    adapter.setRecordsLimit(kinesisConsumerProperties.getRecordsLimit());
    adapter.setIdleBetweenPolls(kinesisConsumerProperties.getIdleBetweenPolls());
    adapter.setConsumerBackoff(kinesisConsumerProperties.getConsumerBackoff());

    if (this.checkpointStore != null) {
        adapter.setCheckpointStore(this.checkpointStore);
    }
    adapter.setConcurrency(properties.getConcurrency());
    adapter.setStartTimeout(kinesisConsumerProperties.getStartTimeout());
    adapter.setDescribeStreamBackoff(this.configurationProperties.getDescribeStreamBackoff());
    adapter.setDescribeStreamRetries(this.configurationProperties.getDescribeStreamRetries());

    // Defer byte[] conversion to the ReceivingHandler
    adapter.setConverter(null);

    return adapter;
}
项目:beam    文件:ShardCheckpoint.java   
/**
 * Creates a checkpoint from a {@link StartingPoint}: the iterator type is resolved from the
 * starting point's position name and its timestamp is passed along unchanged.
 */
public ShardCheckpoint(String streamName, String shardId, StartingPoint
    startingPoint) {
  this(streamName, shardId,
      ShardIteratorType.fromValue(startingPoint.getPositionName()),
      startingPoint.getTimestamp());
}
项目:beam    文件:ShardCheckpoint.java   
/**
 * Creates a checkpoint positioned by iterator type and optional timestamp; sequence number
 * and sub-sequence number are passed as {@code null}.
 */
public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
    shardIteratorType, Instant timestamp) {
  this(streamName, shardId, shardIteratorType, null, null, timestamp);
}
项目:beam    文件:ShardCheckpoint.java   
/**
 * Creates a checkpoint positioned by iterator type, sequence number and sub-sequence number;
 * the timestamp is passed as {@code null}.
 */
public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
    shardIteratorType, String sequenceNumber, Long subSequenceNumber) {
  this(streamName, shardId, shardIteratorType, sequenceNumber, subSequenceNumber, null);
}
项目:beam    文件:StartingPoint.java   
/**
 * Returns the name of the configured position, or the name of
 * {@code ShardIteratorType.AT_TIMESTAMP} when no position is set.
 */
public String getPositionName() {
  if (position == null) {
    return ShardIteratorType.AT_TIMESTAMP.name();
  }
  return position.name();
}
项目:beam    文件:ShardCheckpointTest.java   
/**
 * Test helper: builds a sequence-number based checkpoint on the fixed test stream and shard.
 */
private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, String sequenceNumber,
    Long subSequenceNumber) {
  return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber,
      subSequenceNumber);
}
项目:beam    文件:ShardCheckpointTest.java   
/**
 * Test helper: builds a timestamp based checkpoint on the fixed test stream and shard.
 */
private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) {
  return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp);
}
项目:Camel    文件:KinesisEndpoint.java   
/**
 * Returns the shard iterator type configured on this endpoint.
 */
public ShardIteratorType getIteratorType() {
    return iteratorType;
}
项目:Camel    文件:KinesisEndpoint.java   
/**
 * Sets the shard iterator type used by this endpoint.
 *
 * @param iteratorType the iterator type to store
 */
public void setIteratorType(ShardIteratorType iteratorType) {
    this.iteratorType = iteratorType;
}
项目:Camel    文件:KinesisConsumer.java   
/**
 * Tells whether the endpoint is configured to start from a sequence number: the sequence
 * number must be non-empty and the iterator type must be AFTER_SEQUENCE_NUMBER or
 * AT_SEQUENCE_NUMBER.
 */
private boolean hasSequenceNumber() {
    if (getEndpoint().getSequenceNumber().isEmpty()) {
        return false;
    }
    if (getEndpoint().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)) {
        return true;
    }
    return getEndpoint().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER);
}