/**
 * Opens a websocket to {@code /stomp} on localhost:8080 offering the STOMP sub-protocols, writes a
 * CONNECT frame and waits (up to 10 seconds) until a frame has been received and no error was
 * recorded. The most recently received frame must then be a CONNECTED frame with the expected headers.
 */
@Test
public void testConnection() {
  AtomicReference<Throwable> failure = new AtomicReference<>();
  AtomicReference<Buffer> received = new AtomicReference<>();
  AtomicReference<WebSocket> openedSocket = new AtomicReference<>();

  MultiMap subProtocols = MultiMap.caseInsensitiveMultiMap()
      .add("Sec-WebSocket-Protocol", "v10.stomp, v11.stomp, v12.stomp");

  vertx.createHttpClient().websocket(8080, "localhost", "/stomp", subProtocols, ws -> {
    openedSocket.set(ws);
    ws.exceptionHandler(failure::set);
    ws.handler(received::set);
    ws.write(new Frame(
        Frame.Command.CONNECT,
        Headers.create("accept-version", "1.2,1.1,1.0", "heart-beat", "10000,10000"),
        null).toBuffer());
  });

  await().atMost(10, TimeUnit.SECONDS)
      .until(() -> failure.get() == null && received.get() != null);

  assertThat(received.get().toString())
      .startsWith("CONNECTED")
      .contains("server:vertx-stomp", "heart-beat:", "session:", "version:1.2");

  openedSocket.get().close();
}
/**
 * Sends a single final text frame over a raw websocket to an echoing server and checks that the
 * reply seen by the client equals the text that was sent.
 */
@Test
public void testTextFrameRawWebSocket() throws InterruptedException {
  final String path = "/textecho";
  final String payload = "hello";
  setupSockJsServer(path, this::echoRequest);

  AtomicReference<String> reply = new AtomicReference<>();
  WebSocket socket = setupRawWebsocketClient(path);
  socket.handler(buffer -> reply.set(buffer.toString()));
  socket.writeFrame(WebSocketFrame.textFrame(payload, true));

  await(5, TimeUnit.SECONDS);

  assertEquals("Client reply should have matched request", payload, reply.get());
}
/**
 * Sends one text frame through the SockJS websocket transport to an echoing server and expects
 * exactly two collected messages: the echoed payload prefixed with {@code a}, then the close reply.
 */
@Test
public void testTextFrameSockJs() throws InterruptedException {
  final String path = "/text-sockjs";
  setupSockJsServer(path, this::echoRequest);

  List<Buffer> collected = new ArrayList<>();
  WebSocket socket = setupSockJsClient(path, collected);

  final String outgoing = "[\"testMessage\"]";
  socket.writeFrame(WebSocketFrame.textFrame(outgoing, true));
  await(5, TimeUnit.SECONDS);

  assertEquals("Client should have received 2 messages: the reply and the close.", 2, collected.size());
  assertEquals("Client reply should have matched request", Buffer.buffer("a" + outgoing), collected.get(0));
  assertEquals("Final message should have been a close", SOCKJS_CLOSE_REPLY, collected.get(1));
}
/**
 * Opens a websocket on {@code serverPath + "/000/000/websocket"} and installs a data handler on it.
 * <p>
 * The handler treats a reply equal to {@code "o"} as the open signal and releases the caller; every
 * other reply is appended to {@code receivedMessagesCollector}. The end handler completes the test
 * and the exception handler fails it.
 *
 * @param serverPath                path prefix the server was registered under
 * @param receivedMessagesCollector list that receives every reply other than the {@code "o"} frame
 * @return the opened websocket
 * @throws InterruptedException if the calling thread is interrupted while waiting for the open signal
 * @throws AssertionError       if the open signal is not received within 5 seconds
 */
private WebSocket setupSockJsClient(String serverPath, List<Buffer> receivedMessagesCollector)
    throws InterruptedException {
  String requestURI = serverPath + "/000/000/websocket";
  AtomicReference<WebSocket> openedWebSocketReference = new AtomicReference<>();
  CountDownLatch openSocketCountDown = new CountDownLatch(1);

  client.websocket(requestURI, ws -> {
    openedWebSocketReference.set(ws);
    ws.handler(replyBuffer -> {
      log.debug("Client received " + replyBuffer);
      String textReply = replyBuffer.toString();
      if ("o".equals(textReply)) {
        openSocketCountDown.countDown();
      } else {
        receivedMessagesCollector.add(replyBuffer);
      }
    });
    ws.endHandler(v -> testComplete());
    ws.exceptionHandler(this::fail);
  });

  // await() returns false on timeout; surface that here instead of handing back a socket that is
  // null or not yet opened and letting the caller fail later with a less helpful error.
  if (!openSocketCountDown.await(5, TimeUnit.SECONDS)) {
    throw new AssertionError("Timed out waiting for the SockJS open frame on " + requestURI);
  }
  return openedWebSocketReference.get();
}
/**
 * Opens a websocket on {@code serverPath + "/websocket"} without installing a data handler; the
 * caller is expected to register its own. The end handler completes the test and the exception
 * handler fails it.
 *
 * @param serverPath path prefix the server was registered under
 * @return the opened websocket
 * @throws InterruptedException if the calling thread is interrupted while waiting for the connection
 * @throws AssertionError       if the websocket is not opened within 5 seconds
 */
