protected QJournalProtocol createProxy() throws IOException { final Configuration confCopy = new Configuration(conf); // Need to set NODELAY or else batches larger than MTU can trigger // 40ms nagling delays. confCopy.setBoolean( CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY, true); RPC.setProtocolEngine(confCopy, QJournalProtocolPB.class, ProtobufRpcEngine.class); return SecurityUtil.doAsLoginUser( new PrivilegedExceptionAction<QJournalProtocol>() { @Override public QJournalProtocol run() throws IOException { RPC.setProtocolEngine(confCopy, QJournalProtocolPB.class, ProtobufRpcEngine.class); QJournalProtocolPB pbproxy = RPC.getProxy( QJournalProtocolPB.class, RPC.getProtocolVersion(QJournalProtocolPB.class), addr, confCopy); return new QJournalProtocolTranslatorPB(pbproxy); } }); }
JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException { this.jn = jn; Configuration confCopy = new Configuration(conf); // Ensure that nagling doesn't kick in, which could cause latency issues. confCopy.setBoolean( CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY, true); InetSocketAddress addr = getAddress(confCopy); RPC.setProtocolEngine(confCopy, QJournalProtocolPB.class, ProtobufRpcEngine.class); QJournalProtocolServerSideTranslatorPB translator = new QJournalProtocolServerSideTranslatorPB(this); BlockingService service = QJournalProtocolService .newReflectiveBlockingService(translator); this.server = new RPC.Builder(confCopy) .setProtocol(QJournalProtocolPB.class) .setInstance(service) .setBindAddress(addr.getHostName()) .setPort(addr.getPort()) .setNumHandlers(HANDLER_COUNT) .setVerbose(false) .build(); // set service-level authorization security policy if (confCopy.getBoolean( CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) { server.refreshServiceAcl(confCopy, new HDFSPolicyProvider()); } }
JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException { this.jn = jn; Configuration confCopy = new Configuration(conf); // Ensure that nagling doesn't kick in, which could cause latency issues. confCopy.setBoolean( CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY, true); InetSocketAddress addr = getAddress(confCopy); RPC.setProtocolEngine(confCopy, QJournalProtocolPB.class, ProtobufRpcEngine.class); QJournalProtocolServerSideTranslatorPB translator = new QJournalProtocolServerSideTranslatorPB(this); BlockingService service = QJournalProtocolService .newReflectiveBlockingService(translator); this.server = new RPC.Builder(confCopy) .setProtocol(QJournalProtocolPB.class) .setInstance(service) .setBindAddress(addr.getHostName()) .setPort(addr.getPort()) .setNumHandlers(HANDLER_COUNT) .setVerbose(false) .build(); // set service-level authorization security policy if (confCopy.getBoolean( CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) { server.refreshServiceAcl(confCopy, new HDFSPolicyProvider()); } this.server.setTracer(jn.tracer); }