private void init(final UserGroupInformation ugi, final DelegationParam delegation, final String nnId, final UriFsPathParam path, final HttpOpParam<?> op, final Param<?, ?>... parameters) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path + ", ugi=" + ugi + Param.toSortedString(", ", parameters)); } if (nnId == null) { throw new IllegalArgumentException(NamenodeAddressParam.NAME + " is not specified."); } //clear content type response.setContentType(null); if (UserGroupInformation.isSecurityEnabled()) { //add a token for RPC. final Token<DelegationTokenIdentifier> token = deserializeToken (delegation.getValue(), nnId); ugi.addToken(token); } }
/**
 * Handle HTTP POST request for the root.
 * Forwards every argument to the path-based POST handler, using
 * {@code ROOT} as the path.
 */
@POST
@Path("/")
@Consumes({"*/*"})
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response postRoot(
    final InputStream in,
    @Context final UserGroupInformation ugi,
    @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
        final DelegationParam delegation,
    @QueryParam(NamenodeAddressParam.NAME)
    @DefaultValue(NamenodeAddressParam.DEFAULT)
        final NamenodeAddressParam namenode,
    @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
        final PostOpParam op,
    @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
        final BufferSizeParam bufferSize
    ) throws IOException, InterruptedException {
  final Response rootResponse =
      post(in, ugi, delegation, namenode, ROOT, op, bufferSize);
  return rootResponse;
}
/**
 * Handle HTTP GET request for the root.
 * Forwards every argument to the path-based GET handler, using
 * {@code ROOT} as the path.
 */
@GET
@Path("/")
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response getRoot(
    @Context final UserGroupInformation ugi,
    @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
        final DelegationParam delegation,
    @QueryParam(NamenodeAddressParam.NAME)
    @DefaultValue(NamenodeAddressParam.DEFAULT)
        final NamenodeAddressParam namenode,
    @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
        final GetOpParam op,
    @QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
        final OffsetParam offset,
    @QueryParam(LengthParam.NAME) @DefaultValue(LengthParam.DEFAULT)
        final LengthParam length,
    @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
        final BufferSizeParam bufferSize
    ) throws IOException, InterruptedException {
  final Response rootResponse =
      get(ugi, delegation, namenode, ROOT, op, offset, length, bufferSize);
  return rootResponse;
}
/**
 * Parses a request URI that carries a logical namenode name together with
 * an encoded delegation token, and checks that the token returned by the
 * parser is reported as a token for a logical URI.
 */
@Test
public void testDeserializeHAToken() throws IOException {
  final Configuration haConf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME);
  final Token<DelegationTokenIdentifier> emptyToken =
      new Token<DelegationTokenIdentifier>();

  // Request URI of the form <prefix>/?<nn-param>=<name>&<token-param>=<token>
  final String requestUri = WebHdfsHandler.WEBHDFS_PREFIX + "/?"
      + NamenodeAddressParam.NAME + "=" + LOGICAL_NAME + "&"
      + DelegationParam.NAME + "=" + emptyToken.encodeToUrlString();

  final ParameterParser parser =
      new ParameterParser(new QueryStringDecoder(requestUri), haConf);
  final Token<DelegationTokenIdentifier> parsedToken =
      parser.delegationToken();

  Assert.assertTrue(HAUtil.isTokenForLogicalUri(parsedToken));
}
/**
 * Parses a request URI that carries a logical namenode name together with
 * an encoded delegation token, and checks that the token returned by the
 * parser is reported as a token for a logical URI.
 */
@Test
public void testDeserializeHAToken() throws IOException {
  final Configuration haConf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME);
  final Token<DelegationTokenIdentifier> emptyToken =
      new Token<DelegationTokenIdentifier>();

  // Request URI of the form <prefix>/?<nn-param>=<name>&<token-param>=<token>
  final String requestUri = WebHdfsHandler.WEBHDFS_PREFIX + "/?"
      + NamenodeAddressParam.NAME + "=" + LOGICAL_NAME + "&"
      + DelegationParam.NAME + "=" + emptyToken.encodeToUrlString();

  final ParameterParser parser =
      new ParameterParser(new QueryStringDecoder(requestUri), haConf);
  final Token<DelegationTokenIdentifier> parsedToken =
      parser.delegationToken();

  Assert.assertTrue(HAUtilClient.isTokenForLogicalUri(parsedToken));
}
/**
 * Handle HTTP PUT request for the root.
 * Forwards every argument to the path-based PUT handler, using
 * {@code ROOT} as the path.
 */
