/**
 * Create an inline-checksum block file in a random volume.
 *
 * The file is named via getInlineBlockFileName and filled with as many NUL
 * characters as BlockInlineChecksumReader.getFileLengthFromBlockSize(1, 1,
 * checksumSize) reports.
 *
 * @param checksumType checksum type used to derive the checksum size and the
 *                     file name
 * @return id of the block whose file was created
 * @throws IOException if the file cannot be created or written
 */
private long createInlineBlockFile(int checksumType) throws IOException {
  FSVolume[] volumes = data.volumes.getVolumes();
  // Random.nextInt(bound) excludes the bound, so the full length is needed to
  // make every volume eligible (and to avoid a zero bound with one volume).
  int index = rand.nextInt(volumes.length);
  long id = getFreeBlockId();
  File finalizedDir = volumes[index].getNamespaceSlice(nsid).getCurrentDir();
  int checksumSize = DataChecksum.getChecksumSizeByType(checksumType);
  String inlineFileName = getInlineBlockFileName(id, checksumType,
      checksumSize);
  File file = new File(finalizedDir, inlineFileName);
  assertTrue(file.createNewFile());
  int desiredLength = (int) BlockInlineChecksumReader
      .getFileLengthFromBlockSize(1, 1, checksumSize);
  PrintWriter pw = new PrintWriter(file);
  try {
    for (int i = 0; i < desiredLength; i++) {
      // getNumericValue('0') is 0, i.e. a NUL character is written
      pw.write(Character.getNumericValue('0'));
    }
  } finally {
    // always release the file handle, even if writing fails
    pw.close();
  }
  LOG.info("Created block file " + file.getName());
  return id;
}
/**
 * Build a DiskScanInfo for a block stored with separate block and meta files.
 *
 * Either file info may be null. A missing meta file yields a null meta file
 * and Block.GRANDFATHER_GENERATION_STAMP; a missing block file yields a null
 * block file and a length of 0.
 *
 * @param blockId       id of the block
 * @param blockFileInfo info about the block file, or null
 * @param metaFileInfo  info about the meta file, or null
 * @param vol           volume passed through to the result
 * @return scan info tagged with SEPARATE_FILES_LAYOUT
 */
static DiskScanInfo getSeparateFilesLayoutScanInfo(long blockId,
    FileInfo blockFileInfo, FileInfo metaFileInfo, FSVolume vol) {
  final boolean hasMeta = metaFileInfo != null;
  final File metaFile = hasMeta ? metaFileInfo.file : null;
  final long genStamp = hasMeta
      ? metaFileInfo.getStamp
      : Block.GRANDFATHER_GENERATION_STAMP;

  final boolean hasBlock = blockFileInfo != null;
  final File blockFile = hasBlock ? blockFileInfo.file : null;
  final long fileLength = hasBlock ? blockFile.length() : 0;

  return new DiskScanInfo(SEPARATE_FILES_LAYOUT, blockId, blockFile,
      metaFile, vol, fileLength, genStamp);
}
/**
 * Build a DiskScanInfo for a block stored as a single inline-checksum file.
 *
 * The file name is split on '_' and must have exactly six parts; parts 4 and
 * 5 (zero-based) are parsed as the checksum type and bytes per checksum. The
 * same file is used as both block file and meta file of the result.
 *
 * @param blockId        id of the block
 * @param singleFileInfo info about the inline-checksum file
 * @param vol            volume passed through to the result
 * @return scan info tagged with INLINE_CHECKSUM_LAYOUT
 * @throws IllegalStateException if the file name does not have six parts
 */
