ASK KNOX
beta
LESSON 111

Building a Production Video Pipeline

Video generation takes minutes, costs real money, and fails in specific ways. A production-grade pipeline handles async job management, quality validation, cost tracking, and retry logic — before you discover you need them at 2am.

11 min read·AI Video Generation

A video pipeline that works on the first test run and a video pipeline that works in production are not the same thing.

The test run executes under ideal conditions: fresh credentials, no rate limits hit, the API returns on the first call, the generated video passes every quality check, storage writes without error. Production is different. APIs return 429s at 3am. Generation jobs time out without explanation. Files arrive corrupted. Cost spikes from a bug that queued 50 jobs instead of 5.

The difference between a pipeline that handles these failures gracefully and one that fails silently — costing money and producing nothing — is architecture. Not clever code. Architecture: async job management, quality validation, cost tracking, retry logic, and storage that doesn't depend on the provider keeping your files around.

Production Video Pipeline Architecture

Async Job Management

Video generation is never synchronous. Veo 2 takes 30-120 seconds. HeyGen takes 2-10 minutes. Runway takes 30-90 seconds. Any pipeline that blocks synchronously on these calls will either time out or hold worker processes hostage for minutes per video.

The correct pattern is a queue-backed async job system:

import asyncio
from dataclasses import dataclass
from typing import Optional
import uuid

@dataclass
class VideoJob:
    job_id: str
    prompt: str
    platform: str  # "veo" | "heygen" | "runway"
    status: str    # "pending" | "submitted" | "processing" | "complete" | "failed"
    provider_job_id: Optional[str] = None
    video_url: Optional[str] = None
    cost_usd: Optional[float] = None
    created_at: Optional[float] = None

class VideoQueue:
    def __init__(self):
        self.queue: asyncio.Queue[VideoJob] = asyncio.Queue()
        self.results: dict[str, VideoJob] = {}

    async def submit(self, prompt: str, platform: str) -> str:
        job = VideoJob(
            job_id=str(uuid.uuid4()),
            prompt=prompt,
            platform=platform,
            status="pending",
        )
        await self.queue.put(job)
        return job.job_id

    async def process(self):
        """Worker: pulls from queue, submits to API, polls for completion."""
        while True:
            job = await self.queue.get()
            try:
                await self._execute(job)
            except Exception as e:
                job.status = "failed"
                await self._alert(job, str(e))
            finally:
                self.results[job.job_id] = job
                self.queue.task_done()

The caller submits a job and receives a job_id. The worker handles submission, polling, and result storage asynchronously. The caller polls results[job_id] to check for completion — or gets notified via callback when the status changes.

Key design decisions:

  • Job state is persisted to database, not memory — a process restart should not lose in-flight jobs
  • Worker pool size is bounded — do not allow unbounded concurrent generation requests, which will hit API rate limits and create cost spikes
  • job_id maps back to caller context — so when the video completes, the pipeline knows what to do with it
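The submit-and-poll step that _execute performs can be sketched as follows. The provider adapter `client` here is hypothetical — an object with async `submit` and `get_status` methods — so substitute your platform's real SDK calls:

```python
import asyncio
import time

POLL_INTERVAL_S = 10      # how often to check job status
POLL_TIMEOUT_S = 15 * 60  # give up after 15 minutes

async def execute(job, client):
    """Submit a job to the provider, then poll until completion or timeout.

    `client` is a hypothetical provider adapter with async submit/get_status
    methods; swap in your platform's actual SDK calls.
    """
    job.provider_job_id = await client.submit(job.prompt)
    job.status = "submitted"

    deadline = time.monotonic() + POLL_TIMEOUT_S
    while time.monotonic() < deadline:
        state = await client.get_status(job.provider_job_id)
        if state["status"] == "complete":
            job.status = "complete"
            job.video_url = state["video_url"]
            return
        if state["status"] == "failed":
            raise RuntimeError(f"Provider reported failure: {state.get('reason')}")
        job.status = "processing"
        await asyncio.sleep(POLL_INTERVAL_S)

    raise TimeoutError(f"Job {job.job_id} did not complete within {POLL_TIMEOUT_S}s")
```

Both the interval and the timeout should be configurable per platform — a 10-second poll interval is wasteful for a 2-minute HeyGen render and too coarse for a 30-second Runway one.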

Quality Validation

Every generated video requires validation before it enters the delivery pipeline. "The API returned a 200" is not sufficient. Specifically:

Resolution check: Use ffprobe to read the video metadata and confirm the output resolution matches the requested spec. A 1080p request that returns a 720p file is a quality failure.

import subprocess, json

def check_resolution(video_path: str, expected_width: int, expected_height: int) -> bool:
    result = subprocess.run(
        ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_streams", video_path],
        capture_output=True, text=True
    )
    if result.returncode != 0:
        return False  # unreadable file: json.loads on empty output would raise
    streams = json.loads(result.stdout).get("streams", [])
    for s in streams:
        if s.get("codec_type") == "video":
            return s["width"] >= expected_width and s["height"] >= expected_height
    return False

Duration check: Generated video duration should be within ±10% of the requested duration. An 8-second request that returns 3.2 seconds indicates a generation failure.

File size sanity: A video that is under 50KB is almost certainly corrupted or a placeholder error response. Log and discard.

MP4 playability: run ffprobe against the file and check the exit code. If ffprobe exits non-zero, the file is unplayable.
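The duration, file size, and playability checks can share one ffprobe pass. A minimal sketch, using the ±10% tolerance and 50KB floor described above:

```python
import json
import os
import subprocess

MIN_FILE_SIZE_BYTES = 50 * 1024   # below this, assume corruption or an error page
DURATION_TOLERANCE = 0.10         # accept +/- 10% of the requested duration

def validate_video(video_path: str, expected_duration_s: float) -> tuple[bool, str]:
    """Return (passed, reason). One ffprobe call covers playability and duration."""
    # File size sanity: tiny files are almost never real video
    if os.path.getsize(video_path) < MIN_FILE_SIZE_BYTES:
        return False, "file too small: likely corrupted or an error placeholder"

    # Playability: a non-zero ffprobe exit code means the container is unreadable
    result = subprocess.run(
        ["ffprobe", "-v", "error", "-print_format", "json",
         "-show_format", video_path],
        capture_output=True, text=True,
    )
    if result.returncode != 0:
        return False, f"ffprobe failed: {result.stderr.strip()}"

    # Duration: must land within tolerance of the requested length
    duration = float(json.loads(result.stdout)["format"]["duration"])
    lo = expected_duration_s * (1 - DURATION_TOLERANCE)
    hi = expected_duration_s * (1 + DURATION_TOLERANCE)
    if not (lo <= duration <= hi):
        return False, f"duration {duration:.1f}s outside {lo:.1f}-{hi:.1f}s"

    return True, "ok"
```

Returning a reason string rather than a bare bool pays off in the retry layer: a duration failure and a playability failure warrant different log messages, even if both trigger re-generation.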

Content check (optional): For pipelines that generate public-facing content, an automated content safety check (NSFW detection) should run before delivery. Most providers offer safety filtering as an API parameter; add a validation layer for anything that the provider's filter might miss.

Storage Architecture

Rule: never depend on the provider's hosted URL.

Every AI video provider hosts generated video on temporary storage. Veo returns a URI that expires. HeyGen's hosted video URL is valid for a limited window. Runway's output is similarly time-bounded.

If your pipeline does not download and re-store the video immediately upon generation, you risk:

  • URL expiry before downstream consumption
  • Provider storage failures or policy changes invalidating URLs
  • No redundancy if the provider has an outage during the window you need to serve the video

The production storage pattern:

  1. Generation completes → receive temporary provider URL
  2. Immediately download → stream to local storage or directly to R2/S3
  3. Generate your own CDN URL → Cloudflare R2, AWS CloudFront, or equivalent
  4. Record your URL, not the provider URL, in the database
import httpx, boto3

async def store_video(provider_url: str, job_id: str) -> str:
    # Download video from provider (body is read fully into memory;
    # use client.stream() for large files)
    async with httpx.AsyncClient() as client:
        response = await client.get(provider_url)
        response.raise_for_status()

    # Upload to R2 (boto3 is synchronous; offload to a thread executor
    # if this runs inside a busy event loop)
    s3 = boto3.client("s3", endpoint_url=R2_ENDPOINT)
    key = f"videos/{job_id}.mp4"
    s3.put_object(
        Bucket=R2_BUCKET,
        Key=key,
        Body=response.content,
        ContentType="video/mp4"
    )

    return f"https://cdn.yourdomain.com/{key}"

Cost Tracking

AI video generation has real, variable costs. At $0.35/second for Veo, a pipeline bug that queues 50 8-second clips instead of 5 costs $140 instead of $14. That bug will happen. The question is whether you find out via an alert or via your credit card statement.

Cost tracking requirements:

  1. Record cost per job at generation time: cost_usd = duration_seconds × platform_rate
  2. Track daily spend in a database table with date, platform, and total
  3. Set a ceiling — when daily spend exceeds a threshold, halt the queue and alert
  4. Weekly cost report — generate and post to Discord; know your spend trends before they become surprises
PLATFORM_RATES = {  # USD per second of generated video
    "veo": 0.35,
    "runway": 0.05,
    "heygen": 0.08,
    "kling": 0.02,
}

DAILY_BUDGET_USD = 50.0

class BudgetExceededError(Exception):
    pass

async def check_budget_and_record(platform: str, duration_s: float) -> float:
    # db and alert_discord are your persistence and alerting helpers
    cost = duration_s * PLATFORM_RATES[platform]
    today_spend = await db.get_today_spend()

    if today_spend + cost > DAILY_BUDGET_USD:
        await alert_discord(f"Budget ceiling hit: ${today_spend:.2f} + ${cost:.2f} > ${DAILY_BUDGET_USD}")
        raise BudgetExceededError("Daily budget ceiling reached — queue halted")

    await db.record_spend(platform=platform, cost=cost, duration=duration_s)
    return cost

Retry Logic

Three categories of failures require different retry approaches:

Transient API failures (5xx, network timeout): Exponential backoff. First retry at 2 seconds, second at 4 seconds, third at 8 seconds. After 3 retries, mark the job failed and alert.

Generation failures (job returns FAILED state): Re-queue with a modified prompt if the failure is likely prompt-related. Log the original prompt and failure reason. Retry once; if it fails again, alert and halt.

Quality failures (video fails validation): Discard and re-generate. Quality failures are often non-deterministic — the same prompt will produce a passing result on the next attempt. If quality fails 3 consecutive times, the prompt may be the issue; alert for human review.
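The transient-failure policy above — retries at 2, 4, and 8 seconds, then give up — can be sketched as a small async helper. `TransientError` here is a hypothetical exception your HTTP layer raises for 5xx responses and network timeouts:

```python
import asyncio
import random

class TransientError(Exception):
    """Hypothetical wrapper for retryable failures: 5xx responses, timeouts."""

async def with_backoff(coro_factory, max_retries: int = 3, base_delay_s: float = 2.0):
    """Run coro_factory(); on TransientError retry at 2s, 4s, 8s, then re-raise."""
    for attempt in range(max_retries + 1):
        try:
            return await coro_factory()
        except TransientError:
            if attempt == max_retries:
                raise  # out of retries: caller marks the job failed and alerts
            # small jitter avoids synchronized retry storms across workers
            delay = base_delay_s * (2 ** attempt) + random.uniform(0, 0.5)
            await asyncio.sleep(delay)
```

Taking a factory (a zero-argument callable that returns a fresh coroutine) rather than a coroutine object matters: a coroutine can only be awaited once, so each retry needs a new one.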

Complete Production Checklist

Before declaring a video pipeline production-ready:

Async job management

  • Job queue with persistence (not in-memory)
  • Polling loop with configurable interval and timeout
  • Worker pool with bounded concurrency

Quality validation

  • Resolution check via ffprobe
  • Duration check with tolerance
  • File size sanity check
  • MP4 playability check
  • Content safety check (if public-facing)

Storage

  • Immediate download on job completion
  • R2/S3 upload with consistent key scheme
  • CDN URL returned to caller, not provider URL
  • Storage failure handling (retry upload, alert on repeated failure)

Cost tracking

  • Cost recorded per job at completion
  • Daily spend tracking with ceiling
  • Alert on ceiling approach (80%) and hit (100%)
  • Weekly cost report to monitoring channel

Retry logic

  • Transient API failure: exponential backoff, 3 retries max
  • Generation failure: re-queue with modified prompt, one retry
  • Quality failure: re-generate, three retries, then alert

Monitoring

  • Discord alerts on any failure
  • Log file with timestamps at every stage
  • Watchdog monitors log staleness
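The log-staleness watchdog can be as simple as comparing the log file's mtime against a threshold. A sketch, with the 10-minute threshold as an assumed value to tune per pipeline:

```python
import os
import time

MAX_LOG_AGE_S = 10 * 60  # assumed threshold: no log writes for 10 minutes means stuck

def log_is_fresh(log_path: str) -> bool:
    """Return True if the log was written to recently; False if stale or missing.

    A healthy pipeline writes a log line at every stage, so a silent log
    means the pipeline has stalled even if no error was ever raised.
    """
    if not os.path.exists(log_path):
        return False
    age = time.time() - os.path.getmtime(log_path)
    return age <= MAX_LOG_AGE_S
```

Run this on a cron or timer outside the pipeline process itself — a watchdog that dies with the pipeline it watches is not a watchdog.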

Lesson 111 Drill

Implement the minimum viable production video pipeline:

  1. Submit a video generation job to one platform (Veo or Runway)
  2. Implement the async polling loop with a 15-minute timeout
  3. Validate the result: resolution, duration, file size
  4. Download to local storage
  5. Record: job_id, platform, cost, generation time, storage path

Run it 10 times with different prompts. Document every failure you encounter. Each failure is a production mitigation waiting to be added. Add them all before declaring the pipeline production-ready.