Java 类com.lmax.disruptor.WorkHandler 实例源码

项目:disruptor-code-analysis    文件:Disruptor.java   
/**
 * Creates a {@link WorkerPool} over the ring buffer for the supplied work handlers,
 * records it in the consumer repository together with its barrier, and returns a
 * group built from the pool's worker sequences.
 *
 * @param barrierSequences sequences used to create the barrier the pool waits on
 * @param workHandlers     handlers passed to the new worker pool
 * @return an {@link EventHandlerGroup} constructed from the pool's worker sequences
 */
EventHandlerGroup<T> createWorkerPool(
        final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers) {
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
    // The pool is built with whatever exception handler is currently set on this instance.
    final WorkerPool<T> pool = new WorkerPool<T>(ringBuffer, barrier, exceptionHandler, workHandlers);
    consumerRepository.add(pool, barrier);

    final Sequence[] workerSequences = pool.getWorkerSequences();
    return new EventHandlerGroup<T>(this, consumerRepository, workerSequences);
}
项目:Electrons    文件:Dispatcher.java   
/**
 * Initializes the normal channel, which is always created.
 *
 * <p>Builds a multi-producer {@link Disruptor} of {@code ElectronsHolder} events, attaches a
 * worker pool of {@code conf.getCircuitNum()} handlers that each call
 * {@code ElectronsHolder.handle()}, wraps the disruptor in a {@code NormalChannel}, applies the
 * rate-limit configuration and registers the channel under {@code NORMAL_CHANNEL_KEY}.
 *
 * @param pool executor passed to the {@link Disruptor} constructor
 */
private void initNormalChannel(ExecutorService pool) {
    Disruptor<ElectronsHolder> normalDis = new Disruptor<>(ElectronsHolder::new, conf.getCircuitLen(), pool, ProducerType.MULTI, new LiteBlockingWaitStrategy());

    // Install the exception handler BEFORE registering the worker pool: the pool is constructed
    // with the disruptor's exception handler as it is at registration time, so a handler set
    // afterwards would not be used by these workers.
    normalDis.handleExceptionsWith(new ElecExceptionHandler("Normal Disruptor"));

    // Generic array creation is not possible in Java; the raw array is only ever filled with
    // WorkHandler<ElectronsHolder> instances, so the unchecked conversion is safe.
    @SuppressWarnings("unchecked")
    WorkHandler<ElectronsHolder>[] workHandlers = new WorkHandler[conf.getCircuitNum()];
    // Every slot shares the same stateless lambda instance.
    Arrays.fill(workHandlers, (WorkHandler<ElectronsHolder>) electronsHolder -> electronsHolder.handle());
    normalDis.handleEventsWithWorkerPool(workHandlers);

    // Initialize the channel.
    Channel normalChannel = new NormalChannel(normalDis);
    // Configure rate limiting.
    normalChannel.confLimitRate(conf.isLimitRate(), conf.getPermitsPerSecond(), conf.isWarmup(), conf.getWarmupPeriod(), conf.getWarmPeriodUnit());
    channelMap.put(NORMAL_CHANNEL_KEY, normalChannel);
}
项目:Scaled-ML    文件:ParallelModule.java   
/**
 * Registers the bindings for this module: the common beans, the {@code statsCollectors}
 * constant (bound to {@code options.threads()}), the event handler and work handler for
 * {@code TwoPhaseEvent<Increment>}, and the output format.
 */
@Override
protected void configure() {
    configureCommonBeans();

    bindConstant()
            .annotatedWith(Names.named("statsCollectors"))
            .to(options.threads());

    // Anonymous TypeLiteral subclasses capture the full generic type for the binder.
    final TypeLiteral<EventHandler<TwoPhaseEvent<Increment>>> eventHandlerType =
            new TypeLiteral<EventHandler<TwoPhaseEvent<Increment>>>() {
            };
    final TypeLiteral<WorkHandler<TwoPhaseEvent<Increment>>> workHandlerType =
            new TypeLiteral<WorkHandler<TwoPhaseEvent<Increment>>>() {
            };

    bind(eventHandlerType).to(WriteUpdatesEventHandler.class).asEagerSingleton();
    bind(workHandlerType).to(LearnWorkHandler.class);
    bind(OutputFormat.class).to(StatisticsCalculator.class);
}
项目:Scaled-ML    文件:SemiParallelModule.java   
/**
 * Registers the bindings for this module: the common beans, the {@code statsCollectors}
 * constant (bound to {@code 1}), the event handler and work handler for
 * {@code TwoPhaseEvent<SparseItem>}, and the output format as an eager singleton.
 */
