Param<?,?>[] getAuthParameters(final HttpOpParam.Op op) throws IOException { List<Param<?,?>> authParams = Lists.newArrayList(); // Skip adding delegation token for token operations because these // operations require authentication. Token<?> token = null; if (!op.getRequireAuth()) { token = getDelegationToken(); } if (token != null) { authParams.add(new DelegationParam(token.encodeToUrlString())); } else { UserGroupInformation userUgi = ugi; UserGroupInformation realUgi = userUgi.getRealUser(); if (realUgi != null) { // proxy user authParams.add(new DoAsParam(userUgi.getShortUserName())); userUgi = realUgi; } authParams.add(new UserParam(userUgi.getShortUserName())); } return authParams.toArray(new Param<?,?>[0]); }
@Override public Map<String, byte[]> getXAttrs(Path p, final List<String> names) throws IOException { Preconditions.checkArgument(names != null && !names.isEmpty(), "XAttr names cannot be null or empty."); Param<?,?>[] parameters = new Param<?,?>[names.size() + 1]; for (int i = 0; i < parameters.length - 1; i++) { parameters[i] = new XAttrNameParam(names.get(i)); } parameters[parameters.length - 1] = new XAttrEncodingParam(XAttrCodec.HEX); final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS; return new FsPathResponseRunner<Map<String, byte[]>>(op, parameters, p) { @Override Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException { return JsonUtil.toXAttrs(json); } }.run(); }
@Override public FileStatus[] listStatus(final Path f) throws IOException { statistics.incrementReadOps(1); final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS; return new FsPathResponseRunner<FileStatus[]>(op, f) { @Override FileStatus[] decodeResponse(Map<?,?> json) { final Map<?, ?> rootmap = (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es"); final List<?> array = JsonUtil.getList( rootmap, FileStatus.class.getSimpleName()); //convert FileStatus final FileStatus[] statuses = new FileStatus[array.size()]; int i = 0; for (Object object : array) { final Map<?, ?> m = (Map<?, ?>) object; statuses[i++] = makeQualified(JsonUtil.toFileStatus(m, false), f); } return statuses; } }.run(); }
@Override public Token<DelegationTokenIdentifier> getDelegationToken( final String renewer) throws IOException { final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN; Token<DelegationTokenIdentifier> token = new FsPathResponseRunner<Token<DelegationTokenIdentifier>>( op, null, new RenewerParam(renewer)) { @Override Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json) throws IOException { return JsonUtil.toDelegationToken(json); } }.run(); if (token != null) { token.setService(tokenServiceName); } else { if (disallowFallbackToInsecureCluster) { throw new AccessControlException(CANT_FALLBACK_TO_INSECURE_MSG); } } return token; }
@Override public Path getHomeDirectory() { if (cachedHomeDirectory == null) { final HttpOpParam.Op op = GetOpParam.Op.GETHOMEDIRECTORY; try { String pathFromDelegatedFS = new FsPathResponseRunner<String>(op, null, new UserParam(ugi)) { @Override String decodeResponse(Map<?, ?> json) throws IOException { return JsonUtilClient.getPath(json); } } .run(); cachedHomeDirectory = new Path(pathFromDelegatedFS).makeQualified( this.getUri(), null); } catch (IOException e) { LOG.error("Unable to get HomeDirectory from original File System", e); cachedHomeDirectory = new Path("/user/" + ugi.getShortUserName()) .makeQualified(this.getUri(), null); } } return cachedHomeDirectory; }
@Override public Map<String, byte[]> getXAttrs(Path p, final List<String> names) throws IOException { Preconditions.checkArgument(names != null && !names.isEmpty(), "XAttr names cannot be null or empty."); Param<?,?>[] parameters = new Param<?,?>[names.size() + 1]; for (int i = 0; i < parameters.length - 1; i++) { parameters[i] = new XAttrNameParam(names.get(i)); } parameters[parameters.length - 1] = new XAttrEncodingParam(XAttrCodec.HEX); final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS; return new FsPathResponseRunner<Map<String, byte[]>>(op, parameters, p) { @Override Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException { return JsonUtilClient.toXAttrs(json); } }.run(); }
@Override public FSDataOutputStream createNonRecursive(final Path f, final FsPermission permission, final EnumSet<CreateFlag> flag, final int bufferSize, final short replication, final long blockSize, final Progressable progress) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.CREATE; return new FsPathOutputStreamRunner(op, f, bufferSize, new PermissionParam(applyUMask(permission)), new CreateFlagParam(flag), new CreateParentParam(false), new BufferSizeParam(bufferSize), new ReplicationParam(replication), new BlockSizeParam(blockSize) ).run(); }
@Override public Token<DelegationTokenIdentifier> getDelegationToken( final String renewer) throws IOException { final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN; Token<DelegationTokenIdentifier> token = new FsPathResponseRunner<Token<DelegationTokenIdentifier>>( op, null, new RenewerParam(renewer)) { @Override Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json) throws IOException { return JsonUtilClient.toDelegationToken(json); } }.run(); if (token != null) { token.setService(tokenServiceName); } else { if (disallowFallbackToInsecureCluster) { throw new AccessControlException(CANT_FALLBACK_TO_INSECURE_MSG); } } return token; }
URL toUrl(final HttpOpParam.Op op, final Path fspath, final Param<?,?>... parameters) throws IOException { //initialize URI path and query final String path = PATH_PREFIX + (fspath == null? "/": makeQualified(fspath).toUri().getRawPath()); final String query = op.toQueryString() + Param.toSortedString("&", getAuthParameters(op)) + Param.toSortedString("&", parameters); final URL url = getNamenodeURL(path, query); if (LOG.isTraceEnabled()) { LOG.trace("url=" + url); } return url; }
private HttpURLConnection connect(final HttpOpParam.Op op, final URL url) throws IOException { final HttpURLConnection conn = (HttpURLConnection)connectionFactory.openConnection(url); final boolean doOutput = op.getDoOutput(); conn.setRequestMethod(op.getType().toString()); conn.setInstanceFollowRedirects(false); switch (op.getType()) { // if not sending a message body for a POST or PUT operation, need // to ensure the server/proxy knows this case POST: case PUT: { conn.setDoOutput(true); if (!doOutput) { // explicitly setting content-length to 0 won't do spnego!! // opening and closing the stream will send "Content-Length: 0" conn.getOutputStream().close(); } else { conn.setRequestProperty("Content-Type", MediaType.APPLICATION_OCTET_STREAM); conn.setChunkedStreamingMode(32 << 10); //32kB-chunk } break; } default: { conn.setDoOutput(doOutput); break; } } conn.connect(); return conn; }
private HdfsFileStatus getHdfsFileStatus(Path f) throws IOException { final HttpOpParam.Op op = GetOpParam.Op.GETFILESTATUS; HdfsFileStatus status = new FsPathResponseRunner<HdfsFileStatus>(op, f) { @Override HdfsFileStatus decodeResponse(Map<?,?> json) { return JsonUtil.toFileStatus(json, true); } }.run(); if (status == null) { throw new FileNotFoundException("File does not exist: " + f); } return status; }
@Override public AclStatus getAclStatus(Path f) throws IOException { final HttpOpParam.Op op = GetOpParam.Op.GETACLSTATUS; AclStatus status = new FsPathResponseRunner<AclStatus>(op, f) { @Override AclStatus decodeResponse(Map<?,?> json) { return JsonUtil.toAclStatus(json); } }.run(); if (status == null) { throw new FileNotFoundException("File does not exist: " + f); } return status; }
@Override public boolean mkdirs(Path f, FsPermission permission) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.MKDIRS; return new FsPathBooleanRunner(op, f, new PermissionParam(applyUMask(permission)) ).run(); }
/** * Create a symlink pointing to the destination path. * @see org.apache.hadoop.fs.Hdfs#createSymlink(Path, Path, boolean) */ public void createSymlink(Path destination, Path f, boolean createParent ) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.CREATESYMLINK; new FsPathRunner(op, f, new DestinationParam(makeQualified(destination).toUri().getPath()), new CreateParentParam(createParent) ).run(); }
@Override public boolean rename(final Path src, final Path dst) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.RENAME; return new FsPathBooleanRunner(op, src, new DestinationParam(makeQualified(dst).toUri().getPath()) ).run(); }
@SuppressWarnings("deprecation") @Override public void rename(final Path src, final Path dst, final Options.Rename... options) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.RENAME; new FsPathRunner(op, src, new DestinationParam(makeQualified(dst).toUri().getPath()), new RenameOptionSetParam(options) ).run(); }
@Override public void setXAttr(Path p, String name, byte[] value, EnumSet<XAttrSetFlag> flag) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.SETXATTR; if (value != null) { new FsPathRunner(op, p, new XAttrNameParam(name), new XAttrValueParam( XAttrCodec.encodeValue(value, XAttrCodec.HEX)), new XAttrSetFlagParam(flag)).run(); } else { new FsPathRunner(op, p, new XAttrNameParam(name), new XAttrSetFlagParam(flag)).run(); } }
@Override public byte[] getXAttr(Path p, final String name) throws IOException { final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS; return new FsPathResponseRunner<byte[]>(op, p, new XAttrNameParam(name), new XAttrEncodingParam(XAttrCodec.HEX)) { @Override byte[] decodeResponse(Map<?, ?> json) throws IOException { return JsonUtil.getXAttr(json, name); } }.run(); }
@Override public Map<String, byte[]> getXAttrs(Path p) throws IOException { final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS; return new FsPathResponseRunner<Map<String, byte[]>>(op, p, new XAttrEncodingParam(XAttrCodec.HEX)) { @Override Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException { return JsonUtil.toXAttrs(json); } }.run(); }
@Override public List<String> listXAttrs(Path p) throws IOException { final HttpOpParam.Op op = GetOpParam.Op.LISTXATTRS; return new FsPathResponseRunner<List<String>>(op, p) { @Override List<String> decodeResponse(Map<?, ?> json) throws IOException { return JsonUtil.toXAttrNames(json); } }.run(); }
@Override public void setOwner(final Path p, final String owner, final String group ) throws IOException { if (owner == null && group == null) { throw new IOException("owner == null && group == null"); } statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.SETOWNER; new FsPathRunner(op, p, new OwnerParam(owner), new GroupParam(group) ).run(); }
@Override public void setPermission(final Path p, final FsPermission permission ) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.SETPERMISSION; new FsPathRunner(op, p,new PermissionParam(permission)).run(); }
@Override public void modifyAclEntries(Path path, List<AclEntry> aclSpec) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.MODIFYACLENTRIES; new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run(); }
@Override public void removeAclEntries(Path path, List<AclEntry> aclSpec) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.REMOVEACLENTRIES; new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run(); }
@Override public void setAcl(final Path p, final List<AclEntry> aclSpec) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.SETACL; new FsPathRunner(op, p, new AclPermissionParam(aclSpec)).run(); }
@Override public Path createSnapshot(final Path path, final String snapshotName) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.CREATESNAPSHOT; Path spath = new FsPathResponseRunner<Path>(op, path, new SnapshotNameParam(snapshotName)) { @Override Path decodeResponse(Map<?,?> json) { return new Path((String) json.get(Path.class.getSimpleName())); } }.run(); return spath; }
@Override public void deleteSnapshot(final Path path, final String snapshotName) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = DeleteOpParam.Op.DELETESNAPSHOT; new FsPathRunner(op, path, new SnapshotNameParam(snapshotName)).run(); }
@Override public void renameSnapshot(final Path path, final String snapshotOldName, final String snapshotNewName) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.RENAMESNAPSHOT; new FsPathRunner(op, path, new OldSnapshotNameParam(snapshotOldName), new SnapshotNameParam(snapshotNewName)).run(); }
@Override public boolean setReplication(final Path p, final short replication ) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.SETREPLICATION; return new FsPathBooleanRunner(op, p, new ReplicationParam(replication) ).run(); }
@Override public void setTimes(final Path p, final long mtime, final long atime ) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.SETTIMES; new FsPathRunner(op, p, new ModificationTimeParam(mtime), new AccessTimeParam(atime) ).run(); }
@Override public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication, final long blockSize, final Progressable progress) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.CREATE; return new FsPathOutputStreamRunner(op, f, bufferSize, new PermissionParam(applyUMask(permission)), new OverwriteParam(overwrite), new BufferSizeParam(bufferSize), new ReplicationParam(replication), new BlockSizeParam(blockSize) ).run(); }
@Override public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PostOpParam.Op.APPEND; return new FsPathOutputStreamRunner(op, f, bufferSize, new BufferSizeParam(bufferSize) ).run(); }
@Override public boolean truncate(Path f, long newLength) throws IOException { statistics.incrementWriteOps(1); final HttpOpParam.Op op = PostOpParam.Op.TRUNCATE; return new FsPathBooleanRunner(op, f, new NewLengthParam(newLength)).run(); }
@Override public boolean delete(Path f, boolean recursive) throws IOException { final HttpOpParam.Op op = DeleteOpParam.Op.DELETE; return new FsPathBooleanRunner(op, f, new RecursiveParam(recursive) ).run(); }
@Override public FSDataInputStream open(final Path f, final int buffersize ) throws IOException { statistics.incrementReadOps(1); final HttpOpParam.Op op = GetOpParam.Op.OPEN; // use a runner so the open can recover from an invalid token FsPathConnectionRunner runner = new FsPathConnectionRunner(op, f, new BufferSizeParam(buffersize)); return new FSDataInputStream(new OffsetUrlInputStream( new UnresolvedUrlOpener(runner), new OffsetUrlOpener(null))); }
/** Setup offset url and connect. */ @Override protected HttpURLConnection connect(final long offset, final boolean resolved) throws IOException { final URL offsetUrl = offset == 0L? url : new URL(url + "&" + new OffsetParam(offset)); return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run(); }