我们抓取,我们采集,我们分析,我们挖掘

异步爬虫

我们知道爬虫是 IO 密集型任务,比如如果我们使用 requests 库来爬取某个站点的话,发出一个请求之后,程序必须要等待网站返回响应之后才能接着运行,而在等待响应的过程中,整个爬虫程序是一直在等待的,实际上没有做任何事情。对于这种情况,我们有没有优化方案呢?

当然有,下面我们就来了解一下异步爬虫的基本概念和实现。

要实现异步机制的爬虫,那自然和协程脱不了关系。

协程的基本原理

案例网站:https://httpbin.org/delay/5,该站点等待5s返回响应

平时我们浏览网页的时候,绝大部分网页响应速度还是很快的,如果我们写爬虫来爬取的话,发出 Request 到收到 Response 的时间不会很长,因此我们需要等待的时间并不多。

然而像上面这个网站,一次 Request 就需要 5 秒才能得到 Response,如果我们用 requests 写爬虫来爬取的话,那每次 requests 都要等待 5 秒才能拿到结果了。

此时如果我们开了多线程或多进程来爬取的话,其爬取速度确实会成倍提升,那是否有更好的解决方案呢?

使用协程来加速的方法,此种方法对于 IO 密集型任务非常有效。如将其应用到网络爬虫中,爬取效率甚至可以成百倍地提升。

协程本质上是个单进程,它相对于多进程来说,无须线程上下文切换的开销。我们可以使用协程来实现异步操作,比如在网络爬虫场景下,我们发出一个请求之后,需要等待一定时间才能得到响应,但其实在这个等待过程中,程序可以干许多其他事情,等到响应得到之后才切换回来继续处理,这样可以充分利用 CPU 和其他资源,这就是协程的优势。

从 Python 3.4 开始,Python 中加入了协程的概念,但这个版本的协程还是以生成器对象为基础。

Python 3.5 则增加了 async/await,使得协程的实现更加方便。

Python 中使用协程最常用的库莫过于 asyncio。

首先,我们需要了解下面几个概念:

  • event_loop:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足条件发生的时候,就会调用对应的处理方法。
  • coroutine:中文翻译叫协程,在 Python 中常指代协程对象类型,我们可以将协程对象注册到时间循环中,它会被事件循环调用。我们可以使用 async 关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象。
  • task:任务,它是对协程对象的进一步封装,包含了任务的各个状态。
  • future:代表将来执行或没有执行的任务的结果,实际上和 task 没有本质区别。

另外,我们还需要了解 async/await 关键字,它是从 Python 3.5 才出现的,专门用于定义协程。其中,async 定义一个协程,await 用来挂起阻塞方法的执行。

协程初体验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def callback(task):
print('回调:',task.result())

async def execute(x):
print('数字:',x)
return x

coroutine = execute(1)
print('协程:',coroutine)
print('11111111')

# task = asyncio.ensure_future(coroutine) # 创建task方式1
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine) # 创建task方式2
task.add_done_callback(callback) # 设置回调
print('任务:',task)
loop.run_until_complete(task)
print('任务:',task)
print('任务结果:',task.result())
print('222222222')

多任务协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio
import requests
import time

start = time.time()

async def request():
# 声明一个支持异步的上下文管理器
async with aiohttp.ClientSession() as session:
res = await session.get('https://httpbin.org/delay/5')
print(f'当前时间:{datetime.datetime.now()}, status_code = {res.status}')
# return await res.text()
# return i

# python < 3.10
"""
tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
"""
if __name__ == '__main__':
# python >= 3.10
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
tasks = [loop.create_task(request()) for i in range(5)]
wait_coro = asyncio.wait(tasks)
loop.run_until_complete(wait_coro)

end = time.time()
print('Cost time:', end - start)

理论来说,确实是这样的,不过有个前提,那就是服务器在同一时刻接受无限次请求都能保证正常返回结果,也就是服务器无限抗压。另外,还要忽略 IO 传输时延,确实可以做到无限 task 一起执行且在预想时间内得到结果。但由于不同服务器处理的实现机制不同,可能某些服务器并不能承受这么高的并发,因此响应速度也会减慢。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def test(number):
start = time.time()

async def get(url):
# timeout=aiohttp.ClientTimeout(total=1) # 超时设置
session = aiohttp.ClientSession()
response = await session.get(url)
# response = await session.post(url,json=jsonParameter)
await response.text()
await session.close()
return response

async def request():
url = 'https://www.baidu.com/'
await get(url)

# python >= 3.10
# 事件
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# 任务
tasks = [asyncio.ensure_future(request()) for _ in range(number)]
# 事件和任务
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Number:', number, 'Cost time:', end - start)

for number in [1, 3, 5, 10, 15, 30, 50, 75, 100, 200, 500]:
test(number)

