Pipeline Architecture

Understanding Runtime’s stage-based streaming pipeline design.

Overview

Runtime uses a stage-based streaming architecture for processing LLM requests. This design provides true streaming execution, concurrent processing, and a composable DAG (Directed Acyclic Graph) of processing units.

Core Concept

A pipeline is a DAG of stages that process streaming elements:

Input → [Stage 1] → [Stage 2] → [Stage N] → Output
            │            │
            ▼            ▼
        [Branch A]   [Branch B]

Each stage:

Why Stages?

Streaming First

The stage architecture is designed for streaming scenarios like voice applications:

  1. Streaming input: Audio chunks from microphone
  2. Accumulation: VAD detects turn boundaries
  3. Processing: Transcribe, call LLM, generate TTS
  4. Streaming output: Audio chunks to speaker

Stages model this as a reactive stream where data flows through connected processing units:

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewAudioTurnStage(vadConfig),      // Accumulate until turn complete
        stage.NewSTTStage(sttService, sttConfig), // Transcribe audio
        stage.NewProviderStage(provider, tools, policy, config), // Call LLM
        stage.NewTTSStageWithInterruption(ttsService, handler, ttsConfig), // Generate audio
    ).
    Build()

StreamElement

The fundamental unit of data flowing through the pipeline:

type StreamElement struct {
    // Content types (at most one is set)
    Text      *string
    Audio     *AudioData
    Video     *VideoData
    Image     *ImageData
    Message   *types.Message
    ToolCall  *types.ToolCall
    Parts     []types.ContentPart

    // Metadata for inter-stage communication
    Metadata  map[string]interface{}

    // Control and observability
    Priority  Priority  // Low, Normal, High, Critical
    Error     error
    Timestamp time.Time
}

Key insight: Each element carries at most one content type, which enables type-safe routing and priority scheduling.
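To illustrate routing on the populated field, here is a minimal sketch. It uses a trimmed local copy of StreamElement; the Priority constant names, the AudioData fields, and the NewTextElement and Kind helpers are assumptions made for this example, not part of the documented API.

```go
package main

import "time"

// Priority mirrors the four levels named in the struct comment (assumed names).
type Priority int

const (
	PriorityLow Priority = iota
	PriorityNormal
	PriorityHigh
	PriorityCritical
)

// AudioData is a stand-in; the real type is not shown in this document.
type AudioData struct{ Samples []byte }

// StreamElement is a trimmed copy holding only the fields this sketch needs.
type StreamElement struct {
	Text      *string
	Audio     *AudioData
	Metadata  map[string]interface{}
	Priority  Priority
	Error     error
	Timestamp time.Time
}

// NewTextElement is a hypothetical helper that sets exactly one content field.
func NewTextElement(s string) StreamElement {
	return StreamElement{Text: &s, Priority: PriorityNormal, Timestamp: time.Now()}
}

// Kind reports which field is populated, checking Error first so that
// error elements are never mistaken for content.
func Kind(e StreamElement) string {
	switch {
	case e.Error != nil:
		return "error"
	case e.Text != nil:
		return "text"
	case e.Audio != nil:
		return "audio"
	default:
		return "empty"
	}
}
```

A stage could switch on Kind(elem) to decide whether to transform an element, pass it through untouched, or drop it.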

Stage Interface

All stages implement:

type Stage interface {
    Name() string
    Type() StageType
    Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
}

Stage Types

Type           Pattern       Example
Transform      1:1 or 1:N    Validation, enrichment
Accumulate     N:1           VAD buffering, message collection
Generate       0:N           LLM streaming, TTS
Sink           N:0           State store save, metrics
Bidirectional  Varies        WebSocket session
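As an illustration of the Accumulate (N:1) pattern, the sketch below buffers text fragments and emits one joined element once its input closes. TextAccumulator and runAccumulator are hypothetical names, and StreamElement is reduced to a single field for brevity.

```go
package main

import (
	"context"
	"strings"
)

// StreamElement is reduced to the one field this sketch uses.
type StreamElement struct {
	Text *string
}

// TextAccumulator is a hypothetical N:1 stage: many fragments in, one element out.
type TextAccumulator struct{}

func (TextAccumulator) Name() string { return "text_accumulator" }

func (TextAccumulator) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
	defer close(output) // signal downstream that no more elements will arrive

	// Buffer every fragment until the upstream stage closes its channel.
	var buf strings.Builder
	for elem := range input {
		if elem.Text != nil {
			buf.WriteString(*elem.Text)
		}
	}

	// Emit the single accumulated element, unless the pipeline was cancelled.
	joined := buf.String()
	select {
	case output <- StreamElement{Text: &joined}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runAccumulator feeds the fragments through the stage and returns the result.
func runAccumulator(parts ...string) string {
	in := make(chan StreamElement, len(parts))
	for i := range parts {
		in <- StreamElement{Text: &parts[i]}
	}
	close(in)

	out := make(chan StreamElement, 1)
	if err := (TextAccumulator{}).Process(context.Background(), in, out); err != nil {
		return ""
	}
	elem := <-out
	return *elem.Text
}
```

A real accumulator such as a VAD stage would presumably flush on a turn boundary rather than on channel close; the close-triggered flush here just keeps the example short.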

Contract

Every stage must:

  1. Read from input channel until closed
  2. Send results to output channel
  3. Close output channel when done
  4. Respect context cancellation

func (s *MyStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
    defer close(output)  // Always close output

    for elem := range input {
        // Process element
        result := s.transform(elem)

        // Send with cancellation check
        select {
        case output <- result:
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return nil
}

Pipeline Execution

Streaming Execution

Elements flow through the pipeline as they’re produced:

output, err := pipeline.Execute(ctx, input)
if err != nil {
    return err
}

for elem := range output {
    // Process elements as they arrive
    if elem.Text != nil {
        fmt.Print(*elem.Text)
    }
}

Synchronous Execution

For request/response patterns, ExecuteSync collects all output:

result, err := pipeline.ExecuteSync(ctx, inputElements...)
// result.Messages contains all messages
// result.Response contains the final response

ExecuteSync is equivalent to calling Execute() and then draining the output channel, accumulating the elements into a single result.
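The drain step can be sketched as follows. This is not the actual ExecuteSync implementation: the real result type (with Messages and Response) is not reproduced here, and drain and collectTexts are hypothetical helpers that return a plain slice.

```go
package main

import "context"

// StreamElement is reduced to the fields this sketch uses.
type StreamElement struct {
	Text  *string
	Error error
}

// drain collects elements until the channel closes, an error element
// arrives, or the context is cancelled.
func drain(ctx context.Context, out <-chan StreamElement) ([]StreamElement, error) {
	var collected []StreamElement
	for {
		select {
		case elem, ok := <-out:
			if !ok {
				return collected, nil // upstream closed: the pipeline is done
			}
			if elem.Error != nil {
				return collected, elem.Error
			}
			collected = append(collected, elem)
		case <-ctx.Done():
			return collected, ctx.Err()
		}
	}
}

// collectTexts pushes the given strings through a closed channel and drains it.
func collectTexts(parts ...string) ([]string, error) {
	out := make(chan StreamElement, len(parts))
	for i := range parts {
		out <- StreamElement{Text: &parts[i]}
	}
	close(out)

	elems, err := drain(context.Background(), out)
	texts := make([]string, 0, len(elems))
	for _, e := range elems {
		if e.Text != nil {
			texts = append(texts, *e.Text)
		}
	}
	return texts, err
}
```

Stopping at the first error element is one possible policy; a caller could equally collect errors alongside results.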

Execution Modes

Text Mode (Request/Response)

Standard HTTP-based LLM interactions:

Message → StateStoreLoad → PromptAssembly → Template → Provider → Validation → StateStoreSave → Response

Use cases: Chat applications, content generation

VAD Mode (Voice Activity Detection)

For voice applications using text-based LLMs:

Audio → AudioTurn → STT → StateStoreLoad → PromptAssembly → Template → Provider → TTS → StateStoreSave → Audio

Use cases: Voice assistants, telephony integrations

ASM Mode (Audio Streaming)

For native multimodal LLMs with real-time audio:

Audio/Text → StateStoreLoad → PromptAssembly → Template → DuplexProvider → StateStoreSave → Audio/Text

Use cases: Gemini Live API, real-time voice conversations

Concurrency Model

Goroutine Lifecycle

Each stage runs in its own goroutine, managed by the pipeline:

func (p *Pipeline) Execute(ctx context.Context, input <-chan StreamElement) (<-chan StreamElement, error) {
    ctx, cancel := context.WithCancel(ctx)
    p.cancel = cancel // Stored so Shutdown can cancel every stage

    // Start each stage in its own goroutine
    current := input
    for _, stg := range p.stages {
        output := make(chan StreamElement, p.config.ChannelBufferSize)

        p.wg.Add(1) // Tracked so Shutdown can wait for completion
        go func(s Stage, in <-chan StreamElement, out chan<- StreamElement) {
            defer p.wg.Done()
            if err := s.Process(ctx, in, out); err != nil {
                // Error handling
            }
        }(stg, current, output)

        current = output
    }

    return current, nil
}
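For a version of this wiring that can be run in isolation, the sketch below chains function-valued stages over plain strings instead of StreamElement. StageFunc, mapStage, run, and shout are hypothetical names introduced only for this example.

```go
package main

import (
	"context"
	"strings"
	"sync"
)

// StageFunc is a simplified stand-in for Stage.Process over strings.
type StageFunc func(ctx context.Context, in <-chan string, out chan<- string) error

// mapStage builds a 1:1 transform stage that follows the stage contract.
func mapStage(f func(string) string) StageFunc {
	return func(ctx context.Context, in <-chan string, out chan<- string) error {
		defer close(out)
		for s := range in {
			select {
			case out <- f(s):
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	}
}

// run starts one goroutine per stage and connects them with buffered channels.
func run(ctx context.Context, in <-chan string, stages ...StageFunc) (<-chan string, *sync.WaitGroup) {
	wg := &sync.WaitGroup{}
	current := in
	for _, stg := range stages {
		out := make(chan string, 1)
		wg.Add(1)
		go func(s StageFunc, in <-chan string, out chan<- string) {
			defer wg.Done()
			_ = s(ctx, in, out) // error handling omitted in this sketch
		}(stg, current, out)
		current = out
	}
	return current, wg
}

// shout upper-cases each word and appends "!" using a two-stage pipeline.
func shout(words ...string) []string {
	in := make(chan string, len(words))
	for _, w := range words {
		in <- w
	}
	close(in)

	exclaim := func(s string) string { return s + "!" }
	out, wg := run(context.Background(), in, mapStage(strings.ToUpper), mapStage(exclaim))

	var got []string
	for s := range out {
		got = append(got, s)
	}
	wg.Wait()
	return got
}
```

Because each stage is a single goroutine reading a FIFO channel, element order is preserved from input to output.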

Backpressure

Channel-based communication naturally handles backpressure:
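The underlying Go mechanism is that a send on a full buffered channel blocks until a receiver makes room, so a slow stage automatically slows the stages upstream of it. The sketch below makes this visible without blocking by using non-blocking sends; fillWithoutConsumer is a hypothetical helper, and bufferSize plays the role of ChannelBufferSize.

```go
package main

// fillWithoutConsumer attempts n non-blocking sends into a channel that has
// no reader and reports how many sends the buffer accepted.
func fillWithoutConsumer(bufferSize, n int) int {
	ch := make(chan int, bufferSize)
	accepted := 0
	for i := 0; i < n; i++ {
		select {
		case ch <- i:
			accepted++
		default:
			// Buffer full: an ordinary blocking send would park the
			// producer here until a consumer reads from the channel.
		}
	}
	return accepted
}
```

With a buffer of 4 and 10 attempted sends, only 4 are accepted; in a real stage the remaining sends would wait rather than be dropped.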

Shutdown

Graceful shutdown propagates through the pipeline:

func (p *Pipeline) Shutdown(timeout time.Duration) error {
    p.cancel()  // Cancel context

    // Wait for all stages to complete
    done := make(chan struct{})
    go func() {
        p.wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        return nil
    case <-time.After(timeout):
        return ErrShutdownTimeout
    }
}

Pipeline Builder

The builder constructs pipelines with a fluent API:

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewStateStoreLoadStage(stateConfig),
        stage.NewPromptAssemblyStage(registry, taskType, vars),
        stage.NewProviderStage(provider, tools, policy, config),
        stage.NewStateStoreSaveStage(stateConfig),
    ).
    Build()

Branching

For parallel processing paths:

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewProviderStage(provider, tools, policy, config),
    ).
    Branch("provider", "tts", "text_output").  // Fork output
    Chain(
        stage.NewTTSStage(ttsService, config),
    ).
    Build()
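How Branch forks the stream internally is not shown in this document. One common way to fork a channel is a tee that copies each element to every branch, sketched below over plain strings; tee and demoTee are hypothetical names.

```go
package main

import "context"

// tee copies every element from in to each output channel, then closes
// all outputs so that each branch sees a normal end of stream.
func tee(ctx context.Context, in <-chan string, outs ...chan<- string) error {
	defer func() {
		for _, o := range outs {
			close(o)
		}
	}()

	for s := range in {
		for _, o := range outs {
			select {
			case o <- s:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
	return nil
}

// demoTee forks two elements into a "tts" branch and a "text" branch and
// returns how many elements each branch received.
func demoTee() (ttsCount, textCount int) {
	in := make(chan string, 2)
	in <- "hello"
	in <- "world"
	close(in)

	tts := make(chan string, 2)
	text := make(chan string, 2)
	if err := tee(context.Background(), in, tts, text); err != nil {
		return 0, 0
	}

	for range tts {
		ttsCount++
	}
	for range text {
		textCount++
	}
	return ttsCount, textCount
}
```

Note the trade-off in this design: once a branch's buffer fills, the tee blocks on it, so the slowest branch sets the pace for all of them.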

DAG Validation

The builder validates the pipeline structure:
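The specific checks are not listed here. One check that any DAG requires is acyclicity, which can be sketched as a depth-first search with three node states. hasCycle and the edge-map representation are assumptions for this example, not the builder's actual data structures.

```go
package main

// hasCycle reports whether the directed graph contains a cycle.
// edges maps a stage name to the names of the stages it feeds.
func hasCycle(edges map[string][]string) bool {
	const (
		unvisited = iota
		inProgress
		done
	)
	state := make(map[string]int) // missing keys default to unvisited

	var visit func(n string) bool
	visit = func(n string) bool {
		switch state[n] {
		case inProgress:
			return true // reached a node already on the current path
		case done:
			return false // fully explored earlier, known to be cycle-free
		}
		state[n] = inProgress
		for _, next := range edges[n] {
			if visit(next) {
				return true
			}
		}
		state[n] = done
		return false
	}

	for n := range edges {
		if visit(n) {
			return true
		}
	}
	return false
}
```

A builder could run a check like this in Build() and refuse to construct a pipeline whose branches loop back on themselves.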

Error Handling

Error Elements

Errors can be sent as elements for downstream handling:

if err := s.validate(elem); err != nil {
    output <- stage.NewErrorElement(err)
    continue  // Process next element
}
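On the receiving side, a downstream stage or the final consumer can check the Error field before touching content. The sketch below separates error elements from text elements; NewErrorElement is a local stand-in for stage.NewErrorElement, and partition and demoPartition are hypothetical helpers.

```go
package main

import "errors"

// StreamElement is reduced to the fields this sketch uses.
type StreamElement struct {
	Text  *string
	Error error
}

// NewErrorElement is a local stand-in for stage.NewErrorElement.
func NewErrorElement(err error) StreamElement {
	return StreamElement{Error: err}
}

// partition drains in, separating text content from error elements.
func partition(in <-chan StreamElement) (texts []string, errs []error) {
	for elem := range in {
		if elem.Error != nil {
			errs = append(errs, elem.Error) // non-fatal: record and keep going
			continue
		}
		if elem.Text != nil {
			texts = append(texts, *elem.Text)
		}
	}
	return texts, errs
}

// demoPartition sends two good elements around one error element and
// returns how many of each kind were received.
func demoPartition() (int, int) {
	in := make(chan StreamElement, 3)
	ok := "ok"
	in <- StreamElement{Text: &ok}
	in <- NewErrorElement(errors.New("validation failed"))
	in <- StreamElement{Text: &ok}
	close(in)

	texts, errs := partition(in)
	return len(texts), len(errs)
}
```

The element after the error is still processed, which is the difference between an error element and a fatal error.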

Fatal Errors

Returning an error from Process() stops the pipeline:

if err := s.criticalOperation(elem); err != nil {
    return err  // Pipeline stops
}

Context Cancellation

All stages should respect context cancellation:

select {
case output <- elem:
    // Success
case <-ctx.Done():
    return ctx.Err()  // Pipeline cancelled
}

Event Integration

The pipeline emits events for observability:

// Automatic events for all stages
EventStageStarted    // When stage begins processing
EventStageCompleted  // When stage finishes successfully
EventStageFailed     // When stage encounters an error

// Pipeline lifecycle
EventPipelineStarted
EventPipelineCompleted
EventPipelineFailed

These events are emitted automatically by the pipeline; stage authors don’t need to emit them manually.

Performance Characteristics

Latency

Scenario                      Target     Notes
Audio chunk → VAD             < 10 ms    Minimal buffering
Turn complete → LLM request   < 50 ms    Channel hop only
LLM token → TTS chunk         < 50 ms    Parallel processing
Channel hop overhead          ~1-2 ms    Per stage

Memory

Throughput

Design Decisions

Why Channels Over Callbacks?

Decision: Use Go channels for inter-stage communication

Rationale:

Why One Goroutine Per Stage?

Decision: Each stage runs in exactly one goroutine

Rationale:

Why Close Output Channel?

Decision: Stages must close their output channel when done

Rationale:

Summary

The stage-based pipeline architecture provides: