public String receive() { KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); consumer.subscribe(Arrays.asList(properties.getProperty("topic"))); final int minBatchSize = 200; List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); System.err.println(buffer.size() + "----->" + record); } if (buffer.size() >= minBatchSize) { writeFileToHadoop(buffer);//先把buffer写入文件中 consumer.commitSync(); buffer.clear(); } } }
@Override public void run() { while (true) { ConsumerRecords<byte[], ChangeDataRecord> changeRecords = consumer.poll(KAFKA_CONSUMER_POLL_TIMEOUT); for (ConsumerRecord<byte[], ChangeDataRecord> consumerRecord : changeRecords) { // The ChangeDataRecord contains all the changes made to a document ChangeDataRecord changeDataRecord = consumerRecord.value(); String documentId = changeDataRecord.getId().getString(); // Handle 'RECORD_INSERT' if (changeDataRecord.getType() == ChangeDataRecordType.RECORD_INSERT && this.onInsert != null) { this.onInsert.handle(documentId); } // Handle 'RECORD_DELETE' if (changeDataRecord.getType() == ChangeDataRecordType.RECORD_DELETE && this.onDelete != null) { this.onDelete.handle(documentId); } } } }
public void run() { try { printJson(new StartupComplete()); consumer.subscribe(Collections.singletonList(topic), this); while (true) { ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); Map<TopicPartition, OffsetAndMetadata> offsets = onRecordsReceived(records); if (!useAutoCommit) { if (useAsyncCommit) consumer.commitAsync(offsets, this); else commitSync(offsets); } } } catch (WakeupException e) { // ignore, we are closing } finally { consumer.close(); printJson(new ShutdownComplete()); shutdownLatch.countDown(); } }
/**
 * Performs a single 100 ms poll on the topic named by the {@code topic} entry of
 * {@code properties} and returns the values of the records that poll delivered.
 *
 * <p>A new consumer is created for every call and is always closed before returning,
 * including when subscribing or polling throws.
 *
 * @return the record values from the single poll; empty if the poll returned nothing
 */
public List<String> receive() {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    try {
        consumer.subscribe(Arrays.asList(properties.getProperty("topic")));
        List<String> buffer = new ArrayList<String>();
        System.err.println("consumer receive------------------");
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record.value());
        }
        return buffer;
    } finally {
        consumer.close();
    }
}
@Override public void run() { try { consumer.subscribe(Collections.singletonList(Config.getProperty("input_topic"))); while (!closed.get()) { ConsumerRecords<String, String> records = consumer.poll(3000); try { consumer.commitSync(); // commit } catch (Exception ignored) { } if (records.count() > 0) { handler.consume(records); } } } catch (WakeupException e) { if (!closed.get()) throw e; } finally { consumer.close(); } }
/**
 * Transforms every Kafka record of the current change set into a {@code Row} and adds it
 * to {@code rowSet}. Records for which {@code transform} returns {@code false} are skipped.
 * For each accepted record the offset is stored in the callback's offset map under the key
 * {@code topic+partition}, and the row's produce time is passed to the callback. Finally
 * the callback receives the number of row buckets and is attached to the row set.
 *
 * @param rowSet the row set to fill
 * @throws BiremeException declared by the method signature
 */
@SuppressWarnings("unchecked")
@Override
public void fillRowSet(RowSet rowSet) throws BiremeException {
    CommitCallback commitCallback = changeSet.callback;
    HashMap<String, Long> partitionOffsets = ((KafkaCommitCallback) commitCallback).partitionOffset;
    ConsumerRecords<String, String> changes = (ConsumerRecords<String, String>) changeSet.changes;

    for (ConsumerRecord<String, String> change : changes) {
        Row row = new Row();
        if (transform(change, row)) {
            addToRowSet(row, rowSet);
            partitionOffsets.put(change.topic() + "+" + change.partition(), change.offset());
            commitCallback.setNewestRecord(row.produceTime);
        }
    }

    commitCallback.setNumOfTables(rowSet.rowBucket.size());
    rowSet.callback = commitCallback;
}
/** * 处理通过kafka consumer获取到的一批消息 * * @param records 待处理的消息集合 */ public void handleMessages(ConsumerRecords<String, byte[]> records) { // 按记录进行处理 for (ConsumerRecord<String, byte[]> record : records) { // 过滤掉暂停的topic的消息 if (consumerListener.filterPausedTopic(record.topic())) { listener.increaseFlowSize(record.serializedValueSize()); try { this.chooseProcessor(record.key(), record.topic()).process(record); } catch (Exception e) { logger.error("sport process error", e); consumerListener.seek(record); break; } } else { listener.reduceFlowSize(record.serializedValueSize()); consumerListener.syncOffset(record); logger.info("The record of topic {} was skipped whose offset is {}", record.topic(), record.offset()); } } }
/**
 * Polls the consumer once for the next batch of records.
 *
 * <p>If the poll raises an {@code InvalidOffsetException}, the offsets are reset through
 * {@code resetInvalidOffsets} and this call yields {@code null}. After polling, a pending
 * rebalance exception other than {@code ProducerFencedException} is rethrown wrapped in a
 * {@code StreamsException}.
 *
 * @return the polled records, or {@code null} when the poll hit an invalid offset
 */
private ConsumerRecords<byte[], byte[]> pollRequests() {
    ConsumerRecords<byte[], byte[]> polled = null;
    try {
        polled = consumer.poll(pollTimeMs);
    } catch (final InvalidOffsetException e) {
        resetInvalidOffsets(e);
    }

    final boolean rebalanceFailed = rebalanceException != null
        && !(rebalanceException instanceof ProducerFencedException);
    if (rebalanceFailed) {
        throw new StreamsException(logPrefix + " Failed to rebalance.", rebalanceException);
    }

    return polled;
}
/**
 * Reads key/value pairs from {@code topic} using a consumer created from
 * {@code consumerConfig}.
 *
 * <p>The consumer is polled in 100 ms steps until either a nominal 2000 ms of polling has
 * been spent or {@code continueConsuming} reports that enough messages were read. The
 * consumer is always closed before returning, including when subscribing or polling throws.
 *
 * @param topic          Kafka topic to read messages from
 * @param consumerConfig Kafka consumer configuration
 * @param maxMessages    Maximum number of messages to read via the consumer
 * @return The KeyValue elements retrieved via the consumer
 */
public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties consumerConfig, int maxMessages) {
    KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerConfig);
    try {
        consumer.subscribe(Collections.singletonList(topic));
        int pollIntervalMs = 100;
        int maxTotalPollTimeMs = 2000;
        int totalPollTimeMs = 0;
        List<KeyValue<K, V>> consumedValues = new ArrayList<>();
        while (totalPollTimeMs < maxTotalPollTimeMs && continueConsuming(consumedValues.size(), maxMessages)) {
            // Elapsed time is tracked nominally: each poll is counted as one full interval.
            totalPollTimeMs += pollIntervalMs;
            ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
            for (ConsumerRecord<K, V> record : records) {
                consumedValues.add(new KeyValue<>(record.key(), record.value()));
            }
        }
        return consumedValues;
    } finally {
        consumer.close();
    }
}
@SuppressWarnings("unchecked") private IExpectationSetters<Object> expectOnePoll() { // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of // returning empty data, we return one record. The expectation is that the data will be ignored by the // response behavior specified using the return value of this method. EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( new IAnswer<ConsumerRecords<byte[], byte[]>>() { @Override public ConsumerRecords<byte[], byte[]> answer() throws Throwable { // "Sleep" so time will progress time.sleep(1L); ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>( Collections.singletonMap( new TopicPartition(TOPIC, PARTITION), Arrays.asList( new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, 0L, 0, 0, RAW_KEY, RAW_VALUE) ))); recordsReturned++; return records; } }); EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)); EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)); sinkTask.put(EasyMock.anyObject(Collection.class)); return EasyMock.expectLastCall(); }
/**
 * Demo consumer: subscribes to the {@code test} topic and prints offset, key and value of
 * every record it receives. The loop has no exit condition; the consumer is closed when an
 * exception ends it.
 *
 * @param args unused
 * @throws Exception declared by the method signature
 */
public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.77.7:9092,192.168.77.7:9093,192.168.77.7:9094");
    props.put("group.id", "test-group-id");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    try {
        consumer.subscribe(Collections.singletonList("test"));
        System.out.println("Subscribed to topic test");
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format("offset = %s, key = %s, value = %s",
                    record.offset(), record.key(), record.value()));
            }
        }
    } finally {
        // Release the consumer when the loop is aborted by an exception.
        consumer.close();
    }
}
public static void main(String[] args) { // TODO Auto-generated method stub Properties props = new Properties(); props.put("bootstrap.servers", "123.207.61.225:9092"); props.put("group.id", "test-consumer-group"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("log.level", "info"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList("test")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } }
/**
 * Seeks the consumer to {@code offset} on {@code topicPartition} and polls in 100 ms steps
 * until a poll delivers at least one record for that partition. There is no timeout: the
 * method keeps polling until such a record arrives.
 *
 * @param kafkaConsumer  consumer to seek and poll
 * @param topicPartition partition to read from
 * @param offset         offset to seek to before polling
 * @return the first record returned for {@code topicPartition}
 */
private static ConsumerRecord<byte[], byte[]> retrieveOneMessage(KafkaConsumer kafkaConsumer, TopicPartition topicPartition, long offset) {
    kafkaConsumer.seek(topicPartition, offset);
    while (true) {
        ConsumerRecords<byte[], byte[]> polled = kafkaConsumer.poll(100);
        if (polled.isEmpty()) {
            continue;
        }
        LOG.debug("records.count() = {}", polled.count());

        List<ConsumerRecord<byte[], byte[]>> forPartition = polled.records(topicPartition);
        if (forPartition == null || forPartition.isEmpty()) {
            LOG.info("recList is null or empty");
            continue;
        }
        return forPartition.get(0);
    }
}
/**
 * Demo consumer: subscribes to {@code demo-topic1} with auto-commit enabled and prints
 * every received record. Any exception raised while consuming is logged, and the consumer
 * is closed on exit.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Properties config = new Properties();
    config.put("bootstrap.servers", "localhost:9092");
    config.put("group.id", "test");
    config.put("enable.auto.commit", "true");
    config.put("auto.commit.interval.ms", "1000");
    config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(config);
    kafkaConsumer.subscribe(Arrays.asList("demo-topic1"));

    try {
        for (;;) {
            ConsumerRecords<String, String> batch = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> entry : batch) {
                System.out.println(entry.toString());
            }
        }
    } catch (Exception e) {
        LOGGER.error("Exception occured while consuing messages",e);
    } finally {
        kafkaConsumer.close();
    }
}
/**
 * Broadcasts a runnable over the DR grid that substitutes the test Kafka factory's
 * consumers with one whose {@code poll} always throws a {@code KafkaException}, then
 * records that the Kafka consumer is failing.
 */
private void failOnKafkaPoll() {
    drGrid().compute().broadcast(new ActiveStoreIgniteRunnable() {
        @Inject
        private transient KafkaFactory kafkaFactory;

        @Override
        protected void runInjected() {
            KafkaFactoryForTests testFactory = (KafkaFactoryForTests)kafkaFactory;
            // Consumer stand-in whose every poll fails.
            ConsumerAdapter failingConsumer = new ConsumerAdapter() {
                @Override
                public ConsumerRecords poll(long timeout) {
                    throw new KafkaException("Poll failed");
                }
            };
            testFactory.substituteConsumers(failingConsumer);
        }
    });
    kafkaConsumerFailing = true;
}
/** * This will consume all records from only the partitions given. * @param topic Topic to consume from. * @param partitionIds Collection of PartitionIds to consume. * @return List of ConsumerRecords consumed. */ public List<ConsumerRecord<byte[], byte[]>> consumeAllRecordsFromTopic(final String topic, Collection<Integer> partitionIds) { // Create topic Partitions List<TopicPartition> topicPartitions = new ArrayList<>(); for (Integer partitionId: partitionIds) { topicPartitions.add(new TopicPartition(topic, partitionId)); } // Connect Consumer KafkaConsumer<byte[], byte[]> kafkaConsumer = kafkaTestServer.getKafkaConsumer(ByteArrayDeserializer.class, ByteArrayDeserializer.class); // Assign topic partitions & seek to head of them kafkaConsumer.assign(topicPartitions); kafkaConsumer.seekToBeginning(topicPartitions); // Pull records from kafka, keep polling until we get nothing back final List<ConsumerRecord<byte[], byte[]>> allRecords = new ArrayList<>(); ConsumerRecords<byte[], byte[]> records; do { // Grab records from kafka records = kafkaConsumer.poll(2000L); logger.info("Found {} records in kafka", records.count()); // Add to our array list records.forEach(allRecords::add); } while (!records.isEmpty()); // close consumer kafkaConsumer.close(); // return all records return allRecords; }
/**
 * Builds a {@code ConsumerRecords} holding {@code count} records for partition 0 of topic
 * {@code MyTopic}. Record {@code i} has offset {@code i}, key {@code "Key" + i} and value
 * {@code "Value" + i}.
 *
 * @param count number of records to generate
 * @return the generated records, all under a single topic partition
 */
private ConsumerRecords createConsumerRecords(final int count) {
    final String topic = "MyTopic";
    final int partition = 0;

    final List<ConsumerRecord> generated = new ArrayList<>();
    int offset = 0;
    while (offset < count) {
        generated.add(new ConsumerRecord<Object, Object>(topic, partition, offset, "Key" + offset, "Value" + offset));
        offset++;
    }

    final Map<TopicPartition, List<ConsumerRecord>> byPartition = new HashMap<>();
    byPartition.put(new TopicPartition(topic, partition), generated);
    return new ConsumerRecords(byPartition);
}
/**
 * Return a map containing one List of records per partition.
 *
 * <p>For each partition index from 0 to {@code numPtns - 1}, a new Kafka consumer is
 * created from the provided properties, assigned that single partition, and polled once
 * with a 1000 ms timeout. Each consumer is closed before moving on, including when
 * assigning or polling throws.
 *
 * @param topic              topic whose partitions are read
 * @param numPtns            number of partitions to read, starting at partition 0
 * @param consumerProperties properties used to create each consumer
 * @return A Map of Partitions(Integer) and the resulting List of messages (byte[]) retrieved
 */
public static Map<Integer, List<byte[]>> retrieveRecordsFromPartitions(String topic, int numPtns, Properties consumerProperties) {
    Map<Integer, List<byte[]>> resultsMap = new HashMap<Integer, List<byte[]>>();
    for (int i = 0; i < numPtns; i++) {
        List<byte[]> partitionResults = new ArrayList<byte[]>();
        resultsMap.put(i, partitionResults);

        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(consumerProperties);
        try {
            TopicPartition partition = new TopicPartition(topic, i);
            consumer.assign(Arrays.asList(partition));
            ConsumerRecords<String, byte[]> records = consumer.poll(1000);
            for (ConsumerRecord<String, byte[]> record : records) {
                partitionResults.add(record.value());
            }
        } finally {
            consumer.close();
        }
    }
    return resultsMap;
}
/**
 * Polls one batch of transaction records, stores each one in {@code buffer} keyed by its
 * transaction id, notifies the per-partition committed-offset tracker of the offset that
 * was read, and passes the polled scopes (sorted with {@code SCOPE_COMPARATOR}) to
 * {@code approveAndCommitTransactionsBatch}. Transactions with an empty scope are logged
 * as a warning but are still buffered.
 */
private void pollAndCommitTransactionsBatch() {
    ConsumerRecords<ByteBuffer, ByteBuffer> polled = consumer.poll(POLL_TIMEOUT);
    List<TransactionScope> polledScopes = new ArrayList<>(polled.count());

    for (ConsumerRecord<ByteBuffer, ByteBuffer> entry : polled) {
        TransactionScope scope = serializer.deserialize(entry.key());
        if (scope.getScope().isEmpty()) {
            LOGGER.warn("[R] {} polled empty transaction {}", readerId, scope.getTransactionId());
        }

        TopicPartition partition = new TopicPartition(entry.topic(), entry.partition());
        buffer.put(
            scope.getTransactionId(),
            new TransactionData(scope, entry.value(), partition, entry.offset()));
        polledScopes.add(scope);
        committedOffsetMap.computeIfAbsent(partition, COMMITTED_OFFSET).notifyRead(entry.offset());
    }

    if (!polledScopes.isEmpty()) {
        polledScopes.sort(SCOPE_COMPARATOR);
        LOGGER.trace("[R] {} polled {}", readerId, polledScopes);
    }
    approveAndCommitTransactionsBatch(polledScopes);
}
private void keepPolling() throws InterruptedException { // keep on polling until shutdown for this thread is called. while (!shutdown) { ConsumerRecords<K, V> records = consumer.poll(pollingTime); // if polling gave no tasks, then sleep this thread for n seconds. if (records.isEmpty()) { log.debug("NO RECORDS fetched from queue. Putting current THREAD to SLEEP."); Thread.sleep(sleepTime); continue; } log.info("Processing a batch of records."); if (!processor.process(records)) { log.error("ERROR occurred while PROCESSING RECORDS."); } } }
/**
 * Converts each raw consumer record into a {@code SinkRecord} using the key and value
 * converters, runs it through the transformation chain, and appends the result to
 * {@code messageBatch}. Records for which the chain returns {@code null} are dropped.
 *
 * @param msgs the raw records to convert
 */
private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
    for (ConsumerRecord<byte[], byte[]> raw : msgs) {
        log.trace("Consuming message with key {}, value {}", raw.key(), raw.value());

        SchemaAndValue key = keyConverter.toConnectData(raw.topic(), raw.key());
        SchemaAndValue value = valueConverter.toConnectData(raw.topic(), raw.value());

        SinkRecord converted = new SinkRecord(
            raw.topic(), raw.partition(),
            key.schema(), key.value(),
            value.schema(), value.value(),
            raw.offset(),
            ConnectUtils.checkAndConvertTimestamp(raw.timestamp()),
            raw.timestampType());

        SinkRecord transformed = transformationChain.apply(converted);
        if (transformed == null) {
            continue;
        }
        messageBatch.add(transformed);
    }
}
/**
 * Subscribes the given consumer to {@code topic} and collects the key/value pairs it
 * delivers. Polling happens in 100 ms steps and stops once the nominal accumulated poll
 * time reaches {@code waitTime} or {@code continueConsuming} reports that enough messages
 * were read. The consumer is not closed by this method.
 *
 * @param topic       Kafka topic to read messages from
 * @param consumer    Kafka consumer
 * @param waitTime    Maximum wait time in milliseconds
 * @param maxMessages Maximum number of messages to read via the consumer
 * @return The KeyValue elements retrieved via the consumer
 */
private static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Consumer<K, V> consumer, final long waitTime, final int maxMessages) {
    consumer.subscribe(Collections.singletonList(topic));

    final int pollIntervalMs = 100;
    final List<KeyValue<K, V>> consumedValues = new ArrayList<>();

    // Each iteration is counted as one full poll interval, whatever the poll really took.
    for (int elapsedMs = 0;
         elapsedMs < waitTime && continueConsuming(consumedValues.size(), maxMessages);
         elapsedMs += pollIntervalMs) {
        final ConsumerRecords<K, V> polled = consumer.poll(pollIntervalMs);
        for (final ConsumerRecord<K, V> entry : polled) {
            consumedValues.add(new KeyValue<>(entry.key(), entry.value()));
        }
    }
    return consumedValues;
}
/**
 * Demo consumer: subscribes to {@code TOPIC} and prints every record. Right after the
 * first poll, every partition currently assigned to the consumer is repositioned to
 * offset 90; the records of that first poll are still printed.
 *
 * @param args unused
 */
public static void main(String[] args) {
    KafkaConsumer<String, String> consumer = createConsumer();
    consumer.subscribe(Arrays.asList(TOPIC));

    boolean seekPending = true;
    while (true) {
        ConsumerRecords<String, String> batch = consumer.poll(100);

        if (seekPending) {
            // One-time reposition, done after the first poll.
            for (TopicPartition assigned : consumer.assignment()) {
                consumer.seek(assigned, 90);
            }
            seekPending = false;
        }

        for (ConsumerRecord<String, String> entry : batch) {
            System.out.printf("offset = %d, key = %s, value = %s%n", entry.offset(), entry.key(), entry.value());
        }
    }
}
/**
 * Turns each record into a telemetry message for the device named by the record key and
 * passes it to the processor. The record value is parsed as a JSON object, from which the
 * {@code messageId} and {@code d} members are read. Records whose device cannot be found
 * are skipped.
 *
 * @param records the records to handle
 * @return always {@code true}
 */
public boolean consume(ConsumerRecords<String, String> records) {
    for (ConsumerRecord<String, String> entry : records) {
        log.info("topic:{}, key:{}, value:{}", entry.topic(), entry.key(), entry.value());

        String deviceId = entry.key();
        String payload = entry.value();

        Optional<Device> lookup = authService.findDeviceById(DeviceId.fromString(deviceId));
        if (!lookup.isPresent()) {
            continue;
        }
        Device device = lookup.get();

        JsonObject root = (JsonObject) new JsonParser().parse(payload);
        int messageId = root.getAsJsonPrimitive("messageId").getAsInt();
        FromDeviceMsg telemetry = JsonConverter.convertToTelemetry(root.get("d"), messageId);

        BasicAdaptorToSessionActorMsg sessionMsg = new BasicAdaptorToSessionActorMsg(deviceSessionCtx, telemetry);
        BasicToDeviceActorSessionMsg deviceMsg = new BasicToDeviceActorSessionMsg(device, sessionMsg);
        processor.process(deviceMsg);
    }
    return true;
}
/**
 * Records the expectations for the first poll of the sink task: the task is opened with
 * both test partitions, the poll triggers {@code onPartitionsAssigned} on the captured
 * rebalance listener and returns no records, the position of each partition is queried,
 * and an empty batch is put to the task.
 */
private void expectPollInitialAssignment() {
    final List<TopicPartition> assigned = asList(TOPIC_PARTITION, TOPIC_PARTITION2);

    sinkTask.open(assigned);
    EasyMock.expectLastCall();

    final IAnswer<ConsumerRecords<byte[], byte[]>> assignThenReturnNothing = new IAnswer<ConsumerRecords<byte[], byte[]>>() {
        @Override
        public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
            rebalanceListener.getValue().onPartitionsAssigned(assigned);
            return ConsumerRecords.empty();
        }
    };
    EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(assignThenReturnNothing);

    EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
    EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);

    sinkTask.put(Collections.<SinkRecord>emptyList());
    EasyMock.expectLastCall();
}
/**
 * Subscribes to {@code hello_world_topic}, polls once with a 10000 ms timeout, prints each
 * record that poll returned and appends it to {@code receivedRecords}.
 */
public void consume() {
    consumer.subscribe(singletonList("hello_world_topic"));

    for (ConsumerRecord<String, String> received : consumer.poll(10000)) {
        System.out.printf("offset = %d, key = %s, value = %s\n",
            received.offset(), received.key(), received.value());
        this.receivedRecords.add(received);
    }
}
private synchronized void pollAndDispatchMessage() throws InterruptedException { // 处理记录过程中,不能修改consumer相关的设定 // 拉取需要处理的记录 ConsumerRecords<String, byte[]> allRecords = consumer.poll(10000); // 为每个消息都封装成CALLABLE的形式,并进行调用处理 Iterator<ConsumerRecord<String, byte[]>> iterator = allRecords.iterator(); List<MessageHandler> listJob = new LinkedList<>(); while (iterator.hasNext()) { listJob.add(new MessageHandler(iterator.next())); } executeJobs(listJob); // 全部调用成功,更新消费坐标 consumer.commitAsync(); }
/**
 * Delegates to the wrapped consumer's {@code poll} and calls
 * {@code buildAndFinishChildSpan} for every record it returned.
 *
 * @param timeout poll timeout, passed through unchanged
 * @return the records returned by the wrapped consumer
 */
@Override
public ConsumerRecords<K, V> poll(long timeout) {
    final ConsumerRecords<K, V> polled = consumer.poll(timeout);
    for (final ConsumerRecord<K, V> each : polled) {
        buildAndFinishChildSpan(each);
    }
    return polled;
}
@Override public void listen() { if (this.onInsert == null && this.onUpdate == null && this.onDelete == null) { log.warn("There is no callbacks set. Listening change data records without callbacks has no effect."); } this.consumer.subscribe(Collections.singletonList(this.changelog)); log.info("Start listening changelog '{}'", this.changelog); new Thread(() -> { while (true) { ConsumerRecords<byte[], ChangeDataRecord> changeRecords = consumer.poll(KAFKA_CONSUMER_POLL_TIMEOUT); for (ConsumerRecord<byte[], ChangeDataRecord> consumerRecord : changeRecords) { // The ChangeDataRecord contains all the changes made to a document ChangeDataRecord changeDataRecord = consumerRecord.value(); ChangeDataRecordType recordType = changeDataRecord.getType(); switch (recordType) { case RECORD_INSERT: handleInsert(changeDataRecord); break; case RECORD_UPDATE: handleUpdate(changeDataRecord); break; case RECORD_DELETE: handleDelete(changeDataRecord); break; default: log.warn("Get record of unknown type '{}'. Ignoring ...", recordType); } } } }).start(); }
/**
 * Performs one unit of work: polls the worker once with a 100 ms timeout, reports how many
 * records were pulled on standard output and in the log, and passes each record to
 * {@code orderHandle}.
 */
@Override
public void work() {
    System.out.println("pull");
    ConsumerRecords<Long, byte[]> records = getWorker().poll(100);
    System.out.printf("pulled count = %d %n", records.count());
    // The logger substitutes "{}" placeholders; printf-style "%d %n" would be logged
    // literally and the count dropped.
    log.info("pulled count = {}", records.count());
    for (ConsumerRecord<Long, byte[]> record : records) {
        orderHandle(record);
    }
}
/**
 * Performs one unit of work: subscribes to the configured topic, polls once with a
 * 1000 ms timeout and prints key, value and offset of every record received.
 */
@Override
public void doWork() {
    consumer.subscribe(Collections.singletonList(this.topic));

    ConsumerRecords<Integer, String> polled = consumer.poll(1000);
    for (ConsumerRecord<Integer, String> message : polled) {
        String line = "Received message: (" + message.key() + ", " + message.value()
            + ") at offset " + message.offset();
        System.out.println(line);
    }
}
/**
 * Logs the value of every record in the batch at info level.
 *
 * @param records the records whose values are logged
 */
@Override
public void consume(ConsumerRecords<String, String> records) {
    for (ConsumerRecord<String, String> entry : records) {
        logger.info(entry.value());
    }
}
/**
 * Polls the consumer once and applies every received record to the state maintainer.
 * Afterwards, if flushing is enabled ({@code flushInterval >= 0}) and at least
 * {@code flushInterval} has passed since the last flush, the state is flushed and the
 * flush time is updated.
 */
void pollAndUpdate() {
    for (final ConsumerRecord<byte[], byte[]> record : consumer.poll(pollMs)) {
        stateMaintainer.update(record);
    }

    final long now = time.milliseconds();
    final boolean flushDue = flushInterval >= 0 && now >= lastFlush + flushInterval;
    if (flushDue) {
        stateMaintainer.flushState();
        lastFlush = now;
    }
}
/**
 * Polls forever with a 60000 ms timeout. Every non-empty batch is passed to
 * {@code consume}; offsets are committed synchronously only when {@code consume} reports
 * success.
 */
public void run() {
    for (;;) {
        ConsumerRecords<String, String> batch = kafkaConsumer.poll(60000);
        if (batch == null || batch.isEmpty()) {
            continue;
        }

        log.info("records size:{}", batch.count());
        if (consume(batch)) {
            log.info("now commit offset");
            kafkaConsumer.commitSync();
        }
    }
}
/**
 * Creates a consumer from {@code kafkaProps}, subscribes it to {@code topics} and polls
 * forever, submitting a {@code ParseRecord} task to {@code parseRecordExecutor} for every
 * record received.
 */
@Override
public void run() {
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProps);
    kafkaConsumer.subscribe(topics);

    for (;;) {
        for (ConsumerRecord<String, String> message : kafkaConsumer.poll(100)) {
            logger.trace("received message: {} - {}", message.offset(), message.value());
            parseRecordExecutor.execute(new ParseRecord(message));
        }
    }
}
@Override public void run() { Properties kprops = new Properties(); kprops.put("bootstrap.servers", kafka.url()); kprops.put("group.id", groupId); // NB: only "true" is valid; no code to handle "false" yet. kprops.put("enable.auto.commit", "true"); kprops.put("auto.commit.interval.ms", commitInterval); kprops.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); kprops.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kprops); consumer.subscribe(Arrays.asList(topicName)); Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("heartbeat.consumer").build()) .execute(new Runnable() { @Override public void run() { try { while (true) { logger.trace("==> Poll for records"); ConsumerRecords<String, String> records = consumer.poll(1000); logger.trace("==> Number of records: " + records.count()); // NB: if you want the beginning .. // consumer.seekToBeginning(records.partitions()); for (ConsumerRecord<String, String> record : records) logger.debug("==> ==> offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value()); Thread.sleep(sleepTime); } } catch (InterruptedException e) { logger.info("Heartbeat Consumer Interrupted"); } finally { consumer.close(); } } }); }
@Override public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) { onConsumeCount++; if (throwExceptionOnConsume) throw new KafkaException("Injected exception in FilterConsumerInterceptor.onConsume."); // filters out topic/partitions with partition == FILTER_PARTITION Map<TopicPartition, List<ConsumerRecord<K, V>>> recordMap = new HashMap<>(); for (TopicPartition tp : records.partitions()) { if (tp.partition() != filterPartition) recordMap.put(tp, records.records(tp)); } return new ConsumerRecords<K, V>(recordMap); }
/**
 * Polls the wrapped consumer through {@code Retries.tryMe}, passing the current strategy
 * and {@code EMPTY_POLL} as the remaining arguments.
 *
 * @param timeout poll timeout handed to the wrapped consumer
 * @return whatever {@code Retries.tryMe} returns for the poll task
 */
@SuppressWarnings("unchecked")
@Override
public ConsumerRecords<K, V> poll(final long timeout) {
    final Callable<ConsumerRecords<K, V>> pollOnce = new Callable<ConsumerRecords<K, V>>() {
        @Override
        public ConsumerRecords<K, V> call() throws Exception {
            return inner.poll(timeout);
        }
    };
    return Retries.tryMe(pollOnce, strategy(), EMPTY_POLL);
}
/**
 * Polls the consumer until {@code receivedAllRecords} reports that the {@code data} and
 * {@code echo} topics are complete with respect to {@code committedOffsets}, collecting
 * every record through {@code addRecord}. The idle deadline is pushed out by
 * {@code MAX_IDLE_TIME_MS} each time a record arrives.
 *
 * @param consumer         consumer to poll
 * @param committedOffsets offsets passed to the completeness check
 * @return the collected records, as filled in by {@code addRecord}
 * @throws RuntimeException if the idle deadline passes before all records were received
 */
private static Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> getOutputRecords(final KafkaConsumer<byte[], byte[]> consumer, final Map<TopicPartition, Long> committedOffsets) {
    final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> collected = new HashMap<>();

    long idleDeadline = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
    boolean complete = false;
    while (!complete && System.currentTimeMillis() < idleDeadline) {
        final ConsumerRecords<byte[], byte[]> polled = consumer.poll(500);

        for (final ConsumerRecord<byte[], byte[]> entry : polled) {
            // Progress was made: restart the idle timer.
            idleDeadline = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
            addRecord(entry, collected);
        }

        if (polled.count() > 0) {
            complete = receivedAllRecords(
                collected.get("data"),
                collected.get("echo"),
                committedOffsets);
        }
    }

    if (!complete) {
        throw new RuntimeException("FAIL: did not receive all records after 30 sec idle time.");
    }
    return collected;
}
@GET @Path("/readKafkaTopic") public Response readKafkaTopic(Map<String, Object > map) { try { Properties properties = PropertiesUtils.getProps("consumer.properties"); properties.setProperty("client.id","readKafkaTopic"); properties.setProperty("group.id","readKafkaTopic"); //properties.setProperty("bootstrap.servers", "localhost:9092"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); String topic = map.get("topic").toString(); //System.out.println("topic="+topic); TopicPartition topicPartition = new TopicPartition(topic, 0); List<TopicPartition> topics = Arrays.asList(topicPartition); consumer.assign(topics); consumer.seekToEnd(topics); long current = consumer.position(topicPartition); long end = current; current -= 1000; if(current < 0) current = 0; consumer.seek(topicPartition, current); List<String> result = new ArrayList<>(); while (current < end) { //System.out.println("topic position = "+current); ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { result.add(record.value()); //System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } current = consumer.position(topicPartition); } consumer.close(); return Response.ok().entity(result).build(); } catch (Exception e) { logger.error("Error encountered while readKafkaTopic with parameter:{}", JSON.toJSONString(map), e); return Response.status(204).entity(new Result(-1, e.getMessage())).build(); } }