/**
 * Deletes the given messages from the queue with a single batch call.
 *
 * @param messages the messages to delete; may be {@code null} or empty
 * @return the ids of the entries the service reported as failed, or {@code null}
 *         when there was nothing to delete
 */
private List<String> delete(List<Message> messages) {
    boolean nothingToDelete = messages == null || messages.isEmpty();
    if (nothingToDelete) {
        return null;
    }

    DeleteMessageBatchRequest request = new DeleteMessageBatchRequest().withQueueUrl(queueURL);
    // The request hands out its own entry list, so additions land directly in the request.
    List<DeleteMessageBatchRequestEntry> requestEntries = request.getEntries();
    for (Message message : messages) {
        DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry()
                .withId(message.getId())
                .withReceiptHandle(message.getReceipt());
        requestEntries.add(entry);
    }

    DeleteMessageBatchResult response = client.deleteMessageBatch(request);
    List<String> failedIds = response.getFailed()
            .stream()
            .map(failed -> failed.getId())
            .collect(Collectors.toList());
    logger.debug("failed to delete: {}", failedIds);
    return failedIds;
}
/**
 * Deletes the queue messages behind the given completed scan ranges.
 *
 * <p>Each completion is cast to {@code QueueScanRangeComplete} and its {@code getMessageId()}
 * value is used as the receipt handle of the delete entry. SQS accepts at most 10 entries per
 * {@code DeleteMessageBatch} call, so the entries are sent in chunks of up to 10 instead of a
 * single, possibly oversized, request.
 *
 * @param completions the completed scan ranges to release; nothing is sent when empty
 */
@Override
public void releaseCompleteScanRanges(Collection<ScanRangeComplete> completions) {
    if (completions.isEmpty()) {
        return;
    }

    // Service-side limit on the number of entries in one DeleteMessageBatch request.
    final int maxBatchSize = 10;
    final String queueUrl = getQueueUrl(_completeScanRangeQueue);

    // Entry ids only need to be distinct within a request; a running counter guarantees that.
    int id = 0;
    List<DeleteMessageBatchRequestEntry> entries =
            Lists.newArrayListWithCapacity(Math.min(completions.size(), maxBatchSize));
    for (ScanRangeComplete completion : completions) {
        entries.add(
                new DeleteMessageBatchRequestEntry()
                        .withId(String.valueOf(id++))
                        .withReceiptHandle(((QueueScanRangeComplete) completion).getMessageId()));
        if (entries.size() == maxBatchSize) {
            _sqs.deleteMessageBatch(new DeleteMessageBatchRequest()
                    .withQueueUrl(queueUrl)
                    .withEntries(entries));
            entries = Lists.newArrayListWithCapacity(maxBatchSize);
        }
    }

    // Flush the final, partially filled chunk.
    if (!entries.isEmpty()) {
        _sqs.deleteMessageBatch(new DeleteMessageBatchRequest()
                .withQueueUrl(queueUrl)
                .withEntries(entries));
    }
}
/**
 * Converts the received SQS messages, passes them to the handler as one batch and then
 * deletes every received message from the queue.
 *
 * @param queue       queue identifier forwarded to {@code fromSqsMessages}
 * @param sqsMessages the raw messages received from SQS
 */
private void handleMessages(final String queue, final List<com.amazonaws.services.sqs.model.Message> sqsMessages) {
    final List<Message> messages = fromSqsMessages(queue, sqsMessages);
    if (!CollectionUtil.hasElements(messages)) {
        return;
    }

    this.handler.handleBatch(messages);

    /* TODO Allow the caller to specify messages to delete */
    final List<DeleteMessageBatchRequestEntry> deleteEntries = new ArrayList<DeleteMessageBatchRequestEntry>();
    for (final com.amazonaws.services.sqs.model.Message sqsMessage : sqsMessages) {
        deleteEntries.add(
                new DeleteMessageBatchRequestEntry(sqsMessage.getMessageId(), sqsMessage.getReceiptHandle()));
    }

    /* Delete the message batch - TODO - This should actually respect the feedback given
       from the handler on which messages to delete */
    final DeleteMessageBatchRequest deleteRequest =
            new DeleteMessageBatchRequest(this.queueUrl).withEntries(deleteEntries);
    this.client.deleteMessageBatch(deleteRequest);
}
/**
 * Sends one message to the queue resource, receives it back, checks its contents and
 * attributes against the send result, and finally deletes it.
 */
