Configure Pipeline
Set up and configure a Runtime pipeline for LLM execution.
Create a functional stage-based pipeline with proper configuration for your use case.
Prerequisites
- Go 1.21+
- API key for LLM provider (OpenAI, Claude, or Gemini)
- Basic understanding of streaming pipelines
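The examples below read provider credentials from the environment rather than hard-coding them. For OpenAI (the variable name used in the production example later in this guide) that looks like the following; the key value is a placeholder:

```shell
# Export the provider API key so the runtime can pick it up.
# "your-key-here" is a placeholder, not a real key.
export OPENAI_API_KEY="your-key-here"
```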
Basic Pipeline
Step 1: Import Dependencies
```go
import (
    "context"
    "log"

    "github.com/AltairaLabs/PromptKit/runtime/pipeline/stage"
    "github.com/AltairaLabs/PromptKit/runtime/providers/openai"
    // The later steps also use the providers and types packages
    // from the same module.
)
```
Step 2: Create Provider

```go
provider := openai.NewProvider(
    "openai",
    "gpt-4o-mini",
    "", // Use default base URL
    providers.ProviderDefaults{Temperature: 0.7, MaxTokens: 2000},
    false, // Don't include raw output
)
defer provider.Close()
```
Step 3: Build Pipeline

```go
pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewProviderStage(provider, nil, nil, &stage.ProviderConfig{
            MaxTokens:   1500,
            Temperature: 0.7,
        }),
    ).
    Build()
```
Step 4: Execute

```go
ctx := context.Background()

// Create input element
msg := types.Message{Role: "user"}
msg.AddTextPart("What is 2+2?")
input := make(chan stage.StreamElement, 1)
input <- stage.NewMessageElement(msg)
close(input)

// Execute pipeline
output, err := pipeline.Execute(ctx, input)
if err != nil {
    log.Fatalf("Execution failed: %v", err)
}

// Collect response
for elem := range output {
    if elem.Text != nil {
        log.Printf("Response: %s\n", *elem.Text)
    }
}
```
Configuration Options
Pipeline Configuration

```go
config := stage.DefaultPipelineConfig().
    WithChannelBufferSize(32).                     // Inter-stage channel buffer
    WithIdleTimeout(60 * time.Second).             // Cancel after 60s of inactivity (default: 30s)
    WithGracefulShutdownTimeout(10 * time.Second). // Shutdown grace period
    WithMetrics(true).                             // Enable per-stage metrics
    WithTracing(true)                              // Enable distributed tracing

pipeline := stage.NewPipelineBuilderWithConfig(config).
    Chain(stages...).
    Build()
```
Provider Stage Configuration

```go
providerConfig := &stage.ProviderConfig{
    MaxTokens:   2000,  // Maximum response tokens
    Temperature: 0.7,   // Randomness (0-2)
    Seed:        &seed, // Reproducibility
}
```
Custom Provider Settings

```go
customDefaults := providers.ProviderDefaults{
    Temperature: 0.8,
    TopP:        0.95,
    MaxTokens:   4000,
    Pricing: providers.Pricing{
        InputCostPer1K:  0.00015,
        OutputCostPer1K: 0.0006,
    },
}

provider := openai.NewProvider(
    "custom-openai",
    "gpt-4o-mini",
    "",
    customDefaults,
    false,
)
```
Multiple Stages
Adding Prompt Assembly

```go
pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewPromptAssemblyStage(promptRegistry, "chat", variables),
        stage.NewTemplateStage(),
        stage.NewProviderStage(provider, nil, nil, config),
    ).
    Build()
```

Adding Guardrail Hooks
Section titled “Adding Guardrail Hooks”import ( "github.com/AltairaLabs/PromptKit/runtime/hooks" "github.com/AltairaLabs/PromptKit/runtime/hooks/guardrails")
hookRegistry := hooks.NewRegistry( hooks.WithProviderHook(guardrails.NewBannedWordsHook([]string{"inappropriate"})), hooks.WithProviderHook(guardrails.NewLengthHook(2000, 500)),)
pipeline := stage.NewPipelineBuilder(). Chain( stage.NewPromptAssemblyStage(promptRegistry, "chat", variables), stage.NewProviderStage(provider, nil, hookRegistry, config), ). Build()Adding State Persistence
Section titled “Adding State Persistence”import ( "github.com/redis/go-redis/v9" "github.com/AltairaLabs/PromptKit/runtime/statestore" "github.com/AltairaLabs/PromptKit/runtime/pipeline")
redisClient := redis.NewClient(&redis.Options{ Addr: "localhost:6379",})
store := statestore.NewRedisStore(redisClient)
stateConfig := &pipeline.StateStoreConfig{ Store: store, ConversationID: "session-123",}
pipeline := stage.NewPipelineBuilder(). Chain( stage.NewStateStoreLoadStage(stateConfig), stage.NewPromptAssemblyStage(promptRegistry, "chat", variables), stage.NewProviderStage(provider, nil, nil, config), stage.NewStateStoreSaveStage(stateConfig), ). Build()Pipeline Modes
Pipeline Modes
Text Mode (Default)

Standard request/response pipeline:

```go
pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewStateStoreLoadStage(stateConfig),
        stage.NewPromptAssemblyStage(promptRegistry, task, vars),
        stage.NewTemplateStage(),
        stage.NewProviderStage(provider, tools, policy, config),
        stage.NewValidationStage(validatorRegistry, validationConfig),
        stage.NewStateStoreSaveStage(stateConfig),
    ).
    Build()
```
VAD Mode (Voice with Text LLM)

For voice applications using text-based LLMs:

```go
vadConfig := stage.AudioTurnConfig{
    SilenceDuration:   800 * time.Millisecond,
    MinSpeechDuration: 200 * time.Millisecond,
    MaxTurnDuration:   30 * time.Second,
    SampleRate:        16000,
}

sttConfig := stage.STTStageConfig{
    Language:      "en",
    MinAudioBytes: 1600,
}

ttsConfig := stage.TTSConfig{
    Voice: "alloy",
    Speed: 1.0,
}

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewAudioTurnStage(vadConfig),
        stage.NewSTTStage(sttService, sttConfig),
        stage.NewStateStoreLoadStage(stateConfig),
        stage.NewPromptAssemblyStage(promptRegistry, task, vars),
        stage.NewProviderStage(provider, tools, policy, config),
        stage.NewStateStoreSaveStage(stateConfig),
        stage.NewTTSStageWithInterruption(ttsService, interruptionHandler, ttsConfig),
    ).
    Build()
```
ASM Mode (Duplex Streaming)

For native multimodal LLMs like Gemini Live:

```go
session, err := gemini.NewStreamSession(ctx, endpoint, apiKey, streamConfig)
if err != nil {
    log.Fatalf("failed to open stream session: %v", err)
}

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewStateStoreLoadStage(stateConfig),
        stage.NewPromptAssemblyStage(promptRegistry, task, vars),
        stage.NewDuplexProviderStage(session),
        stage.NewStateStoreSaveStage(stateConfig),
    ).
    Build()
```
Environment-Based Configuration
Production Configuration

```go
func NewProductionPipeline() (*stage.StreamPipeline, error) {
    // Fail fast if the key is missing (the provider reads it from
    // the environment itself)
    if os.Getenv("OPENAI_API_KEY") == "" {
        return nil, fmt.Errorf("OPENAI_API_KEY not set")
    }

    // Configure provider
    provider := openai.NewProvider(
        "openai-prod",
        "gpt-4o-mini",
        "",
        providers.ProviderDefaults{Temperature: 0.7, MaxTokens: 2000},
        false,
    )

    // Production pipeline config
    config := stage.DefaultPipelineConfig().
        WithChannelBufferSize(32).
        WithExecutionTimeout(60 * time.Second).
        WithGracefulShutdownTimeout(15 * time.Second).
        WithMetrics(true)

    // Build pipeline
    return stage.NewPipelineBuilderWithConfig(config).
        Chain(
            stage.NewPromptAssemblyStage(promptRegistry, "chat", nil),
            stage.NewTemplateStage(),
            stage.NewValidationStage(validatorRegistry, validationConfig),
            stage.NewProviderStage(provider, toolRegistry, toolPolicy, &stage.ProviderConfig{
                MaxTokens:   1500,
                Temperature: 0.7,
            }),
        ).
        Build(), nil
}
```
Development Configuration

```go
func NewDevelopmentPipeline() *stage.StreamPipeline {
    // Use mock provider for testing
    provider := mock.NewProvider("mock", "test-model", true)

    // Relaxed config for development
    config := stage.DefaultPipelineConfig().
        WithChannelBufferSize(8).
        WithExecutionTimeout(10 * time.Second)

    return stage.NewPipelineBuilderWithConfig(config).
        Chain(
            stage.NewDebugStage(os.Stdout), // Log all elements
            stage.NewProviderStage(provider, nil, nil, &stage.ProviderConfig{
                MaxTokens:   500,
                Temperature: 1.0,
            }),
        ).
        Build()
}
```
Common Patterns
Pipeline Factory

```go
type PipelineConfig struct {
    ProviderType string
    Model        string
    MaxTokens    int
    Temperature  float32
    EnableState  bool
    EnableDebug  bool
}

func NewPipelineFromConfig(cfg PipelineConfig) (*stage.StreamPipeline, error) {
    var provider providers.Provider

    switch cfg.ProviderType {
    case "openai":
        provider = openai.NewProvider(
            "openai", cfg.Model, "",
            providers.ProviderDefaults{Temperature: 0.7, MaxTokens: 2000},
            false,
        )
    case "claude":
        provider = claude.NewProvider(
            "claude", cfg.Model, "",
            providers.ProviderDefaults{Temperature: 0.7, MaxTokens: 4096},
            false,
        )
    default:
        return nil, fmt.Errorf("unknown provider: %s", cfg.ProviderType)
    }

    // Build stage list
    stages := []stage.Stage{}

    if cfg.EnableDebug {
        stages = append(stages, stage.NewDebugStage(os.Stdout))
    }

    if cfg.EnableState {
        stages = append(stages, stage.NewStateStoreLoadStage(stateConfig))
    }

    stages = append(stages, stage.NewProviderStage(provider, nil, nil, &stage.ProviderConfig{
        MaxTokens:   cfg.MaxTokens,
        Temperature: cfg.Temperature,
    }))

    if cfg.EnableState {
        stages = append(stages, stage.NewStateStoreSaveStage(stateConfig))
    }

    return stage.NewPipelineBuilder().
        Chain(stages...).
        Build(), nil
}
```
Synchronous Execution Helper

```go
func ExecuteSync(ctx context.Context, pipeline *stage.StreamPipeline, message string) (*types.Message, error) {
    // Create input
    msg := types.Message{Role: "user"}
    msg.AddTextPart(message)
    input := make(chan stage.StreamElement, 1)
    input <- stage.NewMessageElement(msg)
    close(input)

    // Execute
    output, err := pipeline.Execute(ctx, input)
    if err != nil {
        return nil, err
    }

    // Collect response
    var response *types.Message
    for elem := range output {
        if elem.Message != nil {
            response = elem.Message
        }
        if elem.Error != nil {
            return nil, elem.Error
        }
    }

    return response, nil
}
```
Session Metadata

Use BaseMetadata to attach session-level context that flows through to all stages and providers on every turn:

```go
pipeline, _ := builder.Build()
pipeline.BaseMetadata = map[string]interface{}{
    "session_id": sessionID,
    "tenant_id":  tenantID,
    "user_id":    userID,
}

// All Execute/ExecuteSync calls now include these keys in element metadata.
// Per-element metadata overrides base metadata on key collision.
result, err := pipeline.ExecuteSync(ctx, input)
```

This avoids manually injecting the same metadata into every StreamElement on each turn. Providers receive these values in PredictionRequest.Metadata.
Testing Configuration
Test Pipeline
```go
func TestPipeline(t *testing.T) {
    // Create mock provider
    provider := mock.NewProvider("test", "test-model", false)
    provider.AddResponse("test input", "test output")

    // Simple test pipeline
    pipeline := stage.NewPipelineBuilder().
        Chain(
            stage.NewProviderStage(provider, nil, nil, &stage.ProviderConfig{
                MaxTokens: 100,
            }),
        ).
        Build()

    // Create input
    msg := types.Message{Role: "user"}
    msg.AddTextPart("test input")
    input := make(chan stage.StreamElement, 1)
    input <- stage.NewMessageElement(msg)
    close(input)

    // Execute
    output, err := pipeline.Execute(context.Background(), input)
    if err != nil {
        t.Fatalf("execution failed: %v", err)
    }

    // Check output
    for elem := range output {
        if elem.Message != nil {
            content := elem.Message.GetTextContent()
            if content != "test output" {
                t.Errorf("unexpected response: %s", content)
            }
        }
    }
}
```
Troubleshooting
Issue: Timeout Errors

Problem: Pipeline executions are timing out.

Solution: Increase the idle timeout, which resets on each activity (provider call, tool execution, streaming chunk):

```go
config := stage.DefaultPipelineConfig().
    WithIdleTimeout(120 * time.Second) // Increase from default 30s
```

For a hard wall-clock limit, use WithExecutionTimeout (disabled by default).
Issue: Backpressure
Problem: Slow consumers cause upstream producers to block.
Solution: Increase channel buffer size:
```go
config := stage.DefaultPipelineConfig().
    WithChannelBufferSize(64) // Increase from default 16
```
Issue: Memory Growth

Problem: Memory usage increases over time.

Solution: Ensure proper cleanup:

```go
defer provider.Close()
defer mcpRegistry.Close()
```
Best Practices

- Always use defer for cleanup:

```go
defer pipeline.Shutdown(10 * time.Second)
```

- Set appropriate timeouts:

```go
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
```

- Use environment variables for secrets:

```go
apiKey := os.Getenv("OPENAI_API_KEY")
```

- Configure based on environment:

```go
if os.Getenv("ENV") == "production" {
    config = config.WithMetrics(true).WithTracing(true)
}
```

- Order stages correctly:
  - State load before prompt assembly
  - Validation before provider
  - State save after provider
Next Steps
- Setup Providers - Configure specific providers
- Handle Errors - Robust error handling
- Streaming Responses - Real-time output
- Create Custom Stages - Build your own stages
See Also
- Pipeline Reference - Complete API
- Pipeline Tutorial - Step-by-step guide
- Stage Design - Architecture explanation