/**
 * Initializes this events channel on top of a multi-producer Disruptor ring buffer
 * consumed by a pool of {@code DisruptorEventsWorkHandler}s.
 *
 * <p>The ring buffer size is read from {@code config.getBlockQueueMaxNumber()}. A size
 * that is not a power of two is rounded down to the nearest power of two and a warning
 * is logged. The number of work handlers is read from
 * {@code config.getEventConsumerNumber()}.
 *
 * @param config channel configuration supplying the name, ring buffer size and consumer count
 * @return always {@code true} once initialization has completed
 * @throws GridException if the configured ring buffer size is zero or negative
 */
@Override
public boolean initialize(EventsChannelConfig config) {
    super.initialize(config);
    log.info("Initialize disruptor events channel " + config.getName() + " with " + config);

    int ringBufferSize = config.getBlockQueueMaxNumber();
    int threadSize = config.getEventConsumerNumber();

    // Validate before rounding: the previous log/pow based rounding turned a negative
    // size into 1 (log of a negative is NaN, which casts to 0), so it was never rejected.
    if (ringBufferSize <= 0) {
        throw new GridException("Invalid disruptor ringbuffur size:" + ringBufferSize);
    }

    int bufferSize = ringBufferSize;
    if (Integer.bitCount(bufferSize) != 1) {
        // Round down to the nearest power of two using exact integer arithmetic.
        bufferSize = Integer.highestOneBit(ringBufferSize);
        log.warn("Change disruptor events channel " + config.getName() + " buffer size from "
                + ringBufferSize + " to " + bufferSize);
    }

    EventFactory<GridEvent> eventFactory = new DisruptorEventFactory();
    threadPool = Executors.newFixedThreadPool(threadSize);
    ringBuffer = RingBuffer.createMultiProducer(eventFactory, bufferSize, new BlockingWaitStrategy());
    SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

    @SuppressWarnings("unchecked")
    WorkHandler<GridEvent>[] workHandlers = new WorkHandler[threadSize];
    for (int i = 0; i < threadSize; i++) {
        workHandlers[i] = new DisruptorEventsWorkHandler(getName());
    }

    workerPool = new WorkerPool<GridEvent>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(),
            workHandlers);

    // A WorkerPool built on a pre-configured ring buffer needs its worker sequences
    // registered as gating sequences before start; otherwise producers can wrap around
    // and overwrite slots that have not been consumed yet.
    ringBuffer.addGatingSequences(workerPool.getWorkerSequences());

    // Each work handler occupies one thread for its whole lifetime, so the executor must
    // provide one thread per handler. A fixed size of 10 left surplus handlers queued forever.
    // NOTE(review): this executor is not stored in a field here; confirm whether the
    // threadPool field was meant to run the workers so it can be shut down with the channel.
    ExecutorService executor = Executors.newFixedThreadPool(threadSize);
    workerPool.start(executor);
    return true;
}
/**
 * Creates an executor backed by a multi-producer Disruptor ring buffer whose entries
 * are consumed by {@code threadCount} work processors sharing one work sequence.
 *
 * <p>The processors are started immediately on a fixed pool of daemon threads, one
 * thread per processor.
 *
 * @param threadCount  number of work processors (and threads) to start
 * @param bufferSize   ring buffer capacity, passed straight to
 *                     {@code RingBuffer.createMultiProducer}
 * @param waitStrategy strategy the processors use while waiting for new entries
 */
