public RpcProgramMountd(NfsConfiguration config, DatagramSocket registrationSocket, boolean allowInsecurePorts) throws IOException { // Note that RPC cache is not enabled super("mountd", "localhost", config.getInt( NfsConfigKeys.DFS_NFS_MOUNTD_PORT_KEY, NfsConfigKeys.DFS_NFS_MOUNTD_PORT_DEFAULT), PROGRAM, VERSION_1, VERSION_3, registrationSocket, allowInsecurePorts); exports = new ArrayList<String>(); exports.add(config.get(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, NfsConfigKeys.DFS_NFS_EXPORT_POINT_DEFAULT)); this.hostsMatcher = NfsExports.getInstance(config); this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>()); UserGroupInformation.setConfiguration(config); SecurityUtil.login(config, NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY, NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY); this.dfsClient = new DFSClient(NameNode.getAddress(config), config); }
/** * Check stream status to decide if it should be closed * @return true, remove stream; false, keep stream */ public synchronized boolean streamCleanup(long fileId, long streamTimeout) { Preconditions .checkState(streamTimeout >= NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT); if (!activeState) { return true; } boolean flag = false; // Check the stream timeout if (checkStreamTimeout(streamTimeout)) { if (LOG.isDebugEnabled()) { LOG.debug("stream can be closed for fileId: " + fileId); } flag = true; } return flag; }
WriteManager(IdMappingServiceProvider iug, final NfsConfiguration config, boolean aixCompatMode) { this.iug = iug; this.config = config; this.aixCompatMode = aixCompatMode; streamTimeout = config.getLong(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_KEY, NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_DEFAULT); LOG.info("Stream timeout is " + streamTimeout + "ms."); if (streamTimeout < NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT) { LOG.info("Reset stream timeout to minimum value " + NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT + "ms."); streamTimeout = NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT; } maxStreams = config.getInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_DEFAULT); LOG.info("Maximum open streams is "+ maxStreams); this.fileContextCache = new OpenFileCtxCache(config, streamTimeout); }
/**
 * Opens the registration socket on the configured privileged port and
 * stores the daemon arguments.
 *
 * <p>The port is read from
 * {@code NfsConfigKeys.DFS_NFS_REGISTRATION_PORT_KEY} and must lie in the
 * range 1-1023. The socket is bound to "localhost" on that port.
 *
 * @param context daemon context whose arguments are stored in {@code args}
 * @throws RuntimeException if the configured port is outside 1-1023
 * @throws Exception if the socket cannot be created, configured or bound
 */
@Override
public void init(DaemonContext context) throws Exception {
  System.err.println("Initializing privileged NFS client socket...");
  NfsConfiguration conf = new NfsConfiguration();
  int clientPort = conf.getInt(NfsConfigKeys.DFS_NFS_REGISTRATION_PORT_KEY,
      NfsConfigKeys.DFS_NFS_REGISTRATION_PORT_DEFAULT);
  if (clientPort < 1 || clientPort > 1023) {
    throw new RuntimeException("Must start privileged NFS server with '" +
        NfsConfigKeys.DFS_NFS_REGISTRATION_PORT_KEY + "' configured to a " +
        "privileged port.");
  }

  // SO_REUSEADDR has a defined effect only when it is set before the socket
  // is bound, so create the socket unbound (null address), set the option,
  // and bind afterwards.
  DatagramSocket socket = new DatagramSocket(null);
  boolean bound = false;
  try {
    socket.setReuseAddress(true);
    socket.bind(new InetSocketAddress("localhost", clientPort));
    bound = true;
  } finally {
    // Do not leak the descriptor when configuring or binding fails.
    if (!bound) {
      socket.close();
    }
  }
  registrationSocket = socket;

  args = context.getArguments();
}
/**
 * Builds and starts the HTTP server, then records the port of each
 * connector that the HTTP policy enables.
 *
 * @throws IOException propagated from building or starting the server
 */
void start() throws IOException {
  final InetSocketAddress httpAddr = getHttpAddress(conf);
  final InetSocketAddress httpsAddr = NetUtils.createSocketAddr(
      conf.get(NfsConfigKeys.NFS_HTTPS_ADDRESS_KEY,
          NfsConfigKeys.NFS_HTTPS_ADDRESS_DEFAULT));

  httpServer = DFSUtil.httpServerTemplateForNNAndJN(conf, httpAddr,
      httpsAddr, "nfs3",
      NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY,
      NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY).build();
  httpServer.start();

  final HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);

  // The HTTPS connector index is 0 when HTTP is disabled and 1 otherwise.
  int nextConnector = 0;
  if (policy.isHttpEnabled()) {
    infoPort = httpServer.getConnectorAddress(nextConnector).getPort();
    nextConnector++;
  }
  if (policy.isHttpsEnabled()) {
    infoSecurePort = httpServer.getConnectorAddress(nextConnector).getPort();
  }
}
@BeforeClass public static void setUp() throws Exception { conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTP_AND_HTTPS.name()); conf.set(NfsConfigKeys.NFS_HTTP_ADDRESS_KEY, "localhost:0"); conf.set(NfsConfigKeys.NFS_HTTPS_ADDRESS_KEY, "localhost:0"); // Use emphral port in case tests are running in parallel conf.setInt(NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY, 0); conf.setInt(NfsConfigKeys.DFS_NFS_MOUNTD_PORT_KEY, 0); File base = new File(BASEDIR); FileUtil.fullyDelete(base); base.mkdirs(); keystoresDir = new File(BASEDIR).getAbsolutePath(); sslConfDir = KeyStoreTestUtil.getClasspathDir(TestNfs3HttpServer.class); KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); }
public RpcProgramMountd(NfsConfiguration config, DatagramSocket registrationSocket, boolean allowInsecurePorts) throws IOException { // Note that RPC cache is not enabled super("mountd", "localhost", config.getInt( NfsConfigKeys.DFS_NFS_MOUNTD_PORT_KEY, NfsConfigKeys.DFS_NFS_MOUNTD_PORT_DEFAULT), PROGRAM, VERSION_1, VERSION_3, registrationSocket, allowInsecurePorts); exports = new ArrayList<String>(); exports.add(config.get(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, NfsConfigKeys.DFS_NFS_EXPORT_POINT_DEFAULT)); this.hostsMatcher = NfsExports.getInstance(config); this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>()); UserGroupInformation.setConfiguration(config); SecurityUtil.login(config, NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY, NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY); this.dfsClient = new DFSClient(DFSUtilClient.getNNAddress(config), config); }
/**
 * Creates the context for one open file.
 *
 * @param fos output stream of the open file
 * @param latestAttr latest known attributes; its size seeds the next
 *        expected write offset
 * @param dumpFilePath dump file path; dumping is enabled only when this is
 *        non-null
 * @param client DFS client stored for later use
 * @param iug id mapping provider stored for later use
 * @param aixCompatMode AIX compatibility flag stored for later use
 * @param config configuration the large-file-upload flag is read from
 */
OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
    String dumpFilePath, DFSClient client, IdMappingServiceProvider iug,
    boolean aixCompatMode, NfsConfiguration config) {
  this.fos = fos;
  this.latestAttr = latestAttr;
  this.aixCompatMode = aixCompatMode;

  // We use the ReverseComparatorOnMin as the comparator of the map. In this
  // way, we first dump the data with larger offset. In the meanwhile, we
  // retrieve the last element to write back to HDFS.
  pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>(
      OffsetRange.ReverseComparatorOnMin);
  pendingCommits = new ConcurrentSkipListMap<Long, CommitCtx>();

  updateLastAccessTime();

  // Initial state: active, no async write-back running, nothing dumped yet.
  activeState = true;
  asyncStatus = false;
  asyncWriteBackStartOffset = 0;
  dumpOut = null;
  raf = null;
  nonSequentialWriteInMemory = new AtomicLong(0);

  this.dumpFilePath = dumpFilePath;
  enabledDump = (dumpFilePath != null);

  nextOffset = new AtomicLong(latestAttr.getSize());
  try {
    // Sanity check only (active with -ea): the attribute size should match
    // the stream position.
    assert(nextOffset.get() == this.fos.getPos());
  } catch (IOException ignored) {
    // Deliberately ignored: failing to read the position only skips the check.
  }

  dumpThread = null;
  this.client = client;
  this.iug = iug;
  this.uploadLargeFile = config.getBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD,
      NfsConfigKeys.LARGE_FILE_UPLOAD_DEFAULT);
}
/**
 * Initializes the "Nfs3" metrics system, creates the gateway metrics and
 * returns a new {@code RpcProgramNfs3}.
 *
 * @param config configuration the server port is read from
 * @param registrationSocket forwarded to the {@code RpcProgramNfs3} constructor
 * @param allowInsecurePorts forwarded to the {@code RpcProgramNfs3} constructor
 * @return the newly created RPC program
 * @throws IOException propagated from the calls made here
 */
