Java 类com.lmax.disruptor.WaitStrategy 实例源码

项目:eds    文件:PublishManager.java   
/**
 * Creates the executor and the multi-producer disruptor, attaches one work handler per
 * processor as a worker pool, and starts the disruptor.
 *
 * @param processors     number of executor threads and of work handlers
 * @param ringBufferSize requested ring buffer size; passed through {@code sizeFor} before use
 */
private void initDisruptor(int processors, int ringBufferSize) {
  LOG.info("eds client init disruptor with processors="
      + processors + " and ringBufferSize=" + ringBufferSize);

  // Fixed pool: handlers number = threads number.
  executor = Executors.newFixedThreadPool(
      processors,
      new ThreadFactoryBuilder().setNameFormat("disruptor-executor-%d").build());

  final WaitStrategy strategy = createWaitStrategy();
  // Per the original note, sizeFor yields a power of 2.
  final int normalizedSize = sizeFor(ringBufferSize);
  disruptor = new Disruptor<>(
      EdsRingBufferEvent.FACTORY, normalizedSize, executor, ProducerType.MULTI, strategy);

  final EdsEventWorkHandler[] workers = new EdsEventWorkHandler[processors];
  int slot = 0;
  while (slot < workers.length) {
    workers[slot] = new EdsEventWorkHandler();
    slot++;
  }

  // Worker pool on purpose; the original author notes that "handleEventsWith" would behave
  // like topics, with multiple consumers.
  disruptor.handleEventsWithWorkerPool(workers);
  disruptor.start();
}
项目:incubator-omid    文件:DisruptorModule.java   
/**
 * Binds the three named {@link WaitStrategy} implementations according to the configured
 * wait-strategy enum, then binds each processor interface to its implementation as a singleton.
 */
@Override
protected void configure() {
    final Class<? extends WaitStrategy> persistenceStrategy;
    final Class<? extends WaitStrategy> replyStrategy;
    final Class<? extends WaitStrategy> retryStrategy;

    switch (config.getWaitStrategyEnum()) {
    case LOW_CPU:
        // A low-cpu usage Disruptor configuration for using in local/test environments
        persistenceStrategy = BlockingWaitStrategy.class;
        replyStrategy = BlockingWaitStrategy.class;
        retryStrategy = BlockingWaitStrategy.class;
        break;
    case HIGH_THROUGHPUT:
    default:
        // The default high-cpu usage Disruptor configuration for getting high throughput on
        // production environments
        persistenceStrategy = BusySpinWaitStrategy.class;
        replyStrategy = BusySpinWaitStrategy.class;
        retryStrategy = YieldingWaitStrategy.class;
        break;
    }

    bind(WaitStrategy.class).annotatedWith(Names.named("PersistenceStrategy")).to(persistenceStrategy);
    bind(WaitStrategy.class).annotatedWith(Names.named("ReplyStrategy")).to(replyStrategy);
    bind(WaitStrategy.class).annotatedWith(Names.named("RetryStrategy")).to(retryStrategy);

    bind(RequestProcessor.class).to(RequestProcessorImpl.class).in(Singleton.class);
    bind(PersistenceProcessor.class).to(PersistenceProcessorImpl.class).in(Singleton.class);
    bind(ReplyProcessor.class).to(ReplyProcessorImpl.class).in(Singleton.class);
    bind(RetryProcessor.class).to(RetryProcessorImpl.class).in(Singleton.class);
}
项目:camunda-bpm-reactor    文件:RingBufferProcessor.java   
/**
 * Builds the processor's ring buffer, its initial sequence and its barrier.
 *
 * @param name         forwarded to the superclass constructor
 * @param executor     forwarded to the superclass constructor
 * @param bufferSize   ring buffer size handed to {@code RingBuffer.create}
 * @param waitStrategy wait strategy handed to {@code RingBuffer.create}
 * @param shared       {@code true} selects {@code ProducerType.MULTI}, otherwise {@code SINGLE}
 * @param autoCancel   forwarded to the superclass constructor
 */
private RingBufferProcessor(String name,
                            ExecutorService executor,
                            int bufferSize,
                            WaitStrategy waitStrategy,
                            boolean shared,
                            boolean autoCancel) {
  super(name, executor, autoCancel);

  final ProducerType producerType = shared ? ProducerType.MULTI : ProducerType.SINGLE;
  final EventFactory<MutableSignal<E>> signalFactory = new EventFactory<MutableSignal<E>>() {
    @Override
    public MutableSignal<E> newInstance() {
      return new MutableSignal<E>();
    }
  };
  this.ringBuffer = RingBuffer.create(producerType, signalFactory, bufferSize, waitStrategy);

  this.recentSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
  this.barrier = ringBuffer.newBarrier();
  // NOTE: recentSequence is not registered as a gating sequence here; the original
  // ringBuffer.addGatingSequences(recentSequence) call is left disabled.
}
项目:camunda-bpm-reactor    文件:RingBufferWorkProcessor.java   
/**
 * Builds the work processor's ring buffer and registers {@code workSequence} as a gating
 * sequence on it.
 *
 * @param name         forwarded to the superclass constructor
 * @param executor     forwarded to the superclass constructor
 * @param bufferSize   ring buffer size handed to {@code RingBuffer.create}
 * @param waitStrategy wait strategy handed to {@code RingBuffer.create}
 * @param share        {@code true} selects {@code ProducerType.MULTI}, otherwise {@code SINGLE}
 * @param autoCancel   forwarded to the superclass constructor
 */
private RingBufferWorkProcessor(String name,
                                ExecutorService executor,
                                int bufferSize,
                                WaitStrategy waitStrategy,
                                boolean share,
                                boolean autoCancel) {
  super(name, executor, autoCancel);

  final ProducerType producerType = share ? ProducerType.MULTI : ProducerType.SINGLE;
  final EventFactory<MutableSignal<E>> signalFactory = new EventFactory<MutableSignal<E>>() {
    @Override
    public MutableSignal<E> newInstance() {
      return new MutableSignal<E>();
    }
  };

  this.ringBuffer = RingBuffer.create(producerType, signalFactory, bufferSize, waitStrategy);
  ringBuffer.addGatingSequences(workSequence);
}
项目:jstorm-0.9.6.3-    文件:DisruptorQueueImpl.java   
/**
 * Creates the queue's ring buffer, a consumer sequence registered as gating sequence, and a
 * barrier. Single-producer queues are flagged as consumer-started immediately; other queues
 * require a buffer of at least 2 slots and get a FLUSH_CACHE entry published right away.
 *
 * @param queueName    name suffix; stored prefixed with PREFIX
 * @param producerType producer type handed to {@code RingBuffer.create}
 * @param bufferSize   ring buffer size
 * @param wait         wait strategy handed to {@code RingBuffer.create}
 */
public DisruptorQueueImpl(String queueName, ProducerType producerType,
        int bufferSize, WaitStrategy wait) {
    this._queueName = PREFIX + queueName;

    _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
    _consumer = new Sequence();
    _barrier = _buffer.newBarrier();
    _buffer.addGatingSequences(_consumer);

    if (producerType == ProducerType.SINGLE) {
        consumerStartedFlag = true;
        return;
    }

    // make sure we flush the pending messages in cache first
    if (bufferSize < 2) {
        throw new RuntimeException("QueueSize must >= 2");
    }
    try {
        publishDirect(FLUSH_CACHE, true);
    } catch (InsufficientCapacityException e) {
        throw new RuntimeException("This code should be unreachable!", e);
    }
}
项目:jstorm-0.9.6.3-    文件:RingBuffer.java   
/**
 * Create a new Ring Buffer, dispatching on the producer type (SINGLE or MULTI) to the
 * matching single- or multi-producer factory method.
 *
 * @param <E> type of the events held by the ring buffer.
 * @param producerType producer type to use {@link ProducerType}.
 * @param factory used to create events within the ring buffer.
 * @param bufferSize number of elements to create within the ring buffer.
 * @param waitStrategy used to determine how to wait for new elements to become available.
 * @return the ring buffer built by the selected factory method.
 * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
 * @throws IllegalStateException if producerType is neither SINGLE nor MULTI
 */
public static <E> RingBuffer<E> create(ProducerType    producerType,
                                       EventFactory<E> factory,
                                       int             bufferSize,
                                       WaitStrategy    waitStrategy)
{
    final RingBuffer<E> ringBuffer;
    switch (producerType)
    {
    case SINGLE:
        ringBuffer = createSingleProducer(factory, bufferSize, waitStrategy);
        break;
    case MULTI:
        ringBuffer = createMultiProducer(factory, bufferSize, waitStrategy);
        break;
    default:
        throw new IllegalStateException(producerType.toString());
    }
    return ringBuffer;
}
项目:learn_jstorm    文件:DisruptorQueueImpl.java   
/**
 * Sets up the ring buffer, consumer sequence (added as gating sequence) and barrier for this
 * queue. With a SINGLE producer the consumer-started flag is set at once; otherwise the buffer
 * must hold at least 2 entries and FLUSH_CACHE is published directly.
 *
 * @param queueName    name suffix; stored prefixed with PREFIX
 * @param producerType producer type handed to {@code RingBuffer.create}
 * @param bufferSize   ring buffer size
 * @param wait         wait strategy handed to {@code RingBuffer.create}
 */
public DisruptorQueueImpl(String queueName, ProducerType producerType,
        int bufferSize, WaitStrategy wait) {
    this._queueName = PREFIX + queueName;
    _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
    _consumer = new Sequence();
    _barrier = _buffer.newBarrier();
    _buffer.addGatingSequences(_consumer);

    if (producerType != ProducerType.SINGLE) {
        // make sure we flush the pending messages in cache first
        if (bufferSize < 2) {
            throw new RuntimeException("QueueSize must >= 2");
        }
        try {
            publishDirect(FLUSH_CACHE, true);
        } catch (InsufficientCapacityException e) {
            throw new RuntimeException("This code should be unreachable!", e);
        }
    } else {
        consumerStartedFlag = true;
    }
}
项目:learn_jstorm    文件:RingBuffer.java   
/**
 * Create a new Ring Buffer for the given producer type (SINGLE or MULTI) by delegating to
 * the corresponding single- or multi-producer factory method.
 *
 * @param <E> type of the events held by the ring buffer.
 * @param producerType producer type to use {@link ProducerType}.
 * @param factory used to create events within the ring buffer.
 * @param bufferSize number of elements to create within the ring buffer.
 * @param waitStrategy used to determine how to wait for new elements to become available.
 * @return the ring buffer built by the selected factory method.
 * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
 * @throws IllegalStateException if producerType is neither SINGLE nor MULTI
 */
public static <E> RingBuffer<E> create(ProducerType    producerType,
                                       EventFactory<E> factory,
                                       int             bufferSize,
                                       WaitStrategy    waitStrategy)
{
    final RingBuffer<E> created;
    switch (producerType)
    {
    case SINGLE:
        created = createSingleProducer(factory, bufferSize, waitStrategy);
        break;
    case MULTI:
        created = createMultiProducer(factory, bufferSize, waitStrategy);
        break;
    default:
        throw new IllegalStateException(producerType.toString());
    }
    return created;
}
项目:jstrom    文件:DisruptorQueueImpl.java   
public DisruptorQueueImpl(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
    this._queueName = PREFIX + queueName;
    _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
    _consumer = new Sequence();
    _barrier = _buffer.newBarrier();
    _buffer.addGatingSequences(_consumer);
    if (producerType == ProducerType.SINGLE) {
        consumerStartedFlag = true;
    } else {
        // make sure we flush the pending messages in cache first
        if (bufferSize < 2) {
            throw new RuntimeException("QueueSize must >= 2");
        }
        try {
            publishDirect(FLUSH_CACHE, true);
        } catch (InsufficientCapacityException e) {
            throw new RuntimeException("This code should be unreachable!", e);
        }
    }
}
项目:Tstream    文件:DisruptorQueueImpl.java   
/**
 * Initialises the queue: ring buffer, consumer sequence added as gating sequence, and
 * barrier. For SINGLE producers the consumer-started flag is set; for other producer types
 * the buffer must have at least 2 slots and FLUSH_CACHE is published directly.
 *
 * @param queueName    name suffix; stored prefixed with PREFIX
 * @param producerType producer type handed to {@code RingBuffer.create}
 * @param bufferSize   ring buffer size
 * @param wait         wait strategy handed to {@code RingBuffer.create}
 */
