Consumer(Topic topic, String consumerGroupId, Properties props, PartitionProcessorFactory processorFactory) {
    this.topic = topic;
    this.consumerGroupId = consumerGroupId;

    // Mandatory settings, not changeable
    props.put("group.id", consumerGroupId);
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    kafka = new KafkaConsumer<>(props);
    partitions = new AssignedPartitions(processorFactory);

    long now = System.currentTimeMillis();

    // start it
    consumerLoopExecutor.execute(new ConsumerLoop());
}
/**
 * This will consume all records from all partitions on the given topic.
 * @param topic Topic to consume from.
 * @return List of ConsumerRecords consumed.
 */
public List<ConsumerRecord<byte[], byte[]>> consumeAllRecordsFromTopic(final String topic) {
    // Connect to broker to determine what partitions are available.
    KafkaConsumer<byte[], byte[]> kafkaConsumer = kafkaTestServer.getKafkaConsumer(
        ByteArrayDeserializer.class,
        ByteArrayDeserializer.class
    );

    final List<Integer> partitionIds = new ArrayList<>();
    for (PartitionInfo partitionInfo: kafkaConsumer.partitionsFor(topic)) {
        partitionIds.add(partitionInfo.partition());
    }
    kafkaConsumer.close();

    return consumeAllRecordsFromTopic(topic, partitionIds);
}
/**
 * This will consume all records from only the partitions given.
 * @param topic Topic to consume from.
 * @param partitionIds Collection of PartitionIds to consume.
 * @return List of ConsumerRecords consumed.
 */
public List<ConsumerRecord<byte[], byte[]>> consumeAllRecordsFromTopic(final String topic, Collection<Integer> partitionIds) {
    // Create topic Partitions
    List<TopicPartition> topicPartitions = new ArrayList<>();
    for (Integer partitionId: partitionIds) {
        topicPartitions.add(new TopicPartition(topic, partitionId));
    }

    // Connect Consumer
    KafkaConsumer<byte[], byte[]> kafkaConsumer =
        kafkaTestServer.getKafkaConsumer(ByteArrayDeserializer.class, ByteArrayDeserializer.class);

    // Assign topic partitions & seek to head of them
    kafkaConsumer.assign(topicPartitions);
    kafkaConsumer.seekToBeginning(topicPartitions);

    // Pull records from kafka, keep polling until we get nothing back
    final List<ConsumerRecord<byte[], byte[]>> allRecords = new ArrayList<>();
    ConsumerRecords<byte[], byte[]> records;
    do {
        // Grab records from kafka
        records = kafkaConsumer.poll(2000L);
        logger.info("Found {} records in kafka", records.count());

        // Add to our array list
        records.forEach(allRecords::add);
    } while (!records.isEmpty());

    // close consumer
    kafkaConsumer.close();

    // return all records
    return allRecords;
}
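A hedged usage sketch of the two helpers above; the enclosing test-utility class, its kafkaTestServer field, and the topic name are assumptions made only for illustration.

// Illustrative only: a hypothetical test method that calls the helpers above.
// The surrounding test-utility class and its kafkaTestServer field are assumed, not shown here.
@Test
public void exampleConsumeHelperUsage() {
    final String topic = "my-test-topic"; // assumed topic name
    // Read every record from every partition of the topic.
    final List<ConsumerRecord<byte[], byte[]>> allRecords = consumeAllRecordsFromTopic(topic);
    // Or restrict the read to partition 0 only.
    final List<ConsumerRecord<byte[], byte[]>> partitionZero =
        consumeAllRecordsFromTopic(topic, Collections.singletonList(0));
    // Values arrive as raw byte[] because of the ByteArrayDeserializer; decode them explicitly.
    for (final ConsumerRecord<byte[], byte[]> record : allRecords) {
        final String value = new String(record.value(), StandardCharsets.UTF_8);
    }
}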
/**
 * Creates default message formats.
 */
private void createDefaultMessageFormats() {
    final Map<String, String> defaultFormats = new HashMap<>();
    defaultFormats.put("Short", ShortDeserializer.class.getName());
    defaultFormats.put("ByteArray", ByteArrayDeserializer.class.getName());
    defaultFormats.put("Bytes", BytesDeserializer.class.getName());
    defaultFormats.put("Double", DoubleDeserializer.class.getName());
    defaultFormats.put("Float", FloatDeserializer.class.getName());
    defaultFormats.put("Integer", IntegerDeserializer.class.getName());
    defaultFormats.put("Long", LongDeserializer.class.getName());
    defaultFormats.put("String", StringDeserializer.class.getName());

    // Create if needed.
    for (final Map.Entry<String, String> entry : defaultFormats.entrySet()) {
        MessageFormat messageFormat = messageFormatRepository.findByName(entry.getKey());
        if (messageFormat == null) {
            messageFormat = new MessageFormat();
        }
        messageFormat.setName(entry.getKey());
        messageFormat.setClasspath(entry.getValue());
        messageFormat.setJar("n/a");
        messageFormat.setDefaultFormat(true);
        messageFormatRepository.save(messageFormat);
    }
}
private static void consumeRecords(String bootstrapServers) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "byte-array-consumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    Consumer<Long, byte[]> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(TOPIC));
    ConsumerRecords<Long, byte[]> records = consumer.poll(10000);
    for (ConsumerRecord<Long, byte[]> record : records)
        out.printf("key = %s value = %s%n", record.key(), new String(record.value()));
    consumer.close();
}
private static void consumeRecords(String bootstrapServers) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-consumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    Consumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(TOPIC));
    ConsumerRecords<String, byte[]> records = consumer.poll(10000);
    for (ConsumerRecord<String, byte[]> record : records)
        out.printf("key = %s value = %s%n", record.key(), UserAvroSerdes.deserialize(record.value()).getName().toString());
    consumer.close();
}
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
    Map<String, Object> producerProps = new HashMap<>();
    producerProps.putAll(config.originals());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.putAll(config.originals());
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Map<String, Object> adminProps = new HashMap<>(config.originals());
    NewTopic topicDescription = TopicAdmin.defineTopic(topic).
            compacted().
            partitions(1).
            replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)).
            build();

    return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps);
}
@Test
public void testWindowedDeserializerNoArgConstructors() {
    Map<String, String> props = new HashMap<>();
    // test that key[value].deserializer.inner.class takes precedence over deserializer.inner.class
    WindowedDeserializer<StringSerializer> windowedDeserializer = new WindowedDeserializer<>();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
    props.put("key.deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer");
    windowedDeserializer.configure(props, true);
    Deserializer<?> inner = windowedDeserializer.innerDeserializer();
    assertNotNull("Inner deserializer should be not null", inner);
    assertTrue("Inner deserializer type should be StringDeserializer", inner instanceof StringDeserializer);

    // test deserializer.inner.class
    props.put("deserializer.inner.class", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.remove("key.deserializer.inner.class");
    props.remove("value.deserializer.inner.class");
    WindowedDeserializer<?> windowedDeserializer1 = new WindowedDeserializer<>();
    windowedDeserializer1.configure(props, false);
    Deserializer<?> inner1 = windowedDeserializer1.innerDeserializer();
    assertNotNull("Inner deserializer should be not null", inner1);
    assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner1 instanceof ByteArrayDeserializer);
}
private Properties setProduceConsumeProperties(final String clientId) {
    Properties props = new Properties();
    props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    // the socket buffer needs to be large, especially when running in AWS with
    // high latency. if running locally the default is fine.
    props.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // the socket buffer needs to be large, especially when running in AWS with
    // high latency. if running locally the default is fine.
    props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
    return props;
}
@Test
public void testListOffsetsSendsIsolationLevel() {
    for (final IsolationLevel isolationLevel : IsolationLevel.values()) {
        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
                new ByteArrayDeserializer(), Integer.MAX_VALUE, isolationLevel);

        subscriptions.assignFromUser(singleton(tp1));
        subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);

        client.prepareResponse(new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest body) {
                ListOffsetRequest request = (ListOffsetRequest) body;
                return request.isolationLevel() == isolationLevel;
            }
        }, listOffsetResponse(Errors.NONE, 1L, 5L));
        fetcher.updateFetchPositions(singleton(tp1));
        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
        assertTrue(subscriptions.isFetchable(tp1));
        assertEquals(5, subscriptions.position(tp1).longValue());
    }
}
@Test
public void testConstructorClose() throws Exception {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar.local:9999");
    props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());

    final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
    final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
    try {
        new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    } catch (KafkaException e) {
        assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
        assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
        assertEquals("Failed to construct kafka consumer", e.getMessage());
        return;
    }
    Assert.fail("should have caught an exception and returned");
}
private Fetcher<byte[], byte[]> createFetcher(int maxPollRecords,
                                              SubscriptionState subscriptions,
                                              Metrics metrics) {
    return new Fetcher<>(consumerClient,
            minBytes,
            maxWaitMs,
            fetchSize,
            maxPollRecords,
            true, // check crc
            new ByteArrayDeserializer(),
            new ByteArrayDeserializer(),
            metadata,
            subscriptions,
            metrics,
            "consumer" + groupId,
            time,
            retryBackoffMs);
}
@Test
public void testConstructorClose() throws Exception {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar:9999");
    props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());

    final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
    final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
    try {
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
                props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    } catch (KafkaException e) {
        assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
        assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
        assertEquals("Failed to construct kafka consumer", e.getMessage());
        return;
    }
    Assert.fail("should have caught an exception and returned");
}
@Test
public void testZeroLengthValue() throws Exception {
    Properties producerPropertyOverrides = new Properties();
    producerPropertyOverrides.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    try (LiKafkaProducer producer = createProducer(producerPropertyOverrides)) {
        producer.send(new ProducerRecord<>("testZeroLengthValue", "key", new byte[0])).get();
    }

    Properties consumerProps = new Properties();
    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    try (LiKafkaConsumer consumer = createConsumer(consumerProps)) {
        consumer.subscribe(Collections.singleton("testZeroLengthValue"));
        long startMs = System.currentTimeMillis();
        ConsumerRecords records = ConsumerRecords.empty();
        while (records.isEmpty() && System.currentTimeMillis() < startMs + 30000) {
            records = consumer.poll(100);
        }
        assertEquals(1, records.count());
        ConsumerRecord record = (ConsumerRecord) records.iterator().next();
        assertEquals("key", record.key());
        assertEquals(((byte[]) record.value()).length, 0);
    }
}
private ConsumerFactory<?, ?> createConsumerFactory(String group) {
    if (defaultConsumerFactory != null) {
        return defaultConsumerFactory;
    }
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    if (!ObjectUtils.isEmpty(binderConfigurationProperties.getConsumerConfiguration())) {
        props.putAll(binderConfigurationProperties.getConsumerConfiguration());
    }
    if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                this.binderConfigurationProperties.getKafkaConnectionString());
    }
    props.put("group.id", group);
    return new DefaultKafkaConsumerFactory<>(props);
}
private ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, anonymous ? "latest" : "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
    if (!ObjectUtils.isEmpty(configurationProperties.getConsumerConfiguration())) {
        props.putAll(configurationProperties.getConsumerConfiguration());
    }
    if (ObjectUtils.isEmpty(props.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))) {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                this.configurationProperties.getKafkaConnectionString());
    }
    if (!ObjectUtils.isEmpty(consumerProperties.getExtension().getConfiguration())) {
        props.putAll(consumerProperties.getExtension().getConfiguration());
    }
    if (!ObjectUtils.isEmpty(consumerProperties.getExtension().getStartOffset())) {
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                consumerProperties.getExtension().getStartOffset().name());
    }
    return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    if (!ObjectUtils.isEmpty(configurationProperties.getConsumerConfiguration())) {
        props.putAll(configurationProperties.getConsumerConfiguration());
    }
    if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                this.configurationProperties.getKafkaConnectionString());
    }
    ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
    KafkaBinderHealthIndicator indicator =
            new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, consumerFactory);
    indicator.setTimeout(this.configurationProperties.getHealthTimeout());
    return indicator;
}
@Test @SuppressWarnings("unchecked") public void testConsumerDefaultDeserializer() throws Throwable { Binding<?> binding = null; try { KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties(); String testTopicName = "existing" + System.currentTimeMillis(); invokeCreateTopic(testTopicName, 5, 1); configurationProperties.setAutoCreateTopics(false); Binder binder = getBinder(configurationProperties); ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties(); DirectChannel input = createBindableChannel("input", createConsumerBindingProperties(consumerProperties)); binding = binder.bindConsumer(testTopicName, "test", input, consumerProperties); DirectFieldAccessor consumerAccessor = new DirectFieldAccessor(getKafkaConsumer(binding)); assertTrue(consumerAccessor.getPropertyValue("keyDeserializer") instanceof ByteArrayDeserializer); assertTrue(consumerAccessor.getPropertyValue("valueDeserializer") instanceof ByteArrayDeserializer); } finally { if (binding != null) { binding.unbind(); } } }
@Override
public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
    KafkaIO.Read<byte[], byte[]> kafkaRead = null;
    if (topics != null) {
        kafkaRead = KafkaIO.<byte[], byte[]>read()
            .withBootstrapServers(bootstrapServers)
            .withTopics(topics)
            .updateConsumerProperties(configUpdates)
            .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
            .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of());
    } else if (topicPartitions != null) {
        kafkaRead = KafkaIO.<byte[], byte[]>read()
            .withBootstrapServers(bootstrapServers)
            .withTopicPartitions(topicPartitions)
            .updateConsumerProperties(configUpdates)
            .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
            .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of());
    } else {
        throw new IllegalArgumentException("One of topics and topicPartitions must be configured.");
    }

    return PBegin.in(pipeline).apply("read", kafkaRead.withoutMetadata())
        .apply("in_format", getPTransformForInput());
}
@Test
public void testSourceWithExplicitPartitionsDisplayData() {
    KafkaIO.Read<byte[], Long> read = KafkaIO.<byte[], Long>read()
        .withBootstrapServers("myServer1:9092,myServer2:9092")
        .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5), new TopicPartition("test", 6)))
        .withConsumerFactoryFn(new ConsumerFactoryFn(
            Lists.newArrayList("test"), 10, 10, OffsetResetStrategy.EARLIEST)) // 10 partitions
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(LongDeserializer.class);

    DisplayData displayData = DisplayData.from(read);

    assertThat(displayData, hasDisplayItem("topicPartitions", "test-5,test-6"));
    assertThat(displayData, hasDisplayItem("enable.auto.commit", false));
    assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
    assertThat(displayData, hasDisplayItem("auto.offset.reset", "latest"));
    assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288));
}
public Map<String, Object> getKafkaParams(String group) {
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, group);
    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    kafkaParams.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, kafkaMsgMaxBytes);
    kafkaParams.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, kafkaMsgMaxBytes);
    kafkaParams.put("fetch.message.max.bytes", kafkaMsgMaxBytes);
    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    kafkaParams.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, kafkaMsgMaxBytes);
    kafkaParams.put(ConsumerConfig.SEND_BUFFER_CONFIG, kafkaMsgMaxBytes);
    kafkaParams.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaFetchTimeoutMs);
    kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return kafkaParams;
}
/**
 * Makes sure that the ByteArrayDeserializer is registered in the Kafka properties.
 *
 * @param props The Kafka properties to register the deserializer in.
 */
private static void setDeserializer(Properties props) {
    final String deSerName = ByteArrayDeserializer.class.getCanonicalName();

    Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
    Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);

    if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
        LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
    }
    if (valDeSer != null && !valDeSer.equals(deSerName)) {
        LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
    }

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
}
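A minimal sketch of how a normalization helper like this might be invoked before the consumer is constructed; the caller method, broker address, and the user-supplied deserializer override are assumptions for illustration only.

// Illustrative only: a hypothetical caller that normalizes user-supplied properties
// with setDeserializer(...) before building the raw byte[] consumer.
private static KafkaConsumer<byte[], byte[]> createRawConsumer(Properties userProps) {
    Properties props = new Properties();
    props.putAll(userProps);
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
    // Any conflicting key/value deserializer triggers the warnings above and is then
    // overwritten with ByteArrayDeserializer for both key and value.
    setDeserializer(props);
    return new KafkaConsumer<>(props);
}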
private ConsumerDriver createConsumerDriver(CountDownLatch latch, String topic, int messageCount) {
    Map<String, Object> configs = new ConsumerConfigsBuilder()
            .groupId("bar")
            .bootstrapServers("127.0.0.1:" + broker.getPort())
            .enableAutoCommit(true)
            .autoCommitIntervalMs(1000)
            .sessionTimeoutMs(30000)
            .keyDeserializer(ByteArrayDeserializer.class)
            .valueDeserializer(ByteArrayDeserializer.class)
            .autoOffsetReset(ConsumerConfigsBuilder.OffsetReset.earliest)
            .build();

    ConsumerDefinition consumerDefinition = new ConsumerDefinition();
    consumerDefinition.setConfig(configs);
    consumerDefinition.setTopic(topic);
    consumerDefinition.setMessagesToReceive(messageCount);
    return new ConsumerDriver(consumerDefinition, latch);
}
@NotNull
protected SdcKafkaConsumer createKafkaConsumer(int port, String topic, Source.Context sourceContext) throws StageException {
    Map<String, Object> props = new HashMap<>();
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "earliest");
    props.put("session.timeout.ms", "30000");
    props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
    props.put(KafkaConstants.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(KafkaConstants.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    SdcKafkaConsumer sdcKafkaConsumer = createSdcKafkaConsumer(
        "localhost:" + port,
        topic,
        1000,
        sourceContext,
        props,
        CONSUMER_GROUP_NAME
    );
    sdcKafkaConsumer.validate(new ArrayList<Stage.ConfigIssue>(), sourceContext);
    sdcKafkaConsumer.init();
    return sdcKafkaConsumer;
}
public static Map<String, String> createConnMaps(KafkaDatastoreProperties datastore, boolean isBeam) {
    Map<String, String> props = new HashMap<>();
    if (datastore != null) {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, datastore.brokers.getValue());
        if (!isBeam) {
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        }
        if (datastore.ssl.useSsl.getValue()) {
            props.put("security.protocol", "SSL");
            props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, datastore.ssl.trustStoreType.getValue().toString());
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, datastore.ssl.trustStorePath.getValue());
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, datastore.ssl.trustStorePassword.getValue());
            if (datastore.ssl.needClientAuth.getValue()) {
                props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, datastore.ssl.keyStoreType.getValue().toString());
                props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, datastore.ssl.keyStorePath.getValue());
                props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, datastore.ssl.keyStorePassword.getValue());
            }
        }
    }
    return props;
}
private Properties createConsumerConfig(final String brokers, final String groupId, final Properties properties) {
    final Properties consumerConfig = (Properties) properties.clone();

    if (consumerConfig.getProperty("value.deserializer") != null
            || consumerConfig.getProperty("key.deserializer") != null) {
        logger.warn("Deserializers cannot be provided as consumer properties. "
                + "Overriding manually to be the correct deserialization type");
    }
    consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
    consumerConfig.put("value.deserializer", ByteArrayDeserializer.class.getName());

    if (consumerConfig.getProperty("enable.auto.commit") != null) {
        logger.warn("enable.auto.commit cannot be provided as consumer properties. "
                + "Please use offset.commit.mode to control offset management mode. "
                + "See com.outbrain.aletheia.datum.consumption.OffsetCommitMode for supported modes.");
    }
    consumerConfig.put("enable.auto.commit", "false");

    consumerConfig.put("bootstrap.servers", brokers);
    consumerConfig.put("group.id", groupId);

    logger.warn("Using consumer config: {}", consumerConfig);

    return consumerConfig;
}
protected KafkaConsumer<byte[], byte[]> createConsumers(Map<String, ?> config) {
    Properties consumerProps = new Properties();
    consumerProps.putAll(config);
    long randomToken = RANDOM.nextLong();
    consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            (String) config.get(KafkaCruiseControlConfig.BOOTSTRAP_SERVERS_CONFIG));
    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "LiKafkaCruiseControlSampleStore" + randomToken);
    consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_CLIENT_ID + randomToken);
    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    consumerProps.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.toString(Integer.MAX_VALUE));
    consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    return new KafkaConsumer<>(consumerProps);
}
static KafkaConsumer<byte[], byte[]> getConsumer(String groupName, String brokerList) {
    Properties prop = new Properties();
    prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    prop.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupName);
    prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
    prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
    return new KafkaConsumer<>(prop);
}
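A brief, hypothetical example of draining a single partition with the factory above; the topic name, partition number, and poll timeout are assumptions.

// Illustrative only: drains one assumed partition from the beginning using getConsumer(...) above.
static List<ConsumerRecord<byte[], byte[]>> drainPartition(String groupName, String brokerList) {
    final List<ConsumerRecord<byte[], byte[]>> drained = new ArrayList<>();
    try (KafkaConsumer<byte[], byte[]> consumer = getConsumer(groupName, brokerList)) {
        TopicPartition tp = new TopicPartition("my-topic", 0); // assumed topic and partition
        consumer.assign(Collections.singletonList(tp));
        consumer.seekToBeginning(Collections.singletonList(tp));
        ConsumerRecords<byte[], byte[]> batch;
        do {
            batch = consumer.poll(1000L); // older poll(long) signature, matching the snippets above
            batch.forEach(drained::add);
        } while (!batch.isEmpty());
    }
    return drained;
}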
/**
 * Validates that we have some sane default settings.
 */
@Test
public void testForSaneDefaultKafkaConsumerSettings() {
    final List<String> brokerHosts = Lists.newArrayList(
        "broker1:9092", "broker2:9093"
    );

    final String consumerId = "myConsumerId";
    final String topic = "myTopic";

    // Create config
    final KafkaConsumerConfig config = new KafkaConsumerConfig(brokerHosts, consumerId, topic);

    // now do validation on constructor arguments
    assertEquals("Topic set", topic, config.getTopic());
    assertEquals("ConsumerId set", consumerId, config.getConsumerId());
    assertEquals("BrokerHosts set", "broker1:9092,broker2:9093",
        config.getKafkaConsumerProperty("bootstrap.servers"));

    // Check defaults are sane.
    assertEquals("group.id set", consumerId, config.getKafkaConsumerProperty("group.id"));
    assertEquals("auto.offset.reset set to none", "none", config.getKafkaConsumerProperty("auto.offset.reset"));
    assertEquals(
        "Key Deserializer set to bytes deserializer",
        ByteArrayDeserializer.class.getName(),
        config.getKafkaConsumerProperty("key.deserializer"));
    assertEquals(
        "Value Deserializer set to bytes deserializer",
        ByteArrayDeserializer.class.getName(),
        config.getKafkaConsumerProperty("value.deserializer")
    );
}
@Override
public void configure(final WorkerConfig config) {
    this.topic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG);
    if (topic.equals(""))
        throw new ConfigException("Must specify topic for connector status.");

    Map<String, Object> producerProps = new HashMap<>();
    producerProps.putAll(config.originals());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class

    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.putAll(config.originals());
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Map<String, Object> adminProps = new HashMap<>(config.originals());
    NewTopic topicDescription = TopicAdmin.defineTopic(topic).
            compacted().
            partitions(config.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG)).
            replicationFactor(config.getShort(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG)).
            build();

    Callback<ConsumerRecord<String, byte[]>> readCallback = new Callback<ConsumerRecord<String, byte[]>>() {
        @Override
        public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
            read(record);
        }
    };
    this.kafkaLog = createKafkaBasedLog(topic, producerProps, consumerProps, readCallback, topicDescription, adminProps);
}
@Override
public void configure(final WorkerConfig config) {
    String topic = config.getString(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG);
    if (topic.equals(""))
        throw new ConfigException("Offset storage topic must be specified");

    data = new HashMap<>();

    Map<String, Object> producerProps = new HashMap<>();
    producerProps.putAll(config.originals());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.putAll(config.originals());
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Map<String, Object> adminProps = new HashMap<>(config.originals());
    NewTopic topicDescription = TopicAdmin.defineTopic(topic).
            compacted().
            partitions(config.getInt(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG)).
            replicationFactor(config.getShort(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG)).
            build();

    offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminProps);
}
public static void verify(final String kafka) {
    ensureStreamsApplicationDown(kafka);

    final Map<TopicPartition, Long> committedOffsets = getCommittedOffsets(kafka);

    final Properties props = new Properties();
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));

    try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
        final List<TopicPartition> partitions = getAllPartitions(consumer, "data", "echo", "min", "sum");
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);

        final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition =
            getOutputRecords(consumer, committedOffsets);

        truncate("data", recordPerTopicPerPartition, committedOffsets);

        verifyMin(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("min"));
        verifySum(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("sum"));

        verifyAllTransactionFinished(consumer, kafka);

        // do not modify: required test output
        System.out.println("ALL-RECORDS-DELIVERED");
    } catch (final Exception e) {
        e.printStackTrace(System.err);
        System.out.println("FAILED");
    }
}
private static Map<TopicPartition, Long> getCommittedOffsets(final String kafka) {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, EosTestClient.APP_ID);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "OffsetsClient");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

    final Map<TopicPartition, Long> committedOffsets = new HashMap<>();
    try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
        final Set<String> topics = new HashSet<>();
        topics.add("data");
        consumer.subscribe(topics);
        consumer.poll(0);

        final Set<TopicPartition> partitions = new HashSet<>();
        for (final String topic : topics) {
            for (final PartitionInfo partition : consumer.partitionsFor(topic)) {
                partitions.add(new TopicPartition(partition.topic(), partition.partition()));
            }
        }

        for (final TopicPartition tp : partitions) {
            final long offset = consumer.position(tp);
            committedOffsets.put(tp, offset);
        }
    }
    return committedOffsets;
}
@Test
public void testFetchedRecordsRaisesOnSerializationErrors() {
    // raise an exception from somewhere in the middle of the fetch response
    // so that we can verify that our position does not advance after raising
    ByteArrayDeserializer deserializer = new ByteArrayDeserializer() {
        int i = 0;
        @Override
        public byte[] deserialize(String topic, byte[] data) {
            if (i++ % 2 == 1) {
                // Should be blocked on the value deserialization of the first record.
                assertEquals(new String(data, StandardCharsets.UTF_8), "value-1");
                throw new SerializationException();
            }
            return data;
        }
    };

    Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), deserializer, deserializer);

    subscriptions.assignFromUser(singleton(tp1));
    subscriptions.seek(tp1, 1);

    client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(tp1, this.records, Errors.NONE, 100L, 0));

    assertEquals(1, fetcher.sendFetches());
    consumerClient.poll(0);
    // The fetcher should block on Deserialization error
    for (int i = 0; i < 2; i++) {
        try {
            fetcher.fetchedRecords();
            fail("fetchedRecords should have raised");
        } catch (SerializationException e) {
            // the position should not advance since no data has been returned
            assertEquals(1, subscriptions.position(tp1).longValue());
        }
    }
}
@Test
public void testReadCommittedLagMetric() {
    Metrics metrics = new Metrics();
    fetcher = createFetcher(subscriptions, metrics, new ByteArrayDeserializer(),
            new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);

    subscriptions.assignFromUser(singleton(tp1));
    subscriptions.seek(tp1, 0);

    MetricName maxLagMetric = metrics.metricInstance(metricsRegistry.recordsLagMax);
    MetricName partitionLagMetric = metrics.metricName(tp1 + ".records-lag", metricGroup);

    Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
    KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);

    // recordsFetchLagMax should be initialized to negative infinity
    assertEquals(Double.NEGATIVE_INFINITY, recordsFetchLagMax.value(), EPSILON);

    // recordsFetchLagMax should be lso - fetchOffset after receiving an empty FetchResponse
    fetchRecords(tp1, MemoryRecords.EMPTY, Errors.NONE, 100L, 50L, 0);
    assertEquals(50, recordsFetchLagMax.value(), EPSILON);

    KafkaMetric partitionLag = allMetrics.get(partitionLagMetric);
    assertEquals(50, partitionLag.value(), EPSILON);

    // recordsFetchLagMax should be lso - offset of the last message after receiving a non-empty FetchResponse
    MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
            TimestampType.CREATE_TIME, 0L);
    for (int v = 0; v < 3; v++)
        builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
    fetchRecords(tp1, builder.build(), Errors.NONE, 200L, 150L, 0);
    assertEquals(147, recordsFetchLagMax.value(), EPSILON);
    assertEquals(147, partitionLag.value(), EPSILON);

    // verify de-registration of partition lag
    subscriptions.unsubscribe();
    assertFalse(allMetrics.containsKey(partitionLagMetric));
}
@Test
public void testSkippingAbortedTransactions() {
    Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
            new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int currentOffset = 0;

    currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
            new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
            new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));

    abortTransaction(buffer, 1L, currentOffset);

    buffer.flip();

    List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
    abortedTransactions.add(new FetchResponse.AbortedTransaction(1, 0));
    MemoryRecords records = MemoryRecords.readableRecords(buffer);
    subscriptions.assignFromUser(singleton(tp1));

    subscriptions.seek(tp1, 0);

    // normal fetch
    assertEquals(1, fetcher.sendFetches());
    assertFalse(fetcher.hasCompletedFetches());

    client.prepareResponse(fetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
    consumerClient.poll(0);
    assertTrue(fetcher.hasCompletedFetches());

    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
    assertFalse(fetchedRecords.containsKey(tp1));
}
@Test
public void testReturnCommittedTransactions() {
    Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
            new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int currentOffset = 0;

    currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
            new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
            new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));

    currentOffset += commitTransaction(buffer, 1L, currentOffset);
    buffer.flip();

    List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
    MemoryRecords records = MemoryRecords.readableRecords(buffer);
    subscriptions.assignFromUser(singleton(tp1));

    subscriptions.seek(tp1, 0);

    // normal fetch
    assertEquals(1, fetcher.sendFetches());
    assertFalse(fetcher.hasCompletedFetches());
    client.prepareResponse(new MockClient.RequestMatcher() {
        @Override
        public boolean matches(AbstractRequest body) {
            FetchRequest request = (FetchRequest) body;
            assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
            return true;
        }
    }, fetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));

    consumerClient.poll(0);
    assertTrue(fetcher.hasCompletedFetches());

    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
    assertTrue(fetchedRecords.containsKey(tp1));
    assertEquals(fetchedRecords.get(tp1).size(), 2);
}
@Test
public void testReturnAbortedTransactionsinUncommittedMode() {
    Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
            new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int currentOffset = 0;

    currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
            new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
            new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));

    abortTransaction(buffer, 1L, currentOffset);

    buffer.flip();

    List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
    abortedTransactions.add(new FetchResponse.AbortedTransaction(1, 0));
    MemoryRecords records = MemoryRecords.readableRecords(buffer);
    subscriptions.assignFromUser(singleton(tp1));

    subscriptions.seek(tp1, 0);

    // normal fetch
    assertEquals(1, fetcher.sendFetches());
    assertFalse(fetcher.hasCompletedFetches());

    client.prepareResponse(fetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
    consumerClient.poll(0);
    assertTrue(fetcher.hasCompletedFetches());

    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
    assertTrue(fetchedRecords.containsKey(tp1));
}
@Test
public void testConsumerPositionUpdatedWhenSkippingAbortedTransactions() {
    Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
            new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    long currentOffset = 0;

    currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
            new SimpleRecord(time.milliseconds(), "abort1-1".getBytes(), "value".getBytes()),
            new SimpleRecord(time.milliseconds(), "abort1-2".getBytes(), "value".getBytes()));

    currentOffset += abortTransaction(buffer, 1L, currentOffset);

    buffer.flip();

    List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
    abortedTransactions.add(new FetchResponse.AbortedTransaction(1, 0));
    MemoryRecords records = MemoryRecords.readableRecords(buffer);

    subscriptions.assignFromUser(singleton(tp1));
    subscriptions.seek(tp1, 0);

    // normal fetch
    assertEquals(1, fetcher.sendFetches());
    assertFalse(fetcher.hasCompletedFetches());

    client.prepareResponse(fetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
    consumerClient.poll(0);
    assertTrue(fetcher.hasCompletedFetches());

    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();

    // Ensure that we don't return any of the aborted records, but yet advance the consumer position.
    assertFalse(fetchedRecords.containsKey(tp1));
    assertEquals(currentOffset, (long) subscriptions.position(tp1));
}