Monitor Costs
Track and optimize LLM API costs.
Monitor token usage and costs for budget control.
Quick Start
Section titled “Quick Start”result, err := pipe.Execute(ctx, "user", "Your message")if err != nil { log.Fatal(err)}
// Check costlog.Printf("Tokens: %d", result.Response.Usage.TotalTokens)log.Printf("Cost: $%.6f", result.Cost.TotalCost)Cost Tracking
Section titled “Cost Tracking”Basic Cost Info
Section titled “Basic Cost Info”result, _ := pipe.Execute(ctx, "user", "Explain AI")
// Token usageusage := result.Response.Usagelog.Printf("Input tokens: %d", usage.PromptTokens)log.Printf("Output tokens: %d", usage.CompletionTokens)log.Printf("Total tokens: %d", usage.TotalTokens)
// Cost breakdowncost := result.Costlog.Printf("Input cost: $%.6f", cost.InputCost)log.Printf("Output cost: $%.6f", cost.OutputCost)log.Printf("Total cost: $%.6f", cost.TotalCost)Per-Session Tracking
Section titled “Per-Session Tracking”type SessionCosts struct { mu sync.Mutex costs map[string]float64 tokens map[string]int requestCounts map[string]int}
func (sc *SessionCosts) Record(sessionID string, result *pipeline.PipelineResult) { sc.mu.Lock() defer sc.mu.Unlock()
sc.costs[sessionID] += result.Cost.TotalCost sc.tokens[sessionID] += result.Response.Usage.TotalTokens sc.requestCounts[sessionID]++}
func (sc *SessionCosts) GetStats(sessionID string) (float64, int, int) { sc.mu.Lock() defer sc.mu.Unlock()
return sc.costs[sessionID], sc.tokens[sessionID], sc.requestCounts[sessionID]}
// Usagetracker := &SessionCosts{ costs: make(map[string]float64), tokens: make(map[string]int), requestCounts: make(map[string]int),}
result, _ := pipe.ExecuteWithContext(ctx, sessionID, "user", "Hello")tracker.Record(sessionID, result)
cost, tokens, count := tracker.GetStats(sessionID)log.Printf("Session %s: $%.6f, %d tokens, %d requests", sessionID, cost, tokens, count)Cumulative Cost Tracking
Section titled “Cumulative Cost Tracking”import "sync/atomic"
type CostTracker struct { totalCost int64 // Store as cents (multiply by 100) totalTokens int64 totalReqs int64}
func (ct *CostTracker) Record(result *pipeline.PipelineResult) { costCents := int64(result.Cost.TotalCost * 100) atomic.AddInt64(&ct.totalCost, costCents) atomic.AddInt64(&ct.totalTokens, int64(result.Response.Usage.TotalTokens)) atomic.AddInt64(&ct.totalReqs, 1)}
func (ct *CostTracker) Report() { cost := float64(atomic.LoadInt64(&ct.totalCost)) / 100 tokens := atomic.LoadInt64(&ct.totalTokens) reqs := atomic.LoadInt64(&ct.totalReqs)
log.Printf("Total cost: $%.2f", cost) log.Printf("Total tokens: %d", tokens) log.Printf("Total requests: %d", reqs) log.Printf("Avg cost/request: $%.4f", cost/float64(reqs)) log.Printf("Avg tokens/request: %.0f", float64(tokens)/float64(reqs))}Budget Management
Section titled “Budget Management”Budget Limits
// BudgetManager enforces a single global spending cap.
// Safe for concurrent use.
type BudgetManager struct {
	limit       float64
	currentCost float64
	mu          sync.Mutex
}

// CanExecute reports whether spend is still strictly below the cap.
func (bm *BudgetManager) CanExecute() bool {
	bm.mu.Lock()
	defer bm.mu.Unlock()
	return bm.currentCost < bm.limit
}

// Record adds cost to the running total; it returns an error once the
// total reaches or passes the limit.
func (bm *BudgetManager) Record(cost float64) error {
	bm.mu.Lock()
	defer bm.mu.Unlock()

	bm.currentCost += cost
	if bm.currentCost < bm.limit {
		return nil
	}
	return fmt.Errorf("budget exceeded: $%.2f / $%.2f", bm.currentCost, bm.limit)
}
// Usagebudget := &BudgetManager{limit: 10.0} // $10 limit
if !budget.CanExecute() { log.Fatal("Budget exceeded")}
result, _ := pipe.Execute(ctx, "user", "Your message")if err := budget.Record(result.Cost.TotalCost); err != nil { log.Printf("Warning: %v", err)}Per-User Budgets
// UserBudgets enforces optional per-user spending limits.
// Users with no configured limit are never blocked.
// Safe for concurrent use.
type UserBudgets struct {
	limits map[string]float64
	costs  map[string]float64
	mu     sync.RWMutex
}

// SetLimit assigns a spending cap for the given user.
func (ub *UserBudgets) SetLimit(userID string, limit float64) {
	ub.mu.Lock()
	ub.limits[userID] = limit
	ub.mu.Unlock()
}

// CanExecute reports whether the user is still under their cap.
func (ub *UserBudgets) CanExecute(userID string) bool {
	ub.mu.RLock()
	defer ub.mu.RUnlock()

	limit, hasLimit := ub.limits[userID]
	if !hasLimit {
		return true // no cap configured for this user
	}
	return ub.costs[userID] < limit
}

// Record adds cost to the user's total and returns an error once the
// total reaches or passes the configured limit.
func (ub *UserBudgets) Record(userID string, cost float64) error {
	ub.mu.Lock()
	defer ub.mu.Unlock()

	ub.costs[userID] += cost

	limit, hasLimit := ub.limits[userID]
	if hasLimit && ub.costs[userID] >= limit {
		return fmt.Errorf("user %s exceeded budget: $%.2f / $%.2f", userID, ub.costs[userID], limit)
	}
	return nil
}
Section titled “Cost Optimization”Model Selection by Cost
// selectCostEffectiveModel maps a task-complexity label to the cheapest
// (model, provider) pair that handles it well. Unrecognized labels fall
// back to the cheapest option.
func selectCostEffectiveModel(complexity string) (string, string) {
	switch complexity {
	case "medium":
		// Balanced price/quality: Claude Haiku.
		return "claude-3-5-haiku-20241022", "claude"
	case "complex":
		// Highest quality tier: GPT-4o.
		return "gpt-4o", "openai"
	default:
		// "simple" and anything unknown: cheapest (GPT-4o-mini).
		return "gpt-4o-mini", "openai"
	}
}
// Usagemodel, providerType := selectCostEffectiveModel("simple")provider := createProvider(providerType, model)Token Limit Optimization
Section titled “Token Limit Optimization”func estimateInputTokens(messages []types.Message) int { // Rough estimate: 4 chars per token total := 0 for _, msg := range messages { total += len(msg.Content) / 4 } return total}
func optimizeMaxTokens(messages []types.Message, budget float64, pricePerToken float64) int { inputTokens := estimateInputTokens(messages) inputCost := float64(inputTokens) * pricePerToken
// Remaining budget for output remainingBudget := budget - inputCost if remainingBudget <= 0 { return 100 // Minimum }
// Calculate max output tokens maxTokens := int(remainingBudget / (pricePerToken * 3)) // Output costs ~3x more
// Cap at reasonable limits if maxTokens > 2000 { maxTokens = 2000 } if maxTokens < 100 { maxTokens = 100 }
return maxTokens}Caching Responses
Section titled “Caching Responses”import "crypto/sha256"
type ResponseCache struct { cache map[string]*pipeline.PipelineResult mu sync.RWMutex}
func (rc *ResponseCache) Key(prompt string) string { hash := sha256.Sum256([]byte(prompt)) return fmt.Sprintf("%x", hash[:8])}
func (rc *ResponseCache) Get(prompt string) (*pipeline.PipelineResult, bool) { rc.mu.RLock() defer rc.mu.RUnlock()
result, exists := rc.cache[rc.Key(prompt)] return result, exists}
func (rc *ResponseCache) Set(prompt string, result *pipeline.PipelineResult) { rc.mu.Lock() defer rc.mu.Unlock()
rc.cache[rc.Key(prompt)] = result}
// Usagecache := &ResponseCache{cache: make(map[string]*pipeline.PipelineResult)}
prompt := "What is AI?"if cached, exists := cache.Get(prompt); exists { log.Println("Using cached response (no cost)") return cached, nil}
result, _ := pipe.Execute(ctx, "user", prompt)cache.Set(prompt, result)log.Printf("Fresh response cost: $%.6f", result.Cost.TotalCost)Reporting
Section titled “Reporting”Daily Cost Summary
// DailyCosts buckets accumulated spend by calendar date (YYYY-MM-DD).
// Safe for concurrent use.
type DailyCosts struct {
	costs map[string]float64 // date -> cost
	mu    sync.Mutex
}

// Record adds cost to today's bucket.
func (dc *DailyCosts) Record(cost float64) {
	dc.mu.Lock()
	defer dc.mu.Unlock()

	today := time.Now().Format("2006-01-02")
	dc.costs[today] += cost
}

// Report logs per-day spend in chronological order, then a grand total.
func (dc *DailyCosts) Report() {
	dc.mu.Lock()
	defer dc.mu.Unlock()

	dates := make([]string, 0, len(dc.costs))
	for d := range dc.costs {
		dates = append(dates, d)
	}
	sort.Strings(dates)

	log.Println("Daily Costs:")
	var total float64
	for _, d := range dates {
		total += dc.costs[d]
		log.Printf("  %s: $%.2f", d, dc.costs[d])
	}
	log.Printf("Total: $%.2f", total)
}
// CostAlerts fires a rate-limited alert once cumulative cost passes a
// threshold. Safe for concurrent use.
type CostAlerts struct {
	threshold float64
	current   float64
	lastAlert time.Time
	mu        sync.Mutex
}

// Record accumulates cost and alerts at most once per hour while the
// running total stays at or above the threshold.
func (ca *CostAlerts) Record(cost float64) {
	ca.mu.Lock()
	defer ca.mu.Unlock()

	ca.current += cost

	overThreshold := ca.current >= ca.threshold
	cooledDown := time.Since(ca.lastAlert) >= time.Hour
	if overThreshold && cooledDown {
		ca.sendAlert()
		ca.lastAlert = time.Now()
	}
}

// sendAlert reports the breach; wire this up to email, Slack, etc.
func (ca *CostAlerts) sendAlert() {
	log.Printf("ALERT: Cost threshold exceeded: $%.2f / $%.2f", ca.current, ca.threshold)
}
Section titled “Complete Example”package main
import ( "context" "log" "sync"
"github.com/AltairaLabs/PromptKit/runtime/pipeline" "github.com/AltairaLabs/PromptKit/runtime/pipeline/middleware" "github.com/AltairaLabs/PromptKit/runtime/providers/openai")
type CostMonitor struct { totalCost float64 totalTokens int requests int mu sync.Mutex}
func (cm *CostMonitor) Record(result *pipeline.PipelineResult) { cm.mu.Lock() defer cm.mu.Unlock()
cm.totalCost += result.Cost.TotalCost cm.totalTokens += result.Response.Usage.TotalTokens cm.requests++
log.Printf("Request cost: $%.6f (%d tokens)", result.Cost.TotalCost, result.Response.Usage.TotalTokens)}
func (cm *CostMonitor) Report() { cm.mu.Lock() defer cm.mu.Unlock()
log.Printf("\n=== Cost Summary ===") log.Printf("Total requests: %d", cm.requests) log.Printf("Total tokens: %d", cm.totalTokens) log.Printf("Total cost: $%.4f", cm.totalCost) log.Printf("Avg cost/request: $%.6f", cm.totalCost/float64(cm.requests)) log.Printf("Avg tokens/request: %.1f", float64(cm.totalTokens)/float64(cm.requests))}
func main() { monitor := &CostMonitor{}
// Create provider provider := openai.NewOpenAIProvider( "openai", "gpt-4o-mini", // Cost-effective model "", openai.DefaultProviderDefaults(), false, ) defer provider.Close()
// Build pipeline pipe := pipeline.NewPipeline( middleware.ProviderMiddleware(provider, nil, nil, &middleware.ProviderMiddlewareConfig{ MaxTokens: 500, // Limit output tokens Temperature: 0.7, }), ) defer pipe.Shutdown(context.Background())
ctx := context.Background()
// Execute requests prompts := []string{ "What is AI?", "Explain machine learning", "What is deep learning?", }
for _, prompt := range prompts { result, err := pipe.Execute(ctx, "user", prompt) if err != nil { log.Printf("Error: %v", err) continue }
monitor.Record(result) }
// Print summary monitor.Report()}Troubleshooting
Section titled “Troubleshooting”Issue: Higher Costs Than Expected
Section titled “Issue: Higher Costs Than Expected”Problem: Costs are exceeding your budget.
Solutions:
-
Check model pricing:
// GPT-4o is expensive, use gpt-4o-miniprovider := openai.NewOpenAIProvider("openai", "gpt-4o-mini", ...) -
Limit max tokens:
config := &middleware.ProviderMiddlewareConfig{MaxTokens: 500, // Reduce from default} -
Trim conversation history:
if len(messages) > 10 {messages = messages[len(messages)-10:]}
Issue: Cost Tracking Inaccurate
Section titled “Issue: Cost Tracking Inaccurate”Problem: Reported costs don’t match bills.
Solutions:
-
Verify pricing is current:
// Check provider's pricing page// Update if prices changed -
Include all cost components:
total := result.Cost.InputCost + result.Cost.OutputCost// Some providers may have additional fees -
Check for tool costs:
// Tool calls may incur additional token costs
Best Practices
Section titled “Best Practices”-
Always monitor costs:
log.Printf("Cost: $%.6f", result.Cost.TotalCost) -
Set budget limits:
budget := &BudgetManager{limit: 10.0} -
Use cost-effective models:
// Simple tasks: gpt-4o-mini// Complex tasks: gpt-4o only when needed -
Limit max tokens:
config.MaxTokens = 500 // Reasonable default -
Cache repeated requests:
if cached, exists := cache.Get(prompt); exists {return cached, nil} -
Track per-user costs:
budgets.Record(userID, result.Cost.TotalCost) -
Set up alerts:
if totalCost > threshold {sendAlert()} -
Generate regular reports:
go func() {ticker := time.NewTicker(24 * time.Hour)for range ticker.C {monitor.Report()}}()
Next Steps
Section titled “Next Steps”- Setup Providers - Model selection
- Configure Pipeline - Token limits
- Handle Errors - Cost-aware retries
See Also
Section titled “See Also”- Providers Reference - Pricing tables
- Pipeline Reference - Cost structures