Building a Gemini-Powered Pipeline
Theory becomes real when you build the full pipeline: multimodal input, Gemini processing, structured output, downstream action. This lesson walks through a production document extraction pipeline — every step, every edge case, every fallback.
Every lesson in this track has been building toward this one. Multimodal inputs. Model routing. Long context. Cost tracking. Error handling. Now we put them together into a real pipeline.
The pipeline we are building extracts structured data from invoice PDFs — a task that sounds simple and is genuinely non-trivial at production scale. It covers the full lifecycle: file ingestion, Gemini processing with structured output, validation, storage, and the error handling that separates a working prototype from a production system.
The Pipeline Architecture
Our invoice extraction pipeline has four stages:
- Input Upload — Receive a PDF, upload to Gemini File API, get a file URI
- Gemini Processing — Call `generate_content` with the file URI and extraction prompt; receive structured JSON
- Parse + Validate — Parse the JSON response, validate against the schema, flag incomplete extractions
- Store + Act — Write validated data to database, trigger downstream actions, archive the raw response
Each stage can fail independently. Each failure has a defined recovery path.
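The stage-to-recovery mapping described above can be made explicit in code. A minimal sketch; the stage names and recovery descriptions are illustrative, not from any SDK:

```python
# Illustrative stage -> recovery-path table; the names are ours, not Gemini's.
RECOVERY_PATHS = {
    "input_upload": "retry the upload; skip the file after repeated failures",
    "gemini_processing": "exponential backoff on 429; escalate to a stronger model",
    "parse_validate": "mark as parse_error and archive the raw response for inspection",
    "store_act": "retry the write; dead-letter the record if storage stays down",
}

def recovery_for(stage: str) -> str:
    """Look up the defined recovery path for a pipeline stage."""
    return RECOVERY_PATHS.get(stage, "unknown stage: fail loudly")
```

Keeping this table next to the pipeline code makes it easy to audit that every failure mode actually has a handler.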
Stage 1: Structured Output Setup
Before any pipeline code, configure Gemini for reliable structured output using `response_mime_type` and `response_schema`. This is the correct approach: rely on schema enforcement rather than asking Gemini to "output JSON" in prose and parsing the result manually.
```python
import google.generativeai as genai
from pydantic import BaseModel, Field
from typing import Optional
import json


class InvoiceLineItem(BaseModel):
    description: str
    quantity: float
    unit_price: float
    total: float


class InvoiceData(BaseModel):
    vendor_name: str
    vendor_address: Optional[str] = None
    invoice_number: str
    invoice_date: str
    due_date: Optional[str] = None
    line_items: list[InvoiceLineItem]
    subtotal: float
    tax: Optional[float] = None
    total_amount: float
    currency: str = "USD"
    confidence: float = Field(ge=0.0, le=1.0, description="Extraction confidence 0-1")


# Configure model with structured output
model = genai.GenerativeModel(
    model_name="gemini-2.0-flash",
    generation_config=genai.GenerationConfig(
        response_mime_type="application/json",
        response_schema=InvoiceData.model_json_schema(),
    ),
    system_instruction="""
    You are an invoice data extraction system. Extract all invoice fields precisely.
    For any field you cannot determine with confidence, use null.
    Set confidence based on how many required fields were successfully extracted.
    Never fabricate or estimate numerical values — use null if uncertain.
    """,
)
```
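Even with schema enforcement, a cheap required-key check before full Pydantic validation gives you a fast, dependency-free first line of defense. A sketch; the field list mirrors the schema above:

```python
import json

# Required keys from the InvoiceData schema (Optional fields excluded).
REQUIRED_FIELDS = {"vendor_name", "invoice_number", "invoice_date",
                   "line_items", "subtotal", "total_amount", "currency", "confidence"}

def missing_required(raw_json: str) -> set[str]:
    """Return required InvoiceData keys absent from a raw model response."""
    data = json.loads(raw_json)
    return REQUIRED_FIELDS - data.keys()
```

If this set is non-empty you can short-circuit to the review queue before spending time on full validation.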
Stage 2: The Upload and Process Function
```python
import time
from pathlib import Path


def upload_pdf(pdf_path: str) -> genai.protos.File:
    """Upload a PDF to the Gemini File API and wait for processing."""
    file = genai.upload_file(pdf_path, mime_type="application/pdf")
    # Wait for the file to be processed (usually < 10 seconds for PDFs)
    max_wait = 60
    elapsed = 0
    while file.state.name == "PROCESSING" and elapsed < max_wait:
        time.sleep(2)
        elapsed += 2
        file = genai.get_file(file.name)
    if file.state.name == "FAILED":
        raise ValueError(f"File processing failed for {pdf_path}: {file.state}")
    if file.state.name != "ACTIVE":
        raise TimeoutError(f"File {file.name} not active after {max_wait}s: {file.state.name}")
    return file


def extract_invoice_data(pdf_path: str) -> tuple[InvoiceData, dict]:
    """Extract invoice data from a PDF. Returns (validated_data, raw_response_dict)."""
    # Upload and process
    file = upload_pdf(pdf_path)
    try:
        response = model.generate_content([
            file,
            "Extract all invoice fields from this document. Be precise. "
            "Use null for uncertain fields.",
        ])
        # Parse and validate
        raw_dict = json.loads(response.text)
        validated = InvoiceData.model_validate(raw_dict)
        return validated, raw_dict
    finally:
        # Clean up the uploaded file
        genai.delete_file(file.name)
```
Stage 3: The Full Pipeline with Error Handling
```python
import random
import sqlite3
from dataclasses import dataclass
from typing import Literal


@dataclass
class ProcessingResult:
    file_path: str
    status: Literal["success", "low_confidence", "parse_error", "api_error", "skipped"]
    data: Optional[InvoiceData] = None
    error: Optional[str] = None
    cost_usd: float = 0.0


class InvoicePipeline:
    def __init__(self, db_path: str, confidence_threshold: float = 0.7):
        self.db_path = db_path
        self.confidence_threshold = confidence_threshold
        self.total_cost = 0.0
        self._init_db()

    def _init_db(self):
        """Create the invoices table if it does not exist yet."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS invoices (
                    file_path TEXT PRIMARY KEY,
                    vendor_name TEXT,
                    invoice_number TEXT,
                    total_amount REAL,
                    status TEXT,
                    raw_response TEXT,
                    extracted_at TEXT
                )
            """)

    def process_invoice(self, pdf_path: str) -> ProcessingResult:
        """Process a single invoice with full error handling."""
        try:
            data, raw_dict = self._extract_with_retry(pdf_path)
            # Track token cost
            # (in production, calculate from response.usage_metadata)
            if data.confidence < self.confidence_threshold:
                # Store for human review but flag it
                self._store(pdf_path, data, raw_dict, status="low_confidence")
                return ProcessingResult(
                    file_path=pdf_path,
                    status="low_confidence",
                    data=data,
                )
            self._store(pdf_path, data, raw_dict, status="success")
            return ProcessingResult(file_path=pdf_path, status="success", data=data)
        except json.JSONDecodeError as e:
            # Gemini returned non-JSON despite schema enforcement
            return ProcessingResult(
                file_path=pdf_path,
                status="parse_error",
                error=f"JSON parse error: {e}",
            )
        except Exception as e:
            return ProcessingResult(
                file_path=pdf_path,
                status="api_error",
                error=str(e),
            )

    def _extract_with_retry(self, pdf_path: str, max_attempts: int = 3) -> tuple[InvoiceData, dict]:
        """Extract with retry logic and model escalation."""
        models_to_try = ["gemini-2.0-flash", "gemini-2.0-pro-exp"]
        file = upload_pdf(pdf_path)  # upload once, reuse across attempts
        try:
            for model_name in models_to_try:
                current_model = genai.GenerativeModel(
                    model_name=model_name,
                    generation_config=genai.GenerationConfig(
                        response_mime_type="application/json",
                        response_schema=InvoiceData.model_json_schema(),
                    ),
                    # Same brief as the Stage 1 model
                    system_instruction="You are an invoice data extraction system. "
                    "Extract all invoice fields precisely; use null for uncertain values.",
                )
                for attempt in range(max_attempts):
                    try:
                        response = current_model.generate_content(
                            [file, "Extract all invoice fields."]
                        )
                        raw_dict = json.loads(response.text)
                        return InvoiceData.model_validate(raw_dict), raw_dict
                    except Exception as e:
                        if "429" in str(e) or "ResourceExhausted" in type(e).__name__:
                            # Rate limited: exponential backoff with jitter, then retry
                            time.sleep((2 ** attempt) + random.uniform(0, 1))
                            continue
                        break  # Non-retryable error: escalate to the next model
            raise RuntimeError(f"All extraction attempts failed for {pdf_path}")
        finally:
            genai.delete_file(file.name)

    def _store(self, file_path: str, data: InvoiceData, raw: dict, status: str):
        """Persist extracted data and raw response to the database."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT OR REPLACE INTO invoices
                (file_path, vendor_name, invoice_number, total_amount, status, raw_response, extracted_at)
                VALUES (?, ?, ?, ?, ?, ?, datetime('now'))
            """, (
                file_path, data.vendor_name, data.invoice_number,
                data.total_amount, status, json.dumps(raw),
            ))

    def process_batch(self, pdf_paths: list[str]) -> list[ProcessingResult]:
        """Process a batch of invoices and return results."""
        results = []
        for path in pdf_paths:
            result = self.process_invoice(path)
            results.append(result)
            print(f"{result.status.upper()}: {path}")
        return results
```
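The cost-tracking comment in `process_invoice` glosses over the arithmetic. A minimal sketch, assuming hypothetical per-million-token prices (the numbers below are placeholders; check current Gemini pricing before relying on them):

```python
# Placeholder prices in USD per million tokens; verify against current Gemini pricing.
PRICE_PER_M_INPUT = 0.10
PRICE_PER_M_OUTPUT = 0.40

def estimate_cost_usd(prompt_tokens: int, output_tokens: int) -> float:
    """Estimate one request's cost from response.usage_metadata token counts."""
    return ((prompt_tokens / 1_000_000) * PRICE_PER_M_INPUT
            + (output_tokens / 1_000_000) * PRICE_PER_M_OUTPUT)
```

In `process_invoice` you would add this estimate to `self.total_cost` after each successful call, reading the token counts from `response.usage_metadata`.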
The Fallback Chain in Practice
Production pipelines need provider fallbacks — not just model fallbacks within Gemini, but the ability to route to Claude or GPT-4o if Gemini is unavailable:
```python
PROVIDER_FALLBACK_CHAIN = [
    ("gemini", "gemini-2.0-flash"),
    ("gemini", "gemini-2.0-pro-exp"),
    ("openai", "gpt-4o"),                         # cross-provider fallback
    ("anthropic", "claude-3-5-sonnet-20241022"),  # final fallback
]


def extract_with_provider_fallback(pdf_path: str) -> InvoiceData:
    # extract_with_gemini / extract_with_openai / extract_with_anthropic are
    # provider-specific wrappers you implement around each vendor's SDK.
    for provider, model_id in PROVIDER_FALLBACK_CHAIN:
        try:
            if provider == "gemini":
                return extract_with_gemini(pdf_path, model_id)
            elif provider == "openai":
                return extract_with_openai(pdf_path, model_id)
            elif provider == "anthropic":
                return extract_with_anthropic(pdf_path, model_id)
        except Exception as e:
            print(f"Provider {provider}/{model_id} failed: {e}. Trying next.")
    raise RuntimeError("All providers in fallback chain failed")
```
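The same first-success pattern can be factored into a generic runner that works with any set of provider callables. A sketch with stub extractors; `run_fallback_chain` and the stubs are illustrative, not part of any SDK:

```python
from typing import Callable

def run_fallback_chain(extractors: list[tuple[str, Callable[[], dict]]]) -> tuple[str, dict]:
    """Try each (name, extractor) in order; return the first success."""
    failures = []
    for name, extract in extractors:
        try:
            return name, extract()
        except Exception as e:
            failures.append(f"{name}: {e}")
    raise RuntimeError("All providers failed: " + "; ".join(failures))

# Stub demo: the first provider errors, the second succeeds.
def flaky():
    raise TimeoutError("provider down")

def healthy():
    return {"vendor_name": "Acme", "total_amount": 99.0}

winner, data = run_fallback_chain([("primary", flaky), ("backup", healthy)])
```

Factoring the loop out this way keeps the chain declarative and makes the fallback order trivially testable.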
Running the Pipeline
```python
# Initialize and run
pipeline = InvoicePipeline(db_path="invoices.db", confidence_threshold=0.7)
pdf_files = [str(p) for p in Path("./incoming").glob("*.pdf")]
results = pipeline.process_batch(pdf_files)

# Summary
success = sum(1 for r in results if r.status == "success")
low_conf = sum(1 for r in results if r.status == "low_confidence")
errors = sum(1 for r in results if r.status in ("parse_error", "api_error"))

print(f"\nPipeline complete: {len(results)} documents processed")
print(f"  Success: {success}")
print(f"  Low confidence (queued for review): {low_conf}")
print(f"  Errors: {errors}")
print(f"  Total cost: ${pipeline.total_cost:.4f}")
```
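A batch summary is most useful when it gates downstream automation rather than just printing numbers. A sketch; the 80% threshold is an arbitrary example, not a recommendation:

```python
def batch_is_healthy(statuses: list[str], min_success_rate: float = 0.8) -> bool:
    """Decide whether to trigger downstream actions for this batch."""
    if not statuses:
        return False
    success = sum(1 for s in statuses if s == "success")
    return success / len(statuses) >= min_success_rate
```

You would call this with `[r.status for r in results]` and pause downstream triggers when it returns False, so one bad batch does not propagate garbage into your systems.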
Lesson 95 Drill — Your Capstone
Build this pipeline for a domain relevant to your work. It does not have to be invoices; any document extraction task follows the same pattern:
- Define a Pydantic schema for the structured output you need.
- Configure Gemini with `response_mime_type` and `response_schema`.
- Implement retry logic with model escalation.
- Add a confidence field and route low-confidence results to a review queue.
- Store raw API responses alongside extracted data.
- Process a batch of at least 5 documents and review the results.
Document what failed, what surprised you, and what you would change in a real production deployment.
Bottom Line
The full Gemini pipeline — multimodal input, structured extraction, schema validation, retry logic, model escalation, provider fallback, raw response storage — is less complex than it looks. Each piece handles exactly one failure mode. Assemble them correctly and you have a production-grade document processing system that handles the real world's messiness without breaking. This is what separates AI demos from AI infrastructure.