private static void produceRecords(String bootstrapServers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); Producer<Long, byte[]> producer = new KafkaProducer<>(properties); LongStream.rangeClosed(1, 100).boxed() .map(number -> new ProducerRecord<>( TOPIC, //topic number, //key String.format("record-%s", number.toString()).getBytes())) //value .forEach(record -> producer.send(record)); producer.close(); }
private void produceToStreamOne() throws ExecutionException, InterruptedException { IntegrationTestUtils.produceKeyValuesSynchronously( streamOneInput, Arrays.asList( new KeyValue<>(10L, 1), new KeyValue<>(5L, 2), new KeyValue<>(12L, 3), new KeyValue<>(15L, 4), new KeyValue<>(20L, 5), new KeyValue<Long, Integer>(70L, null)), // nulls should be filtered TestUtils.producerConfig( CLUSTER.bootstrapServers(), LongSerializer.class, IntegerSerializer.class, new Properties()), mockTime); }
/**
 * Synchronously writes five String-to-Long records ("a" -> 1 through "e" -> 5) to the given topic.
 *
 * @param topic name of the topic to produce to
 * @throws java.util.concurrent.ExecutionException propagated from the synchronous produce helper
 * @throws InterruptedException                    propagated from the synchronous produce helper
 */
private void produceTopicValues(final String topic) throws java.util.concurrent.ExecutionException, InterruptedException {
    final Properties producerConfig = TestUtils.producerConfig(
        CLUSTER.bootstrapServers(),
        StringSerializer.class,
        LongSerializer.class,
        new Properties());

    IntegrationTestUtils.produceKeyValuesSynchronously(
        topic,
        Arrays.asList(
            new KeyValue<>("a", 1L),
            new KeyValue<>("b", 2L),
            new KeyValue<>("c", 3L),
            new KeyValue<>("d", 4L),
            new KeyValue<>("e", 5L)),
        producerConfig,
        mockTime);
}
/**
 * Synchronously writes five Long-to-String records (1 -> "F" through 5 -> "J") to the
 * {@code globalOne} topic.
 *
 * @throws java.util.concurrent.ExecutionException propagated from the synchronous produce helper
 * @throws InterruptedException                    propagated from the synchronous produce helper
 */
private void produceGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
    final Properties producerConfig = TestUtils.producerConfig(
        CLUSTER.bootstrapServers(),
        LongSerializer.class,
        StringSerializer.class,
        new Properties());

    IntegrationTestUtils.produceKeyValuesSynchronously(
        globalOne,
        Arrays.asList(
            new KeyValue<>(1L, "F"),
            new KeyValue<>(2L, "G"),
            new KeyValue<>(3L, "H"),
            new KeyValue<>(4L, "I"),
            new KeyValue<>(5L, "J")),
        producerConfig,
        mockTime);
}
/**
 * Fills the shared producer, result-consumer and Streams configuration objects once,
 * before the tests of this class run. All three point at the brokers of {@code CLUSTER}.
 */
@BeforeClass
public static void setupConfigsAndUtils() throws Exception {
    final String brokers = CLUSTER.bootstrapServers();

    // Producer used to feed input: Long keys, String values, acks=all and no retries.
    PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
    PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
    PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);

    // Consumer used to read results: own group id, starts from the earliest offset.
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Streams application: Long/String default serdes, record cache size 0,
    // commit interval of 100 ms, state kept in a fresh temp directory.
    STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
    STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
}
/**
 * Recreates the input and output topics and then writes ten records to {@code INPUT_TOPIC}.
 *
 * <p>Keys alternate 0, 1, 0, 1, ...; values are "aaa" through "jjj". Before each record the
 * mock clock is advanced (10 ms for the first six records, 1 ms for the last four) and the
 * resulting mock time is used as the record timestamp.
 */
private void prepareInputData() throws Exception {
    CLUSTER.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);

    final Properties producerConfig =
        TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class);

    final String[] values = {"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh", "iii", "jjj"};
    // Number of leading records that are spaced 10 ms apart; the rest are 1 ms apart.
    final int wideGapRecords = 6;

    for (int i = 0; i < values.length; i++) {
        mockTime.sleep(i < wideGapRecords ? 10 : 1);

        final long key = i % 2;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            INPUT_TOPIC,
            Collections.singleton(new KeyValue<>(key, values[i])),
            producerConfig,
            mockTime.milliseconds());
    }
}
private RealTimeTradeProducer(int index, String broker, String topic, int tradesPerSecond, int keysFrom, int keysTo) throws IOException, URISyntaxException { if (tradesPerSecond <= 0) { throw new RuntimeException("tradesPerSecond=" + tradesPerSecond); } this.index = index; this.topic = topic; this.tradesPerSecond = tradesPerSecond; tickers = new String[keysTo - keysFrom]; Arrays.setAll(tickers, i -> "T-" + Integer.toString(i + keysFrom)); Properties props = new Properties(); props.setProperty("bootstrap.servers", broker); props.setProperty("key.serializer", LongSerializer.class.getName()); props.setProperty("value.serializer", TradeSerializer.class.getName()); this.producer = new KafkaProducer<>(props); }
/**
 * Checks that a KafkaIO write transform exposes its topic, bootstrap servers and a
 * {@code retries} value of 3 as display data.
 */
