/** * Performs log splitting for all regionserver directories. * @throws Exception */ private void doOfflineLogSplitting() throws Exception { LOG.info("Starting Log splitting"); final Path rootDir = FSUtils.getRootDir(getConf()); final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); // since this is the singleton, we needn't close it. final WALFactory factory = WALFactory.getInstance(getConf()); FileSystem fs = FSUtils.getCurrentFileSystem(getConf()); Path logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); FileStatus[] regionServerLogDirs = FSUtils.listStatus(fs, logDir); if (regionServerLogDirs == null || regionServerLogDirs.length == 0) { LOG.info("No log directories to split, returning"); return; } try { for (FileStatus regionServerLogDir : regionServerLogDirs) { // split its log dir, if exists WALSplitter.split(rootDir, regionServerLogDir.getPath(), oldLogDir, fs, getConf(), factory); } LOG.info("Successfully completed Log splitting"); } catch (Exception e) { LOG.error("Got exception while doing Log splitting ", e); throw e; } }
/**
 * Creates the ZooKeeper-backed split-log manager coordination.
 * <p>
 * The installed {@code TaskFinisher} completes each split WAL through
 * {@code WALSplitter.finishSplitLogFile}, using the server configuration obtained from
 * {@code manager}. An {@code IOException} is logged and reported as {@code Status.ERR};
 * otherwise {@code Status.DONE} is returned.
 *
 * @param manager supplies the server and, through it, the configuration
 * @param watcher ZooKeeper watcher handed to the superclass
 */
public ZKSplitLogManagerCoordination(final CoordinatedStateManager manager,
    ZooKeeperWatcher watcher) {
  super(watcher);
  this.server = manager.getServer();
  this.conf = server.getConfiguration();
  taskFinisher = new TaskFinisher() {
    @Override
    public Status finish(ServerName worker, String walFile) {
      try {
        WALSplitter.finishSplitLogFile(walFile, manager.getServer().getConfiguration());
        return Status.DONE;
      } catch (IOException ioe) {
        LOG.warn("Could not finish splitting of log file " + walFile, ioe);
        return Status.ERR;
      }
    }
  };
}
private void writeRegionCloseMarker(WAL wal) throws IOException { Map<byte[], List<Path>> storeFiles = getStoreFiles(); RegionEventDescriptor regionEventDesc = ProtobufUtil .toRegionEventDescriptor(RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(), getRegionServerServices().getServerName(), storeFiles); WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc, mvcc); // Store SeqId in HDFS when a region closes // checking region folder exists is due to many tests which delete the table // folder while a // table is still online if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) { WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(), mvcc.getReadPoint(), 0); } }
private void writeRegionCloseMarker(WAL wal) throws IOException { Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR); for (Map.Entry<byte[], Store> entry : getStores().entrySet()) { Store store = entry.getValue(); ArrayList<Path> storeFileNames = new ArrayList<Path>(); for (StoreFile storeFile : store.getStorefiles()) { storeFileNames.add(storeFile.getPath()); } storeFiles.put(entry.getKey(), storeFileNames); } RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor( RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), getSequenceId().get(), getRegionServerServices().getServerName(), storeFiles); WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc, getSequenceId()); // Store SeqId in HDFS when a region closes // checking region folder exists is due to many tests which delete the table folder while a // table is still online if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) { WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(), getSequenceId().get(), 0); } }
/**
 * Creates the ZooKeeper-backed split-log manager coordination.
 * <p>
 * The installed {@code TaskFinisher} completes each split WAL through
 * {@code WALSplitter.finishSplitLogFile} using {@code conf}. An {@code IOException} is logged
 * and reported as {@code Status.ERR}; otherwise {@code Status.DONE} is returned.
 *
 * @param conf configuration stored on this instance and used to finish split files
 * @param watcher ZooKeeper watcher handed to the superclass
 */
public ZKSplitLogManagerCoordination(Configuration conf, ZKWatcher watcher) {
  super(watcher);
  this.conf = conf;
  taskFinisher = new TaskFinisher() {
    @Override
    public Status finish(ServerName worker, String walFile) {
      try {
        WALSplitter.finishSplitLogFile(walFile, conf);
        return Status.DONE;
      } catch (IOException ioe) {
        LOG.warn("Could not finish splitting of log file " + walFile, ioe);
        return Status.ERR;
      }
    }
  };
}
/**
 * Splits all WALs in directory {@code p} on the WAL filesystem.
 * <p>
 * Processed WALs are archived under the old-log directory of the WAL root. When
 * {@code AbstractFSWALProvider.SEPARATE_OLDLOGDIR} is enabled, they go into a per-directory
 * subfolder named after {@code p}.
 *
 * @param conf configuration used to locate the WAL filesystem and root
 * @param p WAL directory to split
 * @throws FileNotFoundException if {@code p} does not exist
 * @throws IOException if {@code p} is not a directory or splitting fails
 */
