/**
 * Runs a DynamoDB query and follows its pagination until no further page is
 * reported, collecting the items of every page into one list.
 *
 * <p>Each page is attempted up to 10 times when the call fails with
 * {@link ProvisionedThroughputExceededException}; between attempts the thread
 * sleeps for {@code backoffMillis * 2^attempt} milliseconds. Any other
 * exception thrown by the client propagates immediately.
 *
 * <p>The request's exclusive start key is advanced while paging and is reset
 * to the value the caller supplied before this method returns or throws.
 *
 * @param dynamoClient  client used to execute the query
 * @param qr            the query to run; its exclusive start key, if any, is
 *                      used for the first page
 * @param backoffMillis base delay in milliseconds for the retry backoff
 * @return the items of all pages, in the order they were returned
 * @throws Exception if a page could not be read after 10 attempts (the last
 *                   throttling exception is attached as the cause), or if the
 *                   thread is interrupted while backing off
 */
public static List<Map<String, AttributeValue>> queryUntilDone(
        AmazonDynamoDB dynamoClient, QueryRequest qr, int backoffMillis)
        throws Exception {
    final int maxAttempts = 10;
    List<Map<String, AttributeValue>> output = new ArrayList<>();

    // Remember the caller's start key so the request is handed back unchanged.
    final Map<String, AttributeValue> callerStartKey = qr.getExclusiveStartKey();
    Map<String, AttributeValue> lastKeyEvaluated = null;

    try {
        do {
            // The continuation key belongs on the REQUEST (ExclusiveStartKey).
            // The first page keeps whatever start key the caller configured.
            if (lastKeyEvaluated != null) {
                qr.setExclusiveStartKey(lastKeyEvaluated);
            }

            int queryAttempts = 0;
            QueryResult result = null;
            ProvisionedThroughputExceededException lastFailure = null;

            do {
                try {
                    result = dynamoClient.query(qr);
                    List<Map<String, AttributeValue>> items = result.getItems();
                    if (items != null) {
                        output.addAll(items);
                    }
                } catch (ProvisionedThroughputExceededException e) {
                    lastFailure = e;
                    LOG.warn(String
                            .format("Provisioned Throughput Exceeded - Retry Attempt %s",
                                    queryAttempts));
                    // Exponential backoff: 1x, 2x, 4x, ... the base delay.
                    // No point sleeping when there is no attempt left.
                    if (queryAttempts + 1 < maxAttempts) {
                        Thread.sleep((1L << queryAttempts) * backoffMillis);
                    }
                    queryAttempts++;
                }
            } while (queryAttempts < maxAttempts && result == null);

            if (result == null) {
                throw new Exception(String.format(
                        "Unable to execute Query after %s attempts",
                        queryAttempts), lastFailure);
            }

            lastKeyEvaluated = result.getLastEvaluatedKey();
        } while (lastKeyEvaluated != null && !lastKeyEvaluated.isEmpty());
    } finally {
        qr.setExclusiveStartKey(callerStartKey);
    }

    return output;
}
void deleteRow(String storeName, Map<String, AttributeValue> key) { String tableName = storeToTableName(storeName); m_logger.debug("Deleting row from table {}, key={}", tableName, DynamoDBService.getDDBKey(key)); Timer timer = new Timer(); boolean bSuccess = false; for (int attempts = 1; !bSuccess; attempts++) { try { m_ddbClient.deleteItem(tableName, key); if (attempts > 1) { m_logger.info("deleteRow() succeeded on attempt #{}", attempts); } bSuccess = true; m_logger.debug("Time to delete table {}, key={}: {}", new Object[]{tableName, DynamoDBService.getDDBKey(key), timer.toString()}); } catch (ProvisionedThroughputExceededException e) { if (attempts >= m_max_commit_attempts) { String errMsg = "All retries exceeded; abandoning deleteRow() for table: " + tableName; m_logger.error(errMsg, e); throw new RuntimeException(errMsg, e); } m_logger.warn("deleteRow() attempt #{} failed: {}", attempts, e); try { Thread.sleep(attempts * m_retry_wait_millis); } catch (InterruptedException ex2) { // ignore } } } }
void updateRow(String storeName, Map<String, AttributeValue> key, Map<String, AttributeValueUpdate> attributeUpdates) { String tableName = storeToTableName(storeName); m_logger.debug("Updating row in table {}, key={}", tableName, DynamoDBService.getDDBKey(key)); Timer timer = new Timer(); boolean bSuccess = false; for (int attempts = 1; !bSuccess; attempts++) { try { m_ddbClient.updateItem(tableName, key, attributeUpdates); if (attempts > 1) { m_logger.info("updateRow() succeeded on attempt #{}", attempts); } bSuccess = true; m_logger.debug("Time to update table {}, key={}: {}", new Object[]{tableName, DynamoDBService.getDDBKey(key), timer.toString()}); } catch (ProvisionedThroughputExceededException e) { if (attempts >= m_max_commit_attempts) { String errMsg = "All retries exceeded; abandoning updateRow() for table: " + tableName; m_logger.error(errMsg, e); throw new RuntimeException(errMsg, e); } m_logger.warn("updateRow() attempt #{} failed: {}", attempts, e); try { Thread.sleep(attempts * m_retry_wait_millis); } catch (InterruptedException ex2) { // ignore } } } }
private ScanResult scan(ScanRequest scanRequest) { m_logger.debug("Performing scan() request on table {}", scanRequest.getTableName()); Timer timer = new Timer(); boolean bSuccess = false; ScanResult scanResult = null; for (int attempts = 1; !bSuccess; attempts++) { try { scanResult = m_ddbClient.scan(scanRequest); if (attempts > 1) { m_logger.info("scan() succeeded on attempt #{}", attempts); } bSuccess = true; m_logger.debug("Time to scan table {}: {}", scanRequest.getTableName(), timer.toString()); } catch (ProvisionedThroughputExceededException e) { if (attempts >= m_max_read_attempts) { String errMsg = "All retries exceeded; abandoning scan() for table: " + scanRequest.getTableName(); m_logger.error(errMsg, e); throw new RuntimeException(errMsg, e); } m_logger.warn("scan() attempt #{} failed: {}", attempts, e); try { Thread.sleep(attempts * m_retry_wait_millis); } catch (InterruptedException ex2) { // ignore } } } return scanResult; }
/**
 * Fault-injection hook run just before a request is executed.
 *
 * <ul>
 *   <li>For a {@code PutItemRequest}, throws an injected
 *       {@link ProvisionedThroughputExceededException} when
 *       {@code rnd.nextInt(2)} yields 0.</li>
 *   <li>For a {@code GetItemRequest}, sleeps 500 ms when
 *       {@code rnd.nextInt(2)} yields 0.</li>
 * </ul>
 *
 * @param request the request about to be executed
 * @throws ProvisionedThroughputExceededException when a failure is injected
 *         into a put request
 * @throws RuntimeException if the thread is interrupted during the injected
 *         delay; the thread's interrupt status is restored first
 */
@Override
public void beforeRequest(Request<?> request) {
    /* Things to do just before a request is executed */
    if (request.getOriginalRequest() instanceof PutItemRequest) {
        /* Throw throughput exceeded exception for 50% of put requests */
        if (rnd.nextInt(2) == 0) {
            logger.info("Injecting ProvisionedThroughputExceededException");
            throw new ProvisionedThroughputExceededException("Injected Error");
        }
    }

    /* Add latency to some Get requests */
    if (request.getOriginalRequest() instanceof GetItemRequest) {
        /* Delay 50% of GetItem requests by 500 ms */
        if (rnd.nextInt(2) == 0) {
            /* Delay on average 50% of the requests from client perspective */
            try {
                logger.info("Injecting 500 ms delay");
                Thread.sleep(500);
            } catch (InterruptedException ie) {
                logger.info(ie);
                /* Restore the interrupt status so callers up the stack can still observe it */
                Thread.currentThread().interrupt();
                throw new RuntimeException(ie);
            }
        }
    }
}
@Override protected R getFallback() { failMeter.mark(); final List<HystrixEventType> events = getExecutionEvents(); if (isFailure(events)) { final Throwable throwable = getFailedExecutionException(); if (throwable instanceof ProvisionedThroughputExceededException) { // todo: figure out how to handle this via fallbackAction and retries throughputFailMeter.mark(); logger.error("{}", kvp("command_name", commandName, "fallback_event", "ProvisionedThroughputExceededException", "time_millis", this.getExecutionTimeInMilliseconds(), "err", "[" + throwable.getMessage() + "]" ), throwable); } else { logger.warn("{}", kvp("command_name", commandName, "fallback_event", "error", "time_millis", this.getExecutionTimeInMilliseconds(), "err", "[" + throwable.getMessage() + "]" ), throwable); } } if (events.contains(HystrixEventType.TIMEOUT)) { warn(commandName, HystrixEventType.TIMEOUT.name()); } if (events.contains(HystrixEventType.SHORT_CIRCUITED)) { warn(commandName, HystrixEventType.SHORT_CIRCUITED.name()); } if (events.contains(HystrixEventType.BAD_REQUEST)) { warn(commandName, HystrixEventType.BAD_REQUEST.name()); } return fallbackAction.get(); }