/**
 * Sends five identical records to {@code hello_world_topic}, blocking on each
 * send until it completes, and prints the key/value pair after each attempt.
 *
 * <p>The producer is always closed, even when a send fails unexpectedly. If the
 * calling thread is interrupted while waiting, sending stops and the interrupt
 * flag is restored once the producer has been closed.
 *
 * @param bootstrapServer value used for the producer's bootstrap servers setting
 */
public void createProducer(String bootstrapServer) {
    long numberOfEvents = 5;

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    boolean interrupted = false;
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        for (int i = 0; i < numberOfEvents; i++) {
            String key = "testContainers";
            String value = "AreAwesome";
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("hello_world_topic", key, value);
            try {
                producer.send(record).get();
            } catch (InterruptedException e) {
                // Stop sending; the flag is restored after close() so that closing
                // the producer is not itself disturbed by the pending interrupt.
                interrupted = true;
                e.printStackTrace();
                break;
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.printf("key = %s, value = %s\n", key, value);
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Builds the underlying Kafka producer unless this instance was already initialized.
 *
 * <p>Defaults (String serializers, {@code SixtPartitioner}, 3 retries, acks=all) are
 * applied first; every entry of {@code properties} is then copied on top, so those
 * entries win over the defaults.
 *
 * @param servers value for the bootstrap servers setting
 */
public void initialize(String servers) {
    if (isInitialized.get()) {
        logger.warn("Already initialized");
        return;
    }

    final String stringSerializer = StringSerializer.class.getName();

    Properties producerProps = new Properties();
    producerProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, servers);
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer);
    producerProps.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SixtPartitioner.class.getName());
    producerProps.put(ProducerConfig.RETRIES_CONFIG, "3");
    producerProps.put(ProducerConfig.ACKS_CONFIG, "all");

    // Caller-supplied settings are applied last and override the defaults above.
    properties.forEach(producerProps::put);

    realProducer = new KafkaProducer<>(producerProps);
    isInitialized.set(true);
}
/** * Metrics for a machine * * @param machine * @return the metric */ @GET @Path("{machine}") @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) public Response getMachineMetric(@PathParam("machine") String machine) { LOGGER.log(Level.INFO, "Fetching metrics for machine {0}", machine); KafkaStreams ks = GlobalAppState.getInstance().getKafkaStreams(); HostInfo thisInstance = GlobalAppState.getInstance().getHostPortInfo(); Metrics metrics = null; StreamsMetadata metadataForMachine = ks.metadataForKey(storeName, machine, new StringSerializer()); if (metadataForMachine.host().equals(thisInstance.host()) && metadataForMachine.port() == thisInstance.port()) { LOGGER.log(Level.INFO, "Querying local store for machine {0}", machine); metrics = getLocalMetrics(machine); } else { //LOGGER.log(Level.INFO, "Querying remote store for machine {0}", machine); String url = "http://" + metadataForMachine.host() + ":" + metadataForMachine.port() + "/metrics/remote/" + machine; metrics = Utils.getRemoteStoreState(url, 2, TimeUnit.SECONDS); LOGGER.log(Level.INFO, "Metric from remote store at {0} == {1}", new Object[]{url, metrics}); } return Response.ok(metrics).build(); }
@Override public void init(AbstractConfiguration config, ApplicationListenerFactory factory) { init(config); logger.trace("Initializing Kafka consumer ..."); // consumer config Properties props = new Properties(); props.put("bootstrap.servers", config.getString("bootstrap.servers")); props.put("group.id", config.getString("group.id")); props.put("enable.auto.commit", "true"); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", InternalMessageSerializer.class.getName()); // consumer this.consumer = new KafkaConsumer<>(props); // consumer worker this.worker = new KafkaApplicationWorker(this.consumer, APPLICATION_TOPIC, factory.newListener()); this.executor.submit(this.worker); }
@Override public void init(AbstractConfiguration config, String brokerId, BrokerListenerFactory factory) { init(config); BROKER_TOPIC = BROKER_TOPIC_PREFIX + "." + brokerId; logger.trace("Initializing Kafka consumer ..."); // consumer config Properties props = new Properties(); props.put("bootstrap.servers", config.getString("bootstrap.servers")); props.put("group.id", UUIDs.shortUuid()); props.put("enable.auto.commit", "true"); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", InternalMessageSerializer.class.getName()); // consumer this.consumer = new KafkaConsumer<>(props); // consumer worker this.worker = new KafkaBrokerWorker(this.consumer, BROKER_TOPIC, factory.newListener()); this.executor.submit(this.worker); }
protected void init(AbstractConfiguration config) { BROKER_TOPIC_PREFIX = config.getString("communicator.broker.topic"); APPLICATION_TOPIC = config.getString("communicator.application.topic"); logger.trace("Initializing Kafka producer ..."); // producer config Properties props = new Properties(); props.put("bootstrap.servers", config.getString("bootstrap.servers")); props.put("acks", config.getString("acks")); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", InternalMessageSerializer.class.getName()); // producer this.producer = new KafkaProducer<>(props); // consumer executor this.executor = Executors.newSingleThreadExecutor(); }
public void publishDummyData() { final String topic = "TestTopic"; // Create publisher final Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); final KafkaProducer<String, String> producer = new KafkaProducer<>(config); for (int charCode = 65; charCode < 91; charCode++) { final char[] key = new char[1]; key[0] = (char) charCode; producer.send(new ProducerRecord<>(topic, new String(key), new String(key))); } producer.flush(); producer.close(); }
/**
 * Sends the numbers 1..100 (as String key and value) to {@code topic-1} and passes
 * each send result to {@code printMetadata}.
 *
 * <p>The bootstrap servers come from the {@code kafka.bootstrap_servers}
 * configuration entry, defaulting to {@code localhost:9092}. The producer is
 * closed via try-with-resources so it is released even if sending fails.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Configuration config = ConfigurationProvider.getConfiguration();
    String bootstrapServers = config.getOrDefault("kafka.bootstrap_servers", "localhost:9092");

    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
        IntStream.rangeClosed(1, 100)
                .mapToObj(Integer::toString)
                .map(text -> new ProducerRecord<>("topic-1", text, text))
                .map(record -> producer.send(record))
                .forEach(result -> printMetadata(result));
    }
}
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); Producer<String, byte[]> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 100).boxed() .map(number -> new ProducerRecord<>( TOPIC, //topic number.toString(), //key UserAvroSerdes.serialize(new User(String.format("user-%s", number.toString()))))) //value .forEach(record -> producer.send(record)); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<Integer, String> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 100).boxed() .map(number -> new ProducerRecord<>( TOPIC, number, //Key String.format("record-%s", number))) //Value .forEach(record -> producer.send(record)); producer.close(); }
/**
 * Streams the numbers 1..100 as String values into partition 0 of
 * {@code topic1-ts} on {@code localhost:9092} and prints {@code sent} when the
 * stream completes.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create("KafkaProducerSystem");
    final Materializer materializer = ActorMaterializer.create(system);

    final ProducerSettings<byte[], String> producerSettings =
            ProducerSettings.create(system, new ByteArraySerializer(), new StringSerializer())
                    .withBootstrapServers("localhost:9092");

    CompletionStage<Done> done =
            Source.range(1, 100)
                    .map(number -> {
                        final String payload = number.toString();
                        // NOTE(review): the record timestamp is given in epoch seconds;
                        // Kafka timestamps are usually epoch milliseconds — confirm intent.
                        return new ProducerRecord<byte[], String>(
                                "topic1-ts", 0, Instant.now().getEpochSecond(), null, payload);
                    })
                    .runWith(Producer.plainSink(producerSettings), materializer);

    done.whenComplete((result, failure) -> System.out.println("sent"));
}
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<Integer, String> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 10000).boxed() .map(number -> new ProducerRecord<>( TOPIC, 1, //Key String.format("record-%s", number))) //Value .forEach(record -> producer.send(record)); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<Integer, String> producer = new KafkaProducer<>(properties); IntStream .rangeClosed(1, 100000).boxed() .map(number -> new ProducerRecord<>( TOPIC, 1, //Key String.format("record-%s", number))) //Value .forEach(record -> producer.send(record)); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.ACKS_CONFIG, "all"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<Integer, String> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 100).boxed() .map(number -> new ProducerRecord<>( TOPIC, number, //Key String.format("record-%s", number))) //Value .forEach(record -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } producer.send(record); }); producer.close(); }
/**
 * Builds producer, consumer and admin property maps from the worker's original
 * configuration and creates a {@code KafkaBasedLog} for the given topic.
 *
 * <p>The producer uses String/byte[] serializers with {@code Integer.MAX_VALUE}
 * retries; the consumer uses the matching deserializers. The topic description is
 * compacted, has one partition and takes its replication factor from
 * {@code CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG}.
 *
 * @param topic  name of the topic backing the log
 * @param config worker configuration whose originals seed all three property maps
 * @return the log returned by {@code createKafkaBasedLog}
 */
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
    Map<String, Object> producerProps = new HashMap<>(config.originals());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

    Map<String, Object> consumerProps = new HashMap<>(config.originals());
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Map<String, Object> adminProps = new HashMap<>(config.originals());

    NewTopic topicDescription = TopicAdmin.defineTopic(topic)
            .compacted()
            .partitions(1)
            .replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG))
            .build();

    return createKafkaBasedLog(
            topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps);
}
/**
 * Synchronously produces the pairs (1,"A") .. (5,"E") to {@code streamOneInput}
 * with the given record timestamp.
 *
 * @param timestamp timestamp passed through to the produce helper
 */
private void produceMessages(final long timestamp) throws ExecutionException, InterruptedException {
    final Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            IntegerSerializer.class,
            StringSerializer.class,
            new Properties());

    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            streamOneInput,
            Arrays.asList(
                    new KeyValue<>(1, "A"),
                    new KeyValue<>(2, "B"),
                    new KeyValue<>(3, "C"),
                    new KeyValue<>(4, "D"),
                    new KeyValue<>(5, "E")),
            producerConfig,
            timestamp);
}
/**
 * Synchronously produces the pairs (1,"A") .. (5,"E") to the given topic, passing
 * {@code mockTime} to the produce helper.
 *
 * @param streamTwoInput name of the topic to write to
 */
private void produceStreamTwoInputTo(final String streamTwoInput)
        throws ExecutionException, InterruptedException {
    final Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            IntegerSerializer.class,
            StringSerializer.class,
            new Properties());

    IntegrationTestUtils.produceKeyValuesSynchronously(
            streamTwoInput,
            Arrays.asList(
                    new KeyValue<>(1, "A"),
                    new KeyValue<>(2, "B"),
                    new KeyValue<>(3, "C"),
                    new KeyValue<>(4, "D"),
                    new KeyValue<>(5, "E")),
            producerConfig,
            mockTime);
}
/**
 * Synchronously produces the pairs ("a",1) .. ("e",5) to the given topic, passing
 * {@code mockTime} to the produce helper.
 *
 * @param topic name of the topic to write to
 */
