Java 类com.lmax.disruptor.EventProcessor 实例源码

项目:disruptor-code-analysis    文件:DisruptorTest.java   
@Test
public void shouldMakeEntriesAvailableToFirstCustomProcessorsImmediately() throws Exception
{
    final CountDownLatch countDownLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> eventHandler = new EventHandlerStub<TestEvent>(countDownLatch);

    disruptor.handleEventsWith(
                               new EventProcessorFactory<TestEvent>()
                               {
                                   @Override
                                   public EventProcessor createEventProcessor(
                                                                              final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences)
                                   {
                                       assertEquals("Should not have had any barrier sequences", 0, barrierSequences.length);
                                       return new BatchEventProcessor<TestEvent>(
                                                                                 disruptor.getRingBuffer(), ringBuffer.newBarrier(
                                                                                                                                  barrierSequences), eventHandler);
                                   }
                               });

    ensureTwoEventsProcessedAccordingToDependencies(countDownLatch);
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
@Test
public void shouldHonourDependenciesForCustomProcessors() throws Exception
{
    final CountDownLatch countDownLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> eventHandler = new EventHandlerStub<TestEvent>(countDownLatch);
    final DelayedEventHandler delayedEventHandler = createDelayedEventHandler();

    disruptor.handleEventsWith(delayedEventHandler).then(
        new EventProcessorFactory<TestEvent>()
        {
            @Override
            public EventProcessor createEventProcessor(
                final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences)
            {
                assertSame("Should have had a barrier sequence", 1, barrierSequences.length);
                return new BatchEventProcessor<TestEvent>(
                    disruptor.getRingBuffer(), ringBuffer.newBarrier(
                    barrierSequences), eventHandler);
            }
        });

    ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler);
}
项目:disruptor-code-analysis    文件:Disruptor.java   
EventHandlerGroup<T> createEventProcessors(
        final Sequence[] barrierSequences, final EventProcessorFactory<T>[] processorFactories) {
    final EventProcessor[] eventProcessors = new EventProcessor[processorFactories.length];
    for (int i = 0; i < processorFactories.length; i++) {
        eventProcessors[i] = processorFactories[i].createEventProcessor(ringBuffer, barrierSequences);
    }
    return handleEventsWith(eventProcessors);
}
项目:disruptor-code-analysis    文件:EventProcessorInfo.java   
EventProcessorInfo(
    final EventProcessor eventprocessor, final EventHandler<? super T> handler, final SequenceBarrier barrier)
{
    this.eventprocessor = eventprocessor;
    this.handler = handler;
    this.barrier = barrier;
}
项目:disruptor-code-analysis    文件:EventHandlerGroup.java   
/**
 * Create a new event handler group that combines the handlers in this group with <tt>processors</tt>.
 *
 * @param processors the processors to combine.
 * @return a new EventHandlerGroup combining the existing and new processors into a single dependency group.
 */
public EventHandlerGroup<T> and(final EventProcessor... processors)
{
    Sequence[] combinedSequences = new Sequence[sequences.length + processors.length];

    for (int i = 0; i < processors.length; i++)
    {
        consumerRepository.add(processors[i]);
        combinedSequences[i] = processors[i].getSequence();
    }
    System.arraycopy(sequences, 0, combinedSequences, processors.length, sequences.length);

    return new EventHandlerGroup<T>(disruptor, consumerRepository, combinedSequences);
}
项目:disruptor-code-analysis    文件:Util.java   
/**
 * Get an array of {@link Sequence}s for the passed {@link EventProcessor}s
 *
 * @param processors for which to get the sequences
 * @return the array of {@link Sequence}s
 */
public static Sequence[] getSequencesFor(final EventProcessor... processors) {
    Sequence[] sequences = new Sequence[processors.length];
    for (int i = 0; i < sequences.length; i++) {
        sequences[i] = processors[i].getSequence();
    }

    return sequences;
}
项目:disruptor-code-analysis    文件:ConsumerRepositoryTest.java   
@Before
public void setUp() throws Exception
{
    consumerRepository = new ConsumerRepository<TestEvent>();
    eventProcessor1 = mockery.mock(EventProcessor.class, "eventProcessor1");
    eventProcessor2 = mockery.mock(EventProcessor.class, "eventProcessor2");

    final Sequence sequence1 = new Sequence();
    final Sequence sequence2 = new Sequence();
    mockery.checking(
        new Expectations()
        {
            {
                allowing(eventProcessor1).getSequence();
                will(returnValue(sequence1));

                allowing(eventProcessor1).isRunning();
                will(returnValue(true));

                allowing(eventProcessor2).getSequence();
                will(returnValue(sequence2));

                allowing(eventProcessor2).isRunning();
                will(returnValue(true));
            }
        });
    handler1 = new SleepingEventHandler();
    handler2 = new SleepingEventHandler();

    barrier1 = mockery.mock(SequenceBarrier.class, "barrier1");
    barrier2 = mockery.mock(SequenceBarrier.class, "barrier2");
}
项目:annotated-src    文件:Disruptor.java   
/**
 * Create a group of event processors to be used as a dependency.
 *
 * @param processors the event processors, previously set up with {@link #handleEventsWith(com.lmax.disruptor.EventProcessor...)},
 *                   that will form the barrier for subsequent handlers or processors.
 * @return an {@link EventHandlerGroup} that can be used to setup a {@link SequenceBarrier} over the specified event processors.
 * @see #after(com.lmax.disruptor.EventHandler[])
 */
