/** * @param conf * @param handlerCount the number of handler threads that will be used to process calls * @param priorityHandlerCount How many threads for priority handling. * @param replicationHandlerCount How many threads for replication handling. * @param highPriorityLevel * @param priority Function to extract request priority. */ public SimpleRpcScheduler( Configuration conf, int handlerCount, int priorityHandlerCount, int replicationHandlerCount, PriorityFunction priority, int highPriorityLevel) { int maxQueueLength = conf.getInt("ipc.server.max.callqueue.length", handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); this.handlerCount = handlerCount; this.priorityHandlerCount = priorityHandlerCount; this.replicationHandlerCount = replicationHandlerCount; this.priority = priority; this.highPriorityLevel = highPriorityLevel; String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); LOG.debug("Using " + callQueueType + " as user call queue"); if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { this.callQueue = new BoundedPriorityBlockingQueue<CallRunner>(maxQueueLength, new CallPriorityComparator(conf, this.priority)); } else { this.callQueue = new LinkedBlockingQueue<CallRunner>(maxQueueLength); } this.priorityCallQueue = priorityHandlerCount > 0 ? new LinkedBlockingQueue<CallRunner>(maxQueueLength) : null; this.replicationQueue = replicationHandlerCount > 0 ? new LinkedBlockingQueue<CallRunner>(maxQueueLength) : null; }
public RpcExecutor(final String name, final int handlerCount, final String callQueueType, final int maxQueueLength, final PriorityFunction priority, final Configuration conf, final Abortable abortable) { this.name = Strings.nullToEmpty(name); this.conf = conf; this.abortable = abortable; float callQueuesHandlersFactor = this.conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f); this.numCallQueues = computeNumCallQueues(handlerCount, callQueuesHandlersFactor); this.queues = new ArrayList<>(this.numCallQueues); this.handlerCount = Math.max(handlerCount, this.numCallQueues); this.handlers = new ArrayList<>(this.handlerCount); this.priority = priority; if (isDeadlineQueueType(callQueueType)) { this.name += ".Deadline"; this.queueInitArgs = new Object[] { maxQueueLength, new CallPriorityComparator(conf, this.priority) }; this.queueClass = BoundedPriorityBlockingQueue.class; } else if (isCodelQueueType(callQueueType)) { this.name += ".Codel"; int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY, CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY); int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL); double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD); queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval, codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches }; queueClass = AdaptiveLifoCoDelCallQueue.class; } else { this.name += ".Fifo"; queueInitArgs = new Object[] { maxQueueLength }; queueClass = LinkedBlockingQueue.class; } LOG.info("RpcExecutor " + name + " using " + callQueueType + " as call queue; numCallQueues=" + numCallQueues + "; maxQueueLength=" + maxQueueLength + "; handlerCount=" + handlerCount); }