/**
 * Builds a processor on top of a newly created ring buffer of {@link MutableSignal} slots.
 *
 * @param name         forwarded to the superclass constructor
 * @param executor     forwarded to the superclass constructor
 * @param bufferSize   slot count handed to {@code RingBuffer.create}
 * @param waitStrategy wait strategy handed to {@code RingBuffer.create}
 * @param shared       {@code true} selects {@code ProducerType.MULTI},
 *                     {@code false} selects {@code ProducerType.SINGLE}
 * @param autoCancel   forwarded to the superclass constructor
 */
private RingBufferProcessor(String name, ExecutorService executor, int bufferSize,
                            WaitStrategy waitStrategy, boolean shared, boolean autoCancel) {
    super(name, executor, autoCancel);

    final ProducerType producerType;
    if (shared) {
        producerType = ProducerType.MULTI;
    } else {
        producerType = ProducerType.SINGLE;
    }

    // Each ring slot is pre-populated with an empty, reusable signal holder.
    final EventFactory<MutableSignal<E>> signalFactory = new EventFactory<MutableSignal<E>>() {
        @Override
        public MutableSignal<E> newInstance() {
            return new MutableSignal<E>();
        }
    };

    this.ringBuffer = RingBuffer.create(producerType, signalFactory, bufferSize, waitStrategy);
    this.recentSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    this.barrier = ringBuffer.newBarrier();
    // NOTE(review): gating on recentSequence was disabled in the original; kept disabled.
    //ringBuffer.addGatingSequences(recentSequence);
}
/**
 * Builds a work processor on top of a newly created ring buffer and registers
 * {@code workSequence} as a gating sequence of that buffer.
 *
 * @param name         forwarded to the superclass constructor
 * @param executor     forwarded to the superclass constructor
 * @param bufferSize   slot count handed to {@code RingBuffer.create}
 * @param waitStrategy wait strategy handed to {@code RingBuffer.create}
 * @param share        {@code true} selects {@code ProducerType.MULTI},
 *                     {@code false} selects {@code ProducerType.SINGLE}
 * @param autoCancel   forwarded to the superclass constructor
 */
private RingBufferWorkProcessor(String name, ExecutorService executor, int bufferSize,
                                WaitStrategy waitStrategy, boolean share, boolean autoCancel) {
    super(name, executor, autoCancel);

    final ProducerType producerType;
    if (share) {
        producerType = ProducerType.MULTI;
    } else {
        producerType = ProducerType.SINGLE;
    }

    // Each ring slot is pre-populated with an empty, reusable signal holder.
    final EventFactory<MutableSignal<E>> signalFactory = new EventFactory<MutableSignal<E>>() {
        @Override
        public MutableSignal<E> newInstance() {
            return new MutableSignal<E>();
        }
    };

    this.ringBuffer = RingBuffer.create(producerType, signalFactory, bufferSize, waitStrategy);
    ringBuffer.addGatingSequences(workSequence);
}
/** * Construct a RingBuffer with the full option set. * * @param eventFactory to newInstance entries for filling the RingBuffer * @param sequencer sequencer to handle the ordering of events moving through the RingBuffer. * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2 */ public RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) { this.sequencer = sequencer; this.bufferSize = sequencer.getBufferSize(); if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.indexMask = bufferSize - 1; this.entries = new Object[sequencer.getBufferSize()]; fill(eventFactory); }
/**
 * Create a new Ring Buffer with the specified producer type (SINGLE or MULTI).
 *
 * @param producerType producer type to use {@link ProducerType}
 * @param factory      used to create events within the ring buffer
 * @param bufferSize   number of elements to create within the ring buffer
 * @param waitStrategy used to determine how to wait for new elements to become available
 * @return the ring buffer built by the factory method matching {@code producerType}
 * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
 * @throws IllegalStateException    if {@code producerType} is neither SINGLE nor MULTI
 */
