@Test
@PrepareOnlyThisForTest({ NioDatagramSession.class, NioProcessor.class, AbstractIoSession.class })
public void assertThatPrepareOnlyThisForTestWorks() throws Exception {
    final String scheduleRemoveMethodName = "scheduleRemove";
    Executor executor = createMock(Executor.class);
    // Partially mock NioProcessor so that only scheduleRemove is stubbed.
    NioProcessor objectUnderTest = createPartialMock(NioProcessor.class,
            new String[] { scheduleRemoveMethodName }, executor);
    NioDatagramSession session = createMock(NioDatagramSession.class);
    expect(session.isConnected()).andReturn(false);
    expectPrivate(objectUnderTest, scheduleRemoveMethodName, session).once();

    replay(objectUnderTest, executor, session);

    // A disconnected session is not flushed; the test expects it to be scheduled for removal instead.
    assertFalse(Whitebox.<Boolean> invokeMethod(objectUnderTest, "flushNow", session, 20L));

    verify(objectUnderTest, executor, session);
}
@PrepareOnlyThisForTest(Metadata.class)
@Test
public void testInterceptorPartitionSetOnTooLargeRecord() throws Exception {
    Properties props = new Properties();
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
    props.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1");
    String topic = "topic";
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");

    KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
    Metadata metadata = PowerMock.createNiceMock(Metadata.class);
    MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);
    final Cluster cluster = new Cluster(
            "dummy",
            Collections.singletonList(new Node(0, "host1", 1000)),
            Arrays.asList(new PartitionInfo(topic, 0, null, null, null)),
            Collections.<String>emptySet(),
            Collections.<String>emptySet());
    EasyMock.expect(metadata.fetch()).andReturn(cluster).once();

    // Mock interceptors field
    ProducerInterceptors interceptors = PowerMock.createMock(ProducerInterceptors.class);
    EasyMock.expect(interceptors.onSend(record)).andReturn(record);
    interceptors.onSendError(EasyMock.eq(record), EasyMock.<TopicPartition>notNull(), EasyMock.<Exception>notNull());
    EasyMock.expectLastCall();
    MemberModifier.field(KafkaProducer.class, "interceptors").set(producer, interceptors);

    PowerMock.replay(metadata);
    EasyMock.replay(interceptors);
    producer.send(record);

    EasyMock.verify(interceptors);
}
@PrepareOnlyThisForTest(Metadata.class)
@Test
public void testMetadataFetch() throws Exception {
    Properties props = new Properties();
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
    KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
    Metadata metadata = PowerMock.createNiceMock(Metadata.class);
    MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);

    String topic = "topic";
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
    Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
    final Cluster emptyCluster = new Cluster(null, nodes,
            Collections.<PartitionInfo>emptySet(),
            Collections.<String>emptySet(),
            Collections.<String>emptySet());
    final Cluster cluster = new Cluster(
            "dummy",
            Collections.singletonList(new Node(0, "host1", 1000)),
            Arrays.asList(new PartitionInfo(topic, 0, null, null, null)),
            Collections.<String>emptySet(),
            Collections.<String>emptySet());

    // Expect exactly one fetch for each attempt to refresh while topic metadata is not available
    final int refreshAttempts = 5;
    EasyMock.expect(metadata.fetch()).andReturn(emptyCluster).times(refreshAttempts - 1);
    EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
    EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
    PowerMock.replay(metadata);
    producer.send(record);
    PowerMock.verify(metadata);

    // Expect exactly one fetch if topic metadata is available
    PowerMock.reset(metadata);
    EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
    EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
    PowerMock.replay(metadata);
    producer.send(record, null);
    PowerMock.verify(metadata);

    // Expect exactly one fetch if topic metadata is available
    PowerMock.reset(metadata);
    EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
    EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
    PowerMock.replay(metadata);
    producer.partitionsFor(topic);
    PowerMock.verify(metadata);
}
@PrepareOnlyThisForTest(Metadata.class)
@Test
public void testMetadataFetchOnStaleMetadata() throws Exception {
    Properties props = new Properties();
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
    KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
    Metadata metadata = PowerMock.createNiceMock(Metadata.class);
    MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);

    String topic = "topic";
    ProducerRecord<String, String> initialRecord = new ProducerRecord<>(topic, "value");
    // Create a record with a partition higher than the initial (outdated) partition range
    ProducerRecord<String, String> extendedRecord = new ProducerRecord<>(topic, 2, null, "value");
    Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
    final Cluster emptyCluster = new Cluster(null, nodes,
            Collections.<PartitionInfo>emptySet(),
            Collections.<String>emptySet(),
            Collections.<String>emptySet());
    final Cluster initialCluster = new Cluster(
            "dummy",
            Collections.singletonList(new Node(0, "host1", 1000)),
            Arrays.asList(new PartitionInfo(topic, 0, null, null, null)),
            Collections.<String>emptySet(),
            Collections.<String>emptySet());
    final Cluster extendedCluster = new Cluster(
            "dummy",
            Collections.singletonList(new Node(0, "host1", 1000)),
            Arrays.asList(
                    new PartitionInfo(topic, 0, null, null, null),
                    new PartitionInfo(topic, 1, null, null, null),
                    new PartitionInfo(topic, 2, null, null, null)),
            Collections.<String>emptySet(),
            Collections.<String>emptySet());

    // Expect exactly one fetch for each attempt to refresh while topic metadata is not available
    final int refreshAttempts = 5;
    EasyMock.expect(metadata.fetch()).andReturn(emptyCluster).times(refreshAttempts - 1);
    EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once();
    EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
    PowerMock.replay(metadata);
    producer.send(initialRecord);
    PowerMock.verify(metadata);

    // Expect exactly one fetch if topic metadata is available and records are still within range
    PowerMock.reset(metadata);
    EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once();
    EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
    PowerMock.replay(metadata);
    producer.send(initialRecord, null);
    PowerMock.verify(metadata);

    // Expect exactly two fetches if topic metadata is available but metadata response still returns
    // the same partition size (either because metadata are still stale at the broker too or because
    // there weren't any partitions added in the first place).
    PowerMock.reset(metadata);
    EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once();
    EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once();
    EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
    PowerMock.replay(metadata);
    try {
        producer.send(extendedRecord, null);
        fail("Expected KafkaException to be raised");
    } catch (KafkaException e) {
        // expected
    }
    PowerMock.verify(metadata);

    // Expect exactly two fetches if topic metadata is available but outdated for the given record
    PowerMock.reset(metadata);
    EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once();
    EasyMock.expect(metadata.fetch()).andReturn(extendedCluster).once();
    EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
    PowerMock.replay(metadata);
    producer.send(extendedRecord, null);
    PowerMock.verify(metadata);
}
@PrepareOnlyThisForTest(Metadata.class)
@Test
public void testHeaders() throws Exception {
    Properties props = new Properties();
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
    ExtendedSerializer keySerializer = PowerMock.createNiceMock(ExtendedSerializer.class);
    ExtendedSerializer valueSerializer = PowerMock.createNiceMock(ExtendedSerializer.class);

    KafkaProducer<String, String> producer = new KafkaProducer<>(props, keySerializer, valueSerializer);
    Metadata metadata = PowerMock.createNiceMock(Metadata.class);
    MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);

    String topic = "topic";
    Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
    final Cluster cluster = new Cluster(
            "dummy",
            Collections.singletonList(new Node(0, "host1", 1000)),
            Arrays.asList(new PartitionInfo(topic, 0, null, null, null)),
            Collections.<String>emptySet(),
            Collections.<String>emptySet());

    EasyMock.expect(metadata.fetch()).andReturn(cluster).anyTimes();
    PowerMock.replay(metadata);

    String value = "value";
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
    EasyMock.expect(keySerializer.serialize(topic, record.headers(), null)).andReturn(null).once();
    EasyMock.expect(valueSerializer.serialize(topic, record.headers(), value)).andReturn(value.getBytes()).once();

    PowerMock.replay(keySerializer);
    PowerMock.replay(valueSerializer);

    // ensure headers can be mutated pre send.
    record.headers().add(new RecordHeader("test", "header2".getBytes()));
    producer.send(record, null);

    // ensure headers are closed and cannot be mutated post send
    try {
        record.headers().add(new RecordHeader("test", "test".getBytes()));
        fail("Expected IllegalStateException to be raised");
    } catch (IllegalStateException ise) {
        // expected
    }

    // ensure existing headers are not changed, and last header for key is still original value
    assertTrue(Arrays.equals(record.headers().lastHeader("test").value(), "header2".getBytes()));

    PowerMock.verify(valueSerializer);
    PowerMock.verify(keySerializer);
}
private void addFullyQualifiedNames(Set<String> all, PrepareOnlyThisForTest annotation) {
    String[] fullyQualifiedNames = annotation.fullyQualifiedNames();
    addFullyQualifiedNames(all, fullyQualifiedNames);
}
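/*
 * The method above delegates to an array-based overload that is not shown here.
 * The following is a minimal sketch of what that overload could look like, assuming
 * it simply copies each non-empty name from the annotation's fullyQualifiedNames()
 * into the accumulating set; the empty-string guard is an assumption, not the
 * actual PowerMock implementation.
 */
private void addFullyQualifiedNames(Set<String> all, String[] fullyQualifiedNames) {
    for (String fullyQualifiedName : fullyQualifiedNames) {
        // Skip the annotation's default placeholder value (assumed to be an empty string).
        if (!"".equals(fullyQualifiedName)) {
            all.add(fullyQualifiedName);
        }
    }
}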