State Store
Conversation persistence and state management.
Overview
State stores provide persistent storage for conversation history, enabling:
- Session continuity: Resume conversations across restarts
- Multi-turn conversations: Maintain context across requests
- State sharing: Share conversation state across instances
- Debugging: Inspect conversation history
Supported Backends
- Redis: Production-ready distributed state
- In-memory: Development and testing
- Custom: Implement the Store interface
Core Interface
type Store interface {
    Load(ctx context.Context, id string) (*ConversationState, error)
    Save(ctx context.Context, state *ConversationState) error
    Fork(ctx context.Context, sourceID, newID string) error
}

ConversationState
type ConversationState struct {
    ID             string
    UserID         string
    Messages       []types.Message
    SystemPrompt   string
    Summaries      []Summary
    TokenCount     int
    LastAccessedAt time.Time
    Metadata       map[string]interface{}
}

Optional Interfaces
The core Store interface covers basic load/save operations. For better performance with long conversations, stores can implement additional opt-in interfaces. Pipeline stages type-assert for these interfaces and fall back to Load/Save when they are not available.
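The type-assert-and-fall-back behavior described above can be pictured with a minimal, self-contained sketch. The types here are local stand-ins that mirror the interfaces on this page, and `persistTurn`, `basicStore`, and `appendStore` are hypothetical names used only for illustration; this is not PromptKit's actual stage code.

```go
package main

import (
	"context"
	"fmt"
)

// Message and ConversationState are trimmed local stand-ins for
// types.Message and statestore.ConversationState.
type Message struct {
	Role    string
	Content string
}

type ConversationState struct {
	ID       string
	Messages []Message
}

// Store mirrors the Load/Save part of the core interface.
type Store interface {
	Load(ctx context.Context, id string) (*ConversationState, error)
	Save(ctx context.Context, state *ConversationState) error
}

// MessageAppender mirrors the optional interface of the same name.
type MessageAppender interface {
	AppendMessages(ctx context.Context, id string, messages []Message) error
}

// persistTurn (hypothetical) uses the optional interface when the store
// provides it and otherwise falls back to load + modify + save.
// It also reports which path was taken.
func persistTurn(ctx context.Context, s Store, id string, msgs []Message) (string, error) {
	if a, ok := s.(MessageAppender); ok {
		return "append", a.AppendMessages(ctx, id, msgs)
	}
	state, err := s.Load(ctx, id)
	if err != nil {
		return "load+save", err
	}
	state.Messages = append(state.Messages, msgs...)
	return "load+save", s.Save(ctx, state)
}

// basicStore implements only the core Load/Save methods.
type basicStore struct{ data map[string]*ConversationState }

func (b *basicStore) Load(_ context.Context, id string) (*ConversationState, error) {
	st, ok := b.data[id]
	if !ok {
		return nil, fmt.Errorf("conversation %q not found", id)
	}
	return st, nil
}

func (b *basicStore) Save(_ context.Context, st *ConversationState) error {
	b.data[st.ID] = st
	return nil
}

// appendStore additionally implements MessageAppender.
type appendStore struct{ basicStore }

func (a *appendStore) AppendMessages(_ context.Context, id string, msgs []Message) error {
	st, ok := a.data[id]
	if !ok {
		return fmt.Errorf("conversation %q not found", id)
	}
	st.Messages = append(st.Messages, msgs...)
	return nil
}

// demoPaths runs persistTurn against both store kinds.
func demoPaths() (string, string, error) {
	ctx := context.Background()
	msgs := []Message{{Role: "user", Content: "Hello"}}

	basic := &basicStore{data: map[string]*ConversationState{"s1": {ID: "s1"}}}
	slow, err := persistTurn(ctx, basic, "s1", msgs)
	if err != nil {
		return "", "", err
	}

	fast := &appendStore{basicStore{data: map[string]*ConversationState{"s1": {ID: "s1"}}}}
	quick, err := persistTurn(ctx, fast, "s1", msgs)
	if err != nil {
		return "", "", err
	}
	return slow, quick, nil
}

func main() {
	slow, quick, err := demoPaths()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(slow, quick)
}
```

Because the check is a runtime type assertion, a store opts in simply by defining the extra methods; no registration step is needed in this sketch.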
MessageReader
Enables loading only the tail of the conversation without deserializing the full state:
type MessageReader interface {
    LoadRecentMessages(ctx context.Context, id string, n int) ([]types.Message, error)
    MessageCount(ctx context.Context, id string) (int, error)
}

Used by ContextAssemblyStage to load only the hot window of recent messages.
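As an illustration of what "loading only the tail" means, the sketch below returns the last n messages of an in-memory slice. `recentMessages` is a hypothetical helper and `Message` is a local stand-in for types.Message; the handling of n <= 0 and of n larger than the history is an assumption, since this page does not specify those edge cases.

```go
package main

import "fmt"

// Message is a local stand-in for types.Message.
type Message struct {
	Role    string
	Content string
}

// recentMessages returns a copy of the last n messages.
// Assumed edge cases: n <= 0 yields nil, and n greater than the
// history length yields the whole history.
func recentMessages(all []Message, n int) []Message {
	if n <= 0 {
		return nil
	}
	if n > len(all) {
		n = len(all)
	}
	out := make([]Message, n)
	copy(out, all[len(all)-n:])
	return out
}

func main() {
	history := []Message{
		{Role: "user", Content: "first question"},
		{Role: "assistant", Content: "first answer"},
		{Role: "user", Content: "second question"},
		{Role: "assistant", Content: "second answer"},
	}
	for _, m := range recentMessages(history, 2) {
		fmt.Printf("%s: %s\n", m.Role, m.Content)
	}
}
```

Returning a copy keeps callers from mutating the stored history through the returned slice.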
MessageAppender
Enables appending new messages without the full load+replace+save cycle:
type MessageAppender interface {
    AppendMessages(ctx context.Context, id string, messages []types.Message) error
}

Used by IncrementalSaveStage to append only new messages after each turn.
MessageLog
Enables per-round write-through persistence during tool loops. Messages are appended after each LLM round, ensuring durability even if the pipeline is interrupted mid-loop:
type MessageLog interface {
    LogAppend(ctx context.Context, id string, startSeq int, messages []types.Message) (int, error)
    LogLoad(ctx context.Context, id string, recent int) ([]types.Message, error)
    LogLen(ctx context.Context, id string) (int, error)
}

LogAppend uses sequence-based idempotent append: if startSeq is behind the current length, already-persisted messages are skipped. Used via sdk.WithMessageLog().
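The skip-what-is-already-persisted rule can be sketched with an in-memory log. This is an illustration under stated assumptions, not the library's implementation: `memLog` is a hypothetical type, the returned int is assumed to be the new log length, and a startSeq beyond the current length is assumed to be an error.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Message is a local stand-in for types.Message.
type Message struct {
	Role    string
	Content string
}

// memLog is a hypothetical in-memory message log keyed by conversation ID.
type memLog struct {
	mu   sync.Mutex
	logs map[string][]Message
}

func newMemLog() *memLog { return &memLog{logs: make(map[string][]Message)} }

// LogAppend treats msgs[0] as the message at position startSeq.
// Messages at positions the log already holds are skipped, so a
// retried call does not duplicate entries.
func (l *memLog) LogAppend(_ context.Context, id string, startSeq int, msgs []Message) (int, error) {
	l.mu.Lock()
	defer l.mu.Unlock()

	cur := len(l.logs[id])
	if startSeq < 0 || startSeq > cur {
		// Assumption: a gap in the sequence is rejected.
		return cur, fmt.Errorf("startSeq %d out of range for log of length %d", startSeq, cur)
	}
	skip := cur - startSeq // number of leading messages already persisted
	if skip < len(msgs) {
		l.logs[id] = append(l.logs[id], msgs[skip:]...)
	}
	return len(l.logs[id]), nil
}

// demoRetry appends one round, then resends the whole batch with one
// extra message, then resends that batch again unchanged.
func demoRetry() (first, second, third int, err error) {
	ctx := context.Background()
	l := newMemLog()

	round1 := []Message{
		{Role: "user", Content: "question"},
		{Role: "assistant", Content: "calling tool"},
	}
	if first, err = l.LogAppend(ctx, "s1", 0, round1); err != nil {
		return
	}

	round2 := append(append([]Message{}, round1...), Message{Role: "tool", Content: "result"})
	if second, err = l.LogAppend(ctx, "s1", 0, round2); err != nil {
		return
	}

	third, err = l.LogAppend(ctx, "s1", 0, round2)
	return
}

func main() {
	first, second, third, err := demoRetry()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(first, second, third)
}
```

In this sketch the second call persists only the one new message and the third call persists nothing, which is the property that makes retries after an interrupted tool loop safe.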
SummaryAccessor
Enables reading and writing summaries independently of the full conversation state:
type SummaryAccessor interface {
    LoadSummaries(ctx context.Context, id string) ([]Summary, error)
    SaveSummary(ctx context.Context, id string, summary Summary) error
}

Used by auto-summarization to store and retrieve compressed conversation history.
Summary
The Summary type represents a compressed version of conversation turns:
type Summary struct {
    StartTurn  int       // First turn included in this summary
    EndTurn    int       // Last turn included in this summary
    Content    string    // Summarized content
    TokenCount int       // Token count of the summary
    CreatedAt  time.Time // When this summary was created
}

Summaries are stored in ConversationState.Summaries and prepended to the context as system messages during context assembly.
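A rough sketch of that prepend step is shown here. `assembleContext` is a hypothetical helper, the types are trimmed local stand-ins, and the text format of the generated system message is an assumption; the actual context assembly stage may format summaries differently.

```go
package main

import "fmt"

// Message is a local stand-in for types.Message.
type Message struct {
	Role    string
	Content string
}

// Summary is a trimmed local copy of the Summary type.
type Summary struct {
	StartTurn int
	EndTurn   int
	Content   string
}

// assembleContext places one system message per summary ahead of the
// recent messages. The "Summary of turns ..." wording is illustrative.
func assembleContext(summaries []Summary, recent []Message) []Message {
	out := make([]Message, 0, len(summaries)+len(recent))
	for _, s := range summaries {
		out = append(out, Message{
			Role:    "system",
			Content: fmt.Sprintf("Summary of turns %d-%d: %s", s.StartTurn, s.EndTurn, s.Content),
		})
	}
	return append(out, recent...)
}

func main() {
	summaries := []Summary{{StartTurn: 1, EndTurn: 20, Content: "User asked about pricing tiers."}}
	recent := []Message{
		{Role: "user", Content: "And the enterprise plan?"},
		{Role: "assistant", Content: "It adds SSO and audit logs."},
	}
	for _, m := range assembleContext(summaries, recent) {
		fmt.Printf("%s: %s\n", m.Role, m.Content)
	}
}
```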
Redis State Store
Constructor
func NewRedisStore(client *redis.Client, opts ...RedisOption) *RedisStore

Options:
- WithTTL(ttl time.Duration) - Set TTL for conversation states (default: 24 hours)
- WithPrefix(prefix string) - Set key prefix (default: "promptkit")
Example:
import (
    "github.com/redis/go-redis/v9"
    "github.com/AltairaLabs/PromptKit/runtime/statestore"
)

// Create Redis client
redisClient := redis.NewClient(&redis.Options{
    Addr: "localhost:6379",
    DB:   0,
})

// Create state store
store := statestore.NewRedisStore(redisClient)

RedisStore implements the optional MessageReader, MessageAppender, and SummaryAccessor interfaces using Redis Lists for O(1) append and tail reads, making it well-suited for long conversations.
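The variadic opts parameter of the constructor suggests the functional-options pattern. The following is a self-contained sketch of how options such as WithTTL and WithPrefix typically compose over defaults in that pattern. Everything here is a local stand-in: `redisStoreConfig` and `newConfig` are hypothetical, and defining `RedisOption` as a function type is an assumption rather than something this page confirms. Only the default values (24 hours, "promptkit") come from the option list above.

```go
package main

import (
	"fmt"
	"time"
)

// redisStoreConfig is a hypothetical holder for store settings.
type redisStoreConfig struct {
	ttl    time.Duration
	prefix string
}

// RedisOption is assumed here to be a function that mutates the config.
type RedisOption func(*redisStoreConfig)

// WithTTL overrides the default expiration.
func WithTTL(ttl time.Duration) RedisOption {
	return func(c *redisStoreConfig) { c.ttl = ttl }
}

// WithPrefix overrides the default key prefix.
func WithPrefix(prefix string) RedisOption {
	return func(c *redisStoreConfig) { c.prefix = prefix }
}

// newConfig starts from the documented defaults and applies each
// option in order, so later options win over earlier ones.
func newConfig(opts ...RedisOption) redisStoreConfig {
	cfg := redisStoreConfig{ttl: 24 * time.Hour, prefix: "promptkit"}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	fmt.Printf("%+v\n", newConfig())
	fmt.Printf("%+v\n", newConfig(WithTTL(12*time.Hour), WithPrefix("myapp")))
}
```

With this shape, callers that pass no options get the defaults, and each option touches only the field it names.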
Methods
Save:
state := &statestore.ConversationState{
    ID: "session-123",
    Messages: []types.Message{
        {Role: "user", Content: "Hello"},
        {Role: "assistant", Content: "Hi there!"},
    },
}

err := store.Save(ctx, state)
if err != nil {
    log.Fatal(err)
}

Load:
state, err := store.Load(ctx, "session-123")
if err != nil {
    log.Fatal(err)
}

for _, msg := range state.Messages {
    fmt.Printf("%s: %s\n", msg.Role, msg.Content)
}

Fork:
err := store.Fork(ctx, "session-123", "session-456")
if err != nil {
    log.Fatal(err)
}

In-Memory State Store
For development and testing. MemoryStore implements the optional MessageReader, MessageAppender, and SummaryAccessor interfaces.
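store := statestore.NewMemoryStore()

// Same interface as Redis store
state := &statestore.ConversationState{
    ID:       "session-1",
    Messages: messages,
}
if err := store.Save(ctx, state); err != nil {
    log.Fatal(err)
}

loaded, err := store.Load(ctx, "session-1")
if err != nil {
    log.Fatal(err)
}
fmt.Printf("loaded %d messages\n", len(loaded.Messages))

Usage with Pipeline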
With Pipeline
import "github.com/AltairaLabs/PromptKit/runtime/statestore"
// Create state store
store := statestore.NewRedisStore(redisClient)

// Load state, execute, and save state
state, err := store.Load(ctx, sessionID)
if err != nil {
    log.Fatal(err)
}
// ... execute pipeline with state.Messages ...
state.Messages = append(state.Messages, newMessages...)
if err := store.Save(ctx, state); err != nil {
    log.Fatal(err)
}

Manual State Management
// Load existing conversation
state, err := store.Load(ctx, sessionID)
if err != nil {
    log.Fatal(err)
}

// Add new message
state.Messages = append(state.Messages, types.Message{
    Role:    "user",
    Content: "New message",
})

// Save updated state
if err := store.Save(ctx, state); err != nil {
    log.Fatal(err)
}

Configuration
Redis Configuration
redisClient := redis.NewClient(&redis.Options{
    Addr:         "localhost:6379",
    Password:     "", // No password
    DB:           0,  // Default DB
    DialTimeout:  5 * time.Second,
    ReadTimeout:  3 * time.Second,
    WriteTimeout: 3 * time.Second,
    PoolSize:     10,
    MinIdleConns: 5,
})

store := statestore.NewRedisStore(redisClient)

TTL Management
// Set expiration on conversation state by passing the WithTTL option
// when creating the store (the default is 24 hours)
store := statestore.NewRedisStore(redisClient, statestore.WithTTL(12*time.Hour))

Custom State Store
Implementation
// Database is a placeholder for your own storage backend.
type CustomStateStore struct {
    backend Database
}

func (s *CustomStateStore) Load(
    ctx context.Context,
    id string,
) (*statestore.ConversationState, error) {
    data, err := s.backend.Get(ctx, id)
    if err != nil {
        return nil, err
    }

    var state statestore.ConversationState
    if err := json.Unmarshal(data, &state); err != nil {
        return nil, err
    }
    return &state, nil
}

func (s *CustomStateStore) Save(
    ctx context.Context,
    state *statestore.ConversationState,
) error {
    data, err := json.Marshal(state)
    if err != nil {
        return err
    }
    return s.backend.Set(ctx, state.ID, data)
}

func (s *CustomStateStore) Fork(
    ctx context.Context,
    sourceID string,
    newID string,
) error {
    state, err := s.Load(ctx, sourceID)
    if err != nil {
        return err
    }
    state.ID = newID
    return s.Save(ctx, state)
}

Custom stores can optionally implement MessageReader, MessageAppender, and SummaryAccessor for better performance with long conversations. For example, to support incremental saves:
func (s *CustomStateStore) AppendMessages(
    ctx context.Context,
    id string,
    messages []types.Message,
) error {
    return s.backend.ListAppend(ctx, id+":messages", messages)
}

Pipeline stages will automatically detect and use these interfaces when available.
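Because detection happens through type assertions, a misspelled method name or a wrong signature means the store silently falls back to Load/Save. One common Go idiom for catching that early is a compile-time interface assertion. The sketch below uses local stand-in interfaces and a hypothetical `customStore`; in real code the assertion would name the statestore interfaces instead.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a local stand-in for types.Message.
type Message struct {
	Role    string
	Content string
}

// Local copies of two of the optional interfaces from this page.
type MessageReader interface {
	LoadRecentMessages(ctx context.Context, id string, n int) ([]Message, error)
	MessageCount(ctx context.Context, id string) (int, error)
}

type MessageAppender interface {
	AppendMessages(ctx context.Context, id string, messages []Message) error
}

// customStore (hypothetical) implements only MessageAppender.
type customStore struct {
	msgs map[string][]Message
}

func (s *customStore) AppendMessages(_ context.Context, id string, messages []Message) error {
	if s.msgs == nil {
		s.msgs = make(map[string][]Message)
	}
	s.msgs[id] = append(s.msgs[id], messages...)
	return nil
}

// Compile-time check: the build fails if *customStore stops
// satisfying MessageAppender.
var _ MessageAppender = (*customStore)(nil)

// supports reports which optional interfaces a store provides,
// using the same kind of type assertion a pipeline stage would.
func supports(s interface{}) (appender, reader bool) {
	_, appender = s.(MessageAppender)
	_, reader = s.(MessageReader)
	return appender, reader
}

func main() {
	appender, reader := supports(&customStore{})
	fmt.Printf("MessageAppender=%v MessageReader=%v\n", appender, reader)
}
```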
Best Practices
1. Session Management
// Use meaningful session IDs
sessionID := fmt.Sprintf("user-%s-%d", userID, time.Now().Unix())

// Fork a session for branching conversations
if err := store.Fork(ctx, sessionID, newSessionID); err != nil {
    log.Printf("fork failed: %v", err)
}

2. Error Handling
state, err := store.Load(ctx, sessionID)
if err != nil {
    // Start new conversation on any load error
    state = &statestore.ConversationState{
        ID:       sessionID,
        Messages: []types.Message{},
    }
}

3. Message Truncation
// Limit conversation history to prevent memory issues
state, err := store.Load(ctx, sessionID)
if err != nil {
    log.Fatal(err)
}
maxMessages := 50
if len(state.Messages) > maxMessages {
    state.Messages = state.Messages[len(state.Messages)-maxMessages:]
}

if err := store.Save(ctx, state); err != nil {
    log.Fatal(err)
}

4. Concurrent Access
// Use session locking for concurrent access.
// acquireLock is a placeholder for an application-provided lock.
lock := acquireLock(sessionID)
defer lock.Release()

state, err := store.Load(ctx, sessionID)
if err != nil {
    return err
}
// ... modify state.Messages ...
if err := store.Save(ctx, state); err != nil {
    return err
}

Performance Considerations
Latency
- Redis: 1-5ms per operation (network dependent)
- In-memory: <1ms per operation
Throughput
- Redis: 10,000+ ops/sec (single instance)
- In-memory: 100,000+ ops/sec
Storage
- Average conversation: 1-10 KB
- 1M conversations: 1-10 GB storage
- Implement TTL to manage storage growth
See Also
- Pipeline Reference - Using state stores in pipelines
- State Store How-To - State management patterns
- State Store Tutorial - Building stateful apps