Java 类io.reactivex.netty.pipeline.PipelineConfigurators 实例源码

项目:triathlon    文件:RxTestServer.java   
/**
 * Builds the HTTP test server and wires up its request routing.
 *
 * <p>A POST request whose path contains {@code /v2/apps} is delegated to
 * {@code handleTest}; every other request is answered with 404 Not Found.
 * The server is built with the default HTTP server pipeline configurator and
 * wire logging at {@code ERROR} level. This method itself does not call
 * {@code start()} on the returned server.
 *
 * @return the built server instance
 */
public HttpServer<ByteBuf, ByteBuf> createServer() {
    HttpServer<ByteBuf, ByteBuf> server = RxNetty.newHttpServerBuilder(port, new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
            // Only POSTs against the apps endpoint are handled; everything else falls through to 404.
            boolean isAppsPost = request.getPath().contains("/v2/apps")
                    && request.getHttpMethod().equals(HttpMethod.POST);
            if (isAppsPost) {
                return handleTest(request, response);
            }
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    }).pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator()).enableWireLogging(LogLevel.ERROR).build();

    // Message previously misspelled the server name as "RxTetstServer".
    System.out.println("RxTestServer server started...");
    return server;
}
项目:RxNetty-Android    文件:ServerFragment.java   
/**
 * Creates the text-based TCP server when the fragment is created.
 *
 * <p>For each new connection a notice is posted to the list adapter via
 * {@code mainHandler}, a welcome line is written, and every non-blank
 * incoming message is written back prefixed with {@code "echo => "}.
 */
@Override public void onCreate(Bundle savedInstanceState) {
  super.onCreate(savedInstanceState);
  server = RxNetty.createTcpServer(PORT, PipelineConfigurators.textOnlyConfigurator(),
      connection -> {
        mainHandler.post(() -> adapter.add("New client connection established."));
        connection.writeAndFlush("Welcome! \n\n");
        return connection.getInput().flatMap(received -> {
          Log.d(TAG, "Server onNext: " + received);
          String text = received.trim();
          // Blank input produces no reply.
          if (text.isEmpty()) {
            return Observable.empty();
          }
          return connection.writeAndFlush("echo => " + text + '\n');
        });
      });
}
项目:nibbler    文件:NettyHttpServer.java   
/**
 * Builds the HTTP server described by the given configuration.
 *
 * <p>The server is bound through a bootstrap created for the configured
 * interface and port, handles requests with the handler produced by
 * {@code createServiceRequestHandler}, and has wire logging enabled at
 * {@code DEBUG} level. A startup line and the registered services are logged
 * before the server is built.
 *
 * @param configuration source of interface, port, thread count, services and converters
 * @return the built (not yet started by this method) HTTP server
 */
private HttpServer<ByteBuf, ByteBuf> createServer(RestHttpServerConfiguration configuration) {
  ServerBootstrap bootstrap = createServerBootstrap(configuration.getInterface(), configuration.getPort());

  HttpServerBuilder<ByteBuf, ByteBuf> builder =
    new HttpServerBuilder<>(bootstrap, configuration.getPort(),
      createServiceRequestHandler(configuration.getRestServices(), configuration.getConverters()))
      .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
      .withRequestProcessingThreads(configuration.getRequestProcessingThreads())
      .enableWireLogging(LogLevel.DEBUG);

  log.info(String.format(
    "starting http server on port %s at interface %s, with %s processing threads ...",
    configuration.getPort(),
    configuration.getInterface(),
    configuration.getRequestProcessingThreads()));
  logServices(configuration.getRestServices());

  return builder.build();
}
项目:Prana    文件:HealthCheckHandler.java   
/**
 * Issues a GET request against the external health-check endpoint.
 *
 * <p>Host, port and path are taken from {@code externalHealthCheckURL}. If the
 * URL cannot be parsed, the defaults ({@code localhost},
 * {@code DEFAULT_APPLICATION_PORT}, {@code /healthcheck}) are used instead.
 * The connect timeout is read from the dynamic property
 * {@code prana.host.healthcheck.timeout}.
 *
 * @param externalHealthCheckURL URL of the health-check endpoint to call
 * @return an observable of the HTTP response
 */
