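I'm trying to build a WebSocket messaging application based on the Spring WebSocket demo, running ActiveMQ as the STOMP message broker with Undertow. The application runs fine over insecure connections. However, I'm having difficulty configuring the STOMP broker relay to forward over SSL connections.
As mentioned in the Spring WebSocket docs...
The "STOMP broker relay" in the above configuration is a Spring MessageHandler that handles messages by forwarding them to an external message broker. To do so it establishes TCP connections to the broker, forwards all messages to it, and then forwards all messages received from the broker to clients through their WebSocket sessions. Essentially it acts as a "relay" that forwards messages in both directions.
Furthermore, the docs state that I have a dependency on reactor-net...
Please add a dependency on org.projectreactor:reactor-net for TCP connection management.
The problem is that my current implementation doesn't initialize the NettyTcpClient with SSL, so the ActiveMQ connection fails with an SSLException.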
    [r.i.n.i.n.t.NettyTcpClient:307] » CONNECTED: [id: 0xcfef39e9, /127.0.0.1:17779 => localhost/127.0.0.1:8442]
    ...
    [o.a.a.b.TransportConnection.Transport:245] » Transport Connection to: tcp://127.0.0.1:17779 failed: javax.net.ssl.SSLException: Unrecognized SSL message, plaintext connection?
    ...
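So I tried going through the Project Reactor docs to set SSL options for the connection, but I haven't been successful.
At this point, I've found that StompBrokerRelayMessageHandler initializes the NettyTcpClient by default inside Reactor2TcpClient, and it doesn't appear to be configurable.
Any assistance would be greatly appreciated.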
SSCCE
app.props
    spring.activemq.in-memory=true
    spring.activemq.pooled=false
    spring.activemq.broker-url=stomp+ssl://localhost:8442
    server.port=8443
    server.ssl.enabled=true
    server.ssl.protocol=tls
    server.ssl.key-alias=undertow
    server.ssl.key-store=classpath:undertow.jks
    server.ssl.key-store-password=xxx
    server.ssl.trust-store=classpath:undertow_certs.jks
    server.ssl.trust-store-password=xxx
WebSocketConfig
    //...
    @Configuration
    @EnableWebSocketMessageBroker
    public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

        private static final Logger log = LoggerFactory.getLogger(WebSocketConfig.class);

        private final static String KEYSTORE = "/activemq.jks";
        private final static String KEYSTORE_PASS = "xxx";
        private final static String KEYSTORE_TYPE = "JKS";
        private final static String TRUSTSTORE = "/activemq_certs.jks";
        private final static String TRUSTSTORE_PASS = "xxx";

        private static String getBindLocation() {
            return "stomp+ssl://localhost:8442?transport.needClientAuth=false";
        }

        // Embedded ActiveMQ broker that accepts STOMP over SSL on port 8442.
        @Bean(initMethod = "start", destroyMethod = "stop")
        public SslBrokerService activeMQBroker() throws Exception {
            final SslBrokerService service = new SslBrokerService();
            service.setPersistent(false);

            KeyManager[] km = SecurityManager.getKeyManager();
            TrustManager[] tm = SecurityManager.getTrustManager();
            service.addSslConnector(getBindLocation(), km, tm, null);

            final ActiveMQTopic topic = new ActiveMQTopic("jms.topic.test");
            service.setDestinations(new ActiveMQDestination[]{topic});
            return service;
        }

        // Relay "/topic" destinations to the broker; this connection is plain TCP by default.
        @Override
        public void configureMessageBroker(MessageBrokerRegistry config) {
            config.enableStompBrokerRelay("/topic").setRelayHost("localhost").setRelayPort(8442);
            config.setApplicationDestinationPrefixes("/app");
        }

        @Override
        public void registerStompEndpoints(StompEndpointRegistry registry) {
            registry.addEndpoint("/welcome").withSockJS();
            registry.addEndpoint("/test").withSockJS();
        }

        private static class SecurityManager {
            //elided...
        }
    }
    @Configuration
    public class WebSocketConfig extends DelegatingWebSocketMessageBrokerConfiguration {
        ...
        @Bean
        public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
            StompBrokerRelayMessageHandler handler =
                    (StompBrokerRelayMessageHandler) super.stompBrokerRelayMessageHandler();

            ConfigurationReader reader = new StompClientDispatcherConfigReader();
            Environment environment = new Environment(reader).assignErrorJournal();

            // Replace the default TCP client with one built from the SSL-aware spec factory.
            TcpOperations<byte[]> client = new Reactor2TcpClient<>(
                    new StompTcpClientSpecFactory(environment, "localhost", 8443));
            handler.setTcpClient(client);
            return handler;
        }
    }
StompTCPClientSpecFactory
    private static class StompTcpClientSpecFactory
            implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {

        private static final Logger log = LoggerFactory.getLogger(StompTcpClientSpecFactory.class);

        private final String host;
        private final int port;

        private final String KEYSTORE = "src/main/resources/tcpclient.jks";
        private final String KEYSTORE_PASS = "xxx";
        private final String KEYSTORE_TYPE = "JKS";
        private final String TRUSTSTORE = "/src/main/resources/tcpclient_certs.jks";
        private final String TRUSTSTORE_PASS = "xxx";
        private final String TRUSTSTORE_TYPE = "JKS";

        private final Environment environment;

        private final SecurityManager tcpManager = new SecurityManager
                .SSLBuilder(KEYSTORE, KEYSTORE_PASS)
                .keyStoreType(KEYSTORE_TYPE)
                .trustStore(TRUSTSTORE, TRUSTSTORE_PASS)
                .trustStoreType(TRUSTSTORE_TYPE)
                .build();

        public StompTcpClientSpecFactory(Environment environment, String host, int port) {
            this.environment = environment;
            this.host = host;
            this.port = port;
        }

        // Adds SSL options and the STOMP codec to the client spec before connecting.
        @Override
        public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(
                Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
            return tcpClientSpec
                    .ssl(new SslOptions()
                            .sslProtocol("TLS")
                            .keystoreFile(tcpManager.getKeyStore())
                            .keystorePasswd(tcpManager.getKeyStorePass())
                            .trustManagers(tcpManager::getTrustManager)
                            .trustManagerPasswd(tcpManager.getTrustStorePass()))
                    .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
                    .env(this.environment)
                    .dispatcher(this.environment.getCachedDispatchers("StompClient").get())
                    .connect(this.host, this.port);
        }
    }
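For readers following along: the `SecurityManager` helper is elided in the code above, so its real implementation is unknown. The following is only a minimal sketch of how key managers and trust managers can be loaded from JKS files with the standard JSSE APIs. The class name `JksSslSupport` and its method names are hypothetical and do not come from the original post.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;

/** Hypothetical helper; NOT the elided SecurityManager class from the question. */
public final class JksSslSupport {

    private JksSslSupport() {}

    /** Loads a JKS store from a file; path and password are supplied by the caller. */
    public static KeyStore loadJks(Path file, char[] password)
            throws GeneralSecurityException, IOException {
        KeyStore store = KeyStore.getInstance("JKS");
        try (InputStream in = Files.newInputStream(file)) {
            store.load(in, password);
        }
        return store;
    }

    /** Builds key managers (the local identity) from a key store. */
    public static KeyManager[] keyManagers(KeyStore keyStore, char[] keyPassword)
            throws GeneralSecurityException {
        KeyManagerFactory kmf =
                KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(keyStore, keyPassword);
        return kmf.getKeyManagers();
    }

    /** Builds trust managers (the certificates to accept) from a trust store. */
    public static TrustManager[] trustManagers(KeyStore trustStore)
            throws GeneralSecurityException {
        TrustManagerFactory tmf =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);
        return tmf.getTrustManagers();
    }

    /** Combines both into a TLS context; a null SecureRandom selects the default. */
    public static SSLContext tlsContext(KeyManager[] km, TrustManager[] tm)
            throws GeneralSecurityException {
        SSLContext context = SSLContext.getInstance("TLS");
        context.init(km, tm, null);
        return context;
    }
}
```

The `KeyManager[]` and `TrustManager[]` arrays produced this way have the same types that `addSslConnector(...)` receives in the broker configuration above. Whether they fit the Reactor `SslOptions` calls depends on that API and is not verified here.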
The StompBrokerRelayMessageHandler has a tcpClient property that can be set. However, it looks like we don't expose it through the WebSocketMessageBrokerConfigurer setup.
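You can remove @EnableWebSocketMessageBroker and extend DelegatingWebSocketMessageBrokerConfiguration instead. It's effectively the same, but you're now extending directly from the configuration class that provides all the beans.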
This allows you to override the stompBrokerRelayMessageHandler() bean and set its tcpClient property directly. Just make sure the overriding method is marked with @Bean.