@SuppressWarnings("unchecked") public static void main(String[] args) throws InterruptedException { //Executor that will be used to construct new threads for consumers Executor executor = Executors.newCachedThreadPool(); //Specify the size of the ring buffer, must be power of 2. int bufferSize = 1024; //Disruptor<ObjectEvent> disruptor = new Disruptor<>(ObjectEvent::new, bufferSize, executor); Disruptor<ObjectEvent> disruptor = new Disruptor<>(ObjectEvent::new, bufferSize, executor, ProducerType.SINGLE, new LiteBlockingWaitStrategy()); disruptor.handleEventsWith(App::handleEvent1); disruptor.handleEventsWith(App::handleEvent2); //disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event.getObject())); disruptor.start(); produceEvents(disruptor); }
/** * Creates a new MetricsMetaAPIImpl * @param properties The configuration properties */ public MetricsMetaAPIImpl(final Properties properties) { dataSource = SQLCompilerDataSource.getInstance(properties); sqlWorker = dataSource.getSQLWorker(); tagPredicateCache = new TagPredicateCache(sqlWorker); fjPool = new ManagedForkJoinPool(getClass().getSimpleName(), Runtime.getRuntime().availableProcessors(), true, JMXHelper.objectName(getClass())); metaReader = new DefaultMetaReader(sqlWorker); dispatcher = new WorkQueueDispatcher("MetricsMetaDispatcher", Runtime.getRuntime().availableProcessors(), 1024, this, ProducerType.MULTI, new LiteBlockingWaitStrategy()); log.info("Dispatcher Alive: {}", dispatcher.alive()); }
/** * 根据config初始化特殊通道 * * @param symbol 事件 * @param listeners 对应的监听器集合 */ private void initSpecDisruptor(String symbol, List<ElectronsListener> listeners) { ExecutorService specPool = Executors.newFixedThreadPool(conf.getSpecCircuitNum(), new ThreadFactory() { final AtomicInteger cursor = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "Electrons Thread (from spec channel) : thread" + cursor.incrementAndGet()); } }); pools.add(specPool); Disruptor<ElectronsHolder> disruptor = new Disruptor<>(ElectronsHolder::new, conf.getSpecCircuitLen(), specPool, ProducerType.MULTI, new LiteBlockingWaitStrategy()); disruptor.handleExceptionsWith(new ElecExceptionHandler("Spec Disruptor {" + symbol + "}")); //初始化管道并放入集合中 SpecChannel specChannel = new SpecChannel(disruptor); if (conf.isBreaker()) { EventCountCircuitBreaker breaker = new EventCountCircuitBreaker(conf.getErrorNum(), conf.getPerUnit(), conf.getUnit(), conf.getCloseThreshold(), conf.getRest(), conf.getRestUnit()); specChannel.setBreaker(breaker); } //构建listener顺序 ListenerChainBuilderNew.buildChain(specChannel, listeners); channelMap.put(SPEC_CHANNEL_PREFIX + symbol, specChannel); }
/** * 初始化正常管道,任何情况下都会有 * * @param pool 线程池 */ private void initNormalChannel(ExecutorService pool) { Disruptor<ElectronsHolder> normalDis = new Disruptor<>(ElectronsHolder::new, conf.getCircuitLen(), pool, ProducerType.MULTI, new LiteBlockingWaitStrategy()); WorkHandler[] workHandlers = new WorkHandler[conf.getCircuitNum()]; Arrays.fill(workHandlers, (WorkHandler<ElectronsHolder>) electronsHolder -> electronsHolder.handle()); normalDis.handleEventsWithWorkerPool(workHandlers); normalDis.handleExceptionsWith(new ElecExceptionHandler("Normal Disruptor")); //初始化channel Channel normalChannel = new NormalChannel(normalDis); //配置限流相关 normalChannel.confLimitRate(conf.isLimitRate(), conf.getPermitsPerSecond(), conf.isWarmup(), conf.getWarmupPeriod(), conf.getWarmPeriodUnit()); channelMap.put(NORMAL_CHANNEL_KEY, normalChannel); }
/**
 * Checks that every {@code WaitStrategyType} constant yields an instance of the wait strategy
 * class it is named after.
 */
@Test
public void test_All_WaitStrategies() {
    final Object blocking = WaitStrategyType.BLOCKING.instance();
    assertTrue(blocking instanceof BlockingWaitStrategy);

    final Object busySpin = WaitStrategyType.BUSY_SPIN.instance();
    assertTrue(busySpin instanceof BusySpinWaitStrategy);

    final Object liteBlocking = WaitStrategyType.LITE_BLOCKING.instance();
    assertTrue(liteBlocking instanceof LiteBlockingWaitStrategy);

    final Object sleeping = WaitStrategyType.SLEEPING_WAIT.instance();
    assertTrue(sleeping instanceof SleepingWaitStrategy);

    final Object yielding = WaitStrategyType.YIELDING.instance();
    assertTrue(yielding instanceof YieldingWaitStrategy);
}
/**
 * Creates a new RingBufferProcessor with all defaults: it is named after the class's simple name,
 * uses a {@link #SMALL_BUFFER_SIZE} backlog and a {@link LiteBlockingWaitStrategy}, and
 * auto-cancels.
 * <p>
 * A new cached thread executor pool will be implicitly created.
 *
 * @param <E> Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferProcessor<E> create() {
    final String defaultName = RingBufferProcessor.class.getSimpleName();
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return create(defaultName, SMALL_BUFFER_SIZE, waitStrategy, true);
}
/**
 * Creates a new RingBufferProcessor named after the class's simple name, with a
 * {@link #SMALL_BUFFER_SIZE} backlog, a {@link LiteBlockingWaitStrategy} and the passed
 * auto-cancel setting.
 * <p>
 * A new cached thread executor pool will be implicitly created.
 *
 * @param autoCancel Should this propagate cancellation when unregistered by all subscribers ?
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferProcessor<E> create(boolean autoCancel) {
    final String defaultName = RingBufferProcessor.class.getSimpleName();
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return create(defaultName, SMALL_BUFFER_SIZE, waitStrategy, autoCancel);
}
/**
 * Creates a new RingBufferProcessor on the given executor service, with a
 * {@link #SMALL_BUFFER_SIZE} backlog, a {@link LiteBlockingWaitStrategy} and auto-cancel enabled.
 * <p>
 * The passed {@link java.util.concurrent.ExecutorService} will execute as many event-loops
 * consuming the ring buffer as there are subscribers.
 *
 * @param service A provided ExecutorService to manage threading infrastructure
 * @param <E>     Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferProcessor<E> create(ExecutorService service) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return create(service, SMALL_BUFFER_SIZE, waitStrategy, true);
}
/**
 * Creates a new RingBufferProcessor on the given executor service, with a
 * {@link #SMALL_BUFFER_SIZE} backlog, a {@link LiteBlockingWaitStrategy} and the passed
 * auto-cancel setting.
 * <p>
 * The passed {@link java.util.concurrent.ExecutorService} will execute as many event-loops
 * consuming the ring buffer as there are subscribers.
 *
 * @param service    A provided ExecutorService to manage threading infrastructure
 * @param autoCancel Should this propagate cancellation when unregistered by all subscribers ?
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferProcessor<E> create(ExecutorService service, boolean autoCancel) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return create(service, SMALL_BUFFER_SIZE, waitStrategy, autoCancel);
}
/**
 * Creates a new RingBufferProcessor with the passed name and backlog size, a
 * {@link LiteBlockingWaitStrategy} and auto-cancel enabled.
 * <p>
 * A new cached thread executor pool will be implicitly created and will use the passed name to
 * qualify the created threads.
 *
 * @param name       Use a new Cached ExecutorService and assign this name to the created threads
 * @param bufferSize A Backlog Size to mitigate slow subscribers
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferProcessor<E> create(String name, int bufferSize) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return create(name, bufferSize, waitStrategy, true);
}
/**
 * Creates a new RingBufferProcessor with the passed name, backlog size and auto-cancel setting,
 * using a {@link LiteBlockingWaitStrategy}.
 * <p>
 * A new cached thread executor pool will be implicitly created and will use the passed name to
 * qualify the created threads.
 *
 * @param name       Use a new Cached ExecutorService and assign this name to the created threads
 * @param bufferSize A Backlog Size to mitigate slow subscribers
 * @param autoCancel Should this propagate cancellation when unregistered by all subscribers ?
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferProcessor<E> create(String name, int bufferSize, boolean autoCancel) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return create(name, bufferSize, waitStrategy, autoCancel);
}
/**
 * Creates a new RingBufferProcessor on the given executor service with the passed backlog size,
 * a {@link LiteBlockingWaitStrategy} and auto-cancel enabled.
 * <p>
 * The passed {@link java.util.concurrent.ExecutorService} will execute as many event-loops
 * consuming the ring buffer as there are subscribers.
 *
 * @param service    A provided ExecutorService to manage threading infrastructure
 * @param bufferSize A Backlog Size to mitigate slow subscribers
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferProcessor<E> create(ExecutorService service, int bufferSize) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return create(service, bufferSize, waitStrategy, true);
}
/**
 * Creates a new RingBufferProcessor on the given executor service with the passed backlog size
 * and auto-cancel setting, using a {@link LiteBlockingWaitStrategy}.
 * <p>
 * The passed {@link java.util.concurrent.ExecutorService} will execute as many event-loops
 * consuming the ring buffer as there are subscribers.
 *
 * @param service    A provided ExecutorService to manage threading infrastructure
 * @param bufferSize A Backlog Size to mitigate slow subscribers
 * @param autoCancel Should this propagate cancellation when unregistered by all subscribers ?
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferProcessor<E> create(ExecutorService service, int bufferSize, boolean autoCancel) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return create(service, bufferSize, waitStrategy, autoCancel);
}
/**
 * Creates a new shared RingBufferProcessor with all defaults: it is named after the class's
 * simple name, uses a {@link #SMALL_BUFFER_SIZE} backlog and a {@link LiteBlockingWaitStrategy},
 * and auto-cancels.
 * <p>
 * A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded
 * publishers that will fan-in data.
 * <p>
 * A new cached thread executor pool will be implicitly created.
 *
 * @param <E> Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferProcessor<E> share() {
    final String defaultName = RingBufferProcessor.class.getSimpleName();
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return share(defaultName, SMALL_BUFFER_SIZE, waitStrategy, true);
}
/**
 * Creates a new shared RingBufferProcessor named after the class's simple name, with a
 * {@link #SMALL_BUFFER_SIZE} backlog, a {@link LiteBlockingWaitStrategy} and the passed
 * auto-cancel setting.
 * <p>
 * A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded
 * publishers that will fan-in data.
 * <p>
 * A new cached thread executor pool will be implicitly created.
 *
 * @param autoCancel Should this propagate cancellation when unregistered by all subscribers ?
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferProcessor<E> share(boolean autoCancel) {
    final String defaultName = RingBufferProcessor.class.getSimpleName();
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return share(defaultName, SMALL_BUFFER_SIZE, waitStrategy, autoCancel);
}
/**
 * Creates a new shared RingBufferProcessor on the given executor service, with a
 * {@link #SMALL_BUFFER_SIZE} backlog, a {@link LiteBlockingWaitStrategy} and auto-cancel enabled.
 * <p>
 * A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded
 * publishers that will fan-in data.
 * <p>
 * The passed {@link java.util.concurrent.ExecutorService} will execute as many event-loops
 * consuming the ring buffer as there are subscribers.
 *
 * @param service A provided ExecutorService to manage threading infrastructure
 * @param <E>     Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferProcessor<E> share(ExecutorService service) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return share(service, SMALL_BUFFER_SIZE, waitStrategy, true);
}
/**
 * Creates a new shared RingBufferProcessor on the given executor service, with a
 * {@link #SMALL_BUFFER_SIZE} backlog, a {@link LiteBlockingWaitStrategy} and the passed
 * auto-cancel setting.
 * <p>
 * A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded
 * publishers that will fan-in data.
 * <p>
 * The passed {@link java.util.concurrent.ExecutorService} will execute as many event-loops
 * consuming the ring buffer as there are subscribers.
 *
 * @param service    A provided ExecutorService to manage threading infrastructure
 * @param autoCancel Should this propagate cancellation when unregistered by all subscribers ?
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferProcessor<E> share(ExecutorService service, boolean autoCancel) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return share(service, SMALL_BUFFER_SIZE, waitStrategy, autoCancel);
}
/**
 * Creates a new shared RingBufferProcessor with the passed name and backlog size, a
 * {@link LiteBlockingWaitStrategy} and auto-cancel enabled.
 * <p>
 * A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded
 * publishers that will fan-in data.
 * <p>
 * A new cached thread executor pool will be implicitly created and will use the passed name to
 * qualify the created threads.
 *
 * @param name       Use a new Cached ExecutorService and assign this name to the created threads
 * @param bufferSize A Backlog Size to mitigate slow subscribers
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferProcessor<E> share(String name, int bufferSize) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return share(name, bufferSize, waitStrategy, true);
}
/**
 * Creates a new shared RingBufferProcessor with the passed name, backlog size and auto-cancel
 * setting, using a {@link LiteBlockingWaitStrategy}.
 * <p>
 * A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded
 * publishers that will fan-in data.
 * <p>
 * A new cached thread executor pool will be implicitly created and will use the passed name to
 * qualify the created threads.
 *
 * @param name       Use a new Cached ExecutorService and assign this name to the created threads
 * @param bufferSize A Backlog Size to mitigate slow subscribers
 * @param autoCancel Should this propagate cancellation when unregistered by all subscribers ?
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferProcessor<E> share(String name, int bufferSize, boolean autoCancel) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return share(name, bufferSize, waitStrategy, autoCancel);
}
/**
 * Creates a new shared RingBufferProcessor on the given executor service with the passed backlog
 * size, a {@link LiteBlockingWaitStrategy} and auto-cancel enabled.
 * <p>
 * A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded
 * publishers that will fan-in data.
 * <p>
 * The passed {@link java.util.concurrent.ExecutorService} will execute as many event-loops
 * consuming the ring buffer as there are subscribers.
 *
 * @param service    A provided ExecutorService to manage threading infrastructure
 * @param bufferSize A Backlog Size to mitigate slow subscribers
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferProcessor<E> share(ExecutorService service, int bufferSize) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return share(service, bufferSize, waitStrategy, true);
}
/**
 * Creates a new shared RingBufferProcessor on the given executor service with the passed backlog
 * size and auto-cancel setting, using a {@link LiteBlockingWaitStrategy}.
 * <p>
 * A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded
 * publishers that will fan-in data.
 * <p>
 * The passed {@link java.util.concurrent.ExecutorService} will execute as many event-loops
 * consuming the ring buffer as there are subscribers.
 *
 * @param service    A provided ExecutorService to manage threading infrastructure
 * @param bufferSize A Backlog Size to mitigate slow subscribers
 * @param autoCancel Should this propagate cancellation when unregistered by all subscribers ?
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferProcessor<E> share(ExecutorService service, int bufferSize, boolean autoCancel) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return share(service, bufferSize, waitStrategy, autoCancel);
}
/**
 * Creates a new RingBufferWorkProcessor with all defaults: it is named after the class's simple
 * name, uses a {@link #SMALL_BUFFER_SIZE} backlog and a {@link LiteBlockingWaitStrategy}, and
 * auto-cancels.
 * <p>
 * A new cached thread executor pool will be implicitly created.
 *
 * @param <E> Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferWorkProcessor<E> create() {
    final String defaultName = RingBufferWorkProcessor.class.getSimpleName();
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return create(defaultName, SMALL_BUFFER_SIZE, waitStrategy, true);
}
/**
 * Creates a new RingBufferWorkProcessor named after the class's simple name, with a
 * {@link #SMALL_BUFFER_SIZE} backlog, a {@link LiteBlockingWaitStrategy} and the passed
 * auto-cancel setting.
 * <p>
 * A new cached thread executor pool will be implicitly created.
 *
 * @param autoCancel Should this propagate cancellation when unregistered by all subscribers ?
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferWorkProcessor<E> create(boolean autoCancel) {
    final String defaultName = RingBufferWorkProcessor.class.getSimpleName();
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return create(defaultName, SMALL_BUFFER_SIZE, waitStrategy, autoCancel);
}
/**
 * Creates a new RingBufferWorkProcessor on the given executor service, with a
 * {@link #SMALL_BUFFER_SIZE} backlog, a {@link LiteBlockingWaitStrategy} and auto-cancel enabled.
 * <p>
 * The passed {@link java.util.concurrent.ExecutorService} will execute as many event-loops
 * consuming the ring buffer as there are subscribers.
 *
 * @param service A provided ExecutorService to manage threading infrastructure
 * @param <E>     Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferWorkProcessor<E> create(ExecutorService service) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return create(service, SMALL_BUFFER_SIZE, waitStrategy, true);
}
/**
 * Creates a new RingBufferWorkProcessor on the given executor service, with a
 * {@link #SMALL_BUFFER_SIZE} backlog, a {@link LiteBlockingWaitStrategy} and the passed
 * auto-cancel setting.
 * <p>
 * The passed {@link java.util.concurrent.ExecutorService} will execute as many event-loops
 * consuming the ring buffer as there are subscribers.
 *
 * @param service    A provided ExecutorService to manage threading infrastructure
 * @param autoCancel Should this propagate cancellation when unregistered by all subscribers ?
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferWorkProcessor<E> create(ExecutorService service, boolean autoCancel) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return create(service, SMALL_BUFFER_SIZE, waitStrategy, autoCancel);
}
/**
 * Creates a new RingBufferWorkProcessor with the passed name and buffer size, a
 * {@link LiteBlockingWaitStrategy} and auto-cancel enabled.
 * <p>
 * A new cached thread executor pool will be implicitly created and will use the passed name to
 * qualify the created threads.
 *
 * @param name       Use a new Cached ExecutorService and assign this name to the created threads
 * @param bufferSize A Backlog Size to mitigate slow subscribers
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferWorkProcessor<E> create(String name, int bufferSize) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return create(name, bufferSize, waitStrategy, true);
}
/**
 * Creates a new RingBufferWorkProcessor with the passed name, buffer size and auto-cancel
 * setting, using a {@link LiteBlockingWaitStrategy}.
 * <p>
 * A new cached thread executor pool will be implicitly created and will use the passed name to
 * qualify the created threads.
 *
 * @param name       Use a new Cached ExecutorService and assign this name to the created threads
 * @param bufferSize A Backlog Size to mitigate slow subscribers
 * @param autoCancel Should this propagate cancellation when unregistered by all subscribers ?
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferWorkProcessor<E> create(String name, int bufferSize, boolean autoCancel) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return create(name, bufferSize, waitStrategy, autoCancel);
}
/**
 * Creates a new RingBufferWorkProcessor on the given executor service with the passed buffer
 * size, a {@link LiteBlockingWaitStrategy} and auto-cancel enabled.
 * <p>
 * The passed {@link java.util.concurrent.ExecutorService} will execute as many event-loops
 * consuming the ring buffer as there are subscribers.
 *
 * @param service    A provided ExecutorService to manage threading infrastructure
 * @param bufferSize A Backlog Size to mitigate slow subscribers
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferWorkProcessor<E> create(ExecutorService service, int bufferSize) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return create(service, bufferSize, waitStrategy, true);
}
/**
 * Creates a new RingBufferWorkProcessor on the given executor service with the passed buffer
 * size and auto-cancel setting, using a {@link LiteBlockingWaitStrategy}.
 * <p>
 * The passed {@link java.util.concurrent.ExecutorService} will execute as many event-loops
 * consuming the ring buffer as there are subscribers.
 *
 * @param service    A provided ExecutorService to manage threading infrastructure
 * @param bufferSize A Backlog Size to mitigate slow subscribers
 * @param autoCancel Should this propagate cancellation when unregistered by all subscribers ?
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferWorkProcessor<E> create(ExecutorService service, int bufferSize, boolean autoCancel) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return create(service, bufferSize, waitStrategy, autoCancel);
}
/**
 * Creates a new shared RingBufferWorkProcessor with all defaults: it is named after the class's
 * simple name, uses a {@link #SMALL_BUFFER_SIZE} backlog and a {@link LiteBlockingWaitStrategy},
 * and auto-cancels.
 * <p>
 * A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded
 * publishers that will fan-in data.
 * <p>
 * A new cached thread executor pool will be implicitly created.
 *
 * @param <E> Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferWorkProcessor<E> share() {
    final String defaultName = RingBufferWorkProcessor.class.getSimpleName();
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return share(defaultName, SMALL_BUFFER_SIZE, waitStrategy, true);
}
/**
 * Creates a new shared RingBufferWorkProcessor named after the class's simple name, with a
 * {@link #SMALL_BUFFER_SIZE} backlog, a {@link LiteBlockingWaitStrategy} and the passed
 * auto-cancel setting.
 * <p>
 * A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded
 * publishers that will fan-in data.
 * <p>
 * A new cached thread executor pool will be implicitly created.
 *
 * @param autoCancel Should this propagate cancellation when unregistered by all subscribers ?
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferWorkProcessor<E> share(boolean autoCancel) {
    final String defaultName = RingBufferWorkProcessor.class.getSimpleName();
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return share(defaultName, SMALL_BUFFER_SIZE, waitStrategy, autoCancel);
}
/**
 * Creates a new shared RingBufferWorkProcessor on the given executor service, with a
 * {@link #SMALL_BUFFER_SIZE} backlog, a {@link LiteBlockingWaitStrategy} and auto-cancel enabled.
 * <p>
 * A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded
 * publishers that will fan-in data.
 * <p>
 * The passed {@link java.util.concurrent.ExecutorService} will execute as many event-loops
 * consuming the ring buffer as there are subscribers.
 *
 * @param service A provided ExecutorService to manage threading infrastructure
 * @param <E>     Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferWorkProcessor<E> share(ExecutorService service) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return share(service, SMALL_BUFFER_SIZE, waitStrategy, true);
}
/**
 * Creates a new shared RingBufferWorkProcessor on the given executor service, with a
 * {@link #SMALL_BUFFER_SIZE} backlog, a {@link LiteBlockingWaitStrategy} and the passed
 * auto-cancel setting.
 * <p>
 * A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded
 * publishers that will fan-in data.
 * <p>
 * The passed {@link java.util.concurrent.ExecutorService} will execute as many event-loops
 * consuming the ring buffer as there are subscribers.
 *
 * @param service    A provided ExecutorService to manage threading infrastructure
 * @param autoCancel Should this propagate cancellation when unregistered by all subscribers ?
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferWorkProcessor<E> share(ExecutorService service, boolean autoCancel) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return share(service, SMALL_BUFFER_SIZE, waitStrategy, autoCancel);
}
/**
 * Creates a new shared RingBufferWorkProcessor with the passed name and buffer size, a
 * {@link LiteBlockingWaitStrategy} and auto-cancel enabled.
 * <p>
 * A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded
 * publishers that will fan-in data.
 * <p>
 * A new cached thread executor pool will be implicitly created and will use the passed name to
 * qualify the created threads.
 *
 * @param name       Use a new Cached ExecutorService and assign this name to the created threads
 * @param bufferSize A Backlog Size to mitigate slow subscribers
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferWorkProcessor<E> share(String name, int bufferSize) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return share(name, bufferSize, waitStrategy, true);
}
/**
 * Creates a new shared RingBufferWorkProcessor with the passed name, buffer size and auto-cancel
 * setting, using a {@link LiteBlockingWaitStrategy}.
 * <p>
 * A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded
 * publishers that will fan-in data.
 * <p>
 * A new cached thread executor pool will be implicitly created and will use the passed name to
 * qualify the created threads.
 *
 * @param name       Use a new Cached ExecutorService and assign this name to the created threads
 * @param bufferSize A Backlog Size to mitigate slow subscribers
 * @param autoCancel Should this propagate cancellation when unregistered by all subscribers ?
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferWorkProcessor<E> share(String name, int bufferSize, boolean autoCancel) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return share(name, bufferSize, waitStrategy, autoCancel);
}
/**
 * Creates a new shared RingBufferWorkProcessor on the given executor service with the passed
 * buffer size, a {@link LiteBlockingWaitStrategy} and auto-cancel enabled.
 * <p>
 * A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded
 * publishers that will fan-in data.
 * <p>
 * The passed {@link java.util.concurrent.ExecutorService} will execute as many event-loops
 * consuming the ring buffer as there are subscribers.
 *
 * @param service    A provided ExecutorService to manage threading infrastructure
 * @param bufferSize A Backlog Size to mitigate slow subscribers
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferWorkProcessor<E> share(ExecutorService service, int bufferSize) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return share(service, bufferSize, waitStrategy, true);
}
/**
 * Creates a new shared RingBufferWorkProcessor on the given executor service with the passed
 * buffer size and auto-cancel setting, using a {@link LiteBlockingWaitStrategy}.
 * <p>
 * A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded
 * publishers that will fan-in data.
 * <p>
 * The passed {@link java.util.concurrent.ExecutorService} will execute as many event-loops
 * consuming the ring buffer as there are subscribers.
 *
 * @param service    A provided ExecutorService to manage threading infrastructure
 * @param bufferSize A Backlog Size to mitigate slow subscribers
 * @param autoCancel Should this propagate cancellation when unregistered by all subscribers ?
 * @param <E>        Type of processed signals
 * @return a fresh processor
 */
public static <E> RingBufferWorkProcessor<E> share(ExecutorService service, int bufferSize, boolean autoCancel) {
    final LiteBlockingWaitStrategy waitStrategy = new LiteBlockingWaitStrategy();
    return share(service, bufferSize, waitStrategy, autoCancel);
}