/**
 * Builds the SQS batch request for one queue.
 * <p>
 * Every map entry becomes one batch entry: the key is used as the entry id and the value
 * supplies the message body. When the value carries a delay, it is applied to the entry in
 * whole seconds (narrowed to {@code int}).
 *
 * @param queueUrl URL of the target queue
 * @param entries  messages to send, keyed by batch entry id
 * @return the populated batch request
 */
@VisibleForTesting
static SendMessageBatchRequest createRequest(String queueUrl, Map<String, SendMessageEntry> entries) {
    return new SendMessageBatchRequest()
            .withQueueUrl(queueUrl)
            .withEntries(entries.entrySet().stream()
                    .map((Map.Entry<String, SendMessageEntry> idAndMessage) -> {
                        SendMessageBatchRequestEntry requestEntry = new SendMessageBatchRequestEntry()
                                .withId(idAndMessage.getKey())
                                .withMessageBody(idAndMessage.getValue().getBody());
                        idAndMessage.getValue().getDelay()
                                .ifPresent(delay -> requestEntry.setDelaySeconds((int) delay.getSeconds()));
                        return requestEntry;
                    })
                    .collect(Collectors.toList()));
}
/**
 * Renders the failed entries of a batch as one comma-separated string.
 * <p>
 * Each failure is written as {@code [<error>,{messageBody:"<body>"}]}, where the body is
 * looked up in the original request by the error's id. The body is rendered as
 * {@code null} when no matching request entry is found.
 *
 * @param batchRequestEntries the entries that were sent (may be {@code null})
 * @param errors              the failures reported for the batch
 * @return the combined error text, empty when {@code errors} is empty
 */
private String buildErrorMessage(List<SendMessageBatchRequestEntry> batchRequestEntries,
                                 List<BatchResultErrorEntry> errors) {
    StringBuilder text = new StringBuilder();
    boolean needsSeparator = false;
    for (BatchResultErrorEntry failure : errors) {
        if (needsSeparator) {
            text.append(",");
        }
        needsSeparator = true;

        SendMessageBatchRequestEntry request = findRequestEventEntryById(batchRequestEntries, failure.getId());
        String body = null;
        if (request != null) {
            body = request.getMessageBody();
        }
        text.append("[")
                .append(failure.toString())
                .append(",{messageBody:")
                .append("\"")
                .append(body)
                .append("\"}]");
    }
    return text.toString();
}
/**
 * Happy-path test for {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)}.
 * <p>
 * <pre>
 * Inputs:
 *   channel           = never empty
 *   batchSize         = 5
 *   maxMessageSize    = 10 Bytes
 *   each message size = 2 Bytes
 *
 * Expected Output:
 *   number of batches           = 1
 *   number of messages in batch = 5
 * </pre>
 */
@Test
public void testCreateBatches() throws Exception {
    byte[] payload = {'A', 'b'};

    // The channel hands out the same two-byte event on every take.
    Event event = Mockito.mock(Event.class);
    when(event.getBody()).thenReturn(payload);
    Channel channel = Mockito.mock(Channel.class);
    when(channel.take()).thenReturn(event);

    BatchSQSMsgSender sender =
            new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5,
                    10);
    List<SendMessageBatchRequest> requests = sender.createBatches(channel);

    Assert.assertNotNull(requests);
    Assert.assertEquals(1, requests.size());

    List<SendMessageBatchRequestEntry> entries = requests.get(0).getEntries();
    Assert.assertNotNull(entries);
    Assert.assertEquals(5, entries.size());
    assertCorrectPayloadInEntries(payload, entries);
}
/**
 * Tests {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} when the channel
 * runs dry right after the first event.
 * <p>
 * <pre>
 * Inputs:
 *   channel        = 1 Event (empty after first take)
 *   batchSize      = 5
 *   maxMessageSize = 10 Bytes
 *
 * Expected Output:
 *   number of batches           = 1
 *   number of messages in batch = 1
 * </pre>
 */
@Test
public void testCreateBatchesEmptyChannelAfterFirstTake() throws Exception {
    byte[] payload = {'A', 'b'};

    Event event = Mockito.mock(Event.class);
    when(event.getBody()).thenReturn(payload);
    // One event, then the channel reports empty.
    Channel channel = Mockito.mock(Channel.class);
    when(channel.take()).thenReturn(event).thenReturn(null);

    BatchSQSMsgSender sender =
            new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5,
                    10);
    List<SendMessageBatchRequest> requests = sender.createBatches(channel);

    Assert.assertNotNull(requests);
    Assert.assertEquals(1, requests.size());

    List<SendMessageBatchRequestEntry> entries = requests.get(0).getEntries();
    Assert.assertNotNull(entries);
    Assert.assertEquals(1, entries.size());
    assertCorrectPayloadInEntries(payload, entries);
}
/**
 * Tests {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} when the channel
 * runs dry exactly after the last take needed to fill the batch.
 * <p>
 * <pre>
 * Inputs:
 *   channel        = 5 Events (empty after 5th take)
 *   batchSize      = 5
 *   maxMessageSize = 10 Bytes
 *
 * Expected Output:
 *   number of batches           = 1
 *   number of messages in batch = 5
 * </pre>
 */
@Test
public void testCreateBatchesEmptyChannelAfterLastTake() throws Exception {
    byte[] payload = {'A', 'b'};

    Event event = Mockito.mock(Event.class);
    when(event.getBody()).thenReturn(payload);
    // Five events, then the channel reports empty.
    Channel channel = Mockito.mock(Channel.class);
    when(channel.take()).thenReturn(event, event, event, event, event, null);

    BatchSQSMsgSender sender =
            new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5,
                    10);
    List<SendMessageBatchRequest> requests = sender.createBatches(channel);

    Assert.assertNotNull(requests);
    Assert.assertEquals(1, requests.size());

    List<SendMessageBatchRequestEntry> entries = requests.get(0).getEntries();
    Assert.assertNotNull(entries);
    Assert.assertEquals(5, entries.size());
    assertCorrectPayloadInEntries(payload, entries);
}
/**
 * Tests {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} when the channel
 * runs dry part-way through filling a batch.
 * <p>
 * <pre>
 * Inputs:
 *   channel        = 3 Events (empty after 3rd take)
 *   batchSize      = 5
 *   maxMessageSize = 10 Bytes
 *
 * Expected Output:
 *   number of batches           = 1
 *   number of messages in batch = 3
 * </pre>
 */