private Observable<HttpClientResponse<ByteBuf>> getResponse(String externalHealthCheckURL) {
    String host = "localhost";
    int port = DEFAULT_APPLICATION_PORT;
    String path = "/healthcheck";
    try {
        URL url = new URL(externalHealthCheckURL);
        host = url.getHost();
        // URL.getPort() is -1 when the URL has no explicit port; fall back to the
        // protocol default, and keep the application default if that is unknown too.
        int resolvedPort = url.getPort() >= 0 ? url.getPort() : url.getDefaultPort();
        if (resolvedPort >= 0) {
            port = resolvedPort;
        }
        // A URL without a path component yields ""; request the root instead of an empty URI.
        path = url.getPath().isEmpty() ? "/" : url.getPath();
    } catch (MalformedURLException ignored) {
        // Deliberate best effort: an unparsable URL falls back to the defaults above.
    }
    Integer timeout = DynamicProperty.getInstance("prana.host.healthcheck.timeout").getInteger(DEFAULT_CONNECTION_TIMEOUT);
    HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(host, port)
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpClientConfigurator())
            .channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout)
            .build();
    return httpClient.submit(HttpClientRequest.createGet(path));

}
项目:ribbon    文件:RxMovieServer.java   
/**
 * Builds the movie-service HTTP server and its request routing.
 *
 * <p>Routing is by substring match on the request path, checked in this order:
 * {@code /users} (GET goes to the by-user lookup, any other method to the
 * update handler), {@code /recommendations}, {@code /movies}. Anything else is
 * answered with 404 Not Found.
 *
 * @return the built server instance
 */
public HttpServer<ByteBuf, ByteBuf> createServer() {
    RequestHandler<ByteBuf, ByteBuf> router = new RequestHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
            final String path = request.getPath();

            if (path.contains("/users")) {
                boolean isGet = request.getHttpMethod().equals(HttpMethod.GET);
                if (!isGet) {
                    return handleUpdateRecommendationsForUser(request, response);
                }
                return handleRecommendationsByUserId(request, response);
            } else if (path.contains("/recommendations")) {
                return handleRecommendationsBy(request, response);
            } else if (path.contains("/movies")) {
                return handleRegisterMovie(request, response);
            }

            // No route matched.
            response.setStatus(HttpResponseStatus.NOT_FOUND);
            return response.close();
        }
    };

    HttpServer<ByteBuf, ByteBuf> movieServer = RxNetty.newHttpServerBuilder(port, router)
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
            .enableWireLogging(LogLevel.ERROR)
            .build();

    System.out.println("RxMovie server started...");
    return movieServer;
}
项目:RxNetty    文件:TcpEventStreamClientFast.java   
/**
 * Connects to the TCP server on localhost:8181 and prints every received
 * message (trimmed) to standard output, blocking until the stream ends.
 */
public static void main(String[] args) {
    // Strips surrounding whitespace from each incoming message.
    final Func1<String, String> trimMessage = new Func1<String, String>() {
        @Override
        public String call(String msg) {
            return msg.trim();
        }
    };

    Func1<ObservableConnection<String, String>, Observable<?>> readTrimmedInput =
            new Func1<ObservableConnection<String, String>, Observable<?>>() {
                @Override
                public Observable<?> call(ObservableConnection<String, String> connection) {
                    return connection.getInput().map(trimMessage);
                }
            };

    Action1<Object> printEvent = new Action1<Object>() {
        @Override
        public void call(Object o) {
            System.out.println("onNext event => " + o);
        }
    };

    Observable<ObservableConnection<String, String>> connections =
            RxNetty.createTcpClient("localhost", 8181, PipelineConfigurators.stringMessageConfigurator()).connect();

    connections.flatMap(readTrimmedInput).toBlockingObservable().forEach(printEvent);
}
项目:RxNetty    文件:TcpEchoServer.java   
/**
 * Starts a text-based TCP echo server on port 8181 and waits on it.
 *
 * <p>Each new connection is greeted with a welcome line; every non-blank
 * message received afterwards is written back prefixed with {@code "echo => "}.
 */
public static void main(final String[] args) {
    final int port = 8181;

    ConnectionHandler<String, String> echoHandler = new ConnectionHandler<String, String>() {
        @Override
        public Observable<Void> handle(final ObservableConnection<String, String> connection) {
            System.out.println("New client connection established.");
            connection.writeAndFlush("Welcome! \n\n");

            Func1<String, Observable<Void>> echoBack = new Func1<String, Observable<Void>>() {
                @Override
                public Observable<Void> call(String msg) {
                    System.out.println("onNext: " + msg);
                    String trimmed = msg.trim();
                    // Blank input is not echoed.
                    if (trimmed.isEmpty()) {
                        return COMPLETED_OBSERVABLE;
                    }
                    return connection.writeAndFlush("echo => " + trimmed + '\n');
                }
            };
            return connection.getInput().flatMap(echoBack);
        }
    };

    RxNetty.createTcpServer(port, PipelineConfigurators.textOnlyConfigurator(), echoHandler).startAndWait();
}
项目:RxNetty    文件:TcpEventStreamClientSlow.java   
/**
 * Connects to the TCP server on localhost:8181 and prints every received
 * message (trimmed) to standard output, sleeping one second per message to
 * act as a slow consumer. Blocks until the stream ends.
 */
