public void publishDummyDataNumbers() { final String topic = "NumbersTopic"; // Create publisher final Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); final KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(config); for (int value = 0; value < 10000; value++) { producer.send(new ProducerRecord<>(topic, value, value)); } producer.flush(); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<Integer, String> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 100).boxed() .map(number -> new ProducerRecord<>( TOPIC, number, //Key String.format("record-%s", number))) //Value .forEach(record -> producer.send(record)); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<Integer, String> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 10000).boxed() .map(number -> new ProducerRecord<>( TOPIC, 1, //Key String.format("record-%s", number))) //Value .forEach(record -> producer.send(record)); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<Integer, String> producer = new KafkaProducer<>(properties); IntStream .rangeClosed(1, 100000).boxed() .map(number -> new ProducerRecord<>( TOPIC, 1, //Key String.format("record-%s", number))) //Value .forEach(record -> producer.send(record)); producer.close(); }
private static void produceRecords() { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); Producer<Integer, byte[]> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 10000).boxed() .map(number -> new ProducerRecord<>( TOPIC, 1, //Key KafkaProducerUtil.createMessage(1000))) //Value .forEach(record -> { producer.send(record); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); Producer<Integer, byte[]> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 10000).boxed() .map(number -> new ProducerRecord<>( TOPIC, 1, //Key KafkaProducerUtil.createMessage(1000))) //Value .forEach(record -> { producer.send(record); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); producer.close(); }
private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.ACKS_CONFIG, "all"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<Integer, String> producer = new KafkaProducer<>(properties); IntStream.rangeClosed(1, 100).boxed() .map(number -> new ProducerRecord<>( TOPIC, number, //Key String.format("record-%s", number))) //Value .forEach(record -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } producer.send(record); }); producer.close(); }
/**
 * Synchronously writes the five pairs (1,"A") .. (5,"E") to {@code streamOneInput},
 * stamping them with the supplied timestamp.
 *
 * @param timestamp timestamp handed to the test utility for the produced records
 */
private void produceMessages(final long timestamp)
        throws ExecutionException, InterruptedException {
    final Properties producerSettings = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            IntegerSerializer.class,
            StringSerializer.class,
            new Properties());

    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            streamOneInput,
            Arrays.asList(
                    new KeyValue<>(1, "A"),
                    new KeyValue<>(2, "B"),
                    new KeyValue<>(3, "C"),
                    new KeyValue<>(4, "D"),
                    new KeyValue<>(5, "E")),
            producerSettings,
            timestamp);
}
/**
 * Synchronously writes the five pairs (1,"A") .. (5,"E") to the given topic, using
 * {@code mockTime} as the time source passed to the test utility.
 *
 * @param streamTwoInput name of the topic to write to
 */
private void produceStreamTwoInputTo(final String streamTwoInput)
        throws ExecutionException, InterruptedException {
    final Properties producerSettings = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            IntegerSerializer.class,
            StringSerializer.class,
            new Properties());

    IntegrationTestUtils.produceKeyValuesSynchronously(
            streamTwoInput,
            Arrays.asList(
                    new KeyValue<>(1, "A"),
                    new KeyValue<>(2, "B"),
                    new KeyValue<>(3, "C"),
                    new KeyValue<>(4, "D"),
                    new KeyValue<>(5, "E")),
            producerSettings,
            mockTime);
}
private void produceToStreamOne() throws ExecutionException, InterruptedException { IntegrationTestUtils.produceKeyValuesSynchronously( streamOneInput, Arrays.asList( new KeyValue<>(10L, 1), new KeyValue<>(5L, 2), new KeyValue<>(12L, 3), new KeyValue<>(15L, 4), new KeyValue<>(20L, 5), new KeyValue<Long, Integer>(70L, null)), // nulls should be filtered TestUtils.producerConfig( CLUSTER.bootstrapServers(), LongSerializer.class, IntegerSerializer.class, new Properties()), mockTime); }
/**
 * Synchronously writes the five pairs (1,"A") .. (5,"E") to {@code streamOneInput},
 * stamping them with the supplied timestamp.
 *
 * @param timestamp timestamp handed to the test utility for the produced records
 */
private void produceMessages(long timestamp) throws ExecutionException, InterruptedException {
    Properties producerSettings = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            IntegerSerializer.class,
            StringSerializer.class,
            new Properties());

    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            streamOneInput,
            Arrays.asList(
                    new KeyValue<>(1, "A"),
                    new KeyValue<>(2, "B"),
                    new KeyValue<>(3, "C"),
                    new KeyValue<>(4, "D"),
                    new KeyValue<>(5, "E")),
            producerSettings,
            timestamp);
}
private Properties setProduceConsumeProperties(final String clientId) { Properties props = new Properties(); props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); // the socket buffer needs to be large, especially when running in AWS with // high latency. if running locally the default is fine. props.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // the socket buffer needs to be large, especially when running in AWS with // high latency. if running locally the default is fine. props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS); return props; }
/**
 * Streams 1..10, each incremented by one, into a {@code KafkaSink} on a fresh random topic
 * and waits for a consumer to signal completion after checking that values arrive as
 * 2, 3, 4, ... in order.
 */
@Test
public void testSinkWithInteger() throws InterruptedException {
    String topicName = UUID.randomUUID().toString();
    KafkaUsage kafkaUsage = new KafkaUsage();

    // Counted down by the consumer's completion callback.
    CountDownLatch done = new CountDownLatch(1);
    // Next value the consumer predicate expects to see.
    AtomicInteger nextExpected = new AtomicInteger(2);
    kafkaUsage.consumeIntegers(topicName, 10, 10, TimeUnit.SECONDS,
        done::countDown,
        (k, v) -> v == nextExpected.getAndIncrement());

    KafkaSink<Integer> kafkaSink = new KafkaSink<>(vertx,
        getKafkaConfig()
            .put("topic", topicName)
            .put("value.serializer", IntegerSerializer.class.getName())
            .put("value.deserializer", IntegerDeserializer.class.getName())
    );

    Source.from(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .transformPayload(i -> i + 1)
        .to(kafkaSink);

    assertThat(done.await(1, TimeUnit.MINUTES)).isTrue();
}
/**
 * Produces the integers 0..9 to a fresh random topic, reads them through a
 * {@code KafkaSource} that adds one to each payload, and checks that 1..10 are received
 * in order.
 */
@Test
public void testSource() throws InterruptedException {
    KafkaUsage usage = new KafkaUsage();
    String topic = UUID.randomUUID().toString();
    // The sink callback appends here while the test thread polls the size below; the
    // callback may run off the test thread, so a thread-safe list is used.
    List<Integer> results = new java.util.concurrent.CopyOnWriteArrayList<>();

    KafkaSource<Integer> source = new KafkaSource<>(vertx,
        getKafkaConfig()
            .put("topic", topic)
            .put("value.serializer", IntegerSerializer.class.getName())
            .put("value.deserializer", IntegerDeserializer.class.getName())
    );
    source
        .transformPayload(i -> i + 1)
        .to(Sink.forEachPayload(results::add));

    AtomicInteger counter = new AtomicInteger();
    usage.produceIntegers(10, null,
        () -> new ProducerRecord<>(topic, counter.getAndIncrement()));

    await().atMost(1, TimeUnit.MINUTES).until(() -> results.size() >= 10);
    assertThat(results).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
/**
 * Produces one record with key {@code "key"} to a fresh random topic and, in the sink
 * attached to a {@code KafkaSource}, checks that both the original Kafka record and the
 * key are available before completing the async handle.
 */
@Test
public void testCommonHeaders(TestContext context) throws InterruptedException {
    Async completion = context.async();
    String topicName = UUID.randomUUID().toString();
    KafkaUsage kafkaUsage = new KafkaUsage();

    KafkaSource<Integer> kafkaSource = new KafkaSource<>(vertx,
        getKafkaConfig()
            .put("topic", topicName)
            .put("value.serializer", IntegerSerializer.class.getName())
            .put("value.deserializer", IntegerDeserializer.class.getName())
    );

    kafkaSource.to(data -> {
        KafkaConsumerRecord record = original(data);
        assertThat(record).isNotNull();
        assertThat(key(data)).isNotNull();
        completion.complete();
        return complete();
    });

    kafkaUsage.produceIntegers(1, null, () -> new ProducerRecord<>(topicName, "key", 1));
}
/**
 * Creates a {@code KafkaSource} configured with {@code multicast.buffer.size = 20} on a
 * fresh random topic and runs the shared multicast check against it.
 */
@Test
public void testMulticastWithBufferSize() throws InterruptedException {
    String topicName = UUID.randomUUID().toString();
    KafkaUsage kafkaUsage = new KafkaUsage();

    KafkaSource<Integer> kafkaSource = new KafkaSource<>(vertx,
        getKafkaConfig()
            .put("topic", topicName)
            .put("value.serializer", IntegerSerializer.class.getName())
            .put("value.deserializer", IntegerDeserializer.class.getName())
            .put("multicast.buffer.size", 20)
    );
    assertThat(kafkaSource).isNotNull();

    checkMulticast(kafkaUsage, topicName, kafkaSource);
}
/**
 * Creates a {@code KafkaSource} configured with {@code multicast.buffer.period.ms = 2000}
 * on a fresh random topic and runs the shared multicast check against it.
 */
@Test
public void testMulticastWithTime() throws InterruptedException {
    String topicName = UUID.randomUUID().toString();
    KafkaUsage kafkaUsage = new KafkaUsage();

    KafkaSource<Integer> kafkaSource = new KafkaSource<>(vertx,
        getKafkaConfig()
            .put("topic", topicName)
            .put("value.serializer", IntegerSerializer.class.getName())
            .put("value.deserializer", IntegerDeserializer.class.getName())
            .put("multicast.buffer.period.ms", 2000)
    );
    assertThat(kafkaSource).isNotNull();

    checkMulticast(kafkaUsage, topicName, kafkaSource);
}
@Test public void testAutoCommit() throws Exception { LOG.info("Start testAutoCommit"); ContainerProperties containerProps = new ContainerProperties("topic3", "topic4"); final CountDownLatch latch = new CountDownLatch(4); containerProps.setMessageListener((MessageListener<Integer, String>) message -> { LOG.info("received: " + message); latch.countDown(); }); KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps, IntegerDeserializer.class, StringDeserializer.class); container.setBeanName("testAutoCommit"); container.start(); Thread.sleep(5000); // wait a bit for the container to start KafkaTemplate<Integer, String> template = createTemplate(IntegerSerializer.class, StringSerializer.class); template.setDefaultTopic("topic3"); template.sendDefault(0, "foo"); template.sendDefault(2, "bar"); template.sendDefault(0, "baz"); template.sendDefault(2, "qux"); template.flush(); assertTrue(latch.await(60, TimeUnit.SECONDS)); container.stop(); LOG.info("Stop testAutoCommit"); }
/** * get kafkaProducer * Producer端的常用配置 bootstrap.servers:Kafka集群连接串,可以由多个host:port组成 acks:broker消息确认的模式,有三种: 0:不进行消息接收确认,即Client端发送完成后不会等待Broker的确认 1:由Leader确认,Leader接收到消息后会立即返回确认信息 all:集群完整确认,Leader会等待所有in-sync的follower节点都确认收到消息后,再返回确认信息 我们可以根据消息的重要程度,设置不同的确认模式。默认为1 retries:发送失败时Producer端的重试次数,默认为0 batch.size:当同时有大量消息要向同一个分区发送时,Producer端会将消息打包后进行批量发送。如果设置为0,则每条消息都独立发送。默认为16384字节 linger.ms:发送消息前等待的毫秒数,与batch.size配合使用。在消息负载不高的情况下,配置linger.ms能够让Producer在发送消息前等待一定时间,以积累更多的消息打包发送,达到节省网络资源的目的。默认为0 key.serializer/value.serializer:消息key/value的序列器Class,根据key和value的类型决定 buffer.memory:消息缓冲池大小。尚未被发送的消息会保存在Producer的内存中,如果消息产生的速度大于消息发送的速度,那么缓冲池满后发送消息的请求会被阻塞。默认33554432字节(32MB) * * * @return */ private static KafkaProducer<Integer, String> getProducer() { Properties properties = new Properties(); //bootstrap.servers // properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "123.56.118.135:9092,123.56.118.135:9093"); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "123.56.118.135:9092,123.56.118.135:9093"); //client.id properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerTest"); //batch.size 当同时有大量消息要向同一个分区发送时,Producer端会将消息打包后进行批量发送。如果设置为0,则每条消息都独立发送。默认为16384字节 properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384); //发送消息前等待的毫秒数,与batch.size配合使用。在消息负载不高的情况下,配置linger.ms能够让Producer在发送消息前等待一定时间,以积累更多的消息打包发送,达到节省网络资源的目的。默认为0 properties.put(ProducerConfig.LINGER_MS_CONFIG,5000); //retries:发送失败时Producer端的重试次数,默认为0 properties.put(ProducerConfig.RETRIES_CONFIG,0); //消息缓冲池大小。尚未被发送的消息会保存在Producer的内存中,如果消息产生的速度大于消息发送的速度,那么缓冲池满后发送消息的请求会被阻塞。默认33554432字节 properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432); //key 和 value serializer的类 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); return new KafkaProducer(properties); }
/**
 * Sends ten records (key {@code i}, value {@code "i"} for i in 0..9) to a fixed topic
 * through a producer pointed at {@code pulsar://localhost:6650}.
 *
 * <p>The outcome of each send is logged from the send callback, so "sent successfully" is
 * only reported once the send has actually completed without error. The producer is closed
 * via try-with-resources.
 *
 * @param args unused
 */
public static void main(String[] args) {
    String topic = "persistent://sample/standalone/ns/my-topic";

    Properties props = new Properties();
    props.put("bootstrap.servers", "pulsar://localhost:6650");
    props.put("key.serializer", IntegerSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    try (Producer<Integer, String> producer = new KafkaProducer<>(props)) {
        for (int i = 0; i < 10; i++) {
            final int messageNumber = i; // effectively-final copy for the callback
            producer.send(
                    new ProducerRecord<Integer, String>(topic, messageNumber, Integer.toString(messageNumber)),
                    (metadata, exception) -> {
                        if (exception == null) {
                            log.info("Message {} sent successfully", messageNumber);
                        } else {
                            log.error("Failed to send message {}", messageNumber, exception);
                        }
                    });
        }
    }
}
public static void main(String[] args) throws IOException { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka0:19092"); props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper0:12181/kafka"); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); TopologyBuilder builder = new TopologyBuilder(); builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "words") .addProcessor("WordCountProcessor", WordCountProcessor::new, "SOURCE") .addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "WordCountProcessor") // .connectProcessorAndStateStores("WordCountProcessor", "Counts") .addSink("SINK", "count", new StringSerializer(), new IntegerSerializer(), "WordCountProcessor"); KafkaStreams stream = new KafkaStreams(builder, props); stream.start(); System.in.read(); stream.close(); stream.cleanUp(); }
MockProducerWrapper() { producerKey = String.valueOf(ThreadLocalRandom.current().nextLong()); mockProducer = new MockProducer<Integer, Long>( false, // disable synchronous completion of send. see ProducerSendCompletionThread below. new IntegerSerializer(), new LongSerializer()) { // override flush() so that it does not complete all the waiting sends, giving a chance to // ProducerCompletionThread to inject errors. @Override public void flush() { while (completeNext()) { // there are some uncompleted records. let the completion thread handle them. try { Thread.sleep(10); } catch (InterruptedException e) { // ok to retry. } } } }; // Add the producer to the global map so that producer factory function can access it. assertNull(MOCK_PRODUCER_MAP.putIfAbsent(producerKey, mockProducer)); }
/**
 * Creates the generator, opens a String/Integer Kafka producer, and submits this instance
 * to a fixed thread pool once per requested worker.
 *
 * @param configuration    producer configuration
 * @param topic            stored in the {@code topic} field
 * @param instances        size of the worker pool and number of times this instance is submitted
 * @param messagesPerGroup stored in the {@code messagesPerGroup} field
 * @param producerSlowDown stored in the {@code producerSlowDown} field
 * @param shutdownTimeout  stored in the {@code shutdownTimeout} field
 * @param stateDao         stored in the {@code stateDao} field
 */
public GeneratorProducer(Properties configuration, String topic, int instances, int messagesPerGroup,
                         Duration producerSlowDown, Duration shutdownTimeout, StateDao stateDao) {
    logger.info("Starting instance");

    this.stateDao = stateDao;
    this.shutdownTimeout = shutdownTimeout;
    this.producerSlowDown = producerSlowDown;
    this.messagesPerGroup = messagesPerGroup;
    this.topic = topic;

    this.producer = new KafkaProducer<>(configuration, new StringSerializer(), new IntegerSerializer());

    // Non-daemon workers named "<class>-<instanceName>-worker-<n>".
    this.executor = Executors.newFixedThreadPool(
            instances,
            new ThreadFactoryBuilder()
                    .setNameFormat(getClass().getSimpleName() + "-" + instanceName + "-worker-" + "%d")
                    .setDaemon(false)
                    .build());

    for (int worker = 0; worker < instances; worker++) {
        executor.submit(this);
    }
}
/**
 * Creates an Integer/String producer with {@code acks=1} and hands it to
 * {@code RecordsProducer.produce} under the name {@code kafka_producer_ack_one_latency}.
 *
 * <p>The producer is closed via try-with-resources, so it is released even if producing throws.
 *
 * @param bootstrapServers value used for the producer's {@code bootstrap.servers} setting
 */
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.ACKS_CONFIG, "1");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    try (Producer<Integer, String> producer = new KafkaProducer<>(properties)) {
        RecordsProducer.produce("kafka_producer_ack_one_latency", producer, TOPIC);
    }
}
/**
 * Creates an Integer/String producer with {@code acks=0} and hands it to
 * {@code RecordsProducer.produce} under the name {@code kafka_producer_ack_zero_latency}.
 *
 * <p>The producer is closed via try-with-resources, so it is released even if producing throws.
 *
 * @param bootstrapServers value used for the producer's {@code bootstrap.servers} setting
 */
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.ACKS_CONFIG, "0");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    try (Producer<Integer, String> producer = new KafkaProducer<>(properties)) {
        RecordsProducer.produce("kafka_producer_ack_zero_latency", producer, TOPIC);
    }
}
/**
 * Creates an Integer/String producer with {@code acks=all} and hands it to
 * {@code RecordsProducer.produce} under the name {@code kafka_producer_ack_all_latency}.
 *
 * <p>The producer is closed via try-with-resources, so it is released even if producing throws.
 *
 * @param bootstrapServers value used for the producer's {@code bootstrap.servers} setting
 */
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.ACKS_CONFIG, "all");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    try (Producer<Integer, String> producer = new KafkaProducer<>(properties)) {
        RecordsProducer.produce("kafka_producer_ack_all_latency", producer, TOPIC);
    }
}
/**
 * Feeds one record for topic {@code t2} into the global state task and checks that only
 * {@code sourceTwo} received it.
 */
