/**
 * Forwards an allocate call to the remote side through {@code proxy}.
 *
 * <p>The request is cast to {@link AllocateRequestPBImpl} to obtain its
 * protobuf form, and the protobuf reply is wrapped in an
 * {@link AllocateResponsePBImpl}.
 *
 * @param request the allocate request; must be an
 *                {@code AllocateRequestPBImpl} because it is cast
 *                unconditionally
 * @return the wrapped response, or {@code null} if a
 *         {@code ServiceException} was caught and
 *         {@code RPCUtil.unwrapAndThrowException} returned normally
 * @throws YarnException declared by this method; presumably surfaced by
 *                       {@code RPCUtil.unwrapAndThrowException} - confirm there
 * @throws IOException declared by this method; presumably surfaced by
 *                     {@code RPCUtil.unwrapAndThrowException} - confirm there
 */
@Override
public AllocateResponse allocate(AllocateRequest request)
    throws YarnException, IOException {
  final AllocateRequestProto protoRequest =
      ((AllocateRequestPBImpl) request).getProto();
  AllocateResponse result = null;
  try {
    // First argument is passed as null, exactly as before.
    result = new AllocateResponsePBImpl(proxy.allocate(null, protoRequest));
  } catch (ServiceException e) {
    // Delegate translation of the RPC-layer failure; if the helper does not
    // throw, control falls through and null is returned.
    RPCUtil.unwrapAndThrowException(e);
  }
  return result;
}
@Test(timeout=1200000) public void testProgressFilter() throws Exception{ MockRM rm = new MockRM(conf); rm.start(); // Register node1 MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); // Submit an application RMApp app1 = rm.submitApp(2048); nm1.nodeHeartbeat(true); RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); am1.registerAppAttempt(); AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl(); List<ContainerId> release = new ArrayList<ContainerId>(); List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); allocateRequest.setReleaseList(release); allocateRequest.setAskList(ask); allocateRequest.setProgress(Float.POSITIVE_INFINITY); am1.allocate(allocateRequest); while(attempt1.getProgress()!=1){ LOG.info("Waiting for allocate event to be handled ..."); sleep(100); } allocateRequest.setProgress(Float.NaN); am1.allocate(allocateRequest); while(attempt1.getProgress()!=0){ LOG.info("Waiting for allocate event to be handled ..."); sleep(100); } allocateRequest.setProgress((float)9); am1.allocate(allocateRequest); while(attempt1.getProgress()!=1){ LOG.info("Waiting for allocate event to be handled ..."); sleep(100); } allocateRequest.setProgress(Float.NEGATIVE_INFINITY); am1.allocate(allocateRequest); while(attempt1.getProgress()!=0){ LOG.info("Waiting for allocate event to be handled ..."); sleep(100); } allocateRequest.setProgress((float)0.5); am1.allocate(allocateRequest); while(attempt1.getProgress()!=0.5){ LOG.info("Waiting for allocate event to be handled ..."); sleep(100); } allocateRequest.setProgress((float)-1); am1.allocate(allocateRequest); while(attempt1.getProgress()!=0){ LOG.info("Waiting for allocate event to be handled ..."); sleep(100); } }
/**
 * Runs {@code validatePBImplRecord} for the {@link AllocateRequestPBImpl}
 * record class paired with its {@link AllocateRequestProto} protobuf class.
 *
 * @throws Exception if the validation helper throws
 */
@Test
public void testAllocateRequestPBImpl() throws Exception {
  final Class<AllocateRequestPBImpl> recordType = AllocateRequestPBImpl.class;
  final Class<AllocateRequestProto> protoType = AllocateRequestProto.class;
  validatePBImplRecord(recordType, protoType);
}
@Test(timeout = 300000) public void testPriorityInAllocatedResponse() throws Exception { conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); // Set Max Application Priority as 10 conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); MockRM rm = new MockRM(conf); rm.start(); // Register node1 MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); // Submit an application Priority appPriority1 = Priority.newInstance(5); RMApp app1 = rm.submitApp(2048, appPriority1); nm1.nodeHeartbeat(true); RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); am1.registerAppAttempt(); AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl(); List<ContainerId> release = new ArrayList<ContainerId>(); List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); allocateRequest.setReleaseList(release); allocateRequest.setAskList(ask); AllocateResponse response1 = am1.allocate(allocateRequest); Assert.assertEquals(appPriority1, response1.getApplicationPriority()); // get scheduler CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); // Change the priority of App1 to 8 Priority appPriority2 = Priority.newInstance(8); cs.updateApplicationPriority(appPriority2, app1.getApplicationId()); AllocateResponse response2 = am1.allocate(allocateRequest); Assert.assertEquals(appPriority2, response2.getApplicationPriority()); rm.stop(); }