public MultiBufferBatchEventProcessor( DataProvider<T>[] providers, SequenceBarrier[] barriers, EventHandler<T> handler) { if (providers.length != barriers.length) { throw new IllegalArgumentException(); } this.providers = providers; this.barriers = barriers; this.handler = handler; this.sequences = new Sequence[providers.length]; for (int i = 0; i < sequences.length; i++) { sequences[i] = new Sequence(-1); } }
/**
 * A hand-built {@link BatchEventProcessor} must be able to depend on a handler
 * that was registered through the disruptor DSL.
 */
@Test
public void shouldSupportHandlersAsDependenciesToCustomProcessors() throws Exception
{
    final DelayedEventHandler upstreamHandler = createDelayedEventHandler();
    disruptor.handleEventsWith(upstreamHandler);

    final RingBuffer<TestEvent> buffer = disruptor.getRingBuffer();
    final CountDownLatch processedLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> downstreamHandler = new EventHandlerStub<TestEvent>(processedLatch);

    // The custom processor waits on a barrier derived from the upstream handler.
    final SequenceBarrier upstreamBarrier = disruptor.after(upstreamHandler).asSequenceBarrier();
    final BatchEventProcessor<TestEvent> customProcessor =
        new BatchEventProcessor<TestEvent>(buffer, upstreamBarrier, downstreamHandler);
    disruptor.handleEventsWith(customProcessor);

    ensureTwoEventsProcessedAccordingToDependencies(processedLatch, upstreamHandler);
}
/**
 * A DSL-registered handler must be able to depend on both another DSL handler
 * and a hand-built {@link BatchEventProcessor} at the same time.
 */
