我使用以下逻辑重新启动未完成的Spring Batch(例如,在应用程序异常终止之后)作业:
public void restartUncompletedJobs() { LOGGER.info("Restarting uncompleted jobs"); try { jobRegistry.register(new ReferenceJobFactory(documetPipelineJob)); List<String> jobs = jobExplorer.getJobNames(); for (String job : jobs) { Set<JobExecution> runningJobs = jobExplorer.findRunningJobExecutions(job); for (JobExecution runningJob : runningJobs) { runningJob.setStatus(BatchStatus.FAILED); runningJob.setEndTime(new Date()); jobRepository.update(runningJob); jobOperator.restart(runningJob.getId()); LOGGER.info("Job restarted: " + runningJob); } } } catch (Exception e) { LOGGER.error(e.getMessage(), e); } }
这可以正常工作,但有一个副作用-它不会重新启动失败的作业执行,而是创建一个新的执行实例。如何更改此逻辑以便从失败的步骤重新启动失败的执行并且不创建新的执行?
更新
当我尝试以下代码时:
public void restartUncompletedJobs() { try { jobRegistry.register(new ReferenceJobFactory(documetPipelineJob)); List<String> jobs = jobExplorer.getJobNames(); for (String job : jobs) { Set<JobExecution> jobExecutions = jobExplorer.findRunningJobExecutions(job); for (JobExecution jobExecution : jobExecutions) { jobOperator.restart(jobExecution.getId()); } } } catch (Exception e) { LOGGER.error(e.getMessage(), e); } }
它失败,但以下异常:
2018-07-30 06:50:47.090 ERROR 1588 --- [ main] c.v.p.d.service.batch.BatchServiceImpl : Illegal state (only happens on a race condition): job execution already running with name=documetPipelineJob and parameters={ID=826407fa-d3bc-481a-8acb-b9643b849035, inputDir=/home/public/images, STORAGE_TYPE=LOCAL} org.springframework.batch.core.UnexpectedJobExecutionException: Illegal state (only happens on a race condition): job execution already running with name=documetPipelineJob and parameters={ID=826407fa-d3bc-481a-8acb-b9643b849035, inputDir=/home/public/images, STORAGE_TYPE=LOCAL} at org.springframework.batch.core.launch.support.SimpleJobOperator.restart(SimpleJobOperator.java:283) ~[spring-batch-core-4.0.1.RELEASE.jar!/:4.0.1.RELEASE] at org.springframework.batch.core.launch.support.SimpleJobOperator$$FastClassBySpringCGLIB$$44ee6049.invoke(<generated>) ~[spring-batch-core-4.0.1.RELEASE.jar!/:4.0.1.RELEASE] at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) [spring-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE] at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:684) [spring-aop-5.0.6.RELEASE.jar!/:5.0.6.RELEASE] at org.springframework.batch.core.launch.support.SimpleJobOperator$$EnhancerBySpringCGLIB$$7659d4c.restart(<generated>) ~[spring-batch-core-4.0.1.RELEASE.jar!/:4.0.1.RELEASE] at com.example.pipeline.domain.service.batch.BatchServiceImpl.restartUncompletedJobs(BatchServiceImpl.java:143) ~[domain-0.0.1.jar!/:0.0.1]
以下代码在作业存储数据库中创建新的执行:
public void restartUncompletedJobs() { try { jobRegistry.register(new ReferenceJobFactory(documetPipelineJob)); List<String> jobs = jobExplorer.getJobNames(); for (String job : jobs) { Set<JobExecution> jobExecutions = jobExplorer.findRunningJobExecutions(job); for (JobExecution jobExecution : jobExecutions) { jobExecution.setStatus(BatchStatus.STOPPED); jobExecution.setEndTime(new Date()); jobRepository.update(jobExecution); Long jobExecutionId = jobExecution.getId(); jobOperator.restart(jobExecutionId); } } } catch (Exception e) { LOGGER.error(e.getMessage(), e); } }
问题是-如何在应用程序重启后如何继续运行旧的未完成的执行而又不创建新的执行?
TL; DR:Spring Batch将始终创建新的作业执行,并且不会重复使用先前失败的作业执行来继续执行。
更长的答案:首先,您需要了解Spring Batch中的三个相似但不同的概念:作业,作业实例,作业执行
我总是用这个例子:
在较高级别,这就是Spring Batch的恢复工作方式:
假设您的第一次执行在步骤3中失败,则可以提交具有相同参数(2018-01-01)的相同作业(日末批)。Spring Batch将尝试查找提交的 作业实例* (2018-01-01的日间批处理)的最新 作业执行 (2018-01-01的日间批处理,执行#1 ),并找到它之前在步骤3中失败。然后,Spring Batch将创建一个新的执行, [2018年1月1日的日末批次,执行#2] ,然后从步骤3开始执行。 * __
因此,根据设计,Spring尝试恢复的是先前失败的 作业实例 (而不是作业执行)。当您重新运行先前失败的执行时,Spring批处理将不会重复使用执行。