我为RSocket消息编写了一个小样
问题是我无法访问Rsocket端点,我从服务器收到以下异常:
Rsocket
客户端: 配置:
@Bean RSocket rSocket() { return RSocketFactory.connect() .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE) .frameDecoder(PayloadDecoder.ZERO_COPY) .transport(TcpClientTransport.create(new InetSocketAddress(7500))) .start() .block(); } @Bean RSocketRequester requester(RSocketStrategies strategies) { return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, strategies); }
控制器:
private final RSocketRequester requester; @GetMapping("/greet/{name}") public Publisher<GreetingsResponse> greet(@PathVariable String name) { return requester .route("hello") .data(new GreetingsRequest(name)) .retrieveMono(GreetingsResponse.class); }
服务器端(使用spring Rsocket): yml:
spring: rsocket: server: port: 7500 transport: tcp main: lazy-initialization: true
组态:
@MessageMapping("hello") Mono<GreetingsResponse> greet(GreetingsRequest request) { return Mono.just(new GreetingsResponse("Hello " + request.getName() + " @ " + Instant.now())); }
我很确定它与新wrap功能有关,RSocketRequester.wrap 因为它接受了一个新参数metadataMimeType,我将其设置为application / Json,但似乎不起作用
wrap
RSocketRequester.wrap
metadataMimeType
堆栈跟踪:
org.springframework.messaging.MessageDeliveryException:org.springframework.messaging.handler.invocation.reactive上的org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler.handleNoMatch(RSocketMessageHandler.java:312)上没有用于目标’‘的处理程序。 org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.lambda $上的AbstractMethodMessageHandler.getHandlerMethod(AbstractMethodMessageHandler.java:445)在org.springframework.messaging.handler.invocation.reactive.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:417)在org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.lambda $在reactor.core.publisher处的handleAndReply $ 4(MessagingRSocket.java:173)在reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)处在reactor.core.publisher.Mono.subscribe(Mono.java:3920)。io.rsocket上的FluxConcatArray $ ConcatArraySubscriber.onComplete(FluxConcatArray.java:207)在Reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)在react.core.publisher.MonoFromFluxOperator.subscribe(MonoFromFluxOperator.java:74) io.rsocket上的.RSocketResponder.handleRequestResponse(RSocketResponder.java:386)。反应器.core.publisher上的RSocketResponder.handleFrame(RSocketResponder.java:298).reactor.core.publisher上的LambdaSubscriber.onNext(LambdaSubscriber.java:160) MonoFlatMapMany $ FlatMapManyInner.onNext(MonoFlatMapMany.java:238)位于Reactor.core.publisher.FluxGroupBy $ UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)at Reactor.core.publisher.FluxGroupBy $ UnicastGroupedFlux.drain(FluxGroupBy)。在reactor.core.publisher.FluxGroupBy $ UnicastGroupedFlux。在reactor.core.publisher处订阅(FluxGroupBy.java:696).relux.subscribe(Flux.java:8000)在reactor.core.publisher.MonoFlatMapMany $ FlatMapManyMain.onNext(MonoFlatMapMany.java:184) .operator $ MonoSubscriber.complete(Operators.java:1582)at io.rsocket.internal.ClientServerInputMultiplexer.lambda $ new $ 1(ClientServer.core.publisher.MonoProcessor.onNext(MonoProcessor.java:316)在reactor.core.publisher.FluxGroupBy $ GroupByMain.drainLoop(FluxGroupBy.java:380)在reactor.core.publisher.FluxGroupBy $ GroupByMain.drainLoop(FluxGroupBy.java:380)在reactor.core.publisher.FluxGroupBy $ GroupByMain.drain(FluxGroupBy .java:316)在Reactor.core.publisher.FluxGroupBy $ GroupByMain.onNext(FluxGroupBy.java:201)在Reactor.core.publisher.FluxMap $ MapSubscriber处。在Reactor.core.publisher.FluxMap $ MapSubscriber.onNext(FluxMap.java:114)在Reactor.netty.channel上的onNext(FluxMap.java:114)在Reactor.netty.channel上的FluxReceive.drainReceiver(FluxReceive.java:206) io.netty处的react.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:342)处的.FluxReceive.onInboundNext(FluxReceive.java:322)io.netty处的react.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:91)处的。 io.netty处io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)处io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)io.netty处io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) .handler.codec.ByteToMessageDecoder。io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)上的io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)上的fireChannelRead(ByteToMessageDecoder.java:328) io上的.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)在io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)在io.netty.channel.DefaultChannelPipeline $ HeadContext.channelRead(DefaultChannelPipeline.java:1421)在io。 io.netty.channel上的netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)io.netty.channel上的netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)。io.netty.channel.nio.AbstractNioByteChannel $ NioByteUnsafe.read(AbstractNioByteChannel.java:163)上的DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)在io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)在io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)在io.netty.channel.nio.NioEventLoop.run (NioEventLoop.java:511)在io.netty.util.concurrent.SingleThreadEventExecutor $ 5.run(SingleThreadEventExecutor.java:918)在io.netty.util.internal.ThreadExecutorMap $ 2.run(ThreadExecutorMap.java:74)在io。 netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)位于java.base / java.lang.Thread.run(Thread.java:834)io上io.netty.channel.nio.AbstractNioByteChannel $ NioByteUnsafe.read(AbstractNioByteChannel.java:163)的fireChannelRead(DefaultChannelPipeline.java:930)io io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697) .netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)在io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)在io.netty.channel.nio.NioEventLoop.run(NioEventLoop .java:511)在io.netty.util.concurrent.SingleThreadEventExecutor $ 5.run(SingleThreadEventExecutor.java:918)在io.netty.util.internal.ThreadExecutorMap $ 2.run(ThreadExecutorMap.java:74)在io.netty。 util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)在java.base / java.lang.Thread.run(Thread.java:834)io上io.netty.channel.nio.AbstractNioByteChannel $ NioByteUnsafe.read(AbstractNioByteChannel.java:163)的fireChannelRead(DefaultChannelPipeline.java:930)io io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697) .netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)在io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)在io.netty.channel.nio.NioEventLoop.run(NioEventLoop .java:511)在io.netty.util.concurrent.SingleThreadEventExecutor $ 5.run(SingleThreadEventExecutor.java:918)在io.netty.util.internal.ThreadExecutorMap $ 2.run(ThreadExecutorMap.java:74)在io.netty。 util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)在java.base / java.lang.Thread.run(Thread.java:834)930)在io.netty.channel.nio的io.netty.channel.nio.AbstractNioByteChannel $ NioByteUnsafe.read(AbstractNioByteChannel.java:163)在io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)位于io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)的io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)的.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632) io.netty.util.internal.ThreadExecutorMap $ 2.run(ThreadExecutorMap.java:74)的io.netty.util.concurrent.SingleThreadEventExecutor $ 5.run(SingleThreadEventExecutor.java:918)在io.netty.util.concurrent.FastThreadLocalRunnable。在java.base / java.lang.Thread.run(Thread.java:834)上运行(FastThreadLocalRunnable.java:30)930)在io.netty.channel.nio的io.netty.channel.nio.AbstractNioByteChannel $ NioByteUnsafe.read(AbstractNioByteChannel.java:163)在io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)位于io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)的io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)的.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632) io.netty.util.internal.ThreadExecutorMap $ 2.run(ThreadExecutorMap.java:74)的io.netty.util.concurrent.SingleThreadEventExecutor $ 5.run(SingleThreadEventExecutor.java:918)在io.netty.util.concurrent.FastThreadLocalRunnable。在java.base / java.lang.Thread.run(Thread.java:834)上运行(FastThreadLocalRunnable.java:30)在io.netty上读取io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)在io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)处的(AbstractNioByteChannel.java:163) io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)上的.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)在io.netty.util.concurrent.SingleThreadEventExecutor $ 5.run(SingleThreadEventExecutor。 java:918)位于io.netty.util.internal.ThreadExecutorMap $ 2.run(ThreadExecutorMap.java:74)位于io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)位于java.base / java。 lang.Thread.run(Thread.java:834)在io.netty上读取io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)在io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)处的(AbstractNioByteChannel.java:163) io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)上的.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)在io.netty.util.concurrent.SingleThreadEventExecutor $ 5.run(SingleThreadEventExecutor。 java:918)位于io.netty.util.internal.ThreadExecutorMap $ 2.run(ThreadExecutorMap.java:74)位于io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)位于java.base / java。 lang.Thread.run(Thread.java:834)io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)的io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)的io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)的processSelectedKeysOptimized(NioEventLoop.java:632) .io.netty.util.internal.ThreadExecutorMap $ 2.run(ThreadExecutorMap.java:74)上的.util.concurrent.SingleThreadEventExecutor $ 5.run(SingleThreadEventExecutor.java:918)在io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable .java:30),位于java.base / java.lang.Thread.run(Thread.java:834)io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)的io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)的io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)的processSelectedKeysOptimized(NioEventLoop.java:632) .io.netty.util.internal.ThreadExecutorMap $ 2.run(ThreadExecutorMap.java:74)上的.util.concurrent.SingleThreadEventExecutor $ 5.run(SingleThreadEventExecutor.java:918)在io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable .java:30),位于java.base / java.lang.Thread.run(Thread.java:834)在io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)处的netty.util.internal.ThreadExecutorMap $ 2.run(ThreadExecutorMap.java:74)在java.base / java.lang.Thread.run(Thread .java:834)在io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)处的netty.util.internal.ThreadExecutorMap $ 2.run(ThreadExecutorMap.java:74)在java.base / java.lang.Thread.run(Thread .java:834)
您正在使用哪个spring版本?我有一个相同的问题,我通过更改spring-boot-starter-parent 2.2.0.M3来解决了。
这是我的来源 https://github.com/han1448/spring-rsocket- example
添加。
我解决了这个问题。您需要将mimeType更改为message/x.rsocket.routing.v0。您可以从获取此mimeType MetadataExtractor.ROUTING。
message/x.rsocket.routing.v0
MetadataExtractor.ROUTING
@Bean RSocket rSocket() { return RSocketFactory.connect() .mimeType(MetadataExtractor.ROUTING.toString(), MimeTypeUtils.APPLICATION_JSON_VALUE) .frameDecoder(PayloadDecoder.ZERO_COPY) .transport(TcpClientTransport.create(new InetSocketAddress(7500))) .start() .block(); } @Bean RSocketRequester requester(RSocketStrategies strategies) { return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MetadataExtractor.ROUTING, strategies); }