private boolean replay(final boolean unbounded) { Sequence replayedSequence; MutableSignal<T> signal; while ((replayedSequence = processor.cancelledSequences.poll()) != null) { signal = processor.ringBuffer.get(replayedSequence.get() + 1L); try { if (signal.value == null) { barrier.waitFor(replayedSequence.get() + 1L); } readNextEvent(signal, unbounded); RingBufferSubscriberUtils.routeOnce(signal, subscriber); processor.ringBuffer.removeGatingSequence(replayedSequence); } catch (TimeoutException | InterruptedException | AlertException | CancelException ce) { processor.ringBuffer.removeGatingSequence(sequence); processor.cancelledSequences.add(replayedSequence); return true; } } return false; }
@Override protected void doShutdown() throws IOException { // Shutdown the disruptor. Will stop after all entries have been processed. Make sure we // have stopped incoming appends before calling this else it will not shutdown. We are // conservative below waiting a long time and if not elapsed, then halting. if (this.disruptor != null) { long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000); try { this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt " + "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)"); this.disruptor.halt(); this.disruptor.shutdown(); } } if (LOG.isDebugEnabled()) { LOG.debug("Closing WAL writer in " + FSUtils.getPath(walDir)); } if (this.writer != null) { this.writer.close(); this.writer = null; } }
private synchronized List<Object> getConsumeBatch() throws AlertException, InterruptedException, TimeoutException { long endCursor = getAvailableConsumeCursor(); long currCursor = _consumer.get(); long eventNumber = endCursor - currCursor; List<Object> batch = new ArrayList<>((int) eventNumber); for (long curr = currCursor + 1; curr <= endCursor; curr++) { try { MutableObject mo = _buffer.get(curr); Object o = mo.o; mo.setObject(null); batch.add(o); } catch (Exception e) { LOG.error(e.getMessage(), e); throw new RuntimeException(e); } } _consumer.set(endCursor); return batch; }
@Override public void close() throws Exception { isAlive = false; ringBuffer.tryPublishEvent((container, sequence) -> container.clear()); try { disruptor.shutdown(5, TimeUnit.SECONDS); } catch (TimeoutException e) { logger.warn("Disruptor shutdown timeout....", e); disruptor.halt(); } }
/** * <p>Waits until all events currently in the disruptor have been processed by all event processors * and then halts the processors.</p> * <p> * <p>This method will not shutdown the executor, nor will it await the final termination of the * processor threads.</p> * * @param timeout the amount of time to wait for all events to be processed. <code>-1</code> will give an infinite timeout * @param timeUnit the unit the timeOut is specified in */ public void shutdown(final long timeout, final TimeUnit timeUnit) throws TimeoutException { final long timeOutAt = System.currentTimeMillis() + timeUnit.toMillis(timeout); while (hasBacklog()) { if (timeout >= 0 && System.currentTimeMillis() > timeOutAt) { throw TimeoutException.INSTANCE; } // Busy spin } halt(); }
@Test(expected = TimeoutException.class, timeout = 2000) public void shouldThrowTimeoutExceptionIfShutdownDoesNotCompleteNormally() throws Exception { //Given final DelayedEventHandler delayedEventHandler = createDelayedEventHandler(); disruptor.handleEventsWith(delayedEventHandler); publishEvent(); //When disruptor.shutdown(1, SECONDS); //Then }
public long waitFor( long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier ) throws AlertException, InterruptedException, TimeoutException { long availableSequence; if ((availableSequence = cursor.get()) < sequence) { flush(); synchronized (lock) { ++numWaiters; while ((availableSequence = cursor.get()) < sequence) { if (state == State.STOPPED) { disruptor.halt(); throw AlertException.INSTANCE; } barrier.checkAlert(); //*/ lock.wait(); /*/ Thread.sleep(1); //*/ } --numWaiters; } } while ((availableSequence = dependentSequence.get()) < sequence) { barrier.checkAlert(); } return availableSequence; }
@Override public long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException, TimeoutException { long availableSequence; while ((availableSequence = dependentSequence.get()) < sequence) { barrier.checkAlert(); LockSupport.parkNanos(parkFor); } return availableSequence; }
@Override public void awaitAndShutdown(long time) { try { LOG.debug("Disruptor {} is going to shutdown in {} {}", getThreadName(), time, TimeUnit.SECONDS); disruptor.shutdown(time, TimeUnit.SECONDS); LOG.info("Disruptor {} has shutdown after {} {}.", getThreadName(), time, TimeUnit.SECONDS); } catch (TimeoutException e) { LOG.error(e.getMessage(),e); } }
@Test public void test_awaitAndShutdown() throws TimeoutException, InterruptedException { mockDisruptor.shutdown(1, TimeUnit.SECONDS); replay(mockDisruptor); disruptorLifecycleManager.awaitAndShutdown(1); verify(mockDisruptor); }
@Test public void test_awaitAndShutdown_InterruptedException() throws TimeoutException, InterruptedException { mockDisruptor.shutdown(1, TimeUnit.SECONDS); replay(mockDisruptor); disruptorLifecycleManager.awaitAndShutdown(1); verify(mockDisruptor); }
@Test public void test_awaitAndShutdown_TimeoutException() throws TimeoutException, InterruptedException { mockDisruptor.shutdown(1, TimeUnit.SECONDS); expectLastCall().andThrow(TimeoutException.INSTANCE); replay(mockDisruptor); disruptorLifecycleManager.awaitAndShutdown(1); verify(mockDisruptor); }
/** * Waits until all events currently in the disruptor have been processed by all event processors * and then halts the processors. It is critical that publishing to the ring buffer has stopped * before calling this method, otherwise it may never return. */ public void stop() { try { disruptor.shutdown(OUTBOUND_DISRUPTOR_SHUTDOWN_WAIT_TIME, TimeUnit.SECONDS); } catch (TimeoutException e) { log.error("Outbound disruptor did not shut down properly."); } }
/** * {@inheritDoc} */ @Override public long waitFor(final long sequence, final Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException, TimeoutException { return cursor.get(); }
public void shutdownGracefully() throws IllegalStateException { if (disruptor == null) { throw new IllegalStateException("disruptor == null, call init"); } for (ExchangeEventProducer<T> producer : producers) { producer.stop(); } try { disruptor.shutdown(shutdownTimeout, TimeUnit.MINUTES); } catch (TimeoutException ex) { LOG.error(ex.getMessage()); } }
/** * Waits until all events currently in the disruptor have been processed by all event processors * and then halts the processors. It is critical that publishing to the ring buffer has stopped * before calling this method, otherwise it may never return. * * <p>This method will not shutdown the executor, nor will it await the final termination of the * processor threads.</p> */ public void shutdown() { try { shutdown(-1, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { exceptionHandler.handleOnShutdownException(e); } }
/** * Waits until all events currently in the disruptor have been processed by all event processors * and then halts the processors. * <p/> * <p>This method will not shutdown the executor, nor will it await the final termination of the * processor threads.</p> * * @param timeout the amount of time to wait for all events to be processed. <code>-1</code> will give an infinite timeout * @param timeUnit the unit the timeOut is specified in */ public void shutdown(final long timeout, final TimeUnit timeUnit) throws TimeoutException { long timeOutAt = System.currentTimeMillis() + timeUnit.toMillis(timeout); while (hasBacklog()) { if (timeout >= 0 && System.currentTimeMillis() > timeOutAt) { throw TimeoutException.INSTANCE; } // Busy spin } halt(); }
public void asyncConsumeBatchToCursor(EventHandler<Object> handler) throws AlertException, InterruptedException, TimeoutException { List<Object> batch = getConsumeBatch(); if (batch == null) return; for (int i = 0; i < batch.size(); i++) { try { handler.onEvent(batch.get(i), 0, i == (batch.size() - 1)); } catch (Exception e) { LOG.error(e.getMessage(), e); throw new RuntimeException(e); } } }
public static <T> boolean waitRequestOrTerminalEvent( Sequence pendingRequest, RingBuffer<MutableSignal<T>> ringBuffer, SequenceBarrier barrier, Subscriber<? super T> subscriber, AtomicBoolean isRunning ) { final long waitedSequence = ringBuffer.getCursor() + 1L; try { MutableSignal<T> event = null; while (pendingRequest.get() < 0l) { //pause until first request if (event == null) { barrier.waitFor(waitedSequence); event = ringBuffer.get(waitedSequence); if (event.type == MutableSignal.Type.COMPLETE) { try { subscriber.onComplete(); return false; } catch (Throwable t) { Exceptions.throwIfFatal(t); subscriber.onError(t); return false; } } else if (event.type == MutableSignal.Type.ERROR) { subscriber.onError(event.error); return false; } } else { barrier.checkAlert(); } LockSupport.parkNanos(1l); } } catch (TimeoutException te) { //ignore } catch (AlertException ae) { if (!isRunning.get()) { return false; } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } return true; }
@Override public long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException, TimeoutException { return currentStrategy.waitFor(sequence, cursor, dependentSequence, barrier); }
public static void main(final String[] args) throws Exception { final CommandLineArgs commandLineArgs = new CommandLineArgs(); new JCommander(commandLineArgs).parse(args); final Disruptor<Packet> packetDisruptor = new Disruptor<>(new Packet.Factory(commandLineArgs.getRecordLength()), commandLineArgs.getBufferSize(), newCachedThreadPool(DAEMON_THREAD_FACTORY), ProducerType.SINGLE, new SpinLoopHintBusySpinWaitStrategy()); final Overrides overrides = new Overrides(commandLineArgs); overrides.init(); final Journaller journaller = new Journaller(SYSTEM_NANO_TIMER, commandLineArgs, overrides.enableJournaller()); journaller.init(); final Histogram[] messageTransitTimeHistograms = new Histogram[commandLineArgs.getNumberOfIterations()]; setAll(messageTransitTimeHistograms, HISTOGRAMS::createHistogramForArray); final Histogram[] interMessageTimeHistograms = new Histogram[commandLineArgs.getNumberOfIterations()]; setAll(interMessageTimeHistograms, HISTOGRAMS::createHistogramForArray); packetDisruptor.handleEventsWith( runOnCpus(wrap(new Accumulator(messageTransitTimeHistograms, interMessageTimeHistograms, SYSTEM_NANO_TIMER, commandLineArgs)::process), "Accumulator", overrides.getAccumulatorThreadAffinity()), runOnCpus(wrap(journaller::process), "Journaller", overrides.getJournallerThreadAffinity())); packetDisruptor.start(); final InputReader inputReader = new InputReader(packetDisruptor.getRingBuffer(), SYSTEM_NANO_TIMER, commandLineArgs); if(commandLineArgs.runSpinners()) { System.out.println("Starting spinner threads to perturb the system"); Spinners.SPINNERS.start(); } System.out.println("Starting replay at " + new Date()); final Thread thread = DAEMON_THREAD_FACTORY.newThread(THREADS.runOnCpu(inputReader::processFiles, overrides.getProducerThreadAffinity())); thread.start(); try { thread.join(); System.out.println("Finished replay at " + new Date()); packetDisruptor.shutdown(1, TimeUnit.MINUTES); } catch (TimeoutException e) { throw new RuntimeException("Consumers did not process remaining events within timeout", e); } finally { Spinners.SPINNERS.stop(); packetDisruptor.halt(); } System.out.println("Pausing for 10 seconds..."); THREADS.sleep(10L, TimeUnit.SECONDS); }
@SuppressWarnings("unchecked") public static void main(String[] args) throws AlertException, InterruptedException, TimeoutException { // Executor that will be used to construct new threads for consumers ExecutorService executor = Executors.newCachedThreadPool(); try { // The factory for the event EventFactory<Message> factory = new MessageFactory(); // Specify the size of the ring buffer, must be power of 2 int bufferSize = 1024; // Construct the Disruptor with a SingleProducerSequencer Disruptor<Message> disruptor = new Disruptor<Message>( factory, bufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy()); try { // Connect the handlers disruptor .handleEventsWith(new MessageConsumer1()) .then(new MessageConsumer2()); // Start the Disruptor, starts all threads running disruptor.start(); // Get the ring buffer from the Disruptor to be used for publishing RingBuffer<Message> ringBuffer = disruptor.getRingBuffer(); MessageProducer producer = new MessageProducer(ringBuffer); for (int i = 0; i < 1000; i++) { producer.onData("content_" + i); } } finally { disruptor.shutdown(); } } finally { executor.shutdown(); } }
private long getAvailableConsumeCursor() throws AlertException, InterruptedException, TimeoutException { final long nextSequence = _consumer.get() + 1; return _barrier.waitFor(nextSequence); }
@Override public List<Object> retreiveAvailableBatch() throws AlertException, InterruptedException, TimeoutException { // get all events in disruptor queue return getConsumeBatch(); }
/** * Waits until all events currently in the disruptor have been processed by all event processors * and then halts the processors. It is critical that publishing to the ring buffer has stopped * before calling this method, otherwise it may never return. * <p> * <p>This method will not shutdown the executor, nor will it await the final termination of the * processor threads.</p> */ public void shutdown() { try { shutdown(-1, TimeUnit.MILLISECONDS); } catch (final TimeoutException e) { exceptionHandler.handleOnShutdownException(e); } }