Java 类com.lmax.disruptor.EventTranslator 实例源码

项目:util4j    文件:FixedThreadPoolQueuesExecutor_mina_disruptor.java   
/**
 * Publishes one task onto the named queue by way of the disruptor.
 *
 * The translator stamps the claimed {@code QueueEvent} with the queue name and
 * appends the task to the event's task collection.
 *
 * @param queueName name of the queue the task belongs to
 * @param task      task to append to the event
 */
protected void publishQueueTask(final String queueName,final Task task)
    {
        // Takes the place of the earlier hand-written
        // ringBuffer.next() / get() / publish() sequence.
        final EventTranslator<QueueEvent> appendTask = new EventTranslator<QueueEvent>() {
            @Override
            public void translateTo(QueueEvent slot, long seq) {
                slot.setQueueName(queueName);
                slot.getTasks().add(task);
            }
        };
        disruptor.publishEvent(appendTask);
    }
项目:util4j    文件:FixedThreadPoolQueuesExecutor_mina_disruptor.java   
/**
 * Publishes a group of tasks onto the named queue by way of the disruptor.
 *
 * The translator stamps the claimed {@code QueueEvent} with the queue name and
 * adds every task of the list to the event's task collection.
 *
 * @param queueName name of the queue the tasks belong to
 * @param tasks     tasks to add to the event
 */
protected void publishQueueTask(final String queueName,final List<Task> tasks)
    {
        // Takes the place of the earlier hand-written
        // ringBuffer.next() / get() / publish() sequence.
        final EventTranslator<QueueEvent> appendTasks = new EventTranslator<QueueEvent>() {
            @Override
            public void translateTo(QueueEvent slot, long seq) {
                slot.setQueueName(queueName);
                slot.getTasks().addAll(tasks);
            }
        };
        disruptor.publishEvent(appendTasks);
    }
项目:disruptor-code-analysis    文件:DisruptorTest.java   
/**
 * Publishes one event through the disruptor and returns the entry that was handed
 * to the translator.
 *
 * On the first call (while {@code ringBuffer} is still {@code null}) the disruptor is
 * started and {@code awaitStart()} is invoked on every delayed event handler before
 * anything is published.
 *
 * @return the event stored in {@code lastPublishedEvent} by the translator
 * @throws InterruptedException   presumably raised by the start-up wait
 * @throws BrokenBarrierException presumably raised by the start-up wait
 */
private TestEvent publishEvent() throws InterruptedException, BrokenBarrierException
{
    final boolean notYetStarted = ringBuffer == null;
    if (notYetStarted)
    {
        ringBuffer = disruptor.start();

        for (DelayedEventHandler handler : delayedEventHandlers)
        {
            handler.awaitStart();
        }
    }

    // The translator does not modify the entry; it only remembers which one was claimed.
    final EventTranslator<TestEvent> rememberEntry = new EventTranslator<TestEvent>()
    {
        @Override
        public void translateTo(final TestEvent entry, final long seq)
        {
            lastPublishedEvent = entry;
        }
    };
    disruptor.publishEvent(rememberEntry);

    return lastPublishedEvent;
}
项目:jstorm-0.9.6.3-    文件:RingBuffer.java   
/**
 * Runs one translator per already-claimed sequence and then publishes the whole range.
 *
 * The range covers {@code batchSize} sequences ending at {@code finalSequence}; the
 * translator at {@code batchStartsAt} is applied to the first of them.
 *
 * @param translators   source of the translators
 * @param batchStartsAt index of the first translator to use
 * @param batchSize     number of translators / sequences in the batch
 * @param finalSequence highest sequence of the claimed range
 */
private void translateAndPublishBatch(final EventTranslator<E>[] translators, int batchStartsAt,
                                      final int batchSize, final long finalSequence)
{
    final long firstSequence = finalSequence - (batchSize - 1);
    try
    {
        final int batchEndsAt = batchStartsAt + batchSize;
        long sequence = firstSequence;
        for (int index = batchStartsAt; index < batchEndsAt; index++, sequence++)
        {
            translators[index].translateTo(get(sequence), sequence);
        }
    }
    finally
    {
        // Publish the full range even when a translator throws.
        sequencer.publish(firstSequence, finalSequence);
    }
}
项目:learn_jstorm    文件:RingBuffer.java   
/**
 * Applies a slice of translators to a claimed range of sequences, then publishes it.
 *
 * The first sequence is {@code finalSequence - (batchSize - 1)}; translators are taken
 * from {@code batchStartsAt} onwards, one per sequence.
 *
 * @param translators   source of the translators
 * @param batchStartsAt index of the first translator to use
 * @param batchSize     number of translators / sequences in the batch
 * @param finalSequence highest sequence of the claimed range
 */
