Architecture

Internal architecture for contributors and anyone who wants to understand how the system works.

Layer diagram

┌─────────────────────────────────────────────────────────────┐
│                        DSL Layer                            │
│    Pipeline.configure defines the DAG topology              │
│    Step keys are graph identity; job classes are impl       │
└─────────────────────────┬───────────────────────────────────┘

┌─────────────────────────▼───────────────────────────────────┐
│                  Validation Layer                           │
│    DAG validated at instantiation time, before DB writes    │
│    Cycles, unknown keys, duplicates rejected upfront        │
└─────────────────────────┬───────────────────────────────────┘

┌─────────────────────────▼───────────────────────────────────┐
│                     State Layer                             │
│    Postgres tables are the authoritative source of truth    │
│    coordination_status is the sole input for decisions      │
│    halt_triggered flag drives :halted derivation            │
└─────────────────────────┬───────────────────────────────────┘

┌─────────────────────────▼───────────────────────────────────┐
│                   Execution Layer                           │
│    One GoodJob::Batch per step                              │
│    User jobs enqueued via perform_later — fully untouched   │
│    Batch on_finish is the sole terminal signal              │
│    Enqueue is transactionally coupled to row transition     │
└─────────────────────────┬───────────────────────────────────┘

┌─────────────────────────▼───────────────────────────────────┐
│                  Coordination Layer                         │
│    Coordinator owns ALL coordination_status transitions     │
│    Atomic row-locked transitions (FOR UPDATE SKIP LOCKED)   │
│    Explicit transaction boundaries per atomic unit          │
│    recompute_pipeline_status is the sole derivation path    │
└─────────────────────────┬───────────────────────────────────┘

┌─────────────────────────▼───────────────────────────────────┐
│                    Chain Layer                              │
│    .then() wires pipeline-level DAG dependencies            │
│    Same coordinator pattern, one level up                   │
└─────────────────────────────────────────────────────────────┘
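The validation layer's upfront checks (duplicate keys, unknown dependency keys, cycles) can be sketched in plain Ruby. This is an illustration of the approach, not the gem's actual internals; the `validate_dag!` name and the edge format are invented for the example:

```ruby
# Illustrative sketch of instantiation-time DAG validation, rejecting
# duplicates, unknown keys, and cycles before any database write.
# edges: array of [step_key, [dependency_keys]] pairs
def validate_dag!(edges)
  keys = edges.map(&:first)
  dupes = keys.tally.select { |_, n| n > 1 }.keys
  raise ArgumentError, "duplicate step keys: #{dupes}" if dupes.any?

  deps = edges.to_h
  unknown = deps.values.flatten.uniq - keys
  raise ArgumentError, "unknown dependency keys: #{unknown}" if unknown.any?

  # Three-color depth-first search for cycle detection.
  state = Hash.new(:unvisited)
  visit = lambda do |key|
    raise ArgumentError, "cycle involving #{key}" if state[key] == :visiting
    next if state[key] == :done
    state[key] = :visiting
    deps[key].each { |d| visit.call(d) }
    state[key] = :done
  end
  keys.each { |k| visit.call(k) }
  true
end
```

Because this runs before any row is inserted, a malformed pipeline definition fails fast with no partial state to clean up.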

Data model

GoodPipeline uses four Postgres tables:

good_pipeline_pipelines

| Column | Type | Notes |
| --- | --- | --- |
| id | uuid | Primary key |
| type | string | Pipeline class name (e.g. "VideoProcessingPipeline") |
| params | jsonb | Arguments passed to .run() |
| status | string | pending, running, succeeded, failed, halted, skipped |
| halt_triggered | boolean | Set to true when :halt strategy is applied |
| good_job_batch_id | uuid | Pipeline-level GoodJob::Batch for grouping |
| on_failure_strategy | string | halt, continue, or ignore |
| callbacks_dispatched_at | timestamp | Exactly-once callback dispatch guard |
| created_at | timestamp | |
| updated_at | timestamp | |

good_pipeline_steps

| Column | Type | Notes |
| --- | --- | --- |
| id | uuid | Primary key |
| pipeline_id | uuid | Foreign key |
| key | string | Step key — graph identity |
| job_class | string | ActiveJob class name |
| params | jsonb | Arguments passed to with: |
| coordination_status | string | pending, enqueued, succeeded, failed, skipped |
| on_failure_strategy | string | Step-level override (nullable) |
| enqueue_options | jsonb | Options passed to job.enqueue() (queue, priority, wait, etc.) |
| good_job_batch_id | uuid | Step's own GoodJob::Batch |
| good_job_id | uuid | GoodJob record ID (nil until enqueued) |
| attempts | integer | Execution attempt count |
| error_class | string | Terminal failure error class |
| error_message | text | Terminal failure error message |
| created_at | timestamp | |
| updated_at | timestamp | |

Unique constraint: (pipeline_id, key) — enforces step key uniqueness within a pipeline.

good_pipeline_dependencies

| Column | Type | Notes |
| --- | --- | --- |
| id | bigint | Primary key |
| pipeline_id | uuid | Denormalized for fast querying |
| step_id | uuid | The dependent step |
| depends_on_step_id | uuid | The step that must complete first |

good_pipeline_chains

| Column | Type | Notes |
| --- | --- | --- |
| id | uuid | Primary key |
| upstream_pipeline_id | uuid | The pipeline that must finish first |
| downstream_pipeline_id | uuid | The pipeline to start after |

One batch per step

Each step has its own GoodJob::Batch. The user's job is enqueued via perform_later into that batch, preserving all ActiveJob semantics (instrumentation, callbacks, serialization, queue routing, retries, discard_on).

When the batch's on_finish fires, StepFinishedJob receives the signal and delegates to the coordinator. StepFinishedJob is a thin dispatcher — it does not own any state transitions.

The coordinator

The Coordinator class is the sole owner of all coordination_status transitions. Its complete_step method uses explicit transaction boundaries around three atomic units:

  1. Terminal step transition — marks the step succeeded or failed with metadata
  2. Halt propagation — sets halt_triggered and skips all pending steps (if :halt strategy)
  3. Downstream unblocking — checks and enqueues each downstream step independently via try_enqueue_step, which acquires a per-step row lock

These three units are intentionally not wrapped in a single outer transaction. Holding locks across multiple downstream step enqueues would be a bottleneck under high parallelism.

After the three units complete, complete_step calls recompute_pipeline_status to derive the pipeline's terminal state from the current database state.

Terminal state derivation

Pipeline terminal status is never inferred from a single event. It is always derived by recompute_pipeline_status, which reads all step coordination_status values and the halt_triggered flag:

| Condition | Derived status |
| --- | --- |
| Any step is pending or enqueued | Not terminal — still running |
| All steps terminal, none failed | succeeded |
| All steps terminal, at least one failed, halt_triggered is true | halted |
| All steps terminal, at least one failed, halt_triggered is false | failed |

This function is safe to call from multiple code paths (coordinator, batch reconciliation) because it is idempotent on terminal pipelines.
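The derivation rules reduce to a small pure function over the current step statuses. The sketch below mirrors the table; the function name and return convention (nil while non-terminal) are illustrative, not the gem's actual recompute_pipeline_status implementation:

```ruby
# Pure-function sketch of the terminal-state derivation table
# (illustrative; not the gem's actual internals).
TERMINAL = %w[succeeded failed skipped].freeze

# Returns the derived pipeline status, or nil while still running.
def derive_pipeline_status(step_statuses, halt_triggered:)
  return nil unless step_statuses.all? { |s| TERMINAL.include?(s) }
  return "succeeded" unless step_statuses.include?("failed")
  halt_triggered ? "halted" : "failed"
end
```

Because the result depends only on the stored statuses and the halt_triggered flag, calling it twice on the same database state yields the same answer, which is what makes the multi-caller idempotence property hold.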

Enqueue transaction contract

The transition of a step from pending to enqueued and the insertion of the corresponding GoodJob record happen inside a single database transaction. This is possible because GoodJob stores jobs in Postgres — the same database as GoodPipeline's tables.

If the transaction rolls back, the step status transition and the GoodJob record insertion are both undone atomically. No stuck-enqueued steps, no ghost jobs.

Concurrency safety

The double-enqueue problem

If two upstream steps of a shared downstream step complete simultaneously, both coordinator invocations may try to enqueue it. GoodPipeline prevents this with:

  1. FOR UPDATE SKIP LOCKED — one coordinator acquires the row lock; the other skips silently
  2. good_job_id null guard — a non-null good_job_id is conclusive proof the step was already enqueued (only valid inside a row lock)
  3. Status guard — the coordinator checks coordination_status == "pending" inside the lock
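The interaction of these three guards can be shown with a rough in-memory analogy. This is a sketch, not the gem's code: Mutex#try_lock stands in for FOR UPDATE SKIP LOCKED, and a struct field stands in for the good_job_id column, re-checked only while the lock is held:

```ruby
# In-memory analogy for the double-enqueue guards (illustrative only):
# try_lock plays the role of SKIP LOCKED; good_job_id is the null guard.
require "securerandom"

Step = Struct.new(:coordination_status, :good_job_id, :lock)

def try_enqueue_step(step)
  # Guard 1: one caller gets the lock; the other skips silently.
  return :skipped unless step.lock.try_lock
  begin
    # Guard 2: null guard, only meaningful while the lock is held.
    return :already_enqueued if step.good_job_id
    # Guard 3: status guard, also checked inside the lock.
    return :not_pending unless step.coordination_status == "pending"
    step.good_job_id = SecureRandom.uuid   # "insert" the job record
    step.coordination_status = "enqueued"
    :enqueued
  ensure
    step.lock.unlock
  end
end

step = Step.new("pending", nil, Mutex.new)
results = 2.times.map { Thread.new { try_enqueue_step(step) } }.map(&:value)
```

However the two callers interleave, exactly one of them observes a nil good_job_id with pending status under the lock, so exactly one enqueue happens.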

Callback exactly-once

dispatch_callbacks_once uses a FOR UPDATE locked transaction with the callbacks_dispatched_at timestamp as a guard. Even if recompute_pipeline_status is called concurrently from multiple code paths, the callback bundle fires exactly once.
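The same locked check-and-set pattern can be sketched in memory. Again an analogy rather than the gem's code: Mutex#synchronize stands in for the FOR UPDATE transaction, and a plain hash field for the callbacks_dispatched_at column:

```ruby
# Illustrative analogy for dispatch_callbacks_once (not the gem's code):
# a locked check-and-set on the dispatched-at guard makes the dispatch
# exactly-once no matter how many code paths race to trigger it.
pipeline = { callbacks_dispatched_at: nil, lock: Mutex.new }
fired = []

dispatch_callbacks_once = lambda do
  pipeline[:lock].synchronize do                 # FOR UPDATE stand-in
    next if pipeline[:callbacks_dispatched_at]   # guard: already dispatched
    pipeline[:callbacks_dispatched_at] = Time.now
    fired << :callback_bundle                    # the bundle fires here
  end
end

4.times.map { Thread.new { dispatch_callbacks_once.call } }.each(&:join)
```

All four racing callers serialize on the lock; only the first sees a nil timestamp, so the bundle fires once.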

Retry model

GoodPipeline never inspects exceptions during retry attempts. It only responds to the terminal signal from the step batch's on_finish callback:

| GoodJob outcome | GoodPipeline response |
| --- | --- |
| Job completed successfully | Step → succeeded |
| Job raised, retries remaining | No action — on_finish hasn't fired |
| Job exhausted retries | Step → failed |
| Job discarded via discard_on | Step → failed |

This ensures a step is never prematurely marked failed on attempt 1 of 5.
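The mapping above amounts to a tiny case analysis. A pure-Ruby sketch (outcome symbols invented for illustration; the gem reacts to the batch signal rather than calling a function like this directly):

```ruby
# Sketch of the retry-model table (illustrative symbols, not gem API).
# nil means "no action": on_finish has not fired, so GoodPipeline waits.
def step_response(outcome)
  case outcome
  when :succeeded                     then "succeeded"
  when :retries_remaining             then nil
  when :retries_exhausted, :discarded then "failed"
  end
end
```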

Design decisions

  1. Postgres only -- all state in Postgres, which is what makes atomic enqueue transactions possible
  2. One batch per step -- user jobs are enqueued via perform_later, so all ActiveJob semantics (instrumentation, callbacks, retries, discard_on) work as expected
  3. Terminal signal comes from batch.succeeded?, not exception rescue
  4. coordination_status is the sole decision input -- the coordinator reads only this column
  5. :halted is policy-driven -- set via halt_triggered flag, not pattern-derived
  6. The coordinator owns all transitions; StepFinishedJob is a thin dispatcher
  7. Separate atomic units per transaction boundary to minimize lock contention
  8. DAG validation runs at instantiation, before any database writes
  9. failure_strategy and on_failure are distinct concepts -- strategy vs. callback, no naming collision

Why these tradeoffs

GoodPipeline is intentionally GoodJob-specific and Postgres-only. This is what enables atomic enqueue transactions — step status transitions and GoodJob record inserts happen in a single database transaction, eliminating an entire class of partial-state bugs that adapter-agnostic gems must work around.

The DAG execution model (vs. strictly sequential steps) adds coordination complexity — row locks, atomic counters, fan-in race prevention — but unlocks parallel execution of independent steps. Where branches of the graph are independent, wall-clock time is bounded by the longest path through the graph, not the sum of all step durations.
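The wall-clock claim can be made concrete with a toy critical-path calculation. Step names and durations here are invented for illustration:

```ruby
# Toy critical-path calculation (invented steps and durations, in seconds).
# Parallel wall-clock time is bounded by the longest dependency path;
# sequential execution costs the sum of all steps.
durations = { transcode: 60, thumbnail: 5, captions: 20, publish: 2 }
deps      = { transcode: [], thumbnail: [:transcode],
              captions:  [], publish:   [:thumbnail, :captions] }

# Memoized longest path ending at each step.
longest = Hash.new do |memo, key|
  memo[key] = durations[key] + (deps[key].map { |d| memo[d] }.max || 0)
end

critical_path = durations.keys.map { |k| longest[k] }.max  # 67
sequential    = durations.values.sum                       # 87
```

Here transcode → thumbnail → publish is the critical path (60 + 5 + 2 = 67s), while captions runs in parallel; a sequential runner would take 87s.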

The four-table data model (pipelines, steps, dependencies, chains) is more tables than a two-table approach, but dedicated dependency and chain tables enable efficient graph queries and keep the step table free of self-referential joins.

Released under the MIT License.