private void processQueuedMessages(Iterable<ReportedBlockInfo> rbis) throws IOException { for (ReportedBlockInfo rbi : rbis) { if (LOG.isDebugEnabled()) { LOG.debug("Processing previouly queued message " + rbi); } if (rbi.getReportedState() == null) { // This is a DELETE_BLOCK request DatanodeStorageInfo storageInfo = rbi.getStorageInfo(); removeStoredBlock(rbi.getBlock(), storageInfo.getDatanodeDescriptor()); } else { processAndHandleReportedBlock(rbi.getStorageInfo(), rbi.getBlock(), rbi.getReportedState(), null); } } }
@Test public void testQueues() { DatanodeDescriptor fakeDN = DFSTestUtil.getLocalDatanodeDescriptor(); DatanodeStorage storage = new DatanodeStorage("STORAGE_ID"); DatanodeStorageInfo storageInfo = new DatanodeStorageInfo(fakeDN, storage); msgs.enqueueReportedBlock(storageInfo, block1Gs1, ReplicaState.FINALIZED); msgs.enqueueReportedBlock(storageInfo, block1Gs2, ReplicaState.FINALIZED); assertEquals(2, msgs.count()); // Nothing queued yet for block 2 assertNull(msgs.takeBlockQueue(block2Gs1)); assertEquals(2, msgs.count()); Queue<ReportedBlockInfo> q = msgs.takeBlockQueue(block1Gs2DifferentInstance); assertEquals( "ReportedBlockInfo [block=blk_1_1, dn=127.0.0.1:50010, reportedState=FINALIZED]," + "ReportedBlockInfo [block=blk_1_2, dn=127.0.0.1:50010, reportedState=FINALIZED]", Joiner.on(",").join(q)); assertEquals(0, msgs.count()); // Should be null if we pull again assertNull(msgs.takeBlockQueue(block1Gs1)); assertEquals(0, msgs.count()); }
private void processQueuedMessages(Iterable<ReportedBlockInfo> rbis) throws IOException { for (ReportedBlockInfo rbi : rbis) { if (LOG.isDebugEnabled()) { LOG.debug("Processing previouly queued message " + rbi); } if (rbi.getReportedState() == null) { // This is a DELETE_BLOCK request DatanodeStorageInfo storageInfo = rbi.getStorageInfo(); removeStoredBlock(getStoredBlock(rbi.getBlock()), storageInfo.getDatanodeDescriptor()); } else { processAndHandleReportedBlock(rbi.getStorageInfo(), rbi.getBlock(), rbi.getReportedState(), null); } } }
@Test public void testQueues() { DatanodeDescriptor fakeDN = DFSTestUtil.getLocalDatanodeDescriptor(); msgs.enqueueReportedBlock(fakeDN, block1Gs1, ReplicaState.FINALIZED); msgs.enqueueReportedBlock(fakeDN, block1Gs2, ReplicaState.FINALIZED); assertEquals(2, msgs.count()); // Nothing queued yet for block 2 assertNull(msgs.takeBlockQueue(block2Gs1)); assertEquals(2, msgs.count()); Queue<ReportedBlockInfo> q = msgs.takeBlockQueue(block1Gs2DifferentInstance); assertEquals( "ReportedBlockInfo [block=blk_1_1, dn=127.0.0.1:50010, reportedState=FINALIZED]," + "ReportedBlockInfo [block=blk_1_2, dn=127.0.0.1:50010, reportedState=FINALIZED]", Joiner.on(",").join(q)); assertEquals(0, msgs.count()); // Should be null if we pull again assertNull(msgs.takeBlockQueue(block1Gs1)); assertEquals(0, msgs.count()); }
@Test public void testQueues() { DatanodeDescriptor fakeDN = DFSTestUtil.getLocalDatanodeDescriptor(); msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs1, ReplicaState.FINALIZED); msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs2, ReplicaState.FINALIZED); assertEquals(2, msgs.count()); // Nothing queued yet for block 2 assertNull(msgs.takeBlockQueue(block2Gs1)); assertEquals(2, msgs.count()); Queue<ReportedBlockInfo> q = msgs.takeBlockQueue(block1Gs2DifferentInstance); assertEquals( "ReportedBlockInfo [block=blk_1_1, dn=127.0.0.1:50010, reportedState=FINALIZED]," + "ReportedBlockInfo [block=blk_1_2, dn=127.0.0.1:50010, reportedState=FINALIZED]", Joiner.on(",").join(q)); assertEquals(0, msgs.count()); // Should be null if we pull again assertNull(msgs.takeBlockQueue(block1Gs1)); assertEquals(0, msgs.count()); }
/** * Try to process any messages that were previously queued for the given * block. This is called from FSEditLogLoader whenever a block's state * in the namespace has changed or a new block has been created. */ public void processQueuedMessagesForBlock(Block b) throws IOException { Queue<ReportedBlockInfo> queue = pendingDNMessages.takeBlockQueue(b); if (queue == null) { // Nothing to re-process return; } processQueuedMessages(queue); }
private void processQueuedMessages(Iterable<ReportedBlockInfo> rbis) throws IOException { for (ReportedBlockInfo rbi : rbis) { if (LOG.isDebugEnabled()) { LOG.debug("Processing previouly queued message " + rbi); } processAndHandleReportedBlock(rbi.getStorageInfo(), rbi.getBlock(), rbi.getReportedState(), null); } }
private void processQueuedMessages(Iterable<ReportedBlockInfo> rbis) throws IOException { for (ReportedBlockInfo rbi : rbis) { if (LOG.isDebugEnabled()) { LOG.debug("Processing previouly queued message " + rbi); } processAndHandleReportedBlock( rbi.getNode(), rbi.getBlock(), rbi.getReportedState(), null); } }
private void processQueuedMessages(Iterable<ReportedBlockInfo> rbis) throws IOException { for (ReportedBlockInfo rbi : rbis) { if (LOG.isDebugEnabled()) { LOG.debug("Processing previouly queued message " + rbi); } processAndHandleReportedBlock(rbi.getNode(), rbi.getStorageID(), rbi.getBlock(), rbi.getReportedState(), null); } }