private void translateAndPublishBatch(final EventTranslator<E>[] translators, int batchStartsAt,
                                      final int batchSize, final long finalSequence)
{
    final long lowestSequence = finalSequence - (batchSize - 1);
    try
    {
        final int limit = batchStartsAt + batchSize;
        int index = batchStartsAt;
        long sequence = lowestSequence;
        while (index < limit)
        {
            final EventTranslator<E> current = translators[index++];
            current.translateTo(get(sequence), sequence++);
        }
    }
    finally
    {
        // The range is published whether or not every translator succeeded.
        sequencer.publish(lowestSequence, finalSequence);
    }
}
项目:Tstream    文件:RingBuffer.java   
/**
 * Fills a claimed block of ring-buffer entries via the given translators and publishes
 * the block afterwards.
 *
 * @param translators   source of the translators
 * @param batchStartsAt index of the first translator to use
 * @param batchSize     number of translators / sequences in the batch
 * @param finalSequence highest sequence of the claimed block
 */
private void translateAndPublishBatch(final EventTranslator<E>[] translators, int batchStartsAt,
                                      final int batchSize, final long finalSequence)
{
    // The block spans batchSize sequences ending at finalSequence.
    final long blockStart = finalSequence - (batchSize - 1);
    try
    {
        final int batchEndsAt = batchStartsAt + batchSize;
        long cursor = blockStart;
        for (int slot = batchStartsAt; slot < batchEndsAt; slot++, cursor++)
        {
            translators[slot].translateTo(get(cursor), cursor);
        }
    }
    finally
    {
        sequencer.publish(blockStart, finalSequence);
    }
}
项目:disruptor-spring-manager    文件:BaseDisruptorConfiguratorTest.java   
/**
 * Builds the configurator under test: an anonymous {@code BaseDisruptorConfig} whose
 * three hook methods are overridden with empty bodies, configured through its setters
 * and then initialised via {@code init()}.
 */
@Before
public void setup(){
    disruptorConfigurator = new BaseDisruptorConfig() {

        // Intentionally empty: no exception handler is set up by this stub.
        @Override
        public void disruptorExceptionHandler() {}

        // Intentionally empty: no event handler is set up by this stub.
        @Override
        public void disruptorEventHandler() {}

        // Intentionally empty: publishing is a no-op for this stub.
        // NOTE(review): EventTranslator is used as a raw type here - confirm against the
        // signature declared by BaseDisruptorConfig.
        @Override
        public void publish(EventTranslator eventTranslator) {}
    };

    disruptorConfigurator.setThreadName(THREAD_NAME);
    disruptorConfigurator.setProducerType(ProducerType.SINGLE);
    disruptorConfigurator.setRingBufferSize(ringBufferSize);
    disruptorConfigurator.setWaitStrategyType(WaitStrategyType.BLOCKING);
    disruptorConfigurator.setEventFactory(new SampleEventFactory());

    // All settings are applied before init() is called.
    disruptorConfigurator.init();
}
项目:disruptor-spring-manager    文件:DefaultDisruptorConfigTest.java   
/**
 * Publisher -> Ring buffer ---> Consumer A
 * Look at the graph that gets printed by log4j.
 *
 * Registers a single handler chain that holds only Consumer A, initialises the
 * configuration and publishes one event.
 */
@Test
public void test_publish_single_eventprocessor_topology() {
    ConsumerA head = new ConsumerA();

    EventHandlerChain<String> onlyChain = new EventHandlerChain<String>(new EventHandler[]{head});
    EventHandlerChain[] topology = new EventHandlerChain[]{onlyChain};

    disruptorConfig.setEventHandlerChain(topology);
    disruptorConfig.init();

    // NOTE(review): the translator only reassigns its own parameter, so the event
    // claimed from the ring buffer is left unchanged.
    EventTranslator<String> greeting = new EventTranslator<String>() {

        @Override
        public void translateTo(String event, long sequence) {
            event = "hi there";
        }
    };
    disruptorConfig.publish(greeting);
}
项目:disruptor-spring-manager    文件:DefaultDisruptorConfigTest.java   
/**
 * Publisher -> Ring buffer ---> Consumer A -> Consumer B1 -> Consumer D
 * Look at the graph that gets printed by log4j.
 *
 * Registers two handler chains (A followed by B1, B1 followed by D), initialises the
 * configuration and publishes one event.
 */
