@Test public void testPurgeQueue_shouldRemoveAll() { // create queue CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue")); // send messages String messageBody = "{\"life-universe-everything\":42}"; sqs.sendMessage(new SendMessageRequest().withDelaySeconds(0).withMessageBody(messageBody) .withMessageGroupId("some-group-id-123").withQueueUrl(createdQueue.getQueueUrl())); String messageBody2 = "{\"dead-emptyness-nothing\":24}"; sqs.sendMessage(new SendMessageRequest().withDelaySeconds(0).withMessageBody(messageBody2) .withMessageGroupId("some-group-id-123").withQueueUrl(createdQueue.getQueueUrl())); // purge queues PurgeQueueResult result = sqs.purgeQueue(new PurgeQueueRequest().withQueueUrl(createdQueue.getQueueUrl())); assertNotNull("verify that purge queue returned ok", result); // verify empty queue ReceiveMessageResult messageResult = sqs.receiveMessage(new ReceiveMessageRequest() .withMaxNumberOfMessages(9).withQueueUrl(createdQueue.getQueueUrl()).withVisibilityTimeout(10) .withWaitTimeSeconds(0)); assertEquals("verify that queue is empty", 0, messageResult.getMessages().size()); // cleanup getQueues().remove("tea-earl-grey-queue"); }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsOneMessage() { final AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); final String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .take(1) // .test() // .awaitDone(10, TimeUnit.SECONDS) // .assertComplete() // .assertValue("body1"); final InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(1)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsOneMessageAndHonoursBackpressure() { final AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); final String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .test(0) // .requestMore(1) // .assertValue("body1")// .assertNotComplete() // .cancel(); final InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(1)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsNoMessagesThenSecondCallReturnsTwoMessages() { final AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); final String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())).thenReturn(new ReceiveMessageResult()) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"), new Message().withBody("body2"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .take(2) // .test() // .awaitDone(10, TimeUnit.SECONDS) // .assertComplete() // .assertValues("body1", "body2"); final InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(2)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
/**
 * Repeatedly receives from the queue until a receive returns no messages (or
 * {@code null}) and returns all spans decoded from the received messages.
 *
 * <p>When {@code delete} is set, each batch is deleted right after it has been
 * added to the result and before the next batch is fetched. Previously the
 * deletion was applied to the <em>next</em> batch, so the first batch was never
 * deleted and a {@code null} follow-up result caused a NullPointerException.
 *
 * @param delete whether received messages should be deleted from the queue
 * @return the spans decoded from every received message, in receive order
 */
public List<Span> getSpans(boolean delete) {
    Stream<Span> spans = Stream.empty();

    ReceiveMessageResult result = client.receiveMessage(queueUrl);
    while (result != null && !result.getMessages().isEmpty()) {
        // Decoding is lazy: it runs when the stream is collected below.
        spans = Stream.concat(spans, result.getMessages().stream().flatMap(AmazonSQSRule::decodeSpans));

        if (delete) {
            // Delete the batch we just consumed, not the one we are about to fetch.
            result.getMessages().forEach(
                    m -> client.deleteMessage(new DeleteMessageRequest(queueUrl, m.getReceiptHandle())));
        }

        result = client.receiveMessage(queueUrl);
    }

    return spans.collect(Collectors.toList());
}
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsOneMessage() { AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .take(1) // .to(test()) // .awaitTerminalEvent() // .assertCompleted() // .assertValue("body1"); InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(1)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsOneMessageAndHonoursBackpressure() { AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .to(testWithRequest(0)) // .requestMore(1) // .assertValue("body1")// .assertNotCompleted() // .unsubscribe(); InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(1)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsNoMessagesThenSecondCallReturnsTwoMessages() { AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())).thenReturn(new ReceiveMessageResult()) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"), new Message().withBody("body2"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .take(2) // .to(test()) // .awaitTerminalEvent() // .assertCompleted() // .assertValues("body1", "body2"); InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(2)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
/**
 * Removes up to the requested number of messages from the head of
 * {@code messages} and returns them. When the request carries no maximum,
 * every queued message is returned. Each returned message is passed to
 * {@code scheduleCancelInflight} together with the request's queue URL.
 *
 * @param receiveMessageRequest request supplying the optional maximum and the queue URL
 * @return a result holding the removed messages (possibly none)
 */
@Override
public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageRequest)
        throws AmazonServiceException, AmazonClientException {
    final Integer requestedMax = receiveMessageRequest.getMaxNumberOfMessages();
    final int limit = (requestedMax == null) ? Integer.MAX_VALUE : requestedMax;

    final Collection<Message> delivered = new ArrayList<Message>();
    synchronized (messages) {
        final Iterator<Message> pending = messages.iterator();
        while (pending.hasNext() && delivered.size() < limit) {
            final Message next = pending.next();
            delivered.add(next);
            pending.remove();
            scheduleCancelInflight(receiveMessageRequest.getQueueUrl(), next);
        }
    }

    final ReceiveMessageResult result = new ReceiveMessageResult();
    result.setMessages(delivered);
    return result;
}
@Test public void messageShouldBeProcessedAfterBeingConsumed() throws Exception { //GIVEN ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(); Message message1 = new Message() .withMessageId("aaaa-bbbb-cccc-dddd-eeee") .withBody("Sample test message"); Message message2 = new Message() .withMessageId("ffff-gggg-hhhh-iiii-jjjj") .withBody("Another sample test message"); receiveMessageResult.setMessages(Lists.newArrayList(message1, message2)); when(sqs.receiveMessage((ReceiveMessageRequest) anyObject())).thenReturn(receiveMessageResult, new ReceiveMessageResult()); //WHEN receiverHandler.start(); //THEN Thread.sleep(1000); verify(receiver, times(2)).receive(any()); verify(receiver, times(1)).receive(message1); verify(receiver, times(1)).receive(message2); }
/**
 * Round trip: create a queue, look up its URL, send one message, and receive
 * it back with the same body.
 */
@Test
public void sendAndReceiveMessage() {
    final String queueName = "bizo";
    final String messageBody = "hi everybody";
    final int maxNumberOfMessages = 10;

    sqs.createQueue(new CreateQueueRequest().withQueueName(queueName));

    final String queueUrl = sqs.getQueueUrl(new GetQueueUrlRequest().withQueueName(queueName)).getQueueUrl();

    sqs.sendMessage(new SendMessageRequest().withQueueUrl(queueUrl).withMessageBody(messageBody));

    final ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest()
            .withQueueUrl(queueUrl)
            .withMaxNumberOfMessages(maxNumberOfMessages);
    final List<Message> received = sqs.receiveMessage(receiveRequest).getMessages();

    assertThat(received.size(), equalTo(1));
    assertThat(received.get(0).getBody(), equalTo(messageBody));
}
/**
 * Requests at most one message from the queue, converts it, deletes it from
 * the queue, and returns the converted message.
 *
 * @param timeout narrowed to {@code int} and passed as the request's wait-time seconds
 * @return the converted message, or {@code null} when the receive returned no messages
 */
@Override
public Message<String> receive(long timeout) {
    ReceiveMessageRequest request = new ReceiveMessageRequest(this.queueUrl)
            .withMaxNumberOfMessages(1)
            .withWaitTimeSeconds((int) timeout)
            .withAttributeNames(ATTRIBUTE_NAMES)
            .withMessageAttributeNames(MESSAGE_ATTRIBUTE_NAMES);
    ReceiveMessageResult received = this.amazonSqs.receiveMessage(request);

    if (!received.getMessages().isEmpty()) {
        com.amazonaws.services.sqs.model.Message rawMessage = received.getMessages().get(0);
        Message<String> converted = createMessage(rawMessage);
        // Delete only after the message has been converted successfully.
        this.amazonSqs.deleteMessage(new DeleteMessageRequest(this.queueUrl, rawMessage.getReceiptHandle()));
        return converted;
    }
    return null;
}
@Test public void receiveMessage_withoutTimeout_returnsTextMessage() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages( Collections.singleton(new com.amazonaws.services.sqs.model.Message().withBody("content")))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); //Act Message<?> receivedMessage = messageChannel.receive(); //Assert assertNotNull(receivedMessage); assertEquals("content", receivedMessage.getPayload()); }
@Test public void receiveMessage_withSpecifiedTimeout_returnsTextMessage() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(2). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages( Collections.singleton(new com.amazonaws.services.sqs.model.Message().withBody("content")))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); //Act Message<?> receivedMessage = messageChannel.receive(2); //Assert assertNotNull(receivedMessage); assertEquals("content", receivedMessage.getPayload()); }
@Test public void receiveMessage_withSpecifiedTimeout_returnsNull() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(2). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages( Collections.emptyList())); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); //Act Message<?> receivedMessage = messageChannel.receive(2); //Assert assertNull(receivedMessage); }
@Test public void receiveMessage_withoutDefaultTimeout_returnsNull() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages( Collections.emptyList())); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); //Act Message<?> receivedMessage = messageChannel.receive(0); //Assert assertNull(receivedMessage); }
@Test public void receiveMessage_withMimeTypeMessageAttribute_shouldCopyToHeaders() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); MimeType mimeType = new MimeType("test", "plain", Charset.forName("UTF-8")); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, new MessageAttributeValue().withDataType(MessageAttributeDataTypes.STRING).withStringValue(mimeType.toString()))))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act Message<?> receivedMessage = messageChannel.receive(); // Assert assertEquals(mimeType, receivedMessage.getHeaders().get(MessageHeaders.CONTENT_TYPE)); }
@Test public void receiveMessage_withStringMessageHeader_shouldBeReceivedAsQueueMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); String headerValue = "Header value"; String headerName = "MyHeader"; when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(Collections.singletonMap(headerName, new MessageAttributeValue().withDataType(MessageAttributeDataTypes.STRING).withStringValue(headerValue))))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act Message<?> receivedMessage = messageChannel.receive(); // Assert assertEquals(headerValue, receivedMessage.getHeaders().get(headerName)); }
@Test public void receiveMessage_withIncompatibleNumericMessageHeader_shouldThrowAnException() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); this.expectedException.expect(IllegalArgumentException.class); this.expectedException.expectMessage("Cannot convert String [17] to target class [java.util.concurrent.atomic.AtomicInteger]"); HashMap<String, MessageAttributeValue> messageAttributes = new HashMap<>(); AtomicInteger atomicInteger = new AtomicInteger(17); messageAttributes.put("atomicInteger", new MessageAttributeValue().withDataType(MessageAttributeDataTypes.NUMBER + ".java.util.concurrent.atomic.AtomicInteger").withStringValue(String.valueOf(atomicInteger))); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(messageAttributes))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act messageChannel.receive(); }
@Test public void receiveMessage_withMissingNumericMessageHeaderTargetClass_shouldThrowAnException() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); this.expectedException.expect(MessagingException.class); this.expectedException.expectMessage("Message attribute with value '12' and data type 'Number.class.not.Found' could not be converted" + " into a Number because target class was not found."); HashMap<String, MessageAttributeValue> messageAttributes = new HashMap<>(); messageAttributes.put("classNotFound", new MessageAttributeValue().withDataType(MessageAttributeDataTypes.NUMBER + ".class.not.Found").withStringValue("12")); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(messageAttributes))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act messageChannel.receive(); }
@Test public void receiveMessage_withBinaryMessageHeader_shouldBeReceivedAsByteBufferMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); ByteBuffer headerValue = ByteBuffer.wrap("My binary data!".getBytes()); String headerName = "MyHeader"; when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(Collections.singletonMap(headerName, new MessageAttributeValue().withDataType(MessageAttributeDataTypes.BINARY).withBinaryValue(headerValue))))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act Message<?> receivedMessage = messageChannel.receive(); // Assert assertEquals(headerValue, receivedMessage.getHeaders().get(headerName)); }
@Test public void receiveMessage_withIdOfTypeString_IdShouldBeConvertedToUuid() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); UUID uuid = UUID.randomUUID(); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(Collections.singletonMap(MessageHeaders.ID, new MessageAttributeValue().withDataType(MessageAttributeDataTypes.STRING).withStringValue(uuid.toString()))))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act Message<?> receivedMessage = messageChannel.receive(); // Assert Object idMessageHeader = receivedMessage.getHeaders().get(MessageHeaders.ID); assertTrue(UUID.class.isInstance(idMessageHeader)); assertEquals(uuid, idMessageHeader); }
/**
 * Polls one message, deletes it, and checks that a second poll, stubbed to
 * return a bare mock result, yields nothing.
 */
