Java 类com.lmax.disruptor.RingBuffer 实例源码

项目:Okra-Ax    文件:DisruptorAdapterHandler.java   
@Override
protected void channelRead0(ChannelHandlerContext ctx, O msg) throws Exception {
    UUID uuid = CHANNEL_UUID.get(ctx.channel());
    if (null != uuid) {
        NetSession session = SESSIONS.get(uuid);
        if (null != session) {
            RingBuffer<ConcurrentEvent> ringBuffer = THREAD_LOCAL.get().getRingBuffer();
            long next = ringBuffer.next();
            try {
                ConcurrentEvent commandEvent = ringBuffer.get(next);
                commandEvent.setValues(newExecutor(session, msg));
            } finally {
                ringBuffer.publish(next);
            }
        }
    }
}
项目:game-executor    文件:DisruptorExecutorService.java   
@Override
    public void startup() {
        EventBus eventBus = disruptorDispatchThread.getEventBus();
        executorService = new NonOrderedQueuePoolExecutor(poolName, excutorSize);
        cycleEventHandler = new CycleEventHandler[excutorSize];
        for(int i = 0; i < excutorSize; i++){
            cycleEventHandler[i] = new CycleEventHandler(eventBus);
        }

        RingBuffer ringBuffer = disruptorDispatchThread.getRingBuffer();
        workerPool = new WorkerPool(ringBuffer, ringBuffer.newBarrier(), new FatalExceptionHandler(), cycleEventHandler);
        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());

        workerPool.start(executorService);

//        BatchEventProcessor<CycleEvent>[] batchEventProcessors = new BatchEventProcessor[excutorSize];
//        for(int i = 0; i < excutorSize; i++){
//            batchEventProcessors[i] = new BatchEventProcessor<>(ringBuffer, sequenceBarrier, cycleEventHandler[i]);
//            ringBuffer.addGatingSequences(batchEventProcessors[i].getSequence());
////            executorService.submit(batchEventProcessors[i]);
//        }
    }
