/**
 * Blocks until the workers have caught up with everything published to the
 * {@link RingBuffer}, then halts each worker and clears the started flag.
 */
public void drainAndHalt()
{
    final Sequence[] sequences = getWorkerSequences();

    // Yield for as long as the cursor is ahead of the slowest worker sequence.
    for (;;)
    {
        if (ringBuffer.getCursor() <= Util.getMinimumSequence(sequences))
        {
            break;
        }
        Thread.yield();
    }

    for (WorkProcessor<?> worker : workProcessors)
    {
        worker.halt();
    }

    started.set(false);
}
AuditReporter(int queueSize, long timeBucketIntervalInSec, int reportFreqMsgCount, int reportFreqIntervalSec, boolean combineMetricsAmongHosts) { reportExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat(getType() + "-audit-reporter-%d") .build());; queueSize = Util.ceilingNextPowerOfTwo(queueSize); disruptor = new Disruptor<AuditMsgReportTask>(new AuditMsgReportTaskFactory(), queueSize, reportExecutor); disruptor.handleEventsWith(new AuditMsgReportTaskHandler(this)); ringBuffer = disruptor.getRingBuffer(); aggregator = new AuditAggregator(timeBucketIntervalInSec, reportFreqMsgCount, reportFreqIntervalSec, combineMetricsAmongHosts); SUBMITTED_COUNTER = Metrics.getRegistry().meter(getType() + ".auditReporter.submittedNumber"); FAILED_TO_SUBMIT_COUNTER = Metrics.getRegistry().meter(getType() + ".auditReporter.failedToSubmitNumber"); REPORTED_COUNTER = Metrics.getRegistry().meter(getType() + ".auditReporter.reportedNumber"); FAILED_TO_REPORT_COUNTER = Metrics.getRegistry().meter(getType() + ".auditReporter.failedToReportNumber"); Metrics.getRegistry().register(getType() + ".auditReporter.queueSize", new Gauge<Integer>() { @Override public Integer getValue() { return (int) disruptor.getRingBuffer().remainingCapacity(); } }); }
/**
 * Reports whether {@code requiredCapacity} more slots can be claimed without the wrap point
 * passing the minimum gating sequence. The cached gating value is consulted first and is
 * refreshed only when it cannot answer the question.
 *
 * @see Sequencer#hasAvailableCapacity(int)
 */
@Override
public boolean hasAvailableCapacity(final int requiredCapacity)
{
    final long current = pad.nextValue;
    final long wrapPoint = (current + requiredCapacity) - bufferSize;
    final long cachedGate = pad.cachedValue;

    // Fast path: the cached gating value is not ahead of us and already shows enough room.
    if (wrapPoint <= cachedGate && cachedGate <= current)
    {
        return true;
    }

    // Slow path: re-read the gating sequences and remember the result for next time.
    final long minGate = Util.getMinimumSequence(gatingSequences, current);
    pad.cachedValue = minGate;
    return wrapPoint <= minGate;
}
/**
 * Checks whether {@code requiredCapacity} slots beyond {@code cursorValue} fit in the buffer
 * without the wrap point overtaking the minimum of {@code gatingSequences}.
 * Updates {@code gatingSequenceCache} whenever the gating sequences have to be re-read.
 *
 * @param gatingSequences  sequences whose minimum bounds how far the wrap point may advance
 * @param requiredCapacity number of slots being asked for
 * @param cursorValue      position the request is measured from
 * @return {@code true} if the slots are available, {@code false} otherwise
 */
private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue)
{
    final long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;
    final long cachedGate = gatingSequenceCache.get();

    final boolean cacheIsConclusive = wrapPoint <= cachedGate && cachedGate <= cursorValue;
    if (cacheIsConclusive)
    {
        return true;
    }

    final long freshMinimum = Util.getMinimumSequence(gatingSequences, cursorValue);
    gatingSequenceCache.set(freshMinimum);
    return !(wrapPoint > freshMinimum);
}
/**
 * Waits, yielding the current thread, until the minimum worker sequence has reached the
 * {@link RingBuffer} cursor; afterwards halts every work processor and resets the
 * started flag.
 */
public void drainAndHalt()
{
    final Sequence[] pending = getWorkerSequences();

    boolean backlog = ringBuffer.getCursor() > Util.getMinimumSequence(pending);
    while (backlog)
    {
        Thread.yield();
        backlog = ringBuffer.getCursor() > Util.getMinimumSequence(pending);
    }

    for (final WorkProcessor<?> each : workProcessors)
    {
        each.halt();
    }

    started.set(false);
}
/**
 * Resolves the ring buffer size from the {@code AsyncLogger.RingBufferSize} system property.
 * <ul>
 *   <li>Property absent: {@code RINGBUFFER_DEFAULT_SIZE} is used.</li>
 *   <li>Property below {@code RINGBUFFER_MIN_SIZE}: the minimum is used and a warning is logged.</li>
 *   <li>Property not a parsable integer: the default is used and a warning is logged.</li>
 * </ul>
 *
 * @return the chosen size passed through {@code Util.ceilingNextPowerOfTwo}
 */
