Pipeline Architecture

Understanding Runtime’s stage-based streaming pipeline design.

Runtime uses a stage-based streaming architecture for processing LLM requests. This design provides true streaming execution, concurrent processing, and a composable DAG (Directed Acyclic Graph) of processing units.

A pipeline is a DAG of stages that process streaming elements:

[Diagram: a pipeline as a DAG of stages connected by channels]

Each stage:

  • Runs in its own goroutine
  • Receives elements via input channel
  • Sends processed elements via output channel
  • Supports true streaming (elements flow as they’re produced)

The stage architecture is designed for streaming scenarios like voice applications:

  1. Streaming input: Audio chunks from microphone
  2. Accumulation: VAD detects turn boundaries
  3. Processing: Transcribe, call LLM, generate TTS
  4. Streaming output: Audio chunks to speaker

Stages model this as a reactive stream where data flows through connected processing units:

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewAudioTurnStage(vadConfig),                                // Accumulate until turn complete
        stage.NewSTTStage(sttService, sttConfig),                          // Transcribe audio
        stage.NewProviderStage(provider, tools, policy, config),           // Call LLM
        stage.NewTTSStageWithInterruption(ttsService, handler, ttsConfig), // Generate audio
    ).
    Build()

The fundamental unit of data flowing through the pipeline:

type StreamElement struct {
    // Content types (at most one is set)
    Text     *string
    Audio    *AudioData
    Video    *VideoData
    Image    *ImageData
    Message  *types.Message
    ToolCall *types.ToolCall
    Parts    []types.ContentPart

    // Metadata for inter-stage communication
    Metadata map[string]interface{}

    // Control and observability
    Priority  Priority // Low, Normal, High, Critical
    Error     error
    Timestamp time.Time
}

Key insight: Each element carries one content type, enabling type-safe routing and priority scheduling.
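
For illustration, a consumer can route on whichever content field is set. The sketch below uses only the fields shown above; handleText, handleAudio, and handleMessage are hypothetical handlers, not part of Runtime's API.

// routeElement dispatches a StreamElement based on which content field is set.
func routeElement(elem StreamElement) {
    switch {
    case elem.Error != nil:
        log.Printf("stage error: %v", elem.Error) // errors travel as elements too
    case elem.Text != nil:
        handleText(*elem.Text)
    case elem.Audio != nil:
        handleAudio(elem.Audio)
    case elem.Message != nil:
        handleMessage(elem.Message)
    default:
        // Ignore content types this consumer does not handle.
    }
}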

All stages implement:

type Stage interface {
    Name() string
    Type() StageType
    Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
}

Stages fall into the following types:

Type           Pattern       Example
Transform      1:1 or 1:N    Validation, enrichment
Accumulate     N:1           VAD buffering, message collection
Generate       0:N           LLM streaming, TTS
Sink           N:0           State store save, metrics
Bidirectional  Varies        WebSocket session

Every stage must:

  1. Read from input channel until closed
  2. Send results to output channel
  3. Close output channel when done
  4. Respect context cancellation

func (s *MyStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
    defer close(output) // Always close output

    for elem := range input {
        // Process element
        result := s.transform(elem)

        // Send with cancellation check
        select {
        case output <- result:
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return nil
}
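
An Accumulate-style stage (N:1) follows the same contract but typically emits only after draining its input. Below is a minimal sketch that concatenates incoming text into a single element; the stage name, the StageTypeAccumulate constant, and the buffering strategy are illustrative assumptions, not Runtime's actual API.

// TextAccumulateStage collects all Text elements and emits one combined
// element when the input channel closes. Illustrative sketch only.
type TextAccumulateStage struct{}

func (s *TextAccumulateStage) Name() string    { return "text_accumulate" }
func (s *TextAccumulateStage) Type() StageType { return StageTypeAccumulate } // assumed constant

func (s *TextAccumulateStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
    defer close(output)

    var buf strings.Builder
    for elem := range input {
        if elem.Text != nil {
            buf.WriteString(*elem.Text)
        }
    }

    combined := buf.String()
    select {
    case output <- StreamElement{Text: &combined, Timestamp: time.Now()}:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}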

Elements flow through the pipeline as they’re produced:

output, err := pipeline.Execute(ctx, input)
if err != nil {
    return err
}

for elem := range output {
    // Process elements as they arrive
    if elem.Text != nil {
        fmt.Print(*elem.Text)
    }
}

For request/response patterns, ExecuteSync collects all output:

result, err := pipeline.ExecuteSync(ctx, inputElements...)
// result.Messages contains all messages
// result.Response contains the final response

Internally, ExecuteSync is simply Execute() followed by draining the output channel and accumulating the results.
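
The equivalent manual pattern looks like the following sketch, which drains the output channel and concatenates the streamed text (error handling is simplified and the surrounding function is assumed):

output, err := pipeline.Execute(ctx, input)
if err != nil {
    return err
}

var full strings.Builder
for elem := range output {
    if elem.Error != nil {
        return elem.Error // abort on an error element (simplified policy)
    }
    if elem.Text != nil {
        full.WriteString(*elem.Text)
    }
}
fmt.Println(full.String())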

Standard HTTP-based LLM interactions:

[Diagram: request/response pipeline]

Use cases: Chat applications, content generation

For voice applications using text-based LLMs:

[Diagram: voice pipeline]

Use cases: Voice assistants, telephony integrations

For native multimodal LLMs with real-time audio:

[Diagram: real-time multimodal pipeline]

Use cases: Gemini Live API, real-time voice conversations

Each stage runs in its own goroutine, managed by the pipeline:

func (p *Pipeline) Execute(ctx context.Context, input <-chan StreamElement) (<-chan StreamElement, error) {
    ctx, cancel := context.WithCancel(ctx)
    p.cancel = cancel // retained so Shutdown can cancel the pipeline

    // Start each stage in its own goroutine
    current := input
    for _, stg := range p.stages {
        output := make(chan StreamElement, p.config.ChannelBufferSize)

        go func(s Stage, in <-chan StreamElement, out chan<- StreamElement) {
            if err := s.Process(ctx, in, out); err != nil {
                // Error handling
            }
        }(stg, current, output)

        current = output
    }

    return current, nil
}

Channel-based communication naturally handles backpressure:

  • Slow consumers block producers
  • Buffer size controls latency vs. throughput tradeoff
  • No unbounded buffering
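
To make this concrete, here is a standalone Go example (not Runtime API) showing how a bounded, buffered channel throttles a fast producer behind a slow consumer:

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int, 2) // the producer can run at most 2 elements ahead

    go func() {
        for i := 0; i < 5; i++ {
            ch <- i // blocks when the buffer is full
            fmt.Println("produced", i)
        }
        close(ch)
    }()

    for v := range ch {
        time.Sleep(100 * time.Millisecond) // slow consumer throttles the producer
        fmt.Println("consumed", v)
    }
}

In the pipeline, ChannelBufferSize plays the role of the buffer capacity above: larger buffers smooth bursts at the cost of added latency and memory.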

Graceful shutdown propagates through the pipeline:

func (p *Pipeline) Shutdown(timeout time.Duration) error {
    p.cancel() // Cancel context

    // Wait for all stages to complete
    done := make(chan struct{})
    go func() {
        p.wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        return nil
    case <-time.After(timeout):
        return ErrShutdownTimeout
    }
}
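
A brief usage sketch (the 5-second deadline is an arbitrary choice):

if err := pipeline.Shutdown(5 * time.Second); err != nil {
    // Stages did not finish within the deadline; resources may still be held.
    log.Printf("pipeline shutdown: %v", err)
}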

The builder constructs pipelines with a fluent API:

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewStateStoreLoadStage(stateConfig),
        stage.NewPromptAssemblyStage(registry, taskType, vars),
        stage.NewProviderStage(provider, tools, policy, config),
        stage.NewStateStoreSaveStage(stateConfig),
    ).
    Build()

For parallel processing paths:

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewProviderStage(provider, tools, policy, config),
    ).
    Branch("provider", "tts", "text_output"). // Fork output
    Chain(
        stage.NewTTSStage(ttsService, config),
    ).
    Build()

The builder validates the pipeline structure:

  • Detects cycles
  • Verifies all stages are connected
  • Checks for duplicate stage names

Errors can be sent as elements for downstream handling:

if err := s.validate(elem); err != nil {
    output <- stage.NewErrorElement(err)
    continue // Process next element
}
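
Downstream consumers can then inspect the Error field and decide whether to skip, log, or abort; a minimal sketch:

for elem := range output {
    if elem.Error != nil {
        log.Printf("recoverable stage error: %v", elem.Error)
        continue // skip this element, keep consuming the stream
    }
    // ... handle normal content ...
}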

Returning an error from Process() stops the pipeline:

if err := s.criticalOperation(elem); err != nil {
    return err // Pipeline stops
}

All stages should respect context cancellation:

select {
case output <- elem:
    // Success
case <-ctx.Done():
    return ctx.Err() // Pipeline cancelled
}

The pipeline emits events for observability:

// Automatic events for all stages
EventStageStarted   // When stage begins processing
EventStageCompleted // When stage finishes successfully
EventStageFailed    // When stage encounters an error

// Pipeline lifecycle
EventPipelineStarted
EventPipelineCompleted
EventPipelineFailed

These events are emitted automatically by the pipeline; stage authors do not need to emit them manually.

Scenario                     Target   Notes
Audio chunk → VAD            < 10ms   Minimal buffering
Turn complete → LLM request  < 50ms   Channel hop only
LLM token → TTS chunk        < 50ms   Parallel processing
Channel hop overhead         ~1-2ms   Per stage

  • Channel buffers control memory usage
  • No full response accumulation needed for streaming
  • Element pooling available for high-throughput scenarios (see the sketch after this list)
  • Concurrent stage execution
  • Backpressure prevents unbounded growth
  • Priority scheduling for QoS
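
The element-pooling idea can be sketched with sync.Pool; this is a standalone illustration of the technique, not Runtime's actual pool API:

// Reuse StreamElement values to reduce allocations on hot paths.
var elementPool = sync.Pool{
    New: func() interface{} { return new(StreamElement) },
}

func acquireElement() *StreamElement { return elementPool.Get().(*StreamElement) }

func releaseElement(e *StreamElement) {
    *e = StreamElement{} // clear before returning to the pool
    elementPool.Put(e)
}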

Decision: Use Go channels for inter-stage communication

Rationale:

  • Natural fit for Go’s concurrency model
  • Built-in backpressure
  • Easy to reason about
  • Standard error propagation via context

Decision: Each stage runs in exactly one goroutine

Rationale:

  • Clear ownership of lifecycle
  • Predictable resource usage
  • Simple debugging (goroutine per stage)
  • Easy to add metrics/tracing

Decision: Stages must close their output channel when done

Rationale:

  • Signal completion to downstream stages
  • Enable for range iteration
  • Prevent goroutine leaks
  • Clear shutdown semantics

The stage-based pipeline architecture provides:

  • True Streaming: Elements flow as they’re produced
  • Concurrency: Each stage runs independently
  • Backpressure: Slow consumers naturally throttle producers
  • Composability: Build complex pipelines from simple stages
  • Observability: Automatic events for all stages
  • Type Safety: Strongly typed elements with clear contracts