private static void split(final Configuration conf, final Path p) throws IOException {
  final FileSystem walFs = FSUtils.getWALFileSystem(conf);
  if (!walFs.exists(p)) {
    throw new FileNotFoundException(p.toString());
  }
  if (!walFs.getFileStatus(p).isDirectory()) {
    throw new IOException(p + " is not a directory");
  }

  final Path walRoot = FSUtils.getWALRootDir(conf);
  final Path sharedArchive = new Path(walRoot, HConstants.HREGION_OLDLOGDIR_NAME);
  final boolean perDirArchive = conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
      AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR);
  final Path archiveDir = perDirArchive ? new Path(sharedArchive, p.getName()) : sharedArchive;

  WALSplitter.split(walRoot, p, archiveDir, walFs, conf, WALFactory.getInstance(conf));
}
private void writeRegionCloseMarker(WAL wal) throws IOException { Map<byte[], List<Path>> storeFiles = getStoreFiles(); RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor( RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(), getRegionServerServices().getServerName(), storeFiles); WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc, mvcc); // Store SeqId in HDFS when a region closes // checking region folder exists is due to many tests which delete the table folder while a // table is still online if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) { WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(), mvcc.getReadPoint(), 0); } }
/** * Iterate over recovered.edits of the specified region * * @param fs {@link FileSystem} * @param regionDir {@link Path} to the Region directory * @param visitor callback object to get the recovered.edits files * @throws IOException if an error occurred while scanning the directory */ public static void visitRegionRecoveredEdits(final FileSystem fs, final Path regionDir, final FSVisitor.RecoveredEditsVisitor visitor) throws IOException { NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regionDir); if (files == null || files.size() == 0) return; for (Path source: files) { // check to see if the file is zero length, in which case we can skip it FileStatus stat = fs.getFileStatus(source); if (stat.getLen() <= 0) continue; visitor.recoveredEdits(regionDir.getName(), source.getName()); } }
/**
 * Splits all WALs in directory {@code p}, using the default filesystem of {@code conf}.
 * Processed WALs are archived under the old-log directory beneath the HBase root directory.
 *
 * @param conf configuration used to resolve the filesystem and HBase root directory
 * @param p WAL directory to split
 * @throws FileNotFoundException if {@code p} does not exist
 * @throws IOException if {@code p} is not a directory or splitting fails
 */
private static void split(final Configuration conf, final Path p) throws IOException {
  final FileSystem fileSystem = FileSystem.get(conf);
  if (!fileSystem.exists(p)) {
    throw new FileNotFoundException(p.toString());
  }
  final boolean isDirectory = fileSystem.getFileStatus(p).isDirectory();
  if (!isDirectory) {
    throw new IOException(p + " is not a directory");
  }
  final Path rootDir = FSUtils.getRootDir(conf);
  WALSplitter.split(rootDir, p, new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME),
      fileSystem, conf, WALFactory.getInstance(conf));
}
private Path runWALSplit(final Configuration c) throws IOException { List<Path> splits = WALSplitter.split( hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals); // Split should generate only 1 file since there's only 1 region assertEquals(1, splits.size()); // Make sure the file exists assertTrue(fs.exists(splits.get(0))); LOG.info("Split file=" + splits.get(0)); return splits.get(0); }
/**
 * Creates three small recovered.edits files for each given region and records their names.
 * <p>
 * Each file is named with a zero-padded 19-digit number, starting from the current time in
 * milliseconds. Each file contains the bytes of {@code "test"}.
 *
 * @param tableDir table directory containing the region directories
 * @param tableRegions names of the region directories under {@code tableDir} to populate
 * @param recoverEdits output set receiving the name of every created edits file
 * @throws IOException if a file cannot be created or written
 */
private void createRecoverEdits(final Path tableDir, final Set<String> tableRegions,
    final Set<String> recoverEdits) throws IOException {
  for (String region : tableRegions) {
    Path regionEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(new Path(tableDir, region));
    long seqId = System.currentTimeMillis();
    for (int i = 0; i < 3; ++i) {
      String editName = String.format("%019d", seqId + i);
      recoverEdits.add(editName);
      // try-with-resources: the stream is closed even if the write fails.
      try (FSDataOutputStream stream = fs.create(new Path(regionEditsDir, editName))) {
        stream.write(Bytes.toBytes("test"));
      }
    }
  }
}
/** * HBASE-12782 ITBLL fails for me if generator does anything but 5M per maptask. * Create a region. Close it. Then copy into place a file to replay, one that is bigger than * configured flush size so we bring on lots of flushes. Then reopen and confirm all edits * made it in. * @throws IOException */ @Test (timeout=60000) public void testReplayWorksThoughLotsOfFlushing() throws IOException { Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); // Set it so we flush every 1M or so. Thats a lot. conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024); // The file of recovered edits has a column family of 'meta'. Also has an encoded regionname // of 4823016d8fca70b25503ee07f4c6d79f which needs to match on replay. final String encodedRegionName = "4823016d8fca70b25503ee07f4c6d79f"; HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(testName.getMethodName())); final String columnFamily = "meta"; byte [][] columnFamilyAsByteArray = new byte [][] {Bytes.toBytes(columnFamily)}; htd.addFamily(new HColumnDescriptor(columnFamily)); HRegionInfo hri = new HRegionInfo(htd.getTableName()) { @Override public synchronized String getEncodedName() { return encodedRegionName; } // Cache the name because lots of lookups. private byte [] encodedRegionNameAsBytes = null; @Override public synchronized byte[] getEncodedNameAsBytes() { if (encodedRegionNameAsBytes == null) { this.encodedRegionNameAsBytes = Bytes.toBytes(getEncodedName()); } return this.encodedRegionNameAsBytes; } }; Path hbaseRootDir = TEST_UTIL.getDataTestDir(); FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration()); Path tableDir = FSUtils.getTableDir(hbaseRootDir, htd.getTableName()); HRegionFileSystem hrfs = new HRegionFileSystem(TEST_UTIL.getConfiguration(), fs, tableDir, hri); if (fs.exists(hrfs.getRegionDir())) { LOG.info("Region directory already exists. 
Deleting."); fs.delete(hrfs.getRegionDir(), true); } HRegion region = HRegion.createHRegion(hri, hbaseRootDir, conf, htd, null); assertEquals(encodedRegionName, region.getRegionInfo().getEncodedName()); List<String> storeFiles = region.getStoreFileList(columnFamilyAsByteArray); // There should be no store files. assertTrue(storeFiles.isEmpty()); region.close(); Path regionDir = region.getRegionDir(hbaseRootDir, hri); Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir); // This is a little fragile getting this path to a file of 10M of edits. Path recoveredEditsFile = new Path( System.getProperty("test.build.classes", "target/test-classes"), "0000000000000016310"); // Copy this file under the region's recovered.edits dir so it is replayed on reopen. Path destination = new Path(recoveredEditsDir, recoveredEditsFile.getName()); fs.copyToLocalFile(recoveredEditsFile, destination); assertTrue(fs.exists(destination)); // Now the file 0000000000000016310 is under recovered.edits, reopen the region to replay. region = HRegion.openHRegion(region, null); assertEquals(encodedRegionName, region.getRegionInfo().getEncodedName()); storeFiles = region.getStoreFileList(columnFamilyAsByteArray); // Our 0000000000000016310 is 10MB. Most of the edits are for one region. Lets assume that if // we flush at 1MB, that there are at least 3 flushed files that are there because of the // replay of edits. assertTrue("Files count=" + storeFiles.size(), storeFiles.size() > 10); // Now verify all edits made it into the region. int count = verifyAllEditsMadeItIn(fs, conf, recoveredEditsFile, region); LOG.info("Checked " + count + " edits made it in"); }
private Path runWALSplit(final Configuration c) throws IOException { List<Path> splits = WALSplitter.split( hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals); // Split should generate only 1 file since there's only 1 region assertEquals("splits=" + splits, 1, splits.size()); // Make sure the file exists assertTrue(fs.exists(splits.get(0))); LOG.info("Split file=" + splits.get(0)); return splits.get(0); }
/**
 * Tests the case where a RegionServer enters a GC pause,
 * comes back online after the master declared it dead and started to split.
 * Want log rolling after a master split to fail. See HBASE-2312.
 * <p>
 * The test appends entries to a WAL and renames its directory the way the master does before
 * splitting. It splits the renamed directory, then expects {@code rollWriter()} to fail with
 * an {@code IOException} caused by a {@code FileNotFoundException}.
 */
@Test (timeout=300000)
public void testLogRollAfterSplitStart() throws IOException {
  LOG.info("Verify wal roll after split starts will fail.");
  String logName = "testLogRollAfterSplitStart";
  Path thisTestsDir = new Path(HBASEDIR, DefaultWALProvider.getWALDirectoryName(logName));
  final WALFactory wals = new WALFactory(conf, null, logName);

  try {
    // put some entries in a WAL
    TableName tableName = TableName.valueOf(this.getClass().getName());
    HRegionInfo regioninfo = new HRegionInfo(tableName,
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
    final WAL log = wals.getWAL(regioninfo.getEncodedNameAsBytes());
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
    // The descriptor is identical for every append, so build it once instead of per entry.
    HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor("column"));

    final int total = 20;
    for (int i = 0; i < total; i++) {
      WALEdit kvs = new WALEdit();
      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
      log.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
          System.currentTimeMillis(), mvcc), kvs, true);
    }
    // Send the data to HDFS datanodes and close the HDFS writer
    log.sync();
    FSHLog fshLog = (FSHLog) log;
    fshLog.replaceWriter(fshLog.getOldPath(), null, null, null);

    /* code taken from MasterFileSystem.getLogDirs(), which is called from
     * MasterFileSystem.splitLog() handles RS shutdowns (as observed by the splitting process)
     */
    // rename the directory so a rogue RS doesn't create more WALs
    Path rsSplitDir = thisTestsDir.suffix(DefaultWALProvider.SPLITTING_EXT);
    if (!fs.rename(thisTestsDir, rsSplitDir)) {
      throw new IOException("Failed fs.rename for log split: " + thisTestsDir);
    }
    LOG.debug("Renamed region directory: " + rsSplitDir);

    LOG.debug("Processing the old log files.");
    WALSplitter.split(HBASEDIR, rsSplitDir, OLDLOGDIR, fs, conf, wals);

    LOG.debug("Trying to roll the WAL.");
    try {
      log.rollWriter();
      Assert.fail("rollWriter() did not throw any exception.");
    } catch (IOException ioe) {
      if (ioe.getCause() instanceof FileNotFoundException) {
        LOG.info("Got the expected exception: ", ioe.getCause());
      } else {
        Assert.fail("Unexpected exception: " + ioe);
      }
    }
  } finally {
    wals.close();
    if (fs.exists(thisTestsDir)) {
      fs.delete(thisTestsDir, true);
    }
  }
}
@Test(timeout = 300000) public void testReadWriteSeqIdFiles() throws Exception { LOG.info("testReadWriteSeqIdFiles"); startCluster(2); final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); HTable ht = installTable(zkw, "table", "family", 10); FileSystem fs = master.getMasterFileSystem().getFileSystem(); Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf("table")); List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir); long newSeqId = WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L); WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0) , 1L, 1000L); assertEquals(newSeqId + 2000, WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L)); Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regionDirs.get(0)); FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() { @Override public boolean accept(Path p) { return WALSplitter.isSequenceIdFile(p); } }); // only one seqid file should exist assertEquals(1, files.length); // verify all seqId files aren't treated as recovered.edits files NavigableSet<Path> recoveredEdits = WALSplitter.getSplitEditFilesSorted(fs, regionDirs.get(0)); assertEquals(0, recoveredEdits.size()); ht.close(); }
@Test(timeout = 300000) public void testReadWriteSeqIdFiles() throws Exception { LOG.info("testReadWriteSeqIdFiles"); startCluster(2); final ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null); Table ht = installTable(zkw, 10); try { FileSystem fs = master.getMasterFileSystem().getFileSystem(); Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf(name.getMethodName())); List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir); long newSeqId = WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L); WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L); assertEquals(newSeqId + 2000, WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L)); Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regionDirs.get(0)); FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() { @Override public boolean accept(Path p) { return WALSplitter.isSequenceIdFile(p); } }); // only one seqid file should exist assertEquals(1, files.length); // verify all seqId files aren't treated as recovered.edits files NavigableSet<Path> recoveredEdits = WALSplitter.getSplitEditFilesSorted(fs, regionDirs.get(0)); assertEquals(0, recoveredEdits.size()); } finally { if (ht != null) ht.close(); if (zkw != null) zkw.close(); } }
/**
 * testcase for https://issues.apache.org/jira/browse/HBASE-14949.
 * <p>
 * Two WAL files are written for the same region. One ("wal-1") holds entries 1 and 2; the
 * other ("wal-2") holds only entry 2. Both are split one after the other, in the order chosen
 * by {@code largeFirst}. The region is then reopened, and the test checks that its open
 * sequence number exceeds the MVCC write point and that the row has two cells.
 *
 * @param largeFirst whether the two-entry WAL is split before the one-entry WAL
 */
