Java 类com.lmax.disruptor.dsl.Disruptor 实例源码

项目:eds    文件:PublishManager.java   
/**
 * Builds and starts the disruptor used by this publisher.
 *
 * <p>A fixed pool of {@code processors} threads is created, the requested size is passed
 * through {@code sizeFor} (the original note says this yields a power of 2), and one
 * {@code EdsEventWorkHandler} per thread is registered as a worker pool.
 *
 * @param processors     number of executor threads and of work handlers
 * @param ringBufferSize requested ring buffer size before normalisation by {@code sizeFor}
 */
private void initDisruptor(int processors, int ringBufferSize) {
  LOG.info("eds client init disruptor with processors="
      + processors + " and ringBufferSize=" + ringBufferSize);

  executor = Executors.newFixedThreadPool(
      processors,
      new ThreadFactoryBuilder().setNameFormat("disruptor-executor-%d").build());

  final WaitStrategy strategy = createWaitStrategy();
  final int capacity = sizeFor(ringBufferSize);
  disruptor = new Disruptor<>(
      EdsRingBufferEvent.FACTORY, capacity, executor, ProducerType.MULTI, strategy);

  // One handler per executor thread.
  final EdsEventWorkHandler[] workers = new EdsEventWorkHandler[processors];
  int slot = 0;
  while (slot < workers.length) {
    workers[slot] = new EdsEventWorkHandler();
    slot++;
  }
  // Registered as a worker pool rather than through handleEventsWith, which the original
  // note likens to a topic with multiple consumers.
  disruptor.handleEventsWithWorkerPool(workers);

  disruptor.start();
}
项目:message-broker    文件:SharedMessageStore.java   
/**
 * Creates the store and starts its disruptor pipeline.
 *
 * <p>Events flow through three chained stages: a {@code DbEventMatcher}, then a
 * {@code DbWriter} bound to {@code messageDao}, then a final stage that calls
 * {@code clear()} on the event.
 *
 * @param messageDao     DAO handed to the writer stage and kept on this instance
 * @param bufferSize     ring buffer size, also passed to the matcher stage
 * @param maxDbBatchSize batch size passed to the writer stage
 */
@SuppressWarnings("unchecked")
public SharedMessageStore(MessageDao messageDao, int bufferSize, int maxDbBatchSize) {
    pendingMessages = new ConcurrentHashMap<>();

    ThreadFactory storeThreads = new ThreadFactoryBuilder()
            .setNameFormat("DisruptorMessageStoreThread-%d")
            .build();
    disruptor = new Disruptor<>(
            DbOperation.getFactory(),
            bufferSize,
            storeThreads,
            ProducerType.MULTI,
            new SleepingWaitStrategy());
    disruptor.setDefaultExceptionHandler(new LogExceptionHandler());

    // Last stage: reset the event once the writer stage is done with it.
    EventHandler<DbOperation> slotCleaner = (event, sequence, endOfBatch) -> event.clear();
    disruptor.handleEventsWith(new DbEventMatcher(bufferSize))
            .then(new DbWriter(messageDao, maxDbBatchSize))
            .then(slotCleaner);

    disruptor.start();
    this.messageDao = messageDao;
}
项目:nuls    文件:DisruptorUtil.java   
/**
 * Creates a single-producer disruptor and registers it under {@code name}.
 *
 * <p>The disruptor gets one handler that debug-logs each event's data. It is not started
 * by this method.
 *
 * @param name           key under which the disruptor is stored; must not already be in use
 * @param ringBufferSize size of the ring buffer
 * @throws NulsRuntimeException if a disruptor is already registered under {@code name}
 */
public void createDisruptor(String name, int ringBufferSize) {
    if (DISRUPTOR_MAP.containsKey(name)) {
        throw new NulsRuntimeException(ErrorCode.FAILED, "create disruptor faild,the name is repetitive!");
    }

    NulsThreadFactory threadFactory = new NulsThreadFactory(
            ModuleService.getInstance().getModuleId(EventBusModuleBootstrap.class), name);
    Disruptor<DisruptorEvent> created = new Disruptor<DisruptorEvent>(
            EVENT_FACTORY, ringBufferSize, threadFactory, ProducerType.SINGLE, new SleepingWaitStrategy());

    EventHandler<DisruptorEvent> debugLogger = new EventHandler<DisruptorEvent>() {
        @Override
        public void onEvent(DisruptorEvent disruptorEvent, long l, boolean b) throws Exception {
            Log.debug(disruptorEvent.getData() + "");
        }
    };
    created.handleEventsWith(debugLogger);

    DISRUPTOR_MAP.put(name, created);
}
项目:nuls    文件:DisruptorUtil.java   
/**
 * Publishes {@code obj} as the data of the next event on the disruptor registered under
 * {@code name}.
 *
 * @param name key of the target disruptor in {@code DISRUPTOR_MAP}
 * @param obj  value stored on the claimed event via {@code setData}
 */
public void offer(String name, Object obj) {
    final Disruptor<DisruptorEvent> target = DISRUPTOR_MAP.get(name);
    // presumably rejects a missing (null) disruptor; verify against AssertUtil
    AssertUtil.canNotEmpty(target, "the disruptor is not exist!name:" + name);

    final RingBuffer<DisruptorEvent> ring = target.getRingBuffer();
    // Claim the next sequence number.
    final long claimed = ring.next();
    try {
        // Fetch the event object that belongs to the claimed sequence and fill it.
        ring.get(claimed).setData(obj);
    } finally {
        // Publish in all cases so a claimed sequence is never left unpublished.
        ring.publish(claimed);
    }
}
项目:-artemis-disruptor-miaosha    文件:JmsServerConfiguration.java   
/**
 * Creates the disruptor-backed JMS sender for responses and registers a lifecycle
 * container for its disruptor as a singleton bean.
 *
 * @param ringBufferSize ring buffer size, injected from {@code jms-sender.ring-buffer-size}
 * @return the created sender
 * @throws JMSException declared for the session/producer lookups used to build the sender
 */
@Bean
public JmsMessageSender responseMessageSender(@Value("${jms-sender.ring-buffer-size}")int ringBufferSize) throws JMSException {
  DisruptorJmsMessageSender sender = DisruptorJmsMessageSenderFactory.create(
      responseSession(),
      responseMessageProducer(),
      new ArtemisMessageDtoDupMessageDetectStrategy(),
      ringBufferSize);

  // Kept as a raw local, exactly like the original, so the hand-off to the container
  // type-checks the same way.
  Disruptor disruptor = sender.getDisruptor();
  DisruptorLifeCycleContainer lifeCycle =
      new DisruptorLifeCycleContainer("responseMessageSender", disruptor, Ordered.LOWEST_PRECEDENCE);

  BeanRegisterUtils.registerSingleton(
      applicationContext, "responseMessageSenderLifeCycleContainer", lifeCycle);

  return sender;
}
项目:artemis-disruptor-miaosha    文件:JmsServerConfiguration.java   
/**
 * Creates the disruptor-backed JMS sender for responses and registers a lifecycle
 * container for its disruptor as a singleton bean.
 *
 * @param ringBufferSize ring buffer size, injected from {@code jms-sender.ring-buffer-size}
 * @return the created sender
 * @throws JMSException declared for the session/producer lookups used to build the sender
 */