private WebSocket setupRawWebsocketClient(String serverPath) throws InterruptedException {
  String requestURI = serverPath + "/websocket";
  AtomicReference<WebSocket> openedWebSocketReference = new AtomicReference<>();
  CountDownLatch openSocketCountDown = new CountDownLatch(1);

  client.websocket(requestURI, ws -> {
    openedWebSocketReference.set(ws);
    ws.endHandler(v -> testComplete());
    ws.exceptionHandler(this::fail);
    // Release the waiting thread only after the handlers above are in place, so the caller never
    // observes a socket that is still being configured.
    openSocketCountDown.countDown();
  });

  // await() returns false on timeout; fail here rather than returning null to the caller.
  if (!openSocketCountDown.await(5, TimeUnit.SECONDS)) {
    throw new AssertionError("Timed out waiting for the websocket to open on " + requestURI);
  }
  return openedWebSocketReference.get();
}
/**
 * Writes a PING frame to the given websocket. An {@link IllegalStateException} raised while doing
 * so is logged and not propagated.
 *
 * @param ws websocket to ping
 */
private void sendHeartbeat(WebSocket ws) {
  try {
    final WebSocketFrameImpl ping = new WebSocketFrameImpl(FrameType.PING);
    ws.writeFrame(ping);
  } catch (IllegalStateException heartbeatFailure) {
    LOGGER.error("heartbeat fail", heartbeatFailure);
  }
}
/**
 * Passes the websocket-connected notification to {@code unmap2WithResult}, which is given the
 * endpoint and socket metric maps plus a callback that invokes {@code connected} on a metrics
 * instance with its endpoint/socket metric and the websocket.
 *
 * @return whatever {@code unmap2WithResult} produces
 */
@Override
public Map<HttpClientMetrics, ?> connected(Map<HttpClientMetrics, ?> endpointMetric,
                                           Map<HttpClientMetrics, ?> socketMetric,
                                           WebSocket webSocket) {
  return unmap2WithResult(
      endpointMetric,
      socketMetric,
      (metrics, endpoint, socket) -> metrics.connected(endpoint, socket, webSocket));
}
/**
 * Increments the {@code websockets.connected} counter and returns a new stop watch on which the
 * {@code websocket} task has been started.
 *
 * @return the started stop watch, used as the websocket metric
 */
@Override
public StopWatch connected(StopWatch endpointWatch, StopWatch socketWatch, WebSocket webSocket) {
  counterService.increment("websockets.connected");

  final StopWatch watch = new StopWatch();
  watch.start("websocket");
  return watch;
}
/**
 * Bumps the websocket connection counter, publishes its new value under
 * {@code SENSISION_CLASS_WEBSOCKETS} and increments the connected-count metric.
 *
 * @return always {@code null}
 */
@Override
public SocketAddress connected(Void aVoid, SocketAddress socketAddress, WebSocket webSocket) {
  final long openConnections = wsConnections.incrementAndGet();
  setMetric(SENSISION_CLASS_WEBSOCKETS, defaultLabels, openConnections);
  incrementMetric(SENSISION_CLASS_WEBSOCKET_CONNECTED_COUNT, defaultLabels);
  return null;
}
/**
 * A STOMP client (port 61613) subscribes to {@code foo}; once the destination exists on the server,
 * a websocket client connects to {@code /stomp} on port 8080 and, after receiving CONNECTED, sends
 * {@code hello} to {@code foo}. The STOMP subscriber must receive the matching MESSAGE frame.
 */
