/**
 * Builds the log text for a channel event.
 *
 * <p>The text always starts with the channel's string form and the event name. A
 * {@link ByteBuf} payload adds its readable byte count and a hex dump; a
 * {@link ByteBufHolder} payload adds the byte count of its content, the holder's
 * string form and a hex dump of the content; anything else adds its string form.
 *
 * @param ctx   context whose channel prefixes the message
 * @param event name of the event being logged
 * @param obj   payload of the event
 * @return the formatted message
 */
private String format(ChannelHandlerContext ctx, String event, Object obj) {
    StringBuilder out = new StringBuilder(ctx.channel().toString());
    out.append(" ").append(event);

    if (obj instanceof ByteBuf) {
        ByteBuf payload = (ByteBuf) obj;
        out.append(" ").append(payload.readableBytes()).append(" bytes\n");
        out.append(ByteBufUtil.prettyHexDump(payload));
        return out.toString();
    }

    if (obj instanceof ByteBufHolder) {
        ByteBuf payload = ((ByteBufHolder) obj).content();
        out.append(" ").append(payload.readableBytes()).append(" bytes\n");
        out.append(String.valueOf(obj)).append("\n");
        out.append(ByteBufUtil.prettyHexDump(payload));
        return out.toString();
    }

    out.append("\n").append(String.valueOf(obj));
    return out.toString();
}
/**
 * Adds the readable byte count of a message to the incoming or outgoing byte counter.
 * Messages that are neither a {@link ByteBuf} nor a {@link ByteBufHolder} are ignored.
 *
 * @param object   the message travelling through the channel
 * @param incoming {@code true} to add to {@code incomingBytes}, {@code false} for
 *                 {@code outgoingBytes}
 */
