使用调度程序进行高级多 URL 抓取
注意:Crawl4AI 支持高级调度程序,用于并行或限速爬取,提供动态速率限制和内存使用情况检查。内置arun_many()
函数使用这些调度程序来有效地处理并发。
1. 简介
抓取大量 URL 时:
- 基本:使用
arun()
循环(简单但效率较低) - 更好:使用
arun_many()
,通过适当的并发控制有效地处理多个 URL - 最佳:根据您的特定需求定制调度程序行为(内存管理、速率限制等)
为什么是调度员?
- 自适应:基于内存的调度程序可以根据系统资源暂停或减慢速度
- 速率限制:内置速率限制,针对 429/503 响应采用指数退避
- 实时监控:正在进行的任务、内存使用情况和性能的实时仪表板
- 灵活性:在内存自适应或基于信号量的并发之间进行选择
2.核心组件
2.1 速率限制器
class RateLimiter:
def __init__(
# Random delay range between requests
base_delay: Tuple[float, float] = (1.0, 3.0),
# Maximum backoff delay
max_delay: float = 60.0,
# Retries before giving up
max_retries: int = 3,
# Status codes triggering backoff
rate_limit_codes: List[int] = [429, 503]
)
这是对 RateLimiter 的修改和简化的解释,重点关注构造函数参数并遵守您的 markdown 风格和 mkDocs 指南。
RateLimiter 构造函数参数
RateLimiter 是一个实用程序,用于管理请求速度,以避免服务器过载或因速率限制而导致阻塞。它内部运行以延迟请求并处理重试,但可以使用其构造函数参数进行配置。
参数RateLimiter
构造函数:
1.base_delay
(Tuple[float, float]
, 默认:(1.0, 3.0)
) 对同一域的连续请求之间的随机延迟范围(以秒为单位)。
- 随机延迟选择在
base_delay[0]
和base_delay[1]
对于每个请求。 - 这可以防止以可预测的频率发送请求,从而降低触发速率限制的机会。
例如:如果base_delay = (2.0, 5.0)
,延迟可以随机选择为2.3s
,4.1s
, ETC。
2.max_delay
(float
, 默认:60.0
) 发生限速错误时允许的最大延迟。
- 当服务器返回速率限制响应(例如 429 或 503)时,延迟会随着抖动而呈指数增加。
- 这
max_delay
确保延迟不会变得过高,并将其限制在这个值上。
例如:对于max_delay = 30.0
,即使退避计算表明延迟45s
,上限为30s
。
3.max_retries
(int
, 默认:3
) 发生速率限制错误时请求的最大重试次数。
- 遇到速率限制响应后,
RateLimiter
重试该请求最多次数。 - 如果所有重试都失败,则请求将被标记为失败,并且该过程将继续。
例如:如果max_retries = 3
,系统会重试失败的请求三次,然后放弃。
4.rate_limit_codes
(List[int]
, 默认:[429, 503]
) 触发速率限制逻辑的 HTTP 状态代码列表。
- 这些状态代码表明服务器不堪重负或正在主动限制请求。
- 您可以自定义此列表以根据特定的服务器行为包含其他代码。
例如:如果rate_limit_codes = [429, 503, 504]
,爬虫程序就会因为这三个错误代码而退出。
如何使用RateLimiter
:
以下是初始化和使用的示例RateLimiter
在你的项目中:
from crawl4ai import RateLimiter
# Create a RateLimiter with custom settings
rate_limiter = RateLimiter(
base_delay=(2.0, 4.0), # Random delay between 2-4 seconds
max_delay=30.0, # Cap delay at 30 seconds
max_retries=5, # Retry up to 5 times on rate-limiting errors
rate_limit_codes=[429, 503] # Handle these HTTP status codes
)
# RateLimiter will handle delays and retries internally
# No additional setup is required for its operation
这RateLimiter
与调度程序无缝集成MemoryAdaptiveDispatcher
和SemaphoreDispatcher
确保请求无需用户干预即可正确处理。其内部机制可管理延迟和重试,避免服务器过载,同时最大限度地提高效率。
2.2 爬虫监控
CrawlerMonitor 提供对爬取操作的实时可见性:
from crawl4ai import CrawlerMonitor, DisplayMode
monitor = CrawlerMonitor(
# Maximum rows in live display
max_visible_rows=15,
# DETAILED or AGGREGATED view
display_mode=DisplayMode.DETAILED
)
显示模式:
- 详细信息:显示单个任务状态、内存使用情况和时间
- 汇总:显示汇总统计数据和总体进度
3. 可用的调度程序
3.1 MemoryAdaptiveDispatcher(默认)
根据系统内存使用情况自动管理并发:
from crawl4ai.async_dispatcher import MemoryAdaptiveDispatcher
dispatcher = MemoryAdaptiveDispatcher(
memory_threshold_percent=90.0, # Pause if memory exceeds this
check_interval=1.0, # How often to check memory
max_session_permit=10, # Maximum concurrent tasks
rate_limiter=RateLimiter( # Optional rate limiting
base_delay=(1.0, 2.0),
max_delay=30.0,
max_retries=2
),
monitor=CrawlerMonitor( # Optional monitoring
max_visible_rows=15,
display_mode=DisplayMode.DETAILED
)
)
构造函数参数:
1.memory_threshold_percent
(float
, 默认:90.0
) 指定内存使用率阈值(以百分比表示)。如果系统内存使用率超过此值,调度程序将暂停抓取,以防止系统过载。
2.check_interval
(float
, 默认:1.0
)调度程序检查系统内存使用情况的间隔(以秒为单位)。
3.max_session_permit
(int
, 默认:10
) 允许的最大并发爬取任务数。这确保在保持并发性的同时遵守资源限制。
4.memory_wait_timeout
(float
, 默认:600.0
) 可选超时(以秒为单位)。如果内存使用量超过memory_threshold_percent
超过这个时间,MemoryError
被提出。
5.rate_limiter
(RateLimiter
, 默认:None
) 可选的速率限制逻辑,用于避免服务器端阻塞(例如,处理 429 或 503 错误)。详情请参阅 RateLimiter。
6.monitor
(CrawlerMonitor
, 默认:None
) 可选监控,用于实时任务跟踪和性能洞察。详情请参阅 CrawlerMonitor。
3.2 信号量调度器
提供具有固定限制的简单并发控制:
from crawl4ai.async_dispatcher import SemaphoreDispatcher
dispatcher = SemaphoreDispatcher(
max_session_permit=20, # Maximum concurrent tasks
rate_limiter=RateLimiter( # Optional rate limiting
base_delay=(0.5, 1.0),
max_delay=10.0
),
monitor=CrawlerMonitor( # Optional monitoring
max_visible_rows=15,
display_mode=DisplayMode.DETAILED
)
)
构造函数参数:
1.max_session_permit
(int
, 默认:20
) 允许的最大并发爬取任务数,与信号量槽无关。
2.rate_limiter
(RateLimiter
, 默认:None
) 可选的速率限制逻辑,用于避免服务器过载。详情请参阅 RateLimiter。
3.monitor
(CrawlerMonitor
, 默认:None
) 可选监控,用于跟踪任务进度和资源使用情况。详情请参阅 CrawlerMonitor。
4. 使用示例
4.1 批处理(默认)
async def crawl_batch():
browser_config = BrowserConfig(headless=True, verbose=False)
run_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
stream=False # Default: get all results at once
)
dispatcher = MemoryAdaptiveDispatcher(
memory_threshold_percent=70.0,
check_interval=1.0,
max_session_permit=10,
monitor=CrawlerMonitor(
display_mode=DisplayMode.DETAILED
)
)
async with AsyncWebCrawler(config=browser_config) as crawler:
# Get all results at once
results = await crawler.arun_many(
urls=urls,
config=run_config,
dispatcher=dispatcher
)
# Process all results after completion
for result in results:
if result.success:
await process_result(result)
else:
print(f"Failed to crawl {result.url}: {result.error_message}")
审核: - 目的:执行批量抓取,抓取完成后,将所有 URL 一起处理。 - 调度程序:用途MemoryAdaptiveDispatcher
管理并发性和系统内存。 - 流:已禁用(stream=False
),这样所有结果都会被一次性收集起来以进行后期处理。 - 最佳用例:当您需要在抓取过程中批量分析结果而不是单独分析结果时。
4.2 流模式
async def crawl_streaming():
browser_config = BrowserConfig(headless=True, verbose=False)
run_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
stream=True # Enable streaming mode
)
dispatcher = MemoryAdaptiveDispatcher(
memory_threshold_percent=70.0,
check_interval=1.0,
max_session_permit=10,
monitor=CrawlerMonitor(
display_mode=DisplayMode.DETAILED
)
)
async with AsyncWebCrawler(config=browser_config) as crawler:
# Process results as they become available
async for result in await crawler.arun_many(
urls=urls,
config=run_config,
dispatcher=dispatcher
):
if result.success:
# Process each result immediately
await process_result(result)
else:
print(f"Failed to crawl {result.url}: {result.error_message}")
审核: - 目的:一旦结果可用,即可通过流式传输进行处理。 - 调度程序:用途MemoryAdaptiveDispatcher
用于并发和内存管理。 - 流:已启用(stream=True
),允许在抓取过程中进行实时处理。 - 最佳用例:当您需要立即对结果采取行动时,例如实时分析或渐进式数据存储。
4.3 基于信号量的爬取
async def crawl_with_semaphore(urls):
browser_config = BrowserConfig(headless=True, verbose=False)
run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
dispatcher = SemaphoreDispatcher(
semaphore_count=5,
rate_limiter=RateLimiter(
base_delay=(0.5, 1.0),
max_delay=10.0
),
monitor=CrawlerMonitor(
max_visible_rows=15,
display_mode=DisplayMode.DETAILED
)
)
async with AsyncWebCrawler(config=browser_config) as crawler:
results = await crawler.arun_many(
urls,
config=run_config,
dispatcher=dispatcher
)
return results
回顾:-目的:用途SemaphoreDispatcher
使用固定数量的插槽来限制并发。 - 调度程序:配置信号量来控制并行抓取任务。 - 速率限制器:防止服务器因节奏请求而超负荷。 - 最佳用例:当您想要精确控制并发请求的数量,而不受系统内存的影响时。
4.4 Robots.txt 注意事项
import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, CacheMode
async def main():
urls = [
"https://example1.com",
"https://example2.com",
"https://example3.com"
]
config = CrawlerRunConfig(
cache_mode=CacheMode.ENABLED,
check_robots_txt=True, # Will respect robots.txt for each URL
semaphore_count=3 # Max concurrent requests
)
async with AsyncWebCrawler() as crawler:
async for result in crawler.arun_many(urls, config=config):
if result.success:
print(f"Successfully crawled {result.url}")
elif result.status_code == 403 and "robots.txt" in result.error_message:
print(f"Skipped {result.url} - blocked by robots.txt")
else:
print(f"Failed to crawl {result.url}: {result.error_message}")
if __name__ == "__main__":
asyncio.run(main())
审查:-目的:确保遵守robots.txt
道德和合法的网络爬虫规则。 - 配置:设置check_robots_txt=True
验证每个 URLrobots.txt
在抓取之前。 - 调度程序:处理具有并发限制的请求(semaphore_count=3
). - 最佳用例:抓取严格执行 robots.txt 政策的网站或进行负责任的抓取实践时。
5. 调度结果
每个抓取结果都包含调度信息:
@dataclass
class DispatchResult:
task_id: str
memory_usage: float
peak_memory: float
start_time: datetime
end_time: datetime
error_message: str = ""
通过访问result.dispatch_result
:
for result in results:
if result.success:
dr = result.dispatch_result
print(f"URL: {result.url}")
print(f"Memory: {dr.memory_usage:.1f}MB")
print(f"Duration: {dr.end_time - dr.start_time}")
6.总结
1. 两种调度程序类型:
- MemoryAdaptiveDispatcher(默认):基于内存的动态并发
- SemaphoreDispatcher:修复并发限制
2.可选组件:
- RateLimiter:智能请求调节和退避
- CrawlerMonitor:实时进度可视化
3.主要优点:
- 自动内存管理
- 内置速率限制
- 实时进度监控
- 灵活的并发控制
选择最适合您需求的调度程序:
- MemoryAdaptiveDispatcher:适用于大型爬网或资源有限
- SemaphoreDispatcher:适用于简单、固定并发场景