private NamespaceInfo handshake(Configuration conf) throws IOException { // connect to name node InetSocketAddress nnAddress = NameNode.getServiceAddress(conf, true); this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddress, NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true).getProxy(); this.nnRpcAddress = NetUtils.getHostPortString(nnAddress); this.nnHttpAddress = DFSUtil.getInfoServer(nnAddress, conf, DFSUtil.getHttpClientScheme(conf)).toURL(); // get version and id info from the name-node NamespaceInfo nsInfo = null; while(!isStopRequested()) { try { nsInfo = handshake(namenode); break; } catch(SocketTimeoutException e) { // name-node is busy LOG.info("Problem connecting to server: " + nnAddress); try { Thread.sleep(1000); } catch (InterruptedException ie) { LOG.warn("Encountered exception ", e); } } } return nsInfo; }
private static NamespaceInfo handshake(NamenodeProtocol namenode) throws IOException, SocketTimeoutException { NamespaceInfo nsInfo; nsInfo = namenode.versionRequest(); // throws SocketTimeoutException String errorMsg = null; // verify build version if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion())) { errorMsg = "Incompatible build versions: active name-node BV = " + nsInfo.getBuildVersion() + "; backup node BV = " + Storage.getBuildVersion(); LOG.error(errorMsg); throw new IOException(errorMsg); } assert HdfsConstants.NAMENODE_LAYOUT_VERSION == nsInfo.getLayoutVersion() : "Active and backup node layout versions must be the same. Expected: " + HdfsConstants.NAMENODE_LAYOUT_VERSION + " actual "+ nsInfo.getLayoutVersion(); return nsInfo; }
private void checkNNVersion(NamespaceInfo nsInfo) throws IncorrectVersionException { // build and layout versions should match String nnVersion = nsInfo.getSoftwareVersion(); String minimumNameNodeVersion = dnConf.getMinimumNameNodeVersion(); if (VersionUtil.compareVersions(nnVersion, minimumNameNodeVersion) < 0) { IncorrectVersionException ive = new IncorrectVersionException( minimumNameNodeVersion, nnVersion, "NameNode", "DataNode"); LOG.warn(ive.getMessage()); throw ive; } String dnVersion = VersionInfo.getVersion(); if (!nnVersion.equals(dnVersion)) { LOG.info("Reported NameNode version '" + nnVersion + "' does not match " + "DataNode version '" + dnVersion + "' but is within acceptable " + "limits. Note: This is normal during a rolling upgrade."); } }
private void connectToNNAndHandshake() throws IOException { // get NN proxy bpNamenode = dn.connectToNN(nnAddr); // First phase of the handshake with NN - get the namespace // info. NamespaceInfo nsInfo = retrieveNamespaceInfo(); // Verify that this matches the other NN in this HA pair. // This also initializes our block pool in the DN if we are // the first NN connection for this BP. bpos.verifyAndSetNamespaceInfo(nsInfo); // Second phase of the handshake with the NN. register(nsInfo); }
/** * One of the Block Pools has successfully connected to its NN. * This initializes the local storage for that block pool, * checks consistency of the NN's cluster ID, etc. * * If this is the first block pool to register, this also initializes * the datanode-scoped storage. * * @param bpos Block pool offer service * @throws IOException if the NN is inconsistent with the local storage. */ void initBlockPool(BPOfferService bpos) throws IOException { NamespaceInfo nsInfo = bpos.getNamespaceInfo(); if (nsInfo == null) { throw new IOException("NamespaceInfo not found: Block pool " + bpos + " should have retrieved namespace info before initBlockPool."); } setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID()); // Register the new block pool with the BP manager. blockPoolManager.addBlockPool(bpos); // In the case that this is the first block pool to connect, initialize // the dataset, block scanners, etc. initStorage(nsInfo); // Exclude failed disks before initializing the block pools to avoid startup // failures. checkDiskError(); data.addBlockPool(nsInfo.getBlockPoolID(), conf); blockScanner.enableBlockPoolId(bpos.getBlockPoolId()); initDirectoryScanner(conf); }
/** * Analyze and load storage directories. Recover from previous transitions if * required. * * The block pool storages are either all analyzed or none of them is loaded. * Therefore, a failure on loading any block pool storage results a faulty * data volume. * * @param datanode Datanode to which this storage belongs to * @param nsInfo namespace information * @param dataDirs storage directories of block pool * @param startOpt startup option * @return an array of loaded block pool directories. * @throws IOException on error */ List<StorageDirectory> loadBpStorageDirectories( DataNode datanode, NamespaceInfo nsInfo, Collection<File> dataDirs, StartupOption startOpt) throws IOException { List<StorageDirectory> succeedDirs = Lists.newArrayList(); try { for (File dataDir : dataDirs) { if (containsStorageDir(dataDir)) { throw new IOException( "BlockPoolSliceStorage.recoverTransitionRead: " + "attempt to load an used block storage: " + dataDir); } StorageDirectory sd = loadStorageDirectory(datanode, nsInfo, dataDir, startOpt); succeedDirs.add(sd); } } catch (IOException e) { LOG.warn("Failed to analyze storage directories for block pool " + nsInfo.getBlockPoolID(), e); throw e; } return succeedDirs; }
@Test public void testSimpleWrite() throws Exception { NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi); bkjm.format(nsi); EditLogOutputStream out = bkjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); for (long i = 1 ; i <= 100; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); op.setTransactionId(i); out.write(op); } out.close(); bkjm.finalizeLogSegment(1, 100); String zkpath = bkjm.finalizedLedgerZNode(1, 100); assertNotNull(zkc.exists(zkpath, false)); assertNull(zkc.exists(bkjm.inprogressZNode(1), false)); }
void format(StorageDirectory sd, NamespaceInfo nsInfo, String datanodeUuid) throws IOException { sd.clearDirectory(); // create directory this.layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION; this.clusterID = nsInfo.getClusterID(); this.namespaceID = nsInfo.getNamespaceID(); this.cTime = 0; setDatanodeUuid(datanodeUuid); if (sd.getStorageUuid() == null) { // Assign a new Storage UUID. sd.setStorageUuid(DatanodeStorage.generateUuid()); } writeProperties(sd); }
public IPCLoggerChannel(Configuration conf, NamespaceInfo nsInfo, String journalId, InetSocketAddress addr) { this.conf = conf; this.nsInfo = nsInfo; this.journalId = journalId; this.addr = addr; this.queueSizeLimitBytes = 1024 * 1024 * conf.getInt( DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY, DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT); singleThreadExecutor = MoreExecutors.listeningDecorator( createSingleThreadExecutor()); parallelExecutor = MoreExecutors.listeningDecorator( createParallelExecutor()); metrics = IPCLoggerChannelMetrics.create(this); }
public static String buildPath(String journalId, long segmentTxId, NamespaceInfo nsInfo) { StringBuilder path = new StringBuilder("/getJournal?"); try { path.append(JOURNAL_ID_PARAM).append("=") .append(URLEncoder.encode(journalId, "UTF-8")); path.append("&" + SEGMENT_TXID_PARAM).append("=") .append(segmentTxId); path.append("&" + STORAGEINFO_PARAM).append("=") .append(URLEncoder.encode(nsInfo.toColonSeparatedString(), "UTF-8")); } catch (UnsupportedEncodingException e) { // Never get here -- everyone supports UTF-8 throw new RuntimeException(e); } return path.toString(); }
@Test public void testTwoWriters() throws Exception { long start = 1; NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi); bkjm1.format(nsi); BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi); EditLogOutputStream out1 = bkjm1.startLogSegment(start, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); try { bkjm2.startLogSegment(start, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); fail("Shouldn't have been able to open the second writer"); } catch (IOException ioe) { LOG.info("Caught exception as expected", ioe); }finally{ out1.close(); } }
@Before public void setUp() throws IOException { mockDnConf = mock(DNConf.class); doReturn(VersionInfo.getVersion()).when(mockDnConf).getMinimumNameNodeVersion(); DataNode mockDN = mock(DataNode.class); doReturn(true).when(mockDN).shouldRun(); doReturn(mockDnConf).when(mockDN).getDnConf(); BPOfferService mockBPOS = mock(BPOfferService.class); doReturn(mockDN).when(mockBPOS).getDataNode(); actor = new BPServiceActor(INVALID_ADDR, mockBPOS); fakeNsInfo = mock(NamespaceInfo.class); // Return a a good software version. doReturn(VersionInfo.getVersion()).when(fakeNsInfo).getSoftwareVersion(); // Return a good layout version for now. doReturn(HdfsConstants.NAMENODE_LAYOUT_VERSION).when(fakeNsInfo) .getLayoutVersion(); DatanodeProtocolClientSideTranslatorPB fakeDnProt = mock(DatanodeProtocolClientSideTranslatorPB.class); when(fakeDnProt.versionRequest()).thenReturn(fakeNsInfo); actor.setNameNode(fakeDnProt); }
private QuorumJournalManager createSpyingQJM() throws IOException, URISyntaxException { AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() { @Override public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, String journalId, InetSocketAddress addr) { AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId, addr) { protected ExecutorService createSingleThreadExecutor() { // Don't parallelize calls to the quorum in the tests. // This makes the tests more deterministic. return MoreExecutors.sameThreadExecutor(); } }; return Mockito.spy(logger); } }; return closeLater(new QuorumJournalManager( conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, spyFactory)); }
/** * Iterate over all the storage directories, checking if it should be * formatted. Format the storage if necessary and allowed by the user. * @return True if formatting is processed */ private boolean format(NNStorage storage, NamespaceInfo nsInfo) throws IOException { // Check with the user before blowing away data. if (!Storage.confirmFormat(storage.dirIterable(null), force, interactive)) { storage.close(); return false; } else { // Format the storage (writes VERSION file) storage.format(nsInfo); return true; } }
/** * Format all available storage directories. */ public void format(NamespaceInfo nsInfo) throws IOException { Preconditions.checkArgument(nsInfo.getLayoutVersion() == 0 || nsInfo.getLayoutVersion() == HdfsConstants.NAMENODE_LAYOUT_VERSION, "Bad layout version: %s", nsInfo.getLayoutVersion()); this.setStorageInfo(nsInfo); this.blockpoolID = nsInfo.getBlockPoolID(); for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) { StorageDirectory sd = it.next(); format(sd); } }
public NamespaceInfo getNamespaceInfo() { return new NamespaceInfo( getNamespaceID(), getClusterID(), getBlockPoolID(), getCTime()); }
@Override // NameNode protected void initialize(Configuration conf) throws IOException { // Trash is disabled in BackupNameNode, // but should be turned back on if it ever becomes active. conf.setLong(CommonConfigurationKeys.FS_TRASH_INTERVAL_KEY, CommonConfigurationKeys.FS_TRASH_INTERVAL_DEFAULT); NamespaceInfo nsInfo = handshake(conf); super.initialize(conf); namesystem.setBlockPoolId(nsInfo.getBlockPoolID()); if (false == namesystem.isInSafeMode()) { namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER); } // Backup node should never do lease recovery, // therefore lease hard limit should never expire. namesystem.leaseManager.setLeasePeriod( HdfsConstants.LEASE_SOFTLIMIT_PERIOD, Long.MAX_VALUE); // register with the active name-node registerWith(nsInfo); // Checkpoint daemon should start after the rpc server started runCheckpointDaemon(conf); InetSocketAddress addr = getHttpAddress(); if (addr != null) { conf.set(BN_HTTP_ADDRESS_NAME_KEY, NetUtils.getHostPortString(getHttpAddress())); } }
NamespaceInfo getNamespaceInfo() { readLock(); try { return unprotectedGetNamespaceInfo(); } finally { readUnlock(); } }
void format(FSNamesystem fsn, String clusterId) throws IOException { long fileCount = fsn.getTotalFiles(); // Expect 1 file, which is the root inode Preconditions.checkState(fileCount == 1, "FSImage.format should be called with an uninitialized namesystem, has " + fileCount + " files"); NamespaceInfo ns = NNStorage.newNamespaceInfo(); LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID()); ns.clusterID = clusterId; storage.format(ns); editLog.formatNonFileJournals(ns); saveFSImageInAllDirs(fsn, 0); }
@Override public void format(NamespaceInfo nsInfo) { // format() should only get called at startup, before any BNs // can register with the NN. throw new UnsupportedOperationException( "BackupNode journal should never get formatted"); }
/** * Format all configured journals which are not file-based. * * File-based journals are skipped, since they are formatted by the * Storage format code. */ synchronized void formatNonFileJournals(NamespaceInfo nsInfo) throws IOException { Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS, "Bad state: %s", state); for (JournalManager jm : journalSet.getJournalManagers()) { if (!(jm instanceof FileJournalManager)) { jm.format(nsInfo); } } }
/** * Construct a custom journal manager. * The class to construct is taken from the configuration. * @param uri Uri to construct * @return The constructed journal manager * @throws IllegalArgumentException if no class is configured for uri */ private JournalManager createJournal(URI uri) { Class<? extends JournalManager> clazz = getJournalClass(conf, uri.getScheme()); try { Constructor<? extends JournalManager> cons = clazz.getConstructor(Configuration.class, URI.class, NamespaceInfo.class); return cons.newInstance(conf, uri, storage.getNamespaceInfo()); } catch (Exception e) { throw new IllegalArgumentException("Unable to construct journal, " + uri, e); } }
@Override public void format(NamespaceInfo ns) throws IOException { // Formatting file journals is done by the StorageDirectory // format code, since they may share their directory with // checkpoints, etc. throw new UnsupportedOperationException(); }
public NameNodeConnector(String name, URI nameNodeUri, Path idPath, List<Path> targetPaths, Configuration conf, int maxNotChangedIterations) throws IOException { this.nameNodeUri = nameNodeUri; this.idPath = idPath; this.targetPaths = targetPaths == null || targetPaths.isEmpty() ? Arrays .asList(new Path("/")) : targetPaths; this.maxNotChangedIterations = maxNotChangedIterations; this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri, NamenodeProtocol.class).getProxy(); this.client = NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class, fallbackToSimpleAuth).getProxy(); this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf); final NamespaceInfo namespaceinfo = namenode.versionRequest(); this.blockpoolID = namespaceinfo.getBlockPoolID(); final FsServerDefaults defaults = fs.getServerDefaults(new Path("/")); this.keyManager = new KeyManager(blockpoolID, namenode, defaults.getEncryptDataTransfer(), conf); // if it is for test, we do not create the id file out = checkAndMarkRunning(); if (out == null) { // Exit if there is another one running. throw new IOException("Another " + name + " is running."); } }
/** * Initializes the {@link #data}. The initialization is done only once, when * handshake with the the first namenode is completed. */ private void initStorage(final NamespaceInfo nsInfo) throws IOException { final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> factory = FsDatasetSpi.Factory.getFactory(conf); if (!factory.isSimulated()) { final StartupOption startOpt = getStartupOption(conf); if (startOpt == null) { throw new IOException("Startup option not set."); } final String bpid = nsInfo.getBlockPoolID(); //read storage info, lock data dirs and transition fs state if necessary synchronized (this) { storage.recoverTransitionRead(this, nsInfo, dataDirs, startOpt); } final StorageInfo bpStorage = storage.getBPStorage(bpid); LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID() + ";bpid=" + bpid + ";lv=" + storage.getLayoutVersion() + ";nsInfo=" + nsInfo + ";dnuuid=" + storage.getDatanodeUuid()); } // If this is a newly formatted DataNode then assign a new DatanodeUuid. checkDatanodeUuid(); synchronized(this) { if (data == null) { data = factory.newInstance(this, storage, conf); } } }
/** * Format a block pool slice storage. * @param bpSdir the block pool storage * @param nsInfo the name space info * @throws IOException Signals that an I/O exception has occurred. */ private void format(StorageDirectory bpSdir, NamespaceInfo nsInfo) throws IOException { LOG.info("Formatting block pool " + blockpoolID + " directory " + bpSdir.getCurrentDir()); bpSdir.clearDirectory(); // create directory this.layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION; this.cTime = nsInfo.getCTime(); this.namespaceID = nsInfo.getNamespaceID(); this.blockpoolID = nsInfo.getBlockPoolID(); writeProperties(bpSdir); }
void doRollback(StorageDirectory bpSd, NamespaceInfo nsInfo) throws IOException { File prevDir = bpSd.getPreviousDir(); // regular startup if previous dir does not exist if (!prevDir.exists()) return; // read attributes out of the VERSION file of previous directory BlockPoolSliceStorage prevInfo = new BlockPoolSliceStorage(); prevInfo.readPreviousVersionProperties(bpSd); // We allow rollback to a state, which is either consistent with // the namespace state or can be further upgraded to it. // In another word, we can only roll back when ( storedLV >= software LV) // && ( DN.previousCTime <= NN.ctime) if (!(prevInfo.getLayoutVersion() >= HdfsConstants.DATANODE_LAYOUT_VERSION && prevInfo.getCTime() <= nsInfo.getCTime())) { // cannot rollback throw new InconsistentFSStateException(bpSd.getRoot(), "Cannot rollback to a newer state.\nDatanode previous state: LV = " + prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime() + " is newer than the namespace state: LV = " + HdfsConstants.DATANODE_LAYOUT_VERSION + " CTime = " + nsInfo.getCTime()); } LOG.info("Rolling back storage directory " + bpSd.getRoot() + ".\n target LV = " + nsInfo.getLayoutVersion() + "; target CTime = " + nsInfo.getCTime()); File tmpDir = bpSd.getRemovedTmp(); assert !tmpDir.exists() : "removed.tmp directory must not exist."; // 1. rename current to tmp File curDir = bpSd.getCurrentDir(); assert curDir.exists() : "Current directory must exist."; rename(curDir, tmpDir); // 2. rename previous to current rename(prevDir, curDir); // 3. delete removed.tmp dir deleteDir(tmpDir); LOG.info("Rollback of " + bpSd.getRoot() + " is complete"); }
private StorageDirectory loadStorageDirectory(DataNode datanode, NamespaceInfo nsInfo, File dataDir, StartupOption startOpt) throws IOException { StorageDirectory sd = new StorageDirectory(dataDir, null, false); try { StorageState curState = sd.analyzeStorage(startOpt, this); // sd is locked but not opened switch (curState) { case NORMAL: break; case NON_EXISTENT: LOG.info("Storage directory " + dataDir + " does not exist"); throw new IOException("Storage directory " + dataDir + " does not exist"); case NOT_FORMATTED: // format LOG.info("Storage directory " + dataDir + " is not formatted for " + nsInfo.getBlockPoolID()); LOG.info("Formatting ..."); format(sd, nsInfo, datanode.getDatanodeUuid()); break; default: // recovery part is common sd.doRecover(curState); } // 2. Do transitions // Each storage directory is treated individually. // During startup some of them can upgrade or roll back // while others could be up-to-date for the regular startup. doTransition(datanode, sd, nsInfo, startOpt); // 3. Update successfully loaded storage. setServiceLayoutVersion(getServiceLayoutVersion()); writeProperties(sd); return sd; } catch (IOException ioe) { sd.unlock(); throw ioe; } }
@Override public NamespaceInfo versionRequest() throws IOException { try { return PBHelper.convert(rpcProxy.versionRequest(NULL_CONTROLLER, VOID_VERSION_REQUEST).getInfo()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } }
public static NamespaceInfoProto convert(NamespaceInfo info) { return NamespaceInfoProto.newBuilder() .setBlockPoolID(info.getBlockPoolID()) .setBuildVersion(info.getBuildVersion()) .setUnused(0) .setStorageInfo(PBHelper.convert((StorageInfo)info)) .setSoftwareVersion(info.getSoftwareVersion()) .setCapabilities(info.getCapabilities()) .build(); }
@Override public VersionResponseProto versionRequest(RpcController controller, VersionRequestProto request) throws ServiceException { NamespaceInfo info; try { info = impl.versionRequest(); } catch (IOException e) { throw new ServiceException(e); } return VersionResponseProto.newBuilder() .setInfo(PBHelper.convert(info)).build(); }
public QuorumCall<AsyncLogger,NewEpochResponseProto> newEpoch( NamespaceInfo nsInfo, long epoch) { Map<AsyncLogger, ListenableFuture<NewEpochResponseProto>> calls = Maps.newHashMap(); for (AsyncLogger logger : loggers) { calls.put(logger, logger.newEpoch(epoch)); } return QuorumCall.create(calls); }
QuorumCall<AsyncLogger,Void> format(NamespaceInfo nsInfo) { Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap(); for (AsyncLogger logger : loggers) { ListenableFuture<Void> future = logger.format(nsInfo); calls.put(logger, future); } return QuorumCall.create(calls); }
@Override public ListenableFuture<Void> format(final NamespaceInfo nsInfo) { return singleThreadExecutor.submit(new Callable<Void>() { @Override public Void call() throws Exception { getProxy().format(journalId, nsInfo); return null; } }); }
QuorumJournalManager(Configuration conf, URI uri, NamespaceInfo nsInfo, AsyncLogger.Factory loggerFactory) throws IOException { Preconditions.checkArgument(conf != null, "must be configured"); this.conf = conf; this.uri = uri; this.nsInfo = nsInfo; this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory)); this.connectionFactory = URLConnectionFactory .newDefaultURLConnectionFactory(conf); // Configure timeouts. this.startSegmentTimeoutMs = conf.getInt( DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY, DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT); this.prepareRecoveryTimeoutMs = conf.getInt( DFSConfigKeys.DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_KEY, DFSConfigKeys.DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT); this.acceptRecoveryTimeoutMs = conf.getInt( DFSConfigKeys.DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_KEY, DFSConfigKeys.DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT); this.finalizeSegmentTimeoutMs = conf.getInt( DFSConfigKeys.DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_KEY, DFSConfigKeys.DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_DEFAULT); this.selectInputStreamsTimeoutMs = conf.getInt( DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY, DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT); this.getJournalStateTimeoutMs = conf.getInt( DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY, DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT); this.newEpochTimeoutMs = conf.getInt( DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY, DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT); this.writeTxnsTimeoutMs = conf.getInt( DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY, DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT); }
static List<AsyncLogger> createLoggers(Configuration conf, URI uri, NamespaceInfo nsInfo, AsyncLogger.Factory factory) throws IOException { List<AsyncLogger> ret = Lists.newArrayList(); List<InetSocketAddress> addrs = getLoggerAddresses(uri); String jid = parseJournalId(uri); for (InetSocketAddress addr : addrs) { ret.add(factory.createLogger(conf, nsInfo, jid, addr)); } return ret; }
/** * Format the local storage with the given namespace. */ void format(NamespaceInfo nsInfo) throws IOException { Preconditions.checkState(nsInfo.getNamespaceID() != 0, "can't format with uninitialized namespace info: %s", nsInfo); LOG.info("Formatting " + this + " with namespace info: " + nsInfo); storage.format(nsInfo); refreshCachedData(); }
/** * Try to create a new epoch for this journal. * @param nsInfo the namespace, which is verified for consistency or used to * format, if the Journal has not yet been written to. * @param epoch the epoch to start * @return the status information necessary to begin recovery * @throws IOException if the node has already made a promise to another * writer with a higher epoch number, if the namespace is inconsistent, * or if a disk error occurs. */ synchronized NewEpochResponseProto newEpoch( NamespaceInfo nsInfo, long epoch) throws IOException { checkFormatted(); storage.checkConsistentNamespace(nsInfo); // Check that the new epoch being proposed is in fact newer than // any other that we've promised. if (epoch <= getLastPromisedEpoch()) { throw new IOException("Proposed epoch " + epoch + " <= last promise " + getLastPromisedEpoch()); } updateLastPromisedEpoch(epoch); abortCurSegment(); NewEpochResponseProto.Builder builder = NewEpochResponseProto.newBuilder(); EditLogFile latestFile = scanStorageForLatestEdits(); if (latestFile != null) { builder.setLastSegmentTxId(latestFile.getFirstTxId()); } return builder.build(); }