@Override public void run() { AggregationStrategy strategy = getAggregationStrategy(null); if (strategy instanceof DelegateAggregationStrategy) { strategy = ((DelegateAggregationStrategy) strategy).getDelegate(); } if (strategy instanceof TimeoutAwareAggregationStrategy) { // notify the strategy we timed out Exchange oldExchange = result.get(); if (oldExchange == null) { // if they all timed out the result may not have been set yet, so use the original exchange oldExchange = original; } ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, aggregated.intValue(), total.intValue(), timeout); } else { // log a WARN we timed out since it will not be aggregated and the Exchange will be lost LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated.intValue()); } LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated.intValue()); timedOut.set(true); // mark that index as timed out, which allows us to try to retrieve // any already completed tasks in the next loop if (completion instanceof SubmitOrderedCompletionService) { ((SubmitOrderedCompletionService<?>) completion).timeoutTask(); } // we timed out so increment the counter aggregated.incrementAndGet(); }