我的Spring启动应用程序中有一种如下所示的方法。
public Flux<Data> search(SearchRequest request) { Flux<Data> result = searchService.search(request);//this returns Flux<Data> Mono<List<Data>> listOfData = result.collectList(); // doThisAsync() // here I want to pass this list and run some processing on it // the processing should happen async and the search method should return immediately. return result; } //this method uses the complete List<Data> returned by above method public void doThisAsync(List<Data> data) { //do some processing here }
当前,我在使用带@Async注释的服务类doThisAsync,但不知道如何传递它List<Data>,因为我不想调用block。我只有Mono<List<Data>>。
@Async
doThisAsync
List<Data>
block
Mono<List<Data>>
我的主要问题是如何分别处理此Mono,并且该search方法应返回Flux<Data>。
search
Flux<Data>
Mono
Flux
public Flux<Data> search(SearchRequest request) { return searchService.search(request) .collectList() .doOnNext(data -> doThisAsync(data).subscribe()) // add error logging here or inside doThisAsync .flatMapMany(Flux::fromIterable); } public Mono<Void> doThisAsync(List<Data> data) { //do some async/non-blocking processing here like calling WebClient }
public Flux<Data> search(SearchRequest request) { return searchService.search(request) .collectList() .doOnNext(data -> Mono.fromRunnable(() -> doThisAsync(data)) .subscribeOn(Schedulers.elastic()) // delegate to proper thread to not block main flow .subscribe()) // add error logging here or inside doThisAsync .flatMapMany(Flux::fromIterable); } public void doThisAsync(List<Data> data) { //do some blocking I/O on calling thread }
请注意,在上述两种情况下,您都会失去反压支持。如果由于doAsyncThis某种原因减慢了速度,那么数据生产者将不在乎并继续生产项目。这是“火与火”机制的自然结果。
doAsyncThis