@Test public void testPurgeQueue_shouldRemoveAll() { // create queue CreateQueueResult createdQueue = sqs.createQueue(new CreateQueueRequest().withQueueName("tea-earl-grey-queue")); // send messages String messageBody = "{\"life-universe-everything\":42}"; sqs.sendMessage(new SendMessageRequest().withDelaySeconds(0).withMessageBody(messageBody) .withMessageGroupId("some-group-id-123").withQueueUrl(createdQueue.getQueueUrl())); String messageBody2 = "{\"dead-emptyness-nothing\":24}"; sqs.sendMessage(new SendMessageRequest().withDelaySeconds(0).withMessageBody(messageBody2) .withMessageGroupId("some-group-id-123").withQueueUrl(createdQueue.getQueueUrl())); // purge queues PurgeQueueResult result = sqs.purgeQueue(new PurgeQueueRequest().withQueueUrl(createdQueue.getQueueUrl())); assertNotNull("verify that purge queue returned ok", result); // verify empty queue ReceiveMessageResult messageResult = sqs.receiveMessage(new ReceiveMessageRequest() .withMaxNumberOfMessages(9).withQueueUrl(createdQueue.getQueueUrl()).withVisibilityTimeout(10) .withWaitTimeSeconds(0)); assertEquals("verify that queue is empty", 0, messageResult.getMessages().size()); // cleanup getQueues().remove("tea-earl-grey-queue"); }
/**
 * Posts one SQS message for every S3 object in the batch that passes
 * {@code shouldEnqueue} and reports whether scanning should continue.
 *
 * <p>The body of each posted message is {@code bucketName + "/" + key}.
 * {@code numSeen} is incremented once per posted object; the limit check only
 * reads the counter, so the count reflects posted objects exactly.
 *
 * @param objects the batch of S3 object summaries to examine
 * @return {@code false} when {@code max} is non-negative and at least {@code max}
 *         objects have been posted so far; {@code true} otherwise
 */
private boolean processObjects(List<S3ObjectSummary> objects) {
    Logger.Debug("Scanning next batch of %s ", objects.size());
    objects.parallelStream()
            .filter(this::shouldEnqueue)
            .forEach(object -> {
                numSeen.incrementAndGet();
                String path = object.getBucketName() + "/" + object.getKey();
                Logger.Info("Posting: %s", path);
                SendMessageRequest msg = new SendMessageRequest()
                        .withQueueUrl(queueUrl)
                        .withMessageBody(path);
                sqs.sendMessage(msg);
            });

    // Compare without incrementing: an increment here would add one phantom
    // object to the count for every batch. The stop threshold is unchanged
    // (count + 1 > max is the same condition as count >= max).
    if (max > -1L && numSeen.get() >= max) {
        Logger.Info("Added max jobs, quitting");
        return false;
    }
    return true;
}
@Override public int send(Channel channel) throws EventDeliveryException { int eventProcessedCounter = 0; Event event = channel.take(); if (event == null || event.getBody().length == 0) { // Don't bother with anything if the channel returns null event or an event with empty body return eventProcessedCounter; } try { this.amazonSQS.sendMessage(new SendMessageRequest(sqsUrl, new String(event.getBody(), "UTF-8").trim())); // This event is processed successfully to increment the counter eventProcessedCounter++; } catch (AmazonServiceException ase) { throw new EventDeliveryException("Failure sending message request to Amazon SQS, " + "the request made it to SQS but was rejected for some reason.", ase); } catch (AmazonClientException ace) { throw new EventDeliveryException("Failure sending message request to Amazon SQS.", ace); } catch (UnsupportedEncodingException e) { throw new EventDeliveryException("Character set UTF-8 not supported.", e); } return eventProcessedCounter; }
/**
 * A non-empty event taken from the channel is sent through the mocked SQS client
 * and {@code send} reports one processed event.
 */
@Test
public void testSend() throws Exception {
    BasicSQSMsgSender sender = new BasicSQSMsgSender(
            "https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey");

    // Channel that yields a single event carrying a text body
    Channel channel = mock(Channel.class);
    Event event = mock(Event.class);
    when(event.getBody()).thenReturn("This is a test event message".getBytes());
    when(channel.take()).thenReturn(event);

    // SQS client that accepts every request
    AmazonSQS sqsClient = mock(AmazonSQS.class);
    when(sqsClient.sendMessage(any(SendMessageRequest.class))).thenReturn(new SendMessageResult());
    sender.setAmazonSQS(sqsClient);

    int processed = sender.send(channel);

    assertEquals(1, processed);
}
/**
 * An event whose body is empty is not counted: {@code send} reports zero
 * processed events.
 */
@Test
public void testSendEventWithEmptyBody() throws Exception {
    BasicSQSMsgSender sender = new BasicSQSMsgSender(
            "https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey");

    // Channel that yields a single event with a zero-length body
    Channel channel = mock(Channel.class);
    Event event = mock(Event.class);
    when(event.getBody()).thenReturn("".getBytes());
    when(channel.take()).thenReturn(event);

    AmazonSQS sqsClient = mock(AmazonSQS.class);
    when(sqsClient.sendMessage(any(SendMessageRequest.class))).thenReturn(new SendMessageResult());
    sender.setAmazonSQS(sqsClient);

    int processed = sender.send(channel);

    assertEquals(0, processed);
}
/**
 * When the SQS client throws {@code AmazonServiceException}, {@code send} is
 * expected to raise {@code EventDeliveryException}.
 */
@Test(expected = EventDeliveryException.class)
public void testSendFailureAmazonServiceException() throws Exception {
    BasicSQSMsgSender sender = new BasicSQSMsgSender(
            "https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey");

    Channel channel = mock(Channel.class);
    Event event = mock(Event.class);
    when(event.getBody()).thenReturn("This is a test event message".getBytes());
    when(channel.take()).thenReturn(event);

    // SQS client that rejects every request on the service side
    AmazonSQS sqsClient = mock(AmazonSQS.class);
    AmazonServiceException serviceFailure = new AmazonServiceException("Mock AmazonServiceException");
    when(sqsClient.sendMessage(any(SendMessageRequest.class))).thenThrow(serviceFailure);
    sender.setAmazonSQS(sqsClient);

    sender.send(channel);
}
/**
 * When the SQS client throws {@code AmazonClientException}, {@code send} is
 * expected to raise {@code EventDeliveryException}.
 */
@Test(expected = EventDeliveryException.class)
public void testSendFailureAmazonClientException() throws Exception {
    BasicSQSMsgSender sender = new BasicSQSMsgSender(
            "https://some-fake/url", "us-east-1", "someAwsAccessKey", "someAwsSecretKey");

    Channel channel = mock(Channel.class);
    Event event = mock(Event.class);
    when(event.getBody()).thenReturn("This is a test event message".getBytes());
    when(channel.take()).thenReturn(event);

    // SQS client that fails every request on the client side
    AmazonSQS sqsClient = mock(AmazonSQS.class);
    AmazonClientException clientFailure = new AmazonClientException("Mock AmazonClientException");
    when(sqsClient.sendMessage(any(SendMessageRequest.class))).thenThrow(clientFailure);
    sender.setAmazonSQS(sqsClient);

    sender.send(channel);
}
/**
 * A {@code RuntimeException} thrown by the mocked SQS client's
 * {@code sendMessage} is expected to surface from {@code sqsErrorHandler.onError}
 * with its message intact.
 */
@Test
public void testShouldGetFailedResponseAfterSendingTheEvent() {
    // NOTE(review): the queue-URL result and the 400-status send result built here
    // are never handed to a stub in this method - confirm whether they are needed.
    final GetQueueUrlResult queueUrlResult = new GetQueueUrlResult();
    queueUrlResult.setQueueUrl(randomAlphabetic(10));

    final SendMessageResult failedSendResult = new SendMessageResult();
    final SdkHttpMetadata httpMetadata = mock(SdkHttpMetadata.class);
    when(httpMetadata.getHttpStatusCode()).thenReturn(400);
    failedSendResult.setSdkHttpMetadata(httpMetadata);

    // Every send attempt blows up with a recognisable message
    when(amazonSQS.sendMessage(any(SendMessageRequest.class)))
            .thenThrow(new RuntimeException("expected"));

    assertThatThrownBy(() ->
            sqsErrorHandler.onError(
                    randomAlphabetic(10),
                    new RuntimeException(),
                    EventTypePartition.of(EventType.of(randomAlphabetic(10)), randomAlphabetic(1)),
                    randomNumeric(10),
                    randomAlphabetic(50)))
            .isInstanceOf(RuntimeException.class)
            .hasMessageContaining("expected");
}
/**
 * With an SQS client that returns a result carrying HTTP status 200,
 * {@code onError} is expected to serialize through the object mapper and call
 * {@code sendMessage} once.
 */
@Test
public void testShouldSendEventToSQS() throws JsonProcessingException {
    // Successful send result backed by mocked HTTP metadata
    final SdkHttpMetadata httpMetadata = mock(SdkHttpMetadata.class);
    when(httpMetadata.getHttpStatusCode()).thenReturn(200);
    final SendMessageResult successfulResult = new SendMessageResult();
    successfulResult.setSdkHttpMetadata(httpMetadata);
    when(amazonSQS.sendMessage(any(SendMessageRequest.class))).thenReturn(successfulResult);

    sqsErrorHandler.onError(
            randomAlphabetic(10),
            new RuntimeException(),
            EventTypePartition.of(EventType.of(randomAlphabetic(10)), randomAlphabetic(1)),
            randomNumeric(10),
            randomAlphabetic(50));

    verify(objectMapper).writeValueAsString(anyString());
    verify(amazonSQS).sendMessage(any(SendMessageRequest.class));
}
/**
 * Sends the exchange's in-message body to the configured queue and copies the
 * returned message id and body MD5 onto the response message as headers.
 *
 * <p>The in-message headers are passed through {@code translateAttributes} to
 * become message attributes, and {@code addDelay} is applied to the request
 * before it is sent.
 *
 * @param exchange the exchange whose in-message is sent
 * @throws Exception declared by the method signature; whatever the called
 *         helpers or the client throw is not handled here
 */
public void process(Exchange exchange) throws Exception {
    String payload = exchange.getIn().getBody(String.class);

    SendMessageRequest sendRequest = new SendMessageRequest(getQueueUrl(), payload);
    sendRequest.setMessageAttributes(translateAttributes(exchange.getIn().getHeaders(), exchange));
    addDelay(sendRequest, exchange);

    LOG.trace("Sending request [{}] from exchange [{}]...", sendRequest, exchange);
    SendMessageResult sendResult = getClient().sendMessage(sendRequest);
    LOG.trace("Received result [{}]", sendResult);

    // Expose the identifiers from the send result to downstream processors.
    Message response = getMessageForResponse(exchange);
    response.setHeader(SqsConstants.MESSAGE_ID, sendResult.getMessageId());
    response.setHeader(SqsConstants.MD5_OF_BODY, sendResult.getMD5OfMessageBody());
}
/**
 * Builds the producer under test and wires the mocked endpoint, client and
 * exchange with the default answers the tests rely on.
 */
@Before
public void setup() throws Exception {
    underTest = new SqsProducer(sqsEndpoint);

    // Canned send result and a configuration with a zero-second delay
    sendMessageResult = new SendMessageResult()
            .withMD5OfMessageBody(MESSAGE_MD5)
            .withMessageId(MESSAGE_ID);
    sqsConfiguration = new SqsConfiguration();
    HeaderFilterStrategy headerFilterStrategy = new SqsHeaderFilterStrategy();
    sqsConfiguration.setDelaySeconds(Integer.valueOf(0));

    // Endpoint collaborators
    when(sqsEndpoint.getClient()).thenReturn(amazonSQSClient);
    when(sqsEndpoint.getConfiguration()).thenReturn(sqsConfiguration);
    when(sqsEndpoint.getQueueUrl()).thenReturn(QUEUE_URL);
    when(sqsEndpoint.getHeaderFilterStrategy()).thenReturn(headerFilterStrategy);

    // SQS client
    when(amazonSQSClient.sendMessage(any(SendMessageRequest.class))).thenReturn(sendMessageResult);

    // Exchange and its messages
    when(exchange.getOut()).thenReturn(outMessage);
    when(exchange.getIn()).thenReturn(inMessage);
    when(exchange.getPattern()).thenReturn(ExchangePattern.InOnly);
    when(inMessage.getBody(String.class)).thenReturn(SAMPLE_MESSAGE_BODY);
}
/**
 * The first sample header is expected on the outgoing request as a message
 * attribute with a string value and no binary value.
 */
@Test
public void isAttributeMessageStringHeaderOnTheRequest() throws Exception {
    Map<String, Object> headers = new HashMap<String, Object>();
    headers.put(SAMPLE_MESSAGE_HEADER_NAME_1, SAMPLE_MESSAGE_HEADER_VALUE_1);
    when(inMessage.getHeaders()).thenReturn(headers);

    underTest.process(exchange);

    ArgumentCaptor<SendMessageRequest> requestCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
    verify(amazonSQSClient).sendMessage(requestCaptor.capture());

    assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_1,
            requestCaptor.getValue().getMessageAttributes()
                    .get(SAMPLE_MESSAGE_HEADER_NAME_1).getStringValue());
    assertNull(
            requestCaptor.getValue().getMessageAttributes()
                    .get(SAMPLE_MESSAGE_HEADER_NAME_1).getBinaryValue());
}
/**
 * The second sample header is expected on the outgoing request as a message
 * attribute with a binary value and no string value.
 */
@Test
public void isAttributeMessageByteBufferHeaderOnTheRequest() throws Exception {
    Map<String, Object> headers = new HashMap<String, Object>();
    headers.put(SAMPLE_MESSAGE_HEADER_NAME_2, SAMPLE_MESSAGE_HEADER_VALUE_2);
    when(inMessage.getHeaders()).thenReturn(headers);

    underTest.process(exchange);

    ArgumentCaptor<SendMessageRequest> requestCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
    verify(amazonSQSClient).sendMessage(requestCaptor.capture());

    assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_2,
            requestCaptor.getValue().getMessageAttributes()
                    .get(SAMPLE_MESSAGE_HEADER_NAME_2).getBinaryValue());
    assertNull(
            requestCaptor.getValue().getMessageAttributes()
                    .get(SAMPLE_MESSAGE_HEADER_NAME_2).getStringValue());
}
/**
 * Supplies four sample headers and expects exactly three message attributes on
 * the outgoing request: headers 1 and 3 as string values, header 2 as a binary
 * value.
 */
@Test
public void isAllAttributeMessagesOnTheRequest() throws Exception {
    Map<String, Object> headers = new HashMap<String, Object>();
    headers.put(SAMPLE_MESSAGE_HEADER_NAME_1, SAMPLE_MESSAGE_HEADER_VALUE_1);
    headers.put(SAMPLE_MESSAGE_HEADER_NAME_2, SAMPLE_MESSAGE_HEADER_VALUE_2);
    headers.put(SAMPLE_MESSAGE_HEADER_NAME_3, SAMPLE_MESSAGE_HEADER_VALUE_3);
    headers.put(SAMPLE_MESSAGE_HEADER_NAME_4, SAMPLE_MESSAGE_HEADER_VALUE_4);
    when(inMessage.getHeaders()).thenReturn(headers);

    underTest.process(exchange);

    ArgumentCaptor<SendMessageRequest> requestCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
    verify(amazonSQSClient).sendMessage(requestCaptor.capture());

    assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_1,
            requestCaptor.getValue().getMessageAttributes()
                    .get(SAMPLE_MESSAGE_HEADER_NAME_1).getStringValue());
    assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_2,
            requestCaptor.getValue().getMessageAttributes()
                    .get(SAMPLE_MESSAGE_HEADER_NAME_2).getBinaryValue());
    assertEquals(SAMPLE_MESSAGE_HEADER_VALUE_3,
            requestCaptor.getValue().getMessageAttributes()
                    .get(SAMPLE_MESSAGE_HEADER_NAME_3).getStringValue());
    // Only three of the four supplied headers are expected as attributes.
    assertEquals(3, requestCaptor.getValue().getMessageAttributes().size());
}
/**
 * Records the request body as a {@code Message} in {@code messages} and returns
 * a result with a fixed message id and body MD5.
 *
 * <p>The stored message carries the request's body together with hard-coded MD5,
 * message id and receipt handle values; the MD5 is not computed from the body.
 *
 * @param sendMessageRequest the request whose body is stored
 * @return a result holding the same fixed message id and MD5 as the stored message
 */
