/**
 * Renders the failed entries of a batch as a single comma-separated string.
 * <p>
 * Each failure is written as {@code [<error>,{messageBody:"<body>"}]}, where {@code <error>} is the
 * error entry's {@code toString()} and {@code <body>} is the body of the request entry returned by
 * {@code findRequestEventEntryById} for the error's id. When that lookup returns {@code null} the
 * text {@code null} is written in place of the body.
 *
 * @param batchRequestEntries the entries that were submitted in the batch
 * @param errors              the error entries reported for the batch
 * @return the concatenated description of all failures; empty when {@code errors} is empty
 */
private String buildErrorMessage(List<SendMessageBatchRequestEntry> batchRequestEntries,
        List<BatchResultErrorEntry> errors) {
    StringBuilder report = new StringBuilder();
    boolean first = true;
    for (BatchResultErrorEntry failure : errors) {
        // Separator goes in front of every element except the first one.
        if (!first) {
            report.append(",");
        }
        first = false;

        SendMessageBatchRequestEntry source = findRequestEventEntryById(batchRequestEntries, failure.getId());
        String body = null;
        if (source != null) {
            body = source.getMessageBody();
        }

        report.append("[")
                .append(failure.toString())
                .append(",{messageBody:")
                .append("\"")
                .append(body)
                .append("\"}]");
    }
    return report.toString();
}
/**
 * Creates a Mockito mock of {@code SendMessageBatchResult} with canned successful and failed entries.
 * <p>
 * Entry ids are the 1-based position rendered as a string. Positions {@code 1..expectedSuccessCount}
 * are returned from {@code getSuccessful()}; positions {@code expectedSuccessCount+1..batchSize} are
 * returned from {@code getFailed()} with code {@code "401"}, sender fault {@code true} and the
 * message {@code "Invalid binary character"}.
 *
 * @param batchSize            total number of entries in the simulated batch
 * @param expectedSuccessCount number of entries reported as successful
 * @return the stubbed mock result
 */
