这是我的用例:
所以基本上我使用的是一个非常方便的功能insert_or_update_many:
但这引入了并发问题。例如:如果在第1步中不存在对象,则将其添加到要插入的对象列表中。但是在此期间,另一个Celery任务已经创建了该对象,当它尝试执行批量插入时(步骤3),我会收到重复Entry的错误。
我想我需要将3个步骤包装在一个“阻止”块中。我已经阅读了有关事务的内容,并且尝试将步骤1,2,3包装在一个with transaction.commit_on_success:块中
with transaction.commit_on_success:
with transaction.commit_on_success(): cursor.execute(sql, parameters) existing = set(cursor.fetchall()) if not skip_update: # Find the objects that need to be updated update_objects = [o for (o, k) in object_keys if k in existing] _update_many(model, update_objects, keys=keys, using=using) # Find the objects that need to be inserted. insert_objects = [o for (o, k) in object_keys if k not in existing] # Filter out any duplicates in the insertion filtered_objects = _filter_objects(con, insert_objects, key_fields) _insert_many(model, filtered_objects, using=using)
但这对我不起作用。我不确定我是否对交易有充分的了解。我基本上需要一个块,可以在其中放置几个操作,以确保没有其他进程或线程正在访问(写入)我的数据库资源。
我基本上需要一个块,可以在其中放置几个操作,以确保没有其他进程或线程正在访问(写入)我的数据库资源。
Django交易通常不会为您保证。如果您来自计算机科学的其他领域,那么您自然会以这种方式将事务视为阻塞,但是在数据库世界中,锁的级别不同,隔离级别也不同,并且每个数据库的锁也不同。因此,为了确保您的事务能够做到这一点,您将必须了解事务,锁及其性能特征,以及数据库提供的用于控制它们的机制。
但是,让一堆进程都试图锁定表以执行竞争性插入听起来不是一个好主意。如果很少发生冲突,则可以进行某种形式的乐观锁定,如果失败则重试该事务。或者,您可以将所有这些celery任务定向到一个进程中(如果无论如何都要获取表锁,则并行执行此操作没有性能优势)。
我的建议是从忘记批量操作开始,而仅使用Django的一次执行一次update_or_create。只要您的数据库具有防止重复条目的约束(听起来确实如此),它就不会出现上述竞争条件。如果性能确实不可接受,那么可以考虑使用更复杂的选项。
update_or_create
采用开放式并发方法意味着,“带走”不是像平常那样进行冲突,而是要获取表锁,而是应该照常进行,然后在发现有问题的情况下重试该操作。在您的情况下,它可能看起来像:
while True: try: with transaction.atomic(): # do your bulk insert / update operation except IntegrityError: pass else: break
因此,如果遇到竞争状况,结果IntegrityError将导致该transaction.atomic()块回滚所做的任何更改,并且while循环将强制重试该事务(假定大容量操作现在将看到新存在的行,并且将其标记为要更新而不是插入)。
IntegrityError
transaction.atomic()
while
如果冲突很少发生,则这种方法非常有效,如果冲突频繁发生,则效果很差。