@Override public Path use(ApplicationId applicationId, String resourceKey) throws YarnException { Path resourcePath = null; UseSharedCacheResourceRequest request = Records.newRecord( UseSharedCacheResourceRequest.class); request.setAppId(applicationId); request.setResourceKey(resourceKey); try { UseSharedCacheResourceResponse response = this.scmClient.use(request); if (response != null && response.getPath() != null) { resourcePath = new Path(response.getPath()); } } catch (Exception e) { // Just catching IOException isn't enough. // RPC call can throw ConnectionException. // We don't handle different exceptions separately at this point. throw new YarnException(e); } return resourcePath; }
@Override public UseSharedCacheResourceResponse use( UseSharedCacheResourceRequest request) throws YarnException, IOException { UseSharedCacheResourceRequestProto requestProto = ((UseSharedCacheResourceRequestPBImpl) request).getProto(); try { return new UseSharedCacheResourceResponsePBImpl(proxy.use(null, requestProto)); } catch (ServiceException e) { RPCUtil.unwrapAndThrowException(e); return null; } }
@Override public UseSharedCacheResourceResponse use( UseSharedCacheResourceRequest request) throws YarnException, IOException { UseSharedCacheResourceResponse response = recordFactory.newRecordInstance(UseSharedCacheResourceResponse.class); UserGroupInformation callerUGI; try { callerUGI = UserGroupInformation.getCurrentUser(); } catch (IOException ie) { LOG.info("Error getting UGI ", ie); throw RPCUtil.getRemoteException(ie); } String fileName = this.store.addResourceReference(request.getResourceKey(), new SharedCacheResourceReference(request.getAppId(), callerUGI.getShortUserName())); if (fileName != null) { response .setPath(getCacheEntryFilePath(request.getResourceKey(), fileName)); this.metrics.incCacheHitCount(); } else { this.metrics.incCacheMissCount(); } return response; }
@Test public void testUse() throws Exception { Path file = new Path("viewfs://test/path"); UseSharedCacheResourceResponse response = new UseSharedCacheResourceResponsePBImpl(); response.setPath(file.toString()); when(cProtocol.use(isA(UseSharedCacheResourceRequest.class))).thenReturn( response); Path newPath = client.use(mock(ApplicationId.class), "key"); assertEquals(file, newPath); }
/** * <p> * The interface used by clients to claim a resource with the * <code>SharedCacheManager.</code> The client uses a checksum to identify the * resource and an {@link ApplicationId} to identify which application will be * using the resource. * </p> * * <p> * The <code>SharedCacheManager</code> responds with whether or not the * resource exists in the cache. If the resource exists, a <code>Path</code> * to the resource in the shared cache is returned. If the resource does not * exist, the response is empty. * </p> * * @param request request to claim a resource in the shared cache * @return response indicating if the resource is already in the cache * @throws YarnException * @throws IOException */ public UseSharedCacheResourceResponse use( UseSharedCacheResourceRequest request) throws YarnException, IOException;