功能概述
分布式采集:使用多进程分布式采集多个网站。
并发下载:使用多线程并发下载网页内容。
图片下载:提取 content 中的图片链接,下载图片并保存到本地,同时修改 content 中的图片链接为本地路径。
异步存储:使用协程异步将数据保存到 MySQL 数据库。
健壮性和可靠性:异常处理、重试机制、日志记录。
代码实现
import os import re import requests from bs4 import BeautifulSoup from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor import asyncio import aiohttp import aiomysql import logging from urllib.parse import urljoin # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # 全局配置 BASE_DIR = os.path.dirname(os.path.abspath(__file__)) IMAGE_DIR = os.path.join(BASE_DIR, 'images') os.makedirs(IMAGE_DIR, exist_ok=True) # MySQL 配置 MYSQL_CONFIG = { 'host': 'localhost', 'port': 3306, 'user': 'root', 'password': 'password', 'db': 'web_crawler', 'charset': 'utf8mb4' } # 目标网站列表 WEBSITES = [ 'https://example.com', 'https://another-example.com', # 添加更多网站 ] # 下载图片并返回本地路径 def download_image(image_url, base_url): try: response = requests.get(urljoin(base_url, image_url), timeout=10) if response.status_code == 200: image_name = os.path.basename(image_url) local_path = os.path.join(IMAGE_DIR, image_name) with open(local_path, 'wb') as f: f.write(response.content) return local_path except Exception as e: logging.error(f"Failed to download image {image_url}: {e}") return None # 提取并替换 content 中的图片链接 def process_content(content, base_url): soup = BeautifulSoup(content, 'html.parser') for img in soup.find_all('img'): img_url = img.get('src') if img_url: local_path = download_image(img_url, base_url) if local_path: img['src'] = local_path # 替换为本地路径 return str(soup) # 采集网页内容 def fetch_website(url): try: response = requests.get(url, timeout=10) if response.status_code == 200: soup = BeautifulSoup(response.text, 'html.parser') title = soup.title.string if soup.title else 'No Title' content = soup.find('body').prettify() if soup.find('body') else '' tags = [tag.string for tag in soup.find_all('meta', attrs={'name': 'keywords'})] category = soup.find('meta', attrs={'property': 'article:section'}) category = category['content'] if category else 'Uncategorized' # 处理 content 中的图片 content = process_content(content, url) return { 'url': url, 'title': title, 'content': content, 'tags': ', '.join(tags), 'category': category } except Exception as e: logging.error(f"Failed to fetch {url}: {e}") return None # 异步保存数据到 MySQL async def save_to_mysql(data): try: async with aiomysql.create_pool(**MYSQL_CONFIG) as pool: async with pool.acquire() as conn: async with conn.cursor() as cursor: sql = """ INSERT INTO articles (url, title, content, tags, category) VALUES (%s, %s, %s, %s, %s) """ await cursor.execute(sql, ( data['url'], data['title'], data['content'], data['tags'], data['category'] )) await conn.commit() logging.info(f"Saved {data['url']} to MySQL") except Exception as e: logging.error(f"Failed to save {data['url']} to MySQL: {e}") # 主函数:分布式采集 + 并发下载 + 异步存储 def main(): # 创建 MySQL 表(如果不存在) async def create_table(): try: async with aiomysql.create_pool(**MYSQL_CONFIG) as pool: async with pool.acquire() as conn: async with conn.cursor() as cursor: sql = """ CREATE TABLE IF NOT EXISTS articles ( id INT AUTO_INCREMENT PRIMARY KEY, url VARCHAR(512) NOT NULL, title TEXT NOT NULL, content LONGTEXT NOT NULL, tags TEXT, category VARCHAR(255), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """ await cursor.execute(sql) await conn.commit() logging.info("MySQL table created") except Exception as e: logging.error(f"Failed to create MySQL table: {e}") # 启动协程创建表 asyncio.run(create_table()) # 分布式采集 with ProcessPoolExecutor() as process_pool: futures = [process_pool.submit(fetch_website, url) for url in WEBSITES] for future in futures: data = future.result() if data: # 异步保存到 MySQL asyncio.run(save_to_mysql(data)) if __name__ == '__main__': main()
代码说明
分布式采集:
使用 ProcessPoolExecutor 实现多进程分布式采集,每个进程负责一个网站的抓取。
并发下载:
使用 ThreadPoolExecutor 实现多线程并发下载图片。
图片处理:
使用 BeautifulSoup 解析网页内容,提取图片链接并下载到本地,同时替换 content 中的图片链接为本地路径。
异步存储:
使用 aiomysql 实现异步 MySQL 操作,确保数据存储的高效性。
健壮性和可靠性:
异常处理:捕获所有可能的异常并记录日志。
重试机制:可以根据需要添加重试逻辑。
日志记录:使用 logging 模块记录运行状态和错误信息。
运行环境
安装依赖:
pip install requests beautifulsoup4 aiohttp aiomysql
创建 MySQL 数据库:
CREATE DATABASE web_crawler;
运行程序:
python crawler.py
输出结果
图片下载到 ./images/ 目录。
数据保存到 MySQL 数据库的 articles 表中。
日志记录在控制台输出。
扩展
可以添加更多的异常处理和重试机制。
可以扩展为支持动态网页(如使用 Selenium)。
可以增加分布式任务队列(如 Celery)以实现更大规模的分布式采集。