public DisruptorQueueImpl(String queueName, ProducerType producerType,
        int bufferSize, WaitStrategy wait) {
    this._queueName = PREFIX + queueName;
    _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
    _consumer = new Sequence();
    _barrier = _buffer.newBarrier();
    _buffer.addGatingSequences(_consumer);

    if (producerType != ProducerType.SINGLE) {
        // make sure we flush the pending messages in cache first
        if (bufferSize < 2) {
            throw new RuntimeException("QueueSize must >= 2");
        }
        try {
            publishDirect(FLUSH_CACHE, true);
        } catch (InsufficientCapacityException e) {
            throw new RuntimeException("This code should be unreachable!", e);
        }
    } else {
        consumerStartedFlag = true;
    }
}
项目:Tstream    文件:RingBuffer.java   
/**
 * Create a new Ring Buffer, choosing the single- or multi-producer factory method from the
 * specified producer type (SINGLE or MULTI).
 *
 * @param <E> type of the events held by the ring buffer.
 * @param producerType producer type to use {@link ProducerType}.
 * @param factory used to create events within the ring buffer.
 * @param bufferSize number of elements to create within the ring buffer.
 * @param waitStrategy used to determine how to wait for new elements to become available.
 * @return the ring buffer built by the selected factory method.
 * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
 * @throws IllegalStateException if producerType is neither SINGLE nor MULTI
 */
public static <E> RingBuffer<E> create(ProducerType    producerType,
                                       EventFactory<E> factory,
                                       int             bufferSize,
                                       WaitStrategy    waitStrategy)
{
    final RingBuffer<E> result;
    switch (producerType)
    {
    case SINGLE:
        result = createSingleProducer(factory, bufferSize, waitStrategy);
        break;
    case MULTI:
        result = createMultiProducer(factory, bufferSize, waitStrategy);
        break;
    default:
        throw new IllegalStateException(producerType.toString());
    }
    return result;
}
项目:smart-cqrs    文件:EventBus.java   
/**
 * Builds and starts a single-producer disruptor for one event class.
 *
 * <p>ORDER events get a single {@code BaseEventHandler} wrapping all registered handlers in the
 * order produced by {@code orderingRegistedEventHandlers}; CONCURRENCY events get one
 * {@code BaseEventHandler} per registered handler.
 *
 * @param eventClass            event class used to look up buffer size and event type
 * @param registedEventHandlers handlers registered for the event class
 * @return the started disruptor
 * @throws RuntimeException if the event type is neither ORDER nor CONCURRENCY
 */
protected Disruptor<BaseEvent> createDisruptor(Class<?> eventClass, List<RegistedEventHandler> registedEventHandlers) {
    final WaitStrategy blockingStrategy = new BlockingWaitStrategy();
    // load the customized event bufferSize.
    final int ringSize = EventUtils.getEventBufferSize(eventClass);
    final Disruptor<BaseEvent> result = new Disruptor<BaseEvent>(
            new BaseEventFactory(),
            ringSize,
            Executors.newCachedThreadPool(),
            ProducerType.SINGLE,
            blockingStrategy);

    final List<BaseEventHandler> wrappers = Lists.newArrayList();
    final EventType kind = EventUtils.getEventType(eventClass);
    if (EventType.ORDER.equals(kind)) {
        final List<RegistedEventHandler> ordered = orderingRegistedEventHandlers(registedEventHandlers);
        final RegistedEventHandler[] orderedArray = ordered.toArray(new RegistedEventHandler[0]);
        wrappers.add(new BaseEventHandler(EventType.ORDER, orderedArray));
    } else if (EventType.CONCURRENCY.equals(kind)) {
        for (final RegistedEventHandler handler : registedEventHandlers) {
            wrappers.add(new BaseEventHandler(EventType.CONCURRENCY, handler));
        }
    } else {
        throw new RuntimeException("The definition of event type is not correct.");
    }

    final BaseEventHandler[] wrapperArray = wrappers.toArray(new BaseEventHandler[0]);
    result.handleEventsWith(wrapperArray);
    result.start();
    return result;
}
项目:log4j2    文件:AsyncLogger.java   
/**
 * Chooses the wait strategy from the {@code AsyncLogger.WaitStrategy} system property.
 *
 * <p>{@code "Yield"} and {@code "Block"} select the yielding and blocking strategies. Every
 * other value, including {@code "Sleep"} and an unset property, selects the sleeping strategy.
 *
 * @return a new wait strategy instance
 */
private static WaitStrategy createWaitStrategy() {
    final String configured = System.getProperty("AsyncLogger.WaitStrategy");
    LOGGER.debug("property AsyncLogger.WaitStrategy={}", configured);

    if ("Yield".equals(configured)) {
        LOGGER.debug("disruptor event handler uses YieldingWaitStrategy");
        return new YieldingWaitStrategy();
    }
    if ("Block".equals(configured)) {
        LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
        return new BlockingWaitStrategy();
    }

    // "Sleep", a missing property and any unrecognised value all end up here.
    LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
    return new SleepingWaitStrategy();
}
项目:log4j2    文件:AsyncLoggerConfigHelper.java   
/**
 * Creates and starts the shared disruptor unless one already exists.
 *
 * <p>When a disruptor is already present the call only logs and returns. Otherwise it builds a
 * single-thread executor, a multi-producer disruptor with one {@code Log4jEventWrapperHandler},
 * installs the exception handler, and starts it.
 */
private static synchronized void initDisruptor() {
    if (disruptor != null) {
        LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor, using existing object. Ref count is {}.", count);
        return;
    }
    LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor. Ref count is {}.", count);

    final int bufferSize = calculateRingBufferSize();
    final WaitStrategy strategy = createWaitStrategy();
    executor = Executors.newSingleThreadExecutor(threadFactory);
    disruptor = new Disruptor<Log4jEventWrapper>(
            FACTORY, bufferSize, executor, ProducerType.MULTI, strategy);

    final EventHandler<Log4jEventWrapper>[] eventHandlers =
            new Log4jEventWrapperHandler[] { new Log4jEventWrapperHandler() };
    final ExceptionHandler exceptionHandler = getExceptionHandler();
    // The exception handler is installed before the event handlers are registered.
    disruptor.handleExceptionsWith(exceptionHandler);
    disruptor.handleEventsWith(eventHandlers);

    final int actualSize = disruptor.getRingBuffer().getBufferSize();
    final String strategyName = strategy.getClass().getSimpleName();
    LOGGER.debug(
            "Starting AsyncLoggerConfig disruptor with ringbuffer size={}, waitStrategy={}, exceptionHandler={}...",
            actualSize, strategyName, exceptionHandler);
    disruptor.start();
}
项目:jstorm    文件:DisruptorQueueImpl.java   
public DisruptorQueueImpl(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait, boolean isBatch, int batchSize, long flushMs) {
    _queueName = PREFIX + queueName;
    _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
    _consumer = new Sequence();
    _barrier = _buffer.newBarrier();
    _buffer.addGatingSequences(_consumer);
    _isBatch = isBatch;
    _cache = new ArrayList<>();
    _inputBatchSize = batchSize;
    if (_isBatch) {
        _batcher = new ThreadLocalBatch();
        _flusher = new DisruptorFlusher(Math.max(flushMs, 1));
        _flusher.start();
    } else {
        _batcher = null;
    }
}
项目:jstorm    文件:NettyUnitTest.java   
/**
 * Binds a server connection on the given port with a control queue and registers a receive
 * queue for {@code task} on it.
 *
 * @param port port to bind the server on
 * @return the bound server connection
 */
