/**
 * Builds a {@link ResourceLocalizationSpec} that pairs a resource with the
 * directory it is to be placed in.
 *
 * @param rsrc the resource stored in the spec
 * @param path the destination directory; converted to a YARN URL before
 *          being stored
 * @return a newly created, populated spec record
 */
public static ResourceLocalizationSpec newResourceLocalizationSpec(
    LocalResource rsrc, Path path) {
  URL destination = ConverterUtils.getYarnUrlFromPath(path);
  ResourceLocalizationSpec spec =
      Records.newRecord(ResourceLocalizationSpec.class);
  spec.setDestinationDirectory(destination);
  spec.setResource(rsrc);
  return spec;
}
/**
 * Describes the file at {@code destPath} as a {@link LocalResource} and
 * registers it in {@code localResources} under {@code link}.
 *
 * @param fs file system used to look up the status of {@code destPath}
 * @param conf configuration passed to the visibility lookup
 * @param destPath location of the resource
 * @param localResources map that receives the new entry
 * @param resourceType type recorded on the resource
 * @param link key under which the resource is stored; must be non-empty
 * @param statCache cache passed to the visibility lookup
 * @param appMasterOnly not read by this method
 * @throws IOException if the file status cannot be obtained, or if
 *           {@code link} is null or empty
 */
