/**
 * Captures everything needed to service one client connection.
 *
 * <p>When an event handler is supplied, a per-connection context is created
 * from the input and output protocols; otherwise the context stays
 * {@code null}. The new instance is marked idle.
 *
 * @param socket           the client socket
 * @param connectionNumber the number assigned to this connection
 * @param proc             the processor for this client
 * @param in               the input transport
 * @param out              the output transport
 * @param inp              the input protocol
 * @param outp             the output protocol
 * @param eventHandler     the server event handler, or {@code null} for none
 */
protected ClientProcessData(GfxdTSocket socket, int connectionNumber,
    TProcessor proc, TTransport in, TTransport out, TProtocol inp,
    TProtocol outp, TServerEventHandler eventHandler) {
  this.clientSocket = socket;
  this.connectionNumber = connectionNumber;
  this.processor = proc;
  this.inputTransport = in;
  this.outputTransport = out;
  this.inputProtocol = inp;
  this.outputProtocol = outp;
  this.eventHandler = eventHandler;
  // The context only exists when there is a handler to create it.
  this.connectionContext = (eventHandler == null)
      ? null
      : eventHandler.createContext(inp, outp);
  this.idle = true;
}
@Test public void testThriftServerConfiguration() { ThriftServerConfiguration thriftServerConfiguration = new ThriftServerConfiguration(); // 是否有默认值 assertNotNull(thriftServerConfiguration.getTransportFactory()); assertNotNull(thriftServerConfiguration.getProtocolFactory()); // 测试事件处理器配置 TServerEventHandler testServerEventHandler = new TestServerEventHandler(); thriftServerConfiguration.setServerEventHandler(testServerEventHandler); assertEquals(thriftServerConfiguration.getServerEventHandler(), testServerEventHandler); // 测试线程池配置 ExecutorService executorService = Executors.newSingleThreadExecutor(); thriftServerConfiguration.setExecutorService(executorService); assertEquals(thriftServerConfiguration.getExecutorService(), executorService); }
/**
 * Builds a {@code TServer} that exposes the given service implementation.
 *
 * <p>The server is bound to host {@code 0.0.0.0} on the given port. The
 * service is wrapped via {@code wrapService} and served through a
 * {@code TClusterService.Processor}.
 *
 * @param svIf         the service interface class
 * @param sv           the service implementation
 * @param port         the port to listen on
 * @param eventHandler the server event handler passed to the factory
 * @param poolSize     the server pool size passed to the factory
 * @param loaders      class loaders by name; an empty map is used when {@code null}
 * @param <I>          the service interface type
 * @return the server produced by the factory
 * @throws Throwable if wrapping the service or building the server fails
 */
public static <I> TServer exportTServer(
    Class<I> svIf,
    I sv,
    int port,
    TServerEventHandler eventHandler,
    int poolSize,
    Map<String, ClassLoader> loaders) throws Throwable {
  ThriftServerFactory factory = new ThriftServerFactory();
  factory.setServerHost("0.0.0.0");
  factory.setServerPort(port);

  // Fall back to an empty loader map when the caller supplies none.
  Map<String, ClassLoader> effectiveLoaders;
  if (loaders == null) {
    effectiveLoaders = new HashMap<>();
  } else {
    effectiveLoaders = loaders;
  }
  factory.setProcessor(
      new TClusterService.Processor<>(wrapService(svIf, sv, effectiveLoaders)));

  factory.setServerEventHandler(eventHandler);
  factory.setServerPoolSize(poolSize);
  factory.setNonTLSServerMaxFrameSize(THRIFT_MAX_FRAME_SIZE);
  return factory.build();
}
/**
 * Installs the given event handler on the underlying Thrift server.
 *
 * @param eventHandler the event handler to pass to the Thrift server
 * @throws IllegalStateException if the Thrift server reference is {@code null}
 */
public void setThriftEventHandler(TServerEventHandler eventHandler)
    throws IllegalStateException {
  if (thriftServer != null) {
    thriftServer.setServerEventHandler(eventHandler);
    return;
  }
  throw new IllegalStateException("Server is not initialized or stopped");
}
/**
 * {@inheritDoc}
 *
 * <p>This implementation creates a new {@code CatalogThriftEventHandler} on
 * every call.
 */
@Override
public TServerEventHandler getServerEventHandler() {
  final TServerEventHandler handler = new CatalogThriftEventHandler();
  return handler;
}
/** * Loops on processing a client forever */ @Override public void run() { TProcessor processor = null; TTransport inputTransport = null; TTransport outputTransport = null; TProtocol inputProtocol = null; TProtocol outputProtocol = null; final TServerEventHandler eventHandler = getEventHandler(); ServerContext connectionContext = null; final ConnectionListener listener = connListener; final TTransport client = this.client; Socket clientSocket = null; try { processor = processorFactory_.getProcessor(client); inputTransport = inputTransportFactory_.getTransport(client); outputTransport = outputTransportFactory_.getTransport(client); inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); if (eventHandler != null) { connectionContext = eventHandler.createContext(inputProtocol, outputProtocol); } // register with ConnectionListener if (listener != null) { if (client instanceof GfxdTSocket) { clientSocket = ((GfxdTSocket)client).getSocket(); } else if (client instanceof TSocket) { clientSocket = ((TSocket)client).getSocket(); } listener.connectionOpened(clientSocket, this.connectionNumber); } // we check stopped_ first to make sure we're not supposed to be // shutting down. this is necessary for graceful shutdown. 
while (true) { if (eventHandler != null) { eventHandler.processContext(connectionContext, inputTransport, outputTransport); } if (stopped || !processor.process(inputProtocol, outputProtocol)) { break; } } } catch (TTransportException tte) { // Assume the client died and continue silently } catch (TException te) { LOGGER.error("Thrift error occurred during processing of message.", te); } catch (Exception e) { LOGGER.error("Error occurred during processing of message.", e); } if (eventHandler != null) { eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol); } if (inputTransport != null) { inputTransport.close(); } if (outputTransport != null) { outputTransport.close(); } // deregister with ConnectionListener if (listener != null) { listener.connectionClosed(clientSocket, this.connectionNumber); } }
/**
 * Returns the server event handler currently held by this object.
 *
 * @return the stored server event handler
 */
public TServerEventHandler getServerEventHandler() {
  return this.serverEventHandler;
}
/**
 * Stores the given server event handler on this object.
 *
 * @param serverEventHandler the server event handler to store
 */
public void setServerEventHandler(TServerEventHandler serverEventHandler) { this.serverEventHandler = serverEventHandler; }
/**
 * Builds {@code serverConfig} from the given properties.
 *
 * <p>The {@code port} property is validated through
 * {@code ConfigUtils.assertEmpty} and parsed as an integer. Each of the
 * remaining properties ({@code group}, {@code version},
 * {@code MinWorkerThreads}, {@code MaxWorkerThreads}, {@code weight},
 * {@code status}, {@code directInvoke}, {@code daemonRun},
 * {@code serverEventHandler}) is applied only when it is non-empty.
 *
 * <p>If the {@code serverEventHandler} class cannot be loaded, is not a
 * {@code TServerEventHandler}, or cannot be instantiated, a warning with
 * the cause is logged and no handler is set by this method.
 *
 * @param properties the source properties
 * @throws NumberFormatException if a numeric property is not a valid integer
 */
private void buildServerConfig(Properties properties) {
  final String port = properties.getProperty("port");
  ConfigUtils.assertEmpty("port", port);
  serverConfig = new ThriftServerConfig(Integer.parseInt(port));

  final String group = properties.getProperty("group");
  if (StringUtils.isNoneEmpty(group)) {
    serverConfig.setGroup(group);
  }

  final String version = properties.getProperty("version");
  if (StringUtils.isNoneEmpty(version)) {
    serverConfig.setVersion(version);
  }

  final String minWorkerThreads = properties.getProperty("MinWorkerThreads");
  if (StringUtils.isNoneEmpty(minWorkerThreads)) {
    serverConfig.setMinWorkerThreads(Integer.parseInt(minWorkerThreads));
  }

  final String maxWorkerThreads = properties.getProperty("MaxWorkerThreads");
  if (StringUtils.isNoneEmpty(maxWorkerThreads)) {
    serverConfig.setMaxWorkerThreads(Integer.parseInt(maxWorkerThreads));
  }

  final String weight = properties.getProperty("weight");
  if (StringUtils.isNoneEmpty(weight)) {
    serverConfig.setWeight(Integer.parseInt(weight));
  }

  final String status = properties.getProperty("status");
  if (StringUtils.isNoneEmpty(status)) {
    serverConfig.setStatus(Boolean.parseBoolean(status));
  }

  final String directInvoke = properties.getProperty("directInvoke");
  if (StringUtils.isNoneEmpty(directInvoke)) {
    serverConfig.setDirectInvoke(Boolean.parseBoolean(directInvoke));
  }

  final String daemonRun = properties.getProperty("daemonRun");
  if (StringUtils.isNoneEmpty(daemonRun)) {
    serverConfig.setDaemonRun(Boolean.parseBoolean(daemonRun));
  }

  final String handlerClassName = properties.getProperty("serverEventHandler");
  if (StringUtils.isNoneEmpty(handlerClassName)) {
    try {
      // asSubclass rejects a class of the wrong type before it is
      // instantiated, replacing the former unchecked cast.
      Class<? extends TServerEventHandler> handlerClass =
          Class.forName(handlerClassName).asSubclass(TServerEventHandler.class);
      serverConfig.setServerEventHandler(
          handlerClass.getDeclaredConstructor().newInstance());
    } catch (Exception e) {
      // Report the real problem and keep the cause; the previous message
      // wrongly referred to the load-balance strategy.
      LOGGER.warn("config serverEventHandler class [" + handlerClassName
          + "] could not be loaded or instantiated; server event handler is left unchanged.", e);
    }
  }
}
/**
 * Sets the server-side event listener.
 *
 * @param serverEventHandler the server event handler to store on this object
 */
public void setServerEventHandler(TServerEventHandler serverEventHandler) { this.serverEventHandler = serverEventHandler; }
/**
 * Returns the event handler of the underlying Thrift server.
 *
 * @return the event handler reported by the Thrift server
 * @throws IllegalStateException if the Thrift server reference is {@code null}
 */
public TServerEventHandler getThriftEventHandler() throws IllegalStateException {
  if (thriftServer != null) {
    return thriftServer.getEventHandler();
  }
  throw new IllegalStateException("Server is not initialized or stopped");
}
/**
 * Returns the server event handler.
 *
 * <p>Abstract: each concrete subclass supplies the handler.
 *
 * @return server event handler
 */
public abstract TServerEventHandler getServerEventHandler();