/**
 * Stores the given providers, barriers and handler, and allocates one
 * {@link Sequence} per provider, each initialised to {@code -1}.
 *
 * @param providers the data providers; must have the same length as {@code barriers}
 * @param barriers  the sequence barriers, one per provider (matched by index)
 * @param handler   the handler that will receive events
 * @throws IllegalArgumentException if {@code providers} and {@code barriers} differ in length
 */
public MultiBufferBatchEventProcessor(
    DataProvider<T>[] providers, SequenceBarrier[] barriers, EventHandler<T> handler) {
    if (providers.length != barriers.length) {
        // Report both lengths so a mismatch can be diagnosed from the stack trace alone.
        throw new IllegalArgumentException(
            "providers.length (" + providers.length + ") != barriers.length (" + barriers.length + ")");
    }

    this.providers = providers;
    this.barriers = barriers;
    this.handler = handler;

    this.sequences = new Sequence[providers.length];
    for (int i = 0; i < sequences.length; i++) {
        sequences[i] = new Sequence(-1);
    }
}
/**
 * Registers a custom processor factory as the first consumer, asserts inside the
 * factory that it is handed an empty barrier-sequence array, and then delegates to
 * {@code ensureTwoEventsProcessedAccordingToDependencies} with a latch of count 2.
 */
@Test
public void shouldMakeEntriesAvailableToFirstCustomProcessorsImmediately() throws Exception {
    final CountDownLatch processedLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> stubHandler = new EventHandlerStub<TestEvent>(processedLatch);

    final EventProcessorFactory<TestEvent> processorFactory =
        new EventProcessorFactory<TestEvent>() {
            @Override
            public EventProcessor createEventProcessor(
                final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences) {
                // First in the chain: nothing upstream to wait on.
                assertEquals("Should not have had any barrier sequences", 0, barrierSequences.length);
                return new BatchEventProcessor<TestEvent>(
                    disruptor.getRingBuffer(),
                    ringBuffer.newBarrier(barrierSequences),
                    stubHandler);
            }
        };

    disruptor.handleEventsWith(processorFactory);

    ensureTwoEventsProcessedAccordingToDependencies(processedLatch);
}
/**
 * Registers a custom processor factory after a delayed handler, asserts inside the
 * factory that exactly one barrier sequence is supplied, and then delegates to
 * {@code ensureTwoEventsProcessedAccordingToDependencies} with the latch and the
 * delayed handler.
 */
@Test
public void shouldHonourDependenciesForCustomProcessors() throws Exception {
    final CountDownLatch countDownLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> eventHandler = new EventHandlerStub<TestEvent>(countDownLatch);
    final DelayedEventHandler delayedEventHandler = createDelayedEventHandler();

    disruptor.handleEventsWith(delayedEventHandler).then(
        new EventProcessorFactory<TestEvent>() {
            @Override
            public EventProcessor createEventProcessor(
                final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences) {
                // assertEquals, not assertSame: assertSame would box both ints and compare
                // object identity, which only passes thanks to the Integer cache.
                assertEquals("Should have had a barrier sequence", 1, barrierSequences.length);
                return new BatchEventProcessor<TestEvent>(
                    disruptor.getRingBuffer(),
                    ringBuffer.newBarrier(barrierSequences),
                    eventHandler);
            }
        });

    ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler);
}
/**
 * Creates the processor and its backing ring buffer.
 *
 * @param name         forwarded to the superclass constructor
 * @param executor     forwarded to the superclass constructor
 * @param bufferSize   size passed to {@code RingBuffer.create}
 * @param waitStrategy wait strategy passed to {@code RingBuffer.create}
 * @param shared       {@code true} selects {@code ProducerType.MULTI},
 *                     {@code false} selects {@code ProducerType.SINGLE}
 * @param autoCancel   forwarded to the superclass constructor
 */
