Skip to content

Handle Errors

Implement robust error handling for production pipelines.

Handle errors gracefully and implement retry logic.

// Execute the pipeline and branch on the category of failure.
// NOTE(review): isRateLimitError / isTimeoutError are assumed to be defined
// elsewhere — confirm before copying this snippet.
result, err := pipe.Execute(ctx, "user", "Your message")
if err != nil {
// Check error type
if isRateLimitError(err) {
log.Println("Rate limited, retry later")
} else if isTimeoutError(err) {
log.Println("Request timed out")
} else {
// Unclassified failure: log it and let the caller decide what to do.
log.Printf("Error: %v", err)
}
}
import "context"
// Set timeout
// A context deadline bounds the whole pipeline call; when it expires,
// Execute returns an error that matches context.DeadlineExceeded.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
result, err := pipe.Execute(ctx, "user", "Your message")
if err != nil {
// errors.Is unwraps, so this matches even if the pipeline wrapped the
// deadline error with fmt.Errorf("...: %w", err).
if errors.Is(err, context.DeadlineExceeded) {
log.Println("Request timed out")
// Retry with longer timeout or simplify request
}
}
// Detect rate limiting by message substring and retry once after a pause.
// NOTE(review): matching on err.Error() text is fragile; prefer a typed
// error or sentinel via errors.Is/errors.As if the provider exposes one.
result, err := provider.Complete(ctx, messages, config)
if err != nil {
if strings.Contains(err.Error(), "rate_limit") {
log.Println("Rate limited")
// Wait and retry
time.Sleep(5 * time.Second)
result, err = provider.Complete(ctx, messages, config)
}
}
// Authentication failures are permanent — fail fast instead of retrying.
result, err := provider.Complete(ctx, messages, config)
if err != nil {
if strings.Contains(err.Error(), "authentication") ||
strings.Contains(err.Error(), "invalid_api_key") {
// log.Fatal logs the message and exits the process with code 1.
log.Fatal("Invalid API key - check environment variable")
}
}
// Tool-related failures usually indicate a configuration problem rather
// than a transient error, so retrying rarely helps.
result, err := pipe.Execute(ctx, "user", "Use the search tool")
if err != nil {
if strings.Contains(err.Error(), "tool") {
log.Printf("Tool error: %v", err)
// Check tool configuration
// Verify tool is registered
// Test tool separately
}
}
// executeWithRetry runs the pipeline with exponential backoff, retrying only
// errors classified as transient by isRetryableError.
//
// The backoff wait honors ctx cancellation (the original slept through it),
// so a cancelled caller is not held hostage by a long backoff. Returns the
// first successful result, or the last error — wrapped with %w — after
// maxRetries attempts; non-retryable errors abort immediately.
func executeWithRetry(pipe *pipeline.Pipeline, ctx context.Context, role, content string, maxRetries int) (*pipeline.PipelineResult, error) {
	var result *pipeline.PipelineResult
	var err error
	for i := 0; i < maxRetries; i++ {
		result, err = pipe.Execute(ctx, role, content)
		if err == nil {
			return result, nil
		}
		// Fail fast on errors that will not succeed on retry (auth, bad input).
		if !isRetryableError(err) {
			return nil, err
		}
		// Exponential backoff: 1s, 2s, 4s, ...
		backoff := time.Duration(1<<uint(i)) * time.Second
		log.Printf("Retry %d/%d after %v: %v", i+1, maxRetries, backoff, err)
		// Wait, but abort promptly if the caller cancels the context.
		select {
		case <-time.After(backoff):
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	return nil, fmt.Errorf("failed after %d retries: %w", maxRetries, err)
}
func isRetryableError(err error) bool {
errStr := err.Error()
return strings.Contains(errStr, "rate_limit") ||
strings.Contains(errStr, "timeout") ||
strings.Contains(errStr, "503") ||
strings.Contains(errStr, "502")
}
// executeWithFixedRetry runs the pipeline up to maxRetries times, waiting a
// fixed delay between attempts.
//
// Fixes vs. the original: the final error now wraps the last pipeline error
// (%w) so callers can inspect it with errors.Is/errors.As — consistent with
// executeWithRetry — the delay respects ctx cancellation, and maxRetries < 1
// is rejected explicitly instead of silently returning a misleading error.
func executeWithFixedRetry(pipe *pipeline.Pipeline, ctx context.Context, role, content string, maxRetries int, delay time.Duration) (*pipeline.PipelineResult, error) {
	if maxRetries < 1 {
		return nil, fmt.Errorf("maxRetries must be >= 1, got %d", maxRetries)
	}
	var lastErr error
	for i := 0; i < maxRetries; i++ {
		result, err := pipe.Execute(ctx, role, content)
		if err == nil {
			return result, nil
		}
		lastErr = err
		// No wait after the final attempt.
		if i < maxRetries-1 {
			log.Printf("Attempt %d failed, retrying in %v: %v", i+1, delay, err)
			select {
			case <-time.After(delay):
			case <-ctx.Done():
				return nil, ctx.Err()
			}
		}
	}
	return nil, fmt.Errorf("failed after %d retries: %w", maxRetries, lastErr)
}
// ProviderPool fans out across multiple providers, falling back to the next
// one when a call fails. Not safe for concurrent use: current is mutated
// without synchronization.
type ProviderPool struct {
providers []types.Provider // candidate providers, tried in round-robin order
current int // index of the most recently successful provider
}
// Execute tries each provider in the pool, starting at the last-successful
// one, and returns the first success. If every provider fails, the last
// error is returned wrapped with %w.
//
// Fixes vs. the original: an empty pool now returns a clear error instead of
// wrapping a nil error, and cancellation is checked between attempts.
// NOTE(review): p.current is read and written without a lock, so this method
// is not safe for concurrent use — guard the pool if shared across goroutines.
func (p *ProviderPool) Execute(ctx context.Context, messages []types.Message, config *types.ProviderConfig) (*types.ProviderResponse, error) {
	if len(p.providers) == 0 {
		return nil, fmt.Errorf("provider pool is empty")
	}
	var lastErr error
	for i := 0; i < len(p.providers); i++ {
		// Stop early if the caller has given up.
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		idx := (p.current + i) % len(p.providers)
		result, err := p.providers[idx].Complete(ctx, messages, config)
		if err == nil {
			// Remember the working provider so the next call starts here.
			p.current = idx
			return result, nil
		}
		lastErr = err
		log.Printf("Provider %d failed: %v", i, err)
	}
	return nil, fmt.Errorf("all providers failed: %w", lastErr)
}
// CircuitBreaker trips after maxFailures consecutive failures and rejects
// calls for the configured timeout window. Not safe for concurrent use:
// fields are mutated without synchronization.
type CircuitBreaker struct {
failures int // consecutive failures since the last success
maxFailures int // threshold at which the circuit opens
timeout time.Duration // how long the circuit stays open
lastFailure time.Time // when the most recent failure occurred
}
// Call invokes fn through the breaker. While the breaker is open (too many
// consecutive failures within the timeout window) fn is not invoked and an
// error is returned immediately; once the window elapses, traffic is allowed
// again. A successful call closes the breaker.
//
// NOTE: this implementation is not safe for concurrent use.
func (cb *CircuitBreaker) Call(fn func() error) error {
	tripped := cb.failures >= cb.maxFailures
	if tripped && time.Since(cb.lastFailure) < cb.timeout {
		// Still inside the cool-down window: reject without calling fn.
		return fmt.Errorf("circuit breaker open")
	}
	if tripped {
		// Cool-down elapsed: reset and let calls through again.
		cb.failures = 0
	}
	if err := fn(); err != nil {
		cb.failures++
		cb.lastFailure = time.Now()
		return err
	}
	// Success clears the failure streak.
	cb.failures = 0
	return nil
}
// Usage
// Open the circuit after 3 consecutive failures; keep it open for 30s.
cb := &CircuitBreaker{maxFailures: 3, timeout: 30 * time.Second}
err := cb.Call(func() error {
_, err := provider.Complete(ctx, messages, config)
return err
})
// Salvage a partial result when a long generation times out.
// NOTE(review): this assumes Execute may return a non-nil result together
// with a non-nil error — confirm against the pipeline implementation before
// relying on it.
result, err := pipe.Execute(ctx, "user", "Generate a long document")
if err != nil {
if strings.Contains(err.Error(), "timeout") && result != nil {
// Use partial result
log.Println("Request timed out, using partial response")
log.Printf("Partial: %s", result.Response.Content)
return result, nil
}
return nil, err
}
// Streaming: accumulate chunks as they arrive so a mid-stream failure can
// still yield the content received so far.
stream, err := pipe.ExecuteStream(ctx, "user", "Long request")
if err != nil {
return err
}
defer stream.Close()
var accumulated string
for {
chunk, err := stream.Next()
// io.EOF signals a clean end of stream, not a failure.
if err == io.EOF {
break
}
if err != nil {
log.Printf("Stream error: %v", err)
// Use accumulated content
if len(accumulated) > 0 {
log.Printf("Recovered partial content: %s", accumulated)
return nil
}
return err
}
accumulated += chunk.Content
}
import "github.com/AltairaLabs/PromptKit/runtime/logger"
// executeWithLogging wraps pipe.ExecuteWithContext with structured logging
// of the outcome: duration and error on failure; duration, token usage and
// cost on success. The result and error are passed through unchanged.
func executeWithLogging(pipe *pipeline.Pipeline, ctx context.Context, sessionID, role, content string) (*pipeline.PipelineResult, error) {
	began := time.Now()
	result, err := pipe.ExecuteWithContext(ctx, sessionID, role, content)
	elapsed := time.Since(began)

	if err != nil {
		logger.Error("Pipeline execution failed",
			"session_id", sessionID,
			"role", role,
			"duration", elapsed,
			"error", err,
		)
		return nil, err
	}

	logger.Info("Pipeline execution succeeded",
		"session_id", sessionID,
		"duration", elapsed,
		"tokens", result.Response.Usage.TotalTokens,
		"cost", result.Cost.TotalCost,
	)
	return result, nil
}
type ErrorMetrics struct {
timeouts int64
rateLimits int64
authErrors int64
toolErrors int64
otherErrors int64
totalRequests int64
successRequests int64
}
func (m *ErrorMetrics) RecordError(err error) {
atomic.AddInt64(&m.totalRequests, 1)
if err == nil {
atomic.AddInt64(&m.successRequests, 1)
return
}
errStr := err.Error()
switch {
case strings.Contains(errStr, "timeout"):
atomic.AddInt64(&m.timeouts, 1)
case strings.Contains(errStr, "rate_limit"):
atomic.AddInt64(&m.rateLimits, 1)
case strings.Contains(errStr, "authentication"):
atomic.AddInt64(&m.authErrors, 1)
case strings.Contains(errStr, "tool"):
atomic.AddInt64(&m.toolErrors, 1)
default:
atomic.AddInt64(&m.otherErrors, 1)
}
}
func (m *ErrorMetrics) Report() {
total := atomic.LoadInt64(&m.totalRequests)
success := atomic.LoadInt64(&m.successRequests)
successRate := float64(success) / float64(total) * 100
log.Printf("Error Metrics:")
log.Printf(" Total: %d", total)
log.Printf(" Success: %d (%.2f%%)", success, successRate)
log.Printf(" Timeouts: %d", atomic.LoadInt64(&m.timeouts))
log.Printf(" Rate Limits: %d", atomic.LoadInt64(&m.rateLimits))
log.Printf(" Auth Errors: %d", atomic.LoadInt64(&m.authErrors))
log.Printf(" Tool Errors: %d", atomic.LoadInt64(&m.toolErrors))
log.Printf(" Other: %d", atomic.LoadInt64(&m.otherErrors))
}
package main
import (
	"context"
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/AltairaLabs/PromptKit/runtime/pipeline"
	"github.com/AltairaLabs/PromptKit/runtime/pipeline/middleware"
	"github.com/AltairaLabs/PromptKit/runtime/providers/anthropic"
	"github.com/AltairaLabs/PromptKit/runtime/providers/openai"
	"github.com/AltairaLabs/PromptKit/runtime/types"
)
// main demonstrates provider fallback with per-provider retry: each provider
// is wrapped in its own pipeline and tried in order until one succeeds.
func main() {
	// Create providers
	openaiProvider := openai.NewOpenAIProvider(
		"openai", "gpt-4o-mini", "", openai.DefaultProviderDefaults(), false,
	)
	defer openaiProvider.Close()
	claudeProvider := anthropic.NewAnthropicProvider(
		"claude", "claude-3-5-haiku-20241022", "", anthropic.DefaultProviderDefaults(), false,
	)
	defer claudeProvider.Close()

	// Provider pool with fallback
	providers := []types.Provider{openaiProvider, claudeProvider}
	config := &middleware.ProviderMiddlewareConfig{
		MaxTokens:   1000,
		Temperature: 0.7,
	}

	// Try each provider, retrying with backoff before falling back.
	var result *pipeline.PipelineResult
	var err error
	for _, provider := range providers {
		pipe := pipeline.NewPipeline(
			middleware.ProviderMiddleware(provider, nil, nil, config),
		)
		result, err = executeWithRetry(pipe, context.Background(), "user", "Hello!", 3)
		// Shut the pipeline down right away: a defer inside the loop would
		// not run until main returns, accumulating live pipelines.
		pipe.Shutdown(context.Background())
		if err == nil {
			break
		}
		log.Printf("Provider %s failed: %v", provider.GetProviderName(), err)
	}
	if err != nil {
		// Include the last error so the failure is diagnosable from the log.
		log.Fatalf("All providers failed: %v", err)
	}
	log.Printf("Success: %s", result.Response.Content)
}
// executeWithRetry retries pipe.Execute with exponential backoff (1s, 2s,
// 4s, ...) for transient errors, failing fast on non-retryable ones.
//
// Fixes vs. the original: the final error wraps the last attempt's error
// (%w) so callers can use errors.Is/errors.As, and the backoff wait honors
// ctx cancellation instead of sleeping through it.
func executeWithRetry(pipe *pipeline.Pipeline, ctx context.Context, role, content string, maxRetries int) (*pipeline.PipelineResult, error) {
	var lastErr error
	for i := 0; i < maxRetries; i++ {
		result, err := pipe.Execute(ctx, role, content)
		if err == nil {
			return result, nil
		}
		if !isRetryableError(err) {
			return nil, err
		}
		lastErr = err
		// No wait after the final attempt.
		if i < maxRetries-1 {
			backoff := time.Duration(1<<uint(i)) * time.Second
			log.Printf("Retry %d/%d after %v", i+1, maxRetries, backoff)
			select {
			case <-time.After(backoff):
			case <-ctx.Done():
				return nil, ctx.Err()
			}
		}
	}
	return nil, fmt.Errorf("failed after %d retries: %w", maxRetries, lastErr)
}
func isRetryableError(err error) bool {
errStr := err.Error()
return strings.Contains(errStr, "rate_limit") ||
strings.Contains(errStr, "timeout") ||
strings.Contains(errStr, "503")
}

Problem: Requests consistently timing out.

Solutions:

  1. Increase timeout:

    ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
    defer cancel()
  2. Reduce response size:

    config := &middleware.ProviderMiddlewareConfig{
    MaxTokens: 500, // Smaller response
    }
  3. Simplify prompt:

    // Break complex request into smaller parts

Problem: Frequent rate limit errors.

Solutions:

  1. Add retry with backoff (see examples above)

  2. Reduce concurrency:

    config := &pipeline.RuntimeConfig{
    MaxConcurrentRequests: 1, // Sequential
    }
  3. Use rate limiter:

    limiter := rate.NewLimiter(rate.Limit(10), 1) // 10 req/sec
    limiter.Wait(ctx)
    result, err := pipe.Execute(ctx, "user", content)

Problem: Memory grows over time.

Solutions:

  1. Always close resources:

    defer provider.Close()
    defer pipe.Shutdown(ctx)
  2. Clean up state:

    // Limit message history
    if len(messages) > 50 {
    messages = messages[len(messages)-50:]
    }
Best practices:

  1. Set timeouts on all requests:

    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
  2. Implement retry logic for transient errors:

    result, err := executeWithRetry(pipe, ctx, "user", content, 3)
  3. Use multiple providers for redundancy:

    providers := []types.Provider{openai, claude, gemini}
  4. Log errors with context:

    logger.Error("Request failed", "session", sessionID, "error", err)
  5. Monitor error rates:

    metrics.RecordError(err)
    if metrics.ErrorRate() > 0.1 {
    alert("High error rate")
    }
  6. Handle partial results:

    if err != nil && result != nil {
    // Use partial content
    }
  7. Fail fast on non-retryable errors:

    if strings.Contains(err.Error(), "authentication") {
    log.Fatal("Invalid API key")
    }