@Override public KafkaConsumer<Long, byte[]> worker() { Properties props = AbstractKafkaClient.configBuilder()// .put(BOOTSTRAP_SERVERS_CONFIG, bootstrap)// .put(GROUP_ID_CONFIG, MallConstants.ORDER_GROUP)// .put(ENABLE_AUTO_COMMIT_CONFIG, true)// .put(MAX_POLL_RECORDS_CONFIG, "100")// .put(SESSION_TIMEOUT_MS_CONFIG, "30000")// .put(FETCH_MIN_BYTES_CONFIG, 1)// .put(AUTO_COMMIT_INTERVAL_MS_CONFIG, AUTO_COMMIT_INTERVAL_MS)// .put(KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName())// .put(VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName())// .build(); this.worker = new KafkaConsumer<>(props); this.worker.subscribe(topics); System.out.printf("started"); return this.worker; }
/** * Creates default message formats. */ private void createDefaultMessageFormats() { final Map<String, String> defaultFormats = new HashMap<>(); defaultFormats.put("Short", ShortDeserializer.class.getName()); defaultFormats.put("ByteArray", ByteArrayDeserializer.class.getName()); defaultFormats.put("Bytes", BytesDeserializer.class.getName()); defaultFormats.put("Double", DoubleDeserializer.class.getName()); defaultFormats.put("Float", FloatDeserializer.class.getName()); defaultFormats.put("Integer", IntegerDeserializer.class.getName()); defaultFormats.put("Long", LongDeserializer.class.getName()); defaultFormats.put("String", StringDeserializer.class.getName()); // Create if needed. for (final Map.Entry<String, String> entry : defaultFormats.entrySet()) { MessageFormat messageFormat = messageFormatRepository.findByName(entry.getKey()); if (messageFormat == null) { messageFormat = new MessageFormat(); } messageFormat.setName(entry.getKey()); messageFormat.setClasspath(entry.getValue()); messageFormat.setJar("n/a"); messageFormat.setDefaultFormat(true); messageFormatRepository.save(messageFormat); } }
@Override public KafkaProducer<Long, byte[]> worker() { Properties props = AbstractKafkaClient.configBuilder()// .put(BOOTSTRAP_SERVERS_CONFIG, bootstrap)// .put(ACKS_CONFIG, "all").put(RETRIES_CONFIG, 3)// .put(BATCH_SIZE_CONFIG, 16384)// .put(LINGER_MS_CONFIG, 1)// .put(BUFFER_MEMORY_CONFIG, 33554432)// .put(KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName())// .put(VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName())// .build(); return this.worker = new KafkaProducer<>(props); }