public static <E> RingBuffer<E> create(ProducerType producerType,
                                       EventFactory<E> factory,
                                       int bufferSize,
                                       WaitStrategy waitStrategy) {
    final RingBuffer<E> created;
    switch (producerType) {
        case SINGLE:
            created = createSingleProducer(factory, bufferSize, waitStrategy);
            break;
        case MULTI:
            created = createMultiProducer(factory, bufferSize, waitStrategy);
            break;
        default:
            throw new IllegalStateException(producerType.toString());
    }
    return created;
}
/**
 * Creates a provider with an internal outbound ring buffer only; the inbound side is
 * served by the externally supplied {@code appIn} consumer.
 *
 * @param capacity size of the outbound ring buffer and of its attachment array
 * @param factory  produces the messages that pre-fill the outbound ring buffer
 * @param appIn    externally supplied inbound consumer; its producer becomes {@code chnOut}
 * @throws NullPointerException if {@code appIn} is null
 */
RingBufferProvider(@Nonnegative final int capacity, @Nonnull final Factory<T> factory,
                   @Nonnull final MessageBufferConsumer<T> appIn) {
    if (appIn == null) {
        throw new NullPointerException("appIn == null");
    }

    // Outbound direction: internally owned ring buffer plus attachment slots.
    this.waitOut = new WakeupWaitStrategy();
    this.waitIn = null;
    final EventFactory<T> outboundFactory = wrapFactory(factory);
    this.out = RingBuffer.createMultiProducer(outboundFactory, capacity, waitOut);
    this.attachOut = new Object[capacity];
    this.appOut = new RingBufferProducer<>(out, attachOut);
    this.chnIn = new RingBufferConsumer<>(out, attachOut);

    // Inbound direction: no internal ring buffer, everything is delegated to appIn.
    this.in = null;
    this.attachIn = null;
    this.chnOut = appIn.createProducer();
    this.appIn = appIn;
    this.internalConsumer = false;
}
/**
 * Test fixture: starts a 1024-slot response disruptor whose single handler records each
 * event's message in {@code firedEvents} and counts down {@code latch}.
 */
@Before
@SuppressWarnings("unchecked")
public void setup() {
    final EventFactory<ResponseEvent> responseEventFactory = new EventFactory<ResponseEvent>() {
        @Override
        public ResponseEvent newInstance() {
            return new ResponseEvent();
        }
    };
    responseBuffer = new Disruptor<ResponseEvent>(responseEventFactory, 1024, Executors.newCachedThreadPool());

    firedEvents = Collections.synchronizedList(new ArrayList<CouchbaseMessage>());
    latch = new CountDownLatch(1);

    final EventHandler<ResponseEvent> recordingHandler = new EventHandler<ResponseEvent>() {
        @Override
        public void onEvent(ResponseEvent event, long sequence, boolean endOfBatch) throws Exception {
            firedEvents.add(event.getMessage());
            latch.countDown();
        }
    };
    responseBuffer.handleEventsWith(recordingHandler);

    responseRingBuffer = responseBuffer.start();
}
/**
 * Wraps the given test handler and prepares (but does not start) a single-producer disruptor.
 *
 * @param bufferSize NOTE(review): never read in this constructor; the ring always has
 *                   1024 * 1024 slots. Confirm whether this parameter should size the ring.
 * @param handler    test handler wrapped in a {@code HelloEventHandler}
 */
public DisruptorPublisher(int bufferSize, TestHandler handler) {
    this.handler = new HelloEventHandler(handler);

    final EventFactory<HelloEvent> helloEventFactory = new HelloEventFactory();
    // Ring buffer size; must be a power of two.
    final int ringBufferSize = 1024 * 1024;

    executor = Executors.newSingleThreadExecutor();
    disruptor = new Disruptor<HelloEvent>(
            helloEventFactory,
            ringBufferSize,
            Executors.defaultThreadFactory(),
            ProducerType.SINGLE,
            YIELDING_WAIT);
}
public static void main(String[] args) { EventFactory<HelloEvent> eventFactory = new HelloEventFactory(); int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必须是 2 的 N 次方; Disruptor<HelloEvent> disruptor = new Disruptor<HelloEvent>( eventFactory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); EventHandler<HelloEvent> eventHandler = new HelloEventHandler(); disruptor.handleEventsWith(eventHandler, eventHandler); disruptor.start(); }
/**
 * Prepares the archive pipeline: a disruptor with one {@code ArchiveEventHandler} and a
 * not-yet-started thread named "archiveProcessor" that runs this object.
 *
 * @param eventFactory   creates the events that pre-fill the ring buffer
 * @param ringBufferSize size passed to the {@code Disruptor} constructor
 * @param eventClass     event class handed to the {@code ArchiveEventHandler}
 * @param batchSize      batch size handed to the {@code ArchiveEventHandler}
 * @param batchHandler   batch callback handed to the {@code ArchiveEventHandler}
 */
