我们抓取,我们采集,我们分析,我们挖掘

我们在做爬虫的过程中经常会遇到这样的情况,最初爬虫正常运行,正常抓取数据,一切看起来都是那么美好,然而一杯茶的功夫可能就会出现错误,比如 403 Forbidden,这时打开网页一看,可能会看到 “您的 IP 访问频率太高” 这样的提示。出现这种现象的原因是网站采取了一些反爬虫措施。比如,服务器会检测某个 IP 在单位时间内的请求次数,如果超过了这个阈值,就会直接拒绝服务,返回一些错误信息,这种情况可以称为封 IP。

既然服务器检测的是某个 IP 单位时间的请求次数,那么借助某种方式来伪装我们的 IP,让服务器识别不出是由我们本机发起的请求,不就可以成功防止封 IP 了吗?

一种有效的方式就是使用代理,后面会详细说明代理的用法。在这之前,需要先了解下代理的基本原理,它是怎样实现伪装 IP 的呢?

代理基础概念

1. 基本原理

代理实际上指的就是代理服务器,英文叫作 Proxy Server,它的功能是代理网络用户去取得网络信息。形象地说,它是网络信息的中转站。在我们正常请求一个网站时,是发送了请求给 Web 服务器,Web 服务器把响应传回给我们。如果设置了代理服务器,实际上就是在本机和服务器之间搭建了一个桥,此时本机不是直接向 Web 服务器发起请求,而是向代理服务器发出请求,请求会发送给代理服务器,然后由代理服务器再发送给 Web 服务器,接着由代理服务器再把 Web 服务器返回的响应转发给本机。这样我们同样可以正常访问网页,但这个过程中 Web 服务器识别出的真实 IP 就不再是我们本机的 IP 了,就成功实现了 IP 伪装,这就是代理的基本原理。

2. 代理的作用

那么,代理有什么作用呢?我们可以简单列举如下。

  • 突破自身 IP 访问限制,访问一些平时不能访问的站点。
  • 访问一些单位或团体内部资源。比如,使用教育网内地址段的免费代理服务器,就可以下载和上传对教育网开放的各类 FTP,以及查询、共享各类资料等。
  • 提高访问速度。通常,代理服务器都设置一个较大的硬盘缓冲区,当有外界的信息通过时,会同时将其保存到缓冲区中,而当其他用户再访问相同的信息时,则直接由缓冲区中取出信息,传给用户,以提高访问速度。
  • 隐藏真实 IP。上网者也可以通过这种方法隐藏自己的 IP,免受攻击。对于爬虫来说,我们用代理就是为了隐藏自身的 IP,防止自身的 IP 被封锁。

3. 爬虫代理

对于爬虫来说,由于爬虫爬取速度过快,在爬取过程中可能遇到同一个 IP 访问过于频繁的问题,此时网站就会让我们输入验证码登录或者直接封锁 IP,这样会给爬取带来极大的不便。

使用代理隐藏真实的 IP,让服务器误以为是代理服务器在请求自己。这样在爬取过程中通过不断更换代理,就不会被封锁,可以达到很好的爬取效果。

4. 代理分类

对代理进行分类时,既可以根据协议区分,也可以根据其匿名程度区分,下面总结如下。

根据协议区分

根据代理的协议,代理可以分为如下类别。

  • FTP 代理服务器。主要用于访问 FTP 服务器,一般有上传、下载以及缓存功能,端口一般为 21、2121 等。
  • HTTP 代理服务器。主要用于访问网页,一般有内容过滤和缓存功能,端口一般为 80、8080、3128 等。
  • SSL/TLS 代理。主要用于访问加密网站,一般有 SSL 或 TLS 加密功能(最高支持 128 位加密强度),端口一般为 443。
  • RTSP 代理。主要用于 Realplayer 访问 Real 流媒体服务器,一般有缓存功能,端口一般为 554。
  • Telnet 代理。主要用于 Telnet 远程控制(黑客入侵计算机时常用于隐藏身份),端口一般为 23。
  • POP3/SMTP 代理。主要用于 POP3/SMTP 方式收发邮件,一般有缓存功能,端口一般为 110/25。
  • SOCKS 代理。只是单纯传递数据包,不关心具体协议和用法,所以速度快很多,一般有缓存功能,端口一般为 1080。SOCKS 代理协议又分为 SOCKS4 和 SOCKS5,SOCKS4 协议只支持 TCP,而 SOCKS5 协议支持 TCP 和 UDP,还支持各种身份验证机制、服务器端域名解析等。简单来说,SOCKS4 能做到的 SOCKS5 都可以做到,但 SOCKS5 能做到的 SOCKS4 不一定能做到。

根据匿名程度区分

根据代理的匿名程度,代理可以分为如下类别。

  • 高度匿名代理:高度匿名代理会将数据包原封不动地转发,在服务端看来就好像真的是一个普通客户端在访问,而记录的 IP 是代理服务器的 IP。
  • 普通匿名代理:普通匿名代理会在数据包上做一些改动,服务端上有可能发现这是个代理服务器,也有一定几率追查到客户端的真实 IP。代理服务器通常会加入的 HTTP 头有 HTTP_VIAHTTP_X_FORWARDED_FOR
  • 透明代理:透明代理不但改动了数据包,还会告诉服务器客户端的真实 IP。这种代理除了能用缓存技术提高浏览速度,能用内容过滤提高安全性之外,并无其他显著作用,最常见的例子是内网中的硬件防火墙。
  • 间谍代理:间谍代理指组织或个人创建的,用于记录用户传输的数据,然后进行研究、监控等目的的代理服务器。

5. 常见代理设置

常见的代理设置如下:

  • 使用网上的免费代理,最好使用高匿代理,使用前抓取下来并筛选一下可用代理,也可以进一步维护一个代理池。
  • 使用付费代理服务,互联网上存在许多代理商,可以付费使用,其质量比免费代理好很多。
  • ADSL 拨号,拨一次号换一次 IP,稳定性高,也是一种比较有效的解决方案。
  • 蜂窝代理,即用 4G 或 5G 网卡等制作的代理。由于蜂窝网络用作代理的情形较少,因此整体被封锁的几率会较低,但搭建蜂窝代理的成本较高。

在后面,我们会详细介绍一些代理的使用方式。

6. 总结

本文介绍了代理的相关知识,这对后文我们进行一些反爬绕过的实现有很大的帮助,同时也为后文的一些抓包操作打下基础,需要好好理解。

本节由于涉及一些专业名词,本节的部分内容参考来源如下:

代理使用方法

前面我们介绍了多种请求库,如 urllib、requests、Selenium、Playwright 等用法,但是没有统一梳理代理的设置方法,本节我们来针对这些库来梳理下代理的设置方法。

1. 准备工作

先获取一个可用代理,代理就是 IP 地址和端口的组合,就是 <ip>:<port> 这样的格式。如果代理需要访问认证,那就还需要额外的用户名密码两个信息。

那怎么获取一个可用代理呢?

使用搜索引擎搜索 “代理” 关键字,可以看到许多代理服务网站,网站上会有很多免费或付费代理,比如快代理的免费 HTTP 代理:https://www.kuaidaili.com/free/ 上面就写了很多免费代理,但是这些免费代理大多数情况下并不一定稳定,所以比较靠谱的方法是购买付费代理。付费代理的各大代理商家都有套餐,数量不用多,稳定可用即可,我们可以自行选购。

image-20230307164700371

另外除了购买付费 HTTP 代理,我们也可以在本机配置一些代理软件,具体的配置方法可以参考 https://setup.scrape.center/proxy-client,软件运行之后会在本机创建 HTTP 或 SOCKS 代理服务,所以代理地址一般都是 127.0.0.1:<port> 这样的格式,不同的软件用的端口可能不同。

这里我的本机安装了一部代理软件,它会在本地 7890 端口上创建 HTTP 代理服务,即代理为 127.0.0.1:7890。另外,该软件还会在 7891 端口上创建 SOCKS 代理服务,即代理为 127.0.0.1:7891,所以只要设置了这个代理,就可以成功将本机 IP 切换到代理软件连接的服务器的 IP 了。

在本章下面的示例里,我使用上述代理来演示其设置方法,你也可以自行替换成自己的可用代理。

设置代理后,测试的网址是 http://httpbin.org/get,访问该链接我们可以得到请求的相关信息,其中返回结果的 origin 字段就是客户端的 IP,我们可以根据它来判断代理是否设置成功,即是否成功伪装了 IP。

好,接下来我们就来看下各个请求库的代理设置方法吧。

2. urllib

首先我们以最基础的 urllib 为例,来看一下代理的设置方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from urllib.error import URLError
from urllib.request import ProxyHandler, build_opener

proxy = '127.0.0.1:7890'
proxy_handler = ProxyHandler({
'http': 'http://' + proxy,
'https': 'http://' + proxy
})
opener = build_opener(proxy_handler)
try:
response = opener.open('https://httpbin.org/get')
print(response.read().decode('utf-8'))
except URLError as e:
print(e.reason)

运行结果如下:

image-20230307182538416

这里我们需要借助 ProxyHandler 设置代理,参数是字典类型,键名为协议类型,键值是代理。注意,此处代理前面需要加上协议,即 http:// 或者 https://,当请求的链接是 HTTP 协议的时候,会使用 http 键名对应的代理,当请求的链接是 HTTPS 协议的时候,会使用 https 键名对应的代理。不过这里我们把代理本身设置为了 HTTP 协议,即前缀统一设置为了 http://,所以不论访问 HTTP 还是 HTTPS 协议的链接,都会使用我们配置的 HTTP 协议的代理进行请求。

创建完 ProxyHandler 对象之后,我们需要利用 build_opener 方法传入该对象来创建一个 Opener,这样就相当于此 Opener 已经设置好代理了。接下来直接调用 Opener 对象的 open 方法,即可访问我们所想要的链接。

运行输出结果是一个 JSON,它有一个字段 origin,标明了客户端的 IP。验证一下,此处的 IP 确实为代理的 IP,并不是真实的 IP。这样我们就成功设置好代理,并可以隐藏真实 IP 了。

如果遇到需要认证的代理,我们可以用如下的方法设置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from urllib.error import URLError
from urllib.request import ProxyHandler, build_opener