private void testNameConflictWhenSplit(boolean largeFirst)
    throws IOException, StreamLacksCapabilityException {
  final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
  final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
  final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
  final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
  deleteDir(basedir);

  final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
  final NavigableMap<byte[], Integer> replicationScopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
  for (byte[] familyKey : htd.getFamiliesKeys()) {
    replicationScopes.put(familyKey, 0);
  }

  // Create the region on disk, then close it so only the WAL files below feed the reopen.
  HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
  HBaseTestingUtility.closeRegionAndWAL(region);

  final byte[] family = htd.getColumnFamilies()[0].getName();
  final byte[] rowName = tableName.getName();
  final FSWALEntry firstEntry =
      createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, replicationScopes);
  final FSWALEntry secondEntry =
      createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, replicationScopes);

  final Path bothEntriesWal = new Path(logDir, "wal-1");
  final Path secondOnlyWal = new Path(logDir, "wal-2");
  writerWALFile(bothEntriesWal, Arrays.asList(firstEntry, secondEntry));
  writerWALFile(secondOnlyWal, Arrays.asList(secondEntry));

  final Path splitFirstPath = largeFirst ? bothEntriesWal : secondOnlyWal;
  final Path splitSecondPath = largeFirst ? secondOnlyWal : bothEntriesWal;
  final FileStatus first = fs.getFileStatus(splitFirstPath);
  final FileStatus second = fs.getFileStatus(splitSecondPath);

  WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals);
  WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals);

  final WAL wal = createWAL(this.conf, hbaseRootDir, logName);
  region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal);
  assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint());
  assertEquals(2, region.get(new Get(rowName)).size());
}
/**
 * Inspect the log directory to find dead servers which need recovery work
 * <p>
 * A log folder is reported only when it contains at least one file, its name parses to a
 * ServerName, and that server is not in the master's online-server set. Folder listing
 * failures are retried unless "hbase.hlog.split.skip.errors" is true. An unusable filesystem,
 * or an interrupt during the retry back-off, halts the process.
 * @return A set of ServerNames which aren't running but still have WAL files left in file system
 */