@Test
@Ignore
public void testSendReceiveDelete() throws InterruptedException {
    // Send
    SendMessageResult sent = queue.sendMessage(TEST_MESSAGE);
    assertNotNull(sent);
    assertNotNull(sent.getMessageId());

    // Receive
    List<Message> received = waitForMessagesFromQueue(null);
    assertNotNull(received);
    assertEquals(1, received.size());
    Message onlyMessage = received.get(0);
    assertMessage(TEST_MESSAGE, sent.getMessageId(), sent.getMD5OfMessageBody(), onlyMessage);

    // Delete
    DeleteMessageBatchRequestEntry deleteEntry =
            new DeleteMessageBatchRequestEntry("msg1", onlyMessage.getReceiptHandle());
    queue.deleteMessages(new DeleteMessageBatchRequest().withEntries(deleteEntry));
}
@Override public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws AmazonClientException { DirectorySQSQueue queue = getQueueFromUrl(deleteMessageBatchRequest.getQueueUrl(), false); //lists for reporting List<BatchResultErrorEntry> batchResultErrorEntries = new ArrayList<>(); List<DeleteMessageBatchResultEntry> batchResultEntries = new ArrayList<>(); //attempt delete on each for (DeleteMessageBatchRequestEntry batchRequestEntry : deleteMessageBatchRequest.getEntries()) { try { queue.delete(batchRequestEntry.getReceiptHandle()); batchResultEntries.add(new DeleteMessageBatchResultEntry().withId(batchRequestEntry.getId())); } catch (IOException e) { BatchResultErrorEntry batchResultErrorEntry = new BatchResultErrorEntry(). withSenderFault(true). withId(batchRequestEntry.getId()). withMessage(e.getMessage()); batchResultErrorEntries.add(batchResultErrorEntry); } } return new DeleteMessageBatchResult().withFailed(batchResultErrorEntries).withSuccessful(batchResultEntries); }
/**
 * Builds a batch delete request for the given queue.
 *
 * @param queueUrl URL of the queue to delete from
 * @param entries  map whose key becomes the batch entry id and whose value supplies the
 *                 receipt handle
 * @return a request containing one entry per map entry
 */
@VisibleForTesting
static DeleteMessageBatchRequest createRequest(String queueUrl, Map<String, DeleteMessageEntry> entries) {
    DeleteMessageBatchRequest request = new DeleteMessageBatchRequest();
    request.setQueueUrl(queueUrl);
    request.setEntries(
            entries.entrySet()
                    .stream()
                    .map(idAndEntry -> new DeleteMessageBatchRequestEntry()
                            .withId(idAndEntry.getKey())
                            .withReceiptHandle(idAndEntry.getValue().getReceiptHandle()))
                    .collect(Collectors.toList()));
    return request;
}
/**
 * Builds a batch delete request for the queue at {@code queueUrl}, with one entry per message
 * as produced by {@code createDeleteMessageBatchRequestEntry}.
 *
 * @param queueUrl URL of the queue the request targets
 * @param messages messages to turn into delete entries
 * @return the populated request
 */
@Override
public DeleteMessageBatchRequest createDeleteMessageBatchRequest(String queueUrl, List<Message> messages) {
    final DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(queueUrl);

    final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(messages.size());
    for (final Message message : messages) {
        entries.add(this.createDeleteMessageBatchRequestEntry(message));
    }
    request.setEntries(entries);

    return request;
}
/**
 * Performs the {@code DeleteMessages} action on the underlying resource.
 *
 * @param request   the batch delete request
 * @param extractor result capture handed through to the resource action
 * @return the data of the action result, or {@code null} when the action yields no result
 */
@Override
public DeleteMessageBatchResult deleteMessages(DeleteMessageBatchRequest request,
        ResultCapture<DeleteMessageBatchResult> extractor) {

    ActionResult outcome = resource.performAction("DeleteMessages", request, extractor);
    return outcome == null ? null : (DeleteMessageBatchResult) outcome.getData();
}
/**
 * Performs the {@code DeleteMessages} action with a freshly created, otherwise unpopulated
 * request.
 *
 * @param extractor result capture handed through to the request-based overload
 * @return whatever the request-based overload returns
 */
@Override
public DeleteMessageBatchResult deleteMessages(ResultCapture<DeleteMessageBatchResult> extractor) {
    return deleteMessages(new DeleteMessageBatchRequest(), extractor);
}
/**
 * Performs the {@code DeleteMessages} action for the given entries.
 *
 * @param entries   the batch entries to place into a new request
 * @param extractor result capture handed through to the request-based overload
 * @return whatever the request-based overload returns
 */
@Override
public DeleteMessageBatchResult deleteMessages(List<DeleteMessageBatchRequestEntry> entries,
        ResultCapture<DeleteMessageBatchResult> extractor) {

    DeleteMessageBatchRequest request = new DeleteMessageBatchRequest();
    request.setEntries(entries);
    return deleteMessages(request, extractor);
}
/** * Acknowledges up to 10 messages via calling * <code>deleteMessageBatch</code>. */ @Override public void action(String queueUrl, List<String> receiptHandles) throws JMSException { if (receiptHandles == null || receiptHandles.isEmpty()) { return; } List<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntries = new ArrayList<DeleteMessageBatchRequestEntry>(); int batchId = 0; for (String receiptHandle : receiptHandles) { // Remove the message from queue of unAckMessages unAckMessages.poll(); DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry( Integer.toString(batchId), receiptHandle); deleteMessageBatchRequestEntries.add(entry); batchId++; } DeleteMessageBatchRequest deleteMessageBatchRequest = new DeleteMessageBatchRequest( queueUrl, deleteMessageBatchRequestEntries); /** * TODO: If one of the batch calls fail, then the remaining messages on * the batch will not be deleted, and will be visible and delivered as * duplicate after visibility timeout expires. */ amazonSQSClient.deleteMessageBatch(deleteMessageBatchRequest); }
/**
 * The wrapper must surface an {@code AmazonClientException} thrown by the mocked client's
 * {@code deleteMessageBatch} as a {@code JMSException}.
 */
@Test(expected = JMSException.class)
public void testDeleteMessageBatchThrowAmazonClientException() throws JMSException {
    DeleteMessageBatchRequest request = new DeleteMessageBatchRequest();
    AmazonClientException clientFailure = new AmazonClientException("ace");

    doThrow(clientFailure)
            .when(amazonSQSClient)
            .deleteMessageBatch(eq(request));

    wrapper.deleteMessageBatch(request);
}
/**
 * The wrapper must surface an {@code AmazonServiceException} thrown by the mocked client's
 * {@code deleteMessageBatch} as a {@code JMSException}.
 */
@Test(expected = JMSException.class)
public void testDeleteMessageBatchThrowAmazonServiceException() throws JMSException {
    DeleteMessageBatchRequest request = new DeleteMessageBatchRequest();
    AmazonServiceException serviceFailure = new AmazonServiceException("ase");

    doThrow(serviceFailure)
            .when(amazonSQSClient)
            .deleteMessageBatch(eq(request));

    wrapper.deleteMessageBatch(request);
}
/**
 * Deletes the given messages from the queue with one batch call.
 *
 * <p>An empty list is a no-op: SQS rejects a batch request that contains no entries, so no
 * request is sent in that case.
 *
 * @param queue    URL of the queue to delete from (set as the request's queue URL)
 * @param messages the messages to delete; each contributes its message id as the entry id
 *                 and its receipt handle
 */
private void delete(String queue, List<Message> messages) {
    if (messages.isEmpty()) {
        // Nothing to delete; an empty batch would only produce a service error.
        return;
    }

    List<DeleteMessageBatchRequestEntry> deleteRequests =
            new ArrayList<DeleteMessageBatchRequestEntry>(messages.size());
    for (Message m : messages) {
        deleteRequests.add(new DeleteMessageBatchRequestEntry()
                .withId(m.getMessageId())
                .withReceiptHandle(m.getReceiptHandle()));
    }
    log.info(format("Deleting %s messages", deleteRequests.size()));

    DeleteMessageBatchRequest batchDelete = new DeleteMessageBatchRequest();
    batchDelete.setQueueUrl(queue);
    batchDelete.setEntries(deleteRequests);
    sqs.deleteMessageBatch(batchDelete);
}
/**
 * The created request must carry the queue URL and as many entries as the source map.
 */
@Test
public void testCreateRequest() {
    DeleteMessageBatchRequest created = DeleteMessageBatchAction.createRequest(QUEUE_URL, ENTRY_MAP);

    String actualQueueUrl = created.getQueueUrl();
    int actualEntryCount = created.getEntries().size();

    assertThat(actualQueueUrl).isEqualTo(QUEUE_URL);
    assertThat(actualEntryCount).isEqualTo(ENTRY_MAP.size());
}
@Test public void testBulkSendDelete_shouldWork() { // create queue CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue")); // send batch SendMessageBatchRequestEntry firstRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("one") .withMessageGroupId("groupee").withMessageBody("{\"XOXO\":234}"); SendMessageBatchRequestEntry secondRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("two") .withMessageGroupId("groupee").withMessageBody("{\"Quinoa\":\"Readymade\",\"vegan\":true}"); SendMessageBatchRequestEntry thirdRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("three") .withMessageGroupId("groupee").withMessageBody("{\"VHS\":\"street art slow-carb\"}"); // verify send batch result SendMessageBatchResult sendResult = sqs.sendMessageBatch(new SendMessageBatchRequest().withQueueUrl(createdQueue.getQueueUrl()) .withEntries(ImmutableList.of(firstRequest,secondRequest, thirdRequest))); assertNotNull("verify that batch send returned ok", sendResult); assertTrue("no request failed",sendResult.getFailed().isEmpty()); assertEquals("verify successfull message count", 3, sendResult.getSuccessful().size()); SendMessageBatchResultEntry firstResultEntry = sendResult.getSuccessful().stream().filter(msg -> msg.getId().equals("one")).findAny().get(); assertEquals("verify correct message MD5",getAwsMessageMD5("{\"XOXO\":234}"),firstResultEntry.getMD5OfMessageBody()); assertNotNull("verify message id exists",firstResultEntry.getMessageId()); ReceiveMessageResult receivedMessagesResult = sqs.receiveMessage(new ReceiveMessageRequest().withQueueUrl(createdQueue.getQueueUrl()).withMaxNumberOfMessages(4)); // delete batch List<DeleteMessageBatchRequestEntry> deleteRequests = new ArrayList<>(); deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("one").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle())); deleteRequests.add(new 
DeleteMessageBatchRequestEntry().withId("two").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle())); deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("three").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle())); DeleteMessageBatchResult deleteBatchResult = sqs.deleteMessageBatch(new DeleteMessageBatchRequest().withQueueUrl(createdQueue.getQueueUrl()).withEntries(deleteRequests)); // verify delete batch result assertNotNull("verify that batch delete returned ok", deleteBatchResult); assertTrue("no request failed",deleteBatchResult.getFailed().isEmpty()); assertEquals("verify successfull message count", 3, deleteBatchResult.getSuccessful().size()); assertTrue("queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getMessageQueue().isEmpty()); for(Message message : receivedMessagesResult.getMessages()) { assertTrue("invisibility-queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getInvisibilityQueueFor(message.getReceiptHandle()).isEmpty()); } // cleanup getQueues().remove("tea-earl-grey-queue"); }
/**
 * A batch delete with a completely unpopulated request must still return a non-null result.
 */