@Test
public void testCreateBatchesEmptyChannelInTheMiddle() throws Exception {
    byte[] payload = {'A', 'b'};

    Event event = Mockito.mock(Event.class);
    when(event.getBody()).thenReturn(payload);
    // Three events, then the channel reports empty.
    Channel channel = Mockito.mock(Channel.class);
    when(channel.take()).thenReturn(event, event, event, null);

    BatchSQSMsgSender sender =
            new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5,
                    10);
    List<SendMessageBatchRequest> requests = sender.createBatches(channel);

    Assert.assertNotNull(requests);
    Assert.assertEquals(1, requests.size());

    List<SendMessageBatchRequestEntry> entries = requests.get(0).getEntries();
    Assert.assertNotNull(entries);
    Assert.assertEquals(3, entries.size());
    assertCorrectPayloadInEntries(payload, entries);
}
/**
 * Tests {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} when the channel
 * keeps returning events but one of them, part-way through the batch, has an empty body.
 * <p>
 * <pre>
 * Inputs:
 *   channel        = 4 Events (3 Events with body, 4th Event empty)
 *   batchSize      = 5
 *   maxMessageSize = 10 Bytes
 *
 * Expected Output:
 *   number of batches           = 1
 *   number of messages in batch = 3
 * </pre>
 */
@Test
public void testCreateBatchesEmptyEventInTheMiddle() throws Exception {
    byte[] payload = {'A', 'b'};
    byte[] emptyPayload = {};

    Event filledEvent = Mockito.mock(Event.class);
    when(filledEvent.getBody()).thenReturn(payload);
    Event emptyEvent = Mockito.mock(Event.class);
    when(emptyEvent.getBody()).thenReturn(emptyPayload);

    // Three events with a body followed by an event whose body is empty.
    Channel channel = Mockito.mock(Channel.class);
    when(channel.take()).thenReturn(filledEvent, filledEvent, filledEvent, emptyEvent);

    BatchSQSMsgSender sender =
            new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5,
                    10);
    List<SendMessageBatchRequest> requests = sender.createBatches(channel);

    Assert.assertNotNull(requests);
    Assert.assertEquals(1, requests.size());

    List<SendMessageBatchRequestEntry> entries = requests.get(0).getEntries();
    Assert.assertNotNull(entries);
    Assert.assertEquals(3, entries.size());
    assertCorrectPayloadInEntries(payload, entries);
}
/**
 * Groups the given messages by destination queue, converting each one into a batch entry.
 *
 * @param messages the messages to categorize
 * @return batch entries keyed by the value of {@code message.getQueue()}, in the order the
 *         messages were supplied within each queue
 */
private Map<String, List<SendMessageBatchRequestEntry>> createBatchesForQueues(final List<Message> messages) {
    final Map<String, List<SendMessageBatchRequestEntry>> entriesByQueue =
            new HashMap<String, List<SendMessageBatchRequestEntry>>();

    for (final Message current : messages) {
        final Map<String, MessageAttributeValue> messageAttributes = this.toMessageAttrs(current);
        final SendMessageBatchRequestEntry batchEntry = new SendMessageBatchRequestEntry()
                .withId(current.getId())
                .withMessageAttributes(messageAttributes)
                .withMessageBody(current.getBody());

        // Lazily create the bucket the first time a queue is seen.
        final String queue = current.getQueue();
        List<SendMessageBatchRequestEntry> queueEntries = entriesByQueue.get(queue);
        if (queueEntries == null) {
            queueEntries = new ArrayList<SendMessageBatchRequestEntry>();
            entriesByQueue.put(queue, queueEntries);
        }
        queueEntries.add(batchEntry);
    }
    return entriesByQueue;
}
/**
 * Posts a collection of messages to the queue, wrapping each one in an envelope that makes
 * it look as if it had been delivered through Amazon SNS.
 * <p>
 * Batch entry ids are the 1-based position of each message in {@code messages}.
 *
 * @param messages  the messages to post
 * @param eventName the value used as "subject" in the SNS envelope; must not be empty
 * @throws MessagingException if {@code eventName} is empty or the messages could not be posted
 */
@Override
public <T> void postBatch(Collection<T> messages, String eventName) throws MessagingException {
    if (empty(eventName)) {
        throw new MessagingException("Cannot publish message with empty eventName!");
    }

    try {
        Collection<SendMessageBatchRequestEntry> wrappedEntries = new ArrayList<>(messages.size());
        int position = 1;
        for (T payload : messages) {
            String envelope = wrapInSNSMessage(payload, eventName);
            wrappedEntries.add(new SendMessageBatchRequestEntry(String.valueOf(position), envelope));
            position++;
        }
        sendMessageBatch(wrappedEntries);
    } catch (AmazonServiceException | IOException | CryptographyException e) {
        throw new MessagingException("Failed to post messages: " + messages.getClass(), e);
    }
}
/**
 * Sends all given messages to the configured queue in a single batch request and logs the
 * failures reported in the result.
 *
 * @param messages the messages to publish; each contributes its id and payload
 */
