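I'm having trouble using Spring Batch together with Spring Batch Admin in a Spring Boot application.
Because I'm using Spring Batch Admin, I had to disable @EnableBatchProcessing and manually configure the two builders it provides (JobBuilderFactory and StepBuilderFactory).
Now I want to implement a simple scenario: pass a value from the job parameters to an ItemReader using @StepScope and property injection.
I followed the example here (that example uses neither Spring Batch Admin nor Spring Boot):
Using @StepScope explained
I get this exception: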
2015-01-06 18:22:09.852 ERROR 10260 --- [nio-8080-exec-1] o.s.batch.core.step.AbstractStep : Exception while closing step execution resources in step step2 in job MyTestJob
java.lang.ClassCastException: com.sun.proxy.$Proxy74 cannot be cast to org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader$$FastClassBySpringCGLIB$$ebb633d0.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:717)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:133)
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:121)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:653)
    at org.springframework.batch.item.file.FlatFileItemReader$$EnhancerBySpringCGLIB$$e99a9c6b.close(<generated>)
    at org.springframework.batch.item.support.CompositeItemStream.close(CompositeItemStream.java:85)
    at org.springframework.batch.core.step.tasklet.TaskletStep.close(TaskletStep.java:305)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:267)
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:386)
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135)
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:304)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128)
    at com.mycompany.notification.processor.service.controller.BatchJobController.job1(BatchJobController.java:38)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:137)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:110)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandleMethod(RequestMappingHandlerAdapter.java:777)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:706)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:943)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:877)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:966)
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:857)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:618)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:842)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:725)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:291)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:88)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
    at org.springframework.boot.actuate.autoconfigure.EndpointWebMvcAutoConfiguration$ApplicationContextHeaderFilter.doFilterInternal(EndpointWebMvcAutoConfiguration.java:288)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:77)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
    at org.springframework.batch.admin.web.filter.ParameterUnpackerFilter.doFilterInternal(ParameterUnpackerFilter.java:99)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
    at org.springframework.boot.actuate.trace.WebRequestTraceFilter.doFilterInternal(WebRequestTraceFilter.java:100)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
    at org.springframework.boot.actuate.autoconfigure.MetricFilterAutoConfiguration$MetricsFilter.doFilterInternal(MetricFilterAutoConfiguration.java:90)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:219)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:501)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:142)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:537)
    at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1085)
    at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:658)
    at org.apache.coyote.http11.Http11NioProtocol$Http11ConnectionHandler.process(Http11NioProtocol.java:222)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1556)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1513)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:745)
2015-01-06 18:22:09.866 INFO 10260 --- [nio-8080-exec-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=MyTestJob]] completed with the following parameters: [{pathToFile=snids.csv, date=1420561328719}] and the following status: [FAILED]
This is how I launch the job from a controller:

    @RequestMapping(value = "/job1")
    public @ResponseBody StatusResponse job1() {
        try {
            JobParameters jobParameters = new JobParametersBuilder()
                    .addString("pathToFile", "snids.csv")
                    .addDate("date", new Date())
                    .toJobParameters();
            jobLauncher.run(job1, jobParameters);
            return new StatusResponse(true);
        } catch (JobInstanceAlreadyCompleteException ex) {
            return new StatusResponse(false, "This job has been completed already!");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
Application.class:

    @ComponentScan({"com.mycompany.notification.processor.service"})
    @EnableAutoConfiguration
    @Configuration
    @ImportResource({ "classpath:integration-context.xml", "classpath:launch-context.xml" })
    @PropertySource("file:///d://etc/mycompany/services/pushExecuterService/pushExecuterServices.properties")
    //@Import({ ServletConfiguration.class, WebappConfiguration.class })
    public class Application {

        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
            System.out.printf("hello man");
        }
    }
And the BatchConfiguration class:

    @Configuration
    public class BatchConfiguration {

        private static final Logger logger = LoggerFactory.getLogger(BatchConfiguration.class);

        @Inject
        private WriteToKafkaTasklet writeToKafkaTasklet;

        @Inject
        private StepBuilderFactory steps;

        @Autowired
        private DownloadFileTasklet downloadFileTasklet;

        @Inject
        private JobBuilderFactory jobs;

        @Inject
        private StepBuilderFactory stepBuilderFactory;

        @Bean
        public Job job() throws Exception {
            return this.jobs.get("MyTestJob")
                    .start(downloadFileFromETStep())
                    .next(processFileStep())
                    .next(pushToKafkaStep())
                    .build();
        }

        private Step pushToKafkaStep() {
            return this.steps.get("step3").tasklet(writeToKafkaTasklet).build();
        }

        private Step downloadFileFromETStep() {
            return this.steps.get("step1").tasklet(downloadFileTasklet).build();
        }

        private static final String OVERRIDDEN_BY_EXPRESSION = null;

        @Bean
        public Step processFileStep() {
            return stepBuilderFactory.get("step2")
                    .<PushItemDTO, PushItemDTO>chunk(1) //important to be one in this case to commit after every line read
                    .reader(reader(OVERRIDDEN_BY_EXPRESSION))
                    // .processor(processor())
                    .writer(writer())
                    // .listener(logProcessListener())
                    // .faultTolerant()
                    // .skipLimit(10) //default is set to 0
                    // .skip(MySQLIntegrityConstraintViolationException.class)
                    .build();
        }

        @Bean
        public ItemWriter writer() {
            return new ItemWriter() {
                @Override
                public void write(List items) throws Exception {
                    logger.debug("hello");
                }
            };
        }

        @Bean
        @Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
        public FlatFileItemReader<PushItemDTO> reader(@Value("#{jobParameters[pathToFile]}") String pathToFile) {
            FlatFileItemReader<PushItemDTO> itemReader = new FlatFileItemReader<PushItemDTO>();
            itemReader.setLineMapper(lineMapper());
            itemReader.setResource(new ClassPathResource(pathToFile));
            return itemReader;
        }

        @Bean
        public LineMapper<PushItemDTO> lineMapper() {
            DefaultLineMapper<PushItemDTO> lineMapper = new DefaultLineMapper<PushItemDTO>();
            DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
            // lineTokenizer.setDelimiter(",");
            lineTokenizer.setStrict(false);
            lineTokenizer.setNames(new String[]{"SNID"});
            BeanWrapperFieldSetMapper<PushItemDTO> fieldSetMapper = new BeanWrapperFieldSetMapper<PushItemDTO>();
            fieldSetMapper.setTargetType(PushItemDTO.class);
            lineMapper.setLineTokenizer(lineTokenizer);
            lineMapper.setFieldSetMapper(new PushItemFieldSetMapper());
            return lineMapper;
        }
    }
Servlet configuration:

    @Configuration
    @ImportResource("classpath:/org/springframework/batch/admin/web/resources/servlet-config.xml")
    public class ServletConfiguration {
    }
Webapp configuration:

    @Configuration
    @ImportResource("classpath:/org/springframework/batch/admin/web/resources/webapp-config.xml")
    public class WebappConfiguration {
    }
Thanks.
I edited my code and, instead of @StepScope, added
@Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
as suggested.
Now I get a different exception:
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'batchJobController': Injection of autowired dependencies failed; nested exception is org.springframework.beans.factory.BeanCreationException: Could not autowire field: private org.springframework.batch.core.Job com.mycompany.notification.processor.service.controller.BatchJobController.job1; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'job' defined in class path resource [com/mycompany/notification/processor/service/batch/conf/flow/BatchConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.core.Job]: Factory method 'job' threw exception; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'processFileStep' defined in class path resource [com/mycompany/notification/processor/service/batch/conf/flow/BatchConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.core.Step]: Factory method 'processFileStep' threw exception; nested exception is java.lang.ClassCastException: com.sun.proxy.$Proxy59 cannot be cast to org.springframework.batch.item.file.FlatFileItemReader
    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessPropertyValues(AutowiredAnnotationBeanPostProcessor.java:334)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1202)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:537)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:476)
    at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:302)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:298)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:193)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:762)
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:757)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:480)
    at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:109)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:691)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:321)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:961)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:950)
    at com.mycompany.notification.processor.service.main.Application.main(Application.java:20)
Caused by: org.springframework.beans.factory.BeanCreationException: Could not autowire field: private org.springframework.batch.core.Job com.mycompany.notification.processor.service.controller.BatchJobController.job1; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'job' defined in class path resource [com/mycompany/notification/processor/service/batch/conf/flow/BatchConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.core.Job]: Factory method 'job' threw exception; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'processFileStep' defined in class path resource [com/mycompany/notification/processor/service/batch/conf/flow/BatchConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.core.Step]: Factory method 'processFileStep' threw exception; nested exception is java.lang.ClassCastException: com.sun.proxy.$Proxy59 cannot be cast to org.springframework.batch.item.file.FlatFileItemReader
    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.inject(AutowiredAnnotationBeanPostProcessor.java:558)
    at org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java:87)
    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessPropertyValues(AutowiredAnnotationBeanPostProcessor.java:331)
    ... 16 common frames omitted
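Spring Batch Admin configures the step scope via XML, which uses JDK dynamic proxies (interface-based) as the proxying mechanism. @StepScope, however, uses dynamic subclasses (CGLIB). To make this work, instead of using the @StepScope shortcut, use the following:
@Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)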
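The difference between the two proxy styles can be seen without Spring at all. The sketch below uses the JDK's own java.lang.reflect.Proxy; the names ProxyCastDemo, Reader and FileReader are hypothetical stand-ins (not Spring Batch types), and it only illustrates the mechanism, not how Spring's step scope is implemented. An interface-based proxy implements the interfaces it was given but is not a subclass of the target, so a cast to the concrete class fails with a ClassCastException.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class ProxyCastDemo {

    /** Hypothetical stand-in for an interface such as ItemStreamReader. */
    public interface Reader {
        String read();
    }

    /** Hypothetical stand-in for a concrete class such as FlatFileItemReader. */
    public static class FileReader implements Reader {
        @Override
        public String read() {
            return "line";
        }
    }

    /** Wraps the target in a JDK dynamic proxy that implements only Reader. */
    public static Object interfaceProxy(Reader target) {
        // Forward every call to the real object.
        InvocationHandler handler = (proxy, method, args) -> method.invoke(target, args);
        return Proxy.newProxyInstance(
                Reader.class.getClassLoader(),
                new Class<?>[] {Reader.class},
                handler);
    }

    /** Returns true if the object can be cast to the concrete FileReader class. */
    public static boolean castsToConcrete(Object candidate) {
        try {
            FileReader ignored = (FileReader) candidate;
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Object proxy = interfaceProxy(new FileReader());
        System.out.println(((Reader) proxy).read());           // prints "line"
        System.out.println(castsToConcrete(proxy));            // prints "false"
        System.out.println(castsToConcrete(new FileReader())); // prints "true"
    }
}
```

The proxy works fine through the Reader interface; only code that insists on the concrete type breaks.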
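In addition to the update above, you need to return the interface (not the implementation) so that it can be proxied correctly. In this case, that means returning ItemStreamReader instead of FlatFileItemReader.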
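Put together, the reader bean might look roughly like the configuration fragment below. This is a sketch under assumptions, not a tested drop-in: ReaderConfiguration is a hypothetical class name, PushItemDTO is the asker's own type (assumed to be in the same package), and the LineMapper is taken as a method parameter here only to keep the fragment short. It needs Spring Batch on the classpath.

```java
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.context.annotation.ScopedProxyMode;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class ReaderConfiguration { // hypothetical name, for illustration only

    // Interface-based scoped proxy in place of the @StepScope shortcut.
    @Bean
    @Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
    public ItemStreamReader<PushItemDTO> reader(
            @Value("#{jobParameters[pathToFile]}") String pathToFile,
            LineMapper<PushItemDTO> lineMapper) {
        // The concrete type is only used inside the method; callers see the interface.
        FlatFileItemReader<PushItemDTO> itemReader = new FlatFileItemReader<PushItemDTO>();
        itemReader.setLineMapper(lineMapper);
        itemReader.setResource(new ClassPathResource(pathToFile));
        return itemReader;
    }
}
```

In the question's BatchConfiguration the equivalent change is just the return type of reader(...). The step builder's reader(...) method accepts any ItemReader, so processFileStep() should not need to change.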