Evolution
Evolution is the strategy that decides which Stage to use when the pipeline is ready to process the
next input.
Every Yield carries an Evolution, and when the pipeline has finished with one input it calls
evolve() on that Yield, which in turn calls the Evolution with the current Status.
Three ideas make Evolution work:
- a status-driven dispatch that selects the next stage;
- a disposal contract that ties resource cleanup to the lifetime of the producing stage;
- a composition mechanism that wires together the evolutions of all stages in a pipeline.
import h8io.stages.*
Selecting the Next Stage
evolve(status) returns the next Stage based on the given status.
It is not called by application code directly; Yield.evolve() passes the Yield's own status to it automatically.
A stage that does not change between generations can return itself regardless of the status:
object ParseInt extends Stage[String, Int, String] {
private val alwaysSelf = new Evolution[String, Int, String] {
override def evolve(status: Status[?]): Stage[String, Int, String] = ParseInt
override def dispose(): Unit = ()
}
override def apply(in: String): Yield[String, Int, String] =
in.toIntOption match {
case Some(n) => Yield.Some(n, Status.Success, alwaysSelf)
case None => Yield.None(Status.error(s"not a number: $in"), alwaysSelf)
}
override def skip(): Evolution[String, Int, String] = alwaysSelf
}
val okResult = ParseInt("42")
// okResult: Yield[String, Int, String] = Some(
// out = 42,
// status = Success,
// evolution = repl.MdocSession$MdocApp$ParseInt$$anon$1@63c54ef5
// )
val errResult = ParseInt("hi")
// errResult: Yield[String, Int, String] = None(
// status = Complete("not a number: hi"),
// evolution = repl.MdocSession$MdocApp$ParseInt$$anon$1@63c54ef5
// )
Calling evolve() on either result returns ParseInt again, since this stage always supplies itself as the
continuation:
okResult.evolve()
// res0: Stage[String, Int, String] = <function1>
errResult.evolve()
// res1: Stage[String, Int, String] = <function1>
Disposing Resources
dispose() releases all resources held by the stage that produced this Evolution.
After dispose() is called the producing stage must be considered permanently unusable — it must not be
applied or skipped again.
Two situations guarantee dispose() will be called:
Stage.execute()calls it immediately after the stage runs, sinceexecuteis a terminal operation and the continuation will never be needed.- If
evolvethrows aThrowable, the caller is still required to calldispose()so that nothing is leaked even when evolution itself fails.
Implementations that hold no external resources may leave dispose() as a no-op.
A stage that acquires a resource at construction time can release it in dispose():
class ResourceStage(name: String) extends Stage[String, String, Nothing] {
private var open = true
println(s"[$name] acquired")
private val evolution = new Evolution[String, String, Nothing] {
override def evolve(status: Status[?]): Stage[String, String, Nothing] =
ResourceStage.this
override def dispose(): Unit = {
open = false
println(s"[$name] released")
}
}
override def apply(in: String): Yield[String, String, Nothing] = {
require(open, s"[$name] already disposed")
Yield.Some(s"[$name]($in)", Status.Success, evolution)
}
override def skip(): Evolution[String, String, Nothing] = evolution
}
Stage.execute() triggers dispose() automatically after the stage runs:
new ResourceStage("conn").execute("hello")
// [conn] acquired
// [conn] released
// res2: Outcome[String, Nothing] = Some(
// out = "[conn](hello)",
// status = Success,
// disposeFailure = None
// )
When multiple stages are composed into a pipeline, both Evolution methods — evolve and dispose() — are called
in the order opposite to the order in which stages are applied: downstream first, then upstream.
This ensures that a downstream stage can still access anything the upstream stage provides right up until the moment
the upstream stage is torn down:
val pipeline = new ResourceStage("upstream") ~> new ResourceStage("downstream")
// [upstream] acquired
// [downstream] acquired
// pipeline: Stage[String, String, Nothing] = AndThen(
// upstream = <function1>,
// downstream = <function1>
// )
pipeline.execute("hello")
// [downstream] released
// [upstream] released
// res3: Outcome[String, Nothing] = Some(
// out = "[downstream]([upstream](hello))",
// status = Success,
// disposeFailure = None
// )
Composing Evolutions
When Stage.AndThen composes two stages via ~>, it also composes their Evolution values using compose.
The result is an Evolution.AndThen whose continuation for any status s is the sequential composition of the
corresponding continuations of both evolutions:
composed(s) == self(s) ~> that(s)
All Evolution method calls in Evolution.AndThen follow the same order: pipeline-downstream first, then
pipeline-upstream — the reverse of the order in which stages are applied.
This applies equally to evolve and dispose(), since both may release or transition resources held by the
producing stage.
The Diagram section on finalization walks through a concrete example of why this matters.
compose is an implementation detail of Stage.AndThen and is not part of the typical application-level API.
Adapting Stages with map
map produces a new Evolution whose continuation is the result of applying a function to the stage the original
evolution would have returned.
Disposal is delegated unchanged to the wrapped evolution.
This is used internally when a Yield.None propagates through Stage.AndThen: because no output was produced,
the downstream stage cannot be invoked immediately, so it is folded into the evolution via map so that the entire
composed pipeline will be applied when the next input arrives.