Java 类com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory 实例源码

项目:samza    文件:KinesisSystemConsumer.java   
/**
 * Builds the record processor factory for the given stream.
 *
 * <p>Each invocation of the returned factory allocates an ssp for {@code stream} from {@code sspAllocator},
 * creates a {@link KinesisRecordProcessor} bound to that ssp and to this consumer, and records it in
 * {@code processors}. Any failure is remembered in {@code callbackException} and rethrown wrapped in a
 * {@link SamzaException}.
 *
 * @param stream name of the stream whose ssps are handed out to new processors
 * @return factory producing one record processor per call
 */
IRecordProcessorFactory createRecordProcessorFactory(String stream) {
  return () -> {
    // The body below runs in Kinesis thread context.
    try {
      SystemStreamPartition allocatedSsp = sspAllocator.allocate(stream);
      KinesisRecordProcessor newProcessor = new KinesisRecordProcessor(allocatedSsp, KinesisSystemConsumer.this);
      // put() hands back whatever processor was previously registered for this ssp (null when none was).
      KinesisRecordProcessor displaced = processors.put(allocatedSsp, newProcessor);
      String conflictMessage = String.format("Adding new kinesis record processor %s while the"
              + " previous processor %s for the same ssp %s is still active.", newProcessor, displaced, allocatedSsp);
      Validate.isTrue(displaced == null, conflictMessage);
      return newProcessor;
    } catch (Exception e) {
      // Such a failure results from kinesis dynamic shard splits that leave sspAllocator without free ssps.
      // Recording it puts the consumer into a failed state, which eventually stops the container; a manual
      // job restart is then required. After the restart the newly created shards are discovered and enough
      // ssps are added to the sspAllocator free pool.
      callbackException = e;
      throw new SamzaException(e);
    }
  };
}
项目:samza    文件:TestKinesisSystemConsumer.java   
/**
 * Creates {@code numShards} record processors through {@code factory} and initializes each one against a
 * synthetic shard id of the form {@code shard-00000}, {@code shard-00001}, ...
 *
 * @param factory factory used to create every processor
 * @param numShards number of processors (one per simulated shard) to create
 * @return the initialized processors keyed by their shard id
 */
private Map<String, KinesisRecordProcessor> createAndInitProcessors(IRecordProcessorFactory factory, int numShards) {
  Map<String, KinesisRecordProcessor> processorsByShardId = new HashMap<>();
  for (int shardIndex = 0; shardIndex < numShards; shardIndex++) {
    String shardId = String.format("shard-%05d", shardIndex);

    // Ask the factory for a fresh Kinesis processor
    KinesisRecordProcessor created = (KinesisRecordProcessor) factory.createProcessor();

    // Initialize it for this shard, starting from sequence number "0000"
    InitializationInput shardInit = new InitializationInput()
        .withShardId(shardId)
        .withExtendedSequenceNumber(new ExtendedSequenceNumber("0000"));
    created.initialize(shardInit);

    processorsByShardId.put(shardId, created);
  }
  return processorsByShardId;
}
项目:aws-kinesis-zombies    文件:Consumer.java   
/**
 * Creates the consumer from injected settings and then calls {@code initKinesis()}.
 *
 * @param streamName value of the {@code stream} property
 * @param region value of the {@code region} property
 * @param zombieRecordFactory record processor factory kept by this consumer
 */
@Autowired
public Consumer(@Value("${stream}") String streamName,
        @Value("${region}") String region,
        IRecordProcessorFactory zombieRecordFactory) {
    // Store every injected collaborator first; initKinesis() runs only after all fields are assigned.
    this.zombieRecordFactory = zombieRecordFactory;
    this.region = region;
    this.streamName = streamName;

    initKinesis();
}
项目:datacollector    文件:KinesisSource.java   
/**
 * Assembles a KCL {@link Worker} from the stage configuration held in {@code conf}.
 *
 * @param recordProcessorFactory factory the worker uses to create record processors
 * @param maxBatchSize value passed to the client library configuration as its max records setting
 * @return the built worker
 */
