我们抓取,我们采集,我们分析,我们挖掘

爬虫数据存储

保存为TXT

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import requests
from pyquery import PyQuery as pq
import re

url = 'https://ssr1.scrape.center/'
html = requests.get(url).text
doc = pq(html)
items = doc('.el-card').items()

file = open('movies.txt', 'w', encoding='utf-8')
for item in items:
# 电影名称
name = item.find('a > h2').text()
file.write(f'名称: {name}\n')
# 类别
categories = [item.text() for item in item.find('.categories button span').items()]
file.write(f'类别: {categories}\n')
# 上映时间
published_at = item.find('.info:contains(上映)').text()
published_at = re.search('(\d{4}-\d{2}-\d{2})', published_at).group(1) \
if published_at and re.search('\d{4}-\d{2}-\d{2}', published_at) else None
file.write(f'上映时间: {published_at}\n')
# 评分
score = item.find('p.score').text()
file.write(f'评分: {score}\n')
file.write(f'{"=" * 50}\n')
file.close()

效果:

image-20230303111116360

文件打开方式

  • r:以只读方式打开文件,意思就是只能读取文件内容,不能写入文件内容。这是默认模式。
  • rb:以二进制只读方式打开一个文件,通常用于打开二进制文件,比如音频、图片、视频等等。
  • r+:以读写方式打开一个文件,既可以读文件又可以写文件。
  • rb+:以二进制读写方式打开一个文件,同样既可以读又可以写,但读取和写入的都是二进制数据。
  • w:以写入方式打开一个文件。如果该文件已存在,则将其覆盖。如果该文件不存在,则创建新文件。
  • wb:以二进制写入方式打开一个文件。如果该文件已存在,则将其覆盖。如果该文件不存在,则创建新文件。
  • w+:以读写方式打开一个文件。如果该文件已存在,则将其覆盖。如果该文件不存在,则创建新文件。
  • wb+:以二进制读写格式打开一个文件。如果该文件已存在,则将其覆盖。如果该文件不存在,则创建新文件。
  • a:以追加方式打开一个文件。如果该文件已存在,文件指针将会放在文件结尾。也就是说,新的内容将会被写入到已有内容之后。如果该文件不存在,则创建新文件来写入。
  • ab:以二进制追加方式打开一个文件。如果该文件已存在,则文件指针将会放在文件结尾。也就是说,新的内容将会被写入到已有内容之后。如果该文件不存在,则创建新文件来写入。
  • a+:以读写方式打开一个文件。如果该文件已存在,文件指针将会放在文件的结尾。文件打开时会是追加模式。如果该文件不存在,则创建新文件来读写。
  • ab+:以二进制追加方式打开一个文件。如果该文件已存在,则文件指针将会放在文件结尾。如果该文件不存在,则创建新文件用于读写。

with as 优雅写法

在 with 控制块结束时,文件会自动关闭,所以就不需要再调用 close 方法了。

1
2
3
4
5
with open('movies.txt', 'w', encoding='utf-8'):
file.write(f'名称: {name}\n')
file.write(f'类别: {categories}\n')
file.write(f'上映时间: {published_at}\n')
file.write(f'评分: {score}\n')

保存为JSON

JavaScript中JSON有2种格式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{key1:value1, key2:value2, ...}
["java", "javascript", "vb", ...]

举例:
[
{
name: "Bob",
gender: "male",
birthday: "1992-10-18",
},
{
name: "Selina",
gender: "female",
birthday: "1995-10-18",
},
];

JSON可以由以上2种形式自由组合,结果清晰,是数据交换的极佳方式。

读取JSON

Python 为我们提供了简单易用的 JSON 库来实现 JSON 文件的读写操作,

我们可以调用 JSON 库的 loads 方法将 JSON 文本字符串转为 JSON 对象

另外我们还可以通过 dumps 方法将 JSON 对象转为文本字符串。

