Java 类com.lmax.disruptor.dsl.ProducerType 实例源码

项目:eds    文件:PublishManager.java   
private void initDisruptor(int processors, int ringBufferSize) {
  LOG.info("eds client init disruptor with processors="
      + processors + " and ringBufferSize=" + ringBufferSize);

  executor = Executors.newFixedThreadPool(
      processors,
      new ThreadFactoryBuilder().setNameFormat("disruptor-executor-%d").build());

  final WaitStrategy waitStrategy = createWaitStrategy();
  ringBufferSize = sizeFor(ringBufferSize); // power of 2
  disruptor = new Disruptor<>(EdsRingBufferEvent.FACTORY, ringBufferSize, executor,
      ProducerType.MULTI, waitStrategy);

  EdsEventWorkHandler[] handlers = new EdsEventWorkHandler[processors];
  for (int i = 0; i < handlers.length; i++) {
    handlers[i] = new EdsEventWorkHandler();
  }
  // handlers number = threads number
  disruptor.handleEventsWithWorkerPool(handlers); // "handleEventsWith" just like topics , with multiple consumers

  disruptor.start();
}
项目:message-broker    文件:SharedMessageStore.java   
@SuppressWarnings("unchecked")
public SharedMessageStore(MessageDao messageDao, int bufferSize, int maxDbBatchSize) {

    pendingMessages = new ConcurrentHashMap<>();
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
            .setNameFormat("DisruptorMessageStoreThread-%d").build();

    disruptor = new Disruptor<>(DbOperation.getFactory(),
            bufferSize, namedThreadFactory, ProducerType.MULTI, new SleepingWaitStrategy());

    disruptor.setDefaultExceptionHandler(new LogExceptionHandler());

    disruptor.handleEventsWith(new DbEventMatcher(bufferSize))
            .then(new DbWriter(messageDao, maxDbBatchSize))
            .then((EventHandler<DbOperation>) (event, sequence, endOfBatch) -> event.clear());
    disruptor.start();
    this.messageDao = messageDao;
}
项目:nuls    文件:DisruptorUtil.java   
/**
 * create a disruptor
 *
 * @param name           The title of the disruptor
 * @param ringBufferSize The size of ringBuffer
 */
