Skip to content

Manage State

Persist conversation state across pipeline executions.

Store and retrieve conversation state for continuity.

import "github.com/AltairaLabs/PromptKit/runtime/statestore"
// In-memory store (dev/testing)
store := statestore.NewMemoryStore()
// Redis store (production)
store := statestore.NewRedisStore(redisClient)
ctx := context.Background()
sessionID := "user-123-conversation"
// First message - state auto-saved
result, err := pipe.ExecuteWithContext(ctx, sessionID, "user", "Hi, I'm Alice")
// Subsequent messages - state auto-loaded
result2, err := pipe.ExecuteWithContext(ctx, sessionID, "user", "What's my name?")
// Response: "Your name is Alice"
store := statestore.NewMemoryStore()

Use Cases:

  • Development
  • Testing
  • Single-instance applications
  • Non-persistent sessions
// Build a conversation state by hand and persist it
state := &statestore.ConversationState{
	ID: "session-1",
	Messages: []types.Message{
		{Role: "user", Content: "Hello"},
		{Role: "assistant", Content: "Hi there!"},
	},
}
if err := store.Save(ctx, state); err != nil {
	log.Fatal(err)
}
// Read the same session back by its ID
loaded, err := store.Load(ctx, "session-1")
if err != nil {
	log.Fatal(err)
}
// Print every stored message as "role: content"
for i := range loaded.Messages {
	msg := loaded.Messages[i]
	log.Printf("%s: %s\n", msg.Role, msg.Content)
}
// Fork creates a copy of the session state under a new session ID
err := store.Fork(ctx, sourceSessionID, newSessionID)
if err != nil {
log.Fatal(err)
}
import "github.com/redis/go-redis/v9"
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
store := statestore.NewRedisStore(redisClient)
redisClient := redis.NewClient(&redis.Options{
Addr: os.Getenv("REDIS_HOST"),
Password: os.Getenv("REDIS_PASSWORD"),
DB: 0,
})
store := statestore.NewRedisStore(redisClient,
statestore.WithTTL(24*time.Hour),
statestore.WithPrefix("myapp:"),
)
// Redis client auto-manages connection pool
// Default settings work for most cases
// For high load, tune Redis server config
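// Redis only expires keys that carry a TTL; there is no global TTL setting.
// Set a per-session TTL with statestore.WithTTL (shown above).
// To let Redis evict TTL-bearing keys when memory is full, run in redis-cli:
// config set maxmemory-policy volatile-lru
// Or use EXPIRE in custom implementation: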
redisClient.Expire(ctx, key, 24*time.Hour)
package main
import (
"context"
"log"
"github.com/redis/go-redis/v9"
"github.com/AltairaLabs/PromptKit/runtime/pipeline"
"github.com/AltairaLabs/PromptKit/runtime/providers/openai"
"github.com/AltairaLabs/PromptKit/runtime/statestore"
)
// main runs the multi-user example and exits non-zero on the first store error.
func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run saves two independent conversation states to Redis and loads each one
// back. It returns the first Save or Load error instead of discarding it, so
// a failed load can never be dereferenced. Returning (rather than calling
// log.Fatal here) lets the deferred provider.Close run on failure too.
func run() error {
	// Create Redis store
	redisClient := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})
	store := statestore.NewRedisStore(redisClient)

	// Create provider
	provider := openai.NewProvider(
		"openai",
		"gpt-4o-mini",
		"",
		providers.ProviderDefaults{Temperature: 0.7, MaxTokens: 2000},
		false,
	)
	defer provider.Close()

	ctx := context.Background()

	// User 1 conversation - save state
	state1 := &statestore.ConversationState{
		ID:       "user-alice",
		Messages: []types.Message{{Role: "user", Content: "My favorite color is blue"}},
	}
	if err := store.Save(ctx, state1); err != nil {
		return err
	}

	// User 2 conversation - separate state
	state2 := &statestore.ConversationState{
		ID:       "user-bob",
		Messages: []types.Message{{Role: "user", Content: "I love pizza"}},
	}
	if err := store.Save(ctx, state2); err != nil {
		return err
	}

	// Load each user's state independently
	loaded1, err := store.Load(ctx, "user-alice")
	if err != nil {
		return err
	}
	log.Printf("User 1 messages: %d\n", len(loaded1.Messages))

	loaded2, err := store.Load(ctx, "user-bob")
	if err != nil {
		return err
	}
	log.Printf("User 2 messages: %d\n", len(loaded2.Messages))
	return nil
}
package main
import (
"context"
"log"
"time"
"github.com/AltairaLabs/PromptKit/runtime/statestore"
)
// cleanupOldSessions trims a fixed list of old conversations down to their 20
// most recent messages and saves each trimmed conversation back to store.
// Load and save failures are logged; processing continues with the next ID.
func cleanupOldSessions(store statestore.Store) {
	// Redis TTL handles expiration automatically via WithTTL option
	// For manual cleanup, load and re-save trimmed state
	const keep = 20
	ctx := context.Background()
	for _, sessionID := range []string{"old-session-1", "old-session-2"} {
		conv, loadErr := store.Load(ctx, sessionID)
		if loadErr != nil {
			log.Printf("Failed to load conversation %s: %v", sessionID, loadErr)
			continue
		}
		// Short conversations need no trimming and are not re-saved
		if len(conv.Messages) <= keep {
			continue
		}
		// Keep only the newest messages
		conv.Messages = conv.Messages[len(conv.Messages)-keep:]
		if saveErr := store.Save(ctx, conv); saveErr != nil {
			log.Printf("Failed to save conversation %s: %v", sessionID, saveErr)
		}
	}
}
// User-based sessions
sessionID := fmt.Sprintf("user-%s", userID)
// Conversation-based sessions
sessionID := fmt.Sprintf("conv-%s", conversationID)
// Time-based sessions
sessionID := fmt.Sprintf("user-%s-%s", userID, time.Now().Format("2006-01-02"))
// Feature-based sessions
sessionID := fmt.Sprintf("support-%s", ticketID)
// Load existing state
state, err := store.Load(ctx, conversationID)
if err != nil {
	log.Fatal(err)
}
// Keep only recent messages (sliding window)
maxMessages := 10
if len(state.Messages) > maxMessages {
	state.Messages = state.Messages[len(state.Messages)-maxMessages:]
}
// Save trimmed state and check the result so a failed write is not lost
if err := store.Save(ctx, state); err != nil {
	log.Fatal(err)
}
// Load state
state, _ := store.Load(ctx, conversationID)
// Estimate and trim by token count
// Rough estimate: 4 chars per token
totalChars := 0
for _, msg := range state.Messages {
totalChars += len(msg.Content)
}
estimatedTokens := totalChars / 4
// Trim if too many tokens
maxTokens := 4000
for estimatedTokens > maxTokens && len(state.Messages) > 1 {
state.Messages = state.Messages[1:]
totalChars = 0
for _, msg := range state.Messages {
totalChars += len(msg.Content)
}
estimatedTokens = totalChars / 4
}
// Save trimmed state
store.Save(ctx, state)
// Fork state to a new session
sourceSession := "user-123-original"
targetSession := "user-123-new-topic"
err := store.Fork(ctx, sourceSession, targetSession)
if err != nil {
log.Fatal(err)
}
state, err := store.Load(ctx, conversationID)
if err != nil {
log.Printf("Failed to load state: %v", err)
// Start with empty state
state = &statestore.ConversationState{ID: conversationID}
}
err := store.Save(ctx, state)
if err != nil {
log.Printf("Warning: Failed to save state: %v", err)
// Continue execution, state may be lost
}
// saveWithRetry saves state to store, retrying up to maxRetries times with a
// linearly growing delay (1s, 2s, ...) between attempts.
//
// It returns nil as soon as one save succeeds. If every attempt fails, the
// returned error wraps the last save error so callers can inspect it with
// errors.Is / errors.As. If maxRetries is zero or negative no save is
// attempted and an error is returned.
func saveWithRetry(store statestore.Store, state *statestore.ConversationState, maxRetries int) error {
	ctx := context.Background()
	var lastErr error
	for attempt := 1; attempt <= maxRetries; attempt++ {
		if lastErr = store.Save(ctx, state); lastErr == nil {
			return nil
		}
		log.Printf("Save failed (attempt %d/%d): %v", attempt, maxRetries, lastErr)
		// Back off only when another attempt follows; sleeping after the
		// final failure would just delay the error.
		if attempt < maxRetries {
			time.Sleep(time.Duration(attempt) * time.Second)
		}
	}
	if lastErr == nil {
		// No attempt was made (maxRetries <= 0), so there is nothing to wrap.
		return fmt.Errorf("failed after %d retries", maxRetries)
	}
	return fmt.Errorf("failed after %d retries: %w", maxRetries, lastErr)
}

Problem: Messages not saved between requests.

Solutions:

  1. Verify store is properly initialized:

    store := statestore.NewMemoryStore()
    // or
    store := statestore.NewRedisStore(redisClient)
  2. Check session ID consistency:

    // Use same session ID for both calls
    sessionID := "user-123"
    pipe.ExecuteWithContext(ctx, sessionID, "user", "First message")
    pipe.ExecuteWithContext(ctx, sessionID, "user", "Second message")
  3. Verify store connection:

    // Test save/load round trip; report which step failed and why
    testState := &statestore.ConversationState{ID: "test-session"}
    if err := store.Save(ctx, testState); err != nil {
        log.Fatalf("Store not working (save): %v", err)
    }
    if _, err := store.Load(ctx, "test-session"); err != nil {
        log.Fatalf("Store not working (load): %v", err)
    }

Problem: Cannot connect to Redis.

Solutions:

  1. Check Redis is running:

    Terminal window
    redis-cli ping
    # Should return: PONG
  2. Verify connection details:

    redisClient := redis.NewClient(&redis.Options{
    Addr: "localhost:6379",
    })
    store := statestore.NewRedisStore(redisClient)
  3. Test network connectivity:

    Terminal window
    telnet localhost 6379

Problem: In-memory store consuming too much memory.

Solutions:

  1. Switch to Redis for production
  2. Implement periodic cleanup
  3. Limit messages per conversation:
    // Check the load error before touching state
    state, err := store.Load(ctx, conversationID)
    if err != nil {
        log.Fatal(err)
    }
    // Keep only the 50 most recent messages
    if len(state.Messages) > 50 {
        state.Messages = state.Messages[len(state.Messages)-50:]
        if err := store.Save(ctx, state); err != nil {
            log.Fatal(err)
        }
    }
  1. Use Redis for production:

    // Production
    redisClient := redis.NewClient(&redis.Options{
    Addr: os.Getenv("REDIS_URL"),
    })
    store := statestore.NewRedisStore(redisClient)
    // Development
    store := statestore.NewMemoryStore()
  2. Use TTL for Redis store:

    store := statestore.NewRedisStore(redisClient, statestore.WithTTL(24*time.Hour))
  3. Use descriptive session IDs:

    // Good
    sessionID := fmt.Sprintf("user-%s-support", userID)
    // Bad
    sessionID := "session1"
  4. Implement session cleanup:

    // Set Redis TTL
    config set maxmemory-policy volatile-lru
    // Or periodic cleanup
    go cleanupOldSessions(store, 24*time.Hour)
  5. Handle state errors gracefully:

    state, err := store.Load(ctx, conversationID)
    if err != nil {
    log.Printf("State load failed: %v", err)
    state = &statestore.ConversationState{ID: conversationID} // Start fresh
    }
  6. Manage context window:

    // Trim old messages
    maxMessages := 20
    if len(state.Messages) > maxMessages {
    state.Messages = state.Messages[len(state.Messages)-maxMessages:]
    }