Java 类org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue 实例源码

项目:PyroDB    文件:SimpleRpcScheduler.java   
/**
 * @param conf
 * @param handlerCount the number of handler threads that will be used to process calls
 * @param priorityHandlerCount How many threads for priority handling.
 * @param replicationHandlerCount How many threads for replication handling.
 * @param highPriorityLevel
 * @param priority Function to extract request priority.
 */
public SimpleRpcScheduler(
    Configuration conf,
    int handlerCount,
    int priorityHandlerCount,
    int replicationHandlerCount,
    PriorityFunction priority,
    int highPriorityLevel) {
  int maxQueueLength = conf.getInt("ipc.server.max.callqueue.length",
      handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
  this.handlerCount = handlerCount;
  this.priorityHandlerCount = priorityHandlerCount;
  this.replicationHandlerCount = replicationHandlerCount;
  this.priority = priority;
  this.highPriorityLevel = highPriorityLevel;

  String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
  LOG.debug("Using " + callQueueType + " as user call queue");
  if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
    this.callQueue = new BoundedPriorityBlockingQueue<CallRunner>(maxQueueLength,
        new CallPriorityComparator(conf, this.priority));
  } else {
    this.callQueue = new LinkedBlockingQueue<CallRunner>(maxQueueLength);
  }
  this.priorityCallQueue = priorityHandlerCount > 0
      ? new LinkedBlockingQueue<CallRunner>(maxQueueLength)
      : null;
  this.replicationQueue = replicationHandlerCount > 0
      ? new LinkedBlockingQueue<CallRunner>(maxQueueLength)
      : null;
}
项目:hbase    文件:RpcExecutor.java   
public RpcExecutor(final String name, final int handlerCount, final String callQueueType,
    final int maxQueueLength, final PriorityFunction priority, final Configuration conf,
    final Abortable abortable) {
  this.name = Strings.nullToEmpty(name);
  this.conf = conf;
  this.abortable = abortable;

  float callQueuesHandlersFactor = this.conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f);
  this.numCallQueues = computeNumCallQueues(handlerCount, callQueuesHandlersFactor);
  this.queues = new ArrayList<>(this.numCallQueues);

  this.handlerCount = Math.max(handlerCount, this.numCallQueues);
  this.handlers = new ArrayList<>(this.handlerCount);

  this.priority = priority;

  if (isDeadlineQueueType(callQueueType)) {
    this.name += ".Deadline";
    this.queueInitArgs = new Object[] { maxQueueLength,
      new CallPriorityComparator(conf, this.priority) };
    this.queueClass = BoundedPriorityBlockingQueue.class;
  } else if (isCodelQueueType(callQueueType)) {
    this.name += ".Codel";
    int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
      CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
    int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
    double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
      CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
    queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval,
        codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
    queueClass = AdaptiveLifoCoDelCallQueue.class;
  } else {
    this.name += ".Fifo";
    queueInitArgs = new Object[] { maxQueueLength };
    queueClass = LinkedBlockingQueue.class;
  }

  LOG.info("RpcExecutor " + name + " using " + callQueueType
      + " as call queue; numCallQueues=" + numCallQueues + "; maxQueueLength=" + maxQueueLength
      + "; handlerCount=" + handlerCount);
}