private void initDisruptor(int processors, int ringBufferSize) { LOG.info("eds client init disruptor with processors=" + processors + " and ringBufferSize=" + ringBufferSize); executor = Executors.newFixedThreadPool( processors, new ThreadFactoryBuilder().setNameFormat("disruptor-executor-%d").build()); final WaitStrategy waitStrategy = createWaitStrategy(); ringBufferSize = sizeFor(ringBufferSize); // power of 2 disruptor = new Disruptor<>(EdsRingBufferEvent.FACTORY, ringBufferSize, executor, ProducerType.MULTI, waitStrategy); EdsEventWorkHandler[] handlers = new EdsEventWorkHandler[processors]; for (int i = 0; i < handlers.length; i++) { handlers[i] = new EdsEventWorkHandler(); } // handlers number = threads number disruptor.handleEventsWithWorkerPool(handlers); // "handleEventsWith" just like topics , with multiple consumers disruptor.start(); }
@Override protected void configure() { switch (config.getWaitStrategyEnum()) { // A low-cpu usage Disruptor configuration for using in local/test environments case LOW_CPU: bind(WaitStrategy.class).annotatedWith(Names.named("PersistenceStrategy")).to(BlockingWaitStrategy.class); bind(WaitStrategy.class).annotatedWith(Names.named("ReplyStrategy")).to(BlockingWaitStrategy.class); bind(WaitStrategy.class).annotatedWith(Names.named("RetryStrategy")).to(BlockingWaitStrategy.class); break; // The default high-cpu usage Disruptor configuration for getting high throughput on production environments case HIGH_THROUGHPUT: default: bind(WaitStrategy.class).annotatedWith(Names.named("PersistenceStrategy")).to(BusySpinWaitStrategy.class); bind(WaitStrategy.class).annotatedWith(Names.named("ReplyStrategy")).to(BusySpinWaitStrategy.class); bind(WaitStrategy.class).annotatedWith(Names.named("RetryStrategy")).to(YieldingWaitStrategy.class); break; } bind(RequestProcessor.class).to(RequestProcessorImpl.class).in(Singleton.class); bind(PersistenceProcessor.class).to(PersistenceProcessorImpl.class).in(Singleton.class); bind(ReplyProcessor.class).to(ReplyProcessorImpl.class).in(Singleton.class); bind(RetryProcessor.class).to(RetryProcessorImpl.class).in(Singleton.class); }
/**
 * Builds the processor's ring buffer, its "recent" sequence and a sequence barrier.
 *
 * @param name         forwarded to the superclass constructor
 * @param executor     forwarded to the superclass constructor
 * @param bufferSize   number of slots in the ring buffer
 * @param waitStrategy wait strategy given to the ring buffer
 * @param shared       {@code true} selects {@code ProducerType.MULTI}, {@code false}
 *                     selects {@code ProducerType.SINGLE}
 * @param autoCancel   forwarded to the superclass constructor
 */
private RingBufferProcessor(String name,
                            ExecutorService executor,
                            int bufferSize,
                            WaitStrategy waitStrategy,
                            boolean shared,
                            boolean autoCancel) {
    super(name, executor, autoCancel);

    final ProducerType producerType = shared ? ProducerType.MULTI : ProducerType.SINGLE;
    final EventFactory<MutableSignal<E>> signalFactory = new EventFactory<MutableSignal<E>>() {
        @Override
        public MutableSignal<E> newInstance() {
            return new MutableSignal<E>();
        }
    };

    this.ringBuffer = RingBuffer.create(producerType, signalFactory, bufferSize, waitStrategy);
    this.recentSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    this.barrier = ringBuffer.newBarrier();
    // recentSequence is not registered as a gating sequence here; the call is disabled:
    //ringBuffer.addGatingSequences(recentSequence);
}
/**
 * Builds the work processor's ring buffer and registers {@code workSequence} as a gating
 * sequence on it.
 *
 * @param name         forwarded to the superclass constructor
 * @param executor     forwarded to the superclass constructor
 * @param bufferSize   number of slots in the ring buffer
 * @param waitStrategy wait strategy given to the ring buffer
 * @param share        {@code true} selects {@code ProducerType.MULTI}, {@code false}
 *                     selects {@code ProducerType.SINGLE}
 * @param autoCancel   forwarded to the superclass constructor
 */
private RingBufferWorkProcessor(String name,
                                ExecutorService executor,
                                int bufferSize,
                                WaitStrategy waitStrategy,
                                boolean share,
                                boolean autoCancel) {
    super(name, executor, autoCancel);

    final ProducerType producerType = share ? ProducerType.MULTI : ProducerType.SINGLE;
    final EventFactory<MutableSignal<E>> signalFactory = new EventFactory<MutableSignal<E>>() {
        @Override
        public MutableSignal<E> newInstance() {
            return new MutableSignal<E>();
        }
    };

    this.ringBuffer = RingBuffer.create(producerType, signalFactory, bufferSize, waitStrategy);
    this.ringBuffer.addGatingSequences(workSequence);
}
/**
 * Creates the queue's ring buffer, consumer sequence and barrier.
 * <p>
 * For {@code ProducerType.SINGLE} the consumer-started flag is set immediately. For any
 * other producer type a {@code FLUSH_CACHE} entry is published directly as the first
 * message, which requires a buffer of at least two slots.
 *
 * @param queueName    name appended to {@code PREFIX} to form the queue name
 * @param producerType producer type of the underlying ring buffer
 * @param bufferSize   number of slots in the ring buffer
 * @param wait         wait strategy of the underlying ring buffer
 * @throws RuntimeException if the producer type is not SINGLE and {@code bufferSize < 2},
 *                          or if publishing the initial entry reports insufficient capacity
 */
public DisruptorQueueImpl(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
    this._queueName = PREFIX + queueName;
    this._buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
    this._consumer = new Sequence();
    this._barrier = _buffer.newBarrier();
    this._buffer.addGatingSequences(_consumer);

    if (producerType == ProducerType.SINGLE) {
        consumerStartedFlag = true;
        return;
    }

    // make sure we flush the pending messages in cache first
    if (bufferSize < 2) {
        throw new RuntimeException("QueueSize must >= 2");
    }
    try {
        publishDirect(FLUSH_CACHE, true);
    } catch (InsufficientCapacityException unreachable) {
        throw new RuntimeException("This code should be unreachable!", unreachable);
    }
}
/**
 * Create a new Ring Buffer with the specified producer type (SINGLE or MULTI).
 *
 * @param <E>          type of the events held by the ring buffer.
 * @param producerType producer type to use {@link ProducerType}.
 * @param factory      used to create events within the ring buffer.
 * @param bufferSize   number of elements to create within the ring buffer.
 * @param waitStrategy used to determine how to wait for new elements to become available.
 * @return the ring buffer produced by the single- or multi-producer factory method.
 * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
 * @throws IllegalStateException    if {@code producerType} is neither SINGLE nor MULTI
 */
public static <E> RingBuffer<E> create(ProducerType producerType,
                                       EventFactory<E> factory,
                                       int bufferSize,
                                       WaitStrategy waitStrategy) {
    final RingBuffer<E> ringBuffer;
    switch (producerType) {
        case SINGLE:
            ringBuffer = createSingleProducer(factory, bufferSize, waitStrategy);
            break;
        case MULTI:
            ringBuffer = createMultiProducer(factory, bufferSize, waitStrategy);
            break;
        default:
            throw new IllegalStateException(producerType.toString());
    }
    return ringBuffer;
}
protected Disruptor<BaseEvent> createDisruptor(Class<?> eventClass, List<RegistedEventHandler> registedEventHandlers) { WaitStrategy waitStrategy = new BlockingWaitStrategy(); // load the customized event bufferSize. int bufferSize = EventUtils.getEventBufferSize(eventClass); Disruptor<BaseEvent> disruptor =new Disruptor<BaseEvent>(new BaseEventFactory(), bufferSize, Executors.newCachedThreadPool(), ProducerType.SINGLE, waitStrategy); List<BaseEventHandler> baseEventHandlers = Lists.newArrayList(); EventType eventType = EventUtils.getEventType(eventClass); if(EventType.ORDER.equals(eventType)) { List<RegistedEventHandler> orderingHandlers = orderingRegistedEventHandlers(registedEventHandlers); baseEventHandlers.add(new BaseEventHandler(EventType.ORDER, orderingHandlers.toArray(new RegistedEventHandler[0]))); } else if(EventType.CONCURRENCY.equals(eventType)) { for(RegistedEventHandler registedEventHandler : registedEventHandlers) { baseEventHandlers.add(new BaseEventHandler(EventType.CONCURRENCY, registedEventHandler)); } } else { throw new RuntimeException("The definition of event type is not correct."); } disruptor.handleEventsWith(baseEventHandlers.toArray(new BaseEventHandler[0])); disruptor.start(); return disruptor; }
/**
 * Selects the wait strategy named by the {@code AsyncLogger.WaitStrategy} system property.
 * <p>
 * {@code "Yield"} gives a {@code YieldingWaitStrategy} and {@code "Block"} a
 * {@code BlockingWaitStrategy}; {@code "Sleep"}, an unset property and any other value all
 * give a {@code SleepingWaitStrategy}.
 *
 * @return a new wait strategy instance
 */
private static WaitStrategy createWaitStrategy() {
    final String configured = System.getProperty("AsyncLogger.WaitStrategy");
    LOGGER.debug("property AsyncLogger.WaitStrategy={}", configured);

    if ("Yield".equals(configured)) {
        LOGGER.debug("disruptor event handler uses YieldingWaitStrategy");
        return new YieldingWaitStrategy();
    }
    if ("Block".equals(configured)) {
        LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
        return new BlockingWaitStrategy();
    }

    // "Sleep" and the fallback share the same log line and result.
    LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
    return new SleepingWaitStrategy();
}
private static synchronized void initDisruptor() { if (disruptor != null) { LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor, using existing object. Ref count is {}.", count); return; } LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor. Ref count is {}.", count); final int ringBufferSize = calculateRingBufferSize(); final WaitStrategy waitStrategy = createWaitStrategy(); executor = Executors.newSingleThreadExecutor(threadFactory); disruptor = new Disruptor<Log4jEventWrapper>(FACTORY, ringBufferSize, executor, ProducerType.MULTI, waitStrategy); final EventHandler<Log4jEventWrapper>[] handlers = new Log4jEventWrapperHandler[] {// new Log4jEventWrapperHandler() }; final ExceptionHandler errorHandler = getExceptionHandler(); disruptor.handleExceptionsWith(errorHandler); disruptor.handleEventsWith(handlers); LOGGER.debug( "Starting AsyncLoggerConfig disruptor with ringbuffer size={}, waitStrategy={}, exceptionHandler={}...", disruptor.getRingBuffer().getBufferSize(), waitStrategy.getClass().getSimpleName(), errorHandler); disruptor.start(); }
public DisruptorQueueImpl(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait, boolean isBatch, int batchSize, long flushMs) { _queueName = PREFIX + queueName; _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait); _consumer = new Sequence(); _barrier = _buffer.newBarrier(); _buffer.addGatingSequences(_consumer); _isBatch = isBatch; _cache = new ArrayList<>(); _inputBatchSize = batchSize; if (_isBatch) { _batcher = new ThreadLocalBatch(); _flusher = new DisruptorFlusher(Math.max(flushMs, 1)); _flusher.start(); } else { _batcher = null; } }
private IConnection initNettyServer(int port) { ConcurrentHashMap<Integer, DisruptorQueue> deserializeQueues = new ConcurrentHashMap<Integer, DisruptorQueue>(); //ConcurrentHashMap<Integer, DisruptorQueue> deserializeCtrlQueues = new ConcurrentHashMap<Integer, DisruptorQueue>(); WaitStrategy wait = (WaitStrategy)Utils.newInstance("com.lmax.disruptor.TimeoutBlockingWaitStrategy", 5, TimeUnit.MILLISECONDS); DisruptorQueue recvControlQueue = DisruptorQueue.mkInstance("Dispatch-control", ProducerType.MULTI, 256, wait, false, 0, 0); Set<Integer> taskSet = new HashSet<Integer>(); taskSet.add(1); IConnection server = context.bind(null, port, deserializeQueues, recvControlQueue, true, taskSet); WaitStrategy waitStrategy = new BlockingWaitStrategy(); DisruptorQueue recvQueue = DisruptorQueue.mkInstance("NettyUnitTest", ProducerType.SINGLE, 1024, waitStrategy, false, 0, 0); server.registerQueue(task, recvQueue); return server; }
/**
 * Creates the router's multi-producer disruptor, wires {@code DISPATCH_NOTIFICATIONS}
 * followed by {@code NOTIFY_FUTURE}, and starts it.
 *
 * @param executor   executor used by the disruptor; must not be null
 * @param queueDepth ring buffer size handed to the disruptor
 * @param strategy   wait strategy handed to the disruptor
 */
@SuppressWarnings("unchecked")
private DOMNotificationRouter(final ExecutorService executor,
                              final int queueDepth,
                              final WaitStrategy strategy) {
    this.executor = Preconditions.checkNotNull(executor);

    this.disruptor = new Disruptor<>(
            DOMNotificationRouterEvent.FACTORY, queueDepth, executor, ProducerType.MULTI, strategy);

    // Stage 1 dispatches; stage 2 runs only after stage 1 has handled the event.
    this.disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS);
    this.disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE);

    this.disruptor.start();
}
/**
 * Creates a router backed by a cached thread pool and a lock-based
 * {@code PhasedBackoffWaitStrategy}.
 *
 * @param queueDepth size of the notification queue; must be a positive power of two
 * @param spinTime   spin time passed to {@code PhasedBackoffWaitStrategy.withLock}
 * @param parkTime   park time passed to {@code PhasedBackoffWaitStrategy.withLock}
 * @param unit       time unit of {@code spinTime} and {@code parkTime}
 * @return a new router
 * @throws IllegalArgumentException if {@code queueDepth} is not a positive power of two
 */
public static DOMNotificationRouter create(final int queueDepth, final long spinTime, final long parkTime, final TimeUnit unit) {
    // A power of two has exactly one bit set. The explicit "> 0" rejects zero (no bits set,
    // which the previous lowestOneBit == highestOneBit comparison let through) and
    // Integer.MIN_VALUE (one bit set, but negative) before any executor is created.
    Preconditions.checkArgument(queueDepth > 0 && Integer.bitCount(queueDepth) == 1,
            "Queue depth %s is not power-of-two", queueDepth);

    final ExecutorService executor = Executors.newCachedThreadPool();
    final WaitStrategy strategy = PhasedBackoffWaitStrategy.withLock(spinTime, parkTime, unit);

    return new DOMNotificationRouter(executor, queueDepth, strategy);
}
@Inject ReplyProcessorImpl(@Named("ReplyStrategy") WaitStrategy strategy, MetricsRegistry metrics, Panicker panicker, ObjectPool<Batch> batchPool) { // ------------------------------------------------------------------------------------------------------------ // Disruptor initialization // ------------------------------------------------------------------------------------------------------------ ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("reply-%d"); this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory.build()); this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, strategy); disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); disruptor.handleEventsWith(this); this.replyRing = disruptor.start(); // ------------------------------------------------------------------------------------------------------------ // Attribute initialization // ------------------------------------------------------------------------------------------------------------ this.batchPool = batchPool; this.nextIDToHandle.set(0); this.futureEvents = new PriorityQueue<>(10, new Comparator<ReplyBatchEvent>() { public int compare(ReplyBatchEvent replyBatchEvent1, ReplyBatchEvent replyBatchEvent2) { return Long.compare(replyBatchEvent1.getBatchSequence(), replyBatchEvent2.getBatchSequence()); } }); // Metrics config this.abortMeter = metrics.meter(name("tso", "aborts")); this.commitMeter = metrics.meter(name("tso", "commits")); this.timestampMeter = metrics.meter(name("tso", "timestampAllocation")); this.fenceMeter = metrics.meter(name("tso", "fences")); LOG.info("ReplyProcessor initialized"); }
/**
 * Starts the retry disruptor with this object as its event handler, then initializes the
 * commit-table client, collaborators and metrics.
 *
 * @param strategy    wait strategy injected under the name "RetryStrategy"
 * @param metrics     registry from which the meters are obtained
 * @param commitTable source of the commit-table client
 * @param replyProc   reply processor stored for later use
 * @param panicker    passed to the {@code FatalExceptionHandler} installed on the disruptor
 * @param batchPool   pool stored for later use by this processor
 * @throws InterruptedException declared by the constructor
 * @throws ExecutionException   declared by the constructor
 * @throws IOException          declared by the constructor
 */
@Inject
RetryProcessorImpl(@Named("RetryStrategy") WaitStrategy strategy,
                   MetricsRegistry metrics,
                   CommitTable commitTable,
                   ReplyProcessor replyProc,
                   Panicker panicker,
                   ObjectPool<Batch> batchPool)
        throws InterruptedException, ExecutionException, IOException {

    // ---- Disruptor setup: single "retry-%d" thread, single-producer ring of 1 << 12 slots ----
    this.disruptorExec = Executors.newSingleThreadExecutor(
            new ThreadFactoryBuilder().setNameFormat("retry-%d").build());
    this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, SINGLE, strategy);
    // The exception handler must be installed before handleEventsWith().
    this.disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
    this.disruptor.handleEventsWith(this);
    this.retryRing = this.disruptor.start();

    // ---- Attributes ----
    this.commitTableClient = commitTable.getClient();
    this.replyProc = replyProc;
    this.batchPool = batchPool;

    // ---- Metrics ----
    this.txAlreadyCommittedMeter = metrics.meter(name("tso", "retries", "commits", "tx-already-committed"));
    this.invalidTxMeter = metrics.meter(name("tso", "retries", "aborts", "tx-invalid"));
    this.noCTFoundMeter = metrics.meter(name("tso", "retries", "aborts", "tx-without-commit-timestamp"));

    LOG.info("RetryProcessor initialized");
}
/**
 * Create a new Disruptor.
 * <p>
 * Delegates to another constructor of this class, passing a ring buffer obtained from
 * {@code RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy)} and a
 * {@code BasicExecutor} that wraps {@code threadFactory}.
 *
 * @param eventFactory   the factory to create events in the ring buffer.
 * @param ringBufferSize the size of the ring buffer, must be power of 2.
 * @param threadFactory  a {@link ThreadFactory} to create threads for processors.
 * @param producerType   the claim strategy to use for the ring buffer.
 * @param waitStrategy   the wait strategy to use for the ring buffer.
 */
public Disruptor(
    final EventFactory<T> eventFactory,
    final int ringBufferSize,
    final ThreadFactory threadFactory,
    final ProducerType producerType,
    final WaitStrategy waitStrategy) {
    this(RingBuffer.create(
        producerType, eventFactory, ringBufferSize, waitStrategy),
        new BasicExecutor(threadFactory));
}
/**
 * Sets the wait strategy to use. A {@code null} argument is ignored with a warning and
 * the previously configured strategy is kept.
 *
 * @param waitStrategy the wait strategy to configure; may be {@code null}
 * @return this builder step
 */
@Override
public OptionalSteps withWaitStrategy(WaitStrategy waitStrategy) {
    if (waitStrategy == null) {
        log.warn("Tried to configure the waiting strategy with a null value");
        return this;
    }
    this.waitStrategy = waitStrategy;
    return this;
}
/**
 * Checks that every {@code DisruptorWaitStrategy} constant produces a non-null
 * {@code WaitStrategy} instance.
 */
@Test
public void testCreateWaitStrategyInstance() throws Exception {
    final DisruptorWaitStrategy[] allStrategies = DisruptorWaitStrategy.values();
    for (int i = 0; i < allStrategies.length; i++) {
        final WaitStrategy created = allStrategies[i].createWaitStrategyInstance();
        assertNotNull(created);
        assertTrue(created instanceof WaitStrategy);
    }
}
/**
 * Stores the configuration and allocates an empty dispatcher array of {@code poolsize}
 * slots; the supplier starts in the non-terminated state.
 *
 * @param poolsize     number of dispatcher slots
 * @param name         name stored on the supplier
 * @param bufferSize   buffer size stored on the supplier
 * @param errorHandler error callback stored on the supplier
 * @param producerType producer type stored on the supplier
 * @param waitStrategy wait strategy stored on the supplier
 */
public RoundRobinSupplier(int poolsize,
                          String name,
                          int bufferSize,
                          Consumer<Throwable> errorHandler,
                          ProducerType producerType,
                          WaitStrategy waitStrategy) {
    // Identity and sizing
    this.name = name;
    this.poolsize = poolsize;
    this.bufferSize = bufferSize;

    // Disruptor configuration
    this.producerType = producerType;
    this.waitStrategy = waitStrategy;
    this.errorHandler = errorHandler;

    // Runtime state
    this.dispatchers = new Dispatcher[poolsize];
    this.terminated = false;
}
private AsyncLoopThread startDispatchThread() { Map stormConf = workerData.getStormConf(); int queue_size = Utils.getInt( stormConf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE), 1024); WaitStrategy waitStrategy = (WaitStrategy) Utils .newInstance((String) stormConf .get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY)); DisruptorQueue recvQueue = DisruptorQueue.mkInstance("Dispatch", ProducerType.MULTI, queue_size, waitStrategy); // stop consumerStarted //recvQueue.consumerStarted(); IContext context = workerData.getContext(); String topologyId = workerData.getTopologyId(); IConnection recvConnection = context.bind(topologyId, workerData.getPort()); recvConnection.registerQueue(recvQueue); RunnableCallback recvDispather = new VirtualPortDispatch(workerData, recvConnection, recvQueue); AsyncLoopThread vthread = new AsyncLoopThread(recvDispather, false, Thread.MAX_PRIORITY, false); return vthread; }
/**
 * Creates a single-producer "TaskDeserialize" queue and stores it in
 * {@code deserializeQueues} under this task's id.
 * <p>
 * The size comes from {@code TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE} (256 is passed as the
 * second argument to {@code parseInt}); the wait strategy class name comes from
 * {@code TOPOLOGY_DISRUPTOR_WAIT_STRATEGY}.
 *
 * @return the newly created queue
 */
public DisruptorQueue registerDisruptorQueue() {
    final Object configuredSize = stormConf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE);
    final int capacity = JStormUtils.parseInt(configuredSize, 256);

    final String waitStrategyClass = (String) stormConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY);
    final WaitStrategy waitStrategy = (WaitStrategy) Utils.newInstance(waitStrategyClass);

    final DisruptorQueue created =
            DisruptorQueue.mkInstance("TaskDeserialize", ProducerType.SINGLE, capacity, waitStrategy);
    deserializeQueues.put(taskid, created);
    return created;
}
public TaskTransfer(String taskName, KryoTupleSerializer serializer, TaskStatus taskStatus, WorkerData workerData) { this.taskName = taskName; this.serializer = serializer; this.taskStatus = taskStatus; this.storm_conf = workerData.getConf(); this.transferQueue = workerData.getTransferQueue(); this.innerTaskTransfer = workerData.getInnerTaskTransfer(); int queue_size = Utils.getInt(storm_conf .get(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE)); WaitStrategy waitStrategy = (WaitStrategy) Utils .newInstance((String) storm_conf .get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY)); this.serializeQueue = DisruptorQueue.mkInstance(taskName, ProducerType.MULTI, queue_size, waitStrategy); this.serializeQueue.consumerStarted(); String taskId = taskName.substring(taskName.indexOf(":") + 1); Metrics.registerQueue(taskName, MetricDef.SERIALIZE_QUEUE, serializeQueue, taskId, Metrics.MetricType.TASK); timer = Metrics.registerTimer(taskName, MetricDef.SERIALIZE_TIME, taskId, Metrics.MetricType.TASK); serializeThread = new AsyncLoopThread(new TransferRunnable()); LOG.info("Successfully start TaskTransfer thread"); }
/**
 * Sends one short message from a client to a locally bound server and checks that the
 * server receives the same text.
 * <p>
 * Both connections are closed in a {@code finally} block so that a failed bind, send,
 * receive or assertion does not leave the port bound for the tests that follow.
 */