@Test
public void testSinkDisplayData() {
    final String servers = "myServerA:9092,myServerB:9092";
    final String topicName = "myTopic";

    try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
        KafkaIO.Write<Integer, Long> sink =
            KafkaIO.<Integer, Long>write()
                .withBootstrapServers(servers)
                .withTopic(topicName)
                .withValueSerializer(LongSerializer.class)
                .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey));

        DisplayData shown = DisplayData.from(sink);

        assertThat(shown, hasDisplayItem("topic", topicName));
        assertThat(shown, hasDisplayItem("bootstrap.servers", servers));
        assertThat(shown, hasDisplayItem("retries", 3));
    }
}
MockProducerWrapper() { producerKey = String.valueOf(ThreadLocalRandom.current().nextLong()); mockProducer = new MockProducer<Integer, Long>( false, // disable synchronous completion of send. see ProducerSendCompletionThread below. new IntegerSerializer(), new LongSerializer()) { // override flush() so that it does not complete all the waiting sends, giving a chance to // ProducerCompletionThread to inject errors. @Override public void flush() { while (completeNext()) { // there are some uncompleted records. let the completion thread handle them. try { Thread.sleep(10); } catch (InterruptedException e) { // ok to retry. } } } }; // Add the producer to the global map so that producer factory function can access it. assertNull(MOCK_PRODUCER_MAP.putIfAbsent(producerKey, mockProducer)); }
/**
 * Synchronously writes the initial four Long-to-String records (1 -> "A" through 4 -> "D")
 * to the {@code globalOne} topic.
 *
 * @throws java.util.concurrent.ExecutionException propagated from the synchronous produce helper
 * @throws InterruptedException                    propagated from the synchronous produce helper
 */
private void produceInitialGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
    final Properties producerConfig = TestUtils.producerConfig(
        CLUSTER.bootstrapServers(),
        LongSerializer.class,
        StringSerializer.class,
        new Properties());

    IntegrationTestUtils.produceKeyValuesSynchronously(
        globalOne,
        Arrays.asList(
            new KeyValue<>(1L, "A"),
            new KeyValue<>(2L, "B"),
            new KeyValue<>(3L, "C"),
            new KeyValue<>(4L, "D")),
        producerConfig,
        mockTime);
}
/**
 * Synchronously produces the given Long-to-Long records to {@code MULTI_PARTITION_INPUT_TOPIC}.
 *
 * @param records key/value pairs to write
 */
private void writeInputData(final List<KeyValue<Long, Long>> records) throws Exception {
    final String brokers = CLUSTER.bootstrapServers();

    IntegrationTestUtils.produceKeyValuesSynchronously(
        MULTI_PARTITION_INPUT_TOPIC,
        records,
        TestUtils.producerConfig(brokers, LongSerializer.class, LongSerializer.class),
        CLUSTER.time);
}
/**
 * Wires up the application: Prometheus metrics endpoint, metrics-decorated tracer,
 * a tracing Kafka producer and the tweets REST resource.
 *
 * @param configuration application configuration (not read by this method)
 * @param environment   environment on which servlets and Jersey components are registered
 */
public void run(Configuration configuration, Environment environment) throws Exception {
    // Metrics: export the environment's metric registry under /metrics on the admin connector.
    final CollectorRegistry collectorRegistry = new CollectorRegistry();
    collectorRegistry.register(new DropwizardExports(environment.metrics()));
    environment.admin()
        .addServlet("metrics", new MetricsServlet(collectorRegistry))
        .addMapping("/metrics");

    // Tracing: decorate the tracer so spans are also reported as Prometheus metrics.
    final PrometheusMetricsReporter reporter =
        PrometheusMetricsReporter.newMetricsReporter()
            .withCollectorRegistry(collectorRegistry)
            .withConstLabel("service", getName())
            .build();
    final Tracer metricsTracer =
        io.opentracing.contrib.metrics.Metrics.decorate(getTracer(), reporter);
    GlobalTracer.register(metricsTracer);

    final DynamicFeature tracingFeature = new ServerTracingDynamicFeature.Builder(metricsTracer).build();
    environment.jersey().register(tracingFeature);

    // Kafka: producer with acks=all and idempotence enabled, wrapped for tracing.
    final Properties kafkaSettings = new Properties();
    kafkaSettings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "tweets-kafka:9092");
    kafkaSettings.put(ProducerConfig.ACKS_CONFIG, "all");
    kafkaSettings.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    final KafkaProducer<Long, String> plainProducer =
        new KafkaProducer<>(kafkaSettings, new LongSerializer(), new StringSerializer());
    final Producer<Long, String> tracingKafkaProducer =
        new TracingKafkaProducer<>(plainProducer, metricsTracer);

    // REST resource backed by the Kafka-based repository.
    final TweetEventRepository tweetRepository =
        new KafkaTweetEventRepository(tracingKafkaProducer, environment.getObjectMapper());
    final TweetsService tweetsService = new TweetsService(tweetRepository);
    environment.jersey().register(new TweetsResource(tweetsService));
}
/**
 * Loads the tickers and creates the Kafka producer used to publish trades.
 *
 * @param broker value for the producer's {@code bootstrap.servers} setting
 */
