/**
 * Queues the local files of a job for deletion; they are no longer needed
 * on this TaskTracker.
 *
 * <p>Two kinds of entries are handed to {@code directoryCleanupThread}:
 * <ul>
 *   <li>the job's local directory, with the first
 *       {@code getUserDir(user).length()} characters stripped, wrapped in a
 *       {@link TaskController.DeletionContext};</li>
 *   <li>for every directory returned by {@code localStorage.getDirs()}, the
 *       qualified TaskTracker-private directory of the job.</li>
 * </ul>
 *
 * @param user user name passed to the path helpers and the deletion context
 * @param jobId identifier of the job whose files are removed
 * @throws IOException if one of the path or file-system lookups fails
 */
void removeJobFiles(String user, JobID jobId) throws IOException {
  final String userRoot = getUserDir(user);
  final String localJobDir = getLocalJobDir(user, jobId.toString());

  // The deletion context receives the job directory without the
  // userRoot-length prefix (presumably a path relative to the user
  // directory -- confirm against TaskController.DeletionContext).
  final PathDeletionContext jobDirCleanup =
      new TaskController.DeletionContext(getTaskController(), false, user,
          localJobDir.substring(userRoot.length()));
  directoryCleanupThread.addToQueue(jobDirCleanup);

  // One private job directory may exist under each local storage directory.
  for (String storageDir : localStorage.getDirs()) {
    final Path relativePrivateDir =
        new Path(storageDir,
            TaskTracker.getPrivateDirForJob(user, jobId.toString()));
    final Path qualifiedPrivateDir =
        FileSystem.getLocal(fConf).makeQualified(relativePrivateDir);
    final PathDeletionContext privateDirCleanup =
        new CleanupQueue.PathDeletionContext(qualifiedPrivateDir, fConf);
    directoryCleanupThread.addToQueue(privateDirCleanup);
  }
}
/**
 * Queues some or all of this task's local files for deletion through the
 * cleanup queue.
 *
 * <p>The method does nothing unless the job is configured to execute
 * exactly one task per JVM.
 *
 * @param removeOutputs if {@code true} the task's whole local directory
 *          ({@code getLocalTaskDir}) is queued; if {@code false} only the
 *          task's work directory ({@code getTaskWorkDir}) is queued
 * @throws IOException declared by the signature; propagated from the calls
 *           made here
 */