public EventHandlerGroup<T> after(final EventProcessor... processors)
{
    for (EventProcessor processor : processors)
    {
        consumerRepository.add(processor);
    }

    return new EventHandlerGroup<T>(this, consumerRepository, Util.getSequencesFor(processors));
}
项目:annotated-src    文件:EventHandlerGroup.java   
/**
 * Create a new event handler group that combines the handlers in this group with <tt>processors</tt>.
 *
 * @param processors the processors to combine.
 * @return a new EventHandlerGroup combining the existing and new processors into a single dependency group.
 */
public EventHandlerGroup<T> and(final EventProcessor... processors) {
    Sequence[] combinedSequences = new Sequence[sequences.length + processors.length];

    for (int i = 0; i < processors.length; i++) {
        consumerRepository.add(processors[i]);
        combinedSequences[i] = processors[i].getSequence();
    }
    System.arraycopy(sequences, 0, combinedSequences, processors.length, sequences.length);

    return new EventHandlerGroup<T>(disruptor, consumerRepository, combinedSequences);
}
项目:annotated-src    文件:Util.java   
/**
 * Get an array of {@link Sequence}s for the passed {@link EventProcessor}s
 *
 * @param processors for which to get the sequences
 * @return the array of {@link Sequence}s
 */
public static Sequence[] getSequencesFor(final EventProcessor... processors)
{
    Sequence[] sequences = new Sequence[processors.length];
    for (int i = 0; i < sequences.length; i++)
    {
        sequences[i] = processors[i].getSequence();
    }

    return sequences;
}
项目:disruptor-code-analysis    文件:EventProcessorInfo.java   
public EventProcessor getEventProcessor()
{
    return eventprocessor;
}
项目:annotated-src    文件:EventProcessorInfo.java   
EventProcessorInfo(final EventProcessor eventprocessor, final EventHandler<T> handler, final SequenceBarrier barrier)
{
    this.eventprocessor = eventprocessor;
    this.handler = handler;
    this.barrier = barrier;
}
项目:annotated-src    文件:EventProcessorInfo.java   
public EventProcessor getEventProcessor()
{
    return eventprocessor;
}
项目:disruptor-code-analysis    文件:Disruptor.java   
/**
 * <p>Set up custom event processors to handle events from the ring buffer. The Disruptor will
 * automatically start this processors when {@link #start()} is called.</p>
 * <p>
 * <p>This method can be used as the start of a chain. For example if the processor <code>A</code> must
 * process events before handler <code>B</code>:</p>
 * <pre><code>dw.handleEventsWith(A).then(B);</code></pre>
 *
 * @param processors the event processors that will process events.
 * @return a {@link EventHandlerGroup} that can be used to chain dependencies.
 */
public EventHandlerGroup<T> handleEventsWith(final EventProcessor... processors) {
    for (final EventProcessor processor : processors) {
        consumerRepository.add(processor);
    }
    return new EventHandlerGroup<T>(this, consumerRepository, Util.getSequencesFor(processors));
}
项目:disruptor-code-analysis    文件:Disruptor.java   
/**
 * Create a group of event processors to be used as a dependency.
 *
 * @param processors the event processors, previously set up with {@link #handleEventsWith(com.lmax.disruptor.EventProcessor...)},
 *                   that will form the barrier for subsequent handlers or processors.
 * @return an {@link EventHandlerGroup} that can be used to setup a {@link SequenceBarrier} over the specified event processors.
 * @see #after(com.lmax.disruptor.EventHandler[])
 */
public EventHandlerGroup<T> after(final EventProcessor... processors) {
    for (final EventProcessor processor : processors) {
        consumerRepository.add(processor);
    }

    return new EventHandlerGroup<T>(this, consumerRepository, Util.getSequencesFor(processors));
}
项目:disruptor-code-analysis    文件:EventProcessorFactory.java   
/**
 * Create a new event processor that gates on <code>barrierSequences</code>.
 *
 * @param barrierSequences the sequences to gate on
 * @return a new EventProcessor that gates on <code>barrierSequences</code> before processing events
 */
EventProcessor createEventProcessor(RingBuffer<T> ringBuffer, Sequence[] barrierSequences);