Pipeline Architecture

Understanding Runtime’s stage-based streaming pipeline design.

Runtime uses a stage-based streaming architecture for processing LLM requests. This design provides true streaming execution, concurrent processing, and a composable DAG (Directed Acyclic Graph) of processing units.

A pipeline is a DAG of stages that process streaming elements:

[Diagram: pipeline as a DAG of stages]

Each stage:

  • Runs in its own goroutine
  • Receives elements via input channel
  • Sends processed elements via output channel
  • Supports true streaming (elements flow as they’re produced)

The stage architecture is designed for streaming scenarios like voice applications:

  1. Streaming input: Audio chunks from microphone
  2. Accumulation: VAD detects turn boundaries
  3. Processing: Transcribe, call LLM, generate TTS
  4. Streaming output: Audio chunks to speaker

Stages model this as a reactive stream where data flows through connected processing units:

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewAudioTurnStage(vadConfig),                                 // Accumulate until turn complete
        stage.NewSTTStage(sttService, sttConfig),                           // Transcribe audio
        stage.NewProviderStage(provider, tools, policy, config),            // Call LLM
        stage.NewTTSStageWithInterruption(ttsService, handler, ttsConfig),  // Generate audio
    ).
    Build()

The fundamental unit of data flowing through the pipeline is the StreamElement:

type StreamElement struct {
    // Content types (at most one is set)
    Text     *string
    Audio    *AudioData
    Video    *VideoData
    Image    *ImageData
    Message  *types.Message
    ToolCall *types.ToolCall
    Parts    []types.ContentPart

    // Metadata for inter-stage communication
    Metadata map[string]interface{}

    // Control and observability
    Priority  Priority // Low, Normal, High, Critical
    Error     error
    Timestamp time.Time
}

Key insight: Each element carries one content type, enabling type-safe routing and priority scheduling.
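
Because at most one content field is set, a downstream consumer can route on whichever field is non-nil. A minimal sketch of such routing (the handler functions are hypothetical, and StreamElement is assumed to be exported from the stage package, as the builder examples suggest):

func routeElement(elem stage.StreamElement) {
    switch {
    case elem.Error != nil:
        handleError(elem.Error) // surface stream errors first
    case elem.Text != nil:
        handleText(*elem.Text) // streaming text tokens
    case elem.Audio != nil:
        handleAudio(elem.Audio) // audio chunks
    case elem.ToolCall != nil:
        handleToolCall(elem.ToolCall) // tool invocations
    default:
        // Video, Image, Message, and Parts would be handled the same way.
    }
}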

Pipelines support BaseMetadata — a set of key-value pairs that are automatically merged into every StreamElement entering the pipeline. This is designed for session-level context (e.g., session_id, tenant_id, user_id) that should be available to all stages and providers on every turn without manual injection per element.

pipeline.BaseMetadata = map[string]interface{}{
    "session_id": sessionID,
    "tenant_id":  tenantID,
}

Base metadata is merged at the Execute/ExecuteSync boundary. Per-element metadata takes precedence on key collision, so stages and callers can always override base values for specific elements.
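
The merge itself is straightforward: copy base keys first, then per-element keys on top. A minimal sketch of the semantics (not the actual Runtime implementation):

// mergeMetadata illustrates the precedence rule: per-element values win on key collision.
func mergeMetadata(base, elem map[string]interface{}) map[string]interface{} {
    merged := make(map[string]interface{}, len(base)+len(elem))
    for k, v := range base {
        merged[k] = v // session-level defaults
    }
    for k, v := range elem {
        merged[k] = v // per-element values override base values
    }
    return merged
}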

All stages implement:

type Stage interface {
    Name() string
    Type() StageType
    Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
}

Type            Pattern              Example
Transform       1:1 or 1:N           Validation, enrichment
Accumulate      N:1                  VAD buffering, message collection
Generate        0:N                  LLM streaming, TTS
Sink            N:0                  State store save, metrics
Observe         1:1 (pass-through)   RecordingStage
Bidirectional   Varies               WebSocket session

Every stage must:

  1. Read from input channel until closed
  2. Send results to output channel
  3. Close output channel when done
  4. Respect context cancellation

func (s *MyStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
    defer close(output) // Always close output
    for elem := range input {
        // Process element
        result := s.transform(elem)
        // Send with cancellation check
        select {
        case output <- result:
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return nil
}
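
A complete stage also needs Name and Type to satisfy the interface. A minimal completion of the MyStage example (the StageTypeTransform constant and the identity transform are assumptions for illustration, not confirmed API):

type MyStage struct{}

func (s *MyStage) Name() string { return "my_stage" }

// StageTypeTransform is assumed here; the actual StageType constants may differ.
func (s *MyStage) Type() StageType { return StageTypeTransform }

// transform is the 1:1 operation used in Process above; here it simply passes the element through.
func (s *MyStage) transform(elem StreamElement) StreamElement {
    return elem
}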

Elements flow through the pipeline as they’re produced:

output, err := pipeline.Execute(ctx, input)
if err != nil {
    return err
}
for elem := range output {
    // Process elements as they arrive
    if elem.Text != nil {
        fmt.Print(*elem.Text)
    }
}

For request/response patterns, ExecuteSync collects all output:

result, err := pipeline.ExecuteSync(ctx, inputElements...)
// result.Messages contains all messages
// result.Response contains the final response

ExecuteSync is simply Execute() followed by draining the output channel and accumulating the results.
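
In other words, it behaves roughly like the following sketch (makeInputChannel is a hypothetical helper that wraps the variadic elements in a closed channel):

// Roughly what ExecuteSync does on the caller's behalf (sketch only).
output, err := pipeline.Execute(ctx, makeInputChannel(inputElements...))
if err != nil {
    return err
}
var collected []stage.StreamElement
for elem := range output {
    collected = append(collected, elem) // drain until the pipeline closes the channel
}
// ExecuteSync then assembles the collected elements into result.Messages and result.Response.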

Standard HTTP-based LLM interactions:

[Diagram: pipeline for standard HTTP-based LLM interactions]

RecordingIn and RecordingOut are optional RecordingStage instances that capture user input and assistant output as events on the EventBus. They pass data through unchanged.
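
As a sketch, recording on both sides of the provider might be wired up like this, combining the builder with the RecordingStage configuration shown later on this page (the exact stage ordering is an assumption):

recordIn := stage.NewRecordingStage(eventBus, stage.RecordingStageConfig{Position: "input", SessionID: sessionID})
recordOut := stage.NewRecordingStage(eventBus, stage.RecordingStageConfig{Position: "output", SessionID: sessionID})

pipeline := stage.NewPipelineBuilder().
    Chain(
        recordIn,                                                // capture user input as events
        stage.NewProviderStage(provider, tools, policy, config), // call the LLM
        recordOut,                                               // capture assistant output as events
    ).
    Build()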

Hooks (guardrails, tool hooks) run inside ProviderStage — they are not separate pipeline stages. Provider hooks execute before/after each LLM call, and chunk interceptors run on each streaming chunk within the provider stage.

Use cases: Chat applications, content generation

For voice applications using text-based LLMs:

[Diagram: pipeline for voice applications using a text-based LLM]

Use cases: Voice assistants, telephony integrations

For native multimodal LLMs with real-time audio:

[Diagram: pipeline for native multimodal LLMs with real-time audio]

Use cases: Gemini Live API, real-time voice conversations

Each stage runs in its own goroutine, managed by the pipeline:

func (p *Pipeline) Execute(ctx context.Context, input <-chan StreamElement) (<-chan StreamElement, error) {
    ctx, cancel := context.WithCancel(ctx)
    p.cancel = cancel // retained so Shutdown can cancel the pipeline

    // Start each stage in its own goroutine
    current := input
    for _, stg := range p.stages {
        output := make(chan StreamElement, p.config.ChannelBufferSize)
        go func(s Stage, in <-chan StreamElement, out chan<- StreamElement) {
            if err := s.Process(ctx, in, out); err != nil {
                // Error handling
            }
        }(stg, current, output)
        current = output
    }
    return current, nil
}

Channel-based communication naturally handles backpressure (see the sketch after this list):

  • Slow consumers block producers
  • Buffer size controls latency vs. throughput tradeoff
  • No unbounded buffering
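
The mechanism is plain Go channel semantics: a send on a full (or unbuffered) channel blocks until the consumer reads. A standalone illustration, independent of Runtime's types:

package main

import (
    "fmt"
    "time"
)

func main() {
    out := make(chan int, 2) // small buffer: producer can run at most 2 elements ahead

    go func() {
        defer close(out)
        for i := 0; i < 5; i++ {
            out <- i // blocks once the buffer is full, throttling the producer
            fmt.Println("produced", i)
        }
    }()

    for v := range out {
        time.Sleep(100 * time.Millisecond) // slow consumer
        fmt.Println("consumed", v)
    }
}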

Graceful shutdown propagates through the pipeline:

func (p *Pipeline) Shutdown(timeout time.Duration) error {
    p.cancel() // Cancel context

    // Wait for all stages to complete
    done := make(chan struct{})
    go func() {
        p.wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        return nil
    case <-time.After(timeout):
        return ErrShutdownTimeout
    }
}

The builder constructs pipelines with a fluent API:

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewStateStoreLoadStage(stateConfig),
        stage.NewPromptAssemblyStage(registry, taskType, vars),
        stage.NewProviderStage(provider, tools, policy, config),
        stage.NewStateStoreSaveStage(stateConfig),
    ).
    Build()

For parallel processing paths:

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewProviderStage(provider, tools, policy, config),
    ).
    Branch("provider", "tts", "text_output"). // Fork output
    Chain(
        stage.NewTTSStage(ttsService, config),
    ).
    Build()

The builder validates the pipeline structure:

  • Detects cycles
  • Verifies all stages are connected
  • Checks for duplicate stage names

Errors can be sent as elements for downstream handling:

if err := s.validate(elem); err != nil {
    output <- stage.NewErrorElement(err)
    continue // Process next element
}

Returning an error from Process() stops the pipeline:

if err := s.criticalOperation(elem); err != nil {
    return err // Pipeline stops
}

All stages should respect context cancellation:

select {
case output <- elem:
    // Success
case <-ctx.Done():
    return ctx.Err() // Pipeline cancelled
}

The pipeline emits events for observability:

// Automatic events for all stages
EventStageStarted      // When stage begins processing
EventStageCompleted    // When stage finishes successfully
EventStageFailed       // When stage encounters an error

// Pipeline lifecycle
EventPipelineStarted
EventPipelineCompleted
EventPipelineFailed

These events are automatically emitted by the pipeline — stage authors don’t need to emit them manually.

RecordingStage is a special observe-only stage that publishes content-carrying events to the EventBus as elements flow through the pipeline:

stage.NewRecordingStage(eventBus, stage.RecordingStageConfig{
    Position:       "output", // "input" for user messages, "output" for assistant
    SessionID:      sessionID,
    ConversationID: conversationID,
})

It records different element types as events:

  • Text / Message → EventMessageCreated (with tool calls and results if present)
  • Audio / Image / Video → corresponding multimodal events with metadata
  • ToolCall → EventToolCallStarted
  • Error → EventStreamInterrupted

These events can be consumed by any EventBus listener, including:

  • FileEventStore for JSONL persistence and replay
  • EventBusEvalListener for automatic eval execution on message.created events
  • Prometheus metrics listener for operational monitoring

See Observability for EventBus architecture and Eval Framework for how recorded events trigger evals.

Scenario                       Target     Notes
Audio chunk → VAD              < 10ms     Minimal buffering
Turn complete → LLM request    < 50ms     Channel hop only
LLM token → TTS chunk          < 50ms     Parallel processing
Channel hop overhead           ~1-2ms     Per stage

  • Channel buffers control memory usage
  • No full response accumulation needed for streaming
  • Element pooling available for high-throughput scenarios
  • Concurrent stage execution
  • Backpressure prevents unbounded growth
  • Priority scheduling for QoS

Decision: Use Go channels for inter-stage communication

Rationale:

  • Natural fit for Go’s concurrency model
  • Built-in backpressure
  • Easy to reason about
  • Standard error propagation via context

Decision: Each stage runs in exactly one goroutine

Rationale:

  • Clear ownership of lifecycle
  • Predictable resource usage
  • Simple debugging (goroutine per stage)
  • Easy to add metrics/tracing

Decision: Stages must close their output channel when done

Rationale:

  • Signal completion to downstream stages
  • Enable for range iteration
  • Prevent goroutine leaks
  • Clear shutdown semantics

The stage-based pipeline architecture provides:

  • True Streaming: Elements flow as they’re produced
  • Concurrency: Each stage runs independently
  • Backpressure: Slow consumers naturally throttle producers
  • Composability: Build complex pipelines from simple stages
  • Observability: Automatic events for all stages
  • Type Safety: Strongly typed elements with clear contracts