void removeTaskFiles(boolean removeOutputs) throws IOException {
  if (localJobConf.getNumTasksToExecutePerJvm() != 1) {
    return;
  }

  final String user = ugi.getShortUserName();
  final int userDirLen = TaskTracker.getUserDir(user).length();
  final String jobId = task.getJobID().toString();
  final String taskId = task.getTaskID().toString();
  final boolean cleanup = task.isTaskCleanupTask();

  final String targetDir = removeOutputs
      ? TaskTracker.getLocalTaskDir(user, jobId, taskId, cleanup)
      : TaskTracker.getTaskWorkDir(user, jobId, taskId, cleanup);

  // The deletion context receives the directory with the leading
  // userDirLen characters removed.
  final PathDeletionContext taskCleanup =
      new TaskController.DeletionContext(taskController, false, user,
          targetDir.substring(userDirLen));
  directoryCleanupThread.addToQueue(taskCleanup);
}
@Override public void addToQueue(PathDeletionContext... contexts) { // delete paths in-line for (PathDeletionContext context : contexts) { Exception exc = null; try { if (!deletePath(context)) { LOG.warn("Stale path " + context.fullPath); stalePaths.add(context.fullPath); } } catch (IOException e) { exc = e; } catch (InterruptedException ie) { exc = ie; } if (exc != null) { LOG.warn("Caught exception while deleting path " + context.fullPath); LOG.info(StringUtils.stringifyException(exc)); stalePaths.add(context.fullPath); } } }
private void deletePathsInSecureCluster(String newPathName, FileStatus status) throws FileNotFoundException, IOException { // In a secure tasktracker, the subdirectories belong // to different user PathDeletionContext item = null; // iterate and queue subdirectories for cleanup for (FileStatus subDirStatus : localFileSystem.listStatus(status.getPath())) { String owner = subDirStatus.getOwner(); String path = subDirStatus.getPath().getName(); if (path.equals(owner)) { // add it to the cleanup queue item = new TaskController.DeletionContext(taskController, false, owner, newPathName + Path.SEPARATOR_CHAR + path); cleanupQueue.addToQueue(item); } } // queue the parent directory for cleanup item = new TaskController.DeletionContext(taskController, false, status.getOwner(), newPathName); cleanupQueue.addToQueue(item); }
/**
 * Enables a task's path for cleanup by delegating to
 * {@code enablePathForCleanup} with the
 * {@code ENABLE_TASK_FOR_CLEANUP} command and the arguments built by
 * {@code buildTaskCleanupArgs}.
 *
 * @param context the path to enable; must be a
 *          {@code TaskControllerTaskPathDeletionContext}
 * @throws IOException if the delegated call fails
 * @throws IllegalArgumentException if {@code context} is not a
 *           {@code TaskControllerTaskPathDeletionContext}
 */
@Override
void enableTaskForCleanup(PathDeletionContext context) throws IOException {
  if (!(context instanceof TaskControllerTaskPathDeletionContext)) {
    throw new IllegalArgumentException("PathDeletionContext provided is not "
        + "TaskControllerTaskPathDeletionContext.");
  }

  final TaskControllerTaskPathDeletionContext taskContext =
      (TaskControllerTaskPathDeletionContext) context;
  enablePathForCleanup(taskContext,
      TaskControllerCommands.ENABLE_TASK_FOR_CLEANUP,
      buildTaskCleanupArgs(taskContext));
}
/**
 * Schedules a user log path for deletion.
 *
 * <p>The path is not removed here; it is handed to the
 * {@link CleanupQueue} wrapped in a {@link TaskController.DeletionContext}.
 *
 * @param logPath the log path to delete; also used to look up the owning
 *          user via {@code getLogUser}
 * @throws IOException declared by the signature; propagated from the calls
 *           made here
 */
private void deleteLogPath(String logPath) throws IOException {
  LOG.info("Deleting user log path " + logPath);

  final String logOwner = getLogUser(logPath);
  final TaskController logController = userLogManager.getTaskController();

  // NOTE(review): the boolean argument is true here, unlike the other
  // DeletionContext call sites -- presumably it marks a log path; confirm
  // against TaskController.DeletionContext.
  final PathDeletionContext logCleanup =
      new TaskController.DeletionContext(logController, true, logOwner,
          logPath);
  cleanupQueue.addToQueue(logCleanup);
}
static boolean deletePath(PathDeletionContext context) throws IOException, InterruptedException { if (LOG.isDebugEnabled()) { LOG.debug("Trying to delete " + context.fullPath); } // FileSystem fs = context.fullPath.getFileSystem(context.conf); // if (fs.exists(context.fullPath)) { // return fs.delete(context.fullPath, true); // } context.deletePath(); return true; }
/**
 * Enables the task for cleanup by changing permissions of the specified
 * path in the local filesystem.
 *
 * <p>The change is made by running the {@code ENABLE_TASK_FOR_CLEANUP}
 * command for the task's user. A failure of that command is logged
 * together with its cause and is not propagated.
 *
 * @param context the path to enable; must be a
 *          {@code TaskControllerPathDeletionContext} whose task has a
 *          non-null user and whose file system is a
 *          {@code LocalFileSystem}
 * @throws IOException declared by the signature; a failure of the command
 *           itself is caught and logged here
 * @throws IllegalArgumentException if {@code context} has the wrong type,
 *           the task's user is null, or the file system is not local
 */
@Override
void enableTaskForCleanup(PathDeletionContext context) throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Going to do " + TaskCommands.ENABLE_TASK_FOR_CLEANUP.toString()
        + " for " + context.fullPath);
  }

  if (!(context instanceof TaskControllerPathDeletionContext)) {
    throw new IllegalArgumentException("PathDeletionContext provided is not "
        + "TaskControllerPathDeletionContext.");
  }
  TaskControllerPathDeletionContext tContext =
      (TaskControllerPathDeletionContext) context;

  if (tContext.task.getUser() == null
      || !(tContext.fs instanceof LocalFileSystem)) {
    throw new IllegalArgumentException("Either user is null or the "
        + "file system is not local file system.");
  }

  try {
    runCommand(TaskCommands.ENABLE_TASK_FOR_CLEANUP,
        tContext.task.getUser(),
        buildTaskCleanupArgs(tContext), null, null);
  } catch (IOException e) {
    // Deliberately not rethrown: the failure is only reported. The cause is
    // logged so it can be diagnosed.
    LOG.warn("Unable to change permissions for " + tContext.fullPath, e);
  }
}
/**
 * Enables the task for cleanup by changing permissions of the specified
 * path in the local filesystem.
 *
 * <p>Recursively applies {@code a+rwx} to {@code context.fullPath}.
 * Failures are logged and never propagated. If the calling thread is
 * interrupted during the change, its interrupt status is restored so that
 * callers further up can still observe the interruption.
 *
 * @param context the deletion context whose {@code fullPath} is made
 *          accessible
 * @throws IOException declared by the signature; failures of the
 *           permission change are caught and logged here
 */
