Java 类com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason 实例源码

项目:lumber-mill    文件:RecordProcessor.java   
/**
 * Called when this record processor must stop handling its shard.
 *
 * <p>The current thread is renamed to the shard id. If the shutdown reason is
 * {@link ShutdownReason#ZOMBIE}, only a warning is logged and no checkpoint is attempted.
 * For every other reason a final checkpoint is attempted; a checkpoint failure is logged
 * and not propagated to the caller.
 *
 * @param input carries the shutdown reason and the checkpointer used for the final checkpoint
 */
@Override
public void shutdown(ShutdownInput input) {

    Thread.currentThread().setName(kinesisShardId);
    // Read the reason once so every log line and the branch below agree on the same value.
    ShutdownReason reason = input.getShutdownReason();
    if (reason == ShutdownReason.ZOMBIE) {
        /* This happens because we have lost our lease. Either because of re-balancing
        * (there is another NomNom running on another host), or because we have
        * failed to renew our leases, which would happen if we are too busy.
        *
        * It happens when a new version of NomNom is deployed, since the two versions
        * will run side by side for a few seconds before the old one is terminated. And
        * the new one will try to take a few leases at startup.
        */
        LOG.warn("We're a ZOMBIE - someone stole our lease. Quitting.");
    } else {
        /* This happens when a shard is split or merged, meaning that it stops existing
         * and we have other shards to process instead. Very rare.
         */
        LOG.warn("Shard is shutting down, reason: {}", reason);
        try {
            input.getCheckpointer().checkpoint();
        } catch (Exception e) {
            // Best effort: a failed final checkpoint must not abort the shutdown sequence.
            LOG.error("Failed to checkpoint after shard shutdown", e);
        }
    }
    // Previously an empty message was logged here, which carried no information.
    LOG.info("Record processor for shard {} has shut down, reason: {}", kinesisShardId, reason);
}
项目:samza    文件:KinesisRecordProcessor.java   
/**
 * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this
 * RecordProcessor instance.
 *
 * <p>For {@link ShutdownReason#TERMINATE} this method blocks, polling periodically, until the last processed
 * record has been checkpointed, and then issues a final checkpoint. Failures during that wait or checkpoint
 * are logged and not rethrown; if the wait is interrupted, the thread's interrupt status is restored. In all
 * cases the listener is notified of the shutdown at the end.
 *
 * @param shutdownInput Provides information and capabilities (eg checkpointing) related to shutdown of this record
 *        processor.
 */
@Override
public void shutdown(ShutdownInput shutdownInput) {
  LOG.info("Shutting down {} with reason:{}", this, shutdownInput.getShutdownReason());

  Validate.isTrue(!shutdownRequested, String.format("KCL called shutdown more than once for processor %s.", this));
  shutdownRequested = true;
  // shutdown reason TERMINATE indicates that the shard is closed due to re-sharding. It also indicates that all the
  // records from the shard have been delivered to the consumer and the consumer is expected to checkpoint the
  // progress.
  if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
    // We need to ensure that all records are processed and checkpointed before going ahead and marking the processing
    // complete by calling checkpoint() on KCL. We need to checkpoint the completion state for parent shard, for KCL
    // to consume from the child shard(s).
    try {
      LOG.info("Waiting for all the records for {} to be processed.", this);
      // Let's poll periodically and block until the last processed record is checkpointed. Also handle the case
      // where there are no records received for the processor, in which case the lastProcessedRecordSeqNumber will
      // be null.
      while (lastProcessedRecordSeqNumber != null
          && !lastProcessedRecordSeqNumber.equals(lastCheckpointedRecordSeqNumber)) {
        Thread.sleep(POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS);
      }
      LOG.info("Final checkpoint for {} before shutting down.", this);
      shutdownInput.getCheckpointer().checkpoint();
    } catch (InterruptedException e) {
      // Do not swallow the interruption: restore the flag so code further up the stack can react to it.
      Thread.currentThread().interrupt();
      LOG.warn("Interrupted while waiting to commit the final checkpoint in the parent shard {}", this, e);
    } catch (Exception e) {
      LOG.warn("An error occurred while committing the final checkpoint in the parent shard {}", this, e);
    }
  }
  // The listener is notified regardless of whether the final checkpoint succeeded.
  listener.onShutdown(ssp);
}
项目:samza    文件:TestKinesisRecordProcessor.java   
/**
 * Test helper that calls {@code shutdown} on the given processor with a mocked {@link ShutdownInput}.
 *
 * <p>The mock reports the supplied reason and hands out the checkpointer obtained from
 * {@code getCheckpointer(processor)}. Any exception raised along the way is rethrown wrapped in a
 * {@link RuntimeException}.
 *
 * @param processor the record processor to shut down
 * @param reason the shutdown reason the mocked input should report
 */
