Stage Design
Understanding Runtime’s composable stage architecture.
Overview
Section titled “Overview”Stages are the core abstraction in Runtime. Every component that processes streaming data is a stage.
Core Concept
Section titled “Core Concept”A stage transforms streaming elements:
type Stage interface { Name() string Type() StageType Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error}Stages connect via channels to form pipelines:
[Stage A] ──channel──▶ [Stage B] ──channel──▶ [Stage C]Each stage runs in its own goroutine, enabling concurrent processing.
Why Stages?
Section titled “Why Stages?”Problem: Cross-Cutting Concerns
Section titled “Problem: Cross-Cutting Concerns”Applications need many features:
- State management
- Prompt assembly
- Validation
- LLM execution
- Speech-to-text
- Text-to-speech
- Metrics
- Error handling
Without stages, code becomes tangled:
// Bad: Everything in one functionfunc execute(audio []byte) []byte { // VAD detection if !detectSpeech(audio) { return nil }
// Transcribe text := transcribe(audio)
// Load state history := loadFromRedis(sessionID)
// Call LLM response := openai.Complete(text, history)
// Save state saveToRedis(sessionID, response)
// Generate speech return synthesize(response)}Problems:
- Hard to test individual components
- Can’t reuse logic
- No streaming support
- Difficult to add features
Solution: Stage Pipeline
Section titled “Solution: Stage Pipeline”With stages, concerns are separate and stream-capable:
pipeline := stage.NewPipelineBuilder(). Chain( stage.NewAudioTurnStage(vadConfig), stage.NewSTTStage(sttService, sttConfig), stage.NewStateStoreLoadStage(stateConfig), stage.NewProviderStage(provider, tools, policy, config), stage.NewStateStoreSaveStage(stateConfig), stage.NewTTSStage(ttsService, ttsConfig), ). Build()Each stage focuses on one thing, and data streams through.
Design Principles
Section titled “Design Principles”1. Single Responsibility
Section titled “1. Single Responsibility”Each stage does one thing:
StateStoreLoadStage: Only loads conversation history PromptAssemblyStage: Only assembles prompts ValidationStage: Only validates content ProviderStage: Only calls the LLM
2. Composability
Section titled “2. Composability”Stages combine easily:
// Simple pipelinepipeline := stage.NewPipelineBuilder(). Chain( stage.NewPromptAssemblyStage(registry, task, vars), stage.NewProviderStage(provider, tools, policy, config), ). Build()
// Add state managementpipeline := stage.NewPipelineBuilder(). Chain( stage.NewStateStoreLoadStage(stateConfig), // Added stage.NewPromptAssemblyStage(registry, task, vars), stage.NewProviderStage(provider, tools, policy, config), stage.NewStateStoreSaveStage(stateConfig), // Added ). Build()
// Add validationpipeline := stage.NewPipelineBuilder(). Chain( stage.NewStateStoreLoadStage(stateConfig), stage.NewPromptAssemblyStage(registry, task, vars), stage.NewValidationStage(validators, config), // Added stage.NewProviderStage(provider, tools, policy, config), stage.NewStateStoreSaveStage(stateConfig), ). Build()3. Explicit Ordering
Section titled “3. Explicit Ordering”Order matters. User controls it:
pipeline := stage.NewPipelineBuilder(). Chain( stage.NewStateStoreLoadStage(stateConfig), // 1. Load history stage.NewPromptAssemblyStage(registry, task, vars), // 2. Assemble prompt stage.NewValidationStage(validators, config), // 3. Validate stage.NewProviderStage(provider, tools, policy, config), // 4. Execute stage.NewStateStoreSaveStage(stateConfig), // 5. Save state ). Build()4. Streaming First
Section titled “4. Streaming First”Stages process elements as they flow, not in batches:
func (s *MyStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error { defer close(output)
for elem := range input { // Process each element immediately result := s.transform(elem) output <- result } return nil}Stage Types
Section titled “Stage Types”Transform Stage
Section titled “Transform Stage”Transforms elements 1:1 or 1:N:
type UppercaseStage struct { stage.BaseStage}
func (s *UppercaseStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error { defer close(output)
for elem := range input { if elem.Text != nil { upper := strings.ToUpper(*elem.Text) elem.Text = &upper } output <- elem } return nil}Accumulate Stage
Section titled “Accumulate Stage”Collects multiple elements into one:
type MessageCollectorStage struct { stage.BaseStage}
func (s *MessageCollectorStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error { defer close(output)
var messages []types.Message for elem := range input { if elem.Message != nil { messages = append(messages, *elem.Message) } }
// Emit single element with all messages output <- stage.StreamElement{ Metadata: map[string]interface{}{ "messages": messages, }, } return nil}Generate Stage
Section titled “Generate Stage”Produces multiple elements from one input:
type TokenizerStage struct { stage.BaseStage}
func (s *TokenizerStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error { defer close(output)
for elem := range input { if elem.Text != nil { // Split into tokens, emit each tokens := strings.Fields(*elem.Text) for _, token := range tokens { t := token output <- stage.StreamElement{Text: &t} } } } return nil}Sink Stage
Section titled “Sink Stage”Terminal stage that consumes without producing:
type LoggerStage struct { stage.BaseStage logger *log.Logger}
func (s *LoggerStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error { defer close(output)
for elem := range input { s.logger.Printf("Element: %+v", elem) // Don't emit anything } return nil}Built-In Stages
Section titled “Built-In Stages”Core Stages
Section titled “Core Stages”| Stage | Purpose |
|---|---|
StateStoreLoadStage | Load conversation history |
StateStoreSaveStage | Persist conversation state |
PromptAssemblyStage | Assemble prompts from registry |
TemplateStage | Variable substitution |
ValidationStage | Content validation |
ProviderStage | LLM execution with tool support |
Streaming Stages
Section titled “Streaming Stages”| Stage | Purpose |
|---|---|
AudioTurnStage | VAD-based turn detection |
STTStage | Speech-to-text transcription |
TTSStage | Text-to-speech synthesis |
TTSStageWithInterruption | TTS with barge-in support |
DuplexProviderStage | Bidirectional WebSocket streaming |
VADAccumulatorStage | Audio buffering with VAD |
Advanced Stages
Section titled “Advanced Stages”| Stage | Purpose |
|---|---|
RouterStage | Dynamic routing to multiple outputs |
MergeStage | Combine multiple inputs |
MetricsStage | Performance monitoring |
TracingStage | Distributed tracing |
PriorityChannel | Priority-based scheduling |
Utility Stages
Section titled “Utility Stages”| Stage | Purpose |
|---|---|
DebugStage | Pipeline debugging |
VariableProviderStage | Dynamic variable resolution |
MediaExternalizerStage | External media storage |
ContextBuilderStage | Token budget management |
Creating Custom Stages
Section titled “Creating Custom Stages”Basic Pattern
Section titled “Basic Pattern”type MyStage struct { stage.BaseStage config MyConfig}
func NewMyStage(config MyConfig) *MyStage { return &MyStage{ BaseStage: stage.NewBaseStage("my_stage", stage.StageTypeTransform), config: config, }}
func (s *MyStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error { defer close(output) // Always close output
for elem := range input { // Transform element result := s.transform(elem)
// Send with cancellation check select { case output <- result: case <-ctx.Done(): return ctx.Err() } } return nil}With Configuration
Section titled “With Configuration”type RateLimitConfig struct { RequestsPerSecond int BurstSize int}
type RateLimitStage struct { stage.BaseStage limiter *rate.Limiter}
func NewRateLimitStage(config RateLimitConfig) *RateLimitStage { return &RateLimitStage{ BaseStage: stage.NewBaseStage("rate_limit", stage.StageTypeTransform), limiter: rate.NewLimiter(rate.Limit(config.RequestsPerSecond), config.BurstSize), }}
func (s *RateLimitStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error { defer close(output)
for elem := range input { if err := s.limiter.Wait(ctx); err != nil { return err } output <- elem } return nil}Error Handling
Section titled “Error Handling”func (s *MyStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error { defer close(output)
for elem := range input { result, err := s.process(elem) if err != nil { // Option 1: Send error element and continue output <- stage.NewErrorElement(err) continue
// Option 2: Return error to stop pipeline // return err } output <- result } return nil}Design Patterns
Section titled “Design Patterns”Pre-Processing
Section titled “Pre-Processing”Stages that prepare input:
- StateStoreLoadStage: Load conversation history
- PromptAssemblyStage: Build prompt from registry
- TemplateStage: Substitute variables
- ValidationStage: Check input constraints
Post-Processing
Section titled “Post-Processing”Stages that handle output:
- ValidationStage: Check output constraints
- StateStoreSaveStage: Persist conversation
- MetricsStage: Record performance
- DebugStage: Log for debugging
Branching
Section titled “Branching”Split stream to multiple destinations:
pipeline := stage.NewPipelineBuilder(). Chain( stage.NewProviderStage(provider, tools, policy, config), ). Branch("provider", "tts", "logger"). Build()Merging
Section titled “Merging”Combine multiple streams:
mergeStage := stage.NewMergeStage(inputChannels...)Ordering Best Practices
Section titled “Ordering Best Practices”Recommended Order
Section titled “Recommended Order”pipeline := stage.NewPipelineBuilder(). Chain( // 1. Load state first stage.NewStateStoreLoadStage(stateConfig),
// 2. Resolve variables stage.NewVariableProviderStage(providers),
// 3. Assemble prompt stage.NewPromptAssemblyStage(registry, task, vars),
// 4. Apply templates stage.NewTemplateStage(),
// 5. Validate input stage.NewValidationStage(inputValidators, config),
// 6. Execute LLM stage.NewProviderStage(provider, tools, policy, config),
// 7. Validate output stage.NewValidationStage(outputValidators, config),
// 8. Save state stage.NewStateStoreSaveStage(stateConfig), ). Build()Order Principles
Section titled “Order Principles”State early: Load history before prompt assembly Variables before templates: Resolve values, then substitute Validation twice: Before and after LLM execution Save last: Persist after all processing complete
Performance Considerations
Section titled “Performance Considerations”Channel Buffer Size
Section titled “Channel Buffer Size”Control latency vs. throughput:
config := stage.DefaultPipelineConfig(). WithChannelBufferSize(32) // Larger buffer = higher throughput, more latencyConcurrent Processing
Section titled “Concurrent Processing”Stages run in parallel by default. Heavy stages don’t block light ones.
Backpressure
Section titled “Backpressure”Slow consumers automatically throttle producers via channel blocking.
Testing Stages
Section titled “Testing Stages”Unit Testing
Section titled “Unit Testing”Test stages in isolation:
func TestUppercaseStage(t *testing.T) { s := NewUppercaseStage()
input := make(chan stage.StreamElement, 1) output := make(chan stage.StreamElement, 1)
go s.Process(context.Background(), input, output)
text := "hello" input <- stage.StreamElement{Text: &text} close(input)
result := <-output assert.Equal(t, "HELLO", *result.Text)}Integration Testing
Section titled “Integration Testing”Test stages in pipeline:
func TestPipeline(t *testing.T) { pipeline := stage.NewPipelineBuilder(). Chain( NewUppercaseStage(), NewTrimStage(), ). Build()
text := " hello " input := make(chan stage.StreamElement, 1) input <- stage.StreamElement{Text: &text} close(input)
output, _ := pipeline.Execute(context.Background(), input)
result := <-output assert.Equal(t, "HELLO", *result.Text)}Summary
Section titled “Summary”Stage design provides:
- Single Responsibility: Each stage has one job
- Composability: Mix and match stages
- Streaming: Process elements as they flow
- Concurrency: Stages run in parallel
- Testability: Test stages independently
- Extensibility: Easy to add custom stages
Related Topics
Section titled “Related Topics”- Pipeline Architecture - How stages form pipelines
- State Management - StateStore stages
- Stage Reference - Complete API