/**
 * Builds an SQS batch send request for the given queue from a map of entries.
 * <p>
 * Each map key becomes the batch entry id and the mapped value's body becomes the message body. A delay is
 * set on an entry only when the mapped value carries one; the delay's whole seconds are narrowed to
 * {@code int}.
 *
 * @param queueUrl URL of the destination queue
 * @param entries  messages to send, keyed by batch entry id
 * @return a request holding one batch entry per map entry
 */
@VisibleForTesting
static SendMessageBatchRequest createRequest(String queueUrl, Map<String, SendMessageEntry> entries) {
    SendMessageBatchRequest request = new SendMessageBatchRequest().withQueueUrl(queueUrl);
    return request.withEntries(
        entries.entrySet().stream()
            .map(idAndMessage -> {
                SendMessageEntry message = idAndMessage.getValue();
                SendMessageBatchRequestEntry batchEntry = new SendMessageBatchRequestEntry()
                    .withId(idAndMessage.getKey())
                    .withMessageBody(message.getBody());
                message.getDelay()
                    .ifPresent(delay -> batchEntry.setDelaySeconds((int) delay.getSeconds()));
                return batchEntry;
            })
            .collect(Collectors.toList()));
}
/**
 * Verifies {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} on the happy path.
 * <p>
 * <pre>
 * Given:
 *   channel        = always returns an event
 *   batchSize      = 5
 *   maxMessageSize = 10 bytes
 *   event body     = 2 bytes
 *
 * Then:
 *   exactly 1 batch is created
 *   that batch holds 5 messages
 * </pre>
 */
