The PromptKit runtime event system provides comprehensive observability into pipeline execution through a unified pub/sub architecture. This enables real-time monitoring, debugging, and integration with external observability platforms.

Overview

The event system emits detailed lifecycle events for every stage of pipeline execution, from initial request through final response. Unlike content streaming (which forwards LLM response chunks), events provide metadata about execution progress, performance metrics, and errors.

graph TB
    subgraph "Event Flow"
        Pipeline["Pipeline Execution"]
        Middleware["Middleware Chain"]
        Provider["LLM Provider"]
        
        Pipeline --> Bus["Event Bus"]
        Middleware --> Bus
        Provider --> Bus
        
        Bus --> TUI["Arena TUI"]
        Bus --> SDK["SDK Listeners"]
        Bus --> Custom["Custom Handlers"]
    end
    
    style Bus fill:#f9f,stroke:#333,stroke-width:3px

Core Components

Event Bus

The EventBus is a thread-safe pub/sub system that distributes events to registered listeners:

// Create an event bus
bus := events.NewEventBus()

// Subscribe to specific event types
bus.Subscribe(events.EventPipelineStarted, func(e events.Event) {
    log.Printf("Pipeline started: %s", e.ConversationID)
})

// Subscribe to all events
bus.SubscribeAll(func(e events.Event) {
    metrics.RecordEvent(e.Type)
})

Key Features:

Event Emitter

The Emitter provides convenient methods for emitting events with consistent metadata:

// Create an emitter with context identifiers
emitter := events.NewEmitter(bus, runID, sessionID, conversationID)

// Emit typed events
emitter.EmitPipelineStarted(middlewareCount)
emitter.EmitProviderCallCompleted(provider, model, duration, tokens, cost)
emitter.EmitMiddlewareFailed(name, index, err, duration)

The emitter automatically populates:

Event Types

Events are organized into several categories:

Pipeline Lifecycle

Middleware Execution

Provider Operations

Tool Execution

Validation

Context & State

Streaming

Conversation Lifecycle

Message Events

Custom Events

Middleware can emit custom events for domain-specific observability:

emitter.EmitCustom("middleware.cache.hit", events.CustomEventData{
    MiddlewareName: "cache",
    EventName: "cache_hit",
    Data: map[string]interface{}{
        "cache_key": key,
        "response_size": size,
    },
    Message: "Response retrieved from cache",
})

Integration Points

Pipeline Integration

The pipeline automatically creates an emitter and passes it through the execution context:

// Pipeline creates emitter
emitter := events.NewEmitter(p.eventBus, runID, sessionID, conversationID)
ctx := &pipeline.ExecutionContext{
    EventEmitter: emitter,
    // ... other fields
}

// Emits lifecycle events
emitter.EmitPipelineStarted(len(p.middleware))
// ... execute middleware
emitter.EmitPipelineCompleted(duration, cost, tokens, messageCount)

Middleware Integration

All built-in middleware emit lifecycle events:

func (m *ContextBuilderMiddleware) Execute(ctx *ExecutionContext, next NextFunc) error {
    if ctx.EventEmitter != nil {
        ctx.EventEmitter.EmitMiddlewareStarted("context_builder", m.index)
    }
    
    start := time.Now()
    err := m.buildContext(ctx)
    duration := time.Since(start)
    
    if err != nil {
        if ctx.EventEmitter != nil {
            ctx.EventEmitter.EmitMiddlewareFailed("context_builder", m.index, err, duration)
        }
        return err
    }
    
    if ctx.EventEmitter != nil {
        ctx.EventEmitter.EmitMiddlewareCompleted("context_builder", m.index, duration)
    }
    
    return next(ctx)
}

SDK Integration

The SDK exposes event listeners on conversations:

// Create manager with event bus
manager, _ := sdk.NewConversationManager(
    sdk.WithProvider(provider),
    sdk.WithEventBus(eventBus),
)

// Add listener to conversation
conversation.AddEventListener(func(e *events.Event) {
    fmt.Printf("[%s] %s\n", e.Type, e.Timestamp)
})

Arena TUI Integration

The Arena TUI uses an event adapter to convert events to bubbletea messages:

// Subscribe to events and convert to TUI messages
adapter := tui.NewEventAdapter(bus, program)
adapter.Start()

Use Cases

Production Monitoring

Integrate with observability platforms:

bus.SubscribeAll(func(e events.Event) {
    // Send to Datadog, New Relic, etc.
    datadog.SendEvent(map[string]interface{}{
        "type": string(e.Type),
        "timestamp": e.Timestamp,
        "conversation_id": e.ConversationID,
        "data": e.Data,
    })
})

Cost Tracking

Monitor LLM costs in real-time:

var totalCost float64
var mu sync.Mutex

bus.Subscribe(events.EventProviderCallCompleted, func(e events.Event) {
    data := e.Data.(events.ProviderCallCompletedData)
    mu.Lock()
    totalCost += data.Cost
    mu.Unlock()
    
    log.Printf("Call cost: $%.4f | Total: $%.4f", data.Cost, totalCost)
})

Performance Profiling

Track middleware execution times:

type MiddlewareStats struct {
    Name string
    TotalDuration time.Duration
    CallCount int
}

stats := make(map[string]*MiddlewareStats)

bus.Subscribe(events.EventMiddlewareCompleted, func(e events.Event) {
    data := e.Data.(events.MiddlewareCompletedData)
    
    if _, ok := stats[data.Name]; !ok {
        stats[data.Name] = &MiddlewareStats{Name: data.Name}
    }
    
    stats[data.Name].TotalDuration += data.Duration
    stats[data.Name].CallCount++
})

Debug Tracing

Capture execution traces for debugging:

var trace []events.Event

bus.SubscribeAll(func(e events.Event) {
    trace = append(trace, e)
})

// On error, dump trace
if err != nil {
    log.Printf("Execution trace:")
    for _, e := range trace {
        log.Printf("  [%s] %s: %+v", e.Timestamp, e.Type, e.Data)
    }
}

Design Principles

Asynchronous by Default

Events are delivered asynchronously to avoid impacting pipeline performance:

func (eb *EventBus) Publish(event Event) {
    // Copy listeners while holding lock
    eb.mu.RLock()
    listeners := make([]Listener, len(eb.listeners[event.Type]))
    copy(listeners, eb.listeners[event.Type])
    eb.mu.RUnlock()
    
    // Execute asynchronously
    go func() {
        for _, listener := range listeners {
            safeInvoke(listener, event)
        }
    }()
}

Lightweight Payloads

Events contain metadata and metrics, not full message payloads, to minimize memory overhead.

Fail-Safe

Listener panics are caught to prevent cascading failures:

func safeInvoke(listener Listener, event Event) {
    defer func() {
        if r := recover(); r != nil {
            log.Printf("Event listener panic: %v", r)
        }
    }()
    listener(event)
}

Opt-In

The event system is optional - if no EventEmitter is provided, no events are emitted (zero overhead).

Performance Considerations

Future Enhancements

Potential future additions include:

  1. Event Filtering: Predicate-based filtering at bus level
  2. Event Replay: Capture and replay for debugging
  3. Event Persistence: Optional logging to structured storage
  4. Event Aggregation: Built-in metrics aggregation
  5. Remote Streaming: OpenTelemetry, StatsD integration
  6. Event-driven Automation: Circuit breakers, rate limiting

References