@Bean
public JmsMessageSender responseMessageSender(@Value("${jms-sender.ring-buffer-size}")int ringBufferSize) throws JMSException {
  DisruptorJmsMessageSender sender = DisruptorJmsMessageSenderFactory.create(
      responseSession(),
      responseMessageProducer(),
      new ArtemisMessageDtoDupMessageDetectStrategy(),
      ringBufferSize);

  // Kept as a raw local, exactly like the original, so the hand-off to the container
  // type-checks the same way.
  Disruptor disruptor = sender.getDisruptor();
  DisruptorLifeCycleContainer lifeCycle =
      new DisruptorLifeCycleContainer("responseMessageSender", disruptor, Ordered.LOWEST_PRECEDENCE);

  BeanRegisterUtils.registerSingleton(
      applicationContext, "responseMessageSenderLifeCycleContainer", lifeCycle);

  return sender;
}
项目:disruptor_benchmark    文件:LMAXDisruptorPushPullBenchmark.java   
/**
 * Benchmark setup: builds a disruptor on a single-thread executor with a single-threaded
 * claim strategy and busy-spin waiting, installs the counting handler and starts it.
 *
 * <p>The handler counts events whose value equals {@code Run.LONGVAL} and throws for any
 * other value.
 */
@Setup
public void setup() {
    executor = Executors.newSingleThreadExecutor();
    disruptor = new Disruptor<LongEvent>(
            LongEvent.EVENT_FACTORY,
            executor,
            new SingleThreadedClaimStrategy(Run.QUEUE_SIZE),
            new BusySpinWaitStrategy());

    eventCount = new AtomicInteger();

    handler = (event, sequence, endOfBatch) -> {
        // Guard first: anything but the expected value is a failure.
        if (Run.LONGVAL != event.getValue()) {
            throw new RuntimeException("Failed.");
        }
        eventCount.incrementAndGet();
    };
    disruptor.handleEventsWith(handler);

    ringBuffer = disruptor.start();
}
项目:disruptor_benchmark    文件:LMAXDisruptorBenchmark.java   
/**
 * Benchmark setup: builds a disruptor on a fixed pool of {@code Run.NTHREAD} threads with a
 * multi-threaded claim strategy and busy-spin waiting, installs the counting handler and
 * starts it.
 *
 * <p>The handler counts events whose value equals {@code Run.LONGVAL} and throws for any
 * other value.
 */
@Setup
public void setup() {
    executor = Executors.newFixedThreadPool(Run.NTHREAD);
    disruptor = new Disruptor<LongEvent>(
            LongEvent.EVENT_FACTORY,
            executor,
            new MultiThreadedClaimStrategy(Run.QUEUE_SIZE),
            new BusySpinWaitStrategy());

    eventCount = new AtomicInteger();

    handler = (event, sequence, endOfBatch) -> {
        // Guard first: anything but the expected value is a failure.
        if (Run.LONGVAL != event.getValue()) {
            throw new RuntimeException("Failed.");
        }
        eventCount.incrementAndGet();
    };
    disruptor.handleEventsWith(handler);

    ringBuffer = disruptor.start();
}
项目:disruptor-code-analysis    文件:MultiProducerWithTranslator.java   
/**
 * Demo entry point: publishes {@code RING_SIZE} events before the disruptor is started,
 * then starts it and keeps publishing forever, sleeping 10 ms after every publish.
 *
 * @param args unused
 * @throws InterruptedException if a sleep between publishes is interrupted
 */
public static void main(String[] args) throws InterruptedException
{
    Disruptor<ObjectBox> disruptor = new Disruptor<ObjectBox>(
        ObjectBox.FACTORY, RING_SIZE, Executors.newCachedThreadPool(), ProducerType.MULTI,
        new BlockingWaitStrategy());
    // Two consumers chained one after the other.
    disruptor.handleEventsWith(new Consumer()).then(new Consumer());

    final RingBuffer<ObjectBox> ringBuffer = disruptor.getRingBuffer();
    Publisher translator = new Publisher();
    IMessage payload = new IMessage();
    ITransportable carrier = new ITransportable();
    String streamName = "com.lmax.wibble";

    System.out.println("publishing " + RING_SIZE + " messages");
    int published = 0;
    while (published < RING_SIZE)
    {
        ringBuffer.publishEvent(translator, payload, carrier, streamName);
        Thread.sleep(10);
        published++;
    }

    System.out.println("start disruptor");
    disruptor.start();

    System.out.println("continue publishing");
    for (;;)
    {
        ringBuffer.publishEvent(translator, payload, carrier, streamName);
        Thread.sleep(10);
    }
}
项目:HeliosStreams    文件:RingBufferService.java   
/**
 * Configures the Kafka producer and the ring buffer from {@code config}, then starts the
 * disruptor.
 *
 * <p>This instance is passed to the disruptor both as its event factory and as its only
 * event handler, and the disruptor is started before the constructor returns.
 *
 * @param config configuration properties; two serializer entries are added to it here
 */
@SuppressWarnings("unchecked")
private RingBufferService(final Properties config) {
    // NOTE(review): value.serializer is set to a class name while key.serializer is set to a
    // serializer instance - confirm KafkaProducerService accepts both forms.
    config.put(KafkaProducerService.CONFIG_PREFIX + "value.serializer", ByteBufSerde.ByteBufSerializer.class.getName());
    config.put(KafkaProducerService.CONFIG_PREFIX + "key.serializer", Serdes.String().serializer());
    producer = KafkaProducerService.getInstance(config);
    // Ring buffer settings, each resolved through ConfigurationHelper with a default.
    bufferSize = ConfigurationHelper.getIntSystemThenEnvProperty(RB_CONF_BUFFER_SIZE, RB_DEFAULT_BUFFER_SIZE, config);
    targetTopic = ConfigurationHelper.getSystemThenEnvProperty(RB_CONF_TOPIC_NAME, RB_DEFAULT_TOPIC_NAME, config);
    eventBuffInitSize = ConfigurationHelper.getIntSystemThenEnvProperty(RB_CONF_BUFF_INITIAL_SIZE, RB_DEFAULT_BUFF_INITIAL_SIZE, config);
    shutdownTimeout = ConfigurationHelper.getIntSystemThenEnvProperty(RB_CONF_STOP_TIMEOUT, RB_DEFAULT_STOP_TIMEOUT, config);
    waitStrategy = RBWaitStrategy.getConfiguredStrategy(config);
    // "this" is handed out before construction has finished: it serves as the event factory here
    // and as the handler on the next line.
    disruptor = new Disruptor<ByteBuf>(this, bufferSize, threadFactory, ProducerType.MULTI, waitStrategy);
    disruptor.handleEventsWith(this);   // FIXME: need to be able to supply several handlers
    disruptor.start();
    ringBuffer = disruptor.getRingBuffer();
    // NOTE(review): the message names RawRingBufferDispatcher, not this class - confirm intended.
    log.info("<<<<< RawRingBufferDispatcher Started.");     
}
项目:Electrons    文件:ListenerChainBuilderNew.java   
/**
 * Wires {@code chain} into the disruptor so that its handler runs after the handlers of all
 * chains returned by {@code chain.getBefores()}, recursing into those chains first.
 *
 * <p>Nothing is wired when {@code idOnly(chain.getId(), chain.getAfter())} holds or when the
 * chain has no "before" chains.
 *
 * @param chain     chain whose handler is being attached
 * @param disruptor disruptor the handlers are registered on
 */
