/**
 * Reads consecutive entries from {@code iter} whose keys start with
 * {@code keyPrefix} and maps each parsed resource proto to the local path
 * encoded in the remainder of its key.
 *
 * The loop stops at the first key that does not carry the prefix; that entry
 * is only inspected via {@code peekNext()} and {@code next()} is not called
 * for it.
 *
 * @param iter iterator to read entries from
 * @param keyPrefix prefix that every in-progress resource key must have
 * @return map from resource proto to its in-progress local path
 * @throws IOException declared by the signature (e.g. for proto parse
 *         failures)
 */
private Map<LocalResourceProto, Path> loadStartedResources(
    LeveldbIterator iter, String keyPrefix) throws IOException {
  Map<LocalResourceProto, Path> started =
      new HashMap<LocalResourceProto, Path>();
  while (iter.hasNext()) {
    Entry<byte[], byte[]> current = iter.peekNext();
    String fullKey = asString(current.getKey());
    if (!fullKey.startsWith(keyPrefix)) {
      // First key outside the prefix range: nothing more to load.
      break;
    }

    // Everything after the prefix is the local path of the resource.
    String pathSuffix = fullKey.substring(keyPrefix.length());
    Path localPath = new Path(pathSuffix);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Loading in-progress resource at " + localPath);
    }

    LocalResourceProto resource =
        LocalResourceProto.parseFrom(current.getValue());
    started.put(resource, localPath);

    // Advance only after the entry was stored successfully.
    iter.next();
  }
  return started;
}
/**
 * Stores the serialized resource proto in the database under the
 * "resource started" key derived from the user, application and local path.
 *
 * @param user user the resource belongs to
 * @param appId application the resource belongs to
 * @param proto resource description to persist
 * @param localPath path the resource is being localized to
 * @throws IOException wrapping any {@code DBException} raised by the write
 */
@Override
public void startResourceLocalization(String user, ApplicationId appId,
    LocalResourceProto proto, Path localPath) throws IOException {
  final String startedKey =
      getResourceStartedKey(user, appId, localPath.toString());
  try {
    byte[] keyBytes = bytes(startedKey);
    byte[] valueBytes = proto.toByteArray();
    db.put(keyBytes, valueBytes);
  } catch (DBException dbFailure) {
    // Surface database failures through the method's checked exception.
    throw new IOException(dbFailure);
  }
}
/**
 * Builds a {@code LocalResourceTrackerState} from the given tracker state.
 *
 * All values of {@code ts.localizedResources} are added to the result, and
 * each {@code ts.inProgressMap} entry (path -> proto) is stored inverted
 * (proto -> path) in the result's in-progress map.
 *
 * @param ts tracker state to copy from
 * @return newly created state holding the copied data
 */
private LocalResourceTrackerState loadTrackerState(TrackerState ts) {
  LocalResourceTrackerState loaded = new LocalResourceTrackerState();
  loaded.localizedResources.addAll(ts.localizedResources.values());

  for (Map.Entry<Path, LocalResourceProto> pending
      : ts.inProgressMap.entrySet()) {
    LocalResourceProto resource = pending.getValue();
    Path localPath = pending.getKey();
    loaded.inProgressResources.put(resource, localPath);
  }
  return loaded;
}
/**
 * Creates an instance whose builder is a new, empty
 * {@code LocalResourceProto} builder.
 */
public LocalResourcePBImpl() {
  this.builder = LocalResourceProto.newBuilder();
}
/**
 * Creates an instance around an existing proto and sets {@code viaProto}
 * so the proto is treated as the current source of data.
 *
 * @param proto the proto to wrap
 */
public LocalResourcePBImpl(LocalResourceProto proto) {
  this.proto = proto;
  this.viaProto = true;
}
/**
 * Returns the proto for this record.
 *
 * {@code mergeLocalToBuilder()} is invoked first; afterwards, if
 * {@code viaProto} is not set, the proto is rebuilt from the builder.
 * {@code viaProto} is set to {@code true} before returning.
 *
 * @return the current proto
 */
public synchronized LocalResourceProto getProto() {
  mergeLocalToBuilder();
  // viaProto must be read after the merge, which may have changed it.
  if (!viaProto) {
    proto = builder.build();
  }
  viaProto = true;
  return proto;
}
/**
 * Ensures a builder is available and clears {@code viaProto}.
 *
 * A new builder seeded from {@code proto} is created when {@code viaProto}
 * is set or when no builder exists yet; otherwise the existing builder is
 * kept.
 */
