/**
 * Creates the server and wires up its call queue and worker pool.
 *
 * <p>The call queue is backed by a {@link LinkedBlockingQueue} with capacity
 * {@code options.maxQueuedRequests} when that value is positive; otherwise it
 * is backed by a {@link SynchronousQueue}. The worker pool is a
 * {@link ThreadPoolExecutor} sized from {@code options.minWorkerThreads} and
 * {@code options.maxWorkerThreads}, with a keep-alive of
 * {@code options.threadKeepAliveTimeSec} seconds, whose threads are created as
 * daemons using the name format {@code "thrift-worker-%d"}.
 *
 * @param options server arguments; also retained in {@code serverOptions}
 * @param metrics metrics sink handed to the {@code CallQueue}
 */
public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) {
  super(options);
  serverOptions = options;

  // A non-positive limit selects a SynchronousQueue; a positive one bounds
  // the queue at that many entries.
  if (options.maxQueuedRequests <= 0) {
    this.callQueue = new CallQueue(new SynchronousQueue<Call>(), metrics);
  } else {
    this.callQueue = new CallQueue(
        new LinkedBlockingQueue<Call>(options.maxQueuedRequests), metrics);
  }

  ThreadFactoryBuilder workerFactoryBuilder = new ThreadFactoryBuilder();
  workerFactoryBuilder.setNameFormat("thrift-worker-%d");
  workerFactoryBuilder.setDaemon(true);

  executorService = new ThreadPoolExecutor(
      options.minWorkerThreads,
      options.maxWorkerThreads,
      options.threadKeepAliveTimeSec,
      TimeUnit.SECONDS,
      this.callQueue,
      workerFactoryBuilder.build());
}
/**
 * Builds the fixed-size worker pool and starts its core threads up front.
 *
 * <p>The pool uses {@code workerThreads} for both its core and maximum size,
 * a keep-alive of {@code Long.MAX_VALUE} seconds, and a {@code CallQueue}
 * wrapping an unbounded {@link LinkedBlockingQueue}. Threads are created as
 * daemons using the name format {@code "thrift2-worker-%d"}.
 *
 * @param workerThreads core and maximum pool size
 * @param metrics metrics sink handed to the {@code CallQueue}
 * @return the pool, after {@code prestartAllCoreThreads()} has been called
 */
private static ExecutorService createExecutor(
    int workerThreads, ThriftMetrics metrics) {
  CallQueue workQueue =
      new CallQueue(new LinkedBlockingQueue<Call>(), metrics);

  ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder();
  factoryBuilder.setNameFormat("thrift2-worker-%d");
  factoryBuilder.setDaemon(true);

  ThreadPoolExecutor executor = new ThreadPoolExecutor(
      workerThreads,
      workerThreads,
      Long.MAX_VALUE,
      TimeUnit.SECONDS,
      workQueue,
      factoryBuilder.build());
  executor.prestartAllCoreThreads();
  return executor;
}
/**
 * Puts {@code elementsAdded} dummy runnables into a {@code CallQueue}, takes
 * {@code elementsRemoved} of them back out, and then checks the
 * {@code "timeInQueue_num_ops"} metric against {@code elementsRemoved}.
 */
@Test(timeout = 60000)
public void testPutTake() throws Exception {
  ThriftMetrics metrics = createMetrics();
  CallQueue queue = new CallQueue(new LinkedBlockingQueue<Call>(), metrics);

  int putCount = 0;
  while (putCount < elementsAdded) {
    queue.put(createDummyRunnable());
    putCount++;
  }

  int takeCount = 0;
  while (takeCount < elementsRemoved) {
    queue.take();
    takeCount++;
  }

  verifyMetrics(metrics, "timeInQueue_num_ops", elementsRemoved);
}
/**
 * Offers {@code elementsAdded} dummy runnables to a {@code CallQueue}, polls
 * {@code elementsRemoved} times, and then checks the
 * {@code "timeInQueue_num_ops"} metric against {@code elementsRemoved}.
 */
@Test(timeout = 60000)
public void testOfferPoll() throws Exception {
  ThriftMetrics metrics = createMetrics();
  CallQueue queue = new CallQueue(new LinkedBlockingQueue<Call>(), metrics);

  int offerCount = 0;
  while (offerCount < elementsAdded) {
    queue.offer(createDummyRunnable());
    offerCount++;
  }

  int pollCount = 0;
  while (pollCount < elementsRemoved) {
    queue.poll();
    pollCount++;
  }

  verifyMetrics(metrics, "timeInQueue_num_ops", elementsRemoved);
}
/**
 * Builds the fixed-size worker pool.
 *
 * <p>The pool uses {@code workerThreads} for both its core and maximum size,
 * a keep-alive of {@code Long.MAX_VALUE} seconds, and a {@code CallQueue}
 * wrapping an unbounded {@link LinkedBlockingQueue}. Threads are created as
 * daemons using the name format {@code "thrift2-worker-%d"}.
 *
 * @param workerThreads core and maximum pool size
 * @param metrics metrics sink handed to the {@code CallQueue}
 * @return the newly constructed pool
 */
