@Test public void shutdownOfSqsAndS3FactoryCreatedClientsOccursWhenS3DeleteObjectFails() { AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); AmazonS3Client s3 = Mockito.mock(AmazonS3Client.class); String s3Id = "123"; SqsMessage m = new SqsMessage(RECEIPT_HANDLE, new byte[] {}, 1000, Optional.of(s3Id), new SqsMessage.Service(Optional.of(() -> s3), () -> sqs, Optional.of(s3), sqs, QUEUE, Optional.of(BUCKET))); Mockito.when(sqs.deleteMessage(QUEUE, RECEIPT_HANDLE)) .thenReturn(new DeleteMessageResult()); Mockito.doThrow(new RuntimeException()).when(s3).deleteObject(BUCKET, s3Id); try { m.deleteMessage(Client.FROM_FACTORY); Assert.fail(); } catch (RuntimeException e) { // do nothing } InOrder inorder = Mockito.inOrder(sqs, s3); inorder.verify(s3, Mockito.times(1)).deleteObject(BUCKET, s3Id); inorder.verify(s3, Mockito.times(1)).shutdown(); inorder.verify(sqs, Mockito.times(1)).shutdown(); Mockito.verifyNoMoreInteractions(sqs, s3); }
@Test public void deleteMessageFromFactoryWhenS3FactoryExists() { AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); AmazonS3Client s3 = Mockito.mock(AmazonS3Client.class); String s3Id = "123"; SqsMessage m = new SqsMessage(RECEIPT_HANDLE, new byte[] {}, 1000, Optional.of(s3Id), new SqsMessage.Service(Optional.of(() -> s3), () -> sqs, Optional.of(s3), sqs, QUEUE, Optional.of(BUCKET))); Mockito.when(sqs.deleteMessage(QUEUE, RECEIPT_HANDLE)) .thenReturn(new DeleteMessageResult()); m.deleteMessage(Client.FROM_FACTORY); InOrder inorder = Mockito.inOrder(sqs, s3); inorder.verify(s3, Mockito.times(1)).deleteObject(BUCKET, s3Id); inorder.verify(sqs, Mockito.times(1)).deleteMessage(QUEUE, RECEIPT_HANDLE); inorder.verify(s3, Mockito.times(1)).shutdown(); inorder.verify(sqs, Mockito.times(1)).shutdown(); Mockito.verifyNoMoreInteractions(sqs, s3); }
@Test public void testClientFromFactory() { AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); SqsMessage m = new SqsMessage(RECEIPT_HANDLE, new byte[] {}, 1000, Optional.empty(), new SqsMessage.Service(Optional.empty(), () -> sqs, Optional.empty(), sqs, QUEUE, Optional.empty())); Mockito.when(sqs.deleteMessage(QUEUE, RECEIPT_HANDLE)) .thenReturn(new DeleteMessageResult()); m.deleteMessage(Client.FROM_FACTORY); InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.times(1)).deleteMessage(QUEUE, RECEIPT_HANDLE); inorder.verify(sqs, Mockito.times(1)).shutdown(); Mockito.verifyNoMoreInteractions(sqs); }
@Test public void testClientFromSourceFailsThenFailsOverToFromFactory() { AmazonSQSClient sqs = Mockito.mock(AmazonSQSClient.class); SqsMessage m = new SqsMessage(RECEIPT_HANDLE, new byte[] {}, 1000, Optional.empty(), new SqsMessage.Service(Optional.empty(), () -> sqs, Optional.empty(), sqs, QUEUE, Optional.empty())); Mockito.when(sqs.deleteMessage(QUEUE, RECEIPT_HANDLE)).thenThrow(new RuntimeException()) .thenReturn(new DeleteMessageResult()); m.deleteMessage(); InOrder inorder = Mockito.inOrder(sqs); inorder.verify(sqs, Mockito.times(2)).deleteMessage(QUEUE, RECEIPT_HANDLE); inorder.verify(sqs, Mockito.times(1)).shutdown(); Mockito.verifyNoMoreInteractions(sqs); }
@Test public void testShouldCommitTheMessageSuccessfully() { final SdkHttpMetadata responseMetadata = mock(SdkHttpMetadata.class); when(responseMetadata.getHttpStatusCode()).thenReturn(200); final DeleteMessageResult deleteMessageResult = new DeleteMessageResult(); deleteMessageResult.setSdkHttpMetadata(responseMetadata); when(amazonSQS.deleteMessage(anyString(), anyString())).thenReturn(deleteMessageResult); sqsFailedEventSource.commit(new SQSFailedEvent(new FailedEvent())); verify(amazonSQS).deleteMessage(anyString(), anyString()); }
@Override public DeleteMessageResult deleteMessage(DeleteMessageRequest deleteMessageRequest) throws AmazonClientException { String receiptHandle = deleteMessageRequest.getReceiptHandle(); if (inFlight.containsKey(receiptHandle)) { ScheduledFuture inFlightTask = inFlight.get(receiptHandle); inFlightTask.cancel(true); } return new DeleteMessageResult(); }
@Override public DeleteMessageResult deleteMessage(DeleteMessageRequest deleteMessageRequest) throws AmazonClientException { try { DirectorySQSQueue queue = getQueueFromUrl(deleteMessageRequest.getQueueUrl(), false); queue.delete(deleteMessageRequest.getReceiptHandle()); return new DeleteMessageResult(); } catch (IOException e) { throw new AmazonServiceException("error deleting message", e); } }
@Test public void testSendChangeVisibilityReceiveDeleteMessage_shouldSendChangeVisibilityReceiveAndDeleteMessage() { // create queue CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue")); // send message String messageBody = "{\"life-universe-everything\":42}"; SendMessageResult sendResult = sqs.sendMessage(new SendMessageRequest().withDelaySeconds(0).withMessageBody(messageBody) .withMessageGroupId("some-group-id-123").withQueueUrl(createdQueue.getQueueUrl())); assertNotNull("message sending returned ok", sendResult); assertNotNull("verify body MD5 exists",sendResult.getMD5OfMessageBody()); assertNotNull("verify message id exists",sendResult.getMessageId()); // receive message ReceiveMessageResult messageResult = sqs.receiveMessage(new ReceiveMessageRequest() .withMaxNumberOfMessages(3).withQueueUrl(createdQueue.getQueueUrl()).withVisibilityTimeout(10) .withWaitTimeSeconds(0)); assertNotNull("verify received message returned ok",messageResult); assertEquals("verify correct receive count", 1, messageResult.getMessages().size()); Message firstMessage = messageResult.getMessages().get(0); assertEquals("verify correct body returned",messageBody,firstMessage.getBody()); assertEquals("verify correct message MD5",getAwsMessageMD5(messageBody),firstMessage.getMD5OfBody()); assertNotNull("verify message id exists",firstMessage.getMessageId()); assertNotNull("verify receipt handle exists",firstMessage.getReceiptHandle()); // extend visibility timeout ChangeMessageVisibilityResult visibilityResult = sqs.changeMessageVisibility(new ChangeMessageVisibilityRequest() .withQueueUrl(createdQueue.getQueueUrl()).withReceiptHandle(firstMessage.getReceiptHandle()).withVisibilityTimeout(40)); assertNotNull("changing visibility returned ok", visibilityResult); // verify if message is invisible ReceiveMessageResult emptyResult = sqs.receiveMessage(new ReceiveMessageRequest() .withMaxNumberOfMessages(1).withQueueUrl(createdQueue.getQueueUrl()).withVisibilityTimeout(20) .withWaitTimeSeconds(0)); assertTrue("at visibility timeout the message should not be available.", emptyResult.getMessages().isEmpty()); // delete message from queue DeleteMessageResult deleteResult = sqs.deleteMessage(new DeleteMessageRequest() .withQueueUrl(createdQueue.getQueueUrl()).withReceiptHandle(firstMessage.getReceiptHandle())); assertNotNull("verify deletion returned ok",deleteResult); assertTrue("queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getMessageQueue().isEmpty()); assertTrue("invisibility-queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getInvisibilityQueueFor(firstMessage.getReceiptHandle()).isEmpty()); // cleanup getQueues().remove("tea-earl-grey-queue"); }
public Observable<DeleteMessageResult> deleteMessageAsync(DeleteMessageRequest request) { return Observable.from(sqsClient.deleteMessageAsync(request)); }
public Observable<DeleteMessageResult> deleteMessageAsync(String queueUrl, String receiptHandle) { return Observable.from(sqsClient.deleteMessageAsync(queueUrl, receiptHandle)); }
/** * <p> * Deletes the specified message from the specified queue and deletes the * message payload from Amazon S3 when necessary. You specify the message by * using the message's <code>receipt handle</code> and not the * <code>message ID</code> you received when you sent the message. Even if * the message is locked by another reader due to the visibility timeout * setting, it is still deleted from the queue. If you leave a message in * the queue for longer than the queue's configured retention period, Amazon * SQS automatically deletes it. * </p> * <p> * <b>NOTE:</b> The receipt handle is associated with a specific instance of * receiving the message. If you receive a message more than once, the * receipt handle you get each time you receive the message is different. * When you request DeleteMessage, if you don't provide the most recently * received receipt handle for the message, the request will still succeed, * but the message might not be deleted. * </p> * <p> * <b>IMPORTANT:</b> It is possible you will receive a message even after * you have deleted it. This might happen on rare occasions if one of the * servers storing a copy of the message is unavailable when you request to * delete the message. The copy remains on the server and might be returned * to you again on a subsequent receive request. You should create your * system to be idempotent so that receiving a particular message more than * once is not a problem. * </p> * * @param deleteMessageRequest * Container for the necessary parameters to execute the * DeleteMessage service method on AmazonSQS. * * @return The response from the DeleteMessage service method, as returned * by AmazonSQS. * * @throws ReceiptHandleIsInvalidException * @throws InvalidIdFormatException * * @throws AmazonClientException * If any internal errors are encountered inside the client * while attempting to make the request or handle the response. * For example if a network connection is not available. * @throws AmazonServiceException * If an error response is returned by AmazonSQS indicating * either a problem with the data in the request, or a server * side issue. */ public DeleteMessageResult deleteMessage(DeleteMessageRequest deleteMessageRequest) { if (deleteMessageRequest == null) { String errorMessage = "deleteMessageRequest cannot be null."; LOG.error(errorMessage); throw new AmazonClientException(errorMessage); } deleteMessageRequest.getRequestClientOptions().appendUserAgent(SQSExtendedClientConstants.USER_AGENT_HEADER); if (!clientConfiguration.isLargePayloadSupportEnabled()) { return super.deleteMessage(deleteMessageRequest); } String receiptHandle = deleteMessageRequest.getReceiptHandle(); String origReceiptHandle = receiptHandle; if (isS3ReceiptHandle(receiptHandle)) { deleteMessagePayloadFromS3(receiptHandle); origReceiptHandle = getOrigReceiptHandle(receiptHandle); } deleteMessageRequest.setReceiptHandle(origReceiptHandle); return super.deleteMessage(deleteMessageRequest); }
/** * <p> * Deletes the specified message from the specified queue. You specify * the message by using the message's <code>receipt handle</code> and not * the <code>message ID</code> you received when you sent the message. * Even if the message is locked by another reader due to the visibility * timeout setting, it is still deleted from the queue. If you leave a * message in the queue for longer than the queue's configured retention * period, Amazon SQS automatically deletes it. * </p> * <p> * <b>NOTE:</b> The receipt handle is associated with a specific * instance of receiving the message. If you receive a message more than * once, the receipt handle you get each time you receive the message is * different. When you request DeleteMessage, if you don't provide the * most recently received receipt handle for the message, the request * will still succeed, but the message might not be deleted. * </p> * <p> * <b>IMPORTANT:</b> It is possible you will receive a message even * after you have deleted it. This might happen on rare occasions if one * of the servers storing a copy of the message is unavailable when you * request to delete the message. The copy remains on the server and * might be returned to you again on a subsequent receive request. You * should create your system to be idempotent so that receiving a * particular message more than once is not a problem. * </p> * * @param deleteMessageRequest Container for the necessary parameters to * execute the DeleteMessage service method on AmazonSQS. * * @return The response from the DeleteMessage service method, as returned * by AmazonSQS. * * @throws ReceiptHandleIsInvalidException * @throws InvalidIdFormatException * * @throws AmazonClientException * If any internal errors are encountered inside the client while * attempting to make the request or handle the response. For example * if a network connection is not available. * @throws AmazonServiceException * If an error response is returned by AmazonSQS indicating * either a problem with the data in the request, or a server side issue. */ public DeleteMessageResult deleteMessage(DeleteMessageRequest deleteMessageRequest) { return amazonSqsToBeExtended.deleteMessage(deleteMessageRequest); }
/** * <p> * Deletes the specified message from the specified queue. You specify the * message by using the message's <code>receipt handle</code> and not the * <code>message ID</code> you received when you sent the message. Even if * the message is locked by another reader due to the visibility timeout * setting, it is still deleted from the queue. If you leave a message in * the queue for longer than the queue's configured retention period, Amazon * SQS automatically deletes it. * </p> * <p> * <b>NOTE:</b> The receipt handle is associated with a specific instance of * receiving the message. If you receive a message more than once, the * receipt handle you get each time you receive the message is different. * When you request DeleteMessage, if you don't provide the most recently * received receipt handle for the message, the request will still succeed, * but the message might not be deleted. * </p> * <p> * <b>IMPORTANT:</b> It is possible you will receive a message even after * you have deleted it. This might happen on rare occasions if one of the * servers storing a copy of the message is unavailable when you request to * delete the message. The copy remains on the server and might be returned * to you again on a subsequent receive request. You should create your * system to be idempotent so that receiving a particular message more than * once is not a problem. * </p> * * @param queueUrl * The URL of the Amazon SQS queue to take action on. * @param receiptHandle * The receipt handle associated with the message to delete. * * @return The response from the DeleteMessage service method, as returned * by AmazonSQS. * * @throws ReceiptHandleIsInvalidException * @throws InvalidIdFormatException * * @throws AmazonClientException * If any internal errors are encountered inside the client * while attempting to make the request or handle the response. * For example if a network connection is not available. * @throws AmazonServiceException * If an error response is returned by AmazonSQS indicating * either a problem with the data in the request, or a server * side issue. */ public DeleteMessageResult deleteMessage(String queueUrl, String receiptHandle) throws AmazonServiceException, AmazonClientException { return amazonSqsToBeExtended.deleteMessage(queueUrl, receiptHandle); }
/** * <p> * Deletes the specified message from the specified queue and deletes the * message payload from Amazon S3 when necessary. You specify the message by * using the message's <code>receipt handle</code> and not the * <code>message ID</code> you received when you sent the message. Even if * the message is locked by another reader due to the visibility timeout * setting, it is still deleted from the queue. If you leave a message in * the queue for longer than the queue's configured retention period, Amazon * SQS automatically deletes it. * </p> * <p> * <b>NOTE:</b> The receipt handle is associated with a specific instance of * receiving the message. If you receive a message more than once, the * receipt handle you get each time you receive the message is different. * When you request DeleteMessage, if you don't provide the most recently * received receipt handle for the message, the request will still succeed, * but the message might not be deleted. * </p> * <p> * <b>IMPORTANT:</b> It is possible you will receive a message even after * you have deleted it. This might happen on rare occasions if one of the * servers storing a copy of the message is unavailable when you request to * delete the message. The copy remains on the server and might be returned * to you again on a subsequent receive request. You should create your * system to be idempotent so that receiving a particular message more than * once is not a problem. * </p> * * @param queueUrl * The URL of the Amazon SQS queue to take action on. * @param receiptHandle * The receipt handle associated with the message to delete. * * @return The response from the DeleteMessage service method, as returned * by AmazonSQS. * * @throws ReceiptHandleIsInvalidException * @throws InvalidIdFormatException * * @throws AmazonClientException * If any internal errors are encountered inside the client * while attempting to make the request or handle the response. * For example if a network connection is not available. * @throws AmazonServiceException * If an error response is returned by AmazonSQS indicating * either a problem with the data in the request, or a server * side issue. */ public DeleteMessageResult deleteMessage(String queueUrl, String receiptHandle) { DeleteMessageRequest deleteMessageRequest = new DeleteMessageRequest(queueUrl, receiptHandle); return deleteMessage(deleteMessageRequest); }