/** * Create a bunch of tasks and use a special hash map to detect * racy access to the various internal data structures of JvmManager. * (Regression test for MAPREDUCE-2224) */ @Test public void testForRaces() throws Exception { JvmManagerForType mapJvmManager = jvmManager .getJvmManagerForType(TaskType.MAP); // Sub out the HashMaps for maps that will detect racy access. mapJvmManager.jvmToRunningTask = new RaceHashMap<JVMId, TaskRunner>(); mapJvmManager.runningTaskToJvm = new RaceHashMap<TaskRunner, JVMId>(); mapJvmManager.jvmIdToRunner = new RaceHashMap<JVMId, JvmRunner>(); // Launch a bunch of JVMs, but only allow MAP_SLOTS to run at once. final ExecutorService exec = Executors.newFixedThreadPool(MAP_SLOTS); final AtomicReference<Throwable> failed = new AtomicReference<Throwable>(); for (int i = 0; i < MAP_SLOTS*5; i++) { JobConf taskConf = new JobConf(ttConf); TaskAttemptID attemptID = new TaskAttemptID("test", 0, TaskType.MAP, i, 0); Task task = new MapTask(null, attemptID, i, null, 1); task.setConf(taskConf); TaskInProgress tip = tt.new TaskInProgress(task, taskConf); File pidFile = new File(TEST_DIR, "pid_" + i); final TaskRunner taskRunner = task.createRunner(tt, tip); // launch a jvm which sleeps for 60 seconds final Vector<String> vargs = new Vector<String>(2); vargs.add(writeScript("script_" + i, "echo hi\n", pidFile).getAbsolutePath()); final File workDir = new File(TEST_DIR, "work_" + i); workDir.mkdir(); final File stdout = new File(TEST_DIR, "stdout_" + i); final File stderr = new File(TEST_DIR, "stderr_" + i); // launch the process and wait in a thread, till it finishes Runnable launcher = new Runnable() { public void run() { try { taskRunner.launchJvmAndWait(null, vargs, stdout, stderr, 100, workDir, null); } catch (Throwable t) { failed.compareAndSet(null, t); exec.shutdownNow(); return; } } }; exec.submit(launcher); } exec.shutdown(); exec.awaitTermination(3, TimeUnit.MINUTES); if (failed.get() != null) { throw new RuntimeException(failed.get()); } }