@Test
public void testSendingAMessage() {
  AtomicReference<Throwable> failure = new AtomicReference<>();
  AtomicReference<Frame> delivered = new AtomicReference<>();
  AtomicReference<WebSocket> openedSocket = new AtomicReference<>();
  AtomicReference<StompClientConnection> subscriber = new AtomicReference<>();

  clients.add(StompClient.create(vertx).connect(61613, "localhost", ar -> {
    // The subscriber is only published once the subscription has been acknowledged.
    ar.result().subscribe("foo", delivered::set, ack -> subscriber.set(ar.result()));
  }));

  await().atMost(10, TimeUnit.SECONDS).until(() -> subscriber.get() != null);
  await().atMost(10, TimeUnit.SECONDS).until(() -> server.stompHandler().getDestination("foo") != null);

  MultiMap subProtocols = MultiMap.caseInsensitiveMultiMap()
      .add("Sec-WebSocket-Protocol", "v10.stomp, v11.stomp, v12.stomp");

  vertx.createHttpClient().websocket(8080, "localhost", "/stomp", subProtocols, ws -> {
    openedSocket.set(ws);
    ws.exceptionHandler(failure::set);
    ws.handler(buffer -> {
      if (buffer.toString().startsWith("CONNECTED")) {
        ws.write(new Frame(
            Frame.Command.SEND,
            Headers.create("header", "value", "destination", "foo"),
            Buffer.buffer("hello")).toBuffer());
      }
    });
    ws.write(new Frame(
        Frame.Command.CONNECT,
        Headers.create("accept-version", "1.2,1.1,1.0", "heart-beat", "10000,10000"),
        null).toBuffer());
  });

  await().atMost(10, TimeUnit.SECONDS)
      .until(() -> failure.get() == null && delivered.get() != null);

  assertThat(delivered.get().toString())
      .startsWith("MESSAGE")
      .contains("destination:foo", "header:value", "\nhello");

  openedSocket.get().close();
}
/**
 * Connects over the websocket bridge with a {@code heart-beat:100,0} header. Each buffer received
 * schedules a one-second timer that raises a flag; the test passes once the flag is set and no
 * error has been recorded, within 10 seconds.
 */
@Test
public void testPingFromServer() {
  AtomicReference<Throwable> failure = new AtomicReference<>();
  AtomicReference<WebSocket> openedSocket = new AtomicReference<>();
  AtomicReference<Boolean> timerFired = new AtomicReference<>();
  AtomicReference<StompClientConnection> stompConnection = new AtomicReference<>();

  clients.add(StompClient.create(vertx)
      .connect(61613, "localhost", ar -> stompConnection.set(ar.result())));
  await().atMost(10, TimeUnit.SECONDS).until(() -> stompConnection.get() != null);

  MultiMap subProtocols = MultiMap.caseInsensitiveMultiMap()
      .add("Sec-WebSocket-Protocol", "v10.stomp, v11.stomp, v12.stomp");

  vertx.createHttpClient().websocket(8080, "localhost", "/stomp", subProtocols, ws -> {
    openedSocket.set(ws);
    ws.exceptionHandler(failure::set);
    ws.handler(buffer -> vertx.setTimer(1000, timerId -> timerFired.set(true)));
    ws.write(new Frame(
        Frame.Command.CONNECT,
        Headers.create("accept-version", "1.2,1.1,1.0", "heart-beat", "100,0"),
        null).toBuffer());
  });

  await().atMost(10, TimeUnit.SECONDS)
      .until(() -> failure.get() == null && timerFired.get() != null);

  openedSocket.get().close();
}
/**
 * Writes a JSON envelope with the fields {@code type}, {@code address} and {@code body} to the
 * websocket.
 *
 * @param ws      websocket to write to
 * @param type    value of the {@code type} field
 * @param address value of the {@code address} field
 * @param msg     value of the {@code body} field
 */
protected static void sendTypeToBridge(WebSocket ws, String type, String address, String msg) {
  JsonObject envelope = new JsonObject()
      .put("type", type)
      .put("address", address)
      .put("body", msg);
  ws.write(Buffer.buffer(envelope.toString()));
}
/**
 * Increments the websocket connection count on the measurements registered for {@code key}, if
 * there are any.
 *
 * @return {@code key}, unchanged
 */
@Override
public SocketAddress connected(Void endpointMetric, SocketAddress key, WebSocket webSocket) {
  final HttpClientConnectionsMeasurements forAddress = connectionsMeasurements.get(key);
  if (forAddress == null) {
    return key;
  }
  forAddress.incrementWsConnectionCount();
  return key;
}
/**
 * Writes one binary frame followed by two continuation frames from the client; the server side
 * should see a single message equal to the re-combined payload.
 */