private static void dealWithDisruptorFromTail(ListenerChain chain, Disruptor<ElectronsHolder> disruptor) {
    if (idOnly(chain.getId(), chain.getAfter())) {
        return;
    }
    final List<ListenerChain> predecessors = chain.getBefores();
    if (CollectionUtils.isEmpty(predecessors)) {
        return;
    }

    // Wire every predecessor before attaching this chain behind them.
    for (ListenerChain predecessor : predecessors) {
        dealWithDisruptorFromTail(predecessor, disruptor);
    }

    final ProxyHandler[] upstream = new ProxyHandler[predecessors.size()];
    int slot = 0;
    for (ListenerChain predecessor : predecessors) {
        upstream[slot] = predecessor.getProxyHandler();
        slot++;
    }
    disruptor.after(upstream).handleEventsWith(chain.getProxyHandler());
}
项目:cicada    文件:DisruptorTransfer.java   
/**
 * Creates a disruptor for span events, attaches the given handler, starts it and builds the
 * producer on its ring buffer.
 *
 * @param spanEventHandler handler that consumes span events
 * @param buffSize         ring buffer size; the original note requires a power of 2
 */
@SuppressWarnings("unchecked")
public DisruptorTransfer(final SpanEventHandler spanEventHandler, final int buffSize) {
  final ThreadFactory consumerThreads = Executors.defaultThreadFactory();
  final SpanEventFactory eventFactory = new SpanEventFactory();

  disruptor = new Disruptor<SpanEvent>(eventFactory, buffSize, consumerThreads);
  disruptor.handleEventsWith(spanEventHandler);
  // Starting the disruptor starts its handler threads.
  disruptor.start();

  producer = new SpanEventProducer(disruptor.getRingBuffer());
}
项目:aliyun-tablestore-java-sdk    文件:DefaultOTSWriter.java   
/**
 * Loads the table meta for {@code tableName}, then builds and starts the disruptor that
 * feeds the row-change handler, and finally starts the flush timer.
 */
private void initialize() {
    logger.info("Start initialize ots writer, table name: {}.", tableName);
    DescribeTableRequest describeRequest = new DescribeTableRequest();
    describeRequest.setTableName(tableName);
    // Wait for the describe call and keep only the table meta.
    this.tableMeta = ots.describeTable(describeRequest).get().getTableMeta();
    logger.info("End initialize with table meta: {}.", tableMeta);

    RowChangeEvent.RowChangeEventFactory eventFactory = new RowChangeEvent.RowChangeEventFactory();

    // Only one event handler is needed, so a pool of exactly one thread is used.
    disruptorExecutor = Executors.newFixedThreadPool(1);
    disruptor = new Disruptor<RowChangeEvent>(
            eventFactory, writerConfig.getBufferSize(), disruptorExecutor);
    ringBuffer = disruptor.getRingBuffer();

    eventHandler = new RowChangeEventHandler(ots, writerConfig, callback, executor);
    disruptor.handleEventsWith(eventHandler);
    disruptor.start();

    startFlushTimer(writerConfig.getFlushInterval());
}
项目:chaperone    文件:AuditReporter.java   
/**
 * Sets up the reporting executor, the disruptor with its report-task handler, the
 * aggregator and the metrics for this reporter.
 *
 * <p>The disruptor is configured here but not started in this constructor.
 *
 * @param queueSize                requested queue size, rounded through
 *                                 {@code Util.ceilingNextPowerOfTwo}
 * @param timeBucketIntervalInSec  passed to the aggregator
 * @param reportFreqMsgCount       passed to the aggregator
 * @param reportFreqIntervalSec    passed to the aggregator
 * @param combineMetricsAmongHosts passed to the aggregator
 */
AuditReporter(int queueSize, long timeBucketIntervalInSec, int reportFreqMsgCount, int reportFreqIntervalSec,
    boolean combineMetricsAmongHosts) {
  reportExecutor = Executors.newSingleThreadExecutor(
      new ThreadFactoryBuilder().setNameFormat(getType() + "-audit-reporter-%d").build());

  final int ringSize = Util.ceilingNextPowerOfTwo(queueSize);
  disruptor = new Disruptor<AuditMsgReportTask>(new AuditMsgReportTaskFactory(), ringSize, reportExecutor);
  disruptor.handleEventsWith(new AuditMsgReportTaskHandler(this));
  ringBuffer = disruptor.getRingBuffer();

  aggregator = new AuditAggregator(
      timeBucketIntervalInSec, reportFreqMsgCount, reportFreqIntervalSec, combineMetricsAmongHosts);

  SUBMITTED_COUNTER = Metrics.getRegistry().meter(getType() + ".auditReporter.submittedNumber");
  FAILED_TO_SUBMIT_COUNTER = Metrics.getRegistry().meter(getType() + ".auditReporter.failedToSubmitNumber");
  REPORTED_COUNTER = Metrics.getRegistry().meter(getType() + ".auditReporter.reportedNumber");
  FAILED_TO_REPORT_COUNTER = Metrics.getRegistry().meter(getType() + ".auditReporter.failedToReportNumber");

  // The gauge registered as "queueSize" reports the ring buffer's remaining capacity.
  Metrics.getRegistry().register(getType() + ".auditReporter.queueSize", new Gauge<Integer>() {
    @Override
    public Integer getValue() {
      final long free = disruptor.getRingBuffer().remainingCapacity();
      return (int) free;
    }
  });
}
项目:Camel    文件:DisruptorReference.java   
/**
 * Returns the current disruptor, waiting while a change of the reference is pending.
 *
 * <p>When the reference is null and its mark says no change is pending, the disruptor is
 * treated as not started (or already shut down). When a change is pending the lookup is
 * retried after a very short park.
 *
 * @return the current, non-null disruptor
 * @throws DisruptorNotStartedException if there is no disruptor and no change is pending
 */
private Disruptor<ExchangeEvent> getCurrentDisruptor() throws DisruptorNotStartedException {
    Disruptor<ExchangeEvent> current = disruptor.getReference();
    if (current != null) {
        return current;
    }

    // No reference right now: the mark tells a reconfiguration apart from "not started".
    final boolean[] markHolder = new boolean[1];
    do {
        current = disruptor.get(markHolder);
        if (current == null) {
            if (!markHolder[0]) {
                throw new DisruptorNotStartedException(
                        "Disruptor is not yet started or already shut down.");
            }
            // A change is pending; retry shortly without burning CPU.
            LockSupport.parkNanos(1L);
        }
    } while (current == null);

    return current;
}
项目:silverflash    文件:EventReactor.java   
/**
 * Starts dispatching events.
 *
 * <p>The first successful call creates and starts the disruptor and creates the timer.
 * Calls made while already running do nothing. Either way the returned future is already
 * completed with this reactor.
 *
 * @return a completed future holding this EventReactor
 */
