/**
 * Creates a consumer loop for the given consumer group.
 *
 * <p>The broker address is the value of the {@code KAFKA_HOST_NAME} environment variable
 * followed by the fixed port {@code 9092}. If the variable is unset, string concatenation
 * produces the literal host {@code "null"}. Keys and values are both read as strings.
 *
 * @param id       identifier stored on this loop
 * @param groupId  value used for the consumer's {@code group.id}
 * @param topics   topic names stored on this loop
 * @param dbClient database client stored on this loop
 */
public ConsumerLoop(int id, String groupId, List<String> topics, DBClient dbClient) {
    this.id = id;
    this.topics = topics;
    this.dbClient = dbClient;

    final String kafkaHostName = System.getenv("KAFKA_HOST_NAME");
    log.info("Kafka host: " + kafkaHostName);

    final String stringDeserializer = StringDeserializer.class.getName();
    final Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", kafkaHostName + ":9092");
    consumerProps.put("group.id", groupId);
    consumerProps.put("key.deserializer", stringDeserializer);
    consumerProps.put("value.deserializer", stringDeserializer);

    this.consumer = new KafkaConsumer<>(consumerProps);
}
/**
 * Checks that every required topic is present on the cluster and, optionally, that all
 * required topics have the same number of partitions.
 *
 * <p>A short-lived consumer with a random {@code group.id} is created to list the topics
 * and is closed before this method returns.
 *
 * @param kafkaBrokers         value used for {@code bootstrap.servers}
 * @param requiredTopics       names of the topics that must exist
 * @param checkPartitionCounts when {@code true}, also require that all required topics share
 *                             one partition count
 * @return {@code false} if a required topic is missing, or if partition counts differ while
 *         {@code checkPartitionCounts} is set; {@code true} otherwise
 */
public boolean verifyTopicsExist(String kafkaBrokers, Set<String> requiredTopics, boolean checkPartitionCounts) {
    Properties props = new Properties();
    props.put("bootstrap.servers", kafkaBrokers);
    props.put("group.id", UUID.randomUUID().toString());
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    // Parameterized consumer: no raw type, no unchecked suppression needed.
    // try-with-resources closes it on every exit path, including the early returns.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        Map<String, List<PartitionInfo>> topics = consumer.listTopics();

        // Distinct partition counts seen across the required topics.
        Set<Integer> partitionCount = new HashSet<>();
        for (String requiredTopic : requiredTopics) {
            List<PartitionInfo> partitions = topics.get(requiredTopic);
            if (partitions == null) {
                logger.info("Required kafka topic {} not present", requiredTopic);
                return false;
            }
            partitionCount.add(partitions.size());
        }

        // More than one distinct size means the topics are not co-partitioned.
        if (checkPartitionCounts && partitionCount.size() > 1) {
            logger.warn("Partition count mismatch in topics {}", Arrays.toString(requiredTopics.toArray()));
            return false;
        }
        return true;
    }
}
Consumer(Topic topic, String consumerGroupId, Properties props, PartitionProcessorFactory processorFactory) { this.topic = topic; this.consumerGroupId = consumerGroupId; // Mandatory settings, not changeable props.put("group.id", consumerGroupId); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName()); kafka = new KafkaConsumer<>(props); partitions = new AssignedPartitions(processorFactory); long now = System.currentTimeMillis(); // start it consumerLoopExecutor.execute(new ConsumerLoop()); }
/**
 * Builds the Kafka consumer settings from the application environment.
 *
 * <p>Broker address, auto-commit settings, group id and offset-reset policy are looked up
 * in {@code env}; keys use {@code StringDeserializer} and values use {@code JsonDeserializer}.
 *
 * @return a mutable map of consumer configuration entries
 */
@Bean
public Map<String, Object> consumerConfigs() {
    final Map<String, Object> config = new HashMap<>();

    // Connection and group membership.
    config.put("bootstrap.servers", env.getProperty("kafka.broker"));
    config.put("group.id", env.getProperty("group.id"));

    // Offset handling.
    config.put("enable.auto.commit", env.getProperty("enable.auto.commit"));
    config.put("auto.commit.interval.ms", env.getProperty("auto.commit.interval.ms"));
    config.put("auto.offset.reset", env.getProperty("kafka.auto.offset.reset"));

    // Payload decoding.
    config.put("key.deserializer", StringDeserializer.class);
    config.put("value.deserializer", JsonDeserializer.class);

    return config;
}
/** * Creates default message formats. */ private void createDefaultMessageFormats() { final Map<String, String> defaultFormats = new HashMap<>(); defaultFormats.put("Short", ShortDeserializer.class.getName()); defaultFormats.put("ByteArray", ByteArrayDeserializer.class.getName()); defaultFormats.put("Bytes", BytesDeserializer.class.getName()); defaultFormats.put("Double", DoubleDeserializer.class.getName()); defaultFormats.put("Float", FloatDeserializer.class.getName()); defaultFormats.put("Integer", IntegerDeserializer.class.getName()); defaultFormats.put("Long", LongDeserializer.class.getName()); defaultFormats.put("String", StringDeserializer.class.getName()); // Create if needed. for (final Map.Entry<String, String> entry : defaultFormats.entrySet()) { MessageFormat messageFormat = messageFormatRepository.findByName(entry.getKey()); if (messageFormat == null) { messageFormat = new MessageFormat(); } messageFormat.setName(entry.getKey()); messageFormat.setClasspath(entry.getValue()); messageFormat.setJar("n/a"); messageFormat.setDefaultFormat(true); messageFormatRepository.save(messageFormat); } }
/**
 * Returns the consumer registered for the given group, creating and registering one on
 * first use.
 *
 * <p>New consumers connect to {@code bootstrapServer}, have auto-commit disabled, use a
 * 30000 ms session timeout and string deserializers for key and value.
 *
 * @param groupId consumer group id, also the key in {@code kafkaConsumers}
 * @return the existing or newly created consumer for {@code groupId}
 */
private KafkaConsumer<String, Serializable> getConsumer(String groupId) {
    final KafkaConsumer<String, Serializable> cached = kafkaConsumers.get(groupId);
    if (cached != null) {
        return cached;
    }

    final String stringDeserializer = StringDeserializer.class.getName();
    final Properties config = new Properties();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer);

    final KafkaConsumer<String, Serializable> created = new KafkaConsumer<>(config);
    kafkaConsumers.put(groupId, created);
    return created;
}
/**
 * Creates the consumer factory for string keys and string values.
 *
 * <p>The factory connects to {@code brokers}, joins {@code group}, disables auto-commit,
 * uses a 15000 ms session timeout and resets to the latest offset.
 *
 * @return a {@code DefaultKafkaConsumerFactory} built from those settings
 */
public ConsumerFactory<String, String> consumerFactory() {
    final Map<String, Object> config = new HashMap<>();

    // Connection and group membership.
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, group);
    config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

    // Offset handling.
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    // Payload decoding.
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(config);
}
/**
 * Polls {@code TOPIC} once (up to 10000 ms) as group {@code avro-consumer} and prints the
 * key and the decoded user name of every record received.
 *
 * <p>Values arrive as raw bytes and are decoded with {@code UserAvroSerdes.deserialize}.
 *
 * @param bootstrapServers value used for {@code bootstrap.servers}
 */
