Pipeline Chaining
Pipelines can be wired together into pipeline-level DAGs. A downstream pipeline starts only after all its upstream pipelines succeed.
Serial chain
VideoProcessingPipeline
.run(video_id: 123)
.then(NotificationPipeline, with: { video_id: 123 })NotificationPipeline starts only after VideoProcessingPipeline succeeds.
Multi-step serial chain
VideoProcessingPipeline
.run(video_id: 123)
.then(QualityCheckPipeline, with: { video_id: 123 })
.then(NotificationPipeline, with: { video_id: 123 })Each pipeline waits for the previous one to succeed.
Fan-out
Multiple downstream pipelines start in parallel when the upstream succeeds:
VideoProcessingPipeline
.run(video_id: 123)
.then(
[NotificationPipeline, with: { video_id: 123 }],
[AnalyticsPipeline, with: { video_id: 123 }]
)Both NotificationPipeline and AnalyticsPipeline start simultaneously.
Fan-out then fan-in
VideoProcessingPipeline
.run(video_id: 123)
.then(QualityCheckPipeline, with: { video_id: 123 })
.then(
[NotificationPipeline, with: { video_id: 123 }],
[AnalyticsPipeline, with: { video_id: 123 }]
)
.then(ArchivePipeline, with: { video_id: 123 })VideoProcessingPipeline
↓
QualityCheckPipeline
↓
┌────┴────┐
↓ ↓
Notification Analytics
└────┬────┘
↓
ArchivePipelineArchivePipeline waits for both NotificationPipeline and AnalyticsPipeline to succeed.
Parallel start
Run multiple pipelines in parallel from the start using GoodPipeline.run:
GoodPipeline.run(
[VideoProcessingPipeline, with: { video_id: 123 }],
[AudioProcessingPipeline, with: { audio_id: 456 }]
).then(MergeMediaPipeline, with: { video_id: 123, audio_id: 456 })Both pipelines start immediately. MergeMediaPipeline waits for both to succeed.
Pipeline chaining is a first-class primitive — upstream/downstream relationships are tracked in a dedicated database table with atomic state propagation, rather than manually creating the next workflow in the last step of the current one.
How .then works internally
.then returns a GoodPipeline::Chain object which:
- Creates downstream pipeline records with
status: pending— params are stored immediately at chain registration time - Creates
good_pipeline_chainsrows linking upstream to downstream pipeline IDs - If any upstream has already reached a terminal state, immediately triggers chain propagation so the downstream is started or skipped
- After any upstream pipeline reaches a terminal state, the chain coordinator checks if all upstreams for each downstream have succeeded
- If all upstreams succeeded, the downstream pipeline starts (root steps are enqueued)
- If any upstream fails, halts, or is skipped, the downstream pipeline is set to
skipped
The chain coordinator uses the same atomic row-locking pattern (FOR UPDATE SKIP LOCKED) as the step-level coordinator to prevent double-start races. This means .then is safe to call at any time — even after the upstream has already completed.
Failure propagation
If any upstream pipeline in a chain fails, halts, or is skipped:
- The downstream pipeline transitions to
skipped - Any further downstream pipelines are also recursively
skipped on_completecallbacks fire on skipped pipelines, buton_failuredoes not — being skipped is not considered a failure
A (failed) → B (skipped) → C (skipped) → D (skipped)