public static void main(String[] args) {
    // Delays each message by one second before trimming it.
    final Func1<String, String> slowTrim = new Func1<String, String>() {
        @Override
        public String call(String msg) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // NOTE(review): the interrupt is only printed, not propagated or restored.
                e.printStackTrace();
            }
            return msg.trim();
        }
    };

    Func1<ObservableConnection<String, String>, Observable<?>> readSlowly =
            new Func1<ObservableConnection<String, String>, Observable<?>>() {
                @Override
                public Observable<?> call(ObservableConnection<String, String> connection) {
                    return connection.getInput().map(slowTrim);
                }
            };

    Action1<Object> printEvent = new Action1<Object>() {
        @Override
        public void call(Object o) {
            System.out.println("onNext event => " + o);
        }
    };

    Observable<ObservableConnection<String, String>> connections =
            RxNetty.createTcpClient("localhost", 8181, PipelineConfigurators.stringMessageConfigurator()).connect();

    connections.flatMap(readSlowly).toBlockingObservable().forEach(printEvent);
}
项目:RxNetty    文件:HttpClientTest.java   
/**
 * Requests {@code test/nochunk_stream} with an SSE client and checks that the
 * collected event data equals {@code RequestProcessor.smallStreamContent}.
 */
@Test
public void testNonChunkingStream() throws Exception {
    HttpClient<ByteBuf, ServerSentEvent> sseClient = RxNetty.createHttpClient(
            "localhost", port, PipelineConfigurators.<ByteBuf>sseClientConfigurator());
    Observable<HttpClientResponse<ServerSentEvent>> responses =
            sseClient.submit(HttpClientRequest.createGet("test/nochunk_stream"));

    final List<String> received = new ArrayList<String>();

    // Unwraps each response into its stream of server-sent events.
    Func1<HttpClientResponse<ServerSentEvent>, Observable<ServerSentEvent>> toContent =
            new Func1<HttpClientResponse<ServerSentEvent>, Observable<ServerSentEvent>>() {
                @Override
                public Observable<ServerSentEvent> call(HttpClientResponse<ServerSentEvent> httpResponse) {
                    return httpResponse.getContent();
                }
            };
    Action1<ServerSentEvent> collectEventData = new Action1<ServerSentEvent>() {
        @Override
        public void call(ServerSentEvent event) {
            received.add(event.getEventData());
        }
    };

    responses.flatMap(toContent).toBlockingObservable().forEach(collectEventData);

    assertEquals(RequestProcessor.smallStreamContent, received);
}
项目:MarketData    文件:RxNettyEventEventStreamClient.java   
/**
 * Opens the server-sent-event stream at {@code /hello} on localhost.
 *
 * <p>The response is passed to {@code printResponseHeader} before its content
 * is consumed; each event is exposed as its string content.
 *
 * @return an observable of event payloads as strings
 */
private Observable<String> initializeStream() {
    HttpClient<ByteBuf, ServerSentEvent> sseClient =
            RxNetty.createHttpClient("localhost", port, PipelineConfigurators.<ByteBuf>clientSseConfigurator());

    return sseClient
            .submit(HttpClientRequest.createGet("/hello"))
            .flatMap(response -> {
                printResponseHeader(response);
                return response.getContent();
            })
            .map(ServerSentEvent::contentAsString);
}
项目:MarketData    文件:RxNettyEventServer.java   
/**
 * Creates the server-sent-events HTTP server.
 *
 * <p>Every response gets permissive CORS, no-cache, keep-alive and
 * {@code text/event-stream} headers before the request is handed to
 * {@code getIntervalObservable}.
 *
 * @return the created server instance
 */
