我一直在2.0.0.RC1使用webflux启动器(spring-boot-starter-webflux)进行spring-boot 。我创建了一个返回无限磁通的简单控制器。我希望发布者仅在有客户端(订阅者)的情况下进行工作。假设我有一个像这样的控制器:
2.0.0.RC1
spring-boot-starter-webflux
@RestController public class Demo { @GetMapping(value = "/") public Flux<String> getEvents(){ return Flux.create((FluxSink<String> sink) -> { while(!sink.isCancelled()){ // TODO e.g. fetch data from somewhere sink.next("DATA"); } sink.complete(); }).doFinally(signal -> System.out.println("END")); } }
现在,当我尝试运行该代码并使用Chrome 访问端点http:// localhost:8080 /时,便可以看到数据。但是,一旦关闭浏览器,由于没有取消事件被触发,因此while循环继续。 关闭浏览器后,如何终止/取消流传输?
从这个答案我引用:
当前使用HTTP,由于HTTP协议不支持此功能,因此确切的背压信息无法通过网络传输。如果我们使用其他有线协议,这可能会改变。
我假设,由于HTTP协议不支持反压,因此这意味着也不会发出取消请求。
通过分析网络流量进一步研究表明,FIN关闭浏览器后浏览器立即发送TCP 。有没有一种方法可以配置Netty(或其他方式),以便半关闭的连接将触发发布者上的cancel事件,从而使while循环停止?
FIN
还是我必须编写自己的适配器(类似于org.springframework.http.server.reactive.ServletHttpHandlerAdapter实现自己的订户的位置)?
org.springframework.http.server.reactive.ServletHttpHandlerAdapter
谢谢你的帮助。
编辑:IOException如果没有客户端,尝试将数据写入套接字 的操作将引发。如您在堆栈跟踪中所见。
IOException
但这还不够好,因为下一个数据块准备好发送之前可能需要一段时间,因此检测到消失的客户端需要花费相同的时间。正如Brian Clozel的回答所指出的,这是Reactor Netty中的一个已知问题。我尝试通过将依赖项添加到来使用Tomcat POM.xml。像这样:
POM.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> </dependency>
尽管它代替了Netty并改用Tomcat,但由于浏览器不显示任何数据,因此它似乎没有反应。但是,控制台中没有警告/信息/例外。 从spring- boot-starter-webflux(2.0.0.RC1)版本开始,该版本是否可以与Tomcat一起使用吗?
spring- boot-starter-webflux
由于这是一个已知问题,所以我最终使用一个Flux来获取我的真实数据,并使用另一个来实现某种类型的ping /心跳机制。结果,我与合并在一起Flux.merge()。
Flux
Flux.merge()
在这里,您可以看到我的解决方案的简化版本:
@RestController public class Demo { public interface Notification{} public static class MyData implements Notification{ … public boolean isEmpty(){…} } @GetMapping(value = "/", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<ServerSentEvent<? extends Notification>> getNotificationStream() { return Flux.merge(getEventMessageStream(), getHeartbeatStream()); } private Flux<ServerSentEvent<Notification>> getHeartbeatStream() { return Flux.interval(Duration.ofSeconds(2)) .map(i -> ServerSentEvent.<Notification>builder().event("ping").build()) .doFinally(signalType ->System.out.println("END")); } private Flux<ServerSentEvent<MyData>> getEventMessageStream() { return Flux.interval(Duration.ofSeconds(30)) .map(i -> { // TODO e.g. fetch data from somewhere, // if there is no data return an empty object return data; }) .filter(data -> !data.isEmpty()) .map(data -> ServerSentEvent .builder(data) .event("message").build()); } }
我把所有东西都打包成ServerSentEvent<? extends Notification>。Notification只是一个标记界面。我使用类中的event字段ServerSentEvent来区分数据和ping事件。由于心跳Flux以短间隔连续不断地发送事件,因此检测到客户端已消失所需的时间最多为该间隔的长度。请记住,我需要这样做是因为可能要花一些时间才能获得一些可以发送的真实数据,结果,它可能还需要一段时间才能检测到客户端已消失。这样,它将在客户端无法发送ping(或消息事件)后立即检测到客户端已消失。
ServerSentEvent<? extends Notification>
Notification
event
ServerSentEvent
标记界面上的最后一个注释,我称为“通知”。这并不是真正必要的,但是它提供了一些类型安全性。否则,我们可以为getNotificationStream()方法编写Flux<ServerSentEvent<?>>而不是Flux<ServerSentEvent<?extendsNotification>>作为返回类型。或者也可以使getHeartbeatStream()返回Flux<ServerSentEvent<MyData>>。但是,这样可以允许发送任何对象,而这是我所不希望的。结果,我添加了接口。
Flux<ServerSentEvent<?>>
Flux<ServerSentEvent<?extendsNotification>>
Flux<ServerSentEvent<MyData>>