public void createDisruptor(String name, int ringBufferSize) {
    if (DISRUPTOR_MAP.keySet().contains(name)) {
        throw new NulsRuntimeException(ErrorCode.FAILED, "create disruptor faild,the name is repetitive!");
    }

    Disruptor<DisruptorEvent> disruptor = new Disruptor<DisruptorEvent>(EVENT_FACTORY,
            ringBufferSize, new NulsThreadFactory(ModuleService.getInstance().getModuleId(EventBusModuleBootstrap.class),name), ProducerType.SINGLE,
            new SleepingWaitStrategy());
    disruptor.handleEventsWith(new EventHandler<DisruptorEvent>() {
        @Override
        public void onEvent(DisruptorEvent disruptorEvent, long l, boolean b) throws Exception {
            Log.debug(disruptorEvent.getData() + "");
        }
    });
    DISRUPTOR_MAP.put(name, disruptor);
}
项目:disruptor-code-analysis    文件:MultiProducerWithTranslator.java   
public static void main(String[] args) throws InterruptedException
{
    Disruptor<ObjectBox> disruptor = new Disruptor<ObjectBox>(
        ObjectBox.FACTORY, RING_SIZE, Executors.newCachedThreadPool(), ProducerType.MULTI,
        new BlockingWaitStrategy());
    disruptor.handleEventsWith(new Consumer()).then(new Consumer());
    final RingBuffer<ObjectBox> ringBuffer = disruptor.getRingBuffer();
    Publisher p = new Publisher();
    IMessage message = new IMessage();
    ITransportable transportable = new ITransportable();
    String streamName = "com.lmax.wibble";
    System.out.println("publishing " + RING_SIZE + " messages");
    for (int i = 0; i < RING_SIZE; i++)
    {
        ringBuffer.publishEvent(p, message, transportable, streamName);
        Thread.sleep(10);
    }
    System.out.println("start disruptor");
    disruptor.start();
    System.out.println("continue publishing");
    while (true)
    {
        ringBuffer.publishEvent(p, message, transportable, streamName);
        Thread.sleep(10);
    }
}
项目:HeliosStreams    文件:RingBufferService.java   
@SuppressWarnings("unchecked")
private RingBufferService(final Properties config) {
    config.put(KafkaProducerService.CONFIG_PREFIX + "value.serializer", ByteBufSerde.ByteBufSerializer.class.getName());
    config.put(KafkaProducerService.CONFIG_PREFIX + "key.serializer", Serdes.String().serializer());
    producer = KafkaProducerService.getInstance(config);
    bufferSize = ConfigurationHelper.getIntSystemThenEnvProperty(RB_CONF_BUFFER_SIZE, RB_DEFAULT_BUFFER_SIZE, config);
    targetTopic = ConfigurationHelper.getSystemThenEnvProperty(RB_CONF_TOPIC_NAME, RB_DEFAULT_TOPIC_NAME, config);
    eventBuffInitSize = ConfigurationHelper.getIntSystemThenEnvProperty(RB_CONF_BUFF_INITIAL_SIZE, RB_DEFAULT_BUFF_INITIAL_SIZE, config);
    shutdownTimeout = ConfigurationHelper.getIntSystemThenEnvProperty(RB_CONF_STOP_TIMEOUT, RB_DEFAULT_STOP_TIMEOUT, config);
    waitStrategy = RBWaitStrategy.getConfiguredStrategy(config);
    disruptor = new Disruptor<ByteBuf>(this, bufferSize, threadFactory, ProducerType.MULTI, waitStrategy);
    disruptor.handleEventsWith(this);   // FIXME: need to able to supply several handlers
    disruptor.start();
    ringBuffer = disruptor.getRingBuffer();
    log.info("<<<<< RawRingBufferDispatcher Started.");     
}
项目:camunda-bpm-reactor    文件:Environment.java   
private void initDispatcherFactoryFromConfiguration(String name) {
  if (dispatcherFactories.get(name) != null) {
    return;
  }
  for (DispatcherConfiguration dispatcherConfiguration : configuration.getDispatcherConfigurations()) {

    if (!dispatcherConfiguration.getName()
      .equalsIgnoreCase(name)) {
      continue;
    }

    if (DispatcherType.DISPATCHER_GROUP == dispatcherConfiguration.getType()) {
      addCachedDispatchers(dispatcherConfiguration.getName(),
        createDispatcherFactory(dispatcherConfiguration.getName(),
          dispatcherConfiguration.getSize() == 0 ? PROCESSORS : dispatcherConfiguration.getSize(),
          dispatcherConfiguration.getBacklog(),
          null,
          ProducerType.MULTI,
          new AgileWaitingStrategy()));
    }
  }
}
项目:camunda-bpm-reactor    文件:RingBufferProcessor.java   
private RingBufferProcessor(String name,
                            ExecutorService executor,
                            int bufferSize,
                            WaitStrategy waitStrategy,
                            boolean shared,
                            boolean autoCancel) {
  super(name, executor, autoCancel);

  this.ringBuffer = RingBuffer.create(
    shared ? ProducerType.MULTI : ProducerType.SINGLE,
    new EventFactory<MutableSignal<E>>() {
      @Override
      public MutableSignal<E> newInstance() {
        return new MutableSignal<E>();
      }
    },
    bufferSize,
    waitStrategy
  );

  this.recentSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
  this.barrier = ringBuffer.newBarrier();
  //ringBuffer.addGatingSequences(recentSequence);
}
项目:camunda-bpm-reactor    文件:RingBufferWorkProcessor.java   
private RingBufferWorkProcessor(String name,
                                ExecutorService executor,
                                int bufferSize,
                                WaitStrategy waitStrategy,
                                boolean share,
                                boolean autoCancel) {
  super(name, executor, autoCancel);

  this.ringBuffer = RingBuffer.create(
    share ? ProducerType.MULTI : ProducerType.SINGLE,
    new EventFactory<MutableSignal<E>>() {
      @Override
      public MutableSignal<E> newInstance() {
        return new MutableSignal<E>();
      }
    },
    bufferSize,
    waitStrategy
  );

  ringBuffer.addGatingSequences(workSequence);

}
项目:ddth-queue    文件:QndDisruptor2.java   
@SuppressWarnings("unchecked")
static void qndSingleThread(int numItems) {
    final AtomicLong COUNTER_RECEIVED = new AtomicLong(0);
    final Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent.FACTORY, 128,
            Executors.defaultThreadFactory(), ProducerType.MULTI, new YieldingWaitStrategy());
    disruptor.handleEventsWith(
            (event, sequence, endOfBatch) -> COUNTER_RECEIVED.incrementAndGet());
    disruptor.start();

    final long t = System.currentTimeMillis();
    for (int i = 0; i < numItems; i++) {
        disruptor.publishEvent((event, seq) -> event.set(seq));
    }
    long d = System.currentTimeMillis() - t;
    NumberFormat nf = NumberFormat.getInstance();
    System.out.println("========== qndSingleThread:");
    System.out.println("Sent: " + nf.format(numItems) + " / Received: "
            + nf.format(COUNTER_RECEIVED.get()) + " / Duration: " + d + " / Speed: "
            + NumberFormat.getInstance().format((numItems * 1000.0 / d)) + " items/sec");

    disruptor.shutdown();
}
项目:jstorm-0.9.6.3-    文件:DisruptorQueueImpl.java   
public DisruptorQueueImpl(String queueName, ProducerType producerType,
        int bufferSize, WaitStrategy wait) {
    this._queueName = PREFIX + queueName;
    _buffer = RingBuffer.create(producerType, new ObjectEventFactory(),
            bufferSize, wait);
    _consumer = new Sequence();
    _barrier = _buffer.newBarrier();
    _buffer.addGatingSequences(_consumer);
    if (producerType == ProducerType.SINGLE) {
        consumerStartedFlag = true;
    } else {
        // make sure we flush the pending messages in cache first
        if (bufferSize < 2) {
            throw new RuntimeException("QueueSize must >= 2");
        }
        try {
            publishDirect(FLUSH_CACHE, true);
        } catch (InsufficientCapacityException e) {
            throw new RuntimeException("This code should be unreachable!",
                    e);
        }
    }
}
项目:jstorm-0.9.6.3-    文件:RingBuffer.java   
/**
 * Create a new Ring Buffer with the specified producer type (SINGLE or MULTI)
 *
 * @param producerType producer type to use {@link ProducerType}.
 * @param factory used to create events within the ring buffer.
 * @param bufferSize number of elements to create within the ring buffer.
 * @param waitStrategy used to determine how to wait for new elements to become available.
 * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
 */