@Test
public void test_publish_simple_eventprocessor_topology() {
    ConsumerA stageA = new ConsumerA();
    ConsumerB1 stageB1 = new ConsumerB1();
    ConsumerD stageD = new ConsumerD();

    EventHandlerChain<String> aThenB1 = new EventHandlerChain<String>(new EventHandler[]{stageA}, new EventHandler[]{stageB1});
    EventHandlerChain<String> b1ThenD = new EventHandlerChain<String>(new EventHandler[]{stageB1}, new EventHandler[]{stageD});
    EventHandlerChain[] topology = new EventHandlerChain[]{aThenB1, b1ThenD};

    disruptorConfig.setEventHandlerChain(topology);
    disruptorConfig.init();

    // NOTE(review): the translator only reassigns its own parameter, so the event
    // claimed from the ring buffer is left unchanged.
    EventTranslator<String> greeting = new EventTranslator<String>() {

        @Override
        public void translateTo(String event, long sequence) {
            event = "hi there";
        }
    };
    disruptorConfig.publish(greeting);
}
项目:disruptor-spring-manager    文件:DefaultDisruptorConfigTest.java   
/**
 *                                            Consumer B1
 *                                           /           \
 * Publisher -> Ring buffer ---> Consumer A -             -> Consumer D
 *                                           \           /
 *                                            Consumer B2
 *
 * Look at the graph that gets printed by log4j.
 *
 * Registers two handler chains (A followed by B1 and B2, then B1 and B2 followed by D),
 * initialises the configuration and publishes one event.
 */
@Test
public void test_publish_diamond_eventprocessor_topology() {
    ConsumerA stageA = new ConsumerA();
    ConsumerB1 stageB1 = new ConsumerB1();
    ConsumerB2 stageB2 = new ConsumerB2();
    ConsumerD stageD = new ConsumerD();

    EventHandlerChain<String> fanOut = new EventHandlerChain<String>(new EventHandler[]{stageA}, new EventHandler[]{stageB1, stageB2});
    EventHandlerChain<String> fanIn = new EventHandlerChain<String>(new EventHandler[]{stageB1, stageB2}, new EventHandler[]{stageD});
    EventHandlerChain[] topology = new EventHandlerChain[]{fanOut, fanIn};

    disruptorConfig.setEventHandlerChain(topology);
    disruptorConfig.init();

    // NOTE(review): the translator only reassigns its own parameter, so the event
    // claimed from the ring buffer is left unchanged.
    EventTranslator<String> greeting = new EventTranslator<String>() {

        @Override
        public void translateTo(String event, long sequence) {
            event = "hi there";
        }
    };
    disruptorConfig.publish(greeting);
}
项目:disruptor-spring-manager    文件:DefaultDisruptorConfigTest.java   
/**
 *                                            Consumer B1 -> Consumer C1
 *                                           /                          \
 * Publisher -> Ring buffer ---> Consumer A -                            -> Consumer D
 *                                           \                          /
 *                                            Consumer B2 -> Consumer C2
 *
 * Look at the graph that gets printed by log4j.
 *
 * Registers four handler chains (A to B1/B2, B1 to C1, B2 to C2, C1/C2 to D),
 * initialises the configuration and publishes one event.
 */
@Test
public void test_publish_complicated_diamond_eventprocessor_topology() {
    ConsumerA stageA = new ConsumerA();
    ConsumerB1 stageB1 = new ConsumerB1();
    ConsumerB2 stageB2 = new ConsumerB2();
    ConsumerC1 stageC1 = new ConsumerC1();
    ConsumerC2 stageC2 = new ConsumerC2();
    ConsumerD stageD = new ConsumerD();

    EventHandlerChain<String> fanOut = new EventHandlerChain<String>(new EventHandler[]{stageA}, new EventHandler[]{stageB1, stageB2});
    EventHandlerChain<String> upperBranch = new EventHandlerChain<String>(new EventHandler[]{stageB1}, new EventHandler[]{stageC1});
    EventHandlerChain<String> lowerBranch = new EventHandlerChain<String>(new EventHandler[]{stageB2}, new EventHandler[]{stageC2});
    EventHandlerChain<String> fanIn = new EventHandlerChain<String>(new EventHandler[]{stageC1, stageC2}, new EventHandler[]{stageD});
    EventHandlerChain[] topology = new EventHandlerChain[]{fanOut, upperBranch, lowerBranch, fanIn};

    disruptorConfig.setEventHandlerChain(topology);
    disruptorConfig.init();

    // NOTE(review): the translator only reassigns its own parameter, so the event
    // claimed from the ring buffer is left unchanged.
    EventTranslator<String> greeting = new EventTranslator<String>() {

        @Override
        public void translateTo(String event, long sequence) {
            event = "hi there";
        }
    };
    disruptorConfig.publish(greeting);
}
项目:jstorm-0.9.6.3-    文件:RingBuffer.java   
/**
 * Tries to publish a single event without waiting for capacity.
 *
 * A sequence is requested with {@code sequencer.tryNext()}; when one is granted the
 * translator is applied to the corresponding entry and the sequence is published.
 *
 * @param translator The user specified translation for the event
 * @return true if the event was published, false if the sequencer reported
 * insufficient capacity.
 */
