Java Class org.apache.kafka.common.serialization.ByteArrayDeserializer Example Source Code
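ByteArrayDeserializer is the pass-through deserializer in the Kafka clients library: it hands the consumer the raw record bytes without any decoding, which is why it appears wherever a framework or test harness wants to decode payloads itself. The snippets below are collected from open-source projects. As a quick orientation, here is a minimal, self-contained sketch of a consumer configured with it; the broker address, group id, and topic name are placeholders, not values taken from any of the projects below.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ByteArrayConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder connection settings
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // String keys, raw byte-array values
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("example-topic"));
            // poll(long) matches the client API era used in most snippets below
            ConsumerRecords<String, byte[]> records = consumer.poll(5000);
            for (ConsumerRecord<String, byte[]> record : records) {
                System.out.printf("key = %s, value length = %d%n", record.key(), record.value().length);
            }
        }
    }
}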

Project: ja-micro    File: Consumer.java
Consumer(Topic topic, String consumerGroupId, Properties props, PartitionProcessorFactory processorFactory) {
    this.topic = topic;
    this.consumerGroupId = consumerGroupId;

    // Mandatory settings, not changeable
    props.put("group.id", consumerGroupId);
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    kafka = new KafkaConsumer<>(props);
    partitions = new AssignedPartitions(processorFactory);

    long now = System.currentTimeMillis();

    // start the consumer loop
    consumerLoopExecutor.execute(new ConsumerLoop());
}
Project: kafka-junit    File: KafkaTestUtils.java
/**
 * This will consume all records from all partitions on the given topic.
 * @param topic Topic to consume from.
 * @return List of ConsumerRecords consumed.
 */
public List<ConsumerRecord<byte[], byte[]>> consumeAllRecordsFromTopic(final String topic) {
    // Connect to broker to determine what partitions are available.
    KafkaConsumer<byte[], byte[]> kafkaConsumer = kafkaTestServer.getKafkaConsumer(
        ByteArrayDeserializer.class,
        ByteArrayDeserializer.class
    );

    final List<Integer> partitionIds = new ArrayList<>();
    for (PartitionInfo partitionInfo: kafkaConsumer.partitionsFor(topic)) {
        partitionIds.add(partitionInfo.partition());
    }
    kafkaConsumer.close();

    return consumeAllRecordsFromTopic(topic, partitionIds);
}
Project: kafka-junit    File: KafkaTestUtils.java
/**
 * This will consume all records from only the partitions given.
 * @param topic Topic to consume from.
 * @param partitionIds Collection of PartitionIds to consume.
 * @return List of ConsumerRecords consumed.
 */
public List<ConsumerRecord<byte[], byte[]>> consumeAllRecordsFromTopic(final String topic, Collection<Integer> partitionIds) {
    // Create topic Partitions
    List<TopicPartition> topicPartitions = new ArrayList<>();
    for (Integer partitionId: partitionIds) {
        topicPartitions.add(new TopicPartition(topic, partitionId));
    }

    // Connect Consumer
    KafkaConsumer<byte[], byte[]> kafkaConsumer =
        kafkaTestServer.getKafkaConsumer(ByteArrayDeserializer.class, ByteArrayDeserializer.class);

    // Assign topic partitions & seek to head of them
    kafkaConsumer.assign(topicPartitions);
    kafkaConsumer.seekToBeginning(topicPartitions);

    // Pull records from kafka, keep polling until we get nothing back
    final List<ConsumerRecord<byte[], byte[]>> allRecords = new ArrayList<>();
    ConsumerRecords<byte[], byte[]> records;
    do {
        // Grab records from kafka
        records = kafkaConsumer.poll(2000L);
        logger.info("Found {} records in kafka", records.count());

        // Add to our array list
        records.forEach(allRecords::add);

    }
    while (!records.isEmpty());

    // close consumer
    kafkaConsumer.close();

    // return all records
    return allRecords;
}
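A hypothetical usage of the two helpers above from a test; the KafkaTestUtils construction and the topic name "test-topic" are assumptions for illustration, not part of the original snippet.

// Sketch only: assumes a kafka-junit KafkaTestUtils instance backed by a running kafkaTestServer.
final KafkaTestUtils kafkaTestUtils = new KafkaTestUtils(kafkaTestServer);

// Consume everything produced to the (hypothetical) topic "test-topic".
final List<ConsumerRecord<byte[], byte[]>> records = kafkaTestUtils.consumeAllRecordsFromTopic("test-topic");

// Values come back as raw byte arrays because ByteArrayDeserializer was used for both key and value.
for (final ConsumerRecord<byte[], byte[]> record : records) {
    final String value = new String(record.value());
    System.out.println("partition=" + record.partition() + " offset=" + record.offset() + " value=" + value);
}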
Project: kafka-webview    File: DataLoaderConfig.java
/**
 * Creates default message formats.
 */
private void createDefaultMessageFormats() {
    final Map<String, String> defaultFormats = new HashMap<>();
    defaultFormats.put("Short", ShortDeserializer.class.getName());
    defaultFormats.put("ByteArray", ByteArrayDeserializer.class.getName());
    defaultFormats.put("Bytes", BytesDeserializer.class.getName());
    defaultFormats.put("Double", DoubleDeserializer.class.getName());
    defaultFormats.put("Float", FloatDeserializer.class.getName());
    defaultFormats.put("Integer", IntegerDeserializer.class.getName());
    defaultFormats.put("Long", LongDeserializer.class.getName());
    defaultFormats.put("String", StringDeserializer.class.getName());

    // Create if needed.
    for (final Map.Entry<String, String> entry : defaultFormats.entrySet()) {
        MessageFormat messageFormat = messageFormatRepository.findByName(entry.getKey());
        if (messageFormat == null) {
            messageFormat = new MessageFormat();
        }
        messageFormat.setName(entry.getKey());
        messageFormat.setClasspath(entry.getValue());
        messageFormat.setJar("n/a");
        messageFormat.setDefaultFormat(true);
        messageFormatRepository.save(messageFormat);
    }
}
Project: talk-kafka-messaging-logs    File: ProduceConsumeLongByteArrayRecord.java
private static void consumeRecords(String bootstrapServers) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "byte-array-consumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Consumer<Long, byte[]> consumer = new KafkaConsumer<>(props);

    consumer.subscribe(Arrays.asList(TOPIC));

    ConsumerRecords<Long, byte[]> records = consumer.poll(10000);

    for (ConsumerRecord<Long, byte[]> record : records)
        out.printf(
                "key = %s value = %s%n",
                record.key(),
                new String(record.value()));

    consumer.close();
}
Project: talk-kafka-messaging-logs    File: ProduceConsumeStringAvroRecord.java
private static void consumeRecords(String bootstrapServers) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-consumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Consumer<String, byte[]> consumer = new KafkaConsumer<>(props);

    consumer.subscribe(Arrays.asList(TOPIC));

    ConsumerRecords<String, byte[]> records = consumer.poll(10000);

    for (ConsumerRecord<String, byte[]> record : records)
        out.printf(
                "key = %s value = %s%n",
                record.key(),
                UserAvroSerdes.deserialize(record.value()).getName().toString());

    consumer.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConfigBackingStore.java
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
    Map<String, Object> producerProps = new HashMap<>();
    producerProps.putAll(config.originals());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.putAll(config.originals());
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Map<String, Object> adminProps = new HashMap<>(config.originals());
    NewTopic topicDescription = TopicAdmin.defineTopic(topic).
            compacted().
            partitions(1).
            replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)).
            build();

    return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps);
}
Project: kafka-0.11.0.0-src-with-comment    File: WindowedStreamPartitionerTest.java
@Test
public void testWindowedDeserializerNoArgConstructors() {
    Map<String, String> props = new HashMap<>();
    // test that key[value].deserializer.inner.class takes precedence over deserializer.inner.class
    WindowedDeserializer<StringSerializer> windowedDeserializer = new WindowedDeserializer<>();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
    props.put("key.deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer");
    windowedDeserializer.configure(props, true);
    Deserializer<?> inner = windowedDeserializer.innerDeserializer();
    assertNotNull("Inner deserializer should be not null", inner);
    assertTrue("Inner deserializer type should be StringDeserializer", inner instanceof StringDeserializer);
    // test deserializer.inner.class
    props.put("deserializer.inner.class", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.remove("key.deserializer.inner.class");
    props.remove("value.deserializer.inner.class");
    WindowedDeserializer<?> windowedDeserializer1 = new WindowedDeserializer<>();
    windowedDeserializer1.configure(props, false);
    Deserializer<?> inner1 = windowedDeserializer1.innerDeserializer();
    assertNotNull("Inner deserializer should be not null", inner1);
    assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner1 instanceof ByteArrayDeserializer);
}
Project: kafka-0.11.0.0-src-with-comment    File: SimpleBenchmark.java
private Properties setProduceConsumeProperties(final String clientId) {
    Properties props = new Properties();
    props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    // the socket buffer needs to be large, especially when running in AWS with
    // high latency. if running locally the default is fine.
    props.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // the socket buffer needs to be large, especially when running in AWS with
    // high latency. if running locally the default is fine.
    props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
    return props;
}
Project: kafka-0.11.0.0-src-with-comment    File: FetcherTest.java
@Test
public void testListOffsetsSendsIsolationLevel() {
    for (final IsolationLevel isolationLevel : IsolationLevel.values()) {
        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
                new ByteArrayDeserializer(), Integer.MAX_VALUE, isolationLevel);

        subscriptions.assignFromUser(singleton(tp1));
        subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);

        client.prepareResponse(new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest body) {
                ListOffsetRequest request = (ListOffsetRequest) body;
                return request.isolationLevel() == isolationLevel;
            }
        }, listOffsetResponse(Errors.NONE, 1L, 5L));
        fetcher.updateFetchPositions(singleton(tp1));
        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
        assertTrue(subscriptions.isFetchable(tp1));
        assertEquals(5, subscriptions.position(tp1).longValue());
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConsumerTest.java
@Test
public void testConstructorClose() throws Exception {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar.local:9999");
    props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());

    final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
    final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
    try {
        new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    } catch (KafkaException e) {
        assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
        assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
        assertEquals("Failed to construct kafka consumer", e.getMessage());
        return;
    }
    Assert.fail("should have caught an exception and returned");
}
Project: kafka    File: FetcherTest.java
private Fetcher<byte[], byte[]> createFetcher(int maxPollRecords,
                                              SubscriptionState subscriptions,
                                              Metrics metrics) {
    return new Fetcher<>(consumerClient,
            minBytes,
            maxWaitMs,
            fetchSize,
            maxPollRecords,
            true, // check crc
            new ByteArrayDeserializer(),
            new ByteArrayDeserializer(),
            metadata,
            subscriptions,
            metrics,
            "consumer" + groupId,
            time,
            retryBackoffMs);
}
Project: kafka    File: KafkaConsumerTest.java
@Test
public void testConstructorClose() throws Exception {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar:9999");
    props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());

    final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
    final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
    try {
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
                props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    } catch (KafkaException e) {
        assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
        assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
        assertEquals("Failed to construct kafka consumer", e.getMessage());
        return;
    }
    Assert.fail("should have caught an exception and returned");
}
Project: li-apache-kafka-clients    File: LiKafkaProducerIntegrationTest.java
@Test
public void testZeroLengthValue() throws Exception {
  Properties producerPropertyOverrides = new Properties();
  producerPropertyOverrides.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

  try (LiKafkaProducer producer =  createProducer(producerPropertyOverrides)) {
    producer.send(new ProducerRecord<>("testZeroLengthValue", "key", new byte[0])).get();
  }
  Properties consumerProps = new Properties();
  consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

  try (LiKafkaConsumer consumer = createConsumer(consumerProps)) {
    consumer.subscribe(Collections.singleton("testZeroLengthValue"));
    long startMs = System.currentTimeMillis();
    ConsumerRecords records = ConsumerRecords.empty();
    while (records.isEmpty() && System.currentTimeMillis() < startMs + 30000) {
      records = consumer.poll(100);
    }
    assertEquals(1, records.count());
    ConsumerRecord record = (ConsumerRecord) records.iterator().next();
    assertEquals("key", record.key());
    assertEquals(((byte[]) record.value()).length, 0);
  }
}
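For context on the assertion above: ByteArrayDeserializer returns the payload bytes unchanged, so an empty value comes back as a zero-length array, while only a null payload maps to null. A minimal standalone check (the topic name is arbitrary):

ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
byte[] empty = deserializer.deserialize("any-topic", new byte[0]);
assert empty != null && empty.length == 0;   // empty payload stays a zero-length array
byte[] missing = deserializer.deserialize("any-topic", null);
assert missing == null;                      // null payload stays null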
Project: spring-cloud-stream-binder-kafka    File: KafkaBinderMetrics.java
private ConsumerFactory<?, ?> createConsumerFactory(String group) {
    if (defaultConsumerFactory != null) {
        return defaultConsumerFactory;
    }
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    if (!ObjectUtils.isEmpty(binderConfigurationProperties.getConsumerConfiguration())) {
        props.putAll(binderConfigurationProperties.getConsumerConfiguration());
    }
    if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                this.binderConfigurationProperties.getKafkaConnectionString());
    }
    props.put("group.id", group);
    return new DefaultKafkaConsumerFactory<>(props);
}
Project: spring-cloud-stream-binder-kafka    File: KafkaMessageChannelBinder.java
private ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, anonymous ? "latest" : "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);

    if (!ObjectUtils.isEmpty(configurationProperties.getConsumerConfiguration())) {
        props.putAll(configurationProperties.getConsumerConfiguration());
    }
    if (ObjectUtils.isEmpty(props.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))) {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
    }
    if (!ObjectUtils.isEmpty(consumerProperties.getExtension().getConfiguration())) {
        props.putAll(consumerProperties.getExtension().getConfiguration());
    }
    if (!ObjectUtils.isEmpty(consumerProperties.getExtension().getStartOffset())) {
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                consumerProperties.getExtension().getStartOffset().name());
    }

    return new DefaultKafkaConsumerFactory<>(props);
}
Project: spring-cloud-stream-binder-kafka    File: KafkaBinderConfiguration.java
@Bean
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    if (!ObjectUtils.isEmpty(configurationProperties.getConsumerConfiguration())) {
        props.putAll(configurationProperties.getConsumerConfiguration());
    }
    if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
    }
    ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
    KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(kafkaMessageChannelBinder,
            consumerFactory);
    indicator.setTimeout(this.configurationProperties.getHealthTimeout());
    return indicator;
}
Project: spring-cloud-stream-binder-kafka    File: KafkaBinderTests.java
@Test
@SuppressWarnings("unchecked")
public void testConsumerDefaultDeserializer() throws Throwable {
    Binding<?> binding = null;
    try {
        KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
        String testTopicName = "existing" + System.currentTimeMillis();
        invokeCreateTopic(testTopicName, 5, 1);
        configurationProperties.setAutoCreateTopics(false);
        Binder binder = getBinder(configurationProperties);

        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
        DirectChannel input = createBindableChannel("input", createConsumerBindingProperties(consumerProperties));

        binding = binder.bindConsumer(testTopicName, "test", input, consumerProperties);
        DirectFieldAccessor consumerAccessor = new DirectFieldAccessor(getKafkaConsumer(binding));
        assertTrue(consumerAccessor.getPropertyValue("keyDeserializer") instanceof ByteArrayDeserializer);
        assertTrue(consumerAccessor.getPropertyValue("valueDeserializer") instanceof ByteArrayDeserializer);
    }
    finally {
        if (binding != null) {
            binding.unbind();
        }
    }
}
Project: beam    File: BeamKafkaTable.java
@Override
  public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
    KafkaIO.Read<byte[], byte[]> kafkaRead = null;
    if (topics != null) {
      kafkaRead = KafkaIO.<byte[], byte[]>read()
          .withBootstrapServers(bootstrapServers)
          .withTopics(topics)
          .updateConsumerProperties(configUpdates)
          .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
          .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of());
    } else if (topicPartitions != null) {
      kafkaRead = KafkaIO.<byte[], byte[]>read()
          .withBootstrapServers(bootstrapServers)
          .withTopicPartitions(topicPartitions)
          .updateConsumerProperties(configUpdates)
          .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
          .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of());
    } else {
      throw new IllegalArgumentException("Either topics or topicPartitions must be configured.");
    }

    return PBegin.in(pipeline).apply("read", kafkaRead.withoutMetadata())
        .apply("in_format", getPTransformForInput());
  }
Project: beam    File: KafkaIOTest.java
@Test
public void testSourceWithExplicitPartitionsDisplayData() {
  KafkaIO.Read<byte[], Long> read = KafkaIO.<byte[], Long>read()
      .withBootstrapServers("myServer1:9092,myServer2:9092")
      .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5),
          new TopicPartition("test", 6)))
      .withConsumerFactoryFn(new ConsumerFactoryFn(
          Lists.newArrayList("test"), 10, 10, OffsetResetStrategy.EARLIEST)) // 10 partitions
      .withKeyDeserializer(ByteArrayDeserializer.class)
      .withValueDeserializer(LongDeserializer.class);

  DisplayData displayData = DisplayData.from(read);

  assertThat(displayData, hasDisplayItem("topicPartitions", "test-5,test-6"));
  assertThat(displayData, hasDisplayItem("enable.auto.commit", false));
  assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
  assertThat(displayData, hasDisplayItem("auto.offset.reset", "latest"));
  assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288));
}
Project: LaS-VPE-Platform    File: SystemPropertyCenter.java
public Map<String, Object> getKafkaParams(String group) {
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, group);
    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    kafkaParams.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, kafkaMsgMaxBytes);
    kafkaParams.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, kafkaMsgMaxBytes);
    kafkaParams.put("fetch.message.max.bytes", kafkaMsgMaxBytes);
    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    kafkaParams.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, kafkaMsgMaxBytes);
    kafkaParams.put(ConsumerConfig.SEND_BUFFER_CONFIG, kafkaMsgMaxBytes);
    kafkaParams.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaFetchTimeoutMs);
    kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return kafkaParams;
}
Project: flink    File: FlinkKafkaConsumer09.java
/**
 * Makes sure that the ByteArrayDeserializer is registered in the Kafka properties.
 *
 * @param props The Kafka properties to register the deserializer in.
 */
private static void setDeserializer(Properties props) {
    final String deSerName = ByteArrayDeserializer.class.getCanonicalName();

    Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
    Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);

    if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
        LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
    }
    if (valDeSer != null && !valDeSer.equals(deSerName)) {
        LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
    }

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
}
Project: ameliant-tools    File: ConsumerDriverTest.java
private ConsumerDriver createConsumerDriver(CountDownLatch latch, String topic, int messageCount) {
    Map<String, Object> configs = new ConsumerConfigsBuilder()
            .groupId("bar")
            .bootstrapServers("127.0.0.1:" + broker.getPort())
            .enableAutoCommit(true)
            .autoCommitIntervalMs(1000)
            .sessionTimeoutMs(30000)
            .keyDeserializer(ByteArrayDeserializer.class)
            .valueDeserializer(ByteArrayDeserializer.class)
            .autoOffsetReset(ConsumerConfigsBuilder.OffsetReset.earliest)
            .build();

    ConsumerDefinition consumerDefinition = new ConsumerDefinition();
    consumerDefinition.setConfig(configs);
    consumerDefinition.setTopic(topic);
    consumerDefinition.setMessagesToReceive(messageCount);
    return new ConsumerDriver(consumerDefinition, latch);
}
Project: datacollector    File: KafkaNewConsumerITBase.java
@NotNull
protected SdcKafkaConsumer createKafkaConsumer(int port, String topic, Source.Context sourceContext) throws
    StageException {
  Map<String, Object> props = new HashMap<>();
  props.put("auto.commit.interval.ms", "1000");
  props.put("auto.offset.reset", "earliest");
  props.put("session.timeout.ms", "30000");
  props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
  props.put(KafkaConstants.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(KafkaConstants.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
  SdcKafkaConsumer sdcKafkaConsumer = createSdcKafkaConsumer(
      "localhost:" + port,
      topic,
      1000,
      sourceContext,
      props,
      CONSUMER_GROUP_NAME
  );
  sdcKafkaConsumer.validate(new ArrayList<Stage.ConfigIssue>(), sourceContext);
  sdcKafkaConsumer.init();
  return sdcKafkaConsumer;
}
Project: components    File: KafkaConnection.java
public static Map<String, String> createConnMaps(KafkaDatastoreProperties datastore, boolean isBeam) {
    Map<String, String> props = new HashMap<>();
    if (datastore != null) {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, datastore.brokers.getValue());
        if (!isBeam) {
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        }
        if (datastore.ssl.useSsl.getValue()) {
            props.put("security.protocol", "SSL");
            props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, datastore.ssl.trustStoreType.getValue().toString());
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, datastore.ssl.trustStorePath.getValue());
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, datastore.ssl.trustStorePassword.getValue());
            if (datastore.ssl.needClientAuth.getValue()) {
                props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, datastore.ssl.keyStoreType.getValue().toString());
                props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, datastore.ssl.keyStorePath.getValue());
                props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, datastore.ssl.keyStorePassword.getValue());
            }
        }
    }
    return props;
}
Project: Aletheia    File: KafkaDatumEnvelopeFetcherFactory.java
private Properties createConsumerConfig(final String brokers,
                                        final String groupId,
                                        final Properties properties) {
  final Properties consumerConfig = (Properties) properties.clone();

  if (consumerConfig.getProperty("value.deserializer") != null
          || consumerConfig.getProperty("key.deserializer") != null) {
    logger.warn("serializer cannot be provided as consumer properties. "
            + "Overriding manually to be the correct serialization type");
  }
  consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
  consumerConfig.put("value.deserializer", ByteArrayDeserializer.class.getName());

  if (consumerConfig.getProperty("enable.auto.commit") != null) {
    logger.warn("enable.auto.commit cannot be provided as consumer properties. "
            + "Please use offset.commit.mode to control offset management mode. see com.outbrain.aletheia.datum.consumption.OffsetCommitMode for supported modes.");
  }
  consumerConfig.put("enable.auto.commit", "false");

  consumerConfig.put("bootstrap.servers", brokers);
  consumerConfig.put("group.id", groupId);

  logger.warn("Using consumer config: {}", consumerConfig);

  return consumerConfig;
}
Project: cruise-control    File: KafkaSampleStore.java
protected KafkaConsumer<byte[], byte[]> createConsumers(Map<String, ?> config) {
    Properties consumerProps = new Properties();
    consumerProps.putAll(config);
    long randomToken = RANDOM.nextLong();
    consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                              (String) config.get(KafkaCruiseControlConfig.BOOTSTRAP_SERVERS_CONFIG));
    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "LiKafkaCruiseControlSampleStore" + randomToken);
    consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_CLIENT_ID + randomToken);
    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    consumerProps.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.toString(Integer.MAX_VALUE));
    consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    return new KafkaConsumer<>(consumerProps);
}
Project: AthenaX    File: ITestUtil.java
static KafkaConsumer<byte[], byte[]> getConsumer(String groupName, String brokerList) {
  Properties prop = new Properties();
  prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
  prop.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
  prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupName);
  prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
  prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
  return new KafkaConsumer<>(prop);
}
Project: storm-dynamic-spout    File: KafkaConsumerConfigTest.java
/**
 * Validates that we have some sane default settings.
 */
@Test
public void testForSaneDefaultKafkaConsumerSettings() {
    final List<String> brokerHosts = Lists.newArrayList(
        "broker1:9092", "broker2:9093"
    );
    final String consumerId = "myConsumerId";
    final String topic = "myTopic";

    // Create config
    final KafkaConsumerConfig config = new KafkaConsumerConfig(brokerHosts, consumerId, topic);

    // now do validation on constructor arguments
    assertEquals("Topic set", topic, config.getTopic());
    assertEquals("ConsumerId set", consumerId, config.getConsumerId());
    assertEquals("BrokerHosts set", "broker1:9092,broker2:9093", config.getKafkaConsumerProperty("bootstrap.servers"));

    // Check defaults are sane.
    assertEquals("group.id set", consumerId, config.getKafkaConsumerProperty("group.id"));
    assertEquals("auto.offset.reset set to none", "none", config.getKafkaConsumerProperty("auto.offset.reset"));
    assertEquals(
        "Key Deserializer set to bytes deserializer",
        ByteArrayDeserializer.class.getName(),
        config.getKafkaConsumerProperty("key.deserializer"));
    assertEquals(
        "Value Deserializer set to bytes deserializer",
        ByteArrayDeserializer.class.getName(),
        config.getKafkaConsumerProperty("value.deserializer")
    );
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaStatusBackingStore.java
@Override
public void configure(final WorkerConfig config) {
    this.topic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG);
    if (topic.equals(""))
        throw new ConfigException("Must specify topic for connector status.");

    Map<String, Object> producerProps = new HashMap<>();
    producerProps.putAll(config.originals());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class

    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.putAll(config.originals());
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Map<String, Object> adminProps = new HashMap<>(config.originals());
    NewTopic topicDescription = TopicAdmin.defineTopic(topic).
            compacted().
            partitions(config.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG)).
            replicationFactor(config.getShort(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG)).
            build();

    Callback<ConsumerRecord<String, byte[]>> readCallback = new Callback<ConsumerRecord<String, byte[]>>() {
        @Override
        public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
            read(record);
        }
    };
    this.kafkaLog = createKafkaBasedLog(topic, producerProps, consumerProps, readCallback, topicDescription, adminProps);
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaOffsetBackingStore.java
@Override
public void configure(final WorkerConfig config) {
    String topic = config.getString(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG);
    if (topic.equals(""))
        throw new ConfigException("Offset storage topic must be specified");

    data = new HashMap<>();

    Map<String, Object> producerProps = new HashMap<>();
    producerProps.putAll(config.originals());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.putAll(config.originals());
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Map<String, Object> adminProps = new HashMap<>(config.originals());
    NewTopic topicDescription = TopicAdmin.defineTopic(topic).
            compacted().
            partitions(config.getInt(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG)).
            replicationFactor(config.getShort(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG)).
            build();

    offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminProps);
}
Project: kafka-0.11.0.0-src-with-comment    File: EosTestDriver.java
public static void verify(final String kafka) {
    ensureStreamsApplicationDown(kafka);

    final Map<TopicPartition, Long> committedOffsets = getCommittedOffsets(kafka);

    final Properties props = new Properties();
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));

    try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
        final List<TopicPartition> partitions = getAllPartitions(consumer, "data", "echo", "min", "sum");
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);

        final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition
            = getOutputRecords(consumer, committedOffsets);

        truncate("data", recordPerTopicPerPartition, committedOffsets);

        verifyMin(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("min"));
        verifySum(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("sum"));

        verifyAllTransactionFinished(consumer, kafka);

        // do not modify: required test output
        System.out.println("ALL-RECORDS-DELIVERED");
    } catch (final Exception e) {
        e.printStackTrace(System.err);
        System.out.println("FAILED");
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: EosTestDriver.java
private static Map<TopicPartition, Long> getCommittedOffsets(final String kafka) {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, EosTestClient.APP_ID);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "OffsetsClient");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

    final Map<TopicPartition, Long> committedOffsets = new HashMap<>();
    try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
        final Set<String> topics = new HashSet<>();
        topics.add("data");
        consumer.subscribe(topics);
        consumer.poll(0);

        final Set<TopicPartition> partitions = new HashSet<>();
        for (final String topic : topics) {
            for (final PartitionInfo partition : consumer.partitionsFor(topic)) {
                partitions.add(new TopicPartition(partition.topic(), partition.partition()));
            }
        }

        for (final TopicPartition tp : partitions) {
            final long offset = consumer.position(tp);
            committedOffsets.put(tp, offset);
        }
    }

    return committedOffsets;
}
Project: kafka-0.11.0.0-src-with-comment    File: FetcherTest.java
@Test
public void testFetchedRecordsRaisesOnSerializationErrors() {
    // raise an exception from somewhere in the middle of the fetch response
    // so that we can verify that our position does not advance after raising
    ByteArrayDeserializer deserializer = new ByteArrayDeserializer() {
        int i = 0;
        @Override
        public byte[] deserialize(String topic, byte[] data) {
            if (i++ % 2 == 1) {
                // Should be blocked on the value deserialization of the first record.
                assertEquals(new String(data, StandardCharsets.UTF_8), "value-1");
                throw new SerializationException();
            }
            return data;
        }
    };

    Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), deserializer, deserializer);

    subscriptions.assignFromUser(singleton(tp1));
    subscriptions.seek(tp1, 1);

    client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(tp1, this.records, Errors.NONE, 100L, 0));

    assertEquals(1, fetcher.sendFetches());
    consumerClient.poll(0);
    // The fetcher should block on Deserialization error
    for (int i = 0; i < 2; i++) {
        try {
            fetcher.fetchedRecords();
            fail("fetchedRecords should have raised");
        } catch (SerializationException e) {
            // the position should not advance since no data has been returned
            assertEquals(1, subscriptions.position(tp1).longValue());
        }
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: FetcherTest.java
@Test
public void testReadCommittedLagMetric() {
    Metrics metrics = new Metrics();
    fetcher = createFetcher(subscriptions, metrics, new ByteArrayDeserializer(),
            new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);

    subscriptions.assignFromUser(singleton(tp1));
    subscriptions.seek(tp1, 0);

    MetricName maxLagMetric = metrics.metricInstance(metricsRegistry.recordsLagMax);
    MetricName partitionLagMetric = metrics.metricName(tp1 + ".records-lag", metricGroup);

    Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
    KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);

    // recordsFetchLagMax should be initialized to negative infinity
    assertEquals(Double.NEGATIVE_INFINITY, recordsFetchLagMax.value(), EPSILON);

    // recordsFetchLagMax should be lso - fetchOffset after receiving an empty FetchResponse
    fetchRecords(tp1, MemoryRecords.EMPTY, Errors.NONE, 100L, 50L, 0);
    assertEquals(50, recordsFetchLagMax.value(), EPSILON);

    KafkaMetric partitionLag = allMetrics.get(partitionLagMetric);
    assertEquals(50, partitionLag.value(), EPSILON);

    // recordsFetchLagMax should be lso - offset of the last message after receiving a non-empty FetchResponse
    MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
            TimestampType.CREATE_TIME, 0L);
    for (int v = 0; v < 3; v++)
        builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
    fetchRecords(tp1, builder.build(), Errors.NONE, 200L, 150L, 0);
    assertEquals(147, recordsFetchLagMax.value(), EPSILON);
    assertEquals(147, partitionLag.value(), EPSILON);

    // verify de-registration of partition lag
    subscriptions.unsubscribe();
    assertFalse(allMetrics.containsKey(partitionLagMetric));
}
Project: kafka-0.11.0.0-src-with-comment    File: FetcherTest.java
@Test
public void testSkippingAbortedTransactions() {
    Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
            new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int currentOffset = 0;

    currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
            new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
            new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));

    abortTransaction(buffer, 1L, currentOffset);

    buffer.flip();

    List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
    abortedTransactions.add(new FetchResponse.AbortedTransaction(1, 0));
    MemoryRecords records = MemoryRecords.readableRecords(buffer);
    subscriptions.assignFromUser(singleton(tp1));

    subscriptions.seek(tp1, 0);

    // normal fetch
    assertEquals(1, fetcher.sendFetches());
    assertFalse(fetcher.hasCompletedFetches());

    client.prepareResponse(fetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
    consumerClient.poll(0);
    assertTrue(fetcher.hasCompletedFetches());

    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
    assertFalse(fetchedRecords.containsKey(tp1));
}
Project: kafka-0.11.0.0-src-with-comment    File: FetcherTest.java
@Test
public void testReturnCommittedTransactions() {
    Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
            new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int currentOffset = 0;

    currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
            new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
            new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));

    currentOffset += commitTransaction(buffer, 1L, currentOffset);
    buffer.flip();

    List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
    MemoryRecords records = MemoryRecords.readableRecords(buffer);
    subscriptions.assignFromUser(singleton(tp1));

    subscriptions.seek(tp1, 0);

    // normal fetch
    assertEquals(1, fetcher.sendFetches());
    assertFalse(fetcher.hasCompletedFetches());
    client.prepareResponse(new MockClient.RequestMatcher() {
        @Override
        public boolean matches(AbstractRequest body) {
            FetchRequest request = (FetchRequest) body;
            assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
            return true;
        }
    }, fetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));

    consumerClient.poll(0);
    assertTrue(fetcher.hasCompletedFetches());

    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
    assertTrue(fetchedRecords.containsKey(tp1));
    assertEquals(fetchedRecords.get(tp1).size(), 2);
}
Project: kafka-0.11.0.0-src-with-comment    File: FetcherTest.java
@Test
public void testReturnAbortedTransactionsinUncommittedMode() {
    Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
            new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int currentOffset = 0;

    currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
            new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
            new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));

    abortTransaction(buffer, 1L, currentOffset);

    buffer.flip();

    List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
    abortedTransactions.add(new FetchResponse.AbortedTransaction(1, 0));
    MemoryRecords records = MemoryRecords.readableRecords(buffer);
    subscriptions.assignFromUser(singleton(tp1));

    subscriptions.seek(tp1, 0);

    // normal fetch
    assertEquals(1, fetcher.sendFetches());
    assertFalse(fetcher.hasCompletedFetches());

    client.prepareResponse(fetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
    consumerClient.poll(0);
    assertTrue(fetcher.hasCompletedFetches());

    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
    assertTrue(fetchedRecords.containsKey(tp1));
}
Project: kafka-0.11.0.0-src-with-comment    File: FetcherTest.java
@Test
public void testConsumerPositionUpdatedWhenSkippingAbortedTransactions() {
    Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
            new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    long currentOffset = 0;

    currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
            new SimpleRecord(time.milliseconds(), "abort1-1".getBytes(), "value".getBytes()),
            new SimpleRecord(time.milliseconds(), "abort1-2".getBytes(), "value".getBytes()));

    currentOffset += abortTransaction(buffer, 1L, currentOffset);
    buffer.flip();

    List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
    abortedTransactions.add(new FetchResponse.AbortedTransaction(1, 0));
    MemoryRecords records = MemoryRecords.readableRecords(buffer);
    subscriptions.assignFromUser(singleton(tp1));

    subscriptions.seek(tp1, 0);

    // normal fetch
    assertEquals(1, fetcher.sendFetches());
    assertFalse(fetcher.hasCompletedFetches());

    client.prepareResponse(fetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
    consumerClient.poll(0);
    assertTrue(fetcher.hasCompletedFetches());

    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();

    // Ensure that we don't return any of the aborted records, but still advance the consumer position.
    assertFalse(fetchedRecords.containsKey(tp1));
    assertEquals(currentOffset, (long) subscriptions.position(tp1));
}