/**
 * Deletes the given messages from the queue with a single batch request.
 *
 * @param messages the messages to delete; may be {@code null} or empty
 * @return the ids of the entries the service reported as failed, or {@code null} when
 *         {@code messages} is {@code null} or empty and no request was sent
 */
private List<String> delete(List<Message> messages) {
    final boolean nothingToDelete = messages == null || messages.isEmpty();
    if (nothingToDelete) {
        return null;
    }

    DeleteMessageBatchRequest request = new DeleteMessageBatchRequest().withQueueUrl(queueURL);
    // Entries are appended to the list obtained from the request itself.
    List<DeleteMessageBatchRequestEntry> requestEntries = request.getEntries();
    for (Message message : messages) {
        DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry()
                .withId(message.getId())
                .withReceiptHandle(message.getReceipt());
        requestEntries.add(entry);
    }

    DeleteMessageBatchResult outcome = client.deleteMessageBatch(request);
    List<String> failedIds = outcome.getFailed().stream()
            .map(failure -> failure.getId())
            .collect(Collectors.toList());
    logger.debug("failed to delete: {}", failedIds);
    return failedIds;
}
/**
 * Releases completed scan ranges by deleting their messages from the complete-scan-range queue.
 *
 * <p>Each completion is cast to {@code QueueScanRangeComplete} and its {@code getMessageId()}
 * value is used as the receipt handle of a delete entry. Entries are sent in chunks of at most
 * ten, because SQS rejects a DeleteMessageBatch request that carries more than ten entries.
 *
 * @param completions the completed scan ranges to release; nothing is sent when empty
 */
@Override
public void releaseCompleteScanRanges(Collection<ScanRangeComplete> completions) {
    if (completions.isEmpty()) {
        return;
    }

    // Entry ids only need to be unique within one request; a running counter guarantees that.
    int id = 0;
    List<DeleteMessageBatchRequestEntry> entries = Lists.newArrayListWithCapacity(completions.size());
    for (ScanRangeComplete completion : completions) {
        entries.add(
                new DeleteMessageBatchRequestEntry()
                        .withId(String.valueOf(id++))
                        .withReceiptHandle(((QueueScanRangeComplete) completion).getMessageId()));
    }

    final int maxEntriesPerBatch = 10;
    final String queueUrl = getQueueUrl(_completeScanRangeQueue);
    for (List<DeleteMessageBatchRequestEntry> chunk : Lists.partition(entries, maxEntriesPerBatch)) {
        _sqs.deleteMessageBatch(new DeleteMessageBatchRequest()
                .withQueueUrl(queueUrl)
                .withEntries(chunk));
    }
}
/**
 * Passes a batch of received SQS messages to the handler and then deletes the whole batch
 * from the queue.
 *
 * @param queue the queue name handed to the message conversion
 * @param sqsMessages the raw SQS messages to handle and delete
 */
private void handleMessages(final String queue, final List<com.amazonaws.services.sqs.model.Message> sqsMessages) {
    final List<Message> converted = fromSqsMessages(queue, sqsMessages);
    if (!CollectionUtil.hasElements(converted)) {
        return;
    }

    this.handler.handleBatch(converted);

    // TODO Allow the caller to specify messages to delete
    final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<DeleteMessageBatchRequestEntry>();
    for (final com.amazonaws.services.sqs.model.Message raw : sqsMessages) {
        entries.add(new DeleteMessageBatchRequestEntry(raw.getMessageId(), raw.getReceiptHandle()));
    }

    // TODO - This should actually respect the feedback given from the handler on which messages to delete
    final DeleteMessageBatchRequest deleteRequest =
            new DeleteMessageBatchRequest(this.queueUrl).withEntries(entries);
    this.client.deleteMessageBatch(deleteRequest);
}
/**
 * Sends one message to the queue resource, receives it back, checks its contents and
 * attributes against the send result, and finally deletes it with a batch delete request.
 */
