Runtime Event System
The PromptKit runtime event system provides comprehensive observability into pipeline execution through a unified pub/sub architecture. This enables real-time monitoring, debugging, and integration with external observability platforms.
Overview
Section titled “Overview”The event system emits detailed lifecycle events for every stage of pipeline execution, from initial request through final response. Unlike content streaming (which forwards LLM response chunks), events provide metadata about execution progress, performance metrics, and errors.
graph TB subgraph "Event Flow" Pipeline["Pipeline Execution"] Middleware["Middleware Chain"] Provider["LLM Provider"]
Pipeline --> Bus["Event Bus"] Middleware --> Bus Provider --> Bus
Bus --> TUI["Arena TUI"] Bus --> SDK["SDK Listeners"] Bus --> Custom["Custom Handlers"] end
style Bus fill:#f9f,stroke:#333,stroke-width:3pxCore Components
Section titled “Core Components”Event Bus
Section titled “Event Bus”The EventBus is a thread-safe pub/sub system that distributes events to registered listeners:
// Create an event busbus := events.NewEventBus()
// Subscribe to specific event typesbus.Subscribe(events.EventPipelineStarted, func(e events.Event) { log.Printf("Pipeline started: %s", e.ConversationID)})
// Subscribe to all eventsbus.SubscribeAll(func(e events.Event) { metrics.RecordEvent(e.Type)})Key Features:
- Asynchronous Delivery: Events are delivered in goroutines to avoid blocking pipeline execution
- Type-Safe Subscriptions: Subscribe to specific event types or all events
- Panic Recovery: Listener panics are caught to prevent cascading failures
- Thread-Safe: Safe for concurrent use across multiple goroutines
Event Emitter
Section titled “Event Emitter”The Emitter provides convenient methods for emitting events with consistent metadata:
// Create an emitter with context identifiersemitter := events.NewEmitter(bus, runID, sessionID, conversationID)
// Emit typed eventsemitter.EmitPipelineStarted(middlewareCount)emitter.EmitProviderCallCompleted(provider, model, duration, tokens, cost)emitter.EmitMiddlewareFailed(name, index, err, duration)The emitter automatically populates:
Timestamp: When the event occurredRunID,SessionID,ConversationID: Context identifiersType: Specific event typeData: Type-specific payloadMetadata: Additional context (extensible)
Event Types
Section titled “Event Types”Events are organized into several categories:
Pipeline Lifecycle
Section titled “Pipeline Lifecycle”pipeline.started- Pipeline execution beginspipeline.completed- Pipeline execution succeedspipeline.failed- Pipeline execution fails
Middleware Execution
Section titled “Middleware Execution”middleware.started- Middleware begins processingmiddleware.completed- Middleware finishes successfullymiddleware.failed- Middleware encounters an error
Provider Operations
Section titled “Provider Operations”provider.call.started- LLM API call beginsprovider.call.completed- LLM API call succeeds (includes tokens, cost, duration)provider.call.failed- LLM API call fails
Tool Execution
Section titled “Tool Execution”tool.call.started- Tool execution beginstool.call.completed- Tool execution succeedstool.call.failed- Tool execution fails
Validation
Section titled “Validation”validation.started- Validation beginsvalidation.passed- Validation succeedsvalidation.failed- Validation fails (includes violations)
Context & State
Section titled “Context & State”context.built- Message context assembled (includes token counts)context.token_budget_exceeded- Token budget exceededstate.loaded- Conversation state loadedstate.saved- Conversation state saved
Streaming
Section titled “Streaming”stream.interrupted- Stream was interrupted (includes reason)
Conversation Lifecycle
Section titled “Conversation Lifecycle”conversation.started- New conversation started (includes assembled system prompt)
Message Events
Section titled “Message Events”message.created- New message added to conversation (includes role, content, index, tool calls/results)message.updated- Message metadata updated (includes latency, token counts, cost after completion)
Custom Events
Section titled “Custom Events”Middleware can emit custom events for domain-specific observability:
emitter.EmitCustom("middleware.cache.hit", events.CustomEventData{ MiddlewareName: "cache", EventName: "cache_hit", Data: map[string]interface{}{ "cache_key": key, "response_size": size, }, Message: "Response retrieved from cache",})Integration Points
Section titled “Integration Points”Pipeline Integration
Section titled “Pipeline Integration”The pipeline automatically creates an emitter and passes it through the execution context:
// Pipeline creates emitteremitter := events.NewEmitter(p.eventBus, runID, sessionID, conversationID)ctx := &pipeline.ExecutionContext{ EventEmitter: emitter, // ... other fields}
// Emits lifecycle eventsemitter.EmitPipelineStarted(len(p.middleware))// ... execute middlewareemitter.EmitPipelineCompleted(duration, cost, tokens, messageCount)Middleware Integration
Section titled “Middleware Integration”All built-in middleware emit lifecycle events:
func (m *ContextBuilderMiddleware) Execute(ctx *ExecutionContext, next NextFunc) error { if ctx.EventEmitter != nil { ctx.EventEmitter.EmitMiddlewareStarted("context_builder", m.index) }
start := time.Now() err := m.buildContext(ctx) duration := time.Since(start)
if err != nil { if ctx.EventEmitter != nil { ctx.EventEmitter.EmitMiddlewareFailed("context_builder", m.index, err, duration) } return err }
if ctx.EventEmitter != nil { ctx.EventEmitter.EmitMiddlewareCompleted("context_builder", m.index, duration) }
return next(ctx)}SDK Integration
Section titled “SDK Integration”The SDK exposes event listeners on conversations:
// Create manager with event busmanager, _ := sdk.NewConversationManager( sdk.WithProvider(provider), sdk.WithEventBus(eventBus),)
// Add listener to conversationconversation.AddEventListener(func(e *events.Event) { fmt.Printf("[%s] %s\n", e.Type, e.Timestamp)})Arena TUI Integration
Section titled “Arena TUI Integration”The Arena TUI uses an event adapter to convert events to bubbletea messages:
// Subscribe to events and convert to TUI messagesadapter := tui.NewEventAdapter(bus, program)adapter.Start()Use Cases
Section titled “Use Cases”Production Monitoring
Section titled “Production Monitoring”Integrate with observability platforms:
bus.SubscribeAll(func(e events.Event) { // Send to Datadog, New Relic, etc. datadog.SendEvent(map[string]interface{}{ "type": string(e.Type), "timestamp": e.Timestamp, "conversation_id": e.ConversationID, "data": e.Data, })})Cost Tracking
Section titled “Cost Tracking”Monitor LLM costs in real-time:
var totalCost float64var mu sync.Mutex
bus.Subscribe(events.EventProviderCallCompleted, func(e events.Event) { data := e.Data.(events.ProviderCallCompletedData) mu.Lock() totalCost += data.Cost mu.Unlock()
log.Printf("Call cost: $%.4f | Total: $%.4f", data.Cost, totalCost)})Performance Profiling
Section titled “Performance Profiling”Track middleware execution times:
type MiddlewareStats struct { Name string TotalDuration time.Duration CallCount int}
stats := make(map[string]*MiddlewareStats)
bus.Subscribe(events.EventMiddlewareCompleted, func(e events.Event) { data := e.Data.(events.MiddlewareCompletedData)
if _, ok := stats[data.Name]; !ok { stats[data.Name] = &MiddlewareStats{Name: data.Name} }
stats[data.Name].TotalDuration += data.Duration stats[data.Name].CallCount++})Debug Tracing
Section titled “Debug Tracing”Capture execution traces for debugging:
var trace []events.Event
bus.SubscribeAll(func(e events.Event) { trace = append(trace, e)})
// On error, dump traceif err != nil { log.Printf("Execution trace:") for _, e := range trace { log.Printf(" [%s] %s: %+v", e.Timestamp, e.Type, e.Data) }}Design Principles
Section titled “Design Principles”Asynchronous by Default
Section titled “Asynchronous by Default”Events are delivered asynchronously to avoid impacting pipeline performance:
func (eb *EventBus) Publish(event Event) { // Copy listeners while holding lock eb.mu.RLock() listeners := make([]Listener, len(eb.listeners[event.Type])) copy(listeners, eb.listeners[event.Type]) eb.mu.RUnlock()
// Execute asynchronously go func() { for _, listener := range listeners { safeInvoke(listener, event) } }()}Lightweight Payloads
Section titled “Lightweight Payloads”Events contain metadata and metrics, not full message payloads, to minimize memory overhead.
Fail-Safe
Section titled “Fail-Safe”Listener panics are caught to prevent cascading failures:
func safeInvoke(listener Listener, event Event) { defer func() { if r := recover(); r != nil { log.Printf("Event listener panic: %v", r) } }() listener(event)}Opt-In
Section titled “Opt-In”The event system is optional - if no EventEmitter is provided, no events are emitted (zero overhead).
Performance Considerations
Section titled “Performance Considerations”- Memory: Events are ephemeral - not persisted by default
- Concurrency: Event bus uses read-write locks for optimal performance
- Overhead: Minimal when no listeners registered (~0ns per event)
- Throughput: Tested at >100k events/sec on modern hardware
Future Enhancements
Section titled “Future Enhancements”Potential future additions include:
- Event Filtering: Predicate-based filtering at bus level
- Event Replay: Capture and replay for debugging
- Event Persistence: Optional logging to structured storage
- Event Aggregation: Built-in metrics aggregation
- Remote Streaming: OpenTelemetry, StatsD integration
- Event-driven Automation: Circuit breakers, rate limiting
References
Section titled “References”- Implementation:
runtime/events/package - Pipeline Integration:
runtime/pipeline/pipeline.go - Middleware Examples:
runtime/pipeline/middleware/ - SDK Integration:
sdk/conversation.go - Example Program:
sdk/examples/events/