/**
 * Records the device in {@code devices} and publishes a connect event to
 * {@code GATEWAY_CONNECT_TOPIC}.
 *
 * @param deviceName name of the device; written to the payload as "device"
 * @param deviceType device type; written as "type" unless {@code StringUtils.isEmpty} rejects it
 * @return the future returned by {@code publishAsync} for the connect message
 */
@Override
public MqttDeliveryFuture onDeviceConnect(final String deviceName, final String deviceType) {
    final int msgId = msgIdSeq.incrementAndGet();

    ObjectNode payload = newNode().put("device", deviceName);
    boolean hasType = !StringUtils.isEmpty(deviceType);
    if (hasType) {
        payload.put("type", deviceType);
    }

    MqttMessage connectMsg = new MqttMessage(toBytes(payload));
    connectMsg.setId(msgId);

    log.info("[{}] Device Connected!", deviceName);
    // putIfAbsent keeps an already registered DeviceInfo for this name untouched.
    devices.putIfAbsent(deviceName, new DeviceInfo(deviceName, deviceType));

    return publishAsync(
            GATEWAY_CONNECT_TOPIC,
            connectMsg,
            token -> log.info("[{}][{}] Device connect event is reported to Thingsboard!", deviceName, msgId),
            error -> log.warn("[{}][{}] Failed to report device connection!", deviceName, msgId, error));
}
/**
 * Round-trip test: payloads published to "input" are piped through a {@code Flux} from
 * {@code mqtt.from("input")} into {@code mqtt.to("output")}; a subscriber on "output" parses
 * the payload into {@code result}, which is then asserted to be 2.
 */
@Test
public void testMQtt() throws Exception {
    CountDownLatch latch = new CountDownLatch(1);
    MqttClient client = new MqttClient("tcp://localhost:" + MQTT_PORT, MqttClient.generateClientId(), new MemoryPersistence());
    client.connect();
    try {
        MqttComponent mqtt = new MqttComponent();
        mqtt.client = client;
        Publisher<byte[]> fromTopic = mqtt.from("input", byte[].class);
        Subscriber<byte[]> toTopic = mqtt.to("output", byte[].class);
        Flux.from(fromTopic)
                .log()
                .subscribe(toTopic);

        client.subscribe("output", (topic, message) -> {
            // Integer.valueOf replaces the deprecated Integer(String) constructor.
            result = Integer.valueOf(new String(message.getPayload()));
            latch.countDown();
        });

        client.publish("input", new MqttMessage(Integer.toString(2).getBytes()));
        client.publish("input", new MqttMessage(Integer.toString(2).getBytes()));

        // Fail with a clear message on timeout instead of asserting on a never-assigned result.
        Assert.assertTrue("No message arrived on 'output' before the timeout",
                latch.await(100, TimeUnit.SECONDS));
        Assert.assertEquals(2, result, 0.1);
    } finally {
        // Release the connection even when the body above throws or an assertion fails.
        client.disconnect();
        client.close();
    }
}
/**
 * Publishes the given attributes for a device to {@code GATEWAY_ATTRIBUTES_TOPIC}.
 * The payload is a JSON object with one field, named after the device, holding the attributes.
 *
 * @param deviceName device the attributes belong to; passed to {@code checkDeviceConnected} first
 * @param attributes entries written into the device object via {@code putToNode}
 * @return the future returned by {@code publishAsync}
 */
@Override
public MqttDeliveryFuture onDeviceAttributesUpdate(String deviceName, List<KvEntry> attributes) {
    final int msgId = msgIdSeq.incrementAndGet();
    log.trace("[{}][{}] Updating device attributes: {}", deviceName, msgId, attributes);
    checkDeviceConnected(deviceName);

    ObjectNode root = newNode();
    ObjectNode attrsNode = root.putObject(deviceName);
    for (KvEntry entry : attributes) {
        putToNode(attrsNode, entry);
    }

    final int delivered = attributes.size();
    MqttMessage attrsMsg = new MqttMessage(toBytes(root));
    attrsMsg.setId(msgId);

    return publishAsync(
            GATEWAY_ATTRIBUTES_TOPIC,
            attrsMsg,
            token -> {
                log.debug("[{}][{}] Device attributes were delivered!", deviceName, msgId);
                // The counter only grows once the publish has been confirmed.
                attributesCount.addAndGet(delivered);
            },
            error -> log.warn("[{}][{}] Failed to report device attributes!", deviceName, msgId, error));
}
/**
 * Publishes telemetry for a device to {@code GATEWAY_TELEMETRY_TOPIC}.
 * <p>
 * Entries are grouped by timestamp. The payload is a JSON object with one array field, named
 * after the device, holding one {@code {"ts": ..., "values": {...}}} object per timestamp.
 *
 * @param deviceName device the telemetry belongs to; passed to {@code checkDeviceConnected} first
 * @param telemetry  timestamped entries to publish
 * @return the future returned by {@code publishAsync}
 */
@Override
public MqttDeliveryFuture onDeviceTelemetry(String deviceName, List<TsKvEntry> telemetry) {
    final int msgId = msgIdSeq.incrementAndGet();
    log.trace("[{}][{}] Updating device telemetry: {}", deviceName, msgId, telemetry);
    checkDeviceConnected(deviceName);

    ObjectNode node = newNode();
    Map<Long, List<TsKvEntry>> tsMap = telemetry.stream().collect(Collectors.groupingBy(TsKvEntry::getTs));
    ArrayNode deviceNode = node.putArray(deviceName);
    tsMap.forEach((ts, entries) -> {
        ObjectNode tsNode = deviceNode.addObject();
        tsNode.put("ts", ts);
        ObjectNode valuesNode = tsNode.putObject("values");
        entries.forEach(v -> putToNode(valuesNode, v));
    });

    final int packSize = telemetry.size();
    MqttMessage msg = new MqttMessage(toBytes(node));
    msg.setId(msgId);

    return publishAsync(GATEWAY_TELEMETRY_TOPIC, msg,
            token -> {
                // Arguments follow the [deviceName][msgId] order used by the other log lines here;
                // they were previously swapped.
                log.debug("[{}][{}] Device telemetry published to Thingsboard!", deviceName, msgId);
                telemetryCount.addAndGet(packSize);
            },
            error -> log.warn("[{}][{}] Failed to publish device telemetry!", deviceName, msgId, error));
}
/**
 * Publishes an attribute request for a device to {@code GATEWAY_REQUESTS_ATTRIBUTES_TOPIC}.
 * <p>
 * The listener is stored in {@code pendingAttrRequestsMap} under the (request id, device name)
 * key before the message is published, and removed again if the publish fails.
 *
 * @param request  supplies the device name, request id, scope and attribute key
 * @param listener callback stored with the request in the pending-request map
 */
@Override
public void onDeviceAttributeRequest(AttributeRequest request, Consumer<AttributeResponse> listener) {
    final int msgId = msgIdSeq.incrementAndGet();
    String deviceName = request.getDeviceName();
    AttributeRequestKey requestKey = new AttributeRequestKey(request.getRequestId(), request.getDeviceName());
    log.trace("[{}][{}] Requesting {} attribute: {}", deviceName, msgId,
            request.isClientScope() ? "client" : "shared", request.getAttributeKey());
    checkDeviceConnected(deviceName);

    ObjectNode node = newNode();
    node.put("id", request.getRequestId());
    node.put("client", request.isClientScope());
    node.put("device", request.getDeviceName());
    node.put("key", request.getAttributeKey());
    MqttMessage msg = new MqttMessage(toBytes(node));
    msg.setId(msgId);

    pendingAttrRequestsMap.put(requestKey, new AttributeRequestListener(request, listener));
    publishAsync(GATEWAY_REQUESTS_ATTRIBUTES_TOPIC, msg,
            token -> log.debug("[{}][{}] Device attributes request was delivered!", deviceName, msgId),
            error -> {
                // This is the request path; the message previously said "report" (copy-paste).
                log.warn("[{}][{}] Failed to request device attributes!", deviceName, msgId, error);
                pendingAttrRequestsMap.remove(requestKey);
            });
}
/**
 * Publishes a device's RPC response to {@code GATEWAY_RPC_TOPIC} as a JSON object with
 * "id", "device" and "data" fields.
 *
 * @param response supplies the request id, device name and response data
 */
@Override
public void onDeviceRpcResponse(RpcCommandResponse response) {
    final int msgId = msgIdSeq.incrementAndGet();
    final int requestId = response.getRequestId();
    final String deviceName = response.getDeviceName();
    final String data = response.getData();
    checkDeviceConnected(deviceName);

    ObjectNode body = newNode();
    body.put("id", requestId);
    body.put("device", deviceName);
    body.put("data", data);

    MqttMessage rpcMsg = new MqttMessage(toBytes(body));
    rpcMsg.setId(msgId);

    // The callbacks log the RPC request id, not the MQTT message id.
    publishAsync(
            GATEWAY_RPC_TOPIC,
            rpcMsg,
            token -> log.debug("[{}][{}] RPC response from device was delivered!", deviceName, requestId),
            error -> log.warn("[{}][{}] Failed to report RPC response from device!", deviceName, requestId, error));
}
/**
 * Publishes gateway statistics to {@code DEVICE_TELEMETRY_TOPIC}: the current time as "ts" plus
 * a "values" object with the device count and the attribute/telemetry counters.
 * Both counters are reset to 0 as they are read. A pending {@code error} is included as
 * "latestError" and then cleared. Does nothing except log when {@code tbClient} is null.
 */
private void reportStats() {
    if (tbClient == null) {
        log.info("Can't report stats because client was not initialized yet!");
        return;
    }

    final ObjectNode statsNode = newNode();
    statsNode.put("ts", System.currentTimeMillis());

    final ObjectNode values = statsNode.putObject("values");
    values.put("devicesOnline", devices.size());
    values.put("attributesUploaded", attributesCount.getAndSet(0));
    values.put("telemetryUploaded", telemetryCount.getAndSet(0));
    if (error != null) {
        values.put("latestError", JsonTools.toString(error));
        error = null;
    }

    final MqttMessage statsMsg = new MqttMessage(toBytes(statsNode));
    statsMsg.setId(msgIdSeq.incrementAndGet());

    publishAsync(
            DEVICE_TELEMETRY_TOPIC,
            statsMsg,
            token -> log.info("Gateway statistics {} reported!", statsNode),
            failure -> log.warn("Failed to report gateway statistics!", failure));
}
/**
 * Handles an attributes-update message. Reads "device" from the UTF-8 JSON payload, collects
 * the listeners of all matching subscriptions in {@code attributeUpdateSubs}, and submits one
 * task per listener to {@code callbackExecutor} with the entries parsed from "data".
 * Returns without parsing "data" when no subscription matches.
 *
 * @param message incoming MQTT message carrying the JSON payload
 */