Set<ServerName> getFailedServersFromLogFolders() {
  // Retry on IOException unless skip-errors is configured (default from
  // WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT).
  boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors",
      WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT);

  Set<ServerName> serverNames = new HashSet<ServerName>();
  Path logsDirPath = new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME);

  do {
    // Stop early, returning whatever was collected so far, if the master is stopping.
    if (master.isStopped()) {
      LOG.warn("Master stopped while trying to get failed servers.");
      break;
    }
    try {
      if (!this.fs.exists(logsDirPath)) return serverNames;
      FileStatus[] logFolders = FSUtils.listStatus(this.fs, logsDirPath, null);
      // Get online servers after getting log folders to avoid log folder deletion of newly
      // checked in region servers . see HBASE-5916
      Set<ServerName> onlineServers = ((HMaster) master).getServerManager().getOnlineServers()
          .keySet();

      if (logFolders == null || logFolders.length == 0) {
        LOG.debug("No log files to split, proceeding...");
        return serverNames;
      }
      for (FileStatus status : logFolders) {
        FileStatus[] curLogFiles = FSUtils.listStatus(this.fs, status.getPath(), null);
        if (curLogFiles == null || curLogFiles.length == 0) {
          // Empty log folder. No recovery needed
          continue;
        }
        // A null result means the folder name could not be parsed into a ServerName.
        final ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(
            status.getPath());
        if (null == serverName) {
          LOG.warn("Log folder " + status.getPath() + " doesn't look like its name includes a " +
              "region server name; leaving in place. If you see later errors about missing " +
              "write ahead logs they may be saved in this location.");
        } else if (!onlineServers.contains(serverName)) {
          LOG.info("Log folder " + status.getPath() + " doesn't belong "
              + "to a known region server, splitting");
          serverNames.add(serverName);
        } else {
          LOG.info("Log folder " + status.getPath() + " belongs to an existing region server");
        }
      }
      // A complete scan without IOException ends the retry loop.
      retrySplitting = false;
    } catch (IOException ioe) {
      LOG.warn("Failed getting failed servers to be recovered.", ioe);
      // If the filesystem itself is unusable there is no point retrying: halt the process.
      if (!checkFileSystem()) {
        LOG.warn("Bad Filesystem, exiting");
        Runtime.getRuntime().halt(1);
      }
      try {
        // Back off before the next attempt; the interval is in milliseconds
        // (default 30 * 1000).
        if (retrySplitting) {
          Thread.sleep(conf.getInt("hbase.hlog.split.failure.retry.interval", 30 * 1000));
        }
      } catch (InterruptedException e) {
        LOG.warn("Interrupted, aborting since cannot return w/o splitting");
        Thread.currentThread().interrupt();
        retrySplitting = false;
        Runtime.getRuntime().halt(1);
      }
    }
  } while (retrySplitting);

  return serverNames;
}
@Override public synchronized Void call() throws IOException { try { // level 2: <HBASE_DIR>/<table>/* FileStatus[] regionDirs = fs.listStatus(tableDir.getPath()); for (FileStatus regionDir : regionDirs) { errors.progress(); String encodedName = regionDir.getPath().getName(); // ignore directories that aren't hexadecimal if (!encodedName.toLowerCase().matches("[0-9a-f]+")) { continue; } LOG.debug("Loading region info from hdfs:"+ regionDir.getPath()); HbckInfo hbi = hbck.getOrCreateInfo(encodedName); HdfsEntry he = new HdfsEntry(); synchronized (hbi) { if (hbi.getHdfsRegionDir() != null) { errors.print("Directory " + encodedName + " duplicate??" + hbi.getHdfsRegionDir()); } he.hdfsRegionDir = regionDir.getPath(); he.hdfsRegionDirModTime = regionDir.getModificationTime(); Path regioninfoFile = new Path(he.hdfsRegionDir, HRegionFileSystem.REGION_INFO_FILE); he.hdfsRegioninfoFilePresent = fs.exists(regioninfoFile); // we add to orphan list when we attempt to read .regioninfo // Set a flag if this region contains only edits // This is special case if a region is left after split he.hdfsOnlyEdits = true; FileStatus[] subDirs = fs.listStatus(regionDir.getPath()); Path ePath = WALSplitter.getRegionDirRecoveredEditsDir(regionDir.getPath()); for (FileStatus subDir : subDirs) { errors.progress(); String sdName = subDir.getPath().getName(); if (!sdName.startsWith(".") && !sdName.equals(ePath.getName())) { he.hdfsOnlyEdits = false; break; } } hbi.hdfsEntry = he; } } } catch (IOException e) { // unable to connect to the region server. errors.reportError(ERROR_CODE.RS_CONNECT_FAILURE, "Table Directory: " + tableDir.getPath().getName() + " Unable to fetch region information. " + e); throw e; } return null; }
/**
 * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
 * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
 * <p>
 * Mutations that carry cells in {@code WALEdit.METAFAMILY} are not batched. Their marker
 * cells are replayed directly against the region, and the mutation is removed from
 * {@code mutations}:
 * <ul>
 * <li>compaction and bulk-load markers are always replayed;</li>
 * <li>flush and region-event markers are replayed only on non-default replicas.</li>
 * </ul>
 * The remaining mutations go to {@code region.batchReplay}. Put/delete latency metrics are
 * updated in the finally block.
 * @param region region to replay into; cast to HRegion when marker cells are present
 * @param mutations mutations to replay; modified in place (marker mutations are removed)
 * @param replaySeqId sequence id passed to compaction/flush marker replay and to batchReplay
 * @return an array of OperationStatus which internally contains the OperationStatusCode and the
 *         exceptionMessage if any
 * @throws IOException
 */
private OperationStatus [] doReplayBatchOp(final Region region,
    final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
  long before = EnvironmentEdgeManager.currentTime();
  boolean batchContainsPuts = false, batchContainsDelete = false;
  try {
    for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
      WALSplitter.MutationReplay m = it.next();

      // Remember which latency metrics (put and/or delete) to update in the finally block.
      if (m.type == MutationType.PUT) {
        batchContainsPuts = true;
      } else {
        batchContainsDelete = true;
      }

      NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
      List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
      if (metaCells != null && !metaCells.isEmpty()) {
        for (Cell metaCell : metaCells) {
          CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
          boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
          HRegion hRegion = (HRegion)region;
          if (compactionDesc != null) {
            // replay the compaction. Remove the files from stores only if we are the primary
            // region replica (thus own the files)
            hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
              replaySeqId);
            continue;
          }
          // Flush markers are only replayed on non-default replicas.
          FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
          if (flushDesc != null && !isDefaultReplica) {
            hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
            continue;
          }
          // Region open/close event markers are likewise only replayed on non-default replicas.
          RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
          if (regionEvent != null && !isDefaultReplica) {
            hRegion.replayWALRegionEventMarker(regionEvent);
            continue;
          }
          BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
          if (bulkLoadEvent != null) {
            hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
            continue;
          }
        }
        // The whole mutation is dropped from the batch once its meta cells are handled.
        it.remove();
      }
    }
    requestCount.add(mutations.size());
    // Memstore memory is reclaimed before applying the batch, except for meta-table regions.
    if (!region.getRegionInfo().isMetaTable()) {
      regionServer.cacheFlusher.reclaimMemStoreMemory();
    }
    return region.batchReplay(mutations.toArray(
      new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
  } finally {
    if (regionServer.metricsRegionServer != null) {
      long after = EnvironmentEdgeManager.currentTime();
      if (batchContainsPuts) {
        regionServer.metricsRegionServer.updatePut(after - before);
      }
      if (batchContainsDelete) {
        regionServer.metricsRegionServer.updateDelete(after - before);
      }
    }
  }
}
@Override public synchronized Void call() throws IOException { try { // level 2: <HBASE_DIR>/<table>/* FileStatus[] regionDirs = fs.listStatus(tableDir.getPath()); for (FileStatus regionDir : regionDirs) { String encodedName = regionDir.getPath().getName(); // ignore directories that aren't hexadecimal if (!encodedName.toLowerCase().matches("[0-9a-f]+")) { continue; } LOG.debug("Loading region info from hdfs:"+ regionDir.getPath()); HbckInfo hbi = hbck.getOrCreateInfo(encodedName); HdfsEntry he = new HdfsEntry(); synchronized (hbi) { if (hbi.getHdfsRegionDir() != null) { errors.print("Directory " + encodedName + " duplicate??" + hbi.getHdfsRegionDir()); } he.hdfsRegionDir = regionDir.getPath(); he.hdfsRegionDirModTime = regionDir.getModificationTime(); Path regioninfoFile = new Path(he.hdfsRegionDir, HRegionFileSystem.REGION_INFO_FILE); he.hdfsRegioninfoFilePresent = fs.exists(regioninfoFile); // we add to orphan list when we attempt to read .regioninfo // Set a flag if this region contains only edits // This is special case if a region is left after split he.hdfsOnlyEdits = true; FileStatus[] subDirs = fs.listStatus(regionDir.getPath()); Path ePath = WALSplitter.getRegionDirRecoveredEditsDir(regionDir.getPath()); for (FileStatus subDir : subDirs) { String sdName = subDir.getPath().getName(); if (!sdName.startsWith(".") && !sdName.equals(ePath.getName())) { he.hdfsOnlyEdits = false; break; } } hbi.hdfsEntry = he; } } } catch (IOException e) { // unable to connect to the region server. errors.reportError(ERROR_CODE.RS_CONNECT_FAILURE, "Table Directory: " + tableDir.getPath().getName() + " Unable to fetch region information. " + e); throw e; } return null; }
/**
 * Performs the bulk of region opening, in this order:
 * <ol>
 * <li>runs the coprocessor pre-open hook;</li>
 * <li>checks the region info on the filesystem and initializes all stores;</li>
 * <li>resets write state;</li>
 * <li>cleans up temp/split/merge leftovers when writes are enabled;</li>
 * <li>creates the split policy;</li>
 * <li>computes the next sequence id;</li>
 * <li>clears the closing/closed flags and runs the post-open hook.</li>
 * </ol>
 * @param reporter progress reporter passed to store initialization
 * @param status task status updated at each step and marked complete at the end
 * @return the next sequence id the opened region should use
 * @throws IOException if any filesystem or store initialization step fails
 * @throws UnsupportedEncodingException declared by this method; its source is not visible here
 */
private long initializeRegionInternals(final CancelableProgressable reporter,
    final MonitoredTask status) throws IOException, UnsupportedEncodingException {
  if (coprocessorHost != null) {
    status.setStatus("Running coprocessor pre-open hook");
    coprocessorHost.preOpen();
  }

  // Write HRI to a file in case we need to recover hbase:meta
  status.setStatus("Writing region info on filesystem");
  fs.checkRegionInfoOnFilesystem();

  // Initialize all the HStores
  status.setStatus("Initializing all the Stores");
  long maxSeqId = initializeRegionStores(reporter, status);

  this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
  this.writestate.flushRequested = false;
  this.writestate.compacting = 0;

  if (this.writestate.writesEnabled) {
    // Remove temporary data left over from old regions
    status.setStatus("Cleaning up temporary data from old regions");
    fs.cleanupTempDir();
  }

  if (this.writestate.writesEnabled) {
    status.setStatus("Cleaning up detritus from prior splits");
    // Get rid of any splits or merges that were lost in-progress.  Clean out
    // these directories here on open.  We may be opening a region that was
    // being split but we crashed in the middle of it all.
    fs.cleanupAnySplitDetritus();
    fs.cleanupMergesDir();
  }

  // Initialize split policy
  this.splitPolicy = RegionSplitPolicy.create(this, conf);

  this.lastFlushTime = EnvironmentEdgeManager.currentTime();
  // Use maximum of wal sequenceid or that which was found in stores
  // (particularly if no recovered edits, seqid will be -1).
  long nextSeqid = maxSeqId;

  // In distributedLogReplay mode, we don't know the last change sequence number because region
  // is opened before recovery completes. So we add a safety bumper to avoid new sequence number
  // overlaps used sequence numbers
  // The bumper is flushPerChanges + 10000000 while recovering, otherwise 1. When writes are
  // disabled no file is written and the id is simply incremented.
  // NOTE(review): the return value of writeRegionSequenceIdFile is used as the next id; confirm
  // against its implementation.
  if (this.writestate.writesEnabled) {
    nextSeqid = WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs
        .getRegionDir(), nextSeqid, (this.isRecovering ? (this.flushPerChanges + 10000000) : 1));
  } else {
    nextSeqid++;
  }

  LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
    "; next sequenceid=" + nextSeqid);

  // A region can be reopened if failed a split; reset flags
  this.closing.set(false);
  this.closed.set(false);

  if (coprocessorHost != null) {
    status.setStatus("Running coprocessor post-open hooks");
    coprocessorHost.postOpen();
  }

  status.markComplete("Region opened successfully");
  return nextSeqid;
}
/**
 * Replays a batch of Put/Delete mutations into {@code region} and returns the per-mutation
 * statuses directly, so callers that do not need a MultiResponse avoid building one.
 * <p>
 * Any mutation carrying {@code WALEdit.METAFAMILY} cells is handled here instead of being
 * batched. Each compaction marker among those cells is completed on the region, and the
 * mutation is removed from {@code mutations}. Put/delete latency metrics are recorded on exit.
 *
 * @param region region receiving the replay
 * @param mutations mutations to replay; modified in place (meta-cell mutations are removed)
 * @param replaySeqId sequence id passed to {@code batchReplay}
 * @return an array of OperationStatus which internally contains the OperationStatusCode and the
 *         exceptionMessage if any
 * @throws IOException if completing a compaction marker or the batch replay fails
 */
private OperationStatus[] doReplayBatchOp(final HRegion region,
    final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
  final long startTime = EnvironmentEdgeManager.currentTime();
  boolean sawPut = false;
  boolean sawDelete = false;
  try {
    final Iterator<WALSplitter.MutationReplay> iter = mutations.iterator();
    while (iter.hasNext()) {
      final WALSplitter.MutationReplay replay = iter.next();
      if (replay.type == MutationType.PUT) {
        sawPut = true;
      } else {
        sawDelete = true;
      }

      final List<Cell> metaCells = replay.mutation.getFamilyCellMap().get(WALEdit.METAFAMILY);
      if (metaCells == null || metaCells.isEmpty()) {
        continue;
      }
      // Complete any compaction markers, then drop this mutation from the batch.
      for (Cell metaCell : metaCells) {
        final CompactionDescriptor compaction = WALEdit.getCompaction(metaCell);
        if (compaction != null) {
          region.completeCompactionMarker(compaction);
        }
      }
      iter.remove();
    }

    requestCount.add(mutations.size());
    if (!region.getRegionInfo().isMetaTable()) {
      regionServer.cacheFlusher.reclaimMemStoreMemory();
    }
    final WALSplitter.MutationReplay[] batch =
        mutations.toArray(new WALSplitter.MutationReplay[mutations.size()]);
    return region.batchReplay(batch, replaySeqId);
  } finally {
    if (regionServer.metricsRegionServer != null) {
      final long elapsed = EnvironmentEdgeManager.currentTime() - startTime;
      if (sawPut) {
        regionServer.metricsRegionServer.updatePut(elapsed);
      }
      if (sawDelete) {
        regionServer.metricsRegionServer.updateDelete(elapsed);
      }
    }
  }
}
/**
 * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
 * that the given mutations will be durable on the receiving RS if this method returns without any
 * exception.
 * <p>
 * The target region is resolved from the first entry's encoded region name and is used for
 * every entry in the request (presumably all entries target the same region; verify against
 * callers). Entries a coprocessor bypasses in preWALRestore are skipped. Any non-SUCCESS
 * status from a batch aborts the replay with an IOException. The WAL is synced once after all
 * entries are applied, and postWALRestore runs afterwards.
 *
 * @param controller the RPC controller
 * @param request the request
 * @throws ServiceException wrapping any IOException, including partial batch failures
 */
@Override
@QosPriority(priority = HConstants.REPLAY_QOS)
public ReplicateWALEntryResponse replay(final RpcController controller,
    final ReplicateWALEntryRequest request) throws ServiceException {
  long before = EnvironmentEdgeManager.currentTime();
  // Cell payloads travel in the RPC controller's cell block rather than in the request protobuf.
  CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
  try {
    checkOpen();
    List<WALEntry> entries = request.getEntryList();
    if (entries == null || entries.isEmpty()) {
      // empty input
      return ReplicateWALEntryResponse.newBuilder().build();
    }
    HRegion region = regionServer.getRegionByEncodedName(
      entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
    RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
    // Entries kept for postWALRestore; only populated when a coprocessor host exists.
    List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
    for (WALEntry entry : entries) {
      // Report the entry's nonce (or NO_NONCE when absent) to the nonce manager.
      if (regionServer.nonceManager != null) {
        long nonceGroup = entry.getKey().hasNonceGroup()
          ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
        long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
        regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce,
          entry.getKey().getWriteTime());
      }
      // walEntry is only allocated when coprocessors need it; presumably it is filled in by
      // getMutationsFromWALEntry (it is read right after) — confirm in WALSplitter.
      Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
        new Pair<WALKey, WALEdit>();
      List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
        cells, walEntry);
      if (coprocessorHost != null) {
        // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
        // KeyValue.
        if (coprocessorHost.preWALRestore(region.getRegionInfo(),
          walEntry.getFirst(), walEntry.getSecond())) {
          // if bypass this log entry, ignore it ...
          continue;
        }
        walEntries.add(walEntry);
      }
      if(edits!=null && !edits.isEmpty()) {
        // Prefer the original sequence number when the key carries one.
        long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
          entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
        OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
        // check if it's a partial success
        for (int i = 0; result != null && i < result.length; i++) {
          if (result[i] != OperationStatus.SUCCESS) {
            throw new IOException(result[i].getExceptionMsg());
          }
        }
      }
    }

    //sync wal at the end because ASYNC_WAL is used above
    region.syncWal();

    if (coprocessorHost != null) {
      for (Pair<WALKey, WALEdit> wal : walEntries) {
        coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(),
          wal.getSecond());
      }
    }
    return ReplicateWALEntryResponse.newBuilder().build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  } finally {
    // Replay latency is recorded whether the call succeeded or failed.
    if (regionServer.metricsRegionServer != null) {
      regionServer.metricsRegionServer.updateReplay(
        EnvironmentEdgeManager.currentTime() - before);
    }
  }
}
@Test public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception { String method = "testSkipRecoveredEditsReplayTheLastFileIgnored"; TableName tableName = TableName.valueOf(method); byte[] family = Bytes.toBytes("family"); this.region = initHRegion(tableName, method, CONF, family); final WALFactory wals = new WALFactory(CONF, null, method); try { Path regiondir = region.getRegionFileSystem().getRegionDir(); FileSystem fs = region.getRegionFileSystem().getFileSystem(); byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); assertEquals(0, region.getStoreFileList( region.getStores().keySet().toArray(new byte[0][])).size()); Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); long maxSeqId = 1050; long minSeqId = 1000; for (long i = minSeqId; i <= maxSeqId; i += 10) { Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); fs.create(recoveredEdits); WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); long time = System.nanoTime(); WALEdit edit = null; if (i == maxSeqId) { edit = WALEdit.createCompaction(region.getRegionInfo(), CompactionDescriptor.newBuilder() .setTableName(ByteString.copyFrom(tableName.getName())) .setFamilyName(ByteString.copyFrom(regionName)) .setEncodedRegionName(ByteString.copyFrom(regionName)) .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString()))) .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName())) .build()); } else { edit = new WALEdit(); edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes .toBytes(i))); } writer.append(new WAL.Entry(new HLogKey(regionName, tableName, i, time, HConstants.DEFAULT_CLUSTER_ID), edit)); writer.close(); } long recoverSeqId = 1030; Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); MonitoredTask status = TaskMonitor.get().createStatus(method); for (Store store : region.getStores().values()) { 
maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), recoverSeqId - 1); } long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status); assertEquals(maxSeqId, seqId); // assert that the files are flushed assertEquals(1, region.getStoreFileList( region.getStores().keySet().toArray(new byte[0][])).size()); } finally { HRegion.closeHRegion(this.region); this.region = null; wals.close(); } }
/** * HBASE-12782 ITBLL fails for me if generator does anything but 5M per maptask. * Create a region. Close it. Then copy into place a file to replay, one that is bigger than * configured flush size so we bring on lots of flushes. Then reopen and confirm all edits * made it in. * @throws IOException */ @Test (timeout=30000) public void testReplayWorksThoughLotsOfFlushing() throws IOException { Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); // Set it so we flush every 1M or so. Thats a lot. conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024); // The file of recovered edits has a column family of 'meta'. Also has an encoded regionname // of 4823016d8fca70b25503ee07f4c6d79f which needs to match on replay. final String encodedRegionName = "4823016d8fca70b25503ee07f4c6d79f"; HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(testName.getMethodName())); final String columnFamily = "meta"; byte [][] columnFamilyAsByteArray = new byte [][] {Bytes.toBytes(columnFamily)}; htd.addFamily(new HColumnDescriptor(columnFamily)); HRegionInfo hri = new HRegionInfo(htd.getTableName()) { @Override public synchronized String getEncodedName() { return encodedRegionName; } // Cache the name because lots of lookups. private byte [] encodedRegionNameAsBytes = null; @Override public synchronized byte[] getEncodedNameAsBytes() { if (encodedRegionNameAsBytes == null) { this.encodedRegionNameAsBytes = Bytes.toBytes(getEncodedName()); } return this.encodedRegionNameAsBytes; } }; Path hbaseRootDir = TEST_UTIL.getDataTestDir(); HRegion region = HRegion.createHRegion(hri, hbaseRootDir, conf, htd, null); assertEquals(encodedRegionName, region.getRegionInfo().getEncodedName()); List<String> storeFiles = region.getStoreFileList(columnFamilyAsByteArray); // There should be no store files. 
assertTrue(storeFiles.isEmpty()); region.close(); Path regionDir = region.getRegionDir(hbaseRootDir, hri); Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir); // This is a little fragile getting this path to a file of 10M of edits. Path recoveredEditsFile = new Path(new Path( System.getProperty("project.build.testSourceDirectory", "src" + Path.SEPARATOR + "test"), "data"), "0000000000000016310"); // Copy this file under the region's recovered.edits dir so it is replayed on reopen. FileSystem fs = FileSystem.get(conf); Path destination = new Path(recoveredEditsDir, recoveredEditsFile.getName()); fs.copyToLocalFile(recoveredEditsFile, destination); assertTrue(fs.exists(destination)); // Now the file 0000000000000016310 is under recovered.edits, reopen the region to replay. region = HRegion.openHRegion(region, null); assertEquals(encodedRegionName, region.getRegionInfo().getEncodedName()); storeFiles = region.getStoreFileList(columnFamilyAsByteArray); // Our 0000000000000016310 is 10MB. Most of the edits are for one region. Lets assume that if // we flush at 1MB, that there are at least 3 flushed files that are there because of the // replay of edits. assertTrue("Files count=" + storeFiles.size(), storeFiles.size() > 10); // Now verify all edits made it into the region. int count = verifyAllEditsMadeItIn(fs, conf, recoveredEditsFile, region); Log.info("Checked " + count + " edits made it in"); }
/** * Tests the case where a RegionServer enters a GC pause, * comes back online after the master declared it dead and started to split. * Want log rolling after a master split to fail. See HBASE-2312. */ @Test (timeout=300000) public void testLogRollAfterSplitStart() throws IOException { LOG.info("Verify wal roll after split starts will fail."); String logName = "testLogRollAfterSplitStart"; Path thisTestsDir = new Path(HBASEDIR, DefaultWALProvider.getWALDirectoryName(logName)); final WALFactory wals = new WALFactory(conf, null, logName); try { // put some entries in an WAL TableName tableName = TableName.valueOf(this.getClass().getName()); HRegionInfo regioninfo = new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); final WAL log = wals.getWAL(regioninfo.getEncodedNameAsBytes()); final AtomicLong sequenceId = new AtomicLong(1); final int total = 20; for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); HTableDescriptor htd = new HTableDescriptor(tableName); htd.addFamily(new HColumnDescriptor("column")); log.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName, System.currentTimeMillis()), kvs, sequenceId, true, null); } // Send the data to HDFS datanodes and close the HDFS writer log.sync(); ((FSHLog) log).replaceWriter(((FSHLog)log).getOldPath(), null, null, null); /* code taken from MasterFileSystem.getLogDirs(), which is called from MasterFileSystem.splitLog() * handles RS shutdowns (as observed by the splitting process) */ // rename the directory so a rogue RS doesn't create more WALs Path rsSplitDir = thisTestsDir.suffix(DefaultWALProvider.SPLITTING_EXT); if (!fs.rename(thisTestsDir, rsSplitDir)) { throw new IOException("Failed fs.rename for log split: " + thisTestsDir); } LOG.debug("Renamed region directory: " + rsSplitDir); LOG.debug("Processing the old log files."); WALSplitter.split(HBASEDIR, rsSplitDir, 
OLDLOGDIR, fs, conf, wals); LOG.debug("Trying to roll the WAL."); try { log.rollWriter(); Assert.fail("rollWriter() did not throw any exception."); } catch (IOException ioe) { if (ioe.getCause() instanceof FileNotFoundException) { LOG.info("Got the expected exception: ", ioe.getCause()); } else { Assert.fail("Unexpected exception: " + ioe); } } } finally { wals.close(); if (fs.exists(thisTestsDir)) { fs.delete(thisTestsDir, true); } } }
/** * Inspect the log directory to find dead servers which need recovery work * @return A set of ServerNames which aren't running but still have WAL files left in file system */ public Set<ServerName> getFailedServersFromLogFolders() { boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors", WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT); Set<ServerName> serverNames = new HashSet<>(); Path logsDirPath = new Path(this.rootDir, HConstants.HREGION_LOGDIR_NAME); do { if (services.isStopped()) { LOG.warn("Master stopped while trying to get failed servers."); break; } try { if (!this.fs.exists(logsDirPath)) return serverNames; FileStatus[] logFolders = FSUtils.listStatus(this.fs, logsDirPath, null); // Get online servers after getting log folders to avoid log folder deletion of newly // checked in region servers . see HBASE-5916 Set<ServerName> onlineServers = services.getServerManager().getOnlineServers().keySet(); if (logFolders == null || logFolders.length == 0) { LOG.debug("No log files to split, proceeding..."); return serverNames; } for (FileStatus status : logFolders) { FileStatus[] curLogFiles = FSUtils.listStatus(this.fs, status.getPath(), null); if (curLogFiles == null || curLogFiles.length == 0) { // Empty log folder. No recovery needed continue; } final ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName( status.getPath()); if (null == serverName) { LOG.warn("Log folder " + status.getPath() + " doesn't look like its name includes a " + "region server name; leaving in place. 
If you see later errors about missing " + "write ahead logs they may be saved in this location."); } else if (!onlineServers.contains(serverName)) { LOG.info("Log folder " + status.getPath() + " doesn't belong " + "to a known region server, splitting"); serverNames.add(serverName); } else { LOG.info("Log folder " + status.getPath() + " belongs to an existing region server"); } } retrySplitting = false; } catch (IOException ioe) { LOG.warn("Failed getting failed servers to be recovered.", ioe); if (!checkFileSystem()) { LOG.warn("Bad Filesystem, exiting"); Runtime.getRuntime().halt(1); } try { if (retrySplitting) { Thread.sleep(conf.getInt("hbase.hlog.split.failure.retry.interval", 30 * 1000)); } } catch (InterruptedException e) { LOG.warn("Interrupted, aborting since cannot return w/o splitting"); Thread.currentThread().interrupt(); retrySplitting = false; Runtime.getRuntime().halt(1); } } } while (retrySplitting); return serverNames; }
/** * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse. * @param region * @param mutations * @param replaySeqId * @return an array of OperationStatus which internally contains the OperationStatusCode and the * exceptionMessage if any * @throws IOException */ private OperationStatus [] doReplayBatchOp(final HRegion region, final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException { long before = EnvironmentEdgeManager.currentTime(); boolean batchContainsPuts = false, batchContainsDelete = false; try { for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) { WALSplitter.MutationReplay m = it.next(); if (m.type == MutationType.PUT) { batchContainsPuts = true; } else { batchContainsDelete = true; } NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap(); List<Cell> metaCells = map.get(WALEdit.METAFAMILY); if (metaCells != null && !metaCells.isEmpty()) { for (Cell metaCell : metaCells) { CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell); boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()); HRegion hRegion = region; if (compactionDesc != null) { // replay the compaction. 
Remove the files from stores only if we are the primary // region replica (thus own the files) hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica, replaySeqId); continue; } FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell); if (flushDesc != null && !isDefaultReplica) { hRegion.replayWALFlushMarker(flushDesc, replaySeqId); continue; } RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell); if (regionEvent != null && !isDefaultReplica) { hRegion.replayWALRegionEventMarker(regionEvent); continue; } BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell); if (bulkLoadEvent != null) { hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent); continue; } } it.remove(); } } requestCount.increment(); if (!region.getRegionInfo().isMetaRegion()) { regionServer.cacheFlusher.reclaimMemStoreMemory(); } return region.batchReplay(mutations.toArray( new WALSplitter.MutationReplay[mutations.size()]), replaySeqId); } finally { if (regionServer.metricsRegionServer != null) { long after = EnvironmentEdgeManager.currentTime(); if (batchContainsPuts) { regionServer.metricsRegionServer.updatePutBatch( region.getTableDescriptor().getTableName(), after - before); } if (batchContainsDelete) { regionServer.metricsRegionServer.updateDeleteBatch( region.getTableDescriptor().getTableName(), after - before); } } } }
@Test(timeout = 300000) public void testRecoveredEdits() throws Exception { conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal startCluster(NUM_RS); int numLogLines = 10000; SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); // turn off load balancing to prevent regions from moving around otherwise // they will consume recovered.edits master.balanceSwitch(false); FileSystem fs = master.getMasterFileSystem().getFileSystem(); List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); Path rootdir = FSUtils.getRootDir(conf); int numRegions = 50; try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null); Table t = installTable(zkw, numRegions)) { TableName table = t.getName(); List<RegionInfo> regions = null; HRegionServer hrs = null; for (int i = 0; i < NUM_RS; i++) { hrs = rsts.get(i).getRegionServer(); regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); // At least one RS will have >= to average number of regions. 
if (regions.size() >= numRegions / NUM_RS) { break; } } Path logDir = new Path(rootdir, AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString())); LOG.info("#regions = " + regions.size()); Iterator<RegionInfo> it = regions.iterator(); while (it.hasNext()) { RegionInfo region = it.next(); if (region.getTable().getNamespaceAsString() .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) { it.remove(); } } makeWAL(hrs, regions, numLogLines, 100); slm.splitLogDistributed(logDir); int count = 0; for (RegionInfo hri : regions) { Path tdir = FSUtils.getTableDir(rootdir, table); @SuppressWarnings("deprecation") Path editsdir = WALSplitter .getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName())); LOG.debug("checking edits dir " + editsdir); FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { @Override public boolean accept(Path p) { if (WALSplitter.isSequenceIdFile(p)) { return false; } return true; } }); assertTrue( "edits dir should have more than a single file in it. instead has " + files.length, files.length > 1); for (int i = 0; i < files.length; i++) { int c = countWAL(files[i].getPath(), fs, conf); count += c; } LOG.info(count + " edits in " + files.length + " recovered edits files."); } // check that the log file is moved assertFalse(fs.exists(logDir)); assertEquals(numLogLines, count); } }