/**
 * Creates a processor backed by a newly allocated ring buffer.
 *
 * @param name         processor name, forwarded to the superclass constructor
 * @param executor     executor service, forwarded to the superclass constructor
 * @param bufferSize   number of slots requested from {@code RingBuffer.create}
 * @param waitStrategy wait strategy handed to the ring buffer
 * @param shared       {@code true} selects {@code ProducerType.MULTI},
 *                     {@code false} selects {@code ProducerType.SINGLE}
 * @param autoCancel   forwarded to the superclass constructor
 */
private RingBufferProcessor(String name,
                            ExecutorService executor,
                            int bufferSize,
                            WaitStrategy waitStrategy,
                            boolean shared,
                            boolean autoCancel) {
    super(name, executor, autoCancel);

    final ProducerType producerType;
    if (shared) {
        producerType = ProducerType.MULTI;
    } else {
        producerType = ProducerType.SINGLE;
    }

    // Every slot of the ring is pre-populated with an empty, reusable signal holder.
    final EventFactory<MutableSignal<E>> signalFactory = new EventFactory<MutableSignal<E>>() {
        @Override
        public MutableSignal<E> newInstance() {
            return new MutableSignal<E>();
        }
    };

    this.ringBuffer = RingBuffer.create(producerType, signalFactory, bufferSize, waitStrategy);
    this.recentSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    this.barrier = ringBuffer.newBarrier();
    // NOTE: recentSequence is not added to the ring buffer's gating sequences in this constructor.
}
/** * Construct a RingBuffer with the full option set. * * @param eventFactory to newInstance entries for filling the RingBuffer * @param sequencer sequencer to handle the ordering of events moving through the RingBuffer. * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2 */ public RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) { this.sequencer = sequencer; this.bufferSize = sequencer.getBufferSize(); if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.indexMask = bufferSize - 1; this.entries = new Object[sequencer.getBufferSize()]; fill(eventFactory); }
public OffHeapRingBuffer(Sequencer sequencer, int entrySize) { this.sequencer = sequencer; this.entrySize = entrySize; this.mask = sequencer.getBufferSize() - 1; buffer = ByteBuffer.allocateDirect(sequencer.getBufferSize() * entrySize); }
public CustomRingBuffer(Sequencer sequencer) { this.sequencer = sequencer; buffer = new Object[sequencer.getBufferSize()]; mask = sequencer.getBufferSize() - 1; }
public LongRingBuffer(final Sequencer sequencer) { this.sequencer = sequencer; this.buffer = new long[sequencer.getBufferSize()]; this.mask = sequencer.getBufferSize() - 1; }
/**
 * Creates a runnable holding a new barrier obtained from the given sequencer.
 *
 * @param sequencer sequencer asked for a new barrier
 */
public MyRunnable(final Sequencer sequencer) {
    barrier = sequencer.newBarrier();
}
@Override @SuppressWarnings("unchecked") public void request(long n) { if (n <= 0l) { subscriber.onError(SpecificationExceptions.spec_3_09_exception(n)); return; } if (!eventProcessor.isRunning()) { return; } if (pendingRequest.addAndGet(n) < 0) { pendingRequest.set(Long.MAX_VALUE); } //buffered data in producer unpublished final long currentSequence = eventProcessor.nextSequence; final long cursor = ringBuffer.getCursor(); //if the current subscriber sequence behind ringBuffer cursor, count the distance from the next slot to the end final long buffered = currentSequence < cursor ? cursor - (currentSequence == Sequencer.INITIAL_CURSOR_VALUE ? currentSequence + 1l : currentSequence) : 0l; final long toRequest; if (buffered > 0l) { toRequest = (n - buffered) < 0l ? 0 : n - buffered; } else { toRequest = n; } if (toRequest > 0l) { Subscription parent = upstreamSubscription; if (parent != null) { parent.request(toRequest); } } }
public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException { super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); this.eventLoopGroup = eventLoopGroup; this.channelClass = channelClass; Supplier<Boolean> hasConsumerTask; if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) { this.consumeExecutor = eventLoopGroup.next(); if (consumeExecutor instanceof SingleThreadEventExecutor) { try { Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue"); field.setAccessible(true); Queue<?> queue = (Queue<?>) field.get(consumeExecutor); hasConsumerTask = () -> queue.peek() == consumer; } catch (Exception e) { LOG.warn("Can not get task queue of " + consumeExecutor + ", this is not necessary, just give up", e); hasConsumerTask = () -> false; } } else { hasConsumerTask = () -> false; } } else { ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setNameFormat("AsyncFSWAL-%d").setDaemon(true).build()); hasConsumerTask = () -> threadPool.getQueue().peek() == consumer; this.consumeExecutor = threadPool; } this.hasConsumerTask = hasConsumerTask; int preallocatedEventCount = conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); waitingConsumePayloads = RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount); waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence); // inrease the ringbuffer sequence so our txid is start from 1 waitingConsumePayloads.publish(waitingConsumePayloads.next()); 
waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor()); batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE); waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS, DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS); rollWriter(); }