protected void publishQueueTask(final String queueName,final Task task) { // RingBuffer<QueueEvent> ringBuffer=disruptor.getRingBuffer(); // long sequence = ringBuffer.next(); // try // { // QueueEvent event = ringBuffer.get(sequence); // event.setQueueName(queueName); // event.getTasks().add(task); // }finally // { // ringBuffer.publish(sequence); // } disruptor.publishEvent(new EventTranslator<QueueEvent>() { @Override public void translateTo(QueueEvent event, long sequence) { event.setQueueName(queueName); event.getTasks().add(task); } }); }
protected void publishQueueTask(final String queueName,final List<Task> tasks) { // RingBuffer<QueueEvent> ringBuffer=disruptor.getRingBuffer(); // long sequence = ringBuffer.next(); // try // { // QueueEvent event = ringBuffer.get(sequence); // event.setQueueName(queueName); // event.getTasks().addAll(tasks); // }finally // { // ringBuffer.publish(sequence); // } disruptor.publishEvent(new EventTranslator<QueueEvent>() { @Override public void translateTo(QueueEvent event, long sequence) { event.setQueueName(queueName); event.getTasks().addAll(tasks); } }); }
/**
 * Publishes one event through the disruptor and returns the event instance
 * that the translator was handed.
 *
 * On the first call (while {@code ringBuffer} is still {@code null}) the
 * disruptor is started and every delayed event handler is awaited before
 * anything is published.
 *
 * @return the event recorded in {@code lastPublishedEvent} by the translator
 * @throws InterruptedException   propagated from {@code awaitStart()}
 * @throws BrokenBarrierException propagated from {@code awaitStart()}
 */
private TestEvent publishEvent() throws InterruptedException, BrokenBarrierException {
    final boolean notStartedYet = (ringBuffer == null);
    if (notStartedYet) {
        ringBuffer = disruptor.start();
        for (DelayedEventHandler handler : delayedEventHandlers) {
            handler.awaitStart();
        }
    }

    // The translator does not populate the event; it only remembers which
    // slot it was given so the caller can inspect it.
    final EventTranslator<TestEvent> rememberSlot = new EventTranslator<TestEvent>() {
        @Override
        public void translateTo(final TestEvent slot, final long seq) {
            lastPublishedEvent = slot;
        }
    };
    disruptor.publishEvent(rememberSlot);

    return lastPublishedEvent;
}
/**
 * Applies a contiguous slice of translators to consecutive sequences and then
 * publishes the whole sequence range.
 *
 * The range runs from {@code finalSequence - (batchSize - 1)} to
 * {@code finalSequence}, inclusive. Publication happens in a {@code finally}
 * block, so the range is published even when a translator throws.
 *
 * @param translators   array holding the translators to run
 * @param batchStartsAt index of the first translator in the batch
 * @param batchSize     number of translators (and sequences) in the batch
 * @param finalSequence last sequence of the range being filled
 */
private void translateAndPublishBatch(final EventTranslator<E>[] translators, int batchStartsAt,
                                      final int batchSize, final long finalSequence) {
    final long initialSequence = finalSequence - (batchSize - 1);
    try {
        final int batchEndsAt = batchStartsAt + batchSize;
        long nextSequence = initialSequence;
        int index = batchStartsAt;
        while (index < batchEndsAt) {
            // Translator `index` fills the slot at `nextSequence`; both advance together.
            translators[index].translateTo(get(nextSequence), nextSequence);
            index++;
            nextSequence++;
        }
    } finally {
        sequencer.publish(initialSequence, finalSequence);
    }
}
/**
 * Builds the configurator used by the tests: an anonymous
 * {@code BaseDisruptorConfig} whose three overridden hooks do nothing, set up
 * with a single producer, a blocking wait strategy, the test's thread name,
 * ring buffer size and a {@code SampleEventFactory}, and then initialised.
 */
@Before
public void setup(){
    disruptorConfigurator = new BaseDisruptorConfig() {
        // All three hooks are intentionally empty in this fixture.
        @Override
        public void disruptorExceptionHandler() {}

        @Override
        public void disruptorEventHandler() {}

        @Override
        public void publish(EventTranslator eventTranslator) {}
    };

    disruptorConfigurator.setThreadName(THREAD_NAME);
    disruptorConfigurator.setProducerType(ProducerType.SINGLE);
    disruptorConfigurator.setRingBufferSize(ringBufferSize);
    disruptorConfigurator.setWaitStrategyType(WaitStrategyType.BLOCKING);
    disruptorConfigurator.setEventFactory(new SampleEventFactory());
    // init() is called last, after every setter has been applied.
    disruptorConfigurator.init();
}
/**
 * Publisher -> Ring buffer ---> Consumer A
 *
 * Look at the graph that gets printed by log4j.
 * The test itself makes no assertions.
 */
@Test
public void test_publish_single_eventprocessor_topology() {
    ConsumerA a = new ConsumerA();

    EventHandlerChain<String> onlyA = new EventHandlerChain<String>(new EventHandler[]{a});

    disruptorConfig.setEventHandlerChain(new EventHandlerChain[]{onlyA});
    disruptorConfig.init();

    EventTranslator<String> greeting = new EventTranslator<String>() {
        @Override
        public void translateTo(String event, long sequence) {
            // NOTE(review): this only rebinds the local parameter; the object
            // held in the ring buffer slot is not modified.
            event = "hi there";
        }
    };
    disruptorConfig.publish(greeting);
}
/**
 * Publisher -> Ring buffer ---> Consumer A -> Consumer B1 -> Consumer D
 *
 * Look at the graph that gets printed by log4j.
 * The test itself makes no assertions.
 */
@Test
public void test_publish_simple_eventprocessor_topology() {
    ConsumerA a = new ConsumerA();
    ConsumerB1 b1 = new ConsumerB1();
    ConsumerD d = new ConsumerD();

    // Each chain is built from two handler arrays: {A} then {B1}, and {B1} then {D}.
    EventHandlerChain<String> aThenB1 =
            new EventHandlerChain<String>(new EventHandler[]{a}, new EventHandler[]{b1});
    EventHandlerChain<String> b1ThenD =
            new EventHandlerChain<String>(new EventHandler[]{b1}, new EventHandler[]{d});

    disruptorConfig.setEventHandlerChain(new EventHandlerChain[]{aThenB1, b1ThenD});
    disruptorConfig.init();

    EventTranslator<String> greeting = new EventTranslator<String>() {
        @Override
        public void translateTo(String event, long sequence) {
            // NOTE(review): this only rebinds the local parameter; the object
            // held in the ring buffer slot is not modified.
            event = "hi there";
        }
    };
    disruptorConfig.publish(greeting);
}
/**
 *                                           Consumer B1
 *                                          /           \
 * Publisher -> Ring buffer ---> Consumer A -             -> Consumer D
 *                                          \           /
 *                                           Consumer B2
 *
 * Look at the graph that gets printed by log4j.
 * The test itself makes no assertions.
 */
@Test
public void test_publish_diamond_eventprocessor_topology() {
    ConsumerA a = new ConsumerA();
    ConsumerB1 b1 = new ConsumerB1();
    ConsumerB2 b2 = new ConsumerB2();
    ConsumerD d = new ConsumerD();

    // Each chain is built from two handler arrays: {A} then {B1, B2}, and {B1, B2} then {D}.
    EventHandlerChain<String> fanOut =
            new EventHandlerChain<String>(new EventHandler[]{a}, new EventHandler[]{b1, b2});
    EventHandlerChain<String> fanIn =
            new EventHandlerChain<String>(new EventHandler[]{b1, b2}, new EventHandler[]{d});

    disruptorConfig.setEventHandlerChain(new EventHandlerChain[]{fanOut, fanIn});
    disruptorConfig.init();

    EventTranslator<String> greeting = new EventTranslator<String>() {
        @Override
        public void translateTo(String event, long sequence) {
            // NOTE(review): this only rebinds the local parameter; the object
            // held in the ring buffer slot is not modified.
            event = "hi there";
        }
    };
    disruptorConfig.publish(greeting);
}
/**
 *                                           Consumer B1 -> Consumer C1
 *                                          /                          \
 * Publisher -> Ring buffer ---> Consumer A -                            -> Consumer D
 *                                          \                          /
 *                                           Consumer B2 -> Consumer C2
 *
 * Look at the graph that gets printed by log4j.
 * The test itself makes no assertions.
 */
@Test
public void test_publish_complicated_diamond_eventprocessor_topology() {
    ConsumerA a = new ConsumerA();
    ConsumerB1 b1 = new ConsumerB1();
    ConsumerB2 b2 = new ConsumerB2();
    ConsumerC1 c1 = new ConsumerC1();
    ConsumerC2 c2 = new ConsumerC2();
    ConsumerD d = new ConsumerD();

    // One chain per edge group of the diagram, each built from two handler arrays.
    EventHandlerChain<String> fanOut =
            new EventHandlerChain<String>(new EventHandler[]{a}, new EventHandler[]{b1, b2});
    EventHandlerChain<String> upperBranch =
            new EventHandlerChain<String>(new EventHandler[]{b1}, new EventHandler[]{c1});
    EventHandlerChain<String> lowerBranch =
            new EventHandlerChain<String>(new EventHandler[]{b2}, new EventHandler[]{c2});
    EventHandlerChain<String> fanIn =
            new EventHandlerChain<String>(new EventHandler[]{c1, c2}, new EventHandler[]{d});

    disruptorConfig.setEventHandlerChain(
            new EventHandlerChain[]{fanOut, upperBranch, lowerBranch, fanIn});
    disruptorConfig.init();

    EventTranslator<String> greeting = new EventTranslator<String>() {
        @Override
        public void translateTo(String event, long sequence) {
            // NOTE(review): this only rebinds the local parameter; the object
            // held in the ring buffer slot is not modified.
            event = "hi there";
        }
    };
    disruptorConfig.publish(greeting);
}
/**
 * Tries to publish one event without waiting for capacity.
 *
 * A sequence is claimed with {@code sequencer.tryNext()}; when that succeeds
 * the translator is run against the slot for that sequence and the sequence
 * is published.
 *
 * @param translator The user specified translation for the event
 * @return true if the value was published, false if there was insufficient
 *         capacity.
 */
public boolean tryPublishEvent(EventTranslator<E> translator) {
    final long sequence;
    try {
        sequence = sequencer.tryNext();
    } catch (InsufficientCapacityException ignored) {
        // No free slot: report it through the return value, not an exception.
        return false;
    }

    translateAndPublish(translator, sequence);
    return true;
}
/**
 * Tries to publish a batch of events without waiting for capacity.
 *
 * The arguments are validated with {@code checkBounds} first. A block of
 * {@code batchSize} sequences is then claimed with
 * {@code sequencer.tryNext(batchSize)}; when that succeeds the selected
 * translators are run and the claimed range is published.
 *
 * @param translators   The user specified translation for the event
 * @param batchStartsAt The first element of the array which is within the batch.
 * @param batchSize     The actual size of the batch
 * @return true if all the values were published, false if there was insufficient
 *         capacity.
 */
public boolean tryPublishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize) {
    checkBounds(translators, batchStartsAt, batchSize);

    final long finalSequence;
    try {
        finalSequence = sequencer.tryNext(batchSize);
    } catch (InsufficientCapacityException ignored) {
        // Not enough free slots for the whole batch: nothing is published.
        return false;
    }

    translateAndPublishBatch(translators, batchStartsAt, batchSize, finalSequence);
    return true;
}
/**
 * Runs the translator against the slot for an already-claimed sequence and
 * publishes that sequence.
 *
 * Publication happens in a {@code finally} block, so the sequence is published
 * even when the slot lookup or the translator throws.
 *
 * @param translator translation to apply to the slot
 * @param sequence   the claimed sequence to fill and publish
 */
private void translateAndPublish(EventTranslator<E> translator, long sequence) {
    try {
        final E slot = get(sequence);
        translator.translateTo(slot, sequence);
    } finally {
        sequencer.publish(sequence);
    }
}
/**
 * Tries to publish one event without waiting for capacity. A sequence is claimed with
 * {@code sequencer.tryNext()}, the translator is run against the slot for that sequence,
 * and the sequence is then published.
 *
 * @param translator The user specified translation for the event
 * @return true if the value was published, false if there was insufficient capacity.
 */
public boolean tryPublishEvent(EventTranslator<E> translator) {
    boolean published;
    try {
        translateAndPublish(translator, sequencer.tryNext());
        published = true;
    } catch (InsufficientCapacityException noRoom) {
        // No free slot: report it through the return value, not an exception.
        published = false;
    }
    return published;
}
/**
 * Tries to publish a batch of events without waiting for capacity. After {@code checkBounds}
 * has validated the arguments, {@code batchSize} sequences are claimed with
 * {@code sequencer.tryNext(batchSize)}, the selected translators are run, and the claimed
 * range is published.
 *
 * @param translators   The user specified translation for the event
 * @param batchStartsAt The first element of the array which is within the batch.
 * @param batchSize     The actual size of the batch
 * @return true if all the values were published, false if there was insufficient capacity.
 */
public boolean tryPublishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize) {
    checkBounds(translators, batchStartsAt, batchSize);

    boolean published;
    try {
        final long lastClaimed = sequencer.tryNext(batchSize);
        translateAndPublishBatch(translators, batchStartsAt, batchSize, lastClaimed);
        published = true;
    } catch (InsufficientCapacityException noRoom) {
        // Not enough free slots for the whole batch: nothing is published.
        published = false;
    }
    return published;
}
/**
 * Publishes a RegisteredEvent carrying the callback arguments.
 *
 * @param driver      scheduler driver stored on the event
 * @param frameworkId framework id stored on the event
 * @param masterInfo  master info stored on the event
 */