@Test
public void test_small_message() {
    System.out.println("!!!!!!!!Start test_small_message !!!!!!!!!!!");
    String req_msg = "Aloha is the most Hawaiian word.";

    IConnection server = null;
    IConnection client = null;
    try {
        server = context.bind(null, port);

        WaitStrategy waitStrategy = (WaitStrategy) Utils
                .newInstance((String) storm_conf
                        .get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY));
        DisruptorQueue recvQueue = DisruptorQueue.mkInstance(
                "NettyUnitTest", ProducerType.SINGLE, 1024, waitStrategy);
        server.registerQueue(recvQueue);

        client = context.connect(null, "localhost", port);

        TaskMessage message = new TaskMessage(task, req_msg.getBytes());
        client.send(message);

        TaskMessage recv = server.recv(0);
        Assert.assertEquals(req_msg, new String(recv.message()));

        System.out.println("!!!!!!!!!!!!!!!!!!Test one time!!!!!!!!!!!!!!!!!");
    } finally {
        // Same close order as before (server, then client); the nested finally makes sure
        // the client is closed even if closing the server throws.
        try {
            if (server != null) {
                server.close();
            }
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }

    System.out.println("!!!!!!!!!!!!End test_small_message!!!!!!!!!!!!!");
}
/**
 * Sends one large message (built by {@code setupLargMsg()}) from a client to a locally
 * bound server and checks that the server receives the same text.
 * <p>
 * Both connections are closed in a {@code finally} block so that a failed bind, send,
 * receive or assertion does not leave the port bound for the tests that follow.
 */
