Defining Pipelines

Pipeline class structure

Every pipeline is a subclass of `GoodPipeline::Pipeline` that implements `configure`:

```ruby
class VideoProcessingPipeline < GoodPipeline::Pipeline
  description "Downloads, transcodes and publishes a video"
  failure_strategy :halt
  coordination_queue_name "video_coordination"
  callback_queue_name "video_callbacks"

  on_complete :notify
  on_success  :celebrate
  on_failure  :alert

  def configure(video_id:)
    run :download,  DownloadJob,  with: { video_id: video_id }
    run :transcode, TranscodeJob, after: :download
    run :thumbnail, ThumbnailJob, after: :download
    run :publish,   PublishJob,   after: %i[transcode thumbnail]
    run :cleanup,   CleanupJob,   after: :publish
  end

  private

  def notify = Rails.logger.info("Pipeline complete")
  def celebrate = Rails.logger.info("All steps succeeded!")
  def alert = Rails.logger.warn("Pipeline had failures")
end
```

Class-level methods:

| Method | Purpose | Default |
| --- | --- | --- |
| `display_name` | Override the pipeline name shown on the dashboard | `nil` (falls back to `underscore.titleize`) |
| `description` | Human-readable label for the dashboard | `nil` |
| `failure_strategy` | Pipeline-level failure handling policy | `:halt` |
| `on_complete` | Callback for any terminal state | `nil` |
| `on_success` | Callback for `succeeded` | `nil` |
| `on_failure` | Callback for `failed` or `halted` | `nil` |
| `coordination_queue_name` | Queue for `StepFinishedJob` and `PipelineReconciliationJob` | `"good_pipeline_coordination"` |
| `callback_queue_name` | Queue for `PipelineCallbackJob` | `"good_pipeline_callbacks"` |
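The three callbacks map onto terminal states: `on_complete` fires for every terminal state, `on_success` only for `succeeded`, and `on_failure` for `failed` or `halted`. As a rough sketch of that dispatch logic (plain Ruby, not the gem's internals):

```ruby
# Hypothetical sketch of callback dispatch -- not GoodPipeline's actual code.
# Returns the list of callbacks that would fire for a given terminal status.
def dispatch_callbacks(status, on_complete:, on_success:, on_failure:)
  fired = []
  fired << on_complete if on_complete  # fires for any terminal state
  case status
  when :succeeded
    fired << on_success if on_success
  when :failed, :halted
    fired << on_failure if on_failure
  end
  fired
end

dispatch_callbacks(:succeeded, on_complete: :notify, on_success: :celebrate, on_failure: :alert)
# => [:notify, :celebrate]
```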

DSL verbs

There are two DSL verbs: `run` for defining steps, and `branch` for conditional paths. See Conditional Branching for the `branch` DSL.

The run DSL verb

All DAG topology is expressed through `after:` edges:

```ruby
run :step_key, JobClass,
  with:       { keyword: args },               # keyword args passed to the job's perform method
  after:      :other_step,                     # dependency step key(s) (symbol or array of symbols)
  on_failure: :ignore,                         # step-level failure strategy override
  enqueue:    { queue: :media, priority: 10 }  # options passed to job.enqueue
```

Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| `key` | Symbol | Yes | Unique step identifier within this pipeline |
| `job_class` | Class | Yes | The ActiveJob class to execute |
| `with:` | Hash | No | Keyword arguments forwarded to the job's `perform` method |
| `after:` | Symbol or Array | No | Step key(s) this step depends on |
| `on_failure:` | Symbol | No | Override: `:halt`, `:continue`, or `:ignore` |
| `enqueue:` | Hash | No | Options passed through to `job.enqueue` (see below) |

Enqueue options

The `enqueue:` hash supports any option that ActiveJob's `enqueue` method accepts, except `wait_until`:

| Key | Type | Description |
| --- | --- | --- |
| `queue` | String/Symbol | Queue name for this step's job |
| `priority` | Integer | Priority for this step's job |
| `wait` | Numeric | Delay in seconds after dependencies are satisfied |
| `good_job_labels` | Array | GoodJob labels for the job |
| `good_job_notify` | Boolean | Whether GoodJob emits a NOTIFY event |

`wait_until` is not supported because absolute times don't survive JSONB serialization and are semantically wrong in a DAG context — the step may not be enqueued until minutes or hours after the pipeline is created.
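The serialization half of that argument is easy to demonstrate in plain Ruby, independent of GoodPipeline: a `Time` round-tripped through JSON comes back as a plain `String`.

```ruby
require "json"

# A Time value degrades to its string form on the way through JSON,
# so the original Time object cannot be recovered after deserialization.
payload  = { wait_until: Time.utc(2025, 1, 1, 12, 0, 0) }
restored = JSON.parse(payload.to_json)

restored["wait_until"].class  # => String -- the Time class is lost
```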

Step keys vs job classes

The step key is the graph identity — it's what you reference in `after:`. The job class is the implementation — what code runs:

| Concept | Purpose | Example |
| --- | --- | --- |
| Step key | Graph identity — used in `after:` | `:transcode_720p` |
| Job class | Implementation — what code runs | `TranscodeJob` |

The same job class can appear multiple times in one pipeline under different keys:

```ruby
run :resize_small, ResizeImageJob, with: { size: "small" }
run :resize_large, ResizeImageJob, with: { size: "large" }
run :combine,      CombineJob,     after: [:resize_small, :resize_large]
```

DAG topology via after:

All topology flows from `after:`. Steps with no `after:` are root steps and start immediately. Steps with `after:` wait for all listed dependencies to be satisfied.
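As a sketch of the idea (plain Ruby, not the gem's implementation), both root steps and step readiness fall directly out of the `after:` edges:

```ruby
# Hypothetical sketch: deriving roots and readiness from after: edges.
edges = {
  download:  [],
  transcode: [:download],
  thumbnail: [:download],
  publish:   [:transcode, :thumbnail],
}

# Root steps have no dependencies and start immediately.
roots = edges.select { |_step, deps| deps.empty? }.keys
# => [:download]

# A step becomes ready once every one of its dependencies has succeeded.
succeeded = [:download, :transcode]
ready = edges.select do |step, deps|
  !succeeded.include?(step) && deps.any? && deps.all? { |d| succeeded.include?(d) }
end.keys
# => [:thumbnail]  (:publish still waits on :thumbnail)
```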

Fan-out

One step feeds into multiple parallel steps:

```ruby
run :download,  DownloadJob
run :transcode, TranscodeJob, after: :download
run :thumbnail, ThumbnailJob, after: :download
```

```
   :download
 ┌────┴────┐
 ↓         ↓
:transcode :thumbnail
```

Fan-in

Multiple steps converge into one:

```ruby
run :transcode, TranscodeJob, after: :download
run :thumbnail, ThumbnailJob, after: :download
run :publish,   PublishJob,   after: [:transcode, :thumbnail]
```

`:publish` waits for both `:transcode` and `:thumbnail` to succeed.

Complex DAGs

Combine fan-out and fan-in freely:

```ruby
def configure(video_id:)
  run :download,  DownloadJob,  with: { video_id: video_id }
  run :transcode, TranscodeJob, after: :download
  run :thumbnail, ThumbnailJob, after: :download
  run :publish,   PublishJob,   after: [:transcode, :thumbnail]
  run :cleanup,   CleanupJob,   after: :publish
end
```

```
   :download
 ┌────┴────┐
 ↓         ↓
:transcode :thumbnail
 └────┬────┘
      ↓
  :publish
      ↓
  :cleanup
```
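Any valid `after:` graph admits a topological order — dependencies always run before their dependents, and a cycle is an error. A quick sketch of linearizing the example DAG with Ruby's standard-library `TSort` (not library code; GoodPipeline's scheduler works from the edges directly):

```ruby
require "tsort"

# Hypothetical sketch: linearizing after: edges with TSort.
# tsort lists each step's dependencies (children) before the step itself,
# and raises TSort::Cyclic if the after: edges form a cycle.
class StepGraph
  include TSort

  def initialize(edges)
    @edges = edges
  end

  def tsort_each_node(&block)
    @edges.each_key(&block)
  end

  def tsort_each_child(step, &block)
    @edges.fetch(step, []).each(&block)
  end
end

edges = {
  download:  [],
  transcode: [:download],
  thumbnail: [:download],
  publish:   [:transcode, :thumbnail],
  cleanup:   [:publish],
}

StepGraph.new(edges).tsort
# => [:download, :transcode, :thumbnail, :publish, :cleanup]
```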

Running a pipeline

```ruby
# Fire and forget
VideoProcessingPipeline.run(video_id: 123)

# Capture the result for monitoring or chaining
pipeline = VideoProcessingPipeline.run(video_id: 123)
pipeline.id      # => "uuid-string"
pipeline.status  # => "running"
```

`.run` returns a `GoodPipeline::Chain` object that delegates common methods (`id`, `status`, `params`, `steps`, etc.) to the underlying pipeline record.
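The delegation pattern itself is standard Ruby. A rough sketch with `Forwardable` (the actual `GoodPipeline::Chain` may be built differently; `PipelineRecord` here is a stand-in for the persisted record):

```ruby
require "forwardable"

# Hypothetical stand-in for the persisted pipeline record.
PipelineRecord = Struct.new(:id, :status, :params, keyword_init: true)

# Sketch of a result object that forwards common readers to the record.
class Chain
  extend Forwardable

  def_delegators :@record, :id, :status, :params

  def initialize(record)
    @record = record
  end
end

chain = Chain.new(
  PipelineRecord.new(id: "uuid-string", status: "running", params: { video_id: 123 })
)
chain.status  # => "running"
```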

Definition freeze semantics

Once a pipeline instance is created and validated:

  1. The graph topology is immutable — steps and edges are written to the database and cannot be modified
  2. All step definitions are frozen
  3. The params hash is frozen

This ensures the pipeline definition is a snapshot — later changes to the pipeline class don't affect running instances.
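The params freeze relies on plain `Object#freeze` semantics: mutating a frozen hash raises `FrozenError`, as this standalone illustration shows.

```ruby
# Plain-Ruby illustration of freeze semantics on a params hash.
params = { video_id: 123 }.freeze

params.frozen?  # => true

begin
  params[:video_id] = 456   # any mutation of a frozen hash raises
rescue FrozenError => e
  e.class  # => FrozenError
end
```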

Released under the MIT License.