private void spawnNewJvm(JobID jobId, JvmEnv env, TaskRunner t) { JvmRunner jvmRunner = new JvmRunner(env, jobId, t.getTask()); jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner); //spawn the JVM in a new thread. Note that there will be very little //extra overhead of launching the new thread for a new JVM since //most of the cost is involved in launching the process. Moreover, //since we are going to be using the JVM for running many tasks, //the thread launch cost becomes trivial when amortized over all //tasks. Doing it this way also keeps code simple. jvmRunner.setDaemon(true); jvmRunner.setName("JVM Runner " + jvmRunner.jvmId + " spawned."); setRunningTaskForJvm(jvmRunner.jvmId, t); LOG.info(jvmRunner.getName()); jvmRunner.start(); }
synchronized public TaskInProgress getTaskForJvm(JVMId jvmId) throws IOException { /*if (jvmToRunningTask.containsKey(jvmId)) { //Incase of JVM reuse, tasks are returned to previously launched //JVM via this method. However when a new task is launched //the task being returned has to be initialized. TaskRunner taskRunner = jvmToRunningTask.get(jvmId); JvmRunner jvmRunner = jvmIdToRunner.get(jvmId); Task task = taskRunner.getTaskInProgress().getTask(); jvmRunner.taskGiven(task); return taskRunner.getTaskInProgress(); }*/ if (jvmToPendingTasks.containsKey(jvmId)) { //Incase of JVM reuse, tasks are returned to previously launched //JVM via this method. However when a new task is launched //the task being returned has to be initialized. List<TaskRunner> taskRunners = jvmToPendingTasks.get(jvmId); if (taskRunners.size() == 0) { return null; } JvmRunner jvmRunner = jvmIdToRunner.get(jvmId); TaskRunner tr= taskRunners.remove(0); if (!this.jvmToRunningTasks.containsKey(jvmId)) { this.jvmToRunningTasks.put(jvmId, new ArrayList<TaskRunner>()); } this.jvmToRunningTasks.get(jvmId).add(tr); TaskInProgress tip = tr.getTaskInProgress(); jvmRunner.taskGiven(tip.getTask()); return tip; } return null; }
/**
 * Records that {@code tr} has finished.
 *
 * <p>Drops the task-to-JVM mapping and removes the runner from its JVM's
 * running list, deleting that list once it becomes empty. It then notifies
 * the JVM's {@link JvmRunner} via {@code taskRan}. Does nothing if
 * {@code tr} was never mapped to a JVM.
 *
 * @param tr the finished task runner
 */
synchronized public void taskFinished(TaskRunner tr) {
  JVMId jvmId = runningTaskToJvm.remove(tr);
  if (jvmId == null) {
    return;
  }
  // jvmToRunningTasks only gets an entry once the JVM has fetched the task
  // in getTaskForJvm, so a task that finishes before being fetched has no
  // list here. Previously that dereferenced null.
  List<TaskRunner> running = jvmToRunningTasks.get(jvmId);
  if (running != null) {
    running.remove(tr);
    if (running.isEmpty()) {
      jvmToRunningTasks.remove(jvmId);
    }
  }
  JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
  if (jvmRunner != null) {
    jvmRunner.taskRan(tr.getTask());
  }
}
/**
 * Kills the JVM registered under {@code jvmId}. Unknown ids are ignored.
 *
 * @param jvmId id of the JVM to kill
 */
synchronized public void killJvm(JVMId jvmId)
    throws IOException, InterruptedException {
  final JvmRunner target = jvmIdToRunner.get(jvmId);
  if (target == null) {
    return;
  }
  killJvmRunner(target);
}
synchronized public void stop() throws IOException, InterruptedException { //since the kill() method invoked later on would remove //an entry from the jvmIdToRunner map, we create a //copy of the values and iterate over it (if we don't //make a copy, we will encounter concurrentModification //exception List <JvmRunner> list = new ArrayList<JvmRunner>(); list.addAll(jvmIdToRunner.values()); for (JvmRunner jvm : list) { killJvmRunner(jvm); } }
private synchronized void reapJvm( TaskRunner t, JvmEnv env) throws IOException, InterruptedException { if (t.getTaskInProgress().wasKilled()) { //the task was killed in-flight //no need to do the rest of the operations return; } //boolean spawnNewJvm = false; JobID jobId = t.getTask().getJobID(); int numJvmsSpawned = getSpawnedJvmNum(t.getTask().isMapTask()); JvmRunner runnerToKill = null; if ((t.getTask().isMapTask()&& numJvmsSpawned < maxMapJvms) || (!t.getTask().isMapTask()&& numJvmsSpawned < maxReduceJvms)) { Iterator<Map.Entry<JVMId, JvmRunner>> jvmIter = jvmIdToRunner.entrySet().iterator(); while (jvmIter.hasNext()) { JvmRunner jvmRunner = jvmIter.next().getValue(); JobID jId = jvmRunner.jvmId.getJobId(); //look for a free JVM for this job; if one exists then just break if (jId.equals(jobId)){ setRunningTaskForJvm(jvmRunner.jvmId, t); //reserve the JVM LOG.info("No new JVM spawned for jobId/taskid: " + jobId+"/"+t.getTask().getTaskID() + ". Attempting to reuse: " + jvmRunner.jvmId); return; } } spawnNewJvm(jobId, env, t); return; } LOG.fatal("Inconsistent state!!! " + "JVM Manager reached an unstable state " + "while reaping a JVM for task: " + t.getTask().getTaskID()+ " " + getDetails() + ". Aborting. "); System.exit(-1); }
public JvmRunner(JvmEnv env, JobID jobId, Task firstTask) { this.env = env; this.jvmId = new JVMId(jobId, rand.nextInt()); this.numTasksToRun = env.conf.getNumTasksToExecutePerJvm(); this.firstTask = firstTask; LOG.info("In JvmRunner constructed JVM ID: " + jvmId); }
/** * Create a bunch of tasks and use a special hash map to detect * racy access to the various internal data structures of JvmManager. * (Regression test for MAPREDUCE-2224) */ @Test public void testForRaces() throws Exception { JvmManagerForType mapJvmManager = jvmManager .getJvmManagerForType(TaskType.MAP); // Sub out the HashMaps for maps that will detect racy access. mapJvmManager.jvmToRunningTask = new RaceHashMap<JVMId, TaskRunner>(); mapJvmManager.runningTaskToJvm = new RaceHashMap<TaskRunner, JVMId>(); mapJvmManager.jvmIdToRunner = new RaceHashMap<JVMId, JvmRunner>(); // Launch a bunch of JVMs, but only allow MAP_SLOTS to run at once. final ExecutorService exec = Executors.newFixedThreadPool(MAP_SLOTS); final AtomicReference<Throwable> failed = new AtomicReference<Throwable>(); for (int i = 0; i < MAP_SLOTS*5; i++) { JobConf taskConf = new JobConf(ttConf); TaskAttemptID attemptID = new TaskAttemptID("test", 0, TaskType.MAP, i, 0); Task task = new MapTask(null, attemptID, i, null, 1); task.setConf(taskConf); TaskInProgress tip = tt.new TaskInProgress(task, taskConf); File pidFile = new File(TEST_DIR, "pid_" + i); final TaskRunner taskRunner = task.createRunner(tt, tip); // launch a jvm which sleeps for 60 seconds final Vector<String> vargs = new Vector<String>(2); vargs.add(writeScript("script_" + i, "echo hi\n", pidFile).getAbsolutePath()); final File workDir = new File(TEST_DIR, "work_" + i); workDir.mkdir(); final File stdout = new File(TEST_DIR, "stdout_" + i); final File stderr = new File(TEST_DIR, "stderr_" + i); // launch the process and wait in a thread, till it finishes Runnable launcher = new Runnable() { public void run() { try { taskRunner.launchJvmAndWait(null, vargs, stdout, stderr, 100, workDir, null); } catch (Throwable t) { failed.compareAndSet(null, t); exec.shutdownNow(); return; } } }; exec.submit(launcher); } exec.shutdown(); exec.awaitTermination(3, TimeUnit.MINUTES); if (failed.get() != null) { throw new 
RuntimeException(failed.get()); } }
/**
 * Terminates the JVM behind {@code jvmRunner}, then unregisters its id via
 * {@code removeJvm}.
 *
 * @param jvmRunner the runner whose JVM should be killed
 */
private synchronized void killJvmRunner(JvmRunner jvmRunner)
    throws IOException, InterruptedException {
  // Kill first; only afterwards drop the bookkeeping for this JVM id.
  jvmRunner.kill();
  this.removeJvm(jvmRunner.jvmId);
}
private synchronized void oldReapJvm( TaskRunner t, JvmEnv env) throws IOException, InterruptedException { if (t.getTaskInProgress().wasKilled()) { //the task was killed in-flight //no need to do the rest of the operations return; } boolean spawnNewJvm = false; JobID jobId = t.getTask().getJobID(); //Check whether there is a free slot to start a new JVM. //,or, Kill a (idle) JVM and launch a new one //When this method is called, we *must* // (1) spawn a new JVM (if we are below the max) // (2) find an idle JVM (that belongs to the same job), or, // (3) kill an idle JVM (from a different job) // (the order of return is in the order above) int numJvmsSpawned = getSpawnedJvmNum(t.getTask().isMapTask()); JvmRunner runnerToKill = null; if ((t.getTask().isMapTask()&& numJvmsSpawned >= maxMapJvms) || (t.getTask().isMapTask()&& numJvmsSpawned >= maxReduceJvms)) { //go through the list of JVMs for all jobs. Iterator<Map.Entry<JVMId, JvmRunner>> jvmIter = jvmIdToRunner.entrySet().iterator(); while (jvmIter.hasNext()) { JvmRunner jvmRunner = jvmIter.next().getValue(); JobID jId = jvmRunner.jvmId.getJobId(); //look for a free JVM for this job; if one exists then just break if (jId.equals(jobId) && !jvmRunner.isBusy() && !jvmRunner.ranAll()){ setRunningTaskForJvm(jvmRunner.jvmId, t); //reserve the JVM LOG.info("No new JVM spawned for jobId/taskid: " + jobId+"/"+t.getTask().getTaskID() + ". Attempting to reuse: " + jvmRunner.jvmId); return; } //Cases when a JVM is killed: // (1) the JVM under consideration belongs to the same job // (passed in the argument). In this case, kill only when // the JVM ran all the tasks it was scheduled to run (in terms // of count). 
// (2) the JVM under consideration belongs to a different job and is // currently not busy //But in both the above cases, we see if we can assign the current //task to an idle JVM (hence we continue the loop even on a match) if ((jId.equals(jobId) && jvmRunner.ranAll()) || (!jId.equals(jobId) && !jvmRunner.isBusy())) { runnerToKill = jvmRunner; spawnNewJvm = true; } } } else { spawnNewJvm = true; } if (spawnNewJvm) { if (runnerToKill != null) { LOG.info("Killing JVM: " + runnerToKill.jvmId); killJvmRunner(runnerToKill); } spawnNewJvm(jobId, env, t); return; } //*MUST* never reach this LOG.fatal("Inconsistent state!!! " + "JVM Manager reached an unstable state " + "while reaping a JVM for task: " + t.getTask().getTaskID()+ " " + getDetails() + ". Aborting. "); System.exit(-1); }