public RpcProgramMountd(NfsConfiguration config, DatagramSocket registrationSocket, boolean allowInsecurePorts) throws IOException { // Note that RPC cache is not enabled super("mountd", "localhost", config.getInt( NfsConfigKeys.DFS_NFS_MOUNTD_PORT_KEY, NfsConfigKeys.DFS_NFS_MOUNTD_PORT_DEFAULT), PROGRAM, VERSION_1, VERSION_3, registrationSocket, allowInsecurePorts); exports = new ArrayList<String>(); exports.add(config.get(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, NfsConfigKeys.DFS_NFS_EXPORT_POINT_DEFAULT)); this.hostsMatcher = NfsExports.getInstance(config); this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>()); UserGroupInformation.setConfiguration(config); SecurityUtil.login(config, NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY, NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY); this.dfsClient = new DFSClient(NameNode.getAddress(config), config); }
WriteManager(IdMappingServiceProvider iug, final NfsConfiguration config, boolean aixCompatMode) { this.iug = iug; this.config = config; this.aixCompatMode = aixCompatMode; streamTimeout = config.getLong(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_KEY, NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_DEFAULT); LOG.info("Stream timeout is " + streamTimeout + "ms."); if (streamTimeout < NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT) { LOG.info("Reset stream timeout to minimum value " + NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT + "ms."); streamTimeout = NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT; } maxStreams = config.getInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_DEFAULT); LOG.info("Maximum open streams is "+ maxStreams); this.fileContextCache = new OpenFileCtxCache(config, streamTimeout); }
DFSClientCache(NfsConfiguration config, int clientCache) { this.config = config; this.clientCache = CacheBuilder.newBuilder() .maximumSize(clientCache) .removalListener(clientRemovalListener()) .build(clientLoader()); this.inputstreamCache = CacheBuilder.newBuilder() .maximumSize(DEFAULT_DFS_INPUTSTREAM_CACHE_SIZE) .expireAfterAccess(DEFAULT_DFS_INPUTSTREAM_CACHE_TTL, TimeUnit.SECONDS) .removalListener(inputStreamRemovalListener()) .build(inputStreamLoader()); ShutdownHookManager.get().addShutdownHook(new CacheFinalizer(), SHUTDOWN_HOOK_PRIORITY); }
@Override public void init(DaemonContext context) throws Exception { System.err.println("Initializing privileged NFS client socket..."); NfsConfiguration conf = new NfsConfiguration(); int clientPort = conf.getInt(NfsConfigKeys.DFS_NFS_REGISTRATION_PORT_KEY, NfsConfigKeys.DFS_NFS_REGISTRATION_PORT_DEFAULT); if (clientPort < 1 || clientPort > 1023) { throw new RuntimeException("Must start privileged NFS server with '" + NfsConfigKeys.DFS_NFS_REGISTRATION_PORT_KEY + "' configured to a " + "privileged port."); } registrationSocket = new DatagramSocket( new InetSocketAddress("localhost", clientPort)); registrationSocket.setReuseAddress(true); args = context.getArguments(); }
@Test public void testStart() throws IOException { // Start minicluster NfsConfiguration config = new NfsConfiguration(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(config).numDataNodes(1) .build(); cluster.waitActive(); // Use emphral port in case tests are running in parallel config.setInt("nfs3.mountd.port", 0); config.setInt("nfs3.server.port", 0); // Start nfs Nfs3 nfs3 = new Nfs3(config); nfs3.startServiceInternal(false); RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountd() .getRpcProgram(); mountd.nullOp(new XDR(), 1234, InetAddress.getByName("localhost")); RpcProgramNfs3 nfsd = (RpcProgramNfs3) nfs3.getRpcProgram(); nfsd.nullProcedure(); cluster.shutdown(); }
@Test public void testEviction() throws IOException { NfsConfiguration conf = new NfsConfiguration(); conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost"); // Only one entry will be in the cache final int MAX_CACHE_SIZE = 1; DFSClientCache cache = new DFSClientCache(conf, MAX_CACHE_SIZE); DFSClient c1 = cache.getDfsClient("test1"); assertTrue(cache.getDfsClient("test1").toString().contains("ugi=test1")); assertEquals(c1, cache.getDfsClient("test1")); assertFalse(isDfsClientClose(c1)); cache.getDfsClient("test2"); assertTrue(isDfsClientClose(c1)); assertTrue("cache size should be the max size or less", cache.clientCache.size() <= MAX_CACHE_SIZE); }
@Test public void testGetUserGroupInformationSecure() throws IOException { String userName = "user1"; String currentUser = "test-user"; NfsConfiguration conf = new NfsConfiguration(); UserGroupInformation currentUserUgi = UserGroupInformation.createRemoteUser(currentUser); currentUserUgi.setAuthenticationMethod(KERBEROS); UserGroupInformation.setLoginUser(currentUserUgi); DFSClientCache cache = new DFSClientCache(conf); UserGroupInformation ugiResult = cache.getUserGroupInformation(userName, currentUserUgi); assertThat(ugiResult.getUserName(), is(userName)); assertThat(ugiResult.getRealUser(), is(currentUserUgi)); assertThat( ugiResult.getAuthenticationMethod(), is(UserGroupInformation.AuthenticationMethod.PROXY)); }
@Test public void testGetUserGroupInformation() throws IOException { String userName = "user1"; String currentUser = "currentUser"; UserGroupInformation currentUserUgi = UserGroupInformation .createUserForTesting(currentUser, new String[0]); NfsConfiguration conf = new NfsConfiguration(); conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost"); DFSClientCache cache = new DFSClientCache(conf); UserGroupInformation ugiResult = cache.getUserGroupInformation(userName, currentUserUgi); assertThat(ugiResult.getUserName(), is(userName)); assertThat(ugiResult.getRealUser(), is(currentUserUgi)); assertThat( ugiResult.getAuthenticationMethod(), is(UserGroupInformation.AuthenticationMethod.PROXY)); }
public RpcProgramMountd(NfsConfiguration config, DatagramSocket registrationSocket, boolean allowInsecurePorts) throws IOException { // Note that RPC cache is not enabled super("mountd", "localhost", config.getInt( NfsConfigKeys.DFS_NFS_MOUNTD_PORT_KEY, NfsConfigKeys.DFS_NFS_MOUNTD_PORT_DEFAULT), PROGRAM, VERSION_1, VERSION_3, registrationSocket, allowInsecurePorts); exports = new ArrayList<String>(); exports.add(config.get(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, NfsConfigKeys.DFS_NFS_EXPORT_POINT_DEFAULT)); this.hostsMatcher = NfsExports.getInstance(config); this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>()); UserGroupInformation.setConfiguration(config); SecurityUtil.login(config, NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY, NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY); this.dfsClient = new DFSClient(DFSUtilClient.getNNAddress(config), config); }
@Test public void testEviction() throws IOException { NfsConfiguration conf = new NfsConfiguration(); conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost"); // Only one entry will be in the cache final int MAX_CACHE_SIZE = 2; DFSClientCache cache = new DFSClientCache(conf, MAX_CACHE_SIZE); DFSClient c1 = cache.getDfsClient("test1"); assertTrue(cache.getDfsClient("test1").toString().contains("ugi=test1")); assertEquals(c1, cache.getDfsClient("test1")); assertFalse(isDfsClientClose(c1)); cache.getDfsClient("test2"); assertTrue(isDfsClientClose(c1)); assertEquals(MAX_CACHE_SIZE - 1, cache.clientCache.size()); }
OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr, String dumpFilePath, DFSClient client, IdMappingServiceProvider iug, boolean aixCompatMode, NfsConfiguration config) { this.fos = fos; this.latestAttr = latestAttr; this.aixCompatMode = aixCompatMode; // We use the ReverseComparatorOnMin as the comparator of the map. In this // way, we first dump the data with larger offset. In the meanwhile, we // retrieve the last element to write back to HDFS. pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>( OffsetRange.ReverseComparatorOnMin); pendingCommits = new ConcurrentSkipListMap<Long, CommitCtx>(); updateLastAccessTime(); activeState = true; asyncStatus = false; asyncWriteBackStartOffset = 0; dumpOut = null; raf = null; nonSequentialWriteInMemory = new AtomicLong(0); this.dumpFilePath = dumpFilePath; enabledDump = dumpFilePath != null; nextOffset = new AtomicLong(); nextOffset.set(latestAttr.getSize()); try { assert(nextOffset.get() == this.fos.getPos()); } catch (IOException e) {} dumpThread = null; this.client = client; this.iug = iug; this.uploadLargeFile = config.getBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, NfsConfigKeys.LARGE_FILE_UPLOAD_DEFAULT); }
public static RpcProgramNfs3 createRpcProgramNfs3(NfsConfiguration config, DatagramSocket registrationSocket, boolean allowInsecurePorts) throws IOException { DefaultMetricsSystem.initialize("Nfs3"); String displayName = DNS.getDefaultHost("default", "default") + config.getInt(NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY, NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT); metrics = Nfs3Metrics.create(config, displayName); return new RpcProgramNfs3(config, registrationSocket, allowInsecurePorts); }
static void startService(String[] args, DatagramSocket registrationSocket) throws IOException { StringUtils.startupShutdownMessage(Nfs3.class, args, LOG); NfsConfiguration conf = new NfsConfiguration(); boolean allowInsecurePorts = conf.getBoolean( NfsConfigKeys.DFS_NFS_PORT_MONITORING_DISABLED_KEY, NfsConfigKeys.DFS_NFS_PORT_MONITORING_DISABLED_DEFAULT); final Nfs3 nfsServer = new Nfs3(conf, registrationSocket, allowInsecurePorts); nfsServer.startServiceInternal(true); }
OpenFileCtxCache(NfsConfiguration config, long streamTimeout) { maxStreams = config.getInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_DEFAULT); LOG.info("Maximum open streams is " + maxStreams); this.streamTimeout = streamTimeout; streamMonitor = new StreamMonitor(); }
public static void main(String[] args) throws InterruptedException { Arrays.fill(data1, (byte) 7); Arrays.fill(data2, (byte) 8); Arrays.fill(data3, (byte) 9); // NFS3 Create request NfsConfiguration conf = new NfsConfiguration(); WriteClient client = new WriteClient("localhost", conf.getInt( NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY, NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT), create(), false); client.run(); while (handle == null) { Thread.sleep(1000); System.out.println("handle is still null..."); } LOG.info("Send write1 request"); XDR writeReq; writeReq = write(handle, 0x8000005c, 2000, 1000, data3); Nfs3Utils.writeChannel(channel, writeReq, 1); writeReq = write(handle, 0x8000005d, 1000, 1000, data2); Nfs3Utils.writeChannel(channel, writeReq, 2); writeReq = write(handle, 0x8000005e, 0, 1000, data1); Nfs3Utils.writeChannel(channel, writeReq, 3); // TODO: convert to Junit test, and validate result automatically }
@Test public void testCheckCommitAixCompatMode() throws IOException { DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); NfsConfiguration conf = new NfsConfiguration(); conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false); // Enable AIX compatibility mode. OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(new NfsConfiguration()), true, conf); // Test fall-through to pendingWrites check in the event that commitOffset // is greater than the number of bytes we've so far flushed. Mockito.when(fos.getPos()).thenReturn((long) 2); COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_FINISHED); // Test the case when we actually have received more bytes than we're trying // to commit. ctx.getPendingWritesForTest().put(new OffsetRange(0, 10), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); Mockito.when(fos.getPos()).thenReturn((long) 10); ctx.setNextOffsetForTest((long)10); status = ctx.checkCommitInternal(5, null, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); }
@Test public void testCheckSequential() throws IOException { DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); Mockito.when(fos.getPos()).thenReturn((long) 0); NfsConfiguration config = new NfsConfiguration(); config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false); OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(config), false, config); ctx.getPendingWritesForTest().put(new OffsetRange(5, 10), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); ctx.getPendingWritesForTest().put(new OffsetRange(10, 15), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); ctx.getPendingWritesForTest().put(new OffsetRange(20, 25), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); assertTrue(!ctx.checkSequential(5, 4)); assertTrue(ctx.checkSequential(9, 5)); assertTrue(ctx.checkSequential(10, 5)); assertTrue(ctx.checkSequential(14, 5)); assertTrue(!ctx.checkSequential(15, 5)); assertTrue(!ctx.checkSequential(20, 5)); assertTrue(!ctx.checkSequential(25, 5)); assertTrue(!ctx.checkSequential(999, 5)); }
@Test public void testExportPoint() throws IOException { NfsConfiguration config = new NfsConfiguration(); MiniDFSCluster cluster = null; String exportPoint = "/myexport1"; config.setStrings(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, exportPoint); // Use emphral port in case tests are running in parallel config.setInt("nfs3.mountd.port", 0); config.setInt("nfs3.server.port", 0); try { cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build(); cluster.waitActive(); // Start nfs final Nfs3 nfsServer = new Nfs3(config); nfsServer.startServiceInternal(false); Mountd mountd = nfsServer.getMountd(); RpcProgramMountd rpcMount = (RpcProgramMountd) mountd.getRpcProgram(); assertTrue(rpcMount.getExports().size() == 1); String exportInMountd = rpcMount.getExports().get(0); assertTrue(exportInMountd.equals(exportPoint)); } finally { if (cluster != null) { cluster.shutdown(); } } }