private static int calculateRingBufferSize() {
    int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
    final String userPreferredRBSize = System.getProperty(
            "AsyncLogger.RingBufferSize", String.valueOf(ringBufferSize));
    try {
        int size = Integer.parseInt(userPreferredRBSize);
        if (size < RINGBUFFER_MIN_SIZE) {
            size = RINGBUFFER_MIN_SIZE;
            LOGGER.warn(
                    "Invalid RingBufferSize {}, using minimum size {}.",
                    userPreferredRBSize, RINGBUFFER_MIN_SIZE);
        }
        ringBufferSize = size;
    } catch (final NumberFormatException ex) {
        // Only a malformed number is an "invalid size"; anything else should not be
        // misreported under this message, so the catch is limited to parse failures.
        LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
                userPreferredRBSize, ringBufferSize);
    }
    return Util.ceilingNextPowerOfTwo(ringBufferSize);
}
/**
 * Resolves the ring buffer size from the {@code AsyncLoggerConfig.RingBufferSize} system property.
 * <ul>
 *   <li>Property absent: {@code RINGBUFFER_DEFAULT_SIZE} is used.</li>
 *   <li>Property below {@code RINGBUFFER_MIN_SIZE}: the minimum is used and a warning is logged.</li>
 *   <li>Property not a parsable integer: the default is used and a warning is logged.</li>
 * </ul>
 *
 * @return the chosen size passed through {@code Util.ceilingNextPowerOfTwo}
 */