@Override
public void registered(final SchedulerDriver driver, final Protos.FrameworkID frameworkId, final Protos.MasterInfo masterInfo) {
    final EventTranslator<RegisteredEvent> fillEvent = new EventTranslator<RegisteredEvent>() {
        @Override
        public void translateTo(RegisteredEvent slot, long seq) {
            slot.setDriver(driver);
            slot.setFrameworkId(frameworkId);
            slot.setMasterInfo(masterInfo);
        }
    };
    disruptorManager.getRegisteredEventDisruptor().publishEvent(fillEvent);
}
/**
 * Publishes a ReRegisteredEvent carrying the callback arguments.
 *
 * @param driver     scheduler driver stored on the event
 * @param masterInfo master info stored on the event
 */
@Override
public void reregistered(final SchedulerDriver driver, final Protos.MasterInfo masterInfo) {
    final EventTranslator<ReRegisteredEvent> fillEvent = new EventTranslator<ReRegisteredEvent>() {
        @Override
        public void translateTo(ReRegisteredEvent slot, long seq) {
            slot.setDriver(driver);
            slot.setMasterInfo(masterInfo);
        }
    };
    disruptorManager.getReRegisteredEventDisruptor().publishEvent(fillEvent);
}
/**
 * Publishes a ResourceOffersEvent carrying the callback arguments.
 *
 * @param driver scheduler driver stored on the event
 * @param offers offer list stored on the event (the same list reference, not a copy)
 */