public boolean tryPublishEvent(EventTranslator<E> translator)
{
    boolean published;
    try
    {
        translateAndPublish(translator, sequencer.tryNext());
        published = true;
    }
    catch (InsufficientCapacityException ignored)
    {
        // No free slot: report the failure to the caller instead of throwing.
        published = false;
    }
    return published;
}
项目:jstorm-0.9.6.3-    文件:RingBuffer.java   
/**
 * Tries to publish a batch of events without waiting for capacity.
 *
 * The batch arguments are validated first; then {@code batchSize} sequences are
 * requested with {@code sequencer.tryNext(batchSize)} and, when granted, filled by the
 * translators and published.
 *
 * @param translators   The user specified translation for each event
 * @param batchStartsAt The first element of the array which is within the batch.
 * @param batchSize     The actual size of the batch
 * @return true if all the events were published, false if the sequencer reported
 *         insufficient capacity.
 */
public boolean tryPublishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize)
{
    checkBounds(translators, batchStartsAt, batchSize);

    boolean published;
    try
    {
        translateAndPublishBatch(translators, batchStartsAt, batchSize, sequencer.tryNext(batchSize));
        published = true;
    }
    catch (InsufficientCapacityException ignored)
    {
        // Not enough room for the batch: report the failure instead of throwing.
        published = false;
    }
    return published;
}
项目:jstorm-0.9.6.3-    文件:RingBuffer.java   
/**
 * Applies the translator to the entry at an already-claimed sequence and publishes
 * that sequence.
 *
 * @param translator translation to apply to the entry
 * @param sequence   claimed sequence to fill and publish
 */
private void translateAndPublish(EventTranslator<E> translator, long sequence)
{
    try
    {
        final E entry = get(sequence);
        translator.translateTo(entry, sequence);
    }
    finally
    {
        // The sequence is published even when the translator throws.
        sequencer.publish(sequence);
    }
}
项目:learn_jstorm    文件:RingBuffer.java   
/**
 * Non-blocking publication of one event.
 *
 * Requests the next sequence via {@code sequencer.tryNext()}, lets the translator
 * fill the entry at that sequence and publishes it.
 *
 * @param translator The user specified translation for the event
 * @return true if the event was published, false if the sequencer reported
 * insufficient capacity.
 */
public boolean tryPublishEvent(EventTranslator<E> translator)
{
    boolean succeeded;
    try
    {
        final long claimed = sequencer.tryNext();
        translateAndPublish(translator, claimed);
        succeeded = true;
    }
    catch (InsufficientCapacityException ignored)
    {
        // Capacity was not available; signal this through the return value.
        succeeded = false;
    }
    return succeeded;
}
项目:learn_jstorm    文件:RingBuffer.java   
/**
 * Non-blocking publication of a batch of events.
 *
 * After the batch arguments pass {@code checkBounds}, {@code batchSize} sequences are
 * requested via {@code sequencer.tryNext(batchSize)}; the translators then fill the
 * claimed entries and the range is published.
 *
 * @param translators   The user specified translation for each event
 * @param batchStartsAt The first element of the array which is within the batch.
 * @param batchSize     The actual size of the batch
 * @return true if all the events were published, false if the sequencer reported
 *         insufficient capacity.
 */
public boolean tryPublishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize)
{
    checkBounds(translators, batchStartsAt, batchSize);

    boolean succeeded;
    try
    {
        final long highestClaimed = sequencer.tryNext(batchSize);
        translateAndPublishBatch(translators, batchStartsAt, batchSize, highestClaimed);
        succeeded = true;
    }
    catch (InsufficientCapacityException ignored)
    {
        // Capacity was not available; signal this through the return value.
        succeeded = false;
    }
    return succeeded;
}
项目:learn_jstorm    文件:RingBuffer.java   
/**
 * Lets the translator populate the entry stored at {@code sequence}, then publishes
 * the sequence.
 *
 * @param translator translation to apply to the entry
 * @param sequence   claimed sequence to fill and publish
 */
private void translateAndPublish(EventTranslator<E> translator, long sequence)
{
    try
    {
        final E slot = get(sequence);
        translator.translateTo(slot, sequence);
    }
    finally
    {
        // Runs on both the normal and the exceptional path.
        sequencer.publish(sequence);
    }
}
项目:jstrom    文件:RingBuffer.java   
/**
 * Tries to publish one event without waiting for capacity. The next sequence is requested with {@code sequencer.tryNext()}; when granted, the translator
 * fills the entry at that sequence and the sequence is published.
 * 
 * @param translator The user specified translation for the event
 * @return true if the event was published, false if the sequencer reported insufficient capacity.
 */
