@Override public void startup() { EventBus eventBus = disruptorDispatchThread.getEventBus(); executorService = new NonOrderedQueuePoolExecutor(poolName, excutorSize); cycleEventHandler = new CycleEventHandler[excutorSize]; for(int i = 0; i < excutorSize; i++){ cycleEventHandler[i] = new CycleEventHandler(eventBus); } RingBuffer ringBuffer = disruptorDispatchThread.getRingBuffer(); workerPool = new WorkerPool(ringBuffer, ringBuffer.newBarrier(), new FatalExceptionHandler(), cycleEventHandler); ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); workerPool.start(executorService); // BatchEventProcessor<CycleEvent>[] batchEventProcessors = new BatchEventProcessor[excutorSize]; // for(int i = 0; i < excutorSize; i++){ // batchEventProcessors[i] = new BatchEventProcessor<>(ringBuffer, sequenceBarrier, cycleEventHandler[i]); // ringBuffer.addGatingSequences(batchEventProcessors[i].getSequence()); //// executorService.submit(batchEventProcessors[i]); // } }
/**
 * Registers a stub exception handler, then an event handler that throws, then a
 * {@code FatalExceptionHandler}; publishes one event and asserts that the exception
 * recorded by the stub is the very instance the event handler was built with.
 */
@Test
public void shouldOnlyApplyExceptionsHandlersSpecifiedViaHandleExceptionsWithOnNewEventProcessors()
    throws Exception {
    AtomicReference<Throwable> captured = new AtomicReference<>();
    ExceptionHandler stubHandler = new StubExceptionHandler(captured);
    RuntimeException expected = new RuntimeException();
    ExceptionThrowingEventHandler throwingHandler = new ExceptionThrowingEventHandler(expected);

    // Call order matters: the stub is set before the event handler is added,
    // and the fatal handler is only set afterwards.
    disruptor.handleExceptionsWith(stubHandler);
    disruptor.handleEventsWith(throwingHandler);
    disruptor.handleExceptionsWith(new FatalExceptionHandler());

    publishEvent();

    assertSame(expected, waitFor(captured));
}
/**
 * Assembles a {@code KafkaRpcServer}: one Kafka response producer shared by all
 * handlers, a worker pool of three {@code ServerEventHandler}s, and a
 * {@code RequestConsumer} for the configured topic.
 *
 * @return the newly constructed server
 */
public RpcServer build() {
    final int workerThreads = 3;
    Producer<String, Response> responseProducer =
        new KafkaProducer<>(producerProps, new StringSerializer(), producerValueSerializer);

    // Every pool handler is given the same request handler and response producer.
    final ServerEventHandler[] workHandlers = new ServerEventHandler[workerThreads];
    for (int idx = 0; idx < workerThreads; idx++) {
        workHandlers[idx] = new ServerEventHandler(requestHandler, responseProducer);
    }

    final WorkerPool<ServerEvent> pool =
        new WorkerPool<>(ServerEvent::new, new FatalExceptionHandler(), workHandlers);
    RequestConsumer consumer =
        new RequestConsumer(topic, consumerProps, consumerThreads, consumerValueDeserializer);

    // NOTE(review): the server also receives a separate handler instance outside the
    // pool; its role is not visible from this method.
    ServerEventHandler standaloneHandler = new ServerEventHandler(requestHandler, responseProducer);
    return new KafkaRpcServer(pool, consumer, standaloneHandler, workerThreads);
}
/**
 * Publishes load events onto a single-producer ring buffer consumed by a pool of
 * four {@code ValueHandler}s, waits for the buffer to drain, then prints the value
 * of the shared counter.
 *
 * @param cache         cache handed to every {@code ValueHandler}
 * @param entriesToLoad forwarded to {@code publishLoadEvents}; presumably the number
 *                      of entries to publish (confirm against that helper)
 * @param bbox          forwarded to {@code publishLoadEvents}; presumably a bounding
 *                      box for generated entries (confirm against that helper)
 */
private static void bulkLoad(final Cache cache, long entriesToLoad, double[] bbox) {
    // Shared by all handlers and reported at the end; presumably incremented per stored entry.
    final AtomicLong atomicLong = new AtomicLong();
    final ValueHandler[] valueHandlers = new ValueHandler[]{
        new ValueHandler(cache, atomicLong),
        new ValueHandler(cache, atomicLong),
        new ValueHandler(cache, atomicLong),
        new ValueHandler(cache, atomicLong)
    };

    final ExecutorService executorService = Executors.newCachedThreadPool();
    try {
        final RingBuffer<ValueEvent> ringBuffer =
            RingBuffer.createSingleProducer(ValueEvent.EVENT_FACTORY, 4096, new YieldingWaitStrategy());
        final WorkerPool<ValueEvent> workerPool =
            new WorkerPool<ValueEvent>(ringBuffer, ringBuffer.newBarrier(), new FatalExceptionHandler(), valueHandlers);

        // A WorkerPool built over an existing RingBuffer needs its worker sequences
        // registered as gating sequences before start; otherwise the producer is not
        // held back by the consumers and can overwrite slots that are still unprocessed.
        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
        workerPool.start(executorService);
        try {
            publishLoadEvents(ringBuffer, entriesToLoad, bbox);
        } finally {
            // Runs even if publishing fails, so the workers are always halted.
            workerPool.drainAndHalt();
        }
    } finally {
        executorService.shutdown();
    }

    System.out.format("Put %d elements.", atomicLong.get());
}
/**
 * Creates a {@code Disruptor} for {@code ProxyMethodInvocation} events using a
 * {@code RingBufferProxyEventFactory}, and installs a {@code FatalExceptionHandler}
 * as its exception handler.
 *
 * @param executor       executor passed to the Disruptor constructor
 * @param ringBufferSize ring buffer size passed to the Disruptor constructor
 * @return the configured Disruptor; this method does not start it
 */
private Disruptor<ProxyMethodInvocation> createDisruptor(final ExecutorService executor, final int ringBufferSize) {
    final RingBufferProxyEventFactory eventFactory = new RingBufferProxyEventFactory();
    final Disruptor<ProxyMethodInvocation> proxyDisruptor =
        new Disruptor<>(eventFactory, ringBufferSize, executor);
    proxyDisruptor.handleExceptionsWith(new FatalExceptionHandler());
    return proxyDisruptor;
}