@SuppressWarnings("unchecked")
public ArchiveProcessor(final EventFactory<E> eventFactory,
                        int ringBufferSize,
                        final Class<E> eventClass,
                        final int batchSize,
                        final ArchiveBatchHandler<E> batchHandler) {
    running = false;

    eventDisruptor = new Disruptor<>(eventFactory, ringBufferSize, DaemonThreadFactory.INSTANCE);
    eventDisruptor.handleEventsWith(new ArchiveEventHandler<>(eventClass, batchSize, batchHandler));

    archiveThread = new Thread(this, "archiveProcessor");
}
/**
 * Create a new Disruptor whose processors run on threads obtained from a
 * {@link ThreadFactory}.
 * <p>
 * Delegates to the (ring buffer, executor) constructor with a ring buffer from
 * {@code RingBuffer.create} and a {@code BasicExecutor} wrapping {@code threadFactory}.
 *
 * @param eventFactory   the factory to create events in the ring buffer
 * @param ringBufferSize the size of the ring buffer, must be power of 2
 * @param threadFactory  a {@link ThreadFactory} to create threads for processors
 * @param producerType   the claim strategy to use for the ring buffer
 * @param waitStrategy   the wait strategy to use for the ring buffer
 */
public Disruptor(final EventFactory<T> eventFactory,
                 final int ringBufferSize,
                 final ThreadFactory threadFactory,
                 final ProducerType producerType,
                 final WaitStrategy waitStrategy) {
    this(
        RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
        new BasicExecutor(threadFactory)
    );
}
/**
 * Returns an event factory that creates a fresh, empty {@code BatchedPoller.DataEvent}
 * on every call.
 *
 * @param <T> payload type of the data events
 * @return a new factory instance
 */
public static <T> EventFactory<BatchedPoller.DataEvent<T>> factory() {
    final EventFactory<BatchedPoller.DataEvent<T>> dataEventFactory =
            new EventFactory<BatchedPoller.DataEvent<T>>() {
                @Override
                public BatchedPoller.DataEvent<T> newInstance() {
                    return new BatchedPoller.DataEvent<T>();
                }
            };
    return dataEventFactory;
}
/**
 * Returns an event factory that creates a fresh, empty {@code DataEvent} on every call.
 *
 * @param <T> payload type of the data events
 * @return a new factory instance
 */
