@Override public EventBatchList getEditsFromTxid(long txid) throws IOException { GetEditsFromTxidRequestProto req = GetEditsFromTxidRequestProto.newBuilder() .setTxid(txid).build(); try { return PBHelper.convert(rpcProxy.getEditsFromTxid(null, req)); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } }
/** * Returns the next batch of events in the stream or null if no new * batches are currently available. * * @throws IOException because of network error or edit log * corruption. Also possible if JournalNodes are unresponsive in the * QJM setting (even one unresponsive JournalNode is enough in rare cases), * so catching this exception and retrying at least a few times is * recommended. * @throws MissingEventsException if we cannot return the next batch in the * stream because the data for the events (and possibly some subsequent * events) has been deleted (generally because this stream is a very large * number of transactions behind the current state of the NameNode). It is * safe to continue reading from the stream after this exception is thrown * The next available batch of events will be returned. */ public EventBatch poll() throws IOException, MissingEventsException { try (TraceScope ignored = tracer.newScope("inotifyPoll")) { // need to keep retrying until the NN sends us the latest committed txid if (lastReadTxid == -1) { LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN"); lastReadTxid = namenode.getCurrentEditLogTxid(); return null; } if (!it.hasNext()) { EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1); if (el.getLastTxid() != -1) { // we only want to set syncTxid when we were actually able to read some // edits on the NN -- otherwise it will seem like edits are being // generated faster than we can read them when the problem is really // that we are temporarily unable to read edits syncTxid = el.getSyncTxid(); it = el.getBatches().iterator(); long formerLastReadTxid = lastReadTxid; lastReadTxid = el.getLastTxid(); if (el.getFirstTxid() != formerLastReadTxid + 1) { throw new MissingEventsException(formerLastReadTxid + 1, el.getFirstTxid()); } } else { LOG.debug("poll(): read no edits from the NN when requesting edits " + "after txid {}", lastReadTxid); return null; } } if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the // newly seen edit log ops actually got converted to events return it.next(); } else { return null; } } }
@Override public EventBatchList getEditsFromTxid(long txid) throws IOException { GetEditsFromTxidRequestProto req = GetEditsFromTxidRequestProto.newBuilder() .setTxid(txid).build(); try { return PBHelperClient.convert(rpcProxy.getEditsFromTxid(null, req)); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } }
/** * Returns the next batch of events in the stream or null if no new * batches are currently available. * * @throws IOException because of network error or edit log * corruption. Also possible if JournalNodes are unresponsive in the * QJM setting (even one unresponsive JournalNode is enough in rare cases), * so catching this exception and retrying at least a few times is * recommended. * @throws MissingEventsException if we cannot return the next batch in the * stream because the data for the events (and possibly some subsequent * events) has been deleted (generally because this stream is a very large * number of transactions behind the current state of the NameNode). It is * safe to continue reading from the stream after this exception is thrown * The next available batch of events will be returned. */ public EventBatch poll() throws IOException, MissingEventsException { // need to keep retrying until the NN sends us the latest committed txid if (lastReadTxid == -1) { LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN"); lastReadTxid = namenode.getCurrentEditLogTxid(); return null; } if (!it.hasNext()) { EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1); if (el.getLastTxid() != -1) { // we only want to set syncTxid when we were actually able to read some // edits on the NN -- otherwise it will seem like edits are being // generated faster than we can read them when the problem is really // that we are temporarily unable to read edits syncTxid = el.getSyncTxid(); it = el.getBatches().iterator(); long formerLastReadTxid = lastReadTxid; lastReadTxid = el.getLastTxid(); if (el.getFirstTxid() != formerLastReadTxid + 1) { throw new MissingEventsException(formerLastReadTxid + 1, el.getFirstTxid()); } } else { LOG.debug("poll(): read no edits from the NN when requesting edits " + "after txid {}", lastReadTxid); return null; } } if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the // newly seen edit log ops actually got converted to events return it.next(); } else { return null; } }
@Override public EventBatchList getEditsFromTxid(long txid) throws IOException { try { AuthorizationProvider.beginClientOp(); return server.getEditsFromTxid(txid); } finally { AuthorizationProvider.endClientOp(); } }
/** * Returns the next batch of events in the stream or null if no new * batches are currently available. * * @throws IOException because of network error or edit log * corruption. Also possible if JournalNodes are unresponsive in the * QJM setting (even one unresponsive JournalNode is enough in rare cases), * so catching this exception and retrying at least a few times is * recommended. * @throws MissingEventsException if we cannot return the next batch in the * stream because the data for the events (and possibly some subsequent * events) has been deleted (generally because this stream is a very large * number of transactions behind the current state of the NameNode). It is * safe to continue reading from the stream after this exception is thrown * The next available batch of events will be returned. */ public EventBatch poll() throws IOException, MissingEventsException { TraceScope scope = Trace.startSpan("inotifyPoll", traceSampler); try { // need to keep retrying until the NN sends us the latest committed txid if (lastReadTxid == -1) { LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN"); lastReadTxid = namenode.getCurrentEditLogTxid(); return null; } if (!it.hasNext()) { EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1); if (el.getLastTxid() != -1) { // we only want to set syncTxid when we were actually able to read some // edits on the NN -- otherwise it will seem like edits are being // generated faster than we can read them when the problem is really // that we are temporarily unable to read edits syncTxid = el.getSyncTxid(); it = el.getBatches().iterator(); long formerLastReadTxid = lastReadTxid; lastReadTxid = el.getLastTxid(); if (el.getFirstTxid() != formerLastReadTxid + 1) { throw new MissingEventsException(formerLastReadTxid + 1, el.getFirstTxid()); } } else { LOG.debug("poll(): read no edits from the NN when requesting edits " + "after txid {}", lastReadTxid); return null; } } if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the // newly seen edit log ops actually got converted to events return it.next(); } else { return null; } } finally { scope.close(); } }
@Override public EventBatchList getEditsFromTxid(long txid) throws IOException { throw new IOException("Invalid operation, do not use proxy"); }
/** * Get an ordered list of batches of events corresponding to the edit log * transactions for txids equal to or greater than txid. */ @Idempotent public EventBatchList getEditsFromTxid(long txid) throws IOException;
/** * Get an ordered list of batches of events corresponding to the edit log * transactions for txids equal to or greater than txid. */ @Idempotent EventBatchList getEditsFromTxid(long txid) throws IOException;