@Override
public void resourceOffers(final SchedulerDriver driver, final List<Protos.Offer> offers) {
    final EventTranslator<ResourceOffersEvent> fillEvent = new EventTranslator<ResourceOffersEvent>() {
        @Override
        public void translateTo(ResourceOffersEvent slot, long seq) {
            slot.setDriver(driver);
            slot.setOffers(offers);
        }
    };
    disruptorManager.getResourceOffersEventDisruptor().publishEvent(fillEvent);
}
/**
 * Publishes an OfferRescindedEvent carrying the callback arguments.
 *
 * @param driver  scheduler driver stored on the event
 * @param offerId offer id stored on the event
 */
@Override
public void offerRescinded(final SchedulerDriver driver, final Protos.OfferID offerId) {
    final EventTranslator<OfferRescindedEvent> fillEvent = new EventTranslator<OfferRescindedEvent>() {
        @Override
        public void translateTo(OfferRescindedEvent slot, long seq) {
            slot.setDriver(driver);
            slot.setOfferId(offerId);
        }
    };
    disruptorManager.getOfferRescindedEventDisruptor().publishEvent(fillEvent);
}
/**
 * Publishes a StatusUpdateEvent carrying the callback arguments.
 *
 * @param driver scheduler driver stored on the event
 * @param status task status stored on the event
 */