private static ExecutorService createExecutor(
    int workerThreads, ThriftMetrics metrics) {
  CallQueue workQueue =
      new CallQueue(new LinkedBlockingQueue<Call>(), metrics);

  ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder();
  factoryBuilder.setNameFormat("thrift2-worker-%d");
  factoryBuilder.setDaemon(true);

  ThreadPoolExecutor executor = new ThreadPoolExecutor(
      workerThreads,
      workerThreads,
      Long.MAX_VALUE,
      TimeUnit.SECONDS,
      workQueue,
      factoryBuilder.build());
  return executor;
}
/**
 * Exercises the blocking {@code put}/{@code take} pair: adds
 * {@code elementsAdded} dummy runnables, removes {@code elementsRemoved}, and
 * checks the {@code "timeInQueue_num_ops"} metric against
 * {@code elementsRemoved}.
 */
@Test(timeout=60000)
public void testPutTake() throws Exception {
  ThriftMetrics metrics = createMetrics();
  CallQueue queueUnderTest =
      new CallQueue(new LinkedBlockingQueue<Call>(), metrics);

  int inserted = 0;
  while (inserted < elementsAdded) {
    queueUnderTest.put(createDummyRunnable());
    ++inserted;
  }

  int removed = 0;
  while (removed < elementsRemoved) {
    queueUnderTest.take();
    ++removed;
  }

  verifyMetrics(metrics, "timeInQueue_num_ops", elementsRemoved);
}
/**
 * Exercises the {@code offer}/{@code poll} pair: offers
 * {@code elementsAdded} dummy runnables, polls {@code elementsRemoved} times,
 * and checks the {@code "timeInQueue_num_ops"} metric against
 * {@code elementsRemoved}.
 */
@Test(timeout=60000)
public void testOfferPoll() throws Exception {
  ThriftMetrics metrics = createMetrics();
  CallQueue queueUnderTest =
      new CallQueue(new LinkedBlockingQueue<Call>(), metrics);

  int inserted = 0;
  while (inserted < elementsAdded) {
    queueUnderTest.offer(createDummyRunnable());
    ++inserted;
  }

  int removed = 0;
  while (removed < elementsRemoved) {
    queueUnderTest.poll();
    ++removed;
  }

  verifyMetrics(metrics, "timeInQueue_num_ops", elementsRemoved);
}
/**
 * Puts {@code elementsAdded} dummy runnables into a {@code CallQueue}, takes
 * {@code elementsRemoved} of them back out, and then checks the
 * {@code "timeInQueue_num_ops"} metric against {@code elementsRemoved}.
 *
 * <p>The timeout is 60 seconds rather than 3: it exists only to catch a hung
 * {@code put}/{@code take}, and a 3-second budget can be exceeded on a slow or
 * heavily loaded machine even though nothing is wrong.
 */
@Test(timeout = 60000)
public void testPutTake() throws Exception {
  ThriftMetrics metrics = createMetrics();
  CallQueue callQueue = new CallQueue(
      new LinkedBlockingQueue<Call>(), metrics);

  for (int i = 0; i < elementsAdded; ++i) {
    callQueue.put(createDummyRunnable());
  }
  for (int i = 0; i < elementsRemoved; ++i) {
    callQueue.take();
  }

  verifyMetrics(metrics, "timeInQueue_num_ops", elementsRemoved);
}
/**
 * Offers {@code elementsAdded} dummy runnables to a {@code CallQueue}, polls
 * {@code elementsRemoved} times, and then checks the
 * {@code "timeInQueue_num_ops"} metric against {@code elementsRemoved}.
 *
 * <p>The timeout is 60 seconds rather than 3: it exists only to catch a hung
 * test, and a 3-second budget can be exceeded on a slow or heavily loaded
 * machine even though nothing is wrong.
 */
@Test(timeout = 60000)
public void testOfferPoll() throws Exception {
  ThriftMetrics metrics = createMetrics();
  CallQueue callQueue = new CallQueue(
      new LinkedBlockingQueue<Call>(), metrics);

  for (int i = 0; i < elementsAdded; ++i) {
    callQueue.offer(createDummyRunnable());
  }
  for (int i = 0; i < elementsRemoved; ++i) {
    callQueue.poll();
  }

  verifyMetrics(metrics, "timeInQueue_num_ops", elementsRemoved);
}