Example source code for the Java class org.apache.hadoop.hdfs.inotify.EventBatch

Project: hadoop    File: DFSInotifyEventInputStream.java
/**
 * Returns the next batch of events in the stream, waiting indefinitely if
 * a new batch is not immediately available.
 *
 * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
 * @throws MissingEventsException see
 * {@link DFSInotifyEventInputStream#poll()}
 * @throws InterruptedException if the calling thread is interrupted
 */
public EventBatch take() throws IOException, InterruptedException,
    MissingEventsException {
  TraceScope scope = Trace.startSpan("inotifyTake", traceSampler);
  EventBatch next = null;
  try {
    int nextWaitMin = INITIAL_WAIT_MS;
    while ((next = poll()) == null) {
      // sleep for a random period between nextWaitMin and nextWaitMin * 2
      // to avoid stampedes at the NN if there are multiple clients
      int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
      LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
      Thread.sleep(sleepTime);
      // the maximum sleep is 2 minutes
      nextWaitMin = Math.min(60000, nextWaitMin * 2);
    }
  } finally {
    scope.close();
  }

  return next;
}
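
For context, here is a minimal consumer sketch that is not taken from any of the projects above. It assumes a NameNode at hdfs://localhost:8020 and a default Configuration, obtains the DFSInotifyEventInputStream through the public HdfsAdmin entry point, and drains it with the blocking take() shown above.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventBatch;

public class InotifyTakeExample {
  public static void main(String[] args) throws Exception {
    // Placeholder URI and default Configuration; substitute real cluster settings.
    HdfsAdmin admin =
        new HdfsAdmin(URI.create("hdfs://localhost:8020"), new Configuration());
    DFSInotifyEventInputStream stream = admin.getInotifyEventStream();
    while (true) {
      // take() blocks, using the randomized backoff above, until a batch arrives.
      EventBatch batch = stream.take();
      for (Event event : batch.getEvents()) {
        System.out.println("txid=" + batch.getTxid()
            + " type=" + event.getEventType());
      }
    }
  }
}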
Project: aliyun-oss-hadoop-fs    File: DFSInotifyEventInputStream.java
/**
 * Returns the next event batch in the stream, waiting up to the specified
 * amount of time for a new batch. Returns null if one is not available at the
 * end of the specified amount of time. The time before the method returns may
 * exceed the specified amount of time by up to the time required for an RPC
 * to the NameNode.
 *
 * @param time number of units of the given TimeUnit to wait
 * @param tu the desired TimeUnit
 * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
 * @throws MissingEventsException
 * see {@link DFSInotifyEventInputStream#poll()}
 * @throws InterruptedException if the calling thread is interrupted
 */
public EventBatch poll(long time, TimeUnit tu) throws IOException,
    InterruptedException, MissingEventsException {
  EventBatch next;
  try (TraceScope ignored = tracer.newScope("inotifyPollWithTimeout")) {
    long initialTime = Time.monotonicNow();
    long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
    long nextWait = INITIAL_WAIT_MS;
    while ((next = poll()) == null) {
      long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
      if (timeLeft <= 0) {
        LOG.debug("timed poll(): timed out");
        break;
      } else if (timeLeft < nextWait * 2) {
        nextWait = timeLeft;
      } else {
        nextWait *= 2;
      }
      LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
          nextWait);
      Thread.sleep(nextWait);
    }
  }
  return next;
}
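
As a usage sketch (not part of the project source), the timed poll() above suits a loop that alternates between consuming events and doing other periodic work. The one-minute window and one-second timeout below are illustrative, and the stream is assumed to have been obtained as in the earlier HdfsAdmin example.

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventBatch;

public class TimedPollExample {
  /** Drains events for roughly one minute, waiting at most one second per poll. */
  public static void drainForOneMinute(DFSInotifyEventInputStream stream)
      throws Exception {
    long deadline = System.currentTimeMillis() + 60_000L;
    while (System.currentTimeMillis() < deadline) {
      EventBatch batch = stream.poll(1, TimeUnit.SECONDS); // null if nothing arrived
      if (batch == null) {
        continue; // no new edits this second; other periodic work could go here
      }
      for (Event event : batch.getEvents()) {
        System.out.println(event.getEventType());
      }
    }
  }
}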
Project: aliyun-oss-hadoop-fs    File: DFSInotifyEventInputStream.java
/**
 * Returns the next batch of events in the stream, waiting indefinitely if
 * a new batch is not immediately available.
 *
 * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
 * @throws MissingEventsException see
 * {@link DFSInotifyEventInputStream#poll()}
 * @throws InterruptedException if the calling thread is interrupted
 */
public EventBatch take() throws IOException, InterruptedException,
    MissingEventsException {
  EventBatch next;
  try (TraceScope ignored = tracer.newScope("inotifyTake")) {
    int nextWaitMin = INITIAL_WAIT_MS;
    while ((next = poll()) == null) {
      // sleep for a random period between nextWaitMin and nextWaitMin * 2
      // to avoid stampedes at the NN if there are multiple clients
      int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
      LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
      Thread.sleep(sleepTime);
      // the maximum sleep is 2 minutes
      nextWaitMin = Math.min(60000, nextWaitMin * 2);
    }
  }

  return next;
}
Project: big-c    File: DFSInotifyEventInputStream.java
/**
 * Returns the next batch of events in the stream, waiting indefinitely if
 * a new batch is not immediately available.
 *
 * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
 * @throws MissingEventsException see
 * {@link DFSInotifyEventInputStream#poll()}
 * @throws InterruptedException if the calling thread is interrupted
 */
public EventBatch take() throws IOException, InterruptedException,
    MissingEventsException {
  TraceScope scope = Trace.startSpan("inotifyTake", traceSampler);
  EventBatch next = null;
  try {
    int nextWaitMin = INITIAL_WAIT_MS;
    while ((next = poll()) == null) {
      // sleep for a random period between nextWaitMin and nextWaitMin * 2
      // to avoid stampedes at the NN if there are multiple clients
      int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
      LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
      Thread.sleep(sleepTime);
      // the maximum sleep is 2 minutes
      nextWaitMin = Math.min(60000, nextWaitMin * 2);
    }
  } finally {
    scope.close();
  }

  return next;
}
Project: hadoop-2.6.0-cdh5.4.3    File: DFSInotifyEventInputStream.java
/**
 * Returns the next event batch in the stream, waiting up to the specified
 * amount of time for a new batch. Returns null if one is not available at the
 * end of the specified amount of time. The time before the method returns may
 * exceed the specified amount of time by up to the time required for an RPC
 * to the NameNode.
 *
 * @param time number of units of the given TimeUnit to wait
 * @param tu the desired TimeUnit
 * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
 * @throws MissingEventsException
 * see {@link DFSInotifyEventInputStream#poll()}
 * @throws InterruptedException if the calling thread is interrupted
 */
public EventBatch poll(long time, TimeUnit tu) throws IOException,
    InterruptedException, MissingEventsException {
  long initialTime = Time.monotonicNow();
  long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
  long nextWait = INITIAL_WAIT_MS;
  EventBatch next = null;
  while ((next = poll()) == null) {
    long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
    if (timeLeft <= 0) {
      LOG.debug("timed poll(): timed out");
      break;
    } else if (timeLeft < nextWait * 2) {
      nextWait = timeLeft;
    } else {
      nextWait *= 2;
    }
    LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
        nextWait);
    Thread.sleep(nextWait);
  }

  return next;
}
Project: hadoop-2.6.0-cdh5.4.3    File: DFSInotifyEventInputStream.java
/**
 * Returns the next batch of events in the stream, waiting indefinitely if
 * a new batch is not immediately available.
 *
 * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
 * @throws MissingEventsException see
 * {@link DFSInotifyEventInputStream#poll()}
 * @throws InterruptedException if the calling thread is interrupted
 */
public EventBatch take() throws IOException, InterruptedException,
    MissingEventsException {
  EventBatch next = null;
  int nextWaitMin = INITIAL_WAIT_MS;
  while ((next = poll()) == null) {
    // sleep for a random period between nextWaitMin and nextWaitMin * 2
    // to avoid stampedes at the NN if there are multiple clients
    int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
    LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
    Thread.sleep(sleepTime);
    // the maximum sleep is 2 minutes
    nextWaitMin = Math.min(60000, nextWaitMin * 2);
  }

  return next;
}
Project: hadoop    File: DFSInotifyEventInputStream.java
/**
 * Returns the next event batch in the stream, waiting up to the specified
 * amount of time for a new batch. Returns null if one is not available at the
 * end of the specified amount of time. The time before the method returns may
 * exceed the specified amount of time by up to the time required for an RPC
 * to the NameNode.
 *
 * @param time number of units of the given TimeUnit to wait
 * @param tu the desired TimeUnit
 * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
 * @throws MissingEventsException
 * see {@link DFSInotifyEventInputStream#poll()}
 * @throws InterruptedException if the calling thread is interrupted
 */
public EventBatch poll(long time, TimeUnit tu) throws IOException,
    InterruptedException, MissingEventsException {
  TraceScope scope = Trace.startSpan("inotifyPollWithTimeout", traceSampler);
  EventBatch next = null;
  try {
    long initialTime = Time.monotonicNow();
    long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
    long nextWait = INITIAL_WAIT_MS;
    while ((next = poll()) == null) {
      long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
      if (timeLeft <= 0) {
        LOG.debug("timed poll(): timed out");
        break;
      } else if (timeLeft < nextWait * 2) {
        nextWait = timeLeft;
      } else {
        nextWait *= 2;
      }
      LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
          nextWait);
      Thread.sleep(nextWait);
    }
  } finally {
    scope.close();
  }
  return next;
}
Project: hadoop    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testNNFailover() throws IOException, URISyntaxException,
    MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client = ((DistributedFileSystem) HATestUtil.configureFailoverFs
        (cluster.getDfsCluster(), conf)).dfs;
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client.mkdirs("/dir" + i, null, false);
    }
    cluster.getDfsCluster().shutdownNameNode(0);
    cluster.getDfsCluster().transitionToActive(1);
    EventBatch batch = null;
    // we can read all of the edits logged by the old active from the new
    // active
    for (int i = 0; i < 10; i++) {
      batch = waitForNextEvents(eis);
      Assert.assertEquals(1, batch.getEvents().length);
      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
          i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    cluster.shutdown();
  }
}
Project: hadoop    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testReadEventsWithTimeout() throws IOException,
    InterruptedException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    final DFSClient client = new DFSClient(cluster.getDfsCluster()
        .getNameNode(0).getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    ScheduledExecutorService ex = Executors
        .newSingleThreadScheduledExecutor();
    ex.schedule(new Runnable() {
      @Override
      public void run() {
        try {
          client.mkdirs("/dir", null, false);
        } catch (IOException e) {
          // test will fail
          LOG.error("Unable to create /dir", e);
        }
      }
    }, 1, TimeUnit.SECONDS);
    // a very generous wait period -- the edit will definitely have been
    // processed by the time this is up
    EventBatch batch = eis.poll(5, TimeUnit.SECONDS);
    Assert.assertNotNull(batch);
    Assert.assertEquals(1, batch.getEvents().length);
    Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
    Assert.assertEquals("/dir", ((Event.CreateEvent) batch.getEvents()[0]).getPath());
  } finally {
    cluster.shutdown();
  }
}
Project: aliyun-oss-hadoop-fs    File: DFSInotifyEventInputStream.java
/**
 * Returns the next batch of events in the stream or null if no new
 * batches are currently available.
 *
 * @throws IOException because of network error or edit log
 * corruption. Also possible if JournalNodes are unresponsive in the
 * QJM setting (even one unresponsive JournalNode is enough in rare cases),
 * so catching this exception and retrying at least a few times is
 * recommended.
 * @throws MissingEventsException if we cannot return the next batch in the
 * stream because the data for the events (and possibly some subsequent
 * events) has been deleted (generally because this stream is a very large
 * number of transactions behind the current state of the NameNode). It is
 * safe to continue reading from the stream after this exception is thrown.
 * The next available batch of events will be returned.
 */
public EventBatch poll() throws IOException, MissingEventsException {
  try (TraceScope ignored = tracer.newScope("inotifyPoll")) {
    // need to keep retrying until the NN sends us the latest committed txid
    if (lastReadTxid == -1) {
      LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
      lastReadTxid = namenode.getCurrentEditLogTxid();
      return null;
    }
    if (!it.hasNext()) {
      EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
      if (el.getLastTxid() != -1) {
        // we only want to set syncTxid when we were actually able to read some
        // edits on the NN -- otherwise it will seem like edits are being
        // generated faster than we can read them when the problem is really
        // that we are temporarily unable to read edits
        syncTxid = el.getSyncTxid();
        it = el.getBatches().iterator();
        long formerLastReadTxid = lastReadTxid;
        lastReadTxid = el.getLastTxid();
        if (el.getFirstTxid() != formerLastReadTxid + 1) {
          throw new MissingEventsException(formerLastReadTxid + 1,
              el.getFirstTxid());
        }
      } else {
        LOG.debug("poll(): read no edits from the NN when requesting edits " +
            "after txid {}", lastReadTxid);
        return null;
      }
    }

    if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the
      // newly seen edit log ops actually got converted to events
      return it.next();
    } else {
      return null;
    }
  }
}
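
The javadoc above recommends retrying on IOException and notes that it is safe to keep reading after a MissingEventsException. A hedged sketch of that retry pattern follows; it is not taken from the project source, and the attempt count and backoff are illustrative.

import java.io.IOException;

import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;

public class ResilientPoll {
  /** Polls once, retrying transient IOExceptions a few times as the javadoc suggests. */
  public static EventBatch pollWithRetries(DFSInotifyEventInputStream stream)
      throws IOException, InterruptedException {
    IOException lastFailure = null;
    for (int attempt = 0; attempt < 3; attempt++) {
      try {
        return stream.poll(); // may legitimately return null if no batch is ready
      } catch (MissingEventsException e) {
        // Some events were aged out on the NameNode; reading can safely continue,
        // so report nothing for this call and let the caller poll again.
        return null;
      } catch (IOException e) {
        lastFailure = e;                     // e.g. an unresponsive JournalNode
        Thread.sleep(1000L * (attempt + 1)); // simple linear backoff between attempts
      }
    }
    throw lastFailure;
  }
}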
Project: aliyun-oss-hadoop-fs    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testNNFailover() throws IOException, URISyntaxException,
    MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client = ((DistributedFileSystem) HATestUtil.configureFailoverFs
        (cluster.getDfsCluster(), conf)).dfs;
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client.mkdirs("/dir" + i, null, false);
    }
    cluster.getDfsCluster().shutdownNameNode(0);
    cluster.getDfsCluster().transitionToActive(1);
    EventBatch batch = null;
    // we can read all of the edits logged by the old active from the new
    // active
    for (int i = 0; i < 10; i++) {
      batch = waitForNextEvents(eis);
      Assert.assertEquals(1, batch.getEvents().length);
      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
          i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    cluster.shutdown();
  }
}
Project: aliyun-oss-hadoop-fs    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testReadEventsWithTimeout() throws IOException,
    InterruptedException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    final DFSClient client = new DFSClient(cluster.getDfsCluster()
        .getNameNode(0).getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    ScheduledExecutorService ex = Executors
        .newSingleThreadScheduledExecutor();
    ex.schedule(new Runnable() {
      @Override
      public void run() {
        try {
          client.mkdirs("/dir", null, false);
        } catch (IOException e) {
          // test will fail
          LOG.error("Unable to create /dir", e);
        }
      }
    }, 1, TimeUnit.SECONDS);
    // a very generous wait period -- the edit will definitely have been
    // processed by the time this is up
    EventBatch batch = eis.poll(5, TimeUnit.SECONDS);
    Assert.assertNotNull(batch);
    Assert.assertEquals(1, batch.getEvents().length);
    Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
    Assert.assertEquals("/dir", ((Event.CreateEvent) batch.getEvents()[0]).getPath());
  } finally {
    cluster.shutdown();
  }
}
Project: big-c    File: DFSInotifyEventInputStream.java
/**
 * Returns the next event batch in the stream, waiting up to the specified
 * amount of time for a new batch. Returns null if one is not available at the
 * end of the specified amount of time. The time before the method returns may
 * exceed the specified amount of time by up to the time required for an RPC
 * to the NameNode.
 *
 * @param time number of units of the given TimeUnit to wait
 * @param tu the desired TimeUnit
 * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
 * @throws MissingEventsException
 * see {@link DFSInotifyEventInputStream#poll()}
 * @throws InterruptedException if the calling thread is interrupted
 */
public EventBatch poll(long time, TimeUnit tu) throws IOException,
    InterruptedException, MissingEventsException {
  TraceScope scope = Trace.startSpan("inotifyPollWithTimeout", traceSampler);
  EventBatch next = null;
  try {
    long initialTime = Time.monotonicNow();
    long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
    long nextWait = INITIAL_WAIT_MS;
    while ((next = poll()) == null) {
      long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
      if (timeLeft <= 0) {
        LOG.debug("timed poll(): timed out");
        break;
      } else if (timeLeft < nextWait * 2) {
        nextWait = timeLeft;
      } else {
        nextWait *= 2;
      }
      LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
          nextWait);
      Thread.sleep(nextWait);
    }
  } finally {
    scope.close();
  }
  return next;
}
Project: big-c    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testNNFailover() throws IOException, URISyntaxException,
    MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client = ((DistributedFileSystem) HATestUtil.configureFailoverFs
        (cluster.getDfsCluster(), conf)).dfs;
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client.mkdirs("/dir" + i, null, false);
    }
    cluster.getDfsCluster().shutdownNameNode(0);
    cluster.getDfsCluster().transitionToActive(1);
    EventBatch batch = null;
    // we can read all of the edits logged by the old active from the new
    // active
    for (int i = 0; i < 10; i++) {
      batch = waitForNextEvents(eis);
      Assert.assertEquals(1, batch.getEvents().length);
      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
          i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    cluster.shutdown();
  }
}
Project: big-c    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testReadEventsWithTimeout() throws IOException,
    InterruptedException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    final DFSClient client = new DFSClient(cluster.getDfsCluster()
        .getNameNode(0).getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    ScheduledExecutorService ex = Executors
        .newSingleThreadScheduledExecutor();
    ex.schedule(new Runnable() {
      @Override
      public void run() {
        try {
          client.mkdirs("/dir", null, false);
        } catch (IOException e) {
          // test will fail
          LOG.error("Unable to create /dir", e);
        }
      }
    }, 1, TimeUnit.SECONDS);
    // a very generous wait period -- the edit will definitely have been
    // processed by the time this is up
    EventBatch batch = eis.poll(5, TimeUnit.SECONDS);
    Assert.assertNotNull(batch);
    Assert.assertEquals(1, batch.getEvents().length);
    Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
    Assert.assertEquals("/dir", ((Event.CreateEvent) batch.getEvents()[0]).getPath());
  } finally {
    cluster.shutdown();
  }
}
Project: hadoop-2.6.0-cdh5.4.3    File: DFSInotifyEventInputStream.java
/**
 * Returns the next batch of events in the stream or null if no new
 * batches are currently available.
 *
 * @throws IOException because of network error or edit log
 * corruption. Also possible if JournalNodes are unresponsive in the
 * QJM setting (even one unresponsive JournalNode is enough in rare cases),
 * so catching this exception and retrying at least a few times is
 * recommended.
 * @throws MissingEventsException if we cannot return the next batch in the
 * stream because the data for the events (and possibly some subsequent
 * events) has been deleted (generally because this stream is a very large
 * number of transactions behind the current state of the NameNode). It is
 * safe to continue reading from the stream after this exception is thrown.
 * The next available batch of events will be returned.
 */
public EventBatch poll() throws IOException, MissingEventsException {
  // need to keep retrying until the NN sends us the latest committed txid
  if (lastReadTxid == -1) {
    LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
    lastReadTxid = namenode.getCurrentEditLogTxid();
    return null;
  }
  if (!it.hasNext()) {
    EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
    if (el.getLastTxid() != -1) {
      // we only want to set syncTxid when we were actually able to read some
      // edits on the NN -- otherwise it will seem like edits are being
      // generated faster than we can read them when the problem is really
      // that we are temporarily unable to read edits
      syncTxid = el.getSyncTxid();
      it = el.getBatches().iterator();
      long formerLastReadTxid = lastReadTxid;
      lastReadTxid = el.getLastTxid();
      if (el.getFirstTxid() != formerLastReadTxid + 1) {
        throw new MissingEventsException(formerLastReadTxid + 1,
            el.getFirstTxid());
      }
    } else {
      LOG.debug("poll(): read no edits from the NN when requesting edits " +
        "after txid {}", lastReadTxid);
      return null;
    }
  }

  if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the
    // newly seen edit log ops actually got converted to events
    return it.next();
  } else {
    return null;
  }
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testNNFailover() throws IOException, URISyntaxException,
    MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client = ((DistributedFileSystem) HATestUtil.configureFailoverFs
        (cluster.getDfsCluster(), conf)).dfs;
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client.mkdirs("/dir" + i, null, false);
    }
    cluster.getDfsCluster().shutdownNameNode(0);
    cluster.getDfsCluster().transitionToActive(1);
    EventBatch batch = null;
    // we can read all of the edits logged by the old active from the new
    // active
    for (int i = 0; i < 10; i++) {
      batch = waitForNextEvents(eis);
      Assert.assertEquals(1, batch.getEvents().length);
      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
          i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    cluster.shutdown();
  }
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testReadEventsWithTimeout() throws IOException,
    InterruptedException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    final DFSClient client = new DFSClient(cluster.getDfsCluster()
        .getNameNode(0).getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    ScheduledExecutorService ex = Executors
        .newSingleThreadScheduledExecutor();
    ex.schedule(new Runnable() {
      @Override
      public void run() {
        try {
          client.mkdirs("/dir", null, false);
        } catch (IOException e) {
          // test will fail
          LOG.error("Unable to create /dir", e);
        }
      }
    }, 1, TimeUnit.SECONDS);
    // a very generous wait period -- the edit will definitely have been
    // processed by the time this is up
    EventBatch batch = eis.poll(5, TimeUnit.SECONDS);
    Assert.assertNotNull(batch);
    Assert.assertEquals(1, batch.getEvents().length);
    Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
    Assert.assertEquals("/dir", ((Event.CreateEvent) batch.getEvents()[0]).getPath());
  } finally {
    cluster.shutdown();
  }
}
Project: hadoop    File: DFSInotifyEventInputStream.java
/**
 * Returns the next batch of events in the stream or null if no new
 * batches are currently available.
 *
 * @throws IOException because of network error or edit log
 * corruption. Also possible if JournalNodes are unresponsive in the
 * QJM setting (even one unresponsive JournalNode is enough in rare cases),
 * so catching this exception and retrying at least a few times is
 * recommended.
 * @throws MissingEventsException if we cannot return the next batch in the
 * stream because the data for the events (and possibly some subsequent
 * events) has been deleted (generally because this stream is a very large
 * number of transactions behind the current state of the NameNode). It is
 * safe to continue reading from the stream after this exception is thrown.
 * The next available batch of events will be returned.
 */
public EventBatch poll() throws IOException, MissingEventsException {
  TraceScope scope =
      Trace.startSpan("inotifyPoll", traceSampler);
  try {
    // need to keep retrying until the NN sends us the latest committed txid
    if (lastReadTxid == -1) {
      LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
      lastReadTxid = namenode.getCurrentEditLogTxid();
      return null;
    }
    if (!it.hasNext()) {
      EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
      if (el.getLastTxid() != -1) {
        // we only want to set syncTxid when we were actually able to read some
        // edits on the NN -- otherwise it will seem like edits are being
        // generated faster than we can read them when the problem is really
        // that we are temporarily unable to read edits
        syncTxid = el.getSyncTxid();
        it = el.getBatches().iterator();
        long formerLastReadTxid = lastReadTxid;
        lastReadTxid = el.getLastTxid();
        if (el.getFirstTxid() != formerLastReadTxid + 1) {
          throw new MissingEventsException(formerLastReadTxid + 1,
              el.getFirstTxid());
        }
      } else {
        LOG.debug("poll(): read no edits from the NN when requesting edits " +
          "after txid {}", lastReadTxid);
        return null;
      }
    }

    if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the
      // newly seen edit log ops actually got converted to events
      return it.next();
    } else {
      return null;
    }
  } finally {
    scope.close();
  }
}
Project: hadoop    File: TestDFSUpgrade.java
@Test
public void testPreserveEditLogs() throws Exception {
  conf = new HdfsConfiguration();
  conf = UpgradeUtilities.initializeStorageStateConf(1, conf);
  String[] nameNodeDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
  conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION, false);

  log("Normal NameNode upgrade", 1);
  File[] created =
      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
  for (final File createdDir : created) {
    List<String> fileNameList =
        IOUtils.listDirectory(createdDir, EditLogsFilter.INSTANCE);
    for (String fileName : fileNameList) {
      String tmpFileName = fileName + ".tmp";
      File existingFile = new File(createdDir, fileName);
      File tmpFile = new File(createdDir, tmpFileName);
      Files.move(existingFile.toPath(), tmpFile.toPath());
      File newFile = new File(createdDir, fileName);
      Preconditions.checkState(newFile.createNewFile(),
          "Cannot create new edits log file in " + createdDir);
      EditLogFileInputStream in = new EditLogFileInputStream(tmpFile,
          HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID,
          false);
      EditLogFileOutputStream out = new EditLogFileOutputStream(conf, newFile,
          (int)tmpFile.length());
      out.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION + 1);
      FSEditLogOp logOp = in.readOp();
      while (logOp != null) {
        out.write(logOp);
        logOp = in.readOp();
      }
      out.setReadyToFlush();
      out.flushAndSync(true);
      out.close();
      Files.delete(tmpFile.toPath());
    }
  }

  cluster = createCluster();

  DFSInotifyEventInputStream ieis =
      cluster.getFileSystem().getInotifyEventStream(0);
  EventBatch batch = ieis.poll();
  Event[] events = batch.getEvents();
  assertTrue("Should be able to get transactions before the upgrade.",
      events.length > 0);
  assertEquals(events[0].getEventType(), Event.EventType.CREATE);
  assertEquals(((CreateEvent) events[0]).getPath(), "/TestUpgrade");
  cluster.shutdown();
  UpgradeUtilities.createEmptyDirs(nameNodeDirs);
}
Project: hadoop    File: TestDFSInotifyEventInputStream.java
private static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis)
  throws IOException, MissingEventsException {
  EventBatch batch = null;
  while ((batch = eis.poll()) == null);
  return batch;
}
Project: hadoop    File: TestDFSInotifyEventInputStream.java
private static long checkTxid(EventBatch batch, long prevTxid){
  Assert.assertTrue("Previous txid " + prevTxid + " was not less than " +
      "new txid " + batch.getTxid(), prevTxid < batch.getTxid());
  return batch.getTxid();
}
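
Because batch txids increase monotonically, as checkTxid() above verifies, a reader can checkpoint the last txid it processed and resume from it later. The sketch below is hypothetical: it assumes HdfsAdmin's getInotifyEventStream(long lastReadTxid) overload, a placeholder NameNode URI, and a caller that persists the returned txid.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.inotify.EventBatch;

public class ResumeFromTxid {
  /** Re-opens the event stream after a previously processed txid and drains it. */
  public static long resume(long lastProcessedTxid) throws Exception {
    HdfsAdmin admin =
        new HdfsAdmin(URI.create("hdfs://localhost:8020"), new Configuration());
    DFSInotifyEventInputStream stream =
        admin.getInotifyEventStream(lastProcessedTxid);
    EventBatch batch;
    while ((batch = stream.poll()) != null) {
      lastProcessedTxid = batch.getTxid(); // persist this value to resume later
    }
    return lastProcessedTxid;
  }
}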
Project: hadoop    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testTwoActiveNNs() throws IOException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client0 = new DFSClient(cluster.getDfsCluster().getNameNode(0)
        .getNameNodeAddress(), conf);
    DFSClient client1 = new DFSClient(cluster.getDfsCluster().getNameNode(1)
        .getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client0.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client0.mkdirs("/dir" + i, null, false);
    }

    cluster.getDfsCluster().transitionToActive(1);
    for (int i = 10; i < 20; i++) {
      client1.mkdirs("/dir" + i, null, false);
    }

    // make sure that the old active can't read any further than the edits
    // it logged itself (it has no idea whether the in-progress edits from
    // the other writer have actually been committed)
    EventBatch batch = null;
    for (int i = 0; i < 10; i++) {
      batch = waitForNextEvents(eis);
      Assert.assertEquals(1, batch.getEvents().length);
      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
          i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    try {
      cluster.shutdown();
    } catch (ExitUtil.ExitException e) {
      // expected because the old active will be unable to flush the
      // end-of-segment op since it is fenced
    }
  }
}
Project: aliyun-oss-hadoop-fs    File: TestDFSInotifyEventInputStream.java
public static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis)
  throws IOException, MissingEventsException {
  EventBatch batch = null;
  while ((batch = eis.poll()) == null);
  return batch;
}
Project: aliyun-oss-hadoop-fs    File: TestDFSInotifyEventInputStream.java
private static long checkTxid(EventBatch batch, long prevTxid){
  Assert.assertTrue("Previous txid " + prevTxid + " was not less than " +
      "new txid " + batch.getTxid(), prevTxid < batch.getTxid());
  return batch.getTxid();
}
Project: aliyun-oss-hadoop-fs    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testTwoActiveNNs() throws IOException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client0 = new DFSClient(cluster.getDfsCluster().getNameNode(0)
        .getNameNodeAddress(), conf);
    DFSClient client1 = new DFSClient(cluster.getDfsCluster().getNameNode(1)
        .getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client0.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client0.mkdirs("/dir" + i, null, false);
    }

    cluster.getDfsCluster().transitionToActive(1);
    for (int i = 10; i < 20; i++) {
      client1.mkdirs("/dir" + i, null, false);
    }

    // make sure that the old active can't read any further than the edits
    // it logged itself (it has no idea whether the in-progress edits from
    // the other writer have actually been committed)
    EventBatch batch = null;
    for (int i = 0; i < 10; i++) {
      batch = waitForNextEvents(eis);
      Assert.assertEquals(1, batch.getEvents().length);
      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
          i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    try {
      cluster.shutdown();
    } catch (ExitUtil.ExitException e) {
      // expected because the old active will be unable to flush the
      // end-of-segment op since it is fenced
    }
  }
}
Project: big-c    File: DFSInotifyEventInputStream.java
/**
 * Returns the next batch of events in the stream or null if no new
 * batches are currently available.
 *
 * @throws IOException because of network error or edit log
 * corruption. Also possible if JournalNodes are unresponsive in the
 * QJM setting (even one unresponsive JournalNode is enough in rare cases),
 * so catching this exception and retrying at least a few times is
 * recommended.
 * @throws MissingEventsException if we cannot return the next batch in the
 * stream because the data for the events (and possibly some subsequent
 * events) has been deleted (generally because this stream is a very large
 * number of transactions behind the current state of the NameNode). It is
 * safe to continue reading from the stream after this exception is thrown.
 * The next available batch of events will be returned.
 */
public EventBatch poll() throws IOException, MissingEventsException {
  TraceScope scope =
      Trace.startSpan("inotifyPoll", traceSampler);
  try {
    // need to keep retrying until the NN sends us the latest committed txid
    if (lastReadTxid == -1) {
      LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
      lastReadTxid = namenode.getCurrentEditLogTxid();
      return null;
    }
    if (!it.hasNext()) {
      EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
      if (el.getLastTxid() != -1) {
        // we only want to set syncTxid when we were actually able to read some
        // edits on the NN -- otherwise it will seem like edits are being
        // generated faster than we can read them when the problem is really
        // that we are temporarily unable to read edits
        syncTxid = el.getSyncTxid();
        it = el.getBatches().iterator();
        long formerLastReadTxid = lastReadTxid;
        lastReadTxid = el.getLastTxid();
        if (el.getFirstTxid() != formerLastReadTxid + 1) {
          throw new MissingEventsException(formerLastReadTxid + 1,
              el.getFirstTxid());
        }
      } else {
        LOG.debug("poll(): read no edits from the NN when requesting edits " +
          "after txid {}", lastReadTxid);
        return null;
      }
    }

    if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the
      // newly seen edit log ops actually got converted to events
      return it.next();
    } else {
      return null;
    }
  } finally {
    scope.close();
  }
}
Project: big-c    File: TestDFSUpgrade.java
@Test
public void testPreserveEditLogs() throws Exception {
  conf = new HdfsConfiguration();
  conf = UpgradeUtilities.initializeStorageStateConf(1, conf);
  String[] nameNodeDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
  conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION, false);

  log("Normal NameNode upgrade", 1);
  File[] created =
      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
  for (final File createdDir : created) {
    List<String> fileNameList =
        IOUtils.listDirectory(createdDir, EditLogsFilter.INSTANCE);
    for (String fileName : fileNameList) {
      String tmpFileName = fileName + ".tmp";
      File existingFile = new File(createdDir, fileName);
      File tmpFile = new File(createdDir, tmpFileName);
      Files.move(existingFile.toPath(), tmpFile.toPath());
      File newFile = new File(createdDir, fileName);
      Preconditions.checkState(newFile.createNewFile(),
          "Cannot create new edits log file in " + createdDir);
      EditLogFileInputStream in = new EditLogFileInputStream(tmpFile,
          HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID,
          false);
      EditLogFileOutputStream out = new EditLogFileOutputStream(conf, newFile,
          (int)tmpFile.length());
      out.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION + 1);
      FSEditLogOp logOp = in.readOp();
      while (logOp != null) {
        out.write(logOp);
        logOp = in.readOp();
      }
      out.setReadyToFlush();
      out.flushAndSync(true);
      out.close();
      Files.delete(tmpFile.toPath());
    }
  }

  cluster = createCluster();

  DFSInotifyEventInputStream ieis =
      cluster.getFileSystem().getInotifyEventStream(0);
  EventBatch batch = ieis.poll();
  Event[] events = batch.getEvents();
  assertTrue("Should be able to get transactions before the upgrade.",
      events.length > 0);
  assertEquals(events[0].getEventType(), Event.EventType.CREATE);
  assertEquals(((CreateEvent) events[0]).getPath(), "/TestUpgrade");
  cluster.shutdown();
  UpgradeUtilities.createEmptyDirs(nameNodeDirs);
}
Project: big-c    File: TestDFSInotifyEventInputStream.java
private static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis)
  throws IOException, MissingEventsException {
  EventBatch batch = null;
  while ((batch = eis.poll()) == null);
  return batch;
}
Project: big-c    File: TestDFSInotifyEventInputStream.java
private static long checkTxid(EventBatch batch, long prevTxid){
  Assert.assertTrue("Previous txid " + prevTxid + " was not less than " +
      "new txid " + batch.getTxid(), prevTxid < batch.getTxid());
  return batch.getTxid();
}
Project: big-c    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testTwoActiveNNs() throws IOException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client0 = new DFSClient(cluster.getDfsCluster().getNameNode(0)
        .getNameNodeAddress(), conf);
    DFSClient client1 = new DFSClient(cluster.getDfsCluster().getNameNode(1)
        .getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client0.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client0.mkdirs("/dir" + i, null, false);
    }

    cluster.getDfsCluster().transitionToActive(1);
    for (int i = 10; i < 20; i++) {
      client1.mkdirs("/dir" + i, null, false);
    }

    // make sure that the old active can't read any further than the edits
    // it logged itself (it has no idea whether the in-progress edits from
    // the other writer have actually been committed)
    EventBatch batch = null;
    for (int i = 0; i < 10; i++) {
      batch = waitForNextEvents(eis);
      Assert.assertEquals(1, batch.getEvents().length);
      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
          i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    try {
      cluster.shutdown();
    } catch (ExitUtil.ExitException e) {
      // expected because the old active will be unable to flush the
      // end-of-segment op since it is fenced
    }
  }
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestDFSInotifyEventInputStream.java
private static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis)
  throws IOException, MissingEventsException {
  EventBatch batch = null;
  while ((batch = eis.poll()) == null);
  return batch;
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestDFSInotifyEventInputStream.java
private static long checkTxid(EventBatch batch, long prevTxid){
  Assert.assertTrue("Previous txid " + prevTxid + " was not less than " +
      "new txid " + batch.getTxid(), prevTxid < batch.getTxid());
  return batch.getTxid();
}
Project: hadoop-2.6.0-cdh5.4.3    File: TestDFSInotifyEventInputStream.java
@Test(timeout = 120000)
public void testTwoActiveNNs() throws IOException, MissingEventsException {
  Configuration conf = new HdfsConfiguration();
  MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();

  try {
    cluster.getDfsCluster().waitActive();
    cluster.getDfsCluster().transitionToActive(0);
    DFSClient client0 = new DFSClient(cluster.getDfsCluster().getNameNode(0)
        .getNameNodeAddress(), conf);
    DFSClient client1 = new DFSClient(cluster.getDfsCluster().getNameNode(1)
        .getNameNodeAddress(), conf);
    DFSInotifyEventInputStream eis = client0.getInotifyEventStream();
    for (int i = 0; i < 10; i++) {
      client0.mkdirs("/dir" + i, null, false);
    }

    cluster.getDfsCluster().transitionToActive(1);
    for (int i = 10; i < 20; i++) {
      client1.mkdirs("/dir" + i, null, false);
    }

    // make sure that the old active can't read any further than the edits
    // it logged itself (it has no idea whether the in-progress edits from
    // the other writer have actually been committed)
    EventBatch batch = null;
    for (int i = 0; i < 10; i++) {
      batch = waitForNextEvents(eis);
      Assert.assertEquals(1, batch.getEvents().length);
      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
      Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
          i));
    }
    Assert.assertTrue(eis.poll() == null);
  } finally {
    try {
      cluster.shutdown();
    } catch (ExitUtil.ExitException e) {
      // expected because the old active will be unable to flush the
      // end-of-segment op since it is fenced
    }
  }
}