@Override
public void journal(RequestInfo reqInfo,
    long segmentTxId, long firstTxnId, int numTxns,
    byte[] records) throws IOException {
  JournalRequestProto req = JournalRequestProto.newBuilder()
      .setReqInfo(convert(reqInfo))
      .setSegmentTxnId(segmentTxId)
      .setFirstTxnId(firstTxnId)
      .setNumTxns(numTxns)
      .setRecords(PBHelper.getByteString(records))
      .build();
  try {
    rpcProxy.journal(NULL_CONTROLLER, req);
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
@Test (timeout = 10000)
public void testFormatResetsCachedValues() throws Exception {
  journal.newEpoch(FAKE_NSINFO, 12345L);
  journal.startLogSegment(new RequestInfo(JID, 12345L, 1L, 0L), 1L,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);

  assertEquals(12345L, journal.getLastPromisedEpoch());
  assertEquals(12345L, journal.getLastWriterEpoch());
  assertTrue(journal.isFormatted());

  // Close the journal in preparation for reformatting it.
  journal.close();
  journal.format(FAKE_NSINFO_2);

  assertEquals(0, journal.getLastPromisedEpoch());
  assertEquals(0, journal.getLastWriterEpoch());
  assertTrue(journal.isFormatted());
}
@Override
public void journal(RequestInfo reqInfo,
    long segmentTxId, long firstTxnId, int numTxns,
    byte[] records) throws IOException {
  JournalRequestProto req = JournalRequestProto.newBuilder()
      .setReqInfo(convert(reqInfo))
      .setSegmentTxnId(segmentTxId)
      .setFirstTxnId(firstTxnId)
      .setNumTxns(numTxns)
      .setRecords(PBHelperClient.getByteString(records))
      .build();
  try {
    rpcProxy.journal(NULL_CONTROLLER, req);
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
/**
 * @see JournalManager#purgeLogsOlderThan(long)
 */
public synchronized void purgeLogsOlderThan(RequestInfo reqInfo,
    long minTxIdToKeep) throws IOException {
  checkJournalStorageFormatted();
  checkRequest(reqInfo);

  journalStorage.purgeDataOlderThan(minTxIdToKeep);
  if (minTxIdToKeep == FSEditLog.PURGE_ALL_TXID) {
    // When trying to remove all the segments, reset
    // the committed transaction ID too.
    committedTxnId.set(0, true);
    minTxid = 0;
  } else {
    minTxid = minTxIdToKeep;
  }

  if (imageStorage.isFormatted()) {
    imageStorage.purgeDataOlderThan(minTxIdToKeep == 0 ? -1 : minTxIdToKeep);
  }
}
@Test (timeout = 10000)
public void testFormatResetsCachedValues() throws Exception {
  journal.newEpoch(FAKE_NSINFO, 12345L);
  journal.startLogSegment(new RequestInfo(JID, 12345L, 1L, 0L), 1L);

  assertEquals(12345L, journal.getLastPromisedEpoch());
  assertEquals(12345L, journal.getLastWriterEpoch());
  assertTrue(journal.isFormatted());

  // Close the journal in preparation for reformatting it.
  journal.close();
  journal.format(FAKE_NSINFO_2);

  assertEquals(0, journal.getLastPromisedEpoch());
  assertEquals(0, journal.getLastWriterEpoch());
  assertTrue(journal.isFormatted());
}
/**
 * Ensure that the given request is coming from the correct writer and in-order.
 * @param reqInfo the request info
 * @throws IOException if the request is invalid.
 */
private synchronized void checkRequest(RequestInfo reqInfo) throws IOException {
  // Invariant 25 from ZAB paper
  if (reqInfo.getEpoch() < lastPromisedEpoch.get()) {
    throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
        " is less than the last promised epoch " +
        lastPromisedEpoch.get());
  } else if (reqInfo.getEpoch() > lastPromisedEpoch.get()) {
    // A newer client has arrived. Fence any previous writers by updating
    // the promise.
    updateLastPromisedEpoch(reqInfo.getEpoch());
  }

  // Ensure that the IPCs are arriving in-order as expected.
  checkSync(reqInfo.getIpcSerialNumber() > currentEpochIpcSerial,
      "IPC serial %s from client %s was not higher than prior highest " +
      "IPC serial %s", reqInfo.getIpcSerialNumber(),
      Server.getRemoteIp(), currentEpochIpcSerial);
  currentEpochIpcSerial = reqInfo.getIpcSerialNumber();

  if (reqInfo.hasCommittedTxId()) {
    Preconditions.checkArgument(
        reqInfo.getCommittedTxId() >= committedTxnId.get(),
        "Client trying to move committed txid backward from " +
        committedTxnId.get() + " to " + reqInfo.getCommittedTxId());

    committedTxnId.set(reqInfo.getCommittedTxId());
  }
}
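// A minimal, self-contained sketch (hypothetical names, not Hadoop APIs) of
// the promise/fencing rule that checkRequest() enforces above: a request with
// a stale epoch is rejected, while a higher epoch fences earlier writers by
// raising the promise before the request is served.
import java.io.IOException;

final class EpochFenceSketch {
  private long lastPromisedEpoch = 0;

  synchronized void checkEpoch(long requestEpoch) throws IOException {
    if (requestEpoch < lastPromisedEpoch) {
      // Never serve a writer older than the current promise.
      throw new IOException("epoch " + requestEpoch
          + " is less than the last promised epoch " + lastPromisedEpoch);
    } else if (requestEpoch > lastPromisedEpoch) {
      // A newer writer has arrived; raise the promise so older writers
      // are rejected from now on.
      lastPromisedEpoch = requestEpoch;
    }
  }
}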
private synchronized void checkWriteRequest(RequestInfo reqInfo) throws IOException {
  checkRequest(reqInfo);

  if (reqInfo.getEpoch() != lastWriterEpoch.get()) {
    throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
        " is not the current writer epoch " +
        lastWriterEpoch.get());
  }
}
/**
 * @see JournalManager#purgeLogsOlderThan(long)
 */
public synchronized void purgeLogsOlderThan(RequestInfo reqInfo,
    long minTxIdToKeep) throws IOException {
  checkFormatted();
  checkRequest(reqInfo);

  storage.purgeDataOlderThan(minTxIdToKeep);
}
/**
 * Synchronize a log segment from another JournalNode. The log is
 * downloaded from the provided URL into a temporary location on disk,
 * which is named based on the current request's epoch.
 *
 * @return the temporary location of the downloaded file
 */
private File syncLog(RequestInfo reqInfo,
    final SegmentStateProto segment, final URL url) throws IOException {
  final File tmpFile = storage.getSyncLogTemporaryFile(
      segment.getStartTxId(), reqInfo.getEpoch());
  final List<File> localPaths = ImmutableList.of(tmpFile);

  LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
      " from " + url);
  SecurityUtil.doAsLoginUser(
      new PrivilegedExceptionAction<Void>() {
        @Override
        public Void run() throws IOException {
          // We may have lost our ticket since last checkpoint, log in again,
          // just in case.
          if (UserGroupInformation.isSecurityEnabled()) {
            UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
          }

          boolean success = false;
          try {
            TransferFsImage.doGetUrl(url, localPaths, storage, true);
            assert tmpFile.exists();
            success = true;
          } finally {
            if (!success) {
              if (!tmpFile.delete()) {
                LOG.warn("Failed to delete temporary file " + tmpFile);
              }
            }
          }
          return null;
        }
      });
  return tmpFile;
}
@Override
public void journal(RequestInfo reqInfo,
    long segmentTxId, long firstTxnId, int numTxns,
    byte[] records) throws IOException {
  jn.getOrCreateJournal(reqInfo.getJournalId())
      .journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
}
private RequestInfo convert(
    QJournalProtocolProtos.RequestInfoProto reqInfo) {
  return new RequestInfo(
      reqInfo.getJournalId().getIdentifier(),
      reqInfo.getEpoch(),
      reqInfo.getIpcSerialNumber(),
      reqInfo.hasCommittedTxId() ?
          reqInfo.getCommittedTxId() : HdfsConstants.INVALID_TXID);
}
@Override
public void heartbeat(RequestInfo reqInfo) throws IOException {
  try {
    rpcProxy.heartbeat(NULL_CONTROLLER, HeartbeatRequestProto.newBuilder()
        .setReqInfo(convert(reqInfo))
        .build());
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
private QJournalProtocolProtos.RequestInfoProto convert(
    RequestInfo reqInfo) {
  RequestInfoProto.Builder builder = RequestInfoProto.newBuilder()
      .setJournalId(convertJournalId(reqInfo.getJournalId()))
      .setEpoch(reqInfo.getEpoch())
      .setIpcSerialNumber(reqInfo.getIpcSerialNumber());
  if (reqInfo.hasCommittedTxId()) {
    builder.setCommittedTxId(reqInfo.getCommittedTxId());
  }
  return builder.build();
}
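// Note on the two convert() methods: together they round-trip the optional
// committedTxId field. The encoder above only sets the field on the wire when
// RequestInfo.hasCommittedTxId() is true, and the matching decoder earlier in
// this section maps an absent field back to the INVALID_TXID sentinel.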
@Override
public void startLogSegment(RequestInfo reqInfo, long txid, int layoutVersion)
    throws IOException {
  StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder()
      .setReqInfo(convert(reqInfo))
      .setTxid(txid).setLayoutVersion(layoutVersion)
      .build();
  try {
    rpcProxy.startLogSegment(NULL_CONTROLLER, req);
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
@Override
public void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
    long endTxId) throws IOException {
  FinalizeLogSegmentRequestProto req =
      FinalizeLogSegmentRequestProto.newBuilder()
          .setReqInfo(convert(reqInfo))
          .setStartTxId(startTxId)
          .setEndTxId(endTxId)
          .build();
  try {
    rpcProxy.finalizeLogSegment(NULL_CONTROLLER, req);
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
@Override
public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep)
    throws IOException {
  PurgeLogsRequestProto req = PurgeLogsRequestProto.newBuilder()
      .setReqInfo(convert(reqInfo))
      .setMinTxIdToKeep(minTxIdToKeep)
      .build();
  try {
    rpcProxy.purgeLogs(NULL_CONTROLLER, req);
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
@Override
public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
    long segmentTxId) throws IOException {
  try {
    return rpcProxy.prepareRecovery(NULL_CONTROLLER,
        PrepareRecoveryRequestProto.newBuilder()
            .setReqInfo(convert(reqInfo))
            .setSegmentTxId(segmentTxId)
            .build());
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
@Override
public void acceptRecovery(RequestInfo reqInfo,
    SegmentStateProto stateToAccept, URL fromUrl) throws IOException {
  try {
    rpcProxy.acceptRecovery(NULL_CONTROLLER,
        AcceptRecoveryRequestProto.newBuilder()
            .setReqInfo(convert(reqInfo))
            .setStateToAccept(stateToAccept)
            .setFromURL(fromUrl.toExternalForm())
            .build());
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
@Test
public void testSimpleCall() throws Exception {
  ch.sendEdits(1, 1, 3, FAKE_DATA).get();
  Mockito.verify(mockProxy).journal(Mockito.<RequestInfo>any(),
      Mockito.eq(1L), Mockito.eq(1L),
      Mockito.eq(3), Mockito.same(FAKE_DATA));
}
/**
 * Test that, once the queue eclipses the configured size limit,
 * calls to journal more data are rejected.
 */
@Test
public void testQueueLimiting() throws Exception {
  // Block the underlying fake proxy from actually completing any calls.
  DelayAnswer delayer = new DelayAnswer(LOG);
  Mockito.doAnswer(delayer).when(mockProxy).journal(
      Mockito.<RequestInfo>any(),
      Mockito.eq(1L), Mockito.eq(1L),
      Mockito.eq(1), Mockito.same(FAKE_DATA));

  // Queue up the maximum number of calls.
  int numToQueue = LIMIT_QUEUE_SIZE_BYTES / FAKE_DATA.length;
  for (int i = 1; i <= numToQueue; i++) {
    ch.sendEdits(1L, (long)i, 1, FAKE_DATA);
  }

  // The accounting should show the correct total number queued.
  assertEquals(LIMIT_QUEUE_SIZE_BYTES, ch.getQueuedEditsSize());

  // Trying to queue any more should fail.
  try {
    ch.sendEdits(1L, numToQueue + 1, 1, FAKE_DATA)
        .get(1, TimeUnit.SECONDS);
    fail("Did not fail to queue more calls after queue was full");
  } catch (ExecutionException ee) {
    if (!(ee.getCause() instanceof LoggerTooFarBehindException)) {
      throw ee;
    }
  }

  delayer.proceed();

  // After we allow it to proceed, it should chug through the original queue.
  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      return ch.getQueuedEditsSize() == 0;
    }
  }, 10, 1000);
}
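// Note on the test above: the exact-equality assertion on getQueuedEditsSize()
// holds only if LIMIT_QUEUE_SIZE_BYTES is an exact multiple of
// FAKE_DATA.length, since integer division makes the queued total
// numToQueue * FAKE_DATA.length; the test fixture is assumed to pick such a
// limit.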
@Test (timeout = 10000)
public void testMaintainCommittedTxId() throws Exception {
  journal.newEpoch(FAKE_NSINFO, 1);
  journal.startLogSegment(makeRI(1), 1,
      NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  // Send txids 1-3, with a request indicating only 0 committed.
  journal.journal(new RequestInfo(JID, 1, 2, 0), 1, 1, 3,
      QJMTestUtil.createTxnData(1, 3));
  assertEquals(0, journal.getCommittedTxnIdForTests());

  // Send 4-6, with request indicating that through 3 is committed.
  journal.journal(new RequestInfo(JID, 1, 3, 3), 1, 4, 3,
      QJMTestUtil.createTxnData(4, 6));
  assertEquals(3, journal.getCommittedTxnIdForTests());
}
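// For readability: judging from the convert() methods in this section, the
// RequestInfo constructor arguments above are (journalId, epoch,
// ipcSerialNumber, committedTxId). So the second journal() call in the test
// advances the IPC serial to 3 and tells the journal that txids through 3
// are committed, which is what the final assertion checks.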
private RequestInfo convert(
    QJournalProtocolProtos.RequestInfoProto reqInfo) {
  return new RequestInfo(
      reqInfo.getJournalId().getIdentifier(),
      reqInfo.getEpoch(),
      reqInfo.getIpcSerialNumber(),
      reqInfo.hasCommittedTxId() ?
          reqInfo.getCommittedTxId() : HdfsServerConstants.INVALID_TXID);
}