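I have a thread pool that takes Callable worker threads and a RejectionHandler. I need to get hold of the rejected Callable task inside the RejectionHandler, but I am unable to get it.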
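In the example below, I need the uniqueId of the Callable task for which the RejectionHandler is executed. In the RejectionHandler, the Runnable turns out to be a FutureTask, whereas I expected to be able to cast it to the Callable worker thread.
Please help me get the CallableWorkerThread instance in the RejectionHandler.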
    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class RejectionDemo {

        RejectionDemo() {
            Random random = new Random();
            // One thread plus a queue of two: most of the ten tasks get rejected.
            ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(2), new RejectionHandlerImpl());
            CallableWorkerThread workers[] = new CallableWorkerThread[10];
            for (int i = 0; i < workers.length; i++) {
                workers[i] = new CallableWorkerThread(random.nextInt(100));
                FutureTask<Integer> task = new FutureTask<Integer>(workers[i]);
                executor.submit(task);
            }
        }

        public static void main(String args[]) {
            RejectionDemo demo = new RejectionDemo();
        }

        public class CallableWorkerThread implements Callable<Integer> {
            private int uniqueId;

            CallableWorkerThread(int uniqueId) {
                this.uniqueId = uniqueId;
            }

            public Integer call() {
                System.out.println("Unique id=" + uniqueId);
                return uniqueId;
            }

            public String toString() {
                return "" + uniqueId;
            }
        }

        class RejectionHandlerImpl implements RejectedExecutionHandler {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                try {
                    // Prints the rejected task; this is where the uniqueId is wanted.
                    System.out.println(r);
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        }
    }
Output:

    java.util.concurrent.FutureTask@70036428
    Unique id=68
    java.util.concurrent.FutureTask@6ea4b78b
    java.util.concurrent.FutureTask@e3f6d
    java.util.concurrent.FutureTask@1ce84763
    java.util.concurrent.FutureTask@55a6c368
    java.util.concurrent.FutureTask@4e77b794
    java.util.concurrent.FutureTask@15b57dcb
    Unique id=55
    Unique id=83
I am expecting a CallableWorkerThread instead of a FutureTask. Help me get the CallableWorkerThread instance.
In your code

    workers[i] = new CallableWorkerThread(random.nextInt(100));
    FutureTask<Integer> task = new FutureTask<Integer>(workers[i]);
    executor.submit(task);

you create a FutureTask that wraps the CallableWorkerThread instance, but then you pass it to submit, which accepts an arbitrary Runnable and returns a FutureTask wrapping that Runnable.
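To see that overload in action, here is a minimal sketch, assuming a throwaway single-thread executor; the class name SubmitOverloadDemo, the helper describe and the constant 42 are made up for illustration. It passes a hand-made FutureTask to submit and compares what comes back with what went in.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

class SubmitOverloadDemo {

    /** Submits a hand-made FutureTask and reports what submit() handed back. */
    static String describe() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            FutureTask<Integer> inner = new FutureTask<>(() -> 42);
            // FutureTask is a Runnable, not a Callable, so this resolves to submit(Runnable).
            Future<?> outer = executor.submit(inner);
            Object outerResult = outer.get();   // waits until the wrapper has run the inner task
            Integer innerResult = inner.get();  // the Callable's value is stored in the inner task
            return "same=" + (outer == inner) + " outer=" + outerResult + " inner=" + innerResult;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe()); // same=false outer=null inner=42
    }
}
```

The Future returned by submit is a second object that completes with null, while the value computed by the Callable stays in the inner FutureTask.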
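In other words, you are wrapping your FutureTask in another FutureTask. There are two ways to solve this. Either use

    workers[i] = new CallableWorkerThread(random.nextInt(100));
    executor.submit(workers[i]);

to let the ExecutorService wrap your Callable in a FutureTask, or use

    workers[i] = new CallableWorkerThread(random.nextInt(100));
    executor.execute(new FutureTask<Integer>(workers[i]));

to wrap the Callable manually and enqueue it as a Runnable without further wrapping (note the use of execute rather than submit).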
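The difference between submit and execute can be observed from inside a rejection handler. The following is only a sketch: the class RejectedInstanceDemo and its helper rejectedRunnable are hypothetical names, and it forces the rejection by shutting the pool down first (a pool that has been shut down rejects every new task), which is an assumption made to keep the example deterministic rather than the saturation scenario from the question.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class RejectedInstanceDemo {

    /** Forces a rejection and returns the Runnable that the handler received. */
    static Runnable rejectedRunnable(FutureTask<Integer> task, boolean useSubmit) {
        AtomicReference<Runnable> seen = new AtomicReference<>();
        // The handler runs in the submitting thread and just records its argument.
        RejectedExecutionHandler handler = (r, pool) -> seen.set(r);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1), handler);
        executor.shutdown(); // from here on every new task is rejected
        if (useSubmit) {
            executor.submit(task);  // wrapped once more before it reaches the handler
        } else {
            executor.execute(task); // handed to the handler as-is
        }
        return seen.get();
    }

    public static void main(String[] args) {
        FutureTask<Integer> task = new FutureTask<>(() -> 42);
        System.out.println(rejectedRunnable(task, true) == task);  // false
        System.out.println(rejectedRunnable(task, false) == task); // true
    }
}
```

With execute, the handler receives the very FutureTask instance that was created by the caller, which is what makes it possible to attach extra information to it.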
Since you want to be able to retrieve the original Callable, the second option is the one for you, as it gives you full control over the FutureTask instance:
    static class MyFutureTask<T> extends FutureTask<T> {
        // Keeps a reference to the wrapped Callable so it can be retrieved later.
        final Callable<T> theCallable;

        public MyFutureTask(Callable<T> callable) {
            super(callable);
            theCallable = callable;
        }
    }
Submitting code:
    for (int i = 0; i < workers.length; i++) {
        workers[i] = new CallableWorkerThread(random.nextInt(100));
        executor.execute(new MyFutureTask<Integer>(workers[i]));
    }
RejectedExecutionHandler:
    class RejectionHandlerImpl implements RejectedExecutionHandler {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (r instanceof MyFutureTask) {
                // Our own wrapper: unwrap it to get back the original Callable.
                MyFutureTask<?> myFutureTask = (MyFutureTask<?>) r;
                Callable<?> c = myFutureTask.theCallable;
                System.out.println(c);
            } else {
                System.out.println(r);
            }
        }
    }
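Put together, the pieces above might look like the following self-contained sketch. Several things here are assumptions added for illustration and are not part of the original code: the class name RejectedCallableDemo, the Worker class with its CountDownLatch gate (used to keep the single pool thread busy so that the rejections are reproducible), sequential ids instead of random ones, and the helper rejectedIds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectedCallableDemo {

    /** FutureTask that remembers the Callable it wraps. */
    static class MyFutureTask<T> extends FutureTask<T> {
        final Callable<T> theCallable;

        MyFutureTask(Callable<T> callable) {
            super(callable);
            theCallable = callable;
        }
    }

    /** Hypothetical worker: waits on a gate so the pool stays saturated while tasks are queued. */
    static class Worker implements Callable<Integer> {
        final int uniqueId;
        final CountDownLatch gate;

        Worker(int uniqueId, CountDownLatch gate) {
            this.uniqueId = uniqueId;
            this.gate = gate;
        }

        @Override
        public Integer call() throws InterruptedException {
            gate.await();
            return uniqueId;
        }
    }

    /** Sends ids 0..count-1 to a pool of one thread plus a queue of two; returns the rejected ids. */
    static List<Integer> rejectedIds(int count) {
        List<Integer> rejected = new ArrayList<>();
        // The handler runs in the submitting thread, so a plain ArrayList is enough here.
        RejectedExecutionHandler handler = (r, pool) -> {
            if (r instanceof MyFutureTask) {
                Callable<?> c = ((MyFutureTask<?>) r).theCallable;
                rejected.add(((Worker) c).uniqueId);
            }
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), handler);
        CountDownLatch gate = new CountDownLatch(1);
        for (int i = 0; i < count; i++) {
            // execute, not submit, so the handler sees our MyFutureTask unchanged.
            executor.execute(new MyFutureTask<Integer>(new Worker(i, gate)));
        }
        gate.countDown();    // let the accepted tasks finish
        executor.shutdown(); // the worker thread drains the queue and then exits
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(rejectedIds(5)); // [3, 4]
    }
}
```

Task 0 occupies the only thread, tasks 1 and 2 fill the queue, and every later task reaches the handler, which can now read the uniqueId through theCallable.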