@Override public Object before(Exchange exchange) throws Exception { if (backlogTracer.shouldTrace(processorDefinition, exchange)) { Date timestamp = new Date(); String toNode = processorDefinition.getId(); String exchangeId = exchange.getExchangeId(); String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn(), true, 4, backlogTracer.isBodyIncludeStreams(), backlogTracer.isBodyIncludeFiles(), backlogTracer.getBodyMaxChars()); // if first we should add a pseudo trace message as well, so we have a starting message (eg from the route) String routeId = routeDefinition != null ? routeDefinition.getId() : null; if (first) { Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, timestamp, Date.class); DefaultBacklogTracerEventMessage pseudo = new DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(), created, routeId, null, exchangeId, messageAsXml); backlogTracer.traceEvent(pseudo); } DefaultBacklogTracerEventMessage event = new DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(), timestamp, routeId, toNode, exchangeId, messageAsXml); backlogTracer.traceEvent(event); } return null; }
protected void prepareExchangeForContinue(Exchange exchange, RedeliveryData data, boolean isDeadLetterChannel) { Exception caught = exchange.getException(); // we continue so clear any exceptions exchange.setException(null); // clear rollback flags exchange.setProperty(Exchange.ROLLBACK_ONLY, null); // reset cached streams so they can be read again MessageHelper.resetStreamCache(exchange.getIn()); // its continued then remove traces of redelivery attempted and caught exception exchange.getIn().removeHeader(Exchange.REDELIVERED); exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER); exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER); exchange.removeProperty(Exchange.FAILURE_HANDLED); // keep the Exchange.EXCEPTION_CAUGHT as property so end user knows the caused exception // create log message String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange); msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught; msg = msg + ". Handled and continue routing."; // log that we failed but want to continue logFailedDelivery(false, false, false, true, isDeadLetterChannel, exchange, msg, data, null); }
public void traceExchange(ProcessorDefinition<?> node, Processor target, TraceInterceptor traceInterceptor, Exchange exchange) throws Exception { if (notificationSender != null && tracer.isJmxTraceNotifications()) { String body = MessageHelper.extractBodyForLogging(exchange.getIn(), "", false, true, tracer.getTraceBodySize()); if (body == null) { body = ""; } String message = body.substring(0, Math.min(body.length(), MAX_MESSAGE_LENGTH)); Map<String, Object> tm = createTraceMessage(node, exchange, body); Notification notification = new Notification("TraceNotification", exchange.toString(), num.getAndIncrement(), System.currentTimeMillis(), message); notification.setUserData(tm); notificationSender.sendNotification(notification); } }
@Override public String browseMessageAsXml(Integer index, Boolean includeBody) { List<Exchange> exchanges = getEndpoint().getExchanges(); if (index >= exchanges.size()) { return null; } Exchange exchange = exchanges.get(index); if (exchange == null) { return null; } Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); String xml = MessageHelper.dumpAsXml(msg, includeBody); return xml; }
@Override public Collection<KeyValueAnnotation> requestAnnotations() { KeyValueAnnotation key1 = KeyValueAnnotation.create("camel.client.endpoint.url", url); KeyValueAnnotation key2 = KeyValueAnnotation.create("camel.client.exchange.id", exchange.getExchangeId()); KeyValueAnnotation key3 = KeyValueAnnotation.create("camel.client.exchange.pattern", exchange.getPattern().name()); KeyValueAnnotation key4 = null; if (eventNotifier.isIncludeMessageBody() || eventNotifier.isIncludeMessageBodyStreams()) { boolean streams = eventNotifier.isIncludeMessageBodyStreams(); StreamCache cache = prepareBodyForLogging(exchange, streams); String body = MessageHelper.extractBodyForLogging(exchange.hasOut() ? exchange.getOut() : exchange.getIn(), "", streams, streams); key4 = KeyValueAnnotation.create("camel.client.exchange.message.request.body", body); if (cache != null) { cache.reset(); } } List<KeyValueAnnotation> list = new ArrayList<>(); list.add(key1); list.add(key2); list.add(key3); if (key4 != null) { list.add(key4); } return list; }
@Override public Collection<KeyValueAnnotation> requestAnnotations() { KeyValueAnnotation key1 = KeyValueAnnotation.create("camel.server.endpoint.url", url); KeyValueAnnotation key2 = KeyValueAnnotation.create("camel.server.exchange.id", exchange.getExchangeId()); KeyValueAnnotation key3 = KeyValueAnnotation.create("camel.server.exchange.pattern", exchange.getPattern().name()); KeyValueAnnotation key4 = null; if (eventNotifier.isIncludeMessageBody() || eventNotifier.isIncludeMessageBodyStreams()) { boolean streams = eventNotifier.isIncludeMessageBodyStreams(); StreamCache cache = prepareBodyForLogging(exchange, streams); String body = MessageHelper.extractBodyForLogging(exchange.hasOut() ? exchange.getOut() : exchange.getIn(), "", streams, streams); key4 = KeyValueAnnotation.create("camel.server.exchange.message.request.body", body); if (cache != null) { cache.reset(); } } List<KeyValueAnnotation> list = new ArrayList<>(); list.add(key1); list.add(key2); list.add(key3); if (key4 != null) { list.add(key4); } return list; }
protected void sendMessageWithContentType(String charset, boolean usingGZip) { Endpoint endpoint = context.getEndpoint("http://localhost:{{port}}/myapp/myservice"); Exchange exchange = endpoint.createExchange(); exchange.getIn().setBody("<order>123</order>"); exchange.getIn().setHeader("User", "Claus"); exchange.getIn().setHeader("SOAPAction", "test"); if (charset == null) { exchange.getIn().setHeader("Content-Type", "text/xml"); } else { exchange.getIn().setHeader("Content-Type", "text/xml; charset=" + charset); } if (usingGZip) { exchange.getIn().setHeader(Exchange.CONTENT_ENCODING, "gzip"); } template.send(endpoint, exchange); String body = exchange.getOut().getBody(String.class); assertEquals("<order>OK</order>", body); assertEquals("Get a wrong content-type ", MessageHelper.getContentType(exchange.getOut()), "text/xml"); }
@Override public void apply(IgniteFuture<Object> future) { Message in = exchange.getIn(); Message out = exchange.getOut(); MessageHelper.copyHeaders(in, out, true); Object result = null; try { result = future.get(); } catch (Exception e) { exchange.setException(e); callback.done(false); return; } exchange.getOut().setBody(result); callback.done(false); }
@Override public boolean process(Exchange exchange, AsyncCallback callback) { Message in = exchange.getIn(); Message out = exchange.getOut(); MessageHelper.copyHeaders(exchange.getIn(), out, true); Object body = in.getBody(); if (endpoint.getSendMode() == IgniteMessagingSendMode.UNORDERED) { if (body instanceof Collection<?> && !endpoint.isTreatCollectionsAsCacheObjects()) { messaging.send(topicFor(exchange), (Collection<?>) body); } else { messaging.send(topicFor(exchange), body); } } else { messaging.sendOrdered(topicFor(exchange), body, endpoint.getTimeout()); } IgniteHelper.maybePropagateIncomingBody(endpoint, in, out); return true; }
protected void populateResponse(Exchange exchange, JettyContentExchange httpExchange, Message in, HeaderFilterStrategy strategy, int responseCode) throws IOException { Message answer = exchange.getOut(); answer.setHeader(Exchange.HTTP_RESPONSE_CODE, responseCode); // must use response fields to get the http headers as // httpExchange.getHeaders() does not work well with multi valued headers Map<String, Collection<String>> headers = httpExchange.getResponseHeaders(); for (Map.Entry<String, Collection<String>> ent : headers.entrySet()) { String name = ent.getKey(); Collection<String> values = ent.getValue(); for (String value : values) { if (name.toLowerCase().equals("content-type")) { name = Exchange.CONTENT_TYPE; exchange.setProperty(Exchange.CHARSET_NAME, IOHelper.getCharsetNameFromContentType(value)); } if (strategy != null && !strategy.applyFilterToExternalHeaders(name, value, exchange)) { HttpHelper.appendHeader(answer.getHeaders(), name, value); } } } // preserve headers from in by copying any non existing headers // to avoid overriding existing headers with old values MessageHelper.copyHeaders(exchange.getIn(), answer, false); // extract body after headers has been set as we want to ensure content-type from Jetty HttpExchange // has been populated first answer.setBody(extractResponseBody(exchange, httpExchange)); }
/** * Creates a {@link DefaultTraceEventMessage} based on the given node it was traced while processing * the current {@link Exchange} * * @param toNode the node where this trace is intercepted * @param exchange the current {@link Exchange} */ public DefaultTraceEventMessage(final Date timestamp, final ProcessorDefinition<?> toNode, final Exchange exchange) { this.tracedExchange = exchange; Message in = exchange.getIn(); // need to use defensive copies to avoid Exchange altering after the point of interception this.timestamp = timestamp; this.fromEndpointUri = exchange.getFromEndpoint() != null ? exchange.getFromEndpoint().getEndpointUri() : null; this.previousNode = extractFromNode(exchange); this.toNode = extractToNode(exchange); this.exchangeId = exchange.getExchangeId(); this.routeId = exchange.getFromRouteId(); this.shortExchangeId = extractShortExchangeId(exchange); this.exchangePattern = exchange.getPattern().toString(); this.properties = exchange.getProperties().isEmpty() ? null : exchange.getProperties().toString(); this.headers = in.getHeaders().isEmpty() ? null : in.getHeaders().toString(); // We should not turn the message body into String this.body = MessageHelper.extractBodyForLogging(in, ""); this.bodyType = MessageHelper.getBodyTypeName(in); if (exchange.hasOut()) { Message out = exchange.getOut(); this.outHeaders = out.getHeaders().isEmpty() ? null : out.getHeaders().toString(); this.outBody = MessageHelper.extractBodyAsString(out); this.outBodyType = MessageHelper.getBodyTypeName(out); } this.causedByException = extractCausedByException(exchange); }
@Override public void beforeProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition) { // store a copy of the message so we can see that from the debugger Date timestamp = new Date(); String toNode = definition.getId(); String routeId = ProcessorDefinitionHelper.getRouteId(definition); String exchangeId = exchange.getExchangeId(); String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn(), true, 2, isBodyIncludeStreams(), isBodyIncludeFiles(), getBodyMaxChars()); long uid = debugCounter.incrementAndGet(); BacklogTracerEventMessage msg = new DefaultBacklogTracerEventMessage(uid, timestamp, routeId, toNode, exchangeId, messageAsXml); suspendedBreakpointMessages.put(nodeId, msg); // suspend at this breakpoint final SuspendedExchange se = suspendedBreakpoints.get(nodeId); if (se != null) { // now wait until we should continue logger.log("NodeBreakpoint at node " + toNode + " is waiting to continue for exchangeId: " + exchange.getExchangeId()); try { boolean hit = se.getLatch().await(fallbackTimeout, TimeUnit.SECONDS); if (!hit) { logger.log("NodeBreakpoint at node " + toNode + " timed out and is continued exchangeId: " + exchange.getExchangeId(), LoggingLevel.WARN); } else { logger.log("NodeBreakpoint at node " + toNode + " is continued exchangeId: " + exchange.getExchangeId()); } } catch (InterruptedException e) { // ignore } } }
@Override public void beforeProcess(Exchange exchange, Processor processor, ProcessorDefinition<?> definition) { // store a copy of the message so we can see that from the debugger Date timestamp = new Date(); String toNode = definition.getId(); String routeId = ProcessorDefinitionHelper.getRouteId(definition); String exchangeId = exchange.getExchangeId(); String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn(), true, 2, isBodyIncludeStreams(), isBodyIncludeFiles(), getBodyMaxChars()); long uid = debugCounter.incrementAndGet(); BacklogTracerEventMessage msg = new DefaultBacklogTracerEventMessage(uid, timestamp, routeId, toNode, exchangeId, messageAsXml); suspendedBreakpointMessages.put(toNode, msg); // suspend at this breakpoint SuspendedExchange se = new SuspendedExchange(exchange, new CountDownLatch(1)); suspendedBreakpoints.put(toNode, se); // now wait until we should continue logger.log("StepBreakpoint at node " + toNode + " is waiting to continue for exchangeId: " + exchange.getExchangeId()); try { boolean hit = se.getLatch().await(fallbackTimeout, TimeUnit.SECONDS); if (!hit) { logger.log("StepBreakpoint at node " + toNode + " timed out and is continued exchangeId: " + exchange.getExchangeId(), LoggingLevel.WARN); } else { logger.log("StepBreakpoint at node " + toNode + " is continued exchangeId: " + exchange.getExchangeId()); } } catch (InterruptedException e) { // ignore } }
@Override public boolean process(Exchange exchange, AsyncCallback callback) { StreamCache newBody = exchange.getIn().getBody(StreamCache.class); if (newBody != null) { exchange.getIn().setBody(newBody); } MessageHelper.resetStreamCache(exchange.getIn()); return processor.process(exchange, callback); }
protected Exchange prepareExchangeForRoutingSlip(Exchange current, Endpoint endpoint) { Exchange copy = new DefaultExchange(current); // we must use the same id as this is a snapshot strategy where Camel copies a snapshot // before processing the next step in the pipeline, so we have a snapshot of the exchange // just before. This snapshot is used if Camel should do redeliveries (re try) using // DeadLetterChannel. That is why it's important the id is the same, as it is the *same* // exchange being routed. copy.setExchangeId(current.getExchangeId()); copyOutToIn(copy, current); // ensure stream caching is reset MessageHelper.resetStreamCache(copy.getIn()); return copy; }
public void begin() { // we have already acquired and prepare the producer LOG.trace("RecipientProcessorExchangePair #{} begin: {}", index, exchange); exchange.setProperty(Exchange.RECIPIENT_LIST_ENDPOINT, endpoint.getEndpointUri()); // ensure stream caching is reset MessageHelper.resetStreamCache(exchange.getIn()); // if the MEP on the endpoint is different then if (pattern != null) { originalPattern = exchange.getPattern(); LOG.trace("Using exchangePattern: {} on exchange: {}", pattern, exchange); exchange.setPattern(pattern); } }
protected String getBodyAsString(Message message) { if (message.getBody() instanceof Future) { if (!isShowFuture()) { // just use a to string of the future object return message.getBody().toString(); } } return MessageHelper.extractBodyForLogging(message, "", isShowStreams(), isShowFiles(), getMaxChars(message)); }
protected void prepareExchangeForRedelivery(Exchange exchange, RedeliveryData data) { if (!redeliveryEnabled) { throw new IllegalStateException("Redelivery is not enabled on " + this + ". Make sure you have configured the error handler properly."); } // there must be a defensive copy of the exchange ObjectHelper.notNull(data.original, "Defensive copy of Exchange is null", this); // okay we will give it another go so clear the exception so we can try again exchange.setException(null); // clear rollback flags exchange.setProperty(Exchange.ROLLBACK_ONLY, null); // TODO: We may want to store these as state on RedeliveryData so we keep them in case end user messes with Exchange // and then put these on the exchange when doing a redelivery / fault processor // preserve these headers Integer redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class); Integer redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class); Boolean redelivered = exchange.getIn().getHeader(Exchange.REDELIVERED, Boolean.class); // we are redelivering so copy from original back to exchange exchange.getIn().copyFrom(data.original.getIn()); exchange.setOut(null); // reset cached streams so they can be read again MessageHelper.resetStreamCache(exchange.getIn()); // put back headers if (redeliveryCounter != null) { exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter); } if (redeliveryMaxCounter != null) { exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter); } if (redelivered != null) { exchange.getIn().setHeader(Exchange.REDELIVERED, redelivered); } }
@Override public String browseRangeMessagesAsXml(Integer fromIndex, Integer toIndex, Boolean includeBody) { if (fromIndex == null) { fromIndex = 0; } if (toIndex == null) { toIndex = Integer.MAX_VALUE; } if (fromIndex > toIndex) { throw new IllegalArgumentException("From index cannot be larger than to index, was: " + fromIndex + " > " + toIndex); } List<Exchange> exchanges = getEndpoint().getExchanges(); if (exchanges.size() == 0) { return null; } StringBuilder sb = new StringBuilder(); sb.append("<messages>"); for (int i = fromIndex; i < exchanges.size() && i <= toIndex; i++) { Exchange exchange = exchanges.get(i); Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); String xml = MessageHelper.dumpAsXml(msg, includeBody); sb.append("\n").append(xml); } sb.append("\n</messages>"); return sb.toString(); }
protected TypeConversionException createTypeConversionException(Exchange exchange, Class<?> type, Object value, Throwable cause) { Object body; // extract the body for logging which allows to limit the message body in the exception/stacktrace // and also can be used to turn off logging sensitive message data if (exchange != null) { body = MessageHelper.extractValueForLogging(value, exchange.getIn()); } else { body = value; } return new TypeConversionException(body, type, cause); }
@Override public void interrupt(Exchange exchange) { AwaitThreadEntry entry = (AwaitThreadEntry) inflight.get(exchange); if (entry != null) { try { StringBuilder sb = new StringBuilder(); sb.append("Interrupted while waiting for asynchronous callback, will release the following blocked thread which was waiting for exchange to finish processing with exchangeId: "); sb.append(exchange.getExchangeId()); sb.append("\n"); sb.append(dumpBlockedThread(entry)); // dump a route stack trace of the exchange String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, exchangeFormatter, false); if (routeStackTrace != null) { sb.append(routeStackTrace); } LOG.warn(sb.toString()); } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } finally { if (statistics.isStatisticsEnabled()) { interruptedCounter.incrementAndGet(); } exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId())); entry.getLatch().countDown(); } } }
/** * Strategy method to extract the document from the exchange. */ protected Object getDocument(Exchange exchange, Object body) { try { return doGetDocument(exchange, body); } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } finally { // call the reset if the in message body is StreamCache MessageHelper.resetStreamCache(exchange.getIn()); } }
public void doWriteResponse(Message message, HttpServletResponse response, Exchange exchange) throws IOException { // set the status code in the response. Default is 200. if (message.getHeader(Exchange.HTTP_RESPONSE_CODE) != null) { int code = message.getHeader(Exchange.HTTP_RESPONSE_CODE, Integer.class); response.setStatus(code); } // set the content type in the response. String contentType = MessageHelper.getContentType(message); if (contentType != null) { response.setContentType(contentType); } // append headers // must use entrySet to ensure case of keys is preserved for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) { String key = entry.getKey(); Object value = entry.getValue(); // use an iterator as there can be multiple values. (must not use a delimiter) final Iterator<?> it = ObjectHelper.createIterator(value, null); while (it.hasNext()) { String headerValue = convertHeaderValueToString(exchange, it.next()); if (headerValue != null && headerFilterStrategy != null && !headerFilterStrategy.applyFilterToCamelHeaders(key, headerValue, exchange)) { response.addHeader(key, headerValue); } } } // write the body. if (message.getBody() != null) { if (GZIPHelper.isGzip(message)) { doWriteGZIPResponse(message, response, exchange); } else { doWriteDirectResponse(message, response, exchange); } } }
protected void populateResponse(Exchange exchange, HttpRequestBase httpRequest, HttpResponse httpResponse, Message in, HeaderFilterStrategy strategy, int responseCode) throws IOException, ClassNotFoundException { // We just make the out message is not create when extractResponseBody throws exception Object response = extractResponseBody(httpRequest, httpResponse, exchange, getEndpoint().isIgnoreResponseBody()); Message answer = exchange.getOut(); answer.setHeader(Exchange.HTTP_RESPONSE_CODE, responseCode); if (httpResponse.getStatusLine() != null) { answer.setHeader(Exchange.HTTP_RESPONSE_TEXT, httpResponse.getStatusLine().getReasonPhrase()); } answer.setBody(response); // propagate HTTP response headers Header[] headers = httpResponse.getAllHeaders(); for (Header header : headers) { String name = header.getName(); String value = header.getValue(); if (name.toLowerCase().equals("content-type")) { name = Exchange.CONTENT_TYPE; exchange.setProperty(Exchange.CHARSET_NAME, IOHelper.getCharsetNameFromContentType(value)); } // use http helper to extract parameter value as it may contain multiple values Object extracted = HttpHelper.extractHttpParameterValue(value); if (strategy != null && !strategy.applyFilterToExternalHeaders(name, extracted, exchange)) { HttpHelper.appendHeader(answer.getHeaders(), name, extracted); } } // endpoint might be configured to copy headers from in to out // to avoid overriding existing headers with old values just // filter the http protocol headers if (getEndpoint().isCopyHeaders()) { MessageHelper.copyHeaders(exchange.getIn(), answer, httpProtocolHeaderFilterStrategy, false); } }
@Override public Collection<KeyValueAnnotation> responseAnnotations() { KeyValueAnnotation key1 = KeyValueAnnotation.create("camel.client.endpoint.url", url); KeyValueAnnotation key2 = KeyValueAnnotation.create("camel.client.exchange.id", exchange.getExchangeId()); KeyValueAnnotation key3 = KeyValueAnnotation.create("camel.client.exchange.pattern", exchange.getPattern().name()); KeyValueAnnotation key4 = null; if (eventNotifier.isIncludeMessageBody() || eventNotifier.isIncludeMessageBodyStreams()) { boolean streams = eventNotifier.isIncludeMessageBodyStreams(); StreamCache cache = prepareBodyForLogging(exchange, streams); String body = MessageHelper.extractBodyForLogging(exchange.hasOut() ? exchange.getOut() : exchange.getIn(), "", streams, streams); key4 = KeyValueAnnotation.create("camel.client.exchange.message.response.body", body); if (cache != null) { cache.reset(); } } KeyValueAnnotation key5 = null; // lets capture http response code for http based components String responseCode = exchange.hasOut() ? exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE, String.class) : exchange.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE, String.class); if (responseCode != null) { key5 = KeyValueAnnotation.create("camel.client.exchange.message.response.code", responseCode); } List<KeyValueAnnotation> list = new ArrayList<>(); list.add(key1); list.add(key2); list.add(key3); if (key4 != null) { list.add(key4); } if (key5 != null) { list.add(key5); } return list; }
protected void populateResponse(Exchange exchange, JettyContentExchange httpExchange, Message in, HeaderFilterStrategy strategy, int responseCode) throws IOException { Message answer = exchange.getOut(); answer.setHeader(Exchange.HTTP_RESPONSE_CODE, responseCode); // must use response fields to get the http headers as // httpExchange.getHeaders() does not work well with multi valued headers Map<String, Collection<String>> headers = httpExchange.getResponseHeaders(); for (Map.Entry<String, Collection<String>> ent : headers.entrySet()) { String name = ent.getKey(); Collection<String> values = ent.getValue(); for (String value : values) { if (name.toLowerCase().equals("content-type")) { name = Exchange.CONTENT_TYPE; exchange.setProperty(Exchange.CHARSET_NAME, IOHelper.getCharsetNameFromContentType(value)); } if (strategy != null && !strategy.applyFilterToExternalHeaders(name, value, exchange)) { HttpHelper.appendHeader(answer.getHeaders(), name, value); } } } // preserve headers from in by copying any non existing headers // to avoid overriding existing headers with old values // We also need to apply the httpProtocolHeaderFilterStrategy to filter the http protocol header MessageHelper.copyHeaders(exchange.getIn(), answer, httpProtocolHeaderFilterStrategy, false); // extract body after headers has been set as we want to ensure content-type from Jetty HttpExchange // has been populated first answer.setBody(extractResponseBody(exchange, httpExchange)); }
@Override public Object toHttpRequest(ClientRequest clientRequest, Message message) { Object body = message.getBody(); // set the content type in the response. String contentType = MessageHelper.getContentType(message); if (contentType != null) { // set content-type clientRequest.getRequestHeaders().put(Headers.CONTENT_TYPE, contentType); LOG.trace("Content-Type: {}", contentType); } TypeConverter tc = message.getExchange().getContext().getTypeConverter(); //copy headers from Message to Request for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) { String key = entry.getKey(); Object value = entry.getValue(); // use an iterator as there can be multiple values. (must not use a delimiter) final Iterator<?> it = ObjectHelper.createIterator(value, null); while (it.hasNext()) { String headerValue = tc.convertTo(String.class, it.next()); if (headerValue != null && headerFilterStrategy != null && !headerFilterStrategy.applyFilterToCamelHeaders(key, headerValue, message.getExchange())) { LOG.trace("HTTP-Header: {}={}", key, headerValue); clientRequest.getRequestHeaders().add(new HttpString(key), headerValue); } } } return body; }
private void sendImageContent(boolean usingGZip) throws Exception { Endpoint endpoint = context.getEndpoint("http://localhost:{{port}}/myapp/myservice"); Exchange exchange = endpoint.createExchange(); if (usingGZip) { exchange.getIn().setHeader(Exchange.CONTENT_ENCODING, "gzip"); } template.send(endpoint, exchange); assertNotNull(exchange.getOut().getBody()); assertEquals("Get a wrong content-type ", MessageHelper.getContentType(exchange.getOut()), "image/jpeg"); }
@Test public void testMixedContentType() throws Exception { Endpoint endpoint = context.getEndpoint("http://localhost:{{port}}/myapp/myservice"); Exchange exchange = endpoint.createExchange(); exchange.getIn().setBody("<order>123</order>"); exchange.getIn().setHeader("Content-Type", "text/xml"); template.send(endpoint, exchange); String body = exchange.getOut().getBody(String.class); assertEquals("FAIL", body); assertEquals("Get a wrong content-type ", MessageHelper.getContentType(exchange.getOut()), "text/plain"); }
@Override public boolean process(Exchange exchange, AsyncCallback callback) { Message in = exchange.getIn(); Message out = exchange.getOut(); MessageHelper.copyHeaders(in, out, true); Long id = in.getBody(Long.class); switch (idGenOperationFor(exchange)) { case ADD_AND_GET: out.setBody(atomicSeq.addAndGet(id)); break; case GET: out.setBody(atomicSeq.get()); break; case GET_AND_ADD: out.setBody(atomicSeq.getAndAdd(id)); break; case GET_AND_INCREMENT: out.setBody(atomicSeq.getAndIncrement()); break; case INCREMENT_AND_GET: out.setBody(atomicSeq.incrementAndGet()); break; default: exchange.setException(new UnsupportedOperationException("Operation not supported by Ignite ID Generator producer.")); return true; } return true; }
@Override public void handle(AsyncResult<Message<Object>> event) { try { // preserve headers MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), false); Throwable e = event.cause(); if (e != null) { exchange.setException(e); } else { exchange.getOut().setBody(event.result().body()); } } finally { callback.done(false); } }
protected void populateResponse(Exchange exchange, HttpMethod method, Message in, HeaderFilterStrategy strategy, int responseCode) throws IOException, ClassNotFoundException { //We just make the out message is not create when extractResponseBody throws exception, Object response = extractResponseBody(method, exchange, getEndpoint().isIgnoreResponseBody()); Message answer = exchange.getOut(); answer.setHeader(Exchange.HTTP_RESPONSE_CODE, responseCode); answer.setHeader(Exchange.HTTP_RESPONSE_TEXT, method.getStatusText()); answer.setBody(response); // propagate HTTP response headers Header[] headers = method.getResponseHeaders(); for (Header header : headers) { String name = header.getName(); String value = header.getValue(); if (name.toLowerCase().equals("content-type")) { name = Exchange.CONTENT_TYPE; exchange.setProperty(Exchange.CHARSET_NAME, IOHelper.getCharsetNameFromContentType(value)); } // use http helper to extract parameter value as it may contain multiple values Object extracted = HttpHelper.extractHttpParameterValue(value); if (strategy != null && !strategy.applyFilterToExternalHeaders(name, extracted, exchange)) { HttpHelper.appendHeader(answer.getHeaders(), name, extracted); } } // endpoint might be configured to copy headers from in to out // to avoid overriding existing headers with old values just // filter the http protocol headers if (getEndpoint().isCopyHeaders()) { MessageHelper.copyHeaders(exchange.getIn(), answer, httpProtocolHeaderFilterStrategy, false); } }
@Override public void onStatusReceived(AhcEndpoint endpoint, Exchange exchange, HttpResponseStatus responseStatus) throws Exception { // preserve headers from in by copying any non existing headers // to avoid overriding existing headers with old values // Just filter the http protocol headers MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), httpProtocolHeaderFilterStrategy, false); exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, responseStatus.getStatusCode()); exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_TEXT, responseStatus.getStatusText()); }
private String getMimeType(Message message) throws NoSuchHeaderException { String mimeType = message.getHeader(PropertyIds.CONTENT_STREAM_MIME_TYPE, String.class); if (mimeType == null) { mimeType = MessageHelper.getContentType(message); } return mimeType; }
public Object format(final TraceInterceptor interceptor, final ProcessorDefinition<?> node, final Exchange exchange) { Message in = exchange.getIn(); Message out = null; if (exchange.hasOut()) { out = exchange.getOut(); } StringBuilder sb = new StringBuilder(); sb.append(extractBreadCrumb(interceptor, node, exchange)); if (showExchangePattern) { sb.append(", Pattern:").append(exchange.getPattern()); } // only show properties if we have any if (showProperties && !exchange.getProperties().isEmpty()) { sb.append(", Properties:").append(exchange.getProperties()); } // only show headers if we have any if (showHeaders && !in.getHeaders().isEmpty()) { sb.append(", Headers:").append(in.getHeaders()); } if (showBodyType) { sb.append(", BodyType:").append(MessageHelper.getBodyTypeName(in)); } if (showBody) { sb.append(", Body:").append(MessageHelper.extractBodyForLogging(in, "")); } if (showOutHeaders && out != null) { sb.append(", OutHeaders:").append(out.getHeaders()); } if (showOutBodyType && out != null) { sb.append(", OutBodyType:").append(MessageHelper.getBodyTypeName(out)); } if (showOutBody && out != null) { sb.append(", OutBody:").append(MessageHelper.extractBodyForLogging(out, "")); } if (showException && exchange.getException() != null) { sb.append(", Exception:").append(exchange.getException()); } // replace ugly <<<, with <<< String s = sb.toString(); s = s.replaceFirst("<<<,", "<<<"); if (maxChars > 0) { if (s.length() > maxChars) { s = s.substring(0, maxChars) + "..."; } return s; } else { return s; } }
/** * Returns the message history (including exchange details or not) */ public static Expression messageHistoryExpression(final boolean detailed) { return new ExpressionAdapter() { private ExchangeFormatter formatter; public Object evaluate(Exchange exchange) { ExchangeFormatter ef = null; if (detailed) { // use the exchange formatter to log exchange details ef = getOrCreateExchangeFormatter(exchange.getContext()); } return MessageHelper.dumpMessageHistoryStacktrace(exchange, ef, false); } private ExchangeFormatter getOrCreateExchangeFormatter(CamelContext camelContext) { if (formatter == null) { Set<ExchangeFormatter> formatters = camelContext.getRegistry().findByType(ExchangeFormatter.class); if (formatters != null && formatters.size() == 1) { formatter = formatters.iterator().next(); } else { // setup exchange formatter to be used for message history dump DefaultExchangeFormatter def = new DefaultExchangeFormatter(); def.setShowExchangeId(true); def.setMultiline(true); def.setShowHeaders(true); def.setStyle(DefaultExchangeFormatter.OutputStyle.Fixed); try { Integer maxChars = CamelContextHelper.parseInteger(camelContext, camelContext.getProperty(Exchange.LOG_DEBUG_BODY_MAX_CHARS)); if (maxChars != null) { def.setMaxChars(maxChars); } } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } formatter = def; } } return formatter; } @Override public String toString() { return "messageHistory(" + detailed + ")"; } }; }
@Override public Collection<KeyValueAnnotation> responseAnnotations() { String id = exchange.getExchangeId(); String mep = exchange.getPattern().name(); KeyValueAnnotation key1 = KeyValueAnnotation.create("camel.server.endpoint.url", url); KeyValueAnnotation key2 = KeyValueAnnotation.create("camel.server.exchange.id", id); KeyValueAnnotation key3 = KeyValueAnnotation.create("camel.server.exchange.pattern", mep); KeyValueAnnotation key4 = null; if (exchange.getException() != null) { String message = exchange.getException().getMessage(); key4 = KeyValueAnnotation.create("camel.server.exchange.failure", message); } else if (eventNotifier.isIncludeMessageBody() || eventNotifier.isIncludeMessageBodyStreams()) { boolean streams = eventNotifier.isIncludeMessageBodyStreams(); StreamCache cache = prepareBodyForLogging(exchange, streams); String body = MessageHelper.extractBodyForLogging(exchange.hasOut() ? exchange.getOut() : exchange.getIn(), "", streams, streams); key4 = KeyValueAnnotation.create("camel.server.exchange.message.response.body", body); if (cache != null) { cache.reset(); } } KeyValueAnnotation key5 = null; // lets capture http response code for http based components String responseCode = exchange.hasOut() ? exchange.getOut().getHeader(Exchange.HTTP_RESPONSE_CODE, String.class) : exchange.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE, String.class); if (responseCode != null) { key5 = KeyValueAnnotation.create("camel.server.exchange.message.response.code", responseCode); } List<KeyValueAnnotation> list = new ArrayList<>(); list.add(key1); list.add(key2); list.add(key3); if (key4 != null) { list.add(key4); } if (key5 != null) { list.add(key5); } return list; }
private void copyHeaders(Exchange exchange) { MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), false); }
@Override public Object toHttpResponse(HttpServerExchange httpExchange, Message message) { boolean failed = message.getExchange().isFailed(); int defaultCode = failed ? 500 : 200; int code = message.getHeader(Exchange.HTTP_RESPONSE_CODE, defaultCode, int.class); httpExchange.setResponseCode(code); TypeConverter tc = message.getExchange().getContext().getTypeConverter(); //copy headers from Message to Response for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) { String key = entry.getKey(); Object value = entry.getValue(); // use an iterator as there can be multiple values. (must not use a delimiter) final Iterator<?> it = ObjectHelper.createIterator(value, null); while (it.hasNext()) { String headerValue = tc.convertTo(String.class, it.next()); if (headerValue != null && headerFilterStrategy != null && !headerFilterStrategy.applyFilterToCamelHeaders(key, headerValue, message.getExchange())) { LOG.trace("HTTP-Header: {}={}", key, headerValue); httpExchange.getResponseHeaders().add(new HttpString(key), headerValue); } } } Exception exception = message.getExchange().getException(); if (exception != null) { httpExchange.getResponseHeaders().put(Headers.CONTENT_TYPE, MimeMappings.DEFAULT_MIME_MAPPINGS.get("txt")); StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); exception.printStackTrace(pw); String exceptionMessage = sw.toString(); ExchangeHelper.setFailureHandled(message.getExchange()); return exceptionMessage; } // set the content type in the response. String contentType = MessageHelper.getContentType(message); if (contentType != null) { // set content-type httpExchange.getResponseHeaders().put(Headers.CONTENT_TYPE, contentType); LOG.trace("Content-Type: {}", contentType); } return message.getBody(); }
/** * Creates a dynamic context for the given exchange */ protected DynamicQueryContext createDynamicContext(Exchange exchange) throws Exception { Configuration config = getConfiguration(); DynamicQueryContext dynamicQueryContext = new DynamicQueryContext(config); Message in = exchange.getIn(); Item item = null; if (ObjectHelper.isNotEmpty(getHeaderName())) { item = in.getHeader(getHeaderName(), Item.class); } else { item = in.getBody(Item.class); } if (item != null) { dynamicQueryContext.setContextItem(item); } else { Object body = null; if (ObjectHelper.isNotEmpty(getHeaderName())) { body = in.getHeader(getHeaderName()); } else { body = in.getBody(); } // the underlying input stream, which we need to close to avoid locking files or other resources InputStream is = null; try { Source source; // only convert to input stream if really needed if (isInputStreamNeeded(exchange)) { if (ObjectHelper.isNotEmpty(getHeaderName())) { is = exchange.getIn().getHeader(getHeaderName(), InputStream.class); } else { is = exchange.getIn().getBody(InputStream.class); } source = getSource(exchange, is); } else { source = getSource(exchange, body); } // special for bean invocation if (source == null) { if (body instanceof BeanInvocation) { // if its a null bean invocation then handle that BeanInvocation bi = exchange.getContext().getTypeConverter().convertTo(BeanInvocation.class, body); if (bi.getArgs() != null && bi.getArgs().length == 1 && bi.getArgs()[0] == null) { // its a null argument from the bean invocation so use null as answer source = null; } } } if (source == null) { // indicate it was not possible to convert to a Source type throw new NoTypeConversionAvailableException(body, Source.class); } DocumentInfo doc = config.buildDocument(source); dynamicQueryContext.setContextItem(doc); } finally { // can deal if is is null IOHelper.close(is); } } configureQuery(dynamicQueryContext, exchange); // call the reset if the in message body is StreamCache MessageHelper.resetStreamCache(exchange.getIn()); return dynamicQueryContext; }