@SuppressWarnings("deprecation") public static void setupDistributedCache(Configuration conf, Map<String, LocalResource> localResources) throws IOException { // Cache archives parseDistributedCacheArtifacts(conf, localResources, LocalResourceType.ARCHIVE, DistributedCache.getCacheArchives(conf), DistributedCache.getArchiveTimestamps(conf), getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), DistributedCache.getArchiveVisibilities(conf)); // Cache files parseDistributedCacheArtifacts(conf, localResources, LocalResourceType.FILE, DistributedCache.getCacheFiles(conf), DistributedCache.getFileTimestamps(conf), getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES), DistributedCache.getFileVisibilities(conf)); }
public static void setupDistributedCache( Configuration conf, Map<String, LocalResource> localResources) throws IOException { // Cache archives parseDistributedCacheArtifacts(conf, localResources, LocalResourceType.ARCHIVE, DistributedCache.getCacheArchives(conf), DistributedCache.getArchiveTimestamps(conf), getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), DistributedCache.getArchiveVisibilities(conf)); // Cache files parseDistributedCacheArtifacts(conf, localResources, LocalResourceType.FILE, DistributedCache.getCacheFiles(conf), DistributedCache.getFileTimestamps(conf), getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES), DistributedCache.getFileVisibilities(conf)); }
@SuppressWarnings("deprecation") public void testSetupDistributedCacheConflictsFiles() throws Exception { Configuration conf = new Configuration(); conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); URI mockUri = URI.create("mockfs://mock/"); FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf)) .getRawFileSystem(); URI file = new URI("mockfs://mock/tmp/something.zip#something"); Path filePath = new Path(file); URI file2 = new URI("mockfs://mock/tmp/something.txt#something"); Path file2Path = new Path(file2); when(mockFs.resolvePath(filePath)).thenReturn(filePath); when(mockFs.resolvePath(file2Path)).thenReturn(file2Path); DistributedCache.addCacheFile(file, conf); DistributedCache.addCacheFile(file2, conf); conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11"); conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11"); conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true"); Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); MRApps.setupDistributedCache(conf, localResources); assertEquals(1, localResources.size()); LocalResource lr = localResources.get("something"); //First one wins assertNotNull(lr); assertEquals(10l, lr.getSize()); assertEquals(10l, lr.getTimestamp()); assertEquals(LocalResourceType.FILE, lr.getType()); }
@SuppressWarnings("deprecation") public static void setupDistributedCache( Configuration conf, Map<String, LocalResource> localResources) throws IOException { // Cache archives parseDistributedCacheArtifacts(conf, localResources, LocalResourceType.ARCHIVE, DistributedCache.getCacheArchives(conf), DistributedCache.getArchiveTimestamps(conf), getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), DistributedCache.getArchiveVisibilities(conf)); // Cache files parseDistributedCacheArtifacts(conf, localResources, LocalResourceType.FILE, DistributedCache.getCacheFiles(conf), DistributedCache.getFileTimestamps(conf), getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES), DistributedCache.getFileVisibilities(conf)); }
@SuppressWarnings("deprecation") public static void setClasspath(Map<String, String> environment, Configuration conf) throws IOException { String classpathEnvVar = Environment.CLASSPATH.name(); Apps.addToEnvironment(environment, classpathEnvVar, Environment.PWD.$()); Apps.addToEnvironment(environment, classpathEnvVar, Environment.PWD.$() + Path.SEPARATOR + "*"); // a * in the classpath will only find a .jar, so we need to filter out // all .jars and add everything else addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf), DistributedCache.getCacheFiles(conf), conf, environment, classpathEnvVar); addToClasspathIfNotJar(DistributedCache.getArchiveClassPaths(conf), DistributedCache.getCacheArchives(conf), conf, environment, classpathEnvVar); AngelApps.setAngelFrameworkClasspath(environment, conf); }
/**
 * Prepares input data, configures {@code conf} and runs the job to completion.
 *
 * <p>The output directory is deleted if it exists, the input directory is
 * created if missing, and {@code numMaps} files named {@code part-<i>} with a
 * fixed three-line text are written into it before the job is submitted.
 *
 * @param conf job configuration; mutated with input/output settings
 * @param inDir directory that receives the generated input files
 * @param outDir job output directory; removed recursively when already present
 * @param numMaps number of input files to write and number of map tasks to request
 * @param numReds number of reduce tasks to request
 * @return the value returned by {@code JobClient.monitorAndPrintJob}
 * @throws IOException on file system or job submission failure
 * @throws InterruptedException declared for the job monitoring call
 */
static boolean runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
    int numReds) throws IOException, InterruptedException {

  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outDir)) {
    fs.delete(outDir, true);
  }
  if (!fs.exists(inDir)) {
    fs.mkdirs(inDir);
  }

  String input = "The quick brown fox\n" + "has many silly\n" + "red fox sox\n";
  for (int i = 0; i < numMaps; ++i) {
    DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
    try {
      file.writeBytes(input);
    } finally {
      // Close even when the write fails so the stream is not leaked.
      file.close();
    }
  }

  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf, fs);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);

  JobClient jobClient = new JobClient(conf);
  RunningJob job = jobClient.submitJob(conf);
  return jobClient.monitorAndPrintJob(conf, job);
}
@Test public void testCombinerShouldUpdateTheReporter() throws Exception { JobConf conf = new JobConf(mrCluster.getConfig()); int numMaps = 5; int numReds = 2; Path in = new Path(mrCluster.getTestWorkDir().getAbsolutePath(), "testCombinerShouldUpdateTheReporter-in"); Path out = new Path(mrCluster.getTestWorkDir().getAbsolutePath(), "testCombinerShouldUpdateTheReporter-out"); createInputOutPutFolder(in, out, numMaps); conf.setJobName("test-job-with-combiner"); conf.setMapperClass(IdentityMapper.class); conf.setCombinerClass(MyCombinerToCheckReporter.class); //conf.setJarByClass(MyCombinerToCheckReporter.class); conf.setReducerClass(IdentityReducer.class); DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf); conf.setOutputCommitter(CustomOutputCommitter.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputKeyClass(LongWritable.class); conf.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(conf, in); FileOutputFormat.setOutputPath(conf, out); conf.setNumMapTasks(numMaps); conf.setNumReduceTasks(numReds); runJob(conf); }
@SuppressWarnings("deprecation") public void testSetupDistributedCacheConflicts() throws Exception { Configuration conf = new Configuration(); conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); URI mockUri = URI.create("mockfs://mock/"); FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf)) .getRawFileSystem(); URI archive = new URI("mockfs://mock/tmp/something.zip#something"); Path archivePath = new Path(archive); URI file = new URI("mockfs://mock/tmp/something.txt#something"); Path filePath = new Path(file); when(mockFs.resolvePath(archivePath)).thenReturn(archivePath); when(mockFs.resolvePath(filePath)).thenReturn(filePath); DistributedCache.addCacheArchive(archive, conf); conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10"); conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10"); conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true"); DistributedCache.addCacheFile(file, conf); conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11"); conf.set(MRJobConfig.CACHE_FILES_SIZES, "11"); conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true"); Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); MRApps.setupDistributedCache(conf, localResources); assertEquals(1, localResources.size()); LocalResource lr = localResources.get("something"); //Archive wins assertNotNull(lr); assertEquals(10l, lr.getSize()); assertEquals(10l, lr.getTimestamp()); assertEquals(LocalResourceType.ARCHIVE, lr.getType()); }
/**
 * Registers a cache archive and a cache file whose URIs share the fragment
 * {@code something}; the test passes only if an
 * {@code InvalidJobConfException} is thrown.
 */