private void onAttributesUpdate(MqttMessage message) {
    String json = new String(message.getPayload(), StandardCharsets.UTF_8);
    JsonNode payload = fromString(json);
    String deviceName = payload.get("device").asText();

    Set<AttributesUpdateListener> targets = attributeUpdateSubs.stream()
            .filter(sub -> sub.matches(deviceName))
            .map(sub -> sub.getListener())
            .collect(Collectors.toSet());
    if (targets.isEmpty()) {
        return;
    }

    List<KvEntry> attributes = getKvEntries(payload.get("data"));
    for (AttributesUpdateListener target : targets) {
        callbackExecutor.submit(() -> {
            try {
                target.onAttributesUpdated(deviceName, attributes);
            } catch (Exception e) {
                // A failing listener is logged and does not affect the others.
                log.error("[{}] Failed to process attributes update", deviceName, e);
            }
        });
    }
}
/**
 * Handles an RPC command message. Reads "device" from the UTF-8 JSON payload and collects the
 * listeners of all matching subscriptions in {@code rpcCommandSubs}. If any match, it builds an
 * {@code RpcCommandData} from the "id", "method" and "params" fields of "data" and submits one
 * task per listener to {@code callbackExecutor}. Logs a warning when no listener matches.
 *
 * @param message incoming MQTT message carrying the JSON payload
 */
private void onRpcCommand(MqttMessage message) {
    String json = new String(message.getPayload(), StandardCharsets.UTF_8);
    JsonNode payload = fromString(json);
    String deviceName = payload.get("device").asText();

    Set<RpcCommandListener> targets = rpcCommandSubs.stream()
            .filter(sub -> sub.matches(deviceName))
            .map(sub -> sub.getListener())
            .collect(Collectors.toSet());
    if (targets.isEmpty()) {
        log.warn("No listener registered for RPC command to device {}!", deviceName);
        return;
    }

    JsonNode data = payload.get("data");
    RpcCommandData rpcCommand = new RpcCommandData();
    rpcCommand.setRequestId(data.get("id").asInt());
    rpcCommand.setMethod(data.get("method").asText());
    rpcCommand.setParams(JsonTools.toString(data.get("params")));

    for (RpcCommandListener target : targets) {
        callbackExecutor.submit(() -> {
            try {
                target.onRpcCommand(deviceName, rpcCommand);
            } catch (Exception e) {
                log.error("[{}][{}] Failed to process rpc command", deviceName, rpcCommand.getRequestId(), e);
            }
        });
    }
}
/**
 * Publishes the payload to {@code topic} as a non-retained message with the configured
 * {@code qos}. The method is {@code synchronized}.
 *
 * @param messagePayload raw bytes to publish
 * @return {@code true} if {@code client.publish} returned normally; {@code false} if the
 *         client is null or publishing threw
 */
public synchronized boolean sendMessage(byte[] messagePayload) {
    logger.info("sending message");
    if (client == null) {
        logger.info("client is null");
        return false;
    }
    try {
        MqttMessage message = new MqttMessage(messagePayload);
        message.setQos(qos);
        message.setRetained(false);
        client.publish(topic, message);
        return true;
    } catch (Exception e) {
        // Hand the exception to the logger so the stack trace lands in the application log
        // instead of stderr (was e.printStackTrace()).
        logger.error("Failed to send outbound message to topic: " + topic + " - unexpected issue: "
                + new String(messagePayload), e);
        return false;
    }
}
/**
 * Publishes the payload to {@code topic} as a non-retained message with the configured
 * {@code qos}.
 *
 * @param messagePayload raw bytes to publish
 * @return {@code true} if {@code client.publish} returned normally; {@code false} if the
 *         client is null or publishing threw
 */
public boolean sendMessage(byte[] messagePayload) {
    if (client == null) {
        return false;
    }
    try {
        MqttMessage message = new MqttMessage(messagePayload);
        message.setQos(qos);
        message.setRetained(false);
        client.publish(topic, message);
        return true;
    } catch (Exception e) {
        // Hand the exception to the logger so the stack trace lands in the application log
        // instead of stderr (was e.printStackTrace()).
        logger.error("Failed to send outbound message to topic: " + topic + " - unexpected issue: "
                + new String(messagePayload), e);
        return false;
    }
}
/**
 * Builds an {@code AttributeRequest} from an MQTT message.
 * <p>
 * The three topic patterns are first refreshed through {@code checkAndCompile}. The device
 * name, attribute key and request id are then obtained from {@code eval}, which is given the
 * topic, the matching pattern, the parsed JSON payload and the matching JSON expression.
 *
 * @param topic topic the message arrived on
 * @param msg   message whose payload is decoded as UTF-8 and parsed with JsonPath
 * @return the populated request
 * @throws NumberFormatException if the evaluated request id is not an integer
 */