private static int calculateRingBufferSize() {
    int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
    final String userPreferredRBSize = System.getProperty(
            "AsyncLoggerConfig.RingBufferSize", String.valueOf(ringBufferSize));
    try {
        int size = Integer.parseInt(userPreferredRBSize);
        if (size < RINGBUFFER_MIN_SIZE) {
            size = RINGBUFFER_MIN_SIZE;
            LOGGER.warn(
                    "Invalid RingBufferSize {}, using minimum size {}.",
                    userPreferredRBSize, RINGBUFFER_MIN_SIZE);
        }
        ringBufferSize = size;
    } catch (final NumberFormatException ex) {
        // Only a malformed number is an "invalid size"; anything else should not be
        // misreported under this message, so the catch is limited to parse failures.
        LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
                userPreferredRBSize, ringBufferSize);
    }
    return Util.ceilingNextPowerOfTwo(ringBufferSize);
}
/** * 判断RingBuffer是否还有可用的空间能够容纳requiredCapacity个Event. */ @Override public boolean hasAvailableCapacity(final int requiredCapacity) { long nextValue = this.nextValue; // 生产者下一个可使用的位置序号 // 从nextValue位置开始,如果再申请requiredCapacity个位置,将要达到的位置,因为是环形数组,所以减去bufferSize // 下面会用该值和消费者的位置序号比较. long wrapPoint = (nextValue + requiredCapacity) - bufferSize; // 消费者上一次消费的位置, 消费者每次消费之后会更新该值. long cachedGatingSequence = this.cachedValue; // 先看看这个条件的对立条件: wrapPoint <= cachedGatingSequence && cachedGatingSequence <= nextValue // 表示当前生产者走在消费者的前面, 并且就算再申请requiredCapacity个位置达到的位置也不会覆盖消费者上一次消费的位置(就更不用关心 // 当前消费者消费的位置了,因为消费者消费的位置是一直增大的),这种情况一定能够分配requiredCapacity个空间. if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { // gatingSequences保存的是消费者的当前消费位置, 因为可能有多个消费者, 所以此处获取序号最小的位置. long minSequence = Util.getMinimumSequence(gatingSequences, nextValue); // 顺便更新消费者上一次消费的位置... this.cachedValue = minSequence; // 如果申请之后的位置会覆盖消费者的位置,则不能分配空间,返回false if (wrapPoint > minSequence) { return false; } // 否则返回true. } return true; }
/** * 申请n个可用空间, 返回该位置的序号, 如果当前没有可用空间, 则一直阻塞直到有可用空间位置. */ @Override public long next(int n) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } long nextValue = this.nextValue; long nextSequence = nextValue + n; long wrapPoint = nextSequence - bufferSize; long cachedGatingSequence = this.cachedValue; // 这里的判断逻辑和上面的hasAvailableCapacity函数一致, 不多说了. if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { long minSequence; // 如果一直没有可用空间, 当前线程挂起, 不断循环检测,直到有可用空间. while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? } // 顺便更新一下消费者消费的位置序号. this.cachedValue = minSequence; } this.nextValue = nextSequence; // 返回最后一个可用位置的序号. return nextSequence; }
/** * 返回当前RingBuffer的可用位置数目. */ @Override public long remainingCapacity() { long nextValue = this.nextValue; // (多个)消费者消费的最小位置 long consumed = Util.getMinimumSequence(gatingSequences, nextValue); // 生产者的位置 long produced = nextValue; // 空余的可用的位置数目. return getBufferSize() - (produced - consumed); }
/**
 * Creates a sequencer over a buffer of the given size using the given wait strategy.
 * Allocates the availability array (one {@code int} per slot), derives the index mask
 * and shift from {@code bufferSize}, and then initialises the availability array.
 *
 * @param bufferSize   the size of the buffer that this will sequence over.
 * @param waitStrategy for those waiting on sequences.
 */
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
{
    super(bufferSize, waitStrategy);

    // NOTE(review): mask = size - 1 and shift = log2(size) presumably rely on bufferSize
    // being a power of two; confirm against the superclass's validation.
    this.indexMask = bufferSize - 1;
    this.indexShift = Util.log2(bufferSize);

    this.availableBuffer = new int[bufferSize];
    initialiseAvailableBuffer();
}
/**
 * Computes the free space as the buffer size minus the distance between the cursor and the
 * minimum gating sequence.
 *
 * @see Sequencer#remainingCapacity()
 */
@Override
public long remainingCapacity()
{
    final long slowestGate = Util.getMinimumSequence(gatingSequences, cursor.get());

    // The cursor is deliberately read a second time here, after the gating sequences,
    // exactly as before; the two reads are not collapsed into one.
    final long published = cursor.get();

    return getBufferSize() - (published - slowestGate);
}
/**
 * All stub workers start at the last pre-filled sequence. A background thread then publishes
 * one more event and advances the workers to it; the barrier's {@code waitFor} must return a
 * sequence at least as large as the one waited for.
 */
@Test
public void shouldWaitForWorkCompleteWhereAllWorkersAreBlockedOnRingBuffer() throws Exception
{
    long expectedNumberMessages = 10;
    fillRingBuffer(expectedNumberMessages);

    // Three workers, each positioned on the last pre-filled sequence.
    final StubEventProcessor[] workers = new StubEventProcessor[3];
    for (int idx = 0; idx < workers.length; idx++)
    {
        StubEventProcessor worker = new StubEventProcessor();
        workers[idx] = worker;
        worker.setSequence(expectedNumberMessages - 1);
    }

    final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(Util.getSequencesFor(workers));

    // Publishes one additional event and then moves every worker onto it.
    Runnable publishAndAdvance = new Runnable()
    {
        @Override
        public void run()
        {
            long claimed = ringBuffer.next();
            StubEvent slot = ringBuffer.get(claimed);
            slot.setValue((int) claimed);
            ringBuffer.publish(claimed);

            for (StubEventProcessor worker : workers)
            {
                worker.setSequence(claimed);
            }
        }
    };
    new Thread(publishAndAdvance).start();

    long expectedWorkSequence = expectedNumberMessages;
    long completedWorkSequence = sequenceBarrier.waitFor(expectedNumberMessages);

    assertTrue(completedWorkSequence >= expectedWorkSequence);
}
/**
 * The stub processors start two behind the number of pre-filled messages. A helper thread
 * bumps each of them by one and is joined before the barrier is asked to wait; the returned
 * sequence must be at least the one waited for.
 */
@Test
public void shouldWaitForWorkCompleteWhereCompleteWorkThresholdIsBehind() throws Exception
{
    long expectedNumberMessages = 10;
    fillRingBuffer(expectedNumberMessages);

    final StubEventProcessor[] eventProcessors = new StubEventProcessor[3];
    for (int idx = 0; idx < eventProcessors.length; idx++)
    {
        StubEventProcessor processor = new StubEventProcessor();
        eventProcessors[idx] = processor;
        processor.setSequence(expectedNumberMessages - 2);
    }

    final SequenceBarrier sequenceBarrier =
        ringBuffer.newBarrier(Util.getSequencesFor(eventProcessors));

    // Advances every processor by exactly one sequence.
    Runnable advanceByOne = new Runnable()
    {
        @Override
        public void run()
        {
            for (StubEventProcessor processor : eventProcessors)
            {
                long bumped = processor.getSequence().get() + 1L;
                processor.setSequence(bumped);
            }
        }
    };

    // Run the advance to completion before waiting on the barrier.
    Thread advancer = new Thread(advanceByOne);
    advancer.start();
    advancer.join();

    long expectedWorkSequence = expectedNumberMessages - 1;
    long completedWorkSequence = sequenceBarrier.waitFor(expectedWorkSequence);

    assertTrue(completedWorkSequence >= expectedWorkSequence);
}
/**
 * Claims {@code n} slots and returns the resulting sequence, waiting for room if the wrap
 * point would pass the minimum gating sequence. While waiting, the thread either sleeps for
 * 1 ms (when {@code AbstractSequencerExt.isWaitSleep()} is true) or parks for 1 ns.
 *
 * <p>An interrupt received while sleeping does not abort the wait, but it is no longer lost:
 * the thread's interrupt status is restored once the wait loop finishes so callers can
 * observe it.
 *
 * @param n number of slots to claim; must be at least 1
 * @return the sequence reached after claiming {@code n} slots
 * @throws IllegalArgumentException if {@code n < 1}
 * @see Sequencer#next(int)
 */
@Override
public long next(int n)
{
    if (n < 1)
    {
        throw new IllegalArgumentException("n must be > 0");
    }

    long nextValue = pad.nextValue;
    long nextSequence = nextValue + n;
    long wrapPoint = nextSequence - bufferSize;
    long cachedGatingSequence = pad.cachedValue;

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
    {
        long minSequence;
        // Remember an interrupt instead of re-asserting it immediately: setting the flag
        // inside the loop would make every subsequent sleep throw straight away.
        boolean interrupted = false;
        try
        {
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                if (AbstractSequencerExt.isWaitSleep())
                {
                    try
                    {
                        Thread.sleep(1);
                    }
                    catch (InterruptedException e)
                    {
                        interrupted = true;
                    }
                }
                else
                {
                    LockSupport.parkNanos(1);
                }
            }
        }
        finally
        {
            if (interrupted)
            {
                Thread.currentThread().interrupt();
            }
        }

        pad.cachedValue = minSequence;
    }

    pad.nextValue = nextSequence;

    return nextSequence;
}
/**
 * Computes the free space as the buffer size minus the gap between {@code pad.nextValue}
 * and the minimum gating sequence.
 *
 * @see Sequencer#remainingCapacity()
 */
