小编典典

从ThreadPoolTask​​Executor获得可调用或将Runnable强制转换为Callable

java

我正在使用ThreadPoolTask​​Executor执行我的任务,这些任务是Callable接口的实现。我只是想及时检查任务是否仍在池中(监视)。怎么做?我知道可以从ThreadPoolExecutor获得队列,但是如何将Runnable强制转换为Callable?

基本上我有这个可通话的

public interface IFormatter extends Callable<Integer>{
    Long getOrderId();
}

我正在这样执行

ThreadPoolExecutor.submit(new Formatter(order));

最后,我想以某种异步方法遍历ExecutorService的队列,并检查是否具有orderId的线程仍然存在。


阅读 320

收藏
2020-11-30

共1个答案

小编典典

正如在解释这个答案,你可能会通过控制FutureTask包裹的Callable通过手动创建它,并通过排队execute。否则,submit将您包装Callable到一个ExecutorService特定的对象中并将其放入队列中,从而使得无法Callable通过标准API
查询via的属性。

使用自定义 FutureTask

class MyFutureTask extends FutureTask<Integer> {
    final IFormatter theCallable;

    public MyFutureTask(IFormatter callable) {
        super(callable);
        theCallable=callable;
    }
    Long getOrderId() {
        return theCallable.getOrderId();
    }
}

通过其入队threadPoolExecutor.execute(new MyFutureTask(new Formatter(order)));

您可以查询队列中的订单ID:

public static boolean isEnqueued(ThreadPoolExecutor e, Long id) {
    for(Object o: e.getQueue().toArray()) {
        if(o instanceof MyFutureTask && Objects.equals(((MyFutureTask)o).getOrderId(), id))
            return true;
    }
    return false;
}

这适用于任何对象ExecutorService(假设它有一个队列)。如果仅使用一个ThreadPoolExecutor,则可以自定义FutureTask实例的创建(从Java6开始),而不是依靠提交者来创建它:

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
        TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
            workQueue, threadFactory);
    }
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
        TimeUnit unit, BlockingQueue<Runnable> workQueue,
        RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
            workQueue, handler);
    }
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
        TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
        RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
            workQueue, threadFactory, handler);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if(callable instanceof IFormatter)
            return (FutureTask<T>)new MyFutureTask((IFormatter)callable);
        return super.newTaskFor(callable);
    }
}

然后,使用实例MyThreadPoolExecutor代替实例的ThreadPoolExecutor每次提交IFormatter都会自动使用MyFutureTask代替standard进行包装FutureTask。缺点是,这仅适用于此特定ExecutorService方法,而通用方法会针对特殊处理生成未经检查的警告。

2020-11-30