Java 类com.lmax.disruptor.SequenceBarrier 实例源码

项目:disruptor-code-analysis    文件:MultiBufferBatchEventProcessor.java   
/**
 * Creates a processor that is handed several data providers, one barrier per provider,
 * and a single handler.
 *
 * <p>One {@link Sequence}, initialised to {@code -1}, is created for every provider.
 *
 * @param providers the data providers to store
 * @param barriers  the sequence barriers to store; must have the same length as {@code providers}
 * @param handler   the event handler to store
 * @throws IllegalArgumentException if {@code providers} and {@code barriers} differ in length
 */
public MultiBufferBatchEventProcessor(
    DataProvider<T>[] providers,
    SequenceBarrier[] barriers,
    EventHandler<T> handler)
{
    if (providers.length != barriers.length)
    {
        // Report both lengths so a mis-wired caller can see what went wrong.
        throw new IllegalArgumentException(
            "providers and barriers must have the same length: "
                + providers.length + " != " + barriers.length);
    }

    this.providers = providers;
    this.barriers = barriers;
    this.handler = handler;

    this.sequences = new Sequence[providers.length];
    for (int i = 0; i < sequences.length; i++)
    {
        sequences[i] = new Sequence(-1);
    }
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * Registers a delayed handler through the disruptor, then builds a
 * {@link BatchEventProcessor} by hand whose barrier comes from
 * {@code disruptor.after(handler)}, and checks that two events are processed.
 */
@Test
public void shouldSupportHandlersAsDependenciesToCustomProcessors()
    throws Exception
{
    final DelayedEventHandler delayedEventHandler = createDelayedEventHandler();
    disruptor.handleEventsWith(delayedEventHandler);

    final CountDownLatch latch = new CountDownLatch(2);
    final EventHandler<TestEvent> stubHandler = new EventHandlerStub<TestEvent>(latch);
    final RingBuffer<TestEvent> buffer = disruptor.getRingBuffer();

    // The custom processor waits on a barrier derived from the delayed handler.
    final SequenceBarrier afterDelayedHandler =
        disruptor.after(delayedEventHandler).asSequenceBarrier();
    final BatchEventProcessor<TestEvent> customProcessor =
        new BatchEventProcessor<TestEvent>(buffer, afterDelayedHandler, stubHandler);
    disruptor.handleEventsWith(customProcessor);

    ensureTwoEventsProcessedAccordingToDependencies(latch, delayedEventHandler);
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * Builds a chain in which a stub handler is registered after both a disruptor-managed
 * handler and a hand-built {@link BatchEventProcessor}, and checks that two events are
 * processed.
 */
@Test
public void shouldSupportCustomProcessorsAndHandlersAsDependencies() throws Exception
{
    final DelayedEventHandler delayedEventHandler1 = createDelayedEventHandler();
    final DelayedEventHandler delayedEventHandler2 = createDelayedEventHandler();
    disruptor.handleEventsWith(delayedEventHandler1);

    final CountDownLatch latch = new CountDownLatch(2);
    final EventHandler<TestEvent> stubHandler = new EventHandlerStub<TestEvent>(latch);
    final RingBuffer<TestEvent> buffer = disruptor.getRingBuffer();

    // The second delayed handler is wrapped in a custom processor gated on the first one.
    final SequenceBarrier afterFirstHandler =
        disruptor.after(delayedEventHandler1).asSequenceBarrier();
    final BatchEventProcessor<TestEvent> customProcessor =
        new BatchEventProcessor<TestEvent>(buffer, afterFirstHandler, delayedEventHandler2);

    // The stub handler is registered after both the first handler and the custom processor.
    disruptor.after(delayedEventHandler1)
        .and(customProcessor)
        .handleEventsWith(stubHandler);

    ensureTwoEventsProcessedAccordingToDependencies(latch, delayedEventHandler1, delayedEventHandler2);
}
项目:andes    文件:ConcurrentContentReadTaskBatchProcessor.java   
/**
 * Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
 * the {@link com.lmax.disruptor.EventHandler#onEvent(Object, long, boolean)} method returns.
 *
 * @param ringBuffer      to which events are published.
 * @param sequenceBarrier on which it is waiting.
 * @param eventHandler    is the delegate to which events are dispatched.
 * @param turn            is the value of, sequence % groupCount this batch processor process events. Turn must be
 *                        less than groupCount
 * @param groupCount      total number of concurrent batch processors for the event type
 * @param batchSize       size limit of total content size to batch. This is a loose limit
 */
public ConcurrentContentReadTaskBatchProcessor(final RingBuffer<DeliveryEventData> ringBuffer,
        final SequenceBarrier sequenceBarrier, final ContentCacheCreator eventHandler, long turn, int groupCount,
        int batchSize) {
    if (turn >= groupCount) {
        throw new IllegalArgumentException("Turn should be less than groupCount");
    }

    this.ringBuffer = ringBuffer;
    this.sequenceBarrier = sequenceBarrier;
    this.eventHandler = eventHandler;
    this.turn = turn;
    this.groupCount = groupCount;
    this.batchSize = batchSize;

    exceptionHandler = new DeliveryExceptionHandler();
    running = new AtomicBoolean(false);
    if (eventHandler instanceof SequenceReportingEventHandler) {
        ((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
    }
}
项目:andes    文件:SleepingBlockingWaitStrategy.java   
/**
 * Waits until {@code dependentSequence} has reached {@code sequence}.
 *
 * <p>While the cursor is behind the requested sequence the caller waits on
 * {@code processorNotifyCondition} under {@code lock}. Afterwards the dependent sequence
 * is polled, parking for one nanosecond between polls.
 *
 * @param sequence          the sequence to wait for
 * @param cursorSequence    the cursor that is checked under the lock
 * @param dependentSequence the sequence that is polled after the cursor check
 * @param barrier           checked for alerts before every wait or park
 * @return the value of {@code dependentSequence} once it is at least {@code sequence}
 * @throws AlertException       propagated from {@code barrier.checkAlert()}
 * @throws InterruptedException propagated from the condition wait
 */
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException {
    if (cursorSequence.get() < sequence) {
        lock.lock();
        try {
            // Re-check under the lock before each wait on the condition.
            while (cursorSequence.get() < sequence) {
                barrier.checkAlert();
                processorNotifyCondition.await();
            }
        } finally {
            lock.unlock();
        }
    }

    long available = dependentSequence.get();
    while (available < sequence) {
        barrier.checkAlert();
        LockSupport.parkNanos(1L);
        available = dependentSequence.get();
    }
    return available;
}
项目:andes    文件:SimpleMessaging.java   
/**
 * Sets up the MQTT subscription store and the disruptor, starts the disruptor and
 * publishes an {@code InitEvent} carrying the given configuration.
 *
 * @param configProps configuration properties wrapped in the published {@code InitEvent}
 */
public void init(Properties configProps) {
    // WSO2 replaced the original SubscriptionsStore with a store that suits the
    // distribution architecture of Andes.
    subscriptions = new MQTTSubscriptionStore();

    ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("Disruptor MQTT Simple Messaging Thread %d")
            .build();
    ExecutorService pool = Executors.newCachedThreadPool(threadFactory);
    Integer inboundBufferSize = AndesConfigurationManager.readValue(
            AndesConfiguration.TRANSPORTS_MQTT_INBOUND_BUFFER_SIZE);

    disruptor = new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, inboundBufferSize, pool);
    // Added by WSO2: exceptions must be logged rather than ignored.
    disruptor.handleExceptionsWith(new MqttLogExceptionHandler());

    SequenceBarrier ringBarrier = disruptor.getRingBuffer().newBarrier();
    BatchEventProcessor<ValueEvent> processor =
            new BatchEventProcessor<ValueEvent>(disruptor.getRingBuffer(), ringBarrier, this);
    // Added by WSO2: the processor gets its own logging exception handler as well.
    processor.setExceptionHandler(new MqttLogExceptionHandler());
    disruptor.handleEventsWith(processor);

    m_ringBuffer = disruptor.start();

    disruptorPublish(new InitEvent(configProps));
}
项目:kevoree-library    文件:ProtocolProcessor.java   
/**
 * Stores the collaborators and starts the output ring buffer with a single
 * {@link BatchEventProcessor} that dispatches to this object.
 *
 * @param subscriptions the subscription store where are stored all the existing
 *  clients subscriptions.
 * @param storageService the persistent store to use for save/load of messages
 *  for QoS1 and QoS2 handling.
 * @param authenticator the authenticator used in connect messages
 */
void init(SubscriptionsStore subscriptions, IStorageService storageService,
          IAuthenticator authenticator) {
    this.subscriptions = subscriptions;
    m_storageService = storageService;
    m_authenticator = authenticator;

    // Output ring buffer, served by a single-thread executor.
    m_executor = Executors.newFixedThreadPool(1);
    m_ringBuffer = new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY, 1024 * 32);

    SequenceBarrier ringBarrier = m_ringBuffer.newBarrier();
    m_eventProcessor = new BatchEventProcessor<ValueEvent>(m_ringBuffer, ringBarrier, this);
    // TODO: a presentation advised against doing the following line.
    m_ringBuffer.setGatingSequences(m_eventProcessor.getSequence());
    m_executor.submit(m_eventProcessor);
}
项目:disruptor-code-analysis    文件:Disruptor.java   
/**
 * Creates one {@link BatchEventProcessor} per handler, all sharing a single barrier built
 * from {@code barrierSequences}, and registers each with the consumer repository.
 *
 * @param barrierSequences sequences the new processors are gated on
 * @param eventHandlers    handlers to wrap in batch event processors
 * @return a group made of the sequences of the newly created processors
 */
EventHandlerGroup<T> createEventProcessors(
        final Sequence[] barrierSequences,
        final EventHandler<? super T>[] eventHandlers) {
    checkNotStarted();

    final int handlerCount = eventHandlers.length;
    final Sequence[] processorSequences = new Sequence[handlerCount];
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    int slot = 0;
    for (final EventHandler<? super T> eventHandler : eventHandlers) {
        final BatchEventProcessor<T> processor =
                new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);

        if (exceptionHandler != null) {
            processor.setExceptionHandler(exceptionHandler);
        }

        consumerRepository.add(processor, eventHandler, barrier);
        processorSequences[slot++] = processor.getSequence();
    }

    // Only when at least one processor was added are the barrier sequences unmarked
    // as end-of-chain.
    if (handlerCount > 0) {
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }

    return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}
项目:disruptor-code-analysis    文件:Disruptor.java   
/**
 * Creates a {@link WorkerPool} for the given work handlers, gated on a barrier built from
 * {@code barrierSequences}, and registers it with the consumer repository.
 *
 * @param barrierSequences sequences the worker pool is gated on
 * @param workHandlers     handlers that make up the pool
 * @return a group made of the pool's worker sequences
 */
EventHandlerGroup<T> createWorkerPool(
        final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers) {
    final SequenceBarrier poolBarrier = ringBuffer.newBarrier(barrierSequences);
    final WorkerPool<T> pool =
            new WorkerPool<T>(ringBuffer, poolBarrier, exceptionHandler, workHandlers);

    consumerRepository.add(pool, poolBarrier);

    final Sequence[] workerSequences = pool.getWorkerSequences();
    return new EventHandlerGroup<T>(this, consumerRepository, workerSequences);
}
项目:disruptor-code-analysis    文件:EventProcessorInfo.java   
/**
 * Bundles an event processor with its handler and the barrier it waits on.
 *
 * @param eventprocessor the processor
 * @param handler        the handler associated with the processor
 * @param barrier        the barrier associated with the processor
 */
EventProcessorInfo(
    final EventProcessor eventprocessor,
    final EventHandler<? super T> handler,
    final SequenceBarrier barrier)
{
    this.barrier = barrier;
    this.handler = handler;
    this.eventprocessor = eventprocessor;
}
项目:disruptor-code-analysis    文件:ConsumerRepositoryTest.java   
/**
 * Creates a fresh repository plus two mocked event processors, two sleeping handlers and
 * two mocked barriers. Each mocked processor reports its own sequence and is always running.
 */
@Before
public void setUp() throws Exception
{
    consumerRepository = new ConsumerRepository<TestEvent>();

    eventProcessor1 = mockery.mock(EventProcessor.class, "eventProcessor1");
    eventProcessor2 = mockery.mock(EventProcessor.class, "eventProcessor2");

    final Sequence firstSequence = new Sequence();
    final Sequence secondSequence = new Sequence();

    final Expectations processorExpectations = new Expectations()
    {
        {
            // First processor: fixed sequence, always running.
            allowing(eventProcessor1).getSequence();
            will(returnValue(firstSequence));
            allowing(eventProcessor1).isRunning();
            will(returnValue(true));

            // Second processor: fixed sequence, always running.
            allowing(eventProcessor2).getSequence();
            will(returnValue(secondSequence));
            allowing(eventProcessor2).isRunning();
            will(returnValue(true));
        }
    };
    mockery.checking(processorExpectations);

    handler1 = new SleepingEventHandler();
    handler2 = new SleepingEventHandler();

    barrier1 = mockery.mock(SequenceBarrier.class, "barrier1");
    barrier2 = mockery.mock(SequenceBarrier.class, "barrier2");
}
项目:disruptor-code-analysis    文件:TestWaiter.java   
/**
 * Stores the collaborators and sequence bounds of this test waiter.
 *
 * @param cyclicBarrier     the cyclic barrier to store
 * @param sequenceBarrier   the sequence barrier to store
 * @param ringBuffer        the ring buffer to store
 * @param initialSequence   the initial sequence to store
 * @param toWaitForSequence the sequence to wait for
 */
public TestWaiter(
    final CyclicBarrier cyclicBarrier,
    final SequenceBarrier sequenceBarrier,
    final RingBuffer<StubEvent> ringBuffer,
    final long initialSequence,
    final long toWaitForSequence)
{
    // Collaborators.
    this.cyclicBarrier = cyclicBarrier;
    this.sequenceBarrier = sequenceBarrier;
    this.ringBuffer = ringBuffer;

    // Sequence bounds.
    this.initialSequence = initialSequence;
    this.toWaitForSequence = toWaitForSequence;
}
项目:gflogger    文件:LoggerServiceImpl.java   
/**
 * Waits until {@code dependentSequence} has reached {@code sequence}.
 *
 * <p>If the cursor is behind the requested sequence, {@code flush()} is called and the
 * caller then waits on {@code lock} until the cursor catches up. While waiting, a
 * {@code STOPPED} state halts the disruptor and raises an {@link AlertException}.
 * Afterwards the dependent sequence is busy-polled.
 *
 * <p>{@code numWaiters} is decremented in a {@code finally} block so that it stays
 * balanced when the wait is left through an exception.
 *
 * @param sequence          the sequence to wait for
 * @param cursor            the cursor checked while holding {@code lock}
 * @param dependentSequence the sequence polled after the cursor check
 * @param barrier           checked for alerts before every wait or poll
 * @return the value of {@code dependentSequence} once it is at least {@code sequence}
 * @throws AlertException       if the state is {@code STOPPED} or the barrier is alerted
 * @throws InterruptedException if the thread is interrupted while waiting on {@code lock}
 * @throws TimeoutException     declared by the signature; not thrown directly by this body
 */
public long waitFor(
    long sequence,
    Sequence cursor,
    Sequence dependentSequence,
    SequenceBarrier barrier
) throws AlertException, InterruptedException, TimeoutException {
    if (cursor.get() < sequence) {
        flush();
        synchronized (lock) {
            ++numWaiters;
            try {
                while (cursor.get() < sequence) {
                    if (state == State.STOPPED) {
                        disruptor.halt();
                        throw AlertException.INSTANCE;
                    }
                    barrier.checkAlert();
                    lock.wait();
                }
            } finally {
                // Runs on normal exit and on AlertException / InterruptedException alike.
                --numWaiters;
            }
        }
    }

    long availableSequence;
    while ((availableSequence = dependentSequence.get()) < sequence) {
        barrier.checkAlert();
    }

    return availableSequence;
}
项目:camunda-bpm-reactor    文件:ParkWaitStrategy.java   
/**
 * Polls {@code dependentSequence} until it reaches {@code sequence}, parking the thread
 * for {@code parkFor} nanoseconds between polls. The {@code cursor} argument is not used.
 *
 * @param sequence          the sequence to wait for
 * @param cursor            unused
 * @param dependentSequence the sequence that is polled
 * @param barrier           checked for alerts before every park
 * @return the value of {@code dependentSequence} once it is at least {@code sequence}
 */
@Override
public long waitFor(long sequence,
                    Sequence cursor,
                    Sequence dependentSequence,
                    SequenceBarrier barrier) throws AlertException,
  InterruptedException,
  TimeoutException {
  long available = dependentSequence.get();
  while (available < sequence) {
    barrier.checkAlert();
    LockSupport.parkNanos(parkFor);
    available = dependentSequence.get();
  }
  return available;
}
项目:f1x    文件:AbstractByteRingConsumerEx.java   
/**
 * Creates a consumer over the given byte ring, passing the ring and barrier to the
 * superclass and keeping the delegate and exception handler.
 *
 * @param ring             source byte ring containing inbound messages
 * @param sequenceBarrier  on which it is waiting.
 * @param delegate         is the delegate to which messages are dispatched.
 * @param exceptionHandler to be called back when an error occurs
 */
public AbstractByteRingConsumerEx(ByteRing ring,
                                  SequenceBarrier sequenceBarrier,
                                  RingBufferBlockProcessor delegate,
                                  ExceptionHandler exceptionHandler) {
    super(ring, sequenceBarrier);
    this.exceptionHandler = exceptionHandler;
    this.delegate = delegate;
}
项目:perf-workshop    文件:SpinLoopHintBusySpinWaitStrategy.java   
/**
 * Busy-spins until {@code dependentSequence} reaches {@code sequence}. Every iteration
 * issues {@code SpinHint.spinLoopHint()} and then checks the barrier for alerts.
 * The {@code cursor} argument is not used.
 *
 * @param sequence          the sequence to wait for
 * @param cursor            unused
 * @param dependentSequence the sequence that is polled
 * @param barrier           checked for alerts on every iteration
 * @return the value of {@code dependentSequence} once it is at least {@code sequence}
 */
@Override
public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException, InterruptedException
{
    long available = dependentSequence.get();
    while (available < sequence)
    {
        SpinHint.spinLoopHint();
        barrier.checkAlert();
        available = dependentSequence.get();
    }
    return available;
}
项目:low-latency-high-throughput    文件:Demo1PMC.java   
/**
 * Demo with one producer and {@code NUMBER_CONSUMERS} consumers: every consumer gets its
 * own barrier and processor, then {@code SAMPLES_SIZE} events are published.
 *
 * @param args unused
 */
public static void main(String[] args) {

    final List<ValueEventHandler1PMC> handlers = new ArrayList<>(NUMBER_CONSUMERS);

    RingBuffer<ValueEvent> ring = RingBuffer.createSingleProducer(
            ValueEvent.EVENT_FACTORY, RING_SIZE, new SleepingWaitStrategy());

    start = System.nanoTime();

    // Consumers: each one is submitted to the executor and gates the ring buffer.
    for (int consumer = 0; consumer < NUMBER_CONSUMERS; consumer++) {
        ValueEventHandler1PMC consumerHandler = new ValueEventHandler1PMC(start, handlers);
        handlers.add(consumerHandler);

        SequenceBarrier consumerBarrier = ring.newBarrier();
        BatchEventProcessor<ValueEvent> processor =
                new BatchEventProcessor<ValueEvent>(ring, consumerBarrier, consumerHandler);
        ring.addGatingSequences(processor.getSequence());

        EXECUTOR.submit(processor);
    }

    // Producer: claim a slot, fill it in, publish it.
    for (int sample = 0; sample < SAMPLES_SIZE; sample++) {
        long claimed = ring.next();
        ValueEvent slot = ring.get(claimed);

        slot.setValue(sample); // this could be more complex with multiple fields

        ring.publish(claimed);
    }

}
项目:darks-grid    文件:DisruptorEventsChannel.java   
/**
 * Initializes the disruptor-backed events channel: computes a power-of-two buffer size,
 * creates the ring buffer and starts a {@link WorkerPool} with one work handler per
 * configured event consumer.
 *
 * <p>The executor that runs the worker pool has one thread per work handler. A fixed-size
 * pool smaller than the number of handlers would leave the surplus work processors queued
 * and never started.
 *
 * @param config channel configuration providing the queue size and consumer count
 * @return always {@code true}
 * @throws GridException if the computed buffer size is not positive
 */
@Override
public boolean initialize(EventsChannelConfig config)
{
    super.initialize(config);
    log.info("Initialize disruptor events channel " + config.getName() + " with " + config);
    EventFactory<GridEvent> eventFactory = new DisruptorEventFactory();
    int ringBufferSize = config.getBlockQueueMaxNumber();
    int threadSize = config.getEventConsumerNumber();
    int bufferSize = ringBufferSize;
    if (Integer.bitCount(bufferSize) != 1)
    {
        // Not a power of two: round down to the nearest one.
        bufferSize = (int) Math.pow(2, (int) (Math.log(ringBufferSize) / Math.log(2)));
        log.warn("Change disruptor events channel " + config.getName() +
                " buffer size from " + ringBufferSize + " to " + bufferSize);
    }
    if (bufferSize <= 0)
    {
        throw new GridException("Invalid disruptor ringbuffur size:" + ringBufferSize);
    }
    threadPool = Executors.newFixedThreadPool(threadSize);
    ringBuffer = RingBuffer.createMultiProducer(eventFactory, bufferSize, new BlockingWaitStrategy());
    SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    // One thread per work processor; the previous hard-coded size of 10 left any
    // processors beyond the tenth unscheduled.
    // NOTE(review): threadPool is not used for the worker pool here and this executor is
    // not retained for shutdown - confirm whether threadPool was meant to be used instead.
    ExecutorService executor = Executors.newFixedThreadPool(threadSize);
    @SuppressWarnings("unchecked")
    WorkHandler<GridEvent>[] workHandlers = new WorkHandler[threadSize];
    for (int i = 0; i < threadSize; i++)
    {
        WorkHandler<GridEvent> workHandler = new DisruptorEventsWorkHandler(getName());
        workHandlers[i] = workHandler;
    }

    workerPool = new WorkerPool<GridEvent>(ringBuffer, sequenceBarrier,
            new IgnoreExceptionHandler(), workHandlers);
    workerPool.start(executor);
    return true;
}
项目:dsys-snio    文件:WakeupWaitStrategy.java   
/**
 * {@inheritDoc}
 *
 * <p>This implementation does not wait: it returns the cursor's current value
 * immediately and ignores the other arguments.
 */
@Override
public long waitFor(final long sequence, final Sequence cursor,
        final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException, InterruptedException, TimeoutException {
    final long current = cursor.get();
    return current;
}
项目:injector    文件:DisruptorExecutor.java   
/**
 * Creates a multi-producer ring buffer of {@code RContainer}s and starts
 * {@code threadCount} work processors on daemon threads. All processors share one barrier
 * and one work sequence.
 *
 * @param threadCount  number of work processors (and threads) to start
 * @param bufferSize   size passed to the ring buffer
 * @param waitStrategy wait strategy passed to the ring buffer
 */
public DisruptorExecutor(int threadCount, int bufferSize, WaitStrategy waitStrategy)
{
    final EventFactory<RContainer> containerFactory = new EventFactory<RContainer>()
    {
        @Override
        public RContainer newInstance()
        {
            return new RContainer();
        }
    };
    ringBuffer = RingBuffer.createMultiProducer(containerFactory, bufferSize, waitStrategy);

    // Shared by every work processor.
    SequenceBarrier sharedBarrier = ringBuffer.newBarrier();
    Sequence sharedWorkSequence = new Sequence(-1);

    workProcessors = new WorkProcessor[threadCount];
    for (int index = 0; index < threadCount; index++)
    {
        workProcessors[index] = new WorkProcessor<RContainer>(ringBuffer, sharedBarrier,
            handler, new IgnoreExceptionHandler(), sharedWorkSequence);
    }

    final ThreadFactory daemonThreads = new ThreadFactory()
    {
        @Override
        public Thread newThread(Runnable r)
        {
            Thread worker = new Thread(r);
            worker.setDaemon(true);
            return worker;
        }
    };
    workExec = Executors.newFixedThreadPool(workProcessors.length, daemonThreads);

    for (int index = 0; index < workProcessors.length; index++)
    {
        workExec.execute(workProcessors[index]);
    }
}
项目:kevoree-library    文件:SimpleMessaging.java   
/**
 * Creates the subscription store and the ring buffer, starts a single
 * {@link BatchEventProcessor} that dispatches to this object and publishes an
 * {@code InitEvent} carrying the given configuration.
 *
 * @param configProps configuration properties wrapped in the published {@code InitEvent}
 */
public void init(Properties configProps) {
    subscriptions = new SubscriptionsStore();

    m_executor = Executors.newFixedThreadPool(1);
    m_ringBuffer = new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY, 1024 * 32);

    SequenceBarrier ringBarrier = m_ringBuffer.newBarrier();
    m_eventProcessor = new BatchEventProcessor<ValueEvent>(m_ringBuffer, ringBarrier, this);
    // TODO: a presentation advised against doing the following line.
    m_ringBuffer.setGatingSequences(m_eventProcessor.getSequence());
    m_executor.submit(m_eventProcessor);

    disruptorPublish(new InitEvent(configProps));
}
项目:moquette-mqtt    文件:SimpleMessaging.java   
/**
 * Creates the ring buffer, starts a single {@link BatchEventProcessor} that dispatches to
 * this object on a one-thread executor, and publishes an {@code InitEvent}.
 */
public void init() {
    m_executor = Executors.newFixedThreadPool(1);
    m_ringBuffer = new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY, 1024 * 32);

    SequenceBarrier ringBarrier = m_ringBuffer.newBarrier();
    m_eventProcessor = new BatchEventProcessor<ValueEvent>(m_ringBuffer, ringBarrier, this);
    // TODO: a presentation advised against doing the following line.
    m_ringBuffer.setGatingSequences(m_eventProcessor.getSequence());
    m_executor.submit(m_eventProcessor);

    disruptorPublish(new InitEvent());
}
项目:disruptor-code-analysis    文件:EventProcessorInfo.java   
/**
 * Returns the barrier held by this info object.
 *
 * @return the stored {@link SequenceBarrier}
 */