例如,这里有一段 JSON 形式的字符串,它是 str 类型,我们用 Python 将其转换为可操作的数据结构,如列表或字典:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import json
# 加载JSON文件
with open('data.json', encoding='utf-8') as file:
str = file.read()
data = json.loads(str)
print(data)
data = json.load(open('data.json', encoding='utf-8'))

# 加载JSON字符串: JSON字符串必须用双引号,单引号不行
str = '''
[{
"name": "Bob",
"gender": "male",
"birthday": "1992-10-18"
}, {
"name": "Selina",
"gender": "female",
"birthday": "1995-10-18"
}]
'''
print(type(str)) # <class'str'>
data = json.loads(str)
print(data) # [{'name': 'Bob', 'gender': 'male', 'birthday': '1992-10-18'}, {'name': 'Selina', 'gender': 'female', 'birthday': '1995-10-18'}]
print(type(data)) # <class 'list'>

# 根据索引获取对象内容
data[0]['name'] # Bob
data[0].get('name') # Bob
data[0].get('age', 25) # 第二个参数为取不到时的默认值

输出JSON

另外,我们还可以调用 dumps 方法将 JSON 对象转化为字符串。例如,将上例中的列表重新写入文本:

1
2
3
4
5
6
7
8
9
10
11
import json

data = [{
'name': 'Bob',
'gender': 'male',
'birthday': '1992-10-18'
}]
with open('data.json', 'w', encoding='utf-8') as file:
file.write(json.dumps(data)) # 一行字符串
file.write(json.dumps(data, indent=2)) # 保存 JSON 的格式缩进
file.write(json.dumps(data, indent=2, ensure_ascii=False)) # 中文字符会变成 Unicode 字符,为了输出中文,还需要指定参数 ensure_ascii 为 False

利用 dumps 方法,我们可以将 JSON 对象转为字符串,然后再调用文件的 write 方法写入文本,结果如图所示。

写入结果如图所示。

image-20231003072332890

可以发现,这样就可以输出 JSON 为中文了。

保存为CSV

CSV(comma-Separated Values),逗号分割值/字符串分隔值。文件以纯文本形式存储表格数据。比Excel更简洁(xls文本是电子表格,包含文本数值和格式等)

1
2
3
4
5
6
7
8
9
10
11
12
#!/usr/bin/env python
# -*- coding:utf-8 -*- 

import csv

with open('data.csv','w') as f:
# fw = csv.writer(f) # 默认逗号分隔符
fw = csv.writer(f,delimiter =' ') # 指定空格分隔符
fw.writerow(['id','name','age'])
fw.writerow(['1001','tom','18'])
fw.writerow(['1002','jerry','18'])
fw.writerows([['1003','jerry2','18'],['1004','jerry3','18']]) # 写入多行

用CSV库保存字典数据为CSV文件

一般爬虫爬取的都是结构化数据,一般用字典表示

1
2
3
4
5
6
7
8
9
10
def 字典写入():
with open('data.csv', 'w',encoding='utf-8') as f:
fieldnames = ['id', 'name', 'age']
fw = csv.DictWriter(f, fieldnames=fieldnames) # 指定空格分隔符
fw.writeheader()
fw.writerow({'id':'1','name':'兰陵王','age':19}) # 中文,encoding='utf-8'
fw.writerow({'id':'1','name':'tom','age':19})
fw.writerow({'id':'1','name':'tom','age':19})
if __name__ == '__main__':
字典写入()

用pandas库保存字典数据为CSV文件

1
2
3
pip3 install pandas
# 网不行:
pip3 install pandas -i https://pypi.tuna.tsinghua.edu.cn/simple/ # 清华源

保存文件:

1
2
3
4
5
6
7
8
9
import pandas as pd
def pandas保存Csv():
data = [
{'id': '1', 'name': '兰陵王', 'age': 19},
{'id': '2', 'name': '孙尚香', 'age': 19},
{'id': '3', 'name': '鲁班', 'age': 19}
]
df = pd.DataFrame(data)
df.to_csv('data.csv',index=False)

