Stage

A Stage is the fundamental processing unit of a pipeline.
It is a function I => Yield[I, O, E] that transforms a single input value into a Yield: an optional output, a Status, and an Evolution that decides what the pipeline looks like for the next run.

Stages are contravariant in their input type I and covariant in their output type O and error type E. This variance enables safe composition: a stage that accepts a wider input type can be used wherever a stage with a narrower input type is expected, and the output of one stage can flow into any downstream stage whose input type is the same type or a supertype of it.
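The variance rules can be sketched without the library. The block below is a minimal illustration that assumes a hypothetical TinyStage trait carrying the same variance annotations on input and output. It is not the library's Stage definition, which also has the error type E and returns a Yield rather than a bare value.

```scala
// Hypothetical stand-in for illustration only (not the library's Stage):
// contravariant in the input type I, covariant in the output type O.
trait TinyStage[-I, +O] {
  def run(in: I): O
}

object VarianceSketch {
  // Accepts any value and produces a String.
  val describe: TinyStage[Any, String] = new TinyStage[Any, String] {
    def run(in: Any): String = s"value: $in"
  }

  // Contravariance in I: a stage accepting Any can stand in for one accepting Int.
  // Covariance in O: a stage producing String can stand in for one producing CharSequence.
  val narrowed: TinyStage[Int, CharSequence] = describe

  val result: String = narrowed.run(42).toString
}
```

The assignment to narrowed compiles only because of the two variance annotations. Removing either the - or the + makes it a type error.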

import h8io.stages.*

The Lifecycle: apply and skip

Every Stage participates in exactly one of two paths during a pipeline run.

apply — the active path. The stage receives a real input value and returns a Yield. It is free to perform side effects, update internal state, and produce any output it likes.

skip — the bypassed path. Any stage that participates in a pipeline run but does not process the current input must take this path: it returns the Evolution it would have returned had it run — without performing any side effects or consuming any value. Common triggers are an upstream stage producing Yield.None (nothing to pass downstream) or a non-inclusive binary operation that excludes this branch.

skip exists precisely so that stages further downstream still get a chance to evolve correctly even when they are not directly executed. The Diagram explains this with a concrete example involving Stage 3-1, which is skipped in the first generation but still needs to evolve into Stage 3-2.

Here is a minimal stage that doubles its input on the active path and supplies its evolution on the skipped path:

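object Double extends Stage[Int, Int, Nothing] {
  // Double is stateless, so its evolution always hands back Double itself.
  private def evo: Evolution[Int, Int, Nothing] = new Evolution[Int, Int, Nothing] {
    override def evolve(status: Status[?]): Stage[Int, Int, Nothing] = Double
    override def dispose(): Unit = () // no resources to release
  }

  // Active path: double the input and report success.
  override def apply(in: Int): Yield[Int, Int, Nothing] =
    Yield.Some(in * 2, Status.Success, evo)

  // Skipped path: no input is consumed; only the evolution is supplied.
  override def skip(): Evolution[Int, Int, Nothing] = evo
}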

Double(7)
// res0: Yield[Int, Int, Nothing] = Some(
//   out = 14,
//   status = Success,
//   evolution = repl.MdocSession$MdocApp$Double$$anon$1@1e1ab27c
// )
Double.skip()
// res1: Evolution[Int, Int, Nothing] = repl.MdocSession$MdocApp$Double$$anon$1@4dc935a0

Building Pipelines with ~>

~> composes two stages into a Stage.AndThen: the output of the left stage becomes the input of the right stage. The result is itself a Stage, so further stages can be appended with additional ~> calls:

object ToString extends Stage[Int, String, Nothing] {
  private def evo: Evolution[Int, String, Nothing] = new Evolution[Int, String, Nothing] {
    override def evolve(status: Status[?]): Stage[Int, String, Nothing] = ToString
    override def dispose(): Unit = ()
  }

  override def apply(in: Int): Yield[Int, String, Nothing] =
    Yield.Some(in.toString, Status.Success, evo)

  override def skip(): Evolution[Int, String, Nothing] = evo
}

object Shout extends Stage[String, String, Nothing] {
  private def evo: Evolution[String, String, Nothing] = new Evolution[String, String, Nothing] {
    override def evolve(status: Status[?]): Stage[String, String, Nothing] = Shout
    override def dispose(): Unit = ()
  }

  override def apply(in: String): Yield[String, String, Nothing] =
    Yield.Some(in.toUpperCase + "!", Status.Success, evo)

  override def skip(): Evolution[String, String, Nothing] = evo
}

val pipeline = Double ~> ToString ~> Shout
// pipeline: Stage[Int, String, Nothing] = AndThen(
//   upstream = AndThen(upstream = <function1>, downstream = <function1>),
//   downstream = <function1>
// )

The static type of pipeline is Stage[Int, String, Nothing]: it accepts the input of the first stage and produces the output of the last one. The intermediate types (the Int passed from Double to ToString and the String passed from ToString to Shout) do not appear in the composed type.

Running the pipeline produces a single Yield that combines the status and evolution of all three stages:

pipeline(5)
// res2: Yield[Int, String, Nothing] = Some(
//   out = "10!",
//   status = Success,
//   evolution = AndThen(
//     upstream = repl.MdocSession$MdocApp$Shout$$anon$3@2708355d,
//     downstream = AndThen(
//       upstream = repl.MdocSession$MdocApp$ToString$$anon$2@2ef3db97,
//       downstream = repl.MdocSession$MdocApp$Double$$anon$1@7a7b57af
//     )
//   )
// )

When the upstream stage produces Yield.None, the downstream stage is not applied. Instead it takes the skip path, and the Evolution it returns is wired into the combined evolution so that the stage is still in place when the next input arrives.
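The skip behavior on an empty upstream result can be modelled in a few lines. This is a simplified sketch, not the library's Stage.AndThen: MiniStage, MiniYield, Produced, Empty, MiniEvolution and andThen are hypothetical stand-ins, the Status is left out, and the evolution is reduced to a trace recording which path each stage took.

```scala
// Hypothetical, simplified stand-ins for illustration; not the library's types.
final case class MiniEvolution(trace: List[String])

sealed trait MiniYield[+O] { def evolution: MiniEvolution }
final case class Produced[+O](out: O, evolution: MiniEvolution) extends MiniYield[O]
final case class Empty(evolution: MiniEvolution) extends MiniYield[Nothing]

trait MiniStage[-I, +O] { self =>
  def apply(in: I): MiniYield[O]
  def skip(): MiniEvolution

  // Sketch of sequential composition: upstream first, then downstream.
  def andThen[O2](next: MiniStage[O, O2]): MiniStage[I, O2] = new MiniStage[I, O2] {
    def apply(in: I): MiniYield[O2] = self(in) match {
      case Produced(out, up) =>
        next(out) match {
          case Produced(o2, down) => Produced(o2, MiniEvolution(up.trace ++ down.trace))
          case Empty(down)        => Empty(MiniEvolution(up.trace ++ down.trace))
        }
      case Empty(up) =>
        // Upstream produced nothing: downstream is skipped, not applied,
        // but its evolution still joins the combined result.
        Empty(MiniEvolution(up.trace ++ next.skip().trace))
    }
    def skip(): MiniEvolution = MiniEvolution(self.skip().trace ++ next.skip().trace)
  }
}

object SkipSketch {
  var downstreamApplied = 0 // counts real applications of the downstream stage

  // Passes even numbers through and yields nothing for odd ones.
  val dropOdd: MiniStage[Int, Int] = new MiniStage[Int, Int] {
    def apply(in: Int): MiniYield[Int] =
      if (in % 2 == 0) Produced(in, MiniEvolution(List("dropOdd:apply")))
      else Empty(MiniEvolution(List("dropOdd:apply")))
    def skip(): MiniEvolution = MiniEvolution(List("dropOdd:skip"))
  }

  val show: MiniStage[Int, String] = new MiniStage[Int, String] {
    def apply(in: Int): MiniYield[String] = {
      downstreamApplied += 1
      Produced(in.toString, MiniEvolution(List("show:apply")))
    }
    def skip(): MiniEvolution = MiniEvolution(List("show:skip"))
  }

  val pipeline: MiniStage[Int, String] = dropOdd.andThen(show)
}
```

In this model, SkipSketch.pipeline(4) applies both stages, while SkipSketch.pipeline(3) applies only dropOdd and records show:skip in the combined trace without running show's body.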

Terminal Execution with execute

execute is the one-shot path: it applies the stage to an input, immediately disposes the Evolution, and returns a plain Outcome with no continuation.

It is the right choice when a single stage — or a composed pipeline — should run exactly once and the next generation is not needed:

val outcome = pipeline.execute(5)
// outcome: Outcome[String, Nothing] = Some(
//   out = "10!",
//   status = Success,
//   disposeFailure = None
// )

The disposal of the Evolution happens inside execute regardless of what the stage returned. If dispose() throws a non-fatal exception, the exception is captured in Outcome.disposeFailure and the outcome is still returned normally. Fatal exceptions are not caught and will propagate.
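The disposal rule can be illustrated with a small model built on scala.util.control.NonFatal from the standard library. This is a sketch under stated assumptions, not the library's implementation of execute: Disposable, MiniOutcome and executeOnce are hypothetical names, and the Status is omitted.

```scala
import scala.util.control.NonFatal

// Hypothetical stand-ins for illustration; not the library's Evolution or Outcome.
trait Disposable { def dispose(): Unit }
final case class MiniOutcome[+O](out: Option[O], disposeFailure: Option[Throwable])

object ExecuteSketch {
  // Runs `stage` once, always disposes the returned continuation, and records
  // a non-fatal dispose failure instead of rethrowing it.
  // Fatal errors are not matched by NonFatal and therefore propagate.
  def executeOnce[I, O](stage: I => (Option[O], Disposable))(in: I): MiniOutcome[O] = {
    val (out, evolution) = stage(in)
    val failure =
      try { evolution.dispose(); None }
      catch { case NonFatal(e) => Some(e) }
    MiniOutcome(out, failure)
  }
}

object ExecuteDemo {
  // A continuation whose disposal fails with an ordinary (non-fatal) exception.
  val failing: Disposable = new Disposable {
    def dispose(): Unit = throw new IllegalStateException("boom")
  }

  val outcome: MiniOutcome[Int] =
    ExecuteSketch.executeOnce((n: Int) => (Option(n * 2), failing))(5)
}
```

Here ExecuteDemo.outcome still carries the computed output, and the IllegalStateException thrown during disposal is stored in disposeFailure rather than thrown to the caller.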