proxy = 'username:password@127.0.0.1:7890'
proxy_handler = ProxyHandler({
'http': 'http://' + proxy,
'https': 'http://' + proxy
})
opener = build_opener(proxy_handler)
try:
response = opener.open('https://httpbin.org/get')
print(response.read().decode('utf-8'))
except URLError as e:
print(e.reason)

这里改变的只是 proxy 变量,只需要在代理前面加入代理认证的用户名密码即可,其中 username 就是用户名,password 为密码,例如 username 为 foo,密码为 bar,那么代理就是 foo:bar@127.0.0.1:7890

如果代理是 SOCKS5 类型,那么可以用如下方式设置代理:

1
2
3
4
5
6
7
8
9
10
11
12
import socks
import socket
from urllib import request
from urllib.error import URLError

socks.set_default_proxy(socks.SOCKS5, '127.0.0.1', 7891)
socket.socket = socks.socksocket
try:
response = request.urlopen('https://httpbin.org/get')
print(response.read().decode('utf-8'))
except URLError as e:
print(e.reason)

此处需要一个 socks 模块,可以通过如下命令安装:

1
pip3 install PySocks

这里需要本地运行一个 SOCKS5 代理,运行在 7891 端口,运行成功之后和上文 HTTP 代理输出结果是一样的:

1
2
3
4
5
6
7
8
9
10
11
{
"args": {},
"headers": {
"Accept-Encoding": "identity",
"Host": "httpbin.org",
"User-Agent": "Python-urllib/3.7",
"X-Amzn-Trace-Id": "Root=1-60e9a1b6-0a20b8a678844a0b2ab4e889"
},
"origin": "210.173.1.204",
"url": "https://httpbin.org/get"
}

结果的 origin 字段同样为代理的 IP,代理设置成功。

3. requests 的代理设置

对于 requests 来说,代理设置非常简单,我们只需要传入 proxies 参数即可。

这里以我本机的代理为例,来看下 requests 的 HTTP 代理设置,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
import requests

proxy = '127.0.0.1:7890'
proxies = {
'http': 'http://' + proxy,
'https': 'http://' + proxy,
}
try:
response = requests.get('https://httpbin.org/get', proxies=proxies)
print(response.text)
except requests.exceptions.ConnectionError as e:
print('Error', e.args)

运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
{
"args": {},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Host": "httpbin.org",
"User-Agent": "python-requests/2.22.0",
"X-Amzn-Trace-Id": "Root=1-5e8f358d-87913f68a192fb9f87aa0323"
},
"origin": "210.173.1.204",
"url": "https://httpbin.org/get"
}

和 urllib 一样,当请求的链接是 HTTP 协议的时候,会使用 http 键名对应的代理,当请求的链接是 HTTPS 协议的时候,会使用 https 键名对应的代理,不过这里统一使用了 HTTP 协议的代理。

运行结果中的 origin 若是代理服务器的 IP,则证明代理已经设置成功。

如果代理需要认证,那么在代理的前面加上用户名和密码即可,代理的写法就变成如下所示:

1
proxy = 'username:password@127.0.0.1:7890'

这里只需要将 usernamepassword 替换即可。

如果需要使用 SOCKS 代理,则可以使用如下方式来设置:

1
2
3
4
5
6
7
8
9
10
11
12
import requests

proxy = '127.0.0.1:7891'
proxies = {
'http': 'socks5://' + proxy,
'https': 'socks5://' + proxy
}
try:
response = requests.get('https://httpbin.org/get', proxies=proxies)
print(response.text)
except requests.exceptions.ConnectionError as e:
print('Error', e.args)

这里我们需要额外安装一个包 requests[socks],相关命令如下所示:

1
pip3 install "requests[socks]"

运行结果是完全相同的:

1
2
3
4
5
6
7
8
9
10
11
12
{
"args": {},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Host": "httpbin.org",
"User-Agent": "python-requests/2.22.0",
"X-Amzn-Trace-Id": "Root=1-5e8f364a-589d3cf2500fafd47b5560f2"
},
"origin": "210.173.1.204",
"url": "https://httpbin.org/get"
}

另外,还有一种设置方式,即使用 socks 模块,也需要像上文一样安装 socks 库。这种设置方法如下所示:

1
2
3
4
5
6
7
8
9
10
11
import requests
import socks
import socket

socks.set_default_proxy(socks.SOCKS5, '127.0.0.1', 7891)
socket.socket = socks.socksocket
try:
response = requests.get('https://httpbin.org/get')
print(response.text)
except requests.exceptions.ConnectionError as e:
print('Error', e.args)

使用这种方法也可以设置 SOCKS 代理,运行结果完全相同。相比第一种方法,此方法是全局设置的。我们可以在不同情况下选用不同的方法。

4. httpx 的代理设置

httpx 的用法本身就与 requests 的使用非常相似,所以其也是通过 proxies 参数来设置代理的,不过与 requests 不同的是,proxies 参数的键名不能再是 httphttps,而需要更改为 http://https://,其他的设置是一样的。

对于 HTTP 代理来说,设置方法如下:

1
2
3
4
5
6
7
8
9
10
11
import httpx

proxy = '127.0.0.1:7890'
proxies = {
'http://': 'http://' + proxy,
'https://': 'http://' + proxy,
}

with httpx.Client(proxies=proxies) as client:
response = client.get('https://httpbin.org/get')
print(response.text)

对于需要认证的代理,也是改下 proxy 的值即可:

1
proxy = 'username:password@127.0.0.1:7890'

这里只需要将 usernamepassword 替换即可。

运行结果和使用 requests 是类似的,结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
{
"args": {},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Host": "httpbin.org",
"User-Agent": "python-httpx/0.18.1",
"X-Amzn-Trace-Id": "Root=1-60e9a3ef-5527ff6320484f8e46d39834"
},
"origin": "210.173.1.204",
"url": "https://httpbin.org/get"
}

对于 SOCKS 代理,我们需要安装 httpx-socks 库,安装方法如下:

1
pip3 install "httpx-socks[asyncio]"

这样会同时安装同步和异步两种模式的支持。

对于同步模式,设置方法如下:

1
2
3
4
5
6
7
8
9
import httpx
from httpx_socks import SyncProxyTransport

transport = SyncProxyTransport.from_url(
'socks5://127.0.0.1:7891')

with httpx.Client(transport=transport) as client:
response = client.get('https://httpbin.org/get')
print(response.text)

这里我们需要设置一个 transport 对象,并配置 SOCKS 代理的地址,同时在声明 httpx 的 Client 对象的时候传入 transport 参数即可,运行结果和刚才是一样的。

对于异步模式,设置方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import httpx
import asyncio
from httpx_socks import AsyncProxyTransport

transport = AsyncProxyTransport.from_url(
'socks5://127.0.0.1:7891')

async def main():
async with httpx.AsyncClient(transport=transport) as client:
response = await client.get('https://httpbin.org/get')
print(response.text)

if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

和同步模式不同的是,transport 对象我们用的是 AsyncProxyTransport 而不是 SyncProxyTransport,同时需要将 Client 对象更改为 AsyncClient 对象,其他的不变,运行结果是一样的。

5. Selenium 的代理设置

Selenium 同样可以设置代理,这里以 Chrome 为例来介绍其设置方法。

对于无认证的代理,设置方法如下:

1
2
3
4
5
6
7
8
9
from selenium import webdriver

proxy = '127.0.0.1:7890'
options = webdriver.ChromeOptions()
options.add_argument('--proxy-server=http://' + proxy)
browser = webdriver.Chrome(options=options)
browser.get('https://httpbin.org/get')
print(browser.page_source)
browser.close()

运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"args": {},
"headers": {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9",
"Host": "httpbin.org",
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36",
"X-Amzn-Trace-Id": "Root=1-5e8f39cd-60930018205fd154a9af39cc"
},
"origin": "210.173.1.204",
"url": "http://httpbin.org/get"
}

代理设置成功,origin 同样为代理 IP 的地址。

如果代理是认证代理,则设置方法相对比较繁琐,具体如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import zipfile

ip = '127.0.0.1'
port = 7890
username = 'foo'
password = 'bar'

manifest_json = """{"version":"1.0.0","manifest_version": 2,"name":"Chrome Proxy","permissions": ["proxy","tabs","unlimitedStorage","storage","<all_urls>","webRequest","webRequestBlocking"],"background": {"scripts": ["background.js"]
}
}
"""
background_js = """
var config = {
mode: "fixed_servers",
rules: {
singleProxy: {
scheme: "http",
host: "%(ip) s",
port: %(port) s
}
}
}

chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});

function callbackFn(details) {
return {
authCredentials: {username: "%(username) s",
password: "%(password) s"
}
}
}

chrome.webRequest.onAuthRequired.addListener(
callbackFn,
{urls: ["<all_urls>"]},
['blocking']
)
""" % {'ip': ip, 'port': port, 'username': username, 'password': password}

plugin_file = 'proxy_auth_plugin.zip'
with zipfile.ZipFile(plugin_file, 'w') as zp:
zp.writestr("manifest.json", manifest_json)
zp.writestr("background.js", background_js)
options = Options()
options.add_argument("--start-maximized")
options.add_extension(plugin_file)
browser = webdriver.Chrome(options=options)
browser.get('https://httpbin.org/get')
print(browser.page_source)
browser.close()

这里需要在本地创建一个 manifest.json 配置文件和 background.js 脚本来设置认证代理。运行代码之后,本地会生成一个 proxy_auth_plugin.zip 文件来保存当前配置。

运行结果和上例一致,origin 同样为代理 IP。

SOCKS 代理的设置也比较简单,把对应的协议修改为 socks5 即可,如无密码认证的代理设置方法为:

1
2
3
4
5
6
7
8
9
from selenium import webdriver

proxy = '127.0.0.1:7891'
options = webdriver.ChromeOptions()
options.add_argument('--proxy-server=socks5://' + proxy)
browser = webdriver.Chrome(options=options)
browser.get('https://httpbin.org/get')
print(browser.page_source)
browser.close()

运行结果是一样的。

6. aiohttp 的代理设置

对于 aiohttp 来说,我们可以通过 proxy 参数直接设置。HTTP 代理设置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio
import aiohttp