@SuppressWarnings("unchecked")
public CompletableFuture<? extends EventReactor<T>> open() {
  if (!isRunning.compareAndSet(false, true)) {
    // Already running: nothing to start.
    return CompletableFuture.completedFuture(this);
  }

  this.disruptor = new Disruptor<>(
      EVENT_FACTORY, ringSize, threadFactory, ProducerType.MULTI, new BusySpinWaitStrategy());
  this.disruptor.handleEventsWith(this::handleEvent);
  this.ringBuffer = disruptor.start();

  // Creating the Timer starts its thread.
  this.timer = new Timer();

  CompletableFuture<EventReactor<T>> ready = new CompletableFuture<>();
  ready.complete(this);
  return ready;
}
项目:cep    文件:Source.java   
/**
 * Creates a new source.
 *
 * <p>Stores the collaborators, creates and starts a disruptor that delivers events to
 * {@code eventHandler}, builds the event producer on that disruptor's ring buffer and then
 * calls {@code prepare()}. The source itself is started later through the start method
 * implemented by subclasses.
 *
 * @param parsersManager ParsersManager that serves parsers to this source
 * @param eventHandler   handler that receives the events generated by this source
 * @param properties     properties associated with this source
 */

public Source(ParsersManager parsersManager, EventHandler eventHandler, Map<String, Object> properties) {
    this.parsersManager = parsersManager;
    this.properties = properties;

    // Ring buffer for this source, sized from the global configuration.
    Disruptor<MapEvent> ring = new Disruptor<>(
            new MapEventFactory(),
            ConfigData.getRingBufferSize(),
            Executors.newCachedThreadPool());
    ring.handleEventsWith(eventHandler);
    ring.start();

    // Producer through which this source publishes its events.
    eventProducer = new EventProducer(ring.getRingBuffer());
    prepare();
}
项目:Scaled-ML    文件:AbstractParallelModule.java   
/**
 * Provides the input disruptor: a single-producer ring whose events are first handled by a
 * worker pool of {@code options.threads()} handlers and then by one event handler.
 *
 * <p>The disruptor is returned configured but not started.
 *
 * @param workHandlerProvider supplies one work handler per worker-pool slot
 * @param evenHandlerProvider supplies the handler that runs after the worker pool
 * @return the configured disruptor
 */
@Provides
@Singleton
@Named("disruptor")
// The token must be lower-case "unchecked"; the previous "Unchecked" is not a warning name
// the compiler recognises, so the generic-array warning below was not suppressed.
@SuppressWarnings("unchecked")
protected Disruptor<TwoPhaseEvent<T>> inputDisruptor(Provider<WorkHandler<TwoPhaseEvent<T>>> workHandlerProvider,
                                                     Provider<EventHandler<TwoPhaseEvent<T>>> evenHandlerProvider) {
    Disruptor<TwoPhaseEvent<T>> disruptor = new Disruptor<>(
            TwoPhaseEvent.factory(outputEventFactory()),
            options.ringSize(), threadsProvider,
            ProducerType.SINGLE, new SleepingWaitStrategy());
    // Generic arrays cannot be created directly, hence the raw array and the suppression.
    WorkHandler<TwoPhaseEvent<T>>[] parsers = new WorkHandler[options.threads()];
    for (int i = 0; i < parsers.length; i++) {
        parsers[i] = workHandlerProvider.get();
    }
    disruptor.handleExceptionsWith(new FatalExceptionHandler());
    disruptor.handleEventsWithWorkerPool(parsers).then(evenHandlerProvider.get());
    return disruptor;
}
项目:Scaled-ML    文件:FeatureEngineeringModule.java   
/**
 * Provides the first-pass disruptor: a single-producer ring whose events are handled by a
 * worker pool of {@code options.threads()} statistics handlers.
 *
 * <p>The disruptor is returned configured but not started.
 *
 * @param statisticsHandlerProvider supplies one work handler per worker-pool slot
 * @return the configured disruptor
 */
@Provides
@Singleton
@Named("firstPassDisruptor")
public Disruptor<TwoPhaseEvent<Void>> firstPassDisruptor(
        Provider<WorkHandler<TwoPhaseEvent<Void>>> statisticsHandlerProvider) {
    Disruptor<TwoPhaseEvent<Void>> disruptor = new Disruptor<>(
            TwoPhaseEvent.factory(() -> null),
            options.ringSize(), threadsProvider,
            ProducerType.SINGLE, new SleepingWaitStrategy());
    // Generic arrays cannot be created directly; the raw array only ever receives handlers
    // from the typed provider, so the unchecked conversion is safe.
    @SuppressWarnings("unchecked")
    WorkHandler<TwoPhaseEvent<Void>>[] parsers = new WorkHandler[options.threads()];
    // Bound the loop by the array itself rather than re-reading the option.
    for (int i = 0; i < parsers.length; i++) {
        parsers[i] = statisticsHandlerProvider.get();
    }
    disruptor.handleExceptionsWith(new FatalExceptionHandler());
    disruptor.handleEventsWithWorkerPool(parsers);
    return disruptor;
}
项目:Scaled-ML    文件:FeatureEngineeringModule.java   
/**
 * Provides the second-pass disruptor: a single-producer ring whose events are first handled
 * by a worker pool of {@code options.threads()} binning handlers and then by one output
 * handler.
 *
 * <p>The disruptor is returned configured but not started.
 *
 * @param binningHandlerProvider supplies one work handler per worker-pool slot
 * @param outputHandlerProvider  supplies the handler that runs after the worker pool
 * @return the configured disruptor
 */
@Provides
@Singleton
@Named("secondPassDisruptor")
public Disruptor<TwoPhaseEvent<SparseItem>> secondPassDisruptor(
        Provider<WorkHandler<TwoPhaseEvent<SparseItem>>> binningHandlerProvider,
        Provider<EventHandler<TwoPhaseEvent<SparseItem>>> outputHandlerProvider) {
    Disruptor<TwoPhaseEvent<SparseItem>> disruptor = new Disruptor<>(
            TwoPhaseEvent.factory(SparseItem::new),
            options.ringSize(), threadsProvider,
            ProducerType.SINGLE, new SleepingWaitStrategy());
    // Generic arrays cannot be created directly; the raw array only ever receives handlers
    // from the typed provider, so the unchecked conversion is safe.
    @SuppressWarnings("unchecked")
    WorkHandler<TwoPhaseEvent<SparseItem>>[] parsers = new WorkHandler[options.threads()];
    // Bound the loop by the array itself rather than re-reading the option.
    for (int i = 0; i < parsers.length; i++) {
        parsers[i] = binningHandlerProvider.get();
    }
    disruptor.handleExceptionsWith(new FatalExceptionHandler());
    disruptor.handleEventsWithWorkerPool(parsers)
            .then(outputHandlerProvider.get());
    return disruptor;
}
项目:ddth-queue    文件:QndDisruptor2.java   
/**
 * Quick single-threaded throughput check: publishes {@code numItems} events from the calling
 * thread into a 128-slot disruptor with one counting handler, then prints sent/received
 * counts, duration and speed, and shuts the disruptor down.
 *
 * <p>The received count is read right after publishing, before {@code shutdown()}.
 *
 * @param numItems number of events to publish
 */
