@Test public void testRun() { // given // when uut.run(); // then ArgumentCaptor<ChangeMessageVisibilityRequest> captor = ArgumentCaptor.forClass( ChangeMessageVisibilityRequest.class); verify(sqsClient).changeMessageVisibility(captor.capture()); ChangeMessageVisibilityRequest request = captor.getValue(); assertEquals("rhd", request.getReceiptHandle()); assertEquals("queue", request.getQueueUrl()); assertEquals(600, request.getVisibilityTimeout().intValue()); }
public void changeMessageVisibility(Message msg, int value) { logger.info("Change visibility to {} seconds", value); if (value > 36000) { value = 36000; } ChangeMessageVisibilityRequest request = new ChangeMessageVisibilityRequest() .withQueueUrl(this.queueUrl) .withReceiptHandle(msg.getReceiptHandle()).withVisibilityTimeout(value); this.getClient().changeMessageVisibility(request); }
VisibilityTimeoutExtender(@NonNull AmazonSQS sqsClient, @NonNull Duration newVisibilityTimeout, @NonNull Message<?> message, @NonNull String queueUrl) { this.sqsClient = sqsClient; request = new ChangeMessageVisibilityRequest().withQueueUrl(queueUrl).withReceiptHandle( message.getHeaders().get("ReceiptHandle", String.class)).withVisibilityTimeout( timeoutInSeconds(newVisibilityTimeout)); }
@Test public void longReceiveExtendsMessageVisibility() throws Exception { this.mock.expectedMessageCount(1); this.mock.whenAnyExchangeReceived(new Processor() { @Override public void process(Exchange exchange) throws Exception { // Simulate message that takes a while to receive. Thread.sleep(TIMEOUT * 1500L); // 150% of TIMEOUT. } }); Message message = new Message(); message.setBody("Message 1"); message.setMD5OfBody("6a1559560f67c5e7a7d5d838bf0272ee"); message.setMessageId("f6fb6f99-5eb2-4be4-9b15-144774141458"); message.setReceiptHandle(RECEIPT_HANDLE); this.clientMock.messages.add(message); assertMockEndpointsSatisfied(); // Wait for message to arrive. assertTrue("Expected at least one changeMessageVisibility request.", this.clientMock.changeMessageVisibilityRequests.size() >= 1); for (ChangeMessageVisibilityRequest req : this.clientMock.changeMessageVisibilityRequests) { assertEquals("https://queue.amazonaws.com/541925086079/MyQueue", req.getQueueUrl()); assertEquals(RECEIPT_HANDLE, req.getReceiptHandle()); Integer expectedTimeout = new Integer(6); // Should be 1.5 x TIMEOUT as takes into account the delay period assertEquals(expectedTimeout, req.getVisibilityTimeout()); } }
/** * Simplified method form for invoking the ChangeMessageVisibility * operation. * * @see #changeMessageVisibility(ChangeMessageVisibilityRequest) */ public ChangeMessageVisibilityResult changeMessageVisibility(String queueUrl, String receiptHandle, Integer visibilityTimeout) { ChangeMessageVisibilityRequest changeMessageVisibilityRequest = new ChangeMessageVisibilityRequest(queueUrl, receiptHandle, visibilityTimeout); return changeMessageVisibility(changeMessageVisibilityRequest); }
@Override public void changeVisibility(Integer visibilityTimeout, ResultCapture<Void> extractor) { ChangeMessageVisibilityRequest request = new ChangeMessageVisibilityRequest() .withVisibilityTimeout(visibilityTimeout); changeVisibility(request, extractor); }
@Override public Future<?> extend(int seconds) { return this.amazonSqsAsync.changeMessageVisibilityAsync(new ChangeMessageVisibilityRequest() .withQueueUrl(this.queueUrl) .withReceiptHandle(this.receiptHandle) .withVisibilityTimeout(seconds)); }
@Test public void testChangeMessageVisibility() throws JMSException { ChangeMessageVisibilityRequest changeMessageVisibilityRequest = new ChangeMessageVisibilityRequest(); wrapper.changeMessageVisibility(changeMessageVisibilityRequest); verify(amazonSQSClient).changeMessageVisibility(changeMessageVisibilityRequest); }
@Test(expected = JMSException.class) public void testChangeMessageVisibilityThrowAmazonClientException() throws JMSException { ChangeMessageVisibilityRequest changeMessageVisibilityRequest = new ChangeMessageVisibilityRequest(); doThrow(new AmazonClientException("ace")) .when(amazonSQSClient).changeMessageVisibility(eq(changeMessageVisibilityRequest)); wrapper.changeMessageVisibility(changeMessageVisibilityRequest); }
@Test(expected = JMSException.class) public void testChangeMessageVisibilityThrowAmazonServiceException() throws JMSException { ChangeMessageVisibilityRequest changeMessageVisibilityRequest = new ChangeMessageVisibilityRequest(); doThrow(new AmazonServiceException("ase")) .when(amazonSQSClient).changeMessageVisibility(eq(changeMessageVisibilityRequest)); wrapper.changeMessageVisibility(changeMessageVisibilityRequest); }
@Override public ChangeMessageVisibilityResult changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) throws AmazonClientException { try { DirectorySQSQueue queue = getQueueFromUrl(changeMessageVisibilityRequest.getQueueUrl(), false); queue.changeVisibility(changeMessageVisibilityRequest.getReceiptHandle(), changeMessageVisibilityRequest.getVisibilityTimeout()); return new ChangeMessageVisibilityResult(); } catch (IOException e) { throw new AmazonServiceException("error", e); } }
public void willReceiveMessageAfterTimeout() throws InterruptedException { final String queueUrl = someNewQueue(); final String messageBody = someMessageBody(); final SendMessageResult sendResult = _amazonSQS.sendMessage(new SendMessageRequest(queueUrl, messageBody)); Assert.assertNotNull(sendResult.getMD5OfMessageBody()); verifyReceiveEmail(sendResult.getMessageId(), queueUrl, 1); sleep(1); verifyReceiveEmail(sendResult.getMessageId(), queueUrl, 2); sleep(1); verifyReceiveNone(queueUrl); sleep(1); final String receiptHandle = verifyReceiveEmail(sendResult.getMessageId(), queueUrl, 1); _amazonSQS.changeMessageVisibility(new ChangeMessageVisibilityRequest(queueUrl, receiptHandle, 3)); sleep(2); verifyReceiveNone(queueUrl); sleep(1); verifyReceiveEmail(sendResult.getMessageId(), queueUrl, null); }
@Override public void setVisibilityTimeout(Message message, Integer visibilityTimeoutSeconds) { if (message instanceof OriginatingMessage) { OriginatingMessage originatingMessage = (OriginatingMessage) message; sqs.changeMessageVisibility(new ChangeMessageVisibilityRequest(originatingMessage.getOriginatingQueueUrl(), message.getReceipt(), visibilityTimeoutSeconds)); } else { throw new RuntimeException("Unsupported message type: " + message.getBody()); } }
@Test public void testSendChangeVisibilityReceiveDeleteMessage_shouldSendChangeVisibilityReceiveAndDeleteMessage() { // create queue CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue")); // send message String messageBody = "{\"life-universe-everything\":42}"; SendMessageResult sendResult = sqs.sendMessage(new SendMessageRequest().withDelaySeconds(0).withMessageBody(messageBody) .withMessageGroupId("some-group-id-123").withQueueUrl(createdQueue.getQueueUrl())); assertNotNull("message sending returned ok", sendResult); assertNotNull("verify body MD5 exists",sendResult.getMD5OfMessageBody()); assertNotNull("verify message id exists",sendResult.getMessageId()); // receive message ReceiveMessageResult messageResult = sqs.receiveMessage(new ReceiveMessageRequest() .withMaxNumberOfMessages(3).withQueueUrl(createdQueue.getQueueUrl()).withVisibilityTimeout(10) .withWaitTimeSeconds(0)); assertNotNull("verify received message returned ok",messageResult); assertEquals("verify correct receive count", 1, messageResult.getMessages().size()); Message firstMessage = messageResult.getMessages().get(0); assertEquals("verify correct body returned",messageBody,firstMessage.getBody()); assertEquals("verify correct message MD5",getAwsMessageMD5(messageBody),firstMessage.getMD5OfBody()); assertNotNull("verify message id exists",firstMessage.getMessageId()); assertNotNull("verify receipt handle exists",firstMessage.getReceiptHandle()); // extend visibility timeout ChangeMessageVisibilityResult visibilityResult = sqs.changeMessageVisibility(new ChangeMessageVisibilityRequest() .withQueueUrl(createdQueue.getQueueUrl()).withReceiptHandle(firstMessage.getReceiptHandle()).withVisibilityTimeout(40)); assertNotNull("changing visibility returned ok", visibilityResult); // verify if message is invisible ReceiveMessageResult emptyResult = sqs.receiveMessage(new ReceiveMessageRequest() .withMaxNumberOfMessages(1).withQueueUrl(createdQueue.getQueueUrl()).withVisibilityTimeout(20) .withWaitTimeSeconds(0)); assertTrue("at visibility timeout the message should not be available.", emptyResult.getMessages().isEmpty()); // delete message from queue DeleteMessageResult deleteResult = sqs.deleteMessage(new DeleteMessageRequest() .withQueueUrl(createdQueue.getQueueUrl()).withReceiptHandle(firstMessage.getReceiptHandle())); assertNotNull("verify deletion returned ok",deleteResult); assertTrue("queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getMessageQueue().isEmpty()); assertTrue("invisibility-queue must be empty after removal",getQueues().get("tea-earl-grey-queue").getInvisibilityQueueFor(firstMessage.getReceiptHandle()).isEmpty()); // cleanup getQueues().remove("tea-earl-grey-queue"); }
@Test public void testChangeMessageVisibility_withEmptyRequestParams_shouldWork() { assertNotNull(sqs.changeMessageVisibility(new ChangeMessageVisibilityRequest())); }
@Override public void setUnackTimeout(Message message, long unackTimeout) { int unackTimeoutInSeconds = (int) (unackTimeout / 1000); ChangeMessageVisibilityRequest request = new ChangeMessageVisibilityRequest(queueURL, message.getReceipt(), unackTimeoutInSeconds); client.changeMessageVisibility(request); }
public Observable<ChangeMessageVisibilityResult> changeMessageVisibilityAsync(ChangeMessageVisibilityRequest request) { return Observable.from(sqsClient.changeMessageVisibilityAsync(request)); }
@Override public ChangeMessageVisibilityResult changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) throws AmazonServiceException, AmazonClientException { this.changeMessageVisibilityRequests.add(changeMessageVisibilityRequest); return new ChangeMessageVisibilityResult(); }
@Override public void changeVisibility(ChangeMessageVisibilityRequest request) { changeVisibility(request, null); }
@Override public void changeVisibility(ChangeMessageVisibilityRequest request, ResultCapture<Void> extractor) { resource.performAction("ChangeVisibility", request, extractor); }
@Test public void receiveMessage_withMessageListenerMethodAndVisibilityProlonging_callsChangeMessageVisibility() throws Exception { // Arrange CountDownLatch countDownLatch = new CountDownLatch(1); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer() { @Override protected void executeMessage(org.springframework.messaging.Message<String> stringMessage) { countDownLatch.countDown(); super.executeMessage(stringMessage); } }; AmazonSQSAsync sqs = mock(AmazonSQSAsync.class); container.setAmazonSqs(sqs); QueueMessageHandler messageHandler = new QueueMessageHandler(); container.setMessageHandler(messageHandler); StaticApplicationContext applicationContext = new StaticApplicationContext(); applicationContext.registerSingleton("testListener", TestMessageListenerWithVisibilityProlong.class); mockGetQueueUrl(sqs, "testQueue", "http://receiveMessage_withMessageListenerMethodAndVisibilityProlonging_callsChangeMessageVisibility.amazonaws.com"); mockGetQueueAttributesWithEmptyResult(sqs, "http://receiveMessage_withMessageListenerMethodAndVisibilityProlonging_callsChangeMessageVisibility.amazonaws.com"); messageHandler.setApplicationContext(applicationContext); messageHandler.afterPropertiesSet(); container.afterPropertiesSet(); mockReceiveMessage(sqs, "http://receiveMessage_withMessageListenerMethodAndVisibilityProlonging_callsChangeMessageVisibility.amazonaws.com", "messageContent", "ReceiptHandle"); // Act container.start(); // Assert countDownLatch.await(1L, TimeUnit.SECONDS); verify(sqs, never()).changeMessageVisibilityAsync(any(ChangeMessageVisibilityRequest.class)); TestMessageListenerWithVisibilityProlong testMessageListenerWithVisibilityProlong = applicationContext.getBean(TestMessageListenerWithVisibilityProlong.class); testMessageListenerWithVisibilityProlong.getCountDownLatch().await(1L, TimeUnit.SECONDS); testMessageListenerWithVisibilityProlong.extend(5); verify(sqs, times(1)).changeMessageVisibilityAsync(eq(new ChangeMessageVisibilityRequest("http://receiveMessage_withMessageListenerMethodAndVisibilityProlonging_callsChangeMessageVisibility.amazonaws.com", "ReceiptHandle", 5))); container.stop(); }
@Override public void setVisibilityTimeout(Message message, Integer visibilityTimeoutSeconds) { sqs.changeMessageVisibility(new ChangeMessageVisibilityRequest(queueUrl, message.getReceipt(), visibilityTimeoutSeconds)); }
/** * <p> * Changes the visibility timeout of a specified message in a queue to a new * value. The maximum allowed timeout value you can set the value to is 12 * hours. This means you can't extend the timeout of a message in an * existing queue to more than a total visibility timeout of 12 hours. (For * more information visibility timeout, see <a href= * "http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html" * > Visibility Timeout </a> in the <i>Amazon SQS Developer Guide</i> .) * </p> * <p> * For example, let's say you have a message and its default message * visibility timeout is 30 minutes. You could call * <code>ChangeMessageVisiblity</code> with a value of two hours and the * effective timeout would be two hours and 30 minutes. When that time comes * near you could again extend the time out by calling * ChangeMessageVisiblity, but this time the maximum allowed timeout would * be 9 hours and 30 minutes. * </p> * <p> * <b>NOTE:</b> There is a 120,000 limit for the number of inflight messages * per queue. Messages are inflight after they have been received from the * queue by a consuming component, but have not yet been deleted from the * queue. If you reach the 120,000 limit, you will receive an OverLimit * error message from Amazon SQS. To help avoid reaching the limit, you * should delete the messages from the queue after they have been processed. * You can also increase the number of queues you use to process the * messages. * </p> * <p> * <b>IMPORTANT:</b>If you attempt to set the VisibilityTimeout to an amount * more than the maximum time left, Amazon SQS returns an error. It will not * automatically recalculate and increase the timeout to the maximum time * remaining. * </p> * <p> * <b>IMPORTANT:</b>Unlike with a queue, when you change the visibility * timeout for a specific message, that timeout value is applied immediately * but is not saved in memory for that message. If you don't delete a * message after it is received, the visibility timeout for the message the * next time it is received reverts to the original timeout value, not the * value you set with the ChangeMessageVisibility action. * </p> * * @param changeMessageVisibilityRequest * Container for the necessary parameters to execute the * ChangeMessageVisibility service method on AmazonSQS. * * * @throws ReceiptHandleIsInvalidException * @throws MessageNotInflightException * * @throws AmazonClientException * If any internal errors are encountered inside the client * while attempting to make the request or handle the response. * For example if a network connection is not available. * @throws AmazonServiceException * If an error response is returned by AmazonSQS indicating * either a problem with the data in the request, or a server * side issue. */ public ChangeMessageVisibilityResult changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) throws AmazonServiceException, AmazonClientException { if (isS3ReceiptHandle(changeMessageVisibilityRequest.getReceiptHandle())) { changeMessageVisibilityRequest.setReceiptHandle( getOrigReceiptHandle(changeMessageVisibilityRequest.getReceiptHandle())); } return amazonSqsToBeExtended.changeMessageVisibility(changeMessageVisibilityRequest); }
/** * Calls <code>changeMessageVisibility</code> and wraps <code>AmazonClientException</code>. This is * used to for negative acknowledge of a single message, so that messages can be received again without any delay. * * @param changeMessageVisibilityRequest * Container for the necessary parameters to execute the * changeMessageVisibility service method on AmazonSQS. * @throws JMSException */ public void changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) throws JMSException { try { prepareRequest(changeMessageVisibilityRequest); amazonSQSClient.changeMessageVisibility(changeMessageVisibilityRequest); } catch (AmazonClientException e) { throw handleException(e, "changeMessageVisibility"); } }
/** * <p> * Changes the visibility timeout of a specified message in a queue to a new * value. The maximum allowed timeout value you can set the value to is 12 * hours. This means you can't extend the timeout of a message in an * existing queue to more than a total visibility timeout of 12 hours. (For * more information visibility timeout, see <a href= * "http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html" * > Visibility Timeout </a> in the <i>Amazon SQS Developer Guide</i> .) * </p> * <p> * For example, let's say you have a message and its default message * visibility timeout is 30 minutes. You could call * <code>ChangeMessageVisiblity</code> with a value of two hours and the * effective timeout would be two hours and 30 minutes. When that time comes * near you could again extend the time out by calling * ChangeMessageVisiblity, but this time the maximum allowed timeout would * be 9 hours and 30 minutes. * </p> * <p> * <b>NOTE:</b> There is a 120,000 limit for the number of inflight messages * per queue. Messages are inflight after they have been received from the * queue by a consuming component, but have not yet been deleted from the * queue. If you reach the 120,000 limit, you will receive an OverLimit * error message from Amazon SQS. To help avoid reaching the limit, you * should delete the messages from the queue after they have been processed. * You can also increase the number of queues you use to process the * messages. * </p> * <p> * <b>IMPORTANT:</b>If you attempt to set the VisibilityTimeout to an amount * more than the maximum time left, Amazon SQS returns an error. It will not * automatically recalculate and increase the timeout to the maximum time * remaining. * </p> * <p> * <b>IMPORTANT:</b>Unlike with a queue, when you change the visibility * timeout for a specific message, that timeout value is applied immediately * but is not saved in memory for that message. If you don't delete a * message after it is received, the visibility timeout for the message the * next time it is received reverts to the original timeout value, not the * value you set with the ChangeMessageVisibility action. * </p> * * @param changeMessageVisibilityRequest * Container for the necessary parameters to execute the * ChangeMessageVisibility service method on AmazonSQS. * * * @throws ReceiptHandleIsInvalidException * @throws MessageNotInflightException * * @throws AmazonClientException * If any internal errors are encountered inside the client * while attempting to make the request or handle the response. * For example if a network connection is not available. * @throws AmazonServiceException * If an error response is returned by AmazonSQS indicating * either a problem with the data in the request, or a server * side issue. */ public ChangeMessageVisibilityResult changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) throws AmazonServiceException, AmazonClientException { return amazonSqsToBeExtended.changeMessageVisibility(changeMessageVisibilityRequest); }
/** * Performs the <code>ChangeVisibility</code> action. * * <p> * The following request parameters will be populated from the data of this * <code>Message</code> resource, and any conflicting parameter value set in * the request will be overridden: * <ul> * <li> * <b><code>QueueUrl</code></b> * - mapped from the <code>QueueUrl</code> identifier. * </li> * <li> * <b><code>ReceiptHandle</code></b> * - mapped from the <code>ReceiptHandle</code> identifier. * </li> * </ul> * * <p> * * @see ChangeMessageVisibilityRequest */ void changeVisibility(ChangeMessageVisibilityRequest request);
/** * Performs the <code>ChangeVisibility</code> action and use a ResultCapture * to retrieve the low-level client response. * * <p> * The following request parameters will be populated from the data of this * <code>Message</code> resource, and any conflicting parameter value set in * the request will be overridden: * <ul> * <li> * <b><code>QueueUrl</code></b> * - mapped from the <code>QueueUrl</code> identifier. * </li> * <li> * <b><code>ReceiptHandle</code></b> * - mapped from the <code>ReceiptHandle</code> identifier. * </li> * </ul> * * <p> * * @see ChangeMessageVisibilityRequest */ void changeVisibility(ChangeMessageVisibilityRequest request, ResultCapture<Void> extractor);