public static <E> RingBuffer<E> create(ProducerType    producerType,
                                       EventFactory<E> factory,
                                       int             bufferSize,
                                       WaitStrategy    waitStrategy)
{
    switch (producerType)
    {
    case SINGLE:
        return createSingleProducer(factory, bufferSize, waitStrategy);
    case MULTI:
        return createMultiProducer(factory, bufferSize, waitStrategy);
    default:
        throw new IllegalStateException(producerType.toString());
    }
}
项目:silverflash    文件:EventReactor.java   
/**
 * Start dispatching events
 * 
 * @return a Future that asyncrhonously notifies an observer when this EventReactor is ready
 */
@SuppressWarnings("unchecked")
public CompletableFuture<? extends EventReactor<T>> open() {
  if (isRunning.compareAndSet(false, true)) {
    CompletableFuture<EventReactor<T>> future = new CompletableFuture<>();
    this.disruptor =
        new Disruptor<>(EVENT_FACTORY, ringSize, threadFactory, ProducerType.MULTI,
            new BusySpinWaitStrategy());
    this.disruptor.handleEventsWith(this::handleEvent);
    this.ringBuffer = disruptor.start();

    // Starts a timer thread
    this.timer = new Timer();
    future.complete(this);
    return future;
  } else {
    return CompletableFuture.completedFuture(this);
  }
}
项目:learn_jstorm    文件:DisruptorQueueImpl.java   
public DisruptorQueueImpl(String queueName, ProducerType producerType,
        int bufferSize, WaitStrategy wait) {
    this._queueName = PREFIX + queueName;
    _buffer = RingBuffer.create(producerType, new ObjectEventFactory(),
            bufferSize, wait);
    _consumer = new Sequence();
    _barrier = _buffer.newBarrier();
    _buffer.addGatingSequences(_consumer);
    if (producerType == ProducerType.SINGLE) {
        consumerStartedFlag = true;
    } else {
        // make sure we flush the pending messages in cache first
        if (bufferSize < 2) {
            throw new RuntimeException("QueueSize must >= 2");
        }
        try {
            publishDirect(FLUSH_CACHE, true);
        } catch (InsufficientCapacityException e) {
            throw new RuntimeException("This code should be unreachable!",
                    e);
        }
    }
}
项目:learn_jstorm    文件:RingBuffer.java   
/**
 * Create a new Ring Buffer with the specified producer type (SINGLE or MULTI)
 *
 * @param producerType producer type to use {@link ProducerType}.
 * @param factory used to create events within the ring buffer.
 * @param bufferSize number of elements to create within the ring buffer.
 * @param waitStrategy used to determine how to wait for new elements to become available.
 * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
 */
