Skip to content

Pipeline

The Pipeline component chains stages together for streaming LLM execution.

The Pipeline executes stages concurrently via goroutines, passing StreamElement objects through channels. It supports:

  • Streaming execution: True streaming with elements flowing as produced
  • Concurrent stages: Each stage runs in its own goroutine
  • Backpressure: Channel-based flow control
  • Graceful shutdown: Wait for all stages to complete
  • Multiple modes: Text, VAD, and ASM pipeline configurations
// StreamElement is the fundamental unit of data flowing through the pipeline.
// Stages receive elements on their input channel and emit them on their
// output channel. Helper constructors (NewTextElement, NewMessageElement,
// NewAudioElement, NewErrorElement, NewElementWithMetadata) build elements
// with a single kind of payload.
type StreamElement struct {
// Content (at most one is typically set)
Text *string
Audio *AudioData
Video *VideoData
Image *ImageData
Message *types.Message
ToolCall *types.ToolCall
Parts []types.ContentPart
// Metadata for inter-stage communication
// (per-element values take precedence over StreamPipeline.BaseMetadata on key collision).
Metadata map[string]interface{}
// Control and priority
Priority Priority // Low, Normal, High, Critical
// Error carries a failure as data so downstream stages and the consumer
// can handle it; consumers check it while ranging over the output channel.
Error error
Timestamp time.Time
}

The fundamental unit of data flowing through the pipeline.

// AudioData is the audio content carried by a StreamElement for speech stages.
type AudioData struct {
Samples []byte
SampleRate int
Channels int
Format AudioFormat // AudioFormatPCM16, AudioFormatPCM32, etc.
}

Audio content for speech stages.

// Stage is the interface implemented by all pipeline stages.
type Stage interface {
// Name returns the stage's name.
Name() string
// Type returns the stage's behavioral classification (see StageType).
Type() StageType
// Process consumes elements from input and emits elements on output.
// Per this page's best practices, implementations close output when they
// finish (defer close(output)) and select on ctx.Done() when sending.
// Returning a non-nil error stops the pipeline.
Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
}

Interface for all pipeline stages.

// StageType classifies stage behavior by the ratio of input elements to
// output elements.
type StageType int
// Stage classifications; values are assigned by iota starting at 0.
const (
StageTypeTransform StageType = iota // 1:1 or 1:N transformation
StageTypeAccumulate // N:1 accumulation
StageTypeGenerate // 0:N generation
StageTypeSink // N:0 terminal
StageTypeBidirectional // Full duplex
)

Classification of stage behavior.

// Priority is the element priority used for QoS-aware scheduling.
type Priority int
// Priority levels in ascending order: PriorityLow is 0 (the zero value) and
// PriorityCritical is 3.
const (
PriorityLow Priority = iota
PriorityNormal
PriorityHigh
PriorityCritical
)

Element priority for QoS-aware scheduling.

// StreamPipeline is an executable pipeline of connected stages, produced by
// PipelineBuilder.Build and run with Execute or ExecuteSync.
type StreamPipeline struct {
	// BaseMetadata is session-level metadata merged into every StreamElement
	// at the start of each Execute/ExecuteSync call. Per-element metadata
	// takes precedence on key collision. It is nil by default.
	BaseMetadata map[string]interface{}

	stages []Stage
	config *PipelineConfig
}

Executable pipeline of connected stages.

// PipelineConfig holds pipeline runtime configuration. DefaultPipelineConfig
// returns the defaults noted on each field; the With* setters return the
// config so calls can be chained.
type PipelineConfig struct {
ChannelBufferSize int // Default: 16
ExecutionTimeout time.Duration // Default: 0 (disabled)
IdleTimeout time.Duration // Default: 30s (resets on activity)
GracefulShutdownTimeout time.Duration // Default: 10s
PriorityQueue bool // Enable priority scheduling
Metrics bool // Enable per-stage metrics
Tracing bool // Enable distributed tracing
}

Pipeline runtime configuration.

func NewPipelineBuilder() *PipelineBuilder

