/**
 * Creates the store and starts its disruptor-backed persistence pipeline.
 *
 * <p>Every published {@code DbOperation} passes through three stages in order: a
 * {@code DbEventMatcher}, a {@code DbWriter} bound to the given DAO, and a final
 * stage that calls {@code clear()} on the event.
 *
 * @param messageDao     DAO handed to the {@code DbWriter} stage and kept by this store
 * @param bufferSize     ring-buffer size; also passed to the {@code DbEventMatcher}
 * @param maxDbBatchSize batch limit passed to the {@code DbWriter}
 */
@SuppressWarnings("unchecked")
public SharedMessageStore(MessageDao messageDao, int bufferSize, int maxDbBatchSize) {
    pendingMessages = new ConcurrentHashMap<>();

    ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
    threadFactoryBuilder.setNameFormat("DisruptorMessageStoreThread-%d");
    ThreadFactory storeThreads = threadFactoryBuilder.build();

    disruptor = new Disruptor<>(
            DbOperation.getFactory(),
            bufferSize,
            storeThreads,
            ProducerType.MULTI,
            new SleepingWaitStrategy());
    disruptor.setDefaultExceptionHandler(new LogExceptionHandler());

    // Last stage: reset the slot once the earlier stages are done with it.
    EventHandler<DbOperation> releaseSlot = (operation, seq, batchEnd) -> operation.clear();

    disruptor.handleEventsWith(new DbEventMatcher(bufferSize))
            .then(new DbWriter(messageDao, maxDbBatchSize))
            .then(releaseSlot);
    disruptor.start();

    this.messageDao = messageDao;
}
/**
 * Chooses the disruptor wait strategy from the {@code AsyncLogger.WaitStrategy}
 * system property.
 *
 * <p>{@code "Yield"} and {@code "Block"} select the yielding and blocking strategies.
 * {@code "Sleep"}, an unset property, and any other value select the sleeping strategy.
 *
 * @return a new wait strategy instance, never {@code null}
 */
private static WaitStrategy createWaitStrategy() {
    final String configured = System.getProperty("AsyncLogger.WaitStrategy");
    LOGGER.debug("property AsyncLogger.WaitStrategy={}", configured);

    if ("Yield".equals(configured)) {
        LOGGER.debug("disruptor event handler uses YieldingWaitStrategy");
        return new YieldingWaitStrategy();
    }
    if ("Block".equals(configured)) {
        LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
        return new BlockingWaitStrategy();
    }

    // "Sleep" and the fallback are the same outcome, so they share one path.
    LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
    return new SleepingWaitStrategy();
}
/**
 * Builds and starts the disruptor that backs this event queue, then caches its
 * ring buffer for publishing.
 */
@SuppressWarnings({"deprecation", "unchecked", "varargs"})
private DisruptorEventQueue() {
    DaemonThreadFactory workerFactory = new DaemonThreadFactory("OpenCensus.Disruptor");

    // Processing is deliberately confined to a single thread so that the event
    // handler can take unsynchronized actions whenever possible.
    disruptor =
        new Disruptor<DisruptorEvent>(
            new DisruptorEventFactory(),
            DISRUPTOR_BUFFER_SIZE,
            Executors.newSingleThreadExecutor(workerFactory),
            ProducerType.MULTI,
            new SleepingWaitStrategy());

    disruptor.handleEventsWith(new DisruptorEventHandler());
    disruptor.start();

    ringBuffer = disruptor.getRingBuffer();
}
/**
 * Wires up the HTTP request record pipeline: in-memory storage, event statistics,
 * the consuming event handler, and a 1024-slot multi-producer disruptor that feeds it.
 *
 * <p>The disruptor is configured here but not started by this constructor.
 */
@SuppressWarnings("unchecked")
public HttpRequestRecordManager() {
    // Consumer side: storage and statistics shared with the event handler.
    recordStorage = new HttpRequestRecordMemoryStorage();
    eventStat = new HttpRequestRecordEventStat();
    consumer = new HttpRequestRecordEventHandler(recordStorage, eventStat);

    // Producer side: ring buffer that hands events to the consumer.
    disruptor = new Disruptor<>(
            new HttpRequestRecordEventFactory(),
            1024,
            Executors.defaultThreadFactory(),
            ProducerType.MULTI,
            new SleepingWaitStrategy());
    disruptor.handleEventsWith(consumer);

    logEntryManager = new LogEntryManager();
}
/**
 * Wires up the log entry pipeline: in-memory storage, event statistics, the
 * consuming event handler, and a 1024-slot multi-producer disruptor that feeds it.
 *
 * <p>The disruptor is configured here but not started by this constructor.
 */