@Test
@Ignore
public void testSendReceiveDelete() throws InterruptedException {
    SendMessageResult sent = queue.sendMessage(TEST_MESSAGE);
    assertNotNull(sent);
    assertNotNull(sent.getMessageId());

    List<Message> received = waitForMessagesFromQueue(null);
    assertNotNull(received);
    assertEquals(1, received.size());

    Message only = received.get(0);
    assertMessage(TEST_MESSAGE, sent.getMessageId(), sent.getMD5OfMessageBody(), only);

    DeleteMessageBatchRequestEntry deleteEntry =
            new DeleteMessageBatchRequestEntry("msg1", only.getReceiptHandle());
    queue.deleteMessages(new DeleteMessageBatchRequest().withEntries(deleteEntry));
}
@Override public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws AmazonClientException { DirectorySQSQueue queue = getQueueFromUrl(deleteMessageBatchRequest.getQueueUrl(), false); //lists for reporting List<BatchResultErrorEntry> batchResultErrorEntries = new ArrayList<>(); List<DeleteMessageBatchResultEntry> batchResultEntries = new ArrayList<>(); //attempt delete on each for (DeleteMessageBatchRequestEntry batchRequestEntry : deleteMessageBatchRequest.getEntries()) { try { queue.delete(batchRequestEntry.getReceiptHandle()); batchResultEntries.add(new DeleteMessageBatchResultEntry().withId(batchRequestEntry.getId())); } catch (IOException e) { BatchResultErrorEntry batchResultErrorEntry = new BatchResultErrorEntry(). withSenderFault(true). withId(batchRequestEntry.getId()). withMessage(e.getMessage()); batchResultErrorEntries.add(batchResultErrorEntry); } } return new DeleteMessageBatchResult().withFailed(batchResultErrorEntries).withSuccessful(batchResultEntries); }
/**
 * Builds a batch delete request for the given queue with one request entry per map entry.
 *
 * @param queueUrl the URL of the queue the request targets
 * @param entries map from batch entry id to the entry supplying the receipt handle
 * @return the populated batch delete request
 */
@VisibleForTesting
static DeleteMessageBatchRequest createRequest(String queueUrl, Map<String, DeleteMessageEntry> entries) {
    return new DeleteMessageBatchRequest()
            .withEntries(entries.entrySet().stream()
                    .map(idAndEntry -> {
                        // map key becomes the batch entry id, map value supplies the receipt handle
                        DeleteMessageBatchRequestEntry requestEntry = new DeleteMessageBatchRequestEntry();
                        requestEntry.setId(idAndEntry.getKey());
                        requestEntry.setReceiptHandle(idAndEntry.getValue().getReceiptHandle());
                        return requestEntry;
                    })
                    .collect(Collectors.toList()))
            .withQueueUrl(queueUrl);
}
/**
 * Creates a batch delete request for the given queue containing one entry per message.
 *
 * @param queueUrl the URL of the queue the request targets
 * @param messages the messages to turn into delete entries
 * @return the batch delete request carrying the queue URL and the entries
 */
@Override
public DeleteMessageBatchRequest createDeleteMessageBatchRequest(String queueUrl, List<Message> messages) {
    final List<DeleteMessageBatchRequestEntry> batchEntries = new ArrayList<>(messages.size());
    for (final Message current : messages) {
        batchEntries.add(this.createDeleteMessageBatchRequestEntry(current));
    }
    return new DeleteMessageBatchRequest(queueUrl).withEntries(batchEntries);
}
private void process(final List<Message> messages) { if (messages.size() == 0) return; final List<DeleteMessageBatchRequestEntry> toDelete = new ArrayList<>(); int count = 0; for (Message message : messages) { final String deleteId = String.valueOf(count++); try { String stringBody = message.getBody(); if (stringBody.isEmpty()) continue; // allow plain-text json, but permit base64 encoded thrift or json byte[] spans = stringBody.charAt(0) == '[' ? stringBody.getBytes(UTF_8) : Base64.decode(stringBody); collector.acceptSpans(spans, DETECTING_DECODER, new Callback<Void>() { @Override public void onSuccess(Void value) { toDelete.add(new DeleteMessageBatchRequestEntry(deleteId, message.getReceiptHandle())); } @Override public void onError(Throwable t) { // don't delete messages. this will allow accept calls retry once the // messages are marked visible by sqs. logger.log(Level.WARNING, "collector accept failed", t); } }); } catch (RuntimeException | Error e) { logger.log(Level.WARNING, "message decoding failed", e); toDelete.add(new DeleteMessageBatchRequestEntry(deleteId, message.getReceiptHandle())); } } delete(toDelete); }
/**
 * Pulls a number of messages from an SQS queue and deletes them from the queue as they are read.
 *
 * <p>Messages are fetched in batches of at most {@code MAX_MESSAGES}. Each batch asks only for
 * the messages still missing, so the returned list never holds more than
 * {@code numberOfMessages} bodies. Service and client exceptions are logged and end the pull;
 * whatever was collected up to that point is returned.
 *
 * @param queueURL the URL of the SQS queue; a blank value yields an empty list
 * @param numberOfMessages the number of messages to pull
 * @return a list of message bodies, possibly empty, never {@code null}
 */
public static List<String> pullMessages(String queueURL, int numberOfMessages) {
    List<String> messages = new ArrayList<>();
    if (StringUtils.isBlank(queueURL)) {
        return messages;
    }
    try {
        int batchSteps = 1;
        if (numberOfMessages > MAX_MESSAGES) {
            batchSteps = (numberOfMessages / MAX_MESSAGES) + ((numberOfMessages % MAX_MESSAGES > 0) ? 1 : 0);
        }

        for (int i = 0; i < batchSteps; i++) {
            // Cap the request at what is still missing; asking for a full batch every time
            // would let the last batch push the total past numberOfMessages.
            int maxForBatch = Math.min(MAX_MESSAGES, numberOfMessages - messages.size());
            List<Message> list = getClient().receiveMessage(new ReceiveMessageRequest(queueURL).
                    withMaxNumberOfMessages(maxForBatch).withWaitTimeSeconds(POLLING_INTERVAL)).getMessages();
            if (list != null && !list.isEmpty()) {
                List<DeleteMessageBatchRequestEntry> del = new ArrayList<>();
                for (Message msg : list) {
                    messages.add(msg.getBody());
                    del.add(new DeleteMessageBatchRequestEntry(msg.getMessageId(), msg.getReceiptHandle()));
                }
                getClient().deleteMessageBatch(queueURL, del);
            }
        }
    } catch (AmazonServiceException ase) {
        logException(ase);
    } catch (AmazonClientException ace) {
        logger.error("Could not reach SQS. {}", ace.toString());
    }
    return messages;
}
/**
 * Deletes the given batch entries without capturing the raw result.
 *
 * @param entries the delete entries for the batch
 * @return the result returned by the two-argument overload
 */
@Override
public DeleteMessageBatchResult deleteMessages(List<DeleteMessageBatchRequestEntry> entries) {
    // a typed null selects the (List, ResultCapture) overload unambiguously
    final ResultCapture<DeleteMessageBatchResult> noCapture = null;
    return deleteMessages(entries, noCapture);
}
/**
 * Wraps the given entries in a new batch delete request and delegates to the request-based
 * overload.
 *
 * @param entries the delete entries for the batch
 * @param extractor the result capture handed on to the request-based overload
 * @return the result returned by the request-based overload
 */
@Override
public DeleteMessageBatchResult deleteMessages(
        List<DeleteMessageBatchRequestEntry> entries,
        ResultCapture<DeleteMessageBatchResult> extractor) {

    DeleteMessageBatchRequest batchRequest = new DeleteMessageBatchRequest();
    batchRequest.setEntries(entries);
    return deleteMessages(batchRequest, extractor);
}
/** * Acknowledges up to 10 messages via calling * <code>deleteMessageBatch</code>. */ @Override public void action(String queueUrl, List<String> receiptHandles) throws JMSException { if (receiptHandles == null || receiptHandles.isEmpty()) { return; } List<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntries = new ArrayList<DeleteMessageBatchRequestEntry>(); int batchId = 0; for (String receiptHandle : receiptHandles) { // Remove the message from queue of unAckMessages unAckMessages.poll(); DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry( Integer.toString(batchId), receiptHandle); deleteMessageBatchRequestEntries.add(entry); batchId++; } DeleteMessageBatchRequest deleteMessageBatchRequest = new DeleteMessageBatchRequest( queueUrl, deleteMessageBatchRequestEntries); /** * TODO: If one of the batch calls fail, then the remaining messages on * the batch will not be deleted, and will be visible and delivered as * duplicate after visibility timeout expires. */ amazonSQSClient.deleteMessageBatch(deleteMessageBatchRequest); }
/**
 * Deletes the given messages from a queue with a single batch request.
 *
 * @param queue the value set as the request's queue URL
 * @param messages the messages to delete; each contributes its message id and receipt handle
 */
private void delete(String queue, List<Message> messages) {
    List<DeleteMessageBatchRequestEntry> entries = new ArrayList<DeleteMessageBatchRequestEntry>();
    for (Message message : messages) {
        DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
        entry.setId(message.getMessageId());
        entry.setReceiptHandle(message.getReceiptHandle());
        entries.add(entry);
    }
    log.info(format("Deleting %s messages", entries.size()));

    DeleteMessageBatchRequest request = new DeleteMessageBatchRequest()
            .withQueueUrl(queue)
            .withEntries(entries);
    sqs.deleteMessageBatch(request);
}
@Test public void testBulkSendDelete_shouldWork() { // create queue CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue")); // send batch SendMessageBatchRequestEntry firstRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("one") .withMessageGroupId("groupee").withMessageBody("{\"XOXO\":234}"); SendMessageBatchRequestEntry secondRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("two") .withMessageGroupId("groupee").withMessageBody("{\"Quinoa\":\"Readymade\",\"vegan\":true}"); SendMessageBatchRequestEntry thirdRequest = new SendMessageBatchRequestEntry().withDelaySeconds(0).withId("three") .withMessageGroupId("groupee").withMessageBody("{\"VHS\":\"street art slow-carb\"}"); // verify send batch result SendMessageBatchResult sendResult = sqs.sendMessageBatch(new SendMessageBatchRequest().withQueueUrl(createdQueue.getQueueUrl()) .withEntries(ImmutableList.of(firstRequest,secondRequest, thirdRequest))); assertNotNull("verify that batch send returned ok", sendResult); assertTrue("no request failed",sendResult.getFailed().isEmpty()); assertEquals("verify successfull message count", 3, sendResult.getSuccessful().size()); SendMessageBatchResultEntry firstResultEntry = sendResult.getSuccessful().stream().filter(msg -> msg.getId().equals("one")).findAny().get(); assertEquals("verify correct message MD5",getAwsMessageMD5("{\"XOXO\":234}"),firstResultEntry.getMD5OfMessageBody()); assertNotNull("verify message id exists",firstResultEntry.getMessageId()); ReceiveMessageResult receivedMessagesResult = sqs.receiveMessage(new ReceiveMessageRequest().withQueueUrl(createdQueue.getQueueUrl()).withMaxNumberOfMessages(4)); // delete batch List<DeleteMessageBatchRequestEntry> deleteRequests = new ArrayList<>(); deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("one").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle())); deleteRequests.add(new 
DeleteMessageBatchRequestEntry().withId("two").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle())); deleteRequests.add(new DeleteMessageBatchRequestEntry().withId("three").withReceiptHandle(receivedMessagesResult.getMessages().get(0).getReceiptHandle())); DeleteMessageBatchResult deleteBatchResult = sqs.deleteMessageBatch(new DeleteMessageBatchRequest().withQueueUrl(createdQueue.getQueueUrl()).withEntries(deleteRequests)); // verify delete batch result assertNotNull("verify that batch delete returned ok", deleteBatchResult); assertTrue("no request failed",deleteBatchResult.getFailed().isEmpty()); assertEquals("verify successfull message count", 3, deleteBatchResult.getSuccessful().size()); assertTrue("queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getMessageQueue().isEmpty()); for(Message message : receivedMessagesResult.getMessages()) { assertTrue("invisibility-queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getInvisibilityQueueFor(message.getReceiptHandle()).isEmpty()); } // cleanup getQueues().remove("tea-earl-grey-queue"); }
/**
 * Creates the batch delete entry for one message, using the message id as the entry id.
 *
 * @param message the message supplying the receipt handle and the message id
 * @return a new delete entry for that message
 */
private DeleteMessageBatchRequestEntry createDeleteMessageBatchRequestEntry(final Message message) {
    return new DeleteMessageBatchRequestEntry()
            .withReceiptHandle(message.getReceiptHandle())
            .withId(message.getMessageId());
}
/**
 * Starts an asynchronous batch delete on the SQS client and exposes its outcome as an
 * {@code Observable}.
 *
 * @param queueUrl the URL of the queue to delete from
 * @param entries the delete entries for the batch
 * @return an observable created from the pending asynchronous call
 */
public Observable<DeleteMessageBatchResult> deleteMessageBatchAsync(String queueUrl, List<DeleteMessageBatchRequestEntry> entries) {
    final java.util.concurrent.Future<DeleteMessageBatchResult> pending =
            sqsClient.deleteMessageBatchAsync(queueUrl, entries);
    return Observable.from(pending);
}
/**
 * Sends the given delete entries to the configured queue in one batch call.
 *
 * @param entries the delete entries for the batch
 * @return the result returned by the client
 */
private DeleteMessageBatchResult delete(List<DeleteMessageBatchRequestEntry> entries) {
    final DeleteMessageBatchResult outcome = client.deleteMessageBatch(queueUrl, entries);
    return outcome;
}
protected void doReceive() { // This is where the interesting stuff happens while (isListening()) { synchronized (this.monitor) { try { this.monitor.wait(this.getVisibilityTimeout() * 1000); } catch (InterruptedException e) { } } boolean messagesReceived = false; do { ReceiveMessageRequest request = new ReceiveMessageRequest().withQueueUrl(this.queueUrl) .withWaitTimeSeconds(1).withMaxNumberOfMessages(10); ReceiveMessageResult result = sqs.receiveMessage(request); List<Message> messages = result.getMessages(); messagesReceived = messages.size() > 0; if (!messagesReceived) { break; } List<DeleteMessageBatchRequestEntry> deletes = new ArrayList<DeleteMessageBatchRequestEntry>(); for (Message message : messages) { String messageBody = message.getBody(); try { this.messageConsumer.accept(messageBody); DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry( UUID.randomUUID().toString(), message.getReceiptHandle()); deletes.add(entry); } catch (Throwable exp) { Logger.getLogger(getSqsQueueName()).log(Level.WARNING, "Could not process message: " + messageBody, exp); } } if (!deletes.isEmpty()) { DeleteMessageBatchRequest deleteBatch = new DeleteMessageBatchRequest(this.queueUrl, deletes); sqs.deleteMessageBatch(deleteBatch); } } while (messagesReceived); } }
/**
 * <p>
 * Batch version of DeleteMessage: deletes up to ten messages from the
 * specified queue, reporting the outcome of each delete individually in the
 * response. When large-payload support is enabled, the message payloads
 * referenced by S3 receipt handles are deleted from Amazon S3 as well, and
 * each entry's receipt handle is replaced by the original SQS receipt handle
 * before the request is passed on.
 * </p>
 * <p>
 * <b>IMPORTANT:</b> A batch request can mix successful and unsuccessful
 * actions, so check for batch errors even when the call returns an HTTP
 * status code of 200.
 * </p>
 * <p>
 * <b>NOTE:</b> Some API actions take lists of parameters, specified using
 * the param.n notation, where n is an integer starting from 1. A parameter
 * list with two elements looks like this:
 * </p>
 * <p>
 * <code>&amp;Attribute.1=this</code>
 * </p>
 * <p>
 * <code>&amp;Attribute.2=that</code>
 * </p>
 *
 * @param deleteMessageBatchRequest
 *            Container for the necessary parameters to execute the
 *            DeleteMessageBatch service method on AmazonSQS. Its entries
 *            are modified in place when large-payload support is enabled.
 *
 * @return The response from the DeleteMessageBatch service method, as
 *         returned by AmazonSQS.
 *
 * @throws BatchEntryIdsNotDistinctException
 * @throws TooManyEntriesInBatchRequestException
 * @throws InvalidBatchEntryIdException
 * @throws EmptyBatchRequestException
 *
 * @throws AmazonClientException
 *             If the request is {@code null}, or if any internal errors
 *             are encountered inside the client while attempting to make
 *             the request or handle the response. For example if a network
 *             connection is not available.
 * @throws AmazonServiceException
 *             If an error response is returned by AmazonSQS indicating
 *             either a problem with the data in the request, or a server
 *             side issue.
 */
public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) {

    if (deleteMessageBatchRequest == null) {
        String errorMessage = "deleteMessageBatchRequest cannot be null.";
        LOG.error(errorMessage);
        throw new AmazonClientException(errorMessage);
    }

    deleteMessageBatchRequest.getRequestClientOptions().appendUserAgent(
            SQSExtendedClientConstants.USER_AGENT_HEADER);

    if (clientConfiguration.isLargePayloadSupportEnabled()) {
        for (DeleteMessageBatchRequestEntry entry : deleteMessageBatchRequest.getEntries()) {
            final String handle = entry.getReceiptHandle();
            if (isS3ReceiptHandle(handle)) {
                // remove the S3 payload, then restore the plain SQS receipt handle
                deleteMessagePayloadFromS3(handle);
                entry.setReceiptHandle(getOrigReceiptHandle(handle));
            } else {
                entry.setReceiptHandle(handle);
            }
        }
    }

    return super.deleteMessageBatch(deleteMessageBatchRequest);
}
/**
 * <p>
 * Batch version of DeleteMessage: deletes up to ten messages from the
 * specified queue, reporting the outcome of each delete individually in the
 * response. The call is forwarded unchanged to the wrapped SQS client.
 * </p>
 * <p>
 * <b>IMPORTANT:</b> A batch request can mix successful and unsuccessful
 * actions, so check for batch errors even when the call returns an HTTP
 * status code of 200.
 * </p>
 * <p>
 * <b>NOTE:</b> Some API actions take lists of parameters, specified using
 * the param.n notation, where n is an integer starting from 1. A parameter
 * list with two elements looks like this:
 * </p>
 * <p>
 * <code>&amp;Attribute.1=this</code>
 * </p>
 * <p>
 * <code>&amp;Attribute.2=that</code>
 * </p>
 *
 * @param queueUrl
 *            The URL of the Amazon SQS queue to take action on.
 * @param entries
 *            A list of receipt handles for the messages to be deleted.
 *
 * @return The response from the DeleteMessageBatch service method, as
 *         returned by AmazonSQS.
 *
 * @throws BatchEntryIdsNotDistinctException
 * @throws TooManyEntriesInBatchRequestException
 * @throws InvalidBatchEntryIdException
 * @throws EmptyBatchRequestException
 *
 * @throws AmazonClientException
 *             If any internal errors are encountered inside the client
 *             while attempting to make the request or handle the response.
 *             For example if a network connection is not available.
 * @throws AmazonServiceException
 *             If an error response is returned by AmazonSQS indicating
 *             either a problem with the data in the request, or a server
 *             side issue.
 */
public DeleteMessageBatchResult deleteMessageBatch(String queueUrl, List<DeleteMessageBatchRequestEntry> entries)
        throws AmazonServiceException, AmazonClientException {

    final DeleteMessageBatchResult delegateResult = amazonSqsToBeExtended.deleteMessageBatch(queueUrl, entries);
    return delegateResult;
}
/**
 * <p>
 * Batch version of DeleteMessage: deletes up to ten messages from the
 * specified queue, reporting the outcome of each delete individually in the
 * response. Also deletes the message payloads from Amazon S3 when
 * necessary. This overload wraps its arguments in a
 * {@code DeleteMessageBatchRequest} and delegates to the request-based
 * overload.
 * </p>
 * <p>
 * <b>IMPORTANT:</b> A batch request can mix successful and unsuccessful
 * actions, so check for batch errors even when the call returns an HTTP
 * status code of 200.
 * </p>
 * <p>
 * <b>NOTE:</b> Some API actions take lists of parameters, specified using
 * the param.n notation, where n is an integer starting from 1. A parameter
 * list with two elements looks like this:
 * </p>
 * <p>
 * <code>&amp;Attribute.1=this</code>
 * </p>
 * <p>
 * <code>&amp;Attribute.2=that</code>
 * </p>
 *
 * @param queueUrl
 *            The URL of the Amazon SQS queue to take action on.
 * @param entries
 *            A list of receipt handles for the messages to be deleted.
 *
 * @return The response from the DeleteMessageBatch service method, as
 *         returned by AmazonSQS.
 *
 * @throws BatchEntryIdsNotDistinctException
 * @throws TooManyEntriesInBatchRequestException
 * @throws InvalidBatchEntryIdException
 * @throws EmptyBatchRequestException
 *
 * @throws AmazonClientException
 *             If any internal errors are encountered inside the client
 *             while attempting to make the request or handle the response.
 *             For example if a network connection is not available.
 * @throws AmazonServiceException
 *             If an error response is returned by AmazonSQS indicating
 *             either a problem with the data in the request, or a server
 *             side issue.
 */
public DeleteMessageBatchResult deleteMessageBatch(String queueUrl, List<DeleteMessageBatchRequestEntry> entries) {
    return deleteMessageBatch(new DeleteMessageBatchRequest(queueUrl, entries));
}
/**
 * The convenient method form for the <code>DeleteMessages</code> action,
 * taking only the batch entries.
 *
 * @param entries
 *            the delete entries that make up the batch
 * @return the result of the <code>DeleteMessages</code> action
 *
 * @see #deleteMessages(DeleteMessageBatchRequest)
 */
DeleteMessageBatchResult deleteMessages(List<DeleteMessageBatchRequestEntry> entries);
/**
 * The convenient method form for the <code>DeleteMessages</code> action,
 * taking the batch entries and a result capture.
 *
 * @param entries
 *            the delete entries that make up the batch
 * @param extractor
 *            the result capture passed along with the call; presumably
 *            receives the raw service result - TODO confirm against
 *            <code>ResultCapture</code>
 * @return the result of the <code>DeleteMessages</code> action
 *
 * @see #deleteMessages(DeleteMessageBatchRequest, ResultCapture)
 */
DeleteMessageBatchResult deleteMessages(List<DeleteMessageBatchRequestEntry> entries, ResultCapture<DeleteMessageBatchResult> extractor);