如何使用asyncio和aiohttp实现大批量pdf文件的爬取任务?并控制并发量为200

如何使用asyncio和aiohttp实现大批量pdf文件的爬取任务?并控制并发量为200

yuzhiblue

2025年1月20日 20:17

author

使用 asyncioaiohttp 实现大批量 PDF 文件的爬取任务,并控制并发量为 200,可以通过以下步骤实现:


1. 实现思路

  • 异步爬取:使用 aiohttp 发起异步 HTTP 请求,下载 PDF 文件。
  • 并发控制:使用 asyncio.Semaphore 控制并发量,确保同时运行的下载任务不超过 200 个。
  • 任务管理:将下载任务添加到任务列表中,并使用 asyncio.gather 等待所有任务完成。

2. 实现代码

以下是一个完整的示例代码:

import asyncio
import aiohttp
import os

# 设置并发量为 200
CONCURRENCY = 200
semaphore = asyncio.Semaphore(CONCURRENCY)

# 下载目录
DOWNLOAD_DIR = "pdf_downloads"
os.makedirs(DOWNLOAD_DIR, exist_ok=True)

async def download_pdf(session, url):
    async with semaphore:  # 控制并发量
        try:
            print(f"开始下载: {url}")
            async with session.get(url) as response:
                if response.status == 200:
                    # 提取文件名
                    file_name = os.path.basename(url)
                    file_path = os.path.join(DOWNLOAD_DIR, file_name)

                    # 保存文件
                    with open(file_path, "wb") as f:
                        while True:
                            chunk = await response.content.read(1024)  # 分块读取
                            if not chunk:
                                break
                            f.write(chunk)
                    print(f"下载完成: {url} -> {file_path}")
                else:
                    print(f"下载失败: {url} (状态码: {response.status})")
        except Exception as e:
            print(f"下载出错: {url} ({str(e)})")

async def main(urls):
    # 创建 aiohttp 会话
    async with aiohttp.ClientSession() as session:
        # 创建下载任务
        tasks = [download_pdf(session, url) for url in urls]
        # 等待所有任务完成
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    # 示例 PDF 文件 URL 列表
    urls = [
        f"https://example.com/pdf/file{i}.pdf" for i in range(1000)
    ]

    # 运行主函数
    asyncio.run(main(urls))

3. 代码说明

(1)并发控制

  • semaphore = asyncio.Semaphore(CONCURRENCY):创建一个信号量,限制并发量为 200。
  • async with semaphore:在 download_pdf 函数中使用信号量,确保同时运行的下载任务不超过 200 个。

(2)异步下载

  • aiohttp.ClientSession:创建一个异步 HTTP 会话,用于发起请求。
  • response.content.read(1024):分块读取文件内容,避免一次性加载大文件到内存。

(3)文件保存

  • os.path.basename(url):从 URL 中提取文件名。
  • os.path.join(DOWNLOAD_DIR, file_name):将文件保存到指定目录。

(4)任务管理

  • tasks = [download_pdf(session, url) for url in urls]:创建下载任务列表。
  • await asyncio.gather(*tasks):并发运行所有任务,并等待它们完成。

4. 优化建议

(1)动态调整并发量

可以根据网络带宽和目标服务器的响应时间动态调整并发量。例如:

async def adjust_concurrency(new_concurrency):
    global semaphore
    while semaphore._value < new_concurrency:
        semaphore.release()  # 增加并发量
    while semaphore._value > new_concurrency:
        await semaphore.acquire()  # 减少并发量

(2)错误重试

为下载任务添加重试机制,避免因网络波动导致任务失败:

import random

async def download_pdf(session, url, retries=3):
    async with semaphore:
        for attempt in range(retries):
            try:
                print(f"开始下载: {url} (尝试 {attempt + 1}/{retries})")
                async with session.get(url) as response:
                    if response.status == 200:
                        file_name = os.path.basename(url)
                        file_path = os.path.join(DOWNLOAD_DIR, file_name)
                        with open(file_path, "wb") as f:
                            while True:
                                chunk = await response.content.read(1024)
                                if not chunk:
                                    break
                                f.write(chunk)
                        print(f"下载完成: {url} -> {file_path}")
                        return
                    else:
                        print(f"下载失败: {url} (状态码: {response.status})")
            except Exception as e:
                print(f"下载出错: {url} ({str(e)})")
            await asyncio.sleep(random.uniform(1, 3))  # 随机等待一段时间后重试
        print(f"下载失败: {url} (重试次数用尽)")

(3)日志记录

将下载日志保存到文件中,便于后续分析:

import logging

logging.basicConfig(
    filename="download.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

async def download_pdf(session, url):
    async with semaphore:
        try:
            logging.info(f"开始下载: {url}")
            async with session.get(url) as response:
                if response.status == 200:
                    file_name = os.path.basename(url)
                    file_path = os.path.join(DOWNLOAD_DIR, file_name)
                    with open(file_path, "wb") as f:
                        while True:
                            chunk = await response.content.read(1024)
                            if not chunk:
                                break
                            f.write(chunk)
                    logging.info(f"下载完成: {url} -> {file_path}")
                else:
                    logging.error(f"下载失败: {url} (状态码: {response.status})")
        except Exception as e:
            logging.error(f"下载出错: {url} ({str(e)})")

5. 总结

  • 使用 asyncioaiohttp 可以实现高效的异步爬取任务。
  • 通过 asyncio.Semaphore 控制并发量,确保同时运行的下载任务不超过 200 个。
  • 通过分块读取文件、动态调整并发量、错误重试和日志记录,可以进一步优化下载任务的性能和可靠性。

专业办理低费率POS机,使用稳定,不乱涨价,不乱扣费,微信联系salesleads

版权声明:本站文章大部分为原创文章,如需转载请提前联系站长获得授权;本站部分内容源自网络,本站承诺绝不用于商业用途,如有冒犯请联系站长删除,谢谢。站长微信:salesleads 本站公众号:企泰7TEC,敬请关注!本文链接:https://7tec.cn/detail/271

抖音快手直播伴侣定时下播助手,无需人工值守,直播利器!免费下载试用!

相关推荐