Java 类com.lmax.disruptor.EventHandler 实例源码

项目:message-broker    文件:SharedMessageStore.java   
/**
 * Builds the store and starts its Disruptor pipeline.
 * <p>
 * The pipeline has three sequential stages: a {@code DbEventMatcher}, then a
 * {@code DbWriter}, then a final handler that calls {@code clear()} on every event.
 *
 * @param messageDao     DAO handed to the {@code DbWriter} stage and kept by this store
 * @param bufferSize     ring buffer size; also passed to the {@code DbEventMatcher}
 * @param maxDbBatchSize batch size limit passed to the {@code DbWriter}
 */
@SuppressWarnings("unchecked")
public SharedMessageStore(MessageDao messageDao, int bufferSize, int maxDbBatchSize) {
    pendingMessages = new ConcurrentHashMap<>();

    // Give the disruptor threads a recognisable name.
    ThreadFactory storeThreadFactory =
            new ThreadFactoryBuilder().setNameFormat("DisruptorMessageStoreThread-%d").build();

    disruptor = new Disruptor<>(
            DbOperation.getFactory(),
            bufferSize,
            storeThreadFactory,
            ProducerType.MULTI,
            new SleepingWaitStrategy());
    disruptor.setDefaultExceptionHandler(new LogExceptionHandler());

    // Last stage: reset each event once the earlier stages are done with it.
    EventHandler<DbOperation> eventCleaner = (event, sequence, endOfBatch) -> event.clear();

    disruptor.handleEventsWith(new DbEventMatcher(bufferSize))
            .then(new DbWriter(messageDao, maxDbBatchSize))
            .then(eventCleaner);
    disruptor.start();

    this.messageDao = messageDao;
}
项目:disruptor-code-analysis    文件:MultiBufferBatchEventProcessor.java   
/**
 * Creates a processor that drives a single handler from several data providers.
 * <p>
 * One {@link Sequence}, initialised to {@code -1}, is allocated per provider.
 *
 * @param providers the data providers to read events from
 * @param barriers  the sequence barriers; must have exactly as many entries as
 *                  {@code providers} (presumably paired by index - confirm against {@code run()})
 * @param handler   the handler that receives the events
 * @throws IllegalArgumentException if {@code providers} and {@code barriers} differ in length
 */
