Java 类com.lmax.disruptor.IgnoreExceptionHandler 实例源码

项目:darks-grid    文件:DisruptorEventsChannel.java   
@Override
public boolean initialize(EventsChannelConfig config)
{
       super.initialize(config);
    log.info("Initialize disruptor events channel " + config.getName() + " with " + config);
    EventFactory<GridEvent> eventFactory = new DisruptorEventFactory();
       int ringBufferSize = config.getBlockQueueMaxNumber(); 
       int threadSize = config.getEventConsumerNumber();
       int bufferSize = ringBufferSize;
       if (Integer.bitCount(bufferSize) != 1)
       {
           bufferSize = (int) Math.pow(2, (int) (Math.log(ringBufferSize) / Math.log(2)));
           log.warn("Change disruptor events channel " + config.getName() + 
                   " buffer size from " + ringBufferSize + " to " + bufferSize);
       }
       if (bufferSize <= 0)
           throw new GridException("Invalid disruptor ringbuffur size:" + ringBufferSize);
       threadPool = Executors.newFixedThreadPool(threadSize);
       ringBuffer = RingBuffer.createMultiProducer(eventFactory, bufferSize, new BlockingWaitStrategy());  
       SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
       ExecutorService executor = Executors.newFixedThreadPool(10);  
       @SuppressWarnings("unchecked")
       WorkHandler<GridEvent>[] workHandlers = new WorkHandler[threadSize];  
       for (int i = 0; i < threadSize; i++) {  
           WorkHandler<GridEvent> workHandler = new DisruptorEventsWorkHandler(getName());  
           workHandlers[i] = workHandler;  
       }  

       workerPool = new WorkerPool<GridEvent>(ringBuffer, sequenceBarrier, 
               new IgnoreExceptionHandler(), workHandlers);  
       workerPool.start(executor);  
    return true;
}
项目:injector    文件:DisruptorExecutor.java   
public DisruptorExecutor(int threadCount, int bufferSize, WaitStrategy waitStrategy)
{
    ringBuffer = RingBuffer.createMultiProducer(new EventFactory<RContainer>()
    {

        @Override
        public RContainer newInstance()
        {
            return new RContainer();
        }
    }, bufferSize, waitStrategy);
    SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    Sequence workSequence = new Sequence(-1);
    workProcessors = new WorkProcessor[threadCount];
    for (int i = 0 ; i < threadCount ; i++)
    {
        workProcessors[i] = new WorkProcessor<RContainer>(ringBuffer, sequenceBarrier,
            handler, new IgnoreExceptionHandler(), workSequence);
    }
    workExec = Executors.newFixedThreadPool(workProcessors.length, new ThreadFactory()
    {
        public Thread newThread(Runnable r)
        {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    });
    for (WorkProcessor p : workProcessors)
        workExec.execute(p);
}
项目:related    文件:RoundRobinRelatedItemIndexingMessageEventHandler.java   
public RoundRobinRelatedItemIndexingMessageEventHandler(final Configuration configuration,
                                                        RelatedItemIndexingMessageConverter converter,
                                                        RelatedItemReferenceMessageFactory messageFactory,
                                                        RelatedItemReferenceEventHandlerFactory relatedItemIndexingEventHandlerFactory
) {
    this.configuration = configuration;
    this.converter = converter;
    int numberOfIndexingRequestProcessors = Util.ceilingNextPowerOfTwo(configuration.getNumberOfIndexingRequestProcessors());
    disruptors = new Disruptor[numberOfIndexingRequestProcessors];
    executors = new ExecutorService[numberOfIndexingRequestProcessors];
    handlers = new RelatedItemReferenceEventHandler[numberOfIndexingRequestProcessors];

    mask = numberOfIndexingRequestProcessors-1;
    final int sizeOfQueue;
    if(configuration.getSizeOfBatchIndexingRequestQueue()==-1) {
        sizeOfQueue = Util.ceilingNextPowerOfTwo(configuration.getSizeOfIncomingMessageQueue()/numberOfIndexingRequestProcessors);
    } else {
        sizeOfQueue = Util.ceilingNextPowerOfTwo(configuration.getSizeOfBatchIndexingRequestQueue());
    }
    int i = numberOfIndexingRequestProcessors;
    while(i--!=0) {
        ExecutorService executorService = getExecutorService();
        executors[i]  = executorService;
        Disruptor<RelatedItemReference> disruptor = new Disruptor<RelatedItemReference>(
                messageFactory,
                sizeOfQueue, executorService,
                ProducerType.SINGLE, configuration.getWaitStrategyFactory().createWaitStrategy());

        disruptors[i]  = disruptor;
        handlers[i] = relatedItemIndexingEventHandlerFactory.getHandler();
        disruptor.handleExceptionsWith(new IgnoreExceptionHandler());
        disruptor.handleEventsWith(handlers[i]);

        disruptor.start();

    }
    nextDisruptor[COUNTER_POS] = 1;
    batchSize = configuration.getIndexBatchSize();

    batchMessages= new ArrayList<RelatedItem>(batchSize + configuration.getMaxNumberOfRelatedItemsPerItem());
}