private void onChannel(Object object, boolean incoming) {
    final ByteBuf payload;
    if (object instanceof ByteBuf) {
        payload = (ByteBuf) object;
    } else if (object instanceof ByteBufHolder) {
        payload = ((ByteBufHolder) object).content();
    } else {
        return;
    }

    if (payload == null) {
        return;
    }

    int count = payload.readableBytes();
    if (!incoming) {
        outgoingBytes.getAndAdd(count);
    } else {
        incomingBytes.getAndAdd(count);
    }
}
/** * Adds the given data to this deframer and attempts delivery to the listener. * * @param data the raw data read from the remote endpoint. Must be non-null. * @param endOfStream if {@code true}, indicates that {@code data} is the end of the stream from * the remote endpoint. End of stream should not be used in the event of a transport * error, such as a stream reset. * @throws IllegalStateException if {@link #close()} has been called previously or if * this method has previously been called with {@code endOfStream=true}. */ public void deframe(HttpData data, boolean endOfStream) { requireNonNull(data, "data"); checkNotClosed(); checkState(!this.endOfStream, "Past end of stream"); startedDeframing = true; if (!data.isEmpty()) { final ByteBuf buf; if (data instanceof ByteBufHolder) { buf = ((ByteBufHolder) data).content(); } else { buf = alloc.buffer(data.length()); buf.writeBytes(data.array(), data.offset(), data.length()); } unprocessed.addComponent(true, buf); } // Indicate that all of the data for this stream has been received. this.endOfStream = endOfStream; deliver(); }
/** * Create a {@link NettyServerRequestAdapter} when a {@link HttpRequest} is received, and use * @ {@link UnicastContentSubject} to send the content as a request stream. */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Class<?> messageClass = msg.getClass(); if (HttpRequest.class.isAssignableFrom(messageClass)) { this.request = new NettyServerRequestAdapter((HttpRequest) msg, this.requestContent); super.channelRead(ctx, this.request); } else if (HttpContent.class.isAssignableFrom(messageClass)) { Assert.notNull(this.request); ByteBuf content = ((ByteBufHolder) msg).content(); ByteBuffer nioBuffer = content.nioBuffer(); this.requestContent.onNext(nioBuffer); if (LastHttpContent.class.isAssignableFrom(messageClass)) { this.requestContent.onCompleted(); } // FIXME I need to make it works without that ... super.channelRead(ctx, this.request); } else { super.channelRead(ctx, msg); } }
/**
 * Verifies that reading a {@link ByteBufHolder} through the {@link LoggingHandler} logs the
 * holder's string form with its byte count, and that the very same instance is passed on.
 */
@Test
public void shouldLogByteBufHolderDataRead() throws Exception {
    ByteBufHolder holder = new DefaultByteBufHolder(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8)) {
        @Override
        public String toString() {
            return "foobar";
        }
    };

    // Expect exactly one log entry describing the received holder.
    appender.doAppend(matchesLog(".+RECEIVED: foobar, 5B$"));
    replay(appender);

    EmbeddedChannel embedded = new EmbeddedChannel(new LoggingHandler());
    embedded.writeInbound(holder);
    verify(appender);

    ByteBufHolder readBack = embedded.readInbound();
    assertThat(holder, is(sameInstance(readBack)));
    readBack.release();

    // Nothing else should be queued on the inbound side.
    assertThat(embedded.readInbound(), is(nullValue()));
}
/**
 * Switches the channel into async mode and runs {@code runnable} on {@code executor}.
 *
 * <p>The request body is retained before the task is handed off and released once the task
 * finishes. If the executor rejects the task, the body is released here so the extra
 * reference is not leaked.
 *
 * @param executor executor that runs the task
 * @param runnable task to run while the request body is kept alive
 * @throws java.util.concurrent.RejectedExecutionException if {@code executor} refuses the task
 */
@Override
public void startAsync(final Executor executor, final Runnable runnable) {
    Channel channel = ctx.channel();
    channel.attr(NEED_FLUSH).set(false);
    channel.attr(ASYNC).set(true);

    final ReferenceCounted body = ((ByteBufHolder) req).content();
    body.retain();

    // Ensures the reference taken above is dropped exactly once, even when a same-thread
    // executor runs the task inline and an exception escapes from execute().
    final java.util.concurrent.atomic.AtomicBoolean released =
            new java.util.concurrent.atomic.AtomicBoolean(false);
    try {
        executor.execute(() -> {
            try {
                runnable.run();
            } finally {
                if (released.compareAndSet(false, true)) {
                    body.release();
                }
            }
        });
    } catch (java.util.concurrent.RejectedExecutionException rejected) {
        // The task will never run, so its finally block cannot release the body.
        if (released.compareAndSet(false, true)) {
            body.release();
        }
        throw rejected;
    }
}
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Class<?> recievedMsgClass = msg.getClass(); if (io.netty.handler.codec.http.HttpResponse.class.isAssignableFrom(recievedMsgClass)) { @SuppressWarnings({"rawtypes", "unchecked"}) HttpClientResponse rxResponse = new HttpClientResponse((io.netty.handler.codec.http.HttpResponse)msg, contentSubject); super.channelRead(ctx, rxResponse); // For FullHttpResponse, this assumes that after this call returns, // someone has subscribed to the content observable, if not the content will be lost. } if (HttpContent.class.isAssignableFrom(recievedMsgClass)) {// This will be executed if the incoming message is a FullHttpResponse or only HttpContent. ByteBuf content = ((ByteBufHolder) msg).content(); if (content.isReadable()) { invokeContentOnNext(content); } if (LastHttpContent.class.isAssignableFrom(recievedMsgClass)) { if (null != requestProcessingObserver) { requestProcessingObserver.onCompleted(); } contentSubject.onCompleted(); } } else if(!io.netty.handler.codec.http.HttpResponse.class.isAssignableFrom(recievedMsgClass)){ invokeContentOnNext(msg); } }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Class<?> recievedMsgClass = msg.getClass(); if (io.netty.handler.codec.http.HttpRequest.class.isAssignableFrom(recievedMsgClass)) { @SuppressWarnings({"rawtypes", "unchecked"}) HttpServerRequest rxRequest = new HttpServerRequest((io.netty.handler.codec.http.HttpRequest) msg, contentSubject); keepAlive = rxRequest.getHeaders().isKeepAlive(); super.channelRead(ctx, rxRequest); // For FullHttpRequest, this assumes that after this call returns, // someone has subscribed to the content observable, if not the content will be lost. } if (HttpContent.class.isAssignableFrom(recievedMsgClass)) {// This will be executed if the incoming message is a FullHttpRequest or only HttpContent. ByteBuf content = ((ByteBufHolder) msg).content(); invokeContentOnNext(content); if (LastHttpContent.class.isAssignableFrom(recievedMsgClass)) { contentSubject.onCompleted(); } } else { invokeContentOnNext(msg); } }
/**
 * Streams chat messages of a Gitter room into the given sink.
 *
 * <p>Each received content chunk whose buffer capacity exceeds 2 is mapped to a
 * {@code Message}, printed to standard output and passed to the sink; errors and
 * completion are propagated to the sink as well.
 *
 * <p>NOTE(review): the bearer token is hard-coded in source; it should presumably come
 * from configuration - confirm and rotate the credential.
 * NOTE(review): the filter checks {@code capacity()}, not {@code readableBytes()} -
 * confirm that this is intended.
 *
 * @param emitter sink receiving the decoded messages
 * @param roomId  identifier of the room whose message stream is opened
 */
private void emmit(FluxSink<Message> emitter, String roomId) {
    String streamUrl = "https://stream.gitter.im/v1/rooms/" + roomId + "/chatMessages";

    HttpClient
            .create()
            .get(streamUrl,
                    (request) -> request.addHeader(
                            "Authorization",
                            "Bearer 3cd4820adf59b6a7116f99d92f68a1b786895ce7"))
            .flatMapMany(HttpClientResponse::receiveContent)
            .map(ByteBufHolder::content)
            .filter(chunk -> chunk.capacity() > 2)
            .map(MessageEncoder::mapToMessage)
            .doOnNext(message -> System.out.println("Log Emit: " + message))
            .subscribe(emitter::next, emitter::error, emitter::complete);
}
/**
 * Hands an inbound message to the {@link ChannelOperations} attached to the channel.
 *
 * <p>Null and empty-buffer messages are ignored. When no operations are attached, the message
 * is (optionally) logged at debug level and released. Any non-fatal error is routed to
 * {@code exceptionCaught} and the message is then safely released.
 */
@Override
public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    boolean nothingToRead = msg == null || msg == Unpooled.EMPTY_BUFFER || msg instanceof EmptyByteBuf;
    if (nothingToRead) {
        return;
    }

    try {
        ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
        if (ops != null) {
            ops.onInboundNext(ctx, msg);
            return;
        }

        if (log.isDebugEnabled()) {
            String text = msg.toString();
            if (msg instanceof HttpResponse) {
                DecoderResult outcome = ((HttpResponse) msg).decoderResult();
                if (outcome.isFailure()) {
                    log.debug("Decoding failed: " + msg + " : ", outcome.cause());
                }
            }
            if (msg instanceof ByteBufHolder) {
                // Prefer the decoded payload over the holder's own string form.
                text = ((ByteBufHolder) msg).content().toString(Charset.defaultCharset());
            }
            log.debug("{} No ChannelOperation attached. Dropping: {}", ctx.channel().toString(), text);
        }
        ReferenceCountUtil.release(msg);
    } catch (Throwable err) {
        Exceptions.throwIfFatal(err);
        exceptionCaught(ctx, err);
        ReferenceCountUtil.safeRelease(msg);
    }
}
/**
 * Reads the next byte from the chunk's content.
 *
 * <p>The byte is returned as an unsigned value in the range {@code 0..255}, so that a data
 * byte of {@code 0xFF} cannot be mistaken for a negative end-of-stream sentinel.
 *
 * @param chunk holder whose content is consumed
 * @return the next byte as an unsigned value, or {@link ChunkedInputStream#EOF} when the
 *         content has no readable bytes
 */
@Override
public int readByte(ByteBufHolder chunk) {
    ByteBuf buf = chunk.content();
    if (buf.readableBytes() == 0) {
        return ChunkedInputStream.EOF;
    }
    // readByte() is signed; use the unsigned variant to keep data distinct from EOF.
    return buf.readUnsignedByte();
}
/**
 * Copies up to {@code len} readable bytes from the chunk's content into {@code arr}.
 *
 * @param chunk holder whose content is consumed
 * @param arr   destination array
 * @param off   index in {@code arr} at which writing starts
 * @param len   maximum number of bytes to copy
 * @return the number of bytes copied, or {@link ChunkedInputStream#EOF} when the content has
 *         no readable bytes
 */
@Override
public int readBytes(ByteBufHolder chunk, byte[] arr, int off, int len) {
    ByteBuf content = chunk.content();
    int available = content.readableBytes();
    if (available == 0) {
        return ChunkedInputStream.EOF;
    }

    int count = len <= available ? len : available;
    content.readBytes(arr, off, count);
    return count;
}
/**
 * Calculates the size of the given message.
 *
 * <p>This implementation supports {@link ByteBuf} and {@link ByteBufHolder}, reporting the
 * readable bytes of the buffer or of the holder's content. Sub-classes may override this.
 *
 * @param msg the message for which the size should be calculated
 * @return the size of the message, or {@code -1} if unknown
 */
protected long calculateSize(Object msg) {
    final ByteBuf payload;
    if (msg instanceof ByteBuf) {
        payload = (ByteBuf) msg;
    } else if (msg instanceof ByteBufHolder) {
        payload = ((ByteBufHolder) msg).content();
    } else {
        return -1;
    }
    return payload.readableBytes();
}
/**
 * Returns the amount a message counts for: the readable bytes of a {@link ByteBuf} or of a
 * {@link ByteBufHolder}'s content, and {@code 1} for any other message.
 *
 * @param msg the message to measure
 * @return the readable byte count, or {@code 1} for non-buffer messages
 */
private static int amount(Object msg) {
    final ByteBuf payload;
    if (msg instanceof ByteBuf) {
        payload = (ByteBuf) msg;
    } else if (msg instanceof ByteBufHolder) {
        payload = ((ByteBufHolder) msg).content();
    } else {
        return 1;
    }
    return payload.readableBytes();
}
/**
 * Formats an event message by delegating to the formatter matching the message type.
 *
 * @param eventName the name of the event
 * @param msg       the message of the event
 * @return the formatted text
 */
protected String formatMessage(String eventName, Object msg) {
    if (msg instanceof ByteBuf) {
        return formatByteBuf(eventName, (ByteBuf) msg);
    }
    if (msg instanceof ByteBufHolder) {
        return formatByteBufHolder(eventName, (ByteBufHolder) msg);
    }
    return formatNonByteBuf(eventName, msg);
}
/**
 * Estimates the size of a message.
 *
 * @param msg the message to measure
 * @return the readable bytes of a {@link ByteBuf} or of a {@link ByteBufHolder}'s content,
 *         {@code 0} for a {@link FileRegion}, and {@code unknownSize} for anything else
 */
@Override
public int size(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buffer = (ByteBuf) msg;
        return buffer.readableBytes();
    }
    if (msg instanceof ByteBufHolder) {
        ByteBufHolder holder = (ByteBufHolder) msg;
        return holder.content().readableBytes();
    }
    return msg instanceof FileRegion ? 0 : unknownSize;
}
/**
 * Returns a retained duplicate of a buffer-like message, or the retained message itself when
 * it is neither a {@link ByteBuf} nor a {@link ByteBufHolder}.
 *
 * @param message the message to duplicate
 * @return the retained duplicate, or the result of {@link ReferenceCountUtil#retain(Object)}
 */
