/**
 * Test DEFAULT ReplaceDatanodeOnFailure policy.
 *
 * <p>The policy is obtained from a fresh {@link HdfsConfiguration}. For every
 * replication factor from 1 to 5, every number of existing datanodes from
 * 0 to 5, and every combination of the append and hflushed flags, the value
 * returned by {@code satisfy} is compared with the expectation computed
 * inline below.
 */
@Test
public void testDefaultPolicy() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  final ReplaceDatanodeOnFailure policy = ReplaceDatanodeOnFailure.get(conf);

  final DatanodeInfo[] infos = new DatanodeInfo[5];
  // datanodes[n] holds the first n entries of infos, so its length is n.
  final DatanodeInfo[][] datanodes = new DatanodeInfo[infos.length + 1][];
  datanodes[0] = new DatanodeInfo[0];
  for (int i = 0; i < infos.length; i++) {
    infos[i] = DFSTestUtil.getLocalDatanodeInfo(50020 + i);
    final int count = i + 1;
    datanodes[count] = new DatanodeInfo[count];
    System.arraycopy(infos, 0, datanodes[count], 0, count);
  }

  // Each flag only has two values; iterating them once per loop covers all
  // four (isAppend, isHflushed) combinations without repeating any of them.
  final boolean[] flagValues = {true, false};

  for (short replication = 1; replication <= infos.length; replication++) {
    final int half = replication / 2;
    final boolean replicationL3 = replication < 3;

    for (int nExistings = 0; nExistings < datanodes.length; nExistings++) {
      final DatanodeInfo[] existings = datanodes[nExistings];
      Assert.assertEquals(nExistings, existings.length);

      // These depend only on replication and nExistings, so they are
      // computed once here rather than for every flag combination.
      final boolean enoughReplica = replication <= nExistings;
      final boolean noReplica = nExistings == 0;
      final boolean existingsLEhalf = nExistings <= half;
      final boolean neverExpected = enoughReplica || noReplica || replicationL3;

      for (boolean isAppend : flagValues) {
        for (boolean isHflushed : flagValues) {
          // Expected to be false whenever one of the three conditions above
          // holds; otherwise true for appended or hflushed data, or when at
          // most half of the replication factor is still present.
          final boolean expected =
              !neverExpected && (isAppend || isHflushed || existingsLEhalf);
          final boolean computed =
              policy.satisfy(replication, existings, isAppend, isHflushed);

          // Pass the context as the assertion message so a mismatch is
          // reported as a test failure and not as an unexpected exception.
          final String context = "replication=" + replication
              + "\nnExistings =" + nExistings
              + "\nisAppend =" + isAppend
              + "\nisHflushed =" + isHflushed;
          Assert.assertEquals(context, expected, computed);
        }
      }
    }
  }
}
/** * Create a new DFSClient connected to the given nameNodeUri or rpcNamenode. * Exactly one of nameNodeUri or rpcNamenode must be null. */ @VisibleForTesting public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats) throws IOException { // Copy only the required DFSClient configuration this.dfsClientConf = new Conf(conf); this.shouldUseLegacyBlockReaderLocal = this.dfsClientConf.useLegacyBlockReaderLocal; if (this.dfsClientConf.useLegacyBlockReaderLocal) { LOG.debug("Using legacy short-circuit local reads."); } this.conf = conf; this.stats = stats; this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class); this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf); this.ugi = UserGroupInformation.getCurrentUser(); this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority(); this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" + DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId(); if (rpcNamenode != null) { // This case is used for testing. Preconditions.checkArgument(nameNodeUri == null); this.namenode = rpcNamenode; dtService = null; } else { Preconditions.checkArgument(nameNodeUri != null, "null URI"); NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class); this.dtService = proxyInfo.getDelegationTokenService(); this.namenode = proxyInfo.getProxy(); } // read directly from the block file if configured. 
this.domainSocketFactory = new DomainSocketFactory(dfsClientConf); String localInterfaces[] = conf.getTrimmedStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES); localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces); if (LOG.isDebugEnabled() && 0 != localInterfaces.length) { LOG.debug("Using local interfaces [" + Joiner.on(',').join(localInterfaces)+ "] with addresses [" + Joiner.on(',').join(localInterfaceAddrs) + "]"); } this.peerCache = PeerCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry); }
/**
 * Test DEFAULT ReplaceDatanodeOnFailure policy.
 *
 * <p>For every replication factor from 1 to 5, every number of existing
 * datanodes from 0 to 5, and every combination of the append and hflushed
 * flags, the value returned by {@code satisfy} is compared with the
 * expectation computed inline below.
 */