@Test
public void testBulkDelete_withEmptyRequestParams_shouldWork() {
    DeleteMessageBatchRequest emptyRequest = new DeleteMessageBatchRequest();

    assertNotNull(sqs.deleteMessageBatch(emptyRequest));
}
/**
 * Convenience overload that resolves the queue's URL and delegates to the URL-based variant.
 *
 * @param queue    the queue whose URL the request should target
 * @param messages the messages to include in the request
 * @return the request produced by the URL-based overload
 */
@Override
public DeleteMessageBatchRequest createDeleteMessageBatchRequest(final SQSQueue queue,
        final List<Message> messages) {
    final String queueUrl = queue.getUrl();
    return createDeleteMessageBatchRequest(queueUrl, messages);
}
/**
 * Starts an asynchronous batch delete on the SQS client and exposes the pending result as an
 * {@code Observable}.
 *
 * @param request the batch delete request to send
 * @return an observable created from the client's pending result
 */
public Observable<DeleteMessageBatchResult> deleteMessageBatchAsync(DeleteMessageBatchRequest request) {
    java.util.concurrent.Future<DeleteMessageBatchResult> pendingResult =
            sqsClient.deleteMessageBatchAsync(request);
    return Observable.from(pendingResult);
}
protected void doReceive() { // This is where the interesting stuff happens while (isListening()) { synchronized (this.monitor) { try { this.monitor.wait(this.getVisibilityTimeout() * 1000); } catch (InterruptedException e) { } } boolean messagesReceived = false; do { ReceiveMessageRequest request = new ReceiveMessageRequest().withQueueUrl(this.queueUrl) .withWaitTimeSeconds(1).withMaxNumberOfMessages(10); ReceiveMessageResult result = sqs.receiveMessage(request); List<Message> messages = result.getMessages(); messagesReceived = messages.size() > 0; if (!messagesReceived) { break; } List<DeleteMessageBatchRequestEntry> deletes = new ArrayList<DeleteMessageBatchRequestEntry>(); for (Message message : messages) { String messageBody = message.getBody(); try { this.messageConsumer.accept(messageBody); DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry( UUID.randomUUID().toString(), message.getReceiptHandle()); deletes.add(entry); } catch (Throwable exp) { Logger.getLogger(getSqsQueueName()).log(Level.WARNING, "Could not process message: " + messageBody, exp); } } if (!deletes.isEmpty()) { DeleteMessageBatchRequest deleteBatch = new DeleteMessageBatchRequest(this.queueUrl, deletes); sqs.deleteMessageBatch(deleteBatch); } } while (messagesReceived); } }
/**
 * Performs the {@code DeleteMessages} action without a result capture.
 *
 * @param request the batch delete request
 * @return whatever the capture-accepting overload returns for a {@code null} extractor
 */
@Override
public DeleteMessageBatchResult deleteMessages(DeleteMessageBatchRequest request) {
    // No capture requested: pass an explicit null extractor to the two-argument overload.
    return deleteMessages(request, (ResultCapture<DeleteMessageBatchResult>) null);
}
/** * Test acknowledge 25 first un-acknowledge messages */ @Test public void testOneAck() throws JMSException { int populateMessageSize = 34; populateMessage(populateMessageSize); int ackMessage = 25; testAcknowledge(populateMessageSize, ackMessage); ArgumentCaptor<DeleteMessageBatchRequest> argumentCaptor = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class); verify(amazonSQSClient, times(5)).deleteMessageBatch(argumentCaptor.capture()); //key is the queue url //value is the sequence of sizes of expected batches Map<String, List<Integer>> expectedCalls = new HashMap<String, List<Integer>>(); List<Integer> queue0Calls = new ArrayList<Integer>(); queue0Calls.add(10); queue0Calls.add(1); expectedCalls.put(baseQueueUrl + 0, queue0Calls); List<Integer> queue1Calls = new ArrayList<Integer>(); queue1Calls.add(10); queue1Calls.add(1); expectedCalls.put(baseQueueUrl + 1, queue1Calls); List<Integer> queue2Calls = new ArrayList<Integer>(); queue2Calls.add(4); expectedCalls.put(baseQueueUrl + 2, queue2Calls); for (DeleteMessageBatchRequest request : argumentCaptor.getAllValues()) { String queueUrl = request.getQueueUrl(); List<Integer> expectedSequence = expectedCalls.get(queueUrl); assertNotNull(expectedSequence); assertTrue(expectedSequence.size() > 0); assertEquals(expectedSequence.get(0).intValue(), request.getEntries().size()); expectedSequence.remove(0); if (expectedSequence.isEmpty()) { expectedCalls.remove(queueUrl); } } assertTrue(expectedCalls.isEmpty()); }
/**
 * <p>
 * Batch version of DeleteMessage: deletes up to ten messages from the
 * specified queue and reports the outcome of each delete individually in
 * the response. Message payloads stored in Amazon S3 are deleted as well
 * when necessary.
 * </p>
 * <p>
 * <b>IMPORTANT:</b> A batch request can mix successful and unsuccessful
 * actions, so check for batch errors even when the call returns an HTTP
 * status code of 200.
 * </p>
 * <p>
 * <b>NOTE:</b> Some API actions take lists of parameters, specified using
 * the param.n notation, where n is an integer starting from 1. A parameter
 * list with two elements looks like this:
 * </p>
 * <p>
 * <code>&amp;Attribute.1=this</code>
 * </p>
 * <p>
 * <code>&amp;Attribute.2=that</code>
 * </p>
 *
 * @param deleteMessageBatchRequest
 *            Container for the necessary parameters to execute the
 *            DeleteMessageBatch service method on AmazonSQS. When large
 *            payload support is enabled, the receipt handle of every
 *            entry is rewritten in place to the original SQS receipt
 *            handle.
 *
 * @return The response from the DeleteMessageBatch service method, as
 *         returned by AmazonSQS.
 *
 * @throws BatchEntryIdsNotDistinctException
 * @throws TooManyEntriesInBatchRequestException
 * @throws InvalidBatchEntryIdException
 * @throws EmptyBatchRequestException
 *
 * @throws AmazonClientException
 *             If the request is null, or if any internal errors are
 *             encountered inside the client while attempting to make the
 *             request or handle the response. For example if a network
 *             connection is not available.
 * @throws AmazonServiceException
 *             If an error response is returned by AmazonSQS indicating
 *             either a problem with the data in the request, or a server
 *             side issue.
 */
public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) {

    if (deleteMessageBatchRequest == null) {
        String errorMessage = "deleteMessageBatchRequest cannot be null.";
        LOG.error(errorMessage);
        throw new AmazonClientException(errorMessage);
    }

    deleteMessageBatchRequest.getRequestClientOptions()
            .appendUserAgent(SQSExtendedClientConstants.USER_AGENT_HEADER);

    if (clientConfiguration.isLargePayloadSupportEnabled()) {
        for (DeleteMessageBatchRequestEntry entry : deleteMessageBatchRequest.getEntries()) {
            final String handle = entry.getReceiptHandle();
            final String sqsHandle;
            if (isS3ReceiptHandle(handle)) {
                // Remove the S3 payload first, then recover the plain SQS receipt handle.
                deleteMessagePayloadFromS3(handle);
                sqsHandle = getOrigReceiptHandle(handle);
            } else {
                sqsHandle = handle;
            }
            entry.setReceiptHandle(sqsHandle);
        }
    }

    return super.deleteMessageBatch(deleteMessageBatchRequest);
}
/**
 * Prepares the request, calls <code>deleteMessageBatch</code> on the SQS
 * client and converts any <code>AmazonClientException</code> via
 * <code>handleException</code>. Used to acknowledge multiple messages in
 * client_acknowledge mode, so that they can be deleted from the SQS queue.
 *
 * @param deleteMessageBatchRequest
 *            Container for the necessary parameters to execute the
 *            deleteMessageBatch service method on AmazonSQS. This is the
 *            batch version of deleteMessage. Max batch size is 10.
 * @return The response from the deleteMessageBatch service method, as
 *         returned by AmazonSQS
 * @throws JMSException
 */