image-20230304154051837

我们增加了并发数量,但在服务器能承受高并发的前提下,其爬取速度几乎不太受影响。

综上所述,使用了异步请求之后,我们几乎可以在相同的时间内实现成百上千倍次的网络请求,把这个运用在爬虫中,速度提升可谓是非常可观了。

更多例子:https://blog.csdn.net/weixin_45203607/article/details/127396527

并发限制

由于aiohtt可以支持很高的并发量,如几万、几十万、上百万都是可以做到的。面对比如高的并发,目标网站很可能无法在短时间内响应,而且有瞬间将目标网站爬挂的危险,所以我们要控制一下爬取的并发量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
CONCURRENCY=5 # 并发5
URL = 'https://httpbin.org/delay/5' # 爬取网站

semaphore = asyncio.Semaphore(CONCURRENCY)
session = None

async def scrape_api():
async with semaphore:
print('爬取网站:', URL)
async with session.get(URL) as response:
await asyncio.sleep(1)
return await response.text()
async def main():
global session
session = aiohttp.ClientSession()
scrape_index_tasks = [asyncio.ensure_future(scrape_api()) for _ in range(10000)]
await asyncio.gather(*scrape_index_tasks)

if __name__ == '__main__':
# 事件
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(main())

异步爬取实战

目标网站:https://spa5.scrape.center/

目标:

  • aiohttp异步爬取全站图书数据
  • 数据异步保存到MongoDB
1
2
# 安装库 => MongoDB异步存储
pip3 install motor

思路:

一个完善的异步爬虫应该能够充分利用资源进行全速爬取,其实现思路是维护一个动态变化的爬取队列,没产生一个task,就将其放入到爬取队列中,有专门的爬虫消费者从队列中获取task并执行,能做到在最大并发量的前提下充分利用等待时间进行额外的爬取处理。

思路整体较为繁琐,需要设计爬取队列,对调函数,消费者机制等。

此处简化实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#!/usr/bin/env python
# -*- coding:utf-8 -*- 
import asyncio
import json
import logging

import aiohttp
import requests
import pymongo


logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s: %(message)s')
# 每个页面的URL
INDEX_URL = 'https://spa5.scrape.center/api/book/?limit={limit}&offset={offset}'
# 每个详情的URL
DETAIL_URL = 'https://spa5.scrape.center/api/book/{id}'
# 每页10条记录
LIMIT = 18
# 爬取10页
TOTAL_PAGE = 10

CONCURRENCY = 5 # 并发数



# 并发初始化
semaphore = asyncio.Semaphore(CONCURRENCY)
session = None

# MongoDB信息
MONGO_CONNECTION_STRING = 'mongodb://localhost:27017'
MONGO_DB_NAME = 'books'
MONGO_COLLECTION_NAME = 'books'
# client = pymongo.MongoClient(MONGO_CONNECTION_STRING)
client = pymongo.MongoClient(MONGO_CONNECTION_STRING) # MongoDB异步存储
db = client[MONGO_DB_NAME] # 数据库
collection = db[MONGO_COLLECTION_NAME] # 表

# 爬取URL,获取响应JSON数据
async def scrape_api(url):
async with semaphore:
try:
logging.info('scraping %s...', url)
async with session.get(url) as response:
return await response.json()
except aiohttp.ClientError:
logging.error('error occurred while scraping %s', url, exc_info=True)

# 格式化列表也url
async def scrape_index(page):
url = INDEX_URL.format(limit=LIMIT, offset=LIMIT * (page - 1))
return await scrape_api(url)

# 格式化详情页url
async def scrape_detail(id):
url = DETAIL_URL.format(id=id)
data = await scrape_api(url)
# 保存数据
await save_data(data)


# 保存数据到MongoDB
async def save_data(data):
logging.info('saving data %s',data)
if data:
return await collection.update_one({
'id': data.get('id') # 根据 name 进行查询
}, {
'$set': data # 存在即更新,不存在即插入,防止数据库中出现同名的电影数据
}, upsert=True)

async def main():
global session
session = aiohttp.ClientSession()
scrape_index_tasks = [asyncio.ensure_future(scrape_index(page)) for page in range(1,TOTAL_PAGE+1) ]
results = await asyncio.gather(*scrape_index_tasks)
logging.info('results %s',json.dumps(results,ensure_ascii=False,indent=2))

# 详情页的id
ids = []
for index_data in results:
if not index_data: continue
for item in index_data.get('results'):
ids.append(item.get('id'))

scrape_detail_tasks = [asyncio.ensure_future(scrape_detail(id)) for id in ids]
await asyncio.wait(scrape_detail_tasks) # 和gather效果一下,返回略有差异
await session.close() # 全部执行完毕关闭session

if __name__ == '__main__':
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(main())