private Exchange createExchange(ExchangePattern pattern, com.amazonaws.services.sqs.model.Message msg) { Exchange exchange = super.createExchange(pattern); Message message = exchange.getIn(); message.setBody(msg.getBody()); message.setHeaders(new HashMap<String, Object>(msg.getAttributes())); message.setHeader(SqsConstants.MESSAGE_ID, msg.getMessageId()); message.setHeader(SqsConstants.MD5_OF_BODY, msg.getMD5OfBody()); message.setHeader(SqsConstants.RECEIPT_HANDLE, msg.getReceiptHandle()); message.setHeader(SqsConstants.ATTRIBUTES, msg.getAttributes()); message.setHeader(SqsConstants.MESSAGE_ATTRIBUTES, msg.getMessageAttributes()); //Need to apply the SqsHeaderFilterStrategy this time HeaderFilterStrategy headerFilterStrategy = getHeaderFilterStrategy(); //add all sqs message attributes as camel message headers so that knowledge of //the Sqs class MessageAttributeValue will not leak to the client for (Entry<String, MessageAttributeValue> entry : msg.getMessageAttributes().entrySet()) { String header = entry.getKey(); Object value = translateValue(entry.getValue()); if (!headerFilterStrategy.applyFilterToExternalHeaders(header, value, exchange)) { message.setHeader(header, value); } } return exchange; }
/**
 * Sends a text message to the named queue, converting the optional message
 * headers into SQS string attributes.
 *
 * @param awsParamsDto parameters used to obtain the SQS client
 * @param queueName name of the target queue
 * @param messageText body of the message
 * @param messageHeaders optional headers; when null or empty no attribute map is passed on
 * @return the result returned by the underlying SQS operation
 */
@Override
public SendMessageResult sendMessage(AwsParamsDto awsParamsDto, String queueName, String messageText, List<MessageHeader> messageHeaders) {
    // Stays null when there are no headers, so the delegate receives "no attributes".
    Map<String, MessageAttributeValue> attributes = null;

    if (CollectionUtils.isNotEmpty(messageHeaders)) {
        attributes = new HashMap<>();
        for (MessageHeader header : messageHeaders) {
            MessageAttributeValue attributeValue = new MessageAttributeValue();
            attributeValue.withDataType("String");
            attributeValue.withStringValue(header.getValue());
            attributes.put(header.getKey(), attributeValue);
        }
    }

    return sqsOperations.sendMessage(queueName, messageText, attributes, awsClientFactory.getAmazonSQSClient(awsParamsDto));
}
@Override public SendMessageResult sendMessage(String queueName, String messageText, Map<String, MessageAttributeValue> messageAttributes, AmazonSQS amazonSQS) { // Throw a throttling exception for a specific queue name for testing purposes. if (queueName.equals(MockAwsOperationsHelper.AMAZON_THROTTLING_EXCEPTION)) { AmazonServiceException throttlingException = new AmazonServiceException("test throttling exception"); throttlingException.setErrorCode("ThrottlingException"); throw throttlingException; } // Throw an illegal state exception for a specific queue name for testing purposes. if (queueName.equals(MOCK_SQS_QUEUE_NOT_FOUND_NAME)) { throw new IllegalStateException(String.format("AWS SQS queue with \"%s\" name not found.", queueName)); } // Nothing else to do in the normal case since our unit tests aren't reading messages once they have been published. return new SendMessageResult().withMessageId(AbstractDaoTest.MESSAGE_ID); }
@Test public void shouldSendMessageWithCorrectAttributes() { //GIVEN String body = "Sample text message"; Map<String, MessageAttributeValue> attributes = new HashMap<>(); attributes.put("attribute1", new MessageAttributeValue() .withDataType("String") .withStringValue("value1")); attributes.put("attribute2", new MessageAttributeValue() .withDataType("Number") .withStringValue("230.000000000000000001")); //WHEN sender.send(body, attributes); //THEN SendMessageRequest expected = new SendMessageRequest(); expected.withQueueUrl(queueUrl) .withMessageBody(body) .withMessageAttributes(attributes); verify(sqs).sendMessage(expected); }
@Test public void shouldSendObjectMessageWithCorrectAttributes() throws JsonProcessingException { //GIVEN DummyObject bodyObject = new DummyObject(); Map<String, MessageAttributeValue> attributes = new HashMap<>(); attributes.put("attribute1", new MessageAttributeValue() .withDataType("String") .withStringValue("value1")); attributes.put("attribute2", new MessageAttributeValue() .withDataType("Number") .withStringValue("230.000000000000000001")); //WHEN sender.send(bodyObject, attributes); //THEN SendMessageRequest expected = new SendMessageRequest(); expected.withQueueUrl(queueUrl) .withMessageBody(objectMapper.writeValueAsString(bodyObject)) .withMessageAttributes(attributes); verify(sqs).sendMessage(expected); }
/**
 * Sums the size, in bytes, of all message attributes: for each entry the
 * attribute name, plus the data type, string value and binary value when
 * they are present.
 *
 * @param msgAttributes the attributes to measure; must not be null
 * @return the accumulated size in bytes
 */
private int getMsgAttributesSize(Map<String, MessageAttributeValue> msgAttributes) {
    int totalMsgAttributesSize = 0;
    for (Entry<String, MessageAttributeValue> entry : msgAttributes.entrySet()) {
        totalMsgAttributesSize += getStringSizeInBytes(entry.getKey());

        MessageAttributeValue entryVal = entry.getValue();

        String dataType = entryVal.getDataType();
        if (dataType != null) {
            totalMsgAttributesSize += getStringSizeInBytes(dataType);
        }

        String stringVal = entryVal.getStringValue();
        if (stringVal != null) {
            totalMsgAttributesSize += getStringSizeInBytes(stringVal);
        }

        ByteBuffer binaryVal = entryVal.getBinaryValue();
        if (binaryVal != null) {
            // remaining() is the number of readable bytes between position and limit.
            // array().length would count the entire backing array (wrong for sliced or
            // partially consumed buffers) and throws for direct or read-only buffers.
            totalMsgAttributesSize += binaryVal.remaining();
        }
    }
    return totalMsgAttributesSize;
}
/**
 * Groups the given messages into batch request entries, keyed by the queue
 * each message belongs to.
 *
 * @param messages the messages to distribute
 * @return batch entries per queue, covering one or more queues
 */
private Map<String, List<SendMessageBatchRequestEntry>> createBatchesForQueues(final List<Message> messages) {
    final Map<String, List<SendMessageBatchRequestEntry>> batchesByQueue =
            new HashMap<String, List<SendMessageBatchRequestEntry>>();

    for (Message current : messages) {
        final SendMessageBatchRequestEntry batchEntry = new SendMessageBatchRequestEntry();
        batchEntry.withId(current.getId());
        batchEntry.withMessageAttributes(this.toMessageAttrs(current));
        batchEntry.withMessageBody(current.getBody());

        // Lazily create the per-queue list the first time a queue is seen.
        List<SendMessageBatchRequestEntry> queueBatch = batchesByQueue.get(current.getQueue());
        if (queueBatch == null) {
            queueBatch = new ArrayList<SendMessageBatchRequestEntry>();
            batchesByQueue.put(current.getQueue(), queueBatch);
        }
        queueBatch.add(batchEntry);
    }

    return batchesByQueue;
}
/**
 * Builds the SQS send request for a message: the payload's string form becomes
 * the body, the group id, deduplication id and delay headers are copied when
 * present, and the remaining headers are attached as message attributes.
 *
 * @param message the message to convert
 * @return the request targeting this channel's queue URL
 */
private SendMessageRequest prepareSendMessageRequest(Message<?> message) {
    String body = String.valueOf(message.getPayload());
    SendMessageRequest request = new SendMessageRequest(this.queueUrl, body);

    if (message.getHeaders().containsKey(SqsMessageHeaders.SQS_GROUP_ID_HEADER)) {
        String groupId = message.getHeaders().get(SqsMessageHeaders.SQS_GROUP_ID_HEADER, String.class);
        request.setMessageGroupId(groupId);
    }

    if (message.getHeaders().containsKey(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER)) {
        String deduplicationId = message.getHeaders().get(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER, String.class);
        request.setMessageDeduplicationId(deduplicationId);
    }

    if (message.getHeaders().containsKey(SqsMessageHeaders.SQS_DELAY_HEADER)) {
        Integer delaySeconds = message.getHeaders().get(SqsMessageHeaders.SQS_DELAY_HEADER, Integer.class);
        request.setDelaySeconds(delaySeconds);
    }

    // Only attach the attribute map when there is something in it.
    Map<String, MessageAttributeValue> attributes = getMessageAttributes(message);
    if (attributes.isEmpty()) {
        return request;
    }
    request.withMessageAttributes(attributes);
    return request;
}
/**
 * Converts the SQS message attributes of a received message into header values.
 * <p>
 * The content-type attribute becomes a {@link MimeType} and the id attribute a
 * {@link UUID}. Every other attribute is mapped by data type: string values are
 * copied, number values are converted via {@code getNumberValue} (and skipped
 * when that yields null), and binary values are copied as-is. Attributes with
 * any other data type are dropped.
 *
 * @param message the received SQS message
 * @return the headers derived from the message attributes
 */
private static Map<String, Object> getMessageAttributesAsMessageHeaders(com.amazonaws.services.sqs.model.Message message) {
    Map<String, Object> headers = new HashMap<>();

    for (Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet()) {
        String name = entry.getKey();
        MessageAttributeValue attribute = entry.getValue();

        // The two well-known headers are matched by name first, regardless of data type.
        if (MessageHeaders.CONTENT_TYPE.equals(name)) {
            headers.put(MessageHeaders.CONTENT_TYPE, MimeType.valueOf(attribute.getStringValue()));
            continue;
        }
        if (MessageHeaders.ID.equals(name)) {
            headers.put(MessageHeaders.ID, UUID.fromString(attribute.getStringValue()));
            continue;
        }

        String dataType = attribute.getDataType();
        if (MessageAttributeDataTypes.STRING.equals(dataType)) {
            headers.put(name, attribute.getStringValue());
        } else if (dataType.startsWith(MessageAttributeDataTypes.NUMBER)) {
            Object number = getNumberValue(attribute);
            if (number != null) {
                headers.put(name, number);
            }
        } else if (MessageAttributeDataTypes.BINARY.equals(dataType)) {
            headers.put(name, attribute.getBinaryValue());
        }
    }

    return headers;
}
@Test public void receiveMessage_withMimeTypeMessageAttribute_shouldCopyToHeaders() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); MimeType mimeType = new MimeType("test", "plain", Charset.forName("UTF-8")); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, new MessageAttributeValue().withDataType(MessageAttributeDataTypes.STRING).withStringValue(mimeType.toString()))))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act Message<?> receivedMessage = messageChannel.receive(); // Assert assertEquals(mimeType, receivedMessage.getHeaders().get(MessageHeaders.CONTENT_TYPE)); }
@Test public void receiveMessage_withStringMessageHeader_shouldBeReceivedAsQueueMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); String headerValue = "Header value"; String headerName = "MyHeader"; when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(Collections.singletonMap(headerName, new MessageAttributeValue().withDataType(MessageAttributeDataTypes.STRING).withStringValue(headerValue))))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act Message<?> receivedMessage = messageChannel.receive(); // Assert assertEquals(headerValue, receivedMessage.getHeaders().get(headerName)); }
@Test public void receiveMessage_withIncompatibleNumericMessageHeader_shouldThrowAnException() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); this.expectedException.expect(IllegalArgumentException.class); this.expectedException.expectMessage("Cannot convert String [17] to target class [java.util.concurrent.atomic.AtomicInteger]"); HashMap<String, MessageAttributeValue> messageAttributes = new HashMap<>(); AtomicInteger atomicInteger = new AtomicInteger(17); messageAttributes.put("atomicInteger", new MessageAttributeValue().withDataType(MessageAttributeDataTypes.NUMBER + ".java.util.concurrent.atomic.AtomicInteger").withStringValue(String.valueOf(atomicInteger))); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(messageAttributes))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act messageChannel.receive(); }
@Test public void receiveMessage_withMissingNumericMessageHeaderTargetClass_shouldThrowAnException() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); this.expectedException.expect(MessagingException.class); this.expectedException.expectMessage("Message attribute with value '12' and data type 'Number.class.not.Found' could not be converted" + " into a Number because target class was not found."); HashMap<String, MessageAttributeValue> messageAttributes = new HashMap<>(); messageAttributes.put("classNotFound", new MessageAttributeValue().withDataType(MessageAttributeDataTypes.NUMBER + ".class.not.Found").withStringValue("12")); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(messageAttributes))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act messageChannel.receive(); }
@Test public void receiveMessage_withBinaryMessageHeader_shouldBeReceivedAsByteBufferMessageAttribute() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); ByteBuffer headerValue = ByteBuffer.wrap("My binary data!".getBytes()); String headerName = "MyHeader"; when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(Collections.singletonMap(headerName, new MessageAttributeValue().withDataType(MessageAttributeDataTypes.BINARY).withBinaryValue(headerValue))))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act Message<?> receivedMessage = messageChannel.receive(); // Assert assertEquals(headerValue, receivedMessage.getHeaders().get(headerName)); }
@Test public void receiveMessage_withIdOfTypeString_IdShouldBeConvertedToUuid() throws Exception { // Arrange AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class); UUID uuid = UUID.randomUUID(); when(amazonSqs.receiveMessage(new ReceiveMessageRequest("http://testQueue"). withWaitTimeSeconds(0). withMaxNumberOfMessages(1). withAttributeNames(QueueMessageChannel.ATTRIBUTE_NAMES). withMessageAttributeNames("All"))). thenReturn(new ReceiveMessageResult().withMessages(new com.amazonaws.services.sqs.model.Message().withBody("Hello"). withMessageAttributes(Collections.singletonMap(MessageHeaders.ID, new MessageAttributeValue().withDataType(MessageAttributeDataTypes.STRING).withStringValue(uuid.toString()))))); PollableChannel messageChannel = new QueueMessageChannel(amazonSqs, "http://testQueue"); // Act Message<?> receivedMessage = messageChannel.receive(); // Assert Object idMessageHeader = receivedMessage.getHeaders().get(MessageHeaders.ID); assertTrue(UUID.class.isInstance(idMessageHeader)); assertEquals(uuid, idMessageHeader); }
/**
 * Adds the reserved reply-to attributes (queue name and queue URL) to the
 * attributes of an outgoing message when its JMS reply-to destination is an
 * SQS queue. Does nothing for any other reply-to destination.
 *
 * @param messageAttributes attribute map of the send request; modified in place
 * @param message the message whose reply-to destination is inspected
 * @throws JMSException if the reply-to destination cannot be read
 */
private void addReplyToQueueReservedAttributes(Map<String, MessageAttributeValue> messageAttributes,
                                               SQSMessage message) throws JMSException {
    Destination replyTo = message.getJMSReplyTo();
    if (!(replyTo instanceof SQSQueueDestination)) {
        return;
    }

    SQSQueueDestination replyToQueue = (SQSQueueDestination) replyTo;
    String replyQueueName = replyToQueue.getQueueName();
    String replyQueueUrl = replyToQueue.getQueueUrl();

    // Attributes prefixed with JMS_ are reserved for the JMS provider, so any
    // value the user stored under these keys is deliberately overwritten here.
    addStringAttribute(messageAttributes, SQSMessage.JMS_SQS_REPLY_TO_QUEUE_NAME, replyQueueName);
    addStringAttribute(messageAttributes, SQSMessage.JMS_SQS_REPLY_TO_QUEUE_URL, replyQueueUrl);
}
/**
 * Unwraps an SQS message attribute value into a plain object.
 *
 * @param mav the attribute value to unwrap
 * @return the string value when set; otherwise the binary value, which may be null
 */
private Object translateValue(MessageAttributeValue mav) {
    String text = mav.getStringValue();
    if (text != null) {
        return text;
    }
    // Falls through to the binary value; this is null when neither value is set.
    return mav.getBinaryValue();
}
/**
 * Resolves the URL of the named queue and sends the message to it.
 *
 * @param queueName name of the target queue
 * @param messageText body of the message
 * @param messageAttributes attributes to attach to the message
 * @param amazonSQS client used for both the URL lookup and the send
 * @return the result returned by the SQS client
 * @throws IllegalStateException if the queue does not exist
 */
@Override
public SendMessageResult sendMessage(String queueName, String messageText, Map<String, MessageAttributeValue> messageAttributes, AmazonSQS amazonSQS) {
    try {
        String queueUrl = amazonSQS.getQueueUrl(queueName).getQueueUrl();

        SendMessageRequest request = new SendMessageRequest();
        request.withQueueUrl(queueUrl);
        request.withMessageBody(messageText);
        request.withMessageAttributes(messageAttributes);

        return amazonSQS.sendMessage(request);
    } catch (QueueDoesNotExistException e) {
        // Surface a missing queue as an illegal state, keeping the SDK exception as the cause.
        String errorMessage = String.format("AWS SQS queue with \"%s\" name not found.", queueName);
        throw new IllegalStateException(errorMessage, e);
    }
}
/**
 * Serializes the object to JSON with the object mapper and sends the result as
 * the message body. A serialization failure is logged and the message is not
 * sent; the exception is not propagated.
 *
 * @param object the payload to serialize
 * @param attributes message attributes to send along
 */
public void send(Object object, Map<String, MessageAttributeValue> attributes) {
    try {
        String serialized = objectMapper.writeValueAsString(object);
        send(serialized, attributes);
    } catch (JsonProcessingException e) {
        String reason = e.getMessage();
        LOGGER.error("Could not send message to SQS, cause is " + reason, e);
    }
}
/**
 * Sends a text message to the configured queue.
 *
 * @param body the message body
 * @param attributes message attributes to attach; {@code null} is treated as
 *                   "no attributes" instead of failing with a NullPointerException
 */
public void send(String body, Map<String, MessageAttributeValue> attributes) {
    SendMessageRequest sendMessageRequest = new SendMessageRequest()
            .withQueueUrl(queueUrl)
            .withMessageBody(body);

    if (attributes != null) {
        // Entries are copied one by one, so the request does not share the caller's map.
        for (Map.Entry<String, MessageAttributeValue> entry : attributes.entrySet()) {
            sendMessageRequest.addMessageAttributesEntry(entry.getKey(), entry.getValue());
        }
    }

    sqs.sendMessage(sendMessageRequest);
}
private SendMessageBatchRequestEntry storeMessageInS3(SendMessageBatchRequestEntry batchEntry) { checkMessageAttributes(batchEntry.getMessageAttributes()); String s3Key = UUID.randomUUID().toString(); // Read the content of the message from message body String messageContentStr = batchEntry.getMessageBody(); Long messageContentSize = getStringSizeInBytes(messageContentStr); // Add a new message attribute as a flag MessageAttributeValue messageAttributeValue = new MessageAttributeValue(); messageAttributeValue.setDataType("Number"); messageAttributeValue.setStringValue(messageContentSize.toString()); batchEntry.addMessageAttributesEntry(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, messageAttributeValue); // Store the message content in S3. storeTextInS3(s3Key, messageContentStr, messageContentSize); LOG.info("S3 object created, Bucket name: " + clientConfiguration.getS3BucketName() + ", Object key: " + s3Key + "."); // Convert S3 pointer (bucket name, key, etc) to JSON string MessageS3Pointer s3Pointer = new MessageS3Pointer(clientConfiguration.getS3BucketName(), s3Key); String s3PointerStr = getJSONFromS3Pointer(s3Pointer); // Storing S3 pointer in the message body. batchEntry.setMessageBody(s3PointerStr); return batchEntry; }
private SendMessageRequest storeMessageInS3(SendMessageRequest sendMessageRequest) { checkMessageAttributes(sendMessageRequest.getMessageAttributes()); String s3Key = UUID.randomUUID().toString(); // Read the content of the message from message body String messageContentStr = sendMessageRequest.getMessageBody(); Long messageContentSize = getStringSizeInBytes(messageContentStr); // Add a new message attribute as a flag MessageAttributeValue messageAttributeValue = new MessageAttributeValue(); messageAttributeValue.setDataType("Number"); messageAttributeValue.setStringValue(messageContentSize.toString()); sendMessageRequest.addMessageAttributesEntry(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, messageAttributeValue); // Store the message content in S3. storeTextInS3(s3Key, messageContentStr, messageContentSize); LOG.info("S3 object created, Bucket name: " + clientConfiguration.getS3BucketName() + ", Object key: " + s3Key + "."); // Convert S3 pointer (bucket name, key, etc) to JSON string MessageS3Pointer s3Pointer = new MessageS3Pointer(clientConfiguration.getS3BucketName(), s3Key); String s3PointerStr = getJSONFromS3Pointer(s3Pointer); // Storing S3 pointer in the message body. sendMessageRequest.setMessageBody(s3PointerStr); return sendMessageRequest; }
/**
 * A message below the SQS size limit must reach the backend without any
 * message attribute having been added.
 */
@Test
public void testWhenSmallMessageIsSentThenNoAttributeIsAdded() {
    String messageBody = generateStringWithLength(LESS_THAN_SQS_SIZE_LIMIT);

    extendedSqsWithDefaultConfig.sendMessage(new SendMessageRequest(SQS_QUEUE_URL, messageBody));

    // Capture what was actually forwarded to the mocked SQS backend.
    ArgumentCaptor<SendMessageRequest> forwardedRequest = ArgumentCaptor.forClass(SendMessageRequest.class);
    verify(mockSqsBackend).sendMessage(forwardedRequest.capture());

    Map<String, MessageAttributeValue> attributes = forwardedRequest.getValue().getMessageAttributes();
    Assert.assertTrue(attributes.isEmpty());
}
/**
 * A message above the SQS size limit must reach the backend with the reserved
 * attribute set: data type "Number" and the original body length as its value.
 */
@Test
public void testWhenLargeMessgaeIsSentThenAttributeWithPayloadSizeIsAdded() {
    int messageLength = MORE_THAN_SQS_SIZE_LIMIT;
    String messageBody = generateStringWithLength(messageLength);

    extendedSqsWithDefaultConfig.sendMessage(new SendMessageRequest(SQS_QUEUE_URL, messageBody));

    // Capture what was actually forwarded to the mocked SQS backend.
    ArgumentCaptor<SendMessageRequest> forwardedRequest = ArgumentCaptor.forClass(SendMessageRequest.class);
    verify(mockSqsBackend).sendMessage(forwardedRequest.capture());

    Map<String, MessageAttributeValue> attributes = forwardedRequest.getValue().getMessageAttributes();
    MessageAttributeValue reserved = attributes.get(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME);
    Assert.assertEquals("Number", reserved.getDataType());

    int recordedLength = Integer.parseInt(attributes.get(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME).getStringValue());
    Assert.assertEquals(messageLength, recordedLength);
}
/**
 * Converts the headers of a {@link Message} into SQS {@link MessageAttributeValue}
 * map entries. Every header is sent as a "String" attribute; the data type is set
 * explicitly because SQS requires one on every message attribute.
 *
 * @param message the message whose headers are converted; may be null
 * @return the message attributes, or {@code null} when the message is null or
 *         has no headers
 */
private Map<String, MessageAttributeValue> toMessageAttrs(Message message){
    if (message == null || message.getHeaders() == null || message.getHeaders().isEmpty()) {
        // Existing contract: "no attributes" is reported as null.
        return null;
    }

    final Map<String, MessageAttributeValue> messageAttrs = Maps.newHashMap();
    for (Entry<String, String> attr : message.getHeaders().entrySet()) {
        messageAttrs.put(attr.getKey(),
                new MessageAttributeValue()
                        .withDataType("String")
                        .withStringValue(attr.getValue()));
    }
    return messageAttrs;
}
/**
 * Tests sending of a message with message attributes. Asserts that the
 * message received has the attributes. Also changes the visibility of the
 * message, polls the queue again, and finally deletes the message from the
 * queue.
 */
@Test
@Ignore
public void testSendReceiveMessageAttributes() throws InterruptedException {
    SendMessageResult sendMessageResult = queue.sendMessage(
            new SendMessageRequest()
                    .withMessageBody(TEST_MESSAGE_ATTRIBUTES)
                    .withMessageAttributes(ImmutableMapParameter.of(
                            "testAttribute",
                            new MessageAttributeValue()
                                    .withDataType("String")
                                    .withStringValue("testAttributeValue"))));

    List<Message> messages = waitForMessagesFromQueue(
            new ReceiveMessageRequest().withMessageAttributeNames("testAttribute"));

    assertNotNull(messages);
    assertEquals(1, messages.size());
    Message message = messages.get(0);
    assertMessage(TEST_MESSAGE_ATTRIBUTES, sendMessageResult.getMessageId(),
            sendMessageResult.getMD5OfMessageBody(), message);

    Map<String, MessageAttributeValue> messageAttributes = message.getMessageAttributes();
    assertNotNull(messageAttributes);
    assertTrue(messageAttributes.containsKey("testAttribute"));
    // Expected value first, so that a failure message reads the right way round.
    assertEquals("testAttributeValue", messageAttributes.get("testAttribute").getStringValue());

    message.changeVisibility(10);
    // The poll is kept for its effect on the queue; its result is not inspected.
    waitForMessagesFromQueue(null);
    message.delete();
}
/**
 * Converts the headers of a message into SQS message attributes.
 * <p>
 * Headers for which {@code isSkipHeader} returns true are ignored. The
 * content-type and id headers are sent as strings; other headers are sent
 * according to their Java type (String, Number or ByteBuffer). A header that
 * cannot be represented, including a content-type value that is neither a
 * MimeType nor a String, is left out and reported with a warning, so the
 * returned map never contains null values.
 *
 * @param message the message whose headers are converted
 * @return the message attributes; empty when no header could be converted
 */
private Map<String, MessageAttributeValue> getMessageAttributes(Message<?> message) {
    HashMap<String, MessageAttributeValue> messageAttributes = new HashMap<>();
    for (Map.Entry<String, Object> messageHeader : message.getHeaders().entrySet()) {
        String messageHeaderName = messageHeader.getKey();
        Object messageHeaderValue = messageHeader.getValue();

        if (isSkipHeader(messageHeaderName)) {
            continue;
        }

        MessageAttributeValue attribute = null;
        if (MessageHeaders.CONTENT_TYPE.equals(messageHeaderName) && messageHeaderValue != null) {
            // May yield null for an unsupported content-type value; handled below.
            attribute = getContentTypeMessageAttribute(messageHeaderValue);
        } else if (MessageHeaders.ID.equals(messageHeaderName) && messageHeaderValue != null) {
            attribute = getStringMessageAttribute(messageHeaderValue.toString());
        } else if (messageHeaderValue instanceof String) {
            attribute = getStringMessageAttribute((String) messageHeaderValue);
        } else if (messageHeaderValue instanceof Number) {
            attribute = getNumberMessageAttribute(messageHeaderValue);
        } else if (messageHeaderValue instanceof ByteBuffer) {
            attribute = getBinaryMessageAttribute((ByteBuffer) messageHeaderValue);
        }

        if (attribute != null) {
            messageAttributes.put(messageHeaderName, attribute);
        } else {
            this.logger.warn(String.format("Message header with name '%s' and type '%s' cannot be sent as" +
                            " message attribute because it is not supported by SQS.", messageHeaderName,
                    messageHeaderValue != null ? messageHeaderValue.getClass().getName() : ""));
        }
    }

    return messageAttributes;
}
/**
 * Builds the string message attribute for a content-type header value.
 *
 * @param messageHeaderValue the header value, expected to be a MimeType or a String
 * @return a string attribute holding the value's text form, or {@code null}
 *         when the value is of any other type
 */
private MessageAttributeValue getContentTypeMessageAttribute(Object messageHeaderValue) {
    boolean supported = messageHeaderValue instanceof MimeType || messageHeaderValue instanceof String;
    if (!supported) {
        return null;
    }

    // For a String, toString() is the value itself; for a MimeType it is its text form.
    MessageAttributeValue attribute = new MessageAttributeValue();
    attribute.setDataType(MessageAttributeDataTypes.STRING);
    attribute.setStringValue(messageHeaderValue.toString());
    return attribute;
}
/**
 * Converts a number message attribute into a {@link Number}.
 * <p>
 * The data type is either the plain number type or the number type followed by
 * a separator character and the fully qualified name of the target Number
 * class. With a class name the value is parsed into that class; for the plain
 * number type, which carries no class name, the value is parsed as a
 * {@link java.math.BigDecimal} instead of failing on the missing suffix.
 *
 * @param value the attribute to convert; its data type must start with the number type
 * @return the parsed number
 * @throws MessagingException if the class named in the data type cannot be found
 * @throws IllegalArgumentException if the string value cannot be parsed into the target class
 */
private static Object getNumberValue(MessageAttributeValue value) {
    String dataType = value.getDataType();

    if (MessageAttributeDataTypes.NUMBER.equals(dataType)) {
        // No custom type label: there is no class name to strip, so taking the
        // substring below would throw StringIndexOutOfBoundsException.
        return NumberUtils.parseNumber(value.getStringValue(), java.math.BigDecimal.class);
    }

    // Skip the number type prefix and the separator character that follows it.
    String numberType = dataType.substring(MessageAttributeDataTypes.NUMBER.length() + 1);
    try {
        Class<? extends Number> numberTypeClass = Class.forName(numberType).asSubclass(Number.class);
        return NumberUtils.parseNumber(value.getStringValue(), numberTypeClass);
    } catch (ClassNotFoundException e) {
        throw new MessagingException(String.format("Message attribute with value '%s' and data type '%s' could not be converted " +
                "into a Number because target class was not found.", value.getStringValue(), value.getDataType()), e);
    }
}
/** * Not verified on the client side, but SQS Attribute names must be valid * letter or digit on the basic multilingual plane in addition to allowing * '_', '-' and '.'. No component of an attribute name may be empty, thus an * attribute name may neither start nor end in '.'. And it may not contain * "..". */ Map<String, MessageAttributeValue> propertyToMessageAttribute(SQSMessage message) throws JMSException { Map<String, MessageAttributeValue> messageAttributes = new HashMap<String, MessageAttributeValue>(); Enumeration<String> propertyNames = message.getPropertyNames(); while (propertyNames.hasMoreElements()) { String propertyName = propertyNames.nextElement(); // This is generated from SQS message attribute "ApproximateReceiveCount" if (propertyName.equals(SQSMessagingClientConstants.JMSX_DELIVERY_COUNT)) { continue; } // This property will be used as DeduplicationId argument of SendMessage call // On receive it is mapped back to this JMS property if (propertyName.equals(SQSMessagingClientConstants.JMS_SQS_DEDUPLICATION_ID)) { continue; } // the JMSXGroupID and JMSXGroupSeq are always stored as message // properties, so they are not lost between send and receive // even though SQS Classic does not respect those values when returning messages // and SQS FIFO has a different understanding of message groups JMSMessagePropertyValue propertyObject = message.getJMSMessagePropertyValue(propertyName); MessageAttributeValue messageAttributeValue = new MessageAttributeValue(); messageAttributeValue.setDataType(propertyObject.getType()); messageAttributeValue.setStringValue(propertyObject.getStringMessageAttributeValue()); messageAttributes.put(propertyName, messageAttributeValue); } return messageAttributes; }
/**
 * Convenience method that stores one string-typed attribute under the given
 * key, replacing any attribute already stored there.
 *
 * @param messageAttributes the attribute map to modify
 * @param key the attribute name
 * @param value the attribute's string value
 */
private void addStringAttribute(Map<String, MessageAttributeValue> messageAttributes,
                                String key, String value) {
    MessageAttributeValue stringAttribute = new MessageAttributeValue()
            .withDataType(SQSMessagingClientConstants.STRING)
            .withStringValue(value);
    messageAttributes.put(key, stringAttribute);
}
/**
 * Test propertyToMessageAttribute with empty messages of different type:
 * a freshly created text, object or bytes message must yield no attributes.
 */
@Test
public void testPropertyToMessageAttributeWithEmpty() throws JMSException {

    /*
     * Test Empty text message default attribute
     */
    SQSMessage sqsText = new SQSTextMessage();
    Map<String, MessageAttributeValue> messageAttributeText = producer.propertyToMessageAttribute(sqsText);

    assertEquals(0, messageAttributeText.size());

    /*
     * Test Empty object message default attribute
     */
    SQSMessage sqsObject = new SQSObjectMessage();
    Map<String, MessageAttributeValue> messageAttributeObject = producer.propertyToMessageAttribute(sqsObject);

    assertEquals(0, messageAttributeObject.size());

    /*
     * Test Empty byte message default attribute
     */
    SQSMessage sqsByte = new SQSBytesMessage();
    Map<String, MessageAttributeValue> messageAttributeByte = producer.propertyToMessageAttribute(sqsByte);

    // Assert on the bytes-message result; the object-message map was already checked above.
    assertEquals(0, messageAttributeByte.size());
}
/**
 * Test sendInternal with an SQSTextMessage that is sent, changed and sent
 * again: both sends must reach the client and the ids returned by each send
 * must be applied to the message.
 */
@Test
public void testSendInternalSQSTextMessage() throws JMSException {
    String firstBody = "MyText1";
    String secondBody = "MyText2";
    SQSTextMessage msg = spy(new SQSTextMessage(firstBody));

    Map<String, MessageAttributeValue> expectedAttributes = createMessageAttribute("text");

    SendMessageResult firstResult = new SendMessageResult().withMessageId(MESSAGE_ID_1);
    SendMessageResult secondResult = new SendMessageResult().withMessageId(MESSAGE_ID_2);
    when(amazonSQSClient.sendMessage(any(SendMessageRequest.class)))
            .thenReturn(firstResult)
            .thenReturn(secondResult);

    producer.sendInternal(destination, msg);

    /*
     * Re send the message
     */
    msg.setText(secondBody);
    producer.sendInternal(destination, msg);

    List<String> expectedBodies = Arrays.asList(firstBody, secondBody);
    verify(amazonSQSClient, times(2)).sendMessage(
            argThat(new sendMessageRequestMatcher(QUEUE_URL, expectedBodies, expectedAttributes)));

    verify(msg, times(2)).setJMSDestination(destination);
    verify(msg).setJMSMessageID("ID:" + MESSAGE_ID_1);
    verify(msg).setJMSMessageID("ID:" + MESSAGE_ID_2);
    verify(msg).setSQSMessageId(MESSAGE_ID_1);
    verify(msg).setSQSMessageId(MESSAGE_ID_2);
}
/**
 * Test sendInternal with an SQSTextMessage that was built from a received SQS
 * message: it must be sent with the same body and message type attribute.
 */
@Test
public void testSendInternalSQSTextMessageFromReceivedMessage() throws JMSException {

    /*
     * Set up non JMS sqs message
     */
    MessageAttributeValue typeAttribute = new MessageAttributeValue();
    typeAttribute.setDataType(SQSMessagingClientConstants.STRING);
    typeAttribute.setStringValue(SQSMessage.TEXT_MESSAGE_TYPE);

    Map<String, MessageAttributeValue> mapMessageAttributes = new HashMap<String, MessageAttributeValue>();
    mapMessageAttributes.put(SQSMessage.JMS_SQS_MESSAGE_TYPE, typeAttribute);

    Map<String, String> mapAttributes = new HashMap<String, String>();
    mapAttributes.put(SQSMessagingClientConstants.APPROXIMATE_RECEIVE_COUNT, "1");

    com.amazonaws.services.sqs.model.Message received = new com.amazonaws.services.sqs.model.Message();
    received.withMessageAttributes(mapMessageAttributes);
    received.withAttributes(mapAttributes);
    received.withBody("MessageBody");

    SQSTextMessage msg = spy(new SQSTextMessage(acknowledger, QUEUE_URL, received));

    when(amazonSQSClient.sendMessage(any(SendMessageRequest.class)))
            .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1));

    producer.sendInternal(destination, msg);

    List<String> expectedBodies = Arrays.asList("MessageBody");
    verify(amazonSQSClient).sendMessage(
            argThat(new sendMessageRequestMatcher(QUEUE_URL, expectedBodies, mapMessageAttributes)));

    verify(msg).setJMSDestination(destination);
    verify(msg).setJMSMessageID("ID:" + MESSAGE_ID_1);
    verify(msg).setSQSMessageId(MESSAGE_ID_1);
}
/**
 * Test sendInternal with an SQSObjectMessage that is sent, given a new object
 * and sent again: each send must carry the message body of the object set at
 * that time, and the ids returned by each send must be applied to the message.
 */
@Test
public void testSendInternalSQSObjectMessage() throws JMSException {

    HashSet<String> set1 = new HashSet<String>();
    set1.add("data1");
    HashSet<String> set2 = new HashSet<String>();
    set2.add("data2");

    SQSObjectMessage msg = spy(new SQSObjectMessage(set1));
    String msgBody1 = msg.getMessageBody();

    when(amazonSQSClient.sendMessage(any(SendMessageRequest.class)))
            .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1))
            .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_2));

    producer.sendInternal(destination, msg);

    /*
     * Re send the message
     */
    msg.clearBody();
    msg.setObject(set2);
    String msgBody2 = msg.getMessageBody();
    producer.sendInternal(destination, msg);

    // Bodies are compared through a captor, in the order the requests were sent.
    ArgumentCaptor<SendMessageRequest> argumentCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
    verify(amazonSQSClient, times(2)).sendMessage(argumentCaptor.capture());

    assertEquals(msgBody1, argumentCaptor.getAllValues().get(0).getMessageBody());
    assertEquals(msgBody2, argumentCaptor.getAllValues().get(1).getMessageBody());

    verify(msg, times(2)).setJMSDestination(destination);
    verify(msg).setJMSMessageID("ID:" + MESSAGE_ID_1);
    verify(msg).setJMSMessageID("ID:" + MESSAGE_ID_2);
    verify(msg).setSQSMessageId(MESSAGE_ID_1);
    verify(msg).setSQSMessageId(MESSAGE_ID_2);
}
/**
 * Test sendInternal with an SQSBytesMessage that is sent, rewritten and sent
 * again: both sends must reach the client with the expected encoded bodies,
 * and the ids returned by each send must be applied to the message.
 */
