@Override public void startup() { EventBus eventBus = disruptorDispatchThread.getEventBus(); executorService = new NonOrderedQueuePoolExecutor(poolName, excutorSize); cycleEventHandler = new CycleEventHandler[excutorSize]; for(int i = 0; i < excutorSize; i++){ cycleEventHandler[i] = new CycleEventHandler(eventBus); } RingBuffer ringBuffer = disruptorDispatchThread.getRingBuffer(); workerPool = new WorkerPool(ringBuffer, ringBuffer.newBarrier(), new FatalExceptionHandler(), cycleEventHandler); ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); workerPool.start(executorService); // BatchEventProcessor<CycleEvent>[] batchEventProcessors = new BatchEventProcessor[excutorSize]; // for(int i = 0; i < excutorSize; i++){ // batchEventProcessors[i] = new BatchEventProcessor<>(ringBuffer, sequenceBarrier, cycleEventHandler[i]); // ringBuffer.addGatingSequences(batchEventProcessors[i].getSequence()); //// executorService.submit(batchEventProcessors[i]); // } }
EventHandlerGroup<T> createWorkerPool( final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers) { final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences); final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers); consumerRepository.add(workerPool, sequenceBarrier); return new EventHandlerGroup<T>(this, consumerRepository, workerPool.getWorkerSequences()); }
@Override public boolean initialize(EventsChannelConfig config) { super.initialize(config); log.info("Initialize disruptor events channel " + config.getName() + " with " + config); EventFactory<GridEvent> eventFactory = new DisruptorEventFactory(); int ringBufferSize = config.getBlockQueueMaxNumber(); int threadSize = config.getEventConsumerNumber(); int bufferSize = ringBufferSize; if (Integer.bitCount(bufferSize) != 1) { bufferSize = (int) Math.pow(2, (int) (Math.log(ringBufferSize) / Math.log(2))); log.warn("Change disruptor events channel " + config.getName() + " buffer size from " + ringBufferSize + " to " + bufferSize); } if (bufferSize <= 0) throw new GridException("Invalid disruptor ringbuffur size:" + ringBufferSize); threadPool = Executors.newFixedThreadPool(threadSize); ringBuffer = RingBuffer.createMultiProducer(eventFactory, bufferSize, new BlockingWaitStrategy()); SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); ExecutorService executor = Executors.newFixedThreadPool(10); @SuppressWarnings("unchecked") WorkHandler<GridEvent>[] workHandlers = new WorkHandler[threadSize]; for (int i = 0; i < threadSize; i++) { WorkHandler<GridEvent> workHandler = new DisruptorEventsWorkHandler(getName()); workHandlers[i] = workHandler; } workerPool = new WorkerPool<GridEvent>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), workHandlers); workerPool.start(executor); return true; }
public KafkaRpcServer(WorkerPool<ServerEvent> workerPool, RequestConsumer requestConsumer, ServerEventHandler eventHandler, int workerThreads) { this.workerPool = workerPool; this.requestConsumer = requestConsumer; this.eventHandler = eventHandler; this.workerThreads = workerThreads; }
public RpcServer build() { final int workerThreads = 3; Producer<String, Response> responseProducer = new KafkaProducer<>(producerProps, new StringSerializer(), producerValueSerializer); final ServerEventHandler[] workHandlers = new ServerEventHandler[workerThreads]; IntStream.range(0, workerThreads).forEach( nbr -> workHandlers[nbr] = new ServerEventHandler(requestHandler, responseProducer) ); final WorkerPool<ServerEvent> workerPool = new WorkerPool<>(ServerEvent::new, new FatalExceptionHandler(), workHandlers); RequestConsumer requestConsumer = new RequestConsumer(topic, consumerProps, consumerThreads, consumerValueDeserializer); return new KafkaRpcServer(workerPool, requestConsumer, new ServerEventHandler(requestHandler, responseProducer), workerThreads); }
public WorkerPool getWorkerPool() { return workerPool; }
public void setWorkerPool(WorkerPool workerPool) { this.workerPool = workerPool; }
EventHandlerGroup<T> createWorkerPool(final Sequence[] barrierSequences, final WorkHandler<T>[] workHandlers) { final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences); final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers); consumerRepository.add(workerPool, sequenceBarrier); return new EventHandlerGroup<T>(this, consumerRepository, workerPool.getWorkerSequences()); }
private static void bulkLoad( final Cache cache, long entriesToLoad,double[] bbox) { final AtomicLong atomicLong = new AtomicLong(); final ValueHandler[] valueHandlers = new ValueHandler[]{ new ValueHandler(cache, atomicLong), new ValueHandler(cache, atomicLong), new ValueHandler(cache, atomicLong), new ValueHandler(cache, atomicLong) }; final ExecutorService executorService = Executors.newCachedThreadPool(); try { final RingBuffer<ValueEvent> ringBuffer = RingBuffer.createSingleProducer(ValueEvent.EVENT_FACTORY, 4096, new YieldingWaitStrategy()); final WorkerPool<ValueEvent> workerPool = new WorkerPool<ValueEvent>(ringBuffer, ringBuffer.newBarrier(), new FatalExceptionHandler(), valueHandlers); workerPool.start(executorService); try { publishLoadEvents(ringBuffer,entriesToLoad,bbox); } finally { workerPool.drainAndHalt(); } } finally { executorService.shutdown(); } System.out.format("Put %d elements.", atomicLong.get()); }
/** * @param maxQueueSz * @param numThreads * @param name */ public RequestQueueProcessor(int maxQueueSz, int numThreads, String name) { m_name = name; m_maxQueueSz = maxQueueSz; m_ringBuffer = RingBuffer.createMultiProducer(m_eventFactory, normalizeBufferSize(m_maxQueueSz), new BlockingWaitStrategy()); m_barrier = m_ringBuffer.newBarrier(); m_numThreads = numThreads; QueueProcessorWorkHandler[] handlers = new QueueProcessorWorkHandler[m_numThreads]; for (int i = 0; i < m_numThreads; i++) { handlers[i] = new QueueProcessorWorkHandler(); } m_worker = new WorkerPool(m_ringBuffer, m_barrier, new QueueProcessorExceptionHandler(m_dropCounter, name), handlers); m_ringBuffer.addGatingSequences(m_worker.getWorkerSequences()); m_executor = Executors.newFixedThreadPool(m_numThreads, new NameableThreadFactory(name)); m_ringBuffer = m_worker.start(m_executor); }