/**
 * Configures an application server endpoint and checks that the subscription produced by the
 * assignor carries that same endpoint in its decoded user data.
 */
@Test
public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
    final String endPoint = "localhost:8080";

    final Properties props = configProps();
    props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, endPoint);
    final StreamsConfig streamsConfig = new StreamsConfig(props);

    // source -> processor -> sink chain reading from "input"
    final String appId = "application-id";
    builder.setApplicationId(appId);
    builder.addSource("source", "input");
    builder.addProcessor("processor", new MockProcessorSupplier(), "source");
    builder.addSink("sink", "output", "processor");

    final UUID processId = UUID.randomUUID();
    final String clientId = "client1";
    final StreamThread thread = new StreamThread(
            builder, streamsConfig, mockClientSupplier, appId, clientId, processId,
            new Metrics(), Time.SYSTEM,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);

    partitionAssignor.configure(streamsConfig.getConsumerConfigs(thread, appId, clientId));

    final SubscriptionInfo decoded =
            SubscriptionInfo.decode(partitionAssignor.subscription(Utils.mkSet("input")).userData());
    assertEquals(endPoint, decoded.userEndPoint);
}
/**
 * Hands the assignor an assignment whose host state contains partition 0 of "topic" and checks
 * that the cluster metadata exposed afterwards reports exactly that one partition for "topic".
 */
@Test
public void shouldSetClusterMetadataOnAssignment() throws Exception {
    final List<TopicPartition> topic = Collections.singletonList(new TopicPartition("topic", 0));
    final Map<HostInfo, Set<TopicPartition>> hostState =
            Collections.singletonMap(new HostInfo("localhost", 80),
                                     Collections.singleton(new TopicPartition("topic", 0)));
    final AssignmentInfo assignmentInfo = new AssignmentInfo(
            Collections.singletonList(new TaskId(0, 0)),
            Collections.<TaskId, Set<TopicPartition>>emptyMap(),
            hostState);

    partitionAssignor.onAssignment(new PartitionAssignor.Assignment(topic, assignmentInfo.encode()));

    final Cluster cluster = partitionAssignor.clusterMetadata();
    final List<PartitionInfo> partitionInfos = cluster.partitionsForTopic("topic");

    // Assert the size before indexing: an empty list should fail with a readable assertion
    // error rather than an IndexOutOfBoundsException from get(0).
    assertEquals(1, partitionInfos.size());

    final PartitionInfo partitionInfo = partitionInfos.get(0);
    assertEquals("topic", partitionInfo.topic());
    assertEquals(0, partitionInfo.partition());
}
/**
 * Creates a new {@code StreamPartitionAssignor} configured for the given thread, runs an
 * assignment for the single member "client" using the {@code metadata} and {@code subscription}
 * members defined elsewhere in this class, and passes the resulting assignment back to that
 * assignor.
 *
 * @param config         streams configuration used to derive the consumer configs
 * @param thread         thread whose application id, client id and config are used
 * @param clientSupplier supplier providing the restore consumer for the mock topic manager
 */
private void initPartitionGrouper(final StreamsConfig config, final StreamThread thread, final MockClientSupplier clientSupplier) {
    final String memberId = "client";

    final StreamPartitionAssignor assignor = new StreamPartitionAssignor();
    assignor.configure(config.getConsumerConfigs(thread, thread.applicationId, thread.clientId));
    assignor.setInternalTopicManager(
            new MockInternalTopicManager(thread.config, clientSupplier.restoreConsumer));

    final PartitionAssignor.Assignment memberAssignment =
            assignor.assign(metadata, Collections.singletonMap(memberId, subscription)).get(memberId);
    assignor.onAssignment(memberAssignment);
}
/**
 * Checks the subscription produced for a thread that reports fixed previous-active and cached
 * task sets: the subscribed topics must be "topic1" and "topic2", and the user data must equal
 * an encoded {@code SubscriptionInfo} whose standby tasks are the cached tasks minus the
 * previously active ones.
 */
@SuppressWarnings("unchecked")
@Test
public void testSubscription() throws Exception {
    builder.addSource("source1", "topic1");
    builder.addSource("source2", "topic2");
    builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

    final Set<TaskId> prevTasks = Utils.mkSet(
            new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1));
    final Set<TaskId> cachedTasks = Utils.mkSet(
            new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1),
            new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));

    final String appId = "test";
    final String clientId = "client-id";
    final UUID processId = UUID.randomUUID();

    // Override the task lookups so they return the fixed sets declared above.
    final StreamThread thread = new StreamThread(
            builder, config, new MockClientSupplier(), appId, clientId, processId,
            new Metrics(), Time.SYSTEM,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
        @Override
        public Set<TaskId> prevActiveTasks() {
            return prevTasks;
        }

        @Override
        public Set<TaskId> cachedTasks() {
            return cachedTasks;
        }
    };

    partitionAssignor.configure(config.getConsumerConfigs(thread, appId, clientId));
    final PartitionAssignor.Subscription actual =
            partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));

    // Sort in place so the comparison does not depend on the order the topics come back in.
    Collections.sort(actual.topics());
    assertEquals(Utils.mkList("topic1", "topic2"), actual.topics());

    final Set<TaskId> expectedStandby = new HashSet<>(cachedTasks);
    expectedStandby.removeAll(prevTasks);
    final SubscriptionInfo expected = new SubscriptionInfo(processId, prevTasks, expectedStandby, null);
    assertEquals(expected.encode(), actual.userData());
}
@Test public void testAssignWithPartialTopology() throws Exception { Properties props = configProps(); props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, SingleGroupPartitionGrouperStub.class); StreamsConfig config = new StreamsConfig(props); builder.addSource("source1", "topic1"); builder.addProcessor("processor1", new MockProcessorSupplier(), "source1"); builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor1"); builder.addSource("source2", "topic2"); builder.addProcessor("processor2", new MockProcessorSupplier(), "source2"); builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor2"); List<String> topics = Utils.mkList("topic1", "topic2"); Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); UUID uuid1 = UUID.randomUUID(); String client1 = "client1"; StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer)); Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode())); // will throw exception if it fails Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); // check assignment info Set<TaskId> allActiveTasks = new HashSet<>(); AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10")); allActiveTasks.addAll(info10.activeTasks); assertEquals(3, allActiveTasks.size()); assertEquals(allTasks, new HashSet<>(allActiveTasks)); }
/**
 * Passes a hand-built assignment (active tasks task0 and task3, standby tasks task1 and task2)
 * to {@code onAssignment} and checks that the assignor afterwards reports the matching
 * task-to-partition maps for both active and standby tasks.
 */
