/**
 * Try to recover the lease.
 * @param dfs The filesystem instance.
 * @param nbAttempt Count number of this attempt.
 * @param p Path of the file to recover.
 * @param startWaiting Timestamp of when we started attempting to recover the file lease.
 * @return True if dfs#recoverLease returned true.
 * @throws FileNotFoundException
 */
boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, final Path p,
    final long startWaiting) throws FileNotFoundException {
  boolean recovered = false;
  try {
    recovered = dfs.recoverLease(p);
    LOG.info((recovered ? "Recovered lease, " : "Failed to recover lease, ")
        + getLogMessageDetail(nbAttempt, p, startWaiting));
  } catch (IOException e) {
    if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
      // This exception comes out instead of FNFE, fix it
      throw new FileNotFoundException("The given WAL wasn't found at " + p);
    } else if (e instanceof FileNotFoundException) {
      throw (FileNotFoundException) e;
    }
    LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
  }
  return recovered;
}
/**
 * Try to recover the lease.
 * @param dfs The filesystem instance.
 * @param nbAttempt Count number of this attempt.
 * @param p Path of the file to recover.
 * @param startWaiting Timestamp of when we started attempting to recover the file lease.
 * @return True if dfs#recoverLease returned true.
 * @throws java.io.FileNotFoundException
 */
boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, final Path p,
    final long startWaiting) throws FileNotFoundException {
  boolean recovered = false;
  try {
    recovered = dfs.recoverLease(p);
    LOG.info("recoverLease=" + recovered + ", "
        + getLogMessageDetail(nbAttempt, p, startWaiting));
  } catch (IOException e) {
    if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
      // This exception comes out instead of FNFE, fix it
      throw new FileNotFoundException("The given file wasn't found at " + p);
    } else if (e instanceof FileNotFoundException) {
      throw (FileNotFoundException) e;
    }
    LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
  }
  return recovered;
}
/**
 * Try to recover the lease.
 * @param dfs The filesystem instance.
 * @param nbAttempt Count number of this attempt.
 * @param p Path of the file to recover.
 * @param startWaiting Timestamp of when we started attempting to recover the file lease.
 * @return True if dfs#recoverLease returned true.
 * @throws FileNotFoundException
 */
boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, final Path p,
    final long startWaiting) throws FileNotFoundException {
  boolean recovered = false;
  try {
    recovered = dfs.recoverLease(p);
    LOG.info("recoverLease=" + recovered + ", "
        + getLogMessageDetail(nbAttempt, p, startWaiting));
  } catch (IOException e) {
    if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
      // This exception comes out instead of FNFE, fix it
      throw new FileNotFoundException("The given HLog wasn't found at " + p);
    } else if (e instanceof FileNotFoundException) {
      throw (FileNotFoundException) e;
    }
    LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
  }
  return recovered;
}
@Test
public void testMovedHLogDuringRecovery() throws Exception {
  generateHLogs(-1);
  fs.initialize(fs.getUri(), conf);
  // This partial mock will throw LEE for every file simulating
  // files that were moved
  FileSystem spiedFs = Mockito.spy(fs);
  // The "File does not exist" part is very important,
  // that's how it comes out of HDFS
  Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
      when(spiedFs).append(Mockito.<Path>any());
  HLogSplitter logSplitter = new HLogSplitter(
      conf, hbaseDir, hlogDir, oldLogDir, spiedFs);
  try {
    logSplitter.splitLog();
    assertEquals(NUM_WRITERS, fs.listStatus(oldLogDir).length);
    assertFalse(fs.exists(hlogDir));
  } catch (IOException e) {
    fail("There shouldn't be any exception but: " + e.toString());
  }
}
/**
 * Try to recover the lease.
 * @param dfs The filesystem instance.
 * @param nbAttempt Count number of this attempt.
 * @param p Path of the file to recover.
 * @param startWaiting Timestamp of when we started attempting to recover the file lease.
 * @return True if dfs#recoverLease returned true.
 * @throws FileNotFoundException
 */
boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, final Path p,
    final long startWaiting) throws FileNotFoundException {
  boolean recovered = false;
  try {
    recovered = dfs.recoverLease(p);
    LOG.info("recoverLease=" + recovered + ", "
        + getLogMessageDetail(nbAttempt, p, startWaiting));
  } catch (IOException e) {
    if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
      // This exception comes out instead of FNFE, fix it
      throw new FileNotFoundException("The given WAL wasn't found at " + p);
    } else if (e instanceof FileNotFoundException) {
      throw (FileNotFoundException) e;
    }
    LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
  }
  return recovered;
}
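The recoverLease variants above are normally driven from a caller-side retry loop that keeps polling dfs#recoverLease until it reports success or a deadline passes. A minimal, self-contained sketch of such a loop follows; the class name, method name, and the timeout/pause parameters are illustrative assumptions, not code from any of the projects quoted above.

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

// Sketch only: a simple polling loop around DistributedFileSystem#recoverLease.
// Names and wait strategy are assumptions for illustration.
public final class LeaseRecoveryExample {

  /**
   * Polls recoverLease until it returns true (the file is closed and safe to
   * read to its end) or the timeout expires.
   */
  public static boolean waitForLeaseRecovery(DistributedFileSystem dfs, Path p,
      long timeoutMs, long pauseMs) throws IOException, InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (true) {
      // recoverLease returns true once lease recovery has completed.
      if (dfs.recoverLease(p)) {
        return true;
      }
      if (System.currentTimeMillis() >= deadline) {
        return false;
      }
      Thread.sleep(pauseMs);
    }
  }
}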
@Test (timeout=300000)
public void testMovedHLogDuringRecovery() throws Exception {
  generateHLogs(-1);
  fs.initialize(fs.getUri(), conf);
  // This partial mock will throw LEE for every file simulating
  // files that were moved
  FileSystem spiedFs = Mockito.spy(fs);
  // The "File does not exist" part is very important,
  // that's how it comes out of HDFS
  Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
      when(spiedFs).append(Mockito.<Path>any());
  try {
    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, spiedFs, conf);
    assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
    assertFalse(fs.exists(HLOGDIR));
  } catch (IOException e) {
    fail("There shouldn't be any exception but: " + e.toString());
  }
}
@Test
public void testMovedHLogDuringRecovery() throws Exception {
  generateHLogs(-1);
  fs.initialize(fs.getUri(), conf);
  // This partial mock will throw LEE for every file simulating
  // files that were moved
  FileSystem spiedFs = Mockito.spy(fs);
  // The "File does not exist" part is very important,
  // that's how it comes out of HDFS
  Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
      when(spiedFs).append(Mockito.<Path>any());
  HLogSplitter logSplitter = new HLogSplitter(
      conf, HBASEDIR, HLOGDIR, OLDLOGDIR, spiedFs, null);
  try {
    logSplitter.splitLog();
    assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
    assertFalse(fs.exists(HLOGDIR));
  } catch (IOException e) {
    fail("There shouldn't be any exception but: " + e.toString());
  }
}
/**
 * Test complete(..) - verifies that the fileId in the request
 * matches that of the Inode.
 * This test checks that a LeaseExpiredException is thrown in case
 * the fileId does not match.
 */
@Test
public void testFileIdMismatch() throws IOException {
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
  DistributedFileSystem dfs = null;
  try {
    cluster.waitActive();
    dfs = cluster.getFileSystem();
    DFSClient client = dfs.dfs;
    final Path f = new Path("/testFileIdMismatch.txt");
    createFile(dfs, f, 3);
    long someOtherFileId = -1;
    try {
      cluster.getNameNodeRpc()
          .complete(f.toString(), client.clientName, null, someOtherFileId);
      fail();
    } catch (LeaseExpiredException e) {
      FileSystem.LOG.info("Caught Expected LeaseExpiredException: ", e);
    }
  } finally {
    IOUtils.closeStream(dfs);
    cluster.shutdown();
  }
}
@Test (timeout=300000)
public void testMovedWALDuringRecovery() throws Exception {
  // This partial mock will throw LEE for every file simulating
  // files that were moved
  FileSystem spiedFs = Mockito.spy(fs);
  // The "File does not exist" part is very important,
  // that's how it comes out of HDFS
  Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
      when(spiedFs).append(Mockito.<Path>any());
  retryOverHdfsProblem(spiedFs);
}
@Override
protected void transferFile(boolean deleteSource, String destination, String filename,
    String localDirectory) throws IOException {
  Path source = new Path(localDirectory + BaseESReducer.DIR_SEPARATOR + filename);
  ensurePathExists(destination);
  try {
    hdfsFileSystem.copyFromLocalFile(deleteSource, true, source,
        new Path(destination + BaseESReducer.DIR_SEPARATOR + filename));
  } catch (LeaseExpiredException | RemoteException e) {
    // This is an expected race condition where 2 reducers are trying to write
    // the manifest files at the same time. That's okay, it only has to succeed once.
    logger.warn("Exception from 2 reducers writing the same file concurrently. "
        + "One writer failed to obtain a lease. Destination " + destination
        + " filename " + filename + " localDirectory " + localDirectory, e);
  }
}
/**
 * Create a new dfs file with the specified block replication
 * with write-progress reporting and return an output stream for writing
 * into the file.
 *
 * @param src stream name
 * @param permission The permission of the directory being created.
 *          If permission == null, use {@link FsPermission#getDefault()}.
 * @param overwrite do not check for file existence if true
 * @param replication block replication
 * @param forceSync a hdfs sync() operation invokes local filesystem sync
 *          on datanodes.
 * @param doParallelWrites write replicas in parallel
 * @param favoredNodes nodes on which to place replicas if possible
 * @return output stream
 * @throws IOException
 * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
 */
public OutputStream create(String src, FsPermission permission, boolean overwrite,
    boolean createParent, short replication, long blockSize, Progressable progress,
    int buffersize, int bytesPerChecksum, boolean forceSync, boolean doParallelWrites,
    InetSocketAddress[] favoredNodes) throws IOException {
  checkOpen();
  if (permission == null) {
    permission = FsPermission.getDefault();
  }
  boolean success = false;
  try {
    FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
    LOG.debug(src + ": masked=" + masked);
    // For each of the favored nodes, mock up a DatanodeInfo with the IP
    // address and port of that node.
    DatanodeInfo[] favoredNodeInfos = null;
    if (favoredNodes != null) {
      favoredNodeInfos = new DatanodeInfo[favoredNodes.length];
      for (int i = 0; i < favoredNodes.length; i++) {
        favoredNodeInfos[i] = new DatanodeInfo(new DatanodeID(
            favoredNodes[i].getAddress().getHostAddress() + ":" + favoredNodes[i].getPort()));
      }
    }
    OutputStream result = new DFSOutputStream(this, src, masked, overwrite, createParent,
        replication, blockSize, progress, buffersize, bytesPerChecksum, forceSync,
        doParallelWrites, favoredNodeInfos);
    leasechecker.put(src, result);
    metrics.incNumCreateFileOps();
    if (stats != null) {
      stats.incrementFilesCreated();
    }
    success = true;
    return result;
  } finally {
    if (!success && namenodeProtocolProxy.isMethodSupported(
        "abandonFile", String.class, String.class)) {
      try {
        namenode.abandonFile(src, clientName);
      } catch (RemoteException e) {
        if (e.unwrapRemoteException() instanceof LeaseExpiredException) {
          LOG.debug(String.format(
              "client %s attempting to abandon file %s which it does not own",
              clientName, src), e);
        } else {
          throw e;
        }
      }
    }
  }
}
/**
 * Append to an existing HDFS file.
 *
 * @param src file name
 * @param buffersize buffer size
 * @param progress for reporting write-progress
 * @return an output stream for writing into the file
 * @throws IOException
 * @see ClientProtocol#append(String, String)
 */
OutputStream append(String src, int buffersize, Progressable progress)
    throws IOException {
  checkOpen();
  FileStatus stat = null;
  LocatedBlock lastBlock = null;
  boolean success = false;
  int namespaceId = 0;
  try {
    stat = getFileInfo(src);
    if (namenodeProtocolProxy != null && namenodeProtocolProxy.isMethodSupported(
        "appendAndFetchMetaInfo", String.class, String.class)) {
      LocatedBlockWithMetaInfo loc = namenode.appendAndFetchMetaInfo(src, clientName);
      lastBlock = loc;
      if (loc != null) {
        namespaceId = loc.getNamespaceID();
        updateDataTransferProtocolVersionIfNeeded(loc.getDataProtocolVersion());
        getNewNameNodeIfNeeded(loc.getMethodFingerPrint());
      }
    } else {
      lastBlock = namenode.append(src, clientName);
    }
    OutputStream result = new DFSOutputStream(this, src, buffersize, progress,
        lastBlock, stat, conf.getInt("io.bytes.per.checksum", 512), namespaceId);
    leasechecker.put(src, result);
    success = true;
    return result;
  } catch (RemoteException re) {
    throw re.unwrapRemoteException(FileNotFoundException.class,
        AccessControlException.class,
        NSQuotaExceededException.class,
        DSQuotaExceededException.class);
  } finally {
    if (!success) {
      try {
        namenode.abandonFile(src, clientName);
      } catch (RemoteException e) {
        if (e.unwrapRemoteException() instanceof LeaseExpiredException) {
          LOG.debug(String.format(
              "client %s attempting to abandon file %s which it does not own",
              clientName, src), e);
        } else {
          throw e;
        }
      }
    }
  }
}