/**
 * Wraps an inbound message in an executor and publishes it on the ring buffer
 * obtained from {@code THREAD_LOCAL}.
 *
 * <p>The message is dropped without further action when the channel has no UUID in
 * {@code CHANNEL_UUID}, or when no session is registered under that UUID in
 * {@code SESSIONS}.
 *
 * @param ctx the handler context whose channel is used to look up the session
 * @param msg the inbound message
 * @throws Exception as declared by the overridden method
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, O msg) throws Exception {
    final UUID channelId = CHANNEL_UUID.get(ctx.channel());
    if (channelId == null) {
        return;
    }
    final NetSession netSession = SESSIONS.get(channelId);
    if (netSession == null) {
        return;
    }

    final RingBuffer<ConcurrentEvent> buffer = THREAD_LOCAL.get().getRingBuffer();
    final long sequence = buffer.next();
    try {
        // Fill the slot that belongs to the sequence claimed above.
        final ConcurrentEvent slot = buffer.get(sequence);
        slot.setValues(newExecutor(netSession, msg));
    } finally {
        // The claimed sequence is published even if filling the slot failed.
        buffer.publish(sequence);
    }
}
@Override public void startup() { EventBus eventBus = disruptorDispatchThread.getEventBus(); executorService = new NonOrderedQueuePoolExecutor(poolName, excutorSize); cycleEventHandler = new CycleEventHandler[excutorSize]; for(int i = 0; i < excutorSize; i++){ cycleEventHandler[i] = new CycleEventHandler(eventBus); } RingBuffer ringBuffer = disruptorDispatchThread.getRingBuffer(); workerPool = new WorkerPool(ringBuffer, ringBuffer.newBarrier(), new FatalExceptionHandler(), cycleEventHandler); ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); workerPool.start(executorService); // BatchEventProcessor<CycleEvent>[] batchEventProcessors = new BatchEventProcessor[excutorSize]; // for(int i = 0; i < excutorSize; i++){ // batchEventProcessors[i] = new BatchEventProcessor<>(ringBuffer, sequenceBarrier, cycleEventHandler[i]); // ringBuffer.addGatingSequences(batchEventProcessors[i].getSequence()); //// executorService.submit(batchEventProcessors[i]); // } }
/**
 * Registers a hand-built {@code BatchEventProcessor} (wrapping a delayed handler) with the
 * disruptor, chains a latch-backed handler after it, and checks that two events are
 * processed according to that dependency.
 */
@Test
public void shouldSupportCustomProcessorsAsDependencies() throws Exception {
    final RingBuffer<TestEvent> buffer = disruptor.getRingBuffer();
    final DelayedEventHandler upstreamHandler = createDelayedEventHandler();

    final CountDownLatch processedLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> downstreamHandler = new EventHandlerStub<TestEvent>(processedLatch);

    // The custom processor reads straight from the ring buffer with a fresh barrier.
    final BatchEventProcessor<TestEvent> customProcessor =
        new BatchEventProcessor<TestEvent>(buffer, buffer.newBarrier(), upstreamHandler);
    disruptor.handleEventsWith(customProcessor);
    disruptor.after(customProcessor).handleEventsWith(downstreamHandler);

    ensureTwoEventsProcessedAccordingToDependencies(processedLatch, upstreamHandler);
}
/**
 * Registers a delayed handler, then builds a custom {@code BatchEventProcessor} whose
 * barrier is derived from that handler, and checks that two events are processed
 * according to that dependency.
 */
@Test
public void shouldSupportHandlersAsDependenciesToCustomProcessors() throws Exception {
    final DelayedEventHandler upstreamHandler = createDelayedEventHandler();
    disruptor.handleEventsWith(upstreamHandler);

    final RingBuffer<TestEvent> buffer = disruptor.getRingBuffer();
    final CountDownLatch processedLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> latchHandler = new EventHandlerStub<TestEvent>(processedLatch);

    // Barrier obtained from the already-registered handler.
    final SequenceBarrier afterUpstream = disruptor.after(upstreamHandler).asSequenceBarrier();
    final BatchEventProcessor<TestEvent> customProcessor =
        new BatchEventProcessor<TestEvent>(buffer, afterUpstream, latchHandler);
    disruptor.handleEventsWith(customProcessor);

    ensureTwoEventsProcessedAccordingToDependencies(processedLatch, upstreamHandler);
}
/**
 * Combines a registered delayed handler and a custom {@code BatchEventProcessor} (wrapping
 * a second delayed handler) as joint dependencies of a latch-backed handler, and checks
 * that two events are processed according to those dependencies.
 */
