/** * Returns the next batch of events in the stream, waiting indefinitely if * a new batch is not immediately available. * * @throws IOException see {@link DFSInotifyEventInputStream#poll()} * @throws MissingEventsException see * {@link DFSInotifyEventInputStream#poll()} * @throws InterruptedException if the calling thread is interrupted */ public EventBatch take() throws IOException, InterruptedException, MissingEventsException { TraceScope scope = Trace.startSpan("inotifyTake", traceSampler); EventBatch next = null; try { int nextWaitMin = INITIAL_WAIT_MS; while ((next = poll()) == null) { // sleep for a random period between nextWaitMin and nextWaitMin * 2 // to avoid stampedes at the NN if there are multiple clients int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin); LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime); Thread.sleep(sleepTime); // the maximum sleep is 2 minutes nextWaitMin = Math.min(60000, nextWaitMin * 2); } } finally { scope.close(); } return next; }
/** * Returns the next event batch in the stream, waiting up to the specified * amount of time for a new batch. Returns null if one is not available at the * end of the specified amount of time. The time before the method returns may * exceed the specified amount of time by up to the time required for an RPC * to the NameNode. * * @param time number of units of the given TimeUnit to wait * @param tu the desired TimeUnit * @throws IOException see {@link DFSInotifyEventInputStream#poll()} * @throws MissingEventsException * see {@link DFSInotifyEventInputStream#poll()} * @throws InterruptedException if the calling thread is interrupted */ public EventBatch poll(long time, TimeUnit tu) throws IOException, InterruptedException, MissingEventsException { EventBatch next; try (TraceScope ignored = tracer.newScope("inotifyPollWithTimeout")) { long initialTime = Time.monotonicNow(); long totalWait = TimeUnit.MILLISECONDS.convert(time, tu); long nextWait = INITIAL_WAIT_MS; while ((next = poll()) == null) { long timeLeft = totalWait - (Time.monotonicNow() - initialTime); if (timeLeft <= 0) { LOG.debug("timed poll(): timed out"); break; } else if (timeLeft < nextWait * 2) { nextWait = timeLeft; } else { nextWait *= 2; } LOG.debug("timed poll(): poll() returned null, sleeping for {} ms", nextWait); Thread.sleep(nextWait); } } return next; }
/** * Returns the next batch of events in the stream, waiting indefinitely if * a new batch is not immediately available. * * @throws IOException see {@link DFSInotifyEventInputStream#poll()} * @throws MissingEventsException see * {@link DFSInotifyEventInputStream#poll()} * @throws InterruptedException if the calling thread is interrupted */ public EventBatch take() throws IOException, InterruptedException, MissingEventsException { EventBatch next; try (TraceScope ignored = tracer.newScope("inotifyTake")) { int nextWaitMin = INITIAL_WAIT_MS; while ((next = poll()) == null) { // sleep for a random period between nextWaitMin and nextWaitMin * 2 // to avoid stampedes at the NN if there are multiple clients int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin); LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime); Thread.sleep(sleepTime); // the maximum sleep is 2 minutes nextWaitMin = Math.min(60000, nextWaitMin * 2); } } return next; }
/** * Returns the next event batch in the stream, waiting up to the specified * amount of time for a new batch. Returns null if one is not available at the * end of the specified amount of time. The time before the method returns may * exceed the specified amount of time by up to the time required for an RPC * to the NameNode. * * @param time number of units of the given TimeUnit to wait * @param tu the desired TimeUnit * @throws IOException see {@link DFSInotifyEventInputStream#poll()} * @throws MissingEventsException * see {@link DFSInotifyEventInputStream#poll()} * @throws InterruptedException if the calling thread is interrupted */ public EventBatch poll(long time, TimeUnit tu) throws IOException, InterruptedException, MissingEventsException { long initialTime = Time.monotonicNow(); long totalWait = TimeUnit.MILLISECONDS.convert(time, tu); long nextWait = INITIAL_WAIT_MS; EventBatch next = null; while ((next = poll()) == null) { long timeLeft = totalWait - (Time.monotonicNow() - initialTime); if (timeLeft <= 0) { LOG.debug("timed poll(): timed out"); break; } else if (timeLeft < nextWait * 2) { nextWait = timeLeft; } else { nextWait *= 2; } LOG.debug("timed poll(): poll() returned null, sleeping for {} ms", nextWait); Thread.sleep(nextWait); } return next; }
/** * Returns the next batch of events in the stream, waiting indefinitely if * a new batch is not immediately available. * * @throws IOException see {@link DFSInotifyEventInputStream#poll()} * @throws MissingEventsException see * {@link DFSInotifyEventInputStream#poll()} * @throws InterruptedException if the calling thread is interrupted */ public EventBatch take() throws IOException, InterruptedException, MissingEventsException { EventBatch next = null; int nextWaitMin = INITIAL_WAIT_MS; while ((next = poll()) == null) { // sleep for a random period between nextWaitMin and nextWaitMin * 2 // to avoid stampedes at the NN if there are multiple clients int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin); LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime); Thread.sleep(sleepTime); // the maximum sleep is 2 minutes nextWaitMin = Math.min(60000, nextWaitMin * 2); } return next; }
/** * Returns the next event in the stream, waiting up to the specified amount of * time for a new event. Returns null if a new event is not available at the * end of the specified amount of time. The time before the method returns may * exceed the specified amount of time by up to the time required for an RPC * to the NameNode. * * @param time number of units of the given TimeUnit to wait * @param tu the desired TimeUnit * @throws IOException see {@link DFSInotifyEventInputStream#poll()} * @throws MissingEventsException * see {@link DFSInotifyEventInputStream#poll()} * @throws InterruptedException if the calling thread is interrupted */ public Event poll(long time, TimeUnit tu) throws IOException, InterruptedException, MissingEventsException { long initialTime = Time.monotonicNow(); long totalWait = TimeUnit.MILLISECONDS.convert(time, tu); long nextWait = INITIAL_WAIT_MS; Event next = null; while ((next = poll()) == null) { long timeLeft = totalWait - (Time.monotonicNow() - initialTime); if (timeLeft <= 0) { LOG.debug("timed poll(): timed out"); break; } else if (timeLeft < nextWait * 2) { nextWait = timeLeft; } else { nextWait *= 2; } LOG.debug("timed poll(): poll() returned null, sleeping for {} ms", nextWait); Thread.sleep(nextWait); } return next; }
/** * Returns the next event in the stream, waiting indefinitely if a new event * is not immediately available. * * @throws IOException see {@link DFSInotifyEventInputStream#poll()} * @throws MissingEventsException see * {@link DFSInotifyEventInputStream#poll()} * @throws InterruptedException if the calling thread is interrupted */ public Event take() throws IOException, InterruptedException, MissingEventsException { Event next = null; int nextWaitMin = INITIAL_WAIT_MS; while ((next = poll()) == null) { // sleep for a random period between nextWaitMin and nextWaitMin * 2 // to avoid stampedes at the NN if there are multiple clients int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin); LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime); Thread.sleep(sleepTime); // the maximum sleep is 2 minutes nextWaitMin = Math.min(60000, nextWaitMin * 2); } return next; }
/** * Returns the next event batch in the stream, waiting up to the specified * amount of time for a new batch. Returns null if one is not available at the * end of the specified amount of time. The time before the method returns may * exceed the specified amount of time by up to the time required for an RPC * to the NameNode. * * @param time number of units of the given TimeUnit to wait * @param tu the desired TimeUnit * @throws IOException see {@link DFSInotifyEventInputStream#poll()} * @throws MissingEventsException * see {@link DFSInotifyEventInputStream#poll()} * @throws InterruptedException if the calling thread is interrupted */ public EventBatch poll(long time, TimeUnit tu) throws IOException, InterruptedException, MissingEventsException { TraceScope scope = Trace.startSpan("inotifyPollWithTimeout", traceSampler); EventBatch next = null; try { long initialTime = Time.monotonicNow(); long totalWait = TimeUnit.MILLISECONDS.convert(time, tu); long nextWait = INITIAL_WAIT_MS; while ((next = poll()) == null) { long timeLeft = totalWait - (Time.monotonicNow() - initialTime); if (timeLeft <= 0) { LOG.debug("timed poll(): timed out"); break; } else if (timeLeft < nextWait * 2) { nextWait = timeLeft; } else { nextWait *= 2; } LOG.debug("timed poll(): poll() returned null, sleeping for {} ms", nextWait); Thread.sleep(nextWait); } } finally { scope.close(); } return next; }
@Test(timeout = 120000) public void testNNFailover() throws IOException, URISyntaxException, MissingEventsException { Configuration conf = new HdfsConfiguration(); MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build(); try { cluster.getDfsCluster().waitActive(); cluster.getDfsCluster().transitionToActive(0); DFSClient client = ((DistributedFileSystem) HATestUtil.configureFailoverFs (cluster.getDfsCluster(), conf)).dfs; DFSInotifyEventInputStream eis = client.getInotifyEventStream(); for (int i = 0; i < 10; i++) { client.mkdirs("/dir" + i, null, false); } cluster.getDfsCluster().shutdownNameNode(0); cluster.getDfsCluster().transitionToActive(1); EventBatch batch = null; // we can read all of the edits logged by the old active from the new // active for (int i = 0; i < 10; i++) { batch = waitForNextEvents(eis); Assert.assertEquals(1, batch.getEvents().length); Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE); Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" + i)); } Assert.assertTrue(eis.poll() == null); } finally { cluster.shutdown(); } }
@Test(timeout = 120000) public void testReadEventsWithTimeout() throws IOException, InterruptedException, MissingEventsException { Configuration conf = new HdfsConfiguration(); MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build(); try { cluster.getDfsCluster().waitActive(); cluster.getDfsCluster().transitionToActive(0); final DFSClient client = new DFSClient(cluster.getDfsCluster() .getNameNode(0).getNameNodeAddress(), conf); DFSInotifyEventInputStream eis = client.getInotifyEventStream(); ScheduledExecutorService ex = Executors .newSingleThreadScheduledExecutor(); ex.schedule(new Runnable() { @Override public void run() { try { client.mkdirs("/dir", null, false); } catch (IOException e) { // test will fail LOG.error("Unable to create /dir", e); } } }, 1, TimeUnit.SECONDS); // a very generous wait period -- the edit will definitely have been // processed by the time this is up EventBatch batch = eis.poll(5, TimeUnit.SECONDS); Assert.assertNotNull(batch); Assert.assertEquals(1, batch.getEvents().length); Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE); Assert.assertEquals("/dir", ((Event.CreateEvent) batch.getEvents()[0]).getPath()); } finally { cluster.shutdown(); } }
/** * Returns the next batch of events in the stream or null if no new * batches are currently available. * * @throws IOException because of network error or edit log * corruption. Also possible if JournalNodes are unresponsive in the * QJM setting (even one unresponsive JournalNode is enough in rare cases), * so catching this exception and retrying at least a few times is * recommended. * @throws MissingEventsException if we cannot return the next batch in the * stream because the data for the events (and possibly some subsequent * events) has been deleted (generally because this stream is a very large * number of transactions behind the current state of the NameNode). It is * safe to continue reading from the stream after this exception is thrown * The next available batch of events will be returned. */ public EventBatch poll() throws IOException, MissingEventsException { try (TraceScope ignored = tracer.newScope("inotifyPoll")) { // need to keep retrying until the NN sends us the latest committed txid if (lastReadTxid == -1) { LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN"); lastReadTxid = namenode.getCurrentEditLogTxid(); return null; } if (!it.hasNext()) { EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1); if (el.getLastTxid() != -1) { // we only want to set syncTxid when we were actually able to read some // edits on the NN -- otherwise it will seem like edits are being // generated faster than we can read them when the problem is really // that we are temporarily unable to read edits syncTxid = el.getSyncTxid(); it = el.getBatches().iterator(); long formerLastReadTxid = lastReadTxid; lastReadTxid = el.getLastTxid(); if (el.getFirstTxid() != formerLastReadTxid + 1) { throw new MissingEventsException(formerLastReadTxid + 1, el.getFirstTxid()); } } else { LOG.debug("poll(): read no edits from the NN when requesting edits " + "after txid {}", lastReadTxid); return null; } } if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the // newly seen edit log ops actually got converted to events return it.next(); } else { return null; } } }
/** * Returns the next batch of events in the stream or null if no new * batches are currently available. * * @throws IOException because of network error or edit log * corruption. Also possible if JournalNodes are unresponsive in the * QJM setting (even one unresponsive JournalNode is enough in rare cases), * so catching this exception and retrying at least a few times is * recommended. * @throws MissingEventsException if we cannot return the next batch in the * stream because the data for the events (and possibly some subsequent * events) has been deleted (generally because this stream is a very large * number of transactions behind the current state of the NameNode). It is * safe to continue reading from the stream after this exception is thrown * The next available batch of events will be returned. */ public EventBatch poll() throws IOException, MissingEventsException { // need to keep retrying until the NN sends us the latest committed txid if (lastReadTxid == -1) { LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN"); lastReadTxid = namenode.getCurrentEditLogTxid(); return null; } if (!it.hasNext()) { EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1); if (el.getLastTxid() != -1) { // we only want to set syncTxid when we were actually able to read some // edits on the NN -- otherwise it will seem like edits are being // generated faster than we can read them when the problem is really // that we are temporarily unable to read edits syncTxid = el.getSyncTxid(); it = el.getBatches().iterator(); long formerLastReadTxid = lastReadTxid; lastReadTxid = el.getLastTxid(); if (el.getFirstTxid() != formerLastReadTxid + 1) { throw new MissingEventsException(formerLastReadTxid + 1, el.getFirstTxid()); } } else { LOG.debug("poll(): read no edits from the NN when requesting edits " + "after txid {}", lastReadTxid); return null; } } if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the // newly seen edit log ops actually got converted to events return it.next(); } else { return null; } }
/** * Returns the next event in the stream or null if no new events are currently * available. * * @throws IOException because of network error or edit log * corruption. Also possible if JournalNodes are unresponsive in the * QJM setting (even one unresponsive JournalNode is enough in rare cases), * so catching this exception and retrying at least a few times is * recommended. * @throws MissingEventsException if we cannot return the next event in the * stream because the data for the event (and possibly some subsequent events) * has been deleted (generally because this stream is a very large number of * events behind the current state of the NameNode). It is safe to continue * reading from the stream after this exception is thrown -- the next * available event will be returned. */ public Event poll() throws IOException, MissingEventsException { // need to keep retrying until the NN sends us the latest committed txid if (lastReadTxid == -1) { LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN"); lastReadTxid = namenode.getCurrentEditLogTxid(); return null; } if (!it.hasNext()) { EventsList el = namenode.getEditsFromTxid(lastReadTxid + 1); if (el.getLastTxid() != -1) { // we only want to set syncTxid when we were actually able to read some // edits on the NN -- otherwise it will seem like edits are being // generated faster than we can read them when the problem is really // that we are temporarily unable to read edits syncTxid = el.getSyncTxid(); it = el.getEvents().iterator(); long formerLastReadTxid = lastReadTxid; lastReadTxid = el.getLastTxid(); if (el.getFirstTxid() != formerLastReadTxid + 1) { throw new MissingEventsException(formerLastReadTxid + 1, el.getFirstTxid()); } } else { LOG.debug("poll(): read no edits from the NN when requesting edits " + "after txid {}", lastReadTxid); return null; } } if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the // newly seen edit log ops actually got converted to events return it.next(); } else { return null; } }
@Test(timeout = 120000) public void testNNFailover() throws IOException, URISyntaxException, MissingEventsException { Configuration conf = new HdfsConfiguration(); MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build(); try { cluster.getDfsCluster().waitActive(); cluster.getDfsCluster().transitionToActive(0); DFSClient client = ((DistributedFileSystem) HATestUtil.configureFailoverFs (cluster.getDfsCluster(), conf)).dfs; DFSInotifyEventInputStream eis = client.getInotifyEventStream(); for (int i = 0; i < 10; i++) { client.mkdirs("/dir" + i, null, false); } cluster.getDfsCluster().shutdownNameNode(0); cluster.getDfsCluster().transitionToActive(1); Event next = null; // we can read all of the edits logged by the old active from the new // active for (int i = 0; i < 10; i++) { next = waitForNextEvent(eis); Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" + i)); } Assert.assertTrue(eis.poll() == null); } finally { cluster.shutdown(); } }
@Test(timeout = 120000) public void testReadEventsWithTimeout() throws IOException, InterruptedException, MissingEventsException { Configuration conf = new HdfsConfiguration(); MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build(); try { cluster.getDfsCluster().waitActive(); cluster.getDfsCluster().transitionToActive(0); final DFSClient client = new DFSClient(cluster.getDfsCluster() .getNameNode(0).getNameNodeAddress(), conf); DFSInotifyEventInputStream eis = client.getInotifyEventStream(); ScheduledExecutorService ex = Executors .newSingleThreadScheduledExecutor(); ex.schedule(new Runnable() { @Override public void run() { try { client.mkdirs("/dir", null, false); } catch (IOException e) { // test will fail LOG.error("Unable to create /dir", e); } } }, 1, TimeUnit.SECONDS); // a very generous wait period -- the edit will definitely have been // processed by the time this is up Event next = eis.poll(5, TimeUnit.SECONDS); Assert.assertTrue(next != null); Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir")); } finally { cluster.shutdown(); } }
/** * Returns the next batch of events in the stream or null if no new * batches are currently available. * * @throws IOException because of network error or edit log * corruption. Also possible if JournalNodes are unresponsive in the * QJM setting (even one unresponsive JournalNode is enough in rare cases), * so catching this exception and retrying at least a few times is * recommended. * @throws MissingEventsException if we cannot return the next batch in the * stream because the data for the events (and possibly some subsequent * events) has been deleted (generally because this stream is a very large * number of transactions behind the current state of the NameNode). It is * safe to continue reading from the stream after this exception is thrown * The next available batch of events will be returned. */ public EventBatch poll() throws IOException, MissingEventsException { TraceScope scope = Trace.startSpan("inotifyPoll", traceSampler); try { // need to keep retrying until the NN sends us the latest committed txid if (lastReadTxid == -1) { LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN"); lastReadTxid = namenode.getCurrentEditLogTxid(); return null; } if (!it.hasNext()) { EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1); if (el.getLastTxid() != -1) { // we only want to set syncTxid when we were actually able to read some // edits on the NN -- otherwise it will seem like edits are being // generated faster than we can read them when the problem is really // that we are temporarily unable to read edits syncTxid = el.getSyncTxid(); it = el.getBatches().iterator(); long formerLastReadTxid = lastReadTxid; lastReadTxid = el.getLastTxid(); if (el.getFirstTxid() != formerLastReadTxid + 1) { throw new MissingEventsException(formerLastReadTxid + 1, el.getFirstTxid()); } } else { LOG.debug("poll(): read no edits from the NN when requesting edits " + "after txid {}", lastReadTxid); return null; } } if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the // newly seen edit log ops actually got converted to events return it.next(); } else { return null; } } finally { scope.close(); } }
private static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis) throws IOException, MissingEventsException { EventBatch batch = null; while ((batch = eis.poll()) == null); return batch; }
@Test(timeout = 120000) public void testTwoActiveNNs() throws IOException, MissingEventsException { Configuration conf = new HdfsConfiguration(); MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build(); try { cluster.getDfsCluster().waitActive(); cluster.getDfsCluster().transitionToActive(0); DFSClient client0 = new DFSClient(cluster.getDfsCluster().getNameNode(0) .getNameNodeAddress(), conf); DFSClient client1 = new DFSClient(cluster.getDfsCluster().getNameNode(1) .getNameNodeAddress(), conf); DFSInotifyEventInputStream eis = client0.getInotifyEventStream(); for (int i = 0; i < 10; i++) { client0.mkdirs("/dir" + i, null, false); } cluster.getDfsCluster().transitionToActive(1); for (int i = 10; i < 20; i++) { client1.mkdirs("/dir" + i, null, false); } // make sure that the old active can't read any further than the edits // it logged itself (it has no idea whether the in-progress edits from // the other writer have actually been committed) EventBatch batch = null; for (int i = 0; i < 10; i++) { batch = waitForNextEvents(eis); Assert.assertEquals(1, batch.getEvents().length); Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE); Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" + i)); } Assert.assertTrue(eis.poll() == null); } finally { try { cluster.shutdown(); } catch (ExitUtil.ExitException e) { // expected because the old active will be unable to flush the // end-of-segment op since it is fenced } } }
public static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis) throws IOException, MissingEventsException { EventBatch batch = null; while ((batch = eis.poll()) == null); return batch; }
private static Event waitForNextEvent(DFSInotifyEventInputStream eis) throws IOException, MissingEventsException { Event next = null; while ((next = eis.poll()) == null); return next; }
@Test(timeout = 120000) public void testTwoActiveNNs() throws IOException, MissingEventsException { Configuration conf = new HdfsConfiguration(); MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build(); try { cluster.getDfsCluster().waitActive(); cluster.getDfsCluster().transitionToActive(0); DFSClient client0 = new DFSClient(cluster.getDfsCluster().getNameNode(0) .getNameNodeAddress(), conf); DFSClient client1 = new DFSClient(cluster.getDfsCluster().getNameNode(1) .getNameNodeAddress(), conf); DFSInotifyEventInputStream eis = client0.getInotifyEventStream(); for (int i = 0; i < 10; i++) { client0.mkdirs("/dir" + i, null, false); } cluster.getDfsCluster().transitionToActive(1); for (int i = 10; i < 20; i++) { client1.mkdirs("/dir" + i, null, false); } // make sure that the old active can't read any further than the edits // it logged itself (it has no idea whether the in-progress edits from // the other writer have actually been committed) Event next = null; for (int i = 0; i < 10; i++) { next = waitForNextEvent(eis); Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" + i)); } Assert.assertTrue(eis.poll() == null); } finally { try { cluster.shutdown(); } catch (ExitUtil.ExitException e) { // expected because the old active will be unable to flush the // end-of-segment op since it is fenced } } }