【技能篇】多进程队列间通信
本文作者:张雷豪,中南财经政法大学统计与数学学院
本文编辑:张孟晗
技术总编:王子一
Stata&Python云端课程来啦!
为了感谢大家长久以来的支持和信任,爬虫俱乐部为大家送福利啦!!!Stata&Python特惠课程双双上线腾讯课堂~爬虫俱乐部推出了Python编程培训课程、Stata基础课程、Stata进阶课程、Stata文本分析、正则表达式、网络爬虫、基本字符串课程。报名课程即可加入答疑群,对报名有任何疑问欢迎在公众号后台和腾讯课堂留言哦。我们在这篇推文的最后提供了每门课程的课程二维码,大家有需要的话可以直接扫描二维码查看课程详情并进行购买哦~
引言
在之前的两期推文《[技能篇]多线程爬虫》、《多线程队列间通信》中,我们向大家介绍了Python中的多线程编程方法和多线程间的队列通信。本期推文我们继续来讲解Python并发编程的另一种重要形式——多进程。
1.什么是进程
在了解进程之前,我们需要知道多任务的概念。多任务,顾名思义,就是指操作系统能够执行多个任务。例如,使用Windows或Linux操作系统可以同时看电影、聊天、查看网页等,此时,操作系统就是在执行多任务,而每一个任务就是一个进程。
我们可以打开Windows的任务管理器,查看一下操作系统正在执行的进程。下图中显示的进程不仅包括应用程序(如Word、Excel、Edge浏览器等),还包括系统后台进程。
2.multiprocess模块
Unix/Linux操作系统中提供了一个fork()系统调用,使用Python中os模块中的fork()方法即可开启多进程。由于Windows没有fork调用,Python提供了multiprocess模块来实现Windows中的多进程操作。multiprocess模块的使用与threading模块非常类似,可以进行类比使用。
3.Python中的多进程与多线程
在之前的推文中,我们讲解了Python中的全局解释器锁(GIL锁)机制,它使得在同一个进程中只有一个线程处于执行状态,因而Python中的多线程更适用于I/O密集型任务。对于CPU密集型任务,由于每一个进程都持有一个GIL锁,所以多进程可以实现真正意义上的并行执行。
下面写一个小程序测试多进程和多线程在CPU密集型任务中的效率差异:
首先定义一个CPU密集型计算函数:
def process():
for i in range(10000):
for j in range(10000):
n = i * j
然后分别定义多线程和多进程函数:
def multi_thread():
start_time = time()
# 创建子线程
t_list = [threading.Thread(target = process) for i in range(4)]
# 开启线程
for t in t_list:
t.start()
# 等待线程结束
for t in t_list:
t.join()
print("多线程运行时间为:", time()-start_time);
def multi_process():
start_time = time()
# 创建子进程
p_list = [Process(target = process) for i in range(4)]
# 开启进程
for p in p_list:
p.start()
# 等待进程结束
for p in p_list:
p.join()
print("多进程运行时间为:", time()-start_time)
多线程的运行时间反而比单线程还要慢,这是因为启动多线程也需要消耗一定的系统资源。相比之下,多进程任务处理就要快很多,但是多进程占用的系统资源也更多。
4.线程池和进程池
除了手动创建线程或进程,我们还可以利用Python的线程池或进程池实现多线程或多进程。这不仅使得代码更为简洁,还避免了频繁创建和销毁线程或进程带来的资源浪费。
Python中的concurrent.futures可以便捷地启动并行任务。创建线程池和进程池主要通过Executor类下的ThreadPoolExecutor和ProcessPoolExecutor两个子类实现,两者用法大同小异,这里我们以线程池为例。
用法一:使用map函数
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as pool:
results = pool.map(gen_data, urls)
for result in results:
print(result)
map函数的用法很简单,类似于Python中map的用法,第一个参数是自定义函数,第二个参数是传入自定义函数的参数(注意要以可迭代对象的形式传入),返回的结果会被存放在results中,并且其结果和map的入参顺序是相一致的。
用法二:使用future模式
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor() as pool:
futures = [pool.submit(gen_data, url) for url in urls]
for future in futures:
print(future.result())
for future in as_completed(futures):
print(future.result())
uture模式中要注意submit()函数的参数是单个出现的,与map方法不同。future模式还包含了一个as_completed()函数,当某个线程执行完毕后会立即输出结果,而不是按照顺序。
利用线程池技术,我们可以对豆瓣电影Top250项目进行改造,完整代码如下:
import requests
from lxml import etree
import pandas as pd
import time
from concurrent.futures import ThreadPoolExecutor
def get_data(url):
data = []
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4343.0 Safari/537.36',
'Referer': 'https://movie.douban.com/top250'}
res = requests.get(url,headers=headers)
html = res.text
xp = etree.HTML(html)
lis = xp.xpath('//*[@id="content"]/div/div[1]/ol/li')
for li in lis:
"""排名、标题、导演、演员、"""
ranks = li.xpath('div/div[1]/em/text()')
titles = li.xpath('div/div[2]/div[1]/a/span[1]/text()')
directors = li.xpath('div/div[2]/div[2]/p[1]/text()')[0].strip().replace("\xa0\xa0\xa0","\t").split("\t")
infos = li.xpath('div/div[2]/div[2]/p[1]/text()')[1].strip().replace('\xa0','').split('/')
dates,areas,genres = infos[0],infos[1],infos[2]
ratings = li.xpath('.//div[@class="star"]/span[2]/text()')[0]
scores = li.xpath('.//div[@class="star"]/span[4]/text()')[0][:-3]
quotes = li.xpath('.//p[@class="quote"]/span/text()')
for rank,title,director in zip(ranks,titles,directors):
if len(quotes) == 0:
quotes = None
else:
quotes = quotes[0]
data.append([rank,title,director,dates,areas,genres,ratings,scores,quotes])
return data
if __name__ == "__main__":
start_time = time.time()
datas = []
urls = []
columns = ['排名','电影名称','导演','上映年份','制作国家','类型','评分','评价分数','短评']
for i in range(0, 226, 25):
url = "https://movie.douban.com/top250?start={}&filter=".format(str(i))
urls.append(url)
with ThreadPoolExecutor() as pool:
results = pool.map(get_data, urls)
for result in results:
datas.append(result)
df = pd.concat(map(lambda x:pd.DataFrame(x, columns=columns), datas))
df.to_excel('Top250_1.xlsx',index=False)
end_time = time.time()
print(end_time - start_time)
5. 小结
通过三期推文的介绍,相信大家对于Python并发编程已经有了初步的了解。对于I/O密集型任务,由于耗费资源少,更适合使用多线程处理。对于CPU密集型任务,由于GIL锁的存在,多进程能大大提高效率。当然,如果要处理的数据量不大,单线程相对更优,毕竟多线程/多进程程序的编写和调试比单线程程序更繁琐费时。此外,在使用多线程/多进程时要特别注意线程/进程安全,否则可能会出现许多意想不到的bug。
以上就是本期推文的全部内容了,快快学习起来吧~
END
最后,我们为大家揭秘雪球网(https://xueqiu.com/)最新所展示的沪深证券和港股关注人数增长Top10。
腾讯课堂课程二维码
对我们的推文累计打赏超过1000元,我们即可给您开具发票,发票类别为“咨询费”。用心做事,不负您的支持!
往期推文推荐
Seminar丨公司董事会的人才增长:来自中国的证据
正则表达式--懒惰模式
爬完独立董事的年薪,我的眼镜跌破了! 识别旅游“照骗”——看风景名胜是否名副其实 主成分分析的Python实现正则表达式--贪婪模式
Seminar丨谁更重要:高管股权薪酬与财务报告欺诈DOS能量,超乎你想象!爬虫实战丨走进哈利波特的魔法世界
数据集合并的新路子-frlink命令
Seminar丨附近的公司:利用卫星图像研究本地信息优势线性同余法生成伪随机数
[技能篇]多线程爬虫
“好哭”是衡量一部好电影的标准吗?
Stata&Python云端课程来啦!带你了解Stata中的矩阵
Seminar|总统的朋友:政治关联与企业价值爬虫实战 | 爬取中国天气网
爬虫实战 | 爬取东方财富网经济数据——以居民消费价格指数(CPI)为例
Seminar|媒体关联董事对融资和外部治理的影响神奇的组内交叉合并 PDF分章节转TXT并实现可视化——以胡景北知青日记1971至1978年为例万物皆可开——shellout妙用
无处不在的系列配置项|从零开始的Pyecharts(三)
使用Python制作自动聊天机器人
fillin一下,平衡回来~
order命令——快速改变变量顺序的利器 Ajax应用场景——以获取雪球网港股代码及公司名称为例播放列表中的歌单排行
在Stata中轻松运用program编写命令
Meta Analysis in Stata17芒果TV视频弹幕爬取之《我在他乡挺好的》
Stata中的判断神器——confirm命令
关于我们
微信公众号“Stata and Python数据分析”分享实用的Stata、Python等软件的数据处理知识,欢迎转载、打赏。我们是由李春涛教授领导下的研究生及本科生组成的大数据处理和分析团队。
武汉字符串数据科技有限公司一直为广大用户提供数据采集和分析的服务工作,如果您有这方面的需求,请发邮件到statatraining@163.com,或者直接联系我们的数据中台总工程司海涛先生,电话:18203668525,wechat: super4ht。海涛先生曾长期在香港大学从事研究工作,现为知名985大学的博士生,爬虫俱乐部网络爬虫技术和正则表达式的课程负责人。
此外,欢迎大家踊跃投稿,介绍一些关于Stata和Python的数据处理和分析技巧。
投稿邮箱:statatraining@163.com投稿要求:
1)必须原创,禁止抄袭;
2)必须准确,详细,有例子,有截图;
注意事项:
1)所有投稿都会经过本公众号运营团队成员的审核,审核通过才可录用,一经录用,会在该推文里为作者署名,并有赏金分成。
2)邮件请注明投稿,邮件名称为“投稿+推文名称”。
3)应广大读者要求,现开通有偿问答服务,如果大家遇到有关数据处理、分析等问题,可以在公众号中提出,只需支付少量赏金,我们会在后期的推文里给予解答。