/**
 * Starts a {@code WorkerPool} with two {@code AtomicLongWorkHandler}s, publishes
 * sequences 0 and 1, and asserts that the {@code AtomicLong} held in each of
 * those slots reads 1 after a short wait.
 *
 * <p>NOTE(review): presumably each handler increments the event it receives, so
 * a value of 1 means exactly one worker saw the event; confirm against
 * {@code AtomicLongWorkHandler}.
 *
 * @throws Exception if the waiting thread is interrupted or the pool fails
 */
@SuppressWarnings("unchecked")
@Test
public void shouldProcessEachMessageByOnlyOneWorker() throws Exception {
    final Executor workerThreads = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE);
    final WorkerPool<AtomicLong> workers = new WorkerPool<AtomicLong>(
        new AtomicLongEventFactory(),
        new FatalExceptionHandler(),
        new AtomicLongWorkHandler(),
        new AtomicLongWorkHandler());
    final RingBuffer<AtomicLong> events = workers.start(workerThreads);

    // Claim two sequences first, then publish them in order.
    for (int claimed = 0; claimed < 2; claimed++) {
        events.next();
    }
    events.publish(0);
    events.publish(1);

    // Time-based wait: gives the worker threads a chance to handle both events.
    Thread.sleep(500);

    for (long sequence = 0; sequence < 2; sequence++) {
        assertThat(events.get(sequence).get(), is(1L));
    }
}
/**
 * Starts a {@code WorkerPool} with two {@code AtomicLongWorkHandler}s, claims
 * two sequences without publishing them, and asserts that the {@code AtomicLong}
 * in each of those slots still reads 0 after one second.
 *
 * <p>NOTE(review): presumably a non-zero value would mean a handler touched an
 * unpublished event; confirm against {@code AtomicLongWorkHandler}.
 *
 * @throws Exception if the waiting thread is interrupted or the pool fails
 */
@SuppressWarnings("unchecked")
@Test
public void shouldProcessOnlyOnceItHasBeenPublished() throws Exception {
    final Executor workerThreads = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE);
    final WorkerPool<AtomicLong> workers = new WorkerPool<AtomicLong>(
        new AtomicLongEventFactory(),
        new FatalExceptionHandler(),
        new AtomicLongWorkHandler(),
        new AtomicLongWorkHandler());
    final RingBuffer<AtomicLong> events = workers.start(workerThreads);

    // Claim two sequences but deliberately never publish them.
    for (int claimed = 0; claimed < 2; claimed++) {
        events.next();
    }

    // Time-based wait: long enough for a worker to act if it were going to.
    Thread.sleep(1000);

    for (long sequence = 0; sequence < 2; sequence++) {
        assertThat(events.get(sequence).get(), is(0L));
    }
}
/**
 * Creates an archive processor backed by a Disruptor whose only registered
 * handler is an {@code ArchiveEventHandler}.
 *
 * <p>The constructor only wires things up: the disruptor is not started, the
 * {@code running} flag is left {@code false}, and the "archiveProcessor"
 * thread is created but not started.
 *
 * @param eventFactory   factory handed to the Disruptor for its events
 * @param ringBufferSize ring buffer size handed to the Disruptor
 * @param eventClass     event class passed through to the {@code ArchiveEventHandler}
 * @param batchSize      batch size passed through to the {@code ArchiveEventHandler}
 * @param batchHandler   batch handler passed through to the {@code ArchiveEventHandler}
 */
@SuppressWarnings("unchecked")
public ArchiveProcessor(final EventFactory<E> eventFactory,
                        int ringBufferSize,
                        final Class<E> eventClass,
                        final int batchSize,
                        final ArchiveBatchHandler<E> batchHandler) {
    // Lifecycle state first: not running, and a named thread that will run this object.
    this.running = false;
    this.archiveThread = new Thread(this, "archiveProcessor");

    // Event pipeline: a single archive handler consuming from the ring buffer.
    this.eventDisruptor = new Disruptor<>(eventFactory, ringBufferSize, DaemonThreadFactory.INSTANCE);
    this.eventDisruptor.handleEventsWith(
        new ArchiveEventHandler<>(eventClass, batchSize, batchHandler));
}
/**
 * Builds and starts a Disruptor in which three {@code TransformingHandler}s
 * (constructed with 0, 1 and 2) are registered together and a
 * {@code CollatingHandler} is chained after them via {@code then(...)}.
 *
 * <p>The Disruptor creates its own threads through
 * {@code DaemonThreadFactory.INSTANCE}; no separate executor is needed.
 *
 * <p>NOTE(review): no events are published here, and the handler threads are
 * presumably daemon threads, so the JVM may exit as soon as this method
 * returns; confirm that is intended for this example.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Disruptor<DataEvent> disruptor = new Disruptor<DataEvent>(
        DataEvent.FACTORY, 1024, DaemonThreadFactory.INSTANCE);

    TransformingHandler handler1 = new TransformingHandler(0);
    TransformingHandler handler2 = new TransformingHandler(1);
    TransformingHandler handler3 = new TransformingHandler(2);
    CollatingHandler collator = new CollatingHandler();

    // The collator is gated on all three transforming handlers.
    disruptor.handleEventsWith(handler1, handler2, handler3).then(collator);

    disruptor.start();
}
public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE); // Build a disruptor and start it. Disruptor<StubEvent> disruptor = new Disruptor<StubEvent>( StubEvent.EVENT_FACTORY, 1024, DaemonThreadFactory.INSTANCE); RingBuffer<StubEvent> ringBuffer = disruptor.start(); // Construct 2 batch event processors. DynamicHandler handler1 = new DynamicHandler(); BatchEventProcessor<StubEvent> processor1 = new BatchEventProcessor<StubEvent>(ringBuffer, ringBuffer.newBarrier(), handler1); DynamicHandler handler2 = new DynamicHandler(); BatchEventProcessor<StubEvent> processor2 = new BatchEventProcessor<StubEvent>(ringBuffer, ringBuffer.newBarrier(processor1.getSequence()), handler2); // Dynamically add both sequences to the ring buffer ringBuffer.addGatingSequences(processor1.getSequence(), processor2.getSequence()); // Start the new batch processors. executor.execute(processor1); executor.execute(processor2); // Remove a processor. // Stop the processor processor2.halt(); // Wait for shutdown the complete handler2.awaitShutdown(); // Remove the gating sequence from the ring buffer ringBuffer.removeGatingSequence(processor2.getSequence()); }
/**
 * Creates a transaction log named "test" with an id counter starting at 0,
 * empty processor and handler lists, and a Disruptor of 1024
 * {@code Transaction} slots that is started immediately.
 *
 * <p>No handlers are registered on the Disruptor before it is started here.
 */