proxy = 'http://127.0.0.1:7890'

async def main():
async with aiohttp.ClientSession() as session:
async with session.get('https://httpbin.org/get', proxy=proxy) as response:
print(await response.text())


if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

如果代理有用户名和密码,像 requests 一样,把 proxy 修改为如下内容:

1
proxy = 'http://username:password@127.0.0.1:7890'

这里只需要将 usernamepassword 替换即可。

对于 SOCKS 代理,我们需要安装一个支持库 aiohttp-socks,其安装命令如下:

1
pip3 install aiohttp-socks

我们可以借助于这个库的 ProxyConnector 来设置 SOCKS 代理,其代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio
import aiohttp
from aiohttp_socks import ProxyConnector

connector = ProxyConnector.from_url('socks5://127.0.0.1:7891')

async def main():
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get('https://httpbin.org/get') as response:
print(await response.text())


if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

运行结果是一样的。

另外,这个库还支持设置 SOCKS4、HTTP 代理以及对应的代理认证,可以参考其官方介绍。

7. Pyppeteer 的代理设置

对于 Pyppeteer 来说,由于其默认使用的是类似 Chrome 的 Chromium 浏览器,因此其设置方法和 Selenium 的 Chrome 一样,如 HTTP 无认证代理设置方法都是通过 args 来设置的,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio
from pyppeteer import launch

proxy = '127.0.0.1:7890'

async def main():
browser = await launch({'args': ['--proxy-server=http://' + proxy], 'headless': False})
page = await browser.newPage()
await page.goto('https://httpbin.org/get')
print(await page.content())
await browser.close()


if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"args": {},
"headers": {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9",
"Host": "httpbin.org",
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3494.0 Safari/537.36",
"X-Amzn-Trace-Id": "Root=1-5e8f442c-12b1ed7865b049007267a66c"
},
"origin": "210.173.1.204",
"url": "https://httpbin.org/get"
}

同样可以看到设置成功。

SOCKS 代理也一样,只需要将协议修改为 socks5 即可,代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio
from pyppeteer import launch

proxy = '127.0.0.1:7891'

async def main():
browser = await launch({'args': ['--proxy-server=socks5://' + proxy], 'headless': False})
page = await browser.newPage()
await page.goto('https://httpbin.org/get')
print(await page.content())
await browser.close()

if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

运行结果也是一样的。

8. Playwright 的代理设置

相对 Selenium 和 Pyppeteer 来说,Playwright 的代理设置更加方便,其预留了一个 proxy 参数,可以在启动 Playwright 的时候设置。

对于 HTTP 代理来说,可以这样设置:

1
2
3
4
5
6
7
8
9
10
from playwright.sync_api import sync_playwright

with sync_playwright() as p:
browser = p.chromium.launch(proxy={
'server': 'http://127.0.0.1:7890'
})
page = browser.new_page()
page.goto('https://httpbin.org/get')
print(page.content())
browser.close()

在调用 launch 方法的时候,我们可以传一个 proxy 参数,是一个字典。字典有一个必填的字段叫做 server,这里我们可以直接填写 HTTP 代理的地址即可。

运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{
"args": {},
"headers": {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9",
"Host": "httpbin.org",
"Sec-Ch-Ua": "\" Not A;Brand\";v=\"99\", \"Chromium\";v=\"92\"",
"Sec-Ch-Ua-Mobile": "?0",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4498.0 Safari/537.36",
"X-Amzn-Trace-Id": "Root=1-60e99eef-4fa746a01a38abd469ecb467"
},
"origin": "210.173.1.204",
"url": "https://httpbin.org/get"
}

对于 SOCKS 代理,设置方法也是完全一样的,我们只需要把 server 字段的值换成 SOCKS 代理的地址即可:

1
2
3
4
5
6
7
8
9
10
from playwright.sync_api import sync_playwright

with sync_playwright() as p:
browser = p.chromium.launch(proxy={
'server': 'socks5://127.0.0.1:7891'
})
page = browser.new_page()
page.goto('https://httpbin.org/get')
print(page.content())
browser.close()

运行结果和刚才也是完全一样的。

对于有用户名和密码的代理,Playwright 的设置也非常简单,我们只需要在 proxy 参数额外设置 username 和 password 字段即可,假如用户名和密码分别是 foo 和 bar,则设置方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
from playwright.sync_api import sync_playwright

with sync_playwright() as p:
browser = p.chromium.launch(proxy={
'server': 'http://127.0.0.1:7890',
'username': 'foo',
'password': 'bar'
})
page = browser.new_page()
page.goto('https://httpbin.org/get')
print(page.content())
browser.close()

这样我们就能非常方便地为 Playwright 实现认证代理的设置。

9. 总结

以上我们就总结了各个请求库的代理使用方式,各种库的设置方法大同小异,学会了这些方法之后,以后如果遇到封 IP 的问题,我们可以轻松通过加代理的方式来解决。

本节代码:https://github.com/Python3WebSpider/ProxyTest

代理池

我们在上一节中了解了各个请求库设置代理的各个方法,但是如何实时高效地获取到大量可用的代理是一个问题。

首先,在互联网上有大量公开的免费代理。当然,我们也可以购买付费的代理 IP,但是代理不论是免费的还是付费的,都不能保证是可用的,因为此 IP 可能被其他人用来爬取同样的目标站点而被封禁,或者代理服务器突然发生故障或网络繁忙。一旦我们选用了一个不可用的代理,这势必会影响爬虫的工作效率。

所以,我们需要提前做筛选,将不可用的代理剔除掉,保留可用代理。

那么,怎么实现呢?这就需要借助于一个叫作代理池的东西了。

接下来,本节就来介绍一下如何搭建一个高效易用的代理池。

1. 准备工作

这里代理池的存储需要借助于 Redis,因此需要额外安装它。总体来说,本节需要的环境如下:

  • redis 本地远程都可

  • 安装好一些必要的库,包括 aiohttp、requests、redis-py、pyquery、Flask、loguru 等,安装命令如下:

    1
    pip3 install aiohttp requests redis pyquery flask loguru

做好了如上准备工作,我们便可以开始实现或运行本节所讲的代理池了。

2. 代理池的目标

我们需要做到下面几个目标来实现易用高效的代理池。

代理池基本模块分为 4 部分:存储模块、获取模块、检测模块和接口模块,其功能如下:

  • 存储模块:负责存储抓取下来的代理。首先要保证代理不重复,要标识代理的可用情况,还要动态实时处理每个代理,所以一种比较高效和方便的存储方式就是使用 Redis 的 Sorted Set,即有序集合。
  • 获取模块:需要定时在各大代理网站抓取代理。代理既可以是免费公开代理,也可以是付费代理,代理的形式都是 IP 加端口。此模块尽量从不同来源获取,尽量抓取高匿代理,抓取成功之后将可用代理保存到数据库中。
  • 检测模块:需要定时检测数据库中的代理。这里需要设置一个检测链接,最好是爬取哪个网站就检测哪个网站,这样更加有针对性。如果要做一个通用型的代理,可以设置百度等链接来检测。另外,我们需要标识每一个代理的状态,如设置分数标识,100 分代表可用,分数越少代表越不可用。检测一次,如果代理可用,我们可以将分数标识立即设置为 100 满分,也可以在原基础上加 1 分;如果代理不可用,可以将分数标识减 1 分,当分数减到一定阈值后,代理就直接从数据库移除。通过这样标识分数,我们就可以辨别代理的可用情况,选用的时候会更有针对性。
  • 接口模块:需要用 API 来提供对外服务的接口。其实我们可以直接连接数据库来取对应的数据,但是这样就需要知道数据库的连接信息,并且要配置连接,而比较安全和方便的方式就是提供一个 Web API 接口,我们通过访问接口即可拿到可用代理。另外,由于可用代理可能有多个,所以我们可以设置一个随机返回某个可用代理的接口,这样就能保证每个可用代理都可以取到,实现负载均衡。

以上内容是设计代理的一些基本思路。接下来,我们设计整体的架构,然后用代码实现代理池。

3. 代理池的架构

根据上文的描述,代理池的架构如图所示。

image-20231003073321493

图中所示的代理池分为 4 个模块:存储模块、获取模块、检测模块和接口模块:

  • 存储模块使用 Redis 的有序集合,用来做代理的去重和状态标识,同时它也是中心模块和基础模块,用于将其他模块串联起来。
  • 获取模块定时从代理网站获取代理,将获取的代理传递给存储模块,并保存到数据库。
  • 检测模块定时通过存储模块获取所有代理,并对代理进行检测,根据不同的检测结果对代理设置不同的标识。
  • 接口模块通过 Web API 提供服务接口,接口通过连接数据库并通过 Web 形式返回可用的代理。

4. 代理池的实现

接下来,我们分别用代码来实现一下这 4 个模块。

注意:完整的代理池代码量较大,因此本节的代码我们不再一步步跟着编写,最后去了解源码即可,源码地址为:https://github.com/Python3WebSpider/ProxyPool

存储模块

这里我们使用 Redis 的有序集合,集合中的每一个元素都是不重复的。对于代理池来说,集合中的元素就变成了一个个代理,也就是 IP 加端口的形式,如 60.207.237.111:8888。另外,有序集合的每一个元素都有一个分数字段,分数是可以重复的,既可以是浮点数类型,也可以是整数类型。该集合会根据每一个元素的分数对集合进行排序,数值小的排在前面,数值大的排在后面,这样就可以实现集合元素的排序了。

对于代理池来说,这个分数可以作为判断一个代理是否可用的标志:100 为最高分,代表最可用;0 为最低分,代表最不可用。如果要获取可用代理,可以从代理池中随机获取分数最高的代理。注意这里是随机,这样可以保证每个可用代理都会被调用到。

分数是我们判断代理稳定性的重要标准。设置分数的规则如下所示。

  • 分数 100 为可用,检测器会定时循环检测每个代理的可用情况。一旦检测到有可用的代理,就立即置为 100;如果检测到不可用,就将分数减 1,分数减至 0 后代理移除。
  • 新获取的代理的分数为 10,如果测试可行,分数立即置为 100,不可行则将分数减 1,分数减至 0 后代理移除。

