Example source code for the Java class org.apache.kafka.clients.consumer.internals.PartitionAssignor

Project: kafka-0.11.0.0-src-with-comment    File: StreamPartitionAssignorTest.java
@Test
public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
    final Properties properties = configProps();
    properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080");
    final StreamsConfig config = new StreamsConfig(properties);
    final String applicationId = "application-id";
    builder.setApplicationId(applicationId);
    builder.addSource("source", "input");
    builder.addProcessor("processor", new MockProcessorSupplier(), "source");
    builder.addSink("sink", "output", "processor");

    final UUID uuid1 = UUID.randomUUID();
    final String client1 = "client1";

    final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
                                                       0);

    partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
    final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input"));
    final SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData());
    assertEquals("localhost:8080", subscriptionInfo.userEndPoint);
}
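
For orientation, the SPI these snippets drive looks roughly like this in the 0.11.x sources (an abridged paraphrase; javadoc, extra constructors, and the encode/decode plumbing are elided, so consult the real class before relying on signatures):

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;

public interface PartitionAssignor {

    // called on every group member to build its JoinGroup metadata
    Subscription subscription(Set<String> topics);

    // called on the elected leader only: maps each memberId to an Assignment
    Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions);

    // called on every member with the leader's decision for it
    void onAssignment(Assignment assignment);

    // protocol name, e.g. "range", "roundrobin", or "stream" for Kafka Streams
    String name();

    // the member's topic list plus opaque userData bytes
    class Subscription {
        private final List<String> topics;
        private final ByteBuffer userData;

        public Subscription(List<String> topics, ByteBuffer userData) {
            this.topics = topics;
            this.userData = userData;
        }

        public List<String> topics() { return topics; }
        public ByteBuffer userData() { return userData; }
    }

    // the member's assigned partitions plus opaque userData bytes
    class Assignment {
        private final List<TopicPartition> partitions;
        private final ByteBuffer userData;

        public Assignment(List<TopicPartition> partitions, ByteBuffer userData) {
            this.partitions = partitions;
            this.userData = userData;
        }

        public List<TopicPartition> partitions() { return partitions; }
        public ByteBuffer userData() { return userData; }
    }
}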
Project: kafka-0.11.0.0-src-with-comment    File: StreamPartitionAssignorTest.java
@Test
public void shouldSetClusterMetadataOnAssignment() throws Exception {
    final List<TopicPartition> topic = Collections.singletonList(new TopicPartition("topic", 0));
    final Map<HostInfo, Set<TopicPartition>> hostState =
            Collections.singletonMap(new HostInfo("localhost", 80),
                    Collections.singleton(new TopicPartition("topic", 0)));
    final AssignmentInfo assignmentInfo = new AssignmentInfo(Collections.singletonList(new TaskId(0, 0)),
            Collections.<TaskId, Set<TopicPartition>>emptyMap(),
            hostState);


    partitionAssignor.onAssignment(new PartitionAssignor.Assignment(topic, assignmentInfo.encode()));
    final Cluster cluster = partitionAssignor.clusterMetadata();
    final List<PartitionInfo> partitionInfos = cluster.partitionsForTopic("topic");
    final PartitionInfo partitionInfo = partitionInfos.get(0);
    assertEquals(1, partitionInfos.size());
    assertEquals("topic", partitionInfo.topic());
    assertEquals(0, partitionInfo.partition());
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamThreadTest.java
private void initPartitionGrouper(final StreamsConfig config,
                                  final StreamThread thread,
                                  final MockClientSupplier clientSupplier) {

    final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();

    partitionAssignor.configure(config.getConsumerConfigs(thread, thread.applicationId, thread.clientId));
    final MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(thread.config, clientSupplier.restoreConsumer);
    partitionAssignor.setInternalTopicManager(internalTopicManager);

    final Map<String, PartitionAssignor.Assignment> assignments =
        partitionAssignor.assign(metadata, Collections.singletonMap("client", subscription));

    partitionAssignor.onAssignment(assignments.get("client"));
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamPartitionAssignorTest.java
@SuppressWarnings("unchecked")
@Test
public void testSubscription() throws Exception {
    builder.addSource("source1", "topic1");
    builder.addSource("source2", "topic2");
    builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

    final Set<TaskId> prevTasks = Utils.mkSet(
            new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1));
    final Set<TaskId> cachedTasks = Utils.mkSet(
            new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1),
            new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));

    String clientId = "client-id";
    UUID processId = UUID.randomUUID();
    StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
                                           0) {
        @Override
        public Set<TaskId> prevActiveTasks() {
            return prevTasks;
        }
        @Override
        public Set<TaskId> cachedTasks() {
            return cachedTasks;
        }
    };

    partitionAssignor.configure(config.getConsumerConfigs(thread, "test", clientId));

    PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));

    Collections.sort(subscription.topics());
    assertEquals(Utils.mkList("topic1", "topic2"), subscription.topics());

    Set<TaskId> standbyTasks = new HashSet<>(cachedTasks);
    standbyTasks.removeAll(prevTasks);

    SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks, null);
    assertEquals(info.encode(), subscription.userData());
}
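
The userData round trip asserted above can be exercised on its own. A minimal sketch, assuming the SubscriptionInfo class from org.apache.kafka.streams.processor.internals.assignment in the same source tree:

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.UUID;

import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;

public class SubscriptionInfoRoundTrip {
    public static void main(String[] args) {
        SubscriptionInfo original = new SubscriptionInfo(
                UUID.randomUUID(),                        // processId
                Collections.singleton(new TaskId(0, 0)),  // previously active tasks
                Collections.<TaskId>emptySet(),           // standby tasks
                "localhost:8080");                        // user end point

        ByteBuffer wire = original.encode();              // what subscription.userData() carries
        SubscriptionInfo decoded = SubscriptionInfo.decode(wire);

        // the end point survives the round trip, as the tests above assert
        System.out.println(decoded.userEndPoint);         // -> localhost:8080
    }
}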
Project: kafka-0.11.0.0-src-with-comment    File: StreamPartitionAssignorTest.java
@Test
public void testAssignWithPartialTopology() throws Exception {
    Properties props = configProps();
    props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, SingleGroupPartitionGrouperStub.class);
    StreamsConfig config = new StreamsConfig(props);

    builder.addSource("source1", "topic1");
    builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
    builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor1");
    builder.addSource("source2", "topic2");
    builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
    builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor2");
    List<String> topics = Utils.mkList("topic1", "topic2");
    Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);

    UUID uuid1 = UUID.randomUUID();
    String client1 = "client1";

    StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);

    partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
    partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));
    Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
    subscriptions.put("consumer10",
        new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));


    // assign() will throw an exception if it fails
    Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);

    // check assignment info
    Set<TaskId> allActiveTasks = new HashSet<>();
    AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10"));
    allActiveTasks.addAll(info10.activeTasks);

    assertEquals(3, allActiveTasks.size());
    assertEquals(allTasks, new HashSet<>(allActiveTasks));
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamPartitionAssignorTest.java
@Test
public void testOnAssignment() throws Exception {
    TopicPartition t2p3 = new TopicPartition("topic2", 3);

    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("source1", "topic1");
    builder.addSource("source2", "topic2");
    builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

    UUID uuid = UUID.randomUUID();
    String client1 = "client1";

    StreamThread thread = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
                                           0);

    partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));

    List<TaskId> activeTaskList = Utils.mkList(task0, task3);
    Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
    Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
    activeTasks.put(task0, Utils.mkSet(t1p0));
    activeTasks.put(task3, Utils.mkSet(t2p3));
    standbyTasks.put(task1, Utils.mkSet(t1p0));
    standbyTasks.put(task2, Utils.mkSet(t2p0));

    AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
    PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());
    partitionAssignor.onAssignment(assignment);

    assertEquals(activeTasks, partitionAssignor.activeTasks());
    assertEquals(standbyTasks, partitionAssignor.standbyTasks());
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamPartitionAssignorTest.java
@Test
public void testAssignWithInternalTopics() throws Exception {
    String applicationId = "test";
    builder.setApplicationId(applicationId);
    builder.addInternalTopic("topicX");
    builder.addSource("source1", "topic1");
    builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
    builder.addSink("sink1", "topicX", "processor1");
    builder.addSource("source2", "topicX");
    builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
    List<String> topics = Utils.mkList("topic1", "test-topicX");
    Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);

    UUID uuid1 = UUID.randomUUID();
    String client1 = "client1";


    StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
                                             0);

    partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
    MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer);
    partitionAssignor.setInternalTopicManager(internalTopicManager);

    Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
    Set<TaskId> emptyTasks = Collections.emptySet();
    subscriptions.put("consumer10",
            new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));

    partitionAssignor.assign(metadata, subscriptions);

    // check prepared internal topics
    assertEquals(1, internalTopicManager.readyTopics.size());
    assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicX"));
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamPartitionAssignorTest.java
@Test
public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throws Exception {
    String applicationId = "test";
    builder.setApplicationId(applicationId);
    builder.addInternalTopic("topicX");
    builder.addSource("source1", "topic1");
    builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
    builder.addSink("sink1", "topicX", "processor1");
    builder.addSource("source2", "topicX");
    builder.addInternalTopic("topicZ");
    builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
    builder.addSink("sink2", "topicZ", "processor2");
    builder.addSource("source3", "topicZ");
    List<String> topics = Utils.mkList("topic1", "test-topicX", "test-topicZ");
    Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);

    UUID uuid1 = UUID.randomUUID();
    String client1 = "client1";

    StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
                                             0);

    partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
    MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer);
    partitionAssignor.setInternalTopicManager(internalTopicManager);

    Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
    Set<TaskId> emptyTasks = Collections.emptySet();
    subscriptions.put("consumer10",
            new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));

    partitionAssignor.assign(metadata, subscriptions);

    // check prepared internal topics
    assertEquals(2, internalTopicManager.readyTopics.size());
    assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicZ"));
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamPartitionAssignorTest.java
@Test
public void shouldMapUserEndPointToTopicPartitions() throws Exception {
    final Properties properties = configProps();
    final String myEndPoint = "localhost:8080";
    properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
    final StreamsConfig config = new StreamsConfig(properties);
    final String applicationId = "application-id";
    builder.setApplicationId(applicationId);
    builder.addSource("source", "topic1");
    builder.addProcessor("processor", new MockProcessorSupplier(), "source");
    builder.addSink("sink", "output", "processor");

    final List<String> topics = Utils.mkList("topic1");

    final UUID uuid1 = UUID.randomUUID();
    final String client1 = "client1";

    final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
                                                       0);

    final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
    partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
    partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamThread.config, mockClientSupplier.restoreConsumer));

    final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
    final Set<TaskId> emptyTasks = Collections.emptySet();
    subscriptions.put("consumer1",
            new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, myEndPoint).encode()));

    final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
    final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1");
    final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData());
    final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 8080));
    assertEquals(Utils.mkSet(new TopicPartition("topic1", 0),
            new TopicPartition("topic1", 1),
            new TopicPartition("topic1", 2)), topicPartitions);
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamPartitionAssignorTest.java
@Test
public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exception {
    List<TopicPartition> topic = Collections.singletonList(new TopicPartition("topic", 0));
    final Map<HostInfo, Set<TopicPartition>> hostState =
            Collections.singletonMap(new HostInfo("localhost", 80),
                    Collections.singleton(new TopicPartition("topic", 0)));
    AssignmentInfo assignmentInfo = new AssignmentInfo(Collections.singletonList(new TaskId(0, 0)),
            Collections.<TaskId, Set<TopicPartition>>emptyMap(),
            hostState);
    partitionAssignor.onAssignment(new PartitionAssignor.Assignment(topic, assignmentInfo.encode()));
    assertEquals(hostState, partitionAssignor.getPartitionsByHostState());
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamPartitionAssignorTest.java
private PartitionAssignor.Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) {
    final AssignmentInfo info = new AssignmentInfo(Collections.<TaskId>emptyList(),
                                                   Collections.<TaskId, Set<TopicPartition>>emptyMap(),
                                                   firstHostState);

    return new PartitionAssignor.Assignment(
            Collections.<TopicPartition>emptyList(), info.encode());
}
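
Decoding is symmetric to the encode() call in this helper. A minimal round-trip sketch against the same 0.11.x internals (class and field names as used in the tests above):

import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.state.HostInfo;

public class AssignmentInfoRoundTrip {
    public static void main(String[] args) {
        final Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap(
                new HostInfo("localhost", 80),
                Collections.singleton(new TopicPartition("topic", 0)));

        final AssignmentInfo info = new AssignmentInfo(
                Collections.singletonList(new TaskId(0, 0)),          // active tasks
                Collections.<TaskId, Set<TopicPartition>>emptyMap(),  // standby tasks
                hostState);                                           // partitions by host

        final AssignmentInfo decoded = AssignmentInfo.decode(info.encode());

        // the host-to-partitions map survives the round trip
        System.out.println(decoded.partitionsByHost);
    }
}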
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConsumerTest.java
@Test
public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
    int rebalanceTimeoutMs = 60000;
    int sessionTimeoutMs = 3000;
    int heartbeatIntervalMs = 2000;
    int autoCommitIntervalMs = 1000;

    Time time = new MockTime();
    Cluster cluster = TestUtils.singletonCluster(topic, 1);
    Node node = cluster.nodes().get(0);

    Metadata metadata = createMetadata();
    metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());

    MockClient client = new MockClient(time, metadata);
    client.setNode(node);
    PartitionAssignor assignor = new RoundRobinAssignor();

    final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
            rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
    consumer.assign(Arrays.asList(tp0));
    consumer.seekToBeginning(Arrays.asList(tp0));

    // there shouldn't be any need to look up the coordinator or fetch committed offsets;
    // we just look up the starting position and send the record fetch.
    client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 50L), Errors.NONE));
    client.prepareResponse(fetchResponse(tp0, 50L, 5));

    ConsumerRecords<String, String> records = consumer.poll(0);
    assertEquals(5, records.count());
    assertEquals(55L, consumer.position(tp0));
    consumer.close(0, TimeUnit.MILLISECONDS);
}
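
Outside the mock harness, the same manual-assignment-plus-seek path looks like this against a live broker. A minimal sketch; the bootstrap address, group id, and topic name are placeholders:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualAssignmentExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-assignment-demo");   // placeholder group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);     // placeholder topic
            consumer.assign(Collections.singletonList(tp));            // no group coordination involved
            consumer.seekToBeginning(Collections.singletonList(tp));   // explicit start position

            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("%d: %s%n", record.offset(), record.value());
        }
    }
}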
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConsumerTest.java
@Test
public void testRegexSubscription() {
    int rebalanceTimeoutMs = 60000;
    int sessionTimeoutMs = 30000;
    int heartbeatIntervalMs = 3000;
    int autoCommitIntervalMs = 1000;

    String unmatchedTopic = "unmatched";

    Time time = new MockTime();

    Map<String, Integer> topicMetadata = new HashMap<>();
    topicMetadata.put(topic, 1);
    topicMetadata.put(unmatchedTopic, 1);

    Cluster cluster = TestUtils.clusterWith(1, topicMetadata);
    Metadata metadata = createMetadata();
    Node node = cluster.nodes().get(0);

    MockClient client = new MockClient(time, metadata);
    client.setNode(node);
    PartitionAssignor assignor = new RoundRobinAssignor();

    final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
            rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);


    prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);

    consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));

    client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());

    consumer.poll(0);
    assertEquals(singleton(topic), consumer.subscription());
    assertEquals(singleton(tp0), consumer.assignment());
    consumer.close(0, TimeUnit.MILLISECONDS);
}
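
Pattern subscription, exercised above with mocks, is plain public API. A minimal sketch with a no-op rebalance listener (the regex is a made-up example; matching topics are re-evaluated on each metadata refresh):

import java.util.Collection;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public final class PatternSubscribe {
    // the 0.11 Pattern overload requires an explicit rebalance listener
    static void subscribeByPattern(Consumer<String, String> consumer) {
        consumer.subscribe(Pattern.compile("orders.*"), new ConsumerRebalanceListener() {  // hypothetical regex
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
        });
    }
}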
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConsumerTest.java
@Test
public void testPollThrowsInterruptExceptionIfInterrupted() throws Exception {
    int rebalanceTimeoutMs = 60000;
    int sessionTimeoutMs = 30000;
    int heartbeatIntervalMs = 3000;

    final Time time = new MockTime();
    Cluster cluster = TestUtils.singletonCluster(topic, 1);
    final Node node = cluster.nodes().get(0);

    Metadata metadata = createMetadata();
    metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());

    final MockClient client = new MockClient(time, metadata);
    client.setNode(node);
    final PartitionAssignor assignor = new RoundRobinAssignor();

    final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
            rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, 0);

    consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
    prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);

    consumer.poll(0);

    // interrupt the thread and call poll
    try {
        Thread.currentThread().interrupt();
        expectedException.expect(InterruptException.class);
        consumer.poll(0);
    } finally {
        // clear interrupted state again since this thread may be reused by JUnit
        Thread.interrupted();
    }
    consumer.close(0, TimeUnit.MILLISECONDS);
}
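
The same contract matters in application code: poll() turns a thread interrupt into Kafka's unchecked InterruptException, and the thread's interrupt flag is left set. A minimal poll-loop sketch that honors it, assuming an already-subscribed consumer:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.InterruptException;

public final class PollLoop {
    // runs until the owning thread is interrupted
    static void run(Consumer<String, String> consumer) {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.println(record.value());
            }
        } catch (InterruptException e) {
            // poll() noticed Thread.interrupt(); the interrupt flag is preserved
        } finally {
            consumer.close();
        }
    }
}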
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConsumerTest.java
@Test
public void fetchResponseWithUnexpectedPartitionIsIgnored() {
    int rebalanceTimeoutMs = 60000;
    int sessionTimeoutMs = 30000;
    int heartbeatIntervalMs = 3000;

    // set the auto-commit interval lower than the heartbeat interval so we don't
    // need to deal with a concurrent heartbeat request
    int autoCommitIntervalMs = 1000;

    Time time = new MockTime();
    Cluster cluster = TestUtils.singletonCluster(singletonMap(topic, 1));
    Node node = cluster.nodes().get(0);

    Metadata metadata = createMetadata();
    metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());

    MockClient client = new MockClient(time, metadata);
    client.setNode(node);
    PartitionAssignor assignor = new RangeAssignor();

    final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
            rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);

    consumer.subscribe(singletonList(topic), getConsumerRebalanceListener(consumer));

    prepareRebalance(client, node, assignor, singletonList(tp0), null);

    Map<TopicPartition, FetchInfo> fetches1 = new HashMap<>();
    fetches1.put(tp0, new FetchInfo(0, 1));
    fetches1.put(t2p0, new FetchInfo(0, 10)); // not assigned and not fetched
    client.prepareResponseFrom(fetchResponse(fetches1), node);

    ConsumerRecords<String, String> records = consumer.poll(0);
    assertEquals(0, records.count());
    consumer.close(0, TimeUnit.MILLISECONDS);
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamPartitionAssignorTest.java
@Test
public void testAssignBasic() throws Exception {
    builder.addSource("source1", "topic1");
    builder.addSource("source2", "topic2");
    builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
    List<String> topics = Utils.mkList("topic1", "topic2");
    Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);

    final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
    final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
    final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
    final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
    final Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
    final Set<TaskId> standbyTasks20 = Utils.mkSet(task0);

    UUID uuid1 = UUID.randomUUID();
    UUID uuid2 = UUID.randomUUID();
    String client1 = "client1";


    StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
                                             0);


    partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
    partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));

    Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
    subscriptions.put("consumer10",
            new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()));
    subscriptions.put("consumer11",
            new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, userEndPoint).encode()));
    subscriptions.put("consumer20",
            new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode()));


    Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);

    // check assigned partitions
    assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
            Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions())));
    assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));

    // check assignment info

    Set<TaskId> allActiveTasks = new HashSet<>();

    // the first consumer
    AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
    allActiveTasks.addAll(info10.activeTasks);

    // the second consumer
    AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
    allActiveTasks.addAll(info11.activeTasks);

    assertEquals(Utils.mkSet(task0, task1), allActiveTasks);

    // the third consumer
    AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20"));
    allActiveTasks.addAll(info20.activeTasks);

    assertEquals(3, allActiveTasks.size());
    assertEquals(allTasks, allActiveTasks);
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamPartitionAssignorTest.java
@Test
public void testAssignEmptyMetadata() throws Exception {
    builder.addSource("source1", "topic1");
    builder.addSource("source2", "topic2");
    builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
    List<String> topics = Utils.mkList("topic1", "topic2");
    Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);

    final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
    final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
    final Cluster emptyMetadata = new Cluster("cluster", Collections.singletonList(Node.noNode()),
        Collections.<PartitionInfo>emptySet(),
        Collections.<String>emptySet(),
        Collections.<String>emptySet());
    UUID uuid1 = UUID.randomUUID();
    String client1 = "client1";

    StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);

    partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));

    Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
    subscriptions.put("consumer10",
        new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()));

    // initially metadata is empty
    Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(emptyMetadata, subscriptions);

    // check assigned partitions
    assertEquals(Collections.<TopicPartition>emptySet(),
        new HashSet<>(assignments.get("consumer10").partitions()));

    // check assignment info
    Set<TaskId> allActiveTasks = new HashSet<>();
    AssignmentInfo info10 = checkAssignment(Collections.<String>emptySet(), assignments.get("consumer10"));
    allActiveTasks.addAll(info10.activeTasks);

    assertEquals(0, allActiveTasks.size());
    assertEquals(Collections.<TaskId>emptySet(), new HashSet<>(allActiveTasks));

    // then metadata gets populated
    assignments = partitionAssignor.assign(metadata, subscriptions);
    // check assigned partitions
    assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0, t1p1, t2p1, t1p2, t2p2)),
        Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions())));

    // the first consumer
    info10 = checkAssignment(allTopics, assignments.get("consumer10"));
    allActiveTasks.addAll(info10.activeTasks);

    assertEquals(3, allActiveTasks.size());
    assertEquals(allTasks, allActiveTasks);
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamPartitionAssignorTest.java
@Test
public void testAssignWithNewTasks() throws Exception {
    builder.addSource("source1", "topic1");
    builder.addSource("source2", "topic2");
    builder.addSource("source3", "topic3");
    builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3");
    List<String> topics = Utils.mkList("topic1", "topic2", "topic3");
    Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3);

    // assuming that previous tasks do not have topic3
    final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
    final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
    final Set<TaskId> prevTasks20 = Utils.mkSet(task2);

    UUID uuid1 = UUID.randomUUID();
    UUID uuid2 = UUID.randomUUID();
    String client1 = "client1";

    StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
                                             0);

    partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
    partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));

    Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
    subscriptions.put("consumer10",
            new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet(), userEndPoint).encode()));
    subscriptions.put("consumer11",
            new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.<TaskId>emptySet(), userEndPoint).encode()));
    subscriptions.put("consumer20",
            new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet(), userEndPoint).encode()));

    Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);

    // check assigned partitions: since there are no previous tasks for topic3, its partitions are
    // assigned arbitrarily, so we cannot check for an exact match. Note also that previously assigned
    // partitions/tasks may not stay on their previous host: new tasks may be assigned first, and
    // existing ones re-assigned to other hosts for load balancing
    Set<TaskId> allActiveTasks = new HashSet<>();
    Set<TopicPartition> allPartitions = new HashSet<>();
    AssignmentInfo info;

    info = AssignmentInfo.decode(assignments.get("consumer10").userData());
    allActiveTasks.addAll(info.activeTasks);
    allPartitions.addAll(assignments.get("consumer10").partitions());

    info = AssignmentInfo.decode(assignments.get("consumer11").userData());
    allActiveTasks.addAll(info.activeTasks);
    allPartitions.addAll(assignments.get("consumer11").partitions());

    info = AssignmentInfo.decode(assignments.get("consumer20").userData());
    allActiveTasks.addAll(info.activeTasks);
    allPartitions.addAll(assignments.get("consumer20").partitions());

    assertEquals(allTasks, allActiveTasks);
    assertEquals(Utils.mkSet(t1p0, t1p1, t1p2, t2p0, t2p1, t2p2, t3p0, t3p1, t3p2, t3p3), allPartitions);
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamPartitionAssignorTest.java
@Test
public void testAssignWithStates() throws Exception {
    String applicationId = "test";
    builder.setApplicationId(applicationId);
    builder.addSource("source1", "topic1");
    builder.addSource("source2", "topic2");

    builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1");
    builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1");

    builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2");
    builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor-2");
    builder.addStateStore(new MockStateStoreSupplier("store3", false), "processor-2");

    List<String> topics = Utils.mkList("topic1", "topic2");

    TaskId task00 = new TaskId(0, 0);
    TaskId task01 = new TaskId(0, 1);
    TaskId task02 = new TaskId(0, 2);
    TaskId task10 = new TaskId(1, 0);
    TaskId task11 = new TaskId(1, 1);
    TaskId task12 = new TaskId(1, 2);
    List<TaskId> tasks = Utils.mkList(task00, task01, task02, task10, task11, task12);

    UUID uuid1 = UUID.randomUUID();
    UUID uuid2 = UUID.randomUUID();
    String client1 = "client1";


    StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
                                             0);

    partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
    partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));

    Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
    subscriptions.put("consumer10",
            new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
    subscriptions.put("consumer11",
            new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
    subscriptions.put("consumer20",
            new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));

    Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);

    // check assigned partition counts: with no previous tasks and two sub-topologies the assignment is arbitrary, so we cannot check for an exact match
    assertEquals(2, assignments.get("consumer10").partitions().size());
    assertEquals(2, assignments.get("consumer11").partitions().size());
    assertEquals(2, assignments.get("consumer20").partitions().size());

    AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
    AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
    AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());

    assertEquals(2, info10.activeTasks.size());
    assertEquals(2, info11.activeTasks.size());
    assertEquals(2, info20.activeTasks.size());

    Set<TaskId> allTasks = new HashSet<>();
    allTasks.addAll(info10.activeTasks);
    allTasks.addAll(info11.activeTasks);
    allTasks.addAll(info20.activeTasks);
    assertEquals(new HashSet<>(tasks), allTasks);

    // check tasks for state topics
    Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = thread10.builder.topicGroups();

    assertEquals(Utils.mkSet(task00, task01, task02), tasksForState(applicationId, "store1", tasks, topicGroups));
    assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store2", tasks, topicGroups));
    assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store3", tasks, topicGroups));
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamPartitionAssignorTest.java
@Test
public void testAssignWithStandbyReplicas() throws Exception {
    Properties props = configProps();
    props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
    StreamsConfig config = new StreamsConfig(props);

    builder.addSource("source1", "topic1");
    builder.addSource("source2", "topic2");
    builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
    List<String> topics = Utils.mkList("topic1", "topic2");
    Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);


    final Set<TaskId> prevTasks00 = Utils.mkSet(task0);
    final Set<TaskId> prevTasks01 = Utils.mkSet(task1);
    final Set<TaskId> prevTasks02 = Utils.mkSet(task2);
    final Set<TaskId> standbyTasks01 = Utils.mkSet(task1);
    final Set<TaskId> standbyTasks02 = Utils.mkSet(task2);
    final Set<TaskId> standbyTasks00 = Utils.mkSet(task0);

    UUID uuid1 = UUID.randomUUID();
    UUID uuid2 = UUID.randomUUID();
    String client1 = "client1";

    StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
                                             0);

    partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
    partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));

    Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
    subscriptions.put("consumer10",
            new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks00, standbyTasks01, userEndPoint).encode()));
    subscriptions.put("consumer11",
            new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks01, standbyTasks02, userEndPoint).encode()));
    subscriptions.put("consumer20",
            new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks02, standbyTasks00, "any:9097").encode()));

    Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);

    Set<TaskId> allActiveTasks = new HashSet<>();
    Set<TaskId> allStandbyTasks = new HashSet<>();

    // the first consumer
    AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
    allActiveTasks.addAll(info10.activeTasks);
    allStandbyTasks.addAll(info10.standbyTasks.keySet());

    // the second consumer
    AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
    allActiveTasks.addAll(info11.activeTasks);
    allStandbyTasks.addAll(info11.standbyTasks.keySet());

    assertNotEquals("same processId has same set of standby tasks", info11.standbyTasks.keySet(), info10.standbyTasks.keySet());

    // check active tasks assigned to the first client
    assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
    assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks));

    // the third consumer
    AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20"));
    allActiveTasks.addAll(info20.activeTasks);
    allStandbyTasks.addAll(info20.standbyTasks.keySet());

    // all task ids are in the active tasks and also in the standby tasks

    assertEquals(3, allActiveTasks.size());
    assertEquals(allTasks, allActiveTasks);

    assertEquals(3, allStandbyTasks.size());
    assertEquals(allTasks, allStandbyTasks);
}
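
The standby-replica knob exercised here is ordinary application configuration. A minimal sketch of a Streams config with one standby per stateful task; the application id, broker, and endpoint values are placeholders:

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public final class StandbyConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");             // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder broker
        // keep one warm replica of every stateful task on some other instance
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        // advertise this instance's endpoint; it ends up in AssignmentInfo.partitionsByHost
        props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080"); // placeholder endpoint
        new StreamsConfig(props); // validates the settings
    }
}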
Project: kafka-0.11.0.0-src-with-comment    File: StreamPartitionAssignorTest.java
@Test
public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Exception {
    final Properties props = configProps();
    props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
    final StreamsConfig config = new StreamsConfig(props);
    final KStreamBuilder builder = new KStreamBuilder();
    final String applicationId = "appId";
    builder.setApplicationId(applicationId);
    builder.stream("topic1").groupByKey().count("count");

    final UUID uuid = UUID.randomUUID();
    final String client = "client1";

    final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);

    partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client));
    partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamThread.config, mockClientSupplier.restoreConsumer));

    final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
    final Set<TaskId> emptyTasks = Collections.emptySet();
    subscriptions.put(
            "consumer1",
            new PartitionAssignor.Subscription(
                    Collections.singletonList("topic1"),
                    new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode()
            )
    );

    subscriptions.put(
            "consumer2",
            new PartitionAssignor.Subscription(
                    Collections.singletonList("topic1"),
                    new SubscriptionInfo(UUID.randomUUID(), emptyTasks, emptyTasks, "other:9090").encode()
            )
    );
    final Set<TopicPartition> allPartitions = Utils.mkSet(t1p0, t1p1, t1p2);
    final Map<String, PartitionAssignor.Assignment> assign = partitionAssignor.assign(metadata, subscriptions);
    final PartitionAssignor.Assignment consumer1Assignment = assign.get("consumer1");
    final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumer1Assignment.userData());
    final Set<TopicPartition> consumer1partitions = assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 2171));
    final Set<TopicPartition> consumer2Partitions = assignmentInfo.partitionsByHost.get(new HostInfo("other", 9090));
    final HashSet<TopicPartition> allAssignedPartitions = new HashSet<>(consumer1partitions);
    allAssignedPartitions.addAll(consumer2Partitions);
    assertThat(consumer1partitions, not(allPartitions));
    assertThat(consumer2Partitions, not(allPartitions));
    assertThat(allAssignedPartitions, equalTo(allPartitions));
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConsumerTest.java
@Test
public void verifyHeartbeatSent() throws Exception {
    int rebalanceTimeoutMs = 60000;
    int sessionTimeoutMs = 30000;
    int heartbeatIntervalMs = 1000;
    int autoCommitIntervalMs = 10000;

    Time time = new MockTime();
    Cluster cluster = TestUtils.singletonCluster(topic, 1);
    Node node = cluster.nodes().get(0);

    Metadata metadata = createMetadata();
    metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());

    MockClient client = new MockClient(time, metadata);
    client.setNode(node);
    PartitionAssignor assignor = new RoundRobinAssignor();

    final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
            rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);

    consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
    Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);

    // initial fetch
    client.prepareResponseFrom(fetchResponse(tp0, 0, 0), node);

    consumer.poll(0);
    assertEquals(Collections.singleton(tp0), consumer.assignment());

    AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator);

    // advance past the 1-second heartbeat interval so a heartbeat becomes due
    time.sleep(heartbeatIntervalMs);
    Thread.sleep(heartbeatIntervalMs);

    consumer.poll(0);

    assertTrue(heartbeatReceived.get());
    consumer.close(0, TimeUnit.MILLISECONDS);
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConsumerTest.java
@Test
public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception {
    int rebalanceTimeoutMs = 60000;
    int sessionTimeoutMs = 30000;
    int heartbeatIntervalMs = 1000;
    int autoCommitIntervalMs = 10000;

    Time time = new MockTime();
    Cluster cluster = TestUtils.singletonCluster(topic, 1);
    Node node = cluster.nodes().get(0);

    Metadata metadata = createMetadata();
    metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());

    MockClient client = new MockClient(time, metadata);
    client.setNode(node);
    PartitionAssignor assignor = new RoundRobinAssignor();

    final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
            rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);

    consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
    Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);

    consumer.poll(0);

    // respond to the outstanding fetch so that we have data available on the next poll
    client.respondFrom(fetchResponse(tp0, 0, 5), node);
    client.poll(0, time.milliseconds());

    client.prepareResponseFrom(fetchResponse(tp0, 5, 0), node);
    AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator);

    time.sleep(heartbeatIntervalMs);
    Thread.sleep(heartbeatIntervalMs);

    consumer.poll(0);

    assertTrue(heartbeatReceived.get());
    consumer.close(0, TimeUnit.MILLISECONDS);
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConsumerTest.java
@Test
public void testCommitsFetchedDuringAssign() {
    long offset1 = 10000;
    long offset2 = 20000;

    int rebalanceTimeoutMs = 6000;
    int sessionTimeoutMs = 3000;
    int heartbeatIntervalMs = 2000;
    int autoCommitIntervalMs = 1000;

    Time time = new MockTime();
    Cluster cluster = TestUtils.singletonCluster(topic, 1);
    Node node = cluster.nodes().get(0);

    Metadata metadata = createMetadata();
    metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());

    MockClient client = new MockClient(time, metadata);
    client.setNode(node);
    PartitionAssignor assignor = new RoundRobinAssignor();

    final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
            rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
    consumer.assign(singletonList(tp0));

    // look up the coordinator
    client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
    Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());

    // fetch the committed offset for the assigned partition
    client.prepareResponseFrom(
            offsetResponse(Collections.singletonMap(tp0, offset1), Errors.NONE),
            coordinator);

    assertEquals(offset1, consumer.committed(tp0).offset());

    consumer.assign(Arrays.asList(tp0, tp1));

    // fetch committed offsets for the two assigned partitions, one response at a time
    Map<TopicPartition, Long> offsets = new HashMap<>();
    offsets.put(tp0, offset1);
    client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
    assertEquals(offset1, consumer.committed(tp0).offset());

    offsets.remove(tp0);
    offsets.put(tp1, offset2);
    client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
    assertEquals(offset2, consumer.committed(tp1).offset());
    consumer.close(0, TimeUnit.MILLISECONDS);
}
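
The test relies on the distinction between committed() (the group's coordinator-stored offset) and position() (this consumer's local fetch cursor). A small sketch, assuming a consumer with the partition already assigned:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public final class OffsetInspection {
    // committed() asks the group coordinator; position() is purely local state
    static void printOffsets(Consumer<String, String> consumer, TopicPartition tp) {
        OffsetAndMetadata committed = consumer.committed(tp);  // null if nothing committed yet
        System.out.println("committed = " + (committed == null ? "none" : committed.offset()));
        System.out.println("position  = " + consumer.position(tp));
    }
}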
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConsumerTest.java
@Test
public void testAutoCommitSentBeforePositionUpdate() {
    int rebalanceTimeoutMs = 60000;
    int sessionTimeoutMs = 30000;
    int heartbeatIntervalMs = 3000;

    // set the auto-commit interval lower than the heartbeat interval so we don't
    // need to deal with a concurrent heartbeat request
    int autoCommitIntervalMs = 1000;

    Time time = new MockTime();
    Cluster cluster = TestUtils.singletonCluster(topic, 1);
    Node node = cluster.nodes().get(0);

    Metadata metadata = createMetadata();
    metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());

    MockClient client = new MockClient(time, metadata);
    client.setNode(node);
    PartitionAssignor assignor = new RoundRobinAssignor();

    final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
            rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);

    consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
    Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);

    consumer.poll(0);

    // respond to the outstanding fetch so that we have data available on the next poll
    client.respondFrom(fetchResponse(tp0, 0, 5), node);
    client.poll(0, time.milliseconds());

    time.sleep(autoCommitIntervalMs);

    client.prepareResponseFrom(fetchResponse(tp0, 5, 0), node);

    // no data has been returned to the user yet, so the committed offset should be 0
    AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, coordinator, tp0, 0);

    consumer.poll(0);

    assertTrue(commitReceived.get());
    consumer.close(0, TimeUnit.MILLISECONDS);
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConsumerTest.java
@Test
public void testChangingRegexSubscription() {
    int rebalanceTimeoutMs = 60000;
    int sessionTimeoutMs = 30000;
    int heartbeatIntervalMs = 3000;
    int autoCommitIntervalMs = 1000;
    PartitionAssignor assignor = new RoundRobinAssignor();

    String otherTopic = "other";
    TopicPartition otherTopicPartition = new TopicPartition(otherTopic, 0);

    Time time = new MockTime();

    Map<String, Integer> topicMetadata = new HashMap<>();
    topicMetadata.put(topic, 1);
    topicMetadata.put(otherTopic, 1);

    Cluster cluster = TestUtils.clusterWith(1, topicMetadata);
    Metadata metadata = createMetadata();
    Node node = cluster.nodes().get(0);

    MockClient client = new MockClient(time, metadata);
    client.setNode(node);
    client.cluster(cluster);

    metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());

    final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
            rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs);

    Node coordinator = prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
    consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));

    consumer.poll(0);
    assertEquals(singleton(topic), consumer.subscription());

    consumer.subscribe(Pattern.compile(otherTopic), getConsumerRebalanceListener(consumer));

    prepareRebalance(client, node, singleton(otherTopic), assignor, singletonList(otherTopicPartition), coordinator);
    consumer.poll(0);

    assertEquals(singleton(otherTopic), consumer.subscription());
    consumer.close(0, TimeUnit.MILLISECONDS);
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConsumerTest.java
@Test
public void testWakeupWithFetchDataAvailable() throws Exception {
    int rebalanceTimeoutMs = 60000;
    final int sessionTimeoutMs = 30000;
    int heartbeatIntervalMs = 3000;

    // set the auto-commit interval lower than the heartbeat interval so we don't
    // need to deal with a concurrent heartbeat request
    int autoCommitIntervalMs = 1000;

    final Time time = new MockTime();
    Cluster cluster = TestUtils.singletonCluster(topic, 1);
    Node node = cluster.nodes().get(0);

    Metadata metadata = createMetadata();
    metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());

    MockClient client = new MockClient(time, metadata);
    client.setNode(node);
    PartitionAssignor assignor = new RoundRobinAssignor();

    final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
            rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);

    consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
    prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);

    consumer.poll(0);

    // respond to the outstanding fetch so that we have data available on the next poll
    client.respondFrom(fetchResponse(tp0, 0, 5), node);
    client.poll(0, time.milliseconds());

    consumer.wakeup();

    try {
        consumer.poll(0);
        fail();
    } catch (WakeupException e) {
        // expected: wakeup() forces the in-flight poll to abort
    }

    // make sure the position hasn't been updated
    assertEquals(0, consumer.position(tp0));

    // the next poll should return the completed fetch
    ConsumerRecords<String, String> records = consumer.poll(0);
    assertEquals(5, records.count());
    // Increment time asynchronously to clear timeouts in closing the consumer
    final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
    exec.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            time.sleep(sessionTimeoutMs);
        }
    }, 0L, 10L, TimeUnit.MILLISECONDS);
    consumer.close();
    exec.shutdownNow();
    exec.awaitTermination(5L, TimeUnit.SECONDS);
}
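
wakeup() is the one KafkaConsumer method documented as safe to call from another thread, which is what this test leans on. A minimal shutdown-hook sketch; consumer construction and subscription are elided:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;

public final class ShutdownAwareLoop {
    static void runUntilShutdown(final Consumer<String, String> consumer) {
        // wakeup() is thread-safe; every other consumer method is not
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                consumer.wakeup(); // makes the blocked poll() below throw WakeupException
            }
        }));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<String, String> record : records)
                    System.out.println(record.value());
            }
        } catch (WakeupException e) {
            // expected on shutdown; fall through to close()
        } finally {
            consumer.close();
        }
    }
}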
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConsumerTest.java
@Test
public void testManualAssignmentChangeWithAutoCommitEnabled() {
    int rebalanceTimeoutMs = 60000;
    int sessionTimeoutMs = 30000;
    int heartbeatIntervalMs = 3000;
    int autoCommitIntervalMs = 1000;

    Time time = new MockTime();
    Map<String, Integer> tpCounts = new HashMap<>();
    tpCounts.put(topic, 1);
    tpCounts.put(topic2, 1);
    Cluster cluster = TestUtils.singletonCluster(tpCounts);
    Node node = cluster.nodes().get(0);

    Metadata metadata = createMetadata();
    metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());

    MockClient client = new MockClient(time, metadata);
    client.setNode(node);
    PartitionAssignor assignor = new RangeAssignor();

    final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
            rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);

    // look up the coordinator
    client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
    Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());

    // manual assignment
    consumer.assign(Arrays.asList(tp0));
    consumer.seekToBeginning(Arrays.asList(tp0));

    // fetch the committed offset for the assigned partition
    client.prepareResponseFrom(
            offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE),
            coordinator);
    assertEquals(0, consumer.committed(tp0).offset());

    // verify that assignment immediately changes
    assertTrue(consumer.assignment().equals(Collections.singleton(tp0)));

    // there shouldn't be any need to look up the coordinator or fetch committed offsets;
    // we just look up the starting position and send the record fetch.
    client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L), Errors.NONE));
    client.prepareResponse(fetchResponse(tp0, 10L, 1));

    ConsumerRecords<String, String> records = consumer.poll(0);
    assertEquals(1, records.count());
    assertEquals(11L, consumer.position(tp0));

    // mock the offset commit response for the partitions about to be revoked
    AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, coordinator, tp0, 11);

    // new manual assignment
    consumer.assign(Arrays.asList(t2p0));

    // verify that assignment immediately changes
    assertTrue(consumer.assignment().equals(Collections.singleton(t2p0)));
    // verify that the offset commits occurred as expected
    assertTrue(commitReceived.get());

    client.requests().clear();
    consumer.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConsumerTest.java
@Test
public void testManualAssignmentChangeWithAutoCommitDisabled() {
    int rebalanceTimeoutMs = 60000;
    int sessionTimeoutMs = 30000;
    int heartbeatIntervalMs = 3000;
    int autoCommitIntervalMs = 1000;

    Time time = new MockTime();
    Map<String, Integer> tpCounts = new HashMap<>();
    tpCounts.put(topic, 1);
    tpCounts.put(topic2, 1);
    Cluster cluster = TestUtils.singletonCluster(tpCounts);
    Node node = cluster.nodes().get(0);

    Metadata metadata = createMetadata();
    metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());

    MockClient client = new MockClient(time, metadata);
    client.setNode(node);
    PartitionAssignor assignor = new RangeAssignor();

    final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
            rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs);

    // look up the coordinator
    client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
    Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());

    // manual assignment
    consumer.assign(Arrays.asList(tp0));
    consumer.seekToBeginning(Arrays.asList(tp0));

    // fetch the committed offset for the assigned partition
    client.prepareResponseFrom(
            offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE),
            coordinator);
    assertEquals(0, consumer.committed(tp0).offset());

    // verify that assignment immediately changes
    assertTrue(consumer.assignment().equals(Collections.singleton(tp0)));

    // there shouldn't be any need to look up the coordinator or fetch committed offsets;
    // we just look up the starting position and send the record fetch.
    client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L), Errors.NONE));
    client.prepareResponse(fetchResponse(tp0, 10L, 1));

    ConsumerRecords<String, String> records = consumer.poll(0);
    assertEquals(1, records.count());
    assertEquals(11L, consumer.position(tp0));

    // new manual assignment
    consumer.assign(Arrays.asList(t2p0));

    // verify that assignment immediately changes
    assertTrue(consumer.assignment().equals(Collections.singleton(t2p0)));

    // the auto commit is disabled, so no offset commit request should be sent
    for (ClientRequest req : client.requests())
        assertTrue(req.requestBuilder().apiKey() != ApiKeys.OFFSET_COMMIT);

    client.requests().clear();
    consumer.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConsumerTest.java
private JoinGroupResponse joinGroupFollowerResponse(PartitionAssignor assignor, int generationId, String memberId, String leaderId, Errors error) {
    return new JoinGroupResponse(error, generationId, assignor.name(), memberId, leaderId,
            Collections.<String, ByteBuffer>emptyMap());
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConsumerTest.java
private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, Errors error) {
    ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
    return new SyncGroupResponse(error, buf);
}
Project: kafka    File: KafkaConsumerTest.java
private Struct joinGroupFollowerResponse(PartitionAssignor assignor, int generationId, String memberId, String leaderId, short error) {
    return new JoinGroupResponse(error, generationId, assignor.name(), memberId, leaderId,
            Collections.<String, ByteBuffer>emptyMap()).toStruct();
}
Project: kafka    File: KafkaConsumerTest.java
private Struct syncGroupResponse(List<TopicPartition> partitions, short error) {
    ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
    return new SyncGroupResponse(error, buf).toStruct();
}
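
These helpers serialize Assignment through ConsumerProtocol, the same wire path a custom assignor plugs into when registered via the partition.assignment.strategy config. A minimal sketch of implementing the internal interface directly; the class and protocol name below are invented for illustration, and a real implementation would more likely extend AbstractPartitionAssignor:

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;

// a deliberately naive strategy: the first member (by map order) takes every partition
public class FirstMemberTakesAllAssignor implements PartitionAssignor {

    @Override
    public Subscription subscription(Set<String> topics) {
        return new Subscription(new ArrayList<>(topics)); // no custom userData
    }

    @Override
    public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
        String first = subscriptions.keySet().iterator().next();

        List<TopicPartition> everything = new ArrayList<>();
        for (String topic : subscriptions.get(first).topics()) {
            Integer count = metadata.partitionCountForTopic(topic);
            if (count == null)
                continue; // topic missing from the current metadata snapshot
            for (int p = 0; p < count; p++)
                everything.add(new TopicPartition(topic, p));
        }

        Map<String, Assignment> result = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            result.put(memberId, new Assignment(memberId.equals(first)
                    ? everything : Collections.<TopicPartition>emptyList()));
        return result;
    }

    @Override
    public void onAssignment(Assignment assignment) {
        // nothing to record locally in this sketch
    }

    @Override
    public String name() {
        return "first-member-takes-all"; // hypothetical protocol name
    }
}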