# 进程池爬取并存入mongodb
# 时间:2021-07-01 10:21:17
# 帮助过:4人阅读
import json
import pymongo
import pandas as pd
import requests
from lxml import etree
import time
from multiprocessing import Pool
# MongoDB connection settings.
# connect=False postpones connecting until the first operation, so each worker
# process forked by multiprocessing.Pool opens its own connection instead of
# inheriting one that the parent started at import time.
client = pymongo.MongoClient('localhost', connect=False)
db = client['lagou']

# Job title to search for.
POSITION_NAME = '数据挖掘'
# Total number of result pages to crawl.
PAGE_SUM = 200
# Number of positions requested per page.
PAGE_SIZE = 15
# Name used to index `db` when storing the crawled positions.
DATA_NAME = "DataMiningPosition"

# Search endpoint template; the city parameter is the URL-encoded string "全国".
base_url = (
    'https://m.lagou.com/search.json?city=%E5%85%A8%E5%9B%BD&positionName={positionName}'
    '&pageNo={pageNo}&pageSize={pageSize}'
)
def page_index(pageno, retry_delay=30, max_retries=None, timeout=10):
    """Fetch one page of search results and return the decoded JSON payload.

    The request is repeated until the decoded payload has a truthy
    ``"content"`` entry. Network errors and replies that are not valid JSON
    are treated like an empty payload and retried as well.

    Args:
        pageno: Page number substituted into the ``pageNo`` query parameter.
        retry_delay: Seconds to sleep between attempts.
        max_retries: Maximum number of retries after the first attempt;
            ``None`` (the default) retries indefinitely.
        timeout: Per-request timeout in seconds passed to ``requests.get``.

    Returns:
        The decoded JSON dict whose ``"content"`` entry is truthy.

    Raises:
        RuntimeError: If ``max_retries`` is set and exhausted.
    """
    headers = {
        "Accept": "application/json",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "zh-CN,zh;q=0.9",
        # No Cookie header on purpose: the original author found that the
        # endpoint returned data without one.
        "Host": "m.lagou.com",
        "Proxy-Connection": "keep-alive",
        "Referer": "http://m.lagou.com/search.html",
        "X-Requested-With": "XMLHttpRequest",
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/45.0.2454.85 Safari/537.36 115Browser/6.0.3',
    }
    url = base_url.format(positionName=POSITION_NAME, pageNo=pageno, pageSize=PAGE_SIZE)

    attempt = 0
    # A loop instead of self-recursion: a long run of empty replies can no
    # longer grow the call stack.
    while True:
        try:
            response = requests.get(url, headers=headers, timeout=timeout)
            content = json.loads(response.text)
        except (requests.RequestException, ValueError) as exc:
            # ValueError covers json.JSONDecodeError (non-JSON reply).
            print(exc)
        else:
            print(content)
            if isinstance(content, dict) and content.get("content"):
                return content

        if max_retries is not None and attempt >= max_retries:
            raise RuntimeError(
                "no usable content for page %r after %d attempt(s)" % (pageno, attempt + 1)
            )
        attempt += 1
        time.sleep(retry_delay)
def parse_page_index(content):
    """Yield one flat dict per position found in a search-result payload.

    Args:
        content: Decoded JSON payload; the positions are read from
            ``content['content']['data']['page']['result']``.

    Yields:
        A dict with the keys ``positionId``, ``positionName``, ``city``,
        ``createTime``, ``salary``, ``companyId`` and ``companyFullName``.
        A key missing from the source item maps to ``None``.

    Every entry of the result list is yielded, however many there are, so the
    function does not depend on a fixed page size. A payload that lacks the
    expected nesting yields nothing.
    """
    fields = (
        'positionId',
        'positionName',
        'city',
        'createTime',
        'salary',
        'companyId',
        'companyFullName',
    )
    try:
        results = content['content']['data']['page']['result']
    except (KeyError, TypeError, IndexError) as e:
        print('可能没有那么多字段', e)
        return

    for item in results or ():
        yield {field: item.get(field) for field in fields}
def save_to_mongo(data):
    """Upsert one position document, keyed by its ``positionId``.

    Args:
        data: Dict describing a position; must contain ``'positionId'``.

    A document with the same ``positionId`` is updated in place, otherwise a
    new one is inserted. The outcome is reported on stdout; database errors
    are reported there as well instead of being raised.
    """
    position_id = data['positionId']
    try:
        # update_one(..., upsert=True) replaces the legacy Collection.update
        # call, which no longer exists in PyMongo 4.
        db[DATA_NAME].update_one(
            {'positionId': position_id},
            {'$set': data},
            upsert=True,
        )
    except pymongo.errors.PyMongoError as exc:
        print('Saved to Mongo Failed', position_id, exc)
    else:
        print('Saved to Mongo', position_id)
