@Test public void testCheckCommitAixCompatMode() throws IOException { DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); NfsConfiguration conf = new NfsConfiguration(); conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false); // Enable AIX compatibility mode. OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(new NfsConfiguration()), true, conf); // Test fall-through to pendingWrites check in the event that commitOffset // is greater than the number of bytes we've so far flushed. Mockito.when(fos.getPos()).thenReturn((long) 2); COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_FINISHED); // Test the case when we actually have received more bytes than we're trying // to commit. ctx.getPendingWritesForTest().put(new OffsetRange(0, 10), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); Mockito.when(fos.getPos()).thenReturn((long) 10); ctx.setNextOffsetForTest((long)10); status = ctx.checkCommitInternal(5, null, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); }
/**
 * Checks {@code checkSequential} on an {@link OpenFileCtx} whose pending-write
 * map holds the offset ranges (5, 10), (10, 15) and (20, 25), with the mocked
 * output stream reporting position 0 and large-file upload disabled.
 *
 * @throws IOException if one of the APIs under test throws it
 */
@Test
public void testCheckSequential() throws IOException {
  NfsConfiguration conf = new NfsConfiguration();
  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);

  DFSClient client = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes fileAttr = new Nfs3FileAttributes();
  HdfsDataOutputStream out = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(out.getPos()).thenReturn(0L);

  OpenFileCtx openCtx = new OpenFileCtx(out, fileAttr, "/dumpFilePath",
      client, new ShellBasedIdMapping(conf), false, conf);

  // Three pending writes; the last one leaves a gap after offset 15.
  openCtx.getPendingWritesForTest().put(
      new OffsetRange(5, 10),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  openCtx.getPendingWritesForTest().put(
      new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  openCtx.getPendingWritesForTest().put(
      new OffsetRange(20, 25),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));

  assertTrue(!openCtx.checkSequential(5, 4));
  assertTrue(openCtx.checkSequential(9, 5));
  assertTrue(openCtx.checkSequential(10, 5));
  assertTrue(openCtx.checkSequential(14, 5));
  assertTrue(!openCtx.checkSequential(15, 5));
  assertTrue(!openCtx.checkSequential(20, 5));
  assertTrue(!openCtx.checkSequential(25, 5));
  assertTrue(!openCtx.checkSequential(999, 5));
}
public RpcProgramNfs3(NfsConfiguration config, DatagramSocket registrationSocket, boolean allowInsecurePorts) throws IOException { super("NFS3", "localhost", config.getInt( NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY, NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT), Nfs3Constant.PROGRAM, Nfs3Constant.VERSION, Nfs3Constant.VERSION, registrationSocket, allowInsecurePorts); this.config = config; config.set(FsPermission.UMASK_LABEL, "000"); iug = new ShellBasedIdMapping(config); aixCompatMode = config.getBoolean( NfsConfigKeys.AIX_COMPAT_MODE_KEY, NfsConfigKeys.AIX_COMPAT_MODE_DEFAULT); exports = NfsExports.getInstance(config); writeManager = new WriteManager(iug, config, aixCompatMode); clientCache = new DFSClientCache(config); replication = (short) config.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, DFSConfigKeys.DFS_REPLICATION_DEFAULT); blockSize = config.getLongBytes(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT); bufferSize = config.getInt( CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); writeDumpDir = config.get(NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_KEY, NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_DEFAULT); boolean enableDump = config.getBoolean(NfsConfigKeys.DFS_NFS_FILE_DUMP_KEY, NfsConfigKeys.DFS_NFS_FILE_DUMP_DEFAULT); UserGroupInformation.setConfiguration(config); SecurityUtil.login(config, NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY, NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY); superuser = config.get(NfsConfigKeys.NFS_SUPERUSER_KEY, NfsConfigKeys.NFS_SUPERUSER_DEFAULT); LOG.info("Configured HDFS superuser is " + superuser); if (!enableDump) { writeDumpDir = null; } else { clearDirectory(writeDumpDir); } rpcCallCache = new RpcCallCache("NFS3", 256); infoServer = new Nfs3HttpServer(config); }
/**
 * Exercises {@code OpenFileCtxCache.put} on a cache whose maximum number of
 * open files is configured to 2.
 *
 * <p>Covered steps: a third insert is rejected while both entries are fresh;
 * after sleeping for the minimum stream timeout the insert succeeds and the
 * entry for handle 1 is gone; an entry marked inactive is replaced
 * immediately; and an insert is rejected when one remaining entry has a
 * pending write and the other a pending commit.
 *
 * @throws IOException if one of the APIs under test throws it
 * @throws InterruptedException if a sleep between steps is interrupted
 */