public boolean tryPublishEvent(EventTranslator<E> translator) {
    boolean published;
    try {
        translateAndPublish(translator, sequencer.tryNext());
        published = true;
    } catch (InsufficientCapacityException ignored) {
        // No free slot: report the failure through the return value.
        published = false;
    }
    return published;
}
项目:jstrom    文件:RingBuffer.java   
/**
 * Tries to publish a batch of events without waiting for capacity. The batch arguments are validated, {@code batchSize} sequences are requested with
 * {@code sequencer.tryNext(batchSize)} and, when granted, the translators fill the claimed entries before the range is published.
 * 
 * @param translators The user specified translation for each event
 * @param batchStartsAt The first element of the array which is within the batch.
 * @param batchSize The actual size of the batch
 * @return true if all the events were published, false if the sequencer reported insufficient capacity.
 */
public boolean tryPublishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize) {
    checkBounds(translators, batchStartsAt, batchSize);

    boolean published;
    try {
        translateAndPublishBatch(translators, batchStartsAt, batchSize, sequencer.tryNext(batchSize));
        published = true;
    } catch (InsufficientCapacityException ignored) {
        // Not enough room for the batch: report the failure through the return value.
        published = false;
    }
    return published;
}
项目:jstrom    文件:RingBuffer.java   
/**
 * Applies the translator to the entry at an already-claimed sequence and publishes that sequence.
 * 
 * @param translator translation to apply to the entry
 * @param sequence claimed sequence to fill and publish
 */
private void translateAndPublish(EventTranslator<E> translator, long sequence) {
    try {
        final E entry = get(sequence);
        translator.translateTo(entry, sequence);
    } finally {
        // The sequence is published even when the translator throws.
        sequencer.publish(sequence);
    }
}
项目:jstrom    文件:RingBuffer.java   
/**
 * Runs one translator per already-claimed sequence and then publishes the whole range. The range covers {@code batchSize} sequences ending at
 * {@code finalSequence}; the translator at {@code batchStartsAt} is applied to the first of them.
 * 
 * @param translators source of the translators
 * @param batchStartsAt index of the first translator to use
 * @param batchSize number of translators / sequences in the batch
 * @param finalSequence highest sequence of the claimed range
 */
private void translateAndPublishBatch(final EventTranslator<E>[] translators, int batchStartsAt, final int batchSize, final long finalSequence) {
    final long firstSequence = finalSequence - (batchSize - 1);
    try {
        final int batchEndsAt = batchStartsAt + batchSize;
        long sequence = firstSequence;
        for (int index = batchStartsAt; index < batchEndsAt; index++, sequence++) {
            translators[index].translateTo(get(sequence), sequence);
        }
    } finally {
        // Publish the full range even when a translator throws.
        sequencer.publish(firstSequence, finalSequence);
    }
}
项目:Tstream    文件:RingBuffer.java   
/**
 * Publishes one event if, and only if, the sequencer can hand out a sequence
 * immediately.
 *
 * The sequence comes from {@code sequencer.tryNext()}; the translator then fills the
 * entry at that sequence and the sequence is published.
 *
 * @param translator The user specified translation for the event
 * @return true if the event was published, false if the sequencer reported
 * insufficient capacity.
 */
public boolean tryPublishEvent(EventTranslator<E> translator)
{
    boolean accepted;
    try
    {
        final long claimed = sequencer.tryNext();
        translateAndPublish(translator, claimed);
        accepted = true;
    }
    catch (InsufficientCapacityException ignored)
    {
        // The ring buffer had no room; the caller learns this from the result.
        accepted = false;
    }
    return accepted;
}
项目:Tstream    文件:RingBuffer.java   
/**
 * Publishes a batch of events if, and only if, the sequencer can hand out the whole
 * range immediately.
 *
 * The batch arguments are validated by {@code checkBounds}; the range comes from
 * {@code sequencer.tryNext(batchSize)} and is filled by the translators before being
 * published.
 *
 * @param translators   The user specified translation for each event
 * @param batchStartsAt The first element of the array which is within the batch.
 * @param batchSize     The actual size of the batch
 * @return true if all the events were published, false if the sequencer reported
 *         insufficient capacity.
 */
public boolean tryPublishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize)
{
    checkBounds(translators, batchStartsAt, batchSize);

    boolean accepted;
    try
    {
        final long highestClaimed = sequencer.tryNext(batchSize);
        translateAndPublishBatch(translators, batchStartsAt, batchSize, highestClaimed);
        accepted = true;
    }
    catch (InsufficientCapacityException ignored)
    {
        // The ring buffer had no room; the caller learns this from the result.
        accepted = false;
    }
    return accepted;
}
项目:Tstream    文件:RingBuffer.java   
/**
 * Hands the entry at {@code sequence} to the translator and publishes the sequence
 * afterwards.
 *
 * @param translator translation to apply to the entry
 * @param sequence   claimed sequence to fill and publish
 */
