Java 类org.apache.hadoop.security.ShellBasedIdMapping 实例源码

项目:hadoop    文件:TestWrites.java   
@Test
public void testCheckCommitAixCompatMode() throws IOException {
  // Only getPos() matters on the mocked stream; the client is never used.
  DFSClient client = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attrs = new Nfs3FileAttributes();
  HdfsDataOutputStream out = Mockito.mock(HdfsDataOutputStream.class);

  NfsConfiguration config = new NfsConfiguration();
  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  // The boolean 'true' argument enables AIX compatibility mode.
  OpenFileCtx openCtx = new OpenFileCtx(out, attrs, "/dumpFilePath", client,
      new ShellBasedIdMapping(new NfsConfiguration()), true, config);

  // Commit offset (5) beyond the flushed position (2): in AIX mode this
  // falls through to the pendingWrites check and reports FINISHED.
  Mockito.when(out.getPos()).thenReturn(2L);
  COMMIT_STATUS first = openCtx.checkCommitInternal(5, null, 1, attrs, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, first);

  // With more bytes received (pos 10) than the commit asks for (5),
  // the commit must trigger a real sync.
  openCtx.getPendingWritesForTest().put(new OffsetRange(0, 10),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  Mockito.when(out.getPos()).thenReturn(10L);
  openCtx.setNextOffsetForTest(10L);
  COMMIT_STATUS second = openCtx.checkCommitInternal(5, null, 1, attrs, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, second);
}
项目:hadoop    文件:TestWrites.java   
@Test
public void testCheckSequential() throws IOException {
  // Mocked stream sits at position 0; only pending write ranges matter.
  DFSClient client = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attrs = new Nfs3FileAttributes();
  HdfsDataOutputStream out = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(out.getPos()).thenReturn(0L);
  NfsConfiguration conf = new NfsConfiguration();

  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  OpenFileCtx openCtx = new OpenFileCtx(out, attrs, "/dumpFilePath", client,
      new ShellBasedIdMapping(conf), false, conf);

  // Pending writes: [5,10) and [10,15) form a contiguous run; [20,25)
  // is separated by a gap.
  long[][] ranges = { { 5, 10 }, { 10, 15 }, { 20, 25 } };
  for (long[] r : ranges) {
    openCtx.getPendingWritesForTest().put(new OffsetRange(r[0], r[1]),
        new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  }

  assertTrue(!openCtx.checkSequential(5, 4));   // starts before any write
  assertTrue(openCtx.checkSequential(9, 5));    // inside the [5,15) run
  assertTrue(openCtx.checkSequential(10, 5));   // crosses into [10,15)
  assertTrue(openCtx.checkSequential(14, 5));   // end of the contiguous run
  assertTrue(!openCtx.checkSequential(15, 5));  // just past the run
  assertTrue(!openCtx.checkSequential(20, 5));  // detached range [20,25)
  assertTrue(!openCtx.checkSequential(25, 5));
  assertTrue(!openCtx.checkSequential(999, 5)); // far beyond any write
}
项目:aliyun-oss-hadoop-fs    文件:TestWrites.java   
@Test
public void testCheckCommitAixCompatMode() throws IOException {
  // Only getPos() matters on the mocked stream; the client is never used.
  DFSClient client = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attrs = new Nfs3FileAttributes();
  HdfsDataOutputStream out = Mockito.mock(HdfsDataOutputStream.class);

  NfsConfiguration config = new NfsConfiguration();
  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  // The boolean 'true' argument enables AIX compatibility mode.
  OpenFileCtx openCtx = new OpenFileCtx(out, attrs, "/dumpFilePath", client,
      new ShellBasedIdMapping(new NfsConfiguration()), true, config);

  // Commit offset (5) beyond the flushed position (2): in AIX mode this
  // falls through to the pendingWrites check and reports FINISHED.
  Mockito.when(out.getPos()).thenReturn(2L);
  COMMIT_STATUS first = openCtx.checkCommitInternal(5, null, 1, attrs, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, first);

  // With more bytes received (pos 10) than the commit asks for (5),
  // the commit must trigger a real sync.
  openCtx.getPendingWritesForTest().put(new OffsetRange(0, 10),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  Mockito.when(out.getPos()).thenReturn(10L);
  openCtx.setNextOffsetForTest(10L);
  COMMIT_STATUS second = openCtx.checkCommitInternal(5, null, 1, attrs, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, second);
}
项目:aliyun-oss-hadoop-fs    文件:TestWrites.java   
@Test
public void testCheckSequential() throws IOException {
  // Mocked stream sits at position 0; only pending write ranges matter.
  DFSClient client = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attrs = new Nfs3FileAttributes();
  HdfsDataOutputStream out = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(out.getPos()).thenReturn(0L);
  NfsConfiguration conf = new NfsConfiguration();

  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  OpenFileCtx openCtx = new OpenFileCtx(out, attrs, "/dumpFilePath", client,
      new ShellBasedIdMapping(conf), false, conf);

  // Pending writes: [5,10) and [10,15) form a contiguous run; [20,25)
  // is separated by a gap.
  long[][] ranges = { { 5, 10 }, { 10, 15 }, { 20, 25 } };
  for (long[] r : ranges) {
    openCtx.getPendingWritesForTest().put(new OffsetRange(r[0], r[1]),
        new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  }

  assertTrue(!openCtx.checkSequential(5, 4));   // starts before any write
  assertTrue(openCtx.checkSequential(9, 5));    // inside the [5,15) run
  assertTrue(openCtx.checkSequential(10, 5));   // crosses into [10,15)
  assertTrue(openCtx.checkSequential(14, 5));   // end of the contiguous run
  assertTrue(!openCtx.checkSequential(15, 5));  // just past the run
  assertTrue(!openCtx.checkSequential(20, 5));  // detached range [20,25)
  assertTrue(!openCtx.checkSequential(25, 5));
  assertTrue(!openCtx.checkSequential(999, 5)); // far beyond any write
}
项目:big-c    文件:TestWrites.java   
@Test
public void testCheckCommitAixCompatMode() throws IOException {
  // Only getPos() matters on the mocked stream; the client is never used.
  DFSClient client = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attrs = new Nfs3FileAttributes();
  HdfsDataOutputStream out = Mockito.mock(HdfsDataOutputStream.class);

  NfsConfiguration config = new NfsConfiguration();
  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  // The boolean 'true' argument enables AIX compatibility mode.
  OpenFileCtx openCtx = new OpenFileCtx(out, attrs, "/dumpFilePath", client,
      new ShellBasedIdMapping(new NfsConfiguration()), true, config);

  // Commit offset (5) beyond the flushed position (2): in AIX mode this
  // falls through to the pendingWrites check and reports FINISHED.
  Mockito.when(out.getPos()).thenReturn(2L);
  COMMIT_STATUS first = openCtx.checkCommitInternal(5, null, 1, attrs, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, first);

  // With more bytes received (pos 10) than the commit asks for (5),
  // the commit must trigger a real sync.
  openCtx.getPendingWritesForTest().put(new OffsetRange(0, 10),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  Mockito.when(out.getPos()).thenReturn(10L);
  openCtx.setNextOffsetForTest(10L);
  COMMIT_STATUS second = openCtx.checkCommitInternal(5, null, 1, attrs, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, second);
}
项目:big-c    文件:TestWrites.java   
@Test
public void testCheckSequential() throws IOException {
  // Mocked stream sits at position 0; only pending write ranges matter.
  DFSClient client = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attrs = new Nfs3FileAttributes();
  HdfsDataOutputStream out = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(out.getPos()).thenReturn(0L);
  NfsConfiguration conf = new NfsConfiguration();

  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  OpenFileCtx openCtx = new OpenFileCtx(out, attrs, "/dumpFilePath", client,
      new ShellBasedIdMapping(conf), false, conf);

  // Pending writes: [5,10) and [10,15) form a contiguous run; [20,25)
  // is separated by a gap.
  long[][] ranges = { { 5, 10 }, { 10, 15 }, { 20, 25 } };
  for (long[] r : ranges) {
    openCtx.getPendingWritesForTest().put(new OffsetRange(r[0], r[1]),
        new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  }

  assertTrue(!openCtx.checkSequential(5, 4));   // starts before any write
  assertTrue(openCtx.checkSequential(9, 5));    // inside the [5,15) run
  assertTrue(openCtx.checkSequential(10, 5));   // crosses into [10,15)
  assertTrue(openCtx.checkSequential(14, 5));   // end of the contiguous run
  assertTrue(!openCtx.checkSequential(15, 5));  // just past the run
  assertTrue(!openCtx.checkSequential(20, 5));  // detached range [20,25)
  assertTrue(!openCtx.checkSequential(25, 5));
  assertTrue(!openCtx.checkSequential(999, 5)); // far beyond any write
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestWrites.java   
@Test
public void testCheckCommitAixCompatMode() throws IOException {
  // Only getPos() matters on the mocked stream; the client is never used.
  DFSClient client = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attrs = new Nfs3FileAttributes();
  HdfsDataOutputStream out = Mockito.mock(HdfsDataOutputStream.class);

  NfsConfiguration config = new NfsConfiguration();
  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  // The boolean 'true' argument enables AIX compatibility mode.
  OpenFileCtx openCtx = new OpenFileCtx(out, attrs, "/dumpFilePath", client,
      new ShellBasedIdMapping(new NfsConfiguration()), true, config);

  // Commit offset (5) beyond the flushed position (2): in AIX mode this
  // falls through to the pendingWrites check and reports FINISHED.
  Mockito.when(out.getPos()).thenReturn(2L);
  COMMIT_STATUS first = openCtx.checkCommitInternal(5, null, 1, attrs, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, first);

  // With more bytes received (pos 10) than the commit asks for (5),
  // the commit must trigger a real sync.
  openCtx.getPendingWritesForTest().put(new OffsetRange(0, 10),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  Mockito.when(out.getPos()).thenReturn(10L);
  openCtx.setNextOffsetForTest(10L);
  COMMIT_STATUS second = openCtx.checkCommitInternal(5, null, 1, attrs, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, second);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestWrites.java   
@Test
public void testCheckSequential() throws IOException {
  // Mocked stream sits at position 0; only pending write ranges matter.
  DFSClient client = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attrs = new Nfs3FileAttributes();
  HdfsDataOutputStream out = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(out.getPos()).thenReturn(0L);
  NfsConfiguration conf = new NfsConfiguration();

  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  OpenFileCtx openCtx = new OpenFileCtx(out, attrs, "/dumpFilePath", client,
      new ShellBasedIdMapping(conf), false, conf);

  // Pending writes: [5,10) and [10,15) form a contiguous run; [20,25)
  // is separated by a gap.
  long[][] ranges = { { 5, 10 }, { 10, 15 }, { 20, 25 } };
  for (long[] r : ranges) {
    openCtx.getPendingWritesForTest().put(new OffsetRange(r[0], r[1]),
        new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  }

  assertTrue(!openCtx.checkSequential(5, 4));   // starts before any write
  assertTrue(openCtx.checkSequential(9, 5));    // inside the [5,15) run
  assertTrue(openCtx.checkSequential(10, 5));   // crosses into [10,15)
  assertTrue(openCtx.checkSequential(14, 5));   // end of the contiguous run
  assertTrue(!openCtx.checkSequential(15, 5));  // just past the run
  assertTrue(!openCtx.checkSequential(20, 5));  // detached range [20,25)
  assertTrue(!openCtx.checkSequential(25, 5));
  assertTrue(!openCtx.checkSequential(999, 5)); // far beyond any write
}
项目:hadoop    文件:RpcProgramNfs3.java   
/**
 * Builds the NFSv3 RPC program: registers the server port from config,
 * then wires up id mapping, exports, the write manager, the DFS client
 * cache, and the file-dump directory used for out-of-order writes.
 *
 * NOTE(review): this constructor mutates the caller's config (umask
 * override below) and process-global security state (UGI configuration,
 * Kerberos login) — callers should not reuse the config expecting it
 * unchanged.
 */
public RpcProgramNfs3(NfsConfiguration config, DatagramSocket registrationSocket,
    boolean allowInsecurePorts) throws IOException {
  // Register program "NFS3" on the configured port; lowest and highest
  // supported program versions are both Nfs3Constant.VERSION.
  super("NFS3", "localhost", config.getInt(
      NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY,
      NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT), Nfs3Constant.PROGRAM,
      Nfs3Constant.VERSION, Nfs3Constant.VERSION, registrationSocket,
      allowInsecurePorts);

  this.config = config;
  // Force umask to 000 so created files keep the mode the client asks for.
  config.set(FsPermission.UMASK_LABEL, "000");
  // uid/gid <-> name mapping backed by shell commands.
  iug = new ShellBasedIdMapping(config);

  aixCompatMode = config.getBoolean(
      NfsConfigKeys.AIX_COMPAT_MODE_KEY,
      NfsConfigKeys.AIX_COMPAT_MODE_DEFAULT);
  exports = NfsExports.getInstance(config);
  writeManager = new WriteManager(iug, config, aixCompatMode);
  clientCache = new DFSClientCache(config);
  replication = (short) config.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
      DFSConfigKeys.DFS_REPLICATION_DEFAULT);
  blockSize = config.getLongBytes(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
      DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
  bufferSize = config.getInt(
      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);

  // Directory for dumping writes; nulled out below when dumping is off.
  writeDumpDir = config.get(NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_KEY,
      NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_DEFAULT);
  boolean enableDump = config.getBoolean(NfsConfigKeys.DFS_NFS_FILE_DUMP_KEY,
      NfsConfigKeys.DFS_NFS_FILE_DUMP_DEFAULT);
  // Process-wide side effects: install the config into UGI and perform
  // keytab login (no-op when security is disabled).
  UserGroupInformation.setConfiguration(config);
  SecurityUtil.login(config, NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY,
      NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY);
  superuser = config.get(NfsConfigKeys.NFS_SUPERUSER_KEY,
      NfsConfigKeys.NFS_SUPERUSER_DEFAULT);
  LOG.info("Configured HDFS superuser is " + superuser);

  if (!enableDump) {
    writeDumpDir = null;
  } else {
    // Start from a clean dump directory on every construction.
    clearDirectory(writeDumpDir);
  }

  // Reply cache ("NFS3", capacity 256) so retransmitted requests can be
  // answered without re-execution.
  rpcCallCache = new RpcCallCache("NFS3", 256);
  infoServer = new Nfs3HttpServer(config);
}
项目:hadoop    文件:TestOpenFileCtxCache.java   
@Test
public void testEviction() throws IOException, InterruptedException {
  NfsConfiguration conf = new NfsConfiguration();

  // Only two entries will be in the cache.
  conf.setInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, 2);

  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context5 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));

  // Stream timeout of 10 * 60 * 100 ms = 60 seconds.
  OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);

  boolean ret = cache.put(new FileHandle(1), context1);
  assertTrue(ret);
  Thread.sleep(1000);
  ret = cache.put(new FileHandle(2), context2);
  assertTrue(ret);
  // Cache is full (capacity 2) and nothing is evict-able yet.
  ret = cache.put(new FileHandle(3), context3);
  assertFalse(ret);
  assertTrue(cache.size() == 2);

  // Wait for the oldest stream to be evict-able, insert again
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  assertTrue(cache.size() == 2);

  // The oldest entry (handle 1) should be evicted to make room.
  ret = cache.put(new FileHandle(3), context3);
  assertTrue(ret);
  assertTrue(cache.size() == 2);
  assertTrue(cache.get(new FileHandle(1)) == null);

  // Test inactive entry is evicted immediately
  context3.setActiveStatusForTest(false);
  ret = cache.put(new FileHandle(4), context4);
  assertTrue(ret);

  // Now the cache has context2 and context4
  // Test eviction failure if all entries have pending work.
  context2.getPendingWritesForTest().put(new OffsetRange(0, 100),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  // Long.valueOf replaces the deprecated new Long(...) boxing constructor.
  context4.getPendingCommitsForTest().put(Long.valueOf(100),
      new CommitCtx(0, null, 0, attr));
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  ret = cache.put(new FileHandle(5), context5);
  assertFalse(ret);
}
项目:hadoop    文件:TestOpenFileCtxCache.java   
@Test
public void testScan() throws IOException, InterruptedException {
  NfsConfiguration config = new NfsConfiguration();

  // Cap the cache at two open-file entries.
  config.setInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, 2);

  DFSClient client = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attrs = new Nfs3FileAttributes();
  HdfsDataOutputStream out = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(out.getPos()).thenReturn(0L);

  OpenFileCtx context1 = new OpenFileCtx(out, attrs, "/dumpFilePath",
      client, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context2 = new OpenFileCtx(out, attrs, "/dumpFilePath",
      client, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context3 = new OpenFileCtx(out, attrs, "/dumpFilePath",
      client, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context4 = new OpenFileCtx(out, attrs, "/dumpFilePath",
      client, new ShellBasedIdMapping(new NfsConfiguration()));

  OpenFileCtxCache cache = new OpenFileCtxCache(config, 10 * 60 * 100);

  // Entries older than the scan timeout should be removed by scan().
  boolean accepted = cache.put(new FileHandle(1), context1);
  assertTrue(accepted);
  accepted = cache.put(new FileHandle(2), context2);
  assertTrue(accepted);
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT + 1);
  cache.scan(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  assertTrue(cache.size() == 0);

  // Inactive entries should be removed by scan() regardless of age.
  accepted = cache.put(new FileHandle(3), context3);
  assertTrue(accepted);
  accepted = cache.put(new FileHandle(4), context4);
  assertTrue(accepted);
  context3.setActiveStatusForTest(false);
  cache.scan(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_DEFAULT);
  assertTrue(cache.size() == 1);
  assertTrue(cache.get(new FileHandle(3)) == null);
  assertTrue(cache.get(new FileHandle(4)) != null);
}
项目:hadoop    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
// COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
public void testCheckCommit() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  NfsConfiguration conf = new NfsConfiguration();
  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(conf), false, conf);

  COMMIT_STATUS ret;

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);

  ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 10);
  ctx.setNextOffsetForTest(10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);

  status = ctx.checkCommitInternal(10, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);

  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  Assert.assertEquals(0, commits.size());
  // Committing past the flushed position queues the commit for later.
  ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  Assert.assertEquals(1, commits.size());
  long key = commits.firstKey();
  Assert.assertEquals(11, key);

  // Test request with zero commit offset
  // Long.valueOf replaces the deprecated new Long(...) boxing constructor.
  commits.remove(Long.valueOf(11));
  // There is one pending write [5,10]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  Assert.assertEquals(1, commits.size());
  key = commits.firstKey();
  Assert.assertEquals(9, key);

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
}
项目:hadoop    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with
// large file upload option.
public void testCheckCommitLargeFileUpload() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  NfsConfiguration conf = new NfsConfiguration();
  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(conf), false, conf);

  COMMIT_STATUS ret;

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);

  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 8);
  ctx.setNextOffsetForTest(10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);

  // Test commit sequential writes
  status = ctx.checkCommitInternal(10, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, status);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);

  // Test commit non-sequential writes
  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  Assert.assertEquals(1, commits.size());
  ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret);
  Assert.assertEquals(1, commits.size());

  // Test request with zero commit offset
  // Long.valueOf replaces the deprecated new Long(...) boxing constructor.
  commits.remove(Long.valueOf(10));
  // There is one pending write [10,15]
  ret = ctx.checkCommitInternal(0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);

  ret = ctx.checkCommitInternal(9, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  Assert.assertEquals(2, commits.size());

  // Empty pending writes. nextOffset=10, flushed pos=8
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);

  // Empty pending writes
  ctx.setNextOffsetForTest((long) 8); // flushed pos = 8
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);

}
项目:hadoop    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
// COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
public void testCheckCommitFromRead() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);
  NfsConfiguration config = new NfsConfiguration();

  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(config), false, config);

  FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
  COMMIT_STATUS ret;
  WriteManager wm = new WriteManager(new ShellBasedIdMapping(config), config, false);
  assertTrue(wm.addOpenFileStream(h, ctx));

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));

  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
  assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 10);
  ctx.setNextOffsetForTest((long) 10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
  assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));

  // Normalized to assertEquals for consistent failure diagnostics.
  status = ctx.checkCommitInternal(10, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10));

  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  assertEquals(0, commits.size());
  ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  assertEquals(0, commits.size()); // commit triggered by read doesn't wait
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 11));

  // Test request with zero commit offset
  // There is one pending write [5,10]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  assertEquals(0, commits.size());
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
}
项目:hadoop    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with
// the large file upload option enabled.
public void testCheckCommitFromReadLargeFileUpload() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);
  NfsConfiguration config = new NfsConfiguration();

  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(config), false, config);

  FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
  COMMIT_STATUS ret;
  WriteManager wm = new WriteManager(new ShellBasedIdMapping(config), config, false);
  assertTrue(wm.addOpenFileStream(h, ctx));

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));

  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
  assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 6);
  ctx.setNextOffsetForTest((long) 10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
  assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));

  // Test request with sequential writes
  // Normalized to assertEquals for consistent failure diagnostics.
  status = ctx.checkCommitInternal(9, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, status);
  ret = ctx.checkCommit(dfsClient, 9, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 9));

  // Test request with non-sequential writes
  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  assertEquals(0, commits.size());
  ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret);
  assertEquals(0, commits.size()); // commit triggered by read doesn't wait
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 16));

  // Test request with zero commit offset
  // There is one pending write [10,15]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(0, commits.size());
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
}
项目:aliyun-oss-hadoop-fs    文件:RpcProgramNfs3.java   
/**
 * Builds the NFSv3 RPC program: registers the server port from config,
 * then wires up id mapping, exports, the write manager, the DFS client
 * cache, and the file-dump directory used for out-of-order writes.
 *
 * NOTE(review): this constructor mutates the caller's config (umask
 * override below) and process-global security state (UGI configuration,
 * Kerberos login) — callers should not reuse the config expecting it
 * unchanged.
 */
