小编典典

我可以使用Spring5的WebClient返回的Flux的block()方法吗?

spring-boot

我创建了Spring Boot
2.0演示应用程序,其中包含两个使用WebClient进行通信的应用程序。当我从WebClient的响应中使用Flux的block()方法时,他们经常停止通信。由于某些原因,我想使用List
not Flux。

服务器端应用程序就是这样。它只是返回Flux对象。

@GetMapping
public Flux<Item> findAll() {
    return Flux.fromIterable(items);
}

客户端(或BFF端)应用程序就是这样。我从服务器获取Flux,并通过调用block()方法将其转换为List。

@GetMapping
public List<Item> findBlock() {
    return webClient.get()
        .retrieve()
        .bodyToFlux(Item.class)
        .collectList()
        .block(Duration.ofSeconds(10L));
}

虽然起初效果不错,但多次访问后findBlock()不会响应并且超时。当我修改find​​Block()方法以返回Flux并删除co​​llectList()和block()时,它运行良好。然后,我假设block()方法会导致此问题。
而且,当我修改find​​All()方法以返回List时,没有任何变化。

整个示例应用程序的源代码在这里。
https://github.com/cero-t/webclient-
example

“资源”是服务器应用程序,“前端”是客户端应用程序。运行两个应用程序后,当我访问localhost:8080时,它可以很好地工作,并且可以随时重新加载,但是当我访问localhost:8080
/ block时,它似乎工作得很好,但是在多次重载后,它不会响应。


顺便说一句,当我将“ spring-boot-starter-
web”依赖项添加到“前端”应用程序(而非资源应用程序)的pom.xml时,这意味着我使用了tomcat,这个问题永远不会发生。这是由于Netty服务器引起的问题吗?

任何指导将不胜感激。


阅读 982

收藏
2020-05-30

共1个答案

小编典典

首先,让我指出,Flux.fromIterable(items)仅当items从内存中获取使用建议,不涉及I /
O。否则,您可能会使用阻塞API来获取它-
这会破坏您的响应式应用程序。在这种情况下,这是一个内存列表,所以没有问题。请注意,您也可以去Flux.just(item1, item2, item3)

使用以下内容是最有效的:

@GetMapping("/")
public Flux<Item> findFlux() {
  return webClient.get()
    .retrieve()
    .bodyToFlux(Item.class);
}

Item 实例将以非常有效的方式即时读取/写入,解码/编码。

另一方面,这不是首选方法:

@GetMapping("/block")
public List<Item> findBlock() {
  return webClient.get()
    .retrieve()
    .bodyToFlux(Item.class)
    .collectList()
    .block(Duration.ofSeconds(10L));
}

在这种情况下,您的前台应用程序 将整个项目列表 缓存在内存
collectList但同时也会阻塞少数可用的服务器线程之一。这可能会导致性能很差,因为您的服务器可能被阻止等待该数据,并且无法同时处理其他请求。

在这种情况下,情况变得更糟,因为应用程序完全崩溃了。查看控制台,我们可以看到以下内容:

WARN 3075 --- [ctor-http-nio-7] io.netty.util.concurrent.DefaultPromise  : An exception was thrown by reactor.ipc.netty.channel.PooledClientContextHandler$$Lambda$532/356589024.operationComplete()

reactor.core.Exceptions$BubblingException: java.lang.IllegalArgumentException: Channel [id: 0xab15f050, L:/127.0.0.1:59350 - R:localhost/127.0.0.1:8081] was not acquired from this ChannelPool
    at reactor.core.Exceptions.bubble(Exceptions.java:154) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]

这可能与应在0​​.7.4.RELEASE中修复的反应堆网络客户端连接池问题有关。我不知道具体细节,但是我怀疑整个连接池都被破坏了,因为没有从客户端连接中正确读取HTTP响应。

添加spring-boot-starter-web的确使您的应用程序使用Tomcat,但是主要是将Spring WebFlux应用程序转换为Spring
MVC应用程序(现在支持某些反应式返回类型,但是具有不同的运行时模型)。如果希望使用Tomcat测试应用程序,则可以将其添加spring-boot- starter-tomcat到POM中,这将结合使用Tomcat和Spring WebFlux。

2020-05-30