The PromptKit runtime pipeline is a stage-based streaming execution engine that processes LLM interactions through a composable DAG (Directed Acyclic Graph) of processing stages. This architecture provides flexibility, extensibility, true streaming execution, and concurrent processing.

Overview

The pipeline system uses a reactive streams pattern where data flows through stages as StreamElement objects. Each stage runs in its own goroutine and communicates via channels, enabling true streaming with backpressure support.

graph TB
    subgraph "Stage Pipeline Execution"
        Input["Input Element"]
        S1["Stage 1<br/>StateStoreLoad"]
        S2["Stage 2<br/>PromptAssembly"]
        S3["Stage 3<br/>Template"]
        S4["Stage 4<br/>Provider"]
        S5["Stage 5<br/>Validation"]
        S6["Stage 6<br/>StateStoreSave"]
        Output["Output Element"]
    end

    Input --> S1
    S1 --> S2
    S2 --> S3
    S3 --> S4
    S4 --> S5
    S5 --> S6
    S6 --> Output

    style S4 fill:#f9f,stroke:#333,stroke-width:3px
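
The channel-per-goroutine pattern described above can be sketched in plain Go. This is a self-contained illustration, not the SDK's actual types: the toy `element` struct and stage functions stand in for StreamElement and Stage. Each stage owns a goroutine, closes its output channel when done, and a full channel buffer naturally blocks the producer, which is the backpressure behavior the pipeline relies on.

```go
package main

import (
	"fmt"
	"strings"
)

// element is a toy stand-in for StreamElement.
type element struct{ Text string }

// upperStage follows the stage contract: read input, transform,
// write output, and close the output channel when done.
func upperStage(in <-chan element, out chan<- element) {
	defer close(out)
	for e := range in {
		e.Text = strings.ToUpper(e.Text)
		out <- e
	}
}

func exclaimStage(in <-chan element, out chan<- element) {
	defer close(out)
	for e := range in {
		e.Text += "!"
		out <- e
	}
}

// run wires two stages into a pipeline with buffered inter-stage
// channels and collects the final output.
func run(inputs []string) []string {
	c1 := make(chan element, 4)
	c2 := make(chan element, 4)
	c3 := make(chan element, 4)

	go upperStage(c1, c2)   // each stage runs in its own goroutine
	go exclaimStage(c2, c3)

	go func() {
		defer close(c1)
		for _, s := range inputs {
			c1 <- element{Text: s} // blocks when the buffer is full: backpressure
		}
	}()

	var out []string
	for e := range c3 {
		out = append(out, e.Text)
	}
	return out
}

func main() {
	fmt.Println(run([]string{"hello", "world"}))
}
```

Because every stage drains its input with `range` and defers `close(out)`, closing the first channel cleanly shuts down the whole chain.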

Core Concepts

StreamElement

The fundamental unit of data flowing through the pipeline. Each element can carry multiple content types:

type StreamElement struct {
    // Content types (all optional; an element may carry more than one)
    Text     *string
    Audio    *AudioData
    Video    *VideoData
    Image    *ImageData
    Message  *types.Message
    ToolCall *types.ToolCall
    Parts    []types.ContentPart

    // Metadata for passing state between stages
    Metadata map[string]interface{}

    // Priority for QoS-aware scheduling
    Priority Priority  // Low, Normal, High, Critical

    // Control signals
    Error     error
    Timestamp time.Time
}

Helper functions for creating elements:

textElem := stage.NewTextElement("Hello")
msgElem := stage.NewMessageElement(types.Message{Role: "user", Content: "Hello"})
audioElem := stage.NewAudioElement(&stage.AudioData{
    Samples:    audioBytes,
    SampleRate: 16000,
    Format:     stage.AudioFormatPCM16,
})
errorElem := stage.NewErrorElement(err)

Stage Interface

A stage is a processing unit that transforms streaming elements:

type Stage interface {
    Name() string
    Type() StageType
    Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error
}

A stage's type describes its input/output cardinality (see the Stage Type Guidelines table below).

Each stage runs in its own goroutine, reading elements from its input channel and writing results to its output channel.

PipelineBuilder

Constructs the pipeline DAG using a fluent API:

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewStateStoreLoadStage(stateConfig),
        stage.NewPromptAssemblyStage(registry, taskType, vars),
        stage.NewProviderStage(provider, toolRegistry, toolPolicy, config),
        stage.NewStateStoreSaveStage(stateConfig),
    ).
    Build()

StreamPipeline

The executable pipeline. It supports both streaming and synchronous execution:

// Streaming execution
output, err := pipeline.Execute(ctx, input)
for elem := range output {
    // Process elements as they arrive
}

// Synchronous execution (convenience wrapper)
result, err := pipeline.ExecuteSync(ctx, elements...)

Standard Stages

Core Stages

1. StateStoreLoadStage

Purpose: Loads conversation history from persistent storage.

Configuration:

stateConfig := &pipeline.StateStoreConfig{
    Store:          stateStore,
    ConversationID: "session-123",
}
loadStage := stage.NewStateStoreLoadStage(stateConfig)

2. PromptAssemblyStage

Purpose: Loads and assembles prompts from the registry with variable substitution.

Configuration:

assemblyStage := stage.NewPromptAssemblyStage(
    promptRegistry,
    "customer-support",  // task type
    map[string]string{   // variables
        "customer_name": "Alice",
    },
)

3. TemplateStage

Purpose: Processes template variables in content.

4. ProviderStage

Purpose: Executes LLM calls with streaming and tool support.

Configuration:

providerConfig := &stage.ProviderConfig{
    MaxTokens:   1000,
    Temperature: 0.7,
}
providerStage := stage.NewProviderStage(
    provider,
    toolRegistry,
    toolPolicy,
    providerConfig,
)

5. ValidationStage

Purpose: Validates responses against schemas and constraints.

6. StateStoreSaveStage

Purpose: Persists conversation state after processing.

Streaming/Speech Stages

VADAccumulatorStage

Purpose: Voice activity detection and audio buffering.

AudioTurnStage

Purpose: Comprehensive turn detection for voice applications.

Configuration:

config := stage.AudioTurnConfig{
    SilenceDuration:   800 * time.Millisecond,
    MinSpeechDuration: 200 * time.Millisecond,
    MaxTurnDuration:   30 * time.Second,
    SampleRate:        16000,
}
turnStage, _ := stage.NewAudioTurnStage(config)

STTStage

Purpose: Speech-to-text transcription.

TTSStage / TTSStageWithInterruption

Purpose: Text-to-speech synthesis.

DuplexProviderStage

Purpose: Bidirectional WebSocket streaming for native audio LLMs.

Advanced Stages

RouterStage

Purpose: Dynamic routing to multiple output paths.

MergeStage

Purpose: Fan-in pattern for combining multiple streams.

MetricsStage

Purpose: Per-stage performance monitoring.

TracingStage

Purpose: Distributed tracing support.

Utility Stages

DebugStage

Purpose: Pipeline debugging and inspection.

VariableProviderStage

Purpose: Dynamic variable resolution.

MediaExternalizerStage

Purpose: External storage for large media content.

ContextBuilderStage

Purpose: Token budget management.

Pipeline Modes

The SDK supports three pipeline configurations for different use cases:

Text Mode

Standard HTTP-based LLM interactions:

Input → StateStoreLoad → VariableProvider → PromptAssembly → Template → Provider → Validation → StateStoreSave → Output

Use Cases: Chat applications, content generation, text processing

VAD Mode (Voice Activity Detection)

For voice applications without native audio LLM support:

Input → StateStoreLoad → VariableProvider → PromptAssembly → Template → AudioTurn → STT → Provider → TTS → Validation → StateStoreSave → Output

Use Cases: Voice assistants using text-based LLMs, telephony integrations

ASM Mode (Audio Streaming Mode)

For native multimodal LLMs with real-time audio:

Input → StateStoreLoad → VariableProvider → PromptAssembly → Template → DuplexProvider → Validation → StateStoreSave → Output

Use Cases: Gemini Live API, real-time voice conversations

Execution Modes

Streaming Execution

Elements flow through the pipeline as they’re produced:

sequenceDiagram
    participant Client
    participant Pipeline
    participant Stage1
    participant Provider
    participant Stage2

    Client->>Pipeline: Execute(input)
    Pipeline->>Stage1: Start goroutine
    Pipeline->>Provider: Start goroutine
    Pipeline->>Stage2: Start goroutine

    loop For each input element
        Client->>Stage1: element via channel
        Stage1->>Provider: processed element
        Provider->>Stage2: streaming chunks
        Stage2->>Client: output chunks
    end

Use Cases: Interactive chat, real-time applications, progressive display

Synchronous Execution

Convenience wrapper that collects all output:

result, err := pipeline.ExecuteSync(ctx, inputElement)
// result.Response contains the final response
// result.Messages contains all messages

Use Cases: Batch processing, testing, simple request/response

Multi-Round Tool Execution

The ProviderStage handles complex tool calling scenarios:

sequenceDiagram
    participant Pipeline
    participant PS as Provider Stage
    participant LLM
    participant TR as Tool Registry

    Pipeline->>PS: element with message
    PS->>LLM: Chat(messages, tools)
    LLM-->>PS: Response + Tool Calls

    loop Each Tool Call
        PS->>TR: Execute(toolName, args)
        TR-->>PS: Tool Result
    end

    PS->>PS: Append Tool Results
    PS->>LLM: Chat(messages + results, tools)
    LLM-->>PS: Final Response
    PS-->>Pipeline: Output elements

Configuration

Pipeline Configuration

config := stage.DefaultPipelineConfig().
    WithChannelBufferSize(32).           // Larger buffers for throughput
    WithPriorityQueue(true).             // Enable priority scheduling
    WithExecutionTimeout(60 * time.Second).
    WithMetrics(true).                   // Enable per-stage metrics
    WithTracing(true)                    // Enable element-level tracing

pipeline := stage.NewPipelineBuilderWithConfig(config).
    Chain(stages...).
    Build()

Default Configuration Values

Setting                 | Default | Description
------------------------|---------|--------------------------------------
ChannelBufferSize       | 16      | Buffer size for inter-stage channels
ExecutionTimeout        | 30s     | Maximum pipeline execution time
GracefulShutdownTimeout | 10s     | Timeout for graceful shutdown
PriorityQueue           | false   | Enable priority-based scheduling
Metrics                 | false   | Enable per-stage metrics collection
Tracing                 | false   | Enable distributed tracing

Error Handling

Error Propagation

Errors can be sent as error elements or returned from Process:

func (s *MyStage) Process(ctx context.Context, input <-chan StreamElement, output chan<- StreamElement) error {
    defer close(output)

    for elem := range input {
        if err := s.process(elem); err != nil {
            // Option 1: Send error element and continue
            output <- stage.NewErrorElement(err)
            continue

            // Option 2: Return error to stop pipeline
            // return err
        }
        output <- elem
    }
    return nil
}

Context Cancellation

All stages should respect context cancellation:

select {
case output <- elem:
    // Element sent successfully
case <-ctx.Done():
    return ctx.Err()  // Pipeline cancelled
}

Creating Custom Stages

Basic Pattern

type MyStage struct {
    stage.BaseStage
    config MyConfig
}

func NewMyStage(config MyConfig) *MyStage {
    return &MyStage{
        BaseStage: stage.NewBaseStage("my_stage", stage.StageTypeTransform),
        config:    config,
    }
}

func (s *MyStage) Process(
    ctx context.Context,
    input <-chan stage.StreamElement,
    output chan<- stage.StreamElement,
) error {
    defer close(output)  // Always close output when done

    for elem := range input {
        // Transform the element
        if elem.Text != nil {
            transformed := strings.ToUpper(*elem.Text)
            elem.Text = &transformed
        }

        // Send to output with cancellation check
        select {
        case output <- elem:
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    return nil
}

Stage Type Guidelines

Stage Type    | Input  | Output    | Use When
--------------|--------|-----------|---------------------------------
Transform     | 1      | 1 or more | Modifying or enriching elements
Accumulate    | Many   | 1         | Buffering until condition met
Generate      | 0 or 1 | Many      | Producing streaming output
Sink          | Many   | 0         | Terminal processing (save, log)
Bidirectional | Varies | Varies    | Full duplex communication
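
As an illustration of the Accumulate row (many in, one out), here is a minimal self-contained sketch. The local `element` type is a stand-in for the SDK's StreamElement, not the real type: the stage buffers text until its input channel closes, then emits a single joined element.

```go
package main

import (
	"fmt"
	"strings"
)

// element is a toy stand-in for StreamElement.
type element struct{ Text string }

// accumulateStage buffers every text element and emits one joined
// element once the input closes: many in, one out.
func accumulateStage(in <-chan element, out chan<- element) {
	defer close(out)
	var parts []string
	for e := range in {
		parts = append(parts, e.Text)
	}
	out <- element{Text: strings.Join(parts, " ")}
}

// runAccumulate preloads the input channel, runs the stage in a
// goroutine, and returns the single accumulated result.
func runAccumulate(inputs []string) string {
	in := make(chan element, len(inputs))
	out := make(chan element, 1)
	for _, s := range inputs {
		in <- element{Text: s}
	}
	close(in)
	go accumulateStage(in, out)
	return (<-out).Text
}

func main() {
	fmt.Println(runAccumulate([]string{"buffer", "until", "done"}))
}
```

A real accumulator would typically flush on a condition (silence duration, token count) rather than only on channel close, but the cardinality pattern is the same.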

Performance Considerations

Optimization Strategies

  1. Stage Ordering: Place fast-failing stages early
  2. Channel Buffer Sizing: Balance memory vs. throughput
  3. Concurrent Execution: Stages run in parallel by default
  4. Backpressure: Slow consumers naturally slow producers

Monitoring Metrics

Key metrics come from the MetricsStage (enable it with WithMetrics(true)), which provides per-stage performance monitoring.

Integration with Other Components

graph TB
    subgraph "Pipeline System"
        Pipeline
        Stages
    end

    subgraph "Provider System"
        ProviderRegistry
        Provider
    end

    subgraph "Tool System"
        ToolRegistry
        MCPClient
    end

    subgraph "State System"
        StateStore
    end

    Pipeline --> Stages
    Stages --> Provider
    Stages --> ToolRegistry
    Stages --> StateStore
    ProviderRegistry --> Provider
    ToolRegistry --> MCPClient

The pipeline integrates with the provider system (ProviderRegistry, Provider), the tool system (ToolRegistry, MCPClient), and the state system (StateStore), as shown above.

Best Practices

  1. Always Close Output: Use defer close(output) at the start of Process
  2. Respect Context: Check ctx.Done() in long-running operations
  3. Use Metadata: Pass state between stages via element metadata
  4. Handle Errors Gracefully: Decide between error elements and fatal errors
  5. Keep Stages Focused: Each stage should have a single responsibility
  6. Test Independently: Unit test each stage with mock channels
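
Point 6 can be sketched without the SDK. The `element` type and `upperStage` below are local stand-ins (not the real StreamElement or a real stage), but the harness shape is the point: preload a closed input channel, run Process-style code directly, then drain and inspect the output.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// element is a toy stand-in for StreamElement.
type element struct{ Text string }

// upperStage mimics the Stage.Process contract: it owns and closes its
// output channel and honors context cancellation.
func upperStage(ctx context.Context, in <-chan element, out chan<- element) error {
	defer close(out)
	for e := range in {
		e.Text = strings.ToUpper(e.Text)
		select {
		case out <- e:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// testStage drives the stage with mock channels the way a unit test
// would: preload input, close it, run the stage, drain the output.
func testStage(inputs []string) ([]string, error) {
	in := make(chan element, len(inputs))
	out := make(chan element, len(inputs))
	for _, s := range inputs {
		in <- element{Text: s}
	}
	close(in)

	err := upperStage(context.Background(), in, out)

	var got []string
	for e := range out {
		got = append(got, e.Text)
	}
	return got, err
}

func main() {
	got, err := testStage([]string{"unit", "test"})
	fmt.Println(got, err)
}
```

Because the output channel is buffered to hold every expected element, the stage can run synchronously in the test with no extra goroutine.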
