介绍和范围 Servlet规范是使用阻塞语义或每个线程一个请求的模型构建的。通常,在云环境中,机器比传统的数据中心要小。代替大型计算机,流行使用许多小型计算机并尝试水平扩展应用程序。在这种情况下,可以将Servlet规范切换到基于响应流创建的体系结构。对于云环境,这种架构比servlet更适合。Spring框架一直在创建Spring WebFlux,以帮助开发人员创建反应式Web应用程序[1]。
基于Project Reactor的Spring WebFlux使我们能够:
在我们的演示项目中,我们将以响应式WebClient组件为中心,对远程服务进行调用。这实际上是一个很好的起点,也是一个非常普遍的情况。如[2]中所述,每个呼叫的延迟或呼叫之间的相互依赖性越大,性能优势就越明显。
这种方法的另一个动机是这样的事实,即从Spring 5.0版本开始,org.springframework.web.client.RestTemplate 该类处于维护模式,并且仅接受很小的更改和错误请求。因此,建议开始使用org.springframework.web.reactive.client.WebClient 具有更现代API的。此外,它支持同步,异步和流传输方案。
我们将基于最小的微服务架构构建示例应用程序,并演示由每个用例需求驱动的几种功能。除了对Java的理解之外,还需要熟悉Spring Cloud项目,Project Reactor的基本知识(什么是Mono,什么是Flux)以及Spring WebClient的基础知识。
我们将遵循业务问题解决方案的方法,使事情变得更现实。尽管我们不打算使用所有反应式API,但如果您要进入此领域并需要不同的心态,就足以让您很好地了解未来的情况。这是Mono和Flux的简单“ hello-world”示例无法简单捕获的。
响应式编程目前正在流行,但是使用Mono&Flux的琐碎“ hello-world”示例不能简单地捕获为生产用途构建12要素应用程序的需求。如果要为实际生产系统做出决策,您将需要的远远超过此。
使用反应式WebClient,我们可以直接从Spring MVC控制器方法返回反应式类型(例如Flux或Mono)。Spring MVC控制器也可以调用其他反应式组件。如果我们由于某些原因而使某些端点和服务无法响应,则也可以进行混合,例如:在没有响应选择的情况下阻止依赖关系,或者我们可能有一个想要逐步迁移的现有遗留应用程序等。我们将遵循带注释的控制器编程模型。
方案/项目结构 我们将实现简化的一次性密码(OTP)服务,提供以下功能:
我们的应用程序将包含以下微服务:
customer-service:将保留我们服务注册用户的目录,其中包含帐户ID,MSISDN,电子邮件等信息。 许多远程(外部)服务将被调用。我们假设我们的应用程序有权使用它们,并将通过其REST API访问它们。当然,为简单起见,将对它们进行嘲笑。这些“第三方”服务是:
number-information:将电话号码作为输入,并验证它是否属于电信运营商并且当前处于活动状态
由于以下原因,我们选择Spring Cloud Gateway而不是Zuul:
我们将使用Spring Cloud Loadbalancer(而不是Ribbon)来进行客户端负载平衡,并使用@LoadBalancedWebClient(而不是Feign)来进行服务到服务的通信。除此之外,每个微服务都将基于Spring Boot。我们还将使Spring Data R2DBC发挥作用,以使用反应性驱动程序与PostgreSQL数据库集成。我们的组件图如下所示:
完整的源代码可以在https://github.com/kmandalas/webclient-showcase中进行检查。
一,生成OTP 业务需求 给定E.164格式的用户数:
解决方案
您可以在此处查看完整的实施
// 1st call to customer-service using @LoadBalanced WebClient Mono<CustomerDTO> customerInfo = loadbalanced.build() .get() .uri(customerURI) .header("Authorization", String.format("%s %s", "Bearer", tokenUtils.getAccessToken())) .accept(MediaType.APPLICATION_JSON) .retrieve() .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new OTPException("Error retrieving Customer", FaultReason.CUSTOMER_ERROR))) .bodyToMono(CustomerDTO.class); // 2nd call to external service, to check that the MSISDN is valid Mono<String> msisdnStatus = webclient.build() .get() .uri(numberInfoURI) .retrieve() .onStatus(HttpStatus::isError, clientResponse -> Mono.error( new OTPException("Error retrieving msisdn status", FaultReason.NUMBER_INFORMATION_ERROR))) .bodyToMono(String.class); // Combine the results in a single Mono, that completes when both calls have returned. // If an error occurs in one of the Monos, execution stops immediately. // If we want to delay errors and execute all Monos, then we can use zipDelayError instead Mono<Tuple2<CustomerDTO, String>> zippedCalls = Mono.zip(customerInfo, msisdnStatus); // Perform additional actions after the combined mono has returned return zippedCalls.flatMap(resultTuple -> { // After the calls have completed, generate a random pin int pin = 100000 + new Random().nextInt(900000); // Save the OTP to local DB, in a reactive manner Mono<OTP> otpMono = otpRepository.save(OTP.builder() .customerId(resultTuple.getT1().getAccountId()) .msisdn(form.getMsisdn()) .pin(pin) .createdOn(ZonedDateTime.now()) .expires(ZonedDateTime.now().plus(Duration.ofMinutes(1))) .status(OTPStatus.ACTIVE) .applicationId("PPR") .attemptCount(0) .build()); // External notification service invocation Mono<NotificationResultDTO> notificationResultDTOMono = webclient.build() .post() .uri(notificationServiceUrl) .accept(MediaType.APPLICATION_JSON) .body(BodyInserters.fromValue(NotificationRequestForm.builder() .channel(Channel.AUTO.name()) .destination(form.getMsisdn()) .message(String.valueOf(pin)) .build())) .retrieve() .bodyToMono(NotificationResultDTO.class); // When this operation is complete, the external notification service // will be invoked. The results are combined in a single Mono: return otpMono.zipWhen(otp -> notificationResultDTOMono) // Return only the result of the first call (DB) .map(Tuple2::getT1); });
首先,我们看到我们需要与1个内部微服务(服务到服务通信)和2个外部(远程)服务进行通信。
如前所述,我们选择使用@LoadBalancedWebClient。因此,我们需要在类路径中有一个负载均衡器实现。在我们的案例中,我们已将spring-cloud loadbalancer依赖项添加到项目中。这样,ReactiveLoadBalancer 将在引擎盖下使用a 。
或者,此功能也可以与spring-cloud-starter-netflix-ribbon一起使用,但是该请求将由非反应式处理LoadBalancerClient。 另外,就像我们说的那样,Ribbon已处于维护模式,因此不建议在新项目中使用[3]。
我们需要做的另一件事是在服务的应用程序属性中禁用功能区:
spring: loadbalancer: ribbon: enabled: false
最后,关于Feign的说明(到目前为止,它与Ribbon一起是一个非常受欢迎的选择):OpenFeign项目当前不支持响应客户端,Spring Cloud OpenFeign也不支持。因此,我们将不使用它。有关更多详细信息,请点击此处。
现在,这是现实应用程序可能会遇到的几个实际问题:
需要多个WebClient对象。 在我们的各个端点都受到保护的情况下传播JWT令牌。 为了解决第一个问题,我们将在WebClientConfig 类中声明2个不同的WebClient Bean 。这是必要的,因为服务发现和负载平衡仅适用于我们自己的域和服务。因此,我们需要使用WebClient Bean的不同实例,这些实例在配置(例如超时)方面当然可能比@LoadBalanced注解有更多差异。
对于第二个问题,我们需要在WebClient的header属性内传播访问令牌:
.header("Authorization", String.format("%s %s", "Bearer", tokenUtils.getAccessToken()))
在上面的代码段中,我们假设有一个实用程序方法,该方法将从通过Spring Cloud Gateway转发到otp-service的传入请求中获取JWT令牌。我们使用它来设置“授权”标头属性和承载令牌的值,以有效地将其传递给客户服务。请记住,还需要在以下设置application.yml中的网关服务,以便使该继电器:
globalcors: cors-configurations: '[/**]': allowedOrigins: ["*"] allowedMethods: ["POST","GET","DELETE","PUT"] allowedHeaders: "*" allowCredentials: true
现在我们已经解决了这些问题,让我们看看可以使用哪些Reactor Publisher函数获得结果:
Tuple2<CustomerDTO, String>
ReactiveCrudRepository
二。验证OTP 业务需求 给定现有的OTP ID和先前已交付给用户设备的有效引脚:
解决方案 您可以在此处检查完整的实现:
@NewSpan public Mono<OTP> validate(Long otpId, Integer pin) { log.info("Entered resend with arguments: {}, {}", otpId, pin); AtomicReference<FaultReason> faultReason = new AtomicReference<>(); return otpRepository.findById(otpId) .switchIfEmpty(Mono.error(new OTPException("Error validating OTP", FaultReason.NOT_FOUND))) .zipWhen(otp -> applicationRepository.findById(otp.getApplicationId())) .flatMap(Tuple2 -> { // perform various status checks here... if (!otp.getStatus().equals(OTPStatus.TOO_MANY_ATTEMPTS)) otp.setAttemptCount(otp.getAttemptCount() + 1); if (otp.getStatus().equals(OTPStatus.VERIFIED)) return otpRepository.save(otp); else { return Mono.error(new OTPException("Error validating OTP", faultReason.get(), otp)); } }) .doOnError(throwable -> { if (throwable instanceof OTPException) { OTPException error = ((OTPException) throwable); if (!error.getFaultReason().equals(FaultReason.NOT_FOUND) && error.getOtp() != null) { otpRepository.save(error.getOtp()).subscribe(); } } }); }
我们首先使用反应式CRUD存储库按ID查询OTP。注意,对于这种简单的查询,不需要任何实现 然后,如果未找到任何记录,则使用switchIfEmpty和Mono.error方法引发Exception。我们带@ControllerAdvice注释的Bean负责其余所有工作 否则,如果找到记录,我们将使用zipWhen构建下一步以从“应用程序”表中获得允许的最大尝试次数
我们再次使用Flatmap将条件逻辑应用于返回的结果。请注意,先前对zipWhen的调用以Tuple的形式给出,即Tuple2<OTP, Application>允许访问这些对象及其所包含的信息
如果所有验证均通过,则将OTP的状态更新为VERIFIED并返回结果,否则通过Mono.error返回Exception 。同样,OTPControllerAdvice通过返回正确的状态和消息来完成工作
我们还没有完成。即使在发生Mono.error的情况下,我们仍然需要更新数据库中的内容。因此,最后有doOnError方法。顾名思义,它充当错误处理程序,因此我们可以在其中执行相关操作。这就像一个finally子句,但有错误。请记住,doOnSuccess也存在,并且还有其他变体 让我们停下来这里了一会儿,发现,我们的内部doOnError到底方法,我们称之为订阅方法。如果您查看Reactor文档,通常使用subscription方法及其变体来触发反应式操作链的执行。但是到目前为止,我们在代码中的任何地方都没有它。而且我们不需要它,因为我们一直将Mono或Flux返回到Rest控制器。他们是在幕后为我们执行订阅的人。正如罗森·斯托扬切夫(Rossen Stoyanchev)在他的必看演示文稿中指出的那样,“针对MVC开发人员的“反应式”指南,“您应该继续努力”,即,如果可能的话,不要阻塞并从端点返回Reactive类型。在另一方面,我们需要使用订阅 里面方法doOnError,因为我们没有任何回报,所以我们需要以某种方式触发我们的反应库来执行更新。
三,重新发送OTP 业务需求 给定给定一个现有的OTP id,可以同时(并行)重新发送到多个渠道(SMS,电子邮件,Viber等):
return otpRepository.findById(otpId) .switchIfEmpty(Mono.error(new OTPException("Error resending OTP", FaultReason.NOT_FOUND))) .zipWhen(otp -> { // perform various status checks here... List<Mono<NotificationResultDTO>> monoList = channels.stream() .filter(Objects::nonNull) .map(method -> webclient.build() .post() .uri(notificationServiceUrl) .accept(MediaType.APPLICATION_JSON) .body(BodyInserters.fromValue(NotificationRequestForm.builder() .channel(method) .destination(Channel.EMAIL.name().equals(method) ? mail : otp.getMsisdn()) .message(otp.getPin().toString()) .build())) .retrieve() .bodyToMono(NotificationResultDTO.class)) .collect(Collectors.toList()); return Flux.merge(monoList).collectList(); }) .map(Tuple2::getT1);
我们在这一点上的重点是演示并行调用相同的端点。在第一个用例中,我们看到了如何使用Mono.zip并行调用不同的端点,并返回不同的类型。现在,我们将使用Flux.merge代替。
让我们看看我们是如何做到这一点的:
Flux.merge
Notification-service
List<Mono<NotificationResultDTO>>
解决方案 您可以在此处检查实施。
最后,我们留下了更简单的案例,因为您会发现许多示例通常都是通过查询具有反应性驱动程序支持的NoSQL数据库的关系来简单地返回Flux或Mono。请记住,尽管在关系数据库的情况下,Spring Data R2DBC(R2DBC代表反应型关系数据库连接)没有提供ORM框架(JPA,Hibernate)的许多功能。它的主要目标是成为一个简单的,有限的,自以为是的对象映射器。因此,如果您过去使用过JPA和Hibernate的项目,那么也准备对这部分内容有个转变。
其他话题 记录中 日志记录是每种软件的重要方面。基于微服务架构的解决方案对集中式日志记录还有其他要求。但是,当我们使用File Appenders进行日志记录时,会遇到问题,因为此I / O操作被阻止。有关示例,请参见以下问题:
https://github.com/spring-projects/spring-framework/issues/25547 一个解决方案是选择和配置主要由SLF4J实现(例如Log4j和Logback)支持的Async Appenders。在我们的示例中,我们使用Logback AsyncAppender。可以在此处看到示例配置。
https://github.com/spring-projects/spring-framework/issues/25547
在AsyncAppender 具有五(5)配置选项:
queueSize
throwingThreshold
neverBlock
includeCallerData
maxFlushTime
分布式跟踪
跟踪是微服务监控的另一个重要方面。我们可以使用Spring Cloud Sleuth和Jaeger跟踪从微服务进行的所有调用。
Sleuth提供了方便的自动配置,可以与Spring MVC和WebFlux等流行框架直接使用。它允许自动注入跟踪和跨度ID,并在日志中显示此信息,以及基于注释的跨度控件。要使其与Jaeger一起使用,我们需要在Jaeger的配置中启用Zipkin收集器端口。
要记住的一件事是,这里也确实存在局限性。例如,尚不支持使用R2DBC跟踪数据库调用。您可以在此处找到相关问题:
https://github.com/spring-cloud/spring-cloud-sleuth/issues/1524 以下是Jaeger UI主页的屏幕截图:
https://github.com/spring-cloud/spring-cloud-sleuth/issues/1524
这是跟踪生成OTP的呼叫的示例:
这是跟踪通过多个通道重新发送OTP的呼叫的示例:
@Cacheable方法的反应类型支持 Spring的@Cacheable注释是一种通常在服务级别上处理缓存的便捷方法。此缓存抽象可与各种缓存实现(包括符合JSR-107的缓存,Redis等)无缝配合。但是,在撰写本文时,仍不支持@Cacheable方法的Reactive类型。相关的问题是:
https://github.com/spring-projects/spring-framework/issues/17920 尽管Redis是一个非常常见的集中式缓存解决方案,并且Spring Data项目中存在Redis的Reactive驱动程序,但目前尚无计划添加反应式缓存实现:
https://github.com/spring-projects/spring-framework/issues/17920
https://jira.spring.io/browse/DATAREDIS-967
响应时间极长的远程端点的呼叫 需要提高并发级别,例如,我们希望并行提交10个调用,因为我们知道我们负担得起,并且适合我们的情况 为此,Spring WebFlux提供了称为Schedulers的线程池抽象。这些与任务执行器结合在一起,我们可以在其中创建不同的并发策略,设置线程的最小和最大数量等。
public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(10); executor.setQueueCapacity(100); executor.setThreadNamePrefix("otp-"); executor.initialize(); return executor; }
现在让我们想象一下,我们想要同时检查五个MSISDN的状态并将结果作为以下列表返回NotificationResultDTOs:
public Flux<NotificationResultDTO> checkMsisdns(List<String> msisdns) { return Flux.fromIterable(msisdns) .parallel() .runOn(Schedulers.fromExecutor(taskExecutor)) .flatMap(this::checkMsisdn) .sequential() }
BlockHound 从阻塞代码到非阻塞代码或从命令式代码到响应式代码的更改非常棘手,需要积累一定的经验水平,以使您感到自在。有时可能很难在Reactor线程中检测阻塞代码。这是因为我们不需要使用block来使事物成为阻塞,但是我们可以不知不觉地通过使用可以阻塞当前线程的库来引入阻塞。
BlockHound是帮助我们检测某些情况的有用工具。它是由Pivotal赞助的,可以以多种方式使用,但是我们建议在测试阶段使用它。您唯一需要做的就是包括以下依赖项:
<dependency> <groupId>io.projectreactor.tools</groupId> <artifactId>blockhound-junit-platform</artifactId> <version>1.0.4.RELEASE</version> </dependency>
请记住,如果使用Java 11及更高版本,则需要以下JVM参数才能使该工具正常工作:
-XX:+AllowRedefinitionToAddDeleteMethods
整合测试 在我们的示例项目中,我们展示了一个示例集成测试,该测试涵盖了我们最“复杂”的端点,即生成OTP的端点。我们使用HoverFly来模拟两个“外部”服务(即号码信息和通知服务)的响应以及对“内部”服务(即客户服务)的调用。在测试执行期间,我们还使用Testcontainer来整理dockerized的PostgresDB。
完整的代码可以在OTPControllerIntegrationTests类中看到。
OTPControllerIntegrationTests
我们还使用WebTestClient 哪个客户端来测试Web服务器,该客户端内部使用WebClient执行请求,同时还提供流利的API来验证响应。该客户端可以通过HTTP连接到任何服务器,或者通过模拟请求和响应对象连接到WebFlux应用程序。
值得一提的是我们为了模拟客户服务实例的存在而执行的“技巧” 。AServiceInstance 代表发现系统中服务的一个实例。在运行集成测试时,我们通常会禁用部分云功能,而服务发现就是其中之一。但是,由于我们在测试集成流程@LoadBalanced时调用客户服务时使用WebClient ,因此我们需要一种方法来模拟此服务的“静态”实例。此外,我们需要将其与HoverFly“绑定”,以便在调用它以返回所需的模拟响应时使用。这是通过 gr.kmandalas.service.otp.OTPControllerIntegrationTests.TestConfig静态类实现的。
@LoadBalanced
gr.kmandalas.service.otp.OTPControllerIntegrationTests.TestConfig
Async SOAP 如今,我们与大多数系统集成在一起都公开了REST端点。但是,它仍然必须与基于SOAP的Web服务集成在一起并不少见。JAX-WS和ApacheCXF都允许生成非阻塞客户端。您可以在带有JAX-WS的Reactive Web Service Client中找到有关如何处理这种情况的示例。
How to Run
要构建和测试应用程序,先决条件是:
mvn clean verify
最简单的方法是使用Docker和Docker Compose运行微服务:
docker-compose up --build
容器启动并运行后,您可以访问领事的UI来查看活动的服务:
http:// localhost:8500 / ui / dc1 / services
在下面,您可以找到curl用于通过我们的API网关调用各个端点的命令:
Generate OTP
curl --location --request POST 'localhost:8000/otp-service/v1/otp' \ --header 'Content-Type: application/json' \ --data-raw '{ "msisdn": "00306933177321" }'
Validate OTP
curl --location --request POST 'http://localhost:8000/otp-service/v1/otp/36/validate?pin=356775'
Resend OTP
curl --location --request POST 'localhost:8000/otp-service/v1/otp/2?via=AUTO,EMAIL,VIBER&[email protected]' \ 2 --header 'Content-Type: application/json'
Get All OTPs
curl --location --request GET 'localhost:8000/otp-service/v1/otp?number=00306933177321'
OTPStatus
curl --location --request GET 'localhost:8000/otp-service/v1/otp/1'
结论
表演具有许多特征和意义。反应性和非阻塞性通常不会使应用程序运行得更快。在某些情况下,例如,当使用WebClient并行运行远程调用时,它们可以同时避免与Task Executor牵扯在一起,而可以使用更优雅,更流畅的API。当然,它带有明显的学习曲线。
反应性和非阻塞性的主要预期好处是能够以较少的固定数量的线程和较少的内存进行扩展。这使应用程序在负载下更具弹性,因为它们以更可预测的方式扩展。但是,为了观察这些好处,您需要有一些延迟(包括缓慢的和不可预测的网络I / O的混合)。这就是反应堆开始显示其优势的地方,差异可能很大[8]。
在[9]上给出了一些有趣的负载测试和比较结果。结论是,在所有情况下,带有WebClient和Apache客户端的Spring Webflux都是“赢家”。当基础服务很慢(500毫秒)时,最大的区别(比阻止Servlet快4倍)出现了。它比使用的非阻塞Servlet快15–20%CompetableFuture。而且,与Servlet相比,它不会创建很多线程(20 vs 220)。
因此,如果您打算开始使用Spring构建Reactive Microservices并渴望利用上述好处,我们希望我们对即将出现的挑战和所需的准备工作有一个很好的了解。
原文链接:http://codingdict.com