Java 类java.util.concurrent.ConcurrentLinkedQueue 实例源码

项目:boohee_v5.6    文件:CachedThreadScheduler.java   
/**
 * Builds the pool and, when a time unit is supplied, starts a single-threaded
 * scheduled executor that repeatedly calls {@code evictExpiredWorkers()}.
 *
 * @param keepAliveTime keep-alive duration, expressed in {@code unit}
 * @param unit          unit of {@code keepAliveTime}; when {@code null} the stored
 *                      keep-alive is 0 and no eviction task is scheduled
 */
CachedWorkerPool(long keepAliveTime, TimeUnit unit) {
    if (unit == null) {
        this.keepAliveTime = 0;
    } else {
        this.keepAliveTime = unit.toNanos(keepAliveTime);
    }
    this.expiringWorkerQueue = new ConcurrentLinkedQueue();
    this.allWorkers = new CompositeSubscription();

    // Both stay null when no unit was given.
    ScheduledExecutorService evictor = null;
    Future<?> task = null;
    if (unit != null) {
        evictor = Executors.newScheduledThreadPool(1, CachedThreadScheduler.EVICTOR_THREAD_FACTORY);
        NewThreadWorker.tryEnableCancelPolicy(evictor);
        Runnable evictionPass = new Runnable() {
            public void run() {
                CachedWorkerPool.this.evictExpiredWorkers();
            }
        };
        // Initial delay and period are both the keep-alive time, in nanoseconds.
        task = evictor.scheduleWithFixedDelay(evictionPass, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
    }
    this.evictorService = evictor;
    this.evictorTask = task;
}
项目:diorite-configs-java8    文件:YamlCollectionCreator.java   
/**
 * Registers, through {@code safePut}, one instance factory per supported collection or map type.
 * <p>
 * Every factory takes a single {@code int}; factories for types that offer an {@code int}
 * constructor pass it on, the others ignore it.
 *
 * @param map
 *         registry that receives the factories.
 * @param unmodMap
 *         not used by this method.
 */
static void putAllCollections(Map<Class<?>, IntFunction<?>> map, Map<Class<?>, Function<?, ?>> unmodMap)
{
    // Concrete collection classes.
    safePut(map, ArrayList.class, size -> new ArrayList<>(size));
    safePut(map, HashSet.class, size -> new LinkedHashSet<>(size));
    safePut(map, Properties.class, ignored -> new Properties());
    safePut(map, Hashtable.class, size -> new Hashtable<>(size));

    // Collection interfaces, each mapped to a default implementation.
    safePut(map, Collection.class, size -> new ArrayList<>(size));
    safePut(map, Set.class, size -> new LinkedHashSet<>(size));
    safePut(map, List.class, size -> new ArrayList<>(size));
    safePut(map, SortedSet.class, ignored -> new TreeSet<>());
    safePut(map, Queue.class, ignored -> new ConcurrentLinkedQueue<>());
    safePut(map, Deque.class, ignored -> new ConcurrentLinkedDeque<>());
    safePut(map, BlockingQueue.class, ignored -> new LinkedBlockingQueue<>());
    safePut(map, BlockingDeque.class, ignored -> new LinkedBlockingDeque<>());


    // Concrete map classes.
    safePut(map, HashMap.class, size -> new LinkedHashMap<>(size));
    safePut(map, LinkedHashMap.class, size -> new LinkedHashMap<>(size));
    safePut(map, ConcurrentHashMap.class, size -> new ConcurrentHashMap<>(size));

    // Map interfaces, each mapped to a default implementation.
    safePut(map, Map.class, size -> new LinkedHashMap<>(size));
    safePut(map, ConcurrentMap.class, ignored -> new ConcurrentSkipListMap<>());
    safePut(map, ConcurrentNavigableMap.class, ignored -> new ConcurrentSkipListMap<>());
    safePut(map, SortedMap.class, ignored -> new TreeMap<>());
}
项目:object-diff    文件:ObjectDiffCalculator.java   
/**
 * Compares two values and reports the outcome as a single {@code Diff}.
 * <ul>
 *   <li>both {@code null}: "not changed", carrying only the description</li>
 *   <li>only {@code before} is {@code null}: "added", carrying the after value</li>
 *   <li>only {@code after} is {@code null}: "deleted", carrying the before value</li>
 *   <li>otherwise "not changed" or "updated" depending on {@code before.equals(after)},
 *       carrying both values</li>
 * </ul>
 *
 * @param before      value before the change, may be {@code null}
 * @param after       value after the change, may be {@code null}
 * @param description field description stored on the resulting diff
 * @return a collection holding exactly one diff
 */
@Override
public Collection<Diff> apply(Object before, Object after, String description) {
    final Diff diff;
    if (before == null) {
        if (after == null) {
            diff = new Diff.Builder().hasNotChanged().setFieldDescription(description).build();
        } else {
            diff = new Diff.Builder().isAdded().setAfterValue(after).setFieldDescription(description).build();
        }
    } else if (after == null) {
        diff = new Diff.Builder().isDeleted().setBeforeValue(before).setFieldDescription(description).build();
    } else if (before.equals(after)) {
        diff = new Diff.Builder().hasNotChanged().setBeforeValue(before).setAfterValue(after).setFieldDescription(description).build();
    } else {
        diff = new Diff.Builder().isUpdated().setBeforeValue(before).setAfterValue(after).setFieldDescription(description).build();
    }

    Collection<Diff> result = new ConcurrentLinkedQueue<>();
    result.add(diff);
    return result;
}
项目:NioImapClient    文件:ImapClient.java   
/**
 * Stores the supplied collaborators, creates the per-client state objects and then
 * calls {@code configureChannel()}.
 *
 * @param configuration   client configuration
 * @param channel         channel this client works on
 * @param sslContext      SSL context kept for later use
 * @param promiseExecutor executor group, also handed to the client state
 * @param clientName      name used for the logger and the client state
 */
public ImapClient(ImapClientConfiguration configuration,
                  Channel channel,
                  SslContext sslContext,
                  EventExecutorGroup promiseExecutor,
                  String clientName) {
  this.logger = LogUtils.loggerWithName(ImapClient.class, clientName);

  // Collaborators handed in by the caller.
  this.configuration = configuration;
  this.channel = channel;
  this.sslContext = sslContext;
  this.promiseExecutor = promiseExecutor;

  // State created for this client; the codec is built on top of the client state.
  this.clientState = new ImapClientState(clientName, promiseExecutor);
  this.codec = new ImapCodec(clientState);

  // Bookkeeping: nothing pending, not shut down, not closed, capabilities unknown.
  this.capabilities = new AtomicReference<>(null);
  this.connectionClosed = new AtomicBoolean(false);
  this.connectionShutdown = new AtomicBoolean(false);
  this.pendingWriteQueue = new ConcurrentLinkedQueue<>();

  configureChannel();
}
项目:jdk8u-jdk    文件:RemovePollRace.java   
/**
 * Builds one instance of each queue implementation under test and returns them in
 * shuffled order. The two array-backed queues are sized with {@code count}.
 */
Collection<Queue<Boolean>> concurrentQueues() {
    final List<Queue<Boolean>> candidates = new ArrayList<Queue<Boolean>>();

    // Non-blocking linked implementations.
    candidates.add(new ConcurrentLinkedDeque<Boolean>());
    candidates.add(new ConcurrentLinkedQueue<Boolean>());

    // Array-backed implementation, first non-fair and then fair.
    for (boolean fair : new boolean[] { false, true }) {
        candidates.add(new ArrayBlockingQueue<Boolean>(count, fair));
    }

    // Blocking linked implementations.
    candidates.add(new LinkedBlockingQueue<Boolean>());
    candidates.add(new LinkedBlockingDeque<Boolean>());
    candidates.add(new LinkedTransferQueue<Boolean>());

    // Further implementations can be obtained from:
    // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
    // candidates.add(new SynchronizedLinkedListQueue<Boolean>());

    // Randomize the order to avoid the "first fast, second slow" benchmark effect.
    Collections.shuffle(candidates);
    return candidates;
}
项目:information-retrieval    文件:CorpusReader.java   
/**
 * Parses every line of the given queue that matches {@code CORPUS_REGEX_DOCUMENT} into a
 * {@code Document}.
 * <p>
 * Lines are processed with a parallel stream, so corpus ids (taken from {@code corpusCount})
 * are not guaranteed to follow the order of the lines. For each matching line an entry is
 * also recorded in {@code corpusIDToPath}.
 *
 * @param stringsQueue lines to parse; empty and non-matching lines are skipped
 * @return one document per matching line, holding its content as raw space-separated words
 *         (no further tokenizing or stemming is done here)
 */
public List<Document> parse(ConcurrentLinkedQueue<String> stringsQueue) {

    //compile our corpus regex so we can apply it on our parsing process
    Pattern id_content = Pattern.compile(CORPUS_REGEX_DOCUMENT);

    //parsing process
    return stringsQueue.parallelStream()
            .filter(line -> !line.isEmpty()) // line is not empty
            .map(id_content::matcher)// regex it
            .filter(Matcher::find) // did we regex anything? if so create document
            .map(match ->
            {
                //get the corpusID for this new document from the shared counter
                int corpusID = corpusCount.getAndIncrement();

                //remember regex groups 4 and 1 for this corpusID
                //NOTE(review): presumably group 4 is the file path and group 1 the original doc id - confirm against CORPUS_REGEX_DOCUMENT
                corpusIDToPath.computeIfAbsent(corpusID, v -> new ImmutablePair<>(match.group(4), Integer.parseInt(match.group(1))));
                return new Document(
                        corpusID, //our own id, not the one found in the line
                        // split document content in words; a sequential stream is used on purpose:
                        // the outer stream is already parallel and a single line is a small input
                        Arrays.stream(match.group(5).split(" "))
                                .collect(Collectors.toList())); // and put them in a list
            })
            .collect(Collectors.toList()); //collect all parsed lines
}
项目:openjdk-jdk10    文件:RemovePollRace.java   
/**
 * Builds one instance of each queue implementation under test and returns them in
 * shuffled order. The two array-backed queues are sized with {@code count}.
 */
Collection<Queue<Boolean>> concurrentQueues() {
    final List<Queue<Boolean>> candidates = new ArrayList<>();

    // Non-blocking linked implementations.
    candidates.add(new ConcurrentLinkedDeque<Boolean>());
    candidates.add(new ConcurrentLinkedQueue<Boolean>());

    // Array-backed implementation, first non-fair and then fair.
    for (boolean fair : new boolean[] { false, true }) {
        candidates.add(new ArrayBlockingQueue<Boolean>(count, fair));
    }

    // Blocking linked implementations.
    candidates.add(new LinkedBlockingQueue<Boolean>());
    candidates.add(new LinkedBlockingDeque<Boolean>());
    candidates.add(new LinkedTransferQueue<Boolean>());

    // Further implementations can be obtained from:
    // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
    // candidates.add(new SynchronizedLinkedListQueue<Boolean>());

    // Randomize the order to avoid the "first fast, second slow" benchmark effect.
    Collections.shuffle(candidates);
    return candidates;
}
项目:otus_java_2017_06    文件:MessageSystem.java   
/**
 * Starts one worker thread per entry of {@code addresseeMap}.
 * <p>
 * Each worker repeatedly looks up the message queue registered for its address in
 * {@code messagesMap}, executes every queued message against its addressee, and then
 * sleeps for {@code DEFAULT_STEP_TIME} before the next round. A worker stops when its
 * thread is interrupted.
 */
public void start() {
    for (Map.Entry<Address, Addressee> entry : addresseeMap.entrySet()) {
        new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {

                ConcurrentLinkedQueue<Message> queue = messagesMap.get(entry.getKey());
                // poll() returns null on an empty queue, so a single call both tests and takes.
                Message message;
                while ((message = queue.poll()) != null) {
                    message.exec(entry.getValue());
                }
                try {
                    Thread.sleep(MessageSystem.DEFAULT_STEP_TIME);
                } catch (InterruptedException e) {
                    // Restore the flag and stop: continuing would make every later sleep
                    // fail immediately and turn this loop into a busy spin.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }).start();
    }
}
项目:ditb    文件:PerfInserterBase.java   
/**
 * Stores the run parameters and allocates the per-thread arrays, each sized by
 * {@code threadNum}, plus one counter per entry of {@code ResultParser.LatencyBoxPivots}.
 */
public PerfInserterBase(Configuration conf, TableName tableName, String loadDataDir,
    int processId, int threadNum, String statFilePath, ConcurrentLinkedQueue<String> reportQueue,
    AbstractWorkload workload) throws IOException {
  // Values handed in by the caller.
  this.conf = conf;
  this.workload = workload;
  this.tableName = tableName;
  this.processId = processId;
  this.threadNum = threadNum;
  this.loadDataDir = loadDataDir;
  this.statFilePath = statFilePath;
  this.reportQueue = reportQueue;

  // One slot per worker thread.
  loaders = new RunnableDataLoader[threadNum];
  inserters = new RunnablePerfInserter[threadNum];
  threadFinishMark = new boolean[threadNum];
  threadLatency = new double[threadNum];

  // A newly allocated int[] already holds zeros, so the counters need no explicit reset.
  globalBoxNumber = new int[ResultParser.LatencyBoxPivots.length];
}
项目:onedatashare    文件:HTTPBuilder.java   
/**
 * Constructor that sets up the connection.
 * <p>
 * An {@code HTTPException} raised during setup is only reported on {@code System.err};
 * the instance is still returned to the caller in that case.
 *
 * @param session supplies the work group and the URI to connect to
 */
public HTTPBuilder(HTTPSession session) {
  // Tap bells queue setup. Done before anything that can fail, so the queue is
  // never left null when the setup below is cut short by an exception.
  tapBellQueue = new ConcurrentLinkedQueue<Bell<Void>>();

  try {
    boot = new Bootstrap();
    boot.group(session.workGroup)
      .channel(HTTPChannel.class)
      .handler(new HTTPInitializer(session.uri.scheme(), this));

    // Channel setup
    onConnectBell = new Bell<Void>();
    setUri(session.uri);
    setupWithTest();
  } catch (HTTPException e) {
    System.err.println(e.getMessage());
  }
}
项目:iot-plat    文件:RotatingList.java   
/**
 * Removes {@code key} from the first bucket for which {@code contains(key, bucket)} is true.
 *
 * @param key element to remove
 * @return the boxed result of that bucket's {@code remove(key)}, or {@code null} when no
 *         bucket reports containing the key
 */
public Object remove(K key) {
    Object outcome = null;
    for (ConcurrentLinkedQueue<K> bucket : _buckets) {
        if (!contains(key, bucket)) {
            continue;
        }
        // Only the first matching bucket is touched.
        outcome = bucket.remove(key);
        break;
    }
    return outcome;
}
项目:openjdk-jdk10    文件:ConcurrentLinkedQueueTest.java   
/**
 * Adding a queue to itself via addAll is rejected with IllegalArgumentException
 */
public void testAddAllSelf() {
    ConcurrentLinkedQueue queue = populatedQueue(SIZE);
    try {
        queue.addAll(queue);
        shouldThrow();
    } catch (IllegalArgumentException expected) {}
}
项目:openjdk-jdk10    文件:ConcurrentLinkedQueueTest.java   
/**
 * Passing null to addAll is rejected with NullPointerException
 */
public void testAddAll1() {
    ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
    try {
        queue.addAll(null);
        shouldThrow();
    } catch (NullPointerException expected) {}
}
项目:openjdk-jdk10    文件:ConcurrentLinkedQueueTest.java   
/**
 * size() tracks every removal and every addition
 */
public void testSize() {
    ConcurrentLinkedQueue queue = populatedQueue(SIZE);

    // Drain: the reported size shrinks by one per removal.
    for (int removed = 0; removed < SIZE; ++removed) {
        assertEquals(SIZE - removed, queue.size());
        queue.remove();
    }

    // Refill: the reported size grows by one per addition.
    for (int added = 0; added < SIZE; ++added) {
        assertEquals(added, queue.size());
        queue.add(new Integer(added));
    }
}
项目:openjdk-jdk10    文件:ConcurrentLinkedQueueTest.java   
/**
 * clear() empties the queue, also after it has been reused
 */
public void testClear() {
    ConcurrentLinkedQueue queue = populatedQueue(SIZE);

    // Clearing a populated queue.
    queue.clear();
    assertTrue(queue.isEmpty());
    assertEquals(0, queue.size());

    // Clearing again after adding a single element.
    queue.add(one);
    assertFalse(queue.isEmpty());
    queue.clear();
    assertTrue(queue.isEmpty());
}
项目:openjdk-jdk10    文件:ConcurrentLinkedQueueTest.java   
/**
 * Constructing from a collection that contains a null element throws NPE
 */
public void testConstructor5() {
    Integer[] elements = new Integer[SIZE];
    // Fill all slots but the last one, which stays null.
    final int lastIndex = SIZE - 1;
    for (int i = 0; i < lastIndex; ++i) {
        elements[i] = new Integer(i);
    }
    try {
        new ConcurrentLinkedQueue(Arrays.asList(elements));
        shouldThrow();
    } catch (NullPointerException expected) {}
}
项目:UDOOBluLib-android    文件:SeqObserverQueue.java   
/**
 * Stores the supplied queue and wait value and resets the remaining fields:
 * a new single-thread executor, a cleared busy flag, an empty observer queue
 * and {@code changed == false}.
 */
private void init(BlockingQueue<Callable> tBlockingQeque, int wait){
    // Caller-supplied values.
    tBlockingDeque = tBlockingQeque;
    mWAIT = wait;

    // Fresh internal state.
    mExecutorService = Executors.newSingleThreadExecutor();
    mBusy = new AtomicBoolean(false);
    observers = new ConcurrentLinkedQueue<>();
    changed = false;
}
项目:apache-tomcat-7.0.73-with-comment    文件:ConcurrentMessageDigest.java   
/**
 * Registers the given algorithm so that {@link #digest(String, byte[][])} can
 * use it. This method <b>must</b> have been called, and have returned without
 * an exception, before {@link #digest(String, byte[][])} is used with that
 * algorithm. Calling it again for an already registered algorithm has no effect.
 *
 * @param algorithm The message digest algorithm to be supported
 *
 * @throws NoSuchAlgorithmException If the algorithm is not supported by the
 *                                  JVM
 */
public static void init(String algorithm) throws NoSuchAlgorithmException {
    synchronized (queues) {
        if (queues.containsKey(algorithm)) {
            // Already registered
            return;
        }
        // Obtain the digest first: if the algorithm is unknown nothing is registered
        MessageDigest first = MessageDigest.getInstance(algorithm);
        Queue<MessageDigest> pool = new ConcurrentLinkedQueue<MessageDigest>();
        pool.add(first);
        queues.put(algorithm, pool);
    }
}
项目:SpeechToText-WebSockets-Java    文件:CallsTelemetry.java   
/**
 * Appends a new timestamp to the queue kept for the given endpoint, creating that
 * queue on first use.
 *
 * @param endpoint key under which the call is recorded
 */
public void recordCall(String endpoint) {
    String now = newTimestamp();

    Queue<String> timestamps = callTimestamps.get(endpoint);
    if (timestamps == null) {
        Queue<String> fresh = new ConcurrentLinkedQueue<>();
        // putIfAbsent hands back the queue another caller registered first, if any.
        Queue<String> existing = callTimestamps.putIfAbsent(endpoint, fresh);
        timestamps = (existing != null) ? existing : fresh;
    }

    timestamps.add(now);
}
项目:Jenisys3    文件:RakNetServer.java   
/**
 * Creates the server, sets up its two queues and immediately calls {@code start()}.
 *
 * @param logger   logger used by this server
 * @param port     port to use, must be within 1..65535
 * @param interfaz interface value stored for later use
 * @throws IllegalArgumentException if {@code port} is outside 1..65535
 */
public RakNetServer(ThreadedLogger logger, int port, String interfaz) {
    // 65535 is the highest valid port number; 65536 does not fit in 16 bits.
    if (port < 1 || port > 65535) {
        throw new IllegalArgumentException("Invalid port range");
    }
    this.port = port;

    this.interfaz = interfaz;
    this.logger = logger;

    this.externalQueue = new ConcurrentLinkedQueue<>();
    this.internalQueue = new ConcurrentLinkedQueue<>();

    this.start();
}
项目:openjdk-jdk10    文件:WhiteBox.java   
/**
 * Counts the nodes visited while following {@code next} links starting at {@code head(q)}.
 * When a node's {@code next} is the node itself, the walk restarts from the current
 * {@code head(q)}; the running count is kept.
 */
int nodeCount(ConcurrentLinkedQueue q) {
    int count = 0;
    Object node = head(q);
    while (node != null) {
        count++;
        Object successor = next(node);
        // A node that points to itself cannot be followed further: go back to the head.
        node = (successor == node) ? head(q) : successor;
    }
    return count;
}
项目:cruise-control    文件:BrokerFailureDetectorTest.java   
/**
 * A broker that is already down when detection starts shows up in
 * {@code failedBrokers()} with the value {@code 100L}.
 */
@Test
public void testDetectorStartWithFailedBrokers() throws Exception {
  final int failedBrokerId = 0;

  Time mockTime = getMockTime();
  Queue<Anomaly> anomalies = new ConcurrentLinkedQueue<>();
  BrokerFailureDetector detector = createBrokerFailureDetector(anomalies, mockTime);

  try {
    // Kill the broker first, then start detecting.
    killBroker(failedBrokerId);
    detector.startDetection();
    assertEquals(Collections.singletonMap(failedBrokerId, 100L), detector.failedBrokers());
  } finally {
    detector.shutdown();
  }
}
项目:L2J-Global    文件:CharEffectList.java   
/**
 * Returns the {@code _triggered} queue, creating it on the first call.<br>
 * Creation happens while holding the lock on this object, after a second null check.
 * @return the {@code _triggered} queue, never {@code null} after creation
 */
public Queue<BuffInfo> getTriggered()
{
    if (_triggered != null)
    {
        return _triggered;
    }
    
    synchronized (this)
    {
        // Another thread may have created the queue while this one waited for the lock.
        if (_triggered == null)
        {
            _triggered = new ConcurrentLinkedQueue<>();
        }
        return _triggered;
    }
}
项目:openjdk-jdk10    文件:ConcurrentLinkedQueueTest.java   
/**
 * A queue built from a collection yields that collection's elements, in order
 */
public void testConstructor6() {
    Integer[] elements = new Integer[SIZE];
    for (int i = 0; i < SIZE; ++i) {
        elements[i] = new Integer(i);
    }

    ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(Arrays.asList(elements));

    // Polling must return the source elements one by one, first to last.
    for (int i = 0; i < SIZE; ++i) {
        assertEquals(elements[i], queue.poll());
    }
}
项目:ZooKeeper    文件:FollowerZooKeeperServer.java   
/**
 * Creates the follower server by delegating to the superclass constructor and
 * setting up an empty queue of pending sync requests.
 *
 * @param logFactory  transaction/snapshot log passed to the superclass
 * @param self        quorum peer; its tickTime, minSessionTimeout and
 *                    maxSessionTimeout are passed to the superclass along with
 *                    the peer itself
 * @param treeBuilder data tree builder passed to the superclass
 * @param zkDb        database passed to the superclass
 * @throws IOException
 */
FollowerZooKeeperServer(FileTxnSnapLog logFactory,QuorumPeer self,
        DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException {
    super(logFactory, self.tickTime, self.minSessionTimeout,
            self.maxSessionTimeout, treeBuilder, zkDb, self);
    this.pendingSyncs = new ConcurrentLinkedQueue<Request>();
}
项目:AndroidMuseumBleManager    文件:BluetoothConnectManager.java   
/**
 * Creates the manager: obtains the Bluetooth helpers from the given context and
 * sets up the empty subscribe queue, GATT map and listener list.
 *
 * @param context context passed to the superclass and used to look up the Bluetooth service
 */
public BluetoothConnectManager(Context context) {
    super(context);

    subscribeQueue = new ConcurrentLinkedQueue<>();
    mBluetoothUtils = BluetoothUtils.getInstance(context);
    bluetoothManager = (BluetoothManager) context.getSystemService(Context.BLUETOOTH_SERVICE);

    // Disconnects and connects happen concurrently, so a ConcurrentHashMap is required here;
    // otherwise a ConcurrentModificationException would occur.
    gattMap = new ConcurrentHashMap<>();
    connectStateListeners = new ArrayList<>();

    // Return value intentionally unused.
    BleManager.getBleParamsOptions();
}
项目:log4j2-elasticsearch    文件:RollingIndexNameFormatterTest.java   
/**
 * Creates 1000 mocked log events with randomized timestamps.
 * <p>
 * Each event gets a random increment of -1, 0 or 1; its timestamp is
 * {@code DEFAULT_TEST_TIME_IN_MILLIS} shifted by {@code increment * 60000} plus a random
 * offset below 60000. The increment is stored next to the event in its {@code TestTuple}.
 *
 * @return the generated tuples, in creation order
 */
private ConcurrentLinkedQueue<TestTuple> generateLogEvents() {
    final ConcurrentLinkedQueue<TestTuple> generated = new ConcurrentLinkedQueue<>();
    final Random random = new Random();

    for (int created = 0; created < 1000; created++) {
        // Draw the increment first and the offset second, one draw each per event.
        final int increment = random.nextInt(3) - 1;
        final long timeMillis = DEFAULT_TEST_TIME_IN_MILLIS + increment * 60000 + random.nextInt(60000);

        final LogEvent logEvent = mock(LogEvent.class);
        when(logEvent.getTimeMillis()).thenReturn(timeMillis);

        generated.add(new TestTuple(logEvent, increment));
    }
    return generated;
}
项目:ditb    文件:HybridWorker.java   
/**
 * For each of the {@code threadNum} slots, creates a loader and an executor that share
 * one queue, and starts each of them on its own new thread.
 */
public void loadAndExecuteOperations() throws InterruptedException, IOException {
  for (int slot = 0; slot < threadNum; ++slot) {
    threadFinishMark[slot] = false;

    // The loader and the executor of a slot are connected through this queue only.
    ConcurrentLinkedQueue<Operation> handoff = new ConcurrentLinkedQueue<>();
    loaders[slot] = new OperationLoader(slot, reportInterval,
        DITBUtil.getDataFileName(loadDataDir, processId, slot), handoff);
    executors[slot] = getOperationExecutor(slot, reportInterval, handoff, finishCounter);

    new Thread(loaders[slot]).start();
    new Thread(executors[slot]).start();
  }
}
项目:openjdk-jdk10    文件:OfferRemoveLoops.java   
/**
 * Runs {@code testQueue} once for each queue implementation listed below,
 * one after the other. Each queue is created right before it is tested.
 *
 * @param args not used by this method
 */
void test(String[] args) throws Throwable {
    // Linked blocking queues/deques, with an explicit capacity of 10 and without one.
    testQueue(new LinkedBlockingQueue(10));
    testQueue(new LinkedBlockingQueue());
    testQueue(new LinkedBlockingDeque(10));
    testQueue(new LinkedBlockingDeque());
    testQueue(new ArrayBlockingQueue(10));
    testQueue(new PriorityBlockingQueue(10));
    // Implementations constructed without any capacity argument.
    testQueue(new ConcurrentLinkedDeque());
    testQueue(new ConcurrentLinkedQueue());
    testQueue(new LinkedTransferQueue());
}
项目:mug    文件:Parallelizer.java   
/**
 * Submits {@code task} to {@code executor} and tracks it in {@code onboard} until it is done.
 *
 * <p>The submitted wrapper runs the task, then marks its {@code done} flag and removes the
 * flag's entry from {@code onboard}. A {@code Throwable} escaping the task is added to
 * {@code thrown}, or logged when {@code thrown} is null. The semaphore is released in all cases.
 *
 * <p>After submission the calling thread registers the future under the {@code done} flag,
 * calls {@code checkInFlight()}, removes the entry again if the task has already finished,
 * and finally calls {@code propagateExceptions()}.
 *
 * @param task the work to run; checked with {@code requireNonNull}
 */
void board(Runnable task) {
  requireNonNull(task);
  // Completion flag of this task; the same object also serves as its key in 'onboard'.
  AtomicBoolean done = new AtomicBoolean();
  // Use '<:' to denote happens-before throughout this method body.
  Future<?> future = executor.submit(() -> {
    try {
      try {
        task.run();
      } finally {
        done.set(true);  // A
        onboard.remove(done);  // B
      }
    } catch (Throwable e) {
      // Read the field once so the null check and the add() use the same queue.
      ConcurrentLinkedQueue<Throwable> toPropagate = thrown;
      if (toPropagate == null) {
        // The main thread propagates exceptions as soon as any task fails.
        // If a task did not respond in time and yet fails afterwards, the main thread has
        // already thrown and nothing will propagate this exception.
        // So just log it as best effort.
        logger.log(Level.WARNING, "Orphan task failure", e);
      } else {
        // Upon race condition, the exception may be added while the main thread is propagating.
        // It's ok though since the best we could have done is logging.
        toPropagate.add(e);
      }
    } finally {
      semaphore.release();
    }
  });
  onboard.put(done, future);  // C
  checkInFlight();
  // A <: B, C <: D <: E
  // if B <: C => A <: C => done == true => put() <: remove()
  // if C <: B => put() <: remove()
  // remove() could be executed more than once, but it's idempotent.
  if (done.get()) {  // D
    onboard.remove(done);  // E
  }
  propagateExceptions();
}
项目:openjdk-jdk10    文件:ConcurrentLinkedQueueTest.java   
/**
 * toArray with an array of an incompatible element type throws ArrayStoreException
 */
public void testToArray1_BadArg() {
    ConcurrentLinkedQueue queue = populatedQueue(SIZE);
    try {
        queue.toArray(new String[10]);
        shouldThrow();
    } catch (ArrayStoreException expected) {}
}
项目:openjdk-jdk10    文件:ConcurrentLinkedQueueTest.java   
/**
 * isEmpty is true for a new queue, false while it holds elements,
 * and true again once all of them have been removed
 */
public void testEmpty() {
    ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
    assertTrue(queue.isEmpty());

    queue.add(one);
    assertFalse(queue.isEmpty());

    // Add a second element, then remove both.
    queue.add(two);
    queue.remove();
    queue.remove();
    assertTrue(queue.isEmpty());
}
项目:JRediClients    文件:RedissonFairLockTest.java   
/**
 * Checks that threads obtain the fair lock in the order in which they registered.
 * <p>
 * Every thread adds itself to a shared queue right before calling {@code lock()}; once it
 * holds the lock it polls the queue and asserts that the head is itself. The test passes
 * when all threads have gone through the critical section within 30 seconds.
 */
@Test
public void testConcurrency_MultiInstance_Ordering() throws InterruptedException {
    final ConcurrentLinkedQueue<Thread> queue = new ConcurrentLinkedQueue<>();
    final AtomicInteger lockedCounter = new AtomicInteger();

    int totalThreads = Runtime.getRuntime().availableProcessors()*2;
    for (int i = 0; i < totalThreads; i++) {
        Thread t1 = new Thread(() -> {
            Lock lock = redisson.getFairLock("testConcurrency_MultiInstance2");
            queue.add(Thread.currentThread());
            lock.lock();
            try {
                Thread t = queue.poll();
                assertThat(t).isEqualTo(Thread.currentThread());
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                lockedCounter.incrementAndGet();
            } finally {
                // Release even when the assertion above fails, so that the remaining
                // threads are not left blocked on a lock that is never unlocked.
                lock.unlock();
            }
        });
        // Stagger the starts; presumably this keeps the registration order well defined.
        Thread.sleep(10);
        t1.start();
    }

    await().atMost(30, TimeUnit.SECONDS).until(() -> assertThat(lockedCounter.get()).isEqualTo(totalThreads));
}
项目:navigator    文件:MessageQueue.java   
/**
 * Appends a message to the recipient's queue, creating and registering the queue
 * first when the recipient has none yet.
 *
 * @param recipient key identifying the receiving queue
 * @param message   message to enqueue
 */
public void pushMessageTo(Key recipient, Object message) {
    Queue<Object> inbox = messages.get(recipient);
    if(inbox != null) {
        inbox.add(message);
        return;
    }

    // First message for this recipient: register a new queue, then fill it.
    Queue<Object> created = new ConcurrentLinkedQueue<>();
    messages.put(recipient, created);
    created.add(message);
}
项目:openjdk-jdk10    文件:ConcurrentLinkedQueueTest.java   
/**
 * The iterator returns elements in insertion (FIFO) order:
 * the k-th element returned equals k
 */
public void testIteratorOrdering() {
    final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
    queue.add(one);
    queue.add(two);
    queue.add(three);

    int visited = 0;
    Iterator it = queue.iterator();
    while (it.hasNext()) {
        visited++;
        assertEquals(visited, it.next());
    }

    assertEquals(3, visited);
}
项目:tinkergraph-gremlin    文件:TinkerMessenger.java   
/**
 * Stores a message for the given vertex under the given scope on the message board.
 * When a combiner is set and the vertex already has a queued message, the head of the
 * queue is removed, combined with the new message, and the combined message is queued.
 *
 * @param vertex       vertex the message is stored for
 * @param message      message to store
 * @param messageScope scope under which the message is stored
 */
private void addMessage(final Vertex vertex, final M message, MessageScope messageScope) {
    // Make sure a per-vertex map exists for this scope.
    this.messageBoard.sendMessages.compute(messageScope, (scope, perVertex) -> {
        if (null != perVertex) return perVertex;
        return new ConcurrentHashMap<>();
    });

    // Queue the message for the vertex, combining it with the queue head when possible.
    this.messageBoard.sendMessages.get(messageScope).compute(vertex, (v, pending) -> {
        if (null == pending) pending = new ConcurrentLinkedQueue<>();
        if (null == this.combiner || pending.isEmpty()) {
            pending.add(message);
        } else {
            pending.add(this.combiner.combine(pending.remove(), message));
        }
        return pending;
    });
}
项目:ndbc    文件:LockFreePool.java   
/**
 * Creates an empty pool.
 *
 * @param supplier           source of new items, stored as is
 * @param maxSize            passed to {@code semaphore(...)} to build the size semaphore
 * @param maxWaiters         passed to {@code semaphore(...)} to build the waiters semaphore
 * @param validationInterval when present, validation is scheduled with this interval
 * @param scheduler          scheduler handed to {@code scheduleValidation}
 */
private LockFreePool(final Supplier<Future<T>> supplier, final Optional<Integer> maxSize,
    final Optional<Integer> maxWaiters,
    final Optional<Duration> validationInterval, final ScheduledExecutorService scheduler) {
  this.supplier = supplier;

  // Limits.
  this.sizeSemaphore = semaphore(maxSize);
  this.waitersSemaphore = semaphore(maxWaiters);

  // Storage.
  // TODO is this the best data structure?
  this.items = new ConcurrentLinkedQueue<>();
  this.waiters = new ConcurrentLinkedQueue<>();

  // Validation is optional: nothing is scheduled without an interval.
  validationInterval.ifPresent(interval -> scheduleValidation(interval, scheduler));
}
项目:feeyo-redisproxy    文件:ConQueue.java   
/**
 * Takes the connection at the head of {@code cons}.
 *
 * @return the polled connection when it is neither closed nor disconnected;
 *         {@code null} when the queue is empty or the polled connection is unusable
 *         (an unusable connection is not put back)
 */
public RedisBackendConnection takeIdleCon() {
    RedisBackendConnection candidate = cons.poll();
    boolean usable = candidate != null && !candidate.isClosed() && candidate.isConnected();
    return usable ? candidate : null;
}
项目:ripostiglio    文件:InMemoryEventStoreTest.java   
/**
 * Events appended to a store created over an empty collection end up in that
 * collection, in the order they were appended.
 */
@Test
public void appendManyEventsOnEmptyEventStore() throws Exception {
  // given: a store backed by an empty collection
  Collection<Event<?>> backing = new ConcurrentLinkedQueue<>();
  EventStore store = new InMemoryEventStore(backing);

  // when: three events are appended
  store.append(FIRST_EVENT);
  store.append(SECOND_EVENT);
  store.append(THIRD_EVENT);

  // then: the backing collection holds them in the same order
  assertThat(backing, contains(FIRST_EVENT, SECOND_EVENT, THIRD_EVENT));
}
项目:ditb    文件:PerfMD.java   
/**
 * Passes id, report interval, queue and finish counter to the superclass, then tries to
 * open the table named {@code opTableName} on {@code conn}.
 * <p>
 * A failure to open the table is only printed; {@code table} is not assigned in that case.
 * The {@code mdAdmin} argument is not used by this constructor.
 */
public RunnableMDSecondaryPerfInsert(int id, int reportInterval,
    ConcurrentLinkedQueue<AbstractDITBRecord> queue, FinishCounter fc, MDHBaseAdmin mdAdmin) {
  super(id, reportInterval, queue, fc);
  try {
    table = conn.getTable(opTableName);
  } catch (IOException openFailure) {
    // Reported but not rethrown: construction still completes.
    openFailure.printStackTrace();
  }
}