@Test
public void testSendInternalSQSByteMessage() throws JMSException {

    SQSBytesMessage msg = spy(new SQSBytesMessage());
    msg.writeByte((byte) 0);
    msg.reset();

    Map<String, MessageAttributeValue> messageAttributes = createMessageAttribute("byte");

    when(amazonSQSClient.sendMessage(any(SendMessageRequest.class)))
            .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1))
            .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_2));

    producer.sendInternal(destination, msg);

    /*
     * Re send the message
     */
    msg.clearBody();
    msg.writeInt(42);
    producer.sendInternal(destination, msg);

    // Expected bodies: Base64 of the single zero byte, then of the int 42.
    List<String> messagesBody = Arrays.asList("AA==", "AAAAKg==");
    verify(amazonSQSClient, times(2)).sendMessage(
            argThat(new sendMessageRequestMatcher(QUEUE_URL, messagesBody, messageAttributes)));

    verify(msg, times(2)).setJMSDestination(destination);
    verify(msg).setJMSMessageID("ID:" + MESSAGE_ID_1);
    verify(msg).setJMSMessageID("ID:" + MESSAGE_ID_2);
    verify(msg).setSQSMessageId(MESSAGE_ID_1);
    verify(msg).setSQSMessageId(MESSAGE_ID_2);
}
/**
 * Test sendInternal with a message built from a received SQS message whose
 * type attribute is the byte message type and whose body is Base64 text.
 * <p>
 * NOTE(review): despite the test name, the message under test is an
 * SQSObjectMessage and the expected attribute is "object" - confirm whether an
 * SQSBytesMessage was intended.
 */