public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest)
        throws JMSException {
    final DeleteMessageBatchResult result;
    try {
        prepareRequest(deleteMessageBatchRequest);
        result = amazonSQSClient.deleteMessageBatch(deleteMessageBatchRequest);
    } catch (AmazonClientException clientFailure) {
        throw handleException(clientFailure, "deleteMessageBatch");
    }
    return result;
}
/**
 * <p>
 * Batch version of DeleteMessage: deletes up to ten messages from the
 * specified queue and reports the outcome of each delete individually in
 * the response. This implementation forwards the request unchanged to the
 * wrapped client.
 * </p>
 * <p>
 * <b>IMPORTANT:</b> A batch request can mix successful and unsuccessful
 * actions, so check for batch errors even when the call returns an HTTP
 * status code of 200.
 * </p>
 * <p>
 * <b>NOTE:</b> Some API actions take lists of parameters, specified using
 * the param.n notation, where n is an integer starting from 1. A parameter
 * list with two elements looks like this:
 * </p>
 * <p>
 * <code>&amp;Attribute.1=this</code>
 * </p>
 * <p>
 * <code>&amp;Attribute.2=that</code>
 * </p>
 *
 * @param deleteMessageBatchRequest
 *            Container for the necessary parameters to execute the
 *            DeleteMessageBatch service method on AmazonSQS.
 *
 * @return The response from the DeleteMessageBatch service method, as
 *         returned by AmazonSQS.
 *
 * @throws BatchEntryIdsNotDistinctException
 * @throws TooManyEntriesInBatchRequestException
 * @throws InvalidBatchEntryIdException
 * @throws EmptyBatchRequestException
 *
 * @throws AmazonClientException
 *             If any internal errors are encountered inside the client
 *             while attempting to make the request or handle the response.
 *             For example if a network connection is not available.
 * @throws AmazonServiceException
 *             If an error response is returned by AmazonSQS indicating
 *             either a problem with the data in the request, or a server
 *             side issue.
 */
