SimpleCrawl

How to Build a RAG Pipeline with Web Data (Complete Guide)

Learn how to build a production-ready RAG pipeline using web-scraped data. Covers extraction, chunking, embedding, and retrieval — with Python code examples and best practices.

SimpleCrawl Team · February 18, 2026 · 14 min read

What Is RAG and Why Does Web Data Matter?

Web scraping for RAG pipelines is quickly becoming the default approach for feeding large language models with fresh, domain-specific information. Retrieval-Augmented Generation (RAG) is a technique that enhances LLM outputs by grounding them in external data retrieved at query time — rather than relying solely on a model's training data.

Quick answer: A RAG pipeline retrieves relevant documents from a knowledge base and passes them as context to an LLM before it generates a response. Web scraping supplies the knowledge base with up-to-date content from the open web.

Here's why that matters: LLMs have knowledge cutoffs. Whatever model you use, its training data ends months or more before the moment you query it. If your users ask about something that happened last week — or need accurate pricing, documentation, or regulatory info — the model will hallucinate or refuse to answer.

RAG fixes this by giving the model access to real, current data. And the richest source of current data is the web.

The RAG Architecture at a Glance

A standard RAG pipeline has four stages:

  1. Ingestion — Collect documents from sources (web pages, PDFs, databases)
  2. Chunking — Split documents into semantically meaningful pieces
  3. Embedding — Convert chunks into vector representations
  4. Retrieval + Generation — Find relevant chunks at query time and pass them to the LLM

Web scraping sits at stage 1, but the quality of your scraping directly affects every downstream stage. Noisy HTML, broken layouts, or missing content at the extraction step will propagate through chunking, embedding, and ultimately the generated answers.
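Before building each stage for real, the whole flow can be sketched end to end with toy stand-ins: a canned ingest, a letter-frequency "embedding", and brute-force cosine retrieval. This is illustrative scaffolding only; none of these stubs resemble the production components covered in the rest of this guide.

```python
def ingest(urls: list[str]) -> list[str]:
    """Stage 1: collect documents (stubbed with canned markdown)."""
    return [f"# Doc for {url}\n\nSome content about {url}." for url in urls]

def chunk(doc: str, size: int = 100) -> list[str]:
    """Stage 2: naive fixed-size split."""
    return [doc[i:i + size] for i in range(0, len(doc), size)]

def embed(text: str) -> list[float]:
    """Stage 3: toy 'embedding' built from letter frequencies."""
    vec = [0.0] * 26
    for ch in text.lower():
        if ch.isascii() and ch.isalpha():
            vec[ord(ch) - ord("a")] += 1.0
    total = sum(vec) or 1.0
    return [v / total for v in vec]

def retrieve(query: str, index: list[tuple[str, list[float]]], k: int = 2) -> list[str]:
    """Stage 4 (retrieval half): brute-force cosine similarity."""
    def cosine(a: list[float], b: list[float]) -> float:
        dot = sum(x * y for x, y in zip(a, b))
        norm = (sum(x * x for x in a) ** 0.5) * (sum(y * y for y in b) ** 0.5)
        return dot / norm if norm else 0.0

    q = embed(query)
    ranked = sorted(index, key=lambda pair: cosine(q, pair[1]), reverse=True)
    return [text for text, _ in ranked[:k]]

# Wire the four stages together
docs = ingest(["https://example.com/a", "https://example.com/b"])
index = [(c, embed(c)) for doc in docs for c in chunk(doc)]
print(retrieve("content about example.com/a", index, k=1))
```

Swap each stub for the real implementation as you work through the steps below, and the shape of the pipeline stays the same.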

Step 1: Extracting Web Data for RAG

The first challenge is getting clean content from web pages. Raw HTML is full of navigation bars, footers, ads, cookie banners, and boilerplate that you don't want in your knowledge base.

Why Markdown Is the Ideal Format for RAG

You could try to parse HTML yourself, strip tags, and hope for the best. But the industry has converged on markdown as the ideal intermediate format for RAG ingestion:

  • Preserves structure — Headings, lists, tables, and code blocks survive conversion
  • Strips noise — Navigation, ads, and chrome are removed
  • Token-efficient — Markdown uses far fewer tokens than HTML for the same content
  • LLM-native — Models are trained extensively on markdown and understand its structure
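To make the token-efficiency point concrete, here is a rough comparison using character counts as a crude proxy (a real measurement would run both strings through a tokenizer such as tiktoken). Both snippets are invented for illustration:

```python
# Character counts as a crude proxy for token counts; markup, attributes,
# and boilerplate all cost tokens without adding meaning.
html = (
    '<nav class="site-nav"><ul><li><a href="/">Home</a></li>'
    '<li><a href="/docs">Docs</a></li></ul></nav>'
    '<div class="content"><h2 id="install">Install</h2>'
    '<ul><li><code>pip install requests</code></li></ul></div>'
    '<footer>(c) 2026 Example Inc. <a href="/privacy">Privacy</a></footer>'
)
markdown = "## Install\n\n- `pip install requests`\n"

savings = 1 - len(markdown) / len(html)
print(f"HTML: {len(html)} chars, markdown: {len(markdown)} chars, "
      f"{savings:.0%} smaller")
```

The ratio only gets more lopsided on real pages, where navigation and script tags dwarf the actual content.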

Extracting Pages with SimpleCrawl

The fastest path from URL to clean markdown is a single API call with SimpleCrawl:

import requests

response = requests.post(
    "https://api.simplecrawl.com/v1/scrape",
    headers={"Authorization": "Bearer sc_your_api_key"},
    json={
        "url": "https://docs.python.org/3/library/asyncio.html",
        "format": "markdown"
    }
)

data = response.json()
markdown_content = data["markdown"]
title = data["title"]

This returns clean markdown with headings, code blocks, and tables preserved — and all navigation, sidebars, and boilerplate stripped. No browser setup, no proxy configuration, no parsing code.

Scraping Multiple Pages for a Knowledge Base

A real RAG knowledge base typically contains hundreds or thousands of pages. Here's how to scrape an entire documentation site:

import requests
from concurrent.futures import ThreadPoolExecutor
from time import sleep

API_BASE = "https://api.simplecrawl.com/v1"
HEADERS = {"Authorization": "Bearer sc_your_api_key"}

def discover_urls(base_url: str) -> list[str]:
    """Use SimpleCrawl's crawl endpoint to discover all pages."""
    response = requests.post(
        f"{API_BASE}/crawl",
        headers=HEADERS,
        json={
            "url": base_url,
            "max_pages": 500,
            "format": "markdown"
        }
    )
    crawl = response.json()
    return [page["url"] for page in crawl["pages"]]

def scrape_page(url: str) -> dict:
    """Scrape a single page and return structured data."""
    response = requests.post(
        f"{API_BASE}/scrape",
        headers=HEADERS,
        json={"url": url, "format": "markdown"}
    )
    return response.json()

# Discover and scrape
urls = discover_urls("https://docs.example.com")
print(f"Found {len(urls)} pages to scrape")

with ThreadPoolExecutor(max_workers=5) as executor:
    documents = list(executor.map(scrape_page, urls))

print(f"Scraped {len(documents)} documents")

Handling JavaScript-Rendered Content

Many modern documentation sites and web applications render content with JavaScript. Traditional HTTP-based scrapers will get an empty shell. SimpleCrawl handles this automatically with headless browser rendering — you don't need to change anything in your API call. For more details, see our guide on scraping JavaScript-heavy websites.

Step 2: Cleaning and Preprocessing

Raw scraped data — even in markdown form — needs preprocessing before it's ready for chunking.

Remove Boilerplate Patterns

Even after markdown conversion, some pages may contain repeated elements:

import re

def clean_document(markdown: str) -> str:
    """Remove common boilerplate from scraped markdown."""
    # Remove navigation breadcrumbs
    markdown = re.sub(r'^.*?(?:Home|Docs)\s*[>/»].*$', '', markdown, flags=re.MULTILINE)

    # Remove "Was this page helpful?" sections
    markdown = re.sub(r'(?:Was this (?:page|article) helpful\?).*$', '', markdown, flags=re.DOTALL)

    # Remove consecutive blank lines
    markdown = re.sub(r'\n{3,}', '\n\n', markdown)

    return markdown.strip()

Enrich Metadata

Attach metadata to each document so you can filter during retrieval:

from datetime import datetime, timezone
from urllib.parse import urlparse

def enrich_document(doc: dict) -> dict:
    """Add useful metadata for downstream filtering."""
    parsed_url = urlparse(doc["url"])
    return {
        "content": clean_document(doc["markdown"]),
        "metadata": {
            "title": doc.get("title", ""),
            "url": doc["url"],
            "domain": parsed_url.netloc,
            "path": parsed_url.path,
            # datetime.utcnow() is deprecated; use an aware UTC timestamp
            "scraped_at": datetime.now(timezone.utc).isoformat(),
            "word_count": len(doc["markdown"].split()),
            "language": doc.get("metadata", {}).get("language", "en"),
        }
    }

enriched_docs = [enrich_document(doc) for doc in documents]

Step 3: Chunking Strategies for Web Content

Chunking is where many RAG pipelines go wrong. Too large and your chunks dilute relevance. Too small and you lose context. The strategy matters even more for web data because web pages have irregular structures.

Fixed-Size Chunking

The simplest approach is to split at a fixed size with overlap. Note that RecursiveCharacterTextSplitter measures characters by default, so chunk_size=1000 is roughly 250 tokens:

from langchain_text_splitters import RecursiveCharacterTextSplitter  # pip install langchain-text-splitters

splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n## ", "\n### ", "\n\n", "\n", " "]
)

chunks = []
for doc in enriched_docs:
    doc_chunks = splitter.split_text(doc["content"])
    for i, chunk_text in enumerate(doc_chunks):
        chunks.append({
            "text": chunk_text,
            "metadata": {
                **doc["metadata"],
                "chunk_index": i,
            }
        })

This works but ignores semantic boundaries. A chunk might start mid-paragraph and end mid-sentence.

Markdown-Aware Chunking

Since we're working with markdown, we can use the document structure to create semantically meaningful chunks:

def chunk_by_headers(markdown: str, max_chunk_size: int = 1500) -> list[dict]:
    """Split markdown into chunks based on heading hierarchy."""
    sections = []
    current_section = {"heading": "", "level": 0, "content": ""}

    for line in markdown.split("\n"):
        # Detect heading level
        if line.startswith("#"):
            # Save previous section if it has content
            if current_section["content"].strip():
                sections.append(current_section.copy())

            level = len(line) - len(line.lstrip("#"))
            current_section = {
                "heading": line.lstrip("# ").strip(),
                "level": level,
                "content": line + "\n"
            }
        else:
            current_section["content"] += line + "\n"

    # Don't forget the last section
    if current_section["content"].strip():
        sections.append(current_section)

    # Merge small sections; flush the buffer once it exceeds max_chunk_size
    chunks = []
    buffer = ""

    for section in sections:
        if len(buffer) + len(section["content"]) > max_chunk_size and buffer:
            chunks.append(buffer.strip())
            buffer = ""

        buffer += section["content"]

        if len(buffer) > max_chunk_size:
            chunks.append(buffer.strip())
            buffer = ""

    if buffer.strip():
        chunks.append(buffer.strip())

    return [{"text": chunk, "index": i} for i, chunk in enumerate(chunks)]

This approach keeps related content together. An H2 section about "Authentication" won't be split across two chunks with unrelated content.

Choosing Chunk Size

The right chunk size depends on your embedding model and retrieval setup:

| Embedding Model | Recommended Chunk Size | Max Tokens |
| --- | --- | --- |
| OpenAI text-embedding-3-small | 500-1000 tokens | 8,191 |
| OpenAI text-embedding-3-large | 500-1500 tokens | 8,191 |
| Cohere embed-v3 | 256-512 tokens | 512 |
| Voyage AI voyage-3 | 500-1000 tokens | 32,000 |

A good default is 800-1200 tokens per chunk with 200 tokens of overlap if using fixed-size splitting.
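Whatever splitter you use, it's worth sanity-checking the output against that target range. The helper below uses a rough heuristic (about 0.75 English words per token) rather than a real tokenizer, and both function names are illustrative:

```python
def approx_tokens(text: str) -> int:
    """Crude token estimate: English prose averages ~0.75 words per token."""
    return round(len(text.split()) / 0.75)

def flag_outliers(chunks: list[str], low: int = 800, high: int = 1200) -> list[int]:
    """Return indices of chunks outside the recommended token range."""
    return [i for i, c in enumerate(chunks)
            if not (low <= approx_tokens(c) <= high)]
```

Run it over a sample of your corpus; a long tail of tiny or oversized chunks usually points to a chunking bug rather than unusual source pages.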

Step 4: Generating Embeddings

With clean chunks ready, the next step is converting them to vector embeddings for similarity search.

Using OpenAI Embeddings

from openai import OpenAI

client = OpenAI()

def embed_chunks(chunks: list[dict], model: str = "text-embedding-3-small") -> list[dict]:
    """Generate embeddings for a list of chunks."""
    texts = [chunk["text"] for chunk in chunks]

    # Batch API calls (max 2048 texts per request)
    all_embeddings = []
    for i in range(0, len(texts), 2048):
        batch = texts[i:i + 2048]
        response = client.embeddings.create(input=batch, model=model)
        all_embeddings.extend([item.embedding for item in response.data])

    for chunk, embedding in zip(chunks, all_embeddings):
        chunk["embedding"] = embedding

    return chunks

embedded_chunks = embed_chunks(chunks)
print(f"Generated {len(embedded_chunks)} embeddings")

Cost Estimation

OpenAI's text-embedding-3-small costs $0.02 per million tokens. For a typical knowledge base:

  • 1,000 web pages × ~500 words each = 500,000 words ≈ 650,000 tokens
  • Embedding cost: ~$0.013
  • With chunking overhead: ~$0.02

Embedding web data is cheap. The scraping step is where most of the cost and complexity lies — which is why using an API like SimpleCrawl instead of maintaining your own scraping infrastructure makes financial sense.

Step 5: Storing Vectors in a Database

You need a vector database to store and query your embeddings efficiently.

Using Pinecone

from pinecone import Pinecone

pc = Pinecone(api_key="your-api-key")
index = pc.Index("web-knowledge-base")

def upsert_chunks(chunks: list[dict], batch_size: int = 100):
    """Store embedded chunks in Pinecone."""
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i + batch_size]
        vectors = [
            {
                "id": f"{chunk['metadata']['url']}#{chunk['metadata']['chunk_index']}",
                "values": chunk["embedding"],
                "metadata": {
                    "text": chunk["text"],
                    "title": chunk["metadata"]["title"],
                    "url": chunk["metadata"]["url"],
                    "domain": chunk["metadata"]["domain"],
                }
            }
            for chunk in batch
        ]
        index.upsert(vectors=vectors)

upsert_chunks(embedded_chunks)

Using PostgreSQL with pgvector

If you prefer self-hosted, pgvector is excellent:

import numpy as np
import psycopg2
from pgvector.psycopg2 import register_vector  # pip install pgvector

conn = psycopg2.connect("postgresql://user:pass@localhost/ragdb")
cur = conn.cursor()

# One-time setup: enable the extension, then register its psycopg2 adapter
# so numpy arrays can be passed directly as vector values
cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
conn.commit()
register_vector(conn)

# Create table (run once); vector(1536) matches text-embedding-3-small
cur.execute("""
    CREATE TABLE IF NOT EXISTS documents (
        id SERIAL PRIMARY KEY,
        url TEXT,
        title TEXT,
        chunk_text TEXT,
        chunk_index INTEGER,
        embedding vector(1536)
    );
""")

# Insert chunks
for chunk in embedded_chunks:
    cur.execute(
        "INSERT INTO documents (url, title, chunk_text, chunk_index, embedding) VALUES (%s, %s, %s, %s, %s)",
        (
            chunk["metadata"]["url"],
            chunk["metadata"]["title"],
            chunk["text"],
            chunk["metadata"]["chunk_index"],
            np.array(chunk["embedding"]),
        )
    )

conn.commit()

# Build the ANN index after loading: ivfflat picks its cluster centers
# from the rows that already exist, so indexing an empty table hurts recall
cur.execute("""
    CREATE INDEX IF NOT EXISTS documents_embedding_idx
        ON documents USING ivfflat (embedding vector_cosine_ops);
""")
conn.commit()

Step 6: Retrieval and Generation

Now for the payoff — querying the knowledge base and generating grounded answers.

Basic RAG Query

from openai import OpenAI

client = OpenAI()

def rag_query(question: str, top_k: int = 5) -> str:
    """Answer a question using the web knowledge base."""
    # 1. Embed the question
    q_embedding = client.embeddings.create(
        input=question,
        model="text-embedding-3-small"
    ).data[0].embedding

    # 2. Retrieve relevant chunks
    results = index.query(
        vector=q_embedding,
        top_k=top_k,
        include_metadata=True
    )

    # 3. Build context
    context_parts = []
    for match in results.matches:
        source = match.metadata
        context_parts.append(
            f"[Source: {source['title']} — {source['url']}]\n{source['text']}"
        )
    context = "\n\n---\n\n".join(context_parts)

    # 4. Generate answer
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "You are a helpful assistant. Answer the user's question using "
                    "ONLY the provided context. If the context doesn't contain enough "
                    "information, say so. Cite sources using [Source: title] format."
                )
            },
            {
                "role": "user",
                "content": f"Context:\n{context}\n\nQuestion: {question}"
            }
        ],
        temperature=0.1
    )

    return response.choices[0].message.content

# Example usage
answer = rag_query("How do I handle async generators in Python 3.12?")
print(answer)

Improving Retrieval Quality

Basic vector similarity search works, but there are proven techniques to improve relevance:

Hybrid search — Combine vector similarity with keyword matching (BM25). This catches cases where the exact term matters more than semantic meaning.
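As an illustration of the hybrid idea, here is a minimal, dependency-free sketch: a small BM25 scorer for the keyword side, plus reciprocal rank fusion (RRF) to merge a keyword ranking with a vector ranking. In practice you would reach for a library (e.g. rank-bm25) or a vector database's built-in hybrid mode; the constants and function names here are illustrative:

```python
import math
from collections import Counter

def bm25_rank(query: str, docs: list[str], k1: float = 1.5, b: float = 0.75) -> list[int]:
    """Rank document indices by BM25 keyword score, highest first."""
    tokenized = [doc.lower().split() for doc in docs]
    n = len(tokenized)
    avgdl = sum(len(toks) for toks in tokenized) / n
    # Document frequency of each term across the corpus
    df = Counter(term for toks in tokenized for term in set(toks))
    q_terms = query.lower().split()

    scores = []
    for toks in tokenized:
        tf = Counter(toks)
        score = 0.0
        for term in q_terms:
            if term not in tf:
                continue
            idf = math.log((n - df[term] + 0.5) / (df[term] + 0.5) + 1)
            score += idf * tf[term] * (k1 + 1) / (
                tf[term] + k1 * (1 - b + b * len(toks) / avgdl)
            )
        scores.append(score)
    return sorted(range(n), key=lambda i: scores[i], reverse=True)

def reciprocal_rank_fusion(rankings: list[list[int]], k: int = 60) -> list[int]:
    """Merge ranked lists: each doc scores sum of 1 / (k + rank + 1)."""
    fused = Counter()
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking):
            fused[doc_id] += 1.0 / (k + rank + 1)
    return [doc_id for doc_id, _ in fused.most_common()]
```

Feed `reciprocal_rank_fusion` the BM25 ranking and your vector-similarity ranking, and documents that score well on either signal rise to the top.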

Re-ranking — Use a cross-encoder model to re-rank the top-k results. Cohere's Rerank API or a local cross-encoder from sentence-transformers both work well:

import cohere

co = cohere.Client("your-api-key")

def rerank_results(question: str, documents: list[str], top_n: int = 3) -> list:
    """Re-rank retrieved documents for better relevance."""
    results = co.rerank(
        query=question,
        documents=documents,
        top_n=top_n,
        model="rerank-english-v3.0"
    )
    return [
        {"index": r.index, "relevance_score": r.relevance_score}
        for r in results.results
    ]

Metadata filtering — Filter by domain, date, or content type before vector search to narrow the search space.

Step 7: Keeping Your Knowledge Base Fresh

Web data goes stale. Prices change, documentation gets updated, and news happens daily. A production RAG pipeline needs a refresh strategy.

Incremental Scraping

Rather than re-scraping everything, track what's changed:

import hashlib
from datetime import datetime, timedelta

def content_hash(text: str) -> str:
    return hashlib.sha256(text.encode()).hexdigest()

def refresh_knowledge_base(urls: list[str], existing_hashes: dict):
    """Only re-process pages whose content has changed."""
    updated = []

    for url in urls:
        doc = scrape_page(url)
        new_hash = content_hash(doc["markdown"])

        if url not in existing_hashes or existing_hashes[url] != new_hash:
            updated.append(doc)
            existing_hashes[url] = new_hash

    print(f"{len(updated)} of {len(urls)} pages have changed")
    return updated, existing_hashes

Scheduling with Cron

For production deployments, run the refresh on a schedule. Daily is sufficient for most use cases; hourly for fast-changing data like news or pricing.

# refresh_pipeline.py — Run via cron: 0 2 * * * python refresh_pipeline.py
import json

HASH_FILE = "content_hashes.json"

# Load existing hashes
try:
    with open(HASH_FILE) as f:
        existing_hashes = json.load(f)
except FileNotFoundError:
    existing_hashes = {}

# Discover and check for changes
urls = discover_urls("https://docs.example.com")
updated_docs, existing_hashes = refresh_knowledge_base(urls, existing_hashes)

if updated_docs:
    enriched = [enrich_document(doc) for doc in updated_docs]
    new_chunks = []
    for doc in enriched:
        for piece in chunk_by_headers(doc["content"]):
            # Carry document metadata through so upsert_chunks can build IDs
            new_chunks.append({
                "text": piece["text"],
                "metadata": {**doc["metadata"], "chunk_index": piece["index"]},
            })
    embedded = embed_chunks(new_chunks)
    upsert_chunks(embedded)
    print(f"Updated {len(embedded)} chunks in the knowledge base")

# Save hashes
with open(HASH_FILE, "w") as f:
    json.dump(existing_hashes, f)

Common Pitfalls and How to Avoid Them

Building RAG pipelines with web data introduces challenges that don't exist with static document collections:

1. Scraping Failures Corrupt the Knowledge Base

If a scrape returns an error page or partial content, that garbage gets embedded and can surface in retrieval. Always validate scraped content:

def validate_document(doc: dict) -> bool:
    """Reject documents that are likely error pages or too short."""
    content = doc.get("markdown", "")
    if len(content.split()) < 50:
        return False
    if any(phrase in content.lower() for phrase in ["403 forbidden", "404 not found", "access denied"]):
        return False
    return True

2. Duplicate Content Inflates Your Index

Many sites have the same content at multiple URLs (www vs non-www, trailing slashes, query parameters). Deduplicate by content hash before embedding.
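A sketch of both defenses, URL normalization plus content hashing, with hypothetical helper names. Note that dropping query strings entirely (as normalize_url does here) is too aggressive for sites where the query selects the content:

```python
import hashlib
from urllib.parse import urlsplit, urlunsplit

def normalize_url(url: str) -> str:
    """Collapse common duplicate forms: www prefix, trailing slash,
    query string, and fragment."""
    parts = urlsplit(url)
    host = parts.netloc.lower().removeprefix("www.")
    path = parts.path.rstrip("/") or "/"
    return urlunsplit(("https", host, path, "", ""))

def dedupe_documents(docs: list[dict]) -> list[dict]:
    """Keep one document per normalized URL and per content hash."""
    seen_urls: set[str] = set()
    seen_hashes: set[str] = set()
    unique = []
    for doc in docs:
        url = normalize_url(doc["url"])
        digest = hashlib.sha256(doc["markdown"].encode()).hexdigest()
        if url in seen_urls or digest in seen_hashes:
            continue
        seen_urls.add(url)
        seen_hashes.add(digest)
        unique.append(doc)
    return unique
```

Run this after scraping and before chunking, so duplicates never reach the embedding step where they cost money.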

3. Token Budget Overflow

If your retrieved chunks are too large, you'll blow past the LLM's context window. Calculate token counts and truncate intelligently:

import tiktoken

enc = tiktoken.encoding_for_model("gpt-4o")

def fit_context(chunks: list[str], max_tokens: int = 6000) -> list[str]:
    """Select chunks that fit within the token budget."""
    selected = []
    total_tokens = 0

    for chunk in chunks:
        chunk_tokens = len(enc.encode(chunk))
        if total_tokens + chunk_tokens > max_tokens:
            break
        selected.append(chunk)
        total_tokens += chunk_tokens

    return selected

4. Ignoring Source Attribution

Users trust RAG answers more when they can verify the source. Always pass URLs and titles through to your final output.

Why SimpleCrawl for RAG Pipelines?

Building a RAG pipeline involves enough complexity in the chunking, embedding, and retrieval layers. The data extraction step should be the easy part.

SimpleCrawl handles the entire scraping layer so you can focus on building:

  • Clean markdown output — No HTML parsing or content extraction logic needed
  • JavaScript rendering — SPAs and dynamic content work out of the box
  • Anti-bot bypass — Proxies, fingerprint management, and CAPTCHA solving handled for you
  • Crawl endpoint — Discover and scrape entire sites with a single API call
  • Structured extraction — Define a schema and get JSON back, no selectors needed
  • 99.9% uptime — Reliable enough for production refresh pipelines

With a free tier of 500 credits per month, you can build and test your RAG pipeline without spending a dollar.

FAQ

What is the best data format for RAG pipelines?

Markdown is the best format for RAG pipelines. It preserves document structure (headings, lists, code blocks) while stripping away HTML noise, uses far fewer tokens than raw HTML, and LLMs understand markdown natively since they were trained on large amounts of it.

How often should I refresh my RAG knowledge base?

It depends on how quickly your source data changes. For documentation sites, weekly refreshes are usually sufficient. For news or pricing data, daily or hourly updates may be necessary. Use content hashing to only re-process pages that have actually changed.

Can I use RAG with web data for commercial applications?

Yes, but respect website terms of service and robots.txt directives. Many public websites allow scraping for indexing purposes. Always check the specific site's policies and consider reaching out for explicit permission for large-scale commercial use.
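The robots.txt half of that check is easy to automate with the standard library. The sketch below parses an inline sample (the file contents and bot name are invented); against a live site you would call parser.set_url("https://example.com/robots.txt") followed by parser.read() instead:

```python
from urllib.robotparser import RobotFileParser

# Invented sample robots.txt for illustration
ROBOTS_TXT = """\
User-agent: *
Disallow: /admin/
Allow: /
"""

def is_allowed(robots_txt: str, url: str, agent: str = "MyRAGBot") -> bool:
    """Check a URL against robots.txt rules before scraping it."""
    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return parser.can_fetch(agent, url)
```

Wiring this check into your URL discovery step keeps disallowed pages out of the knowledge base from the start.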

How many web pages do I need for a good RAG knowledge base?

Quality matters more than quantity. A focused knowledge base of 200-500 well-chosen pages on a specific topic will outperform 10,000 random pages. Start small, measure retrieval quality, and expand strategically.

What embedding model should I use for web-scraped RAG data?

For most use cases, OpenAI's text-embedding-3-small offers the best balance of quality, speed, and cost. If you need higher accuracy and can afford longer latency, text-embedding-3-large or Cohere's embed-v3 are excellent choices.

Ready to try SimpleCrawl?

We're building the simplest web scraping API for AI. Join the waitlist and get 500 free credits at launch.

Get early access + 500 free credits