private TradeProducer(String broker) {
    loadTickers();

    final Properties kafkaSettings = new Properties();
    kafkaSettings.setProperty("bootstrap.servers", broker);
    // Long keys; values are serialized by TradeSerializer.
    kafkaSettings.setProperty("key.serializer", LongSerializer.class.getName());
    kafkaSettings.setProperty("value.serializer", TradeSerializer.class.getName());

    producer = new KafkaProducer<>(kafkaSettings);
}
/**
 * Builds a Kafka producer that connects to three local brokers with client id
 * {@code KafkaMDWProducer}, configured with a Long key serializer and a String value serializer.
 *
 * @return the newly created producer
 */
private static Producer<Object, Object> createProducer() {
    final Properties settings = new Properties();

    // Connection
    settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
    settings.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaMDWProducer");

    // Serialization
    settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    return new KafkaProducer<>(settings);
}
@Test public void testSink() throws Exception { // Simply read from kafka source and write to kafka sink. Then verify the records // are correctly published to mock kafka producer. int numElements = 1000; try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) { ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread(producerWrapper.mockProducer).start(); String topic = "test"; p .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) .withoutMetadata()) .apply(KafkaIO.<Integer, Long>write() .withBootstrapServers("none") .withTopic(topic) .withKeySerializer(IntegerSerializer.class) .withValueSerializer(LongSerializer.class) .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey))); p.run(); completionThread.shutdown(); verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false); } }
@Test public void testValuesSink() throws Exception { // similar to testSink(), but use values()' interface. int numElements = 1000; try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) { ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread(producerWrapper.mockProducer).start(); String topic = "test"; p .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) .withoutMetadata()) .apply(Values.<Long>create()) // there are no keys .apply(KafkaIO.<Integer, Long>write() .withBootstrapServers("none") .withTopic(topic) .withValueSerializer(LongSerializer.class) .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)) .values()); p.run(); completionThread.shutdown(); verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, true); } }
/**
 * Runs a copy topology (input -> optional through topic -> output) with exactly-once
 * processing, restarting the Streams application {@code numberOfRestarts} times.
 *
 * <p>On every iteration a fresh batch of records is produced to {@code inputTopic}, and the
 * committed records are then read back from {@code outputTopic} with a read-committed consumer
 * and compared per key with the produced batch.
 *
 * @param numberOfRestarts number of start/produce/verify/close cycles to run
 * @param inputTopic       topic the topology reads from and the test produces to
 * @param throughTopic     intermediate topic, or {@code null} to copy directly
 * @param outputTopic      topic the topology writes to and the test verifies
 */
private void runSimpleCopyTest(final int numberOfRestarts,
                               final String inputTopic,
                               final String throughTopic,
                               final String outputTopic) throws Exception {
    final KStreamBuilder builder = new KStreamBuilder();
    final KStream<Long, Long> input = builder.stream(inputTopic);
    KStream<Long, Long> output = input;
    if (throughTopic != null) {
        output = input.through(throughTopic);
    }
    output.to(outputTopic);

    for (int i = 0; i < numberOfRestarts; ++i) {
        // Offsets the generated data range so every iteration produces distinct records.
        final long factor = i;
        final KafkaStreams streams = new KafkaStreams(
            builder,
            StreamsTestUtils.getStreamsConfig(
                applicationId,
                CLUSTER.bootstrapServers(),
                Serdes.LongSerde.class.getName(),
                Serdes.LongSerde.class.getName(),
                new Properties() {
                    {
                        put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
                        put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
                    }
                }));

        try {
            streams.start();

            final List<KeyValue<Long, Long>> inputData = prepareData(factor * 100, factor * 100 + 10L, 0L, 1L);

            IntegrationTestUtils.produceKeyValuesSynchronously(
                inputTopic,
                inputData,
                TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
                CLUSTER.time
            );

            // Verify what the topology wrote: read the OUTPUT topic. Reading the input topic
            // would only echo back what this test produced and never exercise the copy.
            final List<KeyValue<Long, Long>> committedRecords
                = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                TestUtils.consumerConfig(
                    CLUSTER.bootstrapServers(),
                    CONSUMER_GROUP_ID,
                    LongDeserializer.class,
                    LongDeserializer.class,
                    new Properties() {
                        {
                            put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                        }
                    }),
                outputTopic,
                inputData.size()
            );

            checkResultPerKey(committedRecords, inputData);
        } finally {
            streams.close();
        }
    }
}
/**
 * Runs a single-partition copy topology with exactly-once processing and sends two bursts of
 * data through it. After each burst the output topic is read with a read-committed consumer
 * and must contain exactly that burst.
 */
@Test
public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
    final KStreamBuilder builder = new KStreamBuilder();
    builder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC);

    final Properties eosOverrides = new Properties();
    eosOverrides.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

    final KafkaStreams streams = new KafkaStreams(
        builder,
        StreamsTestUtils.getStreamsConfig(
            applicationId,
            CLUSTER.bootstrapServers(),
            Serdes.LongSerde.class.getName(),
            Serdes.LongSerde.class.getName(),
            eosOverrides));

    try {
        streams.start();

        final List<KeyValue<Long, Long>> firstBurstOfData = prepareData(0L, 5L, 0L);
        final List<KeyValue<Long, Long>> secondBurstOfData = prepareData(5L, 8L, 0L);

        // The same producer and consumer settings are used for both bursts.
        final Properties producerConfig =
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class);

        final Properties readCommitted = new Properties();
        readCommitted.put(
            ConsumerConfig.ISOLATION_LEVEL_CONFIG,
            IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
        final Properties consumerConfig = TestUtils.consumerConfig(
            CLUSTER.bootstrapServers(),
            CONSUMER_GROUP_ID,
            LongDeserializer.class,
            LongDeserializer.class,
            readCommitted);

        // First burst
        IntegrationTestUtils.produceKeyValuesSynchronously(
            SINGLE_PARTITION_INPUT_TOPIC, firstBurstOfData, producerConfig, CLUSTER.time);

        final List<KeyValue<Long, Long>> firstCommittedRecords =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                consumerConfig, SINGLE_PARTITION_OUTPUT_TOPIC, firstBurstOfData.size());

        assertThat(firstCommittedRecords, equalTo(firstBurstOfData));

        // Second burst
        IntegrationTestUtils.produceKeyValuesSynchronously(
            SINGLE_PARTITION_INPUT_TOPIC, secondBurstOfData, producerConfig, CLUSTER.time);

        final List<KeyValue<Long, Long>> secondCommittedRecords =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                consumerConfig, SINGLE_PARTITION_OUTPUT_TOPIC, secondBurstOfData.size());

        assertThat(secondCommittedRecords, equalTo(secondBurstOfData));
    } finally {
        streams.close();
    }
}
/**
 * Verifies the producer and consumer factory configurations that the Kafka binder builds.
 *
 * <p>The binder's non-public factory methods are invoked reflectively, and the {@code configs}
 * map of each resulting factory is read reflectively as well. Producer configs must carry only
 * serializer settings and consumer configs only deserializer settings, alongside the expected
 * batch size, compression type, group id, offset reset policy and bootstrap servers.
 */
