Java 类com.lmax.disruptor.BatchEventProcessor 实例源码

项目:disruptor-code-analysis    文件:CustomPerformanceTest.java   
/**
 * Executes a single run: starts a consumer thread for a new {@code SimpleEventHandler},
 * puts {@code Constants.ITERATIONS} events into the ring buffer, polls until the
 * consumer's sequence equals {@code ITERATIONS - 1}, then halts the processor and
 * joins its thread.
 *
 * @throws InterruptedException if the calling thread is interrupted while joining
 */
private void doRun() throws InterruptedException
{
    final BatchEventProcessor<?> processor = ringBuffer.createHandler(new SimpleEventHandler());

    final Thread consumerThread = new Thread(processor);
    consumerThread.start();

    final long total = Constants.ITERATIONS;
    long value = 0;
    while (value < total)
    {
        ringBuffer.put(new SimpleEvent(value, value, value, value));
        value++;
    }

    // Sequences are zero-based, so the last event of this run sits at total - 1.
    final long lastSequence = total - 1;
    while (processor.getSequence().get() != lastSequence)
    {
        LockSupport.parkNanos(1);
    }

    processor.halt();
    consumerThread.join();
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * Registers a hand-built {@link BatchEventProcessor} with the disruptor, chains a stub
 * handler after it, and checks that two events flow through in dependency order.
 */
@Test
public void shouldSupportCustomProcessorsAsDependencies()
    throws Exception
{
    final RingBuffer<TestEvent> buffer = disruptor.getRingBuffer();
    final DelayedEventHandler upstreamHandler = createDelayedEventHandler();

    final CountDownLatch processedLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> downstreamHandler = new EventHandlerStub<TestEvent>(processedLatch);

    // The custom processor wraps the delayed handler and reads straight off the ring buffer.
    final BatchEventProcessor<TestEvent> customProcessor =
        new BatchEventProcessor<TestEvent>(buffer, buffer.newBarrier(), upstreamHandler);

    disruptor.handleEventsWith(customProcessor);
    disruptor.after(customProcessor).handleEventsWith(downstreamHandler);

    ensureTwoEventsProcessedAccordingToDependencies(processedLatch, upstreamHandler);
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * Registers a delayed handler first, then builds a custom {@link BatchEventProcessor}
 * whose barrier is obtained from {@code disruptor.after(handler)}, and checks that two
 * events are processed in dependency order.
 */
@Test
public void shouldSupportHandlersAsDependenciesToCustomProcessors()
    throws Exception
{
    final DelayedEventHandler upstreamHandler = createDelayedEventHandler();
    disruptor.handleEventsWith(upstreamHandler);

    final RingBuffer<TestEvent> buffer = disruptor.getRingBuffer();
    final CountDownLatch processedLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> stubHandler = new EventHandlerStub<TestEvent>(processedLatch);

    // Barrier that follows the already-registered delayed handler.
    final SequenceBarrier upstreamBarrier = disruptor.after(upstreamHandler).asSequenceBarrier();
    final BatchEventProcessor<TestEvent> customProcessor =
        new BatchEventProcessor<TestEvent>(buffer, upstreamBarrier, stubHandler);
    disruptor.handleEventsWith(customProcessor);

    ensureTwoEventsProcessedAccordingToDependencies(processedLatch, upstreamHandler);
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * Combines a registered delayed handler and a custom {@link BatchEventProcessor}
 * (wrapping a second delayed handler) as joint dependencies of a stub handler, and
 * checks that two events are processed in dependency order.
 */
@Test
public void shouldSupportCustomProcessorsAndHandlersAsDependencies() throws Exception
{
    final DelayedEventHandler firstDelayed = createDelayedEventHandler();
    final DelayedEventHandler secondDelayed = createDelayedEventHandler();
    disruptor.handleEventsWith(firstDelayed);

    final RingBuffer<TestEvent> buffer = disruptor.getRingBuffer();
    final CountDownLatch processedLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> stubHandler = new EventHandlerStub<TestEvent>(processedLatch);

    // The custom processor runs the second delayed handler behind the first one.
    final SequenceBarrier afterFirst = disruptor.after(firstDelayed).asSequenceBarrier();
    final BatchEventProcessor<TestEvent> customProcessor =
        new BatchEventProcessor<TestEvent>(buffer, afterFirst, secondDelayed);

    disruptor.after(firstDelayed).and(customProcessor).handleEventsWith(stubHandler);

    ensureTwoEventsProcessedAccordingToDependencies(processedLatch, firstDelayed, secondDelayed);
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * Registers an {@link EventProcessorFactory} as the first consumer and checks that it
 * is invoked with no barrier sequences and that two events reach the stub handler.
 */
@Test
public void shouldMakeEntriesAvailableToFirstCustomProcessorsImmediately() throws Exception
{
    final CountDownLatch processedLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> stubHandler = new EventHandlerStub<TestEvent>(processedLatch);

    final EventProcessorFactory<TestEvent> processorFactory = new EventProcessorFactory<TestEvent>()
    {
        @Override
        public EventProcessor createEventProcessor(
            final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences)
        {
            // A first-in-chain processor has nothing upstream to wait for.
            assertEquals("Should not have had any barrier sequences", 0, barrierSequences.length);
            return new BatchEventProcessor<TestEvent>(
                disruptor.getRingBuffer(), ringBuffer.newBarrier(barrierSequences), stubHandler);
        }
    };
    disruptor.handleEventsWith(processorFactory);

    ensureTwoEventsProcessedAccordingToDependencies(processedLatch);
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * Chains an {@link EventProcessorFactory} after a delayed handler and checks that the
 * factory receives exactly one barrier sequence and that two events are processed in
 * dependency order.
 */
@Test
public void shouldHonourDependenciesForCustomProcessors() throws Exception
{
    final CountDownLatch countDownLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> eventHandler = new EventHandlerStub<TestEvent>(countDownLatch);
    final DelayedEventHandler delayedEventHandler = createDelayedEventHandler();

    disruptor.handleEventsWith(delayedEventHandler).then(
        new EventProcessorFactory<TestEvent>()
        {
            @Override
            public EventProcessor createEventProcessor(
                final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences)
            {
                // Compare by value: assertSame would box both ints and compare references,
                // which only works by virtue of the small-Integer cache.
                assertEquals("Should have had a barrier sequence", 1, barrierSequences.length);
                return new BatchEventProcessor<TestEvent>(
                    disruptor.getRingBuffer(), ringBuffer.newBarrier(
                    barrierSequences), eventHandler);
            }
        });

    ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler);
}
项目:andes    文件:SimpleMessaging.java   
/**
 * Creates the MQTT subscription store and the inbound disruptor, registers this object
 * as the event handler through a hand-built {@link BatchEventProcessor}, starts the
 * disruptor and finally publishes an {@code InitEvent} wrapping the configuration.
 *
 * @param configProps properties passed to the {@code InitEvent} that is published last
 */
public void init(Properties configProps) {
    // WSO2 change: the stock SubscriptionsStore is replaced by an extended store that
    // suits the distribution architecture of Andes.
    subscriptions = new MQTTSubscriptionStore();

    ExecutorService workerPool = Executors.newCachedThreadPool(
            new ThreadFactoryBuilder()
                    .setNameFormat("Disruptor MQTT Simple Messaging Thread %d")
                    .build());
    Integer inboundBufferSize = AndesConfigurationManager.readValue(
            AndesConfiguration.TRANSPORTS_MQTT_INBOUND_BUFFER_SIZE);
    disruptor = new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, inboundBufferSize, workerPool);

    // WSO2 addition: exceptions must not be ignored by the disruptor ...
    disruptor.handleExceptionsWith(new MqttLogExceptionHandler());

    SequenceBarrier inboundBarrier = disruptor.getRingBuffer().newBarrier();
    BatchEventProcessor<ValueEvent> inboundProcessor = new BatchEventProcessor<ValueEvent>(
            disruptor.getRingBuffer(), inboundBarrier, this);
    // ... nor by the custom processor, which gets its own handler instance.
    inboundProcessor.setExceptionHandler(new MqttLogExceptionHandler());
    disruptor.handleEventsWith(inboundProcessor);

    m_ringBuffer = disruptor.start();

    disruptorPublish(new InitEvent(configProps));
}
项目:kevoree-library    文件:ProtocolProcessor.java   
/**
 * Stores the collaborators and starts the output ring buffer with this object as its
 * event handler, running on a one-thread executor.
 *
 * @param subscriptions the subscription store holding all existing client
 *  subscriptions.
 * @param storageService the persistent store used to save/load messages
 *  for QoS1 and QoS2 handling.
 * @param authenticator the authenticator used in connect messages
 */
void init(SubscriptionsStore subscriptions, IStorageService storageService,
          IAuthenticator authenticator) {
    this.subscriptions = subscriptions;
    this.m_storageService = storageService;
    this.m_authenticator = authenticator;

    // Output ring buffer: 32K slots, consumed on a single worker thread.
    m_executor = Executors.newFixedThreadPool(1);
    m_ringBuffer = new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY, 1024 * 32);

    m_eventProcessor = new BatchEventProcessor<ValueEvent>(
            m_ringBuffer, m_ringBuffer.newBarrier(), this);
    // TODO: a presentation advised against making the following call - revisit.
    m_ringBuffer.setGatingSequences(m_eventProcessor.getSequence());
    m_executor.submit(m_eventProcessor);
}
项目:disruptor-code-analysis    文件:Disruptor.java   
/**
 * Wraps each supplied handler in a {@link BatchEventProcessor} that shares one barrier
 * built from {@code barrierSequences}, records every processor in the consumer
 * repository and returns a group over the new processors' sequences.
 *
 * @param barrierSequences sequences the new processors' shared barrier is created from
 * @param eventHandlers    handlers to wrap, one processor per handler
 * @return a group holding the sequences of the processors created here
 */
EventHandlerGroup<T> createEventProcessors(
        final Sequence[] barrierSequences,
        final EventHandler<? super T>[] eventHandlers) {
    checkNotStarted();

    final int handlerCount = eventHandlers.length;
    final Sequence[] processorSequences = new Sequence[handlerCount];
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    int slot = 0;
    for (final EventHandler<? super T> handler : eventHandlers) {
        final BatchEventProcessor<T> processor =
                new BatchEventProcessor<T>(ringBuffer, barrier, handler);

        // Only override the processor's exception handler when one was configured.
        if (exceptionHandler != null) {
            processor.setExceptionHandler(exceptionHandler);
        }

        consumerRepository.add(processor, handler, barrier);
        processorSequences[slot++] = processor.getSequence();
    }

    // With at least one new processor, the barrier sequences are no longer end-of-chain.
    if (handlerCount > 0) {
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }

    return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}
项目:disruptor-code-analysis    文件:ExceptionHandlerSetting.java   
/**
 * Installs the given {@link ExceptionHandler} on the processor registered for this
 * setting's event handler, then alerts that handler's barrier.
 *
 * @param exceptionHandler the exception handler to use.
 */
public void with(ExceptionHandler<? super T> exceptionHandler)
{
    // The repository hands back a general processor; it is cast to a batch processor here.
    final BatchEventProcessor<T> processor =
        (BatchEventProcessor<T>) consumerRepository.getEventProcessorFor(eventHandler);
    processor.setExceptionHandler(exceptionHandler);

    consumerRepository.getBarrierFor(eventHandler).alert();
}
项目:disruptor-code-analysis    文件:SimplePerformanceTest.java   
/**
 * Executes a single run: builds a {@link BatchEventProcessor} on the ring buffer,
 * registers its sequence as a gating sequence, publishes {@code Constants.ITERATIONS}
 * events, polls until the processor's sequence equals {@code ITERATIONS - 1}, then
 * halts the processor and joins its thread.
 *
 * @throws InterruptedException if the calling thread is interrupted while joining
 */
private void doRun() throws InterruptedException
{
    final BatchEventProcessor<EventHolder> processor =
        new BatchEventProcessor<EventHolder>(ringBuffer, ringBuffer.newBarrier(), eventHolderHandler);
    ringBuffer.addGatingSequences(processor.getSequence());

    final Thread consumerThread = new Thread(processor);
    consumerThread.start();

    final long total = Constants.ITERATIONS;
    long value = 0;
    while (value < total)
    {
        ringBuffer.publishEvent(TRANSLATOR, new SimpleEvent(value, value, value, value));
        value++;
    }

    // Sequences are zero-based, so the last event of this run sits at total - 1.
    final long lastSequence = total - 1;
    while (processor.getSequence().get() != lastSequence)
    {
        LockSupport.parkNanos(1);
    }

    processor.halt();
    consumerThread.join();
}
项目:disruptor-code-analysis    文件:CustomRingBuffer.java   
/**
 * Builds a {@link BatchEventProcessor} over this buffer that forwards to the given
 * handler through an {@code AccessorEventHandler}, and registers the processor's
 * sequence as a gating sequence on the sequencer.
 *
 * @param handler the handler wrapped by the {@code AccessorEventHandler}
 * @return the new processor; this method does not start it
 */
public BatchEventProcessor<EventAccessor<T>> createHandler(final EventHandler<T> handler)
{
    final AccessorEventHandler<T> adapter = new AccessorEventHandler<T>(handler);
    final BatchEventProcessor<EventAccessor<T>> result =
        new BatchEventProcessor<EventAccessor<T>>(this, sequencer.newBarrier(), adapter);

    sequencer.addGatingSequences(result.getSequence());
    return result;
}
项目:disruptor-code-analysis    文件:LongRingBuffer.java   
/**
 * Builds a {@link BatchEventProcessor} that unwraps each {@code LongEvent} via
 * {@code get()} and passes the result to the supplied {@code LongHandler}.
 *
 * @param handler receives the unwrapped value, the sequence and the end-of-batch flag
 * @return the new processor; this method does not start it
 */
public BatchEventProcessor<LongEvent> createProcessor(final LongHandler handler)
{
    final EventHandler<LongEvent> adapter = new EventHandler<LongEvent>()
    {
        @Override
        public void onEvent(final LongEvent event, final long sequence, final boolean endOfBatch)
            throws Exception
        {
            handler.onEvent(event.get(), sequence, endOfBatch);
        }
    };

    return new BatchEventProcessor<LongEvent>(new LongEvent(), sequencer.newBarrier(), adapter);
}
项目:low-latency-high-throughput    文件:Demo1PMC.java   
/**
 * Demo entry point: one producer, {@code NUMBER_CONSUMERS} independent consumers.
 * Each consumer gets its own barrier on the ring buffer and its sequence is registered
 * as a gating sequence; the main thread then publishes {@code SAMPLES_SIZE} values.
 *
 * @param args unused
 */
public static void main(String[] args) {

    final List<ValueEventHandler1PMC> handlers = new ArrayList<>(NUMBER_CONSUMERS);

    RingBuffer<ValueEvent> ringBuffer = RingBuffer.createSingleProducer(
            ValueEvent.EVENT_FACTORY, RING_SIZE, new SleepingWaitStrategy());

    start = System.nanoTime();

    //Create consumers
    for(int i = 0;  i < NUMBER_CONSUMERS; i++) {
        ValueEventHandler1PMC handler = new ValueEventHandler1PMC(start, handlers);
        handlers.add(handler);
        SequenceBarrier barrier = ringBuffer.newBarrier();
        BatchEventProcessor<ValueEvent> eventProcessor = new BatchEventProcessor<ValueEvent>(
                ringBuffer, barrier, handler);
        ringBuffer.addGatingSequences(eventProcessor.getSequence());

        // Each EventProcessor can run on a separate thread
        EXECUTOR.submit(eventProcessor);
    }

    for(int i = 0;  i  < SAMPLES_SIZE; i++) {
        // Publishers claim events in sequence
        long sequence = ringBuffer.next();
        try {
            ValueEvent event = ringBuffer.get(sequence);

            event.setValue(i); // this could be more complex with multiple fields
        } finally {
            // Always publish a claimed sequence, even if filling the event failed;
            // an unpublished slot would stall every consumer behind it.
            ringBuffer.publish(sequence);
        }
    }

}
项目:FireFly    文件:DisruptorEventExecutor.java   
/**
 * Creates a multi-producer ring buffer of 8192 slots with a {@code BlockingWaitStrategy},
 * attaches a single {@code DisruptorHandler} consumer whose sequence gates the buffer,
 * and starts that consumer on a thread named {@code disruptEventExecutor}.
 */
public DisruptorEventExecutor() {
    // Previous configuration, kept for reference: 128 slots with
    // PhasedBackoffWaitStrategy.withLock(1, 1, TimeUnit.MILLISECONDS).
    ringBuffer = RingBuffer.createMultiProducer(
            WaitingEvent.EVENT_FACTORY, 8192, new BlockingWaitStrategy());

    final BatchEventProcessor<WaitingEvent> consumer = new BatchEventProcessor<WaitingEvent>(
            ringBuffer, ringBuffer.newBarrier(), new DisruptorHandler());
    ringBuffer.addGatingSequences(consumer.getSequence());

    executeThread = new Thread(consumer);
    executeThread.setName("disruptEventExecutor");
    executeThread.start();
}
项目:kevoree-library    文件:SimpleMessaging.java   
/**
 * Creates the subscription store and the ring buffer, starts this object as the
 * buffer's event handler on a one-thread executor, then publishes an {@code InitEvent}
 * wrapping the configuration.
 *
 * @param configProps properties passed to the {@code InitEvent} that is published last
 */
public void init(Properties configProps) {
    subscriptions = new SubscriptionsStore();

    // 32K-slot ring buffer consumed on a single worker thread.
    m_executor = Executors.newFixedThreadPool(1);
    m_ringBuffer = new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY, 1024 * 32);

    m_eventProcessor = new BatchEventProcessor<ValueEvent>(
            m_ringBuffer, m_ringBuffer.newBarrier(), this);
    // TODO: a presentation advised against making the following call - revisit.
    m_ringBuffer.setGatingSequences(m_eventProcessor.getSequence());
    m_executor.submit(m_eventProcessor);

    disruptorPublish(new InitEvent(configProps));
}
项目:moquette-mqtt    文件:SimpleMessaging.java   
/**
 * Creates the ring buffer, starts this object as its event handler on a one-thread
 * executor, then publishes an {@code InitEvent}.
 */
public void init() {
    // 32K-slot ring buffer consumed on a single worker thread.
    m_executor = Executors.newFixedThreadPool(1);
    m_ringBuffer = new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY, 1024 * 32);

    m_eventProcessor = new BatchEventProcessor<ValueEvent>(
            m_ringBuffer, m_ringBuffer.newBarrier(), this);
    // TODO: a presentation advised against making the following call - revisit.
    m_ringBuffer.setGatingSequences(m_eventProcessor.getSequence());
    m_executor.submit(m_eventProcessor);

    disruptorPublish(new InitEvent());
}
项目:low-latency-high-throughput    文件:Demo1PMCSequence.java   
/**
 * Demo entry point: one producer feeding two consumers arranged in sequence.
 * The second consumer's barrier is built from the first consumer's sequence, and only
 * the second consumer's sequence is registered as a gating sequence on the ring buffer.
 * The main thread then publishes {@code SAMPLES_SIZE} values.
 *
 * @param args unused
 */
