@Test public void testGenerateHadoopConfig() throws Exception { Configuration cfg = HistoryLogUtils.generateHadoopConfig(DUMMY_CLIENT_ID, DUMMY_USER_NAME, DUMMY_BUCKET); // We are asserting that the properties involving substitution have been changed checkPropertySubstitution(this.configWithoutSubstitute, cfg, YarnConfiguration.NM_REMOTE_APP_LOG_DIR, "gs://" + DUMMY_BUCKET + "/logs/such-client"); checkPropertySubstitution(this.configWithoutSubstitute, cfg, JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, "gs://" + DUMMY_BUCKET + "/history/such-client/done-intermediate"); checkPropertySubstitution(this.configWithoutSubstitute, cfg, JHAdminConfig.MR_HISTORY_DONE_DIR, "gs://" + DUMMY_BUCKET + "/history/such-client/done"); // Some additional guards to check whether we accidentally load additional config assertEquals("Sizes of configuration must not differ. Except for the user, client-id and bucket properties", cfg.size(), this.configWithoutSubstitute.size() + 3); }
@Test public void testGetHistoryIntermediateDoneDirForUser() throws IOException { // Test relative path Configuration conf = new Configuration(); conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, "/mapred/history/done_intermediate"); conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name")); String pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf); Assert.assertEquals("/mapred/history/done_intermediate/" + System.getProperty("user.name"), pathStr); // Test fully qualified path // Create default configuration pointing to the minicluster conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, dfsCluster.getURI().toString()); FileOutputStream os = new FileOutputStream(coreSitePath); conf.writeXml(os); os.close(); // Simulate execution under a non-default namenode conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///"); pathStr = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf); Assert.assertEquals(dfsCluster.getURI().toString() + "/mapred/history/done_intermediate/" + System.getProperty("user.name"), pathStr); }
/**
 * Creates a client proxy for the history server whose address is read from
 * {@code JHAdminConfig.MR_HISTORY_ADDRESS}.
 *
 * @return the proxy, or {@code null} when no address is configured
 * @throws IOException propagated from looking up the current user
 */
protected MRClientProtocol instantiateHistoryProxy() throws IOException {
  final String historyAddress = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
  if (StringUtils.isEmpty(historyAddress)) {
    // Nothing configured: callers get no proxy rather than an exception.
    return null;
  }

  LOG.debug("Connecting to HistoryServer at: " + historyAddress);
  final YarnRPC yarnRpc = YarnRPC.create(conf);
  // Logged once the RPC factory exists; the proxy itself is built below.
  LOG.debug("Connected to HistoryServer at: " + historyAddress);

  UserGroupInformation caller = UserGroupInformation.getCurrentUser();
  PrivilegedAction<MRClientProtocol> openProxy =
      new PrivilegedAction<MRClientProtocol>() {
        @Override
        public MRClientProtocol run() {
          return (MRClientProtocol) yarnRpc.getProxy(
              HSClientProtocol.class,
              NetUtils.createSocketAddr(historyAddress),
              conf);
        }
      };
  // Build the proxy under the identity of the current user.
  return caller.doAs(openProxy);
}
/**
 * One-time fixture setup: clears {@code TEST_ROOT_DIR}, starts a
 * {@code MiniMRCluster} on {@code file:///}, writes a small three-line input
 * file and creates an empty input directory.
 *
 * @throws IOException if any file-system operation fails
 */
@BeforeClass
public static void setUp() throws IOException {
  JobConf conf = new JobConf();
  fileSys = FileSystem.get(conf);
  fileSys.delete(new Path(TEST_ROOT_DIR), true);

  conf.set("mapred.job.tracker.handler.count", "1");
  conf.set("mapred.job.tracker", "127.0.0.1:0");
  conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
  conf.set("mapred.task.tracker.http.address", "127.0.0.1:0");
  conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR,
      TEST_ROOT_DIR + "/intermediate");
  conf.set(org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
      .SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "true");

  mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);

  inDir = new Path(TEST_ROOT_DIR, "test-input");
  String input = "The quick brown fox\n" + "has many silly\n" + "red fox sox\n";
  DataOutputStream file = fileSys.create(new Path(inDir, "part-" + 0));
  try {
    file.writeBytes(input);
  } finally {
    // Close the stream even if the write fails so the handle is not leaked.
    file.close();
  }

  emptyInDir = new Path(TEST_ROOT_DIR, "empty-input");
  fileSys.mkdirs(emptyInDir);
}
/**
 * Supplies Kerberos information for {@code HSClientProtocolPB}.
 *
 * @param protocol the protocol class being queried
 * @param conf the configuration (not consulted here)
 * @return a {@code KerberosInfo} whose server principal is
 *         {@code JHAdminConfig.MR_HISTORY_PRINCIPAL}, or {@code null} for
 *         any other protocol
 */