private Worker createKinesisWorker(IRecordProcessorFactory recordProcessorFactory, int maxBatchSize) {
  KinesisClientLibConfiguration kclConfig = new KinesisClientLibConfiguration(
      conf.applicationName, conf.streamName, credentials, getWorkerId());

  // Read/batching behaviour and the underlying client settings.
  kclConfig
      .withMaxRecords(maxBatchSize)
      .withCallProcessRecordsEvenForEmptyRecordList(false)
      .withIdleTimeBetweenReadsInMillis(conf.idleTimeBetweenReads)
      .withInitialPositionInStream(conf.initialPositionInStream)
      .withKinesisClientConfig(clientConfiguration);

  // A start timestamp is supplied only when the initial position is AT_TIMESTAMP.
  boolean startsAtTimestamp = conf.initialPositionInStream == InitialPositionInStream.AT_TIMESTAMP;
  if (startsAtTimestamp) {
    kclConfig.withTimestampAtInitialPositionInStream(new Date(conf.initialTimestamp));
  }

  // Use the region's label unless the region is OTHER, in which case the configured endpoint is used.
  if (conf.region != AWSRegions.OTHER) {
    kclConfig.withRegionName(conf.region.getLabel());
  } else {
    kclConfig.withKinesisEndpoint(conf.endpoint);
  }

  Worker.Builder workerBuilder = new Worker.Builder()
      .recordProcessorFactory(recordProcessorFactory)
      .metricsFactory(metricsFactory)
      .dynamoDBClient(dynamoDBClient)
      .cloudWatchClient(cloudWatchClient)
      .execService(executor)
      .config(kclConfig);
  return workerBuilder.build();
}
项目:samza    文件:TestKinesisSystemConsumer.java   
/**
 * Helper to simulate and test the life-cycle of record processing from a kinesis stream with a given number of shards
 * 1. Creation of record processors.
 * 2. Initialization of record processors.
 * 3. Processing records via record processors.
 * 4. Calling checkpoint on record processors.
 * 5. Shutting down (due to re-assignment or lease expiration) record processors.
 *
 * When {@code numRecordsPerShard} is not positive, the helper only verifies that no messages were read and
 * returns before the per-ssp checkpoint and shutdown checks.
 *
 * @param system system name used for the consumer and for every ssp
 * @param stream stream name used for every ssp and for the record processor factory
 * @param numShards number of shards to simulate; one ssp and one processor are created per shard
 * @param numRecordsPerShard number of records generated for each processor
 */
private void testProcessRecordsHelper(String system, String stream, int numShards, int numRecordsPerShard)
    throws InterruptedException, ShutdownException, InvalidStateException,
           NoSuchFieldException, IllegalAccessException {

  KinesisConfig kConfig = new KinesisConfig(new MapConfig());
  // Create consumer
  KinesisSystemConsumer consumer = new KinesisSystemConsumer(system, kConfig, new NoOpMetricsRegistry());
  initializeMetrics(consumer, stream);

  // Register one ssp per simulated shard
  List<SystemStreamPartition> ssps = new ArrayList<>();
  for (int partitionId = 0; partitionId < numShards; partitionId++) {
    ssps.add(new SystemStreamPartition(system, stream, new Partition(partitionId)));
  }
  ssps.forEach(ssp -> consumer.register(ssp, SYSTEM_CONSUMER_REGISTER_OFFSET));

  // Create Kinesis record processor factory
  IRecordProcessorFactory factory = consumer.createRecordProcessorFactory(stream);

  // Create and initialize Kinesis record processor
  Map<String, KinesisRecordProcessor> processorMap = createAndInitProcessors(factory, numShards);
  List<KinesisRecordProcessor> processorList = new ArrayList<>(processorMap.values());

  // Generate records to Kinesis record processor
  Map<KinesisRecordProcessor, List<Record>> inputRecordMap = generateRecords(numRecordsPerShard, processorList);

  // Verification steps

  // Read events from the BEM queue
  Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages =
      readEvents(new HashSet<>(ssps), consumer, numRecordsPerShard);
  if (numRecordsPerShard <= 0) {
    // No input records and hence no messages
    Assert.assertEquals(0, messages.size());
    return;
  }
  Assert.assertEquals(numShards, messages.size());

  // From here on numRecordsPerShard is known to be positive, so every ssp must have received records.
  Map<SystemStreamPartition, KinesisRecordProcessor> sspToProcessorMap = getProcessorMap(consumer);
  ssps.forEach(ssp -> {
      try {
        KinesisRecordProcessor processor = sspToProcessorMap.get(ssp);

        // Verify that the read messages are received in order and are the same as input records
        List<IncomingMessageEnvelope> envelopes = messages.get(ssp);
        Assert.assertEquals(numRecordsPerShard, envelopes.size());
        List<Record> inputRecords = inputRecordMap.get(processor);
        verifyRecords(envelopes, inputRecords, processor.getShardId());

        // Call checkpoint on consumer and verify that the checkpoint is called with the right offset
        IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1);
        consumer.onCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset()));
        ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
        verify(getCheckpointer(processor)).checkpoint(argument.capture());
        Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue());

        // Call shutdown (with ZOMBIE reason) on processor and verify if shutdown freed the ssp mapping
        shutDownProcessor(processor, ShutdownReason.ZOMBIE);
        Assert.assertFalse(sspToProcessorMap.containsValue(processor));
        Assert.assertTrue(isSspAvailable(consumer, ssp));
      } catch (NoSuchFieldException | IllegalAccessException | InvalidStateException | ShutdownException ex) {
        // Checked exceptions cannot escape the forEach lambda, so they are rethrown unchecked.
        throw new RuntimeException(ex);
      }
    });
}