public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest)
        throws AmazonServiceException, AmazonClientException {

    DeleteMessageBatchResult delegateResult =
            amazonSqsToBeExtended.deleteMessageBatch(deleteMessageBatchRequest);
    return delegateResult;
}
/**
 * <p>
 * Batch version of DeleteMessage: deletes up to ten messages from the
 * specified queue and reports the outcome of each delete individually in
 * the response. Also deletes the message payloads from Amazon S3 when
 * necessary. This overload wraps its arguments in a
 * <code>DeleteMessageBatchRequest</code> and delegates to the
 * request-based variant.
 * </p>
 * <p>
 * <b>IMPORTANT:</b> A batch request can mix successful and unsuccessful
 * actions, so check for batch errors even when the call returns an HTTP
 * status code of 200.
 * </p>
 * <p>
 * <b>NOTE:</b> Some API actions take lists of parameters, specified using
 * the param.n notation, where n is an integer starting from 1. A parameter
 * list with two elements looks like this:
 * </p>
 * <p>
 * <code>&amp;Attribute.1=this</code>
 * </p>
 * <p>
 * <code>&amp;Attribute.2=that</code>
 * </p>
 *
 * @param queueUrl
 *            The URL of the Amazon SQS queue to take action on.
 * @param entries
 *            A list of receipt handles for the messages to be deleted.
 *
 * @return The response from the DeleteMessageBatch service method, as
 *         returned by AmazonSQS.
 *
 * @throws BatchEntryIdsNotDistinctException
 * @throws TooManyEntriesInBatchRequestException
 * @throws InvalidBatchEntryIdException
 * @throws EmptyBatchRequestException
 *
 * @throws AmazonClientException
 *             If any internal errors are encountered inside the client
 *             while attempting to make the request or handle the response.
 *             For example if a network connection is not available.
 * @throws AmazonServiceException
 *             If an error response is returned by AmazonSQS indicating
 *             either a problem with the data in the request, or a server
 *             side issue.
 */