public static <E> RingBuffer<E> create(ProducerType    producerType,
                                       EventFactory<E> factory,
                                       int             bufferSize,
                                       WaitStrategy    waitStrategy)
{
    switch (producerType)
    {
    case SINGLE:
        return createSingleProducer(factory, bufferSize, waitStrategy);
    case MULTI:
        return createMultiProducer(factory, bufferSize, waitStrategy);
    default:
        throw new IllegalStateException(producerType.toString());
    }
}
项目:jstrom    文件:DisruptorQueueImpl.java   
public DisruptorQueueImpl(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
    this._queueName = PREFIX + queueName;
    _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
    _consumer = new Sequence();
    _barrier = _buffer.newBarrier();
    _buffer.addGatingSequences(_consumer);
    if (producerType == ProducerType.SINGLE) {
        consumerStartedFlag = true;
    } else {
        // make sure we flush the pending messages in cache first
        if (bufferSize < 2) {
            throw new RuntimeException("QueueSize must >= 2");
        }
        try {
            publishDirect(FLUSH_CACHE, true);
        } catch (InsufficientCapacityException e) {
            throw new RuntimeException("This code should be unreachable!", e);
        }
    }
}
项目:jstrom    文件:DisruptorTest.java   
@Test
public void testLaterStartConsumer() throws InterruptedException {
    System.out.println("!!!!!!!!!!!!!!!Begin testLaterStartConsumer!!!!!!!!!!");
    final AtomicBoolean messageConsumed = new AtomicBoolean(false);

    // Set queue length to 1, so that the RingBuffer can be easily full
    // to trigger consumer blocking
    DisruptorQueue queue = createQueue("consumerHang", ProducerType.MULTI, 2);
    push(queue, 1);
    Runnable producer = new Producer(queue);
    Runnable consumer = new Consumer(queue, new EventHandler<Object>() {
        long count = 0;

        @Override
        public void onEvent(Object obj, long sequence, boolean endOfBatch) throws Exception {

            messageConsumed.set(true);
            System.out.println("Consume " + count++);
        }
    });

    run(producer, 0, 0, consumer, 50);
    Assert.assertTrue("disruptor message is never consumed due to consumer thread hangs", messageConsumed.get());

    System.out.println("!!!!!!!!!!!!!!!!End testLaterStartConsumer!!!!!!!!!!");
}
项目:jstrom    文件:DisruptorTest.java   
@Test
public void testSingleProducer() throws InterruptedException {
    System.out.println("!!!!!!!!!!!!!!Begin testSingleProducer!!!!!!!!!!!!!!");
    final AtomicBoolean messageConsumed = new AtomicBoolean(false);

    // Set queue length to 1, so that the RingBuffer can be easily full
    // to trigger consumer blocking
    DisruptorQueue queue = createQueue("consumerHang", ProducerType.SINGLE, 1);
    push(queue, 1);
    Runnable producer = new Producer(queue);
    Runnable consumer = new Consumer(queue, new EventHandler<Object>() {
        long count = 0;

        @Override
        public void onEvent(Object obj, long sequence, boolean endOfBatch) throws Exception {

            messageConsumed.set(true);
            System.out.println("Consume " + count++);
        }
    });

    run(producer, 0, 0, consumer, 50);
    Assert.assertTrue("disruptor message is never consumed due to consumer thread hangs", messageConsumed.get());

    System.out.println("!!!!!!!!!!!!!!End testSingleProducer!!!!!!!!!!!!!!");
}
项目:Scaled-ML    文件:AbstractParallelModule.java   
@Provides
@Singleton
@Named("disruptor")
@SuppressWarnings("Unchecked")
protected Disruptor<TwoPhaseEvent<T>> inputDisruptor(Provider<WorkHandler<TwoPhaseEvent<T>>> workHandlerProvider,
                                                     Provider<EventHandler<TwoPhaseEvent<T>>> evenHandlerProvider) {
    Disruptor<TwoPhaseEvent<T>> disruptor = new Disruptor<>(
            TwoPhaseEvent.factory(outputEventFactory()),
            options.ringSize(), threadsProvider,
            ProducerType.SINGLE, new SleepingWaitStrategy());
    WorkHandler<TwoPhaseEvent<T>>[] parsers = new WorkHandler[options.threads()];
    for (int i = 0; i < options.threads(); i++) {
        parsers[i] = workHandlerProvider.get();
    }
    disruptor.handleExceptionsWith(new FatalExceptionHandler());
    disruptor.handleEventsWithWorkerPool(parsers).then(evenHandlerProvider.get());
    return disruptor;
}
项目:Scaled-ML    文件:FeatureEngineeringModule.java   
@Provides
@Singleton
@Named("firstPassDisruptor")
public Disruptor<TwoPhaseEvent<Void>> firstPassDisruptor(
        Provider<WorkHandler<TwoPhaseEvent<Void>>> statisticsHandlerProvider) {
    Disruptor<TwoPhaseEvent<Void>> disruptor = new Disruptor<>(
            TwoPhaseEvent.factory(() -> null),
            options.ringSize(), threadsProvider,
            ProducerType.SINGLE, new SleepingWaitStrategy());
    WorkHandler<TwoPhaseEvent<Void>>[] parsers = new WorkHandler[options.threads()];
    for (int i = 0; i < options.threads(); i++) {
        parsers[i] = statisticsHandlerProvider.get();
    }
    disruptor.handleExceptionsWith(new FatalExceptionHandler());
    disruptor.handleEventsWithWorkerPool(parsers);
    return disruptor;
}
项目:Scaled-ML    文件:FeatureEngineeringModule.java   
@Provides
@Singleton
@Named("secondPassDisruptor")
public Disruptor<TwoPhaseEvent<SparseItem>> secondPassDisruptor(
        Provider<WorkHandler<TwoPhaseEvent<SparseItem>>> binningHandlerProvider,
        Provider<EventHandler<TwoPhaseEvent<SparseItem>>> outputHandlerProvider) {
    Disruptor<TwoPhaseEvent<SparseItem>> disruptor = new Disruptor<>(
            TwoPhaseEvent.factory(SparseItem::new),
            options.ringSize(), threadsProvider,
            ProducerType.SINGLE, new SleepingWaitStrategy());
    WorkHandler<TwoPhaseEvent<SparseItem>>[] parsers = new WorkHandler[options.threads()];
    for (int i = 0; i < options.threads(); i++) {
        parsers[i] = binningHandlerProvider.get();
    }
    disruptor.handleExceptionsWith(new FatalExceptionHandler());
    disruptor.handleEventsWithWorkerPool(parsers)
            .then(outputHandlerProvider.get());
    return disruptor;
}
项目:Tstream    文件:DisruptorQueueImpl.java   
public DisruptorQueueImpl(String queueName, ProducerType producerType,
        int bufferSize, WaitStrategy wait) {
    this._queueName = PREFIX + queueName;
    _buffer = RingBuffer.create(producerType, new ObjectEventFactory(),
            bufferSize, wait);
    _consumer = new Sequence();
    _barrier = _buffer.newBarrier();
    _buffer.addGatingSequences(_consumer);
    if (producerType == ProducerType.SINGLE) {
        consumerStartedFlag = true;
    } else {
        // make sure we flush the pending messages in cache first
        if (bufferSize < 2) {
            throw new RuntimeException("QueueSize must >= 2");
        }
        try {
            publishDirect(FLUSH_CACHE, true);
        } catch (InsufficientCapacityException e) {
            throw new RuntimeException("This code should be unreachable!",
                    e);
        }
    }
}
项目:Tstream    文件:RingBuffer.java   
/**
 * Create a new Ring Buffer with the specified producer type (SINGLE or MULTI)
 *
 * @param producerType producer type to use {@link ProducerType}.
 * @param factory used to create events within the ring buffer.
 * @param bufferSize number of elements to create within the ring buffer.
 * @param waitStrategy used to determine how to wait for new elements to become available.
 * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
 */
