Java 类org.apache.hadoop.mapred.JvmManager.JvmManagerForType.JvmRunner 实例源码

项目:mammoth    文件   
// Creates a JvmRunner for the given job and environment, seeded with the task
// held by TaskRunner t, registers it in jvmIdToRunner under its jvmId, sets the
// runner's name, and records t as the running task for that JVM id.
// NOTE(review): the comment below talks about spawning the JVM in a new thread,
// but no start() call or closing brace is visible in this excerpt -- the listing
// appears truncated; confirm against the full source.
private void spawnNewJvm(JobID jobId, JvmEnv env,  
    TaskRunner t) {
  JvmRunner jvmRunner = new JvmRunner(env, jobId, t.getTask());
  jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
  //spawn the JVM in a new thread. Note that there will be very little
  //extra overhead of launching the new thread for a new JVM since
  //most of the cost is involved in launching the process. Moreover,
  //since we are going to be using the JVM for running many tasks,
  //the thread launch cost becomes trivial when amortized over all
  //tasks. Doing it this way also keeps code simple.
  jvmRunner.setName("JVM Runner " + jvmRunner.jvmId + " spawned.");
  setRunningTaskForJvm(jvmRunner.jvmId, t);;
项目:mammoth    文件   
// Hands out the next TaskInProgress for the JVM identified by jvmId.
// In the visible code: when jvmToPendingTasks has an entry for jvmId, the first
// pending TaskRunner is removed from that list and its TaskInProgress is
// returned; null is returned when that list is empty, and the trailing
// "return null" appears to cover the case where no entry exists.
// An empty list is also created in jvmToRunningTasks for jvmId if none is present.
// An earlier variant based on jvmToRunningTask is kept commented out at the top.
// NOTE(review): this excerpt has lost its closing braces and the end of the
// block comment that opens on the first body line, and no statement adding the
// removed TaskRunner to jvmToRunningTasks is visible -- verify against the
// full source before relying on this description.
synchronized public TaskInProgress getTaskForJvm(JVMId jvmId)
    throws IOException {
  /*if (jvmToRunningTask.containsKey(jvmId)) {
    //Incase of JVM reuse, tasks are returned to previously launched
    //JVM via this method. However when a new task is launched
    //the task being returned has to be initialized.

    TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
    JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
    Task task = taskRunner.getTaskInProgress().getTask();

    return taskRunner.getTaskInProgress();

  if (jvmToPendingTasks.containsKey(jvmId)) {
      //Incase of JVM reuse, tasks are returned to previously launched
      //JVM via this method. However when a new task is launched
      //the task being returned has to be initialized.

      List<TaskRunner> taskRunners = jvmToPendingTasks.get(jvmId);
      if (taskRunners.size() == 0) {
          return null;
      JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
      TaskRunner tr= taskRunners.remove(0);
      if (!this.jvmToRunningTasks.containsKey(jvmId)) {
          this.jvmToRunningTasks.put(jvmId, new ArrayList<TaskRunner>());
      TaskInProgress tip = tr.getTaskInProgress();          
      return tip;
  return null;
项目:mammoth    文件   
// Bookkeeping for a finished task runner: removes tr from runningTaskToJvm and,
// when it was mapped to a JVM id, inspects that JVM's list in jvmToRunningTasks
// and tells the registered JvmRunner (if any) that the task ran via taskRan().
// NOTE(review): the body of the size() == 0 check and all closing braces are
// missing from this excerpt, so what happens for an empty list cannot be
// determined here. jvmToRunningTasks.get(jvmId) is dereferenced without a
// visible null check -- confirm an entry always exists for a mapped JVM id.
synchronized public void taskFinished(TaskRunner tr) {
  JVMId jvmId = runningTaskToJvm.remove(tr);
  if (jvmId != null) {
    if (jvmToRunningTasks.get(jvmId).size() == 0) {
    JvmRunner jvmRunner;
    if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) {
      jvmRunner.taskRan( tr.getTask());
项目:mammoth    文件   
// Looks up the JvmRunner registered for jvmId in jvmIdToRunner and acts on it
// only when one is found.
// NOTE(review): the statement(s) inside the null check are not visible in this
// excerpt; presumably the runner is killed there -- confirm in the full source.
synchronized public void killJvm(JVMId jvmId) throws IOException, 
                                                     InterruptedException {
  JvmRunner jvmRunner;
  if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) {
项目:mammoth    文件   
// Iterates over a private list of JvmRunner objects rather than over the
// jvmIdToRunner map itself; the comment below explains that this avoids a
// concurrent-modification problem when entries are removed during the kill.
// NOTE(review): as shown, the list is created empty and nothing visible fills
// it, and the loop body is missing -- the excerpt appears to have dropped the
// line that copies the map's values and the per-runner action.
synchronized public void stop() throws IOException, InterruptedException {
  //since the kill() method invoked later on would remove
  //an entry from the jvmIdToRunner map, we create a
  //copy of the values and iterate over it (if we don't
  //make a copy, we will encounter concurrentModification
  List <JvmRunner> list = new ArrayList<JvmRunner>();
  for (JvmRunner jvm : list) {
项目:mammoth    文件   
// Arranges a JVM for task runner t.
// Visible steps: checks whether the task was killed in flight; counts the JVMs
// already spawned for the task's type (map or reduce) via getSpawnedJvmNum;
// when that count is below maxMapJvms (map task) or maxReduceJvms (reduce
// task), iterates over Map.Entry<JVMId, JvmRunner> entries looking for a runner
// whose job id equals the task's job id and reserves it with
// setRunningTaskForJvm. A spawnNewJvm(...) call and a LOG.fatal("Inconsistent
// state...") call also appear.
// NOTE(review): this excerpt has lost closing braces, return statements, the
// iterator initialisation, the expression assigned to jvmRunner and the start
// of a logging call, so the exact branch structure (which path spawns a new
// JVM and which path reaches the fatal log) cannot be confirmed from here.
private synchronized void reapJvm( 
     TaskRunner t, JvmEnv env) throws IOException, InterruptedException {
   if (t.getTaskInProgress().wasKilled()) {
     //the task was killed in-flight
     //no need to do the rest of the operations
   //boolean spawnNewJvm = false;

   JobID jobId = t.getTask().getJobID();
   int numJvmsSpawned = getSpawnedJvmNum(t.getTask().isMapTask());
   JvmRunner runnerToKill = null;
   if ((t.getTask().isMapTask()&& numJvmsSpawned < maxMapJvms) || 
        (!t.getTask().isMapTask()&& numJvmsSpawned < maxReduceJvms)) {
    Iterator<Map.Entry<JVMId, JvmRunner>> jvmIter = 

 while (jvmIter.hasNext()) {
JvmRunner jvmRunner =;
   JobID jId = jvmRunner.jvmId.getJobId();        
     //look for a free JVM for this job; if one exists then just break          
   if (jId.equals(jobId)){
     setRunningTaskForJvm(jvmRunner.jvmId, t); //reserve the JVM"No new JVM spawned for jobId/taskid: " + 
              jobId+"/"+t.getTask().getTaskID() +
              ". Attempting to reuse: " + jvmRunner.jvmId);
 spawnNewJvm(jobId, env, t);
   LOG.fatal("Inconsistent state!!! " +
        "JVM Manager reached an unstable state " +
         "while reaping a JVM for task: " + t.getTask().getTaskID()+
         " " + getDetails() + ". Aborting. ");
项目:mammoth    文件   
// Builds a runner for one JVM: stores the environment, derives a JVMId from the
// job id plus a random int, reads the per-JVM task count from
// env.conf.getNumTasksToExecutePerJvm(), and remembers the first task.
// NOTE(review): the last line contains the tail of a logging statement whose
// call prefix is missing from this excerpt, and the closing brace is absent.
public JvmRunner(JvmEnv env, JobID jobId, Task firstTask) {
  this.env = env;
  this.jvmId = new JVMId(jobId, rand.nextInt());
  this.numTasksToRun = env.conf.getNumTasksToExecutePerJvm();
  this.firstTask = firstTask;"In JvmRunner constructed JVM ID: " + jvmId);
项目:mapreduce-fork    文件   
/**
 * Create a bunch of tasks and use a special hash map to detect
 * racy access to the various internal data structures of JvmManager.
 * (Regression test for MAPREDUCE-2224)
 */
public void testForRaces() throws Exception {
  // NOTE(review): the right-hand side of this assignment is cut off in this
  // excerpt; the manager for map tasks is presumably obtained from jvmManager.
  JvmManagerForType mapJvmManager = jvmManager

  // Sub out the HashMaps for maps that will detect racy access.
  mapJvmManager.jvmToRunningTask = new RaceHashMap<JVMId, TaskRunner>();
  mapJvmManager.runningTaskToJvm = new RaceHashMap<TaskRunner, JVMId>();
  mapJvmManager.jvmIdToRunner = new RaceHashMap<JVMId, JvmRunner>();

  // Launch a bunch of JVMs, but only allow MAP_SLOTS to run at once.
  final ExecutorService exec = Executors.newFixedThreadPool(MAP_SLOTS);
  // Holds the first Throwable raised by any launcher (set only while still null).
  final AtomicReference<Throwable> failed =
    new AtomicReference<Throwable>();

  for (int i = 0; i < MAP_SLOTS*5; i++) {
    JobConf taskConf = new JobConf(ttConf);
    TaskAttemptID attemptID = new TaskAttemptID("test", 0, TaskType.MAP, i, 0);
    Task task = new MapTask(null, attemptID, i, null, 1);
    // NOTE(review): no "new" keyword is visible on the next line; the
    // constructor expression looks garbled in this excerpt.
    TaskInProgress tip = TaskInProgress(task, taskConf);
    File pidFile = new File(TEST_DIR, "pid_" + i);
    final TaskRunner taskRunner = task.createRunner(tt, tip);
    // the launched command is a generated script whose content is "echo hi"
    final Vector<String> vargs = new Vector<String>(2);
    vargs.add(writeScript("script_" + i, "echo hi\n", pidFile).getAbsolutePath());
    final File workDir = new File(TEST_DIR, "work_" + i);
    final File stdout = new File(TEST_DIR, "stdout_" + i);
    final File stderr = new File(TEST_DIR, "stderr_" + i);

    // launch the process and wait in a thread, till it finishes
    Runnable launcher = new Runnable() {
      public void run() {
        try {
          taskRunner.launchJvmAndWait(null, vargs, stdout, stderr, 100,
              workDir, null);
        } catch (Throwable t) {
          // remember only the first failure; later ones are dropped
          failed.compareAndSet(null, t);

  // Wait up to three minutes on the executor, then rethrow any recorded
  // failure wrapped in a RuntimeException.
  // NOTE(review): the calls that submit the launcher and shut the executor
  // down, plus several closing braces, are not visible in this excerpt.
  exec.awaitTermination(3, TimeUnit.MINUTES);
  if (failed.get() != null) {
    throw new RuntimeException(failed.get());
项目:mammoth    文件   
// Synchronized helper that takes the JvmRunner to act on and may throw
// IOException or InterruptedException.
// NOTE(review): only the signature is visible in this excerpt; presumably the
// body kills the given runner -- confirm in the full source.
private synchronized void killJvmRunner(JvmRunner jvmRunner
                                        ) throws IOException,
                                                 InterruptedException {
项目:mammoth    文件   
// Older strategy for arranging a JVM for task runner t.
// Visible steps: checks whether the task was killed in flight; counts the JVMs
// already spawned for the task's type via getSpawnedJvmNum; when the per-type
// limit (maxMapJvms for map tasks, maxReduceJvms for reduce tasks) has been
// reached, scans the known JvmRunners for an idle one of the same job to reuse
// or, failing that, for one that may be killed to make room; otherwise a new
// JVM is spawned directly.
// NOTE(review): this excerpt has lost closing braces, return statements, the
// iterator initialisation, the expression assigned to jvmRunner and the start
// of two logging calls; those lines are left exactly as found.
private synchronized void oldReapJvm( 
    TaskRunner t, JvmEnv env) throws IOException, InterruptedException {
  if (t.getTaskInProgress().wasKilled()) {
    //the task was killed in-flight
    //no need to do the rest of the operations
  boolean spawnNewJvm = false;
  JobID jobId = t.getTask().getJobID();
  //Check whether there is a free slot to start a new JVM.
  //,or, Kill a (idle) JVM and launch a new one
  //When this method is called, we *must* 
  // (1) spawn a new JVM (if we are below the max) 
  // (2) find an idle JVM (that belongs to the same job), or,
  // (3) kill an idle JVM (from a different job) 
  // (the order of return is in the order above)
  int numJvmsSpawned = getSpawnedJvmNum(t.getTask().isMapTask());
  JvmRunner runnerToKill = null;
  //map tasks are limited by maxMapJvms, non-map (reduce) tasks by
  //maxReduceJvms; the second clause must therefore test for a non-map task,
  //otherwise the limit would never apply to reduce tasks
  if ((t.getTask().isMapTask()&& numJvmsSpawned >= maxMapJvms) || 
        (!t.getTask().isMapTask()&& numJvmsSpawned >= maxReduceJvms)) {
    //go through the list of JVMs for all jobs.
    Iterator<Map.Entry<JVMId, JvmRunner>> jvmIter = 

    while (jvmIter.hasNext()) {
      JvmRunner jvmRunner =;
      JobID jId = jvmRunner.jvmId.getJobId();
      //look for a free JVM for this job; if one exists then just break
      if (jId.equals(jobId) && !jvmRunner.isBusy() && !jvmRunner.ranAll()){
        setRunningTaskForJvm(jvmRunner.jvmId, t); //reserve the JVM"No new JVM spawned for jobId/taskid: " + 
                 jobId+"/"+t.getTask().getTaskID() +
                 ". Attempting to reuse: " + jvmRunner.jvmId);
      //Cases when a JVM is killed: 
      // (1) the JVM under consideration belongs to the same job 
      //     (passed in the argument). In this case, kill only when
      //     the JVM ran all the tasks it was scheduled to run (in terms
      //     of count).
      // (2) the JVM under consideration belongs to a different job and is
      //     currently not busy
      //But in both the above cases, we see if we can assign the current
      //task to an idle JVM (hence we continue the loop even on a match)
      if ((jId.equals(jobId) && jvmRunner.ranAll()) ||
          (!jId.equals(jobId) && !jvmRunner.isBusy())) {
        runnerToKill = jvmRunner;
        spawnNewJvm = true;
  } else {
    spawnNewJvm = true;

  if (spawnNewJvm) {
    if (runnerToKill != null) {"Killing JVM: " + runnerToKill.jvmId);
    spawnNewJvm(jobId, env, t);
  //*MUST* never reach this
  LOG.fatal("Inconsistent state!!! " +
        "JVM Manager reached an unstable state " +
        "while reaping a JVM for task: " + t.getTask().getTaskID()+
        " " + getDetails() + ". Aborting. ");