/**
 * End-to-end check against the Kafka test container: sets up a {@code HelloProducer}
 * and a {@code HelloConsumer} on the container's Kafka URL and verifies that five
 * records are received, each with key "testContainers" and value "AreAwesome".
 */
@Test
public void testProducerConsumer() {
    // Print the container endpoints so a failing run can be diagnosed from the output.
    String containerIpAddress = cp.getContainerIpAddress();
    System.out.println("containerIpAddress = " + containerIpAddress);
    Integer zookeeperPort = zooCp.getMappedPort(2181);
    System.out.println("zookeeperPort = " + zookeeperPort);
    Integer kafkaPort = cp.getMappedPort(9092);
    System.out.println("kafkaPort = " + kafkaPort);

    // NOTE(review): createProducer presumably also publishes the test messages - confirm.
    HelloProducer helloProducer = new HelloProducer();
    helloProducer.createProducer(cp.kafkaUrl());

    HelloConsumer helloConsumer = new HelloConsumer(cp.kafkaUrl());
    helloConsumer.consume();
    Collection<ConsumerRecord> messages = helloConsumer.getReceivedRecords();

    // JUnit expects (message, expected, actual); the reverse order yields misleading failure text.
    Assert.assertEquals("message consumed", 5, messages.size());
    messages.forEach(record -> {
        Assert.assertEquals("testContainers", record.key());
        Assert.assertEquals("AreAwesome", record.value());
    });
}
/**
 * End-to-end check against the compose environment: resolves the host and port of
 * service "kafka_1" (port 29092), sets up a {@code HelloProducer} and a
 * {@code HelloConsumer} on that address and verifies that five records are received,
 * each with key "testContainers" and value "AreAwesome".
 */
@Test
public void testProducerConsumer() {
    String host = environment.getServiceHost("kafka_1", 29092);
    Integer port = environment.getServicePort("kafka_1", 29092);
    String bootstrapServers = host + ":" + port;

    // NOTE(review): createProducer presumably also publishes the test messages - confirm.
    HelloProducer helloProducer = new HelloProducer();
    helloProducer.createProducer(bootstrapServers);

    HelloConsumer helloConsumer = new HelloConsumer(bootstrapServers);
    helloConsumer.consume();
    Collection<ConsumerRecord> messages = helloConsumer.getReceivedRecords();

    // JUnit expects (message, expected, actual); the reverse order yields misleading failure text.
    Assert.assertEquals("message consumed", 5, messages.size());
    messages.forEach(record -> {
        Assert.assertEquals("testContainers", record.key());
        Assert.assertEquals("AreAwesome", record.value());
    });
}
private void executeJobs(List<MessageHandler> listJob) throws InterruptedException { List<Future<ConsumerRecord<String, byte[]>>> invokeAll = threadPool.invokeAll(listJob); // 检查每个调用的状态,若调用失败则继续进行调用,直到全部调用完成为止 for (Future<ConsumerRecord<String, byte[]>> future : invokeAll) { Future<ConsumerRecord<String, byte[]>> localFuture = future; boolean futureSuccess = false; while (!futureSuccess) { try { // 检测本次的执行结果,若失败则重试 futureSuccess = localFuture.get() == null; if (!futureSuccess) { localFuture = threadPool.submit(new MessageHandler(localFuture.get())); Thread.sleep(1000);//slow down to avoid continues errors harm } } catch (ExecutionException e) { // 设计中,不会抛出异常 throw new RuntimeException("Unexpected,it should not throw Exception", e); } } } }
/**
 * Converts each raw consumer record into a {@code SinkRecord}, runs it through the
 * transformation chain and appends the result to {@code messageBatch}. Records for which
 * the chain returns {@code null} are left out of the batch.
 *
 * @param msgs the raw records returned by the consumer
 */
private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
    for (ConsumerRecord<byte[], byte[]> msg : msgs) {
        log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());

        SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());
        SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());

        SinkRecord transformed = transformationChain.apply(
                new SinkRecord(
                        msg.topic(),
                        msg.partition(),
                        keyAndSchema.schema(),
                        keyAndSchema.value(),
                        valueAndSchema.schema(),
                        valueAndSchema.value(),
                        msg.offset(),
                        ConnectUtils.checkAndConvertTimestamp(msg.timestamp()),
                        msg.timestampType()));
        if (transformed == null) {
            // The transformation chain produced nothing for this record.
            continue;
        }
        messageBatch.add(transformed);
    }
}
private static boolean processMessage(ConsumerRecord<ByteBuffer, ByteBuffer> rawMsg) { int msg = rawMsg.value().getInt(); // 10% of the messages are dropped if (random.nextInt(10) != 0) { // Sleeping up to 2.5 seconds LOG.info("Processing message: " + msg); try { Thread.sleep(random.nextInt(25)*100L); } catch (InterruptedException e) { e.printStackTrace(); } Integer previous = processedMessages.put(msg, msg); if (previous != null) { LOG.warn(String.format("Message %d was already processed!", msg)); } int total = totalProcessed.incrementAndGet(); LOG.info(String.format("Done processing message: %d. Total processed: %d.", msg, total)); return true; } else { LOG.info("Dropping message: " + msg); return false; } }
/**
 * Subscribes to the topic named by the "topic" property, performs a single poll
 * (100 ms timeout) and returns the values of the records received.
 *
 * <p>The consumer is created per call and is always closed, also when subscribing or
 * polling fails.
 *
 * @return the values of the polled records; empty if the poll returned nothing
 */
public List<String> receive() {
    List<String> buffer = new ArrayList<String>();
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties)) {
        consumer.subscribe(Arrays.asList(properties.getProperty("topic")));
        System.err.println("consumer receive------------------");
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record.value());
        }
    }
    return buffer;
}
/**
 * Dispatches an incoming record to the {@code MessageHandler} registered for the
 * record's topic.
 *
 * <p>The record is placed on a {@code KafkaMessageChannel} created with the given
 * acknowledgment, a {@code Message} is built for the handler's message type from the
 * record key, and the handler is invoked with it.
 *
 * @param data the consumed record
 * @param acknowledgment the acknowledgment handed to the message channel
 * @throws RuntimeException if no handler is registered for the record's topic
 */
