/**
 * Re-delivers the signals referenced by {@code processor.cancelledSequences}.
 *
 * <p>Each polled sequence points one slot behind the signal to replay. If that
 * slot has no value yet, the barrier is awaited for it; the signal is then passed
 * through {@link #readNextEvent} and routed once to the subscriber, after which
 * the polled sequence is removed from the ring buffer's gating sequences.
 *
 * @param unbounded forwarded to {@code readNextEvent}; when {@code true} no
 *                  pending-request accounting is applied
 * @return {@code false} once the cancelled-sequence queue has been drained;
 *         {@code true} if a replay was aborted by a timeout, interrupt, alert or
 *         cancel, in which case {@code sequence} is removed from the gating
 *         sequences and the polled sequence is put back on the queue
 */
private boolean replay(final boolean unbounded) {
    for (;;) {
        final Sequence cancelled = processor.cancelledSequences.poll();
        if (cancelled == null) {
            // Nothing left to replay.
            return false;
        }

        final MutableSignal<T> pending = processor.ringBuffer.get(cancelled.get() + 1L);
        try {
            if (pending.value == null) {
                // Slot not populated yet: wait for it on the barrier.
                barrier.waitFor(cancelled.get() + 1L);
            }
            readNextEvent(pending, unbounded);
            RingBufferSubscriberUtils.routeOnce(pending, subscriber);
            processor.ringBuffer.removeGatingSequence(cancelled);
        } catch (TimeoutException | InterruptedException | AlertException | CancelException aborted) {
            // Give up: stop gating on our own sequence and requeue the one being replayed.
            processor.ringBuffer.removeGatingSequence(sequence);
            processor.cancelledSequences.add(cancelled);
            return true;
        }
    }
}
/**
 * Waits until {@code dependentSequence} has reached {@code sequence}.
 *
 * <p>While the cursor is behind the requested sequence the caller blocks on
 * {@code processorNotifyCondition} under {@code lock}. Afterwards the dependent
 * sequence is polled, parking for one nanosecond between reads.
 *
 * @param sequence          the sequence to wait for
 * @param cursorSequence    the cursor to block on first
 * @param dependentSequence the sequence whose value is ultimately returned
 * @param barrier           checked for an alert before every wait or park
 * @return the value of {@code dependentSequence}, at least {@code sequence}
 * @throws AlertException       if {@code barrier.checkAlert()} raises it
 * @throws InterruptedException if interrupted while awaiting the condition
 */
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException {
    // Unlocked pre-check avoids taking the lock when the cursor is already far enough.
    if (cursorSequence.get() < sequence) {
        this.lock.lock();
        try {
            while (cursorSequence.get() < sequence) {
                barrier.checkAlert();
                this.processorNotifyCondition.await();
            }
        } finally {
            this.lock.unlock();
        }
    }

    long reached = dependentSequence.get();
    while (reached < sequence) {
        barrier.checkAlert();
        LockSupport.parkNanos(1L);
        reached = dependentSequence.get();
    }
    return reached;
}
/**
 * Removes and returns every event between the consumer cursor (exclusive) and
 * the sequence reported as available by {@link #getAvailableConsumeCursor()}
 * (inclusive), then advances the consumer cursor to that sequence.
 *
 * <p>Each slot's payload is cleared after it is copied into the batch.
 *
 * <p>A barrier may report a sequence lower than the one requested (for example
 * when the wait strategy returns early). In that case nothing is consumable: an
 * empty batch is returned and the consumer cursor is left where it was, rather
 * than sizing a list with a negative capacity or moving the cursor backwards.
 *
 * @return the consumed events in sequence order; empty when nothing is available
 * @throws AlertException       propagated from the barrier wait
 * @throws InterruptedException propagated from the barrier wait
 * @throws TimeoutException     propagated from the barrier wait
 * @throws RuntimeException     wrapping any exception raised while reading a slot
 */