public static void main(String[] args) {

    RingBuffer<ValueEvent> ringBuffer = RingBuffer.createSingleProducer(
            ValueEvent.EVENT_FACTORY, RING_SIZE, new SleepingWaitStrategy());

    start = System.nanoTime();

    //Create first consumer
    ValueEventHandler1PMCSequenceFirst handler = new ValueEventHandler1PMCSequenceFirst(start);
    SequenceBarrier barrier;
    barrier = ringBuffer.newBarrier();
    BatchEventProcessor<ValueEvent> firstEventProcessor = new BatchEventProcessor<ValueEvent>(
            ringBuffer, barrier, handler);
    // The first processor's sequence is not added as a gating sequence here:
    // ringBuffer.addGatingSequences(firstEventProcessor.getSequence());

    // Each EventProcessor can run on a separate thread
    EXECUTOR.submit(firstEventProcessor);

    //Create second consumer
    ValueEventHandler1PMCSequenceSecond handler2 = new ValueEventHandler1PMCSequenceSecond(start);
    SequenceBarrier barrier2 = ringBuffer.newBarrier(firstEventProcessor.getSequence());
    BatchEventProcessor<ValueEvent> secondEventProcessor = new BatchEventProcessor<ValueEvent>(
            ringBuffer, barrier2, handler2);
    ringBuffer.addGatingSequences(secondEventProcessor.getSequence());

    // Each EventProcessor can run on a separate thread
    EXECUTOR.submit(secondEventProcessor);

    for(int i = 0;  i  < SAMPLES_SIZE; i++) {
        // Publishers claim events in sequence
        long sequence = ringBuffer.next();
        try {
            ValueEvent event = ringBuffer.get(sequence);

            event.setValue(i); // this could be more complex with multiple fields
        } finally {
            // Always publish a claimed sequence, even if filling the event failed;
            // an unpublished slot would stall every consumer behind it.
            ringBuffer.publish(sequence);
        }
    }

}
项目:annotated-src    文件:ExceptionHandlerSetting.java   
/**
 * Installs the given {@link ExceptionHandler} on the processor registered for this
 * setting's event handler, then alerts that handler's barrier.
 *
 * @param exceptionHandler the exception handler to use.
 */
public void with(ExceptionHandler exceptionHandler)
{
    // The repository hands back a general processor; it is cast to a batch processor here.
    final BatchEventProcessor<?> processor =
        (BatchEventProcessor<?>) consumerRepository.getEventProcessorFor(eventHandler);
    processor.setExceptionHandler(exceptionHandler);

    consumerRepository.getBarrierFor(eventHandler).alert();
}