static void shutDownProcessor(KinesisRecordProcessor processor, ShutdownReason reason) {
  try {
    final ShutdownInput mockedInput = Mockito.mock(ShutdownInput.class);
    // Stub the two accessors before handing the mock to the processor.
    when(mockedInput.getShutdownReason()).thenReturn(reason);
    when(mockedInput.getCheckpointer()).thenReturn(getCheckpointer(processor));

    processor.shutdown(mockedInput);
  } catch (Exception failure) {
    throw new RuntimeException(failure);
  }
}
项目:datacollector    文件:StreamSetsRecordProcessor.java   
/**
 * Logs the shutdown and, only when the reason is {@link ShutdownReason#TERMINATE}, checkpoints
 * everything processed so far.
 *
 * <p>No checkpoint is made for a requested shutdown because each batch is currently always
 * checkpointed in {@link #processRecords}. A checkpoint failure is logged and not rethrown.
 *
 * @param shutdownInput {@inheritDoc}
 */
@Override
public void shutdown(ShutdownInput shutdownInput) {
  LOG.info("Shutting down record processor for shard: {}", shardId);

  // Any reason other than TERMINATE needs no further work here.
  if (shutdownInput.getShutdownReason() != ShutdownReason.TERMINATE) {
    return;
  }

  // Shard is closed / finished processing: checkpoint all processing up to this point.
  try {
    shutdownInput.getCheckpointer().checkpoint();
    LOG.debug("Checkpointed due to record processor shutdown request.");
  } catch (InvalidStateException | ShutdownException checkpointFailure) {
    LOG.error("Error checkpointing batch: {}", checkpointFailure.toString(), checkpointFailure);
  }
}
项目:stail    文件:RawRecordProcessor.java   
/**
 * Shutdown callback; this implementation has an empty body, so no checkpoint is made and no
 * cleanup is performed whatever the reason.
 *
 * @param checkpointer checkpointer supplied for the shutdown; unused here
 * @param reason why the processor is being shut down; unused here
 */
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
  // No-op.
  // NOTE(review): nothing is checkpointed even on TERMINATE - confirm this is intended.
}
项目:stail    文件:JSONRecordProcessor.java   
/**
 * Shutdown callback; this implementation has an empty body, so no checkpoint is made and no
 * cleanup is performed whatever the reason.
 *
 * @param checkpointer checkpointer supplied for the shutdown; unused here
 * @param reason why the processor is being shut down; unused here
 */
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
  // No-op.
  // NOTE(review): nothing is checkpointed even on TERMINATE - confirm this is intended.
}
项目:samza    文件:TestKinesisSystemConsumer.java   
/**
 * Exercises the record-processing life-cycle of a Kinesis stream with the given number of shards:
 * <ol>
 *   <li>create a consumer and register one partition per shard,</li>
 *   <li>create and initialize the record processors,</li>
 *   <li>feed generated records through the processors,</li>
 *   <li>read the resulting messages back and compare them with the input,</li>
 *   <li>checkpoint through the consumer and verify the checkpointed offset,</li>
 *   <li>shut each processor down with reason ZOMBIE and verify its partition mapping is released.</li>
 * </ol>
 *
 * <p>When {@code numRecordsPerShard} is not positive, the helper only asserts that no messages
 * were read and returns.
 *
 * @param system system name used for the consumer and the partitions
 * @param stream stream name used for the partitions and the processor factory
 * @param numShards number of shards, and therefore partitions and processors, to simulate
 * @param numRecordsPerShard number of records generated for each shard
 */