@Test
public void testDefaultPolicy() throws Exception {
  final ReplaceDatanodeOnFailure policy = ReplaceDatanodeOnFailure.DEFAULT;

  final DatanodeInfo[] infos = new DatanodeInfo[5];
  // datanodes[n] holds the first n entries of infos, so its length is n.
  final DatanodeInfo[][] datanodes = new DatanodeInfo[infos.length + 1][];
  datanodes[0] = new DatanodeInfo[0];
  for (int i = 0; i < infos.length; i++) {
    infos[i] = DFSTestUtil.getLocalDatanodeInfo(50020 + i);
    final int count = i + 1;
    datanodes[count] = new DatanodeInfo[count];
    System.arraycopy(infos, 0, datanodes[count], 0, count);
  }

  // Each flag only has two values; iterating them once per loop covers all
  // four (isAppend, isHflushed) combinations without repeating any of them.
  final boolean[] flagValues = {true, false};

  for (short replication = 1; replication <= infos.length; replication++) {
    final int half = replication / 2;
    final boolean replicationL3 = replication < 3;

    for (int nExistings = 0; nExistings < datanodes.length; nExistings++) {
      final DatanodeInfo[] existings = datanodes[nExistings];
      Assert.assertEquals(nExistings, existings.length);

      // These depend only on replication and nExistings, so they are
      // computed once here rather than for every flag combination.
      final boolean enoughReplica = replication <= nExistings;
      final boolean noReplica = nExistings == 0;
      final boolean existingsLEhalf = nExistings <= half;
      final boolean neverExpected = enoughReplica || noReplica || replicationL3;

      for (boolean isAppend : flagValues) {
        for (boolean isHflushed : flagValues) {
          // Expected to be false whenever one of the three conditions above
          // holds; otherwise true for appended or hflushed data, or when at
          // most half of the replication factor is still present.
          final boolean expected =
              !neverExpected && (isAppend || isHflushed || existingsLEhalf);
          final boolean computed =
              policy.satisfy(replication, existings, isAppend, isHflushed);

          // Pass the context as the assertion message so a mismatch is
          // reported as a test failure and not as an unexpected exception.
          final String context = "replication=" + replication
              + "\nnExistings =" + nExistings
              + "\nisAppend =" + isAppend
              + "\nisHflushed =" + isHflushed;
          Assert.assertEquals(context, expected, computed);
        }
      }
    }
  }
}
/** * Create a new DFSClient connected to the given nameNodeUri or rpcNamenode. * Exactly one of nameNodeUri or rpcNamenode must be null. */ DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats) throws IOException { // Copy only the required DFSClient configuration this.dfsClientConf = new Conf(conf); checkSmallFilesSupportConf(conf); this.conf = conf; this.stats = stats; this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class); this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf); // The hdfsTimeout is currently the same as the ipc timeout this.hdfsTimeout = Client.getTimeout(conf); this.ugi = UserGroupInformation.getCurrentUser(); this.authority = nameNodeUri == null ? "null" : nameNodeUri.getAuthority(); String clientNamePrefix = ""; if(dfsClientConf.hdfsClientEmulationForSF){ clientNamePrefix = "DFSClient"; }else{ clientNamePrefix = "HopsFS_DFSClient"; } this.clientName = clientNamePrefix+ "_" + dfsClientConf.taskId + "_" + DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId(); if (rpcNamenode != null) { // This case is used for testing. Preconditions.checkArgument(nameNodeUri == null); namenodeSelector = new NamenodeSelector(conf, rpcNamenode, this.ugi); dtService = null; } else { Preconditions.checkArgument(nameNodeUri != null, "null URI"); NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri, this.ugi, ClientProtocol.class); this.dtService = proxyInfo.getDelegationTokenService(); namenodeSelector = new NamenodeSelector(conf, nameNodeUri, this.ugi); } // read directly from the block file if configured. 
this.shortCircuitLocalReads = conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT); if (LOG.isDebugEnabled()) { LOG.debug("Short circuit read is " + shortCircuitLocalReads); } String localInterfaces[] = conf.getTrimmedStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES); localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces); if (LOG.isDebugEnabled() && 0 != localInterfaces.length) { LOG.debug("Using local interfaces [" + Joiner.on(',').join(localInterfaces) + "] with addresses [" + Joiner.on(',').join(localInterfaceAddrs) + "]"); } this.socketCache = SocketCache .getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry); this.MAX_RPC_RETRIES = conf.getInt(DFSConfigKeys.DFS_CLIENT_RETRIES_ON_FAILURE_KEY, DFSConfigKeys.DFS_CLIENT_RETRIES_ON_FAILURE_DEFAULT); }
/**
 * Test DEFAULT ReplaceDatanodeOnFailure policy.
 *
 * <p>For every replication factor from 1 to 5, every number of existing
 * datanodes from 0 to 5, and every combination of the append and hflushed
 * flags, the value returned by {@code satisfy} is compared with the
 * expectation computed inline below.
 */
