小编典典

用反应堆射击并忘记

spring-boot

我的Spring启动应用程序中有一种如下所示的方法。

public Flux<Data> search(SearchRequest request) {
  Flux<Data> result = searchService.search(request);//this returns Flux<Data>
  Mono<List<Data>> listOfData = result.collectList();
//  doThisAsync() // here I want to pass this list and run some processing on it
// the processing should happen async and the search method should return immediately.
  return result;
}

//this method uses the complete List<Data> returned by above method
public void doThisAsync(List<Data> data) {
  //do some processing here
}

当前,我在使用带@Async注释的服务类doThisAsync,但不知道如何传递它List<Data>,因为我不想调用block。我只有Mono<List<Data>>

我的主要问题是如何分别处理此Mono,并且该search方法应返回Flux<Data>


阅读 334

收藏
2020-05-30

共1个答案

小编典典

1,如果你的 发射后不管 已经是异步返回Mono/Flux

public Flux<Data> search(SearchRequest request)
{
    return searchService.search(request)
                        .collectList()
                        .doOnNext(data -> doThisAsync(data).subscribe())  // add error logging here or inside doThisAsync
                        .flatMapMany(Flux::fromIterable);
}

public Mono<Void> doThisAsync(List<Data> data) {
    //do some async/non-blocking processing here like calling WebClient
}

2,如果您 的一劳永逸 确实阻止了I / O

public Flux<Data> search(SearchRequest request)
{
    return searchService.search(request)
                        .collectList()
                        .doOnNext(data -> Mono.fromRunnable(() -> doThisAsync(data))
                                              .subscribeOn(Schedulers.elastic())  // delegate to proper thread to not block main flow
                                              .subscribe())  // add error logging here or inside doThisAsync
                        .flatMapMany(Flux::fromIterable);
}

public void doThisAsync(List<Data> data) {
    //do some blocking I/O on calling thread
}

请注意,在上述两种情况下,您都会失去反压支持。如果由于doAsyncThis某种原因减慢了速度,那么数据生产者将不在乎并继续生产项目。这是“火与火”机制的自然结果。

2020-05-30