查看原文
其他

(下篇)一个“制作午餐”的故事,帮助你理解并发和并行

阿旭 Python开发者 2022-09-07

导读

这是一个关于“午餐时间”的小故事,用于阐述 threading、asyncio、multiprocessing、cloud functions 等内容。为了方便阅读并理解文章的内容,全文分上、下两篇,上篇中主要讲解了并发,本文会讲解并行,并最后对并发和并行的4个方案做一个总结。

我们直接进入主题,并行的实践。

实践并行

在 Parallel Salads 有多个工人并行制作沙拉,我们将使用 multiprocessing 来实现它 然后我们将访问 Cloud Coffees,了解如何使用云函数来并行运行任务。

使用 multiprocessing 实现 Parallel Salads

Parallel Salads 完美地证明了这一点。

Parallel Salads 中的每个工作人员都由操作系统生成的新进程表示。这些进程是通过 ProcessPoolExecutor 创建的,它为每个进程分配任务。

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

# Note: Some methods and variables are skipped to
#       focus only on the multiprocessing details


def run_parallel_salads():
    # Create multiprocessing queues that can
    # communicate across process boundaries
    customers = mp.Queue()
    bowls = mp.Queue()
    dirty_bowls = mp.Queue()

    # Run the tasks in parallel using a process pool executor
    with ProcessPoolExecutor(max_workers=NUM_STAFF) as executor:
        # Set all but one worker making salads
        for _ in range(NUM_STAFF - 1):
            executor.submit(make_salad, customers, bowls, dirty_bowls)

        # Set the other worker washing bowls
        executor.submit(wash_bowls, dirty_bowls, bowls)


def make_salad(customers, bowls):
    while True:
        customer = customers.get()
        order = take_order(customer)
        bowl = bowls.get()
        bowl.add(ingredients)
        bowl.add(dressing)
        bowl.mix()
        salad = fill_container(bowl)
        customer.serve(salad)
        dirty_bowls.put(bowl)


def wash_bowls(dirty_bowls, bowls):
    while True:
        bowl = dirty_bowls.get()
        wash(bowl)
        bowls.put(bowl)

使用 multiprocessing,每个任务都在单独的进程中运行。这些进程由操作系统独立并行运行,不会相互阻塞。实际可以并行运行的进程数量受 CPU 上的内核数量限制,因此我们将限制实际制作沙拉的员工数量。

由于这些任务在不同的进程中,它们不共享任何正常的 Python 状态。每个进程都有整个程序状态的独立副本。我们必须使用特殊的 multiprocessing 队列在它们之间进行通信。

另一种方式 -- 同时使用 asyncio 和 multiprocessing

multiprocessing 的一个用例是在 asyncio 应用程序中卸载 CPU 密集型任务,以阻止它们阻塞应用程序的其余部分。这是如何做到这一点的简短实现:

import asyncio
from concurrent.futures import ProcessPoolExecutor

process_pool = ProcessPoolExecutor()  # Default size is number of cores


async def handle_long_request(n):
    event_loop = asyncio.get_running_loop()
    # calculate_n_pi will be run in a separate process allowing the asyncio event
    # loop to continue to handle other async tasks in parallel
    return await event_loop.run_in_executor(process_pool_executor, calculate_n_pi, n)


def calculate_n_pi(n):
    threading.sleep(60)
    return n * 3.14

Multiprocessing 的优点

  • 加速 CPU 密集型任务
  • 线程安全
  • 可用于在 Web 服务器中使用单独的进程,进行长时间计算

Multiprocessing 的缺点

  • 不共享资源
  • 高开销 -- 不要用于 I/O 密集型任务

使用云函数的 Cloud Coffees

当您和您的朋友步行到公园吃午餐时,您会发现一朵蓬松的五彩云盘旋在一群人的上方。你仔细观察,看到 Cloud Coffees 的标志。

即使你的朋友讨厌咖啡,你们俩还是决定喝一杯来取乐。当你走上去的时候,你每个人都会面对自己的独立摊位,里面有一个咖啡师,慢慢地从云端飘下来。您下订单,咖啡师会为您制作咖啡并为您服务。

一大群人突然来到 Cloud Coffees ,稍等片刻后,更多的摊位飘了下来,他们很快就上桌了。这些额外的咖啡师等了一会儿,等待更多的顾客,完全没有注意到其他摊位,然后飘回云端。

