Java 类org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy 实例源码

项目:Camel    文件:MulticastProcessor.java   
/**
 * Handles an aggregation timeout.
 * <p>
 * If the effective aggregation strategy (after unwrapping a
 * {@code DelegateAggregationStrategy}) is a {@code TimeoutAwareAggregationStrategy},
 * its {@code timeout} callback is invoked; otherwise a WARN is logged instead.
 * Afterwards the timed-out flag is set, an ordered completion service (if that
 * is what is in use) is told to time out its task, and the aggregated counter
 * is incremented.
 */
@Override
public void run() {
    // resolve the strategy, looking through a delegating wrapper if there is one
    final AggregationStrategy configured = getAggregationStrategy(null);
    final AggregationStrategy effective = configured instanceof DelegateAggregationStrategy
            ? ((DelegateAggregationStrategy) configured).getDelegate()
            : configured;

    if (!(effective instanceof TimeoutAwareAggregationStrategy)) {
        // the strategy cannot be notified, so WARN that this task will not be aggregated
        LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated.intValue());
    } else {
        // the result may still be unset if every task timed out; fall back to the original exchange
        final Exchange current = result.get();
        final Exchange target = current != null ? current : original;

        // let the strategy know that we timed out
        final TimeoutAwareAggregationStrategy timeoutAware = (TimeoutAwareAggregationStrategy) effective;
        timeoutAware.timeout(target, aggregated.intValue(), total.intValue(), timeout);
    }

    LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated.intValue());
    timedOut.set(true);

    // flag the current index as timed out so the next loop can still
    // pick up any tasks that have already completed
    if (completion instanceof SubmitOrderedCompletionService) {
        final SubmitOrderedCompletionService<?> ordered = (SubmitOrderedCompletionService<?>) completion;
        ordered.timeoutTask();
    }

    // account for the timed out task by advancing the counter
    aggregated.incrementAndGet();
}