Building a Production Image Pipeline
The complete fallback chain: Gemini → Leonardo → gpt-image-1. Quality gates, async generation, CDN delivery, cost tracking per image. This is the production architecture that runs in the Knox ecosystem — built once, running on every content pipeline.
Track 12 has covered the landscape, the craft of visual prompting, and five individual providers in depth. This final lesson assembles everything into the production architecture — the complete fallback chain with quality gates, async generation, CDN delivery, and per-image cost tracking.
This is not a theoretical framework. It is the architecture running in the Knox content pipeline today, generating hero images for jeremyknox.ai articles via a daily cron, with 99.7%+ image delivery success across six months of production use.
The Complete Fallback Chain
The chain logic is simple: try Gemini first (free). On failure, try Leonardo (cheap). On failure, try gpt-image-1 (reliable). On three-provider failure, return a structured error with context for alerting.
The implementation is where the details live.
Full Pipeline Implementation
```python
import asyncio
import base64
import logging
import os
import time
from dataclasses import dataclass

import httpx

logger = logging.getLogger(__name__)

GOOGLE_AI_KEY = os.getenv("GOOGLE_AI_KEY")
LEONARDO_API_KEY = os.getenv("LEONARDO_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

PHOENIX_MODEL_ID = "de7d3faf-762f-48e0-b3b7-9d0ac3a3fcf3"


@dataclass
class ImageResult:
    image_bytes: bytes
    provider: str
    cost_usd: float
    generation_time_s: float


class RateLimitError(Exception):
    pass


class GenerationError(Exception):
    pass


async def _generate_gemini(prompt: str, aspect_ratio: str) -> bytes:
    async with httpx.AsyncClient() as client:
        response = await asyncio.wait_for(
            client.post(
                "https://generativelanguage.googleapis.com/v1beta/models/"
                "imagen-3.0-generate-001:predict",
                headers={"x-goog-api-key": GOOGLE_AI_KEY, "Content-Type": "application/json"},
                json={
                    "instances": [{"prompt": prompt}],
                    "parameters": {
                        "sampleCount": 1,
                        "aspectRatio": aspect_ratio,
                        "safetyFilterLevel": "block_some",
                    },
                },
            ),
            timeout=30,
        )
    if response.status_code == 429:
        raise RateLimitError("Gemini rate limit")
    response.raise_for_status()
    return base64.b64decode(response.json()["predictions"][0]["bytesBase64Encoded"])


async def _generate_leonardo(prompt: str, width: int, height: int) -> bytes:
    async with httpx.AsyncClient() as client:
        resp = await asyncio.wait_for(
            client.post(
                "https://cloud.leonardo.ai/api/rest/v1/generations",
                headers={"Authorization": f"Bearer {LEONARDO_API_KEY}", "Content-Type": "application/json"},
                json={
                    "modelId": PHOENIX_MODEL_ID,
                    "prompt": prompt,
                    "width": width,
                    "height": height,
                    "alchemy": True,
                    "presetStyle": "CINEMATIC",
                    "num_images": 1,
                },
            ),
            timeout=30,
        )
        if resp.status_code == 429:
            raise RateLimitError("Leonardo rate limit")
        resp.raise_for_status()
        generation_id = resp.json()["sdGenerationJob"]["generationId"]

        # Leonardo is asynchronous: poll for completion, up to 30 seconds
        for _ in range(30):
            await asyncio.sleep(1)
            poll = await client.get(
                f"https://cloud.leonardo.ai/api/rest/v1/generations/{generation_id}",
                headers={"Authorization": f"Bearer {LEONARDO_API_KEY}"},
            )
            data = poll.json()["generations_by_pk"]
            if data["status"] == "COMPLETE":
                img_url = data["generated_images"][0]["url"]
                img_resp = await client.get(img_url)
                return img_resp.content
            if data["status"] == "FAILED":
                raise GenerationError("Leonardo generation failed")
    raise TimeoutError("Leonardo generation timed out")


async def _generate_openai(prompt: str) -> bytes:
    import openai  # lazy import: only needed when the first two providers fail

    client = openai.AsyncOpenAI(api_key=OPENAI_API_KEY)
    response = await asyncio.wait_for(
        client.images.generate(model="gpt-image-1", prompt=prompt, size="1536x1024", quality="medium", n=1),
        timeout=60,
    )
    return base64.b64decode(response.data[0].b64_json)


PROVIDER_COSTS = {
    "gemini": 0.0,
    "leonardo": 0.008,
    "openai": 0.04,
}


async def generate_image(
    prompt: str,
    aspect_ratio: str = "16:9",
    width: int = 1536,
    height: int = 864,
) -> ImageResult:
    providers = [
        ("gemini", lambda: _generate_gemini(prompt, aspect_ratio)),
        ("leonardo", lambda: _generate_leonardo(prompt, width, height)),
        ("openai", lambda: _generate_openai(prompt)),
    ]
    last_error = None
    for provider_name, generate_fn in providers:
        start = time.time()
        try:
            logger.info(f"Attempting image generation via {provider_name}")
            image_bytes = await generate_fn()
            elapsed = time.time() - start
            # Quality gate: reject error placeholders and corrupt responses
            if len(image_bytes) < 50_000:
                raise GenerationError(f"{provider_name}: image too small ({len(image_bytes)} bytes)")
            logger.info(f"Image generated via {provider_name} in {elapsed:.1f}s")
            return ImageResult(
                image_bytes=image_bytes,
                provider=provider_name,
                cost_usd=PROVIDER_COSTS[provider_name],
                generation_time_s=elapsed,
            )
        except (RateLimitError, TimeoutError, GenerationError) as e:
            logger.warning(f"{provider_name} failed: {e}. Falling through to next provider.")
            last_error = e
        except Exception as e:
            logger.error(f"{provider_name} unexpected error: {e}. Falling through.")
            last_error = e
    raise GenerationError(f"All providers failed. Last error: {last_error}")
```
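The fallback behavior can be exercised without live API keys by substituting stub providers. A self-contained sketch (the stub names and `pick_provider` helper are illustrative, not part of the pipeline):

```python
import asyncio

class RateLimitError(Exception):
    pass

async def flaky_gemini() -> bytes:
    # Simulate the free tier being exhausted
    raise RateLimitError("quota exhausted")

async def healthy_leonardo() -> bytes:
    # Simulate a successful generation (fake ~60KB payload)
    return b"\x89PNG" + b"\x00" * 60_000

async def pick_provider() -> str:
    providers = [("gemini", flaky_gemini), ("leonardo", healthy_leonardo)]
    for name, fn in providers:
        try:
            image = await fn()
            if len(image) < 50_000:  # same quality gate as the pipeline
                continue
            return name
        except RateLimitError:
            continue
    raise RuntimeError("all providers failed")

provider = asyncio.run(pick_provider())
print(provider)  # prints "leonardo" — the chain fell through as intended
```

Swapping the stubs for the real `_generate_*` functions gives you the production loop; the control flow is identical.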
Quality Gates
Every generated image passes through validation before the result is accepted. The minimum gate: file size must exceed 50KB. Images below this threshold indicate the model returned an error placeholder, a corrupt response, or a trivially small artifact rather than a real generation.
Extended quality checks for production pipelines:
Dimension validation. Decode the image header and verify width/height match the requested dimensions within a tolerance (±10%). Some providers return slightly different dimensions than requested.
Format validation. Verify the file is a valid PNG or JPEG by checking magic bytes (\x89PNG or \xFF\xD8\xFF). API failures sometimes return JSON error bodies with a 200 status code — the format check catches these.
Content screening. For pipelines where content policy compliance is critical, run the image through a safety classifier before acceptance. Google Cloud Vision or AWS Rekognition handle this at roughly $0.001 per image.
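The size, format, and dimension checks compose into a single gate function. A sketch, assuming PNG dimensions are read from the IHDR chunk at a fixed offset (the function name is illustrative):

```python
import struct

def passes_quality_gate(image_bytes: bytes, expected_w: int, expected_h: int) -> bool:
    # Gate 1: minimum size — tiny files are error placeholders
    if len(image_bytes) < 50_000:
        return False
    # Gate 2: magic bytes — catches JSON error bodies returned with HTTP 200
    is_png = image_bytes.startswith(b"\x89PNG\r\n\x1a\n")
    is_jpeg = image_bytes.startswith(b"\xff\xd8\xff")
    if not (is_png or is_jpeg):
        return False
    # Gate 3 (PNG only): dimensions within ±10% of the request.
    # PNG layout: 8-byte signature, then the IHDR chunk — width and
    # height are big-endian uint32s at byte offsets 16 and 20.
    if is_png:
        w, h = struct.unpack(">II", image_bytes[16:24])
        if abs(w - expected_w) > expected_w * 0.1:
            return False
        if abs(h - expected_h) > expected_h * 0.1:
            return False
    return True
```

Run the gate inside the fallback loop so a failing image triggers the next provider rather than reaching the CDN.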
Async Generation at Scale
For batch generation (multiple images simultaneously), use asyncio.Semaphore to control concurrency:
```python
async def generate_batch(
    prompts: list[str],
    max_concurrent: int = 5,
) -> list[ImageResult | BaseException]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def generate_with_limit(prompt: str) -> ImageResult:
        async with semaphore:
            return await generate_image(prompt)

    # With return_exceptions=True, failed generations come back as
    # exception objects in the list rather than aborting the batch
    return await asyncio.gather(
        *[generate_with_limit(p) for p in prompts], return_exceptions=True
    )
```
The semaphore prevents the batch from hammering all providers simultaneously. Five concurrent generations is enough for most content pipelines without aggressively triggering rate limits.
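Because `return_exceptions=True` mixes failures into the result list, callers should partition before use. A small helper sketch (`split_results` is a hypothetical name):

```python
def split_results(results: list) -> tuple[list, list]:
    # Separate successful results from exceptions returned by
    # asyncio.gather(..., return_exceptions=True)
    successes = [r for r in results if not isinstance(r, BaseException)]
    failures = [r for r in results if isinstance(r, BaseException)]
    return successes, failures

# Usage: ok, failed = split_results(await generate_batch(prompts))
```

Log the failures with their prompts so a partially failed batch can be retried without regenerating the successes.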
Storage and CDN Delivery
Generated images should never be served directly from provider URLs. DALL-E 3 URLs expire after 60 minutes. Leonardo URLs persist longer but are not guaranteed to remain stable. Always store the image bytes yourself and serve from your own CDN.
```python
import boto3

s3 = boto3.client("s3")


def store_image(image_bytes: bytes, key: str, bucket: str) -> str:
    s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=image_bytes,
        ContentType="image/png",
        CacheControl="public, max-age=31536000",  # cache for one year
    )
    return f"https://cdn.yourdomain.com/{key}"
```
Cloudflare R2 is the cost-optimal CDN storage layer — zero egress fees with an S3-compatible API. For high-volume image pipelines, the egress cost difference between R2 and S3 is material.
Cost Tracking Per Image
Track cost at the generation event level, not in aggregate:
```python
@dataclass
class GenerationRecord:
    timestamp: str
    prompt_hash: str  # hash of the prompt, not the full text
    provider: str
    cost_usd: float
    generation_time_s: float
    image_size_bytes: int
    success: bool

# Log to your analytics store, structured log, or database
```
The cost tracking system serves two purposes: budget monitoring and provider optimization. If Leonardo is serving more requests than expected, Gemini is failing more often than it should — the data tells you, and you can investigate whether the prompts are hitting content filters, whether a rate limit is misconfigured, or whether Gemini is experiencing reliability issues.
A monthly cost report by provider is the minimum telemetry. Alert when any month's projected cost exceeds the budget threshold.
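Both reports fall out of the generation records in a few lines. A sketch, assuming records are available as dicts keyed like `GenerationRecord` (the function names are illustrative):

```python
from collections import defaultdict

def cost_by_provider(records: list[dict]) -> dict[str, float]:
    # Sum cost per provider across generation records
    totals: dict[str, float] = defaultdict(float)
    for rec in records:
        totals[rec["provider"]] += rec["cost_usd"]
    return dict(totals)

def provider_share(records: list[dict]) -> dict[str, float]:
    # Fraction of generations served by each provider — a rising
    # Leonardo share means Gemini is failing more often than expected
    counts: dict[str, int] = defaultdict(int)
    for rec in records:
        counts[rec["provider"]] += 1
    total = len(records) or 1
    return {p: n / total for p, n in counts.items()}
```

Run both over a month's records on a schedule and alert when projected cost or fallback share crosses your threshold.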
Complete Pipeline Configuration
The pipeline needs five environment variables:
```
GOOGLE_AI_KEY=...       # Google AI Studio API key
LEONARDO_API_KEY=...    # Leonardo platform API key
OPENAI_API_KEY=...      # OpenAI API key
IMAGE_CDN_BUCKET=...    # S3/R2 bucket name
IMAGE_CDN_BASE_URL=...  # CDN base URL for generated public URLs
```
These live in your .env file or secrets manager, never in code.
Lesson 103 Drill
Assemble the complete pipeline from the code in this lesson. Generate 20 images using the batch function — enough to verify that rate limit handling works and that the fallback chain fires correctly when you exhaust the Gemini quota. Log the provider, cost, and generation time for each image. Calculate the total cost for the batch. Verify every image passes the quality gate. Store the images in a CDN bucket and confirm the public URLs resolve. That is the production baseline.
Bottom Line
The three-provider fallback chain is not engineering for its own sake. It is the architecture that turns a fragile single-provider dependency into a reliable infrastructure component. Gemini handles the free tier. Leonardo catches the rate limit failures with cinematic quality. gpt-image-1 is the reliable backstop. Quality gates validate the output. CDN storage decouples you from provider URL lifecycles. Cost tracking keeps the budget visible.
Build this once. Wire it into every pipeline that needs images. Every content flywheel, every blog autopilot, every social image generator runs on this foundation. The track is complete — go ship something that generates images while you sleep.