/** * The idea for making sure that there is no more than one instance * running in an HDFS is to create a file in the HDFS, writes the hostname * of the machine on which the instance is running to the file, but did not * close the file until it exits. * * This prevents the second instance from running because it can not * creates the file while the first one is running. * * This method checks if there is any running instance. If no, mark yes. * Note that this is an atomic operation. * * @return null if there is a running instance; * otherwise, the output stream to the newly created file. */ private OutputStream checkAndMarkRunning() throws IOException { try { if (fs.exists(idPath)) { // try appending to it so that it will fail fast if another balancer is // running. IOUtils.closeStream(fs.append(idPath)); fs.delete(idPath, true); } final FSDataOutputStream fsout = fs.create(idPath, false); // mark balancer idPath to be deleted during filesystem closure fs.deleteOnExit(idPath); if (write2IdFile) { fsout.writeBytes(InetAddress.getLocalHost().getHostName()); fsout.hflush(); } return fsout; } catch(RemoteException e) { if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){ return null; } else { throw e; } } }
@Override public FSDataOutputStream call() throws IOException { try { FileSystem fs = FSUtils.getCurrentFileSystem(getConf()); FsPermission defaultPerms = FSUtils.getFilePermissions(fs, getConf(), HConstants.DATA_FILE_UMASK_KEY); Path tmpDir = new Path(FSUtils.getRootDir(getConf()), HConstants.HBASE_TEMP_DIRECTORY); fs.mkdirs(tmpDir); HBCK_LOCK_PATH = new Path(tmpDir, HBCK_LOCK_FILE); final FSDataOutputStream out = createFileWithRetries(fs, HBCK_LOCK_PATH, defaultPerms); out.writeBytes(InetAddress.getLocalHost().toString()); out.flush(); return out; } catch(RemoteException e) { if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){ return null; } else { throw e; } } }
@Override public HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize, CryptoProtocolVersion[] supportedVersions) throws AccessControlException, AlreadyBeingCreatedException, DSQuotaExceededException, FileAlreadyExistsException, FileNotFoundException, NSQuotaExceededException, ParentNotDirectoryException, SafeModeException, UnresolvedLinkException, SnapshotAccessControlException, IOException { try { AuthorizationProvider.beginClientOp(); return server.create(src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions); } finally { AuthorizationProvider.endClientOp(); } }
/** * The idea for making sure that there is no more than one instance * running in an HDFS is to create a file in the HDFS, writes the hostname * of the machine on which the instance is running to the file, but did not * close the file until it exits. * * This prevents the second instance from running because it can not * creates the file while the first one is running. * * This method checks if there is any running instance. If no, mark yes. * Note that this is an atomic operation. * * @return null if there is a running instance; * otherwise, the output stream to the newly created file. */ private OutputStream checkAndMarkRunning() throws IOException { try { final FSDataOutputStream out = fs.create(idPath); if (write2IdFile) { out.writeBytes(InetAddress.getLocalHost().getHostName()); out.hflush(); } return out; } catch(RemoteException e) { if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){ return null; } else { throw e; } } }
static ClientProtocol createNamenode(ClientProtocol rpcNamenode, Configuration conf) throws IOException { long sleepTime = conf.getLong("dfs.client.rpc.retry.sleep", LEASE_SOFTLIMIT_PERIOD); RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( 5, sleepTime, TimeUnit.MILLISECONDS); Map<Class<? extends Exception>,RetryPolicy> remoteExceptionToPolicyMap = new HashMap<Class<? extends Exception>, RetryPolicy>(); remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, createPolicy); Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap = new HashMap<Class<? extends Exception>, RetryPolicy>(); exceptionToPolicyMap.put(RemoteException.class, RetryPolicies.retryByRemoteException( RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap)); RetryPolicy methodPolicy = RetryPolicies.retryByException( RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>(); methodNameToPolicyMap.put("create", methodPolicy); return (ClientProtocol) RetryProxy.create(ClientProtocol.class, rpcNamenode, methodNameToPolicyMap); }
/** Create a {@link NameNode} proxy */ static DatanodeProtocolPB createNamenodeWithRetry( DatanodeProtocolPB rpcNamenode) { RetryPolicy createPolicy = RetryPolicies .retryUpToMaximumCountWithFixedSleep(5, HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS); Map<Class<? extends Exception>, RetryPolicy> remoteExceptionToPolicyMap = new HashMap<Class<? extends Exception>, RetryPolicy>(); remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, createPolicy); Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = new HashMap<Class<? extends Exception>, RetryPolicy>(); exceptionToPolicyMap.put(RemoteException.class, RetryPolicies .retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap)); RetryPolicy methodPolicy = RetryPolicies.retryByException( RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>(); methodNameToPolicyMap.put("create", methodPolicy); return (DatanodeProtocolPB) RetryProxy.create(DatanodeProtocolPB.class, rpcNamenode, methodNameToPolicyMap); }
@Override public HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize) throws AccessControlException, AlreadyBeingCreatedException, DSQuotaExceededException, FileAlreadyExistsException, FileNotFoundException, NSQuotaExceededException, ParentNotDirectoryException, SafeModeException, UnresolvedLinkException, IOException { CreateRequestProto req = CreateRequestProto.newBuilder() .setSrc(src) .setMasked(PBHelper.convert(masked)) .setClientName(clientName) .setCreateFlag(PBHelper.convertCreateFlag(flag)) .setCreateParent(createParent) .setReplication(replication) .setBlockSize(blockSize) .build(); try { CreateResponseProto res = rpcProxy.create(null, req); return res.hasFs() ? PBHelper.convert(res.getFs()) : null; } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } }
public void create(final String src, final FsPermission masked, final String clientName, final EnumSetWritable<CreateFlag> flag, final boolean createParent, final short replication, final long blockSize) throws AccessControlException, AlreadyBeingCreatedException, DSQuotaExceededException, FileAlreadyExistsException, FileNotFoundException, NSQuotaExceededException, ParentNotDirectoryException, SafeModeException, UnresolvedLinkException, IOException { ClientActionHandler handler = new ClientActionHandler() { @Override public Object doAction(ClientProtocol namenode) throws RemoteException, IOException { namenode .create(src, masked, clientName, flag, createParent, replication, blockSize); return null; } }; doClientActionWithRetry(handler, "create"); }
public void create(final String src, final FsPermission masked, final String clientName, final EnumSetWritable<CreateFlag> flag, final boolean createParent, final short replication, final long blockSize, final EncodingPolicy policy) throws AccessControlException, AlreadyBeingCreatedException, DSQuotaExceededException, FileAlreadyExistsException, FileNotFoundException, NSQuotaExceededException, ParentNotDirectoryException, SafeModeException, UnresolvedLinkException, IOException { ClientActionHandler handler = new ClientActionHandler() { @Override public Object doAction(ClientProtocol namenode) throws RemoteException, IOException { namenode .create(src, masked, clientName, flag, createParent, replication, blockSize, policy); return null; } }; doClientActionWithRetry(handler, "create"); }
@Override public HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize, EncodingPolicy policy) throws AccessControlException, AlreadyBeingCreatedException, DSQuotaExceededException, FileAlreadyExistsException, FileNotFoundException, NSQuotaExceededException, ParentNotDirectoryException, SafeModeException, UnresolvedLinkException, IOException { CreateRequestProto.Builder builder = CreateRequestProto.newBuilder().setSrc(src) .setMasked(PBHelper.convert(masked)).setClientName(clientName) .setCreateFlag(PBHelper.convertCreateFlag(flag)) .setCreateParent(createParent).setReplication(replication) .setBlockSize(blockSize); if (policy != null) { builder.setPolicy(PBHelper.convert(policy)); } CreateRequestProto req = builder.build(); try { CreateResponseProto result = rpcProxy.create(null, req); return result.hasFs() ? PBHelper.convert(result.getFs()) : null; } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } }
private static ClientProtocol createNamenode(ClientProtocol rpcNamenode) throws IOException { RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( 5, LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS); Map<Class<? extends Exception>,RetryPolicy> remoteExceptionToPolicyMap = new HashMap<Class<? extends Exception>, RetryPolicy>(); remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, createPolicy); Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap = new HashMap<Class<? extends Exception>, RetryPolicy>(); exceptionToPolicyMap.put(RemoteException.class, RetryPolicies.retryByRemoteException( RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap)); RetryPolicy methodPolicy = RetryPolicies.retryByException( RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>(); methodNameToPolicyMap.put("create", methodPolicy); return (ClientProtocol) RetryProxy.create(ClientProtocol.class, rpcNamenode, methodNameToPolicyMap); }
/** * Mocks FSNamesystem instance, adds an empty file, sets status of last two * blocks to COMMITTED and COMMITTED and invokes lease recovery * method. AlreadyBeingCreatedException is expected. * @throws AlreadyBeingCreatedException as the result */ @Test(expected=AlreadyBeingCreatedException.class) public void testInternalReleaseLease_COMM_COMM () throws IOException { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } LeaseManager.Lease lm = mock(LeaseManager.Lease.class); Path file = spy(new Path("/" + GenericTestUtils.getMethodName() + "_test.dat")); DatanodeDescriptor dnd = mock(DatanodeDescriptor.class); PermissionStatus ps = new PermissionStatus("test", "test", new FsPermission((short)0777)); mockFileBlocks(2, HdfsConstants.BlockUCState.COMMITTED, HdfsConstants.BlockUCState.COMMITTED, file, dnd, ps, false); fsn.internalReleaseLease(lm, file.toString(), null); assertTrue("FSNamesystem.internalReleaseLease suppose to throw " + "AlreadyBeingCreatedException here", false); }
/** * Mocks FSNamesystem instance, adds an empty file with 1 block * and invokes lease recovery method. * AlreadyBeingCreatedException is expected. * @throws AlreadyBeingCreatedException as the result */ @Test(expected=AlreadyBeingCreatedException.class) public void testInternalReleaseLease_1blocks () throws IOException { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } LeaseManager.Lease lm = mock(LeaseManager.Lease.class); Path file = spy(new Path("/" + GenericTestUtils.getMethodName() + "_test.dat")); DatanodeDescriptor dnd = mock(DatanodeDescriptor.class); PermissionStatus ps = new PermissionStatus("test", "test", new FsPermission((short)0777)); mockFileBlocks(1, null, HdfsConstants.BlockUCState.COMMITTED, file, dnd, ps, false); fsn.internalReleaseLease(lm, file.toString(), null); assertTrue("FSNamesystem.internalReleaseLease suppose to throw " + "AlreadyBeingCreatedException here", false); }
private static ClientProtocol createNamenode(ClientProtocol rpcNamenode, Configuration conf) throws IOException { long sleepTime = conf.getLong("dfs.client.rpc.retry.sleep", LEASE_SOFTLIMIT_PERIOD); RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( 5, sleepTime, TimeUnit.MILLISECONDS); Map<Class<? extends Exception>,RetryPolicy> remoteExceptionToPolicyMap = new HashMap<Class<? extends Exception>, RetryPolicy>(); remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, createPolicy); Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap = new HashMap<Class<? extends Exception>, RetryPolicy>(); exceptionToPolicyMap.put(RemoteException.class, RetryPolicies.retryByRemoteException( RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap)); RetryPolicy methodPolicy = RetryPolicies.retryByException( RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>(); methodNameToPolicyMap.put("create", methodPolicy); return (ClientProtocol) RetryProxy.create(ClientProtocol.class, rpcNamenode, methodNameToPolicyMap); }
/** * Takes ownership of the lock file if possible. * @param lockFile * @param lastEntry last entry in the lock file. this param is an optimization. * we dont scan the lock file again to find its last entry here since * its already been done once by the logic used to check if the lock * file is stale. so this value comes from that earlier scan. * @param spoutId spout id * @throws IOException if unable to acquire * @return null if lock File is not recoverable */ public static FileLock takeOwnership(FileSystem fs, Path lockFile, LogEntry lastEntry, String spoutId) throws IOException { try { if(fs instanceof DistributedFileSystem ) { if( !((DistributedFileSystem) fs).recoverLease(lockFile) ) { LOG.warn("Unable to recover lease on lock file {} right now. Cannot transfer ownership. Will need to try later. Spout = {}", lockFile, spoutId); return null; } } return new FileLock(fs, lockFile, spoutId, lastEntry); } catch (IOException e) { if (e instanceof RemoteException && ((RemoteException) e).unwrapRemoteException() instanceof AlreadyBeingCreatedException) { LOG.warn("Lock file " + lockFile + "is currently open. Cannot transfer ownership now. Will need to try later. Spout= " + spoutId, e); return null; } else { // unexpected error LOG.warn("Cannot transfer ownership now for lock file " + lockFile + ". Will need to try later. Spout =" + spoutId, e); throw e; } } }
@Override public HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize, CryptoProtocolVersion[] supportedVersions) throws AccessControlException, AlreadyBeingCreatedException, DSQuotaExceededException, FileAlreadyExistsException, FileNotFoundException, NSQuotaExceededException, ParentNotDirectoryException, SafeModeException, UnresolvedLinkException, IOException { CreateRequestProto.Builder builder = CreateRequestProto.newBuilder() .setSrc(src) .setMasked(PBHelper.convert(masked)) .setClientName(clientName) .setCreateFlag(PBHelper.convertCreateFlag(flag)) .setCreateParent(createParent) .setReplication(replication) .setBlockSize(blockSize); builder.addAllCryptoProtocolVersion(PBHelper.convert(supportedVersions)); CreateRequestProto req = builder.build(); try { CreateResponseProto res = rpcProxy.create(null, req); return res.hasFs() ? PBHelper.convert(res.getFs()) : null; } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } }
/** Test two consecutive appends on a file with a full block. */ @Test public void testAppendTwice() throws Exception { Configuration conf = new HdfsConfiguration(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); final FileSystem fs1 = cluster.getFileSystem(); final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf); try { final Path p = new Path("/testAppendTwice/foo"); final int len = 1 << 16; final byte[] fileContents = AppendTestUtil.initBuffer(len); { // create a new file with a full block. FSDataOutputStream out = fs2.create(p, true, 4096, (short)1, len); out.write(fileContents, 0, len); out.close(); } //1st append does not add any data so that the last block remains full //and the last block in INodeFileUnderConstruction is a BlockInfo //but not BlockInfoUnderConstruction. fs2.append(p); //2nd append should get AlreadyBeingCreatedException fs1.append(p); Assert.fail(); } catch(RemoteException re) { AppendTestUtil.LOG.info("Got an exception:", re); Assert.assertEquals(AlreadyBeingCreatedException.class.getName(), re.getClassName()); } finally { fs2.close(); fs1.close(); cluster.shutdown(); } }
/** Test two consecutive appends on a file with a full block. */ @Test public void testAppend2Twice() throws Exception { Configuration conf = new HdfsConfiguration(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); final DistributedFileSystem fs1 = cluster.getFileSystem(); final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf); try { final Path p = new Path("/testAppendTwice/foo"); final int len = 1 << 16; final byte[] fileContents = AppendTestUtil.initBuffer(len); { // create a new file with a full block. FSDataOutputStream out = fs2.create(p, true, 4096, (short)1, len); out.write(fileContents, 0, len); out.close(); } //1st append does not add any data so that the last block remains full //and the last block in INodeFileUnderConstruction is a BlockInfo //but not BlockInfoUnderConstruction. ((DistributedFileSystem) fs2).append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null); // 2nd append should get AlreadyBeingCreatedException fs1.append(p); Assert.fail(); } catch(RemoteException re) { AppendTestUtil.LOG.info("Got an exception:", re); Assert.assertEquals(AlreadyBeingCreatedException.class.getName(), re.getClassName()); } finally { fs2.close(); fs1.close(); cluster.shutdown(); } }
/** Test two consecutive appends on a file with a full block. */ @Test public void testAppendTwice() throws Exception { Configuration conf = new HdfsConfiguration(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); final FileSystem fs1 = cluster.getFileSystem(); final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf); try { final Path p = new Path("/testAppendTwice/foo"); final int len = 1 << 16; final byte[] fileContents = AppendTestUtil.initBuffer(len); { // create a new file with a full block. FSDataOutputStream out = fs2.create(p, true, 4096, (short)1, len); out.write(fileContents, 0, len); out.close(); } //1st append does not add any data so that the last block remains full //and the last block in INodeFileUnderConstruction is a BlockInfo //but does not have a BlockUnderConstructionFeature. fs2.append(p); //2nd append should get AlreadyBeingCreatedException fs1.append(p); Assert.fail(); } catch(RemoteException re) { AppendTestUtil.LOG.info("Got an exception:", re); Assert.assertEquals(AlreadyBeingCreatedException.class.getName(), re.getClassName()); } finally { fs2.close(); fs1.close(); cluster.shutdown(); } }
/** Test two consecutive appends on a file with a full block. */ @Test public void testAppend2Twice() throws Exception { Configuration conf = new HdfsConfiguration(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); final DistributedFileSystem fs1 = cluster.getFileSystem(); final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf); try { final Path p = new Path("/testAppendTwice/foo"); final int len = 1 << 16; final byte[] fileContents = AppendTestUtil.initBuffer(len); { // create a new file with a full block. FSDataOutputStream out = fs2.create(p, true, 4096, (short)1, len); out.write(fileContents, 0, len); out.close(); } //1st append does not add any data so that the last block remains full //and the last block in INodeFileUnderConstruction is a BlockInfo //but does not have a BlockUnderConstructionFeature. ((DistributedFileSystem) fs2).append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null); // 2nd append should get AlreadyBeingCreatedException fs1.append(p); Assert.fail(); } catch(RemoteException re) { AppendTestUtil.LOG.info("Got an exception:", re); Assert.assertEquals(AlreadyBeingCreatedException.class.getName(), re.getClassName()); } finally { fs2.close(); fs1.close(); cluster.shutdown(); } }
private void recoverLeaseUsingCreate(Path filepath) throws IOException { Configuration conf2 = new Configuration(conf); String username = UserGroupInformation.getCurrentUGI().getUserName()+"_1"; UnixUserGroupInformation.saveToConf(conf2, UnixUserGroupInformation.UGI_PROPERTY_NAME, new UnixUserGroupInformation(username, new String[]{"supergroup"})); FileSystem dfs2 = FileSystem.get(conf2); boolean done = false; for(int i = 0; i < 10 && !done; i++) { AppendTestUtil.LOG.info("i=" + i); try { dfs2.create(filepath, false, bufferSize, (short)1, BLOCK_SIZE); fail("Creation of an existing file should never succeed."); } catch (IOException ioe) { final String message = ioe.getMessage(); if (message.contains("file exists")) { AppendTestUtil.LOG.info("done", ioe); done = true; } else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) { AppendTestUtil.LOG.info("GOOD! got " + message); } else { AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe); } } if (!done) { AppendTestUtil.LOG.info("sleep " + 5000 + "ms"); try {Thread.sleep(5000);} catch (InterruptedException e) {} } } assertTrue(done); }
private OutputStream checkAndMarkRunningBalancer() throws IOException { try { DataOutputStream out = fs.create(BALANCER_ID_PATH); out. writeBytes(InetAddress.getLocalHost().getHostName()); out.flush(); return out; } catch(RemoteException e) { if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){ return null; } else { throw e; } } }
private OutputStream checkAndMarkRunningBalancer() throws IOException { try { final DataOutputStream out = fs.create(BALANCER_ID_PATH); out.writeBytes(InetAddress.getLocalHost().getHostName()); out.flush(); return out; } catch(RemoteException e) { if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){ return null; } else { throw e; } } }