public RpcProgramNfs3(NfsConfiguration config, DatagramSocket registrationSocket,
    boolean allowInsecurePorts) throws IOException {
  // Register program "NFS3" on the configured port; lowest and highest
  // supported program versions are both Nfs3Constant.VERSION.
  super("NFS3", "localhost", config.getInt(
      NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY,
      NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT), Nfs3Constant.PROGRAM,
      Nfs3Constant.VERSION, Nfs3Constant.VERSION, registrationSocket,
      allowInsecurePorts);

  this.config = config;
  // Force umask to 000 so created files keep the mode the client asks for.
  config.set(FsPermission.UMASK_LABEL, "000");
  // uid/gid <-> name mapping backed by shell commands.
  iug = new ShellBasedIdMapping(config);

  aixCompatMode = config.getBoolean(
      NfsConfigKeys.AIX_COMPAT_MODE_KEY,
      NfsConfigKeys.AIX_COMPAT_MODE_DEFAULT);
  exports = NfsExports.getInstance(config);
  writeManager = new WriteManager(iug, config, aixCompatMode);
  clientCache = new DFSClientCache(config);
  replication = (short) config.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
      DFSConfigKeys.DFS_REPLICATION_DEFAULT);
  blockSize = config.getLongBytes(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
      DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
  bufferSize = config.getInt(
      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);

  // Directory for dumping writes; nulled out below when dumping is off.
  writeDumpDir = config.get(NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_KEY,
      NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_DEFAULT);
  boolean enableDump = config.getBoolean(NfsConfigKeys.DFS_NFS_FILE_DUMP_KEY,
      NfsConfigKeys.DFS_NFS_FILE_DUMP_DEFAULT);
  // Process-wide side effects: install the config into UGI and perform
  // keytab login (no-op when security is disabled).
  UserGroupInformation.setConfiguration(config);
  SecurityUtil.login(config, NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY,
      NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY);
  superuser = config.get(NfsConfigKeys.NFS_SUPERUSER_KEY,
      NfsConfigKeys.NFS_SUPERUSER_DEFAULT);
  LOG.info("Configured HDFS superuser is " + superuser);

  if (!enableDump) {
    writeDumpDir = null;
  } else {
    // Start from a clean dump directory on every construction.
    clearDirectory(writeDumpDir);
  }

  // Reply cache ("NFS3", capacity 256) so retransmitted requests can be
  // answered without re-execution.
  rpcCallCache = new RpcCallCache("NFS3", 256);
  infoServer = new Nfs3HttpServer(config);
}
项目:aliyun-oss-hadoop-fs    文件:TestOpenFileCtxCache.java   
@Test
// Verify OpenFileCtxCache eviction: a full cache rejects inserts until an
// entry expires or goes inactive, and entries with pending work are never
// evicted.
public void testEviction() throws IOException, InterruptedException {
  NfsConfiguration conf = new NfsConfiguration();

  // Only two entries will be in the cache
  conf.setInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, 2);

  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context5 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));

  OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);

  boolean ret = cache.put(new FileHandle(1), context1);
  assertTrue(ret);
  // Make sure context1 is strictly older than context2 before filling up.
  Thread.sleep(1000);
  ret = cache.put(new FileHandle(2), context2);
  assertTrue(ret);
  // Cache is full and nothing is evict-able yet, so this insert is rejected.
  ret = cache.put(new FileHandle(3), context3);
  assertFalse(ret);
  assertTrue(cache.size() == 2);

  // Wait for the oldest stream to be evict-able, insert again
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  assertTrue(cache.size() == 2);

  ret = cache.put(new FileHandle(3), context3);
  assertTrue(ret);
  assertTrue(cache.size() == 2);
  assertTrue(cache.get(new FileHandle(1)) == null);

  // Test inactive entry is evicted immediately
  context3.setActiveStatusForTest(false);
  ret = cache.put(new FileHandle(4), context4);
  assertTrue(ret);

  // Now the cache has context2 and context4
  // Test eviction failure if all entries have pending work.
  context2.getPendingWritesForTest().put(new OffsetRange(0, 100),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  // Long.valueOf avoids the deprecated new Long(long) boxing constructor.
  context4.getPendingCommitsForTest().put(Long.valueOf(100),
      new CommitCtx(0, null, 0, attr));
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  ret = cache.put(new FileHandle(5), context5);
  assertFalse(ret);
}
项目:aliyun-oss-hadoop-fs    文件:TestOpenFileCtxCache.java   
@Test
// Verify OpenFileCtxCache.scan(): it removes entries whose streams have
// expired, and removes inactive entries even before they expire.
public void testScan() throws IOException, InterruptedException {
  NfsConfiguration cacheConf = new NfsConfiguration();

  // Restrict the cache to two entries.
  cacheConf.setInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, 2);

  DFSClient client = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attrs = new Nfs3FileAttributes();
  HdfsDataOutputStream out = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(out.getPos()).thenReturn((long) 0);

  OpenFileCtx first = new OpenFileCtx(out, attrs, "/dumpFilePath",
      client, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx second = new OpenFileCtx(out, attrs, "/dumpFilePath",
      client, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx third = new OpenFileCtx(out, attrs, "/dumpFilePath",
      client, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx fourth = new OpenFileCtx(out, attrs, "/dumpFilePath",
      client, new ShellBasedIdMapping(new NfsConfiguration()));

  OpenFileCtxCache cache = new OpenFileCtxCache(cacheConf, 10 * 60 * 100);

  // Entries older than the scan timeout must be cleaned out.
  boolean accepted = cache.put(new FileHandle(1), first);
  assertTrue(accepted);
  accepted = cache.put(new FileHandle(2), second);
  assertTrue(accepted);
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT + 1);
  cache.scan(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  assertTrue(cache.size() == 0);

  // An inactive entry is cleaned even though it has not expired yet.
  accepted = cache.put(new FileHandle(3), third);
  assertTrue(accepted);
  accepted = cache.put(new FileHandle(4), fourth);
  assertTrue(accepted);
  third.setActiveStatusForTest(false);
  cache.scan(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_DEFAULT);
  assertTrue(cache.size() == 1);
  assertTrue(cache.get(new FileHandle(3)) == null);
  assertTrue(cache.get(new FileHandle(4)) != null);
}
项目:aliyun-oss-hadoop-fs    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
// COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
public void testCheckCommit() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  NfsConfiguration conf = new NfsConfiguration();
  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(conf), false, conf);

  COMMIT_STATUS ret;

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  // assertEquals gives the actual status on failure, unlike assertTrue(==).
  Assert.assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);

  ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 10);
  ctx.setNextOffsetForTest(10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);

  status = ctx.checkCommitInternal(10, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);

  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  Assert.assertEquals(0, commits.size());
  ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  Assert.assertEquals(1, commits.size());
  long key = commits.firstKey();
  Assert.assertEquals(11, key);

  // Test request with zero commit offset
  // Long.valueOf avoids the deprecated new Long(long) boxing constructor.
  commits.remove(Long.valueOf(11));
  // There is one pending write [5,10]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  Assert.assertEquals(1, commits.size());
  key = commits.firstKey();
  Assert.assertEquals(9, key);

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
}
项目:aliyun-oss-hadoop-fs    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with
// large file upload option.
public void testCheckCommitLargeFileUpload() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  NfsConfiguration conf = new NfsConfiguration();
  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(conf), false, conf);

  COMMIT_STATUS ret;

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  // assertEquals gives the actual status on failure, unlike assertTrue(==).
  Assert.assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);

  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 8);
  ctx.setNextOffsetForTest(10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);

  // Test commit sequential writes
  status = ctx.checkCommitInternal(10, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, status);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);

  // Test commit non-sequential writes
  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  Assert.assertEquals(1, commits.size());
  ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret);
  Assert.assertEquals(1, commits.size());

  // Test request with zero commit offset
  // Long.valueOf avoids the deprecated new Long(long) boxing constructor.
  commits.remove(Long.valueOf(10));
  // There is one pending write [10,15]
  ret = ctx.checkCommitInternal(0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);

  ret = ctx.checkCommitInternal(9, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  Assert.assertEquals(2, commits.size());

  // Empty pending writes. nextOffset=10, flushed pos=8
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);

  // Empty pending writes
  ctx.setNextOffsetForTest((long) 8); // flushed pos = 8
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
}
项目:aliyun-oss-hadoop-fs    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
// COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
public void testCheckCommitFromRead() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);
  NfsConfiguration config = new NfsConfiguration();

  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(config), false, config);

  FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
  COMMIT_STATUS ret;
  WriteManager wm = new WriteManager(new ShellBasedIdMapping(config), config, false);
  assertTrue(wm.addOpenFileStream(h, ctx));

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));

  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
  assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 10);
  ctx.setNextOffsetForTest((long)10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
  assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));

  status = ctx.checkCommitInternal(10, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10));

  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  assertEquals(0, commits.size());
  ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  assertEquals(0, commits.size()); // commit triggered by read doesn't wait
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 11));

  // Test request with zero commit offset
  // There is one pending write [10,15] (fixed comment: was wrongly [5,10])
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  assertEquals(0, commits.size());
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
}
项目:aliyun-oss-hadoop-fs    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with large file upload option
public void testCheckCommitFromReadLargeFileUpload() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);
  NfsConfiguration config = new NfsConfiguration();

  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(config), false, config);

  FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
  COMMIT_STATUS ret;
  WriteManager wm = new WriteManager(new ShellBasedIdMapping(config), config, false);
  assertTrue(wm.addOpenFileStream(h, ctx));

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));

  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
  assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 6);
  ctx.setNextOffsetForTest((long)10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
  assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));

  // Test request with sequential writes
  status = ctx.checkCommitInternal(9, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, status);
  ret = ctx.checkCommit(dfsClient, 9, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 9));

  // Test request with non-sequential writes
  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  assertEquals(0, commits.size());
  ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret);
  assertEquals(0, commits.size()); // commit triggered by read doesn't wait
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 16));

  // Test request with zero commit offset
  // There is one pending write [10,15]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(0, commits.size());
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
}
项目:big-c    文件:RpcProgramNfs3.java   
/**
 * Builds the NFS3 RPC program: registers the program/port with the
 * superclass, then wires up id mapping, exports, the write manager, the
 * DFS client cache, and the optional file-dump directory from {@code config}.
 *
 * NOTE: this constructor mutates the passed-in {@code config}
 * (sets the umask label), so the caller's configuration object is changed.
 */
