EventHandlerGroup<T> createWorkerPool( final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers) { final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences); final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers); consumerRepository.add(workerPool, sequenceBarrier); return new EventHandlerGroup<T>(this, consumerRepository, workerPool.getWorkerSequences()); }
/** * 初始化正常管道,任何情况下都会有 * * @param pool 线程池 */ private void initNormalChannel(ExecutorService pool) { Disruptor<ElectronsHolder> normalDis = new Disruptor<>(ElectronsHolder::new, conf.getCircuitLen(), pool, ProducerType.MULTI, new LiteBlockingWaitStrategy()); WorkHandler[] workHandlers = new WorkHandler[conf.getCircuitNum()]; Arrays.fill(workHandlers, (WorkHandler<ElectronsHolder>) electronsHolder -> electronsHolder.handle()); normalDis.handleEventsWithWorkerPool(workHandlers); normalDis.handleExceptionsWith(new ElecExceptionHandler("Normal Disruptor")); //初始化channel Channel normalChannel = new NormalChannel(normalDis); //配置限流相关 normalChannel.confLimitRate(conf.isLimitRate(), conf.getPermitsPerSecond(), conf.isWarmup(), conf.getWarmupPeriod(), conf.getWarmPeriodUnit()); channelMap.put(NORMAL_CHANNEL_KEY, normalChannel); }
@Override protected void configure() { configureCommonBeans(); bindConstant().annotatedWith(Names.named("statsCollectors")).to(options.threads()); bind(new TypeLiteral<EventHandler<TwoPhaseEvent<Increment>>>() { }).to(WriteUpdatesEventHandler.class).asEagerSingleton(); bind(new TypeLiteral<WorkHandler<TwoPhaseEvent<Increment>>>() { }).to(LearnWorkHandler.class); bind(OutputFormat.class).to(StatisticsCalculator.class); }
@Override protected void configure() { configureCommonBeans(); bindConstant().annotatedWith(Names.named("statsCollectors")).to(1); bind(new TypeLiteral<EventHandler<TwoPhaseEvent<SparseItem>>>() { }).to(LearnEventHandler.class).asEagerSingleton(); bind(new TypeLiteral<WorkHandler<TwoPhaseEvent<SparseItem>>>() { }).to(ParseInputWorkHandler.class); bind(OutputFormat.class).to(StatisticsCalculator.class).asEagerSingleton(); }
@Override public boolean initialize(EventsChannelConfig config) { super.initialize(config); log.info("Initialize disruptor events channel " + config.getName() + " with " + config); EventFactory<GridEvent> eventFactory = new DisruptorEventFactory(); int ringBufferSize = config.getBlockQueueMaxNumber(); int threadSize = config.getEventConsumerNumber(); int bufferSize = ringBufferSize; if (Integer.bitCount(bufferSize) != 1) { bufferSize = (int) Math.pow(2, (int) (Math.log(ringBufferSize) / Math.log(2))); log.warn("Change disruptor events channel " + config.getName() + " buffer size from " + ringBufferSize + " to " + bufferSize); } if (bufferSize <= 0) throw new GridException("Invalid disruptor ringbuffur size:" + ringBufferSize); threadPool = Executors.newFixedThreadPool(threadSize); ringBuffer = RingBuffer.createMultiProducer(eventFactory, bufferSize, new BlockingWaitStrategy()); SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); ExecutorService executor = Executors.newFixedThreadPool(10); @SuppressWarnings("unchecked") WorkHandler<GridEvent>[] workHandlers = new WorkHandler[threadSize]; for (int i = 0; i < threadSize; i++) { WorkHandler<GridEvent> workHandler = new DisruptorEventsWorkHandler(getName()); workHandlers[i] = workHandler; } workerPool = new WorkerPool<GridEvent>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), workHandlers); workerPool.start(executor); return true; }
@SuppressWarnings("unchecked") private WorkHandler<LogEvent<E>>[] createWorkers() { WorkHandler<LogEvent<E>> handler = new ClearingWorkHandler<>(workHandler); WorkHandler<LogEvent<E>>[] workers = new WorkHandler[threadPoolSize]; for (int i = 0; i < threadPoolSize; i++) { workers[i] = handler; } return workers; }
public final void setWorkHandler(WorkHandler<LogEvent<E>> workHandler) { this.workHandler = workHandler; }
public ClearingWorkHandler(WorkHandler<LogEvent<E>> delegate) { this.delegate = delegate; }
EventHandlerGroup<T> createWorkerPool(final Sequence[] barrierSequences, final WorkHandler<T>[] workHandlers) { final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences); final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers); consumerRepository.add(workerPool, sequenceBarrier); return new EventHandlerGroup<T>(this, consumerRepository, workerPool.getWorkerSequences()); }
/** * Set up a {@link WorkerPool} to distribute an event to one of a pool of work handler threads. * Each event will only be processed by one of the work handlers. * The Disruptor will automatically start this processors when {@link #start()} is called. * * @param workHandlers the work handlers that will process events. * @return a {@link EventHandlerGroup} that can be used to chain dependencies. */ @SuppressWarnings("varargs") public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers) { return createWorkerPool(new Sequence[0], workHandlers); }
/** * Set up a worker pool to handle events from the ring buffer. The worker pool will only process events * after every {@link EventProcessor} in this group has processed the event. Each event will be processed * by one of the work handler instances. * <p> * <p>This method is generally used as part of a chain. For example if the handler <code>A</code> must * process events before the worker pool with handlers <code>B, C</code>:</p> * <p> * <pre><code>dw.handleEventsWith(A).thenHandleEventsWithWorkerPool(B, C);</code></pre> * * @param handlers the work handlers that will process events. Each work handler instance will provide an extra thread in the worker pool. * @return a {@link EventHandlerGroup} that can be used to set up a event processor barrier over the created event processors. */ public EventHandlerGroup<T> thenHandleEventsWithWorkerPool(final WorkHandler<? super T>... handlers) { return handleEventsWithWorkerPool(handlers); }
/** * Set up a worker pool to handle events from the ring buffer. The worker pool will only process events * after every {@link EventProcessor} in this group has processed the event. Each event will be processed * by one of the work handler instances. * <p> * <p>This method is generally used as part of a chain. For example if the handler <code>A</code> must * process events before the worker pool with handlers <code>B, C</code>:</p> * <p> * <pre><code>dw.after(A).handleEventsWithWorkerPool(B, C);</code></pre> * * @param handlers the work handlers that will process events. Each work handler instance will provide an extra thread in the worker pool. * @return a {@link EventHandlerGroup} that can be used to set up a event processor barrier over the created event processors. */ public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<? super T>... handlers) { return disruptor.createWorkerPool(sequences, handlers); }
/** * Set up a {@link WorkerPool} to distribute an event to one of a pool of work handler threads. * Each event will only be processed by one of the work handlers. * The Disruptor will automatically start this processors when {@link #start()} is called. * * @param workHandlers the work handlers that will process events. * @return a {@link EventHandlerGroup} that can be used to chain dependencies. */ public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers) { return createWorkerPool(new Sequence[0], workHandlers); }
/** * Set up a worker pool to handle events from the ring buffer. The worker pool will only process events * after every {@link EventProcessor} in this group has processed the event. Each event will be processed * by one of the work handler instances. * * <p>This method is generally used as part of a chain. For example if the handler <code>A</code> must * process events before the worker pool with handlers <code>B, C</code>:</p> * * <pre><code>dw.handleEventsWith(A).thenHandleEventsWithWorkerPool(B, C);</code></pre> * * @param handlers the work handlers that will process events. Each work handler instance will provide an extra thread in the worker pool. * @return a {@link EventHandlerGroup} that can be used to set up a event processor barrier over the created event processors. */ public EventHandlerGroup<T> thenHandleEventsWithWorkerPool(final WorkHandler<T>... handlers) { return handleEventsWithWorkerPool(handlers); }
/** * Set up a worker pool to handle events from the ring buffer. The worker pool will only process events * after every {@link EventProcessor} in this group has processed the event. Each event will be processed * by one of the work handler instances. * * <p>This method is generally used as part of a chain. For example if the handler <code>A</code> must * process events before the worker pool with handlers <code>B, C</code>:</p> * * <pre><code>dw.after(A).handleEventsWithWorkerPool(B, C);</code></pre> * * @param handlers the work handlers that will process events. Each work handler instance will provide an extra thread in the worker pool. * @return a {@link EventHandlerGroup} that can be used to set up a event processor barrier over the created event processors. */ public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... handlers) { return disruptor.createWorkerPool(sequences, handlers); }