/**
 * Filters a header value down to the types that may be sent as a RabbitMQ
 * header. Passing any other type on would make
 * com.rabbitmq.client.impl.Frame throw an IllegalArgumentException
 * (invalid value in table) and close the connection.
 *
 * @param headerValue the candidate header value
 * @return {@code headerValue} itself when it is a String, Number, Boolean,
 *         Date, byte[] or LongString; {@code null} when the header should
 *         be ignored
 * @see com.rabbitmq.client.impl.Frame#fieldValueSize
 */
private Object getValidRabbitMQHeaderValue(Object headerValue) {
    final boolean accepted = headerValue instanceof String
            || headerValue instanceof Number
            || headerValue instanceof Boolean
            || headerValue instanceof Date
            || headerValue instanceof byte[]
            || headerValue instanceof LongString;
    // Anything else (including null) is reported as "skip this header".
    return accepted ? headerValue : null;
}
/**
 * Converts the header table of the given AMQP properties into a map of
 * header structs, preserving the iteration order of the source table.
 *
 * <p>Each header becomes a {@code Struct} built on {@code SCHEMA_HEADER_VALUE}
 * whose {@code "type"} field names the value field that was populated, and
 * whose value field (looked up in {@code FIELD_LOOKUP} by the value's class)
 * carries the header value. {@code LongString} values are converted with
 * {@code toString()} before the lookup.
 *
 * @param basicProperties the message properties to read the headers from
 * @return a map from header name to header struct; empty when the
 *         properties carry no header table
 * @throws DataException if a header value is {@code null} or its class has
 *         no entry in {@code FIELD_LOOKUP}
 */
static Map<String, Struct> headers(BasicProperties basicProperties) {
  Map<String, Object> input = basicProperties.getHeaders();
  Map<String, Struct> results = new LinkedHashMap<>();
  if (null == input) {
    return results;
  }

  for (Map.Entry<String, Object> kvp : input.entrySet()) {
    log.trace("headers() - key = '{}' value= '{}'", kvp.getKey(), kvp.getValue());

    final Object rawValue = kvp.getValue();
    if (null == rawValue) {
      // A null value has no class to look up; fail with a descriptive
      // error instead of an anonymous NullPointerException.
      throw new DataException(
          String.format("Could not determine the type for field '%s' because its value is null", kvp.getKey())
      );
    }

    final Object headerValue = rawValue instanceof LongString ? rawValue.toString() : rawValue;

    // Single lookup: a missing mapping means the type is unsupported.
    final String field = FIELD_LOOKUP.get(headerValue.getClass());
    if (null == field) {
      throw new DataException(
          String.format("Could not determine the type for field '%s' type '%s'", kvp.getKey(), headerValue.getClass().getName())
      );
    }

    log.trace("headers() - Storing value for header in field = '{}' as {}", field, headerValue);

    Struct value = new Struct(SCHEMA_HEADER_VALUE)
        .put("type", field)
        .put(field, headerValue);
    results.put(kvp.getKey(), value);
  }
  return results;
}
private void populateMessageHeadersFromRabbitMQHeaders(final Message message, final AMQP.BasicProperties properties) { Map<String, Object> headers = properties.getHeaders(); if (headers != null) { for (Map.Entry<String, Object> entry : headers.entrySet()) { // Convert LongStrings to String. if (entry.getValue() instanceof LongString) { message.setHeader(entry.getKey(), entry.getValue().toString()); } else { message.setHeader(entry.getKey(), entry.getValue()); } } } }
public static Map getTransportHeaders(RabbitMQMessage message) { Map<String, String> map = new HashMap<String, String>(); // correlation ID if (message.getCorrelationId() != null) { map.put(RabbitMQConstants.CORRELATION_ID, message.getCorrelationId()); } // if a AMQP message ID is found if (message.getMessageId() != null) { map.put(RabbitMQConstants.MESSAGE_ID, message.getMessageId()); } // replyto destination name if (message.getReplyTo() != null) { String dest = message.getReplyTo(); map.put(RabbitMQConstants.RABBITMQ_REPLY_TO, dest); } // any other transport properties / headers Map<String, Object> headers = message.getHeaders(); if (headers != null && !headers.isEmpty()) { for (String headerName : headers.keySet()) { String value = new String(((LongString)(headers.get(headerName))).getBytes()); map.put(headerName, value); } } return map; }
/**
 * Checks that an invocation id carried as a {@code LongString} under the
 * TPIC header is present in the backend as a plain String after the
 * message properties have been converted.
 */
@Test
public void parseMessageHeaderIntoBackend() throws Exception {
    // given: a TPIC header payload holding the invocation id as a LongString
    final Map<String, LongString> tpicPayload = new HashMap<>();
    tpicPayload.put(INVOCATION_ID_KEY, new TestLongString("Crazy ID"));

    // when: the RabbitMQ properties are converted
    unit.toMessageProperties(
            createRabbitHeaderWith(TPIC_HEADER, tpicPayload),
            mock(Envelope.class),
            CHARSET_UTF8);

    // then: the backend exposes the id as a String
    assertThat(backend.copyToMap(), hasEntry(INVOCATION_ID_KEY, "Crazy ID"));
}
/** * Fixes a map so that we have valid data present * * @param m * * @return */ @SuppressWarnings("unchecked") public static Map<String, Object> fixAMQPTable( Map<String, Object> m ) { if( m == null ) { return Collections.emptyMap(); } if( !m.isEmpty() ) { Iterator<Map.Entry<String, Object>> it = m.entrySet().iterator(); while( it.hasNext() ) { Map.Entry<String, Object> e = it.next(); Object value = e.getValue(); // Convert certain types to supported ones otherwise if( value instanceof LongString ) { e.setValue( value.toString() ); } else if( value instanceof BigInteger ) { // wrap in a decimal e.setValue( new BigDecimal( (BigInteger) value ) ); } else if( value instanceof Map ) { // Recurse e.setValue( fixAMQPTable( (Map<String, Object>) value ) ); } else if( value instanceof Collection ) { e.setValue( CollectionUtils.unmodifiableList( (Collection) value ) ); } else if( !(value == null || value instanceof String || value instanceof Integer || value instanceof BigDecimal || value instanceof Date || value instanceof Byte || value instanceof Double || value instanceof Float || value instanceof Long || value instanceof Short || value instanceof Boolean || value instanceof byte[] || value instanceof List || value instanceof Object[]) ) { // Not one of the remaining valid types then remove it it.remove(); } } } return m; }