@Override protected void configure() { switch (config.getWaitStrategyEnum()) { // A low-cpu usage Disruptor configuration for using in local/test environments case LOW_CPU: bind(WaitStrategy.class).annotatedWith(Names.named("PersistenceStrategy")).to(BlockingWaitStrategy.class); bind(WaitStrategy.class).annotatedWith(Names.named("ReplyStrategy")).to(BlockingWaitStrategy.class); bind(WaitStrategy.class).annotatedWith(Names.named("RetryStrategy")).to(BlockingWaitStrategy.class); break; // The default high-cpu usage Disruptor configuration for getting high throughput on production environments case HIGH_THROUGHPUT: default: bind(WaitStrategy.class).annotatedWith(Names.named("PersistenceStrategy")).to(BusySpinWaitStrategy.class); bind(WaitStrategy.class).annotatedWith(Names.named("ReplyStrategy")).to(BusySpinWaitStrategy.class); bind(WaitStrategy.class).annotatedWith(Names.named("RetryStrategy")).to(YieldingWaitStrategy.class); break; } bind(RequestProcessor.class).to(RequestProcessorImpl.class).in(Singleton.class); bind(PersistenceProcessor.class).to(PersistenceProcessorImpl.class).in(Singleton.class); bind(ReplyProcessor.class).to(ReplyProcessorImpl.class).in(Singleton.class); bind(RetryProcessor.class).to(RetryProcessorImpl.class).in(Singleton.class); }
private void testPersistenceWithHALeaseManagerPreservingLease(TSOServerConfig tsoConfig) throws Exception { // Init a HA lease manager LeaseManager simulatedHALeaseManager = mock(LeaseManager.class); ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool()); PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool); // Component under test PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool, panicker, handlers, metrics); // Test: Configure the lease manager to return true always doReturn(true).when(simulatedHALeaseManager).stillInLeasePeriod(); proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); proc.triggerCurrentBatchFlush(); verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod(); verify(batchPool, times(2)).borrowObject(); }
private void testPersistenceWithHALeaseManagerFailingToPreserveLease1(TSOServerConfig tsoConfig) throws Exception { // Init a HA lease manager LeaseManager simulatedHALeaseManager = mock(LeaseManager.class); ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool()); PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool); // Component under test PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool, panicker, handlers, metrics); // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod(); proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); proc.triggerCurrentBatchFlush(); verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod(); verify(batchPool, times(2)).borrowObject(); }
private void testPersistenceWithHALeaseManagerFailingToPreserveLease2(TSOServerConfig tsoConfig) throws Exception { // Init a HA lease manager LeaseManager simulatedHALeaseManager = mock(LeaseManager.class); ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool()); PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool); // Component under test PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool, panicker, handlers, metrics); // Test: Configure the lease manager to return false for stillInLeasePeriod doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod(); proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); proc.triggerCurrentBatchFlush(); verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod(); verify(batchPool, times(2)).borrowObject(); }
private void testPersistenceWithHALeaseManagerFailingToPreserveLease3(TSOServerConfig tsoConfig) throws Exception { // Init a HA lease manager LeaseManager simulatedHALeaseManager = mock(LeaseManager.class); ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool()); PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool); // Component under test PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool, panicker, handlers, metrics); // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod and raise // an exception when flush // Configure mock writer to flush unsuccessfully doThrow(new IOException("Unable to write")).when(mockWriter).flush(); doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod(); proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); proc.triggerCurrentBatchFlush(); verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod(); verify(batchPool, times(2)).borrowObject(); }
private PersistenceProcessorHandler[] configureHandlers(TSOServerConfig tsoConfig, LeaseManager leaseManager, ObjectPool<Batch> batchPool) throws Exception { PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()]; for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) { handlers[i] = new PersistenceProcessorHandler(metrics, "localhost:1234", leaseManager, commitTable, new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool), retryProcessor, new RuntimeExceptionPanicker()); } return handlers; }
protected Disruptor<BaseEvent> createDisruptor(Class<?> eventClass, List<RegistedEventHandler> registedEventHandlers) { WaitStrategy waitStrategy = new BlockingWaitStrategy(); // load the customized event bufferSize. int bufferSize = EventUtils.getEventBufferSize(eventClass); Disruptor<BaseEvent> disruptor =new Disruptor<BaseEvent>(new BaseEventFactory(), bufferSize, Executors.newCachedThreadPool(), ProducerType.SINGLE, waitStrategy); List<BaseEventHandler> baseEventHandlers = Lists.newArrayList(); EventType eventType = EventUtils.getEventType(eventClass); if(EventType.ORDER.equals(eventType)) { List<RegistedEventHandler> orderingHandlers = orderingRegistedEventHandlers(registedEventHandlers); baseEventHandlers.add(new BaseEventHandler(EventType.ORDER, orderingHandlers.toArray(new RegistedEventHandler[0]))); } else if(EventType.CONCURRENCY.equals(eventType)) { for(RegistedEventHandler registedEventHandler : registedEventHandlers) { baseEventHandlers.add(new BaseEventHandler(EventType.CONCURRENCY, registedEventHandler)); } } else { throw new RuntimeException("The definition of event type is not correct."); } disruptor.handleEventsWith(baseEventHandlers.toArray(new BaseEventHandler[0])); disruptor.start(); return disruptor; }
private static WaitStrategy createWaitStrategy() { final String strategy = System.getProperty("AsyncLogger.WaitStrategy"); LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy); if ("Sleep".equals(strategy)) { LOGGER.debug("disruptor event handler uses SleepingWaitStrategy"); return new SleepingWaitStrategy(); } else if ("Yield".equals(strategy)) { LOGGER.debug("disruptor event handler uses YieldingWaitStrategy"); return new YieldingWaitStrategy(); } else if ("Block".equals(strategy)) { LOGGER.debug("disruptor event handler uses BlockingWaitStrategy"); return new BlockingWaitStrategy(); } LOGGER.debug("disruptor event handler uses SleepingWaitStrategy"); return new SleepingWaitStrategy(); }
private IConnection initNettyServer(int port) { ConcurrentHashMap<Integer, DisruptorQueue> deserializeQueues = new ConcurrentHashMap<Integer, DisruptorQueue>(); //ConcurrentHashMap<Integer, DisruptorQueue> deserializeCtrlQueues = new ConcurrentHashMap<Integer, DisruptorQueue>(); WaitStrategy wait = (WaitStrategy)Utils.newInstance("com.lmax.disruptor.TimeoutBlockingWaitStrategy", 5, TimeUnit.MILLISECONDS); DisruptorQueue recvControlQueue = DisruptorQueue.mkInstance("Dispatch-control", ProducerType.MULTI, 256, wait, false, 0, 0); Set<Integer> taskSet = new HashSet<Integer>(); taskSet.add(1); IConnection server = context.bind(null, port, deserializeQueues, recvControlQueue, true, taskSet); WaitStrategy waitStrategy = new BlockingWaitStrategy(); DisruptorQueue recvQueue = DisruptorQueue.mkInstance("NettyUnitTest", ProducerType.SINGLE, 1024, waitStrategy, false, 0, 0); server.registerQueue(task, recvQueue); return server; }
@Override protected Disruptor<ConcurrentEvent> initialValue() { Disruptor<ConcurrentEvent> disruptor = new Disruptor<>( ConcurrentEventFactory.DEFAULT, DEFAULT_RING_BUFFER_SIZE, CACHED_THREAD_POOL, ProducerType.SINGLE, new BlockingWaitStrategy()); disruptor.handleEventsWith(new ConcurrentHandler()); // disruptor.handleExceptionsWith(); disruptor.start(); return disruptor; }
@Test(timeOut = 30_000) public void testLowWatermarkIsPersisted() throws Exception { TSOServerConfig tsoConfig = new TSOServerConfig(); PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()]; for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) { handlers[i] = new PersistenceProcessorHandler(metrics, "localhost:1234", mock(LeaseManager.class), commitTable, mock(ReplyProcessor.class), retryProcessor, panicker); } // Component under test PersistenceProcessorImpl persistenceProcessor = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, mock(ObjectPool.class), panicker, handlers, metrics); persistenceProcessor.persistLowWatermark(ANY_LWM).get(); ArgumentCaptor<Long> lwmCapture = ArgumentCaptor.forClass(Long.class); CommitTable.Writer lwmWriter = commitTable.getWriter(); verify(lwmWriter, timeout(100).times(1)).updateLowWatermark(lwmCapture.capture()); assertEquals(lwmCapture.getValue().longValue(), ANY_LWM); }
@Test(timeOut = 30_000) public void testRuntimeExceptionOnCommitPersistenceTakesDownDaemon() throws Exception { TSOServerConfig config = new TSOServerConfig(); ObjectPool<Batch> batchPool = new BatchPoolModule(config).getBatchPool(); ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool); PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()]; for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) { handlers[i] = new PersistenceProcessorHandler(metrics, "localhost:1234", mock(LeaseManager.class), commitTable, replyProcessor, retryProcessor, panicker); } PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, new BlockingWaitStrategy(), commitTable, batchPool, panicker, handlers, metrics); // Configure writer to explode with a runtime exception doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong()); MonitoringContext monCtx = new MonitoringContext(metrics); // Check the panic is extended! proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx); proc.triggerCurrentBatchFlush(); verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class)); }
@Test(timeOut = 10_000) public void testBadFormedPackageThrowsException() throws Exception { // We need an instance throwing exceptions for this test replyProcessor = spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, new RuntimeExceptionPanicker(), batchPool)); // Prepare test batch Batch batch = batchPool.borrowObject(); batch.addCommitRetry(FIRST_ST, null, monCtx); ReplyBatchEvent e = ReplyBatchEvent.EVENT_FACTORY.newInstance(); ReplyBatchEvent.makeReplyBatch(e, batch, 0); assertEquals(replyProcessor.nextIDToHandle.get(), 0); assertEquals(replyProcessor.futureEvents.size(), 0); assertEquals(batchPool.getNumActive(), 1); assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1); try { replyProcessor.onEvent(e, ANY_DISRUPTOR_SEQUENCE, false); fail(); } catch (RuntimeException re) { // Expected } assertEquals(replyProcessor.nextIDToHandle.get(), 0); assertEquals(replyProcessor.futureEvents.size(), 0); assertEquals(batchPool.getNumActive(), 1); assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1); }
/** * Initialize mission board context. */ public void init() { this.board = new ConcurrentHashMap<>(); // HikariCP HikariConfig hikariConfig = new HikariConfig(config.getHikariCPConfigPath()); this.dataSource = new HikariDataSource(hikariConfig); // Disruptor int rbSize = (config.getRingBufferSize() % 2 == 0 && config.getRingBufferSize() > 0) ? config.getRingBufferSize() : DEF_BUFFER_SIZE; this.disruptor = new Disruptor<>(DEFAULT_FACTORY, rbSize, DEFAULT_POOL, MULTI, new BlockingWaitStrategy()); this.disruptor.handleEventsWith(new LogRecordTaskHandler()); // this.disruptor.handleEventsWithWorkerPool(); // TODO: 用于后期EventHandler和WorkPool的WorkHandler对比测试 // this.disruptor.handleExceptionsWith(exceptionHandler); this.disruptor.start(); // Struct parser this.parser = new Dom4JParser(config.getLogPath()); this.tasks = new SimpleTaskService(); // schedule publish task this.future = this.tasks.scheduleAtFixedRate(() -> { try { publishAll(); } catch (Exception e) { LOG.error("Error publishAll().", e); } }, 1000L, config.getTaskInterval(), TimeUnit.MILLISECONDS); // add shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { stop(); })); }
@SuppressWarnings("unchecked") public WorkQueueDispatcher(String name, int poolSize, int backlog, final Consumer<Throwable> uncaughtExceptionHandler) { this(name, poolSize, backlog, uncaughtExceptionHandler, ProducerType.MULTI, new BlockingWaitStrategy()); }
@Override public void afterPropertiesSet() throws Exception { ReportableEventFactory factory = new ReportableEventFactory(); int bufferSize = environment.getProperty("reporters.system.buffersize", int.class, 4096); disruptor = new Disruptor<>(factory, bufferSize, new ThreadFactory() { private int counter = 0; private static final String THREAD_PREFIX = "reporter-disruptor"; @Override public Thread newThread(Runnable r) { return new Thread(r, THREAD_PREFIX + '-' + counter++); } }, ProducerType.MULTI, new BlockingWaitStrategy()); }
@Test public void test_All_WaitStrategies() { assertTrue(WaitStrategyType.BLOCKING.instance() instanceof BlockingWaitStrategy); assertTrue(WaitStrategyType.BUSY_SPIN.instance() instanceof BusySpinWaitStrategy); assertTrue(WaitStrategyType.LITE_BLOCKING.instance() instanceof LiteBlockingWaitStrategy); assertTrue(WaitStrategyType.SLEEPING_WAIT.instance() instanceof SleepingWaitStrategy); assertTrue(WaitStrategyType.YIELDING.instance() instanceof YieldingWaitStrategy); }
/** * Create an edit log at the given <code>dir</code> location. You should never have to load an * existing log. If there is a log at startup, it should have already been processed and deleted * by the time the WAL object is started up. * @param fs filesystem handle * @param rootDir path to where logs and oldlogs * @param logDir dir where wals are stored * @param archiveDir dir where wals are archived * @param conf configuration to use * @param listeners Listeners on WAL events. Listeners passed here will be registered before we do * anything else; e.g. the Constructor {@link #rollWriter()}. * @param failIfWALExists If true IOException will be thrown if files related to this wal already * exist. * @param prefix should always be hostname and port in distributed env and it will be URL encoded * before being used. If prefix is null, "wal" will be used * @param suffix will be url encoded. null is treated as empty. non-empty must start with * {@link org.apache.hadoop.hbase.wal.AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER} */ public FSHLog(final FileSystem fs, final Path rootDir, final String logDir, final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix, final String suffix) throws IOException { super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication", FSUtils.getDefaultReplication(fs, this.walDir)); this.lowReplicationRollLimit = conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", 5); this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 2); // rollWriter sets this.hdfs_out if it can. rollWriter(); // This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is // put on the ring buffer. String hostingThreadName = Thread.currentThread().getName(); // Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense // spinning as other strategies do. this.disruptor = new Disruptor<>(RingBufferTruck::new, getPreallocatedEventCount(), Threads.getNamedThreadFactory(hostingThreadName + ".append"), ProducerType.MULTI, new BlockingWaitStrategy()); // Advance the ring buffer sequence so that it starts from 1 instead of 0, // because SyncFuture.NOT_DONE = 0. this.disruptor.getRingBuffer().next(); int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200); this.ringBufferEventHandler = new RingBufferEventHandler( conf.getInt("hbase.regionserver.hlog.syncer.count", 5), maxHandlersCount); this.disruptor.setDefaultExceptionHandler(new RingBufferExceptionHandler()); this.disruptor.handleEventsWith(new RingBufferEventHandler[] { this.ringBufferEventHandler }); // Starting up threads in constructor is a no no; Interface should have an init call. this.disruptor.start(); }
@Override public boolean initialize(EventsChannelConfig config) { super.initialize(config); log.info("Initialize disruptor events channel " + config.getName() + " with " + config); EventFactory<GridEvent> eventFactory = new DisruptorEventFactory(); int ringBufferSize = config.getBlockQueueMaxNumber(); int threadSize = config.getEventConsumerNumber(); int bufferSize = ringBufferSize; if (Integer.bitCount(bufferSize) != 1) { bufferSize = (int) Math.pow(2, (int) (Math.log(ringBufferSize) / Math.log(2))); log.warn("Change disruptor events channel " + config.getName() + " buffer size from " + ringBufferSize + " to " + bufferSize); } if (bufferSize <= 0) throw new GridException("Invalid disruptor ringbuffur size:" + ringBufferSize); threadPool = Executors.newFixedThreadPool(threadSize); ringBuffer = RingBuffer.createMultiProducer(eventFactory, bufferSize, new BlockingWaitStrategy()); SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); ExecutorService executor = Executors.newFixedThreadPool(10); @SuppressWarnings("unchecked") WorkHandler<GridEvent>[] workHandlers = new WorkHandler[threadSize]; for (int i = 0; i < threadSize; i++) { WorkHandler<GridEvent> workHandler = new DisruptorEventsWorkHandler(getName()); workHandlers[i] = workHandler; } workerPool = new WorkerPool<GridEvent>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), workHandlers); workerPool.start(executor); return true; }
public static WaitStrategy createFromType(String name) { if ("BusySpin".equalsIgnoreCase(name)) { return new BusySpinWaitStrategy(); } else if ("Blocking".equalsIgnoreCase(name)) { return new BlockingWaitStrategy(); } else if ("Yielding".equalsIgnoreCase(name)) { return new YieldingWaitStrategy(); } else if ("Sleeping".equalsIgnoreCase(name)) { return new SleepingWaitStrategy(); } else { throw new IllegalArgumentException("Invalid or unsupported wait strategy type '" + name + "'"); } }
RingBufferProvider(@Nonnegative final int capacity, @Nonnull final Factory<T> factory) { this.waitOut = new WakeupWaitStrategy(); this.waitIn = new BlockingWaitStrategy(); final EventFactory<T> evfactory = wrapFactory(factory); this.out = RingBuffer.createMultiProducer(evfactory, capacity, waitOut); this.in = RingBuffer.createSingleProducer(evfactory, capacity, waitIn); this.attachOut = new Object[capacity]; this.attachIn = new Object[capacity]; this.appOut = new RingBufferProducer<>(out, attachOut); this.chnIn = new RingBufferConsumer<>(out, attachOut); this.chnOut = new RingBufferProducer<>(in, attachIn); this.appIn = new RingBufferConsumer<>(in, attachIn); this.internalConsumer = true; }
public DisruptorEventExecutor() { // ringBuffer = RingBuffer.createMultiProducer(WaitingEvent.EVENT_FACTORY, 128, PhasedBackoffWaitStrategy.withLock(1, 1, TimeUnit.MILLISECONDS)); ringBuffer = RingBuffer.createMultiProducer(WaitingEvent.EVENT_FACTORY, 8192, new BlockingWaitStrategy()); BatchEventProcessor<WaitingEvent> processor = new BatchEventProcessor<WaitingEvent>(ringBuffer, ringBuffer.newBarrier(), new DisruptorHandler()); ringBuffer.addGatingSequences(processor.getSequence()); executeThread = new Thread(processor); executeThread.setName("disruptEventExecutor"); executeThread.start(); }
@Setup public void setup() { if (opWorkRatio < 0) throw new IllegalStateException(); String[] opSleepArgs = opSleep.split("/"); opSleepChance = Float.parseFloat(opSleepArgs[0]); opSleepNanos = Long.parseLong(opSleepArgs[1]) * 1000L; opWorkTokens = (int) Math.ceil(opWork * opWorkRatio * (1d / executorChainLength)); String[] taskArgs = tasks.split(":"); int concurrentRequests = (int) (threads * Double.parseDouble(taskArgs[0])); int maxTasksQueued = (int) (threads * Double.parseDouble(taskArgs[1])); final InjectorPlus injector = new InjectorPlus(""); exec = new ExecutorPlus[executorChainLength]; workGate = new Semaphore(concurrentRequests, false); for (int i = 0 ; i < exec.length ; i++) { switch (ExecutorType.valueOf(type)) { case INJECTOR: exec[i] = injector.newExecutor(threads, maxTasksQueued); break; case JDK: exec[i] = new BlockingThreadPoolExecutor(threads, maxTasksQueued); break; case FJP: exec[i] = new BlockingForkJoinPool(threads, maxTasksQueued); break; case DISRUPTOR_SPIN: exec[i] = new DisruptorExecutor(threads, maxTasksQueued, new BusySpinWaitStrategy()); break; case DISRUPTOR_BLOCK: exec[i] = new DisruptorExecutor(threads, maxTasksQueued, new BlockingWaitStrategy()); break; } } }
/** * Set up the disruptor service to have a single consumer which aggregates the data. */ @PostConstruct public void start() { threadFactory = Executors.defaultThreadFactory(); disruptor = new Disruptor<>(Registration.FACTORY, ringSize, threadFactory, ProducerType.MULTI, new BlockingWaitStrategy()); disruptor.handleEventsWith(new ContainerEventHandler()); ringBuffer = disruptor.start(); }
private static WaitStrategy createWaitStrategy() { final String strategy = System .getProperty("AsyncLoggerConfig.WaitStrategy"); LOGGER.debug("property AsyncLoggerConfig.WaitStrategy={}", strategy); if ("Sleep".equals(strategy)) { return new SleepingWaitStrategy(); } else if ("Yield".equals(strategy)) { return new YieldingWaitStrategy(); } else if ("Block".equals(strategy)) { return new BlockingWaitStrategy(); } return new SleepingWaitStrategy(); }
private static WaitStrategy createWaitStrategy() { return new BlockingWaitStrategy(); }
@Test(timeOut = 30_000) public void testCommitPersistenceWithSingleCommitTableWriter() throws Exception { final int NUM_CT_WRITERS = 1; final int BATCH_SIZE_PER_CT_WRITER = 2; // Init a non-HA lease manager VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class), mock(TSOStateManager.class))); TSOServerConfig tsoConfig = new TSOServerConfig(); tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER); tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS); ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool()); ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool); PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()]; for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) { handlers[i] = new PersistenceProcessorHandler(metrics, "localhost:1234", leaseManager, commitTable, replyProcessor, retryProcessor, panicker); } // Component under test PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool, panicker, handlers, metrics); verify(batchPool, times(1)).borrowObject(); // Called during initialization proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Flush: batch full proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Flush: batch full verify(batchPool, times(1 + BATCH_SIZE_PER_CT_WRITER)).borrowObject(); // 3: 1 in init + 2 when flushing }
@Test(timeOut = 30_000) public void testCommitPersistenceWithMultipleCommitTableWriters() throws Exception { final int NUM_CT_WRITERS = 2; final int BATCH_SIZE_PER_CT_WRITER = 2; // Init a non-HA lease manager VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class), mock(TSOStateManager.class))); TSOServerConfig tsoConfig = new TSOServerConfig(); tsoConfig.setBatchSizePerCTWriter(BATCH_SIZE_PER_CT_WRITER); tsoConfig.setNumConcurrentCTWriters(NUM_CT_WRITERS); ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool()); ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool); PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()]; for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) { handlers[i] = new PersistenceProcessorHandler(metrics, "localhost:1234", leaseManager, commitTable, replyProcessor, retryProcessor, panicker); } // Component under test PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool, panicker, handlers, metrics); verify(batchPool, times(1)).borrowObject(); // Called during initialization // Fill 1st handler Batches completely proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 1st batch full verify(batchPool, times(2)).borrowObject(); proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full verify(batchPool, times(3)).borrowObject(); // Test empty flush does not trigger response in getting a new currentBatch proc.triggerCurrentBatchFlush(); verify(batchPool, times(3)).borrowObject(); // Fill 2nd handler Batches completely proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 1st batch full proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full verify(batchPool, times(1 + (NUM_CT_WRITERS * BATCH_SIZE_PER_CT_WRITER))).borrowObject(); // Start filling a new currentBatch and flush it immediately proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Batch not full verify(batchPool, times(5)).borrowObject(); proc.triggerCurrentBatchFlush(); // Flushing should provoke invocation of a new batch verify(batchPool, times(6)).borrowObject(); // Test empty flush does not trigger response proc.triggerCurrentBatchFlush(); proc.triggerCurrentBatchFlush(); proc.triggerCurrentBatchFlush(); proc.triggerCurrentBatchFlush(); proc.triggerCurrentBatchFlush(); verify(batchPool, times(6)).borrowObject(); }
@Test(timeOut = 30_000) public void testCommitPersistenceWithNonHALeaseManager() throws Exception { final int NUM_CT_WRITERS = 1; final int BATCH_SIZE_PER_CT_WRITER = 1; TSOServerConfig tsoConfig = new TSOServerConfig(); tsoConfig.setBatchSizePerCTWriter(NUM_CT_WRITERS); tsoConfig.setNumConcurrentCTWriters(BATCH_SIZE_PER_CT_WRITER); tsoConfig.setBatchPersistTimeoutInMs(100); ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool()); ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool); // Init a non-HA lease manager VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class), mock(TSOStateManager.class))); PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()]; for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) { handlers[i] = new PersistenceProcessorHandler(metrics, "localhost:1234", leaseManager, commitTable, replyProcessor, retryProcessor, panicker); } // Component under test PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool, panicker, handlers, metrics); // The non-ha lease manager always return true for // stillInLeasePeriod(), so verify the currentBatch sends replies as master proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); proc.triggerCurrentBatchFlush(); verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod(); verify(batchPool, times(2)).borrowObject(); }
@BeforeMethod(alwaysRun = true, timeOut = 30_000) public void initMocksAndComponents() throws Exception { MockitoAnnotations.initMocks(this); TSOServerConfig tsoConfig = new TSOServerConfig(); tsoConfig.setNumConcurrentCTWriters(BATCH_POOL_SIZE); // Configure null metrics provider metrics = new NullMetricsProvider(); batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool()); replyProcessor = spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool)); }
@Test(timeOut = 10_000) public void testRuntimeExceptionTakesDownDaemon() throws Exception { Panicker panicker = spy(new MockPanicker()); final CommitTable.Writer mockWriter = mock(CommitTable.Writer.class); doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong()); final CommitTable.Client mockClient = mock(CommitTable.Client.class); CommitTable commitTable = new CommitTable() { @Override public Writer getWriter() { return mockWriter; } @Override public Client getClient() { return mockClient; } }; TSOServerConfig config = new TSOServerConfig(); ObjectPool<Batch> batchPool = new BatchPoolModule(config).getBatchPool(); PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()]; for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) { handlers[i] = new PersistenceProcessorHandler(metrics, "localhost:1234", mock(LeaseManager.class), commitTable, mock(ReplyProcessor.class), mock(RetryProcessor.class), panicker); } PersistenceProcessor proc = new PersistenceProcessorImpl(config, new BlockingWaitStrategy(), commitTable, batchPool, panicker, handlers, metrics); proc.addCommitToBatch(1, 2, null, new MonitoringContext(metrics)); new RequestProcessorImpl(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class)); verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class)); }
private void createDisruptor(final Executor executor) { disruptor = new Disruptor<TestEvent>( TestEvent.EVENT_FACTORY, 4, executor, ProducerType.SINGLE, new BlockingWaitStrategy()); }
public AgileWaitingStrategy() { this(new BlockingWaitStrategy(), new YieldingWaitStrategy()); }
public BufferedFixInitiator(String host, int port, FixVersion fixVersion, SessionID sessionID, int queueSize, FixInitiatorSettings settings) { super(host, port, fixVersion, sessionID, settings); ring = new ByteRing (queueSize, new BlockingWaitStrategy()); //TODO: Use BusySpinWaitStrategy? executor = Executors.newCachedThreadPool(); }
@SuppressWarnings("unchecked") public static void main(String[] args) throws AlertException, InterruptedException, TimeoutException { // Executor that will be used to construct new threads for consumers ExecutorService executor = Executors.newCachedThreadPool(); try { // The factory for the event EventFactory<Message> factory = new MessageFactory(); // Specify the size of the ring buffer, must be power of 2 int bufferSize = 1024; // Construct the Disruptor with a SingleProducerSequencer Disruptor<Message> disruptor = new Disruptor<Message>( factory, bufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy()); try { // Connect the handlers disruptor .handleEventsWith(new MessageConsumer1()) .then(new MessageConsumer2()); // Start the Disruptor, starts all threads running disruptor.start(); // Get the ring buffer from the Disruptor to be used for publishing RingBuffer<Message> ringBuffer = disruptor.getRingBuffer(); MessageProducer producer = new MessageProducer(ringBuffer); for (int i = 0; i < 1000; i++) { producer.onData("content_" + i); } } finally { disruptor.shutdown(); } } finally { executor.shutdown(); } }