@Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { try { //LibraLog.info("decode is bytebuf in is :" + JsonUtil.ObjectToJsonString(in.array())); // 空的buf if (in instanceof EmptyByteBuf || in.readableBytes() < 0) { return; } short length = in.readShort(); if (length != in.readableBytes()) { return; } // 反序列化 LibraMessage message = LibraMessage.decode(in); if (message == null) { return; } out.add(message); } catch (Exception e) { e.printStackTrace(); return; } }
/**
 * Reads a var-int message id from the buffer, asks the registry bound to this pipeline's
 * "connectionHandler" for a matching message instance, lets that message read itself from
 * the buffer and appends it to {@code list}.
 *
 * <p>An {@link EmptyByteBuf} is reported on stdout and ignored. When no message is registered
 * for the id, the id is reported on stdout and the remaining readable bytes are discarded.
 *
 * @param channelHandlerContext context used to look up the connection handler in the pipeline
 * @param byteBuf               the inbound buffer
 * @param list                  receives the decoded message
 * @throws Exception declared by the overridden method
 */
@Override
protected void decode( ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list ) throws Exception {
    if ( byteBuf instanceof EmptyByteBuf ) {
        System.out.println( "Got empty byte buf for " + channelHandlerContext.name() );
        return;
    }

    ConnectionHandler handler = (ConnectionHandler) channelHandlerContext.pipeline().get( "connectionHandler" );
    int id = Message.readVarInt( byteBuf );
    Message decoded = connectionManager.getRegistry( handler ).createMessageFromId( id );

    if ( decoded != null ) {
        decoded.read( byteBuf );
        list.add( decoded );
        return;
    }

    // Unknown id: drop whatever is left in the buffer.
    System.out.println( "Cannot find message id " + id );
    byteBuf.skipBytes( byteBuf.readableBytes() );
}
@Test public void convertContentChunksToRawString_and_convertContentChunksToRawBytes_works_with_EmptyByteBuf_chunks() throws IOException { // given Charset contentCharset = CharsetUtil.UTF_8; String chunk1Content = UUID.randomUUID().toString(); String chunk2Content = UUID.randomUUID().toString(); byte[] chunk1Bytes = chunk1Content.getBytes(contentCharset); byte[] chunk2Bytes = chunk2Content.getBytes(contentCharset); ByteBuf chunk1ByteBuf = Unpooled.copiedBuffer(chunk1Bytes); ByteBuf chunk2ByteBuf = Unpooled.copiedBuffer(chunk2Bytes); Collection<HttpContent> chunkCollection = Arrays.asList( new DefaultHttpContent(chunk1ByteBuf), new DefaultHttpContent(new EmptyByteBuf(ByteBufAllocator.DEFAULT)), new DefaultHttpContent(chunk2ByteBuf), new DefaultHttpContent(new EmptyByteBuf(ByteBufAllocator.DEFAULT)) ); // when String resultString = HttpUtils.convertContentChunksToRawString(contentCharset, chunkCollection); byte[] resultBytes = HttpUtils.convertContentChunksToRawBytes(chunkCollection); // then String expectedResultString = chunk1Content + chunk2Content; assertThat(resultString, is(expectedResultString)); ByteArrayOutputStream baos = new ByteArrayOutputStream(); baos.write(chunk1Bytes); baos.write(chunk2Bytes); assertThat(resultBytes, is(baos.toByteArray())); }
@Test public void convertContentChunksToRawBytes_returns_null_if_total_bytes_is_zero() { // given Collection<HttpContent> chunkCollection = Arrays.asList(new DefaultHttpContent(new EmptyByteBuf(ByteBufAllocator.DEFAULT)), new DefaultHttpContent(new EmptyByteBuf(ByteBufAllocator.DEFAULT))); // when byte[] resultBytes = HttpUtils.convertContentChunksToRawBytes(chunkCollection); // then assertThat(resultBytes, nullValue()); }
/**
 * Hands an inbound message to the {@code ChannelOperations} attached to the channel, or
 * releases it when none is attached.
 *
 * <p>{@code null}, {@code Unpooled.EMPTY_BUFFER} and any {@link EmptyByteBuf} are ignored.
 * When no operations object is attached and debug logging is enabled, the dropped message is
 * logged (including a failed HTTP decoder result, and the content of a {@code ByteBufHolder}
 * rendered with the default charset). Non-fatal errors are forwarded to
 * {@code exceptionCaught} and the message is then released with {@code safeRelease}.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws Exception declared by the overridden method
 */
@Override
final public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    boolean nothingToRead = msg == null || msg == Unpooled.EMPTY_BUFFER || msg instanceof EmptyByteBuf;
    if (nothingToRead) {
        return;
    }

    try {
        ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
        if (ops != null) {
            ops.onInboundNext(ctx, msg);
            return;
        }

        // Nobody is listening: optionally describe the message, then drop it.
        if (log.isDebugEnabled()) {
            String loggingMsg = msg.toString();

            if (msg instanceof HttpResponse) {
                DecoderResult decoderResult = ((HttpResponse) msg).decoderResult();
                if (decoderResult.isFailure()) {
                    log.debug("Decoding failed: " + msg + " : ", decoderResult.cause());
                }
            }

            if (msg instanceof ByteBufHolder) {
                ByteBufHolder holder = (ByteBufHolder) msg;
                loggingMsg = holder.content().toString(Charset.defaultCharset());
            }

            log.debug("{} No ChannelOperation attached. Dropping: {}",
                    ctx.channel().toString(), loggingMsg);
        }
        ReferenceCountUtil.release(msg);
    }
    catch (Throwable err) {
        Exceptions.throwIfFatal(err);
        exceptionCaught(ctx, err);
        ReferenceCountUtil.safeRelease(msg);
    }
}
@Override protected void decode( ChannelHandlerContext channelHandlerContext, ByteBuf buf, List<Object> objects ) throws Exception { if ( buf instanceof EmptyByteBuf ) { // The Channel has disconnected and this is the last message we got. R.I.P. connection return; } byte packetId = buf.readByte(); switch ( packetId ) { case 1: WrappedMCPEPacket wrappedMCPEPacket = new WrappedMCPEPacket(); wrappedMCPEPacket.read( buf ); objects.add( wrappedMCPEPacket ); break; case 2: UpdatePingPacket updatePingPacket = new UpdatePingPacket(); updatePingPacket.read( buf ); objects.add( updatePingPacket ); break; case 3: SendPlayerToServerPacket sendPlayerToServerPacket = new SendPlayerToServerPacket(); sendPlayerToServerPacket.read( buf ); objects.add( sendPlayerToServerPacket ); break; default: break; } }
/**
 * Publishes a message either for the connected client or for the broker itself (Will Messages).
 *
 * <p>The work runs under {@code lock}. A server message is built from the payload and marked
 * durable for QoS &gt; 0. A QoS 2 message whose id is already in the PubRec set is not sent
 * again. Otherwise the message is sent (or routed directly through the post office when
 * {@code internal}) inside a new transaction; a retained message is passed to the retain
 * message manager in that same transaction. Any failure is logged, the transaction is rolled
 * back and the failure is rethrown.
 *
 * @param messageId the packet id of the publish
 * @param topic     the topic the message is published to
 * @param qos       the quality-of-service level of the publish
 * @param payload   the message payload; an {@link EmptyByteBuf} or a zero-capacity buffer
 *                  is passed to the retain message manager as a reset
 * @param retain    whether the message is handed to the retain message manager
 * @param internal  if true the message is sent on behalf of the broker: it is routed through
 *                  the post office instead of the server session and its id is not recorded
 *                  in the PubRec set (the original documentation states this skips
 *                  authorisation and does not return an ack; the ack decision is taken inside
 *                  {@code createMessageAck})
 * @throws Exception if sending, retaining or committing fails
 */
void sendInternal(int messageId, String topic, int qos, ByteBuf payload, boolean retain, boolean internal) throws Exception {
    synchronized (lock) {
        Message serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, topic, retain, qos, payload);
        if (qos > 0) {
            serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES);
        }

        // A QoS 2 publish whose id is already tracked has been handled before.
        if (qos >= 2 && state.getPubRec().contains(messageId)) {
            return;
        }

        if (qos == 2 && !internal) {
            state.getPubRec().add(messageId);
        }

        Transaction tx = session.getServerSession().newTransaction();
        try {
            if (internal) {
                session.getServer().getPostOffice().route(serverMessage, tx, true);
            } else {
                session.getServerSession().send(tx, serverMessage, true, false);
            }

            if (retain) {
                boolean reset = payload instanceof EmptyByteBuf || payload.capacity() == 0;
                session.getRetainMessageManager().handleRetainedMessage(serverMessage, topic, reset, tx);
            }
            tx.commit();
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
            tx.rollback();
            throw t;
        }

        createMessageAck(messageId, qos, internal);
    }
}
@Override protected void decodeLittleEndian( ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> objects ) throws Exception { if (byteBuf instanceof EmptyByteBuf) { //TODO: This is a workaround. Check how to prevent calling decode on channel inactive return; } // Header final RequestBaseMessage requestBaseMessage = BaseMessageDecoder.decode( channelHandlerContext, byteBuf); byteBuf.skipBytes(Ints.BYTES); // Ignore responseTo field in header int requestOpCodeInt = byteBuf.readInt(); RequestOpCode requestOpCode = RequestOpCode.getByOpcode(requestOpCodeInt); if (null == requestOpCode) { LOGGER.warn(INVALID_OPCODE_MESSAGE + requestOpCodeInt); throw new IllegalOperationException(requestOpCodeInt); } // Body MessageDecoder<?> messageDecoder = decoderLocator.getByOpCode(requestOpCode); if (null == messageDecoder) { LOGGER.error(OPERATION_NOT_IMPLEMENTED + requestOpCode); throw new UnsupportedOperationException(OPERATION_NOT_IMPLEMENTED + requestOpCode); } objects.add(messageDecoder.decode(byteBuf, requestBaseMessage)); }
@Test public void closeAfterClientHalfCloseShouldSucceed() throws Exception { ListMultimap<CharSequence, CharSequence> expectedHeaders = ImmutableListMultimap.copyOf(new DefaultHttp2Headers() .status(new AsciiString("200")) .set(new AsciiString("content-type"), new AsciiString("application/grpc")) .set(new AsciiString("grpc-status"), new AsciiString("0"))); // Client half-closes. Listener gets halfClosed() stream().transportState() .inboundDataReceived(new EmptyByteBuf(UnpooledByteBufAllocator.DEFAULT), true); verify(serverListener).halfClosed(); // Server closes. Status sent stream().close(Status.OK, trailers); assertNull("no message expected", listenerMessageQueue.poll()); ArgumentCaptor<SendResponseHeadersCommand> cmdCap = ArgumentCaptor.forClass(SendResponseHeadersCommand.class); verify(writeQueue).enqueue(cmdCap.capture(), eq(true)); SendResponseHeadersCommand cmd = cmdCap.getValue(); assertThat(cmd.stream()).isSameAs(stream.transportState()); assertThat(ImmutableListMultimap.copyOf(cmd.headers())) .containsExactlyEntriesIn(expectedHeaders); assertThat(cmd.endOfStream()).isTrue(); // Sending and receiving complete. Listener gets closed() stream().transportState().complete(); verify(serverListener).closed(Status.OK); assertNull("no message expected", listenerMessageQueue.poll()); }
@Test public void abortStreamAfterClientHalfCloseShouldCallClose() { Status status = Status.INTERNAL.withCause(new Throwable()); // Client half-closes. Listener gets halfClosed() stream().transportState().inboundDataReceived( new EmptyByteBuf(UnpooledByteBufAllocator.DEFAULT), true); verify(serverListener).halfClosed(); // Abort from the transport layer stream().transportState().transportReportStatus(status); verify(serverListener).closed(same(status)); assertNull("no message expected", listenerMessageQueue.poll()); }
/**
 * Walks {@code NettyRequestProxyFilter.filterRequest} through its outcomes: a mocked
 * {@code HttpRequest}, a request the policy does not want cached, a cache miss, and finally
 * two cache hits (first with empty content, then with "Hello" content) which must both return
 * the cached response.
 */
@Test
public void testFilterRequest() throws IOException {
    AppConfiguration configuration = new AppConfiguration(new ConfigLoader(), null);
    configuration.init();

    PolicyManager policyManager = mock(PolicyManager.class);
    NettyRequestProxyFilter filter = new NettyRequestProxyFilter(policyManager, configuration);

    ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
    when(ctx.attr(any(AttributeKey.class))).thenReturn(mock(Attribute.class));

    // A mocked HttpRequest is passed through.
    assertNull(filter.filterRequest(mock(HttpRequest.class), ctx));

    // Policy says caching is not needed.
    DefaultFullHttpRequest request = new DefaultFullHttpRequest(
            HttpVersion.HTTP_1_1, HttpMethod.GET, "http://test.ebay.com/s/");
    when(policyManager.cacheIsNeededFor(any(CacheDecisionObject.class))).thenReturn(false);
    assertNull(filter.filterRequest(request, ctx));

    // Caching is needed, but the cache manager has nothing stubbed for the key.
    when(policyManager.cacheIsNeededFor(any(CacheDecisionObject.class))).thenReturn(true);
    CacheManager cacheManager = mock(CacheManager.class);
    when(policyManager.getCacheManager()).thenReturn(cacheManager);
    assertNull(filter.filterRequest(request, ctx));

    // Cache hit: the cached response is returned.
    FullHttpResponse cachedResponse = mock(FullHttpResponse.class);
    HttpHeaders cachedHeaders = mock(HttpHeaders.class);
    when(cachedResponse.headers()).thenReturn(cachedHeaders);
    when(cachedHeaders.get(any(CharSequence.class))).thenReturn("100");
    when(cacheManager.get(anyString())).thenReturn(cachedResponse);

    Channel channel = mock(Channel.class);
    SocketChannelConfig channelConfig = mock(SocketChannelConfig.class);
    when(channel.config()).thenReturn(channelConfig);
    when(ctx.channel()).thenReturn(channel);

    request.headers().add("h1", "v1");

    // First content() call yields an empty buffer, the following ones yield "Hello".
    when(cachedResponse.content())
            .thenReturn(new EmptyByteBuf(new PooledByteBufAllocator()))
            .thenReturn(Unpooled.copiedBuffer("Hello".getBytes()));

    assertEquals(cachedResponse, filter.filterRequest(request, ctx));
    assertEquals(cachedResponse, filter.filterRequest(request, ctx));
}
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if ((in instanceof EmptyByteBuf) || (in.readableBytes() == 0)) return; OperationContext.useHubContext(); Logger.trace("Decoding Stream Data: " + in.readableBytes()); switch (this.state) { case HEADER: { if (this.headerparser == null) { this.builder = new ObjectBuilder(); this.headerparser = new BufferToCompositeParser(this.builder); } this.headerparser.parseStruct(in); // if not done wait for more bytes if (!this.headerparser.isDone()) return; this.state = State.PAYLOAD_SIZE; // deliberate fall through } case PAYLOAD_SIZE: { if (in.readableBytes() < 4) return; this.size = in.readInt(); this.state = State.PAYLOAD; // deliberate fall through } case PAYLOAD: { // return here, without any state reset, means we need more before we can decide what to do if (in.readableBytes() < this.size) return; // we have enough data to send the message... StreamMessage msg = new StreamMessage(); // add Data only if there are some bytes, otherwise skip buffer allocation if (this.size > 0) { ByteBuf bb = in.readSlice(this.size); bb.retain(); msg.setData(bb); } msg.copyFields((RecordStruct) this.builder.getRoot()); out.add(msg); // set state to start over - ready to process next message this.headerparser = null; this.size = 0; this.state = State.HEADER; } } }