Java 类io.reactivex.netty.pipeline.PipelineConfiguratorComposite 实例源码

项目:wildfly-swarm    文件:SecuredTransportFactory.java   
@Override
public HttpClient<ByteBuf, ByteBuf> newHttpClient(final IClientConfig config) {
    final List<ExecutionListener<HttpClientRequest<ByteBuf>, HttpClientResponse<ByteBuf>>> listeners = new ArrayList<>();
    listeners.add(createBearerHeaderAdder());
    final PipelineConfiguratorComposite<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>> pipelineConfigurator = new PipelineConfiguratorComposite<HttpClientResponse<ByteBuf>,
            HttpClientRequest<ByteBuf>>(new HttpClientPipelineConfigurator<ByteBuf, ByteBuf>(),
                                        new HttpObjectAggregationConfigurator(maxChunkSize));
    final LoadBalancingHttpClient<ByteBuf, ByteBuf> client = LoadBalancingHttpClient.<ByteBuf, ByteBuf>builder()
            .withClientConfig(config)
            .withExecutorListeners(listeners)
            .withRetryHandler(getDefaultHttpRetryHandlerWithConfig(config))
            .withPipelineConfigurator(pipelineConfigurator)
            .withPoolCleanerScheduler(RibbonTransport.poolCleanerScheduler)
            .build();

    return client;
}
项目:ARCHIVE-wildfly-swarm    文件:SecuredTransportFactory.java   
@Override
public HttpClient<ByteBuf, ByteBuf> newHttpClient(final IClientConfig config) {
    final List<ExecutionListener<HttpClientRequest<ByteBuf>, HttpClientResponse<ByteBuf>>> listeners = new ArrayList<>();
    listeners.add(createBearerHeaderAdder());
    final PipelineConfiguratorComposite<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>> pipelineConfigurator = new PipelineConfiguratorComposite<HttpClientResponse<ByteBuf>, 
            HttpClientRequest<ByteBuf>>(new HttpClientPipelineConfigurator<ByteBuf, ByteBuf>(),
            new HttpObjectAggregationConfigurator(maxChunkSize));
    final LoadBalancingHttpClient<ByteBuf, ByteBuf> client = LoadBalancingHttpClient.<ByteBuf, ByteBuf>builder()
            .withClientConfig(config)
            .withExecutorListeners(listeners)
            .withRetryHandler(getDefaultHttpRetryHandlerWithConfig(config))
            .withPipelineConfigurator(pipelineConfigurator)
            .withPoolCleanerScheduler(RibbonTransport.poolCleanerScheduler)
            .build();

    return client;
}
项目:RxNetty    文件:RxClientImpl.java   
public RxClientImpl(ServerInfo serverInfo, Bootstrap clientBootstrap,
        PipelineConfigurator<O, I> pipelineConfigurator, ClientConfig clientConfig) {
    if (null == clientBootstrap) {
        throw new NullPointerException("Client bootstrap can not be null.");
    }
    if (null == serverInfo) {
        throw new NullPointerException("Server info can not be null.");
    }
    if (null == clientConfig) {
        throw new NullPointerException("Client config can not be null.");
    }
    this.clientConfig = clientConfig;
    this.serverInfo = serverInfo;
    this.clientBootstrap = clientBootstrap;
    if (clientConfig.isReadTimeoutSet()) {
        ReadTimeoutPipelineConfigurator readTimeoutConfigurator =
                new ReadTimeoutPipelineConfigurator(clientConfig.getReadTimeoutInMillis(), TimeUnit.MILLISECONDS);
        if (null != pipelineConfigurator) {
            pipelineConfigurator = new PipelineConfiguratorComposite<O, I>(pipelineConfigurator,
                                                                           readTimeoutConfigurator);
        } else {
            pipelineConfigurator = new PipelineConfiguratorComposite<O, I>(readTimeoutConfigurator);
        }
    }
    incompleteConfigurator = pipelineConfigurator;
}
项目:RxNetty    文件:RxServer.java   
public RxServer(ServerBootstrap bootstrap, int port, final PipelineConfigurator<I, O> pipelineConfigurator,
                final ConnectionHandler<I, O> connectionHandler) {
    if (null == bootstrap) {
        throw new NullPointerException("Bootstrap can not be null.");
    }
    this.bootstrap = bootstrap;
    this.port = port;
    this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            RxRequiredConfigurator<I, O> requiredConfigurator = new RxRequiredConfigurator<I, O>(connectionHandler,
                                                                                                 errorHandler);
            PipelineConfigurator<I, O> configurator;
            if (null == pipelineConfigurator) {
                configurator = requiredConfigurator;
            } else {
                configurator = new PipelineConfiguratorComposite<I, O>(pipelineConfigurator, requiredConfigurator);
            }
            configurator.configureNewPipeline(ch.pipeline());
        }
    });

    serverStateRef = new AtomicReference<ServerState>(ServerState.Created);
}
项目:RxNetty    文件:RemoteObservable.java   
private static <T> void serveMany(int port, final Observable<List<Observable<T>>> observable, final Encoder<T> encoder,
            boolean startAndWait, SlottingStrategy<T> slottingStrategy, IngressPolicy ingressPolicy){
        RxServer<RemoteRxEvent, RemoteRxEvent> server 
            = RxNetty.createTcpServer(port, new PipelineConfiguratorComposite<RemoteRxEvent, RemoteRxEvent>(
                new PipelineConfigurator<RemoteRxEvent, RemoteRxEvent>(){
                    @Override
                    public void configureNewPipeline(ChannelPipeline pipeline) {
//                      pipeline.addFirst(new LoggingHandler(LogLevel.ERROR)); // uncomment to enable debug logging 
                        pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); // 4 bytes to encode length
                        pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(524288, 0, 4, 0, 4)); // max frame = half MB
                    }
                }, new RxEventPipelineConfigurator()),  
                new RemoteObservableConnectionHandler<T>(observable, encoder, slottingStrategy, ingressPolicy));
        if(startAndWait){
            server.startAndWait();
        }else{
            server.start();
        }
    }
