Java 类org.apache.hadoop.mapreduce.jobhistory.HistoryEvent 实例源码

项目:hadoop    文件:Job20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  JobID jobID = JobID.forName(jobIDName);

  if (jobIDName == null) {
    return null;
  }

  String priority = line.get("JOB_PRIORITY");

  if (priority != null) {
    return new JobPriorityChangeEvent(jobID, JobPriority.valueOf(priority));
  }

  return null;
}
项目:hadoop    文件:Job20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String status = line.get("JOB_STATUS");

  if (status != null) {
    return new JobStatusChangedEvent(jobID, status);
  }

  return null;
}
项目:hadoop    文件:Job20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String launchTime = line.get("LAUNCH_TIME");

  if (launchTime != null) {
    Job20LineHistoryEventEmitter that =
        (Job20LineHistoryEventEmitter) thatg;
    return new JobInfoChangeEvent(jobID, that.originalSubmitTime, Long
        .parseLong(launchTime));
  }

  return null;
}
项目:hadoop    文件:Job20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String finishTime = line.get("FINISH_TIME");

  String status = line.get("JOB_STATUS");

  String finishedMaps = line.get("FINISHED_MAPS");
  String finishedReduces = line.get("FINISHED_REDUCES");

  if (status != null && !status.equalsIgnoreCase("success")
      && finishTime != null && finishedMaps != null
      && finishedReduces != null) {
    return new JobUnsuccessfulCompletionEvent(jobID, Long
        .parseLong(finishTime), Integer.parseInt(finishedMaps), Integer
        .parseInt(finishedReduces), status);
  }

  return null;
}
项目:hadoop    文件:ReduceAttempt20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }

  TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);

  String finishTime = line.get("FINISH_TIME");
  String status = line.get("TASK_STATUS");

  if (finishTime != null && status != null
      && status.equalsIgnoreCase("success")) {
    String hostName = line.get("HOSTNAME");
    String counters = line.get("COUNTERS");
    String state = line.get("STATE_STRING");
    String shuffleFinish = line.get("SHUFFLE_FINISHED");
    String sortFinish = line.get("SORT_FINISHED");

    if (shuffleFinish != null && sortFinish != null
        && "success".equalsIgnoreCase(status)) {
      ReduceAttempt20LineHistoryEventEmitter that =
          (ReduceAttempt20LineHistoryEventEmitter) thatg;

      return new ReduceAttemptFinishedEvent
        (taskAttemptID,
         that.originalTaskType, status,
         Long.parseLong(shuffleFinish),
         Long.parseLong(sortFinish),
         Long.parseLong(finishTime),
         hostName, -1, null,
         state, maybeParseCounters(counters),
         null);
    }
  }

  return null;
}
项目:hadoop    文件:Task20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  TaskID taskID = TaskID.forName(taskIDName);

  String taskType = line.get("TASK_TYPE");
  String startTime = line.get("START_TIME");
  String splits = line.get("SPLITS");

  if (startTime != null && taskType != null) {
    Task20LineHistoryEventEmitter that =
        (Task20LineHistoryEventEmitter) thatg;

    that.originalStartTime = Long.parseLong(startTime);
    that.originalTaskType =
        Version20LogInterfaceUtils.get20TaskType(taskType);

    return new TaskStartedEvent(taskID, that.originalStartTime,
        that.originalTaskType, splits);
  }

  return null;
}
项目:hadoop    文件:Task20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  TaskID taskID = TaskID.forName(taskIDName);

  String finishTime = line.get("FINISH_TIME");

  if (finishTime != null) {
    return new TaskUpdatedEvent(taskID, Long.parseLong(finishTime));
  }

  return null;
}
项目:hadoop    文件:TopologyBuilder.java   
/**
 * Process one {@link HistoryEvent}
 * 
 * @param event
 *          The {@link HistoryEvent} to be processed.
 */