@Override
public void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment) {
    logger.info(cachingDateFormatter.format(System.currentTimeMillis()) + "-" + data.toString());
    // route by topic
    String topic = data.topic();
    MessageHandler<K, V> messageHandler = messageHandlers.get(topic);
    if (null == messageHandler) {
        // TODO: needs proper handling - no registered MessageHandler was found
        throw new RuntimeException("not found MessagHandler Instance");
    }
    // Resolve the generic type argument at runtime.
    // NOTE(review): assumes the handler's first generic interface is parameterized - confirm.
    Type messageType = ((ParameterizedType) messageHandler.getClass().getGenericInterfaces()[0]).getActualTypeArguments()[0];
    // create MessageChannel , MessageBuilder
    messageChannel = new KafkaMessageChannel(acknowledgment);
    messageChannel.putMessage(data);
    Message message = MessageBuilder.build(messageType, messageChannel).createMessage(data.key());
    messageHandler.handler(message);
}
/**
 * Subscribes to {@code TOPIC}, rewinds every assigned partition to its beginning once
 * and then prints all records forever.
 *
 * <p>The rewind is only marked as done once partitions have actually been assigned, so
 * a first poll that returns before the assignment does not silently skip it. The
 * consumer is closed when the loop is left through an exception.
 *
 * @param args unused
 */
