/** * Retrieves queue url for the given queue name. If the queue does not exist, tries to create it. * * @param queueName the queue name to get url for * @return an optional String representing the queue url */ Optional<String> getUrlForQueue(String queueName) { Optional<String> queueUrl = Optional.empty(); try { GetQueueUrlResult queueUrlResult = sqs.getQueueUrl(queueName); if (queueUrlResult.getQueueUrl() != null) { queueUrl = Optional.of(queueUrlResult.getQueueUrl()); } } catch (QueueDoesNotExistException e) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Queue " + queueName + " does not exist, try to create it",e); } CreateQueueRequest createQueueRequest = new CreateQueueRequest(queueName); try { queueUrl = Optional.of(sqs.createQueue(createQueueRequest).getQueueUrl()); } catch (AmazonClientException e2) { LOGGER.error("Could not create queue " + queueName + ", bundle won't work",e2); } } return queueUrl; }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsOneMessage() { final AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); final String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .take(1) // .test() // .awaitDone(10, TimeUnit.SECONDS) // .assertComplete() // .assertValue("body1"); final InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(1)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsOneMessageAndHonoursBackpressure() { final AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); final String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .test(0) // .requestMore(1) // .assertValue("body1")// .assertNotComplete() // .cancel(); final InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(1)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsNoMessagesThenSecondCallReturnsTwoMessages() { final AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); final String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())).thenReturn(new ReceiveMessageResult()) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"), new Message().withBody("body2"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .take(2) // .test() // .awaitDone(10, TimeUnit.SECONDS) // .assertComplete() // .assertValues("body1", "body2"); final InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(2)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Test public void testGet() { // given QueueName qn = new QueueName("q1"); GetQueueUrlResult queueUrlResult = mock(GetQueueUrlResult.class); when(queueUrlResult.getQueueUrl()).thenReturn("url1"); GetQueueAttributesResult attributesResult = mock(GetQueueAttributesResult.class); HashMap<String, String> attributes = new HashMap<>(); attributes.put("1", "3"); attributes.put("hi", "ho"); when(attributesResult.getAttributes()).thenReturn(attributes); when(amazonSQS.getQueueUrl(any(GetQueueUrlRequest.class))).thenReturn(queueUrlResult); when(amazonSQS.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn( attributesResult); // when Queue queue = uut.get(qn); // then assertEquals("url1", queue.getUrl()); assertEquals("q1", queue.getName().getId()); assertEquals(attributes, queue.getQueueAttributes()); }
@Override public void run() { rxSqsClient.getQueueUrlAsync(queueName) .last() .map(GetQueueUrlResult::getQueueUrl) .subscribe(url -> { rxSqsClient.receiveMessageAsync(url) .subscribeOn(Schedulers.io()) .subscribe(message -> { System.out.println(name + ": " + message.getBody() + "[" + message.getMessageId() + "]"); rxSqsClient.deleteMessageAsync(url, message.getReceiptHandle()) .toBlocking() .subscribe(result -> { System.out.println("Acknowledged Message " + message.getMessageId()); }); }, Throwable::printStackTrace); }); }
@Test public void testShouldGetFailedResponseAfterSendingTheEvent() { final GetQueueUrlResult getQueueUrlResult = new GetQueueUrlResult(); getQueueUrlResult.setQueueUrl(randomAlphabetic(10)); final SendMessageResult sendMessageResult = new SendMessageResult(); final SdkHttpMetadata responseMetadata = mock(SdkHttpMetadata.class); when(responseMetadata.getHttpStatusCode()).thenReturn(400); sendMessageResult.setSdkHttpMetadata(responseMetadata); when(amazonSQS.sendMessage(any(SendMessageRequest.class))).thenThrow(new RuntimeException("expected")); assertThatThrownBy(() -> sqsErrorHandler.onError(randomAlphabetic(10), new RuntimeException(), EventTypePartition.of(EventType.of(randomAlphabetic(10)), randomAlphabetic(1)), randomNumeric(10), randomAlphabetic(50))) .isInstanceOf(RuntimeException.class).hasMessageContaining("expected"); }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsOneMessage() { AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .take(1) // .to(test()) // .awaitTerminalEvent() // .assertCompleted() // .assertValue("body1"); InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(1)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsOneMessageAndHonoursBackpressure() { AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .to(testWithRequest(0)) // .requestMore(1) // .assertValue("body1")// .assertNotCompleted() // .unsubscribe(); InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(1)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Test(timeout = 5000) public void testFirstCallToReceiveMessagesReturnsNoMessagesThenSecondCallReturnsTwoMessages() { AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())).thenReturn(new ReceiveMessageResult()) .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"), new Message().withBody("body2"))); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .take(2) // .to(test()) // .awaitTerminalEvent() // .assertCompleted() // .assertValues("body1", "body2"); InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); inorder.verify(sqs, Mockito.times(2)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Test public void shouldGetCorrectQueueUrl() throws Exception { //GIVEN AmazonSQS sqs = mock(AmazonSQS.class); field("sqs").ofType(AmazonSQS.class).in(bundle).set(sqs); String queueUrl = "https://eu-central-1/queue.amazonaws.com/123456/test-queue"; when(sqs.getQueueUrl("test-queue")).thenReturn(new GetQueueUrlResult() .withQueueUrl(queueUrl)); //WHEN Optional<String> urlForQueue = bundle.getUrlForQueue("test-queue"); //THEN assertThat(urlForQueue.isPresent()).isTrue(); assertThat(urlForQueue.get()).isEqualTo(queueUrl); }
@Test public void shouldCorrectlyCreateSenderIfQueueExists() throws Exception, CannotCreateSenderException { //GIVEN AmazonSQS sqs = mock(AmazonSQS.class); field("sqs").ofType(AmazonSQS.class).in(bundle).set(sqs); String queueUrl = "https://eu-central-1/queue.amazonaws.com/123456/test-queue"; when(sqs.getQueueUrl("test-queue")).thenReturn(new GetQueueUrlResult() .withQueueUrl(queueUrl)); //WHEN SqsSender sender = bundle.createSender("test-queue"); //THEN assertThat(sender).isNotNull(); }
private void initQueue() { this.sqs = new AmazonSQSClient(); // Do we need to use new // ClientConfiguration().withMaxConnections(256) // ? this.sqs.configureRegion(region); try { // Check to see if queue exists GetQueueUrlResult queueUrlResult = this.sqs.getQueueUrl(getSqsQueueName()); this.queueUrl = queueUrlResult.getQueueUrl(); } catch (QueueDoesNotExistException queueDoesNotExist) { // Queue does not exist, need to create one CreateQueueRequest createQueueRequest = new CreateQueueRequest(); createQueueRequest.setQueueName(getSqsQueueName()); createQueueRequest.addAttributesEntry("VisibilityTimeout", "" + getVisibilityTimeout()); CreateQueueResult createQueueResult = this.sqs.createQueue(createQueueRequest); this.queueUrl = createQueueResult.getQueueUrl(); } }
@Override public Properties send(Properties properties, Object message) throws ConnectorException { String access_key_id = properties.getProperty("AccessKeyId"); String secret_access_key = properties.getProperty("SecretAccessKey"); BasicAWSCredentials credentials = new BasicAWSCredentials(access_key_id, secret_access_key); AmazonSQS sqs = new AmazonSQSClient(credentials); //System.out.println(properties.getProperty("region")); // Region selection Region region = Region.getRegion(Regions.fromName(properties.getProperty("region"))); sqs.setRegion(region); GetQueueUrlResult queueUrl = sqs.getQueueUrl(properties.getProperty("Queue")); String messageStr = new String((byte[])message); sqs.sendMessage(new SendMessageRequest(queueUrl.getQueueUrl(), messageStr)); return properties; }
@Test public void sendAndReceiveMessage() { final String queueName = "bizo"; final String messageBody = "hi everybody"; final CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName(queueName); sqs.createQueue(createQueueRequest); final GetQueueUrlRequest getQueueUrlRequest = new GetQueueUrlRequest().withQueueName(queueName); final GetQueueUrlResult getQueueUrlResult = sqs.getQueueUrl(getQueueUrlRequest); final String queueUrl = getQueueUrlResult.getQueueUrl(); final SendMessageRequest sendMessageRequest = new SendMessageRequest().withQueueUrl(queueUrl).withMessageBody(messageBody); sqs.sendMessage(sendMessageRequest); final int maxNumberOfMessages = 10; final ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest().withQueueUrl(queueUrl).withMaxNumberOfMessages(maxNumberOfMessages); final ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(receiveMessageRequest); final List<Message> messages = receiveMessageResult.getMessages(); assertThat(messages.size(), equalTo(1)); assertThat(messages.get(0).getBody(), equalTo(messageBody)); }
@Test public void shouldCreateSqsQueueResource_withName() throws Exception { // Given final String name = randomString(); final String queueUrl = randomString(); final GetQueueUrlResult getQueueUrlResult = new GetQueueUrlResult().withQueueUrl(queueUrl); final GetQueueUrlRequest expectedGetQueueUrlRequest = new GetQueueUrlRequest(name); when(mockAmazonSqsClient.getQueueUrl(expectedGetQueueUrlRequest)).thenReturn(getQueueUrlResult); final SqsQueueResource mockSqsQueueResource = mock(SqsQueueResource.class); whenNew(SqsQueueResource.class).withArguments(name, queueUrl, mockAmazonSqsClient) .thenReturn(mockSqsQueueResource); // When final SqsQueueResource result = factory.createSqsQueueResource(name); // Then assertSame(mockSqsQueueResource, result); }
@SuppressWarnings("unchecked") @Override public void run() { //Get queue url GetQueueUrlResult urlResult = sqs.getQueueUrl(responseQName); String QueueUrl = urlResult.getQueueUrl(); JSONObject result = new JSONObject(); try { Thread.sleep(sleepLength); result.put("task_id", task_id); result.put("result", "0"); sqs.sendMessage(new SendMessageRequest(QueueUrl, result.toString())); //System.out.println(Thread.currentThread().getName()+" sleep done!"); } catch (Exception e) { result.put("task_id", task_id); result.put("result", "1"); sqs.sendMessage(new SendMessageRequest(QueueUrl, result.toString())); } }
/** * Get a queue url from a queue name * @param queueName * @return queueUrl - For the specified queue name */ private synchronized String getAndSetQueueUrl(final String queueName) throws QueueDoesNotExistException{ try{ final String url = queueUrlMap.get(queueName); if(url != null){ return url; }else{ final GetQueueUrlResult result = this.sqs.getQueueUrl(queueName); if(result != null && !Strings.isNullOrEmpty(result.getQueueUrl())){ queueUrlMap.put(queueName, result.getQueueUrl()); return result.getQueueUrl(); } } }catch(QueueDoesNotExistException qne){ throw qne; }catch(Exception ex){ throw new RuntimeException(ex.getMessage(), ex); } return null; }
/*** * Note, by the time this is called the queue creation process should have already been executed * @param queue */ private void ensureQueueIsReady(SqsQueueConfig config) { final long timeout = (DateTime.now().getMillis() + config.getQueueCreationTimeoutMS()); while(!shutdown && DateTime.now().getMillis() < timeout){ try{ final GetQueueUrlResult queueUrl = this.sqs.getQueueUrl(config.getName()); if(queueUrl!=null && !Strings.isNullOrEmpty(queueUrl.getQueueUrl())){ return; //Queue is ready } }catch(AmazonServiceException asException){ /** not retryable **/ throw new RuntimeException(asException.getMessage(), asException); }catch(Exception e){ // Continue waiting LOGGER.info("Waiting for queue to become ready, this exception is viewed as one that allows us to continue waiting"); } } /** If the Admin Client is not being shutdown, then we timed out **/ if(!this.shutdown){ throw new RuntimeException(String.format("Queue %s did was not ready after creation within %d Milliseconds", config.getName(), config.getQueueCreationTimeoutMS())); } }
@Override public String resolveDestination(String name) throws DestinationResolutionException { String queueName = name; if (this.resourceIdResolver != null) { queueName = this.resourceIdResolver.resolveToPhysicalResourceId(name); } if (isValidQueueUrl(queueName)) { return queueName; } if (this.autoCreate) { //Auto-create is fine to be called even if the queue exists. CreateQueueResult createQueueResult = this.amazonSqs.createQueue(new CreateQueueRequest(queueName)); return createQueueResult.getQueueUrl(); } else { try { GetQueueUrlResult getQueueUrlResult = this.amazonSqs.getQueueUrl(new GetQueueUrlRequest(queueName)); return getQueueUrlResult.getQueueUrl(); } catch (QueueDoesNotExistException e) { throw new DestinationResolutionException(e.getMessage(), e); } } }
@Test public void testIsActive() throws Exception { AbstractMessageListenerContainer container = new StubAbstractMessageListenerContainer(); AmazonSQSAsync mock = mock(AmazonSQSAsync.class, withSettings().stubOnly()); container.setAmazonSqs(mock); container.setMessageHandler(mock(QueueMessageHandler.class)); container.afterPropertiesSet(); when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue"))). thenReturn(new GetQueueUrlResult().withQueueUrl("http://testQueue.amazonaws.com")); container.start(); assertTrue(container.isRunning()); container.stop(); assertFalse(container.isRunning()); //Container can still be active an restarted later (e.g. paused for a while) assertTrue(container.isActive()); }
/** * Test create queue when session is already closed */ @Test public void testCreateQueue() throws JMSException { GetQueueUrlResult result = new GetQueueUrlResult().withQueueUrl(QUEUE_URL); when(sqsClientJMSWrapper.getQueueUrl(QUEUE_NAME)) .thenReturn(result); /* * Create queue */ Queue queue = sqsSession.createQueue(QUEUE_NAME); /* * Verify results */ assert(queue instanceof SQSQueueDestination); assertEquals(QUEUE_NAME, queue.getQueueName()); assertEquals(QUEUE_URL, ((SQSQueueDestination) queue).getQueueUrl()); }
/** * Test create queue when session is already closed */ @Test public void testCreateQueueWithOwnerAccountId() throws JMSException { GetQueueUrlResult result = new GetQueueUrlResult().withQueueUrl(QUEUE_URL); when(sqsClientJMSWrapper.getQueueUrl(QUEUE_NAME, OWNER_ACCOUNT_ID)) .thenReturn(result); /* * Create queue */ Queue queue = sqsSession.createQueue(QUEUE_NAME, OWNER_ACCOUNT_ID); /* * Verify results */ assert(queue instanceof SQSQueueDestination); assertEquals(QUEUE_NAME, queue.getQueueName()); assertEquals(QUEUE_URL, ((SQSQueueDestination) queue).getQueueUrl()); }
/** * @param queueName Name of the SQS queue. This queue must already exist, it will not be created. * @param region Region this queue exists in * @param clientConfig Configuration values for the queue client * @return an SqsQueue */ public Single<SqsQueue<String>> getQueueFromName(String queueName, Regions region, SqsQueueClientConfig clientConfig) { GetQueueUrlAction action = new GetQueueUrlAction(queueName, region); return requestSender.sendRequest(action) .map(GetQueueUrlResult::getQueueUrl) .map(url -> getQueueFromUrl(url, clientConfig)); }
public SqsClientTest() { QUEUE_ALREADY_EXISTS_EXCEPTION.setErrorCode(QUEUE_ALREADY_EXISTS); when(requestSenderMock.sendRequest(any(GetQueueUrlAction.class))) .thenReturn(Single.just(new GetQueueUrlResult().withQueueUrl(QUEUE_URL))); when(requestSenderMock.sendRequest(any(SetQueueAttributesAction.class))) .thenReturn(Single.just(new SetQueueAttributesResult())); when(requestSenderMock.sendRequest(any(GetQueueUrlAction.class))) .thenReturn(Single.just(new GetQueueUrlResult().withQueueUrl(QUEUE_URL))); }
@Test public void testCreateGetUrlListQueue_shouldCreateReturnUrlAndListQueue() { // create first queue CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue")); assertNotNull("verify that, on creation, queue url was returned",createdQueue.getQueueUrl()); // create other queues CreateQueueResult secondTeaQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-mate-queue")); CreateQueueResult anotherQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("coffee-queue")); // get queue url GetQueueUrlResult queueUrlResult = sqs.getQueueUrl(new GetQueueUrlRequest() .withQueueName("tea-earl-grey-queue").withQueueOwnerAWSAccountId("some owner")); assertNotNull("verify that, on fetch, queue url was returned", queueUrlResult.getQueueUrl()); // get all queues ListQueuesResult allQueues = sqs.listQueues(); assertEquals("verify all queues are returned", 3, allQueues.getQueueUrls().size()); assertTrue("verify that all queues contain first queue", allQueues.getQueueUrls().contains(createdQueue.getQueueUrl())); assertTrue("verify that all queues contain second tea queue", allQueues.getQueueUrls().contains(secondTeaQueue.getQueueUrl())); assertTrue("verify that all queues contain coffee queue", allQueues.getQueueUrls().contains(anotherQueue.getQueueUrl())); // get only queues that start with 'tea' ListQueuesResult teaQueues = sqs.listQueues(new ListQueuesRequest("tea")); assertEquals("verify only tea queues are returned", 2, teaQueues.getQueueUrls().size()); assertTrue("verify that tea queues contain first queue", teaQueues.getQueueUrls().contains(createdQueue.getQueueUrl())); assertTrue("verify that tea queues contain second tea queue", teaQueues.getQueueUrls().contains(secondTeaQueue.getQueueUrl())); assertNotNull("verify that delete queue returned ok", sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(queueUrlResult.getQueueUrl()))); assertFalse("verify that the queue was removed", sqs.listQueues().getQueueUrls().stream() .anyMatch( queueUrl -> StringUtils.equals(queueUrl,queueUrlResult.getQueueUrl()) )); // cleanup getQueues().remove("tea-earl-grey-queue"); getQueues().remove("tea-mate-queue"); getQueues().remove("coffee-queue"); }
@Test(timeout = 5000) public void testPollingReturnsAllAvailableMessagesAtEachScheduledCall() { TestScheduler sched = new TestScheduler(); AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); String queueName = "queue"; Mockito.when(sqs.getQueueUrl(queueName)).thenAnswer(x -> new GetQueueUrlResult().withQueueUrl(queueName)); Mockito.when(sqs.receiveMessage(Mockito.<ReceiveMessageRequest>any())) // .thenReturn(new ReceiveMessageResult()) // .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body1"))) // .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body2"))) // .thenReturn(new ReceiveMessageResult()) // .thenReturn(new ReceiveMessageResult().withMessages(new Message().withBody("body3"))) // .thenReturn(new ReceiveMessageResult()) // .thenReturn(new ReceiveMessageResult()); Sqs.queueName(queueName) // .sqsFactory(() -> sqs) // .interval(1, TimeUnit.MINUTES, sched) // .messages() // .map(m -> m.message()) // .doOnError(Throwable::printStackTrace) // .to(test()) // .assertNoValues() // .assertNoTerminalEvent() // .perform(() -> sched.advanceTimeBy(1, TimeUnit.MINUTES)) // .assertValuesAndClear("body1", "body2") // .assertNoTerminalEvent() // .perform(() -> sched.advanceTimeBy(1, TimeUnit.MINUTES)) // .assertValuesAndClear("body3") // .assertNoTerminalEvent() // .perform(() -> sched.advanceTimeBy(1, TimeUnit.MINUTES)) // .assertNoValues() // .assertNoTerminalEvent() // .unsubscribe(); InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.atLeastOnce()).getQueueUrl(queueName); // TODO why times(1), should be times(6)? inorder.verify(sqs, Mockito.times(1)).receiveMessage(Mockito.<ReceiveMessageRequest>any()); inorder.verify(sqs, Mockito.times(1)).shutdown(); inorder.verifyNoMoreInteractions(); }
@Test public void shouldCorrectlyRegisterReceiver() throws Exception { //GIVEN AmazonSQS sqs = mock(AmazonSQS.class); String queueUrl = "https://eu-central-1/queue.amazonaws.com/123456/test-queue"; when(sqs.getQueueUrl("test-queue")).thenReturn(new GetQueueUrlResult() .withQueueUrl(queueUrl)); LifecycleEnvironment lifecycle = mock(LifecycleEnvironment.class); doNothing().when(lifecycle).manage((Managed) anyObject()); when(environment.lifecycle()).thenReturn(lifecycle); HealthCheckRegistry healthChecks = mock(HealthCheckRegistry.class); doNothing().when(healthChecks).register(anyObject(), anyObject()); when(environment.healthChecks()).thenReturn(healthChecks); SqsBundle spiedBundle = spy(bundle); doReturn(sqs).when(spiedBundle).getAmazonSQS(); spiedBundle.run(configurationHolder, environment); //WHEN spiedBundle.registerReceiver("test-queue", (m) -> process(m)); //THEN verify(spiedBundle, times(1)).internalRegisterReceiver(eq("test-queue"), any(SqsReceiverHandler.class)); }
@Override public void poll(long waitInterval ) { Properties properties = new Properties(); String access_key_id = getProperty("AccessKeyId"); String secret_access_key = getProperty("SecretAccessKey"); BasicAWSCredentials credentials = new BasicAWSCredentials(access_key_id, secret_access_key); AmazonSQS sqs = new AmazonSQSClient(credentials); // Region selection Region region = Region.getRegion(Regions.fromName(getProperty("region"))); sqs.setRegion(region); GetQueueUrlResult queueUrl = sqs.getQueueUrl(getProperty("Queue")); ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl.getQueueUrl()); List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages(); String outputMessage = ""; // if there are messages then do the processing if(messages.size() > 0){ //append the message properties to the localenv tree for (Message message : messages) { properties.setProperty("MessageId", message.getMessageId()); properties.setProperty("ReceiptHandle", message.getReceiptHandle()); properties.setProperty("MD5OfBody", message.getMD5OfBody()); // get the message body to a string outputMessage = message.getBody(); } properties.setProperty("queueUrl", queueUrl.getQueueUrl()); // delete the message from the queue String messageReceiptHandle = messages.get(0).getReceiptHandle(); sqs.deleteMessage(new DeleteMessageRequest(queueUrl.getQueueUrl(), messageReceiptHandle)); ConnectorCallback callback = getCallback(); callback.processInboundData(outputMessage.getBytes(), properties); } }
@Test public void getQueueUrl() { final String queueName = "bizo"; final CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName(queueName); sqs.createQueue(createQueueRequest); final GetQueueUrlRequest getQueueUrlRequest = new GetQueueUrlRequest().withQueueName(queueName); final GetQueueUrlResult getQueueUrlResult = sqs.getQueueUrl(getQueueUrlRequest); final String queueUrl = getQueueUrlResult.getQueueUrl(); assertThat(queueUrl, containsString(queueName)); }
@Test public void deleteMessageSucceedsWithValidReceiptHandle() { final String queueName = "bizo"; final String messageBody = "hi everybody"; final CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName(queueName); sqs.createQueue(createQueueRequest); final GetQueueUrlRequest getQueueUrlRequest = new GetQueueUrlRequest().withQueueName(queueName); final GetQueueUrlResult getQueueUrlResult = sqs.getQueueUrl(getQueueUrlRequest); final String queueUrl = getQueueUrlResult.getQueueUrl(); final SendMessageRequest sendMessageRequest = new SendMessageRequest().withQueueUrl(queueUrl).withMessageBody(messageBody); sqs.sendMessage(sendMessageRequest); final int maxNumberOfMessages = 10; final ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest().withQueueUrl(queueUrl).withMaxNumberOfMessages(maxNumberOfMessages); final ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(receiveMessageRequest); final List<Message> messages = receiveMessageResult.getMessages(); assertThat(messages.size(), equalTo(1)); final String receiptHandle = messages.get(0).getReceiptHandle(); final DeleteMessageRequest deleteMessageRequest = new DeleteMessageRequest().withQueueUrl(queueUrl).withReceiptHandle(receiptHandle); try { sqs.deleteMessage(deleteMessageRequest); } catch (ReceiptHandleIsInvalidException e) { fail("ReceiptHandleIsInvalidException was thrown"); } }
@Test(expected = ReceiptHandleIsInvalidException.class) public void deleteMessageFailsWithInvalidReceiptHandle() { final String queueName = "bizo"; final String messageBody = "hi everybody"; final CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName(queueName); sqs.createQueue(createQueueRequest); final GetQueueUrlRequest getQueueUrlRequest = new GetQueueUrlRequest().withQueueName(queueName); final GetQueueUrlResult getQueueUrlResult = sqs.getQueueUrl(getQueueUrlRequest); final String queueUrl = getQueueUrlResult.getQueueUrl(); final SendMessageRequest sendMessageRequest = new SendMessageRequest().withQueueUrl(queueUrl).withMessageBody(messageBody); sqs.sendMessage(sendMessageRequest); final ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest().withQueueUrl(queueUrl); final ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(receiveMessageRequest); final List<Message> messages = receiveMessageResult.getMessages(); assertThat(messages.size(), equalTo(1)); final String receiptHandle = "bizo"; final DeleteMessageRequest deleteMessageRequest = new DeleteMessageRequest().withQueueUrl(queueUrl).withReceiptHandle(receiptHandle); sqs.deleteMessage(deleteMessageRequest); }
@Override public Queue getQueueByName(GetQueueUrlRequest request, ResultCapture<GetQueueUrlResult> extractor) { ActionResult result = service.performAction("GetQueueByName", request, extractor); if (result == null) return null; return new QueueImpl(result.getResource()); }
@Override public Queue getQueueByName(String queueName, ResultCapture<GetQueueUrlResult> extractor) { GetQueueUrlRequest request = new GetQueueUrlRequest() .withQueueName(queueName); return getQueueByName(request, extractor); }
@Test public void testWithDefaultTaskExecutorAndOneHandler() throws Exception { int testedMaxNumberOfMessages = 10; Map<QueueMessageHandler.MappingInformation, HandlerMethod> messageHandlerMethods = Collections.singletonMap( new QueueMessageHandler.MappingInformation(Collections.singleton("testQueue"), SqsMessageDeletionPolicy.ALWAYS), null); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); QueueMessageHandler mockedHandler = mock(QueueMessageHandler.class); AmazonSQSAsync mockedSqs = mock(AmazonSQSAsync.class, withSettings().stubOnly()); when(mockedSqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); when(mockedSqs.getQueueUrl(any(GetQueueUrlRequest.class))).thenReturn(new GetQueueUrlResult().withQueueUrl("testQueueUrl")); when(mockedHandler.getHandlerMethods()).thenReturn(messageHandlerMethods); container.setMaxNumberOfMessages(testedMaxNumberOfMessages); container.setAmazonSqs(mockedSqs); container.setMessageHandler(mockedHandler); container.afterPropertiesSet(); int expectedPoolMaxSize = messageHandlerMethods.size() * (testedMaxNumberOfMessages + 1); ThreadPoolTaskExecutor taskExecutor = (ThreadPoolTaskExecutor) container.getTaskExecutor(); assertNotNull(taskExecutor); assertEquals(expectedPoolMaxSize, taskExecutor.getMaxPoolSize()); }
@Test public void receiveMessageRequests_withOneElement_created() throws Exception { AbstractMessageListenerContainer container = new StubAbstractMessageListenerContainer(); AmazonSQSAsync mock = mock(AmazonSQSAsync.class, withSettings().stubOnly()); QueueMessageHandler messageHandler = new QueueMessageHandler(); container.setAmazonSqs(mock); container.setMessageHandler(mock(QueueMessageHandler.class)); container.setMessageHandler(messageHandler); StaticApplicationContext applicationContext = new StaticApplicationContext(); applicationContext.registerSingleton("messageListener", MessageListener.class); container.setMaxNumberOfMessages(11); container.setVisibilityTimeout(22); container.setWaitTimeOut(33); messageHandler.setApplicationContext(applicationContext); when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue"))). thenReturn(new GetQueueUrlResult().withQueueUrl("http://testQueue.amazonaws.com")); when(mock.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); messageHandler.afterPropertiesSet(); container.afterPropertiesSet(); container.start(); Map<String, QueueAttributes> registeredQueues = container.getRegisteredQueues(); assertEquals("http://testQueue.amazonaws.com", registeredQueues.get("testQueue").getReceiveMessageRequest().getQueueUrl()); assertEquals(11L, registeredQueues.get("testQueue").getReceiveMessageRequest().getMaxNumberOfMessages().longValue()); assertEquals(22L, registeredQueues.get("testQueue").getReceiveMessageRequest().getVisibilityTimeout().longValue()); assertEquals(33L, registeredQueues.get("testQueue").getReceiveMessageRequest().getWaitTimeSeconds().longValue()); }
@Test public void receiveMessageRequests_withMultipleElements_created() throws Exception { AbstractMessageListenerContainer container = new StubAbstractMessageListenerContainer(); AmazonSQSAsync mock = mock(AmazonSQSAsync.class, withSettings().stubOnly()); container.setAmazonSqs(mock); StaticApplicationContext applicationContext = new StaticApplicationContext(); QueueMessageHandler messageHandler = new QueueMessageHandler(); messageHandler.setApplicationContext(applicationContext); container.setMessageHandler(messageHandler); applicationContext.registerSingleton("messageListener", MessageListener.class); applicationContext.registerSingleton("anotherMessageListener", AnotherMessageListener.class); container.setMaxNumberOfMessages(11); container.setVisibilityTimeout(22); container.setWaitTimeOut(33); when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue"))). thenReturn(new GetQueueUrlResult().withQueueUrl("http://testQueue.amazonaws.com")); when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("anotherTestQueue"))). thenReturn(new GetQueueUrlResult().withQueueUrl("http://anotherTestQueue.amazonaws.com")); when(mock.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); messageHandler.afterPropertiesSet(); container.afterPropertiesSet(); container.start(); Map<String, QueueAttributes> registeredQueues = container.getRegisteredQueues(); assertEquals("http://testQueue.amazonaws.com", registeredQueues.get("testQueue").getReceiveMessageRequest().getQueueUrl()); assertEquals(11L, registeredQueues.get("testQueue").getReceiveMessageRequest().getMaxNumberOfMessages().longValue()); assertEquals(22L, registeredQueues.get("testQueue").getReceiveMessageRequest().getVisibilityTimeout().longValue()); assertEquals(33L, registeredQueues.get("testQueue").getReceiveMessageRequest().getWaitTimeSeconds().longValue()); assertEquals("http://anotherTestQueue.amazonaws.com", registeredQueues.get("anotherTestQueue").getReceiveMessageRequest().getQueueUrl()); assertEquals(11L, registeredQueues.get("anotherTestQueue").getReceiveMessageRequest().getMaxNumberOfMessages().longValue()); assertEquals(22L, registeredQueues.get("anotherTestQueue").getReceiveMessageRequest().getVisibilityTimeout().longValue()); assertEquals(33L, registeredQueues.get("anotherTestQueue").getReceiveMessageRequest().getWaitTimeSeconds().longValue()); }
@Test public void testStartCallsDoStartMethod() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(1); AbstractMessageListenerContainer container = new AbstractMessageListenerContainer() { @Override protected void doStart() { countDownLatch.countDown(); } @Override protected void doStop() { throw new UnsupportedOperationException("not supported yet"); } }; AmazonSQSAsync mock = mock(AmazonSQSAsync.class, withSettings().stubOnly()); container.setAmazonSqs(mock); container.setMessageHandler(mock(QueueMessageHandler.class)); container.afterPropertiesSet(); when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue"))). thenReturn(new GetQueueUrlResult().withQueueUrl("http://testQueue.amazonaws.com")); container.start(); try { assertTrue(countDownLatch.await(10, TimeUnit.MILLISECONDS)); } catch (InterruptedException e) { fail("Expected doStart() method to be called"); } }
@Test public void testStopCallsDoStopMethod() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(1); AbstractMessageListenerContainer container = new AbstractMessageListenerContainer() { @Override protected void doStart() { // do nothing in this case } @Override protected void doStop() { countDownLatch.countDown(); } }; AmazonSQSAsync mock = mock(AmazonSQSAsync.class, withSettings().stubOnly()); container.setAmazonSqs(mock); container.setMessageHandler(mock(QueueMessageHandler.class)); container.afterPropertiesSet(); when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue"))). thenReturn(new GetQueueUrlResult().withQueueUrl("http://testQueue.amazonaws.com")); container.start(); container.stop(); try { assertTrue(countDownLatch.await(10, TimeUnit.MILLISECONDS)); } catch (InterruptedException e) { fail("Expected doStart() method to be called"); } }
@Test public void testStopCallsDoStopMethodWithRunnable() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(1); AbstractMessageListenerContainer container = new AbstractMessageListenerContainer() { @Override protected void doStart() { // do nothing in this case } @Override protected void doStop() { countDownLatch.countDown(); } }; AmazonSQSAsync mock = mock(AmazonSQSAsync.class, withSettings().stubOnly()); container.setAmazonSqs(mock); container.setMessageHandler(mock(QueueMessageHandler.class)); container.afterPropertiesSet(); when(mock.getQueueUrl(new GetQueueUrlRequest().withQueueName("testQueue"))). thenReturn(new GetQueueUrlResult().withQueueUrl("http://testQueue.amazonaws.com")); container.start(); container.stop(() -> { try { assertTrue(countDownLatch.await(10, TimeUnit.MILLISECONDS)); } catch (InterruptedException e) { fail("Expected doStart() method to be called"); } }); }