public RpcProgramNfs3(NfsConfiguration config, DatagramSocket registrationSocket,
    boolean allowInsecurePorts) throws IOException {
  // Register as program "NFS3" on the configured (or default) server port;
  // the superclass handles RPC registration via registrationSocket.
  super("NFS3", "localhost", config.getInt(
      NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY,
      NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT), Nfs3Constant.PROGRAM,
      Nfs3Constant.VERSION, Nfs3Constant.VERSION, registrationSocket,
      allowInsecurePorts);

  this.config = config;
  // Force umask 000 — presumably so created files carry exactly the mode
  // the NFS client requested; TODO confirm against FsPermission usage.
  config.set(FsPermission.UMASK_LABEL, "000");
  // Maps numeric uid/gid to user/group names using shell commands.
  iug = new ShellBasedIdMapping(config);

  aixCompatMode = config.getBoolean(
      NfsConfigKeys.AIX_COMPAT_MODE_KEY,
      NfsConfigKeys.AIX_COMPAT_MODE_DEFAULT);
  exports = NfsExports.getInstance(config);
  writeManager = new WriteManager(iug, config, aixCompatMode);
  clientCache = new DFSClientCache(config);
  replication = (short) config.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
      DFSConfigKeys.DFS_REPLICATION_DEFAULT);
  blockSize = config.getLongBytes(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
      DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
  bufferSize = config.getInt(
      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);

  writeDumpDir = config.get(NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_KEY,
      NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_DEFAULT);
  boolean enableDump = config.getBoolean(NfsConfigKeys.DFS_NFS_FILE_DUMP_KEY,
      NfsConfigKeys.DFS_NFS_FILE_DUMP_DEFAULT);
  // Log in from the configured keytab/principal (NOTE(review): presumably a
  // no-op when Kerberos security is disabled — confirm in SecurityUtil).
  UserGroupInformation.setConfiguration(config);
  SecurityUtil.login(config, NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY,
      NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY);
  superuser = config.get(NfsConfigKeys.NFS_SUPERUSER_KEY,
      NfsConfigKeys.NFS_SUPERUSER_DEFAULT);
  LOG.info("Configured HDFS superuser is " + superuser);

  // A null dump dir signals that dumping is disabled; otherwise start with
  // an emptied dump directory.
  if (!enableDump) {
    writeDumpDir = null;
  } else {
    clearDirectory(writeDumpDir);
  }

  rpcCallCache = new RpcCallCache("NFS3", 256);
  infoServer = new Nfs3HttpServer(config);
}
项目:big-c    文件:TestOpenFileCtxCache.java   
@Test
// Verify OpenFileCtxCache eviction: a full cache rejects inserts until an
// entry expires or goes inactive, and entries with pending work are never
// evicted.
public void testEviction() throws IOException, InterruptedException {
  NfsConfiguration conf = new NfsConfiguration();

  // Only two entries will be in the cache
  conf.setInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, 2);

  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context5 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));

  OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);

  boolean ret = cache.put(new FileHandle(1), context1);
  assertTrue(ret);
  // Make sure context1 is strictly older than context2 before filling up.
  Thread.sleep(1000);
  ret = cache.put(new FileHandle(2), context2);
  assertTrue(ret);
  // Cache is full and nothing is evict-able yet, so this insert is rejected.
  ret = cache.put(new FileHandle(3), context3);
  assertFalse(ret);
  assertTrue(cache.size() == 2);

  // Wait for the oldest stream to be evict-able, insert again
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  assertTrue(cache.size() == 2);

  ret = cache.put(new FileHandle(3), context3);
  assertTrue(ret);
  assertTrue(cache.size() == 2);
  assertTrue(cache.get(new FileHandle(1)) == null);

  // Test inactive entry is evicted immediately
  context3.setActiveStatusForTest(false);
  ret = cache.put(new FileHandle(4), context4);
  assertTrue(ret);

  // Now the cache has context2 and context4
  // Test eviction failure if all entries have pending work.
  context2.getPendingWritesForTest().put(new OffsetRange(0, 100),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  // Long.valueOf avoids the deprecated new Long(long) boxing constructor.
  context4.getPendingCommitsForTest().put(Long.valueOf(100),
      new CommitCtx(0, null, 0, attr));
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  ret = cache.put(new FileHandle(5), context5);
  assertFalse(ret);
}
项目:big-c    文件:TestOpenFileCtxCache.java   
@Test
// Verify OpenFileCtxCache.scan(): it removes entries whose streams have
// expired, and removes inactive entries even before they expire.
public void testScan() throws IOException, InterruptedException {
  NfsConfiguration cacheConf = new NfsConfiguration();

  // Restrict the cache to two entries.
  cacheConf.setInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, 2);

  DFSClient client = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attrs = new Nfs3FileAttributes();
  HdfsDataOutputStream out = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(out.getPos()).thenReturn((long) 0);

  OpenFileCtx first = new OpenFileCtx(out, attrs, "/dumpFilePath",
      client, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx second = new OpenFileCtx(out, attrs, "/dumpFilePath",
      client, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx third = new OpenFileCtx(out, attrs, "/dumpFilePath",
      client, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx fourth = new OpenFileCtx(out, attrs, "/dumpFilePath",
      client, new ShellBasedIdMapping(new NfsConfiguration()));

  OpenFileCtxCache cache = new OpenFileCtxCache(cacheConf, 10 * 60 * 100);

  // Entries older than the scan timeout must be cleaned out.
  boolean accepted = cache.put(new FileHandle(1), first);
  assertTrue(accepted);
  accepted = cache.put(new FileHandle(2), second);
  assertTrue(accepted);
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT + 1);
  cache.scan(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  assertTrue(cache.size() == 0);

  // An inactive entry is cleaned even though it has not expired yet.
  accepted = cache.put(new FileHandle(3), third);
  assertTrue(accepted);
  accepted = cache.put(new FileHandle(4), fourth);
  assertTrue(accepted);
  third.setActiveStatusForTest(false);
  cache.scan(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_DEFAULT);
  assertTrue(cache.size() == 1);
  assertTrue(cache.get(new FileHandle(3)) == null);
  assertTrue(cache.get(new FileHandle(4)) != null);
}
项目:big-c    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
// COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
public void testCheckCommit() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  NfsConfiguration conf = new NfsConfiguration();
  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(conf), false, conf);

  COMMIT_STATUS ret;

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  // assertEquals gives the actual status on failure, unlike assertTrue(==).
  Assert.assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);

  ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 10);
  ctx.setNextOffsetForTest(10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);

  status = ctx.checkCommitInternal(10, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);

  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  Assert.assertEquals(0, commits.size());
  ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  Assert.assertEquals(1, commits.size());
  long key = commits.firstKey();
  Assert.assertEquals(11, key);

  // Test request with zero commit offset
  // Long.valueOf avoids the deprecated new Long(long) boxing constructor.
  commits.remove(Long.valueOf(11));
  // There is one pending write [5,10]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  Assert.assertEquals(1, commits.size());
  key = commits.firstKey();
  Assert.assertEquals(9, key);

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
}
项目:big-c    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with
// large file upload option.
public void testCheckCommitLargeFileUpload() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  NfsConfiguration conf = new NfsConfiguration();
  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(conf), false, conf);

  COMMIT_STATUS ret;

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  // assertEquals gives the actual status on failure, unlike assertTrue(==).
  Assert.assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);

  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 8);
  ctx.setNextOffsetForTest(10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);

  // Test commit sequential writes
  status = ctx.checkCommitInternal(10, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, status);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);

  // Test commit non-sequential writes
  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  Assert.assertEquals(1, commits.size());
  ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret);
  Assert.assertEquals(1, commits.size());

  // Test request with zero commit offset
  // Long.valueOf avoids the deprecated new Long(long) boxing constructor.
  commits.remove(Long.valueOf(10));
  // There is one pending write [10,15]
  ret = ctx.checkCommitInternal(0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);

  ret = ctx.checkCommitInternal(9, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  Assert.assertEquals(2, commits.size());

  // Empty pending writes. nextOffset=10, flushed pos=8
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);

  // Empty pending writes
  ctx.setNextOffsetForTest((long) 8); // flushed pos = 8
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
}
项目:big-c    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
// COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
public void testCheckCommitFromRead() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);
  NfsConfiguration config = new NfsConfiguration();

  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(config), false, config);

  FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
  COMMIT_STATUS ret;
  WriteManager wm = new WriteManager(new ShellBasedIdMapping(config), config, false);
  assertTrue(wm.addOpenFileStream(h, ctx));

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));

  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
  assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 10);
  ctx.setNextOffsetForTest((long)10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
  assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));

  status = ctx.checkCommitInternal(10, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10));

  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  assertEquals(0, commits.size());
  ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  assertEquals(0, commits.size()); // commit triggered by read doesn't wait
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 11));

  // Test request with zero commit offset
  // There is one pending write [10,15] (fixed comment: was wrongly [5,10])
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  assertEquals(0, commits.size());
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
}
项目:big-c    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with large file upload option
public void testCheckCommitFromReadLargeFileUpload() throws IOException {
  // Mocked client/stream let the test drive OpenFileCtx commit handling
  // without a real cluster. LARGE_FILE_UPLOAD=true selects the
  // COMMIT_SPECIAL_* return codes exercised below.
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);
  NfsConfiguration config = new NfsConfiguration();

  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(config), false, config);

  FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
  COMMIT_STATUS ret;
  WriteManager wm = new WriteManager(new ShellBasedIdMapping(config), config, false);
  assertTrue(wm.addOpenFileStream(h, ctx));

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));

  // An inactive context that still holds a queued write is an I/O error
  // for a reader-triggered commit.
  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
  assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  // Stubbed flushed position (6) already covers commit offset 5.
  Mockito.when(fos.getPos()).thenReturn((long) 6);
  ctx.setNextOffsetForTest((long)10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
  assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));

  // Test request with sequential writes
  // Offset 9 is between pos (6) and nextOffset (10): the reader must wait
  // (surfaced to the NFS client as JUKEBOX, i.e. retry later).
  status = ctx.checkCommitInternal(9, ch, 1, attr, true);
  assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
  ret = ctx.checkCommit(dfsClient, 9, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 9));

  // Test request with non-sequential writes
  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  assertTrue(commits.size() == 0);
  ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret);
  assertEquals(0, commits.size()); // commit triggered by read doesn't wait
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 16));

  // Test request with zero commit offset
  // There is one pending write [10,15]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(0, commits.size());
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));

  // Empty pending writes
  // nextOffset (10) still exceeds the stubbed pos (6), so even with no
  // pending writes the commit keeps waiting.
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
}
项目:hadoop-2.6.0-cdh5.4.3    文件:RpcProgramNfs3.java   
/**
 * Creates the NFSv3 RPC program backed by HDFS.
 *
 * Registers the program on the configured server port, performs the
 * (optional) Kerberos login, and wires up the id mapping, write manager,
 * client cache, RPC reply cache and HTTP info server.
 *
 * @param config NFS gateway configuration
 * @param registrationSocket socket used to register with the portmapper
 * @param allowInsecurePorts whether requests from ports >= 1024 are accepted
 * @throws IOException on login failure or dump-directory setup failure
 */