@Override
public SequenceBarrier getBarrier()
{
    return this.barrier;
}
项目:disruptor-code-analysis    文件:OneToOneOffHeapThroughputTest.java   
/**
 * Asks the sequencer for a new barrier with no extra sequences to track.
 *
 * @return the barrier created by the sequencer
 */
public SequenceBarrier newBarrier()
{
    final SequenceBarrier created = sequencer.newBarrier();
    return created;
}
项目:camunda-bpm-reactor    文件:RingBufferSubscriberUtils.java   
/**
 * Pauses the caller until {@code pendingRequest} becomes non-negative, or until a terminal
 * signal is found in the slot directly after the cursor position observed on entry.
 *
 * <p>While no request is pending, the first iteration waits on the barrier for that slot
 * and inspects the signal; later iterations only check the barrier for alerts. The thread
 * parks for one nanosecond between iterations.
 *
 * @param pendingRequest sequence polled for a non-negative value
 * @param ringBuffer     buffer whose cursor is read and from which the signal is fetched
 * @param barrier        barrier used to wait for the slot and to check for alerts
 * @param subscriber     notified through {@code onComplete}/{@code onError} on a terminal signal
 * @param isRunning      consulted when the barrier raises an {@link AlertException}
 * @param <T>            payload type of the signals
 * @return {@code false} if a COMPLETE or ERROR signal was delivered to the subscriber, or
 *         if the barrier was alerted while {@code isRunning} is {@code false};
 *         {@code true} in every other case (request seen, timeout, alert while running,
 *         interruption)
 */