@SuppressWarnings("unchecked")
static void qndSingleThread(int numItems) {
    final AtomicLong received = new AtomicLong(0);
    final Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent.FACTORY, 128,
            Executors.defaultThreadFactory(), ProducerType.MULTI, new YieldingWaitStrategy());
    disruptor.handleEventsWith((event, sequence, endOfBatch) -> received.incrementAndGet());
    disruptor.start();

    final long startedAt = System.currentTimeMillis();
    int sent = 0;
    while (sent < numItems) {
        disruptor.publishEvent((event, seq) -> event.set(seq));
        sent++;
    }
    final long elapsedMs = System.currentTimeMillis() - startedAt;

    final NumberFormat fmt = NumberFormat.getInstance();
    final double itemsPerSec = numItems * 1000.0 / elapsedMs;
    System.out.println("========== qndSingleThread:");
    System.out.println("Sent: " + fmt.format(numItems) + " / Received: "
            + fmt.format(received.get()) + " / Duration: " + elapsedMs + " / Speed: "
            + fmt.format(itemsPerSec) + " items/sec");

    disruptor.shutdown();
}
项目:pinenut    文件:PersonHelper.java   
/**
 * Builds a single-producer disruptor with a yielding wait strategy and wires a
 * {@code BatchEventProcessor} by hand on its ring buffer.
 *
 * <p>Nothing is started here: the disruptor is only used to obtain the ring buffer.
 */
public PersonHelper() {
    final EventFactory<PersonEvent> factory = PersonEvent.EVENT_FACTORY;
    final ExecutorService singleThread = Executors.newSingleThreadExecutor();
    // Ring buffer size; must be a power of two.
    final int capacity = 4;

    // Arguments: event factory, size, executor, single-producer mode, wait strategy.
    final Disruptor<PersonEvent> disruptor = new Disruptor<>(
            factory, capacity, singleThread, ProducerType.SINGLE, new YieldingWaitStrategy());

    ringBuffer = disruptor.getRingBuffer();
    // Barrier that exposes the producer's position to the consumer.
    sequenceBarrier = ringBuffer.newBarrier();
    // The consumer.
    handler = new PersonEventHandler();
    // Event processor that watches the ring buffer and hands available data to the handler.
    batchEventProcessor = new BatchEventProcessor<>(ringBuffer, sequenceBarrier, handler);
    // Registering the processor's sequence as a gating sequence was left disabled in the
    // original: ringBuffer.setGatingSequences(batchEventProcessor.getSequence());
}
项目:smart-cqrs    文件:EventBus.java   
/**
 * Posts an event to the disruptor that belongs to the event's class, creating and caching
 * that disruptor on first use.
 *
 * <p>When an event repository is configured the event is saved to it before publishing.
 *
 * @param event the event to publish
 * @throws RuntimeException if no handler is registered for the event's class
 */
public void post(Object event) {
    final Class<?> type = event.getClass();

    Disruptor<BaseEvent> target = disruptorPool.get(type);
    if (target == null) {
        final List<RegistedEventHandler> registered = handlesMap.get(type);
        if (registered == null || registered.isEmpty()) {
            throw new RuntimeException("The " + type.getSimpleName() + " event dosen't regist any eventHandler.");
        }
        target = createDisruptor(type, registered);
        disruptorPool.put(type, target);
    }

    // Persist first when an event repository has been defined.
    if (eventRepository != null) {
        eventRepository.save(event);
    }

    new BaseEventProducer(target.getRingBuffer()).onData(event);
}
项目:smart-cqrs    文件:EventBus.java   
/**
 * Creates and starts a single-producer disruptor for one event class.
 *
 * <p>For {@code EventType.ORDER} all registered handlers are wrapped in a single
 * {@code BaseEventHandler} in the order given by {@code orderingRegistedEventHandlers}; for
 * {@code EventType.CONCURRENCY} each registered handler gets its own
 * {@code BaseEventHandler}.
 *
 * @param eventClass            event class; determines buffer size and event type
 * @param registedEventHandlers handlers registered for that class
 * @return the started disruptor
 * @throws RuntimeException if the event type is neither ORDER nor CONCURRENCY
 */
protected Disruptor<BaseEvent> createDisruptor(Class<?> eventClass, List<RegistedEventHandler> registedEventHandlers) {
    WaitStrategy blocking = new BlockingWaitStrategy();
    // Buffer size may be customised per event class.
    int capacity = EventUtils.getEventBufferSize(eventClass);
    Disruptor<BaseEvent> created = new Disruptor<BaseEvent>(
            new BaseEventFactory(), capacity, Executors.newCachedThreadPool(), ProducerType.SINGLE, blocking);

    List<BaseEventHandler> wrapped = Lists.newArrayList();
    EventType kind = EventUtils.getEventType(eventClass);
    if (EventType.ORDER.equals(kind)) {
        List<RegistedEventHandler> ordered = orderingRegistedEventHandlers(registedEventHandlers);
        RegistedEventHandler[] orderedArray = ordered.toArray(new RegistedEventHandler[0]);
        wrapped.add(new BaseEventHandler(EventType.ORDER, orderedArray));
    } else if (EventType.CONCURRENCY.equals(kind)) {
        for (RegistedEventHandler single : registedEventHandlers) {
            wrapped.add(new BaseEventHandler(EventType.CONCURRENCY, single));
        }
    } else {
        throw new RuntimeException("The definition of event type is not correct.");
    }

    created.handleEventsWith(wrapped.toArray(new BaseEventHandler[0]));
    created.start();
    return created;
}
项目:disruptor-spring-manager    文件:AbstractDisruptorLifecycleManagerTest.java   
/**
 * Test fixture: creates strict mocks for the disruptor and the thread factory, builds an
 * anonymous lifecycle manager with an empty {@code init()}, injects the mocks and the thread
 * name, and checks that the getters return non-null values.
 */
@Before
public void setup(){
    mockDisruptor = createStrictMock(Disruptor.class);
    mockThreadFactory = createStrictMock(ThreadFactory.class);
    // Anonymous subclass under test; init() is deliberately left empty.
    disruptorLifecycleManager = new AbstractDisruptorLifecycleManager<String>() {

        @Override
        public void init() {
        }
    };

    // Inject the collaborators through the setters.
    disruptorLifecycleManager.setDisruptor(mockDisruptor);
    disruptorLifecycleManager.setThreadFactory(mockThreadFactory);
    disruptorLifecycleManager.setThreadName(THREAD_NAME);

    // Sanity check: every injected value is visible through its getter.
    assertNotNull(disruptorLifecycleManager.getDisruptor());
    assertNotNull(disruptorLifecycleManager.getThreadFactory());
    assertNotNull(disruptorLifecycleManager.getThreadName());
}
项目:bifroest    文件:DisruptorEventBus.java   
/**
 * Creates the event bus: a disruptor of {@code 1 << bufferSizeShift} slots with
 * {@code handlerCount} statistic handlers, each registered through its own
 * {@code handleEventsWith} call, running on a fixed pool of {@code handlerCount} threads.
 *
 * <p>After starting the disruptor the bus subscribes to {@code WriteToStorageEvent} to store
 * the current event count under "EventsFired".
 *
 * @param handlerCount    number of handlers and of pool threads
 * @param bufferSizeShift power-of-two exponent of the ring buffer size
 */
