/**
 * Do the cleanup if there is a current block.
 *
 * @throws IOException
 */
private void finishBlock() throws IOException {
  if (this.out == null) return;
  long startTimeNs = System.nanoTime();

  int size = releaseCompressingStream(this.out);
  this.out = null;
  blockKeys.add(firstKeyInBlock);
  blockOffsets.add(Long.valueOf(blockBegin));
  blockDataSizes.add(Integer.valueOf(size));
  this.totalUncompressedBytes += size;

  HFile.offerWriteLatency(System.nanoTime() - startTimeNs);

  if (cacheConf.shouldCacheDataOnWrite()) {
    baosDos.flush();
    // we do not do data block encoding on disk for HFile v1
    byte[] bytes = baos.toByteArray();
    HFileBlock block = new HFileBlock(BlockType.DATA,
        (int) (outputStream.getPos() - blockBegin), bytes.length, -1,
        ByteBuffer.wrap(bytes, 0, bytes.length), HFileBlock.FILL_HEADER,
        blockBegin, MemStore.NO_PERSISTENT_TS,
        HFileBlock.MINOR_VERSION_NO_CHECKSUM,   // minor version
        0,                                      // bytesPerChecksum
        ChecksumType.NULL.getCode(),            // checksum type
        (int) (outputStream.getPos() - blockBegin) +
            HFileBlock.HEADER_SIZE_NO_CHECKSUM); // onDiskDataSizeWithHeader

    block = blockEncoder.diskToCacheFormat(block, false);
    passSchemaMetricsTo(block);
    cacheConf.getBlockCache().cacheBlock(
        new BlockCacheKey(name, blockBegin, DataBlockEncoding.NONE,
            block.getBlockType()), block);
    baosDos.close();
  }
  blockNumber++;
}
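The pattern in finishBlock() — flush the per-block buffer, record the block's starting offset and uncompressed size in the index lists, and keep the bytes for an optional write-time cache — can be sketched without any HBase internals. Below is a minimal, hypothetical stand-alone version using only the JDK; the class and member names (SimpleBlockWriter, startBlock, nextBlockBegin) are invented for illustration and are not part of the HFile writer.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified illustration of the finishBlock() bookkeeping pattern:
// close out the current block, remember where it started and how big it was,
// and hand back the uncompressed bytes for an optional write-time cache.
public class SimpleBlockWriter {
  private final List<Long> blockOffsets = new ArrayList<>();
  private final List<Integer> blockDataSizes = new ArrayList<>();
  private long totalUncompressedBytes = 0;
  private long nextBlockBegin = 0;

  private ByteArrayOutputStream baos;
  private DataOutputStream out;

  /** Start a new block backed by an in-memory buffer. */
  public void startBlock() {
    baos = new ByteArrayOutputStream();
    out = new DataOutputStream(baos);
  }

  public DataOutputStream currentBlock() {
    return out;
  }

  /** Finish the current block, if one is open, and record its offset and size. */
  public byte[] finishBlock() throws IOException {
    if (out == null) {
      return null;
    }
    out.flush();
    byte[] bytes = baos.toByteArray();

    blockOffsets.add(nextBlockBegin);   // where the block begins
    blockDataSizes.add(bytes.length);   // uncompressed size of the block
    totalUncompressedBytes += bytes.length;
    nextBlockBegin += bytes.length;     // the next block starts right after this one

    out.close();
    out = null;
    return bytes;                       // caller may cache or persist these bytes
  }

  public static void main(String[] args) throws IOException {
    SimpleBlockWriter writer = new SimpleBlockWriter();
    writer.startBlock();
    writer.currentBlock().writeUTF("row1/cf:qual/value");
    byte[] block = writer.finishBlock();
    System.out.println("finished block of " + block.length + " bytes");
  }
}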
@Override
protected void setup(Context context) throws IOException, InterruptedException {
  this.conf = context.getConfiguration();
  this.fs = FileSystem.get(conf);
  // MOB_COMPACTION_DELAY is ONE_DAY by default; its value is only changed in tests.
  mobCompactionDelay = conf.getLong(SweepJob.MOB_COMPACTION_DELAY, SweepJob.ONE_DAY);
  String tableName = conf.get(TableInputFormat.INPUT_TABLE);
  String familyName = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
  this.familyDir = MobUtils.getMobFamilyPath(conf, TableName.valueOf(tableName), familyName);
  HBaseAdmin admin = new HBaseAdmin(this.conf);
  try {
    family = admin.getTableDescriptor(Bytes.toBytes(tableName)).getFamily(
        Bytes.toBytes(familyName));
  } finally {
    try {
      admin.close();
    } catch (IOException e) {
      LOG.warn("Failed to close the HBaseAdmin", e);
    }
  }
  // Disable the block cache.
  Configuration copyOfConf = new Configuration(conf);
  copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.00001f);
  this.cacheConfig = new CacheConfig(copyOfConf);

  table = new HTable(this.conf, Bytes.toBytes(tableName));
  table.setAutoFlush(false, false);
  table.setWriteBufferSize(1 * 1024 * 1024); // 1MB

  memstore = new MemStoreWrapper(context, fs, table, family, new MemStore(), cacheConfig);

  // The start time of the sweep tool.
  // Only mob files created before (startTime - oneDay) are handled by the reducer,
  // since handling the latest mob files could introduce inconsistency.
  this.compactionBegin = conf.getLong(MobConstants.MOB_COMPACTION_START_DATE, 0);
}
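The block-cache step above copies the job configuration before shrinking the cache so the change stays local to this tool. A minimal sketch of that step follows, assuming hadoop-common is on the classpath and that "hfile.block.cache.size" is the key behind HConstants.HFILE_BLOCK_CACHE_SIZE_KEY; note that setFloat is what actually applies the new value, whereas the getFloat call in the original snippet only reads it.

import org.apache.hadoop.conf.Configuration;

// Minimal sketch of the "copy the conf, then shrink the block cache" step.
// Assumes hadoop-common on the classpath; "hfile.block.cache.size" is assumed
// to be the value of HConstants.HFILE_BLOCK_CACHE_SIZE_KEY.
public class BlockCacheConfExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Copy so the change stays local and does not leak into the shared job conf.
    Configuration copyOfConf = new Configuration(conf);

    // setFloat changes the setting; getFloat would only read it and leave
    // the cache size untouched.
    copyOfConf.setFloat("hfile.block.cache.size", 0.00001f);

    System.out.println("block cache size = "
        + copyOfConf.getFloat("hfile.block.cache.size", 0.4f));
  }
}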
/**
 * Do the cleanup if there is a current block.
 *
 * @throws IOException
 */
private void finishBlock() throws IOException {
  if (this.out == null) return;
  long startTimeNs = System.nanoTime();

  int size = releaseCompressingStream(this.out);
  this.out = null;
  blockKeys.add(firstKeyInBlock);
  blockOffsets.add(Long.valueOf(blockBegin));
  blockDataSizes.add(Integer.valueOf(size));
  this.totalUncompressedBytes += size;

  HFile.offerWriteLatency(System.nanoTime() - startTimeNs);

  if (cacheConf.shouldCacheDataOnWrite()) {
    baosDos.flush();
    // we do not do data block encoding on disk for HFile v1
    byte[] bytes = baos.toByteArray();
    HFileBlock block = new HFileBlock(BlockType.DATA,
        (int) (outputStream.getPos() - blockBegin), bytes.length, -1,
        ByteBuffer.wrap(bytes, 0, bytes.length), HFileBlock.FILL_HEADER,
        blockBegin, MemStore.NO_PERSISTENT_TS,
        HFileBlock.MINOR_VERSION_NO_CHECKSUM,   // minor version
        0,                                      // bytesPerChecksum
        ChecksumType.NULL.getCode(),            // checksum type
        (int) (outputStream.getPos() - blockBegin) +
            HFileBlock.HEADER_SIZE_NO_CHECKSUM); // onDiskDataSizeWithHeader

    block = blockEncoder.diskToCacheFormat(block, false);
    cacheConf.getBlockCache().cacheBlock(
        new BlockCacheKey(name, blockBegin, DataBlockEncoding.NONE,
            block.getBlockType()), block);
    baosDos.close();
  }
}
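In the cache-on-write branch above, the cached block is keyed by the file name and the block's starting offset (BlockCacheKey(name, blockBegin, ...)). A hypothetical, JDK-only analogue of that keying scheme is sketched below; SimpleBlockCache and BlockKey are invented names, and the byte[] value stands in for the real HFileBlock.

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical illustration of caching a block under a (file name, offset) key,
// mirroring how finishBlock() builds a BlockCacheKey from 'name' and 'blockBegin'.
public class SimpleBlockCache {

  /** Cache key: which file the block belongs to and where it starts in that file. */
  static final class BlockKey {
    final String fileName;
    final long offset;

    BlockKey(String fileName, long offset) {
      this.fileName = fileName;
      this.offset = offset;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof BlockKey)) {
        return false;
      }
      BlockKey other = (BlockKey) o;
      return offset == other.offset && fileName.equals(other.fileName);
    }

    @Override
    public int hashCode() {
      return Objects.hash(fileName, offset);
    }
  }

  private final Map<BlockKey, byte[]> cache = new HashMap<>();

  public void cacheBlock(String fileName, long offset, byte[] blockBytes) {
    cache.put(new BlockKey(fileName, offset), blockBytes);
  }

  public byte[] getBlock(String fileName, long offset) {
    return cache.get(new BlockKey(fileName, offset));
  }

  public static void main(String[] args) {
    SimpleBlockCache cache = new SimpleBlockCache();
    cache.cacheBlock("hfile-0001", 0L, new byte[] { 1, 2, 3 });
    System.out.println("cached bytes: " + cache.getBlock("hfile-0001", 0L).length);
  }
}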
/**
 * Read a version 1 block. There is no uncompressed header, and the block type (the magic
 * record) is part of the compressed data. This implementation assumes that the bounded-range
 * file input stream is needed to stop the decompressor from reading into the next block,
 * because the decompressor just grabs a bunch of data without regard to whether it is coming
 * to the end of the compressed section. The block returned is still a version 2 block, and in
 * particular, its first {@link #HEADER_SIZE_WITH_CHECKSUMS} bytes contain a valid version 2
 * header.
 *
 * @param offset the offset of the block to read in the file
 * @param onDiskSizeWithMagic the on-disk size of the version 1 block, including the magic
 *          record, which is part of the compressed data if compression is used
 * @param uncompressedSizeWithMagic uncompressed size of the version 1 block, including the
 *          magic record
 * @param pread whether to use positional read instead of seek+read
 */
@Override
public HFileBlock readBlockData(long offset, long onDiskSizeWithMagic,
    int uncompressedSizeWithMagic, boolean pread) throws IOException {
  if (uncompressedSizeWithMagic <= 0) {
    throw new IOException("Invalid uncompressedSize=" + uncompressedSizeWithMagic
        + " for a version 1 block");
  }

  if (onDiskSizeWithMagic <= 0 || onDiskSizeWithMagic >= Integer.MAX_VALUE) {
    throw new IOException("Invalid onDiskSize=" + onDiskSizeWithMagic
        + " (maximum allowed: " + Integer.MAX_VALUE + ")");
  }

  int onDiskSize = (int) onDiskSizeWithMagic;

  if (uncompressedSizeWithMagic < MAGIC_LENGTH) {
    throw new IOException("Uncompressed size for a version 1 block is "
        + uncompressedSizeWithMagic + " but must be at least " + MAGIC_LENGTH);
  }

  // The existing size already includes magic size, and we are inserting
  // a version 2 header.
  ByteBuffer buf = ByteBuffer.allocate(uncompressedSizeWithMagic + HEADER_DELTA);

  int onDiskSizeWithoutHeader;
  if (compressAlgo == Compression.Algorithm.NONE) {
    // A special case when there is no compression.
    if (onDiskSize != uncompressedSizeWithMagic) {
      throw new IOException("onDiskSize=" + onDiskSize + " and uncompressedSize="
          + uncompressedSizeWithMagic + " must be equal for version 1 with no compression");
    }

    // The first MAGIC_LENGTH bytes of what this will read will be overwritten.
    readAtOffset(istream, buf.array(), buf.arrayOffset() + HEADER_DELTA, onDiskSize, false,
        offset, pread);

    onDiskSizeWithoutHeader = uncompressedSizeWithMagic - MAGIC_LENGTH;
  } else {
    InputStream bufferedBoundedStream = createBufferedBoundedStream(offset, onDiskSize, pread);
    decompress(buf.array(), buf.arrayOffset() + HEADER_DELTA, bufferedBoundedStream,
        uncompressedSizeWithMagic);

    // We don't really have a good way to exclude the "magic record" size
    // from the compressed block's size, since it is compressed as well.
    onDiskSizeWithoutHeader = onDiskSize;
  }

  BlockType newBlockType = BlockType.parse(buf.array(), buf.arrayOffset() + HEADER_DELTA,
      MAGIC_LENGTH);

  // We set the uncompressed size of the new HFile block we are creating
  // to the size of the data portion of the block without the magic record,
  // since the magic record gets moved to the header.
  HFileBlock b = new HFileBlock(newBlockType, onDiskSizeWithoutHeader,
      uncompressedSizeWithMagic - MAGIC_LENGTH, -1L, buf, FILL_HEADER, offset,
      MemStore.NO_PERSISTENT_TS, 0, 0, ChecksumType.NULL.getCode(),
      onDiskSizeWithoutHeader + HEADER_SIZE_NO_CHECKSUM);
  return b;
}
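The central trick above is the buffer layout: the version 1 payload, magic record included, is copied HEADER_DELTA bytes into the destination buffer so that a full version 2 header written at the front ends exactly where the old magic record ends. The sketch below illustrates that offset arithmetic using only java.nio; the 8-byte "DATABLK*" magic and the 24-byte header size are illustrative assumptions, and HeaderDeltaExample is not the HFileBlock implementation.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustration of the version-1 -> version-2 buffer layout used above.
// The v1 payload starts with a magic record; the v2 header is larger, so the
// payload is copied HEADER_DELTA bytes in, letting the new header region
// [0, V2_HEADER_SIZE) end exactly where the old magic record ends.
// The sizes used here are illustrative assumptions.
public class HeaderDeltaExample {
  static final int MAGIC_LENGTH = 8;     // assumed length of the v1 magic record
  static final int V2_HEADER_SIZE = 24;  // assumed v2 header size (no checksums)
  static final int HEADER_DELTA = V2_HEADER_SIZE - MAGIC_LENGTH;

  public static void main(String[] args) {
    // A fake uncompressed v1 block: magic record followed by the data portion.
    byte[] v1Block = ("DATABLK*" + "some uncompressed key/values")
        .getBytes(StandardCharsets.US_ASCII);
    int uncompressedSizeWithMagic = v1Block.length;

    // Destination buffer: the existing size already includes the magic record,
    // and we reserve HEADER_DELTA extra bytes for the larger v2 header.
    ByteBuffer buf = ByteBuffer.allocate(uncompressedSizeWithMagic + HEADER_DELTA);

    // Place the v1 payload HEADER_DELTA bytes into the buffer.
    System.arraycopy(v1Block, 0, buf.array(), buf.arrayOffset() + HEADER_DELTA,
        uncompressedSizeWithMagic);

    // The magic record now occupies bytes [HEADER_DELTA, HEADER_DELTA + MAGIC_LENGTH),
    // i.e. the tail of the region [0, V2_HEADER_SIZE) that a v2 header would overwrite.
    String magic = new String(buf.array(), buf.arrayOffset() + HEADER_DELTA, MAGIC_LENGTH,
        StandardCharsets.US_ASCII);
    System.out.println("magic record parsed from buffer: " + magic);
    System.out.println("data portion size without magic: "
        + (uncompressedSizeWithMagic - MAGIC_LENGTH));
  }
}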