public static RpcProgramNfs3 createRpcProgramNfs3(NfsConfiguration config,
    DatagramSocket registrationSocket, boolean allowInsecurePorts)
    throws IOException {
  DefaultMetricsSystem.initialize("Nfs3");

  final String hostName = DNS.getDefaultHost("default", "default");
  final int serverPort = config.getInt(NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY,
      NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT);

  // The display name is the host name immediately followed by the port,
  // with no separator in between.
  metrics = Nfs3Metrics.create(config, hostName + serverPort);

  return new RpcProgramNfs3(config, registrationSocket, allowInsecurePorts);
}
/**
 * Logs the startup/shutdown message, then creates an {@code Nfs3} server
 * from a fresh configuration and starts it.
 *
 * @param args command-line arguments, used for the startup message
 * @param registrationSocket socket forwarded to the {@code Nfs3} constructor
 * @throws IOException propagated from creating or starting the server
 */
static void startService(String[] args, DatagramSocket registrationSocket)
    throws IOException {
  StringUtils.startupShutdownMessage(Nfs3.class, args, LOG);

  final NfsConfiguration config = new NfsConfiguration();
  final boolean portMonitoringDisabled = config.getBoolean(
      NfsConfigKeys.DFS_NFS_PORT_MONITORING_DISABLED_KEY,
      NfsConfigKeys.DFS_NFS_PORT_MONITORING_DISABLED_DEFAULT);

  // The "port monitoring disabled" setting is passed as the
  // allow-insecure-ports argument.
  final Nfs3 server =
      new Nfs3(config, registrationSocket, portMonitoringDisabled);
  server.startServiceInternal(true);
}
OpenFileCtxCache(NfsConfiguration config, long streamTimeout) { maxStreams = config.getInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_DEFAULT); LOG.info("Maximum open streams is " + maxStreams); this.streamTimeout = streamTimeout; streamMonitor = new StreamMonitor(); }
public static Nfs3Metrics create(Configuration conf, String gatewayName) { String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY); MetricsSystem ms = DefaultMetricsSystem.instance(); JvmMetrics jm = JvmMetrics.create(gatewayName, sessionId, ms); // Percentile measurement is [50th,75th,90th,95th,99th] currently int[] intervals = conf .getInts(NfsConfigKeys.NFS_METRICS_PERCENTILES_INTERVALS_KEY); return ms.register(new Nfs3Metrics(gatewayName, sessionId, intervals, jm)); }
public static void main(String[] args) throws InterruptedException { Arrays.fill(data1, (byte) 7); Arrays.fill(data2, (byte) 8); Arrays.fill(data3, (byte) 9); // NFS3 Create request NfsConfiguration conf = new NfsConfiguration(); WriteClient client = new WriteClient("localhost", conf.getInt( NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY, NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT), create(), false); client.run(); while (handle == null) { Thread.sleep(1000); System.out.println("handle is still null..."); } LOG.info("Send write1 request"); XDR writeReq; writeReq = write(handle, 0x8000005c, 2000, 1000, data3); Nfs3Utils.writeChannel(channel, writeReq, 1); writeReq = write(handle, 0x8000005d, 1000, 1000, data2); Nfs3Utils.writeChannel(channel, writeReq, 2); writeReq = write(handle, 0x8000005e, 0, 1000, data1); Nfs3Utils.writeChannel(channel, writeReq, 3); // TODO: convert to Junit test, and validate result automatically }
@Test public void testCheckCommitAixCompatMode() throws IOException { DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); NfsConfiguration conf = new NfsConfiguration(); conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false); // Enable AIX compatibility mode. OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(new NfsConfiguration()), true, conf); // Test fall-through to pendingWrites check in the event that commitOffset // is greater than the number of bytes we've so far flushed. Mockito.when(fos.getPos()).thenReturn((long) 2); COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_FINISHED); // Test the case when we actually have received more bytes than we're trying // to commit. ctx.getPendingWritesForTest().put(new OffsetRange(0, 10), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); Mockito.when(fos.getPos()).thenReturn((long) 10); ctx.setNextOffsetForTest((long)10); status = ctx.checkCommitInternal(5, null, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); }
/**
 * Exercises {@code OpenFileCtx.checkSequential} against a context whose
 * pending-write map holds the ranges [5,10), [10,15) and [20,25), with the
 * mocked stream position fixed at 0.
 */
