@Test public void shouldMakeEntriesAvailableToFirstCustomProcessorsImmediately() throws Exception { final CountDownLatch countDownLatch = new CountDownLatch(2); final EventHandler<TestEvent> eventHandler = new EventHandlerStub<TestEvent>(countDownLatch); disruptor.handleEventsWith( new EventProcessorFactory<TestEvent>() { @Override public EventProcessor createEventProcessor( final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences) { assertEquals("Should not have had any barrier sequences", 0, barrierSequences.length); return new BatchEventProcessor<TestEvent>( disruptor.getRingBuffer(), ringBuffer.newBarrier( barrierSequences), eventHandler); } }); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch); }
@Test public void shouldHonourDependenciesForCustomProcessors() throws Exception { final CountDownLatch countDownLatch = new CountDownLatch(2); final EventHandler<TestEvent> eventHandler = new EventHandlerStub<TestEvent>(countDownLatch); final DelayedEventHandler delayedEventHandler = createDelayedEventHandler(); disruptor.handleEventsWith(delayedEventHandler).then( new EventProcessorFactory<TestEvent>() { @Override public EventProcessor createEventProcessor( final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences) { assertSame("Should have had a barrier sequence", 1, barrierSequences.length); return new BatchEventProcessor<TestEvent>( disruptor.getRingBuffer(), ringBuffer.newBarrier( barrierSequences), eventHandler); } }); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler); }
EventHandlerGroup<T> createEventProcessors( final Sequence[] barrierSequences, final EventProcessorFactory<T>[] processorFactories) { final EventProcessor[] eventProcessors = new EventProcessor[processorFactories.length]; for (int i = 0; i < processorFactories.length; i++) { eventProcessors[i] = processorFactories[i].createEventProcessor(ringBuffer, barrierSequences); } return handleEventsWith(eventProcessors); }
EventProcessorInfo( final EventProcessor eventprocessor, final EventHandler<? super T> handler, final SequenceBarrier barrier) { this.eventprocessor = eventprocessor; this.handler = handler; this.barrier = barrier; }
/** * Create a new event handler group that combines the handlers in this group with <tt>processors</tt>. * * @param processors the processors to combine. * @return a new EventHandlerGroup combining the existing and new processors into a single dependency group. */ public EventHandlerGroup<T> and(final EventProcessor... processors) { Sequence[] combinedSequences = new Sequence[sequences.length + processors.length]; for (int i = 0; i < processors.length; i++) { consumerRepository.add(processors[i]); combinedSequences[i] = processors[i].getSequence(); } System.arraycopy(sequences, 0, combinedSequences, processors.length, sequences.length); return new EventHandlerGroup<T>(disruptor, consumerRepository, combinedSequences); }
/** * Get an array of {@link Sequence}s for the passed {@link EventProcessor}s * * @param processors for which to get the sequences * @return the array of {@link Sequence}s */ public static Sequence[] getSequencesFor(final EventProcessor... processors) { Sequence[] sequences = new Sequence[processors.length]; for (int i = 0; i < sequences.length; i++) { sequences[i] = processors[i].getSequence(); } return sequences; }
@Before public void setUp() throws Exception { consumerRepository = new ConsumerRepository<TestEvent>(); eventProcessor1 = mockery.mock(EventProcessor.class, "eventProcessor1"); eventProcessor2 = mockery.mock(EventProcessor.class, "eventProcessor2"); final Sequence sequence1 = new Sequence(); final Sequence sequence2 = new Sequence(); mockery.checking( new Expectations() { { allowing(eventProcessor1).getSequence(); will(returnValue(sequence1)); allowing(eventProcessor1).isRunning(); will(returnValue(true)); allowing(eventProcessor2).getSequence(); will(returnValue(sequence2)); allowing(eventProcessor2).isRunning(); will(returnValue(true)); } }); handler1 = new SleepingEventHandler(); handler2 = new SleepingEventHandler(); barrier1 = mockery.mock(SequenceBarrier.class, "barrier1"); barrier2 = mockery.mock(SequenceBarrier.class, "barrier2"); }
/** * Create a group of event processors to be used as a dependency. * * @param processors the event processors, previously set up with {@link #handleEventsWith(com.lmax.disruptor.EventProcessor...)}, * that will form the barrier for subsequent handlers or processors. * @return an {@link EventHandlerGroup} that can be used to setup a {@link SequenceBarrier} over the specified event processors. * @see #after(com.lmax.disruptor.EventHandler[]) */ public EventHandlerGroup<T> after(final EventProcessor... processors) { for (EventProcessor processor : processors) { consumerRepository.add(processor); } return new EventHandlerGroup<T>(this, consumerRepository, Util.getSequencesFor(processors)); }
public EventProcessor getEventProcessor() { return eventprocessor; }
EventProcessorInfo(final EventProcessor eventprocessor, final EventHandler<T> handler, final SequenceBarrier barrier) { this.eventprocessor = eventprocessor; this.handler = handler; this.barrier = barrier; }
/** * <p>Set up custom event processors to handle events from the ring buffer. The Disruptor will * automatically start this processors when {@link #start()} is called.</p> * <p> * <p>This method can be used as the start of a chain. For example if the processor <code>A</code> must * process events before handler <code>B</code>:</p> * <pre><code>dw.handleEventsWith(A).then(B);</code></pre> * * @param processors the event processors that will process events. * @return a {@link EventHandlerGroup} that can be used to chain dependencies. */ public EventHandlerGroup<T> handleEventsWith(final EventProcessor... processors) { for (final EventProcessor processor : processors) { consumerRepository.add(processor); } return new EventHandlerGroup<T>(this, consumerRepository, Util.getSequencesFor(processors)); }
/** * Create a group of event processors to be used as a dependency. * * @param processors the event processors, previously set up with {@link #handleEventsWith(com.lmax.disruptor.EventProcessor...)}, * that will form the barrier for subsequent handlers or processors. * @return an {@link EventHandlerGroup} that can be used to setup a {@link SequenceBarrier} over the specified event processors. * @see #after(com.lmax.disruptor.EventHandler[]) */ public EventHandlerGroup<T> after(final EventProcessor... processors) { for (final EventProcessor processor : processors) { consumerRepository.add(processor); } return new EventHandlerGroup<T>(this, consumerRepository, Util.getSequencesFor(processors)); }
/** * Create a new event processor that gates on <code>barrierSequences</code>. * * @param barrierSequences the sequences to gate on * @return a new EventProcessor that gates on <code>barrierSequences</code> before processing events */ EventProcessor createEventProcessor(RingBuffer<T> ringBuffer, Sequence[] barrierSequences);