@Test public void oneQueue() { AmazonSQS amazonSQS = mock(AmazonSQS.class); // return one queue when(amazonSQS.listQueues(any(ListQueuesRequest.class))) .thenReturn(new ListQueuesResult().withQueueUrls("test-foo")); // return 3 messages from the queue when(amazonSQS.receiveMessage(any(ReceiveMessageRequest.class))) .thenReturn(new ReceiveMessageResult().withMessages(newMessage("foo"), newMessage("foo"), newMessage("foo"))); AmazonSQSPrioritizedMessageProvider provider = new AmazonSQSPrioritizedMessageProvider(amazonSQS, "test", 1, 60 * 1000); List<Message> messages = provider.next(); assertMessages(messages, 3, "foo"); verify(amazonSQS).listQueues(any(ListQueuesRequest.class)); verify(amazonSQS).receiveMessage(any(ReceiveMessageRequest.class)); }
@Test public void testCreateGetUrlListQueue_shouldCreateReturnUrlAndListQueue() { // create first queue CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue")); assertNotNull("verify that, on creation, queue url was returned",createdQueue.getQueueUrl()); // create other queues CreateQueueResult secondTeaQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-mate-queue")); CreateQueueResult anotherQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("coffee-queue")); // get queue url GetQueueUrlResult queueUrlResult = sqs.getQueueUrl(new GetQueueUrlRequest() .withQueueName("tea-earl-grey-queue").withQueueOwnerAWSAccountId("some owner")); assertNotNull("verify that, on fetch, queue url was returned", queueUrlResult.getQueueUrl()); // get all queues ListQueuesResult allQueues = sqs.listQueues(); assertEquals("verify all queues are returned", 3, allQueues.getQueueUrls().size()); assertTrue("verify that all queues contain first queue", allQueues.getQueueUrls().contains(createdQueue.getQueueUrl())); assertTrue("verify that all queues contain second tea queue", allQueues.getQueueUrls().contains(secondTeaQueue.getQueueUrl())); assertTrue("verify that all queues contain coffee queue", allQueues.getQueueUrls().contains(anotherQueue.getQueueUrl())); // get only queues that start with 'tea' ListQueuesResult teaQueues = sqs.listQueues(new ListQueuesRequest("tea")); assertEquals("verify only tea queues are returned", 2, teaQueues.getQueueUrls().size()); assertTrue("verify that tea queues contain first queue", teaQueues.getQueueUrls().contains(createdQueue.getQueueUrl())); assertTrue("verify that tea queues contain second tea queue", teaQueues.getQueueUrls().contains(secondTeaQueue.getQueueUrl())); assertNotNull("verify that delete queue returned ok", sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(queueUrlResult.getQueueUrl()))); assertFalse("verify that the queue was removed", sqs.listQueues().getQueueUrls().stream() .anyMatch( queueUrl -> StringUtils.equals(queueUrl,queueUrlResult.getQueueUrl()) )); // cleanup getQueues().remove("tea-earl-grey-queue"); getQueues().remove("tea-mate-queue"); getQueues().remove("coffee-queue"); }
@Override public QueueCollection getQueues(ListQueuesRequest request) { ResourceCollectionImpl result = service.getCollection("Queues", request); if (result == null) return null; return new QueueCollectionImpl(result); }
@Override public ListQueuesResult listQueues(ListQueuesRequest listQueuesRequest) throws AmazonClientException { List<String> queueUrls = Lists.newArrayListWithCapacity(_queuesByUrl.size()); try (DirectoryStream<Path> queuePaths = Files.newDirectoryStream(_rootDirectory.toPath())) { for (Path queuePath : queuePaths) { if (listQueuesRequest.getQueueNamePrefix() == null || queuePath.getFileName().toString().startsWith(listQueuesRequest.getQueueNamePrefix())) { queueUrls.add(queuePath.toUri().toString()); } } } catch (IOException e) { throw new AmazonServiceException("could not get queue list", e); } return new ListQueuesResult().withQueueUrls(queueUrls); }
private void updateAvailableQueues() { try { ListQueuesResult result = sqs.listQueues(new ListQueuesRequest(queuePrefix)); List<String> availableQueues = Lists.newArrayList(Iterables.filter(result.getQueueUrls(), include)); Collections.sort(availableQueues, queueComparator); messageProviders.clear(); for (String queueUrl : availableQueues) { messageProviders.add(new AmazonSQSMessageProvider(sqs, queueUrl, waitTimeSeconds)); } } catch (AmazonClientException e) { LOG.error("An error occurred while listing SQS queues: {}", e); } }
@Test public void noQueues() { AmazonSQS amazonSQS = mock(AmazonSQS.class); // return an empty list when(amazonSQS.listQueues(any(ListQueuesRequest.class))).thenReturn(new ListQueuesResult()); AmazonSQSPrioritizedMessageProvider provider = new AmazonSQSPrioritizedMessageProvider(amazonSQS, "test", 1, 10 * 1000); List<Message> empty = provider.next(); Assert.assertEquals(0, empty.size()); verify(amazonSQS).listQueues(any(ListQueuesRequest.class)); }
@Test public void testListQueues_withEmptyRequestParams_shouldWork() { assertNotNull(sqs.listQueues(new ListQueuesRequest())); }
private List<String> listQueues(String queueName) { ListQueuesRequest listQueuesRequest = new ListQueuesRequest().withQueueNamePrefix(queueName); ListQueuesResult resultList = client.listQueues(listQueuesRequest); List<String> queueUrls = resultList.getQueueUrls().stream().filter(queueUrl -> queueUrl.contains(queueName)).collect(Collectors.toList()); return queueUrls; }
public Observable<ListQueuesResult> listQueuesAsync(ListQueuesRequest request) { return Observable.from(sqsClient.listQueuesAsync(request)); }
@Override public QueueCollection getQueues(String queueNamePrefix) { return getQueues( new ListQueuesRequest(queueNamePrefix)); }
@Override public QueueCollection getQueues() { return getQueues((ListQueuesRequest)null); }
@Test public void drainThreeQueues() { AmazonSQS amazonSQS = mock(AmazonSQS.class); // return three queues when(amazonSQS.listQueues(any(ListQueuesRequest.class))) .thenReturn(new ListQueuesResult().withQueueUrls("test-A", "test-C", "test-B", "test-D")); // each queue has N messages to return and then 0 messages when(amazonSQS.receiveMessage(any(ReceiveMessageRequest.class))) .thenAnswer(new Answer<Object>() { // fake that a queue has no messages left with flags boolean aDone = false; boolean bDone = false; boolean cDone = false; @Override public Object answer(InvocationOnMock invocation) throws Throwable { ReceiveMessageRequest receiveMessageRequest = (ReceiveMessageRequest) invocation.getArguments()[0]; if (receiveMessageRequest.getQueueUrl().equals("test-A")) { // return 3 messages for A, then no more messages if (!aDone) { aDone = true; return new ReceiveMessageResult().withMessages(newMessage("A"), newMessage("A"), newMessage("A")); } } else if (receiveMessageRequest.getQueueUrl().equals("test-B")) { // return 4 messages for B, then no more messages if (!bDone) { bDone = true; return new ReceiveMessageResult().withMessages(newMessage("B"), newMessage("B"), newMessage("B"), newMessage("B")); } } else if (receiveMessageRequest.getQueueUrl().equals("test-C")) { // return 1 message for C, then no more messages if (!cDone) { cDone = true; return new ReceiveMessageResult().withMessages(newMessage("C")); } } // fall through to return 0 messages return new ReceiveMessageResult().withMessages(); } }); // verify the order of next objects by counting the number of messages we return AmazonSQSPrioritizedMessageProvider provider = new AmazonSQSPrioritizedMessageProvider(amazonSQS, "test", 1, TimeUnit.MINUTES.toMillis(30)); // for queues with messages, 1 receive returns messages, 1 receive returns none and removes the provider assertMessages(provider.next(), 3, "A"); assertMessages(provider.next(), 4, "B"); assertMessages(provider.next(), 1, "C"); // for queues with no messages, 1 receive returns none and removes the provider assertMessages(provider.next(), 0, "D-but-nothing-really"); // when we have no queues left, there are no messages assertMessages(provider.next(), 0, "nothing"); assertMessages(provider.next(), 0, "nothing"); assertMessages(provider.next(), 0, "nothing"); // we only checked for queues once here verify(amazonSQS, times(1)).listQueues(any(ListQueuesRequest.class)); // the total number of receives sent to AWS verify(amazonSQS, times(7)).receiveMessage(any(ReceiveMessageRequest.class)); }
@Test public void updatingQueues() { AmazonSQS amazonSQS = mock(AmazonSQS.class); // return queues when(amazonSQS.listQueues(any(ListQueuesRequest.class))) .thenReturn( new ListQueuesResult().withQueueUrls("test-A", "test-C", "test-B", "test-D"), new ListQueuesResult().withQueueUrls("test-C", "test-B", "test-D")); // always return messages from these queues when(amazonSQS.receiveMessage(any(ReceiveMessageRequest.class))) .thenAnswer(new Answer<Object>() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { ReceiveMessageRequest receiveMessageRequest = (ReceiveMessageRequest) invocation.getArguments()[0]; return new ReceiveMessageResult().withMessages(newMessage(receiveMessageRequest.getQueueUrl()), newMessage(receiveMessageRequest.getQueueUrl())); } }); // check for new queues every 30 minutes, should be way longer than this test runs so we can simulate elapsed time final long intervalNs = TimeUnit.MINUTES.toNanos(30); // simulate the passage of time final Iterator<Long> time = Lists.newArrayList( 0L, // start at 0, always an initial update 1 * intervalNs / 2, // update 1 * intervalNs, // no update 1 * intervalNs + (intervalNs / 2), // update 2 * intervalNs, // no update 3 * intervalNs + 1, // should update 3 * intervalNs + 2 // no update ).iterator(); AmazonSQSPrioritizedMessageProvider provider = new AmazonSQSPrioritizedMessageProvider(amazonSQS, "test", 1, TimeUnit.NANOSECONDS.toMillis(intervalNs)) { @Override public long currentNanoTime() { return time.next(); } }; // no update on constructor verify(amazonSQS, times(0)).listQueues(any(ListQueuesRequest.class)); // update on first next provider.next(); verify(amazonSQS, times(1)).listQueues(any(ListQueuesRequest.class)); // no update provider.next(); verify(amazonSQS, times(1)).listQueues(any(ListQueuesRequest.class)); // update provider.next(); verify(amazonSQS, times(2)).listQueues(any(ListQueuesRequest.class)); // no update provider.next(); verify(amazonSQS, times(2)).listQueues(any(ListQueuesRequest.class)); // update provider.next(); verify(amazonSQS, times(3)).listQueues(any(ListQueuesRequest.class)); // no update provider.next(); verify(amazonSQS, times(3)).listQueues(any(ListQueuesRequest.class)); }
@Test public void filteredQueues() { AmazonSQS amazonSQS = mock(AmazonSQS.class); // return queues when(amazonSQS.listQueues(any(ListQueuesRequest.class))) .thenReturn( new ListQueuesResult().withQueueUrls("test-A", "test-C", "test-B", "test-D"), new ListQueuesResult().withQueueUrls("test-C", "test-B", "test-D"), new ListQueuesResult().withQueueUrls("test-C", "test-D")); // always return messages from these queues when(amazonSQS.receiveMessage(any(ReceiveMessageRequest.class))) .thenAnswer(new Answer<Object>() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { ReceiveMessageRequest receiveMessageRequest = (ReceiveMessageRequest) invocation.getArguments()[0]; return new ReceiveMessageResult().withMessages(newMessage(receiveMessageRequest.getQueueUrl()), newMessage(receiveMessageRequest.getQueueUrl())); } }); AmazonSQSPrioritizedMessageProvider provider = new AmazonSQSPrioritizedMessageProvider(amazonSQS, "test", 1, 0) .withInclude(new Predicate<String>() { @Override public boolean apply(String input) { return input.equals("test-B"); } }); // no update on constructor verify(amazonSQS, times(0)).listQueues(any(ListQueuesRequest.class)); // update on every next, only test-B messages assertMessages(provider.next(), 2, "test-B"); verify(amazonSQS, times(1)).listQueues(any(ListQueuesRequest.class)); // update on every next, only test-B messages assertMessages(provider.next(), 2, "test-B"); verify(amazonSQS, times(2)).listQueues(any(ListQueuesRequest.class)); // update on every next, should be no messages when there is no B assertMessages(provider.next(), 0, "nothing"); verify(amazonSQS, times(3)).listQueues(any(ListQueuesRequest.class)); }
@Test public void sortedQueuesReverse() { AmazonSQS amazonSQS = mock(AmazonSQS.class); // return queues when(amazonSQS.listQueues(any(ListQueuesRequest.class))) .thenReturn( new ListQueuesResult().withQueueUrls("test-A", "test-C", "test-B", "test-D"), new ListQueuesResult().withQueueUrls("test-A", "test-C", "test-B"), new ListQueuesResult().withQueueUrls("test-C", "test-e", "test-D", "test-F", "test-A")); // always return messages from these queues when(amazonSQS.receiveMessage(any(ReceiveMessageRequest.class))) .thenAnswer(new Answer<Object>() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { ReceiveMessageRequest receiveMessageRequest = (ReceiveMessageRequest) invocation.getArguments()[0]; return new ReceiveMessageResult().withMessages(newMessage(receiveMessageRequest.getQueueUrl()), newMessage(receiveMessageRequest.getQueueUrl())); } }); AmazonSQSPrioritizedMessageProvider provider = new AmazonSQSPrioritizedMessageProvider(amazonSQS, "test", 1, 0) .withQueueComparator(new Comparator<String>() { @Override public int compare(String s1, String s2) { // reverse sorting return s2.compareTo(s1); } }); // no update on constructor verify(amazonSQS, times(0)).listQueues(any(ListQueuesRequest.class)); // update on every next, only test-D messages assertMessages(provider.next(), 2, "test-D"); verify(amazonSQS, times(1)).listQueues(any(ListQueuesRequest.class)); // update on every next, only test-C messages assertMessages(provider.next(), 2, "test-C"); verify(amazonSQS, times(2)).listQueues(any(ListQueuesRequest.class)); // update on every next, only test-e messages assertMessages(provider.next(), 2, "test-e"); verify(amazonSQS, times(3)).listQueues(any(ListQueuesRequest.class)); }
@Test public void sortedQueuesReverseIgnoreCase() { AmazonSQS amazonSQS = mock(AmazonSQS.class); // return queues when(amazonSQS.listQueues(any(ListQueuesRequest.class))) .thenReturn( new ListQueuesResult().withQueueUrls("test-A", "test-C", "test-B", "test-D"), new ListQueuesResult().withQueueUrls("test-A", "test-C", "test-B"), new ListQueuesResult().withQueueUrls("test-C", "test-e", "test-D", "test-F", "test-A")); // always return messages from these queues when(amazonSQS.receiveMessage(any(ReceiveMessageRequest.class))) .thenAnswer(new Answer<Object>() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { ReceiveMessageRequest receiveMessageRequest = (ReceiveMessageRequest) invocation.getArguments()[0]; return new ReceiveMessageResult().withMessages(newMessage(receiveMessageRequest.getQueueUrl()), newMessage(receiveMessageRequest.getQueueUrl())); } }); AmazonSQSPrioritizedMessageProvider provider = new AmazonSQSPrioritizedMessageProvider(amazonSQS, "test", 1, 0) .withQueueComparator(new Comparator<String>() { @Override public int compare(String s1, String s2) { // reverse sorting, ignore case return s2.compareToIgnoreCase(s1); } }); // no update on constructor verify(amazonSQS, times(0)).listQueues(any(ListQueuesRequest.class)); // update on every next, only test-D messages assertMessages(provider.next(), 2, "test-D"); verify(amazonSQS, times(1)).listQueues(any(ListQueuesRequest.class)); // update on every next, only test-C messages assertMessages(provider.next(), 2, "test-C"); verify(amazonSQS, times(2)).listQueues(any(ListQueuesRequest.class)); // update on every next, only test-F messages assertMessages(provider.next(), 2, "test-F"); verify(amazonSQS, times(3)).listQueues(any(ListQueuesRequest.class)); }
/** * <p> * Returns a list of your queues. The maximum number of queues that can be * returned is 1000. If you specify a value for the optional * <code>QueueNamePrefix</code> parameter, only queues with a name beginning * with the specified value are returned. * </p> * * @param listQueuesRequest * Container for the necessary parameters to execute the * ListQueues service method on AmazonSQS. * * @return The response from the ListQueues service method, as returned by * AmazonSQS. * * * @throws AmazonClientException * If any internal errors are encountered inside the client * while attempting to make the request or handle the response. * For example if a network connection is not available. * @throws AmazonServiceException * If an error response is returned by AmazonSQS indicating * either a problem with the data in the request, or a server * side issue. */ public ListQueuesResult listQueues(ListQueuesRequest listQueuesRequest) throws AmazonServiceException, AmazonClientException { return amazonSqsToBeExtended.listQueues(listQueuesRequest); }
/** * Retrieves the Queues collection referenced by this resource. */ QueueCollection getQueues(ListQueuesRequest request);