@Inject RequestProcessorImpl(MetricsRegistry metrics, TimestampOracle timestampOracle, PersistenceProcessor persistProc, Panicker panicker, TSOServerConfig config) throws IOException { // ------------------------------------------------------------------------------------------------------------ // Disruptor initialization // ------------------------------------------------------------------------------------------------------------ TimeoutBlockingWaitStrategy timeoutStrategy = new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), MILLISECONDS); ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("request-%d").build(); this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory); this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, timeoutStrategy); disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This must be before handleEventsWith() disruptor.handleEventsWith(this); this.requestRing = disruptor.start(); // ------------------------------------------------------------------------------------------------------------ // Attribute initialization // ------------------------------------------------------------------------------------------------------------ this.metrics = metrics; this.persistProc = persistProc; this.timestampOracle = timestampOracle; this.hashmap = new CommitHashMap(config.getConflictMapSize()); this.tableFences = new HashMap<Long, Long>(); LOG.info("RequestProcessor initialized"); }
private AsyncLoopThread startDispatchThread() { // send tuple directly from netty server // send control tuple to dispatch thread // startDispatchDisruptor(); IContext context = workerData.getContext(); String topologyId = workerData.getTopologyId(); //create recv connection Map stormConf = workerData.getStormConf(); long timeout = JStormUtils.parseLong(stormConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT), 10); WaitStrategy waitStrategy = new TimeoutBlockingWaitStrategy(timeout, TimeUnit.MILLISECONDS); int queueSize = JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_CTRL_BUFFER_SIZE), 256); DisruptorQueue recvControlQueue = DisruptorQueue.mkInstance("Dispatch-control", ProducerType.MULTI, queueSize, waitStrategy, false, 0, 0); //metric for recvControlQueue QueueGauge revCtrlGauge = new QueueGauge(recvControlQueue, MetricDef.RECV_CTRL_QUEUE); JStormMetrics.registerWorkerMetric(JStormMetrics.workerMetricName(MetricDef.RECV_CTRL_QUEUE, MetricType.GAUGE), new AsmGauge( revCtrlGauge)); IConnection recvConnection = context.bind(topologyId, workerData.getPort(), workerData.getDeserializeQueues(), recvControlQueue, false, workerData.getTaskIds()); workerData.setRecvConnection(recvConnection); // create recvice control messages's thread RunnableCallback recvControlDispather = new VirtualPortCtrlDispatch( workerData, recvConnection, recvControlQueue, MetricDef.RECV_THREAD); return new AsyncLoopThread(recvControlDispather, false, Thread.MAX_PRIORITY, true); }
public TaskReceiver(Task task, int taskId, Map stormConf, TopologyContext topologyContext, Map<Integer, DisruptorQueue> innerTaskTransfer, TaskStatus taskStatus, String taskName) { this.stormConf = stormConf; this.task = task; this.bolt = (task.getTaskObj() instanceof IBolt ? (IBolt) task.getTaskObj() : null); this.taskId = taskId; this.idStr = taskName; this.topologyContext = topologyContext; this.innerTaskTransfer = innerTaskTransfer; this.taskStatus = taskStatus; int queueSize = JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE), 256); long timeout = JStormUtils.parseLong(stormConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT), 10); WaitStrategy waitStrategy = new TimeoutBlockingWaitStrategy(timeout, TimeUnit.MILLISECONDS); boolean isDisruptorBatchMode = ConfigExtension.isDisruptorQueueBatchMode(stormConf); int disruptorBatch = ConfigExtension.getDisruptorBufferSize(stormConf); long flushMs = ConfigExtension.getDisruptorBufferFlushMs(stormConf); this.deserializeQueue = DisruptorQueue.mkInstance("TaskDeserialize", ProducerType.MULTI, queueSize, waitStrategy, isDisruptorBatchMode, disruptorBatch, flushMs); dserializeThreadNum = ConfigExtension.getTaskDeserializeThreadNum(stormConf); deserializeThreads = new ArrayList<>(); setDeserializeThread(); //this.deserializer = new KryoTupleDeserializer(stormConf, topologyContext); String topologyId = topologyContext.getTopologyId(); String component = topologyContext.getThisComponentId(); AsmHistogram deserializeTimerHistogram = new AsmHistogram(); deserializeTimerHistogram.setAggregate(false); deserializeTimer = (AsmHistogram) JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName( topologyId, component, taskId, MetricDef.DESERIALIZE_TIME, MetricType.HISTOGRAM), deserializeTimerHistogram); QueueGauge deserializeQueueGauge = new QueueGauge(deserializeQueue, idStr, MetricDef.DESERIALIZE_QUEUE); JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName( topologyId, component, taskId, MetricDef.DESERIALIZE_QUEUE, MetricType.GAUGE), new AsmGauge(deserializeQueueGauge)); JStormHealthCheck.registerTaskHealthCheck(taskId, MetricDef.DESERIALIZE_QUEUE, deserializeQueueGauge); this.isBackpressureEnable = ConfigExtension.isBackpressureEnable(stormConf); this.highMark = (float) ConfigExtension.getBackpressureWaterMarkHigh(stormConf); this.lowMark = (float) ConfigExtension.getBackpressureWaterMarkLow(stormConf); this.backpressureStatus = false; LOG.info("Successfully started TaskReceiver thread for {}, thread num: {}", idStr, dserializeThreadNum); }
public BaseExecutors(Task task) { this.task = task; this.storm_conf = task.getStormConf(); this.userTopologyCtx = task.getUserContext(); this.sysTopologyCtx = task.getTopologyContext(); this.taskStats = task.getTaskStats(); this.taskId = sysTopologyCtx.getThisTaskId(); this.innerTaskTransfer = task.getInnerTaskTransfer(); this.topologyId = sysTopologyCtx.getTopologyId(); this.componentId = sysTopologyCtx.getThisComponentId(); this.idStr = JStormServerUtils.getName(componentId, taskId); this.taskStatus = task.getTaskStatus(); this.executorStatus = new TaskStatus(); this.reportError = task.getReportErrorDie(); this.taskTransfer = task.getTaskTransfer(); this.metricsReporter = task.getWorkerData().getMetricsReporter(); messageTimeoutSecs = JStormUtils.parseInt(storm_conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 30); int queue_size = Utils.getInt(storm_conf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE), 256); long timeout = JStormUtils.parseLong(storm_conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT), 10); WaitStrategy waitStrategy = new TimeoutBlockingWaitStrategy(timeout, TimeUnit.MILLISECONDS); boolean isDisruptorBatchMode = ConfigExtension.isDisruptorQueueBatchMode(storm_conf); int disruptorBatch = ConfigExtension.getDisruptorBufferSize(storm_conf); long flushMs = ConfigExtension.getDisruptorBufferFlushMs(storm_conf); this.exeQueue = DisruptorQueue.mkInstance(idStr, ProducerType.MULTI, queue_size, waitStrategy, isDisruptorBatchMode, disruptorBatch, flushMs); //this.exeQueue.consumerStarted(); queue_size = Utils.getInt(storm_conf.get(Config.TOPOLOGY_CTRL_BUFFER_SIZE), 32); this.controlQueue = DisruptorQueue.mkInstance( idStr + " for control message", ProducerType.MULTI, queue_size, waitStrategy, false, 0, 0); //this.controlQueue.consumerStarted(); this.registerInnerTransfer(exeQueue); this.task.getControlQueues().put(taskId, this.controlQueue); QueueGauge exeQueueGauge = new QueueGauge(exeQueue, idStr, MetricDef.EXECUTE_QUEUE); JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName( topologyId, componentId, taskId, MetricDef.EXECUTE_QUEUE, MetricType.GAUGE), new AsmGauge(exeQueueGauge)); JStormHealthCheck.registerTaskHealthCheck(taskId, MetricDef.EXECUTE_QUEUE, exeQueueGauge); //metric for control queue QueueGauge controlQueueGauge = new QueueGauge(controlQueue, idStr, MetricDef.CONTROL_QUEUE); JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName( topologyId, componentId, taskId, MetricDef.CONTROL_QUEUE, MetricType.GAUGE), new AsmGauge(controlQueueGauge)); JStormHealthCheck.registerTaskHealthCheck(taskId, MetricDef.CONTROL_QUEUE, controlQueueGauge); rotatingMapTrigger = new RotatingMapTrigger(storm_conf, idStr + "_rotating", controlQueue); rotatingMapTrigger.register(); taskHbTrigger = new TaskHeartbeatTrigger(storm_conf, idStr + "_taskHeartbeat", controlQueue, taskId, componentId, sysTopologyCtx, reportError, executorStatus); assignmentTs = System.currentTimeMillis(); isBatchMode = ConfigExtension.isTaskBatchTuple(storm_conf); }