@Override
public void statusUpdate(final SchedulerDriver driver, final Protos.TaskStatus status) {
    final EventTranslator<StatusUpdateEvent> fillEvent = new EventTranslator<StatusUpdateEvent>() {
        @Override
        public void translateTo(StatusUpdateEvent slot, long seq) {
            slot.setDriver(driver);
            slot.setStatus(status);
        }
    };
    disruptorManager.getStatusUpdateEventDisruptor().publishEvent(fillEvent);
}
/**
 * Publishes a FrameworkMessageEvent carrying the callback arguments.
 *
 * @param driver     scheduler driver stored on the event
 * @param executorId executor id stored on the event
 * @param slaveId    slave id stored on the event
 * @param bytes      message payload stored on the event (the same array reference, not a copy)
 */
@Override
public void frameworkMessage(final SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final byte[] bytes) {
    final EventTranslator<FrameworkMessageEvent> fillEvent = new EventTranslator<FrameworkMessageEvent>() {
        @Override
        public void translateTo(FrameworkMessageEvent slot, long seq) {
            slot.setDriver(driver);
            slot.setBytes(bytes);
            slot.setExecutorId(executorId);
            slot.setSlaveId(slaveId);
        }
    };
    disruptorManager.getFrameworkMessageEventDisruptor().publishEvent(fillEvent);
}
/**
 * Publishes a DisconnectedEvent carrying the callback argument.
 *
 * @param driver scheduler driver stored on the event
 */
