Java 类com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber 实例源码

项目:samza    文件:TestKinesisSystemConsumer.java   
/**
 * Creates one record processor per shard, initializes each of them, and returns them keyed by shard id.
 *
 * @param factory factory used to create every processor; each created instance is cast to
 *        {@code KinesisRecordProcessor}
 * @param numShards number of shards, and therefore processors, to create
 * @return map from shard id (formatted with {@code "shard-%05d"}) to its initialized processor
 */
private Map<String, KinesisRecordProcessor> createAndInitProcessors(IRecordProcessorFactory factory, int numShards) {
  Map<String, KinesisRecordProcessor> processorsByShard = new HashMap<>();
  for (int shardIndex = 0; shardIndex < numShards; shardIndex++) {
    String shardId = String.format("shard-%05d", shardIndex);

    // Create the Kinesis processor for this shard.
    KinesisRecordProcessor shardProcessor = (KinesisRecordProcessor) factory.createProcessor();

    // Initialize the processor with the shard id and a fixed starting sequence number.
    InitializationInput initInput = new InitializationInput()
        .withShardId(shardId)
        .withExtendedSequenceNumber(new ExtendedSequenceNumber("0000"));
    shardProcessor.initialize(initInput);

    processorsByShard.put(shardId, shardProcessor);
  }
  return processorsByShard;
}
项目:beam    文件:ShardCheckpoint.java   
/**
 * Builds the extended sequence number for this checkpoint.
 *
 * <p>When {@code sequenceNumber} is {@code null}, the string form of {@code shardIteratorType} is used
 * in its place.
 *
 * @return a new {@code ExtendedSequenceNumber} built from the effective sequence number and
 *         {@code subSequenceNumber}
 */
private ExtendedSequenceNumber extendedSequenceNumber() {
  String effectiveSequenceNumber =
      sequenceNumber != null ? sequenceNumber : shardIteratorType.toString();
  return new ExtendedSequenceNumber(effectiveSequenceNumber, subSequenceNumber);
}
项目:beam    文件:ShardCheckpointTest.java   
/**
 * Checks {@code isBeforeOrAt} for checkpoints of several iterator types against records that carry an
 * {@code ExtendedSequenceNumber}.
 */
@Test
public void testComparisonWithExtendedSequenceNumber() {
  // Records shared by the assertions below; each one is a mock that only reports its sequence number.
  KinesisRecord recordAtHundred = recordWith(new ExtendedSequenceNumber("100", 0L));
  KinesisRecord recordAtNinetyNine = recordWith(new ExtendedSequenceNumber("99", 1L));

  // Checkpoints created from a starting point only.
  ShardCheckpoint latest = new ShardCheckpoint("", "", new StartingPoint(LATEST));
  assertThat(latest.isBeforeOrAt(recordAtHundred)).isTrue();

  ShardCheckpoint trimHorizon = new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON));
  assertThat(trimHorizon.isBeforeOrAt(recordAtHundred)).isTrue();

  // Checkpoints expected to be before or at the record.
  assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "10", 1L).isBeforeOrAt(recordAtHundred)).isTrue();
  assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(recordAtHundred)).isTrue();

  // Checkpoints expected NOT to be before or at the record.
  assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(recordAtHundred)).isFalse();
  assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 1L).isBeforeOrAt(recordAtHundred)).isFalse();
  assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(recordAtNinetyNine)).isFalse();
}
项目:samza    文件:KinesisRecordProcessor.java   
/**
 * Handles a batch of data records delivered by the Amazon Kinesis Client Library (KCL). After a fail over,
 * the new instance receives records whose sequence number is greater than the checkpoint position for each
 * partition key.
 *
 * <p>The checkpointer from the input is stored on every call. For a non-empty batch, the sequence number of
 * the last record is remembered and the records are forwarded to the listener.
 *
 * @param processRecordsInput Provides the records to be processed as well as information and capabilities
 *        related to them (eg checkpointing).
 */
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
  // KCL must not deliver records to a processor that has already been shut down.
  Validate.isTrue(!shutdownRequested,
      String.format("KCL returned records after shutdown is called on the processor %s.", this));

  // KCL always hands out the same checkpointer instance for a given processor instance.
  checkpointer = processRecordsInput.getCheckpointer();

  List<Record> batch = processRecordsInput.getRecords();
  // An empty batch is expected when the KCL config sets CallProcessRecordsEvenForEmptyRecordList to true.
  if (batch.isEmpty()) {
    return;
  }

  Record lastRecord = batch.get(batch.size() - 1);
  lastProcessedRecordSeqNumber = new ExtendedSequenceNumber(lastRecord.getSequenceNumber());
  listener.onReceiveRecords(ssp, batch, processRecordsInput.getMillisBehindLatest());
}
项目:beam    文件:KinesisRecord.java   
/**
 * Combines this record's sequence number and sub-sequence number into a single value.
 *
 * @return a new {@code ExtendedSequenceNumber} built from {@code getSequenceNumber()} and
 *         {@code getSubSequenceNumber()}
 */
public ExtendedSequenceNumber getExtendedSequenceNumber() {
  String sequence = getSequenceNumber();
  Long subSequence = getSubSequenceNumber();
  return new ExtendedSequenceNumber(sequence, subSequence);
}
项目:beam    文件:ShardCheckpointTest.java   
/**
 * Creates a mocked {@code KinesisRecord} whose {@code getExtendedSequenceNumber()} is stubbed to return
 * the given value.
 *
 * @param extendedSequenceNumber value the mock should report as its extended sequence number
 * @return the stubbed mock record
 */
private KinesisRecord recordWith(ExtendedSequenceNumber extendedSequenceNumber) {
  final KinesisRecord stubbedRecord = mock(KinesisRecord.class);
  given(stubbedRecord.getExtendedSequenceNumber())
      .willReturn(extendedSequenceNumber);
  return stubbedRecord;
}