项目:disruptor-code-analysis    文件:DisruptorTest.java   
@Test
public void shouldSupportCustomProcessorsAsDependencies()
    throws Exception
{
    RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer();

    final DelayedEventHandler delayedEventHandler = createDelayedEventHandler();

    CountDownLatch countDownLatch = new CountDownLatch(2);
    EventHandler<TestEvent> handlerWithBarrier = new EventHandlerStub<TestEvent>(countDownLatch);

    final BatchEventProcessor<TestEvent> processor =
        new BatchEventProcessor<TestEvent>(ringBuffer, ringBuffer.newBarrier(), delayedEventHandler);
    disruptor.handleEventsWith(processor);
    disruptor.after(processor).handleEventsWith(handlerWithBarrier);

    ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler);
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
@Test
public void shouldSupportHandlersAsDependenciesToCustomProcessors()
    throws Exception
{
    final DelayedEventHandler delayedEventHandler = createDelayedEventHandler();
    disruptor.handleEventsWith(delayedEventHandler);


    RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer();
    CountDownLatch countDownLatch = new CountDownLatch(2);
    EventHandler<TestEvent> handlerWithBarrier = new EventHandlerStub<TestEvent>(countDownLatch);

    final SequenceBarrier sequenceBarrier = disruptor.after(delayedEventHandler).asSequenceBarrier();
    final BatchEventProcessor<TestEvent> processor =
        new BatchEventProcessor<TestEvent>(ringBuffer, sequenceBarrier, handlerWithBarrier);
    disruptor.handleEventsWith(processor);

    ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler);
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
@Test
public void shouldSupportCustomProcessorsAndHandlersAsDependencies() throws Exception
{
    final DelayedEventHandler delayedEventHandler1 = createDelayedEventHandler();
    final DelayedEventHandler delayedEventHandler2 = createDelayedEventHandler();
    disruptor.handleEventsWith(delayedEventHandler1);


    RingBuffer<TestEvent> ringBuffer = disruptor.getRingBuffer();
    CountDownLatch countDownLatch = new CountDownLatch(2);
    EventHandler<TestEvent> handlerWithBarrier = new EventHandlerStub<TestEvent>(countDownLatch);

    final SequenceBarrier sequenceBarrier = disruptor.after(delayedEventHandler1).asSequenceBarrier();
    final BatchEventProcessor<TestEvent> processor =
        new BatchEventProcessor<TestEvent>(ringBuffer, sequenceBarrier, delayedEventHandler2);

    disruptor.after(delayedEventHandler1).and(processor).handleEventsWith(handlerWithBarrier);

    ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler1, delayedEventHandler2);
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
@Test
public void shouldMakeEntriesAvailableToFirstCustomProcessorsImmediately() throws Exception
{
    final CountDownLatch countDownLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> eventHandler = new EventHandlerStub<TestEvent>(countDownLatch);

    disruptor.handleEventsWith(
                               new EventProcessorFactory<TestEvent>()
                               {
                                   @Override
                                   public EventProcessor createEventProcessor(
                                                                              final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences)
                                   {
                                       assertEquals("Should not have had any barrier sequences", 0, barrierSequences.length);
                                       return new BatchEventProcessor<TestEvent>(
                                                                                 disruptor.getRingBuffer(), ringBuffer.newBarrier(
                                                                                                                                  barrierSequences), eventHandler);
                                   }
                               });

    ensureTwoEventsProcessedAccordingToDependencies(countDownLatch);
}
项目:disruptor-code-analysis    文件:DisruptorTest.java   
@Test
public void shouldHonourDependenciesForCustomProcessors() throws Exception
{
    final CountDownLatch countDownLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> eventHandler = new EventHandlerStub<TestEvent>(countDownLatch);
    final DelayedEventHandler delayedEventHandler = createDelayedEventHandler();

    disruptor.handleEventsWith(delayedEventHandler).then(
        new EventProcessorFactory<TestEvent>()
        {
            @Override
            public EventProcessor createEventProcessor(
                final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences)
            {
                assertSame("Should have had a barrier sequence", 1, barrierSequences.length);
                return new BatchEventProcessor<TestEvent>(
                    disruptor.getRingBuffer(), ringBuffer.newBarrier(
                    barrierSequences), eventHandler);
            }
        });

    ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler);
}
项目:cicada    文件:DisruptorTransfer.java   
@SuppressWarnings("unchecked")
public DisruptorTransfer(final SpanEventHandler spanEventHandler, final int buffSize) {
  // Executor executor = Executors.newCachedThreadPool();
  final ThreadFactory threadFactory = Executors.defaultThreadFactory();

  // The factory for the event
  final SpanEventFactory factory = new SpanEventFactory();

  // Specify the size of the ring buffer, must be power of 2.
  final int bufferSize = buffSize;

  // Construct the Disruptor
  disruptor = new Disruptor<SpanEvent>(factory, bufferSize, threadFactory);

  // Connect the handler
  // disruptor.handleEventsWith(new
  // SpanEventHandler("http://localhost:9080/upload"));
  disruptor.handleEventsWith(spanEventHandler);

  // Start the Disruptor, starts all threads running
  disruptor.start();

  final RingBuffer<SpanEvent> ringBuffer = disruptor.getRingBuffer();
  producer = new SpanEventProducer(ringBuffer);
}
项目:camunda-bpm-reactor    文件:RingBufferProcessor.java   
private RingBufferProcessor(String name,
                            ExecutorService executor,
                            int bufferSize,
                            WaitStrategy waitStrategy,
                            boolean shared,
                            boolean autoCancel) {
  super(name, executor, autoCancel);

  this.ringBuffer = RingBuffer.create(
    shared ? ProducerType.MULTI : ProducerType.SINGLE,
    new EventFactory<MutableSignal<E>>() {
      @Override
      public MutableSignal<E> newInstance() {
        return new MutableSignal<E>();
      }
    },
    bufferSize,
    waitStrategy
  );

  this.recentSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
  this.barrier = ringBuffer.newBarrier();
  //ringBuffer.addGatingSequences(recentSequence);
}
项目:camunda-bpm-reactor    文件:RingBufferWorkProcessor.java   
private RingBufferWorkProcessor(String name,
                                ExecutorService executor,
                                int bufferSize,
                                WaitStrategy waitStrategy,
                                boolean share,
                                boolean autoCancel) {
  super(name, executor, autoCancel);

  this.ringBuffer = RingBuffer.create(
    share ? ProducerType.MULTI : ProducerType.SINGLE,
    new EventFactory<MutableSignal<E>>() {
      @Override
      public MutableSignal<E> newInstance() {
        return new MutableSignal<E>();
      }
    },
    bufferSize,
    waitStrategy
  );

  ringBuffer.addGatingSequences(workSequence);

}
项目:ddth-queue    文件:DisruptorQueue.java   
/**
 * Init method.
 * 
 * @return
 */