public static <T> boolean waitRequestOrTerminalEvent(
  Sequence pendingRequest,
  RingBuffer<MutableSignal<T>> ringBuffer,
  SequenceBarrier barrier,
  Subscriber<? super T> subscriber,
  AtomicBoolean isRunning
) {
  // The slot right after the cursor as seen at entry.
  final long waitedSequence = ringBuffer.getCursor() + 1L;
  try {
    MutableSignal<T> event = null;
    while (pendingRequest.get() < 0l) {
      // Pause until the first request arrives.
      if (event == null) {
        // First pass only: wait for the slot and look at the signal it holds.
        barrier.waitFor(waitedSequence);
        event = ringBuffer.get(waitedSequence);

        if (event.type == MutableSignal.Type.COMPLETE) {
          try {
            subscriber.onComplete();
            return false;
          } catch (Throwable t) {
            // Fatal errors are rethrown; anything else is routed to onError.
            Exceptions.throwIfFatal(t);
            subscriber.onError(t);
            return false;
          }
        } else if (event.type == MutableSignal.Type.ERROR) {
          subscriber.onError(event.error);
          return false;
        }
      } else {
        // A non-terminal signal was already read: only watch for alerts.
        barrier.checkAlert();
      }
      LockSupport.parkNanos(1l);
    }
  } catch (TimeoutException te) {
    // Ignored: leaves the loop and falls through to return true.
  } catch (AlertException ae) {
    // An alert ends the wait; the result depends on whether we are still running.
    if (!isRunning.get()) {
      return false;
    }
  } catch (InterruptedException ie) {
    // Restore the interrupt flag for the caller.
    Thread.currentThread().interrupt();
  }

  return true;
}
项目:camunda-bpm-reactor    文件:AgileWaitingStrategy.java   
/**
 * Delegates the wait to {@code currentStrategy} with unchanged arguments.
 *
 * @return whatever the current strategy returns
 */
