Pipeline Architecture
Understanding Runtime’s stage-based streaming pipeline design.
Overview
Runtime uses a stage-based streaming architecture for processing LLM requests. This design provides true streaming execution, concurrent processing, and a composable DAG (Directed Acyclic Graph) of processing units.
Core Concept
A pipeline is a DAG of stages that process streaming elements:
Input → [Stage 1] → [Stage 2] → [Stage N] → Output
            │           │
            ▼           ▼
       [Branch A]  [Branch B]
Each stage:
- Runs in its own goroutine
- Receives elements via input channel
- Sends processed elements via output channel
- Supports true streaming (elements flow as they’re produced)
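The properties above can be illustrated with a minimal sketch that uses plain strings instead of StreamElement. The names stageFunc, upper, exclaim, chain, and runChain are hypothetical and exist only for this example; they are not part of the Runtime API.

```go
package main

import (
	"fmt"
	"strings"
)

// stageFunc is a hypothetical, simplified stage: it reads from in, writes
// results to out, and closes out once in is exhausted.
type stageFunc func(in <-chan string, out chan<- string)

// upper forwards each element in upper case.
func upper(in <-chan string, out chan<- string) {
	defer close(out)
	for s := range in {
		out <- strings.ToUpper(s)
	}
}

// exclaim appends "!" to each element.
func exclaim(in <-chan string, out chan<- string) {
	defer close(out)
	for s := range in {
		out <- s + "!"
	}
}

// chain starts every stage in its own goroutine and wires them with channels.
func chain(in <-chan string, stages ...stageFunc) <-chan string {
	current := in
	for _, st := range stages {
		out := make(chan string)
		go st(current, out)
		current = out
	}
	return current
}

// runChain feeds words through upper and exclaim and collects the results.
func runChain(words []string) []string {
	in := make(chan string)
	go func() {
		defer close(in)
		for _, w := range words {
			in <- w
		}
	}()
	var got []string
	for s := range chain(in, upper, exclaim) {
		got = append(got, s)
	}
	return got
}

func main() {
	fmt.Println(runChain([]string{"hello", "world"})) // prints [HELLO! WORLD!]
}
```

Because the channels are unbuffered here, each word reaches the consumer as soon as both stages have handled it, without waiting for the rest of the input.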
Why Stages?
Streaming First
The stage architecture is designed for streaming scenarios like voice applications:
- Streaming input: Audio chunks from microphone
- Accumulation: VAD detects turn boundaries
- Processing: Transcribe, call LLM, generate TTS
- Streaming output: Audio chunks to speaker
Stages model this as a reactive stream where data flows through connected processing units:
pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewAudioTurnStage(vadConfig),                                 // Accumulate until turn complete
        stage.NewSTTStage(sttService, sttConfig),                           // Transcribe audio
        stage.NewProviderStage(provider, tools, policy, config),            // Call LLM
        stage.NewTTSStageWithInterruption(ttsService, handler, ttsConfig),  // Generate audio
    ).
    Build()
StreamElement
The fundamental unit of data flowing through the pipeline:
type StreamElement struct {
    // Content types (at most one is set)
    Text     *string
    Audio    *AudioData
    Video    *VideoData
    Image    *ImageData
    Message  *types.Message
    ToolCall *types.ToolCall
    Parts    []types.ContentPart

    // Metadata for inter-stage communication
    Metadata map[string]interface{}

    // Control and observability
    Priority  Priority // Low, Normal, High, Critical
    Error     error
    Timestamp time.Time
}
Key insight: Each element carries at most one content type, which enables type-safe routing, and a Priority field, which enables priority scheduling.
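As a rough illustration of routing on content type, the sketch below inspects which field of an element is set. The element type is a trimmed-down, hypothetical stand-in for StreamElement (Audio is a byte slice rather than *AudioData), and kind is an illustrative helper, not a Runtime function.

```go
package main

import "fmt"

// element is a hypothetical, reduced version of StreamElement.
type element struct {
	Text  *string
	Audio []byte // stands in for *AudioData
	Error error
}

// kind reports which payload an element carries so that a router can
// dispatch on it. Errors are checked first so they are never mistaken
// for ordinary content.
func kind(e element) string {
	switch {
	case e.Error != nil:
		return "error"
	case e.Text != nil:
		return "text"
	case e.Audio != nil:
		return "audio"
	default:
		return "empty"
	}
}

func main() {
	s := "hi"
	fmt.Println(kind(element{Text: &s}))                  // prints text
	fmt.Println(kind(element{Audio: []byte{1}}))          // prints audio
	fmt.Println(kind(element{Error: fmt.Errorf("boom")})) // prints error
	fmt.Println(kind(element{}))                          // prints empty
}
```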
Stage Interface
All stages implement:
type Stage interface {
    Name() string
    Type() StageType
    Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
}
Stage Types
| Type | Pattern | Example |
|---|---|---|
| Transform | 1:1 or 1:N | Validation, enrichment |
| Accumulate | N:1 | VAD buffering, message collection |
| Generate | 0:N | LLM streaming, TTS |
| Sink | N:0 | State store save, metrics |
| Bidirectional | Varies | WebSocket session |
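To make the N:1 pattern from the table concrete, here is a minimal sketch of an accumulating stage over plain strings. The functions accumulate and collect are hypothetical names for this example; a real Accumulate stage such as VAD buffering would decide when to emit based on turn boundaries rather than on input closure.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// accumulate is a hypothetical N:1 stage: it buffers every chunk and emits
// one joined result once the input channel closes.
func accumulate(ctx context.Context, in <-chan string, out chan<- string) error {
	defer close(out)
	var parts []string
	for s := range in {
		parts = append(parts, s)
	}
	select {
	case out <- strings.Join(parts, ""):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// collect runs accumulate over a fixed set of chunks and returns its output.
func collect(chunks []string) string {
	in := make(chan string, len(chunks))
	for _, c := range chunks {
		in <- c
	}
	close(in)
	out := make(chan string, 1) // room for the single result
	if err := accumulate(context.Background(), in, out); err != nil {
		return ""
	}
	return <-out
}

func main() {
	fmt.Println(collect([]string{"he", "llo"})) // prints hello
}
```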
Contract
Every stage must:
- Read from input channel until closed
- Send results to output channel
- Close output channel when done
- Respect context cancellation
func (s *MyStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
    defer close(output) // Always close output

    for elem := range input {
        // Process element
        result := s.transform(elem)

        // Send with cancellation check
        select {
        case output <- result:
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return nil
}
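A plain for range loop only notices cancellation when it sends, so a stage whose input is idle relies on the upstream stage closing the channel. One possible variant, sketched here over ints with the hypothetical names forward and stopsWhenCancelled, also watches the context while waiting for input.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// forward copies values from in to out. Unlike a plain "for range" loop it
// also watches ctx while waiting for input, so it can stop while idle.
func forward(ctx context.Context, in <-chan int, out chan<- int) error {
	defer close(out)
	for {
		select {
		case v, ok := <-in:
			if !ok {
				return nil // upstream closed: normal completion
			}
			select {
			case out <- v:
			case <-ctx.Done():
				return ctx.Err()
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// stopsWhenCancelled runs forward with an input that never produces or
// closes, and reports whether cancellation alone made it return.
func stopsWhenCancelled() bool {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	in := make(chan int)
	out := make(chan int)
	return errors.Is(forward(ctx, in, out), context.Canceled)
}

func main() {
	fmt.Println(stopsWhenCancelled()) // prints true
}
```

Whether this extra select is needed depends on whether every upstream stage is guaranteed to close its output on cancellation; when that guarantee holds, the simpler loop is sufficient.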
Pipeline Execution
Streaming Execution
Elements flow through the pipeline as they’re produced:
output, err := pipeline.Execute(ctx, input)
if err != nil {
    return err
}
for elem := range output {
    // Process elements as they arrive
    if elem.Text != nil {
        fmt.Print(*elem.Text)
    }
}
Synchronous Execution
For request/response patterns, ExecuteSync collects all output:
result, err := pipeline.ExecuteSync(ctx, inputElements...)
if err != nil {
    return err
}
// result.Messages contains all messages
// result.Response contains the final response
ExecuteSync is equivalent to calling Execute() and then draining the output channel, accumulating the elements into a single result.
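The relationship between the two execution styles can be sketched as a drain loop over a channel of strings. The names drain and collectAll are hypothetical, and the real ExecuteSync builds a richer result (Messages, Response) than the plain slice used here.

```go
package main

import (
	"context"
	"fmt"
)

// drain collects every value from ch until it closes or ctx is cancelled.
func drain(ctx context.Context, ch <-chan string) ([]string, error) {
	var all []string
	for {
		select {
		case v, ok := <-ch:
			if !ok {
				return all, nil
			}
			all = append(all, v)
		case <-ctx.Done():
			return all, ctx.Err()
		}
	}
}

// collectAll stands in for a synchronous wrapper: it starts a stream of
// items, then drains it into a slice.
func collectAll(items ...string) ([]string, error) {
	ctx := context.Background()
	ch := make(chan string)
	go func() {
		defer close(ch)
		for _, it := range items {
			select {
			case ch <- it:
			case <-ctx.Done():
				return
			}
		}
	}()
	return drain(ctx, ch)
}

func main() {
	all, err := collectAll("a", "b", "c")
	fmt.Println(all, err) // prints [a b c] <nil>
}
```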
Execution Modes
Text Mode (Request/Response)
Standard HTTP-based LLM interactions:
Message → StateStoreLoad → PromptAssembly → Template → Provider → Validation → StateStoreSave → Response
Use cases: Chat applications, content generation
VAD Mode (Voice Activity Detection)
For voice applications using text-based LLMs:
Audio → AudioTurn → STT → StateStoreLoad → PromptAssembly → Template → Provider → TTS → StateStoreSave → Audio
Use cases: Voice assistants, telephony integrations
ASM Mode (Audio Streaming)
For native multimodal LLMs with real-time audio:
Audio/Text → StateStoreLoad → PromptAssembly → Template → DuplexProvider → StateStoreSave → Audio/Text
Use cases: Gemini Live API, real-time voice conversations
Concurrency Model
Goroutine Lifecycle
Each stage runs in its own goroutine, managed by the pipeline:
func (p *Pipeline) Execute(ctx context.Context, input <-chan StreamElement) (<-chan StreamElement, error) {
    ctx, cancel := context.WithCancel(ctx)
    p.cancel = cancel // Stored so Shutdown can cancel every stage

    // Start each stage in its own goroutine
    current := input
    for _, stg := range p.stages {
        output := make(chan StreamElement, p.config.ChannelBufferSize)
        p.wg.Add(1) // Tracked so Shutdown can wait for completion
        go func(s Stage, in <-chan StreamElement, out chan<- StreamElement) {
            defer p.wg.Done()
            if err := s.Process(ctx, in, out); err != nil {
                // Error handling
            }
        }(stg, current, output)
        current = output
    }
    return current, nil
}
Backpressure
Channel-based communication naturally handles backpressure:
- Slow consumers block producers
- Buffer size controls latency vs. throughput tradeoff
- No unbounded buffering
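The effect of a bounded buffer can be observed with a non-blocking send. In the sketch below, trySend is a hypothetical helper; a real stage would use a blocking send, which waits for the consumer where trySend returns false.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the buffer had
// room. A blocking send would wait here instead, which is the backpressure.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 2) // buffer size 2

	fmt.Println(trySend(ch, 1)) // prints true
	fmt.Println(trySend(ch, 2)) // prints true
	fmt.Println(trySend(ch, 3)) // prints false: buffer is full

	<-ch // the consumer takes one element

	fmt.Println(trySend(ch, 4)) // prints true: room again
}
```

A larger buffer lets the producer run further ahead of the consumer, which raises throughput and increases the amount of in-flight data and queueing delay.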
Shutdown
Graceful shutdown propagates through the pipeline:
func (p *Pipeline) Shutdown(timeout time.Duration) error {
    p.cancel() // Cancel context

    // Wait for all stages to complete
    done := make(chan struct{})
    go func() {
        p.wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        return nil
    case <-time.After(timeout):
        return ErrShutdownTimeout
    }
}
Pipeline Builder
The builder constructs pipelines with a fluent API:
pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewStateStoreLoadStage(stateConfig),
        stage.NewPromptAssemblyStage(registry, taskType, vars),
        stage.NewProviderStage(provider, tools, policy, config),
        stage.NewStateStoreSaveStage(stateConfig),
    ).
    Build()
Branching
For parallel processing paths:
pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewProviderStage(provider, tools, policy, config),
    ).
    Branch("provider", "tts", "text_output"). // Fork output
    Chain(
        stage.NewTTSStage(ttsService, config),
    ).
    Build()
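How Branch forks the stream internally is not described here. One common way to fork a channel is a tee that copies each element to every branch, sketched below over strings; tee and fanOut are hypothetical names and this is not necessarily how the builder implements it.

```go
package main

import (
	"context"
	"fmt"
)

// tee copies each element from in to both outputs and closes them when in
// closes. A slow branch blocks the other one, so backpressure still applies.
func tee(ctx context.Context, in <-chan string, a, b chan<- string) error {
	defer close(a)
	defer close(b)
	for s := range in {
		for _, out := range []chan<- string{a, b} {
			select {
			case out <- s:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
	return nil
}

// fanOut pushes items through tee and returns what each branch received.
func fanOut(items []string) (left, right []string) {
	in := make(chan string, len(items))
	for _, it := range items {
		in <- it
	}
	close(in)

	// Buffers large enough that tee can finish before anything is read.
	a := make(chan string, len(items))
	b := make(chan string, len(items))
	if err := tee(context.Background(), in, a, b); err != nil {
		return nil, nil
	}
	for s := range a {
		left = append(left, s)
	}
	for s := range b {
		right = append(right, s)
	}
	return left, right
}

func main() {
	left, right := fanOut([]string{"x", "y"})
	fmt.Println(left, right) // prints [x y] [x y]
}
```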
DAG Validation
The builder validates the pipeline structure:
- Detects cycles
- Verifies all stages are connected
- Checks for duplicate stage names
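The checks above could be implemented along the following lines. This is a sketch under assumptions: the edge type and validate function are hypothetical, the graph is given as stage names plus edges, and the connectivity check is reduced to rejecting edges that reference unknown stages.

```go
package main

import (
	"errors"
	"fmt"
)

// edge is a hypothetical connection between two named stages.
type edge struct{ from, to string }

// validate rejects duplicate stage names, edges that reference unknown
// stages, and cycles.
func validate(names []string, edges []edge) error {
	seen := make(map[string]bool, len(names))
	for _, n := range names {
		if seen[n] {
			return fmt.Errorf("duplicate stage name %q", n)
		}
		seen[n] = true
	}

	next := make(map[string][]string)
	for _, e := range edges {
		if !seen[e.from] || !seen[e.to] {
			return fmt.Errorf("edge %s -> %s references an unknown stage", e.from, e.to)
		}
		next[e.from] = append(next[e.from], e.to)
	}

	// Depth-first search with three states:
	// 0 = unvisited, 1 = on the current path, 2 = fully explored.
	state := make(map[string]int)
	var visit func(n string) error
	visit = func(n string) error {
		switch state[n] {
		case 1:
			return errors.New("cycle detected at stage " + n)
		case 2:
			return nil
		}
		state[n] = 1
		for _, m := range next[n] {
			if err := visit(m); err != nil {
				return err
			}
		}
		state[n] = 2
		return nil
	}
	for _, n := range names {
		if err := visit(n); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	names := []string{"load", "provider", "save"}
	fmt.Println(validate(names, []edge{{"load", "provider"}, {"provider", "save"}}))
	fmt.Println(validate(names, []edge{{"load", "provider"}, {"provider", "load"}}))
}
```

The first call describes a linear pipeline and returns no error; the second adds an edge back from provider to load, which the search reports as a cycle.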
Error Handling
Error Elements
Errors can be sent as elements for downstream handling:
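if err := s.validate(elem); err != nil {
    // Send the error downstream, still honoring cancellation
    select {
    case output <- stage.NewErrorElement(err):
    case <-ctx.Done():
        return ctx.Err()
    }
    continue // Process next element
}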
Fatal Errors
Returning an error from Process() stops the pipeline:
if err := s.criticalOperation(elem); err != nil {
    return err // Pipeline stops
}
Context Cancellation
All stages should respect context cancellation:
select {
case output <- elem:
    // Success
case <-ctx.Done():
    return ctx.Err() // Pipeline cancelled
}
Event Integration
The pipeline emits events for observability:
// Automatic events for all stages
EventStageStarted // When stage begins processing
EventStageCompleted // When stage finishes successfully
EventStageFailed // When stage encounters an error
// Pipeline lifecycle
EventPipelineStarted
EventPipelineCompleted
EventPipelineFailed
The pipeline emits these events automatically; stage authors don’t need to emit them manually.
Performance Characteristics
Latency
| Scenario | Target | Notes |
|---|---|---|
| Audio chunk → VAD | < 10ms | Minimal buffering |
| Turn complete → LLM request | < 50ms | Channel hop only |
| LLM token → TTS chunk | < 50ms | Parallel processing |
| Channel hop overhead | ~1-2ms | Per stage |
Memory
- Channel buffers control memory usage
- No full response accumulation needed for streaming
- Element pooling available for high-throughput scenarios
Throughput
- Concurrent stage execution
- Backpressure prevents unbounded growth
- Priority scheduling for QoS
Design Decisions
Why Channels Over Callbacks?
Decision: Use Go channels for inter-stage communication
Rationale:
- Natural fit for Go’s concurrency model
- Built-in backpressure
- Easy to reason about
- Standard error propagation via context
Why One Goroutine Per Stage?
Decision: Each stage runs in exactly one goroutine
Rationale:
- Clear ownership of lifecycle
- Predictable resource usage
- Simple debugging (goroutine per stage)
- Easy to add metrics/tracing
Why Close Output Channel?
Decision: Stages must close their output channel when done
Rationale:
- Signal completion to downstream stages
- Enable for range iteration
- Prevent goroutine leaks
- Clear shutdown semantics
Summary
The stage-based pipeline architecture provides:
- True Streaming: Elements flow as they’re produced
- Concurrency: Each stage runs independently
- Backpressure: Slow consumers naturally throttle producers
- Composability: Build complex pipelines from simple stages
- Observability: Automatic events for all stages
- Type Safety: Strongly typed elements with clear contracts
Related Topics
- Stage Reference - Complete stage API
- Provider System - How providers integrate
- State Management - Conversation persistence