@Test
public void shouldSupportCustomProcessorsAndHandlersAsDependencies() throws Exception {
    final DelayedEventHandler firstDelayed = createDelayedEventHandler();
    final DelayedEventHandler secondDelayed = createDelayedEventHandler();
    disruptor.handleEventsWith(firstDelayed);

    final RingBuffer<TestEvent> buffer = disruptor.getRingBuffer();
    final CountDownLatch processedLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> latchHandler = new EventHandlerStub<TestEvent>(processedLatch);

    // The custom processor is gated on the first delayed handler.
    final SequenceBarrier afterFirst = disruptor.after(firstDelayed).asSequenceBarrier();
    final BatchEventProcessor<TestEvent> customProcessor =
        new BatchEventProcessor<TestEvent>(buffer, afterFirst, secondDelayed);

    disruptor.after(firstDelayed).and(customProcessor).handleEventsWith(latchHandler);

    ensureTwoEventsProcessedAccordingToDependencies(processedLatch, firstDelayed, secondDelayed);
}
/**
 * Registers a processor factory as the first consumer and checks that it is handed no
 * barrier sequences, then verifies that two events are processed.
 */
@Test
public void shouldMakeEntriesAvailableToFirstCustomProcessorsImmediately() throws Exception {
    final CountDownLatch processedLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> latchHandler = new EventHandlerStub<TestEvent>(processedLatch);

    final EventProcessorFactory<TestEvent> firstInChain = new EventProcessorFactory<TestEvent>() {
        @Override
        public EventProcessor createEventProcessor(
            final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences) {
            assertEquals("Should not have had any barrier sequences", 0, barrierSequences.length);
            return new BatchEventProcessor<TestEvent>(
                disruptor.getRingBuffer(), ringBuffer.newBarrier(barrierSequences), latchHandler);
        }
    };
    disruptor.handleEventsWith(firstInChain);

    ensureTwoEventsProcessedAccordingToDependencies(processedLatch);
}
/**
 * Registers a processor factory after a delayed handler and checks that the factory is
 * handed exactly one barrier sequence, then verifies that two events are processed
 * according to that dependency.
 */
