@Before public void setup() { this.time = new MockTime(); this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST); this.metadata = new Metadata(0, Long.MAX_VALUE, true); this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); this.client = new MockClient(time, metadata); this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000); this.metrics = new Metrics(time); this.rebalanceListener = new MockRebalanceListener(); this.mockOffsetCommitCallback = new MockCommitCallback(); this.partitionAssignor.clear(); client.setNode(node); this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommitEnabled, true); }
private void createSystemConsumer(String name, MessageListener<String, String> consumeEvent) { log.info("Creating kafka consumer for topic {}", name); ContainerProperties containerProps = new ContainerProperties(name); Map<String, Object> props = kafkaProperties.buildConsumerProperties(); if (name.equals(applicationProperties.getKafkaSystemTopic())) { props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); } ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(props); ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(factory, containerProps); container.setupMessageListener(consumeEvent); container.start(); log.info("Successfully created kafka consumer for topic {}", name); }
public KafkaSpoutConfig.Builder<String, String> builder(String bootstrapServers, String topic, Class klass) { Map<String, Object> props = new HashMap<>(); if (bootstrapServers == null || bootstrapServers.isEmpty()) { throw new IllegalArgumentException("bootstrap servers cannot be null"); } props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(KafkaSpoutConfig.Consumer.GROUP_ID, groupId); props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, keyDeserializer); props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, valueDeserializer); props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, String.valueOf(enableAutoCommit)); KafkaSpoutStreams streams = new KafkaSpoutStreamsNamedTopics.Builder( new KafkaSpoutStream(getFields(klass), topic)).build(); KafkaSpoutTuplesBuilder<String, String> tuplesBuilder = new KafkaSpoutTuplesBuilderNamedTopics.Builder<String, String>( new TuplesBuilder(topic, klass)).build(); return new KafkaSpoutConfig.Builder<>(props, streams, tuplesBuilder); }
public static void main(String[] args) { Properties config = new Properties(); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-starter-app"); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); KStreamBuilder builder = new KStreamBuilder(); KStream<String, String> kStream = builder.stream("streams-file-input"); // do stuff kStream.to("streams-wordcount-output"); KafkaStreams streams = new KafkaStreams(builder, config); streams.cleanUp(); // only do this in dev - not in prod streams.start(); // print the topology System.out.println(streams.toString()); // shutdown hook to correctly close the streams application Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); }
@Test public void testAutoCommitManualAssignment() { ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true); subscriptions.assignFromUser(singleton(t1p)); subscriptions.seek(t1p, 100); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); time.sleep(autoCommitIntervalMs); coordinator.poll(time.milliseconds(), Long.MAX_VALUE); assertEquals(100L, subscriptions.committed(t1p).offset()); }
private StreamsConfig buildStreamsConfig(String appId, final Map<String, Object> additionalProps) { Map<String, Object> props = new HashMap<>(); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBootstrapServers()); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, getStreamsCommitIntervalMs()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, getAutoOffsetReset()); //TODO not clear if this is needed for not. Normal Kafka doesn't need it but streams may do //leaving it in seems to cause zookeeper connection warnings in the tests. Tests seem to work ok //without it // props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperConfig.getQuorum()); //Add any additional props, overwriting any from above props.putAll(additionalProps); props.forEach((key, value) -> LOGGER.info("Setting Kafka Streams property {} for appId {} to [{}]", key, appId, value.toString()) ); return new StreamsConfig(props); }
private void translateOldProperties(Context ctx) { // topic String topic = context.getString(KafkaSourceConstants.TOPIC); if (topic != null && !topic.isEmpty()) { subscriber = new TopicListSubscriber(topic); log.warn("{} is deprecated. Please use the parameter {}", KafkaSourceConstants.TOPIC, KafkaSourceConstants.TOPICS); } // old groupId groupId = ctx.getString(KafkaSourceConstants.OLD_GROUP_ID); if (groupId != null && !groupId.isEmpty()) { log.warn("{} is deprecated. Please use the parameter {}", KafkaSourceConstants.OLD_GROUP_ID, KafkaSourceConstants.KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG); } }
@Test public void testAutoCommitManualAssignmentCoordinatorUnknown() { ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true); subscriptions.assignFromUser(singleton(t1p)); subscriptions.seek(t1p, 100); // no commit initially since coordinator is unknown consumerClient.poll(0); time.sleep(autoCommitIntervalMs); consumerClient.poll(0); assertNull(subscriptions.committed(t1p)); // now find the coordinator client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); // sleep only for the retry backoff time.sleep(retryBackoffMs); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); coordinator.poll(time.milliseconds(), Long.MAX_VALUE); assertEquals(100L, subscriptions.committed(t1p).offset()); }
@Test public void testOldConfig() throws Exception { Context context = new Context(); context.put(BROKER_LIST_FLUME_KEY,testUtil.getKafkaServerUrl()); context.put(GROUP_ID_FLUME,"flume-something"); context.put(READ_SMALLEST_OFFSET,"true"); context.put("topic",topic); final KafkaChannel channel = new KafkaChannel(); Configurables.configure(channel, context); Properties consumerProps = channel.getConsumerProps(); Properties producerProps = channel.getProducerProps(); Assert.assertEquals(producerProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), testUtil.getKafkaServerUrl()); Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG), "flume-something"); Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); }
@SuppressWarnings("unchecked") @Override public List<EntityCommand<?>> fetch(String txId) { List<EntityCommand<?>> transactionOperations = new ArrayList<EntityCommand<?>>(); Map<String, Object> consumerConfigs = (Map<String, Object>)configuration.get("kafkaConsumerConfiguration"); consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(consumerConfigs); kafkaConsumer.subscribe(Arrays.asList(txId)); ConsumerRecords<String, String> records = kafkaConsumer.poll(kafkaConsumerPollTimeout); for (ConsumerRecord<String, String> record : records){ LOG.info("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value()); try { transactionOperations.add(serializer.readFromString(record.value())); } catch (SerializationFailedException e) { LOG.error("Unable to deserialize [{}] because of: {}", record.value(), e.getMessage()); } } kafkaConsumer.close(); return transactionOperations; }
private KafkaConsumer<String, Serializable> getConsumer(String groupId) { KafkaConsumer<String, Serializable> kafkaConsumer = null; if ((kafkaConsumer = kafkaConsumers.get(groupId)) != null) return kafkaConsumer; Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); kafkaConsumer = new KafkaConsumer<String, Serializable>(properties); kafkaConsumers.put(groupId, kafkaConsumer); return kafkaConsumer; }
@Override public void execute(ServiceContext ctx) throws Exception { active = true; receivedIds = new GridConcurrentHashSet<>(); Properties config = new Properties(); config.putAll(dataRecoveryConfig.getConsumerConfig()); config.put(ConsumerConfig.GROUP_ID_CONFIG, ReceivedTransactionsListenerImpl.class.getSimpleName()); config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); try (Consumer<ByteBuffer, ByteBuffer> consumer = kafkaFactory.consumer(config)) { consumer.subscribe(Arrays.asList(dataRecoveryConfig.getRemoteTopic(), dataRecoveryConfig.getReconciliationTopic())); while (active) { ConsumerRecords<ByteBuffer, ByteBuffer> poll = consumer.poll(500); for (ConsumerRecord<ByteBuffer, ByteBuffer> record : poll) { TransactionMetadata metadata = serializer.deserialize(record.key()); receivedIds.add(metadata.getTransactionId()); } consumer.commitSync(); } } }
public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> properties = new HashMap<String, Object>(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.GROUP_ID_CONFIG, group); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); return new DefaultKafkaConsumerFactory<String, String>(properties); }
@Bean public Map<String, Object> kafkaConsumerConfig() { final Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.GROUP_ID_CONFIG, this.kafkaGroupId); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaHosts); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.kafkaAutoCommit); if (this.kafkaAutoCommit) { props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, this.kafkaAutoCommitInterval); } props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, this.kafkaSessionTimeout); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, this.kafkaKeyDeserializerClass); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.kafkaValueDeserializerClass); props.put(Constants.KAFKA_POLL_TIMEOUT, this.kafkaPollTimeout); props.put(Constants.KAFKA_SUBSCRIBED_TOPICS, this.kafkaSubscribedTopics); return props; }
private KafkaConsumer<byte[], byte[]> createConsumer() { // Include any unknown worker configs so consumer configs can be set globally on the worker // and through to the task Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.GROUP_ID_CONFIG, SinkUtils.consumerGroupId(id.connector())); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); props.putAll(workerConfig.originalsWithPrefix("consumer.")); KafkaConsumer<byte[], byte[]> newConsumer; try { newConsumer = new KafkaConsumer<>(props); } catch (Throwable t) { throw new ConnectException("Failed to create consumer", t); } return newConsumer; }
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) { Map<String, Object> producerProps = new HashMap<>(); producerProps.putAll(config.originals()); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); Map<String, Object> consumerProps = new HashMap<>(); consumerProps.putAll(config.originals()); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); Map<String, Object> adminProps = new HashMap<>(config.originals()); NewTopic topicDescription = TopicAdmin.defineTopic(topic). compacted(). partitions(1). replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)). build(); return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps); }
@Test public void testStartStop() throws Exception { expectConfigure(); expectStart(Collections.EMPTY_LIST); expectStop(); PowerMock.replayAll(); store.configure(DEFAULT_DISTRIBUTED_CONFIG); assertEquals(TOPIC, capturedTopic.getValue()); assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)); assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)); assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)); assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); assertEquals(TOPIC, capturedNewTopic.getValue().name()); assertEquals(TOPIC_PARTITIONS, capturedNewTopic.getValue().numPartitions()); assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor()); store.start(); store.stop(); PowerMock.verifyAll(); }
public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KStreamBuilder builder = new KStreamBuilder(); builder.stream("streams-file-input").to("streams-pipe-output"); KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); // usually the stream application would be running forever, // in this example we just let it run for some time and stop since the input data is finite. Thread.sleep(5000L); streams.close(); }
private Map<String, Object> getCommonConsumerConfigs() throws ConfigException { final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames()); // disable auto commit and throw exception if there is user overridden values, // this is necessary for streams commit semantics if (clientProvidedProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + ", as the streams client will always turn off auto committing."); } if (eosEnabled) { if (clientProvidedProps.containsKey(ConsumerConfig.ISOLATION_LEVEL_CONFIG)) { throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ISOLATION_LEVEL_CONFIG + "; because " + PROCESSING_GUARANTEE_CONFIG + " is set to '" + EXACTLY_ONCE + "' consumers will always read committed data only."); } } final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES); consumerProps.putAll(clientProvidedProps); // bootstrap.servers should be from StreamsConfig consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG)); // remove deprecate ZK config consumerProps.remove(ZOOKEEPER_CONNECT_CONFIG); return consumerProps; }
/** * Get the configs to the {@link KafkaConsumer consumer}. * Properties using the prefix {@link #CONSUMER_PREFIX} will be used in favor over their non-prefixed versions * except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} where we always use the non-prefixed * version as we only support reading/writing from/to the same Kafka Cluster. * * @param streamThread the {@link StreamThread} creating a consumer * @param groupId consumer groupId * @param clientId clientId * @return Map of the consumer configuration. * @throws ConfigException if {@code "enable.auto.commit"} was set to {@code false} by the user */ public Map<String, Object> getConsumerConfigs(final StreamThread streamThread, final String groupId, final String clientId) throws ConfigException { final Map<String, Object> consumerProps = getCommonConsumerConfigs(); // add client id with stream client id prefix, and group id consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer"); // add configs required for stream partition assignor consumerProps.put(InternalConfig.STREAM_THREAD_INSTANCE, streamThread); consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG)); consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG)); consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName()); consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)); consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG)); return consumerProps; }
private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer, final int numMessages) throws InterruptedException { final Properties consumerProperties = new Properties(); consumerProperties .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo); consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName()); consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName()); return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( consumerProperties, outputTopic, numMessages, 60 * 1000); }
@Before public void setUp() throws Exception { Properties props = new Properties(); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streamsConfiguration = StreamsTestUtils.getStreamsConfig( "testAutoOffsetId", CLUSTER.bootstrapServers(), STRING_SERDE_CLASSNAME, STRING_SERDE_CLASSNAME, props); // Remove any state from previous test runs IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); }
@Before public void before() throws InterruptedException { testNo++; String applicationId = "kstream-repartition-join-test-" + testNo; builder = new KStreamBuilder(); createTopics(); streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3); streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput); streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput); streamFour = builder.stream(Serdes.Integer(), Serdes.String(), streamFourInput); keyMapper = MockKeyValueMapper.SelectValueKeyValueMapper(); }
private List<String> receiveMessages(final Deserializer<?> valueDeserializer, final int numMessages, final String topic) throws InterruptedException { final Properties config = new Properties(); config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kstream-test"); config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName()); final List<String> received = IntegrationTestUtils.waitUntilMinValuesRecordsReceived( config, topic, numMessages, 60 * 1000); Collections.sort(received); return received; }
@Before public void before() throws InterruptedException { testNo++; userClicksTopic = "user-clicks-" + testNo; userRegionsTopic = "user-regions-" + testNo; userRegionsStoreName = "user-regions-store-name-" + testNo; outputTopic = "output-topic-" + testNo; CLUSTER.createTopics(userClicksTopic, userRegionsTopic, outputTopic); streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test-" + testNo); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); }
@Test public void testAutoCommitDynamicAssignment() { final String consumerId = "consumer"; ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true); subscriptions.subscribe(singleton(topic1), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); coordinator.joinGroupIfNeeded(); subscriptions.seek(t1p, 100); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); time.sleep(autoCommitIntervalMs); coordinator.poll(time.milliseconds(), Long.MAX_VALUE); assertEquals(100L, subscriptions.committed(t1p).offset()); }
private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer, final int numMessages) throws InterruptedException { final Properties consumerProperties = new Properties(); consumerProperties .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo); consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName()); consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName()); return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties, outputTopic, numMessages, 60 * 1000); }
public static Properties getStreamsConfig(final String applicationId, final String bootstrapServers, final String keySerdeClassName, final String valueSerdeClassName, final Properties additional) { Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); streamsConfiguration.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerdeClassName); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerdeClassName); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsConfiguration.putAll(additional); return streamsConfiguration; }
@BeforeClass public static void setupConfigsAndUtils() throws Exception { PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all"); PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0); PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer"); RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); }
private Properties prepareTest() throws Exception { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); return streamsConfiguration; }
private void cleanGlobal(final String intermediateUserTopic) { final String[] parameters; if (intermediateUserTopic != null) { parameters = new String[]{ "--application-id", APP_ID + testNo, "--bootstrap-servers", CLUSTER.bootstrapServers(), "--zookeeper", CLUSTER.zKConnectString(), "--input-topics", INPUT_TOPIC, "--intermediate-topics", INTERMEDIATE_USER_TOPIC }; } else { parameters = new String[]{ "--application-id", APP_ID + testNo, "--bootstrap-servers", CLUSTER.bootstrapServers(), "--zookeeper", CLUSTER.zKConnectString(), "--input-topics", INPUT_TOPIC }; } final Properties cleanUpConfig = new Properties(); cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT); final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig); Assert.assertEquals(0, exitCode); }
private List<KeyValue<Long, Long>> readResult(final int numberOfRecords, final String groupId) throws Exception { if (groupId != null) { return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( TestUtils.consumerConfig( CLUSTER.bootstrapServers(), groupId, LongDeserializer.class, LongDeserializer.class, new Properties() { { put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)); } }), SINGLE_PARTITION_OUTPUT_TOPIC, numberOfRecords ); } // read uncommitted return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( TestUtils.consumerConfig(CLUSTER.bootstrapServers(), LongDeserializer.class, LongDeserializer.class), SINGLE_PARTITION_OUTPUT_TOPIC, numberOfRecords ); }
private Properties setStreamProperties(final String applicationId) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // the socket buffer needs to be large, especially when running in AWS with // high latency. if running locally the default is fine. props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass()); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS); props.put(StreamsConfig.POLL_MS_CONFIG, POLL_MS); return props; }
private Properties setProduceConsumeProperties(final String clientId) { Properties props = new Properties(); props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); // the socket buffer needs to be large, especially when running in AWS with // high latency. if running locally the default is fine. props.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // the socket buffer needs to be large, especially when running in AWS with // high latency. if running locally the default is fine. props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS); return props; }
public <K, V> KafkaConsumer<K, V> createConsumer(String groupId, Class<? extends Deserializer<K>> keyDeserializer, Class<? extends Deserializer<V>> valueDeserializer) { Properties props = new Properties(); props.put("bootstrap.servers", bootstrapServers); props.put("enable.auto.commit", "false"); props.put("key.deserializer", keyDeserializer.getName()); props.put("value.deserializer", valueDeserializer.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); if (groupId != null) { props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); } return new KafkaConsumer<>(props); }
@Test public void shouldSupportPrefixedRestoreConsumerConfigs() throws Exception { props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1); final StreamsConfig streamsConfig = new StreamsConfig(props); final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId"); assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)); }
@Test public void testReportingMetrics() { Properties props = new Properties(); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MetricSerde.class.getName()); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testReportingMetrics"); props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); setSecurityConfigs(props, "consumer"); Consumer<String, CruiseControlMetric> consumer = new KafkaConsumer<>(props); ConsumerRecords<String, CruiseControlMetric> records = ConsumerRecords.empty(); consumer.subscribe(Collections.singletonList(TOPIC)); long startMs = System.currentTimeMillis(); Set<Integer> metricTypes = new HashSet<>(); while (metricTypes.size() < 16 && System.currentTimeMillis() < startMs + 15000) { records = consumer.poll(10); for (ConsumerRecord<String, CruiseControlMetric> record : records) { metricTypes.add((int) record.value().metricType().id()); } } HashSet<Integer> expectedMetricTypes = new HashSet<>(Arrays.asList((int) ALL_TOPIC_BYTES_IN.id(), (int) ALL_TOPIC_BYTES_OUT.id(), (int) TOPIC_BYTES_IN.id(), (int) TOPIC_BYTES_OUT.id(), (int) PARTITION_SIZE.id(), (int) BROKER_CPU_UTIL.id(), (int) ALL_TOPIC_PRODUCE_REQUEST_RATE.id(), (int) ALL_TOPIC_FETCH_REQUEST_RATE.id(), (int) ALL_TOPIC_MESSAGES_IN_PER_SEC.id(), (int) TOPIC_PRODUCE_REQUEST_RATE.id(), (int) TOPIC_FETCH_REQUEST_RATE.id(), (int) TOPIC_MESSAGES_IN_PER_SEC.id(), (int) BROKER_PRODUCE_REQUEST_RATE.id(), (int) BROKER_CONSUMER_FETCH_REQUEST_RATE.id(), (int) BROKER_FOLLOWER_FETCH_REQUEST_RATE.id(), (int) BROKER_REQUEST_HANDLER_AVG_IDLE_PERCENT.id())); assertEquals("Expected to see " + expectedMetricTypes + ", but only see " + metricTypes, metricTypes, expectedMetricTypes); }
@Test public void test() throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "127.0.0.1:9092");//该地址是集群的子集,用来探测集群。 props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group"); props.put("acks", "all");// 记录完整提交,最慢的但是最大可能的持久化 props.put("retries", 3);// 请求失败重试的次数 props.put("batch.size", 16384);// batch的大小 props.put("linger.ms", 1);// 默认情况即使缓冲区有剩余的空间,也会立即发送请求,设置一段时间用来等待从而将缓冲区填的更多,单位为毫秒,producer发送数据会延迟1ms,可以减少发送到kafka服务器的请求数据 props.put("buffer.memory", 33554432);// 提供给生产者缓冲内存总量 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer.tracing.codec", Codec.JSON); props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); Consumer<String, byte[]> consumer = new KafkaConsumer<>(props); ListenableTracingConsumer<String, String> listenableTracingConsumer = new ListenableTracingConsumer<>(consumer, Pattern.compile("test"), new StringDeserializer()); BraveFactoryBean factoryBean = new BraveFactoryBean(); factoryBean.setServiceName("kafka-test"); factoryBean.setTransport("http"); factoryBean.setTransportAddress("127.0.0.1:9411"); factoryBean.afterPropertiesSet(); Brave brave = factoryBean.getObject(); listenableTracingConsumer.addListener(new AbstractTracingListener<String, String>(brave) { @Override public void onPayload(Payload<String, String> payload) { try { System.out.println("key: " + payload.key()); System.out.println("value: " + payload.value()); Sleeper.JUST.sleepFor(2000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); listenableTracingConsumer.start(); System.in.read(); }
@Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); // list of host:port pairs used for establishing the initial connections to the Kakfa cluster props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); // allows a pool of processes to divide the work of consuming and processing records props.put(ConsumerConfig.GROUP_ID_CONFIG, "bot"); return props; }