@Test public void instantiation_WithCustomJacksonConverterThatSupportsJava8Types_shouldConvertMessageToString() throws IOException { // Arrange AmazonSQSAsync amazonSqs = createAmazonSqs(); ObjectMapper objectMapper = Jackson2ObjectMapperBuilder.json().build(); MappingJackson2MessageConverter simpleMessageConverter = new MappingJackson2MessageConverter(); simpleMessageConverter.setSerializedPayloadClass(String.class); simpleMessageConverter.setObjectMapper(objectMapper); QueueMessagingTemplate queueMessagingTemplate = new QueueMessagingTemplate(amazonSqs, (ResourceIdResolver) null, simpleMessageConverter); // Act queueMessagingTemplate.convertAndSend("test", new TestPerson("Agim", "Emruli", LocalDate.of(2017, 1, 1))); // Assert ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); verify(amazonSqs).sendMessage(sendMessageRequestArgumentCaptor.capture()); TestPerson testPerson = objectMapper.readValue(sendMessageRequestArgumentCaptor.getValue().getMessageBody(), TestPerson.class); assertEquals("Agim", testPerson.getFirstName()); assertEquals("Emruli", testPerson.getLastName()); assertEquals(LocalDate.of(2017, 1, 1), testPerson.getActiveSince()); }
@Test public void doDestroy_whenContainerCallsDestroy_DestroysDefaultTaskExecutor() throws Exception { // Arrange SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(); AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly()); simpleMessageListenerContainer.setAmazonSqs(sqs); QueueMessageHandler messageHandler = mock(QueueMessageHandler.class); simpleMessageListenerContainer.setMessageHandler(messageHandler); simpleMessageListenerContainer.afterPropertiesSet(); simpleMessageListenerContainer.start(); // Act simpleMessageListenerContainer.destroy(); // Assert assertTrue(((ThreadPoolTaskExecutor) simpleMessageListenerContainer.getTaskExecutor()).getThreadPoolExecutor().isTerminated()); }
@Test public void afterPropertiesSet_whenCalled_taskExecutorIsActive() throws Exception { // Arrange SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly()); container.setAmazonSqs(sqs); QueueMessageHandler messageHandler = mock(QueueMessageHandler.class); container.setMessageHandler(messageHandler); // Act container.afterPropertiesSet(); // Assert assertFalse(((ThreadPoolTaskExecutor) container.getTaskExecutor()).getThreadPoolExecutor().isTerminated()); container.stop(); }
@Test public void testIsActive() throws Exception { AbstractMessageListenerContainer container = new StubAbstractMessageListenerContainer(); AmazonSQSAsync mock = mock(AmazonSQSAsync.class, withSettings().stubOnly()); container.setAmazonSqs(mock); container.setMessageHandler(mock(QueueMessageHandler.class)); container.afterPropertiesSet(); when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue"))). thenReturn(new GetQueueUrlResult().withQueueUrl("http://testQueue.amazonaws.com")); container.start(); assertTrue(container.isRunning()); container.stop(); assertFalse(container.isRunning()); //Container can still be active an restarted later (e.g. paused for a while) assertTrue(container.isActive()); }
@Test public void sendMessage_validTextMessage_returnsTrue() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult()); Message<String> stringMessage = MessageBuilder.withPayload("message content").build(); MessageChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act boolean sent = messageChannel.send(stringMessage); // Assert verify(amazonSqs, only()).sendMessage(any(SendMessageRequest.class)); assertEquals("message content", sendMessageRequestArgumentCaptor.getValue().getMessageBody()); assertTrue(sent); }
@Test public void sendMessage_serviceThrowsError_throwsMessagingException() throws Exception { //Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); Message<String> stringMessage = MessageBuilder.withPayload("message content").build(); MessageChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); when(amazonSqs.sendMessage(new SendMessageRequest("http://testQueue", "message content").withDelaySeconds(0) .withMessageAttributes(isNotNull()))). thenThrow(new AmazonServiceException("wanted error")); //Assert this.expectedException.expect(MessagingException.class); this.expectedException.expectMessage("wanted error"); //Act messageChannel.send(stringMessage); }
@Test public void sendMessage_withMimeTypeAsStringHeader_shouldPassItAsMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); QueueMessageChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); String mimeTypeAsString = new MimeType("test", "plain", Charset.forName("UTF-8")).toString(); Message<String> message = MessageBuilder.withPayload("Hello").setHeader(MessageHeaders.CONTENT_TYPE, mimeTypeAsString).build(); ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult()); // Act boolean sent = messageChannel.send(message); // Assert assertTrue(sent); assertEquals(mimeTypeAsString, sendMessageRequestArgumentCaptor.getValue().getMessageAttributes().get(MessageHeaders.CONTENT_TYPE).getStringValue()); }
@Test public void sendMessage_withMimeTypeHeader_shouldPassItAsMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); QueueMessageChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); MimeType mimeType = new MimeType("test", "plain", Charset.forName("UTF-8")); Message<String> message = MessageBuilder.withPayload("Hello").setHeader(MessageHeaders.CONTENT_TYPE, mimeType).build(); ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult()); // Act boolean sent = messageChannel.send(message); // Assert assertTrue(sent); assertEquals(mimeType.toString(), sendMessageRequestArgumentCaptor.getValue().getMessageAttributes().get(MessageHeaders.CONTENT_TYPE).getStringValue()); }
@Test public void receiveMessage_withoutTimeout_returnsTextMessage() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages( Collections.singleton(new com.amazonaws.services.sqs.model.Message().withBody("content")))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); //Act Message<?> receivedMessage = messageChannel.receive(); //Assert assertNotNull(receivedMessage); assertEquals("content", receivedMessage.getPayload()); }
@Test public void receiveMessage_withSpecifiedTimeout_returnsTextMessage() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(2). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages( Collections.singleton(new com.amazonaws.services.sqs.model.Message().withBody("content")))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); //Act Message<?> receivedMessage = messageChannel.receive(2); //Assert assertNotNull(receivedMessage); assertEquals("content", receivedMessage.getPayload()); }
@Test public void receiveMessage_withSpecifiedTimeout_returnsNull() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(2). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages( Collections.emptyList())); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); //Act Message<?> receivedMessage = messageChannel.receive(2); //Assert assertNull(receivedMessage); }
@Test public void receiveMessage_withoutDefaultTimeout_returnsNull() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages( Collections.emptyList())); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); //Act Message<?> receivedMessage = messageChannel.receive(0); //Assert assertNull(receivedMessage); }
@Test public void receiveMessage_withMimeTypeMessageAttribute_shouldCopyToHeaders() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); MimeType mimeType = new MimeType("test", "plain", Charset.forName("UTF-8")); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, new MessageAttributeValue().withDataType(MessageAttributeDataTypes.STRING).withStringValue(mimeType.toString()))))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act Message<?> receivedMessage = messageChannel.receive(); // Assert assertEquals(mimeType, receivedMessage.getHeaders().get(MessageHeaders.CONTENT_TYPE)); }
@Test public void sendMessage_withStringMessageHeader_shouldBeSentAsQueueMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); QueueMessageChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); String headerValue = "Header value"; String headerName = "MyHeader"; Message<String> message = MessageBuilder.withPayload("Hello").setHeader(headerName, headerValue).build(); ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult()); // Act boolean sent = messageChannel.send(message); // Assert assertTrue(sent); assertEquals(headerValue, sendMessageRequestArgumentCaptor.getValue().getMessageAttributes().get(headerName).getStringValue()); assertEquals(MessageAttributeDataTypes.STRING, sendMessageRequestArgumentCaptor.getValue().getMessageAttributes().get(headerName).getDataType()); }
@Test public void instantiation_withDefaultMapping2JacksonConverter_shouldSupportJava8Types() throws IOException { // Arrange AmazonSQSAsync amazonSqs = createAmazonSqs(); ObjectMapper objectMapper = Jackson2ObjectMapperBuilder.json().build(); QueueMessagingTemplate queueMessagingTemplate = new QueueMessagingTemplate(amazonSqs); // Act queueMessagingTemplate.convertAndSend("test", new TestPerson("Agim", "Emruli", LocalDate.of(2017, 1, 1))); // Assert ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); verify(amazonSqs).sendMessage(sendMessageRequestArgumentCaptor.capture()); TestPerson testPerson = objectMapper.readValue(sendMessageRequestArgumentCaptor.getValue().getMessageBody(), TestPerson.class); assertEquals("Agim", testPerson.getFirstName()); assertEquals("Emruli", testPerson.getLastName()); assertEquals(LocalDate.of(2017, 1, 1), testPerson.getActiveSince()); }
@Test public void receiveMessage_withIncompatibleNumericMessageHeader_shouldThrowAnException() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); this.expectedException.expect(IllegalArgumentException.class); this.expectedException.expectMessage("Cannot convert String [17] to target class [java.util.concurrent.atomic.AtomicInteger]"); HashMap<String, MessageAttributeValue> messageAttributes = new HashMap<>(); AtomicInteger atomicInteger = new AtomicInteger(17); messageAttributes.put("atomicInteger", new MessageAttributeValue().withDataType(MessageAttributeDataTypes.NUMBER + ".java.util.concurrent.atomic.AtomicInteger").withStringValue(String.valueOf(atomicInteger))); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(messageAttributes))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act messageChannel.receive(); }
@Test public void receiveMessage_withMissingNumericMessageHeaderTargetClass_shouldThrowAnException() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); this.expectedException.expect(MessagingException.class); this.expectedException.expectMessage("Message attribute with value '12' and data type 'Number.class.not.Found' could not be converted" + " into a Number because target class was not found."); HashMap<String, MessageAttributeValue> messageAttributes = new HashMap<>(); messageAttributes.put("classNotFound", new MessageAttributeValue().withDataType(MessageAttributeDataTypes.NUMBER + ".class.not.Found").withStringValue("12")); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(messageAttributes))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act messageChannel.receive(); }
@Test public void sendMessage_withBinaryMessageHeader_shouldBeSentAsBinaryMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); QueueMessageChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); ByteBuffer headerValue = ByteBuffer.wrap("My binary data!".getBytes()); String headerName = "MyHeader"; Message<String> message = MessageBuilder.withPayload("Hello").setHeader(headerName, headerValue).build(); ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult()); // Act boolean sent = messageChannel.send(message); // Assert assertTrue(sent); assertEquals(headerValue, sendMessageRequestArgumentCaptor.getValue().getMessageAttributes().get(headerName).getBinaryValue()); assertEquals(MessageAttributeDataTypes.BINARY, sendMessageRequestArgumentCaptor.getValue().getMessageAttributes().get(headerName).getDataType()); }
@Test public void receiveMessage_withBinaryMessageHeader_shouldBeReceivedAsByteBufferMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); ByteBuffer headerValue = ByteBuffer.wrap("My binary data!".getBytes()); String headerName = "MyHeader"; when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(Collections.singletonMap(headerName, new MessageAttributeValue().withDataType(MessageAttributeDataTypes.BINARY).withBinaryValue(headerValue))))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act Message<?> receivedMessage = messageChannel.receive(); // Assert assertEquals(headerValue, receivedMessage.getHeaders().get(headerName)); }
@Test public void sendMessage_withUuidAsId_shouldConvertUuidToString() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); QueueMessageChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); Message<String> message = MessageBuilder.withPayload("Hello").build(); UUID uuid = (UUID) message.getHeaders().get(MessageHeaders.ID); ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult()); // Act boolean sent = messageChannel.send(message); // Assert assertTrue(sent); assertEquals(uuid.toString(), sendMessageRequestArgumentCaptor.getValue().getMessageAttributes().get(MessageHeaders.ID).getStringValue()); }
@Test public void receiveMessage_withIdOfTypeString_IdShouldBeConvertedToUuid() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); UUID uuid = UUID.randomUUID(); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(Collections.singletonMap(MessageHeaders.ID, new MessageAttributeValue().withDataType(MessageAttributeDataTypes.STRING).withStringValue(uuid.toString()))))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act Message<?> receivedMessage = messageChannel.receive(); // Assert Object idMessageHeader = receivedMessage.getHeaders().get(MessageHeaders.ID); assertTrue(UUID.class.isInstance(idMessageHeader)); assertEquals(uuid, idMessageHeader); }
@Test @SuppressWarnings("unchecked") public void sendMessage_withTimeout_sendsMessageAsyncAndReturnsTrueOnceFutureCompleted() throws Exception { // Arrange Future<SendMessageResult> future = mock(Future.class); when(future.get(1000, TimeUnit.MILLISECONDS)).thenReturn(new SendMessageResult()); AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); when(amazonSqs.sendMessageAsync(any(SendMessageRequest.class))).thenReturn(future); QueueMessageChannel queueMessageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act boolean result = queueMessageChannel.send(MessageBuilder.withPayload("Hello").build(), 1000); // Assert assertTrue(result); verify(amazonSqs, only()).sendMessageAsync(any(SendMessageRequest.class)); }
@Test @SuppressWarnings("unchecked") public void sendMessage_withExecutionExceptionWhileSendingAsyncMessage_throwMessageDeliveryException() throws Exception { // Arrange Future<SendMessageResult> future = mock(Future.class); when(future.get(1000, TimeUnit.MILLISECONDS)).thenThrow(new ExecutionException(new Exception())); AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); when(amazonSqs.sendMessageAsync(any(SendMessageRequest.class))).thenReturn(future); QueueMessageChannel queueMessageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Assert this.expectedException.expect(MessageDeliveryException.class); // Act queueMessageChannel.send(MessageBuilder.withPayload("Hello").build(), 1000); }
@Test public void sendMessage_withoutDelayHeader_shouldNotSetDelayOnSendMessageRequestAndNotSetHeaderAsMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult()); QueueMessageChannel queueMessageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); Message<String> message = MessageBuilder.withPayload("Hello").build(); // Act queueMessageChannel.send(message); // Assert SendMessageRequest sendMessageRequest = sendMessageRequestArgumentCaptor.getValue(); assertNull(sendMessageRequest.getDelaySeconds()); assertFalse(sendMessageRequest.getMessageAttributes().containsKey(SqsMessageHeaders.SQS_DELAY_HEADER)); }
@Test public void sendMessage_withGroupIdHeader_shouldSetGroupIdOnSendMessageRequestAndNotSetItAsHeaderAsMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult()); QueueMessageChannel queueMessageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); Message<String> message = MessageBuilder.withPayload("Hello").setHeader(SqsMessageHeaders.SQS_GROUP_ID_HEADER, "id-5").build(); // Act queueMessageChannel.send(message); // Assert SendMessageRequest sendMessageRequest = sendMessageRequestArgumentCaptor.getValue(); assertEquals("id-5", sendMessageRequest.getMessageGroupId()); assertFalse(sendMessageRequest.getMessageAttributes().containsKey(SqsMessageHeaders.SQS_GROUP_ID_HEADER)); }
@Test public void sendMessage_withDeduplicationIdHeader_shouldSetDeduplicationIdOnSendMessageRequestAndNotSetItAsHeaderAsMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult()); QueueMessageChannel queueMessageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); Message<String> message = MessageBuilder.withPayload("Hello").setHeader(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER, "id-5").build(); // Act queueMessageChannel.send(message); // Assert SendMessageRequest sendMessageRequest = sendMessageRequestArgumentCaptor.getValue(); assertEquals("id-5", sendMessageRequest.getMessageDeduplicationId()); assertFalse(sendMessageRequest.getMessageAttributes().containsKey(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER)); }
@Bean public AmazonSQSAsync getSQSClient() { return AmazonSQSAsyncClientBuilder.standard() .withCredentials(new ProfileCredentialsProvider(CREDENTIALS_PROVIDER)) .withRegion(Regions.AP_SOUTHEAST_2) .build(); }
SendMessageResult createQueueMessage(ImageCreateRequest request, Context context) { LambdaLogger logger = context.getLogger(); final String queueName = request.getQueueName(); final String sqsEndpoint = request.getSqsEndpoint(); if (queueName == null || sqsEndpoint == null) { logger.log("skip create queue. [" + request + "]"); return null; } AmazonSQSAsync client = createSQSClient(); client.setEndpoint(sqsEndpoint); request.setSendMessageTimeMillis(System.currentTimeMillis()); try { CreateQueueRequest req = new CreateQueueRequest(queueName); String queueUrl = client.createQueue(req).getQueueUrl(); SendMessageRequest sendMessage = new SendMessageRequest(); sendMessage.setQueueUrl(queueUrl); ObjectMapper om = new ObjectMapper(); sendMessage.setMessageBody(om.writeValueAsString(request)); Future<SendMessageResult> result = client.sendMessageAsync(sendMessage); while (!result.isDone()) { Thread.sleep(100); } return result.get(); } catch (Exception e) { throw new RuntimeException("unexpected error occured in the create queue request.", e); } finally { client.shutdown(); } }
void deleteQueueMessage(Message message, ImageStateCheckAndPargeRequest request, Context context) { AmazonSQSAsync client = createSQSClient(); client.setEndpoint(request.getSqsEndpoint()); try { String queueUrl = client.createQueue(request.getQueueName()).getQueueUrl(); client.deleteMessage(queueUrl, message.getReceiptHandle()); } catch (Exception e) { final String msg = "can not delete message. [" + request + "]"; throw new RuntimeException(msg, e); } finally { client.shutdown(); } }
SendMessageResult createQueueMessage(InstanceRequest instanceRequest, Context context) { LambdaLogger logger = context.getLogger(); final String queueName = instanceRequest.getQueueName(); final String sqsEndpoint = instanceRequest.getSqsEndpoint(); if (queueName == null || sqsEndpoint == null) { logger.log("skip create queue. instanceRequest[" + instanceRequest + "]"); return null; } AmazonSQSAsync client = createSQSClient(); client.setEndpoint(sqsEndpoint); try { CreateQueueRequest req = new CreateQueueRequest(queueName); String queueUrl = client.createQueue(req).getQueueUrl(); instanceRequest.setSendMessageTimeMillis(System.currentTimeMillis()); SendMessageRequest sendMessage = new SendMessageRequest(); sendMessage.setQueueUrl(queueUrl); ObjectMapper om = new ObjectMapper(); sendMessage.setMessageBody(om.writeValueAsString(instanceRequest)); Future<SendMessageResult> result = client.sendMessageAsync(sendMessage); while (!result.isDone()) { Thread.sleep(100); } return result.get(); } catch (Exception e) { throw new RuntimeException("unexpected error occured in the create queue request.", e); } finally { client.shutdown(); } }
protected void deleteQueueMessage(Message message, InstanceCheckStateRequest checkInstanceStateRequest, Context context) { AmazonSQSAsync client = createSQSClient(); client.setEndpoint(checkInstanceStateRequest.getSqsEndpoint()); try { String queueUrl = client.createQueue(checkInstanceStateRequest.getQueueName()).getQueueUrl(); client.deleteMessage(queueUrl, message.getReceiptHandle()); } catch (Exception e) { final String msg = "can not delete message. checkInstanceStateRequest[" + checkInstanceStateRequest + "]"; throw new RuntimeException(msg, e); } finally { client.shutdown(); } }
public void checkInstanceState(InstanceCheckStateRequest checkInstanceStateRequest, Context context) { LambdaLogger logger = context.getLogger(); final String queueName = checkInstanceStateRequest.getQueueName(); final String sqsEndpoint = checkInstanceStateRequest.getSqsEndpoint(); // illegal parameter if (queueName == null || sqsEndpoint == null) { logger.log("[ERROR][checkInstanceStatus][stopped]QueueName or SQSEndpoint is not found Parameter. "); throw new IllegalArgumentException("QueueName or SQSEndpoint is not found Parameter. " + "CheckInstanceStateRequest[" + checkInstanceStateRequest + "]"); } // Only the specified number, reliably acquired int numberOfMessages = checkInstanceStateRequest.getNumberOfMessages(); for (int i = 0; i < numberOfMessages; i++) { AmazonSQSAsync client = createSQSClient(); client.setEndpoint(sqsEndpoint); try { String queueUrl = client.createQueue(queueName).getQueueUrl(); ReceiveMessageRequest req = new ReceiveMessageRequest(queueUrl).withVisibilityTimeout(5) .withMaxNumberOfMessages(checkInstanceStateRequest.getNumberOfMessages()); Future<ReceiveMessageResult> result = client.receiveMessageAsync(req); while (!result.isDone()) { Thread.sleep(100); } result.get().getMessages().stream() .forEach(s -> checkInstanceState(s, "stopped", checkInstanceStateRequest, context)); } catch (Exception e) { logger.log("[ERROR][checkInstanceStatus][stopped]message[" + e.getMessage() + "] stackTrace[" + getStackTrace(e) + "] CheckInstanceStateRequest[" + checkInstanceStateRequest + "]"); } finally { client.shutdown(); } } }
public void checkInstanceState(InstanceCheckStateRequest checkInstanceStateRequest, Context context) { LambdaLogger logger = context.getLogger(); final String queueName = checkInstanceStateRequest.getQueueName(); final String sqsEndpoint = checkInstanceStateRequest.getSqsEndpoint(); // illegal parameter if (queueName == null || sqsEndpoint == null) { logger.log("[ERROR][checkInstanceStatus][running]QueueName or SQSEndpoint is not found Parameter. "); throw new IllegalArgumentException("QueueName or SQSEndpoint is not found Parameter. " + "CheckInstanceStateRequest[" + checkInstanceStateRequest + "]"); } // Only the specified number, reliably acquired int numberOfMessages = checkInstanceStateRequest.getNumberOfMessages(); for (int i = 0; i < numberOfMessages; i++) { AmazonSQSAsync client = createSQSClient(); client.setEndpoint(sqsEndpoint); try { String queueUrl = client.createQueue(queueName).getQueueUrl(); ReceiveMessageRequest req = new ReceiveMessageRequest(queueUrl).withVisibilityTimeout(5) .withMaxNumberOfMessages(checkInstanceStateRequest.getNumberOfMessages()); Future<ReceiveMessageResult> result = client.receiveMessageAsync(req); while (!result.isDone()) { Thread.sleep(100); } result.get().getMessages().stream() .forEach(s -> checkInstanceState(s, "running", checkInstanceStateRequest, context)); } catch (Exception e) { logger.log("[ERROR][checkInstanceStatus][running]message[" + e.getMessage() + "] stackTrace[" + getStackTrace(e) + "] CheckInstanceStateRequest[" + checkInstanceStateRequest + "]"); } finally { client.shutdown(); } } }
@Memoized AmazonSQSAsync get() { AmazonSQSAsync result = AmazonSQSAsyncClientBuilder.standard() .withCredentials(credentialsProvider()) .withEndpointConfiguration(endpointConfiguration()).build(); provisioned = true; return result; }
private AmazonSQSAsync buildAsyncClient() { final AmazonSQSAsyncClientBuilder builder = AmazonSQSAsyncClientBuilder.standard(); builder.setRegion(conf.region.getLabel()); builder.setCredentials(credentials); builder.setClientConfiguration(clientConfiguration); return builder.build(); }
public SqsConsumerWorkerCallable( AmazonSQSAsync sqsAsync, PushSource.Context context, Map<String, String> queueUrlToNamePrefix, int numMessagesPerRequest, long maxBatchWaitTimeMs, int maxBatchSize, DataParserFactory parserFactory, String awsRegionLabel, SqsAttributesOption sqsAttributesOption, ErrorRecordHandler errorRecordHandler, int pollWaitTimeSeconds, Collection<String> messageAttributeNames ) { Utils.checkState(!queueUrlToNamePrefix.isEmpty(), "queueUrlToNamePrefix must be non-empty"); this.sqsAsync = sqsAsync; this.context = context; this.queueUrlToNamePrefix = new HashMap<>(); if (queueUrlToNamePrefix != null) { this.queueUrlToNamePrefix.putAll(queueUrlToNamePrefix); } this.numMessagesPerRequest = numMessagesPerRequest; this.maxBatchWaitTimeMs = maxBatchWaitTimeMs; this.maxBatchSize = maxBatchSize; this.parserFactory = parserFactory; this.awsRegionLabel = awsRegionLabel; this.sqsAttributesOption = sqsAttributesOption; this.errorRecordHandler = errorRecordHandler; this.pollWaitTimeSeconds = pollWaitTimeSeconds; this.messageAttributeNames = new HashSet<>(); if (messageAttributeNames != null) { this.messageAttributeNames.addAll(messageAttributeNames); } }
@Bean public QueueMessagingTemplate defaultQueueMessagingTemplate(AmazonSQSAsync amazonSqs, ResourceIdResolver resourceIdResolver) { QueueMessagingTemplate queueMessagingTemplate = new QueueMessagingTemplate(amazonSqs, resourceIdResolver); queueMessagingTemplate.setDefaultDestinationName("JsonQueue"); return queueMessagingTemplate; }
@Bean public QueueMessagingTemplate queueMessagingTemplateWithCustomConverter(AmazonSQSAsync amazonSqs, ResourceIdResolver resourceIdResolver) { QueueMessagingTemplate queueMessagingTemplate = new QueueMessagingTemplate(amazonSqs, resourceIdResolver); queueMessagingTemplate.setDefaultDestinationName("StreamQueue"); queueMessagingTemplate.setMessageConverter(new ObjectMessageConverter()); return queueMessagingTemplate; }