这只是一种解决方案,当然可能还有更合理的方案。之所以设置此方案,有如下几个原因。

  • 在检测到代理可用时,分数立即置为 100,这样可以保证所有可用代理有更大的机会被获取到。你可能会问,为什么不将分数加 1 而是直接将其设为最高值 100 呢?设想一下,有的代理是从各大免费公开代理网站获取的,常常一个代理并没有那么稳定,平均 5 次请求可能有 2 次成功,3 次失败。如果按照这种方式来设置分数,那么这个代理几乎不可能达到一个高的分数,也就是说即便它有时是可用的,但是筛选的分数最高,那这样的代理几乎不可能被取到。如果想追求代理稳定性,可以用上述方法,这种方法可确保分数最高的代理一定是最稳定可用的。所以,这里我们采取 “可用即设置 100” 的方法,确保只要可用的代理都可以被获取到。
  • 在检测到代理不可用时,分数减 1,分数减至 0 后,代理移除。这样一个有效代理如果被移除,需要连续不断失败 100 次。也就是说,当一个可用代理尝试了 100 次都失败了,就一直减分直到移除,一旦成功,就重新置回 100。尝试机会越多,这个代理拯救回来的机会越多,这样就不容易将曾经的一个可用代理丢弃,因为代理不可用的原因很可能是网络繁忙或者其他人用此代理请求太过频繁,所以这里将分数设为 100。
  • 将新获取的代理的分数设置为 10,如果它不可用,分数就减 1,直到减到 0 就移除;如果代理可用,分数就置为 100。由于很多代理是从免费网站获取的,所以新获取的代理无效的比例非常高,可能可用的代理不足 10%。这里我们将分数设置为 10,检测的机会没有可用代理的 100 次那么多,这也可以适当减少开销。

上述代理分数的设置思路不一定是最优思路,但据个人实测,它的实用性还是比较强的。

这里首先给出存储模块的实现代码,见 https://github.com/Python3WebSpider/ProxyPool/tree/master/proxypool/storages,建议直接对照源码阅读。

在代码中,我们定义了一个类来操作数据库的有序集合,定义了一些方法来实现分数的设置、代理的获取等。其核心实现代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import redis
from proxypool.exceptions import PoolEmptyException
from proxypool.schemas.proxy import Proxy
from proxypool.setting import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_KEY, PROXY_SCORE_MAX, PROXY_SCORE_MIN, \
PROXY_SCORE_INIT
from random import choice
from typing import List
from loguru import logger
from proxypool.utils.proxy import is_valid_proxy, convert_proxy_or_proxies


REDIS_CLIENT_VERSION = redis.__version__
IS_REDIS_VERSION_2 = REDIS_CLIENT_VERSION.startswith('2.')


class RedisClient(object):
"""
redis connection client of proxypool
"""

def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, **kwargs):
"""
init redis client
:param host: redis host
:param port: redis port
:param password: redis password
"""
self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True, **kwargs)

def add(self, proxy: Proxy, score=PROXY_SCORE_INIT) -> int:
"""
add proxy and set it to init score
:param proxy: proxy, ip:port, like 8.8.8.8:88
:param score: int score
:return: result
"""
if not is_valid_proxy(f'{proxy.host}:{proxy.port}'):
logger.info(f'invalid proxy {proxy}, throw it')
return
if not self.exists(proxy):
if IS_REDIS_VERSION_2:
return self.db.zadd(REDIS_KEY, score, proxy.string())
return self.db.zadd(REDIS_KEY, {proxy.string(): score})

def random(self) -> Proxy:
"""
get random proxy
firstly try to get proxy with max score
if not exists, try to get proxy by rank
if not exists, raise error
:return: proxy, like 8.8.8.8:8
"""
# try to get proxy with max score
proxies = self.db.zrangebyscore(REDIS_KEY, PROXY_SCORE_MAX, PROXY_SCORE_MAX)
if len(proxies):
return convert_proxy_or_proxies(choice(proxies))
# else get proxy by rank
proxies = self.db.zrevrange(REDIS_KEY, PROXY_SCORE_MIN, PROXY_SCORE_MAX)
if len(proxies):
return convert_proxy_or_proxies(choice(proxies))
# else raise error
raise PoolEmptyException

def decrease(self, proxy: Proxy) -> int:
"""
decrease score of proxy, if small than PROXY_SCORE_MIN, delete it
:param proxy: proxy
:return: new score
"""
score = self.db.zscore(REDIS_KEY, proxy.string())
# current score is larger than PROXY_SCORE_MIN
if score and score > PROXY_SCORE_MIN:
logger.info(f'{proxy.string()} current score {score}, decrease 1')
if IS_REDIS_VERSION_2:
return self.db.zincrby(REDIS_KEY, proxy.string(), -1)
return self.db.zincrby(REDIS_KEY, -1, proxy.string())
# otherwise delete proxy
else:
logger.info(f'{proxy.string()} current score {score}, remove')
return self.db.zrem(REDIS_KEY, proxy.string())

def exists(self, proxy: Proxy) -> bool:
"""
if proxy exists
:param proxy: proxy
:return: if exists, bool
"""
return not self.db.zscore(REDIS_KEY, proxy.string()) is None

def max(self, proxy: Proxy) -> int:
"""
set proxy to max score
:param proxy: proxy
:return: new score
"""
logger.info(f'{proxy.string()} is valid, set to {PROXY_SCORE_MAX}')
if IS_REDIS_VERSION_2:
return self.db.zadd(REDIS_KEY, PROXY_SCORE_MAX, proxy.string())
return self.db.zadd(REDIS_KEY, {proxy.string(): PROXY_SCORE_MAX})

def count(self) -> int:
"""
get count of proxies
:return: count, int
"""
return self.db.zcard(REDIS_KEY)

def all(self) -> List[Proxy]:
"""
get all proxies
:return: list of proxies
"""
return convert_proxy_or_proxies(self.db.zrangebyscore(REDIS_KEY, PROXY_SCORE_MIN, PROXY_SCORE_MAX))

def batch(self, start, end) -> List[Proxy]:
"""
get batch of proxies
:param start: start index
:param end: end index
:return: list of proxies
"""
return convert_proxy_or_proxies(self.db.zrevrange(REDIS_KEY, start, end - 1))


if __name__ == '__main__':
conn = RedisClient()
result = conn.random()
print(result)

首先,我们定义了一些常量,如 PROXY_SCORE_MAXPROXY_SCORE_MINPROXY_SCORE_INIT 分别代表最大分数、最小分数、初始分数。REDIS_HOSTREDIS_PORTREDIS_PASSWORD 分别代表了 Redis 的连接信息,即地址、端口和密码。REDIS_KEY 是有序集合的键名,我们可以通过它来获取代理存储所使用的有序集合。

RedisClient 这个类可以用来操作 Redis 的有序集合,其中定义了一些方法来对集合中的元素进行处理,它的主要功能如下所示。

  • __init__ 方法是初始化的方法,其参数是 Redis 的连接信息,默认的连接信息已经定义为常量。我们在 __init__ 方法中初始化了 StrictRedis 类,建立了 Redis 连接。
  • add 方法用于向数据库添加代理并设置分数,默认的分数是 PROXY_SCORE_INIT,也就是 10,返回结果是添加的结果。
  • random 方法是随机获取代理的方法。首先获取 100 分的代理,然后随机选择一个返回。如果不存在 100 分的代理,则此方法按照排名来获取,选取前 100 名,然后随机选择一个返回,否则抛出异常。
  • decrease 方法是在代理检测无效的时候设置分数减 1 的方法,代理传入后,此方法将代理的分数减 1,如果分数达到最低值,那么代理就删除。
  • exists 方法用于判断代理是否存在集合中。
  • max 方法用于将代理的分数设置为 PROXY_SCORE_MAX,即 100,也就是代理有效时的设置。
  • count 方法用于返回当前集合的元素个数。
  • all 方法返回所有的代理列表,供检测使用。

定义好这些方法后,我们可以在后续的模块中调用此类来连接和操作数据库。如果要获取随机可用的代理,只需要调用 random 方法即可,得到的就是随机的可用代理。

获取模块

获取模块主要是为了从各大网站抓取代理并调用存储模块进行保存,代码实现见 https://github.com/Python3WebSpider/ProxyPool/tree/master/proxypool/crawlers。

获取模块的逻辑相对简单,比如我们可以定义一些抓取代理的方法,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from proxypool.crawlers.base import BaseCrawler
from proxypool.schemas.proxy import Proxy
import re


MAX_PAGE = 5
BASE_URL = 'http://www.ip3366.net/free/?stype=1&page={page}'


class IP3366Crawler(BaseCrawler):
"""
ip3366 crawler, http://www.ip3366.net/
"""
urls = [BASE_URL.format(page=i) for i in range(1, 8)]

def parse(self, html):
"""
parse html file to get proxies
:return:
"""
ip_address = re.compile('<tr>\s*<td>(.*?)</td>\s*<td>(.*?)</td>')
# \s * 匹配空格,起到换行作用
re_ip_address = ip_address.findall(html)
for address, port in re_ip_address:
proxy = Proxy(host=address.strip(), port=int(port.strip()))
yield proxy

这里定义了一个代理类 Crawler,用来抓取某一网站的代理,这里抓取的是 IP3366 的公开代理,通过 parse 方法来解析页面的源码并构造一个个 Proxy 对象返回即可。

另外,在其父类 BaseCrawler 里面定义了通用的页面抓取方法,它可以读取子类里面定义的 urls 全局变量并进行爬取,然后调用子类的 parse 方法来解析页面,代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from retrying import retry
import requests
from loguru import logger


class BaseCrawler(object):
urls = []

@retry(stop_max_attempt_number=3, retry_on_result=lambda x: x is None)
def fetch(self, url, **kwargs):
try:
response = requests.get(url, **kwargs)
if response.status_code == 200:
return response.text
except requests.ConnectionError:
return

@logger.catch
def crawl(self):
"""
crawl main method
"""
for url in self.urls:
logger.info(f'fetching {url}')
html = self.fetch(url)
for proxy in self.parse(html):
logger.info(f'fetched proxy {proxy.string()} from {url}')
yield proxy

