在本文中,我们将看到如何使用Spring WebFlux构建响应式REST API。在进入反应式API之前,让我们看看系统是如何发展的,传统REST实现遇到的问题以及现代API的需求。
如果您查看从旧版系统到下文所述的现代系统的期望,
现代系统的期望是:应将应用程序分布式,Cloud Native,拥抱以实现高可用性和可伸缩性。因此,有效利用系统资源至关重要。进入为什么要使用响应式编程来构建REST API?让我们看看传统的REST API请求处理是如何工作的。
以下是传统REST API遇到的问题,
阻止和同步→请求正在阻止和同步。请求线程将等待任何阻塞的I / O,并且直到I / O等待结束后,线程才可以释放以将响应返回给调用方。 每个请求的线程数→ Web容器使用每个请求 的线程数模型。这限制了要处理的并发请求的数量。除了某些请求之外,容器还会对请求进行排队,这些请求最终会影响API的性能。 处理高并发用户的限制 → 由于Web容器使用每个请求线程数模型,因此我们无法处理高并发请求。 无法更好地利用系统资源 → 线程将因I / O而阻塞并处于空闲状态。但是,Web容器不能接受更多请求。在这种情况下,我们将无法有效地利用系统资源。 没有反压支持→ 我们无法从客户端或服务器施加反压。如果突然出现大量请求,则服务器或客户端可能会中断。之后,用户将无法访问该应用程序。如果我们有背压支持,则应用程序应在重负载期间持续运行,而不是无法使用。 让我们看看如何使用反应式编程解决上述问题。以下是我们使用反应式API可获得的优势。
异步和非阻塞→ 反应式编程为编写异步和非阻塞应用程序提供了灵活性。 事件/消息驱动→ 系统将为任何活动生成事件或消息。例如,来自数据库的数据被视为事件流。 支持背压→ 我们可以通过施加背压来 优雅地处理从一个系统到另一个系统的压力,从而避免拒绝服务。 可预测的应用程序响应时间→ 由于线程是异步且非阻塞的,因此在负载下应用程序响应时间是可预测的。 更好地利用系统资源→ 由于线程是异步且非阻塞的,因此不会为I / O占用 线程。使用更少的线程,我们可以支持更多的用户请求。 根据负载缩放 远离每个请求的线程 → 通过反应式API,我们正在远离每个请求的线程模型,因为线程是异步且非阻塞的。发出请求后,它将与服务器一起创建事件,并且请求线程将被释放以处理其他请求。 现在,让我们看看反应式编程是如何工作的。在下面的示例中,一旦应用程序调用了从数据源获取数据的操作,线程将立即返回,并且来自数据源的数据将作为数据/事件流出现。在这里,应用程序是订阅者,数据源是发布者。数据流完成后,onComplete将触发事件。
下面是另一种情况,onError如果发生任何异常,发布者将触发事件。
在某些情况下,可能没有发布者要交付的任何物品。例如,从数据库中删除一个项目。在这种情况下,发布者将立即触发onComplete/onError事件,而无需调用onNext事件,因为没有数据可返回。
现在,让我们看看什么是背压 以及如何将背压应用于反应性物流。 例如,我们有一个客户端应用程序正在从另一个服务请求数据。该服务能够以1000TPS的速率发布事件,但是客户端应用程序能够以200TPS的速率处理事件。
在这种情况下,客户端应用程序应缓冲其余数据以进行处理。在随后的调用中,客户端应用程序可能会缓冲更多数据,并最终耗尽内存。这会对依赖于客户端应用程序的其他应用程序造成级联效应。为了避免这种情况,客户端应用程序可以要求服务在事件末尾缓冲事件,并以客户端应用程序的速率推送事件。这称为背压。下图描述了相同的内容。
现在,我们将看到反应流规范及其实现之一,称为Project Reactor。反应流规范定义了以下接口。让我们查看这些接口的详细信息。
发布者→ 发布者是数量可能不受限制的序列元素的提供者,可按其订阅者的要求发布它们
public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); }
Subscriber → Subscriber 是数量可能不受限制的已排序元素的使用者。
public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }
Subscription → 订阅表示订阅者订阅发布者的一对一生命周期。
public interface Subscription { public void request(long n); public void cancel(); }
处理器 → 处理器代表一个处理阶段-既是订户又是发布者,并且服从两者的合同。
反应流规格的类图如下所示。
反应性流规范具有许多实现。Project Reactor是实现之一。该反应堆完全无阻塞,可提供有效的需求管理。Reactor提供了两个反应式和可组合的API,即Flux [N]和Mono [0 | 1],它们广泛实现了Reactive Extensions。Reactor提供了用于HTTP(包括Websockets),TCP和UDP的非阻塞,反压就绪的网络引擎。它非常适合微服务架构。
通量→ 它是Publisher带有rx运算符的反应性流,它发出0到N个元素,然后完成(成功或有错误)。助焊剂的大理石图如下所示。
Mono → 它是Publisher具有基本rx运算符的反应式流,它通过发出0到1个元素或有错误而成功完成。Mono的大理石图如下所示。
由于Spring 5.x随Reactor实施一起提供,因此,如果我们想使用带有Spring servlet堆栈的命令式编程来构建REST API,它仍然支持。下图说明了Spring如何支持反应式和servlet堆栈实现。
现在,我们将看到一个应用程序,以公开响应式REST API。在此应用程序中,我们使用了:
下面是应用程序的高级体系结构。
响应式演示应用程序工作流程
让我们看一下build.gradle文件,以查看与Spring WebFlux一起使用的依赖项。
plugins { id 'org.springframework.boot' version '2.2.6.RELEASE' id 'io.spring.dependency-management' version '1.0.9.RELEASE' id 'java' } group = 'org.smarttechie' version = '0.0.1-SNAPSHOT' sourceCompatibility = '1.8' repositories { mavenCentral() } dependencies { implementation 'org.springframework.boot:spring-boot-starter-data-cassandra-reactive' implementation 'org.springframework.boot:spring-boot-starter-webflux' testImplementation('org.springframework.boot:spring-boot-starter-test') { exclude group: 'org.junit.vintage', module: 'junit-vintage-engine' } testImplementation 'io.projectreactor:reactor-test' } test { useJUnitPlatform() }
在此应用程序中,我公开了以下提到的API。您可以从GitHub下载源代码 。
package org.smarttechie.controller; import org.smarttechie.model.Product; import org.smarttechie.repository.ProductRepository; import org.smarttechie.service.ProductService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @RestController public class ProductController { @Autowired private ProductService productService; /** * This endpoint allows to create a product. * @param product - to create * @return - the created product */ @PostMapping("/product") @ResponseStatus(HttpStatus.CREATED) public Mono<Product> createProduct(@RequestBody Product product){ return productService.save(product); } /** * This endpoint gives all the products * @return - the list of products available */ @GetMapping("/products") public Flux<Product> getAllProducts(){ return productService.getAllProducts(); } /** * This endpoint allows to delete a product * @param id - to delete * @return */ @DeleteMapping("/product/{id}") public Mono<Void> deleteProduct(@PathVariable int id){ return productService.deleteProduct(id); } /** * This endpoint allows to update a product * @param product - to update * @return - the updated product */ @PutMapping("product/{id}") public Mono<ResponseEntity<Product>> updateProduct(@RequestBody Product product){ return productService.update(product); } }
在构建反应式API时,我们可以使用功能风格的编程模型来构建API,而无需使用RestController。在这种情况下,我们需要具有一个路由器和一个处理程序组件,如下所示。
package org.smarttechie.router; import org.smarttechie.handler.ProductHandler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.MediaType; import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.RouterFunctions; import org.springframework.web.reactive.function.server.ServerResponse; import static org.springframework.web.reactive.function.server.RequestPredicates.*; @Configuration public class ProductRouter { /** * The router configuration for the product handler. * @param productHandler * @return */ @Bean public RouterFunction<ServerResponse> productsRoute(ProductHandler productHandler){ return RouterFunctions .route(GET("/products").and(accept(MediaType.APPLICATION_JSON)) ,productHandler::getAllProducts) .andRoute(POST("/product").and(accept(MediaType.APPLICATION_JSON)) ,productHandler::createProduct) .andRoute(DELETE("/product/{id}").and(accept(MediaType.APPLICATION_JSON)) ,productHandler::deleteProduct) .andRoute(PUT("/product/{id}").and(accept(MediaType.APPLICATION_JSON)) ,productHandler::updateProduct); } }
package org.smarttechie.handler; import org.smarttechie.model.Product; import org.smarttechie.service.ProductService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.reactive.function.server.ServerResponse; import reactor.core.publisher.Mono; import static org.springframework.web.reactive.function.BodyInserters.fromObject; @Component public class ProductHandler { @Autowired private ProductService productService; static Mono<ServerResponse> notFound = ServerResponse.notFound().build(); /** * The handler to get all the available products. * @param serverRequest * @return - all the products info as part of ServerResponse */ public Mono<ServerResponse> getAllProducts(ServerRequest serverRequest) { return ServerResponse.ok() .contentType(MediaType.APPLICATION_JSON) .body(productService.getAllProducts(), Product.class); } /** * The handler to create a product * @param serverRequest * @return - return the created product as part of ServerResponse */ public Mono<ServerResponse> createProduct(ServerRequest serverRequest) { Mono<Product> productToSave = serverRequest.bodyToMono(Product.class); return productToSave.flatMap(product -> ServerResponse.ok() .contentType(MediaType.APPLICATION_JSON) .body(productService.save(product), Product.class)); } /** * The handler to delete a product based on the product id. * @param serverRequest * @return - return the deleted product as part of ServerResponse */ public Mono<ServerResponse> deleteProduct(ServerRequest serverRequest) { String id = serverRequest.pathVariable("id"); Mono<Void> deleteItem = productService.deleteProduct(Integer.parseInt(id)); return ServerResponse.ok() .contentType(MediaType.APPLICATION_JSON) .body(deleteItem, Void.class); } /** * The handler to update a product. * @param serverRequest * @return - The updated product as part of ServerResponse */ public Mono<ServerResponse> updateProduct(ServerRequest serverRequest) { return productService.update(serverRequest.bodyToMono(Product.class)).flatMap(product -> ServerResponse.ok() .contentType(MediaType.APPLICATION_JSON) .body(fromObject(product))) .switchIfEmpty(notFound); } }
到目前为止,我们已经看到了如何公开响应式REST API。通过这种实现,我使用Gatling在反应式API和非反应式API(使用Spring RestController构建非反应式API)上做了一个简单的基准测试。以下是反应式和非反应式API之间的比较指标。这不是一个广泛的基准测试。因此,在采用之前,请确保对您的用例进行广泛的基准测试。
Gatling负载测试脚本也可以在GitHub上获得,以供您参考。到此,我结束了有关“使用Spring WebFlux构建反应性REST API ”的文章。我们将在另一个主题上见面。到那时,快乐学习!!
原文链接:http://codingdict.com