/** * Verify the file length and file crc. */ private static boolean verifyFile(FileSystem fs, Path filePath, int fileLen, DataChecksum checksum) throws IOException { FileStatus stat = fs.getFileStatus(filePath); if (stat.getLen() != fileLen) { return false; } int fileCRC = fs.getFileCrc(filePath); LOG.info("Expected checksum: " + (int)checksum.getValue() + ", get: " + fileCRC); InputStream in = fs.open(filePath); DataChecksum newChecksum = DataChecksum.newDataChecksum(FSConstants.CHECKSUM_TYPE, 1); int toRead = fileLen; byte[] buffer = new byte[1024 * 1024]; while (toRead > 0) { int numRead = in.read(buffer); newChecksum.update(buffer, 0, numRead); toRead -= numRead; } LOG.info("Read CRC: " + (int)newChecksum.getValue()); return (int)checksum.getValue() == fileCRC && (int)newChecksum.getValue() == fileCRC; }
/** * Tests to make sure the returned addresses are correct in case of default * configuration with no federation */ @Test public void testNonFederation() throws Exception { Configuration conf = new Configuration(); // Returned namenode address should match default address conf.set("fs.default.name", "hdfs://localhost:1000"); verifyAddresses(conf, TestType.NAMENODE, "127.0.0.1:1000"); // Returned namenode address should match service RPC address conf = new Configuration(); conf.set(NameNode.DATANODE_PROTOCOL_ADDRESS, "localhost:1000"); conf.set(FSConstants.DFS_NAMENODE_RPC_ADDRESS_KEY, "localhost:1001"); verifyAddresses(conf, TestType.NAMENODE, "127.0.0.1:1000"); // Returned address should match RPC address conf = new Configuration(); conf.set(FSConstants.DFS_NAMENODE_RPC_ADDRESS_KEY, "localhost:1001"); verifyAddresses(conf, TestType.NAMENODE, "127.0.0.1:1001"); }
public static boolean corruptReplica(Block block, int replica, MiniDFSCluster cluster) throws IOException { Random random = new Random(); boolean corrupted = false; for (int i=replica*2; i<replica*2+2; i++) { File blockFile = new File(cluster.getBlockDirectory("data" + (i+1)), block.getBlockName()); if (blockFile.exists()) { corruptFile(blockFile, random); corrupted = true; continue; } File blockFileInlineChecksum = new File(cluster.getBlockDirectory("data" + (i + 1)), BlockInlineChecksumWriter.getInlineChecksumFileName( block, FSConstants.CHECKSUM_TYPE, cluster.conf.getInt( "io.bytes.per.checksum", FSConstants.DEFAULT_BYTES_PER_CHECKSUM))); if (blockFileInlineChecksum.exists()) { corruptFile(blockFileInlineChecksum, random); corrupted = true; continue; } } return corrupted; }
public static boolean canRollBack(StorageDirectory sd, Storage storage) throws IOException { File prevDir = sd.getPreviousDir(); if (!prevDir.exists()) { // use current directory then LOG.info("Storage directory " + sd.getRoot() + " does not contain previous fs state."); // read and verify consistency with other directories sd.read(); return false; } // read and verify consistency of the prev dir sd.read(sd.getPreviousVersionFile()); if (storage.getLayoutVersion() != FSConstants.LAYOUT_VERSION) { throw new IOException("Cannot rollback to storage version " + storage.getLayoutVersion() + " using this version of the NameNode, which uses storage version " + FSConstants.LAYOUT_VERSION + ". " + "Please use the previous version of HDFS to perform the rollback."); } return true; }
/** * Perform operations such as setting quota, deletion of files, rename and * ensure system can apply edits log during startup. */ public void testEditsLog() throws Exception { DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem(); Path src1 = new Path(dir, "testEditsLog/srcdir/src1"); Path dst1 = new Path(dir, "testEditsLog/dstdir/dst1"); createFile(fs, src1); fs.mkdirs(dst1.getParent()); createFile(fs, dst1); // Set quota so that dst1 parent cannot allow under it new files/directories fs.setQuota(dst1.getParent(), 2, FSConstants.QUOTA_DONT_SET); // Free up quota for a subsequent rename fs.delete(dst1, true); rename(src1, dst1, true, false); // Restart the cluster and ensure the above operations can be // loaded from the edits log restartCluster(); fs = (DistributedFileSystem)cluster.getFileSystem(); assertFalse(fs.exists(src1)); // ensure src1 is already renamed assertTrue(fs.exists(dst1)); // ensure rename dst exists }
/** * Setup hadoop mini-cluster for test. */ private static void oneTimeSetUp() throws IOException { ((Log4JLogger)HftpFileSystem.LOG).getLogger().setLevel(Level.ALL); final long seed = RAN.nextLong(); System.out.println("seed=" + seed); RAN.setSeed(seed); config = new Configuration(); config.set(FSConstants.SLAVE_HOST_NAME, "localhost"); cluster = new MiniDFSCluster(config, 2, true, null); hdfs = cluster.getFileSystem(); final String hftpuri = "hftp://" + config.get("dfs.http.address"); System.out.println("hftpuri=" + hftpuri); hftpFs = (HftpFileSystem) new Path(hftpuri).getFileSystem(config); }
private void setupDatanodeAddress(Configuration conf, InetSocketAddress toSet, boolean setupHostsFile) throws IOException { String rpcAddress = "127.0.0.1:0"; if (setupHostsFile) { String hostsFile = conf.get(FSConstants.DFS_HOSTS, "").trim(); if (hostsFile.length() == 0) { throw new IOException("Parameter dfs.hosts is not setup in conf"); } // Setup datanode in the include file, if it is defined in the conf String addressInString = NetUtils.toIpPort(toSet); if (addressInString != null) { rpcAddress = addressInString; } } conf.set(FSConstants.DFS_DATANODE_ADDRESS_KEY, rpcAddress); conf.set(FSConstants.DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:0"); conf.set(FSConstants.DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:0"); }
/** * Test that includes/excludes will be ignored * if dfs.ignore.missing.include.files is set */ @Test public void testIncludesExcludesConfigure() throws IOException { String inFile = "/tmp/inFileNotExists"; String exFile = "/tmp/exFileNotExists"; File include = new File(inFile); File exclude = new File(exFile); include.delete(); exclude.delete(); assertFalse(include.exists()); assertFalse(exclude.exists()); Configuration conf = new Configuration(); conf.set("dfs.hosts.ignoremissing", "true"); conf.set(FSConstants.DFS_HOSTS, inFile); conf.set("dfs.hosts.exclude", exFile); cluster = new MiniDFSCluster(conf, 3, true, null); }
@Override public void initialize(URI name, Configuration conf) throws IOException { super.initialize(name, conf); setConf(conf); try { this.ugi = UnixUserGroupInformation.login(conf, true); } catch (LoginException le) { throw new IOException(StringUtils.stringifyException(le)); } initializedWith = name; if (conf.getBoolean(FSConstants.CLIENT_CONFIGURATION_LOOKUP_DONE, false)) { try { initializedWith = new URI(conf.get(FileSystem.FS_DEFAULT_NAME_KEY)); } catch (URISyntaxException e) { LOG.error(e); } } nnAddr = NetUtils.createSocketAddr(name.toString()); doStrictContentLengthCheck = conf.getBoolean(STRICT_CONTENT_LENGTH, false); }
/** * Sets or resets quotas for a directory. * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long, long) */ void setQuota(String src, long namespaceQuota, long diskspaceQuota) throws IOException { // sanity check if ((namespaceQuota <= 0 && namespaceQuota != FSConstants.QUOTA_DONT_SET && namespaceQuota != FSConstants.QUOTA_RESET) || (diskspaceQuota <= 0 && diskspaceQuota != FSConstants.QUOTA_DONT_SET && diskspaceQuota != FSConstants.QUOTA_RESET)) { throw new IllegalArgumentException("Invalid values for quota : " + namespaceQuota + " and " + diskspaceQuota); } try { namenode.setQuota(src, namespaceQuota, diskspaceQuota); } catch(RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, FileNotFoundException.class, NSQuotaExceededException.class, DSQuotaExceededException.class); } }
/** * Returns list of InetSocketAddresses corresponding to namenodes from the * configuration. Note this is to be used by datanodes to get the list of * namenode addresses to talk to. * * Returns namenode address specifically configured for datanodes (using * service ports), if found. If not, regular RPC address configured for other * clients is returned. * * @param conf configuration * @return list of InetSocketAddress * @throws IOException on error */ public static List<InetSocketAddress> getNNServiceRpcAddresses( Configuration conf) throws IOException { // Use default address as fall back String defaultAddress; try { defaultAddress = NameNode.getDefaultAddress(conf); } catch (IllegalArgumentException e) { defaultAddress = null; } List<InetSocketAddress> addressList = getAddresses(conf, defaultAddress, NameNode.DATANODE_PROTOCOL_ADDRESS, FSConstants.DFS_NAMENODE_RPC_ADDRESS_KEY); if (addressList == null) { throw new IOException("Incorrect configuration: namenode address " + NameNode.DATANODE_PROTOCOL_ADDRESS + " or " + FSConstants.DFS_NAMENODE_RPC_ADDRESS_KEY + " is not configured."); } return addressList; }
public void initializeStreams(int bytesPerChecksum, int checksumSize, Block block, String inAddr, int namespaceId, DataNode datanode) throws FileNotFoundException, IOException { if (this.blockDataWriter == null) { blockDataWriter = blockDataFile.getWriter(-1); } if (this.cout == null) { this.cout = new FileOutputStream( new RandomAccessFile(metafile, "rw").getFD()); } checksumOut = new DataOutputStream(new BufferedOutputStream(cout, FSConstants.SMALL_BUFFER_SIZE)); setParameters(bytesPerChecksum, checksumSize, block, inAddr, namespaceId, datanode); }
public NameNodeRouter(ClientProxyCommons commons) throws IOException { this.commons = commons; this.clusterId = commons.conf.getInt(FSConstants.DFS_CLUSTER_ID, RequestMetaInfo.NO_CLUSTER_ID); if (this.clusterId == RequestMetaInfo.NO_CLUSTER_ID) { String msg = "Cluster ID is not set in configuration."; LOG.error(msg); throw new IllegalArgumentException(msg); } handlers = new HashMap<String, NameNodeHandler>(); try { for (String nameserviceId : commons.conf.getStringCollection( FSConstants.DFS_FEDERATION_NAMESERVICES)) { LOG.info("Initializing NameNodeHandler for clusterId: " + clusterId + "nameserviceId: " + nameserviceId); handlers.put(nameserviceId, new NameNodeHandler(commons, nameserviceId)); } } catch (URISyntaxException e) { LOG.error("Malformed URI", e); throw new IOException(e); } }
public void setUp() throws Exception { try { Configuration conf = new Configuration(); // Bind port automatically conf.setInt(StorageServiceConfigKeys.PROXY_THRIFT_PORT_KEY, 0); conf.setInt(StorageServiceConfigKeys.PROXY_RPC_PORT_KEY, 0); cluster = new MiniAvatarCluster(conf, 2, true, null, null, 1, true); proxyService = new ClientProxyService(new ClientProxyCommons(conf, conf.get( FSConstants.DFS_CLUSTER_NAME))); benchmark = new NNLatencyBenchmark(); benchmark.setConf(conf); } catch (IOException e) { tearDown(); throw e; } }
/** * Checks if the service argument is specified in the command arguments. */ public static boolean isServiceSpecified(String command, Configuration conf, String[] argv) { if (conf.get(FSConstants.DFS_FEDERATION_NAMESERVICES) != null) { for (int i = 0; i < argv.length; i++) { if (argv[i].equals("-service")) { // found service specs return true; } } // no service specs printServiceErrorMessage(command, conf); return false; } return true; }
/** * Registers namenode's address in zookeeper */ private static boolean registerClientProtocolAddress(AvatarZooKeeperClient zk, Configuration originalConf, Configuration conf, boolean toOverwrite) throws UnsupportedEncodingException, IOException { LOG.info("Updating Client Address information in ZooKeeper"); InetSocketAddress addr = NameNode.getClientProtocolAddress(conf); if (addr == null) { LOG.error(FSConstants.DFS_NAMENODE_RPC_ADDRESS_KEY + " for primary service is not defined"); return true; } InetSocketAddress defaultAddr = NameNode.getClientProtocolAddress(originalConf); if (defaultAddr == null) { LOG.error(FSConstants.DFS_NAMENODE_RPC_ADDRESS_KEY + " for default service is not defined"); return true; } registerSocketAddress(zk, originalConf.get(NameNode.DFS_NAMENODE_RPC_ADDRESS_KEY), conf.get(NameNode.DFS_NAMENODE_RPC_ADDRESS_KEY), toOverwrite); /** TODO later: need to handle alias leave it as it is now */ registerAliases(zk, conf, FSConstants.FS_NAMENODE_ALIASES, conf.get(NameNode.DFS_NAMENODE_RPC_ADDRESS_KEY), toOverwrite); return false; }
/** * Safely reads the log version from the stream. Logic is exactly the same * as in the equivalent {@link EditLogFileInputStream} method. * @see EditLogFileInputStream#readLogVersion(DataInputStream) * @return The log version or 0 if stream is empty */ private static int readLogVersion(DataInputStream in) throws IOException { int logVersion = 0; in.mark(4); // See comments in EditLogFileInputStream as to why readLogVersion is // implemented in this way boolean available = true; try { logVersion = in.readByte(); } catch (EOFException e) { available = false; } if (available) { in.reset(); logVersion = in.readInt(); if (logVersion < FSConstants.LAYOUT_VERSION) { throw new LedgerHeaderCorruptException( "Unexpected version of the log segment in the ledger: " + logVersion + ". Current version is " + FSConstants.LAYOUT_VERSION + "."); } } return logVersion; }
/** * When avatarone and avatarzero both write to the filer in the case where the * filer is not the shared directory (when we use QJM). We need both the nodes * to write to different directories. For this purpose we configure * dfs.name.dir and dfs.name.edits.dir as follows * * <property> * <name>dfs.name.dir</name> * <value>/hadoop/<cluster>/,/mnt/fsimage/<cluster>/%</value> * </property> * * Then depending upon the instance we replace '%' with zero or one in this * function to ensure both nodes write to different locations. */ private static void processNameDirectories(Configuration conf, InstanceId instanceId) { if (instanceId == InstanceId.NODEONE || instanceId == InstanceId.NODEZERO) { String instance = (instanceId == InstanceId.NODEZERO) ? "zero" : "one"; // Edits directory. String editDirs = getWildcardDir(instance, conf, FSConstants.DFS_NAMENODE_EDITS_DIR_KEY); conf.set(FSConstants.DFS_NAMENODE_EDITS_DIR_KEY, editDirs); // Image directory. String imageDirs = getWildcardDir(instance, conf, FSConstants.DFS_NAMENODE_NAME_DIR_KEY); conf.set(FSConstants.DFS_NAMENODE_NAME_DIR_KEY, imageDirs); } }
/** * Returns the address of the remote namenode */ static InetSocketAddress getRemoteNamenodeAddress(Configuration conf, InstanceId instance) throws IOException { String fs = null; if (instance == InstanceId.NODEZERO) { fs = conf.get(DFS_NAMENODE_RPC_ADDRESS1_KEY); if (fs == null) fs = conf.get("fs.default.name1"); } else if (instance == InstanceId.NODEONE) { fs = conf.get(DFS_NAMENODE_RPC_ADDRESS0_KEY); if (fs == null) fs = conf.get("fs.default.name0"); } else { throw new IOException("Unknown instance " + instance); } if(fs != null) { Configuration newConf = new Configuration(conf); newConf.set(FSConstants.DFS_NAMENODE_RPC_ADDRESS_KEY, fs); conf = newConf; } return NameNode.getClientProtocolAddress(conf); }
public void setUp(String name) throws Exception { LOG.info("------------------- test: " + name + ", federation: " + federation + " START ----------------"); oldThreads = new HashSet<Thread>(Thread.getAllStackTraces().keySet()); conf = new Configuration(); hosts = HOST_FILE_PATH + "_" + System.currentTimeMillis(); File f = new File(hosts); f.delete(); f.createNewFile(); conf.set(FSConstants.DFS_HOSTS, hosts); conf.setInt("dfs.datanode.failed.volumes.tolerated", 0); if (!federation) { cluster = new MiniAvatarCluster.Builder(conf).build(); } else { cluster = new MiniAvatarCluster.Builder(conf) .numNameNodes(2).federation(true).build(); } federation = false; }
protected void getFields(Properties props, StorageDirectory sd ) throws IOException { setLayoutVersion(props, sd); setStorageType(props, sd); // Read NamespaceID in version before federation if (layoutVersion > FSConstants.FEDERATION_VERSION) { setNamespaceID(props, sd); setcTime(props, sd); } String ssid = props.getProperty(STORAGE_ID); if (ssid == null || !("".equals(storageID) || "".equals(ssid) || storageID.equals(ssid))) throw new InconsistentFSStateException(sd.getRoot(), "has incompatible storage Id."); if ("".equals(storageID)) // update id only if it was empty storageID = ssid; }
NameNodeInfo(int nnIndex) { avatarDir = baseAvatarDir; fsimagelocalDir = avatarDir + "/fsimagelocal-" + FSConstants.DFS_NAMENODE_NAME_DIR_WILDCARD; fseditslocalDir = avatarDir + "/fseditslocal-" + FSConstants.DFS_NAMENODE_NAME_DIR_WILDCARD; fsimage0Dir = avatarDir + "/fsimage0"; fsimage1Dir = avatarDir + "/fsimage1"; fsedits0Dir = avatarDir + "/fsedits0"; fsedits1Dir = avatarDir + "/fsedits1"; rpcPort = nnPort = MiniDFSCluster.getFreePort(); nnDnPort = MiniDFSCluster.getFreePort(); httpPort = MiniDFSCluster.getFreePort(); rpc0Port = nn0Port = MiniDFSCluster.getFreePorts(2); nnDn0Port = MiniDFSCluster.getFreePort(); http0Port = MiniDFSCluster.getFreePort(); rpc1Port = nn1Port = MiniDFSCluster.getFreePorts(2); nnDn1Port = MiniDFSCluster.getFreePort(); http1Port = MiniDFSCluster.getFreePort(); }
public void updateAvatarConf(Configuration newConf) { conf = new Configuration(newConf); if (federation) { conf.set(FSConstants.DFS_FEDERATION_NAMESERVICE_ID, nameserviceId); } // server config for avatar nodes a0Conf = new Configuration(conf); a1Conf = new Configuration(conf); a0Conf.set("dfs.name.dir", fsimagelocalDir); a0Conf.set("dfs.name.edits.dir", fseditslocalDir); a0Conf.set("fs.checkpoint.dir", avatarDir + "/checkpoint0"); a1Conf.set("dfs.name.dir", fsimagelocalDir); a1Conf.set("dfs.name.edits.dir", fseditslocalDir); a1Conf.set("fs.checkpoint.dir", avatarDir + "/checkpoint1"); }
public void cleanupAvatarDirs() throws IOException { String[] files = new String[] { fsimagelocalDir.replaceAll( FSConstants.DFS_NAMENODE_NAME_DIR_WILDCARD, "zero"), fsimagelocalDir.replaceAll( FSConstants.DFS_NAMENODE_NAME_DIR_WILDCARD, "one"), fsimage0Dir, fsimage1Dir, fseditslocalDir.replaceAll( FSConstants.DFS_NAMENODE_NAME_DIR_WILDCARD, "zero"), fseditslocalDir.replaceAll( FSConstants.DFS_NAMENODE_NAME_DIR_WILDCARD, "one"), fsedits0Dir, fsedits1Dir }; for (String filename : files) { FileUtil.fullyDelete(new File(filename)); } }
@Test public void testVerifyEditLogLedgerMetadata() throws Exception { EditLogLedgerMetadata m0 = new EditLogLedgerMetadata( FSConstants.LAYOUT_VERSION, 1, 1, 100); EditLogLedgerMetadata m1 = new EditLogLedgerMetadata( FSConstants.LAYOUT_VERSION, 2, 101, 200); String m0Path = manager.fullyQualifiedPathForLedger(m0); String m1Path = manager.fullyQualifiedPathForLedger(m1); manager.writeEditLogLedgerMetadata(m0Path, m0); manager.writeEditLogLedgerMetadata(m1Path, m1); assertTrue(m0 + " should verify under " + m0Path, manager.verifyEditLogLedgerMetadata(m0, m0Path)); assertTrue(m1 + " should verify under " + m1Path, manager.verifyEditLogLedgerMetadata(m1, m1Path)); assertFalse(m0 + " should not verify under " + m1Path, manager.verifyEditLogLedgerMetadata(m0, m1Path)); assertFalse(m1 + " should not verify under" + m0Path, manager.verifyEditLogLedgerMetadata(m1, m0Path)); assertFalse("Non-existent path should not verify!", manager.verifyEditLogLedgerMetadata(m0, "/does/not/exist")); }
/** * Tests that {@link EditLogLedgerMetadata} can be correctly serialized * and deserialized. */ @Test public void testReadAndWrite() throws Exception { EditLogLedgerMetadata ledgerMetadataIn = new EditLogLedgerMetadata( FSConstants.LAYOUT_VERSION, 1, 1, -1); EditLogLedgerMetadataWritable ledgerMetadataWritableIn = new EditLogLedgerMetadataWritable(); ledgerMetadataWritableIn.set(ledgerMetadataIn); // Calls readWriteFields() byte[] editLogLedgerMedataBytes = WritableUtil.writableToByteArray(ledgerMetadataWritableIn); // Calls readFields() EditLogLedgerMetadataWritable ledgerMetadataWritableOut = WritableUtil.readWritableFromByteArray(editLogLedgerMedataBytes, new EditLogLedgerMetadataWritable()); // Tests that deserialize(read(write(serialize(deserialize(m)) == m EditLogLedgerMetadata ledgerMetadataOut = ledgerMetadataWritableOut.get(); assertEquals(ledgerMetadataIn, ledgerMetadataOut); }
private static boolean doAppendTest(FileSystem fs, Path filePath, Random random, Reporter reporter) throws IOException { if (reporter == null) { reporter = Reporter.NULL; } FSDataOutputStream out = fs.create(filePath); DataChecksum checksum = DataChecksum.newDataChecksum(FSConstants.CHECKSUM_TYPE, 1); checksum.reset(); int fileLen = 0; int len = random.nextInt((int) (SIZE_RANGE + fs.getDefaultBlockSize())); fileLen += len; writeToFile(random, out, len, checksum); out.close(); reporter.progress(); for (int i = 0; i < round; i++) { out = fs.append(filePath); len = random.nextInt(SIZE_RANGE); fileLen += len; writeToFile(random, out, len, checksum); out.close(); reporter.progress(); } return verifyFile(fs, filePath, fileLen, checksum); }
/** Setup federation nameServiceIds in the configuration */ private void setupNameServices(Configuration conf, int nameServiceIdCount) { StringBuilder nsList = new StringBuilder(); for (int i = 0; i < nameServiceIdCount; i++) { if (nsList.length() > 0) { nsList.append(","); } nsList.append(getNameServiceId(i)); } conf.set(FSConstants.DFS_FEDERATION_NAMESERVICES, nsList.toString()); }
protected void writeCorruptedData(RandomAccessFile file) throws IOException { final String messageForPreUpgradeVersion = "\nThis file is INTENTIONALLY CORRUPTED so that versions\n" + "of Hadoop prior to 0.13 (which are incompatible\n" + "with this directory layout) will fail to start.\n"; file.seek(0); file.writeInt(FSConstants.LAYOUT_VERSION); org.apache.hadoop.io.UTF8.writeString(file, ""); file.writeBytes(messageForPreUpgradeVersion); file.getFD().sync(); }
public synchronized void completeUpgrade() throws IOException { // set and write new upgrade state into disk setUpgradeState(false, FSConstants.LAYOUT_VERSION); namesystem.getFSImage().storage.writeAll(); currentUpgrades = null; broadcastCommand = null; namesystem.leaveSafeMode(false); }
/** * Get common storage fields. * Should be overloaded if additional fields need to be get. * * @param props * @throws IOException */ protected void getFields(Properties props, StorageDirectory sd ) throws IOException { String sv, st, sid, sct; sv = props.getProperty(LAYOUT_VERSION); st = props.getProperty(STORAGE_TYPE); sid = props.getProperty(NAMESPACE_ID); sct = props.getProperty(CHECK_TIME); if (sv == null || st == null || sid == null || sct == null) throw new InconsistentFSStateException(sd.root, "file " + STORAGE_FILE_VERSION + " is invalid."); int rv = Integer.parseInt(sv); NodeType rt = NodeType.valueOf(st); int rid = Integer.parseInt(sid); long rct = Long.parseLong(sct); if (!storageType.equals(rt) || !((namespaceID == 0) || (rid == 0) || namespaceID == rid)) throw new InconsistentFSStateException(sd.root, "is incompatible with others. " + " namespaceID is " + namespaceID + " and rid is " + rid + "," + " storage type is " + storageType + " but rt is " + rt); if (rv < FSConstants.LAYOUT_VERSION) // future version throw new IncorrectVersionException(rv, "storage directory " + sd.root.getCanonicalPath()); layoutVersion = rv; storageType = rt; namespaceID = rid; cTime = rct; }
public void testNonFederationClusterUpgradeAfterFederationVersion() throws Exception { File[] baseDirs; UpgradeUtilities.initialize(); for (int numDirs = 1; numDirs <= 2; numDirs++) { conf = new Configuration(); conf.setInt("dfs.datanode.scan.period.hours", -1); conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf); String[] nameNodeDirs = conf.getStrings("dfs.name.dir"); String[] dataNodeDirs = conf.getStrings("dfs.data.dir"); log("DataNode upgrade with federation layout version in current", numDirs); UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current"); try { cluster = new MiniDFSCluster(conf, 0, StartupOption.UPGRADE); baseDirs = UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current"); UpgradeUtilities.createVersionFile(DATA_NODE, baseDirs, new StorageInfo(FSConstants.FEDERATION_VERSION, UpgradeUtilities.getCurrentNamespaceID(cluster), UpgradeUtilities.getCurrentFsscTime(cluster)), cluster.getNameNode().getNamespaceID()); cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null); checkResult(DATA_NODE, dataNodeDirs, 0, false); } finally { if (cluster != null) cluster.shutdown(); UpgradeUtilities.createEmptyDirs(nameNodeDirs); UpgradeUtilities.createEmptyDirs(dataNodeDirs); } } }
public void testFederationClusterUpgradeAfterFederationVersionWithCTimeChange() throws Exception { File[] baseDirs; Configuration baseConf = new Configuration(); UpgradeUtilities.initialize(2, baseConf, true); for (int numDirs = 1; numDirs <= 2; numDirs++) { conf = new Configuration(); conf.setInt("dfs.datanode.scan.period.hours", -1); conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf); String[] nameNodeDirs = conf.getStrings("dfs.name.dir"); String[] dataNodeDirs = conf.getStrings("dfs.data.dir"); log("DataNode upgrade with federation layout version in current and ctime change", numDirs); UpgradeUtilities.createFederatedNameNodeStorageDirs(nameNodeDirs); conf.set(FSConstants.DFS_FEDERATION_NAMESERVICES, baseConf.get(FSConstants.DFS_FEDERATION_NAMESERVICES)); try { cluster = new MiniDFSCluster(conf, 0, StartupOption.UPGRADE, false, 2); baseDirs = UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current"); for (int i = 0; i < 2; i++) { UpgradeUtilities.createVersionFile(DATA_NODE, baseDirs, new StorageInfo(FSConstants.FEDERATION_VERSION, cluster .getNameNode(i).getNamespaceID(), cluster.getNameNode(i) .versionRequest().getCTime() - 1), cluster.getNameNode(i) .getNamespaceID()); } cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null); for (int i = 0; i < 2; i++) { checkResult(DATA_NODE, dataNodeDirs, i, false); } } finally { if (cluster != null) cluster.shutdown(); UpgradeUtilities.createEmptyDirs(nameNodeDirs); UpgradeUtilities.createEmptyDirs(dataNodeDirs); } } }
/** * Helper function to generate consectuve datanode addresses and * fill in the hostfiles with them. * @param baseDirectory Root directory where the hosts file should be. * @param racks RackAwareness to assign */ private static List<InetSocketAddress> setupHostsFile(int numberOfDatanodes, Configuration conf, File baseDirectory, String[] racks) throws IOException { List<InetSocketAddress> datanodeAddresses = generateDatanodeAddresses(numberOfDatanodes); conf.set(FSConstants.DFS_HOSTS, writeHostsFile(datanodeAddresses, getHostsFile(conf, baseDirectory))); if (racks != null) { for (int i = 0; i < racks.length; i++) { StaticMapping.addNodeToRack(NetUtils.toIpPort(datanodeAddresses.get(i)), racks[i]); } } return datanodeAddresses; }
/** * Creates a hosts file and returns the object after setting the configuration * @param baseDirectory Root directory for the file to be in. * @return The FileObject after creating the file. */ private static File getHostsFile(Configuration conf, File baseDirectory) throws IOException { File hostsFile = null; if (conf.get(FSConstants.DFS_HOSTS, "").length() != 0) { hostsFile = new File(conf.get(FSConstants.DFS_HOSTS)); } else { baseDirectory.mkdirs(); hostsFile = new File(baseDirectory, "hosts"); } hostsFile.createNewFile(); return hostsFile; }
/** Initialize configuration for federation cluster */ private static void initFederationConf(Configuration conf, Collection<String> nameserviceIds, int numDataNodes, int nnPort) { String nameserviceIdList = ""; for (String nameserviceId : nameserviceIds) { // Create comma separated list of nameserviceIds if (nameserviceIdList.length() > 0) { nameserviceIdList += ","; } nameserviceIdList += nameserviceId; initFederatedNamenodeAddress(conf, nameserviceId, nnPort); nnPort = nnPort == 0 ? 0 : nnPort + 2; } conf.set(FSConstants.DFS_FEDERATION_NAMESERVICES, nameserviceIdList); }
boolean corruptBlockOnDataNode(int i, Block block) throws Exception { Random random = new Random(); boolean corrupted = false; if (i < 0 || i >= dataNodes.size()) return false; for (int dn = i*2; dn < i*2+2; dn++) { String blockFileName; if (this.getDataNodes().get(0).useInlineChecksum) { blockFileName = BlockInlineChecksumWriter.getInlineChecksumFileName( block, FSConstants.CHECKSUM_TYPE, conf .getInt("io.bytes.per.checksum", FSConstants.DEFAULT_BYTES_PER_CHECKSUM)); } else { blockFileName = block.getBlockName(); } File blockFile = new File(getBlockDirectory("data" + (dn+1)), blockFileName); System.out.println("Corrupting for: " + blockFile); if (blockFile.exists()) { // Corrupt replica by writing random bytes into replica RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw"); FileChannel channel = raFile.getChannel(); String badString = "BADBAD"; int rand = random.nextInt((int)channel.size()/2); raFile.seek(rand); raFile.write(badString.getBytes()); raFile.close(); } corrupted = true; } return corrupted; }
/** * Add a namenode to cluster and start it. Configuration of datanodes * in the cluster is refreshed to register with the new namenode. * @return newly started namenode */ public NameNode addNameNode(Configuration conf, int namenodePort) throws IOException { if(!federation) { throw new IOException("cannot add namenode to non-federated cluster"); } int nnIndex = nameNodes.length; int numNameNodes = nameNodes.length + 1; NameNodeInfo[] newlist = new NameNodeInfo[numNameNodes]; System.arraycopy(nameNodes, 0, newlist, 0, nameNodes.length); nameNodes = newlist; String nameserviceId = NAMESERVICE_ID_PREFIX + getNSId(); String nameserviceIds = conf.get(FSConstants.DFS_FEDERATION_NAMESERVICES); nameserviceIds += "," + nameserviceId; conf.set(FSConstants.DFS_FEDERATION_NAMESERVICES, nameserviceIds); initFederatedNamenodeAddress(conf, nameserviceId, namenodePort); createFederatedNameNode(nnIndex, conf, numDataNodes, true, true, null, nameserviceId); // Refresh datanodes with the newly started namenode for (DataNodeProperties dn : dataNodes) { DataNode datanode = dn.datanode; datanode.refreshNamenodes(conf); } // Wait for new namenode to get registrations from all the datanodes waitActive(true, nnIndex); return nameNodes[nnIndex].nameNode; }
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress, boolean forceSync, boolean doParallelWrites, WriteOptions options) throws IOException { return new FSDataOutputStream (dfs.create(getPathName(f), permission, overwrite, false, replication, blockSize, progress, bufferSize, getConf().getInt("io.bytes.per.checksum", FSConstants.DEFAULT_BYTES_PER_CHECKSUM), forceSync, doParallelWrites, null, options), statistics); }
/** * Determines if the given Namenode version and Datanode version * are compatible with each other. Compatibility in this case mean * that the Namenode and Datanode will successfully start up and * will work together. The rules for compatibility, * taken from the DFS Upgrade Design, are as follows: * <pre> * 1. The data-node does regular startup (no matter which options * it is started with) if * softwareLV == storedLV AND * DataNode.FSSCTime == NameNode.FSSCTime * 2. The data-node performs an upgrade if it is started without any * options and * |softwareLV| > |storedLV| OR * (softwareLV == storedLV AND * DataNode.FSSCTime < NameNode.FSSCTime) * 3. NOT TESTED: The data-node rolls back if it is started with * the -rollback option and * |softwareLV| >= |previous.storedLV| AND * DataNode.previous.FSSCTime <= NameNode.FSSCTime * 4. In all other cases the startup fails. * </pre> */ boolean isVersionCompatible(StorageInfo namenodeVer, StorageInfo datanodeVer) { // check #0 if (namenodeVer.getNamespaceID() != datanodeVer.getNamespaceID()) { LOG.info("namespaceIDs are not equal: isVersionCompatible=false"); return false; } // check #1 int softwareLV = FSConstants.LAYOUT_VERSION; // will also be Namenode's LV int storedLV = datanodeVer.getLayoutVersion(); if (softwareLV == storedLV && datanodeVer.getCTime() == namenodeVer.getCTime()) { LOG.info("layoutVersions and cTimes are equal: isVersionCompatible=true"); return true; } // check #2 long absSoftwareLV = Math.abs((long)softwareLV); long absStoredLV = Math.abs((long)storedLV); if (absSoftwareLV > absStoredLV || (softwareLV == storedLV && datanodeVer.getCTime() < namenodeVer.getCTime())) { LOG.info("softwareLayoutVersion is newer OR namenode cTime is newer: isVersionCompatible=true"); return true; } // check #4 LOG.info("default case: isVersionCompatible=false"); return false; }