@Override
public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
  if (protocol.equals(HSClientProtocolPB.class)) {
    return new KerberosInfo() {
      @Override
      public String serverPrincipal() {
        return JHAdminConfig.MR_HISTORY_PRINCIPAL;
      }

      @Override
      public String clientPrincipal() {
        return null;
      }

      @Override
      public Class<? extends Annotation> annotationType() {
        return null;
      }
    };
  }
  // Only the history-server client protocol is handled here.
  return null;
}
@VisibleForTesting protected void initializeWebApp(Configuration conf) { webApp = new HsWebApp(history); InetSocketAddress bindAddress = MRWebAppUtil.getJHSWebBindAddress(conf); // NOTE: there should be a .at(InetSocketAddress) WebApps .$for("jobhistory", HistoryClientService.class, this, "ws") .with(conf) .withHttpSpnegoKeytabKey( JHAdminConfig.MR_WEBAPP_SPNEGO_KEYTAB_FILE_KEY) .withHttpSpnegoPrincipalKey( JHAdminConfig.MR_WEBAPP_SPNEGO_USER_NAME_KEY) .at(NetUtils.getHostPortString(bindAddress)).start(webApp); String connectHost = MRWebAppUtil.getJHSWebappURLWithoutScheme(conf).split(":")[0]; MRWebAppUtil.setJHSWebappURLWithoutScheme(conf, connectHost + ":" + webApp.getListenerAddress().getPort()); }
/**
 * Instantiates the state store selected by the configuration. When recovery
 * is disabled the null state store is used; otherwise the class named by
 * {@code JHAdminConfig.MR_HS_STATE_STORE} is instantiated.
 *
 * @param conf the configuration
 * @return the state storage instance
 * @throws RuntimeException if recovery is enabled but no store class is set
 */
public static HistoryServerStateStoreService getStore(Configuration conf) {
  boolean recoveryEnabled = conf.getBoolean(
      JHAdminConfig.MR_HS_RECOVERY_ENABLE,
      JHAdminConfig.DEFAULT_MR_HS_RECOVERY_ENABLE);

  if (!recoveryEnabled) {
    Class<? extends HistoryServerStateStoreService> nullStore =
        HistoryServerNullStateStoreService.class;
    return ReflectionUtils.newInstance(nullStore, conf);
  }

  Class<? extends HistoryServerStateStoreService> configuredStore =
      conf.getClass(JHAdminConfig.MR_HS_STATE_STORE, null,
          HistoryServerStateStoreService.class);
  if (configuredStore == null) {
    throw new RuntimeException("Unable to locate storage class, check "
        + JHAdminConfig.MR_HS_STATE_STORE);
  }
  return ReflectionUtils.newInstance(configuredStore, conf);
}
private int refreshUserToGroupsMappings() throws IOException { // Get the current configuration Configuration conf = getConf(); InetSocketAddress address = conf.getSocketAddr( JHAdminConfig.JHS_ADMIN_ADDRESS, JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS, JHAdminConfig.DEFAULT_JHS_ADMIN_PORT); RefreshUserMappingsProtocol refreshProtocol = HSProxies.createProxy(conf, address, RefreshUserMappingsProtocol.class, UserGroupInformation.getCurrentUser()); // Refresh the user-to-groups mappings refreshProtocol.refreshUserToGroupsMappings(); return 0; }
private int refreshSuperUserGroupsConfiguration() throws IOException { // Refresh the super-user groups Configuration conf = getConf(); InetSocketAddress address = conf.getSocketAddr( JHAdminConfig.JHS_ADMIN_ADDRESS, JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS, JHAdminConfig.DEFAULT_JHS_ADMIN_PORT); RefreshUserMappingsProtocol refreshProtocol = HSProxies.createProxy(conf, address, RefreshUserMappingsProtocol.class, UserGroupInformation.getCurrentUser()); // Refresh the super-user group mappings refreshProtocol.refreshSuperUserGroupsConfiguration(); return 0; }
@SuppressWarnings("deprecation") @Override public void initializeMemberVariables() { xmlFilename = new String("mapred-default.xml"); configurationClasses = new Class[] { MRJobConfig.class, MRConfig.class, JHAdminConfig.class, ShuffleHandler.class, FileOutputFormat.class, FileInputFormat.class, Job.class, NLineInputFormat.class, JobConf.class, FileOutputCommitter.class }; // Initialize used variables configurationPropsToSkipCompare = new HashSet<String>(); // Set error modes errorIfMissingConfigProps = true; errorIfMissingXmlProps = false; // Ignore deprecated MR1 properties in JobConf configurationPropsToSkipCompare .add(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY); configurationPropsToSkipCompare .add(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY); }
@Override protected void serviceStart() throws Exception { super.serviceStart(); //need to do this because historyServer.init creates a new Configuration getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS, historyServer.getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS)); MRWebAppUtil.setJHSWebappURLWithoutScheme(getConfig(), MRWebAppUtil.getJHSWebappURLWithoutScheme(historyServer.getConfig())); LOG.info("MiniMRYARN ResourceManager address: " + getConfig().get(YarnConfiguration.RM_ADDRESS)); LOG.info("MiniMRYARN ResourceManager web address: " + WebAppUtils.getRMWebAppURLWithoutScheme(getConfig())); LOG.info("MiniMRYARN HistoryServer address: " + getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS)); LOG.info("MiniMRYARN HistoryServer web address: " + getResolvedMRHistoryWebAppURLWithoutScheme(getConfig(), MRWebAppUtil.getJHSHttpPolicy() == HttpConfig.Policy.HTTPS_ONLY)); }
/**
 * Moves a history file to done when only a summary path is supplied (the
 * test name indicates that file does not exist) and asserts that the move
 * is not reported as failed.
 */
@Test
public void testHistoryFileInfoSummaryFileNotExist() throws Exception {
  HistoryFileManagerTest manager = new HistoryFileManagerTest();

  final String jobName = "job_1410889000000_123456";
  Path summaryPath = new Path(jobName + ".summary");
  JobIndexInfo indexInfo = new JobIndexInfo();
  indexInfo.setJobId(TypeConverter.toYarn(JobID.forName(jobName)));

  // Point both history directories at fresh, randomly named locations.
  Configuration conf = dfsCluster.getConfiguration(0);
  conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, "/" + UUID.randomUUID());
  conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR,
      "/" + UUID.randomUUID());
  manager.serviceInit(conf);

  HistoryFileInfo fileInfo =
      manager.getHistoryFileInfo(null, null, summaryPath, indexInfo, false);
  fileInfo.moveToDone();
  Assert.assertFalse(fileInfo.didMoveFail());
}
/**
 * Initialises and starts a {@code JobHistoryServer} with the given
 * configuration. Any failure is logged and terminates the JVM with exit
 * status 1.
 *
 * @param cfg the configuration handed to the server
 */
public static void startJHS(Configuration cfg) {
  try {
    JobHistoryServer server = new JobHistoryServer();
    server.init(cfg);

    String webAddress = cfg.get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS);
    logger.info(
        String.format("Starting JobHistoryServer on: http://%s", webAddress));

    server.start();
  } catch (Exception e) {
    logger.error("Error starting JobHistoryServer", e);
    System.exit(1);
  }
}
/**
 * Creates a history-server client proxy for the address configured in the
 * mini cluster's {@code JHAdminConfig.MR_HISTORY_ADDRESS}.
 *
 * @return the history server client proxy
 */
private HSClientProtocol instantiateHistoryProxy() {
  final String historyAddress =
      mrCluster.getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS);
  // The RPC factory is created from this class's conf, while the proxy
  // itself is created with the cluster's configuration.
  final YarnRPC yarnRpc = YarnRPC.create(conf);
  return (HSClientProtocol) yarnRpc.getProxy(
      HSClientProtocol.class,
      NetUtils.createSocketAddr(historyAddress),
      mrCluster.getConfig());
}
public static String getResolvedMRHistoryWebAppURLWithoutScheme( Configuration conf, boolean isSSLEnabled) { InetSocketAddress address = null; if (isSSLEnabled) { address = conf.getSocketAddr(JHAdminConfig.MR_HISTORY_WEBAPP_HTTPS_ADDRESS, JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_HTTPS_ADDRESS, JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_HTTPS_PORT); } else { address = conf.getSocketAddr(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS, JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS, JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_PORT); } address = NetUtils.getConnectAddress(address); StringBuffer sb = new StringBuffer(); InetAddress resolved = address.getAddress(); if (resolved == null || resolved.isAnyLocalAddress() || resolved.isLoopbackAddress()) { String lh = address.getHostName(); try { lh = InetAddress.getLocalHost().getCanonicalHostName(); } catch (UnknownHostException e) { //Ignore and fallback. } sb.append(lh); } else { sb.append(address.getHostName()); } sb.append(":").append(address.getPort()); return sb.toString(); }
/**
 * Reads the YARN and job-history-server HTTP policies from the
 * configuration (using their defaults when unset) and passes each to its
 * setter.
 *
 * @param conf the configuration to read the policies from
 */
public static void initialize(Configuration conf) {
  String yarnPolicy = conf.get(
      YarnConfiguration.YARN_HTTP_POLICY_KEY,
      YarnConfiguration.YARN_HTTP_POLICY_DEFAULT);
  setHttpPolicyInYARN(yarnPolicy);

  String jhsPolicy = conf.get(
      JHAdminConfig.MR_HS_HTTP_POLICY,
      JHAdminConfig.DEFAULT_MR_HS_HTTP_POLICY);
  setHttpPolicyInJHS(jhsPolicy);
}
/**
 * Stores the history server web app address under the HTTPS key when the
 * JHS policy is {@code HTTPS_ONLY}, and under the HTTP key otherwise.
 *
 * @param conf the configuration to update
 * @param hostAddress the address value to store
 */
public static void setJHSWebappURLWithoutScheme(Configuration conf,
    String hostAddress) {
  String addressKey = (httpPolicyInJHS == Policy.HTTPS_ONLY)
      ? JHAdminConfig.MR_HISTORY_WEBAPP_HTTPS_ADDRESS
      : JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS;
  conf.set(addressKey, hostAddress);
}
/**
 * Reads the history server web app address: the HTTPS setting when the JHS
 * policy is {@code HTTPS_ONLY}, the HTTP setting otherwise. The matching
 * default is returned when the property is unset.
 *
 * @param conf the configuration to read from
 * @return the configured (or default) address without a scheme
 */
public static String getJHSWebappURLWithoutScheme(Configuration conf) {
  if (httpPolicyInJHS != Policy.HTTPS_ONLY) {
    return conf.get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
        JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS);
  }
  return conf.get(JHAdminConfig.MR_HISTORY_WEBAPP_HTTPS_ADDRESS,
      JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_HTTPS_ADDRESS);
}
/**
 * Resolves the socket address for the history server web app from
 * {@code JHAdminConfig.MR_HISTORY_BIND_HOST} plus the HTTPS or HTTP address
 * settings, chosen by whether the JHS policy is {@code HTTPS_ONLY}.
 *
 * @param conf the configuration to read from
 * @return the socket address returned by {@code conf.getSocketAddr}
 */
public static InetSocketAddress getJHSWebBindAddress(Configuration conf) {
  final boolean httpsOnly = httpPolicyInJHS == Policy.HTTPS_ONLY;

  final String addressKey = httpsOnly
      ? JHAdminConfig.MR_HISTORY_WEBAPP_HTTPS_ADDRESS
      : JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS;
  final String defaultAddress = httpsOnly
      ? JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_HTTPS_ADDRESS
      : JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS;
  final int defaultPort = httpsOnly
      ? JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_HTTPS_PORT
      : JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_PORT;

  return conf.getSocketAddr(JHAdminConfig.MR_HISTORY_BIND_HOST,
      addressKey, defaultAddress, defaultPort);
}
public static String getApplicationWebURLOnJHSWithoutScheme(Configuration conf, ApplicationId appId) throws UnknownHostException { //construct the history url for job String addr = getJHSWebappURLWithoutScheme(conf); Iterator<String> it = ADDR_SPLITTER.split(addr).iterator(); it.next(); // ignore the bind host String port = it.next(); // Use hs address to figure out the host for webapp addr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS, JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS); String host = ADDR_SPLITTER.split(addr).iterator().next(); String hsAddress = JOINER.join(host, ":", port); InetSocketAddress address = NetUtils.createSocketAddr( hsAddress, getDefaultJHSWebappPort(), getDefaultJHSWebappURLWithoutScheme()); StringBuffer sb = new StringBuffer(); if (address.getAddress().isAnyLocalAddress() || address.getAddress().isLoopbackAddress()) { sb.append(InetAddress.getLocalHost().getCanonicalHostName()); } else { sb.append(address.getHostName()); } sb.append(":").append(address.getPort()); sb.append("/jobhistory/job/"); JobID jobId = TypeConverter.fromYarn(appId); sb.append(jobId.toString()); return sb.toString(); }
/**
 * Checks that {@code MapReduceTrackingUriPlugin} maps an application id to
 * {@code http://<history web address>/jobhistory/job/<job id>}, where the
 * job id is the application id with its prefix changed to {@code job_}.
 */
@Test
public void testProducesHistoryServerUriForAppId() throws URISyntaxException {
  final String historyAddress = "example.net:424242";
  YarnConfiguration conf = new YarnConfiguration();
  conf.set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS, historyAddress);
  MapReduceTrackingUriPlugin plugin = new MapReduceTrackingUriPlugin();
  plugin.setConf(conf);

  // Upper-case L suffix: a lower-case 'l' is easily misread as the digit 1.
  ApplicationId id = ApplicationId.newInstance(6384623L, 5);
  String jobSuffix = id.toString().replaceFirst("^application_", "job_");
  URI expected =
      new URI("http://" + historyAddress + "/jobhistory/job/" + jobSuffix);

  URI actual = plugin.getTrackingUri(id);
  assertEquals(expected, actual);
}
/**
 * Creates the synchronized, access-ordered cache of loaded jobs. Its
 * capacity is read from
 * {@code JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE}; once the map holds
 * more entries than that, the eldest entry is evicted.
 *
 * @param conf the configuration to read the cache size from
 */
@SuppressWarnings("serial")
private void createLoadedJobCache(Configuration conf) {
  loadedJobCacheSize = conf.getInt(
      JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE,
      JHAdminConfig.DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE);

  // accessOrder = true makes iteration order follow most recent access.
  Map<JobId, Job> boundedCache = new LinkedHashMap<JobId, Job>(
      loadedJobCacheSize + 1, 0.75f, true) {
    @Override
    public boolean removeEldestEntry(final Map.Entry<JobId, Job> eldest) {
      return size() > loadedJobCacheSize;
    }
  };
  loadedJobCache = Collections.synchronizedMap(boundedCache);
}
/**
 * Creates the state store directory on the local file system with
 * permission 0700, under the path configured in
 * {@code JHAdminConfig.MR_HS_LEVELDB_STATE_STORE_PATH}.
 *
 * @param conf the configuration to read the store location from
 * @return the path of the store directory
 * @throws IOException if no store location is configured, or propagated
 *         from the file-system calls
 */
private Path createStorageDir(Configuration conf) throws IOException {
  final String storeLocation =
      conf.get(JHAdminConfig.MR_HS_LEVELDB_STATE_STORE_PATH);
  if (storeLocation == null) {
    throw new IOException("No store location directory configured in "
        + JHAdminConfig.MR_HS_LEVELDB_STATE_STORE_PATH);
  }

  final Path storeDir = new Path(storeLocation, DB_NAME);
  final FsPermission ownerOnly = new FsPermission((short)0700);
  FileSystem.getLocal(conf).mkdirs(storeDir, ownerOnly);
  return storeDir;
}
protected void serviceStart() throws Exception { Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); initializeWebApp(conf); InetSocketAddress address = conf.getSocketAddr( JHAdminConfig.MR_HISTORY_BIND_HOST, JHAdminConfig.MR_HISTORY_ADDRESS, JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS, JHAdminConfig.DEFAULT_MR_HISTORY_PORT); server = rpc.getServer(HSClientProtocol.class, protocolHandler, address, conf, jhsDTSecretManager, conf.getInt(JHAdminConfig.MR_HISTORY_CLIENT_THREAD_COUNT, JHAdminConfig.DEFAULT_MR_HISTORY_CLIENT_THREAD_COUNT)); // Enable service authorization? if (conf.getBoolean( CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { server.refreshServiceAcl(conf, new ClientHSPolicyProvider()); } server.start(); this.bindAddress = conf.updateConnectAddr(JHAdminConfig.MR_HISTORY_BIND_HOST, JHAdminConfig.MR_HISTORY_ADDRESS, JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS, server.getListenerAddress()); LOG.info("Instantiated HistoryClientService at " + this.bindAddress); super.serviceStart(); }
/**
 * Initialises the job history service: stores the configuration, creates
 * placeholder application ids, reads the move interval, initialises the
 * history file manager and its existing directories, sets up the history
 * storage, and finally delegates to {@code super.serviceInit(conf)}.
 *
 * @param conf the configuration for this service
 * @throws Exception propagated from the initialisation steps; an
 *         {@code IOException} from {@code initExisting()} is wrapped in a
 *         {@code YarnRuntimeException}
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  LOG.info("JobHistory Init");
  this.conf = conf;
  this.appID = ApplicationId.newInstance(0, 0);
  this.appAttemptID = RecordFactoryProvider
      .getRecordFactory(conf)
      .newRecordInstance(ApplicationAttemptId.class);

  moveThreadInterval = conf.getLong(
      JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
      JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS);

  // The file manager must be initialised before the storage refers to it.
  hsManager = createHistoryFileManager();
  hsManager.init(conf);
  try {
    hsManager.initExisting();
  } catch (IOException ioe) {
    throw new YarnRuntimeException(
        "Failed to intialize existing directories", ioe);
  }

  storage = createHistoryStorage();
  if (storage instanceof Service) {
    Service storageService = (Service) storage;
    storageService.init(conf);
  }
  storage.setHistoryFileManager(hsManager);

  super.serviceInit(conf);
}