用CSV库读取CSV文件

1
2
3
4
5
def 用CSV库读取CSV文件():
with open('data.csv','r',encoding='utf-8') as f:
fr = csv.reader(f)
for row in fr:
print(row)

用pandas库读取CSV文件

1
2
3
4
5
6
7
8
9
10
import pandas as pd
def 用pandas库读取CSV文件():
df = pd.read_csv('data.csv')
print('------将数据按默认格式输出,包括表头------')
print(df)
print('------将数据转为list输出------')
print(df.values.tolist())
print('------遍历每一行------')
for index,row in df.iterrows():
print(row.tolist())

保存到MySQL

1
2
# 安装库
pip3 install pymysql

创建一个爬虫数据库spiders

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import pymysql

def 创建表():
db = pymysql.connect(host='localhost',user='root',password='root',db='spiders')
cursor = db.cursor()
sql = 'CREATE TABLE IF NOT EXISTS students(id varchar(255) not null,name varchar(255) not null,age int not null,primary key (id))'
cursor.execute(sql)
db.close()

def 插入数据():
db = pymysql.connect(host='localhost', user='root', password='root', port=3306, db='spiders')
cursor = db.cursor()
sql = 'insert into students(id,name,age) values(%ss,%ss,%ss)'
try:
cursor.execute(sql,('1','孙尚香',18))
db.commit()
except Exception as e:
print('出错',e)
db.rollback()
db.close()

def 插入JSON数据():
db = pymysql.connect(host='localhost', user='root', password='root', port=3306, db='spiders')
cursor = db.cursor()
data = {
'id': '2',
'name': 'tom',
'age': '20'
}
table = 'students'
keys = ','.join(data.keys())
values = ','.join(['%s'* len(data)])
sql = 'insert into {table} ({keys}) values ({values})'.format(table=table,keys=keys,values=values)
try:
if cursor.execute(sql,tuple(data.values())):
print('插入成功')
db.commit()
except Exception as e:
print('出错',e)
db.rollback()
db.close()

def 查询():
db = pymysql.connect(host='localhost', user='root', password='root', port=3306, db='spiders')
cursor = db.cursor()
sql = 'select * from students where age > 1'
try:
cursor.execute(sql)
print('总数', cursor.rowcount)
print('一条', cursor.fetchone())
for row in cursor.fetchall():
print('每条',row)
except Exception as e:
print('出错',e)

if __name__ == '__main__':
# 创建表()
# 插入数据()
# 插入JSON数据()
查询()

保存到MongoDB

爬虫一条数据可能存在因某些字段提取失败而缺失的情况,且数据可能随时调整。或者存在嵌套关系。

如果用关系型数据库,需要提前建好表,而且如果存在嵌套,要序列化才能保存,很不方便,

所以可以考虑用非关系型数据库MongoDB。

1
2
# 安装库
pip3 install pymongo

文档: https://docs.mongodb.com/manual/reference/operator/query/

crud示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import pymongo

# 连接 MongoDB
from bson import ObjectId

client = pymongo.MongoClient(host='localhost', port=27017)
# client = MongoClient('mongodb://localhost:27017/') # 效果一样

# 指定数据库
db = client.test
# db = client['test'] # 效果一样

# 指定集合
collection = db.students
# collection = db['students'] # 效果一样

# 推荐用insert_one 和 insert_many代替insert
def 插入单条数据():
student = {
'id': '20170101',
'name': 'Jordan',
'age': 20,
'gender': 'male'
}

result = collection.insert(student)
print(result)


def 插入多条数据():
student1 = {
'id': '20170101',
'name': 'Jordan',
'age': 20,
'gender': 'male'
}

student2 = {
'id': '20170202',
'name': 'Mike',
'age': 21,
'gender': 'male'
}