private void translateAndPublish(EventTranslator<E> translator, long sequence)
{
    try
    {
        final E claimedEntry = get(sequence);
        translator.translateTo(claimedEntry, sequence);
    }
    finally
    {
        // Publication happens regardless of how the translation ended.
        sequencer.publish(sequence);
    }
}
项目:incubator-myriad    文件:MyriadScheduler.java   
/**
 * Relays the {@code registered} callback as a {@code RegisteredEvent}.
 *
 * The driver, framework id and master info are copied onto the event published on the
 * disruptor returned by {@code disruptorManager.getRegisteredEventDisruptor()}.
 *
 * @param driver      scheduler driver passed to the callback
 * @param frameworkId framework id passed to the callback
 * @param masterInfo  master info passed to the callback
 */
@Override
public void registered(final SchedulerDriver driver, final Protos.FrameworkID frameworkId, final Protos.MasterInfo masterInfo) {
  final EventTranslator<RegisteredEvent> fillEvent = new EventTranslator<RegisteredEvent>() {
    @Override
    public void translateTo(RegisteredEvent slot, long seq) {
      slot.setDriver(driver);
      slot.setFrameworkId(frameworkId);
      slot.setMasterInfo(masterInfo);
    }
  };
  disruptorManager.getRegisteredEventDisruptor().publishEvent(fillEvent);
}
项目:incubator-myriad    文件:MyriadScheduler.java   
/**
 * Relays the {@code reregistered} callback as a {@code ReRegisteredEvent}.
 *
 * The driver and master info are copied onto the event published on the disruptor
 * returned by {@code disruptorManager.getReRegisteredEventDisruptor()}.
 *
 * @param driver     scheduler driver passed to the callback
 * @param masterInfo master info passed to the callback
 */
@Override
public void reregistered(final SchedulerDriver driver, final Protos.MasterInfo masterInfo) {
  final EventTranslator<ReRegisteredEvent> fillEvent = new EventTranslator<ReRegisteredEvent>() {
    @Override
    public void translateTo(ReRegisteredEvent slot, long seq) {
      slot.setDriver(driver);
      slot.setMasterInfo(masterInfo);
    }
  };
  disruptorManager.getReRegisteredEventDisruptor().publishEvent(fillEvent);
}
项目:incubator-myriad    文件:MyriadScheduler.java   
/**
 * Relays the {@code resourceOffers} callback as a {@code ResourceOffersEvent}.
 *
 * The driver and the offer list are copied onto the event published on the disruptor
 * returned by {@code disruptorManager.getResourceOffersEventDisruptor()}. The list is
 * stored by reference; no copy is made here.
 *
 * @param driver scheduler driver passed to the callback
 * @param offers offers passed to the callback
 */
@Override
public void resourceOffers(final SchedulerDriver driver, final List<Protos.Offer> offers) {
  final EventTranslator<ResourceOffersEvent> fillEvent = new EventTranslator<ResourceOffersEvent>() {
    @Override
    public void translateTo(ResourceOffersEvent slot, long seq) {
      slot.setDriver(driver);
      slot.setOffers(offers);
    }
  };
  disruptorManager.getResourceOffersEventDisruptor().publishEvent(fillEvent);
}
项目:incubator-myriad    文件:MyriadScheduler.java   
/**
 * Relays the {@code offerRescinded} callback as an {@code OfferRescindedEvent}.
 *
 * The driver and offer id are copied onto the event published on the disruptor
 * returned by {@code disruptorManager.getOfferRescindedEventDisruptor()}.
 *
 * @param driver  scheduler driver passed to the callback
 * @param offerId id of the offer passed to the callback
 */
@Override
public void offerRescinded(final SchedulerDriver driver, final Protos.OfferID offerId) {
  final EventTranslator<OfferRescindedEvent> fillEvent = new EventTranslator<OfferRescindedEvent>() {
    @Override
    public void translateTo(OfferRescindedEvent slot, long seq) {
      slot.setDriver(driver);
      slot.setOfferId(offerId);
    }
  };
  disruptorManager.getOfferRescindedEventDisruptor().publishEvent(fillEvent);
}
项目:incubator-myriad    文件:MyriadScheduler.java   
/**
 * Relays the {@code statusUpdate} callback as a {@code StatusUpdateEvent}.
 *
 * The driver and task status are copied onto the event published on the disruptor
 * returned by {@code disruptorManager.getStatusUpdateEventDisruptor()}.
 *
 * @param driver scheduler driver passed to the callback
 * @param status task status passed to the callback
 */
