@Override public void renewScanRangeTasks(Collection<ScanRangeTask> tasks, Duration ttl) { if (tasks.isEmpty()) { return; } int timeout = toSeconds(ttl); int id = 0; List<ChangeMessageVisibilityBatchRequestEntry> allEntries = Lists.newArrayListWithCapacity(tasks.size()); for (ScanRangeTask task : tasks) { allEntries.add( new ChangeMessageVisibilityBatchRequestEntry() .withId(String.valueOf(id++)) .withReceiptHandle(((QueueScanRangeTask) task).getMessageId()) .withVisibilityTimeout(timeout)); } // Cannot renew more than 10 in a single request for (List<ChangeMessageVisibilityBatchRequestEntry> entries : Lists.partition(allEntries, 10)) { _sqs.changeMessageVisibilityBatch(new ChangeMessageVisibilityBatchRequest() .withQueueUrl(getQueueUrl(_pendingScanRangeQueue)) .withEntries(entries)); } }
/** * Action call block for negative acknowledge for the list of receipt * handles. This action can be applied on multiple messages for the same * queue. * * @param queueUrl * The queueUrl of the queue, which the receipt handles belong. * @param receiptHandles * The list of handles, which is be used to negative acknowledge * the messages via using * <code>changeMessageVisibilityBatch</code>. * @throws JMSException * If <code>changeMessageVisibilityBatch</code> throws. */ @Override public void action(String queueUrl, List<String> receiptHandles) throws JMSException { if (receiptHandles == null || receiptHandles.isEmpty()) { return; } List<ChangeMessageVisibilityBatchRequestEntry> nackEntries = new ArrayList<ChangeMessageVisibilityBatchRequestEntry>( receiptHandles.size()); int batchId = 0; for (String messageReceiptHandle : receiptHandles) { ChangeMessageVisibilityBatchRequestEntry changeMessageVisibilityBatchRequestEntry = new ChangeMessageVisibilityBatchRequestEntry( Integer.toString(batchId), messageReceiptHandle).withVisibilityTimeout(NACK_TIMEOUT); nackEntries.add(changeMessageVisibilityBatchRequestEntry); batchId++; } amazonSQSClient.changeMessageVisibilityBatch(new ChangeMessageVisibilityBatchRequest( queueUrl, nackEntries)); }
/** * Test NegativeAcknowledger action */ @Test public void testAction() throws JMSException { List<String> receiptHandles = new ArrayList<String>(); receiptHandles.add("r0"); receiptHandles.add("r1"); receiptHandles.add("r2"); negativeAcknowledger.action(QUEUE_URL, receiptHandles); ArgumentCaptor<ChangeMessageVisibilityBatchRequest> argumentCaptor = ArgumentCaptor.forClass(ChangeMessageVisibilityBatchRequest.class); verify(amazonSQSClient).changeMessageVisibilityBatch(argumentCaptor.capture()); assertEquals(1, argumentCaptor.getAllValues().size()); assertEquals(QUEUE_URL, argumentCaptor.getAllValues().get(0).getQueueUrl()); List<ChangeMessageVisibilityBatchRequestEntry> captureList = argumentCaptor.getAllValues().get(0).getEntries(); assertEquals(receiptHandles.size(), captureList.size()); for (ChangeMessageVisibilityBatchRequestEntry item : captureList) { receiptHandles.contains(item.getReceiptHandle()); } }
@Override public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws AmazonClientException { DirectorySQSQueue queue = getQueueFromUrl(changeMessageVisibilityBatchRequest.getQueueUrl(), false); //lists for reporting List<BatchResultErrorEntry> batchResultErrorEntries = new ArrayList<>(); List<ChangeMessageVisibilityBatchResultEntry> batchResultEntries = new ArrayList<>(); //attempt to change the visibility on each for (ChangeMessageVisibilityBatchRequestEntry batchRequestEntry : changeMessageVisibilityBatchRequest.getEntries()) { try { queue.changeVisibility(batchRequestEntry.getReceiptHandle(), batchRequestEntry.getVisibilityTimeout()); batchResultEntries.add(new ChangeMessageVisibilityBatchResultEntry().withId(batchRequestEntry.getId())); } catch (Exception e) { BatchResultErrorEntry batchResultErrorEntry = new BatchResultErrorEntry(). withSenderFault(true). withId(batchRequestEntry.getId()). withMessage(e.getMessage()); batchResultErrorEntries.add(batchResultErrorEntry); } } return new ChangeMessageVisibilityBatchResult().withFailed(batchResultErrorEntries).withSuccessful(batchResultEntries); }
@VisibleForTesting static ChangeMessageVisibilityBatchRequest createRequest(String queueUrl, Map<String, ChangeMessageVisibilityEntry> entries) { return new ChangeMessageVisibilityBatchRequest() .withQueueUrl(queueUrl) .withEntries(entries.entrySet().stream() .map(keyValue -> new ChangeMessageVisibilityBatchRequestEntry() .withId(keyValue.getKey()) .withReceiptHandle(keyValue.getValue().getReceiptHandle()) .withVisibilityTimeout((int) keyValue.getValue().getNewVisibilityTimeout().getSeconds()) ).collect(Collectors.toList())); }
@Test public void testCreateRequest() { ChangeMessageVisibilityBatchRequest request = ChangeMessageVisibilityBatchAction.createRequest(QUEUE_URL, ENTRY_MAP); assertThat(request.getQueueUrl()).isEqualTo(QUEUE_URL); assertThat(request.getEntries().size()).isEqualTo(ENTRY_MAP.size()); }
@Test public void testNonInjectableMocks_shouldReturnNormal() { assertNotNull(sqs.changeMessageVisibilityBatch(new ChangeMessageVisibilityBatchRequest())); assertNotNull(sqs.addPermission(new AddPermissionRequest().withActions("one").withAWSAccountIds("two","three").withLabel("four").withQueueUrl("five"))); assertNotNull(sqs.listDeadLetterSourceQueues(new ListDeadLetterSourceQueuesRequest().withQueueUrl("ten"))); assertNotNull(sqs.getQueueAttributes(new GetQueueAttributesRequest().withAttributeNames(ImmutableList.of("eleven")).withQueueUrl("twelve"))); assertNotNull(sqs.setQueueAttributes(new SetQueueAttributesRequest().withAttributes(ImmutableMap.of("thirteen","fourteen")).withQueueUrl("fifteen"))); }
/** * Simplified method form for invoking the ChangeMessageVisibilityBatch * operation. * * @see #changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest) */ public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch( String queueUrl, java.util.List<ChangeMessageVisibilityBatchRequestEntry> entries) { ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest = new ChangeMessageVisibilityBatchRequest(queueUrl, entries); return changeMessageVisibilityBatch(changeMessageVisibilityBatchRequest); }
@Override public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch( ChangeMessageVisibilityBatchRequest request, ResultCapture<ChangeMessageVisibilityBatchResult> extractor) { ActionResult result = resource.performAction("ChangeMessageVisibilityBatch", request, extractor); if (result == null) return null; return (ChangeMessageVisibilityBatchResult) result.getData(); }
@Override public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch( List<ChangeMessageVisibilityBatchRequestEntry> entries, ResultCapture<ChangeMessageVisibilityBatchResult> extractor) { ChangeMessageVisibilityBatchRequest request = new ChangeMessageVisibilityBatchRequest() .withEntries(entries); return changeMessageVisibilityBatch(request, extractor); }
@Test public void testChangeMessageVisibilityBatch() throws JMSException { ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest = new ChangeMessageVisibilityBatchRequest(); wrapper.changeMessageVisibilityBatch(changeMessageVisibilityBatchRequest); verify(amazonSQSClient).changeMessageVisibilityBatch(changeMessageVisibilityBatchRequest); }
@Test(expected = JMSException.class) public void testChangeMessageVisibilityBatchThrowAmazonClientException() throws JMSException { ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest = new ChangeMessageVisibilityBatchRequest(); doThrow(new AmazonClientException("ace")) .when(amazonSQSClient).changeMessageVisibilityBatch(eq(changeMessageVisibilityBatchRequest)); wrapper.changeMessageVisibilityBatch(changeMessageVisibilityBatchRequest); }
@Test(expected = JMSException.class) public void testChangeMessageVisibilityBatchThrowAmazonServiceException() throws JMSException { ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest = new ChangeMessageVisibilityBatchRequest(); doThrow(new AmazonServiceException("ase")) .when(amazonSQSClient).changeMessageVisibilityBatch(eq(changeMessageVisibilityBatchRequest)); wrapper.changeMessageVisibilityBatch(changeMessageVisibilityBatchRequest); }
public Observable<ChangeMessageVisibilityBatchResult> changeMessageVisibilityBatchAsync(ChangeMessageVisibilityBatchRequest request) { return Observable.from(sqsClient.changeMessageVisibilityBatchAsync(request)); }
@Override public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch( ChangeMessageVisibilityBatchRequest request) { return changeMessageVisibilityBatch(request, null); }
/** * <p> * Changes the visibility timeout of multiple messages. This is a batch * version of ChangeMessageVisibility. The result of the action on each * message is reported individually in the response. You can send up to 10 * ChangeMessageVisibility requests with each * <code>ChangeMessageVisibilityBatch</code> action. * </p> * <p> * <b>IMPORTANT:</b>Because the batch request can result in a combination of * successful and unsuccessful actions, you should check for batch errors * even when the call returns an HTTP status code of 200. * </p> * <p> * <b>NOTE:</b>Some API actions take lists of parameters. These lists are * specified using the param.n notation. Values of n are integers starting * from 1. For example, a parameter list with two elements looks like this: * </p> * <p> * <code>&Attribute.1=this</code> * </p> * <p> * <code>&Attribute.2=that</code> * </p> * * @param changeMessageVisibilityBatchRequest * Container for the necessary parameters to execute the * ChangeMessageVisibilityBatch service method on AmazonSQS. * * @return The response from the ChangeMessageVisibilityBatch service * method, as returned by AmazonSQS. * * @throws BatchEntryIdsNotDistinctException * @throws TooManyEntriesInBatchRequestException * @throws InvalidBatchEntryIdException * @throws EmptyBatchRequestException * * @throws AmazonClientException * If any internal errors are encountered inside the client * while attempting to make the request or handle the response. * For example if a network connection is not available. * @throws AmazonServiceException * If an error response is returned by AmazonSQS indicating * either a problem with the data in the request, or a server * side issue. */ public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch( ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws AmazonServiceException, AmazonClientException { return amazonSqsToBeExtended.changeMessageVisibilityBatch(changeMessageVisibilityBatchRequest); }
/** * <p> * Changes the visibility timeout of multiple messages. This is a batch * version of ChangeMessageVisibility. The result of the action on each * message is reported individually in the response. You can send up to 10 * ChangeMessageVisibility requests with each * <code>ChangeMessageVisibilityBatch</code> action. * </p> * <p> * <b>IMPORTANT:</b>Because the batch request can result in a combination of * successful and unsuccessful actions, you should check for batch errors * even when the call returns an HTTP status code of 200. * </p> * <p> * <b>NOTE:</b>Some API actions take lists of parameters. These lists are * specified using the param.n notation. Values of n are integers starting * from 1. For example, a parameter list with two elements looks like this: * </p> * <p> * <code>&Attribute.1=this</code> * </p> * <p> * <code>&Attribute.2=that</code> * </p> * * @param changeMessageVisibilityBatchRequest * Container for the necessary parameters to execute the * ChangeMessageVisibilityBatch service method on AmazonSQS. * * @return The response from the ChangeMessageVisibilityBatch service * method, as returned by AmazonSQS. * * @throws BatchEntryIdsNotDistinctException * @throws TooManyEntriesInBatchRequestException * @throws InvalidBatchEntryIdException * @throws EmptyBatchRequestException * * @throws AmazonClientException * If any internal errors are encountered inside the client * while attempting to make the request or handle the response. * For example if a network connection is not available. * @throws AmazonServiceException * If an error response is returned by AmazonSQS indicating * either a problem with the data in the request, or a server * side issue. */ public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch( ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws AmazonServiceException, AmazonClientException { for (ChangeMessageVisibilityBatchRequestEntry entry : changeMessageVisibilityBatchRequest.getEntries()) { if (isS3ReceiptHandle(entry.getReceiptHandle())) { entry.setReceiptHandle(getOrigReceiptHandle(entry.getReceiptHandle())); } } return amazonSqsToBeExtended.changeMessageVisibilityBatch(changeMessageVisibilityBatchRequest); }
/** * Calls <code>changeMessageVisibilityBatch</code> and wraps <code>AmazonClientException</code>. This is * used to for negative acknowledge of messages in batch, so that messages * can be received again without any delay. * * @param changeMessageVisibilityBatchRequest * Container for the necessary parameters to execute the * changeMessageVisibilityBatch service method on AmazonSQS. * @return The response from the changeMessageVisibilityBatch service * method, as returned by AmazonSQS. * @throws JMSException */ public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws JMSException { try { prepareRequest(changeMessageVisibilityBatchRequest); return amazonSQSClient.changeMessageVisibilityBatch(changeMessageVisibilityBatchRequest); } catch (AmazonClientException e) { throw handleException(e, "changeMessageVisibilityBatch"); } }
/** * Test NegativeAcknowledger action withe empty receipt handles */ @Test public void testActionEmptyReceiptHandles() throws JMSException { negativeAcknowledger.action(QUEUE_URL, null); negativeAcknowledger.action(QUEUE_URL, Collections.EMPTY_LIST); verify(amazonSQSClient, never()).changeMessageVisibilityBatch(any(ChangeMessageVisibilityBatchRequest.class)); }
/** * Performs the <code>ChangeMessageVisibilityBatch</code> action. * * <p> * The following request parameters will be populated from the data of this * <code>Queue</code> resource, and any conflicting parameter value set in * the request will be overridden: * <ul> * <li> * <b><code>QueueUrl</code></b> * - mapped from the <code>Url</code> identifier. * </li> * </ul> * * <p> * * @return The response of the low-level client operation associated with * this resource action. * @see ChangeMessageVisibilityBatchRequest */ ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch( ChangeMessageVisibilityBatchRequest request);
/** * Performs the <code>ChangeMessageVisibilityBatch</code> action and use a * ResultCapture to retrieve the low-level client response. * * <p> * The following request parameters will be populated from the data of this * <code>Queue</code> resource, and any conflicting parameter value set in * the request will be overridden: * <ul> * <li> * <b><code>QueueUrl</code></b> * - mapped from the <code>Url</code> identifier. * </li> * </ul> * * <p> * * @return The response of the low-level client operation associated with * this resource action. * @see ChangeMessageVisibilityBatchRequest */ ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch( ChangeMessageVisibilityBatchRequest request, ResultCapture<ChangeMessageVisibilityBatchResult> extractor);