@Test
public void testCombineBinaryContinuationFramesRawWebSocket() throws InterruptedException {
  final String path = "/combine";
  AtomicReference<Buffer> seenByServer = new AtomicReference<>();
  setupSockJsServer(path, (sock, requestBuffer) -> {
    seenByServer.set(requestBuffer);
    sock.write(Buffer.buffer("reply"));
    sock.close();
  });

  Buffer payload = Buffer.buffer(TestUtils.randomAlphaString(30));
  WebSocketFrame head = WebSocketFrame.binaryFrame(payload.slice(0, 10), false);
  WebSocketFrame middle = WebSocketFrame.continuationFrame(payload.slice(10, 20), false);
  WebSocketFrame tail = WebSocketFrame.continuationFrame(payload.slice(20, payload.length()), true);

  WebSocket socket = setupRawWebsocketClient(path);
  socket.writeFrame(head);
  socket.writeFrame(middle);
  socket.writeFrame(tail);

  await(5, TimeUnit.SECONDS);

  assertEquals("Server did not combine continuation frames correctly", payload, seenByServer.get());
}
/**
 * The server answers with a reply of {@code 65536 * 5} random characters. The client concatenates
 * everything its handler receives; the concatenation must equal the server reply and must have
 * arrived in more than one piece.
 */
@Test
public void testSplitLargeReplyRawWebSocket() throws InterruptedException {
  final String path = "/split";
  Buffer serverReply = Buffer.buffer(TestUtils.randomAlphaString(65536 * 5));
  setupSockJsServer(path, (sock, requestBuffer) -> {
    sock.write(serverReply);
    sock.close();
  });

  Buffer collected = Buffer.buffer(serverReply.length());
  AtomicInteger pieces = new AtomicInteger(0);

  WebSocket socket = setupRawWebsocketClient(path);
  socket.handler(part -> {
    collected.appendBuffer(part);
    pieces.incrementAndGet();
  });
  socket.writeFrame(WebSocketFrame.binaryFrame(Buffer.buffer("hello"), true));

  await(5, TimeUnit.SECONDS);

  int receivedReplyCount = pieces.get();
  assertEquals("Combined reply on client should equal message from server", serverReply, collected);
  assertTrue("Should have received > 1 reply frame, actually received " + receivedReplyCount,
      receivedReplyCount > 1);
}
/**
 * Sends one SockJS message split over a text frame and two continuation frames to an echoing
 * server. The client must collect exactly two messages: the echo of the whole message prefixed
 * with {@code a}, then the close reply.
 */
@Test
public void testCombineTextFrameSockJs() throws InterruptedException {
  final String path = "/text-combine-sockjs";
  setupSockJsServer(path, this::echoRequest);

  List<Buffer> collected = new ArrayList<>();
  WebSocket socket = setupSockJsClient(path, collected);

  Buffer payload = Buffer.buffer("[\"" + TestUtils.randomAlphaString(30) + "\"]");
  WebSocketFrame head = new WebSocketFrameImpl(FrameType.TEXT, payload.slice(0, 10).getByteBuf(), false);
  WebSocketFrame middle = WebSocketFrame.continuationFrame(payload.slice(10, 20), false);
  WebSocketFrame tail = WebSocketFrame.continuationFrame(payload.slice(20, payload.length()), true);

  log.debug("Client sending " + head.textData());
  socket.writeFrame(head);
  log.debug("Client sending " + middle.textData());
  socket.writeFrame(middle);
  log.debug("Client sending " + tail.textData());
  socket.writeFrame(tail);

  await(5, TimeUnit.SECONDS);

  assertEquals("Client should have received 2 messages: the reply and the close.", 2, collected.size());
  assertEquals("Client reply should have matched request",
      Buffer.buffer("a" + payload.toString()), collected.get(0));
  assertEquals("Final message should have been a close", SOCKJS_CLOSE_REPLY, collected.get(1));
}
/**
 * The server answers a SockJS request with {@code 65536 * 2} random characters. The client must
 * collect more than two messages; all but the last, combined, must equal the reply wrapped as
 * {@code a["..."]}, and the last must be the close reply.
 */