如果要扩展一个代理的 Crawler,只需要集成 BaseCrawler 并实现 parse 方法即可,扩展性较好。

因此,这一个个的 Crawler 就可以针对各个不同的代理网站进行代理的抓取。最后,有一个统一的方法将 Crawler 汇总起来,遍历调用即可。

如何汇总呢?这里我们可以检测代码只要定义有 BaseCrawler 的子类就算一个有效的代理 Crawler,可以直接通过遍历 Python 文件包的方式来获取,代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
import pkgutil
from .base import BaseCrawler
import inspect

# load classes subclass of BaseCrawler
classes = []
for loader, name, is_pkg in pkgutil.walk_packages(__path__):
module = loader.find_module(name).load_module(name)
for name, value in inspect.getmembers(module):
globals()[name] = value
if inspect.isclass(value) and issubclass(value, BaseCrawler) and value is not BaseCrawler:
classes.append(value)
__all__ = __ALL__ = classes

这里我们调用了 walk_packages 方法,遍历了整个 crawlers 模块下的类,并判断它是 BaseCrawler 的子类,那就将其添加到结果中并返回。

最后,只要将 classes 遍历并依次实例化,调用其 crawl 方法即可完成代理的爬取和提取,代码实现见 https://github.com/Python3WebSpider/ProxyPool/blob/master/proxypool/processors/getter.py。

检测模块

我们已经成功将各个网站的代理获取下来了,现在需要一个检测模块来对所有代理进行多轮检测。代理检测可用,分数就设置为 100,代理不可用,分数就减 1,这样可以实时改变每个代理的可用情况。如果要获取有效代理,只需要获取分数高的代理即可。

由于代理的数量非常多,为了提高代理的检测效率,这里使用异步请求库 aiohttp 来检测。

requests 作为一个同步请求库,我们在发出一个请求之后,程序需要等待网页加载完成之后才能继续执行。也就是这个过程会阻塞等待响应,如果服务器响应非常慢,比如一个请求等待十几秒,那么我们使用 requests 完成一个请求就会需要十几秒的时间,程序也不会继续往下执行,而在这十几秒的时间里,程序其实完全可以去做其他的事情,比如调度其他的请求或者进行网页解析等。

对于响应速度比较快的网站来说,requests 同步请求和 aiohttp 异步请求的效果差距没那么大。可对于检测代理来说,检测一个代理一般需要十多秒甚至几十秒的时间,这时候使用 aiohttp 异步请求库的优势就大大体现出来了,效率可能会提高几十倍不止。

所以,我们的代理检测使用异步请求库 aiohttp,实现示例如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import asyncio
import aiohttp
from loguru import logger
from proxypool.schemas import Proxy
from proxypool.storages.redis import RedisClient
from proxypool.setting import TEST_TIMEOUT, TEST_BATCH, TEST_URL, TEST_VALID_STATUS
from aiohttp import ClientProxyConnectionError, ServerDisconnectedError, ClientOSError, ClientHttpProxyError
from asyncio import TimeoutError

EXCEPTIONS = (
ClientProxyConnectionError,
ConnectionRefusedError,
TimeoutError,
ServerDisconnectedError,
ClientOSError,
ClientHttpProxyError
)

class Tester(object):
"""
tester for testing proxies in queue
"""

def __init__(self):
"""
init redis
"""
self.redis = RedisClient()
self.loop = asyncio.get_event_loop()

async def test(self, proxy: Proxy):
"""
test single proxy
:param proxy: Proxy object
:return:
"""
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
try:
logger.debug(f'testing {proxy.string()}')
async with session.get(TEST_URL, proxy=f'http://{proxy.string()}', timeout=TEST_TIMEOUT,
allow_redirects=False) as response:
if response.status in TEST_VALID_STATUS:
self.redis.max(proxy)
logger.debug(f'proxy {proxy.string()} is valid, set max score')
else:
self.redis.decrease(proxy)
logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
except EXCEPTIONS:
self.redis.decrease(proxy)
logger.debug(f'proxy {proxy.string()} is invalid, decrease score')

@logger.catch
def run(self):
"""
test main method
:return:
"""
# event loop of aiohttp
logger.info('stating tester...')
count = self.redis.count()
logger.debug(f'{count} proxies to test')
for i in range(0, count, TEST_BATCH):
# start end end offset
start, end = i, min(i + TEST_BATCH, count)
logger.debug(f'testing proxies from {start} to {end} indices')
proxies = self.redis.batch(start, end)
tasks = [self.test(proxy) for proxy in proxies]
# run tasks using event loop
self.loop.run_until_complete(asyncio.wait(tasks))


if __name__ == '__main__':
tester = Tester()
tester.run()

这里定义了一个类 Tester__init__ 方法中建立了一个 RedisClient 对象,供该对象中其他方法使用。接下来,定义了一个 test 方法,这个方法用来检测单个代理的可用情况,其参数就是被检测的代理。注意,test 方法前面加了 async 关键词,这代表这个方法是异步的。方法内部首先创建了 aiohttp 的 ClientSession 对象,可以直接调用该对象的 get 方法来访问页面。

测试链接在这里定义为常量 TEST_URL。如果针对某个网站有抓取需求,建议将 TEST_URL 设置为目标网站的地址,因为在抓取过程中,代理本身可能是可用的,但是该代理的 IP 已经被目标网站封掉了。例如,某些代理可以正常访问百度等页面,但是对知乎来说可能就被封了,所以我们可以将 TEST_URL 设置为知乎的某个页面的链接。当请求失败、代理被封时,分数自然会减下来,失效的代理就不会被取到了。

如果想做一个通用的代理池,则不需要专门设置 TEST_URL,既可以将其设置为一个不会封 IP 的网站,也可以设置为百度这类响应稳定的网站。

我们还定义了 TEST_VALID_STATUS 变量,这个变量是一个列表形式,包含了正常的状态码,如可以定义成 [200]。当然,某些目标网站可能会出现其他的状态码,可以自行配置。

程序在获取响应后需要判断响应的状态,如果状态码在 TEST_VALID_STATUS 列表里,则代表代理可用,可以调用 RedisClientmax 方法将代理分数设为 100,否则调用 decrease 方法将代理分数减 1,如果出现异常,也同样将代理分数减 1。

另外,我们设置了批量测试的最大值 TEST_BATCH,也就是一批测试最多 TEST_BATCH 个,这可以避免代理池过大时一次性测试全部代理导致内存开销过大的问题。当然,也可以用信号量来实现并发控制。

随后,在 run 方法里面获取了所有的代理列表,使用 aiohttp 分配任务,启动运行。这样在不断的运行过程中,代理池中无效代理的分数会一直被减 1,直至被清除,有效的代理则会一直保持 100 分,供随时取用。

这样测试模块的逻辑就完成了。

接口模块

通过上述 3 个模块,我们已经可以做到代理的获取、检测和更新,数据库就会以有序集合的形式存储各个代理及其对应的分数,分数 100 代表可用,分数越小代表越不可用。

但是我们怎样方便地获取可用代理呢?可以用 RedisClient 类直接连接 Redis,然后调用 random 方法。这样做没问题,效率很高,但是会有几个弊端。

  • 如果其他人使用这个代理池,他需要知道 Redis 连接的用户名和密码信息,这样很不安全。
  • 如果代理池需要部署在远程服务器上运行,而远程服务器的 Redis 只允许本地连接,那么我们就不能远程直连 Redis 来获取代理。
  • 如果爬虫所在的主机没有连接 Redis 模块,或者爬虫不是由 Python 语言编写的,那么我们就无法使用 RedisClient 来获取代理。
  • 如果 RedisClient 类或者数据库结构有更新,那么爬虫端必须同步这些更新,这样非常麻烦。

综上考虑,为了使代理池可以作为一个独立服务运行,我们最好增加一个接口模块,并以 Web API 的形式暴露可用代理。

这样一来,获取代理只需要请求接口即可,以上的几个缺点也可以避免。

我们使用一个比较轻量级的库 Flask 来实现这个接口模块,实现示例如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from flask import Flask, g
from proxypool.storages.redis import RedisClient
from proxypool.setting import API_HOST, API_PORT, API_THREADED

__all__ = ['app']

app = Flask(__name__)

def get_conn():
"""
get redis client object
:return:
"""
if not hasattr(g, 'redis'):
g.redis = RedisClient()
return g.redis

@app.route('/')
def index():
"""
get home page, you can define your own templates
:return:
"""
return '<h2>Welcome to Proxy Pool System</h2>'

@app.route('/random')
def get_proxy():
"""
get a random proxy
:return: get a random proxy
"""
conn = get_conn()
return conn.random().string()

@app.route('/count')
def get_count():
"""
get the count of proxies
:return: count, int
"""
conn = get_conn()
return str(conn.count())

if __name__ == '__main__':
app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED)

这里我们声明了一个 Flask 对象,定义了 3 个接口,分别是首页、随机代理页和获取数量页。

运行之后,Flask 会启动一个 Web 服务,我们只需要访问对应的接口即可获取到可用代理。

调度模块

调度模块就是调用上面所定义的 3 个模块,将这 3 个模块通过多进程的形式运行起来,示例如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import time
import multiprocessing
from proxypool.processors.server import app
from proxypool.processors.getter import Getter
from proxypool.processors.tester import Tester
from proxypool.setting import CYCLE_GETTER, CYCLE_TESTER, API_HOST, API_THREADED, API_PORT, ENABLE_SERVER, \
ENABLE_GETTER, ENABLE_TESTER, IS_WINDOWS
from loguru import logger

if IS_WINDOWS:
multiprocessing.freeze_support()

tester_process, getter_process, server_process = None, None, None

class Scheduler():
"""
scheduler
"""

def run_tester(self, cycle=CYCLE_TESTER):
"""
run tester
"""
if not ENABLE_TESTER:
logger.info('tester not enabled, exit')
return
tester = Tester()
loop = 0
while True:
logger.debug(f'tester loop {loop} start...')
tester.run()
loop += 1
time.sleep(cycle)

def run_getter(self, cycle=CYCLE_GETTER):
"""
run getter
"""
if not ENABLE_GETTER:
logger.info('getter not enabled, exit')
return
getter = Getter()
loop = 0
while True:
logger.debug(f'getter loop {loop} start...')
getter.run()
loop += 1
time.sleep(cycle)

