@Test public void testGet() { // given QueueName qn = new QueueName("q1"); GetQueueUrlResult queueUrlResult = mock(GetQueueUrlResult.class); when(queueUrlResult.getQueueUrl()).thenReturn("url1"); GetQueueAttributesResult attributesResult = mock(GetQueueAttributesResult.class); HashMap<String, String> attributes = new HashMap<>(); attributes.put("1", "3"); attributes.put("hi", "ho"); when(attributesResult.getAttributes()).thenReturn(attributes); when(amazonSQS.getQueueUrl(any(GetQueueUrlRequest.class))).thenReturn(queueUrlResult); when(amazonSQS.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn( attributesResult); // when Queue queue = uut.get(qn); // then assertEquals("url1", queue.getUrl()); assertEquals("q1", queue.getName().getId()); assertEquals(attributes, queue.getQueueAttributes()); }
/**
 * Sends one message to a freshly created queue and expects exactly that message back
 * from a single receive call.
 */
@Test
public void sendAndReceiveMessage() {
    final String name = "bizo";
    final String body = "hi everybody";

    // Create the queue, then resolve its URL by name.
    sqs.createQueue(new CreateQueueRequest().withQueueName(name));
    final String url = sqs.getQueueUrl(new GetQueueUrlRequest().withQueueName(name)).getQueueUrl();

    sqs.sendMessage(new SendMessageRequest().withQueueUrl(url).withMessageBody(body));

    // Ask for up to ten messages; only the one just sent is expected.
    final ReceiveMessageRequest receive =
            new ReceiveMessageRequest().withQueueUrl(url).withMaxNumberOfMessages(10);
    final List<Message> received = sqs.receiveMessage(receive).getMessages();

    assertThat(received.size(), equalTo(1));
    assertThat(received.get(0).getBody(), equalTo(body));
}
@Test public void shouldCreateSqsQueueResource_withName() throws Exception { // Given final String name = randomString(); final String queueUrl = randomString(); final GetQueueUrlResult getQueueUrlResult = new GetQueueUrlResult().withQueueUrl(queueUrl); final GetQueueUrlRequest expectedGetQueueUrlRequest = new GetQueueUrlRequest(name); when(mockAmazonSqsClient.getQueueUrl(expectedGetQueueUrlRequest)).thenReturn(getQueueUrlResult); final SqsQueueResource mockSqsQueueResource = mock(SqsQueueResource.class); whenNew(SqsQueueResource.class).withArguments(name, queueUrl, mockAmazonSqsClient) .thenReturn(mockSqsQueueResource); // When final SqsQueueResource result = factory.createSqsQueueResource(name); // Then assertSame(mockSqsQueueResource, result); }
@Override public String resolveDestination(String name) throws DestinationResolutionException { String queueName = name; if (this.resourceIdResolver != null) { queueName = this.resourceIdResolver.resolveToPhysicalResourceId(name); } if (isValidQueueUrl(queueName)) { return queueName; } if (this.autoCreate) { //Auto-create is fine to be called even if the queue exists. CreateQueueResult createQueueResult = this.amazonSqs.createQueue(new CreateQueueRequest(queueName)); return createQueueResult.getQueueUrl(); } else { try { GetQueueUrlResult getQueueUrlResult = this.amazonSqs.getQueueUrl(new GetQueueUrlRequest(queueName)); return getQueueUrlResult.getQueueUrl(); } catch (QueueDoesNotExistException e) { throw new DestinationResolutionException(e.getMessage(), e); } } }
@Test public void testIsActive() throws Exception { AbstractMessageListenerContainer container = new StubAbstractMessageListenerContainer(); AmazonSQSAsync mock = mock(AmazonSQSAsync.class, withSettings().stubOnly()); container.setAmazonSqs(mock); container.setMessageHandler(mock(QueueMessageHandler.class)); container.afterPropertiesSet(); when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue"))). thenReturn(new GetQueueUrlResult().withQueueUrl("http://testQueue.amazonaws.com")); container.start(); assertTrue(container.isRunning()); container.stop(); assertFalse(container.isRunning()); //Container can still be active an restarted later (e.g. paused for a while) assertTrue(container.isActive()); }
/**
 * Delivers the request's message to the SQS queue behind every subscription of the topic.
 *
 * <p>The queue name is taken as the last {@code ':'}-separated segment of each
 * subscription endpoint and resolved to a URL through the SQS client.
 *
 * @param publishRequest carries the topic ARN and the message body
 * @return a new, empty {@link PublishResult}
 * @throws NotFoundException if the topic ARN has no entry in the subscription map
 */
@Override
public PublishResult publish(PublishRequest publishRequest) throws AmazonClientException {
    final String arn = publishRequest.getTopicArn();
    if (!_subscriptionsForTopic.containsKey(arn)) {
        throw new NotFoundException("no such topic " + arn);
    }

    // Map the topic's subscription ARNs to their Subscription objects up front.
    final List<Subscription> targets = FluentIterable
            .from(_subscriptionsForTopic.get(arn))
            .transform(Functions.forMap(_subscriptionsByArn))
            .toList();

    for (Subscription target : targets) {
        String[] endpointSegments = target.getEndpoint().split(":");
        String queueName = getLast(endpointSegments);

        GetQueueUrlRequest lookup = new GetQueueUrlRequest().withQueueName(queueName);
        String queueUrl = _sqsClient.getQueueUrl(lookup).getQueueUrl();

        SendMessageRequest delivery = new SendMessageRequest()
                .withQueueUrl(queueUrl)
                .withMessageBody(publishRequest.getMessage());
        _sqsClient.sendMessage(delivery);
    }

    return new PublishResult();
}
public void initalize(URI uri, Configuration conf) { this.conf = conf; String keyId = conf.get("fs."+uri.getScheme()+".awsAccessKeyId"); String keySecret = conf.get("fs."+uri.getScheme()+".awsSecretAccessKey"); //An override option for accessing across accounts keyId = conf.get("fs."+uri.getScheme()+".override.awsAccessKeyId", keyId); keySecret = conf.get("fs."+uri.getScheme()+".override.awsSecretAccessKey", keySecret); sqs = new AmazonSQSClient(new BasicAWSCredentials(keyId, keySecret)); //SQS Consistency Queue consistencyQueue = conf.get("fs"+uri.getScheme()+".alert.sqs.queue", consistencyQueue); consistencyQueue = sqs.getQueueUrl(new GetQueueUrlRequest(consistencyQueue)).getQueueUrl(); //SQS Timeout Queue timeoutQueue = conf.get("fs"+uri.getScheme()+".timeout.sqs.queue", timeoutQueue); timeoutQueue = sqs.getQueueUrl(new GetQueueUrlRequest(timeoutQueue)).getQueueUrl(); }
private void initSqs(String keyId, String keySecret) { log.debug("Initializing SQS Client"); sqs = new AmazonSQSClient(new BasicAWSCredentials(keyId, keySecret)); //SQS Consistency Queue consistencyQueue = conf.get("s3mper.alert.sqs.queue", consistencyQueue); consistencyQueueUrl = sqs.getQueueUrl(new GetQueueUrlRequest(consistencyQueue)).getQueueUrl(); //SQS Timeout Queue timeoutQueue = conf.get("s3mper.timeout.sqs.queue", timeoutQueue); timeoutQueueUrl = sqs.getQueueUrl(new GetQueueUrlRequest(timeoutQueue)).getQueueUrl(); //SQS Notification Queue notificationQueue = conf.get("s3mper.notification.sqs.queue", notificationQueue); notificationQueueUrl = sqs.getQueueUrl(new GetQueueUrlRequest(notificationQueue)).getQueueUrl(); //Disable reporting (Testing purposes mostly) reportingDisabled = conf.getBoolean("s3mper.reporting.disabled", reportingDisabled); }
@Test public void testCreateGetUrlListQueue_shouldCreateReturnUrlAndListQueue() { // create first queue CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue")); assertNotNull("verify that, on creation, queue url was returned",createdQueue.getQueueUrl()); // create other queues CreateQueueResult secondTeaQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-mate-queue")); CreateQueueResult anotherQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("coffee-queue")); // get queue url GetQueueUrlResult queueUrlResult = sqs.getQueueUrl(new GetQueueUrlRequest() .withQueueName("tea-earl-grey-queue").withQueueOwnerAWSAccountId("some owner")); assertNotNull("verify that, on fetch, queue url was returned", queueUrlResult.getQueueUrl()); // get all queues ListQueuesResult allQueues = sqs.listQueues(); assertEquals("verify all queues are returned", 3, allQueues.getQueueUrls().size()); assertTrue("verify that all queues contain first queue", allQueues.getQueueUrls().contains(createdQueue.getQueueUrl())); assertTrue("verify that all queues contain second tea queue", allQueues.getQueueUrls().contains(secondTeaQueue.getQueueUrl())); assertTrue("verify that all queues contain coffee queue", allQueues.getQueueUrls().contains(anotherQueue.getQueueUrl())); // get only queues that start with 'tea' ListQueuesResult teaQueues = sqs.listQueues(new ListQueuesRequest("tea")); assertEquals("verify only tea queues are returned", 2, teaQueues.getQueueUrls().size()); assertTrue("verify that tea queues contain first queue", teaQueues.getQueueUrls().contains(createdQueue.getQueueUrl())); assertTrue("verify that tea queues contain second tea queue", teaQueues.getQueueUrls().contains(secondTeaQueue.getQueueUrl())); assertNotNull("verify that delete queue returned ok", sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(queueUrlResult.getQueueUrl()))); assertFalse("verify that the queue was removed", 
sqs.listQueues().getQueueUrls().stream() .anyMatch( queueUrl -> StringUtils.equals(queueUrl,queueUrlResult.getQueueUrl()) )); // cleanup getQueues().remove("tea-earl-grey-queue"); getQueues().remove("tea-mate-queue"); getQueues().remove("coffee-queue"); }
/**
 * Looks up a queue by name and returns it together with its URL and all of its attributes.
 *
 * @param queueName name of the queue to look up
 * @return a {@link Queue} holding the name, the resolved URL and the attribute map
 */
public Queue get(@NonNull QueueName queueName) {
    // Resolve the URL from the queue's id.
    String url = amazonSQS
            .getQueueUrl(new GetQueueUrlRequest().withQueueName(queueName.getId()))
            .getQueueUrl();

    // Request every attribute ("All") for that URL.
    GetQueueAttributesRequest allAttributes =
            new GetQueueAttributesRequest(url, Collections.singletonList("All"));
    Map<String, String> attributeMap = amazonSQS.getQueueAttributes(allAttributes).getAttributes();

    return new Queue(queueName, url, attributeMap);
}
private String queryQueueUrl(String queueName) { try { return _sqs.getQueueUrl(new GetQueueUrlRequest(queueName)).getQueueUrl(); } catch (QueueDoesNotExistException e) { // Create the queue int visibilityTimeout = queueName.equals(_pendingScanRangeQueue) ? DEFAULT_TASK_CLAIM_VISIBILITY_TIMEOUT : DEFAULT_TASK_COMPLETE_VISIBILITY_TIMEOUT; return _sqs.createQueue( new CreateQueueRequest(queueName) .withAttributes(ImmutableMap.<String, String>of( "VisibilityTimeout", String.valueOf(visibilityTimeout))) ).getQueueUrl(); } }
/**
 * Checks that the URL returned for a created queue contains the queue's name.
 */
@Test
public void getQueueUrl() {
    final String name = "bizo";
    sqs.createQueue(new CreateQueueRequest().withQueueName(name));

    final GetQueueUrlRequest lookup = new GetQueueUrlRequest().withQueueName(name);
    final String url = sqs.getQueueUrl(lookup).getQueueUrl();

    assertThat(url, containsString(name));
}
/**
 * Sends and receives one message, then deletes it using the receipt handle that came
 * back from the receive call; the delete must not be rejected as invalid.
 */
@Test
public void deleteMessageSucceedsWithValidReceiptHandle() {
    final String name = "bizo";
    final String body = "hi everybody";

    sqs.createQueue(new CreateQueueRequest().withQueueName(name));
    final String url = sqs.getQueueUrl(new GetQueueUrlRequest().withQueueName(name)).getQueueUrl();

    sqs.sendMessage(new SendMessageRequest().withQueueUrl(url).withMessageBody(body));

    final ReceiveMessageRequest receive =
            new ReceiveMessageRequest().withQueueUrl(url).withMaxNumberOfMessages(10);
    final List<Message> received = sqs.receiveMessage(receive).getMessages();
    assertThat(received.size(), equalTo(1));

    // Delete with the handle handed out by the receive call.
    final String handle = received.get(0).getReceiptHandle();
    final DeleteMessageRequest delete =
            new DeleteMessageRequest().withQueueUrl(url).withReceiptHandle(handle);
    try {
        sqs.deleteMessage(delete);
    } catch (ReceiptHandleIsInvalidException e) {
        fail("ReceiptHandleIsInvalidException was thrown");
    }
}
/**
 * Sends and receives one message, then attempts a delete with a made-up receipt handle;
 * the test expects {@link ReceiptHandleIsInvalidException}.
 */
@Test(expected = ReceiptHandleIsInvalidException.class)
public void deleteMessageFailsWithInvalidReceiptHandle() {
    final String name = "bizo";
    final String body = "hi everybody";

    sqs.createQueue(new CreateQueueRequest().withQueueName(name));
    final String url = sqs.getQueueUrl(new GetQueueUrlRequest().withQueueName(name)).getQueueUrl();

    sqs.sendMessage(new SendMessageRequest().withQueueUrl(url).withMessageBody(body));

    final List<Message> received =
            sqs.receiveMessage(new ReceiveMessageRequest().withQueueUrl(url)).getMessages();
    assertThat(received.size(), equalTo(1));

    // This handle was never issued by a receive call.
    final String bogusHandle = "bizo";
    sqs.deleteMessage(new DeleteMessageRequest().withQueueUrl(url).withReceiptHandle(bogusHandle));
}
/**
 * Builds a queue resource for a queue looked up by name.
 *
 * @param name name of the queue
 * @return a resource holding the name, the resolved queue URL and the SQS client
 */
@Override
public SqsQueueResource createSqsQueueResource(final String name) {
    final GetQueueUrlRequest lookup = new GetQueueUrlRequest(name);
    final String resolvedUrl = amazonSqsClient.getQueueUrl(lookup).getQueueUrl();
    logger.info("Using existing SQS queue: " + name);
    return new SqsQueueResource(name, resolvedUrl, amazonSqsClient);
}
/**
 * Performs the {@code "GetQueueByName"} action and wraps the resulting resource.
 *
 * @param request   the queue URL request to execute
 * @param extractor capture for the low-level result
 * @return the queue, or {@code null} if the action produced no result
 */
@Override
public Queue getQueueByName(GetQueueUrlRequest request, ResultCapture<GetQueueUrlResult> extractor) {
    ActionResult actionResult = service.performAction("GetQueueByName", request, extractor);
    return (actionResult != null) ? new QueueImpl(actionResult.getResource()) : null;
}
/**
 * Convenience overload: builds the request from a queue name and delegates to the
 * request-based overload.
 *
 * @param queueName name of the queue
 * @param extractor capture for the low-level result
 * @return whatever the request-based overload returns
 */
@Override
public Queue getQueueByName(String queueName, ResultCapture<GetQueueUrlResult> extractor) {
    return getQueueByName(new GetQueueUrlRequest().withQueueName(queueName), extractor);
}
/**
 * Checks that, with one handler mapping and no explicit executor, the container creates
 * a thread pool whose maximum size is {@code handlers * (maxNumberOfMessages + 1)}.
 */
@Test
public void testWithDefaultTaskExecutorAndOneHandler() throws Exception {
    int maxMessages = 10;

    // One mapping for "testQueue"; the handler method itself is irrelevant here.
    QueueMessageHandler.MappingInformation mapping = new QueueMessageHandler.MappingInformation(
            Collections.singleton("testQueue"), SqsMessageDeletionPolicy.ALWAYS);
    Map<QueueMessageHandler.MappingInformation, HandlerMethod> handlerMethods =
            Collections.singletonMap(mapping, null);

    SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
    QueueMessageHandler handlerMock = mock(QueueMessageHandler.class);
    AmazonSQSAsync sqsStub = mock(AmazonSQSAsync.class, withSettings().stubOnly());

    when(sqsStub.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(new GetQueueAttributesResult());
    when(sqsStub.getQueueUrl(any(GetQueueUrlRequest.class)))
            .thenReturn(new GetQueueUrlResult().withQueueUrl("testQueueUrl"));
    when(handlerMock.getHandlerMethods()).thenReturn(handlerMethods);

    listenerContainer.setMaxNumberOfMessages(maxMessages);
    listenerContainer.setAmazonSqs(sqsStub);
    listenerContainer.setMessageHandler(handlerMock);
    listenerContainer.afterPropertiesSet();

    int expectedMaxPoolSize = handlerMethods.size() * (maxMessages + 1);

    ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) listenerContainer.getTaskExecutor();
    assertNotNull(executor);
    assertEquals(expectedMaxPoolSize, executor.getMaxPoolSize());
}
/**
 * Checks that starting the container registers "testQueue" with a receive request
 * carrying the resolved URL and the configured max-messages, visibility-timeout and
 * wait-time values.
 */
@Test
public void receiveMessageRequests_withOneElement_created() throws Exception {
    AbstractMessageListenerContainer container = new StubAbstractMessageListenerContainer();

    AmazonSQSAsync mock = mock(AmazonSQSAsync.class, withSettings().stubOnly());
    QueueMessageHandler messageHandler = new QueueMessageHandler();
    container.setAmazonSqs(mock);
    // The handler is set once; an earlier throwaway mock handler that was immediately
    // overwritten has been removed.
    container.setMessageHandler(messageHandler);

    StaticApplicationContext applicationContext = new StaticApplicationContext();
    applicationContext.registerSingleton("messageListener", MessageListener.class);

    container.setMaxNumberOfMessages(11);
    container.setVisibilityTimeout(22);
    container.setWaitTimeOut(33);

    messageHandler.setApplicationContext(applicationContext);

    when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue")))
            .thenReturn(new GetQueueUrlResult().withQueueUrl("http://testQueue.amazonaws.com"));
    when(mock.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(new GetQueueAttributesResult());

    messageHandler.afterPropertiesSet();
    container.afterPropertiesSet();
    container.start();

    Map<String, QueueAttributes> registeredQueues = container.getRegisteredQueues();
    assertEquals("http://testQueue.amazonaws.com",
            registeredQueues.get("testQueue").getReceiveMessageRequest().getQueueUrl());
    assertEquals(11L,
            registeredQueues.get("testQueue").getReceiveMessageRequest().getMaxNumberOfMessages().longValue());
    assertEquals(22L,
            registeredQueues.get("testQueue").getReceiveMessageRequest().getVisibilityTimeout().longValue());
    assertEquals(33L,
            registeredQueues.get("testQueue").getReceiveMessageRequest().getWaitTimeSeconds().longValue());
}
/**
 * Checks that starting the container with two listeners registers both queues, each
 * with its resolved URL and the configured max-messages, visibility-timeout and
 * wait-time values.
 */
@Test
public void receiveMessageRequests_withMultipleElements_created() throws Exception {
    AbstractMessageListenerContainer listenerContainer = new StubAbstractMessageListenerContainer();

    AmazonSQSAsync sqsStub = mock(AmazonSQSAsync.class, withSettings().stubOnly());
    listenerContainer.setAmazonSqs(sqsStub);

    StaticApplicationContext context = new StaticApplicationContext();
    QueueMessageHandler handler = new QueueMessageHandler();
    handler.setApplicationContext(context);
    listenerContainer.setMessageHandler(handler);
    context.registerSingleton("messageListener", MessageListener.class);
    context.registerSingleton("anotherMessageListener", AnotherMessageListener.class);

    listenerContainer.setMaxNumberOfMessages(11);
    listenerContainer.setVisibilityTimeout(22);
    listenerContainer.setWaitTimeOut(33);

    // Each queue name resolves to its own URL.
    GetQueueUrlRequest firstLookup = new GetQueueUrlRequest().withQueueName("testQueue");
    when(sqsStub.getQueueUrl(firstLookup))
            .thenReturn(new GetQueueUrlResult().withQueueUrl("http://testQueue.amazonaws.com"));
    GetQueueUrlRequest secondLookup = new GetQueueUrlRequest().withQueueName("anotherTestQueue");
    when(sqsStub.getQueueUrl(secondLookup))
            .thenReturn(new GetQueueUrlResult().withQueueUrl("http://anotherTestQueue.amazonaws.com"));
    when(sqsStub.getQueueAttributes(any(GetQueueAttributesRequest.class)))
            .thenReturn(new GetQueueAttributesResult());

    handler.afterPropertiesSet();
    listenerContainer.afterPropertiesSet();
    listenerContainer.start();

    Map<String, QueueAttributes> registered = listenerContainer.getRegisteredQueues();

    QueueAttributes first = registered.get("testQueue");
    assertEquals("http://testQueue.amazonaws.com", first.getReceiveMessageRequest().getQueueUrl());
    assertEquals(11L, first.getReceiveMessageRequest().getMaxNumberOfMessages().longValue());
    assertEquals(22L, first.getReceiveMessageRequest().getVisibilityTimeout().longValue());
    assertEquals(33L, first.getReceiveMessageRequest().getWaitTimeSeconds().longValue());

    QueueAttributes second = registered.get("anotherTestQueue");
    assertEquals("http://anotherTestQueue.amazonaws.com", second.getReceiveMessageRequest().getQueueUrl());
    assertEquals(11L, second.getReceiveMessageRequest().getMaxNumberOfMessages().longValue());
    assertEquals(22L, second.getReceiveMessageRequest().getVisibilityTimeout().longValue());
    assertEquals(33L, second.getReceiveMessageRequest().getWaitTimeSeconds().longValue());
}
/**
 * Checks that {@code start()} invokes {@code doStart()}, observed through a latch that
 * the overridden {@code doStart()} releases.
 */
@Test
public void testStartCallsDoStartMethod() throws Exception {
    CountDownLatch startSignal = new CountDownLatch(1);
    AbstractMessageListenerContainer listenerContainer = new AbstractMessageListenerContainer() {

        @Override
        protected void doStart() {
            startSignal.countDown();
        }

        @Override
        protected void doStop() {
            throw new UnsupportedOperationException("not supported yet");
        }
    };

    AmazonSQSAsync sqsStub = mock(AmazonSQSAsync.class, withSettings().stubOnly());
    listenerContainer.setAmazonSqs(sqsStub);
    listenerContainer.setMessageHandler(mock(QueueMessageHandler.class));
    listenerContainer.afterPropertiesSet();

    GetQueueUrlRequest lookup = new GetQueueUrlRequest().withQueueName("testQueue");
    when(sqsStub.getQueueUrl(lookup))
            .thenReturn(new GetQueueUrlResult().withQueueUrl("http://testQueue.amazonaws.com"));

    listenerContainer.start();

    try {
        boolean started = startSignal.await(10, TimeUnit.MILLISECONDS);
        assertTrue(started);
    } catch (InterruptedException e) {
        fail("Expected doStart() method to be called");
    }
}
@Test public void testStopCallsDoStopMethod() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(1); AbstractMessageListenerContainer container = new AbstractMessageListenerContainer() { @Override protected void doStart() { // do nothing in this case } @Override protected void doStop() { countDownLatch.countDown(); } }; AmazonSQSAsync mock = mock(AmazonSQSAsync.class, withSettings().stubOnly()); container.setAmazonSqs(mock); container.setMessageHandler(mock(QueueMessageHandler.class)); container.afterPropertiesSet(); when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue"))). thenReturn(new GetQueueUrlResult().withQueueUrl("http://testQueue.amazonaws.com")); container.start(); container.stop(); try { assertTrue(countDownLatch.await(10, TimeUnit.MILLISECONDS)); } catch (InterruptedException e) { fail("Expected doStart() method to be called"); } }
@Test public void testStopCallsDoStopMethodWithRunnable() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(1); AbstractMessageListenerContainer container = new AbstractMessageListenerContainer() { @Override protected void doStart() { // do nothing in this case } @Override protected void doStop() { countDownLatch.countDown(); } }; AmazonSQSAsync mock = mock(AmazonSQSAsync.class, withSettings().stubOnly()); container.setAmazonSqs(mock); container.setMessageHandler(mock(QueueMessageHandler.class)); container.afterPropertiesSet(); when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue"))). thenReturn(new GetQueueUrlResult().withQueueUrl("http://testQueue.amazonaws.com")); container.start(); container.stop(() -> { try { assertTrue(countDownLatch.await(10, TimeUnit.MILLISECONDS)); } catch (InterruptedException e) { fail("Expected doStart() method to be called"); } }); }
@Test public void receiveMessageRequests_withDestinationResolverThrowingException_shouldLogWarningAndNotCreateRequest() throws Exception { // Arrange AbstractMessageListenerContainer container = new StubAbstractMessageListenerContainer(); Logger loggerMock = container.getLogger(); AmazonSQSAsync mock = mock(AmazonSQSAsync.class, withSettings().stubOnly()); container.setAmazonSqs(mock); StaticApplicationContext applicationContext = new StaticApplicationContext(); QueueMessageHandler messageHandler = new QueueMessageHandler(); messageHandler.setApplicationContext(applicationContext); container.setMessageHandler(messageHandler); applicationContext.registerSingleton("messageListener", MessageListener.class); applicationContext.registerSingleton("anotherMessageListener", AnotherMessageListener.class); when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue"))). thenThrow(new DestinationResolutionException("Queue not found")); when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("anotherTestQueue"))). thenReturn(new GetQueueUrlResult().withQueueUrl("http://anotherTestQueue.amazonaws.com")); when(mock.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); messageHandler.afterPropertiesSet(); container.afterPropertiesSet(); // Act container.start(); // Assert ArgumentCaptor<String> logMsgArgCaptor = ArgumentCaptor.forClass(String.class); verify(loggerMock).warn(logMsgArgCaptor.capture()); Map<String, QueueAttributes> registeredQueues = container.getRegisteredQueues(); assertFalse(registeredQueues.containsKey("testQueue")); assertEquals("Ignoring queue with name 'testQueue' as it does not exist.", logMsgArgCaptor.getValue()); assertEquals("http://anotherTestQueue.amazonaws.com", registeredQueues.get("anotherTestQueue").getReceiveMessageRequest().getQueueUrl()); }
/**
 * Checks that a resolver built with only an SQS client returns the URL obtained from
 * the client's queue URL lookup.
 */
@Test
public void testNoAutoCreate() throws Exception {
    String expectedUrl = "http://foo/bar";
    AmazonSQS sqsMock = mock(AmazonSQS.class);
    GetQueueUrlResult lookupResult = new GetQueueUrlResult().withQueueUrl(expectedUrl);
    when(sqsMock.getQueueUrl(new GetQueueUrlRequest("foo"))).thenReturn(lookupResult);

    DynamicQueueUrlDestinationResolver resolver = new DynamicQueueUrlDestinationResolver(sqsMock);

    assertEquals(expectedUrl, resolver.resolveDestination("foo"));
}
/**
 * Checks that resolving a name whose lookup fails with {@link QueueDoesNotExistException}
 * raises a {@link DestinationResolutionException} whose message starts with the
 * original error text.
 *
 * <p>The exception is captured and asserted on after the try block, so the test fails
 * if nothing is thrown. Previously the only assertion lived inside the catch block and
 * the test passed silently in that case.
 */
@Test
public void testInvalidDestinationName() throws Exception {
    AmazonSQS amazonSqs = mock(AmazonSQS.class);
    AmazonServiceException exception = new QueueDoesNotExistException("AWS.SimpleQueueService.NonExistentQueue");
    exception.setErrorCode("AWS.SimpleQueueService.NonExistentQueue");
    String queueUrl = "invalidName";
    when(amazonSqs.getQueueUrl(new GetQueueUrlRequest(queueUrl))).thenThrow(exception);
    DynamicQueueUrlDestinationResolver dynamicQueueDestinationResolver = new DynamicQueueUrlDestinationResolver(amazonSqs);

    DestinationResolutionException thrown = null;
    try {
        dynamicQueueDestinationResolver.resolveDestination(queueUrl);
    } catch (DestinationResolutionException e) {
        thrown = e;
    }

    // A DestinationResolutionException must have been raised for the missing queue.
    assertTrue(thrown != null);
    assertTrue(thrown.getMessage().startsWith("AWS.SimpleQueueService.NonExistentQueue"));
}
/**
 * Checks that, when the resource id resolver yields a plain queue name, the resolver
 * looks up the queue URL using that resolved name.
 */
@Test
public void resolveDestination_withResourceIdResolver_nonUrlId_shouldGetUrlByResolvedName() throws Exception {
    String expectedUrl = "http://queue.com";
    String physicalName = "some-queue-name";

    // The SQS client only knows the physical name.
    AmazonSQS sqsMock = mock(AmazonSQS.class);
    GetQueueUrlResult lookupResult = new GetQueueUrlResult().withQueueUrl(expectedUrl);
    when(sqsMock.getQueueUrl(new GetQueueUrlRequest(physicalName))).thenReturn(lookupResult);

    // Any logical name maps to the physical name.
    ResourceIdResolver idResolver = mock(ResourceIdResolver.class);
    when(idResolver.resolveToPhysicalResourceId(anyString())).thenReturn(physicalName);

    DynamicQueueUrlDestinationResolver resolver =
            new DynamicQueueUrlDestinationResolver(sqsMock, idResolver);

    String resolved = resolver.resolveDestination("testQueue");

    assertEquals("http://queue.com", resolved);
}
/**
 * Builds an SQS mock that answers every queue URL lookup with a fixed URL and every
 * receive call with a single message.
 *
 * @return the stubbed mock
 */
private AmazonSQSAsync createAmazonSqs() {
    AmazonSQSAsync sqsMock = mock(AmazonSQSAsync.class);

    // Fixed URL for any lookup.
    GetQueueUrlResult urlResult = new GetQueueUrlResult().withQueueUrl("http://queue-url.com");
    when(sqsMock.getQueueUrl(any(GetQueueUrlRequest.class))).thenReturn(urlResult);

    // One message for any receive request.
    com.amazonaws.services.sqs.model.Message singleMessage =
            new com.amazonaws.services.sqs.model.Message().withBody("My message");
    ReceiveMessageResult receiveResult = new ReceiveMessageResult().withMessages(singleMessage);
    when(sqsMock.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(receiveResult);

    return sqsMock;
}
/**
 * Checks that looking up a URL by queue name sends a request carrying that name.
 */
@Test
public void testGetQueueUrlQueueName() throws JMSException {
    GetQueueUrlRequest expectedRequest = new GetQueueUrlRequest(QUEUE_NAME);

    wrapper.getQueueUrl(QUEUE_NAME);

    verify(amazonSQSClient).getQueueUrl(eq(expectedRequest));
}
/**
 * Checks that looking up a URL by queue name and owner account id sends a request
 * carrying both values.
 */
@Test
public void testGetQueueUrlQueueNameWithAccountId() throws JMSException {
    GetQueueUrlRequest expectedRequest =
            new GetQueueUrlRequest(QUEUE_NAME).withQueueOwnerAWSAccountId(OWNER_ACCOUNT_ID);

    wrapper.getQueueUrl(QUEUE_NAME, OWNER_ACCOUNT_ID);

    verify(amazonSQSClient).getQueueUrl(eq(expectedRequest));
}
/**
 * Expects a {@link JMSException} when the name-based lookup hits an
 * {@link AmazonClientException} from the client.
 */
@Test(expected = JMSException.class)
public void testGetQueueUrlQueueNameThrowAmazonClientException() throws JMSException {
    GetQueueUrlRequest expectedRequest = new GetQueueUrlRequest(QUEUE_NAME);
    AmazonClientException clientFailure = new AmazonClientException("ace");
    doThrow(clientFailure).when(amazonSQSClient).getQueueUrl(eq(expectedRequest));

    wrapper.getQueueUrl(QUEUE_NAME);
}
/**
 * Expects a {@link JMSException} when the name-based lookup hits an
 * {@link AmazonServiceException} from the client.
 */
@Test(expected = JMSException.class)
public void testGetQueueUrlQueueNameThrowAmazonServiceException() throws JMSException {
    GetQueueUrlRequest expectedRequest = new GetQueueUrlRequest(QUEUE_NAME);
    AmazonServiceException serviceFailure = new AmazonServiceException("ase");
    doThrow(serviceFailure).when(amazonSQSClient).getQueueUrl(eq(expectedRequest));

    wrapper.getQueueUrl(QUEUE_NAME);
}
/**
 * Expects an {@link InvalidDestinationException} when the name-based lookup hits a
 * {@link QueueDoesNotExistException} from the client.
 */
@Test(expected = InvalidDestinationException.class)
public void testGetQueueUrlQueueNameThrowQueueDoesNotExistException() throws JMSException {
    GetQueueUrlRequest expectedRequest = new GetQueueUrlRequest(QUEUE_NAME);
    QueueDoesNotExistException missingQueue = new QueueDoesNotExistException("qdnee");
    doThrow(missingQueue).when(amazonSQSClient).getQueueUrl(eq(expectedRequest));

    wrapper.getQueueUrl(QUEUE_NAME);
}
/**
 * Expects an {@link InvalidDestinationException} when the lookup by name and owner
 * account id hits a {@link QueueDoesNotExistException} from the client.
 */
@Test(expected = InvalidDestinationException.class)
public void testGetQueueUrlQueueNameWithAccountIdThrowQueueDoesNotExistException() throws JMSException {
    GetQueueUrlRequest expectedRequest =
            new GetQueueUrlRequest(QUEUE_NAME).withQueueOwnerAWSAccountId(OWNER_ACCOUNT_ID);
    QueueDoesNotExistException missingQueue = new QueueDoesNotExistException("qdnee");
    doThrow(missingQueue).when(amazonSQSClient).getQueueUrl(eq(expectedRequest));

    wrapper.getQueueUrl(QUEUE_NAME, OWNER_ACCOUNT_ID);
}
/**
 * Checks that the request-based lookup forwards the given request to the client.
 */
@Test
public void testGetQueueUrl() throws JMSException {
    GetQueueUrlRequest request = new GetQueueUrlRequest(QUEUE_NAME);

    wrapper.getQueueUrl(request);

    verify(amazonSQSClient).getQueueUrl(eq(request));
}
/**
 * Expects a {@link JMSException} when the request-based lookup hits an
 * {@link AmazonClientException} from the client.
 */
@Test(expected = JMSException.class)
public void testGetQueueUrlThrowAmazonClientException() throws JMSException {
    GetQueueUrlRequest request = new GetQueueUrlRequest(QUEUE_NAME);
    AmazonClientException clientFailure = new AmazonClientException("ace");
    doThrow(clientFailure).when(amazonSQSClient).getQueueUrl(eq(request));

    wrapper.getQueueUrl(request);
}
/**
 * Expects a {@link JMSException} when the request-based lookup hits an
 * {@link AmazonServiceException} from the client.
 *
 * <p>The call under test is the request overload, matching the neighbouring
 * request-based tests. It previously called the queue-name overload, which duplicated
 * the name-based service-exception test and left this path unexercised.
 */
@Test(expected = JMSException.class)
public void testGetQueueUrlThrowAmazonServiceException() throws JMSException {
    GetQueueUrlRequest getQueueUrlRequest = new GetQueueUrlRequest(QUEUE_NAME);
    doThrow(new AmazonServiceException("ase"))
            .when(amazonSQSClient).getQueueUrl(eq(getQueueUrlRequest));

    wrapper.getQueueUrl(getQueueUrlRequest);
}
/**
 * Checks that {@code queueExists} returns {@code false} when the client reports that
 * the queue does not exist.
 */
@Test
public void testQueueExistsThrowQueueDoesNotExistException() throws JMSException {
    GetQueueUrlRequest expectedRequest = new GetQueueUrlRequest(QUEUE_NAME);
    QueueDoesNotExistException missingQueue = new QueueDoesNotExistException("qdnee");
    doThrow(missingQueue).when(amazonSQSClient).getQueueUrl(eq(expectedRequest));

    boolean exists = wrapper.queueExists(QUEUE_NAME);

    assertFalse(exists);
}
/**
 * Expects a {@link JMSException} from {@code queueExists} when the client throws an
 * {@link AmazonClientException}.
 */
@Test(expected = JMSException.class)
public void testQueueExistsThrowAmazonClientException() throws JMSException {
    GetQueueUrlRequest expectedRequest = new GetQueueUrlRequest(QUEUE_NAME);
    AmazonClientException clientFailure = new AmazonClientException("ace");
    doThrow(clientFailure).when(amazonSQSClient).getQueueUrl(eq(expectedRequest));

    wrapper.queueExists(QUEUE_NAME);
}
/**
 * Expects a {@link JMSException} from {@code queueExists} when the client throws an
 * {@link AmazonServiceException}.
 */
@Test(expected = JMSException.class)
public void testQueueExistsThrowAmazonServiceException() throws JMSException {
    GetQueueUrlRequest expectedRequest = new GetQueueUrlRequest(QUEUE_NAME);
    AmazonServiceException serviceFailure = new AmazonServiceException("ase");
    doThrow(serviceFailure).when(amazonSQSClient).getQueueUrl(eq(expectedRequest));

    wrapper.queueExists(QUEUE_NAME);
}