@Test
public void testSplitLargeReplySockJs() throws InterruptedException {
  final String path = "/large-reply-sockjs";
  Buffer serverReply = Buffer.buffer(TestUtils.randomAlphaString(65536 * 2));
  setupSockJsServer(path, (sock, requestBuffer) -> {
    sock.write(serverReply);
    sock.close();
  });

  List<Buffer> collected = new ArrayList<>();
  WebSocket socket = setupSockJsClient(path, collected);
  socket.writeFrame(WebSocketFrame.textFrame("[\"hello\"]", true));

  await(5, TimeUnit.SECONDS);

  int receivedReplyCount = collected.size();
  assertTrue("Should have received > 2 reply frame, actually received " + receivedReplyCount,
      receivedReplyCount > 2);

  int lastIndex = collected.size() - 1;
  Buffer expected = Buffer.buffer("a[\"").appendBuffer(serverReply).appendBuffer(Buffer.buffer("\"]"));
  Buffer actual = combineReplies(collected.subList(0, lastIndex));
  String mismatch = String.format(
      "Combined reply on client (length %s) should equal message from server (%s)",
      actual.length(), expected.length());
  assertEquals(mismatch, expected, actual);

  assertEquals("Final message should have been a close", SOCKJS_CLOSE_REPLY, collected.get(lastIndex));
}
/**
 * Called with the connected websocket. Marks the client as connected, writes the tenant line and
 * the {@code START SESSION} line, then — with the socket paused — creates an
 * {@link EventBusWebsocketBridge}, registers close and exception handlers that stop the bridge and
 * clear the connected flag, starts the bridge and finally resumes the socket.
 *
 * @param webSocket the websocket that was just opened
 */
@Override
public void handle(WebSocket webSocket) {
  connecting = false;
  connected = true;
  logger.info("Bridge Client - connected to server [" + remoteBridgeHost + ":" + remoteBridgePort + "]");

  webSocket.write(Buffer.buffer( tenant + "\n" ));
  webSocket.write(Buffer.buffer( "START SESSION" + "\n" ));

  // Hold back incoming data until the bridge and its handlers are in place.
  webSocket.pause();

  final EventBusWebsocketBridge bridge = new EventBusWebsocketBridge(webSocket, vertx.eventBus(), address);

  webSocket.closeHandler(closed -> {
    logger.error("Bridge Client - closed connection from server [" + remoteBridgeHost + ":" + remoteBridgePort + "]" + webSocket.textHandlerID());
    bridge.stop();
    connected = false;
  });

  webSocket.exceptionHandler(cause -> {
    logger.error("Bridge Client - Exception: " + cause.getMessage(), cause);
    bridge.stop();
    connected = false;
  });

  bridge.setTenant(tenant);
  bridge.start();
  logger.info("Bridge Client - bridgeUUID: "+ bridge.getBridgeUUID());

  webSocket.resume();
}
/**
 * Starts a single-threaded scheduler that calls {@code sendHeartbeat(ws)} every
 * {@code HEARTBEAT_INTERVAL} milliseconds, with the same initial delay.
 * <p>
 * If a scheduler from an earlier call is still held in {@code heartbeatTask}, it is shut down
 * first. Otherwise each call would leave the previous thread running and sending heartbeats.
 *
 * @param ws websocket the heartbeats are written to
 */