Creates a new pipeline builder with default configuration.

Example:

pipeline := stage.NewPipelineBuilder().
Chain(
stage.NewProviderStage(provider, nil, nil, config),
).
Build()
func NewPipelineBuilderWithConfig(config *PipelineConfig) *PipelineBuilder

Creates a pipeline builder with custom configuration.

Example:

config := stage.DefaultPipelineConfig().
WithChannelBufferSize(32).
WithMetrics(true)
pipeline := stage.NewPipelineBuilderWithConfig(config).
Chain(stages...).
Build()
func DefaultPipelineConfig() *PipelineConfig

Returns default configuration.

Example:

config := stage.DefaultPipelineConfig()
config.ChannelBufferSize = 32 // Override defaults
func (c *PipelineConfig) WithChannelBufferSize(size int) *PipelineConfig

Sets the buffer size for inter-stage channels.

func (c *PipelineConfig) WithExecutionTimeout(timeout time.Duration) *PipelineConfig

Sets maximum execution time per pipeline run.

func (c *PipelineConfig) WithGracefulShutdownTimeout(timeout time.Duration) *PipelineConfig

Sets timeout for graceful shutdown.

func (c *PipelineConfig) WithPriorityQueue(enabled bool) *PipelineConfig

Enables priority-based element scheduling.

func (c *PipelineConfig) WithMetrics(enabled bool) *PipelineConfig

Enables per-stage metrics collection.

func (c *PipelineConfig) WithTracing(enabled bool) *PipelineConfig

Enables distributed tracing support.

func (b *PipelineBuilder) Chain(stages ...Stage) *PipelineBuilder

Adds stages in sequence.

Example:

pipeline := stage.NewPipelineBuilder().
Chain(
stage.NewStateStoreLoadStage(stateConfig),
stage.NewPromptAssemblyStage(registry, task, vars),
stage.NewProviderStage(provider, tools, policy, config),
stage.NewStateStoreSaveStage(stateConfig),
).
Build()
func (b *PipelineBuilder) Branch(from string, to ...string) *PipelineBuilder

Creates branching from one stage to multiple destinations.

Example:

pipeline := stage.NewPipelineBuilder().
Chain(
stage.NewProviderStage(provider, tools, policy, config),
).
Branch("provider", "tts", "logger").
Build()
func (b *PipelineBuilder) Build() *StreamPipeline

Constructs the final pipeline. Validates DAG structure.

func (p *StreamPipeline) Execute(ctx context.Context, input <-chan StreamElement) (<-chan StreamElement, error)

Executes pipeline in streaming mode.

Parameters:

  • ctx: Context for cancellation/timeout
  • input: Channel of input elements

Returns: Channel of output elements, and an error

Example:

// Create input
input := make(chan stage.StreamElement, 1)
msg := types.Message{Role: "user"}
msg.AddTextPart("Hello!")
input <- stage.NewMessageElement(msg)
close(input)
// Execute
output, err := pipeline.Execute(ctx, input)
if err != nil {
return err
}
// Process output
for elem := range output {
if elem.Text != nil {
fmt.Print(*elem.Text)
}
if elem.Error != nil {
log.Printf("Error: %v", elem.Error)
}
}
func (p *StreamPipeline) ExecuteSync(ctx context.Context, elements ...StreamElement) (*ExecutionResult, error)

Executes pipeline synchronously, collecting all output.

Parameters:

  • ctx: Context for cancellation/timeout
  • elements: Input elements

Returns: Collected execution result, and an error

Example:

msg := types.Message{Role: "user"}
msg.AddTextPart("What is 2+2?")
elem := stage.NewMessageElement(msg)
result, err := pipeline.ExecuteSync(ctx, elem)
if err != nil {
return err
}
fmt.Printf("Response: %s\n", result.Response.GetTextContent())
func (p *StreamPipeline) Shutdown(timeout time.Duration) error

Gracefully shuts down pipeline.

Parameters:

  • timeout: Maximum time to wait for stages to complete

