本文探讨了在celery中处理动态创建子任务并确保其完成同步等待的挑战。针对celery链(chain)和弦(chord)无法在运行时动态添加依赖的局限性,文章提出并详细阐述了一种手动实现策略。该策略通过在父任务中收集动态子任务id,并使用循环轮询这些子任务的状态,直至所有子任务成功完成,从而实现精确的流程控制和数据完整性保障。
在构建复杂的分布式任务流时,我们经常遇到需要顺序执行一系列主任务,但在某个主任务内部,为了提高效率,又希望并行处理一些子任务的场景。例如,一个主任务可能需要通过API分批获取数据页,每获取一页数据后,就立即触发一个子任务来处理和写入数据库。由于数据库写入操作耗时较长且数量庞大,将其异步化为子任务可以显著减少主任务的整体墙钟时间。然而,关键挑战在于,下一个主任务必须等待所有这些动态创建的数据库写入子任务完成后才能继续执行,以确保数据完整性。
Celery提供了强大的任务编排工具,如chain、chord和group,用于定义任务之间的依赖关系和执行顺序。然而,这些工具的核心设计理念是基于预先定义的任务签名(signatures)。这意味着,在创建chain或chord时,所有参与的任务及其依赖关系都必须是已知的。
对于我们上述的场景,子任务是在主任务执行过程中动态生成的,其数量和具体签名无法在主任务启动前确定。在这种情况下,传统的Celery编排工具便显得力不从心:

简而言之,Celery的编排机制无法在任务被调度到Worker后,动态地修改其依赖关系或为其添加新的、运行时产生的子任务。任何阻塞等待逻辑都必须由任务本身显式地实现。
鉴于Celery原生编排工具的局限性,解决动态子任务同步等待问题的有效方法是手动实现一个轮询(polling)机制。这种策略的核心思想是:父任务在创建所有动态子任务后,收集这些子任务的ID,然后进入一个循环,周期性地检查每个子任务的状态,直到所有子任务都成功完成。
以下是实现这一策略的详细步骤和示例代码:
在父任务中,当需要创建子任务时,使用apply_async()方法调度它们,并务必将返回的AsyncResult对象的id属性收集到一个列表中。这个列表将用于后续的轮询。
import time
from typing import List
from celery import Celery, Task, AsyncResult
from celery.signals import task_postrun
# 假设的Celery应用实例
app = Celery('my_app', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
# 模拟的JobMaster和常量,用于日志记录
class JobMaster:
def get_job(self, job_id, job_title):
print(f"[JobMaster] Getting job {job_id} - {job_title}")
return self, job_id
def log_message(self, log_message, status=None, job_score=None):
print(f"[JobMaster] Log: {log_message} (Status: {status}, Score: {job_score})")
class Consts:
IN_PROGRESS = "IN_PROGRESS"
COMPLETED = "COMPLETED"
ERRORS_FOUND = "ERRORS_FOUND"
# 模拟的子任务
@app.task
def task_dummy_subtask(parent_task_name: str, job_id: int = None):
job, _ = JobMaster().get_job(job_id, job_title="dummy subtask")
job.log_message(log_message=f"Subtask {parent_task_name} started.")
time.sleep(2) # 模拟耗时操作
job.log_message(log_message=f"Subtask {parent_task_name} finished successfully.")
return f"Result from {parent_task_name}"
# 模拟的中间函数,用于创建子任务
def intermediary_dummy_subtask_function(parent_task_name: str, job_id: int = None) -> AsyncResult:
job, _ = JobMaster().get_job(job_id, job_title="dummy task")
job.log_message(log_message=f"Intermediary function for {parent_task_name} has been reached, will now make a task")
r = task_dummy_subtask.apply_async(kwargs={"parent_task_name": parent_task_name, "job_id": job_id},
add_to_parent=True)
return r
@app.task(bind=True)
def task_dummy_task1(self: Task, part_number: int, job_id: int = None):
job, job_id = JobMaster().get_job(job_id, job_title="dummy task")
sleeping_duration = 1
subtask_ids = []
job.log_message(log_message=f"Entered dummy task 1 with sleeping duration of {sleeping_duration}")
# 直接创建子任务
job.log_message(log_message="In dummy task1, creating subtask a")
subtask = task_dummy_subtask.apply_async(kwargs={"parent_task_name": "task1_a", "job_id": job_id},
add_to_parent=True)
subtask_ids.append(subtask.id)
job.log_message(log_message="In dummy task1, creating subtask b")
subtask = task_dummy_subtask.apply_async(kwargs={"parent_task_name": "task1_b", "job_id": job_id},
add_to_parent=True)
subtask_ids.append(subtask.id)
# 通过中间函数创建子任务
job.log_message(log_message="In dummy task1, creating intermediary subtask c")
subtask = intermediary_dummy_subtask_function(parent_task_name="task1_c", job_id=job_id)
subtask_ids.append(subtask.id)
time.sleep(sleeping_duration) # 模拟主任务的其他操作
# 等待所有子任务完成
wait_for_tasks_to_complete(async_ids=subtask_ids, job_id=job_id,
msg="Waiting in dummy task1 for subtasks to complete")
job.log_message(log_message="Finished dummy task1 main body")
return part_number创建一个辅助函数,如wait_for_tasks_to_complete,它接收子任务ID列表、日志ID和可选的超时时间。该函数将循环检查每个子任务的状态,直到所有子任务都完成或达到超时。
def wait_for_tasks_to_complete(async_ids: List[str], job_id: int = None, msg: str = None, timeout: int = 300):
job, _ = JobMaster().get_job(job_id, job_title="waiting for refresh data")
# 复制一份ID列表,因为在循环中会移除已完成的任务
remaining_async_ids = list(async_ids)
job.log_message(log_message=f"Waiting for {len(remaining_async_ids)} tasks to complete, {msg}", status=Consts.IN_PROGRESS, job_score=0)
job.log_message(log_message=f"tasks: {remaining_async_ids}", status=Consts.IN_PROGRESS, job_score=0)
count_down = timeout
while count_down > 0:
# 遍历剩余任务,检查其状态
tasks_to_check = list(remaining_async_ids) # 避免在迭代时修改列表
all_succeeded_in_this_check = True
for async_id in tasks_to_check:
result = app.AsyncResult(async_id) # 获取任务结果对象
status = result.status
if status == "SUCCESS":
returned_value = result.result
job.log_message(log_message=f"Task {async_id} confirmed status SUCCESS with {returned_value=}")
remaining_async_ids.remove(async_id) # 从待检查列表中移除
elif status in ["FAILURE", "REVOKED", "RETRY"]: # 考虑失败或撤销状态
job.log_message(log_message=f"Task {async_id} failed or revoked with status {status}. Aborting wait.", status=Consts.ERRORS_FOUND)
# 根据业务需求,可以选择在此处抛出异常或返回失败
return False
else:
all_succeeded_in_this_check = False # 仍有任务未完成或未成功
# 如果所有任务都已完成
if not remaining_async_ids:
job.log_message(log_message="Finished waiting for refresh data, all tasks succeeded",
status=Consts.COMPLETED, job_score=100)
return True # 所有任务成功完成
count_down -= 1
job.log_message(log_message=f"There are {len(remaining_async_ids)} tasks remaining. Waiting...", status=Consts.IN_PROGRESS)
time.sleep(1) # 避免忙等,每秒检查一次
# 超时退出
job.log_message(log_message=f"After waiting for {timeout=} seconds, some tasks did not complete on time. Remaining tasks: {remaining_async_ids}",
status=Consts.ERRORS_FOUND, job_score=100)
return False # 超时,未所有任务完成在父任务中,创建完所有动态子任务并进行其他必要操作后,调用上述wait_for_tasks_to_complete函数。父任务会在此处阻塞,直到所有子任务完成或超时。
# task_dummy_task1 的最后部分
# ...
time.sleep(sleeping_duration)
# 等待所有子任务完成
if wait_for_tasks_to_complete(async_ids=subtask_ids, job_id=job_id,
msg="Waiting in dummy task1 for subtasks to complete"):
job.log_message(log_message="Finished dummy task1 main body after all subtasks completed.")
else:
job.log_message(log_message="Dummy task1 finished with issues: subtasks did not complete on time or failed.", status=Consts.ERRORS_FOUND)
return part_number尽管Celery的内置编排工具在处理预定义任务流时表现出色,但面对运行时动态创建的子任务,它们存在固有的局限性。通过手动收集动态子任务ID并实现一个轮询等待循环,我们可以有效地解决这一挑战,确保父任务在所有相关子任务完成后才继续执行。这种手动策略虽然增加了代码的复杂性,但为需要精确流程控制和数据完整性的复杂异步任务场景提供了必要的灵活性和可靠性。在实施时,务必关注错误处理、超时管理和合理的轮询频率,以构建健壮的分布式系统。
# redis
# app
# 工具
# 后端
# ai
# 异步任务
# red
# elif
# rabbitmq
# 分布式
# 封装
# 循环
# 对象
# 异步
# 数据库
# 这一
# 会在
# 将其
# 可以选择
# 至关重要
# 移除
# 重试
# 过程中
# 是在
相关文章:
如何在Golang中实现微服务服务拆分_Golang微服务拆分与接口管理方法
怎么将XML数据可视化 D3.js加载XML
免费制作统计图的网站有哪些,如何看待现如今年轻人买房难的情况?
如何在IIS7中新建站点?详细步骤解析
怎么制作一个起泡网,水泡粪全漏粪育肥舍冬季氨气超过25ppm,可以有哪些措施降低舍内氨气水平?
建站之星导航如何优化提升用户体验?
如何用wdcp快速搭建高效网站?
建站之星CMS五站合一模板配置与SEO优化指南
如何使用Golang table-driven基准测试_多组数据测量函数效率
如何在云服务器上快速搭建个人网站?
建站之星安全性能如何?防护体系能否抵御黑客入侵?
如何选择适配移动端的WAP自助建站平台?
如何选择高效可靠的多用户建站源码资源?
如何在景安服务器上快速搭建个人网站?
高性价比服务器租赁——企业级配置与24小时运维服务
电商网站制作多少钱一个,电子商务公司的网站制作费用计入什么科目?
如何在橙子建站中快速调整背景颜色?
如何用免费手机建站系统零基础打造专业网站?
如何通过虚拟主机快速完成网站搭建?
网站制作员失业,怎样查看自己网站的注册者?
实现虚拟支付需哪些建站技术支撑?
建站之星后台密码如何安全设置与找回?
韩国网站服务器搭建指南:VPS选购、域名解析与DNS配置推荐
如何制作算命网站,怎么注册算命网站?
高端建站如何打造兼具美学与转化的品牌官网?
盐城做公司网站,江苏电子版退休证办理流程?
建站ABC备案流程中有哪些关键注意事项?
如何通过PHP快速构建高效问答网站功能?
如何快速查询网址的建站时间与历史轨迹?
音响网站制作视频教程,隆霸音响官方网站?
网站制作说明怎么写,简述网页设计的流程并说明原因?
制作网站公司那家好,网络公司是做什么的?
如何在Golang中引入测试模块_Golang测试包导入与使用实践
家具网站制作软件,家具厂怎么跑业务?
香港网站服务器数量如何影响SEO优化效果?
北京网站制作网页,网站升级改版需要多久?
建站主机与虚拟主机有何区别?如何选择最优方案?
c# Task.ConfigureAwait(true) 在什么场景下是必须的
如何在建站之星绑定自定义域名?
头像制作网站在线观看,除了站酷,还有哪些比较好的设计网站?
c# 在ASP.NET Core中管理和取消后台任务
如何挑选优质建站一级代理提升网站排名?
详解免费开源的DotNet二维码操作组件ThoughtWorks.QRCode(.NET组件介绍之四)
建站之星后台管理如何实现高效配置?
建站之星如何快速生成多端适配网站?
中山网站制作网页,中山新生登记系统登记流程?
建站主机类型有哪些?如何正确选型
建站之星如何配置系统实现高效建站?
TestNG的testng.xml配置文件怎么写
小米网站链接制作教程,请问miui新增网页链接调用服务有什么用啊?
*请认真填写需求信息,我们会在24小时内与您取得联系。