我正在使用ThreadPoolTaskExecutor执行我的任务,这些任务是Callable接口的实现。我只是想及时检查任务是否仍在池中(监视)。怎么做?我知道可以从ThreadPoolExecutor获得队列,但是如何将Runnable强制转换为Callable?
基本上我有这个可通话的
public interface IFormatter extends Callable<Integer>{ Long getOrderId(); }
我正在这样执行
ThreadPoolExecutor.submit(new Formatter(order));
最后,我想以某种异步方法遍历ExecutorService的队列,并检查是否具有orderId的线程仍然存在。
正如在解释这个答案,你可能会通过控制FutureTask包裹的Callable通过手动创建它,并通过排队execute。否则,submit将您包装Callable到一个ExecutorService特定的对象中并将其放入队列中,从而使得无法Callable通过标准API 查询via的属性。
FutureTask
Callable
execute
submit
ExecutorService
使用自定义 FutureTask
class MyFutureTask extends FutureTask<Integer> { final IFormatter theCallable; public MyFutureTask(IFormatter callable) { super(callable); theCallable=callable; } Long getOrderId() { return theCallable.getOrderId(); } }
通过其入队threadPoolExecutor.execute(new MyFutureTask(new Formatter(order)));,
threadPoolExecutor.execute(new MyFutureTask(new Formatter(order)));
您可以查询队列中的订单ID:
public static boolean isEnqueued(ThreadPoolExecutor e, Long id) { for(Object o: e.getQueue().toArray()) { if(o instanceof MyFutureTask && Objects.equals(((MyFutureTask)o).getOrderId(), id)) return true; } return false; }
这适用于任何对象ExecutorService(假设它有一个队列)。如果仅使用一个ThreadPoolExecutor,则可以自定义FutureTask实例的创建(从Java6开始),而不是依靠提交者来创建它:
ThreadPoolExecutor
public class MyThreadPoolExecutor extends ThreadPoolExecutor { public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } @Override protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { if(callable instanceof IFormatter) return (FutureTask<T>)new MyFutureTask((IFormatter)callable); return super.newTaskFor(callable); } }
然后,使用实例MyThreadPoolExecutor代替实例的ThreadPoolExecutor每次提交IFormatter都会自动使用MyFutureTask代替standard进行包装FutureTask。缺点是,这仅适用于此特定ExecutorService方法,而通用方法会针对特殊处理生成未经检查的警告。
MyThreadPoolExecutor
IFormatter
MyFutureTask