@Override public long size() throws Exception { // get all the attributes of the queue List<String> attributeNames = new ArrayList<String>(); attributeNames.add("All"); // list the attributes of the queue we are interested in GetQueueAttributesRequest request = new GetQueueAttributesRequest(queueUrl); request.setAttributeNames(attributeNames); Map<String, String> attributes = client.getQueueAttributes(request) .getAttributes(); int messages = Integer.parseInt(attributes .get("ApproximateNumberOfMessages")); //int messagesNotVisible = Integer.parseInt(attributes // .get("ApproximateNumberOfMessagesNotVisible")); return (long) messages; }
@Test public void testGet() { // given QueueName qn = new QueueName("q1"); GetQueueUrlResult queueUrlResult = mock(GetQueueUrlResult.class); when(queueUrlResult.getQueueUrl()).thenReturn("url1"); GetQueueAttributesResult attributesResult = mock(GetQueueAttributesResult.class); HashMap<String, String> attributes = new HashMap<>(); attributes.put("1", "3"); attributes.put("hi", "ho"); when(attributesResult.getAttributes()).thenReturn(attributes); when(amazonSQS.getQueueUrl(any(GetQueueUrlRequest.class))).thenReturn(queueUrlResult); when(amazonSQS.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn( attributesResult); // when Queue queue = uut.get(qn); // then assertEquals("url1", queue.getUrl()); assertEquals("q1", queue.getName().getId()); assertEquals(attributes, queue.getQueueAttributes()); }
/**
 * The size reported by the event source must equal the numeric value of the
 * {@code ApproximateNumberOfMessages} attribute returned by the stubbed client.
 */
@Test
public void testShouldReturnTotalNumberOfFailedEvents() {
    // A random 4-digit numeric string stands in for the queue depth.
    final String expectedCount = RandomStringUtils.randomNumeric(4);
    final Map<String, String> stubbedAttributes = new HashMap<>();
    stubbedAttributes.put(QueueAttributeName.ApproximateNumberOfMessages.name(), expectedCount);

    final SdkHttpMetadata httpMetadata = mock(SdkHttpMetadata.class);
    when(httpMetadata.getHttpStatusCode()).thenReturn(200);

    final GetQueueAttributesResult stubbedResult = new GetQueueAttributesResult();
    stubbedResult.setSdkHttpMetadata(httpMetadata);
    stubbedResult.setAttributes(stubbedAttributes);
    when(amazonSQS.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(stubbedResult);

    assertThat(sqsFailedEventSource.getSize()).isEqualTo(Long.valueOf(expectedCount));
}
/**
 * Subscribes an SQS queue to an SNS topic.
 *
 * <p>Steps, in order: look up the queue ARN, set a queue policy containing a single
 * statement that allows {@code SendMessage} to that ARN when the source ARN is the
 * topic, then create an {@code sqs} protocol subscription on the topic.
 *
 * <p>The queue's {@code Policy} attribute is set to the newly built policy; any policy
 * previously stored on the queue is not read or merged here.
 *
 * @param snsTopicArn ARN of the topic to subscribe to
 * @param sqsQueueUrl URL of the queue that should receive the topic's messages
 * @return the subscription ARN reported by the SNS client
 */
public String subscribeQueueToTopic(String snsTopicArn, String sqsQueueUrl) {
    // 1. Resolve the queue ARN from its URL.
    final String arnKey = QueueAttributeName.QueueArn.toString();
    GetQueueAttributesRequest arnRequest =
            new GetQueueAttributesRequest(sqsQueueUrl).withAttributeNames(arnKey);
    String sqsQueueArn = sqsClient.getQueueAttributes(arnRequest).getAttributes().get(arnKey);

    // 2. Build the policy that lets the topic deliver to the queue.
    Statement allowTopicToSend = new Statement(Effect.Allow)
            .withId("topic-subscription-" + snsTopicArn)
            .withPrincipals(Principal.AllUsers)
            .withActions(SQSActions.SendMessage)
            .withResources(new Resource(sqsQueueArn))
            .withConditions(ConditionFactory.newSourceArnCondition(snsTopicArn));
    Policy policy = new Policy().withStatements(allowTopicToSend);
    logger.debug("Policy: " + policy.toJson());

    // 3. Store the policy on the queue.
    Map<String, String> policyAttributes = new HashMap<String, String>();
    policyAttributes.put(QueueAttributeName.Policy.toString(), policy.toJson());
    sqsClient.setQueueAttributes(new SetQueueAttributesRequest(sqsQueueUrl, policyAttributes));

    // 4. Create the subscription itself.
    SubscribeRequest subscription = new SubscribeRequest()
            .withEndpoint(sqsQueueArn)
            .withProtocol("sqs")
            .withTopicArn(snsTopicArn);
    return snsClient.subscribe(subscription).getSubscriptionArn();
}
public static int getCount(String name) { try { String queueUrl = getConnection().createQueue( new CreateQueueRequest(name)).getQueueUrl(); List<String> attributeNames = new ArrayList<String>(); attributeNames.add("All"); // list the attributes of the queue we are interested in GetQueueAttributesRequest request = new GetQueueAttributesRequest(queueUrl); request.setAttributeNames(attributeNames); Map<String, String> attributes = sqs.getQueueAttributes(request).getAttributes(); int messages = Integer.parseInt(attributes.get("ApproximateNumberOfMessages")); //System.out.println("Messages in the queue: " + messages); return messages; } catch (Exception e) { e.printStackTrace(); return -1; } }
/**
 * Applies the configured permissions to the queue.
 *
 * <p>Does nothing when {@code permissions} is null or empty. Otherwise the queue's
 * {@code Policy} attribute is fetched and passed to {@code AwsUtil.addPermissions}
 * together with a handler that issues one {@code addPermission} call per permission
 * handed to it.
 */
private void addPermissions() {
    if (permissions == null || permissions.isEmpty()) {
        return;
    }

    GetQueueAttributesRequest policyRequest =
            new GetQueueAttributesRequest(queueUrl, Arrays.asList("Policy"));
    GetQueueAttributesResult currentPolicy = sqsClient.getQueueAttributes(policyRequest);

    AwsUtil.addPermissions(currentPolicy.getAttributes(), permissions,
            new AwsUtil.AddPermissionHandler() {
                @Override
                public void execute(Permission p) {
                    AddPermissionRequest grant = new AddPermissionRequest()
                            .withQueueUrl(queueUrl)
                            .withLabel(p.getLabel())
                            .withAWSAccountIds(p.getAwsAccountIds())
                            .withActions(p.getActions());
                    sqsClient.addPermission(grant);
                }
            });
}
/**
 * Each of the listed SQS operations must return a non-null result for a
 * populated request object.
 */
@Test
public void testNonInjectableMocks_shouldReturnNormal() {
    ChangeMessageVisibilityBatchRequest visibilityBatch = new ChangeMessageVisibilityBatchRequest();
    assertNotNull(sqs.changeMessageVisibilityBatch(visibilityBatch));

    AddPermissionRequest addPermission = new AddPermissionRequest()
            .withActions("one")
            .withAWSAccountIds("two", "three")
            .withLabel("four")
            .withQueueUrl("five");
    assertNotNull(sqs.addPermission(addPermission));

    ListDeadLetterSourceQueuesRequest deadLetterSources =
            new ListDeadLetterSourceQueuesRequest().withQueueUrl("ten");
    assertNotNull(sqs.listDeadLetterSourceQueues(deadLetterSources));

    GetQueueAttributesRequest getAttributes = new GetQueueAttributesRequest()
            .withAttributeNames(ImmutableList.of("eleven"))
            .withQueueUrl("twelve");
    assertNotNull(sqs.getQueueAttributes(getAttributes));

    SetQueueAttributesRequest setAttributes = new SetQueueAttributesRequest()
            .withAttributes(ImmutableMap.of("thirteen", "fourteen"))
            .withQueueUrl("fifteen");
    assertNotNull(sqs.setQueueAttributes(setAttributes));
}
/**
 * Loads a queue by name: resolves its URL, fetches all of its attributes and
 * wraps name, URL and attributes in a {@code Queue}.
 *
 * @param queueName name of the queue to look up
 * @return a {@code Queue} built from the name, the resolved URL and the attribute map
 */
public Queue get(@NonNull QueueName queueName) {
    String resolvedUrl = amazonSQS
            .getQueueUrl(new GetQueueUrlRequest().withQueueName(queueName.getId()))
            .getQueueUrl();

    Map<String, String> allAttributes = amazonSQS
            .getQueueAttributes(
                    new GetQueueAttributesRequest(resolvedUrl, Collections.singletonList("All")))
            .getAttributes();

    return new Queue(queueName, resolvedUrl, allAttributes);
}
/**
 * Reads the {@code ApproximateNumberOfMessages} attribute of the queue.
 *
 * @return the attribute parsed as a long; {@code 0} when the response carries no
 *         attribute map or the attribute is not present in it
 */
@Override
public long getSize() {
    final String countKey = QueueAttributeName.ApproximateNumberOfMessages.name();
    final GetQueueAttributesResult response = amazonSQS.getQueueAttributes(
            new GetQueueAttributesRequest(queueUrl, Collections.singletonList(countKey)));

    if (response.getAttributes() == null) {
        return 0L;
    }
    return Long.parseLong(response.getAttributes().getOrDefault(countKey, "0"));
}
/**
 * With an empty attribute map in the response, the reported size must be zero.
 */
@Test
public void testShouldReturnDefaultTotalNumberOfFailedEvents() {
    final SdkHttpMetadata httpMetadata = mock(SdkHttpMetadata.class);
    when(httpMetadata.getHttpStatusCode()).thenReturn(200);

    // Response with an attribute map that contains nothing.
    final GetQueueAttributesResult emptyAttributesResult = new GetQueueAttributesResult();
    emptyAttributesResult.setSdkHttpMetadata(httpMetadata);
    emptyAttributesResult.setAttributes(new HashMap<>());

    when(amazonSQS.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(emptyAttributesResult);

    assertThat(sqsFailedEventSource.getSize()).isEqualTo(Long.valueOf(0L));
}
/**
 * When no attributes were ever set on the response, the reported size must be zero.
 */
@Test
public void testShouldReturnDefaultTotalNumberOfFailedEventsWhenThereIsNoQueueAttributes() {
    final SdkHttpMetadata httpMetadata = mock(SdkHttpMetadata.class);
    when(httpMetadata.getHttpStatusCode()).thenReturn(200);

    // Deliberately no setAttributes(...) call on this result.
    final GetQueueAttributesResult resultWithoutAttributes = new GetQueueAttributesResult();
    resultWithoutAttributes.setSdkHttpMetadata(httpMetadata);

    when(amazonSQS.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(resultWithoutAttributes);

    assertThat(sqsFailedEventSource.getSize()).isEqualTo(Long.valueOf(0L));
}
/**
 * Looks up the queue attribute named by {@code NUM_MESSAGES_KEY} for {@code queueUrl}
 * and returns it as an int.
 *
 * <p>The count is logged at info level. Any exception raised during the lookup or the
 * parse is logged at error level and reported to the caller as zero.
 *
 * @return the parsed attribute value; {@code 0} when the attribute is absent or the
 *         lookup fails
 */
private int getNumMessages() {
    try {
        final GetQueueAttributesRequest request =
                new GetQueueAttributesRequest(queueUrl).withAttributeNames(NUM_MESSAGES_KEY);
        final String rawCount = sqs.getQueueAttributes(request)
                .getAttributes()
                .getOrDefault(NUM_MESSAGES_KEY, "0");
        final int parsed = Integer.parseInt(rawCount);
        LOGGER.info("Approximately {} messages in queue", parsed);
        return parsed;
    } catch (Exception e) {
        LOGGER.error("Unable to get approximate number of messages", e);
        return 0;
    }
}
/**
 * Returns the {@code ApproximateNumberOfMessages} attribute of a queue.
 *
 * <p>The attribute map is used through the {@code Map} interface the SDK declares.
 * It is not downcast to {@code HashMap}, because the concrete map class returned by
 * the client is not part of its contract and such a cast can fail at runtime.
 *
 * @param sqs      client used for the lookup
 * @param queueUrl URL of the queue to inspect
 * @return the attribute value parsed as an int
 * @throws NumberFormatException if the attribute is missing or not a number
 */
public static int getQueueSize(AmazonSQS sqs, String queueUrl) {
    final String countKey = "ApproximateNumberOfMessages";
    GetQueueAttributesRequest request =
            new GetQueueAttributesRequest(queueUrl).withAttributeNames(countKey);
    String count = sqs.getQueueAttributes(request).getAttributes().get(countKey);
    return Integer.parseInt(count);
}
/**
 * Fetches the {@code ApproximateNumberOfMessages} attribute of the queue at
 * {@code queueURL}.
 *
 * @return the attribute value parsed as an int
 */
public int getApproximateQueueSize() {
    final String attribute = "ApproximateNumberOfMessages";

    Collection<String> requested = new ArrayList<String>();
    requested.add(attribute);

    GetQueueAttributesRequest request = new GetQueueAttributesRequest(queueURL);
    Map<String, String> response =
            sqs.getQueueAttributes(request.withAttributeNames(requested)).getAttributes();

    return Integer.parseInt(response.get(attribute));
}
/**
 * Fetches the {@code ApproximateNumberOfMessagesNotVisible} attribute of the queue at
 * {@code queueURL}.
 *
 * @return the attribute value parsed as an int
 */
public int getApproximateNotVisibleMessageNum() {
    final String attribute = "ApproximateNumberOfMessagesNotVisible";

    Collection<String> requested = new ArrayList<String>();
    requested.add(attribute);

    GetQueueAttributesRequest request = new GetQueueAttributesRequest(queueURL);
    Map<String, String> response =
            sqs.getQueueAttributes(request.withAttributeNames(requested)).getAttributes();

    return Integer.parseInt(response.get(attribute));
}
/**
 * With no task executor configured and a single handler mapping, the container must
 * expose a {@code ThreadPoolTaskExecutor} whose maximum pool size is
 * {@code handlerCount * (maxNumberOfMessages + 1)}.
 */
@Test
public void testWithDefaultTaskExecutorAndOneHandler() throws Exception {
    final int maxMessagesPerPoll = 10;

    // Exactly one queue mapping; the handler method itself is irrelevant here.
    QueueMessageHandler.MappingInformation singleMapping =
            new QueueMessageHandler.MappingInformation(
                    Collections.singleton("testQueue"), SqsMessageDeletionPolicy.ALWAYS);
    Map<QueueMessageHandler.MappingInformation, HandlerMethod> handlerMethods =
            Collections.singletonMap(singleMapping, null);

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    QueueMessageHandler handler = mock(QueueMessageHandler.class);
    AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly());

    when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(new GetQueueAttributesResult());
    when(sqs.getQueueUrl(any(GetQueueUrlRequest.class)))
            .thenReturn(new GetQueueUrlResult().withQueueUrl("testQueueUrl"));
    when(handler.getHandlerMethods()).thenReturn(handlerMethods);

    container.setMaxNumberOfMessages(maxMessagesPerPoll);
    container.setAmazonSqs(sqs);
    container.setMessageHandler(handler);
    container.afterPropertiesSet();

    ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) container.getTaskExecutor();
    int expectedMaxPoolSize = handlerMethods.size() * (maxMessagesPerPoll + 1);

    assertNotNull(executor);
    assertEquals(expectedMaxPoolSize, executor.getMaxPoolSize());
}
/**
 * Provides a stub-only {@code AmazonSQSAsync} mock: the URL of {@code testQueue} is
 * stubbed via {@code mockGetQueueUrl}, and every receive and attribute lookup returns
 * an empty result object.
 */
@Bean
public AmazonSQSAsync amazonSQS() {
    AmazonSQSAsync sqsStub = mock(AmazonSQSAsync.class, withSettings().stubOnly());
    mockGetQueueUrl(sqsStub, "testQueue", "http://testQueue.amazonaws.com");

    ReceiveMessageResult noMessages = new ReceiveMessageResult();
    when(sqsStub.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(noMessages);

    GetQueueAttributesResult noAttributes = new GetQueueAttributesResult();
    when(sqsStub.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(noAttributes);

    return sqsStub;
}
/**
 * A container configured with one listener must register {@code testQueue} with a
 * receive request that carries the resolved queue URL and the configured
 * max-messages, visibility-timeout and wait-time values.
 */
@Test
public void receiveMessageRequests_withOneElement_created() throws Exception {
    AbstractMessageListenerContainer container = new StubAbstractMessageListenerContainer();

    AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly());
    container.setAmazonSqs(sqs);

    // Only the real handler is installed. The mock handler that used to be set just
    // before it was replaced on the very next statement and never used.
    QueueMessageHandler messageHandler = new QueueMessageHandler();
    container.setMessageHandler(messageHandler);

    StaticApplicationContext applicationContext = new StaticApplicationContext();
    applicationContext.registerSingleton("messageListener", MessageListener.class);

    container.setMaxNumberOfMessages(11);
    container.setVisibilityTimeout(22);
    container.setWaitTimeOut(33);

    messageHandler.setApplicationContext(applicationContext);

    when(sqs.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue")))
            .thenReturn(new GetQueueUrlResult().withQueueUrl("http://testQueue.amazonaws.com"));
    when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(new GetQueueAttributesResult());

    messageHandler.afterPropertiesSet();
    container.afterPropertiesSet();
    container.start();

    Map<String, QueueAttributes> registeredQueues = container.getRegisteredQueues();
    QueueAttributes testQueue = registeredQueues.get("testQueue");

    assertEquals("http://testQueue.amazonaws.com",
            testQueue.getReceiveMessageRequest().getQueueUrl());
    assertEquals(11L,
            testQueue.getReceiveMessageRequest().getMaxNumberOfMessages().longValue());
    assertEquals(22L,
            testQueue.getReceiveMessageRequest().getVisibilityTimeout().longValue());
    assertEquals(33L,
            testQueue.getReceiveMessageRequest().getWaitTimeSeconds().longValue());
}
/**
 * A container configured with two listeners must register both {@code testQueue} and
 * {@code anotherTestQueue}, each with a receive request carrying its own resolved URL
 * and the shared max-messages, visibility-timeout and wait-time settings.
 */
@Test
public void receiveMessageRequests_withMultipleElements_created() throws Exception {
    final String firstUrl = "http://testQueue.amazonaws.com";
    final String secondUrl = "http://anotherTestQueue.amazonaws.com";

    AbstractMessageListenerContainer container = new StubAbstractMessageListenerContainer();
    AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly());
    container.setAmazonSqs(sqs);

    StaticApplicationContext applicationContext = new StaticApplicationContext();
    QueueMessageHandler messageHandler = new QueueMessageHandler();
    messageHandler.setApplicationContext(applicationContext);
    container.setMessageHandler(messageHandler);
    applicationContext.registerSingleton("messageListener", MessageListener.class);
    applicationContext.registerSingleton("anotherMessageListener", AnotherMessageListener.class);

    container.setMaxNumberOfMessages(11);
    container.setVisibilityTimeout(22);
    container.setWaitTimeOut(33);

    when(sqs.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue")))
            .thenReturn(new GetQueueUrlResult().withQueueUrl(firstUrl));
    when(sqs.getQueueUrl(new GetQueueUrlRequest().withQueueName("anotherTestQueue")))
            .thenReturn(new GetQueueUrlResult().withQueueUrl(secondUrl));
    when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(new GetQueueAttributesResult());

    messageHandler.afterPropertiesSet();
    container.afterPropertiesSet();
    container.start();

    Map<String, QueueAttributes> registeredQueues = container.getRegisteredQueues();

    QueueAttributes first = registeredQueues.get("testQueue");
    assertEquals(firstUrl, first.getReceiveMessageRequest().getQueueUrl());
    assertEquals(11L, first.getReceiveMessageRequest().getMaxNumberOfMessages().longValue());
    assertEquals(22L, first.getReceiveMessageRequest().getVisibilityTimeout().longValue());
    assertEquals(33L, first.getReceiveMessageRequest().getWaitTimeSeconds().longValue());

    QueueAttributes second = registeredQueues.get("anotherTestQueue");
    assertEquals(secondUrl, second.getReceiveMessageRequest().getQueueUrl());
    assertEquals(11L, second.getReceiveMessageRequest().getMaxNumberOfMessages().longValue());
    assertEquals(22L, second.getReceiveMessageRequest().getVisibilityTimeout().longValue());
    assertEquals(33L, second.getReceiveMessageRequest().getWaitTimeSeconds().longValue());
}
@Test public void receiveMessageRequests_withDestinationResolverThrowingException_shouldLogWarningAndNotCreateRequest() throws Exception { // Arrange AbstractMessageListenerContainer container = new StubAbstractMessageListenerContainer(); Logger loggerMock = container.getLogger(); AmazonSQSAsync mock = mock(AmazonSQSAsync.class, withSettings().stubOnly()); container.setAmazonSqs(mock); StaticApplicationContext applicationContext = new StaticApplicationContext(); QueueMessageHandler messageHandler = new QueueMessageHandler(); messageHandler.setApplicationContext(applicationContext); container.setMessageHandler(messageHandler); applicationContext.registerSingleton("messageListener", MessageListener.class); applicationContext.registerSingleton("anotherMessageListener", AnotherMessageListener.class); when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue"))). thenThrow(new DestinationResolutionException("Queue not found")); when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("anotherTestQueue"))). thenReturn(new GetQueueUrlResult().withQueueUrl("http://anotherTestQueue.amazonaws.com")); when(mock.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); messageHandler.afterPropertiesSet(); container.afterPropertiesSet(); // Act container.start(); // Assert ArgumentCaptor<String> logMsgArgCaptor = ArgumentCaptor.forClass(String.class); verify(loggerMock).warn(logMsgArgCaptor.capture()); Map<String, QueueAttributes> registeredQueues = container.getRegisteredQueues(); assertFalse(registeredQueues.containsKey("testQueue")); assertEquals("Ignoring queue with name 'testQueue' as it does not exist.", logMsgArgCaptor.getValue()); assertEquals("http://anotherTestQueue.amazonaws.com", registeredQueues.get("anotherTestQueue").getReceiveMessageRequest().getQueueUrl()); }
private static String getSQSQueueARN(AmazonSQS amazonSQS, String queueURL) { // This statement will throw if the queue does not exist. GetQueueAttributesResult queueAttributes = amazonSQS.getQueueAttributes( new GetQueueAttributesRequest() .withQueueUrl(queueURL) .withAttributeNames(QueueAttributeName.QueueArn) ); return queueAttributes .getAttributes() .get(QueueAttributeName.QueueArn.name()); }
/**
 * Answers an attribute request for a directory-backed queue.
 *
 * <p>Only {@code QueueArn} is implemented: it is built from {@code BASE_ARN} plus the
 * file name of the queue path. Every other requested name, including the known SQS
 * names listed in the switch, is collected and reported in a single
 * {@code UnsupportedOperationException}.
 *
 * @param getQueueAttributesRequest request naming the queue URL and the wanted attributes
 * @return a result holding the attributes that could be resolved
 * @throws UnsupportedOperationException if any requested attribute is not implemented
 */
@Override
public GetQueueAttributesResult getQueueAttributes(GetQueueAttributesRequest getQueueAttributesRequest) throws AmazonClientException {
    DirectorySQSQueue queue = getQueueFromUrl(getQueueAttributesRequest.getQueueUrl(), false);

    Map<String, String> resolved = Maps.newHashMap();
    List<String> notImplemented = Lists.newArrayList();

    for (String requested : getQueueAttributesRequest.getAttributeNames()) {
        switch (requested) {
            case "QueueArn":
                resolved.put("QueueArn", BASE_ARN + queue.getQueuePath().getFileName());
                break;
            // Recognised SQS attribute names that this implementation does not provide
            // yet; they are treated the same as any unknown name.
            case "All":
            case "ApproximateNumberOfMessages":
            case "ApproximateNumberOfMessagesNotVisible":
            case "VisibilityTimeout":
            case "CreatedTimestamp":
            case "LastModifiedTimestamp":
            case "Policy":
            case "MaximumMessageSize":
            case "MessageRetentionPeriod":
            case "ApproximateNumberOfMessagesDelayed":
            case "DelaySeconds":
            case "ReceiveMessageWaitTimeSeconds":
            default:
                notImplemented.add(requested);
                break;
        }
    }

    if (notImplemented.isEmpty()) {
        return new GetQueueAttributesResult().withAttributes(resolved);
    }
    throw new UnsupportedOperationException("attributes not implemented: " + notImplemented);
}
/**
 * Creates a queue, requests its {@code QueueArn} attribute and checks that the text
 * after the last colon of the ARN equals the queue name.
 */
public void getQueueArnFromAttributes() {
    String queueName = someQueueName();
    String queueUrl = _amazonSQS.createQueue(new CreateQueueRequest(queueName)).getQueueUrl();

    List<String> requestedAttributes = ImmutableList.of("QueueArn");
    GetQueueAttributesRequest arnRequest = new GetQueueAttributesRequest()
            .withQueueUrl(queueUrl)
            .withAttributeNames(requestedAttributes);

    String queueArn = _amazonSQS.getQueueAttributes(arnRequest).getAttributes().get("QueueArn");

    // The queue name is the final colon-separated segment of the ARN.
    int lastColon = queueArn.lastIndexOf(":");
    String queueNameFromArn = queueArn.substring(lastColon + 1);

    Assert.assertEquals(queueNameFromArn, queueName);
}
public Map<String, String> getQueueAttributes(String url) throws MissingArgumentException { // find the queue arn, we need this to create the SNS subscription GetQueueAttributesRequest getQueueAttributesRequest = new GetQueueAttributesRequest(url); getQueueAttributesRequest.setAttributeNames(attributeNames); GetQueueAttributesResult attribResult = sqsClient.getQueueAttributes(getQueueAttributesRequest); Map<String, String> attribMap = attribResult.getAttributes(); if (!attribMap.containsKey(QUEUE_ARN_KEY)) { String msg = "Missing arn attirbute, tried attribute with name: " + QUEUE_ARN_KEY; logger.error(msg); throw new MissingArgumentException(msg); } return attribMap; }
/**
 * Requests the {@code QUEUE_ARN_KEY} attribute for {@code queueUrl} and stores the
 * returned value in {@code queueArn}.
 */
private void resolveQueueArn() {
    GetQueueAttributesRequest arnRequest = new GetQueueAttributesRequest(queueUrl)
            .withAttributeNames(Collections.singletonList(QUEUE_ARN_KEY));
    queueArn = sqsClient.getQueueAttributes(arnRequest).getAttributes().get(QUEUE_ARN_KEY);
}
/**
 * Builds a request for all attributes of the given queue.
 *
 * @param queueUrl URL of the queue to query
 * @return a request for {@code queueUrl} whose attribute-name list is the single entry {@code "All"}
 */
@VisibleForTesting
static GetQueueAttributesRequest createRequest(String queueUrl) {
    GetQueueAttributesRequest allAttributesRequest =
            new GetQueueAttributesRequest(queueUrl, Collections.singletonList("All"));
    return allAttributesRequest;
}
/**
 * The request built by {@code GetQueueAttributesAction.createRequest} must target the
 * given URL and ask for the single attribute name {@code "All"}.
 */
@Test
public void testCreateRequest() {
    GetQueueAttributesRequest created = GetQueueAttributesAction.createRequest(QUEUE_URL);

    assertThat(created.getQueueUrl()).isEqualTo(QUEUE_URL);
    assertThat(created.getAttributeNames()).isEqualTo(Collections.singletonList("All"));
}
/**
 * Starts an asynchronous attribute lookup on the SQS client and exposes its result
 * as an {@code Observable}.
 *
 * @param request the attribute request to send
 * @return an observable created from the client's asynchronous call
 */
public Observable<GetQueueAttributesResult> getQueueAttributesAsync(GetQueueAttributesRequest request) {
    return Observable.<GetQueueAttributesResult>from(sqsClient.getQueueAttributesAsync(request));
}
/**
 * Reads the {@code ApproximateNumberOfMessages} attribute of the queue at
 * {@code _queueURL}.
 *
 * @return the attribute value parsed as an int
 */
public int getNumberOfItems() {
    final String countAttribute = "ApproximateNumberOfMessages";
    GetQueueAttributesRequest request =
            new GetQueueAttributesRequest(_queueURL, Arrays.asList(countAttribute));
    String rawCount = _sqs.getQueueAttributes(request).getAttributes().get(countAttribute);
    return Integer.parseInt(rawCount);
}
/**
 * Convenience overload: delegates to the two-argument {@code load}, passing
 * {@code null} as the second argument.
 *
 * @param request the attribute request to load with
 * @return whatever the two-argument overload returns
 */
@Override
public boolean load(GetQueueAttributesRequest request) {
    boolean loaded = load(request, null);
    return loaded;
}
/**
 * Delegates the load to the wrapped {@code resource}.
 *
 * @param request   the attribute request to load with
 * @param extractor passed through unchanged to {@code resource.load}
 * @return the value returned by {@code resource.load}
 */
@Override
public boolean load(GetQueueAttributesRequest request, ResultCapture<GetQueueAttributesResult> extractor) {
    boolean loaded = resource.load(request, extractor);
    return loaded;
}
/**
 * A started container must poll the stubbed queue and pass a received message to the
 * handler within one second.
 */
@Test
public void testSimpleReceiveMessage() throws Exception {
    final String queueUrl = "http://testSimpleReceiveMessage.amazonaws.com";

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly());
    container.setAmazonSqs(sqs);

    // The handler releases the latch on the first message it sees and checks the payload.
    CountDownLatch messageHandled = new CountDownLatch(1);
    QueueMessageHandler messageHandler = new QueueMessageHandler() {

        @Override
        public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
            messageHandled.countDown();
            assertEquals("messageContent", message.getPayload());
        }
    };
    container.setMessageHandler(messageHandler);

    StaticApplicationContext applicationContext = new StaticApplicationContext();
    applicationContext.registerSingleton("testMessageListener", TestMessageListener.class);
    messageHandler.setApplicationContext(applicationContext);
    container.setBeanName("testContainerName");
    messageHandler.afterPropertiesSet();

    mockGetQueueUrl(sqs, "testQueue", queueUrl);
    mockGetQueueAttributesWithEmptyResult(sqs, queueUrl);
    container.afterPropertiesSet();

    // First poll yields two messages, every later poll yields none.
    ReceiveMessageRequest expectedPoll = new ReceiveMessageRequest(queueUrl)
            .withAttributeNames("All")
            .withMessageAttributeNames("All")
            .withMaxNumberOfMessages(10);
    ReceiveMessageResult twoMessages = new ReceiveMessageResult().withMessages(
            new Message().withBody("messageContent"),
            new Message().withBody("messageContent"));
    when(sqs.receiveMessage(expectedPoll))
            .thenReturn(twoMessages)
            .thenReturn(new ReceiveMessageResult());
    when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(new GetQueueAttributesResult());

    container.start();

    assertTrue(messageHandled.await(1, TimeUnit.SECONDS));

    container.stop();
}
/**
 * With two listeners bound to two queues, each listener must receive the message
 * that was delivered on its own queue.
 */
@Test
public void listener_withMultipleMessageHandlers_shouldBeCalled() throws Exception {
    final String firstUrl = "http://listener_withMultipleMessageHandlers_shouldBeCalled.amazonaws.com";
    final String secondUrl = "http://listener_withMultipleMessageHandlers_shouldBeCalled.another.amazonaws.com";

    // Count executions so the test can wait until both messages were dispatched.
    CountDownLatch bothExecuted = new CountDownLatch(2);
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer() {

        @Override
        protected void executeMessage(org.springframework.messaging.Message<String> stringMessage) {
            bothExecuted.countDown();
            super.executeMessage(stringMessage);
        }
    };

    AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly());
    container.setAmazonSqs(sqs);

    QueueMessageHandler messageHandler = new QueueMessageHandler();
    container.setMessageHandler(messageHandler);

    StaticApplicationContext applicationContext = new StaticApplicationContext();
    applicationContext.registerSingleton("testMessageListener", TestMessageListener.class);
    applicationContext.registerSingleton("anotherTestMessageListener", AnotherTestMessageListener.class);

    mockGetQueueUrl(sqs, "testQueue", firstUrl);
    mockGetQueueAttributesWithEmptyResult(sqs, firstUrl);
    mockGetQueueUrl(sqs, "anotherTestQueue", secondUrl);
    mockGetQueueAttributesWithEmptyResult(sqs, secondUrl);

    messageHandler.setApplicationContext(applicationContext);
    messageHandler.afterPropertiesSet();
    container.afterPropertiesSet();

    // Each queue delivers exactly one message on its first poll, then nothing.
    ReceiveMessageRequest firstPoll = new ReceiveMessageRequest(firstUrl)
            .withAttributeNames("All")
            .withMessageAttributeNames("All")
            .withMaxNumberOfMessages(10);
    when(sqs.receiveMessage(firstPoll))
            .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("messageContent")))
            .thenReturn(new ReceiveMessageResult());

    ReceiveMessageRequest secondPoll = new ReceiveMessageRequest(secondUrl)
            .withAttributeNames("All")
            .withMessageAttributeNames("All")
            .withMaxNumberOfMessages(10);
    when(sqs.receiveMessage(secondPoll))
            .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("anotherMessageContent")))
            .thenReturn(new ReceiveMessageResult());

    when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(new GetQueueAttributesResult());

    container.start();

    assertTrue(bothExecuted.await(2L, TimeUnit.SECONDS));
    container.stop();

    assertEquals("messageContent",
            applicationContext.getBean(TestMessageListener.class).getMessage());
    assertEquals("anotherMessageContent",
            applicationContext.getBean(AnotherTestMessageListener.class).getMessage());
}
@Test public void messageExecutor_withMessageWithAttributes_shouldPassThemAsHeaders() throws Exception { // Arrange CountDownLatch countDownLatch = new CountDownLatch(1); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer() { @Override protected void executeMessage(org.springframework.messaging.Message<String> stringMessage) { countDownLatch.countDown(); super.executeMessage(stringMessage); } }; AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly()); container.setAmazonSqs(sqs); QueueMessageHandler messageHandler = spy(new QueueMessageHandler()); container.setMessageHandler(messageHandler); StaticApplicationContext applicationContext = new StaticApplicationContext(); applicationContext.registerSingleton("testMessageListener", TestMessageListener.class); mockGetQueueUrl(sqs, "testQueue", "http://messageExecutor_withMessageWithAttributes_shouldPassThemAsHeaders.amazonaws.com"); mockGetQueueAttributesWithEmptyResult(sqs, "http://messageExecutor_withMessageWithAttributes_shouldPassThemAsHeaders.amazonaws.com"); messageHandler.setApplicationContext(applicationContext); messageHandler.afterPropertiesSet(); container.afterPropertiesSet(); when(sqs.receiveMessage(new ReceiveMessageRequest("http://messageExecutor_withMessageWithAttributes_shouldPassThemAsHeaders.amazonaws.com").withAttributeNames("All") .withMessageAttributeNames("All") .withMaxNumberOfMessages(10))) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("messageContent").withAttributes(Collections.singletonMap("SenderId", "ID")))) .thenReturn(new ReceiveMessageResult()); when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); // Act container.start(); // Assert assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS)); container.stop(); verify(messageHandler).handleMessage(this.stringMessageCaptor.capture()); assertEquals("ID", 
this.stringMessageCaptor.getValue().getHeaders().get("SenderId")); }
@Test public void messageExecutor_messageWithMimeTypeMessageAttribute_shouldSetItAsHeader() throws Exception { // Arrange CountDownLatch countDownLatch = new CountDownLatch(1); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer() { @Override protected void executeMessage(org.springframework.messaging.Message<String> stringMessage) { countDownLatch.countDown(); super.executeMessage(stringMessage); } }; AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly()); container.setAmazonSqs(sqs); QueueMessageHandler messageHandler = spy(new QueueMessageHandler()); container.setMessageHandler(messageHandler); StaticApplicationContext applicationContext = new StaticApplicationContext(); applicationContext.registerSingleton("testMessageListener", TestMessageListener.class); mockGetQueueUrl(sqs, "testQueue", "http://messageExecutor_messageWithMimeTypeMessageAttribute_shouldSetItAsHeader.amazonaws.com"); mockGetQueueAttributesWithEmptyResult(sqs, "http://messageExecutor_messageWithMimeTypeMessageAttribute_shouldSetItAsHeader.amazonaws.com"); messageHandler.setApplicationContext(applicationContext); messageHandler.afterPropertiesSet(); container.afterPropertiesSet(); MimeType mimeType = new MimeType("text", "plain", Charset.forName("UTF-8")); when(sqs.receiveMessage(new ReceiveMessageRequest("http://messageExecutor_messageWithMimeTypeMessageAttribute_shouldSetItAsHeader.amazonaws.com").withAttributeNames("All") .withMessageAttributeNames("All") .withMaxNumberOfMessages(10))) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("messageContent") .withAttributes(Collections.singletonMap("SenderId", "ID")) .withMessageAttributes(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, new MessageAttributeValue().withDataType("String") .withStringValue(mimeType.toString()))))) .thenReturn(new ReceiveMessageResult()); when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new 
GetQueueAttributesResult()); // Act container.start(); // Assert assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS)); container.stop(); verify(messageHandler).handleMessage(this.stringMessageCaptor.capture()); assertEquals(mimeType, this.stringMessageCaptor.getValue().getHeaders().get(MessageHeaders.CONTENT_TYPE)); }
@Test public void receiveMessage_throwsAnException_operationShouldBeRetried() throws Exception { // Arrange Level previous = disableLogging(); AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class, withSettings().stubOnly()); when(amazonSqs.receiveMessage(any(ReceiveMessageRequest.class))).thenThrow(new RuntimeException("Boom!")) .thenReturn(new ReceiveMessageResult() .withMessages(new Message().withBody("messageContent"), new Message().withBody("messageContent"))); CountDownLatch countDownLatch = new CountDownLatch(1); QueueMessageHandler messageHandler = new QueueMessageHandler() { @Override public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException { countDownLatch.countDown(); assertEquals("messageContent", message.getPayload()); } }; StaticApplicationContext applicationContext = new StaticApplicationContext(); applicationContext.registerSingleton("testMessageListener", TestMessageListener.class); messageHandler.setApplicationContext(applicationContext); mockGetQueueUrl(amazonSqs, "testQueue", "http://receiveMessage_throwsAnException_operationShouldBeRetried.amazonaws.com"); messageHandler.afterPropertiesSet(); when(amazonSqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setBackOffTime(0); container.setAmazonSqs(amazonSqs); container.setMessageHandler(messageHandler); container.setAutoStartup(false); container.afterPropertiesSet(); // Act container.start(); // Assert assertTrue(countDownLatch.await(1, TimeUnit.SECONDS)); container.stop(); setLogLevel(previous); }
/**
 * Stub implementation: ignores the request and returns a new, empty result.
 *
 * @param getQueueAttributesRequest not used
 * @return a freshly created {@code GetQueueAttributesResult}
 */
@Override
public GetQueueAttributesResult getQueueAttributes(GetQueueAttributesRequest getQueueAttributesRequest) throws AmazonClientException {
    GetQueueAttributesResult emptyResult = new GetQueueAttributesResult();
    return emptyResult;
}
/**
 * Stubs the redrive-policy attribute lookup for {@code queueUrl} so that it returns a
 * result containing a {@code RedrivePolicy} entry with placeholder JSON.
 *
 * @param sqs      the mock to stub
 * @param queueUrl queue URL the stubbed request must carry
 */
private static void mockGetQueueAttributesWithRedrivePolicy(AmazonSQSAsync sqs, String queueUrl) {
    GetQueueAttributesRequest redriveLookup = new GetQueueAttributesRequest(queueUrl)
            .withAttributeNames(QueueAttributeName.RedrivePolicy);
    GetQueueAttributesResult withRedrivePolicy = new GetQueueAttributesResult()
            .addAttributesEntry(QueueAttributeName.RedrivePolicy.toString(), "{\"some\": \"JSON\"}");
    when(sqs.getQueueAttributes(redriveLookup)).thenReturn(withRedrivePolicy);
}
/**
 * Stubs the redrive-policy attribute lookup for {@code queueUrl} so that it returns a
 * result without any attributes.
 *
 * @param sqs      the mock to stub
 * @param queueUrl queue URL the stubbed request must carry
 */
private static void mockGetQueueAttributesWithEmptyResult(AmazonSQSAsync sqs, String queueUrl) {
    GetQueueAttributesRequest redriveLookup = new GetQueueAttributesRequest(queueUrl)
            .withAttributeNames(QueueAttributeName.RedrivePolicy);
    GetQueueAttributesResult noAttributes = new GetQueueAttributesResult();
    when(sqs.getQueueAttributes(redriveLookup)).thenReturn(noAttributes);
}
private static void allowSQSQueueToReceiveMessagesFromSNSTopic( AmazonSQS amazonSQS, String queueURL, String queueARN, String topicARN ) { GetQueueAttributesResult queueAttributesResult = amazonSQS.getQueueAttributes( new GetQueueAttributesRequest().withQueueUrl(queueURL).withAttributeNames( QueueAttributeName.Policy ) ); String policyJson = queueAttributesResult.getAttributes().get(QueueAttributeName.Policy.name()); final List<Statement> statements; if (policyJson != null) { statements = new ArrayList<>(Policy.fromJson(policyJson).getStatements()); } else { // no policies yet exist statements = new ArrayList<>(); } statements.add( new Statement(Statement.Effect.Allow) .withPrincipals(Principal.AllUsers) .withResources(new Resource(queueARN)) .withActions(SQSActions.SendMessage) .withConditions(ConditionFactory.newSourceArnCondition(topicARN)) ); Policy policy = new Policy(); policy.setStatements(statements); Map<String, String> queueAttributes = new HashMap<>(); queueAttributes.put(QueueAttributeName.Policy.name(), policy.toJson()); // Note that if the queue already has this policy, this will do nothing. amazonSQS.setQueueAttributes( new SetQueueAttributesRequest() .withQueueUrl(queueURL) .withAttributes(queueAttributes) ); }