/**
 * Opens a container log file for reading on behalf of the user that
 * submitted the container's application.
 *
 * The user is looked up from the application that owns the container and is
 * handed to {@code SecureIOUtils.openForRead}. Any {@link IOException} raised
 * while opening the file is rethrown as a new {@link IOException} carrying a
 * more descriptive message and the original exception as its cause.
 *
 * @param containerIdStr string form of the container id whose log is read
 * @param logFile the log file to open
 * @param context context used to look up the application and its user
 * @return an open stream on {@code logFile}
 * @throws IOException if the file cannot be opened
 */
public static FileInputStream openLogFileForRead(String containerIdStr,
    File logFile, Context context) throws IOException {
  ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
  ApplicationId applicationId =
      containerId.getApplicationAttemptId().getApplicationId();
  // NOTE(review): presumably the lookup yields null for an unknown
  // application, which would surface here as a NullPointerException - confirm.
  String user = context.getApplications().get(applicationId).getUser();

  try {
    return SecureIOUtils.openForRead(logFile, user, null);
  } catch (IOException e) {
    // An IOException is allowed to have no message. Guard against null so
    // that the handler itself never fails and hides the real I/O error.
    String message = e.getMessage();
    boolean ownerMismatch = message != null
        && message.contains("did not match expected owner '" + user + "'");
    if (ownerMismatch) {
      LOG.error(
          "Exception reading log file " + logFile.getAbsolutePath(), e);
      throw new IOException("Exception reading log file. Application submitted by '"
          + user
          + "' doesn't own requested log file : "
          + logFile.getName(), e);
    } else {
      throw new IOException("Exception reading log file. It might be because log "
          + "file was aggregated : " + logFile.getName(), e);
    }
  }
}
/**
 * Opens a container log file for reading on behalf of the user that
 * submitted the container's application.
 *
 * The user is looked up from the application that owns the container and is
 * handed to {@code SecureIOUtils.openForRead}. Any {@link IOException} raised
 * while opening the file is rethrown as a new {@link IOException} carrying a
 * more descriptive message and the original exception as its cause.
 *
 * @param containerIdStr string form of the container id whose log is read
 * @param logFile the log file to open
 * @param context context used to look up the application and its user
 * @return an open stream on {@code logFile}
 * @throws IOException if the file cannot be opened
 */
public static FileInputStream openLogFileForRead(String containerIdStr,
    File logFile, Context context) throws IOException {
  ContainerId containerId = ContainerId.fromString(containerIdStr);
  ApplicationId applicationId =
      containerId.getApplicationAttemptId().getApplicationId();
  // NOTE(review): presumably the lookup yields null for an unknown
  // application, which would surface here as a NullPointerException - confirm.
  String user = context.getApplications().get(applicationId).getUser();

  try {
    return SecureIOUtils.openForRead(logFile, user, null);
  } catch (IOException e) {
    // An IOException is allowed to have no message. Guard against null so
    // that the handler itself never fails and hides the real I/O error.
    String message = e.getMessage();
    boolean ownerMismatch = message != null
        && message.contains("did not match expected owner '" + user + "'");
    if (ownerMismatch) {
      LOG.error(
          "Exception reading log file " + logFile.getAbsolutePath(), e);
      throw new IOException("Exception reading log file. Application submitted by '"
          + user
          + "' doesn't own requested log file : "
          + logFile.getName(), e);
    } else {
      throw new IOException("Exception reading log file. It might be because log "
          + "file was aggregated : " + logFile.getName(), e);
    }
  }
}
/**
 * Loads the records of a spill index file into memory.
 *
 * The number of bytes read is the file length rounded down to a whole number
 * of {@code MAP_OUTPUT_INDEX_RECORD_LENGTH}-sized records. When a checksum is
 * supplied, it is reset, fed with the bytes read, and compared against the
 * long value that follows those bytes in the file.
 *
 * @param indexFileName path of the index file
 * @param job configuration used to obtain the raw local file system
 * @param crc checksum used to verify the data, or {@code null} to skip
 *     verification
 * @param expectedIndexOwner owner name passed to {@code SecureIOUtils} when
 *     opening the file
 * @throws IOException if the file cannot be opened or read, or (as a
 *     {@code ChecksumException}) if the checksum does not match
 */