@Override
protected void configure() {
    configureCommonBeans();

    bindConstant()
            .annotatedWith(Names.named("statsCollectors"))
            .to(1);

    // Anonymous TypeLiteral subclasses capture the full generic type for the binder.
    final TypeLiteral<EventHandler<TwoPhaseEvent<SparseItem>>> eventHandlerType =
            new TypeLiteral<EventHandler<TwoPhaseEvent<SparseItem>>>() {
            };
    final TypeLiteral<WorkHandler<TwoPhaseEvent<SparseItem>>> workHandlerType =
            new TypeLiteral<WorkHandler<TwoPhaseEvent<SparseItem>>>() {
            };

    bind(eventHandlerType).to(LearnEventHandler.class).asEagerSingleton();
    bind(workHandlerType).to(ParseInputWorkHandler.class);
    bind(OutputFormat.class).to(StatisticsCalculator.class).asEagerSingleton();
}
项目:darks-grid    文件:DisruptorEventsChannel.java   
/**
 * Initializes this events channel on top of a multi-producer Disruptor ring buffer.
 *
 * <p>The ring buffer size is taken from {@code config.getBlockQueueMaxNumber()} and, when it is
 * not already a power of two, rounded down to the nearest power of two (a warning is logged).
 * One {@code DisruptorEventsWorkHandler} is created per configured event consumer and the
 * resulting {@link WorkerPool} is started.
 *
 * @param config channel configuration supplying the name, queue size and consumer count
 * @return always {@code true}
 * @throws GridException if the configured ring buffer size is not positive
 */
@Override
public boolean initialize(EventsChannelConfig config)
{
    super.initialize(config);
    log.info("Initialize disruptor events channel " + config.getName() + " with " + config);
    EventFactory<GridEvent> eventFactory = new DisruptorEventFactory();
    int ringBufferSize = config.getBlockQueueMaxNumber();
    int threadSize = config.getEventConsumerNumber();

    // Validate before rounding: a negative size previously went through Math.log (NaN),
    // was cast to 0 and silently became a buffer of size 1 instead of being rejected.
    if (ringBufferSize <= 0)
        throw new GridException("Invalid disruptor ringbuffur size:" + ringBufferSize);

    int bufferSize = ringBufferSize;
    if (Integer.bitCount(bufferSize) != 1)
    {
        // Largest power of two not exceeding the requested size, computed exactly
        // (no floating-point logarithms).
        bufferSize = Integer.highestOneBit(ringBufferSize);
        log.warn("Change disruptor events channel " + config.getName() +
                " buffer size from " + ringBufferSize + " to " + bufferSize);
    }

    threadPool = Executors.newFixedThreadPool(threadSize);
    ringBuffer = RingBuffer.createMultiProducer(eventFactory, bufferSize, new BlockingWaitStrategy());
    SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

    // One worker task is submitted per handler, so the executor needs at least threadSize
    // threads; a hard-coded pool of 10 left any handlers beyond the tenth queued forever.
    // NOTE(review): this executor is not stored anywhere visible here, so it cannot be shut
    // down later - confirm whether the threadPool field was meant to be used instead.
    ExecutorService executor = Executors.newFixedThreadPool(threadSize);

    @SuppressWarnings("unchecked")
    WorkHandler<GridEvent>[] workHandlers = new WorkHandler[threadSize];
    for (int i = 0; i < threadSize; i++)
    {
        workHandlers[i] = new DisruptorEventsWorkHandler(getName());
    }

    // NOTE(review): the worker sequences are not registered as gating sequences on the ring
    // buffer in this method - verify that this happens elsewhere or is intentional.
    workerPool = new WorkerPool<GridEvent>(ringBuffer, sequenceBarrier,
            new IgnoreExceptionHandler(), workHandlers);
    workerPool.start(executor);
    return true;
}
项目:logback-ext    文件:DisruptorAppender.java   
/**
 * Builds the array of work handlers for the worker pool.
 *
 * <p>A single {@code ClearingWorkHandler} wrapping the configured {@code workHandler} is
 * created and placed in every slot, so all {@code threadPoolSize} entries refer to the same
 * instance.
 *
 * @return an array of length {@code threadPoolSize} filled with the shared handler
 */
