public static void setIn(Exchange exchange, Object payload) { if (payload instanceof DefaultExchangeHolder) { DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) payload); } else if (payload instanceof AddressedEnvelope) { @SuppressWarnings("unchecked") AddressedEnvelope<Object, InetSocketAddress> dp = (AddressedEnvelope<Object, InetSocketAddress>)payload; // need to check if the content is ExchangeHolder if (dp.content() instanceof DefaultExchangeHolder) { DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) dp.content()); } else { // need to take out the payload here exchange.getIn().setBody(dp.content()); } // setup the sender address here for sending the response message back exchange.setProperty(NettyConstants.NETTY_REMOTE_ADDRESS, dp.sender()); // setup the remote address to the message header at the same time exchange.getIn().setHeader(NettyConstants.NETTY_REMOTE_ADDRESS, dp.sender()); } else { // normal transfer using the body only exchange.getIn().setBody(payload); } }
public static void setOut(Exchange exchange, Object payload) { if (payload instanceof DefaultExchangeHolder) { DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) payload); } else if (payload instanceof AddressedEnvelope) { @SuppressWarnings("unchecked") AddressedEnvelope<Object, InetSocketAddress> dp = (AddressedEnvelope<Object, InetSocketAddress>)payload; // need to check if the content is ExchangeHolder if (dp.content() instanceof DefaultExchangeHolder) { DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) dp.content()); } else { // need to take out the payload here exchange.getOut().setBody(dp.content()); } // setup the sender address here for sending the response message back exchange.setProperty(NettyConstants.NETTY_REMOTE_ADDRESS, dp.sender()); } else { // normal transfer using the body only and preserve the headers exchange.getOut().setHeaders(exchange.getIn().getHeaders()); exchange.getOut().setBody(payload); } }
/**
 * Sends an envelope holding {@code VALUE} as a {@link ByteBuf} through the "udp" byte-array
 * decoder and checks that exactly one envelope comes out, with the same recipient port and the
 * content converted to a {@code byte[]}.
 */
@Test
public void testDecoder() {
    ByteBuf buf = Unpooled.buffer();
    buf.writeBytes(VALUE.getBytes());
    ByteBuf input = buf.duplicate();
    AddressedEnvelope<Object, InetSocketAddress> addressedEnvelop =
        new DefaultAddressedEnvelope<Object, InetSocketAddress>(input, new InetSocketAddress(8888));
    EmbeddedChannel channel = new EmbeddedChannel(ChannelHandlerFactories.newByteArrayDecoder("udp").newChannelHandler());
    Assert.assertTrue(channel.writeInbound(addressedEnvelop));
    Assert.assertTrue(channel.finish());

    // parameterized cast instead of a raw type; the handler under test emits this envelope type
    @SuppressWarnings("unchecked")
    AddressedEnvelope<Object, InetSocketAddress> result =
        (AddressedEnvelope<Object, InetSocketAddress>) channel.readInbound();

    // JUnit's contract is assertEquals(expected, actual): the input envelope is the expectation
    Assert.assertEquals(addressedEnvelop.recipient().getPort(), result.recipient().getPort());
    Assert.assertTrue(result.content() instanceof byte[]);
    Assert.assertEquals(VALUE, new String((byte[]) result.content()));
    // nothing else must have been produced
    Assert.assertNull(channel.readInbound());
}
/**
 * Accepts only messages this channel can write: a {@link DatagramPacket}, a {@link ByteBuf}, or
 * an {@link AddressedEnvelope} whose content is a {@link ByteBuf}.
 *
 * @param msg the outbound message
 * @return {@code msg} unchanged when it is of a supported type
 * @throws UnsupportedOperationException for any other message type
 */
@Override
protected Object filterOutboundMessage(Object msg) {
    boolean supported = msg instanceof DatagramPacket || msg instanceof ByteBuf;

    if (!supported && msg instanceof AddressedEnvelope) {
        @SuppressWarnings("unchecked")
        AddressedEnvelope<Object, SocketAddress> envelope = (AddressedEnvelope<Object, SocketAddress>) msg;
        supported = envelope.content() instanceof ByteBuf;
    }

    if (supported) {
        return msg;
    }
    throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
/**
 * Issues an SRV query for {@code hostname} and, once a response has been decoded, replaces the
 * current endpoints if the resolved list differs from them. A failed query is only logged.
 */
@VisibleForTesting
void query() {
    final DnsQuestion question = new DefaultDnsQuestion(hostname, DnsRecordType.SRV);
    final CompletableFuture<List<Endpoint>> promise = new CompletableFuture<>();

    resolver.query(question).addListener(
            (Future<AddressedEnvelope<DnsResponse, InetSocketAddress>> future) -> {
                final Throwable cause = future.cause();
                if (cause == null) {
                    // the query completed: decode the answer into the promise
                    onResponse(question, future.getNow(), promise);
                } else {
                    logger.warn("Error resolving a domain name: {}", hostname, cause);
                }
            });

    promise.thenAccept(newEndpoints -> {
        // only publish when something actually changed
        if (!endpoints().equals(newEndpoints)) {
            setEndpoints(newEndpoints);
        }
    });
}
/**
 * Handles a DNS response: a {@code NOERROR} response is handed to
 * {@code decodeResponse}; any other response code is logged and completes the promise with an
 * empty list. The envelope is always released afterwards.
 *
 * @param question the question that was asked
 * @param envelope the response envelope; released by this method
 * @param promise  the promise to complete with the resolved endpoints
 */
private void onResponse(
        DnsQuestion question,
        AddressedEnvelope<DnsResponse, InetSocketAddress> envelope,
        CompletableFuture<List<Endpoint>> promise) {
    try {
        final DnsResponseCode responseCode = envelope.content().code();
        if (responseCode == DnsResponseCode.NOERROR) {
            decodeResponse(question, envelope, promise);
        } else {
            if (responseCode == DnsResponseCode.NXDOMAIN) {
                logger.warn("No records found for hostname: {}. Is it registered in DNS?", hostname);
            } else {
                logger.warn(
                        "Name lookup failed on configured name server for hostname: {} - querying other "
                        + "name servers is not supported.", hostname);
            }
            promise.complete(ImmutableList.of());
        }
    } finally {
        ReferenceCountUtil.safeRelease(envelope);
    }
}
/**
 * Decodes a {@link DatagramPacket} into a {@link DataPacket} and forwards it wrapped in an
 * {@link AddressedEnvelope} that keeps the datagram's recipient and sender. A datagram that
 * cannot be decoded is logged at debug level and dropped.
 *
 * @param ctx The context of the ChannelHandler
 * @param msg the datagram which should be decoded
 * @param out a list where all decoded messages are written to
 */
@Override
protected void decode(ChannelHandlerContext ctx, DatagramPacket msg, List<Object> out) throws Exception {
    final ByteBuf payload = msg.content();
    final SocketAddress from = msg.sender();
    final SocketAddress to = msg.recipient();

    try {
        out.add(new DefaultAddressedEnvelope<DataPacket, SocketAddress>(DataPacket.decode(payload), to, from));
    } catch (Exception e) {
        LOG.debug("Failed to decode RTP packet.", e);
    }
}
/** * Encodes a {@link CompoundControlPacket} wrapped into an {@link AddressedEnvelope} to a {@link ByteBuf} also wrapped * into an {@link AddressedEnvelope}. * * @param ctx The context of the ChannelHandler * @param msg the message which should be encoded * @param out a list where all messages are written to */ @Override protected void encode(ChannelHandlerContext ctx, AddressedEnvelope<CompoundControlPacket, SocketAddress> msg, List<Object> out) throws Exception { // encode CompountControlPacket here and forward destination (recipient) of the packet final CompoundControlPacket compoundControlPacket = msg.content(); final List<ControlPacket> packets = compoundControlPacket.getControlPackets(); ByteBuf compoundBuffer = Unpooled.EMPTY_BUFFER; if(!packets.isEmpty()) { final ByteBuf[] buffers = new ByteBuf[packets.size()]; for (int i = 0; i < buffers.length; i++) { buffers[i] = packets.get(i).encode(); } compoundBuffer = Unpooled.wrappedBuffer(buffers); } AddressedEnvelope<ByteBuf, SocketAddress> newMsg = new DefaultAddressedEnvelope<>(compoundBuffer, msg.recipient(), ctx.channel().localAddress()); out.add(newMsg); }
/** * Encodes a {@link DataPacket} wrapped into an {@link AddressedEnvelope} in a {@link ByteBuf} also wrapped into an * {@link AddressedEnvelope}. If the {@link DataPacket}'s content is not empty it is added, otherwise an empty ByteBuf * is added to the AddressedEnvelope. * * @param ctx The context of the ChannelHandler * @param msg the message which should be encoded * @param out a list where all messages are written to */ @Override protected void encode(ChannelHandlerContext ctx, AddressedEnvelope<DataPacket, SocketAddress> msg, List<Object> out) throws Exception { // encode CompountControlPacket here and forward destination (recipient) of the packet final DataPacket dataPacket = msg.content(); final SocketAddress recipient = msg.recipient(); final SocketAddress sender = ctx.channel().localAddress(); final ByteBuf buffer; if (dataPacket.getDataSize() == 0) { buffer = Unpooled.EMPTY_BUFFER; } else { buffer = dataPacket.encode(); } final AddressedEnvelope<ByteBuf, SocketAddress> newMsg = new DefaultAddressedEnvelope<>(buffer, recipient, sender); out.add(newMsg); }
/**
 * Serializes the {@link RakNetPacket} carried by the envelope into a fresh buffer and emits it as
 * a {@link DatagramPacket} with the envelope's recipient and sender.
 *
 * @param ctx the handler context
 * @param msg the envelope holding the packet to encode
 * @param out the list the resulting datagram is added to; asserted to be empty on entry
 */
@Override
protected final void encode(ChannelHandlerContext ctx, AddressedEnvelope<RakNetPacket, InetSocketAddress> msg,
                            List<Object> out) throws Exception {
    assert out.isEmpty();

    final RakNetPacket rakNetPacket = msg.content();
    final RakNetByteBuf serialized = RakNetByteBuf.buffer();
    rakNetPacket.write(serialized);

    final DatagramPacket datagram = new DatagramPacket(serialized, msg.recipient(), msg.sender());
    out.add(datagram);
}
@Override protected void encode(ChannelHandlerContext ctx, AddressedEnvelope<Object, InetSocketAddress> msg, List<Object> out) throws Exception { if (msg.content() instanceof ByteBuf) { ByteBuf payload = (ByteBuf)msg.content(); // Just wrap the message as DatagramPacket, need to make sure the message content is ByteBuf DatagramPacket dp = new DatagramPacket(payload.retain(), msg.recipient()); out.add(dp); } }
/**
 * Decodes the {@link ByteBuf} content of an envelope with the delegate decoder and forwards the
 * result in a new envelope that keeps the original recipient and sender.
 * <p>
 * Envelopes whose content is not a {@link ByteBuf} produce no output. The same holds when the
 * delegate decoder yields {@code null}: {@link DefaultAddressedEnvelope} rejects a {@code null}
 * message, so wrapping such a result would fail with a {@link NullPointerException}.
 *
 * @param ctx the handler context
 * @param msg the envelope to decode
 * @param out the list the decoded envelope is added to
 */
@Override
protected void decode(ChannelHandlerContext ctx, AddressedEnvelope<Object, InetSocketAddress> msg,
                      List<Object> out) throws Exception {
    if (msg.content() instanceof ByteBuf) {
        ByteBuf payload = (ByteBuf) msg.content();
        Object result = delegateDecoder.decode(ctx, payload);
        if (result == null) {
            // the delegate produced nothing for this datagram, so there is nothing to forward
            return;
        }
        AddressedEnvelope<Object, InetSocketAddress> addressedEnvelop =
            new DefaultAddressedEnvelope<Object, InetSocketAddress>(result, msg.recipient(), msg.sender());
        out.add(addressedEnvelop);
    }
}
/**
 * Serializes the {@link Serializable} content of an envelope with the delegate object encoder and
 * forwards the resulting buffer in a new envelope that keeps the original recipient and sender.
 * Envelopes with any other content produce no output.
 * <p>
 * The buffer is allocated here with a reference count of one, and that single reference is handed
 * over to the outgoing envelope. It is not retained again, since an additional reference would
 * never be released. If encoding fails the buffer is released before the exception propagates.
 *
 * @param ctx the handler context
 * @param msg the envelope to encode
 * @param out the list the encoded envelope is added to
 */
@Override
protected void encode(ChannelHandlerContext ctx, AddressedEnvelope<Object, InetSocketAddress> msg,
                      List<Object> out) throws Exception {
    if (msg.content() instanceof Serializable) {
        Serializable payload = (Serializable) msg.content();
        ByteBuf buf = ctx.alloc().heapBuffer();
        boolean handedOver = false;
        try {
            delegateObjectEncoder.encode(ctx, payload, buf);
            AddressedEnvelope<Object, InetSocketAddress> addressedEnvelop =
                new DefaultAddressedEnvelope<Object, InetSocketAddress>(buf, msg.recipient(), msg.sender());
            out.add(addressedEnvelop);
            handedOver = true;
        } finally {
            if (!handedOver) {
                // nobody downstream will ever see this buffer, so free it here
                buf.release();
            }
        }
    }
}
/**
 * Decodes the {@link ByteBuf} content of an envelope with the delegate decoder and forwards the
 * result in a new envelope that keeps the original recipient and sender.
 * <p>
 * Envelopes whose content is not a {@link ByteBuf} produce no output. The same holds when the
 * delegate decoder yields {@code null}: {@link DefaultAddressedEnvelope} rejects a {@code null}
 * message, so wrapping such a result would fail with a {@link NullPointerException}.
 *
 * @param ctx the handler context
 * @param msg the envelope to decode
 * @param out the list the decoded envelope is added to
 */
@Override
protected void decode(ChannelHandlerContext ctx, AddressedEnvelope<Object, InetSocketAddress> msg,
                      List<Object> out) throws Exception {
    if (msg.content() instanceof ByteBuf) {
        ByteBuf payload = (ByteBuf) msg.content();
        Object result = delegateDecoder.decode(ctx, payload);
        if (result == null) {
            // the delegate produced nothing for this datagram, so there is nothing to forward
            return;
        }
        AddressedEnvelope<Object, InetSocketAddress> addressEvelop =
            new DefaultAddressedEnvelope<Object, InetSocketAddress>(result, msg.recipient(), msg.sender());
        out.add(addressEvelop);
    }
}
/**
 * Encodes the {@link CharSequence} content of an envelope with the configured charset and
 * forwards the bytes in a new envelope that keeps the original recipient and sender. Envelopes
 * with other content, or with an empty character sequence, produce no output.
 *
 * @param ctx the handler context
 * @param msg the envelope to encode
 * @param out the list the encoded envelope is added to
 */
@Override
protected void encode(ChannelHandlerContext ctx, AddressedEnvelope<Object, InetSocketAddress> msg,
                      List<Object> out) throws Exception {
    if (!(msg.content() instanceof CharSequence)) {
        return;
    }
    CharSequence text = (CharSequence) msg.content();
    if (text.length() == 0) {
        // nothing to send for an empty text
        return;
    }

    out.add(new DefaultAddressedEnvelope<Object, InetSocketAddress>(
            ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(text), charset),
            msg.recipient(),
            msg.sender()));
}
/**
 * Decodes the {@link ByteBuf} content of an envelope into a {@link String} using the configured
 * charset and forwards it in a new envelope that keeps the original recipient and sender.
 * Envelopes with any other content produce no output.
 *
 * @param ctx the handler context
 * @param msg the envelope to decode
 * @param out the list the decoded envelope is added to
 */
@Override
protected void decode(ChannelHandlerContext ctx, AddressedEnvelope<Object, InetSocketAddress> msg,
                      List<Object> out) throws Exception {
    if (!(msg.content() instanceof ByteBuf)) {
        return;
    }
    ByteBuf bytes = (ByteBuf) msg.content();
    String text = bytes.toString(charset);
    out.add(new DefaultAddressedEnvelope<Object, InetSocketAddress>(text, msg.recipient(), msg.sender()));
}
/**
 * Encodes the {@code byte[]} content of an envelope with the delegate encoder and forwards the
 * resulting {@link ByteBuf} in a new envelope that keeps the original recipient and sender.
 * Envelopes with any other content produce no output.
 * <p>
 * The buffer created by the delegate already carries the one reference that is handed over to
 * the outgoing envelope. It is not retained again, since an additional reference would never be
 * released.
 *
 * @param ctx the handler context
 * @param msg the envelope to encode
 * @param out the list the encoded envelope is added to
 */
@Override
protected void encode(ChannelHandlerContext ctx, AddressedEnvelope<Object, InetSocketAddress> msg,
                      List<Object> out) throws Exception {
    if (msg.content() instanceof byte[]) {
        final int sizeBefore = out.size();
        delegateEncoder.encode(ctx, (byte[]) msg.content(), out);
        if (out.size() == sizeBefore) {
            // the delegate emitted nothing; do not pull an unrelated element off the list
            return;
        }
        // swap the bare buffer the delegate appended for an addressed one
        ByteBuf buf = (ByteBuf) out.remove(out.size() - 1);
        AddressedEnvelope<Object, InetSocketAddress> addressedEnvelop =
            new DefaultAddressedEnvelope<Object, InetSocketAddress>(buf, msg.recipient(), msg.sender());
        out.add(addressedEnvelop);
    }
}
/**
 * Sends an envelope holding {@code VALUE} as a {@code byte[]} through the "udp" byte-array
 * encoder and checks that exactly one envelope comes out whose {@link ByteBuf} content holds the
 * same bytes.
 */
@Test
public void testEncoder() {
    // the input is the raw byte[]; no ByteBuf needs to be prepared for it
    AddressedEnvelope<Object, InetSocketAddress> addressedEnvelop =
        new DefaultAddressedEnvelope<Object, InetSocketAddress>(VALUE.getBytes(), new InetSocketAddress(8888));
    EmbeddedChannel channel = new EmbeddedChannel(ChannelHandlerFactories.newByteArrayEncoder("udp").newChannelHandler());
    Assert.assertTrue(channel.writeOutbound(addressedEnvelop));
    Assert.assertTrue(channel.finish());

    AddressedEnvelope<?, ?> output = (AddressedEnvelope<?, ?>) channel.readOutbound();
    Assert.assertTrue(output.content() instanceof ByteBuf);
    ByteBuf resultContent = (ByteBuf) output.content();
    Assert.assertEquals(VALUE, new String(resultContent.array()));
    // nothing else must have been produced
    Assert.assertNull(channel.readOutbound());
}
/**
 * Writes one message to the underlying Java channel. An {@link AddressedEnvelope} is sent to its
 * recipient; a bare {@link ByteBuf} is written to the channel without an explicit address.
 *
 * @param msg the message to write
 * @param in  the outbound buffer the message came from
 * @return {@code true} if the message had no readable bytes or at least one byte was written
 */
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
    final ByteBuf data;
    final SocketAddress remoteAddress;
    if (!(msg instanceof AddressedEnvelope)) {
        data = (ByteBuf) msg;
        remoteAddress = null;
    } else {
        @SuppressWarnings("unchecked")
        AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) msg;
        remoteAddress = envelope.recipient();
        data = envelope.content();
    }

    final int readable = data.readableBytes();
    if (readable == 0) {
        // nothing to put on the wire; report the message as written
        return true;
    }

    final ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), readable);
    final int written = remoteAddress == null
            ? javaChannel().write(nioData)
            : javaChannel().send(nioData, remoteAddress);
    return written > 0;
}
private void decodeResponse( DnsQuestion question, AddressedEnvelope<DnsResponse, InetSocketAddress> envelope, CompletableFuture<List<Endpoint>> promise) { final DnsResponse response = envelope.content(); final int answerCount = response.count(DnsSection.ANSWER); ImmutableList.Builder<Endpoint> resolvedEndpoints = ImmutableList.builder(); for (int i = 0; i < answerCount; i++) { final DnsRecord r = response.recordAt(DnsSection.ANSWER, i); final DnsRecordType type = r.type(); if (type != DnsRecordType.SRV) { continue; } final String questionName = Ascii.toLowerCase(question.name()); final String recordName = Ascii.toLowerCase(r.name()); // Make sure the record is for the questioned domain. if (!recordName.equals(questionName)) { continue; } final Endpoint resolved = decodeSrvEndpoint(r); if (resolved == null) { continue; } resolvedEndpoints.add(resolved); // Note that we do not break from the loop here, so we decode all SRV records. } promise.complete(resolvedEndpoints.build()); }
/** * Decodes a {@link DatagramPacket} to a {@link CompoundControlPacket} wrapped into an {@link AddressedEnvelope}. * * @param ctx The context of the ChannelHandler * @param msg the message which should be encoded * @param out a list where all messages are written to */ @Override protected void decode(ChannelHandlerContext ctx, DatagramPacket msg, List<Object> out) throws Exception { final ByteBuf content = msg.content(); final SocketAddress sender = msg.sender(); final SocketAddress recipient = msg.recipient(); if ((content.readableBytes() % 4) != 0) { LOG.debug("Invalid RTCP packet received: total length should be multiple of 4 but is {}", content.readableBytes()); return; } // Usually 2 packets per UDP frame... final List<ControlPacket> controlPacketList = new ArrayList<>(2); // While there's data to read, keep on decoding. while (content.readableBytes() > 0) { try { // prevent adding null final ControlPacket packet = ControlPacket.decode(content); if(packet != null){ controlPacketList.add(packet); } } catch (Exception e1) { LOG.debug("Exception caught while decoding RTCP packet.", e1); break; } } if (!controlPacketList.isEmpty()) { // Only forward to next ChannelHandler when there were more than one valid decoded packets. // TODO shouldn't the whole compound packet be discarded when one of them has errors?! final AddressedEnvelope<CompoundControlPacket, SocketAddress> newMsg = new DefaultAddressedEnvelope<>(new CompoundControlPacket(controlPacketList), recipient, sender); out.add(newMsg); } }
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { final Object o = in.current(); if (o == null) { break; } final ByteBuf data; final SocketAddress remoteAddress; if (o instanceof AddressedEnvelope) { @SuppressWarnings("unchecked") AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) o; remoteAddress = envelope.recipient(); data = envelope.content(); } else { data = (ByteBuf) o; remoteAddress = null; } final int length = data.readableBytes(); if (remoteAddress != null) { tmpPacket.setSocketAddress(remoteAddress); } if (data.hasArray()) { tmpPacket.setData(data.array(), data.arrayOffset() + data.readerIndex(), length); } else { byte[] tmp = new byte[length]; data.getBytes(data.readerIndex(), tmp); tmpPacket.setData(tmp); } try { socket.send(tmpPacket); in.remove(); } catch (IOException e) { // Continue on write error as a DatagramChannel can write to multiple remote peers // // See https://github.com/netty/netty/issues/2665 in.remove(e); } } }
/**
 * Forwards every received control-packet envelope to
 * {@code messageReceived(ChannelHandlerContext, AddressedEnvelope)}.
 *
 * @param ctx the handler context
 * @param msg the received envelope
 * @throws Exception whatever {@code messageReceived} throws
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx,
                            AddressedEnvelope<CompoundControlPacket, SocketAddress> msg) throws Exception {
    messageReceived(ctx, msg);
}
/**
 * Forwards every received data-packet envelope to
 * {@code messageReceived(ChannelHandlerContext, AddressedEnvelope)}.
 *
 * @param ctx the handler context
 * @param msg the received envelope
 * @throws Exception whatever {@code messageReceived} throws
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx,
                            AddressedEnvelope<DataPacket, SocketAddress> msg) throws Exception {
    messageReceived(ctx, msg);
}
/**
 * Drains the outbound buffer, sending each message through the blocking socket. The payload may
 * be a {@link ByteBuf} or a {@link ByteBufHolder}, optionally wrapped in an
 * {@link AddressedEnvelope} that supplies the recipient.
 *
 * @param in the outbound buffer to drain
 * @throws UnsupportedOperationException if a message carries any other payload type
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    Object current;
    while ((current = in.current()) != null) {
        // peel off the optional envelope
        final SocketAddress remoteAddress;
        final Object payload;
        if (!(current instanceof AddressedEnvelope)) {
            payload = current;
            remoteAddress = null;
        } else {
            @SuppressWarnings("unchecked")
            AddressedEnvelope<Object, SocketAddress> envelope = (AddressedEnvelope<Object, SocketAddress>) current;
            remoteAddress = envelope.recipient();
            payload = envelope.content();
        }

        // locate the bytes to send
        final ByteBuf data;
        if (payload instanceof ByteBufHolder) {
            data = ((ByteBufHolder) payload).content();
        } else if (payload instanceof ByteBuf) {
            data = (ByteBuf) payload;
        } else {
            throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(current));
        }

        final int length = data.readableBytes();
        if (remoteAddress != null) {
            tmpPacket.setSocketAddress(remoteAddress);
        }

        if (!data.hasArray()) {
            // no backing array: copy the readable bytes out
            final byte[] copy = new byte[length];
            data.getBytes(data.readerIndex(), copy);
            tmpPacket.setData(copy);
        } else {
            // point the packet directly at the readable region of the backing array
            tmpPacket.setData(data.array(), data.arrayOffset() + data.readerIndex(), length);
        }

        socket.send(tmpPacket);
        in.remove();
    }
}
@Override protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { final Object m; final SocketAddress remoteAddress; ByteBuf data; if (msg instanceof AddressedEnvelope) { @SuppressWarnings("unchecked") AddressedEnvelope<Object, SocketAddress> envelope = (AddressedEnvelope<Object, SocketAddress>) msg; remoteAddress = envelope.recipient(); m = envelope.content(); } else { m = msg; remoteAddress = null; } if (m instanceof ByteBufHolder) { data = ((ByteBufHolder) m).content(); } else if (m instanceof ByteBuf) { data = (ByteBuf) m; } else { throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg)); } int dataLen = data.readableBytes(); if (dataLen == 0) { return true; } ByteBufAllocator alloc = alloc(); boolean needsCopy = data.nioBufferCount() != 1; if (!needsCopy) { if (!data.isDirect() && alloc.isDirectBufferPooled()) { needsCopy = true; } } ByteBuffer nioData; if (!needsCopy) { nioData = data.nioBuffer(); } else { data = alloc.directBuffer(dataLen).writeBytes(data); nioData = data.nioBuffer(); } final int writtenBytes; if (remoteAddress != null) { writtenBytes = javaChannel().send(nioData, remoteAddress); } else { writtenBytes = javaChannel().write(nioData); } boolean done = writtenBytes > 0; if (needsCopy) { // This means we have allocated a new buffer and need to store it back so we not need to allocate it again // later if (remoteAddress == null) { // remoteAddress is null which means we can handle it as ByteBuf directly in.current(data); } else { if (!done) { // store it back with all the needed informations in.current(new DefaultAddressedEnvelope<ByteBuf, SocketAddress>(data, remoteAddress)); } else { // Just store back the new create buffer so it is cleaned up once in.remove() is called. in.current(data); } } } return done; }
/**
 * Drains the outbound buffer, fetching each entry with {@code in.current(false)} and sending it
 * through the blocking socket. The payload may be a {@link ByteBuf} or a {@link ByteBufHolder},
 * optionally wrapped in an {@link AddressedEnvelope} that supplies the recipient.
 *
 * @param in the outbound buffer to drain
 * @throws UnsupportedOperationException if a message carries any other payload type
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    Object current;
    while ((current = in.current(false)) != null) {
        // peel off the optional envelope
        final SocketAddress remoteAddress;
        final Object payload;
        if (!(current instanceof AddressedEnvelope)) {
            payload = current;
            remoteAddress = null;
        } else {
            @SuppressWarnings("unchecked")
            AddressedEnvelope<Object, SocketAddress> envelope = (AddressedEnvelope<Object, SocketAddress>) current;
            remoteAddress = envelope.recipient();
            payload = envelope.content();
        }

        // locate the bytes to send
        final ByteBuf data;
        if (payload instanceof ByteBufHolder) {
            data = ((ByteBufHolder) payload).content();
        } else if (payload instanceof ByteBuf) {
            data = (ByteBuf) payload;
        } else {
            throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(current));
        }

        final int length = data.readableBytes();
        if (remoteAddress != null) {
            tmpPacket.setSocketAddress(remoteAddress);
        }

        if (!data.hasArray()) {
            // no backing array: copy the readable bytes out
            final byte[] copy = new byte[length];
            data.getBytes(data.readerIndex(), copy);
            tmpPacket.setData(copy);
        } else {
            // point the packet directly at the readable region of the backing array
            tmpPacket.setData(data.array(), data.arrayOffset() + data.readerIndex(), length);
        }

        socket.send(tmpPacket);
        in.remove();
    }
}
/** * To be compatible to io.Netty version 5.0: * {@code channelRead0(ChannelHandlerContext, I)} will be renamed to {@code messageReceived(ChannelHandlerContext, I)} in 5.0. * * @param ctx the {@link ChannelHandlerContext} which this {@link SimpleChannelInboundHandler}/ * {@link UdpDataHandler} belongs to * @param msg the message to handle * @throws Exception is thrown if an error occurred */ //@Override protected void messageReceived(ChannelHandlerContext ctx, AddressedEnvelope<CompoundControlPacket, SocketAddress> msg) throws Exception { final CompoundControlPacket packet = msg.content(); final SocketAddress sender = msg.sender(); this.counter.incrementAndGet(); this.receiver.controlPacketReceived(sender, packet); }
/** * To be compatible to io.Netty version 5.0: * {@code channelRead0(ChannelHandlerContext, I)} will be renamed to {@code messageReceived(ChannelHandlerContext, I)} in 5.0. * * @param ctx the {@link ChannelHandlerContext} which this {@link SimpleChannelInboundHandler}/ * {@link UdpDataHandler} belongs to * @param msg the message to handle * @throws Exception is thrown if an error occurred */ //@Override protected void messageReceived(ChannelHandlerContext ctx, AddressedEnvelope<DataPacket, SocketAddress> msg) throws Exception { final DataPacket packet = msg.content(); final SocketAddress sender = msg.sender(); this.counter.incrementAndGet(); this.receiver.dataPacketReceived(sender, packet); }
/**
 * Wraps the data packet in an envelope addressed to {@code destination} and writes and flushes
 * it on the data channel.
 *
 * @param packet      the data packet to send
 * @param destination the address the envelope is addressed to
 */
protected void writeToData(DataPacket packet, SocketAddress destination) {
    this.dataChannel.writeAndFlush(
            new DefaultAddressedEnvelope<DataPacket, SocketAddress>(packet, destination));
}
/** * Write the packets information to the control channel * * @param packet * @param destination */ protected void writeToControl(ControlPacket packet, SocketAddress destination) { // FIXME: does not work currently -> add new encoder for ControlPackets wrapped into Envelopes final AddressedEnvelope<ControlPacket, SocketAddress> envelope = new DefaultAddressedEnvelope<>(packet, destination); this.controlChannel.writeAndFlush(envelope); }
/**
 * Wraps a compound control packet in an envelope addressed to {@code destination} and writes and
 * flushes it on the control channel.
 *
 * @param packet      the compound control packet to send
 * @param destination the address the envelope is addressed to
 */
protected void writeToControl(CompoundControlPacket packet, SocketAddress destination) {
    this.controlChannel.writeAndFlush(
            new DefaultAddressedEnvelope<CompoundControlPacket, SocketAddress>(packet, destination));
}