/**
 * Receives one batch of messages from the queue, deserializes every body into a
 * {@link QueueMessage}, prints its timing and exception fields to standard output
 * and then deletes the message from the queue.
 *
 * @return the deserialized messages, in the order the batch returned them
 */
public List<QueueMessage> getMessages() {
    List<Message> received = _sqs.receiveMessage(new ReceiveMessageRequest(_queueDns)).getMessages();
    List<QueueMessage> result = new ArrayList<>();

    for (Message raw : received) {
        QueueMessage parsed = _gson.fromJson(raw.getBody(), QueueMessage.class);
        result.add(parsed);

        System.out.println("query time: " + parsed.queryExecutionTime);
        System.out.println("exec time: " + parsed.totalExecutionTime);
        System.out.println("Has ex: " + parsed.hasException);
        System.out.println("ex message: " + parsed.exceptionMessage + "\n");

        // The message has been recorded; delete it using the handle of this receive.
        _sqs.deleteMessage(new DeleteMessageRequest(_queueDns, raw.getReceiptHandle()));
    }

    return result;
}
/**
 * Reads batches from the queue until a receive call returns {@code null} or no
 * messages, decoding every received message into spans.
 *
 * @param delete when {@code true}, every message of a batch is deleted from the
 *               queue right after that batch has been read
 * @return all spans decoded from the received messages
 */
public List<Span> getSpans(boolean delete) {
    Stream<Span> spans = Stream.empty();

    ReceiveMessageResult result = client.receiveMessage(queueUrl);

    while (result != null && result.getMessages().size() > 0) {
        spans = Stream.concat(spans,
                result.getMessages().stream().flatMap(AmazonSQSRule::decodeSpans));

        // Delete the batch that was just read BEFORE fetching the next one. Fetching
        // first would overwrite 'result', leaving the current batch undeleted and
        // dereferencing a possibly-null next result.
        if (delete) {
            result.getMessages().forEach(m ->
                    client.deleteMessage(new DeleteMessageRequest(queueUrl, m.getReceiptHandle())));
        }

        result = client.receiveMessage(queueUrl);
    }

    return spans.collect(Collectors.toList());
}
/**
 * Polls {@code QUEUE_URL} roughly once per second, logging, printing and deleting
 * every received message, until the calling thread is interrupted.
 *
 * @param sqs the client used to receive and delete messages
 */
private void pollMessages(AmazonSQS sqs) {
    log.info("Polling messages");
    while (!Thread.currentThread().isInterrupted()) {
        List<Message> messages = sqs.receiveMessage(QUEUE_URL).getMessages();
        messages.forEach(m -> {
            log.info("Message Received: " + m.getBody());
            System.out.println(m.getBody());
            DeleteMessageRequest deleteMessageRequest = new DeleteMessageRequest(QUEUE_URL, m.getReceiptHandle());
            sqs.deleteMessage(deleteMessageRequest);
        });
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the flag for callers and stop polling. Continuing the loop with
            // the flag set would make every following sleep() throw immediately.
            Thread.currentThread().interrupt();
            log.info("Polling interrupted, stopping");
            return;
        }
    }
}
/**
 * Commit strategy: deletes the message from the queue once it has been processed.
 * <p>
 * Nothing happens unless {@code shouldDelete(exchange)} returns true. The receipt
 * handle is taken from the {@code SqsConstants.RECEIPT_HANDLE} header of the IN
 * message. An {@link AmazonClientException} raised by the delete call is handed to
 * the exception handler.
 *
 * @param exchange the exchange
 */
protected void processCommit(Exchange exchange) {
    try {
        if (!shouldDelete(exchange)) {
            return;
        }

        String handle = exchange.getIn().getHeader(SqsConstants.RECEIPT_HANDLE, String.class);
        DeleteMessageRequest request = new DeleteMessageRequest(getQueueUrl(), handle);

        LOG.trace("Deleting message with receipt handle {}...", handle);
        getClient().deleteMessage(request);
        LOG.trace("Deleted message with receipt handle {}...", handle);
    } catch (AmazonClientException e) {
        getExceptionHandler().handleException("Error occurred during deleting message. This exception is ignored.", exchange, e);
    }
}
/**
 * Receives one batch of messages from the given queue and returns the job status
 * notifications whose job id equals {@code jobId} (case-insensitive).
 * <p>
 * Matching messages are deleted from the queue; non-matching messages are left
 * untouched. A message whose body cannot be parsed is logged and skipped.
 *
 * @param queueUrl URL of the queue to poll
 * @param jobId    job id to filter on
 * @return the matching notifications, possibly empty
 */
