public void kill() { if (!killed) { killed = true; TaskController controller = tracker.getTaskController(); // Check inital context before issuing a kill to prevent situations // where kill is issued before task is launched. if (initalContext != null && initalContext.env != null) { initalContext.pid = jvmIdToPid.get(jvmId); initalContext.sleeptimeBeforeSigkill = tracker.getJobConf() .getLong(SLEEPTIME_BEFORE_SIGKILL_KEY, ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL); controller.destroyTaskJVM(initalContext); } else { LOG.info(String.format("JVM Not killed %s but just removed", jvmId .toString())); } removeJvm(jvmId); } }
@Override void terminateTask(TaskControllerContext context) { ShellCommandExecutor shexec = context.shExec; if (shexec != null) { Process process = shexec.getProcess(); if (Shell.WINDOWS) { // Currently we don't use setsid on WINDOWS. //So kill the process alone. if (process != null) { process.destroy(); } } else { // In addition to the task JVM, kill its subprocesses also. String pid = context.pid; if (pid != null) { if(ProcessTree.isSetsidAvailable) { ProcessTree.terminateProcessGroup(pid); }else { ProcessTree.terminateProcess(pid); } } } } }
@Override void killTask(TaskControllerContext context) { ShellCommandExecutor shexec = context.shExec; if (shexec != null) { if (Shell.WINDOWS) { //We don't do send kill process signal in case of windows as //already we have done a process.destroy() in termintateTaskJVM() return; } String pid = context.pid; if (pid != null) { if(ProcessTree.isSetsidAvailable) { ProcessTree.killProcessGroup(pid); }else { ProcessTree.killProcess(pid); } } else { LOG.warn("killTask: Failed to get pid from task context " + context); } } }
public void kill() { if (!killed) { killed = true; TaskController controller = tracker.getTaskController(); // Check inital context before issuing a kill to prevent situations // where kill is issued before task is launched. if (initalContext != null && initalContext.env != null) { initalContext.pid = jvmIdToPid.get(jvmId); initalContext.sleeptimeBeforeSigkill = tracker.getJobConf() .getLong("mapred.tasktracker.tasks.sleeptime-before-sigkill", ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL); controller.destroyTaskJVM(initalContext); } else { LOG.info(String.format("JVM Not killed %s but just removed", jvmId .toString())); } removeJvm(jvmId); } }
@Override void killTask(TaskControllerContext context) { ShellCommandExecutor shexec = context.shExec; if (shexec != null) { if (Shell.WINDOWS) { //We don't do send kill process signal in case of windows as //already we have done a process.destroy() in termintateTaskJVM() return; } String pid = context.pid; if (pid != null) { if(ProcessTree.isSetsidAvailable) { ProcessTree.killProcessGroup(pid); }else { ProcessTree.killProcess(pid); } } } }
/**
 * Delivers {@code signal} to the task with the given pid.
 *
 * <p>The signal goes to the whole process group when setsid is available,
 * and to the single process otherwise.
 *
 * @param user    not used by this implementation
 * @param taskPid pid of the task to signal
 * @param signal  signal to deliver
 */
@Override
public void signalTask(String user, int taskPid, Signal signal) {
  final String pidText = Integer.toString(taskPid);
  if (!ProcessTree.isSetsidAvailable) {
    ProcessTree.killProcess(pidText, signal);
    return;
  }
  ProcessTree.killProcessGroup(pidText, signal);
}
/** * Verifies if the subprocesses of the map task are killed properly. */ private static void validateKillingSubprocesses(RunningJob job, JobConf conf) throws IOException { // wait till the the job finishes while (!job.isComplete()) { try { Thread.sleep(500); } catch (InterruptedException e) { break; } } // Checking if the map task got killed or not assertTrue(!ProcessTree.isAlive(pid)); LOG.info("The map task is not alive after Job is completed, as expected."); // Checking if the descendant processes of map task are killed properly if(ProcessTree.isSetsidAvailable) { for(int i=0; i <= numLevelsOfSubProcesses; i++) { String childPid = UtilsForTests.getPidFromPidFile( scriptDirName + "/childPidFile" + i); LOG.info("pid of the descendant process at level " + i + "in the subtree of processes(with the map task as the root)" + " is " + childPid); assertTrue("Unexpected: The subprocess at level " + i + " in the subtree is alive after Job completion", !isAlive(childPid)); } } FileSystem fs = FileSystem.getLocal(mr.createJobConf()); if(fs.exists(scriptDir)) { fs.delete(scriptDir, true); } }
/**
 * Reports whether the task described by {@code context} is alive.
 *
 * @param context task context carrying the pid
 * @return the result of {@link ProcessTree#isAlive} for the context's pid,
 *         or {@code false} (after logging a warning) when the pid is missing
 */
@Override
boolean isTaskAlive(TaskControllerContext context) {
  String taskPid = context.pid;
  if (taskPid == null) {
    LOG.warn("isTaskAlive: Failed to get pid from task context " + context);
    return false;
  }
  return ProcessTree.isAlive(taskPid);
}
/**
 * Requests a stack trace of the task described by {@code context} through
 * {@link ProcessTree#doStackTrace}; logs a warning when the context has no
 * pid.
 *
 * @param context task context carrying the pid
 */
@Override
void doStackTrace(TaskControllerContext context) {
  String taskPid = context.pid;
  if (taskPid == null) {
    LOG.warn("doStackTrace: Failed to get pid from task context " + context);
    return;
  }
  ProcessTree.doStackTrace(taskPid);
}
/** * Verifies if the subprocesses of the map task are killed properly. */ private static void validateKillingSubprocesses(RunningJob job, JobConf conf) throws IOException { // wait till the the job finishes while (!job.isComplete()) { try { Thread.sleep(500); } catch (InterruptedException e) { break; } } // Checking if the map task got killed or not assertTrue(!ProcessTree.isAlive(pid)); LOG.info("The map task is not alive after Job is completed, as expected."); // Checking if the descendant processes of map task are killed properly if(ProcessTree.isSetsidAvailable) { for(int i=0; i <= numLevelsOfSubProcesses; i++) { String childPid = TestProcfsBasedProcessTree.getPidFromPidFile( scriptDirName + "/childPidFile" + i); LOG.info("pid of the descendant process at level " + i + "in the subtree of processes(with the map task as the root)" + " is " + childPid); assertTrue("Unexpected: The subprocess at level " + i + " in the subtree is alive after Job completion", !isAlive(childPid)); } } FileSystem fs = FileSystem.getLocal(mr.createJobConf()); if(fs.exists(scriptDir)) { fs.delete(scriptDir, true); } }
/**
 * Builds the shell script text that launches a task command with its
 * output redirected to the given files.
 *
 * <p>Layout of the result: an optional {@code JVM_PID} export (non-Windows),
 * each setup line followed by a newline, then the command itself. With a
 * positive {@code tailLength} the command runs in a subshell and both
 * streams are piped through the tail command; otherwise the command is
 * {@code exec}'d (through {@code setsid} when requested and available) and
 * its streams are appended to the files directly.
 *
 * @param setup          setup lines to emit before the command, may be null
 * @param cmd            the command to run
 * @param stdoutFilename file receiving standard output
 * @param stderrFilename file receiving standard error
 * @param tailLength     byte count passed to the tail command; values
 *                       {@code <= 0} disable tailing
 * @param useSetSid      whether to launch through {@code setsid}
 * @return the assembled script text
 * @throws IOException if a helper used to build the script fails
 */
static String buildCommandLine(List<String> setup, List<String> cmd,
    File stdoutFilename, File stderrFilename, long tailLength,
    boolean useSetSid) throws IOException {
  final String outPath = FileUtil.makeShellPath(stdoutFilename);
  final String errPath = FileUtil.makeShellPath(stderrFilename);
  final boolean tailing = tailLength > 0;

  StringBuilder script = new StringBuilder();
  if (!Shell.WINDOWS) {
    script.append("export JVM_PID=`echo $$`\n");
  }
  if (setup != null) {
    for (String setupLine : setup) {
      script.append(setupLine).append("\n");
    }
  }

  // Opening of the command: subshell when tailing, otherwise exec.
  if (tailing) {
    script.append("(");
  } else if (ProcessTree.isSetsidAvailable && useSetSid && !Shell.WINDOWS) {
    script.append("exec setsid ");
  } else {
    script.append("exec ");
  }
  script.append(addCommand(cmd, true)).append(" < /dev/null ");

  if (!tailing) {
    script.append(" 1>> ").append(outPath)
        .append(" 2>> ").append(errPath);
    return script.toString();
  }

  // Keep only the last tailLength bytes of each stream.
  script.append(" | ").append(tailCommand)
      .append(" -c ").append(tailLength)
      .append(" >> ").append(outPath)
      .append(" ; exit $PIPESTATUS ) 2>&1 | ").append(tailCommand)
      .append(" -c ").append(tailLength)
      .append(" >> ").append(errPath)
      .append(" ; exit $PIPESTATUS");
  return script.toString();
}
/** * Runs a recursive shell script to create a chain of subprocesses */ private static void runChildren(JobConf conf) throws IOException { if (ProcessTree.isSetsidAvailable) { FileSystem fs = FileSystem.getLocal(conf); if (fs.exists(scriptDir)) { fs.delete(scriptDir, true); } // Create the directory and set open permissions so that the TT can // access. fs.mkdirs(scriptDir); fs.setPermission(scriptDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); // create shell script Random rm = new Random(); Path scriptPath = new Path(scriptDirName, "_shellScript_" + rm.nextInt() + ".sh"); String shellScript = scriptPath.toString(); // Construct the script. Set umask to 0000 so that TT can access all the // files. String script = "umask 000\n" + "echo $$ > " + scriptDirName + "/childPidFile" + "$1\n" + "echo hello\n" + "trap 'echo got SIGTERM' 15 \n" + "if [ $1 != 0 ]\nthen\n" + " sh " + shellScript + " $(($1-1))\n" + "else\n" + " while true\n do\n" + " sleep 2\n" + " done\n" + "fi"; DataOutputStream file = fs.create(scriptPath); file.writeBytes(script); file.close(); // Set executable permissions on the script. new File(scriptPath.toUri().getPath()).setExecutable(true); LOG.info("Calling script from map task : " + shellScript); Runtime.getRuntime() .exec(shellScript + " " + numLevelsOfSubProcesses); String childPid = UtilsForTests.getPidFromPidFile(scriptDirName + "/childPidFile" + 0); while (childPid == null) { LOG.warn(scriptDirName + "/childPidFile" + 0 + " is null; Sleeping..."); try { Thread.sleep(500); } catch (InterruptedException ie) { LOG.warn("sleep is interrupted:" + ie); break; } childPid = UtilsForTests.getPidFromPidFile(scriptDirName + "/childPidFile" + 0); } } }
/**
 * Builds the single-line shell command that launches a task command with
 * its output redirected to the given files.
 *
 * <p>Layout of the result: an optional {@code JVM_PID} export (non-Windows),
 * the setup commands followed by {@code ;}, then the command itself. With a
 * positive {@code tailLength} the command runs in a subshell and both
 * streams are piped through the tail command; otherwise the command is
 * {@code exec}'d (through {@code setsid} when requested and available) and
 * its streams are appended to the files directly.
 *
 * @param setup          setup commands to emit before the command; null or
 *                       empty means none
 * @param cmd            the command to run
 * @param stdoutFilename file receiving standard output
 * @param stderrFilename file receiving standard error
 * @param tailLength     byte count passed to the tail command; values
 *                       {@code <= 0} disable tailing
 * @param useSetSid      whether to launch through {@code setsid}
 * @return the assembled command line
 * @throws IOException if a helper used to build the command fails
 */
static String buildCommandLine(List<String> setup, List<String> cmd,
    File stdoutFilename, File stderrFilename, long tailLength,
    boolean useSetSid) throws IOException {
  String stdout = FileUtil.makeShellPath(stdoutFilename);
  String stderr = FileUtil.makeShellPath(stderrFilename);

  // The buffer never leaves this method, so the unsynchronized
  // StringBuilder is sufficient.
  StringBuilder mergedCmd = new StringBuilder();

  if (!Shell.WINDOWS) {
    mergedCmd.append(" export JVM_PID=`echo $$` ; ");
  }

  if (setup != null && !setup.isEmpty()) {
    mergedCmd.append(addCommand(setup, false));
    mergedCmd.append(";");
  }

  // Opening of the command: subshell when tailing, otherwise exec.
  if (tailLength > 0) {
    mergedCmd.append("(");
  } else if (ProcessTree.isSetsidAvailable && useSetSid && !Shell.WINDOWS) {
    mergedCmd.append("exec setsid ");
  } else {
    mergedCmd.append("exec ");
  }
  mergedCmd.append(addCommand(cmd, true));
  mergedCmd.append(" < /dev/null ");

  if (tailLength > 0) {
    // Keep only the last tailLength bytes of each stream.
    mergedCmd.append(" | ");
    mergedCmd.append(tailCommand);
    mergedCmd.append(" -c ");
    mergedCmd.append(tailLength);
    mergedCmd.append(" >> ");
    mergedCmd.append(stdout);
    mergedCmd.append(" ; exit $PIPESTATUS ) 2>&1 | ");
    mergedCmd.append(tailCommand);
    mergedCmd.append(" -c ");
    mergedCmd.append(tailLength);
    mergedCmd.append(" >> ");
    mergedCmd.append(stderr);
    mergedCmd.append(" ; exit $PIPESTATUS");
  } else {
    mergedCmd.append(" 1>> ");
    mergedCmd.append(stdout);
    mergedCmd.append(" 2>> ");
    mergedCmd.append(stderr);
  }
  return mergedCmd.toString();
}
/** * Verifies if the subprocesses of the map task are killed properly. */ private static void validateKillingSubprocesses(RunningJob job, JobConf conf) throws IOException { // wait till the the job finishes while (!job.isComplete()) { try { Thread.sleep(500); } catch (InterruptedException e) { break; } } // Checking if the map task got killed or not boolean isDead = false; for (int i = 0; i < 3000; i++) { // We check this every 100 ms for 5 minute. isDead = !ProcessTree.isAlive(pid); if (isDead == true) { break; } sleep(100); } assertTrue(isDead); LOG.info("The map task is not alive after Job is completed, as expected."); // Checking if the descendant processes of map task are killed properly if(ProcessTree.isSetsidAvailable) { for(int i=0; i <= numLevelsOfSubProcesses; i++) { String childPid = UtilsForTests.getPidFromPidFile( scriptDirName + "/childPidFile" + i); LOG.info("pid of the descendant process at level " + i + "in the subtree of processes(with the map task as the root)" + " is " + childPid); // Check this every 100ms for 5 minute boolean isChildDead = false; for (int j = 0; j < 3000; j++) { isChildDead = !isAlive(childPid); if (isChildDead == true) { break; } sleep(100); } assertTrue("Unexpected: The subprocess at level " + i + " in the subtree is alive after Job completion", !isAlive(childPid)); } } FileSystem fs = FileSystem.getLocal(mr.createJobConf()); if(fs.exists(scriptDir)) { fs.delete(scriptDir, true); } }
/** * Runs a recursive shell script to create a chain of subprocesses */ private static void runChildren(JobConf conf) throws IOException { if (ProcessTree.isSetsidAvailable) { FileSystem fs = FileSystem.getLocal(conf); TEST_ROOT_DIR = new Path(conf.get("test.build.data")).toUri().getPath(); scriptDir = new Path(TEST_ROOT_DIR + "/script"); if(fs.exists(scriptDir)){ fs.delete(scriptDir, true); } // create shell script Random rm = new Random(); Path scriptPath = new Path(scriptDir, "_shellScript_" + rm.nextInt() + ".sh"); String shellScript = scriptPath.toString(); String script = "echo $$ > " + scriptDir.toString() + "/childPidFile" + "$1\n" + "echo hello\n" + "trap 'echo got SIGTERM' 15 \n" + "if [ $1 != 0 ]\nthen\n" + " sh " + shellScript + " $(($1-1))\n" + "else\n" + " while true\n do\n" + " sleep 2\n" + " done\n" + "fi"; DataOutputStream file = fs.create(scriptPath); file.writeBytes(script); file.close(); LOG.info("Calling script from map task of failjob : " + shellScript); Runtime.getRuntime() .exec(shellScript + " " + numLevelsOfSubProcesses); String childPid = UtilsForTests.getPidFromPidFile(scriptDir + "/childPidFile" + 0); while (childPid == null) { LOG.warn(scriptDir + "/childPidFile" + 0 + " is null; Sleeping..."); try { Thread.sleep(500); } catch (InterruptedException ie) { LOG.warn("sleep is interrupted:" + ie); break; } childPid = UtilsForTests.getPidFromPidFile(scriptDir + "/childPidFile" + 0); } } }
/** * Runs a recursive shell script to create a chain of subprocesses */ private static void runChildren(JobConf conf) throws IOException { if (ProcessTree.isSetsidAvailable) { FileSystem fs = FileSystem.getLocal(conf); if (fs.exists(scriptDir)) { fs.delete(scriptDir, true); } // Create the directory and set open permissions so that the TT can // access. fs.mkdirs(scriptDir); fs.setPermission(scriptDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); // create shell script Random rm = new Random(); Path scriptPath = new Path(scriptDirName, "_shellScript_" + rm.nextInt() + ".sh"); String shellScript = scriptPath.toString(); // Construct the script. Set umask to 0000 so that TT can access all the // files. String script = "umask 000\n" + "echo $$ > " + scriptDirName + "/childPidFile" + "$1\n" + "echo hello\n" + "trap 'echo got SIGTERM' 15 \n" + "if [ $1 != 0 ]\nthen\n" + " sh " + shellScript + " $(($1-1))\n" + "else\n" + " while true\n do\n" + " sleep 2\n" + " done\n" + "fi"; DataOutputStream file = fs.create(scriptPath); file.writeBytes(script); file.close(); // Set executable permissions on the script. new File(scriptPath.toUri().getPath()).setExecutable(true); LOG.info("Calling script from map task : " + shellScript); Runtime.getRuntime() .exec(shellScript + " " + numLevelsOfSubProcesses); String childPid = TestProcfsBasedProcessTree.getPidFromPidFile(scriptDirName + "/childPidFile" + 0); while (childPid == null) { LOG.warn(scriptDirName + "/childPidFile" + 0 + " is null; Sleeping..."); try { Thread.sleep(500); } catch (InterruptedException ie) { LOG.warn("sleep is interrupted:" + ie); break; } childPid = TestProcfsBasedProcessTree.getPidFromPidFile(scriptDirName + "/childPidFile" + 0); } } }