private synchronized void maybeInitBuilder() {
  boolean needsFreshBuilder = viaProto || builder == null;
  if (needsFreshBuilder) {
    builder = LocalResourceProto.newBuilder(proto);
  }
  viaProto = false;
}
/**
 * Wraps a proto in a new {@code LocalResourcePBImpl}.
 *
 * @param p proto to wrap
 * @return record implementation created from {@code p}
 */
private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) {
  LocalResourcePBImpl wrapped = new LocalResourcePBImpl(p);
  return wrapped;
}
/**
 * Returns the proto of a local resource record.
 *
 * The argument is cast to {@code LocalResourcePBImpl}, so any other
 * implementation results in a {@code ClassCastException}.
 *
 * @param t record to convert
 * @return result of {@code getProto()} on the cast record
 */
private LocalResourceProto convertToProtoFormat(LocalResource t) {
  LocalResourcePBImpl impl = (LocalResourcePBImpl) t;
  return impl.getProto();
}
/**
 * Returns the proto of a local resource record.
 *
 * The argument is cast to {@code LocalResourcePBImpl}, so any other
 * implementation results in a {@code ClassCastException}.
 *
 * @param rsrc record to convert
 * @return result of {@code getProto()} on the cast record
 */
private LocalResourceProto convertToProtoFormat(LocalResource rsrc) {
  LocalResourcePBImpl pbImpl = (LocalResourcePBImpl) rsrc;
  return pbImpl.getProto();
}
/**
 * Wraps a proto in a new {@code LocalResourcePBImpl}.
 *
 * @param rsrc proto to wrap
 * @return record implementation created from {@code rsrc}
 */
private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto rsrc) {
  LocalResourcePBImpl pbImpl = new LocalResourcePBImpl(rsrc);
  return pbImpl;
}
/**
 * No-op implementation: nothing is recorded for the started localization.
 *
 * @param user ignored
 * @param appId ignored
 * @param proto ignored
 * @param localPath ignored
 * @throws IOException never thrown by this implementation; declared by the
 *         signature only
 */
@Override
public void startResourceLocalization(
    String user,
    ApplicationId appId,
    LocalResourceProto proto,
    Path localPath) throws IOException {
  // Intentionally empty.
}
/**
 * Returns the map of in-progress resources (proto -> local path).
 *
 * The internal map itself is returned, not a copy.
 *
 * @return the in-progress resource map
 */