private void produceTopicValues(final String topic)
        throws java.util.concurrent.ExecutionException, InterruptedException {
    final Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            StringSerializer.class,
            LongSerializer.class,
            new Properties());

    IntegrationTestUtils.produceKeyValuesSynchronously(
            topic,
            Arrays.asList(
                    new KeyValue<>("a", 1L),
                    new KeyValue<>("b", 2L),
                    new KeyValue<>("c", 3L),
                    new KeyValue<>("d", 4L),
                    new KeyValue<>("e", 5L)),
            producerConfig,
            mockTime);
}
/**
 * Synchronously produces the pairs (1,"F") .. (5,"J") to {@code globalOne},
 * passing {@code mockTime} to the produce helper.
 */
private void produceGlobalTableValues()
        throws java.util.concurrent.ExecutionException, InterruptedException {
    final Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            LongSerializer.class,
            StringSerializer.class,
            new Properties());

    IntegrationTestUtils.produceKeyValuesSynchronously(
            globalOne,
            Arrays.asList(
                    new KeyValue<>(1L, "F"),
                    new KeyValue<>(2L, "G"),
                    new KeyValue<>(3L, "H"),
                    new KeyValue<>(4L, "I"),
                    new KeyValue<>(5L, "J")),
            producerConfig,
            mockTime);
}
/**
 * Synchronously produces the pairs (1,"A") .. (5,"E") to {@code streamOneInput}
 * with the given record timestamp.
 *
 * @param timestamp timestamp passed through to the produce helper
 */