@Test
public void pollAndDeleteMessageShouldWork() throws Exception {
    Message stubMessage = mock(Message.class);
    when(stubMessage.getBody()).thenReturn("{}");

    ReceiveMessageResult firstResult = mock(ReceiveMessageResult.class);
    when(firstResult.getMessages()).thenReturn(Arrays.asList(stubMessage));
    when(mockAmazonSQS.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(firstResult);

    List<PolledMessage<TestMessage>> firstPoll = queueServicePoller.poll();
    assertThat(firstPoll).hasSize(1);

    // From now on the client answers with an unstubbed mock result.
    ReceiveMessageResult unstubbedResult = mock(ReceiveMessageResult.class);
    when(mockAmazonSQS.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(unstubbedResult);

    queueServicePoller.delete(firstPoll.get(0));

    List<PolledMessage<TestMessage>> secondPoll = queueServicePoller.poll();
    assertThat(secondPoll).isEmpty();
}
/**
 * Polls two messages, deletes both, and checks that a second poll, stubbed to
 * return a bare mock result, yields nothing.
 */
@Test
public void deleteBatchMessagesShouldWork() throws Exception {
    Message stubMessage = mock(Message.class);
    when(stubMessage.getBody()).thenReturn("{}");

    ReceiveMessageResult firstResult = mock(ReceiveMessageResult.class);
    when(firstResult.getMessages()).thenReturn(Arrays.asList(stubMessage, stubMessage));
    when(mockAmazonSQS.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(firstResult);

    List<PolledMessage<TestMessage>> firstPoll = queueServicePoller.poll();
    assertEquals(2, firstPoll.size());

    // From now on the client answers with an unstubbed mock result.
    ReceiveMessageResult unstubbedResult = mock(ReceiveMessageResult.class);
    when(mockAmazonSQS.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(unstubbedResult);

    queueServicePoller.delete(firstPoll.get(0));
    queueServicePoller.delete(firstPoll.get(1));

    List<PolledMessage<TestMessage>> secondPoll = queueServicePoller.poll();
    assertEquals(0, secondPoll.size());
}
@Override public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageRequest) throws AmazonClientException { DirectorySQSQueue queue = getQueueFromUrl(receiveMessageRequest.getQueueUrl(), false); //make sure we have a default for max number of messages. int maxNumberOfMessages = Objects.firstNonNull(receiveMessageRequest.getMaxNumberOfMessages(), 10); //10 is amazon spec default //and a default visibility timeout int visibilityTimeout = Objects.firstNonNull(receiveMessageRequest.getVisibilityTimeout(), _defaultVisibilitySeconds); //also a wait time int waitTime = Objects.firstNonNull(receiveMessageRequest.getWaitTimeSeconds(), 0); if (waitTime < 0 || waitTime > 20) { throw new AmazonServiceException("wait time of " + waitTime + " is not between 0 and 20"); } try { List<Message> messageList = queue.receive(maxNumberOfMessages, visibilityTimeout, waitTime); return new ReceiveMessageResult().withMessages(messageList); } catch (IOException e) { throw new AmazonServiceException("error reading messages from " + queue.getQueuePath().toUri().toString(), e); } }
/**
 * The second client first receives nothing from an empty queue, then receives
 * each of two messages that the first client sends one after the other.
 */
public void client2CanReceiveTwiceAfterInitialEmpty() {
    final String queueUrl = someNewQueue();

    // empty queue: nothing to receive yet
    final ReceiveMessageRequest emptyPoll = new ReceiveMessageRequest(queueUrl)
            .withWaitTimeSeconds(1)
            .withMaxNumberOfMessages(1);
    final ReceiveMessageResult emptyResult = _sqs2.receiveMessage(emptyPoll);
    Assert.assertEquals(emptyResult.getMessages().size(), 0);

    // first send / receive
    _sqs1.sendMessage(new SendMessageRequest(queueUrl, someMessageBody()));
    final ReceiveMessageRequest firstPoll = new ReceiveMessageRequest(queueUrl)
            .withWaitTimeSeconds(10)
            .withMaxNumberOfMessages(1)
            .withVisibilityTimeout(60);
    final ReceiveMessageResult firstResult = _sqs2.receiveMessage(firstPoll);
    Assert.assertEquals(firstResult.getMessages().size(), 1, "first receive failed");

    // second send / receive
    _sqs1.sendMessage(new SendMessageRequest(queueUrl, someMessageBody()));
    final ReceiveMessageRequest secondPoll = new ReceiveMessageRequest(queueUrl)
            .withWaitTimeSeconds(20)
            .withMaxNumberOfMessages(1);
    final ReceiveMessageResult secondResult = _sqs2.receiveMessage(secondPoll);
    Assert.assertEquals(secondResult.getMessages().size(), 1, "second receive failed");
}
/**
 * Each client sends one message to the same queue; the first client must then
 * be able to receive one message in each of two consecutive receive calls.
 */
public void client1GetsFromBoth() {
    final String queueUrl = someNewQueue();

    _sqs1.sendMessage(new SendMessageRequest(queueUrl, someMessageBody()));
    _sqs2.sendMessage(new SendMessageRequest(queueUrl, someMessageBody()));

    final ReceiveMessageRequest firstPoll = new ReceiveMessageRequest(queueUrl)
            .withMaxNumberOfMessages(1)
            .withWaitTimeSeconds(20);
    final ReceiveMessageResult firstResult = _sqs1.receiveMessage(firstPoll);
    Assert.assertEquals(firstResult.getMessages().size(), 1);

    final ReceiveMessageRequest secondPoll = new ReceiveMessageRequest(queueUrl)
            .withWaitTimeSeconds(20);
    final ReceiveMessageResult secondResult = _sqs1.receiveMessage(secondPoll);
    Assert.assertEquals(secondResult.getMessages().size(), 1);
}
/**
 * Publishes to a topic through an SNS instance built on the first SQS client
 * and checks that the second SQS client receives exactly that message from the
 * subscribed queue.
 */
public void publishAndReceiveSeparateSQSClients() {
    final String topicName = "publishAndReceiveSeparateSQSClients";
    final String message = "hi from " + topicName;
    final String queueName = someQueueName();
    final String queueUrl = someNewQueue(queueName);

    Subscription queueSubscription = new Subscription()
            .withTopicArn(makeTopicArn(topicName))
            .withProtocol("sqs")
            .withSubscriptionArn(makeSomeSubArn(topicName))
            .withEndpoint(getQueueArn(queueName));
    AmazonSNS amazonSNS = new InMemorySNS(_amazonSQS1, queueSubscription);

    amazonSNS.publish(new PublishRequest(makeTopicArn(topicName), message));

    ReceiveMessageRequest poll = new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(15);
    ReceiveMessageResult result = _amazonSQS2.receiveMessage(poll);

    Assert.assertEquals(result.getMessages().size(), 1);
    Assert.assertEquals(result.getMessages().get(0).getBody(), message);
}
@Test public void shouldCreateSNSAndSQSPlusPolicyAsNeeded() throws MissingArgumentException, NotReadyException, FailedToCreateQueueException, InterruptedException { eventSource.init(); String existingSNSARN = eventSource.getSNSArn(); // reset the queue, sns and subscription (this forces policy recreation as well) String sub = eventSource.getARNofSQSSubscriptionToSNS(); if (sub!=null) { snsClient.unsubscribe(sub); } snsClient.deleteTopic(existingSNSARN); sqsClient.deleteQueue(eventSource.getQueueURL()); // now recreate the source and make sure we can send/receive SNSEventSource anotherEventSource = new SNSEventSource(snsClient, sqsClient); anotherEventSource.init(); // should be able to send via sns and then receive from sqs if everything worked ok snsClient.publish(anotherEventSource.getSNSArn(), "aMessage"); ReceiveMessageRequest request = new ReceiveMessageRequest(). withQueueUrl(anotherEventSource.getQueueURL()). withWaitTimeSeconds(10); ReceiveMessageResult result = sqsClient.receiveMessage(request); assertTrue(result.getMessages().size()>0); }
@Test public void oneQueue() { AmazonSQS amazonSQS = mock(AmazonSQS.class); // return one queue when(amazonSQS.listQueues(any(ListQueuesRequest.class))) .thenReturn(new ListQueuesResult().withQueueUrls("test-foo")); // return 3 messages from the queue when(amazonSQS.receiveMessage(any(ReceiveMessageRequest.class))) .thenReturn(new ReceiveMessageResult().withMessages(newMessage("foo"), newMessage("foo"), newMessage("foo"))); AmazonSQSPrioritizedMessageProvider provider = new AmazonSQSPrioritizedMessageProvider(amazonSQS, "test", 1, 60 * 1000); List<Message> messages = provider.next(); assertMessages(messages, 3, "foo"); verify(amazonSQS).listQueues(any(ListQueuesRequest.class)); verify(amazonSQS).receiveMessage(any(ReceiveMessageRequest.class)); }
/**
 * A message whose MD5 field is set to the body itself (not a digest) must be
 * dropped: {@code poll()} returns {@code null}.
 */
@Test
public void incorrectMD5Test() throws MessageMarshallerException {
    String payload = "Hello, World";
    String messageBody = messageMarshaller.serialize(MessageBuilder.withPayload(payload).build());

    com.amazonaws.services.sqs.model.Message sqsMessage = new com.amazonaws.services.sqs.model.Message();
    sqsMessage.setBody(messageBody);
    // Deliberately wrong: the body is used where its MD5 digest belongs.
    sqsMessage.setMD5OfBody(messageBody);

    ReceiveMessageResult stubbedResult = new ReceiveMessageResult();
    stubbedResult.setMessages(Collections.singletonList(sqsMessage));
    when(mockSQS.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(stubbedResult);

    Message<?> recvMessage = executor.poll();

    assertNull("No message since MD5 checksum failed", recvMessage);
}
/**
 * A message carrying the hex-encoded MD5 of its UTF-8 body must be returned by
 * {@code poll()}, and its deserialized payload must match what was sent.
 */
@Test
public void correctMD5Test() throws Exception {
    String payload = "Hello, World";
    String messageBody = messageMarshaller.serialize(MessageBuilder.withPayload(payload).build());

    byte[] digest = Md5Utils.computeMD5Hash(messageBody.getBytes("UTF-8"));
    String hexDigest = new String(Hex.encodeHex(digest));

    com.amazonaws.services.sqs.model.Message sqsMessage = new com.amazonaws.services.sqs.model.Message();
    sqsMessage.setBody(messageBody);
    sqsMessage.setMD5OfBody(hexDigest);

    ReceiveMessageResult stubbedResult = new ReceiveMessageResult();
    stubbedResult.setMessages(Collections.singletonList(sqsMessage));
    when(mockSQS.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(stubbedResult);

    Message<?> recvMessage = executor.poll();
    assertNotNull("message is not null", recvMessage);

    Message<?> enclosed = messageMarshaller.deserialize((String) recvMessage.getPayload());
    String recvPayload = (String) enclosed.getPayload();
    assertEquals("payload must match", payload, recvPayload);
}
/**
 * Emits exactly one message. The state's queue is refilled from SQS whenever
 * it is empty, and polled entries are passed to {@code getNextMessage} until
 * one of them produces a value.
 *
 * @param state   holds the buffer of messages received but not yet emitted
 * @param emitter receives the next message via {@code onNext}
 */
@Override
public void accept(State state, Emitter<SqsMessage> emitter) throws Exception {
    final Queue<Message> buffer = state.queue;
    Optional<SqsMessage> candidate;
    do {
        // Keep receiving until at least one message is buffered.
        while (buffer.isEmpty()) {
            buffer.addAll(sqs.receiveMessage(request).getMessages());
        }
        candidate = getNextMessage(buffer.poll(), queueUrl, bucketName, s3, sqs, service);
    } while (!candidate.isPresent());
    emitter.onNext(candidate.get());
}
/**
 * Installs the mocked task buffers on the queue under test and stubs the
 * request sender to answer attribute lookups and receive calls with canned
 * results.
 */
public BufferedStringSqsQueueTest() {
    queue.setSendMessageTaskBuffer(sendMessageTaskBufferMock);
    queue.setDeleteMessageTaskBuffer(deleteMessageTaskBufferMock);
    queue.setChangeMessageVisibilityTaskBuffer(changeMessageVisibilityTaskBufferMock);

    when(requestSenderMock.sendRequest(any(GetQueueAttributesAction.class)))
            .thenReturn(Single.just(
                    new GetQueueAttributesResult()
                            .withAttributes(MutableSqsQueueAttributesTest.ATTRIBUTE_STRING_MAP)));

    when(requestSenderMock.sendRequest(any(ReceiveMessagesAction.class)))
            .thenReturn(Single.just(
                    new ReceiveMessageResult().withMessages(SQS_MESSAGE)));
}
/**
 * Issues one receive call using the configured queue URL, visibility timeout
 * and batch size, and maps each returned SQS message to a {@code Message}
 * built from its id, body and receipt handle.
 *
 * @return the converted messages, in the order returned by the client
 */
@VisibleForTesting
List<Message> receiveMessages() {
    ReceiveMessageRequest pollRequest = new ReceiveMessageRequest()
            .withQueueUrl(queueURL)
            .withVisibilityTimeout(visibilityTimeout)
            .withMaxNumberOfMessages(batchSize);

    ReceiveMessageResult polled = client.receiveMessage(pollRequest);

    return polled.getMessages()
            .stream()
            .map(raw -> new Message(raw.getMessageId(), raw.getBody(), raw.getReceiptHandle()))
            .collect(Collectors.toList());
}
public void checkInstanceState(InstanceCheckStateRequest checkInstanceStateRequest, Context context) { LambdaLogger logger = context.getLogger(); final String queueName = checkInstanceStateRequest.getQueueName(); final String sqsEndpoint = checkInstanceStateRequest.getSqsEndpoint(); // illegal parameter if (queueName == null || sqsEndpoint == null) { logger.log("[ERROR][checkInstanceStatus][stopped]QueueName or SQSEndpoint is not found Parameter. "); throw new IllegalArgumentException("QueueName or SQSEndpoint is not found Parameter. " + "CheckInstanceStateRequest[" + checkInstanceStateRequest + "]"); } // Only the specified number, reliably acquired int numberOfMessages = checkInstanceStateRequest.getNumberOfMessages(); for (int i = 0; i < numberOfMessages; i++) { AmazonSQSAsync client = createSQSClient(); client.setEndpoint(sqsEndpoint); try { String queueUrl = client.createQueue(queueName).getQueueUrl(); ReceiveMessageRequest req = new ReceiveMessageRequest(queueUrl).withVisibilityTimeout(5) .withMaxNumberOfMessages(checkInstanceStateRequest.getNumberOfMessages()); Future<ReceiveMessageResult> result = client.receiveMessageAsync(req); while (!result.isDone()) { Thread.sleep(100); } result.get().getMessages().stream() .forEach(s -> checkInstanceState(s, "stopped", checkInstanceStateRequest, context)); } catch (Exception e) { logger.log("[ERROR][checkInstanceStatus][stopped]message[" + e.getMessage() + "] stackTrace[" + getStackTrace(e) + "] CheckInstanceStateRequest[" + checkInstanceStateRequest + "]"); } finally { client.shutdown(); } } }
public void checkInstanceState(InstanceCheckStateRequest checkInstanceStateRequest, Context context) { LambdaLogger logger = context.getLogger(); final String queueName = checkInstanceStateRequest.getQueueName(); final String sqsEndpoint = checkInstanceStateRequest.getSqsEndpoint(); // illegal parameter if (queueName == null || sqsEndpoint == null) { logger.log("[ERROR][checkInstanceStatus][running]QueueName or SQSEndpoint is not found Parameter. "); throw new IllegalArgumentException("QueueName or SQSEndpoint is not found Parameter. " + "CheckInstanceStateRequest[" + checkInstanceStateRequest + "]"); } // Only the specified number, reliably acquired int numberOfMessages = checkInstanceStateRequest.getNumberOfMessages(); for (int i = 0; i < numberOfMessages; i++) { AmazonSQSAsync client = createSQSClient(); client.setEndpoint(sqsEndpoint); try { String queueUrl = client.createQueue(queueName).getQueueUrl(); ReceiveMessageRequest req = new ReceiveMessageRequest(queueUrl).withVisibilityTimeout(5) .withMaxNumberOfMessages(checkInstanceStateRequest.getNumberOfMessages()); Future<ReceiveMessageResult> result = client.receiveMessageAsync(req); while (!result.isDone()) { Thread.sleep(100); } result.get().getMessages().stream() .forEach(s -> checkInstanceState(s, "running", checkInstanceStateRequest, context)); } catch (Exception e) { logger.log("[ERROR][checkInstanceStatus][running]message[" + e.getMessage() + "] stackTrace[" + getStackTrace(e) + "] CheckInstanceStateRequest[" + checkInstanceStateRequest + "]"); } finally { client.shutdown(); } } }
/**
 * Resolves the queue URL once, then loops forever: each iteration receives up
 * to 10 messages and prints every one of them to standard output. Messages
 * are not deleted here.
 */
@Override
public void run() {
    ReceiveMessageRequest pollRequest =
            new ReceiveMessageRequest(client.getQueueUrl(queue).getQueueUrl());
    pollRequest.setMaxNumberOfMessages(10);

    for (;;) {
        ReceiveMessageResult polled = client.receiveMessage(pollRequest);
        for (Message received : polled.getMessages()) {
            System.out.println(received);
        }
    }
}
/**
 * Performs one receive call and returns the first message of the result.
 *
 * @return the first received message, or an empty {@code Optional} when the
 *         result's message list is {@code null} or empty
 */
private Optional<Message> fetchEvent() {
    final ReceiveMessageResult polled = amazonSQS.receiveMessage(queueUrl);

    final boolean nothingReceived = polled.getMessages() == null || polled.getMessages().isEmpty();
    if (nothingReceived) {
        return Optional.empty();
    }
    return Optional.of(polled.getMessages().get(0));
}
/**
 * When the client returns a message with no content set,
 * {@code getFailedEvent()} must throw an {@code IllegalStateException} whose
 * message mentions the deserialization failure.
 */
@Test
public void testShouldFailWhileDeserializationOfFailedEvent() {
    final ReceiveMessageResult blankMessageResult = new ReceiveMessageResult();
    blankMessageResult.setMessages(Collections.singleton(new Message()));
    when(amazonSQS.receiveMessage(anyString())).thenReturn(blankMessageResult);

    assertThatThrownBy(() -> sqsFailedEventSource.getFailedEvent())
            .isInstanceOf(IllegalStateException.class)
            .hasMessageContaining("Exception occurred during deserialization. Message id =");
}