@Test
public void testKafkaBinderConfigurationWithKafkaProperties() throws Exception {
    assertNotNull(this.kafkaMessageChannelBinder);

    // --- producer side ---
    ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
            new KafkaProducerProperties());
    Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory",
            String.class, ExtendedProducerProperties.class);
    getProducerFactoryMethod.setAccessible(true);
    DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod
            .invoke(this.kafkaMessageChannelBinder, "foo", producerProperties);
    Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs",
            Map.class);
    ReflectionUtils.makeAccessible(producerFactoryConfigField);
    Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField,
            producerFactory);
    assertTrue(producerConfigs.get("batch.size").equals(10));
    assertTrue(producerConfigs.get("key.serializer").equals(LongSerializer.class));
    assertTrue(producerConfigs.get("key.deserializer") == null);
    assertTrue(producerConfigs.get("value.serializer").equals(LongSerializer.class));
    assertTrue(producerConfigs.get("value.deserializer") == null);
    assertTrue(producerConfigs.get("compression.type").equals("snappy"));
    List<String> bootstrapServers = new ArrayList<>();
    bootstrapServers.add("10.98.09.199:9092");
    bootstrapServers.add("10.98.09.196:9092");
    assertTrue((((List<String>) producerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));

    // --- consumer side ---
    Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod(
            "createKafkaConsumerFactory", boolean.class, String.class, ExtendedConsumerProperties.class);
    createKafkaConsumerFactoryMethod.setAccessible(true);
    ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(
            new KafkaConsumerProperties());
    DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod
            .invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties);
    Field consumerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class, "configs",
            Map.class);
    ReflectionUtils.makeAccessible(consumerFactoryConfigField);
    Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField,
            consumerFactory);
    assertTrue(consumerConfigs.get("key.deserializer").equals(LongDeserializer.class));
    assertTrue(consumerConfigs.get("key.serializer") == null);
    assertTrue(consumerConfigs.get("value.deserializer").equals(LongDeserializer.class));
    // The key must be "value.serializer": a misspelled key is never present in the map,
    // which would make this null check pass no matter what the binder configured.
    assertTrue(consumerConfigs.get("value.serializer") == null);
    assertTrue(consumerConfigs.get("group.id").equals("groupIdFromBootConfig"));
    assertTrue(consumerConfigs.get("auto.offset.reset").equals("earliest"));
    assertTrue((((List<String>) consumerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));
}
public KafkaSubscriberBlackboxTest() { super(new TestEnvironment()); mockProducer = new MockProducer<Long, Long>(true, new LongSerializer(), new LongSerializer()); }
public KafkaSubscriberWhiteboxTest() { super(new TestEnvironment()); mockProducer = new MockProducer<Long, Long>(true, new LongSerializer(), new LongSerializer()); }
@Test public void testEOSink() { // testSink() with EOS enabled. // This does not actually inject retries in a stage to test exactly-once-semantics. // It mainly exercises the code in normal flow without retries. // Ideally we should test EOS Sink by triggering replays of a messages between stages. // It is not feasible to test such retries with direct runner. When DoFnTester supports // state, we can test KafkaEOWriter DoFn directly to ensure it handles retries correctly. if (!ProducerSpEL.supportsTransactions()) { LOG.warn("testEOSink() is disabled as Kafka client version does not support transactions."); return; } int numElements = 1000; try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) { ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread(producerWrapper.mockProducer).start(); String topic = "test"; p .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) .withoutMetadata()) .apply(KafkaIO.<Integer, Long>write() .withBootstrapServers("none") .withTopic(topic) .withKeySerializer(IntegerSerializer.class) .withValueSerializer(LongSerializer.class) .withEOS(1, "test") .withConsumerFactoryFn(new ConsumerFactoryFn( Lists.newArrayList(topic), 10, 10, OffsetResetStrategy.EARLIEST)) .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey))); p.run(); completionThread.shutdown(); verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false); } }
@Test public void testSinkWithSendErrors() throws Throwable { // similar to testSink(), except that up to 10 of the send calls to producer will fail // asynchronously. // TODO: Ideally we want the pipeline to run to completion by retrying bundles that fail. // We limit the number of errors injected to 10 below. This would reflect a real streaming // pipeline. But I am sure how to achieve that. For now expect an exception: thrown.expect(InjectedErrorException.class); thrown.expectMessage("Injected Error #1"); int numElements = 1000; try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) { ProducerSendCompletionThread completionThreadWithErrors = new ProducerSendCompletionThread(producerWrapper.mockProducer, 10, 100).start(); String topic = "test"; p .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) .withoutMetadata()) .apply(KafkaIO.<Integer, Long>write() .withBootstrapServers("none") .withTopic(topic) .withKeySerializer(IntegerSerializer.class) .withValueSerializer(LongSerializer.class) .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey))); try { p.run(); } catch (PipelineExecutionException e) { // throwing inner exception helps assert that first exception is thrown from the Sink throw e.getCause().getCause(); } finally { completionThreadWithErrors.shutdown(); } } }
@Test public void testSinkMetrics() throws Exception { // Simply read from kafka source and write to kafka sink. Then verify the metrics are reported. int numElements = 1000; try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) { ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread(producerWrapper.mockProducer).start(); String topic = "test"; p .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) .withoutMetadata()) .apply("writeToKafka", KafkaIO.<Integer, Long>write() .withBootstrapServers("none") .withTopic(topic) .withKeySerializer(IntegerSerializer.class) .withValueSerializer(LongSerializer.class) .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey))); PipelineResult result = p.run(); MetricName elementsWritten = SinkMetrics.elementsWritten().getName(); MetricQueryResults metrics = result.metrics().queryMetrics( MetricsFilter.builder() .addNameFilter(MetricNameFilter.inNamespace(elementsWritten.namespace())) .build()); assertThat(metrics.counters(), hasItem( attemptedMetricsResult( elementsWritten.namespace(), elementsWritten.name(), "writeToKafka", 1000L))); completionThread.shutdown(); } }