@SuppressWarnings("unchecked")
private WorkHandler<LogEvent<E>>[] createWorkers() {
    final WorkHandler<LogEvent<E>> shared = new ClearingWorkHandler<>(workHandler);
    // Generic arrays cannot be instantiated directly; the raw array only holds 'shared'.
    final WorkHandler<LogEvent<E>>[] slots = new WorkHandler[threadPoolSize];

    int index = 0;
    while (index < threadPoolSize) {
        slots[index] = shared;
        index++;
    }
    return slots;
}
项目:logback-ext    文件:DisruptorAppender.java   
/**
 * Sets the work handler used by this appender.
 *
 * @param workHandler the handler to store in the {@code workHandler} field
 */
public final void setWorkHandler(WorkHandler<LogEvent<E>> workHandler) {
    this.workHandler = workHandler;
}
项目:logback-ext    文件:DisruptorAppender.java   
/**
 * Creates a handler that wraps the given delegate.
 *
 * @param delegate the work handler stored in the {@code delegate} field; no null check is
 *                 performed here
 */
public ClearingWorkHandler(WorkHandler<LogEvent<E>> delegate) {
    this.delegate = delegate;
}
项目:annotated-src    文件:Disruptor.java   
/**
 * Creates a {@link WorkerPool} for the given handlers, registers it with the consumer
 * repository, and wraps the pool's worker sequences in an {@link EventHandlerGroup}.
 *
 * @param barrierSequences sequences the pool's barrier is created from
 * @param workHandlers     handlers passed to the new worker pool
 * @return an {@link EventHandlerGroup} constructed from the pool's worker sequences
 */
EventHandlerGroup<T> createWorkerPool(final Sequence[] barrierSequences, final WorkHandler<T>[] workHandlers) {
    final SequenceBarrier poolBarrier = ringBuffer.newBarrier(barrierSequences);

    // The pool uses the exception handler currently configured on this instance.
    final WorkerPool<T> workers =
            new WorkerPool<T>(ringBuffer, poolBarrier, exceptionHandler, workHandlers);
    consumerRepository.add(workers, poolBarrier);

    final Sequence[] poolSequences = workers.getWorkerSequences();
    return new EventHandlerGroup<T>(this, consumerRepository, poolSequences);
}
项目:disruptor-code-analysis    文件:Disruptor.java   
/**
 * Set up a {@link WorkerPool} to distribute an event to one of a pool of work handler threads.
 * Each event will only be processed by one of the work handlers.
 * The Disruptor will automatically start these processors when {@link #start()} is called.
 *
 * <p>Delegates to {@code createWorkerPool} with an empty barrier-sequence array.</p>
 *
 * @param workHandlers the work handlers that will process events.
 * @return a {@link EventHandlerGroup} that can be used to chain dependencies.
 */
@SuppressWarnings("varargs")
public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers) {
    return createWorkerPool(new Sequence[0], workHandlers);
}
项目:disruptor-code-analysis    文件:EventHandlerGroup.java   
/**
 * Set up a worker pool to handle events from the ring buffer. The worker pool will only process events
 * after every {@link EventProcessor} in this group has processed the event. Each event will be processed
 * by one of the work handler instances.
 *
 * <p>This method is generally used as part of a chain. For example if the handler <code>A</code> must
 * process events before the worker pool with handlers <code>B, C</code>:</p>
 *
 * <pre><code>dw.handleEventsWith(A).thenHandleEventsWithWorkerPool(B, C);</code></pre>
 *
 * <p>This method simply forwards to {@code handleEventsWithWorkerPool} with the same handlers.</p>
 *
 * @param handlers the work handlers that will process events. Each work handler instance will provide an extra thread in the worker pool.
 * @return a {@link EventHandlerGroup} that can be used to set up an event processor barrier over the created event processors.
 */