private static void consumeRecords(String bootstrapServers) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-consumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    // try-with-resources: the consumer is closed even if polling, decoding or printing throws.
    try (Consumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(TOPIC));
        ConsumerRecords<String, byte[]> records = consumer.poll(10000);
        for (ConsumerRecord<String, byte[]> record : records) {
            out.printf(
                "key = %s value = %s%n",
                record.key(),
                UserAvroSerdes.deserialize(record.value()).getName().toString());
        }
    }
}
/**
 * Polls {@code TOPIC} once (up to 10000 ms) as group {@code string-consumer} and prints the
 * integer key and string value of every record received.
 *
 * @param bootstrapServers value used for {@code bootstrap.servers}
 */
private static void consumeRecords(String bootstrapServers) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "string-consumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // try-with-resources: the consumer is closed even if polling or printing throws.
    try (Consumer<Integer, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(TOPIC));
        ConsumerRecords<Integer, String> records = consumer.poll(10000);
        for (ConsumerRecord<Integer, String> record : records) {
            out.printf("key = %s value = %s%n", record.key(), record.value());
        }
    }
}
/**
 * Polls {@code TOPIC} once (up to 10000 ms) as group {@code metadata-consumer} and prints,
 * for every record, its key and value followed by its topic, partition, offset, formatted
 * timestamp and checksum.
 *
 * @param bootstrapServers value used for {@code bootstrap.servers}
 */
private static void consumeRecords(String bootstrapServers) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "metadata-consumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // try-with-resources: the consumer is closed even if polling, formatting or printing throws.
    try (Consumer<Integer, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(TOPIC));
        ConsumerRecords<Integer, String> records = consumer.poll(10000);
        for (ConsumerRecord<Integer, String> record : records) {
            System.out.printf("key = %s value = %s\t", record.key(), record.value());
            System.out.printf("ProducerRecord: topic=>%s partition=>%s offset=>%s timestamp=>%s checksum=>%s",
                record.topic(),
                record.partition(),
                record.offset(),
                FORMATTER.format(Instant.ofEpochMilli(record.timestamp())),
                record.checksum());
            System.out.println();
        }
    }
}
/**
 * Builds the producer, consumer and admin settings for the given topic and creates the
 * {@code KafkaBasedLog} on top of them.
 *
 * <p>All three property maps start as copies of {@code config.originals()}. The producer uses
 * string/byte-array serializers with {@code Integer.MAX_VALUE} retries; the consumer uses
 * the matching deserializers. The topic is described as compacted, with one partition and
 * the replication factor read from
 * {@code DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG}.
 *
 * @param topic  name of the backing topic
 * @param config worker configuration supplying the base properties
 * @return the log returned by {@code createKafkaBasedLog}
 */
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
    final Map<String, Object> producerProps = new HashMap<>(config.originals());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

    final Map<String, Object> consumerProps = new HashMap<>(config.originals());
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    final Map<String, Object> adminProps = new HashMap<>(config.originals());

    final NewTopic topicDescription = TopicAdmin.defineTopic(topic)
            .compacted()
            .partitions(1)
            .replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG))
            .build();

    return createKafkaBasedLog(
            topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps);
}
/**
 * Commits offset 5 (with {@code null} metadata) for partition 0 of six test topics, using
 * the streams application id as the consumer group.
 *
 * <p>The consumer is closed when the commit finishes, whether it succeeds or throws.
 */