private static Object safeDuplicate(Object message) {
    if (message instanceof ByteBuf) {
        ByteBuf buffer = (ByteBuf) message;
        return buffer.duplicate().retain();
    }
    if (message instanceof ByteBufHolder) {
        ByteBufHolder holder = (ByteBufHolder) message;
        return holder.duplicate().retain();
    }
    return ReferenceCountUtil.retain(message);
}
/**
 * Returns the total number of bytes a message represents.
 *
 * @param msg the message to measure
 * @return the readable bytes of a {@link ByteBuf}, the count of a {@link FileRegion}, the
 *         readable bytes of a {@link ByteBufHolder}'s content, or {@code -1} if unknown
 */
private static long total(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buffer = (ByteBuf) msg;
        return buffer.readableBytes();
    }
    if (msg instanceof FileRegion) {
        FileRegion region = (FileRegion) msg;
        return region.count();
    }
    if (!(msg instanceof ByteBufHolder)) {
        return -1;
    }
    return ((ByteBufHolder) msg).content().readableBytes();
}
/**
 * Feeds the given data into the decoder and returns whatever output the decoder produced.
 *
 * <p>Data backed by a {@link ByteBufHolder} is written using its content buffer; any other
 * data is wrapped, without copying, around its backing array.
 *
 * @param obj the encoded data
 * @return the decoder output wrapped as {@link HttpData}
 */
