Java Class org.apache.kafka.clients.consumer.Consumer Example Source Code
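
The snippets below are collected from open source projects and show how org.apache.kafka.clients.consumer.Consumer instances are created, configured, and driven. For orientation, here is a minimal, self-contained subscribe/poll/commit sketch; the broker address, group id, and topic name are placeholders and are not taken from any of the projects below.

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BasicConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "example-group");             // placeholder group id
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("example-topic"));   // placeholder topic
            while (true) {
                // poll(long) matches the 0.10/0.11-era client API used by most snippets below
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d key=%s value=%s%n",
                            record.offset(), record.key(), record.value());
                }
                // commit the offsets of the records that were just processed
                consumer.commitSync();
            }
        }
    }
}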

Project: nighthawk    File: ListenableConsumerFactoryBean.java
@Override
@SuppressWarnings("unchecked")
public void afterPropertiesSet() throws Exception {
    if (topics == null && topicPatternString == null) {
        throw new IllegalArgumentException("topic info must not be null");
    }
    Assert.notEmpty(configs, "configs must not be empty");
    Assert.notNull(payloadListener, "payloadListener must not be null");
    String valueDeserializerKlass = (String) configs.get("value.deserializer");
    configs.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    Consumer<String, byte[]> consumer = new KafkaConsumer<>(configs);

    Deserializer valueDeserializer = createDeserializer(valueDeserializerKlass);
    valueDeserializer.configure(configs, false);

    if (topics != null) {
        listenableConsumer =
                new ListenableTracingConsumer<>(consumer, Arrays.asList(topics), valueDeserializer);
    } else {
        listenableConsumer =
                new ListenableTracingConsumer<>(consumer, Pattern.compile(topicPatternString), valueDeserializer);
    }
    if (payloadListener != null) {
        listenableConsumer.addListener(payloadListener);
    }
    listenableConsumer.start();
}
Project: DBus    File: DataShardsSplittingSpout.java
private void loadRunningConf(String reloadMsgJson) {
    String notifyEvtName = reloadMsgJson == null ? "loaded" : "reloaded";
    try {
        // Load the configuration stored in ZooKeeper
        this.confMap = FullPullHelper.loadConfProps(zkConnect, topologyId, zkTopoRoot, Constants.ZkTopoConfForFullPull.FULL_PULL_SRC_TOPIC);
        this.MAX_FLOW_THRESHOLD = (Integer) confMap.get(FullPullHelper.RUNNING_CONF_KEY_MAX_FLOW_THRESHOLD);
        LOG.info("MAX_FLOW_THRESHOLD is {} on DataShardsSplittingSpout.loadRunningConf", MAX_FLOW_THRESHOLD);
        this.commonProps = (Properties) confMap.get(FullPullHelper.RUNNING_CONF_KEY_COMMON);
        this.dsName = commonProps.getProperty(Constants.ZkTopoConfForFullPull.DATASOURCE_NAME);
        this.fullPullSrcTopic = commonProps.getProperty(Constants.ZkTopoConfForFullPull.FULL_PULL_SRC_TOPIC);
        this.consumer = (Consumer<String, byte[]>) confMap.get(FullPullHelper.RUNNING_CONF_KEY_CONSUMER);
        this.zkService = (ZkService) confMap.get(FullPullHelper.RUNNING_CONF_KEY_ZK_SERVICE);

        LOG.info("Running Config is " + notifyEvtName + " successfully for DataShardsSplittingSpout!");
    } catch (Exception e) {
        LOG.error("Loading running configuration encountered Exception!", e);
    } finally {
        FullPullHelper.saveReloadStatus(reloadMsgJson, "splitting-spout", true, zkConnect);
    }
}
Project: DBus    File: DataPullingSpout.java
private void loadRunningConf(String reloadMsgJson) {
    try {
        this.confMap = FullPullHelper.loadConfProps(zkConnect, topologyId, zkTopoRoot, Constants.ZkTopoConfForFullPull.FULL_PULL_MEDIANT_TOPIC);
        this.MAX_FLOW_THRESHOLD = (Integer) confMap.get(FullPullHelper.RUNNING_CONF_KEY_MAX_FLOW_THRESHOLD);
        this.commonProps = (Properties) confMap.get(FullPullHelper.RUNNING_CONF_KEY_COMMON);
        this.dsName = commonProps.getProperty(Constants.ZkTopoConfForFullPull.DATASOURCE_NAME);
        this.zkService = (ZkService) confMap.get(FullPullHelper.RUNNING_CONF_KEY_ZK_SERVICE);
        this.consumer =  (Consumer<String, byte[]>)confMap.get(FullPullHelper.RUNNING_CONF_KEY_CONSUMER);

        String notifyEvtName = reloadMsgJson == null ? "loaded" : "reloaded";
        LOG.info("Running Config is " + notifyEvtName + " successfully for DataPullingSpout!");
    } catch (Exception e) {
        LOG.error("Loading running configuration encountered Exception!", e);
        throw e;
    } finally {
        FullPullHelper.saveReloadStatus(reloadMsgJson, "pulling-spout", true, zkConnect);
    }
}
Project: DBus    File: FullPullerPerfChecker.java
/**
 * createConsumer - create a new consumer
 * @return a consumer assigned to the configured topics, positioned at the end or rolled back
 * @throws Exception if the consumer properties cannot be loaded
 */
private Consumer<String, String> createConsumer() throws Exception {
    Properties props = ConfUtils.getProps(CONSUMER_PROPS);
    Consumer<String, String> consumer = new KafkaConsumer<>(props);

    // Assign partition 0 of each topic, then seek to the end or roll back by the configured amount
    List<TopicPartition> pts = topics.stream().map(s -> new TopicPartition(s, 0)).collect(Collectors.toList());
    consumer.assign(pts);
    if (rollBack == 0) {
        consumer.seekToEnd(pts);
    } else {
        for (TopicPartition topicPartition : pts) {
            consumer.seek(topicPartition, consumer.position(topicPartition) - rollBack);
            logger.info("Consumer seeked back " + rollBack + " to: " + consumer.position(topicPartition));
        }
    }
    return consumer;
}
Project: DBus    File: KafkaReader.java
/**
 * createConsumer - create a new consumer
 * @return a consumer assigned to the data topic, positioned at the end or at the requested offset
 * @throws Exception if the consumer properties cannot be loaded
 */
private Consumer<String, String> createConsumer() throws Exception {

    // Assign the data topic partition, then seek to the end or to the requested offset
    TopicPartition dataTopicPartition = new TopicPartition(topicName, 0);
    List<TopicPartition> topics = Arrays.asList(dataTopicPartition);

    Properties props = ConfUtils.getProps(CONSUMER_PROPS);
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.assign(topics);

    if (offset == -1) {
        consumer.seekToEnd(topics);
        logger.info("Consumer seeked to end");
    } else {
        consumer.seek(dataTopicPartition, offset);
        logger.info(String.format("Consumer seeked to offset: %s", consumer.position(dataTopicPartition)));
    }
    return consumer;
}
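A consumer returned by a createConsumer() method like the one above (manually assigned and seeked) is usually drained with a plain poll loop rather than a subscription. The following is a sketch only; the stop condition and record handling are invented for illustration and are not part of the original project.

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

final class SeekedConsumerDrainer {

    /** Reads until three consecutive polls return nothing, then closes the consumer. */
    static void drain(Consumer<String, String> consumer) {
        try {
            int emptyPolls = 0;
            while (emptyPolls < 3) {                              // invented stop condition
                ConsumerRecords<String, String> records = consumer.poll(1000);
                if (records.isEmpty()) {
                    emptyPolls++;
                    continue;
                }
                emptyPolls = 0;
                for (ConsumerRecord<String, String> record : records) {
                    // with assign()/seek() there is no consumer group rebalancing involved
                    System.out.println(record.offset() + " -> " + record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}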
Project: Lagerta    File: PublisherKafkaService.java
public void seekToTransaction(DataRecoveryConfig config, long transactionId, KafkaFactory kafkaFactory,
    String groupId) {
    String topic = config.getLocalTopic();
    Properties consumerProperties = PropertiesUtil.propertiesForGroup(config.getConsumerConfig(), groupId);

    try (Consumer<ByteBuffer, ByteBuffer> consumer = kafkaFactory.consumer(consumerProperties)) {
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        Map<TopicPartition, Long> seekMap = new HashMap<>(partitionInfos.size());

        for (PartitionInfo partitionInfo : partitionInfos) {
            seekMap.put(new TopicPartition(topic, partitionInfo.partition()), transactionId);
        }
        consumer.assign(seekMap.keySet());
        Map<TopicPartition, OffsetAndTimestamp> foundOffsets = consumer.offsetsForTimes(seekMap);
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();

        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : foundOffsets.entrySet()) {
            if (entry.getValue() != null) {
                offsetsToCommit.put(entry.getKey(), new OffsetAndMetadata(entry.getValue().offset()));
            }
        }
        consumer.commitSync(offsetsToCommit);
    }
}
Project: Lagerta    File: LocalLeadContextLoader.java
private void pollCommunicateOnce(Consumer<ByteBuffer, ByteBuffer> consumer) {
    ConsumerRecords<ByteBuffer, ByteBuffer> records = consumer.poll(POLL_TIMEOUT);

    if (records.isEmpty()) {
        if (!stalled && checkStalled(consumer)) {
            LOGGER.info("[I] Loader stalled {} / {}", f(leadId), f(localLoaderId));
            stalled = true;
            lead.notifyLocalLoaderStalled(leadId, localLoaderId);
        }
        // TODO: Consider sending empty messages for heartbeat's sake.
        return;
    }
    if (stalled) {
        stalled = false;
    }
    MutableLongList committedIds = new LongArrayList(records.count());

    for (ConsumerRecord<ByteBuffer, ByteBuffer> record : records) {
        committedIds.add(record.timestamp());
    }
    committedIds.sortThis();
    lead.updateInitialContext(localLoaderId, committedIds);
    consumer.commitSync();
}
Project: Lagerta    File: ReceivedTransactionsListenerImpl.java
@Override
public void execute(ServiceContext ctx) throws Exception {
    active = true;
    receivedIds = new GridConcurrentHashSet<>();

    Properties config = new Properties();
    config.putAll(dataRecoveryConfig.getConsumerConfig());
    config.put(ConsumerConfig.GROUP_ID_CONFIG, ReceivedTransactionsListenerImpl.class.getSimpleName());
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    try (Consumer<ByteBuffer, ByteBuffer> consumer = kafkaFactory.consumer(config)) {
        consumer.subscribe(Arrays.asList(dataRecoveryConfig.getRemoteTopic(), dataRecoveryConfig.getReconciliationTopic()));
        while (active) {
            ConsumerRecords<ByteBuffer, ByteBuffer> poll = consumer.poll(500);
            for (ConsumerRecord<ByteBuffer, ByteBuffer> record : poll) {
                TransactionMetadata metadata = serializer.deserialize(record.key());
                receivedIds.add(metadata.getTransactionId());
            }
            consumer.commitSync();
        }
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: StandbyTask.java
/**
 * Create {@link StandbyTask} with its assigned partitions
 *
 * @param id             the ID of this task
 * @param applicationId  the ID of the stream processing application
 * @param partitions     the collection of assigned {@link TopicPartition}
 * @param topology       the instance of {@link ProcessorTopology}
 * @param consumer       the instance of {@link Consumer}
 * @param config         the {@link StreamsConfig} specified by the user
 * @param metrics        the {@link StreamsMetrics} created by the thread
 * @param stateDirectory the {@link StateDirectory} created by the thread
 */
StandbyTask(final TaskId id,
            final String applicationId,
            final Collection<TopicPartition> partitions,
            final ProcessorTopology topology,
            final Consumer<byte[], byte[]> consumer,
            final ChangelogReader changelogReader,
            final StreamsConfig config,
            final StreamsMetrics metrics,
            final StateDirectory stateDirectory) {
    super(id, applicationId, partitions, topology, consumer, changelogReader, true, stateDirectory, null, config);

    // initialize the topology with its own context
    processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics);

    log.debug("{} Initializing", logPrefix);
    initializeStateStores();
    processorContext.initialized();
    checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
}
Project: kafka-0.11.0.0-src-with-comment    File: GlobalStreamThread.java
public GlobalStreamThread(final ProcessorTopology topology,
                          final StreamsConfig config,
                          final Consumer<byte[], byte[]> globalConsumer,
                          final StateDirectory stateDirectory,
                          final Metrics metrics,
                          final Time time,
                          final String threadClientId) {
    super(threadClientId);
    this.time = time;
    this.config = config;
    this.topology = topology;
    this.consumer = globalConsumer;
    this.stateDirectory = stateDirectory;
    long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
            (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + 1));
    this.streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId, Collections.singletonMap("client-id", threadClientId));
    this.cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics);
}
Project: kafka-0.11.0.0-src-with-comment    File: IntegrationTestUtils.java
/**
 * Wait until enough data (key-value records) has been consumed.
 *
 * @param consumerConfig     Kafka Consumer configuration
 * @param topic              Topic to consume from
 * @param expectedNumRecords Minimum number of expected records
 * @param waitTime           Upper bound in waiting time in milliseconds
 * @return All the records consumed
 * @throws InterruptedException
 * @throws AssertionError       if the given wait time elapses
 */
public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
                                                                              final String topic,
                                                                              final int expectedNumRecords,
                                                                              final long waitTime) throws InterruptedException {
    final List<KeyValue<K, V>> accumData = new ArrayList<>();
    try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
        final TestCondition valuesRead = new TestCondition() {
            @Override
            public boolean conditionMet() {
                final List<KeyValue<K, V>> readData =
                    readKeyValues(topic, consumer, waitTime, expectedNumRecords);
                accumData.addAll(readData);
                return accumData.size() >= expectedNumRecords;
            }
        };
        final String conditionDetails =
            "Expecting " + expectedNumRecords + " records from topic " + topic +
                " while only received " + accumData.size() + ": " + accumData;
        TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
    }
    return accumData;
}
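A test would typically call this helper with an ordinary consumer configuration; a sketch of such a call follows. The broker address, topic name, expected record count, and deserializer choice are placeholders, not taken from the original test.

// Hypothetical test usage of waitUntilMinKeyValueRecordsReceived (all values are placeholders).
final Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "integration-test");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

final List<KeyValue<String, String>> received =
    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, "output-topic", 5, 30000L);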
Project: kafka-0.11.0.0-src-with-comment    File: IntegrationTestUtils.java
/**
 * Wait until enough data (value records) has been consumed.
 *
 * @param consumerConfig     Kafka Consumer configuration
 * @param topic              Topic to consume from
 * @param expectedNumRecords Minimum number of expected records
 * @param waitTime           Upper bound in waiting time in milliseconds
 * @return All the records consumed
 * @throws InterruptedException
 * @throws AssertionError       if the given wait time elapses
 */
public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig,
                                                            final String topic,
                                                            final int expectedNumRecords,
                                                            final long waitTime) throws InterruptedException {
    final List<V> accumData = new ArrayList<>();
    try (final Consumer<Object, V> consumer = createConsumer(consumerConfig)) {
        final TestCondition valuesRead = new TestCondition() {
            @Override
            public boolean conditionMet() {
                final List<V> readData =
                    readValues(topic, consumer, waitTime, expectedNumRecords);
                accumData.addAll(readData);
                return accumData.size() >= expectedNumRecords;
            }
        };
        final String conditionDetails =
            "Expecting " + expectedNumRecords + " records from topic " + topic +
                " while only received " + accumData.size() + ": " + accumData;
        TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
    }
    return accumData;
}
Project: kafka-0.11.0.0-src-with-comment    File: IntegrationTestUtils.java
/**
 * Returns up to `maxMessages` by reading via the provided consumer (the topic(s) to read from
 * are already configured in the consumer).
 *
 * @param topic          Kafka topic to read messages from
 * @param consumer       Kafka consumer
 * @param waitTime       Maximum wait time in milliseconds
 * @param maxMessages    Maximum number of messages to read via the consumer
 * @return The KeyValue elements retrieved via the consumer
 */
private static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic,
    final Consumer<K, V> consumer, final long waitTime, final int maxMessages) {
    final List<KeyValue<K, V>> consumedValues;
    consumer.subscribe(Collections.singletonList(topic));
    final int pollIntervalMs = 100;
    consumedValues = new ArrayList<>();
    int totalPollTimeMs = 0;
    while (totalPollTimeMs < waitTime &&
        continueConsuming(consumedValues.size(), maxMessages)) {
        totalPollTimeMs += pollIntervalMs;
        final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
        for (final ConsumerRecord<K, V> record : records) {
            consumedValues.add(new KeyValue<>(record.key(), record.value()));
        }
    }
    return consumedValues;
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamThreadTest.java
TestStreamTask(final TaskId id,
               final String applicationId,
               final Collection<TopicPartition> partitions,
               final ProcessorTopology topology,
               final Consumer<byte[], byte[]> consumer,
               final Producer<byte[], byte[]> producer,
               final Consumer<byte[], byte[]> restoreConsumer,
               final StreamsConfig config,
               final StreamsMetrics metrics,
               final StateDirectory stateDirectory) {
    super(id,
        applicationId,
        partitions,
        topology,
        consumer,
        new StoreChangelogReader(restoreConsumer, Time.SYSTEM, 5000),
        config,
        metrics,
        stateDirectory,
        null,
        new MockTime(),
        producer);
}
Project: rmap    File: KafkaUtils.java
static void commitOffsets(Consumer<?, ?> consumer, Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
                          boolean async) {

    if (offsetsToCommit == null || offsetsToCommit.isEmpty()) {
        return;
    }

    OffsetCommitCallback callback = (offsets, exception) -> {
        if (exception != null) {
            LOG.warn("Unable to commit offsets for {} TopicPartition(s) {}: {}",
                    offsets.size(),
                    offsetsAsString(offsets),
                    exception.getMessage(),
                    exception);
        } else {
            LOG.debug("Successfully committed offset(s) for {} TopicPartition(s): {}",
                    offsets.size(), offsetsAsString(offsets));
        }
    };

    if (async) {
        consumer.commitAsync(offsetsToCommit, callback);
    } else {
        consumer.commitSync(offsetsToCommit);
    }
}
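Within the same class, a caller might build the offset map from the consumer's current positions after processing a batch and hand it to this helper; a sketch with invented variable names follows.

// Hypothetical same-package caller of commitOffsets(...) after a batch has been processed.
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition tp : consumer.assignment()) {
    // position() is the offset of the next record to fetch, i.e. last processed offset + 1
    offsetsToCommit.put(tp, new OffsetAndMetadata(consumer.position(tp)));
}
commitOffsets(consumer, offsetsToCommit, true);   // true -> commitAsync with the logging callback above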
Project: rmap    File: SaveOffsetOnRebalanceTest.java
@Test
@SuppressWarnings("serial")
public void testOnPartitionsRevoked() throws Exception {
    String topic = "topic";
    int partition = 0;
    long offset = 21;
    TopicPartition tp = new TopicPartition(topic, partition);
    OffsetAndMetadata commitOffsetMd = new OffsetAndMetadata(offset, null);
    Consumer consumer = mock(Consumer.class);
    OffsetLookup lookup = mock(OffsetLookup.class);
    SaveOffsetOnRebalance underTest = new SaveOffsetOnRebalance(lookup, consumer);

    when(consumer.position(tp)).thenReturn(offset);

    underTest.onPartitionsRevoked(Collections.singleton(tp));

    verify(consumer).position(tp);
    verify(consumer).commitSync(new HashMap(){
            {
                put(tp, commitOffsetMd);
            }
    });
}
Project: rmap    File: SaveOffsetOnRebalanceTest.java
@Test
public void testOnPartitionsAssignedNegativeOffsetLookup() throws Exception {
    long offset = -1;
    String topic = "topic";
    int partition = 0;
    TopicPartition tp = new TopicPartition(topic, partition);

    Consumer consumer = mock(Consumer.class);
    OffsetLookup lookup = mock(OffsetLookup.class);
    SaveOffsetOnRebalance underTest = new SaveOffsetOnRebalance(lookup, consumer);

    when(lookup.lookupOffset(topic, partition, Seek.LATEST)).thenReturn(offset);

    underTest.onPartitionsAssigned(Collections.singleton(tp));

    verify(lookup).lookupOffset(topic, partition, Seek.LATEST);
    if (SaveOffsetOnRebalance.DEFAULT_SEEK_BEHAVIOR == Seek.EARLIEST) {
        verify(consumer).seekToBeginning(Collections.singleton(tp));
    } else {
        verify(consumer).seekToEnd(Collections.singleton(tp));
    }
    verify(consumer).position(tp);
}
Project: rmap    File: SaveOffsetOnRebalanceTest.java
private static void performOffsetLookupTest(long offset) {
    String topic = "topic";
    int partition = 0;
    TopicPartition tp = new TopicPartition(topic, partition);

    Consumer consumer = mock(Consumer.class);
    OffsetLookup lookup = mock(OffsetLookup.class);
    SaveOffsetOnRebalance underTest = new SaveOffsetOnRebalance(lookup, consumer);

    when(lookup.lookupOffset(topic, partition, Seek.LATEST)).thenReturn(offset);

    underTest.onPartitionsAssigned(Collections.singleton(tp));

    verify(lookup).lookupOffset(topic, partition, Seek.LATEST);
    verify(consumer).seek(tp, offset + 1);
    verify(consumer).position(tp);
}
Project: vertx-kafka-client    File: KafkaReadStreamImpl.java
@Override
public KafkaReadStream<K, V> subscribe(Set<String> topics, Handler<AsyncResult<Void>> completionHandler) {

  BiConsumer<Consumer<K, V>, Future<Void>> handler = (consumer, future) -> {
    consumer.subscribe(topics, this.rebalanceListener);
    this.startConsuming();
    if (future != null) {
      future.complete();
    }
  };

  if (this.closed.compareAndSet(true, false)) {
    this.start(handler, completionHandler);
  } else {
    this.submitTask(handler, completionHandler);
  }

  return this;
}
Project: vertx-kafka-client    File: KafkaReadStreamImpl.java
@Override
public KafkaReadStream<K, V> assign(Set<TopicPartition> partitions, Handler<AsyncResult<Void>> completionHandler) {

  BiConsumer<Consumer<K, V>, Future<Void>> handler = (consumer, future) -> {
    consumer.assign(partitions);
    this.startConsuming();
    if (future != null) {
      future.complete();
    }
  };

  if (this.closed.compareAndSet(true, false)) {
    this.start(handler, completionHandler);
  } else {
    this.submitTask(handler, completionHandler);
  }

  return this;
}
Project: incubator-pulsar    File: ConsumerExample.java
public static void main(String[] args) {
    String topic = "persistent://sample/standalone/ns/my-topic";

    Properties props = new Properties();
    props.put("bootstrap.servers", "pulsar://localhost:6650");
    props.put("group.id", "my-subscription-name");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", IntegerDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));

    while (true) {
        ConsumerRecords<Integer, String> records = consumer.poll(100);
        records.forEach(record -> {
            log.info("Received record: {}", record);
        });

        // Commit last offset
        consumer.commitSync();
    }
}
Project: mobclip    File: PropertyPersister.java
@Override
public void run() {
    Consumer<String, Property> consumer = ConsumerFactory.getPropertyConsumer();
    consumer.subscribe(Arrays.asList(Topics.SAVE_PROPERTY));

    try {
        while (true) {
            ConsumerRecords<String, Property> records = consumer.poll(POLL_DELAY);
            LOGGER.log(Level.INFO, "records fetched to persist {0}", records.count());
            for (ConsumerRecord<String, Property> record : records) {
                Property property = record.value();
                propertyService.save(property);
            }
        }
    } catch (Exception e) {
        LOGGER.log(Level.SEVERE, null, e);
    } finally {
        consumer.close();
    }
}
Project: beam    File: ConsumerSpEL.java
/**
 * Look up the offset for the given partition by timestamp.
 * Throws RuntimeException if there are no messages later than timestamp or if this partition
 * does not support timestamp based offset.
 */
@SuppressWarnings("unchecked")
public long offsetForTime(Consumer<?, ?> consumer, TopicPartition topicPartition, Instant time) {

  checkArgument(hasOffsetsForTimes,
      "This Kafka client must support Consumer.offsetsForTimes().");

  Map<TopicPartition, Long> timestampsToSearch =
      ImmutableMap.of(topicPartition, time.getMillis());
  try {
    Map offsetsByTimes = (Map) offsetsForTimesMethod.invoke(consumer, timestampsToSearch);
    Object offsetAndTimestamp = Iterables.getOnlyElement(offsetsByTimes.values());

    if (offsetAndTimestamp == null) {
      throw new RuntimeException("There are no messages has a timestamp that is greater than or "
          + "equals to the target time or the message format version in this partition is "
          + "before 0.10.0, topicPartition is: " + topicPartition);
    } else {
      return (long) offsetGetterMethod.invoke(offsetAndTimestamp);
    }
  } catch (IllegalAccessException | InvocationTargetException e) {
    throw new RuntimeException(e);
  }

}
Project: beam    File: KafkaIO.java
@Override
public PCollection<Void> expand(PCollection<KV<K, V>> input) {

  int numShards = spec.getNumShards();
  if (numShards <= 0) {
    try (Consumer<?, ?> consumer = openConsumer(spec)) {
      numShards = consumer.partitionsFor(spec.getTopic()).size();
      LOG.info("Using {} shards for exactly-once writer, matching number of partitions "
               + "for topic '{}'", numShards, spec.getTopic());
    }
  }
  checkState(numShards > 0, "Could not set number of shards");

  return input
      .apply(Window.<KV<K, V>>into(new GlobalWindows()) // Everything into global window.
                 .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                 .discardingFiredPanes())
      .apply(String.format("Shuffle across %d shards", numShards),
             ParDo.of(new EOSReshard<K, V>(numShards)))
      .apply("Persist sharding", GroupByKey.<Integer, KV<K, V>>create())
      .apply("Assign sequential ids", ParDo.of(new EOSSequencer<K, V>()))
      .apply("Persist ids", GroupByKey.<Integer, KV<Long, KV<K, V>>>create())
      .apply(String.format("Write to Kafka topic '%s'", spec.getTopic()),
             ParDo.of(new KafkaEOWriter<>(spec, input.getCoder())));
}
Project: ksql    File: IntegrationTestUtils.java
/**
 * Wait until enough data (key-value records) has been consumed.
 *
 * @param consumerConfig     Kafka Consumer configuration
 * @param topic              Topic to consume from
 * @param expectedNumRecords Minimum number of expected records
 * @param waitTime           Upper bound in waiting time in milliseconds
 * @return All the records consumed
 * @throws InterruptedException
 * @throws AssertionError       if the given wait time elapses
 */
public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
                                                                              final String topic,
                                                                              final int expectedNumRecords,
                                                                              final long waitTime) throws InterruptedException {
  final List<KeyValue<K, V>> accumData = new ArrayList<>();
  try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
    final TestCondition valuesRead = new TestCondition() {
      @Override
      public boolean conditionMet() {
        final List<KeyValue<K, V>> readData =
                readKeyValues(topic, consumer, waitTime, expectedNumRecords);
        accumData.addAll(readData);
        return accumData.size() >= expectedNumRecords;
      }
    };
    final String conditionDetails =
            "Expecting " + expectedNumRecords + " records from topic " + topic +
                    " while only received " + accumData.size() + ": " + accumData;
    TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
  }
  return accumData;
}
Project: ksql    File: IntegrationTestUtils.java
/**
 * Wait until enough data (value records) has been consumed.
 *
 * @param consumerConfig     Kafka Consumer configuration
 * @param topic              Topic to consume from
 * @param expectedNumRecords Minimum number of expected records
 * @param waitTime           Upper bound in waiting time in milliseconds
 * @return All the records consumed
 * @throws InterruptedException
 * @throws AssertionError       if the given wait time elapses
 */
public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig,
                                                            final String topic,
                                                            final int expectedNumRecords,
                                                            final long waitTime) throws InterruptedException {
  final List<V> accumData = new ArrayList<>();
  try (final Consumer<Object, V> consumer = createConsumer(consumerConfig)) {
    final TestCondition valuesRead = new TestCondition() {
      @Override
      public boolean conditionMet() {
        final List<V> readData =
                readValues(topic, consumer, waitTime, expectedNumRecords);
        accumData.addAll(readData);
        return accumData.size() >= expectedNumRecords;
      }
    };
    final String conditionDetails =
            "Expecting " + expectedNumRecords + " records from topic " + topic +
                    " while only received " + accumData.size() + ": " + accumData;
    TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
  }
  return accumData;
}
Project: ksql    File: IntegrationTestUtils.java
/**
 * Returns up to `maxMessages` by reading via the provided consumer (the topic(s) to read from
 * are already configured in the consumer).
 *
 * @param topic          Kafka topic to read messages from
 * @param consumer       Kafka consumer
 * @param waitTime       Maximum wait time in milliseconds
 * @param maxMessages    Maximum number of messages to read via the consumer
 * @return The KeyValue elements retrieved via the consumer
 */
private static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic,
                                                         final Consumer<K, V> consumer, final long waitTime, final int maxMessages) {
  final List<KeyValue<K, V>> consumedValues;
  consumer.subscribe(Collections.singletonList(topic));
  final int pollIntervalMs = 100;
  consumedValues = new ArrayList<>();
  int totalPollTimeMs = 0;
  while (totalPollTimeMs < waitTime &&
          continueConsuming(consumedValues.size(), maxMessages)) {
    totalPollTimeMs += pollIntervalMs;
    final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
    for (final ConsumerRecord<K, V> record : records) {
      consumedValues.add(new KeyValue<>(record.key(), record.value()));
    }
  }
  return consumedValues;
}
Project: kafka-tests    File: SeekingConsumer.java
@Override
public void run() {
    logger.info("Worker thread started");

    try (Consumer<String, Integer> consumer = new KafkaConsumer<>(configuration, new StringDeserializer(), new IntegerDeserializer())) {
        SeekingConsumerLogic logic = new SeekingConsumerLogic(consumer, stateDao, messagesToChangeState, percentFailureProbability);
        consumer.subscribe(Collections.singletonList(topic), logic);

        while (!finish.get()) {
            ConsumerRecords<String, Integer> records = consumer.poll(pollTimeout.toMillis());

            long startTime = System.nanoTime();
            logic.processMessages(records);
            long duration = System.nanoTime() - startTime;

            logger.debug("Processing of poll batch finished: {} messages, {} ms", records.count(), TimeUnit.NANOSECONDS.toMillis(duration));
        }

        logic.optionallyCommitAllOffsets();
    } catch (Exception e) {
        logger.error("Unexpected exception occurred: {}", e.toString(), e);
    }

    logger.info("Worker thread stopped");
}
Project: incubator-rya    File: KafkaTestUtil.java
/**
 * Create a {@link Consumer} that has a unique group ID and reads everything from a topic that is hosted within an
 * embedded instance of Kafka starting at the earliest point by default.
 *
 * @param kafka - The Kafka rule used to connect to the embedded Kafka instance. (not null)
 * @param keyDeserializerClass - Deserializes the keys. (not null)
 * @param valueDeserializerClass - Deserializes the values. (not null)
 * @return A {@link Consumer} that can be used to read records from a topic.
 */
public static <K, V> Consumer<K, V> fromStartConsumer(
        final KafkaTestInstanceRule kafka,
        final Class<? extends Deserializer<K>> keyDeserializerClass,
        final Class<? extends Deserializer<V>> valueDeserializerClass) {
    requireNonNull(kafka);
    requireNonNull(keyDeserializerClass);
    requireNonNull(valueDeserializerClass);

    final Properties props = kafka.createBootstrapServerConfig();
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass.getName());
    return new KafkaConsumer<>(props);
}
Project: incubator-rya    File: KafkaTestUtil.java
/**
 * Polls a {@link Consumer> until it has either polled too many times without hitting the target number
 * of results, or it hits the target number of results.
 *
 * @param pollMs - How long each poll could take.
 * @param pollIterations - The maximum number of polls that will be attempted.
 * @param targetSize - The number of results to read before stopping.
 * @param consumer - The consumer that will be polled.
 * @return The results that were read frmo the consumer.
 * @throws Exception If the poll failed.
 */
public static <K, V> List<V> pollForResults(
        final int pollMs,
        final int pollIterations,
        final int targetSize,
        final Consumer<K, V> consumer) throws Exception {
    requireNonNull(consumer);

    final List<V> values = new ArrayList<>();

    int i = 0;
    while(values.size() < targetSize && i < pollIterations) {
        for(final ConsumerRecord<K, V> record : consumer.poll(pollMs)) {
            values.add( record.value() );
        }
        i++;
    }

    return values;
}
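The two helpers above are naturally combined in a test: create a from-the-start consumer, subscribe it, and poll for a target number of results. The sketch below assumes a KafkaTestInstanceRule field named kafka, a topic name, and expected counts that are not part of the original code.

// Hypothetical test usage of fromStartConsumer(...) and pollForResults(...).
try (Consumer<String, String> consumer =
        KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, StringDeserializer.class)) {
    consumer.subscribe(Arrays.asList("results-topic"));          // placeholder topic
    // Poll up to 10 times, 500 ms each, until 3 values have been read.
    final List<String> results = KafkaTestUtil.pollForResults(500, 10, 3, consumer);
    assertEquals(3, results.size());
}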
Project: incubator-rya    File: RunQueryCommandIT.java
@Before
public void setup() {
    // Make sure the topic that the change log uses exists.
    final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
    kafka.createTopic(changeLogTopic);

    // Setup the QueryRepository used by the test.
    final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
    final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
    final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
    queryRepo = new InMemoryQueryRepository(changeLog);

    // Initialize the Statements Producer and the Results Consumer.
    stmtProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class);
    resultConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class);
}
Project: incubator-rya    File: DeleteQueryCommandIT.java
/**
 * This test simulates executing many commands, each of which uses its own InMemoryQueryRepository. We need
 * to re-create the repo outside of the command to ensure it has the most up-to-date values inside of it.
 *
 * @param ryaInstance - The rya instance the repository is connected to. (not null)
 * @param createTopic - Set this to true if the topic doesn't exist yet.
 */
private QueryRepository makeQueryRepository(final String ryaInstance, final boolean createTopic) {
    requireNonNull(ryaInstance);

    // Make sure the topic that the change log uses exists.
    final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
    if(createTopic) {
        kafka.createTopic(changeLogTopic);
    }

    // Setup the QueryRepository used by the test.
    final Producer<?, QueryChange> queryProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
    final Consumer<?, QueryChange>queryConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
    final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
    return new InMemoryQueryRepository(changeLog);
}
Project: incubator-rya    File: KafkaQueryChangeLogFactory.java
/**
 * Creates an instance of {@link KafkaQueryChangeLog} using a new {@link Producer} and {@link Consumer}.
 *
 * @param bootstrapServers - Indicates which instance of Kafka that will be connected to. (not null)
 * @param topic - The topic the QueryChangeLog is persisted to. (not null)
 * @return A new instance of {@link KafkaQueryChangeLog}.
 */
public static KafkaQueryChangeLog make(
        final String bootstrapServers,
        final String topic) {
    requireNonNull(bootstrapServers);
    requireNonNull(topic);

    final Properties producerProperties = new Properties();
    producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());

    final Properties consumerProperties = new Properties();
    consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());

    final Producer<?, QueryChange> producer = new KafkaProducer<>(producerProperties);
    final Consumer<?, QueryChange> consumer = new KafkaConsumer<>(consumerProperties);
    return new KafkaQueryChangeLog(producer, consumer, topic);
}
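Using the factory is then a one-liner; the sketch below uses a placeholder bootstrap server and topic and, as in the DeleteQueryCommandIT snippet above, wraps the resulting change log in an InMemoryQueryRepository.

// Hypothetical usage; the bootstrap servers and topic name are placeholders.
final KafkaQueryChangeLog changeLog =
        KafkaQueryChangeLogFactory.make("localhost:9092", "rya-query-change-log");
final QueryRepository queryRepo = new InMemoryQueryRepository(changeLog);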
Project: core-ng-project    File: MessageProcessor.java
private <T> void consume(String topic, ConsumerRecords<String, byte[]> records, JSONReader<T> reader, java.util.function.Consumer<List<T>> consumer) {
    int messageSize = 0;
    List<T> messages = new ArrayList<>();
    for (ConsumerRecord<String, byte[]> record : records.records(topic)) {
        byte[] body = record.value();
        messages.add(reader.fromJSON(body));
        messageSize += body.length;
    }
    if (messages.isEmpty()) return;

    StopWatch watch = new StopWatch();
    try {
        consumer.accept(messages);
    } finally {
        long elapsedTime = watch.elapsedTime();
        logger.info("consume messages, topic={}, count={}, size={}, elapsedTime={}", topic, messages.size(), messageSize, elapsedTime);
    }
}
Project: nakadi    File: NakadiKafkaConsumer.java
public NakadiKafkaConsumer(
        final Consumer<byte[], byte[]> kafkaConsumer,
        final List<KafkaCursor> kafkaCursors,
        final Map<TopicPartition, Timeline> timelineMap,
        final long pollTimeout) {
    this.kafkaConsumer = kafkaConsumer;
    this.pollTimeout = pollTimeout;
    this.timelineMap = timelineMap;
    // define topic/partitions to consume from
    final Map<TopicPartition, KafkaCursor> topicCursors = kafkaCursors.stream().collect(
            Collectors.toMap(
                    cursor -> new TopicPartition(cursor.getTopic(), cursor.getPartition()),
                    cursor -> cursor,
                    (cursor1, cursor2) -> cursor2
            ));
    kafkaConsumer.assign(new ArrayList<>(topicCursors.keySet()));
    topicCursors.forEach((topicPartition, cursor) -> kafkaConsumer.seek(topicPartition, cursor.getOffset()));
}
Project: metron    File: KafkaServiceImpl.java
@Override
public KafkaTopic getTopic(final String name) {
  KafkaTopic kafkaTopic = null;
  if (listTopics().contains(name)) {
    try (Consumer<String, String> consumer = kafkaConsumerFactory.createConsumer()) {
      final List<PartitionInfo> partitionInfos = consumer.partitionsFor(name);
      if (partitionInfos.size() > 0) {
        final PartitionInfo partitionInfo = partitionInfos.get(0);
        kafkaTopic = new KafkaTopic();
        kafkaTopic.setName(name);
        kafkaTopic.setNumPartitions(partitionInfos.size());
        kafkaTopic.setReplicationFactor(partitionInfo.replicas().length);
      }
    }
  }
  return kafkaTopic;
}
Project: metron    File: KafkaServiceImpl.java
@Override
public String getSampleMessage(final String topic) {
  String message = null;
  if (listTopics().contains(topic)) {
    try (Consumer<String, String> kafkaConsumer = kafkaConsumerFactory.createConsumer()) {
      kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream()
        .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
        .collect(Collectors.toList()));

      kafkaConsumer.assignment().stream()
        .filter(p -> (kafkaConsumer.position(p) - 1) >= 0)
        .forEach(p -> kafkaConsumer.seek(p, kafkaConsumer.position(p) - 1));

      final ConsumerRecords<String, String> records = kafkaConsumer.poll(KAFKA_CONSUMER_TIMEOUT);
      message = records.isEmpty() ? null : records.iterator().next().value();
      kafkaConsumer.unsubscribe();
    }
  }
  return message;
}
Project: incubator-gobblin    File: KafkaSimpleStreamingSource.java
public static Consumer getKafkaConsumer(Config config) {
  List<String> brokers = ConfigUtils.getStringList(config, ConfigurationKeys.KAFKA_BROKERS);
  Properties props = new Properties();
  props.put("bootstrap.servers", Joiner.on(",").join(brokers));
  props.put("group.id", ConfigUtils.getString(config, ConfigurationKeys.JOB_NAME_KEY, StringUtils.EMPTY));
  props.put("enable.auto.commit", "false");
  Preconditions.checkArgument(config.hasPath(TOPIC_KEY_DESERIALIZER));
  props.put("key.deserializer", config.getString(TOPIC_KEY_DESERIALIZER));
  Preconditions.checkArgument(config.hasPath(TOPIC_VALUE_DESERIALIZER));
  props.put("value.deserializer", config.getString(TOPIC_VALUE_DESERIALIZER));

  // pass along any config scoped under source.kafka.config
  // one use case of this is to pass SSL configuration
  Config scopedConfig = ConfigUtils.getConfigOrEmpty(config, KAFKA_CONSUMER_CONFIG_PREFIX);
  props.putAll(ConfigUtils.configToProperties(scopedConfig));

  Consumer consumer = null;
  try {
    consumer = new KafkaConsumer<>(props);
  } catch (Exception e) {
    LOG.error("Exception when creating Kafka consumer - {}", e);
    throw Throwables.propagate(e);
  }
  return consumer;
}
Project: incubator-gobblin    File: KafkaSimpleStreamingSource.java
@Override
public List<WorkUnit> getWorkunits(SourceState state) {
  Config config = ConfigUtils.propertiesToConfig(state.getProperties());
  Consumer<String, byte[]> consumer = getKafkaConsumer(config);
  LOG.debug("Consumer is {}", consumer);
  String topic = ConfigUtils.getString(config, TOPIC_WHITELIST,
      StringUtils.EMPTY); // TODO: fix this to use the new API when KafkaWrapper is fixed
  List<WorkUnit> workUnits = new ArrayList<WorkUnit>();
  List<PartitionInfo> topicPartitions;
  topicPartitions = consumer.partitionsFor(topic);
  LOG.info("Partition count is {}", topicPartitions.size());
  for (PartitionInfo topicPartition : topicPartitions) {
    Extract extract = this.createExtract(DEFAULT_TABLE_TYPE, DEFAULT_NAMESPACE_NAME, topicPartition.topic());
    LOG.info("Partition info is {}", topicPartition);
    WorkUnit workUnit = WorkUnit.create(extract);
    setTopicNameInState(workUnit, topicPartition.topic());
    workUnit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, topicPartition.topic());
    setPartitionId(workUnit, topicPartition.partition());
    workUnits.add(workUnit);
  }
  return workUnits;
}
Project: cruise-control    File: CruiseControlMetricsReporterTest.java
@Test
public void testReportingMetrics() {
  Properties props = new Properties();
  props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
  props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MetricSerde.class.getName());
  props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testReportingMetrics");
  props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  setSecurityConfigs(props, "consumer");
  Consumer<String, CruiseControlMetric> consumer = new KafkaConsumer<>(props);

  ConsumerRecords<String, CruiseControlMetric> records = ConsumerRecords.empty();
  consumer.subscribe(Collections.singletonList(TOPIC));
  long startMs = System.currentTimeMillis();
  Set<Integer> metricTypes = new HashSet<>();
  while (metricTypes.size() < 16 && System.currentTimeMillis() < startMs + 15000) {
    records = consumer.poll(10);
    for (ConsumerRecord<String, CruiseControlMetric> record : records) {
      metricTypes.add((int) record.value().metricType().id());
    }
  }
  HashSet<Integer> expectedMetricTypes = new HashSet<>(Arrays.asList((int) ALL_TOPIC_BYTES_IN.id(),
                                                                     (int) ALL_TOPIC_BYTES_OUT.id(),
                                                                     (int) TOPIC_BYTES_IN.id(),
                                                                     (int) TOPIC_BYTES_OUT.id(),
                                                                     (int) PARTITION_SIZE.id(),
                                                                     (int) BROKER_CPU_UTIL.id(),
                                                                     (int) ALL_TOPIC_PRODUCE_REQUEST_RATE.id(),
                                                                     (int) ALL_TOPIC_FETCH_REQUEST_RATE.id(),
                                                                     (int) ALL_TOPIC_MESSAGES_IN_PER_SEC.id(),
                                                                     (int) TOPIC_PRODUCE_REQUEST_RATE.id(),
                                                                     (int) TOPIC_FETCH_REQUEST_RATE.id(),
                                                                     (int) TOPIC_MESSAGES_IN_PER_SEC.id(),
                                                                     (int) BROKER_PRODUCE_REQUEST_RATE.id(),
                                                                     (int) BROKER_CONSUMER_FETCH_REQUEST_RATE.id(),
                                                                     (int) BROKER_FOLLOWER_FETCH_REQUEST_RATE.id(),
                                                                     (int) BROKER_REQUEST_HANDLER_AVG_IDLE_PERCENT.id()));
  assertEquals("Expected to see " + expectedMetricTypes + ", but only see " + metricTypes, metricTypes, expectedMetricTypes);
}