public DisruptorExecutor(int threadCount, int bufferSize, WaitStrategy waitStrategy) {
    ringBuffer = RingBuffer.createMultiProducer(new EventFactory<RContainer>() {
        @Override
        public RContainer newInstance() {
            return new RContainer();
        }
    }, bufferSize, waitStrategy);

    SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    // -1 is the ring buffer's initial cursor value, i.e. "nothing claimed yet".
    Sequence workSequence = new Sequence(-1);

    workProcessors = new WorkProcessor[threadCount];
    // One slot per processor plus one for the shared work sequence.
    Sequence[] gatingSequences = new Sequence[threadCount + 1];
    for (int i = 0; i < threadCount; i++) {
        workProcessors[i] = new WorkProcessor<RContainer>(ringBuffer, sequenceBarrier, handler,
                new IgnoreExceptionHandler(), workSequence);
        gatingSequences[i] = workProcessors[i].getSequence();
    }
    gatingSequences[threadCount] = workSequence;

    // Register the consumers with the ring buffer before any processor runs. Without
    // gating sequences, producers cannot see consumer progress and may wrap around and
    // overwrite entries that have not been processed yet.
    ringBuffer.addGatingSequences(gatingSequences);

    workExec = Executors.newFixedThreadPool(workProcessors.length, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            // Daemon threads so the processors do not keep the JVM alive.
            t.setDaemon(true);
            return t;
        }
    });
    for (WorkProcessor<?> p : workProcessors) {
        workExec.execute(p);
    }
}
/**
 * Builds the handler together with one single-producer Disruptor per indexing request
 * processor, and starts every Disruptor before returning.
 *
 * <p>The processor count is the configured number passed through
 * {@code Util.ceilingNextPowerOfTwo}; {@code mask} is set to that count minus one.
 * Each Disruptor's queue size is derived from the batch indexing queue size, or, when
 * that setting is {@code -1}, from the incoming message queue size divided by the
 * processor count.
 *
 * @param configuration source of the sizing, batching and wait strategy settings
 * @param converter converter stored on this handler
 * @param messageFactory event factory handed to every Disruptor
 * @param relatedItemIndexingEventHandlerFactory supplies one event handler per Disruptor
 */
public RoundRobinRelatedItemIndexingMessageEventHandler(final Configuration configuration,
                                                        RelatedItemIndexingMessageConverter converter,
                                                        RelatedItemReferenceMessageFactory messageFactory,
                                                        RelatedItemReferenceEventHandlerFactory relatedItemIndexingEventHandlerFactory) {
    this.configuration = configuration;
    this.converter = converter;

    final int processorCount =
            Util.ceilingNextPowerOfTwo(configuration.getNumberOfIndexingRequestProcessors());
    disruptors = new Disruptor[processorCount];
    executors = new ExecutorService[processorCount];
    handlers = new RelatedItemReferenceEventHandler[processorCount];
    mask = processorCount - 1;

    // -1 means "no explicit batch queue size": share the incoming queue across processors.
    final int sizeOfQueue = (configuration.getSizeOfBatchIndexingRequestQueue() == -1)
            ? Util.ceilingNextPowerOfTwo(configuration.getSizeOfIncomingMessageQueue() / processorCount)
            : Util.ceilingNextPowerOfTwo(configuration.getSizeOfBatchIndexingRequestQueue());

    // Slots are filled from the highest index down to zero.
    for (int slot = processorCount - 1; slot >= 0; slot--) {
        final ExecutorService slotExecutor = getExecutorService();
        executors[slot] = slotExecutor;

        final Disruptor<RelatedItemReference> slotDisruptor = new Disruptor<RelatedItemReference>(
                messageFactory,
                sizeOfQueue,
                slotExecutor,
                ProducerType.SINGLE,
                configuration.getWaitStrategyFactory().createWaitStrategy());
        disruptors[slot] = slotDisruptor;

        handlers[slot] = relatedItemIndexingEventHandlerFactory.getHandler();
        slotDisruptor.handleExceptionsWith(new IgnoreExceptionHandler());
        slotDisruptor.handleEventsWith(handlers[slot]);
        slotDisruptor.start();
    }

    nextDisruptor[COUNTER_POS] = 1;
    batchSize = configuration.getIndexBatchSize();
    // Room for one full batch plus the maximum related items of a single item.
    final int batchCapacity = batchSize + configuration.getMaxNumberOfRelatedItemsPerItem();
    batchMessages = new ArrayList<RelatedItem>(batchCapacity);
}