/**
 * Opens a backend WebSocket connection for a newly opened front-end session and
 * registers the handlers that relay front-end messages.
 *
 * <p>Steps, in order: connect to the backend (blocking), build the protocol handler
 * and its handshaker for this session, install the aggregator / compression /
 * protocol / relay handlers on the backend channel, start the handshake, then
 * register text and binary message handlers on the front-end session.
 *
 * <p>Failures are logged, not rethrown. If the backend channel was already connected
 * when the failure happened it is closed so that it is not leaked.
 *
 * @param session the front-end session that has just been opened
 * @param config  the endpoint configuration (not used by this method)
 */
@Override
public void onOpen(Session session, EndpointConfig config) {
    String id = session.getId();
    log.debug("{}: open ws proxy ", id);

    // Kept outside the try so the catch block can release it on failure.
    Channel channel = null;
    try {
        ChannelFuture cf = backend.connect().sync();
        channel = cf.channel();

        WebSocketClientProtocolHandler wscph = makeWsProtocolHandler(session);
        WebSocketClientHandshaker handshaker = wscph.handshaker();
        WsHandler handler = new WsHandler(handshaker, channel, session);

        channel.pipeline().addLast(
                new HttpObjectAggregator(1024 * 4),
                WebSocketClientCompressionHandler.INSTANCE,
                wscph,
                handler);
        handshaker.handshake(channel);

        log.debug("{}: wait messages", id);
        session.addMessageHandler(String.class, handler::onFrontString);
        session.addMessageHandler(ByteBuffer.class, handler::onFrontBytes);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Do not hide the interruption from the calling thread.
            Thread.currentThread().interrupt();
        }
        log.error("{}: can not establish ws connect with backed", id, e);
        if (channel != null) {
            // The backend connection is unusable at this point; release it.
            channel.close();
        }
        // NOTE(review): the front-end session is left open here, as before — confirm
        // whether it should be closed when the backend cannot be reached.
    }
}
/**
 * Populates the pipeline of a newly created channel.
 *
 * <p>Handlers are appended in this order: the handler produced by the connection's
 * SSL context (for the connection's IP and port), an {@code HttpClientCodec}, an
 * {@code HttpObjectAggregator} limited to 8192 bytes, the shared WebSocket client
 * compression handler, and finally a {@code WebSocketHandler} bound to the connection.
 *
 * @param channel the channel being initialised
 * @throws Exception if any handler cannot be created or added
 */
@Override
protected void initChannel(final Channel channel) throws Exception {
    channel.pipeline()
            .addLast(
                    "Bridge|SSLContext",
                    this.webSocketConnection.getSslContext().newHandler(
                            channel.alloc(),
                            this.webSocketConnection.getIp(),
                            this.webSocketConnection.getPort()))
            .addLast("Bridge|HttpClientCodec", new HttpClientCodec())
            .addLast("Bridge|HttpObjectAggregator", new HttpObjectAggregator(8192))
            .addLast("Bridge|WebSocketClientCompressionHandler", WebSocketClientCompressionHandler.INSTANCE)
            .addLast(new WebSocketHandler(this.webSocketConnection));
}
/**
 * Connects to the WebSocket endpoint given by {@code url} and performs the opening
 * handshake, blocking until it completes.
 *
 * <p>A missing scheme defaults to {@code ws}, a missing host to {@code 127.0.0.1},
 * and a missing port to 80 ({@code ws}) or 443 ({@code wss}). If connecting or the
 * handshake fails, the event loop group created by this call is shut down before
 * the failure propagates, so no threads are left behind.
 *
 * @return true if the handshake is done properly; false if the URL scheme is
 *         neither {@code ws} nor {@code wss}.
 * @throws URISyntaxException   if there is an error in the URI syntax.
 * @throws InterruptedException if waiting for the connection or the handshake is interrupted.
 * @throws SSLException         if the client SSL context cannot be built.
 * @throws ProtocolException    declared in the signature; nothing in this method throws it explicitly.
 */