public List<JobStatusNotification> pollMessageFromQueueByJobId(String queueUrl, String jobId) {
    ReceiveMessageRequest request = new ReceiveMessageRequest()
            .withQueueUrl(queueUrl)
            .withMaxNumberOfMessages(MAX_NUMBER_OF_MESSAGES)
            .withVisibilityTimeout(VISIBILITY_TIMEOUT)
            .withWaitTimeSeconds(WAIT_TIME_SECONDS);

    List<JobStatusNotification> matches = new ArrayList<>();
    List<Message> received = sqsClient.receiveMessage(request).getMessages();

    for (Message message : received) {
        try {
            JobStatusNotification notification = parseMessage(message.getBody());
            if (!notification.getJobId().equalsIgnoreCase(jobId)) {
                continue;
            }
            matches.add(notification);

            DeleteMessageRequest deleteRequest = new DeleteMessageRequest()
                    .withQueueUrl(queueUrl)
                    .withReceiptHandle(message.getReceiptHandle());
            sqsClient.deleteMessage(deleteRequest);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
    }

    return matches;
}
/**
 * Receives at most one message from the queue, converts it with
 * {@code createMessage} and deletes it from the queue.
 *
 * @param timeout narrowed to an {@code int} and passed to the receive request as its
 *                wait time in seconds
 * @return the converted message, or {@code null} when the receive call returned no
 *         messages
 */
@Override
public Message<String> receive(long timeout) {
    ReceiveMessageRequest request = new ReceiveMessageRequest(this.queueUrl)
            .withMaxNumberOfMessages(1)
            .withWaitTimeSeconds(Long.valueOf(timeout).intValue())
            .withAttributeNames(ATTRIBUTE_NAMES)
            .withMessageAttributeNames(MESSAGE_ATTRIBUTE_NAMES);

    ReceiveMessageResult result = this.amazonSqs.receiveMessage(request);

    if (!result.getMessages().isEmpty()) {
        com.amazonaws.services.sqs.model.Message first = result.getMessages().get(0);
        Message<String> converted = createMessage(first);
        this.amazonSqs.deleteMessage(new DeleteMessageRequest(this.queueUrl, first.getReceiptHandle()));
        return converted;
    }

    return null;
}
/**
 * Takes one message off the named queue: calls {@code createQueue} with the name to
 * obtain the queue URL, receives at most one message, deletes it and returns its body.
 *
 * @param name name of the queue
 * @return the body of the received message, or {@code null} when no message was
 *         received or any exception occurred (the stack trace is printed)
 */
public static String popFrom(String name) {
    try {
        String queueUrl = getConnection().createQueue(new CreateQueueRequest(name)).getQueueUrl();

        ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl);
        request.setMaxNumberOfMessages(1);

        List<Message> messages = getConnection().receiveMessage(request).getMessages();
        if (messages.size() > 0) {
            Message head = messages.get(0);
            getConnection().deleteMessage(
                    new DeleteMessageRequest(request.getQueueUrl(), head.getReceiptHandle()));
            return head.getBody();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}
/**
 * Deletes the given message from this instance's queue using the message's receipt
 * handle.
 *
 * @param msg the message to delete
 */
public void deleteMessage(Message msg) {
    String receiptHandle = msg.getReceiptHandle();
    DeleteMessageRequest deleteRequest = new DeleteMessageRequest()
            .withQueueUrl(this.queueUrl)
            .withReceiptHandle(receiptHandle);

    this.getClient().deleteMessage(deleteRequest);
}
@Override public void releaseScanRangeTask(ScanRangeTask task) { // Signal that the range is complete signalScanRangeComplete(task.getScanId()); // Ack the task _sqs.deleteMessage(new DeleteMessageRequest() .withQueueUrl(getQueueUrl(_pendingScanRangeQueue)) .withReceiptHandle(((QueueScanRangeTask) task).getMessageId())); }
/**
 * Handles a delete request by cancelling the in-flight task registered under the
 * request's receipt handle, if there is one.
 *
 * @param deleteMessageRequest the request carrying the receipt handle
 * @return a new, empty {@link DeleteMessageResult}
 * @throws AmazonClientException declared by the overridden method
 */
@Override
public DeleteMessageResult deleteMessage(DeleteMessageRequest deleteMessageRequest) throws AmazonClientException {
    // Single lookup instead of containsKey() + get(): a task that disappears between
    // the two calls would otherwise yield null and fail on cancel().
    ScheduledFuture<?> inFlightTask = inFlight.get(deleteMessageRequest.getReceiptHandle());
    if (inFlightTask != null) {
        inFlightTask.cancel(true);
    }
    return new DeleteMessageResult();
}
public void deleteLastMessage() { if (_lastMessage != null) { // Deletes a message logger.info("Deleting the last message with handle: " + _lastMessage.getReceiptHandle()); _sqs.deleteMessage(new DeleteMessageRequest(_queueURL, _lastMessage.getReceiptHandle())); _lastMessage = null; } }
/**
 * Polling loop for auto scaling notifications; runs while {@code receiveMessages}
 * is true and calls {@code sleep()} after every pass.
 * <p>
 * Each message body is parsed as JSON and its {@code "Message"} field is parsed as
 * JSON again. If that inner document has no {@code "Event"} key, or its event is not
 * {@code EC2_INSTANCE_TERMINATE}, a warning is logged and the message is deleted.
 * For {@code EC2_INSTANCE_TERMINATE}, the termination is published and the message
 * deleted only when {@code eventsRepository} has no entry for the message id.
 * Any exception aborts the current batch and is logged.
 */
public void run() {
    while (receiveMessages) {
        try {
            List<Message> batch = amazonSQS.receiveMessage(new ReceiveMessageRequest(getQueueUrl())).getMessages();

            for (Message message : batch) {
                JSONObject envelope = new JSONObject(message.getBody());
                JSONObject jsonMessage = new JSONObject(envelope.get("Message").toString());

                if (!jsonMessage.has("Event")) {
                    LOG.warn("Unknown event: {}", message.toString());
                    amazonSQS.deleteMessage(new DeleteMessageRequest()
                            .withQueueUrl(getQueueUrl())
                            .withReceiptHandle(message.getReceiptHandle()));
                    continue;
                }

                AutoScalingEvents event = AutoScalingEvents.fromString((String) jsonMessage.get("Event"));
                switch (event) {
                    case EC2_INSTANCE_TERMINATE: {
                        if (eventsRepository.findOne(message.getMessageId()) != null) {
                            // An entry already exists for this message id: nothing is
                            // published and the message is not deleted.
                            break;
                        }
                        clusterEventPublisher.nodeTerminated((String) jsonMessage.get("EC2InstanceId"),
                                message.getMessageId());
                        amazonSQS.deleteMessage(new DeleteMessageRequest()
                                .withQueueUrl(getQueueUrl())
                                .withReceiptHandle(message.getReceiptHandle()));
                        break;
                    }
                    default: {
                        LOG.warn("New AutoScaling event: {}", message.toString());
                        amazonSQS.deleteMessage(new DeleteMessageRequest()
                                .withQueueUrl(getQueueUrl())
                                .withReceiptHandle(message.getReceiptHandle()));
                        break;
                    }
                }
            }
        } catch (Exception e) {
            LOG.error("Unable to process AutoScaling event", e);
        }
        sleep();
    }
}
/**
 * Deletes the given message from the queue, logging it at debug level first.
 *
 * @param message the message to delete
 */
private void deleteMessage(Message message) {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("Delete message " + message);
    }

    DeleteMessageRequest request = new DeleteMessageRequest(queueUrl, message.getReceiptHandle());
    sqs.deleteMessage(request);
}
@Test public void messageShouldBeDeletedAfterBeingConsumed() throws Exception { //GIVEN ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(); Message message1 = new Message() .withMessageId("aaaa-bbbb-cccc-dddd-eeee") .withBody("Sample test message") .withReceiptHandle("qwertz"); Message message2 = new Message() .withMessageId("ffff-gggg-hhhh-iiii-jjjj") .withBody("Another sample test message") .withReceiptHandle("asdfgh"); receiveMessageResult.setMessages(Lists.newArrayList(message1, message2)); // simulate at the 2nd call that the message has been deleted on SQS side when(sqs.receiveMessage((ReceiveMessageRequest) anyObject())) .thenReturn(receiveMessageResult) .thenReturn(new ReceiveMessageResult()); DeleteMessageRequest deleteMessageRequest = new DeleteMessageRequest(queueUrl, "asdfgh"); doNothing().when(sqs).deleteMessage(deleteMessageRequest); //WHEN receiverHandler.start(); //THEN Thread.sleep(100); verify(sqs, times(1)).deleteMessage(deleteMessageRequest); }
@Override public void poll(long waitInterval ) { Properties properties = new Properties(); String access_key_id = getProperty("AccessKeyId"); String secret_access_key = getProperty("SecretAccessKey"); BasicAWSCredentials credentials = new BasicAWSCredentials(access_key_id, secret_access_key); AmazonSQS sqs = new AmazonSQSClient(credentials); // Region selection Region region = Region.getRegion(Regions.fromName(getProperty("region"))); sqs.setRegion(region); GetQueueUrlResult queueUrl = sqs.getQueueUrl(getProperty("Queue")); ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl.getQueueUrl()); List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages(); String outputMessage = ""; // if there are messages then do the processing if(messages.size() > 0){ //append the message properties to the localenv tree for (Message message : messages) { properties.setProperty("MessageId", message.getMessageId()); properties.setProperty("ReceiptHandle", message.getReceiptHandle()); properties.setProperty("MD5OfBody", message.getMD5OfBody()); // get the message body to a string outputMessage = message.getBody(); } properties.setProperty("queueUrl", queueUrl.getQueueUrl()); // delete the message from the queue String messageReceiptHandle = messages.get(0).getReceiptHandle(); sqs.deleteMessage(new DeleteMessageRequest(queueUrl.getQueueUrl(), messageReceiptHandle)); ConnectorCallback callback = getCallback(); callback.processInboundData(outputMessage.getBytes(), properties); } }
/**
 * Sends one message to a freshly created queue, receives it and deletes it with the
 * receipt handle returned by the receive call; the delete must not throw
 * {@code ReceiptHandleIsInvalidException}.
 */
@Test
public void deleteMessageSucceedsWithValidReceiptHandle() {
    final String queueName = "bizo";
    final String messageBody = "hi everybody";

    sqs.createQueue(new CreateQueueRequest().withQueueName(queueName));

    final GetQueueUrlResult urlResult = sqs.getQueueUrl(new GetQueueUrlRequest().withQueueName(queueName));
    final String queueUrl = urlResult.getQueueUrl();

    sqs.sendMessage(new SendMessageRequest().withQueueUrl(queueUrl).withMessageBody(messageBody));

    final int maxNumberOfMessages = 10;
    final ReceiveMessageResult received = sqs.receiveMessage(
            new ReceiveMessageRequest().withQueueUrl(queueUrl).withMaxNumberOfMessages(maxNumberOfMessages));
    final List<Message> messages = received.getMessages();

    assertThat(messages.size(), equalTo(1));

    final DeleteMessageRequest deleteRequest = new DeleteMessageRequest()
            .withQueueUrl(queueUrl)
            .withReceiptHandle(messages.get(0).getReceiptHandle());

    try {
        sqs.deleteMessage(deleteRequest);
    } catch (ReceiptHandleIsInvalidException e) {
        fail("ReceiptHandleIsInvalidException was thrown");
    }
}
/**
 * Sends and receives one message, then issues a delete with a receipt handle that
 * did not come from the receive call; the test expects
 * {@code ReceiptHandleIsInvalidException}.
 */
@Test(expected = ReceiptHandleIsInvalidException.class)
public void deleteMessageFailsWithInvalidReceiptHandle() {
    final String queueName = "bizo";
    final String messageBody = "hi everybody";

    sqs.createQueue(new CreateQueueRequest().withQueueName(queueName));

    final GetQueueUrlResult urlResult = sqs.getQueueUrl(new GetQueueUrlRequest().withQueueName(queueName));
    final String queueUrl = urlResult.getQueueUrl();

    sqs.sendMessage(new SendMessageRequest().withQueueUrl(queueUrl).withMessageBody(messageBody));

    final ReceiveMessageResult received = sqs.receiveMessage(new ReceiveMessageRequest().withQueueUrl(queueUrl));
    final List<Message> messages = received.getMessages();

    assertThat(messages.size(), equalTo(1));

    // Deliberately not the handle returned by the receive call above.
    final String bogusReceiptHandle = "bizo";
    sqs.deleteMessage(new DeleteMessageRequest()
            .withQueueUrl(queueUrl)
            .withReceiptHandle(bogusReceiptHandle));
}
/**
 * Deletes the given message from the queue whose URL is configured under
 * {@code ConfigProps.TRANSCODE_QUEUE}.
 *
 * @param message the message to delete
 */
protected void deleteMessage(final Message message) {
    String transcodeQueueUrl = config.getProperty(ConfigProps.TRANSCODE_QUEUE);
    String receiptHandle = message.getReceiptHandle();

    sqsClient.deleteMessage(new DeleteMessageRequest()
            .withQueueUrl(transcodeQueueUrl)
            .withReceiptHandle(receiptHandle));
}
/**
 * Drains the collected receipt handles, issuing one delete request per handle.
 * A failure on one handle is printed and does not stop the remaining deletes.
 *
 * @throws Exception declared by the overridden method
 */
@Override
public void delete() throws Exception {
    while (this.receipts.size() > 0) {
        try {
            String handle = this.receipts.poll();
            this.client.deleteMessage(new DeleteMessageRequest()
                    .withQueueUrl(this.queueUrl)
                    .withReceiptHandle(handle));
        } catch (Throwable e) {
            // Best effort: report the failure and continue with the next handle.
            e.printStackTrace();
        }
    }
}
/**
 * Deletes the message whose receipt handle is stored in {@code data} under
 * {@code ITEM_RECEIPT}. A failure of the delete call is printed and swallowed.
 *
 * @param data item carrying the receipt handle
 * @throws Exception if the receipt cannot be read from {@code data}
 */
@Override
public void delete(JSONObject data) throws Exception {
    String handle = data.getString(ITEM_RECEIPT);
    try {
        this.client.deleteMessage(new DeleteMessageRequest()
                .withQueueUrl(this.queueUrl)
                .withReceiptHandle(handle));
    } catch (Throwable e) {
        e.printStackTrace();
    }
}
/**
 * Deletes the queue message behind the given notification, identified by the
 * notification's receipt handle.
 *
 * @param notification the notification whose message should be deleted
 */
public void deleteNotification(CloudtrailSNSNotification notification) {
    LOG.debug("Deleting SQS CloudTrail notification <{}>.", notification.getReceiptHandle());

    // NOTE(review): queueName is passed as the queue URL - confirm it holds a URL.
    DeleteMessageRequest request = new DeleteMessageRequest()
            .withQueueUrl(queueName)
            .withReceiptHandle(notification.getReceiptHandle());
    sqs.deleteMessage(request);
}
@Test public void executeMessage_successfulExecution_shouldRemoveMessageFromQueue() throws Exception { // Arrange CountDownLatch countDownLatch = new CountDownLatch(1); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer() { @Override protected void executeMessage(org.springframework.messaging.Message<String> stringMessage) { countDownLatch.countDown(); super.executeMessage(stringMessage); } }; AmazonSQSAsync sqs = mock(AmazonSQSAsync.class); container.setAmazonSqs(sqs); QueueMessageHandler messageHandler = new QueueMessageHandler(); container.setMessageHandler(messageHandler); StaticApplicationContext applicationContext = new StaticApplicationContext(); applicationContext.registerSingleton("testMessageListener", TestMessageListener.class); mockGetQueueUrl(sqs, "testQueue", "http://executeMessage_successfulExecution_shouldRemoveMessageFromQueue.amazonaws.com"); mockGetQueueAttributesWithEmptyResult(sqs, "http://executeMessage_successfulExecution_shouldRemoveMessageFromQueue.amazonaws.com"); messageHandler.setApplicationContext(applicationContext); messageHandler.afterPropertiesSet(); container.afterPropertiesSet(); mockReceiveMessage(sqs, "http://executeMessage_successfulExecution_shouldRemoveMessageFromQueue.amazonaws.com", "messageContent", "ReceiptHandle"); // Act container.start(); // Assert assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS)); container.stop(); verify(sqs, times(1)).deleteMessageAsync(eq(new DeleteMessageRequest("http://executeMessage_successfulExecution_shouldRemoveMessageFromQueue.amazonaws.com", "ReceiptHandle"))); }
/**
 * Deletes a message from the SQS queue whose URL is given by the configuration.
 * <p>
 * Progress is reported before the delete is attempted and ended afterwards with the
 * outcome. An {@link AmazonServiceException} is passed to
 * {@code LibraryUtils.handleException} and counts as a failed delete.
 *
 * @param sqsMessage the {@link Message} that you want to delete.
 * @param progressStatus {@link ProgressStatus} tracks the start and end status.
 */
public void deleteMessageFromQueue(Message sqsMessage, ProgressStatus progressStatus) {
    final Object reportObject = progressReporter.reportStart(progressStatus);

    boolean deleted;
    try {
        DeleteMessageRequest request = new DeleteMessageRequest(config.getSqsUrl(), sqsMessage.getReceiptHandle());
        sqsClient.deleteMessage(request);
        deleted = true;
    } catch (AmazonServiceException e) {
        LibraryUtils.handleException(exceptionHandler, progressStatus, e, "Failed to delete sqs message.");
        deleted = false;
    }

    LibraryUtils.endToProcess(progressReporter, deleted, progressStatus, reportObject);
}
/**
 * Acknowledges the consumed message by calling <code>deleteMessage</code> with the
 * message's queue URL and receipt handle, after checking that the session is not
 * closed.
 */
@Override
public void acknowledge(SQSMessage message) throws JMSException {
    session.checkClosed();

    DeleteMessageRequest deleteRequest =
            new DeleteMessageRequest(message.getQueueUrl(), message.getReceiptHandle());
    amazonSQSClient.deleteMessage(deleteRequest);
}
/**
 * Acknowledges the consumed message by calling <code>deleteMessage</code> and then
 * removes its receipt handle from the unacknowledged messages. The session is
 * checked for being closed first.
 */
@Override
public void acknowledge(SQSMessage message) throws JMSException {
    session.checkClosed();

    DeleteMessageRequest deleteRequest =
            new DeleteMessageRequest(message.getQueueUrl(), message.getReceiptHandle());
    amazonSQSClient.deleteMessage(deleteRequest);

    unAckMessages.remove(message.getReceiptHandle());
}
/**
 * The wrapper must surface a {@code JMSException} when the underlying client throws
 * an {@code AmazonClientException} from deleteMessage.
 */
@Test(expected = JMSException.class)
public void testDeleteMessageThrowAmazonClientException() throws JMSException {
    DeleteMessageRequest request = new DeleteMessageRequest();

    doThrow(new AmazonClientException("ace"))
            .when(amazonSQSClient)
            .deleteMessage(eq(request));

    wrapper.deleteMessage(request);
}
/**
 * The wrapper must surface a {@code JMSException} when the underlying client throws
 * an {@code AmazonServiceException} from deleteMessage.
 */
@Test(expected = JMSException.class)
public void testDeleteMessageThrowAmazonServiceException() throws JMSException {
    DeleteMessageRequest request = new DeleteMessageRequest();

    doThrow(new AmazonServiceException("ase"))
            .when(amazonSQSClient)
            .deleteMessage(eq(request));

    wrapper.deleteMessage(request);
}
/**
 * Test acknowledging message with auto acknowledger: exactly one delete request
 * must be sent, carrying the message's queue URL and receipt handle.
 */
@Test
public void testAcknowledge() throws Exception {

    // Set up message mock
    SQSMessage mockedMessage = mock(SQSMessage.class);
    when(mockedMessage.getQueueUrl()).thenReturn(QUEUE_URL);
    when(mockedMessage.getReceiptHandle()).thenReturn(RECEIPT_HANDLE);

    // Use the acknowledger to ack the message
    acknowledger.acknowledge(mockedMessage);

    // Verify results
    ArgumentCaptor<DeleteMessageRequest> captor = ArgumentCaptor.forClass(DeleteMessageRequest.class);
    verify(amazonSQSClient).deleteMessage(captor.capture());

    assertEquals(1, captor.getAllValues().size());

    DeleteMessageRequest sent = captor.getAllValues().get(0);
    assertEquals(QUEUE_URL, sent.getQueueUrl());
    assertEquals(RECEIPT_HANDLE, sent.getReceiptHandle());
}
/**
 * Test acknowledge does not impact messages that were not specifically acknowledged:
 * messages are acknowledged one at a time in random order and the number of
 * unacknowledged messages must drop by exactly one each time. Finally one delete
 * request per populated message must have been sent.
 */
@Test
public void testAcknowledge() throws JMSException {
    int populateMessageSize = 37;
    populateMessage(populateMessageSize);

    List<SQSMessage> remaining = new ArrayList<SQSMessage>(populatedMessages);
    int acknowledged = 0;

    while (!remaining.isEmpty()) {
        int index = new Random().nextInt(remaining.size());
        SQSMessage message = remaining.remove(index);
        message.acknowledge();
        acknowledged++;
        assertEquals(populateMessageSize - acknowledged, acknowledger.getUnAckMessages().size());
    }

    assertEquals(0, acknowledger.getUnAckMessages().size());

    ArgumentCaptor<DeleteMessageRequest> captor = ArgumentCaptor.forClass(DeleteMessageRequest.class);
    verify(amazonSQSClient, times(populateMessageSize)).deleteMessage(captor.capture());

    for (SQSMessage msg : populatedMessages) {
        DeleteMessageRequest expected = new DeleteMessageRequest()
                .withQueueUrl(msg.getQueueUrl())
                .withReceiptHandle(msg.getReceiptHandle());
        assertTrue(captor.getAllValues().contains(expected));
    }
}
/**
 * Deletes the message with the given receipt handle from the given queue.
 * <p>
 * An {@code AbortedException} is logged at info level and otherwise ignored; any
 * other {@code AmazonClientException} is wrapped in a {@code MessagingException}.
 *
 * @param messageReceiptHandle receipt handle of the message to delete
 * @param messageQueueUrl      URL of the queue holding the message
 * @throws MessagingException if the delete call fails for a reason other than abort
 */
private void deleteMessageFromQueue(String messageReceiptHandle, String messageQueueUrl) throws MessagingException {
    DeleteMessageRequest request = new DeleteMessageRequest(messageQueueUrl, messageReceiptHandle);
    try {
        amazonSQS.deleteMessage(request);
    } catch (AbortedException e) {
        LOG.info("Client abort delete message.");
    } catch (AmazonClientException ase) {
        String reason = "Failed to delete message with receipt handle " + messageReceiptHandle
                + " from queue " + messageQueueUrl;
        throw new MessagingException(reason, ase);
    }
}
/**
 * Deletes a message from queue.
 *
 * @param message Received message.
 * @throws MessagingException Failed to delete message.
 */
@Override
public void delete(PolledMessage<M> message) throws MessagingException {
    // NOTE(review): getMessageId() is passed in the receipt-handle position of the
    // request - presumably PolledMessage stores the receipt handle as its id; confirm.
    DeleteMessageRequest request = new DeleteMessageRequest(queueUrl, message.getMessageId());
    try {
        amazonSQS.deleteMessage(request);
    } catch (AmazonClientException ase) {
        String reason = "Failed to delete message with id " + message.getMessageId();
        throw new MessagingException(reason, ase);
    }
}
@Test public void shouldDeletePolledMessagesAfterHavingPassedThemToTheMessageHandler() throws Exception { // Arrange Message msg1 = createMessage("testReceiptHandle"); receivedMessages.add(msg1); // Act queueProcessor.poll(); // Assert ArgumentCaptor<DeleteMessageRequest> argumentCaptor = ArgumentCaptor.forClass(DeleteMessageRequest.class); verify(mockAmazonSQS).deleteMessage(argumentCaptor.capture()); assertEquals("testReceiptHandle", argumentCaptor.getValue().getReceiptHandle()); }
@Test public void shouldNotDeletePolledMessagesIfTheHandlerThrowsAnException() throws Exception { // Arrange Message msg1 = createMessage("msg1"); receivedMessages.add(msg1); doThrow(new Exception()).when(mockHandler).handle(msg1); // Act queueProcessor.poll(); // Assert verify(mockHandler).handle(msg1); verify(mockAmazonSQS, never()).deleteMessage(any(DeleteMessageRequest.class)); }
/**
 * Deletes the message identified by the request's receipt handle from the queue
 * resolved from the request's queue URL.
 *
 * @param deleteMessageRequest request naming the queue URL and receipt handle
 * @return a new, empty {@link DeleteMessageResult}
 * @throws AmazonServiceException wrapping any {@link IOException} raised while
 *                                resolving the queue or deleting the message
 */
@Override
public DeleteMessageResult deleteMessage(DeleteMessageRequest deleteMessageRequest) throws AmazonClientException {
    try {
        DirectorySQSQueue target = getQueueFromUrl(deleteMessageRequest.getQueueUrl(), false);
        target.delete(deleteMessageRequest.getReceiptHandle());
    } catch (IOException e) {
        throw new AmazonServiceException("error deleting message", e);
    }
    return new DeleteMessageResult();
}
/**
 * Deletes the given message from the queue it originated from.
 *
 * @param message the message to delete; must be an {@code OriginatingMessage}
 * @throws RuntimeException if the message is of any other type
 */
@Override
public void delete(Message message) {
    if (!(message instanceof OriginatingMessage)) {
        throw new RuntimeException("Unsupported message type: " + message.getBody());
    }

    OriginatingMessage origin = (OriginatingMessage) message;
    DeleteMessageRequest request =
            new DeleteMessageRequest(origin.getOriginatingQueueUrl(), message.getReceipt());
    sqs.deleteMessage(request);
}
/**
 * Acknowledges a message by deleting it from the queue, using the receipt handle
 * found in the {@code SqsHeaders.MSG_RECEIPT_HANDLE} header.
 * <p>
 * No delete is issued when there is no client or the handle is missing or empty.
 *
 * @param message the message to acknowledge
 * @return the receipt handle read from the headers, possibly {@code null}
 */
public String acknowlegdeReceipt(Message<?> message) {
    String receiptHandle = (String) message.getHeaders().get(SqsHeaders.MSG_RECEIPT_HANDLE);

    boolean canDelete = sqsClient != null
            && receiptHandle != null
            && !receiptHandle.isEmpty();
    if (canDelete) {
        sqsClient.deleteMessage(new DeleteMessageRequest(queueUrl, receiptHandle));
    }

    return receiptHandle;
}
@Test public void testSendChangeVisibilityReceiveDeleteMessage_shouldSendChangeVisibilityReceiveAndDeleteMessage() { // create queue CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue")); // send message String messageBody = "{\"life-universe-everything\":42}"; SendMessageResult sendResult = sqs.sendMessage(new SendMessageRequest().withDelaySeconds(0).withMessageBody(messageBody) .withMessageGroupId("some-group-id-123").withQueueUrl(createdQueue.getQueueUrl())); assertNotNull("message sending returned ok", sendResult); assertNotNull("verify body MD5 exists",sendResult.getMD5OfMessageBody()); assertNotNull("verify message id exists",sendResult.getMessageId()); // receive message ReceiveMessageResult messageResult = sqs.receiveMessage(new ReceiveMessageRequest() .withMaxNumberOfMessages(3).withQueueUrl(createdQueue.getQueueUrl()).withVisibilityTimeout(10) .withWaitTimeSeconds(0)); assertNotNull("verify received message returned ok",messageResult); assertEquals("verify correct receive count", 1, messageResult.getMessages().size()); Message firstMessage = messageResult.getMessages().get(0); assertEquals("verify correct body returned",messageBody,firstMessage.getBody()); assertEquals("verify correct message MD5",getAwsMessageMD5(messageBody),firstMessage.getMD5OfBody()); assertNotNull("verify message id exists",firstMessage.getMessageId()); assertNotNull("verify receipt handle exists",firstMessage.getReceiptHandle()); // extend visibility timeout ChangeMessageVisibilityResult visibilityResult = sqs.changeMessageVisibility(new ChangeMessageVisibilityRequest() .withQueueUrl(createdQueue.getQueueUrl()).withReceiptHandle(firstMessage.getReceiptHandle()).withVisibilityTimeout(40)); assertNotNull("changing visibility returned ok", visibilityResult); // verify if message is invisible ReceiveMessageResult emptyResult = sqs.receiveMessage(new ReceiveMessageRequest() 
.withMaxNumberOfMessages(1).withQueueUrl(createdQueue.getQueueUrl()).withVisibilityTimeout(20) .withWaitTimeSeconds(0)); assertTrue("at visibility timeout the message should not be available.", emptyResult.getMessages().isEmpty()); // delete message from queue DeleteMessageResult deleteResult = sqs.deleteMessage(new DeleteMessageRequest() .withQueueUrl(createdQueue.getQueueUrl()).withReceiptHandle(firstMessage.getReceiptHandle())); assertNotNull("verify deletion returned ok",deleteResult); assertTrue("queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getMessageQueue().isEmpty()); assertTrue("invisibility-queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getInvisibilityQueueFor(firstMessage.getReceiptHandle()).isEmpty()); // cleanup getQueues().remove("tea-earl-grey-queue"); }
/**
 * A delete request without queue URL and receipt handle must still yield a
 * non-null result.
 */
@Test
public void testDeleteMessage_withEmptyRequestParams_shouldWork() {
    DeleteMessageRequest emptyRequest = new DeleteMessageRequest();

    assertNotNull(sqs.deleteMessage(emptyRequest));
}
/**
 * Starts an asynchronous delete on the underlying client and exposes its pending
 * result as an {@code Observable}.
 *
 * @param request the delete request to send
 * @return an observable created from the client's asynchronous result
 */
public Observable<DeleteMessageResult> deleteMessageAsync(DeleteMessageRequest request) {
    return Observable.from(
            sqsClient.deleteMessageAsync(request));
}
/**
 * <p>
 * Deletes the specified message from the specified queue and, when necessary,
 * also deletes the message payload from Amazon S3. The message is identified by
 * its <code>receipt handle</code>, not by the <code>message ID</code> returned
 * when the message was sent. The message is deleted even if it is currently
 * locked by another reader because of the visibility timeout. Messages left in
 * the queue for longer than the queue's configured retention period are deleted
 * automatically by Amazon SQS.
 * </p>
 * <p>
 * <b>NOTE:</b> A receipt handle belongs to one specific receive of the message;
 * every further receive of the same message yields a different handle. If the
 * handle passed to DeleteMessage is not the most recently received one, the
 * request still succeeds, but the message might not be deleted.
 * </p>
 * <p>
 * <b>IMPORTANT:</b> A message may be received again even after it has been
 * deleted. This can happen on rare occasions when one of the servers storing a
 * copy of the message is unavailable at the time of the delete request: the copy
 * remains on that server and might be returned by a later receive request.
 * Design your system to be idempotent so that receiving a particular message
 * more than once is not a problem.
 * </p>
 * <p>
 * The user agent header of this extended client is appended to the request. When
 * large payload support is enabled, the receipt handle set on the passed request
 * is replaced before the request is forwarded to the superclass.
 * </p>
 *
 * @param deleteMessageRequest
 *            Container for the necessary parameters to execute the
 *            DeleteMessage service method on AmazonSQS.
 *
 * @return The response from the DeleteMessage service method, as returned
 *         by AmazonSQS.
 *
 * @throws ReceiptHandleIsInvalidException
 * @throws InvalidIdFormatException
 *
 * @throws AmazonClientException
 *             If any internal errors are encountered inside the client
 *             while attempting to make the request or handle the response.
 *             For example if a network connection is not available. Also
 *             thrown when {@code deleteMessageRequest} is {@code null}.
 * @throws AmazonServiceException
 *             If an error response is returned by AmazonSQS indicating
 *             either a problem with the data in the request, or a server
 *             side issue.
 */
public DeleteMessageResult deleteMessage(DeleteMessageRequest deleteMessageRequest) {

    if (deleteMessageRequest == null) {
        String errorMessage = "deleteMessageRequest cannot be null.";
        LOG.error(errorMessage);
        throw new AmazonClientException(errorMessage);
    }

    deleteMessageRequest.getRequestClientOptions().appendUserAgent(SQSExtendedClientConstants.USER_AGENT_HEADER);

    if (clientConfiguration.isLargePayloadSupportEnabled()) {
        String suppliedHandle = deleteMessageRequest.getReceiptHandle();
        String sqsHandle = suppliedHandle;

        // For an S3 receipt handle, delete the S3 payload first and continue with
        // the original receipt handle.
        if (isS3ReceiptHandle(suppliedHandle)) {
            deleteMessagePayloadFromS3(suppliedHandle);
            sqsHandle = getOrigReceiptHandle(suppliedHandle);
        }

        deleteMessageRequest.setReceiptHandle(sqsHandle);
    }

    return super.deleteMessage(deleteMessageRequest);
}