/**
 * Creates a {@link DOMNotificationRouter} backed by a cached thread pool and a
 * {@link PhasedBackoffWaitStrategy} that falls back to a lock.
 *
 * @param queueDepth size of the notification queue; must be a positive power of two
 * @param spinTime   spin timeout handed to {@code PhasedBackoffWaitStrategy.withLock}
 * @param parkTime   yield timeout handed to {@code PhasedBackoffWaitStrategy.withLock}
 * @param unit       time unit in which {@code spinTime} and {@code parkTime} are expressed
 * @return a new router instance
 * @throws IllegalArgumentException if {@code queueDepth} is not a positive power of two
 */
public static DOMNotificationRouter create(final int queueDepth, final long spinTime, final long parkTime, final TimeUnit unit) {
    // Comparing lowestOneBit with highestOneBit is also true for 0 (both yield 0), so the
    // sign is checked explicitly; exactly one set bit in a positive int means power of two.
    Preconditions.checkArgument(queueDepth > 0 && Integer.bitCount(queueDepth) == 1,
            "Queue depth %s is not power-of-two", queueDepth);

    final ExecutorService executor = Executors.newCachedThreadPool();
    final WaitStrategy strategy = PhasedBackoffWaitStrategy.withLock(spinTime, parkTime, unit);

    return new DOMNotificationRouter(executor, queueDepth, strategy);
}
public void execute() throws InterruptedException { // Executor that will be used to construct new threads for consumers int nConsumers = Integer.getInteger("disruptor.consumers",16); checkArgument(((nConsumers != 0) && ((nConsumers & (~nConsumers + 1)) == nConsumers)),"Number of consumers must be a power of 2"); Executor executor = Executors.newFixedThreadPool(nConsumers, new ThreadFactory() { private int i = 0; public Thread newThread(Runnable r) { log.info("Created thread {}", i); return new Thread(r, "DisThread-" + i++); } }); // Specify the size of the ring buffer, must be power of 2. int bufferSize = Integer.getInteger("bufferSize",1024); checkArgument(((bufferSize != 0) && ((bufferSize & (~bufferSize + 1)) == bufferSize)),"Buffer size must be a power of 2"); log.info("Using {} concurrent consumers with a buffer size of {}",nConsumers,bufferSize); // Construct the Disruptor Disruptor disruptor = new Disruptor(new EventFactory<String[]>() { @Override public String[] newInstance() { return new String[trajectoryReader.numberOfFields]; } }, bufferSize, executor, ProducerType.SINGLE, PhasedBackoffWaitStrategy.withLiteLock( 500, 5000, TimeUnit.MILLISECONDS )); RingBuffer<String[]> ringBuffer = disruptor.getRingBuffer(); for (int i = 0; i < nConsumers; i++) { CqlFrameHandler cqlFrameHandler = new CqlFrameHandler(cqlFrameLoader, nConsumers, i); disruptor.handleEventsWith(cqlFrameHandler); } // Start the Disruptor, starts all threads running // Get the ring buffer from the Disruptor to be used for publishing. FrameEventProducer producer = new FrameEventProducer(ringBuffer, trajectoryReader); Thread t = new Thread(producer, "TrajReader"); long time = System.currentTimeMillis(); disruptor.start(); t.start(); t.join(); log.info("Load completed in {} ms", System.currentTimeMillis() - time); }