使用调度程序进行高级多 URL 抓取

注意:Crawl4AI 支持高级调度程序,用于并行或限速爬取,提供动态速率限制和内存使用情况检查。内置arun_many()函数使用这些调度程序来有效地处理并发。

1. 简介

抓取大量 URL 时:

  • 基本:使用arun()循环(简单但效率较低)
  • 更好:使用arun_many(),通过适当的并发控制有效地处理多个 URL
  • 最佳:根据您的特定需求定制调度程序行为(内存管理、速率限制等)

为什么是调度员?

  • 自适应:基于内存的调度程序可以根据系统资源暂停或减慢速度
  • 速率限制:内置速率限制,针对 429/503 响应采用指数退避
  • 实时监控:正在进行的任务、内存使用情况和性能的实时仪表板
  • 灵活性:在内存自适应或基于信号量的并发之间进行选择

2.核心组件

2.1 速率限制器

class RateLimiter:
    def __init__(
        # Random delay range between requests
        base_delay: Tuple[float, float] = (1.0, 3.0),  

        # Maximum backoff delay
        max_delay: float = 60.0,                        

        # Retries before giving up
        max_retries: int = 3,                          

        # Status codes triggering backoff
        rate_limit_codes: List[int] = [429, 503]        
    )

这是对 RateLimiter 的修改和简化的解释,重点关注构造函数参数并遵守您的 markdown 风格和 mkDocs 指南。

RateLimiter 构造函数参数

RateLimiter 是一个实用程序,用于管理请求速度,以避免服务器过载或因速率限制而导致阻塞。它内部运行以延迟请求并处理重试,但可以使用其构造函数参数进行配置。

参数RateLimiter构造函数:

1.base_delay (Tuple[float, float] , 默认:(1.0, 3.0) ) 对同一域的连续请求之间的随机延迟范围(以秒为单位)。

  • 随机延迟选择在base_delay[0]base_delay[1]对于每个请求。
  • 这可以防止以可预测的频率发送请求,从而降低触发速率限制的机会。

例如:如果base_delay = (2.0, 5.0),延迟可以随机选择为2.3s4.1s , ETC。


2.max_delay (float , 默认:60.0 ) 发生限速错误时允许的最大延迟。

  • 当服务器返回速率限制响应(例如 429 或 503)时,延迟会随着抖动而呈指数增加。
  • max_delay确保延迟不会变得过高,并将其限制在这个值上。

例如:对于max_delay = 30.0,即使退避计算表明延迟45s,上限为30s


3.max_retries (int , 默认:3 ) 发生速率限制错误时请求的最大重试次数。

  • 遇到速率限制响应后,RateLimiter重试该请求最多次数。
  • 如果所有重试都失败,则请求将被标记为失败,并且该过程将继续。

例如:如果max_retries = 3,系统会重试失败的请求三次,然后放弃。


4.rate_limit_codes (List[int] , 默认:[429, 503] ) 触发速率限制逻辑的 HTTP 状态代码列表。

  • 这些状态代码表明服务器不堪重负或正在主动限制请求。
  • 您可以自定义此列表以根据特定的服务器行为包含其他代码。

例如:如果rate_limit_codes = [429, 503, 504],爬虫程序就会因为这三个错误代码而退出。


如何使用RateLimiter

以下是初始化和使用的示例RateLimiter在你的项目中:

from crawl4ai import RateLimiter

# Create a RateLimiter with custom settings
rate_limiter = RateLimiter(
    base_delay=(2.0, 4.0),  # Random delay between 2-4 seconds
    max_delay=30.0,         # Cap delay at 30 seconds
    max_retries=5,          # Retry up to 5 times on rate-limiting errors
    rate_limit_codes=[429, 503]  # Handle these HTTP status codes
)

# RateLimiter will handle delays and retries internally
# No additional setup is required for its operation

RateLimiter与调度程序无缝集成MemoryAdaptiveDispatcherSemaphoreDispatcher确保请求无需用户干预即可正确处理。其内部机制可管理延迟和重试,避免服务器过载,同时最大限度地提高效率。

2.2 爬虫监控

CrawlerMonitor 提供对爬取操作的实时可见性:

from crawl4ai import CrawlerMonitor, DisplayMode
monitor = CrawlerMonitor(
    # Maximum rows in live display
    max_visible_rows=15,          

    # DETAILED or AGGREGATED view
    display_mode=DisplayMode.DETAILED  
)