public AttributeRequest convert(String topic, MqttMessage msg) {
    deviceNameTopicPattern = checkAndCompile(deviceNameTopicPattern, deviceNameTopicExpression);
    attributeKeyTopicPattern = checkAndCompile(attributeKeyTopicPattern, attributeKeyTopicExpression);
    requestIdTopicPattern = checkAndCompile(requestIdTopicPattern, requestIdTopicExpression);

    String json = new String(msg.getPayload(), StandardCharsets.UTF_8);
    DocumentContext document = JsonPath.parse(json);

    String deviceName = eval(topic, deviceNameTopicPattern, document, deviceNameJsonExpression);
    String attributeKey = eval(topic, attributeKeyTopicPattern, document, attributeKeyJsonExpression);
    int requestId = Integer.parseInt(eval(topic, requestIdTopicPattern, document, requestIdJsonExpression));

    return AttributeRequest.builder()
            .deviceName(deviceName)
            .attributeKey(attributeKey)
            .requestId(requestId)
            .clientScope(this.isClientScope())
            .topicExpression(this.getResponseTopicExpression())
            .valueExpression(this.getValueExpression())
            .build();
}
/**
 * Forwards an incoming message to {@code adapter} as a ("node", "value") binding pair.
 * The node is "iot:" plus the topic with '/' replaced by '_'; the value is the message text.
 * Also logs at debug level when the value for a node differs from the one stored in
 * {@code debugHash}, then stores the new value.
 *
 * @param topic topic the message arrived on
 * @param value the received message
 */