@SuppressWarnings("deprecation")
@Test(timeout = 120000, expected = InvalidJobConfException.class)
public void testSetupDistributedCacheConflicts() throws Exception {
  Configuration conf = new Configuration();
  conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);

  // Reach the raw file system behind the filter so resolvePath can be stubbed.
  URI mockUri = URI.create("mockfs://mock/");
  FilterFileSystem filterFs = (FilterFileSystem) FileSystem.get(mockUri, conf);
  FileSystem mockFs = filterFs.getRawFileSystem();

  URI archiveUri = new URI("mockfs://mock/tmp/something.zip#something");
  Path archivePath = new Path(archiveUri);
  URI fileUri = new URI("mockfs://mock/tmp/something.txt#something");
  Path filePath = new Path(fileUri);

  when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
  when(mockFs.resolvePath(filePath)).thenReturn(filePath);

  DistributedCache.addCacheArchive(archiveUri, conf);
  conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
  conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
  conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");

  DistributedCache.addCacheFile(fileUri, conf);
  conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
  conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
  conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");

  Map<String, LocalResource> localResources =
      new HashMap<String, LocalResource>();
  MRApps.setupDistributedCache(conf, localResources);
}
/**
 * Registers two cache files whose URIs share the fragment {@code something};
 * the test passes only if an {@code InvalidJobConfException} is thrown.
 */