显示模式:

  1. 详细信息:显示单个任务状态、内存使用情况和时间
  2. 汇总:显示汇总统计数据和总体进度

3. 可用的调度程序

3.1 MemoryAdaptiveDispatcher(默认)

根据系统内存使用情况自动管理并发:

from crawl4ai.async_dispatcher import MemoryAdaptiveDispatcher

dispatcher = MemoryAdaptiveDispatcher(
    memory_threshold_percent=90.0,  # Pause if memory exceeds this
    check_interval=1.0,             # How often to check memory
    max_session_permit=10,          # Maximum concurrent tasks
    rate_limiter=RateLimiter(       # Optional rate limiting
        base_delay=(1.0, 2.0),
        max_delay=30.0,
        max_retries=2
    ),
    monitor=CrawlerMonitor(         # Optional monitoring
        max_visible_rows=15,
        display_mode=DisplayMode.DETAILED
    )
)

构造函数参数:

1.memory_threshold_percent (float , 默认:90.0 ) 指定内存使用率阈值(以百分比表示)。如果系统内存使用率超过此值,调度程序将暂停抓取,以防止系统过载。

2.check_interval (float , 默认:1.0 )调度程序检查系统内存使用情况的间隔(以秒为单位)。

3.max_session_permit (int , 默认:10 ) 允许的最大并发爬取任务数。这确保在保持并发性的同时遵守资源限制。

4.memory_wait_timeout (float , 默认:600.0 ) 可选超时(以秒为单位)。如果内存使用量超过memory_threshold_percent超过这个时间,MemoryError被提出。

5.rate_limiter (RateLimiter , 默认:None ) 可选的速率限制逻辑,用于避免服务器端阻塞(例如,处理 429 或 503 错误)。详情请参阅 RateLimiter。

6.monitor (CrawlerMonitor , 默认:None ) 可选监控,用于实时任务跟踪和性能洞察。详情请参阅 CrawlerMonitor。


3.2 信号量调度器

提供具有固定限制的简单并发控制:

from crawl4ai.async_dispatcher import SemaphoreDispatcher

dispatcher = SemaphoreDispatcher(
    max_session_permit=20,         # Maximum concurrent tasks
    rate_limiter=RateLimiter(      # Optional rate limiting
        base_delay=(0.5, 1.0),
        max_delay=10.0
    ),
    monitor=CrawlerMonitor(        # Optional monitoring
        max_visible_rows=15,
        display_mode=DisplayMode.DETAILED
    )
)

构造函数参数:

1.max_session_permit (int , 默认:20 ) 允许的最大并发爬取任务数,与信号量槽无关。

2.rate_limiter (RateLimiter , 默认:None ) 可选的速率限制逻辑,用于避免服务器过载。详情请参阅 RateLimiter。

3.monitor (CrawlerMonitor , 默认:None ) 可选监控,用于跟踪任务进度和资源使用情况。详情请参阅 CrawlerMonitor。


4. 使用示例

4.1 批处理(默认)

async def crawl_batch():
    browser_config = BrowserConfig(headless=True, verbose=False)
    run_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        stream=False  # Default: get all results at once
    )

    dispatcher = MemoryAdaptiveDispatcher(
        memory_threshold_percent=70.0,
        check_interval=1.0,
        max_session_permit=10,
        monitor=CrawlerMonitor(
            display_mode=DisplayMode.DETAILED
        )
    )

    async with AsyncWebCrawler(config=browser_config) as crawler:
        # Get all results at once
        results = await crawler.arun_many(
            urls=urls,
            config=run_config,
            dispatcher=dispatcher
        )

        # Process all results after completion
        for result in results:
            if result.success:
                await process_result(result)
            else:
                print(f"Failed to crawl {result.url}: {result.error_message}")

审核: - 目的:执行批量抓取,抓取完成后,将所有 URL 一起处理。 - 调度程序:用途MemoryAdaptiveDispatcher管理并发性和系统内存。 - 流:已禁用(stream=False ),这样所有结果都会被一次性收集起来以进行后期处理。 - 最佳用例:当您需要在抓取过程中批量分析结果而不是单独分析结果时。


4.2 流模式