public void process(HistoryEvent event) {
  if (event instanceof TaskAttemptFinishedEvent) {
    processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
  } else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
    processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event);
  } else if (event instanceof TaskStartedEvent) {
    processTaskStartedEvent((TaskStartedEvent) event);
  } else if (event instanceof MapAttemptFinishedEvent) {
    processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
  } else if (event instanceof ReduceAttemptFinishedEvent) {
    processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
  }

  // I do NOT expect these if statements to be exhaustive.
}
项目:hadoop    文件:MapAttempt20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }

  TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);

  String finishTime = line.get("FINISH_TIME");
  String status = line.get("TASK_STATUS");

  if (finishTime != null && status != null
      && status.equalsIgnoreCase("success")) {
    String hostName = line.get("HOSTNAME");
    String counters = line.get("COUNTERS");
    String state = line.get("STATE_STRING");

    MapAttempt20LineHistoryEventEmitter that =
        (MapAttempt20LineHistoryEventEmitter) thatg;

    if ("success".equalsIgnoreCase(status)) {
      return new MapAttemptFinishedEvent
        (taskAttemptID,
          that.originalTaskType, status,
         Long.parseLong(finishTime),
         Long.parseLong(finishTime),
         hostName, -1, null, state, maybeParseCounters(counters),
         null);
    }
  }

  return null;
}
项目:hadoop    文件:Job20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String launchTime = line.get("LAUNCH_TIME");
  String status = line.get("JOB_STATUS");
  String totalMaps = line.get("TOTAL_MAPS");
  String totalReduces = line.get("TOTAL_REDUCES");
  String uberized = line.get("UBERIZED");

  if (launchTime != null && totalMaps != null && totalReduces != null) {
    return new JobInitedEvent(jobID, Long.parseLong(launchTime), Integer
        .parseInt(totalMaps), Integer.parseInt(totalReduces), status,
        Boolean.parseBoolean(uberized));
  }

  return null;
}
项目:hadoop    文件:Job20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String finishTime = line.get("FINISH_TIME");

  String status = line.get("JOB_STATUS");

  String finishedMaps = line.get("FINISHED_MAPS");
  String finishedReduces = line.get("FINISHED_REDUCES");

  String failedMaps = line.get("FAILED_MAPS");
  String failedReduces = line.get("FAILED_REDUCES");

  String counters = line.get("COUNTERS");

  if (status != null && status.equalsIgnoreCase("success")
      && finishTime != null && finishedMaps != null
      && finishedReduces != null) {
    return new JobFinishedEvent(jobID, Long.parseLong(finishTime), Integer
        .parseInt(finishedMaps), Integer.parseInt(finishedReduces), Integer
        .parseInt(failedMaps), Integer.parseInt(failedReduces), null, null,
        maybeParseCounters(counters));
  }

  return null;
}
项目:hadoop    文件:TaskAttempt20LineEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }

  TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);

  String startTime = line.get("START_TIME");
  String taskType = line.get("TASK_TYPE");
  String trackerName = line.get("TRACKER_NAME");
  String httpPort = line.get("HTTP_PORT");
  String locality = line.get("LOCALITY");
  if (locality == null) {
    locality = "";
  }
  String avataar = line.get("AVATAAR");
  if (avataar == null) {
    avataar = "";
  }

  if (startTime != null && taskType != null) {
    TaskAttempt20LineEventEmitter that =
        (TaskAttempt20LineEventEmitter) thatg;

    that.originalStartTime = Long.parseLong(startTime);
    that.originalTaskType =
        Version20LogInterfaceUtils.get20TaskType(taskType);

    int port =
        httpPort.equals("") ? DEFAULT_HTTP_PORT : Integer
            .parseInt(httpPort);

    return new TaskAttemptStartedEvent(taskAttemptID,
        that.originalTaskType, that.originalStartTime, trackerName, port, -1,
        locality, avataar);
  }

  return null;
}
项目:hadoop    文件:TaskAttempt20LineEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }

  TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);

  String finishTime = line.get("FINISH_TIME");
  String status = line.get("TASK_STATUS");

  if (finishTime != null && status != null
      && status.equalsIgnoreCase("success")) {
    String hostName = line.get("HOSTNAME");
    String counters = line.get("COUNTERS");
    String state = line.get("STATE_STRING");

    TaskAttempt20LineEventEmitter that =
        (TaskAttempt20LineEventEmitter) thatg;

    ParsedHost pHost = ParsedHost.parse(hostName);

    return new TaskAttemptFinishedEvent(taskAttemptID,
        that.originalTaskType, status, Long.parseLong(finishTime),
        pHost.getRackName(), pHost.getNodeName(), state, 
        maybeParseCounters(counters));
  }

  return null;
}
项目:hadoop    文件:TaskAttempt20LineEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }

  TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);

  String finishTime = line.get("FINISH_TIME");
  String status = line.get("TASK_STATUS");

  if (finishTime != null && status != null
      && !status.equalsIgnoreCase("success")) {
    String hostName = line.get("HOSTNAME");
    String error = line.get("ERROR");

    TaskAttempt20LineEventEmitter that =
        (TaskAttempt20LineEventEmitter) thatg;

    ParsedHost pHost = ParsedHost.parse(hostName);
    String rackName = null;

    // Earlier versions of MR logged on hostnames (without rackname) for
    // unsuccessful attempts
    if (pHost != null) {
      rackName = pHost.getRackName();
      hostName = pHost.getNodeName();
    }
    return new TaskAttemptUnsuccessfulCompletionEvent
      (taskAttemptID,
       that.originalTaskType, status, Long.parseLong(finishTime),
       hostName, -1, rackName, error, null);
  }

  return null;
}
项目:aliyun-oss-hadoop-fs    文件:Task20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  TaskID taskID = TaskID.forName(taskIDName);

  String taskType = line.get("TASK_TYPE");
  String startTime = line.get("START_TIME");
  String splits = line.get("SPLITS");

  if (startTime != null && taskType != null) {
    Task20LineHistoryEventEmitter that =
        (Task20LineHistoryEventEmitter) thatg;

    that.originalStartTime = Long.parseLong(startTime);
    that.originalTaskType =
        Version20LogInterfaceUtils.get20TaskType(taskType);

    return new TaskStartedEvent(taskID, that.originalStartTime,
        that.originalTaskType, splits);
  }

  return null;
}
项目:aliyun-oss-hadoop-fs    文件:Task20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  TaskID taskID = TaskID.forName(taskIDName);

  String finishTime = line.get("FINISH_TIME");

  if (finishTime != null) {
    return new TaskUpdatedEvent(taskID, Long.parseLong(finishTime));
  }

  return null;
}
项目:aliyun-oss-hadoop-fs    文件:TopologyBuilder.java   
/**
 * Process one {@link HistoryEvent}
 * 
 * @param event
 *          The {@link HistoryEvent} to be processed.
 */
