Java 类com.lmax.disruptor.TimeoutBlockingWaitStrategy 实例源码

项目:incubator-omid    文件:RequestProcessorImpl.java   
/**
 * Creates the request processor, stores its collaborators and starts the disruptor that delivers
 * events to it.
 *
 * <p>All instance state is assigned <em>before</em> the disruptor is started. {@code handleEventsWith(this)}
 * registers this object as the event handler and {@code start()} launches the handler on the executor
 * thread; any field assigned after that point could be observed as unset by the handler thread.
 *
 * @param metrics         metrics registry kept by this processor
 * @param timestampOracle timestamp oracle kept by this processor
 * @param persistProc     persistence processor kept by this processor
 * @param panicker        passed to the {@link FatalExceptionHandler} installed on the disruptor
 * @param config          supplies the conflict map size and the wait-strategy timeout (in milliseconds)
 * @throws IOException part of the declared signature; no statement visible in this body raises it directly
 */
@Inject
RequestProcessorImpl(MetricsRegistry metrics,
                     TimestampOracle timestampOracle,
                     PersistenceProcessor persistProc,
                     Panicker panicker,
                     TSOServerConfig config)
        throws IOException {

    // ------------------------------------------------------------------------------------------------------------
    // Attribute initialization (must be complete before the disruptor thread is started below)
    // ------------------------------------------------------------------------------------------------------------

    this.metrics = metrics;
    this.persistProc = persistProc;
    this.timestampOracle = timestampOracle;
    this.hashmap = new CommitHashMap(config.getConflictMapSize());
    this.tableFences = new HashMap<>();

    // ------------------------------------------------------------------------------------------------------------
    // Disruptor initialization
    // ------------------------------------------------------------------------------------------------------------

    TimeoutBlockingWaitStrategy timeoutStrategy = new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), MILLISECONDS);

    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("request-%d").build();
    this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory);

    this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, timeoutStrategy);
    disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This must be before handleEventsWith()
    disruptor.handleEventsWith(this);
    // Starting is the last step that touches instance state: from here on the handler thread may call into this object.
    this.requestRing = disruptor.start();

    LOG.info("RequestProcessor initialized");

}
项目:jstorm    文件:Worker.java   
/**
 * Creates the control-message receive queue, binds the worker's receiving connection to it, and
 * builds the loop thread that dispatches received control messages.
 *
 * @return an {@link AsyncLoopThread} wrapping a {@link VirtualPortCtrlDispatch} created at
 *         {@link Thread#MAX_PRIORITY}
 */
private AsyncLoopThread startDispatchThread() {
    // Tuples are sent directly from the netty server; control tuples are sent to the dispatch thread.

    final IContext messagingContext = workerData.getContext();
    final String topologyId = workerData.getTopologyId();

    // Receive-side control queue, configured from the topology configuration.
    final Map conf = workerData.getStormConf();
    final long waitTimeoutMs = JStormUtils.parseLong(conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT), 10);
    final WaitStrategy ctrlWaitStrategy = new TimeoutBlockingWaitStrategy(waitTimeoutMs, TimeUnit.MILLISECONDS);
    final int ctrlBufferSize = JStormUtils.parseInt(conf.get(Config.TOPOLOGY_CTRL_BUFFER_SIZE), 256);
    final DisruptorQueue ctrlQueue = DisruptorQueue.mkInstance(
            "Dispatch-control", ProducerType.MULTI, ctrlBufferSize, ctrlWaitStrategy, false, 0, 0);

    // Expose the control queue as a worker-level gauge metric.
    final QueueGauge ctrlQueueGauge = new QueueGauge(ctrlQueue, MetricDef.RECV_CTRL_QUEUE);
    final String ctrlGaugeName = JStormMetrics.workerMetricName(MetricDef.RECV_CTRL_QUEUE, MetricType.GAUGE);
    JStormMetrics.registerWorkerMetric(ctrlGaugeName, new AsmGauge(ctrlQueueGauge));

    // Bind the receiving connection and publish it on the worker data.
    final IConnection boundConnection = messagingContext.bind(
            topologyId,
            workerData.getPort(),
            workerData.getDeserializeQueues(),
            ctrlQueue,
            false,
            workerData.getTaskIds());
    workerData.setRecvConnection(boundConnection);

    // Callback that receives control messages; it is driven by the returned loop thread.
    final RunnableCallback ctrlDispatcher =
            new VirtualPortCtrlDispatch(workerData, boundConnection, ctrlQueue, MetricDef.RECV_THREAD);

    return new AsyncLoopThread(ctrlDispatcher, false, Thread.MAX_PRIORITY, true);
}
项目:jstorm    文件:TaskReceiver.java   
/**
 * Sets up a task's receiver: stores the collaborators, creates the deserialize queue, sets up the
 * deserialize threads via {@code setDeserializeThread()}, registers the related metrics and health
 * check, and reads the backpressure settings.
 *
 * @param task              the owning task; its task object is kept as {@code bolt} when it is an {@link IBolt}
 * @param taskId            id of the task, used for metric and health-check registration
 * @param stormConf         configuration map the queue, thread and backpressure settings are read from
 * @param topologyContext   supplies the topology id and component id used in metric names
 * @param innerTaskTransfer map of task id to queue, stored for later use
 * @param taskStatus        task status object, stored for later use
 * @param taskName          stored as this receiver's id string and used in gauges and logging
 */