@SuppressWarnings( "unchecked" )
public DisruptorEventBus( int handlerCount, int bufferSizeShift ) {
    executor = Executors.newFixedThreadPool( handlerCount,
                                                      r -> new Thread(r, "DisruptorHandler") );
    EventHolderFactory holderFactory = new EventHolderFactory();

    final int ringSize = 1 << bufferSizeShift;
    disruptor = new Disruptor<>( holderFactory, ringSize, executor );

    for ( int index = 0; index < handlerCount; index++ ) {
        StatisticEventHandler statisticHandler = new StatisticEventHandler( index );
        handlers.add( statisticHandler );
        disruptor.handleEventsWith( statisticHandler );
    }

    disruptor.start();

    this.subscribe( WriteToStorageEvent.class, e ->
        e.storageToWriteTo()
         .getSubStorageCalled( "EventBus" )
         .store( "EventsFired", eventCount.doubleValue() ) );
}
项目:DisruptorBootstrap    文件:App.java   
/**
 * Demo entry point: starts a single-producer disruptor with two handlers, each registered
 * through its own {@code handleEventsWith} call, and then produces events into it.
 *
 * @param args unused
 * @throws InterruptedException declared for {@code produceEvents}
 */
@SuppressWarnings("unchecked")
public static void main(String[] args) throws InterruptedException {
    // Size of the ring buffer; must be a power of 2.
    final int ringSize = 1024;
    // Executor used to create the consumer threads.
    final Executor consumerThreads = Executors.newCachedThreadPool();

    final Disruptor<ObjectEvent> disruptor = new Disruptor<>(
            ObjectEvent::new,
            ringSize,
            consumerThreads,
            ProducerType.SINGLE,
            new LiteBlockingWaitStrategy());

    disruptor.handleEventsWith(App::handleEvent1);
    disruptor.handleEventsWith(App::handleEvent2);
    disruptor.start();

    produceEvents(disruptor);
}
项目:andes    文件:SimpleMessaging.java   
/**
 * Initialises the subscription store and the inbound disruptor, then publishes an
 * {@code InitEvent} carrying the configuration.
 *
 * <p>This instance is the handler behind a hand-built {@code BatchEventProcessor}. A
 * {@code MqttLogExceptionHandler} is installed both on the disruptor and on the processor so
 * exceptions are not ignored.
 *
 * @param configProps configuration wrapped into the initial event
 */
public void init(Properties configProps) {
    // MQTTSubscriptionStore replaces the plain subscriptions store (WSO2 change) to fit the
    // distribution architecture of Andes.
    subscriptions = new MQTTSubscriptionStore();

    ThreadFactory messagingThreads = new ThreadFactoryBuilder()
            .setNameFormat("Disruptor MQTT Simple Messaging Thread %d").build();
    ExecutorService pool = Executors.newCachedThreadPool(messagingThreads);
    Integer configuredSize = AndesConfigurationManager.readValue(
            AndesConfiguration.TRANSPORTS_MQTT_INBOUND_BUFFER_SIZE);

    disruptor = new Disruptor<ValueEvent>( ValueEvent.EVENT_FACTORY, configuredSize, pool);
    // WSO2 addition: exceptions must not be ignored here.
    disruptor.handleExceptionsWith(new MqttLogExceptionHandler());

    SequenceBarrier ringBarrier = disruptor.getRingBuffer().newBarrier();
    BatchEventProcessor<ValueEvent> processor = new BatchEventProcessor<ValueEvent>(
            disruptor.getRingBuffer(), ringBarrier, this);
    // WSO2 addition: the processor gets its own handler so its exceptions are logged too.
    processor.setExceptionHandler(new MqttLogExceptionHandler());
    disruptor.handleEventsWith(processor);

    m_ringBuffer = disruptor.start();

    disruptorPublish(new InitEvent(configProps));
}
项目:Surf    文件:Util.java   
/**
 * Builds a Kinesis {@code Worker} whose record processors publish into a freshly started
 * disruptor consumed by {@code handler}.
 *
 * <p>The properties file is read and closed first, and the worker id is resolved, before
 * any thread is started. A missing or unreadable file therefore no longer leaves a running
 * disruptor and thread pool behind, and the file reader is no longer leaked.
 *
 * @param conf    properties file providing {@code aws-access-key-id}, {@code aws-secret-key}
 *                and {@code aws-kinesis-stream-name}
 * @param handler handler that consumes the events placed on the ring buffer
 * @param appName application name passed to the Kinesis client configuration
 * @return the configured worker
 * @throws IOException if the properties file cannot be read or the local host name cannot
 *                     be resolved
 */
public static Worker createWorker(File conf, EventHandler<KinesisEvent> handler, String appName)throws IOException{
    Properties props = new Properties();
    // try-with-resources closes the reader even when load() fails.
    try (FileReader reader = new FileReader(conf)) {
        props.load(reader);
    }
    // Generate a unique worker ID
    String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
    String accessid = props.getProperty("aws-access-key-id");
    String secretkey = props.getProperty("aws-secret-key");
    String streamname = props.getProperty("aws-kinesis-stream-name");

    // Only start threads once every step that can throw IOException has succeeded.
    Executor executor = Executors.newCachedThreadPool();
    Disruptor<KinesisEvent> disruptor = new Disruptor<>(KinesisEvent.EVENT_FACTORY, 128, executor);
    disruptor.handleEventsWith(handler);
    RingBuffer<KinesisEvent> buffer = disruptor.start();

    BasicAWSCredentials creds = new BasicAWSCredentials(accessid, secretkey);
    CredProvider credprovider = new CredProvider(creds);
    KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(appName, streamname,  credprovider, workerId);

    return new Worker(new RecordProcessorFactory(buffer), config, new MetricsFactory());
}
项目:couchbase-jvm-core    文件:AbstractGenericHandlerTest.java   
/**
 * Test fixture: starts a 1024-slot response disruptor whose single handler records every
 * event's message in {@code firedEvents} and counts down {@code latch}.
 */
@Before
@SuppressWarnings("unchecked")
public void setup() {
    EventFactory<ResponseEvent> eventFactory = new EventFactory<ResponseEvent>() {
        @Override
        public ResponseEvent newInstance() {
            return new ResponseEvent();
        }
    };
    responseBuffer = new Disruptor<ResponseEvent>(eventFactory, 1024, Executors.newCachedThreadPool());

    firedEvents = Collections.synchronizedList(new ArrayList<CouchbaseMessage>());
    latch = new CountDownLatch(1);

    EventHandler<ResponseEvent> recorder = new EventHandler<ResponseEvent>() {
        @Override
        public void onEvent(ResponseEvent event, long sequence, boolean endOfBatch) throws Exception {
            firedEvents.add(event.getMessage());
            latch.countDown();
        }
    };
    responseBuffer.handleEventsWith(recorder);
    responseRingBuffer = responseBuffer.start();
}
项目:jboss-fuse-examples    文件:DisruptorService.java   
/**
 * Validates the configured factory, buffer size and thread factory, then creates the
 * producer list and a single-producer disruptor with a yielding wait strategy.
 *
 * <p>The disruptor is created but not started here.
 *
 * @throws NullPointerException if {@code factory} or {@code threadFactory} is null, and also
 *                              (as the code stands) if {@code bufferSize} is not positive
 */
public void init() throws NullPointerException {
    // The first violated precondition, checked in this order, decides the message.
    String violation = null;
    if (factory == null) {
        violation = "factory == null";
    } else if (bufferSize <= 0) {
        violation = "bufferSize <= 0";
    } else if (threadFactory == null) {
        violation = "threadFactory == null";
    }
    if (violation != null) {
        throw new NullPointerException(violation);
    }

    producers = new ArrayList<ExchangeEventProducer<T>>();
    disruptor = new Disruptor<Exchange>(
            factory, bufferSize, threadFactory, ProducerType.SINGLE, new YieldingWaitStrategy());
}
项目:log4j2    文件:AsyncLogger.java   
/**
 * Stops the async logger: clears the shared disruptor reference, shuts the disruptor down,
 * gives the ring buffer time to drain and finally shuts the executor down.
 *
 * <p>Calling this when the disruptor reference is already null (never started, or stopped
 * before) is a no-op; previously it failed with a NullPointerException. An interrupt
 * received while waiting no longer gets lost: waiting continues as before and the thread's
 * interrupt status is restored before returning.
 */
