/**
 * Builds the handler that receives incoming server web sockets for push traffic.
 *
 * <p>The mount point is read from the configuration key {@code mountPoint} (default {@code "/"}).
 * GET and POST requests on {@code /PUSH} are registered on the {@code router} of the enclosing
 * scope (not on the {@code vaadinRouter} parameter) and routed to the Atmosphere coordinator; the
 * returned handler forwards to that same coordinator.
 *
 * @param vaadinRouter router passed on to {@code initAtmosphere}
 * @param sessionStore session store handed to the session handler builder
 * @return the built web socket session handler
 */
private Handler<ServerWebSocket> initWebSocketHandler(Router vaadinRouter, SessionStore sessionStore) {
    // NOTE(review): a lookup of the mount point through the VaadinVerticleConfiguration annotation
    // (falling back to the configuration value) was disabled here; only the configuration is used.
    final String configuredMountPoint = config().getString("mountPoint", "/");
    final String cookieName = sessionCookieName();

    final WebsocketSessionHandler.WebsocketSessionHandlerBuilder handlerBuilder =
        WebsocketSessionHandler.builder()
            .mountPoint(configuredMountPoint)
            .cookieName(cookieName)
            .sessionStore(sessionStore);

    final AtmosphereCoordinator coordinator = initAtmosphere(vaadinRouter, service);

    router.get("/PUSH").handler(coordinator::route);
    router.post("/PUSH").handler(coordinator::route);

    return handlerBuilder.next(coordinator::route).build();
}
/**
 * Removes the registry entry whose binary and text handler ids match the given web socket.
 *
 * <p>The endpoint holder is read from the local shared map {@code WS_REGISTRY}; when a matching
 * endpoint is found it is removed from the holder and the serialized holder is written back.
 * Nothing happens when no holder or no matching endpoint exists.
 *
 * @param serverSocket the web socket whose registration should be removed
 */
