EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node JournalInfo journalInfo) // active name-node throws IOException { super(); this.bnRegistration = bnReg; this.journalInfo = journalInfo; InetSocketAddress bnAddress = NetUtils.createSocketAddr(bnRegistration.getAddress()); try { this.backupNode = NameNodeProxies.createNonHAProxy(new HdfsConfiguration(), bnAddress, JournalProtocol.class, UserGroupInformation.getCurrentUser(), true).getProxy(); } catch(IOException e) { Storage.LOG.error("Error connecting to: " + bnAddress, e); throw e; } this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE); this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE); }
/**
 * Starts a checkpoint for the given backup node while holding the write lock.
 * <p>
 * The CHECKPOINT operation category is checked both before and after the
 * lock is taken, and the call is rejected when the name-node safe-mode check
 * fails. The edit log is synced before the command is returned.
 *
 * @param backupNode registration of the node requesting the checkpoint
 * @param activeNamenode registration of the active name-node
 * @return the command produced by the image's {@code startCheckpoint}
 * @throws IOException if a check fails or the checkpoint cannot be started
 */
NamenodeCommand startCheckpoint(NamenodeRegistration backupNode,
    NamenodeRegistration activeNamenode) throws IOException {
  checkOperation(OperationCategory.CHECKPOINT);
  writeLock();
  try {
    // Checked again now that the lock is held.
    checkOperation(OperationCategory.CHECKPOINT);
    checkNameNodeSafeMode("Checkpoint not started");
    LOG.info("Start checkpoint for " + backupNode.getAddress());

    final NamenodeCommand checkpointCmd =
        getFSImage().startCheckpoint(backupNode, activeNamenode);
    getEditLog().logSync();
    return checkpointCmd;
  } finally {
    writeUnlock();
  }
}
/**
 * Registers a Backup name-node.
 * <p>
 * Under the write lock, the namespace ID of the registering node is compared
 * with the one held by this name-node's storage. On a match, a node whose
 * role is {@link NamenodeRole#BACKUP} is handed to the edit log for
 * registration; nodes with any other role are only verified.
 *
 * @param bnReg registration of the new BackupNode
 * @param nnReg registration of this NameNode
 * @throws IOException if the namespace IDs do not match
 */
void registerBackupNode(NamenodeRegistration bnReg,
    NamenodeRegistration nnReg) throws IOException {
  writeLock();
  try {
    if (getFSImage().getStorage().getNamespaceID()
        != bnReg.getNamespaceID()) {
      throw new IOException("Incompatible namespaceIDs: "
          + " Namenode namespaceID = "
          + getFSImage().getStorage().getNamespaceID() + "; "
          + bnReg.getRole() + " node namespaceID = "
          + bnReg.getNamespaceID());
    }

    if (bnReg.getRole() == NamenodeRole.BACKUP) {
      getFSImage().getEditLog().registerBackupNode(bnReg, nnReg);
    }
  } finally {
    writeUnlock();
  }
}
/**
 * Releases (unregisters) a backup node.
 * <p>
 * After verifying, under the write lock, that the node's namespace ID
 * matches this name-node's, the backup stream corresponding to the node is
 * released through the edit log.
 *
 * @param registration registration of the node being released
 * @throws IOException if the WRITE operation check fails or the namespace
 *                     IDs do not match
 */
