查看原文
其他

多线程队列间通信

爬虫俱乐部 Stata and Python数据分析 2022-03-15

本文作者:张雷豪,中南财经政法大学统计与数学学院

本文编辑:赵一帆

技术总编:王子一

Stata&Python云端课程来啦!

       为了平衡团队运营成本,维系公众号的运营,也与国内动辄数千元的Stata课程缩短差距,我们的网课不得不上调价格,我们决定于11月1日起调价,Python课程的价格调整为249.9元Stata进阶课程调为249.9元Stata基础课程调整到299.9元。大家可以告知一下身边想要购买的小伙伴,欲购从速哦,对报名有任何疑问欢迎在公众号后台和腾讯课堂留言~我们在这篇推文提供了每门课程的课程二维码,大家有需要的话可以直接扫描二维码查看课程详情并进行购买哦~



导读

在上期推文《[技能篇]多线程爬虫》中,我们介绍了多线程爬虫的有关概念和实例。本期推文我们来讲解有关多线程队列间通信的问题。

细心的读者可能会发现,在爬取豆瓣电影Top250的实例中,main()函数中首先声明了全局变量,而且在存储数据时进行了排序操作,这么做实际上与多线程之间的通信有关。

一、线程间通信

在执行单线程爬虫任务时,一般很少需要考虑变量的作用域问题。而在多线程程序中,由于各个线程是并行执行的,可能出现同时多个线程需要对变量进行修改的情形,因此我们要考虑线程间变量的传输与通信问题

最常见也是最简单的方法是利用全局变量,只需在各个线程中声明全局变量,即可实现线程间的变量传递。下面是一个简单的例子:

from threading import Threadimport time
def plus(): print('-------子线程1开始------') global g_num g_num += 50 print( 'g_num is %d '%g_num) print('-------子线程1结束------')
def minus(): time.sleep(1) print('-------子线程2开始------') global g_num g_num -= 50 print('g_num is %d '%g_num) print('-------子线程2结束------')
g_num = 100 #定义一个全局变量
if __name__ == "__main__" : print('-------主线程开始------') print('g_num is %d ' %g_num) t1 = Thread(target = plus) #实例化线程t1 t2 = Thread(target = minus) #实例化线程t2 t1.start() #开启线程t1 t2.start() #开启线程t2 t1.join() #等待t1线程结束 t2.join() #等待t2线程结束    print('-------主线程结束------')

运行结果如下:

可以看到,线程一首先对全局变量g_num进行了操作,接着线程二再对g_num进行修改,从而实现了线程间参数的传递。值得注意的是,在程序中需要在每个子线程中对全局变量进行声明,不然变量的作用域仍然是函数内。

回到开始的两个问题,第一个问题的答案是显而易见的,声明全局变量的原因是在线程间进行通信,方便数据的存储。但是为什么要在最后对数据进行排序呢?答案是:由于线程可以对全局变量随意修改,这就可能造成多线程之间对全局变量使用的混乱。线程锁可以较好地解决这个问题。

二、线程锁

在介绍线程锁之前,我们先来看一个简单的例子:

import threading
n = 0 # 全局变量
def process_1(): # 声明n为全局变量 global n for i in range(1000000): n = n + 1 print('线程1运行完之后,n=', n)
def process_2(): # 声明n为全局变量 global n for i in range(1000000): n = n + 1 print('线程2运行完之后,n=', n)
# 创建子线程t1 = threading.Thread(target = process_1)t2 = threading.Thread(target = process_2)# 开启线程t1.start()t2.start()

这是一个两线程程序,首先声明全局变量n,接着两个子线程同时对n进行操作,我们希望线程一结束后n=1000000,线程二结束后n=2000000,事实果真如此吗?我们来看运行结果:

事实上,由于两个线程同时对全局变量n进行操作,程序的运行结果出现混乱,如何解决这个问题呢?一个防止他人进入房间的简单方法,就是门上加一把锁。先到的人锁上门,后到的人就在门口排队,等锁打开再进去。在Python的threading模块,也提供了类似门锁功能的类——threading.Lock

