Python多线程爬虫详解

网络爬虫程序是一种 IO 密集型程序,程序中涉及了很多网络 IO 以及本地磁盘 IO 操作,这些都会消耗大量的时间,从而降低程序的执行效率,而 Python 提供的多线程能够在一定程度上提升 IO 密集型程序的执行效率。

如果想学习 Python 多进程、多线程以及 Python GIL 全局解释器锁的相关知识,可参考《Python并发编程教程》。

多线程使用流程

Python 提供了两个支持多线程的模块,分别是 _thread 和 threading。其中 _thread 模块偏底层,它相比于 threading 模块功能有限,因此推荐大家使用 threading 模块。 threading 中不仅包含了  _thread 模块中的所有方法,还提供了一些其他方法,如下所示:
  • threading.currentThread() 返回当前的线程变量。
  • threading.enumerate() 返回一个所有正在运行的线程的列表。
  • threading.activeCount() 返回正在运行的线程数量。

线程的具体使用方法如下所示:
  1. from threading import Thread
  2. #线程创建、启动、回收
  3. t = Thread(target=函数名) # 创建线程对象
  4. t.start() # 创建并启动线程
  5. t.join() # 阻塞等待回收线程
创建多线程的具体流程:
  1. t_list = []
  2. for i in range(5):
  3. t = Thread(target=函数名)
  4. t_list.append(t)
  5. t.start()
  6. for t in t_list:
  7. t.join()
除了使用该模块外,您也可以使用  Thread  线程类来创建多线程。

在处理线程的过程中要时刻注意线程的同步问题,即多个线程不能操作同一个数据,否则会造成数据的不确定性。通过 threading 模块的 Lock 对象能够保证数据的正确性。

比如,使用多线程将抓取数据写入磁盘文件,此时,就要对执行写入操作的线程加锁,这样才能够避免写入的数据被覆盖。当线程执行完写操作后会主动释放锁,继续让其他线程去获取锁,周而复始,直到所有写操作执行完毕。具体方法如下所示:
  1. from threading import Lock
  2. lock = Lock()
  3. # 获取锁
  4. lock.acquire()
  5. wirter.writerows("线程锁问题解决")
  6. # 释放锁
  7. lock.release()

Queue队列模型

对于 Python 多线程而言,由于 GIL 全局解释器锁的存在,同一时刻只允许一个线程占据解释器执行程序,当此线程遇到 IO 操作时就会主动让出解释器,让其他处于等待状态的线程去获取解释器来执行程序,而该线程则回到等待状态,这主要是通过线程的调度机制实现的。

由于上述原因,我们需要构建一个多线程共享数据的模型,让所有线程都到该模型中获取数据。queue(队列,先进先出) 模块提供了创建共享数据的队列模型。比如,把所有待爬取的 URL 地址放入队列中,每个线程都到这个队列中去提取 URL。queue 模块的具体使用方法如下:
  1. # 导入模块
  2. from queue import Queue
  3. q = Queue() #创界队列对象
  4. q.put(url) 向队列中添加爬取一个url链接
  5. q.get() # 获取一个url,当队列为空时,阻塞
  6. q.empty() # 判断队列是否为空,True/False

多线程爬虫案例

下面通过多线程方法抓取小米应用商店(https://app.mi.com/)中应用分类一栏,所有类别下的 APP 的名称、所属类别以及下载详情页 URL 。如下图所示:

多线程爬虫
图1:小米应用商城
抓取下来的数据 demo 如下所示:
三国杀,棋牌桌游,http://app.mi.com/details?id=com.bf.sgs.hdexp.mi

1) 案例分析

通过搜索关键字可知这是一个动态网站,因此需要抓包分析。

