@Test(timeout=180000) public void testWriterInterrupted() throws Exception { short repl = 3; int numWrites = 20000; MiniDFSCluster cluster = new MiniDFSCluster(conf, repl, true, null); FileSystem fs1 = cluster.getFileSystem(); FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf()); Path path = new Path("/testWriterInterrupted"); FSDataOutputStream stm = fs1.create(path); byte[] toWrite = AppendTestUtil.randomBytes(0, 5); CountDownLatch countdown = new CountDownLatch(1); AtomicReference<Throwable> thrown = new AtomicReference<Throwable>(); WriterThread writerThread = new AppendTestUtil.WriterThread( stm, toWrite, thrown, countdown, numWrites); writerThread.start(); countdown.countDown(); while (writerThread.getNumWritten() == 0 && thrown.get() == null && writerThread.isAlive()) { System.err.println("Waiting for writer to start"); Thread.sleep(10); } assertTrue(writerThread.isAlive()); if (thrown.get() != null) { throw new RuntimeException(thrown.get()); } AppendTestUtil.loseLeases(fs1); AppendTestUtil.recoverFile(cluster, fs2, path); while (thrown.get() == null) { LOG.info("Waiting for writer thread to get expected exception"); Thread.sleep(1000); } assertNotNull(thrown.get()); // Check that we can see all of the synced edits int expectedEdits = writerThread.getNumWritten(); int gotEdits = (int)(fs2.getFileStatus(path).getLen() / toWrite.length); assertTrue("Expected at least " + expectedEdits + " edits, got " + gotEdits, gotEdits >= expectedEdits); }
@Test(timeout=90000) public void testWriterInterrupted() throws Exception { short repl = 3; int numWrites = 20000; MiniDFSCluster cluster = new MiniDFSCluster(conf, repl, true, null); FileSystem fs1 = cluster.getFileSystem(); FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf()); Path path = new Path("/testWriterInterrupted"); FSDataOutputStream stm = fs1.create(path); byte[] toWrite = AppendTestUtil.randomBytes(0, 5); CountDownLatch countdown = new CountDownLatch(1); AtomicReference<Throwable> thrown = new AtomicReference<Throwable>(); WriterThread writerThread = new AppendTestUtil.WriterThread( stm, toWrite, thrown, countdown, numWrites); writerThread.start(); countdown.countDown(); while (writerThread.getNumWritten() == 0 && thrown.get() == null && writerThread.isAlive()) { System.err.println("Waiting for writer to start"); Thread.sleep(10); } assertTrue(writerThread.isAlive()); if (thrown.get() != null) { throw new RuntimeException(thrown.get()); } AppendTestUtil.loseLeases(fs1); AppendTestUtil.recoverFile(cluster, fs2, path); while (thrown.get() == null) { LOG.info("Waiting for writer thread to get expected exception"); Thread.sleep(1000); } assertNotNull(thrown.get()); // Check that we can see all of the synced edits int expectedEdits = writerThread.getNumWritten(); int gotEdits = (int)(fs2.getFileStatus(path).getLen() / toWrite.length); assertTrue("Expected at least " + expectedEdits + " edits, got " + gotEdits, gotEdits >= expectedEdits); }