@Test
public void shouldHonourDependenciesForCustomProcessors() throws Exception {
    final CountDownLatch countDownLatch = new CountDownLatch(2);
    final EventHandler<TestEvent> eventHandler = new EventHandlerStub<TestEvent>(countDownLatch);
    final DelayedEventHandler delayedEventHandler = createDelayedEventHandler();

    disruptor.handleEventsWith(delayedEventHandler).then(
        new EventProcessorFactory<TestEvent>() {
            @Override
            public EventProcessor createEventProcessor(
                final RingBuffer<TestEvent> ringBuffer, final Sequence[] barrierSequences) {
                // Compare by value: assertSame would box both ints and compare object
                // identity, which only passes because small Integers are cached.
                assertEquals("Should have had a barrier sequence", 1, barrierSequences.length);
                return new BatchEventProcessor<TestEvent>(
                    disruptor.getRingBuffer(), ringBuffer.newBarrier(barrierSequences), eventHandler);
            }
        });

    ensureTwoEventsProcessedAccordingToDependencies(countDownLatch, delayedEventHandler);
}
@SuppressWarnings("unchecked") public DisruptorTransfer(final SpanEventHandler spanEventHandler, final int buffSize) { // Executor executor = Executors.newCachedThreadPool(); final ThreadFactory threadFactory = Executors.defaultThreadFactory(); // The factory for the event final SpanEventFactory factory = new SpanEventFactory(); // Specify the size of the ring buffer, must be power of 2. final int bufferSize = buffSize; // Construct the Disruptor disruptor = new Disruptor<SpanEvent>(factory, bufferSize, threadFactory); // Connect the handler // disruptor.handleEventsWith(new // SpanEventHandler("http://localhost:9080/upload")); disruptor.handleEventsWith(spanEventHandler); // Start the Disruptor, starts all threads running disruptor.start(); final RingBuffer<SpanEvent> ringBuffer = disruptor.getRingBuffer(); producer = new SpanEventProducer(ringBuffer); }
/**
 * Creates the processor and its backing ring buffer of {@code MutableSignal} slots.
 *
 * @param name passed to the superclass constructor
 * @param executor passed to the superclass constructor
 * @param bufferSize size of the ring buffer
 * @param waitStrategy wait strategy used by the ring buffer
 * @param shared {@code true} selects a multi-producer ring buffer, {@code false} a
 *               single-producer one
 * @param autoCancel passed to the superclass constructor
 */
private RingBufferProcessor(String name, ExecutorService executor, int bufferSize, WaitStrategy waitStrategy,
                            boolean shared, boolean autoCancel) {
    super(name, executor, autoCancel);

    final ProducerType producerType;
    if (shared) {
        producerType = ProducerType.MULTI;
    } else {
        producerType = ProducerType.SINGLE;
    }

    final EventFactory<MutableSignal<E>> signalFactory = new EventFactory<MutableSignal<E>>() {
        @Override
        public MutableSignal<E> newInstance() {
            return new MutableSignal<E>();
        }
    };

    this.ringBuffer = RingBuffer.create(producerType, signalFactory, bufferSize, waitStrategy);
    this.recentSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    this.barrier = ringBuffer.newBarrier();
    // Note: recentSequence is not registered as a gating sequence here
    // (ringBuffer.addGatingSequences(recentSequence) is left disabled).
}
/**
 * Creates the work processor and its backing ring buffer of {@code MutableSignal} slots,
 * and registers {@code workSequence} as a gating sequence on that ring buffer.
 *
 * @param name passed to the superclass constructor
 * @param executor passed to the superclass constructor
 * @param bufferSize size of the ring buffer
 * @param waitStrategy wait strategy used by the ring buffer
 * @param share {@code true} selects a multi-producer ring buffer, {@code false} a
 *              single-producer one
 * @param autoCancel passed to the superclass constructor
 */
private RingBufferWorkProcessor(String name, ExecutorService executor, int bufferSize, WaitStrategy waitStrategy,
                                boolean share, boolean autoCancel) {
    super(name, executor, autoCancel);

    final ProducerType producerType;
    if (share) {
        producerType = ProducerType.MULTI;
    } else {
        producerType = ProducerType.SINGLE;
    }

    final EventFactory<MutableSignal<E>> signalFactory = new EventFactory<MutableSignal<E>>() {
        @Override
        public MutableSignal<E> newInstance() {
            return new MutableSignal<E>();
        }
    };

    this.ringBuffer = RingBuffer.create(producerType, signalFactory, bufferSize, waitStrategy);
    ringBuffer.addGatingSequences(workSequence);
}
/** * Init method. * * @return */ public DisruptorQueue<ID, DATA> init() { /* single producer "seems" to offer better performance */ ringBuffer = RingBuffer.createSingleProducer(EVENT_FACTORY, ringSize); // ringBuffer = RingBuffer.createMultiProducer(EVENT_FACTORY, ringSize); if (!isEphemeralDisabled()) { int ephemeralBoundSize = Math.max(0, getEphemeralMaxSize()); ephemeralStorage = new ConcurrentHashMap<>( ephemeralBoundSize > 0 ? Math.min(ephemeralBoundSize, ringSize) : ringSize); } consumedSeq = new Sequence(); ringBuffer.addGatingSequences(consumedSeq); long cursor = ringBuffer.getCursor(); consumedSeq.set(cursor); knownPublishedSeq = cursor; return this; }
/**
 * Constructs a blocking queue backed by a disruptor ring buffer.
 *
 * @param bufferSize
 *            requested ring buffer size; it is passed through
 *            {@code normalizeBufferSize} before the ring buffer is created
 * @param singleProducer
 *            whether only a single thread produces events
 */
public SingleConsumerDisruptorQueue(int bufferSize, boolean singleProducer) {
    final int ringSize = normalizeBufferSize(bufferSize);
    final Factory<T> slotFactory = new Factory<T>();
    ringBuffer = singleProducer
        ? RingBuffer.createSingleProducer(slotFactory, ringSize)
        : RingBuffer.createMultiProducer(slotFactory, ringSize);

    // The consumer sequence gates the ring buffer.
    consumedSeq = new Sequence();
    ringBuffer.addGatingSequences(consumedSeq);
    barrier = ringBuffer.newBarrier();

    // Align the consumer position and the last known published position with the cursor.
    final long startCursor = ringBuffer.getCursor();
    consumedSeq.set(startCursor);
    knownPublishedSeq = startCursor;
}
/**
 * Wraps an inbound message in an executor and publishes it on the ring buffer
 * obtained from {@code THREAD_LOCAL}.
 *
 * <p>The message is dropped without further action when the channel has no UUID in
 * {@code CHANNEL_UUID}, or when no session is registered under that UUID in
 * {@code SESSIONS}.
 *
 * @param ctx the handler context whose channel is used to look up the session
 * @param msg the inbound message
 * @throws Exception as declared by the overridden method
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, O msg) throws Exception {
    final UUID channelId = CHANNEL_UUID.get(ctx.channel());
    if (channelId == null) {
        return;
    }
    final Session currentSession = SESSIONS.get(channelId);
    if (currentSession == null) {
        return;
    }

    final RingBuffer<ConcurrentEvent> buffer = THREAD_LOCAL.get().getRingBuffer();
    final long sequence = buffer.next();
    try {
        // Fill the slot that belongs to the sequence claimed above.
        final ConcurrentEvent slot = buffer.get(sequence);
        slot.setValues(newExecutor(currentSession, msg));
    } finally {
        // The claimed sequence is published even if filling the slot failed.
        buffer.publish(sequence);
    }
}
public void post(Object event) { Class<?> eventClass = event.getClass(); Disruptor<BaseEvent> eventDisruptor = disruptorPool.get(eventClass); if(eventDisruptor == null) { List<RegistedEventHandler> registedEventHandlers = handlesMap.get(eventClass); if(registedEventHandlers == null || registedEventHandlers.size() == 0) { throw new RuntimeException("The " + eventClass.getSimpleName() + " event dosen't regist any eventHandler."); } eventDisruptor = createDisruptor(eventClass, registedEventHandlers); disruptorPool.put(eventClass, eventDisruptor); } // check whether has event Repository definition. if(eventRepository != null) { eventRepository.save(event); } RingBuffer<BaseEvent> ringBuffer = eventDisruptor.getRingBuffer(); BaseEventProducer producer = new BaseEventProducer(ringBuffer); producer.onData(event); }
/**
 * Claims a ring buffer sequence for an append, wraps the edit in an {@code FSWALEntry},
 * loads it into the claimed slot and publishes the sequence.
 *
 * @param hri region the edit belongs to; also used in the error message
 * @param key WAL key; its MVCC instance is used to begin the write entry
 * @param edits the edits to append
 * @param inMemstore passed through to the {@code FSWALEntry}
 * @param ringBuffer ring buffer the entry is published on
 * @return the ring buffer sequence claimed for this append (the txid)
 * @throws IOException if this log is already closed
 */
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key, WALEdit edits,
    boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) throws IOException {
    if (this.closed) {
        throw new IOException(
            "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
    }
    // Holder lets the lambda below hand the claimed sequence back to this method.
    MutableLong txidHolder = new MutableLong();
    // The ring buffer sequence is claimed inside the callback passed to begin().
    // NOTE(review): presumably so the MVCC write entry and the ring buffer sequence are
    // assigned together -- confirm against MultiVersionConcurrencyControl.begin.
    MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
        txidHolder.setValue(ringBuffer.next());
    });
    long txid = txidHolder.longValue();
    try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
        FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);
        entry.stampRegionSequenceId(we);
        ringBuffer.get(txid).load(entry);
    } finally {
        // Publish the claimed sequence even if building or loading the entry failed.
        ringBuffer.publish(txid);
    }
    return txid;
}
/**
 * Verifies that sending a request through a node with a connected binary service calls
 * {@code dispatchHostname} on the request exactly once.
 */
