Pipeline Architecture
Understanding Runtime’s stage-based streaming pipeline design.
Overview
Runtime uses a stage-based streaming architecture for processing LLM requests. This design provides true streaming execution, concurrent processing, and a composable DAG (Directed Acyclic Graph) of processing units.
Core Concept
A pipeline is a DAG of stages that process streaming elements:
Each stage:
- Runs in its own goroutine
- Receives elements via input channel
- Sends processed elements via output channel
- Supports true streaming (elements flow as they’re produced)
Why Stages?
Streaming First
The stage architecture is designed for streaming scenarios like voice applications:
- Streaming input: Audio chunks from microphone
- Accumulation: VAD detects turn boundaries
- Processing: Transcribe, call LLM, generate TTS
- Streaming output: Audio chunks to speaker
Stages model this as a reactive stream where data flows through connected processing units:
```go
pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewAudioTurnStage(vadConfig),                                // Accumulate until turn complete
        stage.NewSTTStage(sttService, sttConfig),                          // Transcribe audio
        stage.NewProviderStage(provider, tools, policy, config),           // Call LLM
        stage.NewTTSStageWithInterruption(ttsService, handler, ttsConfig), // Generate audio
    ).
    Build()
```
StreamElement
The fundamental unit of data flowing through the pipeline:
```go
type StreamElement struct {
    // Content types (at most one is set)
    Text     *string
    Audio    *AudioData
    Video    *VideoData
    Image    *ImageData
    Message  *types.Message
    ToolCall *types.ToolCall
    Parts    []types.ContentPart

    // Metadata for inter-stage communication
    Metadata map[string]interface{}

    // Control and observability
    Priority  Priority // Low, Normal, High, Critical
    Error     error
    Timestamp time.Time
}
```
Key insight: each element carries at most one content type, enabling type-safe routing and priority scheduling.
Stage Interface
All stages implement:
```go
type Stage interface {
    Name() string
    Type() StageType
    Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
}
```
Stage Types
Section titled “Stage Types”| Type | Pattern | Example |
|---|---|---|
| Transform | 1:1 or 1:N | Validation, enrichment |
| Accumulate | N:1 | VAD buffering, message collection |
| Generate | 0:N | LLM streaming, TTS |
| Sink | N:0 | State store save, metrics |
| Bidirectional | Varies | WebSocket session |
Contract
Every stage must:
- Read from input channel until closed
- Send results to output channel
- Close output channel when done
- Respect context cancellation
```go
func (s *MyStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
    defer close(output) // Always close output

    for elem := range input {
        // Process element
        result := s.transform(elem)

        // Send with cancellation check
        select {
        case output <- result:
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return nil
}
```
Pipeline Execution
Streaming Execution
Elements flow through the pipeline as they’re produced:
```go
output, err := pipeline.Execute(ctx, input)
if err != nil {
    return err
}

for elem := range output {
    // Process elements as they arrive
    if elem.Text != nil {
        fmt.Print(*elem.Text)
    }
}
```
Synchronous Execution
For request/response patterns, ExecuteSync collects all output:
```go
result, err := pipeline.ExecuteSync(ctx, inputElements...)
// result.Messages contains all messages
// result.Response contains the final response
```
This is simply Execute() followed by draining and accumulating the output.
Execution Modes
Text Mode (Request/Response)
Standard HTTP-based LLM interactions:
Use cases: Chat applications, content generation
VAD Mode (Voice Activity Detection)
For voice applications using text-based LLMs:
Use cases: Voice assistants, telephony integrations
ASM Mode (Audio Streaming)
For native multimodal LLMs with real-time audio:
Use cases: Gemini Live API, real-time voice conversations
Concurrency Model
Goroutine Lifecycle
Each stage runs in its own goroutine, managed by the pipeline:
```go
func (p *Pipeline) Execute(ctx context.Context, input <-chan StreamElement) (<-chan StreamElement, error) {
    ctx, cancel := context.WithCancel(ctx)
    p.cancel = cancel // saved so Shutdown can cancel the pipeline

    // Start each stage in its own goroutine
    current := input
    for _, stg := range p.stages {
        output := make(chan StreamElement, p.config.ChannelBufferSize)

        go func(s Stage, in <-chan StreamElement, out chan<- StreamElement) {
            if err := s.Process(ctx, in, out); err != nil {
                // Error handling
            }
        }(stg, current, output)

        current = output
    }

    return current, nil
}
```
Backpressure
Channel-based communication naturally handles backpressure:
- Slow consumers block producers
- Buffer size controls latency vs. throughput tradeoff
- No unbounded buffering
Shutdown
Graceful shutdown propagates through the pipeline:
```go
func (p *Pipeline) Shutdown(timeout time.Duration) error {
    p.cancel() // Cancel context

    // Wait for all stages to complete
    done := make(chan struct{})
    go func() {
        p.wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        return nil
    case <-time.After(timeout):
        return ErrShutdownTimeout
    }
}
```
Pipeline Builder
The builder constructs pipelines with a fluent API:
```go
pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewStateStoreLoadStage(stateConfig),
        stage.NewPromptAssemblyStage(registry, taskType, vars),
        stage.NewProviderStage(provider, tools, policy, config),
        stage.NewStateStoreSaveStage(stateConfig),
    ).
    Build()
```
Branching
For parallel processing paths:
```go
pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewProviderStage(provider, tools, policy, config),
    ).
    Branch("provider", "tts", "text_output"). // Fork output
    Chain(
        stage.NewTTSStage(ttsService, config),
    ).
    Build()
```
DAG Validation
The builder validates the pipeline structure:
- Detects cycles
- Verifies all stages are connected
- Checks for duplicate stage names
Error Handling
Section titled “Error Handling”Error Elements
Errors can be sent as elements for downstream handling:
```go
if err := s.validate(elem); err != nil {
    output <- stage.NewErrorElement(err)
    continue // Process next element
}
```
Returning an error from Process() stops the pipeline:
```go
if err := s.criticalOperation(elem); err != nil {
    return err // Pipeline stops
}
```
All stages should respect context cancellation:
```go
select {
case output <- elem: // Success
case <-ctx.Done():
    return ctx.Err() // Pipeline cancelled
}
```
Event Integration
The pipeline emits events for observability:
```go
// Automatic events for all stages
EventStageStarted   // When stage begins processing
EventStageCompleted // When stage finishes successfully
EventStageFailed    // When stage encounters an error

// Pipeline lifecycle
EventPipelineStarted
EventPipelineCompleted
EventPipelineFailed
```
These events are automatically emitted by the pipeline; stage authors don’t need to emit them manually.
Performance Characteristics
Latency
| Scenario | Target | Notes |
|---|---|---|
| Audio chunk → VAD | < 10ms | Minimal buffering |
| Turn complete → LLM request | < 50ms | Channel hop only |
| LLM token → TTS chunk | < 50ms | Parallel processing |
| Channel hop overhead | ~1-2ms | Per stage |
Memory
- Channel buffers control memory usage
- No full response accumulation needed for streaming
- Element pooling available for high-throughput scenarios
Throughput
- Concurrent stage execution
- Backpressure prevents unbounded growth
- Priority scheduling for QoS
Design Decisions
Why Channels Over Callbacks?
Decision: Use Go channels for inter-stage communication
Rationale:
- Natural fit for Go’s concurrency model
- Built-in backpressure
- Easy to reason about
- Standard error propagation via context
Why One Goroutine Per Stage?
Decision: Each stage runs in exactly one goroutine
Rationale:
- Clear ownership of lifecycle
- Predictable resource usage
- Simple debugging (goroutine per stage)
- Easy to add metrics/tracing
Why Close Output Channel?
Decision: Stages must close their output channel when done
Rationale:
- Signal completion to downstream stages
- Enable for range iteration
- Prevent goroutine leaks
- Clear shutdown semantics
Summary
The stage-based pipeline architecture provides:
- True Streaming: Elements flow as they’re produced
- Concurrency: Each stage runs independently
- Backpressure: Slow consumers naturally throttle producers
- Composability: Build complex pipelines from simple stages
- Observability: Automatic events for all stages
- Type Safety: Strongly typed elements with clear contracts
Related Topics
- Stage Reference - Complete stage API
- Provider System - How providers integrate
- State Management - Conversation persistence