我正在使用Spring Batch和Partition进行并行处理。面向数据库的Hibernate和Spring Data Jpa。对于分区步骤,读取器,处理器和写入器都具有步进镜,因此我可以向其注入分区键和范围(从-到)。现在在处理器中,我有一个同步方法,希望该方法一次可以运行一次,但事实并非如此。
我将其设置为具有10个分区,所有10个Item读取器均读取了正确的分区范围。问题来自项目处理器。打击代码与我使用的逻辑相同。
public class accountProcessor implementes ItemProcessor{ @override public Custom process(item) { createAccount(item); return item; } //account has unique constraints username, gender, and email /* When 1 thread execute that method, it will create 1 account and save it. If next thread comes in and try to save the same account, it should find the account created by first thread and do one update. But now it doesn't happen, instead findIfExist return null and it try to do another insert of duplicate data */ private synchronized void createAccount(item) { Account account = accountRepo.findIfExist(item.getUsername(), item.getGender(), item.getEmail()); if(account == null) { //account doesn't exist account = new Account(); account.setUsername(item.getUsername()); account.setGender(item.getGender()); account.setEmail(item.getEmail()); account.setMoney(10000); } else { account.setMoney(account.getMoney()-10); } accountRepo.save(account); } }
预期的输出是,在任何给定时间只有1个线程将运行此方法,因此db中将没有重复的插入,并且避免了DataintegrityViolationexception。
实际结果是第二个线程找不到第一个帐户并尝试创建重复的帐户并保存到db,这将导致DataintegrityViolationexception,唯一约束错误。
由于我已同步该方法,因此线程应按顺序执行它,第二个线程应等待第一个线程完成然后运行,这意味着它应该能够找到第一个帐户。
我尝试了许多方法,例如使用volatile集来包含所有唯一帐户,使用savelocal进行saveAndFlush来尽快提交提交,但这些方法均无效。
需要一些帮助。
由于您使商品处理器成为跨步域的,因此您实际上并不需要同步,因为每个步骤都有自己的处理器实例。
但是看起来您遇到的是设计问题,而不是实现问题。您正在尝试使线程同步以在并行设置中以特定顺序执行。当您决定进行并行处理并将数据划分为多个分区并为每个工作线程(本地或远程)分配一个要处理的分区时,您必须承认这些分区将以未定义的顺序进行处理,并且记录之间应该没有关系每个分区之间的距离,或每个工人完成的工作之间的距离。
当1个线程执行该方法时,它将创建1个帐户并保存。如果下一个线程进入并尝试保存相同的帐户,则它应找到第一个线程创建的帐户并进行一次更新。但是现在它没有发生,而是findIfExist返回null并尝试再次插入重复数据
这是因为线程1的事务可能尚未提交,因此线程2将找不到您认为已被线程1插入的记录。
您似乎正在尝试使用分区设置来创建或更新某些帐户。我不确定此设置是否适合当前的问题。
附带说明一下,我不会accountRepo.save(account);在项目处理器中调用而是在项目编写器中进行调用。
accountRepo.save(account);
希望这可以帮助。