private RingBufferProcessor(String name,
                            ExecutorService executor,
                            int bufferSize,
                            WaitStrategy waitStrategy,
                            boolean shared,
                            boolean autoCancel) {
    super(name, executor, autoCancel);

    final ProducerType producerType = shared ? ProducerType.MULTI : ProducerType.SINGLE;

    // Each slot of the ring is populated with a fresh MutableSignal by this factory.
    final EventFactory<MutableSignal<E>> signalFactory = new EventFactory<MutableSignal<E>>() {
        @Override
        public MutableSignal<E> newInstance() {
            return new MutableSignal<E>();
        }
    };

    this.ringBuffer = RingBuffer.create(producerType, signalFactory, bufferSize, waitStrategy);
    this.recentSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    this.barrier = ringBuffer.newBarrier();
    //ringBuffer.addGatingSequences(recentSequence);
}
/**
 * Drains {@code processor.cancelledSequences}, and for each polled sequence reads the
 * ring-buffer slot immediately after it and routes that signal once to
 * {@code subscriber}.
 *
 * @param unbounded forwarded unchanged to {@code readNextEvent}
 * @return {@code true} if processing of a sequence was aborted by one of the caught
 *         exceptions (the sequence is put back on the queue first); {@code false} once
 *         the queue has been drained without such an abort
 */
private boolean replay(final boolean unbounded) {
    Sequence replayedSequence;
    MutableSignal<T> signal;
    while ((replayedSequence = processor.cancelledSequences.poll()) != null) {
        // The slot to replay is the one right after the recorded sequence value.
        signal = processor.ringBuffer.get(replayedSequence.get() + 1L);
        try {
            // A null value presumably means the slot has not been published yet, so
            // wait on the barrier for it -- TODO confirm against MutableSignal semantics.
            if (signal.value == null) {
                barrier.waitFor(replayedSequence.get() + 1L);
            }
            readNextEvent(signal, unbounded);
            RingBufferSubscriberUtils.routeOnce(signal, subscriber);
            // Replayed successfully: this sequence no longer needs to gate the ring buffer.
            processor.ringBuffer.removeGatingSequence(replayedSequence);
        } catch (TimeoutException | InterruptedException | AlertException | CancelException ce) {
            // NOTE(review): this removes the field `sequence`, not `replayedSequence`,
            // unlike the success path above -- confirm this asymmetry is intentional.
            // NOTE(review): InterruptedException is caught here without restoring the
            // thread's interrupt status.
            processor.ringBuffer.removeGatingSequence(sequence);
            // Put the sequence back so it can be picked up again later.
            processor.cancelledSequences.add(replayedSequence);
            return true;
        }
    }
    return false;
}
/**
 * Creates the queue's ring buffer, consumer sequence and barrier, and registers the
 * consumer sequence as a gating sequence.
 *
 * <p>For {@code ProducerType.SINGLE} the consumer-started flag is set immediately.
 * For any other producer type the buffer size is validated and {@code FLUSH_CACHE}
 * is published directly.
 *
 * @param queueName    name appended to {@code PREFIX} to form the queue name
 * @param producerType producer type passed to {@code RingBuffer.create}
 * @param bufferSize   ring buffer size; must be at least 2 unless the producer type is SINGLE
 * @param wait         wait strategy passed to {@code RingBuffer.create}
 * @throws RuntimeException if the producer type is not SINGLE and {@code bufferSize < 2},
 *                          or if the initial direct publish reports insufficient capacity
 */
public DisruptorQueueImpl(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
    this._queueName = PREFIX + queueName;
    _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
    _consumer = new Sequence();
    _barrier = _buffer.newBarrier();
    _buffer.addGatingSequences(_consumer);

    if (producerType == ProducerType.SINGLE) {
        consumerStartedFlag = true;
        return;
    }

    // make sure we flush the pending messages in cache first
    if (bufferSize < 2) {
        throw new RuntimeException("QueueSize must >= 2");
    }
    try {
        publishDirect(FLUSH_CACHE, true);
    } catch (InsufficientCapacityException e) {
        throw new RuntimeException("This code should be unreachable!", e);
    }
}
/**
 * Tells whether {@code requiredCapacity} more slots can be claimed after
 * {@code cursorValue} without passing the slowest gating sequence.
 *
 * <p>The cached minimum gating sequence is consulted first; the real minimum is only
 * recomputed (and the cache refreshed) when the cached value cannot answer the question.
 *
 * @param gatingSequences  sequences whose minimum bounds how far the producer may advance
 * @param requiredCapacity number of slots wanted
 * @param cursorValue      current cursor value
 * @return {@code true} if the capacity is available, {@code false} otherwise
 */
