Java 类com.lmax.disruptor.WorkerPool 实例源码

项目:game-executor    文件:DisruptorExecutorService.java   
@Override
    public void startup() {
        EventBus eventBus = disruptorDispatchThread.getEventBus();
        executorService = new NonOrderedQueuePoolExecutor(poolName, excutorSize);
        cycleEventHandler = new CycleEventHandler[excutorSize];
        for(int i = 0; i < excutorSize; i++){
            cycleEventHandler[i] = new CycleEventHandler(eventBus);
        }

        RingBuffer ringBuffer = disruptorDispatchThread.getRingBuffer();
        workerPool = new WorkerPool(ringBuffer, ringBuffer.newBarrier(), new FatalExceptionHandler(), cycleEventHandler);
        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());

        workerPool.start(executorService);

//        BatchEventProcessor<CycleEvent>[] batchEventProcessors = new BatchEventProcessor[excutorSize];
//        for(int i = 0; i < excutorSize; i++){
//            batchEventProcessors[i] = new BatchEventProcessor<>(ringBuffer, sequenceBarrier, cycleEventHandler[i]);
//            ringBuffer.addGatingSequences(batchEventProcessors[i].getSequence());
////            executorService.submit(batchEventProcessors[i]);
//        }
    }
项目:disruptor-code-analysis    文件:Disruptor.java   
EventHandlerGroup<T> createWorkerPool(
        final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers) {
    final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
    final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
    consumerRepository.add(workerPool, sequenceBarrier);
    return new EventHandlerGroup<T>(this, consumerRepository, workerPool.getWorkerSequences());
}
项目:darks-grid    文件:DisruptorEventsChannel.java   
@Override
public boolean initialize(EventsChannelConfig config)
{
       super.initialize(config);
    log.info("Initialize disruptor events channel " + config.getName() + " with " + config);
    EventFactory<GridEvent> eventFactory = new DisruptorEventFactory();
       int ringBufferSize = config.getBlockQueueMaxNumber(); 
       int threadSize = config.getEventConsumerNumber();
       int bufferSize = ringBufferSize;
       if (Integer.bitCount(bufferSize) != 1)
       {
           bufferSize = (int) Math.pow(2, (int) (Math.log(ringBufferSize) / Math.log(2)));
           log.warn("Change disruptor events channel " + config.getName() + 
                   " buffer size from " + ringBufferSize + " to " + bufferSize);
       }
       if (bufferSize <= 0)
           throw new GridException("Invalid disruptor ringbuffur size:" + ringBufferSize);
       threadPool = Executors.newFixedThreadPool(threadSize);
       ringBuffer = RingBuffer.createMultiProducer(eventFactory, bufferSize, new BlockingWaitStrategy());  
       SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
       ExecutorService executor = Executors.newFixedThreadPool(10);  
       @SuppressWarnings("unchecked")
       WorkHandler<GridEvent>[] workHandlers = new WorkHandler[threadSize];  
       for (int i = 0; i < threadSize; i++) {  
           WorkHandler<GridEvent> workHandler = new DisruptorEventsWorkHandler(getName());  
           workHandlers[i] = workHandler;  
       }  

       workerPool = new WorkerPool<GridEvent>(ringBuffer, sequenceBarrier, 
               new IgnoreExceptionHandler(), workHandlers);  
       workerPool.start(executor);  
    return true;
}
项目:devicehive-java-server    文件:KafkaRpcServer.java   
public KafkaRpcServer(WorkerPool<ServerEvent> workerPool, RequestConsumer requestConsumer, ServerEventHandler eventHandler,
                      int workerThreads) {
    this.workerPool = workerPool;
    this.requestConsumer = requestConsumer;
    this.eventHandler = eventHandler;
    this.workerThreads = workerThreads;
}
项目:devicehive-java-server    文件:ServerBuilder.java   
public RpcServer build() {
    final int workerThreads = 3;
    Producer<String, Response> responseProducer = new KafkaProducer<>(producerProps, new StringSerializer(), producerValueSerializer);
    final ServerEventHandler[] workHandlers = new ServerEventHandler[workerThreads];
    IntStream.range(0, workerThreads).forEach(
            nbr -> workHandlers[nbr] = new ServerEventHandler(requestHandler, responseProducer)
    );
    final WorkerPool<ServerEvent> workerPool = new WorkerPool<>(ServerEvent::new, new FatalExceptionHandler(), workHandlers);

    RequestConsumer requestConsumer = new RequestConsumer(topic, consumerProps, consumerThreads, consumerValueDeserializer);
    return new KafkaRpcServer(workerPool, requestConsumer, new ServerEventHandler(requestHandler, responseProducer), workerThreads);
}
项目:game-executor    文件:DisruptorExecutorService.java   
public WorkerPool getWorkerPool() {
    return workerPool;
}
项目:game-executor    文件:DisruptorExecutorService.java   
public void setWorkerPool(WorkerPool workerPool) {
    this.workerPool = workerPool;
}
项目:annotated-src    文件:Disruptor.java   
EventHandlerGroup<T> createWorkerPool(final Sequence[] barrierSequences, final WorkHandler<T>[] workHandlers) {
    final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
    final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
    consumerRepository.add(workerPool, sequenceBarrier);
    return new EventHandlerGroup<T>(this, consumerRepository, workerPool.getWorkerSequences());
}
项目:Ehcache_LMAXBulkLoader    文件:LaunchLoader.java   
private static void bulkLoad(
        final Cache cache, long entriesToLoad,double[] bbox)
{
    final AtomicLong atomicLong = new AtomicLong();
    final ValueHandler[] valueHandlers = new ValueHandler[]{
            new ValueHandler(cache, atomicLong),
            new ValueHandler(cache, atomicLong),
            new ValueHandler(cache, atomicLong),
            new ValueHandler(cache, atomicLong)
    };

    final ExecutorService executorService = Executors.newCachedThreadPool();
    try
    {
        final RingBuffer<ValueEvent> ringBuffer =
                RingBuffer.createSingleProducer(ValueEvent.EVENT_FACTORY,
                        4096,
                        new YieldingWaitStrategy());

        final WorkerPool<ValueEvent> workerPool =
                new WorkerPool<ValueEvent>(ringBuffer,
                        ringBuffer.newBarrier(),
                        new FatalExceptionHandler(),
                        valueHandlers);

        workerPool.start(executorService);
        try
        {
            publishLoadEvents(ringBuffer,entriesToLoad,bbox);
        }
        finally
        {
            workerPool.drainAndHalt();
        }
    }
    finally
    {
        executorService.shutdown();
    }
    System.out.format("Put %d elements.", atomicLong.get());
}
项目:jetstream    文件:RequestQueueProcessor.java   
/**
 * @param maxQueueSz
 * @param numThreads
 * @param name
 */
public RequestQueueProcessor(int maxQueueSz, int numThreads, String name) {

    m_name = name;
    m_maxQueueSz = maxQueueSz;

    m_ringBuffer = RingBuffer.createMultiProducer(m_eventFactory,
            normalizeBufferSize(m_maxQueueSz), new BlockingWaitStrategy());
    m_barrier = m_ringBuffer.newBarrier();

    m_numThreads = numThreads;

    QueueProcessorWorkHandler[] handlers = new QueueProcessorWorkHandler[m_numThreads];

    for (int i = 0; i < m_numThreads; i++) {
        handlers[i] = new QueueProcessorWorkHandler();
    }

    m_worker = new WorkerPool(m_ringBuffer, m_barrier,
            new QueueProcessorExceptionHandler(m_dropCounter, name),
            handlers);


    m_ringBuffer.addGatingSequences(m_worker.getWorkerSequences());

    m_executor = Executors.newFixedThreadPool(m_numThreads,
            new NameableThreadFactory(name));
    m_ringBuffer = m_worker.start(m_executor);

}