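I am using a Spring Batch Tasklet with OpenCSV to read my CSV files. Before anyone asks: I know about chunk-oriented processing, but the files are cross-validated against each other in a later step, so I have to keep using a Tasklet.
What I am trying to do is report a missing file or a parsing error to my reporting step. I am not sure what the correct way is to report a failure to the next step. I have the following code.
The first step, which reads the file: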
    public class CsvBatchReader<T> implements Tasklet, StepExecutionListener {

        private final Logger logger = LoggerFactory.getLogger(CsvBatchReader.class);

        private List batch;

        private final Class<T> clazz;

        private Path path;

        public CsvBatchReader(Class<T> clazz, Path path) {
            this.clazz = clazz;
            this.path = path;
        }

        @Override
        public void beforeStep(StepExecution stepExecution) {
            logger.info("Reader initialized - " + clazz.getSimpleName());
            batch = new ArrayList();
        }

        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            logger.info("Reader ended - " + clazz.getSimpleName());
            return ExitStatus.COMPLETED;
        }

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
                throws UnexpectedJobExecutionException {
            logger.info("Reader execute - " + clazz.getSimpleName());

            ICsvToBean csvToBean = new CsvToBean(clazz, path);

            try {
                batch = csvToBean.readCsv();
            } catch(IOException ex) {
                // error message being caught from my csvToBean class.
                throw new UnexpectedJobExecutionException("Invalid file " + ex.getMessage());
            }

            return RepeatStatus.FINISHED;
        }
    }
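The reporting step:
I am not sure how to pass the exception message along, or whether there is a defined way to pass the failure message without using the step execution context.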
    public class CsvBatchReporting implements Tasklet, StepExecutionListener {

        private final Logger logger = LoggerFactory.getLogger(CsvBatchCrossValidation.class);

        private List errorMessages;

        private List skippedInserts;

        @Override
        public void beforeStep(StepExecution stepExecution) {
            logger.info("Reporting initialized");

            ExecutionContext executionContext = stepExecution
                    .getJobExecution()
                    .getExecutionContext();

            System.out.println("description " + stepExecution.getStatus());
        }

        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            logger.info("Reporting ended");
            return ExitStatus.COMPLETED;
        }

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
                throws Exception {
            logger.info("Reporting execute");

            //Email Error

            return RepeatStatus.FINISHED;
        }
    }
The job configuration:
    @Bean
    public Job primaryCareJob(@Qualifier("reportingStep") Step reportingStep,
            @Qualifier("crossValidationStep") Step crossValidationStep) {

        logger.info("Start PrimaryCare Job");

        return jobs.get("primaryCareJob")
                .start(readPrimaryCareStep()).on("FAILED").to(reportingStep)
                .from(readPrimaryCareStep()).on("*").to(readPrimaryCareDetailStep())
                .from(readPrimaryCareDetailStep()).on("FAILED").to(reportingStep)
                .from(readPrimaryCareDetailStep()).on("*").to(processPrimaryCareStep())
                .from(processPrimaryCareStep()).on("INVALID").to(reportingStep)
                .from(processPrimaryCareStep()).on("*").to(processPrimaryCareDetailStep())
                .from(processPrimaryCareDetailStep()).on("INVALID").to(reportingStep)
                //Other steps
                .from(reportingStep).on("*").end()
                .from(reportingStep).on("*").fail()
                .build()
                .build();
    }
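I started changing my job flow to match on FAILED instead of a custom INVALID status, so that a thrown exception routes to the reporting step automatically. The other steps, where I still use INVALID, set that exit status in afterStep with the following code: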
    if(!errorMessages.isEmpty()) {
        chunkContext.getStepContext().getStepExecution().setExitStatus(new ExitStatus("INVALID"));
    }
How do I get the CSV exception message from the reader passed into my reporting step so that I can send it as an email?
You can access the exception thrown in a previous step through the job execution. Here is an example:
    import java.util.List;

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    @EnableBatchProcessing
    public class MyJob {

        @Autowired
        private JobBuilderFactory jobs;

        @Autowired
        private StepBuilderFactory steps;

        @Bean
        public Step step1() {
            return steps.get("step1")
                    .tasklet((contribution, chunkContext) -> {
                        System.out.println("hello");
                        throw new Exception("Boom!");
                    })
                    .build();
        }

        @Bean
        public Step step2() {
            return steps.get("step2")
                    .tasklet((contribution, chunkContext) -> {
                        JobExecution jobExecution = chunkContext.getStepContext().getStepExecution().getJobExecution();
                        StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next(); // TODO properly get the stepExecution of the previous step
                        List<Throwable> failureExceptions = stepExecution.getFailureExceptions();
                        if (!failureExceptions.isEmpty()) {
                            Throwable throwable = failureExceptions.get(0);
                            System.out.println("Looks like step1 has thrown an exception: " + throwable.getMessage());
                        }
                        System.out.println("world");
                        return RepeatStatus.FINISHED;
                    })
                    .build();
        }

        @Bean
        public Job job() {
            return jobs.get("job")
                    .flow(step1())
                    .on("*").to(step2())
                    .build()
                    .build();
        }

        public static void main(String[] args) throws Exception {
            ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }

    }
This sample prints:
    hello
    Looks like step1 has thrown an exception: Boom!
    world
Obviously, you need to make sure that step 1 flows to step 2 in all cases (hence the flow definition).
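Since the question asks about sending the message by email, here is a minimal sketch of how the reporting step might turn the list returned by getFailureExceptions() into a plain-text body. It uses only the JDK. The class name FailureReport, the method name format, and the sample file name in main are hypothetical and not part of Spring Batch or of the code above.

```java
import java.io.IOException;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper (not part of Spring Batch): builds a plain-text
// email body from exceptions collected from earlier steps.
final class FailureReport {

    private FailureReport() {
    }

    static String format(List<? extends Throwable> failures) {
        if (failures.isEmpty()) {
            return "No failures reported.";
        }
        StringJoiner body = new StringJoiner(System.lineSeparator());
        body.add(failures.size() + " failure(s) reported:");
        for (Throwable failure : failures) {
            // Fall back to the exception type when no message was supplied.
            String message = failure.getMessage() != null
                    ? failure.getMessage()
                    : failure.getClass().getName();
            body.add("- " + message);
        }
        return body.toString();
    }

    public static void main(String[] args) {
        // Sample data only; in the reporting tasklet the list would come
        // from stepExecution.getFailureExceptions().
        List<Throwable> failures = List.of(
                new IOException("Invalid file primary_care.csv"),
                new IllegalStateException());
        System.out.println(format(failures));
    }
}
```

In the reporting tasklet, the result of FailureReport.format(failureExceptions) could then be handed to whatever mail-sending code you already use.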
Hope this helps.