public DisruptorQueue<ID, DATA> init() {
    /* single producer "seems" to offer better performance */
    ringBuffer = RingBuffer.createSingleProducer(EVENT_FACTORY, ringSize);
    // ringBuffer = RingBuffer.createMultiProducer(EVENT_FACTORY, ringSize);

    if (!isEphemeralDisabled()) {
        int ephemeralBoundSize = Math.max(0, getEphemeralMaxSize());
        ephemeralStorage = new ConcurrentHashMap<>(
                ephemeralBoundSize > 0 ? Math.min(ephemeralBoundSize, ringSize) : ringSize);
    }

    consumedSeq = new Sequence();
    ringBuffer.addGatingSequences(consumedSeq);
    long cursor = ringBuffer.getCursor();
    consumedSeq.set(cursor);
    knownPublishedSeq = cursor;

    return this;
}
项目:jetstream    文件:SingleConsumerDisruptorQueue.java   
/**
 * Construct a blocking queue based on disruptor.
 * 
 * @param bufferSize
 *            ring buffer size
 * @param singleProducer
 *            whether only single thread produce events.
 */
public SingleConsumerDisruptorQueue(int bufferSize, boolean singleProducer) {
    if (singleProducer) {
        ringBuffer = RingBuffer.createSingleProducer(new Factory<T>(), normalizeBufferSize(bufferSize));
    } else {
        ringBuffer = RingBuffer.createMultiProducer(new Factory<T>(), normalizeBufferSize(bufferSize));
    }

    consumedSeq = new Sequence();
    ringBuffer.addGatingSequences(consumedSeq);
    barrier = ringBuffer.newBarrier();

    long cursor = ringBuffer.getCursor();
    consumedSeq.set(cursor);
    knownPublishedSeq = cursor;
}
项目:Okra    文件:DisruptorAdapterHandler.java   
@Override
protected void channelRead0(ChannelHandlerContext ctx, O msg) throws Exception {
    UUID uuid = CHANNEL_UUID.get(ctx.channel());
    if (null != uuid) {
        Session session = SESSIONS.get(uuid);
        if (null != session) {
            RingBuffer<ConcurrentEvent> ringBuffer = THREAD_LOCAL.get().getRingBuffer();
            long next = ringBuffer.next();
            try {
                ConcurrentEvent commandEvent = ringBuffer.get(next);
                commandEvent.setValues(newExecutor(session, msg));
            } finally {
                ringBuffer.publish(next);
            }
        }
    }
}
项目:smart-cqrs    文件:EventBus.java   
public void post(Object event) {
    Class<?> eventClass = event.getClass();
    Disruptor<BaseEvent> eventDisruptor = disruptorPool.get(eventClass);
    if(eventDisruptor == null) {
        List<RegistedEventHandler> registedEventHandlers = handlesMap.get(eventClass);
        if(registedEventHandlers == null || registedEventHandlers.size() == 0) {
            throw new RuntimeException("The " + eventClass.getSimpleName() + " event dosen't regist any eventHandler.");
        }
        eventDisruptor = createDisruptor(eventClass, registedEventHandlers);
        disruptorPool.put(eventClass, eventDisruptor);
    }
    // check whether has event Repository definition.
    if(eventRepository != null) {
        eventRepository.save(event);
    }
    RingBuffer<BaseEvent> ringBuffer = eventDisruptor.getRingBuffer();
    BaseEventProducer producer = new BaseEventProducer(ringBuffer);
    producer.onData(event);
}
项目:hbase    文件:AbstractFSWAL.java   
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
    WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
    throws IOException {
  if (this.closed) {
    throw new IOException(
        "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
  }
  MutableLong txidHolder = new MutableLong();
  MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
    txidHolder.setValue(ringBuffer.next());
  });
  long txid = txidHolder.longValue();
  try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
    FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);
    entry.stampRegionSequenceId(we);
    ringBuffer.get(txid).load(entry);
  } finally {
    ringBuffer.publish(txid);
  }
  return txid;
}
项目:couchbase-jvm-core    文件:CouchbaseNodeTest.java   
@Test
public void shouldSetDispatchedHostnameAfterSend() {
    ServiceRegistry registryMock = mock(ServiceRegistry.class);
    ServiceFactory serviceFactory = mock(ServiceFactory.class);

    Service binaryServiceMock = mock(Service.class);
    when(binaryServiceMock.type()).thenReturn(ServiceType.BINARY);
    when(binaryServiceMock.states()).thenReturn(Observable.just(LifecycleState.CONNECTED));
    when(binaryServiceMock.connect()).thenReturn(Observable.just(LifecycleState.CONNECTED));
    when(serviceFactory.create(anyString(), anyString(), anyString(), anyString(),
            eq(0), same(environment), eq(ServiceType.BINARY), any(RingBuffer.class))).thenReturn(binaryServiceMock);


    CouchbaseNode node = new CouchbaseNode(host, registryMock, environment, null, serviceFactory);
   node.addService(new AddServiceRequest(ServiceType.BINARY, "bucket", null, 0, host))
            .toBlocking().single();

    CouchbaseRequest request = mock(CouchbaseRequest.class);
    node.send(request);

    verify(request, times(1)).dispatchHostname(any(String.class));

}
项目:bifroest    文件:DisruptorEventBus.java   
@Override
public void synchronousFire( final Object event ) {
    eventCount.increment();
    CountDownLatch latch = new CountDownLatch( handlers.size() );
    try {
        RingBuffer<StatisticEventHolder> ringBuffer = disruptor.getRingBuffer();
        long sequence = ringBuffer.next();
        try {
            StatisticEventHolder holder = ringBuffer.get( sequence );
            holder.set( event );
            holder.setLatch( latch );
        } finally {
            ringBuffer.publish( sequence );
        }
        latch.await();
    } catch ( InterruptedException e ) {
        return;
    }
}
项目:andes    文件:ConcurrentContentReadTaskBatchProcessor.java   
/**
 * Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
 * the {@link com.lmax.disruptor.EventHandler#onEvent(Object, long, boolean)} method returns.
 *
 * @param ringBuffer      to which events are published.
 * @param sequenceBarrier on which it is waiting.
 * @param eventHandler    is the delegate to which events are dispatched.
 * @param turn            is the value of, sequence % groupCount this batch processor process events. Turn must be
 *                        less than groupCount
 * @param groupCount      total number of concurrent batch processors for the event type
 * @param batchSize       size limit of total content size to batch. This is a loose limit
 */