private SendMessageBatchResult mockBatchResult(int batchSize, int expectedSuccessCount) {
    List<SendMessageBatchResultEntry> delivered = new ArrayList<SendMessageBatchResultEntry>();
    for (int index = 0; index < expectedSuccessCount; index++) {
        SendMessageBatchResultEntry ok = new SendMessageBatchResultEntry();
        delivered.add(ok.withId(String.valueOf(index + 1)));
    }

    List<BatchResultErrorEntry> rejected = new ArrayList<BatchResultErrorEntry>();
    for (int index = expectedSuccessCount; index < batchSize; index++) {
        BatchResultErrorEntry error = new BatchResultErrorEntry()
                .withId(String.valueOf(index + 1))
                .withCode("401")
                .withSenderFault(true)
                .withMessage("Invalid binary character");
        rejected.add(error);
    }

    SendMessageBatchResult result = Mockito.mock(SendMessageBatchResult.class);
    when(result.getSuccessful()).thenReturn(delivered);
    when(result.getFailed()).thenReturn(rejected);
    return result;
}
@Override public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws AmazonClientException { DirectorySQSQueue queue = getQueueFromUrl(deleteMessageBatchRequest.getQueueUrl(), false); //lists for reporting List<BatchResultErrorEntry> batchResultErrorEntries = new ArrayList<>(); List<DeleteMessageBatchResultEntry> batchResultEntries = new ArrayList<>(); //attempt delete on each for (DeleteMessageBatchRequestEntry batchRequestEntry : deleteMessageBatchRequest.getEntries()) { try { queue.delete(batchRequestEntry.getReceiptHandle()); batchResultEntries.add(new DeleteMessageBatchResultEntry().withId(batchRequestEntry.getId())); } catch (IOException e) { BatchResultErrorEntry batchResultErrorEntry = new BatchResultErrorEntry(). withSenderFault(true). withId(batchRequestEntry.getId()). withMessage(e.getMessage()); batchResultErrorEntries.add(batchResultErrorEntry); } } return new DeleteMessageBatchResult().withFailed(batchResultErrorEntries).withSuccessful(batchResultEntries); }
@Override public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws AmazonClientException { DirectorySQSQueue queue = getQueueFromUrl(changeMessageVisibilityBatchRequest.getQueueUrl(), false); //lists for reporting List<BatchResultErrorEntry> batchResultErrorEntries = new ArrayList<>(); List<ChangeMessageVisibilityBatchResultEntry> batchResultEntries = new ArrayList<>(); //attempt to change the visibility on each for (ChangeMessageVisibilityBatchRequestEntry batchRequestEntry : changeMessageVisibilityBatchRequest.getEntries()) { try { queue.changeVisibility(batchRequestEntry.getReceiptHandle(), batchRequestEntry.getVisibilityTimeout()); batchResultEntries.add(new ChangeMessageVisibilityBatchResultEntry().withId(batchRequestEntry.getId())); } catch (Exception e) { BatchResultErrorEntry batchResultErrorEntry = new BatchResultErrorEntry(). withSenderFault(true). withId(batchRequestEntry.getId()). withMessage(e.getMessage()); batchResultErrorEntries.add(batchResultErrorEntry); } } return new ChangeMessageVisibilityBatchResult().withFailed(batchResultErrorEntries).withSuccessful(batchResultEntries); }
/** * Handles SQS send message batch result and throws EventDeliveryException to cause the flume transaction to fail * and let flume retry the whole batch in case all the messages in the batch failed to be delivered to SQS. * Currently, this method does just logs errors and skips the messages in case some messages from the batched failed * to be delivered but some succeeded (i.e., partial batch failure). * <p> * TODO: Add retry logic instead letting flume drop the failed messages in case of partial batch failure * * @param batchRequest The SQS SendMessageBatchRequest * @param batchResult The SQS SendMessageBatchResult * * @throws EventDeliveryException In case all the messages in the batch failed to be delivered to SQS */ protected void handleResult(SendMessageBatchRequest batchRequest, SendMessageBatchResult batchResult) throws EventDeliveryException { List<SendMessageBatchRequestEntry> batchRequestEntries = batchRequest.getEntries(); List<BatchResultErrorEntry> errors = batchResult.getFailed(); int attemptedCount = batchRequestEntries == null ? 0 : batchRequestEntries.size(); int errorCount = errors == null ? 0 : errors.size(); if (errorCount > 0) { String errorMessage = buildErrorMessage(batchRequestEntries, errors); if (attemptedCount == errorCount) { // if it was a non-empty batch and if all the messages in the batch have errors then fail the whole // batch and let flume rollback the transaction and retry it // Just throw the EventDeliveryException. This will eventually cause the channel's transaction to // rollback. throw new EventDeliveryException(errorMessage); } else { // TODO: Add retry logic instead letting flume drop the failed messages in case of partial batch failure // Just log the error message and let flume drop failed messages in case of partial batch failures LOG.error(errorMessage); } } }
/**
 * Logs each failed batch entry at error level, including its id, error code, message and
 * sender-fault flag. A {@code null} list is ignored.
 *
 * @param failed the error entries reported for a batch; may be {@code null}
 */
private void logFailures(final List<BatchResultErrorEntry> failed) {
    if (failed == null) {
        return;
    }
    for (final BatchResultErrorEntry entry : failed) {
        LOGGER.error(
                "Failed to submit sqs batch message entry - Id: {} - Code: {} - Message: {}, isSenders fault: {}",
                entry.getId(),
                entry.getCode(),
                entry.getMessage(),
                entry.getSenderFault());
    }
}
/**
 * Sends the given entries to the queue at {@code queueUrl}, one {@code sendMessageBatch} call per
 * group produced by {@code partition(messages, MAX_BATCH_SIZE)}, and logs every entry that SQS
 * reports as failed together with its message body.
 * <p>
 * Failures are only logged; they are not retried and no exception is raised for them. Any exception
 * thrown while preparing the failure log is itself caught and logged so that it cannot interrupt
 * the sending of the remaining batches.
 *
 * @param messages the entries to send
 */
private void sendMessageBatch(Collection<SendMessageBatchRequestEntry> messages) {
    for (Collection<SendMessageBatchRequestEntry> batch : partition(messages, MAX_BATCH_SIZE)) {
        final SendMessageBatchResult sendMessageBatchResult =
                amazonSQS.sendMessageBatch(new SendMessageBatchRequest(queueUrl, new ArrayList<>(batch)));
        final List<BatchResultErrorEntry> failed = sendMessageBatchResult.getFailed();
        if (failed != null && !failed.isEmpty()) {
            try {
                final Set<String> failedMessageIds = failed.stream()
                        .map(BatchResultErrorEntry::getId)
                        .collect(Collectors.toSet());
                // Match on the entry's id: the set holds id strings, so testing the entry object
                // itself against it would never match and nothing would be logged.
                final Map<String, SendMessageBatchRequestEntry> failedMessageIdToMessage = batch.stream()
                        .filter(entry -> failedMessageIds.contains(entry.getId()))
                        .collect(Collectors.toMap(SendMessageBatchRequestEntry::getId, Function.identity()));
                failed.forEach(failMessage -> {
                    final SendMessageBatchRequestEntry failedEntry =
                            failedMessageIdToMessage.get(failMessage.getId());
                    if (failedEntry != null) {
                        final String messageBody = failedEntry.getMessageBody();
                        LOG.error(
                                "Failed to send message, due to {}, message content : {} ",
                                failMessage,
                                messageBody
                        );
                    }
                });
            } catch (Exception e) {
                // Best effort: a problem while reporting must not abort the remaining batches.
                LOG.error("Failed to log failed to send messages", e);
            }
        }
    }
}
@Override public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest) throws AmazonClientException { DirectorySQSQueue queue = getQueueFromUrl(sendMessageBatchRequest.getQueueUrl(), false); //lists for reporting List<BatchResultErrorEntry> batchResultErrorEntries = new ArrayList<>(); List<SendMessageBatchResultEntry> batchResultEntries = new ArrayList<>(); //attempt to change the visibility on each for (SendMessageBatchRequestEntry batchRequestEntry : sendMessageBatchRequest.getEntries()) { try { final int invisibilityDelay = Objects.firstNonNull(batchRequestEntry.getDelaySeconds(), 0);//0 is amazon spec default Message sentMessage = queue.send(batchRequestEntry.getMessageBody(), invisibilityDelay); batchResultEntries.add(new SendMessageBatchResultEntry(). withId(batchRequestEntry.getId()). withMessageId(sentMessage.getMessageId()). withMD5OfMessageBody(sentMessage.getMD5OfBody())); } catch (IOException e) { BatchResultErrorEntry batchResultErrorEntry = new BatchResultErrorEntry(). withSenderFault(false). withId(batchRequestEntry.getId()). withMessage(e.getMessage()); batchResultErrorEntries.add(batchResultErrorEntry); } } return new SendMessageBatchResult(). withFailed(batchResultErrorEntries). withSuccessful(batchResultEntries); }