def run_server(self):
"""
run server for api
"""
if not ENABLE_SERVER:
logger.info('server not enabled, exit')
return
app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED)

def run(self):
global tester_process, getter_process, server_process
try:
logger.info('starting proxypool...')
if ENABLE_TESTER:
tester_process = multiprocessing.Process(target=self.run_tester)
logger.info(f'starting tester, pid {tester_process.pid}...')
tester_process.start()

if ENABLE_GETTER:
getter_process = multiprocessing.Process(target=self.run_getter)
logger.info(f'starting getter, pid{getter_process.pid}...')
getter_process.start()

if ENABLE_SERVER:
server_process = multiprocessing.Process(target=self.run_server)
logger.info(f'starting server, pid{server_process.pid}...')
server_process.start()

tester_process.join()
getter_process.join()
server_process.join()
except KeyboardInterrupt:
logger.info('received keyboard interrupt signal')
tester_process.terminate()
getter_process.terminate()
server_process.terminate()
finally:
# must call join method before calling is_alive
tester_process.join()
getter_process.join()
server_process.join()
logger.info(f'tester is {"alive" if tester_process.is_alive() else "dead"}')
logger.info(f'getter is {"alive" if getter_process.is_alive() else "dead"}')
logger.info(f'server is {"alive" if server_process.is_alive() else "dead"}')
logger.info('proxy terminated')


if __name__ == '__main__':
scheduler = Scheduler()
scheduler.run()

3 个常量 ENABLE_TESTERENABLE_GETTERENABLE_SERVER 都是布尔类型,表示测试模块、获取模块和接口模块的开关,如果都为 True,则代表模块开启。

启动入口是 run 方法,这个方法分别判断 3 个模块的开关。如果开关开启,启动时程序就新建一个 Process 进程,设置好启动目标,然后调用 start 方法运行,这样 3 个进程就可以并行执行,互不干扰。

3 个调度方法的结构也非常清晰。比如,run_tester 方法用来调度测试模块。首先声明一个 Tester 对象,然后进入死循环不断循环调用其 run 方法,执行完一轮之后就休眠一段时间,休眠结束之后重新再执行。这里休眠时间也定义为一个常量,如 20 秒,即每隔 20 秒进行一次代理检测。

最后,只需要调用 Schedulerrun 方法即可启动整个代理池。

以上内容便是整个代理池的架构和相应实现逻辑。

5. 运行

接下来,我们将代码整合一下,将代理运行起来,运行之后的输出结果如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
2020-04-13 02:52:06.510 | INFO     | proxypool.storages.redis:decrease:73 - 60.186.146.193:9000 current score 10.0, decrease 1
2020-04-13 02:52:06.517 | DEBUG | proxypool.processors.tester:test:52 - proxy 60.186.146.193:9000 is invalid, decrease score
2020-04-13 02:52:06.524 | INFO | proxypool.storages.redis:decrease:73 - 60.186.151.147:9000 current score 10.0, decrease 1
2020-04-13 02:52:06.532 | DEBUG | proxypool.processors.tester:test:52 - proxy 60.186.151.147:9000 is invalid, decrease score
2020-04-13 02:52:07.159 | INFO | proxypool.storages.redis:max:96 - 60.191.11.246:3128 is valid, set to 100
2020-04-13 02:52:07.167 | DEBUG | proxypool.processors.tester:test:46 - proxy 60.191.11.246:3128 is valid, set max score
2020-04-13 02:52:17.271 | INFO | proxypool.storages.redis:decrease:73 - 59.62.7.130:9000 current score 10.0, decrease 1
2020-04-13 02:52:17.280 | DEBUG | proxypool.processors.tester:test:52 - proxy 59.62.7.130:9000 is invalid, decrease score
2020-04-13 02:52:17.288 | INFO | proxypool.storages.redis:decrease:73 - 60.167.103.74:1133 current score 10.0, decrease 1
2020-04-13 02:52:17.295 | DEBUG | proxypool.processors.tester:test:52 - proxy 60.167.103.74:1133 is invalid, decrease score
2020-04-13 02:52:17.302 | INFO | proxypool.storages.redis:decrease:73 - 60.162.71.113:9000 current score 10.0, decrease 1
2020-04-13 02:52:17.309 | DEBUG | proxypool.processors.tester:test:52 - proxy 60.162.71.113:9000 is invalid, decrease score

以上是代理池的控制台输出,可以看到这里将可用代理设置为 100,不可用代理分数减 1。

接下来,我们再打开浏览器,当前配置运行在 5555 端口,所以打开 http://127.0.0.1:5555 即可看到其首页,如图所示。

image-20231003073520357

再访问 http://127.0.0.1:5555/random,即可获取随机可用代理,如图 9-3 所示。

image-20231003073333205

只需要访问此接口,即可获取一个随机可用代理,这非常方便。

获取代理的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
import requests

PROXY_POOL_URL = 'http://localhost:5555/random'

def get_proxy():
try:
response = requests.get(PROXY_POOL_URL)
if response.status_code == 200:
return response.text
except ConnectionError:
return None

这样便可以获取到一个随机代理了。它是字符串类型,此代理可以按照上一节所示的方法设置,如 requests 的使用方法如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
import requests

proxy = get_proxy()
proxies = {
'http': 'http://' + proxy,
'https': 'https://' + proxy,
}
try:
response = requests.get('http://httpbin.org/get', proxies=proxies)
print(response.text)
except requests.exceptions.ConnectionError as e:
print('Error', e.args)

有了代理池之后,再取出代理即可有效防止 IP 被封禁的情况。

6. 总结

本节我们学习了一个代理池的设计思路和实现方案,有了这个代理池,我们就可以实时获取一些可用的代理了。相对之前的实战案例来说,整个代理池的代码量和逻辑复杂了比较多,建议可以好好理解和消化一下。

本节的代码地址为 https://github.com/Python3WebSpider/ProxyPool,代码库中还提供了基于 Docker 和 Kubernetes 的运行和部署操作,可以帮助我们更加快捷地运行代理池,同时本书后文也会介绍代理池的部署方法。

付费代理

相对免费代理来说,付费代理的稳定性相对更高一点。

付费代理分类

  • 提供接口获取海量代理,按天或者按量付费,如讯代理、快代理、芝麻代理、多贝云代理等
  • 搭建了代理隧道,直接设置固定域名代理,如阿布云代理,快代理,多贝云代理等

讯代理

讯代理个人使用过代理有效率还是蛮高的,此处非广告,其官网为:http://www.xdaili.cn/,

image-20230307190537176

讯代理官网 有多种类别的代理可供选购,摘抄其官网的各类别代理介绍如下:

  • 优质代理: 适合对代理 IP 需求量非常大,但能接受代理有效时长较短(10~30 分钟),小部分不稳定的客户
  • 独享动态: 适合对代理 IP 稳定性要求非常高,且可以自主控制的客户,支持地区筛选。
  • 独享秒切: 适合对代理 IP 稳定性要求非常高,且可以自主控制的客户,快速获取 IP,地区随机分配
  • 动态混拨: 适合对代理 IP 需求量大,代理 IP 使用时效短(3 分钟),切换快的客户
  • 优质定制: 如果优质代理的套餐不能满足您的需求,请使用定制服务

一般选择第一类别优质代理即可,代理量比较大,但是代理的稳定性没那么高,有一些代理也是不可用的,所以这种代理的使用方式就需要借助于上一节所说的代理池,我们自己再做一次筛选,确保代理可用。 可以购买一天的试一下效果,购买之后会提供一个 API 来提取代理

image-20231003073341510比如在这里我的提取 API 为:http://www.xdaili.cn/ipagent/greatRecharge/getGreatIp?spiderId=da289b78fec24f19b392e04106253f2a&orderno=YZ20177140586mTTnd7&returnType=2&count=20,可能已过期,在此仅做演示。

在这里指定了提取数量为 20,提取格式为 Json,直接访问链接即可提取代理:

image-20231003073346771接下来我们要做的就是解析这个 Json,然后将其放入我们的代理池中。 当然如果信赖讯代理的话也可以不做代理池筛选,直接使用,不过我个人还是推荐再使用代理池筛选一遍,提高可用几率。 根据上一节代理池的写法,我们只需要在 Crawler 中再加入一个 crawl 开头的方法即可。 方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
def crawl_xdaili(self):
"""
获取讯代理
:return: 代理
"""
url = 'http://www.xdaili.cn/ipagent/greatRecharge/getGreatIp?spiderId=da289b78fec24f19b392e04106253f2a&orderno=YZ20177140586mTTnd7&returnType=2&count=20'
html = get_page(url)
if html:
result = json.loads(html)
proxies = result.get('RESULT')
for proxy in proxies:
yield proxy.get('ip') + ':' + proxy.get('port')

这样我们就在代理池中接入了讯代理,获取讯代理的结果之后,解析 Json,返回代理即可。 这样代理池运行之后就会抓取和检测该接口返回的代理了,如果可用,那么就会被设为 100,通过代理池接口即可获取到。 以上以讯代理为例说明了此种批量提取代理的使用方法。

阿布云代理

阿布云代理提供了代理隧道,代理速度快而且非常稳定,此处依然非广告,其官网为:https://www.abuyun.com/

image-20231003073351420阿布云的代理主要分为两种,专业版和动态版,另外还有定制版,摘抄官网的介绍如下:

  • 专业版,多个请求锁定一个代理 IP,海量 IP 资源池需求,近 300 个区域全覆盖,代理 IP 可连续使用 1 分钟,适用于请求 IP 连续型业务
  • 动态版,每个请求一个随机代理 IP,海量 IP 资源池需求,近 300 个区域全覆盖,适用于爬虫类业务
  • 定制版,灵活按照需求定制,定制 IP 区域,定制 IP 使用时长,定制 IP 每秒请求数

关于专业版和动态版的更多介绍可以查看官网:https://www.abuyun.com/http-proxy/dyn-intro.html。 对于爬虫来说,推荐使用动态版,购买之后可以在后台看到代理隧道的用户名和密码:

image-20231003073356871