async def crawl_streaming():
    browser_config = BrowserConfig(headless=True, verbose=False)
    run_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        stream=True  # Enable streaming mode
    )

    dispatcher = MemoryAdaptiveDispatcher(
        memory_threshold_percent=70.0,
        check_interval=1.0,
        max_session_permit=10,
        monitor=CrawlerMonitor(
            display_mode=DisplayMode.DETAILED
        )
    )

    async with AsyncWebCrawler(config=browser_config) as crawler:
        # Process results as they become available
        async for result in await crawler.arun_many(
            urls=urls,
            config=run_config,
            dispatcher=dispatcher
        ):
            if result.success:
                # Process each result immediately
                await process_result(result)
            else:
                print(f"Failed to crawl {result.url}: {result.error_message}")

审核: - 目的:一旦结果可用,即可通过流式传输进行处理。 - 调度程序:用途MemoryAdaptiveDispatcher用于并发和内存管理。 - 流:已启用(stream=True ),允许在抓取过程中进行实时处理。 - 最佳用例:当您需要立即对结果采取行动时,例如实时分析或渐进式数据存储。


4.3 基于信号量的爬取

async def crawl_with_semaphore(urls):
    browser_config = BrowserConfig(headless=True, verbose=False)
    run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)

    dispatcher = SemaphoreDispatcher(
        semaphore_count=5,
        rate_limiter=RateLimiter(
            base_delay=(0.5, 1.0),
            max_delay=10.0
        ),
        monitor=CrawlerMonitor(
            max_visible_rows=15,
            display_mode=DisplayMode.DETAILED
        )
    )

    async with AsyncWebCrawler(config=browser_config) as crawler:
        results = await crawler.arun_many(
            urls, 
            config=run_config,
            dispatcher=dispatcher
        )
        return results

回顾:-目的:用途SemaphoreDispatcher使用固定数量的插槽来限制并发。 - 调度程序:配置信号量来控制并行抓取任务。 - 速率限制器:防止服务器因节奏请求而超负荷。 - 最佳用例:当您想要精确控制并发请求的数量,而不受系统内存的影响时。


4.4 Robots.txt 注意事项

import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, CacheMode

async def main():
    urls = [
        "https://example1.com",
        "https://example2.com",
        "https://example3.com"
    ]

    config = CrawlerRunConfig(
        cache_mode=CacheMode.ENABLED,
        check_robots_txt=True,  # Will respect robots.txt for each URL
        semaphore_count=3      # Max concurrent requests
    )

    async with AsyncWebCrawler() as crawler:
        async for result in crawler.arun_many(urls, config=config):
            if result.success:
                print(f"Successfully crawled {result.url}")
            elif result.status_code == 403 and "robots.txt" in result.error_message:
                print(f"Skipped {result.url} - blocked by robots.txt")
            else:
                print(f"Failed to crawl {result.url}: {result.error_message}")

if __name__ == "__main__":
    asyncio.run(main())

审查:-目的:确保遵守robots.txt道德和合法的网络爬虫规则。 - 配置:设置check_robots_txt=True验证每个 URLrobots.txt在抓取之前。 - 调度程序:处理具有并发限制的请求(semaphore_count=3 ). - 最佳用例:抓取严格执行 robots.txt 政策的网站或进行负责任的抓取实践时。


5. 调度结果

每个抓取结果都包含调度信息:

@dataclass
class DispatchResult:
    task_id: str
    memory_usage: float
    peak_memory: float
    start_time: datetime
    end_time: datetime
    error_message: str = ""

通过访问result.dispatch_result

for result in results:
    if result.success:
        dr = result.dispatch_result
        print(f"URL: {result.url}")
        print(f"Memory: {dr.memory_usage:.1f}MB")
        print(f"Duration: {dr.end_time - dr.start_time}")

6.总结

1. 两种调度程序类型:

  • MemoryAdaptiveDispatcher(默认):基于内存的动态并发
  • SemaphoreDispatcher:修复并发限制

2.可选组件:

  • RateLimiter:智能请求调节和退避
  • CrawlerMonitor:实时进度可视化

3.主要优点:

  • 自动内存管理
  • 内置速率限制
  • 实时进度监控
  • 灵活的并发控制

选择最适合您需求的调度程序:

  • MemoryAdaptiveDispatcher:适用于大型爬网或资源有限
  • SemaphoreDispatcher:适用于简单、固定并发场景

> Feedback