result = collection.insert([student1, student2])
print(result)

def 查询单条():
result = collection.find_one({'name': 'Mike'})
print(type(result))
print(result)
collection.find({'_id': {'$gt': ObjectId('593278c815c2602678bb2b8d')}})


def 查询多条():
results = collection.find({'age': 20})
results = collection.find({'age': {'$gt': 20}}) # 查询年龄大于 20
results = collection.find({'name': {'$regex': '^M.*'}}) # 查询名字以 M 开头
print(results)
for result in results:
print(result)

def 计数():
count = collection.find().count()
print(count)

def 排序():
results = collection.find().sort('name', pymongo.ASCENDING) # 降序:pymongo.DESCENDING
print([result['name'] for result in results])

# 在某些情况下,我们可能想只取某几个元素
def 偏移():
results = collection.find().sort('name', pymongo.ASCENDING).skip(2).limit(2)
# skip(2) 比如偏移 2,就忽略前两个元素
# limit(2) 限制2个结果
print([result['name'] for result in results])

def 更新():
condition = {'name': 'Kevin'}
student = collection.find_one(condition)
student['age'] = 25
result = collection.update(condition, student)
print(result)
# $set 只更新字典内的字段
result = collection.update(condition, {'$set': student})
# 推荐更新api
condition = {'name': 'Kevin'}
student = collection.find_one(condition)
student['age'] = 26
result = collection.update_one(condition, {'$set': student})
print(result)
# 获取更新结果
print(result.matched_count, result.modified_count)

def 删除():
result = collection.remove({'name': 'Kevin'})
print(result)
# 推荐API
result = collection.delete_one({'name': 'Kevin'})
print(result)
print(result.deleted_count)
result = collection.delete_many({'age': {'$lt': 25}})
print(result.deleted_count)

可以用robot 3T查看

保存到redis

Redis 是一个基于内存的高效的非关系型数据库,鉴于redis的便捷性和高效性我们会利用redis实现很多架构,比如维护代理池,张好吃,ADSL拨号代理池,scrapy-redis分布式架构等。

1
2
# 安装库
pip3 install redis

代码:

1
2
3
4
5
6
7
8
9
10
11
12
def 直接连接redis并设置值():
# 连接redis
redis = StrictRedis(host='localhost', port=6379, db=0) # ,password=''
# 设置值
redis.set('name', 'tom')
print(redis.get('name'))

def 连接池连接redis():
# pool = ConnectionPool(host='localhost', port=6379, db=0)
# 还可以用redis tcp连接
pool = ConnectionPool.from_url('redis://@localhost:6379/0') # redis://[:password]@host:port/db
redis = StrictRedis(connection_pool=pool)

ElasticSearch

想查数据,就免不了搜索,而搜索离不开搜搜引擎。百度、谷歌都是非常庞大、复杂的搜索引擎,他们几乎能够索引互联网上开放的所有网页和数据。然而对于我们自己的业务数据来说,没有必要使用这么复杂的技能,如果为了便于存储和检索,想要实现自己的搜索引擎,elasticSearch就是不二之选。这是一个全文搜索引擎,可以快速存储、搜索和分析海量数据。

所以,如果我们将爬取到的数据存储到elasticsearch中,检索时会非常方便。

1
2
3
4
# 安装库
# pip3 install elasticsearch # 版本太高可能使用时会报错
# pip3 install elasticsearch==7.13.0
pip3 install elasticsearch==6.8.0 # API不一样,建议7

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#!/usr/bin/env python
# -*- coding:utf-8 -*- 
from datetime import datetime

from elasticsearch import Elasticsearch

# 连接es
# es = Elasticsearch() # 默认连接本地9200
# 可以连接到SSL
# es = Elasticsearch(
# ['https://[username:password@]hostname:port'],
# verify_certs=True
# )
# 连接到http地址
es = Elasticsearch(
['http://192.168.88.130:9200']
)



