/** * Initializes the service. * * @throws ServiceException thrown if the service could not be initialized. */ @Override protected void init() throws ServiceException { long updateInterval = getServiceConfig().getLong(UPDATE_INTERVAL, DAY); long maxLifetime = getServiceConfig().getLong(MAX_LIFETIME, 7 * DAY); long renewInterval = getServiceConfig().getLong(RENEW_INTERVAL, DAY); tokenKind = (HttpFSServerWebApp.get().isSslEnabled()) ? SWebHdfsFileSystem.TOKEN_KIND : WebHdfsFileSystem.TOKEN_KIND; secretManager = new DelegationTokenSecretManager(tokenKind, updateInterval, maxLifetime, renewInterval, HOUR); try { secretManager.startThreads(); } catch (IOException ex) { throw new ServiceException(ServiceException.ERROR.S12, DelegationTokenManager.class.getSimpleName(), ex.toString(), ex); } }
/** * Returns list of InetSocketAddress corresponding to HA NN HTTP addresses from * the configuration. * * @return list of InetSocketAddresses */ public static Map<String, Map<String, InetSocketAddress>> getHaNnWebHdfsAddresses( Configuration conf, String scheme) { if (WebHdfsFileSystem.SCHEME.equals(scheme)) { return getAddresses(conf, null, DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY); } else if (SWebHdfsFileSystem.SCHEME.equals(scheme)) { return getAddresses(conf, null, DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY); } else { throw new IllegalArgumentException("Unsupported scheme: " + scheme); } }
private Token<? extends TokenIdentifier> generateDelegationToken( final NameNode namenode, final UserGroupInformation ugi, final String renewer) throws IOException { final Credentials c = DelegationTokenSecretManager.createCredentials( namenode, ugi, renewer != null? renewer: ugi.getShortUserName()); if (c == null) { return null; } final Token<? extends TokenIdentifier> t = c.getAllTokens().iterator().next(); Text kind = request.getScheme().equals("http") ? WebHdfsFileSystem.TOKEN_KIND : SWebHdfsFileSystem.TOKEN_KIND; t.setKind(kind); return t; }
@Test public void testSWebHdfsCustomDefaultPorts() throws IOException { URI uri = URI.create("swebhdfs://localhost"); SWebHdfsFileSystem fs = (SWebHdfsFileSystem) FileSystem.get(uri, conf); assertEquals(456, fs.getDefaultPort()); assertEquals(uri, fs.getUri()); assertEquals("127.0.0.1:456", fs.getCanonicalServiceName()); }
@Test public void testSwebHdfsCustomUriPortWithCustomDefaultPorts() throws IOException { URI uri = URI.create("swebhdfs://localhost:789"); SWebHdfsFileSystem fs = (SWebHdfsFileSystem) FileSystem.get(uri, conf); assertEquals(456, fs.getDefaultPort()); assertEquals(uri, fs.getUri()); assertEquals("127.0.0.1:789", fs.getCanonicalServiceName()); }
/** * Returns a new {@link SWebHdfsFileSystem}, with the given configuration. * * @param conf configuration * @return new SWebHdfsFileSystem */ private static SWebHdfsFileSystem createSWebHdfsFileSystem( Configuration conf) { SWebHdfsFileSystem fs = new SWebHdfsFileSystem(); fs.setConf(conf); return fs; }
@Test @TestDir public void testManagementOperationsSWebHdfsFileSystem() throws Exception { try { System.setProperty(HttpFSServerWebApp.NAME + ServerWebApp.SSL_ENABLED, "true"); testManagementOperations(SWebHdfsFileSystem.TOKEN_KIND); } finally { System.getProperties().remove(HttpFSServerWebApp.NAME + ServerWebApp.SSL_ENABLED); } }
private Token<? extends TokenIdentifier> generateDelegationToken( final NameNode namenode, final UserGroupInformation ugi, final String renewer) throws IOException { final Credentials c = DelegationTokenSecretManager.createCredentials( namenode, ugi, renewer != null? renewer: ugi.getShortUserName()); final Token<? extends TokenIdentifier> t = c.getAllTokens().iterator().next(); Text kind = request.getScheme().equals("http") ? WebHdfsFileSystem.TOKEN_KIND : SWebHdfsFileSystem.TOKEN_KIND; t.setKind(kind); return t; }
@Override protected Class getFileSystemClass() { return SWebHdfsFileSystem.class; }
@Override public Text getKind() { return SWebHdfsFileSystem.TOKEN_KIND; }
private Response put( final InputStream in, final String nnId, final String fullpath, final PutOpParam op, final PermissionParam permission, final OverwriteParam overwrite, final BufferSizeParam bufferSize, final ReplicationParam replication, final BlockSizeParam blockSize ) throws IOException, URISyntaxException { final DataNode datanode = (DataNode)context.getAttribute("datanode"); switch(op.getValue()) { case CREATE: { final Configuration conf = new Configuration(datanode.getConf()); conf.set(FsPermission.UMASK_LABEL, "000"); final int b = bufferSize.getValue(conf); DFSClient dfsclient = newDfsClient(nnId, conf); FSDataOutputStream out = null; try { out = dfsclient.createWrappedOutputStream(dfsclient.create( fullpath, permission.getFsPermission(), overwrite.getValue() ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE), replication.getValue(conf), blockSize.getValue(conf), null, b, null), null); IOUtils.copyBytes(in, out, b); out.close(); out = null; dfsclient.close(); dfsclient = null; } finally { IOUtils.cleanup(LOG, out); IOUtils.cleanup(LOG, dfsclient); } final String scheme = "http".equals(request.getScheme()) ? WebHdfsFileSystem.SCHEME : SWebHdfsFileSystem.SCHEME; final URI uri = new URI(scheme, nnId, fullpath, null, null); return Response.created(uri).type(MediaType.APPLICATION_OCTET_STREAM).build(); } default: throw new UnsupportedOperationException(op + " is not supported"); } }
private Response put( final InputStream in, final String nnId, final String fullpath, final PutOpParam op, final PermissionParam permission, final OverwriteParam overwrite, final BufferSizeParam bufferSize, final ReplicationParam replication, final BlockSizeParam blockSize ) throws IOException, URISyntaxException { final DataNode datanode = (DataNode)context.getAttribute("datanode"); switch(op.getValue()) { case CREATE: { final Configuration conf = new Configuration(datanode.getConf()); conf.set(FsPermission.UMASK_LABEL, "000"); final int b = bufferSize.getValue(conf); DFSClient dfsclient = newDfsClient(nnId, conf); FSDataOutputStream out = null; try { out = new FSDataOutputStream(dfsclient.create( fullpath, permission.getFsPermission(), overwrite.getValue() ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE), replication.getValue(conf), blockSize.getValue(conf), null, b, null), null); IOUtils.copyBytes(in, out, b); out.close(); out = null; dfsclient.close(); dfsclient = null; } finally { IOUtils.cleanup(LOG, out); IOUtils.cleanup(LOG, dfsclient); } final String scheme = "http".equals(request.getScheme()) ? WebHdfsFileSystem.SCHEME : SWebHdfsFileSystem.SCHEME; final URI uri = new URI(scheme, nnId, fullpath, null, null); return Response.created(uri).type(MediaType.APPLICATION_OCTET_STREAM).build(); } default: throw new UnsupportedOperationException(op + " is not supported"); } }