尽管我的业务逻辑没有问题,但事实证明我没有使用Netty ByteBuf。更新要使用的测试代码后ByteBuf,我遇到了IllegalReferenceCountException的无尽循环。我承认对Netty还是陌生的,但这并不能证明在手动分配和释放资源的日子里回来。创建GC就是为了避免这种混乱。迪斯科,有人吗?那贝尔底呢?
ByteBuf
public class StringDecoder extends AbstractDecoder<String> { private static final IntPredicate NEWLINE_DELIMITER = b -> b == '\n' || b == '\r'; @Override public Flux<String> decode(Publisher<DataBuffer> publisher, ResolvableType elementType, MimeType mimeType, Map<String, Object> hints) { return Flux.from(publisher) .scan(Tuples.<Flux<DataBuffer>, Optional<DataBuffer>>of(Flux.empty(), Optional.empty()), (acc, buffer) -> { List<DataBuffer> results = new ArrayList<>(); int startIdx = 0, endIdx = 0, limit = buffer.readableByteCount(); Optional<DataBuffer> incomplete = acc.getT2(); while (startIdx < limit && endIdx != -1) { endIdx = buffer.indexOf(NEWLINE_DELIMITER, startIdx); int length = (endIdx == -1 ? limit : endIdx) - startIdx; DataBuffer slice = buffer.slice(startIdx, length); DataBuffer tmp = incomplete.map(b -> b.write(slice)) .orElse(buffer.factory().allocateBuffer(length).write(slice)); tmp = DataBufferUtils.retain(tmp); if (endIdx != -1) { startIdx = endIdx + 1; results.add(tmp); incomplete = Optional.empty(); } else { incomplete = Optional.of(tmp); } } releaseBuffer(buffer); return Tuples.of(Flux.fromIterable(results), incomplete); }) .flatMap(t -> { t.getT2().ifPresent(this::releaseBuffer); return t.getT1(); }) .map(buffer -> { // charset resolution should in general use supplied mimeType String s = UTF_8.decode(buffer.asByteBuffer()).toString(); releaseBuffer(buffer); return s; }) .log(); } private void releaseBuffer(DataBuffer buffer) { boolean release = DataBufferUtils.release(buffer); if (release) { System.out.println("Buffer was released."); } } } public class StringDecoderTest { private StringDecoder stringDecoder = new StringDecoder(); DataBufferFactory dataBufferFactory = new NettyDataBufferFactory(UnpooledByteBufAllocator.DEFAULT); @Test public void testDecode() { Flux<DataBuffer> pub = Flux.just("abc\n", "abc", "def\n", "abc", "def\nxyz\n", "abc", "def", "xyz\n") .map(s -> dataBufferFactory.wrap(s.getBytes(UTF_8))); StepVerifier.create(stringDecoder.decode(pub, null, null, null)) .expectNext("abc", "abcdef", "abcdef", "xyz", "abcdefxyz") .verifyComplete(); } }
我不断得到:
[ERROR] (main) onError(io.netty.util.IllegalReferenceCountException: refCnt: 0) [ERROR] (main) - io.netty.util.IllegalReferenceCountException: refCnt: 0 io.netty.util.IllegalReferenceCountException: refCnt: 0 at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1415) at io.netty.buffer.UnpooledHeapByteBuf.nioBuffer(UnpooledHeapByteBuf.java:314) at io.netty.buffer.AbstractUnpooledSlicedByteBuf.nioBuffer(AbstractUnpooledSlicedByteBuf.java:434) at io.netty.buffer.CompositeByteBuf.nioBuffers(CompositeByteBuf.java:1496) at io.netty.buffer.CompositeByteBuf.nioBuffer(CompositeByteBuf.java:1468) at io.netty.buffer.AbstractByteBuf.nioBuffer(AbstractByteBuf.java:1205) at org.springframework.core.io.buffer.NettyDataBuffer.asByteBuffer(NettyDataBuffer.java:234) at org.abhijitsarkar.java.StringDecoder.lambda$decode$4(StringDecoder.java:61)
工作代码:
public class StringDecoder extends AbstractDecoder<String> { private static final IntPredicate NEWLINE_DELIMITER = b -> b == '\n' || b == '\r'; @Override public Flux<String> decode(Publisher<DataBuffer> publisher, ResolvableType elementType, MimeType mimeType, Map<String, Object> hints) { DataBuffer incomplete = new NettyDataBufferFactory(UnpooledByteBufAllocator.DEFAULT).allocateBuffer(0); return Flux.from(publisher) .scan(Tuples.<Flux<DataBuffer>, DataBuffer>of(Flux.empty(), retain(incomplete)), (acc, buffer) -> { List<DataBuffer> results = new ArrayList<>(); int startIdx = 0, endIdx = 0, limit = buffer.readableByteCount(); while (startIdx < limit && endIdx != -1) { endIdx = buffer.indexOf(NEWLINE_DELIMITER, startIdx); int length = (endIdx == -1 ? limit : endIdx) - startIdx; DataBuffer slice = buffer.slice(startIdx, length); byte[] slice1 = new byte[length]; slice.read(slice1, 0, slice1.length); if (endIdx != -1) { byte[] slice2 = new byte[incomplete.readableByteCount()]; incomplete.read(slice2, 0, slice2.length); // call retain to match release during decoding to string later results.add(retain( incomplete.factory().allocateBuffer() .write(slice2) .write(slice1) )); startIdx = endIdx + 1; } else { incomplete.write(slice1); } } return Tuples.of(Flux.fromIterable(results), incomplete); }) .flatMap(Tuple2::getT1) .map(buffer -> { // charset resolution should in general use supplied mimeType String s = UTF_8.decode(buffer.asByteBuffer()).toString(); return s; }) .doOnTerminate(() -> release(incomplete)) .log(); } }
该代码可能更简洁一些,但是适用于Spring bug SPR-16351。