@Override
public void findRouteSocketInRegistryAndRemove(ServerWebSocket serverSocket) {
    final String binaryHandlerID = serverSocket.binaryHandlerID();
    final String textHandlerID = serverSocket.textHandlerID();
    final LocalMap<String, byte[]> wsRegistry = this.vertx.sharedData().getLocalMap(WS_REGISTRY);
    final WSEndpointHolder holder = getWSEndpointHolderFromSharedData(wsRegistry);
    if (holder == null) {
        return;
    }

    // Sequential stream: a parallel stream only adds fork/join overhead to this short lookup.
    holder.getAll().stream()
        .filter(e -> e.getBinaryHandlerId().equals(binaryHandlerID)
            && e.getTextHandlerId().equals(textHandlerID))
        .findFirst()
        .ifPresent(endpoint -> {
            holder.remove(endpoint);
            wsRegistry.replace(WS_ENDPOINT_HOLDER, serialize(holder));
            log("OK REMOVE: " + serverSocket.binaryHandlerID());
        });
}
private void sendToWSService(final ServerWebSocket serverSocket, final String path, final WSEndpoint endpoint) { final EventBus eventBus = vertx.eventBus(); serverSocket.handler(handler -> { try { log("send WS:+ " + endpoint.getUrl()); eventBus.send(path, Serializer.serialize(new WSDataWrapper(endpoint, handler.getBytes())), new DeliveryOptions().setSendTimeout(GlobalKeyHolder.DEFAULT_SERVICE_TIMEOUT)); } catch (IOException e) { e.printStackTrace(); } } ); serverSocket.resume(); //TODO set close handler!! }
@Override public void start() throws Exception { //TODO: Fix a better way of configuration other than system properties? Integer port = Integer.getInteger("websocket.port", 5556); ObservableFuture<HttpServer> httpServerObservable = RxHelper.observableFuture(); HttpServer httpServer = vertx.createHttpServer(new HttpServerOptions().setPort(port)); httpServerObservable.subscribe( a -> log.info("Starting web socket listener..."), e -> log.error("Could not start web socket listener at port " + port, e), () -> log.info("Started web socket listener on port " + port) ); Observable<Tup2<ServerWebSocket, Func1<Event, Boolean>>> eventObservable = EventObservable.convertFromWebSocketObservable(RxHelper.toObservable(httpServer.websocketStream())); eventObservable.subscribe(new EventToJsonAction(Riemann.getEvents(vertx), WebSocketFrameImpl::new), e -> { log.error(e); //TODO: Fix proper error handling }); httpServer.listen(httpServerObservable.asHandler()); }
/**
 * Subscribes the given web socket to the event stream.
 *
 * <p>Events accepted by the pair's filter are converted to a JSON object, encoded, turned into a
 * frame through {@code frameFactory} and written to the socket. The subscription is cancelled
 * when the socket's close handler fires.
 *
 * @param r pair of the web socket (left) and the event filter (right)
 */
@Override
public void call(Tup2<ServerWebSocket, Func1<Event, Boolean>> r) {
    final ServerWebSocket socket = r.getLeft();
    final Func1<Event, Boolean> eventFilter = r.getRight();

    final Subscription subscription = eventObservable
        .filter(eventFilter)
        .map(event -> {
            JsonObject json = new JsonObject();
            json.put("tags", event.getTags());
            json.put("host", event.getHost());
            json.put("state", event.getState());
            json.put("service", event.getService());
            json.put("description", event.getDescription());
            json.put("metric", event.getMetric());
            json.put("time", event.getTime());
            json.put("ttl", event.getTtl());
            return json;
        })
        .subscribe(json -> socket.writeFrame(frameFactory.call(json.encode())));

    socket.closeHandler(closed -> subscription.unsubscribe());
}
/**
 * Handles a web socket connection.
 *
 * <p>Connections whose URI is not accepted by the configuration are rejected. Accepted
 * connections are wrapped in a {@code Socket}, registered with the dispatcher under the socket
 * path, and unregistered again when the socket closes. Received data is forwarded to the
 * dispatcher as a byte array.
 *
 * @param socket the opening socket.
 */
@Override
public void handle(final ServerWebSocket socket) {
    LOGGER.info("New web socket connection {}, {}", socket, socket.uri());

    if (!configuration.accept(socket.uri())) {
        LOGGER.warn("Web Socket connection denied on {} by {}", socket.uri(), configuration.name());
        // Returning without rejecting lets Vert.x complete the handshake, which would leave the
        // client connected to a socket nobody listens on. Refuse the handshake explicitly.
        socket.reject();
        return;
    }

    final Socket sock = new Socket(socket);
    accessor.getDispatcher().addSocket(socket.path(), sock);

    socket.closeHandler(event -> {
        LOGGER.info("Web Socket closed {}, {}", socket, socket.uri());
        accessor.getDispatcher().removeSocket(socket.path(), sock);
    });

    socket.handler(event -> accessor.getDispatcher().received(socket.path(), event.getBytes(), sock));
}
/**
 * Records the upgrade of a request to a web socket.
 *
 * <p>Stops the request watch, increments the {@code requests.upgraded} counter and starts the
 * same watch again with the argument {@code "websocket"}.
 *
 * @param requestWatch    the watch that measured the HTTP request
 * @param serverWebSocket the upgraded web socket (not used here)
 * @return the same watch instance that was passed in
 */
@Override
public StopWatch upgrade(StopWatch requestWatch, ServerWebSocket serverWebSocket) {
    requestWatch.stop();

    counterService.increment("requests.upgraded");

    requestWatch.start("websocket");
    return requestWatch;
}
/**
 * Records a newly connected web socket.
 *
 * <p>Increments the {@code websockets.connected} counter and returns a fresh watch that has been
 * started with the argument {@code "websocket"}.
 *
 * @param socketMetric    the metric of the underlying socket (not used here)
 * @param serverWebSocket the connected web socket (not used here)
 * @return a newly created, already started watch
 */
@Override
public StopWatch connected(StopWatch socketMetric, ServerWebSocket serverWebSocket) {
    counterService.increment("websockets.connected");

    final StopWatch watch = new StopWatch();
    watch.start("websocket");
    return watch;
}
/**
 * Checks that setting a web socket handler returns the server itself and that the getter hands
 * back the handler that was set.
 */
@Test
public void testWebsocketHandler(TestContext context) {
    // A do-nothing handler is enough; only its identity matters for the assertions.
    Handler<ServerWebSocket> noOpHandler = socket -> { };

    context.assertEquals(server, server.websocketHandler(noOpHandler));
    context.assertEquals(noOpHandler, server.websocketHandler());
}
/**
 * Resolves the session that belongs to an incoming web socket and passes both to {@code next}.
 *
 * <p>Sockets whose path does not start with {@code <basePath>/PUSH} are rejected and not
 * processed any further. Otherwise the session id is taken from the cookie named
 * {@code cookieName}; when present, the session is looked up in the session store and
 * {@code next} is invoked with the result ({@code null} if the lookup failed). Without a cookie
 * header or a matching cookie, {@code next} is invoked with a {@code null} session.
 *
 * @param serverWebSocket the incoming web socket
 */
@Override
public void handle(ServerWebSocket serverWebSocket) {
    String basePath = Optional.ofNullable(mountPoint)
        .map(m -> m.substring(0, m.lastIndexOf('/')))
        .orElse("");

    if (!serverWebSocket.path().startsWith(basePath + "/PUSH")) {
        serverWebSocket.reject();
        // Stop here: a rejected socket must not be handed on to the next handler.
        return;
    }

    String cookieHeader = serverWebSocket.headers().get(COOKIE);
    if (cookieHeader != null) {
        Optional<String> sessionId = ServerCookieDecoder.STRICT.decode(cookieHeader).stream()
            .filter(cookie -> cookieName.equals(cookie.name()))
            .findFirst()
            .map(Cookie::value);
        if (sessionId.isPresent()) {
            sessionStore.get(sessionId.get(), event -> {
                // A failed lookup is treated like a missing session.
                Session session = event.succeeded() ? event.result() : null;
                next.accept(serverWebSocket, session);
            });
            return;
        }
    }

    next.accept(serverWebSocket, null);
}
@Override public void start(Future<Void> fut) { vertx.createHttpServer().websocketHandler(new Handler<ServerWebSocket>() { public void handle(final ServerWebSocket ws) { final String id = ws.textHandlerID(); System.out.println("new connection from"+ ws.toString() + "id "+id); vertx.eventBus().consumer("chat",message -> { ws.writeFinalTextFrame((String) message.body()); }); ws.handler(new Handler<Buffer>() { public void handle(Buffer data) { // When our websocket receive data we publish it to our consumer vertx.eventBus().publish("chat",data.toString()); } }); ws.closeHandler(handler ->{ System.out.println("Close WS "); }); }} ).requestHandler(new Handler<HttpServerRequest>() { public void handle(HttpServerRequest req) { req.response().end("Chat"); //Not usefull but it display chat on our browser } }).listen(8080); }
/**
 * Records a newly connected web socket.
 *
 * <p>Increments the connection counter, publishes its new value under
 * {@code SENSISION_CLASS_WEBSOCKETS} and increments the
 * {@code SENSISION_CLASS_WEBSOCKET_CONNECTED_COUNT} metric.
 *
 * @param socketMetric    unused
 * @param serverWebSocket the connected web socket (not used here)
 * @return always {@code null}
 */
@Override
public Void connected(Void socketMetric, ServerWebSocket serverWebSocket) {
    final long openConnections = wsConnections.incrementAndGet();

    setMetric(SENSISION_CLASS_WEBSOCKETS, defaultLabels, openConnections);
    incrementMetric(SENSISION_CLASS_WEBSOCKET_CONNECTED_COUNT, defaultLabels);

    return null;
}
/**
 * Creates a write stream on top of the given web socket.
 *
 * <p>The data format starts out as {@code DataFormat.Text} and the stream is initially not
 * waiting for a drain. The internal buffer is created with {@code maxFrameSize + 1024} as its
 * size argument.
 *
 * @param socket       the web socket to write to
 * @param maxFrameSize the maximum frame size
 */
public WebsocketWriteStream(ServerWebSocket socket, int maxFrameSize) {
    _socket = socket;
    _maxFrameSize = maxFrameSize;
    _buffer = Buffer.buffer(maxFrameSize + 1024);

    _format = DataFormat.Text;
    _waitForDrain = false;
}
/**
 * Creates a client connection for a web socket.
 *
 * <p>Sets up the client session and the message receiver (using a Jackson message decoder),
 * then initialises the sending pump and registers the handlers. The message format starts out
 * as {@code DataFormat.Text}.
 *
 * @param executorService executor stored for this connection
 * @param websocket       the client's web socket
 * @param connectionPool  backend connection pool handed to the client session
 * @param maxFrameSize    the maximum frame size
 */
public VertxClientConnection(ExecutorService executorService,
                             ServerWebSocket websocket,
                             BackendConnectionPool connectionPool,
                             int maxFrameSize) {
    // Plain state first; the session below receives a reference to this connection.
    _socket = websocket;
    _executorService = executorService;
    _maxFrameSize = maxFrameSize;
    _messageFormat = DataFormat.Text;

    _session = new ClientSession(connectionPool, this);
    _msgReceiver = new MessageReceiver(_session, new JacksonMessageDecoder());
    logger = Logger.getGlobal();

    initSendingPump();
    registerHandlers();

    logger.log(Level.INFO, "New client connected via websocket.");
}
/**
 * Creates the handler that bridges web socket connections to the STOMP server handler.
 *
 * <p>Connections on a path other than the configured web socket path are rejected. For accepted
 * sockets a STOMP connection and a frame parser are created; parsed frames are passed to the
 * STOMP handler, parse errors are answered with an error frame and close the connection, and
 * socket errors or the end of the socket close the connection as well.
 *
 * @return the web socket handler, or {@code null} when the web socket bridge is disabled
 */
@Override
public Handler<ServerWebSocket> webSocketHandler() {
    if (!options.isWebsocketBridge()) {
        return null;
    }

    // Read the current handler under the lock; the lambda below keeps using this snapshot.
    final StompServerHandler stomp;
    synchronized (this) {
        stomp = this.handler;
    }

    return socket -> {
        final boolean pathMatches = socket.path().equals(options.getWebsocketPath());
        if (!pathMatches) {
            LOGGER.error("Receiving a web socket connection on an invalid path (" + socket.path()
                + "), the path is configured to " + options.getWebsocketPath()
                + ". Rejecting connection");
            socket.reject();
            return;
        }

        final StompServerConnection connection =
            new StompServerWebSocketConnectionImpl(socket, this, writingFrameHandler);
        final FrameParser parser = new FrameParser(options);

        socket.exceptionHandler(error -> {
            LOGGER.error("The STOMP server caught a WebSocket error - closing connection", error);
            connection.close();
        });
        socket.endHandler(ended -> connection.close());

        parser
            .errorHandler(error -> {
                connection.write(Frames.createInvalidFrameErrorFrame(error));
                connection.close();
            })
            .handler(frame -> stomp.handle(new ServerFrameImpl(frame, connection)));

        socket.handler(parser);
    };
}
private void handleFrame(final ServerWebSocket webSocket, final WebSocketFrame frame) { // Deserializing received message: final Object request = serializer.deserialize(frame.binaryData().getBytes()); if (request instanceof MyJsonMessage) { System.out.println("Received message: " + ((MyJsonMessage) request).text); } // Sending a simple response message after 1 second: final MyJsonMessage response = new MyJsonMessage(); response.id = idCounter.getAndIncrement(); response.text = "Hello client "; vertx.setTimer(1000L, id -> webSocket.writeFinalBinaryFrame(Buffer.buffer(serializer.serialize(response)))); }
/**
 * Deserializes the frame's binary data with the JSON serializer, measures how long that takes,
 * prints a summary and sends the summary back as a serialized response in a binary frame.
 *
 * @param webSocket the socket the response is written to
 * @param frame     the received frame
 */
private void handleJsonFrame(final ServerWebSocket webSocket, final WebSocketFrame frame) {
    final byte[] packet = frame.binaryData().getBytes();

    // Time only the deserialization itself.
    final long startedAt = System.nanoTime();
    final Object deserialized = jsonSerializer.deserialize(packet);
    final long elapsedNanos = System.nanoTime() - startedAt;

    final String className = deserialized.getClass().getSimpleName();
    final com.github.czyzby.shared.json.ServerResponse response =
        new com.github.czyzby.shared.json.ServerResponse();
    response.message = "Packet had " + packet.length + " bytes. Class: " + className
        + ", took " + elapsedNanos + " nanos to deserialize.";
    System.out.println(response.message);

    webSocket.writeFinalBinaryFrame(Buffer.buffer(jsonSerializer.serialize(response)));
}
/**
 * Deserializes the frame's binary data with {@code serializer}, measures how long that takes,
 * prints a summary and sends the summary back as a serialized response in a binary frame.
 *
 * @param webSocket the socket the response is written to
 * @param frame     the received frame
 */
private void handleSerializationFrame(final ServerWebSocket webSocket, final WebSocketFrame frame) {
    final byte[] packet = frame.binaryData().getBytes();

    // Time only the deserialization itself.
    final long startedAt = System.nanoTime();
    final Object deserialized = serializer.deserialize(packet);
    final long elapsedNanos = System.nanoTime() - startedAt;

    final String className = deserialized.getClass().getSimpleName();
    final String summary = "Packet had " + packet.length + " bytes. Class: " + className
        + ", took " + elapsedNanos + " nanos to deserialize.";
    final ServerResponse response = new ServerResponse(summary);
    System.out.println(response.getMessage());

    webSocket.writeFinalBinaryFrame(Buffer.buffer(serializer.serialize(response)));
}
/**
 * Looks up the service holder in the cluster-wide map {@code REGISTRY} and, once it has been
 * retrieved, continues with {@code findServiceEntryAndRegisterWS} for the given socket.
 *
 * @param serverSocket the web socket to register
 */
@Override
public void findRouteToWSServiceAndRegister(ServerWebSocket serverSocket) {
    this.vertx.sharedData().<String, ServiceInfoHolder>getClusterWideMap(
        REGISTRY,
        onSuccess(resultMap -> {
            resultMap.get(
                GlobalKeyHolder.SERVICE_HOLDER,
                onSuccess(resultHolder -> {
                    findServiceEntryAndRegisterWS(serverSocket, resultHolder);
                }));
        }));
}
/**
 * Registers the web socket when the service holder contains an entry for the socket's path.
 * Does nothing when the holder is {@code null} or no entry is found.
 *
 * @param serverSocket the web socket to register
 * @param resultHolder the service holder to search, may be {@code null}
 */
private void findServiceEntryAndRegisterWS(final ServerWebSocket serverSocket, final ServiceInfoHolder resultHolder) {
    if (resultHolder == null) {
        return;
    }

    final String path = serverSocket.path();
    log("find entry : " + path);

    final Optional<Operation> match = findServiceInfoEntry(resultHolder, path);
    if (match.isPresent()) {
        createEndpointDefinitionAndRegister(serverSocket);
    }
}
/**
 * Looks up the service holder in the cluster-wide map {@code REGISTRY} and, once it has been
 * retrieved, continues with {@code findServiceEntryAndRegisterWS} for the given socket, passing
 * the shared data instance along.
 *
 * @param serverSocket the web socket to register
 */
@Override
public void findRouteToWSServiceAndRegister(ServerWebSocket serverSocket) {
    final SharedData sharedData = this.vertx.sharedData();

    sharedData.<String, ServiceInfoHolder>getClusterWideMap(
        REGISTRY,
        onSuccess(resultMap -> {
            resultMap.get(
                GlobalKeyHolder.SERVICE_HOLDER,
                onSuccess(resultHolder -> {
                    findServiceEntryAndRegisterWS(serverSocket, resultHolder, sharedData);
                }));
        }));
}
/**
 * Fetches the endpoint holder from the cluster-wide map {@code WS_REGISTRY} and delegates the
 * removal of the given socket's entry to {@code retrieveEndpointHolderAndRemove}.
 *
 * @param serverSocket the web socket whose registration should be removed
 */
@Override
public void findRouteSocketInRegistryAndRemove(ServerWebSocket serverSocket) {
    // Capture both handler ids up front, before the asynchronous lookups start.
    final String binaryHandlerID = serverSocket.binaryHandlerID();
    final String textHandlerID = serverSocket.textHandlerID();

    this.vertx.sharedData().<String, WSEndpointHolder>getClusterWideMap(
        WS_REGISTRY,
        onSuccess(registryMap ->
            registryMap.get(
                WS_ENDPOINT_HOLDER,
                wsEndpointHolder -> retrieveEndpointHolderAndRemove(
                    serverSocket, binaryHandlerID, textHandlerID, registryMap, wsEndpointHolder))));
}
/**
 * Registers the web socket when the service holder contains an entry for the socket's path.
 * Does nothing when the holder is {@code null} or no entry is found.
 *
 * @param serverSocket the web socket to register
 * @param resultHolder the service holder to search, may be {@code null}
 * @param sharedData   shared data instance passed on to the registration
 */
private void findServiceEntryAndRegisterWS(final ServerWebSocket serverSocket, final ServiceInfoHolder resultHolder, final SharedData sharedData) {
    if (resultHolder == null) {
        return;
    }

    final String path = serverSocket.path();
    log("find entry : " + path);

    final Optional<Operation> match = findServiceInfoEntry(resultHolder, path);
    if (match.isPresent()) {
        createEndpointDefinitionAndRegister(serverSocket, sharedData);
    }
}
/**
 * Reads the endpoint holder from the registry map and, if the read succeeded, continues with
 * {@code updateWSEndpointHolder}. A failed read is ignored.
 *
 * @param serverSocket the web socket to register
 * @param registryMap  the map that stores the endpoint holder
 */
private void getEndpointHolderAndAdd(ServerWebSocket serverSocket, AsyncMap<String, WSEndpointHolder> registryMap) {
    registryMap.get(WS_ENDPOINT_HOLDER, lookup -> {
        if (!lookup.succeeded()) {
            return;
        }
        updateWSEndpointHolder(serverSocket, registryMap, lookup);
    });
}
/**
 * Builds the endpoint definition for the socket and stores it in the registry: added to the
 * existing holder when the lookup returned one, otherwise placed in a newly created holder.
 *
 * @param serverSocket     the web socket to register
 * @param registryMap      the map that stores the endpoint holder
 * @param wsEndpointHolder result of the holder lookup; its value may be {@code null}
 */
private void updateWSEndpointHolder(ServerWebSocket serverSocket, AsyncMap<String, WSEndpointHolder> registryMap, AsyncResult<WSEndpointHolder> wsEndpointHolder) {
    log("add entry: " + Thread.currentThread());

    final String path = serverSocket.path();
    final WSEndpoint endpoint =
        new WSEndpoint(serverSocket.binaryHandlerID(), serverSocket.textHandlerID(), path);
    final EventBus eventBus = vertx.eventBus();

    final WSEndpointHolder existingHolder = wsEndpointHolder.result();
    if (existingHolder == null) {
        createEntryAndAddDefinition(serverSocket, eventBus, path, endpoint, registryMap);
        return;
    }
    addDefinitionToRegistry(serverSocket, eventBus, path, endpoint, registryMap, existingHolder);
}
/**
 * Creates a new endpoint holder containing the endpoint, puts it into the registry map and,
 * when the put succeeded, starts forwarding the socket's data to the service.
 *
 * @param serverSocket the web socket being registered
 * @param eventBus     the event bus used for forwarding
 * @param path         the event bus address of the service
 * @param endpoint     the endpoint definition to store
 * @param registryMap  the map that stores the endpoint holder
 */
private void createEntryAndAddDefinition(ServerWebSocket serverSocket, EventBus eventBus, String path, WSEndpoint endpoint, AsyncMap<String, WSEndpointHolder> registryMap) {
    final WSEndpointHolder freshHolder = new WSEndpointHolder();
    freshHolder.add(endpoint);

    registryMap.put(WS_ENDPOINT_HOLDER, freshHolder, putResult -> {
        if (!putResult.succeeded()) {
            return;
        }
        log("OK ADD: " + serverSocket.binaryHandlerID() + " Thread" + Thread.currentThread());
        sendToWSService(serverSocket, eventBus, path, endpoint);
    });
}
/**
 * Adds the endpoint to the existing holder, replaces the holder in the registry map and, when
 * the replace succeeded, starts forwarding the socket's data to the service.
 *
 * @param serverSocket     the web socket being registered
 * @param eventBus         the event bus used for forwarding
 * @param path             the event bus address of the service
 * @param endpoint         the endpoint definition to store
 * @param registryMap      the map that stores the endpoint holder
 * @param wsEndpointHolder the holder the endpoint is added to
 */
private void addDefinitionToRegistry(ServerWebSocket serverSocket, EventBus eventBus, String path, WSEndpoint endpoint, AsyncMap<String, WSEndpointHolder> registryMap, WSEndpointHolder wsEndpointHolder) {
    wsEndpointHolder.add(endpoint);

    registryMap.replace(WS_ENDPOINT_HOLDER, wsEndpointHolder, replaceResult -> {
        if (!replaceResult.succeeded()) {
            return;
        }
        log("OK REPLACE: " + serverSocket.binaryHandlerID() + " Thread" + Thread.currentThread());
        sendToWSService(serverSocket, eventBus, path, endpoint);
    });
}
private void sendToWSService(final ServerWebSocket serverSocket, final EventBus eventBus, final String path, final WSEndpoint endpoint) { serverSocket.handler(handler -> { try { eventBus.send(path, Serializer.serialize(new WSDataWrapper(endpoint, handler.getBytes())), new DeliveryOptions().setSendTimeout(GlobalKeyHolder.DEFAULT_SERVICE_TIMEOUT)); } catch (IOException e) { e.printStackTrace(); } } ); serverSocket.resume(); //TODO set close handler!! }
/**
 * Continues the removal of a socket's registry entry once the holder lookup has finished.
 * Does nothing when the lookup failed or returned no holder.
 *
 * @param serverSocket     the web socket whose registration should be removed
 * @param binaryHandlerID  binary handler id of the socket
 * @param textHandlerID    text handler id of the socket
 * @param registryMap      the map that stores the endpoint holder
 * @param wsEndpointHolder result of the holder lookup
 */
private void retrieveEndpointHolderAndRemove(ServerWebSocket serverSocket, String binaryHandlerID, String textHandlerID, AsyncMap<String, WSEndpointHolder> registryMap, AsyncResult<WSEndpointHolder> wsEndpointHolder) {
    if (!wsEndpointHolder.succeeded()) {
        return;
    }

    final WSEndpointHolder holder = wsEndpointHolder.result();
    if (holder == null) {
        return;
    }

    findEndpointAndRemove(serverSocket, binaryHandlerID, textHandlerID, registryMap, holder);
}
/**
 * Removes the first endpoint whose binary and text handler ids match the given ids from the
 * holder and writes the updated holder back to the registry map. Does nothing when no endpoint
 * matches.
 *
 * @param serverSocket     the web socket whose registration is removed (used for logging)
 * @param binaryHandlerID  binary handler id to match
 * @param textHandlerID    text handler id to match
 * @param registryMap      the map that stores the endpoint holder
 * @param wsEndpointHolder the holder to remove the endpoint from
 */
private void findEndpointAndRemove(ServerWebSocket serverSocket, String binaryHandlerID, String textHandlerID, AsyncMap<String, WSEndpointHolder> registryMap, WSEndpointHolder wsEndpointHolder) {
    wsEndpointHolder.getAll().stream()
        .filter(candidate -> candidate.getBinaryHandlerId().equals(binaryHandlerID)
            && candidate.getTextHandlerId().equals(textHandlerID))
        .findFirst()
        .ifPresent(endpoint -> {
            wsEndpointHolder.remove(endpoint);
            registryMap.replace(
                WS_ENDPOINT_HOLDER,
                wsEndpointHolder,
                replaceHolder -> log("OK REMOVE: " + serverSocket.binaryHandlerID()
                    + " succeed:" + replaceHolder.succeeded()));
        });
}
public void accept(ServerWebSocket ws){ final String path = ws.path(); if(routes.containsKey(path)) { sessions.put(ws,path); Handler handler = routes.get(path); ws.handler(handler); }else { //TODO error handling } }
/**
 * Pairs every incoming web socket with the event filter described by the {@code query}
 * parameter of its URI.
 *
 * <p>Any failure while handling a socket (unparsable URI, missing {@code query} parameter,
 * query that cannot be parsed) is rethrown as a {@code NetSocketException} carrying the socket
 * and the cause.
 *
 * @param webSocketObservable the stream of incoming web sockets
 * @return a stream of (socket, filter) pairs
 */
public static Observable<Tup2<ServerWebSocket, Func1<Event, Boolean>>> convertFromWebSocketObservable(Observable<ServerWebSocket> webSocketObservable) {
    return webSocketObservable.map(s -> {
        try {
            List<NameValuePair> query = URLEncodedUtils.parse(new URI(s.uri()), "UTF-8");
            NameValuePair nameValuePair = query.stream()
                .filter(p -> "query".equals(p.getName()))
                .findAny()
                // Same exception type as Optional.get(), but naming what is missing and where.
                .orElseThrow(() -> new java.util.NoSuchElementException(
                    "Missing 'query' parameter in web socket URI: " + s.uri()));
            return Tup2.create(s, Query.parse(nameValuePair.getValue()));
        } catch (Exception e) {
            throw new NetSocketException(s, e);
        }
    });
}
/**
 * Checks that an event is encoded to the expected JSON text and that the frame produced by the
 * frame factory is written to the web socket.
 */
@Test
public void testEventAction() {
    WebSocketFrameImpl frame = new WebSocketFrameImpl();
    ServerWebSocket socketMock = mock(ServerWebSocket.class);
    Event event = new Event("host", "service", "state", "desc", Arrays.asList("blaha"), null, 1, 1.0F, 1.0D);

    new EventToJsonAction(Observable.just(event), s -> {
        // Expected value first, so that a failure message labels the two values correctly.
        assertEquals("{\"tags\":[\"blaha\"],\"host\":\"host\",\"state\":\"state\",\"service\":\"service\",\"description\":\"desc\",\"metric\":1.0,\"time\":1,\"ttl\":1.0}", s);
        return frame;
    }).call(Tup2.create(socketMock, e -> true));

    verify(socketMock).writeFrame(frame);
}
/**
 * Checks that a socket whose URI carries {@code query=true} yields a filter that returns
 * {@code true}.
 */
@Test
public void testConversionFromWebSocket() {
    ServerWebSocket socketMock = mock(ServerWebSocket.class);
    when(socketMock.uri()).thenReturn("http://something.com?query=true");

    Observable<Tup2<ServerWebSocket, Func1<Event, Boolean>>> converted =
        EventObservable.convertFromWebSocketObservable(Observable.just(socketMock));

    converted.subscribe(pair -> {
        Func1<Event, Boolean> filter = pair.getRight();
        assertEquals(true, filter.call(null));
        testComplete();
    });
    await();
}
/**
 * Checks that the URL-encoded query {@code host = test} yields a filter that accepts events
 * from host {@code test} and refuses events from another host.
 */
@Test
public void testConversionFromWebSocketWithMoreComplexQuery() {
    ServerWebSocket socketMock = mock(ServerWebSocket.class);
    when(socketMock.uri()).thenReturn("http://something.com?query=host+%3D+test");

    Observable<Tup2<ServerWebSocket, Func1<Event, Boolean>>> converted =
        EventObservable.convertFromWebSocketObservable(Observable.just(socketMock));

    converted.subscribe(pair -> {
        Func1<Event, Boolean> filter = pair.getRight();
        assertEquals(true, filter.call(Event.builder().withHost("test").build()));
        assertEquals(false, filter.call(Event.builder().withHost("fail").build()));
        testComplete();
    });
    await();
}
/**
 * Checks that a socket whose URI has no usable query ends in the error callback instead of
 * producing a value.
 */
@Test
public void testConversionFromWebSocketWithInvalidUri() {
    ServerWebSocket socketMock = mock(ServerWebSocket.class);
    when(socketMock.uri()).thenReturn("httpquery=true");

    Observable<Tup2<ServerWebSocket, Func1<Event, Boolean>>> converted =
        EventObservable.convertFromWebSocketObservable(Observable.just(socketMock));

    converted.subscribe(
        pair -> {
            // No value is expected for this URI.
            fail();
            testComplete();
        },
        error -> testComplete());
    await();
}
/**
 * Registers the web socket transport routes on the router.
 *
 * <p>Three routes are registered for the web socket path pattern, in this order: a GET route
 * that upgrades the request when the {@code Connection} header contains "upgrade" (answering 400
 * otherwise), a further GET route answering 400, and a route for all methods answering 405 with
 * {@code Allow: GET}.
 *
 * @param vertx       the Vert.x instance
 * @param router      the router the routes are added to
 * @param sessions    the shared map of sessions
 * @param options     the handler options
 * @param sockHandler the handler that receives the created sockets
 */
WebSocketTransport(Vertx vertx, Router router, LocalMap<String, SockJSSession> sessions, SockJSHandlerOptions options, Handler<SockJSSocket> sockHandler) {
    super(vertx, sessions, options);
    final String wsRE = COMMON_PATH_ELEMENT_RE + "websocket";

    router.getWithRegex(wsRE).handler(rc -> {
        HttpServerRequest req = rc.request();
        String connectionHeader = req.headers().get(io.vertx.core.http.HttpHeaders.CONNECTION);
        boolean upgradeRequested =
            connectionHeader != null && connectionHeader.toLowerCase().contains("upgrade");

        if (upgradeRequested) {
            ServerWebSocket ws = rc.request().upgrade();
            if (log.isTraceEnabled()) {
                log.trace("WS, handler");
            }
            SockJSSession session =
                new SockJSSession(vertx, sessions, rc, options.getHeartbeatInterval(), sockHandler);
            session.register(req, new WebSocketListener(ws, session));
        } else {
            rc.response().setStatusCode(400);
            rc.response().end("Can \"Upgrade\" only to \"WebSocket\".");
        }
    });

    router.getWithRegex(wsRE).handler(rc -> {
        if (log.isTraceEnabled()) {
            log.trace("WS, get: " + rc.request().uri());
        }
        rc.response().setStatusCode(400);
        rc.response().end("Can \"Upgrade\" only to \"WebSocket\".");
    });

    router.routeWithRegex(wsRE).handler(rc -> {
        if (log.isTraceEnabled()) {
            log.trace("WS, all: " + rc.request().uri());
        }
        rc.response().putHeader("Allow", "GET").setStatusCode(405).end();
    });
}
WebSocketListener(ServerWebSocket ws, SockJSSession session) { this.ws = ws; this.session = session; ws.textMessageHandler(this::handleMessages); ws.closeHandler(v -> { closed = true; session.shutdown(); }); ws.exceptionHandler(t -> { closed = true; session.shutdown(); session.handleException(t); }); }
/**
 * Creates a socket that wraps a raw web socket and calls the superclass {@code close()} when
 * the web socket closes.
 *
 * @param vertx      the Vert.x instance
 * @param webSession the web session passed to the superclass
 * @param webUser    the web user passed to the superclass
 * @param ws         the wrapped web socket
 */
RawWSSockJSSocket(Vertx vertx, Session webSession, User webUser, ServerWebSocket ws) {
    super(vertx, webSession, webUser);
    this.ws = ws;

    // Make sure the writeHandler gets unregistered
    ws.closeHandler(closeEvent -> RawWSSockJSSocket.super.close());
}
/**
 * Registers the raw web socket transport routes on the router.
 *
 * <p>GET requests on {@code /websocket} are upgraded to a web socket that is wrapped in a
 * {@code RawWSSockJSSocket} and passed to {@code sockHandler}. Requests with any other method
 * are answered with 405 and {@code Allow: GET}.
 *
 * @param vertx       the Vert.x instance
 * @param router      the router the routes are added to
 * @param sockHandler the handler that receives the created sockets
 */
RawWebSocketTransport(Vertx vertx, Router router, Handler<SockJSSocket> sockHandler) {
    String wsRE = "/websocket";

    router.get(wsRE).handler(rc -> {
        ServerWebSocket ws = rc.request().upgrade();
        SockJSSocket sock = new RawWSSockJSSocket(vertx, rc.session(), rc.user(), ws);
        sockHandler.handle(sock);
    });

    // NOTE(review): the route above never calls rc.next(), so this fallback looks unreachable
    // for GET requests - confirm before relying on the 400 response.
    router.get(wsRE).handler(rc -> rc.response().setStatusCode(400).end("Can \"Upgrade\" only to \"WebSocket\"."));

    // Registered for every method: a GET-only route could never answer the non-GET requests
    // that the 405 / "Allow: GET" response is meant for.
    router.route(wsRE).handler(rc -> rc.response().putHeader("Allow", "GET").setStatusCode(405).end());
}