python分布式爬虫实例

功能概述
分布式采集:使用多进程分布式采集多个网站。

并发下载:使用多线程并发下载网页内容。

图片下载:提取 content 中的图片链接,下载图片并保存到本地,同时修改 content 中的图片链接为本地路径。

异步存储:使用协程异步将数据保存到 MySQL 数据库。

健壮性和可靠性:异常处理、重试机制、日志记录。

代码实现

import os
import re
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import asyncio
import aiohttp
import aiomysql
import logging
from urllib.parse import urljoin

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 全局配置
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
IMAGE_DIR = os.path.join(BASE_DIR, 'images')
os.makedirs(IMAGE_DIR, exist_ok=True)

# MySQL 配置
MYSQL_CONFIG = {
    'host': 'localhost',
    'port': 3306,
    'user': 'root',
    'password': 'password',
    'db': 'web_crawler',
    'charset': 'utf8mb4'
}

# 目标网站列表
WEBSITES = [
    'https://example.com',
    'https://another-example.com',
    # 添加更多网站
]

# 下载图片并返回本地路径
def download_image(image_url, base_url):
    try:
        response = requests.get(urljoin(base_url, image_url), timeout=10)
        if response.status_code == 200:
            image_name = os.path.basename(image_url)
            local_path = os.path.join(IMAGE_DIR, image_name)
            with open(local_path, 'wb') as f:
                f.write(response.content)
            return local_path
    except Exception as e:
        logging.error(f"Failed to download image {image_url}: {e}")
    return None

# 提取并替换 content 中的图片链接
def process_content(content, base_url):
    soup = BeautifulSoup(content, 'html.parser')
    for img in soup.find_all('img'):
        img_url = img.get('src')
        if img_url:
            local_path = download_image(img_url, base_url)
            if local_path:
                img['src'] = local_path  # 替换为本地路径
    return str(soup)

# 采集网页内容
def fetch_website(url):
    try:
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')
            title = soup.title.string if soup.title else 'No Title'
            content = soup.find('body').prettify() if soup.find('body') else ''
            tags = [tag.string for tag in soup.find_all('meta', attrs={'name': 'keywords'})]
            category = soup.find('meta', attrs={'property': 'article:section'})
            category = category['content'] if category else 'Uncategorized'

            # 处理 content 中的图片
            content = process_content(content, url)

            return {
                'url': url,
                'title': title,
                'content': content,
                'tags': ', '.join(tags),
                'category': category
            }
    except Exception as e:
        logging.error(f"Failed to fetch {url}: {e}")
    return None

# 异步保存数据到 MySQL
async def save_to_mysql(data):
    try:
        async with aiomysql.create_pool(**MYSQL_CONFIG) as pool:
            async with pool.acquire() as conn:
                async with conn.cursor() as cursor:
                    sql = """
                    INSERT INTO articles (url, title, content, tags, category)
                    VALUES (%s, %s, %s, %s, %s)
                    """
                    await cursor.execute(sql, (
                        data['url'],
                        data['title'],
                        data['content'],
                        data['tags'],
                        data['category']
                    ))
                    await conn.commit()
                    logging.info(f"Saved {data['url']} to MySQL")
    except Exception as e:
        logging.error(f"Failed to save {data['url']} to MySQL: {e}")

# 主函数:分布式采集 + 并发下载 + 异步存储
def main():
    # 创建 MySQL 表(如果不存在)
    async def create_table():
        try:
            async with aiomysql.create_pool(**MYSQL_CONFIG) as pool:
                async with pool.acquire() as conn:
                    async with conn.cursor() as cursor:
                        sql = """
                        CREATE TABLE IF NOT EXISTS articles (
                            id INT AUTO_INCREMENT PRIMARY KEY,
                            url VARCHAR(512) NOT NULL,
                            title TEXT NOT NULL,
                            content LONGTEXT NOT NULL,
                            tags TEXT,
                            category VARCHAR(255),
                            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                        )
                        """
                        await cursor.execute(sql)
                        await conn.commit()
                        logging.info("MySQL table created")
        except Exception as e:
            logging.error(f"Failed to create MySQL table: {e}")

    # 启动协程创建表
    asyncio.run(create_table())

    # 分布式采集
    with ProcessPoolExecutor() as process_pool:
        futures = [process_pool.submit(fetch_website, url) for url in WEBSITES]
        for future in futures:
            data = future.result()
            if data:
                # 异步保存到 MySQL
                asyncio.run(save_to_mysql(data))

if __name__ == '__main__':
    main()

代码说明

分布式采集:

使用 ProcessPoolExecutor 实现多进程分布式采集,每个进程负责一个网站的抓取。

并发下载:

使用 ThreadPoolExecutor 实现多线程并发下载图片。

图片处理:

使用 BeautifulSoup 解析网页内容,提取图片链接并下载到本地,同时替换 content 中的图片链接为本地路径。

异步存储:

使用 aiomysql 实现异步 MySQL 操作,确保数据存储的高效性。

健壮性和可靠性:

异常处理:捕获所有可能的异常并记录日志。

重试机制:可以根据需要添加重试逻辑。

日志记录:使用 logging 模块记录运行状态和错误信息。

运行环境

安装依赖:

pip install requests beautifulsoup4 aiohttp aiomysql

创建 MySQL 数据库:

CREATE DATABASE web_crawler;

运行程序:

python crawler.py

输出结果
图片下载到 ./images/ 目录。

数据保存到 MySQL 数据库的 articles 表中。

日志记录在控制台输出。

扩展

可以添加更多的异常处理和重试机制。

可以扩展为支持动态网页(如使用 Selenium)。

可以增加分布式任务队列(如 Celery)以实现更大规模的分布式采集。