@Override
public long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
  throws AlertException, InterruptedException, TimeoutException {
  final long available = currentStrategy.waitFor(sequence, cursor, dependentSequence, barrier);
  return available;
}
项目:low-latency-high-throughput    文件:Demo1PMCSequence.java   
/**
 * Demo with one producer and two chained consumers: the second consumer's barrier tracks
 * the first consumer's sequence, and only the second consumer's sequence gates the ring
 * buffer. {@code SAMPLES_SIZE} events are then published.
 *
 * @param args unused
 */
public static void main(String[] args) {

    RingBuffer<ValueEvent> ring = RingBuffer.createSingleProducer(
            ValueEvent.EVENT_FACTORY, RING_SIZE, new SleepingWaitStrategy());

    start = System.nanoTime();

    // First consumer: waits directly on the ring buffer. Its sequence is deliberately
    // not registered as a gating sequence.
    ValueEventHandler1PMCSequenceFirst firstHandler = new ValueEventHandler1PMCSequenceFirst(start);
    SequenceBarrier firstBarrier = ring.newBarrier();
    BatchEventProcessor<ValueEvent> firstProcessor =
            new BatchEventProcessor<ValueEvent>(ring, firstBarrier, firstHandler);
    EXECUTOR.submit(firstProcessor);

    // Second consumer: waits on the first consumer's sequence and gates the ring buffer.
    ValueEventHandler1PMCSequenceSecond secondHandler = new ValueEventHandler1PMCSequenceSecond(start);
    SequenceBarrier secondBarrier = ring.newBarrier(firstProcessor.getSequence());
    BatchEventProcessor<ValueEvent> secondProcessor =
            new BatchEventProcessor<ValueEvent>(ring, secondBarrier, secondHandler);
    ring.addGatingSequences(secondProcessor.getSequence());
    EXECUTOR.submit(secondProcessor);

    // Producer: claim a slot, fill it in, publish it.
    for (int sample = 0; sample < SAMPLES_SIZE; sample++) {
        long claimed = ring.next();
        ValueEvent slot = ring.get(claimed);

        slot.setValue(sample); // this could be more complex with multiple fields

        ring.publish(claimed);
    }

}
项目:andes    文件:DisruptorBasedFlusher.java   
/**
 * Reads the delivery tuning configuration, wires the delivery disruptor as
 * content readers, then decompression handlers, then delivery handlers, then a cleanup
 * handler, starts it and registers the outbound ring gauge.
 */