public static <E> RingBuffer<E> create(ProducerType    producerType,
                                       EventFactory<E> factory,
                                       int             bufferSize,
                                       WaitStrategy    waitStrategy)
{
    switch (producerType)
    {
    case SINGLE:
        return createSingleProducer(factory, bufferSize, waitStrategy);
    case MULTI:
        return createMultiProducer(factory, bufferSize, waitStrategy);
    default:
        throw new IllegalStateException(producerType.toString());
    }
}
项目:scala-playground    文件:Main.java   
public static void main(String[] args) {
  ExecutorService executor = Executors.newCachedThreadPool();
  int bufferSize = 1024;

  WaitStrategy ws = new BlockingWaitStrategy();
  Disruptor<CpuUsageEvent> disruptor = new Disruptor<>(factory, bufferSize, executor, ProducerType.SINGLE, ws);
  disruptor.handleEventsWith(handler);
  RingBuffer<CpuUsageEvent> ringBuffer = disruptor.start();

  publishEvents(ringBuffer);

  disruptor.shutdown();
  executor.shutdown();
}
项目:util4j    文件:FixedThreadPoolQueuesExecutor_mina_disruptor.java   
@SuppressWarnings("unchecked")
protected void initDis()
   {
    disruptorExecutor = new ThreadPoolExecutor(1,1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),new NamedThreadFactory("disruptor"));
       int bufferSize = 1024*1024;
       disruptor = new Disruptor<>(new QueueEventFactory(), bufferSize, disruptorExecutor,ProducerType.MULTI,YIELDING_WAIT);
       //注册消费者
       disruptor.handleEventsWithWorkerPool(new QueueEventHandler());
       disruptor.start();
   }
