然后主节点和工作节点如何通信的呢?

然后主节点和工作节点如何通信的呢?

yuzhiblue

2025年1月20日 14:46

author

在分布式爬虫系统中,主节点和工作节点之间的通信通常通过消息队列分布式任务队列来实现。以下是常见的通信方式和实现方法:


1. 通信方式

  • 消息队列:主节点将任务(如URL)放入消息队列,工作节点从队列中获取任务并执行。
  • 分布式任务队列:使用任务队列框架(如Celery、RQ)来分发任务和接收结果。
  • 直接通信:主节点和工作节点通过HTTP或RPC(如gRPC)直接通信(较少使用,因为扩展性较差)。

2. 常用工具

  • 消息队列:RabbitMQ、Kafka、Redis。
  • 任务队列框架:Celery、RQ(Redis Queue)。
  • RPC框架:gRPC、ZeroMQ。

3. 基于消息队列的通信(以RabbitMQ为例)

  • 主节点:将任务(如URL)发布到消息队列。
  • 工作节点:从消息队列中消费任务,执行后返回结果。

主节点代码(发布任务)

import pika

# 连接到RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明队列
channel.queue_declare(queue='pdf_tasks')

# 发布任务(URL)
urls = [
    "http://example.com/page1",
    "http://example.com/page2",
    "http://example.com/page3"
]

for url in urls:
    channel.basic_publish(exchange='', routing_key='pdf_tasks', body=url)
    print(f" [x] Sent {url}")

# 关闭连接
connection.close()

工作节点代码(消费任务)

import pika
import requests
from bs4 import BeautifulSoup
import os

# 连接到RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明队列
channel.queue_declare(queue='pdf_tasks')

# 下载PDF文件的函数
def download_pdf(url):
    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')
    pdf_links = [link['href'] for link in soup.find_all('a', href=True) if link['href'].endswith('.pdf')]

    for pdf_link in pdf_links:
        pdf_url = pdf_link if pdf_link.startswith('http') else f"{url}/{pdf_link}"
        pdf_response = requests.get(pdf_url)
        pdf_filename = os.path.basename(pdf_url)
        with open(pdf_filename, 'wb') as f:
            f.write(pdf_response.content)
        print(f" [x] Downloaded {pdf_filename}")

# 回调函数,处理接收到的任务
def callback(ch, method, properties, body):
    url = body.decode('utf-8')
    print(f" [x] Received {url}")
    download_pdf(url)
    print(f" [x] Done {url}")

# 消费任务
channel.basic_consume(queue='pdf_tasks', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

4. 基于任务队列的通信(以Celery为例)

  • 主节点:将任务发送到任务队列。
  • 工作节点:从任务队列中获取任务并执行,结果可以返回给主节点或存储到数据库。

主节点代码(发送任务)

from celery import Celery

# 初始化Celery应用
app = Celery('pdf_crawler', broker='pyamqp://guest@localhost//')

# 定义任务
@app.task
def download_pdf(url):
    # 这里可以调用工作节点的逻辑
    return f"Processed {url}"

# 发送任务
if __name__ == "__main__":
    urls = [
        "http://example.com/page1",
        "http://example.com/page2",
        "http://example.com/page3"
    ]
    for url in urls:
        download_pdf.delay(url)
        print(f" [x] Sent {url}")

工作节点代码(执行任务)

from celery import Celery
import requests
from bs4 import BeautifulSoup
import os

# 初始化Celery应用
app = Celery('pdf_crawler', broker='pyamqp://guest@localhost//')

# 定义任务
@app.task
def download_pdf(url):
    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')
    pdf_links = [link['href'] for link in soup.find_all('a', href=True) if link['href'].endswith('.pdf')]

    for pdf_link in pdf_links:
        pdf_url = pdf_link if pdf_link.startswith('http') else f"{url}/{pdf_link}"
        pdf_response = requests.get(pdf_url)
        pdf_filename = os.path.basename(pdf_url)
        with open(pdf_filename, 'wb') as f:
            f.write(pdf_response.content)
        print(f" [x] Downloaded {pdf_filename}")
    return f"Processed {url}"

# 启动工作节点
if __name__ == "__main__":
    app.worker_main()

5. 通信流程总结

  1. 主节点
  2. 将任务(如URL)发送到消息队列或任务队列。
  3. 工作节点
  4. 从队列中获取任务。
  5. 执行任务(爬取网页、下载PDF)。
  6. 将结果返回给主节点或存储到数据库。
  7. 队列
  8. 作为中间件,解耦主节点和工作节点,确保任务可靠分发。

6. 选择通信方式的考虑

  • 消息队列:适合任务分发和异步处理,扩展性强。
  • 任务队列:适合需要任务管理和结果返回的场景。
  • 直接通信:适合小规模系统,但扩展性较差。

通过消息队列或任务队列,主节点和工作节点可以高效通信,实现分布式爬虫的任务分发和执行。

专业办理低费率POS机,使用稳定,不乱涨价,不乱扣费,微信联系salesleads

版权声明:本站文章大部分为原创文章,如需转载请提前联系站长获得授权;本站部分内容源自网络,本站承诺绝不用于商业用途,如有冒犯请联系站长删除,谢谢。站长微信:salesleads 本站公众号:企泰7TEC,敬请关注!本文链接:https://7tec.cn/detail/254

抖音快手直播伴侣定时下播助手,无需人工值守,直播利器!免费下载试用!

相关推荐