@Override
public void disconnected(final SchedulerDriver driver) {
    final EventTranslator<DisconnectedEvent> fillEvent = new EventTranslator<DisconnectedEvent>() {
        @Override
        public void translateTo(DisconnectedEvent slot, long seq) {
            slot.setDriver(driver);
        }
    };
    disruptorManager.getDisconnectedEventDisruptor().publishEvent(fillEvent);
}
/**
 * Publishes a SlaveLostEvent carrying the callback arguments.
 *
 * @param driver  scheduler driver stored on the event
 * @param slaveId slave id stored on the event
 */
@Override
public void slaveLost(final SchedulerDriver driver, final Protos.SlaveID slaveId) {
    final EventTranslator<SlaveLostEvent> fillEvent = new EventTranslator<SlaveLostEvent>() {
        @Override
        public void translateTo(SlaveLostEvent slot, long seq) {
            slot.setDriver(driver);
            slot.setSlaveId(slaveId);
        }
    };
    disruptorManager.getSlaveLostEventDisruptor().publishEvent(fillEvent);
}
/**
 * Publishes an ExecutorLostEvent carrying the callback arguments.
 *
 * @param driver     scheduler driver stored on the event
 * @param executorId executor id stored on the event
 * @param slaveId    slave id stored on the event
 * @param exitStatus exit status stored on the event
 */
@Override
public void executorLost(final SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final int exitStatus) {
    final EventTranslator<ExecutorLostEvent> fillEvent = new EventTranslator<ExecutorLostEvent>() {
        @Override
        public void translateTo(ExecutorLostEvent slot, long seq) {
            slot.setDriver(driver);
            slot.setExecutorId(executorId);
            slot.setSlaveId(slaveId);
            slot.setExitStatus(exitStatus);
        }
    };
    disruptorManager.getExecutorLostEventDisruptor().publishEvent(fillEvent);
}
/** * Publishes ErrorEvent */ @Override public void error(final SchedulerDriver driver, final String message) { disruptorManager.getErrorEventDisruptor().publishEvent(new EventTranslator<ErrorEvent>() { @Override public void translateTo(ErrorEvent event, long sequence) { event.setDriver(driver); event.setMessage(message); } }); }
/**
 * Validates a translator batch request by running the two batch checks in
 * order: first {@code checkBatchSizing} on the start index and size, then
 * {@code batchOverRuns} on the same values together with the translator array.
 *
 * NOTE(review): both helpers are defined elsewhere; presumably each throws
 * when its check fails - confirm against their definitions.
 *
 * @param translators   array the batch is taken from
 * @param batchStartsAt index of the first translator in the batch
 * @param batchSize     number of translators in the batch
 */
private void checkBounds(final EventTranslator<E>[] translators, final int batchStartsAt, final int batchSize) {
    checkBatchSizing(batchStartsAt, batchSize);
    batchOverRuns(translators, batchStartsAt, batchSize);
}