小编典典

Spring Boot SSL TCPClient〜StompBrokerRelayMessageHandler〜ActiveMQ〜Undertow

spring-boot

我正在尝试基于运行ActiveMQSpring
Websocket演示
构建一个Websocket消息传递应用程序,该ActiveMQ作为Undertow的STOMP消息代理。该应用程序在不安全的连接上运行良好。但是,我在配置STOMP
Broker Relay
以通过SSL连接转发时遇到困难。

如Spring WebSocket文档中所述…

上面的配置中的“ STOMP代理中继”是一个Spring
MessageHandler,它通过将消息转发到外部消息代理来处理消息。为此,它建立到代理的TCP连接,将所有消息转发给它,然后通过其WebSocket会话将从代理收到的所有消息转发给客户端。从本质上讲,它充当“转发器”,可以双向转发消息。

此外,文档指出我对Reactor-net有依赖性…

请添加对org.projectreactor:reactor-net的依赖以进行TCP连接管理。

问题是我当前的实现未通过SSL
初始化NettyTCPClient,因此ActiveMQ连接失败,并出现SSLException。


[r.i.n.i.n.t.NettyTcpClient:307] » CONNECTED: 
[id: 0xcfef39e9, /127.0.0.1:17779 => localhost/127.0.0.1:8442]
...
[o.a.a.b.TransportConnection.Transport:245] » 
Transport Connection to: tcp://127.0.0.1:17779 failed:
javax.net.ssl.SSLException: Unrecognized SSL message, plaintext connection?
...

因此,我尝试研究Project Reactor
Docs
来设置连接的SSL选项,但是我没有成功。

此时,我发现StompBrokerRelayMessageHandler默认在Reactor2TcpClient中初始化NettyTCPClient,但它似乎不可配置。

协助将不胜感激。

社会科学中心


应用道具

spring.activemq.in-memory=true
spring.activemq.pooled=false
spring.activemq.broker-url=stomp+ssl://localhost:8442
server.port=8443
server.ssl.enabled=true
server.ssl.protocol=tls
server.ssl.key-alias=undertow
server.ssl.key-store=classpath:undertow.jks
server.ssl.key-store-password=xxx
server.ssl.trust-store=classpath:undertow_certs.jks
server.ssl.trust-store-password=xxx

WebSocket配置

//... 
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    private static final Logger log = LoggerFactory.getLogger(WebSocketConfig.class);

    private final static String KEYSTORE = "/activemq.jks";
    private final static String KEYSTORE_PASS = "xxx";
    private final static String KEYSTORE_TYPE = "JKS";
    private final static String TRUSTSTORE = "/activemq_certs.jks";
    private final static String TRUSTSTORE_PASS = "xxx";

    private static String getBindLocation() {
        return "stomp+ssl://localhost:8442?transport.needClientAuth=false";
    }

    @Bean(initMethod = "start", destroyMethod = "stop")
    public SslBrokerService activeMQBroker() throws Exception {

        final SslBrokerService service = new SslBrokerService();
        service.setPersistent(false);

        KeyManager[] km = SecurityManager.getKeyManager();
        TrustManager[] tm = SecurityManager.getTrustManager();

        service.addSslConnector(getBindLocation(), km, tm, null);
        final ActiveMQTopic topic = new ActiveMQTopic("jms.topic.test");
        service.setDestinations(new ActiveMQDestination[]{topic});

        return service;
    }


    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic").setRelayHost("localhost").setRelayPort(8442);
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/welcome").withSockJS();
        registry.addEndpoint("/test").withSockJS();
    }

   private static class SecurityManager { 
   //elided...
   }

}

WebSocket配置

@Configuration
public class WebSocketConfig extends DelegatingWebSocketMessageBrokerConfiguration {
    ...
    @Bean
    public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
      StompBrokerRelayMessageHandler handler = (StompBrokerRelayMessageHandler) super.stompBrokerRelayMessageHandler();
      ConfigurationReader reader = new StompClientDispatcherConfigReader();
      Environment environment = new Environment(reader).assignErrorJournal();
      TcpOperations<byte[]> client = new Reactor2TcpClient<>(new StompTcpClientSpecFactory(environment,"localhost", 8443));
      handler.setTcpClient(client);
      return handler;
    }
}

StompTCPClientSpecFactory

private static class StompTcpClientSpecFactory
        implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {

    private static final Logger log = LoggerFactory.getLogger(StompTcpClientSpecFactory.class);

    private final String host;
    private final int port;
    private final String KEYSTORE = "src/main/resources/tcpclient.jks";
    private final String KEYSTORE_PASS = "xxx";
    private final String KEYSTORE_TYPE = "JKS";
    private final String TRUSTSTORE = "/src/main/resources/tcpclient_certs.jks";
    private final String TRUSTSTORE_PASS = "xxx";
    private final String TRUSTSTORE_TYPE = "JKS";
    private final Environment environment;

    private final SecurityManager tcpManager = new SecurityManager
            .SSLBuilder(KEYSTORE, KEYSTORE_PASS)
            .keyStoreType(KEYSTORE_TYPE)
            .trustStore(TRUSTSTORE, TRUSTSTORE_PASS)
            .trustStoreType(TRUSTSTORE_TYPE)
            .build();

    public StompTcpClientSpecFactory(Environment environment, String host, int port) {
        this.environment = environment;
        this.host = host;
        this.port = port;
    }

    @Override
    public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(
            Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {

        return tcpClientSpec
                .ssl(new SslOptions()
                        .sslProtocol("TLS")
                        .keystoreFile(tcpManager.getKeyStore())
                        .keystorePasswd(tcpManager.getKeyStorePass())
                        .trustManagers(tcpManager::getTrustManager)
                        .trustManagerPasswd(tcpManager.getTrustStorePass()))
                .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
                .env(this.environment)
                .dispatcher(this.environment.getCachedDispatchers("StompClient").get())
                .connect(this.host, this.port);
    }
}

阅读 427

收藏
2020-05-30

共1个答案

小编典典

StompBrokerRelayMessageHandler有一个TcpClient的属性可以设置。但是,看起来我们没有通过WebSocketMessageBrokerConfigurer设置来公开它。

您可以删除@EnableWebSocketMessageBroker并扩展DelegatingWebSocketMessageBrokerConfiguration。实际上是一样的,但是您现在直接从提供所有bean的配置类扩展。

这样,您就可以覆盖stompBrokerRelayMessageHandler()Bean并直接设置其TcpClient属性。只要确保覆盖方法标记为即可@Bean

2020-05-30