Pipeline
The Pipeline component chains stages together for streaming LLM execution.
Overview
The Pipeline executes stages concurrently via goroutines, passing StreamElement objects through channels. It supports:
- Streaming execution: True streaming with elements flowing as produced
- Concurrent stages: Each stage runs in its own goroutine
- Backpressure: Channel-based flow control
- Graceful shutdown: Wait for all stages to complete
- Multiple modes: Text, VAD, and ASM pipeline configurations
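The execution model described above (one goroutine per stage, linked by channels) can be sketched in plain Go. This is a minimal illustration and not the library's implementation: `stageFunc`, `runPipeline`, `upper`, `exclaim`, and `collect` are hypothetical names, and plain strings stand in for StreamElement.

```go
package main

import (
	"fmt"
	"strings"
)

// stageFunc is a hypothetical stand-in for a pipeline stage: it reads from
// in, writes to out, and closes out when it is done.
type stageFunc func(in <-chan string, out chan<- string)

// runPipeline starts each stage in its own goroutine and links neighbours
// with buffered channels. It returns the output channel of the last stage.
func runPipeline(input <-chan string, bufSize int, stages ...stageFunc) <-chan string {
	current := input
	for _, s := range stages {
		out := make(chan string, bufSize)
		go s(current, out)
		current = out
	}
	return current
}

// upper is a toy transform stage that upper-cases each element.
func upper(in <-chan string, out chan<- string) {
	defer close(out)
	for v := range in {
		out <- strings.ToUpper(v)
	}
}

// exclaim is a toy transform stage that appends an exclamation mark.
func exclaim(in <-chan string, out chan<- string) {
	defer close(out)
	for v := range in {
		out <- v + "!"
	}
}

// collect drains a channel into a slice.
func collect(ch <-chan string) []string {
	var got []string
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	input := make(chan string, 2)
	input <- "hello"
	input <- "world"
	close(input)

	fmt.Println(collect(runPipeline(input, 16, upper, exclaim)))
}
```

Because each stage closes its output when its input is exhausted, closing the first input channel propagates shutdown through the whole chain.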
Core Types
StreamElement

```go
type StreamElement struct {
	// Content (at most one is typically set)
	Text     *string
	Audio    *AudioData
	Video    *VideoData
	Image    *ImageData
	Message  *types.Message
	ToolCall *types.ToolCall
	Parts    []types.ContentPart

	// Metadata for inter-stage communication
	Metadata map[string]interface{}

	// Control and priority
	Priority  Priority // Low, Normal, High, Critical
	Error     error
	Timestamp time.Time
}
```

The fundamental unit of data flowing through the pipeline.
AudioData
```go
type AudioData struct {
	Samples    []byte
	SampleRate int
	Channels   int
	Format     AudioFormat // AudioFormatPCM16, AudioFormatPCM32, etc.
}
```

Audio content for speech stages.
Stage

```go
type Stage interface {
	Name() string
	Type() StageType
	Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
}
```

Interface for all pipeline stages.
StageType
```go
type StageType int

const (
	StageTypeTransform     StageType = iota // 1:1 or 1:N transformation
	StageTypeAccumulate                     // N:1 accumulation
	StageTypeGenerate                       // 0:N generation
	StageTypeSink                           // N:0 terminal
	StageTypeBidirectional                  // Full duplex
)
```

Classification of stage behavior.
Priority
```go
type Priority int

const (
	PriorityLow Priority = iota
	PriorityNormal
	PriorityHigh
	PriorityCritical
)
```

Element priority for QoS-aware scheduling.
StreamPipeline
```go
type StreamPipeline struct {
	stages []Stage
	config *PipelineConfig
}
```

Executable pipeline of connected stages.
PipelineConfig
```go
type PipelineConfig struct {
	ChannelBufferSize       int           // Default: 16
	ExecutionTimeout        time.Duration // Default: 30s
	GracefulShutdownTimeout time.Duration // Default: 10s
	PriorityQueue           bool          // Enable priority scheduling
	Metrics                 bool          // Enable per-stage metrics
	Tracing                 bool          // Enable distributed tracing
}
```

Pipeline runtime configuration.
Constructor Functions
NewPipelineBuilder

```go
func NewPipelineBuilder() *PipelineBuilder
```

Creates a new pipeline builder with default configuration.

Example:

```go
pipeline := stage.NewPipelineBuilder().
	Chain(
		stage.NewProviderStage(provider, nil, nil, config),
	).
	Build()
```

NewPipelineBuilderWithConfig

```go
func NewPipelineBuilderWithConfig(config *PipelineConfig) *PipelineBuilder
```

Creates a pipeline builder with custom configuration.

Example:

```go
config := stage.DefaultPipelineConfig().
	WithChannelBufferSize(32).
	WithMetrics(true)

pipeline := stage.NewPipelineBuilderWithConfig(config).
	Chain(stages...).
	Build()
```

DefaultPipelineConfig

```go
func DefaultPipelineConfig() *PipelineConfig
```

Returns default configuration.

Example:

```go
config := stage.DefaultPipelineConfig()
config.ChannelBufferSize = 32 // Override defaults
```

PipelineConfig Methods
WithChannelBufferSize

```go
func (c *PipelineConfig) WithChannelBufferSize(size int) *PipelineConfig
```

Sets the buffer size for inter-stage channels.

WithExecutionTimeout

```go
func (c *PipelineConfig) WithExecutionTimeout(timeout time.Duration) *PipelineConfig
```

Sets maximum execution time per pipeline run.

WithGracefulShutdownTimeout

```go
func (c *PipelineConfig) WithGracefulShutdownTimeout(timeout time.Duration) *PipelineConfig
```

Sets timeout for graceful shutdown.

WithPriorityQueue

```go
func (c *PipelineConfig) WithPriorityQueue(enabled bool) *PipelineConfig
```

Enables priority-based element scheduling.

WithMetrics

```go
func (c *PipelineConfig) WithMetrics(enabled bool) *PipelineConfig
```

Enables per-stage metrics collection.

WithTracing

```go
func (c *PipelineConfig) WithTracing(enabled bool) *PipelineConfig
```

Enables distributed tracing support.
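Each of these methods returns *PipelineConfig, which is what makes chained calls possible. The pattern can be sketched with a hypothetical `config` type that mirrors a few of the documented fields. The sketch assumes each setter mutates its receiver and returns it. The signatures are consistent with that, but the library's actual behavior is not confirmed here.

```go
package main

import (
	"fmt"
	"time"
)

// config is a hypothetical stand-in mirroring a few PipelineConfig fields.
type config struct {
	ChannelBufferSize int
	ExecutionTimeout  time.Duration
	Metrics           bool
}

// defaultConfig uses the default values documented for PipelineConfig.
func defaultConfig() *config {
	return &config{ChannelBufferSize: 16, ExecutionTimeout: 30 * time.Second}
}

// WithChannelBufferSize mutates the receiver and returns it for chaining.
func (c *config) WithChannelBufferSize(size int) *config {
	c.ChannelBufferSize = size
	return c
}

// WithExecutionTimeout mutates the receiver and returns it for chaining.
func (c *config) WithExecutionTimeout(d time.Duration) *config {
	c.ExecutionTimeout = d
	return c
}

// WithMetrics mutates the receiver and returns it for chaining.
func (c *config) WithMetrics(enabled bool) *config {
	c.Metrics = enabled
	return c
}

func main() {
	cfg := defaultConfig().
		WithChannelBufferSize(32).
		WithExecutionTimeout(2 * time.Minute).
		WithMetrics(true)
	fmt.Printf("%+v\n", *cfg)
}
```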
PipelineBuilder Methods
Chain

```go
func (b *PipelineBuilder) Chain(stages ...Stage) *PipelineBuilder
```

Adds stages in sequence.

Example:

```go
pipeline := stage.NewPipelineBuilder().
	Chain(
		stage.NewStateStoreLoadStage(stateConfig),
		stage.NewPromptAssemblyStage(registry, task, vars),
		stage.NewProviderStage(provider, tools, policy, config),
		stage.NewStateStoreSaveStage(stateConfig),
	).
	Build()
```

Branch

```go
func (b *PipelineBuilder) Branch(from string, to ...string) *PipelineBuilder
```

Creates branching from one stage to multiple destinations.

Example:

```go
pipeline := stage.NewPipelineBuilder().
	Chain(
		stage.NewProviderStage(provider, tools, policy, config),
	).
	Branch("provider", "tts", "logger").
	Build()
```

Build

```go
func (b *PipelineBuilder) Build() *StreamPipeline
```

Constructs the final pipeline and validates its DAG structure.
StreamPipeline Methods
Execute

```go
func (p *StreamPipeline) Execute(ctx context.Context, input <-chan StreamElement) (<-chan StreamElement, error)
```

Executes pipeline in streaming mode.

Parameters:

- ctx: Context for cancellation/timeout
- input: Channel of input elements

Returns: Channel of output elements

Example:

```go
// Create input
input := make(chan stage.StreamElement, 1)
msg := types.Message{Role: "user"}
msg.AddTextPart("Hello!")
input <- stage.NewMessageElement(msg)
close(input)

// Execute
output, err := pipeline.Execute(ctx, input)
if err != nil {
	return err
}

// Process output
for elem := range output {
	if elem.Text != nil {
		fmt.Print(*elem.Text)
	}
	if elem.Error != nil {
		log.Printf("Error: %v", elem.Error)
	}
}
```

ExecuteSync

```go
func (p *StreamPipeline) ExecuteSync(ctx context.Context, elements ...StreamElement) (*ExecutionResult, error)
```

Executes pipeline synchronously, collecting all output.

Parameters:

- ctx: Context for cancellation/timeout
- elements: Input elements

Returns: Collected execution result

Example:

```go
msg := types.Message{Role: "user"}
msg.AddTextPart("What is 2+2?")
elem := stage.NewMessageElement(msg)

result, err := pipeline.ExecuteSync(ctx, elem)
if err != nil {
	return err
}

fmt.Printf("Response: %s\n", result.Response.GetTextContent())
```

Shutdown

```go
func (p *StreamPipeline) Shutdown(timeout time.Duration) error
```

Gracefully shuts down pipeline.

Parameters:

- timeout: Maximum time to wait for stages to complete

Returns: Error if shutdown times out

Example:

```go
defer pipeline.Shutdown(10 * time.Second)
```

StreamElement Constructor Functions
NewTextElement

```go
func NewTextElement(text string) StreamElement
```

Creates element with text content.

NewMessageElement

```go
func NewMessageElement(msg types.Message) StreamElement
```

Creates element with message content.

NewAudioElement

```go
func NewAudioElement(audio *AudioData) StreamElement
```

Creates element with audio content.

NewErrorElement

```go
func NewErrorElement(err error) StreamElement
```

Creates element representing an error.

NewElementWithMetadata

```go
func NewElementWithMetadata(metadata map[string]interface{}) StreamElement
```

Creates element with only metadata.
Built-In Stages
Core Stages

StateStoreLoadStage

```go
func NewStateStoreLoadStage(config *StateStoreConfig) *StateStoreLoadStage
```

Loads conversation history from persistent storage.

Configuration:

```go
config := &pipeline.StateStoreConfig{
	Store:          stateStore,
	ConversationID: "session-123",
}
```

StateStoreSaveStage

```go
func NewStateStoreSaveStage(config *StateStoreConfig) *StateStoreSaveStage
```

Saves conversation state after processing.

PromptAssemblyStage

```go
func NewPromptAssemblyStage(registry *prompt.Registry, taskType string, variables map[string]string) *PromptAssemblyStage
```

Loads and assembles prompts from registry.

Sets metadata:

- system_prompt: Assembled system prompt
- allowed_tools: Tools allowed for this prompt
- validators: Validator configurations

TemplateStage

```go
func NewTemplateStage() *TemplateStage
```

Processes {{variable}} substitution in content.
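To make the {{variable}} substitution concrete, here is a minimal sketch of one way such a substitution could work. It is not TemplateStage's actual code: `substitute` and `placeholder` are hypothetical names, and two behaviors are assumptions, namely leaving unknown variables untouched and tolerating whitespace inside the braces.

```go
package main

import (
	"fmt"
	"regexp"
)

// placeholder matches {{name}} with optional whitespace inside the braces.
var placeholder = regexp.MustCompile(`\{\{\s*(\w+)\s*\}\}`)

// substitute replaces each {{name}} with vars[name]. Placeholders whose
// name is not in vars are left as-is (an assumption of this sketch).
func substitute(text string, vars map[string]string) string {
	return placeholder.ReplaceAllStringFunc(text, func(match string) string {
		name := placeholder.FindStringSubmatch(match)[1]
		if value, ok := vars[name]; ok {
			return value
		}
		return match
	})
}

func main() {
	vars := map[string]string{"name": "Ada"}
	fmt.Println(substitute("Hello, {{name}}!", vars))
}
```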
ValidationStage

```go
func NewValidationStage(registry *validators.Registry, config *ValidationConfig) *ValidationStage
```

Validates content against registered validators.

Configuration:

```go
config := &stage.ValidationConfig{
	FailOnError:    true,
	SuppressErrors: false,
}
```

ProviderStage

```go
func NewProviderStage(
	provider providers.Provider,
	toolRegistry *tools.Registry,
	toolPolicy *ToolPolicy,
	config *ProviderConfig,
) *ProviderStage
```

Executes LLM calls with streaming and tool support.

Configuration:

```go
config := &stage.ProviderConfig{
	MaxTokens:   1500,
	Temperature: 0.7,
	Seed:        &seed,
}
```

Tool Policy:

```go
policy := &pipeline.ToolPolicy{
	BlockedTools: []string{"dangerous_tool"},
	ToolChoice:   "auto", // "auto", "none", "required", or specific tool name
}
```

Streaming/Speech Stages
AudioTurnStage

```go
func NewAudioTurnStage(config AudioTurnConfig) (*AudioTurnStage, error)
```

VAD-based turn detection and audio accumulation.

Configuration:

```go
config := stage.AudioTurnConfig{
	SilenceDuration:   800 * time.Millisecond,
	MinSpeechDuration: 200 * time.Millisecond,
	MaxTurnDuration:   30 * time.Second,
	SampleRate:        16000,
}
```

STTStage

```go
func NewSTTStage(service stt.Service, config STTStageConfig) *STTStage
```

Speech-to-text transcription.

Configuration:

```go
config := stage.STTStageConfig{
	Language:      "en",
	SkipEmpty:     true,
	MinAudioBytes: 1600, // 50ms at 16kHz
}
```

TTSStage

```go
func NewTTSStage(service tts.Service, config TTSConfig) *TTSStage
```

Text-to-speech synthesis.

Configuration:

```go
config := stage.TTSConfig{
	Voice: "alloy",
	Speed: 1.0,
}
```

TTSStageWithInterruption

```go
func NewTTSStageWithInterruption(
	service tts.Service,
	handler *audio.InterruptionHandler,
	config TTSConfig,
) *TTSStageWithInterruption
```

TTS with barge-in/interruption support.

DuplexProviderStage

```go
func NewDuplexProviderStage(session providers.StreamInputSession) *DuplexProviderStage
```

Bidirectional WebSocket streaming for native audio LLMs.

VADAccumulatorStage

```go
func NewVADAccumulatorStage(config VADConfig) *VADAccumulatorStage
```

Audio buffering with voice activity detection.
Advanced Stages
RouterStage

```go
func NewRouterStage(routeFunc func(StreamElement) string, outputs map[string]chan<- StreamElement) *RouterStage
```

Dynamic routing based on element content.

MergeStage

```go
func NewMergeStage(inputs ...<-chan StreamElement) *MergeStage
```

Combines multiple input streams.
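Combining several input streams is usually done with a fan-in pattern. The sketch below shows that general Go technique and makes no claim about how MergeStage is implemented. `merge` and `source` are hypothetical helpers, and strings stand in for StreamElement.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// merge forwards every value from all inputs to one output channel and
// closes the output once every input has been drained.
func merge(inputs ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan string) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	// Close the output only after all forwarding goroutines have finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// source returns a closed channel preloaded with the given values.
func source(values ...string) <-chan string {
	ch := make(chan string, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)
	return ch
}

func main() {
	var got []string
	for v := range merge(source("a", "b"), source("c")) {
		got = append(got, v)
	}
	sort.Strings(got) // arrival order across inputs is not deterministic
	fmt.Println(got)
}
```

Elements from a single input keep their relative order, but interleaving across inputs depends on goroutine scheduling.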
MetricsStage
```go
func NewMetricsStage(inner Stage) *MetricsStage
```

Wraps a stage to collect performance metrics.
Metrics collected:
- Latency (min/max/avg)
- Throughput (elements/sec)
- Error count
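As an illustration of how a wrapper can gather the metrics listed above, the sketch below times a wrapped function and tracks min/max/avg latency, throughput, and error count. The names `stats`, `timed`, and `errEmpty` are hypothetical, and the sketch does not reflect MetricsStage internals. It is also not safe for concurrent use without extra locking.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errEmpty is a sample error used by the demo below.
var errEmpty = errors.New("empty element")

// stats accumulates latency and error figures for one wrapped function.
type stats struct {
	Count, Errors   int
	Min, Max, Total time.Duration
}

// observe records one call's latency and outcome.
func (s *stats) observe(d time.Duration, err error) {
	if s.Count == 0 || d < s.Min {
		s.Min = d
	}
	if d > s.Max {
		s.Max = d
	}
	s.Count++
	s.Total += d
	if err != nil {
		s.Errors++
	}
}

// Avg returns the mean latency, or zero when nothing was recorded.
func (s *stats) Avg() time.Duration {
	if s.Count == 0 {
		return 0
	}
	return s.Total / time.Duration(s.Count)
}

// Throughput returns elements per second over a wall-clock window.
func (s *stats) Throughput(window time.Duration) float64 {
	if window <= 0 {
		return 0
	}
	return float64(s.Count) / window.Seconds()
}

// timed wraps fn so that every call is measured into s.
func timed(s *stats, fn func(string) error) func(string) error {
	return func(v string) error {
		start := time.Now()
		err := fn(v)
		s.observe(time.Since(start), err)
		return err
	}
}

func main() {
	var s stats
	process := timed(&s, func(v string) error {
		if v == "" {
			return errEmpty
		}
		return nil
	})
	for _, v := range []string{"a", "", "b"} {
		_ = process(v)
	}
	fmt.Printf("count=%d errors=%d avg=%v\n", s.Count, s.Errors, s.Avg())
}
```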
TracingStage
```go
func NewTracingStage(inner Stage, tracer Tracer) *TracingStage
```

Adds distributed tracing to a stage.
Utility Stages
DebugStage

```go
func NewDebugStage(output io.Writer) *DebugStage
```

Logs all elements as JSON for debugging.

VariableProviderStage

```go
func NewVariableProviderStage(providers []variables.Provider) *VariableProviderStage
```

Resolves variables from multiple sources.

MediaExternalizerStage

```go
func NewMediaExternalizerStage(storage MediaStorage, config MediaExternalizerConfig) *MediaExternalizerStage
```

Uploads large media to external storage.

ContextBuilderStage

```go
func NewContextBuilderStage(config ContextBuilderConfig) *ContextBuilderStage
```

Manages token budget with truncation strategies.

Configuration:

```go
config := stage.ContextBuilderConfig{
	TokenBudget:        4000,
	TruncationStrategy: "sliding_window", // or "summarize"
}
```

BaseStage
Base implementation for custom stages:

```go
type BaseStage struct {
	name      string
	stageType StageType
}

func NewBaseStage(name string, stageType StageType) BaseStage
func (s *BaseStage) Name() string
func (s *BaseStage) Type() StageType
```

Example custom stage:

```go
type MyStage struct {
	stage.BaseStage
	config MyConfig
}

func NewMyStage(config MyConfig) *MyStage {
	return &MyStage{
		BaseStage: stage.NewBaseStage("my_stage", stage.StageTypeTransform),
		config:    config,
	}
}

func (s *MyStage) Process(ctx context.Context, input <-chan stage.StreamElement, output chan<- stage.StreamElement) error {
	defer close(output)

	for elem := range input {
		// Transform element
		if elem.Text != nil {
			upper := strings.ToUpper(*elem.Text)
			elem.Text = &upper
		}

		select {
		case output <- elem:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}
```

Error Handling
Error Elements

Send errors as elements for downstream handling:

```go
if err := validate(elem); err != nil {
	output <- stage.NewErrorElement(err)
	continue
}
```

Fatal Errors

Return error to stop the pipeline:

```go
if err := criticalOperation(); err != nil {
	return err // Pipeline stops
}
```

Context Cancellation

Always check for cancellation:

```go
select {
case output <- elem:
case <-ctx.Done():
	return ctx.Err()
}
```

Metadata Keys
Standard metadata keys used by built-in stages:

| Key | Type | Set By | Used By |
|---|---|---|---|
| system_prompt | string | PromptAssemblyStage | ProviderStage |
| allowed_tools | []string | PromptAssemblyStage | ProviderStage |
| validators | []ValidatorConfig | PromptAssemblyStage | ValidationStage |
| variables | map[string]string | VariableProviderStage | TemplateStage |
| conversation_id | string | StateStoreLoadStage | StateStoreSaveStage |
| from_history | bool | StateStoreLoadStage | - |
| validation_results | []ValidationResult | ValidationStage | - |
| cost_info | types.CostInfo | ProviderStage | - |
| latency_ms | int64 | ProviderStage | - |
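Because Metadata is a map[string]interface{}, a consuming stage needs a type assertion to read these values. A minimal sketch of defensive access follows. The helper names `stringMeta` and `int64Meta` are hypothetical and not part of the library.

```go
package main

import "fmt"

// stringMeta reads a string value from a metadata map and reports whether
// the key was present with the expected type.
func stringMeta(md map[string]interface{}, key string) (string, bool) {
	v, ok := md[key].(string)
	return v, ok
}

// int64Meta does the same for int64 values such as latency_ms.
func int64Meta(md map[string]interface{}, key string) (int64, bool) {
	v, ok := md[key].(int64)
	return v, ok
}

func main() {
	md := map[string]interface{}{
		"system_prompt": "You are a helpful assistant.",
		"latency_ms":    int64(120),
	}

	if prompt, ok := stringMeta(md, "system_prompt"); ok {
		fmt.Println("prompt:", prompt)
	}
	if ms, ok := int64Meta(md, "latency_ms"); ok {
		fmt.Println("latency:", ms)
	}
	// A missing key or a mismatched type yields ok == false, not a panic.
	if _, ok := stringMeta(md, "conversation_id"); !ok {
		fmt.Println("conversation_id not set")
	}
}
```

The comma-ok form of the assertion keeps a stage from panicking when an upstream stage did not set a key or stored a different type.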
Configuration Tuning
Channel Buffer Size

| Use Case | Recommended | Notes |
|---|---|---|
| Low latency | 4-8 | Minimize buffering |
| High throughput | 32-64 | Allow producer ahead |
| Memory constrained | 8-16 | Balance |
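The buffer size determines how far a producer can run ahead of a slow consumer before a send blocks, which is the backpressure mechanism. The sketch below demonstrates this with plain Go channels. `trySend` and `fillUntilFull` are hypothetical helpers, not library functions.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the value fit
// in the channel's buffer.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// fillUntilFull counts how many values a producer can queue, with no
// consumer reading, before a blocking send would have to wait.
func fillUntilFull(bufSize int) int {
	ch := make(chan int, bufSize)
	n := 0
	for trySend(ch, n) {
		n++
	}
	return n
}

func main() {
	for _, size := range []int{4, 16, 64} {
		fmt.Printf("buffer=%d: producer can run %d elements ahead\n", size, fillUntilFull(size))
	}
}
```

A smaller buffer stalls the producer sooner, which keeps latency and memory use low. A larger one absorbs bursts at the cost of more queued elements.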
Timeout Settings
| Pipeline Type | Execution Timeout | Shutdown Timeout |
|---|---|---|
| Simple chat | 30s | 5s |
| Tool-heavy | 120s | 30s |
| Voice (VAD) | 300s | 10s |
| Voice (ASM) | 600s | 15s |
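An execution timeout of this kind is commonly enforced with context.WithTimeout. The sketch below shows the general pattern using only the standard library. It does not show how the pipeline applies ExecutionTimeout internally, and `runWithTimeout`, `slowWork`, and `timedOut` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runWithTimeout runs work under a deadline and returns the context error
// if the deadline passes first. work is expected to honor cancellation.
func runWithTimeout(parent context.Context, timeout time.Duration, work func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()

	// Buffered so the goroutine can finish even if nobody reads the result.
	done := make(chan error, 1)
	go func() { done <- work(ctx) }()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// slowWork simulates a pipeline run that takes d unless cancelled.
func slowWork(d time.Duration) func(context.Context) error {
	return func(ctx context.Context) error {
		select {
		case <-time.After(d):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// timedOut reports whether work of the given length exceeds the timeout.
func timedOut(timeout, work time.Duration) bool {
	err := runWithTimeout(context.Background(), timeout, slowWork(work))
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("short timeout exceeded:", timedOut(20*time.Millisecond, time.Second))
	fmt.Println("generous timeout exceeded:", timedOut(time.Second, time.Millisecond))
}
```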
Examples
Text Pipeline

```go
pipeline := stage.NewPipelineBuilder().
	Chain(
		stage.NewStateStoreLoadStage(stateConfig),
		stage.NewPromptAssemblyStage(registry, "chat", vars),
		stage.NewTemplateStage(),
		stage.NewProviderStage(provider, tools, policy, config),
		stage.NewStateStoreSaveStage(stateConfig),
	).
	Build()

// Execute
msg := types.Message{Role: "user"}
msg.AddTextPart("Hello!")
input := make(chan stage.StreamElement, 1)
input <- stage.NewMessageElement(msg)
close(input)

output, err := pipeline.Execute(ctx, input)
if err != nil {
	return err
}
for elem := range output {
	if elem.Text != nil {
		fmt.Print(*elem.Text)
	}
}
```

VAD Voice Pipeline

```go
// NewAudioTurnStage returns (stage, error), so construct it before chaining.
turnStage, err := stage.NewAudioTurnStage(vadConfig)
if err != nil {
	return err
}

pipeline := stage.NewPipelineBuilder().
	Chain(
		turnStage,
		stage.NewSTTStage(sttService, sttConfig),
		stage.NewStateStoreLoadStage(stateConfig),
		stage.NewPromptAssemblyStage(registry, task, vars),
		stage.NewProviderStage(provider, tools, policy, providerConfig),
		stage.NewStateStoreSaveStage(stateConfig),
		stage.NewTTSStageWithInterruption(ttsService, handler, ttsConfig),
	).
	Build()

// Feed audio chunks; input is the channel passed to pipeline.Execute.
for audioChunk := range microphoneStream {
	input <- stage.NewAudioElement(&stage.AudioData{
		Samples:    audioChunk,
		SampleRate: 16000,
		Format:     stage.AudioFormatPCM16,
	})
}
```

ASM Duplex Pipeline

```go
session, err := gemini.NewStreamSession(ctx, endpoint, apiKey, streamConfig)
if err != nil {
	return err
}

pipeline := stage.NewPipelineBuilder().
	Chain(
		stage.NewStateStoreLoadStage(stateConfig),
		stage.NewPromptAssemblyStage(registry, task, vars),
		stage.NewDuplexProviderStage(session),
		stage.NewStateStoreSaveStage(stateConfig),
	).
	Build()
```

Best Practices
- Always close output channels: Use `defer close(output)` at start of Process
- Check context cancellation: Use select with `ctx.Done()`
- Use metadata for state: Pass data between stages via metadata
- Handle errors gracefully: Decide between error elements and fatal returns
- Order stages correctly: State load → Prompt assembly → Provider → State save
- Clean up resources: Use `defer pipeline.Shutdown(timeout)`
See Also
- Stage Design - Stage architecture
- Pipeline Architecture - How pipelines work
- Configure Pipeline - Configuration guide
- Provider Reference - LLM providers
- Types Reference - Data structures