刷新网页来重新加载数据,可得知请求头的 URL 地址,如下所示:
https://app.mi.com/categotyAllListApi?page=0&categoryId=1&pageSize=30
其中查询参数 pageSize 参数值不变化,page 会随着页码的增加而变化,而类别 Id 通过查看页面元素,如下所示
  1. <ul class="category-list">
  2. <li><a class="current" href="/category/15">游戏</a></li>
  3. <li><a href="/category/5">实用工具</a></li>
  4. <li><a href="/category/27">影音视听</a></li>
  5. <li><a href="/category/2">聊天社交</a></li>
  6. <li><a href="/category/7">图书阅读</a></li>
  7. <li><a href="/category/12">学习教育</a></li>
  8. <li><a href="/category/10">效率办公</a></li>
  9. <li><a href="/category/9">时尚购物</a></li>
  10. <li><a href="/category/4">居家生活</a></li>
  11. <li><a href="/category/3">旅行交通</a></li>
  12. <li><a href="/category/6">摄影摄像</a></li>
  13. <li><a href="/category/14">医疗健康</a></li>
  14. <li><a href="/category/8">体育运动</a></li>
  15. <li><a href="/category/11">新闻资讯</a></li>
  16. <li><a href="/category/13">娱乐消遣</a></li>
  17. <li><a href="/category/1">金融理财</a></li>
  18. </ul>