void publishMessages(List<Message> messages) {
    logger.info("Sending {} messages", messages.size());

    SendMessageBatchRequest request = new SendMessageBatchRequest(queueURL);
    for (Message outgoing : messages) {
        request.getEntries().add(new SendMessageBatchRequestEntry(outgoing.getId(), outgoing.getPayload()));
    }
    logger.info("sending {}", request.getEntries().size());

    SendMessageBatchResult outcome = client.sendMessageBatch(request);
    logger.info("send result {}", outcome.getFailed().toString());
}
/** * Handles SQS send message batch result and throws EventDeliveryException to cause the flume transaction to fail * and let flume retry the whole batch in case all the messages in the batch failed to be delivered to SQS. * Currently, this method does just logs errors and skips the messages in case some messages from the batched failed * to be delivered but some succeeded (i.e., partial batch failure). * <p> * TODO: Add retry logic instead letting flume drop the failed messages in case of partial batch failure * * @param batchRequest The SQS SendMessageBatchRequest * @param batchResult The SQS SendMessageBatchResult * * @throws EventDeliveryException In case all the messages in the batch failed to be delivered to SQS */ protected void handleResult(SendMessageBatchRequest batchRequest, SendMessageBatchResult batchResult) throws EventDeliveryException { List<SendMessageBatchRequestEntry> batchRequestEntries = batchRequest.getEntries(); List<BatchResultErrorEntry> errors = batchResult.getFailed(); int attemptedCount = batchRequestEntries == null ? 0 : batchRequestEntries.size(); int errorCount = errors == null ? 0 : errors.size(); if (errorCount > 0) { String errorMessage = buildErrorMessage(batchRequestEntries, errors); if (attemptedCount == errorCount) { // if it was a non-empty batch and if all the messages in the batch have errors then fail the whole // batch and let flume rollback the transaction and retry it // Just throw the EventDeliveryException. This will eventually cause the channel's transaction to // rollback. throw new EventDeliveryException(errorMessage); } else { // TODO: Add retry logic instead letting flume drop the failed messages in case of partial batch failure // Just log the error message and let flume drop failed messages in case of partial batch failures LOG.error(errorMessage); } } }
/**
 * Looks up the first request entry whose id equals the given id.
 *
 * @param entries the entries to search (may be {@code null})
 * @param id      the id to look for
 * @return the first matching entry, or {@code null} when {@code entries} is {@code null} or
 *         contains no match
 */
private SendMessageBatchRequestEntry findRequestEventEntryById(List<SendMessageBatchRequestEntry> entries,
                                                               String id) {
    if (entries == null) {
        return null;
    }
    for (SendMessageBatchRequestEntry candidate : entries) {
        if (candidate.getId().equals(id)) {
            return candidate;
        }
    }
    return null;
}
/** * Tests {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} method. Tests invalid characters not * allowed by the SQS. See [http://docs.aws.amazon * .com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html] * for list of valid characters allowed by SQS. * <p> * <p> * <pre> * Inputs: * channel = never empty. with messages containing invalid characters. * * Expected Output: * The sink messages should not contain invalid characters * </pre> */ @Test public void testInvalidCharacters() throws Exception { // See // http://stackoverflow.com/questions/16688523/aws-sqs-valid-characters // http://stackoverflow.com/questions/1169754/amazon-sqs-invalid-binary-character-in-message-body // https://forums.aws.amazon.com/thread.jspa?messageID=459090 // http://stackoverflow.com/questions/16329695/invalid-binary-character-when-transmitting-protobuf-net // -messages-over-aws-sqs byte invalidCharByte = 0x1C; String mockMsg = "Test with some invalid chars at the end 0%2F>^F"; byte[] origPayloadWithInvalidChars = ArrayUtils.add(mockMsg.getBytes(), invalidCharByte); BatchSQSMsgSender sqsMsgSender = new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 1, origPayloadWithInvalidChars.length); Event mockEvent = Mockito.mock(Event.class); when(mockEvent.getBody()).thenReturn(origPayloadWithInvalidChars); Channel mockChannel = Mockito.mock(Channel.class); when(mockChannel.take()).thenReturn(mockEvent); List<SendMessageBatchRequest> batches = sqsMsgSender.createBatches(mockChannel); List<SendMessageBatchRequestEntry> msgEntries = batches.get(0).getEntries(); assertCorrectPayloadInEntries(new String(origPayloadWithInvalidChars).trim().getBytes(), msgEntries); // Make sure that the message being sent by the sink doesn't contain the invalid characters for (SendMessageBatchRequestEntry entry : msgEntries) { Assert.assertNotNull(entry); Assert.assertTrue(ArrayUtils.contains(new 
String(origPayloadWithInvalidChars).getBytes(), invalidCharByte)); Assert.assertTrue(!ArrayUtils.contains(entry.getMessageBody().getBytes(), invalidCharByte)); } }
/**
 * Tests {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} when the requested
 * <i>batchSize</i> does not fit into the configured <i>maxMessageSize</i>.
 * <p>
 * <pre>
 * Inputs:
 *   channel           = never empty
 *   batchSize         = 5
 *   maxMessageSize    = 10 Bytes
 *   each message size = 3 Bytes
 *
 * Expected Output:
 *   number of batches             = 2
 *   number of messages in batch 1 = 3
 *   number of messages in batch 2 = 2
 * </pre>
 */
@Test
public void testCreateBatchesExceedingSize() throws Exception {
    byte[] payload = {'A', 'b', '~'};

    Event event = Mockito.mock(Event.class);
    when(event.getBody()).thenReturn(payload);
    Channel channel = Mockito.mock(Channel.class);
    when(channel.take()).thenReturn(event);

    BatchSQSMsgSender sender =
            new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5,
                    10);
    List<SendMessageBatchRequest> requests = sender.createBatches(channel);

    Assert.assertNotNull(requests);
    Assert.assertEquals(2, requests.size());

    List<SendMessageBatchRequestEntry> firstBatch = requests.get(0).getEntries();
    Assert.assertNotNull(firstBatch);
    Assert.assertEquals(3, firstBatch.size());

    List<SendMessageBatchRequestEntry> secondBatch = requests.get(1).getEntries();
    Assert.assertNotNull(secondBatch);
    Assert.assertEquals(2, secondBatch.size());

    assertCorrectPayloadInEntries(payload, secondBatch);
}
/**
 * Tests {@link BatchSQSMsgSender#createBatches(org.apache.flume.Channel)} when the requested
 * <i>batchSize</i> does not fit into the configured <i>maxMessageSize</i> and the channel
 * runs dry after a limited number of takes.
 * <p>
 * <pre>
 * Inputs:
 *   channel           = 4 Events
 *   batchSize         = 5
 *   maxMessageSize    = 10 Bytes
 *   each message size = 3 Bytes
 *
 * Expected Output:
 *   number of batches             = 2
 *   number of messages in batch 1 = 3
 *   number of messages in batch 2 = 1
 * </pre>
 */
