Manage State
Store conversation state so that it persists across pipeline executions and can be retrieved to continue a session.
Quick Start
Step 1: Create State Store

    import "github.com/AltairaLabs/PromptKit/runtime/statestore"

    // In-memory store (dev/testing)
    store := statestore.NewInMemoryStateStore()

    // Redis store (production)
    store, err := statestore.NewRedisStateStore("localhost:6379", "", 0)
    if err != nil {
        log.Fatal(err)
    }
    defer store.Close()

Step 2: Add State Middleware

    import "github.com/AltairaLabs/PromptKit/runtime/pipeline/middleware"

    pipe := pipeline.NewPipeline(
        middleware.StateMiddleware(store),
        middleware.ProviderMiddleware(provider, toolRegistry, policy, config),
    )

Step 3: Use Session IDs

    ctx := context.Background()
    sessionID := "user-123-conversation"

    // First message - state auto-saved
    result, err := pipe.ExecuteWithContext(ctx, sessionID, "user", "Hi, I'm Alice")

    // Subsequent messages - state auto-loaded
    result2, err := pipe.ExecuteWithContext(ctx, sessionID, "user", "What's my name?")
    // Response: "Your name is Alice"

In-Memory State Store
Basic Setup

    store := statestore.NewInMemoryStateStore()

Use Cases:
- Development
- Testing
- Single-instance applications
- Non-persistent sessions
Save and Load

    // Save state manually
    sessionID := "session-1"
    messages := []types.Message{
        {Role: "user", Content: "Hello"},
        {Role: "assistant", Content: "Hi there!"},
    }

    err := store.Save(ctx, sessionID, messages)
    if err != nil {
        log.Fatal(err)
    }

    // Load state
    loaded, err := store.Load(ctx, sessionID)
    if err != nil {
        log.Fatal(err)
    }

    for _, msg := range loaded {
        log.Printf("%s: %s\n", msg.Role, msg.Content)
    }

Delete State

    err := store.Delete(ctx, sessionID)
    if err != nil {
        log.Fatal(err)
    }

Redis State Store
Basic Setup

    store, err := statestore.NewRedisStateStore(
        "localhost:6379", // address
        "",               // password (empty for no auth)
        0,                // database
    )
    if err != nil {
        log.Fatal(err)
    }
    defer store.Close()

With Authentication

    store, err := statestore.NewRedisStateStore(
        os.Getenv("REDIS_HOST"),
        os.Getenv("REDIS_PASSWORD"),
        0,
    )

Connection Pooling

    // Redis client auto-manages connection pool
    // Default settings work for most cases
    // For high load, tune Redis server config

TTL and Expiration

    // Redis expires a key only when a TTL has been set on that key;
    // there is no server-wide default TTL.
    // maxmemory-policy controls eviction, not expiry. To evict the
    // least-recently-used keys that have a TTL once maxmemory is reached:
    //   config set maxmemory-policy volatile-lru

    // Or use EXPIRE in custom implementation:
    redisClient.Expire(ctx, key, 24*time.Hour)

Complete Examples
Multi-User Chat

    package main

    import (
        "context"
        "log"

        "github.com/AltairaLabs/PromptKit/runtime/pipeline"
        "github.com/AltairaLabs/PromptKit/runtime/pipeline/middleware"
        "github.com/AltairaLabs/PromptKit/runtime/providers/openai"
        "github.com/AltairaLabs/PromptKit/runtime/statestore"
    )

    func main() {
        // Create Redis store
        store, err := statestore.NewRedisStateStore("localhost:6379", "", 0)
        if err != nil {
            log.Fatal(err)
        }
        defer store.Close()

        // Create provider
        provider := openai.NewOpenAIProvider(
            "openai",
            "gpt-4o-mini",
            "",
            openai.DefaultProviderDefaults(),
            false,
        )
        defer provider.Close()

        // Build pipeline with state
        pipe := pipeline.NewPipeline(
            middleware.StateMiddleware(store),
            middleware.ProviderMiddleware(provider, nil, nil, &middleware.ProviderMiddlewareConfig{
                MaxTokens:   1000,
                Temperature: 0.7,
            }),
        )
        defer pipe.Shutdown(context.Background())

        ctx := context.Background()

        // User 1 conversation
        userID1 := "user-alice"
        if _, err := pipe.ExecuteWithContext(ctx, userID1, "user", "My favorite color is blue"); err != nil {
            log.Printf("User 1 failed: %v", err)
            return
        }
        result1, err := pipe.ExecuteWithContext(ctx, userID1, "user", "What's my favorite color?")
        if err != nil {
            log.Printf("User 1 failed: %v", err)
            return
        }
        log.Printf("User 1: %s\n", result1.Response.Content)

        // User 2 conversation (separate state, keyed by a different session ID)
        userID2 := "user-bob"
        if _, err := pipe.ExecuteWithContext(ctx, userID2, "user", "I love pizza"); err != nil {
            log.Printf("User 2 failed: %v", err)
            return
        }
        result2, err := pipe.ExecuteWithContext(ctx, userID2, "user", "What food do I love?")
        if err != nil {
            log.Printf("User 2 failed: %v", err)
            return
        }
        log.Printf("User 2: %s\n", result2.Response.Content)
    }

Session Cleanup

    package main

    import (
        "context"
        "log"
        "time"

        "github.com/AltairaLabs/PromptKit/runtime/statestore"
    )

    func cleanupOldSessions(store statestore.StateStore, maxAge time.Duration) {
        // For in-memory store
        if _, ok := store.(*statestore.InMemoryStateStore); ok {
            // Custom cleanup logic
            // InMemoryStateStore doesn't expose session listing
            // Implement custom session tracking if needed
        }

        // For Redis store
        if redisStore, ok := store.(*statestore.RedisStateStore); ok {
            // Keys that have a TTL expire automatically
            // Or manually delete specific sessions:
            ctx := context.Background()
            sessionIDs := []string{"old-session-1", "old-session-2"}

            for _, id := range sessionIDs {
                if err := redisStore.Delete(ctx, id); err != nil {
                    log.Printf("Failed to delete session %s: %v", id, err)
                }
            }
        }
    }

State Management Patterns
Session ID Strategy

    // User-based sessions
    sessionID := fmt.Sprintf("user-%s", userID)

    // Conversation-based sessions
    sessionID := fmt.Sprintf("conv-%s", conversationID)

    // Time-based sessions
    sessionID := fmt.Sprintf("user-%s-%s", userID, time.Now().Format("2006-01-02"))

    // Feature-based sessions
    sessionID := fmt.Sprintf("support-%s", ticketID)

Context Window Management

    // Load existing state
    messages, err := store.Load(ctx, sessionID)
    if err != nil {
        log.Fatal(err)
    }

    // Keep only recent messages (sliding window)
    maxMessages := 10
    if len(messages) > maxMessages {
        messages = messages[len(messages)-maxMessages:]
    }

    // Save trimmed state
    err = store.Save(ctx, sessionID, messages)

Token Limit Management
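
Token-based trimming keeps the newest messages whose combined size fits a budget, rather than a fixed message count. The sketch below illustrates the idea with a hypothetical `estimateTokens` heuristic (roughly four characters per token, which is only an approximation and not a real tokenizer) and a hypothetical `truncateToBudget` helper. It is not a description of how `prompt.TruncateMessages` works.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for a conversation message.
type Message struct {
	Role    string
	Content string
}

// estimateTokens is a rough heuristic: about 4 characters per token, rounded up.
func estimateTokens(m Message) int {
	return (len(m.Content) + 3) / 4
}

// truncateToBudget walks backwards from the newest message and keeps the
// longest recent run whose estimated total stays within maxTokens.
func truncateToBudget(msgs []Message, maxTokens int) []Message {
	total := 0
	start := len(msgs)
	for i := len(msgs) - 1; i >= 0; i-- {
		cost := estimateTokens(msgs[i])
		if total+cost > maxTokens {
			break
		}
		total += cost
		start = i
	}
	return append([]Message(nil), msgs[start:]...)
}

func main() {
	msgs := []Message{
		{Role: "user", Content: "aaaaaaaa"},     // 8 chars, estimated 2 tokens
		{Role: "assistant", Content: "bbbb"},    // 4 chars, estimated 1 token
		{Role: "user", Content: "cccccccccccc"}, // 12 chars, estimated 3 tokens
	}
	kept := truncateToBudget(msgs, 4)
	fmt.Println(len(kept), kept[0].Content) // prints "2 bbbb"
}
```
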

    import "github.com/AltairaLabs/PromptKit/runtime/prompt"

    // Load state
    messages, _ := store.Load(ctx, sessionID)

    // Truncate by tokens
    maxTokens := 4000
    truncated := prompt.TruncateMessages(messages, maxTokens)

    // Save truncated state
    store.Save(ctx, sessionID, truncated)

State Sharing

    // Copy state between sessions
    sourceSession := "user-123-original"
    targetSession := "user-123-new-topic"

    messages, err := store.Load(ctx, sourceSession)
    if err != nil {
        log.Fatal(err)
    }

    // Save to new session
    err = store.Save(ctx, targetSession, messages)

Error Handling
Connection Errors

    messages, err := store.Load(ctx, sessionID)
    if err != nil {
        log.Printf("Failed to load state: %v", err)
        // Start with empty state
        messages = []types.Message{}
    }

Save Failures

    err := store.Save(ctx, sessionID, messages)
    if err != nil {
        log.Printf("Warning: Failed to save state: %v", err)
        // Continue execution, state may be lost
    }

Retry Logic

    func saveWithRetry(store statestore.StateStore, sessionID string, messages []types.Message, maxRetries int) error {
        ctx := context.Background()

        var lastErr error
        for i := 0; i < maxRetries; i++ {
            lastErr = store.Save(ctx, sessionID, messages)
            if lastErr == nil {
                return nil
            }

            log.Printf("Save failed (attempt %d/%d): %v", i+1, maxRetries, lastErr)
            if i < maxRetries-1 {
                // Linear backoff between attempts: 1s, 2s, 3s, ...
                time.Sleep(time.Second * time.Duration(i+1))
            }
        }

        // Wrap the last error so callers can inspect the cause
        return fmt.Errorf("failed after %d attempts: %w", maxRetries, lastErr)
    }

Troubleshooting
Issue: State Not Persisting

Problem: Messages are not saved between requests.

Solutions:

- Verify state middleware is registered:

      pipe := pipeline.NewPipeline(
          middleware.StateMiddleware(store), // Must be first
          middleware.ProviderMiddleware(provider, nil, nil, config),
      )

- Check session ID consistency:

      // Use same session ID for both calls
      sessionID := "user-123"
      pipe.ExecuteWithContext(ctx, sessionID, "user", "First message")
      pipe.ExecuteWithContext(ctx, sessionID, "user", "Second message")

- Verify store connection:

      // Test save/load
      testMessages := []types.Message{{Role: "user", Content: "test"}}
      store.Save(ctx, "test-session", testMessages)
      loaded, err := store.Load(ctx, "test-session")
      if err != nil || len(loaded) == 0 {
          log.Fatal("Store not working")
      }
Issue: Redis Connection Failed
Problem: Cannot connect to Redis.

Solutions:

- Check Redis is running:

      redis-cli ping
      # Should return: PONG

- Verify connection details:

      store, err := statestore.NewRedisStateStore("localhost:6379", "", 0)
      if err != nil {
          log.Printf("Connection failed: %v", err)
          // Check host, port, password
      }

- Test network connectivity:

      telnet localhost 6379
Issue: Memory Growth
Problem: The in-memory store consumes too much memory.

Solutions:

- Switch to Redis for production
- Implement periodic cleanup
- Limit messages per session:

      messages, _ := store.Load(ctx, sessionID)
      if len(messages) > 50 {
          messages = messages[len(messages)-50:]
          store.Save(ctx, sessionID, messages)
      }
Best Practices
- Use Redis for production:

      // Production
      store, err := statestore.NewRedisStateStore(os.Getenv("REDIS_HOST"), os.Getenv("REDIS_PASSWORD"), 0)
      if err != nil {
          log.Fatal(err)
      }

      // Development
      store := statestore.NewInMemoryStateStore()

- Always close store:

      defer store.Close()

- Use descriptive session IDs:

      // Good
      sessionID := fmt.Sprintf("user-%s-support", userID)

      // Bad
      sessionID := "session1"

- Implement session cleanup:

      // Let Redis evict least-recently-used keys that have a TTL (run in redis-cli):
      //   config set maxmemory-policy volatile-lru

      // Or periodic cleanup
      go cleanupOldSessions(store, 24*time.Hour)

- Handle state errors gracefully:

      messages, err := store.Load(ctx, sessionID)
      if err != nil {
          log.Printf("State load failed: %v", err)
          messages = []types.Message{} // Start fresh
      }

- Manage context window:

      // Trim old messages
      maxMessages := 20
      if len(messages) > maxMessages {
          messages = messages[len(messages)-maxMessages:]
      }
Next Steps
- Handle Errors - Error management
- Monitor Costs - Track usage
- Configure Pipeline - Complete setup
See Also
- StateStore Reference - Complete API
- Pipeline Reference - Middleware order