public HttpServer<ByteBuf, ServerSentEvent> createServer() {
    HttpServer<ByteBuf, ServerSentEvent> sseServer = RxNetty.createHttpServer(
            port,
            (request, response) -> {
                // Headers required for a browser-consumable event stream.
                response.getHeaders().set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");
                response.getHeaders().set(CACHE_CONTROL, "no-cache");
                response.getHeaders().set(CONNECTION, "keep-alive");
                response.getHeaders().set(CONTENT_TYPE, "text/event-stream");
                return getIntervalObservable(request, response);
            },
            PipelineConfigurators.<ByteBuf>serveSseConfigurator());

    System.out.println("HTTP Server Sent Events server started...");
    return sseServer;
}
项目:vizceral-hystrix    文件:HystrixReader.java   
/**
 * Constructs a hystrix reader for a single cluster.
 *
 * <p>An SSE-configured HTTP client is built against the turbine host and port
 * taken from the configuration. When the configuration is marked secure, the
 * client is given the {@code DefaultFactories.trustAll()} SSL engine factory.
 *
 * @param configuration The configuration to use.
 * @param cluster       The cluster to read from.
 */
public HystrixReader(Configuration configuration, String cluster)
{
    HttpClientBuilder<ByteBuf, ServerSentEvent> clientBuilder =
            RxNetty.newHttpClientBuilder(configuration.getTurbineHost(), configuration.getTurbinePort());
    clientBuilder.pipelineConfigurator(PipelineConfigurators.clientSseConfigurator());
    if (configuration.isSecure())
    {
        // NOTE(review): trustAll presumably skips certificate validation - confirm this is intended.
        clientBuilder.withSslEngineFactory(DefaultFactories.trustAll());
    }

    this.configuration = configuration;
    this.cluster = cluster;
    this.rxNetty = clientBuilder.build();
}
项目:triathlon    文件:MarathonClient.java   
/**
 * Posts the given message as JSON to {@code /v2/apps}.
 *
 * <p>A new HTTP client is built on every call, targeting
 * {@code networkAddress.getIpAddress()} and {@code port}, with wire logging at
 * {@code ERROR} level.
 *
 * @param message request body to send
 * @return an observable of the HTTP response
 */
public Observable<HttpClientResponse<ByteBuf>> postMessage(String message) {
    HttpClientRequest<ByteBuf> appsRequest = HttpClientRequest.createPost("/v2/apps");
    appsRequest.withRawContentSource(Observable.just(message), StringTransformer.DEFAULT_INSTANCE);
    appsRequest.withHeader("Content-Type", "application/json");

    PipelineConfigurator<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>> httpPipeline
            = PipelineConfigurators.httpClientConfigurator();
    HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty
            .<ByteBuf, ByteBuf>newHttpClientBuilder(networkAddress.getIpAddress(), port)
            .pipelineConfigurator(httpPipeline)
            .enableWireLogging(LogLevel.ERROR)
            .build();

    return httpClient.submit(appsRequest);
}
项目:triathlon    文件:HealthCheck.java   
/**
 * Performs a blocking GET against the given service URL.
 *
 * <p>The URL is printed to standard output, then a new HTTP client is built
 * for its host and port and the request is awaited for at most
 * {@code checkTimeout} milliseconds.
 *
 * @param serviceUrl URL of the service to check
 * @return the HTTP response
 * @throws MalformedURLException if {@code serviceUrl} cannot be parsed
 * @throws InterruptedException  if the waiting thread is interrupted
 * @throws ExecutionException    if the request fails
 * @throws TimeoutException      if no response arrives within {@code checkTimeout} ms
 */
private HttpClientResponse<ByteBuf> getResponse(String serviceUrl) throws MalformedURLException, InterruptedException, ExecutionException, TimeoutException {
    URL url = new URL(serviceUrl);
    System.out.println(url);

    String host = url.getHost();
    // URL.getPort() is -1 when the URL has no explicit port; use the protocol default then.
    int port = url.getPort() < 0 ? url.getDefaultPort() : url.getPort();
    // A URL without a path component yields ""; request the root instead of an empty URI.
    String path = url.getPath().isEmpty() ? "/" : url.getPath();

    HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(host, port)
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpClientConfigurator())
            .build();
    return httpClient.submit(HttpClientRequest.createGet(path)).toBlocking().toFuture().get(checkTimeout, TimeUnit.MILLISECONDS);
}
项目:argos-dashboard    文件:DefaultHystrixClusterMonitor.java   
/**
 * Returns the shared JSON event stream for this cluster, creating it on first use.
 *
 * <p>The stream is an SSE request against {@code url}. A non-200 response is
 * turned into an error. The pipeline times out after 120 seconds, retries
 * failures with a growing delay (10 s steps, capped at 60 s), repeats after
 * completion, and is shared between subscribers. The created observable is
 * cached in {@code jsonObservable}; no synchronization is applied here.
 *
 * @return observable of event payloads as strings
 */
