MemStoreChunkPool(Configuration conf, int chunkSize, int maxCount, int initialCount) { this.maxCount = maxCount; this.chunkSize = chunkSize; this.reclaimedChunks = new LinkedBlockingQueue<Chunk>(); for (int i = 0; i < initialCount; i++) { Chunk chunk = new Chunk(chunkSize); chunk.init(); reclaimedChunks.add(chunk); } final String n = Thread.currentThread().getName(); scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat(n+"-MemStoreChunkPool Statistics") .setDaemon(true).build()); this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); }
/** * Add the chunks to the pool, when the pool achieves the max size, it will * skip the remaining chunks * @param chunks */ void putbackChunks(BlockingQueue<Chunk> chunks) { int maxNumToPutback = this.maxCount - reclaimedChunks.size(); if (maxNumToPutback <= 0) { return; } chunks.drainTo(reclaimedChunks, maxNumToPutback); }
/** * Add the chunk to the pool, if the pool has achieved the max size, it will * skip it * @param chunk */ void putbackChunk(Chunk chunk) { if (reclaimedChunks.size() >= this.maxCount) { return; } reclaimedChunks.add(chunk); }