@PUT
@Path("/")
@Consumes({"*/*"})
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response putRoot(
    final InputStream in,
    @Context final UserGroupInformation ugi,
    @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
        final DelegationParam delegation,
    @QueryParam(NamenodeAddressParam.NAME)
    @DefaultValue(NamenodeAddressParam.DEFAULT)
        final NamenodeAddressParam namenode,
    @QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
        final PutOpParam op,
    @QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT)
        final PermissionParam permission,
    @QueryParam(OverwriteParam.NAME) @DefaultValue(OverwriteParam.DEFAULT)
        final OverwriteParam overwrite,
    @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
        final BufferSizeParam bufferSize,
    @QueryParam(ReplicationParam.NAME)
    @DefaultValue(ReplicationParam.DEFAULT)
        final ReplicationParam replication,
    @QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
        final BlockSizeParam blockSize
    ) throws IOException, InterruptedException {
  final Response rootResponse = put(in, ugi, delegation, namenode, ROOT, op,
      permission, overwrite, bufferSize, replication, blockSize);
  return rootResponse;
}
/**
 * Handle HTTP PUT request.
 * Runs the common request setup and then executes the PUT operation as
 * the given user via {@code ugi.doAs}.
 */
@PUT
@Path("{" + UriFsPathParam.NAME + ":.*}")
@Consumes({"*/*"})
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response put(
    final InputStream in,
    @Context final UserGroupInformation ugi,
    @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
        final DelegationParam delegation,
    @QueryParam(NamenodeAddressParam.NAME)
    @DefaultValue(NamenodeAddressParam.DEFAULT)
        final NamenodeAddressParam namenode,
    @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
    @QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
        final PutOpParam op,
    @QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT)
        final PermissionParam permission,
    @QueryParam(OverwriteParam.NAME) @DefaultValue(OverwriteParam.DEFAULT)
        final OverwriteParam overwrite,
    @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
        final BufferSizeParam bufferSize,
    @QueryParam(ReplicationParam.NAME)
    @DefaultValue(ReplicationParam.DEFAULT)
        final ReplicationParam replication,
    @QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
        final BlockSizeParam blockSize
    ) throws IOException, InterruptedException {
  final String namenodeId = namenode.getValue();
  init(ugi, delegation, namenodeId, path, op, permission, overwrite,
      bufferSize, replication, blockSize);

  // The actual work is deferred so that it runs under the caller's UGI.
  final PrivilegedExceptionAction<Response> putAction =
      new PrivilegedExceptionAction<Response>() {
        @Override
        public Response run() throws IOException, URISyntaxException {
          return put(in, namenodeId, path.getAbsolutePath(), op, permission,
              overwrite, bufferSize, replication, blockSize);
        }
      };
  return ugi.doAs(putAction);
}
/**
 * Handle HTTP POST request.
 * Runs the common request setup and then executes the POST operation as
 * the given user via {@code ugi.doAs}.
 */
@POST
@Path("{" + UriFsPathParam.NAME + ":.*}")
@Consumes({"*/*"})
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response post(
    final InputStream in,
    @Context final UserGroupInformation ugi,
    @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
        final DelegationParam delegation,
    @QueryParam(NamenodeAddressParam.NAME)
    @DefaultValue(NamenodeAddressParam.DEFAULT)
        final NamenodeAddressParam namenode,
    @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
    @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
        final PostOpParam op,
    @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
        final BufferSizeParam bufferSize
    ) throws IOException, InterruptedException {
  final String namenodeId = namenode.getValue();
  init(ugi, delegation, namenodeId, path, op, bufferSize);

  // The actual work is deferred so that it runs under the caller's UGI.
  final PrivilegedExceptionAction<Response> postAction =
      new PrivilegedExceptionAction<Response>() {
        @Override
        public Response run() throws IOException {
          return post(in, namenodeId, path.getAbsolutePath(), op,
              bufferSize);
        }
      };
  return ugi.doAs(postAction);
}
/**
 * Handle HTTP GET request.
 * Runs the common request setup and then executes the GET operation as
 * the given user via {@code ugi.doAs}.
 */
@GET
@Path("{" + UriFsPathParam.NAME + ":.*}")
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response get(
    @Context final UserGroupInformation ugi,
    @QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
        final DelegationParam delegation,
    @QueryParam(NamenodeAddressParam.NAME)
    @DefaultValue(NamenodeAddressParam.DEFAULT)
        final NamenodeAddressParam namenode,
    @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
    @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
        final GetOpParam op,
    @QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
        final OffsetParam offset,
    @QueryParam(LengthParam.NAME) @DefaultValue(LengthParam.DEFAULT)
        final LengthParam length,
    @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
        final BufferSizeParam bufferSize
    ) throws IOException, InterruptedException {
  final String namenodeId = namenode.getValue();
  init(ugi, delegation, namenodeId, path, op, offset, length, bufferSize);

  // The actual work is deferred so that it runs under the caller's UGI.
  final PrivilegedExceptionAction<Response> getAction =
      new PrivilegedExceptionAction<Response>() {
        @Override
        public Response run() throws IOException {
          return get(namenodeId, path.getAbsolutePath(), op, offset, length,
              bufferSize);
        }
      };
  return ugi.doAs(getAction);
}
/**
 * Looks up the request parameter named {@code NamenodeAddressParam.NAME},
 * wraps it in a {@code NamenodeAddressParam} and returns that parameter's
 * value.
 */
