Configure Pipeline
Set up and configure a Runtime pipeline for LLM execution.
Create a functional stage-based pipeline with proper configuration for your use case.
Prerequisites
- Go 1.21+
- API key for LLM provider (OpenAI, Claude, or Gemini)
- Basic understanding of streaming pipelines
Basic Pipeline
Step 1: Import Dependencies

import (
    "context"
    "log"

    "github.com/AltairaLabs/PromptKit/runtime/pipeline/stage"
    "github.com/AltairaLabs/PromptKit/runtime/providers/openai"
    "github.com/AltairaLabs/PromptKit/runtime/types" // types.Message, used in Step 4
)

Step 2: Create Provider
provider := openai.NewOpenAIProvider(
    "openai",
    "gpt-4o-mini",
    "", // Use default base URL
    openai.DefaultProviderDefaults(),
    false, // Don't include raw output
)
defer provider.Close()

Step 3: Build Pipeline
pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewProviderStage(provider, nil, nil, &stage.ProviderConfig{
            MaxTokens:   1500,
            Temperature: 0.7,
        }),
    ).
    Build()

Step 4: Execute
ctx := context.Background()

// Create input element
msg := types.Message{Role: "user"}
msg.AddTextPart("What is 2+2?")
input := make(chan stage.StreamElement, 1)
input <- stage.NewMessageElement(msg)
close(input)

// Execute pipeline
output, err := pipeline.Execute(ctx, input)
if err != nil {
    log.Fatalf("Execution failed: %v", err)
}

// Collect response
for elem := range output {
    if elem.Text != nil {
        log.Printf("Response: %s\n", *elem.Text)
    }
}

Configuration Options
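Several of the options in this section control how elements flow between stages. As a rough illustration of what an inter-stage channel buffer does in general, the following standard-library sketch counts how many elements a producer can hand off before it would block when nothing is consuming yet. It does not use PromptKit, and `sendsBeforeBlocking` is a hypothetical helper invented for this example.

```go
package main

import "fmt"

// sendsBeforeBlocking reports how many elements a producer can push into a
// channel with the given buffer size before a send would block, assuming no
// consumer is reading yet. Hypothetical helper, not a PromptKit API.
func sendsBeforeBlocking(bufferSize, attempts int) int {
    ch := make(chan int, bufferSize)
    sent := 0
    for i := 0; i < attempts; i++ {
        select {
        case ch <- i:
            sent++ // the buffer had room, so the producer keeps going
        default:
            return sent // buffer full: a real producer would block here
        }
    }
    return sent
}

func main() {
    for _, size := range []int{8, 16, 32} {
        fmt.Printf("buffer=%d accepted=%d of 100\n", size, sendsBeforeBlocking(size, 100))
    }
}
```

A larger buffer lets a fast stage run further ahead of a slow one, at the cost of holding more elements in memory.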
Pipeline Configuration

config := stage.DefaultPipelineConfig().
    WithChannelBufferSize(32).                     // Inter-stage channel buffer
    WithExecutionTimeout(30 * time.Second).        // Per-request timeout
    WithGracefulShutdownTimeout(10 * time.Second). // Shutdown grace period
    WithMetrics(true).                             // Enable per-stage metrics
    WithTracing(true)                              // Enable distributed tracing

pipeline := stage.NewPipelineBuilderWithConfig(config).
    Chain(stages...).
    Build()

Provider Stage Configuration
providerConfig := &stage.ProviderConfig{
    MaxTokens:   2000,  // Maximum response tokens
    Temperature: 0.7,   // Randomness (0-2)
    Seed:        &seed, // Reproducibility
}

Custom Provider Settings
customDefaults := providers.ProviderDefaults{
    Temperature: 0.8,
    TopP:        0.95,
    MaxTokens:   4000,
    Pricing: providers.Pricing{
        InputCostPer1K:  0.00015,
        OutputCostPer1K: 0.0006,
    },
}

provider := openai.NewOpenAIProvider(
    "custom-openai",
    "gpt-4o-mini",
    "",
    customDefaults,
    false,
)

Multiple Stages
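Conceptually, chaining means the output channel of one stage becomes the input channel of the next. The sketch below shows that idea with the standard library only. `Stage`, `mapStage`, `chain`, `run`, `trim`, and `prefix` are hypothetical names for illustration and are not the PromptKit API, which may differ in how it handles errors, context, and buffering.

```go
package main

import (
    "fmt"
    "strings"
)

// Stage is a hypothetical stand-in for a pipeline stage: it reads elements
// from an input channel and returns a channel of transformed elements.
type Stage func(in <-chan string) <-chan string

// mapStage builds a Stage that applies fn to every element it receives.
func mapStage(fn func(string) string) Stage {
    return func(in <-chan string) <-chan string {
        out := make(chan string)
        go func() {
            defer close(out) // closing propagates end-of-stream downstream
            for s := range in {
                out <- fn(s)
            }
        }()
        return out
    }
}

// Two example stages used in main.
var (
    trim   = mapStage(strings.TrimSpace)
    prefix = mapStage(func(s string) string { return "user: " + s })
)

// chain wires stages together in order, so the output of each stage becomes
// the input of the next.
func chain(in <-chan string, stages ...Stage) <-chan string {
    for _, st := range stages {
        in = st(in)
    }
    return in
}

// run feeds the inputs through the stages and collects everything emitted.
func run(inputs []string, stages ...Stage) []string {
    in := make(chan string, len(inputs))
    for _, s := range inputs {
        in <- s
    }
    close(in)

    var results []string
    for s := range chain(in, stages...) {
        results = append(results, s)
    }
    return results
}

func main() {
    fmt.Println(run([]string{"  hello  "}, trim, prefix))
}
```

Because each stage only sees what the previous one emitted, swapping two stages can change the result, which is why stage order matters in the examples that follow.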
Adding Prompt Assembly

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewPromptAssemblyStage(promptRegistry, "chat", variables),
        stage.NewTemplateStage(),
        stage.NewProviderStage(provider, nil, nil, providerConfig),
    ).
    Build()

Adding Validators
import "github.com/AltairaLabs/PromptKit/runtime/validators"

validatorRegistry := validators.NewRegistry()
validatorRegistry.Register("banned_words", validators.NewBannedWordsValidator([]string{"inappropriate"}))
validatorRegistry.Register("length", validators.NewLengthValidator(10, 500))

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewPromptAssemblyStage(promptRegistry, "chat", variables),
        stage.NewValidationStage(validatorRegistry, &stage.ValidationConfig{
            FailOnError: true,
        }),
        stage.NewProviderStage(provider, nil, nil, providerConfig),
    ).
    Build()

Adding State Persistence
import (
    "github.com/redis/go-redis/v9"

    "github.com/AltairaLabs/PromptKit/runtime/pipeline"
    "github.com/AltairaLabs/PromptKit/runtime/statestore"
)

redisClient := redis.NewClient(&redis.Options{
    Addr: "localhost:6379",
})

store := statestore.NewRedisStateStore(redisClient)

stateConfig := &pipeline.StateStoreConfig{
    Store:          store,
    ConversationID: "session-123",
}

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewStateStoreLoadStage(stateConfig),
        stage.NewPromptAssemblyStage(promptRegistry, "chat", variables),
        stage.NewProviderStage(provider, nil, nil, providerConfig),
        stage.NewStateStoreSaveStage(stateConfig),
    ).
    Build()

Pipeline Modes
Text Mode (Default)

Standard request/response pipeline:

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewStateStoreLoadStage(stateConfig),
        stage.NewPromptAssemblyStage(promptRegistry, task, vars),
        stage.NewTemplateStage(),
        stage.NewProviderStage(provider, tools, policy, providerConfig),
        stage.NewValidationStage(validatorRegistry, validationConfig),
        stage.NewStateStoreSaveStage(stateConfig),
    ).
    Build()

VAD Mode (Voice with Text LLM)
For voice applications using text-based LLMs:

vadConfig := stage.AudioTurnConfig{
    SilenceDuration:   800 * time.Millisecond,
    MinSpeechDuration: 200 * time.Millisecond,
    MaxTurnDuration:   30 * time.Second,
    SampleRate:        16000,
}

sttConfig := stage.STTStageConfig{
    Language:      "en",
    MinAudioBytes: 1600,
}

ttsConfig := stage.TTSConfig{
    Voice: "alloy",
    Speed: 1.0,
}

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewAudioTurnStage(vadConfig),
        stage.NewSTTStage(sttService, sttConfig),
        stage.NewStateStoreLoadStage(stateConfig),
        stage.NewPromptAssemblyStage(promptRegistry, task, vars),
        stage.NewProviderStage(provider, tools, policy, providerConfig),
        stage.NewStateStoreSaveStage(stateConfig),
        stage.NewTTSStageWithInterruption(ttsService, interruptionHandler, ttsConfig),
    ).
    Build()

ASM Mode (Duplex Streaming)
For native multimodal LLMs like Gemini Live:

// Check the error instead of discarding it; a failed session cannot be streamed.
session, err := gemini.NewStreamSession(ctx, endpoint, apiKey, streamConfig)
if err != nil {
    log.Fatalf("Failed to create stream session: %v", err)
}

pipeline := stage.NewPipelineBuilder().
    Chain(
        stage.NewStateStoreLoadStage(stateConfig),
        stage.NewPromptAssemblyStage(promptRegistry, task, vars),
        stage.NewDuplexProviderStage(session),
        stage.NewStateStoreSaveStage(stateConfig),
    ).
    Build()

Environment-Based Configuration
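One way to keep environment-specific values in one place is a small function that maps an environment name to settings. The sketch below is an illustration under stated assumptions, not part of PromptKit: the `Settings` struct and `settingsFor` function are hypothetical, the values simply mirror the production and development examples in this section, and `ENV` is assumed to be the variable that names the environment.

```go
package main

import (
    "fmt"
    "os"
    "time"
)

// Settings is a hypothetical plain struct for this sketch. It mirrors the
// kinds of values a pipeline config carries but is not a PromptKit type.
type Settings struct {
    ChannelBufferSize int
    ExecutionTimeout  time.Duration
    Metrics           bool
}

// settingsFor returns settings for the named environment, falling back to
// the development values for anything it does not recognize.
func settingsFor(env string) Settings {
    if env == "production" {
        return Settings{
            ChannelBufferSize: 32,
            ExecutionTimeout:  60 * time.Second,
            Metrics:           true,
        }
    }
    return Settings{
        ChannelBufferSize: 8,
        ExecutionTimeout:  10 * time.Second,
    }
}

func main() {
    // ENV is an assumed variable name for this illustration.
    s := settingsFor(os.Getenv("ENV"))
    fmt.Printf("buffer=%d timeout=%s metrics=%t\n",
        s.ChannelBufferSize, s.ExecutionTimeout, s.Metrics)
}
```

Defaulting to the development values means a missing or misspelled environment name never silently enables production behavior.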
Production Configuration

func NewProductionPipeline() (*stage.StreamPipeline, error) {
    // Fail fast if the API key is not set in the environment
    apiKey := os.Getenv("OPENAI_API_KEY")
    if apiKey == "" {
        return nil, fmt.Errorf("OPENAI_API_KEY not set")
    }

    // Configure provider
    provider := openai.NewOpenAIProvider(
        "openai-prod",
        "gpt-4o-mini",
        "",
        openai.DefaultProviderDefaults(),
        false,
    )

    // Production pipeline config
    config := stage.DefaultPipelineConfig().
        WithChannelBufferSize(32).
        WithExecutionTimeout(60 * time.Second).
        WithGracefulShutdownTimeout(15 * time.Second).
        WithMetrics(true)

    // Build pipeline
    return stage.NewPipelineBuilderWithConfig(config).
        Chain(
            stage.NewPromptAssemblyStage(promptRegistry, "chat", nil),
            stage.NewTemplateStage(),
            stage.NewValidationStage(validatorRegistry, validationConfig),
            stage.NewProviderStage(provider, toolRegistry, toolPolicy, &stage.ProviderConfig{
                MaxTokens:   1500,
                Temperature: 0.7,
            }),
        ).
        Build(), nil
}

Development Configuration
func NewDevelopmentPipeline() *stage.StreamPipeline {
    // Use mock provider for testing
    provider := mock.NewMockProvider("mock", "test-model", true)

    // Relaxed config for development
    config := stage.DefaultPipelineConfig().
        WithChannelBufferSize(8).
        WithExecutionTimeout(10 * time.Second)

    return stage.NewPipelineBuilderWithConfig(config).
        Chain(
            stage.NewDebugStage(os.Stdout), // Log all elements
            stage.NewProviderStage(provider, nil, nil, &stage.ProviderConfig{
                MaxTokens:   500,
                Temperature: 1.0,
            }),
        ).
        Build()
}

Common Patterns
Pipeline Factory

type PipelineConfig struct {
    ProviderType string
    Model        string
    MaxTokens    int
    Temperature  float32
    EnableState  bool
    EnableDebug  bool
}

func NewPipelineFromConfig(cfg PipelineConfig) (*stage.StreamPipeline, error) {
    var provider providers.Provider

    switch cfg.ProviderType {
    case "openai":
        provider = openai.NewOpenAIProvider(
            "openai",
            cfg.Model,
            "",
            openai.DefaultProviderDefaults(),
            false,
        )
    case "claude":
        provider = claude.NewClaudeProvider(
            "claude",
            cfg.Model,
            "",
            claude.DefaultProviderDefaults(),
            false,
        )
    default:
        return nil, fmt.Errorf("unknown provider: %s", cfg.ProviderType)
    }

    // Build stage list
    stages := []stage.Stage{}

    if cfg.EnableDebug {
        stages = append(stages, stage.NewDebugStage(os.Stdout))
    }

    if cfg.EnableState {
        stages = append(stages, stage.NewStateStoreLoadStage(stateConfig))
    }

    stages = append(stages, stage.NewProviderStage(provider, nil, nil, &stage.ProviderConfig{
        MaxTokens:   cfg.MaxTokens,
        Temperature: cfg.Temperature,
    }))

    if cfg.EnableState {
        stages = append(stages, stage.NewStateStoreSaveStage(stateConfig))
    }

    return stage.NewPipelineBuilder().
        Chain(stages...).
        Build(), nil
}

Synchronous Execution Helper
func ExecuteSync(ctx context.Context, pipeline *stage.StreamPipeline, message string) (*types.Message, error) {
    // Create input
    msg := types.Message{Role: "user"}
    msg.AddTextPart(message)
    input := make(chan stage.StreamElement, 1)
    input <- stage.NewMessageElement(msg)
    close(input)

    // Execute
    output, err := pipeline.Execute(ctx, input)
    if err != nil {
        return nil, err
    }

    // Collect response: keep the last message seen, stop on the first error
    var response *types.Message
    for elem := range output {
        if elem.Message != nil {
            response = elem.Message
        }
        if elem.Error != nil {
            return nil, elem.Error
        }
    }

    return response, nil
}

Testing Configuration
Test Pipeline

func TestPipeline(t *testing.T) {
    // Create mock provider
    provider := mock.NewMockProvider("test", "test-model", false)
    provider.AddResponse("test input", "test output")

    // Simple test pipeline
    pipeline := stage.NewPipelineBuilder().
        Chain(
            stage.NewProviderStage(provider, nil, nil, &stage.ProviderConfig{
                MaxTokens: 100,
            }),
        ).
        Build()

    // Create input
    msg := types.Message{Role: "user"}
    msg.AddTextPart("test input")
    input := make(chan stage.StreamElement, 1)
    input <- stage.NewMessageElement(msg)
    close(input)

    // Execute
    output, err := pipeline.Execute(context.Background(), input)
    if err != nil {
        t.Fatalf("execution failed: %v", err)
    }

    // Check output; fail if the pipeline reports an error or emits no message
    received := false
    for elem := range output {
        if elem.Error != nil {
            t.Fatalf("pipeline error: %v", elem.Error)
        }
        if elem.Message != nil {
            received = true
            content := elem.Message.GetTextContent()
            if content != "test output" {
                t.Errorf("unexpected response: %s", content)
            }
        }
    }
    if !received {
        t.Error("no response message received")
    }
}

Troubleshooting
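When diagnosing a failure, it can help to first confirm whether the error was actually caused by a deadline. The following standard-library sketch shows one way to check that with `errors.Is` and `context.DeadlineExceeded`. `runWithTimeout`, `slowWork`, and `failingWork` are hypothetical helpers, and whether pipeline errors wrap the context error in this way is an assumption to verify against the library.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// runWithTimeout runs work under a deadline and reports whether the failure,
// if any, was caused by that deadline expiring. Hypothetical helper.
func runWithTimeout(timeout time.Duration, work func(ctx context.Context) error) (bool, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    err := work(ctx)
    return errors.Is(err, context.DeadlineExceeded), err
}

// slowWork simulates a stage that never finishes on its own: it returns only
// once the context is cancelled or its deadline passes.
func slowWork(ctx context.Context) error {
    <-ctx.Done()
    return ctx.Err()
}

// failingWork simulates a failure that has nothing to do with timing.
func failingWork(ctx context.Context) error {
    return errors.New("provider rejected request")
}

func main() {
    timedOut, err := runWithTimeout(20*time.Millisecond, slowWork)
    fmt.Println(timedOut, err)

    timedOut, err = runWithTimeout(time.Second, failingWork)
    fmt.Println(timedOut, err)
}
```

Raising a timeout only helps in the first case; an error that is not a deadline error needs a different fix.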
Issue: Timeout Errors

Problem: Pipeline executions time out before completing.

Solution: Increase the execution timeout:

config := stage.DefaultPipelineConfig().
    WithExecutionTimeout(120 * time.Second) // Increase from default 30s

Issue: Backpressure
Problem: Slow consumers cause upstream producers to block.

Solution: Increase the channel buffer size:

config := stage.DefaultPipelineConfig().
    WithChannelBufferSize(64) // Increase from default 16

Issue: Memory Growth
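Problem: Memory usage grows over time.

Solution: Make sure the pipeline, provider, and state store are all shut down or closed:

// Deferred calls run last-in, first-out, so the pipeline shuts down
// before the provider and store it depends on are closed.
defer store.Close()
defer provider.Close()
defer pipeline.Shutdown(10 * time.Second)

Best Practices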
- Always use defer for cleanup:
    defer pipeline.Shutdown(10 * time.Second)
- Set appropriate timeouts:
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
- Use environment variables for secrets:
    apiKey := os.Getenv("OPENAI_API_KEY")
- Configure based on environment:
    if os.Getenv("ENV") == "production" {
        config = config.WithMetrics(true).WithTracing(true)
    }
- Order stages correctly:
  - State load before prompt assembly
  - Validation before provider
  - State save after provider
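The ordering rules above can be checked mechanically before a pipeline is built. This is a minimal sketch under stated assumptions: stages are represented by hypothetical string labels (`state_load`, `prompt_assembly`, `validation`, `provider`, `state_save`) rather than real stage values, and `checkOrder` is not a PromptKit function.

```go
package main

import "fmt"

// checkOrder verifies the three ordering rules against a slice of stage
// labels. The labels are hypothetical names chosen for this sketch.
func checkOrder(stages []string) error {
    pos := make(map[string]int, len(stages))
    for i, name := range stages {
        pos[name] = i
    }

    // Each rule says: if both stages are present, the first must precede
    // the second.
    rules := [][2]string{
        {"state_load", "prompt_assembly"},
        {"validation", "provider"},
        {"provider", "state_save"},
    }
    for _, r := range rules {
        first, okFirst := pos[r[0]]
        second, okSecond := pos[r[1]]
        if okFirst && okSecond && first > second {
            return fmt.Errorf("%s must come before %s", r[0], r[1])
        }
    }
    return nil
}

func main() {
    good := []string{"state_load", "prompt_assembly", "validation", "provider", "state_save"}
    bad := []string{"prompt_assembly", "state_load", "provider"}
    fmt.Println(checkOrder(good))
    fmt.Println(checkOrder(bad))
}
```

Rules only apply when both stages are present, so a pipeline without state persistence or validation still passes.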
Next Steps
- Setup Providers - Configure specific providers
- Handle Errors - Robust error handling
- Streaming Responses - Real-time output
- Create Custom Stages - Build your own stages
See Also
- Pipeline Reference - Complete API
- Pipeline Tutorial - Step-by-step guide
- Stage Design - Architecture explanation