查看原文
其他

RaySystem Vol.006:异步任务队列

maeiee Maeiee
2024-12-09

在上一篇中,通过异步 REPL 解决了命令行交互的阻塞问题。现在大部分时间里程序的事件循环都处于空闲状态。在本文中,我们基于 asyncio 的 Queue 设计一个异步任务队列模块,能够在后台执行任务。未来,个人系统会有资讯模块、AI 模块,会 7x24h 在后台执行各种有意思的任务,都由今天的简单任务队列作为支撑。

感兴趣的小伙伴,欢迎关注本号,同时 star 支持这个项目:github.com/maxiee/RaySystem


task_queue 模块

创建新模块 module/task_queue/task_queue.py。首先创建核心数据结构:

import asyncio
from typing import Callable, Dict, List

# 全局队列和注册表
TASK_QUEUE = asyncio.Queue()
TASK_CALLBACK_REGISTRY: Dict[str, List[Callable]] = {}

TASK_CALLBACK_REGISTRY 是一个简单的字典,Key 表示任务名称,即是哪种类型的任务。每种任务支持注册一个或多个回调,即能够支持广播。

下面我们重点看 asyncio.Queue


asyncio.Queue

asyncio.Queue 是 Python 异步编程框架 asyncio 中的一个重要工具,提供了用于多任务间通信的线程安全的队列。它允许异步任务以生产者-消费者模式高效地共享数据。

特点:

  1. 「异步安全」asyncio.Queue 是协程安全的,这意味着多个协程可以安全地并发操作同一个队列。
  2. 「FIFO 结构」asyncio.Queue 是一个先进先出的队列。
  3. 「容量限制」
  • 默认情况下,队列是无限容量的。
  • 可以通过 maxsize 参数设置容量上限。
  • 如果队列已满,调用 put 的协程将被阻塞,直到队列有空余位置。
  • 如果队列为空,调用 get 的协程将被阻塞,直到队列中有数据可用。

注册回调与解绑回调

在 RaySystem 中,各个莫开可以向异步任务队列注册对某些事件的响应回调。

围绕 TASK_CALLBACK_REGISTRY 的常规操作方法:

def task_queue_register_callback(task_name: str, callback: Callable):
    """
    Register a callback function for a task
    """

    if task_name not in TASK_CALLBACK_REGISTRY:
        TASK_CALLBACK_REGISTRY[task_name] = []
    if callback not in TASK_CALLBACK_REGISTRY[task_name]:
        TASK_CALLBACK_REGISTRY[task_name].append(callback)


def task_queue_unregister_callback(task_name: str, callback: Callable):
    """
    Unregister a callback function for a task
    """

    if task_name in TASK_CALLBACK_REGISTRY:
        if callback in TASK_CALLBACK_REGISTRY[task_name]:
            TASK_CALLBACK_REGISTRY[task_name].remove(callback)
            if not TASK_CALLBACK_REGISTRY[task_name]:
                del TASK_CALLBACK_REGISTRY[task_name]

提交任务

上一节中只是注册了回调,并没有真正发送事件。通过以下方法向队列中提交任务,在其内部访问 asyncio.Queue

async def task_queue_submit_task(task_name: str, *args, **kwargs):
    """
    Submit a task to the task queue
    """

    await TASK_QUEUE.put((task_name, args, kwargs))

Worker

上一节中,对 asyncio.Queue 进行 put 操作,那哪里进行 get 操作呢?负责 get 操作的是 worker,它是一种特殊的死循环:

async def task_queue_worker():
    """
    Task queue worker
    """

    while True:
        # 等待获取任务
        task_name, args, kwargs = await TASK_QUEUE.get()
        # 从注册表中找到对应的回调列表
        callbacks = TASK_CALLBACK_REGISTRY.get(task_name, [])
        if not callbacks:
            print(f"[警告] 无回调处理任务: {task_name}")

        # 执行所有回调
        for cb in callbacks:
            try:
                result = cb(*args, **kwargs)
                # 根据需要处理回调结果,这里只是打印
                print(
                    f"[回调执行] 任务: {task_name}, 回调: {cb.__name__}, 返回: {result}"
                )
            except Exception as e:
                print(f"[错误] 执行回调出错: {cb.__name__}, 错误信息: {e}")

        TASK_QUEUE.task_done()