private void testProcessRecordsHelper(String system, String stream, int numShards, int numRecordsPerShard)
    throws InterruptedException, ShutdownException, InvalidStateException,
           NoSuchFieldException, IllegalAccessException {

  // Consumer under test, with its metrics wired up.
  KinesisConfig kinesisConfig = new KinesisConfig(new MapConfig());
  KinesisSystemConsumer consumer = new KinesisSystemConsumer(system, kinesisConfig, new NoOpMetricsRegistry());
  initializeMetrics(consumer, stream);

  // One partition per shard; all partitions are built first and then registered.
  List<SystemStreamPartition> ssps = new LinkedList<>();
  for (int partitionId = 0; partitionId < numShards; partitionId++) {
    ssps.add(new SystemStreamPartition(system, stream, new Partition(partitionId)));
  }
  for (SystemStreamPartition partition : ssps) {
    consumer.register(partition, SYSTEM_CONSUMER_REGISTER_OFFSET);
  }

  // Record processors, created through the consumer's factory and initialized.
  IRecordProcessorFactory processorFactory = consumer.createRecordProcessorFactory(stream);
  Map<String, KinesisRecordProcessor> processorsByShard = createAndInitProcessors(processorFactory, numShards);
  List<KinesisRecordProcessor> processors = new ArrayList<>(processorsByShard.values());

  // Push generated records through the processors, remembering what each one was given.
  Map<KinesisRecordProcessor, List<Record>> inputRecordMap = generateRecords(numRecordsPerShard, processors);

  // Verification starts here: drain what the consumer queued.
  Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages =
      readEvents(new HashSet<>(ssps), consumer, numRecordsPerShard);

  if (numRecordsPerShard <= 0) {
    // No input records and hence no messages; nothing further to verify.
    Assert.assertEquals(messages.size(), 0);
    return;
  }
  Assert.assertEquals(messages.size(), numShards);

  Map<SystemStreamPartition, KinesisRecordProcessor> sspToProcessorMap = getProcessorMap(consumer);
  for (SystemStreamPartition ssp : ssps) {
    try {
      KinesisRecordProcessor processor = sspToProcessorMap.get(ssp);

      // From here on numRecordsPerShard is known to be positive, so the per-partition checks always run.
      // The messages read must match the input records, in order.
      List<IncomingMessageEnvelope> envelopes = messages.get(ssp);
      Assert.assertEquals(envelopes.size(), numRecordsPerShard);
      List<Record> inputRecords = inputRecordMap.get(processor);
      verifyRecords(envelopes, inputRecords, processor.getShardId());

      // Checkpoint at the last envelope and verify the checkpointer received the matching sequence number.
      IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1);
      consumer.onCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset()));
      ArgumentCaptor<String> checkpointedSeqNumber = ArgumentCaptor.forClass(String.class);
      verify(getCheckpointer(processor)).checkpoint(checkpointedSeqNumber.capture());
      Record lastInputRecord = inputRecords.get(inputRecords.size() - 1);
      Assert.assertEquals(lastInputRecord.getSequenceNumber(), checkpointedSeqNumber.getValue());

      // Shut down with reason ZOMBIE and verify that the ssp mapping has been released.
      shutDownProcessor(processor, ShutdownReason.ZOMBIE);
      Assert.assertFalse(sspToProcessorMap.containsValue(processor));
      Assert.assertTrue(isSspAvailable(consumer, ssp));
    } catch (NoSuchFieldException | IllegalAccessException | InvalidStateException | ShutdownException ex) {
      throw new RuntimeException(ex);
    }
  }
}