public String receive() {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    consumer.subscribe(Arrays.asList(properties.getProperty("topic")));
    final int minBatchSize = 200;
    List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
            System.err.println(buffer.size() + "----->" + record);
        }
        if (buffer.size() >= minBatchSize) {
            writeFileToHadoop(buffer); // write the buffered records to a file first
            consumer.commitSync();
            buffer.clear();
        }
    }
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
    try {
        topic = context.getProperty(TOPIC).getValue();
        groupName = context.getProperty(CONSUMER_GROUP_NAME).getValue();
        brokerIP = context.getProperty(BROKERIP).getValue();
        props = new Properties();
        props.put("bootstrap.servers", brokerIP);
        props.put("group.id", groupName);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
    } catch (Exception e) {
        e.printStackTrace();
    }
}
public List<String> receive() {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    consumer.subscribe(Arrays.asList(properties.getProperty("topic")));
    List<String> buffer = new ArrayList<String>();
    System.err.println("consumer receive------------------");
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record.value());
    }
    consumer.close();
    return buffer;
}
public static void main(String[] args) {
    KafkaConsumer<String, String> consumer = createConsumer();
    consumer.subscribe(Arrays.asList(TOPIC));
    boolean flag = true;
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        if (flag) {
            // Partitions are only assigned during the first poll; after that we can seek to offset 90.
            Set<TopicPartition> assignments = consumer.assignment();
            assignments.forEach(topicPartition -> consumer.seek(topicPartition, 90));
            flag = false;
        }
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
    }
}
private void maybeSeekToEnd(final KafkaConsumer<byte[], byte[]> client,
                            final Set<TopicPartition> intermediateTopicPartitions) {
    final String groupId = options.valueOf(applicationIdOption);
    final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
    if (intermediateTopicPartitions.size() > 0) {
        if (!dryRun) {
            client.seekToEnd(intermediateTopicPartitions);
        } else {
            System.out.println("Following intermediate topics offsets will be reset to end (for consumer group " + groupId + ")");
            for (final String topic : intermediateTopics) {
                if (allTopics.contains(topic)) {
                    System.out.println("Topic: " + topic);
                }
            }
        }
    }
}
private static void loopUntilRecordReceived(final String kafka, final boolean eosEnabled) {
    final Properties consumerProperties = new Properties();
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "broker-compatibility-consumer");
    consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    if (eosEnabled) {
        consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
                IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
    }

    final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
    consumer.subscribe(Collections.singletonList(SINK_TOPIC));

    while (true) {
        final ConsumerRecords<String, String> records = consumer.poll(100);
        for (final ConsumerRecord<String, String> record : records) {
            if (record.key().equals("key") && record.value().equals("value")) {
                consumer.close();
                return;
            }
        }
    }
}
public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", args[0]);
    props.setProperty("group.id", UUID.randomUUID().toString());
    props.setProperty("key.deserializer", LongDeserializer.class.getName());
    props.setProperty("value.deserializer", TradeDeserializer.class.getName());
    props.setProperty("auto.offset.reset", "earliest");
    KafkaConsumer<Long, Trade> consumer = new KafkaConsumer<>(props);
    List<String> topics = Arrays.asList(args[1]);
    consumer.subscribe(topics);
    System.out.println("Subscribed to topics " + topics);
    long count = 0;
    long start = System.nanoTime();
    while (true) {
        ConsumerRecords<Long, Trade> poll = consumer.poll(5000);
        System.out.println("Partitions in batch: " + poll.partitions());
        LongSummaryStatistics stats = StreamSupport.stream(poll.spliterator(), false)
                .mapToLong(r -> r.value().getTime()).summaryStatistics();
        System.out.println("Oldest record time: " + stats.getMin() + ", newest record: " + stats.getMax());
        count += poll.count();
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        long rate = (long) ((double) count / elapsed * 1000);
        System.out.printf("Total count: %,d in %,dms. Average rate: %,d records/s %n", count, elapsed, rate);
    }
}
public KafkaConsumerEvent(String topic) {
    super(0L);
    this.topic = topic;
    Properties props = HeartBeatConfigContainer.getInstance().getKafkaConsumerConfig();
    Properties producerProps = HeartBeatConfigContainer.getInstance().getKafkaProducerConfig();
    try {
        dataConsumer = new KafkaConsumer<>(props);
        partition0 = new TopicPartition(this.topic, 0);
        dataConsumer.assign(Arrays.asList(partition0));
        dataConsumer.seekToEnd(Arrays.asList(partition0));
        KafkaConsumerContainer.getInstances().putConsumer(this.topic, dataConsumer);
        statProducer = new KafkaProducer<>(producerProps);
    } catch (Exception e) {
        e.printStackTrace();
    }
    startTime = System.currentTimeMillis();
}
/**
 * createConsumer - create a new consumer.
 *
 * @return the consumer, assigned to partition 0 of the data topic and positioned
 *         either at the end of the partition or at the configured offset
 * @throws Exception if the consumer cannot be created
 */
private Consumer<String, String> createConsumer() throws Exception {
    // Seek to end automatically
    TopicPartition dataTopicPartition = new TopicPartition(topicName, 0);
    List<TopicPartition> topics = Arrays.asList(dataTopicPartition);

    Properties props = ConfUtils.getProps(CONSUMER_PROPS);
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.assign(topics);

    if (offset == -1) {
        consumer.seekToEnd(topics);
        logger.info("Consumer seek to end");
    } else {
        consumer.seek(dataTopicPartition, offset);
        logger.info(String.format("read changed as offset: %s", consumer.position(dataTopicPartition)));
    }
    return consumer;
}
private void cleanUp(KafkaConsumer<StatEventKey, StatAggregate> kafkaConsumer, int unCommittedRecCount) {
    // force a flush of anything in the aggregator
    if (statAggregator != null) {
        LOGGER.debug("Forcing a flush of aggregator {} on processor {}", statAggregator, this);
        flushAggregator();
    }

    if (kafkaConsumer != null) {
        if (unCommittedRecCount > 0) {
            LOGGER.debug("Committing kafka offset on processor {}", this);
            kafkaConsumer.commitSync();
        }
        LOGGER.debug("Closing kafka consumer on processor {}", this);
        kafkaConsumer.close();
    }
}
@SuppressWarnings("unchecked")
public KafkaMessageConsumer2(final Map<String, Object> consumerConfig, final String topic,
                             final ISenderProvider<T> senderProvider,
                             final MessagePublisherProvider<T, K, V> publisherProvider) {
    this.consumer = new KafkaConsumer<>(consumerConfig);
    this.pollTimeout = (Long) consumerConfig.get(Constants.KAFKA_POLL_TIMEOUT);
    this.senderProvider = senderProvider;
    this.publisherProvider = publisherProvider;
    final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
    final List<String> subscribedPartitions =
            (List<String>) consumerConfig.get(Constants.KAFKA_SUBSCRIBED_PARTITIONS);
    // Compare as strings: subscribedPartitions holds partition ids as String values.
    final Collection<TopicPartition> partitions = partitionInfos.stream()
            .filter(p -> subscribedPartitions.contains(String.valueOf(p.partition())))
            .map(p -> new TopicPartition(p.topic(), p.partition()))
            .collect(Collectors.toList());
    LOG.info("Assigning to topic={}, partitions={}", topic, partitions);
    this.consumer.assign(partitions);
}
public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.77.7:9092,192.168.77.7:9093,192.168.77.7:9094");
    props.put("group.id", "test-group-id");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("test"));
    System.out.println("Subscribed to topic test");
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.println(String.format("offset = %s, key = %s, value = %s",
                    record.offset(), record.key(), record.value()));
    }
}
public boolean verifyTopicsExist(String kafkaBrokers, Set<String> requiredTopics, boolean checkPartitionCounts) {
    Properties props = new Properties();
    props.put("bootstrap.servers", kafkaBrokers);
    props.put("group.id", UUID.randomUUID().toString());
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    try {
        Map<String, List<PartitionInfo>> topics = consumer.listTopics();

        Set<Integer> partitionCount = new HashSet<>();
        for (String requiredTopic : requiredTopics) {
            List<PartitionInfo> partitions = topics.get(requiredTopic);
            if (partitions == null) {
                logger.info("Required kafka topic {} not present", requiredTopic);
                return false;
            }
            partitionCount.add(partitions.size());
        }
        if (checkPartitionCounts && partitionCount.size() > 1) {
            logger.warn("Partition count mismatch in topics {}", Arrays.toString(requiredTopics.toArray()));
            return false;
        }
        return true;
    } finally {
        consumer.close();
    }
}
/**
 * This will consume all records from all partitions on the given topic.
 *
 * @param topic Topic to consume from.
 * @return List of ConsumerRecords consumed.
 */
public List<ConsumerRecord<byte[], byte[]>> consumeAllRecordsFromTopic(final String topic) {
    // Connect to broker to determine what partitions are available.
    KafkaConsumer<byte[], byte[]> kafkaConsumer = kafkaTestServer.getKafkaConsumer(
        ByteArrayDeserializer.class,
        ByteArrayDeserializer.class
    );

    final List<Integer> partitionIds = new ArrayList<>();
    for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) {
        partitionIds.add(partitionInfo.partition());
    }
    kafkaConsumer.close();

    return consumeAllRecordsFromTopic(topic, partitionIds);
}
/**
 * This will consume all records from only the partitions given.
 *
 * @param topic Topic to consume from.
 * @param partitionIds Collection of PartitionIds to consume.
 * @return List of ConsumerRecords consumed.
 */
public List<ConsumerRecord<byte[], byte[]>> consumeAllRecordsFromTopic(final String topic, Collection<Integer> partitionIds) {
    // Create topic Partitions
    List<TopicPartition> topicPartitions = new ArrayList<>();
    for (Integer partitionId : partitionIds) {
        topicPartitions.add(new TopicPartition(topic, partitionId));
    }

    // Connect Consumer
    KafkaConsumer<byte[], byte[]> kafkaConsumer =
        kafkaTestServer.getKafkaConsumer(ByteArrayDeserializer.class, ByteArrayDeserializer.class);

    // Assign topic partitions & seek to head of them
    kafkaConsumer.assign(topicPartitions);
    kafkaConsumer.seekToBeginning(topicPartitions);

    // Pull records from kafka, keep polling until we get nothing back
    final List<ConsumerRecord<byte[], byte[]>> allRecords = new ArrayList<>();
    ConsumerRecords<byte[], byte[]> records;
    do {
        // Grab records from kafka
        records = kafkaConsumer.poll(2000L);
        logger.info("Found {} records in kafka", records.count());

        // Add to our array list
        records.forEach(allRecords::add);
    } while (!records.isEmpty());

    // close consumer
    kafkaConsumer.close();

    // return all records
    return allRecords;
}
@Override
public void init(AbstractConfiguration config, String brokerId, BrokerListenerFactory factory) {
    init(config);

    BROKER_TOPIC = BROKER_TOPIC_PREFIX + "." + brokerId;

    logger.trace("Initializing Kafka consumer ...");

    // consumer config (a consumer needs deserializers, not serializers;
    // InternalMessageDeserializer is assumed to be the project's counterpart to InternalMessageSerializer)
    Properties props = new Properties();
    props.put("bootstrap.servers", config.getString("bootstrap.servers"));
    props.put("group.id", UUIDs.shortUuid());
    props.put("enable.auto.commit", "true");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", InternalMessageDeserializer.class.getName());

    // consumer
    this.consumer = new KafkaConsumer<>(props);

    // consumer worker
    this.worker = new KafkaBrokerWorker(this.consumer, BROKER_TOPIC, factory.newListener());
    this.executor.submit(this.worker);
}
/**
 * Create a WebSocket Consumer Client. These instances are intended to be long lived
 * and run in the background, streaming consumed records to a Web Socket.
 *
 * @param view What view to consume from.
 * @param filterDefinitions Any additional filters to apply.
 * @param startingPosition Defines where the Socket consumer should resume from.
 * @param sessionIdentifier An identifier for the consumer.
 */
public SocketKafkaConsumer createWebSocketClient(
    final View view,
    final Collection<FilterDefinition> filterDefinitions,
    final StartingPosition startingPosition,
    final SessionIdentifier sessionIdentifier) {

    // Create client config builder
    final ClientConfig clientConfig = createClientConfig(view, filterDefinitions, sessionIdentifier)
        .withStartingPosition(startingPosition)
        .build();

    // Create kafka consumer
    final KafkaConsumer kafkaConsumer = createKafkaConsumer(clientConfig);

    // Create consumer
    return new SocketKafkaConsumer(kafkaConsumer, clientConfig);
}
private static KafkaConsumer<String, String> createConsumer(Namespace parsedArgs) {
    String consumerGroup = parsedArgs.getString("consumerGroup");
    String brokerList = parsedArgs.getString("brokerList");
    Integer numMessagesPerTransaction = parsedArgs.getInt("messagesPerTransaction");

    Properties props = new Properties();
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, numMessagesPerTransaction.toString());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

    return new KafkaConsumer<>(props);
}
@SuppressWarnings("unchecked")
@Override
public List<EntityCommand<?>> fetch(String txId) {
    List<EntityCommand<?>> transactionOperations = new ArrayList<EntityCommand<?>>();

    Map<String, Object> consumerConfigs = (Map<String, Object>) configuration.get("kafkaConsumerConfiguration");
    consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());

    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(consumerConfigs);
    kafkaConsumer.subscribe(Arrays.asList(txId));

    ConsumerRecords<String, String> records = kafkaConsumer.poll(kafkaConsumerPollTimeout);
    for (ConsumerRecord<String, String> record : records) {
        LOG.info("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
        try {
            transactionOperations.add(serializer.readFromString(record.value()));
        } catch (SerializationFailedException e) {
            LOG.error("Unable to deserialize [{}] because of: {}", record.value(), e.getMessage());
        }
    }

    kafkaConsumer.close();
    return transactionOperations;
}
/**
 * Return a map containing one List of records per partition.
 * This internally creates a Kafka Consumer using the provided consumer properties.
 *
 * @param topic Kafka topic to consume from
 * @param numPtns Number of partitions to consume
 * @param consumerProperties Kafka consumer properties
 * @return A Map of Partitions(Integer) and the resulting List of messages (byte[]) retrieved
 */
public static Map<Integer, List<byte[]>> retrieveRecordsFromPartitions(String topic, int numPtns,
        Properties consumerProperties) {
    Map<Integer, List<byte[]>> resultsMap = new HashMap<Integer, List<byte[]>>();
    for (int i = 0; i < numPtns; i++) {
        List<byte[]> partitionResults = new ArrayList<byte[]>();
        resultsMap.put(i, partitionResults);
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(consumerProperties);

        TopicPartition partition = new TopicPartition(topic, i);
        consumer.assign(Arrays.asList(partition));

        ConsumerRecords<String, byte[]> records = consumer.poll(1000);
        for (ConsumerRecord<String, byte[]> record : records) {
            partitionResults.add(record.value());
        }
        consumer.close();
    }
    return resultsMap;
}
public static void verify(final String kafka) {
    ensureStreamsApplicationDown(kafka);

    final Map<TopicPartition, Long> committedOffsets = getCommittedOffsets(kafka);

    final Properties props = new Properties();
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));

    try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
        final List<TopicPartition> partitions = getAllPartitions(consumer, "data", "echo", "min", "sum");
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);

        final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition =
            getOutputRecords(consumer, committedOffsets);

        truncate("data", recordPerTopicPerPartition, committedOffsets);

        verifyMin(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("min"));
        verifySum(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("sum"));

        verifyAllTransactionFinished(consumer, kafka);

        // do not modify: required test output
        System.out.println("ALL-RECORDS-DELIVERED");
    } catch (final Exception e) {
        e.printStackTrace(System.err);
        System.out.println("FAILED");
    }
}
public ConsumerLoop(int id, String groupId, List<String> topics) {
    this.id = id;
    this.topics = topics;
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    this.consumer = new KafkaConsumer<>(props);
}
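/*
 * The ConsumerLoop constructor above only builds the consumer; the polling side is not shown.
 * Below is a minimal sketch of the run() method such a Runnable would typically pair with.
 * The wakeup()-based shutdown and the printed fields are assumptions for illustration,
 * not part of the original snippet.
 */
@Override
public void run() {
    try {
        consumer.subscribe(topics);
        while (true) {
            // Block until records arrive, then print them tagged with this loop's id.
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("consumer %d: partition = %d, offset = %d, value = %s%n",
                        id, record.partition(), record.offset(), record.value());
            }
        }
    } catch (WakeupException e) {
        // Expected when another thread calls shutdown() below.
    } finally {
        consumer.close();
    }
}

// Called from another thread to stop the loop; the blocked poll() will throw WakeupException.
public void shutdown() {
    consumer.wakeup();
}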
public static void main(String[] args) {
    Map<String, Object> configs = new HashMap<String, Object>();
    // bootstrap.servers takes one or more brokers; it does not need to list every broker,
    // the client discovers the rest of the cluster automatically.
    configs.put("bootstrap.servers", "192.168.0.107:9092,192.168.0.108:9092,192.168.0.109:9092");
    configs.put("group.id", "kafka-test");
    // whether offsets are committed automatically
    configs.put("enable.auto.commit", "false");
    // interval between automatic offset commits
    configs.put("auto.commit.interval.ms", "1000");
    configs.put("session.timeout.ms", "30000");
    configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
    // topics the consumer subscribes to; several topics can be subscribed at once
    consumer.subscribe(Arrays.asList("kafka-test"));

    final int minBatchSize = 200;
    List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
        }
        // once the batch is large enough, write it to the DB and commit the offsets synchronously
        if (buffer.size() >= minBatchSize) {
            // insertIntoDb(buffer);
            consumer.commitSync();
            buffer.clear();
        }
    }
}
@Test
public void testMessageFail() throws PubSubException {
    String randomMessage = UUID.randomUUID().toString();
    String randomID = UUID.randomUUID().toString();
    KafkaConsumer<String, byte[]> consumer = makeMockConsumer(randomID, randomMessage);
    KafkaSubscriber subscriber = new KafkaSubscriber(consumer, 50);
    PubSubMessage message = subscriber.receive();
    Assert.assertNotNull(message);
    subscriber.fail(message.getId());
    Assert.assertTrue(getAndCheck(randomMessage, randomID, subscriber));
}
@PostConstruct
public void init() {
    recomputeStatistics();

    Properties consumerProperties = new Properties();
    consumerProperties.setProperty("group.id", "mapr.music.statistics");
    consumerProperties.setProperty("enable.auto.commit", "true");
    consumerProperties.setProperty("auto.offset.reset", "latest");
    consumerProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    consumerProperties.setProperty("value.deserializer", "com.mapr.db.cdc.ChangeDataRecordDeserializer");

    loginTestUser(MAPR_USER_NAME, MAPR_USER_GROUP);

    // Create and adjust consumer which is used to consume MapR-DB CDC events for Albums table.
    KafkaConsumer<byte[], ChangeDataRecord> albumsChangelogConsumer = new KafkaConsumer<>(consumerProperties);
    albumsChangelogConsumer.subscribe(Collections.singletonList(ALBUMS_CHANGE_LOG));
    ChangeDataRecordHandler albumsHandler = new ChangeDataRecordHandler(albumsChangelogConsumer);
    albumsHandler.setOnDelete((id) -> decrementAlbums());
    albumsHandler.setOnInsert((id) -> incrementAlbums());

    // Create and adjust consumer which is used to consume MapR-DB CDC events for Artists table.
    KafkaConsumer<byte[], ChangeDataRecord> artistsChangelogConsumer = new KafkaConsumer<>(consumerProperties);
    artistsChangelogConsumer.subscribe(Collections.singletonList(ARTISTS_CHANGE_LOG));
    ChangeDataRecordHandler artistsHandler = new ChangeDataRecordHandler(artistsChangelogConsumer);
    artistsHandler.setOnDelete((id) -> decrementArtists());
    artistsHandler.setOnInsert((id) -> incrementArtists());

    threadFactory.newThread(albumsHandler).start();
    threadFactory.newThread(artistsHandler).start();
}
/**
 * Get the kafka consumer; if it has not been created yet, set it up.
 * @return Kafka consumer
 */
KafkaConsumer<byte[], byte[]> getKafkaConsumer() {
    // If kafkaConsumer is null, we'll create one.
    // If it is NOT null, we'll re-use the instance we got passed in from the constructor.
    // Typically you'd pass in an instance for testing.
    if (kafkaConsumer == null) {
        // Construct new consumer
        kafkaConsumer = new KafkaConsumer<>(getConsumerConfig().getKafkaConsumerProperties());
    }
    return kafkaConsumer;
}
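/*
 * A brief usage sketch for the lazy getter above: because a pre-built consumer can be passed
 * in through the enclosing class's constructor (the mock-based test elsewhere in this collection
 * does exactly that), a unit test can hand in a Mockito mock and getKafkaConsumer() will return
 * it instead of building a real client. The constructor shown here is an assumption for illustration.
 */
KafkaConsumer<byte[], byte[]> mockedConsumer = Mockito.mock(KafkaConsumer.class);
Consumer consumerUnderTest = new Consumer(mockedConsumer); // hypothetical injection constructor
// Any call to consumerUnderTest.getKafkaConsumer() now returns the mock, so no broker is contacted.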
@Test
public void test() throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "127.0.0.1:9092"); // a subset of the cluster, used only to discover the full cluster
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
    props.put("acks", "all"); // wait for the full commit of the record; slowest but most durable option
    props.put("retries", 3); // number of retries for failed requests
    props.put("batch.size", 16384); // batch size in bytes
    props.put("linger.ms", 1); // wait up to 1 ms so batches can fill instead of sending immediately, reducing requests to the broker
    props.put("buffer.memory", 33554432); // total memory available to the producer for buffering
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer.tracing.codec", Codec.JSON);
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    Consumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    ListenableTracingConsumer<String, String> listenableTracingConsumer =
            new ListenableTracingConsumer<>(consumer, Pattern.compile("test"), new StringDeserializer());
    BraveFactoryBean factoryBean = new BraveFactoryBean();
    factoryBean.setServiceName("kafka-test");
    factoryBean.setTransport("http");
    factoryBean.setTransportAddress("127.0.0.1:9411");
    factoryBean.afterPropertiesSet();
    Brave brave = factoryBean.getObject();
    listenableTracingConsumer.addListener(new AbstractTracingListener<String, String>(brave) {
        @Override
        public void onPayload(Payload<String, String> payload) {
            try {
                System.out.println("key: " + payload.key());
                System.out.println("value: " + payload.value());
                Sleeper.JUST.sleepFor(2000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    });
    listenableTracingConsumer.start();
    System.in.read();
}
static KafkaConsumer<byte[], byte[]> getConsumer(String groupName, String brokerList) {
    Properties prop = new Properties();
    prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    prop.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupName);
    prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
    prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
    return new KafkaConsumer<>(prop);
}
/**
 * Create a new KafkaConsumer, specify the server's ip and port, and groupID.
 *
 * @param server ip and port for Kafka server
 * @param groupID consumer's group id
 * @return the consumer
 */
public static KafkaConsumer<String, String> createConsumer(String server, String groupID) {
    Properties props = new Properties();
    props.put("bootstrap.servers", server);
    props.put("group.id", groupID);
    props.put("enable.auto.commit", false);
    props.put("session.timeout.ms", 60000);
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("auto.offset.reset", "earliest");
    return new KafkaConsumer<String, String>(props);
}
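/*
 * A minimal usage sketch for createConsumer() above. The broker address, group id and topic
 * name are placeholders. Because enable.auto.commit is false, the caller must commit offsets
 * itself once a batch has been processed.
 */
KafkaConsumer<String, String> consumer = createConsumer("localhost:9092", "example-group");
consumer.subscribe(Arrays.asList("example-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
    }
    // Commit synchronously only after the whole batch has been handled.
    consumer.commitSync();
}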
public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(GROUP_ID_CONFIG, "a");
    props.put(ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(SESSION_TIMEOUT_MS_CONFIG, "30000");
    props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("produktion"));
    System.out.println("Consumer A started!");

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        if (records.count() == 0)
            continue;
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset= %d, key= %s, value= %s\n", record.offset(), record.key(), record.value());
    }
}
@Test
public void testMaxUnackedMessages() throws PubSubException {
    String randomMessage = UUID.randomUUID().toString();
    String randomID = UUID.randomUUID().toString();
    KafkaConsumer<String, byte[]> consumer = makeMockConsumer(randomID, randomMessage);
    KafkaSubscriber subscriber = new KafkaSubscriber(consumer, 1);
    // Multiple receives without a commit.
    Assert.assertNotNull(subscriber.receive());
    Assert.assertNull(subscriber.receive());
}
@GET
@Path("/readKafkaTopic")
public Response readKafkaTopic(Map<String, Object> map) {
    try {
        Properties properties = PropertiesUtils.getProps("consumer.properties");
        properties.setProperty("client.id", "readKafkaTopic");
        properties.setProperty("group.id", "readKafkaTopic");
        //properties.setProperty("bootstrap.servers", "localhost:9092");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        String topic = map.get("topic").toString();
        //System.out.println("topic=" + topic);
        TopicPartition topicPartition = new TopicPartition(topic, 0);
        List<TopicPartition> topics = Arrays.asList(topicPartition);
        consumer.assign(topics);
        consumer.seekToEnd(topics);
        long current = consumer.position(topicPartition);
        long end = current;
        current -= 1000;
        if (current < 0) current = 0;
        consumer.seek(topicPartition, current);
        List<String> result = new ArrayList<>();
        while (current < end) {
            //System.out.println("topic position = " + current);
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                result.add(record.value());
                //System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            current = consumer.position(topicPartition);
        }
        consumer.close();
        return Response.ok().entity(result).build();
    } catch (Exception e) {
        logger.error("Error encountered while readKafkaTopic with parameter:{}", JSON.toJSONString(map), e);
        return Response.status(204).entity(new Result(-1, e.getMessage())).build();
    }
}
/**
 * Calling nextRecord calls fillBuffer() which has special handling for OutOfRangeExceptions. There is some
 * recursion in this method, but it should not go on forever and yield a StackOverflow exception. This test
 * verifies that when the KafkaConsumer is relentlessly throwing OutOfRangeExceptions, we stop trying to fill
 * the buffer after five attempts and do not hit any other errors.
 */
@Test
public void testNextRecordWithRecursiveOutOfRangeException() {
    final KafkaConsumer<byte[], byte[]> kafkaConsumer = Mockito.mock(KafkaConsumer.class);

    Mockito.when(kafkaConsumer.assignment()).thenReturn(Sets.newHashSet(new TopicPartition("Foobar", 0)));
    Mockito.when(kafkaConsumer.partitionsFor(topicName)).thenReturn(Arrays.asList(
        new PartitionInfo(topicName, 0, null, null, null)
    ));
    Mockito.when(kafkaConsumer.poll(300)).thenThrow(
        new OffsetOutOfRangeException(new HashMap<>())
    );

    final PersistenceAdapter persistenceAdapter = new InMemoryPersistenceAdapter();
    persistenceAdapter.open(Maps.newHashMap());

    // Create our consumer
    final Consumer consumer = new Consumer(kafkaConsumer);
    consumer.open(
        getDefaultConfig(topicName),
        getDefaultVSpoutId(),
        getDefaultConsumerCohortDefinition(),
        persistenceAdapter,
        new LogRecorder(),
        null
    );

    final Record record = consumer.nextRecord();
    assertNull(record);

    Mockito.verify(kafkaConsumer, Mockito.times(5)).poll(300);

    consumer.close();
}
private KafkaConsumer<StatEventKey, StatAggregate> buildConsumer() {
    try {
        Map<String, Object> props = getConsumerProps();
        LOGGER.debug(() -> "Starting aggregation consumer [" + instanceId + "] with properties:\n" +
                props.entrySet().stream()
                        .map(entry -> "  " + entry.getKey() + ": " + entry.getValue().toString())
                        .collect(Collectors.joining("\n"))
        );
        KafkaConsumer<StatEventKey, StatAggregate> kafkaConsumer = new KafkaConsumer<>(
                props,
                statKeySerde.deserializer(),
                statAggregateSerde.deserializer());

        StatisticsAggregationRebalanceListener rebalanceListener =
                new StatisticsAggregationRebalanceListener(this, kafkaConsumer);

        kafkaConsumer.subscribe(Collections.singletonList(inputTopic), rebalanceListener);

        // Update our collection of partitions for later health check use
        // assignedPartitions = kafkaConsumer.partitionsFor(inputTopic).stream()
        //         .map(PartitionInfo::partition)
        //         .collect(Collectors.toList());
        setAssignedPartitions(kafkaConsumer.assignment());

        return kafkaConsumer;
    } catch (Exception e) {
        LOGGER.error(String.format("Error building consumer for topic %s on processor %s", inputTopic, this), e);
        throw e;
    }
}
/**
 * Start a consumer that subscribes to all embeddedKafka topics to help with debugging.
 * Dumps out the key/msg as byte arrays given that the object types may vary.
 */
private void startAllTopicsConsumer(Map<String, Object> consumerProps) throws InterruptedException {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    executorService.execute(() -> {
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerProps,
                Serdes.ByteArray().deserializer(),
                Serdes.ByteArray().deserializer());

        try {
            kafkaEmbedded.consumeFromAllEmbeddedTopics(kafkaConsumer);
        } catch (Exception e) {
            throw new RuntimeException("Error subscribing to all embedded topics", e);
        }

        try {
            while (true) {
                ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    LOGGER.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } finally {
            kafkaConsumer.close();
        }
    });
}
/**
 * Start a consumer consuming from both bad events topics, log each message and add each message
 * to the provided list of bad messages.
 * A {@link CountDownLatch} is returned to allow the caller to wait for the expected number of messages.
 */
private CountDownLatch startBadEventsConsumer(final Map<String, Object> consumerProps,
                                              final int expectedMsgCount,
                                              List<BadStatMessage> badMessages) {
    final CountDownLatch latch = new CountDownLatch(expectedMsgCount);
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    executorService.execute(() -> {
        Thread.currentThread().setName("bad-events-consumer-thread");
        activeConsumerThreads.incrementAndGet();
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProps,
                Serdes.String().deserializer(),
                Serdes.String().deserializer());

        // Subscribe to all bad event topics
        kafkaConsumer.subscribe(BAD_TOPICS_MAP.values());

        try {
            while (areConsumersEnabled.get()) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                if (records.count() > 0) {
                    for (ConsumerRecord<String, String> record : records) {
                        LOGGER.warn("Bad events Consumer - topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                                record.topic(), record.partition(), record.offset(), record.key(), record.value());
                        badMessages.add(new BadStatMessage(record.topic(), record.key(), record.value()));
                        latch.countDown();
                    }
                    if (latch.getCount() == 0) {
                        break;
                    }
                    kafkaConsumer.commitSync();
                }
            }
        } finally {
            kafkaConsumer.close();
            activeConsumerThreads.decrementAndGet();
        }
    });
    return latch;
}
protected long getLogSize(KafkaConsumer<String, Serializable> kafkaConsumer, String topic, int partition) {
    TopicPartition topicPartition = new TopicPartition(topic, partition);
    List<TopicPartition> asList = Arrays.asList(topicPartition);
    kafkaConsumer.assign(asList);
    kafkaConsumer.seekToEnd(asList);
    long logEndOffset = kafkaConsumer.position(topicPartition);
    return logEndOffset;
}
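/*
 * A small usage sketch for getLogSize() above: comparing the log-end offset with the group's
 * committed offset gives the consumer lag for one partition. The kafkaConsumer, topic and
 * partition variables are assumed to come from the surrounding class.
 */
long logEndOffset = getLogSize(kafkaConsumer, topic, partition);
OffsetAndMetadata committed = kafkaConsumer.committed(new TopicPartition(topic, partition));
long lag = (committed == null) ? logEndOffset : logEndOffset - committed.offset();
System.out.println("Lag for " + topic + "-" + partition + ": " + lag);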
private static Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> getOutputRecords(final KafkaConsumer<byte[], byte[]> consumer,
                                                                                                        final Map<TopicPartition, Long> committedOffsets) {
    final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition = new HashMap<>();

    long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
    boolean allRecordsReceived = false;
    while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
        final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(500);

        for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
            maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
            addRecord(record, recordPerTopicPerPartition);
        }

        if (receivedRecords.count() > 0) {
            allRecordsReceived = receivedAllRecords(
                recordPerTopicPerPartition.get("data"),
                recordPerTopicPerPartition.get("echo"),
                committedOffsets);
        }
    }

    if (!allRecordsReceived) {
        throw new RuntimeException("FAIL: did not receive all records after 30 sec idle time.");
    }

    return recordPerTopicPerPartition;
}
public static void main(String[] args) throws Exception {
    CommandLine commandLine = parseCommandLine(args);
    String brokerStatsZk = commandLine.getOptionValue(BROKERSTATS_ZOOKEEPER);
    String brokerStatsTopic = commandLine.getOptionValue(BROKERSTATS_TOPIC);
    String brokerName = commandLine.getOptionValue(BROKERNAME);
    Set<String> brokerNames = new HashSet<>();
    brokerNames.add(brokerName);

    KafkaConsumer kafkaConsumer = KafkaUtils.getKafkaConsumer(brokerStatsZk,
            "org.apache.kafka.common.serialization.ByteArrayDeserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer", 1);

    long startTimestampInMillis = System.currentTimeMillis() - 86400 * 1000L;
    Map<TopicPartition, Long> offsets = ReplicaStatsManager.getProcessingStartOffsets(
            kafkaConsumer, brokerStatsTopic, startTimestampInMillis);

    kafkaConsumer.unsubscribe();
    kafkaConsumer.assign(offsets.keySet());
    Map<TopicPartition, Long> latestOffsets = kafkaConsumer.endOffsets(offsets.keySet());
    kafkaConsumer.close();

    Map<Long, BrokerStats> brokerStatsMap = new TreeMap<>();
    for (TopicPartition topicPartition : offsets.keySet()) {
        LOG.info("Start processing {}", topicPartition);
        long startOffset = offsets.get(topicPartition);
        long endOffset = latestOffsets.get(topicPartition);

        List<BrokerStats> statsList = processOnePartition(brokerStatsZk, topicPartition,
                startOffset, endOffset, brokerNames);
        for (BrokerStats brokerStats : statsList) {
            brokerStatsMap.put(brokerStats.getTimestamp(), brokerStats);
        }
        LOG.info("Finished processing {}, retrieved {} records", topicPartition, statsList.size());
    }

    for (Map.Entry<Long, BrokerStats> entry : brokerStatsMap.entrySet()) {
        System.out.println(entry.getKey() + " : " + entry.getValue());
    }
}
public KafkaConsumerThread(String groupId, List<String> topics) {
    this.topics = topics;
    Properties props = new Properties();
    props.put("bootstrap.servers", "123.207.61.225:9092");
    props.put("group.id", groupId);
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    this.consumer = new KafkaConsumer<String, String>(props);
}