Stage Design

Understanding Runtime’s composable stage architecture.

Stages are the core abstraction in Runtime. Every component that processes streaming data is a stage.

A stage transforms streaming elements:

type Stage interface {
    Name() string
    Type() StageType
    Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
}
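
The StreamElement flowing between stages carries optional payloads. The examples in this guide touch only a few of its fields; the sketch below is inferred from those examples and is not the full definition, which may include additional fields (audio, errors, and so on):

// Sketch of the StreamElement fields used in this guide (inferred from the
// examples below; the actual struct in Runtime may define more fields).
type StreamElement struct {
    Text     *string                // text payload, if present
    Message  *types.Message         // structured conversation message, if present
    Metadata map[string]interface{} // arbitrary per-element metadata
}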

Stages connect via channels to form pipelines:

[Stage A] ──channel──▶ [Stage B] ──channel──▶ [Stage C]

Each stage runs in its own goroutine, enabling concurrent processing.
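
To make that wiring concrete, here is a minimal sketch of how two stages could be connected by hand. The PipelineBuilder does this for you; the buffer size and error collection below are illustrative assumptions, not Runtime's actual internals:

// Hand-wiring two stages (illustrative only). Each stage runs in its own
// goroutine; the channel between them carries the stream.
func wireTwoStages(ctx context.Context, a, b Stage, in <-chan StreamElement) (<-chan StreamElement, <-chan error) {
    mid := make(chan StreamElement, 8) // assumed buffer size
    out := make(chan StreamElement, 8)
    errs := make(chan error, 2)

    go func() {
        if err := a.Process(ctx, in, mid); err != nil {
            errs <- err
        }
    }()
    go func() {
        if err := b.Process(ctx, mid, out); err != nil {
            errs <- err
        }
    }()

    return out, errs
}

Because each Process implementation closes its output channel when its input is exhausted, closing the source channel shuts the chain down in order.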

Applications need many features:

  • State management
  • Prompt assembly
  • Validation
  • LLM execution
  • Speech-to-text
  • Text-to-speech
  • Metrics
  • Error handling

Without stages, code becomes tangled:

// Bad: Everything in one function
func execute(sessionID string, audio []byte) []byte {
    // VAD detection
    if !detectSpeech(audio) {
        return nil
    }
    // Transcribe
    text := transcribe(audio)
    // Load state
    history := loadFromRedis(sessionID)
    // Call LLM
    response := openai.Complete(text, history)
    // Save state
    saveToRedis(sessionID, response)
    // Generate speech
    return synthesize(response)
}

Problems:

  • Hard to test individual components
  • Can’t reuse logic
  • No streaming support
  • Difficult to add features

With stages, concerns are separate and stream-capable:

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewAudioTurnStage(vadConfig),
        stage.NewSTTStage(sttService, sttConfig),
        stage.NewStateStoreLoadStage(stateConfig),
        stage.NewProviderStage(provider, tools, policy, config),
        stage.NewStateStoreSaveStage(stateConfig),
        stage.NewTTSStage(ttsService, ttsConfig),
    ).
    Build()

Each stage focuses on one thing, and data streams through.

Each stage has a single responsibility:

  • StateStoreLoadStage: only loads conversation history
  • PromptAssemblyStage: only assembles prompts
  • ValidationStage: only validates content
  • ProviderStage: only calls the LLM

Stages combine easily:

// Simple pipeline
pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewPromptAssemblyStage(registry, task, vars),
        stage.NewProviderStage(provider, tools, policy, config),
    ).
    Build()

// Add state management
pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewStateStoreLoadStage(stateConfig),           // Added
        stage.NewPromptAssemblyStage(registry, task, vars),
        stage.NewProviderStage(provider, tools, policy, config),
        stage.NewStateStoreSaveStage(stateConfig),           // Added
    ).
    Build()

// Add validation
pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewStateStoreLoadStage(stateConfig),
        stage.NewPromptAssemblyStage(registry, task, vars),
        stage.NewValidationStage(validators, config),        // Added
        stage.NewProviderStage(provider, tools, policy, config),
        stage.NewStateStoreSaveStage(stateConfig),
    ).
    Build()

Order matters, and you control it:

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewStateStoreLoadStage(stateConfig),               // 1. Load history
        stage.NewPromptAssemblyStage(registry, task, vars),      // 2. Assemble prompt
        stage.NewValidationStage(validators, config),            // 3. Validate
        stage.NewProviderStage(provider, tools, policy, config), // 4. Execute
        stage.NewStateStoreSaveStage(stateConfig),               // 5. Save state
    ).
    Build()

Stages process elements as they flow, not in batches:

func (s *MyStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
    defer close(output)
    for elem := range input {
        // Process each element immediately
        result := s.transform(elem)
        output <- result
    }
    return nil
}

A transform stage converts elements one-to-one or one-to-many:

type UppercaseStage struct {
    stage.BaseStage
}

func (s *UppercaseStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
    defer close(output)
    for elem := range input {
        if elem.Text != nil {
            upper := strings.ToUpper(*elem.Text)
            elem.Text = &upper
        }
        output <- elem
    }
    return nil
}

Other stages collect multiple elements into one:

type MessageCollectorStage struct {
    stage.BaseStage
}

func (s *MessageCollectorStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
    defer close(output)
    var messages []types.Message
    for elem := range input {
        if elem.Message != nil {
            messages = append(messages, *elem.Message)
        }
    }
    // Emit a single element carrying all collected messages
    output <- stage.StreamElement{
        Metadata: map[string]interface{}{
            "messages": messages,
        },
    }
    return nil
}

Others produce multiple elements from a single input:

type TokenizerStage struct {
    stage.BaseStage
}

func (s *TokenizerStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
    defer close(output)
    for elem := range input {
        if elem.Text != nil {
            // Split into tokens and emit each one separately
            tokens := strings.Fields(*elem.Text)
            for _, token := range tokens {
                t := token
                output <- stage.StreamElement{Text: &t}
            }
        }
    }
    return nil
}

A terminal stage consumes elements without producing any:

type LoggerStage struct {
    stage.BaseStage
    logger *log.Logger
}

func (s *LoggerStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
    defer close(output)
    for elem := range input {
        s.logger.Printf("Element: %+v", elem)
        // Don't emit anything
    }
    return nil
}
Runtime ships with a set of built-in stages.

Core text stages:

  Stage                   Purpose
  StateStoreLoadStage     Load conversation history
  StateStoreSaveStage     Persist conversation state
  PromptAssemblyStage     Assemble prompts from registry
  TemplateStage           Variable substitution
  ValidationStage         Content validation
  ProviderStage           LLM execution with tool support

Voice stages:

  Stage                     Purpose
  AudioTurnStage            VAD-based turn detection
  STTStage                  Speech-to-text transcription
  TTSStage                  Text-to-speech synthesis
  TTSStageWithInterruption  TTS with barge-in support
  DuplexProviderStage       Bidirectional WebSocket streaming
  VADAccumulatorStage       Audio buffering with VAD

Flow control and observability stages:

  Stage            Purpose
  RouterStage      Dynamic routing to multiple outputs
  MergeStage       Combine multiple inputs
  MetricsStage     Performance monitoring
  TracingStage     Distributed tracing
  PriorityChannel  Priority-based scheduling

Utility stages:

  Stage                   Purpose
  DebugStage              Pipeline debugging
  VariableProviderStage   Dynamic variable resolution
  MediaExternalizerStage  External media storage
  ContextBuilderStage     Token budget management
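
Utility stages slot into a pipeline the same way as any other stage. Below is a sketch of wrapping a provider with metrics and debug stages; the constructor names and config values (NewMetricsStage, NewDebugStage, metricsConfig, debugConfig) are assumptions here, so check the actual signatures before using them:

// Hypothetical constructors; the real names and parameters may differ.
pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewMetricsStage(metricsConfig),                     // assumed: record timings
        stage.NewProviderStage(provider, tools, policy, config),
        stage.NewDebugStage(debugConfig),                         // assumed: log elements
    ).
    Build()

To implement behavior the built-in stages don't cover, define a custom stage:
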
type MyStage struct {
    stage.BaseStage
    config MyConfig
}

func NewMyStage(config MyConfig) *MyStage {
    return &MyStage{
        BaseStage: stage.NewBaseStage("my_stage", stage.StageTypeTransform),
        config:    config,
    }
}

func (s *MyStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
    defer close(output) // Always close output
    for elem := range input {
        // Transform element
        result := s.transform(elem)
        // Send with cancellation check
        select {
        case output <- result:
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return nil
}
For example, a rate-limiting stage built on golang.org/x/time/rate:

type RateLimitConfig struct {
    RequestsPerSecond int
    BurstSize         int
}

type RateLimitStage struct {
    stage.BaseStage
    limiter *rate.Limiter
}

func NewRateLimitStage(config RateLimitConfig) *RateLimitStage {
    return &RateLimitStage{
        BaseStage: stage.NewBaseStage("rate_limit", stage.StageTypeTransform),
        limiter:   rate.NewLimiter(rate.Limit(config.RequestsPerSecond), config.BurstSize),
    }
}

func (s *RateLimitStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
    defer close(output)
    for elem := range input {
        // Block until the limiter allows another element (or the context is cancelled)
        if err := s.limiter.Wait(ctx); err != nil {
            return err
        }
        output <- elem
    }
    return nil
}
Inside Process, an error can either be passed downstream as an element or stop the whole pipeline:

func (s *MyStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
    defer close(output)
    for elem := range input {
        result, err := s.process(elem)
        if err != nil {
            // Option 1: Send an error element and continue
            output <- stage.NewErrorElement(err)
            continue

            // Option 2: Return the error to stop the pipeline
            // return err
        }
        output <- result
    }
    return nil
}

Stages that prepare input:

  • StateStoreLoadStage: Load conversation history
  • PromptAssemblyStage: Build prompt from registry
  • TemplateStage: Substitute variables
  • ValidationStage: Check input constraints

Stages that handle output:

  • ValidationStage: Check output constraints
  • StateStoreSaveStage: Persist conversation
  • MetricsStage: Record performance
  • DebugStage: Log for debugging

Split a stream to multiple destinations:

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewProviderStage(provider, tools, policy, config),
    ).
    Branch("provider", "tts", "logger").
    Build()

Combine multiple streams:

mergeStage := stage.NewMergeStage(inputChannels...)

Putting it all together, a recommended ordering for a full pipeline:

pipeline := stage.NewPipelineBuilder().
    Chain(
        // 1. Load state first
        stage.NewStateStoreLoadStage(stateConfig),
        // 2. Resolve variables
        stage.NewVariableProviderStage(providers),
        // 3. Assemble prompt
        stage.NewPromptAssemblyStage(registry, task, vars),
        // 4. Apply templates
        stage.NewTemplateStage(),
        // 5. Validate input
        stage.NewValidationStage(inputValidators, config),
        // 6. Execute LLM
        stage.NewProviderStage(provider, tools, policy, config),
        // 7. Validate output
        stage.NewValidationStage(outputValidators, config),
        // 8. Save state
        stage.NewStateStoreSaveStage(stateConfig),
    ).
    Build()

  • State early: load history before prompt assembly
  • Variables before templates: resolve values, then substitute
  • Validation twice: before and after LLM execution
  • Save last: persist after all processing is complete

Control latency vs. throughput:

config := stage.DefaultPipelineConfig().
    WithChannelBufferSize(32) // Larger buffer = higher throughput, more latency

Stages run in parallel by default. Heavy stages don’t block light ones.

Slow consumers automatically throttle producers via channel blocking.
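
This backpressure is ordinary Go channel behavior rather than anything Runtime-specific. A standalone sketch: the producer blocks on send once the buffer is full, so it can never run more than the buffer size ahead of the slow consumer:

// Plain Go illustration of channel backpressure (uses fmt and time).
func demoBackpressure() {
    ch := make(chan int, 4) // small buffer

    go func() {
        defer close(ch)
        for i := 0; i < 20; i++ {
            ch <- i // blocks whenever the consumer falls 4 elements behind
        }
    }()

    for v := range ch {
        time.Sleep(50 * time.Millisecond) // slow consumer
        fmt.Println("processed", v)
    }
}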

Test stages in isolation:

func TestUppercaseStage(t *testing.T) {
    s := NewUppercaseStage()
    input := make(chan stage.StreamElement, 1)
    output := make(chan stage.StreamElement, 1)
    go s.Process(context.Background(), input, output)

    text := "hello"
    input <- stage.StreamElement{Text: &text}
    close(input)

    result := <-output
    assert.Equal(t, "HELLO", *result.Text)
}

Test stages in pipeline:

func TestPipeline(t *testing.T) {
    pipeline := stage.NewPipelineBuilder().
        Chain(
            NewUppercaseStage(),
            NewTrimStage(),
        ).
        Build()

    text := " hello "
    input := make(chan stage.StreamElement, 1)
    input <- stage.StreamElement{Text: &text}
    close(input)

    output, _ := pipeline.Execute(context.Background(), input)
    result := <-output
    assert.Equal(t, "HELLO", *result.Text)
}

Stage design provides:

  • Single Responsibility: Each stage has one job
  • Composability: Mix and match stages
  • Streaming: Process elements as they flow
  • Concurrency: Stages run in parallel
  • Testability: Test stages independently
  • Extensibility: Easy to add custom stages