public static <T> EventFactory<DataEvent<T>> factory() {
    final EventFactory<DataEvent<T>> dataEventFactory = new EventFactory<DataEvent<T>>() {
        @Override
        public DataEvent<T> newInstance() {
            return new DataEvent<T>();
        }
    };
    return dataEventFactory;
}
public static void main(String[] args) throws InterruptedException { long beginTime=System.currentTimeMillis(); int bufferSize=1024; ExecutorService executor=Executors.newFixedThreadPool(4); //������캯�����������������˽�����2��demo֮��Ϳ��¾������ˣ���������~ Disruptor<TradeTransaction> disruptor=new Disruptor<TradeTransaction>(new EventFactory<TradeTransaction>() { @Override public TradeTransaction newInstance() { return new TradeTransaction(); } }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy()); //ʹ��disruptor������������C1,C2 EventHandlerGroup<TradeTransaction> handlerGroup=disruptor.handleEventsWith(new TradeTransactionVasConsumer(),new TradeTransactionInDBHandler()); TradeTransactionJMSNotifyHandler jmsConsumer=new TradeTransactionJMSNotifyHandler(); //������C1,C2����֮��ִ��JMS��Ϣ���Ͳ��� Ҳ���������ߵ�C3 handlerGroup.then(jmsConsumer); disruptor.start();//���� CountDownLatch latch=new CountDownLatch(1); //�������� executor.submit(new TradeTransactionPublisher(latch, disruptor)); latch.await();//�ȴ�����������. disruptor.shutdown(); executor.shutdown(); System.out.println("�ܺ�ʱ:"+(System.currentTimeMillis()-beginTime)); // long tt= System.currentTimeMillis(); // for (int i = 0; i < 1000; i++) { // int j=i; // } // System.out.println("�ܺ�ʱ:"+(System.currentTimeMillis()-tt)); }
/**
 * Populates every slot of {@code entries}, in ascending index order, with a new instance
 * obtained from the factory.
 *
 * @param eventFactory source of the pre-allocated entries
 */
private void fill(EventFactory<E> eventFactory) {
    int slot = 0;
    while (slot < entries.length) {
        entries[slot] = eventFactory.newInstance();
        slot++;
    }
}
/**
 * Creates the writer: builds a 1024-slot disruptor of {@code LogMessageHolder} events,
 * registers this object as its handler, keeps a reference to the ring buffer, resets the
 * line counter and finally starts the disruptor.
 */
private FileWriter() {
    final EventFactory<LogMessageHolder> holderFactory = new EventFactory<LogMessageHolder>() {
        @Override
        public LogMessageHolder newInstance() {
            return new LogMessageHolder();
        }
    };

    disruptor = new Disruptor<LogMessageHolder>(holderFactory, 1024, DaemonThreadFactory.INSTANCE);
    disruptor.handleEventsWith(this);

    buffer = disruptor.getRingBuffer();
    lineNum = 0;

    // Started last, after the fields above have been assigned.
    disruptor.start();
}
/**
 * Test fixture: starts an 8192-slot disruptor backed by a five-thread pool. Its single
 * handler counts down {@code cdh1}, performs a read / yield / write increment on
 * {@code holder}, and bumps {@code counter}.
 */
@Before
public void setup() {
    final EventFactory<Event> eventFactory = new EventFactory<Event>() {
        @Override
        public Event newInstance() {
            return new Event();
        }
    };
    final Disruptor<Event> started =
            new Disruptor<Event>(eventFactory, 1024 * 8, Executors.newFixedThreadPool(5));

    final EventHandler<Event> incrementingHandler = new EventHandler<Event>() {
        @Override
        public void onEvent(Event event, long sequence, boolean endOfBatch) throws Exception {
            cdh1.countDown();
            // Deliberately split read and write with a yield in between, as in the original.
            final int before = holder.getValue();
            Thread.yield();
            final int after = before + 1;
            holder.setValue(after);
            counter.incrementAndGet();
        }
    };
    started.handleEventsWith(incrementingHandler);
    started.start();

    this.disruptor = started;
}
/**
 * Creates the processor and starts its 1024-slot disruptor, with this object registered
 * as the event handler.
 * <p>
 * The collaborators are stored <em>before</em> the disruptor is started. The original
 * started the handler thread first and assigned {@code m_processor} / {@code m_translator}
 * afterwards, so the running handler could in principle observe them unassigned.
 * <p>
 * NOTE(review): the single-thread executor created here is not kept in a field, so this
 * constructor offers no way to shut it down later.
 *
 * @param processor  batch processor stored for use by this instance
 * @param factory    creates the events that pre-fill the ring buffer
 * @param translator translator stored for use by this instance
 */
@SuppressWarnings("unchecked")
public AsyncConcurrentBatchingProcessor(BatchProcessor<T> processor,
                                        EventFactory<T> factory,
                                        EventTranslatorOneArg<T,T> translator) {
    // State the handler depends on must be in place before any handler thread exists.
    m_processor = processor;
    m_translator = translator;

    Executor executor = Executors.newSingleThreadExecutor();
    int bufferSize = 1024;
    m_disruptor = new Disruptor<T>(factory, bufferSize, executor);
    m_disruptor.handleEventsWith(this);

    // start() launches the handler thread and returns the ring buffer used for publishing.
    m_buffer = m_disruptor.start();
}
/**
 * Initializes the disruptor-backed events channel: creates a multi-producer ring buffer
 * and a {@code WorkerPool} with one {@code DisruptorEventsWorkHandler} per configured
 * consumer, then starts the pool.
 * <p>
 * Behaviour notes:
 * <ul>
 *   <li>A configured size that is not a power of two is rounded <em>down</em> to the nearest
 *       power of two with {@link Integer#highestOneBit(int)}, and a warning is logged.</li>
 *   <li>A non-positive configured size is rejected up front. Previously most negative
 *       values were silently turned into a buffer of size 1.</li>
 *   <li>The executor that runs the workers is sized to the number of work handlers.
 *       It was previously fixed at 10 threads, so handlers beyond the tenth never ran.</li>
 *   <li>The worker sequences are registered as gating sequences before the pool starts,
 *       so publishers cannot wrap past slots the workers have not consumed yet.</li>
 * </ul>
 * NOTE(review): the worker executor is a local and cannot be shut down from here; the
 * {@code threadPool} field is created but not used by this method. Confirm the intent.
 *
 * @param config channel configuration (name, queue size, consumer count)
 * @return always {@code true}
 * @throws GridException if the configured ring buffer size is not positive
 */
@Override
public boolean initialize(EventsChannelConfig config) {
    super.initialize(config);
    log.info("Initialize disruptor events channel " + config.getName() + " with " + config);

    EventFactory<GridEvent> eventFactory = new DisruptorEventFactory();
    int ringBufferSize = config.getBlockQueueMaxNumber();
    int threadSize = config.getEventConsumerNumber();

    // Reject invalid sizes before any rounding so they cannot be masked.
    if (ringBufferSize <= 0) {
        throw new GridException("Invalid disruptor ringbuffur size:" + ringBufferSize);
    }

    int bufferSize = ringBufferSize;
    if (Integer.bitCount(bufferSize) != 1) {
        // Largest power of two not exceeding the configured size; exact integer arithmetic.
        bufferSize = Integer.highestOneBit(ringBufferSize);
        log.warn("Change disruptor events channel " + config.getName() + " buffer size from "
                + ringBufferSize + " to " + bufferSize);
    }

    threadPool = Executors.newFixedThreadPool(threadSize);
    ringBuffer = RingBuffer.createMultiProducer(eventFactory, bufferSize, new BlockingWaitStrategy());
    SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

    // Every work processor occupies one thread for its whole lifetime, so the executor
    // needs at least one thread per handler.
    ExecutorService executor = Executors.newFixedThreadPool(threadSize);

    @SuppressWarnings("unchecked")
    WorkHandler<GridEvent>[] workHandlers = new WorkHandler[threadSize];
    for (int i = 0; i < threadSize; i++) {
        workHandlers[i] = new DisruptorEventsWorkHandler(getName());
    }

    workerPool = new WorkerPool<GridEvent>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), workHandlers);
    // Gate publishers on the workers' progress; must happen before the pool is started.
    ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
    workerPool.start(executor);
    return true;
}
/**
 * Creates a provider that owns both directions: a multi-producer outbound ring buffer and
 * a single-producer inbound ring buffer, each with its own attachment array.
 *
 * @param capacity size of both ring buffers and both attachment arrays
 * @param factory  produces the messages that pre-fill both ring buffers
 */