@SuppressWarnings("deprecation")
@Test(timeout = 120000, expected = InvalidJobConfException.class)
public void testSetupDistributedCacheConflictsFiles() throws Exception {
  Configuration conf = new Configuration();
  conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);

  // Reach the raw file system behind the filter so resolvePath can be stubbed.
  URI mockUri = URI.create("mockfs://mock/");
  FilterFileSystem filterFs = (FilterFileSystem) FileSystem.get(mockUri, conf);
  FileSystem mockFs = filterFs.getRawFileSystem();

  URI zipUri = new URI("mockfs://mock/tmp/something.zip#something");
  Path zipPath = new Path(zipUri);
  URI txtUri = new URI("mockfs://mock/tmp/something.txt#something");
  Path txtPath = new Path(txtUri);

  when(mockFs.resolvePath(zipPath)).thenReturn(zipPath);
  when(mockFs.resolvePath(txtPath)).thenReturn(txtPath);

  DistributedCache.addCacheFile(zipUri, conf);
  DistributedCache.addCacheFile(txtUri, conf);
  conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11");
  conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11");
  conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true");

  Map<String, LocalResource> localResources =
      new HashMap<String, LocalResource>();
  MRApps.setupDistributedCache(conf, localResources);
}
@SuppressWarnings("deprecation") private void copyLog4jPropertyFile(Job job, Path submitJobDir, short replication) throws IOException { Configuration conf = job.getConfiguration(); String file = validateFilePath( conf.get(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE), conf); LOG.debug("default FileSystem: " + jtFs.getUri()); FsPermission mapredSysPerms = new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION); if (!jtFs.exists(submitJobDir)) { throw new IOException("Cannot find job submission directory! " + "It should just be created, so something wrong here."); } Path fileDir = JobSubmissionFiles.getJobLog4jFile(submitJobDir); // first copy local log4j.properties file to HDFS under submitJobDir if (file != null) { FileSystem.mkdirs(jtFs, fileDir, mapredSysPerms); URI tmpURI = null; try { tmpURI = new URI(file); } catch (URISyntaxException e) { throw new IllegalArgumentException(e); } Path tmp = new Path(tmpURI); Path newPath = copyRemoteFiles(fileDir, tmp, conf, replication); DistributedCache.addFileToClassPath(new Path(newPath.toUri().getPath()), conf); } }
@SuppressWarnings("deprecation") public void testSetupDistributedCacheConflictsFiles() throws Exception { Configuration conf = new Configuration(); conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); URI mockUri = URI.create("mockfs://mock/"); FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf)) .getRawFileSystem(); URI file = new URI("mockfs://mock/tmp/something.zip#something"); Path filePath = new Path(file); URI file2 = new URI("mockfs://mock/tmp/something.txt#something"); Path file2Path = new Path(file); when(mockFs.resolvePath(filePath)).thenReturn(filePath); when(mockFs.resolvePath(file2Path)).thenReturn(file2Path); DistributedCache.addCacheFile(file, conf); DistributedCache.addCacheFile(file2, conf); conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11"); conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11"); conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true"); Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); MRApps.setupDistributedCache(conf, localResources); assertEquals(1, localResources.size()); LocalResource lr = localResources.get("something"); //First one wins assertNotNull(lr); assertEquals(10l, lr.getSize()); assertEquals(10l, lr.getTimestamp()); assertEquals(LocalResourceType.FILE, lr.getType()); }
/**
 * Records the distributed-cache input indices and paths on {@code job} and
 * registers every entry of {@code paths} as a cache file.
 *
 * @param job job configuration to update
 * @param indices value stored under {@code DISTCACHE_INPUT_INDICES}
 * @param pathsString value stored under {@code DISTCACHE_INPUT_PATHS}
 * @param paths path strings added to the distributed cache, in iteration order
 */
public static void setupDistCacheInputs(JobConf job, String indices,
    String pathsString, ArrayList<String> paths) {
  job.set(DISTCACHE_INPUT_INDICES, indices);
  job.set(DISTCACHE_INPUT_PATHS, pathsString);

  for (final String pathName : paths) {
    final Path cachePath = new Path(pathName);
    DistributedCache.addCacheFile(cachePath.toUri(), job);
    // Symlink creation is requested once per added file.
    DistributedCache.createSymlink(job);
  }
}
@SuppressWarnings("deprecation") public static void addClasspathToEnv(Map<String, String> environment, String classpathEnvVar, Configuration conf) throws IOException { MRApps.addToEnvironment( environment, classpathEnvVar, MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR, conf); MRApps.addToEnvironment( environment, classpathEnvVar, MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR, conf); MRApps.addToEnvironment( environment, classpathEnvVar, MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*", conf); MRApps.addToEnvironment( environment, classpathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*", conf); // a * in the classpath will only find a .jar, so we need to filter out // all .jars and add everything else addToClasspathIfNotJar(DistributedCache.getFileClassPaths(conf), DistributedCache.getCacheFiles(conf), conf, environment, classpathEnvVar); addToClasspathIfNotJar(DistributedCache.getArchiveClassPaths(conf), DistributedCache.getCacheArchives(conf), conf, environment, classpathEnvVar); }