public void addResource(FileSystem fs, Configuration conf, Path destPath,
    Map<String, LocalResource> localResources,
    LocalResourceType resourceType, String link,
    Map<URI, FileStatus> statCache, boolean appMasterOnly)
    throws IOException {
  FileStatus status = fs.getFileStatus(destPath);

  LocalResource resource = Records.newRecord(LocalResource.class);
  resource.setType(resourceType);
  resource.setVisibility(getVisibility(conf, destPath.toUri(), statCache));
  resource.setResource(ConverterUtils.getYarnUrlFromPath(destPath));
  resource.setTimestamp(status.getModificationTime());
  resource.setSize(status.getLen());

  boolean hasLinkName = link != null && !link.isEmpty();
  if (!hasLinkName) {
    throw new IOException("You must specify a valid link name");
  }
  localResources.put(link, resource);
}
@SuppressWarnings("deprecation") public static void setupDistributedCache(Configuration conf, Map<String, LocalResource> localResources) throws IOException { // Cache archives parseDistributedCacheArtifacts(conf, localResources, LocalResourceType.ARCHIVE, DistributedCache.getCacheArchives(conf), DistributedCache.getArchiveTimestamps(conf), getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), DistributedCache.getArchiveVisibilities(conf)); // Cache files parseDistributedCacheArtifacts(conf, localResources, LocalResourceType.FILE, DistributedCache.getCacheFiles(conf), DistributedCache.getFileTimestamps(conf), getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES), DistributedCache.getFileVisibilities(conf)); }
public boolean run() throws Exception { YarnClientApplication app = createApplication(); ApplicationId appId = app.getNewApplicationResponse().getApplicationId(); // Copy the application jar to the filesystem FileSystem fs = FileSystem.get(conf); String appIdStr = appId.toString(); Path dstJarPath = Utils.copyLocalFileToDfs(fs, appIdStr, new Path(tfJar), Constants.TF_JAR_NAME); Path dstLibPath = Utils.copyLocalFileToDfs(fs, appIdStr, new Path(tfLib), Constants.TF_LIB_NAME); Map<String, Path> files = new HashMap<>(); files.put(Constants.TF_JAR_NAME, dstJarPath); Map<String, LocalResource> localResources = Utils.makeLocalResources(fs, files); Map<String, String> javaEnv = Utils.setJavaEnv(conf); String command = makeAppMasterCommand(dstLibPath.toString(), dstJarPath.toString()); LOG.info("Make ApplicationMaster command: " + command); ContainerLaunchContext launchContext = ContainerLaunchContext.newInstance( localResources, javaEnv, Lists.newArrayList(command), null, null, null); Resource resource = Resource.newInstance(amMemory, amVCores); submitApplication(app, appName, launchContext, resource, amQueue); return awaitApplication(appId); }
/**
 * Writes an empty jar (manifest only) at {@code p} and returns a
 * {@link LocalResourceType#PATTERN} resource describing it.
 *
 * @param files file context used to qualify {@code p} and read its status
 * @param p location of the jar to create
 * @param vis visibility recorded on the returned resource
 * @return a resource with size and timestamp taken from the file status
 *         and the pattern {@code classes/.*}
 * @throws IOException if the jar cannot be written or its status read
 */
static LocalResource createJar(FileContext files, Path p,
    LocalResourceVisibility vis) throws IOException {
  LOG.info("Create jar file " + p);
  File jarFile = new File((files.makeQualified(p)).toUri());
  // try-with-resources closes both streams even if creating the jar
  // stream (or logging) throws, so no file handle is leaked.
  try (FileOutputStream stream = new FileOutputStream(jarFile)) {
    LOG.info("Create jar out stream ");
    try (JarOutputStream out = new JarOutputStream(stream, new Manifest())) {
      LOG.info("Done writing jar stream ");
    }
  }

  LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
  ret.setResource(ConverterUtils.getYarnUrlFromPath(p));
  FileStatus status = files.getFileStatus(p);
  ret.setSize(status.getLen());
  ret.setTimestamp(status.getModificationTime());
  ret.setType(LocalResourceType.PATTERN);
  ret.setVisibility(vis);
  ret.setPattern("classes/.*");
  return ret;
}
/**
 * Creates {@code <p>.jar} containing a single entry named after
 * {@code p} and filled with {@code len} random bytes, and returns an
 * {@link LocalResourceType#ARCHIVE} resource describing it.
 *
 * @param files file context used to read the archive's modification time
 * @param p base path; the archive is written to this path plus
 *          {@code .jar}
 * @param len number of random bytes written into the entry; also recorded
 *          as the resource size
 * @param r source of the random bytes
 * @param vis visibility recorded on the returned resource
 * @return the resource describing the new archive
 * @throws IOException if the archive cannot be written or its status read
 * @throws URISyntaxException declared for callers; kept for compatibility
 */
static LocalResource createJarFile(FileContext files, Path p, int len,
    Random r, LocalResourceVisibility vis)
    throws IOException, URISyntaxException {
  byte[] bytes = new byte[len];
  r.nextBytes(bytes);

  File archiveFile = new File(p.toUri().getPath() + ".jar");
  archiveFile.createNewFile();
  // try-with-resources guarantees both streams are closed even when
  // writing the entry fails part-way.
  try (FileOutputStream fileOut = new FileOutputStream(archiveFile);
      JarOutputStream out = new JarOutputStream(fileOut)) {
    out.putNextEntry(new JarEntry(p.getName()));
    out.write(bytes);
    out.closeEntry();
  }

  Path jarPath = new Path(p.toString() + ".jar");
  LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
  ret.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
  ret.setSize(len);
  ret.setType(LocalResourceType.ARCHIVE);
  ret.setVisibility(vis);
  ret.setTimestamp(files.getFileStatus(jarPath).getModificationTime());
  return ret;
}
/**
 * Creates {@code <p>.ZIP} containing a single entry named after
 * {@code p} and filled with {@code len} random bytes, and returns an
 * {@link LocalResourceType#ARCHIVE} resource describing it.
 *
 * @param files file context used to read the archive's modification time
 * @param p base path; the archive is written to this path plus
 *          {@code .ZIP}
 * @param len number of random bytes written into the entry; also recorded
 *          as the resource size
 * @param r source of the random bytes
 * @param vis visibility recorded on the returned resource
 * @return the resource describing the new archive
 * @throws IOException if the archive cannot be written or its status read
 * @throws URISyntaxException declared for callers; kept for compatibility
 */
static LocalResource createZipFile(FileContext files, Path p, int len,
    Random r, LocalResourceVisibility vis)
    throws IOException, URISyntaxException {
  byte[] bytes = new byte[len];
  r.nextBytes(bytes);

  File archiveFile = new File(p.toUri().getPath() + ".ZIP");
  archiveFile.createNewFile();
  // try-with-resources guarantees both streams are closed even when
  // writing the entry fails part-way.
  try (FileOutputStream fileOut = new FileOutputStream(archiveFile);
      ZipOutputStream out = new ZipOutputStream(fileOut)) {
    out.putNextEntry(new ZipEntry(p.getName()));
    out.write(bytes);
    out.closeEntry();
  }

  Path zipPath = new Path(p.toString() + ".ZIP");
  LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
  ret.setResource(ConverterUtils.getYarnUrlFromPath(zipPath));
  ret.setSize(len);
  ret.setType(LocalResourceType.ARCHIVE);
  ret.setVisibility(vis);
  ret.setTimestamp(files.getFileStatus(zipPath).getModificationTime());
  return ret;
}
private Path getPathForLocalization(LocalResource rsrc) throws IOException, URISyntaxException { String user = context.getUser(); ApplicationId appId = context.getContainerId().getApplicationAttemptId().getApplicationId(); LocalResourceVisibility vis = rsrc.getVisibility(); LocalResourcesTracker tracker = getLocalResourcesTracker(vis, user, appId); String cacheDirectory = null; if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only cacheDirectory = getUserFileCachePath(user); } else {// APPLICATION ONLY cacheDirectory = getAppFileCachePath(user, appId.toString()); } Path dirPath = dirsHandler.getLocalPathForWrite(cacheDirectory, ContainerLocalizer.getEstimatedSize(rsrc), false); return tracker.getPathForLocalization(new LocalResourceRequest(rsrc), dirPath, delService); }
/**
 * Creates an uploader for one localized resource.
 *
 * @param resource the local resource that contains the original remote path
 * @param localPath the path in the local filesystem where the resource is
 *          localized
 * @param user stored as-is for later use by this uploader
 * @param conf configuration from which the shared cache root directory,
 *          the cache depth and the checksum implementation are read
 * @param scmClient stored as-is; presumably the client used to reach the
 *          shared cache manager (usage is outside this constructor)
 * @param fs the filesystem of the shared cache
 * @param localFs the local filesystem
 */
public SharedCacheUploader(LocalResource resource, Path localPath,
    String user, Configuration conf, SCMUploaderProtocol scmClient,
    FileSystem fs, FileSystem localFs) {
  // Collaborators handed in by the caller.
  this.resource = resource;
  this.localPath = localPath;
  this.user = user;
  this.conf = conf;
  this.scmClient = scmClient;
  this.fs = fs;
  this.localFs = localFs;

  // Settings derived from the configuration.
  this.sharedCacheRootDir = conf.get(
      YarnConfiguration.SHARED_CACHE_ROOT,
      YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
  this.nestedLevel = SharedCacheUtil.getCacheDepth(conf);
  this.checksum = SharedCacheChecksumFactory.getChecksum(conf);
  this.recordFactory = RecordFactoryProvider.getRecordFactory(null);
}
/**
 * Creates a localizer for the given user and localizer id.
 *
 * @param lfs local file context
 * @param user user on whose behalf resources are localized; must not be
 *          null
 * @param appId application id, also used to build the app cache context
 *          name
 * @param localizerId id of this localizer; must not be null
 * @param localDirs local directories available to the localizer
 * @param recordFactory factory used to create protocol records
 * @throws IOException if {@code user} or {@code localizerId} is null
 */
public ContainerLocalizer(FileContext lfs, String user, String appId,
    String localizerId, List<Path> localDirs, RecordFactory recordFactory)
    throws IOException {
  if (user == null) {
    throw new IOException("Cannot initialize for null user");
  }
  if (localizerId == null) {
    throw new IOException("Cannot initialize for null containerId");
  }

  this.lfs = lfs;
  this.user = user;
  this.appId = appId;
  this.localDirs = localDirs;
  this.localizerId = localizerId;
  this.recordFactory = recordFactory;

  this.conf = new Configuration();
  this.appCacheDirContextName = String.format(APPCACHE_CTXT_FMT, appId);
  this.pendingResources = new HashMap<LocalResource,Future<Path>>();
}
/**
 * Builds a mocked {@link ResourceLocalizationSpec} wrapping a mocked FILE
 * resource with a random name, size and timestamp.
 *
 * @param r source of randomness; one {@code nextLong} and two
 *          {@code nextInt(1024)} values are drawn, in that order
 * @param vis visibility reported by the mocked resource and embedded in
 *          its file path
 * @param p destination directory reported by the mocked spec
 * @return the mocked spec
 */
static ResourceLocalizationSpec getMockRsrc(Random r,
    LocalResourceVisibility vis, Path p) {
  ResourceLocalizationSpec spec = mock(ResourceLocalizationSpec.class);
  LocalResource resource = mock(LocalResource.class);

  // Draw the random values up front, in the same order they are consumed.
  String name = Long.toHexString(r.nextLong());
  long size = r.nextInt(1024) + 1024L;
  long timestamp = r.nextInt(1024) + 2048L;

  URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class);
  when(uri.getScheme()).thenReturn("file");
  when(uri.getHost()).thenReturn(null);
  when(uri.getFile()).thenReturn("/local/" + vis + "/" + name);

  when(resource.getResource()).thenReturn(uri);
  when(resource.getSize()).thenReturn(size);
  when(resource.getTimestamp()).thenReturn(timestamp);
  when(resource.getType()).thenReturn(LocalResourceType.FILE);
  when(resource.getVisibility()).thenReturn(vis);

  when(spec.getResource()).thenReturn(resource);
  when(spec.getDestinationDirectory())
      .thenReturn(ConverterUtils.getYarnUrlFromPath(p));
  return spec;
}
/** * If resource is public, verifyAccess should succeed */ @Test public void testVerifyAccessPublicResource() throws Exception { Configuration conf = new Configuration(); conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true); LocalResource resource = mock(LocalResource.class); // give public visibility when(resource.getVisibility()).thenReturn(LocalResourceVisibility.PUBLIC); Path localPath = mock(Path.class); when(localPath.getName()).thenReturn("foo.jar"); String user = "joe"; SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class); FileSystem fs = mock(FileSystem.class); FileSystem localFs = FileSystem.getLocal(conf); SharedCacheUploader spied = createSpiedUploader(resource, localPath, user, conf, scmClient, fs, localFs); assertTrue(spied.verifyAccess()); }
/** * If the localPath does not exists, getActualPath should get to one level * down */ @Test public void testGetActualPath() throws Exception { Configuration conf = new Configuration(); conf.setBoolean(YarnConfiguration.SHARED_CACHE_ENABLED, true); LocalResource resource = mock(LocalResource.class); // give public visibility when(resource.getVisibility()).thenReturn(LocalResourceVisibility.PUBLIC); Path localPath = new Path("foo.jar"); String user = "joe"; SCMUploaderProtocol scmClient = mock(SCMUploaderProtocol.class); FileSystem fs = mock(FileSystem.class); FileSystem localFs = mock(FileSystem.class); // stub it to return a status that indicates a directory FileStatus status = mock(FileStatus.class); when(status.isDirectory()).thenReturn(true); when(localFs.getFileStatus(localPath)).thenReturn(status); SharedCacheUploader spied = createSpiedUploader(resource, localPath, user, conf, scmClient, fs, localFs); Path actualPath = spied.getActualPath(); assertEquals(actualPath.getName(), localPath.getName()); assertEquals(actualPath.getParent().getName(), localPath.getName()); }
public Map<Path, List<String>> doLocalizeResources( boolean checkLocalizingState, int skipRsrcCount) throws URISyntaxException { Path cache = new Path("file:///cache"); Map<Path, List<String>> localPaths = new HashMap<Path, List<String>>(); int counter = 0; for (Entry<String, LocalResource> rsrc : localResources.entrySet()) { if (counter++ < skipRsrcCount) { continue; } if (checkLocalizingState) { assertEquals(ContainerState.LOCALIZING, c.getContainerState()); } LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue()); Path p = new Path(cache, rsrc.getKey()); localPaths.put(p, Arrays.asList(rsrc.getKey())); // rsrc copied to p c.handle(new ContainerResourceLocalizedEvent(c.getContainerId(), req, p)); } drainDispatcherEvents(); return localPaths; }
/**
 * Builds a heartbeat response with action LIVE carrying a single resource
 * spec whose destination is {@code /tmp<currentTimeMillis>}.
 *
 * @return the populated response
 * @throws URISyntaxException if the resource cannot be created
 */
static LocalizerHeartbeatResponse createLocalizerHeartbeatResponse()
    throws URISyntaxException {
  LocalizerHeartbeatResponse response =
      recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
  assertTrue(response instanceof LocalizerHeartbeatResponsePBImpl);
  response.setLocalizerAction(LocalizerAction.LIVE);

  LocalResource rsrc = createResource();
  ArrayList<ResourceLocalizationSpec> specs =
      new ArrayList<ResourceLocalizationSpec>();

  ResourceLocalizationSpec spec =
      recordFactory.newRecordInstance(ResourceLocalizationSpec.class);
  spec.setResource(rsrc);
  Path destination = new Path("/tmp" + System.currentTimeMillis());
  spec.setDestinationDirectory(ConverterUtils.getYarnUrlFromPath(destination));
  specs.add(spec);

  response.setResourceSpecs(specs);
  System.out.println(spec);
  return response;
}
public static void setupDistributedCache( Configuration conf, Map<String, LocalResource> localResources) throws IOException { // Cache archives parseDistributedCacheArtifacts(conf, localResources, LocalResourceType.ARCHIVE, DistributedCache.getCacheArchives(conf), DistributedCache.getArchiveTimestamps(conf), getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), DistributedCache.getArchiveVisibilities(conf)); // Cache files parseDistributedCacheArtifacts(conf, localResources, LocalResourceType.FILE, DistributedCache.getCacheFiles(conf), DistributedCache.getFileTimestamps(conf), getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES), DistributedCache.getFileVisibilities(conf)); }
@SuppressWarnings("deprecation") public void testSetupDistributedCacheConflictsFiles() throws Exception { Configuration conf = new Configuration(); conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); URI mockUri = URI.create("mockfs://mock/"); FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf)) .getRawFileSystem(); URI file = new URI("mockfs://mock/tmp/something.zip#something"); Path filePath = new Path(file); URI file2 = new URI("mockfs://mock/tmp/something.txt#something"); Path file2Path = new Path(file2); when(mockFs.resolvePath(filePath)).thenReturn(filePath); when(mockFs.resolvePath(file2Path)).thenReturn(file2Path); DistributedCache.addCacheFile(file, conf); DistributedCache.addCacheFile(file2, conf); conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11"); conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11"); conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true"); Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); MRApps.setupDistributedCache(conf, localResources); assertEquals(1, localResources.size()); LocalResource lr = localResources.get("something"); //First one wins assertNotNull(lr); assertEquals(10l, lr.getSize()); assertEquals(10l, lr.getTimestamp()); assertEquals(LocalResourceType.FILE, lr.getType()); }
private Path getPathForLocalization(LocalResource rsrc) throws IOException, URISyntaxException { String user = context.getUser(); ApplicationId appId = context.getContainerId().getApplicationAttemptId().getApplicationId(); LocalResourceVisibility vis = rsrc.getVisibility(); LocalResourcesTracker tracker = getLocalResourcesTracker(vis, user, appId); String cacheDirectory = null; if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only cacheDirectory = getUserFileCachePath(user); } else {// APPLICATION ONLY cacheDirectory = getAppFileCachePath(user, appId.toString()); } Path dirPath = dirsHandler.getLocalPathForWrite(cacheDirectory, ContainerLocalizer.getEstimatedSize(rsrc), false); return tracker.getPathForLocalization(new LocalResourceRequest(rsrc), dirPath); }
private Map<String, LocalResource> setupEsYarnJar() { Map<String, LocalResource> resources = new LinkedHashMap<String, LocalResource>(); LocalResource esYarnJar = Records.newRecord(LocalResource.class); Path p = new Path(clientCfg.jarHdfsPath()); FileStatus fsStat; try { fsStat = FileSystem.get(client.getConfiguration()).getFileStatus(p); } catch (IOException ex) { throw new IllegalArgumentException( String.format("Cannot find jar [%s]; make sure the artifacts have been properly provisioned and the correct permissions are in place.", clientCfg.jarHdfsPath()), ex); } // use the normalized path as otherwise YARN chokes down the line esYarnJar.setResource(ConverterUtils.getYarnUrlFromPath(fsStat.getPath())); esYarnJar.setSize(fsStat.getLen()); esYarnJar.setTimestamp(fsStat.getModificationTime()); esYarnJar.setType(LocalResourceType.FILE); esYarnJar.setVisibility(LocalResourceVisibility.PUBLIC); resources.put(clientCfg.jarName(), esYarnJar); return resources; }
/**
 * Uploads and registers a single resource and adds it to
 * {@code localResources}.
 *
 * @param key
 * 		the key to add the resource under
 * @param fs
 * 		the remote file system to upload to
 * @param appId
 * 		application ID
 * @param localSrcPath
 * 		local path to the file
 * @param localResources
 * 		map of resources
 * @param targetHomeDir
 * 		forwarded unchanged to {@code Utils.setupLocalResource}
 * @param relativeTargetPath
 * 		forwarded unchanged to {@code Utils.setupLocalResource}
 *
 * @return the remote path to the uploaded resource
 */
private static Path setupSingleLocalResource(
		String key,
		FileSystem fs,
		ApplicationId appId,
		Path localSrcPath,
		Map<String, LocalResource> localResources,
		Path targetHomeDir,
		String relativeTargetPath) throws IOException, URISyntaxException {

	final String applicationId = appId.toString();
	final Tuple2<Path, LocalResource> uploaded = Utils.setupLocalResource(
		fs,
		applicationId,
		localSrcPath,
		targetHomeDir,
		relativeTargetPath);

	// f0 is the remote path, f1 the resource registered under the key.
	localResources.put(key, uploaded.f1);
	return uploaded.f0;
}
public ContainerLaunchContext buildContainerContext(Map<String, LocalResource> localResources, YacopConfig yacopConfig) { ContainerLaunchContext ctx = null; try { List<String> commands = new ArrayList<>(); //cmd Vector<CharSequence> vargs = new Vector<>(5); vargs.add("(" + yacopConfig.getCmd() + ")"); vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"); StringBuilder command = new StringBuilder(); for (CharSequence str : vargs) { command.append(str).append(" "); } commands.add(command.toString()); //tokens Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); DataOutputBuffer dob = new DataOutputBuffer(); credentials.writeTokenStorageToStream(dob); ByteBuffer allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); //ctx ctx = ContainerLaunchContext.newInstance(localResources, null, commands, null, allTokens.duplicate(), null); } catch (IOException e) { e.printStackTrace(); } return ctx; }
private static void parseDistributedCacheArtifacts(Configuration conf, Map<String, LocalResource> localResources, LocalResourceType type, URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[]) throws IOException { if (uris != null) { // Sanity check if ((uris.length != timestamps.length) || (uris.length != sizes.length) || (uris.length != visibilities.length)) { throw new IllegalArgumentException("Invalid specification for " + "distributed-cache artifacts of type " + type + " :" + " #uris=" + uris.length + " #timestamps=" + timestamps.length + " #visibilities=" + visibilities.length); } for (int i = 0; i < uris.length; ++i) { URI u = uris[i]; Path p = new Path(u); FileSystem remoteFS = p.getFileSystem(conf); p = remoteFS .resolvePath(p.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory())); // Add URI fragment or just the filename Path name = new Path((null == u.getFragment()) ? p.getName() : u.getFragment()); if (name.isAbsolute()) { throw new IllegalArgumentException("Resource name must be relative"); } String linkName = name.toUri().getPath(); LocalResource orig = localResources.get(linkName); org.apache.hadoop.yarn.api.records.URL url = ConverterUtils.getYarnUrlFromURI(p.toUri()); if (orig != null && !orig.getResource().equals(url)) { LOG.warn(getResourceDescription(orig.getType()) + toString(orig.getResource()) + " conflicts with " + getResourceDescription(type) + toString(url) + " This will be an error in Hadoop 2.0"); continue; } localResources.put(linkName, LocalResource.newInstance(ConverterUtils.getYarnUrlFromURI(p .toUri()), type, visibilities[i] ? LocalResourceVisibility.PUBLIC : LocalResourceVisibility.PRIVATE, sizes[i], timestamps[i])); } } }
/**
 * Describes the file at {@code path} as a FILE {@link LocalResource}.
 *
 * @param path location of the file; its file system is resolved through
 *          the cluster configuration
 * @param visibility visibility recorded on the resource
 * @return a resource carrying the file's length and modification time
 * @throws IOException if the file system or file status cannot be
 *           obtained
 */
private LocalResource toLocalResource(Path path,
    LocalResourceVisibility visibility) throws IOException {
  FileStatus status =
      path.getFileSystem(clusterConf.conf()).getFileStatus(path);
  return LocalResource.newInstance(
      ConverterUtils.getYarnUrlFromPath(path),
      LocalResourceType.FILE,
      visibility,
      status.getLen(),
      status.getModificationTime());
}
/**
 * Describes the file at {@code p} as an APPLICATION-visible resource of
 * the given type.
 *
 * @param fs file context used to read the file status and to resolve the
 *          path on its default file system
 * @param p location of the file
 * @param type resource type recorded on the resource
 * @return a resource carrying the resolved URL, length and modification
 *         time of the file
 * @throws IOException if the status lookup or path resolution fails
 */
private LocalResource createApplicationResource(FileContext fs, Path p,
    LocalResourceType type) throws IOException {
  LocalResource resource =
      recordFactory.newRecordInstance(LocalResource.class);
  FileStatus status = fs.getFileStatus(p);

  Path resolved = fs.getDefaultFileSystem().resolvePath(status.getPath());
  resource.setResource(ConverterUtils.getYarnUrlFromPath(resolved));
  resource.setSize(status.getLen());
  resource.setTimestamp(status.getModificationTime());
  resource.setType(type);
  resource.setVisibility(LocalResourceVisibility.APPLICATION);
  return resource;
}