Returns: Error if shutdown times out

Example:

defer pipeline.Shutdown(10 * time.Second)
func NewTextElement(text string) StreamElement

Creates element with text content.

func NewMessageElement(msg types.Message) StreamElement

Creates element with message content.

func NewAudioElement(audio *AudioData) StreamElement

Creates element with audio content.

func NewErrorElement(err error) StreamElement

Creates element representing an error.

func NewElementWithMetadata(metadata map[string]interface{}) StreamElement

Creates element with only metadata.

func NewStateStoreLoadStage(config *StateStoreConfig) *StateStoreLoadStage

Loads conversation history from persistent storage.

Configuration:

config := &pipeline.StateStoreConfig{
Store: stateStore,
ConversationID: "session-123",
}
func NewStateStoreSaveStage(config *StateStoreConfig) *StateStoreSaveStage

Saves conversation state after processing.

func NewPromptAssemblyStage(registry *prompt.Registry, taskType string, variables map[string]string) *PromptAssemblyStage

Loads and assembles prompts from registry.

Sets metadata:

  • system_prompt: Assembled system prompt
  • allowed_tools: Tools allowed for this prompt
  • validators: Validator configurations
func NewTemplateStage() *TemplateStage

Processes {{variable}} substitution in content.

func NewValidationStage(registry *validators.Registry, config *ValidationConfig) *ValidationStage

Validates content against registered validators.

Configuration:

config := &stage.ValidationConfig{
FailOnError: true,
SuppressErrors: false,
}
func NewProviderStage(
provider providers.Provider,
toolRegistry *tools.Registry,
toolPolicy *ToolPolicy,
config *ProviderConfig,
) *ProviderStage

Executes LLM calls with streaming and tool support.

Configuration:

config := &stage.ProviderConfig{
MaxTokens: 1500,
Temperature: 0.7,
Seed: &seed,
}

Tool Policy:

policy := &pipeline.ToolPolicy{
ToolChoice: "auto", // "auto", "none", "required", or specific tool name
MaxCostUSD: 1.00, // Cost budget in USD (0 = unlimited)
Blocklist: []string{"dangerous_tool"},
}
func NewAudioTurnStage(config AudioTurnConfig) (*AudioTurnStage, error)

VAD-based turn detection and audio accumulation.

Configuration:

config := stage.AudioTurnConfig{
SilenceDuration: 800 * time.Millisecond,
MinSpeechDuration: 200 * time.Millisecond,
MaxTurnDuration: 30 * time.Second,
SampleRate: 16000,
}
func NewSTTStage(service stt.Service, config STTStageConfig) *STTStage

Speech-to-text transcription.

Configuration:

config := stage.STTStageConfig{
Language: "en",
SkipEmpty: true,
MinAudioBytes: 1600, // 50ms at 16kHz
}
func NewTTSStage(service tts.Service, config TTSConfig) *TTSStage

Text-to-speech synthesis.

Configuration:

config := stage.TTSConfig{
Voice: "alloy",
Speed: 1.0,
}
func NewTTSStageWithInterruption(
service tts.Service,
handler *audio.InterruptionHandler,
config TTSConfig,
) *TTSStageWithInterruption

TTS with barge-in/interruption support.

func NewDuplexProviderStage(session providers.StreamInputSession) *DuplexProviderStage

Bidirectional WebSocket streaming for native audio LLMs.

func NewVADAccumulatorStage(config VADConfig) *VADAccumulatorStage

Audio buffering with voice activity detection.

func NewRouterStage(routeFunc func(StreamElement) string, outputs map[string]chan<- StreamElement) *RouterStage

Dynamic routing based on element content.

func NewMergeStage(inputs ...<-chan StreamElement) *MergeStage

Combines multiple input streams.

func NewMetricsStage(inner Stage) *MetricsStage

Wraps a stage to collect performance metrics.

Metrics collected:

  • Latency (min/max/avg)
  • Throughput (elements/sec)
  • Error count
func NewTracingStage(inner Stage, tracer Tracer) *TracingStage