public DisruptorBasedFlusher() {
    // Tuning values from the broker configuration.
    Integer bufferSize = AndesConfigurationManager.readValue(
            AndesConfiguration.PERFORMANCE_TUNING_DELIVERY_RING_BUFFER_SIZE);
    Integer contentReaderCount = AndesConfigurationManager.readValue(
            AndesConfiguration.PERFORMANCE_TUNING_DELIVERY_PARALLEL_CONTENT_READERS);
    Integer decompressionHandlerCount = AndesConfigurationManager.readValue(
            AndesConfiguration.PERFORMANCE_TUNING_DELIVERY_PARALLEL_DECOMPRESSION_HANDLERS);
    Integer deliveryHandlerCount = AndesConfigurationManager.readValue(
            AndesConfiguration.PERFORMANCE_TUNING_DELIVERY_PARALLEL_DELIVERY_HANDLERS);
    Integer contentBatchSize = AndesConfigurationManager.readValue(
            AndesConfiguration.PERFORMANCE_TUNING_DELIVERY_CONTENT_READ_BATCH_SIZE);
    int maxChunkSize = AndesConfigurationManager.readValue(
            AndesConfiguration.PERFORMANCE_TUNING_MAX_CONTENT_CHUNK_SIZE);

    ThreadFactory flusherThreads =
            new ThreadFactoryBuilder().setNameFormat("DisruptorBasedFlusher-%d").build();
    Executor flusherExecutor = Executors.newCachedThreadPool(flusherThreads);

    disruptor = new Disruptor<>(
            new DeliveryEventData.DeliveryEventDataFactory(),
            bufferSize,
            flusherExecutor,
            ProducerType.MULTI,
            new SleepingBlockingWaitStrategy());

    disruptor.handleExceptionsWith(new DeliveryExceptionHandler());

    // Content readers are the first stage, so their barrier comes straight from the
    // ring buffer.
    SequenceBarrier readerBarrier = disruptor.getRingBuffer().newBarrier();

    // Stage 1: content readers, one per turn in [0, contentReaderCount).
    ConcurrentContentReadTaskBatchProcessor[] contentReaders =
            new ConcurrentContentReadTaskBatchProcessor[contentReaderCount];
    for (int turn = 0; turn < contentReaderCount; turn++) {
        ConcurrentContentReadTaskBatchProcessor reader = new ConcurrentContentReadTaskBatchProcessor(
                disruptor.getRingBuffer(),
                readerBarrier,
                new ContentCacheCreator(maxChunkSize),
                turn,
                contentReaderCount,
                contentBatchSize);
        contentReaders[turn] = reader;
        reader.setExceptionHandler(new DeliveryExceptionHandler());
    }

    // Stage 2: decompression handlers.
    ContentDecompressionHandler[] decompressors =
            new ContentDecompressionHandler[decompressionHandlerCount];
    for (int index = 0; index < decompressionHandlerCount; index++) {
        decompressors[index] = new ContentDecompressionHandler(maxChunkSize);
    }

    // Stage 3: delivery handlers.
    DeliveryEventHandler[] deliverers = new DeliveryEventHandler[deliveryHandlerCount];
    for (int index = 0; index < deliveryHandlerCount; index++) {
        deliverers[index] = new DeliveryEventHandler(index, deliveryHandlerCount);
    }

    // Stage 4: cleanup of delivered events.
    DeliveryEventCleanupHandler cleanupHandler = new DeliveryEventCleanupHandler();

    disruptor.handleEventsWith(contentReaders)
            .then(decompressors)
            .then(deliverers)
            .then(cleanupHandler);

    disruptor.start();
    ringBuffer = disruptor.getRingBuffer();

    // Gauge that periodically reports the outbound messages in the ring.
    MetricManager.gauge(MetricsConstants.DISRUPTOR_OUTBOUND_RING, Level.INFO, new OutBoundRingGauge());
}
项目:annotated-src    文件:Disruptor.java   
/**
 * Creates a {@link WorkerPool} for the given work handlers, gated on a barrier built from
 * {@code barrierSequences}, and registers it with the consumer repository.
 *
 * @param barrierSequences sequences the worker pool is gated on
 * @param workHandlers     handlers that make up the pool
 * @return a group made of the pool's worker sequences
 */