public static void main(String[] args) {
    try (KafkaConsumer<String, String> consumer = KafkaConsumerUtil.createConsumer()) {
        consumer.subscribe(Arrays.asList(TOPIC));
        boolean rewindPending = true;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            if (rewindPending) {
                Set<TopicPartition> assignments = consumer.assignment();
                if (!assignments.isEmpty()) {
                    consumer.seekToBeginning(assignments);
                    rewindPending = false;
                }
            }
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
/**
 * Creates a flow and checks that both the cache consumer and the northbound consumer
 * receive a message with a non-null payload for it.
 */
@Test
public void createFlowCommandBoltTest() throws Exception {
    String flowId = UUID.randomUUID().toString();
    createFlow(flowId);

    // Message seen by the cache consumer: must carry the flow pair.
    ConsumerRecord<String, String> cacheRecord = cacheConsumer.pollMessage();
    assertNotNull(cacheRecord);
    assertNotNull(cacheRecord.value());
    InfoData infoData = objectMapper.readValue(cacheRecord.value(), InfoData.class);
    ImmutablePair<Flow, Flow> flow = ((FlowInfoData) infoData).getPayload();
    assertNotNull(flow);

    // Message seen by the northbound consumer: must carry a flow response.
    ConsumerRecord<String, String> nbRecord = nbConsumer.pollMessage();
    assertNotNull(nbRecord);
    assertNotNull(nbRecord.value());
    InfoMessage infoMessage = objectMapper.readValue(nbRecord.value(), InfoMessage.class);
    FlowResponse response = (FlowResponse) infoMessage.getData();
    assertNotNull(response);
}
@SuppressWarnings("unchecked") private IExpectationSetters<Object> expectOnePoll() { // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of // returning empty data, we return one record. The expectation is that the data will be ignored by the // response behavior specified using the return value of this method. EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( new IAnswer<ConsumerRecords<byte[], byte[]>>() { @Override public ConsumerRecords<byte[], byte[]> answer() throws Throwable { // "Sleep" so time will progress time.sleep(1L); ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>( Collections.singletonMap( new TopicPartition(TOPIC, PARTITION), Arrays.asList( new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, 0L, 0, 0, RAW_KEY, RAW_VALUE) ))); recordsReturned++; return records; } }); EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)); EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)); sinkTask.put(EasyMock.anyObject(Collection.class)); return EasyMock.expectLastCall(); }
/**
 * Issues a dump-flows command and checks that the northbound consumer receives a
 * {@code FlowsResponse} whose payload equals the flows returned by the command.
 * Currently ignored.
 */
@Test
@Ignore
public void dumpFlowsTopologyEngineBoltTest() throws Exception {
    String flowId = UUID.randomUUID().toString();
    List<Flow> expectedFlows = dumpFlowCommand(flowId);

    ConsumerRecord<String, String> nbRecord = nbConsumer.pollMessage();
    assertNotNull(nbRecord);
    assertNotNull(nbRecord.value());

    InfoMessage response = objectMapper.readValue(nbRecord.value(), InfoMessage.class);
    assertNotNull(response);

    FlowsResponse responseData = (FlowsResponse) response.getData();
    assertNotNull(responseData);
    assertEquals(expectedFlows, responseData.getPayload());
}
/**
 * Sends a switch update and checks that the topology-engine consumer receives an
 * {@code InfoMessage} whose data equals the switch that was sent.
 */
@Test
public void cacheReceivesWfmTopologyUpdatesAndSendsToTopologyEngine() throws Exception {
    System.out.println("Network Update Test");
    sendSwitchUpdate(sw);

    ConsumerRecord<String, String> teRecord = teConsumer.pollMessage();
    assertNotNull(teRecord);
    assertNotNull(teRecord.value());

    InfoMessage infoMessage = objectMapper.readValue(teRecord.value(), InfoMessage.class);
    SwitchInfoData receivedSwitch = (SwitchInfoData) infoMessage.getData();
    assertNotNull(receivedSwitch);
    assertEquals(sw, receivedSwitch);
}
/**
 * Builds the processor that dispatches consumed tasks to request servers.
 *
 * <p>For every record the request server is looked up by the record key and asked to
 * serve the record value. Records without a key, records without a matching request
 * server and records whose serving fails with a {@code ServiceException} are logged and
 * skipped. The processor always reports {@code true}.
 *
 * @return a new processor instance
 */
public static Consumer.Processor<String, Task> getProcessor() {
    return new Consumer.Processor<String, Task>() {
        @Override
        protected Boolean process(ConsumerRecords<String, Task> records) {
            for (ConsumerRecord<String, Task> record : records) {
                final String requestType = record.key();
                if (requestType == null) {
                    log.error("Wrong task encountered. Task meta: {}", record);
                    continue;
                }

                final IRequestServer requestServer = provider.getRequestServer(requestType);
                if (requestServer == null) {
                    log.error("Request Server not found for request type: {}", requestType);
                    continue;
                }

                log.info("Request server found: {}", requestServer.getClass());
                try {
                    requestServer.serve(record.value());
                } catch (ServiceException se) {
                    // A failing task must not stop the remaining records of the batch.
                    log.error("Service Exception occurred while serving request. Error: ", se);
                }
            }
            return true;
        }
    };
}
@Test @Ignore // TODO: ignoring on 2018.01.04 - failing in GCP but not Mac - needs troubleshooting public void ctrlDumpHandler() throws Exception { CtrlRequest request = new CtrlRequest( "cachetopology/*", new RequestData("dump"), 1, "dump-correlation-id", Destination.WFM_CTRL); sendMessage(request, topology.getConfig().getKafkaCtrlTopic()); ConsumerRecord<String, String> raw = ctrlConsumer.pollMessage(); assertNotNull(raw); // TODO: FAILED assertNotNull(raw.value()); Message responseGeneric = objectMapper.readValue(raw.value(), Message.class); CtrlResponse response = (CtrlResponse) responseGeneric; ResponseData payload = response.getData(); assertEquals(request.getCorrelationId(), response.getCorrelationId()); assertEquals(CacheTopology.BOLT_ID_CACHE, payload.getComponent()); assertTrue(payload instanceof DumpStateResponseData); }
@Test @Ignore // TODO: ignoring on 2018.01.04 - failing in GCP but not Mac - needs troubleshooting public void ctrlSpecificRoute() throws Exception { CtrlRequest request = new CtrlRequest( "cachetopology/cache", new RequestData("dump"), 1, "route-correlation-id", Destination.WFM_CTRL); sendMessage(request, topology.getConfig().getKafkaCtrlTopic()); ConsumerRecord<String, String> raw = ctrlConsumer.pollMessage(); assertNotNull(raw); // TODO: FAILED assertNotNull(raw.value()); Message responseGeneric = objectMapper.readValue(raw.value(), Message.class); CtrlResponse response = (CtrlResponse) responseGeneric; assertEquals(request.getCorrelationId(), response.getCorrelationId()); }
/**
 * Extracts the event timestamp embedded in the record value.
 *
 * <p>Supported values are {@code PageViewTypedDemo.PageView},
 * {@code PageViewTypedDemo.UserProfile} (their {@code timestamp} field) and
 * {@code JsonNode} (its "timestamp" property).
 *
 * @param record the record to read the timestamp from
 * @param previousTimestamp not used by this extractor
 * @return the timestamp found in the record value
 * @throws IllegalArgumentException if the value is of none of the supported types
 */
@Override
public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
    final Object value = record.value();

    if (value instanceof PageViewTypedDemo.PageView) {
        final PageViewTypedDemo.PageView pageView = (PageViewTypedDemo.PageView) value;
        return pageView.timestamp;
    }
    if (value instanceof PageViewTypedDemo.UserProfile) {
        final PageViewTypedDemo.UserProfile userProfile = (PageViewTypedDemo.UserProfile) value;
        return userProfile.timestamp;
    }
    if (value instanceof JsonNode) {
        final JsonNode json = (JsonNode) value;
        return json.get("timestamp").longValue();
    }

    throw new IllegalArgumentException("JsonTimestampExtractor cannot recognize the record value " + value);
}
private void pollCommunicateOnce(Consumer<ByteBuffer, ByteBuffer> consumer) { ConsumerRecords<ByteBuffer, ByteBuffer> records = consumer.poll(POLL_TIMEOUT); if (records.isEmpty()) { if (!stalled && checkStalled(consumer)) { LOGGER.info("[I] Loader stalled {} / {}", f(leadId), f(localLoaderId)); stalled = true; lead.notifyLocalLoaderStalled(leadId, localLoaderId); } // ToDo: Consider sending empty messages for heartbeat sake. return; } if (stalled) { stalled = false; } MutableLongList committedIds = new LongArrayList(records.count()); for (ConsumerRecord<ByteBuffer, ByteBuffer> record : records) { committedIds.add(record.timestamp()); } committedIds.sortThis(); lead.updateInitialContext(localLoaderId, committedIds); consumer.commitSync(); }
/**
 * Starts the store with four pre-existing records (an old and a new value for each of
 * two keys) and checks that the internal data map holds the newer value of each key.
 */
@Test
public void testReloadOnStart() throws Exception {
    List<ConsumerRecord<byte[], byte[]>> preexisting = Arrays.asList(
            new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0,
                    TP0_KEY.array(), TP0_VALUE.array()),
            new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0,
                    TP1_KEY.array(), TP1_VALUE.array()),
            new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0,
                    TP0_KEY.array(), TP0_VALUE_NEW.array()),
            new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0,
                    TP1_KEY.array(), TP1_VALUE_NEW.array()));

    expectConfigure();
    expectStart(preexisting);
    expectStop();
    PowerMock.replayAll();

    store.configure(DEFAULT_DISTRIBUTED_CONFIG);
    store.start();

    // The later record of each key must have replaced the earlier one.
    HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
    assertEquals(TP0_VALUE_NEW, data.get(TP0_KEY));
    assertEquals(TP1_VALUE_NEW, data.get(TP1_KEY));

    store.stop();
    PowerMock.verifyAll();
}
/**
 * Writes every consumed record as one row into the page builder: each column handler
 * sets the value of its column, then the row is added.
 *
 * @param builder the page builder receiving the rows
 * @param records the consumed records to write
 */
