Pipeline Reference

The Pipeline component chains stages together for streaming LLM execution.

Overview

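The Pipeline executes stages concurrently via goroutines, passing StreamElement objects through channels. It supports:

- Streaming execution (Execute) and synchronous execution (ExecuteSync)
- Branching from one stage to multiple destinations
- Priority-based element scheduling
- Per-stage metrics collection and distributed tracing
- Graceful shutdown with a timeout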

Core Types

StreamElement

type StreamElement struct {
    // Content (at most one is typically set)
    Text      *string
    Audio     *AudioData
    Video     *VideoData
    Image     *ImageData
    Message   *types.Message
    ToolCall  *types.ToolCall
    Parts     []types.ContentPart

    // Metadata for inter-stage communication
    Metadata  map[string]interface{}

    // Control and priority
    Priority  Priority  // Low, Normal, High, Critical
    Error     error
    Timestamp time.Time
}

The fundamental unit of data flowing through the pipeline.

AudioData

type AudioData struct {
    Samples    []byte
    SampleRate int
    Channels   int
    Format     AudioFormat  // AudioFormatPCM16, AudioFormatPCM32, etc.
}

Audio content for speech stages.

Stage

type Stage interface {
    Name() string
    Type() StageType
    Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
}

Interface for all pipeline stages.

StageType

type StageType int

const (
    StageTypeTransform    StageType = iota  // 1:1 or 1:N transformation
    StageTypeAccumulate                      // N:1 accumulation
    StageTypeGenerate                        // 0:N generation
    StageTypeSink                            // N:0 terminal
    StageTypeBidirectional                   // Full duplex
)

Classification of stage behavior.

Priority

type Priority int

const (
    PriorityLow      Priority = iota
    PriorityNormal
    PriorityHigh
    PriorityCritical
)

Element priority for QoS-aware scheduling.

StreamPipeline

type StreamPipeline struct {
    stages []Stage
    config *PipelineConfig
}

Executable pipeline of connected stages.

PipelineConfig

type PipelineConfig struct {
    ChannelBufferSize       int           // Default: 16
    ExecutionTimeout        time.Duration // Default: 30s
    GracefulShutdownTimeout time.Duration // Default: 10s
    PriorityQueue           bool          // Enable priority scheduling
    Metrics                 bool          // Enable per-stage metrics
    Tracing                 bool          // Enable distributed tracing
}

Pipeline runtime configuration.

Constructor Functions

NewPipelineBuilder

func NewPipelineBuilder() *PipelineBuilder

Creates a new pipeline builder with default configuration.

Example:

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewProviderStage(provider, nil, nil, config),
    ).
    Build()

NewPipelineBuilderWithConfig

func NewPipelineBuilderWithConfig(config *PipelineConfig) *PipelineBuilder

Creates a pipeline builder with custom configuration.

Example:

config := stage.DefaultPipelineConfig().
    WithChannelBufferSize(32).
    WithMetrics(true)

pipeline := stage.NewPipelineBuilderWithConfig(config).
    Chain(stages...).
    Build()

DefaultPipelineConfig

func DefaultPipelineConfig() *PipelineConfig

Returns default configuration.

Example:

config := stage.DefaultPipelineConfig()
config.ChannelBufferSize = 32  // Override defaults

PipelineConfig Methods

WithChannelBufferSize

func (c *PipelineConfig) WithChannelBufferSize(size int) *PipelineConfig

Sets the buffer size for inter-stage channels.

WithExecutionTimeout

func (c *PipelineConfig) WithExecutionTimeout(timeout time.Duration) *PipelineConfig

Sets maximum execution time per pipeline run.

WithGracefulShutdownTimeout

func (c *PipelineConfig) WithGracefulShutdownTimeout(timeout time.Duration) *PipelineConfig

Sets timeout for graceful shutdown.

WithPriorityQueue

func (c *PipelineConfig) WithPriorityQueue(enabled bool) *PipelineConfig

Enables priority-based element scheduling.

WithMetrics

func (c *PipelineConfig) WithMetrics(enabled bool) *PipelineConfig

Enables per-stage metrics collection.

WithTracing

func (c *PipelineConfig) WithTracing(enabled bool) *PipelineConfig

Enables distributed tracing support.

PipelineBuilder Methods

Chain

func (b *PipelineBuilder) Chain(stages ...Stage) *PipelineBuilder

Adds stages in sequence.

Example:

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewStateStoreLoadStage(stateConfig),
        stage.NewPromptAssemblyStage(registry, task, vars),
        stage.NewProviderStage(provider, tools, policy, config),
        stage.NewStateStoreSaveStage(stateConfig),
    ).
    Build()

Branch

func (b *PipelineBuilder) Branch(from string, to ...string) *PipelineBuilder

Creates branching from one stage to multiple destinations.

Example:

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewProviderStage(provider, tools, policy, config),
    ).
    Branch("provider", "tts", "logger").
    Build()

Build

func (b *PipelineBuilder) Build() *StreamPipeline

Constructs the final pipeline. Validates DAG structure.

StreamPipeline Methods

Execute

func (p *StreamPipeline) Execute(ctx context.Context, input <-chan StreamElement) (<-chan StreamElement, error)

Executes pipeline in streaming mode.

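Parameters:

- ctx: Context for the pipeline run
- input: Receive-only channel of input elements (the example below closes it after sending the last element)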

Returns: Channel of output elements

Example:

// Create input
input := make(chan stage.StreamElement, 1)
msg := types.Message{Role: "user"}
msg.AddTextPart("Hello!")
input <- stage.NewMessageElement(msg)
close(input)

// Execute
output, err := pipeline.Execute(ctx, input)
if err != nil {
    return err
}

// Process output
for elem := range output {
    if elem.Text != nil {
        fmt.Print(*elem.Text)
    }
    if elem.Error != nil {
        log.Printf("Error: %v", elem.Error)
    }
}

ExecuteSync

func (p *StreamPipeline) ExecuteSync(ctx context.Context, elements ...StreamElement) (*ExecutionResult, error)

Executes pipeline synchronously, collecting all output.

Parameters:

- ctx: Context for the pipeline run
- elements: Input elements to run through the pipeline

Returns: Collected execution result

Example:

msg := types.Message{Role: "user"}
msg.AddTextPart("What is 2+2?")
elem := stage.NewMessageElement(msg)

result, err := pipeline.ExecuteSync(ctx, elem)
if err != nil {
    return err
}

fmt.Printf("Response: %s\n", result.Response.GetTextContent())

Shutdown

func (p *StreamPipeline) Shutdown(timeout time.Duration) error

Gracefully shuts down pipeline.

Parameters:

- timeout: Maximum time to wait for the shutdown to complete

Returns: Error if shutdown times out

Example:

defer pipeline.Shutdown(10 * time.Second)

StreamElement Constructor Functions

NewTextElement

func NewTextElement(text string) StreamElement

Creates element with text content.

NewMessageElement

func NewMessageElement(msg types.Message) StreamElement

Creates element with message content.

NewAudioElement

func NewAudioElement(audio *AudioData) StreamElement

Creates element with audio content.

NewErrorElement

func NewErrorElement(err error) StreamElement

Creates element representing an error.

NewElementWithMetadata

func NewElementWithMetadata(metadata map[string]interface{}) StreamElement

Creates element with only metadata.

Built-In Stages

Core Stages

StateStoreLoadStage

func NewStateStoreLoadStage(config *StateStoreConfig) *StateStoreLoadStage

Loads conversation history from persistent storage.

Configuration:

config := &stage.StateStoreConfig{
    Store:          stateStore,
    ConversationID: "session-123",
}

StateStoreSaveStage

func NewStateStoreSaveStage(config *StateStoreConfig) *StateStoreSaveStage

Saves conversation state after processing.

PromptAssemblyStage

func NewPromptAssemblyStage(registry *prompt.Registry, taskType string, variables map[string]string) *PromptAssemblyStage

Loads and assembles prompts from registry.

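Sets metadata:

- system_prompt (string): consumed by ProviderStage
- allowed_tools ([]string): consumed by ProviderStage
- validators ([]ValidatorConfig): consumed by ValidationStage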

TemplateStage

func NewTemplateStage() *TemplateStage

Processes {{variable}} substitution in content.

ValidationStage

func NewValidationStage(registry *validators.Registry, config *ValidationConfig) *ValidationStage

Validates content against registered validators.

Configuration:

config := &stage.ValidationConfig{
    FailOnError:     true,
    SuppressErrors:  false,
}

ProviderStage

func NewProviderStage(
    provider providers.Provider,
    toolRegistry *tools.Registry,
    toolPolicy *ToolPolicy,
    config *ProviderConfig,
) *ProviderStage

Executes LLM calls with streaming and tool support.

Configuration:

config := &stage.ProviderConfig{
    MaxTokens:   1500,
    Temperature: 0.7,
    Seed:        &seed,
}

Tool Policy:

policy := &stage.ToolPolicy{
    BlockedTools: []string{"dangerous_tool"},
    ToolChoice:   "auto",  // "auto", "none", "required", or specific tool name
}

Streaming/Speech Stages

AudioTurnStage

func NewAudioTurnStage(config AudioTurnConfig) (*AudioTurnStage, error)

VAD-based turn detection and audio accumulation.

Configuration:

config := stage.AudioTurnConfig{
    SilenceDuration:   800 * time.Millisecond,
    MinSpeechDuration: 200 * time.Millisecond,
    MaxTurnDuration:   30 * time.Second,
    SampleRate:        16000,
}

STTStage

func NewSTTStage(service stt.Service, config STTStageConfig) *STTStage

Speech-to-text transcription.

Configuration:

config := stage.STTStageConfig{
    Language:      "en",
    SkipEmpty:     true,
    MinAudioBytes: 1600,  // 50ms at 16kHz
}

TTSStage

func NewTTSStage(service tts.Service, config TTSConfig) *TTSStage

Text-to-speech synthesis.

Configuration:

config := stage.TTSConfig{
    Voice: "alloy",
    Speed: 1.0,
}

TTSStageWithInterruption

func NewTTSStageWithInterruption(
    service tts.Service,
    handler *audio.InterruptionHandler,
    config TTSConfig,
) *TTSStageWithInterruption

TTS with barge-in/interruption support.

DuplexProviderStage

func NewDuplexProviderStage(session providers.StreamInputSession) *DuplexProviderStage

Bidirectional WebSocket streaming for native audio LLMs.

VADAccumulatorStage

func NewVADAccumulatorStage(config VADConfig) *VADAccumulatorStage

Audio buffering with voice activity detection.

Advanced Stages

RouterStage

func NewRouterStage(routeFunc func(StreamElement) string, outputs map[string]chan<- StreamElement) *RouterStage

Dynamic routing based on element content.

MergeStage

func NewMergeStage(inputs ...<-chan StreamElement) *MergeStage

Combines multiple input streams.

MetricsStage

func NewMetricsStage(inner Stage) *MetricsStage

Wraps a stage to collect performance metrics.

Metrics collected:

TracingStage

func NewTracingStage(inner Stage, tracer Tracer) *TracingStage

Adds distributed tracing to a stage.

Utility Stages

DebugStage

func NewDebugStage(output io.Writer) *DebugStage

Logs all elements as JSON for debugging.

VariableProviderStage

func NewVariableProviderStage(providers []variables.Provider) *VariableProviderStage

Resolves variables from multiple sources.

MediaExternalizerStage

func NewMediaExternalizerStage(storage MediaStorage, config MediaExternalizerConfig) *MediaExternalizerStage

Uploads large media to external storage.

ContextBuilderStage

func NewContextBuilderStage(config ContextBuilderConfig) *ContextBuilderStage

Manages token budget with truncation strategies.

Configuration:

config := stage.ContextBuilderConfig{
    TokenBudget:        4000,
    TruncationStrategy: "sliding_window",  // or "summarize"
}

BaseStage

Base implementation for custom stages:

type BaseStage struct {
    name      string
    stageType StageType
}

func NewBaseStage(name string, stageType StageType) BaseStage
func (s *BaseStage) Name() string
func (s *BaseStage) Type() StageType

Example custom stage:

type MyStage struct {
    stage.BaseStage
    config MyConfig
}

func NewMyStage(config MyConfig) *MyStage {
    return &MyStage{
        BaseStage: stage.NewBaseStage("my_stage", stage.StageTypeTransform),
        config:    config,
    }
}

func (s *MyStage) Process(ctx context.Context, input <-chan stage.StreamElement, output chan<- stage.StreamElement) error {
    defer close(output)

    for elem := range input {
        // Transform element
        if elem.Text != nil {
            upper := strings.ToUpper(*elem.Text)
            elem.Text = &upper
        }

        select {
        case output <- elem:
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return nil
}

Error Handling

Error Elements

Send errors as elements for downstream handling:

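if err := validate(elem); err != nil {
    // Send the error element, but do not block forever if the pipeline is cancelled.
    select {
    case output <- stage.NewErrorElement(err):
    case <-ctx.Done():
        return ctx.Err()
    }
    continue
}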

Fatal Errors

Return error to stop the pipeline:

if err := criticalOperation(); err != nil {
    return err  // Pipeline stops
}

Context Cancellation

Always check for cancellation:

select {
case output <- elem:
case <-ctx.Done():
    return ctx.Err()
}

Metadata Keys

Standard metadata keys used by built-in stages:

| Key | Type | Set By | Used By |
|-----|------|--------|---------|
| system_prompt | string | PromptAssemblyStage | ProviderStage |
| allowed_tools | []string | PromptAssemblyStage | ProviderStage |
| validators | []ValidatorConfig | PromptAssemblyStage | ValidationStage |
| variables | map[string]string | VariableProviderStage | TemplateStage |
| conversation_id | string | StateStoreLoadStage | StateStoreSaveStage |
| from_history | bool | StateStoreLoadStage | - |
| validation_results | []ValidationResult | ValidationStage | - |
| cost_info | types.CostInfo | ProviderStage | - |
| latency_ms | int64 | ProviderStage | - |

Configuration Tuning

Channel Buffer Size

| Use Case | Recommended | Notes |
|----------|-------------|-------|
| Low latency | 4-8 | Minimize buffering |
| High throughput | 32-64 | Allow producer ahead |
| Memory constrained | 8-16 | Balance |

Timeout Settings

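| Pipeline Type | Execution Timeout | Shutdown Timeout |
|---------------|-------------------|------------------|
| Simple chat | 30s | 5s |
| Tool-heavy | 120s | 30s |
| Voice (VAD) | 300s | 10s |
| Voice (ASM) | 600s | 15s |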

Examples

Text Pipeline

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewStateStoreLoadStage(stateConfig),
        stage.NewPromptAssemblyStage(registry, "chat", vars),
        stage.NewTemplateStage(),
        stage.NewProviderStage(provider, tools, policy, config),
        stage.NewStateStoreSaveStage(stateConfig),
    ).
    Build()

// Execute
msg := types.Message{Role: "user"}
msg.AddTextPart("Hello!")
input := make(chan stage.StreamElement, 1)
input <- stage.NewMessageElement(msg)
close(input)

output, err := pipeline.Execute(ctx, input)
if err != nil {
    return err
}
for elem := range output {
    if elem.Text != nil {
        fmt.Print(*elem.Text)
    }
}

VAD Voice Pipeline

// NewAudioTurnStage returns (*AudioTurnStage, error), so construct it before chaining.
audioTurn, err := stage.NewAudioTurnStage(vadConfig)
if err != nil {
    return err
}

pipeline := stage.NewPipelineBuilder().
    Chain(
        audioTurn,
        stage.NewSTTStage(sttService, sttConfig),
        stage.NewStateStoreLoadStage(stateConfig),
        stage.NewPromptAssemblyStage(registry, task, vars),
        stage.NewProviderStage(provider, tools, policy, providerConfig),
        stage.NewStateStoreSaveStage(stateConfig),
        stage.NewTTSStageWithInterruption(ttsService, handler, ttsConfig),
    ).
    Build()

// Feed audio chunks
for audioChunk := range microphoneStream {
    input <- stage.NewAudioElement(&stage.AudioData{
        Samples:    audioChunk,
        SampleRate: 16000,
        Format:     stage.AudioFormatPCM16,
    })
}

ASM Duplex Pipeline

session, err := gemini.NewStreamSession(ctx, endpoint, apiKey, streamConfig)
if err != nil {
    return err
}

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewStateStoreLoadStage(stateConfig),
        stage.NewPromptAssemblyStage(registry, task, vars),
        stage.NewDuplexProviderStage(session),
        stage.NewStateStoreSaveStage(stateConfig),
    ).
    Build()

Best Practices

  1. Always close output channels: Use defer close(output) at start of Process
  2. Check context cancellation: Use select with ctx.Done()
  3. Use metadata for state: Pass data between stages via metadata
  4. Handle errors gracefully: Decide between error elements and fatal returns
  5. Order stages correctly: State load → Prompt assembly → Provider → State save
  6. Clean up resources: Use defer pipeline.Shutdown(timeout)

See Also