/**
 * Starts the Thrift server, creating the underlying Netty transport on first use.
 *
 * <p>When no transport exists yet, a server definition is built from
 * {@code getServerPort()} and {@code getProcessor()}. The transport is created with the
 * custom Netty configuration and channel group when {@code nettyServerConfig} is set,
 * and with the definition alone otherwise. An already-created transport is reused.
 *
 * @throws STException declared by the signature; the visible body does not throw it
 *         directly
 */
@Override
public void start() throws STException {
    if (server == null) {
        final ThriftServerDef definition = new ThriftServerDefBuilder()
                .listen(getServerPort())
                .withProcessor(getProcessor())
                .build();

        if (nettyServerConfig != null) {
            server = new NettyServerTransport(definition, nettyServerConfig, channelGroup);
        } else {
            server = new NettyServerTransport(definition);
        }
    }

    server.start();
    LOG.debug("Server start .");
}
@Inject public NettyServerTransport(final ThriftServerDef def, final NettyServerConfig nettyServerConfig, final ChannelGroup allChannels, final boolean local) { this.def = def; this.nettyServerConfig = nettyServerConfig; this.requestedPort = def.getServerPort(); this.allChannels = allChannels; this.local = local; // connectionLimiter must be instantiated exactly once (and thus outside // the pipeline factory) final ConnectionLimiter connectionLimiter = new ConnectionLimiter(def.getMaxConnections()); this.channelStatistics = new ChannelStatistics(allChannels); this.pipelineFactory = new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline cp = Channels.pipeline(); TProtocolFactory inputProtocolFactory = def.getDuplexProtocolFactory().getInputProtocolFactory(); NiftySecurityHandlers securityHandlers = def.getSecurityFactory().getSecurityHandlers(def, nettyServerConfig); cp.addLast("connectionContext", new ConnectionContextHandler()); cp.addLast("connectionLimiter", connectionLimiter); cp.addLast(ChannelStatistics.NAME, channelStatistics); cp.addLast("encryptionHandler", securityHandlers.getEncryptionHandler()); cp.addLast("frameCodec", def.getThriftFrameCodecFactory().create(def.getMaxFrameSize(), inputProtocolFactory)); if (def.getClientIdleTimeout() != null) { // Add handlers to detect idle client connections and // disconnect them cp.addLast("idleTimeoutHandler", new IdleStateHandler(nettyServerConfig.getTimer(), def.getClientIdleTimeout().toMillis(), NO_WRITER_IDLE_TIMEOUT, NO_ALL_IDLE_TIMEOUT, TimeUnit.MILLISECONDS)); cp.addLast("idleDisconnectHandler", new IdleDisconnectHandler()); } cp.addLast("authHandler", securityHandlers.getAuthenticationHandler()); cp.addLast("dispatcher", new NiftyDispatcher(def, nettyServerConfig.getTimer())); cp.addLast("exceptionLogger", new NiftyExceptionLogger()); return cp; } }; }
public static void startServer() { // Create the handler //ThriftTestService.Iface serviceInterface = // MyService.Iface serviceInterface = new MyServiceHandler(); // Create the processor //TProcessor processor = new MyService.Processor<>(serviceInterface); // Create the processor //TProcessor processor = new ThriftTestService.Processor<>(new InMemoryScribe()); InMemoryScribe inMemoryScribe = new InMemoryScribeImpl(); TProtocolFactory protocolFactory = new TBinaryProtocol.Factory(); ThriftCodecManager thriftCodecManager = new ThriftCodecManager(); List list = new ArrayList<>(); list.add(inMemoryScribe); ThriftServiceProcessor processor = new ThriftServiceProcessor(thriftCodecManager, Arrays.<ThriftEventHandler>asList(), inMemoryScribe); // Build the server definition ThriftServerDef serverDef = new ThriftServerDefBuilder().withProcessor(processor) .build(); // Create the server transport final NettyServerTransport server = new NettyServerTransport(serverDef ); // Create netty boss and executor thread pools ExecutorService bossExecutor = Executors.newCachedThreadPool(); ExecutorService workerExecutor = Executors.newCachedThreadPool(); // Start the server //server.start(bossExecutor, workerExecutor); server.start(); // Arrange to stop the server at shutdown Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { try { server.stop(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }); }
public static void main(String[] args) { ThriftServiceProcessor processor = new ThriftServiceProcessor( new ThriftCodecManager(), ImmutableList.<ThriftEventHandler>of(), new ThirdPartyCollectionServiceImpl() ); // Build the server definition ThriftServerDef serverDef = new ThriftServerDefBuilder() .listen(8899) .withProcessor(processor) .build(); // Create the server transport final NettyServerTransport server = new NettyServerTransport(serverDef ); // Create netty boss and executor thread pools ExecutorService bossExecutor = Executors.newCachedThreadPool(); ExecutorService workerExecutor = Executors.newCachedThreadPool(); // Start the server //server.start(bossExecutor, workerExecutor); server.start(); // Arrange to stop the server at shutdown Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { try { server.stop(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }); /**ThreadPool taskWorkerExecutor = newFixedThreadPool(1); ThriftServerDef serverDef = ThriftServerDef.newBuilder() .listen(8899) .withProcessor(processor) .using(taskWorkerExecutor) .build(); bossExecutor = newCachedThreadPool(); ioWorkerExecutor = newCachedThreadPool(); NettyServerConfig serverConfig = NettyServerConfig.newBuilder() .setBossThreadExecutor(bossExecutor) .setWorkerThreadExecutor(ioWorkerExecutor) .build(); server = new ThriftServer(serverConfig, serverDef); server.start();**/ }
@Inject public ThriftServer(final NiftyProcessor processor, ThriftServerConfig config, @ThriftServerTimer Timer timer, Map<String, ThriftFrameCodecFactory> availableFrameCodecFactories, Map<String, TDuplexProtocolFactory> availableProtocolFactories, @ThriftServerWorkerExecutor Map<String, ExecutorService> availableWorkerExecutors, NiftySecurityFactoryHolder securityFactoryHolder, boolean local) { checkNotNull(availableFrameCodecFactories, "availableFrameCodecFactories cannot be null"); checkNotNull(availableProtocolFactories, "availableProtocolFactories cannot be null"); NiftyProcessorFactory processorFactory = new NiftyProcessorFactory() { @Override public NiftyProcessor getProcessor(TTransport transport) { return processor; } }; String transportName = config.getTransportName(); String protocolName = config.getProtocolName(); checkState(availableFrameCodecFactories.containsKey(transportName), "No available server transport named " + transportName); checkState(availableProtocolFactories.containsKey(protocolName), "No available server protocol named " + protocolName); workerExecutor = config.getOrBuildWorkerExecutor(availableWorkerExecutors); if (local) { log.warn("Using local server"); configuredPort = 0; ioThreads = 0; ioExecutor = null; acceptorThreads = 0; acceptorExecutor = null; serverChannelFactory = new DefaultLocalServerChannelFactory(); } else { configuredPort = config.getPort(); acceptorExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-acceptor-%s").build()); acceptorThreads = config.getAcceptorThreadCount(); ioExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-io-%s").build()); ioThreads = config.getIoThreadCount(); serverChannelFactory = new NioServerSocketChannelFactory(new NioServerBossPool(acceptorExecutor, acceptorThreads, ThreadNameDeterminer.CURRENT), new NioWorkerPool(ioExecutor, ioThreads, ThreadNameDeterminer.CURRENT)); } ThriftServerDef thriftServerDef = 
ThriftServerDef.newBuilder().name("thrift").listen(configuredPort) .limitFrameSizeTo((int) config.getMaxFrameSize().toBytes()).clientIdleTimeout(config.getIdleConnectionTimeout()) .withProcessorFactory(processorFactory).limitConnectionsTo(config.getConnectionLimit()) .limitQueuedResponsesPerConnection(config.getMaxQueuedResponsesPerConnection()) .thriftFrameCodecFactory(availableFrameCodecFactories.get(transportName)).protocol(availableProtocolFactories.get(protocolName)) .withSecurityFactory(securityFactoryHolder.niftySecurityFactory).using(workerExecutor).taskTimeout(config.getTaskExpirationTimeout()).build(); NettyServerConfigBuilder nettyServerConfigBuilder = NettyServerConfig.newBuilder(); nettyServerConfigBuilder.getServerSocketChannelConfig().setBacklog(config.getAcceptBacklog()); nettyServerConfigBuilder.setBossThreadCount(config.getAcceptorThreadCount()); nettyServerConfigBuilder.setWorkerThreadCount(config.getIoThreadCount()); nettyServerConfigBuilder.setTimer(timer); NettyServerConfig nettyServerConfig = nettyServerConfigBuilder.build(); transport = new NettyServerTransport(thriftServerDef, nettyServerConfig, allChannels, local); }
/**
 * Entry point: boots a Netty server on port 9090 that multiplexes the
 * {@code Calculator} and {@code Scribe} Thrift services, then blocks until the
 * server channel closes.
 *
 * <p>Both event loop groups are shut down gracefully on exit, including when
 * startup fails.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if binding or waiting on the server channel fails
 */
public static void main(String[] args) throws Exception {
    final int port = 9090;

    EventLoopGroup workerGroup = new NioEventLoopGroup();
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup);
        b.channel(NioServerSocketChannel.class);
        b.handler(new LoggingHandler(LogLevel.DEBUG));
        b.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                // A fresh multiplexed processor (and fresh handlers) is built per channel.
                TMultiplexedProcessor multiprocessor = new TMultiplexedProcessor();
                // NOTE(review): Calculator.Processor is used as a raw type; its generic
                // signature is not visible here - confirm and parameterize if applicable.
                multiprocessor.registerProcessor("Calculator",
                        new Calculator.Processor(new CalculatorHandler()));
                multiprocessor.registerProcessor("Scribe",
                        new scribe.Processor<scribe.Iface>(new scribe.Iface() {
                            @Override
                            public ResultCode Log(List<LogEntry> messages) throws TException {
                                for (LogEntry message : messages) {
                                    log.info("{}: {}", message.getCategory(), message.getMessage());
                                }
                                return ResultCode.OK;
                            }
                        }));

                ThriftServerDef def = new ThriftServerDefBuilder().withProcessor(multiprocessor).build();

                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast("frameDecoder",
                        new ThriftFrameDecoder(def.getMaxFrameSize(), def.getInProtocolFactory()));
                pipeline.addLast("dispatcher", new NiftyDispatcher(def));
            }
        });
        b.option(ChannelOption.SO_BACKLOG, 128);
        b.childOption(ChannelOption.SO_KEEPALIVE, true);
        log.debug("configuration serverBootstrap");

        // Announce the port at the first enabled level among info, warn and error.
        if (log.isInfoEnabled()) {
            log.info("Start server with port: {} ", port);
        } else if (log.isWarnEnabled()) {
            log.warn("Start server with port: {} ", port);
        } else if (log.isErrorEnabled()) {
            log.error("Start server with port: {} ", port);
        }

        // Bind, then block until the server channel is closed.
        b.bind(port).sync().channel().closeFuture().sync();
    } finally {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}