@Test
public void testOnAssignment() throws Exception {
    final TopicPartition topic2Partition3 = new TopicPartition("topic2", 3);

    // local topology, separate from the shared builder member
    final TopologyBuilder topology = new TopologyBuilder();
    topology.addSource("source1", "topic1");
    topology.addSource("source2", "topic2");
    topology.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

    final UUID processId = UUID.randomUUID();
    final String clientId = "client1";
    final StreamThread thread = new StreamThread(
            topology, config, mockClientSupplier, "test", clientId, processId,
            new Metrics(), Time.SYSTEM,
            new StreamsMetadataState(topology, StreamsMetadataState.UNKNOWN_HOST), 0);
    partitionAssignor.configure(config.getConsumerConfigs(thread, "test", clientId));

    final List<TaskId> activeTaskIds = Utils.mkList(task0, task3);

    final Map<TaskId, Set<TopicPartition>> expectedActive = new HashMap<>();
    final Map<TaskId, Set<TopicPartition>> expectedStandby = new HashMap<>();
    expectedActive.put(task0, Utils.mkSet(t1p0));
    expectedActive.put(task3, Utils.mkSet(topic2Partition3));
    expectedStandby.put(task1, Utils.mkSet(t1p0));
    expectedStandby.put(task2, Utils.mkSet(t2p0));

    final AssignmentInfo encodedInfo = new AssignmentInfo(
            activeTaskIds, expectedStandby, new HashMap<HostInfo, Set<TopicPartition>>());
    partitionAssignor.onAssignment(
            new PartitionAssignor.Assignment(Utils.mkList(t1p0, topic2Partition3), encodedInfo.encode()));

    assertEquals(expectedActive, partitionAssignor.activeTasks());
    assertEquals(expectedStandby, partitionAssignor.standbyTasks());
}
@Test public void testAssignWithInternalTopics() throws Exception { String applicationId = "test"; builder.setApplicationId(applicationId); builder.addInternalTopic("topicX"); builder.addSource("source1", "topic1"); builder.addProcessor("processor1", new MockProcessorSupplier(), "source1"); builder.addSink("sink1", "topicX", "processor1"); builder.addSource("source2", "topicX"); builder.addProcessor("processor2", new MockProcessorSupplier(), "source2"); List<String> topics = Utils.mkList("topic1", "test-topicX"); Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); UUID uuid1 = UUID.randomUUID(); String client1 = "client1"; StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1)); MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer); partitionAssignor.setInternalTopicManager(internalTopicManager); Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); Set<TaskId> emptyTasks = Collections.emptySet(); subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode())); partitionAssignor.assign(metadata, subscriptions); // check prepared internal topics assertEquals(1, internalTopicManager.readyTopics.size()); assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicX")); }
@Test public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throws Exception { String applicationId = "test"; builder.setApplicationId(applicationId); builder.addInternalTopic("topicX"); builder.addSource("source1", "topic1"); builder.addProcessor("processor1", new MockProcessorSupplier(), "source1"); builder.addSink("sink1", "topicX", "processor1"); builder.addSource("source2", "topicX"); builder.addInternalTopic("topicZ"); builder.addProcessor("processor2", new MockProcessorSupplier(), "source2"); builder.addSink("sink2", "topicZ", "processor2"); builder.addSource("source3", "topicZ"); List<String> topics = Utils.mkList("topic1", "test-topicX", "test-topicZ"); Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); UUID uuid1 = UUID.randomUUID(); String client1 = "client1"; StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1)); MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer); partitionAssignor.setInternalTopicManager(internalTopicManager); Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); Set<TaskId> emptyTasks = Collections.emptySet(); subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode())); partitionAssignor.assign(metadata, subscriptions); // check prepared internal topics assertEquals(2, internalTopicManager.readyTopics.size()); assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicZ")); }
/**
 * Runs an assignment for one consumer advertising the endpoint "localhost:8080" and checks that
 * the decoded assignment maps that host to partitions 0, 1 and 2 of "topic1".
 */