@Override
public HttpData decode(HttpData obj) {
    final ByteBuf input;
    if (!(obj instanceof ByteBufHolder)) {
        input = Unpooled.wrappedBuffer(obj.array(), obj.offset(), obj.length());
    } else {
        input = ((ByteBufHolder) obj).content();
    }
    decoder.writeInbound(input);
    return HttpData.of(fetchDecoderOutput());
}
@Nullable private Endpoint decodeSrvEndpoint(DnsRecord record) { if (!(record instanceof DnsRawRecord)) { return null; } final ByteBuf recordContent = ((ByteBufHolder) record).content(); recordContent.readShort(); // priority unused int weight = recordContent.readShort(); int port = recordContent.readUnsignedShort(); String target = DefaultDnsRecordDecoder.decodeName(recordContent); // Last character always a '.' target = target.substring(0, target.length() - 1); return Endpoint.of(target, port, weight); }
/**
 * Returns a buffer view of {@code chunkSize} bytes of {@code data} starting at {@code offset}.
 *
 * <p>For data backed by a {@link ByteBufHolder} a retained slice of its content is returned;
 * otherwise the backing array is wrapped without copying.
 *
 * @param data      the data to take the chunk from
 * @param offset    start index of the chunk, passed unchanged to the slice or wrap call
 * @param chunkSize number of bytes in the chunk
 * @return the buffer covering the requested region
 */
private static ByteBuf dataChunk(HttpData data, int offset, int chunkSize) {
    if (!(data instanceof ByteBufHolder)) {
        return Unpooled.wrappedBuffer(data.array(), offset, chunkSize);
    }
    ByteBuf content = ((ByteBufHolder) data).content();
    return content.retainedSlice(offset, chunkSize);
}
/**
 * Converts the given data into a {@link ByteBuf}.
 *
 * <p>Data backed by a {@link ByteBufHolder} yields its content buffer directly; any other
 * data is copied into a direct buffer, sized exactly to the data, from the context's allocator.
 *
 * @param ctx  context providing the allocator
 * @param data the data to convert
 * @return the content buffer or a freshly filled direct buffer
 */
protected static ByteBuf toByteBuf(ChannelHandlerContext ctx, HttpData data) {
    if (!(data instanceof ByteBufHolder)) {
        final int length = data.length();
        final ByteBuf copy = ctx.alloc().directBuffer(length, length);
        copy.writeBytes(data.array(), data.offset(), length);
        return copy;
    }
    return ((ByteBufHolder) data).content();
}
/**
 * Converts the given object to an unpooled copy and releases the given object.
 *
 * <p>Only {@link ByteBufHolder} and {@link ByteBuf} instances are copied; any other object is
 * returned unchanged. The holder check takes precedence over the buffer check.
 *
 * @param o the object to convert
 * @return the unpooled copy, or {@code o} itself when it is not buffer-like
 */
public static <T> T toUnpooled(T o) {
    if (o instanceof ByteBufHolder) {
        return copyAndRelease((ByteBufHolder) o);
    }
    if (o instanceof ByteBuf) {
        return copyAndRelease((ByteBuf) o);
    }
    return o;
}
/**
 * Creates a copy of the holder whose content is an unpooled buffer wrapping the bytes of the
 * original content, and releases the original holder afterwards, even if copying fails.
 *
 * @param o the holder to copy and release
 * @return the holder returned by {@code replace} for the unpooled content
 */
private static <T> T copyAndRelease(ByteBufHolder o) {
    try {
        final byte[] bytes = ByteBufUtil.getBytes(o.content());
        final ByteBuf unpooled = Unpooled.wrappedBuffer(bytes);
        @SuppressWarnings("unchecked")
        final T replaced = (T) o.replace(unpooled);
        return replaced;
    } finally {
        ReferenceCountUtil.safeRelease(o);
    }
}
/**
 * Serves the request through the delegate and re-publishes its response through a new
 * streaming response.
 *
 * <p>Objects backed by a {@link ByteBufHolder} are re-wrapped with {@code HttpData.of} and then
 * released; all other objects are forwarded unchanged. Errors and completion are propagated
 * by closing the returned response.
 */
@Override
public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exception {
    HttpResponse res = delegate().serve(ctx, req);
    HttpResponseWriter decorated = HttpResponse.streaming();

    Subscriber<HttpObject> forwarder = new Subscriber<HttpObject>() {
        @Override
        public void onSubscribe(Subscription s) {
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(HttpObject httpObject) {
            if (!(httpObject instanceof ByteBufHolder)) {
                decorated.write(httpObject);
                return;
            }
            try {
                decorated.write(HttpData.of(((ByteBufHolder) httpObject).content()));
            } finally {
                ReferenceCountUtil.safeRelease(httpObject);
            }
        }

        @Override
        public void onError(Throwable t) {
            decorated.close(t);
        }

        @Override
        public void onComplete() {
            decorated.close();
        }
    };
    res.subscribe(forwarder, true);
    return decorated;
}
/**
 * Verifies that a pooled {@link ByteBufHttpData} published through the stream reaches the
 * subscriber as a different, heap-backed instance and that the original is released.
 */
@Test
public void releaseOnConsumption_HttpData() throws Exception {
    final ByteBufHttpData data = new ByteBufHttpData(newPooledBuffer(), false);
    StreamMessage<ByteBufHolder> stream = newStream(ImmutableList.of(data));

    // Writable streams start empty, so the data has to be written explicitly.
    if (stream instanceof StreamWriter) {
        StreamWriter<ByteBufHolder> writer = (StreamWriter<ByteBufHolder>) stream;
        assertThat(writer.write(data)).isTrue();
        writer.close();
    }
    assertThat(data.refCnt()).isEqualTo(1);

    stream.subscribe(new Subscriber<ByteBufHolder>() {
        @Override
        public void onSubscribe(Subscription subscription) {
            subscription.request(1);
        }

        @Override
        public void onNext(ByteBufHolder received) {
            assertThat(received).isNotSameAs(data);
            assertThat(received).isInstanceOf(ByteBufHttpData.class);
            assertThat(received.content()).isInstanceOf(UnpooledHeapByteBuf.class);
            assertThat(received.refCnt()).isEqualTo(1);
            assertThat(data.refCnt()).isZero();
        }

        @Override
        public void onError(Throwable throwable) {
            Exceptions.throwUnsafely(throwable);
        }

        @Override
        public void onComplete() {
            completed = true;
        }
    });

    await().untilAsserted(() -> assertThat(completed).isTrue());
}
/**
 * Formats a log line for a channel event.
 *
 * <p>The line always carries the short channel id, the event name and the payload text. For
 * every event except {@code EVENT_REGISTERED} and {@code EVENT_CONNECT} it additionally
 * carries the correlated source id and the socket information.
 *
 * @param ctx       context of the channel the event belongs to
 * @param eventName name of the event
 * @param msg       payload of the event
 * @return the formatted log line
 */
@Override
protected String format(ChannelHandlerContext ctx, String eventName, Object msg) {
    String channelId = ctx.channel().id().asShortText();
    String socketInfo = buildSocketInfo(ctx.channel().localAddress(), ctx.channel().remoteAddress());

    String msgStr;
    try {
        if (msg instanceof ByteBuf) {
            msgStr = formatPayload((ByteBuf) msg);
        } else if (msg instanceof ByteBufHolder) {
            msgStr = formatPayload((ByteBufHolder) msg);
        } else {
            msgStr = String.valueOf(msg);
        }
    } catch (CharacterCodingException e) {
        msgStr = "<< Payload could not be decoded >>";
    }

    // Pre-size the builder for the longest variant of the line.
    int capacity = 7 + channelId.length() + 14 + correlatedSourceId.length() + socketInfo.length()
            + 2 + eventName.length() + 2 + msgStr.length();
    StringBuilder line = new StringBuilder(capacity);

    line.append("[id: 0x").append(channelId);
    boolean shortForm = EVENT_REGISTERED.equals(eventName) || EVENT_CONNECT.equals(eventName);
    if (!shortForm) {
        line.append(", corSrcId: ").append(correlatedSourceId).append(socketInfo);
    }
    line.append("] ").append(eventName).append(": ").append(msgStr);
    return line.toString();
}
/**
 * Formats an event and returns the formatted message, delegating to the formatter that
 * matches the argument's type.
 *
 * @param ctx       the context of the channel the event belongs to
 * @param eventName the name of the event
 * @param arg       the argument of the event
 * @return the formatted message
 */
protected String format(ChannelHandlerContext ctx, String eventName, Object arg) {
    if (arg instanceof ByteBuf) {
        return formatByteBuf(ctx, eventName, (ByteBuf) arg);
    }
    if (arg instanceof ByteBufHolder) {
        return formatByteBufHolder(ctx, eventName, (ByteBufHolder) arg);
    }
    return formatSimple(ctx, eventName, arg);
}
/**
 * Records response content as it passes through and stores the complete response once the
 * last chunk has been seen.
 *
 * <p>The content of every {@link ByteBufHolder} is decoded as UTF-8 and appended to the buffer.
 * When the object is also the last chunk, the buffered text is stored with the status and
 * headers taken from the object if it is an {@link HttpMessage}; otherwise status {@code -1}
 * and empty headers are used.
 *
 * @param httpObject the response object passing through
 * @return the same {@code httpObject}, unmodified
 */
@Override
public HttpObject responsePost(HttpObject httpObject) {
    final boolean carriesContent = httpObject instanceof ByteBufHolder;

    if (carriesContent) {
        log.trace("Recording content on path {}", originalRequest.getUri());
        ByteBufHolder holder = (ByteBufHolder) httpObject;
        buffer.append(holder.content().toString(Charset.forName("UTF-8")));
    }

    if (ProxyUtils.isLastChunk(httpObject) && carriesContent) {
        Headers headers = new Headers(new HashMap<>());
        int status = -1;
        if (httpObject instanceof HttpMessage) {
            if (httpObject instanceof HttpResponse) {
                status = ((HttpResponse) httpObject).getStatus().code();
            }
            headers = NettyHttpHeadersUtil.convert(((HttpMessage) httpObject).headers());
        }

        ServerResponseMeta meta = new ServerResponseMeta(
                status,
                findRequestMethodFromString(originalRequest.getMethod().toString()),
                originalRequest.getUri(),
                headers);
        storage.store(new ServerResponse(buffer.toString(), meta));
    }

    return httpObject;
}
/**
 * Calculates the size of the given message.
 *
 * <p>This implementation supports {@link ByteBuf} and {@link ByteBufHolder}. Sub-classes may
 * override this.
 *
 * @param msg the message for which the size should be calculated
 * @return the size of the message, or {@code -1} if unknown
 */
protected long calculateSize(Object msg) {
    if (msg instanceof ByteBufHolder) {
        ByteBufHolder holder = (ByteBufHolder) msg;
        // A plain buffer is checked first below only when the message is not a holder;
        // the two types are measured the same way, via readable bytes.
        if (!(msg instanceof ByteBuf)) {
            return holder.content().readableBytes();
        }
    }
    if (msg instanceof ByteBuf) {
        return ((ByteBuf) msg).readableBytes();
    }
    return -1;
}