EventHandlerGroup<T> createWorkerPool(final Sequence[] barrierSequences, final WorkHandler<T>[] workHandlers) {
    final SequenceBarrier poolBarrier = ringBuffer.newBarrier(barrierSequences);
    final WorkerPool<T> pool =
            new WorkerPool<T>(ringBuffer, poolBarrier, exceptionHandler, workHandlers);

    consumerRepository.add(pool, poolBarrier);

    final Sequence[] workerSequences = pool.getWorkerSequences();
    return new EventHandlerGroup<T>(this, consumerRepository, workerSequences);
}
项目:annotated-src    文件:EventProcessorInfo.java   
/**
 * Bundles an event processor with its handler and the barrier it waits on.
 *
 * @param eventprocessor the processor
 * @param handler        the handler associated with the processor
 * @param barrier        the barrier associated with the processor
 */
EventProcessorInfo(final EventProcessor eventprocessor, final EventHandler<T> handler, final SequenceBarrier barrier)
{
    this.barrier = barrier;
    this.handler = handler;
    this.eventprocessor = eventprocessor;
}
项目:annotated-src    文件:EventProcessorInfo.java   
/**
 * Returns the barrier held by this info object.
 *
 * @return the stored {@link SequenceBarrier}
 */
@Override
public SequenceBarrier getBarrier()
{
    return this.barrier;
}
项目:jstorm    文件:DisruptorQueueImpl.java   
/**
 * Returns the barrier stored in {@code _barrier}.
 *
 * @return the stored {@link SequenceBarrier}
 */