public boolean handhshake() throws InterruptedException, URISyntaxException, SSLException, ProtocolException {
    URI uri = new URI(url);
    String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();

    final boolean ssl = "wss".equalsIgnoreCase(scheme);
    if (!ssl && !"ws".equalsIgnoreCase(scheme)) {
        logger.error("Only WS(S) is supported.");
        return false;
    }

    final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
    // Only ws/wss reach this point, so the default port is fully determined by `ssl`.
    final int port = uri.getPort() != -1 ? uri.getPort() : (ssl ? 443 : 80);

    // NOTE(review): InsecureTrustManagerFactory trusts every server certificate.
    // Acceptable for test clients only — confirm this is never used against production.
    final SslContext sslCtx = ssl
            ? SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build()
            : null;

    group = new NioEventLoopGroup();

    HttpHeaders headers = new DefaultHttpHeaders();
    for (Map.Entry<String, String> entry : customHeaders.entrySet()) {
        headers.add(entry.getKey(), entry.getValue());
    }

    boolean isSuccess = false;
    try {
        // Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.
        // If you change it to V00, ping is not supported and remember to change
        // HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
        handler = new WebSocketClientHandler(
                WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, subProtocol, true, headers),
                latch);

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                ChannelPipeline p = ch.pipeline();
                if (sslCtx != null) {
                    p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
                }
                p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192),
                        WebSocketClientCompressionHandler.INSTANCE, handler);
            }
        });

        // Use the defaulted host: uri.getHost() is null for host-less URLs.
        channel = bootstrap.connect(host, port).sync().channel();
        isSuccess = handler.handshakeFuture().sync().isSuccess();
        logger.info("WebSocket Handshake successful : " + isSuccess);
        return isSuccess;
    } finally {
        if (!isSuccess) {
            // Release the event loop threads (and any half-open channel) on failure.
            group.shutdownGracefully();
        }
    }
}
@Override public void connect() throws InterruptedException{ // Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00. // If you change it to V00, ping is not supported and remember to change // HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline. handler = new WebSocketClientHandler( WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders(),this.getMaxPayload())); //make sure the handler has a refernce to this object. handler.setClient(this); Bootstrap clientBoot = new Bootstrap(); clientBoot.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); SSLEngine sslEngine=null; if(AbstractClient.this.isEncrypted()){ if(sslContext == null){ sslEngine = new SSLFactory().createClientSslCtx(Config.getInstance()).newEngine(ch.alloc(), uri.getHost(),uri.getPort()); }else{ sslEngine = sslContext.newEngine(ch.alloc(),uri.getHost(),uri.getPort()); } sslEngine.setEnabledProtocols(Const.TLS_PROTOCOLS); sslEngine.setUseClientMode(true); p.addLast(new SslHandler(sslEngine)); } p.addLast( new HttpClientCodec()); p.addLast(new HttpObjectAggregator(8192)); if(AbstractClient.this.isCompress()){ p.addLast(WebSocketClientCompressionHandler.INSTANCE); } p.addLast(handler); } }); this.ch = clientBoot.connect(uri.getHost(), uri.getPort()).sync().channel(); handler.handshakeFuture().sync(); }
/**
 * Connects to the WebSocket endpoint given by {@code url} and performs the opening
 * handshake, blocking until it completes.
 *
 * <p>A missing scheme defaults to {@code ws}, a missing host to {@code 127.0.0.1},
 * and a missing port to 80 ({@code ws}) or 443 ({@code wss}). Any failure while
 * connecting or handshaking is logged and reported as {@code false}; in that case
 * the event loop group created by this call is shut down so no threads are left
 * behind. If the failure was an interruption, the thread's interrupt flag is
 * restored before returning.
 *
 * @return true if the handshake is done properly; false if the scheme is not
 *         {@code ws}/{@code wss} or if connecting or handshaking failed.
 * @throws URISyntaxException   if there is an error in the URI syntax.
 * @throws InterruptedException declared in the signature; interruptions during connect or
 *                              handshake are reported through the return value and the interrupt flag.
 * @throws SSLException         if the client SSL context cannot be built.
 */
public boolean handhshake() throws InterruptedException, URISyntaxException, SSLException {
    URI uri = new URI(url);
    String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();

    final boolean ssl = "wss".equalsIgnoreCase(scheme);
    if (!ssl && !"ws".equalsIgnoreCase(scheme)) {
        logger.error("Only WS(S) is supported.");
        return false;
    }

    final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
    // Only ws/wss reach this point, so the default port is fully determined by `ssl`.
    final int port = uri.getPort() != -1 ? uri.getPort() : (ssl ? 443 : 80);

    // NOTE(review): InsecureTrustManagerFactory trusts every server certificate.
    // Acceptable for test clients only — confirm this is never used against production.
    final SslContext sslCtx = ssl
            ? SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build()
            : null;

    group = new NioEventLoopGroup();

    HttpHeaders headers = new DefaultHttpHeaders();
    customHeaders.forEach((name, value) -> headers.add(name, value));

    boolean isDone = false;
    try {
        // Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.
        // If you change it to V00, ping is not supported and remember to change
        // HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
        handler = new WebSocketClientHandler(
                WebSocketClientHandshakerFactory.newHandshaker(
                        uri, WebSocketVersion.V13, subProtocol, true, headers));

        Bootstrap b = new Bootstrap();
        b.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline p = ch.pipeline();
                        if (sslCtx != null) {
                            p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
                        }
                        p.addLast(
                                new HttpClientCodec(),
                                new HttpObjectAggregator(8192),
                                WebSocketClientCompressionHandler.INSTANCE,
                                handler);
                    }
                });

        // Use the defaulted host: uri.getHost() is null for host-less URLs.
        channel = b.connect(host, port).sync().channel();
        isDone = handler.handshakeFuture().sync().isSuccess();
        logger.debug("WebSocket Handshake successful : " + isDone);
        return isDone;
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Keep the interruption visible to the caller even though we return false.
            Thread.currentThread().interrupt();
        }
        logger.error("Handshake unsuccessful : " + e.getMessage(), e);
        return false;
    } finally {
        if (!isDone) {
            // Release the event loop threads (and any half-open channel) on failure.
            group.shutdownGracefully();
        }
    }
}
/**
 * Connects to the WebSocket endpoint given by {@code url} and performs the opening
 * handshake, blocking until it completes.
 *
 * <p>A missing scheme defaults to {@code ws}, a missing host to {@code 127.0.0.1},
 * and a missing port to 80 ({@code ws}) or 443 ({@code wss}). Any failure while
 * connecting or handshaking is logged with its stack trace and rethrown as a
 * {@code ProtocolException} carrying the original exception as its cause; the event
 * loop group created by this call is shut down in that case so no threads are left
 * behind. If the failure was an interruption, the thread's interrupt flag is restored.
 *
 * @return true if the handshake is done properly; false if the URL scheme is
 *         neither {@code ws} nor {@code wss}.
 * @throws URISyntaxException   if there is an error in the URI syntax.
 * @throws InterruptedException declared in the signature; interruptions during connect or
 *                              handshake surface as {@code ProtocolException} with the flag restored.
 * @throws SSLException         if the client SSL context cannot be built.
 * @throws ProtocolException    if connecting to the server or the handshake fails.
 */
