我有一个Web服务(Python 3.7,Flask 1.0.2),其工作流程包括3个步骤:
远程计算作业的长度是任意的(介于秒和天之间),并且每个步骤都取决于上一个步骤的完成:
with Connection(redis.from_url(current_app.config['REDIS_URL'])): q = Queue() job1 = q.enqueue(step1) job2 = q.enqueue(step2, depends_on=job1) job3 = q.enqueue(step3, depends_on=job2)
但是,最终所有工作人员(4个工作人员)将进行轮询(4个客户请求中的第2步),而他们应继续执行其他传入请求的第1步以及已成功通过第2步的那些工作流的第3步。
每次民意调查后都应释放工人。他们应该定期返回步骤2进行下一次轮询(每个作业最多每61秒一次),如果远程计算作业轮询未返回“ DONE”,则重新排队该轮询作业。
在这一点上,我开始使用rq-scheduler(因为间隔和重新排队功能听起来很有希望):
rq-scheduler
with Connection(redis.from_url(current_app.config['REDIS_URL'])): q = Queue() s = Scheduler('default') job1 = q.enqueue(step1, REQ_ID) job2 = Job.create(step2, (REQ_ID,), depends_on=job1) job2.meta['interval'] = 61 job2.origin = 'default' job2.save() s.enqueue_job(job2) job3 = q.enqueue(step3, REQ_ID, depends_on=job2)
Job2已正确创建(包括与depends_onjob1 的关系,但s.enqueue_job()立即执行它,而忽略了其与job1的关系。(q.enqueue_job()函数doc- string实际上说它是立即执行的…) 。
depends_on
depends_on 当将job2放在调度程序中而不是队列中时,如何创建job1,job2和job3之间的关系?(或者,如何在不立即执行job2并等待job1完成的情况下将job2交给调度程序?)
为了进行测试,步骤如下所示:
def step1(): print(f'*** --> [{datetime.utcnow()}] JOB [ 1 ] STARTED...', flush=True) time.sleep(20) print(f' <-- [{datetime.utcnow()}] JOB [ 1 ] FINISHED', flush=True) return True def step2(): print(f' --> [{datetime.utcnow()}] POLL JOB [ 2 ] STARTED...', flush=True) time.sleep(10) print(f' <-- [{datetime.utcnow()}] POLL JOB [ 2 ] FINISHED', flush=True) return True def step3(): print(f' --> [{datetime.utcnow()}] JOB [ 3 ] STARTED...', flush=True) time.sleep(10) print(f'*** <-- [{datetime.utcnow()}] JOB [ 3 ] FINISHED', flush=True) return True
我收到的输出是这样的:
worker_1 | 14:44:57 default: project.server.main.tasks.step1(1) (d40256a2-904f-4ce3-98da-6e49b5d370c9) worker_2 | 14:44:57 default: project.server.main.tasks.step2(1) (3736909c-f05d-4160-9a76-01bb1b18db58) worker_2 | --> [2019-11-04 14:44:57.341133] POLL JOB [ 2 ] STARTED... worker_1 | *** --> [2019-11-04 14:44:57.342142] JOB [ 1 ] STARTED... ...
job2不等待job1完成…
#requirements.txt Flask==1.0.2 Flask-Bootstrap==3.3.7.1 Flask-Testing==0.7.1 Flask-WTF==0.14.2 redis==3.3.11 rq==0.13 rq_scheduler==0.9.1
我对此问题的解决方案rq仅使用了(并且不再使用rq_scheduler):
rq
rq_scheduler
升级到最新的python-rq软件包:
# requirements.txt
… rq==1.1.0
为轮询作业创建专用队列,并相应地使作业入队(具有depends_on关系):
with Connection(redis.from_url(current_app.config['REDIS_URL'])): q = Queue('default') p = Queue('pqueue') job1 = q.enqueue(step1) job2 = p.enqueue(step2, depends_on=job1) # step2 enqueued in polling queue job3 = q.enqueue(step3, depends_on=job2)
派遣专职工作人员进行轮询队列。它继承自标准Worker类:
Worker
class PWorker(rq.worker.Worker): def execute_job(self, *args, **kwargs): seconds_between_polls = 65 job = args[0] if 'lastpoll' in job.meta: job_timedelta = (datetime.utcnow() - job.meta["lastpoll"]).total_seconds() if job_timedelta < seconds_between_polls: sleep_period = seconds_between_polls - job_timedelta time.sleep(sleep_period) job.meta['lastpoll'] = datetime.utcnow() job.save_meta() super().execute_job(*args, **kwargs)
PWorker execute_job通过向作业的元数据添加时间戳来扩展该方法'lastpoll'。
execute_job
'lastpoll'
如果有lastpoll时间戳记的轮询作业进入,工作人员将检查此后的时间间隔lastpoll是否大于65秒。如果是,它将当前时间写入 'lastpoll'并执行轮询。如果没有,它将一直hibernate直到65s结束,然后将当前时间写入'lastpoll'并执行轮询。没有lastpoll时间戳的进来的作业是第一次轮询,而工作人员创建时间戳并执行轮询。
lastpoll
# exceptions.py class PACError(Exception): pass class PACJobRun(PACError): pass class PACJobExit(PACError): pass # exception_handlers.py def poll_exc_handler(job, exc_type, exc_value, traceback): if exc_type is PACJobRun: requeue_job(job.get_id(), connection=job.connection) return False # no further exception handling else: return True # further exception handling # tasks.py def step2(): # GET request to remote compute job portal API for status # if response == "RUN": raise PACJobRun return True
当定制异常处理程序捕获到定制异常(这意味着远程计算作业仍在运行)时,它将在轮询队列中重新排队该作业。
# manage.py @cli.command('run_pworker') def run_pworker(): redis_url = app.config['REDIS_URL'] redis_connection = redis.from_url(redis_url) with rq.connections.Connection(redis_connection): pworker = PWorker(app.config['PQUEUE'], exception_handlers=[poll_exc_handler]) pworker.work()
该解决方案的优点在于,它仅用几行额外的代码即可扩展python-rq的标准功能。另一方面,额外的队列和工作程序增加了复杂性……