/**
 * Opens a writer on the split-edits path computed for the given entry. If a file already
 * exists at that path it is deleted first (a failed delete is only logged).
 *
 * @return the edits path together with its open writer, or null when no edits path was
 *   returned for the entry. The caller should close the writer.
 */
private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
  final Path editsPath = getRegionSplitEditsPath(fs, entry, rootdir, true);
  if (editsPath == null) {
    return null;
  }
  if (fs.exists(editsPath)) {
    // Read the length before logging so the message shows what is about to be removed.
    final long staleLength = fs.getFileStatus(editsPath).getLen();
    LOG.warn("Found old edits file. It could be the "
        + "result of a previous failed split attempt. Deleting " + editsPath + ", length="
        + staleLength);
    final boolean deleted = fs.delete(editsPath, false);
    if (!deleted) {
      LOG.warn("Failed delete of old " + editsPath);
    }
  }
  final Writer editsWriter = createWriter(editsPath);
  LOG.debug("Creating writer path=" + editsPath);
  return new WriterAndPath(editsPath, editsWriter);
}
@Override public void setUpCluster() throws Exception { util = getTestingUtil(null); Configuration conf = util.getConfiguration(); if (!util.isDistributedCluster()) { // Inject required configuration if we are not running in distributed mode conf.setInt(HFile.FORMAT_VERSION_KEY, 3); conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName()); conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase"); conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class, Reader.class); conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class, Writer.class); conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true); } // Check if the cluster configuration can support this test try { EncryptionTest.testEncryption(conf, "AES", null); } catch (Exception e) { LOG.warn("Encryption configuration test did not pass, skipping test"); return; } super.setUpCluster(); initialized = true; }
/**
 * Opens a writer on the split-edits path computed for the given entry. If a file already
 * exists at that path it is deleted first (a failed delete is only logged).
 *
 * @return the edits path together with its open writer, or null when no edits path was
 *   returned for the entry. The caller should close the writer.
 */
private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
  final Path editsPath = getRegionSplitEditsPath(fs, entry, rootdir, true);
  if (editsPath == null) {
    return null;
  }
  if (fs.exists(editsPath)) {
    // Read the length before logging so the message shows what is about to be removed.
    final long staleLength = fs.getFileStatus(editsPath).getLen();
    LOG.warn("Found old edits file. It could be the "
        + "result of a previous failed split attempt. Deleting " + editsPath + ", length="
        + staleLength);
    final boolean deleted = fs.delete(editsPath, false);
    if (!deleted) {
      LOG.warn("Failed delete of old " + editsPath);
    }
  }
  final Writer editsWriter = createWriter(editsPath);
  final String regionText = Bytes.toStringBinary(region);
  LOG.info("Creating writer path=" + editsPath + " region=" + regionText);
  return new WriterAndPath(editsPath, editsWriter);
}
@BeforeClass public static void setUpBeforeClass() throws Exception { conf = TEST_UTIL.getConfiguration(); conf.setClass("hbase.regionserver.hlog.writer.impl", InstrumentedLogWriter.class, Writer.class); conf.setBoolean("dfs.support.broken.append", true); conf.setBoolean("dfs.support.append", true); // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config. System.setProperty("hbase.tests.use.shortcircuit.reads", "false"); // Create fake maping user to group and set it to the conf. Map<String, String []> u2g_map = new HashMap<String, String []>(2); ROBBER = User.getCurrent().getName() + "-robber"; ZOMBIE = User.getCurrent().getName() + "-zombie"; u2g_map.put(ROBBER, GROUP); u2g_map.put(ZOMBIE, GROUP); DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map); conf.setInt("dfs.heartbeat.interval", 1); TEST_UTIL.startMiniDFSCluster(2); }
@Test(timeout=300000, expected = IOException.class) public void testSplitWillFailIfWritingToRegionFails() throws Exception { //leave 5th log open so we could append the "trap" Writer writer = generateWALs(4); useDifferentDFSClient(); String region = "break"; Path regiondir = new Path(TABLEDIR, region); fs.mkdirs(regiondir); InstrumentedLogWriter.activateFailure = false; appendEntry(writer, TABLE_NAME, Bytes.toBytes(region), ("r" + 999).getBytes(), FAMILY, QUALIFIER, VALUE, 0); writer.close(); try { InstrumentedLogWriter.activateFailure = true; WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); } catch (IOException e) { assertTrue(e.getMessage(). contains("This exception is instrumented and should only be thrown for testing")); throw e; } finally { InstrumentedLogWriter.activateFailure = false; } }
/**
 * Creates {@code writers} WAL files under {@code WALDIR}, each holding {@code entries} rounds
 * of one edit per region in {@code REGIONS}.
 *
 * @param writers number of WAL files to create.
 * @param entries number of rounds of edits written to each WAL file.
 * @param leaveOpen index to leave un-closed. -1 to close all.
 * @return the writer that's still open, or null if all were closed.
 */
private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
  makeRegionDirs(REGIONS);
  fs.mkdirs(WALDIR);
  Writer [] ws = new Writer[writers];
  int seq = 0;
  for (int i = 0; i < writers; i++) {
    ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
    for (int j = 0; j < entries; j++) {
      int prefix = 0;
      for (String region : REGIONS) {
        String rowKey = region + prefix++ + i + j;
        // Encode explicitly as UTF-8: the no-arg String.getBytes() uses the platform default
        // charset, which would make the generated keys depend on the machine running the test.
        appendEntry(ws[i], TABLE_NAME,
          region.getBytes(java.nio.charset.StandardCharsets.UTF_8),
          rowKey.getBytes(java.nio.charset.StandardCharsets.UTF_8),
          FAMILY, QUALIFIER, VALUE, seq++);
      }
    }
    if (i != leaveOpen) {
      ws[i].close();
      LOG.info("Closing writer " + i);
    }
  }
  if (leaveOpen < 0 || leaveOpen >= writers) {
    return null;
  }
  return ws[leaveOpen];
}
/**
 * Opens a writer on the split-edits path computed for the given entry and the name of the
 * file being split. If a file already exists at that path it is deleted first (a failed
 * delete is only logged).
 *
 * @return the edits path, its open writer and the entry's sequence id, or null when no edits
 *   path was returned for the entry. The caller should close the writer.
 */
WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
  Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir,
    fileBeingSplit.getPath().getName());
  if (regionedits == null) {
    return null;
  }
  if (fs.exists(regionedits)) {
    // Use placeholders here too, consistent with the other log calls in this method.
    LOG.warn("Found old edits file. It could be the result of a previous failed split "
        + "attempt. Deleting {}, length={}", regionedits,
      fs.getFileStatus(regionedits).getLen());
    if (!fs.delete(regionedits, false)) {
      LOG.warn("Failed delete of old {}", regionedits);
    }
  }
  Writer w = createWriter(regionedits);
  LOG.debug("Creating writer path={}", regionedits);
  return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
}
@BeforeClass public static void setUpBeforeClass() throws Exception { conf = TEST_UTIL.getConfiguration(); conf.setClass("hbase.regionserver.hlog.writer.impl", InstrumentedLogWriter.class, Writer.class); // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config. System.setProperty("hbase.tests.use.shortcircuit.reads", "false"); // Create fake maping user to group and set it to the conf. Map<String, String []> u2g_map = new HashMap<>(2); ROBBER = User.getCurrent().getName() + "-robber"; ZOMBIE = User.getCurrent().getName() + "-zombie"; u2g_map.put(ROBBER, GROUP); u2g_map.put(ZOMBIE, GROUP); DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map); conf.setInt("dfs.heartbeat.interval", 1); TEST_UTIL.startMiniDFSCluster(2); }
@Test(timeout=300000, expected = IOException.class) public void testSplitWillFailIfWritingToRegionFails() throws Exception { //leave 5th log open so we could append the "trap" Writer writer = generateWALs(4); useDifferentDFSClient(); String region = "break"; Path regiondir = new Path(TABLEDIR, region); fs.mkdirs(regiondir); InstrumentedLogWriter.activateFailure = false; appendEntry(writer, TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes("r" + 999), FAMILY, QUALIFIER, VALUE, 0); writer.close(); try { InstrumentedLogWriter.activateFailure = true; WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); } catch (IOException e) { assertTrue(e.getMessage(). contains("This exception is instrumented and should only be thrown for testing")); throw e; } finally { InstrumentedLogWriter.activateFailure = false; } }
/**
 * Appends a compaction marker for the given region to the writer and syncs it.
 *
 * @param w writer to append to.
 * @param hri region the compaction descriptor is built for.
 * @param inputs names recorded as the compaction inputs.
 * @param output name recorded as the compaction output.
 */
private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs,
    String output) throws IOException {
  final WALProtos.CompactionDescriptor.Builder builder =
    WALProtos.CompactionDescriptor.newBuilder();
  builder.setTableName(ByteString.copyFrom(hri.getTable().toBytes()))
    .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes()))
    .setRegionName(ByteString.copyFrom(hri.getRegionName()))
    .setFamilyName(ByteString.copyFrom(FAMILY))
    .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY))
    .addAllCompactionInput(Arrays.asList(inputs))
    .addCompactionOutput(output);

  final WALEdit compactionEdit = WALEdit.createCompaction(hri, builder.build());
  final long now = EnvironmentEdgeManager.currentTime();
  final WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1, now,
    HConstants.DEFAULT_CLUSTER_ID);
  final Entry marker = new Entry(walKey, compactionEdit);
  w.append(marker);
  w.sync();
}
/**
 * Appends a REGION_OPEN event for the named region to the writer and syncs it.
 *
 * @param w writer to append to.
 * @param region region name; its bytes are used for the row, the WAL key and the descriptor.
 */