@Test
public void shouldSetDispatchedHostnameAfterSend() {
    final ServiceRegistry registry = mock(ServiceRegistry.class);
    final ServiceFactory factory = mock(ServiceFactory.class);

    // A binary service mock that reports itself as connected.
    final Service binaryService = mock(Service.class);
    when(binaryService.type()).thenReturn(ServiceType.BINARY);
    when(binaryService.states()).thenReturn(Observable.just(LifecycleState.CONNECTED));
    when(binaryService.connect()).thenReturn(Observable.just(LifecycleState.CONNECTED));
    when(factory.create(anyString(), anyString(), anyString(), anyString(), eq(0), same(environment),
        eq(ServiceType.BINARY), any(RingBuffer.class))).thenReturn(binaryService);

    final CouchbaseNode node = new CouchbaseNode(host, registry, environment, null, factory);
    final AddServiceRequest addBinary = new AddServiceRequest(ServiceType.BINARY, "bucket", null, 0, host);
    node.addService(addBinary).toBlocking().single();

    final CouchbaseRequest request = mock(CouchbaseRequest.class);
    node.send(request);

    verify(request, times(1)).dispatchHostname(any(String.class));
}
/**
 * Publishes the event on the disruptor's ring buffer and blocks until the latch attached
 * to it reaches zero.
 *
 * <p>The latch is created with a count equal to the number of handlers; presumably each
 * handler counts it down once it has processed the event (not visible here).
 *
 * <p>If the calling thread is interrupted while waiting, the method returns early and
 * restores the thread's interrupt status so callers can observe the interruption.
 *
 * @param event the event to publish
 */
