/**
 * Routes an inbound message to the delegate according to its type.
 *
 * <ul>
 *   <li>{@link FullHttpResponse}: asserts the handshake is not yet complete, finishes it and
 *       calls {@code delegate.onOpen()}; a {@link WebSocketHandshakeException} is reported via
 *       {@code delegate.onError(e)} instead.</li>
 *   <li>{@link TextWebSocketFrame}: forwards the frame text to {@code delegate.onMessage}.</li>
 *   <li>Anything else: asserts it is a {@link CloseWebSocketFrame} and calls
 *       {@code delegate.onClose()}.</li>
 * </ul>
 *
 * @param context the handler context the message arrived on
 * @param message the inbound message
 * @throws Exception whatever the handshaker or the delegate propagates
 */
@Override
public void channelRead0(ChannelHandlerContext context, Object message) throws Exception {
    if (message instanceof FullHttpResponse) {
        // An HTTP response is only expected while the handshake is still pending.
        checkState(!handshaker.isHandshakeComplete());
        final FullHttpResponse handshakeResponse = (FullHttpResponse) message;
        try {
            handshaker.finishHandshake(context.channel(), handshakeResponse);
            delegate.onOpen();
        } catch (WebSocketHandshakeException handshakeFailure) {
            delegate.onError(handshakeFailure);
        }
        return;
    }

    if (message instanceof TextWebSocketFrame) {
        final String text = ((TextWebSocketFrame) message).text();
        delegate.onMessage(text);
        return;
    }

    // The only remaining message type this handler accepts is a close frame.
    checkState(message instanceof CloseWebSocketFrame);
    delegate.onClose();
}
/**
 * Broadcasts a message to every authenticated channel in {@code chatUserMap}.
 *
 * <p>Blank messages are ignored. Channels whose user entry is missing or not authenticated
 * are skipped. The iteration runs under the read lock of {@code rwLock}.
 *
 * @param buildmessage the already-built protocol message to send
 */
public void broadMessage(String buildmessage) {
    if (BlankUtil.isBlank(buildmessage)) {
        return;
    }
    // Acquire the lock BEFORE entering try: if lock() itself fails, the finally block must
    // not call unlock() on a lock this thread never obtained.
    rwLock.readLock().lock();
    try {
        for (Channel ch : chatUserMap.keySet()) {
            ChatUser cUser = chatUserMap.get(ch);
            if (cUser == null || !cUser.isAuth()) {
                continue;
            }
            // A new frame is created for each recipient.
            ch.writeAndFlush(new TextWebSocketFrame(buildmessage));
        }
    } finally {
        rwLock.readLock().unlock();
    }
}
/**
 * Broadcasts an ordinary chat message to every authenticated channel in {@code userInfos}.
 *
 * <p>Blank messages are ignored. Channels whose user info is missing or not authenticated
 * are skipped. The iteration runs under the read lock of {@code rwLock}.
 *
 * @param uid     id of the sending user, passed to {@code ChatProto.buildMessProto}
 * @param nick    nickname of the sending user, passed to {@code ChatProto.buildMessProto}
 * @param message the message text to broadcast
 */