void releaseBackupNode(NamenodeRegistration registration)
    throws IOException {
  checkOperation(OperationCategory.WRITE);
  writeLock();
  try {
    // Checked again now that the lock is held.
    checkOperation(OperationCategory.WRITE);

    if (getFSImage().getStorage().getNamespaceID()
        != registration.getNamespaceID()) {
      throw new IOException("Incompatible namespaceIDs: "
          + " Namenode namespaceID = "
          + getFSImage().getStorage().getNamespaceID() + "; "
          + registration.getRole() + " node namespaceID = "
          + registration.getNamespaceID());
    }

    getEditLog().releaseBackupStream(registration);
  } finally {
    writeUnlock();
  }
}
@Override // NamenodeProtocol public NamenodeCommand startCheckpoint(NamenodeRegistration registration) throws IOException { checkNNStartup(); namesystem.checkSuperuserPrivilege(); verifyRequest(registration); if(!nn.isRole(NamenodeRole.NAMENODE)) throw new IOException("Only an ACTIVE node can invoke startCheckpoint."); CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null); if (cacheEntry != null && cacheEntry.isSuccess()) { return (NamenodeCommand) cacheEntry.getPayload(); } NamenodeCommand ret = null; try { ret = namesystem.startCheckpoint(registration, nn.setRegistration()); } finally { RetryCache.setState(cacheEntry, ret != null, ret); } return ret; }
@Override // NamenodeProtocol public void endCheckpoint(NamenodeRegistration registration, CheckpointSignature sig) throws IOException { checkNNStartup(); namesystem.checkSuperuserPrivilege(); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response } boolean success = false; try { namesystem.endCheckpoint(registration, sig); success = true; } finally { RetryCache.setState(cacheEntry, success); } }
/** * Create (or find if already exists) an edit output stream, which * streams journal records (edits) to the specified backup node.<br> * * The new BackupNode will start receiving edits the next time this * NameNode's logs roll. * * @param bnReg the backup node registration information. * @param nnReg this (active) name-node registration. * @throws IOException */ synchronized void registerBackupNode( NamenodeRegistration bnReg, // backup node NamenodeRegistration nnReg) // active name-node throws IOException { if(bnReg.isRole(NamenodeRole.CHECKPOINT)) return; // checkpoint node does not stream edits JournalManager jas = findBackupJournal(bnReg); if (jas != null) { // already registered LOG.info("Backup node " + bnReg + " re-registers"); return; } LOG.info("Registering new backup node: " + bnReg); BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg); synchronized(journalSetLock) { journalSet.add(bjm, false); } }
/**
 * Checks that a {@link NamenodeRegistration} keeps all of its compared
 * fields after being converted to its protobuf form and back.
 */
@Test
public void testConvertNamenodeRegistration() {
  StorageInfo storageInfo = getStorageInfo(NodeType.NAME_NODE);
  NamenodeRegistration expected = new NamenodeRegistration(
      "address:999", "http:1000", storageInfo, NamenodeRole.NAMENODE);

  // Round trip: registration -> proto -> registration.
  NamenodeRegistrationProto proto = PBHelper.convert(expected);
  NamenodeRegistration actual = PBHelper.convert(proto);

  assertEquals(expected.getAddress(), actual.getAddress());
  assertEquals(expected.getClusterID(), actual.getClusterID());
  assertEquals(expected.getCTime(), actual.getCTime());
  assertEquals(expected.getHttpAddress(), actual.getHttpAddress());
  assertEquals(expected.getLayoutVersion(), actual.getLayoutVersion());
  assertEquals(expected.getNamespaceID(), actual.getNamespaceID());
  assertEquals(expected.getRegistrationID(), actual.getRegistrationID());
  assertEquals(expected.getRole(), actual.getRole());
  assertEquals(expected.getVersion(), actual.getVersion());
}
/**
 * Starts a checkpoint for the given backup node while holding the write lock,
 * passing this namesystem's effective layout version to the image.
 * <p>
 * The CHECKPOINT operation category is checked both before and after the
 * lock is taken, and the call is rejected when the name-node safe-mode check
 * fails. The edit log is synced before the command is returned.
 *
 * @param backupNode registration of the node requesting the checkpoint
 * @param activeNamenode registration of the active name-node
 * @return the command produced by the image's {@code startCheckpoint}
 * @throws IOException if a check fails or the checkpoint cannot be started
 */
NamenodeCommand startCheckpoint(NamenodeRegistration backupNode,
    NamenodeRegistration activeNamenode) throws IOException {
  checkOperation(OperationCategory.CHECKPOINT);
  writeLock();
  try {
    // Checked again now that the lock is held.
    checkOperation(OperationCategory.CHECKPOINT);
    checkNameNodeSafeMode("Checkpoint not started");
    LOG.info("Start checkpoint for " + backupNode.getAddress());

    final NamenodeCommand checkpointCmd = getFSImage().startCheckpoint(
        backupNode, activeNamenode, getEffectiveLayoutVersion());
    getEditLog().logSync();
    return checkpointCmd;
  } finally {
    writeUnlock();
  }
}
/**
 * Starts a checkpoint for the given backup node, using the retry cache.
 * <p>
 * After the initial CHECKPOINT operation check, a response stored in the
 * retry cache for a previously successful call is returned as-is. Otherwise
 * the checkpoint is started under the write lock, the edit log is synced,
 * and the outcome is recorded in the retry cache once the lock has been
 * released.
 *
 * @param backupNode registration of the node requesting the checkpoint
 * @param activeNamenode registration of the active name-node
 * @return the checkpoint command, either cached or freshly produced
 * @throws IOException if a check fails or the checkpoint cannot be started
 */
NamenodeCommand startCheckpoint(NamenodeRegistration backupNode,
    NamenodeRegistration activeNamenode) throws IOException {
  checkOperation(OperationCategory.CHECKPOINT);

  CacheEntryWithPayload cachedCall =
      RetryCache.waitForCompletion(retryCache, null);
  if (cachedCall != null && cachedCall.isSuccess()) {
    // Return previous response
    return (NamenodeCommand) cachedCall.getPayload();
  }

  writeLock();
  NamenodeCommand checkpointCmd = null;
  try {
    // Checked again now that the lock is held.
    checkOperation(OperationCategory.CHECKPOINT);
    checkNameNodeSafeMode("Checkpoint not started");
    LOG.info("Start checkpoint for " + backupNode.getAddress());

    checkpointCmd = getFSImage().startCheckpoint(backupNode, activeNamenode);
    getEditLog().logSync();
    return checkpointCmd;
  } finally {
    writeUnlock();
    // A null command means the checkpoint was not started.
    RetryCache.setState(cachedCall, checkpointCmd != null, checkpointCmd);
  }
}
void endCheckpoint(NamenodeRegistration registration, CheckpointSignature sig) throws IOException { checkOperation(OperationCategory.CHECKPOINT); CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response } boolean success = false; readLock(); try { checkOperation(OperationCategory.CHECKPOINT); checkNameNodeSafeMode("Checkpoint not ended"); LOG.info("End checkpoint for " + registration.getAddress()); getFSImage().endCheckpoint(sig); success = true; } finally { readUnlock(); RetryCache.setState(cacheEntry, success); } }
/**
 * Starts a checkpoint for the given backup node, using the retry cache.
 * <p>
 * After the initial CHECKPOINT operation check, a response stored in the
 * retry cache for a previously successful call is returned as-is. Otherwise
 * the checkpoint is started under the write lock unless the namesystem is in
 * safe mode, the edit log is synced, and the outcome is recorded in the
 * retry cache once the lock has been released.
 *
 * @param backupNode registration of the node requesting the checkpoint
 * @param activeNamenode registration of the active name-node
 * @return the checkpoint command, either cached or freshly produced
 * @throws SafeModeException if the namesystem is in safe mode
 * @throws IOException if a check fails or the checkpoint cannot be started
 */