private IConnection initNettyServer(int port) {
    final ConcurrentHashMap<Integer, DisruptorQueue> deserializeQueues =
            new ConcurrentHashMap<Integer, DisruptorQueue>();

    // Control queue: wait strategy instantiated by class name with arguments (5, MILLISECONDS).
    final WaitStrategy controlWait = (WaitStrategy) Utils.newInstance(
            "com.lmax.disruptor.TimeoutBlockingWaitStrategy", 5, TimeUnit.MILLISECONDS);
    final DisruptorQueue controlQueue = DisruptorQueue.mkInstance(
            "Dispatch-control", ProducerType.MULTI, 256, controlWait, false, 0, 0);

    final Set<Integer> tasks = new HashSet<Integer>();
    tasks.add(1);
    final IConnection server = context.bind(null, port, deserializeQueues, controlQueue, true, tasks);

    // Data queue for the test task.
    final DisruptorQueue dataQueue = DisruptorQueue.mkInstance(
            "NettyUnitTest", ProducerType.SINGLE, 1024, new BlockingWaitStrategy(), false, 0, 0);
    server.registerQueue(task, dataQueue);

    return server;
}
项目:hashsdn-controller    文件:DOMNotificationRouter.java   
/**
 * Creates the router's multi-producer disruptor, chains the dispatch handler followed by the
 * future-notification handler, and starts it.
 *
 * @param executor   executor handed to the disruptor; must not be null
 * @param queueDepth ring buffer size
 * @param strategy   wait strategy for the disruptor
 */
@SuppressWarnings("unchecked")
private DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) {
    this.executor = Preconditions.checkNotNull(executor);

    this.disruptor = new Disruptor<>(
            DOMNotificationRouterEvent.FACTORY, queueDepth, this.executor, ProducerType.MULTI, strategy);

    // Stage 1 dispatches notifications; stage 2 runs only after stage 1.
    this.disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS);
    this.disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE);

    this.disruptor.start();
}
项目:hashsdn-controller    文件:DOMNotificationRouter.java   
/**
 * Creates a router backed by a cached thread pool and a lock-based phased-backoff wait strategy.
 *
 * @param queueDepth ring buffer size; must be a positive power of two
 * @param spinTime   first timeout forwarded to {@code PhasedBackoffWaitStrategy.withLock}
 * @param parkTime   second timeout forwarded to {@code PhasedBackoffWaitStrategy.withLock}
 * @param unit       time unit of {@code spinTime} and {@code parkTime}
 * @return a new router
 * @throws IllegalArgumentException if {@code queueDepth} is not a positive power of two
 */
public static DOMNotificationRouter create(final int queueDepth, final long spinTime, final long parkTime, final TimeUnit unit) {
    // Zero has no bits set, so lowestOneBit and highestOneBit are both 0 and the bit comparison
    // alone would accept it; reject it here, before any executor is allocated.
    Preconditions.checkArgument(
            queueDepth > 0 && Long.lowestOneBit(queueDepth) == Long.highestOneBit(queueDepth),
            "Queue depth %s is not power-of-two", queueDepth);
    final ExecutorService executor = Executors.newCachedThreadPool();
    final WaitStrategy strategy = PhasedBackoffWaitStrategy.withLock(spinTime, parkTime, unit);

    return new DOMNotificationRouter(executor, queueDepth, strategy);
}
项目:incubator-omid    文件:ReplyProcessorImpl.java   
/**
 * Sets up the reply disruptor and the processor's bookkeeping state.
 *
 * <p>The disruptor runs on a single-thread executor (threads named {@code reply-%d}), uses a
 * ring of {@code 1 << 12} slots with the MULTI producer type, and has this object registered
 * as its event handler.
 *
 * @param strategy  wait strategy injected under the name "ReplyStrategy"
 * @param metrics   registry used to create the tso meters
 * @param panicker  passed to the {@code FatalExceptionHandler} installed on the disruptor
 * @param batchPool stored for later use by this processor
 */
@Inject
ReplyProcessorImpl(@Named("ReplyStrategy") WaitStrategy strategy,
        MetricsRegistry metrics, Panicker panicker, ObjectPool<Batch> batchPool) {

    // ------------------------------------------------------------------------------------------------------------
    // Disruptor initialization
    // ------------------------------------------------------------------------------------------------------------

    ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("reply-%d");
    this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory.build());

    this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, strategy);
    // Exception handler is installed before the event handler is registered.
    disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
    // NOTE(review): `this` is handed to the disruptor and the disruptor is started before the
    // fields below are assigned -- confirm no event can reach the handler before construction ends.
    disruptor.handleEventsWith(this);
    this.replyRing = disruptor.start();

    // ------------------------------------------------------------------------------------------------------------
    // Attribute initialization
    // ------------------------------------------------------------------------------------------------------------

    this.batchPool = batchPool;
    this.nextIDToHandle.set(0);
    // Pending reply batches, ordered by ascending batch sequence.
    this.futureEvents = new PriorityQueue<>(10, new Comparator<ReplyBatchEvent>() {
        public int compare(ReplyBatchEvent replyBatchEvent1, ReplyBatchEvent replyBatchEvent2) {
            return Long.compare(replyBatchEvent1.getBatchSequence(), replyBatchEvent2.getBatchSequence());
        }
    });

    // Metrics config
    this.abortMeter = metrics.meter(name("tso", "aborts"));
    this.commitMeter = metrics.meter(name("tso", "commits"));
    this.timestampMeter = metrics.meter(name("tso", "timestampAllocation"));
    this.fenceMeter = metrics.meter(name("tso", "fences"));

    LOG.info("ReplyProcessor initialized");

}
项目:incubator-omid    文件:RetryProcessorImpl.java   
/**
 * Sets up the retry disruptor, the commit table client and the retry meters.
 *
 * <p>The disruptor runs on a single-thread executor (threads named {@code retry-%d}), uses a
 * ring of {@code 1 << 12} slots with the SINGLE producer type, and has this object registered
 * as its event handler.
 *
 * @param strategy    wait strategy injected under the name "RetryStrategy"
 * @param metrics     registry used to create the retry meters
 * @param commitTable source of the commit table client
 * @param replyProc   reply processor stored for later use
 * @param panicker    passed to the {@code FatalExceptionHandler} installed on the disruptor
 * @param batchPool   stored for later use by this processor
 * @throws InterruptedException declared by this constructor; the throwing call is not evident
 *                              from this block
 * @throws ExecutionException   declared by this constructor; the throwing call is not evident
 *                              from this block
 * @throws IOException          declared by this constructor; presumably from obtaining the commit
 *                              table client -- TODO confirm
 */
