@Setup public void setup() { executor = Executors.newSingleThreadExecutor(); disruptor = new Disruptor<LongEvent>(LongEvent.EVENT_FACTORY, executor, new SingleThreadedClaimStrategy(Run.QUEUE_SIZE), new BusySpinWaitStrategy()); eventCount = new AtomicInteger(); handler = (event, sequence, endOfBatch) -> { if(Run.LONGVAL == event.getValue()) { eventCount.incrementAndGet(); } else { throw new RuntimeException("Failed."); } }; disruptor.handleEventsWith(handler); ringBuffer = disruptor.start(); }
@Setup public void setup() { executor = Executors.newFixedThreadPool(Run.NTHREAD); disruptor = new Disruptor<LongEvent>(LongEvent.EVENT_FACTORY, executor, new MultiThreadedClaimStrategy(Run.QUEUE_SIZE), new BusySpinWaitStrategy()); eventCount = new AtomicInteger(); handler = (event, sequence, endOfBatch) -> { if(Run.LONGVAL == event.getValue()) { eventCount.incrementAndGet(); } else { throw new RuntimeException("Failed."); } }; disruptor.handleEventsWith(handler); ringBuffer = disruptor.start(); }
@Override protected void configure() { switch (config.getWaitStrategyEnum()) { // A low-cpu usage Disruptor configuration for using in local/test environments case LOW_CPU: bind(WaitStrategy.class).annotatedWith(Names.named("PersistenceStrategy")).to(BlockingWaitStrategy.class); bind(WaitStrategy.class).annotatedWith(Names.named("ReplyStrategy")).to(BlockingWaitStrategy.class); bind(WaitStrategy.class).annotatedWith(Names.named("RetryStrategy")).to(BlockingWaitStrategy.class); break; // The default high-cpu usage Disruptor configuration for getting high throughput on production environments case HIGH_THROUGHPUT: default: bind(WaitStrategy.class).annotatedWith(Names.named("PersistenceStrategy")).to(BusySpinWaitStrategy.class); bind(WaitStrategy.class).annotatedWith(Names.named("ReplyStrategy")).to(BusySpinWaitStrategy.class); bind(WaitStrategy.class).annotatedWith(Names.named("RetryStrategy")).to(YieldingWaitStrategy.class); break; } bind(RequestProcessor.class).to(RequestProcessorImpl.class).in(Singleton.class); bind(PersistenceProcessor.class).to(PersistenceProcessorImpl.class).in(Singleton.class); bind(ReplyProcessor.class).to(ReplyProcessorImpl.class).in(Singleton.class); bind(RetryProcessor.class).to(RetryProcessorImpl.class).in(Singleton.class); }
/** * Start dispatching events * * @return a Future that asyncrhonously notifies an observer when this EventReactor is ready */ @SuppressWarnings("unchecked") public CompletableFuture<? extends EventReactor<T>> open() { if (isRunning.compareAndSet(false, true)) { CompletableFuture<EventReactor<T>> future = new CompletableFuture<>(); this.disruptor = new Disruptor<>(EVENT_FACTORY, ringSize, threadFactory, ProducerType.MULTI, new BusySpinWaitStrategy()); this.disruptor.handleEventsWith(this::handleEvent); this.ringBuffer = disruptor.start(); // Starts a timer thread this.timer = new Timer(); future.complete(this); return future; } else { return CompletableFuture.completedFuture(this); } }
public static void main(String[] args) throws InterruptedException { long beginTime=System.currentTimeMillis(); int bufferSize=1024; ExecutorService executor=Executors.newFixedThreadPool(4); //������캯�����������������˽�����2��demo֮��Ϳ��¾������ˣ���������~ Disruptor<TradeTransaction> disruptor=new Disruptor<TradeTransaction>(new EventFactory<TradeTransaction>() { @Override public TradeTransaction newInstance() { return new TradeTransaction(); } }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy()); //ʹ��disruptor������������C1,C2 EventHandlerGroup<TradeTransaction> handlerGroup=disruptor.handleEventsWith(new TradeTransactionVasConsumer(),new TradeTransactionInDBHandler()); TradeTransactionJMSNotifyHandler jmsConsumer=new TradeTransactionJMSNotifyHandler(); //������C1,C2����֮��ִ��JMS��Ϣ���Ͳ��� Ҳ���������ߵ�C3 handlerGroup.then(jmsConsumer); disruptor.start();//���� CountDownLatch latch=new CountDownLatch(1); //�������� executor.submit(new TradeTransactionPublisher(latch, disruptor)); latch.await();//�ȴ�����������. disruptor.shutdown(); executor.shutdown(); System.out.println("�ܺ�ʱ:"+(System.currentTimeMillis()-beginTime)); // long tt= System.currentTimeMillis(); // for (int i = 0; i < 1000; i++) { // int j=i; // } // System.out.println("�ܺ�ʱ:"+(System.currentTimeMillis()-tt)); }
@SuppressWarnings("unchecked") public void start() { if (isRunning.compareAndSet(false, true)) { this.disruptor = new Disruptor<>(BufferEvent.EVENT_FACTORY, ringSize, threadFactory, ProducerType.SINGLE, new BusySpinWaitStrategy()); this.disruptor.handleEventsWith(this::handleEvent); this.ringBuffer = disruptor.start(); } }
@Override public CompletableFuture<? extends Service> open() { if (isRunning.compareAndSet(false, true)) { this.disruptor = new Disruptor<>(BufferEvent.EVENT_FACTORY, ringSize, threadFactory, ProducerType.SINGLE, new BusySpinWaitStrategy()); this.disruptor.handleEventsWith(this::onEvent); this.ringBuffer = disruptor.start(); } return CompletableFuture.completedFuture(this); }
@Test public void test_All_WaitStrategies() { assertTrue(WaitStrategyType.BLOCKING.instance() instanceof BlockingWaitStrategy); assertTrue(WaitStrategyType.BUSY_SPIN.instance() instanceof BusySpinWaitStrategy); assertTrue(WaitStrategyType.LITE_BLOCKING.instance() instanceof LiteBlockingWaitStrategy); assertTrue(WaitStrategyType.SLEEPING_WAIT.instance() instanceof SleepingWaitStrategy); assertTrue(WaitStrategyType.YIELDING.instance() instanceof YieldingWaitStrategy); }
public static WaitStrategy createFromType(String name) { if ("BusySpin".equalsIgnoreCase(name)) { return new BusySpinWaitStrategy(); } else if ("Blocking".equalsIgnoreCase(name)) { return new BlockingWaitStrategy(); } else if ("Yielding".equalsIgnoreCase(name)) { return new YieldingWaitStrategy(); } else if ("Sleeping".equalsIgnoreCase(name)) { return new SleepingWaitStrategy(); } else { throw new IllegalArgumentException("Invalid or unsupported wait strategy type '" + name + "'"); } }
@Setup public void setup() { if (opWorkRatio < 0) throw new IllegalStateException(); String[] opSleepArgs = opSleep.split("/"); opSleepChance = Float.parseFloat(opSleepArgs[0]); opSleepNanos = Long.parseLong(opSleepArgs[1]) * 1000L; opWorkTokens = (int) Math.ceil(opWork * opWorkRatio * (1d / executorChainLength)); String[] taskArgs = tasks.split(":"); int concurrentRequests = (int) (threads * Double.parseDouble(taskArgs[0])); int maxTasksQueued = (int) (threads * Double.parseDouble(taskArgs[1])); final InjectorPlus injector = new InjectorPlus(""); exec = new ExecutorPlus[executorChainLength]; workGate = new Semaphore(concurrentRequests, false); for (int i = 0 ; i < exec.length ; i++) { switch (ExecutorType.valueOf(type)) { case INJECTOR: exec[i] = injector.newExecutor(threads, maxTasksQueued); break; case JDK: exec[i] = new BlockingThreadPoolExecutor(threads, maxTasksQueued); break; case FJP: exec[i] = new BlockingForkJoinPool(threads, maxTasksQueued); break; case DISRUPTOR_SPIN: exec[i] = new DisruptorExecutor(threads, maxTasksQueued, new BusySpinWaitStrategy()); break; case DISRUPTOR_BLOCK: exec[i] = new DisruptorExecutor(threads, maxTasksQueued, new BlockingWaitStrategy()); break; } } }
@Test public void testAverageLatency() throws InterruptedException, ExecutionException{ start = 0; end = 0; diff = 0; final ExecutorService es = Executors.newCachedThreadPool(); // The factory for the event LongEventFactory factory = new LongEventFactory(); // Specify the size of the ring buffer, must be power of 2. int bufferSize = 1024; // Construct the Disruptor Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, es, ProducerType.SINGLE, new BusySpinWaitStrategy()); // Connect the handler disruptor.handleEventsWith(new LongEventHandler()); // Get the ring buffer from the Disruptor to be used for publishing. RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducer prod = new LongEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); final Runnable producer = new Runnable(){ public void run() { start = System.nanoTime(); bb.putLong(0, 1); prod.onData(bb); } }; // Start the Disruptor, starts all threads running disruptor.start(); Thread.sleep(1000); int size = 1000000; float percentageError = 0.05f; for (int i=0;i<size;i++){ es.submit(producer); while(!eventRecieved){ Thread.sleep(100); } eventRecieved = false; } double sum = 0; for (Long r : results){ sum = r + sum; } long min = Collections.min(results); System.out.println("min nano: "+min); long max = Collections.max(results); System.out.println("max nano: "+max); System.out.println("results: "+results); System.out.println("average nano, minus anomalies: "+((sum-max-min)/(double)(results.size()-2))); System.out.println("average nano, minus anomalies: "+averageNanoMinusAnomalies(results,(int)(size*percentageError))); assertTrue(((sum-max-min)/(double)(results.size()-2))<13000); }
@Test public void testLatency() throws InterruptedException, ExecutionException{ start = 0; end = 0; diff = 0; final ExecutorService es = Executors.newFixedThreadPool(2); // The factory for the event LongEventFactory factory = new LongEventFactory(); // Specify the size of the ring buffer, must be power of 2. int bufferSize = 1024; // Construct the Disruptor Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, es, ProducerType.SINGLE, new BusySpinWaitStrategy()); // Connect the handler disruptor.handleEventsWith(new LongEventHandler()); // Get the ring buffer from the Disruptor to be used for publishing. RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducer prod = new LongEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); final Runnable producer = new Runnable(){ public void run() { start = System.nanoTime(); bb.putLong(0, 1); prod.onData(bb); } }; // Start the Disruptor, starts all threads running disruptor.start(); Thread.sleep(1000); es.submit(producer); Thread.sleep(1000); long nano = diff; System.out.println("nano: "+nano); System.out.println("micro: "+(nano/1000)); assertTrue((nano/1000)<100); }