@Test public void testStreamPartitioner() { final RecordCollectorImpl collector = new RecordCollectorImpl( new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer), "RecordCollectorTest-TestStreamPartitioner"); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); collector.send("topic1", "9", "0", null, stringSerializer, stringSerializer, streamPartitioner); collector.send("topic1", "27", "0", null, stringSerializer, stringSerializer, streamPartitioner); collector.send("topic1", "81", "0", null, stringSerializer, stringSerializer, streamPartitioner); collector.send("topic1", "243", "0", null, stringSerializer, stringSerializer, streamPartitioner); collector.send("topic1", "28", "0", null, stringSerializer, stringSerializer, streamPartitioner); collector.send("topic1", "82", "0", null, stringSerializer, stringSerializer, streamPartitioner); collector.send("topic1", "244", "0", null, stringSerializer, stringSerializer, streamPartitioner); collector.send("topic1", "245", "0", null, stringSerializer, stringSerializer, streamPartitioner); final Map<TopicPartition, Long> offsets = collector.offsets(); assertEquals((Long) 4L, offsets.get(new TopicPartition("topic1", 0))); assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 1))); assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2))); }
@SuppressWarnings("unchecked") @Test public void shouldRetryWhenTimeoutExceptionOccursOnSend() throws Exception { final AtomicInteger attempt = new AtomicInteger(0); final RecordCollectorImpl collector = new RecordCollectorImpl( new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) { if (attempt.getAndIncrement() == 0) { throw new TimeoutException(); } return super.send(record, callback); } }, "test"); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); final Long offset = collector.offsets().get(new TopicPartition("topic1", 0)); assertEquals(Long.valueOf(0L), offset); }
private static void verifyProducerRecords(MockProducer<Integer, Long> mockProducer, String topic, int numElements, boolean keyIsAbsent) { // verify that appropriate messages are written to kafka List<ProducerRecord<Integer, Long>> sent = mockProducer.history(); // sort by values Collections.sort(sent, new Comparator<ProducerRecord<Integer, Long>>() { @Override public int compare(ProducerRecord<Integer, Long> o1, ProducerRecord<Integer, Long> o2) { return Long.compare(o1.value(), o2.value()); } }); for (int i = 0; i < numElements; i++) { ProducerRecord<Integer, Long> record = sent.get(i); assertEquals(topic, record.topic()); if (keyIsAbsent) { assertNull(record.key()); } else { assertEquals(i, record.key().intValue()); } assertEquals(i, record.value().longValue()); } }
MockProducerWrapper() { producerKey = String.valueOf(ThreadLocalRandom.current().nextLong()); mockProducer = new MockProducer<Integer, Long>( false, // disable synchronous completion of send. see ProducerSendCompletionThread below. new IntegerSerializer(), new LongSerializer()) { // override flush() so that it does not complete all the waiting sends, giving a chance to // ProducerCompletionThread to inject errors. @Override public void flush() { while (completeNext()) { // there are some uncompleted records. let the completion thread handle them. try { Thread.sleep(10); } catch (InterruptedException e) { // ok to retry. } } } }; // Add the producer to the global map so that producer factory function can access it. assertNull(MOCK_PRODUCER_MAP.putIfAbsent(producerKey, mockProducer)); }
private MockProducer createProducer() { Collection<PartitionInfo> partitionInfos = getInfos() .values() .stream() .flatMap(Collection::stream) .collect(Collectors.toList()); Cluster cluster = new Cluster( UUID.randomUUID().toString(), Collections.emptyList(), partitionInfos, Collections.emptySet(), Collections.emptySet() ); return new MockProducer(cluster, true, null, new ByteBufferSerializer(), new ByteBufferSerializer()); }
private MockProducer createProducer() { Map<String, List<PartitionInfo>> infos = getInfos(); Collection<PartitionInfo> partitionInfos = new ArrayList<>(); for (List<PartitionInfo> infoEntry : infos.values()) { partitionInfos.addAll(infoEntry); } Cluster cluster = new Cluster(UUID.randomUUID().toString(), Collections.<Node>emptyList(), partitionInfos, Collections.<String>emptySet(), Collections.<String>emptySet()); MockProducer producer = new MockProducer(cluster, true, null, new ByteBufferSerializer(), new ByteBufferSerializer()); PRODUCERS.add(producer); return producer; }
@Test public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() { builder.addSource("source1", "someTopic"); final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId); final StreamThread thread = new StreamThread( builder, new StreamsConfig(configProps(true)), clientSupplier, applicationId, clientId, processId, metrics, mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0))); assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1))); thread.setPartitionAssignor(new MockStreamsPartitionAssignor(assignment)); thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0))); thread.close(); thread.run(); for (final StreamTask task : thread.tasks().values()) { assertTrue(((MockProducer) ((RecordCollectorImpl) task.recordCollector()).producer()).closed()); } }
@Test public void shouldCloseThreadProducerOnCloseIfEosDisabled() { builder.addSource("source1", "someTopic"); final StreamThread thread = new StreamThread( builder, config, clientSupplier, applicationId, clientId, processId, metrics, mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0))); assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1))); thread.setPartitionAssignor(new MockStreamsPartitionAssignor(assignment)); thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0))); thread.close(); thread.run(); assertTrue(((MockProducer) thread.threadProducer).closed()); }
@Test public void testSpecificPartition() { final RecordCollectorImpl collector = new RecordCollectorImpl( new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer), "RecordCollectorTest-TestSpecificPartition"); collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer); collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer); collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer); collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer); collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer); collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer); final Map<TopicPartition, Long> offsets = collector.offsets(); assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 0))); assertEquals((Long) 1L, offsets.get(new TopicPartition("topic1", 1))); assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2))); // ignore StreamPartitioner collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer); collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer); collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer); assertEquals((Long) 3L, offsets.get(new TopicPartition("topic1", 0))); assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 1))); assertEquals((Long) 1L, offsets.get(new TopicPartition("topic1", 2))); }
@SuppressWarnings("unchecked") @Test(expected = StreamsException.class) public void shouldThrowStreamsExceptionAfterMaxAttempts() throws Exception { final RecordCollector collector = new RecordCollectorImpl( new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) { throw new TimeoutException(); } }, "test"); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); }
@SuppressWarnings("unchecked") @Test(expected = StreamsException.class) public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() throws Exception { final RecordCollector collector = new RecordCollectorImpl( new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) { callback.onCompletion(null, new Exception()); return null; } }, "test"); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); }
@SuppressWarnings("unchecked") @Test(expected = StreamsException.class) public void shouldThrowStreamsExceptionOnFlushIfASendFailed() throws Exception { final RecordCollector collector = new RecordCollectorImpl( new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) { callback.onCompletion(null, new Exception()); return null; } }, "test"); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); collector.flush(); }
@SuppressWarnings("unchecked") @Test(expected = StreamsException.class) public void shouldThrowStreamsExceptionOnCloseIfASendFailed() throws Exception { final RecordCollector collector = new RecordCollectorImpl( new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) { callback.onCompletion(null, new Exception()); return null; } }, "test"); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); collector.close(); }
@SuppressWarnings("unchecked") @Test(expected = StreamsException.class) public void shouldThrowIfTopicIsUnknown() { final RecordCollector collector = new RecordCollectorImpl( new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override public List<PartitionInfo> partitionsFor(final String topic) { return Collections.EMPTY_LIST; } }, "test"); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); }
@Test public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() throws Exception { final MockProducer producer = new MockProducer(); task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer); assertTrue(producer.transactionInitialized()); assertTrue(producer.transactionInFlight()); }
@Test public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() throws Exception { final MockProducer producer = new MockProducer(); task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, null, time, producer); assertFalse(producer.transactionInitialized()); assertFalse(producer.transactionInFlight()); }
@Test public void shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSuspendIfEosEnabled() throws Exception { final MockProducer producer = new MockProducer(); task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer); task.addRecords(partition1, Collections.singletonList( new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))); task.process(); task.suspend(); assertTrue(producer.sentOffsets()); assertTrue(producer.transactionCommitted()); assertFalse(producer.transactionInFlight()); }
@Test public void shouldCommitTransactionOnSuspendEvenIfTransactionIsEmptyIfEosEnabled() throws Exception { final MockProducer producer = new MockProducer(); task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer); task.suspend(); assertTrue(producer.transactionCommitted()); assertFalse(producer.transactionInFlight()); }
@Test public void shouldNotSendOffsetsAndCommitTransactionNorStartNewTransactionOnSuspendIfEosDisabled() throws Exception { final MockProducer producer = new MockProducer(); task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, null, time, producer); task.addRecords(partition1, Collections.singletonList( new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))); task.process(); task.suspend(); assertFalse(producer.sentOffsets()); assertFalse(producer.transactionCommitted()); assertFalse(producer.transactionInFlight()); }
@Test public void shouldStartNewTransactionOnResumeIfEosEnabled() throws Exception { final MockProducer producer = new MockProducer(); task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer); task.addRecords(partition1, Collections.singletonList( new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))); task.process(); task.suspend(); task.resume(); assertTrue(producer.transactionInFlight()); }
@Test public void shouldNotStartNewTransactionOnResumeIfEosDisabled() throws Exception { final MockProducer producer = new MockProducer(); task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, null, time, producer); task.addRecords(partition1, Collections.singletonList( new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))); task.process(); task.suspend(); task.resume(); assertFalse(producer.transactionInFlight()); }
@Test public void shouldStartNewTransactionOnCommitIfEosEnabled() throws Exception { final MockProducer producer = new MockProducer(); task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer); task.addRecords(partition1, Collections.singletonList( new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))); task.process(); task.commit(); assertTrue(producer.transactionInFlight()); }
@Test public void shouldNotStartNewTransactionOnCommitIfEosDisabled() throws Exception { final MockProducer producer = new MockProducer(); task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, null, time, producer); task.addRecords(partition1, Collections.singletonList( new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))); task.process(); task.commit(); assertFalse(producer.transactionInFlight()); }
@Test public void shouldAbortTransactionOnDirtyClosedIfEosEnabled() throws Exception { final MockProducer producer = new MockProducer(); task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer); task.close(false); task = null; assertTrue(producer.transactionAborted()); }
@Test public void shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() throws Exception { final MockProducer producer = new MockProducer(); task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, null, time, producer); task.close(false); assertFalse(producer.transactionAborted()); }
@SuppressWarnings("unchecked") @Test public void shouldCloseProducerOnCloseWhenEosEnabled() throws Exception { final MockProducer producer = new MockProducer(); task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer); task.close(true); task = null; assertTrue(producer.closed()); }
@Override public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) { if (applicationId != null) { assertThat((String) config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), startsWith(applicationId + "-")); } else { assertFalse(config.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); } final MockProducer<byte[], byte[]> producer = new MockProducer<>(true, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER); producers.add(producer); return producer; }
ProducerSendCompletionThread(MockProducer<Integer, Long> mockProducer, int maxErrors, int errorFrequency) { this.mockProducer = mockProducer; this.maxErrors = maxErrors; this.errorFrequency = errorFrequency; injectorThread = Executors.newSingleThreadExecutor(); }
@Test public void shouldSendDataToKafka() throws IOException { // given Page target = new Page(new URL(url), html, responseHeaders); target.setCrawlerId("mycrawler"); target.setTargetRelevance(TargetRelevance.RELEVANT); String topicName = "ache-data-topic"; StringSerializer ss = new StringSerializer(); MockProducer<String, String> producer = new MockProducer<>(true, ss, ss); KafkaConfig.Format format = KafkaConfig.Format.JSON; KafkaTargetRepository repository = new KafkaTargetRepository(producer, topicName, format); // when repository.insert(target); repository.close(); // then List<ProducerRecord<String, String>> history = producer.history(); TargetModelJson page = mapper.readValue(history.get(0).value(), TargetModelJson.class); assertThat(page.getContentAsString(), is(html)); assertThat(page.getUrl(), is(url)); assertThat(page.getResponseHeaders().get("content-type").get(0), is("text/html")); assertThat(page.getRelevance().isRelevant(), is(TargetRelevance.RELEVANT.isRelevant())); assertThat(page.getRelevance().getRelevance(), is(TargetRelevance.RELEVANT.getRelevance())); assertThat(page.getCrawlerId(), is("mycrawler")); }
@Test public void shouldSendDataToKafkaUsingCDR31() throws IOException { // given Page target = new Page(new URL(url), html, responseHeaders); target.setCrawlerId("mycrawler"); target.setTargetRelevance(TargetRelevance.RELEVANT); String topicName = "ache-data-topic"; StringSerializer ss = new StringSerializer(); MockProducer<String, String> producer = new MockProducer<>(true, ss, ss); KafkaConfig.Format format = KafkaConfig.Format.CDR31; KafkaTargetRepository repository = new KafkaTargetRepository(producer, topicName, format); // when repository.insert(target); repository.close(); // then List<ProducerRecord<String, String>> history = producer.history(); CDR31Document page = mapper.readValue(history.get(0).value(), CDR31Document.class); assertThat(page.getRawContent(), is(html)); assertThat(page.getUrl(), is(url)); assertThat(page.getResponseHeaders().get("content-type"), is("text/html")); assertThat(page.getCrawler(), is("mycrawler")); }
@Override public <K, V> MockProducer<K, V> producer(Properties properties) { return createProducer(); }
/** * Create a new test driver instance. * @param config the stream configuration for the topology * @param builder the topology builder that will be used to create the topology instance */ public ProcessorTopologyTestDriver(final StreamsConfig config, final TopologyBuilder builder) { topology = builder.setApplicationId(APPLICATION_ID).build(null); final ProcessorTopology globalTopology = builder.buildGlobalStateTopology(); // Set up the consumer and producer ... final Consumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); final Serializer<byte[]> bytesSerializer = new ByteArraySerializer(); producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) { @Override public List<PartitionInfo> partitionsFor(final String topic) { return Collections.singletonList(new PartitionInfo(topic, PARTITION_ID, null, null, null)); } }; // Identify internal topics for forwarding in process ... for (final TopologyBuilder.TopicsInfo topicsInfo : builder.topicGroups().values()) { internalTopics.addAll(topicsInfo.repartitionSourceTopics.keySet()); } // Set up all of the topic+partition information and subscribe the consumer to each ... for (final String topic : topology.sourceTopics()) { final TopicPartition tp = new TopicPartition(topic, PARTITION_ID); partitionsByTopic.put(topic, tp); offsetsByTopicPartition.put(tp, new AtomicLong()); } consumer.assign(offsetsByTopicPartition.keySet()); final StateDirectory stateDirectory = new StateDirectory(APPLICATION_ID, TestUtils.tempDirectory().getPath(), Time.SYSTEM); final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics()); final ThreadCache cache = new ThreadCache("mock", 1024 * 1024, streamsMetrics); if (globalTopology != null) { final MockConsumer<byte[], byte[]> globalConsumer = createGlobalConsumer(); for (final String topicName : globalTopology.sourceTopics()) { final List<PartitionInfo> partitionInfos = new ArrayList<>(); partitionInfos.add(new PartitionInfo(topicName, 1, null, null, null)); globalConsumer.updatePartitions(topicName, partitionInfos); final TopicPartition partition = new TopicPartition(topicName, 1); globalConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L)); globalPartitionsByTopic.put(topicName, partition); offsetsByTopicPartition.put(partition, new AtomicLong()); } final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology, globalConsumer, stateDirectory); globalStateTask = new GlobalStateUpdateTask(globalTopology, new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache), stateManager ); globalStateTask.initialize(); } if (!partitionsByTopic.isEmpty()) { task = new StreamTask(TASK_ID, APPLICATION_ID, partitionsByTopic.values(), topology, consumer, new StoreChangelogReader( createRestoreConsumer(topology.storeToChangelogTopic()), Time.SYSTEM, 5000), config, streamsMetrics, stateDirectory, cache, new MockTime(), producer); } }
public MyResourceService(final CuratorFramework curator, final EventService eventService, final InterProcessLock lock) { super(baseUrl, new MockProducer<>(true, new StringSerializer(), new StringSerializer()), curator, eventService, mockIdSupplier, false); this.lock = lock; }
public MyResourceService(final String baseUrl, final String connectString) { super(baseUrl, new MockProducer<>(true, new StringSerializer(), new StringSerializer()), getZkClient(connectString), null, null, false); }
public KafkaSubscriberBlackboxTest() { super(new TestEnvironment()); mockProducer = new MockProducer<Long, Long>(true, new LongSerializer(), new LongSerializer()); }
public KafkaSubscriberWhiteboxTest() { super(new TestEnvironment()); mockProducer = new MockProducer<Long, Long>(true, new LongSerializer(), new LongSerializer()); }
@BeforeClass public static void setUp() { eventuateKafkaProducer = new EventuateKafkaProducer(); eventuateKafkaProducer.setProducer( new MockProducer(true, null , null, null) ); event = new PublishedEvent("eventId", "entityId", "entityType", "eventJson", "eventType", null, Optional.of("metadata")); }
ProducerSendCompletionThread(MockProducer<Integer, Long> mockProducer) { // complete everything successfully this(mockProducer, 0, 0); }
@BeforeEach void createLogForwarder() { logForwarder = new LogForwarder("url", "app", new MockProducer<>()); }