@Override
public void messageArrived(String topic, MqttMessage value) throws Exception {
    final String reading = value.toString();
    logger.debug(topic + " " + reading);

    final String nodeUri = "iot:" + topic.replace('/', '_');

    Bindings update = new Bindings();
    update.addBinding("node", new RDFTermURI(nodeUri));
    update.addBinding("value", new RDFTermLiteral(reading));
    adapter.update(update);

    // Debug aid only: report value changes per node.
    if (debugHash.containsKey(nodeUri) && !debugHash.get(nodeUri).equals(reading)) {
        logger.debug(topic + " " + debugHash.get(nodeUri) + "-->" + reading);
    }
    debugHash.put(nodeUri, reading);
}
@Override public void messageArrived(String topic, MqttMessage message) throws Exception { messages.add("[RECIVED at "+ System.currentTimeMillis() + "] -- "+ clientID + " -- TOPIC: " + topic + " NEW MESSAGE ARRIVED: "+ message); if(logLevel.toLowerCase().equals("info")){ //Console Log System.out.println("--"+ clientID + "-- TOPIC: " + topic + " NEW MESSAGE ARRIVED "); //File Log log.info("--"+ clientID + "-- TOPIC: " + topic + " NEW MESSAGE ARRIVED "); }else if(logLevel.toLowerCase().equals("debug")){ //Console Log System.out.println("--"+ clientID + "-- TOPIC: " + topic + " NEW MESSAGE ARRIVED: "+ message); //File Log log.info("--"+ clientID + "-- TOPIC: " + topic + " NEW MESSAGE ARRIVED: "+ message); } }
/**
 * Publishes a JSON object with "uuid", "major" and "minor" fields to the given topic.
 * When the {@code GENEARL_LOG_KEY} preference is true, a log entry is saved after publishing.
 * A {@code JSONException} is recorded through {@code logPersistence} and {@code Log.e}.
 * When {@code mqttAndroidClient} is null, only an info message is logged.
 *
 * @param uuid  value for the "uuid" field
 * @param major value for the "major" field
 * @param minor value for the "minor" field
 * @param topic topic to publish to
 */
private void publishMessage(String uuid, String major, String minor, String topic) {
    if (mqttAndroidClient == null) {
        Log.i(TAG, context.getString(R.string.publish_failed_not_set_up));
        return;
    }

    try {
        JSONObject beacon = new JSONObject();
        beacon.put("uuid", uuid);
        beacon.put("major", major);
        beacon.put("minor", minor);

        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(beacon.toString().getBytes());
        mqttAndroidClient.publish(topic, mqttMessage);

        if (defaultSharedPreferences.getBoolean(GENEARL_LOG_KEY, false)) {
            logPersistence.saveNewLog(
                    context.getString(R.string.published_mqtt_message_to_topic, mqttMessage, topic), "");
        }
    } catch (JSONException e) {
        String failure = context.getString(R.string.error_publishing_on_topic, topic);
        logPersistence.saveNewLog(failure, "");
        Log.e(TAG, failure, e);
    }
}
/**
 * Installs an {@code MqttCallback} on {@code client}: a lost connection is reported through
 * {@code msg}, an arriving message's text is shown in the {@code tvMessage} view, and delivery
 * completion is ignored.
 */
protected void mqttCallback() {
    client.setCallback(new MqttCallback() {
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            ((TextView) findViewById(R.id.tvMessage)).setText(message.toString());
        }

        @Override
        public void connectionLost(Throwable cause) {
            msg("Connection lost...");
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            // Nothing to do when a delivery completes.
        }
    });
}
/**
 * Converts a received MQTT message and writes the result to an item.
 * <p>
 * Nothing is written when {@code messageToValue} returns {@code null}. Otherwise the value is
 * written if it is non-null, and the attributes are written if they are non-null and non-empty.
 *
 * @param hive
 *            hive that performs the writes
 * @param session
 *            session passed to the hive write calls
 * @param itemId
 *            id of the item to write to
 * @param message
 *            received MQTT message
 * @throws InvalidSessionException
 * @throws PermissionDeniedException
 * @throws InvalidItemException
 * @throws MqttException
 */
private void writeMessageToItem ( final Hive hive, final Session session, final String itemId, final MqttMessage message ) throws InvalidSessionException, PermissionDeniedException, InvalidItemException, MqttException
{
    final DataItemValue converted = messageToValue ( itemId, message );
    if ( converted == null )
    {
        return;
    }

    if ( converted.getValue () != null )
    {
        hive.startWrite ( session, itemId, converted.getValue (), null, null );
    }

    final boolean hasAttributes = converted.getAttributes () != null && !converted.getAttributes ().isEmpty ();
    if ( hasAttributes )
    {
        hive.startWriteAttributes ( session, itemId, converted.getAttributes (), null, null );
    }
}
/**
 * Converts an MQTT message payload (UTF-8 JSON) into a {@link DataItemValue}.
 * <p>
 * Retained and duplicate messages are rejected before any parsing is attempted, so no JSON
 * work is spent on a message that would be discarded anyway.
 *
 * @param itemId
 *            id of the target item (not used for the conversion itself)
 * @param message
 *            from MQTT topic
 * @return converted value, or {@code null} if the message is retained/duplicate or its
 *         payload could not be parsed
 * @throws MqttException
 */
private DataItemValue messageToValue ( final String itemId, final MqttMessage message ) throws MqttException
{
    if ( message.isRetained () || message.isDuplicate () )
    {
        logger.info ( "message is retained/duplicate, will not write" );
        return null;
    }
    try
    {
        return gson.fromJson ( new String ( message.getPayload (), "UTF-8" ), DataItemValue.class );
    }
    catch ( final JsonSyntaxException | UnsupportedEncodingException e )
    {
        // Pass the exception as the last argument so the log shows why parsing failed.
        logger.warn ( "could not parse message {}", message, e );
        return null;
    }
}
/**
 * Publishes the given attributes for a device to {@code GATEWAY_ATTRIBUTES_TOPIC}.
 * The payload is a JSON object with one field, named after the device, which is filled
 * through {@code JsonTools.putToNode}.
 *
 * @param deviceName device the attributes belong to; passed to {@code checkDeviceConnected} first
 * @param attributes entries to publish
 * @return the future returned by {@code publishAsync}
 */
@Override
public MqttDeliveryFuture onDeviceAttributesUpdate(String deviceName, List<KvEntry> attributes) {
    final int msgId = msgIdSeq.incrementAndGet();
    log.trace("[{}][{}] Updating device attributes: {}", deviceName, msgId, attributes);
    checkDeviceConnected(deviceName);

    final ObjectNode envelope = newNode();
    final ObjectNode perDevice = envelope.putObject(deviceName);
    for (KvEntry attribute : attributes) {
        JsonTools.putToNode(perDevice, attribute);
    }

    final int batchSize = attributes.size();
    final MqttMessage outgoing = new MqttMessage(toBytes(envelope));
    outgoing.setId(msgId);

    return publishAsync(
            GATEWAY_ATTRIBUTES_TOPIC,
            outgoing,
            token -> {
                log.debug("[{}][{}] Device attributes were delivered!", deviceName, msgId);
                // Count the batch only after the broker confirmed delivery.
                attributesCount.addAndGet(batchSize);
            },
            error -> log.warn("[{}][{}] Failed to report device attributes!", deviceName, msgId, error));
}
/**
 * Publishes telemetry for a device to {@code GATEWAY_TELEMETRY_TOPIC}.
 * <p>
 * Entries are grouped by timestamp. The payload is a JSON object with one array field, named
 * after the device, holding one {@code {"ts": ..., "values": {...}}} object per timestamp.
 *
 * @param deviceName device the telemetry belongs to; passed to {@code checkDeviceConnected} first
 * @param telemetry  timestamped entries to publish
 * @return the future returned by {@code publishAsync}
 */
@Override
public MqttDeliveryFuture onDeviceTelemetry(String deviceName, List<TsKvEntry> telemetry) {
    final int msgId = msgIdSeq.incrementAndGet();
    log.trace("[{}][{}] Updating device telemetry: {}", deviceName, msgId, telemetry);
    checkDeviceConnected(deviceName);

    ObjectNode node = newNode();
    Map<Long, List<TsKvEntry>> tsMap = telemetry.stream().collect(Collectors.groupingBy(v -> v.getTs()));
    ArrayNode deviceNode = node.putArray(deviceName);
    for (Map.Entry<Long, List<TsKvEntry>> group : tsMap.entrySet()) {
        ObjectNode tsNode = deviceNode.addObject();
        tsNode.put("ts", group.getKey());
        ObjectNode valuesNode = tsNode.putObject("values");
        group.getValue().forEach(v -> JsonTools.putToNode(valuesNode, v));
    }

    final int packSize = telemetry.size();
    MqttMessage msg = new MqttMessage(toBytes(node));
    msg.setId(msgId);

    return publishAsync(GATEWAY_TELEMETRY_TOPIC, msg,
            token -> {
                // Arguments follow the [deviceName][msgId] order used by the other log lines here;
                // they were previously swapped.
                log.debug("[{}][{}] Device telemetry published to IoT Platform!", deviceName, msgId);
                telemetryCount.addAndGet(packSize);
            },
            error -> log.warn("[{}][{}] Failed to publish device telemetry!", deviceName, msgId, error));
}
/**
 * Dispatches an attributes-update message to interested listeners.
 * <p>
 * The device name comes from the payload's "device" field. Listeners of every subscription in
 * {@code attributeUpdateSubs} that matches the name are collected into a set, and each is
 * called on {@code callbackExecutor} with the entries parsed from the "data" field.
 * "data" is not parsed when no subscription matches.
 *
 * @param message incoming MQTT message with a UTF-8 JSON payload
 */
private void onAttributesUpdate(MqttMessage message) {
    final JsonNode root = fromString(new String(message.getPayload(), StandardCharsets.UTF_8));
    final String deviceName = root.get("device").asText();

    final Set<AttributesUpdateListener> recipients = attributeUpdateSubs.stream()
            .filter(sub -> sub.matches(deviceName))
            .map(sub -> sub.getListener())
            .collect(Collectors.toSet());
    if (recipients.isEmpty()) {
        return;
    }

    final List<KvEntry> attributes = getKvEntries(root.get("data"));
    for (AttributesUpdateListener recipient : recipients) {
        callbackExecutor.submit(() -> {
            try {
                recipient.onAttributesUpdated(deviceName, attributes);
            } catch (Exception e) {
                // Isolate listener failures from each other.
                log.error("[{}] Failed to process attributes update", deviceName, e);
            }
        });
    }
}
/**
 * Dispatches an RPC command message to interested listeners.
 * <p>
 * The device name comes from the payload's "device" field. When at least one subscription in
 * {@code rpcCommandSubs} matches, an {@code RpcCommandData} is built from the "id", "method"
 * and "params" fields of "data" and each listener is called on {@code callbackExecutor}.
 * Otherwise a warning is logged.
 *
 * @param message incoming MQTT message with a UTF-8 JSON payload
 */
private void onRpcCommand(MqttMessage message) {
    final JsonNode root = fromString(new String(message.getPayload(), StandardCharsets.UTF_8));
    final String deviceName = root.get("device").asText();

    final Set<RpcCommandListener> recipients = rpcCommandSubs.stream()
            .filter(sub -> sub.matches(deviceName))
            .map(sub -> sub.getListener())
            .collect(Collectors.toSet());
    if (recipients.isEmpty()) {
        log.warn("No listener registered for RPC command to device {}!", deviceName);
        return;
    }

    final JsonNode data = root.get("data");
    final RpcCommandData rpcCommand = new RpcCommandData();
    rpcCommand.setRequestId(data.get("id").asInt());
    rpcCommand.setMethod(data.get("method").asText());
    rpcCommand.setParams(JsonTools.toString(data.get("params")));

    for (RpcCommandListener recipient : recipients) {
        callbackExecutor.submit(() -> {
            try {
                recipient.onRpcCommand(deviceName, rpcCommand);
            } catch (Exception e) {
                log.error("[{}][{}] Failed to process rpc command", deviceName, rpcCommand.getRequestId(), e);
            }
        });
    }
}
/**
 * Publishes a payload to a topic with QoS 0 and broadcasts the outcome.
 * <p>
 * Broadcasts {@code BROADCAST_EXCEPTION} when the client is not connected or publishing
 * throws, and {@code BROADCAST_PUBLISH_SUCCESS} with the topic otherwise.
 *
 * @param requestId id passed through to the broadcast
 * @param topic     topic to publish to
 * @param payload   raw message bytes
 */
private void onPublish(final String requestId, final String topic, final byte[] payload) {
    if (!clientIsConnected()) {
        broadcastException(BROADCAST_EXCEPTION, requestId,
                new Exception("Can't publish to topic: " + topic + ", client not connected!"));
        return;
    }

    try {
        MQTTServiceLogger.debug("onPublish", "Publishing to topic: " + topic + ", payload with size " + payload.length);
        MqttMessage message = new MqttMessage(payload);
        message.setQos(0);
        mClient.publish(topic, message);
        // Log the size: concatenating the byte[] itself only printed its identity ("[B@...").
        MQTTServiceLogger.debug("onPublish", "Successfully published to topic: " + topic + ", payload with size " + payload.length);
        broadcast(BROADCAST_PUBLISH_SUCCESS, requestId, PARAM_TOPIC, topic);
    } catch (Exception exc) {
        broadcastException(BROADCAST_EXCEPTION, requestId, new MqttException(exc));
    }
}
/**
 * Turns an incoming message into an event and sends it to the {@code epService} runtime.
 * <p>
 * The event name is the last '/'-separated segment of the topic. The underlying type that
 * {@code epService} has configured for that name is the class the body is mapped to.
 * Any failure while resolving or mapping is logged and not rethrown.
 *
 * @param topic   topic the message arrived on
 * @param message message whose payload is the serialized event
 */
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    // Decode explicitly as UTF-8 instead of relying on the platform default charset.
    String body = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
    log.info("Topic: {}, Message: {}", topic, body);
    try {
        String eventName = topic.substring(topic.lastIndexOf('/') + 1);
        Class<?> cls = epService.getEPAdministrator()
                .getConfiguration()
                .getEventType(eventName)
                .getUnderlyingType();
        Object event = mapper.readValue(body, cls);
        epService.getEPRuntime().sendEvent(event);
    } catch (Exception e) {
        log.error("Can not resolve Mqtt message", e);
    }
}
public boolean sendMessage(byte[] messagePayload) { // connectClient(); if (client != null) { try { MqttMessage message = new MqttMessage(messagePayload); message.setQos(OUTGOING_MQTT_QOS); message.setRetained(false); client.publish(topic, message); return true; } catch (Exception e) { logger.error( "Failed to send outbound message (unexpected issue): " + new String(messagePayload)); logger.error(e.getLocalizedMessage()); e.printStackTrace(); } } return false; }
/**
 * Handles an incoming command message. The payload is parsed as a JSON object and the command
 * is read via {@code extractCommandData} with {@code CMD_KEY}. "ping" and "randnum" get their
 * dedicated responses; any other command is answered with the raw payload.
 * {@code msgRecd} is set once the response has been sent.
 *
 * @param topic   topic the message arrived on
 * @param message message carrying the JSON command
 */
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    final String rawCommand = new String(message.getPayload());
    System.out.println("Red'c command: " + rawCommand);

    final JsonObject commandJson = parser.parse(rawCommand).getAsJsonObject();
    final String command = extractCommandData(commandJson, CMD_KEY);
    switch (command) {
        case "ping":
            sendResponse(pingResponse(commandJson));
            break;
        case "randnum":
            sendResponse(randResponse(commandJson));
            break;
        default:
            // Unknown command: send the original payload back.
            sendResponse(rawCommand);
    }

    msgRecd = true;
}
/**
 * Overridden from the {@link MqttCallbackExtended#messageArrived(String, MqttMessage)} method.
 * <p>
 * When the {@code inboundMessageChannel} is set, a copy of the payload is sent to that
 * {@link MessageChannel} together with the topic, id, QoS, retained and duplicate headers.
 * Any {@code Exception} or {@code Error} raised while sending is logged and not propagated.
 */
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    if (inboundMessageChannel == null) {
        return;
    }
    try {
        inboundMessageChannel.send(
                MessageBuilder.withPayload(message.getPayload().clone())
                        .setHeader(MqttHeaderHelper.TOPIC, topic)
                        .setHeader(MqttHeaderHelper.ID, message.getId())
                        .setHeader(MqttHeaderHelper.QOS,
                                MqttQualityOfService.findByLevelIdentifier(message.getQos()))
                        .setHeader(MqttHeaderHelper.RETAINED, message.isRetained())
                        .setHeader(MqttHeaderHelper.DUPLICATE, message.isDuplicate())
                        .build());
    } catch (Exception | Error ex) {
        final String failure = String.format(
                "Client ID %s could not send the message to the Inbound Channel. Topic: %s, Message: %s",
                getClientId(), topic, message.toString());
        LOG.error(failure, ex);
    }
}
/**
 * Routes an incoming message to the handler for its topic: attribute updates, attribute
 * responses or RPC commands. Messages on any other topic are only traced.
 *
 * @param topic   topic the message arrived on
 * @param message the received message
 */
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    log.trace("Message arrived [{}] {}", topic, message.getId());

    if (topic.equals(GATEWAY_ATTRIBUTES_TOPIC)) {
        onAttributesUpdate(message);
        return;
    }
    if (topic.equals(GATEWAY_RESPONSES_ATTRIBUTES_TOPIC)) {
        onDeviceAttributesResponse(message);
        return;
    }
    if (topic.equals(GATEWAY_RPC_TOPIC)) {
        onRpcCommand(message);
    }
}
/**
 * Publishes an attribute response. When the response carries data, the topic and body are
 * produced by {@code replace} from the response's topic and value expressions, and the body is
 * published as UTF-8. When it carries none, a warning naming the scope and key is logged.
 *
 * @param response attribute response to forward
 */
