@Test public void testEviction() throws IOException, InterruptedException { NfsConfiguration conf = new NfsConfiguration(); // Only two entries will be in the cache conf.setInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, 2); DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); Mockito.when(fos.getPos()).thenReturn((long) 0); OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(new NfsConfiguration())); OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(new NfsConfiguration())); OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(new NfsConfiguration())); OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(new NfsConfiguration())); OpenFileCtx context5 = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(new NfsConfiguration())); OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100); boolean ret = cache.put(new FileHandle(1), context1); assertTrue(ret); Thread.sleep(1000); ret = cache.put(new FileHandle(2), context2); assertTrue(ret); ret = cache.put(new FileHandle(3), context3); assertFalse(ret); assertTrue(cache.size() == 2); // Wait for the oldest stream to be evict-able, insert again Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT); assertTrue(cache.size() == 2); ret = cache.put(new FileHandle(3), context3); assertTrue(ret); assertTrue(cache.size() == 2); assertTrue(cache.get(new FileHandle(1)) == null); // Test inactive entry is evicted immediately context3.setActiveStatusForTest(false); ret = cache.put(new FileHandle(4), context4); assertTrue(ret); // Now the cache has context2 and context4 // Test eviction failure if all entries have pending work. context2.getPendingWritesForTest().put(new OffsetRange(0, 100), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); context4.getPendingCommitsForTest().put(new Long(100), new CommitCtx(0, null, 0, attr)); Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT); ret = cache.put(new FileHandle(5), context5); assertFalse(ret); }
@Test // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which // includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX, // COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC. public void testCheckCommit() throws IOException { DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); Mockito.when(fos.getPos()).thenReturn((long) 0); NfsConfiguration conf = new NfsConfiguration(); conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false); OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(conf), false, conf); COMMIT_STATUS ret; // Test inactive open file context ctx.setActiveStatusForTest(false); Channel ch = Mockito.mock(Channel.class); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX); ctx.getPendingWritesForTest().put(new OffsetRange(5, 10), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE); // Test request with non zero commit offset ctx.setActiveStatusForTest(true); Mockito.when(fos.getPos()).thenReturn((long) 10); ctx.setNextOffsetForTest(10); COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); // Do_SYNC state will be updated to FINISHED after data sync ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); status = ctx.checkCommitInternal(10, ch, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); ConcurrentNavigableMap<Long, CommitCtx> commits = ctx .getPendingCommitsForTest(); Assert.assertTrue(commits.size() == 0); ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT); Assert.assertTrue(commits.size() == 1); long key = commits.firstKey(); Assert.assertTrue(key == 11); // Test request with zero commit offset commits.remove(new Long(11)); // There is one pending write [5,10] ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT); Assert.assertTrue(commits.size() == 1); key = commits.firstKey(); Assert.assertTrue(key == 9); // Empty pending writes ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); }
@Test // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with // large file upload option. public void testCheckCommitLargeFileUpload() throws IOException { DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); Mockito.when(fos.getPos()).thenReturn((long) 0); NfsConfiguration conf = new NfsConfiguration(); conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true); OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(conf), false, conf); COMMIT_STATUS ret; // Test inactive open file context ctx.setActiveStatusForTest(false); Channel ch = Mockito.mock(Channel.class); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX); ctx.getPendingWritesForTest().put(new OffsetRange(10, 15), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE); // Test request with non zero commit offset ctx.setActiveStatusForTest(true); Mockito.when(fos.getPos()).thenReturn((long) 8); ctx.setNextOffsetForTest(10); COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); // Do_SYNC state will be updated to FINISHED after data sync ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); // Test commit sequential writes status = ctx.checkCommitInternal(10, ch, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT); ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT); // Test commit non-sequential writes ConcurrentNavigableMap<Long, CommitCtx> commits = ctx .getPendingCommitsForTest(); Assert.assertTrue(commits.size() == 1); ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS); Assert.assertTrue(commits.size() == 1); // Test request with zero commit offset commits.remove(new Long(10)); // There is one pending write [10,15] ret = ctx.checkCommitInternal(0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT); ret = ctx.checkCommitInternal(9, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT); Assert.assertTrue(commits.size() == 2); // Empty pending writes. nextOffset=10, flushed pos=8 ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT); // Empty pending writes ctx.setNextOffsetForTest((long) 8); // flushed pos = 8 ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); }
@Test // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which // includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX, // COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC. public void testCheckCommitFromRead() throws IOException { DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); Mockito.when(fos.getPos()).thenReturn((long) 0); NfsConfiguration config = new NfsConfiguration(); config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false); OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(config), false, config); FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath" COMMIT_STATUS ret; WriteManager wm = new WriteManager(new ShellBasedIdMapping(config), config, false); assertTrue(wm.addOpenFileStream(h, ctx)); // Test inactive open file context ctx.setActiveStatusForTest(false); Channel ch = Mockito.mock(Channel.class); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0)); ctx.getPendingWritesForTest().put(new OffsetRange(10, 15), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret); assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0)); // Test request with non zero commit offset ctx.setActiveStatusForTest(true); Mockito.when(fos.getPos()).thenReturn((long) 10); ctx.setNextOffsetForTest((long)10); COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false); assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status); // Do_SYNC state will be updated to FINISHED after data sync ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5)); status = ctx.checkCommitInternal(10, ch, 1, attr, true); assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10)); ConcurrentNavigableMap<Long, CommitCtx> commits = ctx .getPendingCommitsForTest(); assertTrue(commits.size() == 0); ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret); assertEquals(0, commits.size()); // commit triggered by read doesn't wait assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 11)); // Test request with zero commit offset // There is one pending write [5,10] ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret); assertEquals(0, commits.size()); assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0)); // Empty pending writes ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0)); }
@Test // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with large file upload option public void testCheckCommitFromReadLargeFileUpload() throws IOException { DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); Mockito.when(fos.getPos()).thenReturn((long) 0); NfsConfiguration config = new NfsConfiguration(); config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true); OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(config), false, config); FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath" COMMIT_STATUS ret; WriteManager wm = new WriteManager(new ShellBasedIdMapping(config), config, false); assertTrue(wm.addOpenFileStream(h, ctx)); // Test inactive open file context ctx.setActiveStatusForTest(false); Channel ch = Mockito.mock(Channel.class); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0)); ctx.getPendingWritesForTest().put(new OffsetRange(10, 15), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret); assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0)); // Test request with non zero commit offset ctx.setActiveStatusForTest(true); Mockito.when(fos.getPos()).thenReturn((long) 6); ctx.setNextOffsetForTest((long)10); COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false); assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status); // Do_SYNC state will be updated to FINISHED after data sync ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5)); // Test request with sequential writes status = ctx.checkCommitInternal(9, ch, 1, attr, true); assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT); ret = ctx.checkCommit(dfsClient, 9, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret); assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 9)); // Test request with non-sequential writes ConcurrentNavigableMap<Long, CommitCtx> commits = ctx .getPendingCommitsForTest(); assertTrue(commits.size() == 0); ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret); assertEquals(0, commits.size()); // commit triggered by read doesn't wait assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 16)); // Test request with zero commit offset // There is one pending write [10,15] ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret); assertEquals(0, commits.size()); assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0)); // Empty pending writes ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret); assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0)); }
@Test public void testEviction() throws IOException, InterruptedException { Configuration conf = new Configuration(); // Only two entries will be in the cache conf.setInt(Nfs3Constant.MAX_OPEN_FILES, 2); DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); Mockito.when(fos.getPos()).thenReturn((long) 0); OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new IdUserGroup()); OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new IdUserGroup()); OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new IdUserGroup()); OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new IdUserGroup()); OpenFileCtx context5 = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new IdUserGroup()); OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100); boolean ret = cache.put(new FileHandle(1), context1); assertTrue(ret); Thread.sleep(1000); ret = cache.put(new FileHandle(2), context2); assertTrue(ret); ret = cache.put(new FileHandle(3), context3); assertFalse(ret); assertTrue(cache.size() == 2); // Wait for the oldest stream to be evict-able, insert again Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT); assertTrue(cache.size() == 2); ret = cache.put(new FileHandle(3), context3); assertTrue(ret); assertTrue(cache.size() == 2); assertTrue(cache.get(new FileHandle(1)) == null); // Test inactive entry is evicted immediately context3.setActiveStatusForTest(false); ret = cache.put(new FileHandle(4), context4); assertTrue(ret); // Now the cache has context2 and context4 // Test eviction failure if all entries have pending work. context2.getPendingWritesForTest().put(new OffsetRange(0, 100), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); context4.getPendingCommitsForTest() .put(new Long(100), new CommitCtx(0, null, 0, attr)); Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT); ret = cache.put(new FileHandle(5), context5); assertFalse(ret); }
@Test // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which // includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX, // COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC. public void testCheckCommit() throws IOException { DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); Mockito.when(fos.getPos()).thenReturn((long) 0); OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new IdUserGroup()); COMMIT_STATUS ret; // Test inactive open file context ctx.setActiveStatusForTest(false); Channel ch = Mockito.mock(Channel.class); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX); ctx.getPendingWritesForTest().put(new OffsetRange(5, 10), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE); // Test request with non zero commit offset ctx.setActiveStatusForTest(true); Mockito.when(fos.getPos()).thenReturn((long) 10); COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); // Do_SYNC state will be updated to FINISHED after data sync ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); status = ctx.checkCommitInternal(10, ch, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); ConcurrentNavigableMap<Long, CommitCtx> commits = ctx.getPendingCommitsForTest(); Assert.assertTrue(commits.size() == 0); ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT); Assert.assertTrue(commits.size() == 1); long key = commits.firstKey(); Assert.assertTrue(key == 11); // Test request with zero commit offset commits.remove(new Long(11)); // There is one pending write [5,10] ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT); Assert.assertTrue(commits.size() == 1); key = commits.firstKey(); Assert.assertTrue(key == 9); // Empty pending writes ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); }
@Test // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which // includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX, // COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC. public void testCheckCommitFromRead() throws IOException { DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); Mockito.when(fos.getPos()).thenReturn((long) 0); OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new IdUserGroup()); FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath" COMMIT_STATUS ret; WriteManager wm = new WriteManager(new IdUserGroup(), new Configuration()); assertTrue(wm.addOpenFileStream(h, ctx)); // Test inactive open file context ctx.setActiveStatusForTest(false); Channel ch = Mockito.mock(Channel.class); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0)); ctx.getPendingWritesForTest().put(new OffsetRange(5, 10), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret); assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0)); // Test request with non zero commit offset ctx.setActiveStatusForTest(true); Mockito.when(fos.getPos()).thenReturn((long) 10); COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false); assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status); // Do_SYNC state will be updated to FINISHED after data sync ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5)); status = ctx.checkCommitInternal(10, ch, 1, attr, true); assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10)); ConcurrentNavigableMap<Long, CommitCtx> commits = ctx.getPendingCommitsForTest(); assertTrue(commits.size() == 0); ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret); assertEquals(0, commits.size()); // commit triggered by read doesn't wait assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 11)); // Test request with zero commit offset // There is one pending write [5,10] ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret); assertEquals(0, commits.size()); assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0)); // Empty pending writes ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0)); }
@Test public void testEviction() throws IOException, InterruptedException { Configuration conf = new Configuration(); // Only two entries will be in the cache conf.setInt(Nfs3Constant.MAX_OPEN_FILES, 2); DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); Mockito.when(fos.getPos()).thenReturn((long) 0); OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new IdUserGroup()); OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new IdUserGroup()); OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new IdUserGroup()); OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new IdUserGroup()); OpenFileCtx context5 = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new IdUserGroup()); OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100); boolean ret = cache.put(new FileHandle(1), context1); assertTrue(ret); Thread.sleep(1000); ret = cache.put(new FileHandle(2), context2); assertTrue(ret); ret = cache.put(new FileHandle(3), context3); assertFalse(ret); assertTrue(cache.size() == 2); // Wait for the oldest stream to be evict-able, insert again Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT); assertTrue(cache.size() == 2); ret = cache.put(new FileHandle(3), context3); assertTrue(ret); assertTrue(cache.size() == 2); assertTrue(cache.get(new FileHandle(1)) == null); // Test inactive entry is evicted immediately context3.setActiveStatusForTest(false); ret = cache.put(new FileHandle(4), context4); assertTrue(ret); // Now the cache has context2 and context4 // Test eviction failure if all entries have pending work. context2.getPendingWritesForTest().put(new OffsetRange(0, 100), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); context4.getPendingCommitsForTest().put(new Long(100), new CommitCtx(0, null, 0, attr)); Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT); ret = cache.put(new FileHandle(5), context5); assertFalse(ret); }
@Test // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which // includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX, // COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC. public void testCheckCommit() throws IOException { DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); Mockito.when(fos.getPos()).thenReturn((long) 0); OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new IdUserGroup()); COMMIT_STATUS ret; // Test inactive open file context ctx.setActiveStatusForTest(false); Channel ch = Mockito.mock(Channel.class); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX); ctx.getPendingWritesForTest().put(new OffsetRange(5, 10), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE); // Test request with non zero commit offset ctx.setActiveStatusForTest(true); Mockito.when(fos.getPos()).thenReturn((long) 10); COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); // Do_SYNC state will be updated to FINISHED after data sync ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); status = ctx.checkCommitInternal(10, ch, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); ConcurrentNavigableMap<Long, CommitCtx> commits = ctx .getPendingCommitsForTest(); Assert.assertTrue(commits.size() == 0); ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT); Assert.assertTrue(commits.size() == 1); long key = commits.firstKey(); Assert.assertTrue(key == 11); // Test request with zero commit offset commits.remove(new Long(11)); // There is one pending write [5,10] ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT); Assert.assertTrue(commits.size() == 1); key = commits.firstKey(); Assert.assertTrue(key == 9); // Empty pending writes ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); }
@Test // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which // includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX, // COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC. public void testCheckCommitFromRead() throws IOException { DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); Mockito.when(fos.getPos()).thenReturn((long) 0); OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new IdUserGroup()); FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath" COMMIT_STATUS ret; WriteManager wm = new WriteManager(new IdUserGroup(), new Configuration()); assertTrue(wm.addOpenFileStream(h, ctx)); // Test inactive open file context ctx.setActiveStatusForTest(false); Channel ch = Mockito.mock(Channel.class); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0)); ctx.getPendingWritesForTest().put(new OffsetRange(5, 10), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret); assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0)); // Test request with non zero commit offset ctx.setActiveStatusForTest(true); Mockito.when(fos.getPos()).thenReturn((long) 10); COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false); assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status); // Do_SYNC state will be updated to FINISHED after data sync ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5)); status = ctx.checkCommitInternal(10, ch, 1, attr, true); assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10)); ConcurrentNavigableMap<Long, CommitCtx> commits = ctx .getPendingCommitsForTest(); assertTrue(commits.size() == 0); ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret); assertEquals(0, commits.size()); // commit triggered by read doesn't wait assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 11)); // Test request with zero commit offset // There is one pending write [5,10] ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret); assertEquals(0, commits.size()); assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0)); // Empty pending writes ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0)); }