Java 类com.lmax.disruptor.YieldingWaitStrategy 实例源码

项目:incubator-omid    文件:DisruptorModule.java   
@Override
protected void configure() {
    switch (config.getWaitStrategyEnum()) {
    // A low-cpu usage Disruptor configuration for using in local/test environments
    case LOW_CPU:
         bind(WaitStrategy.class).annotatedWith(Names.named("PersistenceStrategy")).to(BlockingWaitStrategy.class);
         bind(WaitStrategy.class).annotatedWith(Names.named("ReplyStrategy")).to(BlockingWaitStrategy.class);
         bind(WaitStrategy.class).annotatedWith(Names.named("RetryStrategy")).to(BlockingWaitStrategy.class);
         break;
    // The default high-cpu usage Disruptor configuration for getting high throughput on production environments
    case HIGH_THROUGHPUT:
    default:
         bind(WaitStrategy.class).annotatedWith(Names.named("PersistenceStrategy")).to(BusySpinWaitStrategy.class);
         bind(WaitStrategy.class).annotatedWith(Names.named("ReplyStrategy")).to(BusySpinWaitStrategy.class);
         bind(WaitStrategy.class).annotatedWith(Names.named("RetryStrategy")).to(YieldingWaitStrategy.class);
         break;
    }
    bind(RequestProcessor.class).to(RequestProcessorImpl.class).in(Singleton.class);
    bind(PersistenceProcessor.class).to(PersistenceProcessorImpl.class).in(Singleton.class);
    bind(ReplyProcessor.class).to(ReplyProcessorImpl.class).in(Singleton.class);
    bind(RetryProcessor.class).to(RetryProcessorImpl.class).in(Singleton.class);

}
项目:incubator-omid    文件:TestRetryProcessor.java   
@Test(timeOut = 10_000)
public void testRetriedRequestForAnExistingTxReturnsCommit() throws Exception {
    ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool();

    // The element to test
    RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);

    // Test we'll reply with a commit for a retry request when the start timestamp IS in the commit table
    commitTable.getWriter().addCommittedTransaction(ST_TX_1, CT_TX_1);
    retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
    ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
    ArgumentCaptor<Long> secondTSCapture = ArgumentCaptor.forClass(Long.class);

    verify(replyProc, timeout(100).times(1)).sendCommitResponse(firstTSCapture.capture(),
                                                                secondTSCapture.capture(),
                                                                any(Channel.class));

    long startTS = firstTSCapture.getValue();
    long commitTS = secondTSCapture.getValue();
    assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as ST_TX_1");
    assertEquals(commitTS, CT_TX_1, "Captured timestamp should be the same as CT_TX_1");

}
项目:incubator-omid    文件:TestRetryProcessor.java   
@Test(timeOut = 10_000)
public void testRetriedRequestForInvalidatedTransactionReturnsAnAbort() throws Exception {

    // Invalidate the transaction
    commitTable.getClient().tryInvalidateTransaction(ST_TX_1);

    // Pre-start verification: Validate that the transaction is invalidated
    // NOTE: This test should be in the a test class for InMemoryCommitTable
    Optional<CommitTimestamp> invalidTxMarker = commitTable.getClient().getCommitTimestamp(ST_TX_1).get();
    Assert.assertTrue(invalidTxMarker.isPresent());
    Assert.assertEquals(invalidTxMarker.get().getValue(), InMemoryCommitTable.INVALID_TRANSACTION_MARKER);

    ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool();

    // The element to test
    RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);

    // Test we return an Abort to a retry request when the transaction id IS in the commit table BUT invalidated
    retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
    ArgumentCaptor<Long> startTSCapture = ArgumentCaptor.forClass(Long.class);
    verify(replyProc, timeout(100).times(1)).sendAbortResponse(startTSCapture.capture(), any(Channel.class));
    long startTS = startTSCapture.getValue();
    Assert.assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as NON_EXISTING_ST_TX");

}
项目:ddth-queue    文件:QndDisruptor2.java   
@SuppressWarnings("unchecked")
static void qndSingleThread(int numItems) {
    final AtomicLong COUNTER_RECEIVED = new AtomicLong(0);
    final Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent.FACTORY, 128,
            Executors.defaultThreadFactory(), ProducerType.MULTI, new YieldingWaitStrategy());
    disruptor.handleEventsWith(
            (event, sequence, endOfBatch) -> COUNTER_RECEIVED.incrementAndGet());
    disruptor.start();

    final long t = System.currentTimeMillis();
    for (int i = 0; i < numItems; i++) {
        disruptor.publishEvent((event, seq) -> event.set(seq));
    }
    long d = System.currentTimeMillis() - t;
    NumberFormat nf = NumberFormat.getInstance();
    System.out.println("========== qndSingleThread:");
    System.out.println("Sent: " + nf.format(numItems) + " / Received: "
            + nf.format(COUNTER_RECEIVED.get()) + " / Duration: " + d + " / Speed: "
            + NumberFormat.getInstance().format((numItems * 1000.0 / d)) + " items/sec");

    disruptor.shutdown();
}
项目:jboss-fuse-examples    文件:DisruptorService.java   
public void init() throws NullPointerException {
    if (factory == null) {
        throw new NullPointerException("factory == null");
    }

    if (bufferSize <= 0) {
        throw new NullPointerException("bufferSize <= 0");
    }

    if (threadFactory == null) {
        throw new NullPointerException("threadFactory == null");
    }

    producers = new ArrayList<ExchangeEventProducer<T>>();
    disruptor = new Disruptor<Exchange>(factory, bufferSize, threadFactory, ProducerType.SINGLE, new YieldingWaitStrategy());
}
项目:log4j2    文件:AsyncLogger.java   
private static WaitStrategy createWaitStrategy() {
    final String strategy = System.getProperty("AsyncLogger.WaitStrategy");
    LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
    if ("Sleep".equals(strategy)) {
        LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
        return new SleepingWaitStrategy();
    } else if ("Yield".equals(strategy)) {
        LOGGER.debug("disruptor event handler uses YieldingWaitStrategy");
        return new YieldingWaitStrategy();
    } else if ("Block".equals(strategy)) {
        LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
        return new BlockingWaitStrategy();
    }
    LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
    return new SleepingWaitStrategy();
}
项目:jaf-examples    文件:Main.java   
public static void main(String[] args) {
    EventFactory<HelloEvent> eventFactory = new HelloEventFactory();
    int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必须是 2 的 N 次方;

    Disruptor<HelloEvent> disruptor = new Disruptor<HelloEvent>(
            eventFactory, ringBufferSize, Executors.defaultThreadFactory(),
            ProducerType.SINGLE, new YieldingWaitStrategy());

    EventHandler<HelloEvent> eventHandler = new HelloEventHandler();
    disruptor.handleEventsWith(eventHandler, eventHandler);

    disruptor.start();

}
项目:incubator-omid    文件:TestRetryProcessor.java   
@Test(timeOut = 10_000)
public void testRetriedRequestForANonExistingTxReturnsAbort() throws Exception {
    ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool();

    // The element to test
    RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);

    // Test we'll reply with an abort for a retry request when the start timestamp IS NOT in the commit table
    retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, new MonitoringContext(metrics));
    ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);

    verify(replyProc, timeout(100).times(1)).sendAbortResponse(firstTSCapture.capture(), any(Channel.class));
    long startTS = firstTSCapture.getValue();
    assertEquals(startTS, NON_EXISTING_ST_TX, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
}
项目:disruptor-code-analysis    文件:OneToOneTranslatorThroughputTest.java   
@SuppressWarnings("unchecked")
public OneToOneTranslatorThroughputTest()
{
    Disruptor<ValueEvent> disruptor =
        new Disruptor<ValueEvent>(
            ValueEvent.EVENT_FACTORY,
            BUFFER_SIZE, executor,
            ProducerType.SINGLE,
            new YieldingWaitStrategy());
    disruptor.handleEventsWith(handler);
    this.ringBuffer = disruptor.start();
}
项目:ddth-queue    文件:QndDisruptor2.java   
@SuppressWarnings("unchecked")
static void qndMultiThreads(int numItems, int numThreads) throws InterruptedException {
    final AtomicLong COUNTER_RECEIVED = new AtomicLong(0);
    final Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent.FACTORY, 128,
            Executors.defaultThreadFactory(), ProducerType.MULTI, new YieldingWaitStrategy());
    disruptor.handleEventsWith(
            (event, sequence, endOfBatch) -> COUNTER_RECEIVED.incrementAndGet());
    disruptor.start();

    final long t = System.currentTimeMillis();
    final int numItemsPerThread = numItems / numThreads;
    final Thread[] THREADS = new Thread[numThreads];
    for (int i = 0; i < THREADS.length; i++) {
        THREADS[i] = new Thread() {
            public void run() {
                for (int i = 0; i < numItemsPerThread; i++) {
                    disruptor.publishEvent((event, seq) -> event.set(seq));
                }
            }
        };
        THREADS[i].start();
    }
    for (Thread thread : THREADS) {
        thread.join();
    }

    long d = System.currentTimeMillis() - t;
    NumberFormat nf = NumberFormat.getInstance();
    System.out.println("========== qndSingleThread:");
    System.out.println("Sent: " + nf.format(numItems) + " / Received: "
            + nf.format(COUNTER_RECEIVED.get()) + " / Duration: " + d + " / Speed: "
            + NumberFormat.getInstance().format((numItems * 1000.0 / d)) + " items/sec");

    disruptor.shutdown();
}
项目:disruptor-spring-manager    文件:WaitStrategyTypeTest.java   
@Test
public void test_All_WaitStrategies() {
    assertTrue(WaitStrategyType.BLOCKING.instance() instanceof BlockingWaitStrategy);
    assertTrue(WaitStrategyType.BUSY_SPIN.instance() instanceof BusySpinWaitStrategy);
    assertTrue(WaitStrategyType.LITE_BLOCKING.instance() instanceof LiteBlockingWaitStrategy);
    assertTrue(WaitStrategyType.SLEEPING_WAIT.instance() instanceof SleepingWaitStrategy);
    assertTrue(WaitStrategyType.YIELDING.instance() instanceof YieldingWaitStrategy);
}
项目:logback-ext    文件:WaitStrategyFactory.java   
public static WaitStrategy createFromType(String name) {
    if ("BusySpin".equalsIgnoreCase(name)) {
        return new BusySpinWaitStrategy();
    } else if ("Blocking".equalsIgnoreCase(name)) {
        return new BlockingWaitStrategy();
    } else if ("Yielding".equalsIgnoreCase(name)) {
        return new YieldingWaitStrategy();
    } else if ("Sleeping".equalsIgnoreCase(name)) {
        return new SleepingWaitStrategy();
    } else {
        throw new IllegalArgumentException("Invalid or unsupported wait strategy type '" + name + "'");
    }
}
项目:log4j2    文件:AsyncLoggerConfigHelper.java   
private static WaitStrategy createWaitStrategy() {
    final String strategy = System
            .getProperty("AsyncLoggerConfig.WaitStrategy");
    LOGGER.debug("property AsyncLoggerConfig.WaitStrategy={}", strategy);
    if ("Sleep".equals(strategy)) {
        return new SleepingWaitStrategy();
    } else if ("Yield".equals(strategy)) {
        return new YieldingWaitStrategy();
    } else if ("Block".equals(strategy)) {
        return new BlockingWaitStrategy();
    }
    return new SleepingWaitStrategy();
}
项目:disruptor-vs-queue    文件:EventHandlerRunner.java   
/**
 * Create a pool of consumers, all reading from a ring buffer to process events.
 *
 * @param NUM_THREADS total number of consumers to create in the pool
 * @param totalEventsToPublish total events to publish to the queue
 * @param ringConsumers the placeholder for creating consumers
 * @throws InterruptedException if there is a problem talking to the blocking queue
 */