private void commitInvalidOffsets() {
    final Map<TopicPartition, OffsetAndMetadata> invalidOffsets = new HashMap<>();
    invalidOffsets.put(new TopicPartition(TOPIC_1_2, 0), new OffsetAndMetadata(5, null));
    invalidOffsets.put(new TopicPartition(TOPIC_2_2, 0), new OffsetAndMetadata(5, null));
    invalidOffsets.put(new TopicPartition(TOPIC_A_2, 0), new OffsetAndMetadata(5, null));
    invalidOffsets.put(new TopicPartition(TOPIC_C_2, 0), new OffsetAndMetadata(5, null));
    invalidOffsets.put(new TopicPartition(TOPIC_Y_2, 0), new OffsetAndMetadata(5, null));
    invalidOffsets.put(new TopicPartition(TOPIC_Z_2, 0), new OffsetAndMetadata(5, null));

    // Parameterized (not raw) consumer; try-with-resources closes it even if commitSync throws.
    try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(TestUtils.consumerConfig(
            CLUSTER.bootstrapServers(),
            streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
            StringDeserializer.class,
            StringDeserializer.class))) {
        consumer.commitSync(invalidOffsets);
    }
}
/**
 * Populates the shared producer, result-consumer and streams configurations before any
 * test in the class runs. All three point at the embedded {@code CLUSTER}.
 *
 * @throws Exception declared by the original signature
 */
