3 min read

Running Playwright Scrapers at Scale for AI Data Collection


Why Playwright for AI Data Collection

AI applications need data, and much of the most valuable data lives on the web. Traditional HTTP-based scrapers fail on modern websites that rely heavily on JavaScript rendering, dynamic content loading, and anti-bot measures. Playwright solves these problems by running a real browser, which means you see exactly what a human user would see.

I use Playwright extensively to collect data for AI enrichment, content analysis, and competitive intelligence. Here is how I run it reliably at scale.

Basic Playwright Setup for Python

from playwright.async_api import async_playwright

async def scrape_page(url: str) -> dict:
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                       "AppleWebKit/537.36 (KHTML, like Gecko) "
                       "Chrome/120.0.0.0 Safari/537.36"
        )
        page = await context.new_page()
        
        await page.goto(url, wait_until="networkidle")
        
        title = await page.title()
        content = await page.inner_text("body")
        
        await browser.close()
        
        return {"url": url, "title": title, "content": content}

Scaling to Hundreds of Pages

Scraping one page at a time is fine for testing but far too slow for production data collection. I use a concurrent scraping pattern with controlled parallelism:

import asyncio
from playwright.async_api import async_playwright

class ScalableScraper:
    def __init__(self, max_concurrent: int = 5):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results = []
        self.errors = []
    
    async def scrape_url(self, browser, url: str) -> dict:
        async with self.semaphore:
            context = await browser.new_context()
            page = await context.new_page()
            
            try:
                await page.goto(url, timeout=30000, wait_until="domcontentloaded")
                await page.wait_for_timeout(2000)  # Let dynamic content load
                
                data = {
                    "url": url,
                    "title": await page.title(),
                    "content": await page.inner_text("main") 
                               if await page.query_selector("main")
                               else await page.inner_text("body"),
                    "status": "success"
                }
                return data
            except Exception as e:
                return {"url": url, "status": "error", "error": str(e)}
            finally:
                await context.close()
    
    async def scrape_batch(self, urls: list[str]) -> list[dict]:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            
            tasks = [self.scrape_url(browser, url) for url in urls]
            results = await asyncio.gather(*tasks)
            
            await browser.close()
            return results

Reliability Patterns

Web scraping at scale is inherently flaky. Pages time out, layouts change, and anti-bot systems block you. Here are the reliability patterns I use:

Retry with Backoff

async def scrape_with_retry(browser, url: str, max_retries: int = 3) -> dict:
    for attempt in range(max_retries):
        try:
            result = await scrape_url(browser, url)
            if result["status"] == "success":
                return result
        except Exception:
            pass
        
        if attempt < max_retries - 1:
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
    
    return {"url": url, "status": "failed", "attempts": max_retries}

Request Throttling

Being a good citizen matters for both ethics and reliability. I add delays between requests to the same domain:

  • Minimum 2 seconds between requests to the same domain
  • Randomized delays to avoid predictable patterns
  • Respect robots.txt directives
  • Stop immediately if the site returns 429 (Too Many Requests)
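The first two points can be sketched as a small per-domain throttle. This is a minimal illustration, not code from my production pipeline; the class name `DomainThrottle` and its parameters are my own:

```python
import asyncio
import random
import time
from urllib.parse import urlparse

class DomainThrottle:
    """Enforce a minimum, jittered delay between requests to the same domain."""

    def __init__(self, min_delay: float = 2.0, jitter: float = 1.0):
        self.min_delay = min_delay
        self.jitter = jitter
        self._next_slot: dict[str, float] = {}  # domain -> earliest allowed request time
        self._lock = asyncio.Lock()

    async def wait(self, url: str) -> None:
        domain = urlparse(url).netloc
        async with self._lock:
            now = time.monotonic()
            # Take the next free slot for this domain, then push it forward
            slot = max(self._next_slot.get(domain, now), now)
            delay = self.min_delay + random.uniform(0, self.jitter)
            self._next_slot[domain] = slot + delay
        if slot > now:
            await asyncio.sleep(slot - now)
```

Calling `await throttle.wait(url)` at the top of each scrape task spaces out same-domain requests while leaving different domains free to run concurrently.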

Session Management

For sites that require login or maintain state, I persist browser sessions:

async def create_authenticated_context(browser, credentials: dict):
    context = await browser.new_context()
    page = await context.new_page()
    
    # Perform login
    await page.goto(credentials["login_url"])
    await page.fill("input[name='email']", credentials["email"])
    await page.fill("input[name='password']", credentials["password"])
    # Playwright's Python API exposes navigation waits as a context manager
    async with page.expect_navigation():
        await page.click("button[type='submit']")
    
    # Save session state for reuse
    storage = await context.storage_state()
    return context, storage
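The saved storage state is what makes this worthwhile: persist it to disk and later runs can skip the login flow entirely by passing it to `browser.new_context(storage_state=...)`. A minimal sketch (the helper names and `session.json` path are my own):

```python
import json

def save_storage_state(storage: dict, path: str) -> None:
    """Persist cookies and localStorage captured via context.storage_state()."""
    with open(path, "w") as f:
        json.dump(storage, f)

def load_storage_state(path: str) -> dict:
    """Load a previously saved session for reuse."""
    with open(path) as f:
        return json.load(f)

# On a later run, skip the login flow entirely (sketch):
# context = await browser.new_context(storage_state=load_storage_state("session.json"))
```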

Data Extraction Patterns

Raw page content is rarely what you want. I use targeted extraction to pull specific data elements:

async def extract_structured_data(page) -> dict:
    return {
        "title": await page.text_content("h1"),
        "author": await safe_text(page, ".author-name"),
        "date": await safe_text(page, "time[datetime]"),
        "body": await safe_text(page, "article"),
        "tags": await page.eval_on_selector_all(
            ".tag", "els => els.map(e => e.textContent.trim())"
        ),
        "links": await page.eval_on_selector_all(
            "article a[href]", "els => els.map(e => e.href)"
        )
    }

async def safe_text(page, selector: str) -> str:
    element = await page.query_selector(selector)
    if element:
        return (await element.text_content()).strip()
    return ""

Resource Management on a VPS

Playwright launches real browser instances, which consume significant memory. On my VPS, I manage this carefully:

  • Limit concurrent browser contexts to 5 per scraping job
  • Close contexts immediately after scraping each page
  • Use a single browser instance shared across all concurrent contexts
  • Monitor memory usage and pause if it exceeds 80% of available RAM
  • Run scraping jobs during off-peak hours when other services are less active
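The memory threshold in the list above reduces to a simple gate. A sketch of the decision logic, with the function names my own; in practice the inputs would come from `MemAvailable`/`MemTotal` in `/proc/meminfo` or from `psutil.virtual_memory()`:

```python
def memory_usage_pct(available_kb: int, total_kb: int) -> float:
    """Percentage of RAM currently in use, from /proc/meminfo-style values."""
    return 100.0 * (1 - available_kb / total_kb)

def should_pause(used_pct: float, threshold: float = 80.0) -> bool:
    """Pause scheduling new scrape tasks once memory use crosses the threshold."""
    return used_pct >= threshold
```

Checking this before launching each new browser context lets a job back off gracefully instead of getting killed by the OOM killer mid-run.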

Storing Scraped Data for AI Processing

Scraped data flows directly into my Supabase database where it becomes available for AI processing:

from datetime import datetime, timezone

async def store_scraped_data(data: dict):
    # Generate embedding for semantic search
    embedding = get_embedding(data["content"][:8000])

    await supabase.table("scraped_pages").insert({
        "url": data["url"],
        "title": data["title"],
        "content": data["content"],
        "embedding": embedding,
        "scraped_at": datetime.now(timezone.utc).isoformat()
    }).execute()

Web scraping is the unglamorous foundation of many AI applications. The quality of your data collection directly determines the quality of your AI output. Invest in reliable, respectful scraping infrastructure.

Legal and Ethical Considerations

Always check terms of service, respect robots.txt, and consider the impact of your scraping on the target site. Collect only the data you need, store it securely, and delete it when it is no longer required. Responsible scraping ensures you can continue collecting data long-term without legal issues.
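The robots.txt check can be automated with the standard library. A sketch using `urllib.robotparser`, assuming you have already fetched the site's robots.txt text (the function name `allowed_by_robots` is my own):

```python
from urllib.robotparser import RobotFileParser

def allowed_by_robots(robots_txt: str, url: str, user_agent: str = "*") -> bool:
    """Check a URL against already-fetched robots.txt rules."""
    rp = RobotFileParser()
    rp.parse(robots_txt.splitlines())
    return rp.can_fetch(user_agent, url)
```

Running every candidate URL through a check like this before queueing it keeps the disallowed paths out of your crawl entirely.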