private synchronized List<Object> getConsumeBatch() throws AlertException, InterruptedException, TimeoutException {
    long endCursor = getAvailableConsumeCursor();
    long currCursor = _consumer.get();

    // The barrier returned without making progress: nothing to hand out.
    if (endCursor <= currCursor) {
        return new ArrayList<>();
    }

    long eventNumber = endCursor - currCursor;
    List<Object> batch = new ArrayList<>((int) eventNumber);
    for (long curr = currCursor + 1; curr <= endCursor; curr++) {
        try {
            MutableObject mo = _buffer.get(curr);
            Object o = mo.o;
            // Drop the slot's reference so the ring buffer does not retain the payload.
            mo.setObject(null);
            batch.add(o);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }
    _consumer.set(endCursor);
    return batch;
}
public long waitFor( long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier ) throws AlertException, InterruptedException, TimeoutException { long availableSequence; if ((availableSequence = cursor.get()) < sequence) { flush(); synchronized (lock) { ++numWaiters; while ((availableSequence = cursor.get()) < sequence) { if (state == State.STOPPED) { disruptor.halt(); throw AlertException.INSTANCE; } barrier.checkAlert(); //*/ lock.wait(); /*/ Thread.sleep(1); //*/ } --numWaiters; } } while ((availableSequence = dependentSequence.get()) < sequence) { barrier.checkAlert(); } return availableSequence; }
private void readNextEvent(MutableSignal<T> event, final boolean unbounded) throws AlertException { //if event is Next Signal we need to handle backpressure (pendingRequests) if (event.type == MutableSignal.Type.NEXT) { if (event.value == null) { return; } //if bounded and out of capacity if (!unbounded && pendingRequest.addAndGet(-1l) < 0l) { //re-add the retained capacity pendingRequest.incrementAndGet(); //if current sequence does not yet match the published one //if (nextSequence < cachedAvailableSequence) { //pause until request while (pendingRequest.addAndGet(-1l) < 0l) { pendingRequest.incrementAndGet(); if (!running.get()) throw CancelException.INSTANCE; //Todo Use WaitStrategy? LockSupport.parkNanos(1l); } } } else if (event.type != null) { //Complete or Error are terminal events, we shutdown the processor and process the signal running.set(false); RingBufferSubscriberUtils.route(event, subscriber); Subscription s = processor.upstreamSubscription; if (s != null) { s.cancel(); } throw CancelException.INSTANCE; } }
/**
 * Polls {@code dependentSequence} until it reaches {@code sequence}, parking for
 * {@code parkFor} nanoseconds between reads.
 *
 * @param sequence          the sequence to wait for
 * @param cursor            unused by this implementation
 * @param dependentSequence the sequence that is polled
 * @param barrier           checked for an alert before every park
 * @return the value of {@code dependentSequence}, at least {@code sequence}
 */
@Override
public long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException, TimeoutException {
    long reached = dependentSequence.get();
    while (reached < sequence) {
        barrier.checkAlert();
        LockSupport.parkNanos(parkFor);
        reached = dependentSequence.get();
    }
    return reached;
}
/**
 * Busy-spins on {@code dependentSequence} until it reaches {@code sequence},
 * issuing a spin-loop hint and an alert check on every iteration.
 *
 * @param sequence          the sequence to wait for
 * @param cursor            unused by this implementation
 * @param dependentSequence the sequence that is polled
 * @param barrier           checked for an alert on every iteration
 * @return the value of {@code dependentSequence}, at least {@code sequence}
 */
@Override
public long waitFor(final long sequence,
                    Sequence cursor,
                    final Sequence dependentSequence,
                    final SequenceBarrier barrier) throws AlertException, InterruptedException {
    long reached = dependentSequence.get();
    while (reached < sequence) {
        SpinHint.spinLoopHint();
        barrier.checkAlert();
        reached = dependentSequence.get();
    }
    return reached;
}
/**
 * {@inheritDoc}
 *
 * <p>This implementation never waits: it returns the cursor's current value
 * immediately, which may be lower than {@code sequence}. The dependent sequence
 * and the barrier are not consulted.
 */
@Override
public long waitFor(final long sequence,
                    final Sequence cursor,
                    final Sequence dependentSequence,
                    final SequenceBarrier barrier)
        throws AlertException, InterruptedException, TimeoutException {
    final long current = cursor.get();
    return current;
}
/**
 * Fetches the currently consumable batch and feeds it to {@code handler} one
 * event at a time.
 *
 * <p>The handler is always called with sequence {@code 0}; the end-of-batch flag
 * is {@code true} only for the final event of the batch.
 *
 * @param handler receives each event of the batch in order
 * @throws AlertException       propagated from fetching the batch
 * @throws InterruptedException propagated from fetching the batch
 * @throws TimeoutException     propagated from fetching the batch
 * @throws RuntimeException     wrapping any exception raised while handling an
 *                              event (it is logged first)
 */
public void asyncConsumeBatchToCursor(EventHandler<Object> handler)
        throws AlertException, InterruptedException, TimeoutException {
    final List<Object> events = getConsumeBatch();
    if (events == null) {
        return;
    }

    final int lastIndex = events.size() - 1;
    for (int index = 0; index <= lastIndex; index++) {
        final boolean endOfBatch = (index == lastIndex);
        try {
            handler.onEvent(events.get(index), 0, endOfBatch);
        } catch (Exception failure) {
            LOG.error(failure.getMessage(), failure);
            throw new RuntimeException(failure);
        }
    }
}
public static <T> boolean waitRequestOrTerminalEvent( Sequence pendingRequest, RingBuffer<MutableSignal<T>> ringBuffer, SequenceBarrier barrier, Subscriber<? super T> subscriber, AtomicBoolean isRunning ) { final long waitedSequence = ringBuffer.getCursor() + 1L; try { MutableSignal<T> event = null; while (pendingRequest.get() < 0l) { //pause until first request if (event == null) { barrier.waitFor(waitedSequence); event = ringBuffer.get(waitedSequence); if (event.type == MutableSignal.Type.COMPLETE) { try { subscriber.onComplete(); return false; } catch (Throwable t) { Exceptions.throwIfFatal(t); subscriber.onError(t); return false; } } else if (event.type == MutableSignal.Type.ERROR) { subscriber.onError(event.error); return false; } } else { barrier.checkAlert(); } LockSupport.parkNanos(1l); } } catch (TimeoutException te) { //ignore } catch (AlertException ae) { if (!isRunning.get()) { return false; } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } return true; }
/**
 * Forwards the wait to {@code currentStrategy} with the arguments unchanged and
 * returns whatever it reports.
 *
 * @param sequence          the sequence to wait for
 * @param cursor            passed through to the delegate
 * @param dependentSequence passed through to the delegate
 * @param barrier           passed through to the delegate
 * @return the sequence returned by the delegate
 */
@Override
public long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException, TimeoutException {
    final long available = currentStrategy.waitFor(sequence, cursor, dependentSequence, barrier);
    return available;
}
@SuppressWarnings("unchecked") public static void main(String[] args) throws AlertException, InterruptedException, TimeoutException { // Executor that will be used to construct new threads for consumers ExecutorService executor = Executors.newCachedThreadPool(); try { // The factory for the event EventFactory<Message> factory = new MessageFactory(); // Specify the size of the ring buffer, must be power of 2 int bufferSize = 1024; // Construct the Disruptor with a SingleProducerSequencer Disruptor<Message> disruptor = new Disruptor<Message>( factory, bufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy()); try { // Connect the handlers disruptor .handleEventsWith(new MessageConsumer1()) .then(new MessageConsumer2()); // Start the Disruptor, starts all threads running disruptor.start(); // Get the ring buffer from the Disruptor to be used for publishing RingBuffer<Message> ringBuffer = disruptor.getRingBuffer(); MessageProducer producer = new MessageProducer(ringBuffer); for (int i = 0; i < 1000; i++) { producer.onData("content_" + i); } } finally { disruptor.shutdown(); } } finally { executor.shutdown(); } }
/**
 * Asks the barrier for the sequence directly after the consumer cursor.
 *
 * @return the sequence reported by {@code _barrier.waitFor}
 * @throws AlertException       propagated from the barrier
 * @throws InterruptedException propagated from the barrier
 * @throws TimeoutException     propagated from the barrier
 */
private long getAvailableConsumeCursor() throws AlertException, InterruptedException, TimeoutException {
    // The first unconsumed slot is one past the consumer cursor.
    return _barrier.waitFor(_consumer.get() + 1);
}
@Override public List<Object> retreiveAvailableBatch() throws AlertException, InterruptedException, TimeoutException { // get all events in disruptor queue return getConsumeBatch(); }