/**
 * Creates a Mockito mock of {@link SendMessageBatchResult} describing a partially successful batch.
 *
 * <p>Entry ids are the 1-based positions rendered as strings: ids {@code 1..expectedSuccessCount}
 * are returned by {@code getSuccessful()}, and ids {@code expectedSuccessCount+1..batchSize} are
 * returned by {@code getFailed()} as sender-fault errors with code {@code "401"}.
 *
 * @param batchSize            total number of entries in the simulated batch
 * @param expectedSuccessCount number of entries reported as successful
 * @return the stubbed mock result
 */
private SendMessageBatchResult mockBatchResult(int batchSize, int expectedSuccessCount) {
    // Build the successful half of the result first.
    List<SendMessageBatchResultEntry> succeeded = new ArrayList<SendMessageBatchResultEntry>();
    for (int index = 0; index < expectedSuccessCount; index++) {
        String entryId = String.valueOf(index + 1);
        SendMessageBatchResultEntry okEntry = new SendMessageBatchResultEntry().withId(entryId);
        succeeded.add(okEntry);
    }

    // Everything after the successful entries, up to batchSize, is reported as failed.
    List<BatchResultErrorEntry> rejected = new ArrayList<BatchResultErrorEntry>();
    for (int index = expectedSuccessCount; index < batchSize; index++) {
        String entryId = String.valueOf(index + 1);
        BatchResultErrorEntry errorEntry = new BatchResultErrorEntry()
                .withId(entryId)
                .withCode("401")
                .withSenderFault(true)
                .withMessage("Invalid binary character");
        rejected.add(errorEntry);
    }

    // Stub the mock only once both lists are complete.
    SendMessageBatchResult stubbedResult = Mockito.mock(SendMessageBatchResult.class);
    when(stubbedResult.getSuccessful()).thenReturn(succeeded);
    when(stubbedResult.getFailed()).thenReturn(rejected);
    return stubbedResult;
}
/**
 * Exercises the queue resource end to end: looks up a message sub resource by handle, sends one
 * message through the batch operation and reads it back, then sets a queue attribute and checks
 * that it is present when the attributes are retrieved.
 */