可以发现整个代理的连接域名为 proxy.abuyun.com,端口为 9020,均是固定的,但是使用之后每次的 IP 都会更改,这其实就是利用了代理隧道实现。

其官网原理介绍如下:

  • 云代理通过代理隧道的形式提供高匿名代理服务,支持 HTTP/HTTPS 协议。
  • 云代理在云端维护一个全局 IP 池供代理隧道使用,池中的 IP 会不间断更新,以保证同一时刻 IP 池中有几十到几百个可用代理 IP。
  • 需要注意的是代理 IP 池中有部分 IP 可能会在当天重复出现多次。
  • 动态版 HTTP 代理隧道会为每个请求从 IP 池中挑选一个随机代理 IP。
  • 无须切换代理 IP,每一个请求一个随机代理 IP。
  • HTTP 代理隧道有并发请求限制,默认每秒只允许 5 个请求。如果需要更多请求数,请额外购买。

注意默认套餐的并发请求是 5 个,如果需要更多需要另外购买。 使用的教程在官网也有,链接为:https://www.abuyun.com/http-proxy/dyn-manual-python.html,提供了 Requests、Urllib、Scrapy 的接入方式。 以 Requests 为例,接入示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import requests

url = 'http://httpbin.org/get'

# 代理服务器
proxy_host = 'proxy.abuyun.com'
proxy_port = '9020'

# 代理隧道验证信息
proxy_user = 'H01234567890123D'
proxy_pass = '0123456789012345'

proxy_meta = 'http://%(user)s:%(pass)s@%(host)s:%(port)s' % {
'host': proxy_host,
'port': proxy_port,
'user': proxy_user,
'pass': proxy_pass,
}
proxies = {
'http': proxy_meta,
'https': proxy_meta,
}
response = requests.get(url, proxies=proxies)
print(response.status_code)
print(response.text)

在这里其实就是使用了代理认证,在前面我们也提到过类似的设置方法,运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
200
{
"args": {},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Connection": "close",
"Host": "httpbin.org",
"User-Agent": "python-requests/2.18.1"
},
"origin": "60.207.237.111",
"url": "http://httpbin.org/get"
}

输出结果的 origin 即为代理 IP 的实际地址,可以多次运行测试,可以发现每次请求 origin 都会在变化,这就是动态版代理的效果。 这种效果其实跟我们之前的代理池的随机代理效果类似,都是随机取出了一个当前可用代理。 但是此服务相比于维护代理池来说,使用更加方便,配置简单,省时省力,在价格可以接受的情况下,个人推荐此种代理。

ADSL代理搭建

我们在前面尝试维护过一个代理池,代理池可以挑选出许多可用代理,但是常常其稳定性不高、响应速度慢,而且这些代理通常是公共代理,可能不止一人同时使用,其 IP 被封的概率很大。另外,这些代理可能有效时间比较短,虽然代理池一直在筛选,但如果没有及时更新状态,也有可能获取到不可用的代理。

上一节我们也了解了付费代理的使用,付费代理的质量相对免费代理就会好不少,这的确已经是一个相对不错的方案了,但本节要介绍的方案可以使我们既能不断更换代理,又可以保证代理的稳定性

在一些付费代理套餐中,大家可能会注意到有这样的一个套餐 - 独享代理或私密代理,这种其实就是用了专用服务器搭建了代理服务,相对一般的付费代理来说,其稳定性更好,速度也更快,同时 IP 可以动态变化。这种独享代理或私密代理的 IP 切换大多数都是基于 ADSL 拨号机制来实现的,一台云主机每拨号一次就可以换一个 IP,同时云主机上搭建了代理服务,我们就可以直接使用该云主机的 HTTP 代理来进行数据爬取了。

本节我们就来实际操作一下搭建 ADSL 拨号代理服务的方法。

什么是 ADSL

ADSL,英文全称是 Asymmetric Digital Subscriber Line,即非对称数字用户环路。它的上行和下行带宽不对称,它采用频分复用技术把普通的电话线分成了电话、上行和下行 3 个相对独立的信道,从而避免了相互之间的干扰。

ADSL 通过拨号的方式上网,拨号时需要输入 ADSL 账号和密码,每次拨号就更换一个 IP。IP 分布在多个 A 段,如果 IP 都能使用,则意味着 IP 量级可达千万。如果我们将 ADSL 主机作为代理,每隔一段时间云主机拨号就换一个 IP,这样可以有效防止 IP 被封禁。另外,由于我们是直接使用专有的云主机搭建的代理服务,所以其代理的稳定性相对更好,代理响应速度也相对更快。

准备工作

在本节开始之前,我们需要先购买几台 ADSL 代理云主机,建议 2 台或以上。因为云主机在拨号的一瞬间服务器正在切换 IP,所以拨号之后代理是不可用的状态,所以需要 2 台及以上云主机来做负载均衡。

ADSL 代理云主机的服务商还是比较多的,个人推荐的有阿斯云、云立方等,其官网分别为:

本节案例中,我们以阿斯云为例,购买了一台电信型同时安装了 CentOS Linux 系统的云主机。

购买成功之后,我们可以在后台找到服务器的连接 IP、端口、用户名、密码,拨号所用的用户名和密码,如图所示:

image-20231003073402678

然后找到远程管理面板 − 远程连接的用户名和密码,也就是 SSH 远程连接服务器的信息。比如我使用的 IP 和端口是 zhongweidx01.jsq.bz:30042,用户名是 root,命令行下输入如下内容:

1
ssh root@zhongweidx01.jsq.bz -p 30042

输入连接密码,就可以连接上远程服务器了,如图所示:

image-20231003073406286

登录成功之后,我们便可以开始本节的正式内容了。

测试拨号

云主机默认已经配置了拨号相关的信息,如宽带用户名和密码等,所以我们无需额外进行配置,只需要调用相应的拨号命令即可实现拨号和 IP 地址的切换。

我们可以输入如下拨号命令来进行拨号:

1
pppoe-start

拨号命令成功运行,没有报错信息,耗时约几秒,结束之后整个主机就获得了一个有效的 IP 地址。

如果要停止拨号,可以输入如下命令:

1
pppoe-stop

运行完该命令后,网络就会断开,之前的 IP 地址也会被释放。

注意:不同的云主机的拨号命令可能不同,如云立方主机的拨号命令为 adsl-startadsl-stop,请以官方文档的说明为准。

所以,如果要想切换 IP,我们只需要将上面的两个命令组合起来,先执行 pppoe-stop,再执行 pppoe-start。每次拨号,ifconfig 命令观察主机的 IP,如图所示:

image-20231003073410668

可以看到,这里我们执行了停止和开始拨号的命令之后,通过 ifconfig 命令获取的网卡信息的 IP 地址就变化了,所以我们成功实现了 IP 地址的切换。

好,那如果我们要想将这台云主机设置为可以实时变化 IP 的代理服务器的话,主要就有这几件事情:

  • 在云主机上运行代理服务软件,使之可以提供 HTTP 代理服务
  • 实现云主机定时拨号更换 IP
  • 实时获取云主机的代理 IP 和端口信息

接下来我们就来完成这几部分内容吧。

设置代理服务器

当前我们使用的云主机使用的是 Linux 的 CentOS 系统,目前它是无法作为一个 HTTP 代理服务器来使用的,因为该云主机上面目前并没有运行相关的代理软件。要想让该云主机提供 HTTP 代理服务,我们需要在其上面安装并运行相关的代理服务。

那什么软件能提供这种代理服务呢?目前业界比较流行的有 Squid 和 TinyProxy,配置完成之后它们会在特定端口上运行一个 HTTP 代理。知道了该云主机当前的 IP 之后,我们就能使用该云主机上 Squid 或 TinyProxy 提供的 HTTP 代理了。

这里我们以 Squid 为例来进行一下配置。

首先我们安装一下 Squid,在 CentOS 的安装命令如下:

1
2
sudo yum -y update
yum -y install squid

运行完之后,我们便可以成功安装好 Squid 了。

如果要想启动 Squid,可以运行如下命令:

1
systemctl start squid

如果想配置开机自动启动,可以运行如下命令:

1
systemctl enable squid

Squid 成功运行之后,我们可以使用如下命令查看当前 Squid 的运行状态:

1
systemctl status squid

如图所示,我们可以看到 Squid 就成功运行了:

image-20231003073415510

默认情况下,Squid 会运行在 3128 端口,也就是相当于在云主机的 127.0.0.1:3128 上启动了代理服务,接下来我们可以来测试下 Squid 的代理效果,在该台云主机上运行 curl 命令请求 https://httpbin.org,并配置使用云主机的代理:

1
curl -x http://127.0.0.1:3128 https://httpbin.org/get

这里 curl 的 -x 参数代表设置 HTTP 代理,由于这是在云主机上运行的,所以代理直接设置为了 http://127.0.0.1:3128。

运行完毕之后,我们再运行下 ifconfig 获取下当前云主机的 IP,运行结果如图所示:

image-20231003073439688

可以看到返回结果的 origin 字段的 IP 就和 ifconfig 获取的 IP 地址是一致的。

接下来,我们在自己本机上(非云主机)运行如下命令测试下代理的连通情况,这里 IP 就需要更换为云主机本身的 IP 了,刚才可以看到云主机当前拨号的 IP 是 106.45.104.166,所以需要运行如下命令:

1
curl -x http://106.45.104.166:3128 https://httpbin.org/get

然而发现并没有对应的输出结果,代理连接失败。

其实原因在于默认情况下 Squid 并没有开启允许外网访问,我们可以进行 Squid 的相关配置,如更改当前代理运行端口、允许连接的 IP,配置高匿代理等等,这些都需要修改 /etc/squid/squid.conf 文件。

要允许公网访问,最简单的方案就是将 /etc/squid/squid.conf 中的该行:

1
http_access deny all

修改为:

1
http_access allow all

意思是允许来自所有 IP 的请求连接。

另外还需要在配置文件的开头 acl 配置的部分添加该行内容:

1
acl all src 0.0.0.0/0

另外我们还想将 Squid 配置成高度匿名代理,这样目标网站就无从通过一些参数如 X-Forwarded-For 来得知爬虫机本身的 IP 了,所以在配置文件中再添加如下配置:

1
2
request_header_access Via deny all
request_header_access X-Forwarded-For deny all

另外有的云主机厂商可能默认封禁了 Squid 的 3128 端口,建议更换一个端口,比如 3328,修改改行:

1
http_port 3128

修改为:

1
http_port 3328

修改完配置之后保存配置文件,然后重新启动 Squid 即可:

1
systemctl restart squid

这时候在本机上(非云主机)重新运行刚才的 curl 命令,同时更改下端口:

1
curl -x http://106.45.104.166:3328 https://httpbin.org/get

即可得到返回结果:

1
2
3
4
5
6
7
8
9
10
11
{
"args": {},
"headers": {
"Accept": "*/*",
"Host": "httpbin.org",
"User-Agent": "curl/7.64.1",
"X-Amzn-Trace-Id": "Root=1-60ea8fc0-0701b1494e4680b95889cdb1"
},
"origin": "106.45.104.166",
"url": "https://httpbin.org/get"
}

这时候我们就可以在本机上直接使用云主机的代理了!

动态获取 IP

现在我们已经可以执行命令让主机动态切换 IP 了,同时也在主机上搭建好代理服务器了,接下来我们只需要知道拨号后的 IP 就可以使用代理了。

那怎么动态获取拨号主机的 IP 呢?又怎么来维护这些代理呢?怎么保证获取到的代理一定是可用的呢?这时候我们可能想到一些问题:

  • 如果我们只有一台拨号云主机并设置了定时拨号的话,那么在拨号的几秒时间内,该云主机提供的代理服务是不可用的。
  • 如果我们不用定时拨号的方法,而想要在爬虫端控制拨号云主机的拨号操作的话,爬虫端还需要单独的逻辑来处理拨号和重连的问题,这会带来额外的开销。

综合考虑下来,一个比较好的解决方案是:

  • 为了不增加爬虫端的逻辑开销,爬虫端应该无需关心拨号云主机的拨号操作,我们只需要保证爬虫通过某个接口获取到的代理是可用的就行了,拨号云主机的代理的维护逻辑和爬虫是毫不相关的。
  • 为了解决一台拨号云主机在拨号时代理不可用的问题,我们需要有多台云主机同时提供代理服务,我们可以将不同云主机的拨号时段错开,当一台云主机正在拨号时,我们可以用其他云主机顶替。
  • 为了更加方便地维护和使用代理,我们可以像前文介绍的代理池一样把这些云主机的代理统一维护起来,所有拨号云主机的代理统一存储到一个公共的 Redis 数据库中,可以使用 Redis 的 Hash 存储方式,存好每台云主机和对应代理的映射关系。拨号云主机拨号前将自己对应的代理内容清空,拨号成功之后再将代理更新,这样 Redis 数据库中的代理就一定是实时可用的代理了。

利用这种思路,我们要做的其实就有如下几点:

  • 配置一个可以公网访问的 Redis 数据库,每台云主机可以将自己的代理存储到对应的 Redis 数据库中,由该 Redis 数据库维护这些代理。
  • 申请多台拨号云主机并按照上文所述配置好 Squid 代理服务,每台云主机设置定时拨号来更换 IP。
  • 每台云主机在拨号前删除 Redis 数据库中原来的代理,拨号成功之后测试一下代理的可用性,将最新的代理更新到 Redis 数据库中即可。

OK,接下来我们就来操作一下吧。

由于云主机要进行 Redis 数据库的操作,所以我们可以使用 Python 来实现,所以先在云主机上装下 Python:

1
yum -y install python3

关于自动拨号、连接 Redis 数据库、获取本机代理、设置 Redis 数据库的操作,我已经写好了一个 Python 的包并发布到 PyPi 了,我们可以直接使用这个包来完成如上的功能,这个包叫做 adslproxy,可以在云主机上使用 pip3 来安装:

1
pip3 install adslproxy

安装完毕之后,我们可以使用 export 命令设置下环境变量:

1
2
3
4
5
6
7
8
export REDIS_HOST=<Redis数据库的地址>
export REDIS_PORT=<Redis数据库的端口>
export REDIS_PASSWORD=<Redis数据库的密码>
export PROXY_PORT=<拨号云主机配置的代理端口>
export DIAL_BASH=<拨号脚本>
export DIAL_IFNAME=<网卡名称>
export CLIENT_NAME=<云主机的唯一标识>
export DIAL_CYCLE=<拨号间隔>

这里 REDIS_HOST、REDIS_PORT、REDIS_PASSWORD 就是远程 Redis 的连接信息,就不再赘述了。PROXY_PORT 就是云主机上代理服务的端口,我们已经设置为了 3328。DIAL_BASH 就是拨号的命令,即 pppoe-stop;pppoe-start,当然该脚本的内容不同的云主机厂商可能不同,以实际为准。DIAL_IFNAME 即拨号云主机上的网卡名称,程序可以通过获取该网卡的信息来获取当前拨号主机的 IP 地址,通过上述操作可以发现,网卡名称叫做 ppp0,当然这个名称也是以实际为准。CLIENT_NAME 就是云主机的唯一标识,用来在 Redis 中存储主机和代理的映射,因为我们有多台云主机,所以不同云主机的名称应该设置为不同的字符串,比如 adsl1、adsl2 等等。

这里我们设置如图所示:

image-20231003073446464

设置好环境变量之后,我们就可以运行 adslproxy 命令来进行拨号了,命令如下:

1
adslproxy send

运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
2021-07-11 15:30:03.062 | INFO     | adslproxy.sender.sender:loop:90 - Starting dial...
2021-07-11 15:30:03.063 | INFO | adslproxy.sender.sender:run:99 - Dial started, remove proxy
2021-07-11 15:30:03.063 | INFO | adslproxy.sender.sender:remove_proxy:62 - Removing adsl1...
2021-07-11 15:30:04.065 | INFO | adslproxy.sender.sender:remove_proxy:69 - Removed adsl1 successfully
2021-07-11 15:30:05.373 | INFO | adslproxy.sender.sender:run:111 - Get new IP 106.45.105.33
2021-07-11 15:30:15.552 | INFO | adslproxy.sender.sender:run:120 - Valid proxy 106.45.105.33:3328
2021-07-11 15:30:16.501 | INFO | adslproxy.sender.sender:set_proxy:82 - Successfully set proxy 106.45.105.33:3328
2021-07-11 15:33:36.678 | INFO | adslproxy.sender.sender:loop:90 - Starting dial...
2021-07-11 15:33:36.679 | INFO | adslproxy.sender.sender:run:99 - Dial started, remove proxy
2021-07-11 15:33:36.680 | INFO | adslproxy.sender.sender:remove_proxy:62 - Removing adsl1...
2021-07-11 15:33:37.214 | INFO | adslproxy.sender.sender:remove_proxy:69 - Removed adsl1 successfully
2021-07-11 15:33:38.617 | INFO | adslproxy.sender.sender:run:111 - Get new IP 106.45.105.219
2021-07-11 15:33:48.750 | INFO | adslproxy.sender.sender:run:120 - Valid proxy 106.45.105.219:3328
...

这里我们就可以看到,因为云主机在拨号之后当前代理就会失效了,所以在拨号之前程序先尝试从 Redis 中删除当前云主机的代理。接下来就开始执行拨号操作,拨号成功之后验证一下代理是可用的,然后再将该代理存储到 Redis 数据库中。循环往复运行,我们就达到了定时更换 IP 的效果,同时 Redis 数据库中也是实时可用的代理。

最后按照同样的配置,我们可以购买多台拨号云主机并进行如上同样的设置,这样就有多个稳定的定时更新的代理可用了,Redis 中会实时更新各台云主机的代理,如图所示。

image-20231003073451662

图中所示是四台 ADSL 拨号云主机配置并运行后 Redis 数据库中的内容,其中的代理都是实时可用的。

使用代理

那怎么使用代理呢?我们可以在任意可以公网访问的云主机上连接刚才的 Redis 数据库并搭建一个 API 服务即可。怎么搭建呢?我们可以同样使用刚才的 adslproxy 库,该库也提供了 API 服务的功能。

为了方便测试,我们在本机进行测试,安装好 adslproxy 包之后,然后设置好 REDIS 相关的环境变量:

1
2
3
export REDIS_HOST=<Redis数据库的地址>
export REDIS_PORT=<Redis数据库的端口>
export REDIS_PASSWORD=<Redis数据库的密码>

然后运行如下命令启动即可:

1
2020-07-11 16:31:58.651 | INFO     | adslproxy.server.server:serve:68 - API listening on http://0.0.0.0:8425

可以看到 API 服务就在 8425 端口上运行了,我们打开浏览器即可访问首页,如图所示:

image-20231003073455963

其中最重要的就是 random 接口了,我们使用 random 接口即可获取 Redis 数据库中的一个随机代理,如图所示:

image-20231003073459922

测试下可用性也没有问题,这样爬虫就可以使用这个代理来进行数据爬取了。

最后,我们将 API 服务部署一下,这个 ADSL 代理服务就可以像代理池一样被使用了,每请求一次 API 就可以获取一个实时可用代理,不同的时间段这个代理就会实时更换,而且连接稳定速度又快,实在是网络爬虫的最佳搭档。

总结

本节我们介绍了 ADSL 拨号代理的搭建过程。通过这种代理,我们可以无限次更换 IP,而且线路非常稳定,爬虫抓取效果也会好很多。

本节代码:https://github.com/Python3WebSpider/AdslProxy。

代理反爬实战

以一个ip反爬网站为例进行一次实战演练,该网站限制单个ip没5分钟最多访问10次。超过10便会封锁该ip返回403,10分钟才解除。此时如果切换一个网络环境(全局代理/WiFi切换到手机热点/其他能让访问目标网站所用的ip变化的手段),就可以看到页面了。

目标网站:https://antispider5.scrape.center/

  • 如果我们想要短时间内爬取这个网站内的所有数据,得更换多个ip进行爬取。就要用到代理了。
  • 无法预知某个代理是否可以一次性完成操作,因此请求可能成功可能失败,失败是因为被封锁了,或者代理本身失效,为了正常爬取,需要有重试机制。(可以用redis队列,将失败的请求记录要记录下来,等待下次被调度。)