@Override public Map<String, List<String>> getActiveTopicMap() { Map<String, List<String>> topicGroupsMap = new HashMap<String, List<String>>(); List<String> consumers = ZKUtils.getChildren(ZkUtils.ConsumersPath()); for (String consumer : consumers) { Map<String, scala.collection.immutable.List<ConsumerThreadId>> consumer_consumerThreadId = null; try { consumer_consumerThreadId = JavaConversions .mapAsJavaMap(ZKUtils.getZKUtilsFromKafka().getConsumersPerTopic(consumer, true)); } catch (Exception e) { LOG.warn("getActiveTopicMap-> getConsumersPerTopic for group: " + consumer + "failed! " + e.getMessage()); // TODO /consumers/{group}/ids/{id} 节点的内容不符合要求。这个group有问题 continue; } Set<String> topics = consumer_consumerThreadId.keySet(); topics.forEach(topic -> { List<String> _groups = null; if (topicGroupsMap.containsKey(topic)) { _groups = topicGroupsMap.get(topic); _groups.add(consumer); } else { _groups = new ArrayList<String>(); _groups.add(consumer); } topicGroupsMap.put(topic, _groups); }); } return topicGroupsMap; }
/** * This method is called after the new partition assignment is finished but before fetcher * threads start. A map of new global partition assignment is passed in as parameter. * @param consumerId The consumer Id string of the consumer invoking this callback. * @param globalPartitionAssignment A Map[topic, Map[Partition, ConsumerThreadId]]. It is the global partition * assignment of this consumer group. */ public void beforeStartingFetchers(String consumerId, Map<String, Map<Integer, ConsumerThreadId>> globalPartitionAssignment);