因此,可以使用 Xpath 表达式匹配 href 属性,从而提取类别 ID 以及类别名称,表达式如下:
基准表达式:xpath_bds = '//ul[@class="category-list"]/li'
提取 id 表达式:typ_id = li.xpath('./a/@href')[0].split('/')[-1]
类型名称:typ_name = li.xpath('./a/text()')[0]
点击开发者工具的 response 选项卡,查看响应数据,如下所示:
{
count: 2000,
data: [
{
appId: 1348407,
displayName: "天气暖暖-关心Ta从关心天气开始",
icon: "http://file.market.xiaomi.com/thumbnail/PNG/l62/AppStore/004ff4467a7eda75641eea8d38ec4d41018433d33",
level1CategoryName: "居家生活",
packageName: "com.xiaowoniu.WarmWeather"
},
{
appId: 1348403,
displayName: "贵斌同城",
icon: "http://file.market.xiaomi.com/thumbnail/PNG/l62/AppStore/0e607ac85ed9742d2ac2ec1094fca3a85170b15c8",
level1CategoryName: "居家生活",
packageName: "com.gbtc.guibintongcheng"
},
...
...
通过上述响应内容,我们可以从中提取出 APP 总数量(count)和 APP (displayName)名称,以及下载详情页的 packageName。由于每页中包含了 30 个 APP,所以总数量(count)可以计算出每个类别共有多少页。
pages = int(count) // 30 + 1
下载详情页的地址是使用 packageName 拼接而成,如下所示:
link = 'http://app.mi.com/details?id=' + app['packageName']

​2) 完整程序

完整程序如下所示:
  1. # -*- coding:utf8 -*-
  2. import requests
  3. from threading import Thread
  4. from queue import Queue
  5. import time
  6. from fake_useragent import UserAgent
  7. from lxml import etree
  8. import csv
  9. from threading import Lock
  10. import json
  11.  
  12. class XiaomiSpider(object):
  13. def __init__(self):
  14. self.url = 'http://app.mi.com/categotyAllListApi?page={}&categoryId={}&pageSize=30'
  15. # 存放所有URL地址的队列
  16. self.q = Queue()
  17. self.i = 0
  18. # 存放所有类型id的空列表
  19. self.id_list = []
  20. # 打开文件
  21. self.f = open('XiaomiShangcheng.csv','a',encoding='utf-8')
  22. self.writer = csv.writer(self.f)
  23. # 创建锁
  24. self.lock = Lock()
  25.  
  26. def get_cateid(self):
  27. # 请求
  28. url = 'http://app.mi.com/'
  29. headers = { 'User-Agent': UserAgent().random}
  30. html = requests.get(url=url,headers=headers).text
  31. # 解析
  32. parse_html = etree.HTML(html)
  33. xpath_bds = '//ul[@class="category-list"]/li'
  34. li_list = parse_html.xpath(xpath_bds)
  35. for li in li_list:
  36. typ_name = li.xpath('./a/text()')[0]
  37. typ_id = li.xpath('./a/@href')[0].split('/')[-1]
  38. # 计算每个类型的页数
  39. pages = self.get_pages(typ_id)
  40. #往列表中添加二元组
  41. self.id_list.append( (typ_id,pages) )
  42.  
  43. # 入队列
  44. self.url_in()
  45.  
  46. # 获取count的值并计算页数
  47. def get_pages(self,typ_id):
  48. # 获取count的值,即app总数
  49. url = self.url.format(0,typ_id)
  50. html = requests.get(
  51. url=url,
  52. headers={'User-Agent':UserAgent().random}
  53. ).json()
  54. count = html['count']
  55. pages = int(count) // 30 + 1
  56. return pages
  57.  
  58. # url入队函数,拼接url,并将url加入队列
  59. def url_in(self):
  60. for id in self.id_list:
  61. # id格式:('4',pages)
  62. for page in range(1,id[1]+1):
  63. url = self.url.format(page,id[0])
  64. # 把URL地址入队列
  65. self.q.put(url)
  66.  
  67. # 线程事件函数: get() -请求-解析-处理数据,三步骤
  68. def get_data(self):
  69. while True:
  70. # 判断队列不为空则执行,否则终止
  71. if not self.q.empty():
  72. url = self.q.get()
  73. headers = {'User-Agent':UserAgent().random}
  74. html = requests.get(url=url,headers=headers)
  75. res_html = html.content.decode(encoding='utf-8')
  76. html=json.loads(res_html)
  77. self.parse_html(html)
  78. else:
  79. break
  80. # 解析函数
  81. def parse_html(self,html):
  82. # 写入到csv文件
  83. app_list = []
  84.  
  85. for app in html['data']:
  86. # app名称 + 分类 + 详情链接
  87. name = app['displayName']
  88. link = 'http://app.mi.com/details?id=' + app['packageName']
  89. typ_name = app['level1CategoryName']
  90. # 把每一条数据放到app_list中,并通过writerows()实现多行写入
  91. app_list.append([name,typ_name,link])
  92.  
  93. print(name,typ_name)
  94. self.i += 1
  95.  
  96. # 向CSV文件中写入数据
  97. self.lock.acquire()
  98. self.writer.writerows(app_list)
  99. self.lock.release()
  100.  
  101. # 入口函数
  102. def main(self):
  103. # URL入队列
  104. self.get_cateid()
  105. t_list = []
  106. # 创建多线程
  107. for i in range(1):
  108. t = Thread(target=self.get_data)
  109. t_list.append(t)
  110. # 启动线程
  111. t.start()
  112.  
  113. for t in t_list:
  114. # 回收线程
  115. t.join()
  116.  
  117. self.f.close()
  118. print('数量:',self.i)
  119.  
  120. if __name__ == '__main__':
  121. start = time.time()
  122. spider = XiaomiSpider()
  123. spider.main()
  124. end = time.time()
  125. print('执行时间:%.1f' % (end-start))
运行上述程序后,打开存储文件,其内容如下:
在我们之间-单机版,休闲创意,http://app.mi.com/details?id=com.easybrain.impostor.gtx

粉末游戏,模拟经营,http://app.mi.com/details?id=jp.danball.powdergameviewer.bnn

三国杀,棋牌桌游,http://app.mi.com/details?id=com.bf.sgs.hdexp.mi

腾讯欢乐麻将全集,棋牌桌游,http://app.mi.com/details?id=com.qqgame.happymj

快游戏,休闲创意,http://app.mi.com/details?id=com.h5gamecenter.h2mgc

皇室战争,战争策略,http://app.mi.com/details?id=com.supercell.clashroyale.mi

地铁跑酷,跑酷闯关,http://app.mi.com/details?id=com.kiloo.subwaysurf
...
...