public TransactionLog2() {
    // Plain state.
    this.name = "test";
    this.currentId = new AtomicLong(0);
    this.processors = new ArrayList<>();
    this.handlers = new ArrayList<>();

    // Disruptor running on a cached pool of threads from DaemonThreadFactory.
    this.executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE);
    this.disruptor = new Disruptor<>(Transaction::new, 1024, this.executor);
    this.ringBuffer = this.disruptor.start();
}
/**
 * Creates the writer: builds a Disruptor of 1024 {@code LogMessageHolder}
 * slots, registers this object as its event handler, keeps a reference to the
 * ring buffer, resets the line counter to 0, and starts the Disruptor.
 */
private FileWriter() {
    lineNum = 0;

    // Factory the Disruptor uses to fill the ring buffer with holders.
    final EventFactory<LogMessageHolder> holderFactory = new EventFactory<LogMessageHolder>() {
        @Override
        public LogMessageHolder newInstance() {
            return new LogMessageHolder();
        }
    };

    disruptor = new Disruptor<LogMessageHolder>(holderFactory, 1024, DaemonThreadFactory.INSTANCE);
    disruptor.handleEventsWith(this);
    buffer = disruptor.getRingBuffer();

    // Start last, once every field above has been assigned.
    disruptor.start();
}
/**
 * Publishes 10000 events, each set to its own sequence number, through a
 * Disruptor with two {@code ParallelEventHandler}s and checks what each
 * handler ended up with.
 *
 * <p>The test polls until both handlers report {@code processed} equal to the
 * last sequence, then asserts that each handler counted half of the events,
 * with the first handler's last published value being {@code eventCount - 2}
 * and the second's being {@code eventCount - 1}.
 *
 * <p>NOTE(review): presumably the handlers split the stream by sequence
 * parity via their {@code (1, 0)} / {@code (1, 1)} constructor arguments;
 * confirm against {@code ParallelEventHandler}.
 *
 * @throws Exception if the polling thread is interrupted
 */
@SuppressWarnings("unchecked")
@Test
public void shouldBatch() throws Exception {
    final int eventCount = 10000;

    // Writes the claimed sequence number into the event.
    final EventTranslator<LongEvent> sequenceWriter = new EventTranslator<LongEvent>() {
        @Override
        public void translateTo(LongEvent event, long sequence) {
            event.set(sequence);
        }
    };

    final Disruptor<LongEvent> d = new Disruptor<LongEvent>(
        LongEvent.FACTORY,
        2048,
        DaemonThreadFactory.INSTANCE,
        producerType,
        new SleepingWaitStrategy());

    final ParallelEventHandler handler1 = new ParallelEventHandler(1, 0);
    final ParallelEventHandler handler2 = new ParallelEventHandler(1, 1);
    d.handleEventsWith(handler1, handler2);

    final RingBuffer<LongEvent> buffer = d.start();

    for (int remaining = eventCount; remaining > 0; remaining--) {
        buffer.publishEvent(sequenceWriter);
    }

    // Poll until both handlers have reached the final sequence.
    while (!(handler1.processed == eventCount - 1 && handler2.processed == eventCount - 1)) {
        Thread.sleep(1);
    }

    Assert.assertThat(handler1.publishedValue, CoreMatchers.is((long) eventCount - 2));
    Assert.assertThat(handler1.eventCount, CoreMatchers.is((long) eventCount / 2));
    Assert.assertThat(handler2.publishedValue, CoreMatchers.is((long) eventCount - 1));
    Assert.assertThat(handler2.eventCount, CoreMatchers.is((long) eventCount / 2));
}