public void setOutputRecords(PageBuilder builder, ConsumerRecords<?, ?> records) {
    for (ConsumerRecord record : records) {
        int column = 0;
        while (column < columnHandler.length) {
            columnHandler[column].setValue(builder, record, column);
            column++;
        }
        builder.addRecord();
    }
}
/**
 * Polls the consumer once, feeds every received record to the state maintainer and
 * flushes the state when flushing is enabled (non-negative interval) and the flush
 * interval has elapsed since the last flush.
 */
void pollAndUpdate() {
    final ConsumerRecords<byte[], byte[]> polled = consumer.poll(pollMs);
    for (final ConsumerRecord<byte[], byte[]> update : polled) {
        stateMaintainer.update(update);
    }

    final long now = time.milliseconds();
    final boolean flushDue = flushInterval >= 0 && now >= lastFlush + flushInterval;
    if (!flushDue) {
        return;
    }
    stateMaintainer.flushState();
    lastFlush = now;
}
/**
 * Delegates to the wrapped consumer and calls {@code buildAndFinishChildSpan} for every
 * record returned before handing the records back to the caller.
 *
 * @param timeout the poll timeout passed to the wrapped consumer
 * @return the records returned by the wrapped consumer
 */
@Override
public ConsumerRecords<K, V> poll(long timeout) {
    final ConsumerRecords<K, V> polled = consumer.poll(timeout);
    polled.forEach(record -> buildAndFinishChildSpan(record));
    return polled;
}
/**
 * Builds a {@code ConsumerRecords} instance holding a single record on partition 0 of
 * topic "testMessage".
 *
 * @param randomID the key of the record
 * @param message the object serialized into the record value
 * @return records containing exactly that one record
 */