@Test
public void shouldMapUserEndPointToTopicPartitions() throws Exception {
    final Properties props = configProps();
    final String endPoint = "localhost:8080";
    props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, endPoint);
    final StreamsConfig streamsConfig = new StreamsConfig(props);

    final String appId = "application-id";
    builder.setApplicationId(appId);
    builder.addSource("source", "topic1");
    builder.addProcessor("processor", new MockProcessorSupplier(), "source");
    builder.addSink("sink", "output", "processor");

    final List<String> subscribedTopics = Utils.mkList("topic1");
    final UUID processId = UUID.randomUUID();
    final String clientId = "client1";
    final String memberId = "consumer1";

    final StreamThread thread = new StreamThread(
            builder, streamsConfig, mockClientSupplier, appId, clientId, processId,
            new Metrics(), Time.SYSTEM,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);

    // this test uses its own assignor instance rather than the shared member
    final StreamPartitionAssignor assignor = new StreamPartitionAssignor();
    assignor.configure(streamsConfig.getConsumerConfigs(thread, appId, clientId));
    assignor.setInternalTopicManager(
            new MockInternalTopicManager(thread.config, mockClientSupplier.restoreConsumer));

    final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
    final Set<TaskId> noTasks = Collections.emptySet();
    subscriptions.put(memberId, new PartitionAssignor.Subscription(
            subscribedTopics, new SubscriptionInfo(processId, noTasks, noTasks, endPoint).encode()));

    final Map<String, PartitionAssignor.Assignment> assignments = assignor.assign(metadata, subscriptions);
    final AssignmentInfo decoded = AssignmentInfo.decode(assignments.get(memberId).userData());
    final Set<TopicPartition> partitionsOnHost = decoded.partitionsByHost.get(new HostInfo("localhost", 8080));

    assertEquals(
            Utils.mkSet(new TopicPartition("topic1", 0),
                        new TopicPartition("topic1", 1),
                        new TopicPartition("topic1", 2)),
            partitionsOnHost);
}
/**
 * Passes an assignment carrying a single host-to-partitions entry to {@code onAssignment} and
 * checks that {@code getPartitionsByHostState()} returns an equal map.
 */
@Test
public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exception {
    final List<TopicPartition> assignedPartitions =
            Collections.singletonList(new TopicPartition("topic", 0));
    final Map<HostInfo, Set<TopicPartition>> hostState =
            Collections.singletonMap(new HostInfo("localhost", 80),
                                     Collections.singleton(new TopicPartition("topic", 0)));

    final AssignmentInfo encodedInfo = new AssignmentInfo(
            Collections.singletonList(new TaskId(0, 0)),
            Collections.<TaskId, Set<TopicPartition>>emptyMap(),
            hostState);
    partitionAssignor.onAssignment(
            new PartitionAssignor.Assignment(assignedPartitions, encodedInfo.encode()));

    assertEquals(hostState, partitionAssignor.getPartitionsByHostState());
}
/**
 * Builds an assignment with no partitions, no active tasks and no standby tasks whose user data
 * encodes only the given host state.
 *
 * @param firstHostState host-to-partitions mapping to encode into the assignment's user data
 * @return an assignment with an empty partition list and the encoded {@code AssignmentInfo}
 */
private PartitionAssignor.Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) {
    final Map<TaskId, Set<TopicPartition>> noStandbyTasks = Collections.emptyMap();
    final AssignmentInfo hostStateOnly =
            new AssignmentInfo(Collections.<TaskId>emptyList(), noStandbyTasks, firstHostState);
    return new PartitionAssignor.Assignment(Collections.<TopicPartition>emptyList(), hostStateOnly.encode());
}
@Test public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() { int rebalanceTimeoutMs = 60000; int sessionTimeoutMs = 3000; int heartbeatIntervalMs = 2000; int autoCommitIntervalMs = 1000; Time time = new MockTime(); Cluster cluster = TestUtils.singletonCluster(topic, 1); Node node = cluster.nodes().get(0); Metadata metadata = createMetadata(); metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); client.setNode(node); PartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs); consumer.assign(Arrays.asList(tp0)); consumer.seekToBeginning(Arrays.asList(tp0)); // there shouldn't be any need to lookup the coordinator or fetch committed offsets. // we just lookup the starting position and send the record fetch. client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 50L), Errors.NONE)); client.prepareResponse(fetchResponse(tp0, 50L, 5)); ConsumerRecords<String, String> records = consumer.poll(0); assertEquals(5, records.count()); assertEquals(55L, consumer.position(tp0)); consumer.close(0, TimeUnit.MILLISECONDS); }
/**
 * Subscribes with a pattern compiled from the test topic name against a cluster that also
 * contains a topic named "unmatched", and checks that after a poll the consumer is subscribed
 * only to the test topic and has been assigned {@code tp0}.
 */
