Pipeline Reference
The Pipeline component chains stages together for streaming LLM execution.
Overview
The Pipeline executes stages concurrently via goroutines, passing StreamElement objects through channels. It supports:
- Streaming execution: Elements flow to downstream stages as soon as they are produced
- Concurrent stages: Each stage runs in its own goroutine
- Backpressure: Channel-based flow control
- Graceful shutdown: Wait for all stages to complete
- Multiple modes: Text, VAD, and ASM pipeline configurations
Core Types
StreamElement
type StreamElement struct {
// Content (at most one is typically set)
Text *string
Audio *AudioData
Video *VideoData
Image *ImageData
Message *types.Message
ToolCall *types.ToolCall
Parts []types.ContentPart
// Metadata for inter-stage communication
Metadata map[string]interface{}
// Control and priority
Priority Priority // Low, Normal, High, Critical
Error error
Timestamp time.Time
}
The fundamental unit of data flowing through the pipeline.
AudioData
type AudioData struct {
Samples []byte
SampleRate int
Channels int
Format AudioFormat // AudioFormatPCM16, AudioFormatPCM32, etc.
}
Audio content for speech stages.
Stage
type Stage interface {
Name() string
Type() StageType
Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
}
Interface for all pipeline stages.
StageType
type StageType int
const (
StageTypeTransform StageType = iota // 1:1 or 1:N transformation
StageTypeAccumulate // N:1 accumulation
StageTypeGenerate // 0:N generation
StageTypeSink // N:0 terminal
StageTypeBidirectional // Full duplex
)
Classification of stage behavior.
Priority
type Priority int
const (
PriorityLow Priority = iota
PriorityNormal
PriorityHigh
PriorityCritical
)
Element priority for QoS-aware scheduling.
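The reference does not spell out how scheduling works when PriorityQueue is enabled. One common approach, sketched here under that assumption, is a heap that always yields the highest-priority element and falls back to arrival order for equal priorities. The priority constants mirror the ones above; queued, priorityQueue, and scheduler are hypothetical.

```go
package main

import "container/heap"

type priority int

// Mirrors the Priority constants above.
const (
	priorityLow priority = iota
	priorityNormal
	priorityHigh
	priorityCritical
)

type queued struct {
	prio priority
	seq  uint64 // insertion order, used to keep FIFO among equal priorities
	text string
}

// priorityQueue implements heap.Interface: higher priority first,
// then earlier insertion first.
type priorityQueue []queued

func (q priorityQueue) Len() int { return len(q) }
func (q priorityQueue) Less(i, j int) bool {
	if q[i].prio != q[j].prio {
		return q[i].prio > q[j].prio
	}
	return q[i].seq < q[j].seq
}
func (q priorityQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] }
func (q *priorityQueue) Push(x any)   { *q = append(*q, x.(queued)) }
func (q *priorityQueue) Pop() any {
	old := *q
	n := len(old)
	item := old[n-1]
	*q = old[:n-1]
	return item
}

// scheduler wraps the heap and assigns sequence numbers on push.
type scheduler struct {
	q   priorityQueue
	seq uint64
}

func (s *scheduler) push(p priority, text string) {
	heap.Push(&s.q, queued{prio: p, seq: s.seq, text: text})
	s.seq++
}

// pop returns the next element to process, or false if the queue is empty.
func (s *scheduler) pop() (string, bool) {
	if s.q.Len() == 0 {
		return "", false
	}
	return heap.Pop(&s.q).(queued).text, true
}
```

Breaking ties by sequence number keeps elements of equal priority in FIFO order, which matters for ordered content such as streamed text chunks.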
StreamPipeline
type StreamPipeline struct {
stages []Stage
config *PipelineConfig
}
Executable pipeline of connected stages.
PipelineConfig
type PipelineConfig struct {
ChannelBufferSize int // Default: 16
ExecutionTimeout time.Duration // Default: 30s
GracefulShutdownTimeout time.Duration // Default: 10s
PriorityQueue bool // Enable priority scheduling
Metrics bool // Enable per-stage metrics
Tracing bool // Enable distributed tracing
}
Pipeline runtime configuration.
Constructor Functions
NewPipelineBuilder
func NewPipelineBuilder() *PipelineBuilder
Creates a new pipeline builder with default configuration.
Example:
pipeline := stage.NewPipelineBuilder().
Chain(
stage.NewProviderStage(provider, nil, nil, config),
).
Build()
NewPipelineBuilderWithConfig
func NewPipelineBuilderWithConfig(config *PipelineConfig) *PipelineBuilder
Creates a pipeline builder with custom configuration.
Example:
config := stage.DefaultPipelineConfig().
WithChannelBufferSize(32).
WithMetrics(true)
pipeline := stage.NewPipelineBuilderWithConfig(config).
Chain(stages...).
Build()
DefaultPipelineConfig
func DefaultPipelineConfig() *PipelineConfig
Returns default configuration.
Example:
config := stage.DefaultPipelineConfig()
config.ChannelBufferSize = 32 // Override defaults
PipelineConfig Methods
WithChannelBufferSize
func (c *PipelineConfig) WithChannelBufferSize(size int) *PipelineConfig
Sets the buffer size for inter-stage channels.
WithExecutionTimeout
func (c *PipelineConfig) WithExecutionTimeout(timeout time.Duration) *PipelineConfig
Sets maximum execution time per pipeline run.
WithGracefulShutdownTimeout
func (c *PipelineConfig) WithGracefulShutdownTimeout(timeout time.Duration) *PipelineConfig
Sets timeout for graceful shutdown.
WithPriorityQueue
func (c *PipelineConfig) WithPriorityQueue(enabled bool) *PipelineConfig
Enables priority-based element scheduling.
WithMetrics
func (c *PipelineConfig) WithMetrics(enabled bool) *PipelineConfig
Enables per-stage metrics collection.
WithTracing
func (c *PipelineConfig) WithTracing(enabled bool) *PipelineConfig
Enables distributed tracing support.
PipelineBuilder Methods
Chain
func (b *PipelineBuilder) Chain(stages ...Stage) *PipelineBuilder
Adds stages in sequence.
Example:
pipeline := stage.NewPipelineBuilder().
Chain(
stage.NewStateStoreLoadStage(stateConfig),
stage.NewPromptAssemblyStage(registry, task, vars),
stage.NewProviderStage(provider, tools, policy, config),
stage.NewStateStoreSaveStage(stateConfig),
).
Build()
Branch
func (b *PipelineBuilder) Branch(from string, to ...string) *PipelineBuilder
Creates a branch from the stage named from to each of the stages named in to.
Example:
pipeline := stage.NewPipelineBuilder().
Chain(
stage.NewProviderStage(provider, tools, policy, config),
).
Branch("provider", "tts", "logger").
Build()
Build
func (b *PipelineBuilder) Build() *StreamPipeline
Constructs the final pipeline and validates that the stage graph is a DAG (directed acyclic graph).
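The reference does not say how Build performs this check. A standard way to confirm that a set of stage edges forms a DAG is Kahn's topological sort, sketched here with a hypothetical validateDAG helper that maps each stage name to its downstream stage names (every stage, including sinks, must appear as a key):

```go
package main

import (
	"fmt"
	"sort"
)

// validateDAG returns one valid execution order for the stage graph, or an
// error if an edge points at an unknown stage or the graph has a cycle.
func validateDAG(edges map[string][]string) ([]string, error) {
	indegree := make(map[string]int, len(edges))
	for name := range edges {
		indegree[name] = 0
	}
	for from, tos := range edges {
		for _, to := range tos {
			if _, ok := edges[to]; !ok {
				return nil, fmt.Errorf("stage %q branches to unknown stage %q", from, to)
			}
			indegree[to]++
		}
	}
	// Start from stages with no upstream; sort for a deterministic order.
	var ready []string
	for name, d := range indegree {
		if d == 0 {
			ready = append(ready, name)
		}
	}
	sort.Strings(ready)
	var order []string
	for len(ready) > 0 {
		name := ready[0]
		ready = ready[1:]
		order = append(order, name)
		for _, to := range edges[name] {
			indegree[to]--
			if indegree[to] == 0 {
				ready = append(ready, to)
			}
		}
	}
	// Any stage never reaching indegree 0 sits on a cycle.
	if len(order) != len(edges) {
		return nil, fmt.Errorf("stage graph contains a cycle")
	}
	return order, nil
}
```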
StreamPipeline Methods
Execute
func (p *StreamPipeline) Execute(ctx context.Context, input <-chan StreamElement) (<-chan StreamElement, error)
Executes the pipeline in streaming mode.
Parameters:
ctx: Context for cancellation/timeout
input: Channel of input elements
Returns: Channel of output elements, and an error
Example:
// Create input
input := make(chan stage.StreamElement, 1)
msg := types.Message{Role: "user"}
msg.AddTextPart("Hello!")
input <- stage.NewMessageElement(msg)
close(input)
// Execute
output, err := pipeline.Execute(ctx, input)
if err != nil {
return err
}
// Process output
for elem := range output {
if elem.Text != nil {
fmt.Print(*elem.Text)
}
if elem.Error != nil {
log.Printf("Error: %v", elem.Error)
}
}
ExecuteSync
func (p *StreamPipeline) ExecuteSync(ctx context.Context, elements ...StreamElement) (*ExecutionResult, error)
Executes the pipeline synchronously, collecting all output.
Parameters:
ctx: Context for cancellation/timeout
elements: Input elements
Returns: Collected execution result
Example:
msg := types.Message{Role: "user"}
msg.AddTextPart("What is 2+2?")
elem := stage.NewMessageElement(msg)
result, err := pipeline.ExecuteSync(ctx, elem)
if err != nil {
return err
}
fmt.Printf("Response: %s\n", result.Response.GetTextContent())
Shutdown
func (p *StreamPipeline) Shutdown(timeout time.Duration) error
Gracefully shuts down the pipeline.
Parameters:
timeout: Maximum time to wait for stages to complete
Returns: Error if shutdown times out
Example:
defer pipeline.Shutdown(10 * time.Second)
StreamElement Constructor Functions
NewTextElement
func NewTextElement(text string) StreamElement
Creates element with text content.
NewMessageElement
func NewMessageElement(msg types.Message) StreamElement
Creates element with message content.
NewAudioElement
func NewAudioElement(audio *AudioData) StreamElement
Creates element with audio content.
NewErrorElement
func NewErrorElement(err error) StreamElement
Creates element representing an error.
NewElementWithMetadata
func NewElementWithMetadata(metadata map[string]interface{}) StreamElement
Creates element with only metadata.
Built-In Stages
Core Stages
StateStoreLoadStage
func NewStateStoreLoadStage(config *StateStoreConfig) *StateStoreLoadStage
Loads conversation history from persistent storage.
Configuration:
config := &pipeline.StateStoreConfig{
Store: stateStore,
ConversationID: "session-123",
}
StateStoreSaveStage
func NewStateStoreSaveStage(config *StateStoreConfig) *StateStoreSaveStage
Saves conversation state after processing.
PromptAssemblyStage
func NewPromptAssemblyStage(registry *prompt.Registry, taskType string, variables map[string]string) *PromptAssemblyStage
Loads and assembles prompts from registry.
Sets metadata:
system_prompt: Assembled system prompt
allowed_tools: Tools allowed for this prompt
validators: Validator configurations
TemplateStage
func NewTemplateStage() *TemplateStage
Performs {{variable}} substitution in content.
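A minimal sketch of {{variable}} substitution, assuming values come from a map[string]string such as the variables metadata key. Leaving unknown placeholders untouched and tolerating spaces inside the braces are choices made for this example, not documented TemplateStage behavior; substitute is a hypothetical helper.

```go
package main

import "regexp"

// placeholder matches {{name}}, allowing optional spaces inside the braces.
var placeholder = regexp.MustCompile(`\{\{\s*([A-Za-z0-9_.]+)\s*\}\}`)

// substitute replaces each {{name}} with vars[name]. Unknown names are left
// as-is so that missing variables stay visible in the output.
func substitute(text string, vars map[string]string) string {
	return placeholder.ReplaceAllStringFunc(text, func(m string) string {
		name := placeholder.FindStringSubmatch(m)[1]
		if v, ok := vars[name]; ok {
			return v
		}
		return m
	})
}
```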
ValidationStage
func NewValidationStage(registry *validators.Registry, config *ValidationConfig) *ValidationStage
Validates content against registered validators.
Configuration:
config := &stage.ValidationConfig{
FailOnError: true,
SuppressErrors: false,
}
ProviderStage
func NewProviderStage(
provider providers.Provider,
toolRegistry *tools.Registry,
toolPolicy *ToolPolicy,
config *ProviderConfig,
) *ProviderStage
Executes LLM calls with streaming and tool support.
Configuration:
config := &stage.ProviderConfig{
MaxTokens: 1500,
Temperature: 0.7,
Seed: &seed,
}
Tool Policy:
policy := &pipeline.ToolPolicy{
BlockedTools: []string{"dangerous_tool"},
ToolChoice: "auto", // "auto", "none", "required", or specific tool name
}
Streaming/Speech Stages
AudioTurnStage
func NewAudioTurnStage(config AudioTurnConfig) (*AudioTurnStage, error)
VAD-based turn detection and audio accumulation.
Configuration:
config := stage.AudioTurnConfig{
SilenceDuration: 800 * time.Millisecond,
MinSpeechDuration: 200 * time.Millisecond,
MaxTurnDuration: 30 * time.Second,
SampleRate: 16000,
}
STTStage
func NewSTTStage(service stt.Service, config STTStageConfig) *STTStage
Speech-to-text transcription.
Configuration:
config := stage.STTStageConfig{
Language: "en",
SkipEmpty: true,
MinAudioBytes: 1600, // 50ms of 16-bit mono PCM at 16kHz
}
TTSStage
func NewTTSStage(service tts.Service, config TTSConfig) *TTSStage
Text-to-speech synthesis.
Configuration:
config := stage.TTSConfig{
Voice: "alloy",
Speed: 1.0,
}
TTSStageWithInterruption
func NewTTSStageWithInterruption(
service tts.Service,
handler *audio.InterruptionHandler,
config TTSConfig,
) *TTSStageWithInterruption
TTS with barge-in/interruption support.
DuplexProviderStage
func NewDuplexProviderStage(session providers.StreamInputSession) *DuplexProviderStage
Bidirectional WebSocket streaming for native audio LLMs.
VADAccumulatorStage
func NewVADAccumulatorStage(config VADConfig) *VADAccumulatorStage
Audio buffering with voice activity detection.
Advanced Stages
RouterStage
func NewRouterStage(routeFunc func(StreamElement) string, outputs map[string]chan<- StreamElement) *RouterStage
Dynamic routing based on element content.
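A route function maps each element to the name of an output channel. The sketch below shows a hypothetical routeFunc and dispatch loop over a simplified element type; the "route" metadata key and the "default" fallback output are assumptions for illustration, not documented keys.

```go
package main

import "strings"

// element is a simplified stand-in for StreamElement (hypothetical).
type element struct {
	Text     string
	Metadata map[string]interface{}
}

// routeByContent picks an output name from metadata first, then falls back
// to inspecting the text.
func routeByContent(e element) string {
	if r, ok := e.Metadata["route"].(string); ok {
		return r
	}
	if strings.HasPrefix(e.Text, "/") {
		return "commands"
	}
	return "default"
}

// dispatch sends each element to the channel registered under the name
// returned by route. Unknown names go to "default", which must exist.
func dispatch(in <-chan element, route func(element) string, outputs map[string]chan<- element) {
	for e := range in {
		out, ok := outputs[route(e)]
		if !ok {
			out = outputs["default"]
		}
		out <- e
	}
}
```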
MergeStage
func NewMergeStage(inputs ...<-chan StreamElement) *MergeStage
Combines multiple input streams.
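Merging streams follows the standard Go fan-in pattern: one goroutine per input forwards elements to a shared output, and the output is closed once all inputs are drained. A minimal sketch with a hypothetical merge function and a simplified element type:

```go
package main

import "sync"

// element is a simplified stand-in for StreamElement (hypothetical).
type element struct{ Text string }

// merge forwards elements from all inputs onto one output channel and
// closes it once every input has been closed.
func merge(inputs ...<-chan element) <-chan element {
	out := make(chan element)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan element) {
			defer wg.Done()
			for e := range in {
				out <- e
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
```

Elements from different inputs interleave in arrival order; a production stage would also select on ctx.Done() so it can stop early.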
MetricsStage
func NewMetricsStage(inner Stage) *MetricsStage
Wraps a stage to collect performance metrics.
Metrics collected:
- Latency (min/max/avg)
- Throughput (elements/sec)
- Error count
TracingStage
func NewTracingStage(inner Stage, tracer Tracer) *TracingStage
Adds distributed tracing to a stage.
Utility Stages
DebugStage
func NewDebugStage(output io.Writer) *DebugStage
Logs all elements as JSON for debugging.
VariableProviderStage
func NewVariableProviderStage(providers []variables.Provider) *VariableProviderStage
Resolves variables from multiple sources.
MediaExternalizerStage
func NewMediaExternalizerStage(storage MediaStorage, config MediaExternalizerConfig) *MediaExternalizerStage
Uploads large media to external storage.
ContextBuilderStage
func NewContextBuilderStage(config ContextBuilderConfig) *ContextBuilderStage
Manages token budget with truncation strategies.
Configuration:
config := stage.ContextBuilderConfig{
TokenBudget: 4000,
TruncationStrategy: "sliding_window", // or "summarize"
}
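One plausible reading of the sliding_window strategy is to drop the oldest messages until the remainder fits the token budget. The sketch below assumes that reading, always keeps a leading system message, and uses a crude word-count estimator in place of a real tokenizer; message, estimateTokens, and slidingWindow are hypothetical.

```go
package main

import "strings"

type message struct {
	Role    string
	Content string
}

// estimateTokens is a crude stand-in for a real tokenizer:
// it counts whitespace-separated words.
func estimateTokens(m message) int {
	return len(strings.Fields(m.Content))
}

// slidingWindow keeps the most recent messages whose combined estimated
// token count fits within budget. A leading system message is always kept
// and its tokens count against the budget.
func slidingWindow(msgs []message, budget int) []message {
	var system []message
	if len(msgs) > 0 && msgs[0].Role == "system" {
		system = msgs[:1]
		budget -= estimateTokens(msgs[0])
		msgs = msgs[1:]
	}
	start := len(msgs)
	for start > 0 {
		cost := estimateTokens(msgs[start-1])
		if cost > budget {
			break
		}
		budget -= cost
		start--
	}
	return append(append([]message{}, system...), msgs[start:]...)
}
```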
BaseStage
Base implementation for custom stages:
type BaseStage struct {
name string
stageType StageType
}
func NewBaseStage(name string, stageType StageType) BaseStage
func (s *BaseStage) Name() string
func (s *BaseStage) Type() StageType
Example custom stage:
type MyStage struct {
stage.BaseStage
config MyConfig
}
func NewMyStage(config MyConfig) *MyStage {
return &MyStage{
BaseStage: stage.NewBaseStage("my_stage", stage.StageTypeTransform),
config: config,
}
}
func (s *MyStage) Process(ctx context.Context, input <-chan stage.StreamElement, output chan<- stage.StreamElement) error {
defer close(output)
for elem := range input {
// Transform element
if elem.Text != nil {
upper := strings.ToUpper(*elem.Text)
elem.Text = &upper
}
select {
case output <- elem:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
Error Handling
Error Elements
Send errors as elements for downstream handling:
if err := validate(elem); err != nil { // validate is your own check
	select {
	case output <- stage.NewErrorElement(err):
	case <-ctx.Done():
		return ctx.Err()
	}
	continue
}
Fatal Errors
Return an error from Process to stop the pipeline:
if err := criticalOperation(); err != nil {
return err // Pipeline stops
}
Context Cancellation
Always check for cancellation:
select {
case output <- elem:
case <-ctx.Done():
return ctx.Err()
}
Metadata Keys
Standard metadata keys used by built-in stages:
| Key | Type | Set By | Used By |
|---|---|---|---|
| system_prompt | string | PromptAssemblyStage | ProviderStage |
| allowed_tools | []string | PromptAssemblyStage | ProviderStage |
| validators | []ValidatorConfig | PromptAssemblyStage | ValidationStage |
| variables | map[string]string | VariableProviderStage | TemplateStage |
| conversation_id | string | StateStoreLoadStage | StateStoreSaveStage |
| from_history | bool | StateStoreLoadStage | - |
| validation_results | []ValidationResult | ValidationStage | - |
| cost_info | types.CostInfo | ProviderStage | - |
| latency_ms | int64 | ProviderStage | - |
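Because Metadata is a map[string]interface{}, reading these keys requires a type assertion whose target type must match the stored type exactly. A small generic helper (hypothetical, requires Go 1.18+) keeps those checks in one place:

```go
package main

// metadataValue reads key from md as type T, reporting false when the key
// is missing or holds a value of a different type.
func metadataValue[T any](md map[string]interface{}, key string) (T, bool) {
	v, ok := md[key].(T)
	return v, ok
}
```

Note that a value stored as int64, such as latency_ms, does not satisfy an assertion to int.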
Configuration Tuning
Channel Buffer Size
| Use Case | Recommended | Notes |
|---|---|---|
| Low latency | 4-8 | Minimize buffering |
| High throughput | 32-64 | Let producers run ahead of consumers |
| Memory constrained | 8-16 | Balance latency against memory use |
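Buffer size determines how many elements a producer can hand off before blocking while its consumer is busy. This hypothetical helper demonstrates the rule with a non-blocking send loop:

```go
package main

// sendsBeforeBlocking reports how many elements a producer can send into a
// channel of the given buffer size before a send would block, assuming the
// consumer has not read anything yet.
func sendsBeforeBlocking(bufSize, attempts int) int {
	ch := make(chan int, bufSize)
	sent := 0
	for i := 0; i < attempts; i++ {
		select {
		case ch <- i:
			sent++
		default:
			return sent // buffer full: a blocking send would wait here
		}
	}
	return sent
}
```

Each extra buffered slot per stage trades a little memory and potential queueing latency for smoother throughput, which is the trade-off the table above summarizes.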
Timeout Settings
| Pipeline Type | Execution Timeout | Shutdown Timeout |
|---|---|---|
| Simple chat | 30s | 5s |
| Tool-heavy | 120s | 30s |
| Voice (VAD) | 300s | 10s |
| Voice (ASM) | 600s | 15s |
Examples
Text Pipeline
pipeline := stage.NewPipelineBuilder().
Chain(
stage.NewStateStoreLoadStage(stateConfig),
stage.NewPromptAssemblyStage(registry, "chat", vars),
stage.NewTemplateStage(),
stage.NewProviderStage(provider, tools, policy, config),
stage.NewStateStoreSaveStage(stateConfig),
).
Build()
// Execute
msg := types.Message{Role: "user"}
msg.AddTextPart("Hello!")
input := make(chan stage.StreamElement, 1)
input <- stage.NewMessageElement(msg)
close(input)
output, err := pipeline.Execute(ctx, input)
if err != nil {
	return err
}
for elem := range output {
if elem.Text != nil {
fmt.Print(*elem.Text)
}
}
VAD Voice Pipeline
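turnStage, err := stage.NewAudioTurnStage(vadConfig)
if err != nil {
	return err
}
pipeline := stage.NewPipelineBuilder().
	Chain(
		turnStage,
		stage.NewSTTStage(sttService, sttConfig),
		stage.NewStateStoreLoadStage(stateConfig),
		stage.NewPromptAssemblyStage(registry, task, vars),
		stage.NewProviderStage(provider, tools, policy, providerConfig),
		stage.NewStateStoreSaveStage(stateConfig),
		stage.NewTTSStageWithInterruption(ttsService, handler, ttsConfig),
	).
	Build()
// Start the pipeline, then feed audio chunks from a separate goroutine
input := make(chan stage.StreamElement, 16)
output, err := pipeline.Execute(ctx, input)
if err != nil {
	return err
}
go func() {
	defer close(input)
	for audioChunk := range microphoneStream {
		input <- stage.NewAudioElement(&stage.AudioData{
			Samples:    audioChunk,
			SampleRate: 16000,
			Channels:   1, // assumed mono
			Format:     stage.AudioFormatPCM16,
		})
	}
}()
// Consume output concurrently so backpressure does not stall the feeder
for elem := range output {
	if elem.Error != nil {
		log.Printf("Error: %v", elem.Error)
	}
}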
ASM Duplex Pipeline
session, err := gemini.NewStreamSession(ctx, endpoint, apiKey, streamConfig)
if err != nil {
	return err
}
pipeline := stage.NewPipelineBuilder().
Chain(
stage.NewStateStoreLoadStage(stateConfig),
stage.NewPromptAssemblyStage(registry, task, vars),
stage.NewDuplexProviderStage(session),
stage.NewStateStoreSaveStage(stateConfig),
).
Build()
Best Practices
- Always close output channels: Use defer close(output) at the start of Process
- Check context cancellation: Use select with ctx.Done() when sending
- Use metadata for state: Pass data between stages via metadata
- Handle errors gracefully: Decide between error elements and fatal returns
- Order stages correctly: State load → Prompt assembly → Provider → State save
- Clean up resources: Use defer pipeline.Shutdown(timeout)
See Also
- Stage Design - Stage architecture
- Pipeline Architecture - How pipelines work
- Configure Pipeline - Configuration guide
- Provider Reference - LLM providers
- Types Reference - Data structures