Java 类 com.lmax.disruptor.PhasedBackoffWaitStrategy 实例源码

项目:hashsdn-controller    文件:DOMNotificationRouter.java   
/**
 * Creates a {@code DOMNotificationRouter} that runs on a newly created cached thread pool and
 * waits using a lock-based {@code PhasedBackoffWaitStrategy}.
 *
 * @param queueDepth depth of the router's queue; must be a positive power of two
 * @param spinTime first timeout handed to {@code PhasedBackoffWaitStrategy.withLock}
 * @param parkTime second timeout handed to {@code PhasedBackoffWaitStrategy.withLock}
 * @param unit time unit in which {@code spinTime} and {@code parkTime} are expressed
 * @return a new router using the given queue depth and wait strategy
 * @throws IllegalArgumentException if {@code queueDepth} is not a positive power of two
 */
public static DOMNotificationRouter create(final int queueDepth, final long spinTime, final long parkTime, final TimeUnit unit) {
    // Comparing lowest and highest set bit alone would accept 0 (both are 0 when no bit is set),
    // so require a strictly positive value with exactly one bit set.
    Preconditions.checkArgument(queueDepth > 0 && Integer.bitCount(queueDepth) == 1,
            "Queue depth %s is not power-of-two", queueDepth);
    final ExecutorService executor = Executors.newCachedThreadPool();
    final WaitStrategy strategy = PhasedBackoffWaitStrategy.withLock(spinTime, parkTime, unit);

    return new DOMNotificationRouter(executor, queueDepth, strategy);
}
项目:FastCSVLoader    文件:DisruptorImplementation.java   
/**
 * Runs one complete load: builds a single-producer Disruptor over {@code String[]} events,
 * attaches one {@code CqlFrameHandler} per configured consumer, publishes from a dedicated
 * producer thread and blocks until the producer has finished and the handlers have drained
 * the ring buffer.
 *
 * <p>Configuration is read from system properties: {@code disruptor.consumers} (default 16)
 * and {@code bufferSize} (default 1024). Both must be positive powers of two.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting for the
 *         producer thread to finish
 * @throws IllegalArgumentException if either configured value is not a positive power of two
 */
public void execute() throws InterruptedException {
        // Number of handlers, and size of the pool that runs them.
        int nConsumers = Integer.getInteger("disruptor.consumers",16);
        // bitCount == 1 alone would also accept Integer.MIN_VALUE, hence the explicit sign check.
        checkArgument(nConsumers > 0 && Integer.bitCount(nConsumers) == 1,"Number of consumers must be a power of 2");
        // Executor that will be used to construct new threads for consumers.
        // Declared as ExecutorService (fully qualified so no new import is needed) so the pool
        // can be shut down once the load is finished.
        final java.util.concurrent.ExecutorService executor = Executors.newFixedThreadPool(nConsumers, new ThreadFactory() {
            private int i = 0;

            public Thread newThread(Runnable r) {
                log.info("Created thread {}", i);
                return new Thread(r, "DisThread-" + i++);

            }

        });

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = Integer.getInteger("bufferSize",1024);
        checkArgument(bufferSize > 0 && Integer.bitCount(bufferSize) == 1,"Buffer size must be a power of 2");

        log.info("Using {} concurrent consumers with a buffer size of {}",nConsumers,bufferSize);

        // Construct the Disruptor. The event type is spelled out so the ring buffer obtained
        // below is type-checked instead of being an unchecked raw-type conversion.
        final Disruptor<String[]> disruptor = new Disruptor<String[]>(new EventFactory<String[]>() {
            @Override
            public String[] newInstance() {
                // Slots are pre-allocated with one element per field of the reader.
                return new String[trajectoryReader.numberOfFields];
            }
        }, bufferSize, executor,
                ProducerType.SINGLE, PhasedBackoffWaitStrategy.withLiteLock(
                500,
                5000,
                TimeUnit.MILLISECONDS
        ));

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<String[]> ringBuffer = disruptor.getRingBuffer();

        // Each handler is registered separately and is told the total handler count and its
        // own index.
        for (int i = 0; i < nConsumers; i++) {
            CqlFrameHandler cqlFrameHandler = new CqlFrameHandler(cqlFrameLoader, nConsumers, i);
            disruptor.handleEventsWith(cqlFrameHandler);
        }

        FrameEventProducer producer = new FrameEventProducer(ringBuffer, trajectoryReader);
        Thread t = new Thread(producer, "TrajReader");
        long time = System.currentTimeMillis();

        // Start the Disruptor, starts all threads running
        disruptor.start();
        try {
            t.start();
            t.join();
            // Publishing has stopped; wait for the handlers to process everything still in the
            // ring buffer and then halt them, so the elapsed time below covers the whole load.
            disruptor.shutdown();
        } finally {
            // Orderly shutdown: does not interrupt handlers that are still running.
            executor.shutdown();
        }
        log.info("Load completed in {} ms", System.currentTimeMillis() - time);
    }