@Inject
RetryProcessorImpl(@Named("RetryStrategy") WaitStrategy strategy,
                   MetricsRegistry metrics,
                   CommitTable commitTable,
                   ReplyProcessor replyProc,
                   Panicker panicker,
                   ObjectPool<Batch> batchPool)
        throws InterruptedException, ExecutionException, IOException {

    // ------------------------------------------------------------------------------------------------------------
    // Disruptor initialization
    // ------------------------------------------------------------------------------------------------------------

    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("retry-%d").build();
    this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory);

    this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, SINGLE, strategy);
    disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This must be before handleEventsWith()
    // NOTE(review): `this` is handed to the disruptor and the disruptor is started before the
    // fields below are assigned -- confirm no event can reach the handler before construction ends.
    disruptor.handleEventsWith(this);
    this.retryRing = disruptor.start();

    // ------------------------------------------------------------------------------------------------------------
    // Attribute initialization
    // ------------------------------------------------------------------------------------------------------------

    this.commitTableClient = commitTable.getClient();
    this.replyProc = replyProc;
    this.batchPool = batchPool;

    // Metrics configuration
    this.txAlreadyCommittedMeter = metrics.meter(name("tso", "retries", "commits", "tx-already-committed"));
    this.invalidTxMeter = metrics.meter(name("tso", "retries", "aborts", "tx-invalid"));
    this.noCTFoundMeter = metrics.meter(name("tso", "retries", "aborts", "tx-without-commit-timestamp"));

    LOG.info("RetryProcessor initialized");

}
项目:disruptor-code-analysis    文件:Disruptor.java   
/**
 * Create a new Disruptor.
 *
 * <p>Builds the ring buffer with {@code RingBuffer.create} and wraps the thread factory in a
 * {@code BasicExecutor}, then delegates to the constructor taking a ring buffer and an executor.
 *
 * @param eventFactory   the factory to create events in the ring buffer.
 * @param ringBufferSize the size of the ring buffer, must be power of 2.
 * @param threadFactory  a {@link ThreadFactory} to create threads for processors.
 * @param producerType   the claim strategy to use for the ring buffer.
 * @param waitStrategy   the wait strategy to use for the ring buffer.
 */
public Disruptor(
        final EventFactory<T> eventFactory,
        final int ringBufferSize,
        final ThreadFactory threadFactory,
        final ProducerType producerType,
        final WaitStrategy waitStrategy) {
    this(RingBuffer.create(
            producerType, eventFactory, ringBufferSize, waitStrategy),
            new BasicExecutor(threadFactory));
}
项目:vulcan    文件:AvroWriterBuilder.java   
/**
 * Sets the wait strategy for this builder. A {@code null} argument is ignored with a warning
 * and leaves the current strategy untouched.
 *
 * @param waitStrategy strategy to use, or {@code null} to keep the current one
 * @return this builder
 */
@Override
public OptionalSteps withWaitStrategy(WaitStrategy waitStrategy) {
  if (waitStrategy == null) {
    log.warn("Tried to configure the waiting strategy with a null value");
    return this;
  }
  this.waitStrategy = waitStrategy;
  return this;
}
项目:Camel    文件:DisruptorWaitStrategyCreationTest.java   
/**
 * Checks that every {@code DisruptorWaitStrategy} constant creates a non-null
 * {@link WaitStrategy} instance.
 */
@Test
public void testCreateWaitStrategyInstance() throws Exception {
    final DisruptorWaitStrategy[] candidates = DisruptorWaitStrategy.values();
    for (int i = 0; i < candidates.length; i++) {
        final WaitStrategy created = candidates[i].createWaitStrategyInstance();

        assertNotNull(created);
        assertTrue(created instanceof WaitStrategy);
    }
}
项目:camunda-bpm-reactor    文件:Environment.java   
/**
 * Stores the dispatcher configuration and allocates one (initially empty) dispatcher slot per
 * pool entry. The supplier starts out not terminated.
 *
 * @param poolsize     number of dispatcher slots
 * @param name         stored name
 * @param bufferSize   stored buffer size
 * @param errorHandler stored error handler
 * @param producerType stored producer type
 * @param waitStrategy stored wait strategy
 */
public RoundRobinSupplier(int poolsize,
                          String name,
                          int bufferSize,
                          Consumer<Throwable> errorHandler,
                          ProducerType producerType,
                          WaitStrategy waitStrategy) {
  // Identity and sizing.
  this.name = name;
  this.poolsize = poolsize;
  this.bufferSize = bufferSize;

  // Disruptor-related settings.
  this.producerType = producerType;
  this.waitStrategy = waitStrategy;
  this.errorHandler = errorHandler;

  // Runtime state.
  this.dispatchers = new Dispatcher[poolsize];
  this.terminated = false;
}
项目:jstorm-0.9.6.3-    文件:Worker.java   
/**
 * Creates the "Dispatch" receive queue, binds the worker's receive connection, registers the
 * queue on it and wraps a {@code VirtualPortDispatch} in an {@code AsyncLoopThread}.
 *
 * @return the dispatch loop thread
 */
private AsyncLoopThread startDispatchThread() {
    final Map conf = workerData.getStormConf();

    // Buffer size falls back to 1024 when not configured.
    final int transferBufferSize = Utils.getInt(
            conf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE), 1024);
    final String strategyClass = (String) conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY);
    final WaitStrategy strategy = (WaitStrategy) Utils.newInstance(strategyClass);
    final DisruptorQueue dispatchQueue = DisruptorQueue.mkInstance(
            "Dispatch", ProducerType.MULTI, transferBufferSize, strategy);
    // NOTE: consumerStarted() is deliberately not called on the queue here
    // (the original call is left disabled).

    final IContext messagingContext = workerData.getContext();
    final String topology = workerData.getTopologyId();
    final IConnection receiver = messagingContext.bind(topology, workerData.getPort());
    receiver.registerQueue(dispatchQueue);

    final RunnableCallback dispatcher =
            new VirtualPortDispatch(workerData, receiver, dispatchQueue);

    return new AsyncLoopThread(dispatcher, false, Thread.MAX_PRIORITY, false);
}
项目:jstorm-0.9.6.3-    文件:Task.java   
/**
 * Creates the single-producer "TaskDeserialize" queue for this task and stores it in
 * {@code deserializeQueues} under the task id.
 *
 * @return the newly created queue
 */