某个线程要更改共享数据时,Lock先将其锁定,其他线程不能更改;直到该线程释放资源,其他的线程才能再次锁定该资源。线程锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。

threading.Lock的使用非常简单,只需先利用threading.Lock()创建锁对象,然后分别使用acquire()和release()获取和释放一个锁定,acquire()和release()必须成对出现。下面我们对程序加锁:

import threading
n = 0 # 全局变量lock = threading.Lock() # 创建锁对象
def process_1(): # 声明n为全局变量 global n lock.acquire() # 加锁 for i in range(1000000): n = n + 1 print('线程1运行完之后,n=', n) lock.release() # 解锁
def process_2(): # 声明n为全局变量 global n lock.acquire() # 加锁 for i in range(1000000): n = n + 1 print('线程2运行完之后,n=', n) lock.release() # 解锁
# 创建子线程t1 = threading.Thread(target = process_1)t2 = threading.Thread(target = process_2)# 开启线程t1.start()t2.start()


process_1()首先获取线程锁,对n进行操作,直到process_1()释放线程锁,process_2()才可以获取线程锁并继续运行。程序的运行结果如下:

需要注意的是,使用线程锁时,要避免死锁。在多任务系统下,当一个或多个线程等待系统资源,而资源又被线程本身或其他线程占用时,就形成了死锁。

三、队列

除了使用全局变量,Python中的queue模块还提供了Queue队列来实现线程间通信。Queue中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列,LIFO(后入先出)队列和优先级队列。

Queue的常用方法如下:

Queue():创建FIFO队列。

Queue.qsize():返回队列的大小。

Queue.empty():如果队列为空,返回True,反之False。

Queue.get():获取队列。

Queue.put():写入队列。

Queue.task_done():在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号。每个get()调用得到一个任务,接下来task_done()调用告诉队列该任务已经处理完毕。

Queue.join():实际上意味着等到队列为空,再执行别的操作

使用Queue队列在线程间通信通常应用于生产者消费者模式。产生数据的模块称为生产者,而处理数据的模块称为消费者。在生产者与消费者之间的缓冲区称之为仓库。生产者负责往仓库运输商品,而消费者负责从仓库里取出商品,这就构成了生产者消费者模式。这种模式可以用下图进行表示:


利用Queue模块,我们可以将上期推文中的爬虫程序改写为生产者消费者模式,从而实现线程间的安全通信。下面附上完整代码:

import pandas as pdimport timeimport requestsfrom lxml import etreefrom queue import Queuefrom threading import Thread
class Movie(): def __init__(self): self.df = [] self.headers ={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4343.0 Safari/537.36', 'Referer': 'https://movie.douban.com/top250'} self.columns = ['排名','电影名称','导演','上映年份','制作国家','类型','评分','评价分数','短评'] self.html_list = Queue() # 创建队列对象 self.url_list = Queue()
# 生产URL def get_url(self): url = 'https://movie.douban.com/top250?start={}&filter=' for i in range(0,250,25): self.url_list.put(url.format(str(i)))
# 消费URL并生产HTML def get_html(self): while True: if not self.url_list.empty(): url = self.url_list.get() resp = requests.get(url, headers=self.headers) html = resp.text self.html_list.put(html) else: break
# 消费HTML,解析数据 def get_data(self): while True: if not self.html_list.empty(): html = self.html_list.get() xp = etree.HTML(html) lis = xp.xpath('//*[@id="content"]/div/div[1]/ol/li') for li in lis: """排名、标题、导演、演员、""" ranks = li.xpath('div/div[1]/em/text()') titles = li.xpath('div/div[2]/div[1]/a/span[1]/text()') directors = li.xpath('div/div[2]/div[2]/p[1]/text()')[0].strip().replace("\xa0\xa0\xa0","\t").split("\t") infos = li.xpath('div/div[2]/div[2]/p[1]/text()')[1].strip().replace('\xa0','').split('/') dates,areas,genres = infos[0],infos[1],infos[2] ratings = li.xpath('.//div[@class="star"]/span[2]/text()')[0] scores = li.xpath('.//div[@class="star"]/span[4]/text()')[0][:-3] quotes = li.xpath('.//p[@class="quote"]/span/text()') for rank,title,director in zip(ranks,titles,directors): if len(quotes) == 0: quotes = None else: quotes = quotes[0] self.df.append([rank,title,director,dates,areas,genres,ratings,scores,quotes]) else: break

def main(self): start_time = time.time() self.get_url()
th_list = [] for i in range(5): th = Thread(target = self.get_html) th.start() th_list.append(th)
for th in th_list: th.join()
self.get_data() d = pd.DataFrame(self.df, columns=self.columns) d['排名'] = d['排名'].apply(lambda x: int(x)) d = d.sort_values('排名') # 排序 d.to_excel('Top250_2.xlsx', index=False) end_time = time.time() print(end_time-start_time)
if __name__ == '__main__': spider = Movie() spider.main()

最后,我们为大家揭秘雪球网(https://xueqiu.com/)最新所展示的沪深证券和港股关注人数增长Top10。



腾讯课堂课程二维码








            


 对我们的推文累计打赏超过1000元,我们即可给您开具发票,发票类别为“咨询费”。用心做事,不负您的支持!











往期推文推荐

        线性同余法生成伪随机数 

         [技能篇]多线程爬虫

“好哭”是衡量一部好电影的标准吗?

Stata&Python云端课程来啦!

带你了解Stata中的矩阵

Seminar|总统的朋友:政治关联与企业价值
爬虫实战 | 爬取中国天气网

爬虫实战 | 爬取东方财富网经济数据——以居民消费价格指数(CPI)为例

Seminar|媒体关联董事对融资和外部治理的影响神奇的组内交叉合并 PDF分章节转TXT并实现可视化——以胡景北知青日记1971至1978年为例

万物皆可开——shellout妙用

无处不在的系列配置项|从零开始的Pyecharts(三)

使用Python制作自动聊天机器人  

fillin一下,平衡回来~

order命令——快速改变变量顺序的利器 Ajax应用场景——以获取雪球网港股代码及公司名称为例

播放列表中的歌单排行 

在Stata中轻松运用program编写命令

Meta Analysis in Stata17      

芒果TV视频弹幕爬取之《我在他乡挺好的》

Stata中的判断神器——confirm命令

cngdf——名义GDP与实际GDP之间的摆渡船

最近《扫黑风暴》有点火爆!我从豆瓣评论中发现了这些……

随机森林-Random Forest 

复原之神--preserve&restore

合并,“纵”享新丝滑:frameappend & xframeappend
什么是全局配置项?|从零开始的Pyecharts(二)帮你拿下数据可视化|从零开始的Pyecharts 

Stata助力疫情打卡管理——是谁没有接龙呢?

这十年,《金融研究》的编委和读者偏爱哪些研究话题和文章?

【案例展示】Python与数据库交互

关于我们 


   微信公众号“Stata and Python数据分析”分享实用的Stata、Python等软件的数据处理知识,欢迎转载、打赏。我们是由李春涛教授领导下的研究生及本科生组成的大数据处理和分析团队。

   武汉字符串数据科技有限公司一直为广大用户提供数据采集和分析的服务工作,如果您有这方面的需求,请发邮件到statatraining@163.com,或者直接联系我们的数据中台总工程司海涛先生,电话:18203668525,wechat: super4ht。海涛先生曾长期在香港大学从事研究工作,现为知名985大学的博士生,爬虫俱乐部网络爬虫技术和正则表达式的课程负责人。



此外,欢迎大家踊跃投稿,介绍一些关于Stata和Python的数据处理和分析技巧。

投稿邮箱:statatraining@163.com投稿要求:
1)必须原创,禁止抄袭;
2)必须准确,详细,有例子,有截图;
注意事项:
1)所有投稿都会经过本公众号运营团队成员的审核,审核通过才可录用,一经录用,会在该推文里为作者署名,并有赏金分成。

2)邮件请注明投稿,邮件名称为“投稿+推文名称”。
3)应广大读者要求,现开通有偿问答服务,如果大家遇到有关数据处理、分析等问题,可以在公众号中提出,只需支付少量赏金,我们会在后期的推文里给予解答。



您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存