private ConsumerRecords<String, byte[]> makeConsumerRecords(String randomID, Serializable message) {
    final byte[] payload = SerializerDeserializer.toBytes(message);
    final ConsumerRecord<String, byte[]> single =
            new ConsumerRecord<>("testMessage", 0, 0, randomID, payload);

    final Map<TopicPartition, List<ConsumerRecord<String, byte[]>>> byPartition = new HashMap<>();
    byPartition.put(new TopicPartition("testMessage", 0), Collections.singletonList(single));

    return new ConsumerRecords<>(byPartition);
}
/**
 * Starts a background consumer on topic "messages" that verifies every received record
 * (a span context can be extracted, value is "test", key matches when given) and counts
 * the latch down per record; then waits up to 30 seconds for the latch to reach zero.
 *
 * <p>The Kafka consumer is closed even when an assertion or the poll fails inside the
 * worker, and the executor is shut down once the wait is over so that its thread can
 * terminate as soon as the worker task finishes.
 *
 * @param latch counted down once per verified record
 * @param key the expected record key, or {@code null} to skip the key check
 * @throws InterruptedException if interrupted while waiting for the latch
 */
private void createConsumer(final CountDownLatch latch, final Integer key) throws InterruptedException {
    ExecutorService executorService = Executors.newSingleThreadExecutor();

    final Map<String, Object> consumerProps = KafkaTestUtils
            .consumerProps("sampleRawConsumer", "false", embeddedKafka);
    consumerProps.put("auto.offset.reset", "earliest");

    executorService.execute(() -> {
        // try-with-resources: the consumer must not leak when an assertion below fails.
        try (KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps)) {
            TracingKafkaConsumer<Integer, String> tracingKafkaConsumer =
                    new TracingKafkaConsumer<>(kafkaConsumer, mockTracer);
            tracingKafkaConsumer.subscribe(Collections.singletonList("messages"));

            while (latch.getCount() > 0) {
                ConsumerRecords<Integer, String> records = tracingKafkaConsumer.poll(100);
                for (ConsumerRecord<Integer, String> record : records) {
                    SpanContext spanContext = TracingKafkaUtils
                            .extractSpanContext(record.headers(), mockTracer);
                    assertNotNull(spanContext);
                    assertEquals("test", record.value());
                    if (key != null) {
                        assertEquals(key, record.key());
                    }
                    tracingKafkaConsumer.commitSync();
                    latch.countDown();
                }
            }
        }
    });

    try {
        assertTrue(latch.await(30, TimeUnit.SECONDS));
    } finally {
        // Does not interrupt the running worker; only lets the pool end once the task is done.
        executorService.shutdown();
    }
}
private Capture<Collection<SinkRecord>> expectPolls(final long pollDelayMs) throws Exception { // Stub out all the consumer stream/iterator responses, which we just want to verify occur, // but don't care about the exact details here. EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer( new IAnswer<ConsumerRecords<byte[], byte[]>>() { @Override public ConsumerRecords<byte[], byte[]> answer() throws Throwable { // "Sleep" so time will progress time.sleep(pollDelayMs); ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>( Collections.singletonMap( new TopicPartition(TOPIC, PARTITION), Arrays.asList( new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, 0L, 0, 0, RAW_KEY, RAW_VALUE) ))); recordsReturned++; return records; } }); EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes(); EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes(); final Capture<SinkRecord> recordCapture = EasyMock.newCapture(); EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture))).andAnswer(new IAnswer<SinkRecord>() { @Override public SinkRecord answer() { return recordCapture.getValue(); } }).anyTimes(); Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL); sinkTask.put(EasyMock.capture(capturedRecords)); EasyMock.expectLastCall().anyTimes(); return capturedRecords; }
@Test public void shouldWriteCheckpointForStandbyReplica() throws Exception { final ProcessorStateManager stateMgr = new ProcessorStateManager( taskId, noPartitions, true, // standby stateDirectory, Collections.singletonMap(persistentStore.name(), persistentStoreTopicName), changelogReader, false); stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); final byte[] bytes = Serdes.Integer().serializer().serialize("", 10); stateMgr.updateStandbyStates(persistentStorePartition, Collections.singletonList( new ConsumerRecord<>(persistentStorePartition.topic(), persistentStorePartition.partition(), 888L, bytes, bytes))); stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap()); final Map<TopicPartition, Long> read = checkpoint.read(); assertThat(read, equalTo(Collections.singletonMap(persistentStorePartition, 889L))); }
public static void verify(final String kafka) { ensureStreamsApplicationDown(kafka); final Map<TopicPartition, Long> committedOffsets = getCommittedOffsets(kafka); final Properties props = new Properties(); props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier"); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT)); try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) { final List<TopicPartition> partitions = getAllPartitions(consumer, "data", "echo", "min", "sum"); consumer.assign(partitions); consumer.seekToBeginning(partitions); final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition = getOutputRecords(consumer, committedOffsets); truncate("data", recordPerTopicPerPartition, committedOffsets); verifyMin(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("min")); verifySum(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("sum")); verifyAllTransactionFinished(consumer, kafka); // do not modify: required test output System.out.println("ALL-RECORDS-DELIVERED"); } catch (final Exception e) { e.printStackTrace(System.err); System.out.println("FAILED"); } }
/**
 * Consumes a tenant command event message.
 *
 * <p>The message value is parsed as a {@code SystemEvent}. For an update-account event
 * the user identified by the event's user key is loaded, updated from the event and
 * saved; a missing user is logged as an error. A message that cannot be parsed is
 * logged and otherwise ignored.
 *
 * @param message the tenant command event message
 */
@Retryable(maxAttemptsExpression = "${application.retry.max-attempts}",
    backoff = @Backoff(delayExpression = "${application.retry.delay}",
        multiplierExpression = "${application.retry.multiplier}"))
public void consumeEvent(ConsumerRecord<String, String> message) {
    MDCUtil.put();
    try {
        log.info("Input message {}", message);
        ObjectMapper mapper = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        mapper.registerModule(new JavaTimeModule());

        SystemEvent event = mapper.readValue(message.value(), SystemEvent.class);
        String command = event.getEventType();
        String userKey = String.valueOf(event.getData().get(Constants.USER_KEY));
        TenantContext.setCurrent(event.getTenantInfo());

        if (!Constants.UPDATE_ACCOUNT_EVENT_TYPE.equalsIgnoreCase(command)) {
            // Only account updates are handled here.
            return;
        }

        log.info("Start to update account for userKey='{}'", userKey);
        User user = userService.getUser(userKey);
        if (user != null) {
            SystemEventMapper.toUser(event, user);
            userService.saveUser(user);
        } else {
            log.error("Failed to update account. User with userKey='{}' does not exists.", userKey);
        }
    } catch (IOException e) {
        log.error("Kafka message has incorrect format ", e);
    } finally {
        MDCUtil.remove();
    }
}
/**
 * An update-account event for an existing user must load that user and save it.
 */
@Test
public void updateProfile() {
    when(userService.getUser(USER_KEY)).thenReturn(new User());
    doNothing().when(userService).saveUser(anyObject());

    ConsumerRecord<String, String> updateEvent =
            new ConsumerRecord<>("test", 0, 0, "", UPDATE_ACCOUNT_EVENT);
    consumer.consumeEvent(updateEvent);

    verify(userService).getUser(USER_KEY);
    verify(userService).saveUser(anyObject());
}
/**
 * An update-account event for an unknown user must look the user up but never save.
 */
@Test
public void updateNotExistsProfile() {
    when(userService.getUser(USER_KEY)).thenReturn(null);
    doNothing().when(userService).saveUser(anyObject());

    ConsumerRecord<String, String> updateEvent =
            new ConsumerRecord<>("test", 0, 0, "", UPDATE_ACCOUNT_EVENT);
    consumer.consumeEvent(updateEvent);

    verify(userService).getUser(USER_KEY);
    verify(userService, times(0)).saveUser(anyObject());
}
public synchronized void writeFileToHadoop(List<ConsumerRecord<String, String>> buffer) { Configuration configuration = new Configuration(); String str; StringBuffer stringBuffer = new StringBuffer(); try { FileSystem fileSystem = FileSystem.get(configuration); Path path = new Path("/user/hive/output/data.dat"); FSDataOutputStream fsDataOutputStream = fileSystem.create(path); //fileWriter = new FileWriter(file,false); //printWriter = new PrintWriter(fileWriter); for (int i = 0; i < buffer.size(); i++) { str = buffer.get(i).value() + "\t" + buffer.get(i).value() + "\n"; stringBuffer.append(str); //printWriter.println(buffer.get(i).value() + "\t" + buffer.get(i).value()); } fsDataOutputStream.write(stringBuffer.toString().getBytes(),0,stringBuffer.toString().getBytes().length); fsDataOutputStream.flush(); fsDataOutputStream.close(); stringBuffer.delete(0,stringBuffer.length()); insertIntoHive();//存入hive中 //printWriter.flush(); } catch (IOException e) { } }
@Override public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) { onConsumeCount++; if (throwExceptionOnConsume) throw new KafkaException("Injected exception in FilterConsumerInterceptor.onConsume."); // filters out topic/partitions with partition == FILTER_PARTITION Map<TopicPartition, List<ConsumerRecord<K, V>>> recordMap = new HashMap<>(); for (TopicPartition tp : records.partitions()) { if (tp.partition() != filterPartition) recordMap.put(tp, records.records(tp)); } return new ConsumerRecords<K, V>(recordMap); }
/**
 * Updates and returns the {@code alive} flag for the given batch.
 *
 * <p>On the first batch that contains a record, the end offset of that record's
 * partition is looked up and remembered. The flag is {@code true} when an end offset is
 * known and at least one record of the batch has an offset at or beyond it.
 *
 * @param records the batch just polled
 * @return the updated {@code alive} flag
 */
boolean isAlive(ConsumerRecords<?, ?> records) {
    if (endOffset == null) {
        // Determine the end offset from the partition of the first record seen.
        endOffset = recordStream(records)
                .findFirst()
                .map(first -> new TopicPartition(first.topic(), first.partition()))
                .map(partition -> consumer.endOffsets(Collections.singletonList(partition)).get(partition))
                .orElse(null);
    }

    if (endOffset == null) {
        alive = false;
    } else {
        // "Highest offset >= end offset" is the same as "some offset >= end offset".
        alive = recordStream(records)
                .mapToLong(ConsumerRecord::offset)
                .anyMatch(offset -> offset >= endOffset);
    }
    return alive;
}
/**
 * Passes one record to the subscriber, then commits asynchronously and increments the
 * sent counter.
 *
 * <p>When the subscribed type is {@code ConsumerRecord} itself the whole record is
 * emitted, otherwise only its value.
 *
 * @param record the record to hand over
 */
@SuppressWarnings("unchecked")
private void handleRecord(ConsumerRecord<String, T> record) {
    System.out.println("Handling message " + record);

    final boolean emitWholeRecord = type == ConsumerRecord.class;
    final T payload = emitWholeRecord ? (T) record : (T) record.value();
    subscriber.onNext(payload);

    consumer.commitAsync();
    sent.incrementAndGet();
}
/**
 * Consumes topic "produktion" from localhost:9092 in group "k" with auto-commit enabled
 * and prints the partitions, the record count and every record of each non-empty poll.
 *
 * <p>The consumer is closed when the loop ends through an exception, so the group
 * membership is released instead of waiting for the session to time out.
 *
 * @param args unused
 */
public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(GROUP_ID_CONFIG, "k");
    props.put(ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.LongDeserializer");
    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList("produktion"), new LogRebalanceListener());

        while (true) {
            ConsumerRecords<Long, String> records = consumer.poll(1000);
            if (records.count() == 0) {
                continue;
            }

            System.out.print("Partitions: " + records.partitions());
            System.out.println(" Count: " + records.count());

            for (ConsumerRecord<Long, String> record : records) {
                System.out.printf("partition=%d, offset= %d, key= %s, value= %s\n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
        }
    } catch (RuntimeException e) {
        System.out.println("e = " + e);
    } finally {
        System.out.println("Closing!");
    }
}
/**
 * Parses a Maxwell change record and fills the given row from it.
 *
 * <p>Keys are always formatted; the tuple is formatted for inserts and updates; for
 * updates the old keys are formatted as well and cleared again when they equal the new
 * keys.
 *
 * @param change the consumed change record
 * @param row the row to fill
 * @return {@code false} if the record is filtered out, {@code true} if the row was filled
 * @throws BiremeException declared by the contract of this method
 */
@Override
public boolean transform(ConsumerRecord<String, String> change, Row row) throws BiremeException {
    final MaxwellRecord record = new MaxwellRecord(change.value());
    if (filter(record)) {
        return false;
    }

    final Table table = cxt.tablesInfo.get(getMappedTableName(record));

    row.type = record.type;
    row.produceTime = record.produceTime;
    row.originTable = getOriginTableName(record);
    row.mappedTable = getMappedTableName(record);
    row.keys = formatColumns(record, table, table.keyNames, false);

    final boolean isUpdate = row.type == RowType.UPDATE;
    if (row.type == RowType.INSERT || isUpdate) {
        row.tuple = formatColumns(record, table, table.columnName, false);
    }

    if (isUpdate) {
        row.oldKeys = formatColumns(record, table, table.keyNames, true);
        // Unchanged keys need no separate old-key handling.
        if (row.keys.equals(row.oldKeys)) {
            row.oldKeys = null;
        }
    }

    return true;
}
/**
 * With the EOS configuration, a transaction must be in flight on the producer after the
 * task has processed a record and committed.
 */
@Test
public void shouldStartNewTransactionOnCommitIfEosEnabled() throws Exception {
    final MockProducer mockProducer = new MockProducer();
    task = new StreamTask(
            taskId00, applicationId, partitions, topology, consumer, changelogReader,
            eosConfig, streamsMetrics, stateDirectory, null, time, mockProducer);

    task.addRecords(
            partition1,
            Collections.singletonList(
                    new ConsumerRecord<>(
                            partition1.topic(), partition1.partition(), 0, 0L,
                            TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));

    task.process();
    task.commit();

    assertTrue(mockProducer.transactionInFlight());
}
/**
 * Blocks until records are available, hands the value of every record to the event
 * consumer and then commits the offsets synchronously.
 */
private void consume() {
    final ConsumerRecords<String, CoffeeEvent> polled = consumer.poll(Long.MAX_VALUE);
    polled.forEach(record -> eventConsumer.accept(record.value()));
    consumer.commitSync();
}
/**
 * Listener for topic "test-topic": logs the value of every record that has one at
 * debug level.
 *
 * @param record the consumed record
 */
@KafkaListener(topics = {"test-topic"})
public void listen(ConsumerRecord<?, ?> record) {
    logger.debug("listen1 ");
    final Object message = record.value();
    if (message == null) {
        // Records without a value are not logged.
        return;
    }
    logger.debug("listen1 : {}", message);
}
/**
 * Configures a consumer for the topic named by the runner property
 * "eu.proteus.kafkaTopic", subscribes to it and passes every consumed measurement to
 * the given task, forever.
 *
 * <p>The consumer settings are written into the given properties. The consumer is
 * closed when the loop is left through an exception.
 *
 * @param properties the properties to complete with the consumer settings; must contain
 *        "com.treelogic.proteus.kafka.bootstrapServers"
 * @param task the task invoked for every consumed record
 */
public void doWork(Properties properties, ProteusTask task) {
    // Resolved once; the runner property is not expected to change while consuming.
    final String topicName = ConsumerUtils.getTopicName(runnerProperties.getProperty("eu.proteus.kafkaTopic"));
    topicsList.add(topicName);

    properties.put("bootstrap.servers", properties.get("com.treelogic.proteus.kafka.bootstrapServers"));
    // Must name a deserializer; the explicit instances passed to the constructor below are the ones used.
    properties.put("key.deserializer", IntegerDeserializer.class.getName());
    properties.put("value.deserializer", ProteusSerializer.class.getName());
    properties.put("group.id", "proteus-" + topicName);
    properties.put("max.poll.records", 100);
    properties.put("session.timeout.ms", 60000);
    properties.put("request.timeout.ms", 80000);
    // Was misspelled "fetch.max.wati.ms", which Kafka does not recognise as a setting.
    properties.put("fetch.max.wait.ms", 60000);
    properties.put("auto.offset.reset", "latest");

    kafkaConsumer = new KafkaConsumer<>(properties, new IntegerDeserializer(), new ProteusSerializer());
    try {
        kafkaConsumer.subscribe(topicsList);
        while (true) {
            ConsumerRecords<Integer, Measurement> records = kafkaConsumer.poll(Long.MAX_VALUE);
            for (ConsumerRecord<Integer, Measurement> record : records) {
                logger.info("Task " + this.getClass().getSimpleName() + " doing work for coil "
                        + record.value().getCoilID() + " on topic " + topicName);
                task.doWork(record.key(), record.value(), proteusBucket, topicsList);
            }
        }
    } finally {
        System.out.println("Cerrariamos la ejecución del hilo < "
                + this.runnerProperties.getProperty("eu.proteus.kafkaTopic") + " >");
        // Release the consumer's resources.
        kafkaConsumer.close();
    }
}
/** * Start a consumer that subscribes to all embeddedKafka topics to help with debugging. * Dumps out the key/msg as byte arrays given that the object types may vary */ private void startAllTopicsConsumer(Map<String, Object> consumerProps) throws InterruptedException { ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.execute(() -> { KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerProps, Serdes.ByteArray().deserializer(), Serdes.ByteArray().deserializer()); try { kafkaEmbedded.consumeFromAllEmbeddedTopics(kafkaConsumer); } catch (Exception e) { throw new RuntimeException(String.format("Error subscribing to all embedded topics"), e); } try { while (true) { ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100); // ConsumerRecords<StatEventKey, StatAggregate> records = kafkaConsumer.poll(100); // for (ConsumerRecord<StatEventKey, StatAggregate> record : records) { for (ConsumerRecord<byte[], byte[]> record : records) { LOGGER.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } } finally { kafkaConsumer.close(); } }); }