public DisruptorQueue registerDisruptorQueue() {
    // Receive buffer size falls back to 256 when not configured.
    final int receiveBufferSize = JStormUtils.parseInt(
            stormConf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE), 256);

    final String strategyClass = (String) stormConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY);
    final WaitStrategy strategy = (WaitStrategy) Utils.newInstance(strategyClass);

    final DisruptorQueue deserializeQueue = DisruptorQueue.mkInstance(
            "TaskDeserialize", ProducerType.SINGLE, receiveBufferSize, strategy);
    deserializeQueues.put(taskid, deserializeQueue);

    return deserializeQueue;
}
项目:jstorm-0.9.6.3-    文件:TaskTransfer.java   
/**
 * Wires up the task's transfer state: stores the collaborators, creates the multi-producer
 * serialize queue, registers the queue and timer metrics, and creates the serialize loop thread.
 *
 * @param taskName   task name; the part after the first ":" is used as the task id for metrics
 * @param serializer tuple serializer stored for later use
 * @param taskStatus task status stored for later use
 * @param workerData source of configuration, transfer queue and inner-task transfer map
 */
public TaskTransfer(String taskName, 
        KryoTupleSerializer serializer, TaskStatus taskStatus,
        WorkerData workerData) {
    this.taskName = taskName;
    this.serializer = serializer;
    this.taskStatus = taskStatus;
    this.storm_conf = workerData.getConf();
    this.transferQueue = workerData.getTransferQueue();
    this.innerTaskTransfer = workerData.getInnerTaskTransfer();

    final int sendBufferSize = Utils.getInt(
            storm_conf.get(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE));
    final String strategyClass = (String) storm_conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY);
    final WaitStrategy strategy = (WaitStrategy) Utils.newInstance(strategyClass);
    this.serializeQueue = DisruptorQueue.mkInstance(
            taskName, ProducerType.MULTI, sendBufferSize, strategy);
    this.serializeQueue.consumerStarted();

    // Task id is whatever follows the first ':' (the whole name when there is no ':').
    final int separator = taskName.indexOf(":");
    final String taskId = taskName.substring(separator + 1);
    Metrics.registerQueue(taskName, MetricDef.SERIALIZE_QUEUE, serializeQueue, taskId, Metrics.MetricType.TASK);
    timer = Metrics.registerTimer(taskName, MetricDef.SERIALIZE_TIME, taskId, Metrics.MetricType.TASK);

    serializeThread = new AsyncLoopThread(new TransferRunnable());
    LOG.info("Successfully start TaskTransfer thread");
}
项目:jstorm-0.9.6.3-    文件:NettyUnitTest.java   
/**
 * Sends one short message from a client to a locally bound server and checks that the server
 * receives the same text. Both connections are closed even when the test fails, so the port is
 * not left bound.
 */
@Test
public void test_small_message() {
    System.out.println("!!!!!!!!Start test_small_message !!!!!!!!!!!");
    String req_msg = "Aloha is the most Hawaiian word.";

    IConnection server = null;
    IConnection client = null;
    try {
        server = context.bind(null, port);

        WaitStrategy waitStrategy = (WaitStrategy) Utils
                .newInstance((String) storm_conf
                        .get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY));
        DisruptorQueue recvQueue = DisruptorQueue.mkInstance(
                "NettyUnitTest", ProducerType.SINGLE, 1024, waitStrategy);
        server.registerQueue(recvQueue);

        client = context.connect(null, "localhost", port);

        TaskMessage message = new TaskMessage(task, req_msg.getBytes());
        client.send(message);

        TaskMessage recv = server.recv(0);
        Assert.assertEquals(req_msg, new String(recv.message()));

        System.out.println("!!!!!!!!!!!!!!!!!!Test one time!!!!!!!!!!!!!!!!!");
    } finally {
        // Same close order as before (server, then client); the nested finally makes sure the
        // client is closed even if closing the server throws.
        try {
            if (server != null) {
                server.close();
            }
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }

    System.out.println("!!!!!!!!!!!!End test_small_message!!!!!!!!!!!!!");
}
项目:jstorm-0.9.6.3-    文件:NettyUnitTest.java   
/**
 * Sends one large message (built by {@code setupLargMsg}) from a client to a locally bound
 * server and checks that the server receives the same text. Both connections are closed even
 * when the test fails, so the port is not left bound.
 */
@Test
public void test_large_msg() {
    System.out.println("!!!!!!!!!!start larget message test!!!!!!!!");
    String req_msg = setupLargMsg();
    System.out.println("!!!!Finish batch data, size:" + req_msg.length()
            + "!!!!");

    IConnection server = null;
    IConnection client = null;
    try {
        server = context.bind(null, port);

        WaitStrategy waitStrategy = (WaitStrategy) Utils
                .newInstance((String) storm_conf
                        .get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY));
        DisruptorQueue recvQueue = DisruptorQueue.mkInstance(
                "NettyUnitTest", ProducerType.SINGLE, 1024, waitStrategy);
        server.registerQueue(recvQueue);

        client = context.connect(null, "localhost", port);

        TaskMessage message = new TaskMessage(task, req_msg.getBytes());

        LOG.info("Client send data");
        client.send(message);

        TaskMessage recv = server.recv(0);
        Assert.assertEquals(req_msg, new String(recv.message()));
    } finally {
        // Same close order as before (client, then server); the nested finally makes sure the
        // server is closed even if closing the client throws.
        try {
            if (client != null) {
                client.close();
            }
        } finally {
            if (server != null) {
                server.close();
            }
        }
    }
    System.out.println("!!!!!!!!!!End larget message test!!!!!!!!");
}
项目:jstorm-0.9.6.3-    文件:NettyUnitTest.java   
/**
 * Sends one large message, waits one second before the server reads it, and checks that the
 * server receives the same text. Both connections are closed even when the test fails, so the
 * port is not left bound.
 *
 * @throws InterruptedException if the one-second sleep is interrupted
 */