@Override
public long remainingCapacity()
{
    final long producerPosition = pad.nextValue;
    final long slowestGate = Util.getMinimumSequence(gatingSequences, producerPosition);
    final long inFlight = producerPosition - slowestGate;
    return getBufferSize() - inFlight;
}
/**
 * Yields until the minimum worker sequence has caught up with the {@link RingBuffer}
 * cursor, then halts every message processor and clears the started flag.
 */
public void drainAndHalt()
{
    final Sequence[] sequences = getWorkerSequences();

    // Keep yielding while published events are still ahead of the slowest worker.
    for (;;)
    {
        final boolean drained = !(ringBuffer.getCursor() > Util.getMinimumSequence(sequences));
        if (drained)
        {
            break;
        }
        Thread.yield();
    }

    for (final MessageProcessor each : messageProcessors)
    {
        each.halt();
    }

    started.set(false);
}
/**
 * Claims {@code n} slots and returns the resulting sequence, waiting for room if the wrap
 * point would pass the minimum gating sequence. While waiting, the thread either sleeps for
 * 1 ms (when {@code AbstractSequencerExt.isWaitSleep()} is true) or parks for 1 ns.
 *
 * <p>An interrupt received while sleeping does not abort the wait, but it is no longer lost:
 * the thread's interrupt status is restored once the wait loop finishes so callers can
 * observe it.
 *
 * @param n number of slots to claim; must be at least 1
 * @return the sequence reached after claiming {@code n} slots
 * @throws IllegalArgumentException if {@code n < 1}
 * @see Sequencer#next(int)
 */
@Override
public long next(int n)
{
    if (n < 1)
    {
        throw new IllegalArgumentException("n must be > 0");
    }

    long nextValue = pad.nextValue;
    long nextSequence = nextValue + n;
    long wrapPoint = nextSequence - bufferSize;
    long cachedGatingSequence = pad.cachedValue;

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
    {
        long minSequence;
        // Remember an interrupt instead of re-asserting it immediately: setting the flag
        // inside the loop would make every subsequent sleep throw straight away.
        boolean interrupted = false;
        try
        {
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                if (AbstractSequencerExt.isWaitSleep())
                {
                    try
                    {
                        Thread.sleep(1);
                    }
                    catch (InterruptedException e)
                    {
                        interrupted = true;
                    }
                }
                else
                {
                    LockSupport.parkNanos(1);
                }
            }
        }
        finally
        {
            if (interrupted)
            {
                Thread.currentThread().interrupt();
            }
        }

        pad.cachedValue = minSequence;
    }

    pad.nextValue = nextSequence;

    return nextSequence;
}