@Test public void releaseOnConsumption_ByteBuf() throws Exception { final ByteBuf buf = newPooledBuffer(); StreamMessage<ByteBuf> stream = newStream(ImmutableList.of(buf)); if (stream instanceof StreamWriter) { assertThat(((StreamWriter<ByteBuf>) stream).write(buf)).isTrue(); ((StreamWriter<?>) stream).close(); } assertThat(buf.refCnt()).isEqualTo(1); stream.subscribe(new Subscriber<ByteBuf>() { @Override public void onSubscribe(Subscription subscription) { subscription.request(1); } @Override public void onNext(ByteBuf o) { assertThat(o).isNotSameAs(buf); assertThat(o).isInstanceOf(UnpooledHeapByteBuf.class); assertThat(o.refCnt()).isEqualTo(1); assertThat(buf.refCnt()).isZero(); } @Override public void onError(Throwable throwable) { Exceptions.throwUnsafely(throwable); } @Override public void onComplete() { completed = true; } }); await().untilAsserted(() -> assertThat(completed).isTrue()); }
@Test public void releaseOnConsumption_HttpData() throws Exception { final ByteBufHttpData data = new ByteBufHttpData(newPooledBuffer(), false); StreamMessage<ByteBufHolder> stream = newStream(ImmutableList.of(data)); if (stream instanceof StreamWriter) { assertThat(((StreamWriter<ByteBufHolder>) stream).write(data)).isTrue(); ((StreamWriter<?>) stream).close(); } assertThat(data.refCnt()).isEqualTo(1); stream.subscribe(new Subscriber<ByteBufHolder>() { @Override public void onSubscribe(Subscription subscription) { subscription.request(1); } @Override public void onNext(ByteBufHolder o) { assertThat(o).isNotSameAs(data); assertThat(o).isInstanceOf(ByteBufHttpData.class); assertThat(o.content()).isInstanceOf(UnpooledHeapByteBuf.class); assertThat(o.refCnt()).isEqualTo(1); assertThat(data.refCnt()).isZero(); } @Override public void onError(Throwable throwable) { Exceptions.throwUnsafely(throwable); } @Override public void onComplete() { completed = true; } }); await().untilAsserted(() -> assertThat(completed).isTrue()); }