@Override
public void statusUpdate(final SchedulerDriver driver, final Protos.TaskStatus status) {
  final EventTranslator<StatusUpdateEvent> fillEvent = new EventTranslator<StatusUpdateEvent>() {
    @Override
    public void translateTo(StatusUpdateEvent slot, long seq) {
      slot.setDriver(driver);
      slot.setStatus(status);
    }
  };
  disruptorManager.getStatusUpdateEventDisruptor().publishEvent(fillEvent);
}
项目:incubator-myriad    文件:MyriadScheduler.java   
/**
 * Relays the {@code frameworkMessage} callback as a {@code FrameworkMessageEvent}.
 *
 * The driver, payload, executor id and slave id are copied onto the event published
 * on the disruptor returned by {@code disruptorManager.getFrameworkMessageEventDisruptor()}.
 * The byte array is stored by reference; no copy is made here.
 *
 * @param driver     scheduler driver passed to the callback
 * @param executorId executor id passed to the callback
 * @param slaveId    slave id passed to the callback
 * @param bytes      message payload passed to the callback
 */
@Override
public void frameworkMessage(final SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId,
                             final byte[] bytes) {
  final EventTranslator<FrameworkMessageEvent> fillEvent = new EventTranslator<FrameworkMessageEvent>() {
    @Override
    public void translateTo(FrameworkMessageEvent slot, long seq) {
      slot.setDriver(driver);
      slot.setBytes(bytes);
      slot.setExecutorId(executorId);
      slot.setSlaveId(slaveId);
    }
  };
  disruptorManager.getFrameworkMessageEventDisruptor().publishEvent(fillEvent);
}
项目:incubator-myriad    文件:MyriadScheduler.java   
/**
 * Relays the {@code disconnected} callback as a {@code DisconnectedEvent}.
 *
 * Only the driver is copied onto the event published on the disruptor returned by
 * {@code disruptorManager.getDisconnectedEventDisruptor()}.
 *
 * @param driver scheduler driver passed to the callback
 */
@Override
public void disconnected(final SchedulerDriver driver) {
  final EventTranslator<DisconnectedEvent> fillEvent = new EventTranslator<DisconnectedEvent>() {
    @Override
    public void translateTo(DisconnectedEvent slot, long seq) {
      slot.setDriver(driver);
    }
  };
  disruptorManager.getDisconnectedEventDisruptor().publishEvent(fillEvent);
}
项目:incubator-myriad    文件:MyriadScheduler.java   
/**
 * Relays the {@code slaveLost} callback as a {@code SlaveLostEvent}.
 *
 * The driver and slave id are copied onto the event published on the disruptor
 * returned by {@code disruptorManager.getSlaveLostEventDisruptor()}.
 *
 * @param driver  scheduler driver passed to the callback
 * @param slaveId slave id passed to the callback
 */
@Override
public void slaveLost(final SchedulerDriver driver, final Protos.SlaveID slaveId) {
  final EventTranslator<SlaveLostEvent> fillEvent = new EventTranslator<SlaveLostEvent>() {
    @Override
    public void translateTo(SlaveLostEvent slot, long seq) {
      slot.setDriver(driver);
      slot.setSlaveId(slaveId);
    }
  };
  disruptorManager.getSlaveLostEventDisruptor().publishEvent(fillEvent);
}
项目:incubator-myriad    文件:MyriadScheduler.java   
/**
 * Relays the {@code executorLost} callback as an {@code ExecutorLostEvent}.
 *
 * The driver, executor id, slave id and exit status are copied onto the event
 * published on the disruptor returned by
 * {@code disruptorManager.getExecutorLostEventDisruptor()}.
 *
 * @param driver     scheduler driver passed to the callback
 * @param executorId executor id passed to the callback
 * @param slaveId    slave id passed to the callback
 * @param exitStatus exit status passed to the callback
 */
@Override
public void executorLost(final SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId,
                         final int exitStatus) {
  final EventTranslator<ExecutorLostEvent> fillEvent = new EventTranslator<ExecutorLostEvent>() {
    @Override
    public void translateTo(ExecutorLostEvent slot, long seq) {
      slot.setDriver(driver);
      slot.setExecutorId(executorId);
      slot.setSlaveId(slaveId);
      slot.setExitStatus(exitStatus);
    }
  };
  disruptorManager.getExecutorLostEventDisruptor().publishEvent(fillEvent);
}
项目:incubator-myriad    文件:MyriadScheduler.java   
/**
 * Relays the {@code error} callback as an {@code ErrorEvent}.
 *
 * The driver and message are copied onto the event published on the disruptor
 * returned by {@code disruptorManager.getErrorEventDisruptor()}.
 *
 * @param driver  scheduler driver passed to the callback
 * @param message error message passed to the callback
 */
@Override
public void error(final SchedulerDriver driver, final String message) {
  final EventTranslator<ErrorEvent> fillEvent = new EventTranslator<ErrorEvent>() {
    @Override
    public void translateTo(ErrorEvent slot, long seq) {
      slot.setDriver(driver);
      slot.setMessage(message);
    }
  };
  disruptorManager.getErrorEventDisruptor().publishEvent(fillEvent);
}
项目:jstorm    文件:RingBuffer.java   
/**
 * Non-blocking publication of one event. Requests the next sequence via {@code sequencer.tryNext()}, lets the translator fill the entry at that sequence
 * and publishes it.
 * 
 * @param translator The user specified translation for the event
 * @return true if the event was published, false if the sequencer reported insufficient capacity.
 */
