
Worker Pipeline

How the orchestrator chains workers into a multi-step flow, and how to add a step.

Today the pipeline is:

implementation ──▶ bug-hunter ──▶ (end)

implementation opens a PR, then bug-hunter reviews that PR and comments on it. Each step runs as its own ECS worker and reports back to the orchestrator.

How a step triggers the next one

The orchestrator owns a single SQS queue. External producers (core, Slack handlers) drop the initial task on it; the orchestrator's consumer reads it and launches the matching worker. Follow-up steps take the same path: the orchestrator enqueues the next task on that queue, and its own consumer launches it.
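A minimal in-memory model of that path, as a sketch only: `Task`, `TaskQueue`, `launchWorker`, and `consumeOnce` are hypothetical stand-ins for the SQS message, the queue, the ECS launch, and one pass of the orchestrator's consumer. The external producer and the orchestrator's follow-up both go through `send`; only the consumer launches anything.

```typescript
// Hypothetical message shape; the real BackgroundTask carries more fields.
interface Task {
  taskType: string;
  payload: Record<string, unknown>;
}

// In-memory stand-in for the orchestrator's single SQS queue.
class TaskQueue {
  private messages: Task[] = [];

  send(task: Task): void {
    this.messages.push(task);
  }

  receive(): Task | undefined {
    return this.messages.shift();
  }

  get size(): number {
    return this.messages.length;
  }
}

// The only place a worker is started (stands in for ECS RunTask).
const launched: string[] = [];
function launchWorker(task: Task): void {
  launched.push(task.taskType);
}

// One pass of the consumer: read a message, launch the matching worker.
function consumeOnce(queue: TaskQueue): boolean {
  const task = queue.receive();
  if (!task) return false;
  launchWorker(task);
  return true;
}

const queue = new TaskQueue();

// An external producer (core, a Slack handler) drops the initial task.
queue.send({ taskType: 'implementation', payload: {} });
consumeOnce(queue);

// On implementation's callback, the orchestrator enqueues the follow-up
// on the same queue rather than launching it itself.
queue.send({ taskType: 'bug-hunter', payload: {} });
consumeOnce(queue);

console.log(launched); // [ 'implementation', 'bug-hunter' ]
```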

(Diagram: worker callback and advancePipeline flow)

Why enqueue instead of launching directly?

The orchestrator is the launcher, so enqueueing onto its own queue looks like a detour. It isn't — the queue gives follow-up steps the same properties the initial task gets:

  • Backpressure: the consumer only launches when concurrency.hasAvailableSlot() is true (at most MAX_CONCURRENT_WORKERS workers run at once). If every slot is taken, the message simply waits in the queue.
  • Durability & retry: if RunTask fails or the orchestrator restarts mid-flight, the message has not been deleted, so SQS redelivers it after its visibility timeout; a message that keeps failing moves to the DLQ instead of being lost.
  • One launch path: every worker (initial and follow-up) is born the same way, with the same observability and concurrency tracking.
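The backpressure and retry behavior above can be sketched with a simplified queue model. This is an illustration under assumptions, not the orchestrator's consumer: `ConcurrencyTracker`, `pollOnce`, and the `maxReceiveCount` of 3 are hypothetical, and real SQS redelivery is driven by the visibility timeout rather than by pushing the message back onto an array.

```typescript
interface Message {
  id: string;
  taskType: string;
  receiveCount: number;
}

// Hypothetical tracker mirroring concurrency.hasAvailableSlot() and MAX_CONCURRENT_WORKERS.
class ConcurrencyTracker {
  private running = 0;
  constructor(private readonly max: number) {}
  hasAvailableSlot(): boolean {
    return this.running < this.max;
  }
  acquire(): void {
    this.running++;
  }
  release(): void {
    this.running--;
  }
}

// One simplified consumer poll. A message is "deleted" only after a successful launch.
function pollOnce(
  queue: Message[],
  dlq: Message[],
  slots: ConcurrencyTracker,
  launch: (m: Message) => void,
  maxReceiveCount = 3,
): void {
  // Backpressure: with no free slot, nothing is received; messages wait in the queue.
  if (!slots.hasAvailableSlot()) return;
  const msg = queue.shift();
  if (!msg) return;
  msg.receiveCount++;
  try {
    launch(msg);
    slots.acquire();
  } catch {
    // Launch failed: the message comes back for another attempt, or goes to
    // the DLQ once it has been received maxReceiveCount times.
    (msg.receiveCount >= maxReceiveCount ? dlq : queue).push(msg);
  }
}

const queue: Message[] = [
  { id: 'a', taskType: 'implementation', receiveCount: 0 },
  { id: 'b', taskType: 'bug-hunter', receiveCount: 0 },
];
const dlq: Message[] = [];
const slots = new ConcurrencyTracker(1); // MAX_CONCURRENT_WORKERS = 1 for the demo
const started: string[] = [];
const launch = (m: Message): void => {
  started.push(m.id);
};

pollOnce(queue, dlq, slots, launch); // launches 'a' and fills the only slot
pollOnce(queue, dlq, slots, launch); // no slot: 'b' stays queued
console.log(started, queue.length); // [ 'a' ] 1
slots.release(); // 'a' finishes
pollOnce(queue, dlq, slots, launch); // now 'b' launches

// A launch that always fails ends up in the DLQ after three attempts.
const failing: Message[] = [{ id: 'c', taskType: 'bug-hunter', receiveCount: 0 }];
const failingDlq: Message[] = [];
for (let i = 0; i < 3; i++) {
  pollOnce(failing, failingDlq, new ConcurrencyTracker(5), () => {
    throw new Error('RunTask failed');
  });
}
```

Acquiring the slot only after a successful launch keeps a failed RunTask from leaking capacity; a real consumer with concurrent polls would likely need to reserve the slot first.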

The single source of truth: orchestrator/src/pipeline.ts

The flow is declarative. PIPELINE maps a completed step's taskType to its successor:

```typescript
const PIPELINE: Record<string, StageTransition | undefined> = {
  implementation: {
    next: 'bug-hunter',
    build: (stored, result) => ({ payload: { repo, prNumber, ... }, callbacks: { ... } }),
  },
  // 'bug-hunter' has no entry → terminal step.
};
```
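A typed, runnable version of that table may make the contract of `build` clearer. The shapes of `StoredTask`, `WorkerResult`, and `BackgroundTask`, the `resolveRepo` helper and its "owner/name" rule, and the assumption that the repo sits on the stored payload while `prNumber` comes from the result are all hypothetical; the real types live in the orchestrator and carry more fields.

```typescript
// Hypothetical shapes; the real types live in the orchestrator.
interface StoredTask {
  taskType: string;
  payload: Record<string, unknown>;
}
interface WorkerResult {
  prNumber?: number;
}
interface BackgroundTask {
  taskType: string;
  payload: Record<string, unknown>;
  callbacks: Record<string, unknown>;
}
interface StageTransition {
  next: string;
  // Returns null to skip the transition (guard not passed).
  build: (stored: StoredTask, result: WorkerResult) => Omit<BackgroundTask, 'taskType'> | null;
}

// Hypothetical resolution rule: the stored payload carries the repo as "owner/name".
function resolveRepo(stored: StoredTask): string | null {
  const repo = stored.payload.repo;
  return typeof repo === 'string' && repo.includes('/') ? repo : null;
}

const PIPELINE: Record<string, StageTransition | undefined> = {
  implementation: {
    next: 'bug-hunter',
    build: (stored, result) => {
      const repo = resolveRepo(stored);
      // Guard: continue only if a PR was opened and the repo is resolvable.
      if (result.prNumber === undefined || repo === null) return null;
      return { payload: { repo, prNumber: result.prNumber }, callbacks: {} };
    },
  },
  // 'bug-hunter' has no entry → terminal step.
};

const stored: StoredTask = { taskType: 'implementation', payload: { repo: 'acme/api' } };
console.log(PIPELINE.implementation?.build(stored, { prNumber: 12 })?.payload);
// { repo: 'acme/api', prNumber: 12 }
console.log(PIPELINE.implementation?.build(stored, {})); // null (no PR opened)
```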

Two functions consume it:

  • nextStage(stored, result) — pure. Returns the next BackgroundTask to enqueue, or null if the step is terminal, unknown, or its build guard didn't pass (e.g. an implementation that opened no PR). No I/O, so it's directly unit-tested.
  • advancePipeline({ sqs, queueUrl, stored, result }) — calls nextStage and, if there's a next step, enqueues it. Best-effort: it never throws, so a failed enqueue is logged but can never break the completing worker's callback.
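The split between the pure and the effectful function could look roughly like the sketch below. It rests on stated assumptions: the `sqs` argument is modeled as a hypothetical one-method `QueueSender` so the example runs without AWS (with the AWS SDK for JavaScript v3 the equivalent call would be `client.send(new SendMessageCommand({ QueueUrl, MessageBody }))`), and the task shapes and the single table entry are simplified.

```typescript
interface StoredTask { taskType: string; payload: Record<string, unknown>; }
interface WorkerResult { prNumber?: number; }
interface BackgroundTask { taskType: string; payload: Record<string, unknown>; callbacks: Record<string, unknown>; }
interface StageTransition {
  next: string;
  build: (stored: StoredTask, result: WorkerResult) => Omit<BackgroundTask, 'taskType'> | null;
}

// Hypothetical stand-in for the SQS client: send a message body to a queue URL.
interface QueueSender {
  send(queueUrl: string, body: string): Promise<void>;
}

const PIPELINE: Record<string, StageTransition | undefined> = {
  implementation: {
    next: 'bug-hunter',
    build: (stored, result) =>
      result.prNumber === undefined
        ? null
        : { payload: { repo: stored.payload.repo, prNumber: result.prNumber }, callbacks: {} },
  },
};

// Pure: reads only the table, so it can be unit-tested directly.
function nextStage(stored: StoredTask, result: WorkerResult): BackgroundTask | null {
  // Own-property lookup, so a taskType like "toString" is treated as unknown.
  const transition = Object.prototype.hasOwnProperty.call(PIPELINE, stored.taskType)
    ? PIPELINE[stored.taskType]
    : undefined;
  if (!transition) return null; // terminal or unknown step
  const built = transition.build(stored, result);
  return built ? { taskType: transition.next, ...built } : null; // null: guard did not pass
}

// Effectful, best-effort: never rejects, so the caller's callback cannot break.
async function advancePipeline(args: {
  sqs: QueueSender;
  queueUrl: string;
  stored: StoredTask;
  result: WorkerResult;
}): Promise<void> {
  try {
    const next = nextStage(args.stored, args.result);
    if (!next) return; // nothing to enqueue
    await args.sqs.send(args.queueUrl, JSON.stringify(next));
  } catch (err) {
    console.error('advancePipeline: enqueue failed', err); // logged, then swallowed
  }
}
```

Because `nextStage` only reads the table, a test can assert on its return value directly; `advancePipeline` can then be exercised with a fake sender, including one whose `send` rejects, to confirm the returned promise still resolves.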

The callback handler is flow-agnostic — it just calls advancePipeline(...) and knows nothing about which step follows which.

Two structural guarantees

  • No self-amplification. A step with no PIPELINE entry is terminal. bug-hunter has none, so a bug-hunter completion can never enqueue another worker.
  • Guarded transitions. A transition's build returns null to skip — e.g. implementation → bug-hunter only fires when the result actually produced a PR and the repo is resolvable.
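The first guarantee holds only while the table stays acyclic: if an edit ever pointed a later step back at an earlier one (say `security-scan` → `implementation`), completions could chain indefinitely. A check along these lines could sit next to the transition matrix in a test. `findCycle` and the `Edges` type are hypothetical, and only the `next` edges of the table are considered, since guards can skip a transition but cannot add one.

```typescript
// Minimal view of the table: only step → successor edges matter here.
type Edges = Record<string, { next: string } | undefined>;

// Follows `next` from every entry; returns the first cycle found, or null.
function findCycle(pipeline: Edges): string[] | null {
  for (const start of Object.keys(pipeline)) {
    const path: string[] = [];
    const seen = new Set<string>();
    let step: string | undefined = start;
    while (step !== undefined) {
      if (seen.has(step)) return [...path, step];
      seen.add(step);
      path.push(step);
      const entry = Object.prototype.hasOwnProperty.call(pipeline, step) ? pipeline[step] : undefined;
      step = entry?.next; // undefined once a step has no entry (terminal)
    }
  }
  return null;
}

const PIPELINE: Edges = {
  implementation: { next: 'bug-hunter' },
  // 'bug-hunter' has no entry → terminal step.
};

console.log(findCycle(PIPELINE)); // null
console.log(findCycle({ a: { next: 'b' }, b: { next: 'a' } })); // [ 'a', 'b', 'a' ]
```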

Adding a step

Say you want bug-hunter → security-scan → (end). You touch one file, pipeline.ts:

```typescript
const PIPELINE: Record<string, StageTransition | undefined> = {
  implementation: { next: 'bug-hunter', build: ... }, // unchanged
  'bug-hunter': { // was terminal, now transitions
    next: 'security-scan',
    build: (stored, result) => ({
      payload: { /* derive from bug-hunter's stored task/result */ },
      callbacks: { ... },
    }),
  },
  // 'security-scan' has no entry → new terminal step.
};
```
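To see the extended table end to end, this sketch walks the flow by feeding each enqueued task back in as the next stored task. It assumes, hypothetically, that bug-hunter's stored payload carries the `repo` and `prNumber` the implementation transition put there, and that only implementation's result matters; the types and the `nextStage` body are simplified stand-ins.

```typescript
interface StoredTask { taskType: string; payload: Record<string, unknown>; }
type WorkerResult = Record<string, unknown>;
interface BackgroundTask { taskType: string; payload: Record<string, unknown>; callbacks: Record<string, unknown>; }
interface StageTransition {
  next: string;
  build: (stored: StoredTask, result: WorkerResult) => Omit<BackgroundTask, 'taskType'> | null;
}

const PIPELINE: Record<string, StageTransition | undefined> = {
  implementation: {
    next: 'bug-hunter',
    build: (stored, result) =>
      typeof result.prNumber === 'number'
        ? { payload: { repo: stored.payload.repo, prNumber: result.prNumber }, callbacks: {} }
        : null,
  },
  'bug-hunter': {
    next: 'security-scan',
    // Assumption: forward what the implementation transition stored on bug-hunter's task.
    build: (stored) => {
      const { repo, prNumber } = stored.payload;
      return typeof repo === 'string' && typeof prNumber === 'number'
        ? { payload: { repo, prNumber }, callbacks: {} }
        : null;
    },
  },
  // 'security-scan' has no entry → new terminal step.
};

function nextStage(stored: StoredTask, result: WorkerResult): BackgroundTask | null {
  const t = Object.prototype.hasOwnProperty.call(PIPELINE, stored.taskType)
    ? PIPELINE[stored.taskType]
    : undefined;
  if (!t) return null; // terminal or unknown
  const built = t.build(stored, result);
  return built ? { taskType: t.next, ...built } : null;
}

// Walk the flow: each enqueued task becomes the next completed one.
const visited: string[] = [];
const results: Record<string, WorkerResult> = { implementation: { prNumber: 12 } };
let current: StoredTask | null = { taskType: 'implementation', payload: { repo: 'acme/api' } };
while (current) {
  visited.push(current.taskType);
  current = nextStage(current, results[current.taskType] ?? {});
}
console.log(visited.join(' → ')); // implementation → bug-hunter → security-scan
```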

Then the usual worker wiring (unchanged by the pipeline refactor):

  1. Register the type in SUPPORTED_TASK_TYPES (sqsConsumer.ts) and resolveTaskDefinition (workerLauncher.ts).
  2. Add a case to notifier.notify if the step needs origin notifications.
  3. Add the worker itself under workers/<name>/ and a dispatch case in workers/src/main.ts.
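Step 1 is easy to forget, because nothing in nextStage checks that a successor is registered. One optional safeguard is a test that every step named in PIPELINE is both consumable and launchable. The `Set` shape of SUPPORTED_TASK_TYPES, the `string | undefined` return of resolveTaskDefinition, and the task definition names below are hypothetical; the real ones may be an array, or a function that throws on unknown types.

```typescript
// Hypothetical registrations (sqsConsumer.ts / workerLauncher.ts stand-ins).
const SUPPORTED_TASK_TYPES = new Set(['implementation', 'bug-hunter', 'security-scan']);

function resolveTaskDefinition(taskType: string): string | undefined {
  switch (taskType) {
    case 'implementation':
      return 'implementation-worker'; // hypothetical task definition names
    case 'bug-hunter':
      return 'bug-hunter-worker';
    case 'security-scan':
      return 'security-scan-worker';
    default:
      return undefined;
  }
}

const PIPELINE: Record<string, { next: string } | undefined> = {
  implementation: { next: 'bug-hunter' },
  'bug-hunter': { next: 'security-scan' },
};

// Every step the pipeline mentions, as a source or a successor, must be registered.
function missingRegistrations(): string[] {
  const steps = new Set<string>();
  for (const [from, t] of Object.entries(PIPELINE)) {
    steps.add(from);
    if (t) steps.add(t.next);
  }
  return Array.from(steps).filter(
    (s) => !SUPPORTED_TASK_TYPES.has(s) || resolveTaskDefinition(s) === undefined,
  );
}

console.log(missingRegistrations()); // []
```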

The callback handler and advancePipeline need no changes.

Tests

  • orchestrator/test/pipeline.test.ts — the transition matrix (pure nextStage) and the enqueue side-effects + no-throw behavior (advancePipeline).
  • workers/bug-hunter/test/* — the bug-hunter worker lifecycle.

Local note

With ENV=local the orchestrator's WorkerLauncher skips ECS, so an enqueued follow-up is stored but not auto-launched — run that worker manually (see workers/bug-hunter/README.md). The transition logic itself is fully exercised by pipeline.test.ts without any infrastructure.