@Test
public void testRegexSubscription() {
    final int rebalanceTimeout = 60000;
    final int sessionTimeout = 30000;
    final int heartbeatInterval = 3000;
    final int autoCommitInterval = 1000;

    final String otherTopic = "unmatched";
    final Time mockTime = new MockTime();

    // one partition each for the matching and the non-matching topic
    final Map<String, Integer> partitionCounts = new HashMap<>();
    partitionCounts.put(topic, 1);
    partitionCounts.put(otherTopic, 1);
    final Cluster twoTopicCluster = TestUtils.clusterWith(1, partitionCounts);

    final Metadata clusterMetadata = createMetadata();
    final Node broker = twoTopicCluster.nodes().get(0);

    final MockClient mockClient = new MockClient(mockTime, clusterMetadata);
    mockClient.setNode(broker);
    final PartitionAssignor roundRobin = new RoundRobinAssignor();

    final KafkaConsumer<String, String> consumer = newConsumer(
            mockTime, mockClient, clusterMetadata, roundRobin,
            rebalanceTimeout, sessionTimeout, heartbeatInterval, true, autoCommitInterval);

    prepareRebalance(mockClient, broker, singleton(topic), roundRobin, singletonList(tp0), null);
    consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
    mockClient.prepareMetadataUpdate(twoTopicCluster, Collections.<String>emptySet());

    consumer.poll(0);

    assertEquals(singleton(topic), consumer.subscription());
    assertEquals(singleton(tp0), consumer.assignment());
    consumer.close(0, TimeUnit.MILLISECONDS);
}
@Test public void testPollThrowsInterruptExceptionIfInterrupted() throws Exception { int rebalanceTimeoutMs = 60000; int sessionTimeoutMs = 30000; int heartbeatIntervalMs = 3000; final Time time = new MockTime(); Cluster cluster = TestUtils.singletonCluster(topic, 1); final Node node = cluster.nodes().get(0); Metadata metadata = createMetadata(); metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); final MockClient client = new MockClient(time, metadata); client.setNode(node); final PartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, 0); consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer)); prepareRebalance(client, node, assignor, Arrays.asList(tp0), null); consumer.poll(0); // interrupt the thread and call poll try { Thread.currentThread().interrupt(); expectedException.expect(InterruptException.class); consumer.poll(0); } finally { // clear interrupted state again since this thread may be reused by JUnit Thread.interrupted(); } consumer.close(0, TimeUnit.MILLISECONDS); }
@Test public void fetchResponseWithUnexpectedPartitionIsIgnored() { int rebalanceTimeoutMs = 60000; int sessionTimeoutMs = 30000; int heartbeatIntervalMs = 3000; // adjust auto commit interval lower than heartbeat so we don't need to deal with // a concurrent heartbeat request int autoCommitIntervalMs = 1000; Time time = new MockTime(); Cluster cluster = TestUtils.singletonCluster(singletonMap(topic, 1)); Node node = cluster.nodes().get(0); Metadata metadata = createMetadata(); metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); client.setNode(node); PartitionAssignor assignor = new RangeAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs); consumer.subscribe(singletonList(topic), getConsumerRebalanceListener(consumer)); prepareRebalance(client, node, assignor, singletonList(tp0), null); Map<TopicPartition, FetchInfo> fetches1 = new HashMap<>(); fetches1.put(tp0, new FetchInfo(0, 1)); fetches1.put(t2p0, new FetchInfo(0, 10)); // not assigned and not fetched client.prepareResponseFrom(fetchResponse(fetches1), node); ConsumerRecords<String, String> records = consumer.poll(0); assertEquals(0, records.count()); consumer.close(0, TimeUnit.MILLISECONDS); }
@Test public void testAssignBasic() throws Exception { builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); List<String> topics = Utils.mkList("topic1", "topic2"); Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); final Set<TaskId> prevTasks10 = Utils.mkSet(task0); final Set<TaskId> prevTasks11 = Utils.mkSet(task1); final Set<TaskId> prevTasks20 = Utils.mkSet(task2); final Set<TaskId> standbyTasks10 = Utils.mkSet(task1); final Set<TaskId> standbyTasks11 = Utils.mkSet(task2); final Set<TaskId> standbyTasks20 = Utils.mkSet(task0); UUID uuid1 = UUID.randomUUID(); UUID uuid2 = UUID.randomUUID(); String client1 = "client1"; StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer)); Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode())); subscriptions.put("consumer11", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, userEndPoint).encode())); subscriptions.put("consumer20", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode())); Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); // check assigned partitions assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)), Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new 
HashSet<>(assignments.get("consumer11").partitions()))); assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions())); // check assignment info Set<TaskId> allActiveTasks = new HashSet<>(); // the first consumer AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10")); allActiveTasks.addAll(info10.activeTasks); // the second consumer AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11")); allActiveTasks.addAll(info11.activeTasks); assertEquals(Utils.mkSet(task0, task1), allActiveTasks); // the third consumer AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20")); allActiveTasks.addAll(info20.activeTasks); assertEquals(3, allActiveTasks.size()); assertEquals(allTasks, new HashSet<>(allActiveTasks)); assertEquals(3, allActiveTasks.size()); assertEquals(allTasks, allActiveTasks); }
@Test public void testAssignEmptyMetadata() throws Exception { builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); List<String> topics = Utils.mkList("topic1", "topic2"); Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); final Set<TaskId> prevTasks10 = Utils.mkSet(task0); final Set<TaskId> standbyTasks10 = Utils.mkSet(task1); final Cluster emptyMetadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(), Collections.<String>emptySet()); UUID uuid1 = UUID.randomUUID(); String client1 = "client1"; StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode())); // initially metadata is empty Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(emptyMetadata, subscriptions); // check assigned partitions assertEquals(Collections.<TopicPartition>emptySet(), new HashSet<>(assignments.get("consumer10").partitions())); // check assignment info Set<TaskId> allActiveTasks = new HashSet<>(); AssignmentInfo info10 = checkAssignment(Collections.<String>emptySet(), assignments.get("consumer10")); allActiveTasks.addAll(info10.activeTasks); assertEquals(0, allActiveTasks.size()); assertEquals(Collections.<TaskId>emptySet(), new HashSet<>(allActiveTasks)); // then metadata gets populated assignments = partitionAssignor.assign(metadata, subscriptions); // check assigned partitions 
assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0, t1p0, t2p0, t1p1, t2p1, t1p2, t2p2)), Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()))); // the first consumer info10 = checkAssignment(allTopics, assignments.get("consumer10")); allActiveTasks.addAll(info10.activeTasks); assertEquals(3, allActiveTasks.size()); assertEquals(allTasks, new HashSet<>(allActiveTasks)); assertEquals(3, allActiveTasks.size()); assertEquals(allTasks, allActiveTasks); }
@Test public void testAssignWithNewTasks() throws Exception { builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addSource("source3", "topic3"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3"); List<String> topics = Utils.mkList("topic1", "topic2", "topic3"); Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3); // assuming that previous tasks do not have topic3 final Set<TaskId> prevTasks10 = Utils.mkSet(task0); final Set<TaskId> prevTasks11 = Utils.mkSet(task1); final Set<TaskId> prevTasks20 = Utils.mkSet(task2); UUID uuid1 = UUID.randomUUID(); UUID uuid2 = UUID.randomUUID(); String client1 = "client1"; StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer)); Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet(), userEndPoint).encode())); subscriptions.put("consumer11", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.<TaskId>emptySet(), userEndPoint).encode())); subscriptions.put("consumer20", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet(), userEndPoint).encode())); Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); // check assigned partitions: since there is no previous task for topic 3 it will be assigned randomly so we cannot check exact match // also note that previously assigned 
partitions / tasks may not stay on the previous host since we may assign the new task first and // then later ones will be re-assigned to other hosts due to load balancing Set<TaskId> allActiveTasks = new HashSet<>(); Set<TopicPartition> allPartitions = new HashSet<>(); AssignmentInfo info; info = AssignmentInfo.decode(assignments.get("consumer10").userData()); allActiveTasks.addAll(info.activeTasks); allPartitions.addAll(assignments.get("consumer10").partitions()); info = AssignmentInfo.decode(assignments.get("consumer11").userData()); allActiveTasks.addAll(info.activeTasks); allPartitions.addAll(assignments.get("consumer11").partitions()); info = AssignmentInfo.decode(assignments.get("consumer20").userData()); allActiveTasks.addAll(info.activeTasks); allPartitions.addAll(assignments.get("consumer20").partitions()); assertEquals(allTasks, allActiveTasks); assertEquals(Utils.mkSet(t1p0, t1p1, t1p2, t2p0, t2p1, t2p2, t3p0, t3p1, t3p2, t3p3), allPartitions); }
@Test public void testAssignWithStates() throws Exception { String applicationId = "test"; builder.setApplicationId(applicationId); builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1"); builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1"); builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2"); builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor-2"); builder.addStateStore(new MockStateStoreSupplier("store3", false), "processor-2"); List<String> topics = Utils.mkList("topic1", "topic2"); TaskId task00 = new TaskId(0, 0); TaskId task01 = new TaskId(0, 1); TaskId task02 = new TaskId(0, 2); TaskId task10 = new TaskId(1, 0); TaskId task11 = new TaskId(1, 1); TaskId task12 = new TaskId(1, 2); List<TaskId> tasks = Utils.mkList(task00, task01, task02, task10, task11, task12); UUID uuid1 = UUID.randomUUID(); UUID uuid2 = UUID.randomUUID(); String client1 = "client1"; StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer)); Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode())); subscriptions.put("consumer11", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode())); subscriptions.put("consumer20", new 
PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode())); Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); // check assigned partition size: since there is no previous task and there are two sub-topologies the assignment is random so we cannot check exact match assertEquals(2, assignments.get("consumer10").partitions().size()); assertEquals(2, assignments.get("consumer11").partitions().size()); assertEquals(2, assignments.get("consumer20").partitions().size()); AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData()); AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData()); AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData()); assertEquals(2, info10.activeTasks.size()); assertEquals(2, info11.activeTasks.size()); assertEquals(2, info20.activeTasks.size()); Set<TaskId> allTasks = new HashSet<>(); allTasks.addAll(info10.activeTasks); allTasks.addAll(info11.activeTasks); allTasks.addAll(info20.activeTasks); assertEquals(new HashSet<>(tasks), allTasks); // check tasks for state topics Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = thread10.builder.topicGroups(); assertEquals(Utils.mkSet(task00, task01, task02), tasksForState(applicationId, "store1", tasks, topicGroups)); assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store2", tasks, topicGroups)); assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store3", tasks, topicGroups)); }
@Test public void testAssignWithStandbyReplicas() throws Exception { Properties props = configProps(); props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1"); StreamsConfig config = new StreamsConfig(props); builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); List<String> topics = Utils.mkList("topic1", "topic2"); Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); final Set<TaskId> prevTasks00 = Utils.mkSet(task0); final Set<TaskId> prevTasks01 = Utils.mkSet(task1); final Set<TaskId> prevTasks02 = Utils.mkSet(task2); final Set<TaskId> standbyTasks01 = Utils.mkSet(task1); final Set<TaskId> standbyTasks02 = Utils.mkSet(task2); final Set<TaskId> standbyTasks00 = Utils.mkSet(task0); UUID uuid1 = UUID.randomUUID(); UUID uuid2 = UUID.randomUUID(); String client1 = "client1"; StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer)); Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks00, standbyTasks01, userEndPoint).encode())); subscriptions.put("consumer11", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks01, standbyTasks02, userEndPoint).encode())); subscriptions.put("consumer20", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks02, standbyTasks00, "any:9097").encode())); Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); Set<TaskId> 
allActiveTasks = new HashSet<>(); Set<TaskId> allStandbyTasks = new HashSet<>(); // the first consumer AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10")); allActiveTasks.addAll(info10.activeTasks); allStandbyTasks.addAll(info10.standbyTasks.keySet()); // the second consumer AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11")); allActiveTasks.addAll(info11.activeTasks); allStandbyTasks.addAll(info11.standbyTasks.keySet()); assertNotEquals("same processId has same set of standby tasks", info11.standbyTasks.keySet(), info10.standbyTasks.keySet()); // check active tasks assigned to the first client assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks)); assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks)); // the third consumer AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20")); allActiveTasks.addAll(info20.activeTasks); allStandbyTasks.addAll(info20.standbyTasks.keySet()); // all task ids are in the active tasks and also in the standby tasks assertEquals(3, allActiveTasks.size()); assertEquals(allTasks, allActiveTasks); assertEquals(3, allStandbyTasks.size()); assertEquals(allTasks, allStandbyTasks); }
/**
 * With one standby replica configured and two consumers on different hosts, the
 * partitions-by-host map handed to consumer1 must not list every partition under either
 * host, while the union across both hosts must be the full partition set.
 */
@Test
public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Exception {
    final Properties standbyProps = configProps();
    standbyProps.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
    final StreamsConfig streamsConfig = new StreamsConfig(standbyProps);

    final KStreamBuilder streamBuilder = new KStreamBuilder();
    final String appId = "appId";
    streamBuilder.setApplicationId(appId);
    streamBuilder.stream("topic1").groupByKey().count("count");

    final UUID processId = UUID.randomUUID();
    final String clientId = "client1";

    final StreamThread thread = new StreamThread(
        streamBuilder,
        streamsConfig,
        mockClientSupplier,
        appId,
        clientId,
        processId,
        new Metrics(),
        Time.SYSTEM,
        new StreamsMetadataState(streamBuilder, StreamsMetadataState.UNKNOWN_HOST),
        0);

    partitionAssignor.configure(streamsConfig.getConsumerConfigs(thread, appId, clientId));
    partitionAssignor.setInternalTopicManager(
        new MockInternalTopicManager(thread.config, mockClientSupplier.restoreConsumer));

    final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
    final Set<TaskId> noTasks = Collections.emptySet();

    final SubscriptionInfo localInfo = new SubscriptionInfo(processId, noTasks, noTasks, userEndPoint);
    subscriptions.put(
        "consumer1",
        new PartitionAssignor.Subscription(Collections.singletonList("topic1"), localInfo.encode()));

    final SubscriptionInfo remoteInfo = new SubscriptionInfo(UUID.randomUUID(), noTasks, noTasks, "other:9090");
    subscriptions.put(
        "consumer2",
        new PartitionAssignor.Subscription(Collections.singletonList("topic1"), remoteInfo.encode()));

    final Set<TopicPartition> expectedPartitions = Utils.mkSet(t1p0, t1p1, t1p2);

    final Map<String, PartitionAssignor.Assignment> assignments =
        partitionAssignor.assign(metadata, subscriptions);
    final AssignmentInfo consumer1Info =
        AssignmentInfo.decode(assignments.get("consumer1").userData());

    // look up the partitions published for each of the two hosts
    final Set<TopicPartition> localPartitions =
        consumer1Info.partitionsByHost.get(new HostInfo("localhost", 2171));
    final Set<TopicPartition> remotePartitions =
        consumer1Info.partitionsByHost.get(new HostInfo("other", 9090));

    final HashSet<TopicPartition> unionOfPartitions = new HashSet<>(localPartitions);
    unionOfPartitions.addAll(remotePartitions);

    assertThat(localPartitions, not(expectedPartitions));
    assertThat(remotePartitions, not(expectedPartitions));
    assertThat(unionOfPartitions, equalTo(expectedPartitions));
}
@Test public void verifyHeartbeatSent() throws Exception { int rebalanceTimeoutMs = 60000; int sessionTimeoutMs = 30000; int heartbeatIntervalMs = 1000; int autoCommitIntervalMs = 10000; Time time = new MockTime(); Cluster cluster = TestUtils.singletonCluster(topic, 1); Node node = cluster.nodes().get(0); Metadata metadata = createMetadata(); metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); client.setNode(node); PartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs); consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer)); Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0), null); // initial fetch client.prepareResponseFrom(fetchResponse(tp0, 0, 0), node); consumer.poll(0); assertEquals(Collections.singleton(tp0), consumer.assignment()); AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator); // heartbeat interval is 2 seconds time.sleep(heartbeatIntervalMs); Thread.sleep(heartbeatIntervalMs); consumer.poll(0); assertTrue(heartbeatReceived.get()); consumer.close(0, TimeUnit.MILLISECONDS); }
@Test public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception { int rebalanceTimeoutMs = 60000; int sessionTimeoutMs = 30000; int heartbeatIntervalMs = 1000; int autoCommitIntervalMs = 10000; Time time = new MockTime(); Cluster cluster = TestUtils.singletonCluster(topic, 1); Node node = cluster.nodes().get(0); Metadata metadata = createMetadata(); metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); client.setNode(node); PartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs); consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer)); Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0), null); consumer.poll(0); // respond to the outstanding fetch so that we have data available on the next poll client.respondFrom(fetchResponse(tp0, 0, 5), node); client.poll(0, time.milliseconds()); client.prepareResponseFrom(fetchResponse(tp0, 5, 0), node); AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator); time.sleep(heartbeatIntervalMs); Thread.sleep(heartbeatIntervalMs); consumer.poll(0); assertTrue(heartbeatReceived.get()); consumer.close(0, TimeUnit.MILLISECONDS); }
@Test public void testCommitsFetchedDuringAssign() { long offset1 = 10000; long offset2 = 20000; int rebalanceTimeoutMs = 6000; int sessionTimeoutMs = 3000; int heartbeatIntervalMs = 2000; int autoCommitIntervalMs = 1000; Time time = new MockTime(); Cluster cluster = TestUtils.singletonCluster(topic, 1); Node node = cluster.nodes().get(0); Metadata metadata = createMetadata(); metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); client.setNode(node); PartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs); consumer.assign(singletonList(tp0)); // lookup coordinator client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node); Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); // fetch offset for one topic client.prepareResponseFrom( offsetResponse(Collections.singletonMap(tp0, offset1), Errors.NONE), coordinator); assertEquals(offset1, consumer.committed(tp0).offset()); consumer.assign(Arrays.asList(tp0, tp1)); // fetch offset for two topics Map<TopicPartition, Long> offsets = new HashMap<>(); offsets.put(tp0, offset1); client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator); assertEquals(offset1, consumer.committed(tp0).offset()); offsets.remove(tp0); offsets.put(tp1, offset2); client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator); assertEquals(offset2, consumer.committed(tp1).offset()); consumer.close(0, TimeUnit.MILLISECONDS); }
@Test public void testAutoCommitSentBeforePositionUpdate() { int rebalanceTimeoutMs = 60000; int sessionTimeoutMs = 30000; int heartbeatIntervalMs = 3000; // adjust auto commit interval lower than heartbeat so we don't need to deal with // a concurrent heartbeat request int autoCommitIntervalMs = 1000; Time time = new MockTime(); Cluster cluster = TestUtils.singletonCluster(topic, 1); Node node = cluster.nodes().get(0); Metadata metadata = createMetadata(); metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); client.setNode(node); PartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs); consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer)); Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0), null); consumer.poll(0); // respond to the outstanding fetch so that we have data available on the next poll client.respondFrom(fetchResponse(tp0, 0, 5), node); client.poll(0, time.milliseconds()); time.sleep(autoCommitIntervalMs); client.prepareResponseFrom(fetchResponse(tp0, 5, 0), node); // no data has been returned to the user yet, so the committed offset should be 0 AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, coordinator, tp0, 0); consumer.poll(0); assertTrue(commitReceived.get()); consumer.close(0, TimeUnit.MILLISECONDS); }
/**
 * Subscribes by pattern to one topic, then re-subscribes by pattern to a different
 * topic, and checks after each rebalance that {@code consumer.subscription()} holds
 * exactly the matching topic.
 */
@Test
public void testChangingRegexSubscription() {
    final int rebalanceTimeoutMs = 60000;
    final int sessionTimeoutMs = 30000;
    final int heartbeatIntervalMs = 3000;
    final int autoCommitIntervalMs = 1000;
    final PartitionAssignor roundRobin = new RoundRobinAssignor();

    final String secondTopic = "other";
    final TopicPartition secondTopicPartition = new TopicPartition(secondTopic, 0);

    final Time mockTime = new MockTime();

    // both topics exist in the cluster, one partition each
    final Map<String, Integer> partitionCounts = new HashMap<>();
    partitionCounts.put(topic, 1);
    partitionCounts.put(secondTopic, 1);

    final Cluster twoTopicCluster = TestUtils.clusterWith(1, partitionCounts);
    final Metadata clusterMetadata = createMetadata();
    final Node broker = twoTopicCluster.nodes().get(0);

    final MockClient mockClient = new MockClient(mockTime, clusterMetadata);
    mockClient.setNode(broker);
    mockClient.cluster(twoTopicCluster);
    clusterMetadata.update(twoTopicCluster, Collections.<String>emptySet(), mockTime.milliseconds());

    final KafkaConsumer<String, String> consumer = newConsumer(
        mockTime, mockClient, clusterMetadata, roundRobin,
        rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs);

    // first pattern: matches only the original topic
    final Node coordinator = prepareRebalance(
        mockClient, broker, singleton(topic), roundRobin, singletonList(tp0), null);
    consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));

    consumer.poll(0);
    assertEquals(singleton(topic), consumer.subscription());

    // second pattern: matches only the other topic
    consumer.subscribe(Pattern.compile(secondTopic), getConsumerRebalanceListener(consumer));
    prepareRebalance(
        mockClient, broker, singleton(secondTopic), roundRobin, singletonList(secondTopicPartition), coordinator);

    consumer.poll(0);
    assertEquals(singleton(secondTopic), consumer.subscription());

    consumer.close(0, TimeUnit.MILLISECONDS);
}
@Test public void testWakeupWithFetchDataAvailable() throws Exception { int rebalanceTimeoutMs = 60000; final int sessionTimeoutMs = 30000; int heartbeatIntervalMs = 3000; // adjust auto commit interval lower than heartbeat so we don't need to deal with // a concurrent heartbeat request int autoCommitIntervalMs = 1000; final Time time = new MockTime(); Cluster cluster = TestUtils.singletonCluster(topic, 1); Node node = cluster.nodes().get(0); Metadata metadata = createMetadata(); metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); client.setNode(node); PartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs); consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer)); prepareRebalance(client, node, assignor, Arrays.asList(tp0), null); consumer.poll(0); // respond to the outstanding fetch so that we have data available on the next poll client.respondFrom(fetchResponse(tp0, 0, 5), node); client.poll(0, time.milliseconds()); consumer.wakeup(); try { consumer.poll(0); fail(); } catch (WakeupException e) { } // make sure the position hasn't been updated assertEquals(0, consumer.position(tp0)); // the next poll should return the completed fetch ConsumerRecords<String, String> records = consumer.poll(0); assertEquals(5, records.count()); // Increment time asynchronously to clear timeouts in closing the consumer final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); exec.scheduleAtFixedRate(new Runnable() { @Override public void run() { time.sleep(sessionTimeoutMs); } }, 0L, 10L, TimeUnit.MILLISECONDS); consumer.close(); exec.shutdownNow(); exec.awaitTermination(5L, TimeUnit.SECONDS); }
@Test public void testManualAssignmentChangeWithAutoCommitEnabled() { int rebalanceTimeoutMs = 60000; int sessionTimeoutMs = 30000; int heartbeatIntervalMs = 3000; int autoCommitIntervalMs = 1000; Time time = new MockTime(); Map<String, Integer> tpCounts = new HashMap<>(); tpCounts.put(topic, 1); tpCounts.put(topic2, 1); Cluster cluster = TestUtils.singletonCluster(tpCounts); Node node = cluster.nodes().get(0); Metadata metadata = createMetadata(); metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); client.setNode(node); PartitionAssignor assignor = new RangeAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs); // lookup coordinator client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node); Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); // manual assignment consumer.assign(Arrays.asList(tp0)); consumer.seekToBeginning(Arrays.asList(tp0)); // fetch offset for one topic client.prepareResponseFrom( offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE), coordinator); assertEquals(0, consumer.committed(tp0).offset()); // verify that assignment immediately changes assertTrue(consumer.assignment().equals(Collections.singleton(tp0))); // there shouldn't be any need to lookup the coordinator or fetch committed offsets. // we just lookup the starting position and send the record fetch. 
client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L), Errors.NONE)); client.prepareResponse(fetchResponse(tp0, 10L, 1)); ConsumerRecords<String, String> records = consumer.poll(0); assertEquals(1, records.count()); assertEquals(11L, consumer.position(tp0)); // mock the offset commit response for to be revoked partitions AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, coordinator, tp0, 11); // new manual assignment consumer.assign(Arrays.asList(t2p0)); // verify that assignment immediately changes assertTrue(consumer.assignment().equals(Collections.singleton(t2p0))); // verify that the offset commits occurred as expected assertTrue(commitReceived.get()); client.requests().clear(); consumer.close(); }
@Test public void testManualAssignmentChangeWithAutoCommitDisabled() { int rebalanceTimeoutMs = 60000; int sessionTimeoutMs = 30000; int heartbeatIntervalMs = 3000; int autoCommitIntervalMs = 1000; Time time = new MockTime(); Map<String, Integer> tpCounts = new HashMap<>(); tpCounts.put(topic, 1); tpCounts.put(topic2, 1); Cluster cluster = TestUtils.singletonCluster(tpCounts); Node node = cluster.nodes().get(0); Metadata metadata = createMetadata(); metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); client.setNode(node); PartitionAssignor assignor = new RangeAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs); // lookup coordinator client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node); Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); // manual assignment consumer.assign(Arrays.asList(tp0)); consumer.seekToBeginning(Arrays.asList(tp0)); // fetch offset for one topic client.prepareResponseFrom( offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE), coordinator); assertEquals(0, consumer.committed(tp0).offset()); // verify that assignment immediately changes assertTrue(consumer.assignment().equals(Collections.singleton(tp0))); // there shouldn't be any need to lookup the coordinator or fetch committed offsets. // we just lookup the starting position and send the record fetch. 
client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L), Errors.NONE)); client.prepareResponse(fetchResponse(tp0, 10L, 1)); ConsumerRecords<String, String> records = consumer.poll(0); assertEquals(1, records.count()); assertEquals(11L, consumer.position(tp0)); // new manual assignment consumer.assign(Arrays.asList(t2p0)); // verify that assignment immediately changes assertTrue(consumer.assignment().equals(Collections.singleton(t2p0))); // the auto commit is disabled, so no offset commit request should be sent for (ClientRequest req : client.requests()) assertTrue(req.requestBuilder().apiKey() != ApiKeys.OFFSET_COMMIT); client.requests().clear(); consumer.close(); }
private JoinGroupResponse joinGroupFollowerResponse(PartitionAssignor assignor, int generationId, String memberId, String leaderId, Errors error) { return new JoinGroupResponse(error, generationId, assignor.name(), memberId, leaderId, Collections.<String, ByteBuffer>emptyMap()); }
private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, Errors error) { ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions)); return new SyncGroupResponse(error, buf); }
private Struct joinGroupFollowerResponse(PartitionAssignor assignor, int generationId, String memberId, String leaderId, short error) { return new JoinGroupResponse(error, generationId, assignor.name(), memberId, leaderId, Collections.<String, ByteBuffer>emptyMap()).toStruct(); }
private Struct syncGroupResponse(List<TopicPartition> partitions, short error) { ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions)); return new SyncGroupResponse(error, buf).toStruct(); }