项目:hashsdn-controller    文件:DOMNotificationRouter.java   
@SuppressWarnings("unchecked")
private DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) {
    this.executor = Preconditions.checkNotNull(executor);

    disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, queueDepth, executor, ProducerType.MULTI, strategy);
    disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS);
    disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE);
    disruptor.start();
}
项目:jaf-examples    文件:DisruptorPublisher.java   
public DisruptorPublisher(int bufferSize, TestHandler handler) {
      this.handler = new HelloEventHandler(handler);

      EventFactory<HelloEvent> eventFactory = new HelloEventFactory();
int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必须是 2 的 N 次方;

      executor = Executors.newSingleThreadExecutor();
      disruptor = new Disruptor<HelloEvent>(
        eventFactory, ringBufferSize, Executors.defaultThreadFactory(),
        ProducerType.SINGLE, YIELDING_WAIT);
  }
项目:jaf-examples    文件:Main.java   
public static void main(String[] args) {
    EventFactory<HelloEvent> eventFactory = new HelloEventFactory();
    int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必须是 2 的 N 次方;

    Disruptor<HelloEvent> disruptor = new Disruptor<HelloEvent>(
            eventFactory, ringBufferSize, Executors.defaultThreadFactory(),
            ProducerType.SINGLE, new YieldingWaitStrategy());

    EventHandler<HelloEvent> eventHandler = new HelloEventHandler();
    disruptor.handleEventsWith(eventHandler, eventHandler);

    disruptor.start();

}
项目:Okra-Ax    文件:LogicProcessor.java   
public LogicProcessor(EventFactory<ConcurrentEvent> factory, EventHandler<ConcurrentEvent> handler, int rbSize, ExecutorService es, ProducerType pt, WaitStrategy ws) {
    this.disruptor = new Disruptor<>(factory, rbSize, es, pt, ws);
    this.disruptor.handleEventsWith(handler);
    //  disruptor.handleExceptionsWith();
    this.disruptor.start();
    this.msgQueue = newQueue();
}
项目:Okra-Ax    文件:DisruptorAdapterHandler.java   
@Override
        protected Disruptor<ConcurrentEvent> initialValue() {
            Disruptor<ConcurrentEvent> disruptor = new Disruptor<>(
                    ConcurrentEventFactory.DEFAULT, DEFAULT_RING_BUFFER_SIZE, CACHED_THREAD_POOL, ProducerType.SINGLE, new BlockingWaitStrategy());
            disruptor.handleEventsWith(new ConcurrentHandler());
//            disruptor.handleExceptionsWith();
            disruptor.start();
            return disruptor;
        }
项目:Okra-Ax    文件:DisruptorAdapterBy41xHandler.java   
@Override
        protected Disruptor<ConcurrentEvent> initialValue() {
            Disruptor<ConcurrentEvent> disruptor = new Disruptor<>(
                    ConcurrentEventFactory.DEFAULT, DEFAULT_RING_BUFFER_SIZE, CACHED_THREAD_POOL, ProducerType.SINGLE, new BlockingWaitStrategy());
            disruptor.handleEventsWith(new ConcurrentHandler());
//            disruptor.handleExceptionsWith();
            disruptor.start();
            return disruptor;
        }