public Map<LocalResourceProto, Path> getInProgressResources() {
  return this.inProgressResources;
}
@Test @SuppressWarnings("unchecked") public void testStateStoreSuccessfulLocalization() throws Exception { final String user = "someuser"; final ApplicationId appId = ApplicationId.newInstance(1, 1); // This is a random path. NO File creation will take place at this place. final Path localDir = new Path("/tmp"); Configuration conf = new YarnConfiguration(); DrainDispatcher dispatcher = null; dispatcher = createDispatcher(conf); EventHandler<LocalizerEvent> localizerEventHandler = mock(EventHandler.class); EventHandler<LocalizerEvent> containerEventHandler = mock(EventHandler.class); dispatcher.register(LocalizerEventType.class, localizerEventHandler); dispatcher.register(ContainerEventType.class, containerEventHandler); DeletionService mockDelService = mock(DeletionService.class); NMStateStoreService stateStore = mock(NMStateStoreService.class); try { LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, appId, dispatcher, false, conf, stateStore); // Container 1 needs lr1 resource ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1); LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.APPLICATION); LocalizerContext lc1 = new LocalizerContext(user, cId1, null); // Container 1 requests lr1 to be localized ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1, LocalResourceVisibility.APPLICATION, lc1); tracker.handle(reqEvent1); dispatcher.await(); // Simulate the process of localization of lr1 Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir, null); ArgumentCaptor<LocalResourceProto> localResourceCaptor = ArgumentCaptor.forClass(LocalResourceProto.class); ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class); verify(stateStore).startResourceLocalization(eq(user), eq(appId), localResourceCaptor.capture(), pathCaptor.capture()); LocalResourceProto lrProto = localResourceCaptor.getValue(); Path localizedPath1 = pathCaptor.getValue(); Assert.assertEquals(lr1, new 
LocalResourceRequest(new LocalResourcePBImpl(lrProto))); Assert.assertEquals(hierarchicalPath1, localizedPath1.getParent()); // Simulate lr1 getting localized ResourceLocalizedEvent rle1 = new ResourceLocalizedEvent(lr1, pathCaptor.getValue(), 120); tracker.handle(rle1); dispatcher.await(); ArgumentCaptor<LocalizedResourceProto> localizedProtoCaptor = ArgumentCaptor.forClass(LocalizedResourceProto.class); verify(stateStore).finishResourceLocalization(eq(user), eq(appId), localizedProtoCaptor.capture()); LocalizedResourceProto localizedProto = localizedProtoCaptor.getValue(); Assert.assertEquals(lr1, new LocalResourceRequest( new LocalResourcePBImpl(localizedProto.getResource()))); Assert.assertEquals(localizedPath1.toString(), localizedProto.getLocalPath()); LocalizedResource localizedRsrc1 = tracker.getLocalizedResource(lr1); Assert.assertNotNull(localizedRsrc1); // simulate release and retention processing tracker.handle(new ResourceReleaseEvent(lr1, cId1)); dispatcher.await(); boolean removeResult = tracker.remove(localizedRsrc1, mockDelService); Assert.assertTrue(removeResult); verify(stateStore).removeLocalizedResource(eq(user), eq(appId), eq(localizedPath1)); } finally { if (dispatcher != null) { dispatcher.stop(); } } }
@Test @SuppressWarnings("unchecked") public void testStateStoreFailedLocalization() throws Exception { final String user = "someuser"; final ApplicationId appId = ApplicationId.newInstance(1, 1); // This is a random path. NO File creation will take place at this place. final Path localDir = new Path("/tmp"); Configuration conf = new YarnConfiguration(); DrainDispatcher dispatcher = null; dispatcher = createDispatcher(conf); EventHandler<LocalizerEvent> localizerEventHandler = mock(EventHandler.class); EventHandler<LocalizerEvent> containerEventHandler = mock(EventHandler.class); dispatcher.register(LocalizerEventType.class, localizerEventHandler); dispatcher.register(ContainerEventType.class, containerEventHandler); NMStateStoreService stateStore = mock(NMStateStoreService.class); try { LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, appId, dispatcher, false, conf, stateStore); // Container 1 needs lr1 resource ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1); LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.APPLICATION); LocalizerContext lc1 = new LocalizerContext(user, cId1, null); // Container 1 requests lr1 to be localized ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1, LocalResourceVisibility.APPLICATION, lc1); tracker.handle(reqEvent1); dispatcher.await(); // Simulate the process of localization of lr1 Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir, null); ArgumentCaptor<LocalResourceProto> localResourceCaptor = ArgumentCaptor.forClass(LocalResourceProto.class); ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class); verify(stateStore).startResourceLocalization(eq(user), eq(appId), localResourceCaptor.capture(), pathCaptor.capture()); LocalResourceProto lrProto = localResourceCaptor.getValue(); Path localizedPath1 = pathCaptor.getValue(); Assert.assertEquals(lr1, new LocalResourceRequest(new LocalResourcePBImpl(lrProto))); 
Assert.assertEquals(hierarchicalPath1, localizedPath1.getParent()); ResourceFailedLocalizationEvent rfe1 = new ResourceFailedLocalizationEvent( lr1, new Exception("Test").toString()); tracker.handle(rfe1); dispatcher.await(); verify(stateStore).removeLocalizedResource(eq(user), eq(appId), eq(localizedPath1)); } finally { if (dispatcher != null) { dispatcher.stop(); } } }
/**
 * Records the resource as in progress in the tracker state looked up for
 * the given user and application, keyed by its local path.
 *
 * @param user user the resource belongs to
 * @param appId application the resource belongs to
 * @param proto resource description to remember
 * @param localPath path the resource is being localized to
 */
@Override
public synchronized void startResourceLocalization(String user,
    ApplicationId appId, LocalResourceProto proto, Path localPath) {
  TrackerKey trackerKey = new TrackerKey(user, appId);
  TrackerState trackerState = getTrackerState(trackerKey);
  trackerState.inProgressMap.put(localPath, proto);
}
/**
 * Runs {@code validatePBImplRecord} for the {@code LocalResourcePBImpl}
 * record class and its {@code LocalResourceProto} proto class.
 */
@Test
public void testLocalResourcePBImpl() throws Exception {
  validatePBImplRecord(
      LocalResourcePBImpl.class,
      LocalResourceProto.class);
}