/**
 * Performs memstore flush, writing data from scanner into sink.
 *
 * @param scanner Scanner to get data from.
 * @param sink Sink to write data to. Could be StoreFile.Writer.
 * @param smallestReadPoint Smallest read point used for the flush.
 */
protected void performFlush(InternalScanner scanner, Compactor.CellSink sink,
    long smallestReadPoint) throws IOException {
  int compactionKVMax =
      conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);

  ScannerContext scannerContext =
      ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();

  List<Cell> kvs = new ArrayList<Cell>();
  boolean hasMore;
  do {
    hasMore = scanner.next(kvs, scannerContext);
    if (!kvs.isEmpty()) {
      for (Cell c : kvs) {
        // If we know that this KV is going to be included always, then let us
        // set its memstoreTS to 0. This will help us save space when writing to disk.
        sink.append(c);
      }
      kvs.clear();
    }
  } while (hasMore);
}

/**
 * Performs memstore flush, writing data from scanner into sink.
 * @param scanner Scanner to get data from.
 * @param sink Sink to write data to. Could be StoreFile.Writer.
 * @param smallestReadPoint Smallest read point used for the flush.
 */
protected void performFlush(InternalScanner scanner, Compactor.CellSink sink,
    long smallestReadPoint) throws IOException {
  int compactionKVMax =
      conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);

  List<Cell> kvs = new ArrayList<Cell>();
  boolean hasMore;
  do {
    hasMore = scanner.next(kvs, compactionKVMax);
    if (!kvs.isEmpty()) {
      for (Cell c : kvs) {
        // If we know that this KV is going to be included always, then let us
        // set its memstoreTS to 0. This will help us save space when writing to disk.
        sink.append(c);
      }
      kvs.clear();
    }
  } while (hasMore);
}

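Both variants above drain the scanner with the same batched do/while pattern: next() fills the reusable list with at most compactionKVMax cells, reports whether more remain, and the batch is appended to the sink and cleared. Below is a minimal, self-contained sketch of that pattern; the Scanner and Sink interfaces are hypothetical stand-ins for InternalScanner and Compactor.CellSink, not HBase API.

import java.util.ArrayList;
import java.util.List;

public class BatchedDrain {
  /** Hypothetical stand-in for InternalScanner: fills 'out' with up to 'limit'
   *  items and returns true while more items remain. */
  interface Scanner<T> {
    boolean next(List<T> out, int limit);
  }

  /** Hypothetical stand-in for Compactor.CellSink. */
  interface Sink<T> {
    void append(T item);
  }

  /** Drains the scanner in bounded batches so memory stays flat regardless of total size. */
  static <T> void drain(Scanner<T> scanner, Sink<T> sink, int batchLimit) {
    List<T> batch = new ArrayList<>();
    boolean hasMore;
    do {
      hasMore = scanner.next(batch, batchLimit);
      // next() may return false and still have delivered a final batch,
      // which is why the loop body runs before the hasMore check.
      for (T item : batch) {
        sink.append(item);
      }
      batch.clear(); // reuse the same list for the next batch
    } while (hasMore);
  }

  public static void main(String[] args) {
    // Toy scanner over 0..9, drained in batches of 3 into a printing sink.
    List<Integer> data = new ArrayList<>();
    for (int i = 0; i < 10; i++) data.add(i);
    Scanner<Integer> scanner = new Scanner<Integer>() {
      int pos = 0;
      public boolean next(List<Integer> out, int limit) {
        int end = Math.min(pos + limit, data.size());
        out.addAll(data.subList(pos, end));
        pos = end;
        return pos < data.size();
      }
    };
    drain(scanner, System.out::println, 3);
  }
}
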
/**
 * Performs memstore flush, writing data from scanner into sink.
 * @param scanner Scanner to get data from.
 * @param sink Sink to write data to. Could be StoreFile.Writer.
 * @param smallestReadPoint Smallest read point used for the flush.
 * @return Bytes flushed.
 */
protected long performFlush(InternalScanner scanner, Compactor.CellSink sink,
    long smallestReadPoint) throws IOException {
  int compactionKVMax =
      conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);

  List<Cell> kvs = new ArrayList<Cell>();
  boolean hasMore;
  long flushed = 0;
  do {
    hasMore = scanner.next(kvs, compactionKVMax);
    if (!kvs.isEmpty()) {
      for (Cell c : kvs) {
        // If we know that this KV is going to be included always, then let us
        // set its memstoreTS to 0. This will help us save space when writing to disk.
        KeyValue kv = KeyValueUtil.ensureKeyValue(c);
        if (kv.getMvccVersion() <= smallestReadPoint) {
          // let us not change the original KV. It could be in the memstore
          // changing its memstoreTS could affect other threads/scanners.
          kv = kv.shallowCopy();
          kv.setMvccVersion(0);
        }
        sink.append(kv);
        flushed += MemStore.heapSizeChange(kv, true);
      }
      kvs.clear();
    }
  } while (hasMore);
  return flushed;
}

/**
 * Performs memstore flush, writing data from scanner into sink.
 * @param scanner Scanner to get data from.
 * @param sink Sink to write data to. Could be StoreFile.Writer.
 * @param smallestReadPoint Smallest read point used for the flush.
 * @return Bytes flushed.
 */
protected long performFlush(InternalScanner scanner, Compactor.CellSink sink,
    long smallestReadPoint) throws IOException {
  int compactionKVMax =
      conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);

  List<Cell> kvs = new ArrayList<Cell>();
  boolean hasMore;
  long flushed = 0;
  do {
    hasMore = scanner.next(kvs, compactionKVMax);
    if (!kvs.isEmpty()) {
      for (Cell c : kvs) {
        // If we know that this KV is going to be included always, then let us
        // set its memstoreTS to 0. This will help us save space when writing to disk.
        KeyValue kv = KeyValueUtil.ensureKeyValue(c);
        if (kv.getMvccVersion() <= smallestReadPoint) {
          // let us not change the original KV. It could be in the memstore
          // changing its memstoreTS could affect other threads/scanners.
          kv = kv.shallowCopy();
          kv.setMvccVersion(0);
        }
        sink.append(kv);
        flushed += MemStore.heapSizeChange(kv, true);
      }
      kvs.clear();
    }
  } while (hasMore);
  return flushed;
}

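The KeyValue handling in the two variants above encodes a single rule: a cell whose MVCC version is at or below the smallest read point is already visible to every active reader, so its MVCC/memstore timestamp can be zeroed before writing to save space on disk, but only on a copy, because the original may still be referenced by the memstore and by concurrent scanners. A standalone restatement of that decision follows; the Cell class and the values in main are illustrative, not HBase types.

public class MvccZeroing {
  /** Illustrative cell: a value plus the MVCC write number it was created at. */
  static final class Cell {
    final String value;
    long mvcc;
    Cell(String value, long mvcc) { this.value = value; this.mvcc = mvcc; }
    Cell shallowCopy() { return new Cell(value, mvcc); }
  }

  /**
   * Returns the cell to persist. If every active reader can already see this cell
   * (mvcc <= smallestReadPoint), the persisted copy carries mvcc 0 so no space is
   * spent encoding it; otherwise the cell is written unchanged. The input cell is
   * never mutated, since other threads may still be reading it.
   */
  static Cell forPersist(Cell c, long smallestReadPoint) {
    if (c.mvcc <= smallestReadPoint) {
      Cell copy = c.shallowCopy();
      copy.mvcc = 0;
      return copy;
    }
    return c;
  }

  public static void main(String[] args) {
    long smallestReadPoint = 10;
    Cell visibleToAll = new Cell("a", 5);
    Cell maybeInvisible = new Cell("b", 12);
    System.out.println(forPersist(visibleToAll, smallestReadPoint).mvcc);   // 0
    System.out.println(forPersist(maybeInvisible, smallestReadPoint).mvcc); // 12
  }
}
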
@Test
public void testCompactionWithCorruptResult() throws Exception {
  int nfiles = 10;
  for (int i = 0; i < nfiles; i++) {
    createStoreFile(r);
  }
  HStore store = (HStore) r.getStore(COLUMN_FAMILY);

  Collection<StoreFile> storeFiles = store.getStorefiles();
  Compactor tool = store.storeEngine.getCompactor();
  List<Path> newFiles = tool.compactForTesting(storeFiles, false);

  // Now let's corrupt the compacted file.
  FileSystem fs = store.getFileSystem();
  // The default compaction policy created one and only one new compacted file.
  Path dstPath = store.getRegionFileSystem().createTempName();
  FSDataOutputStream stream = fs.create(dstPath, null, true, 512, (short) 3, (long) 1024, null);
  stream.writeChars("CORRUPT FILE!!!!");
  stream.close();
  Path origPath = store.getRegionFileSystem().commitStoreFile(
      Bytes.toString(COLUMN_FAMILY), dstPath);

  try {
    ((HStore) store).moveFileIntoPlace(origPath);
  } catch (Exception e) {
    // The complete compaction should fail and the corrupt file should remain
    // in the 'tmp' directory.
    assert (fs.exists(origPath));
    assert (!fs.exists(dstPath));
    System.out.println("testCompactionWithCorruptResult Passed");
    return;
  }
  fail("testCompactionWithCorruptResult failed since no exception was "
      + "thrown while completing a corrupt file");
}

/**
 * @return Compactor to use.
 */
public Compactor getCompactor() {
  return this.compactor;
}

/**
 * Performs memstore flush, writing data from scanner into sink.
 * @param scanner Scanner to get data from.
 * @param sink Sink to write data to. Could be StoreFile.Writer.
 * @param smallestReadPoint Smallest read point used for the flush.
 */
protected void performFlush(InternalScanner scanner, Compactor.CellSink sink,
    long smallestReadPoint) throws IOException {
  int compactionKVMax =
      conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);

  List<Cell> kvs = new ArrayList<Cell>();

  // Shen Li: init nextSplitRow
  splitKeyIndex = 0;
  nextSplitRow = store.getRegionInfo().getSplitKey(splitKeyIndex);

  boolean hasMore;
  do {
    hasMore = scanner.next(kvs, compactionKVMax);
    if (!kvs.isEmpty()) {
      for (Cell c : kvs) {
        // If we know that this KV is going to be included always, then let us
        // set its memstoreTS to 0. This will help us save space when writing to disk.
        KeyValue kv = KeyValueUtil.ensureKeyValue(c);
        if (kv.getMvccVersion() <= smallestReadPoint) {
          // let us not change the original KV. It could be in the memstore
          // changing its memstoreTS could affect other threads/scanners.
          kv = kv.shallowCopy();
          kv.setMvccVersion(0);
        }
        // Shen Li: TODO check split boundary. Use Store; if the cell exceeds the
        // boundary, call Store to seal the block and reset the replica group.
        //
        // sink is an instance of StoreFile.Writer, which has an
        // HFile.Writer as a member variable.
        //
        // HFile.Writer has an FSDataOutputStream member variable
        // which can do seal and set-replica-group operations.
        //
        if (shouldSeal(kv)) {
          // sealCurBlock will flush the buffer before sealing the block
          sink.sealCurBlock();
          sink.setReplicaGroups(getReplicaNamespace(), getReplicaGroups());
        }
        sink.append(kv);
      }
      kvs.clear();
    }
  } while (hasMore);
}

protected boolean performCompaction(Compactor.FileDetails fd, InternalScanner scanner,
    CellSink writer, long smallestReadPoint, boolean cleanSeqId,
    CompactionThroughputController throughputController, boolean major) throws IOException {
  if (LOG.isTraceEnabled())
    SpliceLogUtils.trace(LOG, "performCompaction");
  long bytesWritten = 0;
  long bytesWrittenProgress = 0;
  // Since scanner.next() can return 'false' but still be delivering data,
  // we have to use a do/while loop.
  List<Cell> cells = new ArrayList<>();
  long closeCheckInterval = HStore.getCloseCheckInterval();
  long lastMillis = 0;
  if (LOG.isDebugEnabled()) {
    lastMillis = EnvironmentEdgeManager.currentTime();
  }
  long now = 0;
  boolean hasMore;
  int compactionKVMax =
      this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
  ScannerContext scannerContext =
      ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
  do {
    hasMore = scanner.next(cells, scannerContext);
    if (LOG.isDebugEnabled()) {
      now = EnvironmentEdgeManager.currentTime();
    }
    // output to writer:
    for (Cell c : cells) {
      if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
        CellUtil.setSequenceId(c, 0);
      }
      writer.append(c);
      int len = KeyValueUtil.length(c);
      ++progress.currentCompactedKVs;
      progress.totalCompactedSize += len;
      if (LOG.isDebugEnabled()) {
        bytesWrittenProgress += len;
      }
      // check periodically to see if a system stop is requested
      if (closeCheckInterval > 0) {
        bytesWritten += len;
        if (bytesWritten > closeCheckInterval) {
          bytesWritten = 0;
          // if (!store.areWritesEnabled()) {
          //   progress.cancel();
          //   return false;
          // }
        }
      }
    }
    // Log the progress of long running compactions every minute if
    // logging at DEBUG level
    if (LOG.isDebugEnabled()) {
      if ((now - lastMillis) >= 60 * 1000) {
        LOG.debug("Compaction progress: " + progress
            + String.format(", rate=%.2f kB/sec",
                (bytesWrittenProgress / 1024.0) / ((now - lastMillis) / 1000.0)));
        lastMillis = now;
        bytesWrittenProgress = 0;
      }
    }
    cells.clear();
  } while (hasMore);
  progress.complete();
  return true;
}

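One detail worth calling out in the compaction loop above is how it amortizes the "should I stop?" check: rather than consulting the (comparatively expensive) stop condition for every cell, it counts bytes written and only polls once roughly closeCheckInterval bytes have accumulated, resetting the counter each time. A simplified, self-contained restatement of that bookkeeping follows; the class and variable names are illustrative, not HBase API.

public class CloseCheck {
  /** Counts written bytes and signals the caller to poll its stop condition
   *  only every 'checkInterval' bytes. */
  static final class ThrottledStopCheck {
    private final long checkInterval;   // bytes between polls; <= 0 disables polling
    private long bytesSinceCheck = 0;

    ThrottledStopCheck(long checkInterval) { this.checkInterval = checkInterval; }

    /**
     * Record 'len' written bytes; returns true when the caller should poll its
     * stop condition now. Mirrors the bytesWritten / closeCheckInterval logic in
     * performCompaction above.
     */
    boolean shouldPollStop(int len) {
      if (checkInterval <= 0) return false;
      bytesSinceCheck += len;
      if (bytesSinceCheck > checkInterval) {
        bytesSinceCheck = 0;
        return true;
      }
      return false;
    }
  }

  public static void main(String[] args) {
    ThrottledStopCheck check = new ThrottledStopCheck(10L * 1024 * 1024); // poll every ~10 MB
    boolean stopRequested = false; // stands in for a condition like writes being disabled
    for (int i = 0; i < 100; i++) {
      int len = 1024 * 1024; // pretend each appended cell is 1 MB
      if (check.shouldPollStop(len) && stopRequested) {
        System.out.println("stopping early after cell " + i);
        return;
      }
    }
    System.out.println("completed all cells");
  }
}
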
/**
 * Performs memstore flush, writing data from scanner into sink.
 *
 * @param scanner Scanner to get data from.
 * @param sink Sink to write data to. Could be StoreFile.Writer.
 * @param smallestReadPoint Smallest read point used for the flush.
 */
@Override
protected void performFlush(InternalScanner scanner, Compactor.CellSink sink,
    long smallestReadPoint) throws IOException {
  super.performFlush(scanner, sink, smallestReadPoint);
}