@SuppressWarnings("unchecked")
public LogEntryManager() {
    // Consumer side: storage and statistics shared with the event handler.
    logEntryStorage = new LogEntryMemoryStorage();
    logEntryEventStat = new LogEntryEventStat();
    consumer = new LogEntryEventHandler(logEntryStorage, logEntryEventStat);

    // Producer side: ring buffer that hands events to the consumer.
    disruptor = new Disruptor<>(
            new LogEntryEventFactory(),
            1024,
            Executors.defaultThreadFactory(),
            ProducerType.MULTI,
            new SleepingWaitStrategy());
    disruptor.handleEventsWith(consumer);
}
public static void main(String[] args) { final List<ValueEventHandler1PMC> handlers = new ArrayList<>(NUMBER_CONSUMERS); RingBuffer<ValueEvent> ringBuffer = RingBuffer.createSingleProducer( ValueEvent.EVENT_FACTORY, RING_SIZE, new SleepingWaitStrategy()); start = System.nanoTime(); //Create consumers for(int i = 0; i < NUMBER_CONSUMERS; i++) { ValueEventHandler1PMC handler = new ValueEventHandler1PMC(start, handlers); handlers.add(handler); SequenceBarrier barrier = ringBuffer.newBarrier(); BatchEventProcessor<ValueEvent> eventProcessor = new BatchEventProcessor<ValueEvent>( ringBuffer, barrier, handler); ringBuffer.addGatingSequences(eventProcessor.getSequence()); // Each EventProcessor can run on a separate thread EXECUTOR.submit(eventProcessor); } for(int i = 0; i < SAMPLES_SIZE; i++) { // Publishers claim events in sequence long sequence = ringBuffer.next(); ValueEvent event = ringBuffer.get(sequence); event.setValue(i); // this could be more complex with multiple fields // make the event available to EventProcessors ringBuffer.publish(sequence); } }
/**
 * Checks that each {@code WaitStrategyType} constant hands out an instance of the
 * matching Disruptor wait strategy class.
 */
@Test
public void test_All_WaitStrategies() {
    assertTrue(BlockingWaitStrategy.class.isInstance(WaitStrategyType.BLOCKING.instance()));
    assertTrue(BusySpinWaitStrategy.class.isInstance(WaitStrategyType.BUSY_SPIN.instance()));
    assertTrue(LiteBlockingWaitStrategy.class.isInstance(WaitStrategyType.LITE_BLOCKING.instance()));
    assertTrue(SleepingWaitStrategy.class.isInstance(WaitStrategyType.SLEEPING_WAIT.instance()));
    assertTrue(YieldingWaitStrategy.class.isInstance(WaitStrategyType.YIELDING.instance()));
}
/**
 * Creates a wait strategy from its case-insensitive type name.
 *
 * <p>Recognised names are {@code BusySpin}, {@code Blocking}, {@code Yielding}
 * and {@code Sleeping}.
 *
 * @param name the wait strategy type name
 * @return a new wait strategy of the requested type
 * @throws IllegalArgumentException if {@code name} is {@code null} or not one of
 *         the recognised names
 */
public static WaitStrategy createFromType(String name) {
    if ("BusySpin".equalsIgnoreCase(name)) {
        return new BusySpinWaitStrategy();
    }
    if ("Blocking".equalsIgnoreCase(name)) {
        return new BlockingWaitStrategy();
    }
    if ("Yielding".equalsIgnoreCase(name)) {
        return new YieldingWaitStrategy();
    }
    if ("Sleeping".equalsIgnoreCase(name)) {
        return new SleepingWaitStrategy();
    }
    throw new IllegalArgumentException("Invalid or unsupported wait strategy type '" + name + "'");
}
/**
 * Starts asynchronous event delivery for this stream when it is required.
 *
 * <p>Nothing happens when there are no receivers. Otherwise the stream definition's
 * config/async annotation element is consulted: a disruptor is created when async is
 * explicitly enabled, or when the element is absent and more than one publisher is
 * registered. The producer type is {@code MULTI} for more than one publisher and
 * {@code SINGLE} otherwise. Each receiver is attached through its own
 * {@code StreamHandler}, and the started ring buffer is stored.
 *
 * @throws QueryCreationException if the annotation is declared more than once on the
 *         stream definition; the original {@code DuplicateAnnotationException} is
 *         attached as the cause
 */
public synchronized void startProcessing() {
    if (receivers.isEmpty()) {
        return;
    }

    // null means "not specified", which is distinct from an explicit false.
    Boolean asyncEnabled = null;
    try {
        Element element = AnnotationHelper.getAnnotationElement(
                SiddhiConstants.ANNOTATION_CONFIG,
                SiddhiConstants.ANNOTATION_ELEMENT_ASYNC,
                streamDefinition.getAnnotations());
        if (element != null) {
            asyncEnabled = SiddhiConstants.TRUE.equalsIgnoreCase(element.getValue());
        }
    } catch (DuplicateAnnotationException e) {
        QueryCreationException failure = new QueryCreationException(
                e.getMessage() + " for the same Stream Definition " + streamDefinition.toString());
        // Keep the original exception and its stack trace; initCause is used because
        // only the message-only constructor is known to exist from this code.
        failure.initCause(e);
        throw failure;
    }

    boolean multiplePublishers = publishers.size() > 1;
    boolean useAsync = (asyncEnabled != null) ? asyncEnabled : multiplePublishers;
    if (!useAsync) {
        return;
    }

    ProducerType producerType = multiplePublishers ? ProducerType.MULTI : ProducerType.SINGLE;
    disruptor = new Disruptor<Event>(
            new SiddhiEventFactory(streamDefinition.getAttributeList().size()),
            bufferSize,
            executorService,
            producerType,
            new SleepingWaitStrategy());
    for (Receiver receiver : receivers) {
        disruptor.handleEventsWith(new StreamHandler(receiver));
    }
    ringBuffer = disruptor.start();
}
/**
 * Starts asynchronous callback delivery for this query unless it is explicitly disabled.
 *
 * <p>The query's config/callback-async annotation element is consulted. When the
 * element is absent, or its value equals {@code SiddhiConstants.TRUE} ignoring case,
 * a single-producer disruptor is created with an {@code AsyncEventHandler} bound to
 * this instance, and the started ring buffer is stored. Any other value leaves
 * processing unstarted.
 *
 * @throws QueryCreationException if the annotation is declared more than once on the
 *         query; the original {@code DuplicateAnnotationException} is attached as
 *         the cause
 */
public synchronized void startProcessing() {
    // null means "not specified", which is treated as enabled below.
    Boolean asyncEnabled = null;
    try {
        Element element = AnnotationHelper.getAnnotationElement(
                SiddhiConstants.ANNOTATION_CONFIG,
                SiddhiConstants.ANNOTATION_ELEMENT_CALLBACK_ASYNC,
                query.getAnnotations());
        if (element != null) {
            asyncEnabled = SiddhiConstants.TRUE.equalsIgnoreCase(element.getValue());
        }
    } catch (DuplicateAnnotationException e) {
        QueryCreationException failure = new QueryCreationException(
                e.getMessage() + " for the same Query " + query.toString());
        // Keep the original exception and its stack trace; initCause is used because
        // only the message-only constructor is known to exist from this code.
        failure.initCause(e);
        throw failure;
    }

    // Only an explicit "false" turns async delivery off.
    if (asyncEnabled != null && !asyncEnabled) {
        return;
    }

    disruptor = new Disruptor<EventHolder>(
            new EventHolderFactory(),
            siddhiContext.getDefaultEventBufferSize(),
            siddhiContext.getExecutorService(),
            ProducerType.SINGLE,
            new SleepingWaitStrategy());
    asyncEventHandler = new AsyncEventHandler(this);
    disruptor.handleEventsWith(asyncEventHandler);
    ringBuffer = disruptor.start();
}
/**
 * Chooses the disruptor wait strategy from the {@code AsyncLoggerConfig.WaitStrategy}
 * system property.
 *
 * <p>{@code "Yield"} and {@code "Block"} select the yielding and blocking strategies.
 * {@code "Sleep"}, an unset property, and any other value select the sleeping strategy.
 *
 * @return a new wait strategy instance, never {@code null}
 */
private static WaitStrategy createWaitStrategy() {
    final String configured = System.getProperty("AsyncLoggerConfig.WaitStrategy");
    LOGGER.debug("property AsyncLoggerConfig.WaitStrategy={}", configured);

    if ("Yield".equals(configured)) {
        return new YieldingWaitStrategy();
    }
    if ("Block".equals(configured)) {
        return new BlockingWaitStrategy();
    }
    // "Sleep" and the fallback are the same outcome, so they share one path.
    return new SleepingWaitStrategy();
}
/**
 * Creates the worker's disruptor with a multi-threaded claim strategy of
 * {@code RING_SIZE} slots and a sleeping wait strategy, attaches a single
 * {@code QueuedMapEventCollectionHandler}, and starts it.
 */
@SuppressWarnings("unchecked")
public DisruptorWorker() {
    disruptor = new Disruptor<QueuedMapEventCollection>(
            new QueuedMapEventCollectionFactory(),
            EXECUTOR,
            new MultiThreadedClaimStrategy(RING_SIZE),
            new SleepingWaitStrategy());

    QueuedMapEventCollectionHandler eventHandler = new QueuedMapEventCollectionHandler();
    disruptor.handleEventsWith(eventHandler);

    // start() hands back the ring buffer that publishers write into.
    ringBuffer = disruptor.start();
}
/**
 * Resets the disruptor settings to their defaults: a sleeping wait strategy,
 * multi-producer mode, and a ring buffer size of 2048.
 */
private void useSensibleDefaults() {
    waitStrategy = new SleepingWaitStrategy();
    producerType = ProducerType.MULTI;
    ringBufferSize = 2048;
}
/**
 * Verifies that a {@code Disruptor} was constructed with the expected default arguments,
 * in this order: any {@code AvroEventFactory}, the value 2048, any {@code ExecutorService},
 * {@code ProducerType.MULTI}, and an instance of {@code SleepingWaitStrategy}.
 *
 * NOTE(review): 2048 is presumably the ring buffer size, and verifyNew/withArguments look
 * like PowerMock's constructor verification. Confirm both against the test class's imports
 * and the Disruptor constructor being mocked.
 *
 * @throws Exception declared because the verification call chain may throw
 */
private void verifyDisruptorDefaults() throws Exception { verifyNew(Disruptor.class).withArguments(any(AvroEventFactory.class), eq(2048), any(ExecutorService.class), eq(ProducerType.MULTI), isA(SleepingWaitStrategy.class)); }
public static void main(String[] args) { RingBuffer<ValueEvent> ringBuffer = RingBuffer.createSingleProducer( ValueEvent.EVENT_FACTORY, RING_SIZE, new SleepingWaitStrategy()); start = System.nanoTime(); //Create first consumer ValueEventHandler1PMCSequenceFirst handler = new ValueEventHandler1PMCSequenceFirst(start); SequenceBarrier barrier; barrier = ringBuffer.newBarrier(); BatchEventProcessor<ValueEvent> firstEventProcessor = new BatchEventProcessor<ValueEvent>( ringBuffer, barrier, handler); // ringBuffer.addGatingSequences(firstEventProcessor.getSequence()); // Each EventProcessor can run on a separate thread EXECUTOR.submit(firstEventProcessor); //Create second consumer ValueEventHandler1PMCSequenceSecond handler2 = new ValueEventHandler1PMCSequenceSecond(start); SequenceBarrier barrier2 = ringBuffer.newBarrier(firstEventProcessor.getSequence()); BatchEventProcessor<ValueEvent> secondEventProcessor = new BatchEventProcessor<ValueEvent>( ringBuffer, barrier2, handler2); ringBuffer.addGatingSequences(secondEventProcessor.getSequence()); // Each EventProcessor can run on a separate thread EXECUTOR.submit(secondEventProcessor); for(int i = 0; i < SAMPLES_SIZE; i++) { // Publishers claim events in sequence long sequence = ringBuffer.next(); ValueEvent event = ringBuffer.get(sequence); event.setValue(i); // this could be more complex with multiple fields // make the event available to EventProcessors ringBuffer.publish(sequence); } }
/**
 * Creates the mailbox queue: a {@code DisruptorMessageQueue} of the configured
 * buffer size using a sleeping wait strategy.
 *
 * @param owner  the owning actor, if any (not used)
 * @param system the actor system, if any (not used)
 * @return a new message queue instance
 */
@Override
public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
    final SleepingWaitStrategy waitStrategy = new SleepingWaitStrategy();
    final DisruptorMessageQueue queue = new DisruptorMessageQueue(bufferSize, waitStrategy);
    return queue;
}