Example Source Code for the Java Class org.apache.kafka.clients.consumer.ConsumerRecord

Project: testcontainers-java-module-confluent-platform    File: ProducerConsumerTest.java
@Test
public void testProducerConsumer() {
  String containerIpAddress = cp.getContainerIpAddress();
  System.out.println("containerIpAddress = " + containerIpAddress);
  Integer zookeeperPort = zooCp.getMappedPort(2181);
  System.out.println("zookeeperPort = " + zookeeperPort);
  Integer kafkaPort = cp.getMappedPort(9092);
  System.out.println("kafkaPort = " + kafkaPort);

  HelloProducer helloProducer = new HelloProducer();
  helloProducer.createProducer(cp.kafkaUrl());

  HelloConsumer helloConsumer = new HelloConsumer(cp.kafkaUrl());
  helloConsumer.consume();
  Collection<ConsumerRecord> messages = helloConsumer.getReceivedRecords();

  Assert.assertEquals("message consumed", messages.size(), 5);
  messages.forEach(stringStringConsumerRecord -> {
    Assert.assertEquals(stringStringConsumerRecord.key(), "testContainers");
    Assert.assertEquals(stringStringConsumerRecord.value(), "AreAwesome");
  });
}
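HelloProducer and HelloConsumer are helper classes local to the project. A hypothetical sketch of the producer side implied by the assertions above (five records with key "testContainers" and value "AreAwesome"; the topic name "hello" and the method body are assumptions, not the project's actual code):

public void createProducer(String kafkaUrl) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        for (int i = 0; i < 5; i++) {
            // topic name "hello" is assumed for illustration
            producer.send(new ProducerRecord<>("hello", "testContainers", "AreAwesome"));
        }
    }
}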
Project: testcontainers-java-module-confluent-platform    File: KafkaSingleNodeComposeTest.java
@Test
public void testProducerConsumer() {
  String host = environment.getServiceHost("kafka_1", 29092);
  Integer port = environment.getServicePort("kafka_1", 29092);

  HelloProducer helloProducer = new HelloProducer();
  helloProducer.createProducer(host+":"+port);

  HelloConsumer helloConsumer = new HelloConsumer(host+":"+port);
  helloConsumer.consume();
  Collection<ConsumerRecord> messages = helloConsumer.getReceivedRecords();

  Assert.assertEquals("message consumed", messages.size(), 5);
  messages.forEach(stringStringConsumerRecord -> {
    Assert.assertEquals(stringStringConsumerRecord.key(), "testContainers");
    Assert.assertEquals(stringStringConsumerRecord.value(), "AreAwesome");
  });
}
Project: EasyTransaction    File: KafkaEasyTransMsgConsumerImpl.java
private void executeJobs(List<MessageHandler> listJob) throws InterruptedException {
    List<Future<ConsumerRecord<String, byte[]>>> invokeAll = threadPool.invokeAll(listJob);

    // Check the outcome of every invocation; resubmit failed ones until all of them complete successfully
    for (Future<ConsumerRecord<String, byte[]>> future : invokeAll) {
        Future<ConsumerRecord<String, byte[]>> localFuture = future;
        boolean futureSuccess = false;
        while (!futureSuccess) {
            try {
                // Inspect this attempt's result; a null return value means success, otherwise retry
                futureSuccess = localFuture.get() == null;
                if (!futureSuccess) {
                    localFuture = threadPool.submit(new MessageHandler(localFuture.get()));
                    Thread.sleep(1000); // slow down to avoid a tight loop of repeated failures
                }
            } catch (ExecutionException e) {
                // By design, the handler should never throw an exception
                throw new RuntimeException("Unexpected, it should not throw Exception", e);
            }
        }
    }
}
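The loop above relies on a convention visible in the code: a handler's Future yields null on success and the failed record otherwise, so the record can be resubmitted. A minimal sketch of a MessageHandler matching that contract (the process() body is a placeholder, not EasyTransaction's actual handling logic):

class MessageHandler implements Callable<ConsumerRecord<String, byte[]>> {
    private final ConsumerRecord<String, byte[]> record;

    MessageHandler(ConsumerRecord<String, byte[]> record) {
        this.record = record;
    }

    @Override
    public ConsumerRecord<String, byte[]> call() {
        try {
            process(record);  // application-specific handling (placeholder)
            return null;      // null tells executeJobs() this record succeeded
        } catch (Exception e) {
            return record;    // returning the record asks the caller to retry it
        }
    }

    private void process(ConsumerRecord<String, byte[]> record) { /* placeholder */ }
}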
Project: kafka-0.11.0.0-src-with-comment    File: WorkerSinkTask.java
private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
    for (ConsumerRecord<byte[], byte[]> msg : msgs) {
        log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
        SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());
        SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());
        SinkRecord record = new SinkRecord(msg.topic(), msg.partition(),
                keyAndSchema.schema(), keyAndSchema.value(),
                valueAndSchema.schema(), valueAndSchema.value(),
                msg.offset(),
                ConnectUtils.checkAndConvertTimestamp(msg.timestamp()),
                msg.timestampType());
        record = transformationChain.apply(record);
        if (record != null) {
            messageBatch.add(record);
        }
    }
}
Project: kmq    File: StandaloneProcessor.java
private static boolean processMessage(ConsumerRecord<ByteBuffer, ByteBuffer> rawMsg) {
    int msg = rawMsg.value().getInt();
    // 10% of the messages are dropped
    if (random.nextInt(10) != 0) {
        // Sleeping up to 2.5 seconds
        LOG.info("Processing message: " + msg);
        try {
            Thread.sleep(random.nextInt(25)*100L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        Integer previous = processedMessages.put(msg, msg);
        if (previous != null) {
            LOG.warn(String.format("Message %d was already processed!", msg));
        }

        int total = totalProcessed.incrementAndGet();
        LOG.info(String.format("Done processing message: %d. Total processed: %d.", msg, total));

        return true;
    } else {
        LOG.info("Dropping message: " + msg);
        return false;
    }
}
Project: WiFiProbeAnalysis    File: KafkaConsumers.java
public List<String> receive() {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    consumer.subscribe(Arrays.asList(properties.getProperty("topic")));
    List<String> buffer = new ArrayList<String>();
    // Poll once, collect the record values, then close the consumer and return.
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record.value());
    }
    consumer.close();
    return buffer;
}
Project: tankms    File: KafkaMessageListenerAdapter.java
@Override
public void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment) {
    logger.info(cachingDateFormatter.format(System.currentTimeMillis()) + "-" + data.toString());
    // router topic
    String topic = data.topic();

    MessageHandler<K, V> messageHandler = messageHandlers.get(topic);
    if (null == messageHandler) {
        // TODO: handle the case where no MessageHandler is registered for this topic
        throw new RuntimeException("no MessageHandler instance found");
    }
    // Resolve the handler's generic message type at runtime
    Type messageType = ((ParameterizedType) messageHandler.getClass().getGenericInterfaces()[0]).getActualTypeArguments()[0];
    // create the MessageChannel and MessageBuilder
    messageChannel = new KafkaMessageChannel(acknowledgment);
    messageChannel.putMessage(data);
    Message message = MessageBuilder.build(messageType, messageChannel).createMessage(data.key());
    messageHandler.handler(message);
}
Project: post-kafka-rewind-consumer-offset    File: KafkaConsumerFromBeginning.java
public static void main(String[] args) {

        KafkaConsumer<String, String> consumer = KafkaConsumerUtil.createConsumer();
        consumer.subscribe(Arrays.asList(TOPIC));
        boolean flag = true;

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);

            if (flag) {
                Set<TopicPartition> assignments = consumer.assignment();
                assignments.forEach(topicPartition ->
                        consumer.seekToBeginning(
                                Arrays.asList(topicPartition)));
                flag = false;
            }

            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
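The flag-guarded seekToBeginning above is needed because assignment() stays empty until the first poll() completes. A sketch of an equivalent approach (same consumer and TOPIC as above) that rewinds as soon as partitions are handed out, using the standard ConsumerRebalanceListener callback:

consumer.subscribe(Arrays.asList(TOPIC), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // nothing to do before the rebalance
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        consumer.seekToBeginning(partitions); // rewind every newly assigned partition
    }
});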
Project: open-kilda    File: FlowTopologyTest.java
@Test
public void createFlowCommandBoltTest() throws Exception {
    ConsumerRecord<String, String> record;
    String flowId = UUID.randomUUID().toString();

    createFlow(flowId);

    record = cacheConsumer.pollMessage();
    assertNotNull(record);
    assertNotNull(record.value());

    InfoData infoData = objectMapper.readValue(record.value(), InfoData.class);
    ImmutablePair<Flow, Flow> flow = ((FlowInfoData) infoData).getPayload();
    assertNotNull(flow);

    record = nbConsumer.pollMessage();
    assertNotNull(record);
    assertNotNull(record.value());

    InfoMessage infoMessage = objectMapper.readValue(record.value(), InfoMessage.class);
    FlowResponse response = (FlowResponse) infoMessage.getData();
    assertNotNull(response);
}
Project: kafka-0.11.0.0-src-with-comment    File: WorkerSinkTaskThreadedTest.java
@SuppressWarnings("unchecked")
private IExpectationSetters<Object> expectOnePoll() {
    // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of
    // returning empty data, we return one record. The expectation is that the data will be ignored by the
    // response behavior specified using the return value of this method.
    EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
            new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                @Override
                public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
                    // "Sleep" so time will progress
                    time.sleep(1L);
                    ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
                            Collections.singletonMap(
                                    new TopicPartition(TOPIC, PARTITION),
                                    Arrays.asList(
                                            new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, 0L, 0, 0, RAW_KEY, RAW_VALUE)
                                    )));
                    recordsReturned++;
                    return records;
                }
            });
    EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
    EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
    sinkTask.put(EasyMock.anyObject(Collection.class));
    return EasyMock.expectLastCall();
}
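For reference, the positional arguments of the long ConsumerRecord constructor used above map as follows in the 0.11.0 API:

new ConsumerRecord<>(
        TOPIC,                           // topic
        PARTITION,                       // partition
        FIRST_OFFSET + recordsReturned,  // offset
        TIMESTAMP,                       // timestamp
        TIMESTAMP_TYPE,                  // timestamp type (CREATE_TIME or LOG_APPEND_TIME)
        0L,                              // checksum
        0,                               // serialized key size
        0,                               // serialized value size
        RAW_KEY,                         // key
        RAW_VALUE);                      // value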
Project: open-kilda    File: FlowTopologyTest.java
@Test
@Ignore
public void dumpFlowsTopologyEngineBoltTest() throws Exception {
    ConsumerRecord<String, String> nbRecord;
    String flowId = UUID.randomUUID().toString();

    List<Flow> payload = dumpFlowCommand(flowId);

    nbRecord = nbConsumer.pollMessage();
    assertNotNull(nbRecord);
    assertNotNull(nbRecord.value());

    InfoMessage response = objectMapper.readValue(nbRecord.value(), InfoMessage.class);
    assertNotNull(response);

    FlowsResponse responseData = (FlowsResponse) response.getData();
    assertNotNull(responseData);
    assertEquals(payload, responseData.getPayload());
}
Project: open-kilda    File: CacheTopologyTest.java
@Test
public void cacheReceivesWfmTopologyUpdatesAndSendsToTopologyEngine() throws Exception {
    System.out.println("Network Update Test");

    sendSwitchUpdate(sw);

    ConsumerRecord<String, String> record = teConsumer.pollMessage();

    assertNotNull(record);
    assertNotNull(record.value());

    InfoMessage infoMessage = objectMapper.readValue(record.value(), InfoMessage.class);
    SwitchInfoData data = (SwitchInfoData) infoMessage.getData();
    assertNotNull(data);

    assertEquals(sw, data);
}
Project: scalable-task-scheduler    File: RequestConsumers.java
public static Consumer.Processor<String, Task> getProcessor() {
    return new Consumer.Processor<String, Task>() {
        @Override
        protected Boolean process(ConsumerRecords<String, Task> records) {
            for (ConsumerRecord<String, Task> record : records) {
                if (record.key() == null) {
                    log.error("Wrong task encountered. Task meta: {}", record);
                    continue;
                }
                IRequestServer requestServer = provider.getRequestServer(record.key());
                if (requestServer == null) {
                    log.error("Request Server not found for request type: {}", record.key());
                    continue;
                }
                log.info("Request server found: {}", requestServer.getClass());
                try {
                    requestServer.serve(record.value());
                } catch (ServiceException se) {
                    log.error("Service Exception occurred while serving request. Error: ", se);
                    continue;
                }
            }
            return true;
        }
    };
}
Project: open-kilda    File: CacheTopologyTest.java
@Test
@Ignore // TODO: ignoring on 2018.01.04 - failing in GCP but not Mac - needs troubleshooting
public void ctrlDumpHandler() throws Exception {
    CtrlRequest request = new CtrlRequest(
            "cachetopology/*", new RequestData("dump"), 1, "dump-correlation-id", Destination.WFM_CTRL);

    sendMessage(request, topology.getConfig().getKafkaCtrlTopic());

    ConsumerRecord<String, String> raw = ctrlConsumer.pollMessage();

    assertNotNull(raw);   // TODO: FAILED
    assertNotNull(raw.value());

    Message responseGeneric = objectMapper.readValue(raw.value(), Message.class);
    CtrlResponse response = (CtrlResponse) responseGeneric;
    ResponseData payload = response.getData();

    assertEquals(request.getCorrelationId(), response.getCorrelationId());
    assertEquals(CacheTopology.BOLT_ID_CACHE, payload.getComponent());
    assertTrue(payload instanceof DumpStateResponseData);
}
Project: open-kilda    File: CacheTopologyTest.java
@Test
@Ignore // TODO: ignoring on 2018.01.04 - failing in GCP but not Mac - needs troubleshooting
public void ctrlSpecificRoute() throws Exception {
    CtrlRequest request = new CtrlRequest(
            "cachetopology/cache", new RequestData("dump"), 1, "route-correlation-id", Destination.WFM_CTRL);
    sendMessage(request, topology.getConfig().getKafkaCtrlTopic());

    ConsumerRecord<String, String> raw = ctrlConsumer.pollMessage();

    assertNotNull(raw);   // TODO: FAILED
    assertNotNull(raw.value());

    Message responseGeneric = objectMapper.readValue(raw.value(), Message.class);
    CtrlResponse response = (CtrlResponse) responseGeneric;
    assertEquals(request.getCorrelationId(), response.getCorrelationId());
}
Project: kafka-0.11.0.0-src-with-comment    File: JsonTimestampExtractor.java
@Override
public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
    if (record.value() instanceof PageViewTypedDemo.PageView) {
        return ((PageViewTypedDemo.PageView) record.value()).timestamp;
    }

    if (record.value() instanceof PageViewTypedDemo.UserProfile) {
        return ((PageViewTypedDemo.UserProfile) record.value()).timestamp;
    }

    if (record.value() instanceof JsonNode) {
        return ((JsonNode) record.value()).get("timestamp").longValue();
    }

    throw new IllegalArgumentException("JsonTimestampExtractor cannot recognize the record value " + record.value());
}
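A timestamp extractor like this is registered through the streams configuration rather than per record; a sketch, assuming the 0.11+ Streams config key default.timestamp.extractor:

Properties props = new Properties();
// application.id, bootstrap.servers and the other required configs omitted
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);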
Project: Lagerta    File: LocalLeadContextLoader.java
private void pollCommunicateOnce(Consumer<ByteBuffer, ByteBuffer> consumer) {
    ConsumerRecords<ByteBuffer, ByteBuffer> records = consumer.poll(POLL_TIMEOUT);

    if (records.isEmpty()) {
        if (!stalled && checkStalled(consumer)) {
            LOGGER.info("[I] Loader stalled {} / {}", f(leadId), f(localLoaderId));
            stalled = true;
            lead.notifyLocalLoaderStalled(leadId, localLoaderId);
        }
        // ToDo: Consider sending empty messages for heartbeat's sake.
        return;
    }
    if (stalled) {
        stalled = false;
    }
    MutableLongList committedIds = new LongArrayList(records.count());

    for (ConsumerRecord<ByteBuffer, ByteBuffer> record : records) {
        committedIds.add(record.timestamp());
    }
    committedIds.sortThis();
    lead.updateInitialContext(localLoaderId, committedIds);
    consumer.commitSync();
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaOffsetBackingStoreTest.java
@Test
public void testReloadOnStart() throws Exception {
    expectConfigure();
    expectStart(Arrays.asList(
            new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE.array()),
            new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE.array()),
            new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()),
            new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array())
    ));
    expectStop();

    PowerMock.replayAll();

    store.configure(DEFAULT_DISTRIBUTED_CONFIG);
    store.start();
    HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
    assertEquals(TP0_VALUE_NEW, data.get(TP0_KEY));
    assertEquals(TP1_VALUE_NEW, data.get(TP1_KEY));

    store.stop();

    PowerMock.verifyAll();
}
Project: embulk-input-kafka    File: KafkaInputColumns.java
public void setOutputRecords(PageBuilder builder, ConsumerRecords<?, ?> records) {
    for (ConsumerRecord<?, ?> record : records) {
        for(int c = 0; c < columnHandler.length; c++) {
            columnHandler[c].setValue(builder, record, c);
        }
        builder.addRecord();
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: GlobalStreamThread.java
void pollAndUpdate() {
    final ConsumerRecords<byte[], byte[]> received = consumer.poll(pollMs);
    for (ConsumerRecord<byte[], byte[]> record : received) {
        stateMaintainer.update(record);
    }
    final long now = time.milliseconds();
    if (flushInterval >= 0 && now >= lastFlush + flushInterval) {
        stateMaintainer.flushState();
        lastFlush = now;
    }
}
Project: java-kafka-client    File: TracingKafkaConsumer.java
@Override
public ConsumerRecords<K, V> poll(long timeout) {
  ConsumerRecords<K, V> records = consumer.poll(timeout);

  for (ConsumerRecord<K, V> record : records) {
    buildAndFinishChildSpan(record);
  }

  return records;
}
Project: bullet-kafka    File: KafkaSubscriberTest.java
private ConsumerRecords<String, byte[]> makeConsumerRecords(String randomID, Serializable message) {
    ConsumerRecord<String, byte[]> record = new ConsumerRecord<>("testMessage", 0, 0, randomID,
                                                                 SerializerDeserializer.toBytes(message));
    Map<TopicPartition, List<ConsumerRecord<String, byte[]>>> recordMap = new HashMap<>();
    recordMap.put(new TopicPartition("testMessage", 0), Collections.singletonList(record));
    return new ConsumerRecords<>(recordMap);
}
Project: java-kafka-client    File: TracingKafkaTest.java
private void createConsumer(final CountDownLatch latch, final Integer key)
    throws InterruptedException {
  ExecutorService executorService = Executors.newSingleThreadExecutor();

  final Map<String, Object> consumerProps = KafkaTestUtils
      .consumerProps("sampleRawConsumer", "false", embeddedKafka);
  consumerProps.put("auto.offset.reset", "earliest");

  executorService.execute(() -> {
    KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
    TracingKafkaConsumer<Integer, String> tracingKafkaConsumer = new TracingKafkaConsumer<>(
        kafkaConsumer, mockTracer);

    tracingKafkaConsumer.subscribe(Collections.singletonList("messages"));

    while (latch.getCount() > 0) {
      ConsumerRecords<Integer, String> records = tracingKafkaConsumer.poll(100);
      for (ConsumerRecord<Integer, String> record : records) {
        SpanContext spanContext = TracingKafkaUtils
            .extractSpanContext(record.headers(), mockTracer);
        assertNotNull(spanContext);
        assertEquals("test", record.value());
        if (key != null) {
          assertEquals(key, record.key());
        }
        tracingKafkaConsumer.commitSync();
        latch.countDown();
      }
    }
    kafkaConsumer.close();
  });

  assertTrue(latch.await(30, TimeUnit.SECONDS));

}
Project: kafka-0.11.0.0-src-with-comment    File: WorkerSinkTaskThreadedTest.java
private Capture<Collection<SinkRecord>> expectPolls(final long pollDelayMs) throws Exception {
    // Stub out all the consumer stream/iterator responses, which we just want to verify occur,
    // but don't care about the exact details here.
    EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer(
            new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                @Override
                public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
                    // "Sleep" so time will progress
                    time.sleep(pollDelayMs);
                    ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
                            Collections.singletonMap(
                                    new TopicPartition(TOPIC, PARTITION),
                                    Arrays.asList(
                                            new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, 0L, 0, 0, RAW_KEY, RAW_VALUE)
                                    )));
                    recordsReturned++;
                    return records;
                }
            });
    EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes();
    EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes();

    final Capture<SinkRecord> recordCapture = EasyMock.newCapture();
    EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture))).andAnswer(new IAnswer<SinkRecord>() {
        @Override
        public SinkRecord answer() {
            return recordCapture.getValue();
        }
    }).anyTimes();

    Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
    sinkTask.put(EasyMock.capture(capturedRecords));
    EasyMock.expectLastCall().anyTimes();
    return capturedRecords;
}
Project: kafka-0.11.0.0-src-with-comment    File: ProcessorStateManagerTest.java
@Test
public void shouldWriteCheckpointForStandbyReplica() throws Exception {
    final ProcessorStateManager stateMgr = new ProcessorStateManager(
        taskId,
        noPartitions,
        true, // standby
        stateDirectory,
        Collections.singletonMap(persistentStore.name(), persistentStoreTopicName),
        changelogReader,
        false);

    stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
    final byte[] bytes = Serdes.Integer().serializer().serialize("", 10);
    stateMgr.updateStandbyStates(persistentStorePartition,
                                 Collections.singletonList(
                                         new ConsumerRecord<>(persistentStorePartition.topic(),
                                                              persistentStorePartition.partition(),
                                                              888L,
                                                              bytes,
                                                              bytes)));

    stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());

    final Map<TopicPartition, Long> read = checkpoint.read();
    assertThat(read, equalTo(Collections.singletonMap(persistentStorePartition, 889L)));

}
Project: kafka-0.11.0.0-src-with-comment    File: EosTestDriver.java
public static void verify(final String kafka) {
    ensureStreamsApplicationDown(kafka);

    final Map<TopicPartition, Long> committedOffsets = getCommittedOffsets(kafka);

    final Properties props = new Properties();
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));

    try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
        final List<TopicPartition> partitions = getAllPartitions(consumer, "data", "echo", "min", "sum");
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);

        final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition
            = getOutputRecords(consumer, committedOffsets);

        truncate("data", recordPerTopicPerPartition, committedOffsets);

        verifyMin(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("min"));
        verifySum(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("sum"));

        verifyAllTransactionFinished(consumer, kafka);

        // do not modify: required test output
        System.out.println("ALL-RECORDS-DELIVERED");
    } catch (final Exception e) {
        e.printStackTrace(System.err);
        System.out.println("FAILED");
    }
}
Project: xm-uaa    File: SystemQueueConsumer.java
/**
 * Consume tenant command event message.
 * @param message the tenant command event message
 */
@Retryable(maxAttemptsExpression = "${application.retry.max-attempts}",
    backoff = @Backoff(delayExpression = "${application.retry.delay}",
        multiplierExpression = "${application.retry.multiplier}"))
public void consumeEvent(ConsumerRecord<String, String> message) {
    MDCUtil.put();
    try {
        log.info("Input message {}", message);
        ObjectMapper mapper = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        mapper.registerModule(new JavaTimeModule());
        try {
            SystemEvent event = mapper.readValue(message.value(), SystemEvent.class);
            String command = event.getEventType();
            String userKey = String.valueOf(event.getData().get(Constants.USER_KEY));
            TenantContext.setCurrent(event.getTenantInfo());
            if (Constants.UPDATE_ACCOUNT_EVENT_TYPE.equalsIgnoreCase(command)) {
                log.info("Start to update account for userKey='{}'", userKey);
                User user = userService.getUser(userKey);
                if (user == null) {
                    log.error("Failed to update account. User with userKey='{}' does not exists.", userKey);
                } else {
                    SystemEventMapper.toUser(event, user);
                    userService.saveUser(user);
                }
            }
        } catch (IOException e) {
            log.error("Kafka message has incorrect format ", e);
        }
    } finally {
        MDCUtil.remove();
    }
}
Project: xm-uaa    File: SystemQueueConsumerUnitTest.java
@Test
public void updateProfile() {
    when(userService.getUser(USER_KEY)).thenReturn(new User());
    doNothing().when(userService).saveUser(anyObject());
    consumer.consumeEvent(new ConsumerRecord<>("test", 0, 0, "", UPDATE_ACCOUNT_EVENT));

    verify(userService).getUser(USER_KEY);
    verify(userService).saveUser(anyObject());
}
Project: xm-uaa    File: SystemQueueConsumerUnitTest.java
@Test
public void updateNotExistsProfile() {
    when(userService.getUser(USER_KEY)).thenReturn(null);
    doNothing().when(userService).saveUser(anyObject());
    consumer.consumeEvent(new ConsumerRecord<>("test", 0, 0, "", UPDATE_ACCOUNT_EVENT));

    verify(userService).getUser(USER_KEY);
    verify(userService, times(0)).saveUser(anyObject());
}
Project: WiFiProbeAnalysis    File: KafkaConsumerForHive.java
public synchronized void writeFileToHadoop(List<ConsumerRecord<String, String>> buffer) {
        Configuration configuration = new Configuration();
        String str;
        StringBuffer stringBuffer = new StringBuffer();
        try {

            FileSystem fileSystem = FileSystem.get(configuration);
            Path path = new Path("/user/hive/output/data.dat");
            FSDataOutputStream fsDataOutputStream = fileSystem.create(path);

            //fileWriter = new FileWriter(file,false);
            //printWriter = new PrintWriter(fileWriter);
            for (int i = 0; i < buffer.size(); i++) {
                str = buffer.get(i).value() + "\t" + buffer.get(i).value() + "\n";
                stringBuffer.append(str);
                //printWriter.println(buffer.get(i).value()   + "\t" + buffer.get(i).value());
            }
            fsDataOutputStream.write(stringBuffer.toString().getBytes(),0,stringBuffer.toString().getBytes().length);
            fsDataOutputStream.flush();
            fsDataOutputStream.close();
            stringBuffer.delete(0,stringBuffer.length());
            insertIntoHive(); // load the staged file into Hive
            //printWriter.flush();

        } catch (IOException e) {
            // log write failures instead of silently swallowing them
            e.printStackTrace();
        }

    }
Project: kafka-0.11.0.0-src-with-comment    File: ConsumerInterceptorsTest.java
@Override
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
    onConsumeCount++;
    if (throwExceptionOnConsume)
        throw new KafkaException("Injected exception in FilterConsumerInterceptor.onConsume.");

    // filter out topic-partitions whose partition equals filterPartition
    Map<TopicPartition, List<ConsumerRecord<K, V>>> recordMap = new HashMap<>();
    for (TopicPartition tp : records.partitions()) {
        if (tp.partition() != filterPartition)
            recordMap.put(tp, records.records(tp));
    }
    return new ConsumerRecords<K, V>(recordMap);
}
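Interceptors such as this one are attached through the interceptor.classes consumer config; a minimal sketch (the FilterConsumerInterceptor class name is taken from the exception message above):

Properties props = new Properties();
// bootstrap.servers, group.id and the deserializer configs omitted
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, FilterConsumerInterceptor.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);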
Project: Lagerta    File: LeadStateLoader.java
boolean isAlive(ConsumerRecords<?, ?> records) {
    if (endOffset == null) {
        endOffset = recordStream(records)
                .findFirst()
                .map(record -> new TopicPartition(record.topic(), record.partition()))
                .map(tp -> consumer.endOffsets(Collections.singletonList(tp)).get(tp))
                .orElse(null);
    }
    alive = endOffset != null && recordStream(records)
            .map(ConsumerRecord::offset)
            .max(Long::compareTo)
            .map(f -> f >= endOffset)
            .orElse(false);
    return alive;
}
Project: reactive-components    File: KafkaSource.java
@SuppressWarnings("unchecked")
private void handleRecord(ConsumerRecord<String, T> record) {
    System.out.println("Handling message " + record);
    if (type == ConsumerRecord.class) {
        subscriber.onNext((T)record);
    } else {
        subscriber.onNext((T)record.value());
    }
    consumer.commitAsync();
    sent.incrementAndGet();
}
Project: apache-kafka-demos    File: SimpleConsumer.java
public static void main(String[] args) throws InterruptedException {

        Properties props = new Properties();
        props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(GROUP_ID_CONFIG, "k");
        props.put(ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.LongDeserializer");
        props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");


        try {
            KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);

            consumer.subscribe(Arrays.asList("produktion"), new LogRebalanceListener());

            while (true) {

                ConsumerRecords<Long, String> records = consumer.poll(1000);
                if (records.count() == 0)
                    continue;

                System.out.print("Partitions: " + records.partitions());
                System.out.println(" Count: " + records.count());

                for (ConsumerRecord<Long, String> record : records)
                    System.out.printf("partition=%d, offset= %d, key= %s, value= %s\n", record.partition(), record.offset(), record.key(), record.value());

            }
        } catch (RuntimeException e) {
            System.out.println("e = " + e);
        } finally {
            System.out.println("Closing!");
        }
    }
Project: bireme    File: MaxwellPipeLine.java
@Override
public boolean transform(ConsumerRecord<String, String> change, Row row)
    throws BiremeException {
  MaxwellRecord record = new MaxwellRecord(change.value());

  if (filter(record)) {
    return false;
  }

  Table table = cxt.tablesInfo.get(getMappedTableName(record));

  row.type = record.type;
  row.produceTime = record.produceTime;
  row.originTable = getOriginTableName(record);
  row.mappedTable = getMappedTableName(record);
  row.keys = formatColumns(record, table, table.keyNames, false);

  if (row.type == RowType.INSERT || row.type == RowType.UPDATE) {
    row.tuple = formatColumns(record, table, table.columnName, false);
  }

  if (row.type == RowType.UPDATE) {
    row.oldKeys = formatColumns(record, table, table.keyNames, true);

    if (row.keys.equals(row.oldKeys)) {
      row.oldKeys = null;
    }
  }

  return true;
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamTaskTest.java
@Test
public void shouldStartNewTransactionOnCommitIfEosEnabled() throws Exception {
    final MockProducer producer = new MockProducer();
    task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
        eosConfig, streamsMetrics, stateDirectory, null, time, producer);

    task.addRecords(partition1, Collections.singletonList(
        new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
    task.process();

    task.commit();
    assertTrue(producer.transactionInFlight());
}
Project: scalable-coffee-shop    File: EventConsumer.java
private void consume() {
    ConsumerRecords<String, CoffeeEvent> records = consumer.poll(Long.MAX_VALUE);
    for (ConsumerRecord<String, CoffeeEvent> record : records) {
        eventConsumer.accept(record.value());
    }
    consumer.commitSync();
}
Project: push    File: KafkaConsumer.java
@KafkaListener(topics = {"test-topic"})
public void listen(ConsumerRecord<?, ?> record) {
    logger.debug("listen1 ");
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
        Object message = kafkaMessage.get();
        logger.debug("listen1 : {}", message);
    }
}
Project: proteus-consumer-couchbase    File: Runner.java
public void doWork(Properties properties, ProteusTask task) {

        topicsList.add(ConsumerUtils.getTopicName(runnerProperties.getProperty("eu.proteus.kafkaTopic")));
        properties.put("bootstrap.servers", properties.get("com.treelogic.proteus.kafka.bootstrapServers"));
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.deserializer", ProteusSerializer.class.getName());
        properties.put("group.id",
                "proteus-" + ConsumerUtils.getTopicName(runnerProperties.getProperty("eu.proteus.kafkaTopic")));
        properties.put("max.poll.records", 100);
        properties.put("session.timeout.ms", 60000);
        properties.put("request.timeout.ms", 80000);
        properties.put("fetch.max.wati.ms", 60000);
        properties.put("auto.offset.reset", "latest");

        kafkaConsumer = new KafkaConsumer<>(properties, new IntegerDeserializer(), new ProteusSerializer());
        kafkaConsumer.subscribe(topicsList);

        try {
            while (true) {
                ConsumerRecords<Integer, Measurement> records = kafkaConsumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<Integer, Measurement> record : records) {
                    logger.info("Task " + this.getClass().getSimpleName() + " doing work for coil "
                            + record.value().getCoilID() + " on topic "
                            + ConsumerUtils.getTopicName(runnerProperties.getProperty("eu.proteus.kafkaTopic")));
                    task.doWork(record.key(), record.value(), proteusBucket, topicsList);
                }

            }
        } finally {
            System.out.println("Cerrariamos la ejecución del hilo < "
                    + this.runnerProperties.getProperty("eu.proteus.kafkaTopic") + " >");
        }

    }
Project: stroom-stats    File: EmbeddedKafkaIT.java
    /**
     * Start a consumer that subscribes to all embedded Kafka topics to help with debugging.
     * Dumps out the key/msg as byte arrays, given that the object types may vary.
     */
    private void startAllTopicsConsumer(Map<String, Object> consumerProps) throws InterruptedException {

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(() -> {
            KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerProps,
                    Serdes.ByteArray().deserializer(),
                    Serdes.ByteArray().deserializer());
            try {
                kafkaEmbedded.consumeFromAllEmbeddedTopics(kafkaConsumer);
            } catch (Exception e) {
                throw new RuntimeException("Error subscribing to all embedded topics", e);
            }

            try {
                while (true) {
                    ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100);
//                    ConsumerRecords<StatEventKey, StatAggregate> records = kafkaConsumer.poll(100);
//                    for (ConsumerRecord<StatEventKey, StatAggregate> record : records) {
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        LOGGER.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                                record.topic(), record.partition(), record.offset(), record.key(), record.value());
                    }
                }
            } finally {
                kafkaConsumer.close();
            }
        });
    }