public DeleteMessageBatchResult deleteMessageBatch(String queueUrl, List<DeleteMessageBatchRequestEntry> entries) {
    return deleteMessageBatch(new DeleteMessageBatchRequest(queueUrl, entries));
}
/**
 * Performs the <code>DeleteMessages</code> action.
 *
 * <p>
 * The following request parameters will be populated from the data of this
 * <code>Queue</code> resource, and any conflicting parameter value set in
 * the request will be overridden:
 * <ul>
 *   <li>
 *     <b><code>QueueUrl</code></b>
 *         - mapped from the <code>Url</code> identifier.
 *   </li>
 * </ul>
 *
 * @param request the batch delete request to send; its <code>QueueUrl</code>
 *         is overridden as described above.
 * @return The response of the low-level client operation associated with
 *         this resource action.
 * @see DeleteMessageBatchRequest
 */
DeleteMessageBatchResult deleteMessages(DeleteMessageBatchRequest request);
/**
 * Performs the <code>DeleteMessages</code> action and use a ResultCapture
 * to retrieve the low-level client response.
 *
 * <p>
 * The following request parameters will be populated from the data of this
 * <code>Queue</code> resource, and any conflicting parameter value set in
 * the request will be overridden:
 * <ul>
 *   <li>
 *     <b><code>QueueUrl</code></b>
 *         - mapped from the <code>Url</code> identifier.
 *   </li>
 * </ul>
 *
 * @param request the batch delete request to send; its <code>QueueUrl</code>
 *         is overridden as described above.
 * @param extractor the ResultCapture used to retrieve the low-level client
 *         response.
 * @return The response of the low-level client operation associated with
 *         this resource action.
 * @see DeleteMessageBatchRequest
 */
DeleteMessageBatchResult deleteMessages(DeleteMessageBatchRequest request, ResultCapture<DeleteMessageBatchResult> extractor);
/**
 * Creates a {@link DeleteMessageBatchRequest} for the given queue and messages.
 *
 * <p>NOTE(review): how entries are derived from the messages and how the queue is mapped to
 * a queue URL is implementation-defined - confirm against the implementing class.
 *
 * @param queue the queue the request is created for
 * @param messages the messages the request is created from
 * @return the created request
 */
DeleteMessageBatchRequest createDeleteMessageBatchRequest(final SQSQueue queue, final List<Message> messages);
/**
 * Creates a {@link DeleteMessageBatchRequest} for the given queue URL and messages.
 *
 * <p>NOTE(review): how entries are derived from the messages is implementation-defined -
 * confirm against the implementing class.
 *
 * @param queueUrl URL of the queue the request is created for
 * @param messages the messages the request is created from
 * @return the created request
 */
DeleteMessageBatchRequest createDeleteMessageBatchRequest(final String queueUrl, final List<Message> messages);