private void produceMessages(long timestamp) throws ExecutionException, InterruptedException {
    Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            IntegerSerializer.class,
            StringSerializer.class,
            new Properties());

    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            streamOneInput,
            Arrays.asList(
                    new KeyValue<>(1, "A"),
                    new KeyValue<>(2, "B"),
                    new KeyValue<>(3, "C"),
                    new KeyValue<>(4, "D"),
                    new KeyValue<>(5, "E")),
            producerConfig,
            timestamp);
}
/**
 * Populates the shared producer, result-consumer and streams configurations once
 * before the tests of this class run. All three point at the bootstrap servers of
 * {@code CLUSTER}.
 */
@BeforeClass
public static void setupConfigsAndUtils() throws Exception {
    final String bootstrapServers = CLUSTER.bootstrapServers();

    // Producer: Long keys, String values, acks=all, no retries.
    PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
    PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
    PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
    PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    // Result consumer: own group id, reads from the earliest offset.
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    // Streams: temp state dir, Long/String default serdes, zero cache bytes, 100 ms commits.
    STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
    STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
}
/**
 * Recreates the input and output topics and writes ten records to
 * {@code INPUT_TOPIC}, one at a time.
 *
 * <p>The values are "aaa" .. "jjj" with keys alternating 0, 1, 0, 1, ... Before each
 * record the mock clock is advanced: by 10 ms for the first six records and by
 * 1 ms for the last four. Each record is stamped with the mock clock's current time.
 */
