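  /**
   * If {@code logName} is tracked by index files, copy each attempt's original
   * {@link LogFileDetail} for that log into {@code updatedTaskLogFileDetails},
   * so that the index files are later rewritten with the unchanged values.
   *
   * @param lInfo the JVM whose attempts are processed
   * @param taskLogFileDetails original log-file details, keyed by task
   * @param updatedTaskLogFileDetails map that receives the copied details
   * @param logName the log being processed
   */
  private void copyOriginalIndexFileInfo(JVMInfo lInfo,
      Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails,
      Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails,
      LogName logName) {
    if (TaskLog.LOGS_TRACKED_BY_INDEX_FILES.contains(logName)) {
      for (Task task : lInfo.getAllAttempts()) {
        if (!updatedTaskLogFileDetails.containsKey(task)) {
          updatedTaskLogFileDetails.put(task,
              new HashMap<LogName, LogFileDetail>());
        }
        updatedTaskLogFileDetails.get(task).put(logName,
            taskLogFileDetails.get(task).get(logName));
      }
    }
  }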
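The copy step above only touches logs in `TaskLog.LOGS_TRACKED_BY_INDEX_FILES` and lazily creates the per-task inner map. A minimal sketch of the same pattern with `Map.computeIfAbsent`, assuming hypothetical stand-ins: tasks are plain `String` IDs, and `LogName`, `LogFileDetail` and the tracked set are simplified local types, not Hadoop's.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-ins for Hadoop's Task, LogName and LogFileDetail.
public class CopyIndexInfoSketch {
  public enum LogName { STDOUT, STDERR, SYSLOG, PROFILE }

  public static final class LogFileDetail {
    final long start;
    final long length;

    public LogFileDetail(long start, long length) {
      this.start = start;
      this.length = length;
    }
  }

  // Assumption: this tracked set is chosen for illustration only.
  static final Set<LogName> TRACKED =
      EnumSet.of(LogName.STDOUT, LogName.STDERR, LogName.SYSLOG);

  public static void copyOriginal(List<String> attempts,
      Map<String, Map<LogName, LogFileDetail>> original,
      Map<String, Map<LogName, LogFileDetail>> updated, LogName logName) {
    if (!TRACKED.contains(logName)) {
      return; // untracked logs have no index entry to preserve
    }
    for (String task : attempts) {
      // computeIfAbsent replaces the containsKey/put/get sequence.
      updated.computeIfAbsent(task, t -> new HashMap<>())
          .put(logName, original.get(task).get(logName));
    }
  }

  public static void main(String[] args) {
    Map<LogName, LogFileDetail> details = new HashMap<>();
    details.put(LogName.STDOUT, new LogFileDetail(0, 1024));
    Map<String, Map<LogName, LogFileDetail>> original = new HashMap<>();
    original.put("attempt_0", details);

    Map<String, Map<LogName, LogFileDetail>> updated = new HashMap<>();
    List<String> attempts = Arrays.asList("attempt_0");
    copyOriginal(attempts, original, updated, LogName.STDOUT);
    copyOriginal(attempts, original, updated, LogName.PROFILE); // ignored
    System.out.println(updated.get("attempt_0").keySet()); // prints [STDOUT]
  }
}
```

The copied value is the same `LogFileDetail` instance, so the "updated" map ends up pointing at the original, unmodified details for that log.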
  /**
   * Check if truncation of the log {@code logName} is needed for the given
   * JVM. If all the tasks that ran in the JVM are within the log limits, then
   * truncation is not needed. Otherwise it is needed.
   *
   * @param lInfo the JVM whose attempts are checked
   * @param taskLogFileDetails log-file details of each attempt
   * @param logName the log to check
   * @return true if truncation is needed, false otherwise
   */
  private boolean isTruncationNeeded(JVMInfo lInfo,
      Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails,
      LogName logName) {
    boolean truncationNeeded = false;
    LogFileDetail logFileDetail = null;
    for (Task task : lInfo.getAllAttempts()) {
      long taskRetainSize =
          (task.isMapTask() ? mapRetainSize : reduceRetainSize);
      Map<LogName, LogFileDetail> allLogsFileDetails =
          taskLogFileDetails.get(task);
      logFileDetail = allLogsFileDetails.get(logName);
      if (taskRetainSize > MINIMUM_RETAIN_SIZE_FOR_TRUNCATION
          && logFileDetail.length > taskRetainSize) {
        truncationNeeded = true;
        break;
      }
    }
    return truncationNeeded;
  }
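The decision rule can be isolated and tested on its own: the first attempt whose log exceeds its task type's retain size triggers truncation, and a retain size at or below the minimum disables it. A sketch under hypothetical assumptions: `Attempt` replaces `Task` plus its `LogFileDetail`, and `MINIMUM_RETAIN_SIZE_FOR_TRUNCATION` is set to 0 purely for illustration.

```java
import java.util.Arrays;
import java.util.List;

public class TruncationRuleSketch {
  // Hypothetical value for illustration; the real constant is defined by the truncater.
  static final long MINIMUM_RETAIN_SIZE_FOR_TRUNCATION = 0L;

  /** Hypothetical stand-in for a Task plus the length of one of its logs. */
  public static final class Attempt {
    final boolean isMap;
    final long logLength;

    public Attempt(boolean isMap, long logLength) {
      this.isMap = isMap;
      this.logLength = logLength;
    }
  }

  /** True if any attempt's log exceeds the retain size for its task type. */
  public static boolean isTruncationNeeded(List<Attempt> attempts,
      long mapRetainSize, long reduceRetainSize) {
    for (Attempt a : attempts) {
      long retain = a.isMap ? mapRetainSize : reduceRetainSize;
      // A retain size at or below the minimum disables truncation.
      if (retain > MINIMUM_RETAIN_SIZE_FOR_TRUNCATION && a.logLength > retain) {
        return true; // one oversized attempt is enough
      }
    }
    return false;
  }

  public static void main(String[] args) {
    List<Attempt> attempts =
        Arrays.asList(new Attempt(true, 500), new Attempt(false, 5000));
    System.out.println(isTruncationNeeded(attempts, 1000, 4096)); // true
    System.out.println(isTruncationNeeded(attempts, 1000, -1));   // false
  }
}
```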
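  /**
   * Revert the index-file info for {@code logName}: if it is tracked by index
   * files, copy each attempt's original {@link LogFileDetail} for that log into
   * {@code updatedTaskLogFileDetails}, so that the index files are later
   * rewritten with the unchanged values.
   *
   * @param lInfo the JVM whose attempts are processed
   * @param taskLogFileDetails original log-file details, keyed by task
   * @param updatedTaskLogFileDetails map that receives the copied details
   * @param logName the log being processed
   */
  private void revertIndexFileInfo(PerJVMInfo lInfo,
      Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails,
      Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails,
      LogName logName) {
    if (TaskLog.LOGS_TRACKED_BY_INDEX_FILES.contains(logName)) {
      for (Task task : lInfo.allAttempts) {
        if (!updatedTaskLogFileDetails.containsKey(task)) {
          updatedTaskLogFileDetails.put(task,
              new HashMap<LogName, LogFileDetail>());
        }
        updatedTaskLogFileDetails.get(task).put(logName,
            taskLogFileDetails.get(task).get(logName));
      }
    }
  }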
  /**
   * Check if truncation of the log {@code logName} is needed for the given
   * JVM. If all the tasks that ran in the JVM are within the log limits, then
   * truncation is not needed. Otherwise it is needed.
   *
   * @param lInfo the JVM whose attempts are checked
   * @param taskLogFileDetails log-file details of each attempt
   * @param logName the log to check
   * @return true if truncation is needed, false otherwise
   */
  private boolean isTruncationNeeded(PerJVMInfo lInfo,
      Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails,
      LogName logName) {
    boolean truncationNeeded = false;
    LogFileDetail logFileDetail = null;
    for (Task task : lInfo.allAttempts) {
      long taskRetainSize =
          (task.isMapTask() ? mapRetainSize : reduceRetainSize);
      Map<LogName, LogFileDetail> allLogsFileDetails =
          taskLogFileDetails.get(task);
      logFileDetail = allLogsFileDetails.get(logName);
      if (taskRetainSize > MINIMUM_RETAIN_SIZE_FOR_TRUNCATION
          && logFileDetail.length > taskRetainSize) {
        truncationNeeded = true;
        break;
      }
    }
    return truncationNeeded;
  }
  /**
   * Check the log file sizes generated by the attempts that ran in a
   * particular JVM.
   *
   * @param lInfo the JVM whose attempts' logs are checked
   * @return true if any existing log of the JVM needs truncation, false
   *         otherwise (including when the log-file details cannot be read)
   * @throws IOException
   */
  public boolean shouldTruncateLogs(JVMInfo lInfo) throws IOException {
    // Read the log-file details for all the attempts that ran in this JVM
    Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails;
    try {
      taskLogFileDetails = getAllLogsFileDetails(lInfo.getAllAttempts());
    } catch (IOException e) {
      LOG.warn(
          "Exception in shouldTruncateLogs while getting allLogsFileDetails()."
              + " Ignoring the truncation of logs of this process.", e);
      return false;
    }

    File attemptLogDir = lInfo.getLogLocation();
    for (LogName logName : LogName.values()) {
      File logFile = new File(attemptLogDir, logName.toString());
      if (logFile.exists()) {
        if (isTruncationNeeded(lInfo, taskLogFileDetails, logName)) {
          return true;
        }
        LOG.debug("Truncation is not needed for "
            + logFile.getAbsolutePath());
      }
    }
    return false;
  }
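The scan above skips log files that do not exist and returns on the first log that needs truncation. A minimal sketch of that control flow, assuming hypothetical simplifications: a local `LogName` enum whose `toString()` is used as the file name, and the whole on-disk file length standing in for the per-attempt `LogFileDetail.length` that the real code compares.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class ShouldTruncateSketch {
  // Hypothetical log names; Hadoop's LogName has its own values and file names.
  public enum LogName { STDOUT, STDERR, SYSLOG }

  /**
   * Returns true as soon as one existing log file in logDir is longer than
   * retainSize. Simplification: the whole file length stands in for the
   * per-attempt LogFileDetail.length checked by the real code.
   */
  public static boolean shouldTruncate(File logDir, long retainSize) {
    for (LogName logName : LogName.values()) {
      File logFile = new File(logDir, logName.toString());
      if (!logFile.exists()) {
        continue; // missing logs never trigger truncation
      }
      if (retainSize > 0 && logFile.length() > retainSize) {
        return true; // short-circuit on the first oversized log
      }
    }
    return false;
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("joblogs");
    Files.write(dir.resolve("STDOUT"), new byte[10]); // only STDOUT exists
    System.out.println(shouldTruncate(dir.toFile(), 5));  // true
    System.out.println(shouldTruncate(dir.toFile(), 20)); // false
  }
}
```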
  /**
   * Get the log-file details of all the attempts passed.
   *
   * @param allAttempts the attempts we are interested in
   * @return a map from each task to its log-file details, keyed by log name
   * @throws IOException
   */
  private Map<Task, Map<LogName, LogFileDetail>> getAllLogsFileDetails(
      final List<Task> allAttempts) throws IOException {
    Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails =
        new HashMap<Task, Map<LogName, LogFileDetail>>();
    for (Task task : allAttempts) {
      Map<LogName, LogFileDetail> allLogsFileDetails =
          TaskLog.getAllLogsFileDetails(task.getTaskID(),
              task.isTaskCleanupTask());
      taskLogFileDetails.put(task, allLogsFileDetails);
    }
    return taskLogFileDetails;
  }
  /**
   * Truncation of logs is done. Now sync the index files to reflect the
   * truncated sizes.
   *
   * @param location the log location passed to TaskLog.writeToIndexFile
   * @param updatedTaskLogFileDetails the post-truncation log-file details of
   *          each task
   */
  private void updateIndicesAfterLogTruncation(String location,
      Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails) {
    for (Entry<Task, Map<LogName, LogFileDetail>> entry :
        updatedTaskLogFileDetails.entrySet()) {
      Task task = entry.getKey();
      Map<LogName, LogFileDetail> logFileDetails = entry.getValue();
      Map<LogName, Long[]> logLengths = new HashMap<LogName, Long[]>();
      // set previous and current lengths
      for (LogName logName : TaskLog.LOGS_TRACKED_BY_INDEX_FILES) {
        Long[] lengths = new Long[] { Long.valueOf(0L), Long.valueOf(0L) };
        LogFileDetail lfd = logFileDetails.get(logName);
        if (lfd != null) {
          // Set previous lengths
          lengths[0] = Long.valueOf(lfd.start);
          // Set current lengths
          lengths[1] = Long.valueOf(lfd.start + lfd.length);
        }
        logLengths.put(logName, lengths);
      }
      try {
        TaskLog.writeToIndexFile(location, task.getTaskID(),
            task.isTaskCleanupTask(), logLengths);
      } catch (IOException ioe) {
        LOG.warn("Exception encountered while updating index file of task "
            + task.getTaskID() + ". Ignoring and continuing with other tasks.",
            ioe);
      }
    }
  }
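As a worked example of the per-log index entry built above: for each index-tracked log, the pair is `{start, start + length}`, and a tracked log with no detail gets `{0, 0}`. A sketch assuming hypothetical simplified types (a local `LogName`, an illustrative tracked set, and primitive `long[]` instead of the `Long[]` that `TaskLog.writeToIndexFile` takes).

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class IndexLengthsSketch {
  public enum LogName { STDOUT, STDERR, SYSLOG, PROFILE }

  // Assumption: illustrative choice of index-tracked logs.
  static final Set<LogName> TRACKED =
      EnumSet.of(LogName.STDOUT, LogName.STDERR, LogName.SYSLOG);

  /** Simplified LogFileDetail: start offset and length of one attempt's output. */
  public static final class LogFileDetail {
    final long start;
    final long length;

    public LogFileDetail(long start, long length) {
      this.start = start;
      this.length = length;
    }
  }

  /** Returns {previousLength, currentLength} = {start, start + length} per tracked log. */
  public static Map<LogName, long[]> logLengths(Map<LogName, LogFileDetail> details) {
    Map<LogName, long[]> lengths = new EnumMap<>(LogName.class);
    for (LogName logName : TRACKED) {
      LogFileDetail lfd = details.get(logName);
      // Tracked logs without a detail are recorded as {0, 0}, as in the loop above.
      lengths.put(logName, lfd == null
          ? new long[] {0L, 0L}
          : new long[] {lfd.start, lfd.start + lfd.length});
    }
    return lengths;
  }

  public static void main(String[] args) {
    Map<LogName, LogFileDetail> details = new HashMap<>();
    details.put(LogName.STDOUT, new LogFileDetail(4096, 1024));
    Map<LogName, long[]> lengths = logLengths(details);
    System.out.println(Arrays.toString(lengths.get(LogName.STDOUT))); // [4096, 5120]
    System.out.println(Arrays.toString(lengths.get(LogName.STDERR))); // [0, 0]
  }
}
```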
  /**
   * Truncation of logs is done. Now sync the index files to reflect the
   * truncated sizes.
   *
   * @param firstAttempt the first attempt that ran in the JVM, passed to
   *          TaskLog.writeToIndexFile
   * @param updatedTaskLogFileDetails the post-truncation log-file details of
   *          each task
   */
  private void updateIndicesAfterLogTruncation(TaskAttemptID firstAttempt,
      Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails) {
    for (Entry<Task, Map<LogName, LogFileDetail>> entry :
        updatedTaskLogFileDetails.entrySet()) {
      Task task = entry.getKey();
      Map<LogName, LogFileDetail> logFileDetails = entry.getValue();
      Map<LogName, Long[]> logLengths = new HashMap<LogName, Long[]>();
      // set previous and current lengths
      for (LogName logName : TaskLog.LOGS_TRACKED_BY_INDEX_FILES) {
        Long[] lengths = new Long[] { Long.valueOf(0L), Long.valueOf(0L) };
        LogFileDetail lfd = logFileDetails.get(logName);
        if (lfd != null) {
          // Set previous lengths
          lengths[0] = Long.valueOf(lfd.start);
          // Set current lengths
          lengths[1] = Long.valueOf(lfd.start + lfd.length);
        }
        logLengths.put(logName, lengths);
      }
      try {
        TaskLog.writeToIndexFile(firstAttempt, task.getTaskID(),
            task.isTaskCleanupTask(), logLengths);
      } catch (IOException ioe) {
        // Log once, with the stack trace, instead of two separate warnings
        LOG.warn("Exception encountered while updating index file of task "
            + task.getTaskID() + ". Ignoring and continuing with other tasks.",
            ioe);
      }
    }
  }
  /**
   * Truncate the log file of this task-attempt so that only the last
   * retainSize bytes of each log file are retained, reducing the log file in
   * size and saving disk space.
   *
   * @param taskID Task whose logs need to be truncated
   * @param oldLogFileDetail contains the original log details for the attempt
   * @param taskRetainSize number of bytes to retain
   * @param tmpFileOutputStream New log file to write to. Already opened in
   *          append mode.
   * @param logFileInputStream Original log file to read from.
   * @param logName the log being truncated
   * @return the details of the new (possibly truncated) log
   * @throws IOException if the expected number of bytes cannot be skipped, or
   *           if reading or writing fails
   */
  private LogFileDetail truncateALogFileOfAnAttempt(
      final TaskAttemptID taskID, final LogFileDetail oldLogFileDetail,
      final long taskRetainSize,
      final FileOutputStream tmpFileOutputStream,
      final FileInputStream logFileInputStream, final LogName logName)
      throws IOException {
    LogFileDetail newLogFileDetail = new LogFileDetail();
    long logSize = 0;

    // ///////////// Truncate log file ///////////////////////

    // New location of log file is same as the old
    newLogFileDetail.location = oldLogFileDetail.location;
    if (taskRetainSize > MINIMUM_RETAIN_SIZE_FOR_TRUNCATION
        && oldLogFileDetail.length > taskRetainSize) {
      LOG.info("Truncating " + logName + " logs for " + taskID + " from "
          + oldLogFileDetail.length + " bytes to " + taskRetainSize
          + " bytes.");
      logSize = taskRetainSize;
      byte[] truncatedMsgBytes = TRUNCATED_MSG.getBytes();
      tmpFileOutputStream.write(truncatedMsgBytes);
      newLogFileDetail.length += truncatedMsgBytes.length;
    } else {
      LOG.debug("No truncation needed for " + logName + " logs for " + taskID
          + ": length is " + oldLogFileDetail.length + " bytes, retain size is "
          + taskRetainSize + " bytes.");
      logSize = oldLogFileDetail.length;
    }
    long bytesSkipped =
        logFileInputStream.skip(oldLogFileDetail.length - logSize);
    if (bytesSkipped != oldLogFileDetail.length - logSize) {
      throw new IOException("Erroneously skipped " + bytesSkipped
          + " instead of the expected "
          + (oldLogFileDetail.length - logSize)
          + " while truncating " + logName + " logs for " + taskID);
    }

    byte[] tmpBuf = new byte[DEFAULT_BUFFER_SIZE]; // Temporary buffer to read logs
    long alreadyRead = 0;
    while (alreadyRead < logSize) {
      int toRead = (int) Math.min(tmpBuf.length, logSize - alreadyRead);
      int bytesRead = logFileInputStream.read(tmpBuf, 0, toRead);
      if (bytesRead < 0) {
        break; // Premature end of file
      }
      // read() may return fewer bytes than requested: write only those read
      tmpFileOutputStream.write(tmpBuf, 0, bytesRead);
      alreadyRead += bytesRead;
    }
    // Count the bytes actually copied, not the requested logSize
    newLogFileDetail.length += alreadyRead;

    // ////// End of truncating log file ///////////////////////
    return newLogFileDetail;
  }
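The core of the stream-based truncation is: optionally write a marker, skip the leading bytes, then copy the tail through a bounded buffer, writing only the bytes each `read` actually returned. A minimal in-memory sketch, assuming a hypothetical `[TRUNCATED]` marker and a deliberately tiny buffer; unlike the method above, which throws when a single `skip` falls short, this sketch loops on `skip` until done, which is a design choice of the sketch rather than the original behavior.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class TailTruncateSketch {
  // Hypothetical marker and deliberately tiny buffer; the real TRUNCATED_MSG
  // and DEFAULT_BUFFER_SIZE are constants of the truncater.
  static final byte[] TRUNCATED_MSG = "[TRUNCATED]\n".getBytes(StandardCharsets.UTF_8);
  static final int BUFFER_SIZE = 4;

  /**
   * Copies the last retainSize bytes of in (whose total length is length) to
   * out, preceded by TRUNCATED_MSG when truncation happens. Returns the number
   * of bytes written to out.
   */
  public static long truncate(InputStream in, long length, long retainSize,
      OutputStream out) throws IOException {
    long written = 0;
    long keep = length;
    if (retainSize > 0 && length > retainSize) {
      out.write(TRUNCATED_MSG);
      written += TRUNCATED_MSG.length;
      keep = retainSize;
    }
    // skip() may skip fewer bytes than requested, so loop until done.
    long toSkip = length - keep;
    while (toSkip > 0) {
      long skipped = in.skip(toSkip);
      if (skipped <= 0) {
        throw new IOException(toSkip + " bytes could not be skipped");
      }
      toSkip -= skipped;
    }
    byte[] buf = new byte[BUFFER_SIZE];
    long copied = 0;
    while (copied < keep) {
      int n = in.read(buf, 0, (int) Math.min(buf.length, keep - copied));
      if (n < 0) {
        break; // premature end of stream
      }
      out.write(buf, 0, n); // write only what was actually read
      copied += n;
    }
    return written + copied;
  }

  public static void main(String[] args) throws IOException {
    byte[] log = "0123456789".getBytes(StandardCharsets.UTF_8);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    long n = truncate(new ByteArrayInputStream(log), log.length, 4, out);
    System.out.println(n); // 16: 12 marker bytes + 4 retained bytes
    System.out.print(new String(out.toByteArray(), StandardCharsets.UTF_8));
  }
}
```

Running `main` prints the byte count, then the marker line followed by `6789`.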
  /**
   * Truncate the log file of this task-attempt so that only the last
   * retainSize bytes of each log file are retained, reducing the log file in
   * size and saving disk space.
   *
   * @param taskID Task whose logs need to be truncated
   * @param oldLogFileDetail contains the original log details for the attempt
   * @param taskRetainSize number of bytes to retain
   * @param tmpFileWriter New log file to write to. Already opened in append
   *          mode.
   * @param logFileReader Original log file to read from.
   * @return the details of the new (possibly truncated) log
   * @throws IOException if the expected number of chars cannot be skipped, or
   *           if reading or writing fails
   */
  private LogFileDetail truncateALogFileOfAnAttempt(
      final TaskAttemptID taskID, final LogFileDetail oldLogFileDetail,
      final long taskRetainSize, final FileWriter tmpFileWriter,
      final FileReader logFileReader) throws IOException {
    LogFileDetail newLogFileDetail = new LogFileDetail();

    // ///////////// Truncate log file ///////////////////////

    // New location of log file is same as the old
    newLogFileDetail.location = oldLogFileDetail.location;
    if (taskRetainSize > MINIMUM_RETAIN_SIZE_FOR_TRUNCATION
        && oldLogFileDetail.length > taskRetainSize) {
      LOG.info("Truncating logs for " + taskID + " from "
          + oldLogFileDetail.length + " bytes to " + taskRetainSize
          + " bytes.");
      newLogFileDetail.length = taskRetainSize;
    } else {
      LOG.info("No truncation needed for " + taskID + ": length is "
          + oldLogFileDetail.length + " bytes, retain size is "
          + taskRetainSize + " bytes.");
      newLogFileDetail.length = oldLogFileDetail.length;
    }
    // Note: LogFileDetail lengths are byte counts, while Reader.skip() and
    // read() count chars; the two diverge for multi-byte encodings.
    long charsSkipped =
        logFileReader.skip(oldLogFileDetail.length - newLogFileDetail.length);
    if (charsSkipped != oldLogFileDetail.length - newLogFileDetail.length) {
      throw new IOException("Erroneously skipped " + charsSkipped
          + " instead of the expected "
          + (oldLogFileDetail.length - newLogFileDetail.length));
    }

    char[] tmpBuf = new char[DEFAULT_BUFFER_SIZE]; // Temporary buffer to read logs
    long alreadyRead = 0;
    while (alreadyRead < newLogFileDetail.length) {
      int toRead =
          (int) Math.min(tmpBuf.length, newLogFileDetail.length - alreadyRead);
      int charsRead = logFileReader.read(tmpBuf, 0, toRead);
      if (charsRead < 0) {
        break; // Premature end of file
      }
      // read() may return fewer chars than requested: write only those read
      tmpFileWriter.write(tmpBuf, 0, charsRead);
      alreadyRead += charsRead;
    }

    // ////// End of truncating log file ///////////////////////
    return newLogFileDetail;
  }
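One plausible reason to prefer the stream-based variant above over this `Reader`-based one: skipping a byte count with a `Reader` skips chars, which over-skips when the log contains multi-byte characters. A small demonstration, assuming UTF-8 (the `FileReader` in the method above uses the platform default charset); `tailViaBytes` and `tailViaReader` are hypothetical helpers for illustration only.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;

public class CharVsByteSketch {
  /** Keeps the last byteCount bytes by skipping bytes directly (stream-style). */
  public static String tailViaBytes(byte[] data, int byteCount) {
    return new String(data, data.length - byteCount, byteCount,
        StandardCharsets.UTF_8);
  }

  /** Mimics the Reader-based code: a byte count is used as a char count. */
  public static String tailViaReader(byte[] data, int byteCount)
      throws IOException {
    try (Reader reader = new InputStreamReader(
        new ByteArrayInputStream(data), StandardCharsets.UTF_8)) {
      reader.skip(data.length - byteCount); // skips chars, not bytes
      StringBuilder sb = new StringBuilder();
      int c;
      while ((c = reader.read()) != -1) {
        sb.append((char) c);
      }
      return sb.toString();
    }
  }

  public static void main(String[] args) throws IOException {
    // e-acute is 2 bytes in UTF-8 but 1 char, so data is 6 bytes and 5 chars.
    byte[] data = "\u00e91234".getBytes(StandardCharsets.UTF_8);
    System.out.println(tailViaBytes(data, 4));  // 1234
    System.out.println(tailViaReader(data, 4)); // 234
  }
}
```

Keeping the "last 4 bytes" should yield `1234`; treating the 2-byte skip as 2 chars also consumes the `1`.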