private void onAttributeResponse(AttributeResponse response) {
    if (!response.getData().isPresent()) {
        log.warn("[{}] {} attribute [{}] not found", response.getDeviceName(),
                response.isClientScope() ? "Client" : "Shared", response.getKey());
        return;
    }

    KvEntry attribute = response.getData().get();
    String requestId = Integer.toString(response.getRequestId());

    String topic = replace(response.getTopicExpression(), requestId, response.getDeviceName(), attribute);
    String body = replace(response.getValueExpression(), requestId, response.getDeviceName(), attribute);

    MqttMessage reply = new MqttMessage(body.getBytes(StandardCharsets.UTF_8));
    publish(response.getDeviceName(), topic, reply);
}
/**
 * Forwards an RPC command to every server-side RPC mapping whose device-name and method
 * filters (regular expressions) match.
 * <p>
 * For a mapping without a response topic expression the request is simply published. Otherwise
 * the method subscribes to the response topic and waits for the subscription to complete,
 * schedules an unsubscribe after the mapping's response timeout, and only then publishes the
 * request.
 *
 * @param deviceName device the command is addressed to
 * @param command    RPC command (request id, method, params)
 */
@Override
public void onRpcCommand(String deviceName, RpcCommandData command) {
    final int requestId = command.getRequestId();

    List<ServerSideRpcMapping> matching = configuration.getServerSideRpc().stream()
            .filter(candidate -> deviceName.matches(candidate.getDeviceNameFilter())
                    && command.getMethod().matches(candidate.getMethodFilter()))
            .collect(Collectors.toList());

    for (ServerSideRpcMapping mapping : matching) {
        String requestTopic = replace(mapping.getRequestTopicExpression(), deviceName, command);
        String body = replace(mapping.getValueExpression(), deviceName, command);

        if (StringUtils.isEmpty(mapping.getResponseTopicExpression())) {
            // One-way command: no response topic to listen on.
            publish(deviceName, requestTopic, new MqttMessage(body.getBytes(StandardCharsets.UTF_8)));
            continue;
        }

        String responseTopic = replace(mapping.getResponseTopicExpression(), deviceName, command);
        try {
            log.info("[{}] Temporary subscribe to RPC response topic [{}]", deviceName, responseTopic);
            client.subscribe(responseTopic, 1,
                    new MqttRpcResponseMessageListener(requestId, deviceName, this::onRpcCommandResponse)
            ).waitForCompletion();
            scheduler.schedule(() -> {
                unsubscribe(deviceName, requestId, responseTopic);
            }, mapping.getResponseTimeout(), TimeUnit.MILLISECONDS);
            // Publish only after the subscription is in place.
            publish(deviceName, requestTopic, new MqttMessage(body.getBytes(StandardCharsets.UTF_8)));
        } catch (MqttException e) {
            log.warn("[{}] Failed to subscribe to response topic and push RPC command [{}]", deviceName, requestId, e);
        }
    }
}
/**
 * Converts the incoming message with {@code converter} and hands the result to
 * {@code consumer}. Any exception from conversion or consumption is logged together with the
 * raw payload bytes and not rethrown.
 *
 * @param topic   topic the message arrived on
 * @param message the received message
 */
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    try {
        consumer.accept(converter.convert(topic, message));
    } catch (Exception failure) {
        final String rawBytes = Arrays.toString(message.getPayload());
        log.info("[{}] Failed to decode message: {}", topic, rawBytes, failure);
    }
}
/**
 * Wraps the incoming payload (decoded as UTF-8) in an {@code RpcCommandResponse} carrying this
 * listener's {@code requestId} and {@code deviceName}, and passes it to {@code consumer}
 * together with the topic.
 *
 * @param topic topic the response arrived on
 * @param msg   message holding the response data
 */
