/**
 * Builds the queue named {@code queueName} from the given configuration.
 *
 * <p>Priority support is read from {@code conf} once, here, and selects the
 * comparator that orders the waiting, initializing and running job maps.
 *
 * @param queueName name of this queue
 * @param conf      scheduler configuration consulted for this queue
 */
public CapacitySchedulerQueue(String queueName, CapacitySchedulerConf conf) {
  this.queueName = queueName;

  // 'supportsPriorities' is fixed at construction time and must not change.
  this.supportsPriorities = conf.isPrioritySupported(queueName);

  initializeQueue(conf);

  // Priority-aware FIFO ordering when priorities are enabled, otherwise
  // plain start-time ordering.
  comparator = supportsPriorities
      ? JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR
      : STARTTIME_JOB_COMPARATOR;

  // All three job collections share the same ordering.
  this.waitingJobs =
      new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
  this.initializingJobs =
      new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
  this.runningJobs =
      new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);

  this.mapSlots = new SlotsUsage();
  this.reduceSlots = new SlotsUsage();
}
private void jobCompleted(JobInProgress job, JobSchedulingInfo oldInfo, CapacitySchedulerQueue queue, int runState) { LOG.info("Job " + job.getJobID().toString() + " submitted to queue " + job.getProfile().getQueueName() + " has completed"); //remove jobs from both queue's a job can be in //running and waiting queue at the same time. JobInProgress waitingJob = queue.removeWaitingJob(oldInfo, runState); JobInProgress initializingJob = queue.removeInitializingJob(oldInfo, runState); JobInProgress runningJob = queue.removeRunningJob(oldInfo, runState); // let scheduler know if necessary // sometimes this isn't necessary if the job was rejected during submission if (runningJob != null || initializingJob != null || waitingJob != null) { scheduler.jobCompleted(job); } }
private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo, CapacitySchedulerQueue queue, int runState) { if(queue.removeWaitingJob(oldInfo, runState) != null) { try { queue.addWaitingJob(job); } catch (IOException ioe) { // Ignore, cannot happen LOG.warn("Couldn't change priority!"); return; } } if (queue.removeInitializingJob(oldInfo, runState) != null) { queue.addInitializingJob(job); } if(queue.removeRunningJob(oldInfo, runState) != null) { queue.addRunningJob(job); } }
private void jobStateChanged(JobStatusChangeEvent event, QueueInfo qi) { JobInProgress job = event.getJobInProgress(); JobSchedulingInfo oldJobStateInfo = new JobSchedulingInfo(event.getOldStatus()); // Check if the ordering of the job has changed // For now priority and start-time can change the job ordering if (event.getEventType() == EventType.PRIORITY_CHANGED || event.getEventType() == EventType.START_TIME_CHANGED) { // Make a priority change reorderJobs(job, oldJobStateInfo, qi); } else if (event.getEventType() == EventType.RUN_STATE_CHANGED) { // Check if the job is complete int runState = job.getStatus().getRunState(); if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED || runState == JobStatus.KILLED) { jobCompleted(job, oldJobStateInfo, qi); } else if (runState == JobStatus.RUNNING) { makeJobRunning(job, oldJobStateInfo, qi); } } }
/**
 * Removes and returns the first job held for the given queue.
 *
 * @param queue
 *          queue name
 * @return the first job of that queue, which is also removed from it, or
 *         {@code null} when the queue currently holds no jobs
 */
private JobInProgress getFirstJobInQueue(String queue) {
  Map<JobSchedulingInfo, JobInProgress> queuedJobs = jobsPerQueue.get(queue);
  synchronized (queuedJobs) {
    Iterator<JobInProgress> cursor = queuedJobs.values().iterator();
    if (!cursor.hasNext()) {
      return null;
    }
    JobInProgress head = cursor.next();
    // Removing through the value iterator drops the backing map entry.
    cursor.remove();
    currentJobCount.getAndDecrement();
    return head;
  }
}
/**
 * Adds {@code job} to the job map kept for {@code queue} and bumps the
 * current job count.
 *
 * <p>If no job map exists for {@code queue}, the error is logged and the job
 * is not added.
 *
 * @param queue name of the queue the job is added to
 * @param job   the job to add
 */
void addJobsToQueue(String queue, JobInProgress job) {
  Map<JobSchedulingInfo, JobInProgress> jobs = jobsPerQueue.get(queue);
  if (jobs == null) {
    LOG.error("Invalid queue passed to the thread : " + queue
        + " For job :: " + job.getJobID());
    // Without this return the synchronized block below would throw a
    // NullPointerException right after the error was logged.
    return;
  }
  synchronized (jobs) {
    JobSchedulingInfo schedInfo = new JobSchedulingInfo(job);
    jobs.put(schedInfo, job);
    currentJobCount.getAndIncrement();
  }
}
/**
 * Registers an empty job map for {@code queueName}, ordered by the
 * comparator of the queue that the job queue manager returns for that name.
 *
 * @param queueName name of the queue to register
 */
void addQueue(String queueName) {
  jobsPerQueue.put(
      queueName,
      new TreeMap<JobSchedulingInfo, JobInProgress>(
          jobQueueManager.getQueue(queueName).getComparator()));
}
public int compare(JobSchedulingInfo o1, JobSchedulingInfo o2) { // the job that started earlier wins if (o1.getStartTime() < o2.getStartTime()) { return -1; } else { return (o1.getStartTime() == o2.getStartTime() ? o1.getJobID().compareTo(o2.getJobID()) : 1); } }
/**
 * Records {@code job} as initializing and adds its desired task count to
 * the active task total. Does nothing if {@code jobSchedInfo} is already
 * recorded as initializing.
 *
 * @param jobSchedInfo scheduling info used as the key for the job
 * @param job          the job being initialized
 */
public void jobInitializing(JobSchedulingInfo jobSchedInfo,
    JobInProgress job) {
  if (initializingJobs.containsKey(jobSchedInfo)) {
    // Already accounted for; avoid counting its tasks twice.
    return;
  }
  initializingJobs.put(jobSchedInfo, job);
  activeTasks += job.desiredTasks();
}
synchronized JobInProgress removeInitializingJob( JobSchedulingInfo jobSchedInfo, int runState) { JobInProgress job = initializingJobs.remove(jobSchedInfo); if (job != null) { String user = job.getProfile().getUser(); UserInfo userInfo = users.get(user); userInfo.removeInitializingJob(jobSchedInfo); // Decrement counts if the job is killed _while_ it was selected for // initialization, but aborted // NOTE: addRunningJob calls removeInitializingJob with runState==RUNNING if (runState != JobStatus.RUNNING) { finishJob(jobSchedInfo, job); } if (LOG.isDebugEnabled()) { LOG.debug("removeInitializingJob:" + " job=" + job.getJobID() + " user=" + user + " queue=" + queueName + " qWaitJobs=" + getNumWaitingJobs() + " qInitJobs=" + getNumInitializingJobs()+ " qRunJobs=" + getNumRunningJobs() + " qActiveTasks=" + getNumActiveTasks() + " uWaitJobs=" + getNumWaitingJobsByUser(user) + " uInitJobs=" + getNumInitializingJobsByUser(user) + " uRunJobs=" + getNumRunningJobsByUser(user) + " uActiveTasks=" + getNumActiveTasksByUser(user) ); } } return job; }
synchronized void addRunningJob(JobInProgress job) { JobSchedulingInfo jobSchedInfo = new JobSchedulingInfo(job); if (runningJobs.containsKey(jobSchedInfo)) { LOG.info("job " + job.getJobID() + " already running in queue'" + queueName + "'!"); return; } // Mark the job as running runningJobs.put(jobSchedInfo,job); // Update user stats String user = job.getProfile().getUser(); UserInfo userInfo = users.get(user); userInfo.jobInitialized(jobSchedInfo, job); if (LOG.isDebugEnabled()) { LOG.debug("addRunningJob:" + " job=" + job.getJobID() + " user=" + user + " queue=" + queueName + " qWaitJobs=" + getNumWaitingJobs() + " qInitJobs=" + getNumInitializingJobs()+ " qRunJobs=" + getNumRunningJobs() + " qActiveTasks=" + getNumActiveTasks() + " uWaitJobs=" + getNumWaitingJobsByUser(user) + " uInitJobs=" + getNumInitializingJobsByUser(user) + " uRunJobs=" + getNumRunningJobsByUser(user) + " uActiveTasks=" + getNumActiveTasksByUser(user) ); } // Remove from 'initializing' list // Note that at this point job.status.state != RUNNING, // however, logically it is a reasonable state to pass in to ensure // that removeInitializingJob doesn't double-decrement // the relevant queue/user counters removeInitializingJob(jobSchedInfo, JobStatus.RUNNING); }
synchronized private void addJob(JobSchedulingInfo jobSchedInfo, JobInProgress job) { // Update queue stats activeTasks += job.desiredTasks(); // Update user stats String user = job.getProfile().getUser(); UserInfo userInfo = users.get(user); userInfo.jobInitializing(jobSchedInfo, job); }
synchronized private void finishJob(JobSchedulingInfo jobSchedInfo, JobInProgress job) { // Update user stats String user = job.getProfile().getUser(); UserInfo userInfo = users.get(user); userInfo.jobCompleted(jobSchedInfo, job); if (userInfo.isInactive()) { users.remove(userInfo); } // Update queue stats activeTasks -= job.desiredTasks(); }
synchronized JobInProgress removeRunningJob(JobSchedulingInfo jobSchedInfo, int runState) { JobInProgress job = runningJobs.remove(jobSchedInfo); // We have to be careful, we might be trying to remove a job // which might not have been initialized if (job != null) { String user = job.getProfile().getUser(); finishJob(jobSchedInfo, job); if (LOG.isDebugEnabled()) { LOG.debug("removeRunningJob:" + " job=" + job.getJobID() + " user=" + user + " queue=" + queueName + " qWaitJobs=" + getNumWaitingJobs() + " qInitJobs=" + getNumInitializingJobs()+ " qRunJobs=" + getNumRunningJobs() + " qActiveTasks=" + getNumActiveTasks() + " uWaitJobs=" + getNumWaitingJobsByUser(user) + " uInitJobs=" + getNumInitializingJobsByUser(user) + " uRunJobs=" + getNumRunningJobsByUser(user) + " uActiveTasks=" + getNumActiveTasksByUser(user) ); } } return job; }
synchronized void addWaitingJob(JobInProgress job) throws IOException { JobSchedulingInfo jobSchedInfo = new JobSchedulingInfo(job); if (waitingJobs.containsKey(jobSchedInfo)) { LOG.info("job " + job.getJobID() + " already waiting in queue '" + queueName + "'!"); return; } String user = job.getProfile().getUser(); // Check acceptance limits checkJobSubmissionLimits(job, user); waitingJobs.put(jobSchedInfo, job); // Update user stats UserInfo userInfo = users.get(user); if (userInfo == null) { userInfo = new UserInfo(comparator); users.put(user, userInfo); } userInfo.jobAdded(jobSchedInfo, job); if (LOG.isDebugEnabled()) { LOG.debug("addWaitingJob:" + " job=" + job.getJobID() + " user=" + user + " queue=" + queueName + " qWaitJobs=" + getNumWaitingJobs() + " qInitJobs=" + getNumInitializingJobs()+ " qRunJobs=" + getNumRunningJobs() + " qActiveTasks=" + getNumActiveTasks() + " uWaitJobs=" + getNumWaitingJobsByUser(user) + " uInitJobs=" + getNumInitializingJobsByUser(user) + " uRunJobs=" + getNumRunningJobsByUser(user) + " uActiveTasks=" + getNumActiveTasksByUser(user) ); } }
/**
 * Removes the job registered under {@code jobSchedInfo} from the waiting
 * jobs and from the owning user's waiting jobs.
 *
 * @param jobSchedInfo scheduling info the job is registered under
 * @param unused       ignored; present so the signature matches the other
 *                     remove methods
 * @return the removed job, or {@code null} if it was not waiting
 */
synchronized JobInProgress removeWaitingJob(JobSchedulingInfo jobSchedInfo,
    int unused) {
  JobInProgress removed = waitingJobs.remove(jobSchedInfo);
  if (removed == null) {
    return null;
  }

  String userName = removed.getProfile().getUser();
  users.get(userName).removeWaitingJob(jobSchedInfo);

  if (LOG.isDebugEnabled()) {
    LOG.debug("removeWaitingJob:" +
        " job=" + removed.getJobID() +
        " user=" + userName +
        " queue=" + queueName +
        " qWaitJobs=" + getNumWaitingJobs() +
        " qInitJobs=" + getNumInitializingJobs() +
        " qRunJobs=" + getNumRunningJobs() +
        " qActiveTasks=" + getNumActiveTasks() +
        " uWaitJobs=" + getNumWaitingJobsByUser(userName) +
        " uInitJobs=" + getNumInitializingJobsByUser(userName) +
        " uRunJobs=" + getNumRunningJobsByUser(userName) +
        " uActiveTasks=" + getNumActiveTasksByUser(userName));
  }

  return removed;
}
private void makeJobRunning(JobInProgress job, JobSchedulingInfo oldInfo, CapacitySchedulerQueue queue) { // Removing of the job from job list is responsibility of the //initialization poller. // Add the job to the running queue queue.addRunningJob(job); }
/**
 * Removes and returns the first job held for the given queue.
 *
 * @param queue
 *          queue name
 * @return the first job of that queue, which is also removed from it, or
 *         {@code null} when the queue currently holds no jobs
 */
private JobInProgress getFirstJobInQueue(String queue) {
  TreeMap<JobSchedulingInfo, JobInProgress> queuedJobs =
      jobsPerQueue.get(queue);
  synchronized (queuedJobs) {
    if (queuedJobs.isEmpty()) {
      return null;
    }
    // pollFirstEntry removes the lowest-keyed entry, i.e. the same entry
    // the value iterator would hand out first.
    JobInProgress head = queuedJobs.pollFirstEntry().getValue();
    currentJobCount.getAndDecrement();
    return head;
  }
}
/**
 * Adds {@code job} to the job map kept for {@code queue} and bumps the
 * current job count.
 *
 * <p>If no job map exists for {@code queue}, the error is logged and the job
 * is not added.
 *
 * @param queue name of the queue the job is added to
 * @param job   the job to add
 */
void addJobsToQueue(String queue, JobInProgress job) {
  TreeMap<JobSchedulingInfo, JobInProgress> jobs = jobsPerQueue.get(queue);
  if (jobs == null) {
    LOG.error("Invalid queue passed to the thread : " + queue
        + " For job :: " + job.getJobID());
    // Without this return the synchronized block below would throw a
    // NullPointerException right after the error was logged.
    return;
  }
  synchronized (jobs) {
    JobSchedulingInfo schedInfo = new JobSchedulingInfo(job);
    jobs.put(schedInfo, job);
    currentJobCount.getAndIncrement();
  }
}
/**
 * Creates the per-queue bookkeeping with empty waiting and running job
 * maps, ordered according to whether the queue supports priorities.
 *
 * @param prio {@code true} if the queue supports job priorities
 */
QueueInfo(boolean prio) {
  this.supportsPriorities = prio;

  // Priority-aware FIFO ordering when priorities are enabled, otherwise
  // plain start-time ordering.
  comparator = supportsPriorities
      ? JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR
      : STARTTIME_JOB_COMPARATOR;

  waitingJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
  runningJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
}
private void jobCompleted(JobInProgress job, JobSchedulingInfo oldInfo, QueueInfo qi) { LOG.info("Job " + job.getJobID().toString() + " submitted to queue " + job.getProfile().getQueueName() + " has completed"); //remove jobs from both queue's a job can be in //running and waiting queue at the same time. qi.removeRunningJob(oldInfo); qi.removeWaitingJob(oldInfo); // let scheduler know scheduler.jobCompleted(job); }
/**
 * Re-inserts {@code job} into whichever of the waiting and running
 * collections of {@code qi} it is currently in, so that it is stored under
 * scheduling info built from its current state instead of {@code oldInfo}.
 *
 * @param job     the job whose ordering attributes changed
 * @param oldInfo scheduling info the job is currently registered under
 * @param qi      queue info of the queue the job belongs to
 */
private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo,
    QueueInfo qi) {
  // Waiting collection: remove under the old key, add back under the new.
  boolean wasWaiting = qi.removeWaitingJob(oldInfo) != null;
  if (wasWaiting) {
    qi.addWaitingJob(job);
  }

  // Same for the running collection.
  boolean wasRunning = qi.removeRunningJob(oldInfo) != null;
  if (wasRunning) {
    qi.addRunningJob(job);
  }
}
private void makeJobRunning(JobInProgress job, JobSchedulingInfo oldInfo, QueueInfo qi) { // Removing of the job from job list is responsibility of the //initialization poller. // Add the job to the running queue qi.addRunningJob(job); }