@Test
public void test_server_delay() throws InterruptedException {
    System.out.println("!!!!!!!!!!Start delay message test!!!!!!!!");
    String req_msg = setupLargMsg();

    IConnection server = null;
    IConnection client = null;
    try {
        server = context.bind(null, port);

        WaitStrategy waitStrategy = (WaitStrategy) Utils
                .newInstance((String) storm_conf
                        .get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY));
        DisruptorQueue recvQueue = DisruptorQueue.mkInstance(
                "NettyUnitTest", ProducerType.SINGLE, 1024, waitStrategy);
        server.registerQueue(recvQueue);

        client = context.connect(null, "localhost", port);

        TaskMessage message = new TaskMessage(task, req_msg.getBytes());

        LOG.info("Client send data");
        client.send(message);
        // Delay the read so the message sits on the server side for a while.
        Thread.sleep(1000);

        TaskMessage recv = server.recv(0);
        Assert.assertEquals(req_msg, new String(recv.message()));
    } finally {
        // Same close order as before (server, then client); the nested finally makes sure the
        // client is closed even if closing the server throws.
        try {
            if (server != null) {
                server.close();
            }
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }
    System.out.println("!!!!!!!!!!End delay message test!!!!!!!!");
}
项目:jstorm-0.9.6.3-    文件:MultiProducerSequencer.java   
/**
 * Construct a Sequencer with the selected wait strategy and buffer size, allocating the
 * per-slot availability array and the index mask/shift derived from the buffer size.
 *
 * @param bufferSize the size of the buffer that this will sequence over.
 * @param waitStrategy for those waiting on sequences.
 */
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
{
    super(bufferSize, waitStrategy);

    // Values derived purely from bufferSize.
    this.indexMask = bufferSize - 1;
    this.indexShift = Util.log2(bufferSize);

    // One availability entry per buffer slot, then initialised by the helper.
    this.availableBuffer = new int[bufferSize];
    initialiseAvailableBuffer();
}
项目:jstorm-0.9.6.3-    文件:DisruptorQueue.java   
/**
 * Creates a queue implementation depending on {@code CAPACITY_LIMITED}: a
 * {@code DisruptorQueueImpl} when it is true, otherwise a {@code DisruptorWrapBlockingQueue}.
 *
 * @param queueName    queue name passed to the implementation
 * @param producerType producer type passed to the implementation
 * @param bufferSize   buffer size passed to the implementation
 * @param wait         wait strategy passed to the implementation
 * @return the new queue
 */
public static DisruptorQueue mkInstance(String queueName,
        ProducerType producerType, int bufferSize, WaitStrategy wait) {
    if (CAPACITY_LIMITED) {
        return new DisruptorQueueImpl(queueName, producerType, bufferSize, wait);
    }
    return new DisruptorWrapBlockingQueue(queueName, producerType, bufferSize, wait);
}
项目:learn_jstorm    文件:Worker.java   
/**
 * Builds the multi-producer "Dispatch" queue, binds the worker's receive connection and
 * registers the queue on it, then returns an {@code AsyncLoopThread} running a
 * {@code VirtualPortDispatch} over that connection and queue.
 *
 * @return the dispatch loop thread
 */
private AsyncLoopThread startDispatchThread() {
    final Map conf = workerData.getStormConf();

    // Buffer size falls back to 1024 when not configured.
    final int bufferSize = Utils.getInt(
            conf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE), 1024);
    final Object strategyName = conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY);
    final WaitStrategy strategy = (WaitStrategy) Utils.newInstance((String) strategyName);
    final DisruptorQueue queue = DisruptorQueue.mkInstance(
            "Dispatch", ProducerType.MULTI, bufferSize, strategy);
    // NOTE: consumerStarted() is deliberately not called on the queue here
    // (the original call is left disabled).

    final IContext ctx = workerData.getContext();
    final String topologyId = workerData.getTopologyId();
    final IConnection connection = ctx.bind(topologyId, workerData.getPort());
    connection.registerQueue(queue);

    final RunnableCallback dispatchCallback =
            new VirtualPortDispatch(workerData, connection, queue);

    return new AsyncLoopThread(dispatchCallback, false, Thread.MAX_PRIORITY, false);
}
项目:learn_jstorm    文件:Task.java   
/**
 * Creates the "TaskDeserialize" queue and stores it in deserializeQueues under taskid.
 *
 * @return the newly created queue
 */
public DisruptorQueue registerDisruptorQueue() {
    // Buffer size from the config; 256 is handed to parseInt as the fallback argument.
    Object configuredSize = stormConf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE);
    int bufferSize = JStormUtils.parseInt(configuredSize, 256);

    String strategyClass = (String) stormConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY);
    WaitStrategy strategy = (WaitStrategy) Utils.newInstance(strategyClass);

    DisruptorQueue deserializeQueue = DisruptorQueue.mkInstance(
            "TaskDeserialize", ProducerType.SINGLE, bufferSize, strategy);
    deserializeQueues.put(taskid, deserializeQueue);
    return deserializeQueue;
}
项目:learn_jstorm    文件:TaskTransfer.java   
/**
 * Wires up a TaskTransfer: copies collaborators from the arguments and from
 * workerData, creates the serialize queue, registers its metrics and creates
 * the AsyncLoopThread around a TransferRunnable.
 *
 * @param taskName   task name; also used as the serialize queue name and metric name
 * @param serializer tuple serializer kept for later use
 * @param taskStatus task status kept for later use
 * @param workerData source of the configuration and the transfer queues
 */
public TaskTransfer(String taskName,
        KryoTupleSerializer serializer, TaskStatus taskStatus,
        WorkerData workerData) {
    this.taskName = taskName;
    this.serializer = serializer;
    this.taskStatus = taskStatus;
    storm_conf = workerData.getConf();
    transferQueue = workerData.getTransferQueue();
    innerTaskTransfer = workerData.getInnerTaskTransfer();

    int bufferSize = Utils.getInt(
            storm_conf.get(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE));
    String strategyClass = (String) storm_conf
            .get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY);
    WaitStrategy strategy = (WaitStrategy) Utils.newInstance(strategyClass);
    serializeQueue = DisruptorQueue.mkInstance(taskName, ProducerType.MULTI,
            bufferSize, strategy);
    serializeQueue.consumerStarted();

    // Task id is everything after the first ':' (the whole name when there is no ':').
    int separator = taskName.indexOf(":");
    String taskId = taskName.substring(separator + 1);
    Metrics.registerQueue(taskName, MetricDef.SERIALIZE_QUEUE, serializeQueue,
            taskId, Metrics.MetricType.TASK);
    timer = Metrics.registerTimer(taskName, MetricDef.SERIALIZE_TIME, taskId,
            Metrics.MetricType.TASK);

    serializeThread = new AsyncLoopThread(new TransferRunnable());
    LOG.info("Successfully start TaskTransfer thread");
}
项目:learn_jstorm    文件:NettyUnitTest.java   
/**
 * Sends one short message from a client connection to a server connection
 * bound on the same port and checks that the payload the server receives
 * equals the payload that was sent.
 */