public static void broadcastMess(int uid, String nick, String message) {
    if (BlankUtil.isBlank(message)) {
        return;
    }
    // Acquire the lock BEFORE entering try: if lock() itself fails, the finally block must
    // not call unlock() on a lock this thread never obtained.
    rwLock.readLock().lock();
    try {
        for (Channel ch : userInfos.keySet()) {
            UserInfo userInfo = userInfos.get(ch);
            if (userInfo == null || !userInfo.isAuth()) {
                continue;
            }
            // The protocol payload and its frame are built anew for each recipient.
            ch.writeAndFlush(new TextWebSocketFrame(ChatProto.buildMessProto(uid, nick, message)));
        }
    } finally {
        rwLock.readLock().unlock();
    }
}
private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // 判断是否关闭链路的指令 if (frame instanceof CloseWebSocketFrame) { socketServerHandshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); } // 判断是否ping消息 if (frame instanceof PingWebSocketFrame) { ctx.channel().write( new PongWebSocketFrame(frame.content().retain())); return; } // 本例程仅支持文本消息,不支持二进制消息 if (!(frame instanceof TextWebSocketFrame)) { throw new UnsupportedOperationException(String.format( "%s frame types not supported", frame.getClass().getName())); } // 返回应答消息 String request = ((TextWebSocketFrame) frame).text(); System.out.println("服务端收到:" + request); TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request); // 群发 group.writeAndFlush(tws); }
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // 如果WebSocket握手完成 if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) { // 删除ChannelPipeline中的HttpRequestHttpHandler ctx.pipeline().remove(HttpRequestHandler.class); String user = ChatUtils.addChannel(ctx.channel()); Users us = new Users(user); ctx.channel().writeAndFlush(new TextWebSocketFrame(us.getCurrentUser())); // 写一个消息到ChannelGroup group.writeAndFlush(new TextWebSocketFrame(user + " 加入聊天室.")); // 将channel添加到ChannelGroup group.add(ctx.channel()); group.writeAndFlush(new TextWebSocketFrame(us.getAllUsers())); } else { super.userEventTriggered(ctx, evt); } }
@Test public void testCreateSubscriptionWithMissingSessionId() throws Exception { decoder = new WebSocketRequestDecoder(config); // @formatter:off String request = "{ "+ "\"operation\" : \"create\", " + "\"subscriptionId\" : \"1234\"" + " }"; // @formatter:on TextWebSocketFrame frame = new TextWebSocketFrame(); frame.content().writeBytes(request.getBytes(StandardCharsets.UTF_8)); decoder.decode(ctx, frame, results); Assert.assertNotNull(ctx.msg); Assert.assertEquals(CloseWebSocketFrame.class, ctx.msg.getClass()); Assert.assertEquals(1008, ((CloseWebSocketFrame) ctx.msg).statusCode()); Assert.assertEquals("User must log in", ((CloseWebSocketFrame) ctx.msg).reasonText()); }
@Test public void testCreateSubscriptionWithInvalidSessionIdAndNonAnonymousAccess() throws Exception { ctx.channel().attr(SubscriptionRegistry.SESSION_ID_ATTR) .set(URLEncoder.encode(UUID.randomUUID().toString(), StandardCharsets.UTF_8.name())); decoder = new WebSocketRequestDecoder(config); // @formatter:off String request = "{ "+ "\"operation\" : \"create\", " + "\"subscriptionId\" : \"1234\"" + " }"; // @formatter:on TextWebSocketFrame frame = new TextWebSocketFrame(); frame.content().writeBytes(request.getBytes(StandardCharsets.UTF_8)); decoder.decode(ctx, frame, results); Assert.assertNotNull(ctx.msg); Assert.assertEquals(CloseWebSocketFrame.class, ctx.msg.getClass()); Assert.assertEquals(1008, ((CloseWebSocketFrame) ctx.msg).statusCode()); Assert.assertEquals("User must log in", ((CloseWebSocketFrame) ctx.msg).reasonText()); }
@Test public void testCreateSubscriptionWithValidSessionIdAndNonAnonymousAccess() throws Exception { ctx.channel().attr(SubscriptionRegistry.SESSION_ID_ATTR).set(cookie); decoder = new WebSocketRequestDecoder(config); // @formatter:off String request = "{ " + "\"operation\" : \"create\"," + "\"subscriptionId\" : \"" + cookie + "\"" + "}"; // @formatter:on TextWebSocketFrame frame = new TextWebSocketFrame(); frame.content().writeBytes(request.getBytes(StandardCharsets.UTF_8)); decoder.decode(ctx, frame, results); Assert.assertEquals(1, results.size()); Assert.assertEquals(CreateSubscription.class, results.get(0).getClass()); }
@Test public void testSuggest() throws Exception { // @formatter:off String request = "{\n" + " \"operation\" : \"suggest\",\n" + " \"sessionId\" : \"1234\",\n" + " \"type\": \"metrics\",\n" + " \"q\": \"sys.cpu.user\",\n" + " \"max\": 30\n" + "}"; // @formatter:on decoder = new WebSocketRequestDecoder(anonConfig); TextWebSocketFrame frame = new TextWebSocketFrame(); frame.content().writeBytes(request.getBytes(StandardCharsets.UTF_8)); decoder.decode(ctx, frame, results); Assert.assertEquals(1, results.size()); Assert.assertEquals(SuggestRequest.class, results.get(0).getClass()); SuggestRequest suggest = (SuggestRequest) results.iterator().next(); Assert.assertEquals("metrics", suggest.getType()); Assert.assertEquals("sys.cpu.user", suggest.getQuery().get()); Assert.assertEquals(30, suggest.getMax()); suggest.validate(); }
@Test public void testWSAggregators() throws Exception { try { AggregatorsRequest request = new AggregatorsRequest(); ch.writeAndFlush(new TextWebSocketFrame(JsonUtil.getObjectMapper().writeValueAsString(request))); // Latency in TestConfiguration is 2s, wait for it sleepUninterruptibly(TestConfiguration.WAIT_SECONDS, TimeUnit.SECONDS); // Confirm receipt of all data sent to this point List<String> response = handler.getResponses(); while (response.size() == 0 && handler.isConnected()) { LOG.info("Waiting for web socket response"); sleepUninterruptibly(500, TimeUnit.MILLISECONDS); response = handler.getResponses(); } Assert.assertEquals(1, response.size()); JsonUtil.getObjectMapper().readValue(response.get(0), AggregatorsResponse.class); } finally { ch.close().sync(); s.shutdown(); group.shutdownGracefully(); } }
@Test public void testWSMetrics() throws Exception { try { MetricsRequest request = new MetricsRequest(); ch.writeAndFlush(new TextWebSocketFrame(JsonUtil.getObjectMapper().writeValueAsString(request))); // Confirm receipt of all data sent to this point List<String> response = handler.getResponses(); while (response.size() == 0 && handler.isConnected()) { LOG.info("Waiting for web socket response"); sleepUninterruptibly(500, TimeUnit.MILLISECONDS); response = handler.getResponses(); } Assert.assertEquals(1, response.size()); Assert.assertEquals("{\"metrics\":[]}", response.get(0)); } finally { ch.close().sync(); s.shutdown(); group.shutdownGracefully(); } }
@Test public void testVersion() throws Exception { try { String request = "{ \"operation\" : \"version\" }"; ch.writeAndFlush(new TextWebSocketFrame(request)); // Confirm receipt of all data sent to this point List<String> response = handler.getResponses(); while (response.size() == 0 && handler.isConnected()) { LOG.info("Waiting for web socket response"); sleepUninterruptibly(500, TimeUnit.MILLISECONDS); response = handler.getResponses(); } assertEquals(1, response.size()); assertEquals(VersionRequest.VERSION, response.get(0)); } finally { ch.close().sync(); s.shutdown(); group.shutdownGracefully(); } }
/**
 * Delivers one WebSocket frame to the upgrade handler and the {@code NettyWebSocket}.
 *
 * <p>A close frame marks the channel as discard and notifies {@code webSocket.onClose} with
 * the frame's status code and reason. Any other frame is ignored when its content is
 * {@code null} or has no readable bytes; otherwise a body part is built from the content,
 * passed to {@code handler.onBodyPartReceived}, and then dispatched to the matching
 * {@code webSocket} callback (binary, text, ping or pong). Other frame types get no
 * {@code webSocket} callback.
 *
 * @param channel   the channel the frame arrived on
 * @param frame     the inbound frame
 * @param handler   the upgrade handler notified of each body part
 * @param webSocket the WebSocket receiving the typed callbacks
 * @throws Exception whatever the handler or WebSocket callbacks propagate
 */
private void handleFrame(Channel channel, WebSocketFrame frame, WebSocketUpgradeHandler handler, NettyWebSocket webSocket) throws Exception {
    if (frame instanceof CloseWebSocketFrame) {
        Channels.setDiscard(channel);
        final CloseWebSocketFrame closing = (CloseWebSocketFrame) frame;
        webSocket.onClose(closing.statusCode(), closing.reasonText());
        return;
    }

    final ByteBuf payload = frame.content();
    if (payload == null || payload.readableBytes() <= 0) {
        // Nothing to deliver for empty frames.
        return;
    }

    final HttpResponseBodyPart bodyPart =
            config.getResponseBodyPartFactory().newResponseBodyPart(payload, frame.isFinalFragment());
    handler.onBodyPartReceived(bodyPart);

    if (frame instanceof BinaryWebSocketFrame) {
        webSocket.onBinaryFragment(bodyPart);
    } else if (frame instanceof TextWebSocketFrame) {
        webSocket.onTextFragment(bodyPart);
    } else if (frame instanceof PingWebSocketFrame) {
        webSocket.onPing(bodyPart);
    } else if (frame instanceof PongWebSocketFrame) {
        webSocket.onPong(bodyPart);
    }
}
@Override protected void decode(final ChannelHandlerContext channelHandlerContext, final TextWebSocketFrame frame, final List<Object> objects) throws Exception { try { // the default serializer must be a MessageTextSerializer instance to be compatible with this decoder final MessageTextSerializer serializer = (MessageTextSerializer) select("application/json", Serializers.DEFAULT_REQUEST_SERIALIZER); // it's important to re-initialize these channel attributes as they apply globally to the channel. in // other words, the next request to this channel might not come with the same configuration and mixed // state can carry through from one request to the next channelHandlerContext.channel().attr(StateKey.SESSION).set(null); channelHandlerContext.channel().attr(StateKey.SERIALIZER).set(serializer); channelHandlerContext.channel().attr(StateKey.USE_BINARY).set(false); objects.add(serializer.deserializeRequest(frame.text())); } catch (SerializationException se) { objects.add(RequestMessage.INVALID); } }
/**
 * Sends this response to all the passed channels as a {@link TextWebSocketFrame}.
 *
 * <p>If {@code opCode} is unset it is defaulted to {@code "ok"} before the payload is built.
 * Channels that are {@code null} or not writable are skipped. Each channel is given its own
 * retained duplicate of the frame: a frame is reference counted and is consumed and released
 * by the write, so the same instance cannot be written to more than one channel. The
 * original frame is released once all writes have been issued, which also avoids leaking the
 * buffer when no channel was writable.
 *
 * <p>NOTE(review): assumes {@code toByteBuf()} returns a buffer owned by the caller (released
 * exactly once here) — confirm against its implementation.
 *
 * @param listener A channel future listener to attach to each channel future. Ignored if null.
 * @param channels The channels to send this response to
 * @return An array of the futures for the write of this response to each channel written to
 */
public ChannelFuture[] send(ChannelFutureListener listener, Channel...channels) {
    if (channels == null || channels.length == 0) {
        return EMPTY_CHANNEL_FUTURE_ARR;
    }
    final Set<ChannelFuture> futures = new HashSet<ChannelFuture>(channels.length);
    if (opCode == null) {
        opCode = "ok";
    }
    final TextWebSocketFrame frame = new TextWebSocketFrame(this.toByteBuf());
    try {
        for (Channel channel : channels) {
            if (channel != null && channel.isWritable()) {
                // duplicate() gives independent reader/writer indices over the same content;
                // retain() accounts for the release performed by this channel's write.
                ChannelFuture cf = channel.pipeline().writeAndFlush(frame.duplicate().retain());
                if (listener != null) {
                    cf.addListener(listener);
                }
                futures.add(cf);
            }
        }
    } finally {
        // Drop the reference held by this method; the per-channel duplicates hold their own.
        frame.release();
    }
    return futures.toArray(new ChannelFuture[futures.size()]);
}
private Message decodeWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // Check for closing frame if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return null; } if (frame instanceof PingWebSocketFrame) { ctx.write(new PongWebSocketFrame(frame.content().retain())); return null; } if (frame instanceof TextWebSocketFrame) { TextWebSocketFrame textFrame = (TextWebSocketFrame) frame; return parseMessage(textFrame.content()); } if (frame instanceof BinaryWebSocketFrame) { BinaryWebSocketFrame binFrame = (BinaryWebSocketFrame) frame; return parseMessage(binFrame.content()); } log.warn("Message format error: " + frame); return null; }
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // Check for closing frame if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } if (frame instanceof PingWebSocketFrame) { ctx.write(new PongWebSocketFrame(frame.content().retain())); return; } if (frame instanceof TextWebSocketFrame) { // Echo the frame ctx.write(frame.retain()); return; } if (frame instanceof BinaryWebSocketFrame) { // Echo the frame ctx.write(frame.retain()); return; } }
/**
 * Writes the payload of each message arriving on {@code Sink.INPUT} to every channel in
 * {@code WebsocketSinkServer.channels} as a text WebSocket frame.
 *
 * <p>The payload is converted with {@code toString()}. Each channel gets its own frame and
 * is flushed immediately after the write. When {@code traceEndpointEnabled} is set, the
 * message is also added to the trace repository.
 *
 * @param message the message to forward
 */
@ServiceActivator(inputChannel = Sink.INPUT)
public void websocketSink(Message<?> message) {
    if (logger.isTraceEnabled()) {
        logger.trace(String.format("Handling message: %s", message));
    }

    SimpMessageHeaderAccessor.wrap(message).setMessageTypeIfNotSet(SimpMessageType.MESSAGE);

    final String text = message.getPayload().toString();
    for (Channel target : WebsocketSinkServer.channels) {
        if (logger.isTraceEnabled()) {
            logger.trace(String.format("Writing message %s to channel %s", text, target.localAddress()));
        }
        target.write(new TextWebSocketFrame(text));
        target.flush();
    }

    if (traceEndpointEnabled) {
        addMessageToTraceRepository(message);
    }
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // Check for closing frame if (frame instanceof CloseWebSocketFrame) { addTraceForFrame(frame, "close"); handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } if (frame instanceof PingWebSocketFrame) { addTraceForFrame(frame, "ping"); ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } if (!(frame instanceof TextWebSocketFrame)) { throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass() .getName())); } // todo [om] think about BinaryWebsocketFrame handleTextWebSocketFrameInternal((TextWebSocketFrame) frame, ctx); }
@Override public String call() throws Exception { // keep going until all messages are sent while (keepRunning.get()) { if (tickerSymbols.size() > 0) { TickerResponse tickerResponse = new TickerResponse(); tickerResponse.setResult("success"); tickerResponse.setTickerData(getPricesForSymbols(tickerSymbols)); String response = gson.toJson(tickerResponse); // send the client an update channel.get().writeAndFlush(new TextWebSocketFrame(response)); } // only try to send back to client every 2 seconds so it isn't overwhelmed with messages Thread.sleep(2000L); } return "done"; }
@Override protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) { // ping and pong frames already handled if (frame instanceof TextWebSocketFrame) { // Send the uppercase string back. String request = ((TextWebSocketFrame) frame).text(); handler.onText(new WsChannelImpl(ctx.channel()), request); } else { String message = "unsupported frame type: " + frame.getClass().getName(); throw new UnsupportedOperationException(message); } }
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // Check for closing frame if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } if (!(frame instanceof TextWebSocketFrame)) { throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass() .getName())); } // Send the uppercase string back. String request = ((TextWebSocketFrame) frame).text(); System.err.printf("%s received %s%n", ctx.channel(), request); ctx.channel().write(new TextWebSocketFrame(request.toUpperCase())); }
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { if (logger.isLoggable(Level.FINE)) { logger.fine(String.format( "Channel %s received %s", ctx.channel().hashCode(), StringUtil.simpleClassName(frame))); } if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame); } else if (frame instanceof PingWebSocketFrame) { ctx.write(new PongWebSocketFrame(frame.isFinalFragment(), frame.rsv(), frame.content()), ctx.voidPromise()); } else if (frame instanceof TextWebSocketFrame) { ctx.write(frame, ctx.voidPromise()); } else if (frame instanceof BinaryWebSocketFrame) { ctx.write(frame, ctx.voidPromise()); } else if (frame instanceof ContinuationWebSocketFrame) { ctx.write(frame, ctx.voidPromise()); } else if (frame instanceof PongWebSocketFrame) { frame.release(); // Ignore } else { throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass() .getName())); } }
/**
 * Dispatches an inbound WebSocket frame to the endpoint.
 *
 * <p>A close frame starts the closing handshake, then releases the endpoint's references and
 * calls {@code endpoint.onClose()}. A ping is answered with a pong carrying the same
 * payload. The text of a text frame goes to {@code endpoint.onMessage}.
 *
 * @param ctx   the handler context
 * @param frame the inbound frame
 * @throws UnsupportedOperationException for any other frame type
 */
@Override
public void accept(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
        endpoint.releaseReferences();
        endpoint.onClose();
    } else if (frame instanceof PingWebSocketFrame) {
        ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
    } else if (frame instanceof TextWebSocketFrame) {
        final String text = ((TextWebSocketFrame) frame).text();
        endpoint.onMessage(text);
    } else {
        final String frameType = frame.getClass().getName();
        throw new UnsupportedOperationException(
                String.format("Unsupported websocket frame of type %s", frameType));
    }
}
private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // 判断是否关闭链路的指令 if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } // 判断是否ping消息 if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } // 仅支持文本消息,不支持二进制消息 if (!(frame instanceof TextWebSocketFrame)) { ctx.close();//(String.format("%s frame types not supported", frame.getClass().getName())); return; } }
/**
 * Handles an inbound WebSocket frame.
 *
 * <p>A close frame starts the closing handshake and then calls {@code onClose(ctx)}. A ping
 * is answered with a pong wrapping the ping's content. The text of a text frame is passed to
 * {@code onMessage(ctx, msg)}.
 *
 * <p>NOTE(review): the close frame and the ping content are forwarded without
 * {@code retain()}; whether that is correct depends on whether the caller releases the
 * frame afterwards — confirm against the call site.
 *
 * @param ctx   the handler context
 * @param frame the inbound frame
 * @throws UnsupportedOperationException if the frame is not a close, ping or text frame
 */
public void handle(ChannelHandlerContext ctx, WebSocketFrame frame) {
    if (frame instanceof CloseWebSocketFrame) {
        handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
        onClose(ctx);
    } else if (frame instanceof PingWebSocketFrame) {
        ctx.channel().write(new PongWebSocketFrame(frame.content()));
    } else if (frame instanceof TextWebSocketFrame) {
        final String text = ((TextWebSocketFrame) frame).text();
        onMessage(ctx, text);
    } else {
        final String frameType = frame.getClass().getName();
        throw new UnsupportedOperationException(String.format("%s frame types not supported", frameType));
    }
}
public void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // 判断是否是关闭链路的指令 if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } // 判断是否是Ping消息 if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } if (!(frame instanceof TextWebSocketFrame)) { throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName())); } //返回应答消息 String request= ((TextWebSocketFrame)frame).text(); System.out.println(String.format("%s received %s", ctx.channel(), request)); ctx.channel().write(new TextWebSocketFrame(request+" ,现在时刻:"+new Date())); }
/**
 * Client-side read handler: completes the handshake, then publishes frames on the event bus.
 *
 * <p>While the handshake is incomplete, the message is cast to {@link FullHttpResponse} and
 * used to finish it; {@code handshakeFuture} is marked successful and a {@code Connected}
 * event is posted. After the handshake, a {@link FullHttpResponse} is treated as an error.
 * A text frame is posted as a {@code Response}; a close frame closes the channel and posts
 * {@code Disconnected}. Other frame types are ignored.
 *
 * <p>NOTE(review): an exception from {@code finishHandshake} propagates without completing
 * {@code handshakeFuture} here — presumably handled elsewhere; confirm.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws IllegalStateException if an HTTP response arrives after the handshake completed
 * @throws Exception whatever the handshaker propagates
 */
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    final Channel ch = ctx.channel();

    if (!handshaker.isHandshakeComplete()) {
        handshaker.finishHandshake(ch, (FullHttpResponse) msg);
        handshakeFuture.setSuccess();
        eventBus.post(new Connected());
        return;
    }

    if (msg instanceof FullHttpResponse) {
        final FullHttpResponse unexpected = (FullHttpResponse) msg;
        throw new IllegalStateException(
                "Unexpected FullHttpResponse (getStatus=" + unexpected.status() +
                        ", content=" + unexpected.content().toString(CharsetUtil.UTF_8) + ')');
    }

    final WebSocketFrame inbound = (WebSocketFrame) msg;
    if (inbound instanceof TextWebSocketFrame) {
        final String text = ((TextWebSocketFrame) inbound).text();
        eventBus.post(new Response(text));
        return;
    }
    if (inbound instanceof CloseWebSocketFrame) {
        ch.close();
        eventBus.post(new Disconnected());
    }
}
/**
 * Dispatches an inbound WebSocket frame to the notification method for its type (text,
 * binary, close, ping or pong). WebSocket frames of any other type are ignored.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws UnknownWebSocketFrameTypeException if {@code msg} is not a {@link WebSocketFrame}
 * @throws ServerConnectorException declared for the notification methods
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
        throws UnknownWebSocketFrameTypeException, ServerConnectorException {
    if (!(msg instanceof WebSocketFrame)) {
        logger.error("Expecting WebSocketFrame. Unknown type.");
        throw new UnknownWebSocketFrameTypeException("Expecting WebSocketFrame. Unknown type.");
    }

    if (msg instanceof TextWebSocketFrame) {
        notifyTextMessage((TextWebSocketFrame) msg);
        return;
    }
    if (msg instanceof BinaryWebSocketFrame) {
        notifyBinaryMessage((BinaryWebSocketFrame) msg);
        return;
    }
    if (msg instanceof CloseWebSocketFrame) {
        notifyCloseMessage((CloseWebSocketFrame) msg);
        return;
    }
    if (msg instanceof PingWebSocketFrame) {
        notifyPingMessage((PingWebSocketFrame) msg);
        return;
    }
    if (msg instanceof PongWebSocketFrame) {
        notifyPongMessage((PongWebSocketFrame) msg);
    }
}
@Override protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { if (frame instanceof TextWebSocketFrame) { // Echos the same text String text = ((TextWebSocketFrame) frame).text(); if (PING.equals(text)) { ctx.writeAndFlush(new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[]{1, 2, 3, 4}))); return; } ctx.channel().writeAndFlush(new TextWebSocketFrame(text)); } else if (frame instanceof BinaryWebSocketFrame) { ctx.channel().writeAndFlush(frame.retain()); } else if (frame instanceof CloseWebSocketFrame) { ctx.close(); } else { String message = "unsupported frame type: " + frame.getClass().getName(); throw new UnsupportedOperationException(message); } }
/**
 * Writes an error response to the channel as a JSON text frame.
 *
 * <p>The JSON map is built with {@code fput} for "error" and "rid" and {@code sfput} for
 * "session" and "trace", where "trace" is the printed stack trace of {@code t}, or
 * {@code null} when no throwable is given.
 *
 * @param ctx     the handler context to write to
 * @param rid     the request id to echo back
 * @param session the session identifier
 * @param error   the error message
 * @param t       the cause whose stack trace is included; may be {@code null}
 */
private static void sendWebSockError(final ChannelHandlerContext ctx, final Number rid, final String session,
                                     final String error, final Throwable t) {
    final String stackTraceText;
    if (t == null) {
        stackTraceText = null;
    } else {
        // Render the stack trace into an in-memory string.
        final StringWriter traceBuffer = new StringWriter();
        t.printStackTrace(new PrintWriter(traceBuffer, true));
        stackTraceText = traceBuffer.toString();
    }

    ctx.writeAndFlush(new TextWebSocketFrame(
            JSON.serializeToBuf(
                    FluentMap.newMap(MapType.LINK, String.class, Object.class)
                            .fput("error", error)
                            .fput("rid", rid)
                            .sfput("session", session)
                            .sfput("trace", stackTraceText)
                            .asMap(LinkedHashMap.class)
            )
    ));
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // Check for closing frame if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } if (!(frame instanceof TextWebSocketFrame)) { throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass() .getName())); } // Send the uppercase string back. String request = ((TextWebSocketFrame) frame).text(); if (logger.isLoggable(Level.FINE)) { logger.fine(String.format("%s received %s", ctx.channel(), request)); } ctx.channel().write(new TextWebSocketFrame(request.toUpperCase())); }
private void performSend(byte[] raw) throws IOException { if (this.outBuf != null) { this.outBuf.write(raw); raw = this.outBuf.toByteArray(); this.outBuf = null; } //char[] encoded = Base64.encode(raw); if (this.binary) { this.ctx.channel().write(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(raw))); } else { this.ctx.channel().write(new TextWebSocketFrame(StringUtil.toUtfString(raw))); } }
/**
 * Handles an inbound WebSocket frame and remembers the context it arrived on.
 *
 * <p>A close frame unsubscribes this handler from the server and starts the closing
 * handshake. A ping is answered with a pong carrying the same payload. A text frame is
 * delegated to {@code handleTextWebSocketFrame}. Any other frame type closes the connection.
 *
 * <p>The close frame and the ping payload are retained before being written out: each write
 * releases what it is given, and the frame is released again after this method returns if
 * the handler auto-releases inbound messages. Without the extra reference the buffer would
 * be released twice.
 * NOTE(review): assumes the enclosing handler auto-releases inbound frames after
 * {@code channelRead0} (the default for {@code SimpleChannelInboundHandler}) — confirm.
 *
 * @param ctx   the handler context
 * @param frame the inbound frame
 * @throws Exception whatever the text-frame handling propagates
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
    this.last = ctx;
    if (frame instanceof CloseWebSocketFrame) {
        this.log.debug("recevied close frame");
        this.server.unsubscribe(this);
        // Retained because the handshaker writes (and thereby releases) the frame.
        this.handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
    } else if (frame instanceof PingWebSocketFrame) {
        this.log.debug("recevied ping frame");
        // Retained because the pong shares the ping's payload buffer.
        ctx.write(new PongWebSocketFrame(frame.content().retain()));
    } else if (frame instanceof TextWebSocketFrame) {
        this.log.debug("recevied text frame");
        this.handleTextWebSocketFrame(ctx, (TextWebSocketFrame) frame);
    } else {
        this.log.info("recevied unknown incompatible frame");
        ctx.close();
    }
}
/**
 * Encodes outbound messages as text WebSocket frames.
 *
 * <p>A {@link String} is wrapped in a JSON object under the key "desc"; a {@code JsonNode}
 * is sent as its {@code toString()} form. Any other message is passed through unchanged.
 *
 * <p>NOTE(review): the pass-through branch adds {@code msg} without retaining it; if
 * reference-counted messages can reach this branch, confirm the ownership expectations of
 * the enclosing encoder.
 *
 * @param ctx the handler context
 * @param msg the outbound message
 * @param out receives the single encoded message
 * @throws Exception declared by the overridden method
 */
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
    final Object outbound;
    if (msg instanceof String) {
        final ObjectNode wrapper = objectMapper.createObjectNode();
        wrapper.put("desc", (String) msg);
        outbound = new TextWebSocketFrame(wrapper.toString());
    } else if (msg instanceof JsonNode) {
        outbound = new TextWebSocketFrame(msg.toString());
    } else {
        outbound = msg;
    }
    out.add(outbound);
}
@Override public void channelRead0(ChannelHandlerContext ctx, Object data) { if (data instanceof FullHttpRequest) { FullHttpRequest req = (FullHttpRequest) data; if (!isWebSocket) { handleHttpRequest(ctx, req); } else { sendErrorResponse(ctx, req, BAD_REQUEST); } } else if (data instanceof TextWebSocketFrame) { handleWebSocketFrame(ctx, (TextWebSocketFrame) data); } else { // invalid data ctx.close(); } }