@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { final ChannelConfig config = ctx.channel().config(); if (config.isAutoRead()) { // stop accept new connections for 1 second to allow the channel to recover // See https://github.com/netty/netty/issues/1328 config.setAutoRead(false); ctx.channel().eventLoop().schedule(new Runnable() { @Override public void run() { config.setAutoRead(true); } }, 1, TimeUnit.SECONDS); } // still let the exceptionCaught event flow through the pipeline to give the user // a chance to do something with it ctx.fireExceptionCaught(cause); }
@SuppressWarnings("unchecked") @Test public void resume() throws Exception { new MockUnit(ChannelHandlerContext.class, WebSocketServerHandshaker.class, Consumer.class) .expect(unit -> { ChannelConfig chconf = unit.mock(ChannelConfig.class); expect(chconf.isAutoRead()).andReturn(false); expect(chconf.setAutoRead(true)).andReturn(chconf); Channel ch = unit.mock(Channel.class); expect(ch.config()).andReturn(chconf); ChannelHandlerContext ctx = unit.get(ChannelHandlerContext.class); expect(ctx.channel()).andReturn(ch); }) .run(unit -> { new NettyWebSocket( unit.get(ChannelHandlerContext.class), unit.get(WebSocketServerHandshaker.class), unit.get(Consumer.class)).resume(); }); }
@SuppressWarnings("unchecked") @Test public void resumeIgnored() throws Exception { new MockUnit(ChannelHandlerContext.class, WebSocketServerHandshaker.class, Consumer.class) .expect(unit -> { ChannelConfig chconf = unit.mock(ChannelConfig.class); expect(chconf.isAutoRead()).andReturn(true); Channel ch = unit.mock(Channel.class); expect(ch.config()).andReturn(chconf); ChannelHandlerContext ctx = unit.get(ChannelHandlerContext.class); expect(ctx.channel()).andReturn(ch); }) .run(unit -> { new NettyWebSocket( unit.get(ChannelHandlerContext.class), unit.get(WebSocketServerHandshaker.class), unit.get(Consumer.class)).resume(); }); }
@SuppressWarnings("unchecked") @Test public void pause() throws Exception { new MockUnit(ChannelHandlerContext.class, WebSocketServerHandshaker.class, Consumer.class) .expect(unit -> { ChannelConfig chconf = unit.mock(ChannelConfig.class); expect(chconf.isAutoRead()).andReturn(true); expect(chconf.setAutoRead(false)).andReturn(chconf); Channel ch = unit.mock(Channel.class); expect(ch.config()).andReturn(chconf); ChannelHandlerContext ctx = unit.get(ChannelHandlerContext.class); expect(ctx.channel()).andReturn(ch); }) .run(unit -> { new NettyWebSocket( unit.get(ChannelHandlerContext.class), unit.get(WebSocketServerHandshaker.class), unit.get(Consumer.class)).pause(); }); }
@SuppressWarnings("unchecked") @Test public void pauseIgnored() throws Exception { new MockUnit(ChannelHandlerContext.class, WebSocketServerHandshaker.class, Consumer.class) .expect(unit -> { ChannelConfig chconf = unit.mock(ChannelConfig.class); expect(chconf.isAutoRead()).andReturn(false); Channel ch = unit.mock(Channel.class); expect(ch.config()).andReturn(chconf); ChannelHandlerContext ctx = unit.get(ChannelHandlerContext.class); expect(ctx.channel()).andReturn(ch); }) .run(unit -> { new NettyWebSocket( unit.get(ChannelHandlerContext.class), unit.get(WebSocketServerHandshaker.class), unit.get(Consumer.class)).pause(); }); }
@Test public void setSoLingerChannelOption() throws IOException { startServer(); Map<ChannelOption<?>, Object> channelOptions = new HashMap<ChannelOption<?>, Object>(); // set SO_LINGER option int soLinger = 123; channelOptions.put(ChannelOption.SO_LINGER, soLinger); NettyClientTransport transport = new NettyClientTransport( address, NioSocketChannel.class, channelOptions, group, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */, tooManyPingsRunnable, new TransportTracer()); transports.add(transport); callMeMaybe(transport.start(clientTransportListener)); // verify SO_LINGER has been set ChannelConfig config = transport.channel().config(); assertTrue(config instanceof SocketChannelConfig); assertEquals(soLinger, ((SocketChannelConfig) config).getSoLinger()); }
private void setBufferSizeIfConfigIsSocketChannelConfig( ChannelConfig config, long contentLength) { if (config instanceof SocketChannelConfig) { int sendBufferSize = contentLength < m_maxSendBufferSize ? (int) contentLength : m_maxSendBufferSize; ((SocketChannelConfig) config).setSendBufferSize(sendBufferSize); } }
@Override public void run() { ChannelConfig config = ctx.channel().config(); if (!config.isAutoRead() && isHandlerActive(ctx)) { // If AutoRead is False and Active is True, user make a direct setAutoRead(false) // Then Just reset the status if (logger.isDebugEnabled()) { logger.debug("Not unsuspend: " + config.isAutoRead() + ':' + isHandlerActive(ctx)); } ctx.attr(READ_SUSPENDED).set(false); } else { // Anything else allows the handler to reset the AutoRead if (logger.isDebugEnabled()) { if (config.isAutoRead() && !isHandlerActive(ctx)) { logger.debug("Unsuspend: " + config.isAutoRead() + ':' + isHandlerActive(ctx)); } else { logger.debug("Normal unsuspend: " + config.isAutoRead() + ':' + isHandlerActive(ctx)); } } ctx.attr(READ_SUSPENDED).set(false); config.setAutoRead(true); ctx.channel().read(); } if (logger.isDebugEnabled()) { logger.debug("Unsupsend final status => " + config.isAutoRead() + ':' + isHandlerActive(ctx)); } }
private static void runLineBasedFrameDecoder() { TcpServer<String, String> transport = Netty4TcpServer.<String, String>create( 0, new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throws Exception { int bufferSize = 1; ChannelConfig config = channel.config(); config.setOption(ChannelOption.SO_RCVBUF, bufferSize); config.setOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(bufferSize)); channel.pipeline().addFirst( new LineBasedFrameDecoder(256), new StringDecoder(CharsetUtil.UTF_8), new StringEncoder(CharsetUtil.UTF_8)); } }); ReactorTcpServer.create(transport).start(connection -> { connection.log("input") .observeComplete(v -> LOG.info("Connection input complete")) .capacity(1) .consume(line -> { String response = "Hello " + line + "\n"; Streams.wrap(connection.writeWith(Streams.just(response))).consume(); }); return Streams.never(); }); }
private <T extends Channel> T configure(T channel) { ChannelConfig channelConfig = channel.config(); if (connectTimeout != null) { channelConfig.setConnectTimeoutMillis(connectTimeout); } return channel; }
private <T extends Channel> T configure(T channel) { ChannelConfig channelConfig = channel.config(); if (connectTimeout != null) { channelConfig.setConnectTimeoutMillis(connectTimeout); } // START of new readTimeout code if (readTimeout != null) { channel.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler()); } // END of new readTimeout code return channel; }
@Override public void resume() { ChannelConfig config = ctx.channel().config(); if (!config.isAutoRead()) { config.setAutoRead(true); } }
@Override public void pause() { ChannelConfig config = ctx.channel().config(); if (config.isAutoRead()) { config.setAutoRead(false); } }
@Override public ChannelConfig config() { return null; }
@Override public ChannelConfig config() { return config; }
@Override public ChannelConfig config() { // TODO Auto-generated method stub return null; }
@Override protected void doRead() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); boolean closed = false; final int maxMessagesPerRead = config.getMaxMessagesPerRead(); Throwable exception = null; int localRead = 0; int totalRead = 0; try { for (;;) { // Perform a read. localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } // Notify with the received messages and clear the buffer. int size = readBuf.size(); for (int i = 0; i < size; i ++) { pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); // Do not read beyond maxMessagesPerRead. // Do not continue reading if autoRead has been turned off. totalRead += localRead; if (totalRead >= maxMessagesPerRead || !config.isAutoRead()) { break; } } } catch (Throwable t) { exception = t; } pipeline.fireChannelReadComplete(); if (exception != null) { if (exception instanceof IOException) { closed = true; } pipeline().fireExceptionCaught(exception); } if (closed) { if (isOpen()) { unsafe().close(unsafe().voidPromise()); } } else if (localRead == 0 && isActive()) { // If the read amount was 0 and the channel is still active we need to trigger a new read() // as otherwise we will never try to read again and the user will never know. // Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are // able to process the rest of the tasks in the queue first. // // See https://github.com/netty/netty/issues/2404 read(); } }
@Override public void read() { //得到配置信息 final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final int maxMessagesPerRead = config.getMaxMessagesPerRead(); RecvByteBufAllocator.Handle allocHandle = this.allocHandle; if (allocHandle == null) { this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); } if (!config.isAutoRead()) { removeReadOp(); } ByteBuf byteBuf = null; int messages = 0; boolean close = false; try { int byteBufCapacity = allocHandle.guess(); int totalReadAmount = 0; do { byteBuf = allocator.ioBuffer(byteBufCapacity); int writable = byteBuf.writableBytes(); int localReadAmount = doReadBytes(byteBuf); if (localReadAmount <= 0) { // not was read release the buffer byteBuf.release(); close = localReadAmount < 0; break; } //触发fireChannelRead事件 pipeline.fireChannelRead(byteBuf); byteBuf = null; if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { // Avoid overflow. totalReadAmount = Integer.MAX_VALUE; break; } totalReadAmount += localReadAmount; if (localReadAmount < writable) { // Read less than what the buffer can hold, // which might mean we drained the recv buffer completely. break; } } while (++ messages < maxMessagesPerRead); pipeline.fireChannelReadComplete(); allocHandle.record(totalReadAmount); if (close) { closeOnRead(pipeline); close = false; } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close); } }
PeerRegistryListenerImpl(final ChannelConfig channelConfig) { this.channelConfig = channelConfig; this.keys = KeyMapping.getKeyMapping(); }
@Override public void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final int maxMessagesPerRead = config.getMaxMessagesPerRead(); RecvByteBufAllocator.Handle allocHandle = this.allocHandle; if (allocHandle == null) { this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); } if (!config.isAutoRead()) { removeReadOp(); } ByteBuf byteBuf = null; int messages = 0; boolean close = false; try { int byteBufCapacity = allocHandle.guess(); int totalReadAmount = 0; do { byteBuf = allocator.ioBuffer(byteBufCapacity); int writable = byteBuf.writableBytes(); int localReadAmount = doReadBytes(byteBuf); if (localReadAmount <= 0) { // not was read release the buffer byteBuf.release(); close = localReadAmount < 0; break; } pipeline.fireChannelRead(byteBuf); byteBuf = null; if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { // Avoid overflow. totalReadAmount = Integer.MAX_VALUE; break; } totalReadAmount += localReadAmount; if (localReadAmount < writable) { // Read less than what the buffer can hold, // which might mean we drained the recv buffer completely. break; } } while (++ messages < maxMessagesPerRead); pipeline.fireChannelReadComplete(); allocHandle.record(totalReadAmount); if (close) { closeOnRead(pipeline); close = false; } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close); } }
@Override public void handleEvent(ConduitStreamSourceChannel channel) { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final int maxMessagesPerRead = config.getMaxMessagesPerRead(); RecvByteBufAllocator.Handle allocHandle = this.allocHandle; if (allocHandle == null) { this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); } ByteBuf byteBuf = null; int messages = 0; boolean close = false; try { int byteBufCapacity = allocHandle.guess(); int totalReadAmount = 0; do { byteBuf = allocator.ioBuffer(byteBufCapacity); int writable = byteBuf.writableBytes(); int localReadAmount = byteBuf.writeBytes(channel, byteBuf.writableBytes()); if (localReadAmount <= 0) { // not was read release the buffer byteBuf.release(); close = localReadAmount < 0; break; } ((AbstractXnioUnsafe) unsafe()).readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = null; if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { // Avoid overflow. totalReadAmount = Integer.MAX_VALUE; break; } totalReadAmount += localReadAmount; // stop reading if (!config.isAutoRead()) { break; } if (localReadAmount < writable) { // Read less than what the buffer can hold, // which might mean we drained the recv buffer completely. break; } } while (++ messages < maxMessagesPerRead); pipeline.fireChannelReadComplete(); allocHandle.record(totalReadAmount); if (close) { closeOnRead(); close = false; } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!config.isAutoRead() && !((AbstractXnioUnsafe) unsafe()).readPending) { removeReadOp(channel); } } }