public SequenceBarrier get_barrier() {
    final SequenceBarrier current = _barrier;
    return current;
}
项目:disruptor-code-analysis    文件:Disruptor.java   
/**
 * Looks up the {@link SequenceBarrier} that the consumer repository associates with the
 * given handler. A single barrier may be shared by several event handlers.
 *
 * @param handler the handler to get the barrier for.
 * @return the SequenceBarrier used by <i>handler</i>.
 */
public SequenceBarrier getBarrierFor(final EventHandler<T> handler) {
    final SequenceBarrier handlerBarrier = consumerRepository.getBarrierFor(handler);
    return handlerBarrier;
}
项目:disruptor-code-analysis    文件:EventHandlerGroup.java   
/**
 * Builds a dependency barrier over the sequences of this group, so that custom event
 * processors can depend on {@link com.lmax.disruptor.BatchEventProcessor}s created by
 * the disruptor.
 *
 * @return a {@link SequenceBarrier} including all the processors in this group.
 */
public SequenceBarrier asSequenceBarrier()
{
    final SequenceBarrier groupBarrier = disruptor.getRingBuffer().newBarrier(sequences);
    return groupBarrier;
}
项目:jstorm-0.9.6.3-    文件:RingBuffer.java   
/**
 * Creates a {@link SequenceBarrier} for an EventProcessor by delegating to the sequencer.
 * The barrier tracks which messages are available to be read from the ring buffer, given
 * the sequences to track.
 *
 * @see SequenceBarrier
 * @param sequencesToTrack the additional sequences to track
 * @return A sequence barrier that will track the specified sequences.
 */