项目:RxNetty    文件:HttpClientImpl.java   
@Override
protected PipelineConfigurator<HttpClientRequest<I>, HttpClientResponse<O>> getPipelineConfiguratorForAChannel(ClientConnectionHandler clientConnectionHandler,
                                                                                                   PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator) {
    PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> configurator =
            new PipelineConfiguratorComposite<HttpClientResponse<O>, HttpClientRequest<I>>(pipelineConfigurator,
                                                                               new ClientRequiredConfigurator<I, O>());
    return super.getPipelineConfiguratorForAChannel(clientConnectionHandler, configurator);
}
项目:RxNetty    文件:RxClientImpl.java   
protected PipelineConfigurator<I, O> getPipelineConfiguratorForAChannel(ClientConnectionHandler clientConnectionHandler,
        PipelineConfigurator<O, I> pipelineConfigurator) {
    RxRequiredConfigurator<O, I> requiredConfigurator = new RxRequiredConfigurator<O, I>(clientConnectionHandler);
    PipelineConfiguratorComposite<I, O> toReturn;
    if (null != pipelineConfigurator) {
        toReturn = new PipelineConfiguratorComposite<I, O>(pipelineConfigurator, requiredConfigurator);
    } else {
        toReturn = new PipelineConfiguratorComposite<I, O>(requiredConfigurator);
    }
    return toReturn;
}
项目:RxNetty    文件:RemoteObservable.java   
private static <T> Observable<T> createTcpConnectionToServer(String host, int port, final Decoder<T> decoder, 
            final RemoteUnsubscribe remoteUnsubscribe){
        return RxNetty.createTcpClient(host, port, new PipelineConfiguratorComposite<RemoteRxEvent, RemoteRxEvent>(
                new PipelineConfigurator<RemoteRxEvent, RemoteRxEvent>(){
                    @Override
                    public void configureNewPipeline(ChannelPipeline pipeline) {
//                      pipeline.addFirst(new LoggingHandler(LogLevel.ERROR)); // uncomment to enable debug logging             
                        pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); // 4 bytes to encode length
                        pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(524288, 0, 4, 0, 4)); // max frame = half MB
                    }
                }, new RxEventPipelineConfigurator()))
            .connect().flatMap(new Func1<ObservableConnection<RemoteRxEvent, RemoteRxEvent>, Observable<RemoteRxEvent>>(){
            @Override
            public Observable<RemoteRxEvent> call(final ObservableConnection<RemoteRxEvent, RemoteRxEvent> connection) {
                connection.writeAndFlush(RemoteRxEvent.subscribed()); // send subscribe event to server
                remoteUnsubscribe.setConnection(connection);
                return connection.getInput();
            }
        })
        // data received form server
        .map(new Func1<RemoteRxEvent,Notification<T>>(){
            @Override
            public Notification<T> call(RemoteRxEvent rxEvent) {
                if (rxEvent.getType() == RemoteRxEvent.Type.next){
                    return Notification.createOnNext(decoder.decode(rxEvent.getData()));
                }else if (rxEvent.getType() == RemoteRxEvent.Type.error){
                    return Notification.createOnError(fromBytesToThrowable(rxEvent.getData()));
                }else if (rxEvent.getType() == RemoteRxEvent.Type.completed){
                    return Notification.createOnCompleted();
                }else{
                    throw new RuntimeException("RemoteRxEvent of type:"+rxEvent.getType()+", not supported.");
                }
            }
        })
        .<T>dematerialize();
    }
项目:RxNetty    文件:HttpServer.java   
private static <I, O> PipelineConfigurator<HttpServerRequest<I>, HttpServerResponse<O>> addRequiredConfigurator(
        PipelineConfigurator<HttpServerRequest<I>, HttpServerResponse<O>> pipelineConfigurator) {
    return new PipelineConfiguratorComposite<HttpServerRequest<I>, HttpServerResponse<O>>(pipelineConfigurator,
                                                                              new ServerRequiredConfigurator<I, O>());
}