@Override
public SendMessageResult sendMessage(SendMessageRequest sendMessageRequest)
        throws AmazonServiceException, AmazonClientException {
    Message stored = new Message();
    stored.setBody(sendMessageRequest.getMessageBody());
    stored.setMD5OfBody("6a1559560f67c5e7a7d5d838bf0272ee");
    stored.setMessageId("f6fb6f99-5eb2-4be4-9b15-144774141458");
    stored.setReceiptHandle(
            "0NNAq8PwvXsyZkR6yu4nQ07FGxNmOBWi5zC9+4QMqJZ0DJ3gVOmjI2Gh/oFnb0IeJqy5Zc8kH4JX7GVpfjcEDjaAPSeOkXQZRcaBqt"
                    + "4lOtyfj0kcclVV/zS7aenhfhX5Ixfgz/rHhsJwtCPPvTAdgQFGYrqaHly+etJiawiNPVc=");

    // The message list is guarded by its own monitor.
    synchronized (messages) {
        messages.add(stored);
    }

    SendMessageResult outcome = new SendMessageResult();
    outcome.setMessageId("f6fb6f99-5eb2-4be4-9b15-144774141458");
    outcome.setMD5OfMessageBody("6a1559560f67c5e7a7d5d838bf0272ee");
    return outcome;
}
@Test public void shouldSendMessageWithCorrectAttributes() { //GIVEN String body = "Sample text message"; Map<String, MessageAttributeValue> attributes = new HashMap<>(); attributes.put("attribute1", new MessageAttributeValue() .withDataType("String") .withStringValue("value1")); attributes.put("attribute2", new MessageAttributeValue() .withDataType("Number") .withStringValue("230.000000000000000001")); //WHEN sender.send(body, attributes); //THEN SendMessageRequest expected = new SendMessageRequest(); expected.withQueueUrl(queueUrl) .withMessageBody(body) .withMessageAttributes(attributes); verify(sqs).sendMessage(expected); }
@Test public void shouldSendObjectMessageWithCorrectAttributes() throws JsonProcessingException { //GIVEN DummyObject bodyObject = new DummyObject(); Map<String, MessageAttributeValue> attributes = new HashMap<>(); attributes.put("attribute1", new MessageAttributeValue() .withDataType("String") .withStringValue("value1")); attributes.put("attribute2", new MessageAttributeValue() .withDataType("Number") .withStringValue("230.000000000000000001")); //WHEN sender.send(bodyObject, attributes); //THEN SendMessageRequest expected = new SendMessageRequest(); expected.withQueueUrl(queueUrl) .withMessageBody(objectMapper.writeValueAsString(bodyObject)) .withMessageAttributes(attributes); verify(sqs).sendMessage(expected); }
/**
 * Sends the given bytes as one SQS message, asynchronously.
 *
 * <p>Null or empty input is ignored. Input longer than
 * {@code maxMessageSizeInKB * 1024} bytes is not sent; a warning is recorded
 * instead. The bytes are decoded with the platform default charset. A failed
 * asynchronous send is reported as a warning from the callback.
 *
 * @param bytes the encoded logging event; may be {@code null}
 * @throws IOException declared by the overridden method
 */
@Override
public void write(byte[] bytes) throws IOException {
    boolean nothingToSend = bytes == null || bytes.length == 0;
    if (nothingToSend) {
        return;
    }

    final String msg = new String(bytes);
    if (bytes.length > maxMessageSizeInKB * 1024) {
        addWarn(format("Logging event '%s' exceeds the maximum size of %dkB", msg, maxMessageSizeInKB));
        return;
    }

    AsyncHandler<SendMessageRequest, SendMessageResult> warnOnFailure =
            new AsyncHandler<SendMessageRequest, SendMessageResult>() {
                public void onError(Exception exception) {
                    addWarn(format("Appender '%s' failed to send logging event '%s' to '%s'",
                            getName(), msg, queueUrl), exception);
                }

                public void onSuccess(SendMessageRequest request, SendMessageResult result) {
                    // Nothing to do once the message has been accepted.
                }
            };
    sqs.sendMessageAsync(new SendMessageRequest(queueUrl, msg), warnOnFailure);
}
@Override public Properties send(Properties properties, Object message) throws ConnectorException { String access_key_id = properties.getProperty("AccessKeyId"); String secret_access_key = properties.getProperty("SecretAccessKey"); BasicAWSCredentials credentials = new BasicAWSCredentials(access_key_id, secret_access_key); AmazonSQS sqs = new AmazonSQSClient(credentials); //System.out.println(properties.getProperty("region")); // Region selection Region region = Region.getRegion(Regions.fromName(properties.getProperty("region"))); sqs.setRegion(region); GetQueueUrlResult queueUrl = sqs.getQueueUrl(properties.getProperty("Queue")); String messageStr = new String((byte[])message); sqs.sendMessage(new SendMessageRequest(queueUrl.getQueueUrl(), messageStr)); return properties; }
/**
 * Creates a queue, sends one message to it and expects a receive call to return
 * exactly that message.
 */
@Test
public void sendAndReceiveMessage() {
    final String queueName = "bizo";
    final String messageBody = "hi everybody";

    // Create the queue and look up its URL by name
    sqs.createQueue(new CreateQueueRequest().withQueueName(queueName));
    final GetQueueUrlResult urlLookup = sqs.getQueueUrl(new GetQueueUrlRequest().withQueueName(queueName));
    final String queueUrl = urlLookup.getQueueUrl();

    // Send a single message
    sqs.sendMessage(new SendMessageRequest().withQueueUrl(queueUrl).withMessageBody(messageBody));

    // Receive up to ten messages
    final int maxNumberOfMessages = 10;
    final ReceiveMessageResult received = sqs.receiveMessage(
            new ReceiveMessageRequest().withQueueUrl(queueUrl).withMaxNumberOfMessages(maxNumberOfMessages));
    final List<Message> messages = received.getMessages();

    assertThat(messages.size(), equalTo(1));
    assertThat(messages.get(0).getBody(), equalTo(messageBody));
}
@SuppressWarnings("unchecked") @Override public void run() { //Get queue url GetQueueUrlResult urlResult = sqs.getQueueUrl(responseQName); String QueueUrl = urlResult.getQueueUrl(); JSONObject result = new JSONObject(); try { Thread.sleep(sleepLength); result.put("task_id", task_id); result.put("result", "0"); sqs.sendMessage(new SendMessageRequest(QueueUrl, result.toString())); //System.out.println(Thread.currentThread().getName()+" sleep done!"); } catch (Exception e) { result.put("task_id", task_id); result.put("result", "1"); sqs.sendMessage(new SendMessageRequest(QueueUrl, result.toString())); } }
/**
 * Builds the {@code SendMessageRequest} for a message.
 *
 * <p>The body is {@code String.valueOf} of the payload. The group id,
 * deduplication id and delay headers, when present, are copied to the matching
 * request properties. Attributes returned by {@code getMessageAttributes} are
 * attached only when that map is not empty.
 *
 * @param message the message to convert
 * @return the request addressed to {@code this.queueUrl}
 */
private SendMessageRequest prepareSendMessageRequest(Message<?> message) {
    String body = String.valueOf(message.getPayload());
    SendMessageRequest request = new SendMessageRequest(this.queueUrl, body);

    if (message.getHeaders().containsKey(SqsMessageHeaders.SQS_GROUP_ID_HEADER)) {
        String groupId = message.getHeaders().get(SqsMessageHeaders.SQS_GROUP_ID_HEADER, String.class);
        request.setMessageGroupId(groupId);
    }

    if (message.getHeaders().containsKey(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER)) {
        String deduplicationId =
                message.getHeaders().get(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER, String.class);
        request.setMessageDeduplicationId(deduplicationId);
    }

    if (message.getHeaders().containsKey(SqsMessageHeaders.SQS_DELAY_HEADER)) {
        Integer delaySeconds = message.getHeaders().get(SqsMessageHeaders.SQS_DELAY_HEADER, Integer.class);
        request.setDelaySeconds(delaySeconds);
    }

    Map<String, MessageAttributeValue> attributes = getMessageAttributes(message);
    if (attributes.isEmpty()) {
        return request;
    }
    request.withMessageAttributes(attributes);
    return request;
}
@Test public void sendMessage_validTextMessage_returnsTrue() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult()); Message<String> stringMessage = MessageBuilder.withPayload("message content").build(); MessageChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act boolean sent = messageChannel.send(stringMessage); // Assert verify(amazonSqs, only()).sendMessage(any(SendMessageRequest.class)); assertEquals("message content", sendMessageRequestArgumentCaptor.getValue().getMessageBody()); assertTrue(sent); }
@Test public void sendMessage_serviceThrowsError_throwsMessagingException() throws Exception { //Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); Message<String> stringMessage = MessageBuilder.withPayload("message content").build(); MessageChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); when(amazonSqs.sendMessage(new SendMessageRequest("http://testQueue", "message content").withDelaySeconds(0) .withMessageAttributes(isNotNull()))). thenThrow(new AmazonServiceException("wanted error")); //Assert this.expectedException.expect(MessagingException.class); this.expectedException.expectMessage("wanted error"); //Act messageChannel.send(stringMessage); }
@Test public void sendMessage_withMimeTypeAsStringHeader_shouldPassItAsMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); QueueMessageChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); String mimeTypeAsString = new MimeType("test", "plain", Charset.forName("UTF-8")).toString(); Message<String> message = MessageBuilder.withPayload("Hello").setHeader(MessageHeaders.CONTENT_TYPE, mimeTypeAsString).build(); ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult()); // Act boolean sent = messageChannel.send(message); // Assert assertTrue(sent); assertEquals(mimeTypeAsString, sendMessageRequestArgumentCaptor.getValue().getMessageAttributes().get(MessageHeaders.CONTENT_TYPE).getStringValue()); }
@Test public void sendMessage_withMimeTypeHeader_shouldPassItAsMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); QueueMessageChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); MimeType mimeType = new MimeType("test", "plain", Charset.forName("UTF-8")); Message<String> message = MessageBuilder.withPayload("Hello").setHeader(MessageHeaders.CONTENT_TYPE, mimeType).build(); ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult()); // Act boolean sent = messageChannel.send(message); // Assert assertTrue(sent); assertEquals(mimeType.toString(), sendMessageRequestArgumentCaptor.getValue().getMessageAttributes().get(MessageHeaders.CONTENT_TYPE).getStringValue()); }
@Test public void sendMessage_withStringMessageHeader_shouldBeSentAsQueueMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); QueueMessageChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); String headerValue = "Header value"; String headerName = "MyHeader"; Message<String> message = MessageBuilder.withPayload("Hello").setHeader(headerName, headerValue).build(); ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult()); // Act boolean sent = messageChannel.send(message); // Assert assertTrue(sent); assertEquals(headerValue, sendMessageRequestArgumentCaptor.getValue().getMessageAttributes().get(headerName).getStringValue()); assertEquals(MessageAttributeDataTypes.STRING, sendMessageRequestArgumentCaptor.getValue().getMessageAttributes().get(headerName).getDataType()); }
@Test public void sendMessage_withBinaryMessageHeader_shouldBeSentAsBinaryMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); QueueMessageChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); ByteBuffer headerValue = ByteBuffer.wrap("My binary data!".getBytes()); String headerName = "MyHeader"; Message<String> message = MessageBuilder.withPayload("Hello").setHeader(headerName, headerValue).build(); ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult()); // Act boolean sent = messageChannel.send(message); // Assert assertTrue(sent); assertEquals(headerValue, sendMessageRequestArgumentCaptor.getValue().getMessageAttributes().get(headerName).getBinaryValue()); assertEquals(MessageAttributeDataTypes.BINARY, sendMessageRequestArgumentCaptor.getValue().getMessageAttributes().get(headerName).getDataType()); }
@Test public void sendMessage_withUuidAsId_shouldConvertUuidToString() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); QueueMessageChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); Message<String> message = MessageBuilder.withPayload("Hello").build(); UUID uuid = (UUID) message.getHeaders().get(MessageHeaders.ID); ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult()); // Act boolean sent = messageChannel.send(message); // Assert assertTrue(sent); assertEquals(uuid.toString(), sendMessageRequestArgumentCaptor.getValue().getMessageAttributes().get(MessageHeaders.ID).getStringValue()); }
@Test @SuppressWarnings("unchecked") public void sendMessage_withTimeout_sendsMessageAsyncAndReturnsTrueOnceFutureCompleted() throws Exception { // Arrange Future<SendMessageResult> future = mock(Future.class); when(future.get(1000, TimeUnit.MILLISECONDS)).thenReturn(new SendMessageResult()); AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); when(amazonSqs.sendMessageAsync(any(SendMessageRequest.class))).thenReturn(future); QueueMessageChannel queueMessageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act boolean result = queueMessageChannel.send(MessageBuilder.withPayload("Hello").build(), 1000); // Assert assertTrue(result); verify(amazonSqs, only()).sendMessageAsync(any(SendMessageRequest.class)); }
@Test @SuppressWarnings("unchecked") public void sendMessage_withSendMessageAsyncTakingMoreTimeThanSpecifiedTimeout_returnsFalse() throws Exception { // Arrange Future<SendMessageResult> future = mock(Future.class); when(future.get(1000, TimeUnit.MILLISECONDS)).thenThrow(new TimeoutException()); AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); when(amazonSqs.sendMessageAsync(any(SendMessageRequest.class))).thenReturn(future); QueueMessageChannel queueMessageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act boolean result = queueMessageChannel.send(MessageBuilder.withPayload("Hello").build(), 1000); // Assert assertFalse(result); }
@Test @SuppressWarnings("unchecked") public void sendMessage_withExecutionExceptionWhileSendingAsyncMessage_throwMessageDeliveryException() throws Exception { // Arrange Future<SendMessageResult> future = mock(Future.class); when(future.get(1000, TimeUnit.MILLISECONDS)).thenThrow(new ExecutionException(new Exception())); AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); when(amazonSqs.sendMessageAsync(any(SendMessageRequest.class))).thenReturn(future); QueueMessageChannel queueMessageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Assert this.expectedException.expect(MessageDeliveryException.class); // Act queueMessageChannel.send(MessageBuilder.withPayload("Hello").build(), 1000); }
@Test public void sendMessage_withDelayHeader_shouldSetDelayOnSendMessageRequestAndNotSetItAsHeaderAsMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult()); QueueMessageChannel queueMessageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); Message<String> message = MessageBuilder.withPayload("Hello").setHeader(SqsMessageHeaders.SQS_DELAY_HEADER, 15).build(); // Act queueMessageChannel.send(message); // Assert SendMessageRequest sendMessageRequest = sendMessageRequestArgumentCaptor.getValue(); assertEquals(new Integer(15), sendMessageRequest.getDelaySeconds()); assertFalse(sendMessageRequest.getMessageAttributes().containsKey(SqsMessageHeaders.SQS_DELAY_HEADER)); }
@Test public void sendMessage_withoutDelayHeader_shouldNotSetDelayOnSendMessageRequestAndNotSetHeaderAsMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult()); QueueMessageChannel queueMessageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); Message<String> message = MessageBuilder.withPayload("Hello").build(); // Act queueMessageChannel.send(message); // Assert SendMessageRequest sendMessageRequest = sendMessageRequestArgumentCaptor.getValue(); assertNull(sendMessageRequest.getDelaySeconds()); assertFalse(sendMessageRequest.getMessageAttributes().containsKey(SqsMessageHeaders.SQS_DELAY_HEADER)); }
@Test public void sendMessage_withGroupIdHeader_shouldSetGroupIdOnSendMessageRequestAndNotSetItAsHeaderAsMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult()); QueueMessageChannel queueMessageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); Message<String> message = MessageBuilder.withPayload("Hello").setHeader(SqsMessageHeaders.SQS_GROUP_ID_HEADER, "id-5").build(); // Act queueMessageChannel.send(message); // Assert SendMessageRequest sendMessageRequest = sendMessageRequestArgumentCaptor.getValue(); assertEquals("id-5", sendMessageRequest.getMessageGroupId()); assertFalse(sendMessageRequest.getMessageAttributes().containsKey(SqsMessageHeaders.SQS_GROUP_ID_HEADER)); }
@Test public void sendMessage_withDeduplicationIdHeader_shouldSetDeduplicationIdOnSendMessageRequestAndNotSetItAsHeaderAsMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); when(amazonSqs.sendMessage(sendMessageRequestArgumentCaptor.capture())).thenReturn(new SendMessageResult()); QueueMessageChannel queueMessageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); Message<String> message = MessageBuilder.withPayload("Hello").setHeader(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER, "id-5").build(); // Act queueMessageChannel.send(message); // Assert SendMessageRequest sendMessageRequest = sendMessageRequestArgumentCaptor.getValue(); assertEquals("id-5", sendMessageRequest.getMessageDeduplicationId()); assertFalse(sendMessageRequest.getMessageAttributes().containsKey(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER)); }
@Test public void instantiation_WithCustomJacksonConverterThatSupportsJava8Types_shouldConvertMessageToString() throws IOException { // Arrange AmazonSQSAsync amazonSqs = createAmazonSqs(); ObjectMapper objectMapper = Jackson2ObjectMapperBuilder.json().build(); MappingJackson2MessageConverter simpleMessageConverter = new MappingJackson2MessageConverter(); simpleMessageConverter.setSerializedPayloadClass(String.class); simpleMessageConverter.setObjectMapper(objectMapper); QueueMessagingTemplate queueMessagingTemplate = new QueueMessagingTemplate(amazonSqs, (ResourceIdResolver) null, simpleMessageConverter); // Act queueMessagingTemplate.convertAndSend("test", new TestPerson("Agim", "Emruli", LocalDate.of(2017, 1, 1))); // Assert ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); verify(amazonSqs).sendMessage(sendMessageRequestArgumentCaptor.capture()); TestPerson testPerson = objectMapper.readValue(sendMessageRequestArgumentCaptor.getValue().getMessageBody(), TestPerson.class); assertEquals("Agim", testPerson.getFirstName()); assertEquals("Emruli", testPerson.getLastName()); assertEquals(LocalDate.of(2017, 1, 1), testPerson.getActiveSince()); }
@Test public void instantiation_withDefaultMapping2JacksonConverter_shouldSupportJava8Types() throws IOException { // Arrange AmazonSQSAsync amazonSqs = createAmazonSqs(); ObjectMapper objectMapper = Jackson2ObjectMapperBuilder.json().build(); QueueMessagingTemplate queueMessagingTemplate = new QueueMessagingTemplate(amazonSqs); // Act queueMessagingTemplate.convertAndSend("test", new TestPerson("Agim", "Emruli", LocalDate.of(2017, 1, 1))); // Assert ArgumentCaptor<SendMessageRequest> sendMessageRequestArgumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); verify(amazonSqs).sendMessage(sendMessageRequestArgumentCaptor.capture()); TestPerson testPerson = objectMapper.readValue(sendMessageRequestArgumentCaptor.getValue().getMessageBody(), TestPerson.class); assertEquals("Agim", testPerson.getFirstName()); assertEquals("Emruli", testPerson.getLastName()); assertEquals(LocalDate.of(2017, 1, 1), testPerson.getActiveSince()); }
/**
 * Exercises {@code sendInternal} with an {@code SQSTextMessage}: the client must
 * receive a request matching the queue URL, text body, text message type, group
 * id and deduplication id, and the message must be updated with the destination,
 * JMS message id, SQS message id and sequence number from the send result.
 */
@Test
public void testSendInternalSQSTextMessage() throws JMSException {
    String messageBody = "MyText1";
    SQSTextMessage textMessage = spy(new SQSTextMessage(messageBody));
    textMessage.setStringProperty(SQSMessagingClientConstants.JMSX_GROUP_ID, GROUP_ID);
    textMessage.setStringProperty(SQSMessagingClientConstants.JMS_SQS_DEDUPLICATION_ID, DEDUP_ID);

    // The client answers every send with a fixed message id and sequence number
    SendMessageResult sendResult = new SendMessageResult()
            .withMessageId(MESSAGE_ID)
            .withSequenceNumber(SEQ_NUMBER);
    when(amazonSQSClient.sendMessage(any(SendMessageRequest.class))).thenReturn(sendResult);

    producer.sendInternal(destination, textMessage);

    // The request handed to the client
    verify(amazonSQSClient).sendMessage(argThat(new sendMessageRequestMatcher(
            QUEUE_URL, messageBody, SQSMessage.TEXT_MESSAGE_TYPE, GROUP_ID, DEDUP_ID)));

    // The bookkeeping applied to the message
    verify(textMessage).setJMSDestination(destination);
    verify(textMessage).setJMSMessageID("ID:" + MESSAGE_ID);
    verify(textMessage).setSQSMessageId(MESSAGE_ID);
    verify(textMessage).setSequenceNumber(SEQ_NUMBER);
}
/**
 * Exercises {@code sendInternal} with an {@code SQSObjectMessage}: the client
 * must receive a request matching the queue URL, the message's body, object
 * message type, group id and deduplication id, and the message must be updated
 * with the destination, JMS message id, SQS message id and sequence number from
 * the send result.
 */
@Test
public void testSendInternalSQSObjectMessage() throws JMSException {
    HashSet<String> payload = new HashSet<String>();
    payload.add("data1");

    SQSObjectMessage objectMessage = spy(new SQSObjectMessage(payload));
    objectMessage.setStringProperty(SQSMessagingClientConstants.JMSX_GROUP_ID, GROUP_ID);
    objectMessage.setStringProperty(SQSMessagingClientConstants.JMS_SQS_DEDUPLICATION_ID, DEDUP_ID);
    String msgBody = objectMessage.getMessageBody();

    // The client answers every send with a fixed message id and sequence number
    SendMessageResult sendResult = new SendMessageResult()
            .withMessageId(MESSAGE_ID)
            .withSequenceNumber(SEQ_NUMBER);
    when(amazonSQSClient.sendMessage(any(SendMessageRequest.class))).thenReturn(sendResult);

    producer.sendInternal(destination, objectMessage);

    // The request handed to the client
    verify(amazonSQSClient).sendMessage(argThat(new sendMessageRequestMatcher(
            QUEUE_URL, msgBody, SQSMessage.OBJECT_MESSAGE_TYPE, GROUP_ID, DEDUP_ID)));

    // The bookkeeping applied to the message
    verify(objectMessage).setJMSDestination(destination);
    verify(objectMessage).setJMSMessageID("ID:" + MESSAGE_ID);
    verify(objectMessage).setSQSMessageId(MESSAGE_ID);
    verify(objectMessage).setSequenceNumber(SEQ_NUMBER);
}