public boolean handhshake() throws InterruptedException, URISyntaxException, SSLException, ProtocolException {
    URI uri = new URI(url);
    String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();

    final boolean ssl = "wss".equalsIgnoreCase(scheme);
    if (!ssl && !"ws".equalsIgnoreCase(scheme)) {
        logger.error("Only WS(S) is supported.");
        return false;
    }

    final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
    // Only ws/wss reach this point, so the default port is fully determined by `ssl`.
    final int port = uri.getPort() != -1 ? uri.getPort() : (ssl ? 443 : 80);

    // NOTE(review): InsecureTrustManagerFactory trusts every server certificate.
    // Acceptable for test clients only — confirm this is never used against production.
    final SslContext sslCtx = ssl
            ? SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build()
            : null;

    group = new NioEventLoopGroup();

    HttpHeaders headers = new DefaultHttpHeaders();
    customHeaders.forEach((name, value) -> headers.add(name, value));

    boolean isSuccess = false;
    try {
        // Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.
        // If you change it to V00, ping is not supported and remember to change
        // HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
        handler = new WebSocketClientHandler(
                WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, subProtocol, true, headers),
                latch);

        Bootstrap b = new Bootstrap();
        b.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline p = ch.pipeline();
                        if (sslCtx != null) {
                            p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
                        }
                        p.addLast(
                                new HttpClientCodec(),
                                new HttpObjectAggregator(8192),
                                WebSocketClientCompressionHandler.INSTANCE,
                                handler);
                    }
                });

        // Use the defaulted host: uri.getHost() is null for host-less URLs.
        channel = b.connect(host, port).sync().channel();
        isSuccess = handler.handshakeFuture().sync().isSuccess();
        logger.debug("WebSocket Handshake successful : " + isSuccess);
        return isSuccess;
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Keep the interruption visible to the caller even though it is wrapped below.
            Thread.currentThread().interrupt();
        }
        logger.error("Handshake unsuccessful : " + e.getMessage(), e);
        ProtocolException failure = new ProtocolException("Protocol exception: " + e.getMessage());
        // Attach the root cause so callers can see what actually went wrong.
        failure.initCause(e);
        throw failure;
    } finally {
        if (!isSuccess) {
            // Release the event loop threads (and any half-open channel) on failure.
            group.shutdownGracefully();
        }
    }
}