/** * Creates default message formats. */ private void createDefaultMessageFormats() { final Map<String, String> defaultFormats = new HashMap<>(); defaultFormats.put("Short", ShortDeserializer.class.getName()); defaultFormats.put("ByteArray", ByteArrayDeserializer.class.getName()); defaultFormats.put("Bytes", BytesDeserializer.class.getName()); defaultFormats.put("Double", DoubleDeserializer.class.getName()); defaultFormats.put("Float", FloatDeserializer.class.getName()); defaultFormats.put("Integer", IntegerDeserializer.class.getName()); defaultFormats.put("Long", LongDeserializer.class.getName()); defaultFormats.put("String", StringDeserializer.class.getName()); // Create if needed. for (final Map.Entry<String, String> entry : defaultFormats.entrySet()) { MessageFormat messageFormat = messageFormatRepository.findByName(entry.getKey()); if (messageFormat == null) { messageFormat = new MessageFormat(); } messageFormat.setName(entry.getKey()); messageFormat.setClasspath(entry.getValue()); messageFormat.setJar("n/a"); messageFormat.setDefaultFormat(true); messageFormatRepository.save(messageFormat); } }
public static void main(String[] args) throws Exception { args = new String[] { "kafka0:19092", "gender-amount", "group4", "consumer2" }; if (args == null || args.length != 4) { System.err.println( "Usage:\n\tjava -jar kafka_consumer.jar ${bootstrap_server} ${topic_name} ${group_name} ${client_id}"); System.exit(1); } String bootstrap = args[0]; String topic = args[1]; String groupid = args[2]; String clientid = args[3]; Properties props = new Properties(); props.put("bootstrap.servers", bootstrap); props.put("group.id", groupid); props.put("enable.auto.commit", "false"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", DoubleDeserializer.class.getName()); props.put("max.poll.interval.ms", "300000"); props.put("max.poll.records", "500"); props.put("auto.offset.reset", "earliest"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic)); AtomicLong atomicLong = new AtomicLong(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); records.forEach(record -> { System.out.printf("client : %s , topic: %s , partition: %d , offset = %d, key = %s, value = %s%n", clientid, record.topic(), record.partition(), record.offset(), record.key(), record.value()); if (atomicLong.get() % 10 == 0) { // consumer.commitSync(); } }); } }