项目:disruptor-code-analysis    文件:RingBuffer.java   
/**
 * Create a new Ring Buffer with the specified producer type (SINGLE or MULTI)
 *
 * @param producerType producer type to use {@link ProducerType}.
 * @param factory      used to create events within the ring buffer.
 * @param bufferSize   number of elements to create within the ring buffer.
 * @param waitStrategy used to determine how to wait for new elements to become available.
 * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
 */
public static <E> RingBuffer<E> create(ProducerType producerType, EventFactory<E> factory, int bufferSize,
                                       WaitStrategy waitStrategy) {
    switch (producerType) {
        case SINGLE:
            return createSingleProducer(factory, bufferSize, waitStrategy);
        case MULTI:
            return createMultiProducer(factory, bufferSize, waitStrategy);
        default:
            throw new IllegalStateException(producerType.toString());
    }
}
项目:disruptor-code-analysis    文件:OneToOneTranslatorThroughputTest.java   
@SuppressWarnings("unchecked")
public OneToOneTranslatorThroughputTest()
{
    Disruptor<ValueEvent> disruptor =
        new Disruptor<ValueEvent>(
            ValueEvent.EVENT_FACTORY,
            BUFFER_SIZE, executor,
            ProducerType.SINGLE,
            new YieldingWaitStrategy());
    disruptor.handleEventsWith(handler);
    this.ringBuffer = disruptor.start();
}
项目:disruptor-code-analysis    文件:SequencerTest.java   
@Parameters
public static Collection<Object[]> generateData()
{
    Object[][] allocators =
        {
            {ProducerType.SINGLE, new BlockingWaitStrategy()},
            {ProducerType.MULTI, new BlockingWaitStrategy()},
        };
    return Arrays.asList(allocators);
}
项目:disruptor-code-analysis    文件:SequencerTest.java   
private Sequencer newProducer(ProducerType producerType, int bufferSize, WaitStrategy waitStrategy)
{
    switch (producerType)
    {
        case SINGLE:
            return new SingleProducerSequencer(bufferSize, waitStrategy);
        case MULTI:
            return new MultiProducerSequencer(bufferSize, waitStrategy);
        default:
            throw new IllegalStateException(producerType.toString());
    }
}
项目:disruptor-code-analysis    文件:ShutdownOnFatalExceptionTest.java   
@SuppressWarnings("unchecked")
@Before
public void setUp()
{
    disruptor = new Disruptor<byte[]>(
        new ByteArrayFactory(256), 1024, Executors.newCachedThreadPool(), ProducerType.SINGLE,
        new BlockingWaitStrategy());
    disruptor.handleEventsWith(eventHandler);
    disruptor.setDefaultExceptionHandler(new FatalExceptionHandler());
}
项目:opencensus-java    文件:DisruptorEventQueue.java   
@SuppressWarnings({"deprecation", "unchecked", "varargs"})
private DisruptorEventQueue() {
  // Create new Disruptor for processing. Note that this uses a single thread for processing; this
  // ensures that the event handler can take unsynchronized actions whenever possible.
  disruptor =
      new Disruptor<DisruptorEvent>(
          new DisruptorEventFactory(),
          DISRUPTOR_BUFFER_SIZE,
          Executors.newSingleThreadExecutor(new DaemonThreadFactory("OpenCensus.Disruptor")),
          ProducerType.MULTI,
          new SleepingWaitStrategy());
  disruptor.handleEventsWith(new DisruptorEventHandler());
  disruptor.start();
  ringBuffer = disruptor.getRingBuffer();
}
项目:HeliosStreams    文件:MetricsMetaAPIImpl.java   
/**
 * Creates a new MetricsMetaAPIImpl
 * @param properties The configuration properties
 */
public MetricsMetaAPIImpl(final Properties properties) {
    dataSource = SQLCompilerDataSource.getInstance(properties);
    sqlWorker = dataSource.getSQLWorker();
    tagPredicateCache = new TagPredicateCache(sqlWorker);
    fjPool = new ManagedForkJoinPool(getClass().getSimpleName(), Runtime.getRuntime().availableProcessors(), true, JMXHelper.objectName(getClass()));
    metaReader = new DefaultMetaReader(sqlWorker);
    dispatcher = new WorkQueueDispatcher("MetricsMetaDispatcher", Runtime.getRuntime().availableProcessors(), 1024, this, ProducerType.MULTI, new LiteBlockingWaitStrategy());
    log.info("Dispatcher Alive: {}", dispatcher.alive());
}
项目:Electrons    文件:Dispatcher.java   
/**
 * 根据config初始化特殊通道
 *
 * @param symbol    事件
 * @param listeners 对应的监听器集合
 */