@Override
public void messageArrived(String topic, MqttMessage msg) throws Exception {
    final String data = new String(msg.getPayload(), StandardCharsets.UTF_8);

    final RpcCommandResponse rpcResponse = new RpcCommandResponse();
    rpcResponse.setDeviceName(deviceName);
    rpcResponse.setRequestId(requestId);
    rpcResponse.setData(data);

    consumer.accept(topic, rpcResponse);
}
/**
 * Converts a JSON MQTT message into device data.
 * <p>
 * The payload is decoded as UTF-8. When {@code filterExpression} is non-empty it is applied as
 * a JsonPath expression and the result replaces the data. A JSON array is split into one
 * source string per element; anything else is treated as a single source. The sources are then
 * handed to {@code parse}.
 *
 * @param topic topic the message arrived on
 * @param msg   message carrying the JSON payload
 * @return the result of {@code parse} for the extracted sources
 * @throws RuntimeException if the filter expression cannot be applied; the original failure
 *                          is attached as the cause
 * @throws Exception        if the JSON cannot be read or parsed
 */
@Override
public List<DeviceData> convert(String topic, MqttMessage msg) throws Exception {
    String data = new String(msg.getPayload(), StandardCharsets.UTF_8);
    log.trace("Parsing json message: {}", data);

    if (!filterExpression.isEmpty()) {
        try {
            log.debug("Data before filtering {}", data);
            DocumentContext document = JsonPath.parse(data);
            document = JsonPath.parse((Object) document.read(filterExpression));
            data = document.jsonString();
            log.debug("Data after filtering {}", data);
        } catch (RuntimeException e) {
            log.debug("Failed to apply filter expression: {}", filterExpression);
            // Keep the original exception as the cause so the real failure is not lost.
            throw new RuntimeException("Failed to apply filter expression " + filterExpression, e);
        }
    }

    JsonNode node = mapper.readTree(data);
    List<String> srcList;
    if (node.isArray()) {
        srcList = new ArrayList<>(node.size());
        for (int i = 0; i < node.size(); i++) {
            srcList.add(mapper.writeValueAsString(node.get(i)));
        }
    } else {
        srcList = Collections.singletonList(data);
    }
    return parse(topic, srcList);
}