NamenodeCommand startCheckpoint(NamenodeRegistration backupNode,
    NamenodeRegistration activeNamenode) throws IOException {
  checkOperation(OperationCategory.CHECKPOINT);

  CacheEntryWithPayload cachedCall =
      RetryCache.waitForCompletion(retryCache, null);
  if (cachedCall != null && cachedCall.isSuccess()) {
    // Return previous response
    return (NamenodeCommand) cachedCall.getPayload();
  }

  writeLock();
  NamenodeCommand checkpointCmd = null;
  try {
    // Checked again now that the lock is held.
    checkOperation(OperationCategory.CHECKPOINT);
    if (isInSafeMode()) {
      throw new SafeModeException("Checkpoint not started", safeMode);
    }
    LOG.info("Start checkpoint for " + backupNode.getAddress());

    checkpointCmd = getFSImage().startCheckpoint(backupNode, activeNamenode);
    getEditLog().logSync();
    return checkpointCmd;
  } finally {
    writeUnlock();
    // A null command means the checkpoint was not started.
    RetryCache.setState(cachedCall, checkpointCmd != null, checkpointCmd);
  }
}
void endCheckpoint(NamenodeRegistration registration, CheckpointSignature sig) throws IOException { CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); if (cacheEntry != null && cacheEntry.isSuccess()) { return; // Return previous response } checkOperation(OperationCategory.CHECKPOINT); boolean success = false; readLock(); try { checkOperation(OperationCategory.CHECKPOINT); if (isInSafeMode()) { throw new SafeModeException("Checkpoint not ended", safeMode); } LOG.info("End checkpoint for " + registration.getAddress()); getFSImage().endCheckpoint(sig); success = true; } finally { readUnlock(); RetryCache.setState(cacheEntry, success); } }
/**
 * Releases (unregisters) a backup node.
 * <p>
 * After verifying, under the write lock, that the node's namespace ID
 * matches this name-node's, the backup stream corresponding to the node is
 * released through the edit log.
 *
 * @param registration registration of the node being released
 * @throws IOException if the WRITE operation check fails or the namespace
 *                     IDs do not match
 */
void releaseBackupNode(NamenodeRegistration registration)
    throws IOException {
  checkOperation(OperationCategory.WRITE);
  writeLock();
  try {
    // Checked again now that the lock is held.
    checkOperation(OperationCategory.WRITE);

    if (getFSImage().getStorage().getNamespaceID()
        != registration.getNamespaceID()) {
      throw new IOException("Incompatible namespaceIDs: "
          + " Namenode namespaceID = "
          + getFSImage().getStorage().getNamespaceID() + "; "
          + registration.getRole() + " node namespaceID = "
          + registration.getNamespaceID());
    }

    getEditLog().releaseBackupStream(registration);
  } finally {
    writeUnlock();
  }
}
/** * Create (or find if already exists) an edit output stream, which * streams journal records (edits) to the specified backup node.<br> * * The new BackupNode will start receiving edits the next time this * NameNode's logs roll. * * @param bnReg the backup node registration information. * @param nnReg this (active) name-node registration. * @throws IOException */ synchronized void registerBackupNode( NamenodeRegistration bnReg, // backup node NamenodeRegistration nnReg) // active name-node throws IOException { if(bnReg.isRole(NamenodeRole.CHECKPOINT)) return; // checkpoint node does not stream edits JournalManager jas = findBackupJournal(bnReg); if (jas != null) { // already registered LOG.info("Backup node " + bnReg + " re-registers"); return; } LOG.info("Registering new backup node: " + bnReg); BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg); journalSet.add(bjm, true); }
/**
 * Checks that a {@link NamenodeRegistration} keeps all of its compared
 * fields after being converted to its protobuf form and back.
 */
@Test
public void testConvertNamenodeRegistration() {
  StorageInfo storageInfo = getStorageInfo();
  NamenodeRegistration expected = new NamenodeRegistration(
      "address:999", "http:1000", storageInfo, NamenodeRole.NAMENODE);

  // Round trip: registration -> proto -> registration.
  NamenodeRegistrationProto proto = PBHelper.convert(expected);
  NamenodeRegistration actual = PBHelper.convert(proto);

  assertEquals(expected.getAddress(), actual.getAddress());
  assertEquals(expected.getClusterID(), actual.getClusterID());
  assertEquals(expected.getCTime(), actual.getCTime());
  assertEquals(expected.getHttpAddress(), actual.getHttpAddress());
  assertEquals(expected.getLayoutVersion(), actual.getLayoutVersion());
  assertEquals(expected.getNamespaceID(), actual.getNamespaceID());
  assertEquals(expected.getRegistrationID(), actual.getRegistrationID());
  assertEquals(expected.getRole(), actual.getRole());
  assertEquals(expected.getVersion(), actual.getVersion());
}