/** * Wraps the stream in a CryptoOutputStream if the underlying file is * encrypted. */ public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos, FileSystem.Statistics statistics, long startPos) throws IOException { final FileEncryptionInfo feInfo = dfsos.getFileEncryptionInfo(); if (feInfo != null) { // File is encrypted, wrap the stream in a crypto stream. // Currently only one version, so no special logic based on the version # getCryptoProtocolVersion(feInfo); final CryptoCodec codec = getCryptoCodec(conf, feInfo); KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo); final CryptoOutputStream cryptoOut = new CryptoOutputStream(dfsos, codec, decrypted.getMaterial(), feInfo.getIV(), startPos); return new HdfsDataOutputStream(cryptoOut, statistics, startPos); } else { // No FileEncryptionInfo present so no encryption. return new HdfsDataOutputStream(dfsos, statistics, startPos); } }
@Override void prepare() throws Exception { final Path filePath = new Path(file); DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0); // append to the file and leave the last block under construction out = this.client.append(file, BlockSize, EnumSet.of(CreateFlag.APPEND), null, null); byte[] appendContent = new byte[100]; new Random().nextBytes(appendContent); out.write(appendContent); ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); LocatedBlocks blks = dfs.getClient() .getLocatedBlocks(file, BlockSize + 1); assertEquals(1, blks.getLocatedBlocks().size()); nodes = blks.get(0).getLocations(); oldBlock = blks.get(0).getBlock(); LocatedBlock newLbk = client.getNamenode().updateBlockForPipeline( oldBlock, client.getClientName()); newBlock = new ExtendedBlock(oldBlock.getBlockPoolId(), oldBlock.getBlockId(), oldBlock.getNumBytes(), newLbk.getBlock().getGenerationStamp()); }
/**
 * Creates a file under {@code dir/foo}, appends to it without closing,
 * snapshots {@code dir}, deletes {@code foo}, and then runs the lease
 * manager's checks under the namesystem write lock. The lease periods are
 * shortened for the test and restored to the HdfsConstants defaults afterwards.
 */
@Test
public void testLease() throws Exception {
  try {
    NameNodeAdapter.setLeasePeriod(fsn, 100, 200);
    final Path foo = new Path(dir, "foo");
    final Path bar = new Path(foo, "bar");
    DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPLICATION, 0);
    HdfsDataOutputStream out = appendFileWithoutClosing(bar, 100);
    out.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
    SnapshotTestHelper.createSnapshot(hdfs, dir, "s0");

    hdfs.delete(foo, true);
    // Sleep well past the shortened lease periods configured above.
    Thread.sleep(1000);

    // Acquire the lock before entering the try block, so the finally clause
    // only ever releases a lock that was actually obtained.
    fsn.writeLock();
    try {
      NameNodeAdapter.getLeaseManager(fsn).runLeaseChecks();
    } finally {
      fsn.writeUnlock();
    }
  } finally {
    NameNodeAdapter.setLeasePeriod(fsn, HdfsConstants.LEASE_SOFTLIMIT_PERIOD,
        HdfsConstants.LEASE_HARDLIMIT_PERIOD);
  }
}
public void hsyncWithSizeUpdate() throws IOException { if (out != null) { if (out instanceof HdfsDataOutputStream) { try { ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); } catch (NoSuchMethodError e){ // We are probably working with an older version of hadoop jars which does not have the // hsync function with SyncFlag. Use the hsync version that does not update the size. out.hsync(); } } else { out.hsync(); } } }
@Override void prepare() throws Exception { final Path filePath = new Path(file); DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0); // append to the file and leave the last block under construction out = this.client.append(file, BlockSize, null, null); byte[] appendContent = new byte[100]; new Random().nextBytes(appendContent); out.write(appendContent); ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); LocatedBlocks blks = dfs.getClient() .getLocatedBlocks(file, BlockSize + 1); assertEquals(1, blks.getLocatedBlocks().size()); nodes = blks.get(0).getLocations(); oldBlock = blks.get(0).getBlock(); LocatedBlock newLbk = client.getNamenode().updateBlockForPipeline( oldBlock, client.getClientName()); newBlock = new ExtendedBlock(oldBlock.getBlockPoolId(), oldBlock.getBlockId(), oldBlock.getNumBytes(), newLbk.getBlock().getGenerationStamp()); }
/**
 * Verifies that no ADD metadata log entry is found for a file in a
 * meta-enabled directory while the file is still open, and that one is found
 * once the stream has been closed.
 */
@Test
public void testNoLogEntryBeforeClosing() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  final MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
  try {
    final DistributedFileSystem dfs = cluster.getFileSystem();
    final Path dataset =
        new Path(new Path(new Path("/projects"), "project"), "dataset");
    final Path file = new Path(dataset, "file");
    dfs.mkdirs(dataset, FsPermission.getDefault());
    dfs.setMetaEnabled(dataset, true);

    final HdfsDataOutputStream stream = TestFileCreation.create(dfs, file, 1);

    // Stream still open: the ADD entry must not be there yet.
    final boolean loggedWhileOpen = checkLog(
        TestUtil.getINodeId(cluster.getNameNode(), file),
        MetadataLogEntry.Operation.ADD);
    assertFalse(loggedWhileOpen);

    stream.close();

    // Stream closed: the ADD entry must now be present.
    final boolean loggedAfterClose = checkLog(
        TestUtil.getINodeId(cluster.getNameNode(), file),
        MetadataLogEntry.Operation.ADD);
    assertTrue(loggedAfterClose);
  } finally {
    // cluster is final and was built before the try block, so it is non-null.
    cluster.shutdown();
  }
}
/**
 * Initializes the write state kept for one open file.
 *
 * @param fos output stream the file's data is written to
 * @param latestAttr latest known attributes; its size seeds {@code nextOffset}
 * @param dumpFilePath path of the dump file, or {@code null} to disable
 *        dumping ({@code enabledDump} is set accordingly)
 */
OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
    String dumpFilePath) {
  this.fos = fos;
  this.latestAttr = latestAttr;
  // We use the ReverseComparatorOnMin as the comparator of the map. In this
  // way, we first dump the data with larger offset. In the meanwhile, we
  // retrieve the last element to write back to HDFS.
  pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>(
      OffsetRange.ReverseComparatorOnMin);
  updateLastAccessTime();
  activeState = true;
  asyncStatus = false;
  dumpOut = null;
  raf = null;
  nonSequentialWriteInMemory = new AtomicLong(0);
  this.dumpFilePath = dumpFilePath;
  enabledDump = dumpFilePath != null;
  nextOffset = new AtomicLong(latestAttr.getSize());
  try {
    // Sanity check (active only with -ea): the attribute size and the
    // stream position are expected to agree.
    assert (nextOffset.get() == this.fos.getPos());
  } catch (IOException ignored) {
    // The position is read only for the assertion above, so a failure to
    // obtain it is ignored rather than failing construction.
  }
  dumpThread = null;
}
/**
 * Writes every tuple of the batch to the current output file, rotating the
 * file whenever the rotation policy fires, and syncs the output at the end
 * of the batch unless the batch ended exactly on a rotation.
 *
 * <p>A sync is requested with {@code UPDATE_LENGTH} when the stream is an
 * {@link HdfsDataOutputStream}, and as a plain {@code hsync()} otherwise.
 *
 * @param tuples the batch to persist
 * @throws IOException if writing, rotating or syncing fails
 */
@Override
public void execute(List<TridentTuple> tuples) throws IOException {
  synchronized (this.writeLock) {
    // Tracks whether the current output file still needs a sync. It starts
    // as true so that an empty batch keeps syncing, as it always did.
    boolean needsSync = true;
    for (TridentTuple tuple : tuples) {
      byte[] bytes = this.format.format(tuple);
      out.write(bytes);
      this.offset += bytes.length;
      needsSync = true;
      if (this.rotationPolicy.mark(tuple, this.offset)) {
        rotateOutputFile();
        this.offset = 0;
        this.rotationPolicy.reset();
        // As before, a rotation stands in for the sync of everything
        // written so far. Tuples written after it set the flag again, so
        // data landing in the new file is no longer left unsynced.
        needsSync = false;
      }
    }
    if (needsSync) {
      if (this.out instanceof HdfsDataOutputStream) {
        ((HdfsDataOutputStream) this.out).hsync(
            EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
      } else {
        this.out.hsync();
      }
    }
  }
}
/**
 * Initializes the write state kept for one open file.
 *
 * @param fos output stream the file's data is written to
 * @param latestAttr latest known attributes; its size seeds {@code nextOffset}
 * @param dumpFilePath path of the dump file; dumping is enabled only when
 *        this is non-null
 * @param client DFS client stored on the context
 * @param iug id mapping provider stored on the context
 * @param aixCompatMode value stored as the context's AIX compatibility flag
 * @param config configuration from which the large-file-upload flag is read
 */
OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
    String dumpFilePath, DFSClient client, IdMappingServiceProvider iug,
    boolean aixCompatMode, NfsConfiguration config) {
  this.fos = fos;
  this.latestAttr = latestAttr;
  this.aixCompatMode = aixCompatMode;

  // pendingWrites is ordered by ReverseComparatorOnMin: data with the larger
  // offset is dumped first, while the last element is the one retrieved to
  // write back to HDFS.
  this.pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>(
      OffsetRange.ReverseComparatorOnMin);
  this.pendingCommits = new ConcurrentSkipListMap<Long, CommitCtx>();

  updateLastAccessTime();
  this.activeState = true;
  this.asyncStatus = false;
  this.asyncWriteBackStartOffset = 0;

  // Dump-related state: nothing is open yet.
  this.dumpOut = null;
  this.raf = null;
  this.nonSequentialWriteInMemory = new AtomicLong(0);
  this.dumpFilePath = dumpFilePath;
  this.enabledDump = dumpFilePath != null;

  this.nextOffset = new AtomicLong(latestAttr.getSize());
  try {
    assert (this.nextOffset.get() == this.fos.getPos());
  } catch (IOException ignored) {
    // getPos() is consulted only for the assertion; its failure is ignored.
  }

  this.dumpThread = null;
  this.client = client;
  this.iug = iug;
  this.uploadLargeFile = config.getBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD,
      NfsConfigKeys.LARGE_FILE_UPLOAD_DEFAULT);
}
public void writeData(HdfsDataOutputStream fos) throws IOException { Preconditions.checkState(fos != null); ByteBuffer dataBuffer; try { dataBuffer = getData(); } catch (Exception e1) { LOG.error("Failed to get request data offset:" + offset + " count:" + count + " error:" + e1); throw new IOException("Can't get WriteCtx.data"); } byte[] data = dataBuffer.array(); int position = dataBuffer.position(); int limit = dataBuffer.limit(); Preconditions.checkState(limit - position == count); // Modified write has a valid original count if (position != 0) { if (limit != getOriginalCount()) { throw new IOException("Modified write has differnt original size." + "buff position:" + position + " buff limit:" + limit + ". " + toString()); } } // Now write data fos.write(data, position, count); }
@Test public void testCheckCommitAixCompatMode() throws IOException { DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); NfsConfiguration conf = new NfsConfiguration(); conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false); // Enable AIX compatibility mode. OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, new ShellBasedIdMapping(new NfsConfiguration()), true, conf); // Test fall-through to pendingWrites check in the event that commitOffset // is greater than the number of bytes we've so far flushed. Mockito.when(fos.getPos()).thenReturn((long) 2); COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_FINISHED); // Test the case when we actually have received more bytes than we're trying // to commit. ctx.getPendingWritesForTest().put(new OffsetRange(0, 10), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); Mockito.when(fos.getPos()).thenReturn((long) 10); ctx.setNextOffsetForTest((long)10); status = ctx.checkCommitInternal(5, null, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); }
/**
 * Registers pending writes for OffsetRange(5, 10), (10, 15) and (20, 25) on
 * a fresh context and checks which (offset, count) pairs
 * {@code checkSequential} accepts.
 */
@Test
public void testCheckSequential() throws IOException {
  final DFSClient dfsClient = Mockito.mock(DFSClient.class);
  final Nfs3FileAttributes attr = new Nfs3FileAttributes();
  final HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
  Mockito.when(fos.getPos()).thenReturn(0L);

  final NfsConfiguration config = new NfsConfiguration();
  config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
  final OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath",
      dfsClient, new ShellBasedIdMapping(config), false, config);

  // Pending writes, inserted in ascending order; each gets its own WriteCtx.
  final int[][] pendingRanges = {{5, 10}, {10, 15}, {20, 25}};
  for (int[] range : pendingRanges) {
    ctx.getPendingWritesForTest().put(new OffsetRange(range[0], range[1]),
        new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
  }

  // Rejected.
  assertTrue(!ctx.checkSequential(5, 4));

  // Accepted.
  assertTrue(ctx.checkSequential(9, 5));
  assertTrue(ctx.checkSequential(10, 5));
  assertTrue(ctx.checkSequential(14, 5));

  // Rejected.
  assertTrue(!ctx.checkSequential(15, 5));
  assertTrue(!ctx.checkSequential(20, 5));
  assertTrue(!ctx.checkSequential(25, 5));
  assertTrue(!ctx.checkSequential(999, 5));
}
/**
 * Creates the file through {@code dfs.primitiveCreate} and returns the
 * resulting stream wrapped by {@code dfs.createWrappedOutputStream}, starting
 * at the stream's initial length.
 *
 * @throws IOException if creating or wrapping the stream fails
 */
@Override
public HdfsDataOutputStream createInternal(Path f,
    EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
    int bufferSize, short replication, long blockSize, Progressable progress,
    ChecksumOpt checksumOpt, boolean createParent) throws IOException {
  final String src = getUriPath(f);
  final DFSOutputStream rawStream = dfs.primitiveCreate(src,
      absolutePermission, createFlag, createParent, replication, blockSize,
      progress, bufferSize, checksumOpt);
  final long initialLength = rawStream.getInitialLen();
  return dfs.createWrappedOutputStream(rawStream, statistics, initialLength);
}
/**
 * Counts one write operation, creates the file through
 * {@code dfs.primitiveCreate} (always passing {@code true} for the
 * create-parent argument) and returns the wrapped output stream.
 *
 * @throws IOException if creating or wrapping the stream fails
 */
@Override
protected HdfsDataOutputStream primitiveCreate(Path f,
    FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
    short replication, long blockSize, Progressable progress,
    ChecksumOpt checksumOpt) throws IOException {
  statistics.incrementWriteOps(1);
  final String src = getPathName(fixRelativePart(f));
  final boolean createParent = true;
  final DFSOutputStream rawStream = dfs.primitiveCreate(src,
      absolutePermission, flag, createParent, replication, blockSize,
      progress, bufferSize, checksumOpt);
  return dfs.createWrappedOutputStream(rawStream, statistics);
}
/**
 * Creates a writer for {@code filepath}; the file is created immediately
 * with {@code REPLICATION} and its stream kept in {@code out}.
 *
 * @param fs file system on which the file is created
 * @param filepath file to create; also appended to the name passed to the
 *        superclass constructor
 * @param sleepms value stored in {@code sleepms} (presumably a pause in
 *        milliseconds; its use is outside this block)
 * @throws IOException if the file cannot be created
 */
SlowWriter(DistributedFileSystem fs, Path filepath, final long sleepms)
    throws IOException {
  super(SlowWriter.class.getSimpleName() + ":" + filepath);
  this.sleepms = sleepms;
  this.filepath = filepath;
  this.out = (HdfsDataOutputStream) fs.create(filepath, REPLICATION);
}
/** * Test if the quota can be correctly updated when file length is updated * through fsync */ @Test (timeout=60000) public void testUpdateQuotaForFSync() throws Exception { final Path foo = new Path("/foo"); final Path bar = new Path(foo, "bar"); DFSTestUtil.createFile(dfs, bar, BLOCKSIZE, REPLICATION, 0L); dfs.setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1); FSDataOutputStream out = dfs.append(bar); out.write(new byte[BLOCKSIZE / 4]); ((DFSOutputStream) out.getWrappedStream()).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); INodeDirectory fooNode = fsdir.getINode4Write(foo.toString()).asDirectory(); QuotaCounts quota = fooNode.getDirectoryWithQuotaFeature() .getSpaceConsumed(); long ns = quota.getNameSpace(); long ds = quota.getStorageSpace(); assertEquals(2, ns); // foo and bar assertEquals(BLOCKSIZE * 2 * REPLICATION, ds); // file is under construction out.write(new byte[BLOCKSIZE / 4]); out.close(); fooNode = fsdir.getINode4Write(foo.toString()).asDirectory(); quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed(); ns = quota.getNameSpace(); ds = quota.getStorageSpace(); assertEquals(2, ns); assertEquals((BLOCKSIZE + BLOCKSIZE / 2) * REPLICATION, ds); // append another block DFSTestUtil.appendFile(dfs, bar, BLOCKSIZE); quota = fooNode.getDirectoryWithQuotaFeature().getSpaceConsumed(); ns = quota.getNameSpace(); ds = quota.getStorageSpace(); assertEquals(2, ns); // foo and bar assertEquals((BLOCKSIZE * 2 + BLOCKSIZE / 2) * REPLICATION, ds); }
/**
 * Appends {@code length} random bytes to {@code file} and returns the
 * still-open stream; closing it is left to the caller.
 *
 * @param file the file to append to
 * @param length number of random bytes to append
 * @return the open append stream
 * @throws IOException if opening the stream or writing fails
 */