def 创建索引():
result = es.indices.create(index='news_20230324',ignore=400)
print(result)

def 删除索引():
result = es.indices.delete(index='news_20230324',ignore=400)
print(result)

def 插入数据():
doc = {
'author': '左耳',
'text': 'python练习es记录',
'timestamp': datetime.now(),
}

res = es.index(index="test-index", doc_type='tweet', id=1, body=doc)
print(res['result'])

def 查询数据():
res = es.get(index="test-index", doc_type='tweet', id=1)
print(res['_source'])

def 搜索数据():
es.indices.refresh(index="test-index")

res = es.search(index="test-index", body={"query": {"match_all": {}}})
print("查询 %d 命中:" % res['hits']['total'])
for hit in res['hits']['hits']:
print("%(timestamp)s %(author)s: %(text)s" % hit["_source"])

def 更新数据():
doc = {
'author': '左耳',
'text': 'python练习es记录-修改',
'timestamp': datetime.now(),
}
result = es.update(index="test-index", doc_type="tweet",id=1, body={"doc":doc})
print(result)
if __name__ == '__main__':
# 创建索引()
# 删除索引()
# 插入数据()
更新数据()
查询数据()
# 搜索数据()

一个比较详细的例子:https://zhuanlan.zhihu.com/p/122511360

保存到rabbitmq

在爬取数据的过程中,可能需要一些进程间的通信机制,比如:

  • 一个进程负责构造爬取请求,另一个进程负责进程爬取请求
  • 某个进程新建了一个爬取任务,通知另外一个负责数据爬取的进程开始爬取数据
  • 某个数据爬取进程执行完毕,通知另外一个负责数据处理的进程开始处理数据

为了降低这些进程的耦合度,需要一个类似消息队列的中间件来存储和转发消息,实现进程间的通信。

1
2
# 安装库
pip3 install pika
  • 生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#!/usr/bin/env python
# -*- coding:utf-8 -*- 

import pika

# 创建凭证,使用rabbitmq用户密码登录
credentials = pika.PlainCredentials("zuoer", "zuoer")
cparameters = pika.ConnectionParameters(host='192.168.88.130', credentials=credentials, virtual_host='/zuoer_space')
connection = pika.BlockingConnection(cparameters)

# 创建频道
channel = connection.channel()
# 优先级
MAX_PRIORITY = 100
# 创建队列
QUEUE_NAME = 'scrape'

def 声明队列():

# 新建一个queue1队列,用于接收消息
channel.queue_declare(queue=QUEUE_NAME,
arguments={'x-max-priority': MAX_PRIORITY}, # 设置权重
durable=True, # 队列持久化
)

def 发送消息(data,priority=100):
channel.basic_publish(exchange='',
routing_key=QUEUE_NAME,
properties=pika.BasicProperties(priority=int(priority),delivery_mode=2), # 设置消息权重、持久化
body=data)
print("已经发送了消息")

# 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,需要关闭本次连接
# connection.close()

if __name__ == '__main__':
# 声明队列()
while True:
data = input()
发送消息(data)
  • 消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#!/usr/bin/env python
# -*- coding:utf-8 -*- 

import pika

# 创建凭证,使用rabbitmq用户密码登录
credentials = pika.PlainCredentials("zuoer", "zuoer")
cparameters = pika.ConnectionParameters(host='192.168.88.130', credentials=credentials, virtual_host='/zuoer_space')
connection = pika.BlockingConnection(cparameters)

# 创建频道
channel = connection.channel()
# 创建队列
QUEUE_NAME = 'scrape'

def 消费消息():
channel.basic_consume(queue=QUEUE_NAME,
auto_ack=True,
on_message_callback=callback)
channel.start_consuming()
def callback(ch,method,properties,body):
if body:
result = pickle.loads(body)
print(f'消费到消息: {result}')

if __name__ == '__main__':
消费消息()