@Test
public void test_large_msg() {
    System.out.println("!!!!!!!!!!start larget message test!!!!!!!!");
    String req_msg = setupLargMsg();
    System.out.println("!!!!Finish batch data, size:" + req_msg.length()
            + "!!!!");

    IConnection server = null;
    IConnection client = null;
    try {
        server = context.bind(null, port);

        WaitStrategy waitStrategy = (WaitStrategy) Utils
                .newInstance((String) storm_conf
                        .get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY));
        DisruptorQueue recvQueue = DisruptorQueue.mkInstance(
                "NettyUnitTest", ProducerType.SINGLE, 1024, waitStrategy);
        server.registerQueue(recvQueue);

        client = context.connect(null, "localhost", port);

        TaskMessage message = new TaskMessage(task, req_msg.getBytes());
        LOG.info("Client send data");
        client.send(message);

        TaskMessage recv = server.recv(0);
        Assert.assertEquals(req_msg, new String(recv.message()));
    } finally {
        // Same close order as before (client, then server); the nested finally makes sure
        // the server is closed even if closing the client throws.
        try {
            if (client != null) {
                client.close();
            }
        } finally {
            if (server != null) {
                server.close();
            }
        }
    }

    System.out.println("!!!!!!!!!!End larget message test!!!!!!!!");
}
/**
 * Sends one large message, waits one second before the server reads it, and checks that
 * the server still receives the same text.
 * <p>
 * Both connections are closed in a {@code finally} block so that a failed bind, send,
 * receive or assertion does not leave the port bound for the tests that follow.
 *
 * @throws InterruptedException if the one-second sleep is interrupted
 */