@Test
public void shouldSupportCustomProcessorsAndHandlersAsDependencies() throws Exception
{
    final DelayedEventHandler firstDelayedHandler = createDelayedEventHandler();
    final DelayedEventHandler secondDelayedHandler = createDelayedEventHandler();
    disruptor.handleEventsWith(firstDelayedHandler);

    final RingBuffer<TestEvent> buffer = disruptor.getRingBuffer();
    final CountDownLatch processedLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> finalHandler = new EventHandlerStub<TestEvent>(processedLatch);

    // The custom processor runs the second delayed handler behind the first one.
    final SequenceBarrier firstHandlerBarrier = disruptor.after(firstDelayedHandler).asSequenceBarrier();
    final BatchEventProcessor<TestEvent> customProcessor =
        new BatchEventProcessor<TestEvent>(buffer, firstHandlerBarrier, secondDelayedHandler);

    disruptor.after(firstDelayedHandler)
             .and(customProcessor)
             .handleEventsWith(finalHandler);

    ensureTwoEventsProcessedAccordingToDependencies(processedLatch, firstDelayedHandler, secondDelayedHandler);
}
/** * Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when * the {@link com.lmax.disruptor.EventHandler#onEvent(Object, long, boolean)} method returns. * * @param ringBuffer to which events are published. * @param sequenceBarrier on which it is waiting. * @param eventHandler is the delegate to which events are dispatched. * @param turn is the value of, sequence % groupCount this batch processor process events. Turn must be * less than groupCount * @param groupCount total number of concurrent batch processors for the event type * @param batchSize size limit of total content size to batch. This is a loose limit */ public ConcurrentContentReadTaskBatchProcessor(final RingBuffer<DeliveryEventData> ringBuffer, final SequenceBarrier sequenceBarrier, final ContentCacheCreator eventHandler, long turn, int groupCount, int batchSize) { if (turn >= groupCount) { throw new IllegalArgumentException("Turn should be less than groupCount"); } this.ringBuffer = ringBuffer; this.sequenceBarrier = sequenceBarrier; this.eventHandler = eventHandler; this.turn = turn; this.groupCount = groupCount; this.batchSize = batchSize; exceptionHandler = new DeliveryExceptionHandler(); running = new AtomicBoolean(false); if (eventHandler instanceof SequenceReportingEventHandler) { ((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence); } }
/**
 * Waits until {@code dependentSequence} has reached {@code sequence}.
 * <p>
 * First blocks on the processor-notify condition while the cursor is behind the requested
 * sequence, then polls the dependent sequence, parking for one nanosecond between polls.
 * The barrier's alert status is checked on every iteration of both loops.
 *
 * @param sequence          the sequence to wait for
 * @param cursorSequence    the ring buffer cursor
 * @param dependentSequence the sequence this waiter depends on
 * @param barrier           the barrier whose alert status is checked while waiting
 * @return the value of {@code dependentSequence} once it is at least {@code sequence}
 * @throws AlertException       if the barrier reports an alert
 * @throws InterruptedException if the thread is interrupted while awaiting the condition
 */
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
    throws AlertException, InterruptedException
{
    // Phase 1: block until the publisher has moved the cursor far enough.
    if (cursorSequence.get() < sequence)
    {
        this.lock.lock();
        try
        {
            for (;;)
            {
                if (cursorSequence.get() >= sequence)
                {
                    break;
                }
                barrier.checkAlert();
                this.processorNotifyCondition.await();
            }
        }
        finally
        {
            this.lock.unlock();
        }
    }

    // Phase 2: poll until the upstream consumers have caught up as well.
    long availableSequence = dependentSequence.get();
    while (availableSequence < sequence)
    {
        barrier.checkAlert();
        LockSupport.parkNanos(1L);
        availableSequence = dependentSequence.get();
    }
    return availableSequence;
}
public void init(Properties configProps) { // subscriptions = new SubscriptionsStore(); //Modified by WSO2 in-order to extend the capability of the existing subscriptions store //to be more suitable for the distribution architecture of Andes subscriptions = new MQTTSubscriptionStore(); ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("Disruptor MQTT Simple Messaging Thread %d").build(); ExecutorService executor = Executors.newCachedThreadPool(namedThreadFactory); Integer ringBufferSize = AndesConfigurationManager.readValue( AndesConfiguration.TRANSPORTS_MQTT_INBOUND_BUFFER_SIZE); disruptor = new Disruptor<ValueEvent>( ValueEvent.EVENT_FACTORY, ringBufferSize, executor); //Added by WSO2, we do not want to ignore the exception here disruptor.handleExceptionsWith(new MqttLogExceptionHandler()); SequenceBarrier barrier = disruptor.getRingBuffer().newBarrier(); BatchEventProcessor<ValueEvent> eventProcessor = new BatchEventProcessor<ValueEvent>( disruptor.getRingBuffer(), barrier, this); //Added by WSO2, we need to make sure the exceptions aren't ignored eventProcessor.setExceptionHandler(new MqttLogExceptionHandler()); disruptor.handleEventsWith(eventProcessor); m_ringBuffer = disruptor.start(); disruptorPublish(new InitEvent(configProps)); }
/** * @param subscriptions the subscription store where are stored all the existing * clients subscriptions. * @param storageService the persistent store to use for save/load of messages * for QoS1 and QoS2 handling. * @param authenticator the authenticator used in connect messages */ void init(SubscriptionsStore subscriptions, IStorageService storageService, IAuthenticator authenticator) { //m_clientIDs = clientIDs; this.subscriptions = subscriptions; m_authenticator = authenticator; m_storageService = storageService; //init the output ringbuffer m_executor = Executors.newFixedThreadPool(1); m_ringBuffer = new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY, 1024 * 32); SequenceBarrier barrier = m_ringBuffer.newBarrier(); m_eventProcessor = new BatchEventProcessor<ValueEvent>(m_ringBuffer, barrier, this); //TODO in a presentation is said to don't do the followinf line!! m_ringBuffer.setGatingSequences(m_eventProcessor.getSequence()); m_executor.submit(m_eventProcessor); }
/**
 * Wraps each handler in a {@link BatchEventProcessor} that waits on a barrier built from
 * {@code barrierSequences}, registers it with the consumer repository, and returns a group
 * covering the new processors' sequences.
 *
 * @param barrierSequences sequences the new processors must wait behind
 * @param eventHandlers    handlers to create processors for
 * @return a group over the sequences of the newly created processors
 */
EventHandlerGroup<T> createEventProcessors(
    final Sequence[] barrierSequences,
    final EventHandler<? super T>[] eventHandlers)
{
    checkNotStarted();

    final int handlerCount = eventHandlers.length;
    final Sequence[] processorSequences = new Sequence[handlerCount];
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    int slot = 0;
    for (final EventHandler<? super T> eventHandler : eventHandlers)
    {
        final BatchEventProcessor<T> processor =
            new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);

        if (exceptionHandler != null)
        {
            processor.setExceptionHandler(exceptionHandler);
        }

        consumerRepository.add(processor, eventHandler, barrier);
        processorSequences[slot++] = processor.getSequence();
    }

    // Only when something was added are the barrier sequences un-marked as end of chain.
    if (handlerCount > 0)
    {
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }

    return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}
/**
 * Creates a {@link WorkerPool} for the given work handlers behind a barrier built from
 * {@code barrierSequences}, and registers the pool with the consumer repository.
 *
 * @param barrierSequences sequences the pool must wait behind
 * @param workHandlers     handlers that share the work
 * @return a group over the pool's worker sequences
 */
EventHandlerGroup<T> createWorkerPool(
    final Sequence[] barrierSequences,
    final WorkHandler<? super T>[] workHandlers)
{
    final SequenceBarrier poolBarrier = ringBuffer.newBarrier(barrierSequences);
    final WorkerPool<T> pool =
        new WorkerPool<T>(ringBuffer, poolBarrier, exceptionHandler, workHandlers);

    consumerRepository.add(pool, poolBarrier);

    final Sequence[] workerSequences = pool.getWorkerSequences();
    return new EventHandlerGroup<T>(this, consumerRepository, workerSequences);
}
/**
 * Bundles an event processor with the handler and the barrier associated with it.
 *
 * @param eventprocessor the event processor
 * @param handler        the handler associated with the processor
 * @param barrier        the barrier associated with the processor
 */
EventProcessorInfo(
    final EventProcessor eventprocessor,
    final EventHandler<? super T> handler,
    final SequenceBarrier barrier)
{
    this.barrier = barrier;
    this.handler = handler;
    this.eventprocessor = eventprocessor;
}
/**
 * Builds a fresh repository plus two mocked processors (each reporting as running and
 * exposing its own sequence), two sleeping handlers and two mocked barriers.
 */
@Before
public void setUp() throws Exception
{
    consumerRepository = new ConsumerRepository<TestEvent>();

    eventProcessor1 = mockery.mock(EventProcessor.class, "eventProcessor1");
    eventProcessor2 = mockery.mock(EventProcessor.class, "eventProcessor2");

    final Sequence firstProcessorSequence = new Sequence();
    final Sequence secondProcessorSequence = new Sequence();
    mockery.checking(
        new Expectations()
        {
            {
                allowing(eventProcessor1).getSequence();
                will(returnValue(firstProcessorSequence));

                allowing(eventProcessor1).isRunning();
                will(returnValue(true));

                allowing(eventProcessor2).getSequence();
                will(returnValue(secondProcessorSequence));

                allowing(eventProcessor2).isRunning();
                will(returnValue(true));
            }
        });

    handler1 = new SleepingEventHandler();
    handler2 = new SleepingEventHandler();

    barrier1 = mockery.mock(SequenceBarrier.class, "barrier1");
    barrier2 = mockery.mock(SequenceBarrier.class, "barrier2");
}
/**
 * Captures everything the waiter needs; no work is performed at construction time.
 *
 * @param cyclicBarrier     the cyclic barrier stored for later use by this waiter
 * @param sequenceBarrier   the sequence barrier stored for later use by this waiter
 * @param ringBuffer        the ring buffer of {@code StubEvent}s
 * @param initialSequence   the initial sequence value
 * @param toWaitForSequence the sequence this waiter is to wait for
 */
public TestWaiter(
    final CyclicBarrier cyclicBarrier,
    final SequenceBarrier sequenceBarrier,
    final RingBuffer<StubEvent> ringBuffer,
    final long initialSequence,
    final long toWaitForSequence)
{
    this.cyclicBarrier = cyclicBarrier;
    this.sequenceBarrier = sequenceBarrier;
    this.ringBuffer = ringBuffer;
    this.initialSequence = initialSequence;
    this.toWaitForSequence = toWaitForSequence;
}
public long waitFor( long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier ) throws AlertException, InterruptedException, TimeoutException { long availableSequence; if ((availableSequence = cursor.get()) < sequence) { flush(); synchronized (lock) { ++numWaiters; while ((availableSequence = cursor.get()) < sequence) { if (state == State.STOPPED) { disruptor.halt(); throw AlertException.INSTANCE; } barrier.checkAlert(); //*/ lock.wait(); /*/ Thread.sleep(1); //*/ } --numWaiters; } } while ((availableSequence = dependentSequence.get()) < sequence) { barrier.checkAlert(); } return availableSequence; }
/**
 * Polls {@code dependentSequence} until it reaches {@code sequence}, checking the barrier
 * for alerts and parking for {@code parkFor} nanoseconds between polls.
 * The {@code cursor} argument is not consulted.
 *
 * @return the value of {@code dependentSequence} once it is at least {@code sequence}
 * @throws AlertException if the barrier reports an alert
 */
@Override
public long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
    throws AlertException, InterruptedException, TimeoutException
{
    for (;;)
    {
        final long availableSequence = dependentSequence.get();
        if (availableSequence >= sequence)
        {
            return availableSequence;
        }

        barrier.checkAlert();
        LockSupport.parkNanos(parkFor);
    }
}
/**
 * Creates a consumer that hands inbound messages to a delegate and reports errors
 * to an exception handler.
 *
 * @param ring             source byte ring containing inbound messages
 * @param sequenceBarrier  barrier on which this consumer waits
 * @param delegate         the delegate to which messages are dispatched
 * @param exceptionHandler called back when an error occurs
 */
public AbstractByteRingConsumerEx(ByteRing ring,
                                  SequenceBarrier sequenceBarrier,
                                  RingBufferBlockProcessor delegate,
                                  ExceptionHandler exceptionHandler) {
    super(ring, sequenceBarrier);
    this.exceptionHandler = exceptionHandler;
    this.delegate = delegate;
}
/**
 * Busy-spins until {@code dependentSequence} reaches {@code sequence}, issuing a spin-loop
 * hint and checking the barrier for alerts on every iteration.
 * The {@code cursor} argument is not consulted.
 *
 * @return the value of {@code dependentSequence} once it is at least {@code sequence}
 * @throws AlertException if the barrier reports an alert
 */
@Override
public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence,
                    final SequenceBarrier barrier)
    throws AlertException, InterruptedException
{
    for (;;)
    {
        final long availableSequence = dependentSequence.get();
        if (availableSequence >= sequence)
        {
            return availableSequence;
        }

        SpinHint.spinLoopHint();
        barrier.checkAlert();
    }
}
public static void main(String[] args) { final List<ValueEventHandler1PMC> handlers = new ArrayList<>(NUMBER_CONSUMERS); RingBuffer<ValueEvent> ringBuffer = RingBuffer.createSingleProducer( ValueEvent.EVENT_FACTORY, RING_SIZE, new SleepingWaitStrategy()); start = System.nanoTime(); //Create consumers for(int i = 0; i < NUMBER_CONSUMERS; i++) { ValueEventHandler1PMC handler = new ValueEventHandler1PMC(start, handlers); handlers.add(handler); SequenceBarrier barrier = ringBuffer.newBarrier(); BatchEventProcessor<ValueEvent> eventProcessor = new BatchEventProcessor<ValueEvent>( ringBuffer, barrier, handler); ringBuffer.addGatingSequences(eventProcessor.getSequence()); // Each EventProcessor can run on a separate thread EXECUTOR.submit(eventProcessor); } for(int i = 0; i < SAMPLES_SIZE; i++) { // Publishers claim events in sequence long sequence = ringBuffer.next(); ValueEvent event = ringBuffer.get(sequence); event.setValue(i); // this could be more complex with multiple fields // make the event available to EventProcessors ringBuffer.publish(sequence); } }
/**
 * Initializes this events channel on top of a multi-producer disruptor ring buffer served
 * by a {@link WorkerPool}.
 * <p>
 * The ring buffer size is taken from {@code config.getBlockQueueMaxNumber()}. A value that is
 * not a power of two is rounded down to the nearest power of two (a warning is logged).
 * Non-positive values are rejected. One work handler is created per configured consumer
 * ({@code config.getEventConsumerNumber()}), and the worker executor gets one thread per
 * handler so that every worker can actually run.
 *
 * @param config the channel configuration
 * @return always {@code true}
 * @throws GridException if the configured ring buffer size is not positive
 */
@Override
public boolean initialize(EventsChannelConfig config) {
    super.initialize(config);
    log.info("Initialize disruptor events channel " + config.getName() + " with " + config);

    int ringBufferSize = config.getBlockQueueMaxNumber();
    int threadSize = config.getEventConsumerNumber();

    // Validate before rounding: the former log-based rounding mapped negative sizes to 1,
    // so they slipped past the validity check.
    if (ringBufferSize <= 0)
        throw new GridException("Invalid disruptor ringbuffur size:" + ringBufferSize);

    // Largest power of two that does not exceed the configured size.
    int bufferSize = Integer.highestOneBit(ringBufferSize);
    if (bufferSize != ringBufferSize) {
        log.warn("Change disruptor events channel " + config.getName() + " buffer size from " + ringBufferSize
                + " to " + bufferSize);
    }

    EventFactory<GridEvent> eventFactory = new DisruptorEventFactory();
    threadPool = Executors.newFixedThreadPool(threadSize);
    ringBuffer = RingBuffer.createMultiProducer(eventFactory, bufferSize, new BlockingWaitStrategy());
    SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

    // Each worker occupies an executor thread for as long as it runs, so the pool must offer
    // one thread per handler; a fixed size of 10 left any additional workers queued forever.
    ExecutorService executor = Executors.newFixedThreadPool(threadSize);

    @SuppressWarnings("unchecked")
    WorkHandler<GridEvent>[] workHandlers = new WorkHandler[threadSize];
    for (int i = 0; i < threadSize; i++) {
        workHandlers[i] = new DisruptorEventsWorkHandler(getName());
    }

    workerPool = new WorkerPool<GridEvent>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(),
            workHandlers);
    workerPool.start(executor);
    return true;
}
/**
 * {@inheritDoc}
 * <p>
 * This implementation does not wait at all: it returns the cursor's current value
 * immediately, ignoring the requested sequence, the dependent sequence and the barrier.
 */
@Override
public long waitFor(final long sequence,
                    final Sequence cursor,
                    final Sequence dependentSequence,
                    final SequenceBarrier barrier)
    throws AlertException, InterruptedException, TimeoutException
{
    final long currentCursor = cursor.get();
    return currentCursor;
}
/**
 * Builds a multi-producer ring buffer of {@code RContainer}s and starts {@code threadCount}
 * work processors on daemon threads. All processors share one barrier and one work sequence.
 *
 * @param threadCount  number of work processors (and executor threads) to start
 * @param bufferSize   ring buffer size
 * @param waitStrategy wait strategy used by the ring buffer
 */
public DisruptorExecutor(int threadCount, int bufferSize, WaitStrategy waitStrategy) {
    final EventFactory<RContainer> containerFactory = new EventFactory<RContainer>() {
        @Override
        public RContainer newInstance() {
            return new RContainer();
        }
    };
    ringBuffer = RingBuffer.createMultiProducer(containerFactory, bufferSize, waitStrategy);

    final SequenceBarrier sharedBarrier = ringBuffer.newBarrier();
    final Sequence sharedWorkSequence = new Sequence(-1);

    workProcessors = new WorkProcessor[threadCount];
    for (int slot = 0; slot < threadCount; slot++) {
        workProcessors[slot] = new WorkProcessor<RContainer>(
                ringBuffer, sharedBarrier, handler, new IgnoreExceptionHandler(), sharedWorkSequence);
    }

    // Daemon threads, so the running processors do not keep the JVM alive.
    final ThreadFactory daemonThreadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            final Thread worker = new Thread(r);
            worker.setDaemon(true);
            return worker;
        }
    };
    workExec = Executors.newFixedThreadPool(workProcessors.length, daemonThreadFactory);

    for (WorkProcessor<?> processor : workProcessors) {
        workExec.execute(processor);
    }
}
public void init(Properties configProps) { subscriptions = new SubscriptionsStore(); m_executor = Executors.newFixedThreadPool(1); m_ringBuffer = new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY, 1024 * 32); SequenceBarrier barrier = m_ringBuffer.newBarrier(); m_eventProcessor = new BatchEventProcessor<ValueEvent>(m_ringBuffer, barrier, this); //TODO in a presentation is said to don't do the followinf line!! m_ringBuffer.setGatingSequences(m_eventProcessor.getSequence()); m_executor.submit(m_eventProcessor); disruptorPublish(new InitEvent(configProps)); }
public void init() { m_executor = Executors.newFixedThreadPool(1); m_ringBuffer = new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY, 1024 * 32); SequenceBarrier barrier = m_ringBuffer.newBarrier(); m_eventProcessor = new BatchEventProcessor<ValueEvent>(m_ringBuffer, barrier, this); //TODO in a presentation is said to don't do the followinf line!! m_ringBuffer.setGatingSequences(m_eventProcessor.getSequence()); m_executor.submit(m_eventProcessor); disruptorPublish(new InitEvent()); }
/**
 * @return the sequence barrier held by this object
 */
@Override
public SequenceBarrier getBarrier()
{
    final SequenceBarrier current = barrier;
    return current;
}
/**
 * Creates a new barrier by delegating to the underlying sequencer.
 *
 * @return the barrier produced by {@code sequencer.newBarrier()}
 */
public SequenceBarrier newBarrier()
{
    final SequenceBarrier created = sequencer.newBarrier();
    return created;
}
public static <T> boolean waitRequestOrTerminalEvent( Sequence pendingRequest, RingBuffer<MutableSignal<T>> ringBuffer, SequenceBarrier barrier, Subscriber<? super T> subscriber, AtomicBoolean isRunning ) { final long waitedSequence = ringBuffer.getCursor() + 1L; try { MutableSignal<T> event = null; while (pendingRequest.get() < 0l) { //pause until first request if (event == null) { barrier.waitFor(waitedSequence); event = ringBuffer.get(waitedSequence); if (event.type == MutableSignal.Type.COMPLETE) { try { subscriber.onComplete(); return false; } catch (Throwable t) { Exceptions.throwIfFatal(t); subscriber.onError(t); return false; } } else if (event.type == MutableSignal.Type.ERROR) { subscriber.onError(event.error); return false; } } else { barrier.checkAlert(); } LockSupport.parkNanos(1l); } } catch (TimeoutException te) { //ignore } catch (AlertException ae) { if (!isRunning.get()) { return false; } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } return true; }
/**
 * Forwards the wait, unchanged, to the currently selected strategy.
 *
 * @return whatever {@code currentStrategy.waitFor(...)} returns
 */
@Override
public long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
    throws AlertException, InterruptedException, TimeoutException
{
    final long availableSequence = currentStrategy.waitFor(sequence, cursor, dependentSequence, barrier);
    return availableSequence;
}
public static void main(String[] args) { RingBuffer<ValueEvent> ringBuffer = RingBuffer.createSingleProducer( ValueEvent.EVENT_FACTORY, RING_SIZE, new SleepingWaitStrategy()); start = System.nanoTime(); //Create first consumer ValueEventHandler1PMCSequenceFirst handler = new ValueEventHandler1PMCSequenceFirst(start); SequenceBarrier barrier; barrier = ringBuffer.newBarrier(); BatchEventProcessor<ValueEvent> firstEventProcessor = new BatchEventProcessor<ValueEvent>( ringBuffer, barrier, handler); // ringBuffer.addGatingSequences(firstEventProcessor.getSequence()); // Each EventProcessor can run on a separate thread EXECUTOR.submit(firstEventProcessor); //Create second consumer ValueEventHandler1PMCSequenceSecond handler2 = new ValueEventHandler1PMCSequenceSecond(start); SequenceBarrier barrier2 = ringBuffer.newBarrier(firstEventProcessor.getSequence()); BatchEventProcessor<ValueEvent> secondEventProcessor = new BatchEventProcessor<ValueEvent>( ringBuffer, barrier2, handler2); ringBuffer.addGatingSequences(secondEventProcessor.getSequence()); // Each EventProcessor can run on a separate thread EXECUTOR.submit(secondEventProcessor); for(int i = 0; i < SAMPLES_SIZE; i++) { // Publishers claim events in sequence long sequence = ringBuffer.next(); ValueEvent event = ringBuffer.get(sequence); event.setValue(i); // this could be more complex with multiple fields // make the event available to EventProcessors ringBuffer.publish(sequence); } }
/**
 * Reads the delivery tuning parameters from the Andes configuration, wires the delivery
 * disruptor as a four-stage pipeline (content read, decompression, delivery, cleanup),
 * starts it, and registers a gauge for the outbound ring.
 */
public DisruptorBasedFlusher() {
    Integer ringBufferSize = AndesConfigurationManager.readValue(
            AndesConfiguration.PERFORMANCE_TUNING_DELIVERY_RING_BUFFER_SIZE);
    Integer parallelContentReaders = AndesConfigurationManager.readValue(
            AndesConfiguration.PERFORMANCE_TUNING_DELIVERY_PARALLEL_CONTENT_READERS);
    Integer parallelDecompressionHandlers = AndesConfigurationManager.readValue(
            AndesConfiguration.PERFORMANCE_TUNING_DELIVERY_PARALLEL_DECOMPRESSION_HANDLERS);
    Integer parallelDeliveryHandlers = AndesConfigurationManager.readValue(
            AndesConfiguration.PERFORMANCE_TUNING_DELIVERY_PARALLEL_DELIVERY_HANDLERS);
    Integer contentSizeToBatch = AndesConfigurationManager.readValue(
            AndesConfiguration.PERFORMANCE_TUNING_DELIVERY_CONTENT_READ_BATCH_SIZE);
    int maxContentChunkSize = AndesConfigurationManager.readValue(
            AndesConfiguration.PERFORMANCE_TUNING_MAX_CONTENT_CHUNK_SIZE);

    ThreadFactory flusherThreadFactory =
            new ThreadFactoryBuilder().setNameFormat("DisruptorBasedFlusher-%d").build();
    Executor flusherExecutor = Executors.newCachedThreadPool(flusherThreadFactory);

    disruptor = new Disruptor<>(new DeliveryEventData.DeliveryEventDataFactory(),
                                ringBufferSize,
                                flusherExecutor,
                                ProducerType.MULTI,
                                new SleepingBlockingWaitStrategy());
    disruptor.handleExceptionsWith(new DeliveryExceptionHandler());

    // Content readers are the first stage of the pipeline, so their barrier is taken
    // directly from the ring buffer.
    SequenceBarrier contentReadBarrier = disruptor.getRingBuffer().newBarrier();

    // Stage 1: content readers
    ConcurrentContentReadTaskBatchProcessor[] contentReaders =
            new ConcurrentContentReadTaskBatchProcessor[parallelContentReaders];
    for (int turn = 0; turn < parallelContentReaders; turn++) {
        ConcurrentContentReadTaskBatchProcessor reader = new ConcurrentContentReadTaskBatchProcessor(
                disruptor.getRingBuffer(),
                contentReadBarrier,
                new ContentCacheCreator(maxContentChunkSize),
                turn,
                parallelContentReaders,
                contentSizeToBatch);
        contentReaders[turn] = reader;
        reader.setExceptionHandler(new DeliveryExceptionHandler());
    }

    // Stage 2: decompression handlers
    ContentDecompressionHandler[] decompressors =
            new ContentDecompressionHandler[parallelDecompressionHandlers];
    for (int index = 0; index < parallelDecompressionHandlers; index++) {
        decompressors[index] = new ContentDecompressionHandler(maxContentChunkSize);
    }

    // Stage 3: delivery handlers
    DeliveryEventHandler[] deliverers = new DeliveryEventHandler[parallelDeliveryHandlers];
    for (int index = 0; index < parallelDeliveryHandlers; index++) {
        deliverers[index] = new DeliveryEventHandler(index, parallelDeliveryHandlers);
    }

    // Stage 4: delivery event cleanup
    DeliveryEventCleanupHandler cleanupHandler = new DeliveryEventCleanupHandler();

    disruptor.handleEventsWith(contentReaders)
             .then(decompressors)
             .then(deliverers)
             .then(cleanupHandler);

    disruptor.start();
    ringBuffer = disruptor.getRingBuffer();

    // Add the gauge listener that periodically calculates the outbound messages in the ring
    MetricManager.gauge(MetricsConstants.DISRUPTOR_OUTBOUND_RING, Level.INFO, new OutBoundRingGauge());
}
/**
 * Creates a {@link WorkerPool} for the given work handlers behind a barrier built from
 * {@code barrierSequences}, and registers the pool with the consumer repository.
 *
 * @param barrierSequences sequences the pool must wait behind
 * @param workHandlers     handlers that share the work
 * @return a group over the pool's worker sequences
 */
EventHandlerGroup<T> createWorkerPool(final Sequence[] barrierSequences, final WorkHandler<T>[] workHandlers)
{
    final SequenceBarrier poolBarrier = ringBuffer.newBarrier(barrierSequences);
    final WorkerPool<T> pool =
        new WorkerPool<T>(ringBuffer, poolBarrier, exceptionHandler, workHandlers);

    consumerRepository.add(pool, poolBarrier);

    final Sequence[] workerSequences = pool.getWorkerSequences();
    return new EventHandlerGroup<T>(this, consumerRepository, workerSequences);
}
/**
 * Bundles an event processor with the handler and the barrier associated with it.
 *
 * @param eventprocessor the event processor
 * @param handler        the handler associated with the processor
 * @param barrier        the barrier associated with the processor
 */
EventProcessorInfo(final EventProcessor eventprocessor,
                   final EventHandler<T> handler,
                   final SequenceBarrier barrier)
{
    this.barrier = barrier;
    this.handler = handler;
    this.eventprocessor = eventprocessor;
}
/**
 * @return the sequence barrier stored in {@code _barrier}
 */
public SequenceBarrier get_barrier()
{
    final SequenceBarrier current = _barrier;
    return current;
}
/**
 * Looks up the {@link SequenceBarrier} used by a specific handler. The same
 * {@link SequenceBarrier} may be shared by multiple event handlers.
 *
 * @param handler the handler whose barrier is requested.
 * @return the SequenceBarrier used by <i>handler</i>.
 */
public SequenceBarrier getBarrierFor(final EventHandler<T> handler)
{
    final SequenceBarrier handlerBarrier = consumerRepository.getBarrierFor(handler);
    return handlerBarrier;
}
/**
 * Creates a dependency barrier for the processors in this group.
 * This lets custom event processors depend on
 * {@link com.lmax.disruptor.BatchEventProcessor}s created by the disruptor.
 *
 * @return a {@link SequenceBarrier} including all the processors in this group.
 */
public SequenceBarrier asSequenceBarrier()
{
    final RingBuffer<?> buffer = disruptor.getRingBuffer();
    return buffer.newBarrier(sequences);
}
/**
 * Creates a new SequenceBarrier for an EventProcessor to track which messages are
 * available to be read from the ring buffer, given a list of sequences to track.
 * The call is delegated to the underlying sequencer.
 *
 * @see SequenceBarrier
 * @param sequencesToTrack the additional sequences to track
 * @return A sequence barrier that will track the specified sequences.
 */
public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
{
    final SequenceBarrier trackingBarrier = sequencer.newBarrier(sequencesToTrack);
    return trackingBarrier;
}
/**
 * Creates a consumer by passing every argument straight to the superclass constructor.
 *
 * @param ring             source byte ring containing inbound messages
 * @param sequenceBarrier  barrier on which this consumer waits
 * @param delegate         the delegate to which messages are dispatched
 * @param exceptionHandler called back when an error occurs
 */
public VariableBlockSizeRingConsumer(ByteRing ring,
                                     SequenceBarrier sequenceBarrier,
                                     RingBufferBlockProcessor delegate,
                                     ExceptionHandler exceptionHandler) {
    super(ring, sequenceBarrier, delegate, exceptionHandler);
}
/**
 * Creates a consumer with a fixed block size; all other arguments are passed to the
 * superclass constructor.
 *
 * @param ring             source byte ring containing inbound messages
 * @param sequenceBarrier  barrier on which this consumer waits
 * @param delegate         the delegate to which messages are dispatched
 * @param exceptionHandler called back when an error occurs
 * @param blockSize        the fixed block size (unit not visible here; presumably bytes, to be confirmed)
 */
public FixedBlockSizeRingConsumer(ByteRing ring,
                                  SequenceBarrier sequenceBarrier,
                                  RingBufferBlockProcessor delegate,
                                  ExceptionHandler exceptionHandler,
                                  int blockSize) {
    super(ring, sequenceBarrier, delegate, exceptionHandler);
    this.blockSize = blockSize;
}
/**
 * Creates a new SequenceBarrier for an EventProcessor to track which messages are
 * available to be read from the ring buffer, given a list of sequences to track.
 * The call is delegated to the underlying sequencer.
 *
 * @see SequenceBarrier
 * @param sequencesToTrack the additional sequences to track
 * @return A sequence barrier that will track the specified sequences.
 */
public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
{
    final SequenceBarrier trackingBarrier = sequencer.newBarrier(sequencesToTrack);
    return trackingBarrier;
}