实现多线程爬取数据并保存到mongodb
时间:2021-07-01 10:21:17
帮助过:13人阅读
from lxml import etree
import requests
from queue import Queue
index_url=
‘https://m.lianjia.com/gz/ershoufang/pg{}/‘
detail_url=
‘https://m.lianjia.com{}‘
# 设置爬取主页的页数
INDEX_PAGE_NUM=
200
# 定义一个类
# 0定义主页url队列、主页html队列、详情页url队列、html队列、内容队列
# 1获取首页url并解析详情页url
# 2获取详情页的内容
# 3保存内容
# 4设置多线程调用方法
# 设置mongodb
client = pymongo.MongoClient(
‘localhost‘)
# 设置数据库名
db = client[
‘ershoufang‘]
# 指定集合名
index =
‘index_info‘
detail =
‘detail_info‘
class lianJia():
def __init__(self):
self.index_url_queue=
Queue()
self.html_index_queue=
Queue()
self.index_content_queue=
Queue()
self.detail_content_queue =
Queue()
# 获取主页的url和html内容并解析出index页内容和详情页url
def get_index(self):
for i
in range(INDEX_PAGE_NUM):
# print(index_url.format(i+
1))
url=index_url.format(i+
1)
self.index_url_queue.put(url)
# index=requests.
get(index_url.format(i+
1)).content.decode()
# self.html_index_queue.put(index)
# 获取主页html
def get_index_html(self):
while True:
url=self.index_url_queue.
get()
index = requests.
get(url).content.decode()
self.html_index_queue.put(index)
self.index_url_queue.task_done()
def parse_index(self):
while True:
# 获取队列里得内容
html1=self.html_index_queue.
get()
xml=
etree.HTML(html1)
pingjie_list=xml.xpath(
‘‘‘//ul[@class=‘lists
‘]/li[position()>1]‘‘‘)
# 将 pingjie_list拼接在xpath前,少写xpath语句
index_content_list=
[]
for pj
in pingjie_list:
index_infor=
{}
# #判空炒作,如果为空则显示none if len(index_infor[
‘title‘]) >
0 else None
index_infor[‘title‘]=pj.xpath(
‘‘‘./div/div[@class=‘item_list
‘]/div[1]/text()‘‘‘)
index_infor[‘title‘]=index_infor[
‘title‘][
0]
if len(index_infor[
‘title‘]) >
0 else None
index_infor[‘detail_url‘] = pj.xpath(
‘‘‘./a/@href‘‘‘)[
0]
index_infor[‘index_detail‘]=pj.xpath(
‘‘‘./div/div[2]/div[2]/text()‘‘‘)
index_infor[‘index_detail‘]=index_infor[
‘index_detail‘][
0]
if len(index_infor[
‘index_detail‘])>
0 else None
index_infor[‘total_price‘]=pj.xpath(
‘‘‘./div/div[2]/div[position()>2]/span[1]/em/text()‘‘‘)
index_infor[‘total_price‘]= index_infor[
‘total_price‘][
0]
if len( index_infor[
‘total_price‘])>
0 else None
index_infor[‘average_price‘]=pj.xpath(
‘‘‘./div/div[@class=‘item_list
‘]/div[3]/span[2]/text()‘‘‘)
index_infor[‘average_price‘]=index_infor[
‘average_price‘][
0]
if len(index_infor[
‘average_price‘])>
0 else None
index_content_list.append(index_infor)
# 队列保存时不能在循环里 否之回保存很多个队列
# self.index_content_queue.put(index_content_list)
# 把content_list放进content_queue里面
self.index_content_queue.put(index_content_list)
# print(index_content_list)
# 每从队列中获取一个数,队列则减少一个数,所以此代码必须写
self.html_index_queue.task_done()
# 获取详情页内容
def get_detail(self):
pass
# 保存内容
def save_content(self):
while True:
index_conten_list=self.index_content_queue.
get()
for i
in index_conten_list:
# print(i[‘title‘])
if i[
‘title‘]==None or i[
‘total_price‘]==None or i[
‘average_price‘]==
None:
print(‘该数据为空,不进行保存‘)
else:
db[‘index_info‘].insert(i)
# db[‘detailDta‘].insert(detail_datas)
print(‘保存数据成功‘)
self.index_content_queue.task_done()
# 主线程:分配各种子线程去执行class里得每一个函数
# 使用队列的方式得设置多线程进行调用函数,才能让程序执行速度更快
def run(self):
# 设置线程列表
thread_list=
[]
# start_time=
time.time()
# 1.url_list
# threading.Thread不需要传参数,参数都是从队列里面取得
# for i
in range(
20):
t_index_u=threading.Thread(target=
self.get_index)
thread_list.append(t_index_u)
# 2.遍历,发送请求,获取响应
for i
in range(
20):
t_index_html=threading.Thread(target=
self.get_index_html)
thread_list.append(t_index_html)
# 3.提取数据
for i
in range(
2):
t_parse_index=threading.Thread(target=
self.parse_index)
thread_list.append(t_parse_index)
# 4.保存数据
t_save=threading.Thread(target=
self.save_content)
thread_list.append(t_save)
# 循环开启各子线程
for t
in thread_list:
# 表示主线程结束,子线程(设置为true无限循环)也跟着结束(用主线程控制子线程)
t.setDaemon(True)
# 启动线程
t.start()
for q
in [self.index_url_queue,self.html_index_queue,self.index_content_queue]:
# 让主线程等待阻塞,等待队列的任务完成(即队列为空时 )之后再进行主线程
q.join()
# end_time=
time.time()
# print(‘总耗时%.2f秒‘%(end_time-
start_time))
if __name__==
‘__main__‘:
sk =
time.clock()
func=
lianJia()
func.run()
ek =
time.clock()
print(‘程序总耗时:‘,ek-sk)
多线程爬取糗事百科:
# coding=utf-8
import requests
from lxml import etree
import threading
from queue import Queue
# https://docs.python.org/3/library/queue.html#module-queue
# 队列使用方法简介
# q.qsize() 返回队列的大小
# q.empty() 如果队列为空,返回True,反之False
# q.full() 如果队列满了,返回True,反之False
# q.full 与 maxsize 大小对应
# q.get([block[, timeout]]) 获取队列,timeout等待时间
# q.get_nowait() 相当q.get(False)
# q.put(item) 写入队列,timeout等待时间
# q.put_nowait(item) 相当q.put(item, False)
# q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
# q.join() 实际上意味着等到队列为空,再执行别的操作
class QiubaiSpdier:
def __init__(self):
self.url_temp = "https://www.qiushibaike.com/8hr/page/{}/"
self.headers = {"User-Agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36"}
self.url_queue = Queue()
self.html_queue = Queue()
self.content_queue = Queue()
def get_url_list(self):
# return [self.url_temp.format(i) for i in range(1,14)]
for i in range(1,14):
# 把13个索引页面的Url放进url_queue队列里
self.url_queue.put(self.url_temp.format(i))
def parse_url(self):
while True:
# get方法和task_done搭配使用
# 在put是队列+1,get和task_done一起使用时队列才会-1
url = self.url_queue.get()
print(url)
response = requests.get(url,headers=self.headers)
# 然后把索引页的响应页面放进html_queue队列里
self.html_queue.put(response.content.decode())
self.url_queue.task_done()
def get_content_list(self): #提取数据
while True:
# 先从索引页响应页面html_queue队列里面取出索引页面
html_str = self.html_queue.get()
html = etree.HTML(html_str)
div_list = html.xpath("//div[@id=‘content-left‘]/div") #分组
content_list = []
for div in div_list:
item= {}
item["content"] = div.xpath(".//div[@class=‘content‘]/span/text()")
item["content"] = [i.replace("\n","") for i in item["content"]]
item["author_gender"] = div.xpath(".//div[contains(@class,‘articleGender‘)]/@class")
item["author_gender"] = item["author_gender"][0].split(" ")[-1].replace("Icon","") if len(item["author_gender"])>0 else None
item["auhtor_age"] = div.xpath(".//div[contains(@class,‘articleGender‘)]/text()")
item["auhtor_age"] = item["auhtor_age"][0] if len(item["auhtor_age"])>0 else None
item["content_img"] = div.xpath(".//div[@class=‘thumb‘]/a/img/@src")
item["content_img"] = "https:"+item["content_img"][0] if len(item["content_img"])>0 else None
item["author_img"] = div.xpath(".//div[@class=‘author clearfix‘]//img/@src")
item["author_img"] = "https:"+item["author_img"][0] if len(item["author_img"])>0 else None
item["stats_vote"] = div.xpath(".//span[@class=‘stats-vote‘]/i/text()")
item["stats_vote"] = item["stats_vote"][0] if len(item["stats_vote"])>0 else None
content_list.append(item)
# 把content_list放进content_queue里面
self.content_queue.put(content_list)
self.html_queue.task_done()
def save_content_list(self): #保存
while True:
content_list = self.content_queue.get()
for i in content_list:
print(i)
pass
self.content_queue.task_done()
def run(self): #实现主要逻辑
thread_list = []
#1.url_list
# threading.Thread不需要传参数,参数都是从队列里面取得
t_url = threading.Thread(target=self.get_url_list)
thread_list.append(t_url)
#2.遍历,发送请求,获取响应
for i in range(20): # 添加20个线程
t_parse = threading.Thread(target=self.parse_url)
thread_list.append(t_parse)
#3.提取数据
for i in range(2): # 添加2个线程
t_html = threading.Thread(target=self.get_content_list)
thread_list.append(t_html)
#4.保存
t_save = threading.Thread(target=self.save_content_list)
thread_list.append(t_save)
for t in thread_list:
t.setDaemon(True) #把子线程设置为守护线程,该线程不重要,主线程结束,子线程结束(子线程是while true不会自己结束)
t.start()
for q in [self.url_queue,self.html_queue,self.content_queue]:
q.join() #让主线程等待阻塞,等待队列的任务完成(即队列为空时 )之后再进行主线程
print("主线程结束")
if __name__ == ‘__main__‘:
qiubai = QiubaiSpdier()
qiubai.run()
# 所没有tast_done方法,程序最终会卡着不动,无法终止
# 线程的设计注意:耗时的操作要分配一些线程
实现多线程爬取数据并保存到mongodb
标签:mac os 数据库 结束 item timeout 速度 发送 requests www