/**
 * Creates an empty op log cache.
 *
 * <p>All bookkeeping containers start empty, the sequence id generator starts at 0 and the
 * stop flag is cleared. The incoming message queue has an initial capacity of 100 and orders
 * its elements with a {@code PriorityComparator}.
 */
public MatrixOpLogCache() {
  // Lifecycle flag and sequence id source.
  this.stopped = new AtomicBoolean(false);
  this.seqIdGenerator = new AtomicInteger(0);

  // Incoming requests, ordered by PriorityComparator.
  this.messageQueue = new PriorityBlockingQueue<>(100, new PriorityComparator());

  // Per-matrix state, keyed by matrix id.
  this.opLogs = new ConcurrentHashMap<>();
  this.mergingCounters = new Int2IntOpenHashMap();
  this.seqIdToMessageMaps = new Int2ObjectOpenHashMap<>();
  this.waitedMessageQueues = new Int2ObjectOpenHashMap<>();
  this.flushListeners = new Int2ObjectOpenHashMap<>();

  // Message -> future of its asynchronous result.
  this.messageToFutureMap = new ConcurrentHashMap<>();
}
@Override public void run() { while (!stopped.get() && !Thread.currentThread().isInterrupted()) { OpLogMessage message; try { message = messageQueue.take(); } catch (InterruptedException e) { LOG.warn("oplog-merge-dispatcher interrupted"); return; } if (LOG.isDebugEnabled()) { LOG.debug("receive oplog request " + message); } int matrixId = message.getMatrixId(); switch (message.getType()) { case MERGE: { // If the matrix op log cache does not exist for the matrix, create a new one for the // matrix // and add it to cache maps if (!opLogs.containsKey(matrixId)) { mergingCounters.put(matrixId, 0); addMatrixOpLog(matrixId); } if (!seqIdToMessageMaps.containsKey(matrixId)) { seqIdToMessageMaps.put(matrixId, new Int2ObjectAVLTreeMap<OpLogMessage>()); } // Add the message to the tree map if (!seqIdToMessageMaps.get(matrixId).containsKey(message.getSeqId())) { seqIdToMessageMaps.get(matrixId).put(message.getSeqId(), message); } if (needWait(message.getMatrixId(), message.getSeqId())) { // If there are flush / clock requests blocked, we need to put this merge request into // the waiting queue LOG.debug("add message=" + message + " to wait queue"); addWaitQueue(message); } else { // Launch a merge worker to merge the update to matrix op log cache mergingCounters.addTo(message.getMatrixId(), 1); merge((OpLogMergeMessage) message); } break; } case MERGE_SUCCESS: { if (LOG.isDebugEnabled()) { LOG.debug(printMergingCounters()); } // Remove the message from the tree map seqIdToMessageMaps.get(matrixId).remove(message.getSeqId()); mergingCounters.addTo(message.getMatrixId(), -1); // Wake up blocked flush/clock request checkAndWakeUpListeners(message.getMatrixId()); break; } case CLOCK: case FLUSH: { // Add flush/clock request to listener list to waiting for all the existing // updates are merged addToListenerList(message); // Wake up blocked flush/clock request checkAndWakeUpListeners(message.getMatrixId()); break; } } } }