private HdfsDataOutputStream appendFileWithoutClosing(Path file, int length)
    throws IOException {
  final byte[] payload = new byte[length];
  new Random().nextBytes(payload);

  final HdfsDataOutputStream stream =
      (HdfsDataOutputStream) hdfs.append(file);
  stream.write(payload);
  return stream;
}
/** * Test the fsimage loading while there is file under construction. */ @Test (timeout=60000) public void testLoadImageWithAppending() throws Exception { Path sub1 = new Path(dir, "sub1"); Path sub1file1 = new Path(sub1, "sub1file1"); Path sub1file2 = new Path(sub1, "sub1file2"); DFSTestUtil.createFile(hdfs, sub1file1, BLOCKSIZE, REPLICATION, seed); DFSTestUtil.createFile(hdfs, sub1file2, BLOCKSIZE, REPLICATION, seed); hdfs.allowSnapshot(dir); hdfs.createSnapshot(dir, "s0"); HdfsDataOutputStream out = appendFileWithoutClosing(sub1file1, BLOCKSIZE); out.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); // save namespace and restart cluster hdfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); hdfs.saveNamespace(); hdfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE); cluster.shutdown(); cluster = new MiniDFSCluster.Builder(conf).format(false) .numDataNodes(REPLICATION).build(); cluster.waitActive(); fsn = cluster.getNamesystem(); hdfs = cluster.getFileSystem(); }
/**
 * Opens {@code file} for append, writes {@code length} random bytes and
 * hands the stream back without closing it.
 *
 * @param file the file to append to
 * @param length number of random bytes to write
 * @return the open append stream
 * @throws IOException if opening the stream or writing fails
 */