public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
{
    final SequenceBarrier created = sequencer.newBarrier(sequencesToTrack);
    return created;
}
项目:learn_jstorm    文件:RingBuffer.java   
/**
 * Creates a {@link SequenceBarrier} for an EventProcessor by delegating to the sequencer.
 * The barrier tracks which messages are available to be read from the ring buffer, given
 * the sequences to track.
 *
 * @see SequenceBarrier
 * @param sequencesToTrack the additional sequences to track
 * @return A sequence barrier that will track the specified sequences.
 */
public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
{
    final SequenceBarrier created = sequencer.newBarrier(sequencesToTrack);
    return created;
}
项目:f1x    文件:VariableBlockSizeRingConsumer.java   
/**
 * Creates a consumer by forwarding every argument unchanged to the superclass constructor.
 *
 * @param ring             source byte ring containing inbound messages
 * @param sequenceBarrier  on which it is waiting.
 * @param delegate         is the delegate to which messages are dispatched.
 * @param exceptionHandler to be called back when an error occurs
 */
public VariableBlockSizeRingConsumer(
        ByteRing ring,
        SequenceBarrier sequenceBarrier,
        RingBufferBlockProcessor delegate,
        ExceptionHandler exceptionHandler) {
    super(ring, sequenceBarrier, delegate, exceptionHandler);
}
项目:f1x    文件:FixedBlockSizeRingConsumer.java   
/**
 * Creates a consumer by forwarding the ring, barrier, delegate and exception handler to
 * the superclass constructor and storing the block size.
 *
 * @param ring             source byte ring containing inbound messages
 * @param sequenceBarrier  on which it is waiting.
 * @param delegate         is the delegate to which messages are dispatched.
 * @param exceptionHandler to be called back when an error occurs
 * @param blockSize        the block size stored by this consumer
 */
public FixedBlockSizeRingConsumer(
        ByteRing ring,
        SequenceBarrier sequenceBarrier,
        RingBufferBlockProcessor delegate,
        ExceptionHandler exceptionHandler,
        int blockSize) {
    super(ring, sequenceBarrier, delegate, exceptionHandler);
    this.blockSize = blockSize;
}
项目:jstrom    文件:RingBuffer.java   
/**
 * Creates a {@link SequenceBarrier} for an EventProcessor by delegating to the sequencer.
 * The barrier tracks which messages are available to be read from the ring buffer, given
 * the sequences to track.
 *
 * @see SequenceBarrier
 * @param sequencesToTrack the additional sequences to track
 * @return A sequence barrier that will track the specified sequences.
 */
public SequenceBarrier newBarrier(Sequence... sequencesToTrack) {
    final SequenceBarrier created = sequencer.newBarrier(sequencesToTrack);
    return created;
}
项目:Tstream    文件:RingBuffer.java   
/**
 * Creates a {@link SequenceBarrier} for an EventProcessor by delegating to the sequencer.
 * The barrier tracks which messages are available to be read from the ring buffer, given
 * the sequences to track.
 *
 * @see SequenceBarrier
 * @param sequencesToTrack the additional sequences to track
 * @return A sequence barrier that will track the specified sequences.
 */
public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
{
    final SequenceBarrier created = sequencer.newBarrier(sequencesToTrack);
    return created;
}