public static void stop() {
    final Disruptor<RingBufferLogEvent> temp = disruptor;

    // Must guarantee that publishing to the RingBuffer has stopped
    // before we call disruptor.shutdown()
    disruptor = null; // client code fails with NPE if log after stop = OK
    if (temp == null) {
        return; // nothing to stop: not started, or stop() was already called
    }
    temp.shutdown();

    // Wait for the ring buffer to drain: at most MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN sleeps of
    // HALF_A_SECOND each (up to 10 seconds according to the original note).
    final RingBuffer<RingBufferLogEvent> ringBuffer = temp.getRingBuffer();
    boolean interrupted = false;
    for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
        // Full capacity available means every published event has been consumed.
        if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) {
            break;
        }
        try {
            // give ringbuffer some time to drain...
            Thread.sleep(HALF_A_SECOND);
        } catch (final InterruptedException e) {
            // Keep draining, but remember the interrupt so it can be restored below.
            interrupted = true;
        }
    }
    executor.shutdown(); // finally, kill the processor thread

    if (interrupted) {
        Thread.currentThread().interrupt();
    }
}
项目:log4j2    文件:AsyncLoggerConfigHelper.java   
/**
 * Creates and starts the shared disruptor unless one already exists.
 *
 * <p>The disruptor runs on a new single-thread executor, is configured for multiple
 * producers, and gets one {@code Log4jEventWrapperHandler} plus the exception handler
 * returned by {@code getExceptionHandler()}.
 */
private static synchronized void initDisruptor() {
    if (disruptor != null) {
        LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor, using existing object. Ref count is {}.", count);
        return;
    }
    LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor. Ref count is {}.", count);

    final int capacity = calculateRingBufferSize();
    final WaitStrategy strategy = createWaitStrategy();
    executor = Executors.newSingleThreadExecutor(threadFactory);
    disruptor = new Disruptor<Log4jEventWrapper>(
            FACTORY, capacity, executor, ProducerType.MULTI, strategy);

    final EventHandler<Log4jEventWrapper>[] eventHandlers =
            new Log4jEventWrapperHandler[] {new Log4jEventWrapperHandler()};
    final ExceptionHandler exceptionHandler = getExceptionHandler();
    disruptor.handleExceptionsWith(exceptionHandler);
    disruptor.handleEventsWith(eventHandlers);

    LOGGER.debug(
            "Starting AsyncLoggerConfig disruptor with ringbuffer size={}, waitStrategy={}, exceptionHandler={}...",
            disruptor.getRingBuffer().getBufferSize(),
            strategy.getClass().getSimpleName(),
            exceptionHandler);
    disruptor.start();
}
项目:related    文件:DisruptorBasedRelatedItemIndexRequestProcessor.java   
public DisruptorBasedRelatedItemIndexRequestProcessor(Configuration configuration,
                                                      IndexingRequestConverterFactory requestConverter,
                                                      EventFactory<RelatedItemIndexingMessage> indexingMessageFactory,
                                                      RelatedItemIndexingMessageEventHandler eventHandler
) {
    this.executorService = getExecutorService();
    this.canOutputRequestData = configuration.isSafeToOutputRequestData();
    this.requestConverter = requestConverter;
    disruptor = new Disruptor<RelatedItemIndexingMessage>(
            indexingMessageFactory,
            Util.ceilingNextPowerOfTwo(configuration.getSizeOfIncomingMessageQueue()), executorService,
            ProducerType.MULTI, configuration.getWaitStrategyFactory().createWaitStrategy());

    this.eventHandler = eventHandler;
    disruptor.handleExceptionsWith(new IgnoreExceptionHandler());
    disruptor.handleEventsWith(new EventHandler[] {eventHandler});
    ringBuffer = disruptor.start();

}
项目:related    文件:DisruptorBasedSearchRequestProcessor.java   
public DisruptorBasedSearchRequestProcessor(IncomingSearchRequestTranslator searchRequestTranslator ,
                                            RelatedContentSearchRequestProcessorHandler eventHandler,
                                            RelatedItemSearchRequestFactory relatedItemSearchRequestFactory,
                                            Configuration configuration,
                                            SearchRequestParameterValidatorLocator searchRequestValidator) {
    this.executorService = getExecutorService();
    this.searchRequestTranslator = searchRequestTranslator;
    this.eventHandler = eventHandler;
    this.requestValidators= searchRequestValidator;
    disruptor = new Disruptor<RelatedItemSearchRequest>(
            relatedItemSearchRequestFactory,
            configuration.getSizeOfRelatedItemSearchRequestQueue(), executorService,
            ProducerType.MULTI, configuration.getWaitStrategyFactory().createWaitStrategy());
    disruptor.handleExceptionsWith(new IgnoreExceptionHandler());
    disruptor.handleEventsWith(eventHandler);
    ringBuffer = disruptor.start();

}
项目:related    文件:DisruptorBasedRelatedItemSearchExecutor.java   
/**
 * Builds the search executor and starts its single-producer disruptor.
 * <p>
 * The ring buffer size is taken directly from the configuration. An
 * {@code IgnoreExceptionHandler} is installed and the supplied handler is the
 * only consumer.
 *
 * @param configuration source of the queue size and wait strategy factory
 * @param eventFactory  factory used to pre-populate the ring buffer
 * @param eventHandler  consumer of the search events
 */
public DisruptorBasedRelatedItemSearchExecutor(final Configuration configuration,
                                               EventFactory<RelatedItemSearch> eventFactory,
                                               RelatedItemSearchDisruptorEventHandler eventHandler
) {
    this.configuration = configuration;
    this.eventHandler = eventHandler;

    final int queueCapacity = configuration.getSizeOfRelatedItemSearchRequestHandlerQueue();
    this.executorService = getExecutorService();

    disruptor = new Disruptor<RelatedItemSearch>(
            eventFactory,
            queueCapacity,
            executorService,
            ProducerType.SINGLE,
            configuration.getWaitStrategyFactory().createWaitStrategy());
    disruptor.handleExceptionsWith(new IgnoreExceptionHandler());
    disruptor.handleEventsWith(new EventHandler[] {eventHandler});

    // The ring buffer returned by start() is not retained here.
    disruptor.start();
}
项目:related    文件:DisruptorRelatedItemSearchResultsToResponseGateway.java   
/**
 * Builds the gateway, starts its multi-producer disruptor and registers the
 * request and response processors.
 * <p>
 * The ring buffer size is taken directly from the configuration. An
 * {@code IgnoreExceptionHandler} is installed and this object's
 * {@code eventHandler} field is the only consumer. After the disruptor is
 * started, the two processors are stored in {@code eventProcessors} at the
 * indices given by their {@code SearchEventType}.
 *
 * @param configuration     source of the queue size and wait strategy factory
 * @param requestProcessor  processor stored under {@code SearchEventType.REQUEST}
 * @param responseProcessor processor stored under {@code SearchEventType.RESPONSE}
 */
