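I am using Spring JPA and HTTP POST requests: I fetch the data row by row and post each row to an API. This works fine, but I am dealing with a large volume of data, so I need multithreading. I am new to Java and Spring. How can I make this work with 10 threads, each reading 1k rows at a time in parallel?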
I have read about multithreading with 10 threads, each reading 1k rows at a time. My database holds about 10 million records.
AccessingDataJpaApplication class:
    @SpringBootApplication
    public class AccessingDataJpaApplication implements CommandLineRunner {

        private static final Logger logger = LoggerFactory.getLogger(AccessingDataJpaApplication.class);

        @Autowired
        private Bulk_repositoryRepository bulk_repositoryRepository;

        public static void main(String[] args) {
            SpringApplication.run(AccessingDataJpaApplication.class);
        }

        Date currentDate = new Date();

        @Override
        public void run(String... args) throws Exception {
            RestTemplate restTemplate = new RestTemplate();
            HttpHeaders headers = new HttpHeaders();
            headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON));
            headers.setBasicAuth("user", "pass");

            while (true) {
                Date currentDate = new Date();
                logger.info("Just Started");
                for (Bulk_repository churnss : bulk_repositoryRepository
                        .findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(0, 2, currentDate, currentDate)) {
                    System.out.print(churnss);
                    logger.info(churnss.toString());
                    AddOfferRequest AddOffer = new AddOfferRequest("113", churnss.getMsisdn(), churnss.getParam1());
                    logger.info(AddOffer.toString());
                    HttpEntity<AddOfferRequest> entity = new HttpEntity<AddOfferRequest>(AddOffer, headers);
                    ResponseEntity<String> responseEntity = restTemplate.exchange(
                            "api link", HttpMethod.POST, entity, String.class);
                    if (responseEntity.getStatusCode() == HttpStatus.OK) {
                        String response = responseEntity.getBody();
                        churnss.setStatus(1);
                        churnss.setProcessDate(new Date());
                        churnss.setFulfilment_status(response);
                        logger.info(churnss.toString() + ", Response: " + response);
                        bulk_repositoryRepository.save(churnss);
                    } else {
                        logger.warn("Record Id: " + churnss.getId() + ", Http Failed Response: " + responseEntity.getStatusCode());
                    }
                }
                Thread.sleep(1000);
            }
        }
    }
Bulk_repository class:
    @Entity
    @Table(name = "BULK_REPOSITORY")
    public class Bulk_repository {

        @Id
        @GeneratedValue(strategy = GenerationType.AUTO)
        @Column(name = "id")
        private long id;

        @Column(name = "msisdn")
        private String msisdn;

        @Column(name = "camp_start_date")
        private Date campStartDate;

        @Column(name = "camp_end_date")
        private Date campEndDate;

        @Column(name = "camp_type")
        private int campType;

        @Column(name = "camp_cd")
        private String camp_cd;

        @Column(name = "status")
        private int status;

        @Column(name = "process_date")
        private Date processDate;

        @Column(name = "entry_date")
        private Date entryDate;

        @Column(name = "entry_user")
        private String entry_user;

        @Column(name = "param1")
        private String param1;

        @Column(name = "param2")
        private String param2;

        @Column(name = "param3")
        private String param3;

        @Column(name = "param4")
        private String param4;

        @Column(name = "param5")
        private String param5;

        @Column(name = "error_desc")
        private String error_desc;

        @Column(name = "fulfilment_status")
        private int fulfilment_status;

        // ... then getters, setters, and toString
    }
Bulk_repositoryRepository class:
    public interface Bulk_repositoryRepository extends CrudRepository<Bulk_repository, Long> {

        Date today = new Date();

        List<Bulk_repository> findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(
                int status, int campType, Date today0, Date today1);

        Bulk_repository findById(long id);
    }
AddOfferRequest class:
    public class AddOfferRequest {

        private String ChannelID = "113";
        private String MSISDN;
        private String ServiceID;

        public AddOfferRequest() {
        }

        public AddOfferRequest(String channelID, String mSISDN, String serviceID) {
            this.MSISDN = mSISDN;
            this.ServiceID = serviceID;
        }

        // ... then getters, setters, and toString
    }
I created an AsyncConfiguration class:
    package com.example.accessingdatajpa;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

    import java.util.concurrent.Executor;

    @Configuration
    @EnableAsync
    public class AsyncConfiguration {

        private static final Logger LOGGER = LoggerFactory.getLogger(AsyncConfiguration.class);

        @Bean(name = "taskExecutor")
        public Executor taskExecutor() {
            LOGGER.debug("Creating Async Task Executor");
            final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(2);
            executor.setMaxPoolSize(2);
            executor.setQueueCapacity(1000);
            executor.setThreadNamePrefix("CarThread-");
            executor.initialize();
            return executor;
        }
    }
But I still do not understand how to combine the findBy query and the HTTP POST with multithreading.
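Rewrite your code. Instead of a List<Bulk_repository>, have the repository method return a Stream<Bulk_repository>. This loads the records from the database lazily instead of trying to load everything at once.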
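To illustrate what lazy loading buys you, here is a minimal plain-Java sketch with no Spring involved. `fetchRows` is a hypothetical stand-in for a repository method that returns a Stream, and the counter only records how many simulated rows were actually produced. A real JPA stream fetches in driver-dependent batches, so treat this as an analogy rather than a description of what Hibernate does.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazyStreamDemo {

    // Counts how many simulated rows were actually produced by the source.
    static final AtomicInteger fetched = new AtomicInteger();

    // Hypothetical stand-in for a repository method returning a Stream of rows.
    static Stream<Integer> fetchRows(int totalRows) {
        return Stream.iterate(1, id -> id + 1)
                .limit(totalRows)
                .peek(id -> fetched.incrementAndGet());
    }

    // Consumes only the first `wanted` rows and reports how many rows were produced.
    static int consumeFirst(int totalRows, int wanted) {
        fetched.set(0);
        // try-with-resources closes the stream, as a database-backed stream requires.
        try (Stream<Integer> rows = fetchRows(totalRows)) {
            rows.limit(wanted).forEach(id -> { /* process one row here */ });
        }
        return fetched.get();
    }

    public static void main(String[] args) {
        System.out.println("rows produced: " + consumeFirst(10_000_000, 1_000));
    }
}
```

Only the rows that are consumed get produced; the remaining ones are never materialized, which is the property you want with 10 million records.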
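Then use a TaskExecutor to run the individual requests on separate threads. You just hand it a task, and the task is executed as soon as a thread is free.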
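One thing to watch with millions of records: the executor in the question has a queue capacity of 1000, and with the default rejection policy a submission made while all threads are busy and the queue is full is rejected with an exception. The sketch below uses plain `java.util.concurrent` (not the Spring wrapper) to show one possible remedy, `CallerRunsPolicy`, which makes the submitting thread run the task itself and so slows the producer down. The class and method names are made up for illustration. Spring's `ThreadPoolTaskExecutor` accepts the same handler through `setRejectedExecutionHandler`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolDemo {

    // Submits `tasks` small jobs to a pool of `threads` workers with a bounded queue
    // and returns how many of them completed.
    static int runAll(int tasks, int threads, int queueCapacity) {
        AtomicInteger done = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                // When the queue is full, the submitting thread runs the task itself.
                new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < tasks; i++) {
            pool.execute(done::incrementAndGet); // stand-in for one HTTP POST
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runAll(50_000, 10, 1_000));
    }
}
```

With this policy no task is dropped even though far more tasks are submitted than the queue can hold.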
    @SpringBootApplication
    public class AccessingDataJpaApplication implements CommandLineRunner {

        private static final Logger logger = LoggerFactory.getLogger(AccessingDataJpaApplication.class);

        @Autowired
        private Bulk_repositoryRepository bulk_repositoryRepository;

        @Autowired
        private AsyncTaskExecutor executor;

        // Assumes a RestTemplate bean is defined somewhere in the configuration.
        @Autowired
        private RestTemplate rest;

        public static void main(String[] args) {
            SpringApplication.run(AccessingDataJpaApplication.class);
        }

        // A streaming query has to be consumed inside a transaction and closed afterwards.
        @Override
        @Transactional(readOnly = true)
        public void run(String... args) throws Exception {
            Date currentDate = new Date();
            try (Stream<Bulk_repository> results = bulk_repositoryRepository
                    .findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(0, 2, currentDate, currentDate)) {
                // One task per record; each task runs as soon as a pool thread is free.
                results.forEach(it -> executor.submit(() -> process(it)));
            }
            Thread.sleep(1000);
        }

        private void process(Bulk_repository churnss) {
            HttpHeaders headers = new HttpHeaders();
            headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON));
            headers.setBasicAuth("user", "pass");

            AddOfferRequest AddOffer = new AddOfferRequest("113", churnss.getMsisdn(), churnss.getParam1());
            HttpEntity<AddOfferRequest> entity = new HttpEntity<AddOfferRequest>(AddOffer, headers);

            try {
                ResponseEntity<String> responseEntity = rest.exchange(
                        "api link", HttpMethod.POST, entity, String.class);
                if (responseEntity.getStatusCode() == HttpStatus.OK) {
                    String response = responseEntity.getBody();
                    churnss.setStatus(1);
                    churnss.setProcessDate(new Date());
                    churnss.setFulfilment_status(response);
                    bulk_repositoryRepository.save(churnss);
                } else {
                    logger.warn("Record Id: {}, Http Failed Response: {}", churnss.getId(), responseEntity.getStatusCode());
                }
            } catch (RestClientException rce) {
                // The exception as last argument makes SLF4J log the stack trace.
                logger.warn("Record Id: {} Http Failed. ", churnss.getId(), rce);
            }
        }
    }
Note: I typed this off the top of my head and have not tested it, but it should give you some guidance.
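If you would rather follow the original idea literally (10 threads, each working on 1k rows at a time), the work can also be split into chunks instead of single records. The following is a rough sketch in plain Java under assumed names: `chunk`, `processChunk`, and `processAll` are hypothetical helpers, the list of ids stands in for rows read from the database, and `processChunk` is where the HTTP POST and the save for each row of a batch would go.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ChunkedWorkersDemo {

    // Splits ids into consecutive chunks of at most chunkSize elements.
    static List<List<Long>> chunk(List<Long> ids, int chunkSize) {
        List<List<Long>> chunks = new ArrayList<>();
        for (int from = 0; from < ids.size(); from += chunkSize) {
            chunks.add(ids.subList(from, Math.min(from + chunkSize, ids.size())));
        }
        return chunks;
    }

    // Hypothetical per-chunk work; returns the number of rows handled.
    static int processChunk(List<Long> chunk) {
        return chunk.size();
    }

    // Processes all chunks on a fixed pool and returns the total number of rows handled.
    static int processAll(List<Long> ids, int threads, int chunkSize) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Integer>> jobs = new ArrayList<>();
            for (List<Long> c : chunk(ids, chunkSize)) {
                jobs.add(() -> processChunk(c));
            }
            int total = 0;
            // invokeAll blocks until every chunk has been processed.
            for (Future<Integer> f : pool.invokeAll(jobs)) {
                total += f.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Long> ids = new ArrayList<>();
        for (long i = 0; i < 2_500; i++) {
            ids.add(i);
        }
        System.out.println("rows handled: " + processAll(ids, 10, 1_000));
    }
}
```

Chunking means fewer, larger tasks, so there is less scheduling overhead per record, but one slow chunk can hold a thread for longer than a single-record task would.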