public RpcProgramNfs3(NfsConfiguration config, DatagramSocket registrationSocket,
    boolean allowInsecurePorts) throws IOException {
  // Program "NFS3": the same protocol version is passed as both the lowest
  // and highest supported version.
  super("NFS3", "localhost", config.getInt(
      NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY,
      NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT), Nfs3Constant.PROGRAM,
      Nfs3Constant.VERSION, Nfs3Constant.VERSION, registrationSocket,
      allowInsecurePorts);

  this.config = config;
  // Set umask to 000 so no permission bits are masked off on create.
  config.set(FsPermission.UMASK_LABEL, "000");
  // uid/gid <-> name mapping obtained from the local shell.
  iug = new ShellBasedIdMapping(config);

  aixCompatMode = config.getBoolean(
      NfsConfigKeys.AIX_COMPAT_MODE_KEY,
      NfsConfigKeys.AIX_COMPAT_MODE_DEFAULT);
  exports = NfsExports.getInstance(config);
  writeManager = new WriteManager(iug, config, aixCompatMode);
  clientCache = new DFSClientCache(config);
  replication = (short) config.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
      DFSConfigKeys.DFS_REPLICATION_DEFAULT);
  blockSize = config.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
      DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
  bufferSize = config.getInt(
      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);

  writeDumpDir = config.get(NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_KEY,
      NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_DEFAULT);
  boolean enableDump = config.getBoolean(NfsConfigKeys.DFS_NFS_FILE_DUMP_KEY,
      NfsConfigKeys.DFS_NFS_FILE_DUMP_DEFAULT);
  // UGI must see this configuration before the keytab login below.
  UserGroupInformation.setConfiguration(config);
  SecurityUtil.login(config, NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY,
      NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY);
  superuser = config.get(NfsConfigKeys.NFS_SUPERUSER_KEY,
      NfsConfigKeys.NFS_SUPERUSER_DEFAULT);
  LOG.info("Configured HDFS superuser is " + superuser);

  // When dumping is disabled, a null dir signals "no dump" to the writers.
  if (!enableDump) {
    writeDumpDir = null;
  } else {
    clearDirectory(writeDumpDir);
  }

  rpcCallCache = new RpcCallCache("NFS3", 256);
  infoServer = new Nfs3HttpServer(config);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestOpenFileCtxCache.java   
