Java 类 io.reactivex.netty.protocol.http.AbstractHttpContentHolder 实例源码

项目:microservices-dashboard-server    文件:NettyServiceCaller.java   
/**
 * Calls the remote service using the provided request, applies error handling and
 * converts the response into a {@link Map}. The request is submitted through a
 * published, auto-connecting (hot) observable.
 *
 * <p>Responses with a status code of 400 or higher are reported as a warning to the
 * error handler and dropped. Errors raised while submitting the request are reported
 * to the error handler. Any error, wherever it originates in the pipeline, finally
 * results in an empty observable instead of an {@code onError} signal.
 *
 * <p>The response body is decoded as UTF-8, the encoding JSON uses for interchange,
 * rather than the JVM's platform default charset.
 *
 * @param serviceId the id of the service for which the request is made
 * @param request   the request which has to be executed using RxNetty
 * @return an {@link Observable} emitting the JSON response as a Map with String keys
 * and Object values; empty when the call failed or the status code was 400 or higher
 */
public Observable<Map<String, Object>> retrieveJsonFromRequest(String serviceId, HttpClientRequest<ByteBuf> request) {
    RxClient.ServerInfo serverInfo = getServerInfoFromRequestOrClient(request, rxClient);
    // A single parser per invocation is enough; there is no need to build one per emitted item.
    JacksonJsonParser jsonParser = new JacksonJsonParser();

    return rxClient.submit(serverInfo, request)
            .publish().autoConnect()
            // Only errors raised upstream of this operator (i.e. by the submit itself) are reported here.
            .doOnError(el -> errorHandler.handleNodeError(serviceId, format("Error retrieving node(s) for url {0} with headers {1}: {2}",
                    request.getUri(), request.getHeaders().entries(), el), el))
            .filter(r -> {
                if (r.getStatus().code() < 400) {
                    return true;
                } else {
                    errorHandler.handleNodeWarning(serviceId, "Exception " + r.getStatus() + " for url " + request.getUri() + " with headers " + r.getHeaders().entries());
                    return false;
                }
            })
            .flatMap(AbstractHttpContentHolder::getContent)
            // Decode explicitly as UTF-8 so the result does not depend on the host's default charset.
            // NOTE(review): every content item is parsed as a standalone JSON document below; a body
            // delivered in several chunks would presumably fail to parse - confirm against RxNetty.
            .map(data -> data.toString(java.nio.charset.StandardCharsets.UTF_8))
            .map(response -> jsonParser.parseMap(response))
            .doOnNext(r -> logger.info("Json retrieved from call: {}", r))
            // NOTE(review): errors raised after doOnError (e.g. JSON parsing failures) are swallowed
            // here without being reported to the error handler.
            .onErrorResumeNext(Observable.empty());
}
项目:mesos-rxjava    文件:TcpSocketProxyTest.java   
/**
 * Verifies that once the proxy has been shut down and closed, new connections to
 * its listen port can no longer fetch content.
 */
@Test
public void testConnectionTerminatedOnClose() throws Exception {
    // Port 0 is passed for the listen side; the actual port is read back from the proxy below.
    final InetSocketAddress listenAddress = new InetSocketAddress("localhost", 0);
    final InetSocketAddress targetAddress = new InetSocketAddress("localhost", server.getServerPort());
    final TcpSocketProxy proxy = new TcpSocketProxy(listenAddress, targetAddress);
    proxy.start();

    final int proxyPort = proxy.getListenPort();
    final HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.createHttpClient("localhost", proxyPort);

    // A request through the running proxy must reach the backing server.
    final String body = httpClient.submit(HttpClientRequest.createGet("/"))
        .flatMap(AbstractHttpContentHolder::getContent)
        .map(content -> content.toString(StandardCharsets.UTF_8))
        .toBlocking()
        .first();

    assertThat(body).isEqualTo("Hello World");
    LOGGER.info("first request done");

    proxy.shutdown();
    if (!proxy.isShutdown()) {
        fail("proxy should have been shutdown");
    }
    proxy.close();

    // After shutdown and close, fetching content via the proxy port has to fail with an IOException.
    final URI proxyUri = URI.create(String.format("http://localhost:%d/", proxyPort));
    try {
        proxyUri.toURL().getContent();
        fail("Shouldn't have been able to get content");
    } catch (IOException expected) {
        // expected: the proxy no longer serves connections
    }
}