private void initDisruptor(int processors, int ringBufferSize) { LOG.info("eds client init disruptor with processors=" + processors + " and ringBufferSize=" + ringBufferSize); executor = Executors.newFixedThreadPool( processors, new ThreadFactoryBuilder().setNameFormat("disruptor-executor-%d").build()); final WaitStrategy waitStrategy = createWaitStrategy(); ringBufferSize = sizeFor(ringBufferSize); // power of 2 disruptor = new Disruptor<>(EdsRingBufferEvent.FACTORY, ringBufferSize, executor, ProducerType.MULTI, waitStrategy); EdsEventWorkHandler[] handlers = new EdsEventWorkHandler[processors]; for (int i = 0; i < handlers.length; i++) { handlers[i] = new EdsEventWorkHandler(); } // handlers number = threads number disruptor.handleEventsWithWorkerPool(handlers); // "handleEventsWith" just like topics , with multiple consumers disruptor.start(); }
@SuppressWarnings("unchecked") public SharedMessageStore(MessageDao messageDao, int bufferSize, int maxDbBatchSize) { pendingMessages = new ConcurrentHashMap<>(); ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("DisruptorMessageStoreThread-%d").build(); disruptor = new Disruptor<>(DbOperation.getFactory(), bufferSize, namedThreadFactory, ProducerType.MULTI, new SleepingWaitStrategy()); disruptor.setDefaultExceptionHandler(new LogExceptionHandler()); disruptor.handleEventsWith(new DbEventMatcher(bufferSize)) .then(new DbWriter(messageDao, maxDbBatchSize)) .then((EventHandler<DbOperation>) (event, sequence, endOfBatch) -> event.clear()); disruptor.start(); this.messageDao = messageDao; }
/** * create a disruptor * * @param name The title of the disruptor * @param ringBufferSize The size of ringBuffer */ public void createDisruptor(String name, int ringBufferSize) { if (DISRUPTOR_MAP.keySet().contains(name)) { throw new NulsRuntimeException(ErrorCode.FAILED, "create disruptor faild,the name is repetitive!"); } Disruptor<DisruptorEvent> disruptor = new Disruptor<DisruptorEvent>(EVENT_FACTORY, ringBufferSize, new NulsThreadFactory(ModuleService.getInstance().getModuleId(EventBusModuleBootstrap.class),name), ProducerType.SINGLE, new SleepingWaitStrategy()); disruptor.handleEventsWith(new EventHandler<DisruptorEvent>() { @Override public void onEvent(DisruptorEvent disruptorEvent, long l, boolean b) throws Exception { Log.debug(disruptorEvent.getData() + ""); } }); DISRUPTOR_MAP.put(name, disruptor); }
public static void main(String[] args) throws InterruptedException { Disruptor<ObjectBox> disruptor = new Disruptor<ObjectBox>( ObjectBox.FACTORY, RING_SIZE, Executors.newCachedThreadPool(), ProducerType.MULTI, new BlockingWaitStrategy()); disruptor.handleEventsWith(new Consumer()).then(new Consumer()); final RingBuffer<ObjectBox> ringBuffer = disruptor.getRingBuffer(); Publisher p = new Publisher(); IMessage message = new IMessage(); ITransportable transportable = new ITransportable(); String streamName = "com.lmax.wibble"; System.out.println("publishing " + RING_SIZE + " messages"); for (int i = 0; i < RING_SIZE; i++) { ringBuffer.publishEvent(p, message, transportable, streamName); Thread.sleep(10); } System.out.println("start disruptor"); disruptor.start(); System.out.println("continue publishing"); while (true) { ringBuffer.publishEvent(p, message, transportable, streamName); Thread.sleep(10); } }
@SuppressWarnings("unchecked") private RingBufferService(final Properties config) { config.put(KafkaProducerService.CONFIG_PREFIX + "value.serializer", ByteBufSerde.ByteBufSerializer.class.getName()); config.put(KafkaProducerService.CONFIG_PREFIX + "key.serializer", Serdes.String().serializer()); producer = KafkaProducerService.getInstance(config); bufferSize = ConfigurationHelper.getIntSystemThenEnvProperty(RB_CONF_BUFFER_SIZE, RB_DEFAULT_BUFFER_SIZE, config); targetTopic = ConfigurationHelper.getSystemThenEnvProperty(RB_CONF_TOPIC_NAME, RB_DEFAULT_TOPIC_NAME, config); eventBuffInitSize = ConfigurationHelper.getIntSystemThenEnvProperty(RB_CONF_BUFF_INITIAL_SIZE, RB_DEFAULT_BUFF_INITIAL_SIZE, config); shutdownTimeout = ConfigurationHelper.getIntSystemThenEnvProperty(RB_CONF_STOP_TIMEOUT, RB_DEFAULT_STOP_TIMEOUT, config); waitStrategy = RBWaitStrategy.getConfiguredStrategy(config); disruptor = new Disruptor<ByteBuf>(this, bufferSize, threadFactory, ProducerType.MULTI, waitStrategy); disruptor.handleEventsWith(this); // FIXME: need to able to supply several handlers disruptor.start(); ringBuffer = disruptor.getRingBuffer(); log.info("<<<<< RawRingBufferDispatcher Started."); }
private void initDispatcherFactoryFromConfiguration(String name) { if (dispatcherFactories.get(name) != null) { return; } for (DispatcherConfiguration dispatcherConfiguration : configuration.getDispatcherConfigurations()) { if (!dispatcherConfiguration.getName() .equalsIgnoreCase(name)) { continue; } if (DispatcherType.DISPATCHER_GROUP == dispatcherConfiguration.getType()) { addCachedDispatchers(dispatcherConfiguration.getName(), createDispatcherFactory(dispatcherConfiguration.getName(), dispatcherConfiguration.getSize() == 0 ? PROCESSORS : dispatcherConfiguration.getSize(), dispatcherConfiguration.getBacklog(), null, ProducerType.MULTI, new AgileWaitingStrategy())); } } }
private RingBufferProcessor(String name, ExecutorService executor, int bufferSize, WaitStrategy waitStrategy, boolean shared, boolean autoCancel) { super(name, executor, autoCancel); this.ringBuffer = RingBuffer.create( shared ? ProducerType.MULTI : ProducerType.SINGLE, new EventFactory<MutableSignal<E>>() { @Override public MutableSignal<E> newInstance() { return new MutableSignal<E>(); } }, bufferSize, waitStrategy ); this.recentSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); this.barrier = ringBuffer.newBarrier(); //ringBuffer.addGatingSequences(recentSequence); }
private RingBufferWorkProcessor(String name, ExecutorService executor, int bufferSize, WaitStrategy waitStrategy, boolean share, boolean autoCancel) { super(name, executor, autoCancel); this.ringBuffer = RingBuffer.create( share ? ProducerType.MULTI : ProducerType.SINGLE, new EventFactory<MutableSignal<E>>() { @Override public MutableSignal<E> newInstance() { return new MutableSignal<E>(); } }, bufferSize, waitStrategy ); ringBuffer.addGatingSequences(workSequence); }
@SuppressWarnings("unchecked") static void qndSingleThread(int numItems) { final AtomicLong COUNTER_RECEIVED = new AtomicLong(0); final Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent.FACTORY, 128, Executors.defaultThreadFactory(), ProducerType.MULTI, new YieldingWaitStrategy()); disruptor.handleEventsWith( (event, sequence, endOfBatch) -> COUNTER_RECEIVED.incrementAndGet()); disruptor.start(); final long t = System.currentTimeMillis(); for (int i = 0; i < numItems; i++) { disruptor.publishEvent((event, seq) -> event.set(seq)); } long d = System.currentTimeMillis() - t; NumberFormat nf = NumberFormat.getInstance(); System.out.println("========== qndSingleThread:"); System.out.println("Sent: " + nf.format(numItems) + " / Received: " + nf.format(COUNTER_RECEIVED.get()) + " / Duration: " + d + " / Speed: " + NumberFormat.getInstance().format((numItems * 1000.0 / d)) + " items/sec"); disruptor.shutdown(); }
public DisruptorQueueImpl(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) { this._queueName = PREFIX + queueName; _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait); _consumer = new Sequence(); _barrier = _buffer.newBarrier(); _buffer.addGatingSequences(_consumer); if (producerType == ProducerType.SINGLE) { consumerStartedFlag = true; } else { // make sure we flush the pending messages in cache first if (bufferSize < 2) { throw new RuntimeException("QueueSize must >= 2"); } try { publishDirect(FLUSH_CACHE, true); } catch (InsufficientCapacityException e) { throw new RuntimeException("This code should be unreachable!", e); } } }
/** * Create a new Ring Buffer with the specified producer type (SINGLE or MULTI) * * @param producerType producer type to use {@link ProducerType}. * @param factory used to create events within the ring buffer. * @param bufferSize number of elements to create within the ring buffer. * @param waitStrategy used to determine how to wait for new elements to become available. * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2 */ public static <E> RingBuffer<E> create(ProducerType producerType, EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) { switch (producerType) { case SINGLE: return createSingleProducer(factory, bufferSize, waitStrategy); case MULTI: return createMultiProducer(factory, bufferSize, waitStrategy); default: throw new IllegalStateException(producerType.toString()); } }
/** * Start dispatching events * * @return a Future that asyncrhonously notifies an observer when this EventReactor is ready */ @SuppressWarnings("unchecked") public CompletableFuture<? extends EventReactor<T>> open() { if (isRunning.compareAndSet(false, true)) { CompletableFuture<EventReactor<T>> future = new CompletableFuture<>(); this.disruptor = new Disruptor<>(EVENT_FACTORY, ringSize, threadFactory, ProducerType.MULTI, new BusySpinWaitStrategy()); this.disruptor.handleEventsWith(this::handleEvent); this.ringBuffer = disruptor.start(); // Starts a timer thread this.timer = new Timer(); future.complete(this); return future; } else { return CompletableFuture.completedFuture(this); } }
@Test public void testLaterStartConsumer() throws InterruptedException { System.out.println("!!!!!!!!!!!!!!!Begin testLaterStartConsumer!!!!!!!!!!"); final AtomicBoolean messageConsumed = new AtomicBoolean(false); // Set queue length to 1, so that the RingBuffer can be easily full // to trigger consumer blocking DisruptorQueue queue = createQueue("consumerHang", ProducerType.MULTI, 2); push(queue, 1); Runnable producer = new Producer(queue); Runnable consumer = new Consumer(queue, new EventHandler<Object>() { long count = 0; @Override public void onEvent(Object obj, long sequence, boolean endOfBatch) throws Exception { messageConsumed.set(true); System.out.println("Consume " + count++); } }); run(producer, 0, 0, consumer, 50); Assert.assertTrue("disruptor message is never consumed due to consumer thread hangs", messageConsumed.get()); System.out.println("!!!!!!!!!!!!!!!!End testLaterStartConsumer!!!!!!!!!!"); }
@Test public void testSingleProducer() throws InterruptedException { System.out.println("!!!!!!!!!!!!!!Begin testSingleProducer!!!!!!!!!!!!!!"); final AtomicBoolean messageConsumed = new AtomicBoolean(false); // Set queue length to 1, so that the RingBuffer can be easily full // to trigger consumer blocking DisruptorQueue queue = createQueue("consumerHang", ProducerType.SINGLE, 1); push(queue, 1); Runnable producer = new Producer(queue); Runnable consumer = new Consumer(queue, new EventHandler<Object>() { long count = 0; @Override public void onEvent(Object obj, long sequence, boolean endOfBatch) throws Exception { messageConsumed.set(true); System.out.println("Consume " + count++); } }); run(producer, 0, 0, consumer, 50); Assert.assertTrue("disruptor message is never consumed due to consumer thread hangs", messageConsumed.get()); System.out.println("!!!!!!!!!!!!!!End testSingleProducer!!!!!!!!!!!!!!"); }
@Provides @Singleton @Named("disruptor") @SuppressWarnings("Unchecked") protected Disruptor<TwoPhaseEvent<T>> inputDisruptor(Provider<WorkHandler<TwoPhaseEvent<T>>> workHandlerProvider, Provider<EventHandler<TwoPhaseEvent<T>>> evenHandlerProvider) { Disruptor<TwoPhaseEvent<T>> disruptor = new Disruptor<>( TwoPhaseEvent.factory(outputEventFactory()), options.ringSize(), threadsProvider, ProducerType.SINGLE, new SleepingWaitStrategy()); WorkHandler<TwoPhaseEvent<T>>[] parsers = new WorkHandler[options.threads()]; for (int i = 0; i < options.threads(); i++) { parsers[i] = workHandlerProvider.get(); } disruptor.handleExceptionsWith(new FatalExceptionHandler()); disruptor.handleEventsWithWorkerPool(parsers).then(evenHandlerProvider.get()); return disruptor; }
@Provides @Singleton @Named("firstPassDisruptor") public Disruptor<TwoPhaseEvent<Void>> firstPassDisruptor( Provider<WorkHandler<TwoPhaseEvent<Void>>> statisticsHandlerProvider) { Disruptor<TwoPhaseEvent<Void>> disruptor = new Disruptor<>( TwoPhaseEvent.factory(() -> null), options.ringSize(), threadsProvider, ProducerType.SINGLE, new SleepingWaitStrategy()); WorkHandler<TwoPhaseEvent<Void>>[] parsers = new WorkHandler[options.threads()]; for (int i = 0; i < options.threads(); i++) { parsers[i] = statisticsHandlerProvider.get(); } disruptor.handleExceptionsWith(new FatalExceptionHandler()); disruptor.handleEventsWithWorkerPool(parsers); return disruptor; }
@Provides @Singleton @Named("secondPassDisruptor") public Disruptor<TwoPhaseEvent<SparseItem>> secondPassDisruptor( Provider<WorkHandler<TwoPhaseEvent<SparseItem>>> binningHandlerProvider, Provider<EventHandler<TwoPhaseEvent<SparseItem>>> outputHandlerProvider) { Disruptor<TwoPhaseEvent<SparseItem>> disruptor = new Disruptor<>( TwoPhaseEvent.factory(SparseItem::new), options.ringSize(), threadsProvider, ProducerType.SINGLE, new SleepingWaitStrategy()); WorkHandler<TwoPhaseEvent<SparseItem>>[] parsers = new WorkHandler[options.threads()]; for (int i = 0; i < options.threads(); i++) { parsers[i] = binningHandlerProvider.get(); } disruptor.handleExceptionsWith(new FatalExceptionHandler()); disruptor.handleEventsWithWorkerPool(parsers) .then(outputHandlerProvider.get()); return disruptor; }
public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); int bufferSize = 1024; WaitStrategy ws = new BlockingWaitStrategy(); Disruptor<CpuUsageEvent> disruptor = new Disruptor<>(factory, bufferSize, executor, ProducerType.SINGLE, ws); disruptor.handleEventsWith(handler); RingBuffer<CpuUsageEvent> ringBuffer = disruptor.start(); publishEvents(ringBuffer); disruptor.shutdown(); executor.shutdown(); }
@SuppressWarnings("unchecked") protected void initDis() { disruptorExecutor = new ThreadPoolExecutor(1,1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),new NamedThreadFactory("disruptor")); int bufferSize = 1024*1024; disruptor = new Disruptor<>(new QueueEventFactory(), bufferSize, disruptorExecutor,ProducerType.MULTI,YIELDING_WAIT); //注册消费者 disruptor.handleEventsWithWorkerPool(new QueueEventHandler()); disruptor.start(); }
@SuppressWarnings("unchecked") private DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) { this.executor = Preconditions.checkNotNull(executor); disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, queueDepth, executor, ProducerType.MULTI, strategy); disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS); disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE); disruptor.start(); }
public DisruptorPublisher(int bufferSize, TestHandler handler) { this.handler = new HelloEventHandler(handler); EventFactory<HelloEvent> eventFactory = new HelloEventFactory(); int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必须是 2 的 N 次方; executor = Executors.newSingleThreadExecutor(); disruptor = new Disruptor<HelloEvent>( eventFactory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, YIELDING_WAIT); }
public static void main(String[] args) { EventFactory<HelloEvent> eventFactory = new HelloEventFactory(); int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必须是 2 的 N 次方; Disruptor<HelloEvent> disruptor = new Disruptor<HelloEvent>( eventFactory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); EventHandler<HelloEvent> eventHandler = new HelloEventHandler(); disruptor.handleEventsWith(eventHandler, eventHandler); disruptor.start(); }
public LogicProcessor(EventFactory<ConcurrentEvent> factory, EventHandler<ConcurrentEvent> handler, int rbSize, ExecutorService es, ProducerType pt, WaitStrategy ws) { this.disruptor = new Disruptor<>(factory, rbSize, es, pt, ws); this.disruptor.handleEventsWith(handler); // disruptor.handleExceptionsWith(); this.disruptor.start(); this.msgQueue = newQueue(); }
@Override protected Disruptor<ConcurrentEvent> initialValue() { Disruptor<ConcurrentEvent> disruptor = new Disruptor<>( ConcurrentEventFactory.DEFAULT, DEFAULT_RING_BUFFER_SIZE, CACHED_THREAD_POOL, ProducerType.SINGLE, new BlockingWaitStrategy()); disruptor.handleEventsWith(new ConcurrentHandler()); // disruptor.handleExceptionsWith(); disruptor.start(); return disruptor; }
@SuppressWarnings("unchecked") public OneToOneTranslatorThroughputTest() { Disruptor<ValueEvent> disruptor = new Disruptor<ValueEvent>( ValueEvent.EVENT_FACTORY, BUFFER_SIZE, executor, ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleEventsWith(handler); this.ringBuffer = disruptor.start(); }
@Parameters public static Collection<Object[]> generateData() { Object[][] allocators = { {ProducerType.SINGLE, new BlockingWaitStrategy()}, {ProducerType.MULTI, new BlockingWaitStrategy()}, }; return Arrays.asList(allocators); }
private Sequencer newProducer(ProducerType producerType, int bufferSize, WaitStrategy waitStrategy) { switch (producerType) { case SINGLE: return new SingleProducerSequencer(bufferSize, waitStrategy); case MULTI: return new MultiProducerSequencer(bufferSize, waitStrategy); default: throw new IllegalStateException(producerType.toString()); } }
@SuppressWarnings("unchecked") @Before public void setUp() { disruptor = new Disruptor<byte[]>( new ByteArrayFactory(256), 1024, Executors.newCachedThreadPool(), ProducerType.SINGLE, new BlockingWaitStrategy()); disruptor.handleEventsWith(eventHandler); disruptor.setDefaultExceptionHandler(new FatalExceptionHandler()); }
@SuppressWarnings({"deprecation", "unchecked", "varargs"}) private DisruptorEventQueue() { // Create new Disruptor for processing. Note that this uses a single thread for processing; this // ensures that the event handler can take unsynchronized actions whenever possible. disruptor = new Disruptor<DisruptorEvent>( new DisruptorEventFactory(), DISRUPTOR_BUFFER_SIZE, Executors.newSingleThreadExecutor(new DaemonThreadFactory("OpenCensus.Disruptor")), ProducerType.MULTI, new SleepingWaitStrategy()); disruptor.handleEventsWith(new DisruptorEventHandler()); disruptor.start(); ringBuffer = disruptor.getRingBuffer(); }
/** * Creates a new MetricsMetaAPIImpl * @param properties The configuration properties */ public MetricsMetaAPIImpl(final Properties properties) { dataSource = SQLCompilerDataSource.getInstance(properties); sqlWorker = dataSource.getSQLWorker(); tagPredicateCache = new TagPredicateCache(sqlWorker); fjPool = new ManagedForkJoinPool(getClass().getSimpleName(), Runtime.getRuntime().availableProcessors(), true, JMXHelper.objectName(getClass())); metaReader = new DefaultMetaReader(sqlWorker); dispatcher = new WorkQueueDispatcher("MetricsMetaDispatcher", Runtime.getRuntime().availableProcessors(), 1024, this, ProducerType.MULTI, new LiteBlockingWaitStrategy()); log.info("Dispatcher Alive: {}", dispatcher.alive()); }
/** * 根据config初始化特殊通道 * * @param symbol 事件 * @param listeners 对应的监听器集合 */ private void initSpecDisruptor(String symbol, List<ElectronsListener> listeners) { ExecutorService specPool = Executors.newFixedThreadPool(conf.getSpecCircuitNum(), new ThreadFactory() { final AtomicInteger cursor = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "Electrons Thread (from spec channel) : thread" + cursor.incrementAndGet()); } }); pools.add(specPool); Disruptor<ElectronsHolder> disruptor = new Disruptor<>(ElectronsHolder::new, conf.getSpecCircuitLen(), specPool, ProducerType.MULTI, new LiteBlockingWaitStrategy()); disruptor.handleExceptionsWith(new ElecExceptionHandler("Spec Disruptor {" + symbol + "}")); //初始化管道并放入集合中 SpecChannel specChannel = new SpecChannel(disruptor); if (conf.isBreaker()) { EventCountCircuitBreaker breaker = new EventCountCircuitBreaker(conf.getErrorNum(), conf.getPerUnit(), conf.getUnit(), conf.getCloseThreshold(), conf.getRest(), conf.getRestUnit()); specChannel.setBreaker(breaker); } //构建listener顺序 ListenerChainBuilderNew.buildChain(specChannel, listeners); channelMap.put(SPEC_CHANNEL_PREFIX + symbol, specChannel); }
/** * 初始化正常管道,任何情况下都会有 * * @param pool 线程池 */ private void initNormalChannel(ExecutorService pool) { Disruptor<ElectronsHolder> normalDis = new Disruptor<>(ElectronsHolder::new, conf.getCircuitLen(), pool, ProducerType.MULTI, new LiteBlockingWaitStrategy()); WorkHandler[] workHandlers = new WorkHandler[conf.getCircuitNum()]; Arrays.fill(workHandlers, (WorkHandler<ElectronsHolder>) electronsHolder -> electronsHolder.handle()); normalDis.handleEventsWithWorkerPool(workHandlers); normalDis.handleExceptionsWith(new ElecExceptionHandler("Normal Disruptor")); //初始化channel Channel normalChannel = new NormalChannel(normalDis); //配置限流相关 normalChannel.confLimitRate(conf.isLimitRate(), conf.getPermitsPerSecond(), conf.isWarmup(), conf.getWarmupPeriod(), conf.getWarmPeriodUnit()); channelMap.put(NORMAL_CHANNEL_KEY, normalChannel); }
public static void main(String[] args) throws InterruptedException { long beginTime=System.currentTimeMillis(); int bufferSize=1024; ExecutorService executor=Executors.newFixedThreadPool(4); //������캯�����������������˽�����2��demo֮��Ϳ��¾������ˣ���������~ Disruptor<TradeTransaction> disruptor=new Disruptor<TradeTransaction>(new EventFactory<TradeTransaction>() { @Override public TradeTransaction newInstance() { return new TradeTransaction(); } }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy()); //ʹ��disruptor������������C1,C2 EventHandlerGroup<TradeTransaction> handlerGroup=disruptor.handleEventsWith(new TradeTransactionVasConsumer(),new TradeTransactionInDBHandler()); TradeTransactionJMSNotifyHandler jmsConsumer=new TradeTransactionJMSNotifyHandler(); //������C1,C2����֮��ִ��JMS��Ϣ���Ͳ��� Ҳ���������ߵ�C3 handlerGroup.then(jmsConsumer); disruptor.start();//���� CountDownLatch latch=new CountDownLatch(1); //�������� executor.submit(new TradeTransactionPublisher(latch, disruptor)); latch.await();//�ȴ�����������. disruptor.shutdown(); executor.shutdown(); System.out.println("�ܺ�ʱ:"+(System.currentTimeMillis()-beginTime)); // long tt= System.currentTimeMillis(); // for (int i = 0; i < 1000; i++) { // int j=i; // } // System.out.println("�ܺ�ʱ:"+(System.currentTimeMillis()-tt)); }