/**
 * Creates the {@code pool-server} bean: a {@link TThreadPoolServer} bound to {@code this.port()}
 * that speaks the binary protocol and runs requests on a dedicated worker pool.
 *
 * <p>Worker pool settings are read from the environment:
 * {@code rpc.server.min.worker.threads} (default 512),
 * {@code rpc.server.max.worker.threads} (default 65535) and
 * {@code rpc.server.thread.keep.alive.time} in seconds (default 600).
 *
 * @return the configured, not yet serving, server
 * @throws Exception if the server socket or the processor cannot be created
 */
@Bean(name = "pool-server")
public TServer poolServer() throws Exception {
    TServerTransport serverSocket = new TServerSocket(this.port());
    TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverSocket);
    serverArgs.transportFactory(new TTransportFactory());
    serverArgs.protocolFactory(new TBinaryProtocol.Factory());
    serverArgs.processor(this.processor());

    // Pool bounds and idle timeout come from configuration, with hard-coded fallbacks.
    int coreThreads = env.getProperty("rpc.server.min.worker.threads", Integer.class, 512);
    int maxThreads = env.getProperty("rpc.server.max.worker.threads", Integer.class, 65535);
    long keepAliveSeconds = env.getProperty("rpc.server.thread.keep.alive.time", Long.class, 600L);

    // SynchronousQueue: tasks are handed straight to a worker instead of being queued.
    ThreadPoolExecutor workers = new ThreadPoolExecutor(
            coreThreads, maxThreads, keepAliveSeconds, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
    serverArgs.executorService(workers);

    return new TThreadPoolServer(serverArgs);
}
/**
 * Sends {@code messages} to the scribe endpoint at {@code address} over a framed binary
 * connection and asserts that the server answers {@code ResultCode.OK}.
 *
 * @param address  host and port of the scribe server
 * @param messages log entries to send
 * @return always 1
 * @throws RuntimeException wrapping any {@link TException} raised by the call
 */