RingBufferProvider(@Nonnegative final int capacity, @Nonnull final Factory<T> factory) {
    this.waitOut = new WakeupWaitStrategy();
    this.waitIn = new BlockingWaitStrategy();
    final EventFactory<T> messageFactory = wrapFactory(factory);

    // Outbound direction: application produces, channel consumes.
    this.out = RingBuffer.createMultiProducer(messageFactory, capacity, waitOut);
    this.attachOut = new Object[capacity];
    this.appOut = new RingBufferProducer<>(out, attachOut);
    this.chnIn = new RingBufferConsumer<>(out, attachOut);

    // Inbound direction: channel produces, application consumes.
    this.in = RingBuffer.createSingleProducer(messageFactory, capacity, waitIn);
    this.attachIn = new Object[capacity];
    this.chnOut = new RingBufferProducer<>(in, attachIn);
    this.appIn = new RingBufferConsumer<>(in, attachIn);

    this.internalConsumer = true;
}
/**
 * Adapts a {@link Factory} to the {@link EventFactory} interface; each call to the
 * adapter's {@code newInstance()} delegates to {@code factory.newInstance()}.
 *
 * @param factory the factory to adapt
 * @return an {@link EventFactory} delegating to {@code factory}
 * @throws NullPointerException if {@code factory} is null
 */
