/**
 * Verifies a journal request.
 * <p>
 * The layout version is checked through {@code verifyLayoutVersion}; the
 * namespace ID and cluster ID carried by the request are then compared with
 * the values reported by {@code namesystem}. A mismatch is logged at WARN
 * level and the request is rejected.
 *
 * @param journalInfo identity information supplied with the journal request
 * @throws UnregisteredNodeException if the namespace ID or the cluster ID in
 *         the request differs from the one held by {@code namesystem}
 * @throws IOException declared for {@code verifyLayoutVersion}; its failure
 *         conditions are not visible from this method
 */
private void verifyJournalRequest(JournalInfo journalInfo)
    throws IOException {
  verifyLayoutVersion(journalInfo.getLayoutVersion());

  int expectedNamespaceID = namesystem.getNamespaceInfo().getNamespaceID();
  if (journalInfo.getNamespaceId() != expectedNamespaceID) {
    String errorMsg = "Invalid namespaceID in journal request - expected "
        + expectedNamespaceID + " actual " + journalInfo.getNamespaceId();
    LOG.warn(errorMsg);
    throw new UnregisteredNodeException(journalInfo);
  }

  if (!journalInfo.getClusterId().equals(namesystem.getClusterId())) {
    // "expected" is the local namesystem's value and "actual" is what the
    // request carried, matching the namespaceID message above.
    String errorMsg = "Invalid clusterId in journal request - expected "
        + namesystem.getClusterId() + " actual "
        + journalInfo.getClusterId();
    LOG.warn(errorMsg);
    throw new UnregisteredNodeException(journalInfo);
  }
}
EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node JournalInfo journalInfo) // active name-node throws IOException { super(); this.bnRegistration = bnReg; this.journalInfo = journalInfo; InetSocketAddress bnAddress = NetUtils.createSocketAddr(bnRegistration.getAddress()); try { this.backupNode = NameNodeProxies.createNonHAProxy(new HdfsConfiguration(), bnAddress, JournalProtocol.class, UserGroupInformation.getCurrentUser(), true).getProxy(); } catch(IOException e) { Storage.LOG.error("Error connecting to: " + bnAddress, e); throw e; } this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE); this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE); }
/**
 * Sends a batch of edit records to the remote journal.
 * <p>
 * The arguments are packed into a {@code JournalRequestProto} and passed
 * to {@code rpcProxy}; a {@code ServiceException} from the RPC layer is
 * translated by {@code ProtobufHelper.getRemoteException}.
 *
 * @param journalInfo journal information attached to the request
 * @param epoch epoch value attached to the request
 * @param firstTxnId transaction ID of the first record in {@code records}
 * @param numTxns number of transactions contained in {@code records}
 * @param records serialized edit records
 * @throws IOException if the remote call fails
 */
@Override
public void journal(JournalInfo journalInfo, long epoch, long firstTxnId,
    int numTxns, byte[] records) throws IOException {
  final JournalRequestProto.Builder request =
      JournalRequestProto.newBuilder();
  request.setJournalInfo(PBHelper.convert(journalInfo));
  request.setEpoch(epoch);
  request.setFirstTxnId(firstTxnId);
  request.setNumTxns(numTxns);
  request.setRecords(PBHelper.getByteString(records));
  final JournalRequestProto message = request.build();

  try {
    rpcProxy.journal(NULL_CONTROLLER, message);
  } catch (ServiceException se) {
    throw ProtobufHelper.getRemoteException(se);
  }
}
/**
 * Sends a batch of edit records to the remote journal.
 * <p>
 * Builds a {@code JournalRequestProto} from the arguments (the record
 * bytes are wrapped via {@code PBHelperClient.getByteString}) and invokes
 * {@code rpcProxy}. A {@code ServiceException} is translated by
 * {@code ProtobufHelper.getRemoteException}.
 *
 * @param journalInfo journal information attached to the request
 * @param epoch epoch value attached to the request
 * @param firstTxnId transaction ID of the first record in {@code records}
 * @param numTxns number of transactions contained in {@code records}
 * @param records serialized edit records
 * @throws IOException if the remote call fails
 */
@Override
public void journal(JournalInfo journalInfo, long epoch, long firstTxnId,
    int numTxns, byte[] records) throws IOException {
  final JournalRequestProto.Builder request =
      JournalRequestProto.newBuilder();
  request.setJournalInfo(PBHelper.convert(journalInfo));
  request.setEpoch(epoch);
  request.setFirstTxnId(firstTxnId);
  request.setNumTxns(numTxns);
  request.setRecords(PBHelperClient.getByteString(records));
  final JournalRequestProto message = request.build();

  try {
    rpcProxy.journal(NULL_CONTROLLER, message);
  } catch (ServiceException se) {
    throw ProtobufHelper.getRemoteException(se);
  }
}
/**
 * Handles notification that a new log segment has been started.
 * <p>
 * The call is first checked against the JOURNAL operation category and the
 * request is validated with {@code verifyJournalRequest}; the backup image
 * is then told about the new segment. The {@code epoch} argument is not
 * used by this implementation.
 *
 * @param journalInfo journal information supplied by the caller
 * @param epoch epoch supplied by the caller (unused here)
 * @param txid transaction ID passed to the backup image
 * @throws IOException if the operation check, the request verification or
 *         the backup image rejects the call
 */