@Override
public Observable<String> observeJson() {
    if(jsonObservable != null) {
        return jsonObservable;
    }

    // URL.getQuery() is null when the URL has no query part; avoid requesting "path?null".
    String query = url.getQuery();
    String requestUri = query == null ? url.getPath() : url.getPath() + "?" + query;
    HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet(requestUri);
    int port = url.getPort() < 0 ? url.getDefaultPort() : url.getPort();
    HttpClient<ByteBuf, ServerSentEvent> client = RxNetty.<ByteBuf, ServerSentEvent>newHttpClientBuilder(url.getHost(), port)
            .withNoConnectionPooling()
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf>clientSseConfigurator())
            .build();


    jsonObservable = client.submit(request)
            .doOnError(t -> LOG.error("Error connecting to " + url, t))
            .flatMap(response -> {
                        if (response.getStatus().code() != 200) {
                            return Observable.error(new RuntimeException("Failed to connect: " + response.getStatus()));
                        }

                        return response.getContent()
                                .doOnSubscribe(() -> LOG.info("Turbine => Aggregate Stream from URL: " + url))
                                .doOnUnsubscribe(() -> LOG.info("Turbine => Unsubscribing Stream: " + url))
                                .map(ServerSentEvent::contentAsString);
                    }
            )
            .timeout(120, TimeUnit.SECONDS)
            .retryWhen(attempts -> attempts.zipWith(Observable.range(1, Integer.MAX_VALUE), (k, i) -> i)
                    .flatMap(n -> {
                        int waitTimeSeconds = Math.min(6, n) * 10; // wait in 10 second increments up to a max of 1 minute
                        // Pass the URL as an argument so braces in it cannot be mistaken for placeholders.
                        LOG.info("Turbine => Retrying connection to: {} in {} seconds", this.url, waitTimeSeconds);
                        return Observable.timer(waitTimeSeconds, TimeUnit.SECONDS);
                    })
            )
            .repeat()
            .share();

    return jsonObservable;
}
项目:RxNetty-Android    文件:ServerFragment.java   
/**
 * Connects a text-based TCP client to the local server when the client button is clicked.
 *
 * <p>The first incoming line is read as the greeting; afterwards a counter is
 * written every 500 ms while incoming lines are read back. The combined stream
 * is limited to 10 items, subscribed on the I/O scheduler and observed on the
 * Android main thread, where each item is logged and added to the adapter.
 */
@OnClick(R.id.client_button) void startClient() {
  Observable<ObservableConnection<String, String>> connections =
      RxNetty.createTcpClient("localhost", PORT, PipelineConfigurators.textOnlyConfigurator()).connect();

  connections.flatMap(connection -> {
    // first line sent by the server, trimmed
    Observable<String> greeting = connection.getInput().take(1).map(String::trim);

    // write an incrementing counter every 500 ms; anything the write emits becomes ""
    Observable<String> ticks = Observable.interval(500, TimeUnit.MILLISECONDS)
        .flatMap(tick -> connection.writeAndFlush(String.valueOf(tick + 1)).map(ignored -> ""));

    // lines coming back from the server, trimmed
    Observable<String> echoes = connection.getInput().map(String::trim);

    // greeting first, then writes and echoes interleaved
    return Observable.concat(greeting, Observable.merge(ticks, echoes));
  })
  .take(10)
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Observer<Object>() {
    @Override public void onNext(Object o) {
      final String text = o.toString();
      Log.d(TAG, "Client onNext: " + text);
      adapter.add(text);
    }

    @Override public void onError(Throwable throwable) {
      Log.e(TAG, "onError: " + throwable.getMessage());
    }

    @Override public void onCompleted() {
      Log.d(TAG, "Client Complete!");
    }
  });
}
项目:ReactiveLab    文件:AbstractMiddleTierService.java   
/**
 * Creates the SSE HTTP server for this middle-tier service on the given port.
 *
 * <p>Requests pass through a {@code HystrixMetricsStreamHandler} registered for
 * {@code /hystrix.stream}, which wraps {@code handleRequest}. On completion the
 * elapsed time is printed and recorded in the metrics and a success is
 * counted; on error a failure is counted. A throwable raised while setting up
 * the handling is printed and answered with status {@code BAD_REQUEST}.
 *
 * @param port port to create the server for
 * @return the created server instance
 */