当您退后一步时,您会看到任何时候都有与下订单的客户数量相同的摊位数量。如果更多的顾客到达,更多的摊位会从云中出现,当订单完成后稍等片刻,这些摊位就会消失在云端。

你的朋友要了一份复杂得离谱的订单,试图掩盖咖啡的味道,但他仍然没有喝到。咖啡师正在添加棉花糖和巧克力片,突然咖啡师毫不客气地将整杯咖啡扔进垃圾桶,并对他大喊“超时”。

你们俩都歇斯底里地走出公园。

如果您正在编写 Web 服务,cloud functions 是另一个值得考虑的选项。到目前为止,这是最容易编写的,因为一次只能完成一个订单,您可以完全忘记并发性。

def cloud_coffees(order):
    ground_coffee = grind_beans()
    coffee = brew_coffee(ground_coffee)
    coffee.add_embellishments(order)
    return coffee

每个请求都由整个应用程序的一个单独实例来满足。当创建一个新实例时,它会在启动时产生一点延迟。出于这个原因,一个实例可能会一直等待更多的请求,这些请求几乎没有延迟。一段时间后,如果没有请求,它将被回收。

每个请求都会在几分钟后超时,具体取决于实现。您必须确保您的任务在此超时之前完成,否则它们将在未完成的情况下消失。

实例不能与其他实例通信,并且不应该在请求之间存储任何状态,因为实例可能随时消失。最常见的实现是 AWS Lambda、Azure Functions 和 Google Cloud Functions。

Cloud function 的优点

  • 极其简单的模型
  • 比运行持久的服务器便宜
  • 毫不费力地缩放

Cloud function 的缺点

  • 启动新实例时可能会出现延迟问题
  • 请求有超时限制
  • 对 Python 版本的控制较少 - 您只能使用云提供商提供的版本

您应该使用哪个并发选项?

让我们将已讨论过的所有内容汇总到一张表中:


threadingasynciomultiprocessingcloud functions
并发类型先发制人式的多任务处理合作式地多任务处理多进程多实例
并发 or 并行并发并发并行并行
是否可直接进行并发控制FalseTrueFalseFalse
切换方式由操作系统决定由程序自身决定在cpu 多核中运行,不存在切换请求同时在多台实例中运行,不存在切换
最大并行数11cpu 核数无限制
任务间通信方式状态共享状态共享使用多进程队列通信不存在通信
是否线程安全FalseTrueTrueTrue
适用类型I/O 密集型I/O 密集型CPU 密集型CPU 密集型(如果未超时的话)
任务开销每个任务所在的线程消耗内存并增加任务之间的切换时间所有的任务都尽可能少地在单个进程和单个线程中运行每个任务的系统进程本身比线程消耗更多的内存和切换时间启动新实例可能会带来延迟成本

既然您了解了所有选项,那么选择一个就变得很容易了。

不过,在执行此操作之前,您应该仔细检查您是否确实需要加快您的任务。如果它每周运行一次并且需要 10 分钟,那么加快它是否有任何意义?

如果是,那么只需参考此流程图:

总结

现在,我们已经看到了 Python 中可用的核心并发选项的示例:

  • threading
  • asyncio
  • multiprocessing

以及为并行 Python 提供简化环境的部署选项:

  • cloud functions

我们也了解了它们之间的区别、各自的优缺点以及何时选择每个选项。

好了,这就是用制作午餐的故事讲解并发和并行的全部内容。如果大家觉得本文内容有帮助,请点赞转发支持一下。

参考原文:https://sourcery.ai/blog/concurrency/


- EOF -


加主页君微信,不仅Python技能+1

主页君日常还会在个人微信分享Python相关工具资源精选技术文章,不定期分享一些有意思的活动岗位内推以及如何用技术做业余项目

加个微信,打开一扇窗



推荐阅读  点击标题可跳转

1、For-else:Python中一个奇怪但有用的特性

2、一行 Python 代码实现并行

3、一个“制作午餐”的故事,帮助你理解并发和并行(上)


觉得本文对你有帮助?请分享给更多人

推荐关注「Python开发者」,提升Python技能

点赞和在看就是最大的支持❤️

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存