public ConcurrentContentReadTaskBatchProcessor(final RingBuffer<DeliveryEventData> ringBuffer,
        final SequenceBarrier sequenceBarrier, final ContentCacheCreator eventHandler, long turn, int groupCount,
        int batchSize) {
    if (turn >= groupCount) {
        throw new IllegalArgumentException("Turn should be less than groupCount");
    }

    this.ringBuffer = ringBuffer;
    this.sequenceBarrier = sequenceBarrier;
    this.eventHandler = eventHandler;
    this.turn = turn;
    this.groupCount = groupCount;
    this.batchSize = batchSize;

    exceptionHandler = new DeliveryExceptionHandler();
    running = new AtomicBoolean(false);
    if (eventHandler instanceof SequenceReportingEventHandler) {
        ((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
    }
}
项目:disruptorqueue    文件:SingleConsumerDisruptorQueue.java   
/**
 * Construct a blocking queue based on disruptor.
 * 
 * @param bufferSize
 *            ring buffer size
 * @param singleProducer
 *            whether only single thread produce events.
 */
public SingleConsumerDisruptorQueue(int bufferSize, boolean singleProducer) {
    if (singleProducer) {
        ringBuffer = RingBuffer.createSingleProducer(new Factory<T>(), normalizeBufferSize(bufferSize));
    } else {
        ringBuffer = RingBuffer.createMultiProducer(new Factory<T>(), normalizeBufferSize(bufferSize));
    }

    consumedSeq = new Sequence();
    ringBuffer.addGatingSequences(consumedSeq);
    barrier = ringBuffer.newBarrier();

    long cursor = ringBuffer.getCursor();
    consumedSeq.set(cursor);
    knownPublishedSeq = cursor;
}
项目:dsys-snio    文件:RingBufferProvider.java   
RingBufferProvider(@Nonnegative final int capacity, @Nonnull final Factory<T> factory,
        @Nonnull final MessageBufferConsumer<T> appIn) {
    if (appIn == null) {
        throw new NullPointerException("appIn == null");
    }
    this.waitOut = new WakeupWaitStrategy();
    this.waitIn = null;
    final EventFactory<T> evfactory = wrapFactory(factory);
    this.out = RingBuffer.createMultiProducer(evfactory, capacity, waitOut);
    this.in = null;
    this.attachOut = new Object[capacity];
    this.attachIn = null;
    this.appOut = new RingBufferProducer<>(out, attachOut);
    this.chnIn = new RingBufferConsumer<>(out, attachOut);
    this.chnOut = appIn.createProducer();
    this.appIn = appIn;
    this.internalConsumer = false;
}
项目:dsys-snio    文件:RingBufferConsumer.java   
RingBufferConsumer(@Nonnull final RingBuffer<T> buffer, @Nonnull final Object[] attachments) {
    if (buffer == null) {
        throw new NullPointerException("buffer == null");
    }
    if (attachments == null) {
        throw new NullPointerException("attachments == null");
    }
    if (buffer.getBufferSize() != attachments.length) {
        throw new IllegalArgumentException("buffer.getBufferSize() != attachments.length");
    }
    this.buffer = buffer;
    this.attachments = attachments;
    this.barrier = buffer.newBarrier();
    this.sequence = new Sequence();
    buffer.addGatingSequences(sequence);
    this.cursor = sequence.get();
    this.available = sequence.get();
}
项目:Surf    文件:Util.java   
public static Worker createWorker(File conf, EventHandler<KinesisEvent> handler, String appName)throws IOException{
    Executor executor = Executors.newCachedThreadPool();
    Disruptor<KinesisEvent> disruptor = new Disruptor<>(KinesisEvent.EVENT_FACTORY, 128, executor);

    disruptor.handleEventsWith(handler);
    RingBuffer<KinesisEvent> buffer = disruptor.start();

    Properties props = new Properties();
    props.load(new FileReader(conf));
    // Generate a unique worker ID
    String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
    String accessid = props.getProperty("aws-access-key-id");
    String secretkey = props.getProperty("aws-secret-key");
    String streamname = props.getProperty("aws-kinesis-stream-name");
    BasicAWSCredentials creds = new BasicAWSCredentials(accessid, secretkey);
    CredProvider credprovider = new CredProvider(creds);
    KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(appName, streamname,  credprovider, workerId);

    Worker worker = new Worker(new RecordProcessorFactory(buffer), config, new MetricsFactory());
    return worker;
}
项目:couchbase-jvm-core    文件:AbstractDynamicService.java   
protected AbstractDynamicService(final String hostname, final String bucket, final String username, final String password, final int port,
    final CoreEnvironment env, final int minEndpoints,
    final RingBuffer<ResponseEvent> responseBuffer, final EndpointFactory endpointFactory) {
    super(minEndpoints == 0 ? LifecycleState.IDLE : LifecycleState.DISCONNECTED);
    this.initialState = minEndpoints == 0 ? LifecycleState.IDLE : LifecycleState.DISCONNECTED;
    this.hostname = hostname;
    this.bucket = bucket;
    this.username = username;
    this.password = password;
    this.port = port;
    this.env = env;
    this.minEndpoints = minEndpoints;
    this.responseBuffer = responseBuffer;
    this.endpointFactory = endpointFactory;
    endpointStates = new EndpointStateZipper(initialState);
    endpoints = new Endpoint[minEndpoints];
    endpointStates.states().subscribe(new Action1<LifecycleState>() {
        @Override
        public void call(LifecycleState lifecycleState) {
            transitionState(lifecycleState);
        }
    });
}
项目:godeye    文件:DisruptorTransfer.java   
public DisruptorTransfer(final SpanEventHandler spanEventHandler, final int buffSize) {
  final ThreadFactory threadFactory = Executors.defaultThreadFactory();
  final SpanEventFactory factory = new SpanEventFactory();
  final int bufferSize = buffSize;// RingBuffer 大小,必须是 2 的 N 次方;
  disruptor = new Disruptor<>(factory, bufferSize, threadFactory);

  disruptor.handleEventsWith(spanEventHandler);// disruptor.handleEventsWith(new SpanEventHandler("http://localhost:9080/upload"));
  disruptor.start();// Start the Disruptor, starts all threads running
  final RingBuffer<SpanEvent> ringBuffer = disruptor.getRingBuffer();
  producer = new SpanEventProducer(ringBuffer);
}
项目:tapir    文件:PcapStream.java   
default void loop(Disruptor disruptor, PcapHandle handle) throws Exception {
    RingBuffer<PacketContainer> ringBuffer = disruptor.getRingBuffer();

    handle.loop(0, (RawPacketListener) packet -> {
        long sequence = ringBuffer.next();

        PacketContainer container = ringBuffer.get(sequence);
        container.clear();
        container.setDlt(handle.getDlt());
        container.setTimestamp(handle.getTimestamp());
        container.setRawPacket(packet);

        ringBuffer.publish(sequence);
    });
}
项目:Okra-Ax    文件:GpcEventDispatcher.java   
@Override
protected void channelRead0(ChannelHandlerContext ctx, GpcCall msg) throws Exception {
    NetSession session = NET_SESSION_MAP.get(ctx.channel().id());
    if (null == session)
        return;
    RingBuffer<ConcurrentEvent> ringBuffer = THREAD_LOCAL.get().getRingBuffer();
    long next = ringBuffer.next();
    try {
        ConcurrentEvent commandEvent = ringBuffer.get(next);
        commandEvent.setValues(newExecutor(session, msg));
    } finally {
        ringBuffer.publish(next);
    }
}
项目:Okra-Ax    文件:DisruptorAdapterBy41xHandler.java   
@Override
protected void channelRead0(ChannelHandlerContext ctx, O msg) throws Exception {
    NetSession session = SESSIONS.get(ctx.channel().id());
    if (null == session) {
        return;
    }
    RingBuffer<ConcurrentEvent> ringBuffer = THREAD_LOCAL.get().getRingBuffer();
    long next = ringBuffer.next();
    try {
        ConcurrentEvent commandEvent = ringBuffer.get(next);
        commandEvent.setValues(newExecutor(session, msg));
    } finally {
        ringBuffer.publish(next);
    }
}
项目:elastic-rabbitmq    文件:LongEventMain.java   
public static void main(String[] args) throws Exception {
        Executor executor = Executors.newCachedThreadPool();

        LongEventFactory eventFactory = new LongEventFactory();
        int bufferSize = 1024;

//        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, bufferSize, executor);
//        disruptor.handleEventsWith(new LongEventHandler());
//        disruptor.start();

        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> {System.out.println("Event: " + event);
            System.out.println("CurrentThreadName:" + Thread.currentThread().getName());
        });
        disruptor.start();

        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);

           // producer.onData(bb);
            //Thread.sleep(1000);
        }
    }