public HttpServer<ByteBuf, ServerSentEvent> createServer(int port) {
    System.out.println("Start " + getClass().getSimpleName() + " on port: " + port);

    // declare handler chain (wrapped in Hystrix)
    // TODO create a better way of chaining these (related https://github.com/ReactiveX/RxNetty/issues/232 and https://github.com/ReactiveX/RxNetty/issues/202)
    HystrixMetricsStreamHandler<ByteBuf, ServerSentEvent> handlerChain
      = new HystrixMetricsStreamHandler<>(metrics, "/hystrix.stream", 1000, (request, response) -> {
        try {
            final long startedAt = System.currentTimeMillis();
            return handleRequest(request, response)
                    .doOnCompleted(() -> {
                        int elapsedMs = (int) (System.currentTimeMillis() - startedAt);
                        System.out.println("Response => " + request.getPath() + " Time => " + elapsedMs + "ms");
                    })
                    .doOnCompleted(() -> {
                        int elapsedMs = (int) (System.currentTimeMillis() - startedAt);
                        metrics.getRollingPercentile().addValue(elapsedMs);
                    })
                    .doOnCompleted(() -> metrics.getRollingNumber().add(Metrics.EventType.SUCCESS, 1))
                    .doOnError(failure -> metrics.getRollingNumber().add(Metrics.EventType.FAILURE, 1));
        } catch (Throwable e) {
            e.printStackTrace();
            System.err.println("Server => Error [" + request.getPath() + "] => " + e);
            // NOTE(review): the body says "Error 500" while the status set is BAD_REQUEST - confirm which is intended.
            response.setStatus(HttpResponseStatus.BAD_REQUEST);
            return response.writeStringAndFlush("data: Error 500: Bad Request\n" + e.getMessage() + "\n");
        }
    });

    return RxNetty.createHttpServer(
            port,
            (request, response) -> handlerChain.handle(request, response),
            PipelineConfigurators.<ByteBuf> serveSseConfigurator());
}
项目:Prana    文件:HealthCheckHandlerTest.java   
/**
 * Starts the external HTTP server used by the tests and records the port it
 * reports. The server is requested on port 0 (presumably so a free port is
 * assigned - confirm against RxNetty).
 */
@Before
public void setUp() {
    super.setUp();
    externalServer = RxNetty
            .newHttpServerBuilder(0, new ExternalServerHandler())
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
            .build();
    externalServer.start();
    externalServerPort = externalServer.getServerPort();
}
项目:Prana    文件:AbstractIntegrationTest.java   
/**
 * Starts an HTTP server using the handler supplied by {@code getHandler()} and
 * builds a client against the port that server reports, with a 2000 ms
 * connect timeout.
 */
@Before
public void setUp() {
    server = RxNetty
            .newHttpServerBuilder(0, getHandler())
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpServerConfigurator())
            .build();
    server.start();

    client = RxNetty
            .<ByteBuf, ByteBuf>newHttpClientBuilder("localhost", server.getServerPort())
            .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpClientConfigurator())
            .channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000)
            .build();
}
项目:RxNetty    文件:TcpEventStreamServer.java   
/**
 * Starts a text-based TCP server on port 8181 that hands every new
 * connection to {@code startEventStream}, then waits on the server.
 */
public static void main(String[] args) {
    ConnectionHandler<String, String> eventStreamHandler = new ConnectionHandler<String, String>() {
        @Override
        public Observable<Void> handle(ObservableConnection<String, String> newConnection) {
            return startEventStream(newConnection);
        }
    };

    RxNetty.createTcpServer(8181, PipelineConfigurators.textOnlyConfigurator(), eventStreamHandler)
            .startAndWait();
}
项目:RxNetty    文件:HttpSseServer.java   
/**
 * Starts an SSE HTTP server on port 8080 that answers every request with the
 * stream produced by {@code getIntervalObservable}, then waits on the server.
 */
public static void main(String[] args) {
    final int port = 8080;

    RequestHandler<ByteBuf, ServerSentEvent> intervalHandler = new RequestHandler<ByteBuf, ServerSentEvent>() {
        @Override
        public Observable<Void> handle(HttpRequest<ByteBuf> request,
                                       HttpResponse<ServerSentEvent> response) {
            return getIntervalObservable(response);
        }
    };

    RxNetty.createHttpServer(port, intervalHandler, PipelineConfigurators.<ByteBuf>sseServerConfigurator())
            .startAndWait();
}
项目:RxNetty    文件:HttpClientTest.java   
/**
 * Requests {@code test/stream} with an SSE client and checks that the content
 * read from the response equals {@code RequestProcessor.smallStreamContent}.
 */