@Override
public void synchronousFire( final Object event )
{
    eventCount.increment();
    final CountDownLatch latch = new CountDownLatch( handlers.size() );

    final RingBuffer<StatisticEventHolder> ringBuffer = disruptor.getRingBuffer();
    final long sequence = ringBuffer.next();
    try
    {
        final StatisticEventHolder holder = ringBuffer.get( sequence );
        holder.set( event );
        holder.setLatch( latch );
    }
    finally
    {
        // Always publish the claimed sequence, even if filling the holder failed.
        ringBuffer.publish( sequence );
    }

    try
    {
        latch.await();
    }
    catch ( InterruptedException e )
    {
        // Do not swallow the interruption: restore the flag for the caller.
        Thread.currentThread().interrupt();
    }
}
/** * Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when * the {@link com.lmax.disruptor.EventHandler#onEvent(Object, long, boolean)} method returns. * * @param ringBuffer to which events are published. * @param sequenceBarrier on which it is waiting. * @param eventHandler is the delegate to which events are dispatched. * @param turn is the value of, sequence % groupCount this batch processor process events. Turn must be * less than groupCount * @param groupCount total number of concurrent batch processors for the event type * @param batchSize size limit of total content size to batch. This is a loose limit */ public ConcurrentContentReadTaskBatchProcessor(final RingBuffer<DeliveryEventData> ringBuffer, final SequenceBarrier sequenceBarrier, final ContentCacheCreator eventHandler, long turn, int groupCount, int batchSize) { if (turn >= groupCount) { throw new IllegalArgumentException("Turn should be less than groupCount"); } this.ringBuffer = ringBuffer; this.sequenceBarrier = sequenceBarrier; this.eventHandler = eventHandler; this.turn = turn; this.groupCount = groupCount; this.batchSize = batchSize; exceptionHandler = new DeliveryExceptionHandler(); running = new AtomicBoolean(false); if (eventHandler instanceof SequenceReportingEventHandler) { ((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence); } }
/**
 * Creates a provider whose outbound side is a multi-producer ring buffer and whose
 * inbound side is the supplied consumer (no inbound ring buffer is created).
 *
 * @param capacity size of the outbound ring buffer and of its attachment array
 * @param factory factory for ring buffer entries, adapted through {@code wrapFactory}
 * @param appIn consumer used for the inbound side; also supplies {@code chnOut}
 * @throws NullPointerException if {@code appIn} is {@code null}
 */
RingBufferProvider(@Nonnegative final int capacity, @Nonnull final Factory<T> factory,
                   @Nonnull final MessageBufferConsumer<T> appIn) {
    if (appIn == null) {
        throw new NullPointerException("appIn == null");
    }

    // Inbound ring buffer state is unused in this configuration.
    this.waitIn = null;
    this.in = null;
    this.attachIn = null;
    this.internalConsumer = false;
    this.appIn = appIn;

    // Outbound ring buffer and its attachment slots.
    this.waitOut = new WakeupWaitStrategy();
    final EventFactory<T> entryFactory = wrapFactory(factory);
    this.out = RingBuffer.createMultiProducer(entryFactory, capacity, waitOut);
    this.attachOut = new Object[capacity];

    // Endpoints: the application writes to / the channel reads from the outbound buffer.
    this.appOut = new RingBufferProducer<>(out, attachOut);
    this.chnIn = new RingBufferConsumer<>(out, attachOut);
    this.chnOut = appIn.createProducer();
}
RingBufferConsumer(@Nonnull final RingBuffer<T> buffer, @Nonnull final Object[] attachments) { if (buffer == null) { throw new NullPointerException("buffer == null"); } if (attachments == null) { throw new NullPointerException("attachments == null"); } if (buffer.getBufferSize() != attachments.length) { throw new IllegalArgumentException("buffer.getBufferSize() != attachments.length"); } this.buffer = buffer; this.attachments = attachments; this.barrier = buffer.newBarrier(); this.sequence = new Sequence(); buffer.addGatingSequences(sequence); this.cursor = sequence.get(); this.available = sequence.get(); }
public static Worker createWorker(File conf, EventHandler<KinesisEvent> handler, String appName)throws IOException{ Executor executor = Executors.newCachedThreadPool(); Disruptor<KinesisEvent> disruptor = new Disruptor<>(KinesisEvent.EVENT_FACTORY, 128, executor); disruptor.handleEventsWith(handler); RingBuffer<KinesisEvent> buffer = disruptor.start(); Properties props = new Properties(); props.load(new FileReader(conf)); // Generate a unique worker ID String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID(); String accessid = props.getProperty("aws-access-key-id"); String secretkey = props.getProperty("aws-secret-key"); String streamname = props.getProperty("aws-kinesis-stream-name"); BasicAWSCredentials creds = new BasicAWSCredentials(accessid, secretkey); CredProvider credprovider = new CredProvider(creds); KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(appName, streamname, credprovider, workerId); Worker worker = new Worker(new RecordProcessorFactory(buffer), config, new MetricsFactory()); return worker; }
protected AbstractDynamicService(final String hostname, final String bucket, final String username, final String password, final int port, final CoreEnvironment env, final int minEndpoints, final RingBuffer<ResponseEvent> responseBuffer, final EndpointFactory endpointFactory) { super(minEndpoints == 0 ? LifecycleState.IDLE : LifecycleState.DISCONNECTED); this.initialState = minEndpoints == 0 ? LifecycleState.IDLE : LifecycleState.DISCONNECTED; this.hostname = hostname; this.bucket = bucket; this.username = username; this.password = password; this.port = port; this.env = env; this.minEndpoints = minEndpoints; this.responseBuffer = responseBuffer; this.endpointFactory = endpointFactory; endpointStates = new EndpointStateZipper(initialState); endpoints = new Endpoint[minEndpoints]; endpointStates.states().subscribe(new Action1<LifecycleState>() { @Override public void call(LifecycleState lifecycleState) { transitionState(lifecycleState); } }); }
/**
 * Builds and starts a disruptor that feeds span events to the given handler, and creates
 * the producer that publishes onto its ring buffer.
 *
 * @param spanEventHandler handler that receives every published span event
 * @param buffSize size of the ring buffer; must be a power of 2
 */
public DisruptorTransfer(final SpanEventHandler spanEventHandler, final int buffSize) {
    final ThreadFactory workerThreads = Executors.defaultThreadFactory();

    disruptor = new Disruptor<>(new SpanEventFactory(), buffSize, workerThreads);
    disruptor.handleEventsWith(spanEventHandler);

    // Starting the disruptor launches the handler threads.
    disruptor.start();

    producer = new SpanEventProducer(disruptor.getRingBuffer());
}
/**
 * Runs the capture loop on the given handle and publishes every received raw packet onto
 * the disruptor's ring buffer.
 *
 * <p>For each packet a sequence is claimed, the corresponding {@code PacketContainer} is
 * cleared and filled with the handle's DLT, the handle's timestamp and the raw packet,
 * and the sequence is published. Publishing happens in a {@code finally} block so that a
 * failure while filling the container cannot leave a claimed sequence unpublished.
 *
 * @param disruptor disruptor whose ring buffer receives the packets
 * @param handle capture handle to loop on; the loop is started with a packet count of 0
 *               (presumably meaning "no limit" -- confirm against the handle's API)
 * @throws Exception if the capture loop fails
 */
default void loop(Disruptor disruptor, PcapHandle handle) throws Exception {
    RingBuffer<PacketContainer> ringBuffer = disruptor.getRingBuffer();
    handle.loop(0, (RawPacketListener) packet -> {
        long sequence = ringBuffer.next();
        try {
            PacketContainer container = ringBuffer.get(sequence);
            container.clear();
            container.setDlt(handle.getDlt());
            container.setTimestamp(handle.getTimestamp());
            container.setRawPacket(packet);
        } finally {
            // Always publish the claimed sequence, even if filling the slot failed.
            ringBuffer.publish(sequence);
        }
    });
}
/**
 * Wraps an inbound call in an executor and publishes it on the ring buffer obtained from
 * {@code THREAD_LOCAL}.
 *
 * <p>The call is dropped without further action when no session is registered in
 * {@code NET_SESSION_MAP} under the channel's id.
 *
 * @param ctx the handler context whose channel id is used to look up the session
 * @param msg the inbound call
 * @throws Exception as declared by the overridden method
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, GpcCall msg) throws Exception {
    final NetSession netSession = NET_SESSION_MAP.get(ctx.channel().id());
    if (netSession != null) {
        final RingBuffer<ConcurrentEvent> buffer = THREAD_LOCAL.get().getRingBuffer();
        final long sequence = buffer.next();
        try {
            // Fill the slot that belongs to the sequence claimed above.
            buffer.get(sequence).setValues(newExecutor(netSession, msg));
        } finally {
            // The claimed sequence is published even if filling the slot failed.
            buffer.publish(sequence);
        }
    }
}
/**
 * Wraps an inbound message in an executor and publishes it on the ring buffer obtained
 * from {@code THREAD_LOCAL}.
 *
 * <p>The message is dropped without further action when no session is registered in
 * {@code SESSIONS} under the channel's id.
 *
 * @param ctx the handler context whose channel id is used to look up the session
 * @param msg the inbound message
 * @throws Exception as declared by the overridden method
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, O msg) throws Exception {
    final NetSession netSession = SESSIONS.get(ctx.channel().id());
    if (netSession != null) {
        final RingBuffer<ConcurrentEvent> buffer = THREAD_LOCAL.get().getRingBuffer();
        final long sequence = buffer.next();
        try {
            // Fill the slot that belongs to the sequence claimed above.
            final ConcurrentEvent slot = buffer.get(sequence);
            slot.setValues(newExecutor(netSession, msg));
        } finally {
            // The claimed sequence is published even if filling the slot failed.
            buffer.publish(sequence);
        }
    }
}
public static void main(String[] args) throws Exception { Executor executor = Executors.newCachedThreadPool(); LongEventFactory eventFactory = new LongEventFactory(); int bufferSize = 1024; // Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, bufferSize, executor); // disruptor.handleEventsWith(new LongEventHandler()); // disruptor.start(); Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor); disruptor.handleEventsWith((event, sequence, endOfBatch) -> {System.out.println("Event: " + event); System.out.println("CurrentThreadName:" + Thread.currentThread().getName()); }); disruptor.start(); RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducer producer = new LongEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { bb.putLong(0, l); ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb); // producer.onData(bb); //Thread.sleep(1000); } }
/**
 * Create a new Disruptor.
 *
 * <p>Delegates to the constructor taking a ring buffer and an executor: the ring buffer
 * is built with {@code RingBuffer.create} from the given producer type, event factory,
 * size and wait strategy, and the thread factory is wrapped in a {@code BasicExecutor}.
 *
 * @param eventFactory   the factory to create events in the ring buffer.
 * @param ringBufferSize the size of the ring buffer, must be power of 2.
 * @param threadFactory  a {@link ThreadFactory} to create threads for processors.
 * @param producerType   the claim strategy to use for the ring buffer.
 * @param waitStrategy   the wait strategy to use for the ring buffer.
 */
public Disruptor(
    final EventFactory<T> eventFactory,
    final int ringBufferSize,
    final ThreadFactory threadFactory,
    final ProducerType producerType,
    final WaitStrategy waitStrategy) {
    this(RingBuffer.create(
        producerType, eventFactory, ringBufferSize, waitStrategy),
        new BasicExecutor(threadFactory));
}
/**
 * Starts the event processors and returns the fully configured ring buffer.
 *
 * <p>The ring buffer is set up to prevent overwriting any entry that is yet to
 * be processed by the slowest event processor.
 *
 * <p>This method must only be called once after all event processors have been added.
 * The started-once check runs before anything else, so a rejected repeat call does not
 * add the gating sequences to the ring buffer a second time.
 *
 * @return the configured ring buffer.
 */
public RingBuffer<T> start() {
    // Fail fast on a repeated call, before touching the ring buffer.
    checkOnlyStartedOnce();

    // Gate the ring buffer on the last consumers of every chain.
    final Sequence[] gatingSequences = consumerRepository.getLastSequenceInChain(true);
    ringBuffer.addGatingSequences(gatingSequences);

    for (final ConsumerInfo consumerInfo : consumerRepository) {
        consumerInfo.start(executor);
    }

    return ringBuffer;
}
/**
 * Runs one throughput pass: publishes {@code ITERATIONS} values through the translator,
 * waits for the handler's latch and verifies the handler's final value.
 *
 * @return the measured operations per second for this pass
 * @throws InterruptedException if interrupted while waiting for the latch
 */
@Override
protected long runDisruptorPass() throws InterruptedException {
    final MutableLong payload = this.value;
    final CountDownLatch done = new CountDownLatch(1);
    final long targetSequence = ringBuffer.getMinimumGatingSequence() + ITERATIONS;
    handler.reset(done, targetSequence);

    final long startedAt = System.currentTimeMillis();
    final RingBuffer<ValueEvent> publisher = ringBuffer;

    long published = 0;
    while (published < ITERATIONS) {
        payload.set(published);
        publisher.publishEvent(Translator.INSTANCE, payload);
        published++;
    }

    done.await();
    final long elapsedMillis = System.currentTimeMillis() - startedAt;
    final long opsPerSecond = (ITERATIONS * 1000L) / elapsedMillis;

    waitForEventProcessorSequence(targetSequence);
    failIfNot(expectedResult, handler.getValue());

    return opsPerSecond;
}
/**
 * Runs one throughput pass publishing in batches: each iteration claims
 * {@code BATCH_SIZE} sequences, writes the iteration index into every claimed slot and
 * publishes the whole range at once.
 *
 * @return the measured operations per second for this pass
 * @throws InterruptedException if interrupted while waiting for the latch
 */
@Override
protected long runDisruptorPass() throws InterruptedException {
    final CountDownLatch done = new CountDownLatch(1);
    final long targetSequence = batchEventProcessor.getSequence().get() + ITERATIONS * BATCH_SIZE;
    handler.reset(done, targetSequence);
    executor.submit(batchEventProcessor);

    final long startedAt = System.currentTimeMillis();
    final RingBuffer<ValueEvent> publisher = ringBuffer;

    for (long iteration = 0; iteration < ITERATIONS; iteration++) {
        // next(n) returns the highest claimed sequence; derive the lowest from it.
        final long last = publisher.next(BATCH_SIZE);
        final long first = last - (BATCH_SIZE - 1);
        long slot = first;
        while (slot <= last) {
            publisher.get(slot).setValue(iteration);
            slot++;
        }
        publisher.publish(first, last);
    }

    done.await();
    final long elapsedMillis = System.currentTimeMillis() - startedAt;
    final long opsPerSecond = (BATCH_SIZE * ITERATIONS * 1000L) / elapsedMillis;

    waitForEventProcessorSequence(targetSequence);
    batchEventProcessor.halt();
    failIfNot(expectedResult, handler.getValue());

    return opsPerSecond;
}
/**
 * Runs one throughput pass against the polling consumer: publishes {@code ITERATIONS}
 * values one sequence at a time, waits for the poll runnable's latch and verifies its
 * final value.
 *
 * @return the measured operations per second for this pass
 * @throws InterruptedException if interrupted while waiting for the latch
 */
@Override
protected long runDisruptorPass() throws InterruptedException {
    final CountDownLatch done = new CountDownLatch(1);
    final long targetSequence = poller.getSequence().get() + ITERATIONS;
    pollRunnable.reset(done, targetSequence);
    executor.submit(pollRunnable);

    final long startedAt = System.currentTimeMillis();
    final RingBuffer<ValueEvent> publisher = ringBuffer;

    long iteration = 0;
    while (iteration < ITERATIONS) {
        final long sequence = publisher.next();
        publisher.get(sequence).setValue(iteration);
        publisher.publish(sequence);
        iteration++;
    }

    done.await();
    final long elapsedMillis = System.currentTimeMillis() - startedAt;
    final long opsPerSecond = (ITERATIONS * 1000L) / elapsedMillis;

    waitForEventProcessorSequence(targetSequence);
    pollRunnable.halt();
    failIfNot(expectedResult, pollRunnable.getValue());

    return opsPerSecond;
}
/**
 * Runs one throughput pass against the batch event processor: publishes
 * {@code ITERATIONS} values one sequence at a time, waits for the handler's latch and
 * verifies the handler's final value.
 *
 * @return the measured operations per second for this pass
 * @throws InterruptedException if interrupted while waiting for the latch
 */
@Override
protected long runDisruptorPass() throws InterruptedException {
    final CountDownLatch done = new CountDownLatch(1);
    final long targetSequence = batchEventProcessor.getSequence().get() + ITERATIONS;
    handler.reset(done, targetSequence);
    executor.submit(batchEventProcessor);

    final long startedAt = System.currentTimeMillis();
    final RingBuffer<ValueEvent> publisher = ringBuffer;

    long iteration = 0;
    while (iteration < ITERATIONS) {
        final long sequence = publisher.next();
        publisher.get(sequence).setValue(iteration);
        publisher.publish(sequence);
        iteration++;
    }

    done.await();
    final long elapsedMillis = System.currentTimeMillis() - startedAt;
    final long opsPerSecond = (ITERATIONS * 1000L) / elapsedMillis;

    waitForEventProcessorSequence(targetSequence);
    batchEventProcessor.halt();
    failIfNot(expectedResult, handler.getValue());

    return opsPerSecond;
}
/**
 * Runs one throughput pass with {@code long[]} events: each iteration claims a sequence,
 * overwrites every element of the slot's array with the iteration index and publishes it.
 *
 * @return the measured operations per second for this pass, scaled by {@code ARRAY_SIZE}
 * @throws InterruptedException if interrupted while waiting for the latch
 */
@Override
protected long runDisruptorPass() throws InterruptedException {
    final CountDownLatch done = new CountDownLatch(1);
    final long targetSequence = batchEventProcessor.getSequence().get() + ITERATIONS;
    handler.reset(done, ITERATIONS);
    executor.submit(batchEventProcessor);

    final long startedAt = System.currentTimeMillis();
    final RingBuffer<long[]> publisher = ringBuffer;

    for (long iteration = 0; iteration < ITERATIONS; iteration++) {
        final long sequence = publisher.next();
        final long[] slot = publisher.get(sequence);
        int index = 0;
        while (index < slot.length) {
            slot[index] = iteration;
            index++;
        }
        publisher.publish(sequence);
    }

    done.await();
    final long elapsedMillis = System.currentTimeMillis() - startedAt;
    final long opsPerSecond = (ITERATIONS * ARRAY_SIZE * 1000L) / elapsedMillis;

    waitForEventProcessorSequence(targetSequence);
    batchEventProcessor.halt();
    PerfTestUtil.failIf(0, handler.getValue());

    return opsPerSecond;
}
/**
 * Runs one throughput pass with {@code ByteBuffer} events: each iteration claims a
 * sequence, refills the slot's buffer with the payload bytes and publishes it.
 *
 * @return the measured operations per second for this pass
 * @throws Exception if the pass fails, including interruption while waiting for the latch
 */
@Override
protected long runDisruptorPass() throws Exception {
    final byte[] payload = this.data;
    final CountDownLatch done = new CountDownLatch(1);
    final long targetSequence = processor.getSequence().get() + ITERATIONS;
    handler.reset(done, ITERATIONS);
    executor.execute(processor);

    final long startedAt = System.currentTimeMillis();
    final RingBuffer<ByteBuffer> publisher = buffer;

    long iteration = 0;
    while (iteration < ITERATIONS) {
        final long sequence = publisher.next();
        final ByteBuffer slot = publisher.get(sequence);
        // Reset, fill with the payload, then flip so the content is readable.
        slot.clear();
        slot.put(payload);
        slot.flip();
        publisher.publish(sequence);
        iteration++;
    }

    done.await();
    final long elapsedMillis = System.currentTimeMillis() - startedAt;
    final long opsPerSecond = (ITERATIONS * 1000L) / elapsedMillis;

    waitForEventProcessorSequence(targetSequence);
    processor.halt();

    return opsPerSecond;
}
/**
 * Creates a publisher that stores the given collaborators and settings for later use.
 *
 * @param cyclicBarrier barrier stored on this publisher
 * @param ringBuffer ring buffer stored on this publisher
 * @param iterations iteration count stored on this publisher
 * @param batchSize batch size stored on this publisher
 */
public ValueBatchPublisher(
    final CyclicBarrier cyclicBarrier,
    final RingBuffer<ValueEvent> ringBuffer,
    final long iterations,
    final int batchSize) {
    // Workload settings
    this.batchSize = batchSize;
    this.iterations = iterations;
    // Collaborators
    this.ringBuffer = ringBuffer;
    this.cyclicBarrier = cyclicBarrier;
}
/**
 * Creates a publisher that stores the given collaborators and settings for later use.
 *
 * @param cyclicBarrier barrier stored on this publisher
 * @param ringBuffer ring buffer stored on this publisher
 * @param iterations iteration count stored on this publisher
 */
public ValuePublisher(
    final CyclicBarrier cyclicBarrier,
    final RingBuffer<ValueEvent> ringBuffer,
    final long iterations) {
    // Workload settings
    this.iterations = iterations;
    // Collaborators
    this.ringBuffer = ringBuffer;
    this.cyclicBarrier = cyclicBarrier;
}
/**
 * Creates a publisher that stores the given collaborators and settings for later use.
 *
 * @param cyclicBarrier barrier stored on this publisher
 * @param ringBuffer ring buffer stored on this publisher
 * @param iterations iteration count stored on this publisher
 * @param arraySize array size stored on this publisher
 */
public LongArrayPublisher(
    final CyclicBarrier cyclicBarrier,
    final RingBuffer<long[]> ringBuffer,
    final long iterations,
    final long arraySize) {
    // Workload settings
    this.arraySize = arraySize;
    this.iterations = iterations;
    // Collaborators
    this.ringBuffer = ringBuffer;
    this.cyclicBarrier = cyclicBarrier;
}
public static void main(String[] args) throws Exception { int batchSize = 40; RingBuffer<BatchedPoller.DataEvent<Object>> ringBuffer = RingBuffer.createMultiProducer(BatchedPoller.DataEvent.factory(), 1024); BatchedPoller<Object> poller = new BatchedPoller<Object>(ringBuffer, batchSize); Object value = poller.poll(); // Value could be null if no events are available. if (null != value) { // Process value. } }