(下篇)一个“制作午餐”的故事,帮助你理解并发和并行
导读
这是一个关于“午餐时间”的小故事,用于阐述 threading、asyncio、multiprocessing、cloud functions 等内容。为了方便阅读并理解文章的内容,全文分上、下两篇,上篇中主要讲解了并发,本文会讲解并行,并最后对并发和并行的4个方案做一个总结。
我们直接进入主题,并行的实践。
实践并行
在 Parallel Salads 有多个工人并行制作沙拉,我们将使用 multiprocessing 来实现它 然后我们将访问 Cloud Coffees,了解如何使用云函数来并行运行任务。
使用 multiprocessing 实现 Parallel Salads
Parallel Salads 完美地证明了这一点。
Parallel Salads 中的每个工作人员都由操作系统生成的新进程表示。这些进程是通过 ProcessPoolExecutor
创建的,它为每个进程分配任务。
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
# Note: Some methods and variables are skipped to
# focus only on the multiprocessing details
def run_parallel_salads():
# Create multiprocessing queues that can
# communicate across process boundaries
customers = mp.Queue()
bowls = mp.Queue()
dirty_bowls = mp.Queue()
# Run the tasks in parallel using a process pool executor
with ProcessPoolExecutor(max_workers=NUM_STAFF) as executor:
# Set all but one worker making salads
for _ in range(NUM_STAFF - 1):
executor.submit(make_salad, customers, bowls, dirty_bowls)
# Set the other worker washing bowls
executor.submit(wash_bowls, dirty_bowls, bowls)
def make_salad(customers, bowls):
while True:
customer = customers.get()
order = take_order(customer)
bowl = bowls.get()
bowl.add(ingredients)
bowl.add(dressing)
bowl.mix()
salad = fill_container(bowl)
customer.serve(salad)
dirty_bowls.put(bowl)
def wash_bowls(dirty_bowls, bowls):
while True:
bowl = dirty_bowls.get()
wash(bowl)
bowls.put(bowl)
使用 multiprocessing,每个任务都在单独的进程中运行。这些进程由操作系统独立并行运行,不会相互阻塞。实际可以并行运行的进程数量受 CPU 上的内核数量限制,因此我们将限制实际制作沙拉的员工数量。
由于这些任务在不同的进程中,它们不共享任何正常的 Python 状态。每个进程都有整个程序状态的独立副本。我们必须使用特殊的 multiprocessing
队列在它们之间进行通信。
另一种方式 -- 同时使用 asyncio 和 multiprocessing
multiprocessing 的一个用例是在 asyncio 应用程序中卸载 CPU 密集型任务,以阻止它们阻塞应用程序的其余部分。这是如何做到这一点的简短实现:
import asyncio
from concurrent.futures import ProcessPoolExecutor
process_pool = ProcessPoolExecutor() # Default size is number of cores
async def handle_long_request(n):
event_loop = asyncio.get_running_loop()
# calculate_n_pi will be run in a separate process allowing the asyncio event
# loop to continue to handle other async tasks in parallel
return await event_loop.run_in_executor(process_pool_executor, calculate_n_pi, n)
def calculate_n_pi(n):
threading.sleep(60)
return n * 3.14
Multiprocessing 的优点
加速 CPU 密集型任务 线程安全 可用于在 Web 服务器中使用单独的进程,进行长时间计算
Multiprocessing 的缺点
不共享资源 高开销 -- 不要用于 I/O 密集型任务
使用云函数的 Cloud Coffees
当您和您的朋友步行到公园吃午餐时,您会发现一朵蓬松的五彩云盘旋在一群人的上方。你仔细观察,看到 Cloud Coffees 的标志。
即使你的朋友讨厌咖啡,你们俩还是决定喝一杯来取乐。当你走上去的时候,你每个人都会面对自己的独立摊位,里面有一个咖啡师,慢慢地从云端飘下来。您下订单,咖啡师会为您制作咖啡并为您服务。
一大群人突然来到 Cloud Coffees ,稍等片刻后,更多的摊位飘了下来,他们很快就上桌了。这些额外的咖啡师等了一会儿,等待更多的顾客,完全没有注意到其他摊位,然后飘回云端。
当您退后一步时,您会看到任何时候都有与下订单的客户数量相同的摊位数量。如果更多的顾客到达,更多的摊位会从云中出现,当订单完成后稍等片刻,这些摊位就会消失在云端。
你的朋友要了一份复杂得离谱的订单,试图掩盖咖啡的味道,但他仍然没有喝到。咖啡师正在添加棉花糖和巧克力片,突然咖啡师毫不客气地将整杯咖啡扔进垃圾桶,并对他大喊“超时”。
你们俩都歇斯底里地走出公园。
如果您正在编写 Web 服务,cloud functions 是另一个值得考虑的选项。到目前为止,这是最容易编写的,因为一次只能完成一个订单,您可以完全忘记并发性。
def cloud_coffees(order):
ground_coffee = grind_beans()
coffee = brew_coffee(ground_coffee)
coffee.add_embellishments(order)
return coffee
每个请求都由整个应用程序的一个单独实例来满足。当创建一个新实例时,它会在启动时产生一点延迟。出于这个原因,一个实例可能会一直等待更多的请求,这些请求几乎没有延迟。一段时间后,如果没有请求,它将被回收。
每个请求都会在几分钟后超时,具体取决于实现。您必须确保您的任务在此超时之前完成,否则它们将在未完成的情况下消失。
实例不能与其他实例通信,并且不应该在请求之间存储任何状态,因为实例可能随时消失。最常见的实现是 AWS Lambda、Azure Functions 和 Google Cloud Functions。
Cloud function 的优点
极其简单的模型 比运行持久的服务器便宜 毫不费力地缩放
Cloud function 的缺点
启动新实例时可能会出现延迟问题 请求有超时限制 对 Python 版本的控制较少 - 您只能使用云提供商提供的版本
您应该使用哪个并发选项?
让我们将已讨论过的所有内容汇总到一张表中:
threading | asyncio | multiprocessing | cloud functions | |
---|---|---|---|---|
并发类型 | 先发制人式的多任务处理 | 合作式地多任务处理 | 多进程 | 多实例 |
并发 or 并行 | 并发 | 并发 | 并行 | 并行 |
是否可直接进行并发控制 | False | True | False | False |
切换方式 | 由操作系统决定 | 由程序自身决定 | 在cpu 多核中运行,不存在切换 | 请求同时在多台实例中运行,不存在切换 |
最大并行数 | 1 | 1 | cpu 核数 | 无限制 |
任务间通信方式 | 状态共享 | 状态共享 | 使用多进程队列通信 | 不存在通信 |
是否线程安全 | False | True | True | True |
适用类型 | I/O 密集型 | I/O 密集型 | CPU 密集型 | CPU 密集型(如果未超时的话) |
任务开销 | 每个任务所在的线程消耗内存并增加任务之间的切换时间 | 所有的任务都尽可能少地在单个进程和单个线程中运行 | 每个任务的系统进程本身比线程消耗更多的内存和切换时间 | 启动新实例可能会带来延迟成本 |
既然您了解了所有选项,那么选择一个就变得很容易了。
不过,在执行此操作之前,您应该仔细检查您是否确实需要加快您的任务。如果它每周运行一次并且需要 10 分钟,那么加快它是否有任何意义?
如果是,那么只需参考此流程图:
总结
现在,我们已经看到了 Python 中可用的核心并发选项的示例:
threading asyncio multiprocessing
以及为并行 Python 提供简化环境的部署选项:
cloud functions
我们也了解了它们之间的区别、各自的优缺点以及何时选择每个选项。
好了,这就是用制作午餐的故事讲解并发和并行的全部内容。如果大家觉得本文内容有帮助,请点赞转发支持一下。
参考原文:https://sourcery.ai/blog/concurrency/
- EOF -
加主页君微信,不仅Python技能+1
主页君日常还会在个人微信分享Python相关工具、资源和精选技术文章,不定期分享一些有意思的活动、岗位内推以及如何用技术做业余项目
加个微信,打开一扇窗
觉得本文对你有帮助?请分享给更多人
推荐关注「Python开发者」,提升Python技能
点赞和在看就是最大的支持❤️