相信很多业余学习的小伙伴们开始接触python爬虫技术时,都被它深深的吸引了,上手简单,见效快,一个text=urllib.request.urlopen(‘xxxx’).read().decode(‘utf-8)就可以抓到网页的代码内容,通过正则提取自己想要的部分并保存到自己想要的地方去,成就感油然而生,心中萌生出各种伟大的项目。。。。。。
2017年11月23日正式开始自学python,买到了一本《python基础教程》第二版开始了python之旅,12月10日买了本《精通Python网络爬虫》和《精通Scrapy网络爬虫》两本书,这三本书基础教程只看了基础语法,数据类型,定义函数和类,异常处理,精通python网络爬虫只看了多线程实例,精通Scrapy一页都没有翻开,通过自己的学习感悟,给比我还新的新人一点点自学建议,对于零基础的,建议买本基础的工具书把基础语法,数据类型,定义函数学懂再去上手
此博客是本人人生中第一篇博客,比较激动,废话较多,上正题,自学的小伙伴们如果有兴趣可以加我qq互相督促学习进步哈。
正题:python3.6 协程 异步爬虫实现 及 爬取资源到数据库
1,为提高爬取的效率,我们会用到多线程,多进程,协程。关于多进程和多线程网上的概念及用法太多太多,不多啰嗦。关于协程如何实现,首先要用到asyncio和aiohttp模块,asyncio模块用于定义协程函数,启动协程。
定义协程函数用 async def xxx 和 await ,启动协程用loop=asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) (tasks是一个asyncio.ensure_future(协程函数(参数))的列表,相当于多任务,异步执行tasks里的所有任务)或者loop=asyncio.get_event_loop() future=asyncio.ensure_future(协程函数(参数)) loop.run_until_complete(future) 相当于一个任务,调用一次协程函数
2,爬虫时间阻塞主要在对服务器发送请求和接收请求,对抓取的函数我们用异步,那么支持异步的抓取的模块,用aiohttp。至于用法,照猫画虎就好,举个栗子:(这个例子是 根据一个关键词为入口在wap.sogou.com上抓取 搜索相关 的关键词并存入数据库 根据搜索相关的关键词再去搜索得到搜索相关关键词,无限循环)
async def run(url,urlq,kwdq2): headers={'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Encoding':'gb2312,utf-8', 'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:57.0) Gecko/20100101 Firefox/57.0', 'Accept-Language':'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2', 'Connection':'Keep-alive' } #headers信息,一般包含User-Agent就可以 try: async with aiohttp.ClientSession() as session: aiohttp.Timeout(1) #设置超时时间 async with session.get(url,headers=headers) as response: #相当于response=urllib.request.urlopen(url) res=await response.text() #相当于res=response.read() 示例种await response.text(encoding='utf-8)
#相当于res.decode(‘utf-8’)
async关键字和await关键字的用法 ,await并不是随便在哪都可以await,async我的理解是告诉程序这条是异步的,我搞得不是特别清楚,记住怎么使用就好,研究明白了我会更新的:)
3,协程的启动:分三步:
1>建立协程任务列表并向列表里放入任务(tasks=[] tasks.append(asyncio.ensure_future(run(参数)) run为定义的协程函数)
理解为tasks里放的是需要异步执行的异步函数,每个异步函数都有自己的具体的数据参数(相当于我已经把具体的url传给你了,只是你还没启动运行,在等候),我觉得很多人会卡在这里,网上的很多例子都是tasks里装的url很简单,而实际中很难先预设好tasks里的任务,所以说tasks里的任务是动态的,不规律的,我提供个思路是,在工作量很大的情况下,将协程函数的参数放到队列或者列表里(队列用put 和get 存取数据,列表用append 和pop存取数据)
2>loop=asyncio.get_event_loop()
理解为建立一个loop,这个loop要工作,等待执行
3>loop.run_until_complete(asyncio.wait(tasks))
这是在告诉程序异步操作的是tasks里的任务 直到处理完所有
while True: start=time.time() tasks=[] count=0 while count<=200: if not urlq.empty(): tempurl=urlq.get() tasks.append(asyncio.ensure_future(run(tempurl,urlq,kwdq2))) #步骤1,把任务存入tasks列表(200个或者队列空) else: #time.sleep(60) break count+=1 #print('------Thread (%s) is working now......------' % threading.currentThread()) #print('url队列剩余{}'.format(len(urlq.queue))) #print('****'*20) loop = asyncio.get_event_loop() #步骤2 loop.run_until_complete(asyncio.wait(tasks)) #步骤3 #print('完成{}个Tasks的时间:{}'.format(count,time.time()-start))
完整代码:
import asyncio #import threading import aiohttp import pymysql import re import time import queue import urllib from urllib import request def is_in_queue(item,q): #判断item是否在队列里 if item in q.queue: return True else: return False async def run(url,urlq,kwdq2): headers={'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Encoding':'gb2312,utf-8', 'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:57.0) Gecko/20100101 Firefox/57.0', 'Accept-Language':'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2', 'Connection':'Keep-alive' } try: async with aiohttp.ClientSession() as session: aiohttp.Timeout(1) async with session.get(url,headers=headers) as response: res=await response.text() keywords=re.findall(r'<a href="\?.*?htprequery=.*?">(.*?)</a>',res,re.S) for keyword in keywords: if not is_in_queue(keyword,kwdq2): url='https://wap.sogou.com/web/searchList.jsp?keyword=%s' % urllib.request.quote(keyword) urlq.put(url) kwdq2.put(keyword) conn=pymysql.connect(host='localhost',user='root',passwd='root',db='test',port=3306,charset='utf8',cursorclass=pymysql.cursors.DictCursor) cur=conn.cursor() sql=sql="INSERT INTO keywords(keyword) VALUES ('%s')" try: cur.execute(sql % keyword) conn.commit() print('%s 存入数据库' % keyword) except: print('%s存入数据库出错************' % keyword) pass except: pass urlq=queue.LifoQueue() #Lifo 和 Fifo的队列都可以,看需求 #kwdq1=queue.Queue() kwdq2=queue.Queue() s='游戏' url='https://wap.sogou.com/web/searchList.jsp?keyword=%s' % urllib.request.quote(s) #入口关键词的url地址 urlq.put(url) while True: start=time.time() tasks=[] count=0 while count<=200: if not urlq.empty(): tempurl=urlq.get() tasks.append(asyncio.ensure_future(run(tempurl,urlq,kwdq2))) else: #time.sleep(60) break count+=1 print('------Thread (%s) is working now......------' % threading.currentThread()) print('url队列剩余{}'.format(len(urlq.queue))) print('****'*20) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) print('完成{}个Tasks的时间:{}'.format(count,time.time()-start))
PS:要运行的话 数据库那边的代码要删掉或者改为自己的数据库,关于数据库的操作网上也很多,建议就到百度上面找找.
因为我用的Lifo队列,主要看速度,关键词爬的相关度查好远。。。 201个请求加存入数据库用了76秒。。。在晚上的时候网速好了可以达到15秒左右请求200个链接。
希望这个例子能对初学者有所帮助,共同进步
对以上代码做了点点改良,在存入数据库操作上面也用了异步操作,网速好的时候100个url请求加存储关键词的时间可达到2秒内,每分钟大概可存数据
import asyncio import aiohttp import pymysql import re import time import random import urllib from urllib import request #import uvloop #asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) def is_in_queue(item,q): if item in q: return True else: return False async def run(url,urlq,kwdq2): headers = { 'User-Agent': random.choice(USER_AGENTS), 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.5', 'Connection': 'keep-alive', 'Accept-Encoding': 'gzip, deflate', } try: async with aiohttp.ClientSession() as session: with aiohttp.Timeout(3): async with session.get(url,headers=headers) as response: res=await response.text(encoding='utf-8') keywords=re.findall(r'<a href="\?keyword=.*?htprequery=.*?">(.*?)</a>',res,re.S) for keyword in keywords: if not is_in_queue(keyword,kwdq2): url='https://wap.sogou.com/web/searchList.jsp?keyword=%s' % urllib.request.quote(keyword) urlq.append(url) kwdq2.append(keyword) try: sql="INSERT INTO keywords(keyword) VALUES ('%s')" cur.execute(sql % keyword) await conn.commit() #print('%s 存入数据库' % keyword) except: #print('%s存入数据库出错************' % keyword) pass except: print('网页打开失败') pass try: conn=pymysql.connect(host='localhost',user='root',passwd='root',db='mysql',port=3306,charset='utf8',cursorclass=pymysql.cursors.DictCursor) cur=conn.cursor() except: print('数据库打开失败') while True: s='电影' url='http://wap.sogou.com/web/searchList.jsp?keyword="%s"&pg=webSearchList' % urllib.request.quote(s) urlq=[] kwdq2=[] urlq.append(url) while True: time_=lambda :time.time() start=time_() tasks=[] count=0 while count<100: if not urlq==[]: tempurl=urlq.pop(random.choice(range(0,len(urlq)))) #random.choice(urlq.queue[0] tasks.append(asyncio.ensure_future(run(tempurl,urlq,kwdq2))) if len(urlq)>10000: for i in range(5000): urlq.pop(random.choice(range(len(urlq)))) if len(kwdq2)>20000: for i in range(10000): kwdq2.pop(random.choice(range(len(kwdq2)))) count+=1 print('url队列剩余{}'.format(len(urlq))) print('****'*20) loop = asyncio.get_event_loop() try: loop.run_until_complete(asyncio.wait(tasks)) except: print('over') break print('完成{}个Tasks的时间:{}秒'.format(count,time_()-start)) print('sleeping..........') time.sleep(120)
这个版本把url地址的存储从queue变成了list,做了个简单测试,循环十万个存储和读取 list的速度要快于queue,改进前的版本劣势在于判断keyword是否重复的queue无限增大,效率很低,下面的版本将url存储的列表最大上限设为了5000,用来判断重复关键词的kedq2列表上限设为了10000,url的读取采用的random choice,保证了广度和随机性
原文链接:https://blog.csdn.net/Aries8842/article/details/78952532