@BeforeClass
public static void setupConfigsAndUtils() throws Exception {
    // Producer: Long keys, String values, acks from all replicas, no retries.
    PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
    PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
    PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);

    // Result consumer: mirrors the producer's key/value types and reads from the earliest offset.
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Streams: default Long/String serdes, caching disabled, 100 ms commit interval.
    STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
    STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
}
@Test public void testWindowedDeserializerNoArgConstructors() { Map<String, String> props = new HashMap<>(); // test key[value].deserializer.inner.class takes precedence over serializer.inner.class WindowedDeserializer<StringSerializer> windowedDeserializer = new WindowedDeserializer<>(); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1"); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); props.put("key.deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer"); windowedDeserializer.configure(props, true); Deserializer<?> inner = windowedDeserializer.innerDeserializer(); assertNotNull("Inner deserializer should be not null", inner); assertTrue("Inner deserializer type should be StringDeserializer", inner instanceof StringDeserializer); // test deserializer.inner.class props.put("deserializer.inner.class", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); props.remove("key.deserializer.inner.class"); props.remove("value.deserializer.inner.class"); WindowedDeserializer<?> windowedDeserializer1 = new WindowedDeserializer<>(); windowedDeserializer1.configure(props, false); Deserializer<?> inner1 = windowedDeserializer1.innerDeserializer(); assertNotNull("Inner deserializer should be not null", inner1); assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner1 instanceof ByteArrayDeserializer); }
/**
 * Blocks until a record with key {@code "key"} and value {@code "value"} is read from
 * {@code SINK_TOPIC}.
 *
 * <p>Records with a {@code null} key or value are skipped rather than causing a
 * {@code NullPointerException}, and the consumer is closed on every exit path.
 *
 * @param kafka      value used for {@code bootstrap.servers}
 * @param eosEnabled when {@code true}, the consumer uses the {@code read_committed}
 *                   isolation level
 */
private static void loopUntilRecordReceived(final String kafka, final boolean eosEnabled) {
    final Properties consumerProperties = new Properties();
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "broker-compatibility-consumer");
    consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    if (eosEnabled) {
        consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
    }

    // try-with-resources closes the consumer both on the normal return and on any exception.
    try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
        consumer.subscribe(Collections.singletonList(SINK_TOPIC));

        while (true) {
            final ConsumerRecords<String, String> records = consumer.poll(100);
            for (final ConsumerRecord<String, String> record : records) {
                // Constant-first equals: a record without key or value must not throw.
                if ("key".equals(record.key()) && "value".equals(record.value())) {
                    return;
                }
            }
        }
    }
}
@Test public void testInterceptorConstructorClose() throws Exception { try { Properties props = new Properties(); // test with client ID assigned by KafkaConsumer props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>( props, new StringDeserializer(), new StringDeserializer()); assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get()); assertEquals(0, MockConsumerInterceptor.CLOSE_COUNT.get()); consumer.close(); assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get()); assertEquals(1, MockConsumerInterceptor.CLOSE_COUNT.get()); // Cluster metadata will only be updated on calling poll. Assert.assertNull(MockConsumerInterceptor.CLUSTER_META.get()); } finally { // cleanup since we are using mutable static variables in MockConsumerInterceptor MockConsumerInterceptor.resetCounters(); } }
/**
 * Sends ten random longs, rendered as strings, through a {@code KafkaSink} on a fresh topic
 * and waits up to one minute for the consumer-side completion callback to fire.
 *
 * @throws InterruptedException if interrupted while waiting on the latch
 */
@Test
public void testSinkWithString() throws InterruptedException {
    final KafkaUsage usage = new KafkaUsage();
    final String topic = UUID.randomUUID().toString();
    final CountDownLatch completed = new CountDownLatch(1);
    final List<String> sent = new ArrayList<>();

    // Start consuming before anything is produced; the latch is released on completion.
    usage.consumeStrings(topic, 10, 10, TimeUnit.SECONDS, completed::countDown,
        (k, v) -> sent.contains(v));

    final KafkaSink<String> sink = new KafkaSink<>(
        vertx,
        getKafkaConfig()
            .put("topic", topic)
            .put("value.serializer", StringSerializer.class.getName())
            .put("value.deserializer", StringDeserializer.class.getName()));

    final Stream<String> payloads = new Random().longs(10).mapToObj(Long::toString);
    Source.fromPayloads(payloads)
        .onPayload(sent::add)
        .to(sink);

    assertThat(completed.await(1, TimeUnit.MINUTES)).isTrue();
}
/**
 * Provides the direct Kafka input stream of raw ratings.
 *
 * <p>The stream subscribes to {@code topicA} and {@code topicB} on {@code localhost:9092} as
 * group {@code rating_stream}, starting from the latest offset with auto-commit disabled.
 * Values are decoded by {@code JsonDeserializer}; the {@code serializedClass} entry carries
 * {@code RawRating.class}.
 *
 * @param streamingContext streaming context the stream is attached to
 * @return the stream created by {@code KafkaUtils.createDirectStream}
 */
@Provides
JavaInputDStream<ConsumerRecord<String, RawRating>> providesKafkaInputStream(JavaStreamingContext streamingContext) {
    final Collection<String> subscribedTopics = Arrays.asList("topicA", "topicB");

    final Map<String, Object> consumerSettings = new HashedMap();
    // Connection and group membership.
    consumerSettings.put("bootstrap.servers", "localhost:9092");
    consumerSettings.put("group.id", "rating_stream");
    // Offset handling.
    consumerSettings.put("auto.offset.reset", "latest");
    consumerSettings.put("enable.auto.commit", false);
    // Payload decoding.
    consumerSettings.put("key.deserializer", StringDeserializer.class);
    consumerSettings.put("value.deserializer", JsonDeserializer.class);
    consumerSettings.put("serializedClass", RawRating.class);

    return KafkaUtils.createDirectStream(
            streamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, RawRating>Subscribe(subscribedTopics, consumerSettings));
}
@Test public void testAutoCommit() throws Exception { LOG.info("Start testAutoCommit"); ContainerProperties containerProps = new ContainerProperties("topic3", "topic4"); final CountDownLatch latch = new CountDownLatch(4); containerProps.setMessageListener((MessageListener<Integer, String>) message -> { LOG.info("received: " + message); latch.countDown(); }); KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps, IntegerDeserializer.class, StringDeserializer.class); container.setBeanName("testAutoCommit"); container.start(); Thread.sleep(5000); // wait a bit for the container to start KafkaTemplate<Integer, String> template = createTemplate(IntegerSerializer.class, StringSerializer.class); template.setDefaultTopic("topic3"); template.sendDefault(0, "foo"); template.sendDefault(2, "bar"); template.sendDefault(0, "baz"); template.sendDefault(2, "qux"); template.flush(); assertTrue(latch.await(60, TimeUnit.SECONDS)); container.stop(); LOG.info("Stop testAutoCommit"); }
public static BrokerCompatibilityCheck create(final Map<String, Object> streamsConfig, final KafkaTopicClient topicClient) { Set<String> topicNames = topicClient.listTopicNames(); // the offsetsForTime call needs a partition that exists else it can block forever if (topicNames.isEmpty()) { topicClient.createTopic(KSQL_COMPATIBILITY_CHECK, 1, (short)1); topicNames = Utils.mkSet(KSQL_COMPATIBILITY_CHECK); } final Map<String, Object> consumerConfigs = new StreamsConfig(streamsConfig) .getConsumerConfigs(KSQL_COMPATIBILITY_CHECK, "ksql_server"); // remove this otherwise it will try and instantiate the StreamsPartitionAssignor consumerConfigs.remove("partition.assignment.strategy"); final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs, new StringDeserializer(), new StringDeserializer()); return new BrokerCompatibilityCheck(consumer, new TopicPartition(topicNames.iterator().next(), 0)); }
/**
 * Produces 1000 string records to partition 0 of {@code testConsume}, then consumes them
 * from the earliest offset and completes once the handler has seen all of them.
 *
 * @param ctx test context used for async completion and failure reporting
 * @throws Exception declared by the original signature
 */
@Test
public void testConsume(TestContext ctx) throws Exception {
    final String topicName = "testConsume";
    String consumerId = topicName;

    // Phase 1: fill the topic and wait (up to 20 s) for the producer to finish.
    Async produced = ctx.async();
    AtomicInteger sequence = new AtomicInteger();
    int numMessages = 1000;
    kafkaCluster.useTo().produceStrings(
        numMessages,
        produced::complete,
        () -> new ProducerRecord<>(topicName, 0, "key-" + sequence.get(), "value-" + sequence.getAndIncrement()));
    produced.awaitSuccess(20000);

    // Phase 2: consume everything with string deserializers.
    Properties consumerProps =
        kafkaCluster.useTo().getConsumerProperties(consumerId, consumerId, OffsetResetStrategy.EARLIEST);
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumer = createConsumer(vertx, consumerProps);

    Async consumed = ctx.async();
    AtomicInteger remaining = new AtomicInteger(numMessages);
    consumer.exceptionHandler(ctx::fail);
    consumer.handler(rec -> {
        // The record that brings the counter to zero is the last expected one.
        if (remaining.decrementAndGet() == 0) {
            consumed.complete();
        }
    });
    consumer.subscribe(Collections.singleton(topicName));
}
/**
 * Creates a topic with two partitions and checks that {@code partitionsFor} reports two
 * partition entries for it.
 *
 * @param ctx test context used for async completion and assertions
 * @throws Exception declared by the original signature
 */
@Test
public void testPartitionsFor(TestContext ctx) throws Exception {
    String topicName = "testPartitionsFor";
    String consumerId = topicName;
    kafkaCluster.createTopic(topicName, 2, 1);

    Properties consumerProps =
        kafkaCluster.useTo().getConsumerProperties(consumerId, consumerId, OffsetResetStrategy.EARLIEST);
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    Context vertxContext = vertx.getOrCreateContext();
    consumer = createConsumer(vertxContext, consumerProps);

    Async finished = ctx.async();
    consumer.partitionsFor(topicName, ar -> {
        if (!ar.succeeded()) {
            ctx.fail();
        } else {
            List<PartitionInfo> partitions = ar.result();
            ctx.assertEquals(2, partitions.size());
        }
        finished.complete();
    });
}
/**
 * Produces 500 string records to partition 0 of {@code testBatchHandler} and expects the
 * batch handler to receive a batch containing exactly that many records.
 *
 * @param ctx test context used for async completion and assertions
 * @throws Exception declared by the original signature
 */
@Test
public void testBatchHandler(TestContext ctx) throws Exception {
    String topicName = "testBatchHandler";
    String consumerId = topicName;

    // Phase 1: fill the topic and wait (up to 10 s) for the producer to finish.
    Async produced = ctx.async();
    AtomicInteger sequence = new AtomicInteger();
    int numMessages = 500;
    kafkaCluster.useTo().produceStrings(
        numMessages,
        produced::complete,
        () -> new ProducerRecord<>(topicName, 0, "key-" + sequence.get(), "value-" + sequence.getAndIncrement()));
    produced.awaitSuccess(10000);

    // Phase 2: consume with a batch handler installed.
    Properties consumerProps =
        kafkaCluster.useTo().getConsumerProperties(consumerId, consumerId, OffsetResetStrategy.EARLIEST);
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    Context vertxContext = vertx.getOrCreateContext();
    consumer = createConsumer(vertxContext, consumerProps);

    Async batchSeen = ctx.async();
    consumer.batchHandler(records -> {
        ctx.assertEquals(numMessages, records.count());
        batchSeen.complete();
    });
    consumer.exceptionHandler(ctx::fail);
    // A per-record handler is registered too, but individual records are ignored here.
    consumer.handler(rec -> {});
    consumer.subscribe(Collections.singleton(topicName));
}
@Test public void testPollTimeout(TestContext ctx) throws Exception { Async async = ctx.async(); String topicName = "testPollTimeout"; Properties config = kafkaCluster.useTo().getConsumerProperties(topicName, topicName, OffsetResetStrategy.EARLIEST); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); io.vertx.kafka.client.common.TopicPartition topicPartition = new io.vertx.kafka.client.common.TopicPartition(topicName, 0); KafkaConsumer<Object, Object> consumerWithCustomTimeout = KafkaConsumer.create(vertx, config); int pollingTimeout = 1500; // Set the polling timeout to 1500 ms (default is 1000) consumerWithCustomTimeout.pollTimeout(pollingTimeout); // Subscribe to the empty topic (we want the poll() call to timeout!) consumerWithCustomTimeout.subscribe(topicName, subscribeRes -> { consumerWithCustomTimeout.handler(rec -> {}); // Consumer will now immediately poll once long beforeSeek = System.currentTimeMillis(); consumerWithCustomTimeout.seekToBeginning(topicPartition, seekRes -> { long durationWShortTimeout = System.currentTimeMillis() - beforeSeek; ctx.assertTrue(durationWShortTimeout >= pollingTimeout, "Operation must take at least as long as the polling timeout"); consumerWithCustomTimeout.close(); async.countDown(); }); }); }
@Test // Regression test for ISS-73: undeployment of a verticle with unassigned consumer fails public void testUndeployUnassignedConsumer(TestContext ctx) throws Exception { Properties config = kafkaCluster.useTo().getConsumerProperties("testUndeployUnassignedConsumer_consumer", "testUndeployUnassignedConsumer_consumer", OffsetResetStrategy.EARLIEST); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); Async async = ctx.async(1); vertx.deployVerticle(new AbstractVerticle() { @Override public void start() throws Exception { KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config); vertx.setTimer(20, record -> { // Very rarely, this throws a AlreadyUndedeployed error vertx.undeploy(context.deploymentID(), ctx.asyncAssertSuccess(ar -> { async.complete(); })); }); } }, ctx.asyncAssertSuccess()); }
protected Properties getConsumerProperties(Properties overrides) { Properties result = new Properties(); //populate defaults result.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); result.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testingConsumer"); result.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName()); result.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName()); setSecurityConfigs(result, "consumer"); result.setProperty(LiKafkaConsumerConfig.MESSAGE_ASSEMBLER_BUFFER_CAPACITY_CONFIG, "300000"); result.setProperty(LiKafkaConsumerConfig.MESSAGE_ASSEMBLER_EXPIRATION_OFFSET_GAP_CONFIG, "10000"); result.setProperty(LiKafkaConsumerConfig.EXCEPTION_ON_MESSAGE_DROPPED_CONFIG, "true"); result.setProperty(LiKafkaConsumerConfig.MAX_TRACKED_MESSAGES_PER_PARTITION_CONFIG, "10000"); result.setProperty(LiKafkaConsumerConfig.SEGMENT_DESERIALIZER_CLASS_CONFIG, DefaultSegmentDeserializer.class.getCanonicalName()); //apply overrides if (overrides != null) { result.putAll(overrides); } return result; }
@Test public void testSerde() { Serializer<String> stringSerializer = new StringSerializer(); Deserializer<String> stringDeserializer = new StringDeserializer(); Serializer<LargeMessageSegment> segmentSerializer = new DefaultSegmentSerializer(); Deserializer<LargeMessageSegment> segmentDeserializer = new DefaultSegmentDeserializer(); String s = LiKafkaClientsTestUtils.getRandomString(100); assertEquals(s.length(), 100); byte[] stringBytes = stringSerializer.serialize("topic", s); assertEquals(stringBytes.length, 100); LargeMessageSegment segment = new LargeMessageSegment(LiKafkaClientsUtils.randomUUID(), 0, 2, stringBytes.length, ByteBuffer.wrap(stringBytes)); // String bytes + segment header byte[] serializedSegment = segmentSerializer.serialize("topic", segment); assertEquals(serializedSegment.length, 1 + stringBytes.length + LargeMessageSegment.SEGMENT_INFO_OVERHEAD + 4); LargeMessageSegment deserializedSegment = segmentDeserializer.deserialize("topic", serializedSegment); assertEquals(deserializedSegment.messageId, segment.messageId); assertEquals(deserializedSegment.messageSizeInBytes, segment.messageSizeInBytes); assertEquals(deserializedSegment.numberOfSegments, segment.numberOfSegments); assertEquals(deserializedSegment.sequenceNumber, segment.sequenceNumber); assertEquals(deserializedSegment.payload.limit(), 100); String deserializedString = stringDeserializer.deserialize("topic", deserializedSegment.payloadArray()); assertEquals(deserializedString.length(), s.length()); }
/**
 * Creates the command listener and its Kafka consumer.
 *
 * <p>Despite its name, {@code zookeeper} is passed to the consumer as
 * {@code bootstrap.servers}. The consumer uses string deserializers, has auto-commit
 * disabled and resets to the earliest offset.
 *
 * @param zookeeper                 value used for {@code bootstrap.servers}
 * @param groupId                   consumer group id
 * @param objectMapper              mapper stored on this listener
 * @param applicationEventPublisher publisher stored on this listener
 * @param aggregateRootName         aggregate root name; also the prefix of the application
 *                                  events stream name
 */
public KafkaCommandListener(String zookeeper, String groupId, ObjectMapper objectMapper,
                            EventPublisher applicationEventPublisher, String aggregateRootName) {
    this.objectMapper = objectMapper;
    this.applicationEventPublisher = applicationEventPublisher;
    this.aggregateRootName = aggregateRootName;
    this.applicationEventsStream = aggregateRootName + APPLICATION_EVENTS;

    resolver = ActionHandlerResolver.getCurrent();

    final Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", zookeeper);
    consumerProps.put("group.id", groupId);
    consumerProps.put("key.deserializer", StringDeserializer.class);
    consumerProps.put("value.deserializer", StringDeserializer.class);
    consumerProps.put("enable.auto.commit", "false");
    consumerProps.put("auto.offset.reset", "earliest");
    this.consumer = new KafkaConsumer(consumerProps);
}
/**
 * Consumes the {@code geolocations} topic forever, printing each record and storing its
 * JSON value as a {@code GeoLocation} in {@code REPO}.
 *
 * <p>Any exception ends the loop; its message is written to standard error and the
 * consumer is closed.
 */
public void run() {
    final String stringDeserializer = StringDeserializer.class.getName();
    final Properties config = new Properties();
    config.put("bootstrap.servers", "192.168.99.100:9092");
    config.put("group.id", "geolocationConsumer");
    config.put("key.deserializer", stringDeserializer);
    config.put("value.deserializer", stringDeserializer);

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
        consumer.subscribe(Arrays.asList("geolocations"));
        for (;;) {
            final ConsumerRecords<String, String> batch = consumer.poll(100);
            for (ConsumerRecord<String, String> entry : batch) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                    entry.offset(), entry.key(), entry.value());
                final GeoLocation location = GSON.fromJson(entry.value(), GeoLocation.class);
                REPO.addGeoLocation(location);
            }
        }
    } catch (Exception e) {
        System.err.println("Error while consuming geolocations. Details: " + e.getMessage());
    }
}
/**
 * Looks up the consumer for a consumer group, building and caching it when absent.
 *
 * <p>A newly built consumer points at {@code bootstrapServer}, disables auto-commit, sets
 * the session timeout to 30000 ms and deserializes keys and values as strings.
 *
 * @param groupId consumer group id; doubles as the cache key in {@code kafkaConsumers}
 * @return the cached consumer, or the one just created and cached
 */
private KafkaConsumer<String, Serializable> getConsumer(String groupId) {
    final KafkaConsumer<String, Serializable> existing = kafkaConsumers.get(groupId);
    if (existing != null) {
        return existing;
    }

    final Properties settings = new Properties();
    // Connection and group membership.
    settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    settings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    // Offsets are committed explicitly by the caller.
    settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // Payload decoding.
    settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    final KafkaConsumer<String, Serializable> fresh = new KafkaConsumer<>(settings);
    kafkaConsumers.put(groupId, fresh);
    return fresh;
}
public static void main(String[] args) { String topic = "persistent://sample/standalone/ns/my-topic"; Properties props = new Properties(); props.put("bootstrap.servers", "pulsar://localhost:6650"); props.put("group.id", "my-subscription-name"); props.put("enable.auto.commit", "false"); props.put("key.deserializer", IntegerDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); Consumer<Integer, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic)); while (true) { ConsumerRecords<Integer, String> records = consumer.poll(100); records.forEach(record -> { log.info("Received record: {}", record); }); // Commit last offset consumer.commitSync(); } }
public static void main(String[] args) throws IOException { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka0:19092"); props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper0:12181/kafka"); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); TopologyBuilder builder = new TopologyBuilder(); builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "words") .addProcessor("WordCountProcessor", WordCountProcessor::new, "SOURCE") .addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "WordCountProcessor") // .connectProcessorAndStateStores("WordCountProcessor", "Counts") .addSink("SINK", "count", new StringSerializer(), new IntegerSerializer(), "WordCountProcessor"); KafkaStreams stream = new KafkaStreams(builder, props); stream.start(); System.in.read(); stream.close(); stream.cleanUp(); }
/**
 * Checks which value coder {@code KafkaIO.inferCoder} selects for several deserializer
 * classes, using the default coder registry.
 */
