private void doRun() throws InterruptedException { BatchEventProcessor<?> batchEventProcessor = ringBuffer.createHandler(new SimpleEventHandler()); Thread t = new Thread(batchEventProcessor); t.start(); long iterations = Constants.ITERATIONS; for (long l = 0; l < iterations; l++) { SimpleEvent e = new SimpleEvent(l, l, l, l); ringBuffer.put(e); } while (batchEventProcessor.getSequence().get() != iterations - 1) { LockSupport.parkNanos(1); } batchEventProcessor.halt(); t.join(); }
@Test public void shouldSupportCustomProcessorsAsDependencies() throws Exception { RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer(); final DelayedEventHandler delayedEventHandler = createDelayedEventHandler(); CountDownLatch countDownLatch = new CountDownLatch(2); EventHandler<TestEvent> handlerWithBarrier = new EventHandlerStub<TestEvent>(countDownLatch); final BatchEventProcessor<TestEvent> processor = new BatchEventProcessor<TestEvent>(ringBuffer, ringBuffer.newBarrier(), delayedEventHandler); disruptor.handleEventsWith(processor); disruptor.after(processor).handleEventsWith(handlerWithBarrier); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler); }
@Test public void shouldSupportHandlersAsDependenciesToCustomProcessors() throws Exception { final DelayedEventHandler delayedEventHandler = createDelayedEventHandler(); disruptor.handleEventsWith(delayedEventHandler); RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer(); CountDownLatch countDownLatch = new CountDownLatch(2); EventHandler<TestEvent> handlerWithBarrier = new EventHandlerStub<TestEvent>(countDownLatch); final SequenceBarrier sequenceBarrier = disruptor.after(delayedEventHandler).asSequenceBarrier(); final BatchEventProcessor<TestEvent> processor = new BatchEventProcessor<TestEvent>(ringBuffer, sequenceBarrier, handlerWithBarrier); disruptor.handleEventsWith(processor); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler); }
@Test public void shouldSupportCustomProcessorsAndHandlersAsDependencies() throws Exception { final DelayedEventHandler delayedEventHandler1 = createDelayedEventHandler(); final DelayedEventHandler delayedEventHandler2 = createDelayedEventHandler(); disruptor.handleEventsWith(delayedEventHandler1); RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer(); CountDownLatch countDownLatch = new CountDownLatch(2); EventHandler<TestEvent> handlerWithBarrier = new EventHandlerStub<TestEvent>(countDownLatch); final SequenceBarrier sequenceBarrier = disruptor.after(delayedEventHandler1).asSequenceBarrier(); final BatchEventProcessor<TestEvent> processor = new BatchEventProcessor<TestEvent>(ringBuffer, sequenceBarrier, delayedEventHandler2); disruptor.after(delayedEventHandler1).and(processor).handleEventsWith(handlerWithBarrier); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler1, delayedEventHandler2); }
@Test public void shouldMakeEntriesAvailableToFirstCustomProcessorsImmediately() throws Exception { final CountDownLatch countDownLatch = new CountDownLatch(2); final EventHandler<TestEvent> eventHandler = new EventHandlerStub<TestEvent>(countDownLatch); disruptor.handleEventsWith( new EventProcessorFactory<TestEvent>() { @Override public EventProcessor createEventProcessor( final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences) { assertEquals("Should not have had any barrier sequences", 0, barrierSequences.length); return new BatchEventProcessor<TestEvent>( disruptor.getRingBuffer(), ringBuffer.newBarrier( barrierSequences), eventHandler); } }); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch); }
@Test public void shouldHonourDependenciesForCustomProcessors() throws Exception { final CountDownLatch countDownLatch = new CountDownLatch(2); final EventHandler<TestEvent> eventHandler = new EventHandlerStub<TestEvent>(countDownLatch); final DelayedEventHandler delayedEventHandler = createDelayedEventHandler(); disruptor.handleEventsWith(delayedEventHandler).then( new EventProcessorFactory<TestEvent>() { @Override public EventProcessor createEventProcessor( final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences) { assertSame("Should have had a barrier sequence", 1, barrierSequences.length); return new BatchEventProcessor<TestEvent>( disruptor.getRingBuffer(), ringBuffer.newBarrier( barrierSequences), eventHandler); } }); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler); }
public void init(Properties configProps) { // subscriptions = new SubscriptionsStore(); //Modified by WSO2 in-order to extend the capability of the existing subscriptions store //to be more suitable for the distribution architecture of Andes subscriptions = new MQTTSubscriptionStore(); ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("Disruptor MQTT Simple Messaging Thread %d").build(); ExecutorService executor = Executors.newCachedThreadPool(namedThreadFactory); Integer ringBufferSize = AndesConfigurationManager.readValue( AndesConfiguration.TRANSPORTS_MQTT_INBOUND_BUFFER_SIZE); disruptor = new Disruptor<ValueEvent>( ValueEvent.EVENT_FACTORY, ringBufferSize, executor); //Added by WSO2, we do not want to ignore the exception here disruptor.handleExceptionsWith(new MqttLogExceptionHandler()); SequenceBarrier barrier = disruptor.getRingBuffer().newBarrier(); BatchEventProcessor<ValueEvent> eventProcessor = new BatchEventProcessor<ValueEvent>( disruptor.getRingBuffer(), barrier, this); //Added by WSO2, we need to make sure the exceptions aren't ignored eventProcessor.setExceptionHandler(new MqttLogExceptionHandler()); disruptor.handleEventsWith(eventProcessor); m_ringBuffer = disruptor.start(); disruptorPublish(new InitEvent(configProps)); }
/** * @param subscriptions the subscription store where are stored all the existing * clients subscriptions. * @param storageService the persistent store to use for save/load of messages * for QoS1 and QoS2 handling. * @param authenticator the authenticator used in connect messages */ void init(SubscriptionsStore subscriptions, IStorageService storageService, IAuthenticator authenticator) { //m_clientIDs = clientIDs; this.subscriptions = subscriptions; m_authenticator = authenticator; m_storageService = storageService; //init the output ringbuffer m_executor = Executors.newFixedThreadPool(1); m_ringBuffer = new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY, 1024 * 32); SequenceBarrier barrier = m_ringBuffer.newBarrier(); m_eventProcessor = new BatchEventProcessor<ValueEvent>(m_ringBuffer, barrier, this); //TODO in a presentation is said to don't do the followinf line!! m_ringBuffer.setGatingSequences(m_eventProcessor.getSequence()); m_executor.submit(m_eventProcessor); }
EventHandlerGroup<T> createEventProcessors( final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers) { checkNotStarted(); final Sequence[] processorSequences = new Sequence[eventHandlers.length]; final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) { final EventHandler<? super T> eventHandler = eventHandlers[i]; final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler); if (exceptionHandler != null) { batchEventProcessor.setExceptionHandler(exceptionHandler); } consumerRepository.add(batchEventProcessor, eventHandler, barrier); processorSequences[i] = batchEventProcessor.getSequence(); } if (processorSequences.length > 0) { consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences); } return new EventHandlerGroup<T>(this, consumerRepository, processorSequences); }
/** * Specify the {@link ExceptionHandler} to use with the event handler. * * @param exceptionHandler the exception handler to use. */ public void with(ExceptionHandler<? super T> exceptionHandler) { ((BatchEventProcessor<T>) consumerRepository.getEventProcessorFor(eventHandler)) .setExceptionHandler(exceptionHandler); consumerRepository.getBarrierFor(eventHandler).alert(); }
private void doRun() throws InterruptedException { BatchEventProcessor<EventHolder> batchEventProcessor = new BatchEventProcessor<EventHolder>( ringBuffer, ringBuffer.newBarrier(), eventHolderHandler); ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); Thread t = new Thread(batchEventProcessor); t.start(); long iterations = Constants.ITERATIONS; for (long l = 0; l < iterations; l++) { SimpleEvent e = new SimpleEvent(l, l, l, l); ringBuffer.publishEvent(TRANSLATOR, e); } while (batchEventProcessor.getSequence().get() != iterations - 1) { LockSupport.parkNanos(1); } batchEventProcessor.halt(); t.join(); }
public BatchEventProcessor<EventAccessor<T>> createHandler(final EventHandler<T> handler) { BatchEventProcessor<EventAccessor<T>> processor = new BatchEventProcessor<EventAccessor<T>>( this, sequencer.newBarrier(), new AccessorEventHandler<T>(handler)); sequencer.addGatingSequences(processor.getSequence()); return processor; }
public BatchEventProcessor<LongEvent> createProcessor(final LongHandler handler) { return new BatchEventProcessor<LongEvent>( new LongEvent(), sequencer.newBarrier(), new EventHandler<LongEvent>() { @Override public void onEvent(final LongEvent event, final long sequence, final boolean endOfBatch) throws Exception { handler.onEvent(event.get(), sequence, endOfBatch); } }); }
public static void main(String[] args) { final List<ValueEventHandler1PMC> handlers = new ArrayList<>(NUMBER_CONSUMERS); RingBuffer<ValueEvent> ringBuffer = RingBuffer.createSingleProducer( ValueEvent.EVENT_FACTORY, RING_SIZE, new SleepingWaitStrategy()); start = System.nanoTime(); //Create consumers for(int i = 0; i < NUMBER_CONSUMERS; i++) { ValueEventHandler1PMC handler = new ValueEventHandler1PMC(start, handlers); handlers.add(handler); SequenceBarrier barrier = ringBuffer.newBarrier(); BatchEventProcessor<ValueEvent> eventProcessor = new BatchEventProcessor<ValueEvent>( ringBuffer, barrier, handler); ringBuffer.addGatingSequences(eventProcessor.getSequence()); // Each EventProcessor can run on a separate thread EXECUTOR.submit(eventProcessor); } for(int i = 0; i < SAMPLES_SIZE; i++) { // Publishers claim events in sequence long sequence = ringBuffer.next(); ValueEvent event = ringBuffer.get(sequence); event.setValue(i); // this could be more complex with multiple fields // make the event available to EventProcessors ringBuffer.publish(sequence); } }
public DisruptorEventExecutor() { // ringBuffer = RingBuffer.createMultiProducer(WaitingEvent.EVENT_FACTORY, 128, PhasedBackoffWaitStrategy.withLock(1, 1, TimeUnit.MILLISECONDS)); ringBuffer = RingBuffer.createMultiProducer(WaitingEvent.EVENT_FACTORY, 8192, new BlockingWaitStrategy()); BatchEventProcessor<WaitingEvent> processor = new BatchEventProcessor<WaitingEvent>(ringBuffer, ringBuffer.newBarrier(), new DisruptorHandler()); ringBuffer.addGatingSequences(processor.getSequence()); executeThread = new Thread(processor); executeThread.setName("disruptEventExecutor"); executeThread.start(); }
public void init(Properties configProps) { subscriptions = new SubscriptionsStore(); m_executor = Executors.newFixedThreadPool(1); m_ringBuffer = new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY, 1024 * 32); SequenceBarrier barrier = m_ringBuffer.newBarrier(); m_eventProcessor = new BatchEventProcessor<ValueEvent>(m_ringBuffer, barrier, this); //TODO in a presentation is said to don't do the followinf line!! m_ringBuffer.setGatingSequences(m_eventProcessor.getSequence()); m_executor.submit(m_eventProcessor); disruptorPublish(new InitEvent(configProps)); }
public void init() { m_executor = Executors.newFixedThreadPool(1); m_ringBuffer = new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY, 1024 * 32); SequenceBarrier barrier = m_ringBuffer.newBarrier(); m_eventProcessor = new BatchEventProcessor<ValueEvent>(m_ringBuffer, barrier, this); //TODO in a presentation is said to don't do the followinf line!! m_ringBuffer.setGatingSequences(m_eventProcessor.getSequence()); m_executor.submit(m_eventProcessor); disruptorPublish(new InitEvent()); }
public static void main(String[] args) { RingBuffer<ValueEvent> ringBuffer = RingBuffer.createSingleProducer( ValueEvent.EVENT_FACTORY, RING_SIZE, new SleepingWaitStrategy()); start = System.nanoTime(); //Create first consumer ValueEventHandler1PMCSequenceFirst handler = new ValueEventHandler1PMCSequenceFirst(start); SequenceBarrier barrier; barrier = ringBuffer.newBarrier(); BatchEventProcessor<ValueEvent> firstEventProcessor = new BatchEventProcessor<ValueEvent>( ringBuffer, barrier, handler); // ringBuffer.addGatingSequences(firstEventProcessor.getSequence()); // Each EventProcessor can run on a separate thread EXECUTOR.submit(firstEventProcessor); //Create second consumer ValueEventHandler1PMCSequenceSecond handler2 = new ValueEventHandler1PMCSequenceSecond(start); SequenceBarrier barrier2 = ringBuffer.newBarrier(firstEventProcessor.getSequence()); BatchEventProcessor<ValueEvent> secondEventProcessor = new BatchEventProcessor<ValueEvent>( ringBuffer, barrier2, handler2); ringBuffer.addGatingSequences(secondEventProcessor.getSequence()); // Each EventProcessor can run on a separate thread EXECUTOR.submit(secondEventProcessor); for(int i = 0; i < SAMPLES_SIZE; i++) { // Publishers claim events in sequence long sequence = ringBuffer.next(); ValueEvent event = ringBuffer.get(sequence); event.setValue(i); // this could be more complex with multiple fields // make the event available to EventProcessors ringBuffer.publish(sequence); } }
/** * Specify the {@link ExceptionHandler} to use with the event handler. * * @param exceptionHandler the exception handler to use. */ public void with(ExceptionHandler exceptionHandler) { ((BatchEventProcessor<?>) consumerRepository.getEventProcessorFor(eventHandler)).setExceptionHandler(exceptionHandler); consumerRepository.getBarrierFor(eventHandler).alert(); }