Java Class org.apache.kafka.clients.consumer.OffsetResetStrategy Example Source Code
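The snippets below are collected from open-source projects. As a quick orientation, OffsetResetStrategy is the enum (EARLIEST, LATEST, NONE) behind the consumer's auto.offset.reset setting; lowercasing the enum constant yields the config value. A minimal sketch of that mapping, with "localhost:9092" as a placeholder broker address:

for (OffsetResetStrategy strategy : OffsetResetStrategy.values()) {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    // EARLIEST -> "earliest", LATEST -> "latest", NONE -> "none"
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, strategy.toString().toLowerCase());
}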

Project: kafka-0.11.0.0-src-with-comment    File: ConsumerCoordinatorTest.java
@Before
public void setup() {
    this.time = new MockTime();
    this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
    this.metadata = new Metadata(0, Long.MAX_VALUE, true);
    this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
    this.client = new MockClient(time, metadata);
    this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
    this.metrics = new Metrics(time);
    this.rebalanceListener = new MockRebalanceListener();
    this.mockOffsetCommitCallback = new MockCommitCallback();
    this.partitionAssignor.clear();

    client.setNode(node);
    this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommitEnabled, true);
}
Project: kafka-0.11.0.0-src-with-comment    File: StoreChangelogReaderTest.java
@SuppressWarnings("unchecked")
@Test
public void shouldThrowStreamsExceptionWhenTimeoutExceptionThrown() throws Exception {
    final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
        @Override
        public Map<String, List<PartitionInfo>> listTopics() {
            throw new TimeoutException("KABOOM!");
        }
    };
    final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 0);
    try {
        changelogReader.validatePartitionExists(topicPartition, "store");
        fail("Should have thrown streams exception");
    } catch (final StreamsException e) {
        // pass
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: StoreChangelogReaderTest.java
@SuppressWarnings("unchecked")
@Test
public void shouldFallbackToPartitionsForIfPartitionNotInAllPartitionsList() throws Exception {
    final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
        @Override
        public List<PartitionInfo> partitionsFor(final String topic) {
            return Collections.singletonList(partitionInfo);
        }
    };

    final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 10);
    changelogReader.validatePartitionExists(topicPartition, "store");
}
Project: kafka-0.11.0.0-src-with-comment    File: StoreChangelogReaderTest.java
@SuppressWarnings("unchecked")
@Test
public void shouldThrowStreamsExceptionIfTimeoutOccursDuringPartitionsFor() throws Exception {
    final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
        @Override
        public List<PartitionInfo> partitionsFor(final String topic) {
            throw new TimeoutException("KABOOM!");
        }
    };
    final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 5);
    try {
        changelogReader.validatePartitionExists(topicPartition, "store");
        fail("Should have thrown streams exception");
    } catch (final StreamsException e) {
        // pass
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: StoreChangelogReaderTest.java
@SuppressWarnings("unchecked")
@Test
public void shouldRequestPartitionInfoIfItDoesntExist() throws Exception {
    final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
        @Override
        public Map<String, List<PartitionInfo>> listTopics() {
            return Collections.emptyMap();
        }
    };

    consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(partitionInfo));
    final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, Time.SYSTEM, 5000);
    changelogReader.validatePartitionExists(topicPartition, "store");
}
Project: kafka-0.11.0.0-src-with-comment    File: GlobalStateManagerImplTest.java
@Before
public void before() throws IOException {
    final Map<String, String> storeToTopic = new HashMap<>();
    storeToTopic.put("t1-store", "t1");
    storeToTopic.put("t2-store", "t2");

    final Map<StateStore, ProcessorNode> storeToProcessorNode = new HashMap<>();
    store1 = new NoOpReadOnlyStore<>("t1-store");
    storeToProcessorNode.put(store1, new MockProcessorNode(-1));
    store2 = new NoOpReadOnlyStore("t2-store");
    storeToProcessorNode.put(store2, new MockProcessorNode(-1));
    topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
                                     Collections.<String, SourceNode>emptyMap(),
                                     Collections.<String, SinkNode>emptyMap(),
                                     Collections.<StateStore>emptyList(),
                                     storeToTopic,
                                     Arrays.<StateStore>asList(store1, store2));

    context = new NoOpProcessorContext();
    stateDirPath = TestUtils.tempDirectory().getPath();
    stateDirectory = new StateDirectory("appId", stateDirPath, time);
    consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    stateManager = new GlobalStateManagerImpl(topology, consumer, stateDirectory);
    checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
}
Project: kafka-0.11.0.0-src-with-comment    File: GlobalStreamThreadTest.java
@SuppressWarnings("unchecked")
@Test
public void shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() throws Exception {
    final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
        @Override
        public List<PartitionInfo> partitionsFor(final String topic) {
            throw new RuntimeException("KABOOM!");
        }
    };
    globalStreamThread = new GlobalStreamThread(builder.buildGlobalStateTopology(),
                                                config,
                                                mockConsumer,
                                                new StateDirectory("appId", TestUtils.tempDirectory().getPath(), time),
                                                new Metrics(),
                                                new MockTime(),
                                                "clientId");

    try {
        globalStreamThread.start();
        fail("Should have thrown StreamsException if start up failed");
    } catch (StreamsException e) {
        assertThat(e.getCause(), instanceOf(RuntimeException.class));
        assertThat(e.getCause().getMessage(), equalTo("KABOOM!"));
    }
    assertFalse(globalStreamThread.stillRunning());
}
Project: kafka-0.11.0.0-src-with-comment    File: FetcherTest.java
@Test
public void testListOffsetsSendsIsolationLevel() {
    for (final IsolationLevel isolationLevel : IsolationLevel.values()) {
        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
                new ByteArrayDeserializer(), Integer.MAX_VALUE, isolationLevel);

        subscriptions.assignFromUser(singleton(tp1));
        subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);

        client.prepareResponse(new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest body) {
                ListOffsetRequest request = (ListOffsetRequest) body;
                return request.isolationLevel() == isolationLevel;
            }
        }, listOffsetResponse(Errors.NONE, 1L, 5L));
        fetcher.updateFetchPositions(singleton(tp1));
        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
        assertTrue(subscriptions.isFetchable(tp1));
        assertEquals(5, subscriptions.position(tp1).longValue());
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: FetcherTest.java
@Test
public void testUpdateFetchPositionDisconnect() {
    subscriptions.assignFromUser(singleton(tp1));
    subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);

    // First request gets a disconnect
    client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
                           listOffsetResponse(Errors.NONE, 1L, 5L), true);

    // Next one succeeds
    client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
                           listOffsetResponse(Errors.NONE, 1L, 5L));
    fetcher.updateFetchPositions(singleton(tp1));
    assertFalse(subscriptions.isOffsetResetNeeded(tp1));
    assertTrue(subscriptions.isFetchable(tp1));
    assertEquals(5, subscriptions.position(tp1).longValue());
}
Project: kafka-0.11.0.0-src-with-comment    File: FetcherTest.java
@Test
public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() {
    subscriptions.assignFromUser(singleton(tp1));
    subscriptions.committed(tp1, new OffsetAndMetadata(0));
    subscriptions.pause(tp1); // paused partition does not have a valid position
    subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);

    client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
                           listOffsetResponse(Errors.NONE, 1L, 10L));
    fetcher.updateFetchPositions(singleton(tp1));

    assertFalse(subscriptions.isOffsetResetNeeded(tp1));
    assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused
    assertTrue(subscriptions.hasValidPosition(tp1));
    assertEquals(10, subscriptions.position(tp1).longValue());
}
Project: fluid    File: KafkaUsage.java
public Properties getConsumerProperties(String groupId, String clientId, OffsetResetStrategy autoOffsetReset) {
    if (groupId == null) {
        throw new IllegalArgumentException("The groupId is required");
    } else {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("group.id", groupId);
        props.setProperty("enable.auto.commit", Boolean.FALSE.toString());
        if (autoOffsetReset != null) {
            props.setProperty("auto.offset.reset",
                autoOffsetReset.toString().toLowerCase());
        }

        if (clientId != null) {
            props.setProperty("client.id", clientId);
        }

        return props;
    }
}
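A hedged usage sketch of getConsumerProperties(...) above; the usage instance and the broker list behind it are assumptions for illustration:

Properties props = usage.getConsumerProperties("my-group", "my-client", OffsetResetStrategy.EARLIEST);
// props now carries auto.offset.reset=earliest and enable.auto.commit=false
KafkaConsumer<String, String> consumer =
        new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());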
Project: kafka    File: Fetcher.java
/**
 * Reset offsets for the given partition using the configured offset reset strategy.
 *
 * @param partition The partition whose offset needs to be reset
 * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
 */
private void resetOffset(TopicPartition partition) {
    OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
    final long timestamp;
    if (strategy == OffsetResetStrategy.EARLIEST)
        timestamp = ListOffsetRequest.EARLIEST_TIMESTAMP;
    else if (strategy == OffsetResetStrategy.LATEST)
        timestamp = ListOffsetRequest.LATEST_TIMESTAMP;
    else
        throw new NoOffsetForPartitionException(partition);

    log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase(Locale.ROOT));
    long offset = listOffset(partition, timestamp);

    // we might lose the assignment while fetching the offset, so check it is still active
    if (subscriptions.isAssigned(partition))
        this.subscriptions.seek(partition, offset);
}
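For context on the NoOffsetForPartitionException branch above: from the application side, the same situation arises when auto.offset.reset=none (i.e. OffsetResetStrategy.NONE) is configured for a group with no committed offsets. A minimal consumer-side sketch, with placeholder broker, group, and topic names:

Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "fresh-group");             // no committed offsets yet
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");           // OffsetResetStrategy.NONE
try (KafkaConsumer<String, String> consumer =
         new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
    consumer.subscribe(Collections.singleton("some-topic"));
    consumer.poll(100); // throws NoOffsetForPartitionException: no offset and no reset strategy
} catch (NoOffsetForPartitionException e) {
    // expected for a fresh group with reset strategy NONE
}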
Project: kafka    File: ConsumerCoordinatorTest.java
@Before
public void setup() {
    this.time = new MockTime();
    this.client = new MockClient(time);
    this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
    this.metadata = new Metadata(0, Long.MAX_VALUE);
    this.metadata.update(cluster, time.milliseconds());
    this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
    this.metrics = new Metrics(time);
    this.rebalanceListener = new MockRebalanceListener();
    this.defaultOffsetCommitCallback = new MockCommitCallback();
    this.partitionAssignor.clear();

    client.setNode(node);
    this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommitEnabled);
}
Project: kafka    File: FetcherTest.java
@Test
public void testUpdateFetchPositionDisconnect() {
    subscriptions.assignFromUser(Arrays.asList(tp));
    subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);

    // First request gets a disconnect
    client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
                           listOffsetResponse(Errors.NONE, Arrays.asList(5L)), true);

    // Next one succeeds
    client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
                           listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
    fetcher.updateFetchPositions(Collections.singleton(tp));
    assertFalse(subscriptions.isOffsetResetNeeded(tp));
    assertTrue(subscriptions.isFetchable(tp));
    assertEquals(5, (long) subscriptions.position(tp));
}
Project: vertx-kafka-client    File: ConsumerTestBase.java
@Test
public void testConsume(TestContext ctx) throws Exception {
  final String topicName = "testConsume";
  String consumerId = topicName;
  Async batch = ctx.async();
  AtomicInteger index = new AtomicInteger();
  int numMessages = 1000;
  kafkaCluster.useTo().produceStrings(numMessages, batch::complete,  () ->
      new ProducerRecord<>(topicName, 0, "key-" + index.get(), "value-" + index.getAndIncrement()));
  batch.awaitSuccess(20000);
  Properties config = kafkaCluster.useTo().getConsumerProperties(consumerId, consumerId, OffsetResetStrategy.EARLIEST);
  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  consumer = createConsumer(vertx, config);
  Async done = ctx.async();
  AtomicInteger count = new AtomicInteger(numMessages);
  consumer.exceptionHandler(ctx::fail);
  consumer.handler(rec -> {
    if (count.decrementAndGet() == 0) {
      done.complete();
    }
  });
  consumer.subscribe(Collections.singleton(topicName));
}
Project: vertx-kafka-client    File: ConsumerTestBase.java
@Test
public void testPartitionsFor(TestContext ctx) throws Exception {
  String topicName = "testPartitionsFor";
  String consumerId = topicName;
  kafkaCluster.createTopic(topicName, 2, 1);
  Properties config = kafkaCluster.useTo().getConsumerProperties(consumerId, consumerId, OffsetResetStrategy.EARLIEST);
  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  Context context = vertx.getOrCreateContext();
  consumer = createConsumer(context, config);

  Async done = ctx.async();

  consumer.partitionsFor(topicName, ar -> {
    if (ar.succeeded()) {
      List<PartitionInfo> partitionInfo = ar.result();
      ctx.assertEquals(2, partitionInfo.size());
    } else {
      ctx.fail();
    }
    done.complete();
  });
}
Project: vertx-kafka-client    File: ConsumerTestBase.java
@Test
public void testBatchHandler(TestContext ctx) throws Exception {
  String topicName = "testBatchHandler";
  String consumerId = topicName;
  Async batch1 = ctx.async();
  AtomicInteger index = new AtomicInteger();
  int numMessages = 500;
  kafkaCluster.useTo().produceStrings(numMessages, batch1::complete,  () ->
      new ProducerRecord<>(topicName, 0, "key-" + index.get(), "value-" + index.getAndIncrement()));
  batch1.awaitSuccess(10000);
  Properties config = kafkaCluster.useTo().getConsumerProperties(consumerId, consumerId, OffsetResetStrategy.EARLIEST);
  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  Context context = vertx.getOrCreateContext();
  consumer = createConsumer(context, config);
  Async batchHandler = ctx.async();
  consumer.batchHandler(records -> {
    ctx.assertEquals(numMessages, records.count());
    batchHandler.complete();
  });
  consumer.exceptionHandler(ctx::fail);
  consumer.handler(rec -> {});
  consumer.subscribe(Collections.singleton(topicName));
}
Project: vertx-kafka-client    File: ConsumerTestBase.java
@Test
public void testPollTimeout(TestContext ctx) throws Exception {
  Async async = ctx.async();
  String topicName = "testPollTimeout";
  Properties config = kafkaCluster.useTo().getConsumerProperties(topicName, topicName, OffsetResetStrategy.EARLIEST);
  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

  io.vertx.kafka.client.common.TopicPartition topicPartition = new io.vertx.kafka.client.common.TopicPartition(topicName, 0);
  KafkaConsumer<Object, Object> consumerWithCustomTimeout = KafkaConsumer.create(vertx, config);

  int pollingTimeout = 1500;
  // Set the polling timeout to 1500 ms (default is 1000)
  consumerWithCustomTimeout.pollTimeout(pollingTimeout);
  // Subscribe to the empty topic (we want the poll() call to timeout!)
  consumerWithCustomTimeout.subscribe(topicName, subscribeRes -> {
    consumerWithCustomTimeout.handler(rec -> {}); // Consumer will now immediately poll once
    long beforeSeek = System.currentTimeMillis();
    consumerWithCustomTimeout.seekToBeginning(topicPartition, seekRes -> {
      long durationWShortTimeout = System.currentTimeMillis() - beforeSeek;
      ctx.assertTrue(durationWShortTimeout >= pollingTimeout, "Operation must take at least as long as the polling timeout");
      consumerWithCustomTimeout.close();
      async.countDown();
    });
  });
}
Project: vertx-kafka-client    File: RxConsumerTest.java
@Test
public void testConsume(TestContext ctx) throws Exception {
  String topicName = "testConsume";
  Async batch = ctx.async();
  AtomicInteger index = new AtomicInteger();
  int numMessages = 1000;
  kafkaCluster.useTo().produceStrings(numMessages, batch::complete,  () ->
    new ProducerRecord<>(topicName, 0, "key-" + index.get(), "value-" + index.getAndIncrement()));
  batch.awaitSuccess(20000);
  Properties config = kafkaCluster.useTo().getConsumerProperties("testConsume_consumer", "testConsume_consumer", OffsetResetStrategy.EARLIEST);
  Map<String ,String> map = mapConfig(config);
  consumer = KafkaConsumer.create(vertx, map, String.class, String.class);
  Async done = ctx.async();
  AtomicInteger count = new AtomicInteger(numMessages);
  consumer.toObservable().subscribe(a -> {
    if (count.decrementAndGet() == 0) {
      done.complete();
    }
  }, ctx::fail);
  consumer.subscribe(Collections.singleton(topicName));
}
Project: vertx-kafka-client    File: CleanupTest.java
@Test
// Regression test for ISS-73: undeployment of a verticle with an unassigned consumer fails
public void testUndeployUnassignedConsumer(TestContext ctx) throws Exception {
  Properties config = kafkaCluster.useTo().getConsumerProperties("testUndeployUnassignedConsumer_consumer",
    "testUndeployUnassignedConsumer_consumer", OffsetResetStrategy.EARLIEST);
  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

  Async async = ctx.async(1);
  vertx.deployVerticle(new AbstractVerticle() {
    @Override
    public void start() throws Exception {
      KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);
      vertx.setTimer(20, record -> {
        // Very rarely, this throws an AlreadyUndeployed error
        vertx.undeploy(context.deploymentID(), ctx.asyncAssertSuccess(ar -> {
          async.complete();
        }));
      });
    }
  }, ctx.asyncAssertSuccess());
}
Project: vertx-kafka-client    File: ConsumerMockTestBase.java
@Test
public void testConsume(TestContext ctx) throws Exception {
  MockConsumer<String, String> mock = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
  KafkaReadStream<String, String> consumer = createConsumer(vertx, mock);
  Async doneLatch = ctx.async();
  consumer.handler(record -> {
    ctx.assertEquals("the_topic", record.topic());
    ctx.assertEquals(0, record.partition());
    ctx.assertEquals("abc", record.key());
    ctx.assertEquals("def", record.value());
    consumer.close(v -> doneLatch.complete());
  });
  consumer.subscribe(Collections.singleton("the_topic"), v -> {
    mock.schedulePollTask(() -> {
      mock.rebalance(Collections.singletonList(new TopicPartition("the_topic", 0)));
      mock.addRecord(new ConsumerRecord<>("the_topic", 0, 0L, "abc", "def"));
      mock.seek(new TopicPartition("the_topic", 0), 0L);
    });
  });
}
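The same MockConsumer pattern works without the Vert.x wrapper; a minimal plain-Kafka sketch (the topic name and record values are illustrative):

MockConsumer<String, String> mock = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
mock.subscribe(Collections.singleton("the_topic"));
mock.rebalance(Collections.singletonList(new TopicPartition("the_topic", 0)));
// with EARLIEST, missing positions reset to the registered beginning offsets
mock.updateBeginningOffsets(Collections.singletonMap(new TopicPartition("the_topic", 0), 0L));
mock.addRecord(new ConsumerRecord<>("the_topic", 0, 0L, "abc", "def"));
ConsumerRecords<String, String> records = mock.poll(0); // returns the record added above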
Project: kafka-reactive-streams    File: KafkaPublisherTest.java
@Override public Publisher<ConsumerRecord<Long, Double>> createPublisher(final long l) {
    long nRecords = 100;

    mockConsumer = new MockConsumer<Long, Double>(OffsetResetStrategy.LATEST);
    mockConsumer.assign(Arrays.asList(topicPartition));
    final HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<>();
    topicPartitionLongHashMap.put(topicPartition, 0L);
    mockConsumer.updateBeginningOffsets(topicPartitionLongHashMap);
    topicPartitionLongHashMap.put(topicPartition, nRecords - 1);
    mockConsumer.updateEndOffsets(topicPartitionLongHashMap);
    final Random random = new Random();
    for (int i = 0; i < nRecords; i++)
        mockConsumer.addRecord(
                new ConsumerRecord<Long, Double>(
                        topicPartition.topic(),
                        topicPartition.partition(),
                        i,
                        random.nextLong(),
                        random.nextDouble()));

    return new KafkaPublisher<Long, Double>(mockConsumer, 100, Executors.newSingleThreadExecutor());
}
Project: beam    File: KafkaIOTest.java
/**
 * Creates a consumer with two topics, with 10 partitions each.
 * numElements records are assigned (round-robin) across all 20 partitions.
 */
private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
    int numElements,
    int maxNumRecords,
    @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {

  List<String> topics = ImmutableList.of("topic_a", "topic_b");

  KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read()
      .withBootstrapServers("myServer1:9092,myServer2:9092")
      .withTopics(topics)
      .withConsumerFactoryFn(new ConsumerFactoryFn(
          topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
      .withKeyDeserializer(IntegerDeserializer.class)
      .withValueDeserializer(LongDeserializer.class)
      .withMaxNumRecords(maxNumRecords);

  if (timestampFn != null) {
    return reader.withTimestampFn(timestampFn);
  } else {
    return reader;
  }
}
Project: beam    File: KafkaIOTest.java
@Test
public void testUnboundedSourceWithSingleTopic() {
  // same as testUnboundedSource, but with a single topic

  int numElements = 1000;
  String topic = "my_topic";

  KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read()
      .withBootstrapServers("none")
      .withTopic("my_topic")
      .withConsumerFactoryFn(new ConsumerFactoryFn(
          ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST))
      .withMaxNumRecords(numElements)
      .withKeyDeserializer(IntegerDeserializer.class)
      .withValueDeserializer(LongDeserializer.class);

  PCollection<Long> input = p
      .apply(reader.withoutMetadata())
      .apply(Values.<Long>create());

  addCountingAsserts(input, numElements);
  p.run();
}
Project: beam    File: KafkaIOTest.java
@Test
public void testSourceWithExplicitPartitionsDisplayData() {
  KafkaIO.Read<byte[], Long> read = KafkaIO.<byte[], Long>read()
      .withBootstrapServers("myServer1:9092,myServer2:9092")
      .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5),
          new TopicPartition("test", 6)))
      .withConsumerFactoryFn(new ConsumerFactoryFn(
          Lists.newArrayList("test"), 10, 10, OffsetResetStrategy.EARLIEST)) // 10 partitions
      .withKeyDeserializer(ByteArrayDeserializer.class)
      .withValueDeserializer(LongDeserializer.class);

  DisplayData displayData = DisplayData.from(read);

  assertThat(displayData, hasDisplayItem("topicPartitions", "test-5,test-6"));
  assertThat(displayData, hasDisplayItem("enable.auto.commit", false));
  assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
  assertThat(displayData, hasDisplayItem("auto.offset.reset", "latest"));
  assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288));
}
Project: incubator-gobblin    File: Kafka09ConsumerClientTest.java
@Test
public void testConsume() throws Exception {
  Config testConfig = ConfigFactory.parseMap(ImmutableMap.of(ConfigurationKeys.KAFKA_BROKERS, "test"));
  MockConsumer<String, String> consumer = new MockConsumer<String, String>(OffsetResetStrategy.NONE);
  consumer.assign(Arrays.asList(new TopicPartition("test_topic", 0)));

  HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
  beginningOffsets.put(new TopicPartition("test_topic", 0), 0L);
  consumer.updateBeginningOffsets(beginningOffsets);

  ConsumerRecord<String, String> record0 = new ConsumerRecord<>("test_topic", 0, 0L, "key", "value0");
  ConsumerRecord<String, String> record1 = new ConsumerRecord<>("test_topic", 0, 1L, "key", "value1");
  ConsumerRecord<String, String> record2 = new ConsumerRecord<>("test_topic", 0, 2L, "key", "value2");

  consumer.addRecord(record0);
  consumer.addRecord(record1);
  consumer.addRecord(record2);

  try (Kafka09ConsumerClient<String, String> kafka09Client = new Kafka09ConsumerClient<>(testConfig, consumer)) {

    // Consume from 0 offset
    Set<KafkaConsumerRecord> consumedRecords =
        Sets.newHashSet(kafka09Client.consume(new KafkaPartition.Builder().withId(0).withTopicName("test_topic")
            .build(), 0L, 100L));

    Set<Kafka09ConsumerRecord<String, String>> expected =
        ImmutableSet.<Kafka09ConsumerRecord<String, String>> of(new Kafka09ConsumerRecord<>(record0),
            new Kafka09ConsumerRecord<>(record1), new Kafka09ConsumerRecord<>(record2));
    Assert.assertEquals(consumedRecords, expected);

  }

}
Project: debezium-proto    File: KafkaCluster.java
/**
 * Use the supplied function to asynchronously consume messages from the cluster.
 * 
 * @param groupId the name of the group; may not be null
 * @param clientId the name of the client; may not be null
 * @param autoOffsetReset how to pick a starting offset when there is no initial offset in ZooKeeper or if an offset is
 *            out of range; may be null for the default to be used
 * @param keyDeserializer the deserializer for the keys; may not be null
 * @param valueDeserializer the deserializer for the values; may not be null
 * @param continuation the function that determines if the consumer should continue; may not be null
 * @param completion the function to call when the consumer terminates; may be null
 * @param topics the set of topics to consume; may not be null or empty
 * @param consumerFunction the function to consume the messages; may not be null
 */
public <K, V> void consume(String groupId, String clientId, OffsetResetStrategy autoOffsetReset,
                           Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer,
                           BooleanSupplier continuation, Runnable completion, Collection<String> topics,
                           java.util.function.Consumer<ConsumerRecord<K, V>> consumerFunction) {
    Properties props = getConsumerProperties(groupId, clientId, autoOffsetReset);
    Thread t = new Thread(() -> {
        LOGGER.debug("Starting consumer {} to read messages", clientId);
        try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(props, keyDeserializer, valueDeserializer)) {
            consumer.subscribe(new ArrayList<>(topics));
            while (continuation.getAsBoolean()) {
                consumer.poll(10).forEach(record -> {
                    LOGGER.debug("Consumer {}: consuming message {}", clientId, record);
                    consumerFunction.accept(record);
                    consumer.commitAsync();
                });
            }
        } finally {
            if (completion != null) completion.run();
            LOGGER.debug("Stopping consumer {}", clientId);
        }
    });
    t.setName(clientId + "-thread");
    t.start();
}
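A hedged sketch of calling the consume(...) helper above; the cluster instance, topic name, and message count are assumptions for illustration:

AtomicInteger remaining = new AtomicInteger(100);
cluster.consume("group-1", "client-1", OffsetResetStrategy.EARLIEST,
        new StringDeserializer(), new StringDeserializer(),
        () -> remaining.get() > 0,                 // continuation: stop after 100 messages
        () -> System.out.println("consumer done"), // completion
        Collections.singleton("my-topic"),
        record -> remaining.decrementAndGet());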
Project: Lagerta    File: KafkaMockFactory.java
private Consumer createConsumer() {
    ProxyMockConsumer consumer = new ProxyMockConsumer(OffsetResetStrategy.EARLIEST);

    for (Map.Entry<String, List<PartitionInfo>> entry : getInfos().entrySet()) {
        consumer.updatePartitions(entry.getKey(), entry.getValue());
    }
    CONSUMERS.add(consumer);
    return consumer;
}
Project: Lagerta    File: KafkaMockFactory.java
private Consumer createConsumer() {
    ProxyMockConsumer consumer = new ProxyMockConsumer(OffsetResetStrategy.EARLIEST);
    for (Map.Entry<String, List<PartitionInfo>> entry : getInfos().entrySet()) {
        consumer.updatePartitions(entry.getKey(), entry.getValue());
    }
    ConsumerForTests testConsumer = new ConsumerForTests(consumer);

    CONSUMERS.add(testConsumer);
    return testConsumer;
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaBasedLogTest.java
@Before
public void setUp() throws Exception {
    store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
            TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, initializer);
    consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1));
    Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
    beginningOffsets.put(TP0, 0L);
    beginningOffsets.put(TP1, 0L);
    consumer.updateBeginningOffsets(beginningOffsets);
}
Project: kafka-0.11.0.0-src-with-comment    File: AbstractTaskTest.java
private Consumer mockConsumer(final RuntimeException toThrow) {
    return new MockConsumer(OffsetResetStrategy.EARLIEST) {
        @Override
        public OffsetAndMetadata committed(final TopicPartition partition) {
            throw toThrow;
        }
    };
}
Project: kafka-0.11.0.0-src-with-comment    File: SubscriptionState.java
public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
    this.defaultResetStrategy = defaultResetStrategy;
    this.subscription = Collections.emptySet();
    this.assignment = new PartitionStates<>();
    this.groupSubscription = new HashSet<>();
    this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
    this.subscribedPattern = null;
    this.subscriptionType = SubscriptionType.NONE;
}
Project: kafka-0.11.0.0-src-with-comment    File: Fetcher.java
private void offsetResetStrategyTimestamp(
        final TopicPartition partition,
        final Map<TopicPartition, Long> output,
        final Set<TopicPartition> partitionsWithNoOffsets) {
    OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
    if (strategy == OffsetResetStrategy.EARLIEST)
        output.put(partition, ListOffsetRequest.EARLIEST_TIMESTAMP);
    else if (strategy == OffsetResetStrategy.LATEST)
        output.put(partition, endTimestamp());
    else
        partitionsWithNoOffsets.add(partition);
}
Project: kafka-0.11.0.0-src-with-comment    File: FetcherTest.java
@Test
public void testUpdateFetchPositionResetToLatestOffset() {
    subscriptions.assignFromUser(singleton(tp1));
    subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);

    client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
                           listOffsetResponse(Errors.NONE, 1L, 5L));
    fetcher.updateFetchPositions(singleton(tp1));
    assertFalse(subscriptions.isOffsetResetNeeded(tp1));
    assertTrue(subscriptions.isFetchable(tp1));
    assertEquals(5, subscriptions.position(tp1).longValue());
}
Project: kafka-0.11.0.0-src-with-comment    File: FetcherTest.java
@Test
public void testUpdateFetchPositionResetToEarliestOffset() {
    subscriptions.assignFromUser(singleton(tp1));
    subscriptions.needOffsetReset(tp1, OffsetResetStrategy.EARLIEST);

    client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
                           listOffsetResponse(Errors.NONE, 1L, 5L));
    fetcher.updateFetchPositions(singleton(tp1));
    assertFalse(subscriptions.isOffsetResetNeeded(tp1));
    assertTrue(subscriptions.isFetchable(tp1));
    assertEquals(5, subscriptions.position(tp1).longValue());
}
Project: fluid    File: KafkaUsage.java
/**
 * Use the supplied function to asynchronously consume messages from the cluster.
 *
 * @param groupId              the name of the group; may not be null
 * @param clientId             the name of the client; may not be null
 * @param autoOffsetReset      how to pick a starting offset when there is no initial offset in ZooKeeper or if an offset is
 *                             out of range; may be null for the default to be used
 * @param keyDeserializer      the deserializer for the keys; may not be null
 * @param valueDeserializer    the deserializer for the values; may not be null
 * @param continuation         the function that determines if the consumer should continue; may not be null
 * @param offsetCommitCallback the callback that should be used after committing offsets; may be null if offsets are
 *                             not to be committed
 * @param completion           the function to call when the consumer terminates; may be null
 * @param topics               the set of topics to consume; may not be null or empty
 * @param consumerFunction     the function to consume the messages; may not be null
 */
public <K, V> void consume(String groupId, String clientId, OffsetResetStrategy autoOffsetReset,
                           Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer,
                           BooleanSupplier continuation, OffsetCommitCallback offsetCommitCallback, Runnable completion,
                           Collection<String> topics,
                           java.util.function.Consumer<ConsumerRecord<K, V>> consumerFunction) {
    Properties props = getConsumerProperties(groupId, clientId, autoOffsetReset);
    Thread t = new Thread(() -> {
        LOGGER.info("Starting consumer {} to read messages", clientId);
        try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(props, keyDeserializer, valueDeserializer)) {
            consumer.subscribe(new ArrayList<>(topics));
            while (continuation.getAsBoolean()) {
                consumer.poll(10).forEach(record -> {
                    LOGGER.info("Consumer {}: consuming message {}", clientId, record);
                    consumerFunction.accept(record);
                    if (offsetCommitCallback != null) {
                        consumer.commitAsync(offsetCommitCallback);
                    }
                });
            }
        } finally {
            if (completion != null) completion.run();
            LOGGER.debug("Stopping consumer {}", clientId);
        }
    });
    t.setName(clientId + "-thread");
    t.start();
}
Project: fluid    File: KafkaUsage.java
public void consumeIntegers(BooleanSupplier continuation, Runnable completion, Collection<String> topics, Consumer<ConsumerRecord<String, Integer>> consumerFunction) {
    Deserializer<String> keyDes = new StringDeserializer();
    Deserializer<Integer> valDes = new IntegerDeserializer();
    String randomId = UUID.randomUUID().toString();
    OffsetCommitCallback offsetCommitCallback = null;
    this.consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, valDes, continuation, offsetCommitCallback, completion, topics, consumerFunction);
}
Project: kafka    File: SubscriptionState.java
public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
    this.defaultResetStrategy = defaultResetStrategy;
    this.subscription = new HashSet<>();
    this.userAssignment = new HashSet<>();
    this.assignment = new HashMap<>();
    this.groupSubscription = new HashSet<>();
    this.needsPartitionAssignment = false;
    this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
    this.subscribedPattern = null;
}
Project: kafka    File: FetcherTest.java
@Test
public void testUpdateFetchPositionResetToLatestOffset() {
    subscriptions.assignFromUser(Arrays.asList(tp));
    subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);

    client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
                           listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
    fetcher.updateFetchPositions(Collections.singleton(tp));
    assertFalse(subscriptions.isOffsetResetNeeded(tp));
    assertTrue(subscriptions.isFetchable(tp));
    assertEquals(5, (long) subscriptions.position(tp));
}
Project: kafka    File: FetcherTest.java
@Test
public void testUpdateFetchPositionResetToEarliestOffset() {
    subscriptions.assignFromUser(Arrays.asList(tp));
    subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);

    client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
                           listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
    fetcher.updateFetchPositions(Collections.singleton(tp));
    assertFalse(subscriptions.isOffsetResetNeeded(tp));
    assertTrue(subscriptions.isFetchable(tp));
    assertEquals(5, (long) subscriptions.position(tp));
}