private void initDisruptor(int processors, int ringBufferSize) { LOG.info("eds client init disruptor with processors=" + processors + " and ringBufferSize=" + ringBufferSize); executor = Executors.newFixedThreadPool( processors, new ThreadFactoryBuilder().setNameFormat("disruptor-executor-%d").build()); final WaitStrategy waitStrategy = createWaitStrategy(); ringBufferSize = sizeFor(ringBufferSize); // power of 2 disruptor = new Disruptor<>(EdsRingBufferEvent.FACTORY, ringBufferSize, executor, ProducerType.MULTI, waitStrategy); EdsEventWorkHandler[] handlers = new EdsEventWorkHandler[processors]; for (int i = 0; i < handlers.length; i++) { handlers[i] = new EdsEventWorkHandler(); } // handlers number = threads number disruptor.handleEventsWithWorkerPool(handlers); // "handleEventsWith" just like topics , with multiple consumers disruptor.start(); }
@SuppressWarnings("unchecked") public SharedMessageStore(MessageDao messageDao, int bufferSize, int maxDbBatchSize) { pendingMessages = new ConcurrentHashMap<>(); ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("DisruptorMessageStoreThread-%d").build(); disruptor = new Disruptor<>(DbOperation.getFactory(), bufferSize, namedThreadFactory, ProducerType.MULTI, new SleepingWaitStrategy()); disruptor.setDefaultExceptionHandler(new LogExceptionHandler()); disruptor.handleEventsWith(new DbEventMatcher(bufferSize)) .then(new DbWriter(messageDao, maxDbBatchSize)) .then((EventHandler<DbOperation>) (event, sequence, endOfBatch) -> event.clear()); disruptor.start(); this.messageDao = messageDao; }
/** * create a disruptor * * @param name The title of the disruptor * @param ringBufferSize The size of ringBuffer */ public void createDisruptor(String name, int ringBufferSize) { if (DISRUPTOR_MAP.keySet().contains(name)) { throw new NulsRuntimeException(ErrorCode.FAILED, "create disruptor faild,the name is repetitive!"); } Disruptor<DisruptorEvent> disruptor = new Disruptor<DisruptorEvent>(EVENT_FACTORY, ringBufferSize, new NulsThreadFactory(ModuleService.getInstance().getModuleId(EventBusModuleBootstrap.class),name), ProducerType.SINGLE, new SleepingWaitStrategy()); disruptor.handleEventsWith(new EventHandler<DisruptorEvent>() { @Override public void onEvent(DisruptorEvent disruptorEvent, long l, boolean b) throws Exception { Log.debug(disruptorEvent.getData() + ""); } }); DISRUPTOR_MAP.put(name, disruptor); }
/** * add the data obj to the disruptor named the field name * * @param name * @param obj */ public void offer(String name, Object obj) { Disruptor<DisruptorEvent> disruptor = DISRUPTOR_MAP.get(name); AssertUtil.canNotEmpty(disruptor, "the disruptor is not exist!name:" + name); RingBuffer<DisruptorEvent> ringBuffer = disruptor.getRingBuffer(); //请求下一个事件序号; long sequence = ringBuffer.next(); try { //获取该序号对应的事件对象; DisruptorEvent event = ringBuffer.get(sequence); event.setData(obj); } finally { //发布事件; ringBuffer.publish(sequence); } }
@Bean public JmsMessageSender responseMessageSender(@Value("${jms-sender.ring-buffer-size}")int ringBufferSize) throws JMSException { DisruptorJmsMessageSender disruptorJmsMessageSender = DisruptorJmsMessageSenderFactory.create( responseSession(), responseMessageProducer(), new ArtemisMessageDtoDupMessageDetectStrategy(), ringBufferSize ); Disruptor disruptor = disruptorJmsMessageSender.getDisruptor(); BeanRegisterUtils.registerSingleton( applicationContext, "responseMessageSenderLifeCycleContainer", new DisruptorLifeCycleContainer("responseMessageSender", disruptor, Ordered.LOWEST_PRECEDENCE) ); return disruptorJmsMessageSender; }
@Setup public void setup() { executor = Executors.newSingleThreadExecutor(); disruptor = new Disruptor<LongEvent>(LongEvent.EVENT_FACTORY, executor, new SingleThreadedClaimStrategy(Run.QUEUE_SIZE), new BusySpinWaitStrategy()); eventCount = new AtomicInteger(); handler = (event, sequence, endOfBatch) -> { if(Run.LONGVAL == event.getValue()) { eventCount.incrementAndGet(); } else { throw new RuntimeException("Failed."); } }; disruptor.handleEventsWith(handler); ringBuffer = disruptor.start(); }
@Setup public void setup() { executor = Executors.newFixedThreadPool(Run.NTHREAD); disruptor = new Disruptor<LongEvent>(LongEvent.EVENT_FACTORY, executor, new MultiThreadedClaimStrategy(Run.QUEUE_SIZE), new BusySpinWaitStrategy()); eventCount = new AtomicInteger(); handler = (event, sequence, endOfBatch) -> { if(Run.LONGVAL == event.getValue()) { eventCount.incrementAndGet(); } else { throw new RuntimeException("Failed."); } }; disruptor.handleEventsWith(handler); ringBuffer = disruptor.start(); }
public static void main(String[] args) throws InterruptedException { Disruptor<ObjectBox> disruptor = new Disruptor<ObjectBox>( ObjectBox.FACTORY, RING_SIZE, Executors.newCachedThreadPool(), ProducerType.MULTI, new BlockingWaitStrategy()); disruptor.handleEventsWith(new Consumer()).then(new Consumer()); final RingBuffer<ObjectBox> ringBuffer = disruptor.getRingBuffer(); Publisher p = new Publisher(); IMessage message = new IMessage(); ITransportable transportable = new ITransportable(); String streamName = "com.lmax.wibble"; System.out.println("publishing " + RING_SIZE + " messages"); for (int i = 0; i < RING_SIZE; i++) { ringBuffer.publishEvent(p, message, transportable, streamName); Thread.sleep(10); } System.out.println("start disruptor"); disruptor.start(); System.out.println("continue publishing"); while (true) { ringBuffer.publishEvent(p, message, transportable, streamName); Thread.sleep(10); } }
@SuppressWarnings("unchecked") private RingBufferService(final Properties config) { config.put(KafkaProducerService.CONFIG_PREFIX + "value.serializer", ByteBufSerde.ByteBufSerializer.class.getName()); config.put(KafkaProducerService.CONFIG_PREFIX + "key.serializer", Serdes.String().serializer()); producer = KafkaProducerService.getInstance(config); bufferSize = ConfigurationHelper.getIntSystemThenEnvProperty(RB_CONF_BUFFER_SIZE, RB_DEFAULT_BUFFER_SIZE, config); targetTopic = ConfigurationHelper.getSystemThenEnvProperty(RB_CONF_TOPIC_NAME, RB_DEFAULT_TOPIC_NAME, config); eventBuffInitSize = ConfigurationHelper.getIntSystemThenEnvProperty(RB_CONF_BUFF_INITIAL_SIZE, RB_DEFAULT_BUFF_INITIAL_SIZE, config); shutdownTimeout = ConfigurationHelper.getIntSystemThenEnvProperty(RB_CONF_STOP_TIMEOUT, RB_DEFAULT_STOP_TIMEOUT, config); waitStrategy = RBWaitStrategy.getConfiguredStrategy(config); disruptor = new Disruptor<ByteBuf>(this, bufferSize, threadFactory, ProducerType.MULTI, waitStrategy); disruptor.handleEventsWith(this); // FIXME: need to able to supply several handlers disruptor.start(); ringBuffer = disruptor.getRingBuffer(); log.info("<<<<< RawRingBufferDispatcher Started."); }
private static void dealWithDisruptorFromTail(ListenerChain chain, Disruptor<ElectronsHolder> disruptor) { if (idOnly(chain.getId(), chain.getAfter())) { return; } List<ListenerChain> befores = chain.getBefores(); if (CollectionUtils.isEmpty(befores)) { return; } for (ListenerChain c : befores) { dealWithDisruptorFromTail(c, disruptor); } ProxyHandler[] handlers = new ProxyHandler[befores.size()]; for (int i = 0; i < befores.size(); i++) { handlers[i] = befores.get(i).getProxyHandler(); } disruptor.after(handlers).handleEventsWith(chain.getProxyHandler()); }
@SuppressWarnings("unchecked") public DisruptorTransfer(final SpanEventHandler spanEventHandler, final int buffSize) { // Executor executor = Executors.newCachedThreadPool(); final ThreadFactory threadFactory = Executors.defaultThreadFactory(); // The factory for the event final SpanEventFactory factory = new SpanEventFactory(); // Specify the size of the ring buffer, must be power of 2. final int bufferSize = buffSize; // Construct the Disruptor disruptor = new Disruptor<SpanEvent>(factory, bufferSize, threadFactory); // Connect the handler // disruptor.handleEventsWith(new // SpanEventHandler("http://localhost:9080/upload")); disruptor.handleEventsWith(spanEventHandler); // Start the Disruptor, starts all threads running disruptor.start(); final RingBuffer<SpanEvent> ringBuffer = disruptor.getRingBuffer(); producer = new SpanEventProducer(ringBuffer); }
private void initialize() { logger.info("Start initialize ots writer, table name: {}.", tableName); DescribeTableRequest request = new DescribeTableRequest(); request.setTableName(tableName); OTSFuture<DescribeTableResult> result = ots.describeTable(request); DescribeTableResult res = result.get(); this.tableMeta = res.getTableMeta(); logger.info("End initialize with table meta: {}.", tableMeta); RowChangeEvent.RowChangeEventFactory factory = new RowChangeEvent.RowChangeEventFactory(); // start flush thread, we only need one event handler, so we just set a thread pool with fixed size 1. disruptorExecutor = Executors.newFixedThreadPool(1); disruptor = new Disruptor<RowChangeEvent>(factory, writerConfig.getBufferSize(), disruptorExecutor); ringBuffer = disruptor.getRingBuffer(); eventHandler = new RowChangeEventHandler(ots, writerConfig, callback, executor); disruptor.handleEventsWith(eventHandler); disruptor.start(); // start flush timer startFlushTimer(writerConfig.getFlushInterval()); }
AuditReporter(int queueSize, long timeBucketIntervalInSec, int reportFreqMsgCount, int reportFreqIntervalSec, boolean combineMetricsAmongHosts) { reportExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat(getType() + "-audit-reporter-%d") .build());; queueSize = Util.ceilingNextPowerOfTwo(queueSize); disruptor = new Disruptor<AuditMsgReportTask>(new AuditMsgReportTaskFactory(), queueSize, reportExecutor); disruptor.handleEventsWith(new AuditMsgReportTaskHandler(this)); ringBuffer = disruptor.getRingBuffer(); aggregator = new AuditAggregator(timeBucketIntervalInSec, reportFreqMsgCount, reportFreqIntervalSec, combineMetricsAmongHosts); SUBMITTED_COUNTER = Metrics.getRegistry().meter(getType() + ".auditReporter.submittedNumber"); FAILED_TO_SUBMIT_COUNTER = Metrics.getRegistry().meter(getType() + ".auditReporter.failedToSubmitNumber"); REPORTED_COUNTER = Metrics.getRegistry().meter(getType() + ".auditReporter.reportedNumber"); FAILED_TO_REPORT_COUNTER = Metrics.getRegistry().meter(getType() + ".auditReporter.failedToReportNumber"); Metrics.getRegistry().register(getType() + ".auditReporter.queueSize", new Gauge<Integer>() { @Override public Integer getValue() { return (int) disruptor.getRingBuffer().remainingCapacity(); } }); }
private Disruptor<ExchangeEvent> getCurrentDisruptor() throws DisruptorNotStartedException { Disruptor<ExchangeEvent> currentDisruptor = disruptor.getReference(); if (currentDisruptor == null) { // no current Disruptor reference, we may be reconfiguring or it was not started // check which by looking at the reference mark... boolean[] changeIsPending = new boolean[1]; while (currentDisruptor == null) { currentDisruptor = disruptor.get(changeIsPending); //Check if we are reconfiguring if (currentDisruptor == null && !changeIsPending[0]) { throw new DisruptorNotStartedException( "Disruptor is not yet started or already shut down."); } else if (currentDisruptor == null && changeIsPending[0]) { //We should be back shortly...keep trying but spare CPU resources LockSupport.parkNanos(1L); } } } return currentDisruptor; }
/** * Start dispatching events * * @return a Future that asyncrhonously notifies an observer when this EventReactor is ready */ @SuppressWarnings("unchecked") public CompletableFuture<? extends EventReactor<T>> open() { if (isRunning.compareAndSet(false, true)) { CompletableFuture<EventReactor<T>> future = new CompletableFuture<>(); this.disruptor = new Disruptor<>(EVENT_FACTORY, ringSize, threadFactory, ProducerType.MULTI, new BusySpinWaitStrategy()); this.disruptor.handleEventsWith(this::handleEvent); this.ringBuffer = disruptor.start(); // Starts a timer thread this.timer = new Timer(); future.complete(this); return future; } else { return CompletableFuture.completedFuture(this); } }
/** * Create a new source. * <p>This method will prepare the instance with some needed variables * in order to be started later with the start method (implemented by children). * * @param parsersManager Instance of ParserManager that will serve parsers to this source instance * @param eventHandler Instance of EventHandler that will receive the events generated by this source instance * @param properties Map of properties associated with this source */ public Source(ParsersManager parsersManager, EventHandler eventHandler, Map<String, Object> properties) { // Save the references for later use this.parsersManager = parsersManager; this.properties = properties; // Create the ring buffer for this topic and start it Disruptor<MapEvent> disruptor = new Disruptor<>(new MapEventFactory(), ConfigData.getRingBufferSize(), Executors.newCachedThreadPool()); disruptor.handleEventsWith(eventHandler); disruptor.start(); // Create the event producer that will receive the events produced by // this source instance eventProducer = new EventProducer(disruptor.getRingBuffer()); prepare(); }
@Provides @Singleton @Named("disruptor") @SuppressWarnings("Unchecked") protected Disruptor<TwoPhaseEvent<T>> inputDisruptor(Provider<WorkHandler<TwoPhaseEvent<T>>> workHandlerProvider, Provider<EventHandler<TwoPhaseEvent<T>>> evenHandlerProvider) { Disruptor<TwoPhaseEvent<T>> disruptor = new Disruptor<>( TwoPhaseEvent.factory(outputEventFactory()), options.ringSize(), threadsProvider, ProducerType.SINGLE, new SleepingWaitStrategy()); WorkHandler<TwoPhaseEvent<T>>[] parsers = new WorkHandler[options.threads()]; for (int i = 0; i < options.threads(); i++) { parsers[i] = workHandlerProvider.get(); } disruptor.handleExceptionsWith(new FatalExceptionHandler()); disruptor.handleEventsWithWorkerPool(parsers).then(evenHandlerProvider.get()); return disruptor; }
@Provides @Singleton @Named("firstPassDisruptor") public Disruptor<TwoPhaseEvent<Void>> firstPassDisruptor( Provider<WorkHandler<TwoPhaseEvent<Void>>> statisticsHandlerProvider) { Disruptor<TwoPhaseEvent<Void>> disruptor = new Disruptor<>( TwoPhaseEvent.factory(() -> null), options.ringSize(), threadsProvider, ProducerType.SINGLE, new SleepingWaitStrategy()); WorkHandler<TwoPhaseEvent<Void>>[] parsers = new WorkHandler[options.threads()]; for (int i = 0; i < options.threads(); i++) { parsers[i] = statisticsHandlerProvider.get(); } disruptor.handleExceptionsWith(new FatalExceptionHandler()); disruptor.handleEventsWithWorkerPool(parsers); return disruptor; }
@Provides @Singleton @Named("secondPassDisruptor") public Disruptor<TwoPhaseEvent<SparseItem>> secondPassDisruptor( Provider<WorkHandler<TwoPhaseEvent<SparseItem>>> binningHandlerProvider, Provider<EventHandler<TwoPhaseEvent<SparseItem>>> outputHandlerProvider) { Disruptor<TwoPhaseEvent<SparseItem>> disruptor = new Disruptor<>( TwoPhaseEvent.factory(SparseItem::new), options.ringSize(), threadsProvider, ProducerType.SINGLE, new SleepingWaitStrategy()); WorkHandler<TwoPhaseEvent<SparseItem>>[] parsers = new WorkHandler[options.threads()]; for (int i = 0; i < options.threads(); i++) { parsers[i] = binningHandlerProvider.get(); } disruptor.handleExceptionsWith(new FatalExceptionHandler()); disruptor.handleEventsWithWorkerPool(parsers) .then(outputHandlerProvider.get()); return disruptor; }
@SuppressWarnings("unchecked") static void qndSingleThread(int numItems) { final AtomicLong COUNTER_RECEIVED = new AtomicLong(0); final Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent.FACTORY, 128, Executors.defaultThreadFactory(), ProducerType.MULTI, new YieldingWaitStrategy()); disruptor.handleEventsWith( (event, sequence, endOfBatch) -> COUNTER_RECEIVED.incrementAndGet()); disruptor.start(); final long t = System.currentTimeMillis(); for (int i = 0; i < numItems; i++) { disruptor.publishEvent((event, seq) -> event.set(seq)); } long d = System.currentTimeMillis() - t; NumberFormat nf = NumberFormat.getInstance(); System.out.println("========== qndSingleThread:"); System.out.println("Sent: " + nf.format(numItems) + " / Received: " + nf.format(COUNTER_RECEIVED.get()) + " / Duration: " + d + " / Speed: " + NumberFormat.getInstance().format((numItems * 1000.0 / d)) + " items/sec"); disruptor.shutdown(); }
public PersonHelper() { //参数1 事件 //参数2 单线程使用 //参数3 等待策略 EventFactory<PersonEvent> eventFactory = PersonEvent.EVENT_FACTORY; ExecutorService executor = Executors.newSingleThreadExecutor(); int ringBufferSize = 4; // RingBuffer 大小,必须是 2 的 N 次方; Disruptor<PersonEvent> disruptor = new Disruptor<>(eventFactory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy()); ringBuffer = disruptor.getRingBuffer(); //获取生产者的位置信息 sequenceBarrier = ringBuffer.newBarrier(); //消费者 handler = new PersonEventHandler(); //事件处理器,监控指定ringBuffer,有数据时通知指定handler进行处理 batchEventProcessor = new BatchEventProcessor<>(ringBuffer, sequenceBarrier, handler); //传入所有消费者线程的序号 // ringBuffer.setGatingSequences(batchEventProcessor.getSequence()); }
public void post(Object event) { Class<?> eventClass = event.getClass(); Disruptor<BaseEvent> eventDisruptor = disruptorPool.get(eventClass); if(eventDisruptor == null) { List<RegistedEventHandler> registedEventHandlers = handlesMap.get(eventClass); if(registedEventHandlers == null || registedEventHandlers.size() == 0) { throw new RuntimeException("The " + eventClass.getSimpleName() + " event dosen't regist any eventHandler."); } eventDisruptor = createDisruptor(eventClass, registedEventHandlers); disruptorPool.put(eventClass, eventDisruptor); } // check whether has event Repository definition. if(eventRepository != null) { eventRepository.save(event); } RingBuffer<BaseEvent> ringBuffer = eventDisruptor.getRingBuffer(); BaseEventProducer producer = new BaseEventProducer(ringBuffer); producer.onData(event); }
protected Disruptor<BaseEvent> createDisruptor(Class<?> eventClass, List<RegistedEventHandler> registedEventHandlers) { WaitStrategy waitStrategy = new BlockingWaitStrategy(); // load the customized event bufferSize. int bufferSize = EventUtils.getEventBufferSize(eventClass); Disruptor<BaseEvent> disruptor =new Disruptor<BaseEvent>(new BaseEventFactory(), bufferSize, Executors.newCachedThreadPool(), ProducerType.SINGLE, waitStrategy); List<BaseEventHandler> baseEventHandlers = Lists.newArrayList(); EventType eventType = EventUtils.getEventType(eventClass); if(EventType.ORDER.equals(eventType)) { List<RegistedEventHandler> orderingHandlers = orderingRegistedEventHandlers(registedEventHandlers); baseEventHandlers.add(new BaseEventHandler(EventType.ORDER, orderingHandlers.toArray(new RegistedEventHandler[0]))); } else if(EventType.CONCURRENCY.equals(eventType)) { for(RegistedEventHandler registedEventHandler : registedEventHandlers) { baseEventHandlers.add(new BaseEventHandler(EventType.CONCURRENCY, registedEventHandler)); } } else { throw new RuntimeException("The definition of event type is not correct."); } disruptor.handleEventsWith(baseEventHandlers.toArray(new BaseEventHandler[0])); disruptor.start(); return disruptor; }
@Before public void setup(){ mockDisruptor = createStrictMock(Disruptor.class); mockThreadFactory = createStrictMock(ThreadFactory.class); disruptorLifecycleManager = new AbstractDisruptorLifecycleManager<String>() { @Override public void init() { } }; disruptorLifecycleManager.setDisruptor(mockDisruptor); disruptorLifecycleManager.setThreadFactory(mockThreadFactory); disruptorLifecycleManager.setThreadName(THREAD_NAME); assertNotNull(disruptorLifecycleManager.getDisruptor()); assertNotNull(disruptorLifecycleManager.getThreadFactory()); assertNotNull(disruptorLifecycleManager.getThreadName()); }
@SuppressWarnings( "unchecked" ) public DisruptorEventBus( int handlerCount, int bufferSizeShift ) { executor = Executors.newFixedThreadPool( handlerCount, r -> new Thread(r, "DisruptorHandler") ); EventHolderFactory factory = new EventHolderFactory(); int bufferSize = 1 << bufferSizeShift; disruptor = new Disruptor<>( factory, bufferSize, executor ); for ( int i = 0; i < handlerCount; i ++ ) { handlers.add( new StatisticEventHandler( i ) ); disruptor.handleEventsWith( handlers.get( handlers.size() - 1 ) ); } disruptor.start(); this.subscribe( WriteToStorageEvent.class, e -> { MetricStorage workStorage = e.storageToWriteTo().getSubStorageCalled( "EventBus" ); workStorage.store( "EventsFired", eventCount.doubleValue() ); }); }
@SuppressWarnings("unchecked") public static void main(String[] args) throws InterruptedException { //Executor that will be used to construct new threads for consumers Executor executor = Executors.newCachedThreadPool(); //Specify the size of the ring buffer, must be power of 2. int bufferSize = 1024; //Disruptor<ObjectEvent> disruptor = new Disruptor<>(ObjectEvent::new, bufferSize, executor); Disruptor<ObjectEvent> disruptor = new Disruptor<>(ObjectEvent::new, bufferSize, executor, ProducerType.SINGLE, new LiteBlockingWaitStrategy()); disruptor.handleEventsWith(App::handleEvent1); disruptor.handleEventsWith(App::handleEvent2); //disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event.getObject())); disruptor.start(); produceEvents(disruptor); }
public void init(Properties configProps) { // subscriptions = new SubscriptionsStore(); //Modified by WSO2 in-order to extend the capability of the existing subscriptions store //to be more suitable for the distribution architecture of Andes subscriptions = new MQTTSubscriptionStore(); ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("Disruptor MQTT Simple Messaging Thread %d").build(); ExecutorService executor = Executors.newCachedThreadPool(namedThreadFactory); Integer ringBufferSize = AndesConfigurationManager.readValue( AndesConfiguration.TRANSPORTS_MQTT_INBOUND_BUFFER_SIZE); disruptor = new Disruptor<ValueEvent>( ValueEvent.EVENT_FACTORY, ringBufferSize, executor); //Added by WSO2, we do not want to ignore the exception here disruptor.handleExceptionsWith(new MqttLogExceptionHandler()); SequenceBarrier barrier = disruptor.getRingBuffer().newBarrier(); BatchEventProcessor<ValueEvent> eventProcessor = new BatchEventProcessor<ValueEvent>( disruptor.getRingBuffer(), barrier, this); //Added by WSO2, we need to make sure the exceptions aren't ignored eventProcessor.setExceptionHandler(new MqttLogExceptionHandler()); disruptor.handleEventsWith(eventProcessor); m_ringBuffer = disruptor.start(); disruptorPublish(new InitEvent(configProps)); }
public static Worker createWorker(File conf, EventHandler<KinesisEvent> handler, String appName)throws IOException{ Executor executor = Executors.newCachedThreadPool(); Disruptor<KinesisEvent> disruptor = new Disruptor<>(KinesisEvent.EVENT_FACTORY, 128, executor); disruptor.handleEventsWith(handler); RingBuffer<KinesisEvent> buffer = disruptor.start(); Properties props = new Properties(); props.load(new FileReader(conf)); // Generate a unique worker ID String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID(); String accessid = props.getProperty("aws-access-key-id"); String secretkey = props.getProperty("aws-secret-key"); String streamname = props.getProperty("aws-kinesis-stream-name"); BasicAWSCredentials creds = new BasicAWSCredentials(accessid, secretkey); CredProvider credprovider = new CredProvider(creds); KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(appName, streamname, credprovider, workerId); Worker worker = new Worker(new RecordProcessorFactory(buffer), config, new MetricsFactory()); return worker; }
@Before @SuppressWarnings("unchecked") public void setup() { responseBuffer = new Disruptor<ResponseEvent>(new EventFactory<ResponseEvent>() { @Override public ResponseEvent newInstance() { return new ResponseEvent(); } }, 1024, Executors.newCachedThreadPool()); firedEvents = Collections.synchronizedList(new ArrayList<CouchbaseMessage>()); latch = new CountDownLatch(1); responseBuffer.handleEventsWith(new EventHandler<ResponseEvent>() { @Override public void onEvent(ResponseEvent event, long sequence, boolean endOfBatch) throws Exception { firedEvents.add(event.getMessage()); latch.countDown(); } }); responseRingBuffer = responseBuffer.start(); }
public void init() throws NullPointerException { if (factory == null) { throw new NullPointerException("factory == null"); } if (bufferSize <= 0) { throw new NullPointerException("bufferSize <= 0"); } if (threadFactory == null) { throw new NullPointerException("threadFactory == null"); } producers = new ArrayList<ExchangeEventProducer<T>>(); disruptor = new Disruptor<Exchange>(factory, bufferSize, threadFactory, ProducerType.SINGLE, new YieldingWaitStrategy()); }
public static void stop() { final Disruptor<RingBufferLogEvent> temp = disruptor; // Must guarantee that publishing to the RingBuffer has stopped // before we call disruptor.shutdown() disruptor = null; // client code fails with NPE if log after stop = OK temp.shutdown(); // wait up to 10 seconds for the ringbuffer to drain final RingBuffer<RingBufferLogEvent> ringBuffer = temp.getRingBuffer(); for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) { if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) { break; } try { // give ringbuffer some time to drain... Thread.sleep(HALF_A_SECOND); } catch (final InterruptedException e) { // ignored } } executor.shutdown(); // finally, kill the processor thread }
private static synchronized void initDisruptor() { if (disruptor != null) { LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor, using existing object. Ref count is {}.", count); return; } LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor. Ref count is {}.", count); final int ringBufferSize = calculateRingBufferSize(); final WaitStrategy waitStrategy = createWaitStrategy(); executor = Executors.newSingleThreadExecutor(threadFactory); disruptor = new Disruptor<Log4jEventWrapper>(FACTORY, ringBufferSize, executor, ProducerType.MULTI, waitStrategy); final EventHandler<Log4jEventWrapper>[] handlers = new Log4jEventWrapperHandler[] {// new Log4jEventWrapperHandler() }; final ExceptionHandler errorHandler = getExceptionHandler(); disruptor.handleExceptionsWith(errorHandler); disruptor.handleEventsWith(handlers); LOGGER.debug( "Starting AsyncLoggerConfig disruptor with ringbuffer size={}, waitStrategy={}, exceptionHandler={}...", disruptor.getRingBuffer().getBufferSize(), waitStrategy.getClass().getSimpleName(), errorHandler); disruptor.start(); }
public DisruptorBasedRelatedItemIndexRequestProcessor(Configuration configuration, IndexingRequestConverterFactory requestConverter, EventFactory<RelatedItemIndexingMessage> indexingMessageFactory, RelatedItemIndexingMessageEventHandler eventHandler ) { this.executorService = getExecutorService(); this.canOutputRequestData = configuration.isSafeToOutputRequestData(); this.requestConverter = requestConverter; disruptor = new Disruptor<RelatedItemIndexingMessage>( indexingMessageFactory, Util.ceilingNextPowerOfTwo(configuration.getSizeOfIncomingMessageQueue()), executorService, ProducerType.MULTI, configuration.getWaitStrategyFactory().createWaitStrategy()); this.eventHandler = eventHandler; disruptor.handleExceptionsWith(new IgnoreExceptionHandler()); disruptor.handleEventsWith(new EventHandler[] {eventHandler}); ringBuffer = disruptor.start(); }
public DisruptorBasedSearchRequestProcessor(IncomingSearchRequestTranslator searchRequestTranslator , RelatedContentSearchRequestProcessorHandler eventHandler, RelatedItemSearchRequestFactory relatedItemSearchRequestFactory, Configuration configuration, SearchRequestParameterValidatorLocator searchRequestValidator) { this.executorService = getExecutorService(); this.searchRequestTranslator = searchRequestTranslator; this.eventHandler = eventHandler; this.requestValidators= searchRequestValidator; disruptor = new Disruptor<RelatedItemSearchRequest>( relatedItemSearchRequestFactory, configuration.getSizeOfRelatedItemSearchRequestQueue(), executorService, ProducerType.MULTI, configuration.getWaitStrategyFactory().createWaitStrategy()); disruptor.handleExceptionsWith(new IgnoreExceptionHandler()); disruptor.handleEventsWith(eventHandler); ringBuffer = disruptor.start(); }
public DisruptorBasedRelatedItemSearchExecutor(final Configuration configuration, EventFactory<RelatedItemSearch> eventFactory, RelatedItemSearchDisruptorEventHandler eventHandler ) { this.configuration = configuration; this.eventHandler = eventHandler; int bufferSize = configuration.getSizeOfRelatedItemSearchRequestHandlerQueue(); this.executorService = getExecutorService(); disruptor = new Disruptor<RelatedItemSearch>( eventFactory,bufferSize, executorService, ProducerType.SINGLE, configuration.getWaitStrategyFactory().createWaitStrategy()); disruptor.handleExceptionsWith(new IgnoreExceptionHandler()); disruptor.handleEventsWith(new EventHandler[] {eventHandler}); disruptor.start(); }
public DisruptorRelatedItemSearchResultsToResponseGateway(Configuration configuration, SearchEventProcessor requestProcessor, SearchEventProcessor responseProcessor ) { this.executorService = getExecutorService(); int bufferSize = configuration.getSizeOfRelatedItemSearchRequestAndResponseQueue(); disruptor = new Disruptor<SearchEvent>( SearchEvent.FACTORY, bufferSize, executorService, ProducerType.MULTI, configuration.getWaitStrategyFactory().createWaitStrategy()); disruptor.handleExceptionsWith(new IgnoreExceptionHandler()); disruptor.handleEventsWith(new EventHandler[] {eventHandler}); ringBuffer = disruptor.start(); eventProcessors[SearchEventType.REQUEST.getIndex()] = requestProcessor; eventProcessors[SearchEventType.RESPONSE.getIndex()] = responseProcessor; }
public DisruptorBasedResponseContextTypeBasedResponseEventHandler(Configuration configuration, ResponseEventHandler delegate) { this.delegateHandler = delegate; this.executorService = getExecutorService(); int bufferSize = configuration.getSizeOfResponseProcessingQueue(); if(bufferSize==-1) { bufferSize = configuration.getSizeOfRelatedItemSearchRequestQueue(); } else { bufferSize = Util.ceilingNextPowerOfTwo(bufferSize); } disruptor = new Disruptor<SearchResultsToDistributeToResponseContexts>( FACTORY, bufferSize, executorService, ProducerType.MULTI, configuration.getWaitStrategyFactory().createWaitStrategy()); disruptor.handleExceptionsWith(new IgnoreExceptionHandler()); disruptor.handleEventsWith(new EventHandler[] {EVENT_HANDLER}); ringBuffer = disruptor.start(); }
public AsyncReadResolver(int maxThreads,int bufferSize, TxnSupplier txnSupplier, RollForwardStatus status, TrafficControl trafficControl, KeyedReadResolver synchronousResolver){ this.txnSupplier=txnSupplier; this.trafficControl=trafficControl; this.status=status; this.synchronousResolver = synchronousResolver; consumerThreads=new ThreadPoolExecutor(maxThreads,maxThreads, 60,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setNameFormat("readResolver-%d").setDaemon(true).build()); int bSize=1; while(bSize<bufferSize) bSize<<=1; disruptor=new Disruptor<>(new ResolveEventFactory(),bSize,consumerThreads, ProducerType.MULTI, new BlockingWaitStrategy()); //we want low latency here, but it might cost too much in CPU disruptor.handleEventsWith(new ResolveEventHandler()); ringBuffer=disruptor.getRingBuffer(); }
public TransactionResolver(TxnSupplier txnSupplier, int numThreads, int bufferSize) { this.txnSupplier = txnSupplier; this.consumerThreads = MoreExecutors.namedThreadPool(numThreads, numThreads, "txn-resolve-%d", 60, true); this.consumerThreads.allowCoreThreadTimeOut(true); int bSize=1; while(bSize<bufferSize) bSize<<=1; disruptor = new Disruptor<>(new EventFactory<TxnResolveEvent>() { @Override public TxnResolveEvent newInstance() { return new TxnResolveEvent(); } },bSize,consumerThreads, ProducerType.MULTI, new BlockingWaitStrategy()); disruptor.handleEventsWith(new ResolveEventHandler()); ringBuffer = disruptor.getRingBuffer(); disruptor.start(); }