private <A> void translateAndPublishBatch(final EventTranslatorOneArg<E, A> translator, final A[] arg0, int batchStartsAt, final int batchSize, final long finalSequence) { final long initialSequence = finalSequence - (batchSize - 1); try { long sequence = initialSequence; final int batchEndsAt = batchStartsAt + batchSize; for (int i = batchStartsAt; i < batchEndsAt; i++) { translator.translateTo(get(sequence), sequence++, arg0[i]); } } finally { sequencer.publish(initialSequence, finalSequence); } }
@Before public void init(){ m_latch = new CountDownLatch(1); BatchProcessor<Object> bp = new BatchProcessor<Object>() { @Override public void process(List<? extends Object> batch) throws BatchProcessorException { m_receivedEvents.add(new ArrayList<>(batch)); for (int i = 0; i < batch.size(); i++){ m_latch.countDown(); } try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } }; m_processor = new AsyncConcurrentBatchingProcessor<Object>(bp, Object::new, new EventTranslatorOneArg<Object, Object>() { public void translateTo(Object event, long sequence, Object arg0) {}; }); }
/** * Allows one user supplied argument. * * @see #tryPublishEvent(EventTranslator) * @param translator The user specified translation for the event * @param arg0 A user supplied argument. * @return true if the value was published, false if there was insufficient * capacity. */ public <A> boolean tryPublishEvent(EventTranslatorOneArg<E, A> translator, A arg0) { try { final long sequence = sequencer.tryNext(); translateAndPublish(translator, sequence, arg0); return true; } catch (InsufficientCapacityException e) { return false; } }
/** * Allows one user supplied argument. * * @param translator The user specified translation for each event * @param batchStartsAt The first element of the array which is within the batch. * @param batchSize The actual size of the batch * @param arg0 An array of user supplied arguments, one element per event. * @return true if the value was published, false if there was insufficient * capacity. * @see #tryPublishEvents(EventTranslator[]) */ public <A> boolean tryPublishEvents(EventTranslatorOneArg<E, A> translator, int batchStartsAt, int batchSize, A[] arg0) { checkBounds(arg0, batchStartsAt, batchSize); try { final long finalSequence = sequencer.tryNext(batchSize); translateAndPublishBatch(translator, arg0, batchStartsAt, batchSize, finalSequence); return true; } catch (InsufficientCapacityException e) { return false; } }
private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) { try { translator.translateTo(get(sequence), sequence, arg0); } finally { sequencer.publish(sequence); } }
/** * Allows one user supplied argument. * * @see #tryPublishEvent(EventTranslator) * @param translator The user specified translation for the event * @param arg0 A user supplied argument. * @return true if the value was published, false if there was insufficient capacity. */ public <A> boolean tryPublishEvent(EventTranslatorOneArg<E, A> translator, A arg0) { try { final long sequence = sequencer.tryNext(); translateAndPublish(translator, sequence, arg0); return true; } catch (InsufficientCapacityException e) { return false; } }
/** * Allows one user supplied argument. * * @param translator The user specified translation for each event * @param batchStartsAt The first element of the array which is within the batch. * @param batchSize The actual size of the batch * @param arg0 An array of user supplied arguments, one element per event. * @return true if the value was published, false if there was insufficient capacity. * @see #tryPublishEvents(EventTranslator[]) */ public <A> boolean tryPublishEvents(EventTranslatorOneArg<E, A> translator, int batchStartsAt, int batchSize, A[] arg0) { checkBounds(arg0, batchStartsAt, batchSize); try { final long finalSequence = sequencer.tryNext(batchSize); translateAndPublishBatch(translator, arg0, batchStartsAt, batchSize, finalSequence); return true; } catch (InsufficientCapacityException e) { return false; } }
@org.junit.Test public void test() { long t1 =System.currentTimeMillis(); for (int i = 0;i < 10;i++) { new Thread() { @Override public void run() { for (int j = 0; j < count; j++) { disruptor.getRingBuffer().publishEvent(new EventTranslatorOneArg<Event, Integer>() { @Override public void translateTo(Event event, long sequence, Integer arg0) { event.setValue(arg0); } }, j); } } }.start(); } try { cdh1.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("count " + counter.get()); System.out.println(System.currentTimeMillis() - t1); }
@SuppressWarnings("unchecked") public AsyncConcurrentBatchingProcessor(BatchProcessor<T> processor, EventFactory<T> factory, EventTranslatorOneArg<T,T> translator ){ Executor executor = Executors.newSingleThreadExecutor(); int bufferSize = 1024; m_disruptor = new Disruptor<T>(factory, bufferSize, executor); m_disruptor.handleEventsWith(this); m_buffer = m_disruptor.start(); m_processor = processor; m_translator = translator; }
private void publishEvents(int size, RingBuffer<T> valueEventRingBuffer) { final EventTranslatorOneArg<T, Long> eventTranslator = new EventTranslatorOneArg<T, Long>() { @Override public void translateTo(T event, long sequence, Long arg0) { eventFactory.setEventValue(event, arg0); } }; for(long i = 0; i < size; i++) { valueEventRingBuffer.publishEvent(eventTranslator, i); } }
public EventTranslatorOneArg<DisruptorEvent, Object> getEventTranslator() { return eventTranslator; }
public void setEventTranslator(EventTranslatorOneArg<DisruptorEvent, Object> eventTranslator) { this.eventTranslator = eventTranslator; }
@Bean @ConditionalOnMissingBean public EventTranslatorOneArg<DisruptorEvent, Object> eventTranslator() { return new DisruptorEventTranslator(); }
public void setTranslator(EventTranslatorOneArg<QueueEvent, EventData> translator) { this.translator = translator; }
public final void setEventTranslator(EventTranslatorOneArg<LogEvent<E>, E> eventTranslator) { this.eventTranslator = eventTranslator; }
@Override public <A> void publishEvent(EventTranslatorOneArg<ResponseEvent, A> translator, A arg0) { throw new UnsupportedOperationException(); }
@Override public <A> boolean tryPublishEvent(EventTranslatorOneArg<ResponseEvent, A> translator, A arg0) { throw new UnsupportedOperationException(); }
@Override public <A> void publishEvents(EventTranslatorOneArg<ResponseEvent, A> translator, A[] arg0) { throw new UnsupportedOperationException(); }
@Override public <A> void publishEvents(EventTranslatorOneArg<ResponseEvent, A> translator, int batchStartsAt, int batchSize, A[] arg0) { throw new UnsupportedOperationException(); }
@Override public <A> boolean tryPublishEvents(EventTranslatorOneArg<ResponseEvent, A> translator, A[] arg0) { throw new UnsupportedOperationException(); }
@Override public <A> boolean tryPublishEvents(EventTranslatorOneArg<ResponseEvent, A> translator, int batchStartsAt, int batchSize, A[] arg0) { throw new UnsupportedOperationException(); }
/** * Allows one user supplied argument per event. * * @param translator The user specified translation for each event * @param batchStartsAt The first element of the array which is within the batch. * @param batchSize The actual size of the batch * @param arg0 An array of user supplied arguments, one element per event. * @see #publishEvents(EventTranslator[]) */ public <A> void publishEvents(EventTranslatorOneArg<E, A> translator, int batchStartsAt, int batchSize, A[] arg0) { checkBounds(arg0, batchStartsAt, batchSize); final long finalSequence = sequencer.next(batchSize); translateAndPublishBatch(translator, arg0, batchStartsAt, batchSize, finalSequence); }
public void onData(T command) { ringBuffer.publishEvent((EventTranslatorOneArg) TRANSLATOR, command); }