String namenodeId() {
  final NamenodeAddressParam addressParam =
      new NamenodeAddressParam(param(NamenodeAddressParam.NAME));
  return addressParam.getValue();
}
@Test public void testUGICacheSecure() throws Exception { // fake turning on security so api thinks it should use tokens SecurityUtil.setAuthenticationMethod(KERBEROS, conf); UserGroupInformation.setConfiguration(conf); UserGroupInformation ugi = UserGroupInformation .createRemoteUser("test-user"); ugi.setAuthenticationMethod(KERBEROS); ugi = UserGroupInformation.createProxyUser("test-proxy-user", ugi); UserGroupInformation.setLoginUser(ugi); List<Token<DelegationTokenIdentifier>> tokens = Lists.newArrayList(); getWebHdfsFileSystem(ugi, conf, tokens); String uri1 = WebHdfsFileSystem.PATH_PREFIX + PATH + "?op=OPEN" + Param.toSortedString("&", new NamenodeAddressParam("127.0.0.1:1010"), new OffsetParam((long) OFFSET), new LengthParam((long) LENGTH), new DelegationParam(tokens.get(0).encodeToUrlString())); String uri2 = WebHdfsFileSystem.PATH_PREFIX + PATH + "?op=OPEN" + Param.toSortedString("&", new NamenodeAddressParam("127.0.0.1:1010"), new OffsetParam((long) OFFSET), new LengthParam((long) LENGTH), new DelegationParam(tokens.get(1).encodeToUrlString())); DataNodeUGIProvider ugiProvider1 = new DataNodeUGIProvider( new ParameterParser(new QueryStringDecoder(URI.create(uri1)), conf)); UserGroupInformation ugi11 = ugiProvider1.ugi(); UserGroupInformation ugi12 = ugiProvider1.ugi(); Assert.assertEquals( "With UGI cache, two UGIs returned by the same token should be same", ugi11, ugi12); DataNodeUGIProvider ugiProvider2 = new DataNodeUGIProvider( new ParameterParser(new QueryStringDecoder(URI.create(uri2)), conf)); UserGroupInformation url21 = ugiProvider2.ugi(); UserGroupInformation url22 = ugiProvider2.ugi(); Assert.assertEquals( "With UGI cache, two UGIs returned by the same token should be same", url21, url22); Assert.assertNotEquals( "With UGI cache, two UGIs for the different token should not be same", ugi11, url22); awaitCacheEmptyDueToExpiration(); ugi12 = ugiProvider1.ugi(); url22 = ugiProvider2.ugi(); String msg = "With cache eviction, two UGIs returned" + " 
by the same token should not be same"; Assert.assertNotEquals(msg, ugi11, ugi12); Assert.assertNotEquals(msg, url21, url22); Assert.assertNotEquals( "With UGI cache, two UGIs for the different token should not be same", ugi11, url22); }
private URI redirectURI(final NameNode namenode, final UserGroupInformation ugi, final DelegationParam delegation, final UserParam username, final DoAsParam doAsUser, final String path, final HttpOpParam.Op op, final long openOffset, final long blocksize, final Param<?, ?>... parameters) throws URISyntaxException, IOException { final DatanodeInfo dn; try { dn = chooseDatanode(namenode, path, op, openOffset, blocksize); } catch (InvalidTopologyException ite) { throw new IOException("Failed to find datanode, suggest to check cluster health.", ite); } final String delegationQuery; if (!UserGroupInformation.isSecurityEnabled()) { //security disabled delegationQuery = Param.toSortedString("&", doAsUser, username); } else if (delegation.getValue() != null) { //client has provided a token delegationQuery = "&" + delegation; } else { //generate a token final Token<? extends TokenIdentifier> t = generateDelegationToken( namenode, ugi, request.getUserPrincipal().getName()); delegationQuery = "&" + new DelegationParam(t.encodeToUrlString()); } final String query = op.toQueryString() + delegationQuery + "&" + new NamenodeAddressParam(namenode) + Param.toSortedString("&", parameters); final String uripath = WebHdfsFileSystem.PATH_PREFIX + path; final String scheme = request.getScheme(); int port = "http".equals(scheme) ? dn.getInfoPort() : dn .getInfoSecurePort(); final URI uri = new URI(scheme, null, dn.getHostName(), port, uripath, query, null); if (LOG.isTraceEnabled()) { LOG.trace("redirectURI=" + uri); } return uri; }