@Test
public void testEviction() throws IOException, InterruptedException {
  // Exercises OpenFileCtxCache eviction: LRU-style eviction of idle
  // streams, immediate eviction of inactive streams, and eviction refusal
  // when every cached stream still has pending work.
  NfsConfiguration conf = new NfsConfiguration();

  // Only two entries will be in the cache
  conf.setInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, 2);

  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));
  OpenFileCtx context5 = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(new NfsConfiguration()));

  // NOTE(review): 10 * 60 * 100 ms looks like it was meant to be ten
  // minutes (10 * 60 * 1000); confirm the intended stream timeout.
  OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);

  boolean ret = cache.put(new FileHandle(1), context1);
  assertTrue(ret);
  Thread.sleep(1000);
  ret = cache.put(new FileHandle(2), context2);
  assertTrue(ret);
  // Cache is full and both entries are young: the third put is rejected.
  ret = cache.put(new FileHandle(3), context3);
  assertFalse(ret);
  assertTrue(cache.size() == 2);

  // Wait for the oldest stream to be evict-able, insert again
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  assertTrue(cache.size() == 2);

  // context1 (the oldest) is evicted to make room for context3.
  ret = cache.put(new FileHandle(3), context3);
  assertTrue(ret);
  assertTrue(cache.size() == 2);
  assertTrue(cache.get(new FileHandle(1)) == null);

  // Test inactive entry is evicted immediately
  context3.setActiveStatusForTest(false);
  ret = cache.put(new FileHandle(4), context4);
  assertTrue(ret);

  // Now the cache has context2 and context4
  // Test eviction failure if all entries have pending work.
  context2.getPendingWritesForTest().put(new OffsetRange(0, 100),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  // 100L replaces the deprecated new Long(100) boxing constructor.
  context4.getPendingCommitsForTest().put(100L,
      new CommitCtx(0, null, 0, attr));
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  ret = cache.put(new FileHandle(5), context5);
  assertFalse(ret);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestOpenFileCtxCache.java   
@Test
public void testScan() throws IOException, InterruptedException {
  // Verifies that OpenFileCtxCache.scan() removes expired entries and
  // entries whose stream has gone inactive.
  NfsConfiguration conf = new NfsConfiguration();

  // Restrict the cache to two open files.
  conf.setInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, 2);

  DFSClient client = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attrs = new Nfs3FileAttributes();
  HdfsDataOutputStream out = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(out.getPos()).thenReturn((long) 0);

  // Four identical streams backed by the same mocks.
  OpenFileCtx[] streams = new OpenFileCtx[4];
  for (int i = 0; i < streams.length; i++) {
    streams[i] = new OpenFileCtx(out, attrs, "/dumpFilePath", client,
        new ShellBasedIdMapping(new NfsConfiguration()));
  }

  OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);

  // A scan after the minimum stream timeout clears both expired entries.
  boolean accepted = cache.put(new FileHandle(1), streams[0]);
  assertTrue(accepted);
  accepted = cache.put(new FileHandle(2), streams[1]);
  assertTrue(accepted);
  Thread.sleep(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT + 1);
  cache.scan(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
  assertTrue(cache.size() == 0);

  // An inactive entry is dropped by the scan even before it times out.
  accepted = cache.put(new FileHandle(3), streams[2]);
  assertTrue(accepted);
  accepted = cache.put(new FileHandle(4), streams[3]);
  assertTrue(accepted);
  streams[2].setActiveStatusForTest(false);
  cache.scan(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_DEFAULT);
  assertTrue(cache.size() == 1);
  assertTrue(cache.get(new FileHandle(3)) == null);
  assertTrue(cache.get(new FileHandle(4)) != null);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
// COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
public void testCheckCommit() throws IOException {
  // Mocked client/stream drive OpenFileCtx.checkCommit without a cluster.
  // Assert.assertEquals is used instead of assertTrue(a == b) so a failure
  // reports both the expected and the actual status.
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  NfsConfiguration conf = new NfsConfiguration();
  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(conf), false, conf);

  COMMIT_STATUS ret;

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);

  // Inactive context with a queued write.
  ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  Mockito.when(fos.getPos()).thenReturn((long) 10);
  ctx.setNextOffsetForTest(10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);

  status = ctx.checkCommitInternal(10, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);

  // A commit beyond the flushed position (10) must be queued and waited on.
  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  Assert.assertEquals(0, commits.size());
  ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  Assert.assertEquals(1, commits.size());
  long key = commits.firstKey();
  Assert.assertEquals(11, key);

  // Test request with zero commit offset
  // Long.valueOf replaces the deprecated new Long(...) boxing constructor.
  commits.remove(Long.valueOf(11));
  // There is one pending write [5,10]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  Assert.assertEquals(1, commits.size());
  key = commits.firstKey();
  Assert.assertEquals(9, key);

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with
// large file upload option.
public void testCheckCommitLargeFileUpload() throws IOException {
  // Mocked client/stream drive OpenFileCtx.checkCommit without a cluster;
  // LARGE_FILE_UPLOAD=true selects the COMMIT_SPECIAL_* handling.
  // Assert.assertEquals is used instead of assertTrue(a == b) so a failure
  // reports both the expected and the actual status.
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);

  NfsConfiguration conf = new NfsConfiguration();
  conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(conf), false, conf);

  COMMIT_STATUS ret;

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);

  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  // Stubbed flushed position (8) already covers commit offset 5.
  Mockito.when(fos.getPos()).thenReturn((long) 8);
  ctx.setNextOffsetForTest(10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);

  // Test commit sequential writes
  status = ctx.checkCommitInternal(10, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, status);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);

  // Test commit non-sequential writes
  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  Assert.assertEquals(1, commits.size());
  ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret);
  Assert.assertEquals(1, commits.size());

  // Test request with zero commit offset
  // Long.valueOf replaces the deprecated new Long(...) boxing constructor.
  commits.remove(Long.valueOf(10));
  // There is one pending write [10,15]
  ret = ctx.checkCommitInternal(0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);

  ret = ctx.checkCommitInternal(9, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  Assert.assertEquals(2, commits.size());

  // Empty pending writes. nextOffset=10, flushed pos=8
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);

  // Empty pending writes
  ctx.setNextOffsetForTest((long) 8); // flushed pos = 8
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
  Assert.assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);

}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
// includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
// COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
public void testCheckCommitFromRead() throws IOException {
  // Mocked client/stream drive commit handling without a cluster. The last
  // checkCommit argument (true) marks a commit triggered by a read, and
  // WriteManager.commitBeforeRead maps each status to an NFS3 status code.
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);
  NfsConfiguration config = new NfsConfiguration();

  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(config), false, config);

  FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
  COMMIT_STATUS ret;
  WriteManager wm = new WriteManager(new ShellBasedIdMapping(config), config, false);
  assertTrue(wm.addOpenFileStream(h, ctx));

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));

  // An inactive context that still holds a queued write is an I/O error
  // for a reader-triggered commit.
  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
  assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  // Stubbed flushed position (10) covers commit offsets 5 and 10 below.
  Mockito.when(fos.getPos()).thenReturn((long) 10);
  ctx.setNextOffsetForTest((long)10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
  assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));

  status = ctx.checkCommitInternal(10, ch, 1, attr, true);
  assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
  ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10));

  // A commit past the flushed position waits; read-triggered commits are
  // not queued and surface as JUKEBOX (client should retry).
  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  assertTrue(commits.size() == 0);
  ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  assertEquals(0, commits.size()); // commit triggered by read doesn't wait
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 11));

  // Test request with zero commit offset
  // There is one pending write [5,10]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
  assertEquals(0, commits.size());
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));

  // Empty pending writes
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestWrites.java   
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with large file upload option
public void testCheckCommitFromReadLargeFileUpload() throws IOException {
  // Same scenario as testCheckCommitFromRead but with LARGE_FILE_UPLOAD
  // enabled, which selects the COMMIT_SPECIAL_* code paths.
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);
  NfsConfiguration config = new NfsConfiguration();

  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(config), false, config);

  FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
  COMMIT_STATUS ret;
  WriteManager wm = new WriteManager(new ShellBasedIdMapping(config), config, false);
  assertTrue(wm.addOpenFileStream(h, ctx));

  // Test inactive open file context
  ctx.setActiveStatusForTest(false);
  Channel ch = Mockito.mock(Channel.class);
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));

  // An inactive context that still holds a queued write is an I/O error
  // for a reader-triggered commit.
  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
  assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));

  // Test request with non zero commit offset
  ctx.setActiveStatusForTest(true);
  // Stubbed flushed position (6) already covers commit offset 5.
  Mockito.when(fos.getPos()).thenReturn((long) 6);
  ctx.setNextOffsetForTest((long)10);
  COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
  assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
  // Do_SYNC state will be updated to FINISHED after data sync
  ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));

  // Test request with sequential writes
  // Offset 9 lies between pos (6) and nextOffset (10): the reader must
  // wait, surfaced to the NFS client as JUKEBOX (retry later).
  status = ctx.checkCommitInternal(9, ch, 1, attr, true);
  assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
  ret = ctx.checkCommit(dfsClient, 9, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 9));

  // Test request with non-sequential writes
  ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
      .getPendingCommitsForTest();
  assertTrue(commits.size() == 0);
  ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret);
  assertEquals(0, commits.size()); // commit triggered by read doesn't wait
  assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 16));

  // Test request with zero commit offset
  // There is one pending write [10,15]
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(0, commits.size());
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));

  // Empty pending writes
  // nextOffset (10) still exceeds the stubbed pos (6), so the commit keeps
  // waiting even with no pending writes.
  ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
  ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
  assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
  assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
}