static DiskScanInfo getInlineFilesLayoutScanInfo(long blockId,
    FileInfo singleFileInfo, FSVolume vol) {
  final String[] nameParts = StringUtils.split(singleFileInfo.fileName, '_');
  if (nameParts.length != 6) {
    throw new IllegalStateException("FileName \"" + singleFileInfo.fileName
        + "\" doesn't " + "reflect new layout format!");
  }

  final int checksumType = Integer.parseInt(nameParts[4]);
  final int bytesPerChecksum = Integer.parseInt(nameParts[5]);
  final long onDiskLength = singleFileInfo.file.length();
  final long blockLength = BlockInlineChecksumReader
      .getBlockSizeFromFileLength(onDiskLength, checksumType,
          bytesPerChecksum);

  return new DiskScanInfo(INLINE_CHECKSUM_LAYOUT, blockId,
      singleFileInfo.file, singleFileInfo.file, vol, blockLength,
      singleFileInfo.getStamp);
}
/** * Returned information is a JSON representation of a map with * volume name as the key and value is a map of volume attribute * keys to its values */ @Override // DataNodeMXBean public String getVolumeInfo() { final Map<String, Object> info = new HashMap<String, Object>(); try { FSVolume[] volumes = ((FSDataset)this.data).volumes.getVolumes(); for (FSVolume v : volumes) { final Map<String, Object> innerInfo = new HashMap<String, Object>(); innerInfo.put("usedSpace", v.getDfsUsed()); innerInfo.put("freeSpace", v.getAvailable()); innerInfo.put("reservedSpace", v.getReserved()); info.put(v.getDir().toString(), innerInfo); } return JSON.toString(info); } catch (IOException e) { LOG.info("Cannot get volume info.", e); return "ERROR"; } }
/**
 * Remove from blockInfoMap every block whose data file lives on one of the
 * given failed volumes.
 *
 * @param failed_vols  volumes considered failed; compared by identity
 * @param datasetDelta if non-null, notified of each removed block
 * @return number of blocks removed
 */
synchronized int removeUnhealthyVolumes(Collection<FSVolume> failed_vols,
    FSDatasetDeltaInterface datasetDelta) {
  int removedCount = 0;
  for (Iterator<Entry<Block, DatanodeBlockInfo>> it =
      blockInfoMap.entrySet().iterator(); it.hasNext();) {
    Entry<Block, DatanodeBlockInfo> current = it.next();
    for (FSVolume failed : failed_vols) {
      if (current.getValue().getBlockDataFile().getVolume() != failed) {
        continue;
      }
      DataNode.LOG.warn("removing block " + current.getKey().getBlockId()
          + " from vol " + failed.toString() + ", form namespace: "
          + namespaceId);
      it.remove();
      if (datasetDelta != null) {
        datasetDelta.removeBlock(namespaceId, current.getKey());
      }
      removedCount++;
      // the entry is gone; no need to look at the remaining failed volumes
      break;
    }
  }
  return removedCount;
}
/**
 * Get the block info with CRC information, grouped per FS volume.
 *
 * @param volumes
 *          the volumes for which the listing is wanted
 * @return a map from FSVolume to a list of per-bucket maps
 *         (Block -> DatanodeBlockInfo). The list holds numBucket entries
 *         and its index is the bucket ID. Each bucket in blockBuckets is
 *         asked to fill in its own part of the map.
 */
