Java 类org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.CommitCtx 实例源码

项目:hadoop    文件:TestOpenFileCtxCache.java   
@Test
public void testEviction() throws IOException, InterruptedException {
  NfsConfiguration conf = new NfsConfiguration();

  // Only two entries will be in the cache
  conf.setInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, 2);

  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context5 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));

  OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);

  boolean ret = cache.put(new FileHandle(1), context1);
  assertTrue(ret);
  Thread.sleep(1000);
  ret = cache.put(new FileHandle(2), context2);
  assertTrue(ret);
  ret = cache.put(new FileHandle(3), context3);
  assertFalse(ret);
  assertTrue(cache.size() == 2);

  // Wait for the oldest stream to be evict-able, insert again
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  assertTrue(cache.size() == 2);

  ret = cache.put(new FileHandle(3), context3);
  assertTrue(ret);
  assertTrue(cache.size() == 2);
  assertTrue(cache.get(new FileHandle(1)) == null);

  // Test inactive entry is evicted immediately
  context3.setActiveStatusForTest(false);
  ret = cache.put(new FileHandle(4), context4);
  assertTrue(ret);

  // Now the cache has context2 and context4
  // Test eviction failure if all entries have pending work.
  context2.getPendingWritesForTest().put(new OffsetRange(0, 100),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  context4.getPendingCommitsForTest().put(new Long(100),
      new CommitCtx(0, null, 0, attr));
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  ret = cache.put(new FileHandle(5), context5);
  assertFalse(ret);
}
项目:hadoop    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
// COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
public void testCheckCommit() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  NfsConfiguration conf = new NfsConfiguration();
  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(conf), false, conf);

  COMMIT_STATUS ret;

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX);

  ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE);

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 10);
  ctx.setNextOffsetForTest(10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);

  status = ctx.checkCommitInternal(10, ch, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);

  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  Assert.assertTrue(commits.size() == 0);
  ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT);
  Assert.assertTrue(commits.size() == 1);
  long key = commits.firstKey();
  Assert.assertTrue(key == 11);

  // Test request with zero commit offset
  commits.remove(new Long(11));
  // There is one pending write [5,10]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT);
  Assert.assertTrue(commits.size() == 1);
  key = commits.firstKey();
  Assert.assertTrue(key == 9);

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
}
项目:hadoop    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with
// large file upload option.
public void testCheckCommitLargeFileUpload() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  NfsConfiguration conf = new NfsConfiguration();
  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(conf), false, conf);

  COMMIT_STATUS ret;

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX);

  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE);

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 8);
  ctx.setNextOffsetForTest(10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);

  // Test commit sequential writes
  status = ctx.checkCommitInternal(10, ch, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);

  // Test commit non-sequential writes
  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  Assert.assertTrue(commits.size() == 1);
  ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS);
  Assert.assertTrue(commits.size() == 1);

  // Test request with zero commit offset
  commits.remove(new Long(10));
  // There is one pending write [10,15]
  ret = ctx.checkCommitInternal(0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);

  ret = ctx.checkCommitInternal(9, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
  Assert.assertTrue(commits.size() == 2);

  // Empty pending writes. nextOffset=10, flushed pos=8
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);

  // Empty pending writes
  ctx.setNextOffsetForTest((long) 8); // flushed pos = 8
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);

}
项目:hadoop    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
// COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
public void testCheckCommitFromRead() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);
  NfsConfiguration config = new NfsConfiguration();

  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(config), false, config);

  FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
  COMMIT_STATUS ret;
  WriteManager wm = new WriteManager(new ShellBasedIdMapping(config), config, false);
  assertTrue(wm.addOpenFileStream(h, ctx));

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));

  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
  assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 10);
  ctx.setNextOffsetForTest((long)10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
  assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));

  status = ctx.checkCommitInternal(10, ch, 1, attr, true);
  assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10));

  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  assertTrue(commits.size() == 0);
  ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  assertEquals(0, commits.size()); // commit triggered by read doesn't wait
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 11));

  // Test request with zero commit offset
  // There is one pending write [5,10]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  assertEquals(0, commits.size());
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
}
项目:hadoop    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with large file upload option
public void testCheckCommitFromReadLargeFileUpload() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);
  NfsConfiguration config = new NfsConfiguration();

  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(config), false, config);

  FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
  COMMIT_STATUS ret;
  WriteManager wm = new WriteManager(new ShellBasedIdMapping(config), config, false);
  assertTrue(wm.addOpenFileStream(h, ctx));

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));

  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
  assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 6);
  ctx.setNextOffsetForTest((long)10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
  assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));

  // Test request with sequential writes
  status = ctx.checkCommitInternal(9, ch, 1, attr, true);
  assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
  ret = ctx.checkCommit(dfsClient, 9, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 9));

  // Test request with non-sequential writes
  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  assertTrue(commits.size() == 0);
  ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret);
  assertEquals(0, commits.size()); // commit triggered by read doesn't wait
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 16));

  // Test request with zero commit offset
  // There is one pending write [10,15]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(0, commits.size());
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
}
项目:aliyun-oss-hadoop-fs    文件:TestOpenFileCtxCache.java   
@Test
public void testEviction() throws IOException, InterruptedException {
  NfsConfiguration conf = new NfsConfiguration();

  // Only two entries will be in the cache
  conf.setInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, 2);

  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context5 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));

  OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);

  boolean ret = cache.put(new FileHandle(1), context1);
  assertTrue(ret);
  Thread.sleep(1000);
  ret = cache.put(new FileHandle(2), context2);
  assertTrue(ret);
  ret = cache.put(new FileHandle(3), context3);
  assertFalse(ret);
  assertTrue(cache.size() == 2);

  // Wait for the oldest stream to be evict-able, insert again
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  assertTrue(cache.size() == 2);

  ret = cache.put(new FileHandle(3), context3);
  assertTrue(ret);
  assertTrue(cache.size() == 2);
  assertTrue(cache.get(new FileHandle(1)) == null);

  // Test inactive entry is evicted immediately
  context3.setActiveStatusForTest(false);
  ret = cache.put(new FileHandle(4), context4);
  assertTrue(ret);

  // Now the cache has context2 and context4
  // Test eviction failure if all entries have pending work.
  context2.getPendingWritesForTest().put(new OffsetRange(0, 100),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  context4.getPendingCommitsForTest().put(new Long(100),
      new CommitCtx(0, null, 0, attr));
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  ret = cache.put(new FileHandle(5), context5);
  assertFalse(ret);
}
项目:aliyun-oss-hadoop-fs    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
// COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
public void testCheckCommit() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  NfsConfiguration conf = new NfsConfiguration();
  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(conf), false, conf);

  COMMIT_STATUS ret;

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX);

  ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE);

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 10);
  ctx.setNextOffsetForTest(10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);

  status = ctx.checkCommitInternal(10, ch, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);

  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  Assert.assertTrue(commits.size() == 0);
  ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT);
  Assert.assertTrue(commits.size() == 1);
  long key = commits.firstKey();
  Assert.assertTrue(key == 11);

  // Test request with zero commit offset
  commits.remove(new Long(11));
  // There is one pending write [5,10]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT);
  Assert.assertTrue(commits.size() == 1);
  key = commits.firstKey();
  Assert.assertTrue(key == 9);

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
}
项目:aliyun-oss-hadoop-fs    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with
// large file upload option.
public void testCheckCommitLargeFileUpload() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  NfsConfiguration conf = new NfsConfiguration();
  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(conf), false, conf);

  COMMIT_STATUS ret;

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX);

  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE);

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 8);
  ctx.setNextOffsetForTest(10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);

  // Test commit sequential writes
  status = ctx.checkCommitInternal(10, ch, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);

  // Test commit non-sequential writes
  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  Assert.assertTrue(commits.size() == 1);
  ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS);
  Assert.assertTrue(commits.size() == 1);

  // Test request with zero commit offset
  commits.remove(new Long(10));
  // There is one pending write [10,15]
  ret = ctx.checkCommitInternal(0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);

  ret = ctx.checkCommitInternal(9, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
  Assert.assertTrue(commits.size() == 2);

  // Empty pending writes. nextOffset=10, flushed pos=8
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);

  // Empty pending writes
  ctx.setNextOffsetForTest((long) 8); // flushed pos = 8
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);

}
项目:aliyun-oss-hadoop-fs    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
// COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
public void testCheckCommitFromRead() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);
  NfsConfiguration config = new NfsConfiguration();

  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(config), false, config);

  FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
  COMMIT_STATUS ret;
  WriteManager wm = new WriteManager(new ShellBasedIdMapping(config), config, false);
  assertTrue(wm.addOpenFileStream(h, ctx));

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));

  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
  assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 10);
  ctx.setNextOffsetForTest((long)10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
  assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));

  status = ctx.checkCommitInternal(10, ch, 1, attr, true);
  assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10));

  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  assertTrue(commits.size() == 0);
  ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  assertEquals(0, commits.size()); // commit triggered by read doesn't wait
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 11));

  // Test request with zero commit offset
  // There is one pending write [5,10]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  assertEquals(0, commits.size());
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
}
项目:aliyun-oss-hadoop-fs    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with large file upload option
public void testCheckCommitFromReadLargeFileUpload() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);
  NfsConfiguration config = new NfsConfiguration();

  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(config), false, config);

  FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
  COMMIT_STATUS ret;
  WriteManager wm = new WriteManager(new ShellBasedIdMapping(config), config, false);
  assertTrue(wm.addOpenFileStream(h, ctx));

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));

  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
  assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 6);
  ctx.setNextOffsetForTest((long)10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
  assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));

  // Test request with sequential writes
  status = ctx.checkCommitInternal(9, ch, 1, attr, true);
  assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
  ret = ctx.checkCommit(dfsClient, 9, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 9));

  // Test request with non-sequential writes
  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  assertTrue(commits.size() == 0);
  ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret);
  assertEquals(0, commits.size()); // commit triggered by read doesn't wait
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 16));

  // Test request with zero commit offset
  // There is one pending write [10,15]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(0, commits.size());
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
}
项目:big-c    文件:TestOpenFileCtxCache.java   
@Test
public void testEviction() throws IOException, InterruptedException {
  NfsConfiguration conf = new NfsConfiguration();

  // Only two entries will be in the cache
  conf.setInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, 2);

  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context5 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));

  OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);

  boolean ret = cache.put(new FileHandle(1), context1);
  assertTrue(ret);
  Thread.sleep(1000);
  ret = cache.put(new FileHandle(2), context2);
  assertTrue(ret);
  ret = cache.put(new FileHandle(3), context3);
  assertFalse(ret);
  assertTrue(cache.size() == 2);

  // Wait for the oldest stream to be evict-able, insert again
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  assertTrue(cache.size() == 2);

  ret = cache.put(new FileHandle(3), context3);
  assertTrue(ret);
  assertTrue(cache.size() == 2);
  assertTrue(cache.get(new FileHandle(1)) == null);

  // Test inactive entry is evicted immediately
  context3.setActiveStatusForTest(false);
  ret = cache.put(new FileHandle(4), context4);
  assertTrue(ret);

  // Now the cache has context2 and context4
  // Test eviction failure if all entries have pending work.
  context2.getPendingWritesForTest().put(new OffsetRange(0, 100),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  context4.getPendingCommitsForTest().put(new Long(100),
      new CommitCtx(0, null, 0, attr));
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  ret = cache.put(new FileHandle(5), context5);
  assertFalse(ret);
}
项目:big-c    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
// COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
public void testCheckCommit() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  NfsConfiguration conf = new NfsConfiguration();
  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(conf), false, conf);

  COMMIT_STATUS ret;

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX);

  ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE);

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 10);
  ctx.setNextOffsetForTest(10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);

  status = ctx.checkCommitInternal(10, ch, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);

  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  Assert.assertTrue(commits.size() == 0);
  ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT);
  Assert.assertTrue(commits.size() == 1);
  long key = commits.firstKey();
  Assert.assertTrue(key == 11);

  // Test request with zero commit offset
  commits.remove(new Long(11));
  // There is one pending write [5,10]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT);
  Assert.assertTrue(commits.size() == 1);
  key = commits.firstKey();
  Assert.assertTrue(key == 9);

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
}
项目:big-c    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with
// large file upload option.
public void testCheckCommitLargeFileUpload() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  NfsConfiguration conf = new NfsConfiguration();
  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(conf), false, conf);

  COMMIT_STATUS ret;

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX);

  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE);

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 8);
  ctx.setNextOffsetForTest(10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);

  // Test commit sequential writes
  status = ctx.checkCommitInternal(10, ch, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);

  // Test commit non-sequential writes
  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  Assert.assertTrue(commits.size() == 1);
  ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS);
  Assert.assertTrue(commits.size() == 1);

  // Test request with zero commit offset
  commits.remove(new Long(10));
  // There is one pending write [10,15]
  ret = ctx.checkCommitInternal(0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);

  ret = ctx.checkCommitInternal(9, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
  Assert.assertTrue(commits.size() == 2);

  // Empty pending writes. nextOffset=10, flushed pos=8
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);

  // Empty pending writes
  ctx.setNextOffsetForTest((long) 8); // flushed pos = 8
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);

}
项目:big-c    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
// COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
public void testCheckCommitFromRead() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);
  NfsConfiguration config = new NfsConfiguration();

  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(config), false, config);

  FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
  COMMIT_STATUS ret;
  WriteManager wm = new WriteManager(new ShellBasedIdMapping(config), config, false);
  assertTrue(wm.addOpenFileStream(h, ctx));

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));

  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
  assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 10);
  ctx.setNextOffsetForTest((long)10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
  assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));

  status = ctx.checkCommitInternal(10, ch, 1, attr, true);
  assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10));

  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  assertTrue(commits.size() == 0);
  ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  assertEquals(0, commits.size()); // commit triggered by read doesn't wait
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 11));

  // Test request with zero commit offset
  // There is one pending write [5,10]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  assertEquals(0, commits.size());
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
}
项目:big-c    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with large file upload option
public void testCheckCommitFromReadLargeFileUpload() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);
  NfsConfiguration config = new NfsConfiguration();

  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(config), false, config);

  FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
  COMMIT_STATUS ret;
  WriteManager wm = new WriteManager(new ShellBasedIdMapping(config), config, false);
  assertTrue(wm.addOpenFileStream(h, ctx));

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));

  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
  assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 6);
  ctx.setNextOffsetForTest((long)10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
  assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));

  // Test request with sequential writes
  status = ctx.checkCommitInternal(9, ch, 1, attr, true);
  assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
  ret = ctx.checkCommit(dfsClient, 9, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 9));

  // Test request with non-sequential writes
  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  assertTrue(commits.size() == 0);
  ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret);
  assertEquals(0, commits.size()); // commit triggered by read doesn't wait
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 16));

  // Test request with zero commit offset
  // There is one pending write [10,15]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(0, commits.size());
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestOpenFileCtxCache.java   
@Test
public void testEviction() throws IOException, InterruptedException {
  NfsConfiguration conf = new NfsConfiguration();

  // Only two entries will be in the cache
  conf.setInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, 2);

  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context5 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));

  OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);

  boolean ret = cache.put(new FileHandle(1), context1);
  assertTrue(ret);
  Thread.sleep(1000);
  ret = cache.put(new FileHandle(2), context2);
  assertTrue(ret);
  ret = cache.put(new FileHandle(3), context3);
  assertFalse(ret);
  assertTrue(cache.size() == 2);

  // Wait for the oldest stream to be evict-able, insert again
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  assertTrue(cache.size() == 2);

  ret = cache.put(new FileHandle(3), context3);
  assertTrue(ret);
  assertTrue(cache.size() == 2);
  assertTrue(cache.get(new FileHandle(1)) == null);

  // Test inactive entry is evicted immediately
  context3.setActiveStatusForTest(false);
  ret = cache.put(new FileHandle(4), context4);
  assertTrue(ret);

  // Now the cache has context2 and context4
  // Test eviction failure if all entries have pending work.
  context2.getPendingWritesForTest().put(new OffsetRange(0, 100),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  context4.getPendingCommitsForTest().put(new Long(100),
      new CommitCtx(0, null, 0, attr));
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  ret = cache.put(new FileHandle(5), context5);
  assertFalse(ret);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
// COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
public void testCheckCommit() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  NfsConfiguration conf = new NfsConfiguration();
  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(conf), false, conf);

  COMMIT_STATUS ret;

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX);

  ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE);

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 10);
  ctx.setNextOffsetForTest(10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);

  status = ctx.checkCommitInternal(10, ch, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);

  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  Assert.assertTrue(commits.size() == 0);
  ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT);
  Assert.assertTrue(commits.size() == 1);
  long key = commits.firstKey();
  Assert.assertTrue(key == 11);

  // Test request with zero commit offset
  commits.remove(new Long(11));
  // There is one pending write [5,10]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT);
  Assert.assertTrue(commits.size() == 1);
  key = commits.firstKey();
  Assert.assertTrue(key == 9);

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with
// large file upload option.
public void testCheckCommitLargeFileUpload() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  NfsConfiguration conf = new NfsConfiguration();
  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(conf), false, conf);

  COMMIT_STATUS ret;

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX);

  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE);

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 8);
  ctx.setNextOffsetForTest(10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);

  // Test commit sequential writes
  status = ctx.checkCommitInternal(10, ch, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);

  // Test commit non-sequential writes
  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  Assert.assertTrue(commits.size() == 1);
  ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS);
  Assert.assertTrue(commits.size() == 1);

  // Test request with zero commit offset
  commits.remove(new Long(10));
  // There is one pending write [10,15]
  ret = ctx.checkCommitInternal(0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);

  ret = ctx.checkCommitInternal(9, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
  Assert.assertTrue(commits.size() == 2);

  // Empty pending writes. nextOffset=10, flushed pos=8
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);

  // Empty pending writes
  ctx.setNextOffsetForTest((long) 8); // flushed pos = 8
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);

}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
// COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
public void testCheckCommitFromRead() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);
  NfsConfiguration config = new NfsConfiguration();

  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(config), false, config);

  FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
  COMMIT_STATUS ret;
  WriteManager wm = new WriteManager(new ShellBasedIdMapping(config), config, false);
  assertTrue(wm.addOpenFileStream(h, ctx));

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));

  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
  assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 10);
  ctx.setNextOffsetForTest((long)10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
  assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));

  status = ctx.checkCommitInternal(10, ch, 1, attr, true);
  assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10));

  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  assertTrue(commits.size() == 0);
  ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  assertEquals(0, commits.size()); // commit triggered by read doesn't wait
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 11));

  // Test request with zero commit offset
  // There is one pending write [5,10]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  assertEquals(0, commits.size());
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with large file upload option
public void testCheckCommitFromReadLargeFileUpload() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);
  NfsConfiguration config = new NfsConfiguration();

  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(config), false, config);

  FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
  COMMIT_STATUS ret;
  WriteManager wm = new WriteManager(new ShellBasedIdMapping(config), config, false);
  assertTrue(wm.addOpenFileStream(h, ctx));

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));

  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
  assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 6);
  ctx.setNextOffsetForTest((long)10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
  assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));

  // Test request with sequential writes
  status = ctx.checkCommitInternal(9, ch, 1, attr, true);
  assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
  ret = ctx.checkCommit(dfsClient, 9, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 9));

  // Test request with non-sequential writes
  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  assertTrue(commits.size() == 0);
  ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret);
  assertEquals(0, commits.size()); // commit triggered by read doesn't wait
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 16));

  // Test request with zero commit offset
  // There is one pending write [10,15]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(0, commits.size());
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
}
项目:hops    文件:TestOpenFileCtxCache.java   
@Test
public void testEviction() throws IOException, InterruptedException {
  Configuration conf = new Configuration();

  // Only two entries will be in the cache
  conf.setInt(Nfs3Constant.MAX_OPEN_FILES, 2);

  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  OpenFileCtx context1 =
      new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
          new IdUserGroup());
  OpenFileCtx context2 =
      new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
          new IdUserGroup());
  OpenFileCtx context3 =
      new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
          new IdUserGroup());
  OpenFileCtx context4 =
      new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
          new IdUserGroup());
  OpenFileCtx context5 =
      new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
          new IdUserGroup());

  OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);

  boolean ret = cache.put(new FileHandle(1), context1);
  assertTrue(ret);
  Thread.sleep(1000);
  ret = cache.put(new FileHandle(2), context2);
  assertTrue(ret);
  ret = cache.put(new FileHandle(3), context3);
  assertFalse(ret);
  assertTrue(cache.size() == 2);

  // Wait for the oldest stream to be evict-able, insert again
  Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
  assertTrue(cache.size() == 2);

  ret = cache.put(new FileHandle(3), context3);
  assertTrue(ret);
  assertTrue(cache.size() == 2);
  assertTrue(cache.get(new FileHandle(1)) == null);

  // Test inactive entry is evicted immediately
  context3.setActiveStatusForTest(false);
  ret = cache.put(new FileHandle(4), context4);
  assertTrue(ret);

  // Now the cache has context2 and context4
  // Test eviction failure if all entries have pending work.
  context2.getPendingWritesForTest().put(new OffsetRange(0, 100),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  context4.getPendingCommitsForTest()
      .put(new Long(100), new CommitCtx(0, null, 0, attr));
  Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
  ret = cache.put(new FileHandle(5), context5);
  assertFalse(ret);
}
项目:hops    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
// COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
public void testCheckCommit() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new IdUserGroup());

  COMMIT_STATUS ret;

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX);

  ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE);

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);

  status = ctx.checkCommitInternal(10, ch, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);

  ConcurrentNavigableMap<Long, CommitCtx> commits =
      ctx.getPendingCommitsForTest();
  Assert.assertTrue(commits.size() == 0);
  ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT);
  Assert.assertTrue(commits.size() == 1);
  long key = commits.firstKey();
  Assert.assertTrue(key == 11);

  // Test request with zero commit offset
  commits.remove(new Long(11));
  // There is one pending write [5,10]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT);
  Assert.assertTrue(commits.size() == 1);
  key = commits.firstKey();
  Assert.assertTrue(key == 9);

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
}
项目:hops    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
// COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
public void testCheckCommitFromRead() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new IdUserGroup());

  FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
  COMMIT_STATUS ret;
  WriteManager wm = new WriteManager(new IdUserGroup(), new Configuration());
  assertTrue(wm.addOpenFileStream(h, ctx));

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));

  ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
  assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
  assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));

  status = ctx.checkCommitInternal(10, ch, 1, attr, true);
  assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10));

  ConcurrentNavigableMap<Long, CommitCtx> commits =
      ctx.getPendingCommitsForTest();
  assertTrue(commits.size() == 0);
  ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  assertEquals(0, commits.size()); // commit triggered by read doesn't wait
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX,
      wm.commitBeforeRead(dfsClient, h, 11));

  // Test request with zero commit offset
  // There is one pending write [5,10]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  assertEquals(0, commits.size());
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX,
      wm.commitBeforeRead(dfsClient, h, 0));

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
}
项目:hadoop-on-lustre2    文件:TestOpenFileCtxCache.java   
@Test
public void testEviction() throws IOException, InterruptedException {
  Configuration conf = new Configuration();

  // Only two entries will be in the cache
  conf.setInt(Nfs3Constant.MAX_OPEN_FILES, 2);

  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new IdUserGroup());
  OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new IdUserGroup());
  OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new IdUserGroup());
  OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new IdUserGroup());
  OpenFileCtx context5 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new IdUserGroup());

  OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);

  boolean ret = cache.put(new FileHandle(1), context1);
  assertTrue(ret);
  Thread.sleep(1000);
  ret = cache.put(new FileHandle(2), context2);
  assertTrue(ret);
  ret = cache.put(new FileHandle(3), context3);
  assertFalse(ret);
  assertTrue(cache.size() == 2);

  // Wait for the oldest stream to be evict-able, insert again
  Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
  assertTrue(cache.size() == 2);

  ret = cache.put(new FileHandle(3), context3);
  assertTrue(ret);
  assertTrue(cache.size() == 2);
  assertTrue(cache.get(new FileHandle(1)) == null);

  // Test inactive entry is evicted immediately
  context3.setActiveStatusForTest(false);
  ret = cache.put(new FileHandle(4), context4);
  assertTrue(ret);

  // Now the cache has context2 and context4
  // Test eviction failure if all entries have pending work.
  context2.getPendingWritesForTest().put(new OffsetRange(0, 100),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  context4.getPendingCommitsForTest().put(new Long(100),
      new CommitCtx(0, null, 0, attr));
  Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
  ret = cache.put(new FileHandle(5), context5);
  assertFalse(ret);
}
项目:hadoop-on-lustre2    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
// COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
public void testCheckCommit() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new IdUserGroup());

  COMMIT_STATUS ret;

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX);

  ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE);

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);

  status = ctx.checkCommitInternal(10, ch, 1, attr, false);
  Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);

  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  Assert.assertTrue(commits.size() == 0);
  ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT);
  Assert.assertTrue(commits.size() == 1);
  long key = commits.firstKey();
  Assert.assertTrue(key == 11);

  // Test request with zero commit offset
  commits.remove(new Long(11));
  // There is one pending write [5,10]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT);
  Assert.assertTrue(commits.size() == 1);
  key = commits.firstKey();
  Assert.assertTrue(key == 9);

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
}
项目:hadoop-on-lustre2    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
// COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
public void testCheckCommitFromRead() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new IdUserGroup());

  FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
  COMMIT_STATUS ret;
  WriteManager wm = new WriteManager(new IdUserGroup(), new Configuration());
  assertTrue(wm.addOpenFileStream(h, ctx));

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));

  ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
  assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
  assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));

  status = ctx.checkCommitInternal(10, ch, 1, attr, true);
  assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10));

  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  assertTrue(commits.size() == 0);
  ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  assertEquals(0, commits.size()); // commit triggered by read doesn't wait
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 11));

  // Test request with zero commit offset
  // There is one pending write [5,10]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  assertEquals(0, commits.size());
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
}