/** * Create taskDirs on all the disks. Otherwise, in some cases, like when * LinuxTaskController is in use, child might wish to balance load across * disks but cannot itself create attempt directory because of the fact that * job directory is writable only by the TT. * * @param user * @param jobId * @param attemptId * @throws IOException */ public void initializeAttemptDirs(String user, String jobId, String attemptId) throws IOException { boolean initStatus = false; String attemptDirPath = TaskTracker.getLocalTaskDir(user, jobId, attemptId); for (String localDir : localDirs) { Path localAttemptDir = new Path(localDir, attemptDirPath); boolean attemptDirStatus = fs.mkdirs(localAttemptDir); if (!attemptDirStatus) { LOG.warn("localAttemptDir " + localAttemptDir.toString() + " couldn't be created."); } initStatus = initStatus || attemptDirStatus; } if (!initStatus) { throw new IOException("Not able to initialize attempt directories " + "in any of the configured local directories for the attempt " + attemptId); } }
public void testFileSystemOtherThanDefault() throws Exception { if (!canRun()) { return; } TrackerDistributedCacheManager manager = new TrackerDistributedCacheManager(conf, taskController); conf.set("fs.fakefile.impl", FileSystem.getFileSystemClass("file", conf).getName()); String userName = getJobOwnerName(); conf.set("user.name", userName); Path fileToCache = new Path("fakefile:///" + firstCacheFile.toUri().getPath()); CacheFile file = new CacheFile(fileToCache.toUri(), CacheFile.FileType.REGULAR, false, 0, false); Path result = manager.getLocalCache(fileToCache.toUri(), conf, TaskTracker.getPrivateDistributedCacheDir(userName), fs.getFileStatus(firstCacheFile), false, System.currentTimeMillis(), false, file); assertNotNull("DistributedCache cached file on non-default filesystem.", result); }
public void testFileSystemOtherThanDefault() throws Exception { if (!canRun()) { return; } TrackerDistributedCacheManager manager = new TrackerDistributedCacheManager(conf, taskController); conf.set("fs.fakefile.impl", conf.get("fs.file.impl")); String userName = getJobOwnerName(); conf.set("user.name", userName); Path fileToCache = new Path("fakefile:///" + firstCacheFile.toUri().getPath()); CacheFile file = new CacheFile(fileToCache.toUri(), CacheFile.FileType.REGULAR, false, 0, false); Path result = manager.getLocalCache(fileToCache.toUri(), conf, TaskTracker.getPrivateDistributedCacheDir(userName), fs.getFileStatus(firstCacheFile), false, System.currentTimeMillis(), false, file); assertNotNull("DistributedCache cached file on non-default filesystem.", result); }
/** * Create taskDirs on all the disks. Otherwise, in some cases, like when * LinuxTaskController is in use, child might wish to balance load across * disks but cannot itself create attempt directory because of the fact that * job directory is writable only by the TT. * * @param user * @param jobId * @param attemptId * @param isCleanupAttempt * @throws IOException */ public void initializeAttemptDirs(String user, String jobId, String attemptId, boolean isCleanupAttempt) throws IOException { boolean initStatus = false; String attemptDirPath = TaskTracker.getLocalTaskDir(user, jobId, attemptId, isCleanupAttempt); for (String localDir : localDirs) { Path localAttemptDir = new Path(localDir, attemptDirPath); boolean attemptDirStatus = fs.mkdirs(localAttemptDir); if (!attemptDirStatus) { LOG.warn("localAttemptDir " + localAttemptDir.toString() + " couldn't be created."); } initStatus = initStatus || attemptDirStatus; } if (!initStatus) { throw new IOException("Not able to initialize attempt directories " + "in any of the configured local directories for the attempt " + attemptId); } }
public void testFileSystemOtherThanDefault() throws Exception { if (!canRun()) { return; } TrackerDistributedCacheManager manager = new TrackerDistributedCacheManager(conf, taskController); conf.set("fs.fakefile.impl", conf.get("fs.file.impl")); String userName = getJobOwnerName(); conf.set(MRJobConfig.USER_NAME, userName); Path fileToCache = new Path("fakefile:///" + firstCacheFile.toUri().getPath()); Path result = manager.getLocalCache(fileToCache.toUri(), conf, TaskTracker.getPrivateDistributedCacheDir(userName), fs.getFileStatus(firstCacheFile), false, getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false); assertNotNull("DistributedCache cached file on non-default filesystem.", result); }
public TaskMemoryManagerThread(TaskTracker taskTracker) { this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L, taskTracker.getJobConf().getLong( "mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L)); this.taskTracker = taskTracker; long reservedRssMemory = taskTracker.getReservedPhysicalMemoryOnTT(); long totalPhysicalMemoryOnTT = taskTracker.getTotalPhysicalMemoryOnTT(); if (reservedRssMemory == JobConf.DISABLED_MEMORY_LIMIT || totalPhysicalMemoryOnTT == JobConf.DISABLED_MEMORY_LIMIT) { maxRssMemoryAllowedForAllTasks = JobConf.DISABLED_MEMORY_LIMIT; LOG.info("Physical memory monitoring disabled"); } else { maxRssMemoryAllowedForAllTasks = totalPhysicalMemoryOnTT - reservedRssMemory; if (maxRssMemoryAllowedForAllTasks < 0) { maxRssMemoryAllowedForAllTasks = JobConf.DISABLED_MEMORY_LIMIT; LOG.warn("Reserved physical memory exceeds total. Physical memory " + "monitoring disabled."); } else { LOG.info(String.format("Physical memory monitoring enabled. " + "System total: %s. Reserved: %s. Maximum: %s.", totalPhysicalMemoryOnTT, reservedRssMemory, maxRssMemoryAllowedForAllTasks)); } } }
/** * Prepare the job directories for a given job. To be called by the job * localization code, only if the job is not already localized. * * <br> * Here, we set 700 permissions on the job directories created on all disks. * This we do so as to avoid any misuse by other users till the time * {@link TaskController#initializeJob} is run at a * later time to set proper private permissions on the job directories. <br> * * @param user * @param jobId * @throws IOException */ public void initializeJobDirs(String user, JobID jobId) throws IOException { boolean initJobDirStatus = false; String jobDirPath = TaskTracker.getLocalJobDir(user, jobId.toString()); for (String localDir : localDirs) { Path jobDir = new Path(localDir, jobDirPath); if (fs.exists(jobDir)) { // this will happen on a partial execution of localizeJob. Sometimes // copying job.xml to the local disk succeeds but copying job.jar might // throw out an exception. We should clean up and then try again. fs.delete(jobDir, true); } boolean jobDirStatus = fs.mkdirs(jobDir); if (!jobDirStatus) { LOG.warn("Not able to create job directory " + jobDir.toString()); } initJobDirStatus = initJobDirStatus || jobDirStatus; // job-dir has to be private to the TT fs.setPermission(jobDir, new FsPermission((short)0700)); } if (!initJobDirStatus) { throw new IOException("Not able to initialize job directories " + "in any of the configured local directories for job " + jobId.toString()); } }
public TaskMemoryManagerThread(TaskTracker taskTracker) { this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L, taskTracker.getJobConf().getLong( "mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L)); this.taskTracker = taskTracker; }
public TaskMemoryManagerThread(TaskTracker taskTracker) { this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L, taskTracker.getJobConf().getLong( "mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L), taskTracker.getJobConf().getLong( "mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill", ProcfsBasedProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL)); this.taskTracker = taskTracker; }
public TaskMemoryManagerThread(TaskTracker taskTracker) { this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L, taskTracker.getJobConf().getLong( TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL, 5000L)); this.taskTracker = taskTracker; long reservedRssMemory = taskTracker.getReservedPhysicalMemoryOnTT(); long totalPhysicalMemoryOnTT = taskTracker.getTotalPhysicalMemoryOnTT(); if (reservedRssMemory == JobConf.DISABLED_MEMORY_LIMIT || totalPhysicalMemoryOnTT == JobConf.DISABLED_MEMORY_LIMIT) { maxRssMemoryAllowedForAllTasks = JobConf.DISABLED_MEMORY_LIMIT; } else { maxRssMemoryAllowedForAllTasks = totalPhysicalMemoryOnTT - reservedRssMemory; } }
/** * Prepare the job directories for a given job. To be called by the job * localization code, only if the job is not already localized. * * <br> * Here, we set 700 permissions on the job directories created on all disks. * This we do so as to avoid any misuse by other users till the time * {@link TaskController#initializeJob(JobInitializationContext)} is run at a * later time to set proper private permissions on the job directories. <br> * * @param user * @param jobId * @throws IOException */ public void initializeJobDirs(String user, JobID jobId) throws IOException { boolean initJobDirStatus = false; String jobDirPath = TaskTracker.getLocalJobDir(user, jobId.toString()); for (String localDir : localDirs) { Path jobDir = new Path(localDir, jobDirPath); if (fs.exists(jobDir)) { // this will happen on a partial execution of localizeJob. Sometimes // copying job.xml to the local disk succeeds but copying job.jar might // throw out an exception. We should clean up and then try again. fs.delete(jobDir, true); } boolean jobDirStatus = fs.mkdirs(jobDir); if (!jobDirStatus) { LOG.warn("Not able to create job directory " + jobDir.toString()); } initJobDirStatus = initJobDirStatus || jobDirStatus; // job-dir has to be private to the TT fs.setPermission(jobDir, new FsPermission((short)0700)); } if (!initJobDirStatus) { throw new IOException("Not able to initialize job directories " + "in any of the configured local directories for job " + jobId.toString()); } }
MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, long size, JobConf conf, LocalDirAllocator localDirAllocator, int fetcher, boolean primaryMapOutput) throws IOException { this.id = ID.incrementAndGet(); this.mapId = mapId; this.merger = merger; type = Type.DISK; memory = null; byteStream = null; this.size = size; this.localFS = FileSystem.getLocal(conf); String filename = "map_" + mapId.getTaskID().getId() + ".out"; String tmpOutput = Path.SEPARATOR + TaskTracker.getJobCacheSubdir(conf.getUser()) + Path.SEPARATOR + mapId.getJobID() + Path.SEPARATOR + merger.getReduceId() + Path.SEPARATOR + "output" + Path.SEPARATOR + filename + "." + fetcher; tmpOutputPath = localDirAllocator.getLocalPathForWrite(tmpOutput, size, conf); outputPath = new Path(tmpOutputPath.getParent(), filename); disk = localFS.create(tmpOutputPath); this.primaryMapOutput = primaryMapOutput; }
/** * Localize a file. After localization is complete, create a file, "myFile", * under the directory where the file is localized and ensure that it has * permissions different from what is set by default. Then, localize another * file. Verify that "myFile" has the right permissions. * @throws Exception */ public void testCustomPermissions() throws Exception { if (!canRun()) { return; } String userName = getJobOwnerName(); conf.set(MRJobConfig.USER_NAME, userName); TrackerDistributedCacheManager manager = new TrackerDistributedCacheManager(conf, taskController); FileSystem localfs = FileSystem.getLocal(conf); Path[] localCache = new Path[2]; localCache[0] = manager.getLocalCache(firstCacheFile.toUri(), conf, TaskTracker.getPrivateDistributedCacheDir(userName), fs.getFileStatus(firstCacheFile), false, getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false); FsPermission myPermission = new FsPermission((short)0600); Path myFile = new Path(localCache[0].getParent(), "myfile.txt"); if (FileSystem.create(localfs, myFile, myPermission) == null) { throw new IOException("Could not create " + myFile); } try { localCache[1] = manager.getLocalCache(secondCacheFile.toUri(), conf, TaskTracker.getPrivateDistributedCacheDir(userName), fs.getFileStatus(secondCacheFile), false, getFileStamp(secondCacheFile), new Path(TEST_ROOT_DIR), false, false); FileStatus stat = localfs.getFileStatus(myFile); assertTrue(stat.getPermission().equals(myPermission)); // validate permissions of localized files. checkFilePermissions(localCache); } finally { localfs.delete(myFile, false); } }
/** * This is the typical flow for using the DistributedCache classes. * * @throws IOException * @throws LoginException */ public void testManagerFlow() throws IOException, LoginException { if (!canRun()) { return; } // ****** Imitate JobClient code // Configures a task/job with both a regular file and a "classpath" file. Configuration subConf = new Configuration(conf); String userName = getJobOwnerName(); subConf.set("user.name", userName); JobID jobid = new JobID("jt",1); DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf); DistributedCache.addFileToClassPath(secondCacheFile, subConf, FileSystem.get(subConf)); Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>(); TrackerDistributedCacheManager.determineTimestamps(subConf, statCache); TrackerDistributedCacheManager.determineCacheVisibilities(subConf, statCache); assertEquals(2, statCache.size()); // ****** End of imitating JobClient code Path jobFile = new Path(TEST_ROOT_DIR, "job.xml"); FileOutputStream os = new FileOutputStream(new File(jobFile.toString())); subConf.writeXml(os); os.close(); // ****** Imitate TaskRunner code. TrackerDistributedCacheManager manager = new TrackerDistributedCacheManager(conf, taskController); TaskDistributedCacheManager handle = manager.newTaskDistributedCacheManager(jobid, subConf); assertNull(null, DistributedCache.getLocalCacheFiles(subConf)); File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString()); handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(), TaskTracker.getPrivateDistributedCacheDir(userName)); JobLocalizer.downloadPrivateCache(subConf); // DOESN'T ACTUALLY HAPPEN IN THE TaskRunner (THIS IS A TODO) // handle.setupPrivateCache(localDirAllocator, TaskTracker // .getPrivateDistributedCacheDir(userName)); // // ****** End of imitating TaskRunner code Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf); assertNotNull(null, localCacheFiles); assertEquals(2, localCacheFiles.length); Path cachedFirstFile = localCacheFiles[0]; Path cachedSecondFile = localCacheFiles[1]; assertFileLengthEquals(firstCacheFile, cachedFirstFile); assertFalse("Paths should be different.", firstCacheFile.equals(cachedFirstFile)); assertEquals(1, handle.getClassPaths().size()); assertEquals(cachedSecondFile.toString(), handle.getClassPaths().get(0)); checkFilePermissions(localCacheFiles); // Cleanup handle.release(); manager.purgeCache(); assertFalse(pathToFile(cachedFirstFile).exists()); }
public void testSameNameFileArchiveCache() throws IOException, InterruptedException { if (!canRun()) { return; } TrackerDistributedCacheManager manager = new TrackerDistributedCacheManager( conf, taskController); String userName = getJobOwnerName(); File workDir = new File(TEST_ROOT_DIR, "workdir"); Path cacheFile = new Path(TEST_ROOT_DIR, "fileArchiveCacheFile"); createPublicTempFile(cacheFile); Configuration conf1 = new Configuration(conf); conf1.set("user.name", userName); DistributedCache.addCacheFile(cacheFile.toUri(), conf1); DistributedCache.addCacheArchive(cacheFile.toUri(), conf1); TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf1); dumpState(conf1); TaskDistributedCacheManager handle = manager .newTaskDistributedCacheManager(new JobID("jt", 1), conf1); handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(), TaskTracker.getPrivateDistributedCacheDir(userName)); TaskDistributedCacheManager.CacheFile cFile = handle.getCacheFiles().get(0); TaskDistributedCacheManager.CacheFile cArchive = handle.getCacheFiles() .get(1); String distCacheDir = TaskTracker.getPublicDistributedCacheDir(); Path localizedPathForFile = manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir, fs.getFileStatus(cacheFile), false, cFile.timestamp, true, cFile); Path localizedPathForArchive = manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir, fs.getFileStatus(cacheFile), true, cArchive.timestamp, true, cArchive); assertNotSame("File and Archive resolve to the same path: " + localizedPathForFile + ". Should differ.", localizedPathForFile, localizedPathForArchive); }
private void checkLocalizedPath(boolean visibility) throws IOException, LoginException, InterruptedException, URISyntaxException { TrackerDistributedCacheManager manager = new TrackerDistributedCacheManager(conf, taskController); String userName = getJobOwnerName(); File workDir = new File(TEST_ROOT_DIR, "workdir"); Path cacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile"); if (visibility) { createPublicTempFile(cacheFile); } else { createPrivateTempFile(cacheFile); } Configuration conf1 = new Configuration(conf); conf1.set("user.name", userName); DistributedCache.addCacheFile(cacheFile.toUri(), conf1); TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf1); dumpState(conf1); // Task localizing for job TaskDistributedCacheManager handle = manager .newTaskDistributedCacheManager(new JobID("jt", 1), conf1); handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(), TaskTracker.getPrivateDistributedCacheDir(userName)); JobLocalizer.downloadPrivateCache(conf1); TaskDistributedCacheManager.CacheFile c = handle.getCacheFiles().get(0); String distCacheDir; if (visibility) { distCacheDir = TaskTracker.getPublicDistributedCacheDir(); } else { distCacheDir = TaskTracker.getPrivateDistributedCacheDir(userName); } Path localizedPath = manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir, fs.getFileStatus(cacheFile), false, c.timestamp, visibility, c); assertTrue("Cache file didn't get localized in the expected directory. " + "Expected localization to happen within " + ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir + ", but was localized at " + localizedPath, localizedPath.toString().contains(distCacheDir)); if (visibility) { checkPublicFilePermissions(new Path[]{localizedPath}); } else { checkFilePermissions(new Path[]{localizedPath}); } }
public MultiTaskTrackerMetrics(List<TaskTracker> trackerList) { this.trackerList = trackerList; MetricsContext context = MetricsUtil.getContext("mapred"); metricsRecord = MetricsUtil.createRecord(context, "multitasktracker"); context.registerUpdater(this); }
@Override public void doUpdates(MetricsContext context) { LOG.info("Updating metrics"); int numTrackers = trackerList.size(); long totalMapRefill = 0; long totalReduceRefill = 0; int totalRunningMaps = 0; int totalRunningReduces = 0; int totalMapSlots = 0; int totalReduceSlots = 0; for (TaskTracker tracker : trackerList) { totalMapRefill += tracker.getAveMapSlotRefillMsecs(); totalReduceRefill += tracker.getAveReduceSlotRefillMsecs(); totalRunningMaps += tracker.getRunningMaps(); totalRunningReduces += tracker.getRunningReduces(); totalMapSlots += tracker.getMaxActualMapTasks(); totalReduceSlots += tracker.getMaxActualReduceTasks(); // If the metrics exists, aggregate the task launch msecs for all // trackers TaskTrackerInstrumentation instrumentation = tracker.getTaskTrackerInstrumentation(); if (instrumentation != null) { MetricsTimeVaryingRate taskLaunchMsecs = instrumentation.getTaskLaunchMsecs(); if (taskLaunchMsecs != null) { taskLaunchMsecs.pushMetric(null); aggTaskLaunchMsecs.inc( taskLaunchMsecs.getPreviousIntervalAverageTime()); } } } long avgMapRefill = totalMapRefill / numTrackers; long avgReduceRefill = totalReduceRefill / numTrackers; metricsRecord.setMetric("aveMapSlotRefillMsecs", avgMapRefill); metricsRecord.setMetric("aveReduceSlotRefillMsecs", avgReduceRefill); metricsRecord.setMetric("maps_running", totalRunningMaps); metricsRecord.setMetric("reduces_running", totalRunningReduces); metricsRecord.setMetric("mapTaskSlots", totalMapSlots); metricsRecord.setMetric("reduceTaskSlots", totalReduceSlots); for (MetricsBase metricsBase : registry.getMetricsList()) { metricsBase.pushMetric(metricsRecord); } metricsRecord.update(); }
public TaskTrackerRunner(int id, TaskTracker tt) { super(); this.ttToRun = tt; this.id = id; }
/** * This is the typical flow for using the DistributedCache classes. * * @throws IOException * @throws LoginException */ public void testManagerFlow() throws IOException, LoginException { if (!canRun()) { return; } // ****** Imitate JobClient code // Configures a task/job with both a regular file and a "classpath" file. Configuration subConf = new Configuration(conf); String userName = getJobOwnerName(); subConf.set("user.name", userName); JobID jobid = new JobID("jt",1); DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf); DistributedCache.addFileToClassPath(secondCacheFile, subConf, FileSystem.get(subConf)); TrackerDistributedCacheManager.determineTimestamps(subConf); TrackerDistributedCacheManager.determineCacheVisibilities(subConf); // ****** End of imitating JobClient code Path jobFile = new Path(TEST_ROOT_DIR, "job.xml"); FileOutputStream os = new FileOutputStream(new File(jobFile.toString())); subConf.writeXml(os); os.close(); // ****** Imitate TaskRunner code. TrackerDistributedCacheManager manager = new TrackerDistributedCacheManager(conf, taskController); TaskDistributedCacheManager handle = manager.newTaskDistributedCacheManager(jobid, subConf); assertNull(null, DistributedCache.getLocalCacheFiles(subConf)); File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString()); handle.setupCache(subConf, TaskTracker.getPublicDistributedCacheDir(), TaskTracker.getPrivateDistributedCacheDir(userName)); JobLocalizer.downloadPrivateCache(subConf); // DOESN'T ACTUALLY HAPPEN IN THE TaskRunner (THIS IS A TODO) // handle.setupPrivateCache(localDirAllocator, TaskTracker // .getPrivateDistributedCacheDir(userName)); // // ****** End of imitating TaskRunner code Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf); assertNotNull(null, localCacheFiles); assertEquals(2, localCacheFiles.length); Path cachedFirstFile = localCacheFiles[0]; Path cachedSecondFile = localCacheFiles[1]; assertFileLengthEquals(firstCacheFile, cachedFirstFile); assertFalse("Paths should be different.", firstCacheFile.equals(cachedFirstFile)); assertEquals(1, handle.getClassPaths().size()); assertEquals(cachedSecondFile.toString(), handle.getClassPaths().get(0)); checkFilePermissions(localCacheFiles); // Cleanup handle.release(); manager.purgeCache(); assertFalse(pathToFile(cachedFirstFile).exists()); }
public void testSameNameFileArchiveCache() throws IOException, InterruptedException { if (!canRun()) { return; } TrackerDistributedCacheManager manager = new TrackerDistributedCacheManager( conf, taskController); String userName = getJobOwnerName(); File workDir = new File(TEST_ROOT_DIR, "workdir"); Path cacheFile = new Path(TEST_ROOT_DIR, "fileArchiveCacheFile"); createPublicTempFile(cacheFile); Configuration conf1 = new Configuration(conf); conf1.set("user.name", userName); DistributedCache.addCacheFile(cacheFile.toUri(), conf1); DistributedCache.addCacheArchive(cacheFile.toUri(), conf1); TrackerDistributedCacheManager.determineTimestamps(conf1); TrackerDistributedCacheManager.determineCacheVisibilities(conf1); dumpState(conf1); TaskDistributedCacheManager handle = manager .newTaskDistributedCacheManager(new JobID("jt", 1), conf1); handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(), TaskTracker.getPrivateDistributedCacheDir(userName)); TaskDistributedCacheManager.CacheFile cFile = handle.getCacheFiles().get(0); TaskDistributedCacheManager.CacheFile cArchive = handle.getCacheFiles() .get(1); String distCacheDir = TaskTracker.getPublicDistributedCacheDir(); Path localizedPathForFile = manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir, fs.getFileStatus(cacheFile), false, cFile.timestamp, true, cFile); Path localizedPathForArchive = manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir, fs.getFileStatus(cacheFile), true, cArchive.timestamp, true, cArchive); assertNotSame("File and Archive resolve to the same path: " + localizedPathForFile + ". Should differ.", localizedPathForFile, localizedPathForArchive); }
private void checkLocalizedPath(boolean visibility) throws IOException, LoginException, InterruptedException { TrackerDistributedCacheManager manager = new TrackerDistributedCacheManager(conf, taskController); String userName = getJobOwnerName(); File workDir = new File(TEST_ROOT_DIR, "workdir"); Path cacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile"); if (visibility) { createPublicTempFile(cacheFile); } else { createPrivateTempFile(cacheFile); } Configuration conf1 = new Configuration(conf); conf1.set("user.name", userName); DistributedCache.addCacheFile(cacheFile.toUri(), conf1); TrackerDistributedCacheManager.determineTimestamps(conf1); TrackerDistributedCacheManager.determineCacheVisibilities(conf1); dumpState(conf1); // Task localizing for job TaskDistributedCacheManager handle = manager .newTaskDistributedCacheManager(new JobID("jt", 1), conf1); handle.setupCache(conf1, TaskTracker.getPublicDistributedCacheDir(), TaskTracker.getPrivateDistributedCacheDir(userName)); JobLocalizer.downloadPrivateCache(conf1); TaskDistributedCacheManager.CacheFile c = handle.getCacheFiles().get(0); String distCacheDir; if (visibility) { distCacheDir = TaskTracker.getPublicDistributedCacheDir(); } else { distCacheDir = TaskTracker.getPrivateDistributedCacheDir(userName); } Path localizedPath = manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir, fs.getFileStatus(cacheFile), false, c.timestamp, visibility, c); assertTrue("Cache file didn't get localized in the expected directory. " + "Expected localization to happen within " + ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir + ", but was localized at " + localizedPath, localizedPath.toString().contains(distCacheDir)); if (visibility) { checkPublicFilePermissions(new Path[]{localizedPath}); } else { checkFilePermissions(new Path[]{localizedPath}); } }