@Override
void enableTaskForCleanup(PathDeletionContext context) throws IOException {
  try {
    FileUtil.chmod(context.fullPath, "a+rwx", true);
  } catch (InterruptedException e) {
    LOG.warn("Interrupted while setting permissions for " + context.fullPath
        + " for deletion.");
    // Catching InterruptedException clears the flag; set it again rather
    // than silently losing the interruption request.
    Thread.currentThread().interrupt();
  } catch (IOException ioe) {
    // Not rethrown; the cause is logged for diagnosis.
    LOG.warn("Unable to change permissions of " + context.fullPath, ioe);
  }
}
/**
 * Builds one {@code PathDeletionContext} per given path.
 *
 * @param fs file system passed to every context
 * @param paths paths to wrap; each is reduced to the path component of its
 *          URI
 * @return an array of the same length and order as {@code paths}
 */
private static PathDeletionContext[] buildPathDeletionContexts(FileSystem fs,
    Path[] paths) {
  final int count = paths.length;
  final PathDeletionContext[] result = new PathDeletionContext[count];
  for (int idx = 0; idx < count; idx++) {
    final String rawPath = paths[idx].toUri().getPath();
    result[idx] = new PathDeletionContext(fs, rawPath);
  }
  return result;
}
/**
 * Builds one {@code TaskControllerPathDeletionContext} per given path.
 *
 * @param fs file system passed to every context
 * @param paths paths to wrap, one context each
 * @param task task passed to every context
 * @param isWorkDir flag passed unchanged to every context
 * @param taskController task-controller passed to every context
 * @return an array of the same length and order as {@code paths}; its
 *         runtime component type is
 *         {@code TaskControllerPathDeletionContext}
 * @throws IOException declared by the signature
 */