public TaskReceiver(Task task, int taskId, Map stormConf, TopologyContext topologyContext,
                    Map<Integer, DisruptorQueue> innerTaskTransfer,
                    TaskStatus taskStatus, String taskName) {
    // --- plain state ---
    this.stormConf = stormConf;
    this.task = task;
    if (task.getTaskObj() instanceof IBolt) {
        this.bolt = (IBolt) task.getTaskObj();
    } else {
        this.bolt = null;
    }
    this.taskId = taskId;
    this.idStr = taskName;
    this.topologyContext = topologyContext;
    this.innerTaskTransfer = innerTaskTransfer;
    this.taskStatus = taskStatus;

    // --- deserialize queue ---
    final int receiveBufferSize =
            JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE), 256);
    final long waitTimeoutMs = JStormUtils.parseLong(stormConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT), 10);
    final WaitStrategy queueWaitStrategy = new TimeoutBlockingWaitStrategy(waitTimeoutMs, TimeUnit.MILLISECONDS);
    final boolean batchMode = ConfigExtension.isDisruptorQueueBatchMode(stormConf);
    final int batchSize = ConfigExtension.getDisruptorBufferSize(stormConf);
    final long batchFlushMs = ConfigExtension.getDisruptorBufferFlushMs(stormConf);
    this.deserializeQueue = DisruptorQueue.mkInstance(
            "TaskDeserialize", ProducerType.MULTI, receiveBufferSize, queueWaitStrategy,
            batchMode, batchSize, batchFlushMs);

    // --- deserialize threads (queue, thread count and thread list are set before this call) ---
    this.dserializeThreadNum = ConfigExtension.getTaskDeserializeThreadNum(stormConf);
    this.deserializeThreads = new ArrayList<>();
    setDeserializeThread();

    // --- metrics ---
    final String topologyId = topologyContext.getTopologyId();
    final String componentId = topologyContext.getThisComponentId();

    final AsmHistogram timerHistogram = new AsmHistogram();
    timerHistogram.setAggregate(false);
    final String timerMetricName = MetricUtils.taskMetricName(
            topologyId, componentId, taskId, MetricDef.DESERIALIZE_TIME, MetricType.HISTOGRAM);
    this.deserializeTimer = (AsmHistogram) JStormMetrics.registerTaskMetric(timerMetricName, timerHistogram);

    final QueueGauge queueGauge = new QueueGauge(deserializeQueue, idStr, MetricDef.DESERIALIZE_QUEUE);
    final String queueMetricName = MetricUtils.taskMetricName(
            topologyId, componentId, taskId, MetricDef.DESERIALIZE_QUEUE, MetricType.GAUGE);
    JStormMetrics.registerTaskMetric(queueMetricName, new AsmGauge(queueGauge));
    JStormHealthCheck.registerTaskHealthCheck(taskId, MetricDef.DESERIALIZE_QUEUE, queueGauge);

    // --- backpressure settings ---
    this.isBackpressureEnable = ConfigExtension.isBackpressureEnable(stormConf);
    this.highMark = (float) ConfigExtension.getBackpressureWaterMarkHigh(stormConf);
    this.lowMark = (float) ConfigExtension.getBackpressureWaterMarkLow(stormConf);
    this.backpressureStatus = false;

    LOG.info("Successfully started TaskReceiver thread for {}, thread num: {}", idStr, dserializeThreadNum);
}
项目:jstorm    文件:BaseExecutors.java   
/**
 * Initializes the state shared by executors of a task: copies the task's contexts and helpers,
 * creates the execute queue and the control queue, registers their gauges and health checks, and
 * creates the rotating-map and heartbeat triggers.
 *
 * @param task the task this executor belongs to; configuration, contexts, status and transfer
 *             objects are all read from it
 */