@Test
public void testCheckSequential() throws IOException {
  DFSClient dfsClient = Mockito.mock(DFSClient.class);
  Nfs3FileAttributes attr = new Nfs3FileAttributes();
  HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn((long) 0);
  NfsConfiguration config = new NfsConfiguration();

  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  // AIX compatibility mode is off (the boolean argument is false).
  OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
      new ShellBasedIdMapping(config), false, config);

  // Seed three pending writes; the first two ranges are adjacent and the
  // third starts after a gap.
  ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  ctx.getPendingWritesForTest().put(new OffsetRange(20, 25),
      new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));

  // Expected results for each pair of arguments.
  // NOTE(review): the argument meaning (presumably offset and next expected
  // offset) is not visible here - confirm against checkSequential.
  assertTrue(!ctx.checkSequential(5, 4));
  assertTrue(ctx.checkSequential(9, 5));
  assertTrue(ctx.checkSequential(10, 5));
  assertTrue(ctx.checkSequential(14, 5));
  assertTrue(!ctx.checkSequential(15, 5));
  assertTrue(!ctx.checkSequential(20, 5));
  assertTrue(!ctx.checkSequential(25, 5));
  assertTrue(!ctx.checkSequential(999, 5));
}
@Test public void testExportPoint() throws IOException { NfsConfiguration config = new NfsConfiguration(); MiniDFSCluster cluster = null; String exportPoint = "/myexport1"; config.setStrings(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, exportPoint); // Use emphral port in case tests are running in parallel config.setInt("nfs3.mountd.port", 0); config.setInt("nfs3.server.port", 0); try { cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build(); cluster.waitActive(); // Start nfs final Nfs3 nfsServer = new Nfs3(config); nfsServer.startServiceInternal(false); Mountd mountd = nfsServer.getMountd(); RpcProgramMountd rpcMount = (RpcProgramMountd) mountd.getRpcProgram(); assertTrue(rpcMount.getExports().size() == 1); String exportInMountd = rpcMount.getExports().get(0); assertTrue(exportInMountd.equals(exportPoint)); } finally { if (cluster != null) { cluster.shutdown(); } } }
/**
 * Sets values under literal key names and checks that each value can be
 * read back through the corresponding key constant.
 *
 * <p>NOTE(review): judging by the test name, the literals are presumably
 * the deprecated key names and the constants the current ones - the
 * mapping itself is not visible here.
 */
@Test
public void testDeprecatedKeys() {
  NfsConfiguration conf = new NfsConfiguration();

  // Server and mountd ports.
  conf.setInt("nfs3.server.port", 998);
  assertTrue(conf.getInt(NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY, 0) == 998);

  conf.setInt("nfs3.mountd.port", 999);
  assertTrue(conf.getInt(NfsConfigKeys.DFS_NFS_MOUNTD_PORT_KEY, 0) == 999);

  // Export access settings.
  conf.set("dfs.nfs.exports.allowed.hosts", "host1");
  assertTrue(conf.get(CommonConfigurationKeys.NFS_EXPORTS_ALLOWED_HOSTS_KEY)
      .equals("host1"));

  conf.setInt("dfs.nfs.exports.cache.expirytime.millis", 1000);
  assertTrue(conf.getInt(
      Nfs3Constant.NFS_EXPORTS_CACHE_EXPIRYTIME_MILLIS_KEY, 0) == 1000);

  // User/group id mapping update interval.
  conf.setInt("hadoop.nfs.userupdate.milly", 10);
  assertTrue(conf.getInt(IdMappingConstant.USERGROUPID_UPDATE_MILLIS_KEY, 0) == 10);

  // Dump directory and dump switch.
  conf.set("dfs.nfs3.dump.dir", "/nfs/tmp");
  assertTrue(conf.get(NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_KEY).equals(
      "/nfs/tmp"));

  // The default passed to getBoolean is true, so a false result shows the
  // value was actually picked up.
  conf.setBoolean("dfs.nfs3.enableDump", false);
  assertTrue(conf.getBoolean(NfsConfigKeys.DFS_NFS_FILE_DUMP_KEY, true) == false);

  // Open-file limit and stream timeout.
  conf.setInt("dfs.nfs3.max.open.files", 500);
  assertTrue(conf.getInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, 0) == 500);

  conf.setInt("dfs.nfs3.stream.timeout", 6000);
  assertTrue(conf.getInt(NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_KEY, 0) == 6000);

  // Export point.
  conf.set("dfs.nfs3.export.point", "/dir1");
  assertTrue(conf.get(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY).equals("/dir1"));
}
OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr, String dumpFilePath, DFSClient client, IdMappingServiceProvider iug, boolean aixCompatMode, NfsConfiguration config) { this.fos = fos; this.latestAttr = latestAttr; this.aixCompatMode = aixCompatMode; // We use the ReverseComparatorOnMin as the comparator of the map. In this // way, we first dump the data with larger offset. In the meanwhile, we // retrieve the last element to write back to HDFS. pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>( OffsetRange.ReverseComparatorOnMin); pendingCommits = new ConcurrentSkipListMap<Long, CommitCtx>(); updateLastAccessTime(); activeState = true; asyncStatus = false; asyncWriteBackStartOffset = 0; dumpOut = null; raf = null; nonSequentialWriteInMemory = new AtomicLong(0); this.dumpFilePath = dumpFilePath; enabledDump = dumpFilePath != null; nextOffset = new AtomicLong(); nextOffset.set(latestAttr.getSize()); assert(nextOffset.get() == this.fos.getPos()); dumpThread = null; this.client = client; this.iug = iug; this.uploadLargeFile = config.getBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, NfsConfigKeys.LARGE_FILE_UPLOAD_DEFAULT); }