@Override public void broadcastOne(String user, String message) { Multimap<String, MessageInbound> syncMap = Multimaps.synchronizedMultimap(userPagesMap); Collection<MessageInbound> mis = syncMap.get(user); synchronized (syncMap) { if (mis != null) { Iterator<MessageInbound> it = mis.iterator(); while (it.hasNext()) { MessageInbound inbound = it.next(); try { sendToPage(inbound, message); } catch (IOException e) { // userPagesMap.remove(user, inbound); logger.info("The WebSocket connection has been closed: " + inbound.toString()); } } } } }
@Override public void sendToAllExcept(final List<Command> commands, final Object nonReciver) { final byte[] buffer; try { buffer = serializer.serialize(commands); } catch (final SynchronizeFXException e) { shutdown(); callback.onFatalError(e); return; } synchronized (connections) { // This ensures that no client is added or removed for the connection list while iterating over it. // This ensures also that all clients get messages in the correct order for the case that sendToAllExcept // as already called a second time. for (final MessageInbound connection : connections) { if (connection != nonReciver) { send(buffer, connection); } } } }
/**
 * Closes every client connection of this channel and stops the executor that belongs to
 * each of them.
 *
 * <p>The parent is informed via {@code channelCloses} first (presumably this is what makes
 * the servlet refuse new connections). Afterwards the connection list is emptied and the
 * callback reference is dropped.
 */
@Override
public void shutdown() {
    synchronized (connections) {
        parent.channelCloses(this);
        for (final MessageInbound client : connections) {
            try {
                client.getWsOutbound().close(0, null);
            } catch (final IOException closeFailure) {
                LOG.error("Connection [" + client.toString() + "] can't be closed.", closeFailure);
            } finally {
                // Stop the sender of this client whether or not the close succeeded.
                final ExecutorService sender = connectionThreads.get(client);
                if (sender != null) {
                    sender.shutdown();
                }
                connectionThreads.remove(client);
            }
        }
        connections.clear();
    }
    callback = null;
}
/**
 * Creates the inbound endpoint for a new WebSocket connection.
 *
 * <p>The endpoint forwards open, text-message and close events to a
 * {@code ConnectionListener} obtained from {@code handler}. Binary messages are ignored.
 *
 * @param subProtocol the negotiated sub protocol (not used)
 * @param request     the upgrade request; its request URI is exposed through the connection
 * @return the inbound endpoint bound to a freshly created listener
 * @throws RuntimeException if the request URI is not a syntactically valid URI
 */
@Override
protected StreamInbound createWebSocketInbound(String subProtocol, HttpServletRequest request) {
    final URI requestUri;
    try {
        requestUri = new URI(request.getRequestURI());
    } catch (URISyntaxException e) {
        throw new RuntimeException(e);
    }

    return new MessageInbound() {
        private ConnectionListener listener = handler.createConnectionListener();

        @Override
        protected void onOpen(final WsOutbound outbound) {
            final Connection connection = new Connection() {
                @Override
                public URI getRequestUri() {
                    return requestUri;
                }

                @Override
                public void send(CharSequence text) throws IOException {
                    outbound.writeTextMessage(CharBuffer.wrap(text));
                }
            };
            listener.onOpen(connection);
        }

        @Override
        protected void onTextMessage(CharBuffer message) throws IOException {
            listener.onTextMessage(message);
        }

        @Override
        protected void onBinaryMessage(ByteBuffer message) throws IOException {
            // Binary frames are intentionally not handled.
        }

        @Override
        protected void onClose(int status) {
            listener.onClose(status);
        }
    };
}
/** * Sends send the result of {@link Serializer#serialize(List)} to a destination. * * @param buffer the bytes to send. * @param destination The peer to send to. */ private void send(final byte[] buffer, final Object destination) { if (LOG.isTraceEnabled()) { LOG.trace("Sending from thread: id: " + Thread.currentThread().getName() + ", name: " + Thread.currentThread().getName()); } final WsOutbound outbound = ((MessageInbound) destination).getWsOutbound(); final ExecutorService executorService = connectionThreads.get(destination); // execute asynchronously to avoid slower clients from interfering with faster clients executorService.execute(new Runnable() { @Override public void run() { try { outbound.writeBinaryMessage(ByteBuffer.wrap(buffer)); } catch (final IOException e) { LOG.warn("Sending data to a client failed. Closing connection to this client."); try { outbound.close(1002, null); // CHECKSTYLE:OFF } catch (final IOException e1) { // Maybe the connection is already closed. This is no exceptional state but rather the // default in // this case. So it's safe to ignore this exception. } // CHECKSTYLE:ON connectionCloses((SynchronizeFXTomcatConnection) destination); } } }); }
/**
 * Registers a page connection for {@code user} and logs how many pages the user now has.
 *
 * @param user    key under which the page is stored in {@code userPagesMap}
 * @param inbound the page's inbound WebSocket endpoint
 */
@Override
public void registerClient(String user, MessageInbound inbound) {
    // The separating space after "at" was missing, so the timestamp ran into the text.
    logger.info("New page registered at " + new Date().toString());
    userPagesMap.put(user, inbound);
    logger.info("Size of " + user + ":" + userPagesMap.get(user).size());
}
/**
 * Removes one page connection of {@code user} and logs how many pages remain for the user.
 *
 * @param user    key under which the page is stored in {@code userPagesMap}
 * @param inbound the page's inbound WebSocket endpoint to remove
 */
@Override
public void unregisterClient(String user, MessageInbound inbound) {
    final String timestamp = new Date().toString();
    logger.info("A page unregistered at " + timestamp);

    userPagesMap.remove(user, inbound);

    final int remaining = userPagesMap.get(user).size();
    logger.info("Size of " + user + ":" + remaining);
}
/**
 * Writes {@code message} as a text message to the outbound side of {@code inbound} and logs
 * the message that was sent.
 *
 * @param inbound the page connection to write to
 * @param message the text to send
 * @throws IOException if writing the text message fails
 */
@Override
public void sendToPage(MessageInbound inbound, String message) throws IOException {
    final CharBuffer payload = CharBuffer.wrap(message);
    inbound.getWsOutbound().writeTextMessage(payload);
    logger.info("message sent to client:" + message);
}
/**
 * Creates a connection around the given inbound endpoint and flags it as connected.
 *
 * @param messageInbound the inbound endpoint this connection wraps
 */
public MessageInboundConnection(MessageInbound messageInbound) {
    this.isConnected = true;
    this.messageInbound = messageInbound;
}
/**
 * Registers a WebSocket client for one of a user's pages.
 *
 * @param user    the user the page belongs to
 * @param inbound the inbound WebSocket endpoint of the page
 */
public void registerClient(String user, MessageInbound inbound);
/**
 * Unregisters a WebSocket client of one of a user's pages.
 *
 * @param user    the user the page belongs to
 * @param inbound the inbound WebSocket endpoint of the page
 */
public void unregisterClient(String user, MessageInbound inbound);
/**
 * Sends a text message to one specific page.
 *
 * @param inbound the page to send to
 * @param message the text message to send
 * @throws IOException declared for failures while sending the message
 */
public void sendToPage(MessageInbound inbound, String message) throws IOException;