public void process(HistoryEvent event) {
  if (event instanceof TaskAttemptFinishedEvent) {
    processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
  } else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
    processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event);
  } else if (event instanceof TaskStartedEvent) {
    processTaskStartedEvent((TaskStartedEvent) event);
  } else if (event instanceof MapAttemptFinishedEvent) {
    processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
  } else if (event instanceof ReduceAttemptFinishedEvent) {
    processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
  }

  // I do NOT expect these if statements to be exhaustive.
}
项目:aliyun-oss-hadoop-fs    文件:Job20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  JobID jobID = JobID.forName(jobIDName);

  if (jobIDName == null) {
    return null;
  }

  String priority = line.get("JOB_PRIORITY");

  if (priority != null) {
    return new JobPriorityChangeEvent(jobID, JobPriority.valueOf(priority));
  }

  return null;
}
项目:aliyun-oss-hadoop-fs    文件:Job20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String launchTime = line.get("LAUNCH_TIME");
  String status = line.get("JOB_STATUS");
  String totalMaps = line.get("TOTAL_MAPS");
  String totalReduces = line.get("TOTAL_REDUCES");
  String uberized = line.get("UBERIZED");

  if (launchTime != null && totalMaps != null && totalReduces != null) {
    return new JobInitedEvent(jobID, Long.parseLong(launchTime), Integer
        .parseInt(totalMaps), Integer.parseInt(totalReduces), status,
        Boolean.parseBoolean(uberized));
  }

  return null;
}
项目:aliyun-oss-hadoop-fs    文件:Job20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String status = line.get("JOB_STATUS");

  if (status != null) {
    return new JobStatusChangedEvent(jobID, status);
  }

  return null;
}
项目:aliyun-oss-hadoop-fs    文件:Job20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String launchTime = line.get("LAUNCH_TIME");

  if (launchTime != null) {
    Job20LineHistoryEventEmitter that =
        (Job20LineHistoryEventEmitter) thatg;
    return new JobInfoChangeEvent(jobID, that.originalSubmitTime, Long
        .parseLong(launchTime));
  }

  return null;
}
项目:aliyun-oss-hadoop-fs    文件:Job20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String finishTime = line.get("FINISH_TIME");

  String status = line.get("JOB_STATUS");

  String finishedMaps = line.get("FINISHED_MAPS");
  String finishedReduces = line.get("FINISHED_REDUCES");

  if (status != null && !status.equalsIgnoreCase("success")
      && finishTime != null && finishedMaps != null
      && finishedReduces != null) {
    return new JobUnsuccessfulCompletionEvent(jobID, Long
        .parseLong(finishTime), Integer.parseInt(finishedMaps), Integer
        .parseInt(finishedReduces), status);
  }

  return null;
}
项目:big-c    文件:Task20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  TaskID taskID = TaskID.forName(taskIDName);

  String taskType = line.get("TASK_TYPE");
  String startTime = line.get("START_TIME");
  String splits = line.get("SPLITS");

  if (startTime != null && taskType != null) {
    Task20LineHistoryEventEmitter that =
        (Task20LineHistoryEventEmitter) thatg;

    that.originalStartTime = Long.parseLong(startTime);
    that.originalTaskType =
        Version20LogInterfaceUtils.get20TaskType(taskType);

    return new TaskStartedEvent(taskID, that.originalStartTime,
        that.originalTaskType, splits);
  }

  return null;
}
项目:big-c    文件:Task20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  TaskID taskID = TaskID.forName(taskIDName);

  String finishTime = line.get("FINISH_TIME");

  if (finishTime != null) {
    return new TaskUpdatedEvent(taskID, Long.parseLong(finishTime));
  }

  return null;
}
项目:big-c    文件:TopologyBuilder.java   
/**
 * Process one {@link HistoryEvent}
 * 
 * @param event
 *          The {@link HistoryEvent} to be processed.
 */