@Test
public void testInferKeyCoder() {
    final CoderRegistry coders = CoderRegistry.createDefault();

    // Long deserializer -> variable-length long coder.
    assertTrue(
        KafkaIO.inferCoder(coders, LongDeserializer.class).getValueCoder()
            instanceof VarLongCoder);

    // String deserializer -> UTF-8 string coder.
    assertTrue(
        KafkaIO.inferCoder(coders, StringDeserializer.class).getValueCoder()
            instanceof StringUtf8Coder);

    // Instant deserializer -> instant coder.
    assertTrue(
        KafkaIO.inferCoder(coders, InstantDeserializer.class).getValueCoder()
            instanceof InstantCoder);

    // Deserializer declared with extra interfaces -> variable-length long coder.
    assertTrue(
        KafkaIO.inferCoder(coders, DeserializerWithInterfaces.class).getValueCoder()
            instanceof VarLongCoder);
}
/**
 * Creates a stream from {@code ORDERS} whose select list applies arithmetic, array-index
 * and map-lookup expressions, then checks that the single expected row (key {@code "8"})
 * is what gets consumed from the resulting stream.
 *
 * @throws Exception declared by the original signature
 */
@Test
public void testApplyUdfsToColumns() throws Exception {
    final String testStreamName = "SelectUDFsStream".toUpperCase();

    final String selectColumns =
        "ITEMID, ORDERUNITS*10, PRICEARRAY[0]+10, KEYVALUEMAP['key1']*KEYVALUEMAP['key2']+10, PRICEARRAY[1]>1000";
    final String sourceStream = "ORDERS";
    final String whereClause = "ORDERUNITS > 20 AND ITEMID LIKE '%_8'";
    final String queryString = String.format(
        "CREATE STREAM %s AS SELECT %s FROM %s WHERE %s;",
        testStreamName, selectColumns, sourceStream, whereClause);

    ksqlContext.sql(queryString);

    final Schema resultSchema = ksqlContext.getMetaStore().getSource(testStreamName).getSchema();

    final GenericRow expectedRow =
        new GenericRow(Arrays.asList(null, null, "ITEM_8", 800.0, 1110.0, 12.0, true));
    final Map<String, GenericRow> expectedResults = Collections.singletonMap("8", expectedRow);

    final Map<String, GenericRow> results = testHarness.consumeData(
        testStreamName,
        resultSchema,
        4,
        new StringDeserializer(),
        IntegrationTestHarness.RESULTS_POLL_MAX_TIME_MS);

    assertThat(results, equalTo(expectedResults));
}
/**
 * Builds consumer settings for the given bootstrap server.
 *
 * <p>Each call generates a group id of the form {@code "testgroup" + <random int>}.
 * Auto-commit is enabled with a 1000 ms interval, offsets reset to "earliest", the
 * session timeout is 30000 ms, and both keys and values use {@code StringDeserializer}.
 *
 * @param bootstrapServer value stored under the bootstrap-servers setting
 * @return a new {@link Properties} instance holding the settings above
 */