def parse_detail(url, max_retries=3, retry_delay=5, timeout=10):
    """Fetch a position detail page and extract work experience and description.

    Args:
        url: Detail page URL, e.g. ``http://m.lagou.com/jobs/4593934.html``.
        max_retries: How many times to re-fetch the page when the work-year
            element is missing from the response.
        retry_delay: Seconds to sleep before each re-fetch.
        timeout: Per-request timeout in seconds passed to ``requests.get``.

    Returns:
        A ``(workyear, positiondesc)`` tuple. ``workyear`` is the first text
        node matching the work-year XPath, or ``None`` when it could not be
        obtained. ``positiondesc`` is the list of text nodes matching the
        description XPath (possibly empty). A tuple is returned on every
        path, including request failures and non-200 replies, so callers can
        always unpack the result.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.140 Safari/537.36",
        # Header names and values are written without the stray spaces the
        # original contained ("Accept - Encoding", "eep - alive", ...).
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Cache-Control": "max-age=0",
        "Connection": "keep-alive",
    }

    positiondesc = []
    for attempt in range(max_retries + 1):
        try:
            response = requests.get(url, headers=headers, timeout=timeout)
            if response.status_code != 200:
                return None, []
            print("请求成功")
            text = response.content.decode()
            html = etree.HTML(text)
        except (requests.RequestException, UnicodeDecodeError, etree.LxmlError, ValueError) as e:
            print(e)
            return None, []

        if html is not None:
            workyear = html.xpath('//span[@class="item workyear"]/span/text()')
            positiondesc = html.xpath('//div[@class="positiondesc"]//p/text()')
            if workyear:
                return workyear[0], positiondesc

        # The work-year element is missing (presumably a throttled or
        # placeholder page - TODO confirm); wait and fetch the page again.
        if attempt < max_retries:
            time.sleep(retry_delay)

    return None, positiondesc
# Store the crawled data in MongoDB (sequential version).
def to_mongo(page_sum):
    """Crawl pages ``0 .. page_sum - 1`` one after another and save each position.

    Args:
        page_sum: Number of result pages to fetch.
    """
    # The original author notes that Lagou shows at most 334 pages.
    # NOTE(review): page numbers start at 0 here; confirm the API is not 1-based.
    for page_no in range(page_sum):
        payload = page_index(page_no)
        for position in parse_page_index(payload):
            print(position)
            save_to_mongo(position)
# Worker used with the process pool: stores one page of results in MongoDB.
def to_mongo_pool(page):
    """Fetch a single result page and save every position it contains.

    Args:
        page: Page number passed on to ``page_index``.
    """
    # The original author notes that Lagou shows at most 334 pages.
    for position in parse_page_index(page_index(page)):
        print(position)
        save_to_mongo(position)
# Flatten the crawled positions into rows suitable for a DataFrame.
def parse_items(page_sum):
    """Yield one row per position, enriched with data from its detail page.

    Args:
        page_sum: Number of result pages to fetch (pages ``0 .. page_sum - 1``).

    Yields:
        A list ``[positionId, positionName, city, createTime, salary,
        companyId, companyFullName, workyear, positiondesc]``.
    """
    for page in range(page_sum):
        content = page_index(page)
        for item in parse_page_index(content):
            position_id = item["positionId"]
            detail_url = "http://m.lagou.com/jobs/{}.html".format(position_id)
            detail = parse_detail(detail_url)
            # Guard the unpacking: a failed detail request may produce no
            # result at all, which would otherwise raise TypeError here and
            # abort the whole crawl.
            if detail is None:
                workyear, positiondesc = None, []
            else:
                workyear, positiondesc = detail
            print(position_id, positiondesc)
            yield [
                item["positionId"],
                item["positionName"],
                item["city"],
                item["createTime"],
                item["salary"],
                item["companyId"],
                item["companyFullName"],
                workyear,
                positiondesc,
            ]
# Save the crawled data as a CSV file.
def to_csv(page_sum):
    """Crawl ``page_sum`` pages and write all rows to ``python_positon.csv``.

    Args:
        page_sum: Number of result pages handed to ``parse_items``.
    """
    column_names = [
        "positionId",
        "positionName",
        "city",
        "createTime",
        "salary",
        "companyId",
        "companyFullName",
        "workyear",
        "positiondesc",
    ]
    # Drain the generator completely before building the frame.
    rows = list(parse_items(page_sum))
    frame = pd.DataFrame(rows, columns=column_names)
    frame.to_csv("python_positon.csv")
if __name__ == '__main__':
    # Alternative entry points left by the author: to_csv / to_mongo(200).
    # Saving to MongoDB is the recommended route.
    began_at = time.time()

    # Pool() without arguments starts one worker per CPU reported by the OS;
    # pass an explicit number to use a different worker count.
    workers = Pool()
    # map() takes the function object itself (no call parentheses) and an
    # iterable; each element is passed to the function as its single argument.
    workers.map(to_mongo_pool, range(PAGE_SUM))
    # close() only stops the pool from accepting new tasks.
    workers.close()
    # join() blocks the main process until every worker has exited.
    workers.join()

    print("总耗费时间%.2f秒" % (time.time() - began_at))
# 进程池爬取并存入mongodb
# 标签:1.2 默认 data rop get esc row error arch