Example source code for the Java class org.apache.kafka.clients.consumer.ConsumerConfig
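
The snippets below are real-world usages of the constants defined in org.apache.kafka.clients.consumer.ConsumerConfig. For orientation, here is a minimal sketch of the canonical pattern (broker address, group id, and topic are placeholders; the usual org.apache.kafka.clients.consumer and org.apache.kafka.common.serialization imports are assumed):

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder group id
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("example-topic"));   // placeholder topic
    for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
        System.out.printf("offset=%d key=%s value=%s%n", record.offset(), record.key(), record.value());
    }
}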

Project: kafka-0.11.0.0-src-with-comment    File: ConsumerCoordinatorTest.java
@Before
public void setup() {
    this.time = new MockTime();
    this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
    this.metadata = new Metadata(0, Long.MAX_VALUE, true);
    this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
    this.client = new MockClient(time, metadata);
    this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
    this.metrics = new Metrics(time);
    this.rebalanceListener = new MockRebalanceListener();
    this.mockOffsetCommitCallback = new MockCommitCallback();
    this.partitionAssignor.clear();

    client.setNode(node);
    this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommitEnabled, true);
}
Project: xm-uaa    File: ApplicationStartup.java
private void createSystemConsumer(String name, MessageListener<String, String> consumeEvent) {
    log.info("Creating kafka consumer for topic {}", name);
    ContainerProperties containerProps = new ContainerProperties(name);

    Map<String, Object> props = kafkaProperties.buildConsumerProperties();
    if (name.equals(applicationProperties.getKafkaSystemTopic())) {
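        // a random, per-instance group id: every node consumes the system topic independently (broadcast semantics)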
        props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    }
    ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(props);

    ConcurrentMessageListenerContainer<String, String> container =
        new ConcurrentMessageListenerContainer<>(factory, containerProps);
    container.setupMessageListener(consumeEvent);
    container.start();
    log.info("Successfully created kafka consumer for topic {}", name);
}
Project: open-kilda    File: KafkaSpoutFactory.java
public KafkaSpoutConfig.Builder<String, String> builder(String bootstrapServers, String topic, Class klass) {
    Map<String, Object> props = new HashMap<>();
    if (bootstrapServers == null || bootstrapServers.isEmpty()) {
        throw new IllegalArgumentException("bootstrap servers cannot be null or empty");
    }
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(KafkaSpoutConfig.Consumer.GROUP_ID, groupId);
    props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,
            keyDeserializer);
    props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,
            valueDeserializer);
    props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, String.valueOf(enableAutoCommit));

    KafkaSpoutStreams streams = new KafkaSpoutStreamsNamedTopics.Builder(
            new KafkaSpoutStream(getFields(klass), topic)).build();

    KafkaSpoutTuplesBuilder<String, String> tuplesBuilder = new KafkaSpoutTuplesBuilderNamedTopics.Builder<String, String>(
            new TuplesBuilder(topic, klass)).build();

    return new KafkaSpoutConfig.Builder<>(props, streams, tuplesBuilder);
}
Project: kafka-streams-machine-learning-examples    File: StreamsStarterApp.java
public static void main(String[] args) {

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-starter-app");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KStreamBuilder builder = new KStreamBuilder();

        KStream<String, String> kStream = builder.stream("streams-file-input");
        // do stuff
        kStream.to("streams-wordcount-output");

        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.cleanUp(); // only do this in dev - not in prod
        streams.start();

        // print the topology
        System.out.println(streams.toString());

        // shutdown hook to correctly close the streams application
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

    }
Project: kafka-0.11.0.0-src-with-comment    File: ConsumerCoordinatorTest.java
@Test
public void testAutoCommitManualAssignment() {
    ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
                                                       ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);

    subscriptions.assignFromUser(singleton(t1p));
    subscriptions.seek(t1p, 100);

    client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
    coordinator.ensureCoordinatorReady();

    client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
    time.sleep(autoCommitIntervalMs);
    coordinator.poll(time.milliseconds(), Long.MAX_VALUE);

    assertEquals(100L, subscriptions.committed(t1p).offset());
}
Project: stroom-stats    File: StatisticsFlatMappingProcessor.java
private StreamsConfig buildStreamsConfig(String appId, final Map<String, Object> additionalProps) {
        Map<String, Object> props = new HashMap<>();

        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBootstrapServers());
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, getStreamsCommitIntervalMs());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, getAutoOffsetReset());

        //TODO: not clear whether this is needed or not. Plain Kafka doesn't need it, but streams may;
        //leaving it in seems to cause zookeeper connection warnings in the tests. Tests seem to work ok
        //without it
//        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperConfig.getQuorum());

        //Add any additional props, overwriting any from above
        props.putAll(additionalProps);

        props.forEach((key, value) ->
                LOGGER.info("Setting Kafka Streams property {} for appId {} to [{}]", key, appId, value.toString())
        );

        return new StreamsConfig(props);
    }
Project: xm-ms-entity    File: ApplicationStartup.java
private void createSystemConsumer(String name, MessageListener<String, String> consumeEvent) {
    log.info("Creating kafka consumer for topic {}", name);
    ContainerProperties containerProps = new ContainerProperties(name);

    Map<String, Object> props = kafkaProperties.buildConsumerProperties();
    if (name.equals(applicationProperties.getKafkaSystemTopic())) {
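        // a random, per-instance group id: every node consumes the system topic independently (broadcast semantics)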
        props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    }
    ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(props);

    ConcurrentMessageListenerContainer<String, String> container =
        new ConcurrentMessageListenerContainer<>(factory, containerProps);
    container.setupMessageListener(consumeEvent);
    container.start();
    log.info("Successfully created kafka consumer for topic {}", name);
}
Project: flume-release-1.7.0    File: KafkaSource.java
private void translateOldProperties(Context ctx) {
  // topic
  String topic = context.getString(KafkaSourceConstants.TOPIC);
  if (topic != null && !topic.isEmpty()) {
    subscriber = new TopicListSubscriber(topic);
    log.warn("{} is deprecated. Please use the parameter {}",
            KafkaSourceConstants.TOPIC, KafkaSourceConstants.TOPICS);
  }

  // old groupId
  groupId = ctx.getString(KafkaSourceConstants.OLD_GROUP_ID);
  if (groupId != null && !groupId.isEmpty()) {
    log.warn("{} is deprecated. Please use the parameter {}",
            KafkaSourceConstants.OLD_GROUP_ID,
            KafkaSourceConstants.KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
  }
}
Project: kafka-0.11.0.0-src-with-comment    File: ConsumerCoordinatorTest.java
@Test
public void testAutoCommitManualAssignmentCoordinatorUnknown() {
    ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
                                                       ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);

    subscriptions.assignFromUser(singleton(t1p));
    subscriptions.seek(t1p, 100);

    // no commit initially since coordinator is unknown
    consumerClient.poll(0);
    time.sleep(autoCommitIntervalMs);
    consumerClient.poll(0);

    assertNull(subscriptions.committed(t1p));

    // now find the coordinator
    client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
    coordinator.ensureCoordinatorReady();

    // sleep only for the retry backoff
    time.sleep(retryBackoffMs);
    client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
    coordinator.poll(time.milliseconds(), Long.MAX_VALUE);

    assertEquals(100L, subscriptions.committed(t1p).offset());
}
Project: flume-release-1.7.0    File: TestKafkaChannel.java
@Test
public void testOldConfig() throws Exception {
  Context context = new Context();
  context.put(BROKER_LIST_FLUME_KEY, testUtil.getKafkaServerUrl());
  context.put(GROUP_ID_FLUME, "flume-something");
  context.put(READ_SMALLEST_OFFSET, "true");
  context.put("topic", topic);

  final KafkaChannel channel = new KafkaChannel();
  Configurables.configure(channel, context);

  Properties consumerProps = channel.getConsumerProps();
  Properties producerProps = channel.getProducerProps();

  Assert.assertEquals(producerProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
                      testUtil.getKafkaServerUrl());
  Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG),
                      "flume-something");
  Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
                      "earliest");
}
Project: microservices-transactions-tcc    File: CompositeTransactionManagerKafkaImpl.java
@SuppressWarnings("unchecked")
@Override
public List<EntityCommand<?>> fetch(String txId) {
    List<EntityCommand<?>> transactionOperations = new ArrayList<EntityCommand<?>>();

    Map<String, Object> consumerConfigs = (Map<String, Object>)configuration.get("kafkaConsumerConfiguration");
    consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());

    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(consumerConfigs);
    kafkaConsumer.subscribe(Arrays.asList(txId));

    ConsumerRecords<String, String> records = kafkaConsumer.poll(kafkaConsumerPollTimeout);
    for (ConsumerRecord<String, String> record : records){
        LOG.info("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
        try {
            transactionOperations.add(serializer.readFromString(record.value()));
        } catch (SerializationFailedException e) {
            LOG.error("Unable to deserialize [{}] because of: {}", record.value(), e.getMessage());
        }
    }

    kafkaConsumer.close();

    return transactionOperations;
}
Project: azeroth    File: KafkaConsumerCommand.java
private KafkaConsumer<String, Serializable> getConsumer(String groupId) {

        KafkaConsumer<String, Serializable> kafkaConsumer = null;
        if ((kafkaConsumer = kafkaConsumers.get(groupId)) != null)
            return kafkaConsumer;

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());

        kafkaConsumer = new KafkaConsumer<String, Serializable>(properties);
        kafkaConsumers.put(groupId, kafkaConsumer);
        return kafkaConsumer;

    }
Project: Lagerta    File: ReceivedTransactionsListenerImpl.java
@Override
public void execute(ServiceContext ctx) throws Exception {
    active = true;
    receivedIds = new GridConcurrentHashSet<>();

    Properties config = new Properties();
    config.putAll(dataRecoveryConfig.getConsumerConfig());
    config.put(ConsumerConfig.GROUP_ID_CONFIG, ReceivedTransactionsListenerImpl.class.getSimpleName());
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    try (Consumer<ByteBuffer, ByteBuffer> consumer = kafkaFactory.consumer(config)) {
        consumer.subscribe(Arrays.asList(dataRecoveryConfig.getRemoteTopic(), dataRecoveryConfig.getReconciliationTopic()));
        while (active) {
            ConsumerRecords<ByteBuffer, ByteBuffer> poll = consumer.poll(500);
            for (ConsumerRecord<ByteBuffer, ByteBuffer> record : poll) {
                TransactionMetadata metadata = serializer.deserialize(record.key());
                receivedIds.add(metadata.getTransactionId());
            }
            consumer.commitSync();
        }
    }
}
Project: springboot_cwenao    File: KafkaConsumerConfig.java
public ConsumerFactory<String, String> consumerFactory() {

        Map<String, Object> properties = new HashMap<String, Object>();

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        return new DefaultKafkaConsumerFactory<String, String>(properties);
    }
Project: kalinka    File: ContextConfiguration.java
@Bean
public Map<String, Object> kafkaConsumerConfig() {
    final Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.GROUP_ID_CONFIG, this.kafkaGroupId);
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaHosts);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.kafkaAutoCommit);
    if (this.kafkaAutoCommit) {
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, this.kafkaAutoCommitInterval);
    }
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, this.kafkaSessionTimeout);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, this.kafkaKeyDeserializerClass);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.kafkaValueDeserializerClass);
    props.put(Constants.KAFKA_POLL_TIMEOUT, this.kafkaPollTimeout);
    props.put(Constants.KAFKA_SUBSCRIBED_TOPICS, this.kafkaSubscribedTopics);
    return props;
}
Project: kafka-0.11.0.0-src-with-comment    File: WorkerSinkTask.java
private KafkaConsumer<byte[], byte[]> createConsumer() {
    // Include any unknown worker configs so consumer configs can be set globally on the worker
    // and passed through to the task
    Map<String, Object> props = new HashMap<>();

    props.put(ConsumerConfig.GROUP_ID_CONFIG, SinkUtils.consumerGroupId(id.connector()));
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    props.putAll(workerConfig.originalsWithPrefix("consumer."));

    KafkaConsumer<byte[], byte[]> newConsumer;
    try {
        newConsumer = new KafkaConsumer<>(props);
    } catch (Throwable t) {
        throw new ConnectException("Failed to create consumer", t);
    }

    return newConsumer;
}
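
The originalsWithPrefix("consumer.") call above is what lets consumer settings be declared once in the worker config and flow through to every sink task. A rough illustration (the override key here is a hypothetical example):

// given a worker properties entry such as:  consumer.max.poll.records=500
Map<String, Object> consumerOverrides = workerConfig.originalsWithPrefix("consumer.");
// consumerOverrides now contains {max.poll.records=500}; props.putAll(consumerOverrides)
// layers it over the defaults set above, so the prefixed value wins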
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConfigBackingStore.java
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
    Map<String, Object> producerProps = new HashMap<>();
    producerProps.putAll(config.originals());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.putAll(config.originals());
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Map<String, Object> adminProps = new HashMap<>(config.originals());
    NewTopic topicDescription = TopicAdmin.defineTopic(topic).
            compacted().
            partitions(1).
            replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)).
            build();

    return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps);
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaOffsetBackingStoreTest.java
@Test
public void testStartStop() throws Exception {
    expectConfigure();
    expectStart(Collections.EMPTY_LIST);
    expectStop();

    PowerMock.replayAll();

    store.configure(DEFAULT_DISTRIBUTED_CONFIG);
    assertEquals(TOPIC, capturedTopic.getValue());
    assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
    assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
    assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
    assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));

    assertEquals(TOPIC, capturedNewTopic.getValue().name());
    assertEquals(TOPIC_PARTITIONS, capturedNewTopic.getValue().numPartitions());
    assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor());

    store.start();
    store.stop();

    PowerMock.verifyAll();
}
Project: kafka-0.11.0.0-src-with-comment    File: PipeDemo.java
public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    KStreamBuilder builder = new KStreamBuilder();

    builder.stream("streams-file-input").to("streams-pipe-output");

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

    // usually the streams application would run forever;
    // in this example we just let it run for some time and then stop, since the input data is finite
    Thread.sleep(5000L);

    streams.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamsConfig.java
private Map<String, Object> getCommonConsumerConfigs() throws ConfigException {
    final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());

    // disable auto commit, and throw an exception if the user has overridden the value;
    // this is necessary for streams commit semantics
    if (clientProvidedProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
        throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
            + ", as the streams client will always turn off auto committing.");
    }
    if (eosEnabled) {
        if (clientProvidedProps.containsKey(ConsumerConfig.ISOLATION_LEVEL_CONFIG)) {
            throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ISOLATION_LEVEL_CONFIG
                + "; because " + PROCESSING_GUARANTEE_CONFIG + " is set to '" + EXACTLY_ONCE + "' consumers will always read committed data only.");
        }
    }

    final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
    consumerProps.putAll(clientProvidedProps);

    // bootstrap.servers should be from StreamsConfig
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG));
    // remove the deprecated ZK config
    consumerProps.remove(ZOOKEEPER_CONNECT_CONFIG);

    return consumerProps;
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamsConfig.java
/**
 * Get the configs to the {@link KafkaConsumer consumer}.
 * Properties using the prefix {@link #CONSUMER_PREFIX} will be used in favor of their non-prefixed versions,
 * except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG}, where we always use the non-prefixed
 * version as we only support reading from/writing to the same Kafka cluster.
 *
 * @param streamThread the {@link StreamThread} creating a consumer
 * @param groupId      consumer groupId
 * @param clientId     clientId
 * @return Map of the consumer configuration.
 * @throws ConfigException if {@code "enable.auto.commit"} was overridden by the user
 */
public Map<String, Object> getConsumerConfigs(final StreamThread streamThread,
                                              final String groupId,
                                              final String clientId) throws ConfigException {
    final Map<String, Object> consumerProps = getCommonConsumerConfigs();

    // add client id with stream client id prefix, and group id
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");

    // add configs required for stream partition assignor
    consumerProps.put(InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
    consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
    consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
    consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName());
    consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));

    consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));

    return consumerProps;
}
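
As the javadoc above notes, per-consumer settings reach a streams application through CONSUMER_PREFIX. A minimal sketch, with placeholder application id and bootstrap servers:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");        // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
StreamsConfig config = new StreamsConfig(props);
// the prefixed key wins over a non-prefixed "max.poll.records";
// a prefixed ENABLE_AUTO_COMMIT_CONFIG, by contrast, would make
// getCommonConsumerConfigs() above throw a ConfigException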
Project: kafka-0.11.0.0-src-with-comment    File: KStreamAggregationIntegrationTest.java
private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
                                                        keyDeserializer,
                                                    final Deserializer<V>
                                                        valueDeserializer,
                                                    final int numMessages)
    throws InterruptedException {
    final Properties consumerProperties = new Properties();
    consumerProperties
        .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
    consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
    consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
    return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
        consumerProperties,
        outputTopic,
        numMessages,
        60 * 1000);
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamsFineGrainedAutoResetIntegrationTest.java
@Before
public void setUp() throws Exception {

    Properties props = new Properties();
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);

    streamsConfiguration = StreamsTestUtils.getStreamsConfig(
            "testAutoOffsetId",
            CLUSTER.bootstrapServers(),
            STRING_SERDE_CLASSNAME,
            STRING_SERDE_CLASSNAME,
            props);

    // Remove any state from previous test runs
    IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamRepartitionJoinTest.java
@Before
public void before() throws InterruptedException {
    testNo++;
    String applicationId = "kstream-repartition-join-test-" + testNo;
    builder = new KStreamBuilder();
    createTopics();
    streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
    streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);

    streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
    streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);
    streamFour = builder.stream(Serdes.Integer(), Serdes.String(), streamFourInput);

    keyMapper = MockKeyValueMapper.SelectValueKeyValueMapper();
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamRepartitionJoinTest.java
private List<String> receiveMessages(final Deserializer<?> valueDeserializer,
                                     final int numMessages, final String topic) throws InterruptedException {

    final Properties config = new Properties();

    config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kstream-test");
    config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        IntegerDeserializer.class.getName());
    config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        valueDeserializer.getClass().getName());
    final List<String> received = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
        config,
        topic,
        numMessages,
        60 * 1000);
    Collections.sort(received);

    return received;
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamKTableJoinIntegrationTest.java
@Before
public void before() throws InterruptedException {
    testNo++;
    userClicksTopic = "user-clicks-" + testNo;
    userRegionsTopic = "user-regions-" + testNo;
    userRegionsStoreName = "user-regions-store-name-" + testNo;
    outputTopic = "output-topic-" + testNo;
    CLUSTER.createTopics(userClicksTopic, userRegionsTopic, outputTopic);
    streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test-" + testNo);
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
        TestUtils.tempDirectory().getPath());
    streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
}
Project: kafka-0.11.0.0-src-with-comment    File: ConsumerCoordinatorTest.java
@Test
public void testAutoCommitDynamicAssignment() {
    final String consumerId = "consumer";

    ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
                                                       ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);

    subscriptions.subscribe(singleton(topic1), rebalanceListener);

    client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
    coordinator.ensureCoordinatorReady();

    client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
    client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
    coordinator.joinGroupIfNeeded();

    subscriptions.seek(t1p, 100);

    client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
    time.sleep(autoCommitIntervalMs);
    coordinator.poll(time.milliseconds(), Long.MAX_VALUE);

    assertEquals(100L, subscriptions.committed(t1p).offset());
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamAggregationDedupIntegrationTest.java
private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
                                                        keyDeserializer,
                                                    final Deserializer<V>
                                                        valueDeserializer,
                                                    final int numMessages)
    throws InterruptedException {
    final Properties consumerProperties = new Properties();
    consumerProperties
        .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" +
        testNo);
    consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        keyDeserializer.getClass().getName());
    consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        valueDeserializer.getClass().getName());
    return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties,
        outputTopic,
        numMessages,
        60 * 1000);
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamsTestUtils.java
public static Properties getStreamsConfig(final String applicationId,
                                          final String bootstrapServers,
                                          final String keySerdeClassName,
                                          final String valueSerdeClassName,
                                          final Properties additional) {

    Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    streamsConfiguration.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerdeClassName);
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerdeClassName);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
    streamsConfiguration.putAll(additional);
    return streamsConfiguration;
}
Project: kafka-0.11.0.0-src-with-comment    File: JoinIntegrationTest.java
@BeforeClass
public static void setupConfigsAndUtils() throws Exception {
    PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
    PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
    PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
    PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
    STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
}
Project: kafka-0.11.0.0-src-with-comment    File: ResetIntegrationTest.java
private Properties prepareTest() throws Exception {
    final Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo);
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
    streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
    streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
    streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);

    IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);

    return streamsConfiguration;
}
Project: kafka-0.11.0.0-src-with-comment    File: ResetIntegrationTest.java
private void cleanGlobal(final String intermediateUserTopic) {
    final String[] parameters;
    if (intermediateUserTopic != null) {
        parameters = new String[]{
            "--application-id", APP_ID + testNo,
            "--bootstrap-servers", CLUSTER.bootstrapServers(),
            "--zookeeper", CLUSTER.zKConnectString(),
            "--input-topics", INPUT_TOPIC,
            "--intermediate-topics", INTERMEDIATE_USER_TOPIC
        };
    } else {
        parameters = new String[]{
            "--application-id", APP_ID + testNo,
            "--bootstrap-servers", CLUSTER.bootstrapServers(),
            "--zookeeper", CLUSTER.zKConnectString(),
            "--input-topics", INPUT_TOPIC
        };
    }
    final Properties cleanUpConfig = new Properties();
    cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
    cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);

    final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
    Assert.assertEquals(0, exitCode);
}
Project: kafka-0.11.0.0-src-with-comment    File: EosIntegrationTest.java
private List<KeyValue<Long, Long>> readResult(final int numberOfRecords,
                                              final String groupId) throws Exception {
    if (groupId != null) {
        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
            TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                groupId,
                LongDeserializer.class,
                LongDeserializer.class,
                new Properties() {
                    {
                        put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                    }
                }),
            SINGLE_PARTITION_OUTPUT_TOPIC,
            numberOfRecords
        );
    }

    // read uncommitted
    return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
        TestUtils.consumerConfig(CLUSTER.bootstrapServers(), LongDeserializer.class, LongDeserializer.class),
        SINGLE_PARTITION_OUTPUT_TOPIC,
        numberOfRecords
    );
}
Project: kafka-0.11.0.0-src-with-comment    File: SimpleBenchmark.java
private Properties setStreamProperties(final String applicationId) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // the socket buffer needs to be large, especially when running in AWS with
    // high latency. if running locally the default is fine.
    props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
    props.put(StreamsConfig.POLL_MS_CONFIG, POLL_MS);
    return props;
}
Project: kafka-0.11.0.0-src-with-comment    File: SimpleBenchmark.java
private Properties setProduceConsumeProperties(final String clientId) {
    Properties props = new Properties();
    props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    // the socket buffer needs to be large, especially when running in AWS with
    // high latency. if running locally the default is fine.
    props.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // the socket buffer needs to be large, especially when running in AWS with
    // high latency. if running locally the default is fine.
    props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
    return props;
}
Project: kmq    File: KafkaClients.java
public <K, V> KafkaConsumer<K, V> createConsumer(String groupId,
                                                 Class<? extends Deserializer<K>> keyDeserializer,
                                                 Class<? extends Deserializer<V>> valueDeserializer) {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", keyDeserializer.getName());
    props.put("value.deserializer", valueDeserializer.getName());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    if (groupId != null) {
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    }

    return new KafkaConsumer<>(props);
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamsConfigTest.java
@Test
public void shouldSupportPrefixedRestoreConsumerConfigs() throws Exception {
    props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
    props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
    final StreamsConfig streamsConfig = new StreamsConfig(props);
    final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
    assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
    assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
}
Project: cruise-control    File: CruiseControlMetricsReporterTest.java
@Test
public void testReportingMetrics() {
  Properties props = new Properties();
  props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
  props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MetricSerde.class.getName());
  props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testReportingMetrics");
  props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  setSecurityConfigs(props, "consumer");
  Consumer<String, CruiseControlMetric> consumer = new KafkaConsumer<>(props);

  ConsumerRecords<String, CruiseControlMetric> records = ConsumerRecords.empty();
  consumer.subscribe(Collections.singletonList(TOPIC));
  long startMs = System.currentTimeMillis();
  Set<Integer> metricTypes = new HashSet<>();
  while (metricTypes.size() < 16 && System.currentTimeMillis() < startMs + 15000) {
    records = consumer.poll(10);
    for (ConsumerRecord<String, CruiseControlMetric> record : records) {
      metricTypes.add((int) record.value().metricType().id());
    }
  }
  HashSet<Integer> expectedMetricTypes = new HashSet<>(Arrays.asList((int) ALL_TOPIC_BYTES_IN.id(),
                                                                     (int) ALL_TOPIC_BYTES_OUT.id(),
                                                                     (int) TOPIC_BYTES_IN.id(),
                                                                     (int) TOPIC_BYTES_OUT.id(),
                                                                     (int) PARTITION_SIZE.id(),
                                                                     (int) BROKER_CPU_UTIL.id(),
                                                                     (int) ALL_TOPIC_PRODUCE_REQUEST_RATE.id(),
                                                                     (int) ALL_TOPIC_FETCH_REQUEST_RATE.id(),
                                                                     (int) ALL_TOPIC_MESSAGES_IN_PER_SEC.id(),
                                                                     (int) TOPIC_PRODUCE_REQUEST_RATE.id(),
                                                                     (int) TOPIC_FETCH_REQUEST_RATE.id(),
                                                                     (int) TOPIC_MESSAGES_IN_PER_SEC.id(),
                                                                     (int) BROKER_PRODUCE_REQUEST_RATE.id(),
                                                                     (int) BROKER_CONSUMER_FETCH_REQUEST_RATE.id(),
                                                                     (int) BROKER_FOLLOWER_FETCH_REQUEST_RATE.id(),
                                                                     (int) BROKER_REQUEST_HANDLER_AVG_IDLE_PERCENT.id()));
  assertEquals("Expected to see " + expectedMetricTypes + ", but only see " + metricTypes, metricTypes, expectedMetricTypes);
}
Project: nighthawk    File: ListenableTracingConsumerTest.java
@Test
public void test() throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "127.0.0.1:9092");//该地址是集群的子集,用来探测集群。
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
    props.put("acks", "all");// 记录完整提交,最慢的但是最大可能的持久化
    props.put("retries", 3);// 请求失败重试的次数
    props.put("batch.size", 16384);// batch的大小
    props.put("linger.ms", 1);// 默认情况即使缓冲区有剩余的空间,也会立即发送请求,设置一段时间用来等待从而将缓冲区填的更多,单位为毫秒,producer发送数据会延迟1ms,可以减少发送到kafka服务器的请求数据
    props.put("buffer.memory", 33554432);// 提供给生产者缓冲内存总量
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer.tracing.codec", Codec.JSON);
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    Consumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    ListenableTracingConsumer<String, String> listenableTracingConsumer =
            new ListenableTracingConsumer<>(consumer, Pattern.compile("test"), new StringDeserializer());
    BraveFactoryBean factoryBean = new BraveFactoryBean();
    factoryBean.setServiceName("kafka-test");
    factoryBean.setTransport("http");
    factoryBean.setTransportAddress("127.0.0.1:9411");
    factoryBean.afterPropertiesSet();
    Brave brave = factoryBean.getObject();
    listenableTracingConsumer.addListener(new AbstractTracingListener<String, String>(brave) {
        @Override
        public void onPayload(Payload<String, String> payload) {
            try {
                System.out.println("key: " + payload.key());
                System.out.println("value: " + payload.value());
                Sleeper.JUST.sleepFor(2000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    });
    listenableTracingConsumer.start();
    System.in.read();
}
Project: mtgo-best-bot    File: BotConfig.java
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    // list of host:port pairs used for establishing the initial connections to the Kafka cluster
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    // allows a pool of processes to divide the work of consuming and processing records
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "bot");

    return props;
}
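
A configuration map like the one above is typically wired into a ConsumerFactory and a listener container factory; a minimal sketch along the lines of the springboot_cwenao example earlier on this page (the value type is left as Object here as an assumption, matching the JsonDeserializer):

@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}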