@Test
public void shouldProcessRecordsForOtherTopic() throws Exception {
    // The serialized integer 1 is used as both key and value of the record.
    final IntegerSerializer serializer = new IntegerSerializer();
    final byte[] serializedOne = serializer.serialize("foo", 1);

    globalStateTask.initialize();
    globalStateTask.update(new ConsumerRecord<>("t2", 1, 1, serializedOne, serializedOne));

    assertEquals(1, sourceTwo.numReceived);
    assertEquals(0, sourceOne.numReceived);
}
/**
 * Builds the producer configuration: brokers from the {@code brokers} field, Integer keys
 * and JSON-serialized values.
 *
 * @return a new mutable map of producer settings
 */
public Map<String, Object> producerConfigs() {
    final Map<String, Object> settings = new HashMap<>();
    settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    return settings;
}
/**
 * Builds the producer configuration bean: brokers from the {@code brokerAddress} field,
 * Integer keys, String values, and explicit retry, batching, buffer and request-size limits.
 *
 * @return a new mutable map of producer settings
 */
@Bean
public Map<String, Object> producerConfigs() {
    final Map<String, Object> settings = new HashMap<>();

    // Connection and serialization.
    settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
    settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    // Delivery and batching behaviour.
    settings.put(ProducerConfig.RETRIES_CONFIG, 0);
    settings.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    settings.put(ProducerConfig.LINGER_MS_CONFIG, 1);

    // Memory and request-size limits.
    settings.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    settings.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 20000000);

    return settings;
}
/** * 构造KafkaProducer * * @return KafkaProducer */ private KafkaProducer<Integer, String> getProducer() { Properties properties = new Properties(); //bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "123.56.118.135:9092"); //client.id properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerTest"); //key 和 value serializer的类 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); return new KafkaProducer(properties); }
/**
 * Sends ten messages {@code "hello-0"} .. {@code "hello-9"} through the Kafka-API producer
 * and checks that a Pulsar consumer subscribed to the same topic receives them in order.
 *
 * <p>The producer and the consumer are both closed even when the test fails, and payloads
 * are decoded explicitly as UTF-8 rather than with the platform default charset.
 */
@Test
public void testSimpleProducer() throws Exception {
    String topic = "persistent://sample/standalone/ns/testSimpleProducer";

    Consumer pulsarConsumer = pulsarClient.subscribe(topic, "my-subscription");
    try {
        Properties props = new Properties();
        props.put("bootstrap.servers", lookupUrl.toString());
        props.put("key.serializer", IntegerSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (Producer<Integer, String> producer = new PulsarKafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i));
            }
            producer.flush();
        }

        for (int i = 0; i < 10; i++) {
            Message msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
            assertEquals(
                    new String(msg.getData(), java.nio.charset.StandardCharsets.UTF_8),
                    "hello-" + i);
            pulsarConsumer.acknowledge(msg);
        }
    } finally {
        pulsarConsumer.close();
    }
}
/**
 * Sends ten messages with a completion callback, waits for all callbacks to fire, and
 * checks that a Pulsar consumer receives {@code "hello-0"} .. {@code "hello-9"} in order.
 *
 * <p>The callback checks {@code exception} before touching {@code metadata}, always counts
 * the latch down, and records the first failure so it is rethrown on the test thread
 * instead of leaving the test to hang until its timeout. Producer and consumer are closed
 * even on failure, and payloads are decoded explicitly as UTF-8.
 */
