# Handle Errors

Implement robust error handling for production pipelines: classify failures, retry transient errors, and fall back gracefully when a provider is unavailable.
## Quick Start
```go
result, err := pipe.Execute(ctx, "user", "Your message")
if err != nil {
    // Check error type
    if isRateLimitError(err) {
        log.Println("Rate limited, retry later")
    } else if isTimeoutError(err) {
        log.Println("Request timed out")
    } else {
        log.Printf("Error: %v", err)
    }
}
```
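The `isRateLimitError` and `isTimeoutError` helpers above are not library functions. A minimal sketch, assuming rate limits surface in the provider error string and timeouts arrive as a context deadline error:

```go
// Hypothetical helpers for the snippet above (requires "context", "errors", "strings").
func isRateLimitError(err error) bool {
    return err != nil && strings.Contains(err.Error(), "rate_limit")
}

func isTimeoutError(err error) bool {
    return errors.Is(err, context.DeadlineExceeded) ||
        (err != nil && strings.Contains(err.Error(), "timeout"))
}
```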
## Common Error Types

### Timeout Errors
```go
import (
    "context"
    "errors"
    "log"
    "time"
)

// Set a timeout on the request context
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

result, err := pipe.Execute(ctx, "user", "Your message")
if err != nil {
    if errors.Is(err, context.DeadlineExceeded) {
        log.Println("Request timed out")
        // Retry with a longer timeout or simplify the request
    }
}
```

### Rate Limit Errors
```go
result, err := provider.Complete(ctx, messages, config)
if err != nil {
    if strings.Contains(err.Error(), "rate_limit") {
        log.Println("Rate limited")
        // Wait and retry
        time.Sleep(5 * time.Second)
        result, err = provider.Complete(ctx, messages, config)
    }
}
```

### Authentication Errors
```go
result, err := provider.Complete(ctx, messages, config)
if err != nil {
    if strings.Contains(err.Error(), "authentication") ||
        strings.Contains(err.Error(), "invalid_api_key") {
        log.Fatal("Invalid API key - check environment variable")
    }
}
```

### Tool Execution Errors
```go
result, err := pipe.Execute(ctx, "user", "Use the search tool")
if err != nil {
    if strings.Contains(err.Error(), "tool") {
        log.Printf("Tool error: %v", err)
        // Check tool configuration
        // Verify the tool is registered
        // Test the tool separately
    }
}
```

## Retry Strategies
### Exponential Backoff
```go
func executeWithRetry(pipe *pipeline.Pipeline, ctx context.Context, role, content string, maxRetries int) (*pipeline.PipelineResult, error) {
    var result *pipeline.PipelineResult
    var err error

    for i := 0; i < maxRetries; i++ {
        result, err = pipe.Execute(ctx, role, content)
        if err == nil {
            return result, nil
        }

        // Check if the error is retryable
        if !isRetryableError(err) {
            return nil, err
        }

        // Exponential backoff
        backoff := time.Duration(1<<uint(i)) * time.Second
        log.Printf("Retry %d/%d after %v: %v", i+1, maxRetries, backoff, err)
        time.Sleep(backoff)
    }

    return nil, fmt.Errorf("failed after %d retries: %w", maxRetries, err)
}

func isRetryableError(err error) bool {
    errStr := err.Error()
    return strings.Contains(errStr, "rate_limit") ||
        strings.Contains(errStr, "timeout") ||
        strings.Contains(errStr, "503") ||
        strings.Contains(errStr, "502")
}
```
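A common refinement, not shown in the example above, is adding random jitter to the backoff so that many clients do not retry in lockstep. A sketch using the standard math/rand package:

```go
// backoffWithJitter is a hypothetical helper: exponential backoff plus up to
// one second of random jitter. It can replace the fixed
// time.Duration(1<<uint(i)) * time.Second calculation above.
func backoffWithJitter(attempt int) time.Duration {
    base := time.Duration(1<<uint(attempt)) * time.Second
    jitter := time.Duration(rand.Int63n(int64(time.Second))) // import "math/rand"
    return base + jitter
}
```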
### Fixed Retry Delay

```go
func executeWithFixedRetry(pipe *pipeline.Pipeline, ctx context.Context, role, content string, maxRetries int, delay time.Duration) (*pipeline.PipelineResult, error) {
    for i := 0; i < maxRetries; i++ {
        result, err := pipe.Execute(ctx, role, content)
        if err == nil {
            return result, nil
        }

        if i < maxRetries-1 {
            log.Printf("Attempt %d failed, retrying in %v: %v", i+1, delay, err)
            time.Sleep(delay)
        }
    }

    return nil, fmt.Errorf("failed after %d retries", maxRetries)
}
```

## Provider Fallback
Section titled “Provider Fallback”Multi-Provider Strategy
```go
type ProviderPool struct {
    providers []types.Provider
    current   int
}

func (p *ProviderPool) Execute(ctx context.Context, messages []types.Message, config *types.ProviderConfig) (*types.ProviderResponse, error) {
    var lastErr error

    // Try each provider in turn, starting from the last one that worked
    for i := 0; i < len(p.providers); i++ {
        provider := p.providers[(p.current+i)%len(p.providers)]

        result, err := provider.Complete(ctx, messages, config)
        if err == nil {
            // Success - remember this provider for next time
            p.current = (p.current + i) % len(p.providers)
            return result, nil
        }

        lastErr = err
        log.Printf("Provider %d failed: %v", i, err)
    }

    return nil, fmt.Errorf("all providers failed: %w", lastErr)
}
```
### With Circuit Breaker

```go
type CircuitBreaker struct {
    failures    int
    maxFailures int
    timeout     time.Duration
    lastFailure time.Time
}

func (cb *CircuitBreaker) Call(fn func() error) error {
    // Check if the circuit is open
    if cb.failures >= cb.maxFailures {
        if time.Since(cb.lastFailure) < cb.timeout {
            return fmt.Errorf("circuit breaker open")
        }
        // Reset after the timeout has elapsed
        cb.failures = 0
    }

    // Call the wrapped function
    err := fn()
    if err != nil {
        cb.failures++
        cb.lastFailure = time.Now()
        return err
    }

    // Success - reset the failure count
    cb.failures = 0
    return nil
}

// Usage
cb := &CircuitBreaker{maxFailures: 3, timeout: 30 * time.Second}
err := cb.Call(func() error {
    _, err := provider.Complete(ctx, messages, config)
    return err
})
```
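The breaker above keeps its counters in plain fields, so it is not safe to share across goroutines. A hedged variant that guards the same state with a sync.Mutex, assuming the same semantics:

```go
type SafeCircuitBreaker struct {
    mu          sync.Mutex // import "sync"
    failures    int
    maxFailures int
    timeout     time.Duration
    lastFailure time.Time
}

func (cb *SafeCircuitBreaker) Call(fn func() error) error {
    cb.mu.Lock()
    if cb.failures >= cb.maxFailures {
        if time.Since(cb.lastFailure) < cb.timeout {
            cb.mu.Unlock()
            return fmt.Errorf("circuit breaker open")
        }
        cb.failures = 0 // half-open: allow a trial call after the timeout
    }
    cb.mu.Unlock()

    err := fn()

    cb.mu.Lock()
    defer cb.mu.Unlock()
    if err != nil {
        cb.failures++
        cb.lastFailure = time.Now()
        return err
    }
    cb.failures = 0
    return nil
}
```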
## Partial Results

### Handle Incomplete Responses
```go
result, err := pipe.Execute(ctx, "user", "Generate a long document")
if err != nil {
    if strings.Contains(err.Error(), "timeout") && result != nil {
        // Use the partial result
        log.Println("Request timed out, using partial response")
        log.Printf("Partial: %s", result.Response.Content)
        return result, nil
    }
    return nil, err
}
```

### Streaming with Error Recovery
Section titled “Streaming with Error Recovery”stream, err := pipe.ExecuteStream(ctx, "user", "Long request")if err != nil { return err}defer stream.Close()
var accumulated stringfor { chunk, err := stream.Next() if err == io.EOF { break } if err != nil { log.Printf("Stream error: %v", err) // Use accumulated content if len(accumulated) > 0 { log.Printf("Recovered partial content: %s", accumulated) return nil } return err }
accumulated += chunk.Content}Error Monitoring
Section titled “Error Monitoring”Log Structured Errors
```go
import "github.com/AltairaLabs/PromptKit/runtime/logger"

func executeWithLogging(pipe *pipeline.Pipeline, ctx context.Context, sessionID, role, content string) (*pipeline.PipelineResult, error) {
    start := time.Now()
    result, err := pipe.ExecuteWithContext(ctx, sessionID, role, content)
    duration := time.Since(start)

    if err != nil {
        logger.Error("Pipeline execution failed",
            "session_id", sessionID,
            "role", role,
            "duration", duration,
            "error", err,
        )
        return nil, err
    }

    logger.Info("Pipeline execution succeeded",
        "session_id", sessionID,
        "duration", duration,
        "tokens", result.Response.Usage.TotalTokens,
        "cost", result.Cost.TotalCost,
    )

    return result, nil
}
```

### Error Metrics
```go
type ErrorMetrics struct {
    timeouts        int64
    rateLimits      int64
    authErrors      int64
    toolErrors      int64
    otherErrors     int64
    totalRequests   int64
    successRequests int64
}

func (m *ErrorMetrics) RecordError(err error) {
    atomic.AddInt64(&m.totalRequests, 1)

    if err == nil {
        atomic.AddInt64(&m.successRequests, 1)
        return
    }

    errStr := err.Error()
    switch {
    case strings.Contains(errStr, "timeout"):
        atomic.AddInt64(&m.timeouts, 1)
    case strings.Contains(errStr, "rate_limit"):
        atomic.AddInt64(&m.rateLimits, 1)
    case strings.Contains(errStr, "authentication"):
        atomic.AddInt64(&m.authErrors, 1)
    case strings.Contains(errStr, "tool"):
        atomic.AddInt64(&m.toolErrors, 1)
    default:
        atomic.AddInt64(&m.otherErrors, 1)
    }
}

func (m *ErrorMetrics) Report() {
    total := atomic.LoadInt64(&m.totalRequests)
    success := atomic.LoadInt64(&m.successRequests)
    successRate := 0.0
    if total > 0 {
        successRate = float64(success) / float64(total) * 100
    }

    log.Printf("Error Metrics:")
    log.Printf("  Total: %d", total)
    log.Printf("  Success: %d (%.2f%%)", success, successRate)
    log.Printf("  Timeouts: %d", atomic.LoadInt64(&m.timeouts))
    log.Printf("  Rate Limits: %d", atomic.LoadInt64(&m.rateLimits))
    log.Printf("  Auth Errors: %d", atomic.LoadInt64(&m.authErrors))
    log.Printf("  Tool Errors: %d", atomic.LoadInt64(&m.toolErrors))
    log.Printf("  Other: %d", atomic.LoadInt64(&m.otherErrors))
}
```
## Complete Example

```go
package main

import (
    "context"
    "fmt"
    "log"
    "strings"
    "time"

    "github.com/AltairaLabs/PromptKit/runtime/pipeline"
    "github.com/AltairaLabs/PromptKit/runtime/pipeline/middleware"
    "github.com/AltairaLabs/PromptKit/runtime/providers/anthropic"
    "github.com/AltairaLabs/PromptKit/runtime/providers/openai"
    "github.com/AltairaLabs/PromptKit/runtime/types" // needed for types.Provider (import path assumed)
)

func main() {
    // Create providers
    openaiProvider := openai.NewOpenAIProvider(
        "openai",
        "gpt-4o-mini",
        "",
        openai.DefaultProviderDefaults(),
        false,
    )
    defer openaiProvider.Close()

    claudeProvider := anthropic.NewAnthropicProvider(
        "claude",
        "claude-3-5-haiku-20241022",
        "",
        anthropic.DefaultProviderDefaults(),
        false,
    )
    defer claudeProvider.Close()

    // Provider pool with fallback
    providers := []types.Provider{openaiProvider, claudeProvider}

    config := &middleware.ProviderMiddlewareConfig{
        MaxTokens:   1000,
        Temperature: 0.7,
    }

    // Try each provider with retry
    var result *pipeline.PipelineResult
    var err error

    for _, provider := range providers {
        pipe := pipeline.NewPipeline(
            middleware.ProviderMiddleware(provider, nil, nil, config),
        )
        defer pipe.Shutdown(context.Background())

        // Retry with backoff
        result, err = executeWithRetry(pipe, context.Background(), "user", "Hello!", 3)
        if err == nil {
            break
        }

        log.Printf("Provider %s failed: %v", provider.GetProviderName(), err)
    }

    if err != nil {
        log.Fatal("All providers failed")
    }

    log.Printf("Success: %s", result.Response.Content)
}

func executeWithRetry(pipe *pipeline.Pipeline, ctx context.Context, role, content string, maxRetries int) (*pipeline.PipelineResult, error) {
    for i := 0; i < maxRetries; i++ {
        result, err := pipe.Execute(ctx, role, content)
        if err == nil {
            return result, nil
        }

        if !isRetryableError(err) {
            return nil, err
        }

        if i < maxRetries-1 {
            backoff := time.Duration(1<<uint(i)) * time.Second
            log.Printf("Retry %d/%d after %v", i+1, maxRetries, backoff)
            time.Sleep(backoff)
        }
    }

    return nil, fmt.Errorf("failed after %d retries", maxRetries)
}

func isRetryableError(err error) bool {
    errStr := err.Error()
    return strings.Contains(errStr, "rate_limit") ||
        strings.Contains(errStr, "timeout") ||
        strings.Contains(errStr, "503")
}
```

## Troubleshooting
### Issue: Persistent Timeouts
Problem: Requests consistently time out.
Solutions:

- Increase the timeout:

  ```go
  ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
  defer cancel()
  ```

- Reduce the response size:

  ```go
  config := &middleware.ProviderMiddlewareConfig{
      MaxTokens: 500, // Smaller response
  }
  ```

- Simplify the prompt by breaking a complex request into smaller parts (see the sketch after this list).
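For example, a sketch of splitting one long generation into several smaller sequential requests; the section names and looping structure are illustrative, not a library feature:

```go
// Ask for one section at a time instead of the whole document at once.
sections := []string{"Introduction", "Architecture", "Deployment"}
var doc strings.Builder
for _, section := range sections {
    part, err := pipe.Execute(ctx, "user", "Write the "+section+" section")
    if err != nil {
        log.Fatalf("section %q failed: %v", section, err)
    }
    doc.WriteString(part.Response.Content)
    doc.WriteString("\n\n")
}
```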
### Issue: Rate Limits
Problem: Frequent rate limit errors.
Solutions:

- Add retry with backoff (see the examples above).

- Reduce concurrency:

  ```go
  config := &pipeline.RuntimeConfig{
      MaxConcurrentRequests: 1, // Sequential
  }
  ```

- Use a rate limiter (from golang.org/x/time/rate):

  ```go
  limiter := rate.NewLimiter(rate.Limit(10), 1) // 10 req/sec

  if err := limiter.Wait(ctx); err != nil {
      return err
  }
  result, err := pipe.Execute(ctx, "user", content)
  ```
### Issue: Memory Leaks

Problem: Memory grows over time.
Solutions:

- Always close resources:

  ```go
  defer provider.Close()
  defer pipe.Shutdown(ctx)
  ```

- Clean up state:

  ```go
  // Limit message history
  if len(messages) > 50 {
      messages = messages[len(messages)-50:]
  }
  ```
## Best Practices

- Set timeouts on all requests:

  ```go
  ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  defer cancel()
  ```

- Implement retry logic for transient errors:

  ```go
  result, err := executeWithRetry(pipe, ctx, "user", content, 3)
  ```

- Use multiple providers for redundancy:

  ```go
  providers := []types.Provider{openai, claude, gemini}
  ```

- Log errors with context:

  ```go
  logger.Error("Request failed", "session", sessionID, "error", err)
  ```

- Monitor error rates:

  ```go
  metrics.RecordError(err)
  if metrics.ErrorRate() > 0.1 {
      alert("High error rate")
  }
  ```

- Handle partial results:

  ```go
  if err != nil && result != nil {
      // Use partial content
  }
  ```

- Fail fast on non-retryable errors:

  ```go
  if strings.Contains(err.Error(), "authentication") {
      log.Fatal("Invalid API key")
  }
  ```
## Next Steps

- Streaming Responses - Handle streams
- Monitor Costs - Track usage
- Setup Providers - Provider config
## See Also

- Pipeline Reference - Error details
- Providers Reference - Provider errors