@SuppressWarnings("unchecked") public SharedMessageStore(MessageDao messageDao, int bufferSize, int maxDbBatchSize) { pendingMessages = new ConcurrentHashMap<>(); ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("DisruptorMessageStoreThread-%d").build(); disruptor = new Disruptor<>(DbOperation.getFactory(), bufferSize, namedThreadFactory, ProducerType.MULTI, new SleepingWaitStrategy()); disruptor.setDefaultExceptionHandler(new LogExceptionHandler()); disruptor.handleEventsWith(new DbEventMatcher(bufferSize)) .then(new DbWriter(messageDao, maxDbBatchSize)) .then((EventHandler<DbOperation>) (event, sequence, endOfBatch) -> event.clear()); disruptor.start(); this.messageDao = messageDao; }
public MultiBufferBatchEventProcessor( DataProvider<T>[] providers, SequenceBarrier[] barriers, EventHandler<T> handler) { if (providers.length != barriers.length) { throw new IllegalArgumentException(); } this.providers = providers; this.barriers = barriers; this.handler = handler; this.sequences = new Sequence[providers.length]; for (int i = 0; i < sequences.length; i++) { sequences[i] = new Sequence(-1); } }
@Test public void shouldWaitOnAllProducersJoinedByAnd() throws Exception { DelayedEventHandler handler1 = createDelayedEventHandler(); DelayedEventHandler handler2 = createDelayedEventHandler(); CountDownLatch countDownLatch = new CountDownLatch(2); EventHandler<TestEvent> handlerWithBarrier = new EventHandlerStub<TestEvent>(countDownLatch); disruptor.handleEventsWith(handler1); final EventHandlerGroup<TestEvent> handler2Group = disruptor.handleEventsWith(handler2); disruptor.after(handler1).and(handler2Group).handleEventsWith(handlerWithBarrier); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, handler1, handler2); }
@Test public void shouldSupportCustomProcessorsAsDependencies() throws Exception { RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer(); final DelayedEventHandler delayedEventHandler = createDelayedEventHandler(); CountDownLatch countDownLatch = new CountDownLatch(2); EventHandler<TestEvent> handlerWithBarrier = new EventHandlerStub<TestEvent>(countDownLatch); final BatchEventProcessor<TestEvent> processor = new BatchEventProcessor<TestEvent>(ringBuffer, ringBuffer.newBarrier(), delayedEventHandler); disruptor.handleEventsWith(processor); disruptor.after(processor).handleEventsWith(handlerWithBarrier); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler); }
@Test public void shouldSupportHandlersAsDependenciesToCustomProcessors() throws Exception { final DelayedEventHandler delayedEventHandler = createDelayedEventHandler(); disruptor.handleEventsWith(delayedEventHandler); RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer(); CountDownLatch countDownLatch = new CountDownLatch(2); EventHandler<TestEvent> handlerWithBarrier = new EventHandlerStub<TestEvent>(countDownLatch); final SequenceBarrier sequenceBarrier = disruptor.after(delayedEventHandler).asSequenceBarrier(); final BatchEventProcessor<TestEvent> processor = new BatchEventProcessor<TestEvent>(ringBuffer, sequenceBarrier, handlerWithBarrier); disruptor.handleEventsWith(processor); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler); }
@Test public void shouldSupportCustomProcessorsAndHandlersAsDependencies() throws Exception { final DelayedEventHandler delayedEventHandler1 = createDelayedEventHandler(); final DelayedEventHandler delayedEventHandler2 = createDelayedEventHandler(); disruptor.handleEventsWith(delayedEventHandler1); RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer(); CountDownLatch countDownLatch = new CountDownLatch(2); EventHandler<TestEvent> handlerWithBarrier = new EventHandlerStub<TestEvent>(countDownLatch); final SequenceBarrier sequenceBarrier = disruptor.after(delayedEventHandler1).asSequenceBarrier(); final BatchEventProcessor<TestEvent> processor = new BatchEventProcessor<TestEvent>(ringBuffer, sequenceBarrier, delayedEventHandler2); disruptor.after(delayedEventHandler1).and(processor).handleEventsWith(handlerWithBarrier); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler1, delayedEventHandler2); }
@Test public void shouldMakeEntriesAvailableToFirstCustomProcessorsImmediately() throws Exception { final CountDownLatch countDownLatch = new CountDownLatch(2); final EventHandler<TestEvent> eventHandler = new EventHandlerStub<TestEvent>(countDownLatch); disruptor.handleEventsWith( new EventProcessorFactory<TestEvent>() { @Override public EventProcessor createEventProcessor( final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences) { assertEquals("Should not have had any barrier sequences", 0, barrierSequences.length); return new BatchEventProcessor<TestEvent>( disruptor.getRingBuffer(), ringBuffer.newBarrier( barrierSequences), eventHandler); } }); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch); }
@Test public void shouldHonourDependenciesForCustomProcessors() throws Exception { final CountDownLatch countDownLatch = new CountDownLatch(2); final EventHandler<TestEvent> eventHandler = new EventHandlerStub<TestEvent>(countDownLatch); final DelayedEventHandler delayedEventHandler = createDelayedEventHandler(); disruptor.handleEventsWith(delayedEventHandler).then( new EventProcessorFactory<TestEvent>() { @Override public EventProcessor createEventProcessor( final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences) { assertSame("Should have had a barrier sequence", 1, barrierSequences.length); return new BatchEventProcessor<TestEvent>( disruptor.getRingBuffer(), ringBuffer.newBarrier( barrierSequences), eventHandler); } }); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler); }
/** * Create a new source. * <p>This method will prepare the instance with some needed variables * in order to be started later with the start method (implemented by children). * * @param parsersManager Instance of ParserManager that will serve parsers to this source instance * @param eventHandler Instance of EventHandler that will receive the events generated by this source instance * @param properties Map of properties associated with this source */ public Source(ParsersManager parsersManager, EventHandler eventHandler, Map<String, Object> properties) { // Save the references for later use this.parsersManager = parsersManager; this.properties = properties; // Create the ring buffer for this topic and start it Disruptor<MapEvent> disruptor = new Disruptor<>(new MapEventFactory(), ConfigData.getRingBufferSize(), Executors.newCachedThreadPool()); disruptor.handleEventsWith(eventHandler); disruptor.start(); // Create the event producer that will receive the events produced by // this source instance eventProducer = new EventProducer(disruptor.getRingBuffer()); prepare(); }
@Test public void testLaterStartConsumer() throws InterruptedException { System.out.println("!!!!!!!!!!!!!!!Begin testLaterStartConsumer!!!!!!!!!!"); final AtomicBoolean messageConsumed = new AtomicBoolean(false); // Set queue length to 1, so that the RingBuffer can be easily full // to trigger consumer blocking DisruptorQueue queue = createQueue("consumerHang", ProducerType.MULTI, 2); push(queue, 1); Runnable producer = new Producer(queue); Runnable consumer = new Consumer(queue, new EventHandler<Object>() { long count = 0; @Override public void onEvent(Object obj, long sequence, boolean endOfBatch) throws Exception { messageConsumed.set(true); System.out.println("Consume " + count++); } }); run(producer, 0, 0, consumer, 50); Assert.assertTrue("disruptor message is never consumed due to consumer thread hangs", messageConsumed.get()); System.out.println("!!!!!!!!!!!!!!!!End testLaterStartConsumer!!!!!!!!!!"); }
@Test public void testSingleProducer() throws InterruptedException { System.out.println("!!!!!!!!!!!!!!Begin testSingleProducer!!!!!!!!!!!!!!"); final AtomicBoolean messageConsumed = new AtomicBoolean(false); // Set queue length to 1, so that the RingBuffer can be easily full // to trigger consumer blocking DisruptorQueue queue = createQueue("consumerHang", ProducerType.SINGLE, 1); push(queue, 1); Runnable producer = new Producer(queue); Runnable consumer = new Consumer(queue, new EventHandler<Object>() { long count = 0; @Override public void onEvent(Object obj, long sequence, boolean endOfBatch) throws Exception { messageConsumed.set(true); System.out.println("Consume " + count++); } }); run(producer, 0, 0, consumer, 50); Assert.assertTrue("disruptor message is never consumed due to consumer thread hangs", messageConsumed.get()); System.out.println("!!!!!!!!!!!!!!End testSingleProducer!!!!!!!!!!!!!!"); }
/** * Publisher -> Ring buffer ---> Consumer A * Look at the graph that gets printed by log4j. */ @Test public void test_publish_single_eventprocessor_topology() { ConsumerA consumerA = new ConsumerA(); EventHandlerChain<String> eventHandlerChain1 = new EventHandlerChain<String>(new EventHandler[]{consumerA}); disruptorConfig.setEventHandlerChain(new EventHandlerChain[]{eventHandlerChain1}); disruptorConfig.init(); disruptorConfig.publish(new EventTranslator<String>() { @Override public void translateTo(String event, long sequence) { event = "hi there"; } }); }
/** * Publisher -> Ring buffer ---> Consumer A -> Consumer B1 -> Consumer D * Look at the graph that gets printed by log4j. */ @Test public void test_publish_simple_eventprocessor_topology() { ConsumerA consumerA = new ConsumerA(); ConsumerB1 consumerB1 = new ConsumerB1(); ConsumerD consumerD = new ConsumerD(); EventHandlerChain<String> eventHandlerChain1 = new EventHandlerChain<String>(new EventHandler[]{consumerA}, new EventHandler[]{consumerB1}); EventHandlerChain<String> eventHandlerChain2 = new EventHandlerChain<String>(new EventHandler[]{consumerB1}, new EventHandler[]{consumerD}); disruptorConfig.setEventHandlerChain(new EventHandlerChain[]{eventHandlerChain1, eventHandlerChain2}); disruptorConfig.init(); disruptorConfig.publish(new EventTranslator<String>() { @Override public void translateTo(String event, long sequence) { event = "hi there"; } }); }
/** * Consumer B1 * / \ * Publisher -> Ring buffer ---> Consumer A - -> Consumer D * \ / * Consumer B2 * * Look at the graph that gets printed by log4j. */ @Test public void test_publish_diamond_eventprocessor_topology() { ConsumerA consumerA = new ConsumerA(); ConsumerB1 consumerB1 = new ConsumerB1(); ConsumerB2 consumerB2 = new ConsumerB2(); ConsumerD consumerD = new ConsumerD(); EventHandlerChain<String> eventHandlerChain1 = new EventHandlerChain<String>(new EventHandler[]{consumerA}, new EventHandler[]{consumerB1, consumerB2}); EventHandlerChain<String> eventHandlerChain2 = new EventHandlerChain<String>(new EventHandler[]{consumerB1, consumerB2}, new EventHandler[]{consumerD}); disruptorConfig.setEventHandlerChain(new EventHandlerChain[]{eventHandlerChain1, eventHandlerChain2}); disruptorConfig.init(); disruptorConfig.publish(new EventTranslator<String>() { @Override public void translateTo(String event, long sequence) { event = "hi there"; } }); }
/** * Consumer B1 -> Consumer C1 * / \ * Publisher -> Ring buffer ---> Consumer A - -> Consumer D * \ / * Consumer B2 -> Consumer C2 * * Look at the graph that gets printed by log4j. */ @Test public void test_publish_complicated_diamond_eventprocessor_topology() { ConsumerA consumerA = new ConsumerA(); ConsumerB1 consumerB1 = new ConsumerB1(); ConsumerB2 consumerB2 = new ConsumerB2(); ConsumerC1 consumerC1 = new ConsumerC1(); ConsumerC2 consumerC2 = new ConsumerC2(); ConsumerD consumerD = new ConsumerD(); EventHandlerChain<String> eventHandlerChain1 = new EventHandlerChain<String>(new EventHandler[]{consumerA}, new EventHandler[]{consumerB1, consumerB2}); EventHandlerChain<String> eventHandlerChain2 = new EventHandlerChain<String>(new EventHandler[]{consumerB1}, new EventHandler[]{consumerC1}); EventHandlerChain<String> eventHandlerChain3 = new EventHandlerChain<String>(new EventHandler[]{consumerB2}, new EventHandler[]{consumerC2}); EventHandlerChain<String> eventHandlerChain4 = new EventHandlerChain<String>(new EventHandler[]{consumerC1, consumerC2}, new EventHandler[]{consumerD}); disruptorConfig.setEventHandlerChain(new EventHandlerChain[]{eventHandlerChain1, eventHandlerChain2, eventHandlerChain3, eventHandlerChain4}); disruptorConfig.init(); disruptorConfig.publish(new EventTranslator<String>() { @Override public void translateTo(String event, long sequence) { event = "hi there"; } }); }
private void consumeBatchToCursor(long cursor, EventHandler<Object> handler) { for(long curr = _consumer.get() + 1; curr <= cursor; curr++) { try { MutableObject mo = _buffer.get(curr); Object o = mo.o; mo.setObject(null); if(o==FLUSH_CACHE) { Object c = null; while(true) { c = _cache.poll(); if(c==null) break; else handler.onEvent(c, curr, true); } } else if(o==INTERRUPT) { throw new InterruptedException("Disruptor processing interrupted"); } else { handler.onEvent(o, curr, curr == cursor); } } catch (Exception e) { throw new RuntimeException(e); } } //TODO: only set this if the consumer cursor has changed? _consumer.set(cursor); }
public static void main(String []args) throws Exception{ // args[0] is expected to be the same sort of property file as needed // by Surf for Kinesis: it should contain // aws-access-key-id: <your-access-id> // aws-secret-key: <your-secret-key> // aws-kinesis-stream-name: <your-stream-name> if (args.length != 1){ errorExit(); } File f = new File(args[0]); if(!f.isFile() || !f.canRead()){ errorExit(); } // Set up disruptor final EventHandler<KinesisEvent> handler = new EventHandler<KinesisEvent>() { @Override public void onEvent(KinesisEvent kinesisEvent, long l, boolean b) throws Exception { System.out.println(String.format("Received : %s", kinesisEvent.getData())); } }; Worker worker = Util.createWorker(f, handler, "DumpStream"); worker.run(); }
public static Worker createWorker(File conf, EventHandler<KinesisEvent> handler, String appName)throws IOException{ Executor executor = Executors.newCachedThreadPool(); Disruptor<KinesisEvent> disruptor = new Disruptor<>(KinesisEvent.EVENT_FACTORY, 128, executor); disruptor.handleEventsWith(handler); RingBuffer<KinesisEvent> buffer = disruptor.start(); Properties props = new Properties(); props.load(new FileReader(conf)); // Generate a unique worker ID String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID(); String accessid = props.getProperty("aws-access-key-id"); String secretkey = props.getProperty("aws-secret-key"); String streamname = props.getProperty("aws-kinesis-stream-name"); BasicAWSCredentials creds = new BasicAWSCredentials(accessid, secretkey); CredProvider credprovider = new CredProvider(creds); KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(appName, streamname, credprovider, workerId); Worker worker = new Worker(new RecordProcessorFactory(buffer), config, new MetricsFactory()); return worker; }
public static void main( String[] args ) throws Exception { // args[0] is expected to be the same sort of property file as needed // by Surf for Kinesis: it should contain // aws-access-key-id: <your-access-id> // aws-secret-key: <your-secret-key> // aws-kinesis-stream-name: <your-stream-name> if (args.length != 1){ errorExit(); } File f = new File(args[0]); if(!f.isFile() || !f.canRead()){ errorExit(); } // Set up disruptor final EventHandler<KinesisEvent> handler = new PageCountHandler(); Worker worker = Util.createWorker(f, handler, "PageCount"); worker.run(); }
@Before @SuppressWarnings("unchecked") public void setup() { responseBuffer = new Disruptor<ResponseEvent>(new EventFactory<ResponseEvent>() { @Override public ResponseEvent newInstance() { return new ResponseEvent(); } }, 1024, Executors.newCachedThreadPool()); firedEvents = Collections.synchronizedList(new ArrayList<CouchbaseMessage>()); latch = new CountDownLatch(1); responseBuffer.handleEventsWith(new EventHandler<ResponseEvent>() { @Override public void onEvent(ResponseEvent event, long sequence, boolean endOfBatch) throws Exception { firedEvents.add(event.getMessage()); latch.countDown(); } }); responseRingBuffer = responseBuffer.start(); }
private static synchronized void initDisruptor() { if (disruptor != null) { LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor, using existing object. Ref count is {}.", count); return; } LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor. Ref count is {}.", count); final int ringBufferSize = calculateRingBufferSize(); final WaitStrategy waitStrategy = createWaitStrategy(); executor = Executors.newSingleThreadExecutor(threadFactory); disruptor = new Disruptor<Log4jEventWrapper>(FACTORY, ringBufferSize, executor, ProducerType.MULTI, waitStrategy); final EventHandler<Log4jEventWrapper>[] handlers = new Log4jEventWrapperHandler[] {// new Log4jEventWrapperHandler() }; final ExceptionHandler errorHandler = getExceptionHandler(); disruptor.handleExceptionsWith(errorHandler); disruptor.handleEventsWith(handlers); LOGGER.debug( "Starting AsyncLoggerConfig disruptor with ringbuffer size={}, waitStrategy={}, exceptionHandler={}...", disruptor.getRingBuffer().getBufferSize(), waitStrategy.getClass().getSimpleName(), errorHandler); disruptor.start(); }
public static void main(String[] args) { EventFactory<HelloEvent> eventFactory = new HelloEventFactory(); int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必须是 2 的 N 次方; Disruptor<HelloEvent> disruptor = new Disruptor<HelloEvent>( eventFactory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); EventHandler<HelloEvent> eventHandler = new HelloEventHandler(); disruptor.handleEventsWith(eventHandler, eventHandler); disruptor.start(); }
EventHandlerGroup<T> createEventProcessors( final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers) { checkNotStarted(); final Sequence[] processorSequences = new Sequence[eventHandlers.length]; final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) { final EventHandler<? super T> eventHandler = eventHandlers[i]; final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler); if (exceptionHandler != null) { batchEventProcessor.setExceptionHandler(exceptionHandler); } consumerRepository.add(batchEventProcessor, eventHandler, barrier); processorSequences[i] = batchEventProcessor.getSequence(); } if (processorSequences.length > 0) { consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences); } return new EventHandlerGroup<T>(this, consumerRepository, processorSequences); }
EventProcessorInfo( final EventProcessor eventprocessor, final EventHandler<? super T> handler, final SequenceBarrier barrier) { this.eventprocessor = eventprocessor; this.handler = handler; this.barrier = barrier; }
ExceptionHandlerSetting( final EventHandler<T> eventHandler, final ConsumerRepository<T> consumerRepository) { this.eventHandler = eventHandler; this.consumerRepository = consumerRepository; }
public BatchEventProcessor<EventAccessor<T>> createHandler(final EventHandler<T> handler) { BatchEventProcessor<EventAccessor<T>> processor = new BatchEventProcessor<EventAccessor<T>>( this, sequencer.newBarrier(), new AccessorEventHandler<T>(handler)); sequencer.addGatingSequences(processor.getSequence()); return processor; }
public BatchEventProcessor<LongEvent> createProcessor(final LongHandler handler) { return new BatchEventProcessor<LongEvent>( new LongEvent(), sequencer.newBarrier(), new EventHandler<LongEvent>() { @Override public void onEvent(final LongEvent event, final long sequence, final boolean endOfBatch) throws Exception { handler.onEvent(event.get(), sequence, endOfBatch); } }); }
@Test public void shouldCreateEventProcessorGroupForFirstEventProcessors() throws Exception { executor.ignoreExecutions(); final EventHandler<TestEvent> eventHandler1 = new SleepingEventHandler(); EventHandler<TestEvent> eventHandler2 = new SleepingEventHandler(); final EventHandlerGroup<TestEvent> eventHandlerGroup = disruptor.handleEventsWith(eventHandler1, eventHandler2); disruptor.start(); assertNotNull(eventHandlerGroup); assertThat(Integer.valueOf(executor.getExecutionCount()), equalTo(Integer.valueOf(2))); }
@Test public void shouldMakeEntriesAvailableToFirstHandlersImmediately() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(2); EventHandler<TestEvent> eventHandler = new EventHandlerStub<TestEvent>(countDownLatch); disruptor.handleEventsWith(createDelayedEventHandler(), eventHandler); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch); }
@Test public void shouldWaitUntilAllFirstEventProcessorsProcessEventBeforeMakingItAvailableToDependentEventProcessors() throws Exception { DelayedEventHandler eventHandler1 = createDelayedEventHandler(); CountDownLatch countDownLatch = new CountDownLatch(2); EventHandler<TestEvent> eventHandler2 = new EventHandlerStub<TestEvent>(countDownLatch); disruptor.handleEventsWith(eventHandler1).then(eventHandler2); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, eventHandler1); }
@Test public void shouldAllowSpecifyingSpecificEventProcessorsToWaitFor() throws Exception { DelayedEventHandler handler1 = createDelayedEventHandler(); DelayedEventHandler handler2 = createDelayedEventHandler(); CountDownLatch countDownLatch = new CountDownLatch(2); EventHandler<TestEvent> handlerWithBarrier = new EventHandlerStub<TestEvent>(countDownLatch); disruptor.handleEventsWith(handler1, handler2); disruptor.after(handler1, handler2).handleEventsWith(handlerWithBarrier); ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, handler1, handler2); }
@Test(timeout = 1000) public void shouldTrackRemainingCapacity() throws Exception { final long[] remainingCapacity = {-1}; //Given final EventHandler<TestEvent> eventHandler = new EventHandler<TestEvent>() { @Override public void onEvent(final TestEvent event, final long sequence, final boolean endOfBatch) throws Exception { remainingCapacity[0] = disruptor.getRingBuffer().remainingCapacity(); } }; disruptor.handleEventsWith(eventHandler); //When publishEvent(); //Then while (remainingCapacity[0] == -1) { Thread.sleep(100); } assertThat(remainingCapacity[0], is(ringBuffer.getBufferSize() - 1L)); assertThat(disruptor.getRingBuffer().remainingCapacity(), is(ringBuffer.getBufferSize() - 0L)); }
@Test public void shouldAllowEventHandlerWithSuperType() throws Exception { final CountDownLatch latch = new CountDownLatch(2); final EventHandler<Object> objectHandler = new EventHandlerStub<Object>(latch); disruptor.handleEventsWith(objectHandler); ensureTwoEventsProcessedAccordingToDependencies(latch); }
@Test public void shouldAllowChainingEventHandlersWithSuperType() throws Exception { final CountDownLatch latch = new CountDownLatch(2); final DelayedEventHandler delayedEventHandler = createDelayedEventHandler(); final EventHandler<Object> objectHandler = new EventHandlerStub<Object>(latch); disruptor.handleEventsWith(delayedEventHandler).then(objectHandler); ensureTwoEventsProcessedAccordingToDependencies(latch, delayedEventHandler); }
@Test public void testLaterStartConsumer() throws InterruptedException { System.out .println("!!!!!!!!!!!!!!!Begin testLaterStartConsumer!!!!!!!!!!"); final AtomicBoolean messageConsumed = new AtomicBoolean(false); // Set queue length to 1, so that the RingBuffer can be easily full // to trigger consumer blocking DisruptorQueue queue = createQueue("consumerHang", ProducerType.MULTI, 2); push(queue, 1); Runnable producer = new Producer(queue); Runnable consumer = new Consumer(queue, new EventHandler<Object>() { long count = 0; @Override public void onEvent(Object obj, long sequence, boolean endOfBatch) throws Exception { messageConsumed.set(true); System.out.println("Consume " + count++); } }); run(producer, 0, 0, consumer, 50); Assert.assertTrue( "disruptor message is never consumed due to consumer thread hangs", messageConsumed.get()); System.out .println("!!!!!!!!!!!!!!!!End testLaterStartConsumer!!!!!!!!!!"); }
@Test public void testBeforeStartConsumer() throws InterruptedException { System.out .println("!!!!!!!!!!!!Begin testBeforeStartConsumer!!!!!!!!!"); final AtomicBoolean messageConsumed = new AtomicBoolean(false); // Set queue length to 1, so that the RingBuffer can be easily full // to trigger consumer blocking DisruptorQueue queue = createQueue("consumerHang", ProducerType.MULTI, 2); queue.consumerStarted(); push(queue, 1); Runnable producer = new Producer(queue); Runnable consumer = new Consumer(queue, new EventHandler<Object>() { long count = 0; @Override public void onEvent(Object obj, long sequence, boolean endOfBatch) throws Exception { messageConsumed.set(true); System.out.println("Consume " + count++); } }); run(producer, 0, 0, consumer, 50); Assert.assertTrue( "disruptor message is never consumed due to consumer thread hangs", messageConsumed.get()); System.out .println("!!!!!!!!!!!!!End testBeforeStartConsumer!!!!!!!!!!"); }
@Test public void testSingleProducer() throws InterruptedException { System.out .println("!!!!!!!!!!!!!!Begin testSingleProducer!!!!!!!!!!!!!!"); final AtomicBoolean messageConsumed = new AtomicBoolean(false); // Set queue length to 1, so that the RingBuffer can be easily full // to trigger consumer blocking DisruptorQueue queue = createQueue("consumerHang", ProducerType.SINGLE, 1); push(queue, 1); Runnable producer = new Producer(queue); Runnable consumer = new Consumer(queue, new EventHandler<Object>() { long count = 0; @Override public void onEvent(Object obj, long sequence, boolean endOfBatch) throws Exception { messageConsumed.set(true); System.out.println("Consume " + count++); } }); run(producer, 0, 0, consumer, 50); Assert.assertTrue( "disruptor message is never consumed due to consumer thread hangs", messageConsumed.get()); System.out .println("!!!!!!!!!!!!!!End testSingleProducer!!!!!!!!!!!!!!"); }