@Test
public void testSendInternalSQSByteMessageFromReceivedMessage() throws JMSException, IOException {

    /*
     * Set up non JMS sqs message
     */
    MessageAttributeValue typeAttribute = new MessageAttributeValue();
    typeAttribute.setDataType(SQSMessagingClientConstants.STRING);
    typeAttribute.setStringValue(SQSMessage.BYTE_MESSAGE_TYPE);

    Map<String, MessageAttributeValue> mapMessageAttributes = new HashMap<String, MessageAttributeValue>();
    mapMessageAttributes.put(SQSMessage.JMS_SQS_MESSAGE_TYPE, typeAttribute);

    Map<String, String> mapAttributes = new HashMap<String, String>();
    mapAttributes.put(SQSMessagingClientConstants.APPROXIMATE_RECEIVE_COUNT, "1");

    byte[] rawBytes = new byte[] { 1, 0, 'a', 65 };
    String messageBody = Base64.encodeAsString(rawBytes);

    com.amazonaws.services.sqs.model.Message received = new com.amazonaws.services.sqs.model.Message();
    received.withMessageAttributes(mapMessageAttributes);
    received.withAttributes(mapAttributes);
    received.withBody(messageBody);

    SQSObjectMessage msg = spy(new SQSObjectMessage(acknowledger, QUEUE_URL, received));

    Map<String, MessageAttributeValue> expectedAttributes = createMessageAttribute("object");

    when(amazonSQSClient.sendMessage(any(SendMessageRequest.class)))
            .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1))
            .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_2));

    producer.sendInternal(destination, msg);

    List<String> expectedBodies = Arrays.asList(messageBody);
    verify(amazonSQSClient).sendMessage(
            argThat(new sendMessageRequestMatcher(QUEUE_URL, expectedBodies, expectedAttributes)));

    verify(msg).setJMSDestination(destination);
    verify(msg).setJMSMessageID("ID:" + MESSAGE_ID_1);
    verify(msg).setSQSMessageId(MESSAGE_ID_1);
}
/**
 * Builds a one-entry message attribute map: the key is SQSMessage.JMS_SQS_MESSAGE_TYPE and
 * the value is a "String"-typed attribute holding the given type.
 *
 * @param type string value stored in the attribute
 * @return a new mutable map containing only that attribute
 */