项目:disruptor-code-analysis    文件:Disruptor.java   
/**
 * Create a new Disruptor.
 *
 * @param eventFactory   the factory to create events in the ring buffer.
 * @param ringBufferSize the size of the ring buffer, must be power of 2.
 * @param threadFactory  a {@link ThreadFactory} to create threads for processors.
 * @param producerType   the claim strategy to use for the ring buffer.
 * @param waitStrategy   the wait strategy to use for the ring buffer.
 */
public Disruptor(
        final EventFactory<T> eventFactory,
        final int ringBufferSize,
        final ThreadFactory threadFactory,
        final ProducerType producerType,
        final WaitStrategy waitStrategy) {
    this(RingBuffer.create(
            producerType, eventFactory, ringBufferSize, waitStrategy),
            new BasicExecutor(threadFactory));
}
项目:disruptor-code-analysis    文件:Disruptor.java   
/**
 * <p>Starts the event processors and returns the fully configured ring buffer.</p>
 * <p>
 * <p>The ring buffer is set up to prevent overwriting any entry that is yet to
 * be processed by the slowest event processor.</p>
 * <p>
 * <p>This method must only be called once after all event processors have been added.</p>
 *
 * @return the configured ring buffer.
 */
public RingBuffer<T> start() {
    final Sequence[] gatingSequences = consumerRepository.getLastSequenceInChain(true);
    ringBuffer.addGatingSequences(gatingSequences);

    checkOnlyStartedOnce();
    for (final ConsumerInfo consumerInfo : consumerRepository) {
        consumerInfo.start(executor);
    }

    return ringBuffer;
}
项目:disruptor-code-analysis    文件:OneToOneTranslatorThroughputTest.java   
@Override
protected long runDisruptorPass() throws InterruptedException
{
    MutableLong value = this.value;

    final CountDownLatch latch = new CountDownLatch(1);
    long expectedCount = ringBuffer.getMinimumGatingSequence() + ITERATIONS;

    handler.reset(latch, expectedCount);
    long start = System.currentTimeMillis();

    final RingBuffer<ValueEvent> rb = ringBuffer;

    for (long l = 0; l < ITERATIONS; l++)
    {
        value.set(l);
        rb.publishEvent(Translator.INSTANCE, value);
    }

    latch.await();
    long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
    waitForEventProcessorSequence(expectedCount);

    failIfNot(expectedResult, handler.getValue());

    return opsPerSecond;
}
项目:disruptor-code-analysis    文件:OneToOneSequencedBatchThroughputTest.java   
@Override
protected long runDisruptorPass() throws InterruptedException
{
    final CountDownLatch latch = new CountDownLatch(1);
    long expectedCount = batchEventProcessor.getSequence().get() + ITERATIONS * BATCH_SIZE;
    handler.reset(latch, expectedCount);
    executor.submit(batchEventProcessor);
    long start = System.currentTimeMillis();

    final RingBuffer<ValueEvent> rb = ringBuffer;

    for (long i = 0; i < ITERATIONS; i++)
    {
        long hi = rb.next(BATCH_SIZE);
        long lo = hi - (BATCH_SIZE - 1);
        for (long l = lo; l <= hi; l++)
        {
            rb.get(l).setValue(i);
        }
        rb.publish(lo, hi);
    }

    latch.await();
    long opsPerSecond = (BATCH_SIZE * ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
    waitForEventProcessorSequence(expectedCount);
    batchEventProcessor.halt();

    failIfNot(expectedResult, handler.getValue());

    return opsPerSecond;
}
项目:disruptor-code-analysis    文件:OneToOneSequencedPollerThroughputTest.java   
@Override
protected long runDisruptorPass() throws InterruptedException
{
    final CountDownLatch latch = new CountDownLatch(1);
    long expectedCount = poller.getSequence().get() + ITERATIONS;
    pollRunnable.reset(latch, expectedCount);
    executor.submit(pollRunnable);
    long start = System.currentTimeMillis();

    final RingBuffer<ValueEvent> rb = ringBuffer;

    for (long i = 0; i < ITERATIONS; i++)
    {
        long next = rb.next();
        rb.get(next).setValue(i);
        rb.publish(next);
    }

    latch.await();
    long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
    waitForEventProcessorSequence(expectedCount);
    pollRunnable.halt();

    failIfNot(expectedResult, pollRunnable.getValue());

    return opsPerSecond;
}
项目:disruptor-code-analysis    文件:OneToOneSequencedThroughputTest.java   
@Override
protected long runDisruptorPass() throws InterruptedException
{
    final CountDownLatch latch = new CountDownLatch(1);
    long expectedCount = batchEventProcessor.getSequence().get() + ITERATIONS;
    handler.reset(latch, expectedCount);
    executor.submit(batchEventProcessor);
    long start = System.currentTimeMillis();

    final RingBuffer<ValueEvent> rb = ringBuffer;

    for (long i = 0; i < ITERATIONS; i++)
    {
        long next = rb.next();
        rb.get(next).setValue(i);
        rb.publish(next);
    }

    latch.await();
    long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
    waitForEventProcessorSequence(expectedCount);
    batchEventProcessor.halt();

    failIfNot(expectedResult, handler.getValue());

    return opsPerSecond;
}
项目:disruptor-code-analysis    文件:OneToOneSequencedLongArrayThroughputTest.java   
@Override
protected long runDisruptorPass() throws InterruptedException
{
    final CountDownLatch latch = new CountDownLatch(1);
    long expectedCount = batchEventProcessor.getSequence().get() + ITERATIONS;
    handler.reset(latch, ITERATIONS);
    executor.submit(batchEventProcessor);
    long start = System.currentTimeMillis();

    final RingBuffer<long[]> rb = ringBuffer;

    for (long i = 0; i < ITERATIONS; i++)
    {
        long next = rb.next();
        long[] event = rb.get(next);
        for (int j = 0; j < event.length; j++)
        {
            event[j] = i;
        }
        rb.publish(next);
    }

    latch.await();
    long opsPerSecond = (ITERATIONS * ARRAY_SIZE * 1000L) / (System.currentTimeMillis() - start);
    waitForEventProcessorSequence(expectedCount);
    batchEventProcessor.halt();

    PerfTestUtil.failIf(0, handler.getValue());

    return opsPerSecond;
}
项目:disruptor-code-analysis    文件:OneToOneOnHeapThroughputTest.java   
@Override
protected long runDisruptorPass() throws Exception
{
    byte[] data = this.data;

    final CountDownLatch latch = new CountDownLatch(1);
    long expectedCount = processor.getSequence().get() + ITERATIONS;
    handler.reset(latch, ITERATIONS);
    executor.execute(processor);
    long start = System.currentTimeMillis();

    final RingBuffer<ByteBuffer> rb = buffer;

    for (long i = 0; i < ITERATIONS; i++)
    {
        long next = rb.next();
        ByteBuffer event = rb.get(next);
        event.clear();
        event.put(data);
        event.flip();
        rb.publish(next);
    }

    latch.await();
    long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
    waitForEventProcessorSequence(expectedCount);
    processor.halt();

    return opsPerSecond;
}
项目:disruptor-code-analysis    文件:ValueBatchPublisher.java   
public ValueBatchPublisher(
    final CyclicBarrier cyclicBarrier,
    final RingBuffer<ValueEvent> ringBuffer,
    final long iterations,
    final int batchSize)
{
    this.cyclicBarrier = cyclicBarrier;
    this.ringBuffer = ringBuffer;
    this.iterations = iterations;
    this.batchSize = batchSize;
}
项目:disruptor-code-analysis    文件:ValuePublisher.java   
public ValuePublisher(
    final CyclicBarrier cyclicBarrier, final RingBuffer<ValueEvent> ringBuffer, final long iterations)
{
    this.cyclicBarrier = cyclicBarrier;
    this.ringBuffer = ringBuffer;
    this.iterations = iterations;
}
项目:disruptor-code-analysis    文件:LongArrayPublisher.java   
public LongArrayPublisher(
    final CyclicBarrier cyclicBarrier,
    final RingBuffer<long[]> ringBuffer,
    final long iterations,
    final long arraySize)
{
    this.cyclicBarrier = cyclicBarrier;
    this.ringBuffer = ringBuffer;
    this.iterations = iterations;
    this.arraySize = arraySize;
}
项目:disruptor-code-analysis    文件:PullWithBatchedPoller.java   
public static void main(String[] args) throws Exception
{
    int batchSize = 40;
    RingBuffer<BatchedPoller.DataEvent<Object>> ringBuffer = RingBuffer.createMultiProducer(BatchedPoller.DataEvent.factory(), 1024);

    BatchedPoller<Object> poller = new BatchedPoller<Object>(ringBuffer, batchSize);

    Object value = poller.poll();

    // Value could be null if no events are available.
    if (null != value)
    {
        // Process value.
    }
}