@Test
public void testCreateBatchesExceedingSizeLimitedChannel() throws Exception {
    byte[] payload = {'^', '@', '~'};

    Event event = Mockito.mock(Event.class);
    when(event.getBody()).thenReturn(payload);
    // Four events, then the channel reports empty.
    Channel channel = Mockito.mock(Channel.class);
    when(channel.take()).thenReturn(event, event, event, event, null);

    BatchSQSMsgSender sender =
            new BatchSQSMsgSender("https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey", 5,
                    10);
    List<SendMessageBatchRequest> requests = sender.createBatches(channel);

    Assert.assertNotNull(requests);
    Assert.assertEquals(2, requests.size());

    List<SendMessageBatchRequestEntry> firstBatch = requests.get(0).getEntries();
    Assert.assertNotNull(firstBatch);
    Assert.assertEquals(3, firstBatch.size());

    List<SendMessageBatchRequestEntry> secondBatch = requests.get(1).getEntries();
    Assert.assertNotNull(secondBatch);
    Assert.assertEquals(1, secondBatch.size());

    assertCorrectPayloadInEntries(payload, secondBatch);
}
/**
 * Asserts that every entry is non-null and that its message body equals the given payload
 * decoded as UTF-8.
 *
 * @param mockMsgPayload the expected payload bytes
 * @param msgEntries     the batch entries to verify
 * @throws UnsupportedEncodingException declared by the charset-name based String constructor
 */
