@Test public void testPurgeQueue_shouldRemoveAll() { // create queue CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue")); // send messages String messageBody = "{\"life-universe-everything\":42}"; sqs.sendMessage(new SendMessageRequest().withDelaySeconds(0).withMessageBody(messageBody) .withMessageGroupId("some-group-id-123").withQueueUrl(createdQueue.getQueueUrl())); String messageBody2 = "{\"dead-emptyness-nothing\":24}"; sqs.sendMessage(new SendMessageRequest().withDelaySeconds(0).withMessageBody(messageBody2) .withMessageGroupId("some-group-id-123").withQueueUrl(createdQueue.getQueueUrl())); // purge queues PurgeQueueResult result = sqs.purgeQueue(new PurgeQueueRequest().withQueueUrl(createdQueue.getQueueUrl())); assertNotNull("verify that purge queue returned ok", result); // verify empty queue ReceiveMessageResult messageResult = sqs.receiveMessage(new ReceiveMessageRequest() .withMaxNumberOfMessages(9).withQueueUrl(createdQueue.getQueueUrl()).withVisibilityTimeout(10) .withWaitTimeSeconds(0)); assertEquals("verify that queue is empty", 0, messageResult.getMessages().size()); // cleanup getQueues().remove("tea-earl-grey-queue"); }
public List<QueueMessage> getMessages() { ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(_queueDns); List<Message> messages = _sqs.receiveMessage(receiveMessageRequest).getMessages(); List<QueueMessage> deserializedMessages = new ArrayList<>(); for (Message message : messages) { String body = message.getBody(); QueueMessage qm = _gson.fromJson(body, QueueMessage.class); deserializedMessages.add(qm); System.out.println("query time: " + qm.queryExecutionTime); System.out.println("exec time: " + qm.totalExecutionTime); System.out.println("Has ex: " + qm.hasException); System.out.println("ex message: " + qm.exceptionMessage + "\n"); String receiptHandle = message.getReceiptHandle(); _sqs.deleteMessage(new DeleteMessageRequest(_queueDns, receiptHandle)); } return deserializedMessages; }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsOneMessage() { final AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); final String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .take(1) // .test() // .awaitDone(10, TimeUnit.SECONDS) // .assertComplete() // .assertValue("body1"); final InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(1)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsOneMessageAndHonoursBackpressure() { final AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); final String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .test(0) // .requestMore(1) // .assertValue("body1")// .assertNotComplete() // .cancel(); final InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(1)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsNoMessagesThenSecondCallReturnsTwoMessages() { final AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); final String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())).thenReturn(new ReceiveMessageResult()) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"), new Message().withBody("body2"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .take(2) // .test() // .awaitDone(10, TimeUnit.SECONDS) // .assertComplete() // .assertValues("body1", "body2"); final InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(2)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Override public List<ScanRangeTask> claimScanRangeTasks(int max, Duration ttl) { if (max == 0) { return ImmutableList.of(); } List<Message> messages = _sqs.receiveMessage(new ReceiveMessageRequest() .withQueueUrl(getQueueUrl(_pendingScanRangeQueue)) .withMaxNumberOfMessages(Math.min(max, 10)) // SQS cannot claim more than 10 messages .withVisibilityTimeout(toSeconds(ttl)) ).getMessages(); return FluentIterable.from(messages) .transform(new Function<Message, ScanRangeTask>() { @Override public ScanRangeTask apply(Message message) { QueueScanRangeTask task = JsonHelper.fromJson(message.getBody(), QueueScanRangeTask.class); task.setMessageId(message.getReceiptHandle()); return task; } }) .toList(); }
@Override public List<ScanRangeComplete> claimCompleteScanRanges(Duration ttl) { List<Message> messages = _sqs.receiveMessage(new ReceiveMessageRequest() .withQueueUrl(getQueueUrl(_completeScanRangeQueue)) .withMaxNumberOfMessages(10) .withVisibilityTimeout(toSeconds(ttl)) ).getMessages(); return FluentIterable.from(messages) .transform(new Function<Message, ScanRangeComplete>() { @Override public ScanRangeComplete apply(Message message) { QueueScanRangeComplete completion = JsonHelper.fromJson(message.getBody(), QueueScanRangeComplete.class); completion.setMessageId(message.getReceiptHandle()); return completion; } }) .toList(); }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsOneMessage() { AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .take(1) // .to(test()) // .awaitTerminalEvent() // .assertCompleted() // .assertValue("body1"); InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(1)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsOneMessageAndHonoursBackpressure() { AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .to(testWithRequest(0)) // .requestMore(1) // .assertValue("body1")// .assertNotCompleted() // .unsubscribe(); InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(1)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsNoMessagesThenSecondCallReturnsTwoMessages() { AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())).thenReturn(new ReceiveMessageResult()) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"), new Message().withBody("body2"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .take(2) // .to(test()) // .awaitTerminalEvent() // .assertCompleted() // .assertValues("body1", "body2"); InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(2)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Override public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageRequest) throws AmazonServiceException, AmazonClientException { Integer maxNumberOfMessages = receiveMessageRequest.getMaxNumberOfMessages() != null ? receiveMessageRequest.getMaxNumberOfMessages() : Integer.MAX_VALUE; ReceiveMessageResult result = new ReceiveMessageResult(); Collection<Message> resultMessages = new ArrayList<Message>(); synchronized (messages) { int fetchSize = 0; for (Iterator<Message> iterator = messages.iterator(); iterator.hasNext() && fetchSize < maxNumberOfMessages; fetchSize++) { Message rc = iterator.next(); resultMessages.add(rc); iterator.remove(); scheduleCancelInflight(receiveMessageRequest.getQueueUrl(), rc); } } result.setMessages(resultMessages); return result; }
public List<JobStatusNotification> pollMessageFromQueueByJobId(String queueUrl, String jobId) { ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest() .withQueueUrl(queueUrl) .withMaxNumberOfMessages(MAX_NUMBER_OF_MESSAGES) .withVisibilityTimeout(VISIBILITY_TIMEOUT) .withWaitTimeSeconds(WAIT_TIME_SECONDS); List<JobStatusNotification> jobStatusNotifications = new ArrayList<>(); for (Message message : sqsClient.receiveMessage(receiveMessageRequest).getMessages()) { try { JobStatusNotification jobStatusNotification = parseMessage(message.getBody()); if (jobStatusNotification.getJobId().equalsIgnoreCase(jobId)) { jobStatusNotifications.add(jobStatusNotification); sqsClient.deleteMessage(new DeleteMessageRequest().withQueueUrl(queueUrl).withReceiptHandle(message.getReceiptHandle())); } } catch (IOException e) { logger.error(e.getMessage(), e); } } return jobStatusNotifications; }
@SuppressWarnings("unchecked") public T peekMessage(int waitFor) throws Exception { // Receive messages logger.info("Trying to recieve message from: " + _queueName); ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(_queueURL); receiveMessageRequest.setMaxNumberOfMessages(1); receiveMessageRequest.setWaitTimeSeconds(waitFor); List<Message> messages = _sqs.receiveMessage(receiveMessageRequest).getMessages(); for (Message message : messages) { logger.info(" Got Message"); logger.info(" Body: " + message.getBody()); logger.info(" Handle: " + message.getReceiptHandle()); _lastMessage = message; GenericMessage msg = GenericMessage.fromXML(message.getBody()); if (!msg.type.equals(_msgClass.getName())) throw new Exception("Invalid message type recieved."); return (T) msg.body; } return null; }
@Test public void messageShouldBeProcessedAfterBeingConsumed() throws Exception { //GIVEN ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(); Message message1 = new Message() .withMessageId("aaaa-bbbb-cccc-dddd-eeee") .withBody("Sample test message"); Message message2 = new Message() .withMessageId("ffff-gggg-hhhh-iiii-jjjj") .withBody("Another sample test message"); receiveMessageResult.setMessages(Lists.newArrayList(message1, message2)); when(sqs.receiveMessage((ReceiveMessageRequest) anyObject())).thenReturn(receiveMessageResult, new ReceiveMessageResult()); //WHEN receiverHandler.start(); //THEN Thread.sleep(1000); verify(receiver, times(2)).receive(any()); verify(receiver, times(1)).receive(message1); verify(receiver, times(1)).receive(message2); }
@Test public void sendAndReceiveMessage() { final String queueName = "bizo"; final String messageBody = "hi everybody"; final CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName(queueName); sqs.createQueue(createQueueRequest); final GetQueueUrlRequest getQueueUrlRequest = new GetQueueUrlRequest().withQueueName(queueName); final GetQueueUrlResult getQueueUrlResult = sqs.getQueueUrl(getQueueUrlRequest); final String queueUrl = getQueueUrlResult.getQueueUrl(); final SendMessageRequest sendMessageRequest = new SendMessageRequest().withQueueUrl(queueUrl).withMessageBody(messageBody); sqs.sendMessage(sendMessageRequest); final int maxNumberOfMessages = 10; final ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest().withQueueUrl(queueUrl).withMaxNumberOfMessages(maxNumberOfMessages); final ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(receiveMessageRequest); final List<Message> messages = receiveMessageResult.getMessages(); assertThat(messages.size(), equalTo(1)); assertThat(messages.get(0).getBody(), equalTo(messageBody)); }
public ReceiveMessageRequest getReceiveMessageRequest() { ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(this.destinationUrl). withAttributeNames(RECEIVING_ATTRIBUTES). withMessageAttributeNames(RECEIVING_MESSAGE_ATTRIBUTES); if (this.maxNumberOfMessages != null) { receiveMessageRequest.withMaxNumberOfMessages(this.maxNumberOfMessages); } else { receiveMessageRequest.withMaxNumberOfMessages(DEFAULT_MAX_NUMBER_OF_MESSAGES); } if (this.visibilityTimeout != null) { receiveMessageRequest.withVisibilityTimeout(this.visibilityTimeout); } if (this.waitTimeOut != null) { receiveMessageRequest.setWaitTimeSeconds(this.waitTimeOut); } return receiveMessageRequest; }
@Override public Message<String> receive(long timeout) { ReceiveMessageResult receiveMessageResult = this.amazonSqs.receiveMessage( new ReceiveMessageRequest(this.queueUrl). withMaxNumberOfMessages(1). withWaitTimeSeconds(Long.valueOf(timeout).intValue()). withAttributeNames(ATTRIBUTE_NAMES). withMessageAttributeNames(MESSAGE_ATTRIBUTE_NAMES)); if (receiveMessageResult.getMessages().isEmpty()) { return null; } com.amazonaws.services.sqs.model.Message amazonMessage = receiveMessageResult.getMessages().get(0); Message<String> message = createMessage(amazonMessage); this.amazonSqs.deleteMessage(new DeleteMessageRequest(this.queueUrl, amazonMessage.getReceiptHandle())); return message; }
@Test public void receiveMessage_withoutTimeout_returnsTextMessage() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages( Collections.singleton(new com.amazonaws.services.sqs.model.Message().withBody("content")))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); //Act Message<?> receivedMessage = messageChannel.receive(); //Assert assertNotNull(receivedMessage); assertEquals("content", receivedMessage.getPayload()); }
@Test public void receiveMessage_withSpecifiedTimeout_returnsTextMessage() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(2). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages( Collections.singleton(new com.amazonaws.services.sqs.model.Message().withBody("content")))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); //Act Message<?> receivedMessage = messageChannel.receive(2); //Assert assertNotNull(receivedMessage); assertEquals("content", receivedMessage.getPayload()); }
@Test public void receiveMessage_withSpecifiedTimeout_returnsNull() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(2). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages( Collections.emptyList())); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); //Act Message<?> receivedMessage = messageChannel.receive(2); //Assert assertNull(receivedMessage); }
@Test public void receiveMessage_withoutDefaultTimeout_returnsNull() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages( Collections.emptyList())); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); //Act Message<?> receivedMessage = messageChannel.receive(0); //Assert assertNull(receivedMessage); }
@Test public void receiveMessage_withMimeTypeMessageAttribute_shouldCopyToHeaders() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); MimeType mimeType = new MimeType("test", "plain", Charset.forName("UTF-8")); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, new MessageAttributeValue().withDataType(MessageAttributeDataTypes.STRING).withStringValue(mimeType.toString()))))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act Message<?> receivedMessage = messageChannel.receive(); // Assert assertEquals(mimeType, receivedMessage.getHeaders().get(MessageHeaders.CONTENT_TYPE)); }
@Test public void receiveMessage_withStringMessageHeader_shouldBeReceivedAsQueueMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); String headerValue = "Header value"; String headerName = "MyHeader"; when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(Collections.singletonMap(headerName, new MessageAttributeValue().withDataType(MessageAttributeDataTypes.STRING).withStringValue(headerValue))))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act Message<?> receivedMessage = messageChannel.receive(); // Assert assertEquals(headerValue, receivedMessage.getHeaders().get(headerName)); }
@Test public void receiveMessage_withIncompatibleNumericMessageHeader_shouldThrowAnException() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); this.expectedException.expect(IllegalArgumentException.class); this.expectedException.expectMessage("Cannot convert String [17] to target class [java.util.concurrent.atomic.AtomicInteger]"); HashMap<String, MessageAttributeValue> messageAttributes = new HashMap<>(); AtomicInteger atomicInteger = new AtomicInteger(17); messageAttributes.put("atomicInteger", new MessageAttributeValue().withDataType(MessageAttributeDataTypes.NUMBER + ".java.util.concurrent.atomic.AtomicInteger").withStringValue(String.valueOf(atomicInteger))); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(messageAttributes))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act messageChannel.receive(); }
@Test public void receiveMessage_withMissingNumericMessageHeaderTargetClass_shouldThrowAnException() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); this.expectedException.expect(MessagingException.class); this.expectedException.expectMessage("Message attribute with value '12' and data type 'Number.class.not.Found' could not be converted" + " into a Number because target class was not found."); HashMap<String, MessageAttributeValue> messageAttributes = new HashMap<>(); messageAttributes.put("classNotFound", new MessageAttributeValue().withDataType(MessageAttributeDataTypes.NUMBER + ".class.not.Found").withStringValue("12")); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(messageAttributes))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act messageChannel.receive(); }
@Test public void receiveMessage_withBinaryMessageHeader_shouldBeReceivedAsByteBufferMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); ByteBuffer headerValue = ByteBuffer.wrap("My binary data!".getBytes()); String headerName = "MyHeader"; when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(Collections.singletonMap(headerName, new MessageAttributeValue().withDataType(MessageAttributeDataTypes.BINARY).withBinaryValue(headerValue))))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act Message<?> receivedMessage = messageChannel.receive(); // Assert assertEquals(headerValue, receivedMessage.getHeaders().get(headerName)); }
@Test public void receiveMessage_withIdOfTypeString_IdShouldBeConvertedToUuid() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); UUID uuid = UUID.randomUUID(); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(Collections.singletonMap(MessageHeaders.ID, new MessageAttributeValue().withDataType(MessageAttributeDataTypes.STRING).withStringValue(uuid.toString()))))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act Message<?> receivedMessage = messageChannel.receive(); // Assert Object idMessageHeader = receivedMessage.getHeaders().get(MessageHeaders.ID); assertTrue(UUID.class.isInstance(idMessageHeader)); assertEquals(uuid, idMessageHeader); }
public static String popFrom(String name) { try { String queueUrl = getConnection().createQueue( new CreateQueueRequest(name)).getQueueUrl(); ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest( queueUrl); receiveMessageRequest.setMaxNumberOfMessages(1); if (null != receiveMessageRequest) { List<Message> messages = getConnection().receiveMessage( receiveMessageRequest).getMessages(); if (messages.size() > 0) { String messageRecieptHandle = messages.get(0) .getReceiptHandle(); getConnection().deleteMessage( new DeleteMessageRequest(receiveMessageRequest .getQueueUrl(), messageRecieptHandle)); return messages.get(0).getBody(); } } } catch (Exception e) { e.printStackTrace(); } return null; }
/** * Test Get Messages throws error */ @Test public void testGetMessagesError() throws InterruptedException, JMSException { int retriesAttempted = 3; int prefetchBatchSize = 5; consumerPrefetch.retriesAttempted = retriesAttempted; when(amazonSQSClient.receiveMessage(any(ReceiveMessageRequest.class))) .thenThrow(new Error()); try { consumerPrefetch.getMessages(prefetchBatchSize); } catch (Error e) { // Expected error exception } }
@Test public void pollAndDeleteMessageShouldWork() throws Exception { ReceiveMessageResult receiveMessageResult = mock(ReceiveMessageResult.class); Message message = mock(Message.class); when(message.getBody()).thenReturn("{}"); when(receiveMessageResult.getMessages()).thenReturn(Arrays.asList(message)); when(mockAmazonSQS.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult); List<PolledMessage<TestMessage>> receivedMessages1 = queueServicePoller.poll(); assertThat(receivedMessages1).hasSize(1); when(mockAmazonSQS.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(mock(ReceiveMessageResult.class)); queueServicePoller.delete(receivedMessages1.get(0)); List<PolledMessage<TestMessage>> receivedMessages2 = queueServicePoller.poll(); assertThat(receivedMessages2).isEmpty(); }
@Test public void deleteBatchMessagesShouldWork() throws Exception { ReceiveMessageResult receiveMessageResult = mock(ReceiveMessageResult.class); Message message = mock(Message.class); when(message.getBody()).thenReturn("{}"); when(receiveMessageResult.getMessages()).thenReturn(Arrays.asList(message, message)); when(mockAmazonSQS.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult); List<PolledMessage<TestMessage>> receivedMessages1 = queueServicePoller.poll(); assertEquals(2, receivedMessages1.size()); when(mockAmazonSQS.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(mock(ReceiveMessageResult.class)); queueServicePoller.delete(receivedMessages1.get(0)); queueServicePoller.delete(receivedMessages1.get(1)); List<PolledMessage<TestMessage>> receivedMessages2 = queueServicePoller.poll(); assertEquals(0, receivedMessages2.size()); }
@Override public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageRequest) throws AmazonClientException { DirectorySQSQueue queue = getQueueFromUrl(receiveMessageRequest.getQueueUrl(), false); //make sure we have a default for max number of messages. int maxNumberOfMessages = Objects.firstNonNull(receiveMessageRequest.getMaxNumberOfMessages(), 10); //10 is amazon spec default //and a default visibility timeout int visibilityTimeout = Objects.firstNonNull(receiveMessageRequest.getVisibilityTimeout(), _defaultVisibilitySeconds); //also a wait time int waitTime = Objects.firstNonNull(receiveMessageRequest.getWaitTimeSeconds(), 0); if (waitTime < 0 || waitTime > 20) { throw new AmazonServiceException("wait time of " + waitTime + " is not between 0 and 20"); } try { List<Message> messageList = queue.receive(maxNumberOfMessages, visibilityTimeout, waitTime); return new ReceiveMessageResult().withMessages(messageList); } catch (IOException e) { throw new AmazonServiceException("error reading messages from " + queue.getQueuePath().toUri().toString(), e); } }
public void client2CanReceiveTwiceAfterInitialEmpty() { final String queueUrl = someNewQueue(); final ReceiveMessageResult result1 = _sqs2.receiveMessage(new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(1).withMaxNumberOfMessages(1)); Assert.assertEquals(result1.getMessages().size(), 0); final SendMessageResult sendResult1 = _sqs1.sendMessage(new SendMessageRequest(queueUrl, someMessageBody())); final ReceiveMessageResult result2 = _sqs2.receiveMessage(new ReceiveMessageRequest(queueUrl). withWaitTimeSeconds(10). withMaxNumberOfMessages(1). withVisibilityTimeout(60)); Assert.assertEquals(result2.getMessages().size(), 1, "first receive failed"); final SendMessageResult sendResult2 = _sqs1.sendMessage(new SendMessageRequest(queueUrl, someMessageBody())); final ReceiveMessageResult result3 = _sqs2.receiveMessage(new ReceiveMessageRequest(queueUrl). withWaitTimeSeconds(20). withMaxNumberOfMessages(1)); Assert.assertEquals(result3.getMessages().size(), 1, "second receive failed"); }
public void client1GetsFromBoth() { final String queueUrl = someNewQueue(); final SendMessageResult sendResult1 = _sqs1.sendMessage(new SendMessageRequest(queueUrl, someMessageBody())); final SendMessageResult sendResult2 = _sqs2.sendMessage(new SendMessageRequest(queueUrl, someMessageBody())); final ReceiveMessageResult receiveMessageResult1 = _sqs1.receiveMessage(new ReceiveMessageRequest(queueUrl). withMaxNumberOfMessages(1). withWaitTimeSeconds(20)); Assert.assertEquals(receiveMessageResult1.getMessages().size(), 1); final ReceiveMessageResult receiveMessageResult2 = _sqs1.receiveMessage(new ReceiveMessageRequest(queueUrl). withWaitTimeSeconds(20)); Assert.assertEquals(receiveMessageResult2.getMessages().size(), 1); }
public void publishAndReceiveSeparateSQSClients() { final String queueName = someQueueName(); final String queueUrl = someNewQueue(queueName); final String topicName = "publishAndReceiveSeparateSQSClients"; final String message = "hi from " + topicName; AmazonSNS amazonSNS = new InMemorySNS(_amazonSQS1, new Subscription(). withTopicArn(makeTopicArn(topicName)). withProtocol("sqs"). withSubscriptionArn(makeSomeSubArn(topicName)). withEndpoint(getQueueArn(queueName))); amazonSNS.publish(new PublishRequest(makeTopicArn(topicName), message)); ReceiveMessageResult result = _amazonSQS2.receiveMessage(new ReceiveMessageRequest(queueUrl). withWaitTimeSeconds(15)); Assert.assertEquals(result.getMessages().size(), 1); Assert.assertEquals(result.getMessages().get(0).getBody(), message); }
@Test public void shouldCreateSNSAndSQSPlusPolicyAsNeeded() throws MissingArgumentException, NotReadyException, FailedToCreateQueueException, InterruptedException { eventSource.init(); String existingSNSARN = eventSource.getSNSArn(); // reset the queue, sns and subscription (this forces policy recreation as well) String sub = eventSource.getARNofSQSSubscriptionToSNS(); if (sub!=null) { snsClient.unsubscribe(sub); } snsClient.deleteTopic(existingSNSARN); sqsClient.deleteQueue(eventSource.getQueueURL()); // now recreate the source and make sure we can send/receive SNSEventSource anotherEventSource = new SNSEventSource(snsClient, sqsClient); anotherEventSource.init(); // should be able to send via sns and then receive from sqs if everything worked ok snsClient.publish(anotherEventSource.getSNSArn(), "aMessage"); ReceiveMessageRequest request = new ReceiveMessageRequest(). withQueueUrl(anotherEventSource.getQueueURL()). withWaitTimeSeconds(10); ReceiveMessageResult result = sqsClient.receiveMessage(request); assertTrue(result.getMessages().size()>0); }
@Test public void oneQueue() { AmazonSQS amazonSQS = mock(AmazonSQS.class); // return one queue when(amazonSQS.listQueues(any(ListQueuesRequest.class))) .thenReturn(new ListQueuesResult().withQueueUrls("test-foo")); // return 3 messages from the queue when(amazonSQS.receiveMessage(any(ReceiveMessageRequest.class))) .thenReturn(new ReceiveMessageResult().withMessages(newMessage("foo"), newMessage("foo"), newMessage("foo"))); AmazonSQSPrioritizedMessageProvider provider = new AmazonSQSPrioritizedMessageProvider(amazonSQS, "test", 1, 60 * 1000); List<Message> messages = provider.next(); assertMessages(messages, 3, "foo"); verify(amazonSQS).listQueues(any(ListQueuesRequest.class)); verify(amazonSQS).receiveMessage(any(ReceiveMessageRequest.class)); }
@Test public void incorrectMD5Test() throws MessageMarshallerException { String payload = "Hello, World"; String messageBody = messageMarshaller.serialize(MessageBuilder .withPayload(payload).build()); com.amazonaws.services.sqs.model.Message sqsMessage = new com.amazonaws.services.sqs.model.Message(); sqsMessage.setBody(messageBody); sqsMessage.setMD5OfBody(messageBody); ReceiveMessageResult result = new ReceiveMessageResult(); result.setMessages(Collections.singletonList(sqsMessage)); when(mockSQS.receiveMessage(any(ReceiveMessageRequest.class))) .thenReturn(result); Message<?> recvMessage = executor.poll(); assertNull("No message since MD5 checksum failed", recvMessage); }
@Test public void correctMD5Test() throws Exception { String payload = "Hello, World"; String messageBody = messageMarshaller.serialize(MessageBuilder .withPayload(payload).build()); com.amazonaws.services.sqs.model.Message sqsMessage = new com.amazonaws.services.sqs.model.Message(); sqsMessage.setBody(messageBody); sqsMessage.setMD5OfBody(new String(Hex.encodeHex(Md5Utils .computeMD5Hash(messageBody.getBytes("UTF-8"))))); ReceiveMessageResult result = new ReceiveMessageResult(); result.setMessages(Collections.singletonList(sqsMessage)); when(mockSQS.receiveMessage(any(ReceiveMessageRequest.class))) .thenReturn(result); Message<?> recvMessage = executor.poll(); assertNotNull("message is not null", recvMessage); Message<?> enclosed = messageMarshaller .deserialize((String) recvMessage.getPayload()); String recvPayload = (String) enclosed.getPayload(); assertEquals("payload must match", payload, recvPayload); }
@Override public State call() { queueUrl = sqs.getQueueUrl(queueName).getQueueUrl(); request = new ReceiveMessageRequest(queueUrl) // .withWaitTimeSeconds(20) // .withMaxNumberOfMessages(10); return new State(new LinkedList<>()); }