/**
 * Writes the federated HA client settings for every namenode of
 * {@code cluster} into {@code conf}.
 *
 * <p>For each namenode the RPC address key and the service RPC address key
 * (both suffixed with the nameservice id and the namenode id) are set. For
 * each nameservice the list of its namenode ids and the failover proxy
 * provider class are set. Finally the comma-separated list of all
 * nameservice ids is stored under {@code DFSConfigKeys.DFS_NAMESERVICES}.
 *
 * @param cluster the cluster whose namenode infos are read
 * @param conf the configuration that receives the settings
 * @throws IllegalStateException if a namenode info has no nameservice id
 */
public static void setFederatedHAConfiguration(MiniDFSCluster cluster,
    Configuration conf) {
  // nameservice id -> ids of the namenodes registered under it
  Map<String, List<String>> nnIdsByNameservice = Maps.newHashMap();

  for (NameNodeInfo nnInfo : cluster.getNameNodeInfos()) {
    Preconditions.checkState(nnInfo.nameserviceId != null);

    if (!nnIdsByNameservice.containsKey(nnInfo.nameserviceId)) {
      List<String> fresh = Lists.newArrayList();
      nnIdsByNameservice.put(nnInfo.nameserviceId, fresh);
    }
    nnIdsByNameservice.get(nnInfo.nameserviceId).add(nnInfo.nnId);

    // Both the RPC and the service RPC keys receive the same URI, built
    // from the namenode's getNameNodeAddress().
    String nnUri = DFSUtil.createUri(HdfsConstants.HDFS_URI_SCHEME,
        nnInfo.nameNode.getNameNodeAddress()).toString();

    String rpcKey = DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
        nnInfo.nameserviceId, nnInfo.nnId);
    conf.set(rpcKey, nnUri);

    String serviceRpcKey = DFSUtil.addKeySuffixes(
        DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
        nnInfo.nameserviceId, nnInfo.nnId);
    conf.set(serviceRpcKey, nnUri);
  }

  Joiner commaJoiner = Joiner.on(",");
  String proxyProviderClass = ConfiguredFailoverProxyProvider.class.getName();

  for (Map.Entry<String, List<String>> nsEntry
      : nnIdsByNameservice.entrySet()) {
    String nsId = nsEntry.getKey();
    String haNamenodesKey =
        DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, nsId);
    conf.set(haNamenodesKey, commaJoiner.join(nsEntry.getValue()));
    conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + nsId,
        proxyProviderClass);
  }

  conf.set(DFSConfigKeys.DFS_NAMESERVICES,
      commaJoiner.join(nnIdsByNameservice.keySet()));
}
/**
 * Writes the federated HA client settings for every namenode of
 * {@code cluster} into {@code conf}.
 *
 * <p>For each namenode the RPC address key and the service RPC address key
 * (both suffixed with the nameservice id and the namenode id) are set. For
 * each nameservice the list of its namenode ids and the failover proxy
 * provider class are set. Finally the comma-separated list of all
 * nameservice ids is stored under {@code DFSConfigKeys.DFS_NAMESERVICES}.
 *
 * @param cluster the cluster whose namenode infos are read
 * @param conf the configuration that receives the settings
 * @throws IllegalStateException if a namenode info has no nameservice id
 */
public static void setFederatedHAConfiguration(MiniDFSCluster cluster,
    Configuration conf) {
  // nameservice id -> ids of the namenodes registered under it
  Map<String, List<String>> nnIdsByNameservice = Maps.newHashMap();

  for (NameNodeInfo nnInfo : cluster.getNameNodeInfos()) {
    Preconditions.checkState(nnInfo.nameserviceId != null);

    if (!nnIdsByNameservice.containsKey(nnInfo.nameserviceId)) {
      List<String> fresh = Lists.newArrayList();
      nnIdsByNameservice.put(nnInfo.nameserviceId, fresh);
    }
    nnIdsByNameservice.get(nnInfo.nameserviceId).add(nnInfo.nnId);

    // Both the RPC and the service RPC keys receive the same URI, built
    // from the namenode's getNameNodeAddress().
    String nnUri = DFSUtil.createUri(HdfsConstants.HDFS_URI_SCHEME,
        nnInfo.nameNode.getNameNodeAddress()).toString();

    String rpcKey = DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
        nnInfo.nameserviceId, nnInfo.nnId);
    conf.set(rpcKey, nnUri);

    String serviceRpcKey = DFSUtil.addKeySuffixes(
        DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
        nnInfo.nameserviceId, nnInfo.nnId);
    conf.set(serviceRpcKey, nnUri);
  }

  Joiner commaJoiner = Joiner.on(",");
  String proxyProviderClass = ConfiguredFailoverProxyProvider.class.getName();

  for (Map.Entry<String, List<String>> nsEntry
      : nnIdsByNameservice.entrySet()) {
    String nsId = nsEntry.getKey();
    String haNamenodesKey =
        DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, nsId);
    conf.set(haNamenodesKey, commaJoiner.join(nsEntry.getValue()));
    conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "."
        + nsId, proxyProviderClass);
  }

  conf.set(DFSConfigKeys.DFS_NAMESERVICES,
      commaJoiner.join(nnIdsByNameservice.keySet()));
}
/**
 * Starts a cluster built from {@code simpleHAFederatedTopology(2)} with two
 * datanodes, transitions namenodes 1 and 3 to active, and verifies that
 * namenodes 0..3 report standby/active/standby/active.
 *
 * <p>It then checks that the conf of every {@link NameNodeInfo} holds the
 * same HTTP address values as the shared {@code conf} for each of the
 * ns0/ns1 x nn0/nn1 key combinations. The cluster is always shut down.
 */
@Test
public void testSetUpFederatedCluster() throws Exception {
  Configuration conf = new Configuration();
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(2))
      .numDataNodes(2)
      .build();
  try {
    cluster.waitActive();
    cluster.transitionToActive(1);
    cluster.transitionToActive(3);

    // Expected HA state, indexed by namenode index.
    String[] expectedHaStates = {"standby", "active", "standby", "active"};
    for (int nnIndex = 0; nnIndex < expectedHaStates.length; nnIndex++) {
      assertEquals(expectedHaStates[nnIndex],
          cluster.getNamesystem(nnIndex).getHAState());
    }

    // HTTP address keys for every (nameservice, namenode) combination.
    String[] httpAddressKeys = {
        DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns0", "nn0"),
        DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns0", "nn1"),
        DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns1", "nn0"),
        DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns1", "nn1"),
    };

    // Values from the shared conf, captured before inspecting the
    // per-namenode confs.
    String[] expectedAddresses = new String[httpAddressKeys.length];
    for (int k = 0; k < httpAddressKeys.length; k++) {
      expectedAddresses[k] = conf.get(httpAddressKeys[k]);
    }

    for (NameNodeInfo nnInfo : cluster.getNameNodeInfos()) {
      for (int k = 0; k < httpAddressKeys.length; k++) {
        assertEquals(expectedAddresses[k],
            nnInfo.conf.get(httpAddressKeys[k]));
      }
    }
  } finally {
    MiniDFSCluster.shutdownCluster(cluster);
  }
}