Example source code for the Java class org.apache.kafka.common.serialization.DoubleDeserializer
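
For orientation before the project listings: DoubleDeserializer reads the 8-byte big-endian payload that DoubleSerializer writes and turns it back into a Double. Below is a minimal, self-contained round-trip sketch; it is illustrative and not taken from either project on this page, and only the kafka-clients class names are real.

import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.DoubleSerializer;

public class DoubleSerdeRoundTrip {
    public static void main(String[] args) {
        try (DoubleSerializer serializer = new DoubleSerializer();
             DoubleDeserializer deserializer = new DoubleDeserializer()) {
            // Serialize a Double to its 8-byte representation, then read it back.
            byte[] payload = serializer.serialize("any-topic", 3.14d);
            Double restored = deserializer.deserialize("any-topic", payload);
            System.out.println(restored); // prints 3.14
        }
    }
}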

Project: kafka-webview    File: DataLoaderConfig.java
/**
 * Creates default message formats.
 */
private void createDefaultMessageFormats() {
    final Map<String, String> defaultFormats = new HashMap<>();
    defaultFormats.put("Short", ShortDeserializer.class.getName());
    defaultFormats.put("ByteArray", ByteArrayDeserializer.class.getName());
    defaultFormats.put("Bytes", BytesDeserializer.class.getName());
    defaultFormats.put("Double", DoubleDeserializer.class.getName());
    defaultFormats.put("Float", FloatDeserializer.class.getName());
    defaultFormats.put("Integer", IntegerDeserializer.class.getName());
    defaultFormats.put("Long", LongDeserializer.class.getName());
    defaultFormats.put("String", StringDeserializer.class.getName());

    // Create if needed.
    for (final Map.Entry<String, String> entry : defaultFormats.entrySet()) {
        MessageFormat messageFormat = messageFormatRepository.findByName(entry.getKey());
        if (messageFormat == null) {
            messageFormat = new MessageFormat();
        }
        messageFormat.setName(entry.getKey());
        messageFormat.setClasspath(entry.getValue());
        messageFormat.setJar("n/a");
        messageFormat.setDefaultFormat(true);
        messageFormatRepository.save(messageFormat);
    }
}
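
The classpath strings stored above can later be turned back into Deserializer instances by reflection. A generic sketch of that pattern follows; this is not kafka-webview's actual loading code, just an illustration of how a stored class name such as DoubleDeserializer.class.getName() could be instantiated.

import org.apache.kafka.common.serialization.Deserializer;

public class DeserializerFactory {
    // Illustrative helper: build a Deserializer from a stored class name.
    @SuppressWarnings("unchecked")
    public static Deserializer<Object> createFromClasspath(String classpath) throws Exception {
        Class<?> clazz = Class.forName(classpath);
        return (Deserializer<Object>) clazz.getDeclaredConstructor().newInstance();
    }
}
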
Project: KafkaExample    File: DemoConsumerManualCommit.java
public static void main(String[] args) throws Exception {
    if (args == null || args.length == 0) {
        // Fall back to local demo defaults when no arguments are supplied.
        args = new String[] { "kafka0:19092", "gender-amount", "group4", "consumer2" };
    }
    if (args.length != 4) {
        System.err.println(
                "Usage:\n\tjava -jar kafka_consumer.jar ${bootstrap_server} ${topic_name} ${group_name} ${client_id}");
        System.exit(1);
    }
    String bootstrap = args[0];
    String topic = args[1];
    String groupid = args[2];
    String clientid = args[3];

    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrap);
    props.put("group.id", groupid);
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", DoubleDeserializer.class.getName());
    props.put("max.poll.interval.ms", "300000");
    props.put("max.poll.records", "500");
    props.put("auto.offset.reset", "earliest");

    // Value type is Double to match the configured DoubleDeserializer.
    KafkaConsumer<String, Double> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));
    AtomicLong atomicLong = new AtomicLong();
    while (true) {
        ConsumerRecords<String, Double> records = consumer.poll(100);
        records.forEach(record -> {
            System.out.printf("client : %s , topic: %s , partition: %d , offset = %d, key = %s, value = %s%n",
                    clientid, record.topic(), record.partition(), record.offset(), record.key(), record.value());
            // Auto-commit is disabled, so commit offsets manually every 10 records.
            if (atomicLong.incrementAndGet() % 10 == 0) {
                consumer.commitSync();
            }
        });
    }
}
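
As a side note on the configuration style above, the same properties can be expressed with the typed constants from org.apache.kafka.clients.consumer.ConsumerConfig instead of raw string keys, which avoids typos in the deserializer settings. A minimal sketch with illustrative values, not taken from the project above:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerConfigSketch {
    public static Properties buildProps(String bootstrap, String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Constants replace the "key.deserializer" / "value.deserializer" string keys.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DoubleDeserializer.class.getName());
        return props;
    }
}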