private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue) {
    final long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;
    final long cachedMinimum = gatingSequenceCache.get();

    // The cached value is conclusive when it lies in [wrapPoint, cursorValue].
    final boolean cacheIsConclusive = wrapPoint <= cachedMinimum && cachedMinimum <= cursorValue;
    if (cacheIsConclusive) {
        return true;
    }

    final long freshMinimum = Util.getMinimumSequence(gatingSequences, cursorValue);
    gatingSequenceCache.set(freshMinimum);
    return wrapPoint <= freshMinimum;
}
/** * Init method. * * @return */ public DisruptorQueue<ID, DATA> init() { /* single producer "seems" to offer better performance */ ringBuffer = RingBuffer.createSingleProducer(EVENT_FACTORY, ringSize); // ringBuffer = RingBuffer.createMultiProducer(EVENT_FACTORY, ringSize); if (!isEphemeralDisabled()) { int ephemeralBoundSize = Math.max(0, getEphemeralMaxSize()); ephemeralStorage = new ConcurrentHashMap<>( ephemeralBoundSize > 0 ? Math.min(ephemeralBoundSize, ringSize) : ringSize); } consumedSeq = new Sequence(); ringBuffer.addGatingSequences(consumedSeq); long cursor = ringBuffer.getCursor(); consumedSeq.set(cursor); knownPublishedSeq = cursor; return this; }
/**
 * Builds a disruptor-backed blocking queue.
 *
 * @param bufferSize
 *            requested ring buffer size; passed through {@code normalizeBufferSize}
 *            before the ring buffer is created
 * @param singleProducer
 *            {@code true} to create a single-producer ring buffer, {@code false}
 *            for a multi-producer one
 */
public SingleConsumerDisruptorQueue(int bufferSize, boolean singleProducer) {
    ringBuffer = singleProducer
        ? RingBuffer.createSingleProducer(new Factory<T>(), normalizeBufferSize(bufferSize))
        : RingBuffer.createMultiProducer(new Factory<T>(), normalizeBufferSize(bufferSize));

    consumedSeq = new Sequence();
    ringBuffer.addGatingSequences(consumedSeq);
    barrier = ringBuffer.newBarrier();

    // Start both the consumed and the known-published positions at the current cursor.
    final long startCursor = ringBuffer.getCursor();
    consumedSeq.set(startCursor);
    knownPublishedSeq = startCursor;
}
/**
 * Waits until {@code dependentSequence} has reached {@code sequence}.
 *
 * <p>First blocks on {@code processorNotifyCondition} (under {@code lock}) while the
 * cursor is behind the requested sequence, then polls the dependent sequence, parking
 * for one nanosecond between reads. The barrier's alert is checked on every iteration
 * of both loops.
 *
 * @param sequence          the sequence to wait for
 * @param cursorSequence    the ring buffer cursor
 * @param dependentSequence the sequence this waiter depends on
 * @param barrier           barrier whose alert state is checked while waiting
 * @return the dependent sequence value observed, which is at least {@code sequence}
 * @throws AlertException       propagated from {@code barrier.checkAlert()}
 * @throws InterruptedException if interrupted while awaiting the condition
 */
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
    throws AlertException, InterruptedException {
    // Unlocked pre-check avoids taking the lock when the cursor is already far enough.
    if (cursorSequence.get() < sequence) {
        lock.lock();
        try {
            while (cursorSequence.get() < sequence) {
                barrier.checkAlert();
                processorNotifyCondition.await();
            }
        } finally {
            lock.unlock();
        }
    }

    long available = dependentSequence.get();
    while (available < sequence) {
        barrier.checkAlert();
        LockSupport.parkNanos(1L);
        available = dependentSequence.get();
    }
    return available;
}
RingBufferConsumer(@Nonnull final RingBuffer<T> buffer, @Nonnull final Object[] attachments) { if (buffer == null) { throw new NullPointerException("buffer == null"); } if (attachments == null) { throw new NullPointerException("attachments == null"); } if (buffer.getBufferSize() != attachments.length) { throw new IllegalArgumentException("buffer.getBufferSize() != attachments.length"); } this.buffer = buffer; this.attachments = attachments; this.barrier = buffer.newBarrier(); this.sequence = new Sequence(); buffer.addGatingSequences(sequence); this.cursor = sequence.get(); this.available = sequence.get(); }
/**
 * Creates the queue's ring buffer, consumer sequence and barrier, registers the
 * consumer sequence as a gating sequence, and sets up batching.
 *
 * <p>When {@code isBatch} is true a {@code ThreadLocalBatch} is created and a
 * {@code DisruptorFlusher} is created and started with an interval of at least 1;
 * otherwise the batcher is set to {@code null}.
 *
 * @param queueName    name appended to {@code PREFIX} to form the queue name
 * @param producerType producer type passed to {@code RingBuffer.create}
 * @param bufferSize   ring buffer size
 * @param wait         wait strategy passed to {@code RingBuffer.create}
 * @param isBatch      whether batching is enabled
 * @param batchSize    stored as the input batch size
 * @param flushMs      flusher interval; values below 1 are raised to 1
 */
public DisruptorQueueImpl(String queueName,
                          ProducerType producerType,
                          int bufferSize,
                          WaitStrategy wait,
                          boolean isBatch,
                          int batchSize,
                          long flushMs) {
    _queueName = PREFIX + queueName;
    _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
    _consumer = new Sequence();
    _barrier = _buffer.newBarrier();
    _buffer.addGatingSequences(_consumer);

    _isBatch = isBatch;
    _cache = new ArrayList<>();
    _inputBatchSize = batchSize;

    if (!_isBatch) {
        _batcher = null;
    } else {
        _batcher = new ThreadLocalBatch();
        _flusher = new DisruptorFlusher(Math.max(flushMs, 1));
        _flusher.start();
    }
}
/**
 * <p>Starts the event processors and returns the fully configured ring buffer.</p>
 *
 * <p>The ring buffer is set up to prevent overwriting any entry that is yet to
 * be processed by the slowest event processor.</p>
 *
 * <p>This method must only be called once after all event processors have been added.</p>
 *
 * @return the configured ring buffer.
 */
public RingBuffer<T> start() {
    // Run the started-once check before touching the ring buffer, so that a rejected
    // repeat call cannot register the gating sequences a second time.
    checkOnlyStartedOnce();

    final Sequence[] gatingSequences = consumerRepository.getLastSequenceInChain(true);
    ringBuffer.addGatingSequences(gatingSequences);

    for (final ConsumerInfo consumerInfo : consumerRepository) {
        consumerInfo.start(executor);
    }

    return ringBuffer;
}
/**
 * Checks whether any end-of-chain consumer is still behind the ring buffer cursor.
 *
 * @return {@code true} if at least one sequence returned by
 *         {@code getLastSequenceInChain(false)} is below the cursor,
 *         {@code false} if all of them have caught up
 */
private boolean hasBacklog() {
    final long cursor = ringBuffer.getCursor();
    final Sequence[] lastInChain = consumerRepository.getLastSequenceInChain(false);

    for (int i = 0; i < lastInChain.length; i++) {
        if (lastInChain[i].get() < cursor) {
            return true;
        }
    }
    return false;
}
/**
 * Wraps each handler in a {@code BatchEventProcessor} that shares one barrier built
 * from {@code barrierSequences}, registers every processor with the consumer
 * repository, and returns a group over the new processors' sequences.
 *
 * <p>When at least one handler was supplied, the barrier sequences are un-marked as
 * end of chain.
 *
 * @param barrierSequences sequences the new processors must wait on
 * @param eventHandlers    handlers to wrap, one processor each
 * @return a group holding the sequences of the newly created processors
 */
EventHandlerGroup<T> createEventProcessors(
    final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers) {
    checkNotStarted();

    final int handlerCount = eventHandlers.length;
    final Sequence[] processorSequences = new Sequence[handlerCount];
    final SequenceBarrier sharedBarrier = ringBuffer.newBarrier(barrierSequences);

    int slot = 0;
    for (final EventHandler<? super T> handler : eventHandlers) {
        final BatchEventProcessor<T> processor =
            new BatchEventProcessor<T>(ringBuffer, sharedBarrier, handler);

        if (exceptionHandler != null) {
            processor.setExceptionHandler(exceptionHandler);
        }

        consumerRepository.add(processor, handler, sharedBarrier);
        processorSequences[slot++] = processor.getSequence();
    }

    if (handlerCount > 0) {
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }

    return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}
/**
 * Asks each factory to create an event processor for this ring buffer and the given
 * barrier sequences, then hands all created processors to {@code handleEventsWith}.
 *
 * @param barrierSequences   sequences passed to every factory
 * @param processorFactories factories to invoke, in order
 * @return the group returned by {@code handleEventsWith} for the created processors
 */
EventHandlerGroup<T> createEventProcessors(
    final Sequence[] barrierSequences, final EventProcessorFactory<T>[] processorFactories) {
    final EventProcessor[] created = new EventProcessor[processorFactories.length];

    int slot = 0;
    for (final EventProcessorFactory<T> factory : processorFactories) {
        created[slot++] = factory.createEventProcessor(ringBuffer, barrierSequences);
    }

    return handleEventsWith(created);
}
/**
 * Creates a {@code WorkerPool} over the given work handlers, guarded by a barrier built
 * from {@code barrierSequences}, and registers it with the consumer repository.
 *
 * @param barrierSequences sequences the pool must wait on
 * @param workHandlers     handlers run by the pool
 * @return a group over the pool's worker sequences
 */
EventHandlerGroup<T> createWorkerPool(
    final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers) {
    final SequenceBarrier poolBarrier = ringBuffer.newBarrier(barrierSequences);
    final WorkerPool<T> pool =
        new WorkerPool<T>(ringBuffer, poolBarrier, exceptionHandler, workHandlers);

    consumerRepository.add(pool, poolBarrier);

    final Sequence[] workerSequences = pool.getWorkerSequences();
    return new EventHandlerGroup<T>(this, consumerRepository, workerSequences);
}
/**
 * Creates a group bound to the given disruptor and consumer repository.
 *
 * @param disruptor          the owning disruptor
 * @param consumerRepository repository the group's consumers are registered in
 * @param sequences          the group's sequences; a copy of the array is stored so
 *                           later changes to the caller's array do not affect the group
 */
EventHandlerGroup(
    final Disruptor<T> disruptor,
    final ConsumerRepository<T> consumerRepository,
    final Sequence[] sequences) {
    this.disruptor = disruptor;
    this.consumerRepository = consumerRepository;
    // Defensive copy of the caller's array.
    this.sequences = sequences.clone();
}
/**
 * Create a new event handler group that combines the consumers in this group with
 * <tt>otherHandlerGroup</tt>.
 *
 * @param otherHandlerGroup the event handler group to combine.
 * @return a new EventHandlerGroup whose sequences are this group's sequences followed
 *         by those of <tt>otherHandlerGroup</tt>.
 */
public EventHandlerGroup<T> and(final EventHandlerGroup<T> otherHandlerGroup) {
    final Sequence[] own = this.sequences;
    final Sequence[] other = otherHandlerGroup.sequences;

    final Sequence[] combinedSequences = new Sequence[own.length + other.length];
    System.arraycopy(own, 0, combinedSequences, 0, own.length);
    System.arraycopy(other, 0, combinedSequences, own.length, other.length);

    return new EventHandlerGroup<T>(disruptor, consumerRepository, combinedSequences);
}
/**
 * Create a new event handler group that combines the handlers in this group with
 * <tt>processors</tt>. Each processor is also added to the consumer repository.
 *
 * @param processors the processors to combine.
 * @return a new EventHandlerGroup whose sequences are those of <tt>processors</tt>
 *         followed by this group's existing sequences.
 */
public EventHandlerGroup<T> and(final EventProcessor... processors) {
    final Sequence[] combinedSequences = new Sequence[sequences.length + processors.length];

    // The processors' sequences occupy the front of the combined array.
    int slot = 0;
    for (final EventProcessor processor : processors) {
        consumerRepository.add(processor);
        combinedSequences[slot++] = processor.getSequence();
    }

    // This group's own sequences follow.
    System.arraycopy(sequences, 0, combinedSequences, processors.length, sequences.length);

    return new EventHandlerGroup<T>(disruptor, consumerRepository, combinedSequences);
}
/**
 * Collects the {@link Sequence} of each passed {@link EventProcessor}, preserving order.
 *
 * @param processors for which to get the sequences
 * @return a new array holding one {@link Sequence} per processor
 */
public static Sequence[] getSequencesFor(final EventProcessor... processors) {
    final Sequence[] result = new Sequence[processors.length];

    int slot = 0;
    for (final EventProcessor processor : processors) {
        result[slot++] = processor.getSequence();
    }

    return result;
}
/**
 * Builds a fresh repository plus two mocked event processors, two handlers and two
 * mocked barriers. Each mocked processor is stubbed to return its own sequence and to
 * report that it is running.
 */
@Before
public void setUp() throws Exception {
    consumerRepository = new ConsumerRepository<TestEvent>();

    eventProcessor1 = mockery.mock(EventProcessor.class, "eventProcessor1");
    eventProcessor2 = mockery.mock(EventProcessor.class, "eventProcessor2");

    final Sequence firstSequence = new Sequence();
    final Sequence secondSequence = new Sequence();

    final Expectations processorStubs = new Expectations() {
        {
            allowing(eventProcessor1).getSequence();
            will(returnValue(firstSequence));

            allowing(eventProcessor1).isRunning();
            will(returnValue(true));

            allowing(eventProcessor2).getSequence();
            will(returnValue(secondSequence));

            allowing(eventProcessor2).isRunning();
            will(returnValue(true));
        }
    };
    mockery.checking(processorStubs);

    handler1 = new SleepingEventHandler();
    handler2 = new SleepingEventHandler();

    barrier1 = mockery.mock(SequenceBarrier.class, "barrier1");
    barrier2 = mockery.mock(SequenceBarrier.class, "barrier2");
}
/**
 * Adds two processors, un-marks the second as end of chain, and expects the repository
 * to report only the first processor's sequence as last in chain.
 */
@Test
public void shouldGetLastEventProcessorsInChain() throws Exception {
    consumerRepository.add(eventProcessor1, handler1, barrier1);
    consumerRepository.add(eventProcessor2, handler2, barrier2);

    consumerRepository.unMarkEventProcessorsAsEndOfChain(eventProcessor2.getSequence());

    final Sequence[] endOfChain = consumerRepository.getLastSequenceInChain(true);

    assertThat(endOfChain.length, equalTo(1));
    assertThat(endOfChain[0], sameInstance(eventProcessor1.getSequence()));
}
/**
 * Mocks three sequences reporting 7, 3 and 12 and expects
 * {@code Util.getMinimumSequence} to return 3.
 */
@Test
public void shouldReturnMinimumSequence() {
    // Sequence is a class, so the class imposteriser must be set before mocking it.
    context.setImposteriser(ClassImposteriser.INSTANCE);

    final Sequence[] sequences = new Sequence[3];
    sequences[0] = context.mock(Sequence.class, "s0");
    sequences[1] = context.mock(Sequence.class, "s1");
    sequences[2] = context.mock(Sequence.class, "s2");

    final Expectations singleReads = new Expectations() {
        {
            oneOf(sequences[0]).get();
            will(returnValue(Long.valueOf(7L)));

            oneOf(sequences[1]).get();
            will(returnValue(Long.valueOf(3L)));

            oneOf(sequences[2]).get();
            will(returnValue(Long.valueOf(12L)));
        }
    };
    context.checking(singleReads);

    Assert.assertEquals(3L, Util.getMinimumSequence(sequences));
}
/**
 * Expects {@code Util.getMinimumSequence} to return {@code Long.MAX_VALUE} for an
 * empty sequence array.
 */
@Test
public void shouldReturnLongMaxWhenNoEventProcessors() {
    final Sequence[] noSequences = new Sequence[0];
    final long minimum = Util.getMinimumSequence(noSequences);

    Assert.assertEquals(Long.MAX_VALUE, minimum);
}
public long waitFor( long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier ) throws AlertException, InterruptedException, TimeoutException { long availableSequence; if ((availableSequence = cursor.get()) < sequence) { flush(); synchronized (lock) { ++numWaiters; while ((availableSequence = cursor.get()) < sequence) { if (state == State.STOPPED) { disruptor.halt(); throw AlertException.INSTANCE; } barrier.checkAlert(); //*/ lock.wait(); /*/ Thread.sleep(1); //*/ } --numWaiters; } } while ((availableSequence = dependentSequence.get()) < sequence) { barrier.checkAlert(); } return availableSequence; }
/**
 * Creates a subscription that keeps references to the given collaborators.
 *
 * @param pendingRequest sequence stored as this subscription's pending request
 * @param subscriber     the subscriber this subscription belongs to
 * @param eventProcessor the processor associated with this subscription
 */
public RingBufferSubscription(Sequence pendingRequest,
                              Subscriber<? super E> subscriber,
                              BatchSignalProcessor<E> eventProcessor) {
    this.pendingRequest = pendingRequest;
    this.subscriber = subscriber;
    this.eventProcessor = eventProcessor;
}
/**
 * Construct a {@link com.lmax.disruptor.EventProcessor} that will automatically track
 * the progress by updating its sequence.
 *
 * @param processor      the owning ring buffer processor
 * @param pendingRequest sequence stored as this processor's pending request
 * @param subscriber     the subscriber signals are delivered to
 */
public BatchSignalProcessor(RingBufferProcessor<T> processor,
                            Sequence pendingRequest,
                            Subscriber<? super T> subscriber) {
    this.subscriber = subscriber;
    this.pendingRequest = pendingRequest;
    this.processor = processor;
}
/**
 * Polls {@code dependentSequence} until it reaches {@code sequence}, checking the
 * barrier's alert and parking for {@code parkFor} nanoseconds between reads.
 *
 * @param sequence          the sequence to wait for
 * @param cursor            not used by this implementation
 * @param dependentSequence the sequence being polled
 * @param barrier           barrier whose alert state is checked on each iteration
 * @return the dependent sequence value observed, which is at least {@code sequence}
 * @throws AlertException propagated from {@code barrier.checkAlert()}
 */
@Override
public long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
    throws AlertException, InterruptedException, TimeoutException {
    long observed = dependentSequence.get();
    while (observed < sequence) {
        barrier.checkAlert();
        LockSupport.parkNanos(parkFor);
        observed = dependentSequence.get();
    }
    return observed;
}
/**
 * Busy-spins on {@code dependentSequence} until it reaches {@code sequence}, issuing a
 * spin-loop hint and checking the barrier's alert on every iteration.
 *
 * @param sequence          the sequence to wait for
 * @param cursor            not used by this implementation
 * @param dependentSequence the sequence being polled
 * @param barrier           barrier whose alert state is checked on each iteration
 * @return the dependent sequence value observed, which is at least {@code sequence}
 * @throws AlertException propagated from {@code barrier.checkAlert()}
 */
@Override
public long waitFor(final long sequence,
                    Sequence cursor,
                    final Sequence dependentSequence,
                    final SequenceBarrier barrier)
    throws AlertException, InterruptedException {
    long observed = dependentSequence.get();
    while (observed < sequence) {
        SpinHint.spinLoopHint();
        barrier.checkAlert();
        observed = dependentSequence.get();
    }
    return observed;
}
/**
 * Demo driver: creates a single-producer ring buffer of size 4 with one gating
 * consumer sequence, then performs 100 put/take round trips, printing the result of
 * each call.
 *
 * @param args ignored
 */
public static void main(String[] args) throws Exception {
    ringBuffer = RingBuffer.createSingleProducer(LongEvent.FACTORY, 4);
    consumedSeq = new Sequence();
    ringBuffer.addGatingSequences(consumedSeq);

    // Start both the consumed and the known-published positions at the current cursor.
    final long startCursor = ringBuffer.getCursor();
    consumedSeq.set(startCursor);
    knownPublishedSeq = startCursor;

    int i = 0;
    while (i < 100) {
        boolean accepted = put(ringBuffer, i);
        System.out.println(i + ": " + accepted);

        Long taken = take();
        System.out.println("Took: " + taken);

        i++;
    }
}
public DisruptorQueue(ClaimStrategy claim, WaitStrategy wait) { _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait); _consumer = new Sequence(); _barrier = _buffer.newBarrier(); _buffer.setGatingSequences(_consumer); if(claim instanceof SingleThreadedClaimStrategy) { consumerStartedFlag = true; } }