public MultiBufferBatchEventProcessor(
    DataProvider<T>[] providers,
    SequenceBarrier[] barriers,
    EventHandler<T> handler)
{
    if (providers.length != barriers.length)
    {
        // Report both lengths so a mis-wired caller can see the mismatch at once.
        throw new IllegalArgumentException(
            "providers and barriers must have the same length, but got "
                + providers.length + " providers and " + barriers.length + " barriers");
    }

    this.providers = providers;
    this.barriers = barriers;
    this.handler = handler;

    this.sequences = new Sequence[providers.length];
    for (int i = 0; i < sequences.length; i++)
    {
        // Each provider gets its own, independent sequence instance.
        sequences[i] = new Sequence(-1);
    }
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * Registers two delayed handlers separately, then makes a third handler depend on
 * both of them by joining them with {@code after(...).and(...)}.
 */
@Test
public void shouldWaitOnAllProducersJoinedByAnd()
    throws Exception
{
    DelayedEventHandler firstProducer = createDelayedEventHandler();
    DelayedEventHandler secondProducer = createDelayedEventHandler();

    CountDownLatch processedLatch = new CountDownLatch(2);
    EventHandler<TestEvent> dependentHandler = new EventHandlerStub<TestEvent>(processedLatch);

    // The two producers are registered independently of each other ...
    disruptor.handleEventsWith(firstProducer);
    final EventHandlerGroup<TestEvent> secondProducerGroup = disruptor.handleEventsWith(secondProducer);

    // ... and the dependent handler is gated on both of them.
    disruptor.after(firstProducer).and(secondProducerGroup).handleEventsWith(dependentHandler);

    ensureTwoEventsProcessedAccordingToDependencies(processedLatch, firstProducer, secondProducer);
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * Registers a hand-built {@code BatchEventProcessor} with the disruptor and makes a
 * plain handler depend on it via {@code after(processor)}.
 */
@Test
public void shouldSupportCustomProcessorsAsDependencies()
    throws Exception
{
    RingBuffer<TestEvent> buffer = disruptor.getRingBuffer();

    final DelayedEventHandler upstreamHandler = createDelayedEventHandler();

    CountDownLatch processedLatch = new CountDownLatch(2);
    EventHandler<TestEvent> dependentHandler = new EventHandlerStub<TestEvent>(processedLatch);

    // Custom processor wrapping the delayed handler, gated only by the ring buffer's own barrier.
    final BatchEventProcessor<TestEvent> customProcessor =
        new BatchEventProcessor<TestEvent>(buffer, buffer.newBarrier(), upstreamHandler);

    disruptor.handleEventsWith(customProcessor);
    disruptor.after(customProcessor).handleEventsWith(dependentHandler);

    ensureTwoEventsProcessedAccordingToDependencies(processedLatch, upstreamHandler);
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * Registers a delayed handler, then builds a custom {@code BatchEventProcessor} whose
 * barrier is derived from that handler via {@code after(...).asSequenceBarrier()}.
 */
@Test
public void shouldSupportHandlersAsDependenciesToCustomProcessors()
    throws Exception
{
    final DelayedEventHandler upstreamHandler = createDelayedEventHandler();
    disruptor.handleEventsWith(upstreamHandler);

    RingBuffer<TestEvent> buffer = disruptor.getRingBuffer();
    CountDownLatch processedLatch = new CountDownLatch(2);
    EventHandler<TestEvent> dependentHandler = new EventHandlerStub<TestEvent>(processedLatch);

    // The custom processor waits on a barrier that tracks the upstream handler.
    final SequenceBarrier upstreamBarrier = disruptor.after(upstreamHandler).asSequenceBarrier();
    final BatchEventProcessor<TestEvent> customProcessor =
        new BatchEventProcessor<TestEvent>(buffer, upstreamBarrier, dependentHandler);
    disruptor.handleEventsWith(customProcessor);

    ensureTwoEventsProcessedAccordingToDependencies(processedLatch, upstreamHandler);
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * Makes a final handler depend on both a registered handler and a custom
 * {@code BatchEventProcessor}, joined with {@code after(handler).and(processor)}.
 */
@Test
public void shouldSupportCustomProcessorsAndHandlersAsDependencies() throws Exception
{
    final DelayedEventHandler registeredHandler = createDelayedEventHandler();
    final DelayedEventHandler processorHandler = createDelayedEventHandler();
    disruptor.handleEventsWith(registeredHandler);

    RingBuffer<TestEvent> buffer = disruptor.getRingBuffer();
    CountDownLatch processedLatch = new CountDownLatch(2);
    EventHandler<TestEvent> dependentHandler = new EventHandlerStub<TestEvent>(processedLatch);

    // Custom processor runs the second delayed handler behind the first one.
    final SequenceBarrier upstreamBarrier = disruptor.after(registeredHandler).asSequenceBarrier();
    final BatchEventProcessor<TestEvent> customProcessor =
        new BatchEventProcessor<TestEvent>(buffer, upstreamBarrier, processorHandler);

    disruptor.after(registeredHandler).and(customProcessor).handleEventsWith(dependentHandler);

    ensureTwoEventsProcessedAccordingToDependencies(processedLatch, registeredHandler, processorHandler);
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * Registers a first-stage processor through an {@code EventProcessorFactory} and checks,
 * inside the factory, that it is handed no barrier sequences.
 */
@Test
public void shouldMakeEntriesAvailableToFirstCustomProcessorsImmediately() throws Exception
{
    final CountDownLatch processedLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> eventHandler = new EventHandlerStub<TestEvent>(processedLatch);

    disruptor.handleEventsWith(
        new EventProcessorFactory<TestEvent>()
        {
            @Override
            public EventProcessor createEventProcessor(
                final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences)
            {
                // A first-stage processor has nothing upstream to wait for.
                assertEquals("Should not have had any barrier sequences", 0, barrierSequences.length);
                return new BatchEventProcessor<TestEvent>(
                    disruptor.getRingBuffer(),
                    ringBuffer.newBarrier(barrierSequences),
                    eventHandler);
            }
        });

    ensureTwoEventsProcessedAccordingToDependencies(processedLatch);
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * Chains a factory-created processor behind a delayed handler and checks, inside the
 * factory, that exactly one barrier sequence (the upstream handler's) is supplied.
 */
@Test
public void shouldHonourDependenciesForCustomProcessors() throws Exception
{
    final CountDownLatch countDownLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> eventHandler = new EventHandlerStub<TestEvent>(countDownLatch);
    final DelayedEventHandler delayedEventHandler = createDelayedEventHandler();

    disruptor.handleEventsWith(delayedEventHandler).then(
        new EventProcessorFactory<TestEvent>()
        {
            @Override
            public EventProcessor createEventProcessor(
                final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences)
            {
                // Compare by value: assertSame would box both ints and compare object
                // identity, which only passes by accident of the Integer cache.
                assertEquals("Should have had a barrier sequence", 1, barrierSequences.length);
                return new BatchEventProcessor<TestEvent>(
                    disruptor.getRingBuffer(), ringBuffer.newBarrier(
                    barrierSequences), eventHandler);
            }
        });

    ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler);
}
项目:cep    文件:Source.java   
/**
 * Creates a new source and connects it to its own, already started Disruptor.
 * <p>The constructor stores the collaborators, starts a Disruptor that delivers events
 * to {@code eventHandler}, creates the event producer on top of that Disruptor's ring
 * buffer and finally calls {@code prepare()}.
 *
 * @param parsersManager Instance of ParsersManager kept by this source instance
 * @param eventHandler Instance of EventHandler registered as the consumer of this source's ring buffer
 * @param properties Map of properties associated with this source
 */
public Source(ParsersManager parsersManager, EventHandler eventHandler, Map<String, Object> properties) {
    // Remember the collaborators for later use
    this.properties = properties;
    this.parsersManager = parsersManager;

    // Build the ring buffer for this source, attach the consumer and start it
    Disruptor<MapEvent> eventDisruptor = new Disruptor<>(
            new MapEventFactory(),
            ConfigData.getRingBufferSize(),
            Executors.newCachedThreadPool());
    eventDisruptor.handleEventsWith(eventHandler);
    eventDisruptor.start();

    // The producer publishes this source's events into the ring buffer
    eventProducer = new EventProducer(eventDisruptor.getRingBuffer());

    // NOTE(review): prepare() runs before any subclass constructor body - confirm overrides tolerate that
    prepare();
}
项目:jstrom    文件:DisruptorTest.java   
/**
 * Pushes one element into a small MULTI-producer queue, runs a producer and a consumer,
 * and asserts that the consumer's handler was invoked at least once.
 */
@Test
public void testLaterStartConsumer() throws InterruptedException {
    System.out.println("!!!!!!!!!!!!!!!Begin testLaterStartConsumer!!!!!!!!!!");
    final AtomicBoolean consumedFlag = new AtomicBoolean(false);

    // Keep the queue tiny (size argument 2) so the RingBuffer fills up quickly
    // and the blocking path is exercised
    DisruptorQueue smallQueue = createQueue("consumerHang", ProducerType.MULTI, 2);
    push(smallQueue, 1);

    Runnable producerTask = new Producer(smallQueue);

    // Handler flags the first delivery and prints a running count
    EventHandler<Object> countingHandler = new EventHandler<Object>() {
        long count = 0;

        @Override
        public void onEvent(Object obj, long sequence, boolean endOfBatch) throws Exception {
            consumedFlag.set(true);
            System.out.println("Consume " + count++);
        }
    };
    Runnable consumerTask = new Consumer(smallQueue, countingHandler);

    run(producerTask, 0, 0, consumerTask, 50);
    Assert.assertTrue("disruptor message is never consumed due to consumer thread hangs", consumedFlag.get());

    System.out.println("!!!!!!!!!!!!!!!!End testLaterStartConsumer!!!!!!!!!!");
}
项目:jstrom    文件:DisruptorTest.java   
/**
 * Pushes one element into a size-1 SINGLE-producer queue, runs a producer and a
 * consumer, and asserts that the consumer's handler was invoked at least once.
 */
@Test
public void testSingleProducer() throws InterruptedException {
    System.out.println("!!!!!!!!!!!!!!Begin testSingleProducer!!!!!!!!!!!!!!");
    final AtomicBoolean consumedFlag = new AtomicBoolean(false);

    // Queue size 1: the RingBuffer fills up immediately, which exercises
    // the blocking path
    DisruptorQueue smallQueue = createQueue("consumerHang", ProducerType.SINGLE, 1);
    push(smallQueue, 1);

    Runnable producerTask = new Producer(smallQueue);

    // Handler flags the first delivery and prints a running count
    EventHandler<Object> countingHandler = new EventHandler<Object>() {
        long count = 0;

        @Override
        public void onEvent(Object obj, long sequence, boolean endOfBatch) throws Exception {
            consumedFlag.set(true);
            System.out.println("Consume " + count++);
        }
    };
    Runnable consumerTask = new Consumer(smallQueue, countingHandler);

    run(producerTask, 0, 0, consumerTask, 50);
    Assert.assertTrue("disruptor message is never consumed due to consumer thread hangs", consumedFlag.get());

    System.out.println("!!!!!!!!!!!!!!End testSingleProducer!!!!!!!!!!!!!!");
}
项目:disruptor-spring-manager    文件:DefaultDisruptorConfigTest.java   
/**
 * Topology under test:
 * Publisher -> Ring buffer ---> Consumer A
 * Look at the graph that gets printed by log4j.
 */
@Test
public void test_publish_single_eventprocessor_topology() {
    ConsumerA rootConsumer = new ConsumerA();

    EventHandlerChain<String> singleStageChain =
            new EventHandlerChain<String>(new EventHandler[]{rootConsumer});

    disruptorConfig.setEventHandlerChain(new EventHandlerChain[]{singleStageChain});
    disruptorConfig.init();

    EventTranslator<String> greetingTranslator = new EventTranslator<String>() {

        @Override
        public void translateTo(String event, long sequence) {
            // NOTE(review): this only rebinds the local parameter; the caller's
            // event object is untouched - confirm that is intended.
            event = "hi there";
        }
    };
    disruptorConfig.publish(greetingTranslator);
}
项目:disruptor-spring-manager    文件:DefaultDisruptorConfigTest.java   
/**
 * Topology under test:
 * Publisher -> Ring buffer ---> Consumer A -> Consumer B1 -> Consumer D
 * Look at the graph that gets printed by log4j.
 */
@Test
public void test_publish_simple_eventprocessor_topology() {
    ConsumerA stageA = new ConsumerA();
    ConsumerB1 stageB1 = new ConsumerB1();
    ConsumerD stageD = new ConsumerD();

    // A -> B1
    EventHandlerChain<String> aToB1 =
            new EventHandlerChain<String>(new EventHandler[]{stageA}, new EventHandler[]{stageB1});
    // B1 -> D
    EventHandlerChain<String> b1ToD =
            new EventHandlerChain<String>(new EventHandler[]{stageB1}, new EventHandler[]{stageD});

    disruptorConfig.setEventHandlerChain(new EventHandlerChain[]{aToB1, b1ToD});
    disruptorConfig.init();

    EventTranslator<String> greetingTranslator = new EventTranslator<String>() {

        @Override
        public void translateTo(String event, long sequence) {
            // NOTE(review): this only rebinds the local parameter; the caller's
            // event object is untouched - confirm that is intended.
            event = "hi there";
        }
    };
    disruptorConfig.publish(greetingTranslator);
}
项目:disruptor-spring-manager    文件:DefaultDisruptorConfigTest.java   
/**
 * Topology under test:
 *
 *                                            Consumer B1
 *                                           /           \
 * Publisher -> Ring buffer ---> Consumer A -             -> Consumer D
 *                                           \           /
 *                                            Consumer B2
 *
 * Look at the graph that gets printed by log4j.
 */
@Test
public void test_publish_diamond_eventprocessor_topology() {
    ConsumerA stageA = new ConsumerA();
    ConsumerB1 stageB1 = new ConsumerB1();
    ConsumerB2 stageB2 = new ConsumerB2();
    ConsumerD stageD = new ConsumerD();

    // A fans out to B1 and B2
    EventHandlerChain<String> fanOut =
            new EventHandlerChain<String>(new EventHandler[]{stageA}, new EventHandler[]{stageB1, stageB2});
    // B1 and B2 join into D
    EventHandlerChain<String> fanIn =
            new EventHandlerChain<String>(new EventHandler[]{stageB1, stageB2}, new EventHandler[]{stageD});

    disruptorConfig.setEventHandlerChain(new EventHandlerChain[]{fanOut, fanIn});
    disruptorConfig.init();

    EventTranslator<String> greetingTranslator = new EventTranslator<String>() {

        @Override
        public void translateTo(String event, long sequence) {
            // NOTE(review): this only rebinds the local parameter; the caller's
            // event object is untouched - confirm that is intended.
            event = "hi there";
        }
    };
    disruptorConfig.publish(greetingTranslator);
}
项目:disruptor-spring-manager    文件:DefaultDisruptorConfigTest.java   
/**
 * Topology under test:
 *
 *                                            Consumer B1 -> Consumer C1
 *                                           /                          \
 * Publisher -> Ring buffer ---> Consumer A -                            -> Consumer D
 *                                           \                          /
 *                                            Consumer B2 -> Consumer C2
 *
 * Look at the graph that gets printed by log4j.
 */
@Test
public void test_publish_complicated_diamond_eventprocessor_topology() {
    ConsumerA stageA = new ConsumerA();
    ConsumerB1 stageB1 = new ConsumerB1();
    ConsumerB2 stageB2 = new ConsumerB2();
    ConsumerC1 stageC1 = new ConsumerC1();
    ConsumerC2 stageC2 = new ConsumerC2();
    ConsumerD stageD = new ConsumerD();

    // A fans out to B1 and B2
    EventHandlerChain<String> fanOut =
            new EventHandlerChain<String>(new EventHandler[]{stageA}, new EventHandler[]{stageB1, stageB2});
    // Upper branch: B1 -> C1
    EventHandlerChain<String> upperBranch =
            new EventHandlerChain<String>(new EventHandler[]{stageB1}, new EventHandler[]{stageC1});
    // Lower branch: B2 -> C2
    EventHandlerChain<String> lowerBranch =
            new EventHandlerChain<String>(new EventHandler[]{stageB2}, new EventHandler[]{stageC2});
    // C1 and C2 join into D
    EventHandlerChain<String> fanIn =
            new EventHandlerChain<String>(new EventHandler[]{stageC1, stageC2}, new EventHandler[]{stageD});

    disruptorConfig.setEventHandlerChain(new EventHandlerChain[]{fanOut, upperBranch, lowerBranch, fanIn});
    disruptorConfig.init();

    EventTranslator<String> greetingTranslator = new EventTranslator<String>() {

        @Override
        public void translateTo(String event, long sequence) {
            // NOTE(review): this only rebinds the local parameter; the caller's
            // event object is untouched - confirm that is intended.
            event = "hi there";
        }
    };
    disruptorConfig.publish(greetingTranslator);
}
项目:incubator-storm    文件:DisruptorQueue.java   
/**
 * Delivers every buffered entry from the consumer's current position (exclusive) up to
 * {@code cursor} (inclusive) to {@code handler}, then moves the consumer to {@code cursor}.
 * <p>
 * Each slot is emptied as it is read. A {@code FLUSH_CACHE} marker drains {@code _cache}
 * into the handler; an {@code INTERRUPT} marker aborts processing. Any exception is
 * rethrown wrapped in a {@link RuntimeException}, in which case the consumer position
 * is not advanced.
 *
 * @param cursor  highest sequence to consume
 * @param handler receiver of the consumed objects
 */
private void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
    long curr = _consumer.get() + 1;
    while (curr <= cursor) {
        try {
            MutableObject slot = _buffer.get(curr);
            Object payload = slot.o;
            // Release the reference held by the ring buffer slot.
            slot.setObject(null);

            if (payload == FLUSH_CACHE) {
                // Hand over everything that was parked in the cache.
                Object cached;
                while ((cached = _cache.poll()) != null) {
                    handler.onEvent(cached, curr, true);
                }
            } else if (payload == INTERRUPT) {
                throw new InterruptedException("Disruptor processing interrupted");
            } else {
                handler.onEvent(payload, curr, curr == cursor);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        curr++;
    }
    //TODO: only set this if the consumer cursor has changed?
    _consumer.set(cursor);
}
项目:storm-resa    文件:DisruptorQueue.java   
/**
 * Delivers every buffered entry from the consumer's current position (exclusive) up to
 * {@code cursor} (inclusive) to {@code handler}, then moves the consumer to {@code cursor}.
 * <p>
 * Each slot is emptied as it is read. A {@code FLUSH_CACHE} marker drains {@code _cache}
 * into the handler; an {@code INTERRUPT} marker aborts processing. Any exception is
 * rethrown wrapped in a {@link RuntimeException}, in which case the consumer position
 * is not advanced.
 *
 * @param cursor  highest sequence to consume
 * @param handler receiver of the consumed objects
 */
private void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
    long curr = _consumer.get() + 1;
    while (curr <= cursor) {
        try {
            MutableObject slot = _buffer.get(curr);
            Object payload = slot.o;
            // Release the reference held by the ring buffer slot.
            slot.setObject(null);

            if (payload == FLUSH_CACHE) {
                // Hand over everything that was parked in the cache.
                Object cached;
                while ((cached = _cache.poll()) != null) {
                    handler.onEvent(cached, curr, true);
                }
            } else if (payload == INTERRUPT) {
                throw new InterruptedException("Disruptor processing interrupted");
            } else {
                handler.onEvent(payload, curr, curr == cursor);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        curr++;
    }
    //TODO: only set this if the consumer cursor has changed?
    _consumer.set(cursor);
}
项目:Surf    文件:DumpStream.java   
/**
 * Entry point: prints the data of every Kinesis event it receives.
 *
 * @param args exactly one argument, the path of a readable property file containing
 *             aws-access-key-id, aws-secret-key and aws-kinesis-stream-name
 */
public static void main(String []args) throws Exception{
    // The single argument is the same kind of property file Surf needs for Kinesis:
    // aws-access-key-id: <your-access-id>
    // aws-secret-key: <your-secret-key>
    // aws-kinesis-stream-name: <your-stream-name>
    if (args.length != 1){
        errorExit();
    }
    File propertyFile = new File(args[0]);
    if(!(propertyFile.isFile() && propertyFile.canRead())){
        errorExit();
    }

    // Disruptor consumer: dump each event to stdout
    final EventHandler<KinesisEvent> printingHandler = new EventHandler<KinesisEvent>() {
        @Override
        public void onEvent(KinesisEvent kinesisEvent, long sequence, boolean endOfBatch) throws Exception {
            String line = String.format("Received : %s", kinesisEvent.getData());
            System.out.println(line);
        }
    };
    Worker kinesisWorker = Util.createWorker(propertyFile, printingHandler, "DumpStream");
    kinesisWorker.run();
}
项目:Surf    文件:Util.java   
/**
 * Builds a Kinesis {@code Worker} whose records are published into a Disruptor ring
 * buffer consumed by {@code handler}.
 * <p>
 * The property file is read and closed, and the Kinesis configuration is built, before
 * the Disruptor is started. A configuration failure therefore leaves no running
 * Disruptor threads behind.
 *
 * @param conf    property file providing aws-access-key-id, aws-secret-key and
 *                aws-kinesis-stream-name
 * @param handler consumer registered on the Disruptor
 * @param appName application name passed to the Kinesis client library configuration
 * @return the configured (not yet running) worker
 * @throws IOException if the property file cannot be read or the local host name
 *                     cannot be resolved
 */
public static Worker createWorker(File conf, EventHandler<KinesisEvent> handler, String appName)throws IOException{
    // Load the configuration first and always close the file, even if load() fails.
    Properties props = new Properties();
    try (FileReader reader = new FileReader(conf)) {
        props.load(reader);
    }

    // Generate a unique worker ID
    String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
    String accessid = props.getProperty("aws-access-key-id");
    String secretkey = props.getProperty("aws-secret-key");
    String streamname = props.getProperty("aws-kinesis-stream-name");
    BasicAWSCredentials creds = new BasicAWSCredentials(accessid, secretkey);
    CredProvider credprovider = new CredProvider(creds);
    KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(appName, streamname,  credprovider, workerId);

    // Only now start the Disruptor: everything that can fail has already succeeded.
    Executor executor = Executors.newCachedThreadPool();
    Disruptor<KinesisEvent> disruptor = new Disruptor<>(KinesisEvent.EVENT_FACTORY, 128, executor);
    disruptor.handleEventsWith(handler);
    RingBuffer<KinesisEvent> buffer = disruptor.start();

    return new Worker(new RecordProcessorFactory(buffer), config, new MetricsFactory());
}
项目:Surf    文件:PageCount.java   
/**
 * Entry point: runs a Kinesis worker whose events are consumed by a {@code PageCountHandler}.
 *
 * @param args exactly one argument, the path of a readable property file containing
 *             aws-access-key-id, aws-secret-key and aws-kinesis-stream-name
 */
public static void main( String[] args ) throws Exception
{
    // The single argument is the same kind of property file Surf needs for Kinesis:
    // aws-access-key-id: <your-access-id>
    // aws-secret-key: <your-secret-key>
    // aws-kinesis-stream-name: <your-stream-name>
    if (args.length != 1){
        errorExit();
    }
    File propertyFile = new File(args[0]);
    if(!(propertyFile.isFile() && propertyFile.canRead())){
        errorExit();
    }

    // Disruptor consumer
    final EventHandler<KinesisEvent> pageCountHandler = new PageCountHandler();
    Worker kinesisWorker = Util.createWorker(propertyFile, pageCountHandler, "PageCount");
    kinesisWorker.run();
}
项目:couchbase-jvm-core    文件:AbstractGenericHandlerTest.java   
/**
 * Creates and starts the response Disruptor used by the tests. Its single handler
 * records every event's message in {@code firedEvents} and counts down {@code latch}.
 */
@Before
@SuppressWarnings("unchecked")
public void setup() {
    EventFactory<ResponseEvent> responseEventFactory = new EventFactory<ResponseEvent>() {
        @Override
        public ResponseEvent newInstance() {
            return new ResponseEvent();
        }
    };
    responseBuffer = new Disruptor<ResponseEvent>(responseEventFactory, 1024, Executors.newCachedThreadPool());

    firedEvents = Collections.synchronizedList(new ArrayList<CouchbaseMessage>());
    latch = new CountDownLatch(1);

    // Capture each response message and signal the waiting test.
    EventHandler<ResponseEvent> recordingHandler = new EventHandler<ResponseEvent>() {
        @Override
        public void onEvent(ResponseEvent event, long sequence, boolean endOfBatch) throws Exception {
            firedEvents.add(event.getMessage());
            latch.countDown();
        }
    };
    responseBuffer.handleEventsWith(recordingHandler);
    responseRingBuffer = responseBuffer.start();
}
项目:log4j2    文件:AsyncLoggerConfigHelper.java   
/**
 * Creates and starts the shared disruptor unless one already exists.
 * <p>
 * A new disruptor gets a single-thread executor, a multi-producer ring buffer, one
 * {@code Log4jEventWrapperHandler}, and the exception handler from
 * {@code getExceptionHandler()}.
 */
private static synchronized void initDisruptor() {
    if (disruptor != null) {
        LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor, using existing object. Ref count is {}.", count);
        return;
    }
    LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor. Ref count is {}.", count);

    final int bufferSize = calculateRingBufferSize();
    final WaitStrategy strategy = createWaitStrategy();
    executor = Executors.newSingleThreadExecutor(threadFactory);
    disruptor = new Disruptor<Log4jEventWrapper>(
            FACTORY, bufferSize, executor, ProducerType.MULTI, strategy);

    final EventHandler<Log4jEventWrapper>[] eventHandlers = new Log4jEventWrapperHandler[] {
        new Log4jEventWrapperHandler()
    };
    final ExceptionHandler exceptionHandler = getExceptionHandler();

    // The exception handler is installed before the event handlers are registered.
    disruptor.handleExceptionsWith(exceptionHandler);
    disruptor.handleEventsWith(eventHandlers);

    LOGGER.debug(
            "Starting AsyncLoggerConfig disruptor with ringbuffer size={}, waitStrategy={}, exceptionHandler={}...",
            disruptor.getRingBuffer().getBufferSize(), strategy.getClass().getSimpleName(), exceptionHandler);
    disruptor.start();
}
项目:jstorm    文件:DisruptorTest.java   
/**
 * Pushes one element into a size-1 SINGLE-producer queue, runs a producer and a
 * consumer, and asserts that the consumer's handler was invoked at least once.
 */
@Test
public void testSingleProducer() throws InterruptedException {
    System.out.println("!!!!!!!!!!!!!!Begin testSingleProducer!!!!!!!!!!!!!!");
    final AtomicBoolean consumedFlag = new AtomicBoolean(false);

    // Queue size 1: the RingBuffer fills up immediately, which exercises
    // the blocking path
    DisruptorQueue smallQueue = createQueue("consumerHang", ProducerType.SINGLE, 1);
    push(smallQueue, 1);

    Runnable producerTask = new Producer(smallQueue);

    // Handler flags the first delivery and prints a running count
    EventHandler<Object> countingHandler = new EventHandler<Object>() {
        long count = 0;

        @Override
        public void onEvent(Object obj, long sequence, boolean endOfBatch) throws Exception {
            consumedFlag.set(true);
            System.out.println("Consume " + count++);
        }
    };
    Runnable consumerTask = new Consumer(smallQueue, countingHandler);

    run(producerTask, 0, 0, consumerTask, 50);
    Assert.assertTrue("disruptor message is never consumed due to consumer thread hangs", consumedFlag.get());

    System.out.println("!!!!!!!!!!!!!!End testSingleProducer!!!!!!!!!!!!!!");
}
项目:jaf-examples    文件:Main.java   
/**
 * Demo entry point: builds a single-producer Disruptor of {@code HelloEvent}s with a
 * yielding wait strategy, registers the handler and starts it.
 */
public static void main(String[] args) {
    // RingBuffer size; must be a power of two.
    int ringBufferSize = 1024 * 1024;
    EventFactory<HelloEvent> helloFactory = new HelloEventFactory();

    Disruptor<HelloEvent> helloDisruptor = new Disruptor<HelloEvent>(
            helloFactory,
            ringBufferSize,
            Executors.defaultThreadFactory(),
            ProducerType.SINGLE,
            new YieldingWaitStrategy());

    // NOTE(review): the same handler instance is registered twice - confirm this is intended.
    EventHandler<HelloEvent> helloHandler = new HelloEventHandler();
    helloDisruptor.handleEventsWith(helloHandler, helloHandler);

    helloDisruptor.start();
}
项目:disruptor-code-analysis    文件:Disruptor.java   
/**
 * Wraps each handler in a {@code BatchEventProcessor} that waits on a barrier built
 * from {@code barrierSequences}, and registers it with the consumer repository.
 *
 * @param barrierSequences sequences the new processors must wait for
 * @param eventHandlers    handlers to wrap, one processor per handler
 * @return a group exposing the sequences of the newly created processors
 */
EventHandlerGroup<T> createEventProcessors(
        final Sequence[] barrierSequences,
        final EventHandler<? super T>[] eventHandlers) {
    checkNotStarted();

    final int handlerCount = eventHandlers.length;
    final Sequence[] processorSequences = new Sequence[handlerCount];
    // All processors created here share one barrier.
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    int index = 0;
    for (final EventHandler<? super T> eventHandler : eventHandlers) {
        final BatchEventProcessor<T> processor =
                new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);

        if (exceptionHandler != null) {
            processor.setExceptionHandler(exceptionHandler);
        }

        consumerRepository.add(processor, eventHandler, barrier);
        processorSequences[index++] = processor.getSequence();
    }

    // With at least one new processor, the barrier sequences are no longer end-of-chain.
    if (handlerCount > 0) {
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }

    return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}
项目:disruptor-code-analysis    文件:EventProcessorInfo.java   
/**
 * Bundles an event processor with the handler it runs and the barrier it waits on.
 *
 * @param eventprocessor the processor
 * @param handler        the handler associated with the processor
 * @param barrier        the barrier associated with the processor
 */
EventProcessorInfo(
    final EventProcessor eventprocessor,
    final EventHandler<? super T> handler,
    final SequenceBarrier barrier)
{
    this.barrier = barrier;
    this.handler = handler;
    this.eventprocessor = eventprocessor;
}
项目:disruptor-code-analysis    文件:ExceptionHandlerSetting.java   
/**
 * Stores the handler and the consumer repository this setting will operate on.
 *
 * @param eventHandler       the event handler this setting refers to
 * @param consumerRepository the repository that tracks the disruptor's consumers
 */
ExceptionHandlerSetting(
    final EventHandler<T> eventHandler, final ConsumerRepository<T> consumerRepository)
{
    this.consumerRepository = consumerRepository;
    this.eventHandler = eventHandler;
}
项目:disruptor-code-analysis    文件:CustomRingBuffer.java   
/**
 * Creates a batch processor that reads from this buffer, adapts {@code handler} through
 * an {@code AccessorEventHandler}, and adds the processor's sequence to the sequencer's
 * gating sequences.
 *
 * @param handler the handler to adapt
 * @return the new processor (not started here)
 */
public BatchEventProcessor<EventAccessor<T>> createHandler(final EventHandler<T> handler)
{
    final BatchEventProcessor<EventAccessor<T>> accessorProcessor =
        new BatchEventProcessor<EventAccessor<T>>(
            this, sequencer.newBarrier(), new AccessorEventHandler<T>(handler));

    // Add the processor's sequence to the sequencer's gating sequences.
    sequencer.addGatingSequences(accessorProcessor.getSequence());
    return accessorProcessor;
}
项目:disruptor-code-analysis    文件:LongRingBuffer.java   
/**
 * Creates a batch processor that forwards the {@code long} value of each
 * {@code LongEvent} to {@code handler}.
 *
 * @param handler receiver of the unwrapped values
 * @return the new processor (not started here)
 */
public BatchEventProcessor<LongEvent> createProcessor(final LongHandler handler)
{
    // Adapter: unwrap the event and pass the raw value on.
    final EventHandler<LongEvent> unwrappingHandler = new EventHandler<LongEvent>()
    {
        @Override
        public void onEvent(final LongEvent event, final long sequence, final boolean endOfBatch)
            throws Exception
        {
            handler.onEvent(event.get(), sequence, endOfBatch);
        }
    };

    return new BatchEventProcessor<LongEvent>(new LongEvent(), sequencer.newBarrier(), unwrappingHandler);
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * Registers two first-stage handlers and checks that a group is returned and that
 * starting the disruptor submits two executions to the executor.
 */
@Test
public void shouldCreateEventProcessorGroupForFirstEventProcessors()
    throws Exception
{
    executor.ignoreExecutions();
    final EventHandler<TestEvent> firstHandler = new SleepingEventHandler();
    EventHandler<TestEvent> secondHandler = new SleepingEventHandler();

    final EventHandlerGroup<TestEvent> firstStageGroup =
        disruptor.handleEventsWith(firstHandler, secondHandler);
    disruptor.start();

    assertNotNull(firstStageGroup);
    // One execution per registered handler.
    assertThat(Integer.valueOf(executor.getExecutionCount()), equalTo(Integer.valueOf(2)));
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * Registers a delayed handler and a latch-counting stub side by side in the first
 * stage; the stub must see both events without waiting for the delayed handler.
 */
@Test
public void shouldMakeEntriesAvailableToFirstHandlersImmediately() throws Exception
{
    CountDownLatch processedLatch = new CountDownLatch(2);
    EventHandler<TestEvent> stubHandler = new EventHandlerStub<TestEvent>(processedLatch);

    // Both handlers are first-stage consumers; neither depends on the other.
    DelayedEventHandler delayedPeer = createDelayedEventHandler();
    disruptor.handleEventsWith(delayedPeer, stubHandler);

    ensureTwoEventsProcessedAccordingToDependencies(processedLatch);
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * Chains a latch-counting stub behind a delayed handler using {@code then(...)} and
 * verifies the events are processed in dependency order.
 */
@Test
public void shouldWaitUntilAllFirstEventProcessorsProcessEventBeforeMakingItAvailableToDependentEventProcessors()
    throws Exception
{
    DelayedEventHandler upstreamHandler = createDelayedEventHandler();

    CountDownLatch processedLatch = new CountDownLatch(2);
    EventHandler<TestEvent> downstreamHandler = new EventHandlerStub<TestEvent>(processedLatch);

    // downstream only sees an event once upstream has processed it
    disruptor.handleEventsWith(upstreamHandler).then(downstreamHandler);

    ensureTwoEventsProcessedAccordingToDependencies(processedLatch, upstreamHandler);
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * Registers two delayed handlers together and makes a third handler wait for both
 * of them through {@code after(handler1, handler2)}.
 */
@Test
public void shouldAllowSpecifyingSpecificEventProcessorsToWaitFor()
    throws Exception
{
    DelayedEventHandler firstUpstream = createDelayedEventHandler();
    DelayedEventHandler secondUpstream = createDelayedEventHandler();

    CountDownLatch processedLatch = new CountDownLatch(2);
    EventHandler<TestEvent> dependentHandler = new EventHandlerStub<TestEvent>(processedLatch);

    disruptor.handleEventsWith(firstUpstream, secondUpstream);
    // Explicitly name the handlers the dependent one has to wait for.
    disruptor.after(firstUpstream, secondUpstream).handleEventsWith(dependentHandler);

    ensureTwoEventsProcessedAccordingToDependencies(processedLatch, firstUpstream, secondUpstream);
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * Publishes one event and checks the ring buffer's remaining capacity: one slot short
 * while the handler is processing the event, full again once it has been consumed.
 */
@Test(timeout = 1000)
public void shouldTrackRemainingCapacity() throws Exception
{
    // Written by the handler thread and polled by the test thread. An AtomicLong gives
    // the cross-thread visibility that a plain long[] element does not guarantee.
    final java.util.concurrent.atomic.AtomicLong remainingCapacity =
        new java.util.concurrent.atomic.AtomicLong(-1);
    //Given
    final EventHandler<TestEvent> eventHandler = new EventHandler<TestEvent>()
    {
        @Override
        public void onEvent(final TestEvent event, final long sequence, final boolean endOfBatch) throws Exception
        {
            remainingCapacity.set(disruptor.getRingBuffer().remainingCapacity());
        }
    };

    disruptor.handleEventsWith(eventHandler);

    //When
    publishEvent();

    //Then
    // -1 is the "handler has not run yet" sentinel.
    while (remainingCapacity.get() == -1)
    {
        Thread.sleep(100);
    }
    assertThat(remainingCapacity.get(), is(ringBuffer.getBufferSize() - 1L));
    assertThat(disruptor.getRingBuffer().remainingCapacity(), is(ringBuffer.getBufferSize() - 0L));
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * Registers an {@code EventHandler<Object>} on the {@code TestEvent} disruptor to show
 * that handlers typed on a supertype of the event are accepted.
 */
@Test
public void shouldAllowEventHandlerWithSuperType() throws Exception
{
    final CountDownLatch processedLatch = new CountDownLatch(2);
    // Handler is typed on Object, a supertype of the event type.
    final EventHandler<Object> superTypeHandler = new EventHandlerStub<Object>(processedLatch);

    disruptor.handleEventsWith(superTypeHandler);

    ensureTwoEventsProcessedAccordingToDependencies(processedLatch);
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * Chains an {@code EventHandler<Object>} behind a delayed handler to show that
 * {@code then(...)} also accepts handlers typed on a supertype of the event.
 */
@Test
public void shouldAllowChainingEventHandlersWithSuperType() throws Exception
{
    final CountDownLatch processedLatch = new CountDownLatch(2);
    final DelayedEventHandler upstreamHandler = createDelayedEventHandler();
    // Handler is typed on Object, a supertype of the event type.
    final EventHandler<Object> superTypeHandler = new EventHandlerStub<Object>(processedLatch);

    disruptor.handleEventsWith(upstreamHandler).then(superTypeHandler);

    ensureTwoEventsProcessedAccordingToDependencies(processedLatch, upstreamHandler);
}
项目:jstorm-0.9.6.3-    文件:DisruptorTest.java   
/**
 * Pushes one element into a small MULTI-producer queue, runs a producer and a consumer,
 * and asserts that the consumer's handler was invoked at least once.
 */
@Test
public void testLaterStartConsumer() throws InterruptedException {
    System.out.println("!!!!!!!!!!!!!!!Begin testLaterStartConsumer!!!!!!!!!!");
    final AtomicBoolean consumedFlag = new AtomicBoolean(false);

    // Keep the queue tiny (size argument 2) so the RingBuffer fills up quickly
    // and the blocking path is exercised
    DisruptorQueue smallQueue = createQueue("consumerHang", ProducerType.MULTI, 2);
    push(smallQueue, 1);

    Runnable producerTask = new Producer(smallQueue);

    // Handler flags the first delivery and prints a running count
    EventHandler<Object> countingHandler = new EventHandler<Object>() {
        long count = 0;

        @Override
        public void onEvent(Object obj, long sequence, boolean endOfBatch) throws Exception {
            consumedFlag.set(true);
            System.out.println("Consume " + count++);
        }
    };
    Runnable consumerTask = new Consumer(smallQueue, countingHandler);

    run(producerTask, 0, 0, consumerTask, 50);
    Assert.assertTrue("disruptor message is never consumed due to consumer thread hangs", consumedFlag.get());

    System.out.println("!!!!!!!!!!!!!!!!End testLaterStartConsumer!!!!!!!!!!");
}
项目:jstorm-0.9.6.3-    文件:DisruptorTest.java   
/**
 * Marks the consumer as started before anything is pushed, then pushes one element
 * into a small MULTI-producer queue, runs a producer and a consumer, and asserts that
 * the consumer's handler was invoked at least once.
 */
@Test
public void testBeforeStartConsumer() throws InterruptedException {
    System.out.println("!!!!!!!!!!!!Begin testBeforeStartConsumer!!!!!!!!!");
    final AtomicBoolean consumedFlag = new AtomicBoolean(false);

    // Keep the queue tiny (size argument 2) so the RingBuffer fills up quickly
    // and the blocking path is exercised
    DisruptorQueue smallQueue = createQueue("consumerHang", ProducerType.MULTI, 2);
    // Unlike the "later start" variant, the consumer is flagged as started up front.
    smallQueue.consumerStarted();
    push(smallQueue, 1);

    Runnable producerTask = new Producer(smallQueue);

    // Handler flags the first delivery and prints a running count
    EventHandler<Object> countingHandler = new EventHandler<Object>() {
        long count = 0;

        @Override
        public void onEvent(Object obj, long sequence, boolean endOfBatch) throws Exception {
            consumedFlag.set(true);
            System.out.println("Consume " + count++);
        }
    };
    Runnable consumerTask = new Consumer(smallQueue, countingHandler);

    run(producerTask, 0, 0, consumerTask, 50);
    Assert.assertTrue("disruptor message is never consumed due to consumer thread hangs", consumedFlag.get());

    System.out.println("!!!!!!!!!!!!!End testBeforeStartConsumer!!!!!!!!!!");
}
项目:jstorm-0.9.6.3-    文件:DisruptorTest.java   
/**
 * Pushes one element into a size-1 SINGLE-producer queue, runs a producer and a
 * consumer, and asserts that the consumer's handler was invoked at least once.
 */
@Test
public void testSingleProducer() throws InterruptedException {
    System.out.println("!!!!!!!!!!!!!!Begin testSingleProducer!!!!!!!!!!!!!!");
    final AtomicBoolean consumedFlag = new AtomicBoolean(false);

    // Queue size 1: the RingBuffer fills up immediately, which exercises
    // the blocking path
    DisruptorQueue smallQueue = createQueue("consumerHang", ProducerType.SINGLE, 1);
    push(smallQueue, 1);

    Runnable producerTask = new Producer(smallQueue);

    // Handler flags the first delivery and prints a running count
    EventHandler<Object> countingHandler = new EventHandler<Object>() {
        long count = 0;

        @Override
        public void onEvent(Object obj, long sequence, boolean endOfBatch) throws Exception {
            consumedFlag.set(true);
            System.out.println("Consume " + count++);
        }
    };
    Runnable consumerTask = new Consumer(smallQueue, countingHandler);

    run(producerTask, 0, 0, consumerTask, 50);
    Assert.assertTrue("disruptor message is never consumed due to consumer thread hangs", consumedFlag.get());

    System.out.println("!!!!!!!!!!!!!!End testSingleProducer!!!!!!!!!!!!!!");
}
项目:learn_jstorm    文件:DisruptorTest.java   
/**
 * Pushes one element into a small MULTI-producer queue, runs a producer and a consumer,
 * and asserts that the consumer's handler was invoked at least once.
 */
@Test
public void testLaterStartConsumer() throws InterruptedException {
    System.out.println("!!!!!!!!!!!!!!!Begin testLaterStartConsumer!!!!!!!!!!");
    final AtomicBoolean consumedFlag = new AtomicBoolean(false);

    // Keep the queue tiny (size argument 2) so the RingBuffer fills up quickly
    // and the blocking path is exercised
    DisruptorQueue smallQueue = createQueue("consumerHang", ProducerType.MULTI, 2);
    push(smallQueue, 1);

    Runnable producerTask = new Producer(smallQueue);

    // Handler flags the first delivery and prints a running count
    EventHandler<Object> countingHandler = new EventHandler<Object>() {
        long count = 0;

        @Override
        public void onEvent(Object obj, long sequence, boolean endOfBatch) throws Exception {
            consumedFlag.set(true);
            System.out.println("Consume " + count++);
        }
    };
    Runnable consumerTask = new Consumer(smallQueue, countingHandler);

    run(producerTask, 0, 0, consumerTask, 50);
    Assert.assertTrue("disruptor message is never consumed due to consumer thread hangs", consumedFlag.get());

    System.out.println("!!!!!!!!!!!!!!!!End testLaterStartConsumer!!!!!!!!!!");
}