private void disruptorEventHandlerChain() { for(int i=0;i<eventHandlerChain.length;i++){ EventHandlerChain<T> eventHandlersChain = eventHandlerChain[i]; EventHandlerGroup<T> eventHandlerGroup = null; if(i == 0){ eventHandlerGroup = getDisruptor().handleEventsWith(eventHandlersChain.getCurrentEventHandlers()); }else{ eventHandlerGroup = getDisruptor().after(eventHandlersChain.getCurrentEventHandlers()); } if(! ArrayUtils.isEmpty(eventHandlersChain.getNextEventHandlers())){ eventHandlerGroup.then(eventHandlersChain.getNextEventHandlers()); } } getEventProcessorGraph(); }
public static void main(String[] args) throws InterruptedException { long beginTime=System.currentTimeMillis(); int bufferSize=1024; ExecutorService executor=Executors.newFixedThreadPool(4); //������캯�����������������˽�����2��demo֮��Ϳ��¾������ˣ���������~ Disruptor<TradeTransaction> disruptor=new Disruptor<TradeTransaction>(new EventFactory<TradeTransaction>() { @Override public TradeTransaction newInstance() { return new TradeTransaction(); } }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy()); //ʹ��disruptor������������C1,C2 EventHandlerGroup<TradeTransaction> handlerGroup=disruptor.handleEventsWith(new TradeTransactionVasConsumer(),new TradeTransactionInDBHandler()); TradeTransactionJMSNotifyHandler jmsConsumer=new TradeTransactionJMSNotifyHandler(); //������C1,C2����֮��ִ��JMS��Ϣ���Ͳ��� Ҳ���������ߵ�C3 handlerGroup.then(jmsConsumer); disruptor.start();//���� CountDownLatch latch=new CountDownLatch(1); //�������� executor.submit(new TradeTransactionPublisher(latch, disruptor)); latch.await();//�ȴ�����������. disruptor.shutdown(); executor.shutdown(); System.out.println("�ܺ�ʱ:"+(System.currentTimeMillis()-beginTime)); // long tt= System.currentTimeMillis(); // for (int i = 0; i < 1000; i++) { // int j=i; // } // System.out.println("�ܺ�ʱ:"+(System.currentTimeMillis()-tt)); }
protected void attachHandlers(Disruptor<T> disruptor) { List<EventHandler<T>[]> disruptorHandlers = handlers.stream() .<EventHandler<T>[]> map(this::convert) .collect(Collectors.toList()); EventHandlerGroup<T> h = disruptor.handleEventsWith(disruptorHandlers.get(0)); for (int i = 1; i < disruptorHandlers.size(); i++) h = h.then(disruptorHandlers.get(i)); }
public static void main(String[] args) throws Exception { // Executor that will be used to construct new threads for consumers Executor executor = Executors.newCachedThreadPool(); // Specify the size of the ring buffer, must be power of 2. int bufferSize = 1024; // Construct the Disruptor Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor); // Connect the handler disruptor.handleEventsWith(DisruptorTests::handleEvent); EventHandlerGroup<LongEvent> g = disruptor.handleEventsWith(DisruptorTests::handleEvent); // Start the Disruptor, starts all threads running disruptor.start(); // Get the ring buffer from the Disruptor to be used for publishing. RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); //LongEventProducer producer = new LongEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { bb.putLong(0, l); ringBuffer.publishEvent(DisruptorTests::translate, bb); Thread.sleep(1000); } }