private static void appendRegionEvent(Writer w, String region) throws IOException {
  final byte[] encodedNameBytes = Bytes.toBytes(String.valueOf(region.hashCode()));
  final WALProtos.RegionEventDescriptor openEvent = ProtobufUtil.toRegionEventDescriptor(
    WALProtos.RegionEventDescriptor.EventType.REGION_OPEN,
    TABLE_NAME.toBytes(),
    Bytes.toBytes(region),
    encodedNameBytes,
    1,
    ServerName.parseServerName("ServerName:9099"),
    ImmutableMap.<byte[], List<Path>>of());

  // The same timestamp is used for the cell and for the WAL key.
  final long now = EnvironmentEdgeManager.currentTime();
  final KeyValue eventCell = new KeyValue(Bytes.toBytes(region), WALEdit.METAFAMILY,
    WALEdit.REGION_EVENT, now, openEvent.toByteArray());
  final WALKeyImpl key = new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, now,
    HConstants.DEFAULT_CLUSTER_ID);

  final Entry eventEntry = new Entry(key, new WALEdit().add(eventCell));
  w.append(eventEntry);
  w.sync();
}
@Override public void setUpCluster() throws Exception { util = getTestingUtil(null); Configuration conf = util.getConfiguration(); if (!util.isDistributedCluster()) { // Inject required configuration if we are not running in distributed mode conf.setInt(HFile.FORMAT_VERSION_KEY, 3); conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName()); conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase"); conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class, Reader.class); conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class, Writer.class); conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true); } // Check if the cluster configuration can support this test try { EncryptionTest.testEncryption(conf, "AES", null); } catch (Exception e) { LOG.warn("Encryption configuration test did not pass, skipping test", e); return; } super.setUpCluster(); initialized = true; }
/**
 * Static convenience for creating a WAL writer. If you already have a WALFactory, favor the
 * instance method instead.
 *
 * @return a writer that won't overwrite files. Caller must close.
 */
@VisibleForTesting
public static Writer createWALWriter(final FileSystem fs, final Path path,
    final Configuration configuration) throws IOException {
  final Writer walWriter = DefaultWALProvider.createWriter(configuration, fs, path, false);
  return walWriter;
}
/**
 * Configures the shared test configuration for WAL encryption (key provider, master key name,
 * secure reader/writer implementations) and then runs the {@code TestWALReplay} class setup.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  final Configuration walConf = TestWALReplay.TEST_UTIL.getConfiguration();

  // Key material used by the test key provider.
  walConf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
  walConf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");

  // Secure WAL reader and writer implementations.
  walConf.setClass("hbase.regionserver.hlog.reader.impl",
    SecureProtobufLogReader.class, Reader.class);
  walConf.setClass("hbase.regionserver.hlog.writer.impl",
    SecureProtobufLogWriter.class, Writer.class);
  walConf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);

  TestWALReplay.setUpBeforeClass();
}
private void doWriting() throws IOException, InterruptedException { this.user.runAs(new PrivilegedExceptionAction<Object>() { @Override public Object run() throws Exception { // Index of the WAL we want to keep open. generateWALs will leave open the WAL whose // index we supply here. int walToKeepOpen = numOfWriters - 1; // The below method writes numOfWriters files each with ENTRIES entries for a total of // numOfWriters * ENTRIES added per column family in the region. Writer writer = null; try { writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen); } catch (IOException e1) { throw new RuntimeException("Failed", e1); } // Update counter so has all edits written so far. editsCount.addAndGet(numOfWriters * ENTRIES); loop(writer); // If we've been interruped, then things should have shifted out from under us. // closing should error try { writer.close(); fail("Writing closing after parsing should give an error."); } catch (IOException exception) { LOG.debug("ignoring error when closing final writer.", exception); } return null; } }); }
private void loop(final Writer writer) { byte [] regionBytes = Bytes.toBytes(this.region); while (!stop.get()) { try { long seq = appendEntry(writer, TABLE_NAME, regionBytes, ("r" + editsCount.get()).getBytes(), regionBytes, QUALIFIER, VALUE, 0); long count = editsCount.incrementAndGet(); LOG.info(getName() + " sync count=" + count + ", seq=" + seq); try { Thread.sleep(1); } catch (InterruptedException e) { // } } catch (IOException ex) { LOG.error(getName() + " ex " + ex.toString()); if (ex instanceof RemoteException) { LOG.error("Juliet: got RemoteException " + ex.getMessage() + " while writing " + (editsCount.get() + 1)); } else { LOG.error(getName() + " failed to write....at " + editsCount.get()); fail("Failed to write " + editsCount.get()); } break; } catch (Throwable t) { LOG.error(getName() + " HOW? " + t); LOG.debug("exception details", t); break; } } LOG.info(getName() + " Writer exiting"); }
/**
 * Appends a single test entry (built by {@code createTestEntry}) to the writer and syncs it,
 * logging the current thread name before each step.
 *
 * @return the {@code seq} value that was passed in.
 */
public static long appendEntry(Writer writer, TableName table, byte[] region,
    byte[] row, byte[] family, byte[] qualifier,
    byte[] value, long seq) throws IOException {
  final String threadName = Thread.currentThread().getName();

  LOG.info(threadName + " append");
  final Entry testEntry = createTestEntry(table, region, row, family, qualifier, value, seq);
  writer.append(testEntry);

  LOG.info(threadName + " sync");
  writer.sync();

  return seq;
}
/**
 * Static convenience for creating a WAL writer. If you already have a WALFactory, favor the
 * instance method instead.
 *
 * @return a writer that won't overwrite files. Caller must close.
 */
@VisibleForTesting
public static Writer createWALWriter(final FileSystem fs, final Path path,
    final Configuration configuration) throws IOException {
  final Writer walWriter = FSHLogProvider.createWriter(configuration, fs, path, false);
  return walWriter;
}
/**
 * Hook that lets subclasses inject a different writer without having to extend other methods
 * such as rollWriter(). A writer that is a {@code ProtobufLogWriter} is passed to
 * {@code preemptiveSync} before being returned.
 *
 * @return Writer instance
 */
@Override
protected Writer createWriterInstance(final Path path) throws IOException {
  final Writer created = FSHLogProvider.createWriter(conf, fs, path, false);
  if (!(created instanceof ProtobufLogWriter)) {
    return created;
  }
  preemptiveSync((ProtobufLogWriter) created);
  return created;
}
private void loop(final Writer writer) { byte [] regionBytes = Bytes.toBytes(this.region); while (!stop.get()) { try { long seq = appendEntry(writer, TABLE_NAME, regionBytes, Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0); long count = editsCount.incrementAndGet(); LOG.info(getName() + " sync count=" + count + ", seq=" + seq); try { Thread.sleep(1); } catch (InterruptedException e) { // } } catch (IOException ex) { LOG.error(getName() + " ex " + ex.toString()); if (ex instanceof RemoteException) { LOG.error("Juliet: got RemoteException " + ex.getMessage() + " while writing " + (editsCount.get() + 1)); } else { LOG.error(getName() + " failed to write....at " + editsCount.get()); fail("Failed to write " + editsCount.get()); } break; } catch (Throwable t) { LOG.error(getName() + " HOW? " + t); LOG.debug("exception details", t); break; } } LOG.info(getName() + " Writer exiting"); }
@Test (timeout=300000) public void testSplitLeavesCompactionEventsEdits() throws IOException{ RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build(); REGIONS.clear(); REGIONS.add(hri.getEncodedName()); Path regionDir = new Path(FSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName()); LOG.info("Creating region directory: " + regionDir); assertTrue(fs.mkdirs(regionDir)); Writer writer = generateWALs(1, 10, 0, 10); String[] compactInputs = new String[]{"file1", "file2", "file3"}; String compactOutput = "file4"; appendCompactionEvent(writer, hri, compactInputs, compactOutput); writer.close(); useDifferentDFSClient(); WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals); Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); // original log should have 10 test edits, 10 region markers, 1 compaction marker assertEquals(21, countWAL(originalLog)); Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, hri.getEncodedName()); assertEquals(1, splitLog.length); assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0])); // split log should have 10 test edits plus 1 compaction marker assertEquals(11, countWAL(splitLog[0])); }
/**
 * Creates {@code writers} WAL files under {@code WALDIR}, each holding {@code entries} rounds
 * of one edit per region in {@code REGIONS}. After each edit a region event is also appended,
 * until {@code regionEvents} of them have been written in total.
 *
 * @param leaveOpen index to leave un-closed. -1 to close all.
 * @return the writer that's still open, or null if all were closed.
 */
private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents)
    throws IOException {
  makeRegionDirs(REGIONS);
  fs.mkdirs(WALDIR);

  final Writer[] walWriters = new Writer[writers];
  int nextSeq = 0;
  int regionEventsWritten = 0;
  for (int walIndex = 0; walIndex < writers; walIndex++) {
    final Writer current = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + walIndex));
    walWriters[walIndex] = current;
    for (int round = 0; round < entries; round++) {
      int prefix = 0;
      for (String region : REGIONS) {
        final String rowKey = region + prefix + walIndex + round;
        prefix++;
        appendEntry(current, TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(rowKey), FAMILY,
          QUALIFIER, VALUE, nextSeq);
        nextSeq++;
        if (regionEventsWritten < regionEvents) {
          regionEventsWritten++;
          appendRegionEvent(current, region);
        }
      }
    }
    if (walIndex != leaveOpen) {
      current.close();
      LOG.info("Closing writer " + walIndex);
    }
  }

  final boolean leftOneOpen = leaveOpen >= 0 && leaveOpen < writers;
  return leftOneOpen ? walWriters[leaveOpen] : null;
}
/**
 * Creates a WAL file named {@code WAL_FILE_PREFIX + suffix} under {@code WALDIR} without
 * appending anything to it.
 *
 * @param suffix appended to the WAL file prefix to form the file name.
 * @param closeFile whether to close the writer; when false it is left open.
 */
private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
  final Path emptyWal = new Path(WALDIR, WAL_FILE_PREFIX + suffix);
  final Writer emptyWriter = WALFactory.createWALWriter(fs, emptyWal, conf);
  if (!closeFile) {
    return;
  }
  emptyWriter.close();
}
/**
 * Configures the shared test configuration for WAL encryption (key provider, master key name,
 * secure reader/writer implementations) and then runs the {@code AbstractTestWALReplay} class
 * setup.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  final Configuration walConf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();

  // Key material used by the test key provider.
  walConf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
  walConf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");

  // Secure WAL reader and writer implementations.
  walConf.setClass("hbase.regionserver.hlog.reader.impl",
    SecureProtobufLogReader.class, Reader.class);
  walConf.setClass("hbase.regionserver.hlog.writer.impl",
    SecureProtobufLogWriter.class, Writer.class);
  walConf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);

  AbstractTestWALReplay.setUpBeforeClass();
}
/**
 * Creates a writer for recovered edits using this factory's configuration. Should be
 * package-private; it is visible for recovery testing.
 *
 * @return an overwritable writer for recovered edits. Caller should close.
 */
@VisibleForTesting
public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
    throws IOException {
  final Writer recoveredEditsWriter = DefaultWALProvider.createWriter(conf, fs, path, true);
  return recoveredEditsWriter;
}
/**
 * Static convenience for creating a recovered-edits writer. If you already have a WALFactory,
 * favor the instance method instead.
 *
 * @return a Writer that will overwrite files. Caller must close.
 */
static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
    final Configuration configuration) throws IOException {
  final Writer recoveredEditsWriter =
    DefaultWALProvider.createWriter(configuration, fs, path, true);
  return recoveredEditsWriter;
}
/**
 * Creates a new {@link Writer} for writing log splits by delegating to the WAL factory's
 * recovered-edits writer.
 *
 * @return a new Writer instance, caller should close
 */
