void preLogRoll(Path newLog) throws IOException { recordLog(newLog); String logName = newLog.getName(); String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName); synchronized (latestPaths) { Iterator<Path> iterator = latestPaths.iterator(); while (iterator.hasNext()) { Path path = iterator.next(); if (path.getName().contains(logPrefix)) { iterator.remove(); break; } } this.latestPaths.add(newLog); } }
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK", justification= "We only release this lock when we set it. Updates to code that uses it should verify use " + "of the guard boolean.") private List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException { List<Path> logDirs = new ArrayList<Path>(); boolean needReleaseLock = false; if (!this.services.isInitialized()) { // during master initialization, we could have multiple places splitting a same wal this.splitLogLock.lock(); needReleaseLock = true; } try { for (ServerName serverName : serverNames) { Path logDir = new Path(this.rootdir, DefaultWALProvider.getWALDirectoryName(serverName.toString())); Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT); // Rename the directory so a rogue RS doesn't create more WALs if (fs.exists(logDir)) { if (!this.fs.rename(logDir, splitDir)) { throw new IOException("Failed fs.rename for log split: " + logDir); } logDir = splitDir; LOG.debug("Renamed region directory: " + splitDir); } else if (!fs.exists(splitDir)) { LOG.info("Log dir for server " + serverName + " does not exist"); continue; } logDirs.add(splitDir); } } finally { if (needReleaseLock) { this.splitLogLock.unlock(); } } return logDirs; }
/** * The caller will block until all the log files of the given region server have been processed - * successfully split or an error is encountered - by an available worker region server. This * method must only be called after the region servers have been brought online. * @param logDirs List of log dirs to split * @throws IOException If there was an error while splitting any log file * @return cumulative size of the logfiles split */ public long splitLogDistributed(final List<Path> logDirs) throws IOException { if (logDirs.isEmpty()) { return 0; } Set<ServerName> serverNames = new HashSet<ServerName>(); for (Path logDir : logDirs) { try { ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(logDir); if (serverName != null) { serverNames.add(serverName); } } catch (IllegalArgumentException e) { // ignore invalid format error. LOG.warn("Cannot parse server name from " + logDir); } } return splitLogDistributed(serverNames, logDirs, null); }
@Test (timeout=300000) public void testWALRollWriting() throws Exception { setUpforLogRolling(); String className = this.getClass().getName(); StringBuilder v = new StringBuilder(className); while (v.length() < 1000) { v.append(className); } byte[] value = Bytes.toBytes(v.toString()); HRegionServer regionServer = startAndWriteData(TableName.valueOf("TestLogRolling"), value); LOG.info("after writing there are " + DefaultWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)) + " log files"); // flush all regions for (Region r : regionServer.getOnlineRegionsLocalContext()) { r.flush(true); } admin.rollWALWriter(regionServer.getServerName()); int count = DefaultWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)); LOG.info("after flushing all regions and rolling logs there are " + count + " log files"); assertTrue(("actual count: " + count), count <= 2); }
@Before public void setUp() throws Exception { this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); // this.cluster = TEST_UTIL.getDFSCluster(); this.fs = TEST_UTIL.getDFSCluster().getFileSystem(); this.hbaseRootDir = FSUtils.getRootDir(conf); this.dir = new Path(this.hbaseRootDir, TestWALObserver.class.getName()); this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME); this.logDir = new Path(this.hbaseRootDir, DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName())); this.logName = HConstants.HREGION_LOGDIR_NAME; if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); } this.wals = new WALFactory(conf, null, currentTest.getMethodName()); }
/** * Tests that logs are deleted * @throws IOException * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException */ @Test public void testLogRolling() throws Exception { this.tableName = getName(); // TODO: Why does this write data take for ever? startAndWriteData(); final WAL log = server.getWAL(null); LOG.info("after writing there are " + DefaultWALProvider.getNumRolledLogFiles(log) + " log files"); // flush all regions for (Region r: server.getOnlineRegionsLocalContext()) { r.flush(true); } // Now roll the log log.rollWriter(); int count = DefaultWALProvider.getNumRolledLogFiles(log); LOG.info("after flushing all regions and rolling logs there are " + count + " log files"); assertTrue(("actual count: " + count), count <= 2); }
@Test (timeout=300000) public void testWALRollWriting() throws Exception { setUpforLogRolling(); String className = this.getClass().getName(); StringBuilder v = new StringBuilder(className); while (v.length() < 1000) { v.append(className); } byte[] value = Bytes.toBytes(v.toString()); HRegionServer regionServer = startAndWriteData(TableName.valueOf("TestLogRolling"), value); LOG.info("after writing there are " + DefaultWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)) + " log files"); // flush all regions List<HRegion> regions = new ArrayList<HRegion>(regionServer .getOnlineRegionsLocalContext()); for (HRegion r : regions) { r.flushcache(); } admin.rollWALWriter(regionServer.getServerName()); int count = DefaultWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)); LOG.info("after flushing all regions and rolling logs there are " + count + " log files"); assertTrue(("actual count: " + count), count <= 2); }
/** * Tests that logs are deleted * @throws IOException * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException */ @Test public void testLogRolling() throws Exception { this.tableName = getName(); // TODO: Why does this write data take for ever? startAndWriteData(); final WAL log = server.getWAL(null); LOG.info("after writing there are " + DefaultWALProvider.getNumRolledLogFiles(log) + " log files"); // flush all regions List<HRegion> regions = new ArrayList<HRegion>(server.getOnlineRegionsLocalContext()); for (HRegion r: regions) { r.flushcache(); } // Now roll the log log.rollWriter(); int count = DefaultWALProvider.getNumRolledLogFiles(log); LOG.info("after flushing all regions and rolling logs there are " + count + " log files"); assertTrue(("actual count: " + count), count <= 2); }
/** * * @param serverName the 'unique-over-restarts' name, i.e. hostname with start code suffix * @param hlogName name of HLog */ private long getLogFileSize(String serverName, String hlogName) throws IOException { Path hbaseLogDir = new Path(hbaseRootDir, DefaultWALProvider.getWALDirectoryName(serverName)); Path path = new Path(hbaseLogDir, hlogName); try { FileStatus status = fileSystem.getFileStatus(path); return status.getLen(); } catch (FileNotFoundException e) { Path oldLogPath = new Path(hbaseOldLogDir, hlogName); try { return fileSystem.getFileStatus(oldLogPath).getLen(); } catch (FileNotFoundException e2) { // TODO there is still another place to look for log files, cfr dead region servers, see openReader in replicationsource System.err.println("HLog not found at : " + path + " or " + oldLogPath); return -1; } } }
/** * Add sources for the given peer cluster on this region server. For the newly added peer, we only * need to enqueue the latest log of each wal group and do replication * @param id the id of the peer cluster * @return the source that was created * @throws IOException */ protected ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException { ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id); ReplicationPeer peer = replicationPeers.getPeer(id); ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer); synchronized (this.walsById) { this.sources.add(src); Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>(); this.walsById.put(id, walsByGroup); // Add the latest wal to that source's queue synchronized (latestPaths) { if (this.latestPaths.size() > 0) { for (Path logPath : latestPaths) { String name = logPath.getName(); String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name); SortedSet<String> logs = new TreeSet<String>(); logs.add(name); walsByGroup.put(walPrefix, logs); try { this.replicationQueues.addLog(id, name); } catch (ReplicationException e) { String message = "Cannot add log to queue when creating a new source, queueId=" + id + ", filename=" + name; server.stop(message); throw e; } src.enqueueLog(logPath); } } } } src.startup(); return src; }
@Override public void enqueueLog(Path log) { String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(log.getName()); PriorityBlockingQueue<Path> queue = queues.get(logPrefix); if (queue == null) { queue = new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator()); queues.put(logPrefix, queue); if (this.sourceRunning) { // new wal group observed after source startup, start a new worker thread to track it // notice: it's possible that log enqueued when this.running is set but worker thread // still not launched, so it's necessary to check workerThreads before start the worker final ReplicationSourceWorkerThread worker = new ReplicationSourceWorkerThread(logPrefix, queue, replicationQueueInfo, this); ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(logPrefix, worker); if (extant != null) { LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix); } else { LOG.debug("Starting up worker for wal group " + logPrefix); worker.startup(); } } } queue.put(log); int queueSize = logQueueSize.incrementAndGet(); this.metrics.setSizeOfLogQueue(queueSize); // This will log a warning for each new log that gets created above the warn threshold if (queue.size() > this.logQueueWarnThreshold) { LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold); } }
public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src) throws IOException { ServerName sn = DefaultWALProvider.getServerNameFromWALDirectoryName(conf, src); if (sn == null) { // It's not an WAL return; } // Ok, so it's an WAL String hostName = sn.getHostname(); if (LOG.isTraceEnabled()) { LOG.trace(src + " is an WAL file, so reordering blocks, last hostname will be:" + hostName); } // Just check for all blocks for (LocatedBlock lb : lbs.getLocatedBlocks()) { DatanodeInfo[] dnis = lb.getLocations(); if (dnis != null && dnis.length > 1) { boolean found = false; for (int i = 0; i < dnis.length - 1 && !found; i++) { if (hostName.equals(dnis[i].getHostName())) { // advance the other locations by one and put this one at the last place. DatanodeInfo toLast = dnis[i]; System.arraycopy(dnis, i + 1, dnis, i, dnis.length - i - 1); dnis[dnis.length - 1] = toLast; found = true; } } } } }
/** * Setup WAL log and replication if enabled. * Replication setup is done in here because it wants to be hooked up to WAL. * * @return A WAL instance. * @throws IOException */ private WALFactory setupWALAndReplication() throws IOException { // TODO Replication make assumptions here based on the default filesystem impl final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString()); Path logdir = new Path(rootDir, logName); if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir); if (this.fs.exists(logdir)) { throw new RegionServerRunningException( "Region server has already " + "created directory at " + this.serverName.toString()); } // Instantiate replication manager if replication enabled. Pass it the // log directories. createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir); // listeners the wal factory will add to wals it creates. final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>(); listeners.add(new MetricsWAL()); if (this.replicationSourceHandler != null && this.replicationSourceHandler.getWALActionsListener() != null) { // Replication handler is an implementation of WALActionsListener. listeners.add(this.replicationSourceHandler.getWALActionsListener()); } return new WALFactory(conf, listeners, serverName.toString()); }
@Before public void setUp() throws Exception { this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); this.fs = TEST_UTIL.getDFSCluster().getFileSystem(); this.hbaseRootDir = FSUtils.getRootDir(this.conf); this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME); this.logName = DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName() + "-manual"); this.logDir = new Path(this.hbaseRootDir, logName); if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); } this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); this.wals = new WALFactory(conf, null, currentTest.getMethodName()); }
private void verifyWALCount(WALFactory wals, WAL log, int expected) throws Exception { Path walPath = DefaultWALProvider.getCurrentFileName(log); WAL.Reader reader = wals.createReader(FS, walPath); int count = 0; WAL.Entry entry = new WAL.Entry(); while (reader.next(entry) != null) count++; reader.close(); assertEquals(expected, count); }
/** * Tests the case where a RegionServer enters a GC pause, * comes back online after the master declared it dead and started to split. * Want log rolling after a master split to fail. See HBASE-2312. */ @Test (timeout=300000) public void testLogRollAfterSplitStart() throws IOException { LOG.info("Verify wal roll after split starts will fail."); String logName = "testLogRollAfterSplitStart"; Path thisTestsDir = new Path(HBASEDIR, DefaultWALProvider.getWALDirectoryName(logName)); final WALFactory wals = new WALFactory(conf, null, logName); try { // put some entries in an WAL TableName tableName = TableName.valueOf(this.getClass().getName()); HRegionInfo regioninfo = new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); final WAL log = wals.getWAL(regioninfo.getEncodedNameAsBytes()); MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); final int total = 20; for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); HTableDescriptor htd = new HTableDescriptor(tableName); htd.addFamily(new HColumnDescriptor("column")); log.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc), kvs, true); } // Send the data to HDFS datanodes and close the HDFS writer log.sync(); ((FSHLog) log).replaceWriter(((FSHLog)log).getOldPath(), null, null, null); /* code taken from MasterFileSystem.getLogDirs(), which is called from MasterFileSystem.splitLog() * handles RS shutdowns (as observed by the splitting process) */ // rename the directory so a rogue RS doesn't create more WALs Path rsSplitDir = thisTestsDir.suffix(DefaultWALProvider.SPLITTING_EXT); if (!fs.rename(thisTestsDir, rsSplitDir)) { throw new IOException("Failed fs.rename for log split: " + thisTestsDir); } LOG.debug("Renamed region directory: " + rsSplitDir); LOG.debug("Processing the old log files."); WALSplitter.split(HBASEDIR, rsSplitDir, OLDLOGDIR, fs, conf, wals); LOG.debug("Trying to roll the WAL."); try { log.rollWriter(); Assert.fail("rollWriter() did not throw any exception."); } catch (IOException ioe) { if (ioe.getCause() instanceof FileNotFoundException) { LOG.info("Got the expected exception: ", ioe.getCause()); } else { Assert.fail("Unexpected exception: " + ioe); } } } finally { wals.close(); if (fs.exists(thisTestsDir)) { fs.delete(thisTestsDir, true); } } }
@Override public void setUp() throws Exception { // setup config values necessary for store this.conf = TEST_UTIL.getConfiguration(); this.conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 0); this.conf.setInt("hbase.hstore.compaction.min", minFiles); this.conf.setInt("hbase.hstore.compaction.max", maxFiles); this.conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, minSize); this.conf.setLong("hbase.hstore.compaction.max.size", maxSize); this.conf.setFloat("hbase.hstore.compaction.ratio", 1.0F); //Setting up a Store final String id = TestDefaultCompactSelection.class.getName(); Path basedir = new Path(DIR); final Path logdir = new Path(basedir, DefaultWALProvider.getWALDirectoryName(id)); HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("family")); FileSystem fs = FileSystem.get(conf); fs.delete(logdir, true); HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(Bytes.toBytes("table"))); htd.addFamily(hcd); HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, basedir); wals = new WALFactory(walConf, null, id); region = HRegion.createHRegion(info, basedir, conf, htd); HRegion.closeHRegion(region); Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName()); region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes()), fs, conf, info, htd, null); store = new HStore(region, hcd, conf); TEST_FILE = region.getRegionFileSystem().createTempName(); fs.createNewFile(TEST_FILE); }
@SuppressWarnings("deprecation") private Store init(String methodName, Configuration conf, HTableDescriptor htd, HColumnDescriptor hcd) throws IOException { //Setting up a Store Path basedir = new Path(DIR+methodName); Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName()); final Path logdir = new Path(basedir, DefaultWALProvider.getWALDirectoryName(methodName)); FileSystem fs = FileSystem.get(conf); fs.delete(logdir, true); if (htd.hasFamily(hcd.getName())) { htd.modifyFamily(hcd); } else { htd.addFamily(hcd); } HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, basedir); final WALFactory wals = new WALFactory(walConf, null, methodName); HRegion region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes()), fs, conf, info, htd, null); store = new HStore(region, hcd, conf); return store; }
@Before public void setUp() throws IOException { // parameterized tests add [#] suffix get rid of [ and ]. table = Bytes.toBytes(name.getMethodName().replaceAll("[\\[\\]]", "_")); conf = TEST_UTIL.getConfiguration(); conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION); conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, false); conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, false); conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, false); fs = HFileSystem.get(conf); // Create the schema HColumnDescriptor hcd = new HColumnDescriptor(family); hcd.setBloomFilterType(BloomType.ROWCOL); cowType.modifyFamilySchema(hcd); HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table)); htd.addFamily(hcd); // Create a store based on the schema final String id = TestCacheOnWriteInSchema.class.getName(); final Path logdir = new Path(FSUtils.getRootDir(conf), DefaultWALProvider.getWALDirectoryName(id)); fs.delete(logdir, true); HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); walFactory = new WALFactory(conf, null, id); region = TEST_UTIL.createLocalHRegion(info, htd, walFactory.getWAL(info.getEncodedNameAsBytes())); store = new HStore(region, hcd, conf); }
/** * Setup WAL log and replication if enabled. * Replication setup is done in here because it wants to be hooked up to WAL. * * @return A WAL instance. * @throws IOException */ private WALFactory setupWALAndReplication() throws IOException { // TODO Replication make assumptions here based on the default filesystem impl final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString()); Path logdir = new Path(rootDir, logName); if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir); if (this.fs.exists(logdir)) { throw new RegionServerRunningException("Region server has already " + "created directory at " + this.serverName.toString()); } // Instantiate replication manager if replication enabled. Pass it the // log directories. createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir); // listeners the wal factory will add to wals it creates. final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>(); listeners.add(new MetricsWAL()); if (this.replicationSourceHandler != null && this.replicationSourceHandler.getWALActionsListener() != null) { // Replication handler is an implementation of WALActionsListener. listeners.add(this.replicationSourceHandler.getWALActionsListener()); } return new WALFactory(conf, listeners, serverName.toString()); }
@Override protected boolean validate(Path file) { return DefaultWALProvider.validateWALFilename(file.getName()); }
@Override public boolean accept(Path p) { return DefaultWALProvider.isMetaFile(p); }
@Override public boolean accept(Path p) { return !DefaultWALProvider.isMetaFile(p); }
/** * Inspect the log directory to find dead servers which need recovery work * @return A set of ServerNames which aren't running but still have WAL files left in file system */ Set<ServerName> getFailedServersFromLogFolders() { boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors", WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT); Set<ServerName> serverNames = new HashSet<ServerName>(); Path logsDirPath = new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME); do { if (master.isStopped()) { LOG.warn("Master stopped while trying to get failed servers."); break; } try { if (!this.fs.exists(logsDirPath)) return serverNames; FileStatus[] logFolders = FSUtils.listStatus(this.fs, logsDirPath, null); // Get online servers after getting log folders to avoid log folder deletion of newly // checked in region servers . see HBASE-5916 Set<ServerName> onlineServers = ((HMaster) master).getServerManager().getOnlineServers() .keySet(); if (logFolders == null || logFolders.length == 0) { LOG.debug("No log files to split, proceeding..."); return serverNames; } for (FileStatus status : logFolders) { FileStatus[] curLogFiles = FSUtils.listStatus(this.fs, status.getPath(), null); if (curLogFiles == null || curLogFiles.length == 0) { // Empty log folder. No recovery needed continue; } final ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName( status.getPath()); if (null == serverName) { LOG.warn("Log folder " + status.getPath() + " doesn't look like its name includes a " + "region server name; leaving in place. If you see later errors about missing " + "write ahead logs they may be saved in this location."); } else if (!onlineServers.contains(serverName)) { LOG.info("Log folder " + status.getPath() + " doesn't belong " + "to a known region server, splitting"); serverNames.add(serverName); } else { LOG.info("Log folder " + status.getPath() + " belongs to an existing region server"); } } retrySplitting = false; } catch (IOException ioe) { LOG.warn("Failed getting failed servers to be recovered.", ioe); if (!checkFileSystem()) { LOG.warn("Bad Filesystem, exiting"); Runtime.getRuntime().halt(1); } try { if (retrySplitting) { Thread.sleep(conf.getInt("hbase.hlog.split.failure.retry.interval", 30 * 1000)); } } catch (InterruptedException e) { LOG.warn("Interrupted, aborting since cannot return w/o splitting"); Thread.currentThread().interrupt(); retrySplitting = false; Runtime.getRuntime().halt(1); } } } while (retrySplitting); return serverNames; }
/** * Test that the reorder algo works as we expect. */ @Test public void testBlockLocation() throws Exception { // We need to start HBase to get HConstants.HBASE_DIR set in conf htu.startMiniZKCluster(); MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 1); conf = hbm.getConfiguration(); // The "/" is mandatory, without it we've got a null pointer exception on the namenode final String fileName = "/helloWorld"; Path p = new Path(fileName); final int repCount = 3; Assert.assertTrue((short) cluster.getDataNodes().size() >= repCount); // Let's write the file FSDataOutputStream fop = dfs.create(p, (short) repCount); final double toWrite = 875.5613; fop.writeDouble(toWrite); fop.close(); for (int i=0; i<10; i++){ // The interceptor is not set in this test, so we get the raw list at this point LocatedBlocks l; final long max = System.currentTimeMillis() + 10000; do { l = getNamenode(dfs.getClient()).getBlockLocations(fileName, 0, 1); Assert.assertNotNull(l.getLocatedBlocks()); Assert.assertEquals(l.getLocatedBlocks().size(), 1); Assert.assertTrue("Expecting " + repCount + " , got " + l.get(0).getLocations().length, System.currentTimeMillis() < max); } while (l.get(0).getLocations().length != repCount); // Should be filtered, the name is different => The order won't change Object originalList[] = l.getLocatedBlocks().toArray(); HFileSystem.ReorderWALBlocks lrb = new HFileSystem.ReorderWALBlocks(); lrb.reorderBlocks(conf, l, fileName); Assert.assertArrayEquals(originalList, l.getLocatedBlocks().toArray()); // Should be reordered, as we pretend to be a file name with a compliant stuff Assert.assertNotNull(conf.get(HConstants.HBASE_DIR)); Assert.assertFalse(conf.get(HConstants.HBASE_DIR).isEmpty()); String pseudoLogFile = conf.get(HConstants.HBASE_DIR) + "/" + HConstants.HREGION_LOGDIR_NAME + "/" + host1 + ",6977,6576" + "/mylogfile"; // Check that it will be possible to extract a ServerName from our construction Assert.assertNotNull("log= " + pseudoLogFile, DefaultWALProvider.getServerNameFromWALDirectoryName(dfs.getConf(), pseudoLogFile)); // And check we're doing the right reorder. lrb.reorderBlocks(conf, l, pseudoLogFile); Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName()); // Check again, it should remain the same. lrb.reorderBlocks(conf, l, pseudoLogFile); Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName()); } }
/** * Tests that logs are deleted when some region has a compaction * record in WAL and no other records. See HBASE-8597. */ @Test public void testCompactionRecordDoesntBlockRolling() throws Exception { Table table = null; Table table2 = null; // When the hbase:meta table can be opened, the region servers are running Table t = new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME); try { table = createTestTable(getName()); table2 = createTestTable(getName() + "1"); server = TEST_UTIL.getRSForFirstRegionInTable(table.getName()); final WAL log = server.getWAL(null); Region region = server.getOnlineRegions(table2.getName()).get(0); Store s = region.getStore(HConstants.CATALOG_FAMILY); //have to flush namespace to ensure it doesn't affect wall tests admin.flush(TableName.NAMESPACE_TABLE_NAME); // Put some stuff into table2, to make sure we have some files to compact. for (int i = 1; i <= 2; ++i) { doPut(table2, i); admin.flush(table2.getName()); } doPut(table2, 3); // don't flush yet, or compaction might trigger before we roll WAL assertEquals("Should have no WAL after initial writes", 0, DefaultWALProvider.getNumRolledLogFiles(log)); assertEquals(2, s.getStorefilesCount()); // Roll the log and compact table2, to have compaction record in the 2nd WAL. log.rollWriter(); assertEquals("Should have WAL; one table is not flushed", 1, DefaultWALProvider.getNumRolledLogFiles(log)); admin.flush(table2.getName()); region.compact(false); // Wait for compaction in case if flush triggered it before us. Assert.assertNotNull(s); for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) { Threads.sleepWithoutInterrupt(200); } assertEquals("Compaction didn't happen", 1, s.getStorefilesCount()); // Write some value to the table so the WAL cannot be deleted until table is flushed. doPut(table, 0); // Now 2nd WAL will have compaction record for table2 and put for table. log.rollWriter(); // 1st WAL deleted, 2nd not deleted yet. assertEquals("Should have WAL; one table is not flushed", 1, DefaultWALProvider.getNumRolledLogFiles(log)); // Flush table to make latest WAL obsolete; write another record, and roll again. admin.flush(table.getName()); doPut(table, 1); log.rollWriter(); // Now 2nd WAL is deleted and 3rd is added. assertEquals("Should have 1 WALs at the end", 1, DefaultWALProvider.getNumRolledLogFiles(log)); } finally { if (t != null) t.close(); if (table != null) table.close(); if (table2 != null) table2.close(); } }
WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException { return wals.createReader(TEST_UTIL.getTestFileSystem(), DefaultWALProvider.getCurrentFileName(walPrimary), TEST_UTIL.getConfiguration()); }