Java 类 com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput 实例源码

项目:samza    文件:TestKinesisSystemConsumer.java   
/**
 * Creates one {@link KinesisRecordProcessor} per shard, initializes it, and indexes it by shard id.
 *
 * <p>Shard ids are generated as {@code shard-%05d} for indices {@code 0} (inclusive) to {@code numShards}
 * (exclusive). Every processor is initialized with an {@link ExtendedSequenceNumber} built from {@code "0000"}.
 *
 * @param factory factory whose {@code createProcessor()} result is cast to {@link KinesisRecordProcessor}
 * @param numShards number of shards, and therefore processors, to create
 * @return map from generated shard id to the processor initialized for that shard
 */
private Map<String, KinesisRecordProcessor> createAndInitProcessors(IRecordProcessorFactory factory, int numShards) {
  Map<String, KinesisRecordProcessor> processorsByShard = new HashMap<>();
  for (int shardIndex = 0; shardIndex < numShards; shardIndex++) {
    String shardId = String.format("shard-%05d", shardIndex);

    // Obtain a fresh processor from the factory for this shard.
    KinesisRecordProcessor shardProcessor = (KinesisRecordProcessor) factory.createProcessor();

    // Hand the processor its shard id and starting sequence number before registering it.
    ExtendedSequenceNumber startingSequence = new ExtendedSequenceNumber("0000");
    InitializationInput shardInit = new InitializationInput()
        .withShardId(shardId)
        .withExtendedSequenceNumber(startingSequence);
    shardProcessor.initialize(shardInit);

    processorsByShard.put(shardId, shardProcessor);
  }
  return processorsByShard;
}
项目:samza    文件:KinesisRecordProcessor.java   
/**
 * Called by the Amazon Kinesis Client Library ahead of any record delivery (via processRecords) to this
 * RecordProcessor instance.
 *
 * <p>Fails validation when no listener has been set; otherwise remembers the shard id and the initial
 * extended sequence number carried by the input, then logs the starting sequence number.
 *
 * @param initializationInput Provides information related to initialization
 */
@Override
public void initialize(InitializationInput initializationInput) {
  Validate.isTrue(listener != null, "There is no listener set for the processor.");

  // Capture the shard identity and the starting position for later use.
  shardId = initializationInput.getShardId();
  initSeqNumber = initializationInput.getExtendedSequenceNumber();

  final String startingSequence = initializationInput.getExtendedSequenceNumber().getSequenceNumber();
  LOG.info("Initialization done for {} with sequence {}", this, startingSequence);
}
项目:datacollector    文件:StreamSetsRecordProcessor.java   
/**
 * {@inheritDoc}
 *
 * <p>Stores the shard id carried by {@code initializationInput} and, when debug logging is enabled, logs the
 * extended sequence number and the shard id.
 *
 * @param initializationInput initialization details supplying the shard id and extended sequence number
 */
@Override
public void initialize(InitializationInput initializationInput) {
  shardId = initializationInput.getShardId();
  if (LOG.isDebugEnabled()) {
    // Pass the sequence number object itself instead of calling toString() on it: the placeholder formatting
    // stringifies the argument, and a missing (null) sequence number is then rendered rather than causing a
    // NullPointerException from a debug-only statement. NOTE(review): assumes LOG is an SLF4J-style logger.
    LOG.debug("Initializing record processor at: {}", initializationInput.getExtendedSequenceNumber());
    LOG.debug("Initializing record processor for shard: {}", shardId);
  }
}
项目:zipkin-aws    文件:KinesisSpanProcessor.java   
/**
 * Initialization callback for this record processor. The body is empty, so the supplied
 * {@code initializationInput} is not read and no state is set up here.
 *
 * @param initializationInput initialization details passed by the caller; unused by this implementation
 */
@Override
public void initialize(InitializationInput initializationInput) {
}
项目:aws-kinesis-zombies    文件:ZombieRecordProcessor.java   
/**
 * Logs the shard id and the sub-sequence number from which this processor starts.
 *
 * @param initializationInput initialization details supplying the shard id and extended sequence number
 */
@Override
public void initialize(InitializationInput initializationInput) {
    // Read both values up front, in the same order the log statement consumes them.
    final String shardId = initializationInput.getShardId();
    final long startingSubSequence = initializationInput.getExtendedSequenceNumber().getSubSequenceNumber();

    log.info("Processing from shard {} beginning with subsequence {}.", shardId, startingSubSequence);
}
项目:lumber-mill    文件:RecordProcessor.java   
/**
 * Logs the shard id carried by the input and stores it in {@code kinesisShardId}.
 *
 * @param initializationInput initialization details supplying the shard id
 */
@Override
public void initialize(InitializationInput initializationInput) {
    // Look the shard id up once and reuse it for both the log line and the field.
    final String assignedShardId = initializationInput.getShardId();
    LOG.info("Init RecordProcessor " + assignedShardId);
    this.kinesisShardId = assignedShardId;
}