@Test
public void testChunkedStreaming() throws Exception {
    HttpClient<ByteBuf, ServerSentEvent> sseClient = RxNetty.createHttpClient(
            "localhost", port, PipelineConfigurators.<ByteBuf>sseClientConfigurator());
    final List<String> received = new ArrayList<String>();

    Observable<HttpClientResponse<ServerSentEvent>> responses =
            sseClient.submit(HttpClientRequest.createGet("test/stream"));
    readResponseContent(responses, received);

    assertEquals(RequestProcessor.smallStreamContent, received);
}
项目:RxNetty    文件:HttpClientTest.java   
/**
 * Sends a DELETE to {@code test/largeStream} with an SSE client and checks
 * that the content read from the response equals
 * {@code RequestProcessor.largeStreamContent}.
 */
@Test
public void testMultipleChunks() throws Exception {
    HttpClient<ByteBuf, ServerSentEvent> sseClient = RxNetty.createHttpClient(
            "localhost", port, PipelineConfigurators.<ByteBuf>sseClientConfigurator());
    final List<String> received = new ArrayList<String>();

    Observable<HttpClientResponse<ServerSentEvent>> responses =
            sseClient.submit(HttpClientRequest.createDelete("test/largeStream"));
    readResponseContent(responses, received);

    assertEquals(RequestProcessor.largeStreamContent, received);
}
项目:RxNetty    文件:HttpClientTest.java   
/**
 * Requests {@code test/largeStream}, maps each server-sent event of an OK
 * response to its event data (any other status becomes an error), and checks
 * the collected data against {@code RequestProcessor.largeStreamContent}.
 */
@Test
public void testMultipleChunksWithTransformation() throws Exception {
    HttpClient<ByteBuf, ServerSentEvent> sseClient = RxNetty.createHttpClient(
            "localhost", port, PipelineConfigurators.<ByteBuf>sseClientConfigurator());
    Observable<HttpClientResponse<ServerSentEvent>> responses =
            sseClient.submit(HttpClientRequest.createGet("test/largeStream"));

    // Extracts the data payload of a single event.
    final Func1<ServerSentEvent, String> toEventData = new Func1<ServerSentEvent, String>() {
        @Override
        public String call(ServerSentEvent sseEvent) {
            return sseEvent.getEventData();
        }
    };

    Observable<String> eventData = responses.flatMap(
            new Func1<HttpClientResponse<ServerSentEvent>, Observable<String>>() {
                @Override
                public Observable<String> call(HttpClientResponse<ServerSentEvent> httpResponse) {
                    if (!httpResponse.getStatus().equals(HttpResponseStatus.OK)) {
                        return Observable.error(new RuntimeException("Unexpected response"));
                    }
                    return httpResponse.getContent().map(toEventData);
                }
            });

    final List<String> received = new ArrayList<String>();
    eventData.toBlockingObservable().forEach(new Action1<String>() {
        @Override
        public void call(String data) {
            received.add(data);
        }
    });

    assertEquals(RequestProcessor.largeStreamContent, received);
}
项目:atlas-oss-plugin    文件:RxHttp.java   
/**
 * Sends one HTTP request to one server using a client built for that call.
 *
 * <p>The client uses the configured read timeout, user agent and connect
 * timeout, and the {@code DefaultFactories.trustAll()} SSL engine factory when
 * the server is marked secure. The response status is logged at debug level,
 * failures at info level, and the client is shut down when the returned
 * observable terminates.
 *
 * @param server Server to send the request to.
 * @param req    Request to execute.
 * @return Observable with the response of the request.
 */