@Test(timeOut = 10000)
public void testProducerCallback() throws Exception {
    String topic = "persistent://sample/standalone/ns/testProducerCallback";

    Consumer pulsarConsumer = pulsarClient.subscribe(topic, "my-subscription");
    try {
        Properties props = new Properties();
        props.put("bootstrap.servers", lookupUrl.toString());
        props.put("key.serializer", IntegerSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (Producer<Integer, String> producer = new PulsarKafkaProducer<>(props)) {
            CountDownLatch counter = new CountDownLatch(10);
            // First assertion failure seen in a callback, if any.
            final java.util.concurrent.atomic.AtomicReference<Throwable> callbackFailure =
                    new java.util.concurrent.atomic.AtomicReference<>();

            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i),
                        (metadata, exception) -> {
                            try {
                                assertNull(exception);
                                assertEquals(metadata.topic(), topic);
                            } catch (Throwable t) {
                                callbackFailure.compareAndSet(null, t);
                            } finally {
                                counter.countDown();
                            }
                        });
            }

            counter.await();
            Throwable failure = callbackFailure.get();
            if (failure != null) {
                throw new AssertionError("Producer callback failed", failure);
            }

            for (int i = 0; i < 10; i++) {
                Message msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
                assertEquals(
                        new String(msg.getData(), java.nio.charset.StandardCharsets.UTF_8),
                        "hello-" + i);
                pulsarConsumer.acknowledge(msg);
            }
        }
    } finally {
        pulsarConsumer.close();
    }
}
@Bean public Map<String, Object> producerConfigs() { HashMap<String, Object> props = new HashMap<>(); // list of host:port pairs used for establishing the initial connections // to the Kakfa cluster props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // value to block, after which it will throw a TimeoutException props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000); return props; }
@Test public void testSink() throws Exception { // Simply read from kafka source and write to kafka sink. Then verify the records // are correctly published to mock kafka producer. int numElements = 1000; try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) { ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread(producerWrapper.mockProducer).start(); String topic = "test"; p .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) .withoutMetadata()) .apply(KafkaIO.<Integer, Long>write() .withBootstrapServers("none") .withTopic(topic) .withKeySerializer(IntegerSerializer.class) .withValueSerializer(LongSerializer.class) .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey))); p.run(); completionThread.shutdown(); verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false); } }
@Bean public Map<String, Object> producerConfigs() { //FIXME: 12factorize Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList()); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.ACKS_CONFIG, acks); //props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); //props.put(ProducerConfig.LINGER_MS_CONFIG, 1); //props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; }
/**
 * Returns the producer held in the {@code producer} field, creating it on first use with
 * Integer keys and String values against {@code BROKER_HOST:brokerPort}.
 *
 * @return the cached or newly created producer
 */
private KafkaProducer<Integer, String> getProducer() {
    if (producer != null) {
        return producer;
    }

    Properties settings = new Properties();
    settings.setProperty("bootstrap.servers", BROKER_HOST + ':' + brokerPort);
    settings.setProperty("key.serializer", IntegerSerializer.class.getCanonicalName());
    settings.setProperty("value.serializer", StringSerializer.class.getCanonicalName());

    producer = new KafkaProducer<>(settings);
    return producer;
}
public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamingConfig.JOB_ID_CONFIG, "moving-avg-example"); props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); StreamingConfig config = new StreamingConfig(props); KStreamBuilder builder = new KStreamBuilder(); KStream<String, Integer> prices = builder.stream("ks_prices"); KTable<String, String> names = builder.table("ks_names"); KStream<String, Integer> namedPrices = prices.leftJoin(names, (price, name) -> { return new NamedPrice(name, price); }).map((ticket, namedPrice) -> new KeyValue<String, Integer>(namedPrice.name, namedPrice.price)); KTable<Windowed<String>, AvgValue> tempTable = namedPrices.<AvgValue, TumblingWindow>aggregateByKey( () -> new AvgAggregator<String, Integer, AvgValue>(), TumblingWindows.of("avgWindow").with(10000), new StringSerializer(), new AvgValueSerializer(), new StringDeserializer(), new AvgValueDeserializer()); // Should work after we implement "aggregateByKey KTable<Windowed<String>, Double> avg = tempTable.<Double>mapValues((v) -> ((double) v.sum / v.count)); avg.to("ks_avg_prices"); KafkaStreaming kstream = new KafkaStreaming(builder, config); kstream.start(); }
@Test public void shouldStartClusterAndAllowInteractiveProductionAndAutomaticConsumersToUseIt() throws Exception { Testing.Debug.enable(); final String topicName = "topicA"; final CountDownLatch completion = new CountDownLatch(1); final int numMessages = 3; final AtomicLong messagesRead = new AtomicLong(0); // Start a cluster and create a topic ... cluster.deleteDataUponShutdown(false).addBrokers(1).startup(); cluster.createTopics(topicName); // Consume messages asynchronously ... Stopwatch sw = Stopwatch.reusable().start(); cluster.useTo().consumeIntegers(topicName, numMessages, 10, TimeUnit.SECONDS, completion::countDown, (key, value) -> { messagesRead.incrementAndGet(); return true; }); // Produce some messages interactively ... cluster.useTo() .createProducer("manual", new StringSerializer(), new IntegerSerializer()) .write(topicName, "key1", 1) .write(topicName, "key2", 2) .write(topicName, "key3", 3) .close(); // Wait for the consumer to to complete ... if (completion.await(10, TimeUnit.SECONDS)) { sw.stop(); Testing.debug("The consumer completed normally in " + sw.durations()); } else { Testing.debug("Consumer did not completed normally"); } assertThat(messagesRead.get()).isEqualTo(numMessages); }
@Test public void shouldStartClusterAndAllowAsynchronousProductionAndAutomaticConsumersToUseIt() throws Exception { Testing.Debug.enable(); final String topicName = "topicA"; final CountDownLatch completion = new CountDownLatch(2); final int numMessages = 3; final AtomicLong messagesRead = new AtomicLong(0); // Start a cluster and create a topic ... cluster.deleteDataUponShutdown(false).addBrokers(2).startup(); cluster.createTopics(topicName); // Consume messages asynchronously ... Stopwatch sw = Stopwatch.reusable().start(); cluster.useTo().consumeIntegers(topicName, numMessages, 10, TimeUnit.SECONDS, completion::countDown, (key, value) -> { messagesRead.incrementAndGet(); return true; }); // Produce some messages interactively ... cluster.useTo().produce("manual", new StringSerializer(), new IntegerSerializer(), produer -> { produer.write(topicName, "key1", 1); produer.write(topicName, "key2", 2); produer.write(topicName, "key3", 3); completion.countDown(); }); // Wait for the consumer to to complete ... if (completion.await(10, TimeUnit.SECONDS)) { sw.stop(); Testing.debug("The consumer completed normally in " + sw.durations()); } else { Testing.debug("Consumer did not completed normally"); } assertThat(messagesRead.get()).isEqualTo(numMessages); }
/**
 * Continuously produces random String/Integer records to the {@code data} topic until the
 * JVM shutdown hook clears {@code isRunning}, printing a progress line every 1000 records
 * and a final total.
 *
 * <p>Any send failure reported to the callback prints the stack trace and exits with
 * status 1. The producer is managed by try-with-resources so it is closed even if the
 * loop throws.
 *
 * @param kafka value used for the producer's {@code bootstrap.servers} setting
 */
static void generate(final String kafka) throws Exception {
    // Ask the loop below to stop when the JVM is shutting down.
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            isRunning = false;
        }
    });

    final Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

    int numRecordsProduced = 0;
    try (final KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProps)) {
        final Random rand = new Random(System.currentTimeMillis());

        while (isRunning) {
            final String key = "" + rand.nextInt(MAX_NUMBER_OF_KEYS);
            final int value = rand.nextInt(10000);

            final ProducerRecord<String, Integer> record = new ProducerRecord<>("data", key, value);

            producer.send(record, new Callback() {
                @Override
                public void onCompletion(final RecordMetadata metadata, final Exception exception) {
                    if (exception != null) {
                        exception.printStackTrace();
                        Exit.exit(1);
                    }
                }
            });

            numRecordsProduced++;
            if (numRecordsProduced % 1000 == 0) {
                System.out.println(numRecordsProduced + " records produced");
            }
            Utils.sleep(rand.nextInt(50));
        }
    }
    System.out.println(numRecordsProduced + " records produced");
}