ASK KNOX
LESSON 95

Building a Gemini-Powered Pipeline

Theory becomes real when you build the full pipeline: multimodal input, Gemini processing, structured output, downstream action. This lesson walks through a production document extraction pipeline — every step, every edge case, every fallback.

11 min read·Building with Gemini

Every lesson in this track has been building toward this one. Multimodal inputs. Model routing. Long context. Cost tracking. Error handling. Now we put them together into a real pipeline.

The pipeline we are building extracts structured data from invoice PDFs — a task that sounds simple and is genuinely non-trivial at production scale. It covers the full lifecycle: file ingestion, Gemini processing with structured output, validation, storage, and the error handling that separates a working prototype from a production system.

[Figure: Gemini End-to-End Pipeline]

The Pipeline Architecture

Our invoice extraction pipeline has four stages:

  1. Input Upload — Receive a PDF, upload to Gemini File API, get a file URI
  2. Gemini Processing — Call generate_content with the file URI and extraction prompt; receive structured JSON
  3. Parse + Validate — Parse the JSON response, validate against schema, flag incomplete extractions
  4. Store + Act — Write validated data to database, trigger downstream actions, archive the raw response

Each stage can fail independently. Each failure has a defined recovery path.
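The stage-to-recovery mapping above can be tabulated in code before writing any pipeline logic. This is only a sketch: the stage keys and recovery descriptions below are our shorthand for the handling that the pipeline code later in this lesson actually implements.

```python
# Illustrative failure/recovery table for the four pipeline stages.
# The real handling lives in InvoicePipeline below; this just makes the
# "each failure has a defined recovery path" claim concrete.
RECOVERY_PATHS = {
    "upload": ("file processing failed or timed out", "retry upload, then skip and log"),
    "processing": ("rate limit or transient API error", "backoff with jitter, then escalate model"),
    "validate": ("schema mismatch or low confidence", "flag for human review"),
    "store": ("database write failure", "retry write; keep the raw response"),
}

for stage, (failure, recovery) in RECOVERY_PATHS.items():
    print(f"{stage}: on '{failure}' -> {recovery}")
```

Writing the table down first also doubles as a checklist: any stage without a recovery entry is a gap in the design.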

Stage 1: Structured Output Setup

Before any pipeline code, configure Gemini for reliable structured output using response_mime_type and response_schema. This is the correct approach — do not ask Gemini to "output JSON" in prose and parse it manually.

import os
import json
from typing import Optional

import google.generativeai as genai
from pydantic import BaseModel, Field

# Authenticate once per process before any API calls
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])

class InvoiceLineItem(BaseModel):
    description: str
    quantity: float
    unit_price: float
    total: float

class InvoiceData(BaseModel):
    vendor_name: str
    vendor_address: Optional[str] = None
    invoice_number: str
    invoice_date: str
    due_date: Optional[str] = None
    line_items: list[InvoiceLineItem]
    subtotal: float
    tax: Optional[float] = None
    total_amount: float
    currency: str = "USD"
    confidence: float = Field(ge=0.0, le=1.0, description="Extraction confidence 0-1")

# Configure model with structured output
model = genai.GenerativeModel(
    model_name="gemini-2.0-flash",
    generation_config=genai.GenerationConfig(
        response_mime_type="application/json",
        # Pass the Pydantic class itself; the SDK converts it to an API
        # schema. A raw model_json_schema() dict does not work here, because
        # nested models serialize with $ref, which the API schema rejects.
        response_schema=InvoiceData
    ),
    system_instruction="""
    You are an invoice data extraction system. Extract all invoice fields precisely.
    For any field you cannot determine with confidence, use null.
    Set confidence based on how many required fields were successfully extracted.
    Never fabricate or estimate numerical values — use null if uncertain.
    """
)
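The schema does double duty: it constrains Gemini's decoding and it validates the parsed response. The validation side can be checked offline. Below is a self-contained sketch using trimmed copies of the models above (fewer fields, and an invented sample invoice), just to show what Pydantic accepts and rejects:

```python
from pydantic import BaseModel, Field, ValidationError

# Trimmed copies of the lesson's models, for a self-contained demo
class InvoiceLineItem(BaseModel):
    description: str
    quantity: float
    unit_price: float
    total: float

class InvoiceData(BaseModel):
    vendor_name: str
    invoice_number: str
    line_items: list[InvoiceLineItem]
    total_amount: float
    confidence: float = Field(ge=0.0, le=1.0)

sample = {
    "vendor_name": "Acme Corp",
    "invoice_number": "INV-001",
    "line_items": [
        {"description": "Widget", "quantity": 2, "unit_price": 5.0, "total": 10.0}
    ],
    "total_amount": 10.0,
    "confidence": 0.95,
}

invoice = InvoiceData(**sample)
print(invoice.vendor_name)

# The Field(ge=0.0, le=1.0) bound rejects out-of-range confidence values,
# so a fabricated confidence of 1.5 never reaches the database.
try:
    InvoiceData(**{**sample, "confidence": 1.5})
except ValidationError:
    print("rejected out-of-range confidence")
```

This is exactly the check that runs in Stage 3: a response that parses as JSON but violates the schema fails here, not downstream.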

Stage 2: The Upload and Process Function

import time
from pathlib import Path

def upload_pdf(pdf_path: str) -> genai.types.File:
    """Upload a PDF to the Gemini File API and wait for processing."""
    file = genai.upload_file(pdf_path, mime_type="application/pdf")

    # Wait for the file to be processed (usually < 10 seconds for PDFs)
    max_wait = 60
    elapsed = 0
    while file.state.name == "PROCESSING" and elapsed < max_wait:
        time.sleep(2)
        elapsed += 2
        file = genai.get_file(file.name)

    if file.state.name == "FAILED":
        raise ValueError(f"File processing failed for {pdf_path}: {file.state}")
    if file.state.name != "ACTIVE":
        raise TimeoutError(f"File still processing after {max_wait}s: {pdf_path}")

    return file

def extract_invoice_data(pdf_path: str) -> tuple[InvoiceData, dict]:
    """Extract invoice data from a PDF. Returns (validated_data, raw_response_dict)."""
    # Upload and process
    file = upload_pdf(pdf_path)

    try:
        response = model.generate_content([
            file,
            "Extract all invoice fields from this document. Be precise. Use null for uncertain fields."
        ])

        # Parse and validate
        raw_dict = json.loads(response.text)
        validated = InvoiceData(**raw_dict)

        return validated, raw_dict

    finally:
        # Clean up the uploaded file
        genai.delete_file(file.name)
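The polling loop inside upload_pdf is a generic wait-until-terminal-state pattern, and it can be exercised without touching the API. The sketch below uses a stub file object of our own invention that reports PROCESSING twice and then ACTIVE, standing in for genai.get_file:

```python
import time

class StubState:
    def __init__(self, name):
        self.name = name

class StubFile:
    """Fake file that reports PROCESSING twice, then ACTIVE forever."""
    def __init__(self):
        self._polls = 0

    @property
    def state(self):
        self._polls += 1
        return StubState("ACTIVE" if self._polls > 2 else "PROCESSING")

def wait_until_ready(file, poll_seconds=0.01, max_wait=1.0):
    """Poll until the file leaves PROCESSING, with an overall deadline."""
    elapsed = 0.0
    while file.state.name == "PROCESSING" and elapsed < max_wait:
        time.sleep(poll_seconds)
        elapsed += poll_seconds
    if file.state.name != "ACTIVE":
        raise TimeoutError("file never became ACTIVE")
    return file

f = wait_until_ready(StubFile())
print(f.state.name)  # ACTIVE
```

The deadline matters: without max_wait, a file stuck in PROCESSING would hang the whole batch.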

Stage 3: The Full Pipeline with Error Handling

import random
import sqlite3
from dataclasses import dataclass
from typing import Literal

@dataclass
class ProcessingResult:
    file_path: str
    status: Literal["success", "low_confidence", "parse_error", "api_error", "skipped"]
    data: Optional[InvoiceData] = None
    error: Optional[str] = None
    cost_usd: float = 0.0

class InvoicePipeline:
    def __init__(self, db_path: str, confidence_threshold: float = 0.7):
        self.db_path = db_path
        self.confidence_threshold = confidence_threshold
        self.total_cost = 0.0

    def process_invoice(self, pdf_path: str) -> ProcessingResult:
        """Process a single invoice with full error handling."""
        try:
            data, raw_dict = self._extract_with_retry(pdf_path)

            # Track token cost here: in production, compute it from
            # response.usage_metadata and accumulate into self.total_cost

            if data.confidence < self.confidence_threshold:
                # Store for human review but flag it
                self._store(pdf_path, data, raw_dict, status="low_confidence")
                return ProcessingResult(
                    file_path=pdf_path,
                    status="low_confidence",
                    data=data
                )

            self._store(pdf_path, data, raw_dict, status="success")
            return ProcessingResult(file_path=pdf_path, status="success", data=data)

        except json.JSONDecodeError as e:
            # Gemini returned non-JSON despite schema enforcement
            return ProcessingResult(
                file_path=pdf_path,
                status="parse_error",
                error=f"JSON parse error: {e}"
            )
        except Exception as e:
            return ProcessingResult(
                file_path=pdf_path,
                status="api_error",
                error=str(e)
            )

    def _extract_with_retry(self, pdf_path: str, max_attempts: int = 3) -> tuple[InvoiceData, dict]:
        """Extract with retry logic and model escalation."""
        models_to_try = ["gemini-2.0-flash", "gemini-2.0-pro-exp"]

        for model_name in models_to_try:
            current_model = genai.GenerativeModel(
                model_name=model_name,
                generation_config=genai.GenerationConfig(
                    response_mime_type="application/json",
                    # Pass the Pydantic class; the SDK converts it to a schema
                    response_schema=InvoiceData
                ),
                # Restate the Stage 1 instruction: GenerativeModel does not
                # expose a public system_instruction attribute to copy from
                system_instruction=(
                    "You are an invoice data extraction system. Extract all "
                    "invoice fields precisely. Use null for any field you "
                    "cannot determine with confidence. Never fabricate or "
                    "estimate numerical values."
                )
            )

            for attempt in range(max_attempts):
                file = None
                try:
                    file = upload_pdf(pdf_path)
                    response = current_model.generate_content([file, "Extract all invoice fields."])
                    raw_dict = json.loads(response.text)
                    validated = InvoiceData(**raw_dict)
                    return validated, raw_dict

                except Exception as e:
                    if "429" in str(e) or "ResourceExhausted" in type(e).__name__:
                        # Rate limited: exponential backoff with jitter
                        time.sleep((2 ** attempt) + random.uniform(0, 1))
                        continue
                    elif attempt == max_attempts - 1:
                        break  # Exhausted this model; escalate to the next
                    raise
                finally:
                    if file is not None:
                        genai.delete_file(file.name)  # Avoid leaking uploads

        raise RuntimeError(f"All extraction attempts failed for {pdf_path}")

    def _store(self, file_path: str, data: InvoiceData, raw: dict, status: str):
        """Persist extracted data and raw response to database."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT OR REPLACE INTO invoices
                (file_path, vendor_name, invoice_number, total_amount, status, raw_response, extracted_at)
                VALUES (?, ?, ?, ?, ?, ?, datetime('now'))
            """, (
                file_path, data.vendor_name, data.invoice_number,
                data.total_amount, status, json.dumps(raw)
            ))

    def process_batch(self, pdf_paths: list[str]) -> list[ProcessingResult]:
        """Process a batch of invoices and return results."""
        results = []
        for path in pdf_paths:
            result = self.process_invoice(path)
            results.append(result)
            print(f"{result.status.upper()}: {path}")
        return results
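One thing _store quietly assumes is that an invoices table already exists; the lesson never creates it. A schema that matches the INSERT statement above might look like the following (a sketch; the column types are our choice, and the in-memory database is just for demonstration):

```python
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS invoices (
    file_path TEXT PRIMARY KEY,
    vendor_name TEXT,
    invoice_number TEXT,
    total_amount REAL,
    status TEXT,
    raw_response TEXT,
    extracted_at TEXT
)
"""

# In-memory demo: create the table and exercise the same
# INSERT OR REPLACE statement the pipeline's _store method uses.
conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
conn.execute(
    """INSERT OR REPLACE INTO invoices
       (file_path, vendor_name, invoice_number, total_amount, status, raw_response, extracted_at)
       VALUES (?, ?, ?, ?, ?, ?, datetime('now'))""",
    ("inv.pdf", "Acme Corp", "INV-001", 10.0, "success", "{}"),
)
row = conn.execute("SELECT vendor_name, status FROM invoices").fetchone()
print(row)
```

Making file_path the primary key is what gives INSERT OR REPLACE its useful property here: reprocessing the same PDF updates the row instead of duplicating it.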

The Fallback Chain in Practice

Production pipelines need provider fallbacks, not just model fallbacks within Gemini: the ability to route to OpenAI's GPT-4o or Anthropic's Claude if Gemini is unavailable.

PROVIDER_FALLBACK_CHAIN = [
    ("gemini", "gemini-2.0-flash"),
    ("gemini", "gemini-2.0-pro-exp"),
    ("openai", "gpt-4o"),                         # first cross-provider fallback
    ("anthropic", "claude-3-5-sonnet-20241022"),  # final fallback
]

def extract_with_provider_fallback(pdf_path: str) -> InvoiceData:
    for provider, model_id in PROVIDER_FALLBACK_CHAIN:
        try:
            if provider == "gemini":
                return extract_with_gemini(pdf_path, model_id)
            elif provider == "openai":
                return extract_with_openai(pdf_path, model_id)
            elif provider == "anthropic":
                return extract_with_anthropic(pdf_path, model_id)
        except Exception as e:
            print(f"Provider {provider}/{model_id} failed: {e}. Trying next.")
            continue

    raise RuntimeError("All providers in fallback chain failed")
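The chain's control flow is worth testing offline before wiring in real SDKs. Everything below is a stand-in of our own: stub providers where the first two fail and the third succeeds, driven by the same try/continue loop:

```python
CALLS = []  # Records which providers were attempted, in order

def flaky_provider(name, fail):
    """Build a stub extractor that either raises or returns fake data."""
    def extract(pdf_path):
        CALLS.append(name)
        if fail:
            raise RuntimeError(f"{name} unavailable")
        return {"vendor_name": "Acme Corp", "provider": name}
    return extract

CHAIN = [
    ("gemini-flash", flaky_provider("gemini-flash", fail=True)),
    ("gemini-pro", flaky_provider("gemini-pro", fail=True)),
    ("openai", flaky_provider("openai", fail=False)),
]

def extract_with_fallback(pdf_path):
    for name, extract in CHAIN:
        try:
            return extract(pdf_path)
        except Exception as e:
            print(f"{name} failed: {e}. Trying next.")
    raise RuntimeError("All providers in fallback chain failed")

result = extract_with_fallback("inv.pdf")
print(result["provider"])  # openai
```

The test to care about is ordering: every provider before the first success must have been attempted exactly once, and nothing after it.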

Running the Pipeline

# Initialize and run
pipeline = InvoicePipeline(db_path="invoices.db", confidence_threshold=0.7)

pdf_files = list(Path("./incoming").glob("*.pdf"))
results = pipeline.process_batch(pdf_files)

# Summary
success = sum(1 for r in results if r.status == "success")
low_conf = sum(1 for r in results if r.status == "low_confidence")
errors = sum(1 for r in results if r.status in ("parse_error", "api_error"))

print(f"\nPipeline complete: {len(results)} documents processed")
print(f"  Success: {success}")
print(f"  Low confidence (queued for review): {low_conf}")
print(f"  Errors: {errors}")
print(f"  Total cost: ${pipeline.total_cost:.4f}")
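Note that the summary prints pipeline.total_cost, but the class as written never updates it. In production you would derive a per-call cost from response.usage_metadata token counts and accumulate it in process_invoice. A sketch of that arithmetic, with placeholder per-token rates (the numbers below are illustrative, not real Gemini pricing):

```python
# Placeholder prices in USD per 1M tokens -- substitute current published rates
RATES = {"gemini-2.0-flash": {"input": 0.10, "output": 0.40}}

def call_cost(model_name: str, prompt_tokens: int, output_tokens: int) -> float:
    """Cost of one generate_content call, given usage_metadata token counts."""
    r = RATES[model_name]
    return prompt_tokens / 1e6 * r["input"] + output_tokens / 1e6 * r["output"]

# e.g. a 12k-token invoice PDF prompt and a 600-token JSON response
cost = call_cost("gemini-2.0-flash", 12_000, 600)
print(f"${cost:.6f}")
```

In process_invoice, you would add the result of call_cost to self.total_cost after each successful extraction so the batch summary reflects real spend.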

Lesson 95 Drill — Your Capstone

Build this pipeline for a domain relevant to your work. It does not have to be invoices — any document extraction task applies the same pattern:

  1. Define a Pydantic schema for the structured output you need.
  2. Configure Gemini with response_mime_type and response_schema.
  3. Implement retry logic with model escalation.
  4. Add a confidence field and route low-confidence results to a review queue.
  5. Store raw API responses alongside extracted data.
  6. Process a batch of at least 5 documents and review the results.

Document what failed, what surprised you, and what you would change in a real production deployment.

Bottom Line

The full Gemini pipeline — multimodal input, structured extraction, schema validation, retry logic, model escalation, provider fallback, raw response storage — is less complex than it looks. Each piece handles exactly one failure mode. Assemble them correctly and you have a production-grade document processing system that handles the real world's messiness without breaking. This is what separates AI demos from AI infrastructure.