private void assertCorrectPayloadInEntries(byte[] mockMsgPayload, List<SendMessageBatchRequestEntry> msgEntries)
        throws UnsupportedEncodingException {
    for (SendMessageBatchRequestEntry candidate : msgEntries) {
        Assert.assertNotNull(candidate);
        String expectedBody = new String(mockMsgPayload, "UTF-8");
        String actualBody = candidate.getMessageBody();
        Assert.assertEquals(expectedBody, actualBody);
    }
}
public static void main(String[] args) { final AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient(); try { CreateQueueResult create_result = sqs.createQueue(QUEUE_NAME); } catch (AmazonSQSException e) { if (!e.getErrorCode().equals("QueueAlreadyExists")) { throw e; } } String queueUrl = sqs.getQueueUrl(QUEUE_NAME).getQueueUrl(); SendMessageRequest send_msg_request = new SendMessageRequest() .withQueueUrl(queueUrl) .withMessageBody("hello world") .withDelaySeconds(5); sqs.sendMessage(send_msg_request); // Send multiple messages to the queue SendMessageBatchRequest send_batch_request = new SendMessageBatchRequest() .withQueueUrl(queueUrl) .withEntries( new SendMessageBatchRequestEntry( "msg_1", "Hello from message 1"), new SendMessageBatchRequestEntry( "msg_2", "Hello from message 2") .withDelaySeconds(10)); sqs.sendMessageBatch(send_batch_request); // receive messages from the queue List<Message> messages = sqs.receiveMessage(queueUrl).getMessages(); // delete messages from the queue for (Message m : messages) { sqs.deleteMessage(queueUrl, m.getReceiptHandle()); } }
private SendMessageBatchRequestEntry storeMessageInS3(SendMessageBatchRequestEntry batchEntry) { checkMessageAttributes(batchEntry.getMessageAttributes()); String s3Key = UUID.randomUUID().toString(); // Read the content of the message from message body String messageContentStr = batchEntry.getMessageBody(); Long messageContentSize = getStringSizeInBytes(messageContentStr); // Add a new message attribute as a flag MessageAttributeValue messageAttributeValue = new MessageAttributeValue(); messageAttributeValue.setDataType("Number"); messageAttributeValue.setStringValue(messageContentSize.toString()); batchEntry.addMessageAttributesEntry(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, messageAttributeValue); // Store the message content in S3. storeTextInS3(s3Key, messageContentStr, messageContentSize); LOG.info("S3 object created, Bucket name: " + clientConfiguration.getS3BucketName() + ", Object key: " + s3Key + "."); // Convert S3 pointer (bucket name, key, etc) to JSON string MessageS3Pointer s3Pointer = new MessageS3Pointer(clientConfiguration.getS3BucketName(), s3Key); String s3PointerStr = getJSONFromS3Pointer(s3Pointer); // Storing S3 pointer in the message body. batchEntry.setMessageBody(s3PointerStr); return batchEntry; }
/**
 * Sends the given message bodies to the queue using SQS batch requests.
 * <p>
 * SQS accepts at most ten entries per SendMessageBatch call, so the messages are split into
 * consecutive batches of up to ten. Entry ids are the 0-based position of each message in
 * {@code messages}, which keeps them unique within every batch. Nothing is sent when the
 * list is empty.
 *
 * @param messages the message bodies to send
 */
public void sendBulkMessage(List<String> messages) {
    // Maximum number of entries SQS allows in one SendMessageBatch request.
    final int maxBatchSize = 10;

    List<SendMessageBatchRequestEntry> entries = new ArrayList<>(maxBatchSize);
    int position = 0;
    for (String body : messages) {
        entries.add(new SendMessageBatchRequestEntry(Integer.toString(position), body));
        position++;
        if (entries.size() == maxBatchSize) {
            c.sendMessageBatch(queueUrl, entries);
            // Start a fresh list rather than clearing, so a sent batch is never mutated.
            entries = new ArrayList<>(maxBatchSize);
        }
    }

    // Send the final, partially filled batch (an empty batch would be rejected by SQS).
    if (!entries.isEmpty()) {
        c.sendMessageBatch(queueUrl, entries);
    }
}
public void remoteBatchSend(BufferedReader in) throws ParseException{ //Batch sending task to remote workers List<SendMessageBatchRequestEntry> entries = new ArrayList<SendMessageBatchRequestEntry>(); String message; final int batchSize = 10; try { JSONParser parser=new JSONParser(); while ((message = in.readLine()) != null) { JSONArray taskList = (JSONArray)parser.parse(message); for(int i=0; i< taskList.size(); i++){ JSONObject task = (JSONObject)taskList.get(i); msg_cnt++; entries.add(new SendMessageBatchRequestEntry() .withId(Integer.toString(msg_cnt)) .withMessageBody(task.toString())); } if(entries.size() == batchSize){ jobQ.batchSend(entries); entries.clear(); } } if(!entries.isEmpty()){ jobQ.batchSend(entries); entries.clear(); } } catch (IOException e) { e.printStackTrace(); } }
/** * Pushes a number of messages in batch to an SQS queue. * @param queueURL the URL of the SQS queue * @param messages the massage bodies */ public static void pushMessages(String queueURL, List<String> messages) { if (!StringUtils.isBlank(queueURL) && messages != null) { // only allow strings - ie JSON try { int j = 0; List<SendMessageBatchRequestEntry> msgs = new ArrayList<>(MAX_MESSAGES); for (int i = 0; i < messages.size(); i++) { String message = messages.get(i); if (!StringUtils.isBlank(message)) { msgs.add(new SendMessageBatchRequestEntry(). withMessageBody(message). withId(Integer.toString(i))); } if (++j >= MAX_MESSAGES || i == messages.size() - 1) { if (!msgs.isEmpty()) { getClient().sendMessageBatch(queueURL, msgs); msgs.clear(); } j = 0; } } } catch (AmazonServiceException ase) { logException(ase); } catch (AmazonClientException ace) { logger.error("Could not reach SQS. {}", ace.toString()); } } }
/**
 * Sends one batch request per queue and logs the failures reported for each.
 *
 * @param messageBatches batch entries keyed by queue; each key is resolved to a queue URL
 *                       through the queue admin
 */
private void submitBatches(final Map<String, List<SendMessageBatchRequestEntry>> messageBatches) {
    for (final Entry<String, List<SendMessageBatchRequestEntry>> perQueue : messageBatches.entrySet()) {
        final String resolvedUrl = this.queueAdmin.getQueueUrl(perQueue.getKey());

        final SendMessageBatchRequest request = new SendMessageBatchRequest();
        request.withQueueUrl(resolvedUrl).withEntries(perQueue.getValue());

        final SendMessageBatchResult outcome = this.sqs.sendMessageBatch(request);
        this.logFailures(outcome.getFailed());
    }
}
/**
 * Sends the given entries without capturing the raw result; delegates to the overload that
 * takes a {@code ResultCapture}, passing {@code null}.
 *
 * @param entries the batch entries to send
 * @return the batch result
 */
@Override
public SendMessageBatchResult sendMessages(List<SendMessageBatchRequestEntry> entries) {
    final ResultCapture<SendMessageBatchResult> noCapture = null;
    return sendMessages(entries, noCapture);
}
/**
 * Wraps the given entries in a new batch request and delegates to the request-based
 * overload.
 *
 * @param entries   the batch entries to send
 * @param extractor receives the result capture; passed through unchanged
 * @return the batch result
 */
@Override
public SendMessageBatchResult sendMessages(List<SendMessageBatchRequestEntry> entries,
        ResultCapture<SendMessageBatchResult> extractor) {
    SendMessageBatchRequest batchRequest = new SendMessageBatchRequest().withEntries(entries);
    return sendMessages(batchRequest, extractor);
}
/**
 * Sends a message through the batch operation and reads it back, then sets a queue
 * attribute and verifies it can be retrieved.
 */
@Test
@Ignore
public void testQueueSubResourceAndAttributes() throws InterruptedException {
    // Asking for a message by handle makes no service call; it only creates a new sub
    // resource for that handle, so the result is never null even for a bogus handle.
    Message placeholder = queue.getMessage("invalid-recepient-handle");
    assertNotNull(placeholder);
    try {
        placeholder.getAttributes();
        fail("An unsupported operation exception must be thrown as load operation is no supported on message attribute");
    } catch (UnsupportedOperationException expected) {
        // expected: attributes cannot be loaded for a message sub resource
    }

    SendMessageBatchRequestEntry outgoing = new SendMessageBatchRequestEntry("msg1", TEST_MESSAGE);
    SendMessageBatchRequest batchRequest = new SendMessageBatchRequest().withEntries(outgoing);
    SendMessageBatchResult sendOutcome = queue.sendMessages(batchRequest);
    SendMessageBatchResultEntry sentEntry = sendOutcome.getSuccessful().get(0);

    List<Message> received = waitForMessagesFromQueue(null);
    assertNotNull(received);
    assertEquals(1, received.size());

    Message delivered = received.get(0);
    assertMessage(TEST_MESSAGE, sentEntry.getMessageId(), sentEntry.getMD5OfMessageBody(), delivered);

    queue.setAttributes(ImmutableMapParameter.of("MaximumMessageSize", "2048"));
    assertTrue(queue.getAttributes().containsKey("MaximumMessageSize"));
}
/**
 * Sends one batch of {@code BATCH_MESSAGE_SIZE} messages (ids {@code 0..n-1}, bodies
 * {@code message_<i>}) to the queue and then counts the latch down.
 */
@Override
public void run() {
    List<SendMessageBatchRequestEntry> batchEntries = new ArrayList<>();
    int index = 0;
    while (index < BATCH_MESSAGE_SIZE) {
        String body = "message_" + index;
        batchEntries.add(new SendMessageBatchRequestEntry(Integer.toString(index), body));
        index++;
    }

    SendMessageBatchRequest request = new SendMessageBatchRequest(this.queueUrl, batchEntries);
    this.amazonSqs.sendMessageBatch(request);
    this.countDownLatch.countDown();
}
/**
 * Sends the given entries to the queue in partitions of at most {@code MAX_BATCH_SIZE} and
 * logs every entry that SQS reports as failed, together with its message body.
 * <p>
 * Failures are only logged, never rethrown or retried. Problems while building the log
 * output are themselves logged and do not interrupt the remaining partitions.
 *
 * @param messages the batch entries to send
 */
private void sendMessageBatch(Collection<SendMessageBatchRequestEntry> messages) {
    for (Collection<SendMessageBatchRequestEntry> batch : partition(messages, MAX_BATCH_SIZE)) {
        final SendMessageBatchResult sendMessageBatchResult =
                amazonSQS.sendMessageBatch(new SendMessageBatchRequest(queueUrl, new ArrayList<>(batch)));
        final List<BatchResultErrorEntry> failed = sendMessageBatchResult.getFailed();
        if (failed.isEmpty()) {
            continue;
        }

        try {
            final Set<String> failedMessageIds =
                    failed.stream().map(BatchResultErrorEntry::getId).collect(Collectors.toSet());

            // Match request entries by their id. Testing the entry object itself against the
            // set of id strings can never succeed and would leave this map empty.
            final Map<String, SendMessageBatchRequestEntry> failedMessageIdToMessage = batch.stream()
                    .filter(entry -> failedMessageIds.contains(entry.getId()))
                    .collect(Collectors.toMap(SendMessageBatchRequestEntry::getId, Function.identity()));

            for (BatchResultErrorEntry failMessage : failed) {
                final SendMessageBatchRequestEntry failedEntry = failedMessageIdToMessage.get(failMessage.getId());
                if (failedEntry != null) {
                    final String messageBody = failedEntry.getMessageBody();
                    LOG.error(
                            "Failed to send message, due to {}, message content : {} ",
                            failMessage,
                            messageBody
                    );
                }
            }
        } catch (Exception e) {
            // Reporting is best effort; it must not break sending the remaining partitions.
            LOG.error("Failed to log failed to send messages", e);
        }
    }
}
@Test public void postBatchShouldSendMessagesWithSNSEnvelope() throws Exception { // Arrange when(mockAmazonSQS.sendMessageBatch(any(SendMessageBatchRequest.class))).thenReturn(mock(SendMessageBatchResult.class)); ArgumentCaptor<SendMessageBatchRequest> captor = ArgumentCaptor.forClass(SendMessageBatchRequest.class); // Act messagePublisher.postBatch( Arrays.asList( new TestMessage("Hello"), new TestMessage("world") ), "subject" ); // Assert verify(mockAmazonSQS).sendMessageBatch(captor.capture()); SendMessageBatchRequest sendMessageBatchRequest = captor.getValue(); assertThat(sendMessageBatchRequest.getQueueUrl()).isEqualTo("queueUrl"); List<SendMessageBatchRequestEntry> entries = sendMessageBatchRequest.getEntries(); assertThat(entries.size()).isEqualTo(2); ObjectMapper mapper = new ObjectMapper(); AmazonSNSMessage msg1 = mapper.readValue(entries.get(0).getMessageBody(), AmazonSNSMessage.class); assertThat(msg1.getSubject()).isEqualTo("subject"); assertThat(msg1.getMessage()).isEqualTo("{\"message\":\"Hello\"}"); AmazonSNSMessage msg2 = mapper.readValue(entries.get(1).getMessageBody(), AmazonSNSMessage.class); assertThat(msg2.getSubject()).isEqualTo("subject"); assertThat(msg2.getMessage()).isEqualTo("{\"message\":\"world\"}"); }
@Override public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest) throws AmazonClientException { DirectorySQSQueue queue = getQueueFromUrl(sendMessageBatchRequest.getQueueUrl(), false); //lists for reporting List<BatchResultErrorEntry> batchResultErrorEntries = new ArrayList<>(); List<SendMessageBatchResultEntry> batchResultEntries = new ArrayList<>(); //attempt to change the visibility on each for (SendMessageBatchRequestEntry batchRequestEntry : sendMessageBatchRequest.getEntries()) { try { final int invisibilityDelay = Objects.firstNonNull(batchRequestEntry.getDelaySeconds(), 0);//0 is amazon spec default Message sentMessage = queue.send(batchRequestEntry.getMessageBody(), invisibilityDelay); batchResultEntries.add(new SendMessageBatchResultEntry(). withId(batchRequestEntry.getId()). withMessageId(sentMessage.getMessageId()). withMD5OfMessageBody(sentMessage.getMD5OfBody())); } catch (IOException e) { BatchResultErrorEntry batchResultErrorEntry = new BatchResultErrorEntry(). withSenderFault(false). withId(batchRequestEntry.getId()). withMessage(e.getMessage()); batchResultErrorEntries.add(batchResultErrorEntry); } } return new SendMessageBatchResult(). withFailed(batchResultErrorEntries). withSuccessful(batchResultEntries); }
@Test public void testBulkSendDelete_shouldWork() { // create queue CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue")); // send batch SendMessageBatchRequestEntry firstRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("one") .withMessageGroupId("groupee").withMessageBody("{\"XOXO\":234}"); SendMessageBatchRequestEntry secondRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("two") .withMessageGroupId("groupee").withMessageBody("{\"Quinoa\":\"Readymade\",\"vegan\":true}"); SendMessageBatchRequestEntry thirdRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("three") .withMessageGroupId("groupee").withMessageBody("{\"VHS\":\"street art slow-carb\"}"); // verify send batch result SendMessageBatchResult sendResult = sqs.sendMessageBatch(new SendMessageBatchRequest().withQueueUrl(createdQueue.getQueueUrl()) .withEntries(ImmutableList.of(firstRequest,secondRequest, thirdRequest))); assertNotNull("verify that batch send returned ok", sendResult); assertTrue("no request failed",sendResult.getFailed().isEmpty()); assertEquals("verify successfull message count", 3, sendResult.getSuccessful().size()); SendMessageBatchResultEntry firstResultEntry = sendResult.getSuccessful().stream().filter(msg -> msg.getId().equals("one")).findAny().get(); assertEquals("verify correct message MD5",getAwsMessageMD5("{\"XOXO\":234}"),firstResultEntry.getMD5OfMessageBody()); assertNotNull("verify message id exists",firstResultEntry.getMessageId()); ReceiveMessageResult receivedMessagesResult = sqs.receiveMessage(new ReceiveMessageRequest().withQueueUrl(createdQueue.getQueueUrl()).withMaxNumberOfMessages(4)); // delete batch List<DeleteMessageBatchRequestEntry> deleteRequests = new ArrayList<>(); deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("one").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle())); deleteRequests.add(new 
DeleteMessageBatchRequestEntry().withId("two").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle())); deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("three").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle())); DeleteMessageBatchResult deleteBatchResult = sqs.deleteMessageBatch(new DeleteMessageBatchRequest().withQueueUrl(createdQueue.getQueueUrl()).withEntries(deleteRequests)); // verify delete batch result assertNotNull("verify that batch delete returned ok", deleteBatchResult); assertTrue("no request failed",deleteBatchResult.getFailed().isEmpty()); assertEquals("verify successfull message count", 3, deleteBatchResult.getSuccessful().size()); assertTrue("queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getMessageQueue().isEmpty()); for(Message message : receivedMessagesResult.getMessages()) { assertTrue("invisibility-queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getInvisibilityQueueFor(message.getReceiptHandle()).isEmpty()); } // cleanup getQueues().remove("tea-earl-grey-queue"); }
/**
 * Starts an asynchronous batch send on the SQS client and exposes the pending result as an
 * {@code Observable}.
 *
 * @param queueUrl URL of the target queue
 * @param entries  the batch entries to send
 * @return an observable created from the client's pending result
 */
public Observable<SendMessageBatchResult> sendMessageBatchAsync(String queueUrl,
        List<SendMessageBatchRequestEntry> entries) {
    java.util.concurrent.Future<SendMessageBatchResult> pendingResult =
            sqsClient.sendMessageBatchAsync(queueUrl, entries);
    return Observable.from(pendingResult);
}
/**
 * Tells whether a batch entry exceeds the configured message size threshold.
 * <p>
 * The entry's size is the size of its message attributes plus the byte size of its body.
 *
 * @param batchEntry the entry to measure
 * @return {@code true} when the combined size is strictly greater than the threshold
 */
private boolean isLarge(SendMessageBatchRequestEntry batchEntry) {
    int attributeBytes = getMsgAttributesSize(batchEntry.getMessageAttributes());
    long bodyBytes = getStringSizeInBytes(batchEntry.getMessageBody());
    long combinedBytes = attributeBytes + bodyBytes;
    if (combinedBytes > clientConfiguration.getMessageSizeThreshold()) {
        return true;
    }
    return false;
}
public void batchSend(List<SendMessageBatchRequestEntry> entries){ try { // Send batch messages //System.out.println("\nSending a message to jobQueue.\n"); SendMessageBatchRequest batchRequest = new SendMessageBatchRequest().withQueueUrl(queueUrl); batchRequest.setEntries(entries); SendMessageBatchResult batchResult = sqs.sendMessageBatch(batchRequest); // sendMessageBatch can return successfully, and yet individual batch // items fail. So, make sure to retry the failed ones. if (!batchResult.getFailed().isEmpty()) { //System.out.println("Retry sending failed messages..."); List<SendMessageBatchRequestEntry> failedEntries = new ArrayList<SendMessageBatchRequestEntry>(); Iterator<SendMessageBatchRequestEntry> iter = entries.iterator(); while(iter.hasNext()){ if(batchResult.getFailed().contains(iter.next())){ failedEntries.add((SendMessageBatchRequestEntry) iter.next()); } } batchRequest.setEntries(failedEntries); sqs.sendMessageBatch(batchRequest); } } catch (AmazonServiceException ase) { System.out.println("Caught an AmazonServiceException, which means your request made it " + "to Amazon SQS, but was rejected with an error response for some reason."); System.out.println("Error Message: " + ase.getMessage()); System.out.println("HTTP Status Code: " + ase.getStatusCode()); System.out.println("AWS Error Code: " + ase.getErrorCode()); System.out.println("Error Type: " + ase.getErrorType()); System.out.println("Request ID: " + ase.getRequestId()); } catch (AmazonClientException ace) { System.out.println("Caught an AmazonClientException, which means the client encountered " + "a serious internal problem while trying to communicate with SQS, such as not " + "being able to access the network."); System.out.println("Error Message: " + ace.getMessage()); } }
/**
 * <p>
 * Delivers up to ten messages to the specified queue. This is a batch
 * version of SendMessage, and the outcome of the send action is reported
 * individually for each message in the response. An individual message
 * may be at most 256 KB (262,144 bytes).
 * </p>
 * <p>
 * The total payload of a batch (i.e., the sum of the lengths of all its
 * individual messages) is likewise limited to 256 KB (262,144 bytes).
 * </p>
 * <p>
 * When an entry does not specify the <code>DelaySeconds</code> parameter,
 * the queue's default is used.
 * </p>
 * <p>
 * This overload forwards the call unchanged to the wrapped
 * <code>amazonSqsToBeExtended</code> client.
 * </p>
 * <p>
 * <b>IMPORTANT:</b> The following list shows the characters (in Unicode)
 * that are allowed in your message, according to the W3C XML specification.
 * For more information, go to http://www.w3.org/TR/REC-xml/#charsets. If
 * you send any characters that are not included in the list, your request
 * will be rejected. #x9 | #xA | #xD | [#x20 to #xD7FF] | [#xE000 to #xFFFD] |
 * [#x10000 to #x10FFFF]
 * </p>
 * <p>
 * <b>IMPORTANT:</b> Because the batch request can result in a combination
 * of successful and unsuccessful actions, you should check for batch errors
 * even when the call returns an HTTP status code of 200.
 * </p>
 * <p>
 * <b>NOTE:</b> Some API actions take lists of parameters. These lists are
 * specified using the param.n notation, where the values of n are integers
 * starting from 1. For example, a parameter list with two elements looks
 * like this:
 * </p>
 * <p>
 * <code>&amp;Attribute.1=this</code>
 * </p>
 * <p>
 * <code>&amp;Attribute.2=that</code>
 * </p>
 *
 * @param queueUrl
 *            The URL of the Amazon SQS queue to take action on.
 * @param entries
 *            A list of <a>SendMessageBatchRequestEntry</a> items.
 *
 * @return The response from the SendMessageBatch service method, as
 *         returned by AmazonSQS.
 *
 * @throws BatchEntryIdsNotDistinctException
 * @throws TooManyEntriesInBatchRequestException
 * @throws BatchRequestTooLongException
 * @throws UnsupportedOperationException
 * @throws InvalidBatchEntryIdException
 * @throws EmptyBatchRequestException
 *
 * @throws AmazonClientException
 *             If any internal errors are encountered inside the client
 *             while attempting to make the request or handle the response.
 *             For example if a network connection is not available.
 * @throws AmazonServiceException
 *             If an error response is returned by AmazonSQS indicating
 *             either a problem with the data in the request, or a server
 *             side issue.
 */
public SendMessageBatchResult sendMessageBatch(String queueUrl, List<SendMessageBatchRequestEntry> entries)
        throws AmazonServiceException, AmazonClientException {
    final SendMessageBatchResult delegateResult = amazonSqsToBeExtended.sendMessageBatch(queueUrl, entries);
    return delegateResult;
}
/**
 * <p>
 * Delivers up to ten messages to the specified queue. This is a batch
 * version of SendMessage, and the outcome of the send action is reported
 * individually for each message in the response. Uploads message payloads
 * to Amazon S3 when necessary.
 * </p>
 * <p>
 * When an entry does not specify the <code>DelaySeconds</code> parameter,
 * the queue's default is used.
 * </p>
 * <p>
 * This is a convenience overload: it wraps its arguments in a
 * <code>SendMessageBatchRequest</code> and delegates to
 * <code>sendMessageBatch(SendMessageBatchRequest)</code>.
 * </p>
 * <p>
 * <b>IMPORTANT:</b> The following list shows the characters (in Unicode)
 * that are allowed in your message, according to the W3C XML specification.
 * For more information, go to http://www.w3.org/TR/REC-xml/#charsets. If
 * you send any characters that are not included in the list, your request
 * will be rejected. #x9 | #xA | #xD | [#x20 to #xD7FF] | [#xE000 to #xFFFD] |
 * [#x10000 to #x10FFFF]
 * </p>
 * <p>
 * <b>IMPORTANT:</b> Because the batch request can result in a combination
 * of successful and unsuccessful actions, you should check for batch errors
 * even when the call returns an HTTP status code of 200.
 * </p>
 * <p>
 * <b>NOTE:</b> Some API actions take lists of parameters. These lists are
 * specified using the param.n notation, where the values of n are integers
 * starting from 1. For example, a parameter list with two elements looks
 * like this:
 * </p>
 * <p>
 * <code>&amp;Attribute.1=this</code>
 * </p>
 * <p>
 * <code>&amp;Attribute.2=that</code>
 * </p>
 *
 * @param queueUrl
 *            The URL of the Amazon SQS queue to take action on.
 * @param entries
 *            A list of <a>SendMessageBatchRequestEntry</a> items.
 *
 * @return The response from the SendMessageBatch service method, as
 *         returned by AmazonSQS.
 *
 * @throws BatchEntryIdsNotDistinctException
 * @throws TooManyEntriesInBatchRequestException
 * @throws BatchRequestTooLongException
 * @throws UnsupportedOperationException
 * @throws InvalidBatchEntryIdException
 * @throws EmptyBatchRequestException
 *
 * @throws AmazonClientException
 *             If any internal errors are encountered inside the client
 *             while attempting to make the request or handle the response.
 *             For example if a network connection is not available.
 * @throws AmazonServiceException
 *             If an error response is returned by AmazonSQS indicating
 *             either a problem with the data in the request, or a server
 *             side issue.
 */
public SendMessageBatchResult sendMessageBatch(String queueUrl, List<SendMessageBatchRequestEntry> entries) {
    return sendMessageBatch(new SendMessageBatchRequest(queueUrl, entries));
}
/**
 * Submits a batch of messages to SQS. The messages may be destined for different queues:
 * they are first categorized into batches according to the queue they are bound for
 * (see {@code createBatchesForQueues}), and the resulting batches are then handed to
 * {@code submitBatches}, which sends each batch serially.
 *
 * Failures will be written to the error log.
 *
 * @param messages - The messages to submit, which can be destined for one or more queues
 */
@Override
public void submitBatch(final List<Message> messages) {
    this.submitBatches(this.createBatchesForQueues(messages));
}
/**
 * Convenience form of the <code>SendMessages</code> action that takes the
 * batch entries directly.
 *
 * @param entries the batch entries to send
 * @return the result of the batch send
 * @see #sendMessages(SendMessageBatchRequest)
 */
SendMessageBatchResult sendMessages(List<SendMessageBatchRequestEntry> entries);
/**
 * Convenience form of the <code>SendMessages</code> action that takes the
 * batch entries directly, together with a result capture.
 *
 * @param entries the batch entries to send
 * @param extractor NOTE(review): presumably receives the
 *            <code>SendMessageBatchResult</code> produced by the call -
 *            confirm against the linked overload
 * @return the result of the batch send
 * @see #sendMessages(SendMessageBatchRequest, ResultCapture)
 */
SendMessageBatchResult sendMessages(List<SendMessageBatchRequestEntry> entries, ResultCapture<SendMessageBatchResult> extractor);