/** * On construction, adds the size of this call to the running count of outstanding call sizes. * Presumption is that we are put on a queue while we wait on an executor to run us. During this * time we occupy heap. */ // The constructor is shutdown so only RpcServer in this class can make one of these. CallRunner(final RpcServerInterface rpcServer, final Call call) { this.call = call; this.rpcServer = rpcServer; // Add size of the call to queue size. this.rpcServer.addCallSize(call.getSize()); }
/** * On construction, adds the size of this call to the running count of outstanding call sizes. * Presumption is that we are put on a queue while we wait on an executor to run us. During this * time we occupy heap. */ // The constructor is shutdown so only RpcServer in this class can make one of these. CallRunner(final RpcServerInterface rpcServer, final Call call, UserProvider userProvider) { this.call = call; this.rpcServer = rpcServer; // Add size of the call to queue size. this.rpcServer.addCallSize(call.getSize()); this.status = getStatus(); this.userProvider = userProvider; }
/** * On construction, adds the size of this call to the running count of outstanding call sizes. * Presumption is that we are put on a queue while we wait on an executor to run us. During this * time we occupy heap. * @param call The call to run. * @param rpcServer */ // The constructor is shutdown so only RpcServer in this class can make one of these. CallRunner(final RpcServerInterface rpcServer, final Call call, UserProvider userProvider) { this.call = call; this.rpcServer = rpcServer; // Add size of the call to queue size. this.rpcServer.addCallSize(call.getSize()); this.status = getStatus(); this.userProvider = userProvider; }
public Call getCall() { return call; }
private CallRunner createMockTask() { Call call = mock(Call.class); CallRunner task = mock(CallRunner.class); when(task.getCall()).thenReturn(call); return task; }
private void testRpcScheduler(final String queueType) throws Exception { Configuration schedConf = HBaseConfiguration.create(); schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, queueType); PriorityFunction priority = mock(PriorityFunction.class); when(priority.getPriority(any(RequestHeader.class), any(Message.class), any(User.class))) .thenReturn(HConstants.NORMAL_QOS); RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD); try { scheduler.start(); CallRunner smallCallTask = mock(CallRunner.class); RpcServer.Call smallCall = mock(RpcServer.Call.class); RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build(); when(smallCallTask.getCall()).thenReturn(smallCall); when(smallCall.getHeader()).thenReturn(smallHead); CallRunner largeCallTask = mock(CallRunner.class); RpcServer.Call largeCall = mock(RpcServer.Call.class); RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build(); when(largeCallTask.getCall()).thenReturn(largeCall); when(largeCall.getHeader()).thenReturn(largeHead); CallRunner hugeCallTask = mock(CallRunner.class); RpcServer.Call hugeCall = mock(RpcServer.Call.class); RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build(); when(hugeCallTask.getCall()).thenReturn(hugeCall); when(hugeCall.getHeader()).thenReturn(hugeHead); when(priority.getDeadline(eq(smallHead), any(Message.class))).thenReturn(0L); when(priority.getDeadline(eq(largeHead), any(Message.class))).thenReturn(50L); when(priority.getDeadline(eq(hugeHead), any(Message.class))).thenReturn(100L); final ArrayList<Integer> work = new ArrayList<Integer>(); doAnswerTaskExecution(smallCallTask, work, 10, 250); doAnswerTaskExecution(largeCallTask, work, 50, 250); doAnswerTaskExecution(hugeCallTask, work, 100, 250); scheduler.dispatch(smallCallTask); scheduler.dispatch(smallCallTask); scheduler.dispatch(smallCallTask); scheduler.dispatch(hugeCallTask); scheduler.dispatch(smallCallTask); scheduler.dispatch(largeCallTask); scheduler.dispatch(smallCallTask); scheduler.dispatch(smallCallTask); while (work.size() < 8) { Threads.sleepWithoutInterrupt(100); } int seqSum = 0; int totalTime = 0; for (int i = 0; i < work.size(); ++i) { LOG.debug("Request i=" + i + " value=" + work.get(i)); seqSum += work.get(i); totalTime += seqSum; } LOG.debug("Total Time: " + totalTime); // -> [small small small huge small large small small] // -> NO REORDER [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue) // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue) if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { assertEquals(530, totalTime); } else /* if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) */ { assertEquals(930, totalTime); } } finally { scheduler.stop(); } }
private void testRpcScheduler(final String queueType) throws Exception { Configuration schedConf = HBaseConfiguration.create(); schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, queueType); PriorityFunction priority = mock(PriorityFunction.class); when(priority.getPriority(any(RequestHeader.class), any(Message.class))) .thenReturn(HConstants.NORMAL_QOS); RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, HConstants.QOS_THRESHOLD); try { scheduler.start(); CallRunner smallCallTask = mock(CallRunner.class); RpcServer.Call smallCall = mock(RpcServer.Call.class); RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build(); when(smallCallTask.getCall()).thenReturn(smallCall); when(smallCall.getHeader()).thenReturn(smallHead); CallRunner largeCallTask = mock(CallRunner.class); RpcServer.Call largeCall = mock(RpcServer.Call.class); RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build(); when(largeCallTask.getCall()).thenReturn(largeCall); when(largeCall.getHeader()).thenReturn(largeHead); CallRunner hugeCallTask = mock(CallRunner.class); RpcServer.Call hugeCall = mock(RpcServer.Call.class); RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build(); when(hugeCallTask.getCall()).thenReturn(hugeCall); when(hugeCall.getHeader()).thenReturn(hugeHead); when(priority.getDeadline(eq(smallHead), any(Message.class))).thenReturn(0L); when(priority.getDeadline(eq(largeHead), any(Message.class))).thenReturn(50L); when(priority.getDeadline(eq(hugeHead), any(Message.class))).thenReturn(100L); final ArrayList<Integer> work = new ArrayList<Integer>(); doAnswerTaskExecution(smallCallTask, work, 10, 250); doAnswerTaskExecution(largeCallTask, work, 50, 250); doAnswerTaskExecution(hugeCallTask, work, 100, 250); scheduler.dispatch(smallCallTask); scheduler.dispatch(smallCallTask); scheduler.dispatch(smallCallTask); scheduler.dispatch(hugeCallTask); scheduler.dispatch(smallCallTask); scheduler.dispatch(largeCallTask); scheduler.dispatch(smallCallTask); scheduler.dispatch(smallCallTask); while (work.size() < 8) { Threads.sleepWithoutInterrupt(100); } int seqSum = 0; int totalTime = 0; for (int i = 0; i < work.size(); ++i) { LOG.debug("Request i=" + i + " value=" + work.get(i)); seqSum += work.get(i); totalTime += seqSum; } LOG.debug("Total Time: " + totalTime); // -> [small small small huge small large small small] // -> NO REORDER [10 10 10 100 10 50 10 10] -> 930 (FIFO Queue) // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue) if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { assertEquals(530, totalTime); } else /* if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) */ { assertEquals(930, totalTime); } } finally { scheduler.stop(); } }