/**
 * Checks a job-submitted history event against the expected workflow
 * properties and records the outcome through {@code setAssertValue}.
 * Events of any other type are ignored. Workflow adjacencies are compared
 * as space-separated tokens, independent of token order.
 *
 * @param jhEvent the job history event to inspect
 */
@Override
public void handle(JobHistoryEvent jhEvent) {
  if (jhEvent.getType() != EventType.JOB_SUBMITTED) {
    return;
  }
  JobSubmittedEvent submitted = (JobSubmittedEvent) jhEvent.getHistoryEvent();

  // Short-circuit evaluation stops at the first mismatching property.
  boolean matches = workflowId.equals(submitted.getWorkflowId())
      && workflowName.equals(submitted.getWorkflowName())
      && workflowNodeName.equals(submitted.getWorkflowNodeName());

  if (matches) {
    // Sort both token lists so that only their contents are compared.
    String[] expectedAdj = workflowAdjacencies.split(" ");
    String[] actualAdj = submitted.getWorkflowAdjacencies().split(" ");
    Arrays.sort(expectedAdj);
    Arrays.sort(actualAdj);
    matches = Arrays.equals(expectedAdj, actualAdj)
        && workflowTags.equals(submitted.getWorkflowTags());
  }

  setAssertValue(matches);
}
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName, HistoryEventEmitter thatg) { JobID jobID = JobID.forName(jobIDName); if (jobIDName == null) { return null; } String submitTime = line.get("SUBMIT_TIME"); String jobConf = line.get("JOBCONF"); String user = line.get("USER"); if (user == null) { user = "nulluser"; } String jobName = line.get("JOBNAME"); String jobQueueName = line.get("JOB_QUEUE");// could be null String workflowId = line.get("WORKFLOW_ID"); if (workflowId == null) { workflowId = ""; } String workflowName = line.get("WORKFLOW_NAME"); if (workflowName == null) { workflowName = ""; } String workflowNodeName = line.get("WORKFLOW_NODE_NAME"); if (workflowNodeName == null) { workflowNodeName = ""; } String workflowAdjacencies = line.get("WORKFLOW_ADJACENCIES"); if (workflowAdjacencies == null) { workflowAdjacencies = ""; } String workflowTags = line.get("WORKFLOW_TAGS"); if (workflowTags == null) { workflowTags = ""; } if (submitTime != null) { Job20LineHistoryEventEmitter that = (Job20LineHistoryEventEmitter) thatg; that.originalSubmitTime = Long.parseLong(submitTime); Map<JobACL, AccessControlList> jobACLs = new HashMap<JobACL, AccessControlList>(); return new JobSubmittedEvent(jobID, jobName, user, that.originalSubmitTime, jobConf, jobACLs, jobQueueName, workflowId, workflowName, workflowNodeName, workflowAdjacencies, workflowTags); } return null; }
private void processJobSubmittedEvent(JobSubmittedEvent event) { result.setJobID(event.getJobId().toString()); result.setJobName(event.getJobName()); result.setUser(event.getUserName()); result.setSubmitTime(event.getSubmitTime()); result.putJobConfPath(event.getJobConfPath()); result.putJobAcls(event.getJobAcls()); // set the queue name if existing String queue = event.getJobQueueName(); if (queue != null) { result.setQueue(queue); } }
void logSubmissionToJobHistory() throws IOException { // log job info String username = conf.getUser(); if (username == null) { username = ""; } String jobname = conf.getJobName(); String jobQueueName = conf.getQueueName(); setUpLocalizedJobConf(conf, jobId); jobHistory.setupEventWriter(jobId, conf); JobSubmittedEvent jse = new JobSubmittedEvent(jobId, jobname, username, this.startTime, jobFile.toString(), status.getJobACLs(), jobQueueName); jobHistory.logEvent(jse, jobId); }
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName, HistoryEventEmitter thatg) { JobID jobID = JobID.forName(jobIDName); if (jobIDName == null) { return null; } String submitTime = line.get("SUBMIT_TIME"); String jobConf = line.get("JOBCONF"); String user = line.get("USER"); if (user == null) { user = "nulluser"; } String jobName = line.get("JOBNAME"); String jobQueueName = line.get("JOB_QUEUE");// could be null if (submitTime != null) { Job20LineHistoryEventEmitter that = (Job20LineHistoryEventEmitter) thatg; that.originalSubmitTime = Long.parseLong(submitTime); Map<JobACL, AccessControlList> jobACLs = new HashMap<JobACL, AccessControlList>(); return new JobSubmittedEvent(jobID, jobName, user, that.originalSubmitTime, jobConf, jobACLs, jobQueueName); } return null; }
private void processJobSubmittedEvent(JobSubmittedEvent event) { result.setJobID(event.getJobId().toString()); result.setJobName(event.getJobName()); result.setUser(event.getUserName()); result.setSubmitTime(event.getSubmitTime()); // job queue name is set when conf file is processed. // See JobBuilder.process(Properties) method for details. }
/** * Process one {@link HistoryEvent} * * @param event * The {@link HistoryEvent} to be processed. */ public void process(HistoryEvent event) { if (finalized) { throw new IllegalStateException( "JobBuilder.process(HistoryEvent event) called after ParsedJob built"); } // these are in lexicographical order by class name. if (event instanceof AMStartedEvent) { // ignore this event as Rumen currently doesnt need this event //TODO Enhance Rumen to process this event and capture restarts return; } else if (event instanceof NormalizedResourceEvent) { // Log an warn message as NormalizedResourceEvent shouldn't be written. LOG.warn("NormalizedResourceEvent should be ignored in history server."); } else if (event instanceof JobFinishedEvent) { processJobFinishedEvent((JobFinishedEvent) event); } else if (event instanceof JobInfoChangeEvent) { processJobInfoChangeEvent((JobInfoChangeEvent) event); } else if (event instanceof JobInitedEvent) { processJobInitedEvent((JobInitedEvent) event); } else if (event instanceof JobPriorityChangeEvent) { processJobPriorityChangeEvent((JobPriorityChangeEvent) event); } else if (event instanceof JobQueueChangeEvent) { processJobQueueChangeEvent((JobQueueChangeEvent) event); } else if (event instanceof JobStatusChangedEvent) { processJobStatusChangedEvent((JobStatusChangedEvent) event); } else if (event instanceof JobSubmittedEvent) { processJobSubmittedEvent((JobSubmittedEvent) event); } else if (event instanceof JobUnsuccessfulCompletionEvent) { processJobUnsuccessfulCompletionEvent((JobUnsuccessfulCompletionEvent) event); } else if (event instanceof MapAttemptFinishedEvent) { processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event); } else if (event instanceof ReduceAttemptFinishedEvent) { processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event); } else if (event instanceof TaskAttemptFinishedEvent) { processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event); } else if (event instanceof 
TaskAttemptStartedEvent) { processTaskAttemptStartedEvent((TaskAttemptStartedEvent) event); } else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) { processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event); } else if (event instanceof TaskFailedEvent) { processTaskFailedEvent((TaskFailedEvent) event); } else if (event instanceof TaskFinishedEvent) { processTaskFinishedEvent((TaskFinishedEvent) event); } else if (event instanceof TaskStartedEvent) { processTaskStartedEvent((TaskStartedEvent) event); } else if (event instanceof TaskUpdatedEvent) { processTaskUpdatedEvent((TaskUpdatedEvent) event); } else throw new IllegalArgumentException( "JobBuilder.process(HistoryEvent): unknown event type:" + event.getEventType() + " for event:" + event); }
/** * Process one {@link HistoryEvent} * * @param event * The {@link HistoryEvent} to be processed. */ public void process(HistoryEvent event) { if (finalized) { throw new IllegalStateException( "JobBuilder.process(HistoryEvent event) called after ParsedJob built"); } // these are in lexicographical order by class name. if (event instanceof AMStartedEvent) { // ignore this event as Rumen currently doesnt need this event //TODO Enhance Rumen to process this event and capture restarts return; } else if (event instanceof JobFinishedEvent) { processJobFinishedEvent((JobFinishedEvent) event); } else if (event instanceof JobInfoChangeEvent) { processJobInfoChangeEvent((JobInfoChangeEvent) event); } else if (event instanceof JobInitedEvent) { processJobInitedEvent((JobInitedEvent) event); } else if (event instanceof JobPriorityChangeEvent) { processJobPriorityChangeEvent((JobPriorityChangeEvent) event); } else if (event instanceof JobStatusChangedEvent) { processJobStatusChangedEvent((JobStatusChangedEvent) event); } else if (event instanceof JobSubmittedEvent) { processJobSubmittedEvent((JobSubmittedEvent) event); } else if (event instanceof JobUnsuccessfulCompletionEvent) { processJobUnsuccessfulCompletionEvent((JobUnsuccessfulCompletionEvent) event); } else if (event instanceof MapAttemptFinishedEvent) { processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event); } else if (event instanceof ReduceAttemptFinishedEvent) { processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event); } else if (event instanceof TaskAttemptFinishedEvent) { processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event); } else if (event instanceof TaskAttemptStartedEvent) { processTaskAttemptStartedEvent((TaskAttemptStartedEvent) event); } else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) { processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event); } else if (event instanceof TaskFailedEvent) { 
processTaskFailedEvent((TaskFailedEvent) event); } else if (event instanceof TaskFinishedEvent) { processTaskFinishedEvent((TaskFinishedEvent) event); } else if (event instanceof TaskStartedEvent) { processTaskStartedEvent((TaskStartedEvent) event); } else if (event instanceof TaskUpdatedEvent) { processTaskUpdatedEvent((TaskUpdatedEvent) event); } else throw new IllegalArgumentException( "JobBuilder.process(HistoryEvent): unknown event type"); }