private Map<String, MessageAttributeValue> createMessageAttribute(String type) {
    Map<String, MessageAttributeValue> attributesByName = new HashMap<String, MessageAttributeValue>();
    attributesByName.put(
            SQSMessage.JMS_SQS_MESSAGE_TYPE,
            new MessageAttributeValue().withDataType("String").withStringValue(type));
    return attributesByName;
}
/**
 * Checks that propertyToMessageAttribute yields an empty attribute map for freshly
 * constructed text, object and bytes messages.
 */
@Test
public void testPropertyToMessageAttributeWithEmpty() throws JMSException {

    // Empty text message: no attributes expected.
    SQSMessage emptyText = new SQSTextMessage();
    assertEquals(0, producer.propertyToMessageAttribute(emptyText).size());

    // Empty object message: no attributes expected.
    SQSMessage emptyObject = new SQSObjectMessage();
    assertEquals(0, producer.propertyToMessageAttribute(emptyObject).size());

    // Empty bytes message: no attributes expected.
    SQSMessage emptyBytes = new SQSBytesMessage();
    assertEquals(0, producer.propertyToMessageAttribute(emptyBytes).size());
}
/**
 * Checks sendInternal for an SQSTextMessage rebuilt from a received SQS message that carries
 * group id, deduplication id and sequence number attributes.
 */