private HdfsDataOutputStream appendFileWithoutClosing(Path file, int length)
    throws IOException {
  final byte[] randomBytes = new byte[length];
  final Random rng = new Random();
  rng.nextBytes(randomBytes);

  final HdfsDataOutputStream appendStream =
      (HdfsDataOutputStream) hdfs.append(file);
  appendStream.write(randomBytes);
  return appendStream;
}
/**
 * Appends {@code appendLen} random bytes to the existing file and syncs with
 * a length update, leaving the stream in {@code out} open.
 */
@Override
void modify() throws Exception {
  assertTrue(fs.exists(file));

  final byte[] payload = new byte[appendLen];
  random.nextBytes(payload);

  // Assign the field first so the stream is reachable even if a later call fails.
  out = (HdfsDataOutputStream) fs.append(file);
  out.write(payload);
  out.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
}
public void execute(Tuple tuple) { try { byte[] bytes = this.format.format(tuple); synchronized (this.writeLock) { out.write(bytes); this.offset += bytes.length; if (this.syncPolicy.mark(tuple, this.offset)) { if (this.out instanceof HdfsDataOutputStream) { ((HdfsDataOutputStream) this.out).hsync(EnumSet .of(SyncFlag.UPDATE_LENGTH)); } else { this.out.hsync(); } this.syncPolicy.reset(); } } this.collector.ack(tuple); if (this.rotationPolicy.mark(tuple, this.offset)) { rotateOutputFile(); // synchronized this.offset = 0; this.rotationPolicy.reset(); } } catch (IOException e) { LOG.warn("write/sync failed.", e); this.collector.fail(tuple); } }
/** * This method gets the datanode replication count for the current WAL. * * If the pipeline isn't started yet or is empty, you will get the default * replication factor. Therefore, if this function returns 0, it means you * are not properly running with the HDFS-826 patch. * @throws InvocationTargetException * @throws IllegalAccessException * @throws IllegalArgumentException * * @throws Exception */ @VisibleForTesting int getLogReplication() { try { //in standalone mode, it will return 0 if (this.hdfs_out instanceof HdfsDataOutputStream) { return ((HdfsDataOutputStream) this.hdfs_out).getCurrentBlockReplication(); } } catch (IOException e) { LOG.info("", e); } return 0; }
OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr, String dumpFilePath, DFSClient client, IdMappingServiceProvider iug, boolean aixCompatMode, NfsConfiguration config) { this.fos = fos; this.latestAttr = latestAttr; this.aixCompatMode = aixCompatMode; // We use the ReverseComparatorOnMin as the comparator of the map. In this // way, we first dump the data with larger offset. In the meanwhile, we // retrieve the last element to write back to HDFS. pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>( OffsetRange.ReverseComparatorOnMin); pendingCommits = new ConcurrentSkipListMap<Long, CommitCtx>(); updateLastAccessTime(); activeState = true; asyncStatus = false; asyncWriteBackStartOffset = 0; dumpOut = null; raf = null; nonSequentialWriteInMemory = new AtomicLong(0); this.dumpFilePath = dumpFilePath; enabledDump = dumpFilePath != null; nextOffset = new AtomicLong(); nextOffset.set(latestAttr.getSize()); assert(nextOffset.get() == this.fos.getPos()); dumpThread = null; this.client = client; this.iug = iug; this.uploadLargeFile = config.getBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, NfsConfigKeys.LARGE_FILE_UPLOAD_DEFAULT); }
public void writeData(HdfsDataOutputStream fos) throws IOException { Preconditions.checkState(fos != null); ByteBuffer dataBuffer; try { dataBuffer = getData(); } catch (Exception e1) { LOG.error("Failed to get request data offset:" + getPlainOffset() + " " + "count:" + count + " error:" + e1); throw new IOException("Can't get WriteCtx.data"); } byte[] data = dataBuffer.array(); int position = dataBuffer.position(); int limit = dataBuffer.limit(); Preconditions.checkState(limit - position == count); // Modified write has a valid original count if (position != 0) { if (limit != getOriginalCount()) { throw new IOException("Modified write has differnt original size." + "buff position:" + position + " buff limit:" + limit + ". " + toString()); } } // Now write data fos.write(data, position, count); }