/**
 * Rolls forward an accept-recovery operation that was interrupted by a crash.
 *
 * <p>Using the persisted paxos data (the segment's start txid and the epoch
 * it was accepted in), looks for the segment's temporary sync-log file. If
 * that file is still on disk, it is moved into the segment's in-progress
 * edit-log location, completing the interrupted operation. Nothing happens
 * when {@code paxosData} is {@code null} or no temporary file exists.
 *
 * <p>See the inline comments in
 * {@link #acceptRecovery(RequestInfo, SegmentStateProto, URL)} for more
 * details.
 *
 * @param paxosData persisted recovery data for the segment, or {@code null}
 *     if none was persisted
 * @throws IOException if the temporary file is unable to be renamed into
 *     place
 */
private void completeHalfDoneAcceptRecovery(
    PersistedRecoveryPaxosData paxosData) throws IOException {
  if (paxosData == null) {
    return;
  }
  final long startTxId = paxosData.getSegmentState().getStartTxId();
  final long acceptedEpoch = paxosData.getAcceptedInEpoch();
  final File pendingFile =
      storage.getSyncLogTemporaryFile(startTxId, acceptedEpoch);
  if (!pendingFile.exists()) {
    // Nothing left behind: the earlier operation completed (or never started).
    return;
  }
  final File target = storage.getInProgressEditLog(startTxId);
  LOG.info("Rolling forward previously half-completed synchronization: "
      + pendingFile + " -> " + target);
  FileUtil.replaceFile(pendingFile, target);
}
/** * Retrieve the persisted data for recovering the given segment from disk. */ private PersistedRecoveryPaxosData getPersistedPaxosData(long segmentTxId) throws IOException { File f = storage.getPaxosFile(segmentTxId); if (!f.exists()) { // Default instance has no fields filled in (they're optional) return null; } InputStream in = new FileInputStream(f); try { PersistedRecoveryPaxosData ret = PersistedRecoveryPaxosData.parseDelimitedFrom(in); Preconditions.checkState(ret != null && ret.getSegmentState().getStartTxId() == segmentTxId, "Bad persisted data for segment %s: %s", segmentTxId, ret); return ret; } finally { IOUtils.closeStream(in); } }
/**
 * Rolls forward an accept-recovery operation that was interrupted by a crash.
 *
 * <p>Using the persisted paxos data (the segment's start txid and the epoch
 * it was accepted in), looks for the segment's temporary sync-log file. If
 * that file is still on disk, it is moved into the segment's in-progress
 * edit-log location, completing the interrupted operation. Nothing happens
 * when {@code paxosData} is {@code null} or no temporary file exists.
 *
 * <p>See the inline comments in
 * {@link #acceptRecovery(RequestInfo, SegmentStateProto, URL)} for more
 * details.
 *
 * @param paxosData persisted recovery data for the segment, or {@code null}
 *     if none was persisted
 * @throws IOException if the temporary file is unable to be renamed into
 *     place
 */
private void completeHalfDoneAcceptRecovery(
    PersistedRecoveryPaxosData paxosData) throws IOException {
  if (paxosData == null) {
    return;
  }
  final long startTxId = paxosData.getSegmentState().getStartTxId();
  final long acceptedEpoch = paxosData.getAcceptedInEpoch();
  final File pendingFile =
      journalStorage.getSyncLogTemporaryFile(startTxId, acceptedEpoch);
  if (!pendingFile.exists()) {
    // Nothing left behind: the earlier operation completed (or never started).
    return;
  }
  final File target = journalStorage.getInProgressEditLog(startTxId);
  LOG.info("Rolling forward previously half-completed synchronization: "
      + pendingFile + " -> " + target);
  FileUtil.replaceFile(pendingFile, target);
}
/** * Retrieve the persisted data for recovering the given segment from disk. */ private PersistedRecoveryPaxosData getPersistedPaxosData(long segmentTxId) throws IOException { File f = journalStorage.getPaxosFile(segmentTxId); if (!f.exists()) { // Default instance has no fields filled in (they're optional) return null; } InputStream in = new FileInputStream(f); try { PersistedRecoveryPaxosData ret = PersistedRecoveryPaxosData.parseDelimitedFrom(in); Preconditions.checkState(ret != null && ret.getSegmentState().getStartTxId() == segmentTxId, "Bad persisted data for segment %s: %s", segmentTxId, ret); return ret; } finally { IOUtils.closeStream(in); } }
/** * Persist data for recovering the given segment from disk. */ private void persistPaxosData(long segmentTxId, PersistedRecoveryPaxosData newData) throws IOException { File f = storage.getPaxosFile(segmentTxId); boolean success = false; AtomicFileOutputStream fos = new AtomicFileOutputStream(f); try { newData.writeDelimitedTo(fos); fos.write('\n'); // Write human-readable data after the protobuf. This is only // to assist in debugging -- it's not parsed at all. OutputStreamWriter writer = new OutputStreamWriter(fos, Charsets.UTF_8); writer.write(String.valueOf(newData)); writer.write('\n'); writer.flush(); fos.flush(); success = true; } finally { if (success) { IOUtils.closeStream(fos); } else { fos.abort(); } } }
/**
 * Handles a prepare-recovery request for the segment starting at
 * {@code segmentTxId}.
 *
 * <p>After validating storage and the request, aborts any current segment and
 * rolls forward a half-completed accept-recovery if one is found on disk.
 * Then reports:
 * <ul>
 *   <li>the previously accepted segment state and its epoch, if paxos data
 *       was persisted and the segment is not finalized on disk; otherwise the
 *       on-disk segment state, if any;</li>
 *   <li>the last writer epoch;</li>
 *   <li>the last committed txid, when it is known (not
 *       {@code HdfsConstants.INVALID_TXID}).</li>
 * </ul>
 *
 * @see QJournalProtocol#prepareRecovery(RequestInfo, long)
 */
public synchronized PrepareRecoveryResponseProto prepareRecovery(
    RequestInfo reqInfo, long segmentTxId) throws IOException {
  checkJournalStorageFormatted();
  checkRequest(reqInfo);

  abortCurSegment();

  final PrepareRecoveryResponseProto response =
      new PrepareRecoveryResponseProto();

  // Finish any interrupted accept-recovery before inspecting on-disk state.
  final PersistedRecoveryPaxosData accepted =
      getPersistedPaxosData(segmentTxId);
  completeHalfDoneAcceptRecovery(accepted);

  final SegmentStateProto onDisk = getSegmentInfo(segmentTxId);
  final boolean finalizedOnDisk =
      onDisk != null && !onDisk.getIsInProgress();

  if (accepted != null && !finalizedOnDisk) {
    // A persisted accepted proposal is reported in preference to a
    // missing or in-progress on-disk segment.
    response.setAcceptedInEpoch(accepted.getAcceptedInEpoch());
    response.setSegmentState(accepted.getSegmentState());
  } else if (onDisk != null) {
    response.setSegmentState(onDisk);
  }

  response.setLastWriterEpoch(lastWriterEpoch.get());
  if (committedTxnId.get() != HdfsConstants.INVALID_TXID) {
    response.setLastCommittedTxId(committedTxnId.get());
  }

  LOG.info("Prepared recovery for segment " + segmentTxId + ": " + response);
  return response;
}
/** * Persist data for recovering the given segment from disk. */ private void persistPaxosData(long segmentTxId, PersistedRecoveryPaxosData newData) throws IOException { File f = journalStorage.getPaxosFile(segmentTxId); boolean success = false; AtomicFileOutputStream fos = new AtomicFileOutputStream(f); try { newData.writeDelimitedTo(fos); fos.write('\n'); // Write human-readable data after the protobuf. This is only // to assist in debugging -- it's not parsed at all. OutputStreamWriter writer = new OutputStreamWriter(fos, Charsets.UTF_8); writer.write(String.valueOf(newData)); writer.write('\n'); writer.flush(); fos.flush(); success = true; } finally { if (success) { IOUtils.closeStream(fos); } else { fos.abort(); } } }