ASK KNOX
beta
LESSON 103

Building a Production Image Pipeline

The complete fallback chain: Gemini → Leonardo → gpt-image-1. Quality gates, async generation, CDN delivery, cost tracking per image. This is the production architecture that runs in the Knox ecosystem — built once, running on every content pipeline.

12 min read·AI Image Generation

Track 12 has covered the landscape, the craft of visual prompting, and five individual providers in depth. This final lesson assembles everything into the production architecture — the complete fallback chain with quality gates, async generation, CDN delivery, and per-image cost tracking.

This is not a theoretical framework. It is the architecture running in the Knox content pipeline today, generating hero images for jeremyknox.ai articles via a daily cron, with 99.7%+ image delivery success across six months of production use.

Production Image Pipeline Architecture

The Complete Fallback Chain

The chain logic is simple: try Gemini first (free). On failure, try Leonardo (cheap). On failure, try gpt-image-1 (reliable). If all three fail, raise a structured error with context for alerting.

The implementation is where the details live.

Full Pipeline Implementation

import asyncio
import base64
import httpx
import logging
import os
import time
from dataclasses import dataclass
from pathlib import Path

logger = logging.getLogger(__name__)

GOOGLE_AI_KEY = os.getenv("GOOGLE_AI_KEY")
LEONARDO_API_KEY = os.getenv("LEONARDO_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
PHOENIX_MODEL_ID = "de7d3faf-762f-48e0-b3b7-9d0ac3a3fcf3"

@dataclass
class ImageResult:
    image_bytes: bytes
    provider: str
    cost_usd: float
    generation_time_s: float

class RateLimitError(Exception):
    pass

class GenerationError(Exception):
    pass

async def _generate_gemini(prompt: str, aspect_ratio: str) -> bytes:
    # The "Gemini" provider is Google AI Studio's Imagen 3 predict endpoint,
    # authenticated with the same Google AI Studio API key.
    async with httpx.AsyncClient() as client:
        response = await asyncio.wait_for(
            client.post(
                "https://generativelanguage.googleapis.com/v1beta/models/"
                "imagen-3.0-generate-001:predict",
                headers={"x-goog-api-key": GOOGLE_AI_KEY, "Content-Type": "application/json"},
                json={
                    "instances": [{"prompt": prompt}],
                    "parameters": {"sampleCount": 1, "aspectRatio": aspect_ratio, "safetyFilterLevel": "block_some"},
                },
            ),
            timeout=30,
        )
        if response.status_code == 429:
            raise RateLimitError("Gemini rate limit")
        response.raise_for_status()
        return base64.b64decode(response.json()["predictions"][0]["bytesBase64Encoded"])

async def _generate_leonardo(prompt: str, width: int, height: int) -> bytes:
    async with httpx.AsyncClient() as client:
        resp = await asyncio.wait_for(
            client.post(
                "https://cloud.leonardo.ai/api/rest/v1/generations",
                headers={"Authorization": f"Bearer {LEONARDO_API_KEY}", "Content-Type": "application/json"},
                json={"modelId": PHOENIX_MODEL_ID, "prompt": prompt, "width": width, "height": height,
                      "alchemy": True, "presetStyle": "CINEMATIC", "num_images": 1},
            ),
            timeout=30,
        )
        if resp.status_code == 429:
            raise RateLimitError("Leonardo rate limit")
        resp.raise_for_status()
        generation_id = resp.json()["sdGenerationJob"]["generationId"]

        # Leonardo generation is async: poll the job for up to ~30s before timing out
        for _ in range(30):
            await asyncio.sleep(1)
            poll = await client.get(
                f"https://cloud.leonardo.ai/api/rest/v1/generations/{generation_id}",
                headers={"Authorization": f"Bearer {LEONARDO_API_KEY}"},
            )
            data = poll.json()["generations_by_pk"]
            if data["status"] == "COMPLETE":
                img_url = data["generated_images"][0]["url"]
                img_resp = await client.get(img_url)
                return img_resp.content
            if data["status"] == "FAILED":
                raise GenerationError("Leonardo generation failed")
        raise TimeoutError("Leonardo generation timed out")

async def _generate_openai(prompt: str) -> bytes:
    import openai  # imported lazily: only needed when both cheaper providers fail
    client = openai.AsyncOpenAI(api_key=OPENAI_API_KEY)
    response = await asyncio.wait_for(
        client.images.generate(model="gpt-image-1", prompt=prompt, size="1536x1024", quality="medium", n=1),
        timeout=60,
    )
    return base64.b64decode(response.data[0].b64_json)

PROVIDER_COSTS = {
    "gemini": 0.0,
    "leonardo": 0.008,
    "openai": 0.04,
}

async def generate_image(
    prompt: str,
    aspect_ratio: str = "16:9",
    width: int = 1536,
    height: int = 864,
) -> ImageResult:
    providers = [
        ("gemini", lambda: _generate_gemini(prompt, aspect_ratio)),
        ("leonardo", lambda: _generate_leonardo(prompt, width, height)),
        ("openai", lambda: _generate_openai(prompt)),
    ]

    last_error = None
    for provider_name, generate_fn in providers:
        start = time.time()
        try:
            logger.info(f"Attempting image generation via {provider_name}")
            image_bytes = await generate_fn()
            elapsed = time.time() - start

            # Quality gate
            if len(image_bytes) < 50_000:
                raise GenerationError(f"{provider_name}: image too small ({len(image_bytes)} bytes)")

            logger.info(f"Image generated via {provider_name} in {elapsed:.1f}s")
            return ImageResult(
                image_bytes=image_bytes,
                provider=provider_name,
                cost_usd=PROVIDER_COSTS[provider_name],
                generation_time_s=elapsed,
            )
        except (RateLimitError, TimeoutError, GenerationError) as e:
            logger.warning(f"{provider_name} failed: {e}. Falling through to next provider.")
            last_error = e
        except Exception as e:
            logger.error(f"{provider_name} unexpected error: {e}. Falling through.")
            last_error = e

    raise GenerationError(f"All providers failed. Last error: {last_error}")

Quality Gates

Every generated image passes through validation before the result is accepted. The minimum gate: file size must exceed 50KB. Images below this threshold indicate the model returned an error placeholder, a corrupt response, or a trivially small artifact rather than a real generation.

Extended quality checks for production pipelines:

Dimension validation. Decode the image header and verify width/height match the requested dimensions within a tolerance (±10%). Some providers return slightly different dimensions than requested.

Format validation. Verify the file is a valid PNG or JPEG by checking magic bytes (\x89PNG or \xFF\xD8\xFF). API failures sometimes return JSON error bodies with a 200 status code — the format check catches these.

Content screening. For pipelines where content policy compliance is critical, run the image through a safety classifier before acceptance. Google Cloud Vision or AWS Rekognition handle this at roughly $0.001 per image.
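The format and dimension checks can be sketched with the standard library alone; no full image decoder is needed for the header-level gates. The helper names below are illustrative, not part of the pipeline code above:

```python
import struct

PNG_MAGIC = b"\x89PNG\r\n\x1a\n"
JPEG_MAGIC = b"\xff\xd8\xff"

def validate_format(image_bytes: bytes) -> str:
    """Return 'png' or 'jpeg'; raise on anything else (e.g. a JSON error body)."""
    if image_bytes.startswith(PNG_MAGIC):
        return "png"
    if image_bytes.startswith(JPEG_MAGIC):
        return "jpeg"
    raise ValueError("not a valid PNG or JPEG -- possibly an error body with a 200 status")

def png_dimensions(image_bytes: bytes) -> tuple[int, int]:
    """Width and height live at bytes 16-24 of the IHDR chunk in every PNG."""
    return struct.unpack(">II", image_bytes[16:24])

def dimensions_ok(actual: int, requested: int, tolerance: float = 0.10) -> bool:
    """Accept dimensions within +/-10% of the requested size."""
    return abs(actual - requested) <= requested * tolerance
```

The magic-byte check runs before the size gate in practice, since a JSON error body can exceed 50KB in pathological cases but will never start with a PNG or JPEG signature.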

Async Generation at Scale

For batch generation (multiple images simultaneously), use asyncio.Semaphore to control concurrency:

async def generate_batch(
    prompts: list[str],
    max_concurrent: int = 5,
) -> list[ImageResult | BaseException]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def generate_with_limit(prompt: str) -> ImageResult:
        async with semaphore:
            return await generate_image(prompt)

    # return_exceptions=True means failures come back as exception objects
    # in the result list instead of cancelling the whole batch.
    return await asyncio.gather(*[generate_with_limit(p) for p in prompts], return_exceptions=True)

The semaphore prevents the batch from hammering all providers simultaneously. Five concurrent generations is sufficient for most content pipelines without aggressively triggering rate limits.
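One wrinkle: with return_exceptions=True, the list that gather returns mixes ImageResult objects with exception instances, so callers should split the two before use. A minimal helper (the name is illustrative):

```python
def split_results(results: list) -> tuple[list, list[BaseException]]:
    """Separate successful results from the exception objects that
    asyncio.gather(..., return_exceptions=True) leaves in the list."""
    successes = [r for r in results if not isinstance(r, BaseException)]
    failures = [r for r in results if isinstance(r, BaseException)]
    return successes, failures
```

The failures list is what feeds your alerting: a batch where every prompt fell through all three providers looks very different from one stray content-filter rejection.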

Storage and CDN Delivery

Generated images should never be served directly from provider URLs. DALL-E 3 URLs expire after 60 minutes. Leonardo URLs persist longer but are not guaranteed to remain stable. Always store the image bytes yourself and serve from your own CDN.

import boto3

s3 = boto3.client("s3")

def store_image(image_bytes: bytes, key: str, bucket: str) -> str:
    s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=image_bytes,
        ContentType="image/png",
        CacheControl="public, max-age=31536000",
    )
    return f"https://cdn.yourdomain.com/{key}"

Cloudflare R2 is the cost-optimal CDN storage layer: zero egress fees and an S3-compatible API. For high-volume image pipelines, the egress cost difference between R2 and S3 is material.
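Pointing boto3 at R2 is a small change: the same client library works, only the endpoint moves to R2's account-scoped URL. A sketch, assuming R2 credentials are already provisioned (the function names are illustrative):

```python
def r2_endpoint(account_id: str) -> str:
    """R2 exposes its S3-compatible API at an account-scoped endpoint."""
    return f"https://{account_id}.r2.cloudflarestorage.com"

def r2_client(account_id: str, access_key: str, secret_key: str):
    import boto3  # same client library as S3; only the endpoint changes
    return boto3.client(
        "s3",
        endpoint_url=r2_endpoint(account_id),
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name="auto",  # R2 does not use AWS regions; "auto" is the conventional value
    )
```

The store_image function above works unchanged against this client, which is the point of the S3-compatible API.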

Cost Tracking Per Image

Track cost at the generation event level, not in aggregate:

@dataclass
class GenerationRecord:
    timestamp: str
    prompt_hash: str  # hash of prompt, not full text
    provider: str
    cost_usd: float
    generation_time_s: float
    image_size_bytes: int
    success: bool

# Log to your analytics store, structured log, or database

The cost tracking system serves two purposes: budget monitoring and provider optimization. If Leonardo is being used more than expected because Gemini is failing more often, the data tells you, and you can investigate whether the prompts are hitting content filters, whether a rate limit is misconfigured, or whether Gemini is experiencing reliability issues.

A monthly cost report by provider is the minimum telemetry. Alert when the month's projected cost exceeds the budget threshold.
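The minimum aggregation can be sketched over records shaped like GenerationRecord, represented here as dicts so the snippet stands alone (helper names are illustrative):

```python
from collections import defaultdict

def cost_report(records: list[dict]) -> dict[str, float]:
    """Sum cost_usd per provider across generation records."""
    totals: dict[str, float] = defaultdict(float)
    for rec in records:
        totals[rec["provider"]] += rec["cost_usd"]
    return dict(totals)

def projected_monthly_cost(spend_to_date: float, day_of_month: int,
                           days_in_month: int = 30) -> float:
    """Linear run-rate projection: extrapolate spend so far over the full month."""
    return spend_to_date / day_of_month * days_in_month
```

A shift in the cost_report totals toward Leonardo or openai is the early-warning signal described above, visible before the budget alert ever fires.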

Complete Pipeline Configuration

The pipeline needs five environment variables:

GOOGLE_AI_KEY=...           # Google AI Studio API key
LEONARDO_API_KEY=...        # Leonardo platform API key
OPENAI_API_KEY=...          # OpenAI API key
IMAGE_CDN_BUCKET=...        # S3/R2 bucket name
IMAGE_CDN_BASE_URL=...      # CDN base URL for generated public URLs

These live in your .env file or secrets manager, never in code.
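A fail-fast check at startup beats a mid-pipeline crash when a key is missing. A sketch (the helper name is illustrative):

```python
import os

REQUIRED_VARS = (
    "GOOGLE_AI_KEY",
    "LEONARDO_API_KEY",
    "OPENAI_API_KEY",
    "IMAGE_CDN_BUCKET",
    "IMAGE_CDN_BASE_URL",
)

def missing_config(env=os.environ) -> list[str]:
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not env.get(name)]

# At startup:
# if missing := missing_config():
#     raise SystemExit(f"Missing environment variables: {', '.join(missing)}")
```

Run this before the first generation attempt; a pipeline that dies at import time with a named list of missing variables is far easier to operate than one that fails on the third provider at 3 a.m.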

Lesson 103 Drill

Assemble the complete pipeline from the code in this lesson. Generate 20 images using the batch function — enough to verify that rate limit handling works and that the fallback chain fires correctly when you exhaust the Gemini quota. Log the provider, cost, and generation time for each image. Calculate the total cost for the batch. Verify every image passes the quality gate. Store the images in a CDN bucket and confirm the public URLs resolve. That is the production baseline.

Bottom Line

The three-provider fallback chain is not engineering for its own sake. It is the architecture that turns a fragile single-provider dependency into a reliable infrastructure component. Gemini handles the free tier. Leonardo catches the rate limit failures with cinematic quality. gpt-image-1 is the reliable backstop. Quality gates validate the output. CDN storage decouples you from provider URL lifecycles. Cost tracking keeps the budget visible.

Build this once. Wire it into every pipeline that needs images. Every content flywheel, every blog autopilot, every social image generator runs on this foundation. The track is complete — go ship something that generates images while you sleep.