@Test
@Ignore
public void testQueueSubResourceAndAttributes() throws InterruptedException {
    // Per the original author's note: looking a message up by handle makes no service call and
    // simply creates a new sub resource for that handle, so the result is expected to be non-null
    // even though no such message exists.
    Message placeholder = queue.getMessage("invalid-recepient-handle");
    assertNotNull(placeholder);

    try {
        placeholder.getAttributes();
        fail("An unsupported operation exception must be thrown as load operation is no supported on message attribute");
    } catch (UnsupportedOperationException expected) {
        // Expected: this is the outcome the test is checking for.
    }

    // Send a single message using the batch API.
    SendMessageBatchRequestEntry outgoingEntry = new SendMessageBatchRequestEntry("msg1", TEST_MESSAGE);
    SendMessageBatchRequest batchRequest = new SendMessageBatchRequest().withEntries(outgoingEntry);
    SendMessageBatchResult batchResult = queue.sendMessages(batchRequest);
    SendMessageBatchResultEntry sentEntry = batchResult.getSuccessful().get(0);

    // Read it back and compare against what the send call reported.
    List<Message> received = waitForMessagesFromQueue(null);
    assertNotNull(received);
    assertEquals(1, received.size());
    Message delivered = received.get(0);
    assertMessage(TEST_MESSAGE, sentEntry.getMessageId(), sentEntry.getMD5OfMessageBody(), delivered);

    // Set an attribute and verify the key shows up when the attributes are fetched.
    queue.setAttributes(ImmutableMapParameter.of("MaximumMessageSize", "2048"));
    assertTrue(queue.getAttributes().containsKey("MaximumMessageSize"));
}
@Override public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest) throws AmazonClientException { DirectorySQSQueue queue = getQueueFromUrl(sendMessageBatchRequest.getQueueUrl(), false); //lists for reporting List<BatchResultErrorEntry> batchResultErrorEntries = new ArrayList<>(); List<SendMessageBatchResultEntry> batchResultEntries = new ArrayList<>(); //attempt to change the visibility on each for (SendMessageBatchRequestEntry batchRequestEntry : sendMessageBatchRequest.getEntries()) { try { final int invisibilityDelay = Objects.firstNonNull(batchRequestEntry.getDelaySeconds(), 0);//0 is amazon spec default Message sentMessage = queue.send(batchRequestEntry.getMessageBody(), invisibilityDelay); batchResultEntries.add(new SendMessageBatchResultEntry(). withId(batchRequestEntry.getId()). withMessageId(sentMessage.getMessageId()). withMD5OfMessageBody(sentMessage.getMD5OfBody())); } catch (IOException e) { BatchResultErrorEntry batchResultErrorEntry = new BatchResultErrorEntry(). withSenderFault(false). withId(batchRequestEntry.getId()). withMessage(e.getMessage()); batchResultErrorEntries.add(batchResultErrorEntry); } } return new SendMessageBatchResult(). withFailed(batchResultErrorEntries). withSuccessful(batchResultEntries); }
@Test public void testBulkSendDelete_shouldWork() { // create queue CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue")); // send batch SendMessageBatchRequestEntry firstRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("one") .withMessageGroupId("groupee").withMessageBody("{\"XOXO\":234}"); SendMessageBatchRequestEntry secondRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("two") .withMessageGroupId("groupee").withMessageBody("{\"Quinoa\":\"Readymade\",\"vegan\":true}"); SendMessageBatchRequestEntry thirdRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("three") .withMessageGroupId("groupee").withMessageBody("{\"VHS\":\"street art slow-carb\"}"); // verify send batch result SendMessageBatchResult sendResult = sqs.sendMessageBatch(new SendMessageBatchRequest().withQueueUrl(createdQueue.getQueueUrl()) .withEntries(ImmutableList.of(firstRequest,secondRequest, thirdRequest))); assertNotNull("verify that batch send returned ok", sendResult); assertTrue("no request failed",sendResult.getFailed().isEmpty()); assertEquals("verify successfull message count", 3, sendResult.getSuccessful().size()); SendMessageBatchResultEntry firstResultEntry = sendResult.getSuccessful().stream().filter(msg -> msg.getId().equals("one")).findAny().get(); assertEquals("verify correct message MD5",getAwsMessageMD5("{\"XOXO\":234}"),firstResultEntry.getMD5OfMessageBody()); assertNotNull("verify message id exists",firstResultEntry.getMessageId()); ReceiveMessageResult receivedMessagesResult = sqs.receiveMessage(new ReceiveMessageRequest().withQueueUrl(createdQueue.getQueueUrl()).withMaxNumberOfMessages(4)); // delete batch List<DeleteMessageBatchRequestEntry> deleteRequests = new ArrayList<>(); deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("one").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle())); deleteRequests.add(new 
DeleteMessageBatchRequestEntry().withId("two").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle())); deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("three").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle())); DeleteMessageBatchResult deleteBatchResult = sqs.deleteMessageBatch(new DeleteMessageBatchRequest().withQueueUrl(createdQueue.getQueueUrl()).withEntries(deleteRequests)); // verify delete batch result assertNotNull("verify that batch delete returned ok", deleteBatchResult); assertTrue("no request failed",deleteBatchResult.getFailed().isEmpty()); assertEquals("verify successfull message count", 3, deleteBatchResult.getSuccessful().size()); assertTrue("queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getMessageQueue().isEmpty()); for(Message message : receivedMessagesResult.getMessages()) { assertTrue("invisibility-queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getInvisibilityQueueFor(message.getReceiptHandle()).isEmpty()); } // cleanup getQueues().remove("tea-earl-grey-queue"); }