@Test
public void test_small_message() {
    System.out.println("!!!!!!!!Start test_small_message !!!!!!!!!!!");
    String req_msg = "Aloha is the most Hawaiian word.";

    IConnection server = null;
    IConnection client = null;

    try {
        server = context.bind(null, port);

        WaitStrategy waitStrategy = (WaitStrategy) Utils
                .newInstance((String) storm_conf
                        .get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY));
        DisruptorQueue recvQueue = DisruptorQueue.mkInstance(
                "NettyUnitTest", ProducerType.SINGLE, 1024, waitStrategy);
        server.registerQueue(recvQueue);

        client = context.connect(null, "localhost", port);

        TaskMessage message = new TaskMessage(task, req_msg.getBytes());
        client.send(message);

        TaskMessage recv = server.recv(0);
        Assert.assertEquals(req_msg, new String(recv.message()));

        System.out.println("!!!!!!!!!!!!!!!!!!Test one time!!!!!!!!!!!!!!!!!");
    } finally {
        // Close both connections even when a call or the assertion above
        // throws, so a failing run does not leave them open.
        try {
            if (server != null) {
                server.close();
            }
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }

    System.out.println("!!!!!!!!!!!!End test_small_message!!!!!!!!!!!!!");
}
项目:learn_jstorm    文件:NettyUnitTest.java   
/**
 * Sends the message produced by setupLargMsg() from a client connection to a
 * server connection bound on the same port and checks that the payload the
 * server receives equals the payload that was sent.
 */
@Test
public void test_large_msg() {
    System.out.println("!!!!!!!!!!start larget message test!!!!!!!!");
    String req_msg = setupLargMsg();
    System.out.println("!!!!Finish batch data, size:" + req_msg.length()
            + "!!!!");

    IConnection server = null;
    IConnection client = null;

    try {
        server = context.bind(null, port);

        WaitStrategy waitStrategy = (WaitStrategy) Utils
                .newInstance((String) storm_conf
                        .get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY));
        DisruptorQueue recvQueue = DisruptorQueue.mkInstance(
                "NettyUnitTest", ProducerType.SINGLE, 1024, waitStrategy);
        server.registerQueue(recvQueue);

        client = context.connect(null, "localhost", port);

        TaskMessage message = new TaskMessage(task, req_msg.getBytes());

        LOG.info("Client send data");
        client.send(message);

        TaskMessage recv = server.recv(0);
        Assert.assertEquals(req_msg, new String(recv.message()));
    } finally {
        // Close both connections even when a call or the assertion above
        // throws; the client is closed first, as in the success path.
        try {
            if (client != null) {
                client.close();
            }
        } finally {
            if (server != null) {
                server.close();
            }
        }
    }
    System.out.println("!!!!!!!!!!End larget message test!!!!!!!!");
}
项目:learn_jstorm    文件:NettyUnitTest.java   
/**
 * Sends the message produced by setupLargMsg() from a client to a server,
 * sleeps for one second before the server reads it, and checks that the
 * payload the server receives equals the payload that was sent.
 *
 * @throws InterruptedException if the sleep between send and receive is interrupted
 */
@Test
public void test_server_delay() throws InterruptedException {
    System.out.println("!!!!!!!!!!Start delay message test!!!!!!!!");
    String req_msg = setupLargMsg();

    IConnection server = null;
    IConnection client = null;

    try {
        server = context.bind(null, port);

        WaitStrategy waitStrategy = (WaitStrategy) Utils
                .newInstance((String) storm_conf
                        .get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY));
        DisruptorQueue recvQueue = DisruptorQueue.mkInstance(
                "NettyUnitTest", ProducerType.SINGLE, 1024, waitStrategy);
        server.registerQueue(recvQueue);

        client = context.connect(null, "localhost", port);

        TaskMessage message = new TaskMessage(task, req_msg.getBytes());

        LOG.info("Client send data");
        client.send(message);
        // Delay the read so the message is already sent before recv is called.
        Thread.sleep(1000);

        TaskMessage recv = server.recv(0);
        Assert.assertEquals(req_msg, new String(recv.message()));
    } finally {
        // Close both connections even when a call, the sleep or the assertion
        // above throws, so a failing run does not leave them open.
        try {
            if (server != null) {
                server.close();
            }
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }
    System.out.println("!!!!!!!!!!End delay message test!!!!!!!!");
}
项目:learn_jstorm    文件:MultiProducerSequencer.java   
/**
 * Construct a Sequencer with the selected wait strategy and buffer size.
 *
 * @param bufferSize the size of the buffer that this will sequence over.
 * @param waitStrategy for those waiting on sequences.
 */
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
{
    super(bufferSize, waitStrategy);
    availableBuffer = new int[bufferSize];
    indexMask = bufferSize - 1;
    indexShift = Util.log2(bufferSize);
    initialiseAvailableBuffer();
}
项目:learn_jstorm    文件:DisruptorQueue.java   
/**
 * Factory for queues; the CAPACITY_LIMITED flag decides which implementation
 * is instantiated.
 *
 * @param queueName    name forwarded to the chosen implementation
 * @param producerType producer type forwarded to the chosen implementation
 * @param bufferSize   buffer size forwarded to the chosen implementation
 * @param wait         wait strategy forwarded to the chosen implementation
 * @return a DisruptorQueueImpl when CAPACITY_LIMITED is true, otherwise a
 *         DisruptorWrapBlockingQueue
 */
public static DisruptorQueue mkInstance(String queueName,
        ProducerType producerType, int bufferSize, WaitStrategy wait) {
    final DisruptorQueue created;
    if (CAPACITY_LIMITED) {
        created = new DisruptorQueueImpl(queueName, producerType, bufferSize, wait);
    } else {
        created = new DisruptorWrapBlockingQueue(queueName, producerType, bufferSize, wait);
    }
    return created;
}