private void startHeartBeatThread(WebSocket ws) {
  if (heartbeatTask != null) {
    heartbeatTask.shutdownNow();
  }
  heartbeatTask = Executors.newScheduledThreadPool(1);
  heartbeatTask.scheduleWithFixedDelay(
      () -> sendHeartbeat(ws), HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
}
private Handler<WebSocket> bittrexWebSocketHandler(){ JsonObject msg1 = new JsonObject().put("H", "corehub") .put("M", "SubscribeToExchangeDeltas") .put("A", new JsonArray().add(config().getString("tradingPair"))) .put("I", 0); JsonObject msg2 = new JsonObject().put("H", "corehub") .put("M", "QueryExchangeState") .put("A", new JsonArray().add(config().getString("tradingPair"))) .put("I", 1); return websocket -> { websocket.handler(data -> { //System.out.println("Received data " + data.toString("ISO-8859-1")); JsonObject msg = data.toJsonObject(); if(msg.containsKey("R") && msg.getString("I").equals("1")){ vertx.eventBus().publish(initOrderBookMessage, msg.getJsonObject("R")); } if(msg.containsKey("M") && msg.getJsonArray("M").size() > 0 && msg.getJsonArray("M").getJsonObject(0).getString("M").equals("updateExchangeState") && msg.getJsonArray("M").getJsonObject(0).containsKey("A") && msg.getJsonArray("M").getJsonObject(0).getJsonArray("A").size() > 0){ JsonObject payload = msg.getJsonArray("M").getJsonObject(0).getJsonArray("A").getJsonObject(0); //OrderBookUpdate payload = msg.getJsonArray("M").getJsonObject(0).getJsonArray("A").getJsonObject(0).mapTo(OrderBookUpdate.class); if(msg.getJsonArray("M").getJsonObject(0).getJsonArray("A").getJsonObject(0).getJsonArray("Fills").size()>0){ vertx.eventBus().publish(handleFillsMessage, payload); } vertx.eventBus().publish(updateOrderBookMessage, payload); } }); websocket.writeTextMessage(msg2.encode()); websocket.writeTextMessage(msg1.encode()); }; }
/**
 * Increments the {@code websockets} counter for the newly connected websocket.
 *
 * @return {@code socketMetric}, passed through unchanged
 */
@Override
public @Nullable Void connected(@Nullable Void endpointMetric,
                                @Nullable Void socketMetric,
                                @NotNull WebSocket webSocket) {
  websockets.increment();
  return socketMetric;
}
/**
 * Ignores websocket connections: nothing is recorded and no websocket metric is created.
 *
 * @return always {@code null}
 */
@Override
public SocketAddress connected(final Void endpointMetric,
                               final SocketAddress socketMetric,
                               final WebSocket webSocket) {
  return null;
}
/**
 * Opens a websocket on {@code getBaseUri() + "/eventbus/websocket"} using the client returned by
 * {@code getClient()}, handing both handlers to that call.
 *
 * @param wsConnect      handler passed to the websocket call for the connected socket
 * @param failureHandler handler passed to the websocket call for failures
 */
@Override
public void eventbus(Handler<WebSocket> wsConnect, Handler<Throwable> failureHandler) {
  getClient().websocket(
      getBaseUri() + "/eventbus/websocket",
      wsConnect,
      failureHandler);
}
@Override public void eventbus(Handler<WebSocket> wsConnect, Handler<Throwable> failureHandler) { // TODO Auto-generated method stub }
/**
 * A websocket client connects to {@code /stomp} on port 8080 and subscribes to {@code foo} after
 * receiving CONNECTED. Once the destination exists, a STOMP client (port 61613) sends {@code hello}
 * to it. The first non-CONNECTED frame seen on the websocket must be the matching MESSAGE frame.
 */
@Test
public void testReceivingAMessage() {
  AtomicReference<Throwable> failure = new AtomicReference<>();
  AtomicReference<Buffer> delivered = new AtomicReference<>();
  AtomicReference<WebSocket> openedSocket = new AtomicReference<>();
  AtomicReference<StompClientConnection> publisher = new AtomicReference<>();

  clients.add(StompClient.create(vertx)
      .connect(61613, "localhost", ar -> publisher.set(ar.result())));
  await().atMost(10, TimeUnit.SECONDS).until(() -> publisher.get() != null);

  MultiMap subProtocols = MultiMap.caseInsensitiveMultiMap()
      .add("Sec-WebSocket-Protocol", "v10.stomp, v11.stomp, v12.stomp");

  vertx.createHttpClient().websocket(8080, "localhost", "/stomp", subProtocols, ws -> {
    openedSocket.set(ws);
    ws.exceptionHandler(failure::set);
    ws.handler(buffer -> {
      if (buffer.toString().startsWith("CONNECTED")) {
        ws.write(new Frame(
            Frame.Command.SUBSCRIBE,
            Headers.create("id", "sub-0", "destination", "foo"),
            null).toBuffer());
      } else if (delivered.get() == null) {
        // Keep only the first frame that follows the handshake.
        delivered.set(buffer);
      }
    });
    ws.write(new Frame(
        Frame.Command.CONNECT,
        Headers.create("accept-version", "1.2,1.1,1.0", "heart-beat", "10000,10000"),
        null).toBuffer());
  });

  await().atMost(10, TimeUnit.SECONDS).until(() -> server.stompHandler().getDestination("foo") != null);

  publisher.get().send("foo", Headers.create("header", "value"), Buffer.buffer("hello"));

  await().atMost(10, TimeUnit.SECONDS)
      .until(() -> failure.get() == null && delivered.get() != null);

  assertThat(delivered.get().toString())
      .startsWith("MESSAGE")
      .contains("destination:foo", "content-length:5", "header:value", "subscription:sub-0", "\nhello");

  openedSocket.get().close();
}
@Test /* Constructs a message with size == 2*MAX_WEBSOCKET_FRAME_SIZE. The message is then sent via eventBus bridge. The test then reads the message via WebSocket and makes sure that the message is delivered in three WebSocketFrames. Regression for #35 */ public void testSendingAMessageBiggerThanSocketFrameSize() { AtomicReference<Throwable> error = new AtomicReference<>(); List<WebSocketFrame> wsBuffers = new ArrayList<>(); List<Buffer> stompBuffers = new ArrayList<>(); AtomicReference<WebSocket> socket = new AtomicReference<>(); AtomicReference<StompClientConnection> client = new AtomicReference<>(); clients.add(StompClient.create(vertx).connect(61613, "localhost", connection -> { connection.result().subscribe("bigData", h-> {}, r -> { client.set(connection.result()); }); connection.result().receivedFrameHandler(stompFrame -> { if(stompFrame.toBuffer().toString().startsWith("MESSAGE")) { stompBuffers.add(stompFrame.toBuffer()); } }); })); vertx.createHttpClient().websocket(8080, "localhost", "/stomp", MultiMap.caseInsensitiveMultiMap().add ("Sec-WebSocket-Protocol", "v10.stomp, v11.stomp, v12.stomp"), ws -> { ws.exceptionHandler(error::set) .handler(buffer -> { if (buffer.toString().startsWith("CONNECTED")) { ws.write( new Frame(Frame.Command.SUBSCRIBE, Headers.create("id", "myId", "destination", "bigData"), null) .toBuffer()); return; } // Start collecting the frames once we see the first real payload message if (buffer.toString().startsWith("MESSAGE")) { ws.frameHandler(wsBuffers::add); } }) .write(new Frame(Frame.Command.CONNECT, Headers.create("accept-version", "1.2,1.1,1.0", "heart-beat", "10000,10000"), null).toBuffer()); socket.set(ws); }); // Create content that is slightly bigger than the size of a single web socket frame String bufferContent = StringUtils.repeat("*", 2 * MAX_WEBSOCKET_FRAME_SIZE); await().atMost(10, TimeUnit.SECONDS).until(() -> client.get() != null); await().atMost(10, TimeUnit.SECONDS).until(() -> socket.get() != null); 
vertx.eventBus().publish("bigData",bufferContent); await().atMost(10, TimeUnit.SECONDS).until(() -> error.get() == null && stompBuffers.size() == 1); await().atMost(10, TimeUnit.SECONDS).until(() -> error.get() == null && wsBuffers.size() == 3); // STOMP message has 2048 bytes of payload + headers => 2167 bytes assertEquals(2167, stompBuffers.get(0).getBytes().length); // We expect two complete frames + 1 with 116 bytes assertEquals(MAX_WEBSOCKET_FRAME_SIZE, wsBuffers.get(0).binaryData().getBytes().length); assertEquals(MAX_WEBSOCKET_FRAME_SIZE, wsBuffers.get(1).binaryData().getBytes().length); assertEquals(116, wsBuffers.get(2).binaryData().getBytes().length); socket.get().close(); }
/**
 * Restarts the STOMP server with port {@code -1}, the websocket bridge enabled and the websocket
 * path {@code /something}, served by a new HTTP server on port 8080. One websocket client then
 * subscribes to {@code foo} and a second one sends {@code hello} to it; the first non-CONNECTED
 * frame seen by the subscriber must be the matching MESSAGE frame.
 */
@Test
public void testWebSocketsWhenTCPDisabled() {
  AsyncLock<Void> stompClosed = new AsyncLock<>();
  server.close(stompClosed.handler());
  stompClosed.waitForSuccess();

  AsyncLock<Void> httpClosed = new AsyncLock<>();
  http.close(httpClosed.handler());
  httpClosed.waitForSuccess();

  server = StompServer.create(vertx,
          new StompServerOptions().setWebsocketBridge(true).setPort(-1).setWebsocketPath("/something"))
      .handler(StompServerHandler.create(vertx));

  AsyncLock<HttpServer> httpStarted = new AsyncLock<>();
  http = vertx.createHttpServer()
      .websocketHandler(server.webSocketHandler())
      .listen(8080, httpStarted.handler());
  httpStarted.waitForSuccess();

  AtomicReference<Throwable> failure = new AtomicReference<>();
  AtomicReference<WebSocket> senderSocket = new AtomicReference<>();
  AtomicReference<WebSocket> receiverSocket = new AtomicReference<>();
  AtomicReference<Buffer> delivered = new AtomicReference<>();

  // Subscriber side.
  vertx.createHttpClient().websocket(8080, "localhost", "/something",
      MultiMap.caseInsensitiveMultiMap().add("Sec-WebSocket-Protocol", "v10.stomp, v11.stomp, v12.stomp"),
      ws -> {
        receiverSocket.set(ws);
        ws.exceptionHandler(failure::set);
        ws.handler(buffer -> {
          if (buffer.toString().startsWith("CONNECTED")) {
            ws.write(new Frame(
                Frame.Command.SUBSCRIBE,
                Headers.create("id", "sub-0", "destination", "foo"),
                null).toBuffer());
          } else if (delivered.get() == null) {
            delivered.set(buffer);
          }
        });
        ws.write(new Frame(
            Frame.Command.CONNECT,
            Headers.create("accept-version", "1.2,1.1,1.0", "heart-beat", "10000,10000"),
            null).toBuffer());
      });

  await().atMost(10, TimeUnit.SECONDS).until(() -> server.stompHandler().getDestination("foo") != null);

  // Sender side.
  vertx.createHttpClient().websocket(8080, "localhost", "/something",
      MultiMap.caseInsensitiveMultiMap().add("Sec-WebSocket-Protocol", "v10.stomp, v11.stomp, v12.stomp"),
      ws -> {
        senderSocket.set(ws);
        ws.exceptionHandler(failure::set);
        ws.handler(buffer -> {
          if (buffer.toString().startsWith("CONNECTED")) {
            ws.write(new Frame(
                Frame.Command.SEND,
                Headers.create("header", "value", "destination", "foo"),
                Buffer.buffer("hello")).toBuffer());
          }
        });
        ws.write(new Frame(
            Frame.Command.CONNECT,
            Headers.create("accept-version", "1.2,1.1,1.0", "heart-beat", "10000,10000"),
            null).toBuffer());
      });

  await().atMost(10, TimeUnit.SECONDS)
      .until(() -> failure.get() == null && delivered.get() != null);

  assertThat(delivered.get().toString())
      .startsWith("MESSAGE")
      .contains("destination:foo", "header:value", "subscription:sub-0", "\nhello");

  receiverSocket.get().close();
  senderSocket.get().close();
}
/**
 * Sends a bridge message of type {@code register} for the given address over the websocket.
 *
 * @param ws      websocket to write to
 * @param address address placed in the message
 * @param msg     body placed in the message
 */
protected static void registerThroughBridge(WebSocket ws, String address, String msg) {
  final String bridgeType = "register";
  sendTypeToBridge(ws, bridgeType, address, msg);
}
/**
 * Sends a bridge message of type {@code publish} for the given address over the websocket.
 *
 * @param ws      websocket to write to
 * @param address address placed in the message
 * @param msg     body placed in the message
 */
protected static void publishThroughBridge(WebSocket ws, String address, String msg) {
  final String bridgeType = "publish";
  sendTypeToBridge(ws, bridgeType, address, msg);
}
/**
 * Sends a bridge message of type {@code send} for the given address over the websocket.
 *
 * @param ws      websocket to write to
 * @param address address placed in the message
 * @param msg     body placed in the message
 */
protected static void sendThroughBridge(WebSocket ws, String address, String msg) {
  final String bridgeType = "send";
  sendTypeToBridge(ws, bridgeType, address, msg);
}
/**
 * Asks the client reporter for a new websocket metric; none of the arguments are used.
 *
 * @return the metric created by {@code clientReporter.createWebSocketMetric()}
 */
@Override
public WebSocketMetric connected(EndpointMetric endpointMetric, Long socketMetric, WebSocket webSocket) {
  final WebSocketMetric created = clientReporter.createWebSocketMetric();
  return created;
}
/**
 * Creates an HTTP client with default options from the {@code vertx} field and opens a websocket
 * to {@code path} on localhost:8080 with the given handler.
 *
 * @param handler handler passed to the websocket call
 * @param path    request path of the websocket
 * @return the value returned by the websocket call
 */
private HttpClient getClient(final Handler<WebSocket> handler, final String path) {
  final HttpClientOptions defaults = new HttpClientOptions();
  return vertx.createHttpClient(defaults).websocket(8080, "localhost", path, handler);
}
/**
 * Creates an HTTP client with default options from {@code getVertx()} and opens a websocket to
 * {@code path} on localhost:8080 with the given handler.
 *
 * @param handler handler passed to the websocket call
 * @param path    request path of the websocket
 * @return the value returned by the websocket call
 */
private HttpClient getClient(final Handler<WebSocket> handler, final String path) {
  return getVertx()
      .createHttpClient(new HttpClientOptions())
      .websocket(8080, "localhost", path, handler);
}
/**
 * Connect to the mesh eventbus bridge via a websocket.
 *
 * @param wsConnect
 *            Handler that is given the {@link WebSocket} (presumably once the connection has been established - confirm
 *            against the implementations)
 * @param failureHandler
 *            Handler that is given a {@link Throwable} (presumably when connecting fails - confirm against the
 *            implementations)
 */
void eventbus(Handler<WebSocket> wsConnect, Handler<Throwable> failureHandler);
/**
 * Connect to the mesh eventbus bridge via a websocket without supplying a failure handler.
 * Delegates to the two-argument variant with {@code null} as the failure handler.
 *
 * @param wsConnect
 *            Handler that is given the {@link WebSocket}
 */
default void eventbus(Handler<WebSocket> wsConnect) {
  final Handler<Throwable> noFailureHandler = null;
  eventbus(wsConnect, noFailureHandler);
}