private void prepareInputData() throws Exception {
    CLUSTER.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);

    final Properties producerConfig =
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class);

    final String[] values = {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh", "iii", "jjj"};
    for (int i = 0; i < values.length; i++) {
        // First six records are 10 ms apart, the remaining four 1 ms apart.
        mockTime.sleep(i < 6 ? 10 : 1);

        final Long key = (i % 2 == 0) ? 0L : 1L;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
                INPUT_TOPIC,
                Collections.singleton(new KeyValue<>(key, values[i])),
                producerConfig,
                mockTime.milliseconds());
    }
}
/**
 * Repeatedly sends every entry of {@code inputValues} (as a value without key) to
 * {@code topic} until {@code numIterations} iterations are done or
 * {@code shutdown} is set. The producer is closed when the loop ends.
 */
@Override
public void run() {
    final Properties settings = new Properties();
    settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    settings.put(ProducerConfig.ACKS_CONFIG, "all");
    settings.put(ProducerConfig.RETRIES_CONFIG, 10);
    settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    try (final KafkaProducer<String, String> producer =
                 new KafkaProducer<>(settings, new StringSerializer(), new StringSerializer())) {
        while (getCurrIteration() < numIterations && !shutdown) {
            for (final String value : inputValues) {
                final ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
                producer.send(record);
            }
            incrementInteration();
        }
    }
}
@Test public void testWindowedSerializerNoArgConstructors() { Map<String, String> props = new HashMap<>(); // test key[value].serializer.inner.class takes precedence over serializer.inner.class WindowedSerializer<StringSerializer> windowedSerializer = new WindowedSerializer<>(); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1"); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); props.put("key.serializer.inner.class", "org.apache.kafka.common.serialization.StringSerializer"); props.put("serializer.inner.class", "org.apache.kafka.common.serialization.StringSerializer"); windowedSerializer.configure(props, true); Serializer<?> inner = windowedSerializer.innerSerializer(); assertNotNull("Inner serializer should be not null", inner); assertTrue("Inner serializer type should be StringSerializer", inner instanceof StringSerializer); // test serializer.inner.class props.put("serializer.inner.class", "org.apache.kafka.common.serialization.ByteArraySerializer"); props.remove("key.serializer.inner.class"); props.remove("value.serializer.inner.class"); WindowedSerializer<?> windowedSerializer1 = new WindowedSerializer<>(); windowedSerializer1.configure(props, false); Serializer<?> inner1 = windowedSerializer1.innerSerializer(); assertNotNull("Inner serializer should be not null", inner1); assertTrue("Inner serializer type should be ByteArraySerializer", inner1 instanceof ByteArraySerializer); }
@Test public void testWindowedDeserializerNoArgConstructors() { Map<String, String> props = new HashMap<>(); // test key[value].deserializer.inner.class takes precedence over serializer.inner.class WindowedDeserializer<StringSerializer> windowedDeserializer = new WindowedDeserializer<>(); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1"); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); props.put("key.deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer"); windowedDeserializer.configure(props, true); Deserializer<?> inner = windowedDeserializer.innerDeserializer(); assertNotNull("Inner deserializer should be not null", inner); assertTrue("Inner deserializer type should be StringDeserializer", inner instanceof StringDeserializer); // test deserializer.inner.class props.put("deserializer.inner.class", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); props.remove("key.deserializer.inner.class"); props.remove("value.deserializer.inner.class"); WindowedDeserializer<?> windowedDeserializer1 = new WindowedDeserializer<>(); windowedDeserializer1.configure(props, false); Deserializer<?> inner1 = windowedDeserializer1.innerDeserializer(); assertNotNull("Inner deserializer should be not null", inner1); assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner1 instanceof ByteArrayDeserializer); }
@Test public void testInterceptorConstructClose() throws Exception { try { Properties props = new Properties(); // test with client ID assigned by KafkaProducer props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName()); props.setProperty(MockProducerInterceptor.APPEND_STRING_PROP, "something"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>( props, new StringSerializer(), new StringSerializer()); assertEquals(1, MockProducerInterceptor.INIT_COUNT.get()); assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get()); // Cluster metadata will only be updated on calling onSend. Assert.assertNull(MockProducerInterceptor.CLUSTER_META.get()); producer.close(); assertEquals(1, MockProducerInterceptor.INIT_COUNT.get()); assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get()); } finally { // cleanup since we are using mutable static variables in MockProducerInterceptor MockProducerInterceptor.resetCounters(); } }
@Test public void testPartitionerClose() throws Exception { try { Properties props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, MockPartitioner.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<String, String>( props, new StringSerializer(), new StringSerializer()); assertEquals(1, MockPartitioner.INIT_COUNT.get()); assertEquals(0, MockPartitioner.CLOSE_COUNT.get()); producer.close(); assertEquals(1, MockPartitioner.INIT_COUNT.get()); assertEquals(1, MockPartitioner.CLOSE_COUNT.get()); } finally { // cleanup since we are using mutable static variables in MockPartitioner MockPartitioner.resetCounters(); } }
/**
 * Creates the request/response producer when Kafka is enabled.
 *
 * <p>The server address is read from the environment variable named by
 * {@code ENV_VAR_SELDON_KAFKA_SERVER}; when it is unset a warning is logged and
 * {@code localhost:9093} is used. When Kafka is disabled only a warning is logged
 * and no producer is created.
 *
 * @param kafkaEnabled value of the {@code seldon.kafka.enable} property
 */
@Autowired
public KafkaRequestResponseProducer(@Value("${seldon.kafka.enable}") boolean kafkaEnabled) {
    if (!kafkaEnabled) {
        logger.warn("Kafka not enabled");
        return;
    }

    enabled = true;

    String kafkaHostPort = System.getenv(ENV_VAR_SELDON_KAFKA_SERVER);
    logger.info(String.format("using %s[%s]", ENV_VAR_SELDON_KAFKA_SERVER, kafkaHostPort));
    if (kafkaHostPort == null) {
        logger.warn("*WARNING* SELDON_KAFKA_SERVER environment variable not set!");
        kafkaHostPort = "localhost:9093";
    }
    logger.info("Starting kafka client with server "+kafkaHostPort);

    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", kafkaHostPort);
    producerProps.put("client.id", "RequestResponseProducer");
    producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
    producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000");
    // NB need to investigate issues of Kafka not able to get metadata
    producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,"20");

    producer = new KafkaProducer<>(producerProps, new StringSerializer(), new RequestResponseSerializer());
}
/**
 * Pushes ten random longs (as Strings) through a {@code KafkaSink} bound to a
 * randomly named topic and waits up to one minute for the consumer-side latch to
 * be released.
 */
@Test
public void testSinkWithString() throws InterruptedException {
    KafkaUsage kafka = new KafkaUsage();
    String topicName = UUID.randomUUID().toString();
    CountDownLatch consumed = new CountDownLatch(1);
    List<String> sentPayloads = new ArrayList<>();

    // presumably the latch callback fires once the requested 10 records were read — verify in KafkaUsage
    kafka.consumeStrings(topicName, 10, 10, TimeUnit.SECONDS, consumed::countDown,
        (k, v) -> sentPayloads.contains(v));

    KafkaSink<String> sink = new KafkaSink<>(vertx,
        getKafkaConfig()
            .put("topic", topicName)
            .put("value.serializer", StringSerializer.class.getName())
            .put("value.deserializer", StringDeserializer.class.getName())
    );

    Stream<String> payloads = new Random().longs(10).mapToObj(Long::toString);
    Source.fromPayloads(payloads)
        .onPayload(sentPayloads::add)
        .to(sink);

    assertThat(consumed.await(1, TimeUnit.MINUTES)).isTrue();
}
public KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final String zkServers, final String mail, final String rpc, final String app, final String host, final Property[] properties) { super(loggerContext, name); this.topic = topic; this.zkServers = zkServers; this.mail = mail; this.rpc = rpc; this.app = app; this.orginApp = app; this.host = host; this.checkAndSetConfig(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); this.checkAndSetConfig(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 设置分区类, 使用自定义的KeyModPartitioner,同样的key进入相同的partition this.checkAndSetConfig(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyModPartitioner.class.getName()); // xml配置里面的参数 for (final Property property : properties) { this.config.put(property.getName(), property.getValue()); } // 由于容器部署需要从外部获取host this.config.put(ProducerConfig.CLIENT_ID_CONFIG, this.app + Constants.MIDDLE_LINE + this.host + Constants.MIDDLE_LINE + "log4j2"); }
@Test public void testAutoCommit() throws Exception { LOG.info("Start testAutoCommit"); ContainerProperties containerProps = new ContainerProperties("topic3", "topic4"); final CountDownLatch latch = new CountDownLatch(4); containerProps.setMessageListener((MessageListener<Integer, String>) message -> { LOG.info("received: " + message); latch.countDown(); }); KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps, IntegerDeserializer.class, StringDeserializer.class); container.setBeanName("testAutoCommit"); container.start(); Thread.sleep(5000); // wait a bit for the container to start KafkaTemplate<Integer, String> template = createTemplate(IntegerSerializer.class, StringSerializer.class); template.setDefaultTopic("topic3"); template.sendDefault(0, "foo"); template.sendDefault(2, "bar"); template.sendDefault(0, "baz"); template.sendDefault(2, "qux"); template.flush(); assertTrue(latch.await(60, TimeUnit.SECONDS)); container.stop(); LOG.info("Stop testAutoCommit"); }
public KafkaOutputter(String kafkaBroker, int kafkaPort, String kafkaTopicBase) { sortedMapper = new ObjectMapper(); sortedMapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true); unsortedMapper = new ObjectMapper(); // Kafka producer Properties props = new Properties(); props.put("bootstrap.servers", kafkaBroker + ":" + kafkaPort); props.put("request.required.acks", "all"); this.kafkaTopicBase = kafkaTopicBase; kafkaProducer = new KafkaProducer<String, String>(props, new StringSerializer(), new StringSerializer()); sortedMapper = new ObjectMapper(); sortedMapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true); unsortedMapper = new ObjectMapper(); responses = new ArrayList<Future<RecordMetadata>>(); }
/**
 * Builds the default Kafka producer settings.
 *
 * <p>The defaults target {@code localhost:9092} with acks=all, no retries, a
 * 16384 batch size, 1 ms linger, 33554432 of buffer memory and String
 * serializers for both key and value.
 *
 * @return props to use for producer when no properties file is given to ctor.
 */
protected Map<String, Object> producer() {
    final String stringSerializer = StringSerializer.class.getName();

    Map<String, Object> defaults = new HashMap<>();
    defaults.put("bootstrap.servers", "localhost:9092");
    defaults.put("acks", "all");
    defaults.put("retries", 0);
    defaults.put("batch.size", 16384);
    defaults.put("linger.ms", 1);
    defaults.put("buffer.memory", 33554432);
    defaults.put("key.serializer", stringSerializer);
    defaults.put("value.serializer", stringSerializer);
    return defaults;
}
@Test public void testInterceptorConstructClose() throws Exception { try { Properties props = new Properties(); // test with client ID assigned by KafkaProducer props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName()); props.setProperty(MockProducerInterceptor.APPEND_STRING_PROP, "something"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>( props, new StringSerializer(), new StringSerializer()); Assert.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get()); Assert.assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get()); producer.close(); Assert.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get()); Assert.assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get()); } finally { // cleanup since we are using mutable static variables in MockProducerInterceptor MockProducerInterceptor.resetCounters(); } }
/**
 * Creates a topic with one partition and replication factor one, waits for
 * success, then deletes it again and waits for success.
 */
@Test
public void testCreateTopic(TestContext ctx) throws Exception {
    final String topicName = "testCreateTopic";

    // Producer properties are prepared but not passed to any call below.
    Properties producerProps = kafkaCluster.useTo().getProducerProperties("the_producer");
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    AdminUtils admin = AdminUtils.create(vertx, zookeeperHosts, false);

    Async created = ctx.async();
    admin.createTopic(topicName, 1, 1, ctx.asyncAssertSuccess(res -> created.complete()));
    created.awaitSuccess(10000);

    Async deleted = ctx.async();
    admin.deleteTopic(topicName, ctx.asyncAssertSuccess(res -> deleted.complete()));
    deleted.awaitSuccess(10000);
}
/**
 * Requests a topic with replication factor two and expects the creation to fail
 * with the "larger than available brokers" message.
 */
@Test
public void testCreateTopicWithTooManyReplicas(TestContext ctx) throws Exception {
    final String topicName = "testCreateTopicWithTooManyReplicas";

    // Producer properties are prepared but not passed to any call below.
    Properties producerProps = kafkaCluster.useTo().getProducerProperties("the_producer");
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Async failed = ctx.async();
    AdminUtils admin = AdminUtils.create(vertx, zookeeperHosts, true);

    admin.createTopic(topicName, 1, 2, ctx.asyncAssertFailure(cause -> {
        ctx.assertEquals(
            "Replication factor: 2 larger than available brokers: 1.",
            cause.getLocalizedMessage(),
            "Topic creation must fail: only one Broker present, but two replicas requested");
        failed.complete();
    }));

    failed.awaitSuccess(10000);
}
/**
 * Creates a topic with two partitions, then issues an existence check and a
 * delete and waits until both have reported success.
 */
@Test
public void testTopicExists(TestContext ctx) throws Exception {
    final String topicName = "testTopicExists";

    // Producer properties are prepared but not passed to any call below.
    Properties producerProps = kafkaCluster.useTo().getProducerProperties("the_producer");
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Async created = ctx.async();
    AdminUtils admin = AdminUtils.create(vertx, zookeeperHosts, false);

    admin.createTopic(topicName, 2, 1, ctx.asyncAssertSuccess(res -> created.complete()));
    created.awaitSuccess(10000);

    // Counts down once for the existence check and once for the delete.
    Async checkedAndDeleted = ctx.async(2);
    admin.topicExists(topicName, ctx.asyncAssertSuccess(res -> checkedAndDeleted.countDown()));
    admin.deleteTopic(topicName, ctx.asyncAssertSuccess(res -> checkedAndDeleted.countDown()));
    checkedAndDeleted.awaitSuccess(10000);
}
/**
 * Checks that the existence query succeeds and reports {@code false} for a topic
 * that was never created.
 */
@Test
public void testTopicExistsNonExisting(TestContext ctx) throws Exception {
    final String topicName = "testTopicExistsNonExisting";

    // Producer properties are prepared but not passed to any call below.
    Properties producerProps = kafkaCluster.useTo().getProducerProperties("the_producer");
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Async checked = ctx.async();
    AdminUtils admin = AdminUtils.create(vertx, zookeeperHosts, true);

    admin.topicExists(topicName, ctx.asyncAssertSuccess(exists -> {
        ctx.assertFalse(exists, "Topic must not exist");
        checked.complete();
    }));

    checked.awaitSuccess(10000);
}
/**
 * Creates a topic with one partition and replication factor one, waits for
 * success, then deletes it and waits for the delete to succeed.
 */
@Test
public void testDeleteTopic(TestContext ctx) throws Exception {
    final String topicName = "testDeleteTopic";

    // Producer properties are prepared but not passed to any call below.
    Properties producerProps = kafkaCluster.useTo().getProducerProperties("the_producer");
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Async created = ctx.async();
    AdminUtils admin = AdminUtils.create(vertx, zookeeperHosts, false);

    admin.createTopic(topicName, 1, 1, ctx.asyncAssertSuccess(res -> created.complete()));
    created.awaitSuccess(10000);

    Async deleted = ctx.async();
    admin.deleteTopic(topicName, ctx.asyncAssertSuccess(res -> deleted.complete()));
    deleted.awaitSuccess(10000);
}