static PathDeletionContext[] buildTaskControllerPathDeletionContexts(
    FileSystem fs, Path[] paths, Task task, boolean isWorkDir,
    TaskController taskController) throws IOException {
  final int count = paths.length;
  final PathDeletionContext[] result =
      new TaskControllerPathDeletionContext[count];
  for (int idx = 0; idx < count; idx++) {
    result[idx] = new TaskControllerPathDeletionContext(fs, paths[idx], task,
        isWorkDir, taskController);
  }
  return result;
}
/** * The job is dead. We're now GC'ing it, getting rid of the job * from all tables. Be sure to remove all of this job's tasks * from the various tables. */ void garbageCollect() { synchronized(this) { // Cancel task tracker reservation cancelReservedSlots(); // Let the JobTracker know that a job is complete jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps()); jobtracker.getInstrumentation().decWaitingReduces(getJobID(), pendingReduces()); jobtracker.storeCompletedJob(this); jobtracker.finalizeJob(this); try { // Definitely remove the local-disk copy of the job file if (localJobFile != null) { localFs.delete(localJobFile, true); localJobFile = null; } Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID()); new CleanupQueue().addToQueue(new PathDeletionContext( jobtracker.getFileSystem(), tempDir.toUri().getPath())); } catch (IOException e) { LOG.warn("Error cleaning up "+profile.getJobID()+": "+e); } // free up the memory used by the data structures this.nonRunningMapCache = null; this.runningMapCache = null; this.nonRunningReduces = null; this.runningReduces = null; } // remove jobs delegation tokens if(conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true)) { DelegationTokenRenewal.removeDelegationTokenRenewalForJob(jobId); } // else don't remove it.May be used by spawned tasks }
/**
 * Enables a job's path for cleanup by delegating to
 * {@code enablePathForCleanup} with the {@code ENABLE_JOB_FOR_CLEANUP}
 * command and the arguments built by {@code buildJobCleanupArgs}.
 *
 * @param context the path to enable; must be a
 *          {@code TaskControllerJobPathDeletionContext}
 * @throws IOException if the delegated call fails
 * @throws IllegalArgumentException if {@code context} is not a
 *           {@code TaskControllerJobPathDeletionContext}
 */
@Override
void enableJobForCleanup(PathDeletionContext context) throws IOException {
  if (!(context instanceof TaskControllerJobPathDeletionContext)) {
    throw new IllegalArgumentException("PathDeletionContext provided is not "
        + "TaskControllerJobPathDeletionContext.");
  }

  final TaskControllerJobPathDeletionContext jobContext =
      (TaskControllerJobPathDeletionContext) context;
  enablePathForCleanup(jobContext,
      TaskControllerCommands.ENABLE_JOB_FOR_CLEANUP,
      buildJobCleanupArgs(jobContext));
}
/**
 * Enables the path for cleanup by changing permissions of the specified
 * path in the local filesystem.
 *
 * <p>Recursively applies {@code u+rwx} to {@code context.fullPath}.
 * Failures are logged and never propagated. If the calling thread is
 * interrupted during the change, its interrupt status is restored so that
 * callers further up can still observe the interruption.
 *
 * @param context the deletion context whose {@code fullPath} is made
 *          accessible to its owner
 * @throws IOException declared by the signature; failures of the
 *           permission change are caught and logged here
 */
private void enablePathForCleanup(PathDeletionContext context)
    throws IOException {
  try {
    FileUtil.chmod(context.fullPath, "u+rwx", true);
  } catch (InterruptedException e) {
    LOG.warn("Interrupted while setting permissions for " + context.fullPath
        + " for deletion.");
    // Catching InterruptedException clears the flag; set it again rather
    // than silently losing the interruption request.
    Thread.currentThread().interrupt();
  } catch (IOException ioe) {
    // Not rethrown; the cause is logged for diagnosis.
    LOG.warn("Unable to change permissions of " + context.fullPath, ioe);
  }
}
/**
 * Wraps each given path in a {@code PathDeletionContext}.
 *
 * @param fs file system passed to every context
 * @param paths paths to wrap; each is reduced to the path component of its
 *          URI
 * @return an array of the same length and order as {@code paths}
 */
private static PathDeletionContext[] buildPathDeletionContexts(FileSystem fs,
    Path[] paths) {
  final int count = paths.length;
  final PathDeletionContext[] result = new PathDeletionContext[count];
  for (int idx = 0; idx < count; idx++) {
    final String rawPath = paths[idx].toUri().getPath();
    result[idx] = new PathDeletionContext(fs, rawPath);
  }
  return result;
}
/**
 * Creates one {@link TaskControllerJobPathDeletionContext} per given
 * directory, each pointing to the job's jobLocalDir.
 *
 * @param fs file system in which the directories are to be deleted
 * @param paths the mapred-local-dirs
 * @param id {@link JobID} of the job whose local directory needs to be
 *          cleaned up
 * @param user the job owner's user name
 * @param taskController the task-controller to be used for deleting the
 *          jobLocalDir
 * @return an array of the same length and order as {@code paths}; its
 *         runtime component type is
 *         {@code TaskControllerPathDeletionContext}
 * @throws IOException declared by the signature
 */
static PathDeletionContext[] buildTaskControllerJobPathDeletionContexts(
    FileSystem fs, Path[] paths, JobID id, String user,
    TaskController taskController) throws IOException {
  final int count = paths.length;
  final PathDeletionContext[] result =
      new TaskControllerPathDeletionContext[count];
  for (int idx = 0; idx < count; idx++) {
    result[idx] = new TaskControllerJobPathDeletionContext(fs, paths[idx], id,
        user, taskController);
  }
  return result;
}
/**
 * Creates one {@code TaskControllerTaskPathDeletionContext} per given
 * directory for a task.
 *
 * @param fs file system in which the directories are to be deleted
 * @param paths the mapred-local-dirs
 * @param task the task whose taskDir or taskWorkDir is going to be deleted
 * @param isWorkDir whether the directory to delete is the workDir or the
 *          taskDir
 * @param taskController the task-controller to be used for deleting the
 *          taskDir or taskWorkDir
 * @return an array of the same length and order as {@code paths}; its
 *         runtime component type is
 *         {@code TaskControllerPathDeletionContext}
 * @throws IOException declared by the signature
 */
static PathDeletionContext[] buildTaskControllerTaskPathDeletionContexts(
    FileSystem fs, Path[] paths, Task task, boolean isWorkDir,
    TaskController taskController) throws IOException {
  final int count = paths.length;
  final PathDeletionContext[] result =
      new TaskControllerPathDeletionContext[count];
  for (int idx = 0; idx < count; idx++) {
    result[idx] = new TaskControllerTaskPathDeletionContext(fs, paths[idx],
        task, isWorkDir, taskController);
  }
  return result;
}
/**
 * Queues the local files of a job for deletion; they are no longer needed
 * on this TaskTracker.
 *
 * <p>One job deletion context is built for every path returned by
 * {@code getLocalFiles(fConf, "")}, and all of them are handed to
 * {@code directoryCleanupThread} in a single call.
 *
 * @param user the job owner's user name
 * @param jobId identifier of the job whose files are removed
 * @throws IOException if building the deletion contexts fails
 */
void removeJobFiles(String user, JobID jobId) throws IOException {
  final PathDeletionContext[] jobCleanups =
      buildTaskControllerJobPathDeletionContexts(
          localFs,
          getLocalFiles(fConf, ""),
          jobId,
          user,
          taskController);
  directoryCleanupThread.addToQueue(jobCleanups);
}