public boolean tryPublishEvent(EventTranslator<E> translator) {
    boolean succeeded;
    try {
        final long claimed = sequencer.tryNext();
        translateAndPublish(translator, claimed);
        succeeded = true;
    } catch (InsufficientCapacityException ignored) {
        // Capacity was not available; signal this through the return value.
        succeeded = false;
    }
    return succeeded;
}
项目:jstorm    文件:RingBuffer.java   
/**
 * Non-blocking publication of a batch of events. After the batch arguments pass {@code checkBounds}, {@code batchSize} sequences are requested via
 * {@code sequencer.tryNext(batchSize)}; the translators then fill the claimed entries and the range is published.
 * 
 * @param translators The user specified translation for each event
 * @param batchStartsAt The first element of the array which is within the batch.
 * @param batchSize The actual size of the batch
 * @return true if all the events were published, false if the sequencer reported insufficient capacity.
 */
public boolean tryPublishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize) {
    checkBounds(translators, batchStartsAt, batchSize);

    boolean succeeded;
    try {
        final long highestClaimed = sequencer.tryNext(batchSize);
        translateAndPublishBatch(translators, batchStartsAt, batchSize, highestClaimed);
        succeeded = true;
    } catch (InsufficientCapacityException ignored) {
        // Capacity was not available; signal this through the return value.
        succeeded = false;
    }
    return succeeded;
}
项目:jstorm    文件:RingBuffer.java   
/**
 * Lets the translator populate the entry stored at {@code sequence}, then publishes the sequence.
 * 
 * @param translator translation to apply to the entry
 * @param sequence claimed sequence to fill and publish
 */
private void translateAndPublish(EventTranslator<E> translator, long sequence) {
    try {
        final E slot = get(sequence);
        translator.translateTo(slot, sequence);
    } finally {
        // Runs on both the normal and the exceptional path.
        sequencer.publish(sequence);
    }
}
项目:jstorm    文件:RingBuffer.java   
/**
 * Applies a slice of translators to a claimed range of sequences, then publishes it. The first sequence is {@code finalSequence - (batchSize - 1)};
 * translators are taken from {@code batchStartsAt} onwards, one per sequence.
 * 
 * @param translators source of the translators
 * @param batchStartsAt index of the first translator to use
 * @param batchSize number of translators / sequences in the batch
 * @param finalSequence highest sequence of the claimed range
 */
private void translateAndPublishBatch(final EventTranslator<E>[] translators, int batchStartsAt, final int batchSize, final long finalSequence) {
    final long lowestSequence = finalSequence - (batchSize - 1);
    try {
        final int limit = batchStartsAt + batchSize;
        int index = batchStartsAt;
        long sequence = lowestSequence;
        while (index < limit) {
            final EventTranslator<E> current = translators[index++];
            current.translateTo(get(sequence), sequence++);
        }
    } finally {
        // The range is published whether or not every translator succeeded.
        sequencer.publish(lowestSequence, finalSequence);
    }
}
项目:jstorm-0.9.6.3-    文件:RingBuffer.java   
/**
 * Validates the arguments of a batch publication by delegating to two helper checks.
 *
 * @param translators   translator array the batch is taken from
 * @param batchStartsAt index of the first translator in the batch
 * @param batchSize     number of translators in the batch
 */
private void checkBounds(final EventTranslator<E>[] translators, final int batchStartsAt, final int batchSize)
{
    // Checks the start index and size on their own; the helper body is not visible
    // here - presumably it rejects invalid values, TODO confirm.
    checkBatchSizing(batchStartsAt, batchSize);
    // Checks the batch against the translator array; presumably rejects a batch that
    // runs past the end of the array - TODO confirm.
    batchOverRuns(translators, batchStartsAt, batchSize);
}
项目:learn_jstorm    文件:RingBuffer.java   
/**
 * Validates the arguments of a batch publication by delegating to two helper checks.
 *
 * @param translators   translator array the batch is taken from
 * @param batchStartsAt index of the first translator in the batch
 * @param batchSize     number of translators in the batch
 */
private void checkBounds(final EventTranslator<E>[] translators, final int batchStartsAt, final int batchSize)
{
    // Checks the start index and size on their own; the helper body is not visible
    // here - presumably it rejects invalid values, TODO confirm.
    checkBatchSizing(batchStartsAt, batchSize);
    // Checks the batch against the translator array; presumably rejects a batch that
    // runs past the end of the array - TODO confirm.
    batchOverRuns(translators, batchStartsAt, batchSize);
}