@Test public void testUse_ExistingEntry_NoAppIds() throws Exception { // Pre-populate the SCM with one cache entry store.addResource("key1", "foo.jar"); long hits = ClientSCMMetrics.getInstance().getCacheHits(); UseSharedCacheResourceRequest request = recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class); request.setResourceKey("key1"); request.setAppId(createAppId(2, 2L)); // Expecting default depth of 3 and under the shared cache root dir String expectedPath = testDir.getAbsolutePath() + "/k/e/y/key1/foo.jar"; assertEquals(expectedPath, clientSCMProxy.use(request).getPath()); assertEquals(1, store.getResourceReferences("key1").size()); assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics .getInstance().getCacheHits() - hits); }
@Test public void testUse_ExistingEntry_OneId() throws Exception { // Pre-populate the SCM with one cache entry store.addResource("key1", "foo.jar"); store.addResourceReference("key1", new SharedCacheResourceReference(createAppId(1, 1L), "user")); assertEquals(1, store.getResourceReferences("key1").size()); long hits = ClientSCMMetrics.getInstance().getCacheHits(); // Add a new distinct appId UseSharedCacheResourceRequest request = recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class); request.setResourceKey("key1"); request.setAppId(createAppId(2, 2L)); // Expecting default depth of 3 under the shared cache root dir String expectedPath = testDir.getAbsolutePath() + "/k/e/y/key1/foo.jar"; assertEquals(expectedPath, clientSCMProxy.use(request).getPath()); assertEquals(2, store.getResourceReferences("key1").size()); assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics .getInstance().getCacheHits() - hits); }
@Test public void testUse_ExistingEntry_DupId() throws Exception { // Pre-populate the SCM with one cache entry store.addResource("key1", "foo.jar"); UserGroupInformation testUGI = UserGroupInformation.getCurrentUser(); store.addResourceReference("key1", new SharedCacheResourceReference(createAppId(1, 1L), testUGI.getShortUserName())); assertEquals(1, store.getResourceReferences("key1").size()); long hits = ClientSCMMetrics.getInstance().getCacheHits(); // Add a new duplicate appId UseSharedCacheResourceRequest request = recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class); request.setResourceKey("key1"); request.setAppId(createAppId(1, 1L)); // Expecting default depth of 3 under the shared cache root dir String expectedPath = testDir.getAbsolutePath() + "/k/e/y/key1/foo.jar"; assertEquals(expectedPath, clientSCMProxy.use(request).getPath()); assertEquals(1, store.getResourceReferences("key1").size()); assertEquals("Client SCM metrics aren't updated.", 1, ClientSCMMetrics .getInstance().getCacheHits() - hits); }
@Override public Path use(ApplicationId applicationId, String resourceKey) throws YarnException { Path resourcePath = null; UseSharedCacheResourceRequest request = Records.newRecord( UseSharedCacheResourceRequest.class); request.setAppId(applicationId); request.setResourceKey(resourceKey); try { UseSharedCacheResourceResponse response = this.scmClient.use(request); if (response != null && response.getPath() != null) { resourcePath = new Path(response.getPath()); } } catch (Exception e) { // Just catching IOException isn't enough. // RPC call can throw ConnectionException. // We don't handle different exceptions separately at this point. throw new YarnException(e); } return resourcePath; }
/**
 * Sends the use request to the remote side through {@code proxy} and wraps
 * the returned proto in a {@link UseSharedCacheResourceResponsePBImpl}.
 *
 * @param request the request to send; it is cast to
 *        {@link UseSharedCacheResourceRequestPBImpl} to obtain its proto
 * @return the response wrapping the proto returned by the proxy
 * @throws YarnException declared by the interface; presumably raised when
 *         the {@code ServiceException} is unwrapped
 * @throws IOException declared by the interface; presumably raised when
 *         the {@code ServiceException} is unwrapped
 */
@Override
public UseSharedCacheResourceResponse use(
    UseSharedCacheResourceRequest request) throws YarnException,
    IOException {
  UseSharedCacheResourceRequestPBImpl pbRequest =
      (UseSharedCacheResourceRequestPBImpl) request;
  UseSharedCacheResourceRequestProto wireRequest = pbRequest.getProto();
  try {
    return new UseSharedCacheResourceResponsePBImpl(
        proxy.use(null, wireRequest));
  } catch (ServiceException e) {
    RPCUtil.unwrapAndThrowException(e);
    // NOTE(review): the call above presumably always throws, which would
    // make this return unreachable in practice - confirm against RPCUtil.
    return null;
  }
}
/**
 * Handles a client's claim on a shared cache resource. A reference for the
 * requesting application and the calling user is added to the store under
 * the request's resource key. When the store returns a file name, the
 * response path is set and the cache-hit metric is incremented; a
 * {@code null} file name is counted as a cache miss and the response is
 * returned without a path.
 *
 * @param request carries the resource key and the application id
 * @return the response, with its path set only on a cache hit
 * @throws YarnException if the current user cannot be determined
 * @throws IOException declared by the interface
 */
@Override
public UseSharedCacheResourceResponse use(
    UseSharedCacheResourceRequest request) throws YarnException,
    IOException {
  UseSharedCacheResourceResponse useResponse =
      recordFactory.newRecordInstance(UseSharedCacheResourceResponse.class);

  UserGroupInformation caller;
  try {
    caller = UserGroupInformation.getCurrentUser();
  } catch (IOException ie) {
    LOG.info("Error getting UGI ", ie);
    throw RPCUtil.getRemoteException(ie);
  }

  String resourceKey = request.getResourceKey();
  SharedCacheResourceReference reference =
      new SharedCacheResourceReference(request.getAppId(),
          caller.getShortUserName());
  String cachedFileName =
      this.store.addResourceReference(resourceKey, reference);

  // No file name from the store: record a miss and reply without a path.
  if (cachedFileName == null) {
    this.metrics.incCacheMissCount();
    return useResponse;
  }

  useResponse.setPath(getCacheEntryFilePath(resourceKey, cachedFileName));
  this.metrics.incCacheHitCount();
  return useResponse;
}
/**
 * Claims a resource key that was never added to the store. The response
 * must carry no path and the client cache-miss metric must rise by one.
 */
@Test
public void testUse_MissingEntry() throws Exception {
  // Snapshot the miss counter so the assertion works on a delta.
  final long missesBefore = ClientSCMMetrics.getInstance().getCacheMisses();

  UseSharedCacheResourceRequest useRequest =
      recordFactory.newRecordInstance(UseSharedCacheResourceRequest.class);
  useRequest.setResourceKey("key1");
  useRequest.setAppId(createAppId(1, 1L));

  String returnedPath = clientSCMProxy.use(useRequest).getPath();
  assertNull(returnedPath);

  long missDelta =
      ClientSCMMetrics.getInstance().getCacheMisses() - missesBefore;
  assertEquals("Client SCM metrics aren't updated.", 1, missDelta);
}
/**
 * Stubs the protocol to answer any use request with a response holding a
 * known path, then checks that the client returns that same path.
 */
@Test
public void testUse() throws Exception {
  Path expectedPath = new Path("viewfs://test/path");

  // Canned response the mocked protocol hands back for any use request.
  UseSharedCacheResourceResponse cannedResponse =
      new UseSharedCacheResourceResponsePBImpl();
  cannedResponse.setPath(expectedPath.toString());
  when(cProtocol.use(isA(UseSharedCacheResourceRequest.class)))
      .thenReturn(cannedResponse);

  ApplicationId anyAppId = mock(ApplicationId.class);
  Path actualPath = client.use(anyAppId, "key");

  assertEquals(expectedPath, actualPath);
}
/**
 * Stubs the protocol to fail any use request with an {@link IOException}
 * and expects the client to surface the failure as a
 * {@link YarnException}.
 */
@Test(expected = YarnException.class)
public void testUseError() throws Exception {
  IOException simulatedFailure = new IOException("Mock IOExcepiton!");
  when(cProtocol.use(isA(UseSharedCacheResourceRequest.class)))
      .thenThrow(simulatedFailure);

  ApplicationId anyAppId = mock(ApplicationId.class);
  client.use(anyAppId, "key");
}
/**
 * <p>
 * The interface used by clients to claim a resource with the
 * <code>SharedCacheManager</code>. The client uses a checksum to identify the
 * resource and an {@link ApplicationId} to identify which application will be
 * using the resource.
 * </p>
 *
 * <p>
 * The <code>SharedCacheManager</code> responds with whether or not the
 * resource exists in the cache. If the resource exists, a <code>Path</code>
 * to the resource in the shared cache is returned. If the resource does not
 * exist, the response is empty.
 * </p>
 *
 * @param request request to claim a resource in the shared cache
 * @return response indicating if the resource is already in the cache
 * @throws YarnException if the request could not be processed
 * @throws IOException if an I/O error occurs while handling the request
 */
public UseSharedCacheResourceResponse use( UseSharedCacheResourceRequest request) throws YarnException, IOException;