public void process(HistoryEvent event) {
  if (event instanceof TaskAttemptFinishedEvent) {
    processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
  } else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
    processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event);
  } else if (event instanceof TaskStartedEvent) {
    processTaskStartedEvent((TaskStartedEvent) event);
  } else if (event instanceof MapAttemptFinishedEvent) {
    processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
  } else if (event instanceof ReduceAttemptFinishedEvent) {
    processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
  }

  // I do NOT expect these if statements to be exhaustive.
}
项目:big-c    文件:Job20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  JobID jobID = JobID.forName(jobIDName);

  if (jobIDName == null) {
    return null;
  }

  String priority = line.get("JOB_PRIORITY");

  if (priority != null) {
    return new JobPriorityChangeEvent(jobID, JobPriority.valueOf(priority));
  }

  return null;
}
项目:big-c    文件:Job20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String launchTime = line.get("LAUNCH_TIME");
  String status = line.get("JOB_STATUS");
  String totalMaps = line.get("TOTAL_MAPS");
  String totalReduces = line.get("TOTAL_REDUCES");
  String uberized = line.get("UBERIZED");

  if (launchTime != null && totalMaps != null && totalReduces != null) {
    return new JobInitedEvent(jobID, Long.parseLong(launchTime), Integer
        .parseInt(totalMaps), Integer.parseInt(totalReduces), status,
        Boolean.parseBoolean(uberized));
  }

  return null;
}
项目:big-c    文件:Job20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String status = line.get("JOB_STATUS");

  if (status != null) {
    return new JobStatusChangedEvent(jobID, status);
  }

  return null;
}
项目:big-c    文件:Job20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String launchTime = line.get("LAUNCH_TIME");

  if (launchTime != null) {
    Job20LineHistoryEventEmitter that =
        (Job20LineHistoryEventEmitter) thatg;
    return new JobInfoChangeEvent(jobID, that.originalSubmitTime, Long
        .parseLong(launchTime));
  }

  return null;
}
项目:big-c    文件:Job20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String finishTime = line.get("FINISH_TIME");

  String status = line.get("JOB_STATUS");

  String finishedMaps = line.get("FINISHED_MAPS");
  String finishedReduces = line.get("FINISHED_REDUCES");

  if (status != null && !status.equalsIgnoreCase("success")
      && finishTime != null && finishedMaps != null
      && finishedReduces != null) {
    return new JobUnsuccessfulCompletionEvent(jobID, Long
        .parseLong(finishTime), Integer.parseInt(finishedMaps), Integer
        .parseInt(finishedReduces), status);
  }

  return null;
}
项目:hadoop-2.6.0-cdh5.4.3    文件:Task20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  TaskID taskID = TaskID.forName(taskIDName);

  String taskType = line.get("TASK_TYPE");
  String startTime = line.get("START_TIME");
  String splits = line.get("SPLITS");

  if (startTime != null && taskType != null) {
    Task20LineHistoryEventEmitter that =
        (Task20LineHistoryEventEmitter) thatg;

    that.originalStartTime = Long.parseLong(startTime);
    that.originalTaskType =
        Version20LogInterfaceUtils.get20TaskType(taskType);

    return new TaskStartedEvent(taskID, that.originalStartTime,
        that.originalTaskType, splits);
  }

  return null;
}
项目:hadoop-2.6.0-cdh5.4.3    文件:Task20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  TaskID taskID = TaskID.forName(taskIDName);

  String finishTime = line.get("FINISH_TIME");

  if (finishTime != null) {
    return new TaskUpdatedEvent(taskID, Long.parseLong(finishTime));
  }

  return null;
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TopologyBuilder.java   
/**
 * Process one {@link HistoryEvent}
 * 
 * @param event
 *          The {@link HistoryEvent} to be processed.
 */