@Test
public void testDefaultPolicy() throws Exception {
  final ReplaceDatanodeOnFailure policy = ReplaceDatanodeOnFailure.DEFAULT;

  final DatanodeInfo[] infos = new DatanodeInfo[5];
  // datanodes[n] holds the first n entries of infos, so its length is n.
  final DatanodeInfo[][] datanodes = new DatanodeInfo[infos.length + 1][];
  datanodes[0] = new DatanodeInfo[0];
  for (int i = 0; i < infos.length; i++) {
    infos[i] = DFSTestUtil.getLocalDatanodeInfo(50020 + i);
    final int count = i + 1;
    datanodes[count] = new DatanodeInfo[count];
    System.arraycopy(infos, 0, datanodes[count], 0, count);
  }

  // Each flag only has two values; iterating them once per loop covers all
  // four (isAppend, isHflushed) combinations without repeating any of them.
  final boolean[] flagValues = {true, false};

  for (short replication = 1; replication <= infos.length; replication++) {
    final int half = replication / 2;
    final boolean replicationL3 = replication < 3;

    for (int nExistings = 0; nExistings < datanodes.length; nExistings++) {
      final DatanodeInfo[] existings = datanodes[nExistings];
      Assert.assertEquals(nExistings, existings.length);

      // These depend only on replication and nExistings, so they are
      // computed once here rather than for every flag combination.
      final boolean enoughReplica = replication <= nExistings;
      final boolean noReplica = nExistings == 0;
      final boolean existingsLEhalf = nExistings <= half;
      final boolean neverExpected = enoughReplica || noReplica || replicationL3;

      for (boolean anIsAppend : flagValues) {
        for (boolean anIsHflushed : flagValues) {
          // Expected to be false whenever one of the three conditions above
          // holds; otherwise true for appended or hflushed data, or when at
          // most half of the replication factor is still present.
          final boolean expected =
              !neverExpected && (anIsAppend || anIsHflushed || existingsLEhalf);
          final boolean computed =
              policy.satisfy(replication, existings, anIsAppend, anIsHflushed);

          // Pass the context as the assertion message so a mismatch is
          // reported as a test failure and not as an unexpected exception.
          final String context = "replication=" + replication
              + "\nnExistings =" + nExistings
              + "\nisAppend =" + anIsAppend
              + "\nisHflushed =" + anIsHflushed;
          Assert.assertEquals(context, expected, computed);
        }
      }
    }
  }
}