private void checkQueueLength() { try { GetQueueAttributesResult result = sqsClient.getQueueAttributes(queueUrl, Arrays.asList(QUEUELENGTHATTR, QUEUEINVISIBLEATTR)); Map<String, String> attrs = result.getAttributes(); if (attrs.containsKey(QUEUELENGTHATTR)) { Stats.addMetric(StatsUtil.getStatsName("healthcheck", "ec2queue_length"), Integer.parseInt(attrs.get(QUEUELENGTHATTR))); logger.info("Ec2 queue length is {}", attrs.get(QUEUELENGTHATTR)); } if (attrs.containsKey(QUEUEINVISIBLEATTR)) { Stats.addMetric(StatsUtil.getStatsName("healthcheck", "ec2queue_in_processing"), Integer.parseInt(attrs.get("ApproximateNumberOfMessagesNotVisible"))); logger.info("Ec2 queue in processing length is {}", attrs.get(QUEUEINVISIBLEATTR)); } } catch (Exception ex) { logger.warn(ExceptionUtils.getRootCauseMessage(ex)); logger.warn(ExceptionUtils.getFullStackTrace(ex)); } }
@Test public void testGet() { // given QueueName qn = new QueueName("q1"); GetQueueUrlResult queueUrlResult = mock(GetQueueUrlResult.class); when(queueUrlResult.getQueueUrl()).thenReturn("url1"); GetQueueAttributesResult attributesResult = mock(GetQueueAttributesResult.class); HashMap<String, String> attributes = new HashMap<>(); attributes.put("1", "3"); attributes.put("hi", "ho"); when(attributesResult.getAttributes()).thenReturn(attributes); when(amazonSQS.getQueueUrl(any(GetQueueUrlRequest.class))).thenReturn(queueUrlResult); when(amazonSQS.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn( attributesResult); // when Queue queue = uut.get(qn); // then assertEquals("url1", queue.getUrl()); assertEquals("q1", queue.getName().getId()); assertEquals(attributes, queue.getQueueAttributes()); }
@Test public void testShouldReturnTotalNumberOfFailedEvents() { final SdkHttpMetadata responseMetadata = mock(SdkHttpMetadata.class); when(responseMetadata.getHttpStatusCode()).thenReturn(200); final String totalNumberOfFailedEvents = RandomStringUtils.randomNumeric(4); final Map<String, String> attributes = new HashMap<>(); attributes.put(QueueAttributeName.ApproximateNumberOfMessages.name(), totalNumberOfFailedEvents); final GetQueueAttributesResult getQueueAttributesResult = new GetQueueAttributesResult(); getQueueAttributesResult.setSdkHttpMetadata(responseMetadata); getQueueAttributesResult.setAttributes(attributes); when(amazonSQS.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(getQueueAttributesResult); assertThat(sqsFailedEventSource.getSize()).isEqualTo(Long.valueOf(totalNumberOfFailedEvents)); }
private void addPermissions() { if (permissions != null && permissions.isEmpty() == false) { GetQueueAttributesResult result = sqsClient .getQueueAttributes(new GetQueueAttributesRequest(queueUrl, Arrays.asList("Policy"))); AwsUtil.addPermissions(result.getAttributes(), permissions, new AwsUtil.AddPermissionHandler() { @Override public void execute(Permission p) { sqsClient.addPermission(new AddPermissionRequest() .withQueueUrl(queueUrl) .withLabel(p.getLabel()) .withAWSAccountIds(p.getAwsAccountIds()) .withActions(p.getActions())); } }); } }
public BufferedStringSqsQueueTest() { queue.setSendMessageTaskBuffer(sendMessageTaskBufferMock); queue.setDeleteMessageTaskBuffer(deleteMessageTaskBufferMock); queue.setChangeMessageVisibilityTaskBuffer(changeMessageVisibilityTaskBufferMock); when(requestSenderMock.sendRequest(any(GetQueueAttributesAction.class))).thenReturn(Single.just( new GetQueueAttributesResult().withAttributes(MutableSqsQueueAttributesTest.ATTRIBUTE_STRING_MAP) )); when(requestSenderMock.sendRequest(any(ReceiveMessagesAction.class))).thenReturn(Single.just( new ReceiveMessageResult().withMessages(SQS_MESSAGE) )); }
@Override public long size() { GetQueueAttributesResult attributes = client.getQueueAttributes(queueURL, Arrays.asList("ApproximateNumberOfMessages")); String sizeAsStr = attributes.getAttributes().get("ApproximateNumberOfMessages"); try { return Long.parseLong(sizeAsStr); } catch(Exception e) { return -1; } }
@Override public long getSize() { final GetQueueAttributesRequest getQueueAttributesRequest = new GetQueueAttributesRequest(queueUrl, Collections.singletonList(QueueAttributeName.ApproximateNumberOfMessages.name())); final GetQueueAttributesResult queueAttributes = amazonSQS.getQueueAttributes(getQueueAttributesRequest); if (queueAttributes.getAttributes() != null) { return Long.valueOf(queueAttributes.getAttributes().getOrDefault( QueueAttributeName.ApproximateNumberOfMessages.name(), "0")); } else { return 0L; } }
@Test public void testShouldReturnDefaultTotalNumberOfFailedEvents() { final SdkHttpMetadata responseMetadata = mock(SdkHttpMetadata.class); when(responseMetadata.getHttpStatusCode()).thenReturn(200); final GetQueueAttributesResult getQueueAttributesResult = new GetQueueAttributesResult(); getQueueAttributesResult.setSdkHttpMetadata(responseMetadata); getQueueAttributesResult.setAttributes(new HashMap<>()); when(amazonSQS.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(getQueueAttributesResult); assertThat(sqsFailedEventSource.getSize()).isEqualTo(Long.valueOf(0L)); }
@Test public void testShouldReturnDefaultTotalNumberOfFailedEventsWhenThereIsNoQueueAttributes() { final SdkHttpMetadata responseMetadata = mock(SdkHttpMetadata.class); when(responseMetadata.getHttpStatusCode()).thenReturn(200); final GetQueueAttributesResult getQueueAttributesResult = new GetQueueAttributesResult(); getQueueAttributesResult.setSdkHttpMetadata(responseMetadata); when(amazonSQS.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(getQueueAttributesResult); assertThat(sqsFailedEventSource.getSize()).isEqualTo(Long.valueOf(0L)); }
/** * Return the approximate number of visible messages in an SQS queue. * * @param client * SQS client * @param queueUrl * Queue URL * @return approximate number of visible messages */ private int getNumMessages() { try { final GetQueueAttributesResult result = sqs .getQueueAttributes(new GetQueueAttributesRequest(queueUrl) .withAttributeNames(NUM_MESSAGES_KEY)); final int count = Integer.parseInt( result.getAttributes().getOrDefault(NUM_MESSAGES_KEY, "0")); LOGGER.info("Approximately {} messages in queue", count); return count; } catch (Exception e) { LOGGER.error("Unable to get approximate number of messages", e); } return 0; }
@Test public void testWithDefaultTaskExecutorAndOneHandler() throws Exception { int testedMaxNumberOfMessages = 10; Map<QueueMessageHandler.MappingInformation, HandlerMethod> messageHandlerMethods = Collections.singletonMap( new QueueMessageHandler.MappingInformation(Collections.singleton("testQueue"), SqsMessageDeletionPolicy.ALWAYS), null); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); QueueMessageHandler mockedHandler = mock(QueueMessageHandler.class); AmazonSQSAsync mockedSqs = mock(AmazonSQSAsync.class, withSettings().stubOnly()); when(mockedSqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); when(mockedSqs.getQueueUrl(any(GetQueueUrlRequest.class))).thenReturn(new GetQueueUrlResult().withQueueUrl("testQueueUrl")); when(mockedHandler.getHandlerMethods()).thenReturn(messageHandlerMethods); container.setMaxNumberOfMessages(testedMaxNumberOfMessages); container.setAmazonSqs(mockedSqs); container.setMessageHandler(mockedHandler); container.afterPropertiesSet(); int expectedPoolMaxSize = messageHandlerMethods.size() * (testedMaxNumberOfMessages + 1); ThreadPoolTaskExecutor taskExecutor = (ThreadPoolTaskExecutor) container.getTaskExecutor(); assertNotNull(taskExecutor); assertEquals(expectedPoolMaxSize, taskExecutor.getMaxPoolSize()); }
@Bean public AmazonSQSAsync amazonSQS() { AmazonSQSAsync mockAmazonSQS = mock(AmazonSQSAsync.class, withSettings().stubOnly()); mockGetQueueUrl(mockAmazonSQS, "testQueue", "http://testQueue.amazonaws.com"); when(mockAmazonSQS.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(new ReceiveMessageResult()); when(mockAmazonSQS.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); return mockAmazonSQS; }
@Test public void receiveMessageRequests_withOneElement_created() throws Exception { AbstractMessageListenerContainer container = new StubAbstractMessageListenerContainer(); AmazonSQSAsync mock = mock(AmazonSQSAsync.class, withSettings().stubOnly()); QueueMessageHandler messageHandler = new QueueMessageHandler(); container.setAmazonSqs(mock); container.setMessageHandler(mock(QueueMessageHandler.class)); container.setMessageHandler(messageHandler); StaticApplicationContext applicationContext = new StaticApplicationContext(); applicationContext.registerSingleton("messageListener", MessageListener.class); container.setMaxNumberOfMessages(11); container.setVisibilityTimeout(22); container.setWaitTimeOut(33); messageHandler.setApplicationContext(applicationContext); when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue"))). thenReturn(new GetQueueUrlResult().withQueueUrl("http://testQueue.amazonaws.com")); when(mock.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); messageHandler.afterPropertiesSet(); container.afterPropertiesSet(); container.start(); Map<String, QueueAttributes> registeredQueues = container.getRegisteredQueues(); assertEquals("http://testQueue.amazonaws.com", registeredQueues.get("testQueue").getReceiveMessageRequest().getQueueUrl()); assertEquals(11L, registeredQueues.get("testQueue").getReceiveMessageRequest().getMaxNumberOfMessages().longValue()); assertEquals(22L, registeredQueues.get("testQueue").getReceiveMessageRequest().getVisibilityTimeout().longValue()); assertEquals(33L, registeredQueues.get("testQueue").getReceiveMessageRequest().getWaitTimeSeconds().longValue()); }
@Test public void receiveMessageRequests_withMultipleElements_created() throws Exception { AbstractMessageListenerContainer container = new StubAbstractMessageListenerContainer(); AmazonSQSAsync mock = mock(AmazonSQSAsync.class, withSettings().stubOnly()); container.setAmazonSqs(mock); StaticApplicationContext applicationContext = new StaticApplicationContext(); QueueMessageHandler messageHandler = new QueueMessageHandler(); messageHandler.setApplicationContext(applicationContext); container.setMessageHandler(messageHandler); applicationContext.registerSingleton("messageListener", MessageListener.class); applicationContext.registerSingleton("anotherMessageListener", AnotherMessageListener.class); container.setMaxNumberOfMessages(11); container.setVisibilityTimeout(22); container.setWaitTimeOut(33); when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue"))). thenReturn(new GetQueueUrlResult().withQueueUrl("http://testQueue.amazonaws.com")); when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("anotherTestQueue"))). thenReturn(new GetQueueUrlResult().withQueueUrl("http://anotherTestQueue.amazonaws.com")); when(mock.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); messageHandler.afterPropertiesSet(); container.afterPropertiesSet(); container.start(); Map<String, QueueAttributes> registeredQueues = container.getRegisteredQueues(); assertEquals("http://testQueue.amazonaws.com", registeredQueues.get("testQueue").getReceiveMessageRequest().getQueueUrl()); assertEquals(11L, registeredQueues.get("testQueue").getReceiveMessageRequest().getMaxNumberOfMessages().longValue()); assertEquals(22L, registeredQueues.get("testQueue").getReceiveMessageRequest().getVisibilityTimeout().longValue()); assertEquals(33L, registeredQueues.get("testQueue").getReceiveMessageRequest().getWaitTimeSeconds().longValue()); assertEquals("http://anotherTestQueue.amazonaws.com", registeredQueues.get("anotherTestQueue").getReceiveMessageRequest().getQueueUrl()); assertEquals(11L, registeredQueues.get("anotherTestQueue").getReceiveMessageRequest().getMaxNumberOfMessages().longValue()); assertEquals(22L, registeredQueues.get("anotherTestQueue").getReceiveMessageRequest().getVisibilityTimeout().longValue()); assertEquals(33L, registeredQueues.get("anotherTestQueue").getReceiveMessageRequest().getWaitTimeSeconds().longValue()); }
@Test public void receiveMessageRequests_withDestinationResolverThrowingException_shouldLogWarningAndNotCreateRequest() throws Exception { // Arrange AbstractMessageListenerContainer container = new StubAbstractMessageListenerContainer(); Logger loggerMock = container.getLogger(); AmazonSQSAsync mock = mock(AmazonSQSAsync.class, withSettings().stubOnly()); container.setAmazonSqs(mock); StaticApplicationContext applicationContext = new StaticApplicationContext(); QueueMessageHandler messageHandler = new QueueMessageHandler(); messageHandler.setApplicationContext(applicationContext); container.setMessageHandler(messageHandler); applicationContext.registerSingleton("messageListener", MessageListener.class); applicationContext.registerSingleton("anotherMessageListener", AnotherMessageListener.class); when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue"))). thenThrow(new DestinationResolutionException("Queue not found")); when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("anotherTestQueue"))). thenReturn(new GetQueueUrlResult().withQueueUrl("http://anotherTestQueue.amazonaws.com")); when(mock.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); messageHandler.afterPropertiesSet(); container.afterPropertiesSet(); // Act container.start(); // Assert ArgumentCaptor<String> logMsgArgCaptor = ArgumentCaptor.forClass(String.class); verify(loggerMock).warn(logMsgArgCaptor.capture()); Map<String, QueueAttributes> registeredQueues = container.getRegisteredQueues(); assertFalse(registeredQueues.containsKey("testQueue")); assertEquals("Ignoring queue with name 'testQueue' as it does not exist.", logMsgArgCaptor.getValue()); assertEquals("http://anotherTestQueue.amazonaws.com", registeredQueues.get("anotherTestQueue").getReceiveMessageRequest().getQueueUrl()); }
private static String getSQSQueueARN(AmazonSQS amazonSQS, String queueURL) { // This statement will throw if the queue does not exist. GetQueueAttributesResult queueAttributes = amazonSQS.getQueueAttributes( new GetQueueAttributesRequest() .withQueueUrl(queueURL) .withAttributeNames(QueueAttributeName.QueueArn) ); return queueAttributes .getAttributes() .get(QueueAttributeName.QueueArn.name()); }
@Override public GetQueueAttributesResult getQueueAttributes(GetQueueAttributesRequest getQueueAttributesRequest) throws AmazonClientException { DirectorySQSQueue queue = getQueueFromUrl(getQueueAttributesRequest.getQueueUrl(), false); Map<String, String> attributes = Maps.newHashMap(); List<String> unsupported = Lists.newArrayList(); for (String attribute : getQueueAttributesRequest.getAttributeNames()) { switch (attribute) { case "QueueArn": attributes.put("QueueArn", BASE_ARN + queue.getQueuePath().getFileName()); break; case "All": case "ApproximateNumberOfMessages": case "ApproximateNumberOfMessagesNotVisible": case "VisibilityTimeout": case "CreatedTimestamp": case "LastModifiedTimestamp": case "Policy": case "MaximumMessageSize": case "MessageRetentionPeriod": case "ApproximateNumberOfMessagesDelayed": case "DelaySeconds": case "ReceiveMessageWaitTimeSeconds": default: unsupported.add(attribute); break; } } if (!unsupported.isEmpty()) { throw new UnsupportedOperationException("attributes not implemented: " + unsupported); } return new GetQueueAttributesResult().withAttributes(attributes); }
public void getQueueArnFromAttributes() { String queueName = someQueueName(); CreateQueueResult createQueueResult = _amazonSQS.createQueue(new CreateQueueRequest(queueName)); String queueUrl = createQueueResult.getQueueUrl(); List<String> requestedAttributes = ImmutableList.of("QueueArn"); GetQueueAttributesResult getQueueAttributesResult = _amazonSQS.getQueueAttributes(new GetQueueAttributesRequest() .withQueueUrl(queueUrl) .withAttributeNames(requestedAttributes)); Map<String, String> resultAttributes = getQueueAttributesResult.getAttributes(); String queueArn = resultAttributes.get("QueueArn"); String queueNameFromArn = queueArn.substring(queueArn.lastIndexOf(":") + 1); Assert.assertEquals(queueNameFromArn, queueName); }
public Map<String, String> getQueueAttributes(String url) throws MissingArgumentException { // find the queue arn, we need this to create the SNS subscription GetQueueAttributesRequest getQueueAttributesRequest = new GetQueueAttributesRequest(url); getQueueAttributesRequest.setAttributeNames(attributeNames); GetQueueAttributesResult attribResult = sqsClient.getQueueAttributes(getQueueAttributesRequest); Map<String, String> attribMap = attribResult.getAttributes(); if (!attribMap.containsKey(QUEUE_ARN_KEY)) { String msg = "Missing arn attirbute, tried attribute with name: " + QUEUE_ARN_KEY; logger.error(msg); throw new MissingArgumentException(msg); } return attribMap; }
private void resolveQueueArn() { GetQueueAttributesRequest request = new GetQueueAttributesRequest( queueUrl); GetQueueAttributesResult result = sqsClient.getQueueAttributes(request .withAttributeNames(Collections.singletonList(QUEUE_ARN_KEY))); queueArn = result.getAttributes().get(QUEUE_ARN_KEY); }
String getQueueARN() { GetQueueAttributesResult response = client.getQueueAttributes(queueURL, Arrays.asList("QueueArn")); return response.getAttributes().get("QueueArn"); }
public Observable<GetQueueAttributesResult> getQueueAttributesAsync(GetQueueAttributesRequest request) { return Observable.from(sqsClient.getQueueAttributesAsync(request)); }
public Observable<GetQueueAttributesResult> getQueueAttributesAsync(String queueUrl, List<String> attributeNames) { return Observable.from(sqsClient.getQueueAttributesAsync(queueUrl, attributeNames)); }
public int getNumberOfItems() { String key = "ApproximateNumberOfMessages"; List<String> attrib = Arrays.asList(key); GetQueueAttributesResult res = _sqs.getQueueAttributes(new GetQueueAttributesRequest(_queueURL,attrib)); return Integer.parseInt(res.getAttributes().get(key)); }
@Override public boolean load(GetQueueAttributesRequest request, ResultCapture<GetQueueAttributesResult> extractor) { return resource.load(request, extractor); }
@Test public void testSimpleReceiveMessage() throws Exception { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly()); container.setAmazonSqs(sqs); CountDownLatch countDownLatch = new CountDownLatch(1); QueueMessageHandler messageHandler = new QueueMessageHandler() { @Override public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException { countDownLatch.countDown(); assertEquals("messageContent", message.getPayload()); } }; container.setMessageHandler(messageHandler); StaticApplicationContext applicationContext = new StaticApplicationContext(); applicationContext.registerSingleton("testMessageListener", TestMessageListener.class); messageHandler.setApplicationContext(applicationContext); container.setBeanName("testContainerName"); messageHandler.afterPropertiesSet(); mockGetQueueUrl(sqs, "testQueue", "http://testSimpleReceiveMessage.amazonaws.com"); mockGetQueueAttributesWithEmptyResult(sqs, "http://testSimpleReceiveMessage.amazonaws.com"); container.afterPropertiesSet(); when(sqs.receiveMessage(new ReceiveMessageRequest("http://testSimpleReceiveMessage.amazonaws.com").withAttributeNames("All") .withMessageAttributeNames("All") .withMaxNumberOfMessages(10))) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("messageContent"), new Message().withBody("messageContent"))) .thenReturn(new ReceiveMessageResult()); when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); container.start(); assertTrue(countDownLatch.await(1, TimeUnit.SECONDS)); container.stop(); }
@Test public void listener_withMultipleMessageHandlers_shouldBeCalled() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(2); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer() { @Override protected void executeMessage(org.springframework.messaging.Message<String> stringMessage) { countDownLatch.countDown(); super.executeMessage(stringMessage); } }; AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly()); container.setAmazonSqs(sqs); QueueMessageHandler messageHandler = new QueueMessageHandler(); container.setMessageHandler(messageHandler); StaticApplicationContext applicationContext = new StaticApplicationContext(); applicationContext.registerSingleton("testMessageListener", TestMessageListener.class); applicationContext.registerSingleton("anotherTestMessageListener", AnotherTestMessageListener.class); mockGetQueueUrl(sqs, "testQueue", "http://listener_withMultipleMessageHandlers_shouldBeCalled.amazonaws.com"); mockGetQueueAttributesWithEmptyResult(sqs, "http://listener_withMultipleMessageHandlers_shouldBeCalled.amazonaws.com"); mockGetQueueUrl(sqs, "anotherTestQueue", "http://listener_withMultipleMessageHandlers_shouldBeCalled.another.amazonaws.com"); mockGetQueueAttributesWithEmptyResult(sqs, "http://listener_withMultipleMessageHandlers_shouldBeCalled.another.amazonaws.com"); messageHandler.setApplicationContext(applicationContext); messageHandler.afterPropertiesSet(); container.afterPropertiesSet(); when(sqs.receiveMessage(new ReceiveMessageRequest("http://listener_withMultipleMessageHandlers_shouldBeCalled.amazonaws.com").withAttributeNames("All") .withMessageAttributeNames("All") .withMaxNumberOfMessages(10))) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("messageContent"))) .thenReturn(new ReceiveMessageResult()); when(sqs.receiveMessage(new ReceiveMessageRequest("http://listener_withMultipleMessageHandlers_shouldBeCalled.another.amazonaws.com").withAttributeNames("All") .withMessageAttributeNames("All") .withMaxNumberOfMessages(10))) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("anotherMessageContent"))) .thenReturn(new ReceiveMessageResult()); when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); container.start(); assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS)); container.stop(); assertEquals("messageContent", applicationContext.getBean(TestMessageListener.class).getMessage()); assertEquals("anotherMessageContent", applicationContext.getBean(AnotherTestMessageListener.class).getMessage()); }
@Test public void messageExecutor_withMessageWithAttributes_shouldPassThemAsHeaders() throws Exception { // Arrange CountDownLatch countDownLatch = new CountDownLatch(1); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer() { @Override protected void executeMessage(org.springframework.messaging.Message<String> stringMessage) { countDownLatch.countDown(); super.executeMessage(stringMessage); } }; AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly()); container.setAmazonSqs(sqs); QueueMessageHandler messageHandler = spy(new QueueMessageHandler()); container.setMessageHandler(messageHandler); StaticApplicationContext applicationContext = new StaticApplicationContext(); applicationContext.registerSingleton("testMessageListener", TestMessageListener.class); mockGetQueueUrl(sqs, "testQueue", "http://messageExecutor_withMessageWithAttributes_shouldPassThemAsHeaders.amazonaws.com"); mockGetQueueAttributesWithEmptyResult(sqs, "http://messageExecutor_withMessageWithAttributes_shouldPassThemAsHeaders.amazonaws.com"); messageHandler.setApplicationContext(applicationContext); messageHandler.afterPropertiesSet(); container.afterPropertiesSet(); when(sqs.receiveMessage(new ReceiveMessageRequest("http://messageExecutor_withMessageWithAttributes_shouldPassThemAsHeaders.amazonaws.com").withAttributeNames("All") .withMessageAttributeNames("All") .withMaxNumberOfMessages(10))) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("messageContent").withAttributes(Collections.singletonMap("SenderId", "ID")))) .thenReturn(new ReceiveMessageResult()); when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); // Act container.start(); // Assert assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS)); container.stop(); verify(messageHandler).handleMessage(this.stringMessageCaptor.capture()); assertEquals("ID", this.stringMessageCaptor.getValue().getHeaders().get("SenderId")); }
@Test public void messageExecutor_messageWithMimeTypeMessageAttribute_shouldSetItAsHeader() throws Exception { // Arrange CountDownLatch countDownLatch = new CountDownLatch(1); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer() { @Override protected void executeMessage(org.springframework.messaging.Message<String> stringMessage) { countDownLatch.countDown(); super.executeMessage(stringMessage); } }; AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly()); container.setAmazonSqs(sqs); QueueMessageHandler messageHandler = spy(new QueueMessageHandler()); container.setMessageHandler(messageHandler); StaticApplicationContext applicationContext = new StaticApplicationContext(); applicationContext.registerSingleton("testMessageListener", TestMessageListener.class); mockGetQueueUrl(sqs, "testQueue", "http://messageExecutor_messageWithMimeTypeMessageAttribute_shouldSetItAsHeader.amazonaws.com"); mockGetQueueAttributesWithEmptyResult(sqs, "http://messageExecutor_messageWithMimeTypeMessageAttribute_shouldSetItAsHeader.amazonaws.com"); messageHandler.setApplicationContext(applicationContext); messageHandler.afterPropertiesSet(); container.afterPropertiesSet(); MimeType mimeType = new MimeType("text", "plain", Charset.forName("UTF-8")); when(sqs.receiveMessage(new ReceiveMessageRequest("http://messageExecutor_messageWithMimeTypeMessageAttribute_shouldSetItAsHeader.amazonaws.com").withAttributeNames("All") .withMessageAttributeNames("All") .withMaxNumberOfMessages(10))) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("messageContent") .withAttributes(Collections.singletonMap("SenderId", "ID")) .withMessageAttributes(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, new MessageAttributeValue().withDataType("String") .withStringValue(mimeType.toString()))))) .thenReturn(new ReceiveMessageResult()); when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); // Act container.start(); // Assert assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS)); container.stop(); verify(messageHandler).handleMessage(this.stringMessageCaptor.capture()); assertEquals(mimeType, this.stringMessageCaptor.getValue().getHeaders().get(MessageHeaders.CONTENT_TYPE)); }
@Test public void receiveMessage_throwsAnException_operationShouldBeRetried() throws Exception { // Arrange Level previous = disableLogging(); AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class, withSettings().stubOnly()); when(amazonSqs.receiveMessage(any(ReceiveMessageRequest.class))).thenThrow(new RuntimeException("Boom!")) .thenReturn(new ReceiveMessageResult() .withMessages(new Message().withBody("messageContent"), new Message().withBody("messageContent"))); CountDownLatch countDownLatch = new CountDownLatch(1); QueueMessageHandler messageHandler = new QueueMessageHandler() { @Override public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException { countDownLatch.countDown(); assertEquals("messageContent", message.getPayload()); } }; StaticApplicationContext applicationContext = new StaticApplicationContext(); applicationContext.registerSingleton("testMessageListener", TestMessageListener.class); messageHandler.setApplicationContext(applicationContext); mockGetQueueUrl(amazonSqs, "testQueue", "http://receiveMessage_throwsAnException_operationShouldBeRetried.amazonaws.com"); messageHandler.afterPropertiesSet(); when(amazonSqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setBackOffTime(0); container.setAmazonSqs(amazonSqs); container.setMessageHandler(messageHandler); container.setAutoStartup(false); container.afterPropertiesSet(); // Act container.start(); // Assert assertTrue(countDownLatch.await(1, TimeUnit.SECONDS)); container.stop(); setLogLevel(previous); }
@Override public GetQueueAttributesResult getQueueAttributes(GetQueueAttributesRequest getQueueAttributesRequest) throws AmazonClientException { return new GetQueueAttributesResult(); }
private static void mockGetQueueAttributesWithRedrivePolicy(AmazonSQSAsync sqs, String queueUrl) { when(sqs.getQueueAttributes(new GetQueueAttributesRequest(queueUrl).withAttributeNames(QueueAttributeName.RedrivePolicy))). thenReturn(new GetQueueAttributesResult().addAttributesEntry(QueueAttributeName.RedrivePolicy.toString(), "{\"some\": \"JSON\"}")); }
private static void mockGetQueueAttributesWithEmptyResult(AmazonSQSAsync sqs, String queueUrl) { when(sqs.getQueueAttributes(new GetQueueAttributesRequest(queueUrl).withAttributeNames(QueueAttributeName.RedrivePolicy))). thenReturn(new GetQueueAttributesResult()); }
private static void allowSQSQueueToReceiveMessagesFromSNSTopic( AmazonSQS amazonSQS, String queueURL, String queueARN, String topicARN ) { GetQueueAttributesResult queueAttributesResult = amazonSQS.getQueueAttributes( new GetQueueAttributesRequest().withQueueUrl(queueURL).withAttributeNames( QueueAttributeName.Policy ) ); String policyJson = queueAttributesResult.getAttributes().get(QueueAttributeName.Policy.name()); final List<Statement> statements; if (policyJson != null) { statements = new ArrayList<>(Policy.fromJson(policyJson).getStatements()); } else { // no policies yet exist statements = new ArrayList<>(); } statements.add( new Statement(Statement.Effect.Allow) .withPrincipals(Principal.AllUsers) .withResources(new Resource(queueARN)) .withActions(SQSActions.SendMessage) .withConditions(ConditionFactory.newSourceArnCondition(topicARN)) ); Policy policy = new Policy(); policy.setStatements(statements); Map<String, String> queueAttributes = new HashMap<>(); queueAttributes.put(QueueAttributeName.Policy.name(), policy.toJson()); // Note that if the queue already has this policy, this will do nothing. amazonSQS.setQueueAttributes( new SetQueueAttributesRequest() .withQueueUrl(queueURL) .withAttributes(queueAttributes) ); }
/** * Makes a call to the service to load this resource's attributes if they * are not loaded yet, and use a ResultCapture to retrieve the low-level * client response * The following request parameters will be populated from the data of this * <code>Queue</code> resource, and any conflicting parameter value set in * the request will be overridden: * <ul> * <li> * <b><code>QueueUrl</code></b> * - mapped from the <code>Url</code> identifier. * </li> * <li> * <b><code>AttributeNames.0</code></b> * - constant value <code>All</code>. * </li> * </ul> * * <p> * * @return Returns {@code true} if the resource is not yet loaded when this * method was invoked, which indicates that a service call has been * made to retrieve the attributes. * @see GetQueueAttributesRequest */ boolean load(GetQueueAttributesRequest request, ResultCapture<GetQueueAttributesResult> extractor);