
Stage Design

Understanding Runtime’s composable stage architecture.

Stages are the core abstraction in Runtime. Every component that processes streaming data is a stage.

A stage transforms streaming elements:

```go
type Stage interface {
	Name() string
	Type() StageType
	Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
}
```

Stages connect via channels to form pipelines:

[Stage A] ──channel──▶ [Stage B] ──channel──▶ [Stage C]

Each stage runs in its own goroutine, enabling concurrent processing.

Applications need many features:

  • State management
  • Prompt assembly
  • Validation
  • LLM execution
  • Speech-to-text
  • Text-to-speech
  • Metrics
  • Error handling

Without stages, code becomes tangled:

```go
// Bad: everything in one function
func execute(sessionID string, audio []byte) []byte {
	// VAD detection
	if !detectSpeech(audio) {
		return nil
	}
	// Transcribe
	text := transcribe(audio)
	// Load state
	history := loadFromRedis(sessionID)
	// Call LLM
	response := openai.Complete(text, history)
	// Save state
	saveToRedis(sessionID, response)
	// Generate speech
	return synthesize(response)
}
```

Problems:

  • Hard to test individual components
  • Can’t reuse logic
  • No streaming support
  • Difficult to add features

With stages, concerns are separate and stream-capable:

```go
pipeline := stage.NewPipelineBuilder().
	Chain(
		stage.NewAudioTurnStage(vadConfig),
		stage.NewSTTStage(sttService, sttConfig),
		stage.NewStateStoreLoadStage(stateConfig),
		stage.NewProviderStage(provider, tools, policy, config),
		stage.NewIncrementalSaveStage(stateConfig),
		stage.NewTTSStage(ttsService, ttsConfig),
	).
	Build()
```

Each stage focuses on one concern, and data streams through the chain. For example:

  • StateStoreLoadStage: only loads conversation history
  • PromptAssemblyStage: only assembles prompts
  • ProviderStage: only calls the LLM and runs the tool loop
  • IncrementalSaveStage: only persists new messages

Validation is not a stage. It runs as ProviderHook and ToolHook chains invoked from inside ProviderStage (BeforeCall, AfterCall, and ChunkInterceptor for streaming). Validators come from three authoring sources: pack-declared validators, eval handlers used as guardrails, and hooks the user registers with WithProviderHook / WithToolHook. All three converge on a single hooks.Registry.

Stages combine easily:

```go
// Simple pipeline
pipeline := stage.NewPipelineBuilder().
	Chain(
		stage.NewPromptAssemblyStage(registry, task, vars),
		stage.NewProviderStage(provider, tools, policy, config),
	).
	Build()

// Add state management
pipeline = stage.NewPipelineBuilder().
	Chain(
		stage.NewStateStoreLoadStage(stateConfig), // Added
		stage.NewPromptAssemblyStage(registry, task, vars),
		stage.NewProviderStage(provider, tools, policy, config),
		stage.NewIncrementalSaveStage(stateConfig), // Added
	).
	Build()

// Add validation: register a hook on hooks.Registry. Validators run inside
// ProviderStage via the hook chain, so the pipeline above is unchanged.
hookRegistry.RegisterProviderHook(guardrails.NewGuardrailHook(validatorConfig))
```

Order matters, and you control it: stages run in the order they are passed to Chain.

```go
pipeline := stage.NewPipelineBuilder().
	Chain(
		stage.NewStateStoreLoadStage(stateConfig),               // 1. Load history
		stage.NewPromptAssemblyStage(registry, task, vars),      // 2. Assemble prompt
		stage.NewProviderStage(provider, tools, policy, config), // 3. Execute (hooks validate before/after)
		stage.NewIncrementalSaveStage(stateConfig),              // 4. Save state
	).
	Build()
```

Stages process elements as they flow, not in batches:

```go
func (s *MyStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
	defer close(output)
	for elem := range input {
		// Process each element immediately
		result := s.transform(elem)
		output <- result
	}
	return nil
}
```

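A transform stage converts elements one-to-one (or one-to-many):

```go
type UppercaseStage struct {
	stage.BaseStage
}

func NewUppercaseStage() *UppercaseStage {
	return &UppercaseStage{
		BaseStage: stage.NewBaseStage("uppercase", stage.StageTypeTransform),
	}
}

func (s *UppercaseStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
	defer close(output)
	for elem := range input {
		if elem.Text != nil {
			upper := strings.ToUpper(*elem.Text)
			elem.Text = &upper
		}
		output <- elem
	}
	return nil
}
```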

An accumulate stage collects multiple elements into one:

```go
type MessageCollectorStage struct {
	stage.BaseStage
}

func (s *MessageCollectorStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
	defer close(output)
	var messages []types.Message
	for elem := range input {
		if elem.Message != nil {
			messages = append(messages, *elem.Message)
		}
	}
	// Emit single element with all messages
	output <- stage.StreamElement{
		Metadata: map[string]interface{}{
			"messages": messages,
		},
	}
	return nil
}
```

A generator stage produces multiple elements from one input:

```go
type TokenizerStage struct {
	stage.BaseStage
}

func (s *TokenizerStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
	defer close(output)
	for elem := range input {
		if elem.Text != nil {
			// Split into tokens, emit each
			tokens := strings.Fields(*elem.Text)
			for _, token := range tokens {
				t := token
				output <- stage.StreamElement{Text: &t}
			}
		}
	}
	return nil
}
```

A sink stage is a terminal stage that consumes elements without producing any:

```go
type LoggerStage struct {
	stage.BaseStage
	logger *log.Logger
}

func (s *LoggerStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
	defer close(output)
	for elem := range input {
		s.logger.Printf("Element: %+v", elem)
		// Don't emit anything
	}
	return nil
}
```

| Stage | Purpose |
|-------|---------|
| StateStoreLoadStage | Load conversation history |
| IncrementalSaveStage | Persist new messages (via MessageAppender if available, otherwise full save) |
| PromptAssemblyStage | Assemble prompts from registry |
| TemplateStage | Variable substitution |
| ProviderStage | LLM execution with tool support; validation runs here as ProviderHook chains |

| Stage | Purpose |
|-------|---------|
| AudioTurnStage | VAD-based turn detection |
| STTStage | Speech-to-text transcription |
| TTSStage | Text-to-speech synthesis |
| TTSStageWithInterruption | TTS with barge-in support |
| DuplexProviderStage | Bidirectional WebSocket streaming |
| VADAccumulatorStage | Audio buffering with VAD |

| Stage | Purpose |
|-------|---------|
| RouterStage | Dynamic routing to multiple outputs |
| MergeStage | Combine multiple inputs |
| MetricsStage | Performance monitoring |
| TracingStage | Distributed tracing |
| PriorityChannel | Priority-based scheduling |

| Stage | Purpose |
|-------|---------|
| DebugStage | Pipeline debugging |
| VariableProviderStage | Dynamic variable resolution |
| MediaExternalizerStage | External media storage |
| ContextBuilderStage | Token budget management |

A custom stage embeds stage.BaseStage (constructed with a name and a stage type) and implements Process:

```go
type MyStage struct {
	stage.BaseStage
	config MyConfig
}

func NewMyStage(config MyConfig) *MyStage {
	return &MyStage{
		BaseStage: stage.NewBaseStage("my_stage", stage.StageTypeTransform),
		config:    config,
	}
}

func (s *MyStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
	defer close(output) // Always close output
	for elem := range input {
		// Transform element
		result := s.transform(elem)
		// Send with cancellation check
		select {
		case output <- result:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}
```
A stage can take its configuration through the constructor. This rate limiter wraps rate.Limiter from golang.org/x/time/rate:

```go
import "golang.org/x/time/rate"

type RateLimitConfig struct {
	RequestsPerSecond int
	BurstSize         int
}

type RateLimitStage struct {
	stage.BaseStage
	limiter *rate.Limiter
}

func NewRateLimitStage(config RateLimitConfig) *RateLimitStage {
	return &RateLimitStage{
		BaseStage: stage.NewBaseStage("rate_limit", stage.StageTypeTransform),
		limiter:   rate.NewLimiter(rate.Limit(config.RequestsPerSecond), config.BurstSize),
	}
}

func (s *RateLimitStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
	defer close(output)
	for elem := range input {
		// Wait blocks until a token is available; it returns an error if ctx is done.
		if err := s.limiter.Wait(ctx); err != nil {
			return err
		}
		output <- elem
	}
	return nil
}
```
Errors can be handled per element or for the whole pipeline:

```go
func (s *MyStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
	defer close(output)
	for elem := range input {
		result, err := s.process(elem)
		if err != nil {
			// Option 1: send an error element downstream and keep going.
			output <- stage.NewErrorElement(err)
			continue

			// Option 2: return the error to stop the pipeline instead.
			// return err
		}
		output <- result
	}
	return nil
}
```

Stages that prepare input:

  • StateStoreLoadStage: Load conversation history
  • PromptAssemblyStage: Build prompt from registry
  • TemplateStage: Substitute variables

Input validation runs inside ProviderStage via ProviderHook.BeforeCall, not as a separate stage.

Stages that handle output:

  • IncrementalSaveStage: Persist new messages
  • MetricsStage: Record performance
  • DebugStage: Log for debugging

Output validation runs inside ProviderStage via ProviderHook.AfterCall (and ChunkInterceptor for streaming), not as a separate stage.

Split a stream into multiple destinations:

```go
pipeline := stage.NewPipelineBuilder().
	Chain(
		stage.NewProviderStage(provider, tools, policy, config),
	).
	Branch("provider", "tts", "logger").
	Build()
```

Merge multiple streams into one:

```go
mergeStage := stage.NewMergeStage(inputChannels...)
```
A complete pipeline orders its stages deliberately:

```go
pipeline := stage.NewPipelineBuilder().
	Chain(
		// 1. Load state first
		stage.NewStateStoreLoadStage(stateConfig),
		// 2. Resolve variables
		stage.NewVariableProviderStage(providers),
		// 3. Assemble prompt
		stage.NewPromptAssemblyStage(registry, task, vars),
		// 4. Apply templates
		stage.NewTemplateStage(),
		// 5. Execute LLM. Input/output validation runs here as ProviderHook chains
		// (BeforeCall / AfterCall / ChunkInterceptor), not as a separate stage.
		stage.NewProviderStage(provider, tools, policy, config),
		// 6. Save state
		stage.NewIncrementalSaveStage(stateConfig),
	).
	Build()
```

  • State early: load history before prompt assembly
  • Variables before templates: resolve values, then substitute
  • Validation via hooks: pre- and post-call validation runs inside ProviderStage through the hook registry, not as separate stages
  • Save last: persist after all processing is complete

Channel buffer size trades latency against throughput:

```go
config := stage.DefaultPipelineConfig().
	WithChannelBufferSize(32) // Larger buffer = higher throughput, more latency
```

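Each stage runs in its own goroutine, so stages execute concurrently: while a heavy stage (such as an LLM call) works on one element, lighter stages keep processing other elements, up to the capacity of the channel buffers between them.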

Slow consumers automatically throttle producers: once a channel's buffer is full, the producer's send blocks until the consumer catches up.
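The backpressure behavior can be checked with plain channels. This sketch (sendsBeforeBlocking is a hypothetical helper, and it does not use Runtime's pipeline config) probes with non-blocking sends to count how many elements a producer can hand off to a consumer that is not reading. The count equals the buffer size, which is why the buffer size bounds how far a producer can run ahead.

```go
package main

import "fmt"

// Element is a simplified stand-in for StreamElement.
type Element struct{ Text string }

// sendsBeforeBlocking counts how many sends succeed on a channel that
// nobody reads, stopping at the first send that would block.
func sendsBeforeBlocking(bufferSize int) int {
	ch := make(chan Element, bufferSize)
	sent := 0
	for {
		select {
		case ch <- Element{Text: "x"}:
			sent++
		default:
			// A plain `ch <- ...` would block here: that is backpressure.
			return sent
		}
	}
}

func main() {
	for _, size := range []int{0, 1, 32} {
		fmt.Println(size, sendsBeforeBlocking(size))
	}
	// 0 0
	// 1 1
	// 32 32
}
```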

Test stages in isolation:

```go
func TestUppercaseStage(t *testing.T) {
	s := NewUppercaseStage()
	input := make(chan stage.StreamElement, 1)
	output := make(chan stage.StreamElement, 1)
	go s.Process(context.Background(), input, output)

	text := "hello"
	input <- stage.StreamElement{Text: &text}
	close(input)

	result := <-output
	assert.Equal(t, "HELLO", *result.Text)
}
```

Test stages in pipeline:

```go
func TestPipeline(t *testing.T) {
	pipeline := stage.NewPipelineBuilder().
		Chain(
			NewUppercaseStage(),
			NewTrimStage(),
		).
		Build()

	text := " hello "
	input := make(chan stage.StreamElement, 1)
	input <- stage.StreamElement{Text: &text}
	close(input)

	output, err := pipeline.Execute(context.Background(), input)
	if err != nil {
		t.Fatalf("Execute: %v", err)
	}
	result := <-output
	assert.Equal(t, "HELLO", *result.Text)
}
```

Stage design provides:

  • Single Responsibility: Each stage has one job
  • Composability: Mix and match stages
  • Streaming: Process elements as they flow
  • Concurrency: Stages run in parallel
  • Testability: Test stages independently
  • Extensibility: Easy to add custom stages