import requests
from lxml import etree
from threading import Thread, Lock
from queue import Queue
import re
import csv
import time
"""
生产者/消费者模型
几乎在所有的编程语言中,都会涉及到一个关于并行生产和并行消费的模型,也就是我们通常说到的生产者/消费者模型。
模型的思路
这个模型的思路看名称就能知道,首先需要一个生产者,它负责生产,然后供消费者去使用,也就是说,他们之前共享了同一个资源,在我们这次的爬虫中,
这个资源就是每个网站的信息,生产者负责输出信息,消费者负责写入信息。
就像我们的市场一样,不可能一个市场只有一个生产者和一个消费者,一般都是多个生产者和多个消费者,并且他们是同时在运行的,生产者生产了资源,
消费者就会去消耗它,如果生产者生产的资源用完了,那么消费者应该等待,直到有新的资源可用。
模型的注意事项
虽然生产者/消费者模型是一个很经典的模型,但是在使用的过程中依然有一些需要注意的地方,比如下面几点是我在使用过程中总结的:
当生产者已经不再生产产品之后,需要给消费者线程传递信号,让消费者停止,这个过程涉及两个问题,第一个是什么时候传递信号,
第二个是怎么传递才能让消费者在不影响产品消耗的同时可以收到这个信息并停止线程?
生产者和消费者线程涉及到共享资源的时候有必要添加线程锁。
生产者和消费者多线程的启动顺序和结束顺序问题,即使 start() 和 join() 的时机。
"""
#Productor 类是生产者,作用是读取 URL 并请求和解析网页,最后将提取到的信息存放到一个队列里面供消费者使用
class Productor(Thread):
def __init__(self, q, w):
Thread.__init__(self)
self.q = q
self.w = w
"""
Productor 是一个继承了线程类的类,它需要传递2个参数,这2个参数都是队列,前一个是 URL 构成的队列,也就是“原材料”,后一个是用来存放网站信息的队列,供消费者使用。
每个生产者线程都有一个请求 Session(),而且我有注释给这个请求设置了一个属性,具体代码是:
"""
self.s = requests.Session()
"""
# 这个地方很重要,不设置这个请求状态的话后续请求会报错
这地方之所有要设置一下是因为我在使用多线程请求的过程中会有用到多次尝试请求,如果不设置这里就会造成大面积的请求失败,大概的报错就是请求达到上限。
"""
self.s.keep_alive = False
self.headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 5.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2634.33 Safari/537.36',
}
def run(self):
while not self.q.empty():
key = self.q.get()
"""
线程类只需要重写 run() 方法即可,这个的 run() 实现的就是使用 while 循环从队列中拿链接,直到队列为空就跳出循环。这里因为我存放到队列中的是一个元组,
元组有2个参数,第一个是链接,第二个默认是0,也就是表示链接请求的次数,从代码中可以看出,我使用了 try 语句来请求网页,每当有连接请求失败就把它重新丢进队列中,
并且最多请求三次,三次还失败就会停止请求这个链接并打印到控制台。具体代码实现:
"""
url = key[0]
if key[1] < 3:
try:
req = self.s.get(url, headers=self.headers, timeout=2)
time.sleep(0.1)
except Exception as e:
self.q.put((url, key[1] + 1))
print(url, e)
else:
# 设置网站的编码格式为utf-8
req.encoding = 'utf-8'
self.get_info(req.text)
else:
"""
这个请求三次的限制也是我在使用过程中想到的,因为我之前没有设置请求限制,然后发现有的链接根本就打不开(服务器的问题),然后就造成了死循环,
所以这里必须规定最大的请求次数,防止死循环的出现。
# 每个链接最多请求3次,如果3次还失败就放弃请求,打印链接
# 不这样设置的话,如果遇到有的链接请求一直超时就造成了死循环
"""
print(key)
print('{} 线程结束'.format(self.getName()))
"""
get_info 这个函数就不用多说了,这个函数的作用就是解析网页并提取需要的信息,最后将信息存放到队列中。具体解析的方式是使用 lxml 这个库,
这个解析库是用C语言写的,所有速度很快,而且它可以使用 Xpath 选择器语法来提取信息,我很喜欢这种方式,当然,使用 bs4 也是可以的。
"""
def get_info(self, html):
tree = etree.HTML(html)
sites = tree.xpath('//ul[@class="listCentent"]/li')
for site in sites:
info = dict()
name = site.xpath('.//h3[@class="rightTxtHead"]/a/text()')
# 有一个网站没有名字,所以要判断查找是否为空
name = name[0] if name else ''
url = site.xpath('.//h3[@class="rightTxtHead"]/span/text()')[0]
alexa = site.xpath('.//p[@class="RtCData"][1]/a/text()')[0]
baidu_s = site.xpath('.//p[@class="RtCData"][2]/a/img/@src')[0]
baidu_pr = site.xpath('.//p[@class="RtCData"][3]/a/img/@src')[0]
baidu_fl = site.xpath('.//p[@class="RtCData"][4]/a/text()')[0]
rank = site.xpath('.//div[@class="RtCRateCent"]/strong/text()')[0]
score = site.xpath('.//div[@class="RtCRateCent"]/span/text()')[0]
# 名称里面有的有特殊字符,所以可以过滤掉特殊的字符
info['name'] = name.encode('gbk', 'ignore').decode('gbk')
info['url'] = url
info['alexa'] = alexa
info['baidu_s'] = self.get_s(baidu_s)
info['baidu_pr'] = self.get_s(baidu_pr)
info['baidu_fl'] = baidu_fl
info['rank'] = rank
info['score'] = score.replace('得分:', '')
self.w.put(info)
def get_s(self, url):
'''从百度权重或PR的图片中提取权重值'''
s = re.findall(r'(\d+)\.gif', url)[0]
return s
"""
Worker 也是一个继承线程类的类,它的作用就是从网站信息中拿信息,然后写入文件中。这个类需要3个参数,第一个是信息存放的队列,第二个是文件的保存的名称,
第三个是一个线程锁,因为涉及到多线程文件操作,所有为了避免线程之间写文件出现文件错乱的情况,必须给线程加锁。
#Worker 类是消费者,作用是从队列拿信息,然后写入到文件中
"""
class Worker(Thread):
def __init__(self, w, file, l):
Thread.__init__(self)
self.w = w
self.file = file
self.l = l
def run(self):
# 循环,但是它跟生产者不同,它没有循环停止的条件,所有要线程跳出循环就必须在函数中设置条件,这个是后话。具体看代码:
while True:
info = self.w.get()
if info is None:
break
try:
# 因为共享资源,所以要加锁
self.l.acquire()
self.writeinfo(info)
self.l.release()
except Exception as e:
print(info, e)
self.w.task_done()
#上面的代码可以看出来,当消费者从线程中读取到的信息是 None 的时候,它就要跳出循环,所以,我们可以在要消费者线程结束的时候向队列中穿入 None。
# 看代码中,当拿到信息准备往文件中写入信息的时候,需要拿一个锁,然后文件写入完毕才释放锁,这是为了保证文件的写入安全。
print('{} worker is done!'.format(self.getName()))
# writeinfo 这个函数就不用多说了,它负责向表格中写入网站信息,由于我们在爬虫启动的时候就创建了一个表格并设置了表头,
# 所以这里的表格跟那个表格的格式应该是一样的。
def writeinfo(self, data):
headers = ['name', 'url', 'alexa', 'baidu_s', 'baidu_pr', 'baidu_fl', 'rank', 'score']
with open(self.file, 'a', newline='', encoding='gbk') as f:
writer = csv.DictWriter(f, fieldnames=headers)
writer.writerow(data)
#get_csv 函数是一个生成表格文件的函数,它可以在爬虫每次运行的时候新生成一个表格,并且给表格添加指定的列标题
def get_csv(filename):
'''创建一个新的csv表格,并且设置标题'''
headers = ['name', 'url', 'alexa', 'baidu_s', 'baidu_pr', 'baidu_fl', 'rank', 'score']
with open(filename, 'w', newline='', encoding='gbk') as f:
writer = csv.DictWriter(f, fieldnames=headers)
writer.writeheader()
#main 函数就不用多说了,它就是负责整个爬虫启动的函数,只需要传入一个文件的名称就行了
def main(file):
#运行 get_csv 函数,产生一个表格
get_csv(file)
# 设置一个全局线程锁和2个全局队列
#向原材料队列中加入指定的 URL 供生产者使用(其实这里的生产者既是生产者也是消费者)
#分别创建和启动多个生产者线程和消费者线程
#当生产者线程都结束了,就往队列中添加一个关键信息 None,用来给消费者发送跳出循环的信号(在 Java 里面称之为“毒丸”)
#最后结束消费者线程,主线程随后也会结束
l = Lock()
q = Queue()
work = Queue()
# 插入的信息是链接和一个基础的请求次数0构成的元组,为了后续判断链接请求了几次
q.put(('http://top.chinaz.com/all/index.html', 0))
baseurl = 'http://top.chinaz.com/all/index_{}.html'
for i in range(2, 1892):
q.put((baseurl.format(i), 0))
pl = [Productor(q, work) for i in range(10)]
for each in pl:
each.start()
wl = [Worker(work, file, l) for i in range(5)]
for each in wl:
each.start()
for each in pl:
each.join()
work.join()
for each in wl:
work.put(None)
for each in wl:
each.join()
print('game over!')
if __name__ == '__main__':
main('info.csv')
最后修改:2020 年 10 月 11 日
© 允许规范转载