private static Observable<HttpClientResponse<ByteBuf>>
executeSingle(Server server, HttpClientRequest<ByteBuf> req) {
    HttpClient.HttpClientConfig clientConfig = new HttpClient.HttpClientConfig.Builder()
            .readTimeout(READ_TIMEOUT_MS, TimeUnit.MILLISECONDS)
            .userAgent(USER_AGENT)
            .build();

    HttpClientBuilder<ByteBuf, ByteBuf> clientBuilder =
            RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(server.host(), server.port())
                    .pipelineConfigurator(PipelineConfigurators.<ByteBuf, ByteBuf>httpClientConfigurator())
                    .config(clientConfig)
                    .channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS);
    if (server.isSecure()) {
        clientBuilder.withSslEngineFactory(DefaultFactories.trustAll());
    }
    final HttpClient<ByteBuf, ByteBuf> httpClient = clientBuilder.build();

    Action1<HttpClientResponse<ByteBuf>> logStatus = new Action1<HttpClientResponse<ByteBuf>>() {
        @Override
        public void call(HttpClientResponse<ByteBuf> res) {
            LOGGER.debug("Got response: {}", res.getStatus().code());
        }
    };
    Action1<Throwable> logFailure = new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            LOGGER.info("Error sending metrics: {}/{}",
                    throwable.getClass().getSimpleName(),
                    throwable.getMessage());
        }
    };
    // The client exists only for this request, so release it once the stream ends.
    Action0 releaseClient = new Action0() {
        @Override
        public void call() {
            httpClient.shutdown();
        }
    };

    return httpClient.submit(req)
            .doOnNext(logStatus)
            .doOnError(logFailure)
            .doOnTerminate(releaseClient);
}
项目:RxNetty    文件:HttpClientBuilder.java   
/**
 * Creates a builder for an HTTP client targeting the given host and port.
 *
 * <p>The builder starts with {@code HttpClientConfig.DEFAULT_CONFIG} and the
 * standard HTTP client pipeline configurator.
 *
 * @param host host the client will connect to
 * @param port port the client will connect to
 */
public HttpClientBuilder(String host, int port) {
    super(host, port);
    this.clientConfig = HttpClient.HttpClientConfig.DEFAULT_CONFIG;
    this.pipelineConfigurator(PipelineConfigurators.<I, O>httpClientConfigurator());
}
项目:RxNetty    文件:HttpClientBuilder.java   
/**
 * Creates a builder for an HTTP client targeting the given host and port,
 * using a caller-supplied bootstrap.
 *
 * <p>The builder starts with {@code HttpClientConfig.DEFAULT_CONFIG} and the
 * standard HTTP client pipeline configurator.
 *
 * @param bootstrap bootstrap the client is built on
 * @param host      host the client will connect to
 * @param port      port the client will connect to
 */
public HttpClientBuilder(Bootstrap bootstrap, String host, int port) {
    super(bootstrap, host, port);
    // Match the (host, port) constructor: start from the HTTP-specific default
    // config regardless of which constructor was used.
    clientConfig = HttpClient.HttpClientConfig.DEFAULT_CONFIG;
    pipelineConfigurator(PipelineConfigurators.<I, O>httpClientConfigurator());
}
项目:RxNetty    文件:HttpServerBuilder.java   
/**
 * Creates a builder for an HTTP server on the given port.
 *
 * <p>The request handler is wrapped in an {@code HttpConnectionHandler} and the
 * standard HTTP server pipeline configurator is installed.
 *
 * @param port           port the server will use
 * @param requestHandler handler invoked for incoming requests
 */
public HttpServerBuilder(int port, RequestHandler<I, O> requestHandler) {
    super(port, new HttpConnectionHandler<I, O>(requestHandler));
    this.pipelineConfigurator(PipelineConfigurators.<I, O>httpServerConfigurator());
}
项目:RxNetty    文件:HttpServerBuilder.java   
/**
 * Creates a builder for an HTTP server on the given port, using a
 * caller-supplied server bootstrap.
 *
 * <p>The request handler is wrapped in an {@code HttpConnectionHandler} and the
 * standard HTTP server pipeline configurator is installed.
 *
 * @param bootstrap      bootstrap the server is built on
 * @param port           port the server will use
 * @param requestHandler handler invoked for incoming requests
 */
public HttpServerBuilder(ServerBootstrap bootstrap, int port, RequestHandler<I, O> requestHandler) {
    super(port, new HttpConnectionHandler<I, O>(requestHandler), bootstrap);
    this.pipelineConfigurator(PipelineConfigurators.<I, O>httpServerConfigurator());
}
项目:karyon    文件:ShutdownListener.java   
/**
 * Creates the TCP server that listens for shutdown commands.
 *
 * <p>The server exchanges string messages and passes each connection to a
 * {@code ShutdownConnectionHandler} built around the given command handler.
 * The server is only created here, not started.
 *
 * @param shutdownPort   port the command server is created for
 * @param commandHandler function given to the connection handler to process commands
 */
public ShutdownListener(int shutdownPort, final Func1<String, Observable<Void>> commandHandler) {
    ShutdownConnectionHandler connectionHandler = new ShutdownConnectionHandler(commandHandler);
    shutdownCmdServer = RxNetty.createTcpServer(
            shutdownPort,
            PipelineConfigurators.stringMessageConfigurator(),
            connectionHandler);
}