
Wrapping the Video Pipeline in a Queue Service

The pipeline runs on GPU in about 5 minutes. Serving it over a synchronous request still doesn't work. Here is how a Redis queue, FastAPI, and persistent workers turn a video pipeline into a usable service.

The talking-head pipeline is covered separately — the short version is LivePortrait for head motion, Wav2Lip for lip sync, GFPGAN for face restoration. The output is what I want. The problem is time.

On Mac with MPS, the full pipeline with GFPGAN takes 36 minutes. On an NVIDIA GPU — tested on an RTX 3060 — it drops to about 5 minutes with enhancement included. The GPU is what makes the pipeline production-viable, not an optimization on top of it.

But even at 5 minutes, a synchronous HTTP endpoint isn’t the right interface. The connection ties up the client, timeouts become a real concern, and retries create duplicate jobs. The fix is a job queue.

The Architecture

Figure: Request flow, from client to worker to output

The API accepts a POST to /generate — multipart with a face image and audio file. It pushes a job onto the Redis queue, returns a job_id, and closes the connection immediately. The client is done in milliseconds.
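
The enqueue step can be sketched roughly as below. This is a minimal illustration rather than the service's actual handler: `enqueue_job`, the `output_dir` argument, and the initial `"queued"` status are assumptions, and `r` stands for any client exposing redis-py style `set` and `rpush` methods. It assumes the uploaded files have already been saved to disk.

```python
import json
import uuid

QUEUE_KEY = "video_jobs"      # list the workers block on
JOB_TTL_SECS = 24 * 60 * 60   # 24-hour TTL


def enqueue_job(r, face_image_path, audio_path, output_dir, params=None):
    """Push one job onto the queue and return its job_id.

    Hypothetical helper: `r` is any client with redis-py style
    `set(name, value, ex=...)` and `rpush(name, value)` methods.
    """
    job_id = uuid.uuid4().hex
    job = {
        "job_id": job_id,
        "face_image_path": face_image_path,
        "audio_path": audio_path,
        "output_path": f"{output_dir}/{job_id}.mp4",
        "params": params or {},
    }
    # Assumed initial state, so a status lookup has something to report
    # before a worker picks the job up.
    r.set(f"job:{job_id}:status", "queued", ex=JOB_TTL_SECS)
    # RPUSH on this side plus BLPOP on the worker side gives FIFO ordering.
    r.rpush(QUEUE_KEY, json.dumps(job))
    return job_id
```

The handler only does two small Redis writes before returning the id, which is why the request finishes in milliseconds regardless of how long generation takes.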

Workers run independently. Each worker loads the models once at startup, then loops on blpop("video_jobs"), blocking until a job arrives. When one comes in, it runs the full pipeline and writes the output MP4 to shared storage. Status and result path go back into Redis with a 24-hour TTL.

The client polls /status/{job_id} until it sees completed, then fetches the file from /result/{job_id}.
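
On the client side, the polling loop might look like the sketch below. The shape of the status payload (a dict with a `"status"` field) and the `get_status` callable are assumptions; in practice `get_status` would wrap an HTTP GET to /status/{job_id}.

```python
import time


def wait_for_job(get_status, job_id, interval=2.0, timeout=900.0, sleep=time.sleep):
    """Poll until the job reaches a terminal state and return the last payload.

    `get_status(job_id)` is a caller-supplied function that returns a dict
    such as {"status": "processing"} (assumed response shape).
    """
    deadline = time.monotonic() + timeout
    while True:
        payload = get_status(job_id)
        if payload.get("status") in ("completed", "failed"):
            return payload
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"job {job_id} still {payload.get('status')!r} after {timeout}s"
            )
        sleep(interval)
```

Treating `failed` as terminal matters here: without it the client would keep polling a job that will never complete.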

The Worker Loop

The core of the worker is about ten lines:

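# r is the Redis client, lp the pipeline loaded at startup,
# JOB_TTL_SECS the 24-hour TTL in seconds.
while True:
    # Block until a job is available; blpop returns a (queue_name, payload) pair.
    _, raw = r.blpop("video_jobs")
    job = json.loads(raw)
    job_id = job["job_id"]

    r.set(f"job:{job_id}:status", "processing", ex=JOB_TTL_SECS)

    result = lp.run(
        source_image=job["face_image_path"],
        audio=job["audio_path"],
        output=job["output_path"],
        **job["params"],
    )

    # Write the output path before the status, so a client that sees
    # "completed" can always find the result.
    r.set(f"job:{job_id}:output", result["output_path"], ex=JOB_TTL_SECS)
    r.set(f"job:{job_id}:status", "completed", ex=JOB_TTL_SECS)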

blpop blocks until a job arrives, so there is no polling and no sleep loop. Status transitions are written back to Redis at each stage so the client always has a current view: processing → completed.
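
Reading that state back for a status lookup could look like the following sketch. The function name and the returned dict are illustrative assumptions; the key names match the ones the worker writes. redis-py returns bytes unless the client is created with `decode_responses=True`, so the helper handles both cases.

```python
def _text(value):
    """Decode a Redis reply to str; passes through str and None unchanged."""
    return value.decode("utf-8") if isinstance(value, bytes) else value


def read_job_state(r, job_id):
    """Return the current state of a job, or None if it is unknown or expired.

    Hypothetical helper: `r` is any client with a redis-py style `get`.
    """
    status = _text(r.get(f"job:{job_id}:status"))
    if status is None:
        return None
    state = {"job_id": job_id, "status": status}
    if status == "completed":
        state["output_path"] = _text(r.get(f"job:{job_id}:output"))
    elif status == "failed":
        state["error"] = _text(r.get(f"job:{job_id}:error"))
    return state
```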

What Happens on Failure

If the pipeline raises, the worker catches it, writes failed status and the error string back to Redis, and continues to the next job — the process doesn’t die.

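except Exception as e:
    # Record the error text first, then flip the status, so a client
    # that sees "failed" can always read the reason.
    r.set(f"job:{job_id}:error", str(e), ex=JOB_TTL_SECS)
    r.set(f"job:{job_id}:status", "failed", ex=JOB_TTL_SECS)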

Both the status and error keys carry the same 24-hour TTL as successful jobs, so /status/{job_id} returns the error message for the same window the client would poll in. After 24 hours everything expires and the job is gone. Upload files are deleted from disk in a finally block regardless of outcome.
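
Put together, the handling of a single job might be sketched like this. It is an illustration under assumptions, not the worker's actual code: `process_job` and `run_pipeline` are hypothetical names, with `run_pipeline` standing in for the real pipeline call, and `r` is any client with a redis-py style `set`.

```python
import os

JOB_TTL_SECS = 24 * 60 * 60  # 24-hour TTL


def process_job(r, job, run_pipeline):
    """Run one job, record the outcome in Redis, and clean up the uploads.

    Returns True on success and False on failure; it never raises for a
    pipeline error, so the calling loop can move on to the next job.
    """
    job_id = job["job_id"]
    r.set(f"job:{job_id}:status", "processing", ex=JOB_TTL_SECS)
    try:
        result = run_pipeline(
            source_image=job["face_image_path"],
            audio=job["audio_path"],
            output=job["output_path"],
            **job.get("params", {}),
        )
        # Detail key first, status second, so the status never points
        # at data that is not there yet.
        r.set(f"job:{job_id}:output", result["output_path"], ex=JOB_TTL_SECS)
        r.set(f"job:{job_id}:status", "completed", ex=JOB_TTL_SECS)
        return True
    except Exception as e:
        r.set(f"job:{job_id}:error", str(e), ex=JOB_TTL_SECS)
        r.set(f"job:{job_id}:status", "failed", ex=JOB_TTL_SECS)
        return False
    finally:
        # Uploaded inputs are removed whether the job succeeded or failed.
        for path in (job.get("face_image_path"), job.get("audio_path")):
            if path is None:
                continue
            try:
                os.remove(path)
            except FileNotFoundError:
                pass
```

The `finally` block runs after either branch returns, which is what keeps failed jobs from leaving their uploads behind on disk.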

Why Workers Load Models Once

Model loading is expensive. Each model takes several seconds to initialize on GPU. If the worker reloaded everything per job, a meaningful chunk of every run would be startup overhead rather than actual generation.

Persistent workers solve this. The worker process initializes once and stays alive, processing jobs as they arrive. Initialization cost is paid once per worker lifecycle, not once per job. For a pipeline that runs continuously at volume, that matters.
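
The load-once pattern itself is small. In the sketch below, `load_models` and `handle_job` are placeholder callables, and `max_jobs` is a hypothetical knob added only so the loop can be stopped in a test; a real worker would leave it at `None` and run until the process is stopped.

```python
import json


def run_worker(r, load_models, handle_job, max_jobs=None):
    """Load models once, then process jobs from the queue as they arrive.

    `r` is any client with a redis-py style blocking `blpop`.
    Returns the number of jobs handled (only reachable when max_jobs is set).
    """
    # Paid once per worker lifetime, not once per job.
    models = load_models()

    handled = 0
    while max_jobs is None or handled < max_jobs:
        # Blocks until a job is available; returns (queue_name, payload).
        _, raw = r.blpop("video_jobs")
        handle_job(models, json.loads(raw))
        handled += 1
    return handled
```

Every job after the first reuses the same `models` object, so the several seconds of initialization per model are spread across the whole lifetime of the worker.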

Two Dockerfiles

Production runs on a Windows server with an RTX 3060. The Dockerfile targets NVIDIA CUDA 12.1 + cuDNN, and workers get GPU access through the runtime: nvidia setting on the worker service in docker-compose.

Mac testing uses a separate Dockerfile.mac — python:3.11-slim, no CUDA, no GPU flags. The models run on CPU, which is slow, but fast enough to verify the pipeline end-to-end without needing the production machine. docker-compose.mac.yml keeps the two environments completely separate.

The split means local iteration doesn’t require GPU hardware, and the production image never carries anything it doesn’t need.