/** * Call the server once only. * {@link RetryingCallable} has a strange shape so we can do retrys. Use this invocation if you * want to do a single call only (A call to {@link RetryingCallable#call(int)} will not likely * succeed). * @return an object of type T * @throws IOException if a remote or network exception occurs * @throws RuntimeException other unspecified error */ public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout) throws IOException, RuntimeException { // The code of this method should be shared with withRetries. this.globalStartTime = EnvironmentEdgeManager.currentTime(); try { callable.prepare(false); return callable.call(callTimeout); } catch (Throwable t) { Throwable t2 = translateException(t); ExceptionUtil.rethrowIfInterrupt(t2); // It would be nice to clear the location cache here. if (t2 instanceof IOException) { throw (IOException)t2; } else { throw new RuntimeException(t2); } } }
/** * Handle connection failures * * If the current number of retries is equal to the max number of retries, * stop retrying and throw the exception; Otherwise backoff N seconds and * try connecting again. * * This Method is only called from inside setupIOstreams(), which is * synchronized. Hence the sleep is synchronized; the locks will be retained. * * @param curRetries current number of retries * @param maxRetries max number of retries allowed * @param ioe failure reason * @throws IOException if max number of retries is reached */ private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe) throws IOException { closeConnection(); // throw the exception if the maximum number of retries is reached if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) { throw ioe; } // otherwise back off and retry try { Thread.sleep(failureSleep); } catch (InterruptedException ie) { ExceptionUtil.rethrowIfInterrupt(ie); } LOG.info("Retrying connect to server: " + remoteId.getAddress() + " after sleeping " + failureSleep + "ms. Already tried " + curRetries + " time(s)."); }
/** * Call the server once only. * {@link RetryingCallable} has a strange shape so we can do retrys. Use this invocation if you * want to do a single call only (A call to {@link RetryingCallable#call(int)} will not likely * succeed). * @return an object of type T * @throws IOException if a remote or network exception occurs * @throws RuntimeException other unspecified error */ public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout) throws IOException, RuntimeException { // The code of this method should be shared with withRetries. this.globalStartTime = EnvironmentEdgeManager.currentTime(); try { callable.prepare(false);//call 的准备工作 return callable.call(callTimeout);//具体的call调用 } catch (Throwable t) { Throwable t2 = translateException(t); ExceptionUtil.rethrowIfInterrupt(t2); // It would be nice to clear the location cache here. if (t2 instanceof IOException) { throw (IOException)t2; } else { throw new RuntimeException(t2); } } }
/** * Call the server once only. * {@link RetryingCallable} has a strange shape so we can do retrys. Use this invocation if you * want to do a single call only (A call to {@link RetryingCallable#call()} will not likely * succeed). * @return an object of type T * @throws IOException if a remote or network exception occurs * @throws RuntimeException other unspecified error */ public T callWithoutRetries(RetryingCallable<T> callable) throws IOException, RuntimeException { // The code of this method should be shared with withRetries. this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis(); try { beforeCall(); callable.prepare(false); return callable.call(); } catch (Throwable t) { Throwable t2 = translateException(t); ExceptionUtil.rethrowIfInterrupt(t2); // It would be nice to clear the location cache here. if (t2 instanceof IOException) { throw (IOException)t2; } else { throw new RuntimeException(t2); } } finally { afterCall(); } }
@Override public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout) throws IOException, RuntimeException { // The code of this method should be shared with withRetries. try { callable.prepare(false); return callable.call(callTimeout); } catch (Throwable t) { Throwable t2 = translateException(t); ExceptionUtil.rethrowIfInterrupt(t2); // It would be nice to clear the location cache here. if (t2 instanceof IOException) { throw (IOException)t2; } else { throw new RuntimeException(t2); } } }
/** * Handle connection failures If the current number of retries is equal to the max number of * retries, stop retrying and throw the exception; Otherwise backoff N seconds and try connecting * again. This Method is only called from inside setupIOstreams(), which is synchronized. Hence * the sleep is synchronized; the locks will be retained. * @param curRetries current number of retries * @param maxRetries max number of retries allowed * @param ioe failure reason * @throws IOException if max number of retries is reached */ private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe) throws IOException { closeSocket(); // throw the exception if the maximum number of retries is reached if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) { throw ioe; } // otherwise back off and retry try { Thread.sleep(this.rpcClient.failureSleep); } catch (InterruptedException ie) { ExceptionUtil.rethrowIfInterrupt(ie); } if (LOG.isInfoEnabled()) { LOG.info("Retrying connect to server: " + remoteId.getAddress() + " after sleeping " + this.rpcClient.failureSleep + "ms. Already tried " + curRetries + " time(s)."); } }
/** * Call the server once only. * {@link RetryingCallable} has a strange shape so we can do retrys. Use this invocation if you * want to do a single call only (A call to {@link RetryingCallable#call(int)} will not likely * succeed). * @return an object of type T * @throws IOException if a remote or network exception occurs * @throws RuntimeException other unspecified error */ public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout) throws IOException, RuntimeException { // The code of this method should be shared with withRetries. this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis(); try { callable.prepare(false); return callable.call(callTimeout); } catch (Throwable t) { Throwable t2 = translateException(t); ExceptionUtil.rethrowIfInterrupt(t2); // It would be nice to clear the location cache here. if (t2 instanceof IOException) { throw (IOException)t2; } else { throw new RuntimeException(t2); } } }
@Override public void run() { try { LOG.info("SplitLogWorker " + server.getServerName() + " starting"); coordination.registerListener(); // wait for Coordination Engine is ready boolean res = false; while (!res && !coordination.isStop()) { res = coordination.isReady(); } if (!coordination.isStop()) { coordination.taskLoop(); } } catch (Throwable t) { if (ExceptionUtil.isInterrupt(t)) { LOG.info("SplitLogWorker interrupted. Exiting. " + (coordination.isStop() ? "" : " (ERROR: exitWorker is not set, exiting anyway)")); } else { // only a logical error can cause here. Printing it out // to make debugging easier LOG.error("unexpected error ", t); } } finally { coordination.removeListener(); LOG.info("SplitLogWorker " + server.getServerName() + " exiting"); } }
/**
 * Extracts the IOException that the remote server threw and that arrived wrapped as the cause
 * of a ServiceException.
 * <p>
 * An interrupt-type cause is converted through {@code ExceptionUtil.asInterrupt}; a
 * RemoteException cause is unwrapped before the final type check.
 *
 * @param se ServiceException that wraps IO exception thrown by the server
 * @return the IOException carried by the ServiceException, or a new IOException wrapping the
 *         ServiceException when there is no cause or the cause is not an IOException
 */
public static IOException getRemoteException(ServiceException se) {
  Throwable cause = se.getCause();
  if (cause == null) {
    return new IOException(se);
  }
  if (ExceptionUtil.isInterrupt(cause)) {
    return ExceptionUtil.asInterrupt(cause);
  }
  if (cause instanceof RemoteException) {
    cause = ((RemoteException) cause).unwrapRemoteException();
  }
  if (cause instanceof IOException) {
    return (IOException) cause;
  }
  // Unexpected cause type: report the ServiceException itself.
  return new IOException(se);
}
/** * Create a stub. Try once only. It is not typed because there is no common type to * protobuf services nor their interfaces. Let the caller do appropriate casting. * @return A stub for master services. * @throws IOException * @throws KeeperException * @throws ServiceException */ private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException { ZooKeeperKeepAliveConnection zkw; try { zkw = getKeepAliveZooKeeperWatcher(); } catch (IOException e) { ExceptionUtil.rethrowIfInterrupt(e); throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e); } try { checkIfBaseNodeAvailable(zkw); ServerName sn = MasterAddressTracker.getMasterAddress(zkw); if (sn == null) { String msg = "ZooKeeper available but no active master location found"; LOG.info(msg); throw new MasterNotRunningException(msg); } if (isDeadServer(sn)) { throw new MasterNotRunningException(sn + " is dead."); } // Use the security info interface name as our stub key String key = getStubKey(getServiceName(), sn.getHostname(), sn.getPort(), hostnamesCanChange); connectionLock.putIfAbsent(key, key); Object stub = null; synchronized (connectionLock.get(key)) { stub = stubs.get(key); if (stub == null) { BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout); stub = makeStub(channel); isMasterRunning(); stubs.put(key, stub); } } return stub; } finally { zkw.close(); } }
/**
 * Marks this call as failed with the given exception.
 * <p>
 * An interrupt-type exception is first converted through {@code ExceptionUtil.asInterrupt},
 * and a RemoteException is unwrapped, before the result is handed to {@code setFailure}.
 *
 * @param exception the failure to record
 */
public void setFailed(IOException exception) {
  IOException failure = exception;
  if (ExceptionUtil.isInterrupt(failure)) {
    failure = ExceptionUtil.asInterrupt(failure);
  }
  if (failure instanceof RemoteException) {
    failure = ((RemoteException) failure).unwrapRemoteException();
  }
  this.setFailure(failure);
}
/** * Create a stub. Try once only. It is not typed because there is no common type to * protobuf services nor their interfaces. Let the caller do appropriate casting. * * @return A stub for master services. * @throws IOException * @throws KeeperException * @throws ServiceException */ private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException { ZooKeeperKeepAliveConnection zkw; try { zkw = getKeepAliveZooKeeperWatcher(); } catch (IOException e) { ExceptionUtil.rethrowIfInterrupt(e); throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e); } try { checkIfBaseNodeAvailable(zkw); ServerName sn = MasterAddressTracker.getMasterAddress(zkw); if (sn == null) { String msg = "ZooKeeper available but no active master location found"; LOG.info(msg); throw new MasterNotRunningException(msg); } if (isDeadServer(sn)) { throw new MasterNotRunningException(sn + " is dead."); } // Use the security info interface name as our stub key String key = getStubKey(getServiceName(), sn.getHostAndPort()); connectionLock.putIfAbsent(key, key); Object stub = null; synchronized (connectionLock.get(key)) { stub = stubs.get(key); if (stub == null) { BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout); stub = makeStub(channel); isMasterRunning(); stubs.put(key, stub); } } return stub; } finally { zkw.close(); } }
/** * Create a stub. Try once only. It is not typed because there is no common type to * protobuf services nor their interfaces. Let the caller do appropriate casting. * @return A stub for master services. * @throws IOException * @throws KeeperException * @throws ServiceException */ private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException { ZooKeeperKeepAliveConnection zkw; try { zkw = getKeepAliveZooKeeperWatcher(); } catch (IOException e) { ExceptionUtil.rethrowIfInterrupt(e); throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e); } try { checkIfBaseNodeAvailable(zkw); ServerName sn = MasterAddressTracker.getMasterAddress(zkw); if (sn == null) { String msg = "ZooKeeper available but no active master location found"; LOG.info(msg); throw new MasterNotRunningException(msg); } if (isDeadServer(sn)) { throw new MasterNotRunningException(sn + " is dead."); } // Use the security info interface name as our stub key String key = getStubKey(getServiceName(), sn.getHostAndPort()); connectionLock.putIfAbsent(key, key); Object stub = null; synchronized (connectionLock.get(key)) { stub = stubs.get(key); if (stub == null) { BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout); stub = makeStub(channel); isMasterRunning(); stubs.put(key, stub); } } return stub; } finally { zkw.close(); } }
/**
 * Return the Exception thrown by the remote server wrapped in
 * ServiceException as cause. RemoteException are left untouched.
 * <p>
 * An interrupt-type cause is converted through {@code ExceptionUtil.asInterrupt}. A cause that
 * is not an IOException is wrapped in an HBaseIOException. When the ServiceException carries
 * no cause at all, the ServiceException itself is wrapped so that its message and stack trace
 * are preserved instead of producing an empty HBaseIOException.
 *
 * @param e ServiceException that wraps IO exception thrown by the server
 * @return Exception wrapped in ServiceException, or an HBaseIOException wrapping the
 *         ServiceException itself when it has no cause.
 */
public static IOException getServiceException(
    org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
  Throwable t = e.getCause();
  if (t == null) {
    // getCause() may legitimately be null; do not discard the only information we have.
    return new HBaseIOException(e);
  }
  if (ExceptionUtil.isInterrupt(t)) {
    return ExceptionUtil.asInterrupt(t);
  }
  return t instanceof IOException ? (IOException) t : new HBaseIOException(t);
}
/**
 * Converts an arbitrary exception into an IOException.
 * <p>
 * For either flavor of ServiceException the cause is used in place of the wrapper; if the
 * wrapper has no cause, the wrapper itself is kept so that its message and stack trace are not
 * lost. Interrupt-type throwables are converted through {@code ExceptionUtil.asInterrupt}, a
 * RemoteException is unwrapped, and anything that is still not an IOException is wrapped in an
 * HBaseIOException.
 *
 * @param e the exception to convert
 * @return the resulting IOException
 */
private static IOException makeIOExceptionOfException(Exception e) {
  Throwable t = e;
  if (e instanceof ServiceException
      || e instanceof org.apache.hbase.thirdparty.com.google.protobuf.ServiceException) {
    Throwable cause = e.getCause();
    if (cause != null) {
      // Only replace the wrapper when there is actually something inside it.
      t = cause;
    }
  }
  if (ExceptionUtil.isInterrupt(t)) {
    return ExceptionUtil.asInterrupt(t);
  }
  if (t instanceof RemoteException) {
    t = ((RemoteException) t).unwrapRemoteException();
  }
  return t instanceof IOException ? (IOException) t : new HBaseIOException(t);
}
/**
 * Converts an arbitrary exception into an IOException.
 * <p>
 * For a ServiceException the cause is used in place of the wrapper; if the wrapper has no
 * cause, the wrapper itself is kept so that its message and stack trace are not lost.
 * Interrupt-type throwables are converted through {@code ExceptionUtil.asInterrupt}, a
 * RemoteException is unwrapped, and anything that is still not an IOException is wrapped in an
 * HBaseIOException.
 *
 * @param e the exception to convert
 * @return the resulting IOException
 */
private static IOException makeIOExceptionOfException(Exception e) {
  Throwable t = e;
  if (e instanceof ServiceException) {
    Throwable cause = e.getCause();
    if (cause != null) {
      // Only replace the wrapper when there is actually something inside it.
      t = cause;
    }
  }
  if (ExceptionUtil.isInterrupt(t)) {
    return ExceptionUtil.asInterrupt(t);
  }
  if (t instanceof RemoteException) {
    t = ((RemoteException) t).unwrapRemoteException();
  }
  return t instanceof IOException ? (IOException) t : new HBaseIOException(t);
}
@Override protected boolean nextScanner(int nbRows, final boolean done) throws IOException { // Close the previous scanner if it's open if (this.callable != null) { this.callable.setClose(); // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas, // we do a callWithRetries this.caller.callWithoutRetries(callable, scannerTimeout); this.callable = null; } // Where to start the next scanner byte[] localStartKey; boolean locateTheClosestFrontRow = true; // if we're at start of table, close and return false to stop iterating if (this.currentRegion != null) { byte[] startKey = this.currentRegion.getStartKey(); if (startKey == null || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY) || checkScanStopRow(startKey) || done) { close(); if (LOG.isDebugEnabled()) { LOG.debug("Finished " + this.currentRegion); } return false; } localStartKey = startKey; if (LOG.isDebugEnabled()) { LOG.debug("Finished " + this.currentRegion); } } else { localStartKey = this.scan.getStartRow(); if (!Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY)) { locateTheClosestFrontRow = false; } } if (LOG.isDebugEnabled() && this.currentRegion != null) { // Only worth logging if NOT first region in scan. LOG.debug("Advancing internal scanner to startKey at '" + Bytes.toStringBinary(localStartKey) + "'"); } try { // In reversed scan, we want to locate the previous region through current // region's start key. In order to get that previous region, first we // create a closest row before the start key of current region, then // locate all the regions from the created closest row to start key of // current region, thus the last one of located regions should be the // previous region of current region. The related logic of locating // regions is implemented in ReversedScannerCallable byte[] locateStartRow = locateTheClosestFrontRow ? 
createClosestRowBefore(localStartKey) : null; callable = getScannerCallable(localStartKey, nbRows, locateStartRow); // Open a scanner on the region server starting at the // beginning of the region // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas, // we do a callWithRetries this.caller.callWithoutRetries(callable, scannerTimeout); this.currentRegion = callable.getHRegionInfo(); if (this.scanMetrics != null) { this.scanMetrics.countOfRegions.incrementAndGet(); } } catch (IOException e) { ExceptionUtil.rethrowIfInterrupt(e); close(); throw e; } return true; }
@Override protected boolean nextScanner(int nbRows, final boolean done) throws IOException { // Close the previous scanner if it's open if (this.callable != null) { this.callable.setClose(); this.caller.callWithRetries(callable); this.callable = null; } // Where to start the next scanner byte[] localStartKey; boolean locateTheClosestFrontRow = true; // if we're at start of table, close and return false to stop iterating if (this.currentRegion != null) { byte[] startKey = this.currentRegion.getStartKey(); if (startKey == null || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY) || checkScanStopRow(startKey) || done) { close(); if (LOG.isDebugEnabled()) { LOG.debug("Finished " + this.currentRegion); } return false; } localStartKey = startKey; if (LOG.isDebugEnabled()) { LOG.debug("Finished " + this.currentRegion); } } else { localStartKey = this.scan.getStartRow(); if (!Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY)) { locateTheClosestFrontRow = false; } } if (LOG.isDebugEnabled() && this.currentRegion != null) { // Only worth logging if NOT first region in scan. LOG.debug("Advancing internal scanner to startKey at '" + Bytes.toStringBinary(localStartKey) + "'"); } try { // In reversed scan, we want to locate the previous region through current // region's start key. In order to get that previous region, first we // create a closest row before the start key of current region, then // locate all the regions from the created closest row to start key of // current region, thus the last one of located regions should be the // previous region of current region. The related logic of locating // regions is implemented in ReversedScannerCallable byte[] locateStartRow = locateTheClosestFrontRow ? 
createClosestRowBefore(localStartKey) : null; callable = getScannerCallable(localStartKey, nbRows, locateStartRow); // Open a scanner on the region server starting at the // beginning of the region this.caller.callWithRetries(callable); this.currentRegion = callable.getHRegionInfo(); if (this.scanMetrics != null) { this.scanMetrics.countOfRegions.incrementAndGet(); } } catch (IOException e) { ExceptionUtil.rethrowIfInterrupt(e); close(); throw e; } return true; }
/**
 * Runs the callable, retrying when an invocation fails.
 * <p>
 * Each attempt is bracketed by {@code beforeCall()} and {@code afterCall()}. After a failed
 * attempt the failure is translated, reported to the callable and recorded; the loop then
 * gives up if the failure is an interrupt, if the retries are exhausted, or if sleeping before
 * the next attempt would exceed the call timeout. Otherwise it sleeps and tries again.
 * @param callable The {@link RetryingCallable} to run.
 * @param callTimeout Timeout for this call
 * @return an object of type T
 * @throws IOException if a remote or network exception occurs
 * @throws RuntimeException other unspecified error
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings
    (value = "SWL_SLEEP_WITH_LOCK_HELD", justification = "na")
public synchronized T callWithRetries(RetryingCallable<T> callable, int callTimeout)
    throws IOException, RuntimeException {
  this.callTimeout = callTimeout;
  List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
      new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
  this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis();

  for (int tries = 0;; tries++) {
    long expectedSleep = 0;
    try {
      beforeCall();
      // if called with false, check table status on ZK
      callable.prepare(tries != 0);
      return callable.call();
    } catch (Throwable raised) {
      if (LOG.isTraceEnabled()) {
        long elapsed = EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime;
        LOG.trace("Call exception, tries=" + tries + ", retries=" + retries + ", retryTime="
            + elapsed + "ms", raised);
      }

      // translateException throws exception when should not retry: i.e. when request is bad.
      Throwable translated = translateException(raised);
      callable.throwable(translated, retries != 1);
      RetriesExhaustedException.ThrowableWithExtraContext attemptFailure =
          new RetriesExhaustedException.ThrowableWithExtraContext(translated,
              EnvironmentEdgeManager.currentTimeMillis(), toString());
      exceptions.add(attemptFailure);
      ExceptionUtil.rethrowIfInterrupt(translated);
      if (tries >= retries - 1) {
        throw new RetriesExhaustedException(tries, exceptions);
      }

      // If the server is dead, we need to wait a little before retrying.
      // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time
      expectedSleep = callable.sleep(pause, tries + 1);

      // If, after the planned sleep, there won't be enough time left, we stop now.
      long duration = singleCallDuration(expectedSleep);
      if (duration > this.callTimeout) {
        String msg = "callTimeout=" + this.callTimeout + ", callDuration=" + duration
            + ": " + callable.getExceptionMessageAdditionalDetail();
        SocketTimeoutException timedOut = new SocketTimeoutException(msg);
        timedOut.initCause(translated);
        throw timedOut;
      }
    } finally {
      afterCall();
    }

    try {
      Thread.sleep(expectedSleep);
    } catch (InterruptedException e) {
      throw new InterruptedIOException("Interrupted after " + tries + " tries on " + retries);
    }
  }
}
/** * Performs a scan of META table. * @param connection connection we're using * @param startRow Where to start the scan. Pass null if want to begin scan * at first row. * @param stopRow Where to stop the scan. Pass null if want to scan all rows * from the start one * @param type scanned part of meta * @param maxRows maximum rows to return * @param visitor Visitor invoked against each row. * @throws IOException */ public static void scanMeta(Connection connection, @Nullable final byte[] startRow, @Nullable final byte[] stopRow, QueryType type, int maxRows, final Visitor visitor) throws IOException { int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE; Scan scan = getMetaScan(connection, rowUpperLimit); for (byte[] family : type.getFamilies()) { scan.addFamily(family); } if (startRow != null) scan.setStartRow(startRow); if (stopRow != null) scan.setStopRow(stopRow); if (LOG.isTraceEnabled()) { LOG.trace("Scanning META" + " starting at row=" + Bytes.toStringBinary(startRow) + " stopping at row=" + Bytes.toStringBinary(stopRow) + " for max=" + rowUpperLimit + " with caching=" + scan.getCaching()); } int currentRow = 0; try (Table metaTable = getMetaHTable(connection)) { try (ResultScanner scanner = metaTable.getScanner(scan)) { Result data; while ((data = scanner.next()) != null) { if (data.isEmpty()) continue; // Break if visit returns false. if (!visitor.visit(data)) break; if (++currentRow >= rowUpperLimit) break; } } } if (visitor != null && visitor instanceof Closeable) { try { ((Closeable) visitor).close(); } catch (Throwable t) { ExceptionUtil.rethrowIfInterrupt(t); LOG.debug("Got exception in closing the meta scanner visitor", t); } } }
@Override public void run() { try { LOG.info("SplitLogWorker " + this.serverName + " starting"); this.watcher.registerListener(this); boolean distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf); if (distributedLogReplay) { // initialize a new connection for splitlogworker configuration HConnectionManager.getConnection(conf); } // wait for master to create the splitLogZnode int res = -1; while (res == -1 && !exitWorker) { try { res = ZKUtil.checkExists(watcher, watcher.splitLogZNode); } catch (KeeperException e) { // ignore LOG.warn("Exception when checking for " + watcher.splitLogZNode + " ... retrying", e); } if (res == -1) { LOG.info(watcher.splitLogZNode + " znode does not exist, waiting for master to create"); Thread.sleep(1000); } } if (!exitWorker) { taskLoop(); } } catch (Throwable t) { if (ExceptionUtil.isInterrupt(t)) { LOG.info("SplitLogWorker interrupted. Exiting. " + (exitWorker ? "" : " (ERROR: exitWorker is not set, exiting anyway)")); } else { // only a logical error can cause here. Printing it out // to make debugging easier LOG.error("unexpected error ", t); } } finally { LOG.info("SplitLogWorker " + this.serverName + " exiting"); } }
@Override protected boolean nextScanner(int nbRows, final boolean done) throws IOException { // Close the previous scanner if it's open if (this.callable != null) { this.callable.setClose(); this.caller.callWithRetries(callable, scannerTimeout); this.callable = null; } // Where to start the next scanner byte[] localStartKey; boolean locateTheClosestFrontRow = true; // if we're at start of table, close and return false to stop iterating if (this.currentRegion != null) { byte[] startKey = this.currentRegion.getStartKey(); if (startKey == null || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY) || checkScanStopRow(startKey) || done) { close(); if (LOG.isDebugEnabled()) { LOG.debug("Finished " + this.currentRegion); } return false; } localStartKey = startKey; if (LOG.isDebugEnabled()) { LOG.debug("Finished " + this.currentRegion); } } else { localStartKey = this.scan.getStartRow(); if (!Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY)) { locateTheClosestFrontRow = false; } } if (LOG.isDebugEnabled() && this.currentRegion != null) { // Only worth logging if NOT first region in scan. LOG.debug("Advancing internal scanner to startKey at '" + Bytes.toStringBinary(localStartKey) + "'"); } try { // In reversed scan, we want to locate the previous region through current // region's start key. In order to get that previous region, first we // create a closest row before the start key of current region, then // locate all the regions from the created closest row to start key of // current region, thus the last one of located regions should be the // previous region of current region. The related logic of locating // regions is implemented in ReversedScannerCallable byte[] locateStartRow = locateTheClosestFrontRow ? 
createClosestRowBefore(localStartKey) : null; callable = getScannerCallable(localStartKey, nbRows, locateStartRow); // Open a scanner on the region server starting at the // beginning of the region this.caller.callWithRetries(callable, scannerTimeout); this.currentRegion = callable.getHRegionInfo(); if (this.scanMetrics != null) { this.scanMetrics.countOfRegions.incrementAndGet(); } } catch (IOException e) { ExceptionUtil.rethrowIfInterrupt(e); close(); throw e; } return true; }
/** * Retries if invocation fails. * @param callTimeout Timeout for this call * @param callable The {@link RetryingCallable} to run. * @return an object of type T * @throws IOException if a remote or network exception occurs * @throws RuntimeException other unspecified error */ public T callWithRetries(RetryingCallable<T> callable, int callTimeout) throws IOException, RuntimeException { List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions = new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>(); this.globalStartTime = EnvironmentEdgeManager.currentTimeMillis(); for (int tries = 0;; tries++) { long expectedSleep; try { //LOG.info("Shen Li: retrying number " + tries); callable.prepare(tries != 0); // if called with false, check table status on ZK return callable.call(getRemainingTime(callTimeout)); } catch (Throwable t) { //LOG.info("Shen Li: retry calling get Exception at tries = " + tries // + ", " + retries + ", retryTime = " // + (EnvironmentEdgeManager.currentTimeMillis() // - this.globalStartTime) // + ", callTimeout = " + callTimeout // + ", Message = " + t.getMessage()); //LOG.info("Shen Li:\n" + t.getStackTrace()); ExceptionUtil.rethrowIfInterrupt(t); if (LOG.isTraceEnabled()) { LOG.trace("Call exception, tries=" + tries + ", retries=" + retries + ", retryTime=" + (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime) + "ms", t); } // translateException throws exception when should not retry: i.e. when request is bad. 
t = translateException(t); callable.throwable(t, retries != 1); RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext(t, EnvironmentEdgeManager.currentTimeMillis(), toString()); exceptions.add(qt); if (tries >= retries - 1) { throw new RetriesExhaustedException(tries, exceptions); } // If the server is dead, we need to wait a little before retrying, to give // a chance to the regions to be // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time expectedSleep = callable.sleep(pause, tries + 1); // If, after the planned sleep, there won't be enough time left, we stop now. long duration = singleCallDuration(expectedSleep); if (duration > callTimeout) { String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration + ": " + callable.getExceptionMessageAdditionalDetail(); throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t)); } } try { Thread.sleep(expectedSleep); } catch (InterruptedException e) { throw new InterruptedIOException("Interrupted after " + tries + " tries on " + retries); } } }
@Override public void onFailure(IOException e) { try { if (LOG.isTraceEnabled()) { LOG.trace("Call exception, tries=" + tries + ", retries=" + retries + ", retryTime=" + (EnvironmentEdgeManager.currentTimeMillis() - globalStartTime) + "ms", e); } // translateException throws exception when should not retry: i.e. when request is bad. Throwable t = translateException(e); callable.throwable(t, retries != 1); RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext(t, EnvironmentEdgeManager.currentTimeMillis(), toString()); exceptions.add(qt); ExceptionUtil.rethrowIfInterrupt(t); if (tries >= retries - 1) { throw new RetriesExhaustedException(tries, exceptions); } // If the server is dead, we need to wait a little before retrying, to give // a chance to the regions to be // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time expectedSleep = callable.sleep(pause, tries + 1); // If, after the planned sleep, there won't be enough time left, we stop now. long duration = singleCallDuration(expectedSleep); if (duration > callTimeout) { String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration + ": " + callable.getExceptionMessageAdditionalDetail(); throw (SocketTimeoutException) (new SocketTimeoutException(msg).initCause(t)); } } catch (IOException io) { handler.onFailure(io); return; } // Wait till next retry AsyncRpcClient.WHEEL_TIMER.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { try { callable.prepare(false); // if called with false, check table status on ZK } catch (IOException e2) { handler.onFailure(e2); return; } callable.call(CallResponseHandler.this); } }, expectedSleep, TimeUnit.MILLISECONDS); }
@Override protected void nextScanner(final int nbRows, final boolean done, final ResponseHandler<Boolean> handler) { if (checkToCloseScanner(nbRows, done, handler)) { return; } // Where to start the next scanner byte[] localStartKey; boolean locateTheClosestFrontRow = true; // if we're at start of table, close and return false to stop iterating if (this.currentRegion != null) { byte[] startKey = this.currentRegion.getStartKey(); if (startKey == null || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY) || checkScanStopRow(startKey) || done) { close(); if (LOG.isDebugEnabled()) { LOG.debug("Finished " + this.currentRegion); } setScanDone(); handler.onSuccess(false); return; } localStartKey = startKey; if (LOG.isDebugEnabled()) { LOG.debug("Finished " + this.currentRegion); } } else { localStartKey = this.scan.getStartRow(); if (!Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY)) { locateTheClosestFrontRow = false; } } if (LOG.isDebugEnabled() && this.currentRegion != null) { // Only worth logging if NOT first region in scan. LOG.debug("Advancing internal scanner to startKey at '" + Bytes.toStringBinary(localStartKey) + "'"); } // In reversed scan, we want to locate the previous region through current // region's start key. In order to get that previous region, first we // create a closest row before the start key of current region, then // locate all the regions from the created closest row to start key of // current region, thus the last one of located regions should be the // previous region of current region. The related logic of locating // regions is implemented in ReversedScannerCallable byte[] locateStartRow = locateTheClosestFrontRow ? 
createClosestRowBefore(localStartKey) : null; callable = getScannerCallable(localStartKey, nbRows, locateStartRow); // Open a scanner on the region server starting at the // beginning of the region this.caller.callWithRetries(callable, new ResponseHandler<Result[]>() { @Override public void onSuccess(Result[] response) { currentRegion = callable.getHRegionInfo(); if (scanMetrics != null) { scanMetrics.countOfRegions.incrementAndGet(); } handler.onSuccess(true); } @Override public void onFailure(IOException e) { try { ExceptionUtil.rethrowIfInterrupt(e); } catch (InterruptedIOException e1) { handler.onFailure(e1); return; } close(); handler.onFailure(e); } }); }