private static int logThrift(HostAndPort address, List<LogEntry> messages)
{
    try {
        TSocket connection = new TSocket(address.getHost(), address.getPort());
        connection.open();
        try {
            TFramedTransport framed = new TFramedTransport(connection);
            scribe.Client scribeClient = new scribe.Client(new TBinaryProtocol(framed));
            ResultCode outcome = scribeClient.Log(messages);
            assertEquals(outcome, ResultCode.OK);
        }
        finally {
            connection.close();
        }
    }
    catch (TException failure) {
        throw new RuntimeException(failure);
    }
    return 1;
}
/** * Creates a Model and a Thrift Bridge to this model. */ protected static EBMI createModelBridge(String host, String pythonExecutable, File opendaPythonPath, File modelPythonPath, String modelPythonModuleName, String modelPythonClassName, File modelRunDir) throws IOException { // start local server. int port = getFreePort(); if (host == null) { //localhost host = "127.0.0.1"; } Process process = startModelProcess(host, port, pythonExecutable, opendaPythonPath, modelPythonPath, modelPythonModuleName, modelPythonClassName, modelRunDir); // connect to server. TTransport transport = connectToCode(host, port, process); // create client. TProtocol protocol = new TBinaryProtocol(transport); BMIService.Client client = new BMIService.Client(protocol); return new ThriftBmiBridge(client, process, transport); }
/**
 * Starts a half-sync/half-async Thrift test server on {@code 0.0.0.0:port}.
 *
 * <p>The binary protocol is used when {@code protocol} equals
 * {@code ThriftRpcClient.BINARY_PROTOCOL}; any other value, including {@code null},
 * selects the compact protocol. The server is served on a background single-thread executor.
 *
 * @param handlerName name passed to {@code getHandler} to pick the request handler
 * @param port        TCP port to listen on
 * @param protocol    protocol name, may be {@code null}
 * @throws Exception if the server socket or handler cannot be created
 */
public ThriftTestingSource(String handlerName, int port, String protocol) throws Exception {
    TNonblockingServerTransport serverTransport =
            new TNonblockingServerSocket(new InetSocketAddress("0.0.0.0", port));
    ThriftSourceProtocol.Iface handler = getHandler(handlerName);

    // Compare by value: "==" on strings only matches the very same object, so an equal
    // but non-interned protocol name used to fall through to the compact protocol.
    TProtocolFactory transportProtocolFactory;
    if (ThriftRpcClient.BINARY_PROTOCOL.equals(protocol)) {
        transportProtocolFactory = new TBinaryProtocol.Factory();
    } else {
        transportProtocolFactory = new TCompactProtocol.Factory();
    }

    server = new THsHaServer(new THsHaServer.Args(serverTransport)
            .processor(new ThriftSourceProtocol.Processor<ThriftSourceProtocol.Iface>(handler))
            .protocolFactory(transportProtocolFactory));

    // serve() blocks, so run it off the constructing thread.
    Executors.newSingleThreadExecutor().submit(new Runnable() {
        @Override
        public void run() {
            server.serve();
        }
    });
}
/**
 * Picks the protocol factory matching the configured {@code protocol} field and logs the choice.
 *
 * @return a binary factory when {@code protocol} equals {@code BINARY_PROTOCOL},
 *         otherwise a compact factory
 */
private TProtocolFactory getProtocolFactory() {
    if (!protocol.equals(BINARY_PROTOCOL)) {
        logger.info("Using TCompactProtocol");
        return new TCompactProtocol.Factory();
    }
    logger.info("Using TBinaryProtocol");
    return new TBinaryProtocol.Factory();
}
/**
 * Builds the Scribe half-sync/half-async server on {@code port} and serves until it stops.
 * Any exception raised while building or serving is logged as a warning.
 */
public void run() {
    try {
        Scribe.Processor scribeProcessor = new Scribe.Processor(new Receiver());
        TNonblockingServerTransport listenSocket = new TNonblockingServerSocket(port);

        THsHaServer.Args serverArgs = new THsHaServer.Args(listenSocket);
        serverArgs.workerThreads(workers);
        serverArgs.processor(scribeProcessor);
        // The frame size limit and the server read buffer limit share the same setting.
        serverArgs.transportFactory(new TFramedTransport.Factory(maxReadBufferBytes));
        serverArgs.protocolFactory(new TBinaryProtocol.Factory(false, false));
        serverArgs.maxReadBufferBytes = maxReadBufferBytes;

        server = new THsHaServer(serverArgs);
        LOG.info("Starting Scribe Source on port " + port);
        server.serve();
    } catch (Exception failure) {
        LOG.warn("Scribe failed", failure);
    }
}
@Test public void testScribeMessage() throws Exception { TTransport transport = new TFramedTransport(new TSocket("localhost", port)); TProtocol protocol = new TBinaryProtocol(transport); Scribe.Client client = new Scribe.Client(protocol); transport.open(); LogEntry logEntry = new LogEntry("INFO", "Sending info msg to scribe source"); List<LogEntry> logEntries = new ArrayList<LogEntry>(1); logEntries.add(logEntry); client.Log(logEntries); // try to get it from Channels Transaction tx = memoryChannel.getTransaction(); tx.begin(); Event e = memoryChannel.take(); Assert.assertNotNull(e); Assert.assertEquals("Sending info msg to scribe source", new String(e.getBody())); tx.commit(); tx.close(); }
/** * @Title: startSchedulerThriftService * @Description: 开启scheduler 同步、异步调用服务 * @return void 返回类型 */ private static void startSchedulerThriftService() { LOG.info("start scheduler thrift service...."); new Thread() { @Override public void run(){ try { SchedulerServiceImpl schedulerServiceImpl = SpringUtil.getBean(SchedulerServiceImpl.class); TProcessor tprocessor = new SchedulerService.Processor<SchedulerService.Iface>(schedulerServiceImpl); TServerSocket serverTransport = new TServerSocket(PropertyLoader.THRIFT_SCHEDULER_PORT); TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(serverTransport); ttpsArgs.processor(tprocessor); ttpsArgs.protocolFactory(new TBinaryProtocol.Factory()); //线程池服务模型,使用标准的阻塞式IO,预先创建一组线程处理请求。 TServer server = new TThreadPoolServer(ttpsArgs); server.serve(); } catch (Exception e) { LOG.error("start scheduler thrift service error,msg:"+ExceptionUtil.getStackTraceAsString(e)); } } }.start(); LOG.info("start scheduler thrift server success!"); }
public void startServer() { try { logger.info("TSimpleServer start ...."); // TMultiplexedProcessor TMultiplexedProcessor processor = new TMultiplexedProcessor(); processor.registerProcessor("Algorithm", new AlgorithmService.Processor<>(new AlgorithmServiceImpl())); TServerSocket serverTransport = new TServerSocket(SERVER_PORT); TServer.Args args = new TServer.Args(serverTransport); args.processor(processor); args.protocolFactory(new TBinaryProtocol.Factory()); // args.protocolFactory(new TJSONProtocol.Factory()); TServer server = new TSimpleServer(args); server.serve(); } catch (Exception e) { logger.error("Server start error!!!"); e.printStackTrace(); } }
@Override public Pair<TTransport, Bmv2DeviceThriftClient> load(DeviceId deviceId) throws TTransportException { log.debug("Instantiating new client... > deviceId={}", deviceId); // Make the expensive call Bmv2Device device = Bmv2Device.of(deviceId); TTransport transport = new TSocket(device.thriftServerHost(), device.thriftServerPort()); TProtocol protocol = new TBinaryProtocol(transport); // Our BMv2 device implements multiple Thrift services, create a client for each one on the same transport. Standard.Client standardClient = new Standard.Client( new TMultiplexedProtocol(protocol, "standard")); SimpleSwitch.Client simpleSwitch = new SimpleSwitch.Client( new TMultiplexedProtocol(protocol, "simple_switch")); // Wrap clients so to automatically have synchronization and resiliency to connectivity errors Standard.Iface safeStandardClient = SafeThriftClient.wrap(standardClient, Standard.Iface.class, options); SimpleSwitch.Iface safeSimpleSwitchClient = SafeThriftClient.wrap(simpleSwitch, SimpleSwitch.Iface.class, options); Bmv2DeviceThriftClient client = new Bmv2DeviceThriftClient(deviceId, transport, safeStandardClient, safeSimpleSwitchClient); return Pair.of(transport, client); }
/**
 * Creates a Thrift half-sync/half-async server on {@code port} and blocks while serving.
 *
 * <p>The server uses framed transport and the binary protocol with strict read and write.
 * Failures are reported through the class logger together with their cause.
 *
 * @param processor the packet streamer processor that handles incoming calls
 */
public static void hshaServer(PacketStreamer.Processor<PacketStreamerHandler> processor) {
    try {
        TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(port);
        THsHaServer.Args args = new THsHaServer.Args(serverTransport);
        args.processor(processor);
        args.transportFactory(new TFramedTransport.Factory());
        args.protocolFactory(new TBinaryProtocol.Factory(true, true));
        TServer server = new THsHaServer(args);
        log.info("Starting the packetstreamer hsha server on port {} ...", port);
        server.serve();
    } catch (Exception e) {
        // Route the failure through the logger instead of dumping it on stderr.
        log.error("Failed to run the packetstreamer hsha server", e);
    }
}
/**
 * Starts one non-blocking Thrift server per entry of {@code ports}, each on its own thread.
 *
 * <p>{@code serve()} never returns while a server is healthy, so every port needs a
 * dedicated worker thread; the pool is therefore sized from {@code ports.length} rather
 * than a fixed count.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // newFixedThreadPool rejects 0, hence the lower bound of 1.
    ExecutorService es = Executors.newFixedThreadPool(Math.max(1, ports.length));
    for (int i = 0; i < ports.length; i++) {
        final int index = i;
        es.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    TNonblockingServerSocket socket = new TNonblockingServerSocket(ports[index]);
                    TestThriftJ.Processor processor = new TestThriftJ.Processor(new QueryImp());
                    TNonblockingServer.Args arg = new TNonblockingServer.Args(socket);
                    arg.protocolFactory(new TBinaryProtocol.Factory());
                    arg.transportFactory(new TFramedTransport.Factory());
                    arg.processorFactory(new TProcessorFactory(processor));
                    TServer server = new TNonblockingServer(arg);
                    logger.info("127.0.0.1:" + ports[index] + " start");
                    server.serve();
                } catch (Exception e) {
                    // Keep the cause so a failed bind can be diagnosed.
                    logger.error("127.0.0.1:" + ports[index] + " error", e);
                }
            }
        });
    }
    // No further tasks will be submitted; running servers are not interrupted by this.
    es.shutdown();
}
/**
 * Launches a non-blocking Thrift server for every configured port.
 *
 * <p>Each server blocks its thread inside {@code serve()}, so the executor gets one thread
 * per port; a fixed pool of two would leave any further port waiting in the queue forever.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // At least one thread: newFixedThreadPool throws on a size of 0.
    ExecutorService es = Executors.newFixedThreadPool(Math.max(1, ports.length));
    for (int i = 0; i < ports.length; i++) {
        final int index = i;
        es.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    TNonblockingServerSocket socket = new TNonblockingServerSocket(ports[index]);
                    TestThriftJ.Processor processor = new TestThriftJ.Processor(new QueryImp());
                    TNonblockingServer.Args arg = new TNonblockingServer.Args(socket);
                    arg.protocolFactory(new TBinaryProtocol.Factory());
                    arg.transportFactory(new TFramedTransport.Factory());
                    arg.processorFactory(new TProcessorFactory(processor));
                    TServer server = new TNonblockingServer(arg);
                    logger.info("127.0.0.1:" + ports[index] + " start");
                    server.serve();
                } catch (Exception e) {
                    // Log the exception itself, not just the address that failed.
                    logger.error("127.0.0.1:" + ports[index] + " error", e);
                }
            }
        });
    }
    // Stop accepting new tasks; servers that are already serving keep running.
    es.shutdown();
}
/**
 * Connects to a leaf RPC server over an unframed binary connection and calls
 * {@code getID("")} 2,000,000 times.
 *
 * @param ip      server address
 * @param port    server port
 * @param timeout timeout passed to the socket
 * @throws Exception if connecting or any call fails; the transport is closed either way
 */
public static void startClient(String ip, int port, int timeout) throws Exception {
    TTransport transport = new TSocket(ip, port, timeout);
    try {
        TProtocol protocol = new TBinaryProtocol(transport);
        leafrpc.Client client = new leafrpc.Client(protocol);
        transport.open();
        for (int i = 0; i < 2000000; i++) {
            client.getID("");
        }
    } finally {
        // Previously skipped when a call threw, leaking the socket.
        transport.close();
    }
}
public static void startClient2(String ip ,int port ,int timeout) throws Exception { TTransport transport = new TFramedTransport(new TSocket(ip,port,timeout)); TProtocol protocol = new TBinaryProtocol(transport); leafrpc.Client client = new leafrpc.Client(protocol); transport.open(); for(int i = 0; i< 1000000; i++) { client.getID(""); if (i % 100000 == 0) { System.out.println(Thread.currentThread().getName() + " " + client.getID("")); } //ai.incrementAndGet(); } transport.close(); }
public static void startRPCServer(leafServer leafserver , String ip , int port) throws Exception { ServerSocket serverSocket = new ServerSocket(port,10000, InetAddress.getByName(ip)); TServerSocket serverTransport = new TServerSocket(serverSocket); //设置协议工厂为TBinaryProtocolFactory Factory proFactory = new TBinaryProtocol.Factory(); //关联处理器leafrpc的实现 TProcessor processor = new leafrpc.Processor<leafrpc.Iface>(new RPCService(leafserver)); TThreadPoolServer.Args args2 = new TThreadPoolServer.Args(serverTransport); args2.processor(processor); args2.protocolFactory(proFactory); TServer server = new TThreadPoolServer(args2); LOG.info("leaf RPCServer(type:TThreadPoolServer) start at ip:port : "+ ip +":" + port ); server.serve(); }
public static void startRPCServer2(leafServer leafserver , String ip , int port) throws Exception { //关联处理器leafrpc的实现 TProcessor processor = new leafrpc.Processor<leafrpc.Iface>(new RPCService(leafserver)); //传输通道,非阻塞模式 InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(ip),port); TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(address,10000); //多线程半同步半异步 TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport); tArgs.processor(processor); //二进制协议 tArgs.protocolFactory(new TBinaryProtocol.Factory()); //多线程半同步半异步的服务模型 TServer server = new TThreadedSelectorServer(tArgs); LOG.info("leaf RPCServer(type:TThreadedSelectorServer) start at ip:port : "+ ip +":" + port ); server.serve(); }
/**
 * Creates a new client connected to the address chosen by the server address provider.
 *
 * <p>The connection uses framed transport and the binary protocol. When a callback is
 * configured it is notified about the new client; callback failures are logged and do not
 * prevent the client from being returned.
 *
 * @return an open client
 * @throws ThriftException if the provider has no address to offer
 * @throws Exception       if the transport cannot be opened
 */
@Override
public TServiceClient makeObject() throws Exception {
    InetSocketAddress address = serverAddressProvider.selector();
    if (address == null) {
        // The exception used to be constructed but never thrown, so execution continued
        // into a NullPointerException on the next line.
        throw new ThriftException("No provider available for remote service");
    }
    TSocket tsocket = new TSocket(address.getHostName(), address.getPort());
    TTransport transport = new TFramedTransport(tsocket);
    TProtocol protocol = new TBinaryProtocol(transport);
    TServiceClient client = this.clientFactory.getClient(protocol);
    transport.open();
    if (callback != null) {
        try {
            callback.make(client);
        } catch (Exception e) {
            logger.warn("makeObject:{}", e);
        }
    }
    return client;
}
public TsfileConnection(String url, Properties info) throws SQLException, TTransportException { if (url == null) { throw new TsfileURLException("Input url cannot be null"); } params = Utils.parseURL(url, info); supportedProtocols.add(TSProtocolVersion.TSFILE_SERVICE_PROTOCOL_V1); openTransport(); client = new TSIService.Client(new TBinaryProtocol(transport)); // open client session openSession(); // Wrap the client with a thread-safe proxy to serialize the RPC calls client = newSynchronizedClient(client); autoCommit = false; }
/**
 * Tries to re-establish the connection, up to {@code TsfileJDBCConfig.RETRY_NUM} times.
 *
 * <p>Each attempt closes the current transport, opens a new one, recreates the client and
 * opens a session. After a failed attempt the method sleeps for
 * {@code TsfileJDBCConfig.RETRY_INTERVAL} before trying again. If the thread is interrupted
 * while waiting, the interrupt flag is restored and no further attempts are made.
 *
 * @return {@code true} if an attempt succeeded, {@code false} otherwise (including when
 *         there is no transport to reconnect)
 */
public boolean reconnect() {
    boolean flag = false;
    for (int i = 1; i <= TsfileJDBCConfig.RETRY_NUM; i++) {
        try {
            if (transport != null) {
                transport.close();
                openTransport();
                client = new TSIService.Client(new TBinaryProtocol(transport));
                openSession();
                client = newSynchronizedClient(client);
                flag = true;
                break;
            }
        } catch (Exception e) {
            try {
                Thread.sleep(TsfileJDBCConfig.RETRY_INTERVAL);
            } catch (InterruptedException e1) {
                // Report the interruption itself (the old code printed the wrong exception),
                // hand the interrupt back to the caller and stop retrying.
                e1.printStackTrace();
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    return flag;
}
/**
 * Returns the cached async client for {@code clazz}, creating it reflectively on first use.
 *
 * <p>Clients are cached under the outer class name of {@code clazz}. Each client speaks the
 * binary protocol multiplexed under that same name. When creation fails, {@code null} is
 * returned and nothing is cached; if the failure was caused by an
 * {@link UnresolvedAddressException}, the connection is additionally marked as not open.
 *
 * @param clazz generated async client class to instantiate
 * @param <X>   async client type
 * @return the client, or {@code null} if it could not be created
 */
@Override
@SuppressWarnings("unchecked")
public <X extends TAsyncClient> X getClient(final Class<X> clazz) {
    return (X) super.clients.computeIfAbsent(ClassNameUtils.getOuterClassName(clazz), (className) -> {
        TProtocolFactory protocolFactory = (TProtocolFactory) tTransport -> {
            TProtocol protocol = new TBinaryProtocol(tTransport);
            return new TMultiplexedProtocol(protocol, className);
        };
        try {
            return clazz.getConstructor(TProtocolFactory.class, TAsyncClientManager.class, TNonblockingTransport.class)
                    .newInstance(protocolFactory, this.clientManager, this.transport);
        } catch (Throwable e) {
            // Constructor.newInstance wraps anything the constructor throws in an
            // InvocationTargetException, so the real failure has to be unwrapped before
            // it can be recognised.
            Throwable cause = e;
            if (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() != null) {
                cause = e.getCause();
            }
            if (cause instanceof UnresolvedAddressException) {
                this.isOpen = false;
            }
            return null;
        }
    });
}
public void start(CountDownLatch latch, int port) { try { TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port); //异步IO,需要使用TFramedTransport,它将分块缓存读取。 TTransportFactory transportFactory = new TFramedTransport.Factory(); //使用高密度二进制协议 TProtocolFactory proFactory = new TBinaryProtocol.Factory(); //发布多个服务 TMultiplexedProcessor processor = new TMultiplexedProcessor(); processor.registerProcessor(ClassNameUtils.getClassName(Hello.class), new Hello.Processor<>(new HelloServer())); TServer server = new TThreadedSelectorServer(new TThreadedSelectorServer.Args(serverTransport) .transportFactory(transportFactory) .protocolFactory(proFactory) .processor(processor) ); System.out.println("Starting the hello server..."); latch.countDown(); server.serve(); } catch (Exception e) { e.printStackTrace(); } }
@Test public void thriftTest() throws TException { TSocket transport = new TSocket("127.0.0.1", 8080); transport.open(); TProtocol protocol = new TBinaryProtocol(transport); TMultiplexedProtocol mp1 = new TMultiplexedProtocol(protocol,"helloWorld"); HelloWorld.Client client = new HelloWorld.Client(mp1); User user = new User(); user.setName("{\"proid\":\"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\"}"); user.setId(234242453); user.setIsman(true); Result result = client.createNewBaseResInfo(user); Assert.notNull(result); System.out.println(result.getMsg()); System.out.println("end>>>>>>>>>>>>>>>>"); }
private static void connectToCMD() { QueryInput query_input = new QueryInput(); query_input.type = "ensemble"; query_input.data = new ArrayList<String>(); query_input.data.add("localhost"); query_input.tags = new ArrayList<String>(); query_input.tags.add("9090"); QuerySpec spec = new QuerySpec(); spec.content = new ArrayList<QueryInput>(); spec.content.add(query_input); // Initialize thrift objects. TTransport transport = new TSocket("localhost", 8080); TProtocol protocol = new TBinaryProtocol(new TFramedTransport(transport)); LucidaService.Client client = new LucidaService.Client(protocol); try { transport.open(); System.out.println("Connecting to CMD at port " + 8080); // Register itself to CMD. client.create("", spec); transport.close(); System.out.println("Successfully connected to CMD"); } catch (TException x) { x.printStackTrace(); } }
/**
 * Creates a worker, connects it to the replica named by {@code replicaAddr}
 * ({@code host:port}) and opens its log file at {@code logPrefix + id}.
 *
 * @param id             worker id; also used to derive the {@code /f<id>} paths
 * @param durationMillis stored as the worker duration
 * @param path           prefix of the worker's local path
 * @param globals        stored as-is
 * @throws IOException      if the log file cannot be opened
 * @throws RuntimeException if the connection to the replica cannot be opened
 */
public Worker(int id, long durationMillis, String path, int globals) throws IOException {
    this.id = id;
    this.workerDuration = durationMillis;
    this.localPath = path + "/f" + id;
    this.globalPath = "/f" + id;
    this.instanceMap = new HashMap<>();
    this.globals = globals;

    // replicaAddr has the form host:port.
    String[] hostAndPort = replicaAddr.split(":");
    TTransport connection = new TSocket(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
    try {
        connection.open();
    } catch (TTransportException openFailure) {
        throw new RuntimeException(openFailure);
    }

    c = new FuseOps.Client(new TBinaryProtocol(connection));
    out = new BufferedWriter(new FileWriter(new File(logPrefix + this.id)));
}
/**
 * Creates a worker, connects it to the replica named by {@code replicaAddr}
 * ({@code host:port}) and opens its log file at {@code logPrefix + id}.
 *
 * @param id             worker id
 * @param durationMillis stored as the worker duration
 * @param path           stored as-is
 * @param globals        stored as-is
 * @throws IOException      if the log file cannot be opened
 * @throws RuntimeException if the connection to the replica cannot be opened
 */
public Worker(int id, long durationMillis, String path, int globals) throws IOException {
    this.id = id;
    this.workerDuration = durationMillis;
    this.path = path;
    this.globals = globals;

    // replicaAddr has the form host:port.
    String[] hostAndPort = replicaAddr.split(":");
    TTransport connection = new TSocket(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
    try {
        connection.open();
    } catch (TTransportException openFailure) {
        throw new RuntimeException(openFailure);
    }

    c = new FuseOps.Client(new TBinaryProtocol(connection));
    out = new BufferedWriter(new FileWriter(new File(logPrefix + this.id)));
}
@SuppressWarnings("unchecked") @Override public PooledObject makeObject() throws Exception { //logger.debug("makeObject..........."); try { String host = this.host; int port = this.port; int timeout = this.timeout; TFramedTransport transport = new TFramedTransport(new TSocket(host, port, timeout)); TBinaryProtocol protocol = new TBinaryProtocol(transport); FrcService.Client client = new FrcService.Client(protocol, protocol); transport.open(); RpcClient<Client> rpcClient = new RpcClient(client, transport, 1); return this.wrap(rpcClient); } catch (Exception e) { logger.error("exception", e); return null; } }
private static boolean createThrift() { String flag = getClassName() + ".createThrift"; try { TProcessor tProcessor = new FrcService.Processor<FrcService.Iface>(theInstance); TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(thrift_port); TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport); tnbArgs.maxReadBufferBytes = DefaultValues.THRIFT_MAX_READ_BUF; tnbArgs.processor(tProcessor); // tnbArgs.transportFactory(new LCQTFramedTransport.Factory()); tnbArgs.transportFactory(new TFramedTransport.Factory()); tnbArgs.protocolFactory(new TBinaryProtocol.Factory()); server = new TNonblockingServer(tnbArgs); // server.setServerEventHandler(new LCQTServerEventHandler()); return true; } catch (Exception e) { FRCLogger.getInstance().warn(-100, flag, "exception", e); return false; } }
/**
 * Opens a socket to the configured host and port and returns a remote interpreter client
 * that uses it. The client-to-socket association is recorded in {@code clientSocketMap}.
 *
 * @return a client backed by an open socket
 * @throws InterpreterException if the socket cannot be opened
 * @throws Exception            declared by the factory contract
 */
@Override
public Client create() throws Exception {
    TSocket socket = new TSocket(host, port);
    try {
        socket.open();
    } catch (TTransportException openFailure) {
        throw new InterpreterException(openFailure);
    }

    Client interpreterClient = new RemoteInterpreterService.Client(new TBinaryProtocol(socket));

    // Record which socket belongs to the client; access to the map is guarded by its monitor.
    synchronized (clientSocketMap) {
        clientSocketMap.put(interpreterClient, socket);
    }
    return interpreterClient;
}
/**
 * Creates the Thrift protocol that writes into {@code buffer}, as selected by the
 * {@code ThriftConstants.THRIFT_PROTOCOL_KEY} parameter of {@code url}.
 *
 * @param url    service URL carrying the protocol parameter (falls back to
 *               {@code ThriftConstants.DEFAULT_PROTOCOL})
 * @param buffer destination buffer
 * @return a binary protocol over an output stream backed by {@code buffer}
 * @throws IOException if the requested protocol is not the binary protocol
 */
protected static TProtocol newProtocol(URL url, ChannelBuffer buffer) throws IOException {
    String requested = url.getParameter(ThriftConstants.THRIFT_PROTOCOL_KEY,
            ThriftConstants.DEFAULT_PROTOCOL);

    // Only the binary protocol is supported.
    if (!ThriftConstants.BINARY_THRIFT_PROTOCOL.equals(requested)) {
        throw new IOException("Unsupported protocol type " + requested);
    }

    ChannelBufferOutputStream sink = new ChannelBufferOutputStream(buffer);
    return new TBinaryProtocol(new TIOStreamTransport(sink));
}
/**
 * Starts a thread-pool Thrift server on {@code PORT} in a background thread, waits until
 * it reports that it is serving, then resolves the Thrift protocol extension and creates
 * the invoker for the service under test.
 *
 * @throws Exception if the server cannot be created or the wait is interrupted
 */
protected void init() throws Exception {
    TServerTransport listenSocket = new TServerSocket( PORT );
    TBinaryProtocol.Factory binaryFactory = new TBinaryProtocol.Factory();

    TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args( listenSocket )
            .inputProtocolFactory( binaryFactory )
            .outputProtocolFactory( binaryFactory )
            .inputTransportFactory( getTransportFactory() )
            .outputTransportFactory( getTransportFactory() )
            .processor( getProcessor() );
    server = new TThreadPoolServer( serverArgs );

    // serve() blocks, so it runs on its own named thread.
    Thread serverThread = new Thread( new Runnable() {
        @Override
        public void run() {
            server.serve();
        }
    } );
    serverThread.setName( "thrift-server" );
    serverThread.start();

    // Poll every 100 ms until the server reports it is serving.
    while ( !server.isServing() ) {
        Thread.sleep( 100 );
    }

    protocol = ExtensionLoader.getExtensionLoader( Protocol.class )
            .getExtension( ThriftProtocol.NAME );
    invoker = protocol.refer( getInterface(), getUrl() );
}
/**
 * Serves {@code processor} on an ephemeral local port using framed transport and the
 * binary protocol, runs every client against it and sums their results.
 *
 * @param processor processor to serve
 * @param clients   functions that take the server address and return a count
 * @return the sum of the values returned by all clients
 * @throws Exception if the server socket cannot be created or a client fails
 */
private static int testProcessor(TProcessor processor, List<ToIntFunction<HostAndPort>> clients)
        throws Exception
{
    // Port 0 lets the operating system choose a free port.
    try (TServerSocket listenSocket = new TServerSocket(0)) {
        TProtocolFactory binaryFactory = new TBinaryProtocol.Factory();
        TTransportFactory framedFactory = new TFramedTransport.Factory();
        TServer simpleServer = new TSimpleServer(new Args(listenSocket)
                .protocolFactory(binaryFactory)
                .transportFactory(framedFactory)
                .processor(processor));

        Thread serverThread = new Thread(() -> simpleServer.serve());
        try {
            serverThread.start();

            int boundPort = listenSocket.getServerSocket().getLocalPort();
            HostAndPort serverAddress = HostAndPort.fromParts("localhost", boundPort);

            int total = 0;
            for (ToIntFunction<HostAndPort> clientCall : clients) {
                total += clientCall.applyAsInt(serverAddress);
            }
            return total;
        }
        finally {
            simpleServer.stop();
            serverThread.interrupt();
        }
    }
}
/**
 * Starts an SSL-enabled thread-pool Thrift test server on {@code 0.0.0.0:port}.
 *
 * <p>The server class and its argument class are loaded reflectively. The binary protocol
 * is used when {@code protocol} equals {@code ThriftRpcClient.BINARY_PROTOCOL}; any other
 * value, including {@code null}, selects the compact protocol. The server is served on a
 * background single-thread executor.
 *
 * @param handlerName      name passed to {@code getHandler} to pick the request handler
 * @param port             TCP port to listen on
 * @param protocol         protocol name, may be {@code null}
 * @param keystore         key store location
 * @param keystorePassword key store password
 * @param keyManagerType   key manager type
 * @param keystoreType     key store type
 * @throws Exception if the SSL socket, the handler or the reflective setup fails
 */
public ThriftTestingSource(String handlerName, int port, String protocol, String keystore,
        String keystorePassword, String keyManagerType, String keystoreType) throws Exception {
    TSSLTransportFactory.TSSLTransportParameters params =
            new TSSLTransportFactory.TSSLTransportParameters();
    params.setKeyStore(keystore, keystorePassword, keyManagerType, keystoreType);

    TServerSocket serverTransport = TSSLTransportFactory.getServerSocket(
            port, 10000, InetAddress.getByName("0.0.0.0"), params);

    ThriftSourceProtocol.Iface handler = getHandler(handlerName);

    // The server and its Args class are looked up by name at runtime.
    Class serverClass = Class.forName("org.apache.thrift" + ".server.TThreadPoolServer");
    Class argsClass = Class.forName("org.apache.thrift.server" + ".TThreadPoolServer$Args");
    TServer.AbstractServerArgs args = (TServer.AbstractServerArgs) argsClass
            .getConstructor(TServerTransport.class)
            .newInstance(serverTransport);
    Method m = argsClass.getDeclaredMethod("maxWorkerThreads", int.class);
    m.invoke(args, Integer.MAX_VALUE);

    // Compare by value: "==" on strings only matches the very same object, so an equal
    // but non-interned protocol name used to fall through to the compact protocol.
    TProtocolFactory transportProtocolFactory;
    if (ThriftRpcClient.BINARY_PROTOCOL.equals(protocol)) {
        transportProtocolFactory = new TBinaryProtocol.Factory();
    } else {
        transportProtocolFactory = new TCompactProtocol.Factory();
    }
    args.protocolFactory(transportProtocolFactory);
    args.inputTransportFactory(new TFastFramedTransport.Factory());
    args.outputTransportFactory(new TFastFramedTransport.Factory());
    args.processor(new ThriftSourceProtocol.Processor<ThriftSourceProtocol.Iface>(handler));

    server = (TServer) serverClass.getConstructor(argsClass).newInstance(args);

    // serve() blocks, so run it off the constructing thread.
    Executors.newSingleThreadExecutor().submit(new Runnable() {
        @Override
        public void run() {
            server.serve();
        }
    });
}