public EventHandlerGroup<T> thenHandleEventsWithWorkerPool(final WorkHandler<? super T>... handlers)
{
    return handleEventsWithWorkerPool(handlers);
}
项目:disruptor-code-analysis    文件:EventHandlerGroup.java   
/**
 * Set up a worker pool to handle events from the ring buffer. The worker pool will only process events
 * after every {@link EventProcessor} in this group has processed the event. Each event will be processed
 * by one of the work handler instances.
 *
 * <p>This method is generally used as part of a chain. For example if the handler <code>A</code> must
 * process events before the worker pool with handlers <code>B, C</code>:</p>
 *
 * <pre><code>dw.after(A).handleEventsWithWorkerPool(B, C);</code></pre>
 *
 * <p>Delegates to {@code disruptor.createWorkerPool}, passing this group's {@code sequences}
 * as the barrier sequences.</p>
 *
 * @param handlers the work handlers that will process events. Each work handler instance will provide an extra thread in the worker pool.
 * @return a {@link EventHandlerGroup} that can be used to set up an event processor barrier over the created event processors.
 */
public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<? super T>... handlers)
{
    return disruptor.createWorkerPool(sequences, handlers);
}
项目:annotated-src    文件:Disruptor.java   
/**
 * Set up a {@link WorkerPool} to distribute an event to one of a pool of work handler threads.
 * Each event will only be processed by one of the work handlers.
 * The Disruptor will automatically start these processors when {@link #start()} is called.
 *
 * <p>Delegates to {@code createWorkerPool} with an empty barrier-sequence array.</p>
 *
 * @param workHandlers the work handlers that will process events.
 * @return a {@link EventHandlerGroup} that can be used to chain dependencies.
 */
public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers) {
    return createWorkerPool(new Sequence[0], workHandlers);
}
项目:annotated-src    文件:EventHandlerGroup.java   
/**
 * Set up a worker pool to handle events from the ring buffer. The worker pool will only process events
 * after every {@link EventProcessor} in this group has processed the event. Each event will be processed
 * by one of the work handler instances.
 *
 * <p>This method is generally used as part of a chain. For example if the handler <code>A</code> must
 * process events before the worker pool with handlers <code>B, C</code>:</p>
 *
 * <pre><code>dw.handleEventsWith(A).thenHandleEventsWithWorkerPool(B, C);</code></pre>
 *
 * <p>This method simply forwards to {@code handleEventsWithWorkerPool} with the same handlers.</p>
 *
 * @param handlers the work handlers that will process events. Each work handler instance will provide an extra thread in the worker pool.
 * @return a {@link EventHandlerGroup} that can be used to set up an event processor barrier over the created event processors.
 */
public EventHandlerGroup<T> thenHandleEventsWithWorkerPool(final WorkHandler<T>... handlers) {
    return handleEventsWithWorkerPool(handlers);
}
项目:annotated-src    文件:EventHandlerGroup.java   
/**
 * Set up a worker pool to handle events from the ring buffer. The worker pool will only process events
 * after every {@link EventProcessor} in this group has processed the event. Each event will be processed
 * by one of the work handler instances.
 *
 * <p>This method is generally used as part of a chain. For example if the handler <code>A</code> must
 * process events before the worker pool with handlers <code>B, C</code>:</p>
 *
 * <pre><code>dw.after(A).handleEventsWithWorkerPool(B, C);</code></pre>
 *
 * <p>Delegates to {@code disruptor.createWorkerPool}, passing this group's {@code sequences}
 * as the barrier sequences.</p>
 *
 * @param handlers the work handlers that will process events. Each work handler instance will provide an extra thread in the worker pool.
 * @return a {@link EventHandlerGroup} that can be used to set up an event processor barrier over the created event processors.
 */
public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... handlers) {
    return disruptor.createWorkerPool(sequences, handlers);
}