@Test
public void testEviction() throws IOException, InterruptedException {
  NfsConfiguration conf = new NfsConfiguration();

  // Only two entries will be in the cache
  conf.setInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, 2);

  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context5 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));

  OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);

  boolean ret = cache.put(new FileHandle(1), context1);
  assertTrue(ret);
  // Make context1 measurably older than context2.
  Thread.sleep(1000);
  ret = cache.put(new FileHandle(2), context2);
  assertTrue(ret);
  ret = cache.put(new FileHandle(3), context3);
  assertFalse(ret);
  assertTrue(cache.size() == 2);

  // Wait for the oldest stream to be evict-able, insert again
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  assertTrue(cache.size() == 2);

  ret = cache.put(new FileHandle(3), context3);
  assertTrue(ret);
  assertTrue(cache.size() == 2);
  assertTrue(cache.get(new FileHandle(1)) == null);

  // Test inactive entry is evicted immediately
  context3.setActiveStatusForTest(false);
  ret = cache.put(new FileHandle(4), context4);
  assertTrue(ret);

  // Now the cache has context2 and context4
  // Test eviction failure if all entries have pending work.
  context2.getPendingWritesForTest().put(new OffsetRange(0, 100),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  // Long.valueOf replaces the deprecated Long(long) boxing constructor.
  context4.getPendingCommitsForTest().put(Long.valueOf(100L),
      new CommitCtx(0, null, 0, attr));
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  ret = cache.put(new FileHandle(5), context5);
  assertFalse(ret);
}
@Test public void testScan() throws IOException, InterruptedException { NfsConfiguration conf = new NfsConfiguration(); // Only two entries will be in the cache conf.setInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, 2); DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); Mockito.when(fos.getPos()).thenReturn((long) 0); OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(new NfsConfiguration())); OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(new NfsConfiguration())); OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(new NfsConfiguration())); OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(new NfsConfiguration())); OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100); // Test cleaning expired entry boolean ret = cache.put(new FileHandle(1), context1); assertTrue(ret); ret = cache.put(new FileHandle(2), context2); assertTrue(ret); Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT + 1); cache.scan(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT); assertTrue(cache.size() == 0); // Test cleaning inactive entry ret = cache.put(new FileHandle(3), context3); assertTrue(ret); ret = cache.put(new FileHandle(4), context4); assertTrue(ret); context3.setActiveStatusForTest(false); cache.scan(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_DEFAULT); assertTrue(cache.size() == 1); assertTrue(cache.get(new FileHandle(3)) == null); assertTrue(cache.get(new FileHandle(4)) != null); }
@Test // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which // includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX, // COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC. public void testCheckCommit() throws IOException { DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); Mockito.when(fos.getPos()).thenReturn((long) 0); NfsConfiguration conf = new NfsConfiguration(); conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false); OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(conf), false, conf); COMMIT_STATUS ret; // Test inactive open file context ctx.setActiveStatusForTest(false); Channel ch = Mockito.mock(Channel.class); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX); ctx.getPendingWritesForTest().put(new OffsetRange(5, 10), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE); // Test request with non zero commit offset ctx.setActiveStatusForTest(true); Mockito.when(fos.getPos()).thenReturn((long) 10); ctx.setNextOffsetForTest(10); COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); // Do_SYNC state will be updated to FINISHED after data sync ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); status = ctx.checkCommitInternal(10, ch, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); ConcurrentNavigableMap<Long, CommitCtx> commits = ctx 
.getPendingCommitsForTest(); Assert.assertTrue(commits.size() == 0); ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT); Assert.assertTrue(commits.size() == 1); long key = commits.firstKey(); Assert.assertTrue(key == 11); // Test request with zero commit offset commits.remove(new Long(11)); // There is one pending write [5,10] ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT); Assert.assertTrue(commits.size() == 1); key = commits.firstKey(); Assert.assertTrue(key == 9); // Empty pending writes ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); }
@Test // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with // large file upload option. public void testCheckCommitLargeFileUpload() throws IOException { DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); Mockito.when(fos.getPos()).thenReturn((long) 0); NfsConfiguration conf = new NfsConfiguration(); conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true); OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(conf), false, conf); COMMIT_STATUS ret; // Test inactive open file context ctx.setActiveStatusForTest(false); Channel ch = Mockito.mock(Channel.class); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX); ctx.getPendingWritesForTest().put(new OffsetRange(10, 15), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE); // Test request with non zero commit offset ctx.setActiveStatusForTest(true); Mockito.when(fos.getPos()).thenReturn((long) 8); ctx.setNextOffsetForTest(10); COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); // Do_SYNC state will be updated to FINISHED after data sync ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); // Test commit sequential writes status = ctx.checkCommitInternal(10, ch, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT); ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT); // Test commit non-sequential writes ConcurrentNavigableMap<Long, CommitCtx> commits = ctx .getPendingCommitsForTest(); 
Assert.assertTrue(commits.size() == 1); ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS); Assert.assertTrue(commits.size() == 1); // Test request with zero commit offset commits.remove(new Long(10)); // There is one pending write [10,15] ret = ctx.checkCommitInternal(0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT); ret = ctx.checkCommitInternal(9, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT); Assert.assertTrue(commits.size() == 2); // Empty pending writes. nextOffset=10, flushed pos=8 ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT); // Empty pending writes ctx.setNextOffsetForTest((long) 8); // flushed pos = 8 ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); }
@Test // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which // includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX, // COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC. public void testCheckCommitFromRead() throws IOException { DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); Mockito.when(fos.getPos()).thenReturn((long) 0); NfsConfiguration config = new NfsConfiguration(); config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false); OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(config), false, config); FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath" COMMIT_STATUS ret; WriteManager wm = new WriteManager(new ShellBasedIdMapping(config), config, false); assertTrue(wm.addOpenFileStream(h, ctx)); // Test inactive open file context ctx.setActiveStatusForTest(false); Channel ch = Mockito.mock(Channel.class); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0)); ctx.getPendingWritesForTest().put(new OffsetRange(10, 15), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret); assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0)); // Test request with non zero commit offset ctx.setActiveStatusForTest(true); Mockito.when(fos.getPos()).thenReturn((long) 10); ctx.setNextOffsetForTest((long)10); COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false); assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status); // Do_SYNC state will be updated to FINISHED after data sync ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true); 
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5)); status = ctx.checkCommitInternal(10, ch, 1, attr, true); assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10)); ConcurrentNavigableMap<Long, CommitCtx> commits = ctx .getPendingCommitsForTest(); assertTrue(commits.size() == 0); ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret); assertEquals(0, commits.size()); // commit triggered by read doesn't wait assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 11)); // Test request with zero commit offset // There is one pending write [5,10] ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret); assertEquals(0, commits.size()); assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0)); // Empty pending writes ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0)); }
@Test // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with large file upload option public void testCheckCommitFromReadLargeFileUpload() throws IOException { DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); Mockito.when(fos.getPos()).thenReturn((long) 0); NfsConfiguration config = new NfsConfiguration(); config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true); OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(config), false, config); FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath" COMMIT_STATUS ret; WriteManager wm = new WriteManager(new ShellBasedIdMapping(config), config, false); assertTrue(wm.addOpenFileStream(h, ctx)); // Test inactive open file context ctx.setActiveStatusForTest(false); Channel ch = Mockito.mock(Channel.class); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0)); ctx.getPendingWritesForTest().put(new OffsetRange(10, 15), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret); assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0)); // Test request with non zero commit offset ctx.setActiveStatusForTest(true); Mockito.when(fos.getPos()).thenReturn((long) 6); ctx.setNextOffsetForTest((long)10); COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false); assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status); // Do_SYNC state will be updated to FINISHED after data sync ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5)); // Test 
request with sequential writes status = ctx.checkCommitInternal(9, ch, 1, attr, true); assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT); ret = ctx.checkCommit(dfsClient, 9, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret); assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 9)); // Test request with non-sequential writes ConcurrentNavigableMap<Long, CommitCtx> commits = ctx .getPendingCommitsForTest(); assertTrue(commits.size() == 0); ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret); assertEquals(0, commits.size()); // commit triggered by read doesn't wait assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 16)); // Test request with zero commit offset // There is one pending write [10,15] ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret); assertEquals(0, commits.size()); assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0)); // Empty pending writes ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret); assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0)); }
public RpcProgramNfs3(NfsConfiguration config, DatagramSocket registrationSocket, boolean allowInsecurePorts) throws IOException { super("NFS3", "localhost", config.getInt( NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY, NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT), Nfs3Constant.PROGRAM, Nfs3Constant.VERSION, Nfs3Constant.VERSION, registrationSocket, allowInsecurePorts); this.config = config; config.set(FsPermission.UMASK_LABEL, "000"); iug = new ShellBasedIdMapping(config); aixCompatMode = config.getBoolean( NfsConfigKeys.AIX_COMPAT_MODE_KEY, NfsConfigKeys.AIX_COMPAT_MODE_DEFAULT); exports = NfsExports.getInstance(config); writeManager = new WriteManager(iug, config, aixCompatMode); clientCache = new DFSClientCache(config); replication = (short) config.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, DFSConfigKeys.DFS_REPLICATION_DEFAULT); blockSize = config.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT); bufferSize = config.getInt( CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); writeDumpDir = config.get(NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_KEY, NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_DEFAULT); boolean enableDump = config.getBoolean(NfsConfigKeys.DFS_NFS_FILE_DUMP_KEY, NfsConfigKeys.DFS_NFS_FILE_DUMP_DEFAULT); UserGroupInformation.setConfiguration(config); SecurityUtil.login(config, NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY, NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY); superuser = config.get(NfsConfigKeys.NFS_SUPERUSER_KEY, NfsConfigKeys.NFS_SUPERUSER_DEFAULT); LOG.info("Configured HDFS superuser is " + superuser); if (!enableDump) { writeDumpDir = null; } else { clearDirectory(writeDumpDir); } rpcCallCache = new RpcCallCache("NFS3", 256); infoServer = new Nfs3HttpServer(config); }