public BaseExecutors(Task task) {
    // --- state taken from the task ---
    this.task = task;
    this.storm_conf = task.getStormConf();

    this.userTopologyCtx = task.getUserContext();
    this.sysTopologyCtx = task.getTopologyContext();
    this.taskStats = task.getTaskStats();
    this.taskId = sysTopologyCtx.getThisTaskId();
    this.innerTaskTransfer = task.getInnerTaskTransfer();
    this.topologyId = sysTopologyCtx.getTopologyId();
    this.componentId = sysTopologyCtx.getThisComponentId();
    this.idStr = JStormServerUtils.getName(componentId, taskId);

    this.taskStatus = task.getTaskStatus();
    this.executorStatus = new TaskStatus();
    this.reportError = task.getReportErrorDie();
    this.taskTransfer = task.getTaskTransfer();
    this.metricsReporter = task.getWorkerData().getMetricsReporter();

    this.messageTimeoutSecs = JStormUtils.parseInt(storm_conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 30);

    // --- execute queue; the wait strategy instance is reused for the control queue below ---
    final int executeBufferSize = Utils.getInt(storm_conf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE), 256);
    final long waitTimeoutMs = JStormUtils.parseLong(storm_conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT), 10);
    final WaitStrategy sharedWaitStrategy = new TimeoutBlockingWaitStrategy(waitTimeoutMs, TimeUnit.MILLISECONDS);
    final boolean batchMode = ConfigExtension.isDisruptorQueueBatchMode(storm_conf);
    final int batchSize = ConfigExtension.getDisruptorBufferSize(storm_conf);
    final long batchFlushMs = ConfigExtension.getDisruptorBufferFlushMs(storm_conf);
    this.exeQueue = DisruptorQueue.mkInstance(
            idStr, ProducerType.MULTI, executeBufferSize, sharedWaitStrategy, batchMode, batchSize, batchFlushMs);

    // --- control queue: batching disabled ---
    final int controlBufferSize = Utils.getInt(storm_conf.get(Config.TOPOLOGY_CTRL_BUFFER_SIZE), 32);
    this.controlQueue = DisruptorQueue.mkInstance(
            idStr + " for control message", ProducerType.MULTI, controlBufferSize, sharedWaitStrategy, false, 0, 0);

    // Publish both queues: the execute queue through registerInnerTransfer, the control queue on the task.
    this.registerInnerTransfer(exeQueue);
    this.task.getControlQueues().put(taskId, this.controlQueue);

    // --- gauge and health check for the execute queue ---
    final QueueGauge executeGauge = new QueueGauge(exeQueue, idStr, MetricDef.EXECUTE_QUEUE);
    final String executeGaugeName = MetricUtils.taskMetricName(
            topologyId, componentId, taskId, MetricDef.EXECUTE_QUEUE, MetricType.GAUGE);
    JStormMetrics.registerTaskMetric(executeGaugeName, new AsmGauge(executeGauge));
    JStormHealthCheck.registerTaskHealthCheck(taskId, MetricDef.EXECUTE_QUEUE, executeGauge);

    // --- gauge and health check for the control queue ---
    final QueueGauge controlGauge = new QueueGauge(controlQueue, idStr, MetricDef.CONTROL_QUEUE);
    final String controlGaugeName = MetricUtils.taskMetricName(
            topologyId, componentId, taskId, MetricDef.CONTROL_QUEUE, MetricType.GAUGE);
    JStormMetrics.registerTaskMetric(controlGaugeName, new AsmGauge(controlGauge));
    JStormHealthCheck.registerTaskHealthCheck(taskId, MetricDef.CONTROL_QUEUE, controlGauge);

    // --- triggers that are given the control queue ---
    this.rotatingMapTrigger = new RotatingMapTrigger(storm_conf, idStr + "_rotating", controlQueue);
    this.rotatingMapTrigger.register();
    this.taskHbTrigger = new TaskHeartbeatTrigger(
            storm_conf, idStr + "_taskHeartbeat", controlQueue,
            taskId, componentId, sysTopologyCtx, reportError, executorStatus);

    this.assignmentTs = System.currentTimeMillis();
    this.isBatchMode = ConfigExtension.isTaskBatchTuple(storm_conf);
}