Adds distributed tracing to a stage.

func NewDebugStage(output io.Writer) *DebugStage

Logs all elements as JSON for debugging.

func NewVariableProviderStage(providers []variables.Provider) *VariableProviderStage

Resolves variables from multiple sources.

func NewMediaExternalizerStage(storage MediaStorage, config MediaExternalizerConfig) *MediaExternalizerStage

Uploads large media to external storage.

func NewContextBuilderStage(config ContextBuilderConfig) *ContextBuilderStage

Manages token budget with truncation strategies.

Configuration:

config := stage.ContextBuilderConfig{
TokenBudget: 4000,
TruncationStrategy: "sliding_window", // or "summarize"
}

Base implementation for custom stages:

// BaseStage is the base implementation for custom stages. Embed it in a
// custom stage struct to get the Name and Type methods of the Stage
// interface; construct it with NewBaseStage.
type BaseStage struct {
name string
stageType StageType
}
func NewBaseStage(name string, stageType StageType) BaseStage
func (s *BaseStage) Name() string
func (s *BaseStage) Type() StageType

Example custom stage:

// MyStage is an example custom stage. It embeds stage.BaseStage for the
// Name and Type methods and supplies its own Process method.
type MyStage struct {
stage.BaseStage
config MyConfig
}
// NewMyStage builds a MyStage named "my_stage", classified as a transform
// stage, that keeps the supplied config.
func NewMyStage(config MyConfig) *MyStage {
	s := new(MyStage)
	s.BaseStage = stage.NewBaseStage("my_stage", stage.StageTypeTransform)
	s.config = config
	return s
}
// Process upper-cases the Text of every incoming element and forwards the
// element on output. It returns nil once input has been closed and drained,
// or ctx.Err() if the context is cancelled while the stage is waiting to
// receive or to send.
func (s *MyStage) Process(ctx context.Context, input <-chan stage.StreamElement, output chan<- stage.StreamElement) error {
	// Close output on every exit path so the downstream consumer's range
	// loop terminates.
	defer close(output)

	for {
		// Receive with a select rather than ranging over input, so that
		// cancellation is noticed even if upstream never sends or closes.
		var elem stage.StreamElement
		select {
		case <-ctx.Done():
			return ctx.Err()
		case received, ok := <-input:
			if !ok {
				return nil
			}
			elem = received
		}

		// Transform element
		if elem.Text != nil {
			upper := strings.ToUpper(*elem.Text)
			elem.Text = &upper
		}

		select {
		case output <- elem:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

Send errors as elements for downstream handling:

if err := validate(elem); err != nil {
	// Guard the send with ctx.Done() so a cancelled pipeline cannot leave
	// this stage blocked on a full or unread output channel.
	select {
	case output <- stage.NewErrorElement(err):
	case <-ctx.Done():
		return ctx.Err()
	}
	continue
}

Return error to stop the pipeline:

if err := criticalOperation(); err != nil {
return err // Pipeline stops
}

Always check for cancellation:

select {
case output <- elem:
case <-ctx.Done():
return ctx.Err()
}

StreamPipeline.BaseMetadata provides session-level metadata that is automatically merged into every StreamElement at the start of each Execute/ExecuteSync call. This is useful for injecting context that should be available to all stages and providers across every turn of a conversation — without manually setting it on each input element.

// NOTE(review): Build is documented on this page as returning only
// *StreamPipeline, while this snippet expects two results — confirm the
// actual signature, and handle the error instead of discarding it if one
// is returned.
pipeline, _ := builder.Build()
pipeline.BaseMetadata = map[string]interface{}{
"session_id": "sess-abc123",
"tenant_id": "acme-corp",
"user_id": "user-42",
}

Per-element metadata takes precedence over base metadata on key collision. BaseMetadata is nil by default with zero cost when unused.

Base metadata flows through the same path as per-element metadata: it is accumulated by ProviderStage and included in PredictionRequest.Metadata, making it available to all provider implementations.

Standard metadata keys used by built-in stages:

| Key | Type | Set By | Used By |
|---|---|---|---|
| system_prompt | string | PromptAssemblyStage | ProviderStage |
| allowed_tools | []string | PromptAssemblyStage | ProviderStage |
| validators | []ValidatorConfig | PromptAssemblyStage | ValidationStage |
| variables | map[string]string | VariableProviderStage | TemplateStage |
| conversation_id | string | StateStoreLoadStage | StateStoreSaveStage |
| from_history | bool | StateStoreLoadStage | - |
| validation_results | []ValidationResult | ValidationStage | - |
| cost_info | types.CostInfo | ProviderStage | - |
| latency_ms | int64 | ProviderStage | - |


Recommended channel buffer sizes:

| Use Case | Recommended | Notes |
|---|---|---|
| Low latency | 4-8 | Minimize buffering |
| High throughput | 32-64 | Allow producer ahead |
| Memory constrained | 8-16 | Balance |

The default timeout is an idle timeout (30s) that resets on each activity (provider call, tool execution, streaming chunk). The hard execution timeout is disabled by default.

| Pipeline Type | Idle Timeout | Execution Timeout | Shutdown Timeout |
|---|---|---|---|
| Simple chat | 30s (default) | 0 (disabled) | 5s |
| Tool-heavy | 60s | 0 (disabled) | 30s |
| Voice (VAD) | 300s | 0 (disabled) | 10s |
| Voice (ASM) | 0 (disabled) | 0 (disabled) | 15s |

pipeline := stage.NewPipelineBuilder().
Chain(
stage.NewStateStoreLoadStage(stateConfig),
stage.NewPromptAssemblyStage(registry, "chat", vars),
stage.NewTemplateStage(),
stage.NewProviderStage(provider, tools, policy, config),
stage.NewStateStoreSaveStage(stateConfig),
).
Build()
// Execute
msg := types.Message{Role: "user"}
msg.AddTextPart("Hello!")
input := make(chan stage.StreamElement, 1)
input <- stage.NewMessageElement(msg)
close(input)
// Check the error before ranging: if Execute fails, output may be nil, and
// ranging over a nil channel blocks forever.
output, err := pipeline.Execute(ctx, input)
if err != nil {
	return err
}
for elem := range output {
	if elem.Text != nil {
		fmt.Print(*elem.Text)
	}
}
pipeline := stage.NewPipelineBuilder().
Chain(
stage.NewAudioTurnStage(vadConfig),
stage.NewSTTStage(sttService, sttConfig),
stage.NewStateStoreLoadStage(stateConfig),
stage.NewPromptAssemblyStage(registry, task, vars),
stage.NewProviderStage(provider, tools, policy, providerConfig),
stage.NewStateStoreSaveStage(stateConfig),
stage.NewTTSStageWithInterruption(ttsService, handler, ttsConfig),
).
Build()
// Feed audio chunks
for audioChunk := range microphoneStream {
input <- stage.NewAudioElement(&stage.AudioData{
Samples: audioChunk,
SampleRate: 16000,
Format: stage.AudioFormatPCM16,
})
}
// Handle the session error instead of discarding it: a failed session must
// not be handed to the duplex provider stage.
session, err := gemini.NewStreamSession(ctx, endpoint, apiKey, streamConfig)
if err != nil {
	return err
}
pipeline := stage.NewPipelineBuilder().
	Chain(
		stage.NewStateStoreLoadStage(stateConfig),
		stage.NewPromptAssemblyStage(registry, task, vars),
		stage.NewDuplexProviderStage(session),
		stage.NewStateStoreSaveStage(stateConfig),
	).
	Build()
  1. Always close output channels: Use defer close(output) at start of Process
  2. Check context cancellation: Use select with ctx.Done()
  3. Use metadata for state: Pass data between stages via metadata
  4. Handle errors gracefully: Decide between error elements and fatal returns
  5. Order stages correctly: State load → Prompt assembly → Provider → State save
  6. Clean up resources: Use defer pipeline.Shutdown(timeout)