private void initSpecDisruptor(String symbol, List<ElectronsListener> listeners) {
    ExecutorService specPool = Executors.newFixedThreadPool(conf.getSpecCircuitNum(), new ThreadFactory() {

        final AtomicInteger cursor = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "Electrons Thread (from spec channel) : thread" + cursor.incrementAndGet());
        }
    });
    pools.add(specPool);

    Disruptor<ElectronsHolder> disruptor = new Disruptor<>(ElectronsHolder::new, conf.getSpecCircuitLen(), specPool, ProducerType.MULTI, new LiteBlockingWaitStrategy());
    disruptor.handleExceptionsWith(new ElecExceptionHandler("Spec Disruptor {" + symbol + "}"));

    //初始化管道并放入集合中
    SpecChannel specChannel = new SpecChannel(disruptor);
    if (conf.isBreaker()) {
        EventCountCircuitBreaker breaker = new EventCountCircuitBreaker(conf.getErrorNum(), conf.getPerUnit(), conf.getUnit(), conf.getCloseThreshold(), conf.getRest(), conf.getRestUnit());
        specChannel.setBreaker(breaker);
    }

    //构建listener顺序
    ListenerChainBuilderNew.buildChain(specChannel, listeners);

    channelMap.put(SPEC_CHANNEL_PREFIX + symbol, specChannel);
}
项目:Electrons    文件:Dispatcher.java   
/**
 * 初始化正常管道,任何情况下都会有
 *
 * @param pool 线程池
 */
private void initNormalChannel(ExecutorService pool) {
    Disruptor<ElectronsHolder> normalDis = new Disruptor<>(ElectronsHolder::new, conf.getCircuitLen(), pool, ProducerType.MULTI, new LiteBlockingWaitStrategy());
    WorkHandler[] workHandlers = new WorkHandler[conf.getCircuitNum()];
    Arrays.fill(workHandlers, (WorkHandler<ElectronsHolder>) electronsHolder -> electronsHolder.handle());
    normalDis.handleEventsWithWorkerPool(workHandlers);
    normalDis.handleExceptionsWith(new ElecExceptionHandler("Normal Disruptor"));

    //初始化channel
    Channel normalChannel = new NormalChannel(normalDis);
    //配置限流相关
    normalChannel.confLimitRate(conf.isLimitRate(), conf.getPermitsPerSecond(), conf.isWarmup(), conf.getWarmupPeriod(), conf.getWarmPeriodUnit());
    channelMap.put(NORMAL_CHANNEL_KEY, normalChannel);
}
项目:disruptorDemo    文件:Demo3.java   
public static void main(String[] args) throws InterruptedException {
        long beginTime=System.currentTimeMillis();

        int bufferSize=1024;
        ExecutorService executor=Executors.newFixedThreadPool(4);
        //������캯�����������������˽�����2��demo֮��Ϳ��¾������ˣ���������~
        Disruptor<TradeTransaction> disruptor=new Disruptor<TradeTransaction>(new EventFactory<TradeTransaction>() {
            @Override
            public TradeTransaction newInstance() {
                return new TradeTransaction();
            }
        }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());

        //ʹ��disruptor������������C1,C2
        EventHandlerGroup<TradeTransaction> handlerGroup=disruptor.handleEventsWith(new TradeTransactionVasConsumer(),new TradeTransactionInDBHandler());

        TradeTransactionJMSNotifyHandler jmsConsumer=new TradeTransactionJMSNotifyHandler();
        //������C1,C2����֮��ִ��JMS��Ϣ���Ͳ��� Ҳ���������ߵ�C3
        handlerGroup.then(jmsConsumer);


        disruptor.start();//����
        CountDownLatch latch=new CountDownLatch(1);
        //������׼��
        executor.submit(new TradeTransactionPublisher(latch, disruptor));
        latch.await();//�ȴ�����������.
        disruptor.shutdown();
        executor.shutdown();

        System.out.println("�ܺ�ʱ:"+(System.currentTimeMillis()-beginTime));
//      long tt= System.currentTimeMillis();
//      for (int i = 0; i < 1000; i++) {
//          int j=i;
//      }
//      System.out.println("�ܺ�ʱ:"+(System.currentTimeMillis()-tt));

    }