public DisruptorRelatedItemSearchResultsToResponseGateway(Configuration configuration,
                                                          SearchEventProcessor requestProcessor,
                                                          SearchEventProcessor responseProcessor) {
    this.executorService = getExecutorService();

    final int queueCapacity = configuration.getSizeOfRelatedItemSearchRequestAndResponseQueue();
    disruptor = new Disruptor<SearchEvent>(
            SearchEvent.FACTORY,
            queueCapacity,
            executorService,
            ProducerType.MULTI,
            configuration.getWaitStrategyFactory().createWaitStrategy());
    disruptor.handleExceptionsWith(new IgnoreExceptionHandler());
    disruptor.handleEventsWith(new EventHandler[] {eventHandler});

    // start() hands back the ring buffer that producers publish to.
    ringBuffer = disruptor.start();

    // Index the processors by event type.
    final int requestSlot = SearchEventType.REQUEST.getIndex();
    eventProcessors[requestSlot] = requestProcessor;
    final int responseSlot = SearchEventType.RESPONSE.getIndex();
    eventProcessors[responseSlot] = responseProcessor;
}
项目:related    文件:DisruptorBasedResponseContextTypeBasedResponseEventHandler.java   
/**
 * Builds the handler and starts its multi-producer disruptor.
 * <p>
 * Ring buffer sizing: when the configured response-processing queue size is
 * {@code -1}, the related-item search request queue size is used as-is;
 * otherwise the configured value is rounded up to the next power of two.
 * An {@code IgnoreExceptionHandler} is installed and {@code EVENT_HANDLER} is the
 * only consumer. The ring buffer returned by {@code disruptor.start()} is kept in
 * {@code ringBuffer}.
 *
 * @param configuration source of the queue sizes and wait strategy factory
 * @param delegate      stored in {@code delegateHandler}
 */
public DisruptorBasedResponseContextTypeBasedResponseEventHandler(Configuration configuration,
                                                                  ResponseEventHandler delegate)
{
    this.delegateHandler = delegate;
    this.executorService = getExecutorService();

    // -1 means "not configured": fall back to the search request queue size.
    final int configuredSize = configuration.getSizeOfResponseProcessingQueue();
    final int queueCapacity = (configuredSize == -1)
            ? configuration.getSizeOfRelatedItemSearchRequestQueue()
            : Util.ceilingNextPowerOfTwo(configuredSize);

    disruptor = new Disruptor<SearchResultsToDistributeToResponseContexts>(
            FACTORY,
            queueCapacity,
            executorService,
            ProducerType.MULTI,
            configuration.getWaitStrategyFactory().createWaitStrategy());
    disruptor.handleExceptionsWith(new IgnoreExceptionHandler());
    disruptor.handleEventsWith(new EventHandler[] {EVENT_HANDLER});

    // start() hands back the ring buffer that producers publish to.
    ringBuffer = disruptor.start();
}
项目:spliceengine    文件:AsyncReadResolver.java   
/**
 * Creates the resolver, its consumer thread pool and a multi-producer disruptor
 * whose ring buffer holds at least {@code bufferSize} entries.
 * <p>
 * The requested size is rounded up to the next power of two (sizes below 1
 * yield a buffer of 1). The pool uses daemon threads named
 * {@code readResolver-%d}.
 * <p>
 * NOTE(review): the disruptor is configured but not started in this constructor;
 * presumably it is started elsewhere - confirm.
 *
 * @param maxThreads          core and maximum size of the consumer thread pool
 * @param bufferSize          minimum ring buffer capacity; must not exceed 2^30
 * @param txnSupplier         stored for later use by this resolver
 * @param status              stored for later use by this resolver
 * @param trafficControl      stored for later use by this resolver
 * @param synchronousResolver stored for later use by this resolver
 * @throws IllegalArgumentException if {@code bufferSize} exceeds 2^30, the largest
 *                                  power of two representable as an {@code int}
 */
public AsyncReadResolver(int maxThreads,int bufferSize,
                         TxnSupplier txnSupplier,
                         RollForwardStatus status,
                         TrafficControl trafficControl,
                         KeyedReadResolver synchronousResolver){
    // Validate before allocating anything: above 2^30 the doubling loop below would
    // overflow int (1<<31 is negative, the next shift gives 0) and never terminate.
    final int maxBufferSize=1<<30;
    if(bufferSize>maxBufferSize){
        throw new IllegalArgumentException(
                "bufferSize must not exceed "+maxBufferSize+" but was "+bufferSize);
    }

    this.txnSupplier=txnSupplier;
    this.trafficControl=trafficControl;
    this.status=status;
    this.synchronousResolver = synchronousResolver;
    consumerThreads=new ThreadPoolExecutor(maxThreads,maxThreads,
            60,TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new ThreadFactoryBuilder().setNameFormat("readResolver-%d").setDaemon(true).build());

    // Round up to the next power of two for the ring buffer capacity.
    int bSize=1;
    while(bSize<bufferSize){
        bSize<<=1;
    }
    disruptor=new Disruptor<>(new ResolveEventFactory(),bSize,consumerThreads,
            ProducerType.MULTI,
            new BlockingWaitStrategy()); //we want low latency here, but it might cost too much in CPU
    disruptor.handleEventsWith(new ResolveEventHandler());
    ringBuffer=disruptor.getRingBuffer();
}
项目:spliceengine    文件:TransactionResolver.java   
/**
 * Creates the resolver, its consumer thread pool and a started multi-producer
 * disruptor whose ring buffer holds at least {@code bufferSize} entries.
 * <p>
 * The requested size is rounded up to the next power of two (sizes below 1
 * yield a buffer of 1). Core threads of the pool are allowed to time out.
 *
 * @param txnSupplier stored for later use by this resolver
 * @param numThreads  size passed to the consumer thread pool
 * @param bufferSize  minimum ring buffer capacity; must not exceed 2^30
 * @throws IllegalArgumentException if {@code bufferSize} exceeds 2^30, the largest
 *                                  power of two representable as an {@code int}
 */
public TransactionResolver(TxnSupplier txnSupplier, int numThreads, int bufferSize) {
    // Validate before allocating anything: above 2^30 the doubling loop below would
    // overflow int (1<<31 is negative, the next shift gives 0) and never terminate.
    final int maxBufferSize = 1 << 30;
    if (bufferSize > maxBufferSize) {
        throw new IllegalArgumentException(
                "bufferSize must not exceed " + maxBufferSize + " but was " + bufferSize);
    }

    this.txnSupplier = txnSupplier;
    this.consumerThreads = MoreExecutors.namedThreadPool(numThreads, numThreads, "txn-resolve-%d", 60, true);
    this.consumerThreads.allowCoreThreadTimeOut(true);

    // Round up to the next power of two for the ring buffer capacity.
    int bSize = 1;
    while (bSize < bufferSize) {
        bSize <<= 1;
    }

    disruptor = new Disruptor<>(new EventFactory<TxnResolveEvent>() {
        @Override
        public TxnResolveEvent newInstance() {
            return new TxnResolveEvent();
        }
    }, bSize, consumerThreads,
            ProducerType.MULTI,
            new BlockingWaitStrategy());
    disruptor.handleEventsWith(new ResolveEventHandler());
    ringBuffer = disruptor.getRingBuffer();
    disruptor.start();
}