RaySystem Vol.006:异步任务队列
在上一篇中,通过异步 REPL 解决了命令行交互的阻塞问题。现在大部分时间里程序的事件循环都处于空闲状态。在本文中,我们基于 asyncio 的 Queue 设计一个异步任务队列模块,能够在后台执行任务。未来,个人系统会有资讯模块、AI 模块,会 7x24h 在后台执行各种有意思的任务,都由今天的简单任务队列作为支撑。
❝感兴趣的小伙伴,欢迎关注本号,同时 star 支持这个项目:github.com/maxiee/RaySystem
❞
task_queue 模块
创建新模块 module/task_queue/task_queue.py
。首先创建核心数据结构:
import asyncio
from typing import Callable, Dict, List
# 全局队列和注册表
TASK_QUEUE = asyncio.Queue()
TASK_CALLBACK_REGISTRY: Dict[str, List[Callable]] = {}
TASK_CALLBACK_REGISTRY
是一个简单的字典,Key 表示任务名称,即是哪种类型的任务。每种任务支持注册一个或多个回调,即能够支持广播。
下面我们重点看 asyncio.Queue
。
asyncio.Queue
asyncio.Queue 是 Python 异步编程框架 asyncio 中的一个重要工具,提供了用于多任务间通信的线程安全的队列。它允许异步任务以生产者-消费者模式高效地共享数据。
特点:
「异步安全」: asyncio.Queue
是协程安全的,这意味着多个协程可以安全地并发操作同一个队列。「FIFO 结构」: asyncio.Queue
是一个先进先出的队列。「容量限制」:
默认情况下,队列是无限容量的。 可以通过 maxsize
参数设置容量上限。如果队列已满,调用 put
的协程将被阻塞,直到队列有空余位置。如果队列为空,调用 get
的协程将被阻塞,直到队列中有数据可用。
注册回调与解绑回调
在 RaySystem 中,各个莫开可以向异步任务队列注册对某些事件的响应回调。
围绕 TASK_CALLBACK_REGISTRY
的常规操作方法:
def task_queue_register_callback(task_name: str, callback: Callable):
"""
Register a callback function for a task
"""
if task_name not in TASK_CALLBACK_REGISTRY:
TASK_CALLBACK_REGISTRY[task_name] = []
if callback not in TASK_CALLBACK_REGISTRY[task_name]:
TASK_CALLBACK_REGISTRY[task_name].append(callback)
def task_queue_unregister_callback(task_name: str, callback: Callable):
"""
Unregister a callback function for a task
"""
if task_name in TASK_CALLBACK_REGISTRY:
if callback in TASK_CALLBACK_REGISTRY[task_name]:
TASK_CALLBACK_REGISTRY[task_name].remove(callback)
if not TASK_CALLBACK_REGISTRY[task_name]:
del TASK_CALLBACK_REGISTRY[task_name]
提交任务
上一节中只是注册了回调,并没有真正发送事件。通过以下方法向队列中提交任务,在其内部访问 asyncio.Queue
:
async def task_queue_submit_task(task_name: str, *args, **kwargs):
"""
Submit a task to the task queue
"""
await TASK_QUEUE.put((task_name, args, kwargs))
Worker
上一节中,对 asyncio.Queue
进行 put 操作,那哪里进行 get 操作呢?负责 get 操作的是 worker,它是一种特殊的死循环:
async def task_queue_worker():
"""
Task queue worker
"""
while True:
# 等待获取任务
task_name, args, kwargs = await TASK_QUEUE.get()
# 从注册表中找到对应的回调列表
callbacks = TASK_CALLBACK_REGISTRY.get(task_name, [])
if not callbacks:
print(f"[警告] 无回调处理任务: {task_name}")
# 执行所有回调
for cb in callbacks:
try:
result = cb(*args, **kwargs)
# 根据需要处理回调结果,这里只是打印
print(
f"[回调执行] 任务: {task_name}, 回调: {cb.__name__}, 返回: {result}"
)
except Exception as e:
print(f"[错误] 执行回调出错: {cb.__name__}, 错误信息: {e}")
TASK_QUEUE.task_done()
什么!程序里有死循环!!不要害怕,在异步编程里面,worker 会在 await TASK_QUEUE.get()
的时候阻塞等待,也就是说,如果任务队列里没有任务的时候,worker 这个协程是处于休息状态的。
模块自省
任务队列作为 RaySystem 中的一个模块,支持自省是非常重要的。所谓自省,就是能时刻了解到当前模块内的状态。我们这里的实现也很简单,进行简单的 print 打印即可:
def task_queue_print_status():
"""
Print task queue status
"""
# 打印当前队列情况
print("=== 当前队列状态 ===")
print(f"队列长度: {TASK_QUEUE.qsize()}")
# 打印已注册的任务与回调对应情况
print("=== 注册表映射 ===")
if not TASK_CALLBACK_REGISTRY:
print("无已注册的任务类型与回调")
else:
for tname, cbs in TASK_CALLBACK_REGISTRY.items():
print(f"任务类型: {tname}")
for cb in cbs:
print(f" 回调函数: {cb.__name__}")
未来,这个自省能力将于异步 REPL 打通,方便我查看。
代码示例
整个模块的使用方法如下:
首先先生命两个事件处理方法:
# 示例回调函数
def callback_print(*args, **kwargs):
msg = f"打印参数: args={args}, kwargs={kwargs}"
return msg
def callback_sum(*args, **kwargs):
return f"参数求和结果: {sum(args)}"
然后按照下面方法使用,向队列中提交了三个任务:
async def init_task_queue():
# 注册回调
task_queue_register_callback("print_task", callback_print)
task_queue_register_callback("sum_task", callback_sum)
# 启动worker
worker_task = asyncio.create_task(task_queue_worker())
# 提交一些任务
await task_queue_submit_task("print_task", 1, 2, 3, key="value")
await task_queue_submit_task("sum_task", 10, 20, 30)
await task_queue_submit_task("unknown_task", "无法处理的任务")
# 自省状态
task_queue_print_status()
# 等待队列处理完成
await TASK_QUEUE.join()
worker_task.cancel()
代码运行效果:
(raysystem) [maxiee@archlinux RaySystem]$ python main.py
=== 当前队列状态 ===
队列长度: 3
=== 注册表映射 ===
任务类型: print_task
回调函数: callback_print
任务类型: sum_task
回调函数: callback_sum
[回调执行] 任务: print_task, 回调: callback_print, 返回: 打印参数: args=(1, 2, 3), kwargs={'key': 'value'}
[回调执行] 任务: sum_task, 回调: callback_sum, 返回: 参数求和结果: 60
[警告] 无回调处理任务: unknown_task
Input> exit
其中有几个细节值得关注:
submit 之后,任务不会被立刻执行,而是先将任务放到队列上 因此首先执行的是 task_queue_print_status
,此时能够看到,队列长度为3然后在 await TASK_QUEUE.join
中,会等待队列任务完成因此我们看到,三个任务的执行日志(其中有一个我们故意提交的不存在的任务)
总结
RaySystem 走到目前位为止,我们已经是一个异步化的现代程序了,我们有了异步的 REPL,也有了异步任务队列。至此,这个程序已经能够满足持续运行的要求。
尽管现在 RaySystem 还没有上层的业务功能,但我们设计的精美底层能力,将为上层业务提供有力的支撑。
我并不急于填充上层业务,我希望每一步都扎扎实实,都足够牢固,这样,等到实现业务的时候,代码质量能够有所保障。
AI 点评
以下是我对文章核心观点的批判性思考以及深层见解:
「核心优点」
「异步架构的设计合理性」
文章展示了通过 asyncio.Queue
实现任务队列的能力,这种设计能有效利用异步编程的特点,通过非阻塞的队列和事件循环减少性能开销,适合构建高并发的后台任务处理系统。回调注册机制( TASK_CALLBACK_REGISTRY
)的引入,使得系统具有扩展性,可以灵活添加新任务和对应的处理逻辑。
「模块化与自省性」
把任务队列封装成独立模块是一个明智的设计选择,这不仅提高了代码的可读性和可维护性,还为未来扩展或迁移提供了良好的基础。 自省能力(状态打印)是非常实用的调试工具,尤其在一个异步任务驱动的系统中,实时了解队列和回调注册表的状态有助于开发和运维。
「逐步构建系统的哲学」
作者强调扎实构建底层模块的思路是可取的。通过先建立稳定的底层能力(如异步 REPL 和任务队列),再逐步填充业务逻辑,可以减少技术债务,确保代码质量。
「潜在问题与改进建议」
「任务队列的扩展性与分布式能力不足」
「问题」:目前的任务队列基于单节点运行,虽然适合小型个人系统,但当任务规模增大时,单个 asyncio.Queue
的吞吐能力可能不足,缺乏分布式任务处理能力。「改进建议」:可以引入类似 Celery
或Redis
的分布式队列,实现跨节点的任务分发与执行。
「任务优先级的支持」
「问题」:当前设计没有任务优先级的概念,所有任务都是以 FIFO 顺序处理。如果某些任务需要更高优先级,会导致性能瓶颈。 「改进建议」:可以在 asyncio.Queue
基础上,设计一个基于优先级的队列(如PriorityQueue
),以支持不同任务的重要性等级。
「回调机制的潜在问题」
对回调函数的注册进行类型检查或签名验证,确保注册的回调符合预期。 引入更细化的异常管理机制,对可能的回调执行失败做出明确的补救措施(如重新入队、记录日志等)。
「问题 1」:回调函数注册和解绑时缺乏约束,可能导致注册表中的回调重复或被误删。 「问题 2」:回调函数的异常处理较为基础,仅打印错误信息,没有细化管理逻辑(如回退操作)。 「改进建议」:
「任务提交与状态管理的耦合性」
「问题」:当前任务提交仅通过 TASK_QUEUE.put
实现,缺乏对任务状态的追踪(如是否被执行、执行耗时、失败次数)。「改进建议」:可以在任务提交时生成唯一任务 ID,将任务的元信息(如状态、耗时)存储在独立的状态表中,实现任务生命周期管理。
「队列容量限制的设计考虑」
「问题」:目前的 TASK_QUEUE
容量是无限制的,可能导致任务过多堆积,耗尽系统资源。「改进建议」:根据系统运行环境和任务类型设置合理的队列上限,并为超出容量的任务提供降级或拒绝策略。
「自省能力的改进方向」
「问题」:当前的自省能力基于简单的打印日志,无法直观反映复杂的系统状态。 「改进建议」:设计一个异步 Web 控制台或仪表盘,用于实时监控任务队列和回调状态,提升可操作性。
「深层次的思考:任务队列的通用化与复用性」
作者提到该模块作为未来系统的核心支撑模块,但目前的实现与具体业务需求绑定不够紧密,缺乏通用性设计的体现。
「如何实现更高的复用性?」
「抽象任务定义」:设计一个通用的任务基类,使每种任务可以继承该基类,实现独立的序列化和回调逻辑。
class Task:
def __init__(self, name: str, *args, **kwargs):
self.name = name
self.args = args
self.kwargs = kwargs
def execute(self):
raise NotImplementedError「插件化架构」:将任务注册和回调处理机制设计为插件化架构,允许开发者动态加载任务类型和对应的回调处理器。
「持久化与断点续传」:对长时间运行的任务队列,增加持久化能力(如将任务队列和状态存储到数据库中),以支持系统重启后任务的断点续传。
「结论」
文章的设计思路清晰,展示了异步任务队列的基本实现方式,但在扩展性、鲁棒性和通用性方面仍有优化空间。进一步完善任务管理和系统状态监控,可以让该模块从“个人工具”提升为“通用任务调度框架”。