@NotNull
private static Properties createConsumerProperties(String bootstrapServer) {
    final String randomizedGroupId = "testgroup" + new Random().nextInt();

    final Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, randomizedGroupId);

    // Offset handling.
    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");

    // Keys and values are both read as strings.
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return consumerProps;
}
/**
 * Reads the first message from the {@code KAFKA_TOPIC_STATIC_DATA} topic and parses it
 * as a JSON map of {@code String} to {@code Location}.
 *
 * <p>The consumer connector is always shut down before returning, including when
 * creating the stream or reading from it throws.
 *
 * @return the parsed map; an empty map when the iterator yields no message or the
 *         message cannot be parsed (the parse error is printed, not rethrown)
 */
private static Map<String, Location> getVehicleStartPoints() {
    Map<String, Location> vehicleStartPoint = new HashMap<String, Location>();

    Properties props = new Properties();
    props.put("zookeeper.connect", ZOOKEEPER_CONNECTION_STRING);
    // Randomised group id suffix, range 0..99.
    props.put("group.id", "DataLoader" + r.nextInt(100));
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("auto.offset.reset", "smallest");

    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    try {
        // Request a single stream for the static-data topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(KAFKA_TOPIC_STATIC_DATA, 1);
        KafkaStream<byte[], byte[]> stream =
                consumer.createMessageStreams(topicCountMap).get(KAFKA_TOPIC_STATIC_DATA).get(0);

        // Only the first message is of interest.
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        if (it.hasNext()) {
            // NOTE(review): decodes with the platform default charset — confirm the
            // producer's encoding before switching to an explicit one.
            String message = new String(it.next().message());
            try {
                vehicleStartPoint = objectMapper.readValue(message,
                        new TypeReference<Map<String, Location>>() {
                        });
            } catch (IOException e) {
                // Best effort: keep the empty map and report the parse failure.
                e.printStackTrace();
            }
        }
    } finally {
        consumer.shutdown();
    }
    return vehicleStartPoint;
}
/**
 * Consumes from {@code TOPIC} for up to 15 seconds and verifies that the set of
 * metric type ids seen equals the expected set.
 *
 * <p>Polling stops early once as many distinct metric types as expected have been
 * collected. The consumer is closed when polling finishes, whether it succeeds or throws.
 */
