@Test public void testRelease_ExistingEntry_NonExistantAppId() throws Exception { // Pre-populate the SCM with one cache entry store.addResource("key1", "foo.jar"); store.addResourceReference("key1", new SharedCacheResourceReference(createAppId(1, 1L), "user")); assertEquals(1, store.getResourceReferences("key1").size()); long releases = ClientSCMMetrics.getInstance().getCacheReleases(); ReleaseSharedCacheResourceRequest request = recordFactory .newRecordInstance(ReleaseSharedCacheResourceRequest.class); request.setResourceKey("key1"); request.setAppId(createAppId(2, 2L)); clientSCMProxy.release(request); assertEquals(1, store.getResourceReferences("key1").size()); assertEquals( "Client SCM metrics were updated when a release did not happen", 0, ClientSCMMetrics.getInstance().getCacheReleases() - releases); }
@Test public void testRelease_ExistingEntry_WithAppId() throws Exception { // Pre-populate the SCM with one cache entry store.addResource("key1", "foo.jar"); UserGroupInformation testUGI = UserGroupInformation.getCurrentUser(); store.addResourceReference("key1", new SharedCacheResourceReference(createAppId(1, 1L), testUGI.getShortUserName())); assertEquals(1, store.getResourceReferences("key1").size()); long releases = ClientSCMMetrics.getInstance().getCacheReleases(); ReleaseSharedCacheResourceRequest request = recordFactory .newRecordInstance(ReleaseSharedCacheResourceRequest.class); request.setResourceKey("key1"); request.setAppId(createAppId(1, 1L)); clientSCMProxy.release(request); assertEquals(0, store.getResourceReferences("key1").size()); assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics .getInstance().getCacheReleases() - releases); }
/**
 * Releasing a key that was never added to the store must succeed quietly:
 * the store reports a non-null, empty reference collection for the key and
 * the cache-release metric stays unchanged.
 */
@Test
public void testRelease_MissingEntry() throws Exception {
  // Snapshot the counter so the check below is relative, not absolute.
  long releasesBefore = ClientSCMMetrics.getInstance().getCacheReleases();

  // "key2" is not populated by this test before the release is issued.
  ReleaseSharedCacheResourceRequest releaseRequest =
      recordFactory.newRecordInstance(ReleaseSharedCacheResourceRequest.class);
  releaseRequest.setResourceKey("key2");
  releaseRequest.setAppId(createAppId(2, 2L));
  clientSCMProxy.release(releaseRequest);

  assertNotNull(store.getResourceReferences("key2"));
  assertEquals(0, store.getResourceReferences("key2").size());
  long releaseDelta =
      ClientSCMMetrics.getInstance().getCacheReleases() - releasesBefore;
  assertEquals(
      "Client SCM metrics were updated when a release did not happen.",
      0, releaseDelta);
}
@Override public void release(ApplicationId applicationId, String resourceKey) throws YarnException { ReleaseSharedCacheResourceRequest request = Records.newRecord( ReleaseSharedCacheResourceRequest.class); request.setAppId(applicationId); request.setResourceKey(resourceKey); try { // We do not care about the response because it is empty. this.scmClient.release(request); } catch (Exception e) { // Just catching IOException isn't enough. // RPC call can throw ConnectionException. throw new YarnException(e); } }
/**
 * Forwards a release request through the protobuf proxy and wraps the
 * returned proto in a record implementation.
 *
 * @param request the release request; it is cast to
 *          {@link ReleaseSharedCacheResourceRequestPBImpl}
 * @return the response wrapping the proto returned by the proxy
 * @throws YarnException declared for failures surfaced while unwrapping a
 *           {@link ServiceException}
 * @throws IOException declared for failures surfaced while unwrapping a
 *           {@link ServiceException}
 */
@Override
public ReleaseSharedCacheResourceResponse release(
    ReleaseSharedCacheResourceRequest request)
    throws YarnException, IOException {
  ReleaseSharedCacheResourceRequestPBImpl pbRequest =
      (ReleaseSharedCacheResourceRequestPBImpl) request;
  ReleaseSharedCacheResourceRequestProto proto = pbRequest.getProto();
  try {
    return new ReleaseSharedCacheResourceResponsePBImpl(
        proxy.release(null, proto));
  } catch (ServiceException serviceFailure) {
    RPCUtil.unwrapAndThrowException(serviceFailure);
    // NOTE(review): the helper above presumably always throws; this return
    // is what the method yields if it ever returns normally.
    return null;
  }
}
/**
 * Removes the calling user's reference to a resource from the store and
 * records a cache release in the metrics when a reference was actually
 * removed.
 *
 * @param request carries the resource key and the releasing application id
 * @return a freshly created response record
 * @throws YarnException if the current user's UGI cannot be obtained (the
 *           underlying {@link IOException} is converted to a remote
 *           exception)
 * @throws IOException declared by the interface
 */
@Override
public ReleaseSharedCacheResourceResponse release(
    ReleaseSharedCacheResourceRequest request)
    throws YarnException, IOException {
  ReleaseSharedCacheResourceResponse emptyResponse =
      recordFactory.newRecordInstance(ReleaseSharedCacheResourceResponse.class);

  // The reference is keyed on the caller's short user name.
  UserGroupInformation callerUGI;
  try {
    callerUGI = UserGroupInformation.getCurrentUser();
  } catch (IOException ugiFailure) {
    LOG.info("Error getting UGI ", ugiFailure);
    throw RPCUtil.getRemoteException(ugiFailure);
  }

  String resourceKey = request.getResourceKey();
  SharedCacheResourceReference callerReference =
      new SharedCacheResourceReference(
          request.getAppId(), callerUGI.getShortUserName());

  // Only count the release when the store reports that it removed something.
  if (this.store.removeResourceReference(resourceKey, callerReference, true)) {
    this.metrics.incCacheRelease();
  }
  return emptyResponse;
}
@Test public void testRelease() throws Exception { // Release does not care about the return value because it is empty when(cProtocol.release(isA(ReleaseSharedCacheResourceRequest.class))) .thenReturn(null); client.release(mock(ApplicationId.class), "key"); }
/**
 * An {@link IOException} thrown by the protocol stub must reach the caller
 * of the client as a {@link YarnException}.
 */
@Test(expected = YarnException.class)
public void testReleaseError() throws Exception {
  IOException stubbedFailure = new IOException("Mock IOExcepiton!");
  when(cProtocol.release(isA(ReleaseSharedCacheResourceRequest.class)))
      .thenThrow(stubbedFailure);

  ApplicationId mockedAppId = mock(ApplicationId.class);
  client.release(mockedAppId, "key");
}
/**
 * <p>
 * The interface used by clients to release a resource with the
 * <code>SharedCacheManager</code>. This method is called once an application
 * is no longer using a claimed resource in the shared cache. The client uses
 * a checksum to identify the resource and an {@link ApplicationId} to
 * identify which application is releasing the resource.
 * </p>
 *
 * <p>
 * Note: This method is an optimization and the client is not required to call
 * it for correctness.
 * </p>
 *
 * <p>
 * Currently the <code>SharedCacheManager</code> sends an empty response.
 * </p>
 *
 * @param request request to release a resource in the shared cache
 * @return (empty) response on releasing the resource
 * @throws YarnException if the release fails at the YARN level; the exact
 *           conditions are implementation-specific
 * @throws IOException if the release fails due to an I/O error; the exact
 *           conditions are implementation-specific
 */
public ReleaseSharedCacheResourceResponse release(
    ReleaseSharedCacheResourceRequest request) throws YarnException, IOException;