private static Token<DelegationTokenIdentifier> createToken(URI serviceUri) { byte[] pw = "hadoop".getBytes(); byte[] ident = new DelegationTokenIdentifier(new Text("owner"), new Text( "renewer"), new Text("realuser")).getBytes(); Text service = new Text(serviceUri.toString()); return new Token<DelegationTokenIdentifier>(ident, pw, HftpFileSystem.TOKEN_KIND, service); }
/** * @return a {@link HftpFileSystem} object. */ public HftpFileSystem getHftpFileSystem(int nnIndex) throws IOException { String uri = "hftp://" + nameNodes[nnIndex].conf .get(DFS_NAMENODE_HTTP_ADDRESS_KEY); try { return (HftpFileSystem)FileSystem.get(new URI(uri), conf); } catch (URISyntaxException e) { throw new IOException(e); } }
/** * @return a {@link HftpFileSystem} object as specified user. */ public HftpFileSystem getHftpFileSystemAs(final String username, final Configuration conf, final int nnIndex, final String... groups) throws IOException, InterruptedException { final UserGroupInformation ugi = UserGroupInformation.createUserForTesting( username, groups); return ugi.doAs(new PrivilegedExceptionAction<HftpFileSystem>() { @Override public HftpFileSystem run() throws Exception { return getHftpFileSystem(nnIndex); } }); }
static public Credentials getDTfromRemote(String nnAddr, String renewer) throws IOException { DataInputStream dis = null; InetSocketAddress serviceAddr = NetUtils.createSocketAddr(nnAddr); try { StringBuffer url = new StringBuffer(); if (renewer != null) { url.append(nnAddr).append(GetDelegationTokenServlet.PATH_SPEC) .append("?").append(GetDelegationTokenServlet.RENEWER).append("=") .append(renewer); } else { url.append(nnAddr).append(GetDelegationTokenServlet.PATH_SPEC); } URL remoteURL = new URL(url.toString()); URLConnection connection = SecurityUtil2.openSecureHttpConnection(remoteURL); InputStream in = connection.getInputStream(); Credentials ts = new Credentials(); dis = new DataInputStream(in); ts.readFields(dis); for (Token<?> token : ts.getAllTokens()) { token.setKind(HftpFileSystem.TOKEN_KIND); SecurityUtil2.setTokenService(token, serviceAddr); } return ts; } catch (Exception e) { throw new IOException("Unable to obtain remote token", e); } finally { if (dis != null) { dis.close(); } } }
@BeforeClass public static void setUp() throws IOException { ((Log4JLogger) HftpFileSystem.LOG).getLogger().setLevel(Level.ALL); final long seed = RAN.nextLong(); System.out.println("seed=" + seed); RAN.setSeed(seed); config = new Configuration(); cluster = new MiniDFSCluster.Builder(config).numDataNodes(2).build(); blockPoolId = cluster.getNamesystem().getBlockPoolId(); hftpUri = "hftp://" + config.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY); }
@Before public void initFileSystems() throws IOException { hdfs = cluster.getFileSystem(); hftpFs = (HftpFileSystem) new Path(hftpUri).getFileSystem(config); // clear out the namespace for (FileStatus stat : hdfs.listStatus(new Path("/"))) { hdfs.delete(stat.getPath(), true); } }
@Test public void testHftpDefaultPorts() throws IOException { Configuration conf = new Configuration(); URI uri = URI.create("hftp://localhost"); HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf); assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, fs.getDefaultPort()); assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultSecurePort()); assertEquals(uri, fs.getUri()); assertEquals("127.0.0.1:" + DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getCanonicalServiceName()); }
@Test public void testHftpCustomDefaultPorts() throws IOException { Configuration conf = new Configuration(); conf.setInt("dfs.http.port", 123); conf.setInt("dfs.https.port", 456); URI uri = URI.create("hftp://localhost"); HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf); assertEquals(123, fs.getDefaultPort()); assertEquals(456, fs.getDefaultSecurePort()); assertEquals(uri, fs.getUri()); assertEquals("127.0.0.1:456", fs.getCanonicalServiceName()); }
@Test public void testHftpCustomUriPortWithDefaultPorts() throws IOException { Configuration conf = new Configuration(); URI uri = URI.create("hftp://localhost:123"); HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf); assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, fs.getDefaultPort()); assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultSecurePort()); assertEquals(uri, fs.getUri()); assertEquals("127.0.0.1:" + DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getCanonicalServiceName()); }
@Test public void testHftpCustomUriPortWithCustomDefaultPorts() throws IOException { Configuration conf = new Configuration(); conf.setInt("dfs.http.port", 123); conf.setInt("dfs.https.port", 456); URI uri = URI.create("hftp://localhost:789"); HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf); assertEquals(123, fs.getDefaultPort()); assertEquals(456, fs.getDefaultSecurePort()); assertEquals(uri, fs.getUri()); assertEquals("127.0.0.1:456", fs.getCanonicalServiceName()); }
@Test public void testHdfsDelegationToken() throws Exception { SecurityUtilTestHelper.setTokenServiceUseIp(true); final Configuration conf = new Configuration(); conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos"); UserGroupInformation.setConfiguration(conf); UserGroupInformation user = UserGroupInformation .createUserForTesting("oom", new String[]{"memory"}); Token<?> token = new Token<>(new byte[0], new byte[0], DelegationTokenIdentifier.HDFS_DELEGATION_KIND, new Text("127.0.0.1:8020")); user.addToken(token); Token<?> token2 = new Token<>(null, null, new Text("other token"), new Text("127.0.0.1:8021")); user.addToken(token2); assertEquals("wrong tokens in user", 2, user.getTokens().size()); FileSystem fs = user.doAs(new PrivilegedExceptionAction<FileSystem>() { @Override public FileSystem run() throws Exception { return FileSystem.get(new URI("hftp://localhost:50470/"), conf); } }); assertSame("wrong kind of file system", HftpFileSystem.class, fs.getClass()); Field renewToken = HftpFileSystem.class.getDeclaredField("renewToken"); renewToken.setAccessible(true); assertSame("wrong token", token, renewToken.get(fs)); }
/** * @return a {@link HftpFileSystem} object. */ public HftpFileSystem getHftpFileSystem(int nnIndex) throws IOException { String uri = "hftp://" + nameNodes[nnIndex].conf.get(DFS_NAMENODE_HTTP_ADDRESS_KEY); try { return (HftpFileSystem) FileSystem.get(new URI(uri), conf); } catch (URISyntaxException e) { throw new IOException(e); } }
/** * @return a {@link HftpFileSystem} object as specified user. */ public HftpFileSystem getHftpFileSystemAs(final String username, final Configuration conf, final int nnIndex, final String... groups) throws IOException, InterruptedException { final UserGroupInformation ugi = UserGroupInformation.createUserForTesting(username, groups); return ugi.doAs(new PrivilegedExceptionAction<HftpFileSystem>() { @Override public HftpFileSystem run() throws Exception { return getHftpFileSystem(nnIndex); } }); }
@Override protected SimpleDateFormat initialValue() { return HftpFileSystem.getDateFormat(); }
static public Credentials getDTfromRemote(URLConnectionFactory factory, URI nnUri, String renewer, String proxyUser) throws IOException { StringBuilder buf = new StringBuilder(nnUri.toString()) .append(GetDelegationTokenServlet.PATH_SPEC); String separator = "?"; if (renewer != null) { buf.append("?").append(GetDelegationTokenServlet.RENEWER).append("=") .append(renewer); separator = "&"; } if (proxyUser != null) { buf.append(separator).append("doas=").append(proxyUser); } boolean isHttps = nnUri.getScheme().equals("https"); HttpURLConnection conn = null; DataInputStream dis = null; InetSocketAddress serviceAddr = NetUtils.createSocketAddr(nnUri .getAuthority()); try { if(LOG.isDebugEnabled()) { LOG.debug("Retrieving token from: " + buf); } conn = run(factory, new URL(buf.toString())); InputStream in = conn.getInputStream(); Credentials ts = new Credentials(); dis = new DataInputStream(in); ts.readFields(dis); for (Token<?> token : ts.getAllTokens()) { token.setKind(isHttps ? HsftpFileSystem.TOKEN_KIND : HftpFileSystem.TOKEN_KIND); SecurityUtil.setTokenService(token, serviceAddr); } return ts; } catch (Exception e) { throw new IOException("Unable to obtain remote token", e); } finally { IOUtils.cleanup(LOG, dis); if (conn != null) { conn.disconnect(); } } }
private void checkTokenSelection(HftpFileSystem fs, int port, Configuration conf) throws IOException { UserGroupInformation ugi = UserGroupInformation .createUserForTesting(fs.getUri().getAuthority(), new String[]{}); // use ip-based tokens SecurityUtilTestHelper.setTokenServiceUseIp(true); // test fallback to hdfs token Token<?> hdfsToken = new Token<>(new byte[0], new byte[0], DelegationTokenIdentifier.HDFS_DELEGATION_KIND, new Text("127.0.0.1:8020")); ugi.addToken(hdfsToken); // test fallback to hdfs token Token<?> token = fs.selectDelegationToken(ugi); assertNotNull(token); assertEquals(hdfsToken, token); // test hftp is favored over hdfs Token<?> hftpToken = new Token<>(new byte[0], new byte[0], HftpFileSystem.TOKEN_KIND, new Text("127.0.0.1:" + port)); ugi.addToken(hftpToken); token = fs.selectDelegationToken(ugi); assertNotNull(token); assertEquals(hftpToken, token); // switch to using host-based tokens, no token should match SecurityUtilTestHelper.setTokenServiceUseIp(false); token = fs.selectDelegationToken(ugi); assertNull(token); // test fallback to hdfs token hdfsToken = new Token<>(new byte[0], new byte[0], DelegationTokenIdentifier.HDFS_DELEGATION_KIND, new Text("localhost:8020")); ugi.addToken(hdfsToken); token = fs.selectDelegationToken(ugi); assertNotNull(token); assertEquals(hdfsToken, token); // test hftp is favored over hdfs hftpToken = new Token<>(new byte[0], new byte[0], HftpFileSystem.TOKEN_KIND, new Text("localhost:" + port)); ugi.addToken(hftpToken); token = fs.selectDelegationToken(ugi); assertNotNull(token); assertEquals(hftpToken, token); }
@Test public void testPropagatedClose() throws IOException { ByteRangeInputStream brs = spy(new HftpFileSystem.RangeHeaderInputStream(new URL("http://test/"))); InputStream mockStream = mock(InputStream.class); doReturn(mockStream).when(brs).openInputStream(); int brisOpens = 0; int brisCloses = 0; int isCloses = 0; // first open, shouldn't close underlying stream brs.getInputStream(); verify(brs, times(++brisOpens)).openInputStream(); verify(brs, times(brisCloses)).close(); verify(mockStream, times(isCloses)).close(); // stream is open, shouldn't close underlying stream brs.getInputStream(); verify(brs, times(brisOpens)).openInputStream(); verify(brs, times(brisCloses)).close(); verify(mockStream, times(isCloses)).close(); // seek forces a reopen, should close underlying stream brs.seek(1); brs.getInputStream(); verify(brs, times(++brisOpens)).openInputStream(); verify(brs, times(brisCloses)).close(); verify(mockStream, times(++isCloses)).close(); // verify that the underlying stream isn't closed after a seek // ie. the state was correctly updated brs.getInputStream(); verify(brs, times(brisOpens)).openInputStream(); verify(brs, times(brisCloses)).close(); verify(mockStream, times(isCloses)).close(); // seeking to same location should be a no-op brs.seek(1); brs.getInputStream(); verify(brs, times(brisOpens)).openInputStream(); verify(brs, times(brisCloses)).close(); verify(mockStream, times(isCloses)).close(); // close should of course close brs.close(); verify(brs, times(++brisCloses)).close(); verify(mockStream, times(++isCloses)).close(); // it's already closed, underlying stream should not close brs.close(); verify(brs, times(++brisCloses)).close(); verify(mockStream, times(isCloses)).close(); // it's closed, don't reopen it boolean errored = false; try { brs.getInputStream(); } catch (IOException e) { errored = true; assertEquals("Stream closed", e.getMessage()); } finally { assertTrue("Read a closed steam", errored); } verify(brs, times(brisOpens)).openInputStream(); verify(brs, times(brisCloses)).close(); verify(mockStream, times(isCloses)).close(); }
@Test public void testPropagatedClose() throws IOException { URLConnectionFactory factory = mock(URLConnectionFactory.class); ByteRangeInputStream brs = spy(new HftpFileSystem.RangeHeaderInputStream( factory, new URL("http://test/"))); InputStream mockStream = mock(InputStream.class); doReturn(mockStream).when(brs).openInputStream(); int brisOpens = 0; int brisCloses = 0; int isCloses = 0; // first open, shouldn't close underlying stream brs.getInputStream(); verify(brs, times(++brisOpens)).openInputStream(); verify(brs, times(brisCloses)).close(); verify(mockStream, times(isCloses)).close(); // stream is open, shouldn't close underlying stream brs.getInputStream(); verify(brs, times(brisOpens)).openInputStream(); verify(brs, times(brisCloses)).close(); verify(mockStream, times(isCloses)).close(); // seek forces a reopen, should close underlying stream brs.seek(1); brs.getInputStream(); verify(brs, times(++brisOpens)).openInputStream(); verify(brs, times(brisCloses)).close(); verify(mockStream, times(++isCloses)).close(); // verify that the underlying stream isn't closed after a seek // ie. the state was correctly updated brs.getInputStream(); verify(brs, times(brisOpens)).openInputStream(); verify(brs, times(brisCloses)).close(); verify(mockStream, times(isCloses)).close(); // seeking to same location should be a no-op brs.seek(1); brs.getInputStream(); verify(brs, times(brisOpens)).openInputStream(); verify(brs, times(brisCloses)).close(); verify(mockStream, times(isCloses)).close(); // close should of course close brs.close(); verify(brs, times(++brisCloses)).close(); verify(mockStream, times(++isCloses)).close(); // it's already closed, underlying stream should not close brs.close(); verify(brs, times(++brisCloses)).close(); verify(mockStream, times(isCloses)).close(); // it's closed, don't reopen it boolean errored = false; try { brs.getInputStream(); } catch (IOException e) { errored = true; assertEquals("Stream closed", e.getMessage()); } finally { assertTrue("Read a closed steam", errored); } verify(brs, times(brisOpens)).openInputStream(); verify(brs, times(brisCloses)).close(); verify(mockStream, times(isCloses)).close(); }