A small coroutine-based crawler example; the overall idea is sound: a producer extracts links from each page into a waiting queue, and a consumer pops URLs, fetches the article pages, and writes their titles to MySQL.
import asyncio
import aiohttp
import aiomysql
from urllib.parse import urljoin
from pyquery import PyQuery

start_url = "http://www.jobbole.com/"
waiting_urls = []   # queue of URLs waiting to be crawled
seen_urls = set()   # set of URLs that have already been crawled
stopping = False
sem = asyncio.Semaphore(5)  # allow at most 5 requests in flight
headers = {
    'Connection': 'keep-alive',
    'Cache-Control': 'max-age=0',
    'Upgrade-Insecure-Requests': '1',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
    'Referer': 'https://www.baidu.com/s?wd=%E4%BC%AF%E4%B9%90%E5%9C%A8%E7%BA%BF&rsv_spt=1&rsv_iqid=0xa3362c9e0002f178&issp=1&f=8&rsv_bp=1&rsv_idx=2&ie=utf-8&rqlang=&tn=baiduhome_pg&ch=&rsv_enter=1&rsv_btype=i&rsv_dl=ib&inputT=2342',
    'Accept-Language': 'zh-CN,zh;q=0.9',
    'Cookie': 'security_session_verify=c429a1801b8fa00e02c38bcba2b77e50; Hm_lvt_42a9b1b1d382d464c04bb82b4916af4d=1589094864,1589103814,1589104191; Hm_lpvt_42a9b1b1d382d464c04bb82b4916af4d=1589104206',
    # The conditional headers copied from the browser are disabled: sending
    # If-None-Match / If-Modified-Since can make the server reply 304 Not Modified,
    # which fetch() below would treat as a failed request.
    # 'If-None-Match': '"8ea3-5a5378ea71554-gzip"',
    # 'If-Modified-Since': 'Sat, 09 May 2020 14:01:58 GMT',
}
async def fetch(url, session):
    """Fetch a page and return its HTML text, or None on failure."""
    async with sem:                 # cap the number of concurrent requests
        await asyncio.sleep(1)      # be polite: pause between requests
        try:
            async with session.get(url, headers=headers) as resp:
                if resp.status in [200, 201]:
                    data = await resp.text()
                    return data
        except Exception as e:
            print(e)
def extract_urls(html):
    """Producer: pull every <a href> out of a page and queue the unseen ones."""
    urls = []
    pq = PyQuery(html)
    for link in pq.items('a'):
        href = link.attr('href')
        if not href:
            continue
        url = urljoin(start_url, href)  # handles relative and absolute links
        if url not in seen_urls:
            urls.append(url)
            waiting_urls.append(url)
    return urls
async def init_url(url, session):
    html = await fetch(url, session)
    seen_urls.add(url)
    if html:
        extract_urls(html)
async def article_handler(url, session, pool):
    """Fetch an article page, queue its links, and store the title in MySQL."""
    html = await fetch(url, session)
    seen_urls.add(url)
    if not html:
        return
    extract_urls(html)
    pq = PyQuery(html)
    title = pq('title').text()
    print(f"title:{title}")
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            # Parameterized query avoids SQL injection and broken quoting in titles.
            await cur.execute('insert into article (title) values (%s)', (title,))
async def consumer(pool):
    """Consumer: pop queued URLs and schedule a handler for each unseen one."""
    async with aiohttp.ClientSession() as session:
        # stopping is declared above but never flipped in this example;
        # set it to True elsewhere to let the loop exit.
        while not stopping:
            if len(waiting_urls) == 0:
                await asyncio.sleep(0.5)
                continue
            url = waiting_urls.pop()
            # print(f'start get url:{url}')
            # if re.match('http://.*?jobbole.com/', url):
            if url not in seen_urls:
                asyncio.ensure_future(article_handler(url, session, pool))
            # else:
            #     if url not in seen_urls:
            #         asyncio.ensure_future(init_url(url, session))
async def main(loop):
    pool = await aiomysql.create_pool(host='192.168.0.101', port=3306,
                                      user='root', password='123456',
                                      db='aio_mysql_test', loop=loop,
                                      charset='utf8', autocommit=True)
    async with aiohttp.ClientSession() as session:
        html = await fetch(start_url, session)
        seen_urls.add(start_url)
        if html:
            extract_urls(html)
    asyncio.ensure_future(consumer(pool))
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(main(loop))
    loop.run_forever()
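The insert in article_handler assumes an article table already exists in the aio_mysql_test database. Here is a minimal setup sketch using aiomysql; the id column, column size, and utf8mb4 charset are my assumptions, since only the title column appears in the original code.

import asyncio
import aiomysql

async def create_table():
    # Sketch only: connection details mirror the pool settings used in main().
    conn = await aiomysql.connect(host='192.168.0.101', port=3306,
                                  user='root', password='123456',
                                  db='aio_mysql_test', autocommit=True)
    async with conn.cursor() as cur:
        # `title` is the only column the crawler actually writes;
        # the rest of the schema is assumed for illustration.
        await cur.execute(
            "CREATE TABLE IF NOT EXISTS article ("
            "  id INT AUTO_INCREMENT PRIMARY KEY,"
            "  title VARCHAR(255) NOT NULL"
            ") CHARACTER SET utf8mb4"
        )
    conn.close()

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(create_table())

Running this once before starting the crawler is enough; the crawler itself never creates the table.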