public void run(int NUM_THREADS, int totalEventsToPublish, RingEventHandlerConsumer<T>[] ringConsumers) throws InterruptedException {
    final int RING_BUFFER_SIZE = 1024;

    ExecutorService executor;
    if(NUM_THREADS == 1) {
        executor = Executors.newSingleThreadExecutor();
    } else {
        executor = Executors.newFixedThreadPool(NUM_THREADS);
    }

    Disruptor<T> disruptor =
            new Disruptor<>(disruptorEventFactory, RING_BUFFER_SIZE, executor,
                    ProducerType.SINGLE,
                    new YieldingWaitStrategy());

    for(int i = 0; i < NUM_THREADS; i++) {
        ringConsumers[i] = new RingEventHandlerConsumer<T>(i, NUM_THREADS, consumerFactory.createConsumer(i, NUM_THREADS));
    }

    disruptor.handleEventsWith(ringConsumers);

    RingBuffer<T> eventRingBuffer = disruptor.start();

    final long startTime = System.currentTimeMillis();
    publishEvents(totalEventsToPublish, eventRingBuffer);

    disruptor.shutdown();
    executor.shutdown();
    final long endTime = System.currentTimeMillis();
    logger.info("It took " + (endTime - startTime) + "ms to process " + totalEventsToPublish + " messages.");
    for(RingEventHandlerConsumer consumer : ringConsumers) {
        logger.info("Processed " + consumer.getMessagesProcessed() + " messages.");
    }
}
项目:chaz-bct    文件:BeanConfiguration.java   
@Bean
public Disruptor<MarketEvent> disruptor() {
    return new Disruptor<>(() -> new MarketEvent(), 1024 * 1024, Executors.defaultThreadFactory(), ProducerType.MULTI, new YieldingWaitStrategy());
}
项目:disruptor-code-analysis    文件:CustomPerformanceTest.java   
public CustomPerformanceTest()
{
    ringBuffer =
        new CustomRingBuffer<SimpleEvent>(new SingleProducerSequencer(Constants.SIZE, new YieldingWaitStrategy()));
}
项目:disruptor-code-analysis    文件:SimplePerformanceTest.java   
public SimplePerformanceTest()
{
    ringBuffer = RingBuffer.createSingleProducer(EventHolder.FACTORY, Constants.SIZE, new YieldingWaitStrategy());
    eventHolderHandler = new EventHolderHandler(new SimpleEventHandler());
}
项目:camunda-bpm-reactor    文件:AgileWaitingStrategy.java   
public AgileWaitingStrategy() {
  this(new BlockingWaitStrategy(), new YieldingWaitStrategy());
}
项目:Ehcache_LMAXBulkLoader    文件:LaunchLoader.java   
private static void bulkLoad(
        final Cache cache, long entriesToLoad,double[] bbox)
{
    final AtomicLong atomicLong = new AtomicLong();
    final ValueHandler[] valueHandlers = new ValueHandler[]{
            new ValueHandler(cache, atomicLong),
            new ValueHandler(cache, atomicLong),
            new ValueHandler(cache, atomicLong),
            new ValueHandler(cache, atomicLong)
    };

    final ExecutorService executorService = Executors.newCachedThreadPool();
    try
    {
        final RingBuffer<ValueEvent> ringBuffer =
                RingBuffer.createSingleProducer(ValueEvent.EVENT_FACTORY,
                        4096,
                        new YieldingWaitStrategy());

        final WorkerPool<ValueEvent> workerPool =
                new WorkerPool<ValueEvent>(ringBuffer,
                        ringBuffer.newBarrier(),
                        new FatalExceptionHandler(),
                        valueHandlers);

        workerPool.start(executorService);
        try
        {
            publishLoadEvents(ringBuffer,entriesToLoad,bbox);
        }
        finally
        {
            workerPool.drainAndHalt();
        }
    }
    finally
    {
        executorService.shutdown();
    }
    System.out.format("Put %d elements.", atomicLong.get());
}
项目:cakes    文件:Main.java   
public static void main(String[] args) {
    ThreadFactory threadFactory = Executors.defaultThreadFactory();

    Disruptor<UserEvent> disruptor = new Disruptor<>(new UserEventFactory(), 1024, threadFactory, ProducerType.SINGLE, new YieldingWaitStrategy());

    disruptor.handleEventsWith(new UserEventConsumer());

    disruptor.start();

    new UserEventProducer(disruptor.getRingBuffer()).doProcess();

    disruptor.shutdown();
}