@Override public void renewScanRangeTasks(Collection<ScanRangeTask> tasks, Duration ttl) { if (tasks.isEmpty()) { return; } int timeout = toSeconds(ttl); int id = 0; List<ChangeMessageVisibilityBatchRequestEntry> allEntries = Lists.newArrayListWithCapacity(tasks.size()); for (ScanRangeTask task : tasks) { allEntries.add( new ChangeMessageVisibilityBatchRequestEntry() .withId(String.valueOf(id++)) .withReceiptHandle(((QueueScanRangeTask) task).getMessageId()) .withVisibilityTimeout(timeout)); } // Cannot renew more than 10 in a single request for (List<ChangeMessageVisibilityBatchRequestEntry> entries : Lists.partition(allEntries, 10)) { _sqs.changeMessageVisibilityBatch(new ChangeMessageVisibilityBatchRequest() .withQueueUrl(getQueueUrl(_pendingScanRangeQueue)) .withEntries(entries)); } }
/**
 * Changes the visibility timeout of up to two messages in one batch request.
 *
 * <p>Two receive calls are made against the queue. The first message returned by each call
 * (if any) is added to the batch: the first with {@code timeout}, the second with
 * {@code timeout + 200}. A receive call that returns no messages contributes no entry, and
 * no batch request is sent when neither call returned a message.
 *
 * @param queue_url URL of the queue to receive from and to change visibility on
 * @param timeout   visibility timeout applied to the first message; the second message
 *                  gets {@code timeout + 200}
 */
public static void changeMessageVisibilityMultiple(
        String queue_url, int timeout) {
    AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();

    List<ChangeMessageVisibilityBatchRequestEntry> entries =
            new ArrayList<ChangeMessageVisibilityBatchRequestEntry>();

    // A receive call can come back empty, so never index into the result blindly.
    sqs.receiveMessage(queue_url).getMessages().stream()
            .findFirst()
            .ifPresent(message -> entries.add(
                    new ChangeMessageVisibilityBatchRequestEntry(
                            "unique_id_msg1", message.getReceiptHandle())
                            .withVisibilityTimeout(timeout)));

    sqs.receiveMessage(queue_url).getMessages().stream()
            .findFirst()
            .ifPresent(message -> entries.add(
                    new ChangeMessageVisibilityBatchRequestEntry(
                            "unique_id_msg2", message.getReceiptHandle())
                            .withVisibilityTimeout(timeout + 200)));

    // The batch API rejects a request with no entries, so there is nothing to do.
    if (entries.isEmpty()) {
        return;
    }

    sqs.changeMessageVisibilityBatch(queue_url, entries);
}
/**
 * Negative-acknowledges the given receipt handles by resetting their visibility timeout
 * to {@code NACK_TIMEOUT} via <code>changeMessageVisibilityBatch</code>. All handles must
 * belong to the same queue.
 * <p>
 * The handles are sent in requests of at most ten entries each, in list order. Entry ids
 * are the zero-based position of the handle within its own request. A list of ten or fewer
 * handles results in exactly one request. A <code>null</code> or empty list is a no-op.
 *
 * @param queueUrl
 *            The queueUrl of the queue, which the receipt handles belong.
 * @param receiptHandles
 *            The list of handles to negative acknowledge.
 * @throws JMSException
 *             If <code>changeMessageVisibilityBatch</code> throws. Requests already sent
 *             for earlier slices of the list are not rolled back.
 */
@Override
public void action(String queueUrl, List<String> receiptHandles) throws JMSException {

    if (receiptHandles == null || receiptHandles.isEmpty()) {
        return;
    }

    // A batch request may carry at most this many entries.
    final int maxEntriesPerRequest = 10;
    final int total = receiptHandles.size();

    for (int start = 0; start < total; start += maxEntriesPerRequest) {
        List<String> slice = receiptHandles.subList(start, Math.min(start + maxEntriesPerRequest, total));

        List<ChangeMessageVisibilityBatchRequestEntry> nackEntries =
                new ArrayList<ChangeMessageVisibilityBatchRequestEntry>(slice.size());
        int batchId = 0;
        for (String messageReceiptHandle : slice) {
            // Ids only need to be distinct within one request, so numbering restarts per slice.
            nackEntries.add(new ChangeMessageVisibilityBatchRequestEntry(
                    Integer.toString(batchId), messageReceiptHandle).withVisibilityTimeout(NACK_TIMEOUT));
            batchId++;
        }

        amazonSQSClient.changeMessageVisibilityBatch(
                new ChangeMessageVisibilityBatchRequest(queueUrl, nackEntries));
    }
}
/**
 * Test NegativeAcknowledger action: three receipt handles must produce exactly one
 * batch request for the queue, with one entry per handle and only known handles in it.
 */
@Test
public void testAction() throws JMSException {

    List<String> receiptHandles = new ArrayList<String>();
    receiptHandles.add("r0");
    receiptHandles.add("r1");
    receiptHandles.add("r2");

    negativeAcknowledger.action(QUEUE_URL, receiptHandles);

    ArgumentCaptor<ChangeMessageVisibilityBatchRequest> argumentCaptor =
            ArgumentCaptor.forClass(ChangeMessageVisibilityBatchRequest.class);
    verify(amazonSQSClient).changeMessageVisibilityBatch(argumentCaptor.capture());

    assertEquals(1, argumentCaptor.getAllValues().size());

    assertEquals(QUEUE_URL, argumentCaptor.getAllValues().get(0).getQueueUrl());
    List<ChangeMessageVisibilityBatchRequestEntry> captureList = argumentCaptor.getAllValues().get(0).getEntries();
    assertEquals(receiptHandles.size(), captureList.size());

    for (ChangeMessageVisibilityBatchRequestEntry item : captureList) {
        // Previously the contains() result was discarded, so this loop verified nothing.
        assertEquals(true, receiptHandles.contains(item.getReceiptHandle()));
    }
}
@Override public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws AmazonClientException { DirectorySQSQueue queue = getQueueFromUrl(changeMessageVisibilityBatchRequest.getQueueUrl(), false); //lists for reporting List<BatchResultErrorEntry> batchResultErrorEntries = new ArrayList<>(); List<ChangeMessageVisibilityBatchResultEntry> batchResultEntries = new ArrayList<>(); //attempt to change the visibility on each for (ChangeMessageVisibilityBatchRequestEntry batchRequestEntry : changeMessageVisibilityBatchRequest.getEntries()) { try { queue.changeVisibility(batchRequestEntry.getReceiptHandle(), batchRequestEntry.getVisibilityTimeout()); batchResultEntries.add(new ChangeMessageVisibilityBatchResultEntry().withId(batchRequestEntry.getId())); } catch (Exception e) { BatchResultErrorEntry batchResultErrorEntry = new BatchResultErrorEntry(). withSenderFault(true). withId(batchRequestEntry.getId()). withMessage(e.getMessage()); batchResultErrorEntries.add(batchResultErrorEntry); } } return new ChangeMessageVisibilityBatchResult().withFailed(batchResultErrorEntries).withSuccessful(batchResultEntries); }
/**
 * Builds a batch visibility-change request for the given queue.
 *
 * <p>Each map entry becomes one batch entry: the map key is the entry id, and the value
 * supplies the receipt handle and the new visibility timeout (whole seconds, narrowed
 * to {@code int}).
 *
 * @param queueUrl URL of the queue the request targets
 * @param entries  visibility changes keyed by batch entry id
 * @return the populated request
 */
@VisibleForTesting
static ChangeMessageVisibilityBatchRequest createRequest(String queueUrl, Map<String, ChangeMessageVisibilityEntry> entries) {
    ChangeMessageVisibilityBatchRequest request = new ChangeMessageVisibilityBatchRequest();
    request.setQueueUrl(queueUrl);
    request.setEntries(entries.entrySet().stream()
            .map(idAndChange -> {
                ChangeMessageVisibilityEntry change = idAndChange.getValue();
                int timeoutSeconds = (int) change.getNewVisibilityTimeout().getSeconds();
                return new ChangeMessageVisibilityBatchRequestEntry()
                        .withId(idAndChange.getKey())
                        .withReceiptHandle(change.getReceiptHandle())
                        .withVisibilityTimeout(timeoutSeconds);
            })
            .collect(Collectors.toList()));
    return request;
}
/**
 * Convenience overload of the ChangeMessageVisibilityBatch operation: wraps the
 * arguments in a {@link ChangeMessageVisibilityBatchRequest} and delegates to the
 * request-based overload.
 *
 * @param queueUrl URL of the queue the entries belong to
 * @param entries  batch entries to submit
 * @return the result of the request-based overload
 * @see #changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest)
 */
public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(
        String queueUrl,
        java.util.List<ChangeMessageVisibilityBatchRequestEntry> entries) {
    return changeMessageVisibilityBatch(
            new ChangeMessageVisibilityBatchRequest(queueUrl, entries));
}
/**
 * Starts an asynchronous batch visibility change and exposes its outcome as an
 * {@code Observable}.
 *
 * <p>The call is forwarded to {@code sqsClient.changeMessageVisibilityBatchAsync} and the
 * value it returns is adapted with {@code Observable.from}.
 *
 * @param queueUrl URL of the queue the entries belong to
 * @param entries  batch entries to submit
 * @return an observable over the batch result
 */
public Observable<ChangeMessageVisibilityBatchResult> changeMessageVisibilityBatchAsync(
        String queueUrl,
        List<ChangeMessageVisibilityBatchRequestEntry> entries) {
    return Observable.from(
            sqsClient.changeMessageVisibilityBatchAsync(queueUrl, entries));
}
/**
 * <p>
 * Batch form of ChangeMessageVisibility: changes the visibility timeout of
 * several messages in one call. Up to 10 changes may be sent per
 * <code>ChangeMessageVisibilityBatch</code> action, and the outcome for each
 * message is reported separately in the response.
 * </p>
 * <p>
 * <b>IMPORTANT:</b> A batch can partially succeed. Always inspect the result
 * for failed entries, even when the call itself returns HTTP status 200.
 * </p>
 * <p>
 * <b>NOTE:</b> Some API actions take lists of parameters, written in
 * param.n notation where n is an integer starting from 1. A two-element
 * list looks like this:
 * </p>
 * <p>
 * {@code &Attribute.1=this}
 * </p>
 * <p>
 * {@code &Attribute.2=that}
 * </p>
 * <p>
 * This overload forwards both arguments unchanged to the wrapped client.
 * </p>
 *
 * @param queueUrl
 *            The URL of the Amazon SQS queue to take action on.
 * @param entries
 *            The receipt handles of the messages whose visibility timeout
 *            must be changed.
 *
 * @return The response from the ChangeMessageVisibilityBatch service
 *         method, as returned by AmazonSQS.
 *
 * @throws BatchEntryIdsNotDistinctException
 * @throws TooManyEntriesInBatchRequestException
 * @throws InvalidBatchEntryIdException
 * @throws EmptyBatchRequestException
 *
 * @throws AmazonClientException
 *             If the client hits an internal error while making the
 *             request or handling the response, for example when no
 *             network connection is available.
 * @throws AmazonServiceException
 *             If AmazonSQS returns an error response, indicating either a
 *             problem with the request data or a server-side issue.
 */
public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(
        String queueUrl,
        List<ChangeMessageVisibilityBatchRequestEntry> entries)
        throws AmazonServiceException, AmazonClientException {

    ChangeMessageVisibilityBatchResult delegateResult =
            amazonSqsToBeExtended.changeMessageVisibilityBatch(queueUrl, entries);
    return delegateResult;
}
/**
 * <p>
 * Batch form of ChangeMessageVisibility: changes the visibility timeout of
 * several messages in one call. Up to 10 changes may be sent per
 * <code>ChangeMessageVisibilityBatch</code> action, and the outcome for each
 * message is reported separately in the response.
 * </p>
 * <p>
 * <b>IMPORTANT:</b> A batch can partially succeed. Always inspect the result
 * for failed entries, even when the call itself returns HTTP status 200.
 * </p>
 * <p>
 * <b>NOTE:</b> Some API actions take lists of parameters, written in
 * param.n notation where n is an integer starting from 1. A two-element
 * list looks like this:
 * </p>
 * <p>
 * {@code &Attribute.1=this}
 * </p>
 * <p>
 * {@code &Attribute.2=that}
 * </p>
 * <p>
 * Before delegating, every entry whose receipt handle is recognised by
 * {@code isS3ReceiptHandle} has that handle replaced, on the entry itself,
 * with the value returned by {@code getOrigReceiptHandle}. The entries of
 * the request passed in are therefore modified in place.
 * </p>
 *
 * @param changeMessageVisibilityBatchRequest
 *            Container for the necessary parameters to execute the
 *            ChangeMessageVisibilityBatch service method on AmazonSQS.
 *
 * @return The response from the ChangeMessageVisibilityBatch service
 *         method, as returned by AmazonSQS.
 *
 * @throws BatchEntryIdsNotDistinctException
 * @throws TooManyEntriesInBatchRequestException
 * @throws InvalidBatchEntryIdException
 * @throws EmptyBatchRequestException
 *
 * @throws AmazonClientException
 *             If the client hits an internal error while making the
 *             request or handling the response, for example when no
 *             network connection is available.
 * @throws AmazonServiceException
 *             If AmazonSQS returns an error response, indicating either a
 *             problem with the request data or a server-side issue.
 */
public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(
        ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest)
        throws AmazonServiceException, AmazonClientException {

    for (ChangeMessageVisibilityBatchRequestEntry batchEntry : changeMessageVisibilityBatchRequest.getEntries()) {
        if (!isS3ReceiptHandle(batchEntry.getReceiptHandle())) {
            continue;
        }
        // Swap the wrapped handle for the original one expected by the underlying client.
        batchEntry.setReceiptHandle(getOrigReceiptHandle(batchEntry.getReceiptHandle()));
    }

    return amazonSqsToBeExtended.changeMessageVisibilityBatch(changeMessageVisibilityBatchRequest);
}