private static <T> EventFactory<T> wrapFactory(@Nonnull final Factory<T> factory) {
    if (factory == null) {
        throw new NullPointerException("factory == null");
    }
    final EventFactory<T> adapter = new EventFactory<T>() {
        @Override
        public T newInstance() {
            return factory.newInstance();
        }
    };
    return adapter;
}
/**
 * Creates a consumer over a new multi-producer ring buffer and a matching attachment array.
 *
 * @param capacity size of the ring buffer and of the attachment array
 * @param factory  produces the messages that pre-fill the ring buffer
 * @return a consumer bound to the new ring buffer and attachment array
 */
public static <T> RingBufferConsumer<T> createConsumer(@Nonnegative final int capacity,
                                                       @Nonnull final Factory<T> factory) {
    final EventFactory<T> messageFactory = wrapFactory(factory);
    final RingBuffer<T> ring = RingBuffer.createMultiProducer(messageFactory, capacity);
    final Object[] attachmentSlots = new Object[capacity];
    return new RingBufferConsumer<>(ring, attachmentSlots);
}
@Before @SuppressWarnings("unchecked") public void setup() { responseBuffer = new Disruptor<ResponseEvent>(new EventFactory<ResponseEvent>() { @Override public ResponseEvent newInstance() { return new ResponseEvent(); } }, 1024, Executors.newCachedThreadPool()); firedEvents = Collections.synchronizedList(new ArrayList<CouchbaseMessage>()); latch = new CountDownLatch(1); responseBuffer.handleEventsWith(new EventHandler<ResponseEvent>() { @Override public void onEvent(ResponseEvent event, long sequence, boolean endOfBatch) throws Exception { firedEvents.add(event.getMessage()); latch.countDown(); } }); responseRingBuffer = responseBuffer.start(); CoreEnvironment environment = mock(CoreEnvironment.class); when(environment.scheduler()).thenReturn(Schedulers.computation()); when(environment.maxRequestLifetime()).thenReturn(10000L); // 10 seconds when(environment.autoreleaseAfter()).thenReturn(2000L); when(environment.retryStrategy()).thenReturn(FailFastRetryStrategy.INSTANCE); endpoint = mock(AbstractEndpoint.class); when(endpoint.environment()).thenReturn(environment); when(environment.userAgent()).thenReturn("Couchbase Client Mock"); queue = new ArrayDeque<ViewRequest>(); handler = new ViewHandler(endpoint, responseRingBuffer, queue, false, false); channel = new EmbeddedChannel(handler); }
/**
 * Shared fixture setup: starts a recording response disruptor, stubs a mocked environment
 * and endpoint, and creates an empty {@code QueryRequest} queue. No handler or channel is
 * created here.
 */
protected void commonSetup() {
    // Response side: every event's message is recorded and the latch is released.
    final EventFactory<ResponseEvent> responseEventFactory = new EventFactory<ResponseEvent>() {
        @Override
        public ResponseEvent newInstance() {
            return new ResponseEvent();
        }
    };
    responseBuffer = new Disruptor<ResponseEvent>(responseEventFactory, 1024, Executors.newCachedThreadPool());

    firedEvents = Collections.synchronizedList(new ArrayList<CouchbaseMessage>());
    latch = new CountDownLatch(1);

    final EventHandler<ResponseEvent> recordingHandler = new EventHandler<ResponseEvent>() {
        @Override
        public void onEvent(ResponseEvent event, long sequence, boolean endOfBatch) throws Exception {
            firedEvents.add(event.getMessage());
            latch.countDown();
        }
    };
    responseBuffer.handleEventsWith(recordingHandler);
    responseRingBuffer = responseBuffer.start();

    // Mocked environment and endpoint.
    final CoreEnvironment environment = mock(CoreEnvironment.class);
    when(environment.scheduler()).thenReturn(Schedulers.computation());
    when(environment.maxRequestLifetime()).thenReturn(10000L);
    when(environment.autoreleaseAfter()).thenReturn(2000L);
    when(environment.retryStrategy()).thenReturn(FailFastRetryStrategy.INSTANCE);

    endpoint = mock(AbstractEndpoint.class);
    when(endpoint.environment()).thenReturn(environment);
    when(environment.userAgent()).thenReturn("Couchbase Client Mock");

    queue = new ArrayDeque<QueryRequest>();
}
@Before @SuppressWarnings("unchecked") public void setup() { responseBuffer = new Disruptor<ResponseEvent>(new EventFactory<ResponseEvent>() { @Override public ResponseEvent newInstance() { return new ResponseEvent(); } }, 1024, Executors.newCachedThreadPool()); firedEvents = Collections.synchronizedList(new ArrayList<CouchbaseMessage>()); latch = new CountDownLatch(1); responseBuffer.handleEventsWith(new EventHandler<ResponseEvent>() { @Override public void onEvent(ResponseEvent event, long sequence, boolean endOfBatch) throws Exception { firedEvents.add(event.getMessage()); latch.countDown(); } }); responseRingBuffer = responseBuffer.start(); CoreEnvironment environment = mock(CoreEnvironment.class); when(environment.scheduler()).thenReturn(Schedulers.computation()); when(environment.maxRequestLifetime()).thenReturn(10000L); // 10 seconds when(environment.autoreleaseAfter()).thenReturn(2000L); when(environment.retryStrategy()).thenReturn(FailFastRetryStrategy.INSTANCE); endpoint = mock(AbstractEndpoint.class); when(endpoint.environment()).thenReturn(environment); when(environment.userAgent()).thenReturn("Couchbase Client Mock"); queue = new ArrayDeque<SearchRequest>(); handler = new SearchHandler(endpoint, responseRingBuffer, queue, false, false); channel = new EmbeddedChannel(handler); }
/**
 * Creates an executor backed by a multi-producer ring buffer and {@code threadCount}
 * {@code WorkProcessor}s that share one work sequence, each running on its own daemon
 * thread.
 * <p>
 * Each processor's sequence and the shared work sequence are registered as gating
 * sequences before any processor is started. The original registered none, so publishers
 * could wrap around and overwrite slots the workers had not consumed yet.
 *
 * @param threadCount  number of work processors and of daemon threads running them
 * @param bufferSize   ring buffer size passed to {@code RingBuffer.createMultiProducer}
 * @param waitStrategy wait strategy passed to {@code RingBuffer.createMultiProducer}
 */
public DisruptorExecutor(int threadCount, int bufferSize, WaitStrategy waitStrategy) {
    ringBuffer = RingBuffer.createMultiProducer(new EventFactory<RContainer>() {
        @Override
        public RContainer newInstance() {
            return new RContainer();
        }
    }, bufferSize, waitStrategy);

    SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    Sequence workSequence = new Sequence(-1);

    workProcessors = new WorkProcessor[threadCount];
    // One slot per processor plus one for the shared work sequence.
    Sequence[] gatingSequences = new Sequence[threadCount + 1];
    for (int i = 0; i < threadCount; i++) {
        workProcessors[i] = new WorkProcessor<RContainer>(ringBuffer, sequenceBarrier,
                handler, new IgnoreExceptionHandler(), workSequence);
        gatingSequences[i] = workProcessors[i].getSequence();
    }
    gatingSequences[threadCount] = workSequence;
    // Must be in place before the processors run so publishers are held back by consumers.
    ringBuffer.addGatingSequences(gatingSequences);

    workExec = Executors.newFixedThreadPool(workProcessors.length, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    });

    for (WorkProcessor p : workProcessors) {
        workExec.execute(p);
    }
}
/**
 * Create a new Disruptor whose event processors run on the supplied {@link Executor}.
 * <p>
 * Delegates to the (ring buffer, executor) constructor with a ring buffer obtained from
 * {@code RingBuffer.create}.
 *
 * @param eventFactory   the factory to create events in the ring buffer
 * @param ringBufferSize the size passed to {@code RingBuffer.create}
 * @param executor       an {@link Executor} to execute event processors
 * @param producerType   the claim strategy to use for the ring buffer
 * @param waitStrategy   the wait strategy to use for the ring buffer
 */
public Disruptor(final EventFactory<T> eventFactory,
                 final int ringBufferSize,
                 final Executor executor,
                 final ProducerType producerType,
                 final WaitStrategy waitStrategy) {
    this(
        RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
        executor
    );
}
/**
 * Returns an event factory whose {@code newInstance()} delegates to
 * {@code relatedItemSearchFactory.createSearchObject()}.
 *
 * @return a new {@link EventFactory} for {@code RelatedItemSearch} events
 */
@Override
public EventFactory<RelatedItemSearch> createRelatedItemSearchEventFactory() {
    final EventFactory<RelatedItemSearch> searchEventFactory = new EventFactory<RelatedItemSearch>() {
        @Override
        public RelatedItemSearch newInstance() {
            return relatedItemSearchFactory.createSearchObject();
        }
    };
    return searchEventFactory;
}
/**
 * Verifies that the message factory hands out non-null {@code RelatedItemReference}
 * instances. The factory is asked twice, once per assertion.
 */
@Test
public void testCanCreateReferenceObject() {
    final EventFactory<RelatedItemReference> factory = new RelatedItemReferenceMessageFactory();

    final Object firstInstance = factory.newInstance();
    assertNotNull(firstInstance);

    final Object secondInstance = factory.newInstance();
    assertTrue(secondInstance instanceof RelatedItemReference);
}
/**
 * Stores the collaborators this factory holds; no other work is done here.
 *
 * @param requestBytesConverter  converter factory stored in {@code requestBytesConverter}
 * @param indexingMessageFactory event factory stored in {@code indexingMessageFactory}
 * @param indexingEventHandler   event handler stored in {@code indexingEventHandler}
 */
public DisruptorIndexRequestProcessorFactory(IndexingRequestConverterFactory requestBytesConverter,
                                             EventFactory<RelatedItemIndexingMessage> indexingMessageFactory,
                                             RelatedItemIndexingMessageEventHandler indexingEventHandler) {
    this.indexingEventHandler = indexingEventHandler;
    this.indexingMessageFactory = indexingMessageFactory;
    this.requestBytesConverter = requestBytesConverter;
}
/**
 * Create a new {@code HashWheelTimer} using the given timer resolution ({@code res}) and
 * {@code wheelSize}. All times will be rounded up to the closest multiple of this resolution.
 * <p>
 * The constructor builds the wheel, creates the tick-loop thread and then calls
 * {@code start()}. The loop exits when the wait strategy throws {@link InterruptedException}.
 *
 * @param name      name for daemon thread factory to be displayed
 * @param res       resolution of this timer in milliseconds
 * @param wheelSize size of the Ring Buffer supporting the Timer, the larger the wheel, the less the lookup time is
 *                  for sparse timeouts. Sane default is 512.
 * @param strategy  strategy for waiting for the next tick
 * @param exec      Executor instance to submit tasks to
 */
public HashWheelTimer(String name, int res, int wheelSize, WaitStrategy strategy, Executor exec) {
    this.waitStrategy = strategy;

    // Every wheel slot holds its own concurrent, sorted set of registrations.
    final EventFactory<Set<TimerPausable>> slotFactory = new EventFactory<Set<TimerPausable>>() {
        @Override
        public Set<TimerPausable> newInstance() {
            return new ConcurrentSkipListSet<TimerPausable>();
        }
    };
    this.wheel = RingBuffer.createSingleProducer(slotFactory, wheelSize);

    this.resolution = res;

    final Runnable tickLoop = new Runnable() {
        @Override
        public void run() {
            long deadline = System.currentTimeMillis();

            for (;;) {
                final Set<TimerPausable> slot = wheel.get(wheel.getCursor());

                for (TimerPausable registration : slot) {
                    if (registration.isCancelled()) {
                        slot.remove(registration);
                        continue;
                    }
                    if (registration.ready()) {
                        executor.execute(registration);
                        slot.remove(registration);
                        if (!registration.isCancelAfterUse()) {
                            reschedule(registration);
                        }
                        continue;
                    }
                    if (registration.isPaused()) {
                        reschedule(registration);
                        continue;
                    }
                    registration.decrement();
                }

                deadline += resolution;

                try {
                    waitStrategy.waitUntil(deadline);
                } catch (InterruptedException e) {
                    // Interruption ends the tick loop.
                    return;
                }

                // Advance the wheel by one slot.
                wheel.publish(wheel.next());
            }
        }
    };
    this.loop = new NamedDaemonThreadFactory(name).newThread(tickLoop);

    this.executor = exec;
    this.start();
}