@Test
public void testCreateBatches() throws Exception {
    byte[] payload = {'A', 'b'};

    Event event = Mockito.mock(Event.class);
    when(event.getBody()).thenReturn(payload);

    // The channel never runs dry: every take() yields the same event.
    Channel channel = Mockito.mock(Channel.class);
    when(channel.take()).thenReturn(event);

    BatchSQSMsgSender sender = new BatchSQSMsgSender("https://some-fake/url", "us-east-1",
        "someAwsAccessKey", "someAwsSecretKey", 5, 10);

    List<SendMessageBatchRequest> batches = sender.createBatches(channel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(1, batches.size());

    List<SendMessageBatchRequestEntry> entries = batches.get(0).getEntries();
    Assert.assertNotNull(entries);
    Assert.assertEquals(5, entries.size());
    assertCorrectPayloadInEntries(payload, entries);
}
/**
 * Verifies {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} when the channel hands back an
 * event whose body is empty.
 * <p>
 * <pre>
 * Given:
 *   channel        = events with an empty body
 *   batchSize      = 5
 *   maxMessageSize = 10 bytes
 *
 * Then:
 *   no batch is created
 * </pre>
 */
@Test
public void testCreateBatchesEventWithEmptyBody() throws Exception {
    Event emptyEvent = Mockito.mock(Event.class);
    when(emptyEvent.getBody()).thenReturn("".getBytes());

    Channel channel = Mockito.mock(Channel.class);
    when(channel.take()).thenReturn(emptyEvent);

    BatchSQSMsgSender sender = new BatchSQSMsgSender("https://some-fake/url", "us-east-1",
        "someAwsAccessKey", "someAwsSecretKey", 5, 10);

    List<SendMessageBatchRequest> batches = sender.createBatches(channel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(0, batches.size());
}
/**
 * Verifies {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} when the channel runs dry
 * right after the first event.
 * <p>
 * <pre>
 * Given:
 *   channel        = 1 event, then null
 *   batchSize      = 5
 *   maxMessageSize = 10 bytes
 *
 * Then:
 *   exactly 1 batch is created
 *   that batch holds 1 message
 * </pre>
 */
@Test
public void testCreateBatchesEmptyChannelAfterFirstTake() throws Exception {
    byte[] payload = {'A', 'b'};

    Event event = Mockito.mock(Event.class);
    when(event.getBody()).thenReturn(payload);

    // First take() yields the event, every later take() yields null.
    Channel channel = Mockito.mock(Channel.class);
    when(channel.take()).thenReturn(event).thenReturn(null);

    BatchSQSMsgSender sender = new BatchSQSMsgSender("https://some-fake/url", "us-east-1",
        "someAwsAccessKey", "someAwsSecretKey", 5, 10);

    List<SendMessageBatchRequest> batches = sender.createBatches(channel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(1, batches.size());

    List<SendMessageBatchRequestEntry> entries = batches.get(0).getEntries();
    Assert.assertNotNull(entries);
    Assert.assertEquals(1, entries.size());
    assertCorrectPayloadInEntries(payload, entries);
}
/**
 * Verifies {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} when the channel runs dry
 * exactly after the batch has been filled.
 * <p>
 * <pre>
 * Given:
 *   channel        = 5 events, then null
 *   batchSize      = 5
 *   maxMessageSize = 10 bytes
 *
 * Then:
 *   exactly 1 batch is created
 *   that batch holds 5 messages
 * </pre>
 */
@Test
public void testCreateBatchesEmptyChannelAfterLastTake() throws Exception {
    byte[] payload = {'A', 'b'};

    Event event = Mockito.mock(Event.class);
    when(event.getBody()).thenReturn(payload);

    // Five successful takes followed by an empty channel.
    Channel channel = Mockito.mock(Channel.class);
    when(channel.take()).thenReturn(event, event, event, event, event, null);

    BatchSQSMsgSender sender = new BatchSQSMsgSender("https://some-fake/url", "us-east-1",
        "someAwsAccessKey", "someAwsSecretKey", 5, 10);

    List<SendMessageBatchRequest> batches = sender.createBatches(channel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(1, batches.size());

    List<SendMessageBatchRequestEntry> entries = batches.get(0).getEntries();
    Assert.assertNotNull(entries);
    Assert.assertEquals(5, entries.size());
    assertCorrectPayloadInEntries(payload, entries);
}
/**
 * Verifies {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} when the channel runs dry
 * before the batch has been filled.
 * <p>
 * <pre>
 * Given:
 *   channel        = 3 events, then null
 *   batchSize      = 5
 *   maxMessageSize = 10 bytes
 *
 * Then:
 *   exactly 1 batch is created
 *   that batch holds 3 messages
 * </pre>
 */
@Test
public void testCreateBatchesEmptyChannelInTheMiddle() throws Exception {
    byte[] payload = {'A', 'b'};

    Event event = Mockito.mock(Event.class);
    when(event.getBody()).thenReturn(payload);

    // Three successful takes followed by an empty channel.
    Channel channel = Mockito.mock(Channel.class);
    when(channel.take()).thenReturn(event, event, event, null);

    BatchSQSMsgSender sender = new BatchSQSMsgSender("https://some-fake/url", "us-east-1",
        "someAwsAccessKey", "someAwsSecretKey", 5, 10);

    List<SendMessageBatchRequest> batches = sender.createBatches(channel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(1, batches.size());

    List<SendMessageBatchRequestEntry> entries = batches.get(0).getEntries();
    Assert.assertNotNull(entries);
    Assert.assertEquals(3, entries.size());
    assertCorrectPayloadInEntries(payload, entries);
}
/**
 * Verifies {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} when the channel still returns
 * events, but an event with an empty body shows up before the batch has been filled.
 * <p>
 * <pre>
 * Given:
 *   channel        = 3 events with a body, then events with an empty body
 *   batchSize      = 5
 *   maxMessageSize = 10 bytes
 *
 * Then:
 *   exactly 1 batch is created
 *   that batch holds 3 messages
 * </pre>
 */
@Test
public void testCreateBatchesEmptyEventInTheMiddle() throws Exception {
    byte[] payload = {'A', 'b'};
    byte[] emptyPayload = {};

    Event filledEvent = Mockito.mock(Event.class);
    when(filledEvent.getBody()).thenReturn(payload);
    Event emptyEvent = Mockito.mock(Event.class);
    when(emptyEvent.getBody()).thenReturn(emptyPayload);

    Channel channel = Mockito.mock(Channel.class);
    when(channel.take()).thenReturn(filledEvent, filledEvent, filledEvent, emptyEvent);

    BatchSQSMsgSender sender = new BatchSQSMsgSender("https://some-fake/url", "us-east-1",
        "someAwsAccessKey", "someAwsSecretKey", 5, 10);

    List<SendMessageBatchRequest> batches = sender.createBatches(channel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(1, batches.size());

    List<SendMessageBatchRequestEntry> entries = batches.get(0).getEntries();
    Assert.assertNotNull(entries);
    Assert.assertEquals(3, entries.size());
    assertCorrectPayloadInEntries(payload, entries);
}
/**
 * Tests the {@link BatchSQSMsgSender#send(org.apache.flume.Channel)} method. Tests the happy path scenario.
 * <p>
 * The channel never runs dry and each 2-byte event fits the configured limits (batchSize = 5,
 * maxMessageSize = 10 bytes), so a single batch is expected to be handed to the SQS client.
 *
 * @throws Exception if the sender fails unexpectedly
 */
@Test
public void testSend() throws Exception {
    BatchSQSMsgSender sqsMsgSender =
        new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5,
            10);

    AmazonSQS mockSqs = Mockito.mock(AmazonSQS.class);
    when(mockSqs.sendMessageBatch(any(SendMessageBatchRequest.class))).thenReturn(new SendMessageBatchResult());
    sqsMsgSender.setAmazonSQS(mockSqs);

    byte[] mockMsgPayload = {'A', 'b'};
    Event mockEvent = Mockito.mock(Event.class);
    when(mockEvent.getBody()).thenReturn(mockMsgPayload);

    Channel mockChannel = Mockito.mock(Channel.class);
    when(mockChannel.take()).thenReturn(mockEvent);

    sqsMsgSender.send(mockChannel);

    // Without this check the test would pass even if send() never talked to SQS at all.
    Mockito.verify(mockSqs).sendMessageBatch(any(SendMessageBatchRequest.class));
}
/**
 * Verifies that {@link BatchSQSMsgSender#send(org.apache.flume.Channel)} surfaces an
 * {@code AmazonServiceException} raised by the SQS client as an {@link EventDeliveryException}.
 *
 * @throws Exception the expected {@link EventDeliveryException}
 */
@Test(expected = EventDeliveryException.class)
public void testSendFailureAmazonServiceException() throws Exception {
    byte[] payload = {'A', 'b'};

    Event event = Mockito.mock(Event.class);
    when(event.getBody()).thenReturn(payload);
    Channel channel = Mockito.mock(Channel.class);
    when(channel.take()).thenReturn(event);

    // SQS client that rejects every batch request.
    AmazonSQS failingSqs = Mockito.mock(AmazonSQS.class);
    when(failingSqs.sendMessageBatch(any(SendMessageBatchRequest.class)))
        .thenThrow(AmazonServiceException.class);

    BatchSQSMsgSender sender = new BatchSQSMsgSender("https://some-fake/url", "us-east-1",
        "someAwsAccessKey", "someAwsSecretKey", 5, 10);
    sender.setAmazonSQS(failingSqs);

    sender.send(channel);
}
/**
 * Verifies that {@link BatchSQSMsgSender#send(org.apache.flume.Channel)} surfaces an
 * {@code AmazonClientException} raised by the SQS client as an {@link EventDeliveryException}.
 *
 * @throws Exception the expected {@link EventDeliveryException}
 */
@Test(expected = EventDeliveryException.class)
public void testSendFailureAmazonClientException() throws Exception {
    byte[] payload = {'A', 'b'};

    Event event = Mockito.mock(Event.class);
    when(event.getBody()).thenReturn(payload);
    Channel channel = Mockito.mock(Channel.class);
    when(channel.take()).thenReturn(event);

    // SQS client that fails on every batch request.
    AmazonSQS failingSqs = Mockito.mock(AmazonSQS.class);
    when(failingSqs.sendMessageBatch(any(SendMessageBatchRequest.class)))
        .thenThrow(AmazonClientException.class);

    BatchSQSMsgSender sender = new BatchSQSMsgSender("https://some-fake/url", "us-east-1",
        "someAwsAccessKey", "someAwsSecretKey", 5, 10);
    sender.setAmazonSQS(failingSqs);

    sender.send(channel);
}
/**
 * Verifies that {@code postBatch} hands messages to SQS in requests of at most ten entries.
 * <p>
 * Expected cumulative {@code sendMessageBatch} call counts after each post: 10 messages -> 1 call,
 * 20 more -> 3 calls, 11 more -> 5 calls (last request holds 1 entry), 9 more -> 6 calls (last request
 * holds 9 entries).
 */
@Test
public void shouldSendBatchesInSizeOfTen() throws Exception {
    when(mockAmazonSQS.sendMessageBatch(any(SendMessageBatchRequest.class)))
        .thenReturn(mock(SendMessageBatchResult.class));
    ArgumentCaptor<SendMessageBatchRequest> requestCaptor =
        ArgumentCaptor.forClass(SendMessageBatchRequest.class);

    // Exactly one full request.
    messagePublisher.postBatch(messageBatch(10), subject);
    verify(mockAmazonSQS, times(1)).sendMessageBatch(requestCaptor.capture());
    assertThat(requestCaptor.getValue().getEntries()).hasSize(10);

    // Two more full requests.
    messagePublisher.postBatch(messageBatch(20), subject);
    verify(mockAmazonSQS, times(3)).sendMessageBatch(any(SendMessageBatchRequest.class));

    // One full request plus a remainder of one.
    messagePublisher.postBatch(messageBatch(11), subject);
    verify(mockAmazonSQS, times(5)).sendMessageBatch(requestCaptor.capture());
    assertThat(requestCaptor.getValue().getEntries()).hasSize(1);

    // A single request below the limit.
    messagePublisher.postBatch(messageBatch(9), subject);
    verify(mockAmazonSQS, times(6)).sendMessageBatch(requestCaptor.capture());
    assertThat(requestCaptor.getValue().getEntries()).hasSize(9);
}
/**
 * Sends all given messages to the configured queue in one SQS batch request.
 * <p>
 * Every message contributes one batch entry built from its id and payload. The list of failed entries
 * reported by SQS is only logged; nothing is retried or thrown by this method itself.
 *
 * @param messages messages to publish
 */
void publishMessages(List<Message> messages) {
    logger.info("Sending {} messages", messages.size());

    SendMessageBatchRequest batch = new SendMessageBatchRequest(queueURL);
    for (Message msg : messages) {
        batch.getEntries().add(new SendMessageBatchRequestEntry(msg.getId(), msg.getPayload()));
    }
    logger.info("sending {}", batch.getEntries().size());

    SendMessageBatchResult result = client.sendMessageBatch(batch);
    logger.info("send result {}", result.getFailed().toString());
}
@Override public int send(Channel channel) throws EventDeliveryException { int eventProcessedCounter = 0; // Create batch request List<SendMessageBatchRequest> batchRequests = createBatches(channel); for (SendMessageBatchRequest batchRequest : batchRequests) { // Send batch request SendMessageBatchResult result = null; try { result = this.amazonSQS.sendMessageBatch(batchRequest); } catch (AmazonServiceException ase) { // Throw request reached to SQS but the whole batch was rejected for some reason. Let the whole batch // be treated as "failed". Flume will retry the while batch throw new EventDeliveryException("Failure sending batch message request to Amazon SQS, " + "the request made it to SQS but was rejected for some reason.", ase); } catch (AmazonClientException ace) { throw new EventDeliveryException("Failure sending batch message request to Amazon SQS.", ace); } // Handle the result of the SQS batch request i.e., log errors, or fail the whole batch by throwing // EventDeliveryException in case of errors etc. handleResult(batchRequest, result); // The code reached here means there is nothing to rollback in this transaction. So increment the // eventProcessedCounter by the number of successfully sent messages. eventProcessedCounter += result.getSuccessful().size(); } return eventProcessedCounter; }
/** * Handles SQS send message batch result and throws EventDeliveryException to cause the flume transaction to fail * and let flume retry the whole batch in case all the messages in the batch failed to be delivered to SQS. * Currently, this method does just logs errors and skips the messages in case some messages from the batched failed * to be delivered but some succeeded (i.e., partial batch failure). * <p> * TODO: Add retry logic instead letting flume drop the failed messages in case of partial batch failure * * @param batchRequest The SQS SendMessageBatchRequest * @param batchResult The SQS SendMessageBatchResult * * @throws EventDeliveryException In case all the messages in the batch failed to be delivered to SQS */ protected void handleResult(SendMessageBatchRequest batchRequest, SendMessageBatchResult batchResult) throws EventDeliveryException { List<SendMessageBatchRequestEntry> batchRequestEntries = batchRequest.getEntries(); List<BatchResultErrorEntry> errors = batchResult.getFailed(); int attemptedCount = batchRequestEntries == null ? 0 : batchRequestEntries.size(); int errorCount = errors == null ? 0 : errors.size(); if (errorCount > 0) { String errorMessage = buildErrorMessage(batchRequestEntries, errors); if (attemptedCount == errorCount) { // if it was a non-empty batch and if all the messages in the batch have errors then fail the whole // batch and let flume rollback the transaction and retry it // Just throw the EventDeliveryException. This will eventually cause the channel's transaction to // rollback. throw new EventDeliveryException(errorMessage); } else { // TODO: Add retry logic instead letting flume drop the failed messages in case of partial batch failure // Just log the error message and let flume drop failed messages in case of partial batch failures LOG.error(errorMessage); } } }
/** * Tests {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests invalid characters not * allowed by the SQS. See [http://docs.aws.amazon * .com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html] * for list of valid characters allowed by SQS. * <p> * <p> * <pre> * Inputs: * channel = never empty. with messages containing invalid characters. * * Expected Output: * The sink messages should not contain invalid characters * </pre> */ @Test public void testInvalidCharacters() throws Exception { // See // http://stackoverflow.com/questions/16688523/aws-sqs-valid-characters // http://stackoverflow.com/questions/1169754/amazon-sqs-invalid-binary-character-in-message-body // https://forums.aws.amazon.com/thread.jspa?messageID=459090 // http://stackoverflow.com/questions/16329695/invalid-binary-character-when-transmitting-protobuf-net // -messages-over-aws-sqs byte invalidCharByte = 0x1C; String mockMsg = "Test with some invalid chars at the end 0%2F>^F"; byte[] origPayloadWithInvalidChars = ArrayUtils.add(mockMsg.getBytes(), invalidCharByte); BatchSQSMsgSender sqsMsgSender = new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 1, origPayloadWithInvalidChars.length); Event mockEvent = Mockito.mock(Event.class); when(mockEvent.getBody()).thenReturn(origPayloadWithInvalidChars); Channel mockChannel = Mockito.mock(Channel.class); when(mockChannel.take()).thenReturn(mockEvent); List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel); List<SendMessageBatchRequestEntry> msgEntries = batches.get(0).getEntries(); assertCorrectPayloadInEntries(new String(origPayloadWithInvalidChars).trim().getBytes(), msgEntries); // Make sure that the message being sent by the sink doesn't contain the invalid characters for (SendMessageBatchRequestEntry entry : msgEntries) { Assert.assertNotNull(entry); Assert.assertTrue(ArrayUtils.contains(new 
String(origPayloadWithInvalidChars).getBytes(), invalidCharByte)); Assert.assertTrue(!ArrayUtils.contains(entry.getMessageBody().getBytes(), invalidCharByte)); } }
/**
 * Verifies {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} when <i>batchSize</i> messages
 * do not fit into <i>maxMessageSize</i>.
 * <p>
 * <pre>
 * Given:
 *   channel        = always returns an event
 *   batchSize      = 5
 *   maxMessageSize = 10 bytes
 *   event body     = 3 bytes
 *
 * Then:
 *   2 batches are created
 *   batch 1 holds 3 messages
 *   batch 2 holds 2 messages
 * </pre>
 */
@Test
public void testCreateBatchesExceedingSize() throws Exception {
    byte[] payload = {'A', 'b', '~'};

    Event event = Mockito.mock(Event.class);
    when(event.getBody()).thenReturn(payload);
    Channel channel = Mockito.mock(Channel.class);
    when(channel.take()).thenReturn(event);

    BatchSQSMsgSender sender = new BatchSQSMsgSender("https://some-fake/url", "us-east-1",
        "someAwsAccessKey", "someAwsSecretKey", 5, 10);

    List<SendMessageBatchRequest> batches = sender.createBatches(channel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(2, batches.size());

    List<SendMessageBatchRequestEntry> firstBatch = batches.get(0).getEntries();
    Assert.assertNotNull(firstBatch);
    Assert.assertEquals(3, firstBatch.size());

    List<SendMessageBatchRequestEntry> secondBatch = batches.get(1).getEntries();
    Assert.assertNotNull(secondBatch);
    Assert.assertEquals(2, secondBatch.size());

    assertCorrectPayloadInEntries(payload, secondBatch);
}
/**
 * Verifies {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} when <i>batchSize</i> messages
 * do not fit into <i>maxMessageSize</i> and the channel runs dry after a number of takes.
 * <p>
 * <pre>
 * Given:
 *   channel        = 4 events, then null
 *   batchSize      = 5
 *   maxMessageSize = 10 bytes
 *   event body     = 3 bytes
 *
 * Then:
 *   2 batches are created
 *   batch 1 holds 3 messages
 *   batch 2 holds 1 message
 * </pre>
 */
@Test
public void testCreateBatchesExceedingSizeLimitedChannel() throws Exception {
    byte[] payload = {'^', '@', '~'};

    Event event = Mockito.mock(Event.class);
    when(event.getBody()).thenReturn(payload);

    // Four successful takes followed by an empty channel.
    Channel channel = Mockito.mock(Channel.class);
    when(channel.take()).thenReturn(event, event, event, event, null);

    BatchSQSMsgSender sender = new BatchSQSMsgSender("https://some-fake/url", "us-east-1",
        "someAwsAccessKey", "someAwsSecretKey", 5, 10);

    List<SendMessageBatchRequest> batches = sender.createBatches(channel);

    Assert.assertNotNull(batches);
    Assert.assertEquals(2, batches.size());

    List<SendMessageBatchRequestEntry> firstBatch = batches.get(0).getEntries();
    Assert.assertNotNull(firstBatch);
    Assert.assertEquals(3, firstBatch.size());

    List<SendMessageBatchRequestEntry> secondBatch = batches.get(1).getEntries();
    Assert.assertNotNull(secondBatch);
    Assert.assertEquals(1, secondBatch.size());

    assertCorrectPayloadInEntries(payload, secondBatch);
}
/**
 * Verifies {@link BatchSQSMsgSender#send(org.apache.flume.Channel)} when only some messages of a batch
 * could not be delivered to SQS.
 * <p>
 * <pre>
 * Expected:
 *   - no EventDeliveryException is thrown
 *   - send() returns the number of successfully processed events
 * </pre>
 *
 * @throws Exception if the sender fails unexpectedly
 */
@Test
public void testSendPartialBatchFailure() throws Exception {
    int batchSize = 5;
    int failedMsgCount = 1;
    int expectedSuccessCount = batchSize - failedMsgCount;

    String msgBody = "Some message payload";
    Event event = Mockito.mock(Event.class);
    when(event.getBody()).thenReturn(msgBody.getBytes());
    Channel channel = Mockito.mock(Channel.class);
    when(channel.take()).thenReturn(event);

    // SQS client answering with a partially failed batch result.
    SendMessageBatchResult partialResult = mockBatchResult(batchSize, expectedSuccessCount);
    AmazonSQS sqs = Mockito.mock(AmazonSQS.class);
    when(sqs.sendMessageBatch(any(SendMessageBatchRequest.class))).thenReturn(partialResult);

    BatchSQSMsgSender sender = new BatchSQSMsgSender("https://some-fake/url", "us-east-1",
        "someAwsAccessKey", "someAwsSecretKey", batchSize, 100);
    sender.setAmazonSQS(sqs);

    int successCount = sender.send(channel);

    Assert.assertEquals(expectedSuccessCount, successCount);
}
/** * Tests the {@link BatchSQSMsgSender#send(org.apache.flume.Channel)} method. Tests the failure scenario when all * the messages in the batch failed to be delivered to SQS. * <p> * Expected: - EventDeliveryException is thrown - EventDeliveryException also contains the failed messages payload * in the exception message * * @throws Exception */ @Test(expected = EventDeliveryException.class) public void testSendCompleteBatchFailure() throws Exception { int batchSize = 5; int failedMsgCount = batchSize; int expectedSuccessCount = batchSize - failedMsgCount; BatchSQSMsgSender sqsMsgSender = new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", batchSize, 100); AmazonSQS mockSqs = Mockito.mock(AmazonSQS.class); SendMessageBatchResult mockResult = mockBatchResult(batchSize, expectedSuccessCount); when(mockSqs.sendMessageBatch(any(SendMessageBatchRequest.class))).thenReturn(mockResult); sqsMsgSender.setAmazonSQS(mockSqs); String msgBody = "Some message payload"; byte[] mockMsgPayload = msgBody.getBytes(); Event mockEvent = Mockito.mock(Event.class); when(mockEvent.getBody()).thenReturn(mockMsgPayload); Channel mockChannel = Mockito.mock(Channel.class); when(mockChannel.take()).thenReturn(mockEvent); try { sqsMsgSender.send(mockChannel); } catch (EventDeliveryException ede) { // Make sure that the original payload is also part of the exception error messsage body // to get the failed payloads logged along with errors Assert.assertTrue(ede.getMessage().contains(msgBody)); //rethrow as the test is expecting this exception to be thrown throw ede; } }
public static void main(String[] args) { final AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient(); try { CreateQueueResult create_result = sqs.createQueue(QUEUE_NAME); } catch (AmazonSQSException e) { if (!e.getErrorCode().equals("QueueAlreadyExists")) { throw e; } } String queueUrl = sqs.getQueueUrl(QUEUE_NAME).getQueueUrl(); SendMessageRequest send_msg_request = new SendMessageRequest() .withQueueUrl(queueUrl) .withMessageBody("hello world") .withDelaySeconds(5); sqs.sendMessage(send_msg_request); // Send multiple messages to the queue SendMessageBatchRequest send_batch_request = new SendMessageBatchRequest() .withQueueUrl(queueUrl) .withEntries( new SendMessageBatchRequestEntry( "msg_1", "Hello from message 1"), new SendMessageBatchRequestEntry( "msg_2", "Hello from message 2") .withDelaySeconds(10)); sqs.sendMessageBatch(send_batch_request); // receive messages from the queue List<Message> messages = sqs.receiveMessage(queueUrl).getMessages(); // delete messages from the queue for (Message m : messages) { sqs.deleteMessage(queueUrl, m.getReceiptHandle()); } }
/**
 * Sends one SQS batch request per map entry and logs whatever SQS reports as failed.
 * <p>
 * The map key is resolved to a queue URL through the queue admin (presumably it is the queue name — confirm
 * against the caller); the mapped list becomes the entries of the batch request.
 *
 * @param messageBatches batch entries to send, grouped by the key used to look up the queue URL
 */
private void submitBatches(
        final Map<String, List<SendMessageBatchRequestEntry>> messageBatches) {
    messageBatches.forEach((queueKey, entries) -> {
        final SendMessageBatchRequest request = new SendMessageBatchRequest()
            .withQueueUrl(this.queueAdmin.getQueueUrl(queueKey))
            .withEntries(entries);
        final SendMessageBatchResult outcome = this.sqs.sendMessageBatch(request);
        this.logFailures(outcome.getFailed());
    });
}
/**
 * Performs the "SendMessages" action on the underlying resource with the given request.
 *
 * @param request   batch request to send
 * @param extractor passed through to the resource's {@code performAction} call
 * @return the data of the action result cast to {@link SendMessageBatchResult}, or {@code null} when the
 *         action yields no result
 */
@Override
public SendMessageBatchResult sendMessages(SendMessageBatchRequest request,
        ResultCapture<SendMessageBatchResult> extractor) {
    ActionResult outcome = resource.performAction("SendMessages", request, extractor);
    return outcome == null ? null : (SendMessageBatchResult) outcome.getData();
}
/**
 * Performs the "SendMessages" action with a freshly created, otherwise unconfigured batch request.
 *
 * @param extractor passed through to the request-taking overload
 * @return whatever the request-taking overload returns
 */
@Override
public SendMessageBatchResult sendMessages(ResultCapture<SendMessageBatchResult> extractor) {
    return sendMessages(new SendMessageBatchRequest(), extractor);
}
/**
 * Performs the "SendMessages" action with a batch request built from the given entries.
 *
 * @param entries   entries to put into the batch request
 * @param extractor passed through to the request-taking overload
 * @return whatever the request-taking overload returns
 */
@Override
public SendMessageBatchResult sendMessages(List<SendMessageBatchRequestEntry> entries,
        ResultCapture<SendMessageBatchResult> extractor) {
    return sendMessages(new SendMessageBatchRequest().withEntries(entries), extractor);
}
/**
 * Sends a message through the batch operation and reads it back, then sets a queue attribute and checks
 * that it can be retrieved.
 */
@Test
@Ignore
public void testQueueSubResourceAndAttributes() throws InterruptedException {
    // Asking for a message by handle makes no service call; a new sub resource is created for the given
    // handle, so the returned message is never null even for a bogus handle.
    Message message = queue.getMessage("invalid-recepient-handle");
    assertNotNull(message);

    try {
        message.getAttributes();
        fail("An unsupported operation exception must be thrown as load operation is no supported on message attribute");
    } catch (UnsupportedOperationException use) {
        // expected
    }

    SendMessageBatchRequest batchRequest = new SendMessageBatchRequest()
        .withEntries(new SendMessageBatchRequestEntry("msg1", TEST_MESSAGE));
    SendMessageBatchResult batchResult = queue.sendMessages(batchRequest);
    SendMessageBatchResultEntry sentEntry = batchResult.getSuccessful().get(0);

    List<Message> received = waitForMessagesFromQueue(null);
    assertNotNull(received);
    assertEquals(1, received.size());

    message = received.get(0);
    assertMessage(TEST_MESSAGE, sentEntry.getMessageId(), sentEntry.getMD5OfMessageBody(), message);

    queue.setAttributes(ImmutableMapParameter.of("MaximumMessageSize", "2048"));
    assertTrue(queue.getAttributes().containsKey("MaximumMessageSize"));
}
/**
 * Sends one batch of {@code BATCH_MESSAGE_SIZE} messages to the queue and then counts the latch down.
 * <p>
 * Entry ids are the indices {@code 0..BATCH_MESSAGE_SIZE-1}; each body is {@code "message_"} followed by
 * that index.
 */
@Override
public void run() {
    List<SendMessageBatchRequestEntry> entries = new ArrayList<>();
    int index = 0;
    while (index < BATCH_MESSAGE_SIZE) {
        entries.add(new SendMessageBatchRequestEntry(Integer.toString(index), "message_" + index));
        index++;
    }

    SendMessageBatchRequest request = new SendMessageBatchRequest(this.queueUrl, entries);
    this.amazonSqs.sendMessageBatch(request);
    this.countDownLatch.countDown();
}
/**
 * Sends the given entries to the queue in partitions of at most {@code MAX_BATCH_SIZE} and logs every entry
 * that SQS reports as failed, together with its message body.
 * <p>
 * Failures are only logged: nothing is retried, and a problem while building the log output is itself logged
 * rather than propagated.
 *
 * @param messages batch entries to send
 */
private void sendMessageBatch(Collection<SendMessageBatchRequestEntry> messages) {
    for (Collection<SendMessageBatchRequestEntry> batch : partition(messages, MAX_BATCH_SIZE)) {
        final SendMessageBatchResult sendMessageBatchResult =
            amazonSQS.sendMessageBatch(new SendMessageBatchRequest(queueUrl, new ArrayList<>(batch)));
        final List<BatchResultErrorEntry> failed = sendMessageBatchResult.getFailed();
        if (failed.isEmpty()) {
            continue;
        }

        try {
            final Set<String> failedMessageIds =
                failed.stream().map(BatchResultErrorEntry::getId).collect(Collectors.toSet());

            // Match on the entry id. Testing the entry object itself against a set of id strings never
            // matches, which would leave this map empty and silently suppress every failure log below.
            final Map<String, SendMessageBatchRequestEntry> failedMessageIdToMessage = batch.stream()
                .filter(entry -> failedMessageIds.contains(entry.getId()))
                .collect(Collectors.toMap(SendMessageBatchRequestEntry::getId, Function.identity()));

            for (BatchResultErrorEntry failMessage : failed) {
                final SendMessageBatchRequestEntry failedEntry =
                    failedMessageIdToMessage.get(failMessage.getId());
                if (failedEntry != null) {
                    final String messageBody = failedEntry.getMessageBody();
                    LOG.error(
                        "Failed to send message, due to {}, message content : {} ",
                        failMessage,
                        messageBody
                    );
                }
            }
        } catch (Exception e) {
            // Logging is best effort; a failure here must not break sending the remaining partitions.
            LOG.error("Failed to log failed to send messages", e);
        }
    }
}
@Test public void postBatchShouldSendMessagesWithSNSEnvelope() throws Exception { // Arrange when(mockAmazonSQS.sendMessageBatch(any(SendMessageBatchRequest.class))).thenReturn(mock(SendMessageBatchResult.class)); ArgumentCaptor<SendMessageBatchRequest> captor = ArgumentCaptor.forClass(SendMessageBatchRequest.class); // Act messagePublisher.postBatch( Arrays.asList( new TestMessage("Hello"), new TestMessage("world") ), "subject" ); // Assert verify(mockAmazonSQS).sendMessageBatch(captor.capture()); SendMessageBatchRequest sendMessageBatchRequest = captor.getValue(); assertThat(sendMessageBatchRequest.getQueueUrl()).isEqualTo("queueUrl"); List<SendMessageBatchRequestEntry> entries = sendMessageBatchRequest.getEntries(); assertThat(entries.size()).isEqualTo(2); ObjectMapper mapper = new ObjectMapper(); AmazonSNSMessage msg1 = mapper.readValue(entries.get(0).getMessageBody(), AmazonSNSMessage.class); assertThat(msg1.getSubject()).isEqualTo("subject"); assertThat(msg1.getMessage()).isEqualTo("{\"message\":\"Hello\"}"); AmazonSNSMessage msg2 = mapper.readValue(entries.get(1).getMessageBody(), AmazonSNSMessage.class); assertThat(msg2.getSubject()).isEqualTo("subject"); assertThat(msg2.getMessage()).isEqualTo("{\"message\":\"world\"}"); }
@Override public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest) throws AmazonClientException { DirectorySQSQueue queue = getQueueFromUrl(sendMessageBatchRequest.getQueueUrl(), false); //lists for reporting List<BatchResultErrorEntry> batchResultErrorEntries = new ArrayList<>(); List<SendMessageBatchResultEntry> batchResultEntries = new ArrayList<>(); //attempt to change the visibility on each for (SendMessageBatchRequestEntry batchRequestEntry : sendMessageBatchRequest.getEntries()) { try { final int invisibilityDelay = Objects.firstNonNull(batchRequestEntry.getDelaySeconds(), 0);//0 is amazon spec default Message sentMessage = queue.send(batchRequestEntry.getMessageBody(), invisibilityDelay); batchResultEntries.add(new SendMessageBatchResultEntry(). withId(batchRequestEntry.getId()). withMessageId(sentMessage.getMessageId()). withMD5OfMessageBody(sentMessage.getMD5OfBody())); } catch (IOException e) { BatchResultErrorEntry batchResultErrorEntry = new BatchResultErrorEntry(). withSenderFault(false). withId(batchRequestEntry.getId()). withMessage(e.getMessage()); batchResultErrorEntries.add(batchResultErrorEntry); } } return new SendMessageBatchResult(). withFailed(batchResultErrorEntries). withSuccessful(batchResultEntries); }
/**
 * Verifies that {@code SendMessageBatchAction.createRequest} keeps the queue URL and produces one batch
 * entry per entry of the input map.
 */
@Test
public void testCreateRequest() {
    SendMessageBatchRequest created = SendMessageBatchAction.createRequest(QUEUE_URL, ENTRY_MAP);

    assertThat(created.getQueueUrl()).isEqualTo(QUEUE_URL);

    int entryCount = created.getEntries().size();
    assertThat(entryCount).isEqualTo(ENTRY_MAP.size());
}
@Test public void testBulkSendDelete_shouldWork() { // create queue CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue")); // send batch SendMessageBatchRequestEntry firstRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("one") .withMessageGroupId("groupee").withMessageBody("{\"XOXO\":234}"); SendMessageBatchRequestEntry secondRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("two") .withMessageGroupId("groupee").withMessageBody("{\"Quinoa\":\"Readymade\",\"vegan\":true}"); SendMessageBatchRequestEntry thirdRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("three") .withMessageGroupId("groupee").withMessageBody("{\"VHS\":\"street art slow-carb\"}"); // verify send batch result SendMessageBatchResult sendResult = sqs.sendMessageBatch(new SendMessageBatchRequest().withQueueUrl(createdQueue.getQueueUrl()) .withEntries(ImmutableList.of(firstRequest,secondRequest, thirdRequest))); assertNotNull("verify that batch send returned ok", sendResult); assertTrue("no request failed",sendResult.getFailed().isEmpty()); assertEquals("verify successfull message count", 3, sendResult.getSuccessful().size()); SendMessageBatchResultEntry firstResultEntry = sendResult.getSuccessful().stream().filter(msg -> msg.getId().equals("one")).findAny().get(); assertEquals("verify correct message MD5",getAwsMessageMD5("{\"XOXO\":234}"),firstResultEntry.getMD5OfMessageBody()); assertNotNull("verify message id exists",firstResultEntry.getMessageId()); ReceiveMessageResult receivedMessagesResult = sqs.receiveMessage(new ReceiveMessageRequest().withQueueUrl(createdQueue.getQueueUrl()).withMaxNumberOfMessages(4)); // delete batch List<DeleteMessageBatchRequestEntry> deleteRequests = new ArrayList<>(); deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("one").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle())); deleteRequests.add(new 
DeleteMessageBatchRequestEntry().withId("two").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle())); deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("three").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle())); DeleteMessageBatchResult deleteBatchResult = sqs.deleteMessageBatch(new DeleteMessageBatchRequest().withQueueUrl(createdQueue.getQueueUrl()).withEntries(deleteRequests)); // verify delete batch result assertNotNull("verify that batch delete returned ok", deleteBatchResult); assertTrue("no request failed",deleteBatchResult.getFailed().isEmpty()); assertEquals("verify successfull message count", 3, deleteBatchResult.getSuccessful().size()); assertTrue("queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getMessageQueue().isEmpty()); for(Message message : receivedMessagesResult.getMessages()) { assertTrue("invisibility-queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getInvisibilityQueueFor(message.getReceiptHandle()).isEmpty()); } // cleanup getQueues().remove("tea-earl-grey-queue"); }
/**
 * A batch request created with the no-argument constructor (nothing set on it) must still
 * produce a non-null result.
 */
@Test
public void testBulkSend_withEmptyRequestParams_shouldWork() {
    SendMessageBatchRequest emptyRequest = new SendMessageBatchRequest();

    assertNotNull(sqs.sendMessageBatch(emptyRequest));
}
/**
 * Starts a batch send through the underlying client's asynchronous API and exposes the
 * outcome as an {@code Observable}.
 *
 * @param request the batch request handed unchanged to {@code sqsClient}
 * @return the {@code Observable} built by {@code Observable.from} over the value returned by
 *         the client's {@code sendMessageBatchAsync} call
 */
public Observable<SendMessageBatchResult> sendMessageBatchAsync(SendMessageBatchRequest request) {
    Observable<SendMessageBatchResult> resultStream =
            Observable.from(sqsClient.sendMessageBatchAsync(request));
    return resultStream;
}
public void batchSend(List<SendMessageBatchRequestEntry> entries){ try { // Send batch messages //System.out.println("\nSending a message to jobQueue.\n"); SendMessageBatchRequest batchRequest = new SendMessageBatchRequest().withQueueUrl(queueUrl); batchRequest.setEntries(entries); SendMessageBatchResult batchResult = sqs.sendMessageBatch(batchRequest); // sendMessageBatch can return successfully, and yet individual batch // items fail. So, make sure to retry the failed ones. if (!batchResult.getFailed().isEmpty()) { //System.out.println("Retry sending failed messages..."); List<SendMessageBatchRequestEntry> failedEntries = new ArrayList<SendMessageBatchRequestEntry>(); Iterator<SendMessageBatchRequestEntry> iter = entries.iterator(); while(iter.hasNext()){ if(batchResult.getFailed().contains(iter.next())){ failedEntries.add((SendMessageBatchRequestEntry) iter.next()); } } batchRequest.setEntries(failedEntries); sqs.sendMessageBatch(batchRequest); } } catch (AmazonServiceException ase) { System.out.println("Caught an AmazonServiceException, which means your request made it " + "to Amazon SQS, but was rejected with an error response for some reason."); System.out.println("Error Message: " + ase.getMessage()); System.out.println("HTTP Status Code: " + ase.getStatusCode()); System.out.println("AWS Error Code: " + ase.getErrorCode()); System.out.println("Error Type: " + ase.getErrorType()); System.out.println("Request ID: " + ase.getRequestId()); } catch (AmazonClientException ace) { System.out.println("Caught an AmazonClientException, which means the client encountered " + "a serious internal problem while trying to communicate with SQS, such as not " + "being able to access the network."); System.out.println("Error Message: " + ace.getMessage()); } }
/**
 * Performs the send without a result capture by forwarding to the two-argument overload
 * with {@code null} as the second argument.
 *
 * @param request the batch request to send
 * @return whatever the two-argument overload returns
 */
@Override
public SendMessageBatchResult sendMessages(SendMessageBatchRequest request) {
    SendMessageBatchResult response = sendMessages(request, null);
    return response;
}
/**
 * Exercises {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} against a
 * channel that has nothing to deliver.
 * <p>
 * <pre>
 * Inputs:
 *      channel = empty (take() yields null)
 *      batchSize = 5
 *      maxMessageSize = 10 Bytes
 *
 * Expected Output:
 *      number of batches = 0 (the returned list is non-null but empty)
 * </pre>
 */
@Test
public void testCreateBatchesEmptyChannel() throws Exception {
    Channel emptyChannel = Mockito.mock(Channel.class);
    when(emptyChannel.take()).thenReturn(null);

    BatchSQSMsgSender sender =
            new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5, 10);
    List<SendMessageBatchRequest> createdBatches = sender.createBatches(emptyChannel);

    Assert.assertNotNull(createdBatches);
    Assert.assertEquals(0, createdBatches.size());
}
/**
 * Delivers up to ten messages to the specified queue by handing the request, unchanged, to
 * the wrapped {@code amazonSqsToBeExtended} client.
 * <p>
 * This is the batch version of SendMessage; the result of the send action on each message
 * is reported individually in the response. The maximum allowed individual message size is
 * 256 KB (262,144 bytes), and the maximum total payload size (the sum of all of a batch's
 * individual message lengths) is also 256 KB (262,144 bytes).
 * </p>
 * <p>
 * If the <code>DelaySeconds</code> parameter is not specified for an entry, the default for
 * the queue is used.
 * </p>
 * <p>
 * <b>IMPORTANT:</b> Only the following Unicode characters, as permitted by the W3C XML
 * specification (see http://www.w3.org/TR/REC-xml/#charsets), are allowed in a message; a
 * request containing any other character will be rejected:
 * #x9 | #xA | #xD | [#x20 to #xD7FF] | [#xE000 to #xFFFD] | [#x10000 to #x10FFFF]
 * </p>
 * <p>
 * <b>IMPORTANT:</b> Because a batch request can result in a combination of successful and
 * unsuccessful actions, check for batch errors even when the call returns an HTTP status
 * code of 200.
 * </p>
 * <p>
 * <b>NOTE:</b> Some API actions take lists of parameters. These lists are specified using
 * the param.n notation, where n is an integer starting from 1. For example, a parameter list
 * with two elements looks like this:
 * </p>
 * <p>
 * <code>&amp;Attribute.1=this</code>
 * </p>
 * <p>
 * <code>&amp;Attribute.2=that</code>
 * </p>
 *
 * @param sendMessageBatchRequest
 *            Container for the necessary parameters to execute the
 *            SendMessageBatch service method on AmazonSQS.
 *
 * @return The response from the SendMessageBatch service method, as
 *         returned by AmazonSQS.
 *
 * @throws BatchEntryIdsNotDistinctException
 * @throws TooManyEntriesInBatchRequestException
 * @throws BatchRequestTooLongException
 * @throws UnsupportedOperationException
 * @throws InvalidBatchEntryIdException
 * @throws EmptyBatchRequestException
 *
 * @throws AmazonClientException
 *             If any internal errors are encountered inside the client
 *             while attempting to make the request or handle the response,
 *             for example if a network connection is not available.
 * @throws AmazonServiceException
 *             If an error response is returned by AmazonSQS indicating
 *             either a problem with the data in the request, or a server
 *             side issue.
 */
public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest)
        throws AmazonServiceException, AmazonClientException {
    SendMessageBatchResult delegateResult = amazonSqsToBeExtended.sendMessageBatch(sendMessageBatchRequest);
    return delegateResult;
}
/**
 * Convenience overload that wraps the queue URL and entries in a
 * {@code SendMessageBatchRequest} and forwards to
 * {@code sendMessageBatch(SendMessageBatchRequest)}.
 * <p>
 * Delivers up to ten messages to the specified queue. This is the batch version of
 * SendMessage; the result of the send action on each message is reported individually in
 * the response. Message payloads are uploaded to Amazon S3 when necessary.
 * </p>
 * <p>
 * If the <code>DelaySeconds</code> parameter is not specified for an entry, the default for
 * the queue is used.
 * </p>
 * <p>
 * <b>IMPORTANT:</b> Only the following Unicode characters, as permitted by the W3C XML
 * specification (see http://www.w3.org/TR/REC-xml/#charsets), are allowed in a message; a
 * request containing any other character will be rejected:
 * #x9 | #xA | #xD | [#x20 to #xD7FF] | [#xE000 to #xFFFD] | [#x10000 to #x10FFFF]
 * </p>
 * <p>
 * <b>IMPORTANT:</b> Because a batch request can result in a combination of successful and
 * unsuccessful actions, check for batch errors even when the call returns an HTTP status
 * code of 200.
 * </p>
 * <p>
 * <b>NOTE:</b> Some API actions take lists of parameters. These lists are specified using
 * the param.n notation, where n is an integer starting from 1. For example, a parameter list
 * with two elements looks like this:
 * </p>
 * <p>
 * <code>&amp;Attribute.1=this</code>
 * </p>
 * <p>
 * <code>&amp;Attribute.2=that</code>
 * </p>
 *
 * @param queueUrl
 *            The URL of the Amazon SQS queue to take action on.
 * @param entries
 *            A list of <a>SendMessageBatchRequestEntry</a> items.
 *
 * @return The response from the SendMessageBatch service method, as
 *         returned by AmazonSQS.
 *
 * @throws BatchEntryIdsNotDistinctException
 * @throws TooManyEntriesInBatchRequestException
 * @throws BatchRequestTooLongException
 * @throws UnsupportedOperationException
 * @throws InvalidBatchEntryIdException
 * @throws EmptyBatchRequestException
 *
 * @throws AmazonClientException
 *             If any internal errors are encountered inside the client
 *             while attempting to make the request or handle the response,
 *             for example if a network connection is not available.
 * @throws AmazonServiceException
 *             If an error response is returned by AmazonSQS indicating
 *             either a problem with the data in the request, or a server
 *             side issue.
 */
public SendMessageBatchResult sendMessageBatch(String queueUrl, List<SendMessageBatchRequestEntry> entries) {
    return sendMessageBatch(new SendMessageBatchRequest(queueUrl, entries));
}
/**
 * Performs the <code>SendMessages</code> action.
 *
 * <p>
 * The following request parameters will be populated from the data of this
 * <code>Queue</code> resource, and any conflicting parameter value set in
 * the request will be overridden:
 * <ul>
 *   <li>
 *     <b><code>QueueUrl</code></b>
 *         - mapped from the <code>Url</code> identifier.
 *   </li>
 * </ul>
 *
 * @param request
 *            The batch request to send; its <code>QueueUrl</code> is
 *            overridden as described above.
 * @return The response of the low-level client operation associated with
 *         this resource action.
 * @see SendMessageBatchRequest
 */
SendMessageBatchResult sendMessages(SendMessageBatchRequest request);
/**
 * Performs the <code>SendMessages</code> action and uses a ResultCapture to
 * retrieve the low-level client response.
 *
 * <p>
 * The following request parameters will be populated from the data of this
 * <code>Queue</code> resource, and any conflicting parameter value set in
 * the request will be overridden:
 * <ul>
 *   <li>
 *     <b><code>QueueUrl</code></b>
 *         - mapped from the <code>Url</code> identifier.
 *   </li>
 * </ul>
 *
 * @param request
 *            The batch request to send; its <code>QueueUrl</code> is
 *            overridden as described above.
 * @param extractor
 *            The ResultCapture used to retrieve the low-level client
 *            response.
 * @return The response of the low-level client operation associated with
 *         this resource action.
 * @see SendMessageBatchRequest
 */
SendMessageBatchResult sendMessages(SendMessageBatchRequest request, ResultCapture<SendMessageBatchResult> extractor);