protected Writer createWriter(Path logfile) throws IOException {
  final Writer splitWriter = walFactory.createRecoveredEditsWriter(fs, logfile);
  return splitWriter;
}
/**
 * Pairs a path with a writer.
 *
 * @param p the path stored in this pair.
 * @param w the writer stored in this pair.
 */
WriterAndPath(final Path p, final Writer w) {
  this.w = w;
  this.p = p;
}
@Test (timeout=300000) public void testIOEOnOutputThread() throws Exception { conf.setBoolean(HBASE_SKIP_ERRORS, false); generateWALs(-1); useDifferentDFSClient(); FileStatus[] logfiles = fs.listStatus(WALDIR); assertTrue("There should be some log file", logfiles != null && logfiles.length > 0); // wals with no entries (like the one we don't use in the factory) // won't cause a failure since nothing will ever be written. // pick the largest one since it's most likely to have entries. int largestLogFile = 0; long largestSize = 0; for (int i = 0; i < logfiles.length; i++) { if (logfiles[i].getLen() > largestSize) { largestLogFile = i; largestSize = logfiles[i].getLen(); } } assertTrue("There should be some log greater than size 0.", 0 < largestSize); // Set up a splitter that will throw an IOE on the output side WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null, this.mode) { @Override protected Writer createWriter(Path logfile) throws IOException { Writer mockWriter = Mockito.mock(Writer.class); Mockito.doThrow(new IOException("Injected")).when( mockWriter).append(Mockito.<Entry>any()); return mockWriter; } }; // Set up a background thread dumper. Needs a thread to depend on and then we need to run // the thread dumping in a background thread so it does not hold up the test. 
final AtomicBoolean stop = new AtomicBoolean(false); final Thread someOldThread = new Thread("Some-old-thread") { @Override public void run() { while(!stop.get()) Threads.sleep(10); } }; someOldThread.setDaemon(true); someOldThread.start(); final Thread t = new Thread("Background-thread-dumper") { public void run() { try { Threads.threadDumpingIsAlive(someOldThread); } catch (InterruptedException e) { e.printStackTrace(); } } }; t.setDaemon(true); t.start(); try { logSplitter.splitLogFile(logfiles[largestLogFile], null); fail("Didn't throw!"); } catch (IOException ioe) { assertTrue(ioe.toString().contains("Injected")); } finally { // Setting this to true will turn off the background thread dumper. stop.set(true); } }
/** * @throws IOException * @see https://issues.apache.org/jira/browse/HBASE-4862 */ @Test (timeout=300000) public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException { LOG.info("testConcurrentSplitLogAndReplayRecoverEdit"); // Generate wals for our destination region String regionName = "r0"; final Path regiondir = new Path(TABLEDIR, regionName); REGIONS.clear(); REGIONS.add(regionName); generateWALs(-1); wals.getWAL(Bytes.toBytes(regionName)); FileStatus[] logfiles = fs.listStatus(WALDIR); assertTrue("There should be some log file", logfiles != null && logfiles.length > 0); WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null, this.mode) { @Override protected Writer createWriter(Path logfile) throws IOException { Writer writer = wals.createRecoveredEditsWriter(this.fs, logfile); // After creating writer, simulate region's // replayRecoveredEditsIfAny() which gets SplitEditFiles of this // region and delete them, excluding files with '.temp' suffix. NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir); if (files != null && !files.isEmpty()) { for (Path file : files) { if (!this.fs.delete(file, false)) { LOG.error("Failed delete of " + file); } else { LOG.debug("Deleted recovered.edits file=" + file); } } } return writer; } }; try{ logSplitter.splitLogFile(logfiles[0], null); } catch (IOException e) { LOG.info(e); fail("Throws IOException when spliting " + "log, it is most likely because writing file does not " + "exist which is caused by concurrent replayRecoveredEditsIfAny()"); } if (fs.exists(CORRUPTDIR)) { if (fs.listStatus(CORRUPTDIR).length > 0) { fail("There are some corrupt logs, " + "it is most likely caused by concurrent replayRecoveredEditsIfAny()"); } } }
/**
 * Generates {@code NUM_WRITERS} WALs with {@code ENTRIES} entries each by delegating to the
 * three-argument overload.
 *
 * @param leaveOpen index of the WAL to leave open, passed through unchanged.
 * @return whatever the three-argument overload returns.
 */
private Writer generateWALs(int leaveOpen) throws IOException {
  final Writer stillOpen = generateWALs(NUM_WRITERS, ENTRIES, leaveOpen);
  return stillOpen;
}
/**
 * Creates a WAL file named {@code WAL_FILE_PREFIX + suffix} under {@code WALDIR} without
 * appending anything to it.
 *
 * @param suffix appended to the WAL file prefix to form the file name.
 * @param closeFile whether to close the writer; when false it is left open.
 */
private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
  final Path emptyWal = new Path(WALDIR, WAL_FILE_PREFIX + suffix);
  final Writer emptyWriter = wals.createWALWriter(fs, emptyWal, conf);
  if (!closeFile) {
    return;
  }
  emptyWriter.close();
}