@Test
public void testSendInternalSQSTextMessageFromReceivedMessage() throws JMSException {

    // System attributes of the raw (non JMS) SQS message.
    Map<String, String> receivedAttributes = new HashMap<String, String>();
    receivedAttributes.put(SQSMessagingClientConstants.APPROXIMATE_RECEIVE_COUNT, "1");
    receivedAttributes.put(SQSMessagingClientConstants.MESSAGE_GROUP_ID, GROUP_ID);
    receivedAttributes.put(SQSMessagingClientConstants.MESSAGE_DEDUPLICATION_ID, DEDUP_ID);
    receivedAttributes.put(SQSMessagingClientConstants.SEQUENCE_NUMBER, SEQ_NUMBER);

    // Message attributes: only the JMS type marker, set to the text type.
    MessageAttributeValue receivedType = new MessageAttributeValue()
            .withStringValue(SQSMessage.TEXT_MESSAGE_TYPE)
            .withDataType(SQSMessagingClientConstants.STRING);
    Map<String, MessageAttributeValue> receivedMessageAttributes =
            new HashMap<String, MessageAttributeValue>();
    receivedMessageAttributes.put(SQSMessage.JMS_SQS_MESSAGE_TYPE, receivedType);

    com.amazonaws.services.sqs.model.Message received =
            new com.amazonaws.services.sqs.model.Message()
                    .withBody("MessageBody")
                    .withAttributes(receivedAttributes)
                    .withMessageAttributes(receivedMessageAttributes);

    SQSTextMessage msg = spy(new SQSTextMessage(acknowledger, QUEUE_URL, received));

    // SQS answers the send with a new message id and a new sequence number.
    SendMessageResult sendResult = new SendMessageResult()
            .withMessageId(MESSAGE_ID)
            .withSequenceNumber(SEQ_NUMBER_2);
    when(amazonSQSClient.sendMessage(any(SendMessageRequest.class))).thenReturn(sendResult);

    producer.sendInternal(destination, msg);

    // The request must reuse the body, type, group id and deduplication id of the received message.
    sendMessageRequestMatcher expectedRequest = new sendMessageRequestMatcher(
            QUEUE_URL, "MessageBody", SQSMessage.TEXT_MESSAGE_TYPE, GROUP_ID, DEDUP_ID);
    verify(amazonSQSClient).sendMessage(argThat(expectedRequest));

    // The values returned by SQS must be written back onto the message.
    verify(msg).setJMSDestination(destination);
    verify(msg).setJMSMessageID("ID:" + MESSAGE_ID);
    verify(msg).setSQSMessageId(MESSAGE_ID);
    verify(msg).setSequenceNumber(SEQ_NUMBER_2);
}