Map<FSVolume, List<Map<Block, DatanodeBlockInfo>>> getBlockCrcPerVolume(
    List<FSVolume> volumes) {
  Map<FSVolume, List<Map<Block, DatanodeBlockInfo>>> result =
      new HashMap<FSVolume, List<Map<Block, DatanodeBlockInfo>>>();

  // pre-create one empty map per bucket for every requested volume
  for (FSVolume vol : volumes) {
    List<Map<Block, DatanodeBlockInfo>> perBucket =
        new ArrayList<Map<Block, DatanodeBlockInfo>>(numBucket);
    for (int bucket = 0; bucket < numBucket; bucket++) {
      perBucket.add(new HashMap<Block, DatanodeBlockInfo>());
    }
    result.put(vol, perBucket);
  }

  for (BlockBucket bucket : blockBuckets) {
    bucket.getBlockCrcPerVolume(result);
  }
  return result;
}
private void setup(FSDataset dataSet) throws IOException { // setup replicas map ReplicasMap replicasMap = dataSet.volumeMap; FSVolume vol = dataSet.volumes.getNextVolume(0); ReplicaInfo replicaInfo = new FinalizedReplica( blocks[FINALIZED], vol, vol.getDir()); replicasMap.add(replicaInfo); replicaInfo.getBlockFile().createNewFile(); replicaInfo.getMetaFile().createNewFile(); replicasMap.add(new ReplicaInPipeline( blocks[TEMPORARY].getBlockId(), blocks[TEMPORARY].getGenerationStamp(), vol, vol.createTmpFile(blocks[TEMPORARY]).getParentFile())); replicaInfo = new ReplicaBeingWritten(blocks[RBW], vol, vol.createRbwFile(blocks[RBW]).getParentFile(), null); replicasMap.add(replicaInfo); replicaInfo.getBlockFile().createNewFile(); replicaInfo.getMetaFile().createNewFile(); replicasMap.add(new ReplicaWaitingToBeRecovered(blocks[RWR], vol, vol.createRbwFile(blocks[RWR]).getParentFile())); replicasMap.add(new ReplicaUnderRecovery( new FinalizedReplica(blocks[RUR], vol, vol.getDir()), 2007)); }
/**
 * Remove, from every namespace in namespaceMap, all blocks whose volume is
 * one of the given failed volumes.
 *
 * @param failed_vols volumes considered failed; compared by identity
 * @return total number of blocks removed across all namespaces
 */
synchronized int removeUnhealthyVolumes(Collection<FSVolume> failed_vols) {
  int removedCount = 0;
  for (Integer namespaceId : namespaceMap.keySet()) {
    Map<Block, DatanodeBlockInfo> blocksOfNamespace =
        namespaceMap.get(namespaceId);
    for (Iterator<Entry<Block, DatanodeBlockInfo>> it =
        blocksOfNamespace.entrySet().iterator(); it.hasNext();) {
      Entry<Block, DatanodeBlockInfo> current = it.next();
      for (FSVolume failed : failed_vols) {
        if (current.getValue().getVolume() != failed) {
          continue;
        }
        DataNode.LOG.warn("removing block " + current.getKey().getBlockId()
            + " from vol " + failed.toString() + ", form namespace: "
            + namespaceId);
        it.remove();
        removedCount++;
        // entry removed; skip the remaining failed volumes
        break;
      }
    }
  }
  return removedCount;
}
/**
 * Start a MiniDFSCluster with a small block size and wire up the static
 * fixtures (datanode, namespace id, directory scanner, dataset, scanner
 * delta and file system) used by the tests. The dataset's async disk
 * service is replaced by a mock built over the volume directories. Any
 * exception during setup fails the test class.
 */
@BeforeClass
public static void setUpCluster() {
  LOG.info("setting up!");
  Configuration conf = new Configuration();
  conf.setLong("dfs.block.size", 100);
  conf.setInt("io.bytes.per.checksum", 1);
  conf.setLong("dfs.heartbeat.interval", 1L);
  conf.setInt("dfs.datanode.directoryscan.interval", 1000);

  try {
    cluster = new MiniDFSCluster(conf, 1, true, null);
    cluster.waitActive();

    dn = cluster.getDataNodes().get(0);
    nsid = dn.getAllNamespaces()[0];
    scanner = dn.directoryScanner;
    data = (FSDataset) dn.data;

    // the scanner's delta is not exposed, so reach it through reflection
    Field deltaField = DirectoryScanner.class.getDeclaredField("delta");
    deltaField.setAccessible(true);
    delta = (FSDatasetDelta) deltaField.get(scanner);

    fs = cluster.getFileSystem();

    List<File> volumeDirs = new ArrayList<File>();
    for (FSVolume vol : data.volumes.getVolumes()) {
      volumeDirs.add(vol.getDir());
    }
    File[] dirArray = volumeDirs.toArray(new File[volumeDirs.size()]);
    data.asyncDiskService =
        new TestDirectoryScanner.FSDatasetAsyncDiscServiceMock(dirArray, conf);
  } catch (Exception e) {
    e.printStackTrace();
    fail("setup failed");
  }
}
/**
 * Create a block file in a random volume.
 *
 * @return id of the block the file was created for (returned even if the
 *         file already existed)
 * @throws IOException if the file cannot be created
 */
private long createBlockFile() throws IOException {
  FSVolume[] volumes = fds.volumes.getVolumes();
  // Random.nextInt(bound) excludes the bound, so the full length is needed to
  // make every volume eligible (and to avoid a zero bound with one volume).
  int index = rand.nextInt(volumes.length);
  long id = getFreeBlockId();
  File finalizedDir = volumes[index].getNamespaceSlice(nsid).getCurrentDir();
  File file = new File(finalizedDir, getBlockFile(id));
  if (file.createNewFile()) {
    LOG.info("Created block file " + file.getName());
  }
  return id;
}
/**
 * Create a metafile in a random volume.
 *
 * @return id of the block the metafile was created for (returned even if the
 *         file already existed)
 * @throws IOException if the file cannot be created
 */
private long createMetaFile() throws IOException {
  FSVolume[] volumes = fds.volumes.getVolumes();
  // Random.nextInt(bound) excludes the bound, so the full length is needed to
  // make every volume eligible (and to avoid a zero bound with one volume).
  int index = rand.nextInt(volumes.length);
  long id = getFreeBlockId();
  File finalizedDir = volumes[index].getNamespaceSlice(nsid).getCurrentDir();
  File file = new File(finalizedDir, getMetaFile(id));
  if (file.createNewFile()) {
    LOG.info("Created metafile " + file.getName());
  }
  return id;
}
/** Create block file and corresponding metafile in a rondom volume */ private long createBlockMetaFile() throws IOException { FSVolume[] volumes = fds.volumes.getVolumes(); int index = rand.nextInt(volumes.length - 1); long id = getFreeBlockId(); File finalizedDir = volumes[index].getNamespaceSlice(nsid).getCurrentDir(); File file = new File(finalizedDir, getBlockFile(id)); if (file.createNewFile()) { LOG.info("Created block file " + file.getName()); // Create files with same prefix as block file but extension names // such that during sorting, these files appear around meta file // to test how DirectoryScanner handles extraneous files String name1 = file.getAbsolutePath() + ".l"; String name2 = file.getAbsolutePath() + ".n"; file = new File(name1); if (file.createNewFile()) { LOG.info("Created extraneous file " + name1); } file = new File(name2); if (file.createNewFile()) { LOG.info("Created extraneous file " + name2); } file = new File(finalizedDir, getMetaFile(id)); if (file.createNewFile()) { LOG.info("Created metafile " + file.getName()); } } return id; }
/**
 * Create a block info whose data file is a BlockDataFile over the given file
 * and volume. The block reference starts out null and the info is created as
 * not detached.
 *
 * @param vol              volume holding the file
 * @param file             block data file
 * @param finalizedSize    value stored as the finalized size
 * @param visible          value stored as the visible flag
 * @param inlineChecksum   value stored as the inline-checksum flag
 * @param checksumType     value stored as the checksum type
 * @param bytesPerChecksum value stored as bytes per checksum
 * @param blockCrcValid    value stored as the block-CRC-valid flag
 * @param blockCrc         value stored as the block CRC
 */
DatanodeBlockInfo(FSVolume vol, File file, long finalizedSize,
    boolean visible, boolean inlineChecksum, int checksumType,
    int bytesPerChecksum, boolean blockCrcValid, int blockCrc) {
  // location
  this.blockDataFile = new BlockDataFile(file, vol);
  this.block = null;

  // state
  this.finalizedSize = finalizedSize;
  this.visible = visible;
  detached = false;

  // checksum description
  this.inlineChecksum = inlineChecksum;
  this.checksumType = checksumType;
  this.bytesPerChecksum = bytesPerChecksum;
  this.blockCrcValid = blockCrcValid;
  this.blockCrc = blockCrc;
}
/**
 * Create a scan record for one block found on disk.
 *
 * @param layout     layout constant describing how the block is stored
 * @param blockId    id of the block
 * @param blockFile  block file (may be null depending on the caller)
 * @param metaFile   meta file (may be null depending on the caller)
 * @param vol        volume the files were found on
 * @param fileLength length recorded for the block
 * @param genStamp   generation stamp recorded for the block
 */
DiskScanInfo(int layout, long blockId, File blockFile, File metaFile,
    FSVolume vol, long fileLength, long genStamp) {
  this.layout = layout;
  this.blockId = blockId;
  this.blockFile = blockFile;
  this.metaFile = metaFile;
  this.volume = vol;
  this.fileLength = fileLength;
  this.genStamp = genStamp;
}
/**
 * Ask every namespace in nsMap to drop the blocks stored on the given failed
 * volumes, forwarding datasetDelta to each of them.
 *
 * @param failed_vols volumes considered failed
 * @return total number of blocks removed across all namespaces
 */
synchronized int removeUnhealthyVolumes(Collection<FSVolume> failed_vols) {
  int total = 0;
  for (Integer nsId : nsMap.keySet()) {
    int removedInNamespace =
        nsMap.get(nsId).removeUnhealthyVolumes(failed_vols, datasetDelta);
    total += removedInNamespace;
  }
  return total;
}
/**
 * Add this bucket's blocks that have block CRC info to the given map, under
 * the block's volume and this bucket's id. Blocks whose volume is not a key
 * of the map are skipped.
 *
 * @param fsVolumeMap map from volume to per-bucket maps, filled in place
 */
synchronized void getBlockCrcPerVolume(
    Map<FSVolume, List<Map<Block, DatanodeBlockInfo>>> fsVolumeMap) {
  for (Map.Entry<Block, DatanodeBlockInfo> e : blockInfoMap.entrySet()) {
    DatanodeBlockInfo info = e.getValue();
    FSVolume vol = info.getBlockDataFile().getVolume();
    if (!fsVolumeMap.containsKey(vol) || !info.hasBlockCrcInfo()) {
      continue;
    }
    fsVolumeMap.get(vol).get(bucketId).put(e.getKey(), info);
  }
}
/**
 * Ask every bucket in blockBuckets to drop the blocks stored on the given
 * failed volumes.
 *
 * @param failed_vols  volumes considered failed
 * @param datasetDelta passed through to each bucket
 * @return total number of blocks removed across all buckets
 */
int removeUnhealthyVolumes(Collection<FSVolume> failed_vols,
    FSDatasetDeltaInterface datasetDelta) {
  int total = 0;
  for (BlockBucket bucket : blockBuckets) {
    int removedFromBucket =
        bucket.removeUnhealthyVolumes(failed_vols, datasetDelta);
    total += removedFromBucket;
  }
  return total;
}
/**
 * Create a block file in a random volume.
 *
 * @return id of the block the file was created for (returned even if the
 *         file already existed)
 * @throws IOException if the file cannot be created
 */
private long createBlockFile() throws IOException {
  FSVolume[] volumes = fds.volumes.volumes;
  // Random.nextInt(bound) excludes the bound, so the full length is needed to
  // make every volume eligible (and to avoid a zero bound with one volume).
  int index = rand.nextInt(volumes.length);
  long id = getFreeBlockId();
  File file = new File(volumes[index].getDir().getPath(), getBlockFile(id));
  if (file.createNewFile()) {
    LOG.info("Created block file " + file.getName());
  }
  return id;
}
/**
 * Create a metafile in a random volume.
 *
 * @return id of the block the metafile was created for (returned even if the
 *         file already existed)
 * @throws IOException if the file cannot be created
 */
private long createMetaFile() throws IOException {
  FSVolume[] volumes = fds.volumes.volumes;
  // Random.nextInt(bound) excludes the bound, so the full length is needed to
  // make every volume eligible (and to avoid a zero bound with one volume).
  int index = rand.nextInt(volumes.length);
  long id = getFreeBlockId();
  File file = new File(volumes[index].getDir().getPath(), getMetaFile(id));
  if (file.createNewFile()) {
    LOG.info("Created metafile " + file.getName());
  }
  return id;
}
/** Create block file and corresponding metafile in a rondom volume */ private long createBlockMetaFile() throws IOException { FSVolume[] volumes = fds.volumes.volumes; int index = rand.nextInt(volumes.length - 1); long id = getFreeBlockId(); File file = new File(volumes[index].getDir().getPath(), getBlockFile(id)); if (file.createNewFile()) { LOG.info("Created block file " + file.getName()); // Create files with same prefix as block file but extension names // such that during sorting, these files appear around meta file // to test how DirectoryScanner handles extraneous files String name1 = file.getAbsolutePath() + ".l"; String name2 = file.getAbsolutePath() + ".n"; file = new File(name1); if (file.createNewFile()) { LOG.info("Created extraneous file " + name1); } file = new File(name2); if (file.createNewFile()) { LOG.info("Created extraneous file " + name2); } file = new File(volumes[index].getDir().getPath(), getMetaFile(id)); if (file.createNewFile()) { LOG.info("Created metafile " + file.getName()); } } return id; }
private void testRbwReplicas(MiniDFSCluster cluster, boolean isCorrupt) throws IOException { FSDataOutputStream out = null; try { FileSystem fs = cluster.getFileSystem(); NamespaceInfo nsInfo = cluster.getNameNode().versionRequest(); final int fileLen = 515; // create some rbw replicas on disk byte[] writeBuf = new byte[fileLen]; new Random().nextBytes(writeBuf); final Path src = new Path("/test.txt"); out = fs.create(src); out.write(writeBuf); out.sync(); DataNode dn = cluster.getDataNodes().get(0); // corrupt rbw replicas for (FSVolume volume : ((FSDataset) dn.data).volumes.getVolumes()) { File rbwDir = volume.getRbwDir(nsInfo.getNamespaceID()); for (File file : rbwDir.listFiles()) { if (isCorrupt) { if (Block.isSeparateChecksumBlockFilename(file.getName())) { new RandomAccessFile(file, "rw").setLength(fileLen - 1); // corrupt } else if (Block.isInlineChecksumBlockFilename(file.getName())) { new RandomAccessFile(file, "rw").setLength(file.length() - 1); // corrupt } } } } cluster.restartDataNodes(); cluster.waitActive(); dn = cluster.getDataNodes().get(0); // check volumeMap: one rbw replica NamespaceMap volumeMap = ((FSDataset) (dn.data)).volumeMap .getNamespaceMap(nsInfo.getNamespaceID()); assertEquals(1, volumeMap.size()); Block replica = null; for (int i = 0; i < volumeMap.getNumBucket(); i++) { Set<Block> blockSet = volumeMap.getBucket(i).blockInfoMap.keySet(); if (blockSet.isEmpty()) { continue; } Block r = blockSet.iterator().next(); if (r != null) { replica = r; break; } } if (isCorrupt) { assertEquals((fileLen - 1), replica.getNumBytes()); } else { assertEquals(fileLen, replica.getNumBytes()); } dn.data.invalidate(nsInfo.getNamespaceID(), new Block[] { replica }); fs.delete(src, false); } finally { IOUtils.closeStream(out); } }
/**
 * Log the deletion request and run the file delete task directly on the
 * calling thread.
 *
 * @param volume volume the file belongs to
 * @param file   file to delete
 */
@Override
void deleteAsyncFile(FSVolume volume, File file) {
  final String message = "Scheduling file " + file.toString()
      + " for deletion";
  DataNode.LOG.info(message);
  FileDeleteTask task = new FileDeleteTask(volume, file);
  task.run();
}
/**
 * @return the volume held by this object
 */
FSVolume getVolume() {
  final FSVolume held = volume;
  return held;
}
/**
 * @return the volume reported by the underlying disk scan info
 */
FSVolume getVolume() {
  final FSVolume scanned = diskScanInfo.getVolume();
  return scanned;
}
/**
 * Create a report compiler for one volume.
 *
 * @param volume   volume this compiler works on
 * @param datanode datanode kept for later use by the compiler
 */
public ReportCompiler(FSVolume volume, DataNode datanode) {
  this.datanode = datanode;
  this.volume = volume;
}
void init() throws IOException { // get the list of blocks and arrange them in random order Block arr[] = dataset.getBlockReport(namespaceId); Collections.shuffle(Arrays.asList(arr)); blockInfoSet = new LightWeightLinkedSet<BlockScanInfo>(); blockMap = new HashMap<Block, BlockScanInfo>(); long scanTime = -1; for (Block block : arr) { BlockScanInfo info = new BlockScanInfo( block ); info.lastScanTime = scanTime--; //still keep 'info.lastScanType' to NONE. addBlockInfo(info); } /* Pick the first directory that has any existing scanner log. * otherwise, pick the first directory. */ File dir = null; FSDataset.FSVolume[] volumes = dataset.volumes.getVolumes(); for(FSDataset.FSVolume vol : volumes) { File nsDir = vol.getNamespaceSlice(namespaceId).getDirectory(); if (LogFileHandler.isFilePresent(nsDir, verificationLogFile)) { dir = nsDir; break; } } if (dir == null) { dir = volumes[0].getNamespaceSlice(namespaceId).getDirectory(); } try { // max lines will be updated later during initialization. verificationLog = new LogFileHandler(dir, verificationLogFile, 100); } catch (IOException e) { LOG.warn("Could not open verfication log. " + "Verification times are not stored."); } synchronized (this) { throttler = new DataTransferThrottler(MAX_SCAN_RATE); } }
/**
 * Look up the current verification log file for a namespace on a volume.
 *
 * @param vol         volume to look in
 * @param namespaceId namespace whose slice directory is used
 * @return the file returned by LogFileHandler.getCurrentFile for that
 *         directory and DataBlockScanner.verificationLogFile
 * @throws IOException if the lookup fails
 */
static File getCurrentFile(FSVolume vol, int namespaceId) throws IOException {
  final File namespaceDir = vol.getNamespaceSlice(namespaceId).getDirectory();
  return LogFileHandler.getCurrentFile(namespaceDir,
      DataBlockScanner.verificationLogFile);
}
/**
 * Create a block data file description.
 *
 * @param file   the file on disk
 * @param volume the volume the file is associated with
 */
public BlockDataFile(File file, FSVolume volume) {
  this.volume = volume;
  this.file = file;
}
/**
 * @return the volume held by this object
 */
public FSVolume getVolume() {
  final FSVolume held = volume;
  return held;
}
/**
 * Create a block info for a file on a volume; it starts out not detached.
 *
 * @param vol  volume holding the block
 * @param file block file
 */
DatanodeBlockInfo(FSVolume vol, File file) {
  detached = false;
  this.file = file;
  this.volume = vol;
}
/**
 * Create a block info on a volume with no file yet; it starts out not
 * detached.
 *
 * @param vol volume holding the block
 */
DatanodeBlockInfo(FSVolume vol) {
  detached = false;
  this.file = null;
  this.volume = vol;
}
/**
 * Create a scan record for one block found on disk.
 *
 * @param blockId   id of the block
 * @param blockFile block file
 * @param metaFile  meta file
 * @param vol       volume the files were found on
 */
ScanInfo(long blockId, File blockFile, File metaFile, FSVolume vol) {
  this.blockId = blockId;
  this.blockFile = blockFile;
  this.metaFile = metaFile;
  this.volume = vol;
}
/**
 * Create a report compiler for one directory of a volume.
 *
 * @param volume volume this compiler works on
 * @param dir    directory kept for later use by the compiler
 */
public ReportCompiler(FSVolume volume, File dir) {
  this.volume = volume;
  this.dir = dir;
}
/**
 * Record the volume on which this replica is located on disk.
 *
 * @param vol the new volume
 */
void setVolume(FSVolume vol) {
  final FSVolume updated = vol;
  this.volume = updated;
}
@Override //ReplicaInfo void setVolume(FSVolume vol) { super.setVolume(vol); original.setVolume(vol); }
private void testRbwReplicas(MiniDFSCluster cluster, boolean isCorrupt) throws IOException { FSDataOutputStream out = null; FileSystem fs = cluster.getFileSystem(); final Path src = new Path("/test.txt"); try { final int fileLen = 515; // create some rbw replicas on disk byte[] writeBuf = new byte[fileLen]; new Random().nextBytes(writeBuf); out = fs.create(src); out.write(writeBuf); out.hflush(); DataNode dn = cluster.getDataNodes().get(0); for (FSVolume volume : ((FSDataset)dn.data).volumes.volumes) { File currentDir = volume.getDir().getParentFile(); File rbwDir = new File(currentDir, "rbw"); for (File file : rbwDir.listFiles()) { if (isCorrupt && Block.isBlockFilename(file)) { new RandomAccessFile(file, "rw").setLength(fileLen-1); // corrupt } } } cluster.restartDataNodes(); cluster.waitActive(); dn = cluster.getDataNodes().get(0); // check volumeMap: one rwr replica ReplicasMap replicas = ((FSDataset)(dn.data)).volumeMap; Assert.assertEquals(1, replicas.size()); ReplicaInfo replica = replicas.replicas().iterator().next(); Assert.assertEquals(ReplicaState.RWR, replica.getState()); if (isCorrupt) { Assert.assertEquals((fileLen-1)/512*512, replica.getNumBytes()); } else { Assert.assertEquals(fileLen, replica.getNumBytes()); } dn.data.invalidate(new Block[]{replica}); } finally { IOUtils.closeStream(out); if (fs.exists(src)) { fs.delete(src, false); } fs.close(); } }
private void testRbwReplicas(MiniDFSCluster cluster, boolean isCorrupt) throws IOException { FSDataOutputStream out = null; try { FileSystem fs = cluster.getFileSystem(); NamespaceInfo nsInfo = cluster.getNameNode().versionRequest(); final int fileLen = 515; // create some rbw replicas on disk byte[] writeBuf = new byte[fileLen]; new Random().nextBytes(writeBuf); final Path src = new Path("/test.txt"); out = fs.create(src); out.write(writeBuf); out.sync(); DataNode dn = cluster.getDataNodes().get(0); // corrupt rbw replicas for (FSVolume volume : ((FSDataset) dn.data).volumes.getVolumes()) { File rbwDir = volume.getRbwDir(nsInfo.getNamespaceID()); for (File file : rbwDir.listFiles()) { if (isCorrupt && Block.isBlockFilename(file.getName())) { new RandomAccessFile(file, "rw").setLength(fileLen - 1); // corrupt } } } cluster.restartDataNodes(); cluster.waitActive(); dn = cluster.getDataNodes().get(0); // check volumeMap: one rbw replica Map<Block, DatanodeBlockInfo> volumeMap = ((FSDataset) (dn.data)).volumeMap.getNamespaceMap(nsInfo.getNamespaceID()); assertEquals(1, volumeMap.size()); Block replica = volumeMap.keySet().iterator().next(); if (isCorrupt) { assertEquals((fileLen - 1), replica.getNumBytes()); } else { assertEquals(fileLen, replica.getNumBytes()); } dn.data.invalidate(nsInfo.getNamespaceID(), new Block[] { replica }); fs.delete(src, false); } finally { IOUtils.closeStream(out); } }