/** * Creates job-acls.xml under the given directory logDir and writes * job-view-acl, queue-admins-acl, jobOwner name and queue name into this * file. * queue name is the queue to which the job was submitted to. * queue-admins-acl is the queue admins ACL of the queue to which this * job was submitted to. * @param conf job configuration * @param logDir job userlog dir * @throws IOException */ private void writeJobACLs(JobConf conf, Path logDir) throws IOException { JobConf aclConf = new JobConf(false); // set the job view acl in aclConf String jobViewACL = conf.get(JobContext.JOB_ACL_VIEW_JOB, " "); aclConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACL); // set the job queue name in aclConf String queue = conf.getQueueName(); aclConf.setQueueName(queue); // set the queue admins acl in aclConf String qACLName = QueueManager.toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()); String queueAdminsACL = conf.get(qACLName, " "); aclConf.set(qACLName, queueAdminsACL); // set jobOwner as user.name in aclConf aclConf.set("user.name", user); OutputStream out = null; Path aclFile = new Path(logDir, TaskTracker.jobACLsFile); try { out = lfs.create(aclFile); aclConf.writeXml(out); } finally { IOUtils.cleanup(LOG, out); } lfs.setPermission(aclFile, urw_gr); }
/** * Validates if the given user has job view permissions for this job. * conf contains jobOwner and job-view-ACLs. * We allow jobOwner, superUser(i.e. mrOwner) and cluster administrators and * users and groups specified in configuration using * mapreduce.job.acl-view-job to view job. */ private void checkAccessForTaskLogs(JobConf conf, String user, String jobId, TaskTracker tracker) throws AccessControlException { if (!tracker.areACLsEnabled()) { return; } // build job view ACL by reading from conf AccessControlList jobViewACL = tracker.getJobACLsManager(). constructJobACLs(conf).get(JobACL.VIEW_JOB); // read job queue name from conf String queue = conf.getQueueName(); // build queue admins ACL by reading from conf AccessControlList queueAdminsACL = new AccessControlList( conf.get(QueueManager.toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()), " ")); String jobOwner = conf.get("user.name"); UserGroupInformation callerUGI = UserGroupInformation.createRemoteUser(user); // check if user is queue admin or cluster admin or jobOwner or member of // job-view-acl if (!queueAdminsACL.isUserAllowed(callerUGI)) { tracker.getACLsManager().checkAccess(jobId, callerUGI, queue, Operation.VIEW_TASK_LOGS, jobOwner, jobViewACL); } }
/** * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getQueueAdmins(String) */ public AccessControlList getQueueAdmins(String queueName) throws IOException { AccessControlList acl = queueManager.getQueueACL(queueName, QueueACL.ADMINISTER_JOBS); if (acl == null) { acl = new AccessControlList(" "); } return acl; }
protected void validateJobACLsFileContent() { JobConf jobACLsConf = TaskLogServlet.getConfFromJobACLsFile(jobId); assertTrue(jobACLsConf.get("user.name").equals( localizedJobConf.getUser())); assertTrue(jobACLsConf.get(JobContext.JOB_ACL_VIEW_JOB). equals(localizedJobConf.get(JobContext.JOB_ACL_VIEW_JOB))); String queue = localizedJobConf.getQueueName(); assertTrue(queue.equalsIgnoreCase(jobACLsConf.getQueueName())); String qACLName = QueueManager.toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()); assertTrue(jobACLsConf.get(qACLName).equals( localizedJobConf.get(qACLName))); }
private void startCluster(boolean reStart) throws IOException { UserGroupInformation MR_UGI = UserGroupInformation.getLoginUser(); JobConf conf = new JobConf(); // Enable queue and job level authorization conf.setBoolean(JobConf.MR_ACLS_ENABLED, true); // qAdmin is a queue administrator for default queue conf.set(QueueManager.toFullPropertyName( "default", QueueACL.ADMINISTER_JOBS.getAclName()), qAdmin); conf.set(QueueManager.toFullPropertyName( "default", QueueACL.SUBMIT_JOB.getAclName()), jobSubmitter); // Enable CompletedJobStore FileSystem fs = FileSystem.getLocal(conf); if (!reStart) { fs.delete(TEST_DIR, true); } conf.set("mapred.job.tracker.persist.jobstatus.dir", fs.makeQualified(TEST_DIR).toString()); conf.setBoolean("mapred.job.tracker.persist.jobstatus.active", true); conf.set("mapred.job.tracker.persist.jobstatus.hours", "1"); // Let us have enough slots sothat there won't be contention for slots // for launching JOB_CLEANUP tasks conf.set("mapred.tasktracker.map.tasks.maximum", "4"); mr = new MiniMRCluster(0, 0, 2, "file:///", 1, null, null, MR_UGI, conf); }
@Before protected void setUp() throws Exception { if (!canRun()) { return; } TEST_ROOT_DIR = new File(System.getProperty("test.build.data", "/tmp"), getClass() .getSimpleName()); if (!TEST_ROOT_DIR.exists()) { TEST_ROOT_DIR.mkdirs(); } ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local"); ROOT_MAPRED_LOCAL_DIR.mkdirs(); HADOOP_LOG_DIR = new File(TEST_ROOT_DIR, "logs"); HADOOP_LOG_DIR.mkdir(); System.setProperty("hadoop.log.dir", HADOOP_LOG_DIR.getAbsolutePath()); trackerFConf = new JobConf(); trackerFConf.set("fs.default.name", "file:///"); localDirs = new String[numLocalDirs]; for (int i = 0; i < numLocalDirs; i++) { localDirs[i] = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i).getPath(); } trackerFConf.setStrings("mapred.local.dir", localDirs); trackerFConf.setBoolean(JobConf.MR_ACLS_ENABLED, true); // Create the job configuration file. Same as trackerConf in this test. jobConf = new JobConf(trackerFConf); // Set job view ACLs in conf sothat validation of contents of jobACLsFile // can be done against this value. Have both users and groups String jobViewACLs = "user1,user2, group1,group2"; jobConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs); jobConf.setInt("mapred.userlog.retain.hours", 0); jobConf.setUser(getJobOwner().getShortUserName()); // set job queue name in job conf String queue = "default"; jobConf.setQueueName(queue); // Set queue admins acl in job conf similar to what JobClient does jobConf.set(QueueManager.toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()), "qAdmin1,qAdmin2 qAdminsGroup1,qAdminsGroup2"); String jtIdentifier = "200907202331"; jobId = new JobID(jtIdentifier, 1); // JobClient uploads the job jar to the file system and sets it in the // jobConf. uploadJobJar(jobConf); // JobClient uploads the jobConf to the file system. jobConfFile = uploadJobConf(jobConf); // create jobTokens file uploadJobTokensFile(); taskTrackerUGI = UserGroupInformation.getCurrentUser(); startTracker(); // Set up the task to be localized taskId = new TaskAttemptID(jtIdentifier, jobId.getId(), true, 1, 0); createTask(); // mimic register task // create the tip tip = tracker.new TaskInProgress(task, trackerFConf); }
public void testQueueAclRefreshWithInvalidConfFile() throws IOException { try { String queueConfigPath = System.getProperty("test.build.extraconf", "build/test/extraconf"); File queueConfigFile = new File(queueConfigPath, QueueManager.QUEUE_ACLS_FILE_NAME ); File hadoopConfigFile = new File(queueConfigPath, "hadoop-site.xml"); try { // queue properties with which the cluster is started. Properties hadoopConfProps = new Properties(); hadoopConfProps.put("mapred.queue.names", "default,q1,q2"); hadoopConfProps.put(JobConf.MR_ACLS_ENABLED, "true"); UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile); //properties for mapred-queue-acls.xml Properties queueConfProps = new Properties(); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); queueConfProps.put(QueueManager.toFullPropertyName ("default", submitAcl), ugi.getShortUserName()); queueConfProps.put(QueueManager.toFullPropertyName ("q1", submitAcl), ugi.getShortUserName()); queueConfProps.put(QueueManager.toFullPropertyName ("q2", submitAcl), ugi.getShortUserName()); UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile); Configuration conf = new JobConf(); QueueManager queueManager = new QueueManager(conf); //Testing access to queue. assertTrue("User Job Submission failed.", queueManager.hasAccess("default", QueueACL.SUBMIT_JOB, ugi)); assertTrue("User Job Submission failed.", queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi)); assertTrue("User Job Submission failed.", queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, ugi)); //Write out a new incomplete invalid configuration file. PrintWriter writer = new PrintWriter(new FileOutputStream(queueConfigFile)); writer.println("<configuration>"); writer.println("<property>"); writer.flush(); writer.close(); try { //Exception to be thrown by queue manager because configuration passed //is invalid. queueManager.refreshQueues(conf); fail("Refresh of ACLs should have failed with invalid conf file."); } catch (Exception e) { } assertTrue("User Job Submission failed after invalid conf file refresh.", queueManager.hasAccess("default", QueueACL.SUBMIT_JOB, ugi)); assertTrue("User Job Submission failed after invalid conf file refresh.", queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi)); assertTrue("User Job Submission failed after invalid conf file refresh.", queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, ugi)); } finally { //Cleanup the configuration files in all cases if(hadoopConfigFile.exists()) { hadoopConfigFile.delete(); } if(queueConfigFile.exists()) { queueConfigFile.delete(); } } } finally { tearDownCluster(); } }
@Override protected void setUp() throws Exception { if (!canRun()) { return; } TEST_ROOT_DIR = new File(System.getProperty("test.build.data", "/tmp"), getClass() .getSimpleName()); if (!TEST_ROOT_DIR.exists()) { TEST_ROOT_DIR.mkdirs(); } ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local"); ROOT_MAPRED_LOCAL_DIR.mkdirs(); HADOOP_LOG_DIR = new File(TEST_ROOT_DIR, "logs"); HADOOP_LOG_DIR.mkdir(); System.setProperty("hadoop.log.dir", HADOOP_LOG_DIR.getAbsolutePath()); trackerFConf = new JobConf(); trackerFConf.set("fs.default.name", "file:///"); localDirs = new String[numLocalDirs]; for (int i = 0; i < numLocalDirs; i++) { localDirs[i] = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i).getPath(); } trackerFConf.setStrings("mapred.local.dir", localDirs); trackerFConf.setBoolean(JobConf.MR_ACLS_ENABLED, true); // Create the job configuration file. Same as trackerConf in this test. jobConf = new JobConf(trackerFConf); // Set job view ACLs in conf sothat validation of contents of jobACLsFile // can be done against this value. Have both users and groups String jobViewACLs = "user1,user2, group1,group2"; jobConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs); jobConf.setInt("mapred.userlog.retain.hours", 0); jobConf.setUser(getJobOwner().getShortUserName()); // set job queue name in job conf String queue = "default"; jobConf.setQueueName(queue); // Set queue admins acl in job conf similar to what JobClient does jobConf.set(QueueManager.toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()), "qAdmin1,qAdmin2 qAdminsGroup1,qAdminsGroup2"); String jtIdentifier = "200907202331"; jobId = new JobID(jtIdentifier, 1); // JobClient uploads the job jar to the file system and sets it in the // jobConf. uploadJobJar(jobConf); // JobClient uploads the jobConf to the file system. jobConfFile = uploadJobConf(jobConf); // create jobTokens file uploadJobTokensFile(); taskTrackerUGI = UserGroupInformation.getCurrentUser(); startTracker(); // Set up the task to be localized taskId = new TaskAttemptID(jtIdentifier, jobId.getId(), true, 1, 0); createTask(); // mimic register task // create the tip tip = tracker.new TaskInProgress(task, trackerFConf); }