@Override
public void startLogSegment(JournalInfo journalInfo, long epoch, long txid)
    throws IOException {
  // Gate: operation category must be permitted before anything else runs.
  namesystem.checkOperation(OperationCategory.JOURNAL);

  // Gate: the request must belong to this namespace / cluster.
  verifyJournalRequest(journalInfo);

  getBNImage().namenodeStartedLogSegment(txid);
}
/**
 * Receives a batch of edit records and hands it to the backup image.
 * <p>
 * The call is checked against the JOURNAL operation category and validated
 * with {@code verifyJournalRequest} before the records are forwarded. The
 * {@code epoch} argument is not used by this implementation.
 *
 * @param journalInfo journal information supplied by the caller
 * @param epoch epoch supplied by the caller (unused here)
 * @param firstTxId transaction ID of the first record in {@code records}
 * @param numTxns number of transactions contained in {@code records}
 * @param records serialized edit records
 * @throws IOException if the operation check, the request verification or
 *         the backup image rejects the call
 */
@Override
public void journal(JournalInfo journalInfo, long epoch, long firstTxId,
    int numTxns, byte[] records) throws IOException {
  // Gate: operation category must be permitted before anything else runs.
  namesystem.checkOperation(OperationCategory.JOURNAL);

  // Gate: the request must belong to this namespace / cluster.
  verifyJournalRequest(journalInfo);

  getBNImage().journal(firstTxId, numTxns, records);
}
/**
 * Rejects a fence request.
 * <p>
 * The attempt is logged at INFO level together with the fencer info and
 * epoch, after which the call always fails.
 *
 * @param journalInfo journal information supplied by the caller (unused)
 * @param epoch epoch supplied by the caller; only logged
 * @param fencerInfo description of the fencer; only logged
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public FenceResponse fence(JournalInfo journalInfo, long epoch,
    String fencerInfo) throws IOException {
  final String notice = "Fenced by " + fencerInfo + " with epoch " + epoch;
  LOG.info(notice);

  final String reason = "BackupNode does not support fence";
  throw new UnsupportedOperationException(reason);
}
/**
 * Tells the remote journal that a new log segment has started.
 * <p>
 * The arguments are packed into a {@code StartLogSegmentRequestProto} and
 * passed to {@code rpcProxy}; a {@code ServiceException} from the RPC layer
 * is translated by {@code ProtobufHelper.getRemoteException}.
 *
 * @param journalInfo journal information attached to the request
 * @param epoch epoch value attached to the request
 * @param txid transaction ID attached to the request
 * @throws IOException if the remote call fails
 */
@Override
public void startLogSegment(JournalInfo journalInfo, long epoch, long txid)
    throws IOException {
  final StartLogSegmentRequestProto.Builder request =
      StartLogSegmentRequestProto.newBuilder();
  request.setJournalInfo(PBHelper.convert(journalInfo));
  request.setEpoch(epoch);
  request.setTxid(txid);
  final StartLogSegmentRequestProto message = request.build();

  try {
    rpcProxy.startLogSegment(NULL_CONTROLLER, message);
  } catch (ServiceException se) {
    throw ProtobufHelper.getRemoteException(se);
  }
}
/**
 * Sends a fence request to the remote journal and returns its answer.
 * <p>
 * The epoch, journal info and (when non-null) fencer info are packed into
 * a {@code FenceRequestProto} and passed to {@code rpcProxy}. The reply's
 * previous epoch, last transaction ID and in-sync flag are copied into a
 * {@link FenceResponse}. A {@code ServiceException} from the RPC layer is
 * translated by {@code ProtobufHelper.getRemoteException}.
 *
 * @param journalInfo journal information attached to the request
 * @param epoch epoch value attached to the request
 * @param fencerInfo description of the fencer; omitted from the request
 *        when {@code null}
 * @return the response built from the remote reply
 * @throws IOException if the remote call fails
 */
@Override
public FenceResponse fence(JournalInfo journalInfo, long epoch,
    String fencerInfo) throws IOException {
  FenceRequestProto.Builder builder = FenceRequestProto.newBuilder()
      .setEpoch(epoch)
      .setJournalInfo(PBHelper.convert(journalInfo));
  // Previously the fencer info was dropped and never reached the receiver.
  // Generated protobuf setters reject null, so only set it when present.
  // NOTE(review): relies on FenceRequestProto declaring a fencerInfo
  // field - confirm against the .proto definition.
  if (fencerInfo != null) {
    builder.setFencerInfo(fencerInfo);
  }
  FenceRequestProto req = builder.build();

  try {
    FenceResponseProto resp = rpcProxy.fence(NULL_CONTROLLER, req);
    return new FenceResponse(resp.getPreviousEpoch(),
        resp.getLastTransactionId(), resp.getInSync());
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}