@Override
@SuppressWarnings("unchecked")
public void afterPropertiesSet() throws Exception {
    if (topics == null && topicPatternString == null) {
        throw new IllegalArgumentException("topic info must not be null");
    }
    Assert.notEmpty(configs, "configs must not be null");
    Assert.notNull(payloadListener, "payloadListener must not be null");

    String valueDeserializerKlass = (String) configs.get("value.deserializer");
    configs.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    Consumer<String, byte[]> consumer = new KafkaConsumer<>(configs);

    Deserializer valueDeserializer = createDeserializer(valueDeserializerKlass);
    valueDeserializer.configure(configs, false);

    if (topics != null) {
        listenableConsumer =
                new ListenableTracingConsumer<>(consumer, Arrays.asList(topics), valueDeserializer);
    } else {
        listenableConsumer =
                new ListenableTracingConsumer<>(consumer, Pattern.compile(topicPatternString), valueDeserializer);
    }
    if (payloadListener != null) {
        listenableConsumer.addListener(payloadListener);
    }
    listenableConsumer.start();
}
private void loadRunningConf(String reloadMsgJson) {
    String notifyEvtName = reloadMsgJson == null ? "loaded" : "reloaded";
    try {
        // Load the running configuration from ZooKeeper
        this.confMap = FullPullHelper.loadConfProps(zkConnect, topologyId, zkTopoRoot,
                Constants.ZkTopoConfForFullPull.FULL_PULL_SRC_TOPIC);
        this.MAX_FLOW_THRESHOLD = (Integer) confMap.get(FullPullHelper.RUNNING_CONF_KEY_MAX_FLOW_THRESHOLD);
        LOG.info("MAX_FLOW_THRESHOLD is {} on DataShardsSplittingSpout.loadRunningConf", MAX_FLOW_THRESHOLD);
        this.commonProps = (Properties) confMap.get(FullPullHelper.RUNNING_CONF_KEY_COMMON);
        this.dsName = commonProps.getProperty(Constants.ZkTopoConfForFullPull.DATASOURCE_NAME);
        this.fullPullSrcTopic = commonProps.getProperty(Constants.ZkTopoConfForFullPull.FULL_PULL_SRC_TOPIC);
        this.consumer = (Consumer<String, byte[]>) confMap.get(FullPullHelper.RUNNING_CONF_KEY_CONSUMER);
        this.zkService = (ZkService) confMap.get(FullPullHelper.RUNNING_CONF_KEY_ZK_SERVICE);

        LOG.info("Running Config is " + notifyEvtName + " successfully for DataShardsSplittingSpout!");
    } catch (Exception e) {
        LOG.error("Loading running configuration encountered Exception!", e);
    } finally {
        FullPullHelper.saveReloadStatus(reloadMsgJson, "splitting-spout", true, zkConnect);
    }
}
private void loadRunningConf(String reloadMsgJson) {
    try {
        this.confMap = FullPullHelper.loadConfProps(zkConnect, topologyId, zkTopoRoot,
                Constants.ZkTopoConfForFullPull.FULL_PULL_MEDIANT_TOPIC);
        this.MAX_FLOW_THRESHOLD = (Integer) confMap.get(FullPullHelper.RUNNING_CONF_KEY_MAX_FLOW_THRESHOLD);
        this.commonProps = (Properties) confMap.get(FullPullHelper.RUNNING_CONF_KEY_COMMON);
        this.dsName = commonProps.getProperty(Constants.ZkTopoConfForFullPull.DATASOURCE_NAME);
        this.zkService = (ZkService) confMap.get(FullPullHelper.RUNNING_CONF_KEY_ZK_SERVICE);
        this.consumer = (Consumer<String, byte[]>) confMap.get(FullPullHelper.RUNNING_CONF_KEY_CONSUMER);

        String notifyEvtName = reloadMsgJson == null ? "loaded" : "reloaded";
        LOG.info("Running Config is " + notifyEvtName + " successfully for DataPullingSpout!");
    } catch (Exception e) {
        LOG.error("Loading running configuration encountered Exception!", e);
        throw e;
    } finally {
        FullPullHelper.saveReloadStatus(reloadMsgJson, "pulling-spout", true, zkConnect);
    }
}
/**
 * createConsumer - create a new consumer
 * @return
 * @throws Exception
 */
private Consumer<String, String> createConsumer() throws Exception {
    Properties props = ConfUtils.getProps(CONSUMER_PROPS);
    Consumer<String, String> consumer = new KafkaConsumer<>(props);

    // Assign partition 0 of every topic, then either seek to the end or roll back by 'rollBack' offsets
    List<TopicPartition> pts = topics.stream().map(s -> new TopicPartition(s, 0)).collect(Collectors.toList());
    consumer.assign(pts);
    if (rollBack == 0) {
        consumer.seekToEnd(pts);
    } else {
        for (TopicPartition topicPartition : pts) {
            consumer.seek(topicPartition, consumer.position(topicPartition) - rollBack);
            logger.info("Consumer seeked back " + rollBack + " to offset: " + consumer.position(topicPartition));
        }
    }
    return consumer;
}
/**
 * createConsumer - create a new consumer
 * @return
 * @throws Exception
 */
private Consumer<String, String> createConsumer() throws Exception {
    TopicPartition dataTopicPartition = new TopicPartition(topicName, 0);
    List<TopicPartition> topics = Arrays.asList(dataTopicPartition);

    Properties props = ConfUtils.getProps(CONSUMER_PROPS);
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.assign(topics);

    // Seek to the end when no offset is given; otherwise seek to the requested offset
    if (offset == -1) {
        consumer.seekToEnd(topics);
        logger.info("Consumer seek to end");
    } else {
        consumer.seek(dataTopicPartition, offset);
        logger.info(String.format("read changed as offset: %s", consumer.position(dataTopicPartition)));
    }
    return consumer;
}
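// A minimal usage sketch (assumption, not from the source): drain the consumer returned by
// createConsumer() above in a simple poll loop. The 'consumeLoop' name and the AtomicBoolean
// shutdown flag are hypothetical; the poll timeout and log format are placeholders.
private void consumeLoop(AtomicBoolean running) throws Exception {
    try (Consumer<String, String> consumer = createConsumer()) {
        while (running.get()) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                logger.info(String.format("offset: %d, value: %s", record.offset(), record.value()));
            }
        }
    }
}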
public void seekToTransaction(DataRecoveryConfig config, long transactionId, KafkaFactory kafkaFactory,
                              String groupId) {
    String topic = config.getLocalTopic();
    Properties consumerProperties = PropertiesUtil.propertiesForGroup(config.getConsumerConfig(), groupId);

    try (Consumer<ByteBuffer, ByteBuffer> consumer = kafkaFactory.consumer(consumerProperties)) {
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);

        // The transaction id is used as a timestamp: look up, per partition, the earliest offset at or after it.
        Map<TopicPartition, Long> seekMap = new HashMap<>(partitionInfos.size());
        for (PartitionInfo partitionInfo : partitionInfos) {
            seekMap.put(new TopicPartition(topic, partitionInfo.partition()), transactionId);
        }
        consumer.assign(seekMap.keySet());
        Map<TopicPartition, OffsetAndTimestamp> foundOffsets = consumer.offsetsForTimes(seekMap);

        // Commit the resolved offsets for the group so subsequent consumers start from there.
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : foundOffsets.entrySet()) {
            if (entry.getValue() != null) {
                offsetsToCommit.put(entry.getKey(), new OffsetAndMetadata(entry.getValue().offset()));
            }
        }
        consumer.commitSync(offsetsToCommit);
    }
}
private void pollCommunicateOnce(Consumer<ByteBuffer, ByteBuffer> consumer) {
    ConsumerRecords<ByteBuffer, ByteBuffer> records = consumer.poll(POLL_TIMEOUT);

    if (records.isEmpty()) {
        if (!stalled && checkStalled(consumer)) {
            LOGGER.info("[I] Loader stalled {} / {}", f(leadId), f(localLoaderId));
            stalled = true;
            lead.notifyLocalLoaderStalled(leadId, localLoaderId);
        }
        // ToDo: Consider sending empty messages for heartbeat sake.
        return;
    }
    if (stalled) {
        stalled = false;
    }
    MutableLongList committedIds = new LongArrayList(records.count());

    for (ConsumerRecord<ByteBuffer, ByteBuffer> record : records) {
        committedIds.add(record.timestamp());
    }
    committedIds.sortThis();
    lead.updateInitialContext(localLoaderId, committedIds);
    consumer.commitSync();
}
@Override
public void execute(ServiceContext ctx) throws Exception {
    active = true;
    receivedIds = new GridConcurrentHashSet<>();

    Properties config = new Properties();
    config.putAll(dataRecoveryConfig.getConsumerConfig());
    config.put(ConsumerConfig.GROUP_ID_CONFIG, ReceivedTransactionsListenerImpl.class.getSimpleName());
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

    try (Consumer<ByteBuffer, ByteBuffer> consumer = kafkaFactory.consumer(config)) {
        consumer.subscribe(Arrays.asList(dataRecoveryConfig.getRemoteTopic(),
                dataRecoveryConfig.getReconciliationTopic()));
        while (active) {
            ConsumerRecords<ByteBuffer, ByteBuffer> poll = consumer.poll(500);
            for (ConsumerRecord<ByteBuffer, ByteBuffer> record : poll) {
                TransactionMetadata metadata = serializer.deserialize(record.key());
                receivedIds.add(metadata.getTransactionId());
            }
            consumer.commitSync();
        }
    }
}
/**
 * Create {@link StandbyTask} with its assigned partitions
 *
 * @param id             the ID of this task
 * @param applicationId  the ID of the stream processing application
 * @param partitions     the collection of assigned {@link TopicPartition}
 * @param topology       the instance of {@link ProcessorTopology}
 * @param consumer       the instance of {@link Consumer}
 * @param config         the {@link StreamsConfig} specified by the user
 * @param metrics        the {@link StreamsMetrics} created by the thread
 * @param stateDirectory the {@link StateDirectory} created by the thread
 */
StandbyTask(final TaskId id,
            final String applicationId,
            final Collection<TopicPartition> partitions,
            final ProcessorTopology topology,
            final Consumer<byte[], byte[]> consumer,
            final ChangelogReader changelogReader,
            final StreamsConfig config,
            final StreamsMetrics metrics,
            final StateDirectory stateDirectory) {
    super(id, applicationId, partitions, topology, consumer, changelogReader, true, stateDirectory, null, config);

    // initialize the topology with its own context
    processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics);

    log.debug("{} Initializing", logPrefix);
    initializeStateStores();
    processorContext.initialized();
    checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
}
public GlobalStreamThread(final ProcessorTopology topology,
                          final StreamsConfig config,
                          final Consumer<byte[], byte[]> globalConsumer,
                          final StateDirectory stateDirectory,
                          final Metrics metrics,
                          final Time time,
                          final String threadClientId) {
    super(threadClientId);
    this.time = time;
    this.config = config;
    this.topology = topology;
    this.consumer = globalConsumer;
    this.stateDirectory = stateDirectory;
    long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
            (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + 1));
    this.streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId,
            Collections.singletonMap("client-id", threadClientId));
    this.cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics);
}
/**
 * Wait until enough data (key-value records) has been consumed.
 *
 * @param consumerConfig     Kafka Consumer configuration
 * @param topic              Topic to consume from
 * @param expectedNumRecords Minimum number of expected records
 * @param waitTime           Upper bound of waiting time in milliseconds
 * @return All the records consumed
 * @throws InterruptedException
 * @throws AssertionError if the given wait time elapses before the expected number of records is read
 */
public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
                                                                              final String topic,
                                                                              final int expectedNumRecords,
                                                                              final long waitTime) throws InterruptedException {
    final List<KeyValue<K, V>> accumData = new ArrayList<>();
    try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
        final TestCondition valuesRead = new TestCondition() {
            @Override
            public boolean conditionMet() {
                final List<KeyValue<K, V>> readData =
                        readKeyValues(topic, consumer, waitTime, expectedNumRecords);
                accumData.addAll(readData);
                return accumData.size() >= expectedNumRecords;
            }
        };
        final String conditionDetails = "Expecting " + expectedNumRecords + " records from topic " + topic +
                " while only received " + accumData.size() + ": " + accumData;
        TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
    }
    return accumData;
}
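// A minimal usage sketch (assumption, not from the source): inside an integration test, block until at
// least five key/value records arrive on "output-topic", failing after 30 seconds. The broker address,
// group id, topic name, and deserializer choices are placeholders.
final Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "verification-consumer");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
final List<KeyValue<String, String>> received =
        waitUntilMinKeyValueRecordsReceived(consumerConfig, "output-topic", 5, 30000L);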
/**
 * Wait until enough data (value records) has been consumed.
 *
 * @param consumerConfig     Kafka Consumer configuration
 * @param topic              Topic to consume from
 * @param expectedNumRecords Minimum number of expected records
 * @param waitTime           Upper bound of waiting time in milliseconds
 * @return All the records consumed
 * @throws InterruptedException
 * @throws AssertionError if the given wait time elapses before the expected number of records is read
 */
public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig,
                                                            final String topic,
                                                            final int expectedNumRecords,
                                                            final long waitTime) throws InterruptedException {
    final List<V> accumData = new ArrayList<>();
    try (final Consumer<Object, V> consumer = createConsumer(consumerConfig)) {
        final TestCondition valuesRead = new TestCondition() {
            @Override
            public boolean conditionMet() {
                final List<V> readData =
                        readValues(topic, consumer, waitTime, expectedNumRecords);
                accumData.addAll(readData);
                return accumData.size() >= expectedNumRecords;
            }
        };
        final String conditionDetails = "Expecting " + expectedNumRecords + " records from topic " + topic +
                " while only received " + accumData.size() + ": " + accumData;
        TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
    }
    return accumData;
}
/**
 * Returns up to `maxMessages` key-value records read from the given topic; this method subscribes the
 * provided consumer to that topic before polling.
 *
 * @param topic       Kafka topic to read messages from
 * @param consumer    Kafka consumer
 * @param waitTime    Maximum wait time in milliseconds
 * @param maxMessages Maximum number of messages to read via the consumer
 * @return The KeyValue elements retrieved via the consumer
 */
private static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic,
                                                         final Consumer<K, V> consumer,
                                                         final long waitTime,
                                                         final int maxMessages) {
    final List<KeyValue<K, V>> consumedValues;
    consumer.subscribe(Collections.singletonList(topic));
    final int pollIntervalMs = 100;
    consumedValues = new ArrayList<>();
    int totalPollTimeMs = 0;
    while (totalPollTimeMs < waitTime && continueConsuming(consumedValues.size(), maxMessages)) {
        totalPollTimeMs += pollIntervalMs;
        final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
        for (final ConsumerRecord<K, V> record : records) {
            consumedValues.add(new KeyValue<>(record.key(), record.value()));
        }
    }
    return consumedValues;
}
TestStreamTask(final TaskId id,
               final String applicationId,
               final Collection<TopicPartition> partitions,
               final ProcessorTopology topology,
               final Consumer<byte[], byte[]> consumer,
               final Producer<byte[], byte[]> producer,
               final Consumer<byte[], byte[]> restoreConsumer,
               final StreamsConfig config,
               final StreamsMetrics metrics,
               final StateDirectory stateDirectory) {
    super(id,
          applicationId,
          partitions,
          topology,
          consumer,
          new StoreChangelogReader(restoreConsumer, Time.SYSTEM, 5000),
          config,
          metrics,
          stateDirectory,
          null,
          new MockTime(),
          producer);
}
static void commitOffsets(Consumer<?, ?> consumer, Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
                          boolean async) {
    if (offsetsToCommit == null || offsetsToCommit.isEmpty()) {
        return;
    }

    OffsetCommitCallback callback = (offsets, exception) -> {
        if (exception != null) {
            LOG.warn("Unable to commit offsets for {} TopicPartition(s) {}: {}",
                    offsets.size(), offsetsAsString(offsets), exception.getMessage(), exception);
        } else {
            LOG.debug("Successfully committed offset(s) for {} TopicPartition(s): {}",
                    offsets.size(), offsetsAsString(offsets));
        }
    };

    if (async) {
        consumer.commitAsync(offsetsToCommit, callback);
    } else {
        consumer.commitSync(offsetsToCommit);
    }
}
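// A minimal usage sketch (assumption, not from the source): after processing a poll batch, build the
// offset map from the last record consumed per partition (+1, i.e. the next offset to read, following
// Kafka's commit convention) and delegate to commitOffsets(...) above. The 'commitBatch' name is hypothetical.
static void commitBatch(Consumer<?, ?> consumer, ConsumerRecords<?, ?> records, boolean async) {
    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    for (ConsumerRecord<?, ?> record : records) {
        offsetsToCommit.merge(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1),
                (current, candidate) -> candidate.offset() > current.offset() ? candidate : current);
    }
    commitOffsets(consumer, offsetsToCommit, async);
}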
@Test @SuppressWarnings("serial") public void testOnPartitionsRevoked() throws Exception { String topic = "topic"; int partition = 0; long offset = 21; TopicPartition tp = new TopicPartition(topic, partition); OffsetAndMetadata commitOffsetMd = new OffsetAndMetadata(offset, null); Consumer consumer = mock(Consumer.class); OffsetLookup lookup = mock(OffsetLookup.class); SaveOffsetOnRebalance underTest = new SaveOffsetOnRebalance(lookup, consumer); when(consumer.position(tp)).thenReturn(offset); underTest.onPartitionsRevoked(Collections.singleton(tp)); verify(consumer).position(tp); verify(consumer).commitSync(new HashMap(){ { put(tp, commitOffsetMd); } }); }
@Test
public void testOnPartitionsAssignedNegativeOffsetLookup() throws Exception {
    long offset = -1;
    String topic = "topic";
    int partition = 0;
    TopicPartition tp = new TopicPartition(topic, partition);

    Consumer consumer = mock(Consumer.class);
    OffsetLookup lookup = mock(OffsetLookup.class);
    SaveOffsetOnRebalance underTest = new SaveOffsetOnRebalance(lookup, consumer);

    when(lookup.lookupOffset(topic, partition, Seek.LATEST)).thenReturn(offset);

    underTest.onPartitionsAssigned(Collections.singleton(tp));

    verify(lookup).lookupOffset(topic, partition, Seek.LATEST);
    if (SaveOffsetOnRebalance.DEFAULT_SEEK_BEHAVIOR == Seek.EARLIEST) {
        verify(consumer).seekToBeginning(Collections.singleton(tp));
    } else {
        verify(consumer).seekToEnd(Collections.singleton(tp));
    }
    verify(consumer).position(tp);
}
private static void performOffsetLookupTest(long offset) {
    String topic = "topic";
    int partition = 0;
    TopicPartition tp = new TopicPartition(topic, partition);

    Consumer consumer = mock(Consumer.class);
    OffsetLookup lookup = mock(OffsetLookup.class);
    SaveOffsetOnRebalance underTest = new SaveOffsetOnRebalance(lookup, consumer);

    when(lookup.lookupOffset(topic, partition, Seek.LATEST)).thenReturn(offset);

    underTest.onPartitionsAssigned(Collections.singleton(tp));

    verify(lookup).lookupOffset(topic, partition, Seek.LATEST);
    verify(consumer).seek(tp, offset + 1);
    verify(consumer).position(tp);
}
@Override
public KafkaReadStream<K, V> subscribe(Set<String> topics, Handler<AsyncResult<Void>> completionHandler) {
    BiConsumer<Consumer<K, V>, Future<Void>> handler = (consumer, future) -> {
        consumer.subscribe(topics, this.rebalanceListener);
        this.startConsuming();
        if (future != null) {
            future.complete();
        }
    };

    if (this.closed.compareAndSet(true, false)) {
        this.start(handler, completionHandler);
    } else {
        this.submitTask(handler, completionHandler);
    }
    return this;
}
@Override
public KafkaReadStream<K, V> assign(Set<TopicPartition> partitions, Handler<AsyncResult<Void>> completionHandler) {
    BiConsumer<Consumer<K, V>, Future<Void>> handler = (consumer, future) -> {
        consumer.assign(partitions);
        this.startConsuming();
        if (future != null) {
            future.complete();
        }
    };

    if (this.closed.compareAndSet(true, false)) {
        this.start(handler, completionHandler);
    } else {
        this.submitTask(handler, completionHandler);
    }
    return this;
}
public static void main(String[] args) {
    String topic = "persistent://sample/standalone/ns/my-topic";

    Properties props = new Properties();
    props.put("bootstrap.servers", "pulsar://localhost:6650");
    props.put("group.id", "my-subscription-name");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", IntegerDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));

    while (true) {
        ConsumerRecords<Integer, String> records = consumer.poll(100);
        records.forEach(record -> {
            log.info("Received record: {}", record);
        });
        // Commit last offset
        consumer.commitSync();
    }
}
@Override
public void run() {
    Consumer<String, Property> consumer = ConsumerFactory.getPropertyConsumer();
    consumer.subscribe(Arrays.asList(Topics.SAVE_PROPERTY));
    try {
        while (true) {
            ConsumerRecords<String, Property> records = consumer.poll(POLL_DELAY);
            LOGGER.log(Level.INFO, "records fetched to persist {0}", records.count());
            for (ConsumerRecord<String, Property> record : records) {
                Property property = record.value();
                propertyService.save(property);
            }
        }
    } catch (Exception e) {
        LOGGER.log(Level.SEVERE, null, e);
    } finally {
        consumer.close();
    }
}
/**
 * Look up the offset for the given partition by timestamp.
 * Throws RuntimeException if there is no message later than the timestamp or if this partition
 * does not support timestamp-based offsets.
 */
@SuppressWarnings("unchecked")
public long offsetForTime(Consumer<?, ?> consumer, TopicPartition topicPartition, Instant time) {

    checkArgument(hasOffsetsForTimes, "This Kafka Client must support Consumer.OffsetsForTimes().");

    Map<TopicPartition, Long> timestampsToSearch = ImmutableMap.of(topicPartition, time.getMillis());
    try {
        Map offsetsByTimes = (Map) offsetsForTimesMethod.invoke(consumer, timestampsToSearch);
        Object offsetAndTimestamp = Iterables.getOnlyElement(offsetsByTimes.values());

        if (offsetAndTimestamp == null) {
            throw new RuntimeException("There is no message with a timestamp greater than or equal to "
                    + "the target time, or the message format version in this partition is before 0.10.0; "
                    + "topicPartition is: " + topicPartition);
        } else {
            return (long) offsetGetterMethod.invoke(offsetAndTimestamp);
        }
    } catch (IllegalAccessException | InvocationTargetException e) {
        throw new RuntimeException(e);
    }
}
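// Hypothetical usage sketch (the 'seekToTime' name is assumed, not from the source): resolve the
// offset for the first record at or after the given time and position the consumer there before polling.
void seekToTime(Consumer<?, ?> consumer, TopicPartition topicPartition, Instant time) {
    long offset = offsetForTime(consumer, topicPartition, time);
    consumer.seek(topicPartition, offset);
}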
@Override
public PCollection<Void> expand(PCollection<KV<K, V>> input) {

    int numShards = spec.getNumShards();
    if (numShards <= 0) {
        try (Consumer<?, ?> consumer = openConsumer(spec)) {
            numShards = consumer.partitionsFor(spec.getTopic()).size();
            LOG.info("Using {} shards for exactly-once writer, matching number of partitions "
                    + "for topic '{}'", numShards, spec.getTopic());
        }
    }
    checkState(numShards > 0, "Could not set number of shards");

    return input
        .apply(Window.<KV<K, V>>into(new GlobalWindows()) // Everything into global window.
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .discardingFiredPanes())
        .apply(String.format("Shuffle across %d shards", numShards),
            ParDo.of(new EOSReshard<K, V>(numShards)))
        .apply("Persist sharding", GroupByKey.<Integer, KV<K, V>>create())
        .apply("Assign sequential ids", ParDo.of(new EOSSequencer<K, V>()))
        .apply("Persist ids", GroupByKey.<Integer, KV<Long, KV<K, V>>>create())
        .apply(String.format("Write to Kafka topic '%s'", spec.getTopic()),
            ParDo.of(new KafkaEOWriter<>(spec, input.getCoder())));
}
@Override
public void run() {
    logger.info("Worker thread started");
    try (Consumer<String, Integer> consumer = new KafkaConsumer<>(configuration,
            new StringDeserializer(), new IntegerDeserializer())) {
        SeekingConsumerLogic logic = new SeekingConsumerLogic(consumer, stateDao, messagesToChangeState,
                percentFailureProbability);
        consumer.subscribe(Collections.singletonList(topic), logic);

        while (!finish.get()) {
            ConsumerRecords<String, Integer> records = consumer.poll(pollTimeout.toMillis());
            long startTime = System.nanoTime();
            logic.processMessages(records);
            long duration = System.nanoTime() - startTime;
            logger.debug("Processing of poll batch finished: {} messages, {} ms",
                    records.count(), TimeUnit.NANOSECONDS.toMillis(duration));
        }
        logic.optionallyCommitAllOffsets();
    } catch (Exception e) {
        logger.error("Unexpected exception occurred: {}", e.toString(), e);
    }
    logger.info("Worker thread stopped");
}
/**
 * Create a {@link Consumer} that has a unique group ID and reads everything from a topic that is hosted within an
 * embedded instance of Kafka starting at the earliest point by default.
 *
 * @param kafka - The Kafka rule used to connect to the embedded Kafka instance. (not null)
 * @param keyDeserializerClass - Deserializes the keys. (not null)
 * @param valueDeserializerClass - Deserializes the values. (not null)
 * @return A {@link Consumer} that can be used to read records from a topic.
 */
public static <K, V> Consumer<K, V> fromStartConsumer(
        final KafkaTestInstanceRule kafka,
        final Class<? extends Deserializer<K>> keyDeserializerClass,
        final Class<? extends Deserializer<V>> valueDeserializerClass) {
    requireNonNull(kafka);
    requireNonNull(keyDeserializerClass);
    requireNonNull(valueDeserializerClass);

    final Properties props = kafka.createBootstrapServerConfig();
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass.getName());
    return new KafkaConsumer<>(props);
}
/**
 * Polls a {@link Consumer} until it has either polled too many times without hitting the target number
 * of results, or it hits the target number of results.
 *
 * @param pollMs - How long each poll could take.
 * @param pollIterations - The maximum number of polls that will be attempted.
 * @param targetSize - The number of results to read before stopping.
 * @param consumer - The consumer that will be polled.
 * @return The results that were read from the consumer.
 * @throws Exception If the poll failed.
 */
public static <K, V> List<V> pollForResults(
        final int pollMs,
        final int pollIterations,
        final int targetSize,
        final Consumer<K, V> consumer) throws Exception {
    requireNonNull(consumer);

    final List<V> values = new ArrayList<>();

    int i = 0;
    while (values.size() < targetSize && i < pollIterations) {
        for (final ConsumerRecord<K, V> record : consumer.poll(pollMs)) {
            values.add(record.value());
        }
        i++;
    }

    return values;
}
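// A minimal usage sketch (assumption, not from the source): inside a test with access to the embedded
// Kafka rule, combine fromStartConsumer(...) and pollForResults(...) above to read the first three
// values written to a hypothetical "results" topic. Assumes the helpers are statically imported.
try (final Consumer<String, String> consumer =
        fromStartConsumer(kafka, StringDeserializer.class, StringDeserializer.class)) {
    consumer.subscribe(Collections.singleton("results"));
    // Poll up to 10 times, 500 ms each, stopping early once 3 values have been read.
    final List<String> values = pollForResults(500, 10, 3, consumer);
    assertEquals(3, values.size());
}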
@Before
public void setup() {
    // Make sure the topic that the change log uses exists.
    final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
    kafka.createTopic(changeLogTopic);

    // Setup the QueryRepository used by the test.
    final Producer<?, QueryChange> queryProducer =
            KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
    final Consumer<?, QueryChange> queryConsumer =
            KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
    final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
    queryRepo = new InMemoryQueryRepository(changeLog);

    // Initialize the Statements Producer and the Results Consumer.
    stmtProducer = KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class);
    resultConsumer = KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class);
}
/**
 * This test simulates executing many commands, each of which uses its own InMemoryQueryRepository. We need
 * to re-create the repo outside of the command to ensure it has the most up to date values inside of it.
 *
 * @param ryaInstance - The rya instance the repository is connected to. (not null)
 * @param createTopic - Set this to true if the topic doesn't exist yet.
 */
private QueryRepository makeQueryRepository(final String ryaInstance, final boolean createTopic) {
    requireNonNull(ryaInstance);

    // Make sure the topic that the change log uses exists.
    final String changeLogTopic = KafkaTopics.queryChangeLogTopic("" + ryaInstance);
    if (createTopic) {
        kafka.createTopic(changeLogTopic);
    }

    // Setup the QueryRepository used by the test.
    final Producer<?, QueryChange> queryProducer =
            KafkaTestUtil.makeProducer(kafka, StringSerializer.class, QueryChangeSerializer.class);
    final Consumer<?, QueryChange> queryConsumer =
            KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, QueryChangeDeserializer.class);
    final QueryChangeLog changeLog = new KafkaQueryChangeLog(queryProducer, queryConsumer, changeLogTopic);
    return new InMemoryQueryRepository(changeLog);
}
/**
 * Creates an instance of {@link KafkaQueryChangeLog} using a new {@link Producer} and {@link Consumer}.
 *
 * @param bootstrapServers - Indicates which instance of Kafka will be connected to. (not null)
 * @param topic - The topic the QueryChangeLog is persisted to. (not null)
 * @return A new instance of {@link KafkaQueryChangeLog}.
 */
public static KafkaQueryChangeLog make(
        final String bootstrapServers,
        final String topic) {
    requireNonNull(bootstrapServers);
    requireNonNull(topic);

    final Properties producerProperties = new Properties();
    producerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName());

    final Properties consumerProperties = new Properties();
    consumerProperties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName());

    final Producer<?, QueryChange> producer = new KafkaProducer<>(producerProperties);
    final Consumer<?, QueryChange> consumer = new KafkaConsumer<>(consumerProperties);
    return new KafkaQueryChangeLog(producer, consumer, topic);
}
private <T> void consume(String topic, ConsumerRecords<String, byte[]> records, JSONReader<T> reader,
                         java.util.function.Consumer<List<T>> consumer) {
    int messageSize = 0;
    List<T> messages = new ArrayList<>();
    for (ConsumerRecord<String, byte[]> record : records.records(topic)) {
        byte[] body = record.value();
        messages.add(reader.fromJSON(body));
        messageSize += body.length;
    }
    if (messages.isEmpty()) return;

    StopWatch watch = new StopWatch();
    try {
        consumer.accept(messages);
    } finally {
        long elapsedTime = watch.elapsedTime();
        logger.info("consume messages, topic={}, count={}, size={}, elapsedTime={}",
                topic, messages.size(), messageSize, elapsedTime);
    }
}
public NakadiKafkaConsumer(
        final Consumer<byte[], byte[]> kafkaConsumer,
        final List<KafkaCursor> kafkaCursors,
        final Map<TopicPartition, Timeline> timelineMap,
        final long pollTimeout) {
    this.kafkaConsumer = kafkaConsumer;
    this.pollTimeout = pollTimeout;
    this.timelineMap = timelineMap;
    // define topic/partitions to consume from
    final Map<TopicPartition, KafkaCursor> topicCursors = kafkaCursors.stream().collect(
            Collectors.toMap(
                    cursor -> new TopicPartition(cursor.getTopic(), cursor.getPartition()),
                    cursor -> cursor,
                    (cursor1, cursor2) -> cursor2
            ));
    kafkaConsumer.assign(new ArrayList<>(topicCursors.keySet()));
    topicCursors.forEach((topicPartition, cursor) -> kafkaConsumer.seek(topicPartition, cursor.getOffset()));
}
@Override
public KafkaTopic getTopic(final String name) {
    KafkaTopic kafkaTopic = null;
    if (listTopics().contains(name)) {
        try (Consumer<String, String> consumer = kafkaConsumerFactory.createConsumer()) {
            final List<PartitionInfo> partitionInfos = consumer.partitionsFor(name);
            if (partitionInfos.size() > 0) {
                final PartitionInfo partitionInfo = partitionInfos.get(0);
                kafkaTopic = new KafkaTopic();
                kafkaTopic.setName(name);
                kafkaTopic.setNumPartitions(partitionInfos.size());
                kafkaTopic.setReplicationFactor(partitionInfo.replicas().length);
            }
        }
    }
    return kafkaTopic;
}
@Override
public String getSampleMessage(final String topic) {
    String message = null;
    if (listTopics().contains(topic)) {
        try (Consumer<String, String> kafkaConsumer = kafkaConsumerFactory.createConsumer()) {
            kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream()
                    .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
                    .collect(Collectors.toList()));

            // Step each assigned partition back by one record so the poll below returns the latest message.
            kafkaConsumer.assignment().stream()
                    .filter(p -> (kafkaConsumer.position(p) - 1) >= 0)
                    .forEach(p -> kafkaConsumer.seek(p, kafkaConsumer.position(p) - 1));

            final ConsumerRecords<String, String> records = kafkaConsumer.poll(KAFKA_CONSUMER_TIMEOUT);
            message = records.isEmpty() ? null : records.iterator().next().value();
            kafkaConsumer.unsubscribe();
        }
    }
    return message;
}
static public Consumer getKafkaConsumer(Config config) {
    List<String> brokers = ConfigUtils.getStringList(config, ConfigurationKeys.KAFKA_BROKERS);
    Properties props = new Properties();
    props.put("bootstrap.servers", Joiner.on(",").join(brokers));
    props.put("group.id", ConfigUtils.getString(config, ConfigurationKeys.JOB_NAME_KEY, StringUtils.EMPTY));
    props.put("enable.auto.commit", "false");
    Preconditions.checkArgument(config.hasPath(TOPIC_KEY_DESERIALIZER));
    props.put("key.deserializer", config.getString(TOPIC_KEY_DESERIALIZER));
    Preconditions.checkArgument(config.hasPath(TOPIC_VALUE_DESERIALIZER));
    props.put("value.deserializer", config.getString(TOPIC_VALUE_DESERIALIZER));

    // pass along any config scoped under source.kafka.config
    // one use case of this is to pass SSL configuration
    Config scopedConfig = ConfigUtils.getConfigOrEmpty(config, KAFKA_CONSUMER_CONFIG_PREFIX);
    props.putAll(ConfigUtils.configToProperties(scopedConfig));

    Consumer consumer = null;
    try {
        consumer = new KafkaConsumer<>(props);
    } catch (Exception e) {
        LOG.error("Exception when creating Kafka consumer", e);
        throw Throwables.propagate(e);
    }
    return consumer;
}
@Override
public List<WorkUnit> getWorkunits(SourceState state) {
    Config config = ConfigUtils.propertiesToConfig(state.getProperties());
    Consumer<String, byte[]> consumer = getKafkaConsumer(config);
    LOG.debug("Consumer is {}", consumer);

    String topic = ConfigUtils.getString(config, TOPIC_WHITELIST, StringUtils.EMPTY);

    // TODO: fix this to use the new API when KafkaWrapper is fixed
    List<WorkUnit> workUnits = new ArrayList<WorkUnit>();
    List<PartitionInfo> topicPartitions;
    topicPartitions = consumer.partitionsFor(topic);
    LOG.info("Partition count is {}", topicPartitions.size());

    for (PartitionInfo topicPartition : topicPartitions) {
        Extract extract = this.createExtract(DEFAULT_TABLE_TYPE, DEFAULT_NAMESPACE_NAME, topicPartition.topic());
        LOG.info("Partition info is {}", topicPartition);
        WorkUnit workUnit = WorkUnit.create(extract);
        setTopicNameInState(workUnit, topicPartition.topic());
        workUnit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, topicPartition.topic());
        setPartitionId(workUnit, topicPartition.partition());
        workUnits.add(workUnit);
    }
    return workUnits;
}
@Test
public void testReportingMetrics() {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MetricSerde.class.getName());
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testReportingMetrics");
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    setSecurityConfigs(props, "consumer");
    Consumer<String, CruiseControlMetric> consumer = new KafkaConsumer<>(props);

    ConsumerRecords<String, CruiseControlMetric> records = ConsumerRecords.empty();
    consumer.subscribe(Collections.singletonList(TOPIC));
    long startMs = System.currentTimeMillis();
    Set<Integer> metricTypes = new HashSet<>();
    while (metricTypes.size() < 16 && System.currentTimeMillis() < startMs + 15000) {
        records = consumer.poll(10);
        for (ConsumerRecord<String, CruiseControlMetric> record : records) {
            metricTypes.add((int) record.value().metricType().id());
        }
    }
    HashSet<Integer> expectedMetricTypes = new HashSet<>(Arrays.asList(
            (int) ALL_TOPIC_BYTES_IN.id(), (int) ALL_TOPIC_BYTES_OUT.id(),
            (int) TOPIC_BYTES_IN.id(), (int) TOPIC_BYTES_OUT.id(),
            (int) PARTITION_SIZE.id(), (int) BROKER_CPU_UTIL.id(),
            (int) ALL_TOPIC_PRODUCE_REQUEST_RATE.id(), (int) ALL_TOPIC_FETCH_REQUEST_RATE.id(),
            (int) ALL_TOPIC_MESSAGES_IN_PER_SEC.id(), (int) TOPIC_PRODUCE_REQUEST_RATE.id(),
            (int) TOPIC_FETCH_REQUEST_RATE.id(), (int) TOPIC_MESSAGES_IN_PER_SEC.id(),
            (int) BROKER_PRODUCE_REQUEST_RATE.id(), (int) BROKER_CONSUMER_FETCH_REQUEST_RATE.id(),
            (int) BROKER_FOLLOWER_FETCH_REQUEST_RATE.id(), (int) BROKER_REQUEST_HANDLER_AVG_IDLE_PERCENT.id()));
    assertEquals("Expected to see " + expectedMetricTypes + ", but only saw " + metricTypes,
            expectedMetricTypes, metricTypes);
}