@Override public void process() throws IOException { long startTime = System.currentTimeMillis(); try { Status status = this.splitTaskExecutor.exec(splitTaskDetails.getWALFile(), mode, reporter); switch (status) { case DONE: coordination.endTask(new SplitLogTask.Done(this.serverName,this.mode), SplitLogCounters.tot_wkr_task_done, splitTaskDetails); break; case PREEMPTED: SplitLogCounters.tot_wkr_preempt_task.incrementAndGet(); LOG.warn("task execution prempted " + splitTaskDetails.getWALFile()); break; case ERR: if (server != null && !server.isStopped()) { coordination.endTask(new SplitLogTask.Err(this.serverName, this.mode), SplitLogCounters.tot_wkr_task_err, splitTaskDetails); break; } // if the RS is exiting then there is probably a tons of stuff // that can go wrong. Resign instead of signaling error. //$FALL-THROUGH$ case RESIGNED: if (server != null && server.isStopped()) { LOG.info("task execution interrupted because worker is exiting " + splitTaskDetails.toString()); } coordination.endTask(new SplitLogTask.Resigned(this.serverName, this.mode), SplitLogCounters.tot_wkr_task_resigned, splitTaskDetails); break; } } finally { LOG.info("worker " + serverName + " done with task " + splitTaskDetails.toString() + " in " + (System.currentTimeMillis() - startTime) + "ms"); this.inProgressTasks.decrementAndGet(); } }
@Override public void process() throws IOException { long startTime = System.currentTimeMillis(); try { Status status = this.splitTaskExecutor.exec(wal, reporter); switch (status) { case DONE: endTask(zkw, new SplitLogTask.Done(this.serverName), SplitLogCounters.tot_wkr_task_done, curTask, curTaskZKVersion.intValue()); break; case PREEMPTED: SplitLogCounters.tot_wkr_preempt_task.incrementAndGet(); LOG.warn("task execution prempted " + wal); break; case ERR: if (server != null && !server.isStopped()) { endTask(zkw, new SplitLogTask.Err(this.serverName), SplitLogCounters.tot_wkr_task_err, curTask, curTaskZKVersion.intValue()); break; } // if the RS is exiting then there is probably a tons of stuff // that can go wrong. Resign instead of signaling error. //$FALL-THROUGH$ case RESIGNED: if (server != null && server.isStopped()) { LOG.info("task execution interrupted because worker is exiting " + curTask); } endTask(zkw, new SplitLogTask.Resigned(this.serverName), SplitLogCounters.tot_wkr_task_resigned, curTask, curTaskZKVersion.intValue()); break; } } finally { LOG.info("worker " + serverName + " done with task " + curTask + " in " + (System.currentTimeMillis() - startTime) + "ms"); this.inProgressTasks.decrementAndGet(); } }
@Override public void process() throws IOException { long startTime = System.currentTimeMillis(); Status status = null; try { status = this.splitTaskExecutor.exec(splitTaskDetails.getWALFile(), reporter); switch (status) { case DONE: coordination.endTask(new SplitLogTask.Done(this.serverName), SplitLogCounters.tot_wkr_task_done, splitTaskDetails); break; case PREEMPTED: SplitLogCounters.tot_wkr_preempt_task.increment(); LOG.warn("task execution preempted " + splitTaskDetails.getWALFile()); break; case ERR: if (server != null && !server.isStopped()) { coordination.endTask(new SplitLogTask.Err(this.serverName), SplitLogCounters.tot_wkr_task_err, splitTaskDetails); break; } // if the RS is exiting then there is probably a tons of stuff // that can go wrong. Resign instead of signaling error. //$FALL-THROUGH$ case RESIGNED: if (server != null && server.isStopped()) { LOG.info("task execution interrupted because worker is exiting " + splitTaskDetails.toString()); } coordination.endTask(new SplitLogTask.Resigned(this.serverName), SplitLogCounters.tot_wkr_task_resigned, splitTaskDetails); break; } } finally { LOG.info("Worker " + serverName + " done with task " + splitTaskDetails.toString() + " in " + (System.currentTimeMillis() - startTime) + "ms. Status = " + status); this.inProgressTasks.decrementAndGet(); } }