public void process(HistoryEvent event) {
  if (event instanceof TaskAttemptFinishedEvent) {
    processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
  } else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
    processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event);
  } else if (event instanceof TaskStartedEvent) {
    processTaskStartedEvent((TaskStartedEvent) event);
  } else if (event instanceof MapAttemptFinishedEvent) {
    processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
  } else if (event instanceof ReduceAttemptFinishedEvent) {
    processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
  }

  // I do NOT expect these if statements to be exhaustive.
}
项目:hadoop-2.6.0-cdh5.4.3    文件:Job20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  JobID jobID = JobID.forName(jobIDName);

  if (jobIDName == null) {
    return null;
  }

  String priority = line.get("JOB_PRIORITY");

  if (priority != null) {
    return new JobPriorityChangeEvent(jobID, JobPriority.valueOf(priority));
  }

  return null;
}
项目:hadoop-2.6.0-cdh5.4.3    文件:Job20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String launchTime = line.get("LAUNCH_TIME");
  String status = line.get("JOB_STATUS");
  String totalMaps = line.get("TOTAL_MAPS");
  String totalReduces = line.get("TOTAL_REDUCES");
  String uberized = line.get("UBERIZED");

  if (launchTime != null && totalMaps != null && totalReduces != null) {
    return new JobInitedEvent(jobID, Long.parseLong(launchTime), Integer
        .parseInt(totalMaps), Integer.parseInt(totalReduces), status,
        Boolean.parseBoolean(uberized));
  }

  return null;
}
项目:hadoop-2.6.0-cdh5.4.3    文件:Job20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String status = line.get("JOB_STATUS");

  if (status != null) {
    return new JobStatusChangedEvent(jobID, status);
  }

  return null;
}
项目:hadoop-2.6.0-cdh5.4.3    文件:Job20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String launchTime = line.get("LAUNCH_TIME");

  if (launchTime != null) {
    Job20LineHistoryEventEmitter that =
        (Job20LineHistoryEventEmitter) thatg;
    return new JobInfoChangeEvent(jobID, that.originalSubmitTime, Long
        .parseLong(launchTime));
  }

  return null;
}
项目:hadoop-2.6.0-cdh5.4.3    文件:Job20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String finishTime = line.get("FINISH_TIME");

  String status = line.get("JOB_STATUS");

  String finishedMaps = line.get("FINISHED_MAPS");
  String finishedReduces = line.get("FINISHED_REDUCES");

  if (status != null && !status.equalsIgnoreCase("success")
      && finishTime != null && finishedMaps != null
      && finishedReduces != null) {
    return new JobUnsuccessfulCompletionEvent(jobID, Long
        .parseLong(finishTime), Integer.parseInt(finishedMaps), Integer
        .parseInt(finishedReduces), status);
  }

  return null;
}
项目:hadoop-plus    文件:Task20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  TaskID taskID = TaskID.forName(taskIDName);

  String taskType = line.get("TASK_TYPE");
  String startTime = line.get("START_TIME");
  String splits = line.get("SPLITS");

  if (startTime != null && taskType != null) {
    Task20LineHistoryEventEmitter that =
        (Task20LineHistoryEventEmitter) thatg;

    that.originalStartTime = Long.parseLong(startTime);
    that.originalTaskType =
        Version20LogInterfaceUtils.get20TaskType(taskType);

    return new TaskStartedEvent(taskID, that.originalStartTime,
        that.originalTaskType, splits);
  }

  return null;
}
项目:hadoop-plus    文件:Task20LineHistoryEventEmitter.java   
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  TaskID taskID = TaskID.forName(taskIDName);

  String finishTime = line.get("FINISH_TIME");

  if (finishTime != null) {
    return new TaskUpdatedEvent(taskID, Long.parseLong(finishTime));
  }

  return null;
}