@Test
public void test_server_delay() throws InterruptedException {
    System.out.println("!!!!!!!!!!Start delay message test!!!!!!!!");
    String req_msg = setupLargMsg();

    IConnection server = null;
    IConnection client = null;
    try {
        server = context.bind(null, port);

        WaitStrategy waitStrategy = (WaitStrategy) Utils
                .newInstance((String) storm_conf
                        .get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY));
        DisruptorQueue recvQueue = DisruptorQueue.mkInstance(
                "NettyUnitTest", ProducerType.SINGLE, 1024, waitStrategy);
        server.registerQueue(recvQueue);

        client = context.connect(null, "localhost", port);

        TaskMessage message = new TaskMessage(task, req_msg.getBytes());
        LOG.info("Client send data");
        client.send(message);

        // Delay the read so the message sits on the server side before recv().
        Thread.sleep(1000);

        TaskMessage recv = server.recv(0);
        Assert.assertEquals(req_msg, new String(recv.message()));
    } finally {
        // Same close order as before (server, then client); the nested finally makes sure
        // the client is closed even if closing the server throws.
        try {
            if (server != null) {
                server.close();
            }
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }

    System.out.println("!!!!!!!!!!End delay message test!!!!!!!!");
}
/**
 * Construct a Sequencer with the selected wait strategy and buffer size.
 * <p>
 * Besides delegating to the superclass, this allocates the availability array (one
 * {@code int} per slot), derives the index mask ({@code bufferSize - 1}) and index shift
 * ({@code log2(bufferSize)}), and initialises the availability array.
 *
 * @param bufferSize   the size of the buffer that this will sequence over.
 * @param waitStrategy for those waiting on sequences.
 */
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) {
    super(bufferSize, waitStrategy);

    this.indexMask = bufferSize - 1;
    this.indexShift = Util.log2(bufferSize);
    this.availableBuffer = new int[bufferSize];

    initialiseAvailableBuffer();
}
/**
 * Creates a queue implementation chosen by {@code CAPACITY_LIMITED}: a
 * {@code DisruptorQueueImpl} when it is true, otherwise a
 * {@code DisruptorWrapBlockingQueue}.
 *
 * @param queueName    name of the queue
 * @param producerType producer type passed to the implementation
 * @param bufferSize   buffer size passed to the implementation
 * @param wait         wait strategy passed to the implementation
 * @return the new queue
 */
public static DisruptorQueue mkInstance(String queueName,
                                        ProducerType producerType,
                                        int bufferSize,
                                        WaitStrategy wait) {
    if (!CAPACITY_LIMITED) {
        return new DisruptorWrapBlockingQueue(queueName, producerType, bufferSize, wait);
    }
    return new DisruptorQueueImpl(queueName, producerType, bufferSize, wait);
}