@Test
public void testReportingMetrics() {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MetricSerde.class.getName());
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testReportingMetrics");
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    setSecurityConfigs(props, "consumer");

    Set<Integer> expectedMetricTypes = new HashSet<>(Arrays.asList(
            (int) ALL_TOPIC_BYTES_IN.id(),
            (int) ALL_TOPIC_BYTES_OUT.id(),
            (int) TOPIC_BYTES_IN.id(),
            (int) TOPIC_BYTES_OUT.id(),
            (int) PARTITION_SIZE.id(),
            (int) BROKER_CPU_UTIL.id(),
            (int) ALL_TOPIC_PRODUCE_REQUEST_RATE.id(),
            (int) ALL_TOPIC_FETCH_REQUEST_RATE.id(),
            (int) ALL_TOPIC_MESSAGES_IN_PER_SEC.id(),
            (int) TOPIC_PRODUCE_REQUEST_RATE.id(),
            (int) TOPIC_FETCH_REQUEST_RATE.id(),
            (int) TOPIC_MESSAGES_IN_PER_SEC.id(),
            (int) BROKER_PRODUCE_REQUEST_RATE.id(),
            (int) BROKER_CONSUMER_FETCH_REQUEST_RATE.id(),
            (int) BROKER_FOLLOWER_FETCH_REQUEST_RATE.id(),
            (int) BROKER_REQUEST_HANDLER_AVG_IDLE_PERCENT.id()));

    Set<Integer> metricTypes = new HashSet<>();
    // try-with-resources so the consumer's network resources are released even if
    // polling or deserialization throws.
    try (Consumer<String, CruiseControlMetric> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Collections.singletonList(TOPIC));
        long startMs = System.currentTimeMillis();
        // Stop as soon as every expected type has been seen, or after 15 seconds.
        while (metricTypes.size() < expectedMetricTypes.size()
                && System.currentTimeMillis() < startMs + 15000) {
            ConsumerRecords<String, CruiseControlMetric> records = consumer.poll(10);
            for (ConsumerRecord<String, CruiseControlMetric> record : records) {
                metricTypes.add((int) record.value().metricType().id());
            }
        }
    }

    // JUnit argument order is (message, expected, actual).
    assertEquals("Expected to see " + expectedMetricTypes + ", but only see " + metricTypes,
            expectedMetricTypes, metricTypes);
}
@Test public void test() throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "127.0.0.1:9092");//该地址是集群的子集,用来探测集群。 props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group"); props.put("acks", "all");// 记录完整提交,最慢的但是最大可能的持久化 props.put("retries", 3);// 请求失败重试的次数 props.put("batch.size", 16384);// batch的大小 props.put("linger.ms", 1);// 默认情况即使缓冲区有剩余的空间,也会立即发送请求,设置一段时间用来等待从而将缓冲区填的更多,单位为毫秒,producer发送数据会延迟1ms,可以减少发送到kafka服务器的请求数据 props.put("buffer.memory", 33554432);// 提供给生产者缓冲内存总量 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer.tracing.codec", Codec.JSON); props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); Consumer<String, byte[]> consumer = new KafkaConsumer<>(props); ListenableTracingConsumer<String, String> listenableTracingConsumer = new ListenableTracingConsumer<>(consumer, Pattern.compile("test"), new StringDeserializer()); BraveFactoryBean factoryBean = new BraveFactoryBean(); factoryBean.setServiceName("kafka-test"); factoryBean.setTransport("http"); factoryBean.setTransportAddress("127.0.0.1:9411"); factoryBean.afterPropertiesSet(); Brave brave = factoryBean.getObject(); listenableTracingConsumer.addListener(new AbstractTracingListener<String, String>(brave) { @Override public void onPayload(Payload<String, String> payload) { try { System.out.println("key: " + payload.key()); System.out.println("value: " + payload.value()); Sleeper.JUST.sleepFor(2000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); listenableTracingConsumer.start(); System.in.read(); }
@Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); // list of host:port pairs used for establishing the initial connections to the Kakfa cluster props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); // allows a pool of processes to divide the work of consuming and processing records props.put(ConsumerConfig.GROUP_ID_CONFIG, "bot"); return props; }
/** * Kafka consumer configuration bean. This {@link Map} is used by {@link MessageConsumerConfig#consumerFactory}. * * @return kafka properties */ @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHosts); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); //props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); //props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; }