Java 类com.lmax.disruptor.dsl.EventHandlerGroup 实例源码

项目:disruptor-spring-manager    文件:DefaultDisruptorConfig.java   
private void disruptorEventHandlerChain() {
    for(int i=0;i<eventHandlerChain.length;i++){
        EventHandlerChain<T> eventHandlersChain = eventHandlerChain[i];
        EventHandlerGroup<T> eventHandlerGroup = null;
        if(i == 0){
            eventHandlerGroup = getDisruptor().handleEventsWith(eventHandlersChain.getCurrentEventHandlers());
        }else{
            eventHandlerGroup = getDisruptor().after(eventHandlersChain.getCurrentEventHandlers());
        }

        if(! ArrayUtils.isEmpty(eventHandlersChain.getNextEventHandlers())){
            eventHandlerGroup.then(eventHandlersChain.getNextEventHandlers());
        }
    }

    getEventProcessorGraph();
}
项目:disruptorDemo    文件:Demo3.java   
public static void main(String[] args) throws InterruptedException {
        long beginTime=System.currentTimeMillis();

        int bufferSize=1024;
        ExecutorService executor=Executors.newFixedThreadPool(4);
        //������캯�����������������˽�����2��demo֮��Ϳ��¾������ˣ���������~
        Disruptor<TradeTransaction> disruptor=new Disruptor<TradeTransaction>(new EventFactory<TradeTransaction>() {
            @Override
            public TradeTransaction newInstance() {
                return new TradeTransaction();
            }
        }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());

        //ʹ��disruptor������������C1,C2
        EventHandlerGroup<TradeTransaction> handlerGroup=disruptor.handleEventsWith(new TradeTransactionVasConsumer(),new TradeTransactionInDBHandler());

        TradeTransactionJMSNotifyHandler jmsConsumer=new TradeTransactionJMSNotifyHandler();
        //������C1,C2����֮��ִ��JMS��Ϣ���Ͳ��� Ҳ���������ߵ�C3
        handlerGroup.then(jmsConsumer);


        disruptor.start();//����
        CountDownLatch latch=new CountDownLatch(1);
        //������׼��
        executor.submit(new TradeTransactionPublisher(latch, disruptor));
        latch.await();//�ȴ�����������.
        disruptor.shutdown();
        executor.shutdown();

        System.out.println("�ܺ�ʱ:"+(System.currentTimeMillis()-beginTime));
//      long tt= System.currentTimeMillis();
//      for (int i = 0; i < 1000; i++) {
//          int j=i;
//      }
//      System.out.println("�ܺ�ʱ:"+(System.currentTimeMillis()-tt));

    }
项目:reveno    文件:DisruptorPipeProcessor.java   
protected void attachHandlers(Disruptor<T> disruptor) {
    List<EventHandler<T>[]> disruptorHandlers = handlers.stream()
            .<EventHandler<T>[]> map(this::convert)
            .collect(Collectors.toList());

    EventHandlerGroup<T> h = disruptor.handleEventsWith(disruptorHandlers.get(0));
    for (int i = 1; i < disruptorHandlers.size(); i++)
        h = h.then(disruptorHandlers.get(i));
}
项目:hologram    文件:DisruptorTests.java   
public static void main(String[] args) throws Exception
{
    // Executor that will be used to construct new threads for consumers
    Executor executor = Executors.newCachedThreadPool();

    // Specify the size of the ring buffer, must be power of 2.
    int bufferSize = 1024;

    // Construct the Disruptor
    Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);

    // Connect the handler
    disruptor.handleEventsWith(DisruptorTests::handleEvent);

    EventHandlerGroup<LongEvent> g = disruptor.handleEventsWith(DisruptorTests::handleEvent);



    // Start the Disruptor, starts all threads running
    disruptor.start();

    // Get the ring buffer from the Disruptor to be used for publishing.
    RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

    //LongEventProducer producer = new LongEventProducer(ringBuffer);

    ByteBuffer bb = ByteBuffer.allocate(8);
    for (long l = 0; true; l++)
    {
        bb.putLong(0, l);
        ringBuffer.publishEvent(DisruptorTests::translate, bb);
        Thread.sleep(1000);
    }
}