查看原文
其他

[技能篇]多线程爬虫

爬虫俱乐部 Stata and Python数据分析 2022-03-15

本文作者:张雷豪,中南财经政法大学统计与数学学院

本文编辑:张孟晗

技术总编:王子一

Stata&Python云端课程来啦!

       为了感谢大家长久以来的支持和信任,爬虫俱乐部为大家送福利啦!!!Stata&Python特惠课程双双上线腾讯课堂~爬虫俱乐部推出了Python编程培训课程Stata基础课程Stata进阶课程Stata文本分析正则表达式网络爬虫基本字符串课程。报名课程即可加入答疑群,对报名有任何疑问欢迎在公众号后台和腾讯课堂留言哦。我们在这篇推文的最后提供了每门课程的课程二维码,大家有需要的话可以直接扫描二维码查看课程详情并进行购买哦~

引言

不知你是否有过为了抓取某些数据而苦苦等待的经历呢?当爬取的数据量非常大时,如果使用单线程的爬虫,爬取数据的速度是非常慢的。通常解决此问题的途径使用Python中的多线程与多进程,这样就可以实现同时完成多项工作,提高执行效率。今天小编将向大家介绍在Python中如何创建、使用多线程爬虫。


1、什么是线程

线程(Thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。例如,对于视频播放器,显示视频用一个线程,播放音频用另一个线程。只有2个线程同时工作,我们才能正常观看画面和声音同步的视频。


2、GIL锁

说起Python的多线程,就不得不提起Python的GIL锁机制。简而言之,Python全局解释器锁或GIL是一种互斥锁,仅允许一个线程持有Python解释器的控制权。

GIL锁意味着在任何时间点只能有一个线程处于执行状态,这似乎使得多线程加速变得不可能。事实上,对于受CPU限制的程序,多线程与单线程程序的运行时间几乎一样,而对于受I/O限制的程序(比如爬虫程序),线程会在I/O阻塞时(等待网站响应时)释放GIL锁,这时其他线程可以获取GIL锁并继续执行。通过各个线程之间的交替运行,从而实现多线程加速。

下图可以形象地说明这一过程:


3、threading库

Python的标准库提供了两个多线程模块:_threadthreading,由于threading_thread进行了封装,因此绝大多数情况下,我们只需要使用threading库即可。

创建多线程对象一般有2种手段。

一是直接创建threading.Thread,将一个函数从类的构造器传递进去,用来处理任务。下面给出一个简单的例子,process函数用来处理任务,使用threading.Threadtarget参数传入自定义函数即可创建多线程。

#(来自multithread.py)import threading,time
def process(): for i in range( 3): time.sleep(1) print("thread name is %s" % threading.current_thread( ).name)
if __name__ == '__main__':
print("-----主线程开始-----") # 创建4个线程,存入列表 threads = [threading.Thread(target=process) for i in range(4)] for t in threads: t.start() #开启线程 for t in threads: t.join() #等待子线程结束    print( "-----主线程结束-----")

二是编写一个自定义类继承threading.Thread,然后复写自定义类的run()方法,并在run()方法中编写任务处理代码,调用start()方法可以开启线程,代码如下:

import threading,time
class myThread(threading.Thread) :
def run(self): for i in range(3): time.sleep(1) tmp = "子线程"+self.name+'执行,i='+str(i) # name属性中保存的是当前线程的名字 print(tmp)
if __name__ == '__main__':
print( '-----主线程开始-----') t1 = myThread() #创建子线程t1 t2 = myThread() #创建子线程t2 t1.start() #启动子线程t1 t2.start() #启动子线程t2 t1.join() #等待子线程t1 t2.join() #等待子线程t2    print('-----主线程结束-----')


4.多线程爬虫实例—爬取豆瓣电影Top250

我们以爬虫经典项目——爬取豆瓣电影Top250为例,实现多线程爬虫。对于该项目的单线程爬虫,往期推文中已有介绍,这里不再赘述。下面我们以类继承的方法实现该项目。

首先,编写一个自定义继承类,并复写run()方法。

#来自top250_thread_class.pyclass myThread(threading.Thread):
def __init__(self, name, urls):
threading.Thread.__init__(self) self.urls = urls # 传入url self.name = name # 命名多线程
def run(self):
print("Starting " + self.name) get_data(self.urls) # 自定义函数        print("Exiting " + self.name)

第二,编写自定义函数,即代码中的get_data函数。由于其实现方法与单线程爬虫没有什么区别,这里不再赘述。

第三,编写函数用以分配任务。执行多线程程序时需要将全部任务合理分配给各个线程,常见的方法是将其均分,如下所示:

def split_list_n_list(origin_list, n):
_list = [] if len(origin_list) % n == 0: cnt = len(origin_list) // n else: cnt = len(origin_list) // n + 1
for i in range(0, n): _list.append(origin_list[i*cnt:(i+1)*cnt])
    return _list

此外,常用的方法还有队列以及线程池等。

最后,创建多线程。thread.start()方法用来开启新的线程,thread.join()方法用来等待所有线程结束。

def main(num_thread):
'''num_thread为要创建的线程数'''
start_time = time.time() # 标记开始 global df df = [] # 存储数据 threads = [] # 线程池 url_list = []
for i in range(0,251,25): url = "https://movie.douban.com/top250?start={}&filter=".format(str(i)) url_list.append(url) urls_list = split_list_n_list(url_list, num_thread) # 分配任务
for i in range(1, num_thread+1): # 创建新线程 thread = myThread("Thread-" + str(i), urls = urls_list[i-1]) # 开启新线程 thread.start() # 添加新线程到线程列表 threads.append(thread)
# 等待所有线程完成 for thread in threads: thread.join()
columns = ['排名','电影名称','导演','上映年份','制作国家','类型','评分','评价分数','短评'] d = pd.DataFrame(df,columns = columns) d['排名'] = d['排名'].apply(lambda x: int(x)) d = d.sort_values('排名') # 排序 d.to_excel('Top250_3.xlsx',index=False)
end_time = time.time() # 标记结束    print(end_time - start_time)    # 计算用时

我们分别运行单线程程序和多线程程序,来比较两者之间的效率。单线程程序运行花费2.3765220642089844秒,而对于多线程程序,在开启两个线程的情况下,仅花费1.3344297409057617秒,速度提升了一倍左右,如下图所示:

当然,开启更多的线程会有更大的速度提升,但是需要提醒的是,过于频繁的访问有可能会被封IP哦,而且会对网站服务器造成较大压力,所以对线程数进行合理选择即可。

细心的读者可能会发现,在main()中首先声明了全局变量,而且在存储数据时进行了排序操作,这又是为什么呢?我们将在该系列的下期推文中予以解答,欢迎持续关注。

最后,附上多线程爬虫的全部代码:
import pandas as pdimport timeimport requestsfrom lxml import etreeimport threading

class myThread(threading.Thread): def __init__(self, name, urls): threading.Thread.__init__(self) self.urls = urls # 传入url self.name = name # 命名多线程
def run(self):
print("Starting " + self.name) get_data(self.urls) # 自定义函数 print("Exiting " + self.name) def get_data(urls): global df headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4343.0 Safari/537.36', 'Referer': 'https://movie.douban.com/top250'} for url in urls: res = requests.get(url,headers=headers) html = res.text xp = etree.HTML(html) lis = xp.xpath('//*[@id="content"]/div/div[1]/ol/li') for li in lis: """排名、标题、导演、演员、""" ranks = li.xpath('div/div[1]/em/text()') titles = li.xpath('div/div[2]/div[1]/a/span[1]/text()') directors = li.xpath('div/div[2]/div[2]/p[1]/text()')[0].strip().replace("\xa0\xa0\xa0","\t").split("\t") infos = li.xpath('div/div[2]/div[2]/p[1]/text()')[1].strip().replace('\xa0','').split('/') dates,areas,genres = infos[0],infos[1],infos[2] ratings = li.xpath('.//div[@class="star"]/span[2]/text()')[0] scores = li.xpath('.//div[@class="star"]/span[4]/text()')[0][:-3] quotes = li.xpath('.//p[@class="quote"]/span/text()') for rank,title,director in zip(ranks,titles,directors): if len(quotes) == 0: quotes = None else: quotes = quotes[0] df.append([rank,title,director,dates,areas,genres,ratings,scores,quotes])
def split_list_n_list(origin_list, n): _list = [] if len(origin_list) % n == 0: cnt = len(origin_list) // n else: cnt = len(origin_list) // n + 1 for i in range(0, n): _list.append(origin_list[i*cnt:(i+1)*cnt]) return _list

def main(num_thread): '''num_thread为要创建的线程数''' start_time = time.time() # 标记开始 global df df = [] # 存储数据 threads = [] # 线程池 url_list = [] for i in range(0,251,25): url = "https://movie.douban.com/top250?start={}&filter=".format(str(i)) url_list.append(url) urls_list = split_list_n_list(url_list, num_thread) # 分配任务 for i in range(1, num_thread+1): # 创建新线程 thread = myThread("Thread-" + str(i), urls = urls_list[i-1]) # 开启新线程 thread.start() # 添加新线程到线程列表 threads.append(thread)
# 等待所有线程完成 for thread in threads: thread.join()
columns = ['排名','电影名称','导演','上映年份','制作国家','类型','评分','评价分数','短评'] d = pd.DataFrame(df,columns = columns) d['排名'] = d['排名'].apply(lambda x: int(x)) d = d.sort_values('排名') # 排序 d.to_excel('Top250_3.xlsx',index=False) end_time = time.time() # 标记结束 print(end_time - start_time) # 计算用时

END




最后,我们为大家揭秘雪球网(https://xueqiu.com/)最新所展示的沪深证券和港股关注人数增长Top10。




腾讯课堂课程二维码








            


 对我们的推文累计打赏超过1000元,我们即可给您开具发票,发票类别为“咨询费”。用心做事,不负您的支持!











往期推文推荐

        “好哭”是衡量一部好电影的标准吗?

        Stata&Python云端课程来啦!

         带你了解Stata中的矩阵

         Seminar|总统的朋友:政治关联与企业价值
         爬虫实战 | 爬取中国天气网

         爬虫实战 | 爬取东方财富网经济数据——以居民消费价格指数(CPI)为例

         Seminar|媒体关联董事对融资和外部治理的影响        神奇的组内交叉合并          PDF分章节转TXT并实现可视化——以胡景北知青日记1971至1978年为例

万物皆可开——shellout妙用

无处不在的系列配置项|从零开始的Pyecharts(三)

使用Python制作自动聊天机器人  

fillin一下,平衡回来~

order命令——快速改变变量顺序的利器 Ajax应用场景——以获取雪球网港股代码及公司名称为例

播放列表中的歌单排行 

在Stata中轻松运用program编写命令

Meta Analysis in Stata17      

芒果TV视频弹幕爬取之《我在他乡挺好的》

Stata中的判断神器——confirm命令

cngdf——名义GDP与实际GDP之间的摆渡船

最近《扫黑风暴》有点火爆!我从豆瓣评论中发现了这些……

随机森林-Random Forest 

复原之神--preserve&restore

合并,“纵”享新丝滑:frameappend & xframeappend
什么是全局配置项?|从零开始的Pyecharts(二)帮你拿下数据可视化|从零开始的Pyecharts 

Stata助力疫情打卡管理——是谁没有接龙呢?

这十年,《金融研究》的编委和读者偏爱哪些研究话题和文章?

【案例展示】Python与数据库交互

学好这一手,英语词典常在手 

玩转Stata中的数学函数

关于我们 


   微信公众号“Stata and Python数据分析”分享实用的Stata、Python等软件的数据处理知识,欢迎转载、打赏。我们是由李春涛教授领导下的研究生及本科生组成的大数据处理和分析团队。

   武汉字符串数据科技有限公司一直为广大用户提供数据采集和分析的服务工作,如果您有这方面的需求,请发邮件到statatraining@163.com,或者直接联系我们的数据中台总工程司海涛先生,电话:18203668525,wechat: super4ht。海涛先生曾长期在香港大学从事研究工作,现为知名985大学的博士生,爬虫俱乐部网络爬虫技术和正则表达式的课程负责人。



此外,欢迎大家踊跃投稿,介绍一些关于Stata和Python的数据处理和分析技巧。

投稿邮箱:statatraining@163.com投稿要求:
1)必须原创,禁止抄袭;
2)必须准确,详细,有例子,有截图;
注意事项:
1)所有投稿都会经过本公众号运营团队成员的审核,审核通过才可录用,一经录用,会在该推文里
为作者署名,并有赏金分成。

2)邮件请注明投稿,邮件名称为“投稿+推文名称”。
3)应广大读者要求,现开通有偿问答服务,如果大家遇到有关数据处理、分析等问题,可以在公众
号中提出,只需支付少量赏金,我们会在后期的推文里给予解答。



您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存