@Override protected void configure() { switch (config.getWaitStrategyEnum()) { // A low-cpu usage Disruptor configuration for using in local/test environments case LOW_CPU: bind(WaitStrategy.class).annotatedWith(Names.named("PersistenceStrategy")).to(BlockingWaitStrategy.class); bind(WaitStrategy.class).annotatedWith(Names.named("ReplyStrategy")).to(BlockingWaitStrategy.class); bind(WaitStrategy.class).annotatedWith(Names.named("RetryStrategy")).to(BlockingWaitStrategy.class); break; // The default high-cpu usage Disruptor configuration for getting high throughput on production environments case HIGH_THROUGHPUT: default: bind(WaitStrategy.class).annotatedWith(Names.named("PersistenceStrategy")).to(BusySpinWaitStrategy.class); bind(WaitStrategy.class).annotatedWith(Names.named("ReplyStrategy")).to(BusySpinWaitStrategy.class); bind(WaitStrategy.class).annotatedWith(Names.named("RetryStrategy")).to(YieldingWaitStrategy.class); break; } bind(RequestProcessor.class).to(RequestProcessorImpl.class).in(Singleton.class); bind(PersistenceProcessor.class).to(PersistenceProcessorImpl.class).in(Singleton.class); bind(ReplyProcessor.class).to(ReplyProcessorImpl.class).in(Singleton.class); bind(RetryProcessor.class).to(RetryProcessorImpl.class).in(Singleton.class); }
@Test(timeOut = 10_000) public void testRetriedRequestForAnExistingTxReturnsCommit() throws Exception { ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool(); // The element to test RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool); // Test we'll reply with a commit for a retry request when the start timestamp IS in the commit table commitTable.getWriter().addCommittedTransaction(ST_TX_1, CT_TX_1); retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics)); ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class); ArgumentCaptor<Long> secondTSCapture = ArgumentCaptor.forClass(Long.class); verify(replyProc, timeout(100).times(1)).sendCommitResponse(firstTSCapture.capture(), secondTSCapture.capture(), any(Channel.class)); long startTS = firstTSCapture.getValue(); long commitTS = secondTSCapture.getValue(); assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as ST_TX_1"); assertEquals(commitTS, CT_TX_1, "Captured timestamp should be the same as CT_TX_1"); }
@Test(timeOut = 10_000) public void testRetriedRequestForInvalidatedTransactionReturnsAnAbort() throws Exception { // Invalidate the transaction commitTable.getClient().tryInvalidateTransaction(ST_TX_1); // Pre-start verification: Validate that the transaction is invalidated // NOTE: This test should be in the a test class for InMemoryCommitTable Optional<CommitTimestamp> invalidTxMarker = commitTable.getClient().getCommitTimestamp(ST_TX_1).get(); Assert.assertTrue(invalidTxMarker.isPresent()); Assert.assertEquals(invalidTxMarker.get().getValue(), InMemoryCommitTable.INVALID_TRANSACTION_MARKER); ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool(); // The element to test RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool); // Test we return an Abort to a retry request when the transaction id IS in the commit table BUT invalidated retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics)); ArgumentCaptor<Long> startTSCapture = ArgumentCaptor.forClass(Long.class); verify(replyProc, timeout(100).times(1)).sendAbortResponse(startTSCapture.capture(), any(Channel.class)); long startTS = startTSCapture.getValue(); Assert.assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as NON_EXISTING_ST_TX"); }
@SuppressWarnings("unchecked") static void qndSingleThread(int numItems) { final AtomicLong COUNTER_RECEIVED = new AtomicLong(0); final Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent.FACTORY, 128, Executors.defaultThreadFactory(), ProducerType.MULTI, new YieldingWaitStrategy()); disruptor.handleEventsWith( (event, sequence, endOfBatch) -> COUNTER_RECEIVED.incrementAndGet()); disruptor.start(); final long t = System.currentTimeMillis(); for (int i = 0; i < numItems; i++) { disruptor.publishEvent((event, seq) -> event.set(seq)); } long d = System.currentTimeMillis() - t; NumberFormat nf = NumberFormat.getInstance(); System.out.println("========== qndSingleThread:"); System.out.println("Sent: " + nf.format(numItems) + " / Received: " + nf.format(COUNTER_RECEIVED.get()) + " / Duration: " + d + " / Speed: " + NumberFormat.getInstance().format((numItems * 1000.0 / d)) + " items/sec"); disruptor.shutdown(); }
public void init() throws NullPointerException { if (factory == null) { throw new NullPointerException("factory == null"); } if (bufferSize <= 0) { throw new NullPointerException("bufferSize <= 0"); } if (threadFactory == null) { throw new NullPointerException("threadFactory == null"); } producers = new ArrayList<ExchangeEventProducer<T>>(); disruptor = new Disruptor<Exchange>(factory, bufferSize, threadFactory, ProducerType.SINGLE, new YieldingWaitStrategy()); }
private static WaitStrategy createWaitStrategy() { final String strategy = System.getProperty("AsyncLogger.WaitStrategy"); LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy); if ("Sleep".equals(strategy)) { LOGGER.debug("disruptor event handler uses SleepingWaitStrategy"); return new SleepingWaitStrategy(); } else if ("Yield".equals(strategy)) { LOGGER.debug("disruptor event handler uses YieldingWaitStrategy"); return new YieldingWaitStrategy(); } else if ("Block".equals(strategy)) { LOGGER.debug("disruptor event handler uses BlockingWaitStrategy"); return new BlockingWaitStrategy(); } LOGGER.debug("disruptor event handler uses SleepingWaitStrategy"); return new SleepingWaitStrategy(); }
public static void main(String[] args) { EventFactory<HelloEvent> eventFactory = new HelloEventFactory(); int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必须是 2 的 N 次方; Disruptor<HelloEvent> disruptor = new Disruptor<HelloEvent>( eventFactory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); EventHandler<HelloEvent> eventHandler = new HelloEventHandler(); disruptor.handleEventsWith(eventHandler, eventHandler); disruptor.start(); }
@Test(timeOut = 10_000) public void testRetriedRequestForANonExistingTxReturnsAbort() throws Exception { ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool(); // The element to test RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool); // Test we'll reply with an abort for a retry request when the start timestamp IS NOT in the commit table retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, new MonitoringContext(metrics)); ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class); verify(replyProc, timeout(100).times(1)).sendAbortResponse(firstTSCapture.capture(), any(Channel.class)); long startTS = firstTSCapture.getValue(); assertEquals(startTS, NON_EXISTING_ST_TX, "Captured timestamp should be the same as NON_EXISTING_ST_TX"); }
@SuppressWarnings("unchecked") public OneToOneTranslatorThroughputTest() { Disruptor<ValueEvent> disruptor = new Disruptor<ValueEvent>( ValueEvent.EVENT_FACTORY, BUFFER_SIZE, executor, ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleEventsWith(handler); this.ringBuffer = disruptor.start(); }
@SuppressWarnings("unchecked") static void qndMultiThreads(int numItems, int numThreads) throws InterruptedException { final AtomicLong COUNTER_RECEIVED = new AtomicLong(0); final Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent.FACTORY, 128, Executors.defaultThreadFactory(), ProducerType.MULTI, new YieldingWaitStrategy()); disruptor.handleEventsWith( (event, sequence, endOfBatch) -> COUNTER_RECEIVED.incrementAndGet()); disruptor.start(); final long t = System.currentTimeMillis(); final int numItemsPerThread = numItems / numThreads; final Thread[] THREADS = new Thread[numThreads]; for (int i = 0; i < THREADS.length; i++) { THREADS[i] = new Thread() { public void run() { for (int i = 0; i < numItemsPerThread; i++) { disruptor.publishEvent((event, seq) -> event.set(seq)); } } }; THREADS[i].start(); } for (Thread thread : THREADS) { thread.join(); } long d = System.currentTimeMillis() - t; NumberFormat nf = NumberFormat.getInstance(); System.out.println("========== qndSingleThread:"); System.out.println("Sent: " + nf.format(numItems) + " / Received: " + nf.format(COUNTER_RECEIVED.get()) + " / Duration: " + d + " / Speed: " + NumberFormat.getInstance().format((numItems * 1000.0 / d)) + " items/sec"); disruptor.shutdown(); }
@Test public void test_All_WaitStrategies() { assertTrue(WaitStrategyType.BLOCKING.instance() instanceof BlockingWaitStrategy); assertTrue(WaitStrategyType.BUSY_SPIN.instance() instanceof BusySpinWaitStrategy); assertTrue(WaitStrategyType.LITE_BLOCKING.instance() instanceof LiteBlockingWaitStrategy); assertTrue(WaitStrategyType.SLEEPING_WAIT.instance() instanceof SleepingWaitStrategy); assertTrue(WaitStrategyType.YIELDING.instance() instanceof YieldingWaitStrategy); }
public static WaitStrategy createFromType(String name) { if ("BusySpin".equalsIgnoreCase(name)) { return new BusySpinWaitStrategy(); } else if ("Blocking".equalsIgnoreCase(name)) { return new BlockingWaitStrategy(); } else if ("Yielding".equalsIgnoreCase(name)) { return new YieldingWaitStrategy(); } else if ("Sleeping".equalsIgnoreCase(name)) { return new SleepingWaitStrategy(); } else { throw new IllegalArgumentException("Invalid or unsupported wait strategy type '" + name + "'"); } }
private static WaitStrategy createWaitStrategy() { final String strategy = System .getProperty("AsyncLoggerConfig.WaitStrategy"); LOGGER.debug("property AsyncLoggerConfig.WaitStrategy={}", strategy); if ("Sleep".equals(strategy)) { return new SleepingWaitStrategy(); } else if ("Yield".equals(strategy)) { return new YieldingWaitStrategy(); } else if ("Block".equals(strategy)) { return new BlockingWaitStrategy(); } return new SleepingWaitStrategy(); }
/** * Create a pool of consumers, all reading from a ring buffer to process events. * * @param NUM_THREADS total number of consumers to create in the pool * @param totalEventsToPublish total events to publish to the queue * @param ringConsumers the placeholder for creating consumers * @throws InterruptedException if there is a problem talking to the blocking queue */ public void run(int NUM_THREADS, int totalEventsToPublish, RingEventHandlerConsumer<T>[] ringConsumers) throws InterruptedException { final int RING_BUFFER_SIZE = 1024; ExecutorService executor; if(NUM_THREADS == 1) { executor = Executors.newSingleThreadExecutor(); } else { executor = Executors.newFixedThreadPool(NUM_THREADS); } Disruptor<T> disruptor = new Disruptor<>(disruptorEventFactory, RING_BUFFER_SIZE, executor, ProducerType.SINGLE, new YieldingWaitStrategy()); for(int i = 0; i < NUM_THREADS; i++) { ringConsumers[i] = new RingEventHandlerConsumer<T>(i, NUM_THREADS, consumerFactory.createConsumer(i, NUM_THREADS)); } disruptor.handleEventsWith(ringConsumers); RingBuffer<T> eventRingBuffer = disruptor.start(); final long startTime = System.currentTimeMillis(); publishEvents(totalEventsToPublish, eventRingBuffer); disruptor.shutdown(); executor.shutdown(); final long endTime = System.currentTimeMillis(); logger.info("It took " + (endTime - startTime) + "ms to process " + totalEventsToPublish + " messages."); for(RingEventHandlerConsumer consumer : ringConsumers) { logger.info("Processed " + consumer.getMessagesProcessed() + " messages."); } }
@Bean public Disruptor<MarketEvent> disruptor() { return new Disruptor<>(() -> new MarketEvent(), 1024 * 1024, Executors.defaultThreadFactory(), ProducerType.MULTI, new YieldingWaitStrategy()); }
public CustomPerformanceTest() { ringBuffer = new CustomRingBuffer<SimpleEvent>(new SingleProducerSequencer(Constants.SIZE, new YieldingWaitStrategy())); }
public SimplePerformanceTest() { ringBuffer = RingBuffer.createSingleProducer(EventHolder.FACTORY, Constants.SIZE, new YieldingWaitStrategy()); eventHolderHandler = new EventHolderHandler(new SimpleEventHandler()); }
public AgileWaitingStrategy() { this(new BlockingWaitStrategy(), new YieldingWaitStrategy()); }
private static void bulkLoad( final Cache cache, long entriesToLoad,double[] bbox) { final AtomicLong atomicLong = new AtomicLong(); final ValueHandler[] valueHandlers = new ValueHandler[]{ new ValueHandler(cache, atomicLong), new ValueHandler(cache, atomicLong), new ValueHandler(cache, atomicLong), new ValueHandler(cache, atomicLong) }; final ExecutorService executorService = Executors.newCachedThreadPool(); try { final RingBuffer<ValueEvent> ringBuffer = RingBuffer.createSingleProducer(ValueEvent.EVENT_FACTORY, 4096, new YieldingWaitStrategy()); final WorkerPool<ValueEvent> workerPool = new WorkerPool<ValueEvent>(ringBuffer, ringBuffer.newBarrier(), new FatalExceptionHandler(), valueHandlers); workerPool.start(executorService); try { publishLoadEvents(ringBuffer,entriesToLoad,bbox); } finally { workerPool.drainAndHalt(); } } finally { executorService.shutdown(); } System.out.format("Put %d elements.", atomicLong.get()); }
public static void main(String[] args) { ThreadFactory threadFactory = Executors.defaultThreadFactory(); Disruptor<UserEvent> disruptor = new Disruptor<>(new UserEventFactory(), 1024, threadFactory, ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleEventsWith(new UserEventConsumer()); disruptor.start(); new UserEventProducer(disruptor.getRingBuffer()).doProcess(); disruptor.shutdown(); }