什么!程序里有死循环!!不要害怕,在异步编程里面,worker 会在 await TASK_QUEUE.get() 的时候阻塞等待,也就是说,如果任务队列里没有任务的时候,worker 这个协程是处于休息状态的。


模块自省

任务队列作为 RaySystem 中的一个模块,支持自省是非常重要的。所谓自省,就是能时刻了解到当前模块内的状态。我们这里的实现也很简单,进行简单的 print 打印即可:

def task_queue_print_status():
    """
    Print task queue status
    """

    # 打印当前队列情况
    print("=== 当前队列状态 ===")
    print(f"队列长度: {TASK_QUEUE.qsize()}")
    # 打印已注册的任务与回调对应情况
    print("=== 注册表映射 ===")
    if not TASK_CALLBACK_REGISTRY:
        print("无已注册的任务类型与回调")
    else:
        for tname, cbs in TASK_CALLBACK_REGISTRY.items():
            print(f"任务类型: {tname}")
            for cb in cbs:
                print(f"  回调函数: {cb.__name__}")

未来,这个自省能力将于异步 REPL 打通,方便我查看。


代码示例

整个模块的使用方法如下:

首先先生命两个事件处理方法:

# 示例回调函数
def callback_print(*args, **kwargs):
    msg = f"打印参数: args={args}, kwargs={kwargs}"
    return msg


def callback_sum(*args, **kwargs):
    return f"参数求和结果: {sum(args)}"

然后按照下面方法使用,向队列中提交了三个任务:

async def init_task_queue():
    # 注册回调
    task_queue_register_callback("print_task", callback_print)
    task_queue_register_callback("sum_task", callback_sum)
    # 启动worker
    worker_task = asyncio.create_task(task_queue_worker())
    # 提交一些任务

    await task_queue_submit_task("print_task"123, key="value")
    await task_queue_submit_task("sum_task"102030)
    await task_queue_submit_task("unknown_task""无法处理的任务")

    # 自省状态
    task_queue_print_status()

    # 等待队列处理完成
    await TASK_QUEUE.join()
    worker_task.cancel()

代码运行效果:

(raysystem) [maxiee@archlinux RaySystem]$ python main.py 
=== 当前队列状态 ===
队列长度: 3
=== 注册表映射 ===
任务类型: print_task
  回调函数: callback_print
任务类型: sum_task
  回调函数: callback_sum
[回调执行] 任务: print_task, 回调: callback_print, 返回: 打印参数: args=(123), kwargs={'key''value'}
[回调执行] 任务: sum_task, 回调: callback_sum, 返回: 参数求和结果: 60
[警告] 无回调处理任务: unknown_task
Input> exit

其中有几个细节值得关注:

  • submit 之后,任务不会被立刻执行,而是先将任务放到队列上
  • 因此首先执行的是 task_queue_print_status,此时能够看到,队列长度为3
  • 然后在 await TASK_QUEUE.join 中,会等待队列任务完成
  • 因此我们看到,三个任务的执行日志(其中有一个我们故意提交的不存在的任务)

总结

RaySystem 走到目前位为止,我们已经是一个异步化的现代程序了,我们有了异步的 REPL,也有了异步任务队列。至此,这个程序已经能够满足持续运行的要求。

尽管现在 RaySystem 还没有上层的业务功能,但我们设计的精美底层能力,将为上层业务提供有力的支撑。

我并不急于填充上层业务,我希望每一步都扎扎实实,都足够牢固,这样,等到实现业务的时候,代码质量能够有所保障。


AI 点评

以下是我对文章核心观点的批判性思考以及深层见解:


「核心优点」

  1. 「异步架构的设计合理性」

  • 文章展示了通过 asyncio.Queue 实现任务队列的能力,这种设计能有效利用异步编程的特点,通过非阻塞的队列和事件循环减少性能开销,适合构建高并发的后台任务处理系统。
  • 回调注册机制(TASK_CALLBACK_REGISTRY)的引入,使得系统具有扩展性,可以灵活添加新任务和对应的处理逻辑。
  • 「模块化与自省性」

    • 把任务队列封装成独立模块是一个明智的设计选择,这不仅提高了代码的可读性和可维护性,还为未来扩展或迁移提供了良好的基础。
    • 自省能力(状态打印)是非常实用的调试工具,尤其在一个异步任务驱动的系统中,实时了解队列和回调注册表的状态有助于开发和运维。
  • 「逐步构建系统的哲学」

    • 作者强调扎实构建底层模块的思路是可取的。通过先建立稳定的底层能力(如异步 REPL 和任务队列),再逐步填充业务逻辑,可以减少技术债务,确保代码质量。

    「潜在问题与改进建议」

    1. 「任务队列的扩展性与分布式能力不足」

    • 「问题」:目前的任务队列基于单节点运行,虽然适合小型个人系统,但当任务规模增大时,单个 asyncio.Queue 的吞吐能力可能不足,缺乏分布式任务处理能力。
    • 「改进建议」:可以引入类似 CeleryRedis 的分布式队列,实现跨节点的任务分发与执行。
  • 「任务优先级的支持」

    • 「问题」:当前设计没有任务优先级的概念,所有任务都是以 FIFO 顺序处理。如果某些任务需要更高优先级,会导致性能瓶颈。
    • 「改进建议」:可以在 asyncio.Queue 基础上,设计一个基于优先级的队列(如 PriorityQueue),以支持不同任务的重要性等级。
  • 「回调机制的潜在问题」

    • 对回调函数的注册进行类型检查或签名验证,确保注册的回调符合预期。
    • 引入更细化的异常管理机制,对可能的回调执行失败做出明确的补救措施(如重新入队、记录日志等)。
    • 「问题 1」:回调函数注册和解绑时缺乏约束,可能导致注册表中的回调重复或被误删。
    • 「问题 2」:回调函数的异常处理较为基础,仅打印错误信息,没有细化管理逻辑(如回退操作)。
    • 「改进建议」
  • 「任务提交与状态管理的耦合性」

    • 「问题」:当前任务提交仅通过 TASK_QUEUE.put 实现,缺乏对任务状态的追踪(如是否被执行、执行耗时、失败次数)。
    • 「改进建议」:可以在任务提交时生成唯一任务 ID,将任务的元信息(如状态、耗时)存储在独立的状态表中,实现任务生命周期管理。
  • 「队列容量限制的设计考虑」

    • 「问题」:目前的 TASK_QUEUE 容量是无限制的,可能导致任务过多堆积,耗尽系统资源。
    • 「改进建议」:根据系统运行环境和任务类型设置合理的队列上限,并为超出容量的任务提供降级或拒绝策略。
  • 「自省能力的改进方向」

    • 「问题」:当前的自省能力基于简单的打印日志,无法直观反映复杂的系统状态。
    • 「改进建议」:设计一个异步 Web 控制台或仪表盘,用于实时监控任务队列和回调状态,提升可操作性。

    「深层次的思考:任务队列的通用化与复用性」

    作者提到该模块作为未来系统的核心支撑模块,但目前的实现与具体业务需求绑定不够紧密,缺乏通用性设计的体现。

    「如何实现更高的复用性?」

    1. 「抽象任务定义」:设计一个通用的任务基类,使每种任务可以继承该基类,实现独立的序列化和回调逻辑。

      class Task:
          def __init__(self, name: str, *args, **kwargs):
              self.name = name
              self.args = args
              self.kwargs = kwargs
          def execute(self):
              raise NotImplementedError
    2. 「插件化架构」:将任务注册和回调处理机制设计为插件化架构,允许开发者动态加载任务类型和对应的回调处理器。

    3. 「持久化与断点续传」:对长时间运行的任务队列,增加持久化能力(如将任务队列和状态存储到数据库中),以支持系统重启后任务的断点续传。


    「结论」

    文章的设计思路清晰,展示了异步任务队列的基本实现方式,但在扩展性、鲁棒性和通用性方面仍有优化空间。进一步完善任务管理和系统状态监控,可以让该模块从“个人工具”提升为“通用任务调度框架”。

    继续滑动看下一个
    Maeiee
    向上滑动看下一个

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存