public ZKSplitLogManagerCoordination(final CoordinatedStateManager manager, ZooKeeperWatcher watcher) { super(watcher); taskFinisher = new TaskFinisher() { @Override public Status finish(ServerName workerName, String logfile) { try { WALSplitter.finishSplitLogFile(logfile, manager.getServer().getConfiguration()); } catch (IOException e) { LOG.warn("Could not finish splitting of log file " + logfile, e); return Status.ERR; } return Status.DONE; } }; this.server = manager.getServer(); this.conf = server.getConfiguration(); }
public ZKSplitLogManagerCoordination(Configuration conf, ZKWatcher watcher) { super(watcher); this.conf = conf; taskFinisher = new TaskFinisher() { @Override public Status finish(ServerName workerName, String logfile) { try { WALSplitter.finishSplitLogFile(logfile, conf); } catch (IOException e) { LOG.warn("Could not finish splitting of log file " + logfile, e); return Status.ERR; } return Status.DONE; } }; }
private void getDataSetWatchSuccess(String path, byte[] data, int version) throws DeserializationException { if (data == null) { if (version == Integer.MIN_VALUE) { // assume all done. The task znode suddenly disappeared. setDone(path, SUCCESS); return; } SplitLogCounters.tot_mgr_null_data.incrementAndGet(); LOG.fatal("logic error - got null data " + path); setDone(path, FAILURE); return; } data = this.watcher.getRecoverableZooKeeper().removeMetaData(data); SplitLogTask slt = SplitLogTask.parseFrom(data); if (slt.isUnassigned()) { LOG.debug("task not yet acquired " + path + " ver = " + version); handleUnassignedTask(path); } else if (slt.isOwned()) { heartbeat(path, version, slt.getServerName()); } else if (slt.isResigned()) { LOG.info("task " + path + " entered state: " + slt.toString()); resubmitOrFail(path, FORCE); } else if (slt.isDone()) { LOG.info("task " + path + " entered state: " + slt.toString()); if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) { if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) { setDone(path, SUCCESS); } else { resubmitOrFail(path, CHECK); } } else { setDone(path, SUCCESS); } } else if (slt.isErr()) { LOG.info("task " + path + " entered state: " + slt.toString()); resubmitOrFail(path, CHECK); } else { LOG.fatal("logic error - unexpected zk state for path = " + path + " data = " + slt.toString()); setDone(path, FAILURE); } }
private void getDataSetWatchSuccess(String path, byte[] data, int version) throws DeserializationException { if (data == null) { if (version == Integer.MIN_VALUE) { // assume all done. The task znode suddenly disappeared. setDone(path, SUCCESS); return; } SplitLogCounters.tot_mgr_null_data.increment(); LOG.error(HBaseMarkers.FATAL, "logic error - got null data " + path); setDone(path, FAILURE); return; } data = ZKMetadata.removeMetaData(data); SplitLogTask slt = SplitLogTask.parseFrom(data); if (slt.isUnassigned()) { LOG.debug("Task not yet acquired " + path + ", ver=" + version); handleUnassignedTask(path); } else if (slt.isOwned()) { heartbeat(path, version, slt.getServerName()); } else if (slt.isResigned()) { LOG.info("Task " + path + " entered state=" + slt.toString()); resubmitOrFail(path, FORCE); } else if (slt.isDone()) { LOG.info("Task " + path + " entered state=" + slt.toString()); if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) { if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) { setDone(path, SUCCESS); } else { resubmitOrFail(path, CHECK); } } else { setDone(path, SUCCESS); } } else if (slt.isErr()) { LOG.info("Task " + path + " entered state=" + slt.toString()); resubmitOrFail(path, CHECK); } else { LOG.error(HBaseMarkers.FATAL, "logic error - unexpected zk state for path = " + path + " data = " + slt.toString()); setDone(path, FAILURE); } }
/** * finish the partially done task. workername provides clue to where the partial results of the * partially done tasks are present. taskname is the name of the task that was put up in * zookeeper. * <p> * @param workerName * @param taskname * @return DONE if task completed successfully, ERR otherwise */ Status finish(ServerName workerName, String taskname);