public SpillRecord(Path indexFileName, JobConf job, Checksum crc,
    String expectedIndexOwner) throws IOException {
  final FileSystem rawLocalFs = FileSystem.getLocal(job).getRaw();
  final File indexFile = new File(indexFileName.toUri().getRawPath());
  final FSDataInputStream indexIn = SecureIOUtils.openFSDataInputStream(
      indexFile, expectedIndexOwner, null);
  try {
    final long fileLength = rawLocalFs.getFileStatus(indexFileName).getLen();
    // The cast applies to the length itself, before the division.
    final int recordCount = (int) fileLength / MAP_OUTPUT_INDEX_RECORD_LENGTH;
    final int payloadBytes = recordCount * MAP_OUTPUT_INDEX_RECORD_LENGTH;

    buf = ByteBuffer.allocate(payloadBytes);
    if (crc == null) {
      // No verification requested: read the records directly.
      IOUtils.readFully(indexIn, buf.array(), 0, payloadBytes);
    } else {
      crc.reset();
      final CheckedInputStream checkedIn = new CheckedInputStream(indexIn, crc);
      IOUtils.readFully(checkedIn, buf.array(), 0, payloadBytes);
      // The stored checksum is read from the underlying stream so that it is
      // not folded into the computed value.
      final long computed = checkedIn.getChecksum().getValue();
      final long stored = indexIn.readLong();
      if (computed != stored) {
        throw new ChecksumException("Checksum error reading spill index: "
            + indexFileName, -1);
      }
    }
    entries = buf.asLongBuffer();
  } finally {
    indexIn.close();
  }
}
/** * Read a log file from start to end positions. The offsets may be negative, * in which case they are relative to the end of the file. For example, * Reader(taskid, kind, 0, -1) is the entire file and * Reader(taskid, kind, -4197, -1) is the last 4196 bytes. * @param taskid the id of the task to read the log file for * @param kind the kind of log to read * @param start the offset to read from (negative is relative to tail) * @param end the offset to read upto (negative is relative to tail) * @param isCleanup whether the attempt is cleanup attempt or not * @throws IOException */ public Reader(TaskAttemptID taskid, LogName kind, long start, long end, boolean isCleanup) throws IOException { // find the right log file LogFileDetail fileDetail = getLogFileDetail(taskid, kind, isCleanup); // calculate the start and stop long size = fileDetail.length; if (start < 0) { start += size + 1; } if (end < 0) { end += size + 1; } start = Math.max(0, Math.min(start, size)); end = Math.max(0, Math.min(end, size)); start += fileDetail.start; end += fileDetail.start; bytesRemaining = end - start; String owner = obtainLogDirOwner(taskid); file = SecureIOUtils.openForRead(new File(fileDetail.location, kind.toString()), owner, null); // skip upto start long pos = 0; while (pos < start) { long result = file.skip(start - pos); if (result < 0) { bytesRemaining = 0; break; } pos += result; } }
/**
 * Loads the records of a spill index file into memory.
 *
 * The number of bytes read is the file length rounded down to a whole number
 * of {@code MAP_OUTPUT_INDEX_RECORD_LENGTH}-sized records. When a checksum is
 * supplied, it is reset, fed with the bytes read, and compared against the
 * long value that follows those bytes in the file.
 *
 * @param indexFileName path of the index file
 * @param job configuration used to obtain the raw local file system
 * @param crc checksum used to verify the data, or {@code null} to skip
 *     verification
 * @param expectedIndexOwner owner name passed to {@code SecureIOUtils} when
 *     opening the file
 * @throws IOException if the file cannot be opened or read, or (as a
 *     {@code ChecksumException}) if the checksum does not match
 */
public SpillRecord(Path indexFileName, JobConf job, Checksum crc,
    String expectedIndexOwner) throws IOException {
  final FileSystem rawLocalFs = FileSystem.getLocal(job).getRaw();
  final File indexFile = new File(indexFileName.toUri().getPath());
  final DataInputStream indexIn = new DataInputStream(
      SecureIOUtils.openForRead(indexFile, expectedIndexOwner, null));
  try {
    final long fileLength = rawLocalFs.getFileStatus(indexFileName).getLen();
    // The cast applies to the length itself, before the division.
    final int recordCount = (int) fileLength / MAP_OUTPUT_INDEX_RECORD_LENGTH;
    final int payloadBytes = recordCount * MAP_OUTPUT_INDEX_RECORD_LENGTH;

    buf = ByteBuffer.allocate(payloadBytes);
    if (crc == null) {
      // No verification requested: read the records directly.
      IOUtils.readFully(indexIn, buf.array(), 0, payloadBytes);
    } else {
      crc.reset();
      final CheckedInputStream checkedIn = new CheckedInputStream(indexIn, crc);
      IOUtils.readFully(checkedIn, buf.array(), 0, payloadBytes);
      // The stored checksum is read from the underlying stream so that it is
      // not folded into the computed value.
      final long computed = checkedIn.getChecksum().getValue();
      final long stored = indexIn.readLong();
      if (computed != stored) {
        throw new ChecksumException("Checksum error reading spill index: "
            + indexFileName, -1);
      }
    }
    entries = buf.asLongBuffer();
  } finally {
    indexIn.close();
  }
}
static void writeToIndexFile(String logLocation, TaskAttemptID currentTaskid, boolean isCleanup, Map<LogName, Long[]> lengths) throws IOException { // To ensure atomicity of updates to index file, write to temporary index // file first and then rename. File tmpIndexFile = getTmpIndexFile(currentTaskid, isCleanup); BufferedOutputStream bos = new BufferedOutputStream( SecureIOUtils.createForWrite(tmpIndexFile, 0644)); DataOutputStream dos = new DataOutputStream(bos); //the format of the index file is //LOG_DIR: <the dir where the task logs are really stored> //STDOUT: <start-offset in the stdout file> <length> //STDERR: <start-offset in the stderr file> <length> //SYSLOG: <start-offset in the syslog file> <length> dos.writeBytes(LogFileDetail.LOCATION + logLocation + "\n"); for (LogName logName : LOGS_TRACKED_BY_INDEX_FILES) { Long[] lens = lengths.get(logName); dos.writeBytes(logName.toString() + ":" + lens[0].toString() + " " + Long.toString(lens[1].longValue() - lens[0].longValue()) + "\n");} dos.close(); File indexFile = getIndexFile(currentTaskid, isCleanup); Path indexFilePath = new Path(indexFile.getAbsolutePath()); Path tmpIndexFilePath = new Path(tmpIndexFile.getAbsolutePath()); if (localFS == null) {// set localFS once localFS = FileSystem.getLocal(new Configuration()); } localFS.rename (tmpIndexFilePath, indexFilePath); }
/** * Read a log file from start to end positions. The offsets may be negative, * in which case they are relative to the end of the file. For example, * Reader(taskid, kind, 0, -1) is the entire file and * Reader(taskid, kind, -4197, -1) is the last 4196 bytes. * @param taskid the id of the task to read the log file for * @param kind the kind of log to read * @param start the offset to read from (negative is relative to tail) * @param end the offset to read upto (negative is relative to tail) * @param isCleanup whether the attempt is cleanup attempt or not * @throws IOException */ public Reader(TaskAttemptID taskid, LogName kind, long start, long end, boolean isCleanup) throws IOException { // find the right log file Map<LogName, LogFileDetail> allFilesDetails = getAllLogsFileDetails(taskid, isCleanup); LogFileDetail fileDetail = allFilesDetails.get(kind); // calculate the start and stop long size = fileDetail.length; if (start < 0) { start += size + 1; } if (end < 0) { end += size + 1; } start = Math.max(0, Math.min(start, size)); end = Math.max(0, Math.min(end, size)); start += fileDetail.start; end += fileDetail.start; bytesRemaining = end - start; String owner = obtainLogDirOwner(taskid); file = SecureIOUtils.openForRead(new File(fileDetail.location, kind.toString()), owner, null); // skip upto start long pos = 0; while (pos < start) { long result = file.skip(start - pos); if (result < 0) { bytesRemaining = 0; break; } pos += result; } }
/**
 * Loads the records of a spill index file into memory.
 *
 * The number of bytes read is the file length rounded down to a whole number
 * of {@code MAP_OUTPUT_INDEX_RECORD_LENGTH}-sized records. When a checksum is
 * supplied, it is reset, fed with the bytes read, and compared against the
 * long value that follows those bytes in the file.
 *
 * @param indexFileName path of the index file
 * @param job configuration used to obtain the raw local file system
 * @param crc checksum used to verify the data, or {@code null} to skip
 *     verification
 * @param expectedIndexOwner owner name passed to {@code SecureIOUtils} when
 *     opening the file
 * @throws IOException if the file cannot be opened or read, or (as a
 *     {@code ChecksumException}) if the checksum does not match
 */
public SpillRecord(Path indexFileName, JobConf job, Checksum crc,
    String expectedIndexOwner) throws IOException {
  final FileSystem rawLocalFs = FileSystem.getLocal(job).getRaw();
  final File indexFile = new File(indexFileName.toUri().getPath());
  final DataInputStream indexIn = new DataInputStream(
      SecureIOUtils.openForRead(indexFile, expectedIndexOwner));
  try {
    final long fileLength = rawLocalFs.getFileStatus(indexFileName).getLen();
    // The cast applies to the length itself, before the division.
    final int recordCount = (int) fileLength / MAP_OUTPUT_INDEX_RECORD_LENGTH;
    final int payloadBytes = recordCount * MAP_OUTPUT_INDEX_RECORD_LENGTH;

    buf = ByteBuffer.allocate(payloadBytes);
    if (crc == null) {
      // No verification requested: read the records directly.
      IOUtils.readFully(indexIn, buf.array(), 0, payloadBytes);
    } else {
      crc.reset();
      final CheckedInputStream checkedIn = new CheckedInputStream(indexIn, crc);
      IOUtils.readFully(checkedIn, buf.array(), 0, payloadBytes);
      // The stored checksum is read from the underlying stream so that it is
      // not folded into the computed value.
      final long computed = checkedIn.getChecksum().getValue();
      final long stored = indexIn.readLong();
      if (computed != stored) {
        throw new ChecksumException("Checksum error reading spill index: "
            + indexFileName, -1);
      }
    }
    entries = buf.asLongBuffer();
  } finally {
    indexIn.close();
  }
}
static synchronized void writeToIndexFile(String logLocation, TaskAttemptID currentTaskid, boolean isCleanup, Map<LogName, Long[]> lengths) throws IOException { // To ensure atomicity of updates to index file, write to temporary index // file first and then rename. File tmpIndexFile = getTmpIndexFile(currentTaskid, isCleanup); BufferedOutputStream bos = new BufferedOutputStream( SecureIOUtils.createForWrite(tmpIndexFile, 0644)); DataOutputStream dos = new DataOutputStream(bos); //the format of the index file is //LOG_DIR: <the dir where the task logs are really stored> //STDOUT: <start-offset in the stdout file> <length> //STDERR: <start-offset in the stderr file> <length> //SYSLOG: <start-offset in the syslog file> <length> dos.writeBytes(LogFileDetail.LOCATION + logLocation + "\n"); for (LogName logName : LOGS_TRACKED_BY_INDEX_FILES) { Long[] lens = lengths.get(logName); dos.writeBytes(logName.toString() + ":" + lens[0].toString() + " " + Long.toString(lens[1].longValue() - lens[0].longValue()) + "\n");} dos.close(); File indexFile = getIndexFile(currentTaskid, isCleanup); Path indexFilePath = new Path(indexFile.getAbsolutePath()); Path tmpIndexFilePath = new Path(tmpIndexFile.getAbsolutePath()); if (localFS == null) {// set localFS once localFS = FileSystem.getLocal(new Configuration()); } localFS.rename (tmpIndexFilePath, indexFilePath); }
/** * Read a log file from start to end positions. The offsets may be negative, * in which case they are relative to the end of the file. For example, * Reader(taskid, kind, 0, -1) is the entire file and * Reader(taskid, kind, -4197, -1) is the last 4196 bytes. * @param taskid the id of the task to read the log file for * @param kind the kind of log to read * @param start the offset to read from (negative is relative to tail) * @param end the offset to read upto (negative is relative to tail) * @param isCleanup whether the attempt is cleanup attempt or not * @throws IOException */ public Reader(TaskAttemptID taskid, LogName kind, long start, long end, boolean isCleanup) throws IOException { // find the right log file Map<LogName, LogFileDetail> allFilesDetails = getAllLogsFileDetails(taskid, isCleanup); LogFileDetail fileDetail = allFilesDetails.get(kind); // calculate the start and stop long size = fileDetail.length; if (start < 0) { start += size + 1; } if (end < 0) { end += size + 1; } start = Math.max(0, Math.min(start, size)); end = Math.max(0, Math.min(end, size)); start += fileDetail.start; end += fileDetail.start; bytesRemaining = end - start; String owner = obtainLogDirOwner(taskid); file = SecureIOUtils.openForRead(new File(fileDetail.location, kind.toString()), owner); // skip upto start long pos = 0; while (pos < start) { long result = file.skip(start - pos); if (result < 0) { bytesRemaining = 0; break; } pos += result; } }
/** * Creates job-acls.xml under the given directory logDir and writes * job-view-acl, queue-admins-acl, jobOwner name and queue name into this * file. * queue name is the queue to which the job was submitted to. * queue-admins-acl is the queue admins ACL of the queue to which this * job was submitted to. * @param conf job configuration * @param logDir job userlog dir * @throws IOException */ private static void writeJobACLs(JobConf conf, File logDir) throws IOException { File aclFile = new File(logDir, jobACLsFile); JobConf aclConf = new JobConf(false); // set the job view acl in aclConf String jobViewACL = conf.get(MRJobConfig.JOB_ACL_VIEW_JOB, " "); aclConf.set(MRJobConfig.JOB_ACL_VIEW_JOB, jobViewACL); // set the job queue name in aclConf String queue = conf.getQueueName(); aclConf.setQueueName(queue); // set the queue admins acl in aclConf String qACLName = toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()); String queueAdminsACL = conf.get(qACLName, " "); aclConf.set(qACLName, queueAdminsACL); // set jobOwner as user.name in aclConf String jobOwner = conf.getUser(); aclConf.set("user.name", jobOwner); FileOutputStream out; try { out = SecureIOUtils.createForWrite(aclFile, 0600); } catch (SecureIOUtils.AlreadyExistsException aee) { LOG.warn("Job ACL file already exists at " + aclFile, aee); return; } try { aclConf.writeXml(out); } finally { out.close(); } }